From a3f430b21064dc9286ce42bdc75f0eb1efead752 Mon Sep 17 00:00:00 2001
From: matt
Date: Sat, 11 Apr 2026 08:14:29 +0100
Subject: [PATCH] Fix sled backend del() bug and implement time travel support

del() wrote PUT_MARKER instead of DEL_MARKER into the changes buffer,
so deletes within a transaction were silently treated as empty puts.

range_skip_scan_tuple() returned an error iterator, disabling time
travel on the sled backend. Implement it using the same
check_key_for_validity/seek pattern as the memory backend, with
SledSkipIterator (read path) and SledSkipDualIterator (write path
with uncommitted changes).

Co-Authored-By: Claude Opus 4.6 (1M context)
---
 cozo-core/src/storage/sled.rs | 236 ++++++++++++++++++++++++++++++++--
 1 file changed, 225 insertions(+), 11 deletions(-)

diff --git a/cozo-core/src/storage/sled.rs b/cozo-core/src/storage/sled.rs
index 1811fdf4..b827721d 100644
--- a/cozo-core/src/storage/sled.rs
+++ b/cozo-core/src/storage/sled.rs
@@ -7,17 +7,16 @@
  */
 
 use std::cmp::Ordering;
-use std::iter;
 use std::iter::Fuse;
 use std::path::Path;
 
 use itertools::Itertools;
-use miette::{miette, IntoDiagnostic, Result};
+use miette::{IntoDiagnostic, Result};
 use sled::{Batch, Config, Db, IVec, Iter, Mode};
 
-use crate::data::tuple::Tuple;
+use crate::data::tuple::{check_key_for_validity, Tuple};
 use crate::data::value::ValidityTs;
-use crate::runtime::relation::decode_tuple_from_kv;
+use crate::runtime::relation::{decode_tuple_from_kv, extend_tuple_from_v};
 use crate::storage::{Storage, StoreTx};
 use crate::utils::{swap_option_result, TempCollector};
 
@@ -132,7 +131,7 @@ impl<'s> StoreTx<'s> for SledTx {
     #[inline]
     fn del(&mut self, key: &[u8]) -> Result<()> {
         self.ensure_changes_db()?;
-        let val_to_write = [PUT_MARKER];
+        let val_to_write = [DEL_MARKER];
         self.changes
             .as_mut()
             .unwrap()
@@ -211,13 +210,26 @@ impl<'s> StoreTx<'s> for SledTx {
 
     fn range_skip_scan_tuple<'a>(
         &'a self,
-        _lower: &[u8],
-        _upper: &[u8],
-        _valid_at: ValidityTs,
+        lower: &[u8],
+        upper: &[u8],
+        valid_at: ValidityTs,
     ) -> Box<dyn Iterator<Item = Result<Tuple>> + 'a> {
-        Box::new(iter::once(Err(miette!(
-            "Sled backend does not support time travelling."
-        ))))
+        if let Some(changes) = &self.changes {
+            Box::new(SledSkipDualIterator {
+                db: &self.db,
+                changes,
+                upper: upper.to_vec(),
+                valid_at,
+                next_bound: lower.to_vec(),
+            })
+        } else {
+            Box::new(SledSkipIterator {
+                db: &self.db,
+                upper: upper.to_vec(),
+                valid_at,
+                next_bound: lower.to_vec(),
+            })
+        }
     }
 
     fn range_scan<'a>(
@@ -423,3 +435,205 @@ impl Iterator for SledIter {
         swap_option_result(self.next_inner())
     }
 }
