feat(memory): durable conversation-mem enqueue wiring #1357
📝 Walkthrough

Adds Postgres and Oracle conversation-memory writers, a hooked writer, enqueue planning to generate durable memory rows, and threads `MemoryExecutionContext` and `ConversationMemoryWriter` through request handling and persistence so responses and conversation items can create durable memory entries.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client
    participant Router
    participant Handler
    participant Enqueue
    participant MemWriter
    participant Storage
    Client->>Router: request (+x-conversation-memory-config header)
    Router->>Router: derive MemoryExecutionContext
    Router->>Handler: call with MemoryExecutionContext & ConversationMemoryWriter
    Handler->>Handler: process response/items\nextract role text
    Handler->>Enqueue: build_enqueue_plan(inputs)
    Enqueue-->>Handler: EnqueuePlan{rows}
    loop for each row
        Handler->>MemWriter: create_memory(row)
        MemWriter->>Storage: INSERT conversation_memories
        Storage-->>MemWriter: ConversationMemoryId
        MemWriter-->>Handler: Memory ID
    end
    Handler-->>Client: final response
```
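The plan-then-execute split shown in the diagram can be sketched in Rust. Everything here is an illustrative stand-in: the PR's real `EnqueuePlan`, `MemoryRow`, and `ConversationMemoryWriter` definitions are not shown in this thread, so the field names and signatures below are assumptions.

```rust
// Illustrative stand-ins for the PR's types; fields and signatures are assumptions.
struct MemoryRow {
    conversation_id: String,
    content: String,
}

struct EnqueuePlan {
    rows: Vec<MemoryRow>,
}

trait ConversationMemoryWriter {
    /// Persists one row and returns its durable memory id.
    fn create_memory(&mut self, row: &MemoryRow) -> String;
}

/// Pure planning step: derive durable rows from extracted role text.
fn build_enqueue_plan(conversation_id: &str, texts: &[&str]) -> EnqueuePlan {
    EnqueuePlan {
        rows: texts
            .iter()
            .map(|t| MemoryRow {
                conversation_id: conversation_id.to_string(),
                content: t.to_string(),
            })
            .collect(),
    }
}

/// Execution step: exactly one create_memory call per planned row.
fn execute_plan(plan: &EnqueuePlan, writer: &mut dyn ConversationMemoryWriter) -> Vec<String> {
    plan.rows.iter().map(|row| writer.create_memory(row)).collect()
}

/// A recording writer, in the spirit of the PR's recording_memory_writer test helper.
struct RecordingWriter {
    created: Vec<String>,
}

impl ConversationMemoryWriter for RecordingWriter {
    fn create_memory(&mut self, row: &MemoryRow) -> String {
        let id = format!("mem_{}", self.created.len());
        self.created.push(row.content.clone());
        id
    }
}

fn main() {
    let plan = build_enqueue_plan("conv_1", &["user: hi", "assistant: hello"]);
    let mut writer = RecordingWriter { created: Vec::new() };
    let ids = execute_plan(&plan, &mut writer);
    assert_eq!(ids, vec!["mem_0".to_string(), "mem_1".to_string()]);
}
```

Keeping the planning step pure (no I/O) is what makes the planner unit-testable independently of the storage backends, which matches the test coverage noted in the reviews below.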
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ✅ 5 passed
Code Review
This pull request implements durable conversation memory persistence for Oracle and Postgres backends. It introduces ConversationMemoryWriter implementations for both databases, including schema initialization and row insertion logic. The model_gateway is updated to build and enqueue memory plans during response generation and item ingestion. Feedback highlights a potential SQL injection and schema visibility issue in the Oracle implementation, collision risks in Oracle index naming, and invalid identifier generation for Postgres indexes.
Clean, well-tested PR. The memory enqueue wiring is threaded correctly through all request paths with no double-persistence risks. Control flow in streaming tool loops is sound — each exit path persists exactly once. Good test coverage on the pure planner logic and API integration paths. Two minor nits posted (index name sanitization consistency and a redundant clone).
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 72fe328788
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
Signed-off-by: Daisy Zhou <zhoug9127@gmail.com>
Force-pushed 72fe328 to 0eb318b
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
model_gateway/src/routers/grpc/harmony/responses/streaming.rs (1)
298-316: ⚠️ Potential issue | 🟠 Major

Emit `incomplete_details` on `response`, not inside `usage`.

Here the persisted response is correct (`final_response.incomplete_details = ...`), but the streamed `response.completed` event puts the field inside `usage` because `emit_completed()` only accepts a usage object. That makes streaming clients lose the max-tool-calls reason in the canonical `response.incomplete_details` slot.

💡 Proposed fix

```diff
 let usage_json = json!({
     "input_tokens": usage.prompt_tokens,
     "output_tokens": usage.completion_tokens,
     "total_tokens": usage.total_tokens,
-    "incomplete_details": incomplete_details,
 });
 persist_streaming_response(
     ctx,
     memory_execution_context,
     emitter,
     Some(usage.clone()),
     original_request,
     Some(incomplete_details),
 )
 .await;
-let event = emitter.emit_completed(Some(&usage_json));
+let mut event = emitter.emit_completed(Some(&usage_json));
+event["response"]["incomplete_details"] =
+    json!({ "reason": "max_tool_calls" });
 emitter.send_event_best_effort(&event, tx);
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@model_gateway/src/routers/grpc/harmony/responses/streaming.rs` around lines 298 - 316, The streamed response.completed event currently places incomplete_details inside the usage object (see usage_json and emitter.emit_completed), causing clients to miss final_response.incomplete_details; change the emission to put incomplete_details at the top-level of the response.completed event instead of inside usage — either call or implement an overload/variant of emit_completed that accepts both a usage and an optional incomplete_details (referencing emit_completed, usage_json, incomplete_details, persist_streaming_response, and emitter.send_event_best_effort) or construct the event manually so the emitted JSON has "incomplete_details" alongside "usage" in the response.completed payload.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@crates/data_connector/src/postgres.rs`:
- Around line 995-1041: postgres_index_name can produce identifiers >
MAX_POSTGRES_IDENT_LEN when suffix is very long because the hashed branch
appends HASH_HEX_LEN + "_" + prefix; update postgres_index_name to guarantee the
final formatted "{table_part}_{suffix}" never exceeds MAX_POSTGRES_IDENT_LEN by
either (a) ensuring you only take the hashed branch when table_budget >=
HASH_HEX_LEN + 2 (accounting for "_" and at least one prefix char) and otherwise
truncate the suffix further, or (b) after constructing table_part and suffix,
clamp/truncate the suffix so format!("{table_part}_{suffix}") is at most
MAX_POSTGRES_IDENT_LEN; adjust logic around MAX_POSTGRES_IDENT_LEN, HASH_HEX_LEN
and use stable_hash_hex_lower as before to produce the hash when budget allows.
- Around line 870-918: The current constructor builds a single DDL string
(variable ddl) that includes CREATE TABLE and all CREATE INDEX statements and
calls client.batch_execute(&ddl), which causes startup to hard-fail if any index
creation errors (e.g., missing columns due to migration drift); change it to
execute the CREATE TABLE first (batch_execute on the table DDL only) then
iterate over each index (those created via postgres_index_name(&s.table, "...")
and s.col(...)) and call client.batch_execute(&index_ddl) separately for each
index, catching errors and logging them with tracing::debug! (mirroring
PostgresResponseStorage::new behavior) instead of returning an Err so writer
construction can continue on index failures.
In `@model_gateway/src/memory/enqueue.rs`:
- Around line 133-142: The code currently maps MemoryPolicyMode::Unspecified and
::Unrecognized in policy_mode_label which can leak into persisted memory_config
if store_ltm is active unexpectedly; update build_enqueue_plan to validate that
the chosen MemoryPolicyMode is one of the store-bearing variants
(MemoryPolicyMode::StoreOnly or ::StoreAndRecall) before proceeding (when
store_ltm.active() is true) and return an error or assert/panic if not, or
alternatively document the invariant with a clear comment in build_enqueue_plan;
reference the policy_mode_label helper and the MemoryPolicyMode enum when adding
the check so the validation is colocated with the store_ltm.active() branch.
In `@model_gateway/src/routers/grpc/regular/responses/streaming.rs`:
- Around line 639-661: The stop reason is being placed inside usage_json, but
streaming clients expect response.incomplete_details at the top-level of the
emitted completion event; after you call
emitter.emit_completed(Some(&usage_json)) adjust the returned event to add
incomplete_details at the response-level (e.g., set
event.payload["incomplete_details"] = incomplete_details) before calling
emitter.send_event so the emitted event's top-level response contains the
incomplete_details; keep persist_streaming_response as-is (it already stores
incomplete_details at response level).
---
Outside diff comments:
In `@model_gateway/src/routers/grpc/harmony/responses/streaming.rs`:
- Around line 298-316: The streamed response.completed event currently places
incomplete_details inside the usage object (see usage_json and
emitter.emit_completed), causing clients to miss
final_response.incomplete_details; change the emission to put incomplete_details
at the top-level of the response.completed event instead of inside usage —
either call or implement an overload/variant of emit_completed that accepts both
a usage and an optional incomplete_details (referencing emit_completed,
usage_json, incomplete_details, persist_streaming_response, and
emitter.send_event_best_effort) or construct the event manually so the emitted
JSON has "incomplete_details" alongside "usage" in the response.completed
payload.
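The `postgres.rs` comment above asks for a specific init ordering: the table DDL must succeed, while each index DDL is attempted individually and merely logged on failure. A minimal sketch of that shape, with a plain closure standing in for the real `client.batch_execute` (so `init_schema` and `exec` are hypothetical names, not the crate's API):

```rust
/// Sketch only: `exec` stands in for tokio-postgres's batch_execute.
fn init_schema<E: std::fmt::Display>(
    exec: &mut dyn FnMut(&str) -> Result<(), E>,
    table_ddl: &str,
    index_ddls: &[&str],
) -> Result<(), E> {
    // The table itself is required: propagate the error and fail startup.
    exec(table_ddl)?;
    // Indexes are best-effort: log and continue, so migration drift on one
    // index cannot block writer construction.
    for ddl in index_ddls {
        if let Err(e) = exec(ddl) {
            eprintln!("index creation failed, continuing: {e}"); // tracing::debug! in real code
        }
    }
    Ok(())
}

fn main() {
    let mut exec = |sql: &str| -> Result<(), String> {
        if sql.contains("BAD_IDX") {
            Err("column does not exist".into())
        } else {
            Ok(())
        }
    };
    // A failing index does not fail initialization...
    assert!(init_schema(&mut exec, "CREATE TABLE t (id TEXT)", &["CREATE INDEX BAD_IDX ON t (x)"]).is_ok());
    // ...but a failing table DDL still does.
    assert!(init_schema(&mut exec, "CREATE TABLE BAD_IDX_T (id TEXT)", &[]).is_err());
}
```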
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 5941ec0b-2a1d-4e87-8de0-079b246f7934
📒 Files selected for processing (24)

- crates/data_connector/src/core.rs
- crates/data_connector/src/factory.rs
- crates/data_connector/src/oracle.rs
- crates/data_connector/src/postgres.rs
- model_gateway/src/memory/enqueue.rs
- model_gateway/src/memory/mod.rs
- model_gateway/src/routers/common/persistence_utils.rs
- model_gateway/src/routers/conversations/handlers.rs
- model_gateway/src/routers/grpc/common/responses/context.rs
- model_gateway/src/routers/grpc/common/responses/utils.rs
- model_gateway/src/routers/grpc/harmony/responses/non_streaming.rs
- model_gateway/src/routers/grpc/harmony/responses/streaming.rs
- model_gateway/src/routers/grpc/regular/responses/common.rs
- model_gateway/src/routers/grpc/regular/responses/handlers.rs
- model_gateway/src/routers/grpc/regular/responses/non_streaming.rs
- model_gateway/src/routers/grpc/regular/responses/streaming.rs
- model_gateway/src/routers/grpc/router.rs
- model_gateway/src/routers/openai/responses/non_streaming.rs
- model_gateway/src/routers/openai/responses/streaming.rs
- model_gateway/src/server.rs
- model_gateway/tests/api/memory_enqueue_api_test.rs
- model_gateway/tests/api/mod.rs
- model_gateway/tests/common/mod.rs
- model_gateway/tests/common/recording_memory_writer.rs
Signed-off-by: Daisy Zhou <zhoug9127@gmail.com>
Force-pushed 3893a85 to e406f32
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
crates/data_connector/src/oracle.rs (1)
1374-1396: 🧹 Nitpick | 🔵 Trivial

Response-schema indexes still use raw `format!` naming.

`create_index_if_missing` now takes an owner, but `prev_idx`/`user_idx` here are still built as `format!("{}_PREV_IDX", s.table)` / `format!("{}_USER_IDX", s.table)` rather than going through the new `oracle_index_name` helper. For the default `RESPONSES` table name this is fine (≤30 chars), but with a customized long `schema.responses.table` (e.g. a tenant prefix) the raw concatenation can exceed Oracle's 30-char identifier limit and fail DDL at startup, while the new `conversation_memories` indexes would succeed.

Not a regression (pre-existing behavior), but since you've just introduced the canonical helper one call site over, routing these two names through `oracle_index_name` would make schema-init robust to custom table names uniformly.

♻️ Proposed change

```diff
-let prev_idx = format!("{}_PREV_IDX", s.table);
+let prev_idx = oracle_index_name(&s.table, "PREV_IDX");
@@
-let user_idx = format!("{}_USER_IDX", s.table);
+let user_idx = oracle_index_name(&s.table, "USER_IDX");
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/data_connector/src/oracle.rs` around lines 1374 - 1396, The index names for previous_response_id and safety_identifier are built with raw format! into prev_idx and user_idx which can exceed Oracle's 30-char identifier limit for long s.table values; update the two call sites that build prev_idx and user_idx to use the canonical oracle_index_name helper (e.g., oracle_index_name(&s.table, "PREV_IDX") and oracle_index_name(&s.table, "USER_IDX")) before passing them into create_index_if_missing so the names are properly truncated/sanitized; keep the rest of the create_index_if_missing calls (including schema.owner.as_deref(), &s.table and the CREATE INDEX SQL text) unchanged except to reference the new oracle_index_name-produced identifier.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@crates/data_connector/src/oracle.rs`:
- Around line 1742-1780: Add focused unit tests for oracle_index_name to
exercise boundary behaviors: verify passthrough for short table+suffix,
hashed/truncated prefix behavior for a very long table with a normal suffix
(assert length <=30 and prefix contains stable_hash_hex_upper-driven hash), the
fallback when a very long suffix forces the table_budget==0 branch (ensure
suffix truncated and name still <=30 and starts with letter), and the
leading-char replacement when table names start with digits or '_' (assert first
char is ASCII alphabetic). Use oracle_index_name directly and assert the
resulting string length is <=30 and matches the expected structural properties
(contains '_' separator, starts with a letter, and includes expected
truncated/hashed pieces) to prevent regressions in
build_index_table_part/stable_hash_hex_upper interactions.
- Around line 1187-1235: Add a composite index creation for the dequeue query:
when both "status" and "next_run_at" are not skipped, call
oracle_index_name(&s.table, "STATUS_NEXT_RUN_AT_IDX") and invoke
create_index_if_missing(...) with a CREATE INDEX ... ON {table} (status,
next_run_at); if your schema/workers shard by owner and "owner_id" is present
(s.is_skipped("owner_id") == false), create an index with owner_id as the
leading column (owner_id, status, next_run_at) instead; place this block near
the existing single-column index creations and reuse schema.owner.as_deref(),
&s.table and s.col("...") to build the SQL.
In `@model_gateway/src/routers/grpc/regular/responses/streaming.rs`:
- Around line 907-916: The branch that calls persist_streaming_response(...) and
then breaks needs to also emit the terminal "response.completed" event via the
same emitter used in the other branches so clients don't stay stuck in
"in_progress"; after the await persist_streaming_response(...) call, invoke the
emitter's response-completion/terminal event method (the same call used in the
no-tool and Harmony streaming paths) with the same parameters (e.g.,
context/original_request and any metadata/usage) and only then break/return from
the function so the lifecycle completes.
---
Outside diff comments:
In `@crates/data_connector/src/oracle.rs`:
- Around line 1374-1396: The index names for previous_response_id and
safety_identifier are built with raw format! into prev_idx and user_idx which
can exceed Oracle's 30-char identifier limit for long s.table values; update the
two call sites that build prev_idx and user_idx to use the canonical
oracle_index_name helper (e.g., oracle_index_name(&s.table, "PREV_IDX") and
oracle_index_name(&s.table, "USER_IDX")) before passing them into
create_index_if_missing so the names are properly truncated/sanitized; keep the
rest of the create_index_if_missing calls (including schema.owner.as_deref(),
&s.table and the CREATE INDEX SQL text) unchanged except to reference the new
oracle_index_name-produced identifier.
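To make the boundary behaviors in the test prompt above concrete, here is a minimal, self-contained sketch of an `oracle_index_name`-style helper with the three properties the review asks tests for: the result never exceeds 30 characters, it starts with an ASCII letter, and long table names are replaced by a readable prefix plus a stable hash. The PR's actual helper and its `stable_hash_hex_upper` almost certainly differ in detail; this only illustrates the invariants (and assumes ASCII identifiers).

```rust
const MAX_ORACLE_IDENT_LEN: usize = 30;
const HASH_HEX_LEN: usize = 8;

/// Deterministic FNV-1a hash, a stand-in for the PR's stable_hash_hex_upper.
fn stable_hash_hex_upper(s: &str) -> String {
    let mut h: u64 = 0xcbf2_9ce4_8422_2325;
    for b in s.bytes() {
        h ^= b as u64;
        h = h.wrapping_mul(0x0000_0100_0000_01b3);
    }
    format!("{h:016X}")[..HASH_HEX_LEN].to_string()
}

fn oracle_index_name(table: &str, suffix: &str) -> String {
    // Clamp an oversized suffix so some table budget always remains.
    let suffix = &suffix[..suffix.len().min(MAX_ORACLE_IDENT_LEN - 2)];
    let raw = format!("{table}_{suffix}");
    let mut name = if raw.len() <= MAX_ORACLE_IDENT_LEN {
        raw
    } else {
        let budget = MAX_ORACLE_IDENT_LEN - suffix.len() - 1;
        if budget >= HASH_HEX_LEN + 2 {
            // Readable prefix + stable hash of the full table name.
            let prefix = &table[..budget - HASH_HEX_LEN - 1];
            format!("{prefix}_{}_{suffix}", stable_hash_hex_upper(table))
        } else {
            format!("{}_{suffix}", &table[..budget.max(1)])
        }
    };
    // Oracle identifiers must start with a letter.
    if !name.chars().next().is_some_and(|c| c.is_ascii_alphabetic()) {
        name.replace_range(0..1, "I");
    }
    name.truncate(MAX_ORACLE_IDENT_LEN);
    name
}

fn main() {
    // Short names pass through unchanged.
    assert_eq!(oracle_index_name("RESPONSES", "PREV_IDX"), "RESPONSES_PREV_IDX");
    // Long table names are truncated + hashed, but stay <= 30 chars.
    let name = oracle_index_name("CONVERSATION_MEMORIES_TENANT_ACME_CORP", "CONV_IDX");
    assert!(name.len() <= MAX_ORACLE_IDENT_LEN && name.ends_with("_CONV_IDX"));
    // Leading non-letters are replaced so DDL stays valid.
    assert!(oracle_index_name("1NUMS", "IDX").starts_with('I'));
}
```

Hashing the *full* table name (not the truncated prefix) is what keeps two long tables that share a prefix from colliding on the same index name.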
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 7ec40250-984a-4864-900f-8d3b9917f63f
📒 Files selected for processing (6)

- crates/data_connector/src/oracle.rs
- crates/data_connector/src/postgres.rs
- model_gateway/src/memory/enqueue.rs
- model_gateway/src/routers/grpc/harmony/responses/streaming.rs
- model_gateway/src/routers/grpc/regular/responses/streaming.rs
- model_gateway/src/routers/grpc/router.rs
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@model_gateway/src/routers/grpc/regular/responses/streaming.rs`:
- Around line 639-660: persisted responses created by persist_streaming_response
end up with status=ResponseStatus::Completed even when incomplete_details is set
(e.g., max_tool_calls); update persist_streaming_response (the code path that
calls emitter.finalize and then calls persist_response_if_needed / builds
final_response) to detect if incomplete_details.is_some() and, before
persisting, override final_response.status = ResponseStatus::Incomplete so
stored responses with non-null incomplete_details are marked Incomplete per the
Responses API.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: a62c1b9d-f9c7-4e70-9609-98a2dc7ddbc4
📒 Files selected for processing (2)

- crates/data_connector/src/oracle.rs
- model_gateway/src/routers/grpc/regular/responses/streaming.rs
…tion Signed-off-by: Daisy Zhou <zhoug9127@gmail.com>
Force-pushed 26fe057 to c451c59
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: c451c59f83
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: dc509d690f
…status Signed-off-by: Daisy Zhou <zhoug9127@gmail.com>
Force-pushed dc509d6 to df9e777
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
model_gateway/src/routers/grpc/regular/responses/streaming.rs (1)
239-254: ⚠️ Potential issue | 🟠 Major

Persist before the terminal SSE send in the non-MCP path.
Line 240 can fail on a closed channel, and then Line 244 is never reached. That drops the stored response and the new durable memory enqueue for exactly the “client disconnected at the end” case. The MCP branches in this file were already hardened by persisting first; this path needs the same ordering.
Proposed fix
```diff
-let completed_event = event_emitter.emit_completed(usage_json.as_ref());
-event_emitter.send_event(&completed_event, &tx)?;
-
-// Finalize and persist accumulated response
 let final_response = accumulator.finalize();
 persist_response_if_needed(
     conversation_storage,
     conversation_item_storage,
     response_storage,
     conversation_memory_writer,
     memory_execution_context,
     &final_response,
     &original_request,
     request_context,
 )
 .await;
+
+let completed_event = event_emitter.emit_completed(usage_json.as_ref());
+event_emitter.send_event(&completed_event, &tx)?;
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@model_gateway/src/routers/grpc/regular/responses/streaming.rs` around lines 239 - 254, The terminal SSE send (event_emitter.send_event for completed_event) can fail on a closed channel and prevent persisting the accumulated response; move the persist_response_if_needed call to occur before creating/sending the completed_event in the non-MCP path so accumulator.finalize() and persist_response_if_needed(...) are executed and awaited first, then build/send the completed_event with event_emitter.send_event; update references: completed_event, event_emitter.send_event, accumulator.finalize, and persist_response_if_needed to reflect the new ordering.

crates/data_connector/src/oracle.rs (1)
620-650: ⚠️ Potential issue | 🟠 Major

Bound the `conversation_item_links` PK/index names too.

Line 627 and Line 628 still derive Oracle object names directly from `sl.table`. With long/customized table names, `PK_{table}` or `{table}_CONV_IDX` can still exceed Oracle's 30-character identifier limit and fail startup, even though this PR added `oracle_index_name(...)` specifically to avoid that class of breakage.

Proposed fix
```diff
-let pk_name = format!("PK_{}", sl.table);
-let idx_name = format!("{}_CONV_IDX", sl.table);
+let idx_name = oracle_index_name(&sl.table, "CONV_IDX");
 let mut col_defs = vec![
     format!("{col_cid} VARCHAR2(64) NOT NULL"),
     format!("{col_iid} VARCHAR2(64) NOT NULL"),
     format!("{col_added} TIMESTAMP WITH TIME ZONE"),
 ];
 col_defs.extend(extra_column_defs(sl));
-col_defs.push(format!(
-    "CONSTRAINT {pk_name} PRIMARY KEY ({col_cid}, {col_iid})"
-));
+col_defs.push(format!("PRIMARY KEY ({col_cid}, {col_iid})"));
 conn.execute(
     &format!("CREATE TABLE {sl_table} ({})", col_defs.join(", ")),
     &[],
 )
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/data_connector/src/oracle.rs` around lines 620 - 650, The PK and index names are still built directly from sl.table and can exceed Oracle's 30-char identifier limit; instead wrap those generated names with oracle_index_name(...) before using them. Replace the pk_name and idx_name assignments (currently format!("PK_{}", sl.table) and format!("{}_CONV_IDX", sl.table)) with bounded names produced by calling oracle_index_name(&format!(...)) and then use those bounded names in the CONSTRAINT and CREATE INDEX SQL strings passed to conn.execute (the surrounding code uses sl.col(...), extra_column_defs(sl) and conn.execute(...).map_err(map_oracle_error) so only the pk_name/idx_name construction needs changing).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@crates/wasm/src/storage_hook.rs`:
- Around line 128-137: The current mapping in StorageOperation::CreateMemory
that returns WitOperation::StoreResponse causes durable memory enqueues to be
treated as StoreResponse in WASM hooks; change this to avoid aliasing by either
(A) introducing and returning a proper WIT CreateMemory operation (a new
WitOperation variant) before dispatch so
HookedConversationMemoryWriter::create_memory is routed to a dedicated WIT op,
or (B) explicitly skip or fail the WASM hook for StorageOperation::CreateMemory
(use CREATE_MEMORY_WIT_FALLBACK_LOG_ONCE to log once and return a sentinel/no-op
path that bypasses WASM dispatch) until the ABI is updated; do not map
CreateMemory to WitOperation::StoreResponse.
---
Outside diff comments:
In `@crates/data_connector/src/oracle.rs`:
- Around line 620-650: The PK and index names are still built directly from
sl.table and can exceed Oracle's 30-char identifier limit; instead wrap those
generated names with oracle_index_name(...) before using them. Replace the
pk_name and idx_name assignments (currently format!("PK_{}", sl.table) and
format!("{}_CONV_IDX", sl.table)) with bounded names produced by calling
oracle_index_name(&format!(...)) and then use those bounded names in the
CONSTRAINT and CREATE INDEX SQL strings passed to conn.execute (the surrounding
code uses sl.col(...), extra_column_defs(sl) and
conn.execute(...).map_err(map_oracle_error) so only the pk_name/idx_name
construction needs changing).
In `@model_gateway/src/routers/grpc/regular/responses/streaming.rs`:
- Around line 239-254: The terminal SSE send (event_emitter.send_event for
completed_event) can fail on a closed channel and prevent persisting the
accumulated response; move the persist_response_if_needed call to occur before
creating/sending the completed_event in the non-MCP path so
accumulator.finalize() and persist_response_if_needed(...) are executed and
awaited first, then build/send the completed_event with
event_emitter.send_event; update references: completed_event,
event_emitter.send_event, accumulator.finalize, and persist_response_if_needed
to reflect the new ordering.
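Both streaming comments above converge on the same ordering rule, which can be stated abstractly: do the durable work first, then treat the terminal event send as best-effort. A tiny hedged sketch (closures stand in for `persist_response_if_needed` and `event_emitter.send_event`; `finish_stream` is a made-up name):

```rust
/// Sketch of the ordering fix: persist unconditionally, then attempt the
/// terminal send. A closed client channel can no longer drop the stored
/// response or the durable memory enqueue.
fn finish_stream<E>(
    persist: impl FnOnce(),
    send_completed: impl FnOnce() -> Result<(), E>,
) -> Result<(), E> {
    persist(); // durable work happens on every exit path
    send_completed() // best-effort: a send failure is now harmless
}

fn main() {
    let mut persisted = false;
    // Simulate a client that disconnected right before the terminal event:
    // the send fails, but persistence has already run.
    let result = finish_stream(|| persisted = true, || Err::<(), &str>("channel closed"));
    assert!(persisted);
    assert!(result.is_err());
}
```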
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: f3ff5255-7119-436d-a82e-06e18a6a0464
📒 Files selected for processing (11)

- crates/data_connector/src/factory.rs
- crates/data_connector/src/hooked.rs
- crates/data_connector/src/hooks.rs
- crates/data_connector/src/oracle.rs
- crates/data_connector/src/postgres.rs
- crates/wasm/src/storage_hook.rs
- model_gateway/src/routers/grpc/harmony/responses/non_streaming.rs
- model_gateway/src/routers/grpc/harmony/responses/streaming.rs
- model_gateway/src/routers/grpc/regular/responses/non_streaming.rs
- model_gateway/src/routers/grpc/regular/responses/streaming.rs
- model_gateway/src/routers/openai/mcp/tool_loop.rs
```rust
// Backward-compatible mapping: existing WIT surface has no dedicated
// CreateMemory operation yet.
StorageOperation::CreateMemory => {
    CREATE_MEMORY_WIT_FALLBACK_LOG_ONCE.call_once(|| {
        tracing::debug!(
            "mapping CreateMemory to WIT StoreResponse (no dedicated WIT variant yet)"
        );
    });
    WitOperation::StoreResponse
}
```
Don't alias CreateMemory to StoreResponse.
This makes WASM hooks see durable memory enqueues as StoreResponse, so any response-specific reject/extra-column logic now also runs on HookedConversationMemoryWriter::create_memory. That is a silent behavior change and can break memory inserts on deployments that already use WASM hooks. Please either add a real WIT CreateMemory operation before dispatching this path, or fail/skip the WASM hook explicitly for CreateMemory until the ABI is updated.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@crates/wasm/src/storage_hook.rs` around lines 128 - 137, The current mapping
in StorageOperation::CreateMemory that returns WitOperation::StoreResponse
causes durable memory enqueues to be treated as StoreResponse in WASM hooks;
change this to avoid aliasing by either (A) introducing and returning a proper
WIT CreateMemory operation (a new WitOperation variant) before dispatch so
HookedConversationMemoryWriter::create_memory is routed to a dedicated WIT op,
or (B) explicitly skip or fail the WASM hook for StorageOperation::CreateMemory
(use CREATE_MEMORY_WIT_FALLBACK_LOG_ONCE to log once and return a sentinel/no-op
path that bypasses WASM dispatch) until the ABI is updated; do not map
CreateMemory to WitOperation::StoreResponse.
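Option (B) from the comment — skipping the WASM hook for `CreateMemory` until the ABI gains a dedicated variant — could look roughly like this. Enum and variant names mirror the review text but are illustrative, not the crate's real definitions:

```rust
// Illustrative stand-ins for the crate's enums; not the real definitions.
#[derive(Debug, PartialEq)]
enum StorageOperation {
    StoreResponse,
    CreateMemory,
}

#[derive(Debug, PartialEq)]
enum WitOperation {
    StoreResponse,
}

#[derive(Debug, PartialEq)]
enum HookDispatch {
    Wasm(WitOperation),
    // Bypass the hook until the WIT ABI gains a dedicated CreateMemory op,
    // so response-specific reject/extra-column logic never sees memory inserts.
    SkipHook,
}

fn plan_dispatch(op: &StorageOperation) -> HookDispatch {
    match op {
        StorageOperation::StoreResponse => HookDispatch::Wasm(WitOperation::StoreResponse),
        StorageOperation::CreateMemory => HookDispatch::SkipHook,
    }
}

fn main() {
    assert_eq!(plan_dispatch(&StorageOperation::CreateMemory), HookDispatch::SkipHook);
    assert_eq!(
        plan_dispatch(&StorageOperation::StoreResponse),
        HookDispatch::Wasm(WitOperation::StoreResponse)
    );
}
```

Returning an explicit `SkipHook` makes the fallback visible at the dispatch site instead of silently reusing response semantics for memory inserts.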
Description
Problem
Durable conversation-memory enqueue was not consistently wired across ingestion paths, and request-scoped memory context could be dropped in gRPC responses flows. In addition, storage-specific schema/index handling risked drift and naming conflicts under customized schemas.
Solution
Introduce a shared enqueue planning/execution flow for conversation memories, propagate request-scoped memory execution context through gRPC responses paths, and align storage behavior for conversation memory persistence across supported SQL backends.
Changes
- `memory/enqueue.rs` plan builder for durable enqueue row generation.
- `/v1/responses` persistence and `/v1/conversations/{id}/items` ingestion.
- `ConversationMemoryType` and `ConversationMemoryStatus` on shared enums.
- `ResponsesContext` request-context replacement via `with_request_context(...)` to remove duplicated constructor wiring.

Test Plan
- `pre-commit run --all-files`
- `cargo check -p smg -p data-connector`

Checklist
- `cargo +nightly fmt` passes
- `cargo clippy --all-targets --all-features -- -D warnings` passes

Summary by CodeRabbit
New Features
Behavior Changes
Tests