Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions src/db_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use crate::{
},
db::DBAccess,
env::Env,
event_listener::EventListenerHandle,
ffi,
ffi_util::{from_cstr, to_cpath, CStrLike},
merge_operator::{
Expand Down Expand Up @@ -1850,6 +1851,47 @@ impl Options {
}
}

/// Add an event listener to the options.
///
/// Event listeners can be used to monitor RocksDB events such as flush,
/// compaction, etc.
///
/// The listener will be owned by RocksDB and destroyed when the Options are destroyed.
///
/// # Examples
///
/// ```
/// use rocksdb::{new_event_listener, Options, EventListener, FlushJobInfo};
/// use std::sync::atomic::{AtomicI32, Ordering};
/// use std::sync::Arc;
///
/// struct MyEventListener {
/// flush_count: Arc<AtomicI32>,
/// }
///
/// impl EventListener for MyEventListener {
/// fn on_flush_begin(&self, _info: &FlushJobInfo) {
/// self.flush_count.fetch_add(1, Ordering::Relaxed);
/// }
/// }
///
/// let mut opts = Options::default();
/// let flush_count = Arc::new(AtomicI32::new(0));
///
/// let listener = MyEventListener {
/// flush_count: flush_count.clone(),
/// };
/// let listener_handle = new_event_listener(listener);
/// opts.add_event_listener(listener_handle);
/// ```
pub fn add_event_listener(&mut self, listener: EventListenerHandle) {
let ptr: *mut librocksdb_sys::rocksdb_eventlistener_t = listener.into_raw();
unsafe {
ffi::rocksdb_options_add_eventlistener(self.inner, ptr);
}
// listener is dropped here, but owned is false so Drop won't destroy it
}

/// Prepare the DB for bulk loading.
///
/// All data will be in level 0 without any automatic compaction.
Expand Down
218 changes: 218 additions & 0 deletions src/event_listener.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
// Copyright 2020 Tyler Neely, Alex Regueiro
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// /* Event listener */
// typedef void (*on_flush_begin_cb)(void*, rocksdb_t*,
// const rocksdb_flushjobinfo_t*);
// typedef void (*on_flush_completed_cb)(void*, rocksdb_t*,
// const rocksdb_flushjobinfo_t*);

use crate::ffi;

use libc::{c_void, size_t};

pub struct FlushJobInfo {
pub(crate) inner: *const ffi::rocksdb_flushjobinfo_t,
}

impl FlushJobInfo {
pub fn cf_name(&self) -> String {
unsafe {
let mut size: size_t = 0;
let ptr = ffi::rocksdb_flushjobinfo_cf_name(self.inner, &mut size);
let slice = std::slice::from_raw_parts(ptr as *const u8, size);
String::from_utf8_lossy(slice).into_owned()
}
}

pub fn file_path(&self) -> String {
unsafe {
let mut size: size_t = 0;
let ptr = ffi::rocksdb_flushjobinfo_file_path(self.inner, &mut size);
let slice = std::slice::from_raw_parts(ptr as *const u8, size);
String::from_utf8_lossy(slice).into_owned()
}
}

pub fn triggered_writes_slowdown(&self) -> bool {
unsafe { ffi::rocksdb_flushjobinfo_triggered_writes_slowdown(self.inner) != 0 }
}

pub fn triggered_writes_stop(&self) -> bool {
unsafe { ffi::rocksdb_flushjobinfo_triggered_writes_stop(self.inner) != 0 }
}

pub fn largest_seqno(&self) -> u64 {
unsafe { ffi::rocksdb_flushjobinfo_largest_seqno(self.inner) }
}

pub fn smallest_seqno(&self) -> u64 {
unsafe { ffi::rocksdb_flushjobinfo_smallest_seqno(self.inner) }
}
}

/// A handle to an EventListener that manages its lifetime.
pub struct EventListenerHandle {
inner: *mut ffi::rocksdb_eventlistener_t,
owned: bool,
}

