
feat: queue fan-out per function #1384

Merged
ytallo merged 5 commits into main from feat/queue-fanout-per-function
Mar 31, 2026

Conversation

@ytallo
Contributor

@ytallo ytallo commented Mar 31, 2026

Summary

  • Queue topic subscriptions now deliver messages to every distinct function subscribed to that topic (fan-out), while replicas of the same function compete on a shared per-function queue
  • No user-facing API changes -- registerTrigger({ type: 'queue', config: { topic } }) works identically

What changed

Builtin adapter

  • Internal queue name: {topic}::{function_id} (one per function)
  • enqueue fans out to all per-function queues registered on that topic
  • subscribe/unsubscribe track topic -> function_ids mapping
  • DLQ, list_topics, topic_stats, dlq_peek aggregate across per-function queues

RabbitMQ adapter

  • Topic exchange changed from topic to fanout
  • Each subscriber function gets its own queue (iii.{topic}.{function_id}.queue) + DLQ bound to the fanout exchange
  • RabbitMQ natively delivers copies to all bound queues
  • Worker accepts queue name directly

Bridge adapter

  • No changes needed -- fan-out handled by the remote engine's adapter

Test plan

Builtin E2E (7 tests, all passing)

  • Two functions on same topic both receive each message
  • Replicas of same function compete (only one processes)
  • Mixed functions + replicas (5 messages, both functions get all 5)
  • Single subscriber regression
  • Unsubscribe stops delivery to that function only
  • Payload integrity (both functions get identical data)
  • Condition function filtering (one filtered, one not)

RabbitMQ E2E (3 tests, behind rabbitmq feature flag)

  • Two functions on same topic both receive
  • Replicas compete within function
  • Per-function DLQ isolation (failing function doesn't affect success function)

Existing tests (32 tests, all passing)

  • queue_integration (12 tests)
  • queue_e2e_happy_path (2 tests)
  • queue_e2e_failure_retry (3 tests)
  • queue_e2e_concurrency_resilience (4 tests)
  • dlq_redrive_e2e (11 tests)

Summary by CodeRabbit

  • New Features

    • Per-function fan-out: messages can be routed to per-function subscriber queues and per-function DLQs; retries, requeues and DLQ publishes respect function-level routing and conditional delivery.
    • Topic views and DLQ operations now aggregate across per-function queues for accurate stats and listings.
  • Tests

    • Added end-to-end and RabbitMQ integration tests covering fan-out, replica competition, conditional delivery, delivery guarantees, and per-function DLQ behavior.

@vercel
Contributor

vercel bot commented Mar 31, 2026

Deployment failed with the following error:

Invalid request: `attribution.gitUser` should NOT have additional property `isBot`.

@coderabbitai
Contributor

coderabbitai bot commented Mar 31, 2026

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: f7e1f535-8b81-449c-8a2e-ca93b05fbd61

📥 Commits

Reviewing files that changed from the base of the PR and between 8062c9f and af2475a.

📒 Files selected for processing (2)
  • frameworks/motia/motia-js/packages/motia/__tests__/integration/queue.integration.test.ts
  • frameworks/motia/motia-py/packages/motia/tests/test_queue_integration.py

📝 Walkthrough


Builtin adapter adds per-topic-per-function routing state and fans out messages to per-function queues; RabbitMQ switches to a fanout exchange, creates per-function queues/DLQs, and routes/retries by function_id; publisher/retry/worker updated; new e2e and integration tests validate fanout and replica semantics.

Changes

  • Builtin adapter (fanout state & routing): engine/src/modules/queue/adapters/builtin/adapter.rs
    Adds topic_functions and trigger_function_map; enqueue fans out to {topic}::{function_id} when mappings exist; subscribe/unsubscribe manage mappings; DLQ, listing, stats, and dlq_peek aggregate across per-function queues.
  • RabbitMQ adapter — subscribe & worker: engine/src/modules/queue/adapters/rabbitmq/adapter.rs, engine/src/modules/queue/adapters/rabbitmq/worker.rs
    subscribe sets up a per-function subscriber queue and passes queue_name to the worker; Worker::run consumes from the provided queue_name and forwards function_id into retry handling.
  • RabbitMQ naming & topology: engine/src/modules/queue/adapters/rabbitmq/naming.rs, engine/src/modules/queue/adapters/rabbitmq/topology.rs
    Added function_queue and function_dlq name helpers; setup_topic now declares a Fanout exchange; new setup_subscriber_queue(topic, function_id) declares a per-function DLQ and queue and binds it to the fanout exchange.
  • RabbitMQ publisher & retry: engine/src/modules/queue/adapters/rabbitmq/publisher.rs, engine/src/modules/queue/adapters/rabbitmq/retry.rs
    requeue and publish_to_dlq accept function_id: Option<&str> and route requeues/DLQ publishes to per-function targets when provided; RetryHandler::handle_failure propagates function_id.
  • Tests — builtin e2e & RabbitMQ integration: engine/tests/queue_e2e_fanout.rs, engine/tests/rabbitmq_queue_integration.rs
    Added comprehensive e2e tests for fanout, replica competition, unsubscribe behavior, payload integrity, and conditional delivery; added RabbitMQ integration tests validating per-function fanout behavior.
  • Client SDK tests updated: frameworks/motia/.../queue.integration.test.ts, frameworks/motia/.../test_queue_integration.py
    Updated integration tests to expect fan-out semantics (each subscriber receives all messages) and renamed tests accordingly.

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant Builtin as BuiltinAdapter\n(topic_functions, trigger_function_map)
    participant IQ1 as InternalQueue\n(topic::fn_1)
    participant IQ2 as InternalQueue\n(topic::fn_2)
    participant F1 as Function1
    participant F2 as Function2

    Client->>Builtin: enqueue(message, topic)
    activate Builtin
    Builtin->>Builtin: lookup functions for topic
    deactivate Builtin

    par Fan-out
        Builtin->>IQ1: push(message)
        activate IQ1
        IQ1->>F1: deliver
        deactivate IQ1
    and
        Builtin->>IQ2: push(message)
        activate IQ2
        IQ2->>F2: deliver
        deactivate IQ2
    end
sequenceDiagram
    participant Subscriber
    participant Adapter as RabbitMQAdapter
    participant Names as RabbitNames
    participant Topology as TopologyManager
    participant Exchange as FanoutExchange
    participant PFQ as PerFunctionQueue

    Subscriber->>Adapter: subscribe(topic, function_id)
    activate Adapter
    Adapter->>Names: function_queue(function_id)
    Names-->>Adapter: per_function_queue_name
    Adapter->>Topology: setup_subscriber_queue(topic, function_id)
    Topology->>Exchange: ensure fanout exchange declared
    Topology->>PFQ: declare DLQ and per-function queue, bind to exchange
    Adapter->>Adapter: spawn worker with queue_name = per_function_queue_name
    Adapter-->>Subscriber: subscription handle
    deactivate Adapter

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

Suggested reviewers

  • sergiofilhowz
  • guibeira

Poem

🐇 I thumped a message on the ground,
it split and hopped to queues all round.
Each function found its little share,
nibbling payloads with rabbit flair.
Fanout hops — deliveries everywhere! 🎉

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

  • Docstring Coverage ⚠️ Warning: docstring coverage is 32.56%, below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them.
✅ Passed checks (2 passed)
  • Description Check ✅ Passed: check skipped because CodeRabbit’s high-level summary is enabled.
  • Title Check ✅ Passed: the title 'feat: queue fan-out per function' accurately and concisely describes the main change, implementing fan-out queue delivery on a per-function basis.


ytallo added 3 commits March 31, 2026 18:00
Each distinct function_id subscribed to a topic now gets its own
internal queue ({topic}::{function_id}). Enqueue fans out to all
per-function queues. Replicas of the same function compete on their
shared queue. DLQ, topic_stats, and list_topics aggregate across
per-function queues.

Topic exchange changed from topic to fanout. Each subscriber function
gets its own queue (iii.{topic}.{function_id}.queue) and DLQ bound to
the fanout exchange. RabbitMQ natively delivers a copy of each message
to every bound queue. Worker now accepts queue_name directly instead
of computing from topic.

7 builtin adapter tests: two-function fanout, replica competition,
mixed functions+replicas, single subscriber regression, unsubscribe
stops delivery, payload integrity, condition function filtering.

3 RabbitMQ adapter tests (behind rabbitmq feature): two-function
fanout, replica competition, per-function DLQ isolation.
@ytallo ytallo force-pushed the feat/queue-fanout-per-function branch from 15cd83d to 1989db4 on March 31, 2026 at 21:00
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 4

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
engine/src/modules/queue/adapters/rabbitmq/topology.rs (1)

91-100: ⚠️ Potential issue | 🔴 Critical

Per-function queue missing x-dead-letter-exchange configuration — messages won't route to DLQ.

The per-function queue is declared with FieldTable::default(), which means it has no dead-letter-exchange configured. When messages are nack'd with requeue: false, RabbitMQ will discard them instead of routing to the per-function DLQ.

Compare to setup_function_queue (lines 140-154) which correctly configures DLX:

let mut main_queue_args = FieldTable::default();
main_queue_args.insert(
    "x-dead-letter-exchange".into(),
    AMQPValue::LongString(names.dlq_exchange().into()),
);

The DLQ queue is created (lines 80-89) but won't receive any messages without the DLX binding.

🐛 Proposed fix
+        // Configure dead-letter exchange so nack(requeue=false) routes to DLQ
+        let mut queue_args = FieldTable::default();
+        // For per-function queues, we publish directly to the DLQ queue
+        // since there's no per-function DLQ exchange
+        queue_args.insert(
+            "x-dead-letter-routing-key".into(),
+            AMQPValue::LongString(dlq_name.clone().into()),
+        );
+
         self.channel
             .queue_declare(
                 &queue_name,
                 QueueDeclareOptions {
                     durable: true,
                     ..Default::default()
                 },
-                FieldTable::default(),
+                queue_args,
             )
             .await?;

Note: You'll also need to create a DLQ exchange and bind the DLQ queue to it, similar to the pattern in setup_function_queue.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@engine/src/modules/queue/adapters/rabbitmq/topology.rs` around lines 91 -
100, The per-function queue declaration uses FieldTable::default() and lacks the
x-dead-letter-exchange argument, so nacked messages will be discarded instead of
routed to the DLQ; update the queue declaration in the same block that calls
queue_declare (where queue_name is used) to build a FieldTable (like in
setup_function_queue) and insert ("x-dead-letter-exchange",
AMQPValue::LongString(names.dlq_exchange().into())) before calling
queue_declare, and also ensure the DLQ exchange and binding for the DLQ queue
(created earlier) are created/bound similarly to the pattern used in
setup_function_queue.
🧹 Nitpick comments (2)
engine/tests/rabbitmq_queue_integration.rs (1)

1121-1171: Consider reusing existing helpers from common::queue_helpers.

register_rmq_capturing_function and register_rmq_counting_fn appear to duplicate functionality from register_payload_capturing_function and register_counting_function already defined in common::queue_helpers (imported on lines 26-31).

💡 Potential consolidation
-fn register_rmq_capturing_function(
-    engine: &Arc<Engine>,
-    function_id: &str,
-) -> Arc<Mutex<Vec<Value>>> {
-    // ... implementation ...
-}
-
-fn register_rmq_counting_fn(
-    engine: &Arc<Engine>,
-    function_id: &str,
-) -> Arc<AtomicU64> {
-    // ... implementation ...
-}
+// Use existing helpers from common::queue_helpers
+// let cap_a = register_payload_capturing_function(&engine, "test::rmq_fan_a", captured_a.clone());
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@engine/tests/rabbitmq_queue_integration.rs` around lines 1121 - 1171, The two
local helpers register_rmq_capturing_function and register_rmq_counting_fn
duplicate functionality already provided by common::queue_helpers
(register_payload_capturing_function and register_counting_function); replace
the local definitions by calling those helpers instead: remove
register_rmq_capturing_function and register_rmq_counting_fn, and where they are
used call register_payload_capturing_function(engine, function_id) to return the
Arc<Mutex<Vec<Value>>> and register_counting_function(engine, function_id) to
return the Arc<AtomicU64>, ensuring the types/signatures match and any necessary
imports from common::queue_helpers remain in scope.
engine/src/modules/queue/adapters/rabbitmq/naming.rs (1)

30-36: Consider adding unit tests for the new naming methods.

The new function_queue and function_dlq methods follow the existing naming pattern, but lack dedicated unit tests in this file. The existing test_rabbit_names test could be extended to cover these:

💡 Suggested test addition
 #[test]
 fn test_rabbit_names() {
     let names = RabbitNames::new("user.created");

     assert_eq!(names.exchange(), "iii.user.created.exchange");
     assert_eq!(names.queue(), "iii.user.created.queue");
     assert_eq!(names.dlq(), "iii.user.created.dlq");
+    assert_eq!(names.function_queue("handler_a"), "iii.user.created.handler_a.queue");
+    assert_eq!(names.function_dlq("handler_a"), "iii.user.created.handler_a.dlq");
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@engine/src/modules/queue/adapters/rabbitmq/naming.rs` around lines 30 - 36,
Add unit tests exercising the new naming methods function_queue and function_dlq
to ensure they produce the expected strings; update or extend the existing
test_rabbit_names (or add a new test) to construct a Naming instance with a
known topic and call Naming::function_queue(function_id) and
Naming::function_dlq(function_id), asserting the returned strings equal
format!("{}.{}.{}.queue", EXCHANGE_PREFIX, topic, function_id) and
format!("{}.{}.{}.dlq", EXCHANGE_PREFIX, topic, function_id) respectively so the
methods are covered by the test suite.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@engine/src/modules/queue/adapters/builtin/adapter.rs`:
- Around line 368-384: The current dlq_messages implementation aggregates
results from multiple internal queues (constructed as iq = format!("{}::{}",
topic, fid)) by extending a Vec, which can yield up to count * num_functions
items; change dlq_messages to mirror dlq_peek's aggregation strategy: collect
all messages from each self.queue.dlq_messages(&iq, count).await into a single
Vec, sort them deterministically (e.g., by the same timestamp/field used in
dlq_peek) and then truncate/take only up to count before returning; keep the
same behavior for the single-topic branch (Ok(self.queue.dlq_messages(topic,
count).await)) and reuse topic_functions and iq naming to locate the code.

In `@engine/src/modules/queue/adapters/rabbitmq/adapter.rs`:
- Around line 310-318: The retry handler currently cannot route exhausted
messages to the per-function dead-letter queue because handle_failure lacks
function_id; update the design so exhausted messages are published to
names.function_dlq(function_id) instead of names.dlq(): either extend the retry
handler's handle_failure signature to accept function_id (propagate function_id
from setup_subscriber_queue into the retry handler and use it when publishing to
the DLQ), or remove/avoid creating per-function DLQs in setup_subscriber_queue
if a single topic-level DLQ is intended; changes should reference
setup_subscriber_queue, handle_failure, names.function_dlq, names.dlq and ensure
the code path that publishes exhausted messages uses the function_id-aware DLQ
when available.

In `@engine/tests/queue_e2e_fanout.rs`:
- Around line 101-125: The test registers triggers before the handlers, risking
missed messages; change fanout_two_functions_same_topic_both_receive so that
register_capturing_function("test::fn_a") and
register_capturing_function("test::fn_b") are called and awaited before calling
setup_engine_with_topic_triggers (or alternatively ensure
setup_engine_with_topic_triggers does not subscribe immediately), ensuring
handlers are registered prior to enqueue_to_topic; refer to the functions
register_capturing_function, setup_engine_with_topic_triggers, enqueue_to_topic
and the test function fanout_two_functions_same_topic_both_receive when making
the change.

In `@engine/tests/rabbitmq_queue_integration.rs`:
- Around line 1287-1356: The test rmq_fanout_dlq_per_function currently only
checks the success handler and that the failing function was invoked; update it
to also assert the failed message landed in the per-function DLQ by
querying/consuming the DLQ queue (name pattern:
iii.{topic}.test::rmq_dlq_fail.dlq) after waiting for retries, similar to the
logic in rmq_dlq_exhaustion_with_content_verification: use the same inspection
helper or implement a short consumer to fetch messages from that DLQ, verify
there is exactly one message (or expected count) and that its payload contains
{"data":"dlq-test"}, and add the corresponding assertions in
rmq_fanout_dlq_per_function.

---

Outside diff comments:
In `@engine/src/modules/queue/adapters/rabbitmq/topology.rs`:
- Around line 91-100: The per-function queue declaration uses
FieldTable::default() and lacks the x-dead-letter-exchange argument, so nacked
messages will be discarded instead of routed to the DLQ; update the queue
declaration in the same block that calls queue_declare (where queue_name is
used) to build a FieldTable (like in setup_function_queue) and insert
("x-dead-letter-exchange", AMQPValue::LongString(names.dlq_exchange().into()))
before calling queue_declare, and also ensure the DLQ exchange and binding for
the DLQ queue (created earlier) are created/bound similarly to the pattern used
in setup_function_queue.

---

Nitpick comments:
In `@engine/src/modules/queue/adapters/rabbitmq/naming.rs`:
- Around line 30-36: Add unit tests exercising the new naming methods
function_queue and function_dlq to ensure they produce the expected strings;
update or extend the existing test_rabbit_names (or add a new test) to construct
a Naming instance with a known topic and call
Naming::function_queue(function_id) and Naming::function_dlq(function_id),
asserting the returned strings equal format!("{}.{}.{}.queue", EXCHANGE_PREFIX,
topic, function_id) and format!("{}.{}.{}.dlq", EXCHANGE_PREFIX, topic,
function_id) respectively so the methods are covered by the test suite.

In `@engine/tests/rabbitmq_queue_integration.rs`:
- Around line 1121-1171: The two local helpers register_rmq_capturing_function
and register_rmq_counting_fn duplicate functionality already provided by
common::queue_helpers (register_payload_capturing_function and
register_counting_function); replace the local definitions by calling those
helpers instead: remove register_rmq_capturing_function and
register_rmq_counting_fn, and where they are used call
register_payload_capturing_function(engine, function_id) to return the
Arc<Mutex<Vec<Value>>> and register_counting_function(engine, function_id) to
return the Arc<AtomicU64>, ensuring the types/signatures match and any necessary
imports from common::queue_helpers remain in scope.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 7bb16964-e5c0-40a9-aeb1-c865ffa78a7b

📥 Commits

Reviewing files that changed from the base of the PR and between b56a528 and 15cd83d.

📒 Files selected for processing (7)
  • engine/src/modules/queue/adapters/builtin/adapter.rs
  • engine/src/modules/queue/adapters/rabbitmq/adapter.rs
  • engine/src/modules/queue/adapters/rabbitmq/naming.rs
  • engine/src/modules/queue/adapters/rabbitmq/topology.rs
  • engine/src/modules/queue/adapters/rabbitmq/worker.rs
  • engine/tests/queue_e2e_fanout.rs
  • engine/tests/rabbitmq_queue_integration.rs

Comment on lines 368 to 384
 async fn dlq_messages(
     &self,
     topic: &str,
     count: usize,
 ) -> anyhow::Result<Vec<serde_json::Value>> {
-    Ok(self.queue.dlq_messages(topic, count).await)
+    let tf = self.topic_functions.read().await;
+    if let Some(fids) = tf.get(topic) {
+        let mut all = Vec::new();
+        for fid in fids {
+            let iq = format!("{}::{}", topic, fid);
+            all.extend(self.queue.dlq_messages(&iq, count).await);
+        }
+        Ok(all)
+    } else {
+        Ok(self.queue.dlq_messages(topic, count).await)
+    }
 }

⚠️ Potential issue | 🟡 Minor

dlq_messages may return more than count messages when aggregating.

When multiple functions subscribe to a topic, this method calls self.queue.dlq_messages(&iq, count) for each function's internal queue, then extends all. This can return up to count * number_of_functions messages, exceeding the caller's limit.

Compare to dlq_peek (lines 589-601) which properly handles this by sorting and applying skip/take after aggregation.

🐛 Proposed fix
     async fn dlq_messages(
         &self,
         topic: &str,
         count: usize,
     ) -> anyhow::Result<Vec<serde_json::Value>> {
         let tf = self.topic_functions.read().await;
         if let Some(fids) = tf.get(topic) {
             let mut all = Vec::new();
             for fid in fids {
                 let iq = format!("{}::{}", topic, fid);
-                all.extend(self.queue.dlq_messages(&iq, count).await);
+                let remaining = count.saturating_sub(all.len());
+                if remaining == 0 {
+                    break;
+                }
+                all.extend(self.queue.dlq_messages(&iq, remaining).await);
             }
+            all.truncate(count);
             Ok(all)
         } else {
             Ok(self.queue.dlq_messages(topic, count).await)
         }
     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@engine/src/modules/queue/adapters/builtin/adapter.rs` around lines 368 - 384,
The current dlq_messages implementation aggregates results from multiple
internal queues (constructed as iq = format!("{}::{}", topic, fid)) by extending
a Vec, which can yield up to count * num_functions items; change dlq_messages to
mirror dlq_peek's aggregation strategy: collect all messages from each
self.queue.dlq_messages(&iq, count).await into a single Vec, sort them
deterministically (e.g., by the same timestamp/field used in dlq_peek) and then
truncate/take only up to count before returning; keep the same behavior for the
single-topic branch (Ok(self.queue.dlq_messages(topic, count).await)) and reuse
topic_functions and iq naming to locate the code.

Comment on lines +101 to +125
#[tokio::test]
async fn fanout_two_functions_same_topic_both_receive() {
    let topic = format!("fanout-{}", uuid::Uuid::new_v4());
    let engine = setup_engine_with_topic_triggers(
        &["test::fn_a", "test::fn_b"],
        &topic,
    )
    .await;

    let cap_a = register_capturing_function(&engine, "test::fn_a");
    let cap_b = register_capturing_function(&engine, "test::fn_b");

    enqueue_to_topic(&engine, &topic, json!({"msg": "hello"}))
        .await
        .expect("enqueue should succeed");

    tokio::time::sleep(Duration::from_secs(2)).await;

    let a = cap_a.lock().await;
    let b = cap_b.lock().await;
    assert_eq!(a.len(), 1, "fn_a should receive exactly 1 message, got {}", a.len());
    assert_eq!(b.len(), 1, "fn_b should receive exactly 1 message, got {}", b.len());
    assert_eq!(a[0]["msg"], "hello");
    assert_eq!(b[0]["msg"], "hello");
}

⚠️ Potential issue | 🟡 Minor

Function registration order is reversed — handlers registered after triggers.

In this test, setup_engine_with_topic_triggers registers the triggers (lines 104-108), then register_capturing_function registers the actual handlers (lines 110-111). If the trigger registration immediately subscribes and messages arrive before handlers are registered, those messages would fail to invoke the handler.

Compare to fanout_replicas_compete_within_function (lines 127-166) which correctly registers the handler (line 133) before registering triggers (lines 141-153).

This test may pass due to the 2-second sleep before enqueueing, but the pattern is fragile.

💡 Suggested fix
 async fn fanout_two_functions_same_topic_both_receive() {
     let topic = format!("fanout-{}", uuid::Uuid::new_v4());
+    iii::modules::observability::metrics::ensure_default_meter();
+    let engine = Arc::new(Engine::new());
+
+    let cap_a = register_capturing_function(&engine, "test::fn_a");
+    let cap_b = register_capturing_function(&engine, "test::fn_b");
+
-    let engine = setup_engine_with_topic_triggers(
-        &["test::fn_a", "test::fn_b"],
-        &topic,
-    )
-    .await;
+    let module = QueueCoreModule::create(engine.clone(), Some(builtin_queue_config()))
+        .await
+        .expect("QueueCoreModule::create should succeed");
+    module.register_functions(engine.clone());
+    module.initialize().await.expect("init should succeed");
 
-    let cap_a = register_capturing_function(&engine, "test::fn_a");
-    let cap_b = register_capturing_function(&engine, "test::fn_b");
+    // Register triggers after handlers are ready
+    for fid in &["test::fn_a", "test::fn_b"] {
+        engine.trigger_registry.register_trigger(Trigger { ... }).await.expect("...");
+    }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@engine/tests/queue_e2e_fanout.rs` around lines 101 - 125, The test registers
triggers before the handlers, risking missed messages; change
fanout_two_functions_same_topic_both_receive so that
register_capturing_function("test::fn_a") and
register_capturing_function("test::fn_b") are called and awaited before calling
setup_engine_with_topic_triggers (or alternatively ensure
setup_engine_with_topic_triggers does not subscribe immediately), ensuring
handlers are registered prior to enqueue_to_topic; refer to the functions
register_capturing_function, setup_engine_with_topic_triggers, enqueue_to_topic
and the test function fanout_two_functions_same_topic_both_receive when making
the change.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (1)
engine/src/modules/queue/adapters/builtin/adapter.rs (1)

252-268: Consider potential ordering issue with separate write locks.

The code acquires and releases the topic_functions lock before acquiring trigger_function_map and subscriptions locks. While functionally correct, if enqueue is called concurrently between these two blocks, it may miss the newly registered function. This is a minor race window since the subscription handle is only inserted after both maps are updated.

However, given the async nature and typical usage patterns (subscribe happens during initialization), this is unlikely to cause issues in practice.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@engine/src/modules/queue/adapters/builtin/adapter.rs` around lines 252 - 268,
There is a small race between updating topic_functions and then updating
trigger_function_map/subscriptions because the first write lock
(topic_functions.write().await) is released before the other two are acquired;
to fix, acquire the necessary write locks together in a single scope so updates
are atomic: take topic_functions.write().await,
trigger_function_map.write().await and subscriptions.write().await (or a
combined lock) before performing the inserts for topic -> function_id, sub_key
-> (topic,function_id) and sub_key -> handle, using the existing symbols
topic_functions, trigger_function_map, subscriptions, topic, id, function_id,
and handle to locate the code to change.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@engine/tests/rabbitmq_queue_integration.rs`:
- Around line 1287-1356: The test fails because topic fanout subscriber queues
created by setup_subscriber_queue() do not create per-function DLX/retry
exchanges like setup_function_queue() does, so publisher.requeue() →
publisher.publish() republishes into the main fanout exchange and redelivers
retries to all subscribers; to fix, make setup_subscriber_queue() create the
same per-function DLX + retry exchange/queue chain (unique to the
function/trigger) and change publisher.requeue()/publish() logic to route
requeued messages to that per-function retry exchange (not the main fanout
exchange) for topic/fanout subscriptions so retries are delivered only to the
originating function.


ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: bb114904-db48-423a-b1e9-5d9f88346a19

📥 Commits

Reviewing files that changed from the base of the PR and between 15cd83d and 1989db4.

📒 Files selected for processing (7)
  • engine/src/modules/queue/adapters/builtin/adapter.rs
  • engine/src/modules/queue/adapters/rabbitmq/adapter.rs
  • engine/src/modules/queue/adapters/rabbitmq/naming.rs
  • engine/src/modules/queue/adapters/rabbitmq/topology.rs
  • engine/src/modules/queue/adapters/rabbitmq/worker.rs
  • engine/tests/queue_e2e_fanout.rs
  • engine/tests/rabbitmq_queue_integration.rs
✅ Files skipped from review due to trivial changes (1)
  • engine/tests/queue_e2e_fanout.rs
🚧 Files skipped from review as they are similar to previous changes (3)
  • engine/src/modules/queue/adapters/rabbitmq/worker.rs
  • engine/src/modules/queue/adapters/rabbitmq/naming.rs
  • engine/src/modules/queue/adapters/rabbitmq/topology.rs

sergiofilhowz
sergiofilhowz previously approved these changes Mar 31, 2026
…xchange

When a subscriber failed and the retry handler requeued the message, it
published back to the topic's fanout exchange, causing ALL subscribers to
receive the retried copy. With max_attempts=3 the success function would
get 3 messages instead of 1. Similarly, exhausted messages went to the
topic-level DLQ instead of the per-function DLQ.

Now requeue() and publish_to_dlq() accept an optional function_id so the
worker can target the specific per-function queue/DLQ directly, bypassing
the fanout exchange.
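A simplified sketch of the routing logic this commit describes, with in-memory queues standing in for the RabbitMQ channel and the builtin-style `{topic}::{function_id}` key used for illustration (the RabbitMQ adapter actually names queues `iii.{topic}.{function_id}.queue`). When a `function_id` is supplied, the retry targets that function's own queue, so other subscribers never see the redelivery:

```rust
use std::collections::{HashMap, VecDeque};

// Route a retried message. With a function_id, deliver only to that
// function's per-function queue; without one, fall back to the
// topic-level queue (the legacy, non-fanout path).
fn requeue(
    queues: &mut HashMap<String, VecDeque<String>>,
    topic: &str,
    function_id: Option<&str>,
    payload: &str,
) {
    let target = match function_id {
        Some(fid) => format!("{}::{}", topic, fid),
        None => topic.to_string(),
    };
    queues.entry(target).or_default().push_back(payload.to_string());
}
```

The same targeting applies to `publish_to_dlq`, so exhausted messages land in the per-function DLQ rather than a topic-level one.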
@vercel
Contributor

vercel bot commented Mar 31, 2026

The latest updates on your projects. Learn more about Vercel for GitHub.

| Project | Deployment | Actions | Updated (UTC) |
| --- | --- | --- | --- |
| iii-website | Ready | Preview, Comment | Mar 31, 2026 11:10pm |
| motia-docs | Ready | Preview, Comment | Mar 31, 2026 11:10pm |


Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (3)
engine/src/modules/queue/adapters/builtin/adapter.rs (1)

203-219: Consider releasing the read lock before pushing to queues.

The read lock on topic_functions is held across multiple await points while pushing to each function's queue. Under high fanout (many functions per topic), this blocks subscribe/unsubscribe writers for the duration of all pushes.

♻️ Proposed optimization to release lock earlier
     async fn enqueue(
         &self,
         topic: &str,
         data: Value,
         traceparent: Option<String>,
         baggage: Option<String>,
     ) {
-        let tf = self.topic_functions.read().await;
-        if let Some(function_ids) = tf.get(topic) {
-            for fid in function_ids {
-                let internal_queue = format!("{}::{}", topic, fid);
-                self.queue
-                    .push(
-                        &internal_queue,
-                        data.clone(),
-                        traceparent.clone(),
-                        baggage.clone(),
-                    )
-                    .await;
-            }
-        } else {
+        let internal_queues: Vec<String> = {
+            let tf = self.topic_functions.read().await;
+            tf.get(topic)
+                .map(|fids| fids.iter().map(|fid| format!("{}::{}", topic, fid)).collect())
+                .unwrap_or_default()
+        };
+
+        if internal_queues.is_empty() {
             self.queue.push(topic, data, traceparent, baggage).await;
+        } else {
+            for internal_queue in internal_queues {
+                self.queue
+                    .push(
+                        &internal_queue,
+                        data.clone(),
+                        traceparent.clone(),
+                        baggage.clone(),
+                    )
+                    .await;
+            }
         }
     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@engine/src/modules/queue/adapters/builtin/adapter.rs` around lines 203 - 219,
The read lock on topic_functions (obtained via self.topic_functions.read().await
and bound to tf) is held across async await points in the loop calling
self.queue.push, which blocks subscribe/unsubscribe writers; fix by acquiring
the read lock, clone/collect the function id list (e.g., let function_ids =
tf.get(topic).cloned() or function_ids = tf.get(topic).map(|v| v.clone())), then
drop the lock (end the scope) and iterate over the cloned function_ids calling
self.queue.push with internal_queue for each fid; keep the fallback
self.queue.push(topic, ...) when None.
engine/src/modules/queue/adapters/rabbitmq/topology.rs (1)

70-115: The per-function queue should use DLX for consistency with setup_function_queue, but note that the current implementation is safe.

The current design uses manual DLQ publishing via handle_failure() and publish_to_dlq() in the retry handler. This works correctly—the only nack(requeue=false) call in the worker (line 176–179) is always followed by handle_failure() at line 183–186, which routes the message to the DLQ.

However, setup_function_queue uses x-dead-letter-exchange for automatic DLX-based routing (lines 138–140), while setup_subscriber_queue does not. This inconsistency could become a maintenance risk if future code changes bypass the handle_failure() path.

Consider adding x-dead-letter-exchange to setup_subscriber_queue to match the main queue pattern and make the DLQ routing automatic and defensive:

Suggested diff
 self.channel
     .queue_declare(
         &queue_name,
         QueueDeclareOptions {
             durable: true,
             ..Default::default()
         },
-        FieldTable::default(),
+        {
+            let mut args = FieldTable::default();
+            args.insert(
+                "x-dead-letter-exchange".into(),
+                AMQPValue::LongString("".into()),
+            );
+            args.insert(
+                "x-dead-letter-routing-key".into(),
+                AMQPValue::LongString(dlq_name.clone().into()),
+            );
+            args
+        },
     )
     .await?;
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@engine/src/modules/queue/adapters/rabbitmq/topology.rs` around lines 70 -
115, The subscriber queue declaration in setup_subscriber_queue should include
the same x-dead-letter-exchange argument used by setup_function_queue so DLQ
routing becomes automatic; update the queue_declare call that creates queue_name
in setup_subscriber_queue to build a FieldTable containing the
"x-dead-letter-exchange" entry pointing to the function DLQ (use
names.function_dlq(function_id) or the same exchange identifier used by
setup_function_queue) so the queue is declared with the DLX enabled rather than
relying solely on manual handle_failure()/publish_to_dlq().
engine/src/modules/queue/adapters/rabbitmq/adapter.rs (1)

243-274: Document the fanout exchange pub-sub semantics: messages enqueued before subscribers exist will be silently dropped.

The enqueue method calls setup_topic (which only declares the fanout exchange) but not setup_subscriber_queue. In RabbitMQ, messages published to a fanout exchange with no bound queues are immediately discarded. This is standard pub-sub behavior but differs from queue-based semantics where messages could accumulate.

The tests never exercise this scenario—all register subscribers via setup_engine_with_topic_triggers() before enqueuing. If the system may enqueue messages before subscribers are active, add:

  1. A note in documentation explaining this behavior
  2. A warning log when publishing to a topic with no active subscriptions
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@engine/src/modules/queue/adapters/rabbitmq/adapter.rs` around lines 243 -
274, The enqueue path (enqueue calls topology.setup_topic but not
setup_subscriber_queue) means publishing to a fanout exchange will silently drop
messages if no queues are bound; update docs to state fanout pub-sub semantics
(messages published before subscribers bind are discarded) and add runtime
detection + warning: extend or use a topology check (e.g., add/use a
topology.has_subscribers(topic) or similar) before publisher.publish in enqueue
and emit a warning log (via tracing::warn!) when no subscribers are present;
ensure the log references topic and job.id and keep existing success/error logs
unchanged.
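The suggested guard might look like the sketch below. The names `Topology` and `has_subscribers` are hypothetical (the review itself proposes `has_subscribers`); a plain `HashMap` registry and `eprintln!` stand in for the adapter's actual topology state and `tracing::warn!`:

```rust
use std::collections::{HashMap, HashSet};

// Hypothetical in-memory registry of which functions are bound to each topic.
struct Topology {
    subscribers: HashMap<String, HashSet<String>>,
}

impl Topology {
    fn has_subscribers(&self, topic: &str) -> bool {
        self.subscribers.get(topic).map_or(false, |s| !s.is_empty())
    }
}

// Returns whether any subscriber was bound at publish time.
fn enqueue(topology: &Topology, topic: &str, job_id: &str) -> bool {
    let has_subs = topology.has_subscribers(topic);
    if !has_subs {
        // A fanout exchange with no bound queues discards the message;
        // surface that before publishing instead of failing silently.
        eprintln!("warn: job {job_id} published to topic '{topic}' with no active subscribers");
    }
    // ... publisher.publish(...) would follow here, unchanged ...
    has_subs
}
```

Note this check is advisory: in a distributed setup another node could bind a queue between the check and the publish, so it should gate a warning log, not the publish itself.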

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 494f690d-16a3-4836-a6be-8a24f463cc60

📥 Commits

Reviewing files that changed from the base of the PR and between 1989db4 and 8062c9f.

📒 Files selected for processing (8)
  • engine/src/modules/queue/adapters/builtin/adapter.rs
  • engine/src/modules/queue/adapters/rabbitmq/adapter.rs
  • engine/src/modules/queue/adapters/rabbitmq/publisher.rs
  • engine/src/modules/queue/adapters/rabbitmq/retry.rs
  • engine/src/modules/queue/adapters/rabbitmq/topology.rs
  • engine/src/modules/queue/adapters/rabbitmq/worker.rs
  • engine/tests/queue_e2e_fanout.rs
  • engine/tests/rabbitmq_queue_integration.rs
✅ Files skipped from review due to trivial changes (1)
  • engine/tests/rabbitmq_queue_integration.rs
🚧 Files skipped from review as they are similar to previous changes (1)
  • engine/tests/queue_e2e_fanout.rs

Comment on lines +291 to 307
        if let Some((mapped_topic, mapped_fid)) = removed_mapping {
            let still_has_triggers = {
                let tfm = self.trigger_function_map.read().await;
                tfm.values()
                    .any(|(t, f)| t == &mapped_topic && f == &mapped_fid)
            };

            if !still_has_triggers {
                let mut tf = self.topic_functions.write().await;
                if let Some(fids) = tf.get_mut(&mapped_topic) {
                    fids.remove(&mapped_fid);
                    if fids.is_empty() {
                        tf.remove(&mapped_topic);
                    }
                }
            }
        }

⚠️ Potential issue | 🟠 Major

Race condition: concurrent subscribe can be orphaned between still_has_triggers check and topic_functions removal.

Between checking trigger_function_map (line 293-296) and removing from topic_functions (line 299-304), another thread can subscribe with the same (topic, function_id). That subscription would add entries to both maps, but then this thread removes the function from topic_functions, causing future enqueue calls to skip the new subscription's queue.

🔒 Proposed fix: re-check under write lock
         if let Some((mapped_topic, mapped_fid)) = removed_mapping {
-            let still_has_triggers = {
-                let tfm = self.trigger_function_map.read().await;
-                tfm.values()
-                    .any(|(t, f)| t == &mapped_topic && f == &mapped_fid)
-            };
-
-            if !still_has_triggers {
-                let mut tf = self.topic_functions.write().await;
+            let mut tf = self.topic_functions.write().await;
+            let tfm = self.trigger_function_map.read().await;
+            let still_has_triggers = tfm
+                .values()
+                .any(|(t, f)| t == &mapped_topic && f == &mapped_fid);
+            drop(tfm);
+
+            if !still_has_triggers {
                 if let Some(fids) = tf.get_mut(&mapped_topic) {
                     fids.remove(&mapped_fid);
                     if fids.is_empty() {
                         tf.remove(&mapped_topic);
                     }
                 }
             }
         }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@engine/src/modules/queue/adapters/builtin/adapter.rs` around lines 291 - 307,
There's a race where we check trigger_function_map (still_has_triggers) before
taking the topic_functions write lock, so a concurrent subscribe can add the
(mapped_topic, mapped_fid) after the check but before removal; fix by moving the
existence re-check into the critical section: after acquiring the
topic_functions write lock (tf) re-acquire or re-check trigger_function_map (or
re-compute still_has_triggers under the write lock) and only then remove
mapped_fid from tf if no triggers exist. Use the same identifiers from the diff
(removed_mapping, mapped_topic, mapped_fid, trigger_function_map,
topic_functions, tfm, tf) and ensure the final decision to remove is based on
the re-check performed while holding tf to prevent the race.

Multiple distinct functions subscribing to the same topic now each
receive every message (fan-out) instead of competing. Updated JS and
Python integration tests to assert per-function delivery of all messages.
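The fan-out semantics this commit describes can be sketched as a loop over the topic's registered functions, pushing one copy of the message onto each `{topic}::{function_id}` queue. This is a simplified, synchronous stand-in for the builtin adapter's `enqueue`, using plain maps instead of the adapter's locked state:

```rust
use std::collections::{HashMap, HashSet, VecDeque};

// Fan-out: each distinct function subscribed to a topic gets its own
// internal queue keyed `{topic}::{function_id}`. Replicas of one function
// share that queue and compete for its messages; distinct functions each
// receive their own copy.
fn enqueue(
    topic_functions: &HashMap<String, HashSet<String>>,
    queues: &mut HashMap<String, VecDeque<String>>,
    topic: &str,
    payload: &str,
) {
    match topic_functions.get(topic) {
        Some(fids) if !fids.is_empty() => {
            for fid in fids {
                let internal_queue = format!("{}::{}", topic, fid);
                queues
                    .entry(internal_queue)
                    .or_default()
                    .push_back(payload.to_string());
            }
        }
        // No subscribers registered yet: fall back to the bare topic queue.
        _ => queues
            .entry(topic.to_string())
            .or_default()
            .push_back(payload.to_string()),
    }
}
```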
@ytallo ytallo merged commit fbaca9e into main Mar 31, 2026
23 checks passed
@ytallo ytallo deleted the feat/queue-fanout-per-function branch March 31, 2026 23:51