Description
Describe the bug
The calculate_range function in datafusion/datasource/src/mod.rs creates invalid byte ranges (where start > end) when reading single-line JSON files that exceed 10MB total size with target_partitions >= 2.
This causes an error from object_store:
Error: ObjectStore(Generic { store: "S3", source: Inconsistent { start: 1149247, end: 1149246 } })
Root Cause
In calculate_range, the function calculates adjusted byte ranges by finding newline boundaries:
let start_delta = if start != 0 {
    find_first_newline(store, location, start - 1, file_size, newline).await?
} else {
    0
};
let end_delta = if end != file_size {
    find_first_newline(store, location, end - 1, file_size, newline).await?
} else {
    0
};
let range = start + start_delta..end + end_delta;
if range.start == range.end {
    return Ok(RangeCalculation::TerminateEarly);
}
When find_first_newline doesn't find a newline (single-line JSON), it returns the length of the remaining file. This causes start + start_delta to exceed end + end_delta, creating an invalid range.
The current check only handles range.start == range.end, not range.start > range.end.
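A possible direction for the fix, sketched on an in-memory byte slice rather than an object store. This is a simplified model, not the actual DataFusion code: `first_newline_offset` and `adjusted_range` are hypothetical stand-ins for `find_first_newline` and `calculate_range`, and `None` plays the role of `RangeCalculation::TerminateEarly`. The only intended change is comparing with `>=` instead of `==`.

```rust
use std::ops::Range;

/// Hypothetical stand-in for `find_first_newline`: offset of the first
/// newline in `data[start..]`, or the remaining length if there is none.
fn first_newline_offset(data: &[u8], start: usize) -> usize {
    data[start..]
        .iter()
        .position(|&b| b == b'\n')
        .unwrap_or(data.len() - start)
}

/// Hypothetical model of the range adjustment. Returns `None` (the analogue
/// of `TerminateEarly`) when the adjusted range is empty or inverted.
fn adjusted_range(data: &[u8], start: usize, end: usize) -> Option<Range<usize>> {
    let file_size = data.len();
    let start_delta = if start != 0 {
        first_newline_offset(data, start - 1)
    } else {
        0
    };
    let end_delta = if end != file_size {
        first_newline_offset(data, end - 1)
    } else {
        0
    };
    let range = start + start_delta..end + end_delta;
    // `>=` instead of `==`: an inverted range also means "no records here".
    if range.start >= range.end {
        return None;
    }
    Some(range)
}

fn main() {
    // Single-line JSON with no newline anywhere, split into two partitions.
    let single_line = br#"{"id": 1, "padding": "xxxxxxxxxxxxxxxx"}"#;
    let mid = single_line.len() / 2;
    println!("first partition:  {:?}", adjusted_range(single_line, 0, mid));
    println!("second partition: {:?}", adjusted_range(single_line, mid, single_line.len()));
}
```

In this model the second partition has `end == file_size`, so `end_delta` is 0 while `start + start_delta` lands one byte past the end of the file, which matches the off-by-one shape of the reported error (`start: 1149247, end: 1149246`).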
Trigger Conditions
All must be true:
- Total file size > 10MB (triggers FileGroupPartitioner repartitioning)
- target_partitions >= 2
- JSON files are single-line (no internal newlines, e.g., from json.dump())
To Reproduce
1. Create test data (single-line JSON, >10MB total)
import json, os, random, string
os.makedirs('data', exist_ok=True)  # Make sure the output directory exists
for i in range(1, 11):
    data = {'id': i, 'padding': ''.join(random.choices(string.ascii_letters, k=1100000))}
    with open(f'data/file_{i}.json', 'w') as f:
        json.dump(data, f)  # Single line, no newlines
2. Upload to S3/MinIO
mc mb minio/test-bucket
mc cp --recursive data/ minio/test-bucket/data/
3. Run DataFusion query
use datafusion::prelude::*;
use object_store::aws::AmazonS3Builder;
use std::sync::Arc;
use url::Url;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let store = Arc::new(AmazonS3Builder::new()
        .with_endpoint("http://localhost:9000")
        .with_bucket_name("test-bucket")
        .with_access_key_id("minioadmin")
        .with_secret_access_key("minioadmin")
        .with_region("us-east-1")
        .with_allow_http(true)
        .build()?);
    let config = SessionConfig::new().with_target_partitions(2); // Fails with >= 2
    let ctx = SessionContext::new_with_config(config);
    ctx.register_object_store(&Url::parse("s3://test-bucket")?, store);
    ctx.sql("CREATE EXTERNAL TABLE test STORED AS JSON LOCATION 's3://test-bucket/data/'").await?;
    let df = ctx.sql("SELECT * FROM test").await?;
    let results = df.collect().await?; // FAILS with invalid range error
    Ok(())
}
Expected behavior
DataFusion should handle single-line JSON files gracefully when partitioning. When a partition contains no complete records (because the entire file is a single line), that partition should be skipped via RangeCalculation::TerminateEarly.
Additional context
Affected versions
- datafusion 51.0.0 (also tested with 50.x, 49.x, 45.x)
- object_store 0.12.4
- Tested with MinIO and RustFS (both fail identically)
Workarounds
- Set partitions to 1: SessionConfig::new().with_target_partitions(1)
- Always add a newline after each record when writing NDJSON
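For the second workaround, a minimal sketch of a writer that terminates every record with a newline. `write_ndjson` is a hypothetical helper (not part of DataFusion or any library), and it assumes the records are already serialized JSON strings.

```rust
use std::io::{self, Write};

/// Hypothetical helper: writes each pre-serialized JSON record followed by
/// `\n`, so the output always ends with a newline.
fn write_ndjson<W: Write>(mut out: W, records: &[&str]) -> io::Result<()> {
    for record in records {
        out.write_all(record.as_bytes())?;
        out.write_all(b"\n")?;
    }
    out.flush()
}

fn main() -> io::Result<()> {
    let mut buf = Vec::new();
    write_ndjson(&mut buf, &[r#"{"id": 1}"#, r#"{"id": 2}"#])?;
    // Every record, including the last one, is newline-terminated.
    assert!(buf.ends_with(b"\n"));
    io::stdout().write_all(&buf)
}
```

With a trailing newline present, the newline search for a later partition should stop at the end of the file instead of running past it, which is presumably why this workaround avoids the inverted range.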