unsafe impl Send for EventListenerHandle {}
unsafe impl Sync for EventListenerHandle {}

impl EventListenerHandle {
pub(crate) unsafe fn from_raw(ptr: *mut ffi::rocksdb_eventlistener_t) -> Self {
Self {
inner: ptr,
owned: true,
}
}
pub(crate) fn into_raw(mut self) -> *mut ffi::rocksdb_eventlistener_t {
self.owned = false;
self.inner
}
}
impl Drop for EventListenerHandle {
fn drop(&mut self) {
if self.owned && !self.inner.is_null() {
unsafe {
ffi::rocksdb_eventlistener_destroy(self.inner);
}
}
}
}
// WARNING: If an EventListener implementation panics, the panic will unwind across the C/FFI boundary,
// which is undefined behavior in Rust. Consider using std::panic::catch_unwind to wrap the callback body
// to prevent panic propagation to C code. Not fixing this issue for now.
pub trait EventListener: Send + Sync {
fn on_flush_begin(&self, _: &FlushJobInfo) {}
fn on_flush_completed(&self, _: &FlushJobInfo) {}
// fn on_compaction_begin(&self, _: &CompactionJobInfo) {}
// fn on_compaction_completed(&self, _: &CompactionJobInfo) {}
// fn on_subcompaction_begin(&self, _: &SubcompactionJobInfo) {}
// fn on_subcompaction_completed(&self, _: &SubcompactionJobInfo) {}
// fn on_external_file_ingested(&self, _: &IngestionInfo) {}
// fn on_background_error(&self, _: DBBackgroundErrorReason, _: MutableStatus) {}
// fn on_stall_conditions_changed(&self, _: &WriteStallInfo) {}
// fn on_memtable_sealed(&self, _: &MemTableInfo) {}
}

unsafe extern "C" fn destructor<E: EventListener>(ctx: *mut c_void) {
let _ = Box::from_raw(ctx as *mut E);
}

unsafe extern "C" fn on_flush_begin<E: EventListener>(
ctx: *mut c_void,
_db: *mut ffi::rocksdb_t,
flush_job_info: *const ffi::rocksdb_flushjobinfo_t,
) {
let listener = &*(ctx as *const E);
let info = FlushJobInfo {
inner: flush_job_info,
};
listener.on_flush_begin(&info);
}

unsafe extern "C" fn on_flush_completed<E: EventListener>(
ctx: *mut c_void,
_db: *mut ffi::rocksdb_t,
flush_job_info: *const ffi::rocksdb_flushjobinfo_t,
) {
let listener = &*(ctx as *const E);
let info = FlushJobInfo {
inner: flush_job_info,
};
listener.on_flush_completed(&info);
}

unsafe extern "C" fn on_compaction_begin<E: EventListener>(
_ctx: *mut c_void,
_db: *mut ffi::rocksdb_t,
_compaction_job_info: *const ffi::rocksdb_compactionjobinfo_t,
) {
// TODO
}

unsafe extern "C" fn on_compaction_completed<E: EventListener>(
_ctx: *mut c_void,
_db: *mut ffi::rocksdb_t,
_compaction_job_info: *const ffi::rocksdb_compactionjobinfo_t,
) {
// TODO
}

unsafe extern "C" fn on_subcompaction_begin<E: EventListener>(
_ctx: *mut c_void,
_sub_compaction_job_info: *const ffi::rocksdb_subcompactionjobinfo_t,
) {
// TODO
}

unsafe extern "C" fn on_subcompaction_completed<E: EventListener>(
_ctx: *mut c_void,
_sub_compaction_job_info: *const ffi::rocksdb_subcompactionjobinfo_t,
) {
// TODO
}

unsafe extern "C" fn on_external_file_ingested<E: EventListener>(
_ctx: *mut c_void,
_db: *mut ffi::rocksdb_t,
_external_file_ingestion_info: *const ffi::rocksdb_externalfileingestioninfo_t,
) {
// TODO
}

