
feat(memory): durable conversation-mem enqueue wiring#1357

Open
zhoug9127 wants to merge 7 commits into main from feat/ltm-conversation-mem-enqueue

Conversation

Collaborator

@zhoug9127 zhoug9127 commented Apr 23, 2026

Description

Problem

Durable conversation-memory enqueue was not consistently wired across ingestion paths, and request-scoped memory context could be dropped in gRPC responses flows. In addition, storage-specific schema/index handling risked drift and naming conflicts under customized schemas.

Solution

Introduce a shared enqueue planning/execution flow for conversation memories, propagate request-scoped memory execution context through gRPC responses paths, and align storage behavior for conversation memory persistence across supported SQL backends.

Changes

  • Add memory/enqueue.rs plan builder for durable enqueue row generation.
  • Wire enqueue from both /v1/responses persistence and /v1/conversations/{id}/items ingestion.
  • Unify enqueue execution/logging through shared router persistence helpers.
  • Propagate parsed memory execution context through gRPC regular/harmony + streaming/non-streaming response paths.
  • Canonicalize storage labels for ConversationMemoryType and ConversationMemoryStatus on shared enums.
  • Add durable conversation-memory writer support in Postgres and Oracle paths, including extra-column handling and table-scoped index naming.
  • Refactor gRPC ResponsesContext request-context replacement via with_request_context(...) to remove duplicated constructor wiring.
  • Add targeted enqueue tests for API ingestion flows and enqueue planner validation paths.
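The enqueue planner described in the changes above can be sketched roughly as follows. This is a hedged illustration, not the real `memory/enqueue.rs` API: the type names match the walkthrough, but the fields, the validation rule, and the storage labels are assumptions; only the OnDemand-now / LTM-plus-30-seconds timing comes from this PR.

```rust
use std::time::{Duration, SystemTime};

// Hypothetical stand-ins for the types in model_gateway/src/memory/enqueue.rs;
// field names here are assumptions for illustration only.
pub struct EnqueueInputs {
    pub conversation_id: String,
    pub text: String,
    pub store_ltm: bool,
}

#[derive(Debug)]
pub struct NewConversationMemory {
    pub conversation_id: String,
    pub memory_type: &'static str,
    pub next_run_at: SystemTime,
    pub content: String,
}

pub struct EnqueuePlan {
    pub rows: Vec<NewConversationMemory>,
}

#[derive(Debug, PartialEq)]
pub enum EnqueueValidationError {
    EmptyText,
}

pub fn build_enqueue_plan(
    inputs: &EnqueueInputs,
    now: SystemTime,
) -> Result<EnqueuePlan, EnqueueValidationError> {
    // Validate durable metadata before emitting any rows.
    if inputs.text.trim().is_empty() {
        return Err(EnqueueValidationError::EmptyText);
    }
    // OnDemand rows are runnable immediately.
    let mut rows = vec![NewConversationMemory {
        conversation_id: inputs.conversation_id.clone(),
        memory_type: "ON_DEMAND",
        next_run_at: now,
        content: inputs.text.clone(),
    }];
    // LTM rows are deferred by 30 seconds, matching the PR description.
    if inputs.store_ltm {
        rows.push(NewConversationMemory {
            conversation_id: inputs.conversation_id.clone(),
            memory_type: "LTM",
            next_run_at: now + Duration::from_secs(30),
            content: inputs.text.clone(),
        });
    }
    Ok(EnqueuePlan { rows })
}
```

Keeping the planner pure (inputs in, rows out, no I/O) is what makes the "enqueue planner validation paths" testable without a database, as the test bullet above implies.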

Test Plan

  • pre-commit run --all-files
  • cargo check -p smg -p data-connector

Checklist
  • cargo +nightly fmt passes
  • cargo clippy --all-targets --all-features -- -D warnings passes
  • (Optional) Documentation updated
  • (Optional) Please join us on Slack #sig-smg to discuss, review, and merge PRs

Summary by CodeRabbit

  • New Features

    • Conversation memory persistence enabled for Oracle and Postgres backends.
    • Automatic enqueueing of conversation memory rows when storing items or responses.
    • Per-request memory execution context propagated through response handling.
  • Behavior Changes

    • Responses now mark status as "incomplete" when tool-call limits are hit.
  • Tests

    • New API integration tests and utilities for recording and validating enqueued memory rows.


coderabbitai Bot commented Apr 23, 2026

📝 Walkthrough


Adds Postgres and Oracle conversation-memory writers, a hooked writer, and enqueue planning that generates durable memory rows, and threads MemoryExecutionContext and ConversationMemoryWriter through request handling and persistence so that responses and conversation items can create durable memory entries.

Changes

Cohort / File(s) Summary
Core Data Structures
crates/data_connector/src/core.rs
Added ConversationMemoryType::storage_label() and ConversationMemoryStatus::storage_label() returning canonical uppercase storage labels.
Factory & Backend Wiring
crates/data_connector/src/factory.rs
Factory now recognizes Postgres and Oracle as memory-writer backends, instantiates real PostgresConversationMemoryWriter / OracleConversationMemoryWriter, and wraps writers with HookedConversationMemoryWriter when a StorageHook is configured; tests updated.
Postgres Memory Writer
crates/data_connector/src/postgres.rs
New PostgresConversationMemoryWriter with table/index DDL creation, deterministic short index naming helpers, and create_memory that inserts ULID ids and only non-skipped columns including extra columns.
Oracle Memory Writer
crates/data_connector/src/oracle.rs
New OracleConversationMemoryWriter, owner-aware table/index checks (oracle_table_exists, owner-aware create_index_if_missing), conditional schema/index creation, and dynamic INSERT honoring skipped fields and extra columns.
Hooked Writer & Hooks
crates/data_connector/src/hooked.rs, crates/data_connector/src/hooks.rs, crates/wasm/src/storage_hook.rs
Added HookedConversationMemoryWriter to invoke StorageHook::before/after around create_memory; added StorageOperation::CreateMemory and a one-time debug fallback mapping in wasm for the new operation.
Enqueue Planning Module
model_gateway/src/memory/enqueue.rs, model_gateway/src/memory/mod.rs
New enqueue module with EnqueueInputs, EnqueuePlan, EnqueueValidationError, and build_enqueue_plan that validates durable metadata and emits LTM (+30s) and OnDemand (now) NewConversationMemory rows; re-exported.
Persistence Integration
model_gateway/src/routers/common/persistence_utils.rs
persist_conversation_items / inner accept ConversationMemoryWriter and MemoryExecutionContext; added extract_role_message_text_from_items and enqueue_conversation_memory_rows to build and best-effort write memory rows.
Handlers & Routing
model_gateway/src/routers/conversations/handlers.rs, model_gateway/src/server.rs, model_gateway/src/routers/grpc/router.rs
Threaded ConversationMemoryWriter and MemoryExecutionContext into item creation and routing; router derives per-request MemoryExecutionContext and reuses ResponsesContext::with_request_context.
gRPC/OpenAI Response Paths
model_gateway/src/routers/grpc/common/responses/context.rs, .../responses/utils.rs, .../harmony/..., .../regular/..., .../streaming.rs
Added ResponsesContext::with_request_context; response handlers and persistence helpers now accept and forward ConversationMemoryWriter and MemoryExecutionContext; added persist_streaming_response and updated streaming incomplete-status handling.
Request State
model_gateway/src/routers/grpc/regular/responses/common.rs
Added memory_execution_context field to ResponsesCallContext.
Tests & Test Utilities
model_gateway/tests/common/*, model_gateway/tests/api/*
Added RecordingConversationMemoryWriter, refactored test context helpers to accept memory writer, and added memory_enqueue_api_test.rs asserting LTM/OnDemand enqueue behavior, timing, and content.
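The `ResponsesContext::with_request_context` refactor listed above amounts to a copy-with-replacement helper. A minimal sketch, where the field names are assumptions and not the gateway's real `ResponsesContext`:

```rust
#[derive(Clone, Debug, PartialEq)]
pub struct MemoryExecutionContext {
    pub store_ltm: bool,
}

#[derive(Clone, Debug)]
pub struct RequestContext {
    pub request_id: String,
    pub memory: Option<MemoryExecutionContext>,
}

#[derive(Clone, Debug)]
pub struct ResponsesContext {
    pub request_context: RequestContext,
    pub model: String,
}

impl ResponsesContext {
    /// Replace only the request-scoped state, keeping every other field,
    /// so call sites no longer duplicate the full constructor wiring.
    pub fn with_request_context(&self, request_context: RequestContext) -> Self {
        Self {
            request_context,
            ..self.clone()
        }
    }
}
```

The struct-update syntax (`..self.clone()`) is what removes the duplicated constructor wiring: adding a field to the context later only touches the struct definition, not every call site.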

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant Router
    participant Handler
    participant Enqueue
    participant MemWriter
    participant Storage

    Client->>Router: request (+x-conversation-memory-config header)
    Router->>Router: derive MemoryExecutionContext
    Router->>Handler: call with MemoryExecutionContext & ConversationMemoryWriter

    Handler->>Handler: process response/items\nextract role text
    Handler->>Enqueue: build_enqueue_plan(inputs)
    Enqueue-->>Handler: EnqueuePlan{rows}

    loop for each row
      Handler->>MemWriter: create_memory(row)
      MemWriter->>Storage: INSERT conversation_memories
      Storage-->>MemWriter: ConversationMemoryId
      MemWriter-->>Handler: Memory ID
    end

    Handler-->>Client: final response

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

Suggested reviewers

  • key4ng
  • CatherineSue
  • zhaowenzi
  • slin1237

Poem

🐰 Hopping through bytes and midnight logs,

I plant two memories in tidy bogs.
LTM waits thirty, OnDemand leaps now,
Headers whisper, writers take a bow.
A rabbit claps—persisted, wow! 🥕

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
  • Description Check (✅ Passed): Check skipped because CodeRabbit's high-level summary is enabled.
  • Title Check (✅ Passed): The title clearly and specifically describes the main change: adding durable conversation-memory enqueue wiring across the codebase, which aligns with the PR's primary objective.
  • Docstring Coverage (✅ Passed): Docstring coverage is 100.00%, which meets the required threshold of 80.00%.
  • Linked Issues Check (✅ Passed): Check skipped because no linked issues were found for this pull request.
  • Out of Scope Changes Check (✅ Passed): Check skipped because no linked issues were found for this pull request.



@gemini-code-assist gemini-code-assist Bot left a comment


Code Review

This pull request implements durable conversation memory persistence for Oracle and Postgres backends. It introduces ConversationMemoryWriter implementations for both databases, including schema initialization and row insertion logic. The model_gateway is updated to build and enqueue memory plans during response generation and item ingestion. Feedback highlights a potential SQL injection and schema visibility issue in the Oracle implementation, collision risks in Oracle index naming, and invalid identifier generation for Postgres indexes.

Comment thread crates/data_connector/src/oracle.rs Outdated
Comment thread crates/data_connector/src/oracle.rs
Comment thread crates/data_connector/src/postgres.rs
@github-actions github-actions Bot added labels on Apr 23, 2026: grpc (gRPC client and router changes), tests (Test changes), data-connector (Data connector crate changes), model-gateway (Model gateway crate changes), openai (OpenAI router changes)
Comment thread crates/data_connector/src/oracle.rs
Comment thread crates/data_connector/src/oracle.rs Outdated

@claude claude Bot left a comment


Clean, well-tested PR. The memory enqueue wiring is threaded correctly through all request paths with no double-persistence risks. Control flow in streaming tool loops is sound — each exit path persists exactly once. Good test coverage on the pure planner logic and API integration paths. Two minor nits posted (index name sanitization consistency and a redundant clone).


@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 72fe328788

ℹ️ About Codex in GitHub

Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".

Comment thread model_gateway/src/routers/grpc/regular/responses/streaming.rs
Comment thread model_gateway/src/routers/grpc/harmony/responses/streaming.rs
Signed-off-by: Daisy Zhou <zhoug9127@gmail.com>
@zhoug9127 zhoug9127 force-pushed the feat/ltm-conversation-mem-enqueue branch from 72fe328 to 0eb318b on April 23, 2026 at 17:58
Comment thread model_gateway/src/routers/grpc/router.rs Outdated
Comment thread model_gateway/src/routers/grpc/regular/responses/streaming.rs Outdated
Comment thread model_gateway/src/routers/grpc/harmony/responses/streaming.rs Outdated

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 4

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
model_gateway/src/routers/grpc/harmony/responses/streaming.rs (1)

298-316: ⚠️ Potential issue | 🟠 Major

Emit incomplete_details on response, not inside usage.

Here the persisted response is correct (final_response.incomplete_details = ...), but the streamed response.completed event puts the field inside usage because emit_completed() only accepts a usage object. That makes streaming clients lose the max-tool-calls reason in the canonical response.incomplete_details slot.

💡 Proposed fix
-                    let usage_json = json!({
+                    let usage_json = json!({
                         "input_tokens": usage.prompt_tokens,
                         "output_tokens": usage.completion_tokens,
                         "total_tokens": usage.total_tokens,
-                        "incomplete_details": incomplete_details,
                     });
                     persist_streaming_response(
                         ctx,
                         memory_execution_context,
                         emitter,
                         Some(usage.clone()),
                         original_request,
                         Some(incomplete_details),
                     )
                     .await;
-                    let event = emitter.emit_completed(Some(&usage_json));
+                    let mut event = emitter.emit_completed(Some(&usage_json));
+                    event["response"]["incomplete_details"] =
+                        json!({ "reason": "max_tool_calls" });
                     emitter.send_event_best_effort(&event, tx);
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@model_gateway/src/routers/grpc/harmony/responses/streaming.rs` around lines
298 - 316, The streamed response.completed event currently places
incomplete_details inside the usage object (see usage_json and
emitter.emit_completed), causing clients to miss
final_response.incomplete_details; change the emission to put incomplete_details
at the top-level of the response.completed event instead of inside usage —
either call or implement an overload/variant of emit_completed that accepts both
a usage and an optional incomplete_details (referencing emit_completed,
usage_json, incomplete_details, persist_streaming_response, and
emitter.send_event_best_effort) or construct the event manually so the emitted
JSON has "incomplete_details" alongside "usage" in the response.completed
payload.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@crates/data_connector/src/postgres.rs`:
- Around line 995-1041: postgres_index_name can produce identifiers >
MAX_POSTGRES_IDENT_LEN when suffix is very long because the hashed branch
appends HASH_HEX_LEN + "_" + prefix; update postgres_index_name to guarantee the
final formatted "{table_part}_{suffix}" never exceeds MAX_POSTGRES_IDENT_LEN by
either (a) ensuring you only take the hashed branch when table_budget >=
HASH_HEX_LEN + 2 (accounting for "_" and at least one prefix char) and otherwise
truncate the suffix further, or (b) after constructing table_part and suffix,
clamp/truncate the suffix so format!("{table_part}_{suffix}") is at most
MAX_POSTGRES_IDENT_LEN; adjust logic around MAX_POSTGRES_IDENT_LEN, HASH_HEX_LEN
and use stable_hash_hex_lower as before to produce the hash when budget allows.
- Around line 870-918: The current constructor builds a single DDL string
(variable ddl) that includes CREATE TABLE and all CREATE INDEX statements and
calls client.batch_execute(&ddl), which causes startup to hard-fail if any index
creation errors (e.g., missing columns due to migration drift); change it to
execute the CREATE TABLE first (batch_execute on the table DDL only) then
iterate over each index (those created via postgres_index_name(&s.table, "...")
and s.col(...)) and call client.batch_execute(&index_ddl) separately for each
index, catching errors and logging them with tracing::debug! (mirroring
PostgresResponseStorage::new behavior) instead of returning an Err so writer
construction can continue on index failures.

In `@model_gateway/src/memory/enqueue.rs`:
- Around line 133-142: The code currently maps MemoryPolicyMode::Unspecified and
::Unrecognized in policy_mode_label which can leak into persisted memory_config
if store_ltm is active unexpectedly; update build_enqueue_plan to validate that
the chosen MemoryPolicyMode is one of the store-bearing variants
(MemoryPolicyMode::StoreOnly or ::StoreAndRecall) before proceeding (when
store_ltm.active() is true) and return an error or assert/panic if not, or
alternatively document the invariant with a clear comment in build_enqueue_plan;
reference the policy_mode_label helper and the MemoryPolicyMode enum when adding
the check so the validation is colocated with the store_ltm.active() branch.

In `@model_gateway/src/routers/grpc/regular/responses/streaming.rs`:
- Around line 639-661: The stop reason is being placed inside usage_json, but
streaming clients expect response.incomplete_details at the top-level of the
emitted completion event; after you call
emitter.emit_completed(Some(&usage_json)) adjust the returned event to add
incomplete_details at the response-level (e.g., set
event.payload["incomplete_details"] = incomplete_details) before calling
emitter.send_event so the emitted event's top-level response contains the
incomplete_details; keep persist_streaming_response as-is (it already stores
incomplete_details at response level).

---

Outside diff comments:
In `@model_gateway/src/routers/grpc/harmony/responses/streaming.rs`:
- Around line 298-316: The streamed response.completed event currently places
incomplete_details inside the usage object (see usage_json and
emitter.emit_completed), causing clients to miss
final_response.incomplete_details; change the emission to put incomplete_details
at the top-level of the response.completed event instead of inside usage —
either call or implement an overload/variant of emit_completed that accepts both
a usage and an optional incomplete_details (referencing emit_completed,
usage_json, incomplete_details, persist_streaming_response, and
emitter.send_event_best_effort) or construct the event manually so the emitted
JSON has "incomplete_details" alongside "usage" in the response.completed
payload.
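The index-name bound the first inline comment asks for can be sketched with a final clamp. Everything here is an assumption reconstructed from the review text rather than the crate's real helper: the constants, the FNV-1a hash standing in for `stable_hash_hex_lower`, and ASCII-only table names.

```rust
const MAX_POSTGRES_IDENT_LEN: usize = 63; // Postgres silently truncates longer identifiers
const HASH_HEX_LEN: usize = 8;

// Deterministic FNV-1a hash so distinct long table names never collide after
// truncation (std's DefaultHasher is randomly seeded per process, so it won't do).
fn stable_hash_hex_lower(input: &str) -> String {
    let mut h: u64 = 0xcbf2_9ce4_8422_2325;
    for b in input.bytes() {
        h ^= u64::from(b);
        h = h.wrapping_mul(0x0100_0000_01b3);
    }
    format!("{h:016x}")[..HASH_HEX_LEN].to_string()
}

fn postgres_index_name(table: &str, suffix: &str) -> String {
    let full = format!("{table}_{suffix}");
    if full.len() <= MAX_POSTGRES_IDENT_LEN {
        return full; // short names pass through unchanged
    }
    // Too long: keep a table prefix plus a stable hash of the full table name.
    let hash = stable_hash_hex_lower(table);
    let budget = MAX_POSTGRES_IDENT_LEN.saturating_sub(suffix.len() + HASH_HEX_LEN + 2);
    let prefix = &table[..budget.min(table.len())];
    let name = format!("{prefix}_{hash}_{suffix}");
    // Final clamp guards the very-long-suffix case the review flagged, where the
    // hashed branch alone could still exceed the limit.
    name.chars().take(MAX_POSTGRES_IDENT_LEN).collect()
}
```

The unconditional clamp at the end is the simpler of the two options the comment offers (its option b): it guarantees the bound regardless of which branch produced the name, at the cost of occasionally truncating the suffix.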

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: 5941ec0b-2a1d-4e87-8de0-079b246f7934

📥 Commits

Reviewing files that changed from the base of the PR and between e8f8473 and 3893a85.

📒 Files selected for processing (24)
  • crates/data_connector/src/core.rs
  • crates/data_connector/src/factory.rs
  • crates/data_connector/src/oracle.rs
  • crates/data_connector/src/postgres.rs
  • model_gateway/src/memory/enqueue.rs
  • model_gateway/src/memory/mod.rs
  • model_gateway/src/routers/common/persistence_utils.rs
  • model_gateway/src/routers/conversations/handlers.rs
  • model_gateway/src/routers/grpc/common/responses/context.rs
  • model_gateway/src/routers/grpc/common/responses/utils.rs
  • model_gateway/src/routers/grpc/harmony/responses/non_streaming.rs
  • model_gateway/src/routers/grpc/harmony/responses/streaming.rs
  • model_gateway/src/routers/grpc/regular/responses/common.rs
  • model_gateway/src/routers/grpc/regular/responses/handlers.rs
  • model_gateway/src/routers/grpc/regular/responses/non_streaming.rs
  • model_gateway/src/routers/grpc/regular/responses/streaming.rs
  • model_gateway/src/routers/grpc/router.rs
  • model_gateway/src/routers/openai/responses/non_streaming.rs
  • model_gateway/src/routers/openai/responses/streaming.rs
  • model_gateway/src/server.rs
  • model_gateway/tests/api/memory_enqueue_api_test.rs
  • model_gateway/tests/api/mod.rs
  • model_gateway/tests/common/mod.rs
  • model_gateway/tests/common/recording_memory_writer.rs

Comment thread crates/data_connector/src/postgres.rs Outdated
Comment thread crates/data_connector/src/postgres.rs
Comment thread model_gateway/src/memory/enqueue.rs Outdated
Comment thread model_gateway/src/routers/grpc/regular/responses/streaming.rs
Signed-off-by: Daisy Zhou <zhoug9127@gmail.com>
@zhoug9127 zhoug9127 force-pushed the feat/ltm-conversation-mem-enqueue branch from 3893a85 to e406f32 on April 23, 2026 at 19:43
Comment thread crates/data_connector/src/postgres.rs
Comment thread model_gateway/src/routers/grpc/regular/responses/streaming.rs

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 3

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
crates/data_connector/src/oracle.rs (1)

1374-1396: 🧹 Nitpick | 🔵 Trivial

Response-schema indexes still use raw format! naming.

create_index_if_missing now takes an owner, but prev_idx/user_idx here are still built as format!("{}_PREV_IDX", s.table) / format!("{}_USER_IDX", s.table) rather than going through the new oracle_index_name helper. For the default RESPONSES table name this is fine (≤30 chars), but with a customized long schema.responses.table (e.g. a tenant prefix) the raw concatenation can exceed Oracle's 30-char identifier limit and fail DDL at startup, while the new conversation_memories indexes would succeed.

Not a regression (pre-existing behavior), but since you've just introduced the canonical helper one call site over, routing these two names through oracle_index_name would make schema-init robust to custom table names uniformly.

♻️ Proposed change
-            let prev_idx = format!("{}_PREV_IDX", s.table);
+            let prev_idx = oracle_index_name(&s.table, "PREV_IDX");
@@
-            let user_idx = format!("{}_USER_IDX", s.table);
+            let user_idx = oracle_index_name(&s.table, "USER_IDX");
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/data_connector/src/oracle.rs` around lines 1374 - 1396, The index
names for previous_response_id and safety_identifier are built with raw format!
into prev_idx and user_idx which can exceed Oracle's 30-char identifier limit
for long s.table values; update the two call sites that build prev_idx and
user_idx to use the canonical oracle_index_name helper (e.g.,
oracle_index_name(&s.table, "PREV_IDX") and oracle_index_name(&s.table,
"USER_IDX")) before passing them into create_index_if_missing so the names are
properly truncated/sanitized; keep the rest of the create_index_if_missing calls
(including schema.owner.as_deref(), &s.table and the CREATE INDEX SQL text)
unchanged except to reference the new oracle_index_name-produced identifier.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@crates/data_connector/src/oracle.rs`:
- Around line 1742-1780: Add focused unit tests for oracle_index_name to
exercise boundary behaviors: verify passthrough for short table+suffix,
hashed/truncated prefix behavior for a very long table with a normal suffix
(assert length <=30 and prefix contains stable_hash_hex_upper-driven hash), the
fallback when a very long suffix forces the table_budget==0 branch (ensure
suffix truncated and name still <=30 and starts with letter), and the
leading-char replacement when table names start with digits or '_' (assert first
char is ASCII alphabetic). Use oracle_index_name directly and assert the
resulting string length is <=30 and matches the expected structural properties
(contains '_' separator, starts with a letter, and includes expected
truncated/hashed pieces) to prevent regressions in
build_index_table_part/stable_hash_hex_upper interactions.
- Around line 1187-1235: Add a composite index creation for the dequeue query:
when both "status" and "next_run_at" are not skipped, call
oracle_index_name(&s.table, "STATUS_NEXT_RUN_AT_IDX") and invoke
create_index_if_missing(...) with a CREATE INDEX ... ON {table} (status,
next_run_at); if your schema/workers shard by owner and "owner_id" is present
(s.is_skipped("owner_id") == false), create an index with owner_id as the
leading column (owner_id, status, next_run_at) instead; place this block near
the existing single-column index creations and reuse schema.owner.as_deref(),
&s.table and s.col("...") to build the SQL.

In `@model_gateway/src/routers/grpc/regular/responses/streaming.rs`:
- Around line 907-916: The branch that calls persist_streaming_response(...) and
then breaks needs to also emit the terminal "response.completed" event via the
same emitter used in the other branches so clients don't stay stuck in
"in_progress"; after the await persist_streaming_response(...) call, invoke the
emitter's response-completion/terminal event method (the same call used in the
no-tool and Harmony streaming paths) with the same parameters (e.g.,
context/original_request and any metadata/usage) and only then break/return from
the function so the lifecycle completes.

---

Outside diff comments:
In `@crates/data_connector/src/oracle.rs`:
- Around line 1374-1396: The index names for previous_response_id and
safety_identifier are built with raw format! into prev_idx and user_idx which
can exceed Oracle's 30-char identifier limit for long s.table values; update the
two call sites that build prev_idx and user_idx to use the canonical
oracle_index_name helper (e.g., oracle_index_name(&s.table, "PREV_IDX") and
oracle_index_name(&s.table, "USER_IDX")) before passing them into
create_index_if_missing so the names are properly truncated/sanitized; keep the
rest of the create_index_if_missing calls (including schema.owner.as_deref(),
&s.table and the CREATE INDEX SQL text) unchanged except to reference the new
oracle_index_name-produced identifier.
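The composite dequeue index suggested in the second oracle.rs comment boils down to one extra DDL statement. A sketch, where the owner-leading variant and the column names are assumptions taken from the review text, not the actual schema helpers:

```rust
// Build the CREATE INDEX statement for the dequeue query, which filters on
// status and orders by next_run_at. When workers shard by owner, lead with
// owner_id so the index also covers the per-owner predicate.
fn dequeue_index_ddl(table: &str, index_name: &str, shard_by_owner: bool) -> String {
    let cols = if shard_by_owner {
        "owner_id, status, next_run_at"
    } else {
        "status, next_run_at"
    };
    format!("CREATE INDEX {index_name} ON {table} ({cols})")
}
```

Column order matters here: the equality-filtered columns (owner_id, status) come before the range/order column (next_run_at), which is what lets the database satisfy both the filter and the ordering from one index scan.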

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: 7ec40250-984a-4864-900f-8d3b9917f63f

📥 Commits

Reviewing files that changed from the base of the PR and between 3893a85 and e406f32.

📒 Files selected for processing (6)
  • crates/data_connector/src/oracle.rs
  • crates/data_connector/src/postgres.rs
  • model_gateway/src/memory/enqueue.rs
  • model_gateway/src/routers/grpc/harmony/responses/streaming.rs
  • model_gateway/src/routers/grpc/regular/responses/streaming.rs
  • model_gateway/src/routers/grpc/router.rs

Comment thread crates/data_connector/src/oracle.rs
Comment thread crates/data_connector/src/oracle.rs
Comment thread model_gateway/src/routers/grpc/regular/responses/streaming.rs

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@model_gateway/src/routers/grpc/regular/responses/streaming.rs`:
- Around line 639-660: persisted responses created by persist_streaming_response
end up with status=ResponseStatus::Completed even when incomplete_details is set
(e.g., max_tool_calls); update persist_streaming_response (the code path that
calls emitter.finalize and then calls persist_response_if_needed / builds
final_response) to detect if incomplete_details.is_some() and, before
persisting, override final_response.status = ResponseStatus::Incomplete so
stored responses with non-null incomplete_details are marked Incomplete per the
Responses API.
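The fix requested above is a small invariant check applied before persisting. `ResponseStatus` and the response shape here are illustrative stand-ins, not the gateway's real types:

```rust
#[derive(Debug, PartialEq, Clone, Copy)]
enum ResponseStatus {
    Completed,
    Incomplete,
}

struct FinalResponse {
    status: ResponseStatus,
    incomplete_details: Option<String>,
}

// Per the Responses API, a response carrying non-null incomplete_details
// (e.g. max_tool_calls) must not be stored with status Completed.
fn normalize_status(mut resp: FinalResponse) -> FinalResponse {
    if resp.incomplete_details.is_some() {
        resp.status = ResponseStatus::Incomplete;
    }
    resp
}
```

Running every response through a normalizer like this just before the storage call keeps the invariant in one place instead of at each streaming exit path.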

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: a62c1b9d-f9c7-4e70-9609-98a2dc7ddbc4

📥 Commits

Reviewing files that changed from the base of the PR and between e406f32 and 26fe057.

📒 Files selected for processing (2)
  • crates/data_connector/src/oracle.rs
  • model_gateway/src/routers/grpc/regular/responses/streaming.rs

Comment thread model_gateway/src/routers/grpc/regular/responses/streaming.rs
…tion

Signed-off-by: Daisy Zhou <zhoug9127@gmail.com>
@zhoug9127 zhoug9127 force-pushed the feat/ltm-conversation-mem-enqueue branch from 26fe057 to c451c59 on April 23, 2026 at 21:41
Comment thread crates/data_connector/src/postgres.rs

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: c451c59f83


Comment thread crates/data_connector/src/factory.rs
Comment thread model_gateway/src/routers/grpc/regular/responses/streaming.rs
Comment thread model_gateway/src/routers/grpc/harmony/responses/streaming.rs
@zhoug9127 zhoug9127 requested a review from tonyluj as a code owner on April 23, 2026 at 22:16
@github-actions github-actions Bot added the wasm (WebAssembly related changes) label on Apr 23, 2026
Comment thread crates/wasm/src/storage_hook.rs Outdated

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: dc509d690f


Comment thread crates/data_connector/src/oracle.rs Outdated
…status

Signed-off-by: Daisy Zhou <zhoug9127@gmail.com>
@zhoug9127 zhoug9127 force-pushed the feat/ltm-conversation-mem-enqueue branch from dc509d6 to df9e777 on April 23, 2026 at 23:08

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
model_gateway/src/routers/grpc/regular/responses/streaming.rs (1)

239-254: ⚠️ Potential issue | 🟠 Major

Persist before the terminal SSE send in the non-MCP path.

Line 240 can fail on a closed channel, and then Line 244 is never reached. That drops the stored response and the new durable memory enqueue for exactly the “client disconnected at the end” case. The MCP branches in this file were already hardened by persisting first; this path needs the same ordering.

Proposed fix
```diff
-    let completed_event = event_emitter.emit_completed(usage_json.as_ref());
-    event_emitter.send_event(&completed_event, &tx)?;
-
-    // Finalize and persist accumulated response
     let final_response = accumulator.finalize();
     persist_response_if_needed(
         conversation_storage,
         conversation_item_storage,
         response_storage,
         conversation_memory_writer,
         memory_execution_context,
         &final_response,
         &original_request,
         request_context,
     )
     .await;
+
+    let completed_event = event_emitter.emit_completed(usage_json.as_ref());
+    event_emitter.send_event(&completed_event, &tx)?;
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@model_gateway/src/routers/grpc/regular/responses/streaming.rs` around lines
239 - 254, The terminal SSE send (event_emitter.send_event for completed_event)
can fail on a closed channel and prevent persisting the accumulated response;
move the persist_response_if_needed call to occur before creating/sending the
completed_event in the non-MCP path so accumulator.finalize() and
persist_response_if_needed(...) are executed and awaited first, then build/send
the completed_event with event_emitter.send_event; update references:
completed_event, event_emitter.send_event, accumulator.finalize, and
persist_response_if_needed to reflect the new ordering.
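The ordering argument above can be sketched in isolation. This is a minimal stand-in, not the gateway's actual types: `persist_response`, `send_terminal_event`, and `finish_stream` are hypothetical names, and an in-memory `Vec` plays the role of the response store. The point it demonstrates is that persisting before the terminal send means a closed channel can no longer drop the stored response.

```rust
use std::sync::mpsc;

// Hypothetical stand-in for the persistence step (the real path awaits
// persist_response_if_needed against durable storage).
fn persist_response(store: &mut Vec<String>, response: &str) {
    store.push(response.to_string());
}

// Hypothetical stand-in for the terminal SSE send; fails if the client
// has disconnected (receiver dropped).
fn send_terminal_event(
    tx: &mpsc::Sender<String>,
    event: &str,
) -> Result<(), mpsc::SendError<String>> {
    tx.send(event.to_string())
}

// Persist first, then attempt the terminal send. Even when the send fails,
// the response has already been stored.
fn finish_stream(
    store: &mut Vec<String>,
    tx: &mpsc::Sender<String>,
    final_response: &str,
) -> bool {
    persist_response(store, final_response);
    send_terminal_event(tx, "response.completed").is_ok()
}
```

With the reviewed ordering (send first), a dropped receiver would return early via `?` and skip persistence entirely; here the store is populated regardless of the send outcome.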
crates/data_connector/src/oracle.rs (1)

620-650: ⚠️ Potential issue | 🟠 Major

Bound the conversation_item_links PK/index names too.

Line 627 and Line 628 still derive Oracle object names directly from sl.table. With long/customized table names, PK_{table} or {table}_CONV_IDX can still exceed Oracle’s 30-character identifier limit and fail startup, even though this PR added oracle_index_name(...) specifically to avoid that class of breakage.

Proposed fix
```diff
-            let pk_name = format!("PK_{}", sl.table);
-            let idx_name = format!("{}_CONV_IDX", sl.table);
+            let idx_name = oracle_index_name(&sl.table, "CONV_IDX");

             let mut col_defs = vec![
                 format!("{col_cid} VARCHAR2(64) NOT NULL"),
                 format!("{col_iid} VARCHAR2(64) NOT NULL"),
                 format!("{col_added} TIMESTAMP WITH TIME ZONE"),
             ];
             col_defs.extend(extra_column_defs(sl));
-            col_defs.push(format!(
-                "CONSTRAINT {pk_name} PRIMARY KEY ({col_cid}, {col_iid})"
-            ));
+            col_defs.push(format!("PRIMARY KEY ({col_cid}, {col_iid})"));

             conn.execute(
                 &format!("CREATE TABLE {sl_table} ({})", col_defs.join(", ")),
                 &[],
             )
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/data_connector/src/oracle.rs` around lines 620 - 650, The PK and index
names are still built directly from sl.table and can exceed Oracle's 30-char
identifier limit; instead wrap those generated names with oracle_index_name(...)
before using them. Replace the pk_name and idx_name assignments (currently
format!("PK_{}", sl.table) and format!("{}_CONV_IDX", sl.table)) with bounded
names produced by calling oracle_index_name(&format!(...)) and then use those
bounded names in the CONSTRAINT and CREATE INDEX SQL strings passed to
conn.execute (the surrounding code uses sl.col(...), extra_column_defs(sl) and
conn.execute(...).map_err(map_oracle_error) so only the pk_name/idx_name
construction needs changing).
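The PR's `oracle_index_name(...)` isn't shown here, so its exact signature is unknown. A plausible sketch of a bounded-name helper in the same spirit: keep the identifier within Oracle's classic 30-character limit by truncating the table prefix and appending a short hash so distinct long table names still map to distinct identifiers. `bounded_oracle_name` and `fnv1a` are illustrative names, and the sketch assumes ASCII table names and short suffixes.

```rust
// Hypothetical helper: produce an Oracle identifier at most 30 characters
// long from a table name and a fixed suffix (e.g. "CONV_IDX"). Assumes an
// ASCII table name and a suffix short enough to leave room for the hash.
fn bounded_oracle_name(table: &str, suffix: &str) -> String {
    const MAX: usize = 30;
    let candidate = format!("{table}_{suffix}");
    if candidate.len() <= MAX {
        return candidate.to_uppercase();
    }
    // Too long: truncate the table prefix and disambiguate with an
    // 8-hex-digit hash of the full table name ("_XXXXXXXX_" = 10 chars).
    let hash = fnv1a(table.as_bytes());
    let budget = MAX - suffix.len() - 10;
    format!("{}_{:08X}_{}", &table[..budget], hash, suffix).to_uppercase()
}

// Simple 32-bit FNV-1a hash; any stable short hash works here.
fn fnv1a(bytes: &[u8]) -> u32 {
    let mut h: u32 = 0x811C_9DC5;
    for &b in bytes {
        h ^= u32::from(b);
        h = h.wrapping_mul(0x0100_0193);
    }
    h
}
```

Short names pass through unchanged, so existing schemas keep their current index names; only names that would have exceeded the limit get the hashed form.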

📥 Commits

Reviewing files that changed from the base of the PR and between 26fe057 and df9e777.

📒 Files selected for processing (11)
  • crates/data_connector/src/factory.rs
  • crates/data_connector/src/hooked.rs
  • crates/data_connector/src/hooks.rs
  • crates/data_connector/src/oracle.rs
  • crates/data_connector/src/postgres.rs
  • crates/wasm/src/storage_hook.rs
  • model_gateway/src/routers/grpc/harmony/responses/non_streaming.rs
  • model_gateway/src/routers/grpc/harmony/responses/streaming.rs
  • model_gateway/src/routers/grpc/regular/responses/non_streaming.rs
  • model_gateway/src/routers/grpc/regular/responses/streaming.rs
  • model_gateway/src/routers/openai/mcp/tool_loop.rs

Comment on lines +128 to +137
```rust
// Backward-compatible mapping: existing WIT surface has no dedicated
// CreateMemory operation yet.
StorageOperation::CreateMemory => {
    CREATE_MEMORY_WIT_FALLBACK_LOG_ONCE.call_once(|| {
        tracing::debug!(
            "mapping CreateMemory to WIT StoreResponse (no dedicated WIT variant yet)"
        );
    });
    WitOperation::StoreResponse
}
```

⚠️ Potential issue | 🟠 Major

Don't alias CreateMemory to StoreResponse.

This makes WASM hooks see durable memory enqueues as StoreResponse, so any response-specific reject/extra-column logic now also runs on HookedConversationMemoryWriter::create_memory. That is a silent behavior change and can break memory inserts on deployments that already use WASM hooks. Please either add a real WIT CreateMemory operation before dispatching this path, or fail/skip the WASM hook explicitly for CreateMemory until the ABI is updated.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/wasm/src/storage_hook.rs` around lines 128 - 137, The current mapping
in StorageOperation::CreateMemory that returns WitOperation::StoreResponse
causes durable memory enqueues to be treated as StoreResponse in WASM hooks;
change this to avoid aliasing by either (A) introducing and returning a proper
WIT CreateMemory operation (a new WitOperation variant) before dispatch so
HookedConversationMemoryWriter::create_memory is routed to a dedicated WIT op,
or (B) explicitly skip or fail the WASM hook for StorageOperation::CreateMemory
(use CREATE_MEMORY_WIT_FALLBACK_LOG_ONCE to log once and return a sentinel/no-op
path that bypasses WASM dispatch) until the ABI is updated; do not map
CreateMemory to WitOperation::StoreResponse.
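Option (B) from the comment, skipping WASM dispatch for `CreateMemory` until the ABI grows a dedicated variant, can be sketched as follows. The enums here are hypothetical stand-ins for the crate's `StorageOperation` and `WitOperation` (reduced to two variants), and `map_operation` is an illustrative name: returning `None` signals the caller to bypass the WASM hook and fall through to the underlying writer unchanged, rather than aliasing the operation to `StoreResponse`.

```rust
use std::sync::Once;

static CREATE_MEMORY_SKIP_LOG_ONCE: Once = Once::new();

// Hypothetical, reduced stand-ins for the real operation enums.
#[derive(Debug, PartialEq)]
enum StorageOperation {
    StoreResponse,
    CreateMemory,
}

#[derive(Debug, PartialEq)]
enum WitOperation {
    StoreResponse,
}

// None = no WIT mapping exists; the caller skips WASM dispatch entirely
// instead of running response-specific hook logic on memory inserts.
fn map_operation(op: &StorageOperation) -> Option<WitOperation> {
    match op {
        StorageOperation::StoreResponse => Some(WitOperation::StoreResponse),
        StorageOperation::CreateMemory => {
            CREATE_MEMORY_SKIP_LOG_ONCE.call_once(|| {
                eprintln!("skipping WASM hook for CreateMemory (no dedicated WIT variant yet)");
            });
            None
        }
    }
}
```

This keeps WASM-hooked deployments behaviorally unchanged for memory writes until a real `CreateMemory` WIT operation lands.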


Labels

data-connector Data connector crate changes grpc gRPC client and router changes model-gateway Model gateway crate changes openai OpenAI router changes tests Test changes wasm WebAssembly related changes
