Conversation
Deployment failed with the following error:

No actionable comments were generated in the recent review. 🎉
📝 Walkthrough

The builtin adapter adds per-topic-per-function routing state and fans out messages to per-function queues; RabbitMQ switches to a fanout exchange, creates per-function queues/DLQs, and routes/retries by function_id; publisher/retry/worker updated; new e2e and integration tests validate fanout and replica semantics.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client
    participant Builtin as BuiltinAdapter\n(topic_functions, trigger_function_map)
    participant IQ1 as InternalQueue\n(topic::fn_1)
    participant IQ2 as InternalQueue\n(topic::fn_2)
    participant F1 as Function1
    participant F2 as Function2
    Client->>Builtin: enqueue(message, topic)
    activate Builtin
    Builtin->>Builtin: lookup functions for topic
    deactivate Builtin
    par Fan-out
        Builtin->>IQ1: push(message)
        activate IQ1
        IQ1->>F1: deliver
        deactivate IQ1
    and
        Builtin->>IQ2: push(message)
        activate IQ2
        IQ2->>F2: deliver
        deactivate IQ2
    end
```
```mermaid
sequenceDiagram
    participant Subscriber
    participant Adapter as RabbitMQAdapter
    participant Names as RabbitNames
    participant Topology as TopologyManager
    participant Exchange as FanoutExchange
    participant PFQ as PerFunctionQueue
    Subscriber->>Adapter: subscribe(topic, function_id)
    activate Adapter
    Adapter->>Names: function_queue(function_id)
    Names-->>Adapter: per_function_queue_name
    Adapter->>Topology: setup_subscriber_queue(topic, function_id)
    Topology->>Exchange: ensure fanout exchange declared
    Topology->>PFQ: declare DLQ and per-function queue, bind to exchange
    Adapter->>Adapter: spawn worker with queue_name = per_function_queue_name
    Adapter-->>Subscriber: subscription handle
    deactivate Adapter
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes

🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
Each distinct function_id subscribed to a topic now gets its own
internal queue ({topic}::{function_id}). Enqueue fans out to all
per-function queues. Replicas of the same function compete on their
shared queue. DLQ, topic_stats, and list_topics aggregate across
per-function queues.
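The routing described above (per-function internal queues keyed `{topic}::{function_id}`, fan-out on enqueue, replicas competing on one shared queue) can be sketched in memory. This is an illustrative stand-in, not the adapter's actual types; `FanoutQueues` and its methods are invented names for the sketch.

```rust
use std::collections::{HashMap, VecDeque};

// Illustrative stand-in for the builtin adapter's fan-out state.
struct FanoutQueues {
    // topic -> distinct subscribed function ids
    topic_functions: HashMap<String, Vec<String>>,
    // "{topic}::{function_id}" -> pending messages
    queues: HashMap<String, VecDeque<String>>,
}

impl FanoutQueues {
    fn new() -> Self {
        Self {
            topic_functions: HashMap::new(),
            queues: HashMap::new(),
        }
    }

    fn subscribe(&mut self, topic: &str, function_id: &str) {
        let fids = self.topic_functions.entry(topic.to_string()).or_default();
        if !fids.iter().any(|f| f == function_id) {
            fids.push(function_id.to_string());
        }
        // Replicas of the same function share one queue, so re-subscribing
        // with the same function_id creates no new queue.
        self.queues
            .entry(format!("{}::{}", topic, function_id))
            .or_default();
    }

    fn enqueue(&mut self, topic: &str, message: &str) {
        if let Some(fids) = self.topic_functions.get(topic) {
            // Fan out: every distinct function gets its own copy.
            for fid in fids.clone() {
                self.queues
                    .entry(format!("{}::{}", topic, fid))
                    .or_default()
                    .push_back(message.to_string());
            }
        }
    }

    fn pop(&mut self, topic: &str, function_id: &str) -> Option<String> {
        // Replicas compete: whichever replica pops first gets the message.
        self.queues
            .get_mut(&format!("{}::{}", topic, function_id))?
            .pop_front()
    }
}
```

Each function receives one copy per enqueue; a second pop for the same function returns nothing, which is the replica-competition semantics the tests below verify.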
Topic exchange changed from topic to fanout. Each subscriber function
gets its own queue (iii.{topic}.{function_id}.queue) and DLQ bound to
the fanout exchange. RabbitMQ natively delivers a copy of each message
to every bound queue. Worker now accepts queue_name directly instead
of computing from topic.
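The naming scheme above is plain string formatting; this sketch reimplements it from the patterns quoted in this review (`iii.{topic}.queue`, `iii.{topic}.{function_id}.queue`, and so on) and is not the crate's real `RabbitNames` code.

```rust
// Stand-in reimplementation of the naming scheme described in the review.
struct RabbitNames {
    topic: String,
}

impl RabbitNames {
    fn new(topic: &str) -> Self {
        Self { topic: topic.to_string() }
    }

    fn exchange(&self) -> String {
        format!("iii.{}.exchange", self.topic)
    }

    fn queue(&self) -> String {
        format!("iii.{}.queue", self.topic)
    }

    fn dlq(&self) -> String {
        format!("iii.{}.dlq", self.topic)
    }

    // Per-function queue and DLQ, one pair per subscribing function.
    fn function_queue(&self, function_id: &str) -> String {
        format!("iii.{}.{}.queue", self.topic, function_id)
    }

    fn function_dlq(&self, function_id: &str) -> String {
        format!("iii.{}.{}.dlq", self.topic, function_id)
    }
}
```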
Tests added:
- 7 builtin adapter tests: two-function fanout, replica competition, mixed functions+replicas, single subscriber regression, unsubscribe stops delivery, payload integrity, condition function filtering.
- 3 RabbitMQ adapter tests (behind the rabbitmq feature): two-function fanout, replica competition, per-function DLQ isolation.
Force-pushed from 15cd83d to 1989db4.
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
engine/src/modules/queue/adapters/rabbitmq/topology.rs (1)
91-100: ⚠️ Potential issue | 🔴 Critical

Per-function queue missing `x-dead-letter-exchange` configuration, so messages won't route to the DLQ.

The per-function queue is declared with `FieldTable::default()`, which means it has no dead-letter exchange configured. When messages are nack'd with `requeue: false`, RabbitMQ will discard them instead of routing them to the per-function DLQ.

Compare to `setup_function_queue` (lines 140-154), which correctly configures the DLX:

```rust
let mut main_queue_args = FieldTable::default();
main_queue_args.insert(
    "x-dead-letter-exchange".into(),
    AMQPValue::LongString(names.dlq_exchange().into()),
);
```

The DLQ queue is created (lines 80-89) but won't receive any messages without the DLX binding.
🐛 Proposed fix

```diff
+ // Configure dead-letter exchange so nack(requeue=false) routes to DLQ
+ let mut queue_args = FieldTable::default();
+ // For per-function queues, we publish directly to the DLQ queue
+ // since there's no per-function DLQ exchange
+ queue_args.insert(
+     "x-dead-letter-routing-key".into(),
+     AMQPValue::LongString(dlq_name.clone().into()),
+ );
+
  self.channel
      .queue_declare(
          &queue_name,
          QueueDeclareOptions {
              durable: true,
              ..Default::default()
          },
-         FieldTable::default(),
+         queue_args,
      )
      .await?;
```

Note: You'll also need to create a DLQ exchange and bind the DLQ queue to it, similar to the pattern in `setup_function_queue`.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@engine/src/modules/queue/adapters/rabbitmq/topology.rs` around lines 91 - 100, The per-function queue declaration uses FieldTable::default() and lacks the x-dead-letter-exchange argument, so nacked messages will be discarded instead of routed to the DLQ; update the queue declaration in the same block that calls queue_declare (where queue_name is used) to build a FieldTable (like in setup_function_queue) and insert ("x-dead-letter-exchange", AMQPValue::LongString(names.dlq_exchange().into())) before calling queue_declare, and also ensure the DLQ exchange and binding for the DLQ queue (created earlier) are created/bound similarly to the pattern used in setup_function_queue.
🧹 Nitpick comments (2)
engine/tests/rabbitmq_queue_integration.rs (1)
1121-1171: Consider reusing existing helpers from `common::queue_helpers`.

`register_rmq_capturing_function` and `register_rmq_counting_fn` appear to duplicate functionality from `register_payload_capturing_function` and `register_counting_function` already defined in `common::queue_helpers` (imported on lines 26-31).

💡 Potential consolidation
```diff
-fn register_rmq_capturing_function(
-    engine: &Arc<Engine>,
-    function_id: &str,
-) -> Arc<Mutex<Vec<Value>>> {
-    // ... implementation ...
-}
-
-fn register_rmq_counting_fn(
-    engine: &Arc<Engine>,
-    function_id: &str,
-) -> Arc<AtomicU64> {
-    // ... implementation ...
-}
+// Use existing helpers from common::queue_helpers
+// let cap_a = register_payload_capturing_function(&engine, "test::rmq_fan_a", captured_a.clone());
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@engine/tests/rabbitmq_queue_integration.rs` around lines 1121 - 1171, The two local helpers register_rmq_capturing_function and register_rmq_counting_fn duplicate functionality already provided by common::queue_helpers (register_payload_capturing_function and register_counting_function); replace the local definitions by calling those helpers instead: remove register_rmq_capturing_function and register_rmq_counting_fn, and where they are used call register_payload_capturing_function(engine, function_id) to return the Arc<Mutex<Vec<Value>>> and register_counting_function(engine, function_id) to return the Arc<AtomicU64>, ensuring the types/signatures match and any necessary imports from common::queue_helpers remain in scope.

engine/src/modules/queue/adapters/rabbitmq/naming.rs (1)
30-36: Consider adding unit tests for the new naming methods.

The new `function_queue` and `function_dlq` methods follow the existing naming pattern, but lack dedicated unit tests in this file. The existing `test_rabbit_names` test could be extended to cover these:

💡 Suggested test addition
```diff
 #[test]
 fn test_rabbit_names() {
     let names = RabbitNames::new("user.created");
     assert_eq!(names.exchange(), "iii.user.created.exchange");
     assert_eq!(names.queue(), "iii.user.created.queue");
     assert_eq!(names.dlq(), "iii.user.created.dlq");
+    assert_eq!(names.function_queue("handler_a"), "iii.user.created.handler_a.queue");
+    assert_eq!(names.function_dlq("handler_a"), "iii.user.created.handler_a.dlq");
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@engine/src/modules/queue/adapters/rabbitmq/naming.rs` around lines 30 - 36, Add unit tests exercising the new naming methods function_queue and function_dlq to ensure they produce the expected strings; update or extend the existing test_rabbit_names (or add a new test) to construct a Naming instance with a known topic and call Naming::function_queue(function_id) and Naming::function_dlq(function_id), asserting the returned strings equal format!("{}.{}.{}.queue", EXCHANGE_PREFIX, topic, function_id) and format!("{}.{}.{}.dlq", EXCHANGE_PREFIX, topic, function_id) respectively so the methods are covered by the test suite.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@engine/src/modules/queue/adapters/builtin/adapter.rs`:
- Around line 368-384: The current dlq_messages implementation aggregates
results from multiple internal queues (constructed as iq = format!("{}::{}",
topic, fid)) by extending a Vec, which can yield up to count * num_functions
items; change dlq_messages to mirror dlq_peek's aggregation strategy: collect
all messages from each self.queue.dlq_messages(&iq, count).await into a single
Vec, sort them deterministically (e.g., by the same timestamp/field used in
dlq_peek) and then truncate/take only up to count before returning; keep the
same behavior for the single-topic branch (Ok(self.queue.dlq_messages(topic,
count).await)) and reuse topic_functions and iq naming to locate the code.
In `@engine/src/modules/queue/adapters/rabbitmq/adapter.rs`:
- Around line 310-318: The retry handler currently cannot route exhausted
messages to the per-function dead-letter queue because handle_failure lacks
function_id; update the design so exhausted messages are published to
names.function_dlq(function_id) instead of names.dlq(): either extend the retry
handler's handle_failure signature to accept function_id (propagate function_id
from setup_subscriber_queue into the retry handler and use it when publishing to
the DLQ), or remove/avoid creating per-function DLQs in setup_subscriber_queue
if a single topic-level DLQ is intended; changes should reference
setup_subscriber_queue, handle_failure, names.function_dlq, names.dlq and ensure
the code path that publishes exhausted messages uses the function_id-aware DLQ
when available.
In `@engine/tests/queue_e2e_fanout.rs`:
- Around line 101-125: The test registers triggers before the handlers, risking
missed messages; change fanout_two_functions_same_topic_both_receive so that
register_capturing_function("test::fn_a") and
register_capturing_function("test::fn_b") are called and awaited before calling
setup_engine_with_topic_triggers (or alternatively ensure
setup_engine_with_topic_triggers does not subscribe immediately), ensuring
handlers are registered prior to enqueue_to_topic; refer to the functions
register_capturing_function, setup_engine_with_topic_triggers, enqueue_to_topic
and the test function fanout_two_functions_same_topic_both_receive when making
the change.
In `@engine/tests/rabbitmq_queue_integration.rs`:
- Around line 1287-1356: The test rmq_fanout_dlq_per_function currently only
checks the success handler and that the failing function was invoked; update it
to also assert the failed message landed in the per-function DLQ by
querying/consuming the DLQ queue (name pattern:
iii.{topic}.test::rmq_dlq_fail.dlq) after waiting for retries, similar to the
logic in rmq_dlq_exhaustion_with_content_verification: use the same inspection
helper or implement a short consumer to fetch messages from that DLQ, verify
there is exactly one message (or expected count) and that its payload contains
{"data":"dlq-test"}, and add the corresponding assertions in
rmq_fanout_dlq_per_function.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 7bb16964-e5c0-40a9-aeb1-c865ffa78a7b
📒 Files selected for processing (7)
- engine/src/modules/queue/adapters/builtin/adapter.rs
- engine/src/modules/queue/adapters/rabbitmq/adapter.rs
- engine/src/modules/queue/adapters/rabbitmq/naming.rs
- engine/src/modules/queue/adapters/rabbitmq/topology.rs
- engine/src/modules/queue/adapters/rabbitmq/worker.rs
- engine/tests/queue_e2e_fanout.rs
- engine/tests/rabbitmq_queue_integration.rs
```diff
 async fn dlq_messages(
     &self,
     topic: &str,
     count: usize,
 ) -> anyhow::Result<Vec<serde_json::Value>> {
-    Ok(self.queue.dlq_messages(topic, count).await)
+    let tf = self.topic_functions.read().await;
+    if let Some(fids) = tf.get(topic) {
+        let mut all = Vec::new();
+        for fid in fids {
+            let iq = format!("{}::{}", topic, fid);
+            all.extend(self.queue.dlq_messages(&iq, count).await);
+        }
+        Ok(all)
+    } else {
+        Ok(self.queue.dlq_messages(topic, count).await)
+    }
 }
```
dlq_messages may return more than count messages when aggregating.
When multiple functions subscribe to a topic, this method calls self.queue.dlq_messages(&iq, count) for each function's internal queue, then extends all. This can return up to count * number_of_functions messages, exceeding the caller's limit.
Compare to dlq_peek (lines 589-601) which properly handles this by sorting and applying skip/take after aggregation.
🐛 Proposed fix

```diff
 async fn dlq_messages(
     &self,
     topic: &str,
     count: usize,
 ) -> anyhow::Result<Vec<serde_json::Value>> {
     let tf = self.topic_functions.read().await;
     if let Some(fids) = tf.get(topic) {
         let mut all = Vec::new();
         for fid in fids {
             let iq = format!("{}::{}", topic, fid);
-            all.extend(self.queue.dlq_messages(&iq, count).await);
+            let remaining = count.saturating_sub(all.len());
+            if remaining == 0 {
+                break;
+            }
+            all.extend(self.queue.dlq_messages(&iq, remaining).await);
         }
+        all.truncate(count);
         Ok(all)
     } else {
         Ok(self.queue.dlq_messages(topic, count).await)
     }
 }
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```rust
async fn dlq_messages(
    &self,
    topic: &str,
    count: usize,
) -> anyhow::Result<Vec<serde_json::Value>> {
    let tf = self.topic_functions.read().await;
    if let Some(fids) = tf.get(topic) {
        let mut all = Vec::new();
        for fid in fids {
            let iq = format!("{}::{}", topic, fid);
            let remaining = count.saturating_sub(all.len());
            if remaining == 0 {
                break;
            }
            all.extend(self.queue.dlq_messages(&iq, remaining).await);
        }
        all.truncate(count);
        Ok(all)
    } else {
        Ok(self.queue.dlq_messages(topic, count).await)
    }
}
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@engine/src/modules/queue/adapters/builtin/adapter.rs` around lines 368 - 384,
The current dlq_messages implementation aggregates results from multiple
internal queues (constructed as iq = format!("{}::{}", topic, fid)) by extending
a Vec, which can yield up to count * num_functions items; change dlq_messages to
mirror dlq_peek's aggregation strategy: collect all messages from each
self.queue.dlq_messages(&iq, count).await into a single Vec, sort them
deterministically (e.g., by the same timestamp/field used in dlq_peek) and then
truncate/take only up to count before returning; keep the same behavior for the
single-topic branch (Ok(self.queue.dlq_messages(topic, count).await)) and reuse
topic_functions and iq naming to locate the code.
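The capped aggregation proposed in the fix above can be exercised in isolation. This sketch substitutes plain `Vec<String>` DLQs for the adapter's real queue type; `dlq_messages_capped` is a hypothetical helper, not the adapter's API.

```rust
// Standalone sketch of the capped aggregation: take at most `count`
// messages across several per-function DLQs, never exceeding the limit.
fn dlq_messages_capped(per_function_dlqs: &[Vec<String>], count: usize) -> Vec<String> {
    let mut all = Vec::new();
    for dlq in per_function_dlqs {
        // Only ask each queue for what is still missing.
        let remaining = count.saturating_sub(all.len());
        if remaining == 0 {
            break;
        }
        all.extend(dlq.iter().take(remaining).cloned());
    }
    // Defensive cap, mirroring the proposed fix's truncate.
    all.truncate(count);
    all
}
```

With two functions holding two dead-lettered messages each, a `count` of 3 yields exactly 3 messages rather than the 6 the uncapped version could return.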
```rust
#[tokio::test]
async fn fanout_two_functions_same_topic_both_receive() {
    let topic = format!("fanout-{}", uuid::Uuid::new_v4());
    let engine = setup_engine_with_topic_triggers(
        &["test::fn_a", "test::fn_b"],
        &topic,
    )
    .await;

    let cap_a = register_capturing_function(&engine, "test::fn_a");
    let cap_b = register_capturing_function(&engine, "test::fn_b");

    enqueue_to_topic(&engine, &topic, json!({"msg": "hello"}))
        .await
        .expect("enqueue should succeed");

    tokio::time::sleep(Duration::from_secs(2)).await;

    let a = cap_a.lock().await;
    let b = cap_b.lock().await;
    assert_eq!(a.len(), 1, "fn_a should receive exactly 1 message, got {}", a.len());
    assert_eq!(b.len(), 1, "fn_b should receive exactly 1 message, got {}", b.len());
    assert_eq!(a[0]["msg"], "hello");
    assert_eq!(b[0]["msg"], "hello");
}
```
Function registration order is reversed — handlers registered after triggers.
In this test, setup_engine_with_topic_triggers registers the triggers (lines 104-108), then register_capturing_function registers the actual handlers (lines 110-111). If the trigger registration immediately subscribes and messages arrive before handlers are registered, those messages would fail to invoke the handler.
Compare to fanout_replicas_compete_within_function (lines 127-166) which correctly registers the handler (line 133) before registering triggers (lines 141-153).
This test may pass due to the 2-second sleep before enqueueing, but the pattern is fragile.
💡 Suggested fix

```diff
 async fn fanout_two_functions_same_topic_both_receive() {
     let topic = format!("fanout-{}", uuid::Uuid::new_v4());
+    iii::modules::observability::metrics::ensure_default_meter();
+    let engine = Arc::new(Engine::new());
+
+    let cap_a = register_capturing_function(&engine, "test::fn_a");
+    let cap_b = register_capturing_function(&engine, "test::fn_b");
+
-    let engine = setup_engine_with_topic_triggers(
-        &["test::fn_a", "test::fn_b"],
-        &topic,
-    )
-    .await;
+    let module = QueueCoreModule::create(engine.clone(), Some(builtin_queue_config()))
+        .await
+        .expect("QueueCoreModule::create should succeed");
+    module.register_functions(engine.clone());
+    module.initialize().await.expect("init should succeed");
-    let cap_a = register_capturing_function(&engine, "test::fn_a");
-    let cap_b = register_capturing_function(&engine, "test::fn_b");
+    // Register triggers after handlers are ready
+    for fid in &["test::fn_a", "test::fn_b"] {
+        engine.trigger_registry.register_trigger(Trigger { ... }).await.expect("...");
+    }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@engine/tests/queue_e2e_fanout.rs` around lines 101 - 125, The test registers
triggers before the handlers, risking missed messages; change
fanout_two_functions_same_topic_both_receive so that
register_capturing_function("test::fn_a") and
register_capturing_function("test::fn_b") are called and awaited before calling
setup_engine_with_topic_triggers (or alternatively ensure
setup_engine_with_topic_triggers does not subscribe immediately), ensuring
handlers are registered prior to enqueue_to_topic; refer to the functions
register_capturing_function, setup_engine_with_topic_triggers, enqueue_to_topic
and the test function fanout_two_functions_same_topic_both_receive when making
the change.
Actionable comments posted: 1
🧹 Nitpick comments (1)
engine/src/modules/queue/adapters/builtin/adapter.rs (1)
252-268: Consider potential ordering issue with separate write locks.

The code acquires and releases the `topic_functions` lock before acquiring the `trigger_function_map` and `subscriptions` locks. While functionally correct, if `enqueue` is called concurrently between these two blocks, it may miss the newly registered function. This is a minor race window since the subscription handle is only inserted after both maps are updated.

However, given the async nature and typical usage patterns (subscribe happens during initialization), this is unlikely to cause issues in practice.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@engine/src/modules/queue/adapters/builtin/adapter.rs` around lines 252 - 268, There is a small race between updating topic_functions and then updating trigger_function_map/subscriptions because the first write lock (topic_functions.write().await) is released before the other two are acquired; to fix, acquire the necessary write locks together in a single scope so updates are atomic: take topic_functions.write().await, trigger_function_map.write().await and subscriptions.write().await (or a combined lock) before performing the inserts for topic -> function_id, sub_key -> (topic,function_id) and sub_key -> handle, using the existing symbols topic_functions, trigger_function_map, subscriptions, topic, id, function_id, and handle to locate the code to change.
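The single-scope locking the prompt suggests can be sketched with `std::sync::RwLock` in place of the adapter's async locks. Field names mirror the review comment, but `SubscriptionState` and `subscribe` here are illustrative stand-ins, not the adapter's real code.

```rust
use std::collections::HashMap;
use std::sync::RwLock;

// Illustrative stand-in for the adapter's subscription state.
struct SubscriptionState {
    topic_functions: RwLock<HashMap<String, Vec<String>>>,
    trigger_function_map: RwLock<HashMap<String, (String, String)>>,
}

impl SubscriptionState {
    fn new() -> Self {
        Self {
            topic_functions: RwLock::new(HashMap::new()),
            trigger_function_map: RwLock::new(HashMap::new()),
        }
    }

    fn subscribe(&self, sub_key: &str, topic: &str, function_id: &str) {
        // Take both write locks before mutating either map, so no concurrent
        // reader can observe topic_functions updated while
        // trigger_function_map is not.
        let mut tf = self.topic_functions.write().unwrap();
        let mut tfm = self.trigger_function_map.write().unwrap();
        tf.entry(topic.to_string())
            .or_default()
            .push(function_id.to_string());
        tfm.insert(
            sub_key.to_string(),
            (topic.to_string(), function_id.to_string()),
        );
        // Both guards drop together at end of scope, publishing a
        // consistent view of the two maps.
    }
}
```

Note that holding multiple locks requires a fixed acquisition order everywhere (here: `topic_functions` before `trigger_function_map`) to avoid deadlock.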
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@engine/tests/rabbitmq_queue_integration.rs`:
- Around line 1287-1356: The test fails because topic fanout subscriber queues
created by setup_subscriber_queue() do not create per-function DLX/retry
exchanges like setup_function_queue() does, so publisher.requeue() →
publisher.publish() republishes into the main fanout exchange and redelivers
retries to all subscribers; to fix, make setup_subscriber_queue() create the
same per-function DLX + retry exchange/queue chain (unique to the
function/trigger) and change publisher.requeue()/publish() logic to route
requeued messages to that per-function retry exchange (not the main fanout
exchange) for topic/fanout subscriptions so retries are delivered only to the
originating function.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: bb114904-db48-423a-b1e9-5d9f88346a19
📒 Files selected for processing (7)
- engine/src/modules/queue/adapters/builtin/adapter.rs
- engine/src/modules/queue/adapters/rabbitmq/adapter.rs
- engine/src/modules/queue/adapters/rabbitmq/naming.rs
- engine/src/modules/queue/adapters/rabbitmq/topology.rs
- engine/src/modules/queue/adapters/rabbitmq/worker.rs
- engine/tests/queue_e2e_fanout.rs
- engine/tests/rabbitmq_queue_integration.rs
✅ Files skipped from review due to trivial changes (1)
- engine/tests/queue_e2e_fanout.rs
🚧 Files skipped from review as they are similar to previous changes (3)
- engine/src/modules/queue/adapters/rabbitmq/worker.rs
- engine/src/modules/queue/adapters/rabbitmq/naming.rs
- engine/src/modules/queue/adapters/rabbitmq/topology.rs
…xchange

When a subscriber failed and the retry handler requeued the message, it published back to the topic's fanout exchange, causing ALL subscribers to receive the retried copy. With max_attempts=3 the success function would get 3 messages instead of 1. Similarly, exhausted messages went to the topic-level DLQ instead of the per-function DLQ.

Now requeue() and publish_to_dlq() accept an optional function_id so the worker can target the specific per-function queue/DLQ directly, bypassing the fanout exchange.
Actionable comments posted: 1
🧹 Nitpick comments (3)
engine/src/modules/queue/adapters/builtin/adapter.rs (1)
203-219: Consider releasing the read lock before pushing to queues.

The read lock on `topic_functions` is held across multiple `await` points while pushing to each function's queue. Under high fanout (many functions per topic), this blocks `subscribe`/`unsubscribe` writers for the duration of all pushes.

♻️ Proposed optimization to release the lock earlier
```diff
 async fn enqueue(
     &self,
     topic: &str,
     data: Value,
     traceparent: Option<String>,
     baggage: Option<String>,
 ) {
-    let tf = self.topic_functions.read().await;
-    if let Some(function_ids) = tf.get(topic) {
-        for fid in function_ids {
-            let internal_queue = format!("{}::{}", topic, fid);
-            self.queue
-                .push(
-                    &internal_queue,
-                    data.clone(),
-                    traceparent.clone(),
-                    baggage.clone(),
-                )
-                .await;
-        }
-    } else {
+    let internal_queues: Vec<String> = {
+        let tf = self.topic_functions.read().await;
+        tf.get(topic)
+            .map(|fids| fids.iter().map(|fid| format!("{}::{}", topic, fid)).collect())
+            .unwrap_or_default()
+    };
+
+    if internal_queues.is_empty() {
         self.queue.push(topic, data, traceparent, baggage).await;
+    } else {
+        for internal_queue in internal_queues {
+            self.queue
+                .push(
+                    &internal_queue,
+                    data.clone(),
+                    traceparent.clone(),
+                    baggage.clone(),
+                )
+                .await;
+        }
     }
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@engine/src/modules/queue/adapters/builtin/adapter.rs` around lines 203 - 219, The read lock on topic_functions (obtained via self.topic_functions.read().await and bound to tf) is held across async await points in the loop calling self.queue.push, which blocks subscribe/unsubscribe writers; fix by acquiring the read lock, clone/collect the function id list (e.g., let function_ids = tf.get(topic).cloned() or function_ids = tf.get(topic).map(|v| v.clone())), then drop the lock (end the scope) and iterate over the cloned function_ids calling self.queue.push with internal_queue for each fid; keep the fallback self.queue.push(topic, ...) when None.

engine/src/modules/queue/adapters/rabbitmq/topology.rs (1)
70-115: The per-function queue should use DLX for consistency with `setup_function_queue`, but note that the current implementation is safe.

The current design uses manual DLQ publishing via `handle_failure()` and `publish_to_dlq()` in the retry handler. This works correctly: the only `nack(requeue=false)` call in the worker (lines 176-179) is always followed by `handle_failure()` at lines 183-186, which routes the message to the DLQ.

However, `setup_function_queue` uses `x-dead-letter-exchange` for automatic DLX-based routing (lines 138-140), while `setup_subscriber_queue` does not. This inconsistency could become a maintenance risk if future code changes bypass the `handle_failure()` path.

Consider adding `x-dead-letter-exchange` to `setup_subscriber_queue` to match the main queue pattern and make the DLQ routing automatic and defensive:

Suggested diff
```diff
 self.channel
     .queue_declare(
         &queue_name,
         QueueDeclareOptions {
             durable: true,
             ..Default::default()
         },
-        FieldTable::default(),
+        {
+            let mut args = FieldTable::default();
+            args.insert(
+                "x-dead-letter-exchange".into(),
+                AMQPValue::LongString("".into()),
+            );
+            args.insert(
+                "x-dead-letter-routing-key".into(),
+                AMQPValue::LongString(dlq_name.clone().into()),
+            );
+            args
+        },
     )
     .await?;
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@engine/src/modules/queue/adapters/rabbitmq/topology.rs` around lines 70 - 115, The subscriber queue declaration in setup_subscriber_queue should include the same x-dead-letter-exchange argument used by setup_function_queue so DLQ routing becomes automatic; update the queue_declare call that creates queue_name in setup_subscriber_queue to build a FieldTable containing the "x-dead-letter-exchange" entry pointing to the function DLQ (use names.function_dlq(function_id) or the same exchange identifier used by setup_function_queue) so the queue is declared with the DLX enabled rather than relying solely on manual handle_failure()/publish_to_dlq().

engine/src/modules/queue/adapters/rabbitmq/adapter.rs (1)
243-274: Document the fanout exchange pub-sub semantics: messages enqueued before subscribers exist will be silently dropped.

The `enqueue` method calls `setup_topic` (which only declares the fanout exchange) but not `setup_subscriber_queue`. In RabbitMQ, messages published to a fanout exchange with no bound queues are immediately discarded. This is standard pub-sub behavior but differs from queue-based semantics where messages could accumulate.

The tests never exercise this scenario; all register subscribers via `setup_engine_with_topic_triggers()` before enqueuing. If the system may enqueue messages before subscribers are active, add:
- A note in documentation explaining this behavior
- A warning log when publishing to a topic with no active subscriptions
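A minimal sketch of such a guard, assuming the caller can obtain a subscriber count before publishing; the `check_subscribers` helper and its message are hypothetical, not part of the adapter.

```rust
// Hypothetical guard: return a warning message when enqueueing to a topic
// with no bound subscriber queues, since a fanout exchange with no bindings
// silently discards the message.
fn check_subscribers(topic: &str, subscriber_count: usize) -> Option<String> {
    if subscriber_count == 0 {
        Some(format!(
            "queue: publishing to topic '{}' with no active subscriptions; \
             the fanout exchange will discard the message",
            topic
        ))
    } else {
        None
    }
}
```

The caller would log the returned string (e.g. via `tracing::warn!`) rather than failing the enqueue, preserving fire-and-forget semantics.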
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@engine/src/modules/queue/adapters/rabbitmq/adapter.rs` around lines 243 - 274, The enqueue path (enqueue calls topology.setup_topic but not setup_subscriber_queue) means publishing to a fanout exchange will silently drop messages if no queues are bound; update docs to state fanout pub-sub semantics (messages published before subscribers bind are discarded) and add runtime detection + warning: extend or use a topology check (e.g., add/use a topology.has_subscribers(topic) or similar) before publisher.publish in enqueue and emit a warning log (via tracing::warn!) when no subscribers are present; ensure the log references topic and job.id and keep existing success/error logs unchanged.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@engine/src/modules/queue/adapters/builtin/adapter.rs`:
- Around line 291-307: There's a race where we check trigger_function_map
(still_has_triggers) before taking the topic_functions write lock, so a
concurrent subscribe can add the (mapped_topic, mapped_fid) after the check but
before removal; fix by moving the existence re-check into the critical section:
after acquiring the topic_functions write lock (tf) re-acquire or re-check
trigger_function_map (or re-compute still_has_triggers under the write lock) and
only then remove mapped_fid from tf if no triggers exist. Use the same
identifiers from the diff (removed_mapping, mapped_topic, mapped_fid,
trigger_function_map, topic_functions, tfm, tf) and ensure the final decision to
remove is based on the re-check performed while holding tf to prevent the race.
---
Nitpick comments:
In `@engine/src/modules/queue/adapters/builtin/adapter.rs`:
- Around line 203-219: The read lock on topic_functions (obtained via
self.topic_functions.read().await and bound to tf) is held across async await
points in the loop calling self.queue.push, which blocks subscribe/unsubscribe
writers; fix by acquiring the read lock, clone/collect the function id list
(e.g., let function_ids = tf.get(topic).cloned() or function_ids =
tf.get(topic).map(|v| v.clone())), then drop the lock (end the scope) and
iterate over the cloned function_ids calling self.queue.push with internal_queue
for each fid; keep the fallback self.queue.push(topic, ...) when None.
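The clone-then-release pattern suggested above can be sketched like this; std locks stand in for `tokio::sync::RwLock`, and the queue push is simulated, so the names are illustrative rather than the adapter's real code:

```rust
use std::collections::{HashMap, HashSet};
use std::sync::RwLock;

// Fan out a message without holding the read lock across the per-function
// work; returns the queue names that would be pushed to.
fn fan_out(
    topic_functions: &RwLock<HashMap<String, HashSet<String>>>,
    topic: &str,
) -> Vec<String> {
    // Hold the read lock only long enough to clone the id set.
    let function_ids: Option<HashSet<String>> = {
        let tf = topic_functions.read().unwrap();
        tf.get(topic).cloned()
    }; // guard dropped here; subscribe/unsubscribe writers are no longer blocked

    let mut delivered = Vec::new();
    match function_ids {
        Some(fids) => {
            for fid in fids {
                // real code: self.queue.push(&format!("{topic}::{fid}"), msg).await
                delivered.push(format!("{topic}::{fid}"));
            }
        }
        // fallback push to the bare topic when no mapping exists
        None => delivered.push(topic.to_string()),
    }
    delivered
}

fn main() {
    let map = RwLock::new(HashMap::from([(
        "orders".to_string(),
        HashSet::from(["fn_1".to_string(), "fn_2".to_string()]),
    )]));
    let mut targets = fan_out(&map, "orders");
    targets.sort();
    assert_eq!(targets, vec!["orders::fn_1", "orders::fn_2"]);
    assert_eq!(fan_out(&map, "ghost"), vec!["ghost"]);
}
```

The trade-off is a snapshot semantics: a function subscribed after the clone misses this message, which is usually acceptable and far cheaper than blocking writers for the duration of every push.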
In `@engine/src/modules/queue/adapters/rabbitmq/adapter.rs`:
- Around line 243-274: The enqueue path (enqueue calls topology.setup_topic but
not setup_subscriber_queue) means publishing to a fanout exchange will silently
drop messages if no queues are bound; update docs to state fanout pub-sub
semantics (messages published before subscribers bind are discarded) and add
runtime detection + warning: extend or use a topology check (e.g., add/use a
topology.has_subscribers(topic) or similar) before publisher.publish in enqueue
and emit a warning log (via tracing::warn!) when no subscribers are present;
ensure the log references topic and job.id and keep existing success/error logs
unchanged.
In `@engine/src/modules/queue/adapters/rabbitmq/topology.rs`:
- Around line 70-115: The subscriber queue declaration in setup_subscriber_queue
should include the same x-dead-letter-exchange argument used by
setup_function_queue so DLQ routing becomes automatic; update the queue_declare
call that creates queue_name in setup_subscriber_queue to build a FieldTable
containing the "x-dead-letter-exchange" entry pointing to the function DLQ (use
names.function_dlq(function_id) or the same exchange identifier used by
setup_function_queue) so the queue is declared with the DLX enabled rather than
relying solely on manual handle_failure()/publish_to_dlq().
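The argument table the topology comment describes can be sketched as follows; a plain map stands in for lapin's `FieldTable`, and the `function_dlq` naming is an assumption mirroring `names.function_dlq(function_id)`, not the crate's real type:

```rust
use std::collections::BTreeMap;

// Hypothetical DLQ naming, mirroring what RabbitNames::function_dlq might return.
fn function_dlq(function_id: &str) -> String {
    format!("{function_id}.dlq")
}

// Build the arguments the subscriber queue_declare would carry so rejected
// messages are dead-lettered automatically instead of via manual publish_to_dlq().
fn subscriber_queue_args(function_id: &str) -> BTreeMap<String, String> {
    let mut args = BTreeMap::new();
    args.insert(
        "x-dead-letter-exchange".to_string(),
        function_dlq(function_id),
    );
    args
}

fn main() {
    let args = subscriber_queue_args("fn_42");
    assert_eq!(args["x-dead-letter-exchange"], "fn_42.dlq");
}
```

With the DLX declared on the queue itself, a basic.reject/nack with `requeue=false` routes to the DLQ broker-side, so the manual failure path becomes a fallback rather than the only route.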
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 494f690d-16a3-4836-a6be-8a24f463cc60
📒 Files selected for processing (8)
- engine/src/modules/queue/adapters/builtin/adapter.rs
- engine/src/modules/queue/adapters/rabbitmq/adapter.rs
- engine/src/modules/queue/adapters/rabbitmq/publisher.rs
- engine/src/modules/queue/adapters/rabbitmq/retry.rs
- engine/src/modules/queue/adapters/rabbitmq/topology.rs
- engine/src/modules/queue/adapters/rabbitmq/worker.rs
- engine/tests/queue_e2e_fanout.rs
- engine/tests/rabbitmq_queue_integration.rs
✅ Files skipped from review due to trivial changes (1)
- engine/tests/rabbitmq_queue_integration.rs
🚧 Files skipped from review as they are similar to previous changes (1)
- engine/tests/queue_e2e_fanout.rs
```rust
if let Some((mapped_topic, mapped_fid)) = removed_mapping {
    let still_has_triggers = {
        let tfm = self.trigger_function_map.read().await;
        tfm.values()
            .any(|(t, f)| t == &mapped_topic && f == &mapped_fid)
    };

    if !still_has_triggers {
        let mut tf = self.topic_functions.write().await;
        if let Some(fids) = tf.get_mut(&mapped_topic) {
            fids.remove(&mapped_fid);
            if fids.is_empty() {
                tf.remove(&mapped_topic);
            }
        }
    }
}
```
Race condition: concurrent subscribe can be orphaned between still_has_triggers check and topic_functions removal.
Between checking trigger_function_map (line 293-296) and removing from topic_functions (line 299-304), another thread can subscribe with the same (topic, function_id). That subscription would add entries to both maps, but then this thread removes the function from topic_functions, causing future enqueue calls to skip the new subscription's queue.
🔒 Proposed fix: re-check under write lock

```diff
 if let Some((mapped_topic, mapped_fid)) = removed_mapping {
-    let still_has_triggers = {
-        let tfm = self.trigger_function_map.read().await;
-        tfm.values()
-            .any(|(t, f)| t == &mapped_topic && f == &mapped_fid)
-    };
-
-    if !still_has_triggers {
-        let mut tf = self.topic_functions.write().await;
+    let mut tf = self.topic_functions.write().await;
+    let tfm = self.trigger_function_map.read().await;
+    let still_has_triggers = tfm
+        .values()
+        .any(|(t, f)| t == &mapped_topic && f == &mapped_fid);
+    drop(tfm);
+
+    if !still_has_triggers {
         if let Some(fids) = tf.get_mut(&mapped_topic) {
             fids.remove(&mapped_fid);
             if fids.is_empty() {
                 tf.remove(&mapped_topic);
             }
         }
     }
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@engine/src/modules/queue/adapters/builtin/adapter.rs` around lines 291 - 307,
There's a race where we check trigger_function_map (still_has_triggers) before
taking the topic_functions write lock, so a concurrent subscribe can add the
(mapped_topic, mapped_fid) after the check but before removal; fix by moving the
existence re-check into the critical section: after acquiring the
topic_functions write lock (tf) re-acquire or re-check trigger_function_map (or
re-compute still_has_triggers under the write lock) and only then remove
mapped_fid from tf if no triggers exist. Use the same identifiers from the diff
(removed_mapping, mapped_topic, mapped_fid, trigger_function_map,
topic_functions, tfm, tf) and ensure the final decision to remove is based on
the re-check performed while holding tf to prevent the race.
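The corrected ordering can be demonstrated with std locks standing in for `tokio::sync::RwLock` (the function name and map shapes below are illustrative, not the adapter's actual signatures):

```rust
use std::collections::{HashMap, HashSet};
use std::sync::RwLock;

// Take the topic_functions write lock first, then re-check
// trigger_function_map while holding it, and only then remove the mapping.
// Any subscribe that completed before we acquired `tf` is now visible,
// so it cannot be orphaned by this removal.
fn remove_if_orphaned(
    topic_functions: &RwLock<HashMap<String, HashSet<String>>>,
    trigger_function_map: &RwLock<HashMap<String, (String, String)>>,
    mapped_topic: &str,
    mapped_fid: &str,
) {
    let mut tf = topic_functions.write().unwrap();
    let still_has_triggers = {
        let tfm = trigger_function_map.read().unwrap();
        tfm.values()
            .any(|(t, f)| t == mapped_topic && f == mapped_fid)
    };
    if !still_has_triggers {
        if let Some(fids) = tf.get_mut(mapped_topic) {
            fids.remove(mapped_fid);
            if fids.is_empty() {
                tf.remove(mapped_topic);
            }
        }
    }
}

fn main() {
    let tf = RwLock::new(HashMap::from([(
        "orders".to_string(),
        HashSet::from(["fn_1".to_string()]),
    )]));
    // No trigger still references (orders, fn_1), so the mapping is removed.
    let tfm: RwLock<HashMap<String, (String, String)>> = RwLock::new(HashMap::new());
    remove_if_orphaned(&tf, &tfm, "orders", "fn_1");
    assert!(!tf.read().unwrap().contains_key("orders"));
}
```

Note the lock order (topic_functions before trigger_function_map) must be consistent everywhere both locks are held, or the fix trades the race for a potential deadlock.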
Multiple distinct functions subscribing to the same topic now each receive every message (fan-out) instead of competing. Updated JS and Python integration tests to assert per-function delivery of all messages.
Summary
`registerTrigger({ type: 'queue', config: { topic } })` works identically

What changed
Builtin adapter
- Per-function internal queues named `{topic}::{function_id}` (one per function)
- `enqueue` fans out to all per-function queues registered on that topic
- `subscribe`/`unsubscribe` track a `topic -> function_ids` mapping
- `list_topics`, `topic_stats`, `dlq_peek` aggregate across per-function queues

RabbitMQ adapter
- Per-function queues (`iii.{topic}.{function_id}.queue`) + DLQ bound to the fanout exchange

Bridge adapter
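The per-function queue naming and the stat aggregation in the builtin adapter can be sketched as follows; the queue/depth shapes are assumptions for illustration, not the adapter's real types:

```rust
use std::collections::HashMap;

// Per-function internal queue name, one per (topic, function) pair.
fn queue_name(topic: &str, function_id: &str) -> String {
    format!("{topic}::{function_id}")
}

// topic_stats-style aggregation: sum depths across every per-function
// queue whose name starts with "{topic}::".
fn topic_depth(queues: &HashMap<String, usize>, topic: &str) -> usize {
    let prefix = format!("{topic}::");
    queues
        .iter()
        .filter(|(name, _)| name.starts_with(&prefix))
        .map(|(_, depth)| depth)
        .sum()
}

fn main() {
    let mut queues = HashMap::new();
    queues.insert(queue_name("orders", "fn_1"), 3);
    queues.insert(queue_name("orders", "fn_2"), 2);
    queues.insert(queue_name("billing", "fn_1"), 7);

    assert_eq!(topic_depth(&queues, "orders"), 5);
    assert_eq!(topic_depth(&queues, "billing"), 7);
}
```

Because each function gets its own queue, every subscriber on a topic receives every message (fan-out), while replicas of the same function share one queue and still compete for work.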
Test plan
- Builtin E2E (7 tests, all passing)
- RabbitMQ E2E (3 tests, behind the `rabbitmq` feature flag)
- Existing tests (32 tests, all passing)
Summary by CodeRabbit
New Features
Tests