unsafe extern "C" fn on_background_error<E: EventListener>(
_ctx: *mut c_void,
_reason: u32,
_status_ptr: *mut ffi::rocksdb_status_ptr_t,
) {
// TODO
}

unsafe extern "C" fn on_stall_conditions_changed<E: EventListener>(
_ctx: *mut c_void,
_writestall_info: *const ffi::rocksdb_writestallinfo_t,
) {
// TODO
}

unsafe extern "C" fn on_memtable_sealed<E: EventListener>(
_ctx: *mut c_void,
_info: *const ffi::rocksdb_memtableinfo_t,
) {
// TODO
}

pub fn new_event_listener<E: EventListener>(e: E) -> EventListenerHandle {
let p = Box::new(e);
let ptr = unsafe {
ffi::rocksdb_eventlistener_create(
Box::into_raw(p) as *mut c_void,
Some(destructor::<E>),
Some(on_flush_begin::<E>),
Some(on_flush_completed::<E>),
Some(on_compaction_begin::<E>),
Some(on_compaction_completed::<E>),
Some(on_subcompaction_begin::<E>),
Some(on_subcompaction_completed::<E>),
Some(on_external_file_ingested::<E>),
Some(on_background_error::<E>),
Some(on_stall_conditions_changed::<E>),
Some(on_memtable_sealed::<E>),
)
};
unsafe { EventListenerHandle::from_raw(ptr) }
}
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ mod db_iterator;
mod db_options;
mod db_pinnable_slice;
mod env;
mod event_listener;
mod iter_range;
pub mod merge_operator;
pub mod perf;
Expand Down Expand Up @@ -131,6 +132,7 @@ pub use crate::{
},
db_pinnable_slice::DBPinnableSlice,
env::Env,
event_listener::{new_event_listener, EventListener, EventListenerHandle, FlushJobInfo},
ffi_util::CStrLike,
iter_range::{IterateBounds, PrefixRange},
merge_operator::MergeOperands,
Expand Down
64 changes: 64 additions & 0 deletions tests/test_event_listener.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright 2020 Tyler Neely
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod util;

use rocksdb::{new_event_listener, EventListener, FlushJobInfo, FlushOptions, Options, DB};
use std::sync::atomic::{AtomicI32, Ordering};
use std::sync::Arc;
use util::DBPath;

struct TestEventListener {
flush_begin_count: Arc<AtomicI32>,
flush_completed_count: Arc<AtomicI32>,
}

impl EventListener for TestEventListener {
fn on_flush_begin(&self, info: &FlushJobInfo) {
println!("Flush file: {}", info.file_path());
self.flush_begin_count.fetch_add(1, Ordering::Relaxed);
}

fn on_flush_completed(&self, _: &FlushJobInfo) {
self.flush_completed_count.fetch_add(1, Ordering::Relaxed);
}
}

#[test]
fn test_flush_callback() {
let path = DBPath::new("_test_flush_callback");
let flush_begin_count = Arc::new(AtomicI32::new(0));
let flush_completed_count = Arc::new(AtomicI32::new(0));
{
let mut opts = Options::default();
opts.create_if_missing(true);

let listener = TestEventListener {
flush_begin_count: flush_begin_count.clone(),
flush_completed_count: flush_completed_count.clone(),
};
let listener_ptr = new_event_listener(listener);
opts.add_event_listener(listener_ptr);
let db = DB::open(&opts, &path).unwrap();
db.put(b"k1", b"v1").unwrap();

let mut flush_opts = FlushOptions::default();
flush_opts.set_wait(true);
db.flush_opt(&flush_opts).unwrap();

// Verify callback was called
assert_eq!(flush_begin_count.load(Ordering::Relaxed), 1);
assert_eq!(flush_completed_count.load(Ordering::Relaxed), 1);
}
}