diff --git a/benches/http.rs b/benches/http.rs index 30b35b8b257f7..5c8b5eaac7efe 100644 --- a/benches/http.rs +++ b/benches/http.rs @@ -63,6 +63,7 @@ fn benchmark_http(c: &mut Criterion) { request: Default::default(), tls: Default::default(), acknowledgements: Default::default(), + retry_strategy: Default::default(), }, ); diff --git a/changelog.d/10870_http_retry_strategy.feature.md b/changelog.d/10870_http_retry_strategy.feature.md new file mode 100644 index 0000000000000..09afba2eddd40 --- /dev/null +++ b/changelog.d/10870_http_retry_strategy.feature.md @@ -0,0 +1,7 @@ +HTTP-based sinks that use the shared retry helpers now support a `retry_strategy` configuration +option to control which HTTP response codes are retried. The `http` sink also includes a new +example showing how to retry only specific transient status codes. + +Issue: https://github.com/vectordotdev/vector/issues/10870 + +authors: ndrsg diff --git a/config/examples/http_sink_custom_retry.yaml b/config/examples/http_sink_custom_retry.yaml new file mode 100644 index 0000000000000..54c3028fbb8fb --- /dev/null +++ b/config/examples/http_sink_custom_retry.yaml @@ -0,0 +1,42 @@ +# HTTP sink example with a custom retry strategy +# ---------------------------------------------------- +# Sends demo logs to an HTTP endpoint and only retries the response codes +# that the upstream API documents as transient. + +data_dir: "/var/lib/vector" + +sources: + demo_logs: + type: "demo_logs" + format: "json" + interval: 1 + +sinks: + http_out: + type: "http" + inputs: [ "demo_logs" ] + uri: "https://example.com/ingest" + method: "post" + + # Skip the startup probe so the example can be adapted locally. + healthcheck: + enabled: false + + # Send newline-delimited JSON in the request body. + framing: + method: "newline_delimited" + encoding: + codec: "json" + + # Control how many retries are made and how quickly backoff grows. + request: + timeout_secs: 60 + retry_attempts: 8 + retry_initial_backoff_secs: 2 + retry_max_duration_secs: 30 + + # Retry only on the exact HTTP status codes that this destination + # treats as temporary failures. + retry_strategy: + type: "custom" + status_codes: [ 408, 425, 429, 503 ] diff --git a/src/sinks/appsignal/config.rs b/src/sinks/appsignal/config.rs index 9e8c000f995a3..fea94af05b132 100644 --- a/src/sinks/appsignal/config.rs +++ b/src/sinks/appsignal/config.rs @@ -21,7 +21,7 @@ use crate::{ prelude::{SinkConfig, SinkContext}, util::{ BatchConfig, Compression, ServiceBuilderExt, SinkBatchSettings, TowerRequestConfig, - http::HttpStatusRetryLogic, + http::{HttpStatusRetryLogic, RetryStrategy}, }, }, }; @@ -67,6 +67,10 @@ pub(super) struct AppsignalConfig { skip_serializing_if = "crate::serde::is_default" )] acknowledgements: AcknowledgementsConfig, + + #[configurable(derived)] + #[serde(default)] + retry_strategy: RetryStrategy, } pub(super) fn default_endpoint() -> String { @@ -99,7 +103,10 @@ impl AppsignalConfig { let request_opts = self.request; let request_settings = request_opts.into_settings(); - let retry_logic = HttpStatusRetryLogic::new(|req: &AppsignalResponse| req.http_status); + let retry_logic = HttpStatusRetryLogic::new( + |req: &AppsignalResponse| req.http_status, + self.retry_strategy.clone(), + ); let service = ServiceBuilder::new() .settings(request_settings, retry_logic) diff --git a/src/sinks/axiom/config.rs b/src/sinks/axiom/config.rs index 61421973d5475..0930a997abc12 100644 --- a/src/sinks/axiom/config.rs +++ b/src/sinks/axiom/config.rs @@ -15,7 +15,8 @@ use crate::{ Healthcheck, VectorSink, http::config::{HttpMethod, HttpSinkConfig}, util::{ - BatchConfig, Compression, RealtimeSizeBasedDefaultBatchSettings, http::RequestConfig, + BatchConfig, Compression, RealtimeSizeBasedDefaultBatchSettings, + http::{RequestConfig, RetryStrategy}, }, }, tls::TlsConfig, @@ -124,6 +125,10 @@ pub struct AxiomConfig { skip_serializing_if = "crate::serde::is_default" )] pub acknowledgements: AcknowledgementsConfig, + + #[configurable(derived)] + #[serde(default)] + pub retry_strategy: RetryStrategy, } impl GenerateConfig for AxiomConfig { @@ -180,6 +185,7 @@ impl SinkConfig for AxiomConfig { ), payload_prefix: "".into(), // Always newline delimited JSON payload_suffix: "".into(), // Always newline delimited JSON + retry_strategy: self.retry_strategy.clone(), }; http_sink_config.build(cx).await diff --git a/src/sinks/azure_logs_ingestion/config.rs b/src/sinks/azure_logs_ingestion/config.rs index 4e876a18a26bd..67b53525ecd32 100644 --- a/src/sinks/azure_logs_ingestion/config.rs +++ b/src/sinks/azure_logs_ingestion/config.rs @@ -16,7 +16,10 @@ use crate::{ http::{HttpClient, get_http_scheme_from_uri}, sinks::{ prelude::*, - util::{RealtimeSizeBasedDefaultBatchSettings, UriSerde, http::HttpStatusRetryLogic}, + util::{ + RealtimeSizeBasedDefaultBatchSettings, UriSerde, + http::{HttpStatusRetryLogic, RetryStrategy}, + }, }, }; @@ -109,6 +112,10 @@ pub struct AzureLogsIngestionConfig { skip_serializing_if = "crate::serde::is_default" )] pub acknowledgements: AcknowledgementsConfig, + + #[configurable(derived)] + #[serde(default)] + pub retry_strategy: RetryStrategy, } impl Default for AzureLogsIngestionConfig { @@ -125,6 +132,7 @@ impl Default for AzureLogsIngestionConfig { request: Default::default(), tls: None, acknowledgements: Default::default(), + retry_strategy: Default::default(), } } } @@ -348,8 +356,10 @@ impl AzureLogsIngestionConfig { )?; let healthcheck = service.healthcheck(); - let retry_logic = - HttpStatusRetryLogic::new(|res: &AzureLogsIngestionResponse| res.http_status); + let retry_logic = HttpStatusRetryLogic::new( + |res: &AzureLogsIngestionResponse| res.http_status, + self.retry_strategy.clone(), + ); let request_settings = self.request.into_settings(); let service = ServiceBuilder::new() .settings(request_settings, retry_logic) diff --git a/src/sinks/azure_monitor_logs/config.rs b/src/sinks/azure_monitor_logs/config.rs index dc60d179a2ff5..6d68aed7efd75 100644 --- a/src/sinks/azure_monitor_logs/config.rs +++ b/src/sinks/azure_monitor_logs/config.rs @@ -16,7 +16,10 @@ use crate::{ http::{HttpClient, get_http_scheme_from_uri}, sinks::{ prelude::*, - util::{RealtimeSizeBasedDefaultBatchSettings, UriSerde, http::HttpStatusRetryLogic}, + util::{ + RealtimeSizeBasedDefaultBatchSettings, UriSerde, + http::{HttpStatusRetryLogic, RetryStrategy}, + }, }, }; @@ -113,6 +116,10 @@ pub struct AzureMonitorLogsConfig { skip_serializing_if = "crate::serde::is_default" )] pub acknowledgements: AcknowledgementsConfig, + + #[configurable(derived)] + #[serde(default)] + pub retry_strategy: RetryStrategy, } impl Default for AzureMonitorLogsConfig { @@ -129,6 +136,7 @@ impl Default for AzureMonitorLogsConfig { time_generated_key: None, tls: None, acknowledgements: Default::default(), + retry_strategy: Default::default(), } } } @@ -181,8 +189,10 @@ impl AzureMonitorLogsConfig { )?; let healthcheck = service.healthcheck(); - let retry_logic = - HttpStatusRetryLogic::new(|res: &AzureMonitorLogsResponse| res.http_status); + let retry_logic = HttpStatusRetryLogic::new( + |res: &AzureMonitorLogsResponse| res.http_status, + self.retry_strategy.clone(), + ); let request_settings = self.request.into_settings(); let service = ServiceBuilder::new() .settings(request_settings, retry_logic) diff --git a/src/sinks/datadog/events/config.rs b/src/sinks/datadog/events/config.rs index 2bfdc66315c3e..f9b0f9361bba8 100644 --- a/src/sinks/datadog/events/config.rs +++ b/src/sinks/datadog/events/config.rs @@ -14,7 +14,10 @@ use crate::{ sinks::{ Healthcheck, VectorSink, datadog::{DatadogCommonConfig, LocalDatadogCommonConfig}, - util::{ServiceBuilderExt, TowerRequestConfig, http::HttpStatusRetryLogic}, + util::{ + ServiceBuilderExt, TowerRequestConfig, + http::{HttpStatusRetryLogic, RetryStrategy}, + }, }, tls::MaybeTlsSettings, }; @@ -33,6 +36,10 @@ pub struct DatadogEventsConfig { #[configurable(derived)] #[serde(default)] pub request: TowerRequestConfig, + + #[configurable(derived)] + #[serde(default)] + pub retry_strategy: RetryStrategy, } impl GenerateConfig for DatadogEventsConfig { @@ -64,7 +71,10 @@ impl DatadogEventsConfig { let request_opts = self.request; let request_settings = request_opts.into_settings(); - let retry_logic = HttpStatusRetryLogic::new(|req: &DatadogEventsResponse| req.http_status); + let retry_logic = HttpStatusRetryLogic::new( + |req: &DatadogEventsResponse| req.http_status, + self.retry_strategy.clone(), + ); let service = ServiceBuilder::new() .settings(request_settings, retry_logic) diff --git a/src/sinks/gcp/stackdriver/logs/config.rs b/src/sinks/gcp/stackdriver/logs/config.rs index 9b66eea4ac214..addb45163df76 100644 --- a/src/sinks/gcp/stackdriver/logs/config.rs +++ b/src/sinks/gcp/stackdriver/logs/config.rs @@ -21,7 +21,7 @@ use crate::{ prelude::*, util::{ BoxedRawValue, RealtimeSizeBasedDefaultBatchSettings, - http::{HttpService, http_response_retry_logic}, + http::{HttpService, RetryStrategy, http_response_retry_logic}, service::TowerRequestConfigDefaults, }, }, @@ -106,6 +106,10 @@ pub(super) struct StackdriverConfig { skip_serializing_if = "crate::serde::is_default" )] acknowledgements: AcknowledgementsConfig, + + #[configurable(derived)] + #[serde(default)] + pub retry_strategy: RetryStrategy, } pub(super) fn default_endpoint() -> String { @@ -269,7 +273,10 @@ impl SinkConfig for StackdriverConfig { let service = HttpService::new(client.clone(), stackdriver_logs_service_request_builder); let service = ServiceBuilder::new() - .settings(request_limits, http_response_retry_logic()) + .settings( + request_limits, + http_response_retry_logic(self.retry_strategy.clone()), + ) .service(service); let sink = StackdriverLogsSink::new(service, batch_settings, request_builder); diff --git a/src/sinks/gcp/stackdriver/metrics/config.rs b/src/sinks/gcp/stackdriver/metrics/config.rs index 2398a18e9beb9..ea2cc9d6b5293 100644 --- a/src/sinks/gcp/stackdriver/metrics/config.rs +++ b/src/sinks/gcp/stackdriver/metrics/config.rs @@ -15,7 +15,8 @@ use crate::{ prelude::*, util::{ http::{ - HttpRequest, HttpService, HttpServiceRequestBuilder, http_response_retry_logic, + HttpRequest, HttpService, HttpServiceRequestBuilder, RetryStrategy, + http_response_retry_logic, }, service::TowerRequestConfigDefaults, }, @@ -77,6 +78,10 @@ pub struct StackdriverConfig { skip_serializing_if = "crate::serde::is_default" )] pub(super) acknowledgements: AcknowledgementsConfig, + + #[configurable(derived)] + #[serde(default)] + pub retry_strategy: RetryStrategy, } fn default_metric_namespace_value() -> String { @@ -126,7 +131,10 @@ impl SinkConfig for StackdriverConfig { let service = HttpService::new(client, stackdriver_metrics_service_request_builder); let service = ServiceBuilder::new() - .settings(request_limits, http_response_retry_logic()) + .settings( + request_limits, + http_response_retry_logic(self.retry_strategy.clone()), + ) .service(service); let sink = StackdriverMetricsSink::new(service, batch_settings, request_builder); diff --git a/src/sinks/honeycomb/config.rs b/src/sinks/honeycomb/config.rs index 05a56cd9df368..f179248e1697b 100644 --- a/src/sinks/honeycomb/config.rs +++ b/src/sinks/honeycomb/config.rs @@ -16,7 +16,7 @@ use crate::{ prelude::*, util::{ BatchConfig, BoxedRawValue, - http::{HttpService, http_response_retry_logic}, + http::{HttpService, RetryStrategy, http_response_retry_logic}, }, }, }; @@ -71,6 +71,10 @@ pub struct HoneycombConfig { skip_serializing_if = "crate::serde::is_default" )] acknowledgements: AcknowledgementsConfig, + + #[configurable(derived)] + #[serde(default)] + pub retry_strategy: RetryStrategy, } fn default_endpoint() -> String { @@ -124,7 +128,10 @@ impl SinkConfig for HoneycombConfig { let request_limits = self.request.into_settings(); let service = ServiceBuilder::new() - .settings(request_limits, http_response_retry_logic()) + .settings( + request_limits, + http_response_retry_logic(self.retry_strategy.clone()), + ) .service(service); let sink = HoneycombSink::new(service, batch_settings, request_builder); diff --git a/src/sinks/http/config.rs b/src/sinks/http/config.rs index a3a105fa5c608..2b6abebe0d402 100644 --- a/src/sinks/http/config.rs +++ b/src/sinks/http/config.rs @@ -30,7 +30,10 @@ use crate::{ prelude::*, util::{ RealtimeSizeBasedDefaultBatchSettings, UriSerde, - http::{HttpService, OrderedHeaderName, RequestConfig, http_response_retry_logic}, + http::{ + HttpService, OrderedHeaderName, RequestConfig, RetryStrategy, + http_response_retry_logic, + }, }, }, }; @@ -100,6 +103,10 @@ pub struct HttpSinkConfig { skip_serializing_if = "crate::serde::is_default" )] pub acknowledgements: AcknowledgementsConfig, + + #[configurable(derived)] + #[serde(default)] + pub retry_strategy: RetryStrategy, } /// HTTP method. @@ -330,7 +337,10 @@ impl SinkConfig for HttpSinkConfig { let request_limits = self.request.tower.into_settings(); let service = ServiceBuilder::new() - .settings(request_limits, http_response_retry_logic()) + .settings( + request_limits, + http_response_retry_logic(self.retry_strategy.clone()), + ) .service(service); let sink = HttpSink::new( @@ -405,6 +415,7 @@ mod tests { acknowledgements: AcknowledgementsConfig::default(), payload_prefix: String::new(), payload_suffix: String::new(), + retry_strategy: RetryStrategy::default(), }; let external_resource = ExternalResource::new( diff --git a/src/sinks/http/tests.rs b/src/sinks/http/tests.rs index 12ff994239f5d..ee3e6a008db77 100644 --- a/src/sinks/http/tests.rs +++ b/src/sinks/http/tests.rs @@ -67,6 +67,7 @@ fn default_cfg(encoding: EncodingConfigWithFraming) -> HttpSinkConfig { request: Default::default(), tls: Default::default(), acknowledgements: Default::default(), + retry_strategy: Default::default(), } } @@ -445,6 +446,103 @@ async fn retries_on_temporary_error() { .await; } +#[tokio::test] +async fn custom_retry_retries_only_configured_status_code() { + components::assert_sink_compliance(&HTTP_SINK_TAGS, async { + const NUM_LINES: usize = 1; + const NUM_FAILURES: usize = 2; + const CUSTOM_RETRY_CONFIG: &str = r#" + request.retry_attempts = 2 + request.retry_initial_backoff_secs = 1 + request.retry_max_duration_secs = 1 + retry_strategy.type = "custom" + retry_strategy.status_codes = [408, 425, 429, 503] + "#; + + let (in_addr, sink) = build_sink(CUSTOM_RETRY_CONFIG).await; + + let counter = Arc::new(atomic::AtomicUsize::new(0)); + let in_counter = Arc::clone(&counter); + let (rx, trigger, server) = build_test_server_generic(in_addr, move || { + let count = in_counter.fetch_add(1, atomic::Ordering::Relaxed); + if count < NUM_FAILURES { + Response::builder() + .status(StatusCode::SERVICE_UNAVAILABLE) + .body(Body::empty()) + .unwrap_or_else(|_| unreachable!()) + } else { + Response::new(Body::empty()) + } + }); + + let (batch, mut receiver) = BatchNotifier::new_with_receiver(); + let (input_lines, events) = random_lines_with_stream(100, NUM_LINES, Some(batch)); + let pump = sink.run(events); + + tokio::spawn(server); + + pump.await.unwrap(); + drop(trigger); + + assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered)); + + let output_lines = get_received_gzip(rx, |parts| { + assert_eq!(Method::POST, parts.method); + assert_eq!("/frames", parts.uri.path()); + }) + .await; + + let tries = counter.load(atomic::Ordering::Relaxed); + assert_eq!(tries, NUM_FAILURES + 1); + assert_eq!(NUM_LINES, output_lines.len()); + assert_eq!(input_lines, output_lines); + }) + .await; +} + +#[tokio::test] +async fn custom_retry_does_not_retry_unconfigured_status_code() { + components::assert_sink_error(&COMPONENT_ERROR_TAGS, async { + const NUM_LINES: usize = 1; + const CUSTOM_RETRY_CONFIG: &str = r#" + request.retry_attempts = 2 + request.retry_initial_backoff_secs = 1 + request.retry_max_duration_secs = 1 + retry_strategy.type = "custom" + retry_strategy.status_codes = [408, 425, 429, 503] + "#; + + let (in_addr, sink) = build_sink(CUSTOM_RETRY_CONFIG).await; + + let counter = Arc::new(atomic::AtomicUsize::new(0)); + let in_counter = Arc::clone(&counter); + let (rx, trigger, server) = build_test_server_generic(in_addr, move || { + in_counter.fetch_add(1, atomic::Ordering::Relaxed); + Response::builder() + .status(StatusCode::BAD_REQUEST) + .body(Body::empty()) + .unwrap_or_else(|_| unreachable!()) + }); + + let (batch, mut receiver) = BatchNotifier::new_with_receiver(); + let (_input_lines, events) = random_lines_with_stream(100, NUM_LINES, Some(batch)); + let pump = sink.run(events); + + tokio::spawn(server); + + pump.await.unwrap(); + drop(trigger); + + assert_eq!(receiver.try_recv(), Ok(BatchStatus::Rejected)); + assert_eq!(counter.load(atomic::Ordering::Relaxed), 1); + + let output_lines = + get_received_gzip(rx, |_| unreachable!("There should be no successful requests")).await; + assert!(output_lines.is_empty()); + }) + .await; +} + #[tokio::test] async fn fails_on_permanent_error() { components::assert_sink_error(&COMPONENT_ERROR_TAGS, async { diff --git a/src/sinks/keep/config.rs b/src/sinks/keep/config.rs index 7b81281253672..d3810976aa3c1 100644 --- a/src/sinks/keep/config.rs +++ b/src/sinks/keep/config.rs @@ -16,7 +16,7 @@ use crate::{ prelude::*, util::{ BatchConfig, BoxedRawValue, - http::{HttpService, http_response_retry_logic}, + http::{HttpService, RetryStrategy, http_response_retry_logic}, }, }, }; @@ -59,6 +59,10 @@ pub struct KeepConfig { skip_serializing_if = "crate::serde::is_default" )] acknowledgements: AcknowledgementsConfig, + + #[configurable(derived)] + #[serde(default)] + pub retry_strategy: RetryStrategy, } fn default_endpoint() -> String { @@ -111,7 +115,10 @@ impl SinkConfig for KeepConfig { let request_limits = self.request.into_settings(); let service = ServiceBuilder::new() - .settings(request_limits, http_response_retry_logic()) + .settings( + request_limits, + http_response_retry_logic(self.retry_strategy.clone()), + ) .service(service); let sink = KeepSink::new(service, batch_settings, request_builder); diff --git a/src/sinks/opentelemetry/mod.rs b/src/sinks/opentelemetry/mod.rs index 7d73e25c4030d..dcc13624372d5 100644 --- a/src/sinks/opentelemetry/mod.rs +++ b/src/sinks/opentelemetry/mod.rs @@ -56,6 +56,7 @@ impl Default for Protocol { request: Default::default(), tls: Default::default(), acknowledgements: Default::default(), + retry_strategy: Default::default(), }) } } diff --git a/src/sinks/prometheus/remote_write/config.rs b/src/sinks/prometheus/remote_write/config.rs index c1b420d6b216d..c9f214953003c 100644 --- a/src/sinks/prometheus/remote_write/config.rs +++ b/src/sinks/prometheus/remote_write/config.rs @@ -17,7 +17,7 @@ use crate::{ prometheus::PrometheusRemoteWriteAuth, util::{ auth::Auth, - http::{OrderedHeaderName, http_response_retry_logic}, + http::{OrderedHeaderName, RetryStrategy, http_response_retry_logic}, service::TowerRequestConfig, }, }, @@ -129,6 +129,10 @@ pub struct RemoteWriteConfig { #[serde(default = "default_compression")] #[derivative(Default(value = "default_compression()"))] pub compression: Compression, + + #[configurable(derived)] + #[serde(default)] + pub retry_strategy: RetryStrategy, } const fn default_compression() -> Compression { @@ -251,7 +255,10 @@ impl SinkConfig for RemoteWriteConfig { headers: validated_headers, }; let service = ServiceBuilder::new() - .settings(request_settings, http_response_retry_logic()) + .settings( + request_settings, + http_response_retry_logic(self.retry_strategy.clone()), + ) .service(service); let sink = RemoteWriteSink { diff --git a/src/sinks/util/http.rs b/src/sinks/util/http.rs index e19025e3c6ee3..875a01f8cf836 100644 --- a/src/sinks/util/http.rs +++ b/src/sinks/util/http.rs @@ -550,14 +550,107 @@ impl sink::Response for http::Response { } } +/// Configurable retry strategy for `http` based sinks. +/// +/// For more information about error responses, see [Client Error Responses][error_responses]. +/// +/// [error_responses]: https://developer.mozilla.org/en-US/docs/Web/HTTP/Reference/Status#client_error_responses +#[configurable_component] +#[derive(Debug, Clone, Default, PartialEq)] +#[serde(tag = "type", rename_all = "snake_case")] +#[configurable(metadata(docs::enum_tag_description = "The retry strategy enum."))] +pub enum RetryStrategy { + /// Don't retry any errors + None, + + /// Default strategy. See [`RetryStrategy::retry_action`] for more details. + #[default] + Default, + + /// Retry on *all* errorcodes + All, + + /// Custom retry strategy + Custom { + /// Retry on these specific HTTP status codes + status_codes: Vec, + }, +} + +impl RetryStrategy { + /// Returns the name of the retry strategy. + #[must_use] + const fn name(&self) -> &str { + match self { + Self::None => "Never retry strategy", + Self::Default => "Default retry strategy", + Self::All => "Retry all strategy", + Self::Custom { .. } => "Custom retry strategy", + } + } + + /// Determines if the given status code should be retried. + /// + /// For the `Default` strategy, the following status codes will be retried: + /// - 429 (Too Many Requests) + /// - 408 (Request Timeout) + /// - 5xx (Server Error) + /// + /// For the `Custom` strategy, the status codes specified in the `status_codes` field will be retried. + /// + /// For the `All` strategy, all non-success status codes will be retried. + #[must_use] + pub fn retry_action(&self, status: http::StatusCode) -> RetryAction { + if status.is_success() { + return RetryAction::Successful; + } + + let reason = format!( + "{}: {}", + self.name(), + status + .canonical_reason() + .unwrap_or(status.to_string().as_str()) + ) + .into(); + + match self { + Self::None => RetryAction::DontRetry(reason), + Self::Default => match status { + StatusCode::TOO_MANY_REQUESTS | StatusCode::REQUEST_TIMEOUT => { + RetryAction::Retry(reason) + } + _ => { + if status.is_server_error() { + RetryAction::Retry(reason) + } else { + RetryAction::DontRetry(reason) + } + } + }, + Self::All => RetryAction::Retry(reason), + Self::Custom { status_codes } => { + if status_codes.contains(&status.as_u16()) { + RetryAction::Retry(reason) + } else { + RetryAction::DontRetry(reason) + } + } + } + } +} + #[derive(Debug, Clone)] pub struct HttpRetryLogic { request: PhantomData, + retry_strategy: RetryStrategy, } + impl Default for HttpRetryLogic { fn default() -> Self { Self { request: PhantomData, + retry_strategy: RetryStrategy::Default, } } } @@ -572,20 +665,7 @@ impl RetryLogic for HttpRetryLogic { } fn should_retry_response(&self, response: &Self::Response) -> RetryAction { - let status = response.status(); - - match status { - StatusCode::TOO_MANY_REQUESTS => RetryAction::Retry("too many requests".into()), - StatusCode::REQUEST_TIMEOUT => RetryAction::Retry("request timeout".into()), - StatusCode::NOT_IMPLEMENTED => { - RetryAction::DontRetry("endpoint not implemented".into()) - } - _ if status.is_server_error() => RetryAction::Retry( - format!("{}: {}", status, String::from_utf8_lossy(response.body())).into(), - ), - _ if status.is_success() => RetryAction::Successful, - _ => RetryAction::DontRetry(format!("response status: {status}").into()), - } + self.retry_strategy.retry_action(response.status()) } } @@ -596,6 +676,7 @@ pub struct HttpStatusRetryLogic { func: F, request: PhantomData, response: PhantomData, + retry_strategy: RetryStrategy, } impl HttpStatusRetryLogic @@ -604,11 +685,12 @@ where Req: Send + Sync + 'static, Res: Send + Sync + 'static, { - pub const fn new(func: F) -> HttpStatusRetryLogic { + pub const fn new(func: F, retry_strategy: RetryStrategy) -> HttpStatusRetryLogic { HttpStatusRetryLogic { func, request: PhantomData, response: PhantomData, + retry_strategy, } } } @@ -629,19 +711,7 @@ where fn should_retry_response(&self, response: &Res) -> RetryAction { let status = (self.func)(response); - - match status { - StatusCode::TOO_MANY_REQUESTS => RetryAction::Retry("too many requests".into()), - StatusCode::REQUEST_TIMEOUT => RetryAction::Retry("request timeout".into()), - StatusCode::NOT_IMPLEMENTED => { - RetryAction::DontRetry("endpoint not implemented".into()) - } - _ if status.is_server_error() => { - RetryAction::Retry(format!("Http Status: {status}").into()) - } - _ if status.is_success() => RetryAction::Successful, - _ => RetryAction::DontRetry(format!("Http status: {status}").into()), - } + self.retry_strategy.retry_action(status) } } @@ -654,6 +724,7 @@ where func: self.func.clone(), request: PhantomData, response: PhantomData, + retry_strategy: self.retry_strategy.clone(), } } } @@ -820,12 +891,17 @@ impl DriverResponse for HttpResponse { } /// Creates a `RetryLogic` for use with `HttpResponse`. -pub fn http_response_retry_logic() -> HttpStatusRetryLogic< +pub fn http_response_retry_logic( + retry_strategy: RetryStrategy, +) -> HttpStatusRetryLogic< impl Fn(&HttpResponse) -> StatusCode + Clone + Send + Sync + 'static, Request, HttpResponse, > { - HttpStatusRetryLogic::new(|req: &HttpResponse| req.http_response.status()) + HttpStatusRetryLogic::new( + |req: &HttpResponse| req.http_response.status(), + retry_strategy, + ) } /// Uses the estimated json encoded size to determine batch sizing. @@ -969,6 +1045,54 @@ mod test { ); } + #[test] + fn retry_strategy_none_preserves_success_and_rejects_failures() { + let strategy = RetryStrategy::None; + + assert!(strategy.retry_action::<()>(StatusCode::OK).is_successful()); + assert!( + strategy + .retry_action::<()>(StatusCode::INTERNAL_SERVER_ERROR) + .is_not_retryable() + ); + } + + #[test] + fn retry_strategy_all_preserves_success_and_retries_failures() { + let strategy = RetryStrategy::All; + + assert!(strategy.retry_action::<()>(StatusCode::OK).is_successful()); + assert!( + strategy + .retry_action::<()>(StatusCode::BAD_REQUEST) + .is_retryable() + ); + assert!( + strategy + .retry_action::<()>(StatusCode::INTERNAL_SERVER_ERROR) + .is_retryable() + ); + } + + #[test] + fn retry_strategy_custom_only_retries_configured_statuses() { + let strategy = RetryStrategy::Custom { + status_codes: vec![StatusCode::BAD_REQUEST.as_u16()], + }; + + assert!(strategy.retry_action::<()>(StatusCode::OK).is_successful()); + assert!( + strategy + .retry_action::<()>(StatusCode::BAD_REQUEST) + .is_retryable() + ); + assert!( + strategy + .retry_action::<()>(StatusCode::INTERNAL_SERVER_ERROR) + .is_not_retryable() + ); + } + #[tokio::test] async fn util_http_it_makes_http_requests() { let (_guard, addr) = next_addr();