Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions create-notice.sh
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ function main {
add_license "wheel" "https://raw.githubusercontent.com/pypa/wheel/main/LICENSE.txt"
add_license "pip" "https://raw.githubusercontent.com/pypa/pip/main/LICENSE.txt"
add_license "boto3" "https://raw.githubusercontent.com/boto/boto3/develop/LICENSE"
# OTLP corpus preparation dependencies (otlp extra)
add_license "opentelemetry-proto" "https://raw.githubusercontent.com/open-telemetry/opentelemetry-python/main/LICENSE"
add_license "protobuf" "https://raw.githubusercontent.com/protocolbuffers/protobuf/main/LICENSE"

# transitive dependencies
# Jinja2 dependencies
Expand Down
15 changes: 9 additions & 6 deletions esrally/driver/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -2129,34 +2129,37 @@ def _parse_headers(e):
# Some runners return a raw response, causing the 'error' property to be a string literal of the bytes/BytesIO object,
# we should avoid bubbling that up
# e.g. ApiError(413, '<_io.BytesIO object at 0xffffaf146a70>')
# errors="replace" so binary response bodies (e.g. OTLP ingest returns binary protobuf
# error frames) don't crash the driver with UnicodeDecodeError. Undecodable bytes become
# U+FFFD in the logged/recorded message.
if isinstance(e.body, bytes):
# could be an empty body
if error_body := e.body.decode("utf-8"):
if error_body := e.body.decode("utf-8", errors="replace"):
error_message = error_body
else:
# to be consistent with an empty 'e.error'
error_message = str(None)
elif isinstance(e.body, BytesIO):
# could be an empty body
if error_body := e.body.read().decode("utf-8"):
if error_body := e.body.read().decode("utf-8", errors="replace"):
error_message = error_body
else:
# to be consistent with an empty 'e.error'
error_message = str(None)
# fallback to 'error' property if the body isn't bytes/BytesIO
else:
if isinstance(e.error, bytes):
error_message = e.error.decode("utf-8")
error_message = e.error.decode("utf-8", errors="replace")
elif isinstance(e.error, BytesIO):
error_message = e.error.read().decode("utf-8")
error_message = e.error.read().decode("utf-8", errors="replace")
else:
# if the 'error' is empty, we get back str(None)
error_message = e.error

if isinstance(e.info, bytes):
error_info = e.info.decode("utf-8")
error_info = e.info.decode("utf-8", errors="replace")
elif isinstance(e.info, BytesIO):
error_info = e.info.read().decode("utf-8")
error_info = e.info.read().decode("utf-8", errors="replace")
else:
error_info = e.info

Expand Down
166 changes: 166 additions & 0 deletions esrally/driver/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@

def register_default_runners(config: Optional[types.Config] = None):
register_runner(track.OperationType.Bulk, BulkIndex(), async_runner=True)
register_runner(track.OperationType.OtlpIngest, OtlpIngest(), async_runner=True)
register_runner(track.OperationType.ForceMerge, ForceMerge(), async_runner=True)
register_runner(track.OperationType.IndexStats, Retry(IndicesStats()), async_runner=True)
register_runner(track.OperationType.NodeStats, NodeStats(), async_runner=True)
Expand Down Expand Up @@ -806,6 +807,171 @@ def __repr__(self, *args, **kwargs):
return "bulk-index"


class _ProtobufSerializer:
    """Pass-through serializer for payloads that are already binary protobuf.

    No encoding or decoding is performed: request bodies must already be bytes-like and
    response payloads are handed back exactly as received.
    """

    # MIME type this serializer is meant to be registered under.
    mimetype = "application/x-protobuf"

    def dumps(self, data):
        """Return ``data`` as an immutable ``bytes`` object.

        :param data: The already-serialized payload (``bytes`` or ``bytearray``).
        :return: The payload as ``bytes``.
        :raises TypeError: If ``data`` is neither ``bytes`` nor ``bytearray``.
        """
        if not isinstance(data, (bytes, bytearray)):
            raise TypeError(f"Expected bytes for application/x-protobuf body, got {type(data).__name__}")
        return bytes(data)

    def loads(self, data):
        """Return the received payload untouched; no protobuf decoding is attempted."""
        return data


def _decode_error_body(body) -> str:
    """Render an ApiError body as a string suitable for logging (best effort).

    ES returns a mix of UTF-8 text and binary protobuf framing for OTLP errors (e.g. 429s come
    back as protobuf with embedded text), so undecodable bytes are replaced instead of raising.

    :param body: ``None``, a bytes-like object, an object exposing ``read()``, or any other value.
    :return: ``"<no body>"`` for ``None``; the UTF-8 decoded text (invalid sequences replaced by
             U+FFFD) for bytes-like content; ``str(...)`` of the value otherwise.
    """
    if body is None:
        return "<no body>"
    # File-like bodies (anything with ``read``) are drained first; this consumes the stream.
    raw = body.read() if hasattr(body, "read") else body
    if not isinstance(raw, (bytes, bytearray)):
        return str(raw)
    return bytes(raw).decode("utf-8", errors="replace")


class OtlpIngest(Runner):
    """
    Sends a pre-serialized OTLP ExportMetricsServiceRequest (binary protobuf) to Elasticsearch.

    Retryable HTTP statuses (429, 502, 503, 504) as well as connection errors and timeouts are
    retried by this runner with exponential backoff and full jitter. When such a failure persists,
    or a non-retryable HTTP error is returned, the failure is reported through the result dict
    (``success: False`` plus an ``error-type``) rather than re-raised.
    """

    DEFAULT_ENDPOINT = "/_otlp/v1/metrics"
    _PROTOBUF_MIMETYPE = "application/x-protobuf"
    # statuses we treat as transient and retry with backoff. Matches elastic-transport's
    # default retry_on_status, but we add proper exponential backoff between attempts.
    _RETRYABLE_STATUSES = frozenset({429, 502, 503, 504})
    _MAX_BACKOFF_SECONDS = 30.0

    async def __call__(self, es, params):
        """
        :param es: The Elasticsearch client.
        :param params: A hash with all parameters.

        Mandatory parameters:
        * ``body``: Raw binary protobuf payload (bytes).

        Optional parameters:
        * ``endpoint``: OTLP endpoint path. Defaults to ``/_otlp/v1/metrics``.
        * ``request-timeout``: Client-side timeout in seconds.
        * ``retries-on-error``: Number of retries on retryable errors (429, 502, 503, 504, connection
          errors). Defaults to 5. Negative values are treated as 0, i.e. the request is always sent
          at least once.
        * ``retry-wait-period``: Base seconds for exponential backoff with full jitter between
          retries. Defaults to 0.5 (so attempts sleep up to 0.5, 1, 2, 4, 8 ... seconds, capped at 30s).
          Negative values are treated as 0 (retry without waiting).

        :return: A dict with ``weight``, ``unit``, ``success`` and ``request-size-bytes``. On success
                 it also carries ``request-status``; on failure it carries ``error-type``
                 (``backpressure``, ``transport`` or ``rejected``) and, when the last failure was an
                 HTTP error, ``request-status``.
        """
        body = mandatory(params, "body", self)
        path = params.get("endpoint", self.DEFAULT_ENDPOINT)
        # Clamp to zero: with a negative value the attempt loop below would never execute and a
        # failure would be reported although no request was ever sent.
        max_retries = max(0, int(params.get("retries-on-error", 5)))
        # Clamp to zero so that the computed (and logged) backoff can never be negative.
        retry_wait_base = max(0.0, float(params.get("retry-wait-period", 0.5)))

        # max_retries=0 on the transport — our outer loop handles retries with proper backoff.
        # Without this the transport would fire its own 4 fast back-to-back retries on 429 before
        # our backoff ever gets a chance, hammering an already-overloaded ES.
        transport_params = {"max_retries": 0}
        if "request-timeout" in params:
            transport_params["request_timeout"] = params["request-timeout"]
        es = es.options(**transport_params)

        # elastic-transport has no built-in serializer for application/x-protobuf; register a
        # passthrough serializer so it forwards the already-serialized bytes without modification.
        serializers = es.transport.serializers.serializers
        if self._PROTOBUF_MIMETYPE not in serializers:
            serializers[self._PROTOBUF_MIMETYPE] = _ProtobufSerializer()

        # Local imports follow the existing pattern in this module for the elasticsearch dependency.
        # pylint: disable=import-outside-toplevel
        import elasticsearch
        from elasticsearch import AsyncElasticsearch

        headers = {"Content-Type": self._PROTOBUF_MIMETYPE}

        max_attempts = max_retries + 1
        # Outcome of the most recent failed attempt; only used to build the failure result.
        last_status: Optional[int] = None
        last_error_type = "rejected"
        for attempt in range(max_attempts):
            retries_left = attempt < max_attempts - 1
            try:
                # Bypass RallyAsyncElasticsearch.perform_request to avoid injecting
                # ES REST compatibility headers (Accept: application/vnd.elasticsearch+...)
                # that OTLP endpoints do not understand.
                await AsyncElasticsearch.perform_request(
                    es,
                    method="POST",
                    path=path,
                    headers=headers,
                    body=body,
                )
                return {
                    "weight": 1,
                    "unit": "ops",
                    "success": True,
                    "request-status": 200,
                    "request-size-bytes": len(body),
                }
            except elasticsearch.ApiError as e:
                last_status = e.status_code
                if e.status_code in self._RETRYABLE_STATUSES:
                    last_error_type = "backpressure" if e.status_code == 429 else "transport"
                    if retries_left:
                        sleep_s = self._backoff(attempt, retry_wait_base)
                        # ``attempt`` is zero-based, so the upcoming attempt is number attempt + 2.
                        self.logger.info(
                            "OTLP ingest got HTTP %s, retrying in %.2fs (attempt %d/%d).",
                            e.status_code,
                            sleep_s,
                            attempt + 2,
                            max_attempts,
                        )
                        await asyncio.sleep(sleep_s)
                        continue
                    self.logger.warning(
                        "OTLP ingest failed after %d attempts: HTTP %s — %s",
                        max_attempts,
                        e.status_code,
                        _decode_error_body(e.body),
                    )
                else:
                    # non-retryable HTTP error (e.g. 400/401/403/404) — fail fast
                    last_error_type = "rejected"
                    self.logger.warning("OTLP ingest failed: HTTP %s — %s", e.status_code, _decode_error_body(e.body))
                break
            except (elasticsearch.ConnectionError, elasticsearch.ConnectionTimeout) as e:
                # No HTTP response was received, so discard any status kept from an earlier attempt.
                last_status = None
                last_error_type = "transport"
                if retries_left:
                    sleep_s = self._backoff(attempt, retry_wait_base)
                    self.logger.info(
                        "OTLP ingest connection error (%s), retrying in %.2fs (attempt %d/%d).",
                        type(e).__name__,
                        sleep_s,
                        attempt + 2,
                        max_attempts,
                    )
                    await asyncio.sleep(sleep_s)
                    continue
                self.logger.warning("OTLP ingest failed after %d attempts: %s", max_attempts, e)
                break

        result = {
            "weight": 1,
            "unit": "ops",
            "success": False,
            "error-type": last_error_type,
            "request-size-bytes": len(body),
        }
        if last_status is not None:
            result["request-status"] = last_status
        return result

    def _backoff(self, attempt: int, base: float) -> float:
        """
        Exponential backoff with full jitter, capped at ``_MAX_BACKOFF_SECONDS``.

        :param attempt: Zero-based index of the attempt that just failed.
        :param base: Base wait period in seconds; the upper bound doubles with every attempt.
        :return: A random number of seconds between 0 and ``min(_MAX_BACKOFF_SECONDS, base * 2**attempt)``.
        """
        cap = min(self._MAX_BACKOFF_SECONDS, base * (2**attempt))
        return random.uniform(0, cap)

    def __repr__(self, *args, **kwargs):
        return "otlp-ingest"


class ForceMerge(Runner):
"""
Runs a force merge operation against Elasticsearch.
Expand Down
Loading
Loading