diff --git a/.agent/notes/db-driver-test-rerun-2026-05-03.md b/.agent/notes/db-driver-test-rerun-2026-05-03.md new file mode 100644 index 0000000000..c273ad63c7 --- /dev/null +++ b/.agent/notes/db-driver-test-rerun-2026-05-03.md @@ -0,0 +1,18 @@ +# DB Driver Test Rerun 2026-05-03 + +Config: static registry, bare encoding, native/local plus wasm/remote. + +## Results + +- [x] actor-db: native/local passed, wasm/remote passed +- [x] actor-db-raw: native/local passed, wasm/remote passed +- [x] actor-db-init-order: native/local passed, wasm/remote passed +- [x] actor-db-pragma-migration: native/local passed, wasm/remote passed +- [x] actor-sleep-db: native/local passed, wasm/remote passed +- [x] actor-db-stress: native/local passed, wasm/remote passed on rerun + +## Notes + +- Fixed a grace-deadline shutdown bug where SQLite cleanup happened after final state serialization, allowing delayed callbacks from the old generation to issue late DB work. +- Added a shared closed flag to `SqliteDb` so both local and remote SQLite handles fail closed after cleanup. +- The first full wasm/remote stress run hit `ltx trailer checksums must be zeroed` in the kitchen-sink case. The isolated kitchen-sink rerun and the full wasm/remote stress rerun both passed. diff --git a/rivetkit-rust/packages/rivetkit-core/src/actor/sqlite.rs b/rivetkit-rust/packages/rivetkit-core/src/actor/sqlite.rs index 40fff2898b..42bd773ce0 100644 --- a/rivetkit-rust/packages/rivetkit-core/src/actor/sqlite.rs +++ b/rivetkit-rust/packages/rivetkit-core/src/actor/sqlite.rs @@ -1,6 +1,5 @@ use std::collections::HashSet; use std::io::Cursor; -#[cfg(feature = "sqlite-local")] use std::sync::{ Arc, atomic::{AtomicBool, Ordering}, @@ -76,6 +75,11 @@ pub struct SqliteDb { /// always sets up sqlite storage under the hood, so handle/actor_id are /// not a reliable signal for whether the user opted in; this flag is. enabled: bool, + /// Core owns the logical actor-generation DB lifetime. 
Depot-client can + /// mark one native worker closed, but stale cloned `c.db` handles could + /// otherwise reopen a new local worker or keep sending remote SQL after + /// actor cleanup. + closed: Arc<AtomicBool>, #[cfg(feature = "sqlite-local")] // Forced-sync: native SQLite handles are used inside spawn_blocking and // synchronous diagnostic accessors. @@ -108,6 +112,7 @@ impl SqliteDb { generation, backend: select_sqlite_backend(enabled, remote_sqlite), enabled, + closed: Default::default(), #[cfg(feature = "sqlite-local")] db: Default::default(), #[cfg(feature = "sqlite-local")] @@ -149,11 +154,13 @@ impl SqliteDb { } pub async fn open(&self) -> Result<()> { + self.ensure_open()?; match self.backend { SqliteBackend::LocalNative => { #[cfg(feature = "sqlite-local")] { let _open_guard = self.open_lock.lock().await; + self.ensure_open()?; if self.db.lock().is_some() { return Ok(()); } @@ -305,6 +312,9 @@ impl SqliteDb { } pub async fn close(&self) -> Result<()> { + if self.closed.swap(true, Ordering::AcqRel) { + return Ok(()); + } match self.backend { SqliteBackend::LocalNative => { #[cfg(feature = "sqlite-local")] @@ -346,6 +356,7 @@ impl SqliteDb { #[cfg(feature = "sqlite-local")] fn native_db_handle(&self) -> Result { + self.ensure_open()?; self.db .lock() .as_ref() @@ -432,6 +443,7 @@ impl SqliteDb { } fn remote_config(&self) -> Result { + self.ensure_open()?; let config = self.runtime_config()?; let generation = config .generation @@ -444,6 +456,13 @@ impl SqliteDb { }) } + fn ensure_open(&self) -> Result<()> { + if self.closed.load(Ordering::Acquire) { + return Err(SqliteRuntimeError::Closed.build()); + } + Ok(()) + } + async fn remote_exec(&self, sql: String) -> Result { let config = self.remote_config()?; let response = config diff --git a/rivetkit-rust/packages/rivetkit-core/src/actor/task.rs b/rivetkit-rust/packages/rivetkit-core/src/actor/task.rs index 0c4cb1b03a..87c3bc6814 100644 --- a/rivetkit-rust/packages/rivetkit-core/src/actor/task.rs +++ 
b/rivetkit-rust/packages/rivetkit-core/src/actor/task.rs @@ -1704,10 +1704,38 @@ impl ActorTask { ShutdownKind::Sleep => LifecycleState::SleepFinalize, ShutdownKind::Destroy => LifecycleState::Destroying, }); + let reason_label = shutdown_reason_label(reason); + let actor_id = self.ctx.actor_id().to_owned(); + // Grace is over once final shutdown starts. Close SQLite before final + // serialization so any late callback from this generation fails closed. + self.ctx.sql().cleanup().await.with_context(|| { + format!("cleanup sqlite before {reason_label} finalization") + })?; + trim_native_allocator_after_shutdown(&actor_id, reason_label); + tracing::debug!( + actor_id = %actor_id, + reason = reason_label, + step = "cleanup_sqlite", + "actor shutdown cleanup step completed" + ); self.save_final_state().await?; self.close_actor_event_channel(); self.join_aborted_run_handle().await; - Self::finish_shutdown_cleanup_with_ctx(self.ctx.clone(), reason).await?; + let ctx = self.ctx.clone(); + tokio::join!( + Self::run_shutdown_task_teardown_sleep_state( + ctx.clone(), + reason_label, + actor_id.clone() + ), + Self::run_shutdown_task_wait_for_state_writes( + ctx.clone(), + reason_label, + actor_id.clone() + ), + Self::run_shutdown_task_cleanup_alarm(ctx, reason, reason_label, actor_id), + ); + if matches!(reason, ShutdownKind::Destroy) { self.ctx.mark_destroy_completed(); } @@ -1715,6 +1743,82 @@ impl ActorTask { Ok(()) } + async fn run_shutdown_task_teardown_sleep_state( + ctx: ActorContext, + reason_label: &'static str, + actor_id: String, + ) { + ctx.teardown_sleep_state().await; + tracing::debug!( + actor_id = %actor_id, + reason = reason_label, + step = "teardown_sleep_state", + "actor shutdown cleanup step completed" + ); + + #[cfg(test)] + run_shutdown_cleanup_hook(&ctx, reason_label); + } + + async fn run_shutdown_task_wait_for_state_writes( + ctx: ActorContext, + reason_label: &'static str, + actor_id: String, + ) { + ctx.wait_for_pending_state_writes().await; + 
tracing::debug!( + actor_id = %actor_id, + reason = reason_label, + step = "wait_for_pending_state_writes", + "actor shutdown cleanup step completed" + ); + } + + async fn run_shutdown_task_cleanup_alarm( + ctx: ActorContext, + reason: ShutdownKind, + reason_label: &'static str, + actor_id: String, + ) { + match reason { + ShutdownKind::Sleep => { + ctx.sync_alarm_logged(); + tracing::debug!( + actor_id = %actor_id, + reason = reason_label, + step = "sync_alarm", + "actor shutdown cleanup step completed" + ); + // Keep the persisted engine alarm armed across sleep, but abort the + // local Tokio timer owned by this actor generation. + ctx.cancel_local_alarm_timeouts(); + tracing::debug!( + actor_id = %actor_id, + reason = reason_label, + step = "cancel_local_alarm_timeouts", + "actor shutdown cleanup step completed" + ); + } + ShutdownKind::Destroy => { + ctx.cancel_driver_alarm_logged(); + tracing::debug!( + actor_id = %actor_id, + reason = reason_label, + step = "cancel_driver_alarm", + "actor shutdown cleanup step completed" + ); + } + } + + ctx.wait_for_pending_alarm_writes().await; + tracing::debug!( + actor_id = %actor_id, + reason = reason_label, + step = "wait_for_pending_alarm_writes", + "actor shutdown cleanup step completed" + ); + } + async fn save_final_state(&mut self) -> Result<()> { let (reply_tx, reply_rx) = oneshot::channel(); if let Err(error) = self.send_actor_event( @@ -1757,79 +1861,6 @@ impl ActorTask { self.ctx.save_state(deltas).await } - async fn finish_shutdown_cleanup_with_ctx( - ctx: ActorContext, - reason: ShutdownKind, - ) -> Result<()> { - let reason_label = shutdown_reason_label(reason); - let actor_id = ctx.actor_id().to_owned(); - ctx.teardown_sleep_state().await; - tracing::debug!( - actor_id = %actor_id, - reason = reason_label, - step = "teardown_sleep_state", - "actor shutdown cleanup step completed" - ); - #[cfg(test)] - run_shutdown_cleanup_hook(&ctx, reason_label); - ctx.wait_for_pending_state_writes().await; - 
tracing::debug!( - actor_id = %actor_id, - reason = reason_label, - step = "wait_for_pending_state_writes", - "actor shutdown cleanup step completed" - ); - ctx.sync_alarm_logged(); - tracing::debug!( - actor_id = %actor_id, - reason = reason_label, - step = "sync_alarm", - "actor shutdown cleanup step completed" - ); - ctx.wait_for_pending_alarm_writes().await; - tracing::debug!( - actor_id = %actor_id, - reason = reason_label, - step = "wait_for_pending_alarm_writes", - "actor shutdown cleanup step completed" - ); - ctx.sql() - .cleanup() - .await - .with_context(|| format!("cleanup sqlite during {reason_label} shutdown"))?; - trim_native_allocator_after_shutdown(&actor_id, reason_label); - tracing::debug!( - actor_id = %actor_id, - reason = reason_label, - step = "cleanup_sqlite", - "actor shutdown cleanup step completed" - ); - match reason { - // Match the reference TS runtime: keep the persisted engine alarm armed - // across sleep so the next instance still has a wake trigger, but abort - // the local Tokio timer owned by the shutting-down instance. - ShutdownKind::Sleep => { - ctx.cancel_local_alarm_timeouts(); - tracing::debug!( - actor_id = %actor_id, - reason = reason_label, - step = "cancel_local_alarm_timeouts", - "actor shutdown cleanup step completed" - ); - } - ShutdownKind::Destroy => { - ctx.cancel_driver_alarm_logged(); - tracing::debug!( - actor_id = %actor_id, - reason = reason_label, - step = "cancel_driver_alarm", - "actor shutdown cleanup step completed" - ); - } - } - Ok(()) - } - fn record_inbox_depths(&self) { self.ctx .metrics()