Skip to content

Commit 3b0708f

Browse files
committed
Split scanner to records and batches
1 parent 15de823 commit 3b0708f

File tree

4 files changed

+86
-216
lines changed

4 files changed

+86
-216
lines changed

crates/fluss/src/client/table/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ mod scanner;
3232
mod writer;
3333

3434
pub use append::{AppendWriter, TableAppend};
35-
pub use scanner::{LogScanner, TableScan};
35+
pub use scanner::{LogScanner, RecordBatchLogScanner, TableScan};
3636

3737
#[allow(dead_code)]
3838
pub struct FlussTable<'a> {

crates/fluss/src/client/table/scanner.rs

Lines changed: 70 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ use parking_lot::{Mutex, RwLock};
2222
use std::collections::{HashMap, HashSet};
2323
use std::slice::from_ref;
2424
use std::sync::Arc;
25-
use std::sync::atomic::{AtomicU8, Ordering};
2625
use std::time::Duration;
2726
use tempfile::TempDir;
2827

@@ -137,33 +136,57 @@ impl<'a> TableScan<'a> {
137136
}
138137

139138
pub fn create_log_scanner(self) -> Result<LogScanner> {
140-
LogScanner::new(
139+
let inner = LogScannerInner::new(
141140
&self.table_info,
142141
self.metadata.clone(),
143142
self.conn.get_connections(),
144143
self.projected_fields,
145-
)
144+
)?;
145+
Ok(LogScanner {
146+
inner: Arc::new(inner),
147+
})
146148
}
147-
}
148149

149-
/// Poll mode for LogScanner - ensures poll() and poll_batches() are not mixed
150-
const POLL_MODE_UNSET: u8 = 0;
151-
const POLL_MODE_RECORDS: u8 = 1;
152-
const POLL_MODE_BATCHES: u8 = 2;
150+
pub fn create_record_batch_log_scanner(self) -> Result<RecordBatchLogScanner> {
151+
let inner = LogScannerInner::new(
152+
&self.table_info,
153+
self.metadata.clone(),
154+
self.conn.get_connections(),
155+
self.projected_fields,
156+
)?;
157+
Ok(RecordBatchLogScanner {
158+
inner: Arc::new(inner),
159+
})
160+
}
161+
}
153162

163+
/// Scanner for reading log records one at a time with per-record metadata.
164+
///
165+
/// Use this scanner when you need access to individual record offsets and timestamps.
166+
/// For batch-level access, use [`RecordBatchLogScanner`] instead.
154167
pub struct LogScanner {
168+
inner: Arc<LogScannerInner>,
169+
}
170+
171+
/// Scanner for reading log data as Arrow RecordBatches.
172+
///
173+
/// More efficient than [`LogScanner`] for batch-level analytics where per-record
174+
/// metadata (offsets, timestamps) is not needed.
175+
pub struct RecordBatchLogScanner {
176+
inner: Arc<LogScannerInner>,
177+
}
178+
179+
/// Private shared implementation for both scanner types
180+
struct LogScannerInner {
155181
table_path: TablePath,
156182
table_id: i64,
157183
metadata: Arc<Metadata>,
158184
log_scanner_status: Arc<LogScannerStatus>,
159185
log_fetcher: LogFetcher,
160-
/// Tracks whether scanner is in records mode (poll) or batches mode (poll_batches).
161-
/// Once set, cannot be changed to prevent data loss from mixing polling methods.
162-
poll_mode: AtomicU8,
163186
}
164187

