Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
200 changes: 168 additions & 32 deletions engine/src/modules/queue/adapters/builtin/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
// See LICENSE and PATENTS files for details.

use std::{
collections::HashMap,
collections::{HashMap, HashSet},
sync::{
Arc,
atomic::{AtomicU64, Ordering},
Expand Down Expand Up @@ -46,6 +46,8 @@ pub struct BuiltinQueueAdapter {
queue: Arc<BuiltinQueue>,
engine: Arc<Engine>,
subscriptions: Arc<RwLock<HashMap<String, SubscriptionHandle>>>,
topic_functions: Arc<RwLock<HashMap<String, HashSet<String>>>>,
trigger_function_map: Arc<RwLock<HashMap<String, (String, String)>>>,
delivery_map: Arc<RwLock<HashMap<u64, DeliveryInfo>>>,
delivery_counter: Arc<AtomicU64>,
poll_intervals: Arc<RwLock<HashMap<String, u64>>>,
Expand Down Expand Up @@ -142,6 +144,8 @@ impl BuiltinQueueAdapter {
queue,
engine,
subscriptions: Arc::new(RwLock::new(HashMap::new())),
topic_functions: Arc::new(RwLock::new(HashMap::new())),
trigger_function_map: Arc::new(RwLock::new(HashMap::new())),
delivery_map: Arc::new(RwLock::new(HashMap::new())),
delivery_counter: Arc::new(AtomicU64::new(0)),
poll_intervals: Arc::new(RwLock::new(HashMap::new())),
Expand All @@ -155,6 +159,8 @@ impl Clone for BuiltinQueueAdapter {
queue: Arc::clone(&self.queue),
engine: Arc::clone(&self.engine),
subscriptions: Arc::clone(&self.subscriptions),
topic_functions: Arc::clone(&self.topic_functions),
trigger_function_map: Arc::clone(&self.trigger_function_map),
delivery_map: Arc::clone(&self.delivery_map),
delivery_counter: Arc::clone(&self.delivery_counter),
poll_intervals: Arc::clone(&self.poll_intervals),
Expand Down Expand Up @@ -194,7 +200,22 @@ impl QueueAdapter for BuiltinQueueAdapter {
traceparent: Option<String>,
baggage: Option<String>,
) {
self.queue.push(topic, data, traceparent, baggage).await;
let tf = self.topic_functions.read().await;
if let Some(function_ids) = tf.get(topic) {
for fid in function_ids {
let internal_queue = format!("{}::{}", topic, fid);
self.queue
.push(
&internal_queue,
data.clone(),
traceparent.clone(),
baggage.clone(),
)
.await;
}
} else {
self.queue.push(topic, data, traceparent, baggage).await;
}
}

async fn subscribe(
Expand All @@ -205,6 +226,8 @@ impl QueueAdapter for BuiltinQueueAdapter {
condition_function_id: Option<String>,
queue_config: Option<SubscriberQueueConfig>,
) {
let internal_queue = format!("{}::{}", topic, function_id);

let handler: Arc<dyn JobHandler> = Arc::new(FunctionHandler {
engine: Arc::clone(&self.engine),
function_id: function_id.to_string(),
Expand All @@ -223,49 +246,141 @@ impl QueueAdapter for BuiltinQueueAdapter {

let handle = self
.queue
.subscribe(topic, handler, subscription_config)
.subscribe(&internal_queue, handler, subscription_config)
.await;

let mut subs = self.subscriptions.write().await;
subs.insert(format!("{}:{}", topic, id), handle);
{
let mut tf = self.topic_functions.write().await;
tf.entry(topic.to_string())
.or_default()
.insert(function_id.to_string());
}

tracing::debug!(topic = %topic, id = %id, function_id = %function_id, "Subscribed to queue via BuiltinQueueAdapter");
{
let sub_key = format!("{}:{}", topic, id);
let mut tfm = self.trigger_function_map.write().await;
tfm.insert(
sub_key.clone(),
(topic.to_string(), function_id.to_string()),
);
let mut subs = self.subscriptions.write().await;
subs.insert(sub_key, handle);
}

tracing::debug!(topic = %topic, id = %id, function_id = %function_id, internal_queue = %internal_queue, "Subscribed to queue via BuiltinQueueAdapter");
}

async fn unsubscribe(&self, topic: &str, id: &str) {
let key = format!("{}:{}", topic, id);
let mut subs = self.subscriptions.write().await;

if let Some(handle) = subs.remove(&key) {
self.queue.unsubscribe(handle).await;
tracing::debug!(topic = %topic, id = %id, "Unsubscribed from queue");
} else {
tracing::warn!(topic = %topic, id = %id, "No subscription found to unsubscribe");
let removed_mapping = {
let mut tfm = self.trigger_function_map.write().await;
tfm.remove(&key)
};

{
let mut subs = self.subscriptions.write().await;
if let Some(handle) = subs.remove(&key) {
self.queue.unsubscribe(handle).await;
tracing::debug!(topic = %topic, id = %id, "Unsubscribed from queue");
} else {
tracing::warn!(topic = %topic, id = %id, "No subscription found to unsubscribe");
}
}

if let Some((mapped_topic, mapped_fid)) = removed_mapping {
let still_has_triggers = {
let tfm = self.trigger_function_map.read().await;
tfm.values()
.any(|(t, f)| t == &mapped_topic && f == &mapped_fid)
};

if !still_has_triggers {
let mut tf = self.topic_functions.write().await;
if let Some(fids) = tf.get_mut(&mapped_topic) {
fids.remove(&mapped_fid);
if fids.is_empty() {
tf.remove(&mapped_topic);
}
}
}
}
Comment on lines +291 to 307
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Race condition: concurrent subscribe can be orphaned between still_has_triggers check and topic_functions removal.

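Between checking trigger_function_map (lines 293-296) and removing from topic_functions (lines 299-304), another task can subscribe with the same (topic, function_id). That subscription would add entries to both maps, but then this task removes the function from topic_functions, causing future enqueue calls to skip the new subscription's queue. Note that subscribe inserts into topic_functions and trigger_function_map in two separate lock scopes, so a re-check in unsubscribe alone only narrows this window; closing it also requires subscribe to update both maps while holding the topic_functions write lock.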

🔒 Proposed fix: re-check under write lock
         if let Some((mapped_topic, mapped_fid)) = removed_mapping {
-            let still_has_triggers = {
-                let tfm = self.trigger_function_map.read().await;
-                tfm.values()
-                    .any(|(t, f)| t == &mapped_topic && f == &mapped_fid)
-            };
-
-            if !still_has_triggers {
-                let mut tf = self.topic_functions.write().await;
+            let mut tf = self.topic_functions.write().await;
+            let tfm = self.trigger_function_map.read().await;
+            let still_has_triggers = tfm
+                .values()
+                .any(|(t, f)| t == &mapped_topic && f == &mapped_fid);
+            drop(tfm);
+
+            if !still_has_triggers {
                 if let Some(fids) = tf.get_mut(&mapped_topic) {
                     fids.remove(&mapped_fid);
                     if fids.is_empty() {
                         tf.remove(&mapped_topic);
                     }
                 }
             }
         }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@engine/src/modules/queue/adapters/builtin/adapter.rs` around lines 291 - 307,
There's a race where we check trigger_function_map (still_has_triggers) before
taking the topic_functions write lock, so a concurrent subscribe can add the
(mapped_topic, mapped_fid) after the check but before removal; fix by moving the
existence re-check into the critical section: after acquiring the
topic_functions write lock (tf) re-acquire or re-check trigger_function_map (or
re-compute still_has_triggers under the write lock) and only then remove
mapped_fid from tf if no triggers exist. Use the same identifiers from the diff
(removed_mapping, mapped_topic, mapped_fid, trigger_function_map,
topic_functions, tfm, tf) and ensure the final decision to remove is based on
the re-check performed while holding tf to prevent the race.

}

async fn redrive_dlq(&self, topic: &str) -> anyhow::Result<u64> {
Ok(self.queue.dlq_redrive(topic).await)
let tf = self.topic_functions.read().await;
if let Some(fids) = tf.get(topic) {
let mut total = 0u64;
for fid in fids {
let iq = format!("{}::{}", topic, fid);
total += self.queue.dlq_redrive(&iq).await;
}
Ok(total)
} else {
Ok(self.queue.dlq_redrive(topic).await)
}
}

async fn redrive_dlq_message(&self, topic: &str, message_id: &str) -> anyhow::Result<bool> {
Ok(self.queue.dlq_redrive_message(topic, message_id).await)
let tf = self.topic_functions.read().await;
if let Some(fids) = tf.get(topic) {
for fid in fids {
let iq = format!("{}::{}", topic, fid);
if self.queue.dlq_redrive_message(&iq, message_id).await {
return Ok(true);
}
}
Ok(false)
} else {
Ok(self.queue.dlq_redrive_message(topic, message_id).await)
}
}

async fn discard_dlq_message(&self, topic: &str, message_id: &str) -> anyhow::Result<bool> {
Ok(self.queue.dlq_discard_message(topic, message_id).await)
let tf = self.topic_functions.read().await;
if let Some(fids) = tf.get(topic) {
for fid in fids {
let iq = format!("{}::{}", topic, fid);
if self.queue.dlq_discard_message(&iq, message_id).await {
return Ok(true);
}
}
Ok(false)
} else {
Ok(self.queue.dlq_discard_message(topic, message_id).await)
}
}

async fn dlq_count(&self, topic: &str) -> anyhow::Result<u64> {
Ok(self.queue.dlq_count(topic).await)
let tf = self.topic_functions.read().await;
if let Some(fids) = tf.get(topic) {
let mut total = 0u64;
for fid in fids {
let iq = format!("{}::{}", topic, fid);
total += self.queue.dlq_count(&iq).await;
}
Ok(total)
} else {
Ok(self.queue.dlq_count(topic).await)
}
}

async fn dlq_messages(
&self,
topic: &str,
count: usize,
) -> anyhow::Result<Vec<serde_json::Value>> {
Ok(self.queue.dlq_messages(topic, count).await)
let tf = self.topic_functions.read().await;
if let Some(fids) = tf.get(topic) {
let mut all = Vec::new();
for fid in fids {
let iq = format!("{}::{}", topic, fid);
all.extend(self.queue.dlq_messages(&iq, count).await);
}
Ok(all)
} else {
Ok(self.queue.dlq_messages(topic, count).await)
}
}
Comment on lines 368 to 384
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

dlq_messages may return more than count messages when aggregating.

When multiple functions subscribe to a topic, this method calls self.queue.dlq_messages(&iq, count) for each function's internal queue, then extends all. This can return up to count * number_of_functions messages, exceeding the caller's limit.

Compare to dlq_peek (lines 589-601) which properly handles this by sorting and applying skip/take after aggregation.

🐛 Proposed fix
     async fn dlq_messages(
         &self,
         topic: &str,
         count: usize,
     ) -> anyhow::Result<Vec<serde_json::Value>> {
         let tf = self.topic_functions.read().await;
         if let Some(fids) = tf.get(topic) {
             let mut all = Vec::new();
             for fid in fids {
                 let iq = format!("{}::{}", topic, fid);
-                all.extend(self.queue.dlq_messages(&iq, count).await);
+                let remaining = count.saturating_sub(all.len());
+                if remaining == 0 {
+                    break;
+                }
+                all.extend(self.queue.dlq_messages(&iq, remaining).await);
             }
+            all.truncate(count);
             Ok(all)
         } else {
             Ok(self.queue.dlq_messages(topic, count).await)
         }
     }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
async fn dlq_messages(
&self,
topic: &str,
count: usize,
) -> anyhow::Result<Vec<serde_json::Value>> {
Ok(self.queue.dlq_messages(topic, count).await)
let tf = self.topic_functions.read().await;
if let Some(fids) = tf.get(topic) {
let mut all = Vec::new();
for fid in fids {
let iq = format!("{}::{}", topic, fid);
all.extend(self.queue.dlq_messages(&iq, count).await);
}
Ok(all)
} else {
Ok(self.queue.dlq_messages(topic, count).await)
}
}
/// Returns at most `count` DLQ messages for `topic`.
///
/// If `topic_functions` has an entry for `topic`, messages are gathered from
/// each `"{topic}::{function_id}"` internal queue until `count` is reached.
/// Otherwise the queue named `topic` itself is queried.
async fn dlq_messages(
&self,
topic: &str,
count: usize,
) -> anyhow::Result<Vec<serde_json::Value>> {
let tf = self.topic_functions.read().await;
if let Some(fids) = tf.get(topic) {
let mut all = Vec::new();
// `fids` is a `HashSet`, so iteration order is unspecified: when the cap is
// hit early, which internal queues contributed is not deterministic.
for fid in fids {
let iq = format!("{}::{}", topic, fid);
// Request only what is still missing so the total never exceeds `count`.
let remaining = count.saturating_sub(all.len());
if remaining == 0 {
break;
}
all.extend(self.queue.dlq_messages(&iq, remaining).await);
}
// Final cap; presumably defensive in case a queue returns more than requested.
all.truncate(count);
Ok(all)
} else {
// No function set recorded for this topic: read the queue named `topic` directly.
Ok(self.queue.dlq_messages(topic, count).await)
}
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@engine/src/modules/queue/adapters/builtin/adapter.rs` around lines 368 - 384,
The current dlq_messages implementation aggregates results from multiple
internal queues (constructed as iq = format!("{}::{}", topic, fid)) by extending
a Vec, which can yield up to count * num_functions items; change dlq_messages to
mirror dlq_peek's aggregation strategy: collect all messages from each
self.queue.dlq_messages(&iq, count).await into a single Vec, sort them
deterministically (e.g., by the same timestamp/field used in dlq_peek) and then
truncate/take only up to count before returning; keep the same behavior for the
single-topic branch (Ok(self.queue.dlq_messages(topic, count).await)) and reuse
topic_functions and iq naming to locate the code.


async fn setup_function_queue(
Expand Down Expand Up @@ -410,17 +525,13 @@ impl QueueAdapter for BuiltinQueueAdapter {
async fn list_topics(&self) -> anyhow::Result<Vec<crate::modules::queue::TopicInfo>> {
let mut topic_counts: HashMap<String, u64> = HashMap::new();

// Gather topics from subscriptions (keys are "topic:id" format)
{
let subs = self.subscriptions.read().await;
for key in subs.keys() {
if let Some(topic) = key.split(':').next() {
*topic_counts.entry(topic.to_string()).or_insert(0) += 1;
}
let tfm = self.trigger_function_map.read().await;
for (topic, _fid) in tfm.values() {
*topic_counts.entry(topic.clone()).or_insert(0) += 1;
}
}

// Gather function queue topics from poll_intervals
{
let intervals = self.poll_intervals.read().await;
for queue_name in intervals.keys() {
Expand All @@ -440,18 +551,27 @@ impl QueueAdapter for BuiltinQueueAdapter {
}

async fn topic_stats(&self, topic: &str) -> anyhow::Result<crate::modules::queue::TopicStats> {
// Count subscribers for this topic
let consumer_count = {
let subs = self.subscriptions.read().await;
subs.keys()
.filter(|key| key.split(':').next().map(|t| t == topic).unwrap_or(false))
.count() as u64
let tfm = self.trigger_function_map.read().await;
tfm.values().filter(|(t, _)| t == topic).count() as u64
};

let dlq_depth = self.queue.dlq_count(topic).await;
let tf = self.topic_functions.read().await;
let mut depth = 0u64;
let mut dlq_depth = 0u64;
if let Some(fids) = tf.get(topic) {
for fid in fids {
let iq = format!("{}::{}", topic, fid);
depth += self.queue.queue_depth(&iq).await;
dlq_depth += self.queue.dlq_count(&iq).await;
}
} else {
depth = self.queue.queue_depth(topic).await;
dlq_depth = self.queue.dlq_count(topic).await;
}

Ok(crate::modules::queue::TopicStats {
depth: self.queue.queue_depth(topic).await,
depth,
consumer_count,
dlq_depth,
config: None,
Expand All @@ -464,7 +584,23 @@ impl QueueAdapter for BuiltinQueueAdapter {
offset: u64,
limit: u64,
) -> anyhow::Result<Vec<crate::modules::queue::DlqMessage>> {
Ok(self.queue.dlq_peek(topic, offset, limit).await)
let tf = self.topic_functions.read().await;
if let Some(fids) = tf.get(topic) {
let mut all = Vec::new();
for fid in fids {
let iq = format!("{}::{}", topic, fid);
all.extend(self.queue.dlq_peek(&iq, 0, offset + limit).await);
}
all.sort_by(|a, b| b.failed_at.cmp(&a.failed_at));
let result = all
.into_iter()
.skip(offset as usize)
.take(limit as usize)
.collect();
Ok(result)
} else {
Ok(self.queue.dlq_peek(topic, offset, limit).await)
}
}
}

Expand Down
25 changes: 23 additions & 2 deletions engine/src/modules/queue/adapters/rabbitmq/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,8 @@ impl QueueAdapter for RabbitMQAdapter {
condition_function_id: Option<String>,
queue_config: Option<SubscriberQueueConfig>,
) {
use super::naming::RabbitNames;

let topic = topic.to_string();
let id = id.to_string();
let function_id = function_id.to_string();
Expand All @@ -300,11 +302,27 @@ impl QueueAdapter for RabbitMQAdapter {
tracing::error!(
error = ?e,
topic = %topic,
"Failed to setup RabbitMQ topology for subscription"
"Failed to setup RabbitMQ fanout exchange"
);
return;
}

if let Err(e) = self
.topology
.setup_subscriber_queue(&topic, &function_id)
.await
{
tracing::error!(
error = ?e,
topic = %topic,
function_id = %function_id,
"Failed to setup RabbitMQ per-function queue"
);
return;
}

let names = RabbitNames::new(&topic);
let per_function_queue = names.function_queue(&function_id);
let consumer_tag = format!("consumer-{}", Uuid::new_v4());

let effective_queue_mode = queue_config
Expand All @@ -330,6 +348,7 @@ impl QueueAdapter for RabbitMQAdapter {
let topic_clone = topic.clone();
let function_id_clone = function_id.clone();
let consumer_tag_clone = consumer_tag.clone();
let queue_name_clone = per_function_queue.clone();

let task_handle = tokio::spawn(async move {
worker
Expand All @@ -338,6 +357,7 @@ impl QueueAdapter for RabbitMQAdapter {
function_id_clone,
condition_function_id,
consumer_tag_clone,
queue_name_clone,
)
.await;
});
Expand All @@ -355,7 +375,8 @@ impl QueueAdapter for RabbitMQAdapter {
tracing::debug!(
topic = %topic,
function_id = %function_id,
"Subscribed to RabbitMQ queue"
queue = %per_function_queue,
"Subscribed to RabbitMQ per-function queue"
);
}

Expand Down
8 changes: 8 additions & 0 deletions engine/src/modules/queue/adapters/rabbitmq/naming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@ impl RabbitNames {
format!("{}.{}.queue", EXCHANGE_PREFIX, self.topic)
}

/// Builds the per-function queue name for this topic, in the form
/// `{EXCHANGE_PREFIX}.{topic}.{function_id}.queue`.
pub fn function_queue(&self, function_id: &str) -> String {
format!("{}.{}.{}.queue", EXCHANGE_PREFIX, self.topic, function_id)
}

/// Builds the per-function `.dlq` name for this topic, in the form
/// `{EXCHANGE_PREFIX}.{topic}.{function_id}.dlq`.
pub fn function_dlq(&self, function_id: &str) -> String {
format!("{}.{}.{}.dlq", EXCHANGE_PREFIX, self.topic, function_id)
}

/// Builds the topic-level `.dlq` name, in the form `{EXCHANGE_PREFIX}.{topic}.dlq`
/// (no function id component).
pub fn dlq(&self) -> String {
format!("{}.{}.dlq", EXCHANGE_PREFIX, self.topic)
}
Expand Down
Loading
Loading