[ISSUE #27] poll_batches() API call and IT tests #127
c9850bb to 1ea2031
Hi @luoyuxia, PTAL 🙏
leekeiabstraction
left a comment
Thank you for the PR! Hope you don't mind me reviewing it, left a comment.
@leekeiabstraction I've split the logic into two different scanner types. PTAL 🙏
79b260f to 3b0708f
@fresh-borzoni Thanks for the PR. I'll have a look when I find some time.
Pull request overview
This PR introduces a new batch-oriented scanning API (RecordBatchLogScanner) for analytics workloads, providing compile-time separation between per-record and batch access patterns. The implementation shares common logic via LogScannerInner while exposing distinct public APIs for each mode.
- Created RecordBatchLogScanner for direct Arrow RecordBatch access alongside existing LogScanner
- Implemented shared LogScannerInner containing common scanning logic for both scanner types
- Added comprehensive integration tests covering basic functionality, projections, order preservation, and offset tracking
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| crates/fluss/src/client/table/scanner.rs | Core implementation: Added RecordBatchLogScanner and LogScannerInner with separate poll_batches() and collect_batches() methods for batch-level access |
| crates/fluss/src/client/table/mod.rs | Exports new RecordBatchLogScanner type in public API |
| crates/fluss/src/client/table/log_fetch_buffer.rs | Extended CompletedFetch trait with fetch_batches() method and added next_fetched_batch() helper for direct batch extraction |
| crates/fluss/src/record/arrow.rs | Added LogRecordBatch::record_batch() method for efficient batch-level access without row iteration |
| crates/fluss/tests/integration/table.rs | Added 5 comprehensive integration tests covering basic functionality, empty results, projection, order preservation, and offset continuation |
luoyuxia
left a comment
@fresh-borzoni Thanks for the PR. Only left minor comments. PTAL
@luoyuxia Thanks for the review.
luoyuxia
left a comment
@fresh-borzoni Thanks for the quick update. LGTM!
Purpose
Linked issue: close #27
New feature for analytics use cases. Provides scanner variants with compile-time mode separation between per-record and batch access patterns.
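As an illustration of what compile-time mode separation can look like, here is a minimal std-only sketch. Every name in it (`ScannerInner`, `RecordScanner`, `BatchScanner`, `Batch`, `Record`, `new_inner`) is a hypothetical stand-in chosen for the example, not the actual types in this PR, and the real scanners work on Arrow data rather than `Vec<i32>`. The idea shown is only that two thin wrappers share one inner object and each exposes a `poll()` with its own return type, so the access mode is fixed by the type the caller holds.

```rust
use std::sync::{Arc, Mutex};

// Hypothetical per-record view: a value plus its log offset.
#[derive(Debug, Clone, PartialEq)]
struct Record {
    offset: i64,
    value: i32,
}

// Hypothetical stand-in for a columnar batch.
#[derive(Debug, Clone, PartialEq)]
struct Batch {
    base_offset: i64,
    values: Vec<i32>,
}

// Shared state used by both scanner front ends (assumed shape).
struct ScannerInner {
    pending: Mutex<Vec<Batch>>,
}

impl ScannerInner {
    // Take everything buffered so far, leaving the buffer empty.
    fn drain(&self) -> Vec<Batch> {
        std::mem::take(&mut *self.pending.lock().unwrap())
    }
}

// Per-record front end: poll() yields individual records with offsets.
struct RecordScanner {
    inner: Arc<ScannerInner>,
}

// Batch front end: poll() hands back whole batches untouched.
struct BatchScanner {
    inner: Arc<ScannerInner>,
}

impl RecordScanner {
    fn poll(&self) -> Vec<Record> {
        self.inner
            .drain()
            .into_iter()
            .flat_map(|b| {
                let base = b.base_offset;
                b.values
                    .into_iter()
                    .enumerate()
                    .map(move |(i, v)| Record { offset: base + i as i64, value: v })
            })
            .collect()
    }
}

impl BatchScanner {
    fn poll(&self) -> Vec<Batch> {
        self.inner.drain()
    }
}

// Helper for the example: build an inner scanner pre-filled with batches.
fn new_inner(batches: Vec<Batch>) -> Arc<ScannerInner> {
    Arc::new(ScannerInner { pending: Mutex::new(batches) })
}
```

With this shape there is no runtime flag to check: a caller holding a `RecordScanner` cannot ask for batches, and the reverse holds for `BatchScanner`, because neither type has the other's method.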
Brief change log
**Scanner architecture:**
- RecordBatchLogScanner for batch access (separate from LogScanner)
- LogScannerInner holding shared implementation
- Arc<LogScannerInner> with distinct public APIs:
  - LogScanner::poll() → Result<ScanRecords> (per-record with offset/timestamp metadata)
  - RecordBatchLogScanner::poll() → Result<Vec<RecordBatch>> (direct batch access)
**Implementation details:**
- TableScan::create_record_batch_log_scanner() constructor
- CompletedFetch trait with fetch_batches() method
- LogRecordBatch::record_batch() for direct batch extraction
**Cleanup:**
- poll_mode: AtomicU8, mode checking logic
- Error::IllegalState variant (no longer needed)
- LogScanner API unchanged
Tests
Integration Tests:
Added test_poll_batches IT test with scenarios:
- Basic functionality and data correctness
- Timeout behavior on empty table
- Field projection support
- Order preservation across writes
- Offset tracking across consecutive polls
- Starting from a non-zero offset (the batch should be sliced)
Existing record-level tests continue to pass
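The non-zero-offset scenario in the test list can be pictured with a small std-only sketch. `RowBatch` and `slice_from` are hypothetical names made up for this example, and the row data is a plain `Vec<i32>`; the PR itself works on Arrow `RecordBatch` values, which offer a `slice(offset, length)` method for the same purpose. The sketch only shows the arithmetic: rows that precede the requested start offset are dropped, and the batch's base offset moves forward by the same amount.

```rust
// Hypothetical stand-in for a fetched batch: first row's log offset plus rows.
#[derive(Debug, Clone, PartialEq)]
struct RowBatch {
    base_offset: i64,
    rows: Vec<i32>,
}

/// Drop rows that precede `start_offset`.
/// Returns None when no row at or after `start_offset` remains.
fn slice_from(batch: &RowBatch, start_offset: i64) -> Option<RowBatch> {
    // Number of leading rows to skip; zero if the batch starts at or after the offset.
    let skip = (start_offset - batch.base_offset).max(0) as usize;
    if skip >= batch.rows.len() {
        return None;
    }
    Some(RowBatch {
        base_offset: batch.base_offset + skip as i64,
        rows: batch.rows[skip..].to_vec(),
    })
}
```

For a batch covering offsets 100 to 103, subscribing at 102 would keep only the last two rows, while subscribing at or before 100 would return the batch unchanged.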
API and Format
New API: