diff --git a/src/consumer/builder.rs b/src/consumer/builder.rs index 837f556..5d8342e 100644 --- a/src/consumer/builder.rs +++ b/src/consumer/builder.rs @@ -118,7 +118,7 @@ impl ConsumerBuilder { self } - /// Interval for refreshing the topics when using a topic regex. Unused otherwise. + /// Interval for refreshing the topics when using a topic regex or when errors occur with a MultiTopicConsumer #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] pub fn with_topic_refresh(mut self, refresh_interval: Duration) -> Self { self.topic_refresh = Some(refresh_interval); diff --git a/src/consumer/multi.rs b/src/consumer/multi.rs index 5dc34d8..cc9b9dd 100644 --- a/src/consumer/multi.rs +++ b/src/consumer/multi.rs @@ -194,7 +194,7 @@ impl MultiTopicConsumer { }), ) .await?; - trace!("created {} consumers", consumers.len()); + info!("created {} consumers", consumers.len()); Ok(consumers) })); } @@ -352,11 +352,9 @@ impl Stream for MultiTopicConsum } } - if self.topic_regex.is_some() { - if let Poll::Ready(Some(_)) = self.refresh.as_mut().poll_next(cx) { - self.update_topics(); - return self.poll_next(cx); - } + if let Poll::Ready(Some(_)) = self.refresh.as_mut().poll_next(cx) { + self.update_topics(); + return self.poll_next(cx); } let mut topics_to_remove = Vec::new(); @@ -383,11 +381,7 @@ impl Stream for MultiTopicConsum "Unexpected error consuming from pulsar topic {}: {}", &topic, e ); - // Only remove topic from MultiTopicConsumer on error if they - // can be re-added later by regex - if self.topic_regex.is_some() { - topics_to_remove.push(topic.clone()); - } + topics_to_remove.push(topic.clone()); } } } else {