+
+struct SledSkipIterator<'a> {
+    db: &'a Db,
+    upper: Vec<u8>,
+    valid_at: ValidityTs,
+    next_bound: Vec<u8>,
+}
+
+impl SledSkipIterator<'_> {
+    #[inline]
+    fn next_inner(&mut self) -> Result<Option<Tuple>> {
+        loop {
+            let nxt = self
+                .db
+                .range(self.next_bound.clone()..self.upper.clone())
+                .next();
+            match nxt {
+                None => return Ok(None),
+                Some(result) => {
+                    let (candidate_key, candidate_val) = result.into_diagnostic()?;
+                    let (ret, nxt_bound) =
+                        check_key_for_validity(&candidate_key, self.valid_at, None);
+                    self.next_bound = nxt_bound;
+                    if let Some(mut nk) = ret {
+                        extend_tuple_from_v(&mut nk, &candidate_val);
+                        return Ok(Some(nk));
+                    }
+                }
+            }
+        }
+    }
+}
+
+impl Iterator for SledSkipIterator<'_> {
+    type Item = Result<Tuple>;
+
+    #[inline]
+    fn next(&mut self) -> Option<Self::Item> {
+        swap_option_result(self.next_inner())
+    }
+}
+
+struct SledSkipDualIterator<'a> {
+    db: &'a Db,
+    changes: &'a Db,
+    upper: Vec<u8>,
+    valid_at: ValidityTs,
+    next_bound: Vec<u8>,
+}
+
+impl SledSkipDualIterator<'_> {
+    #[inline]
+    fn next_inner(&mut self) -> Result<Option<Tuple>> {
+        loop {
+            let stored_nxt = self
+                .db
+                .range(self.next_bound.clone()..self.upper.clone())
+                .next()
+                .transpose()
+                .into_diagnostic()?;
+            let delta_nxt = self
+                .changes
+                .range(self.next_bound.clone()..self.upper.clone())
+                .next()
+                .transpose()
+                .into_diagnostic()?;
+
+            let (candidate_key, candidate_val): (Vec<u8>, Vec<u8>) =
+                match (stored_nxt, delta_nxt) {
+                    (None, None) => return Ok(None),
+                    (None, Some((dk, dv))) => {
+                        if dv[0] == DEL_MARKER {
+                            let (_, nxt_seek) =
+                                check_key_for_validity(&dk, self.valid_at, None);
+                            self.next_bound = nxt_seek;
+                            continue;
+                        }
+                        (dk.to_vec(), dv[1..].to_vec())
+                    }
+                    (Some((sk, sv)), None) => (sk.to_vec(), sv.to_vec()),
+                    (Some((sk, sv)), Some((dk, dv))) => {
+                        if sk < dk {
+                            (sk.to_vec(), sv.to_vec())
+                        } else {
+                            if dv[0] == DEL_MARKER {
+                                let (_, nxt_seek) =
+                                    check_key_for_validity(&dk, self.valid_at, None);
+                                self.next_bound = nxt_seek;
+                                continue;
+                            }
+                            (dk.to_vec(), dv[1..].to_vec())
+                        }
+                    }
+                };
+
+            let (ret, nxt_bound) =
+                check_key_for_validity(&candidate_key, self.valid_at, None);
+            self.next_bound = nxt_bound;
+            if let Some(mut nk) = ret {
+                extend_tuple_from_v(&mut nk, &candidate_val);
+                return Ok(Some(nk));
+            }
+        }
+    }
+}
+
+impl Iterator for SledSkipDualIterator<'_> {
+    type Item = Result<Tuple>;
+
+    #[inline]
+    fn next(&mut self) -> Option<Self::Item> {
+        swap_option_result(self.next_inner())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::data::value::{DataValue, Validity};
+    use crate::runtime::db::ScriptMutability;
+    use miette::{IntoDiagnostic, Result};
+    use std::collections::BTreeMap;
+    use tempfile::TempDir;
+
+    use super::*;
+
+    fn setup_test_db() -> Result<(TempDir, crate::Db<SledStorage>)> {
+        let temp_dir = TempDir::new().into_diagnostic()?;
+        let db = new_cozo_sled(temp_dir.path())?;
+        db.run_script(
+            r#"
+            {:create plain {k: Int => v}}
+            {:create tt {k: Int, vld: Validity => v}}
+            "#,
+            Default::default(),
+            ScriptMutability::Mutable,
+        )?;
+        Ok((temp_dir, db))
+    }
+
+    fn run(db: &crate::Db<SledStorage>, q: &str) -> Result<crate::NamedRows> {
+        db.run_script(q, Default::default(), ScriptMutability::Mutable)
+    }
+
+    fn tt_row(k: i64, ts: i64, v: i64) -> Vec<DataValue> {
+        vec![
+            DataValue::from(k),
+            DataValue::Validity(Validity::from((ts, true))),
+            DataValue::from(v),
+        ]
+    }
+
+    #[test]
+    fn test_delete() -> Result<()> {
+        let (_tmp, db) = setup_test_db()?;
+
+        run(&db, "?[k, v] <- [[1, 'a'], [2, 'b'], [3, 'c']] :put plain {k => v}")?;
+        assert_eq!(run(&db, "?[k, v] := *plain{k, v}")?.rows.len(), 3);
+
+        // Delete + read in same imperative script (exercises uncommitted delta)
+        let result = run(&db, r#"
+            {?[k] <- [[2]] :rm plain {k}}
+            {?[k, v] := *plain{k, v}}
+        "#)?;
+        assert_eq!(result.rows.len(), 2);
+        assert_eq!(result.rows[0][0], DataValue::from(1));
+        assert_eq!(result.rows[1][0], DataValue::from(3));
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_time_travel() -> Result<()> {
+        let (_tmp, db) = setup_test_db()?;
+
+        let mut to_import = BTreeMap::new();
+        to_import.insert(
+            "tt".to_string(),
+            crate::NamedRows {
+                headers: vec!["k".into(), "vld".into(), "v".into()],
+                rows: vec![
+                    tt_row(1, 0, 10), tt_row(1, 5, 15),
+                    tt_row(2, 0, 20), tt_row(2, 5, 25),
+                ],
+                next: None,
+            },
+        );
+        db.import_relations(to_import)?;
+
+        // Two keys, each with versions at t=0 and t=5
+        let r = run(&db, "?[k, v] := *tt{k, v @ 0}")?;
+        assert_eq!(r.rows.len(), 2);
+        assert_eq!(r.rows[0], vec![DataValue::from(1), DataValue::from(10)]);
+        assert_eq!(r.rows[1], vec![DataValue::from(2), DataValue::from(20)]);
+
+        let r = run(&db, "?[k, v] := *tt{k, v @ 5}")?;
+        assert_eq!(r.rows.len(), 2);
+        assert_eq!(r.rows[0], vec![DataValue::from(1), DataValue::from(15)]);
+        assert_eq!(r.rows[1], vec![DataValue::from(2), DataValue::from(25)]);
+
+        Ok(())
+    }
+}