165-
impl LogScanner {
166-
pub fn new(
188+
impl LogScannerInner {
189+
fn new(
167190
table_info: &TableInfo,
168191
metadata: Arc<Metadata>,
169192
connections: Arc<RpcClient>,
@@ -182,32 +205,10 @@ impl LogScanner {
182205
log_scanner_status.clone(),
183206
projected_fields,
184207
)?,
185-
poll_mode: AtomicU8::new(POLL_MODE_UNSET),
186208
})
187209
}
188210

189-
pub async fn poll(&self, timeout: Duration) -> Result<ScanRecords> {
190-
// Check and set poll mode to prevent mixing with poll_batches
191-
match self.poll_mode.compare_exchange(
192-
POLL_MODE_UNSET,
193-
POLL_MODE_RECORDS,
194-
Ordering::AcqRel,
195-
Ordering::Acquire,
196-
) {
197-
Ok(_) => { /* First call, set to records mode */ }
198-
Err(POLL_MODE_RECORDS) => { /* Already in records mode, ok */ }
199-
Err(POLL_MODE_BATCHES) => {
200-
return Err(Error::IllegalState {
201-
message: "Cannot call poll() after poll_batches(). Mixing polling methods causes data loss. Create a new scanner to switch methods.".to_string(),
202-
});
203-
}
204-
Err(invalid) => {
205-
return Err(Error::IllegalState {
206-
message: format!("Invalid poll mode state: {}", invalid),
207-
});
208-
}
209-
}
210-
211+
async fn poll_records(&self, timeout: Duration) -> Result<ScanRecords> {
211212
let start = std::time::Instant::now();
212213
let deadline = start + timeout;
213214

@@ -246,7 +247,7 @@ impl LogScanner {
246247
}
247248
}
248249

249-
pub async fn subscribe(&self, bucket: i32, offset: i64) -> Result<()> {
250+
async fn subscribe(&self, bucket: i32, offset: i64) -> Result<()> {
250251
let table_bucket = TableBucket::new(self.table_id, bucket);
251252
self.metadata
252253
.check_and_update_table_metadata(from_ref(&self.table_path))
@@ -256,7 +257,7 @@ impl LogScanner {
256257
Ok(())
257258
}
258259

259-
pub async fn subscribe_batch(&self, bucket_offsets: &HashMap<i32, i64>) -> Result<()> {
260+
async fn subscribe_batch(&self, bucket_offsets: &HashMap<i32, i64>) -> Result<()> {
260261
self.metadata
261262
.check_and_update_table_metadata(from_ref(&self.table_path))
262263
.await?;
@@ -291,38 +292,7 @@ impl LogScanner {
291292
self.log_fetcher.collect_fetches()
292293
}
293294

294-
/// Poll for Arrow RecordBatches directly.
295-
///
296-
/// More efficient than `poll()` when you need batch-level access for analytics
297-
/// - This method does not expose per-record offsets or timestamps.
298-
/// Use `poll()` if you need that metadata.
299-
///
300-
/// Do not mix `poll()` and `poll_batches()` on the same scanner.
301-
/// Calling `poll_batches()` after `poll()` (or vice versa) will return an error
302-
/// to prevent data loss. Create a new scanner if you need to switch methods.
303-
///
304-
pub async fn poll_batches(&self, timeout: Duration) -> Result<Vec<RecordBatch>> {
305-
// Check and set poll mode to prevent mixing with poll
306-
match self.poll_mode.compare_exchange(
307-
POLL_MODE_UNSET,
308-
POLL_MODE_BATCHES,
309-
Ordering::AcqRel,
310-
Ordering::Acquire,
311-
) {
312-
Ok(_) => { /* First call, set to batches mode */ }
313-
Err(POLL_MODE_BATCHES) => { /* Already in batches mode, ok */ }
314-
Err(POLL_MODE_RECORDS) => {
315-
return Err(Error::IllegalState {
316-
message: "Cannot call poll_batches() after poll(). Mixing polling methods causes data loss. Create a new scanner to switch methods.".to_string(),
317-
});
318-
}
319-
Err(invalid) => {
320-
return Err(Error::IllegalState {
321-
message: format!("Invalid poll mode state: {}", invalid),
322-
});
323-
}
324-
}
325-
295+
async fn poll_batches(&self, timeout: Duration) -> Result<Vec<RecordBatch>> {
326296
let start = std::time::Instant::now();
327297
let deadline = start + timeout;
328298

@@ -363,6 +333,36 @@ impl LogScanner {
363333
}
364334
}
365335

336+
// Implementation for LogScanner (records mode)
337+
impl LogScanner {
338+
pub async fn poll(&self, timeout: Duration) -> Result<ScanRecords> {
339+
self.inner.poll_records(timeout).await
340+
}
341+
342+
pub async fn subscribe(&self, bucket: i32, offset: i64) -> Result<()> {
343+
self.inner.subscribe(bucket, offset).await
344+
}
345+
346+
pub async fn subscribe_batch(&self, bucket_offsets: &HashMap<i32, i64>) -> Result<()> {
347+
self.inner.subscribe_batch(bucket_offsets).await
348+
}
349+
}
350+
351+
// Implementation for RecordBatchLogScanner (batches mode)
352+
impl RecordBatchLogScanner {
353+
pub async fn poll(&self, timeout: Duration) -> Result<Vec<RecordBatch>> {
354+
self.inner.poll_batches(timeout).await
355+
}
356+
357+
pub async fn subscribe(&self, bucket: i32, offset: i64) -> Result<()> {
358+
self.inner.subscribe(bucket, offset).await
359+
}
360+
361+
pub async fn subscribe_batch(&self, bucket_offsets: &HashMap<i32, i64>) -> Result<()> {
362+
self.inner.subscribe_batch(bucket_offsets).await
363+
}
364+
}
365+
366366
struct LogFetcher {
367367
conns: Arc<RpcClient>,
368368
metadata: Arc<Metadata>,

crates/fluss/src/error.rs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -92,12 +92,6 @@ pub enum Error {
9292
)]
9393
IllegalArgument { message: String },
9494

95-
#[snafu(
96-
visibility(pub(crate)),
97-
display("Fluss hitting illegal state error {}.", message)
98-
)]
99-
IllegalState { message: String },
100-
10195
#[snafu(
10296
visibility(pub(crate)),
10397
display("Fluss hitting IO not supported error {}.", message)

0 commit comments

Comments
 (0)