diff --git a/Cargo.lock b/Cargo.lock index 5d287e7..f66269d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1827,6 +1827,17 @@ version = "0.4.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "13dc2df351e3202783a1fe0d44375f7295ffb4049267b0f3018346dc122a1d94" +[[package]] +name = "logindex" +version = "0.1.0" +dependencies = [ + "log", + "parking_lot", + "rust-rocksdb", + "storage", + "tempfile", +] + [[package]] name = "loom" version = "0.7.2" @@ -2952,7 +2963,7 @@ dependencies = [ [[package]] name = "rust-librocksdb-sys" version = "0.41.0+10.9.1" -source = "git+https://github.com/zaidoon1/rust-rocksdb?branch=master#83aceef5fc372da2fd2dd78bac786a189742274f" +source = "git+https://github.com/ruojieranyishen/zaidoon1-rust-rocksdb?rev=f7abb18c64fac810f3c4736aef833c340396449b#f7abb18c64fac810f3c4736aef833c340396449b" dependencies = [ "bindgen 0.72.1", "bzip2-sys", @@ -2967,7 +2978,7 @@ dependencies = [ [[package]] name = "rust-rocksdb" version = "0.45.0" -source = "git+https://github.com/zaidoon1/rust-rocksdb?branch=master#83aceef5fc372da2fd2dd78bac786a189742274f" +source = "git+https://github.com/ruojieranyishen/zaidoon1-rust-rocksdb?rev=f7abb18c64fac810f3c4736aef833c340396449b#f7abb18c64fac810f3c4736aef833c340396449b" dependencies = [ "libc", "parking_lot", diff --git a/Cargo.toml b/Cargo.toml index 97cc119..82e783b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,6 +2,7 @@ resolver = "2" members = [ "src/engine", + "src/logindex", "src/storage", "src/kstd", "src/common/macro", @@ -42,7 +43,7 @@ quote = "1.0" syn = { version = "2.0", features = ["extra-traits", "full"] } env_logger = "0.11" log = "0.4" -rocksdb = { package = "rust-rocksdb", git = "https://github.com/zaidoon1/rust-rocksdb", branch = "master", features = ["multi-threaded-cf"] } +rocksdb = { package = "rust-rocksdb", git = "https://github.com/ruojieranyishen/zaidoon1-rust-rocksdb", rev = "f7abb18c64fac810f3c4736aef833c340396449b", features = ["multi-threaded-cf"] } thiserror = 
"1.0" serde = "1.0" serde_json = "1.0" @@ -68,6 +69,7 @@ openraft = { version = "0.9.20", features = ["serde"] } ## workspaces members engine = { path = "src/engine" } +logindex = { path = "src/logindex" } storage = { path = "src/storage" } kstd = { path = "src/kstd" } common-macro = { path = "src/common/macro" } diff --git a/src/logindex/Cargo.toml b/src/logindex/Cargo.toml new file mode 100644 index 0000000..09454af --- /dev/null +++ b/src/logindex/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "logindex" +version.workspace = true +edition.workspace = true + +[lints] +workspace = true + +[dependencies] +rocksdb.workspace = true +storage = { path = "../storage" } +parking_lot.workspace = true +log.workspace = true + +[dev-dependencies] +tempfile.workspace = true + +[[test]] +name = "logindex_integration" +path = "tests/logindex_integration.rs" diff --git a/src/logindex/src/cf_tracker.rs b/src/logindex/src/cf_tracker.rs new file mode 100644 index 0000000..98a614b --- /dev/null +++ b/src/logindex/src/cf_tracker.rs @@ -0,0 +1,381 @@ +// Copyright (c) 2024-present, arana-db Community. All rights reserved. +// +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Copyright (c) 2024-present, arana-db Community. 
All rights reserved. +// +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the language governing permissions and +// limitations under the License. + +use std::collections::BTreeSet; + +use parking_lot::RwLock; + +use crate::COLUMN_FAMILY_COUNT; +use crate::db_access::{DbCfAccess, Result}; +use crate::table_properties::get_largest_log_index_from_collection; +use crate::types::{LogIndex, LogIndexSeqnoPair, SequenceNumber}; + +/// Each CF's applied (latest in memtable) and flushed (latest in SST) +struct LogIndexPair { + applied_index: LogIndexSeqnoPair, + flushed_index: LogIndexSeqnoPair, +} + +impl Default for LogIndexPair { + fn default() -> Self { + Self { + applied_index: LogIndexSeqnoPair::new(0, 0), + flushed_index: LogIndexSeqnoPair::new(0, 0), + } + } +} + +/// Return result of GetSmallestLogIndex +#[derive(Debug, Clone)] +pub struct SmallestIndexRes { + pub smallest_applied_log_index_cf: i32, + pub smallest_applied_log_index: LogIndex, + pub smallest_flushed_log_index_cf: i32, + pub smallest_flushed_log_index: LogIndex, + pub smallest_flushed_seqno: SequenceNumber, +} + +impl Default for SmallestIndexRes { + fn default() -> Self { + Self { + smallest_applied_log_index_cf: -1, + smallest_applied_log_index: i64::MAX, + smallest_flushed_log_index_cf: -1, + smallest_flushed_log_index: i64::MAX, + 
smallest_flushed_seqno: u64::MAX, + } + } +} + +/// Tracks log index state for all Column Families +pub struct LogIndexOfColumnFamilies { + cf: RwLock<[LogIndexPair; COLUMN_FAMILY_COUNT]>, + last_flush_index: LogIndexSeqnoPair, +} + +impl Default for LogIndexOfColumnFamilies { + fn default() -> Self { + Self { + cf: RwLock::new(std::array::from_fn(|_| LogIndexPair::default())), + last_flush_index: LogIndexSeqnoPair::new(0, 0), + } + } +} + +impl LogIndexOfColumnFamilies { + fn is_valid_cf_id(cf_id: usize) -> bool { + cf_id < COLUMN_FAMILY_COUNT + } + + pub fn new() -> Self { + Self::default() + } + + /// Restore applied/flushed state from each CF's SST + pub fn init(&self, db: &D) -> Result<()> { + for i in 0..COLUMN_FAMILY_COUNT { + let collection = db.get_properties_of_all_tables_cf(i)?; + if let Some(pair) = get_largest_log_index_from_collection(&collection) { + let log_index = pair.applied_log_index(); + let seqno = pair.seqno(); + let cf = self.cf.write(); + cf[i].applied_index.set(log_index, seqno); + cf[i].flushed_index.set(log_index, seqno); + } + } + Ok(()) + } + + /// Get smallest applied/flushed log index (used for Purge, SetFlushedLogIndexGlobal) + /// + /// `flush_cf`: index of CF currently being flushed, `None` means none + pub fn get_smallest_log_index(&self, flush_cf: Option) -> SmallestIndexRes { + let mut res = SmallestIndexRes::default(); + let cf = self.cf.read(); + + for i in 0..COLUMN_FAMILY_COUNT { + let skip = flush_cf + .is_some_and(|fc| i != fc && cf[i].flushed_index.ge_seqno(&cf[i].applied_index)); + if skip { + continue; + } + + let applied = cf[i].applied_index.log_index(); + let flushed = cf[i].flushed_index.log_index(); + let flushed_seqno = cf[i].flushed_index.seqno(); + + if applied < res.smallest_applied_log_index { + res.smallest_applied_log_index = applied; + res.smallest_applied_log_index_cf = i as i32; + } + if flushed < res.smallest_flushed_log_index { + res.smallest_flushed_log_index = flushed; + res.smallest_flushed_seqno = 
flushed_seqno; + res.smallest_flushed_log_index_cf = i as i32; + } + } + res + } + + pub fn set_flushed_log_index(&self, cf_id: usize, log_index: LogIndex, seqno: SequenceNumber) { + if !Self::is_valid_cf_id(cf_id) { + return; + } + let cf = self.cf.write(); + let li = cf[cf_id].flushed_index.log_index(); + let seq = cf[cf_id].flushed_index.seqno(); + cf[cf_id] + .flushed_index + .set(log_index.max(li), seqno.max(seq)); + } + + pub fn set_flushed_log_index_global(&self, log_index: LogIndex, seqno: SequenceNumber) { + self.set_last_flush_index(log_index, seqno); + let cf = self.cf.write(); + for i in 0..COLUMN_FAMILY_COUNT { + if cf[i].flushed_index.le_seqno(&self.last_flush_index) { + let flush_li = cf[i] + .flushed_index + .log_index() + .max(self.last_flush_index.log_index()); + let flush_seq = cf[i] + .flushed_index + .seqno() + .max(self.last_flush_index.seqno()); + cf[i].flushed_index.set(flush_li, flush_seq); + } + } + } + + /// Whether cur_log_index has been applied (less than applied) + pub fn is_applied(&self, cf_id: usize, cur_log_index: LogIndex) -> bool { + if !Self::is_valid_cf_id(cf_id) { + return false; + } + cur_log_index < self.cf.read()[cf_id].applied_index.log_index() + } + + /// Update applied_index on write; if flushed==applied, also update flushed + pub fn update(&self, cf_id: usize, cur_log_index: LogIndex, cur_seqno: SequenceNumber) { + if !Self::is_valid_cf_id(cf_id) { + return; + } + let cf = self.cf.write(); + if cf[cf_id].flushed_index.le_seqno(&self.last_flush_index) + && cf[cf_id].flushed_index.eq_seqno(&cf[cf_id].applied_index) + { + let flush_li = cf[cf_id] + .flushed_index + .log_index() + .max(self.last_flush_index.log_index()); + let flush_seq = cf[cf_id] + .flushed_index + .seqno() + .max(self.last_flush_index.seqno()); + cf[cf_id].flushed_index.set(flush_li, flush_seq); + } + cf[cf_id].applied_index.set(cur_log_index, cur_seqno); + } + + /// Whether there is a CF that needs flush (gap > 0) + pub fn is_pending_flush(&self) -> 
bool { + self.get_pending_flush_gap() > 0 + } + + /// max - min of all applied/flushed log indexes + pub fn get_pending_flush_gap(&self) -> u64 { + let cf = self.cf.read(); + let mut s: BTreeSet = BTreeSet::new(); + for i in 0..COLUMN_FAMILY_COUNT { + s.insert(cf[i].applied_index.log_index()); + s.insert(cf[i].flushed_index.log_index()); + } + if s.is_empty() { + return 0; + } + if s.len() == 1 { + return 0; + } + let first = *s.first().unwrap(); + let last = *s.last().unwrap(); + last.saturating_sub(first) as u64 + } + + pub fn set_last_flush_index(&self, log_index: LogIndex, seqno: SequenceNumber) { + let li = self.last_flush_index.log_index().max(log_index); + let seq = self.last_flush_index.seqno().max(seqno); + self.last_flush_index.set(li, seq); + } + + pub fn get_last_flush_index(&self) -> LogIndexSeqnoPair { + self.last_flush_index.clone() + } + + /// Get state of specified CF (for testing or debugging) + #[allow(dead_code)] + pub fn get_cf_applied(&self, cf_id: usize) -> (LogIndex, SequenceNumber) { + if !Self::is_valid_cf_id(cf_id) { + return (0, 0); + } + let cf = self.cf.read(); + ( + cf[cf_id].applied_index.log_index(), + cf[cf_id].applied_index.seqno(), + ) + } + + #[allow(dead_code)] + pub fn get_cf_flushed(&self, cf_id: usize) -> (LogIndex, SequenceNumber) { + if !Self::is_valid_cf_id(cf_id) { + return (0, 0); + } + let cf = self.cf.read(); + ( + cf[cf_id].flushed_index.log_index(), + cf[cf_id].flushed_index.seqno(), + ) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::CF_NAMES; + use crate::LogIndexAndSequenceCollector; + use crate::db_access::DbCfAccess; + use rocksdb::{ColumnFamilyDescriptor, DB, Options}; + use std::sync::Arc; + use tempfile::TempDir; + + struct MultiCfDbAccess<'a> { + db: &'a DB, + } + + impl DbCfAccess for MultiCfDbAccess<'_> { + fn get_properties_of_all_tables_cf( + &self, + cf_id: usize, + ) -> Result { + if cf_id < CF_NAMES.len() { + let cf = self.db.cf_handle(CF_NAMES[cf_id]).expect("cf handle"); + 
self.db.get_properties_of_all_tables_cf(&cf) + } else { + self.db.get_properties_of_all_tables() + } + } + } + + #[test] + fn test_init_from_db() { + let temp_dir = TempDir::new().expect("temp dir"); + let path = temp_dir.path(); + + let collector = Arc::new(LogIndexAndSequenceCollector::new(0)); + let factory = crate::table_properties::LogIndexTablePropertiesCollectorFactory::new( + collector.clone(), + ); + + let mut opts = Options::default(); + opts.create_if_missing(true); + opts.create_missing_column_families(true); + opts.set_table_properties_collector_factory(factory); + + let cfs: Vec = CF_NAMES + .iter() + .map(|n| ColumnFamilyDescriptor::new(*n, opts.clone())) + .collect(); + + let db = DB::open_cf_descriptors(&opts, path, cfs).expect("open"); + + db.put(b"k", b"v").expect("put"); + db.flush().expect("flush"); + + let latest_seq = db.latest_sequence_number(); + collector.update(233333, latest_seq); + db.put(b"k2", b"v2").expect("put"); // new data to trigger second flush to generate new SST + db.flush().expect("flush"); + + let access = MultiCfDbAccess { db: &db }; + let cf_tracker = LogIndexOfColumnFamilies::new(); + cf_tracker.init(&access).expect("init"); + + let (applied, _) = cf_tracker.get_cf_applied(0); + let (flushed, _) = cf_tracker.get_cf_flushed(0); + assert_eq!(applied, 233333); + assert_eq!(flushed, 233333); + } + + #[test] + fn test_cf_tracker_update_and_get_smallest() { + let cf = LogIndexOfColumnFamilies::new(); + for i in 0..COLUMN_FAMILY_COUNT { + cf.update(i, 100 + i as i64, 1000 + i as u64); + } + cf.update(1, 50, 500); + + let res = cf.get_smallest_log_index(None); + assert_eq!(res.smallest_applied_log_index, 50); + assert_eq!(res.smallest_applied_log_index_cf, 1); + } + + #[test] + fn test_set_flushed_log_index() { + let cf = LogIndexOfColumnFamilies::new(); + cf.update(0, 100, 1000); + cf.set_flushed_log_index(0, 80, 800); + let (applied, _) = cf.get_cf_applied(0); + let (flushed, _) = cf.get_cf_flushed(0); + assert_eq!(applied, 
100); + assert_eq!(flushed, 80); // max(0, 80) = 80 + } + + #[test] + fn test_get_pending_flush_gap() { + let cf = LogIndexOfColumnFamilies::new(); + cf.update(0, 10, 100); + cf.update(1, 50, 500); + cf.update(2, 30, 300); + let gap = cf.get_pending_flush_gap(); + // set = {applied 10,50,30} ∪ {flushed 0,0,0} = {0,10,30,50}, gap = 50 + assert_eq!(gap, 50); + } + + #[test] + fn test_is_pending_flush() { + let cf = LogIndexOfColumnFamilies::new(); + assert!(!cf.is_pending_flush()); // all 0, gap=0 + cf.update(0, 10, 100); + assert!(cf.is_pending_flush()); // applied 10 vs flushed 0, gap 10 + } +} diff --git a/src/logindex/src/collector.rs b/src/logindex/src/collector.rs new file mode 100644 index 0000000..11597c8 --- /dev/null +++ b/src/logindex/src/collector.rs @@ -0,0 +1,216 @@ +// Copyright (c) 2024-present, arana-db Community. All rights reserved. +// +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Copyright (c) 2024-present, arana-db Community. All rights reserved. +// +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. 
+// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the language governing permissions and +// limitations under the License. + +use crate::types::{LogIndex, LogIndexAndSequencePair, SequenceNumber}; +use parking_lot::RwLock; +use std::collections::VecDeque; +const DEFAULT_MAX_GAP: u64 = 1000; + +/// Maintains an ordered (LogIndex, SequenceNumber) mapping queue +/// +/// - Update: append with step_length_mask sampling +/// - FindAppliedLogIndex: binary search for log index corresponding to seqno +/// - Purge: clean up outdated entries after flush +pub struct LogIndexAndSequenceCollector { + /// When step_length_bit=0, mask=0, i.e., full sampling; (1 << bit) - 1 + step_length_mask: u64, + list: RwLock>, + max_gap: u64, +} + +impl LogIndexAndSequenceCollector { + /// Create Collector + /// + /// When step_length_bit=0, full sampling; when >0, sample at 2^bit intervals to save memory + pub fn new(step_length_bit: u8) -> Self { + let step_length_mask = match step_length_bit { + 0 => 0, + 1..=63 => (1u64 << step_length_bit) - 1, + _ => u64::MAX, + }; + Self { + step_length_mask, + list: RwLock::new(VecDeque::new()), + max_gap: DEFAULT_MAX_GAP, + } + } + + /// Binary search for log index corresponding to seqno (including seqno or earlier) + /// + /// seqno=0 (e.g., during compaction) returns 0 + pub fn find_applied_log_index(&self, seqno: SequenceNumber) -> LogIndex { + if seqno == 0 { + return 0; + } + let list = self.list.read(); + if list.is_empty() || seqno < list.front().unwrap().seqno() { + return 0; + } + if seqno >= 
list.back().unwrap().seqno() { + return list.back().unwrap().applied_log_index(); + } + // partition_point: the first position where predicate is false + // i.e., the index of the first element where seqno() > seqno + let idx = list.partition_point(|p| p.seqno() <= seqno); + list.get(idx.saturating_sub(1)) + .map(|p| p.applied_log_index()) + .unwrap_or(0) + } + + /// Append new mapping (with step_length_mask sampling) + /// + /// Only append when (log_index & step_length_mask) == 0 + /// + /// Note: LogIndex is i64 but expected to be non-negative in practice. + /// Negative values will be skipped as they cast to large u64 values. + pub fn update( + &self, + smallest_applied_log_index: LogIndex, + smallest_flush_seqno: SequenceNumber, + ) { + if smallest_applied_log_index < 0 { + return; + } + if (smallest_applied_log_index as u64 & self.step_length_mask) != 0 { + return; + } + let mut list = self.list.write(); + list.push_back(LogIndexAndSequencePair::new( + smallest_applied_log_index, + smallest_flush_seqno, + )); + } + + /// Clean up outdated mappings after flush + /// + /// Strategy: keep at least one element <= smallest_applied_log_index to ensure correct lookup next flush. + /// When the second element also satisfies the condition, the first can be safely removed. 
+ pub fn purge(&self, smallest_applied_log_index: LogIndex) { + let mut list = self.list.write(); + // Keep at least one element <= smallest_applied_log_index + // When list[1] <= threshold, list[0] can be safely removed + while list.len() >= 2 && list[1].applied_log_index() <= smallest_applied_log_index { + list.pop_front(); + } + } + + /// Whether manual flush is needed (queue length >= max_gap) + pub fn is_flush_pending(&self) -> bool { + self.size() >= self.max_gap + } + + /// Queue length + pub fn size(&self) -> u64 { + self.list.read().len() as u64 + } + + /// Set max_gap (for testing or tuning) + #[allow(dead_code)] + pub fn set_max_gap(&mut self, gap: u64) { + self.max_gap = gap; + } +} + +impl Default for LogIndexAndSequenceCollector { + fn default() -> Self { + Self::new(0) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_find_applied_log_index_empty() { + let c = LogIndexAndSequenceCollector::new(0); + assert_eq!(c.find_applied_log_index(0), 0); + assert_eq!(c.find_applied_log_index(100), 0); + } + + #[test] + fn test_find_applied_log_index_basic() { + let c = LogIndexAndSequenceCollector::new(0); + c.update(10, 100); + c.update(20, 200); + c.update(30, 300); + + assert_eq!(c.find_applied_log_index(0), 0); + assert_eq!(c.find_applied_log_index(50), 0); + assert_eq!(c.find_applied_log_index(100), 10); + assert_eq!(c.find_applied_log_index(150), 10); + assert_eq!(c.find_applied_log_index(200), 20); + assert_eq!(c.find_applied_log_index(250), 20); + assert_eq!(c.find_applied_log_index(300), 30); + assert_eq!(c.find_applied_log_index(400), 30); + } + + #[test] + fn test_update_step_length_mask() { + let c = LogIndexAndSequenceCollector::new(2); // mask = 3 + c.update(0, 100); // 0 & 3 == 0, add + c.update(1, 101); // 1 & 3 != 0, skip + c.update(4, 104); // 4 & 3 == 0, add + assert_eq!(c.size(), 2); + assert_eq!(c.find_applied_log_index(100), 0); + assert_eq!(c.find_applied_log_index(104), 4); + } + + #[test] + fn test_purge() { + let 
c = LogIndexAndSequenceCollector::new(0); + c.update(10, 100); + c.update(20, 200); + c.update(30, 300); + c.update(40, 400); + + // delete 10, keep 20,30,40 (second=20<=25 delete 10; second=30>25 stop) + c.purge(25); + assert_eq!(c.size(), 3); + assert_eq!(c.find_applied_log_index(300), 30); + assert_eq!(c.find_applied_log_index(250), 20); + } + + #[test] + fn test_is_flush_pending() { + let mut c = LogIndexAndSequenceCollector::new(0); + c.set_max_gap(3); + assert!(!c.is_flush_pending()); + c.update(1, 1); + c.update(2, 2); + assert!(!c.is_flush_pending()); + c.update(3, 3); + assert!(c.is_flush_pending()); + } +} diff --git a/src/logindex/src/db_access.rs b/src/logindex/src/db_access.rs new file mode 100644 index 0000000..8a69ec1 --- /dev/null +++ b/src/logindex/src/db_access.rs @@ -0,0 +1,50 @@ +// Copyright (c) 2024-present, arana-db Community. All rights reserved. +// +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Copyright (c) 2024-present, arana-db Community. All rights reserved. +// +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. 
+// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the language governing permissions and +// limitations under the License. + +use rocksdb::table_properties::TablePropertiesCollection; + +/// Error type (reuse rocksdb::Error or custom) +pub type Result = std::result::Result; + +/// Thin wrapper interface: provides ability to get TableProperties by CF +/// +/// Equivalent to C++ Redis's GetDB() + GetColumnFamilyHandles()[cf_id], +/// used for LogIndexOfColumnFamilies::Init to iterate CFs and call GetPropertiesOfAllTables. +pub trait DbCfAccess { + /// Get TableProperties of all SSTs for specified CF + /// + /// # Arguments + /// * `cf_id` - ColumnFamily index, range [0, COLUMN_FAMILY_COUNT) + fn get_properties_of_all_tables_cf(&self, cf_id: usize) -> Result; +} diff --git a/src/logindex/src/event_listener.rs b/src/logindex/src/event_listener.rs new file mode 100644 index 0000000..9131edb --- /dev/null +++ b/src/logindex/src/event_listener.rs @@ -0,0 +1,210 @@ +// Copyright (c) 2024-present, arana-db Community. All rights reserved. +// +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Copyright (c) 2024-present, arana-db Community. All rights reserved. +// +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.
+ +use std::sync::atomic::{AtomicI64, AtomicU64, Ordering}; + +use rocksdb::event_listener::{EventListener, FlushJobInfo}; + +use crate::cf_tracker::LogIndexOfColumnFamilies; +use crate::collector::LogIndexAndSequenceCollector; +use crate::{COLUMN_FAMILY_COUNT, cf_name_to_index}; + +/// Snapshot callback: `(log_index, is_manual)` +pub type SnapshotCallback = Box; + +/// Manual flush trigger: pass cf_id, caller executes db.flush_cf(cf) +pub type FlushTrigger = Box; + +/// EventListener: updates collector/cf_tracker when flush completes, triggers snapshot or manual flush if needed +pub struct LogIndexAndSequenceCollectorPurger { + collector: std::sync::Arc, + cf_tracker: std::sync::Arc, + callback: SnapshotCallback, + flush_trigger: Option, + count: AtomicU64, + manual_flushing_cf: AtomicI64, +} + +impl LogIndexAndSequenceCollectorPurger { + pub fn new( + collector: std::sync::Arc, + cf_tracker: std::sync::Arc, + callback: SnapshotCallback, + flush_trigger: Option, + ) -> Self { + Self { + collector, + cf_tracker, + callback, + flush_trigger, + count: AtomicU64::new(0), + manual_flushing_cf: AtomicI64::new(-1), + } + } + + /// Set the CF currently being manually flushed (used to skip duplicate triggers) + pub fn set_manual_flushing_cf(&self, cf_id: i64) { + self.manual_flushing_cf.store(cf_id, Ordering::SeqCst); + } +} + +impl EventListener for LogIndexAndSequenceCollectorPurger { + fn on_flush_completed(&self, info: &FlushJobInfo) { + let cf_name = info.cf_name(); + let cf_id = match cf_name.as_ref().and_then(|n| cf_name_to_index(n)) { + Some(id) => id, + None => { + let name_str = cf_name.map(|n| String::from_utf8_lossy(&n).into_owned()); + log::warn!( + "LogIndexAndSequenceCollectorPurger: unknown CF name {:?}, skipping flush completion handling", + name_str + ); + return; + } + }; + + let largest_seqno = info.largest_seqno(); + let log_index = self.collector.find_applied_log_index(largest_seqno); + + self.cf_tracker + .set_flushed_log_index(cf_id, log_index, 
largest_seqno); + + let res = self.cf_tracker.get_smallest_log_index(Some(cf_id)); + self.collector.purge(res.smallest_applied_log_index); + + if res.smallest_flushed_log_index_cf >= 0 { + self.cf_tracker.set_flushed_log_index_global( + res.smallest_flushed_log_index, + res.smallest_flushed_seqno, + ); + } + + let count = self.count.fetch_add(1, Ordering::SeqCst); + if count.is_multiple_of(10) { + (self.callback)(res.smallest_flushed_log_index, false); + } + + if cf_id as i64 == self.manual_flushing_cf.load(Ordering::SeqCst) { + self.manual_flushing_cf.store(-1, Ordering::SeqCst); + } + + let flushing_cf = self.manual_flushing_cf.load(Ordering::SeqCst); + if flushing_cf != -1 || !self.collector.is_flush_pending() { + return; + } + + if res.smallest_flushed_log_index_cf < 0 { + return; + } + let target_cf = res.smallest_flushed_log_index_cf as usize; + + let Some(ref trigger) = self.flush_trigger else { + return; + }; + + // Attempt to claim manual-flush state; abort if another flush is already in progress + if self + .manual_flushing_cf + .compare_exchange(-1, target_cf as i64, Ordering::SeqCst, Ordering::SeqCst) + .is_err() + { + return; + } + + if target_cf < COLUMN_FAMILY_COUNT { + trigger(target_cf); + } + } +} + +#[cfg(test)] +mod tests { + use std::sync::atomic::{AtomicU64, Ordering}; + + use rocksdb::{DB, Options}; + use tempfile::TempDir; + + use crate::cf_tracker::LogIndexOfColumnFamilies; + use crate::collector::LogIndexAndSequenceCollector; + use crate::table_properties::LogIndexTablePropertiesCollectorFactory; + + use super::LogIndexAndSequenceCollectorPurger; + + #[test] + fn test_event_listener_on_flush_completed() { + let temp_dir = TempDir::new().expect("temp dir"); + let path = temp_dir.path(); + + let collector = std::sync::Arc::new(LogIndexAndSequenceCollector::new(0)); + let cf_tracker = std::sync::Arc::new(LogIndexOfColumnFamilies::new()); + let callback_count = std::sync::Arc::new(AtomicU64::new(0)); + let callback_count_clone = 
callback_count.clone(); + let callback = Box::new(move |_log_index: i64, _is_manual: bool| { + callback_count_clone.fetch_add(1, Ordering::SeqCst); + }); + + let purger = LogIndexAndSequenceCollectorPurger::new( + collector.clone(), + cf_tracker.clone(), + callback, + None, + ); + + let factory = LogIndexTablePropertiesCollectorFactory::new(collector.clone()); + + let mut opts = Options::default(); + opts.create_if_missing(true); + opts.set_table_properties_collector_factory(factory); + opts.add_event_listener(purger); + + let db = DB::open(&opts, path).expect("open"); + + db.put(b"k", b"v").expect("put"); + let latest_seq = db.latest_sequence_number(); + collector.update(233333, latest_seq); + db.flush().expect("flush"); + + let (flushed, _) = cf_tracker.get_cf_flushed(0); + assert_eq!( + flushed, 233333, + "cf_tracker should be updated by on_flush_completed" + ); + + assert_eq!( + callback_count.load(Ordering::SeqCst), + 1, + "snapshot callback called on 1st flush (count % 10 == 0)" + ); + } +} diff --git a/src/logindex/src/lib.rs b/src/logindex/src/lib.rs new file mode 100644 index 0000000..86978a2 --- /dev/null +++ b/src/logindex/src/lib.rs @@ -0,0 +1,100 @@ +// Copyright (c) 2024-present, arana-db Community. All rights reserved. +// +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Copyright (c) 2024-present, arana-db Community. All rights reserved.
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+pub mod cf_tracker;
+pub mod collector;
+pub mod db_access;
+pub mod event_listener;
+pub mod table_properties;
+pub mod types;
+
+pub use cf_tracker::{LogIndexOfColumnFamilies, SmallestIndexRes};
+pub use collector::LogIndexAndSequenceCollector;
+pub use event_listener::LogIndexAndSequenceCollectorPurger;
+pub use table_properties::{
+    LogIndexTablePropertiesCollectorFactory, PROPERTY_KEY, get_largest_log_index_from_collection,
+    read_stats_from_table_props,
+};
+pub use types::{LogIndex, LogIndexAndSequencePair, LogIndexSeqnoPair, SequenceNumber};
+
+/// Number of column families, consistent with storage::ColumnFamilyIndex::COUNT
+pub const COLUMN_FAMILY_COUNT: usize = storage::ColumnFamilyIndex::COUNT;
+
+/// List of CF names, in the same order as storage::ColumnFamilyIndex
+pub const CF_NAMES: [&str; COLUMN_FAMILY_COUNT] = [
+    "default",       // MetaCF = 0
+    "hash_data_cf",  // HashesDataCF = 1
+    "set_data_cf",   // SetsDataCF = 2
+    "list_data_cf",  // ListsDataCF = 3
+    "zset_data_cf",  // ZsetsDataCF = 4
+    "zset_score_cf", // ZsetsScoreCF = 5
+];
+
+const _: () = assert!(
+    CF_NAMES.len() == storage::ColumnFamilyIndex::COUNT,
+    "CF_NAMES length must match storage::ColumnFamilyIndex::COUNT"
+);
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_cf_names_match_storage() {
+        use storage::ColumnFamilyIndex;
+        let variants: [ColumnFamilyIndex; ColumnFamilyIndex::COUNT] = [
+            ColumnFamilyIndex::MetaCF,
+            ColumnFamilyIndex::HashesDataCF,
+            ColumnFamilyIndex::SetsDataCF,
+            ColumnFamilyIndex::ListsDataCF,
+            ColumnFamilyIndex::ZsetsDataCF,
+            ColumnFamilyIndex::ZsetsScoreCF,
+        ];
+        for (i, cf_index) in variants.iter().enumerate() {
+            assert_eq!(
+                cf_index.name(),
+                CF_NAMES[i],
+                "CF_NAMES[{}] mismatch: expected '{}', got '{}'",
+                i,
+                cf_index.name(),
+                CF_NAMES[i]
+            );
+        }
+    }
+}
+
+/// Convert CF name to index; yields `None` when `name` matches no entry of `CF_NAMES`
+pub fn cf_name_to_index(name: &[u8]) -> Option<usize> {
+    CF_NAMES.iter().position(|n| n.as_bytes() == name)
+}
diff --git a/src/logindex/src/table_properties.rs b/src/logindex/src/table_properties.rs
new file mode 100644
index 0000000..d145645
--- /dev/null
+++ b/src/logindex/src/table_properties.rs
@@ -0,0 +1,292 @@
+// Copyright (c) 2024-present, arana-db Community. All rights reserved.
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Copyright (c) 2024-present, arana-db Community. All rights reserved.
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::HashMap;
+use std::ffi::{CStr, CString};
+use std::sync::Arc;
+
+use rocksdb::table_properties::TablePropertiesCollection;
+use rocksdb::table_properties_collector::{DBEntryType, TablePropertiesCollector};
+use rocksdb::table_properties_collector_factory::{
+    TablePropertiesCollectorContext, TablePropertiesCollectorFactory,
+};
+
+use crate::collector::LogIndexAndSequenceCollector;
+use crate::types::{LogIndex, LogIndexAndSequencePair, SequenceNumber};
+
+pub const PROPERTY_KEY: &str = "LargestLogIndex/LargestSequenceNumber";
+
+const COLLECTOR_NAME: &str = "LogIndexTablePropertiesCollector";
+const FACTORY_NAME: &str = "LogIndexTablePropertiesCollectorFactory";
+
+/// Tracks largest_seqno during SST construction, writes "<log_index>/<seqno>" on finish
+pub struct LogIndexTablePropertiesCollector {
+    name: CString,
+    collector: Arc<LogIndexAndSequenceCollector>,
+    largest_seqno: u64,
+    cache: Option<LogIndex>,
+}
+
+impl LogIndexTablePropertiesCollector {
+    pub fn new(collector: Arc<LogIndexAndSequenceCollector>) -> Self {
+        Self {
+            name: CString::new(COLLECTOR_NAME).expect("CString"),
+            collector,
+            largest_seqno: 0,
+            cache: None,
+        }
+    }
+
+    fn materialize(&mut self) -> (Vec<u8>, Vec<u8>) {
+        let log_index = self.cache.unwrap_or_else(|| {
+            let li = self.collector.find_applied_log_index(self.largest_seqno);
+            self.cache = Some(li);
+            li
+        });
+        let value = format!("{}/{}", log_index, self.largest_seqno);
+        (PROPERTY_KEY.as_bytes().to_vec(), value.into_bytes())
+    }
+}
+
+impl TablePropertiesCollector for LogIndexTablePropertiesCollector {
+    fn name(&self) -> &CStr {
+        self.name.as_c_str()
+    }
+
+    fn add(
+        &mut self,
+        _key: &[u8],
+        _value: &[u8],
+        _entry_type: DBEntryType,
+        seq: u64,
+        _file_size: u64,
+    ) {
+        if seq > self.largest_seqno {
+            self.largest_seqno = seq;
+        }
+    }
+
+    fn finish(&mut self) -> HashMap<Vec<u8>, Vec<u8>> {
+        let mut m = HashMap::new();
+        let (k, v) = self.materialize();
+        m.insert(k, v);
+        m
+    }
+
+    fn get_readable_properties(&self) -> HashMap<Vec<u8>, Vec<u8>> {
+        let mut m = HashMap::new();
+        let log_index = self
+            .cache
+            .unwrap_or_else(|| self.collector.find_applied_log_index(self.largest_seqno));
+        let value = format!("{}/{}", log_index, self.largest_seqno);
+        m.insert(PROPERTY_KEY.as_bytes().to_vec(), value.into_bytes());
+        m
+    }
+}
+
+pub struct LogIndexTablePropertiesCollectorFactory {
+    name: CString,
+    collector: Arc<LogIndexAndSequenceCollector>,
+}
+
+impl LogIndexTablePropertiesCollectorFactory {
+    pub fn new(collector: Arc<LogIndexAndSequenceCollector>) -> Self {
+        Self {
+            name: CString::new(FACTORY_NAME).expect("CString"),
+            collector,
+        }
+    }
+}
+
+impl TablePropertiesCollectorFactory for LogIndexTablePropertiesCollectorFactory {
+    type Collector = LogIndexTablePropertiesCollector;
+
+    fn create(&mut self, _context: TablePropertiesCollectorContext) -> Self::Collector {
+        LogIndexTablePropertiesCollector::new(self.collector.clone())
+    }
+
+    fn name(&self) -> &CStr {
+        self.name.as_c_str()
+    }
+}
+
+/// Parse log_index/seqno from TableProperties' user_collected_properties
+pub fn read_stats_from_table_props(
+    user_properties: &HashMap<Vec<u8>, Vec<u8>>,
+) -> Option<LogIndexAndSequencePair> {
+    let key = PROPERTY_KEY.as_bytes();
+    let value = user_properties.get(key)?;
+    let s = String::from_utf8_lossy(value);
+    let mut parts = s.split('/');
+    let log_index: LogIndex = parts.next()?.parse().ok()?;
+    let seqno: SequenceNumber = parts.next()?.parse().ok()?;
+    if parts.next().is_some() {
+        return None;
+    }
+    Some(LogIndexAndSequencePair::new(log_index, seqno))
+}
+
+/// Find the largest log index from TablePropertiesCollection
+pub fn get_largest_log_index_from_collection(
+    collection: &TablePropertiesCollection,
+) -> Option<LogIndexAndSequencePair> {
+    let mut max_log_index: LogIndex = -1;
+    let mut max_seqno: SequenceNumber = 0;
+
+    for (_, props) in collection.iter() {
+        let user_props = props.user_collected_properties();
+        if let Some(pair) = read_stats_from_table_props(&user_props) {
+            if pair.applied_log_index() > max_log_index
+                || (pair.applied_log_index() == max_log_index && pair.seqno() > max_seqno)
+            {
+                max_log_index = pair.applied_log_index();
+                max_seqno = pair.seqno();
+            }
+        }
+    }
+
+    if max_log_index == -1 {
+        None
+    } else {
+        Some(LogIndexAndSequencePair::new(max_log_index, max_seqno))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use rocksdb::{DB, Options};
+    use tempfile::TempDir;
+
+    #[test]
+    fn test_table_properties_collector_with_rocksdb() {
+        let temp_dir = TempDir::new().expect("temp dir");
+        let db_path = temp_dir.path();
+
+        let collector = Arc::new(LogIndexAndSequenceCollector::new(0));
+        let factory = LogIndexTablePropertiesCollectorFactory::new(collector.clone());
+
+        let mut opts = Options::default();
+        opts.create_if_missing(true);
+        opts.set_table_properties_collector_factory(factory);
+
+        let db = DB::open(&opts, db_path).expect("open");
+
+        let key = b"table-property-test";
+        db.put(key, key).expect("put");
+        let res = db.get(key).expect("get");
+        assert_eq!(res, Some(key.to_vec()));
+
+        let latest_seq = db.latest_sequence_number();
+        const TEST_LOG_INDEX: i64 = 233333;
+        collector.update(TEST_LOG_INDEX, latest_seq);
+
+        db.flush().expect("flush");
+
+        let properties = db.get_properties_of_all_tables().expect("get properties");
+        assert!(!properties.is_empty());
+
+        let mut found = false;
+        for (_table_name, props) in properties.iter() {
+            let user_props = props.user_collected_properties();
+            if let Some(value_bytes) = user_props.get(PROPERTY_KEY.as_bytes()) {
+                let value_str = String::from_utf8_lossy(value_bytes);
+                let expected = format!("{}/{}", TEST_LOG_INDEX, latest_seq);
+                assert_eq!(value_str, expected);
+                found = true;
+                break;
+            }
+        }
+        assert!(found, "Should find {} in table properties", PROPERTY_KEY);
+    }
+
+    #[test]
+    fn test_read_stats_from_table_props() {
+        let mut m = HashMap::new();
+        m.insert(PROPERTY_KEY.as_bytes().to_vec(), b"233333/5".to_vec());
+        let pair = read_stats_from_table_props(&m).unwrap();
+        assert_eq!(pair.applied_log_index(), 233333);
+        assert_eq!(pair.seqno(), 5);
+
+        assert!(read_stats_from_table_props(&HashMap::new()).is_none());
+    }
+
+    #[test]
+    fn test_read_stats_rejects_malformed_values() {
+        let mut m = HashMap::new();
+        m.insert(PROPERTY_KEY.as_bytes().to_vec(), b"233333/5/extra".to_vec());
+        assert!(
+            read_stats_from_table_props(&m).is_none(),
+            "Should reject extra segments"
+        );
+
+        let mut m = HashMap::new();
+        m.insert(
+            PROPERTY_KEY.as_bytes().to_vec(),
+            b"233333/5/extra/segments".to_vec(),
+        );
+        assert!(
+            read_stats_from_table_props(&m).is_none(),
+            "Should reject multiple extra segments"
+        );
+    }
+
+    #[test]
+    fn test_property_key_matches_cpp() {
+        const CPP_K_PROPERTY_NAME: &str = "LargestLogIndex/LargestSequenceNumber";
+        assert_eq!(
+            PROPERTY_KEY, CPP_K_PROPERTY_NAME,
+            "Rust PROPERTY_KEY must match C++ log_index.h kPropertyName"
+        );
+    }
+
+    #[test]
+    fn test_value_format_cpp_compatible() {
+        let cases: &[(i64, u64)] = &[
+            (0, 0),
+            (233333, 5),
+            (-1, 1),
+            (i64::MAX, u64::MAX),
+            (i64::MIN, 0),
+        ];
+        for &(log_index, seqno) in cases {
+            let value = format!("{}/{}", log_index, seqno);
+            let mut m = HashMap::new();
+            m.insert(PROPERTY_KEY.as_bytes().to_vec(), value.into_bytes());
+            let pair = read_stats_from_table_props(&m).expect("parse");
+            assert_eq!(pair.applied_log_index(), log_index);
+            assert_eq!(pair.seqno(), seqno);
+        }
+    }
+}
diff --git a/src/logindex/src/types.rs b/src/logindex/src/types.rs
new file mode 100644
index 0000000..cee3ba5
--- /dev/null
+++ b/src/logindex/src/types.rs
@@ -0,0 +1,170 @@
+// Copyright (c) 2024-present, arana-db Community. All rights reserved.
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Copyright (c) 2024-present, arana-db Community. All rights reserved.
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.
You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
+
+/// Raft log index
+pub type LogIndex = i64;
+
+pub type SequenceNumber = u64;
+
+const ORDERING: Ordering = Ordering::SeqCst; // the one ordering used by every atomic load/store in this file
+
+/// Value object: stores the binding relationship between (LogIndex, SequenceNumber)
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub struct LogIndexAndSequencePair {
+    applied_log_index: LogIndex,
+    seqno: SequenceNumber,
+}
+
+impl LogIndexAndSequencePair {
+    pub fn new(applied_log_index: LogIndex, seqno: SequenceNumber) -> Self {
+        Self {
+            applied_log_index,
+            seqno,
+        }
+    }
+
+    pub fn applied_log_index(&self) -> LogIndex {
+        self.applied_log_index
+    }
+
+    pub fn seqno(&self) -> SequenceNumber {
+        self.seqno
+    }
+
+    pub fn set_applied_log_index(&mut self, v: LogIndex) {
+        self.applied_log_index = v;
+    }
+
+    pub fn set_seqno(&mut self, v: SequenceNumber) {
+        self.seqno = v;
+    }
+}
+
+/// Atomic (log_index, seqno), supports comparison based on seqno
+#[derive(Debug, Default)]
+pub struct LogIndexSeqnoPair {
+    log_index: AtomicI64,
+    seqno: AtomicU64,
+}
+
+impl LogIndexSeqnoPair {
+    pub fn new(log_index: LogIndex, seqno: SequenceNumber) -> Self {
+        Self {
+            log_index: AtomicI64::new(log_index),
+            seqno: AtomicU64::new(seqno),
+        }
+    }
+
+    pub fn log_index(&self) -> LogIndex {
+        self.log_index.load(ORDERING)
+    }
+
+    pub fn seqno(&self) -> SequenceNumber {
+        self.seqno.load(ORDERING)
+    }
+
+    pub fn set(&self, log_index: LogIndex, seqno: SequenceNumber) {
+        self.log_index.store(log_index, ORDERING); // NOTE: two independent stores; the pair is not written as one atomic unit
+        self.seqno.store(seqno, ORDERING);
+    }
+
+    /// Compare based on seqno
+    pub fn eq_seqno(&self, other: &Self) -> bool {
+        self.seqno.load(ORDERING) == other.seqno.load(ORDERING)
+    }
+
+    pub fn le_seqno(&self, other: &Self) -> bool {
+        self.seqno.load(ORDERING) <= other.seqno.load(ORDERING)
+    }
+
+    pub fn ge_seqno(&self, other: &Self) -> bool {
+        self.seqno.load(ORDERING) >= other.seqno.load(ORDERING)
+    }
+
+    pub fn lt_seqno(&self, other: &Self) -> bool {
+        self.seqno.load(ORDERING) < other.seqno.load(ORDERING)
+    }
+}
+
+impl Clone for LogIndexSeqnoPair {
+    fn clone(&self) -> Self {
+        Self::new(self.log_index(), self.seqno()) // snapshot of both fields, read one after the other
+    }
+}
+
+impl PartialEq for LogIndexSeqnoPair {
+    fn eq(&self, other: &Self) -> bool {
+        self.eq_seqno(other) // equality is decided by seqno alone; log_index is not compared
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_log_index_and_sequence_pair() {
+        let mut p = LogIndexAndSequencePair::new(233333, 5);
+        assert_eq!(p.applied_log_index(), 233333);
+        assert_eq!(p.seqno(), 5);
+
+        p.set_applied_log_index(100);
+        p.set_seqno(10);
+        assert_eq!(p.applied_log_index(), 100);
+        assert_eq!(p.seqno(), 10);
+    }
+
+    #[test]
+    fn test_log_index_seqno_pair() {
+        let p = LogIndexSeqnoPair::new(100, 50);
+        assert_eq!(p.log_index(), 100);
+        assert_eq!(p.seqno(), 50);
+
+        p.set(200, 60);
+        assert_eq!(p.log_index(), 200);
+        assert_eq!(p.seqno(), 60);
+
+        let q = LogIndexSeqnoPair::new(0, 70);
+        assert!(p.lt_seqno(&q));
+        assert!(p.le_seqno(&q));
+        assert!(!p.ge_seqno(&q));
+        assert!(p != q);
+
+        let r = LogIndexSeqnoPair::new(0, 60);
+        assert!(p.eq_seqno(&r));
+        assert!(p == r);
+    }
+}
diff --git a/src/logindex/tests/logindex_integration.rs b/src/logindex/tests/logindex_integration.rs
new file mode 100644
index 0000000..2f5711a
--- /dev/null
+++ b/src/logindex/tests/logindex_integration.rs
@@ -0,0 +1,141 @@
+// Copyright (c) 2024-present, arana-db Community. All rights reserved.
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.
See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Copyright (c) 2024-present, arana-db Community. All rights reserved.
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+
+use logindex::{
+    CF_NAMES, LogIndexAndSequenceCollector, LogIndexOfColumnFamilies,
+    LogIndexTablePropertiesCollectorFactory, PROPERTY_KEY, get_largest_log_index_from_collection,
+    read_stats_from_table_props,
+};
+use rocksdb::{ColumnFamilyDescriptor, DB, Options};
+use tempfile::TempDir;
+
+#[test]
+fn test_full_flow_multi_cf_properties() {
+    let temp_dir = TempDir::new().expect("temp dir");
+    let path = temp_dir.path();
+
+    let collector = Arc::new(LogIndexAndSequenceCollector::new(0));
+    let factory = LogIndexTablePropertiesCollectorFactory::new(collector.clone());
+
+    let mut opts = Options::default();
+    opts.create_if_missing(true);
+    opts.create_missing_column_families(true);
+    opts.set_table_properties_collector_factory(factory);
+
+    let cfs: Vec<ColumnFamilyDescriptor> = CF_NAMES
+        .iter()
+        .map(|n| ColumnFamilyDescriptor::new(*n, opts.clone()))
+        .collect();
+
+    let db = DB::open_cf_descriptors(&opts, path, cfs).expect("open");
+
+    db.put(b"k", b"v").expect("put");
+    db.flush().expect("flush");
+
+    let seq_after_first = db.latest_sequence_number();
+    const TEST_LOG_INDEX: i64 = 233333;
+    collector.update(TEST_LOG_INDEX, seq_after_first);
+    db.put(b"k2", b"v2").expect("put");
+    db.flush().expect("flush");
+
+    let expected_seq = db.latest_sequence_number();
+
+    let cf_tracker = LogIndexOfColumnFamilies::new();
+    for i in 0..CF_NAMES.len() {
+        let cf = db.cf_handle(CF_NAMES[i]).expect("cf handle");
+        let collection = db
+            .get_properties_of_all_tables_cf(&cf)
+            .expect("get properties");
+        if let Some(pair) = get_largest_log_index_from_collection(&collection) {
+            assert_eq!(pair.applied_log_index(), TEST_LOG_INDEX);
+            assert_eq!(pair.seqno(), expected_seq);
+        }
+    }
+
+    let default_cf = db.cf_handle("default").expect("cf");
+    let collection = db
+        .get_properties_of_all_tables_cf(&default_cf)
+        .expect("get properties");
+    let pair = get_largest_log_index_from_collection(&collection)
+        .expect("at least one SST with properties");
+    assert_eq!(pair.applied_log_index(), TEST_LOG_INDEX);
+    assert_eq!(pair.seqno(), expected_seq);
+
+    let mut found_expected_format = false;
+    for (_table_name, props) in collection.iter() {
+        let user_props = props.user_collected_properties();
+        if let Some(value_bytes) = user_props.get(PROPERTY_KEY.as_bytes()) {
+            let value_str = String::from_utf8_lossy(value_bytes);
+            assert!(
+                value_str.contains('/'),
+                "value format must be <log_index>/<seqno> (C++ compatible)"
+            );
+            let pair = read_stats_from_table_props(&user_props).expect("parse");
+            if pair.applied_log_index() == TEST_LOG_INDEX && pair.seqno() == expected_seq {
+                found_expected_format = true;
+            }
+        }
+    }
+    assert!(
+        found_expected_format,
+        "must find SST with {}/{}",
+        TEST_LOG_INDEX, expected_seq
+    );
+
+    cf_tracker.init(&DbAccess { db: &db }).expect("init");
+    let (applied, _) = cf_tracker.get_cf_applied(0);
+    let (flushed, _) = cf_tracker.get_cf_flushed(0);
+    assert_eq!(applied, TEST_LOG_INDEX);
+    assert_eq!(flushed, TEST_LOG_INDEX);
+}
+
+struct DbAccess<'a> {
+    db: &'a DB,
+}
+
+impl logindex::db_access::DbCfAccess for DbAccess<'_> {
+    fn get_properties_of_all_tables_cf(
+        &self,
+        cf_id: usize,
+    ) -> std::result::Result<rocksdb::table_properties::TablePropertiesCollection, rocksdb::Error>
+    {
+        if cf_id < CF_NAMES.len() {
+            let cf = self.db.cf_handle(CF_NAMES[cf_id]).expect("cf handle");
+            self.db.get_properties_of_all_tables_cf(&cf)
+        } else {
+            self.db.get_properties_of_all_tables()
+        }
+    }
+}