Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/consumer/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ impl<Exe: Executor> ConsumerBuilder<Exe> {
self
}

/// Interval for refreshing the topics when using a topic regex. Unused otherwise.
/// Interval for refreshing the topics when using a topic regex or when errors occur with a MultiTopicConsumer
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
pub fn with_topic_refresh(mut self, refresh_interval: Duration) -> Self {
self.topic_refresh = Some(refresh_interval);
Expand Down
16 changes: 5 additions & 11 deletions src/consumer/multi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ impl<T: DeserializeMessage, Exe: Executor> MultiTopicConsumer<T, Exe> {
}),
)
.await?;
trace!("created {} consumers", consumers.len());
info!("created {} consumers", consumers.len());
Ok(consumers)
}));
}
Expand Down Expand Up @@ -352,11 +352,9 @@ impl<T: 'static + DeserializeMessage, Exe: Executor> Stream for MultiTopicConsum
}
}

if self.topic_regex.is_some() {
if let Poll::Ready(Some(_)) = self.refresh.as_mut().poll_next(cx) {
self.update_topics();
return self.poll_next(cx);
}
if let Poll::Ready(Some(_)) = self.refresh.as_mut().poll_next(cx) {
self.update_topics();
return self.poll_next(cx);
}

let mut topics_to_remove = Vec::new();
Expand All @@ -383,11 +381,7 @@ impl<T: 'static + DeserializeMessage, Exe: Executor> Stream for MultiTopicConsum
"Unexpected error consuming from pulsar topic {}: {}",
&topic, e
);
// Only remove topic from MultiTopicConsumer on error if they
// can be re-added later by regex
if self.topic_regex.is_some() {
topics_to_remove.push(topic.clone());
}
topics_to_remove.push(topic.clone());
}
}
} else {
Expand Down