diff --git a/create-notice.sh b/create-notice.sh index 60d965d7b..202929106 100755 --- a/create-notice.sh +++ b/create-notice.sh @@ -70,6 +70,9 @@ function main { add_license "wheel" "https://raw.githubusercontent.com/pypa/wheel/main/LICENSE.txt" add_license "pip" "https://raw.githubusercontent.com/pypa/pip/main/LICENSE.txt" add_license "boto3" "https://raw.githubusercontent.com/boto/boto3/develop/LICENSE" + # OTLP corpus preparation dependencies (otlp extra) + add_license "opentelemetry-proto" "https://raw.githubusercontent.com/open-telemetry/opentelemetry-python/main/LICENSE" + add_license "protobuf" "https://raw.githubusercontent.com/protocolbuffers/protobuf/main/LICENSE" # transitive dependencies # Jinja2 dependencies diff --git a/esrally/driver/driver.py b/esrally/driver/driver.py index 64086c4c5..aab67b1b9 100644 --- a/esrally/driver/driver.py +++ b/esrally/driver/driver.py @@ -2129,16 +2129,19 @@ def _parse_headers(e): # Some runners return a raw response, causing the 'error' property to be a string literal of the bytes/BytesIO object, # we should avoid bubbling that up # e.g. ApiError(413, '<_io.BytesIO object at 0xffffaf146a70>') + # errors="replace" so binary response bodies (e.g. OTLP ingest returns binary protobuf + # error frames) don't crash the driver with UnicodeDecodeError. Undecodable bytes become + # U+FFFD in the logged/recorded message. if isinstance(e.body, bytes): # could be an empty body - if error_body := e.body.decode("utf-8"): + if error_body := e.body.decode("utf-8", errors="replace"): error_message = error_body else: # to be consistent with an empty 'e.error' error_message = str(None) elif isinstance(e.body, BytesIO): # could be an empty body - if error_body := e.body.read().decode("utf-8"): + if error_body := e.body.read().decode("utf-8", errors="replace"): error_message = error_body else: # to be consistent with an empty 'e.error' @@ -2146,17 +2149,17 @@ def _parse_headers(e): # fallback to 'error' property if the body isn't bytes/BytesIO else: if isinstance(e.error, bytes): - error_message = e.error.decode("utf-8") + error_message = e.error.decode("utf-8", errors="replace") elif isinstance(e.error, BytesIO): - error_message = e.error.read().decode("utf-8") + error_message = e.error.read().decode("utf-8", errors="replace") else: # if the 'error' is empty, we get back str(None) error_message = e.error if isinstance(e.info, bytes): - error_info = e.info.decode("utf-8") + error_info = e.info.decode("utf-8", errors="replace") elif isinstance(e.info, BytesIO): - error_info = e.info.read().decode("utf-8") + error_info = e.info.read().decode("utf-8", errors="replace") else: error_info = e.info diff --git a/esrally/driver/runner.py b/esrally/driver/runner.py index a5c139968..d44e7b250 100644 --- a/esrally/driver/runner.py +++ b/esrally/driver/runner.py @@ -46,6 +46,7 @@ def register_default_runners(config: Optional[types.Config] = None): register_runner(track.OperationType.Bulk, BulkIndex(), async_runner=True) + register_runner(track.OperationType.OtlpIngest, OtlpIngest(), async_runner=True) register_runner(track.OperationType.ForceMerge, ForceMerge(), async_runner=True) register_runner(track.OperationType.IndexStats, Retry(IndicesStats()), async_runner=True) register_runner(track.OperationType.NodeStats, NodeStats(), async_runner=True) @@ -806,6 +807,171 @@ def __repr__(self, *args, **kwargs): return "bulk-index" +class _ProtobufSerializer: + """Passthrough serializer for pre-serialized binary protobuf payloads.""" + + mimetype = "application/x-protobuf" + + def dumps(self, data): + if isinstance(data, (bytes, bytearray)): + return bytes(data) + raise TypeError(f"Expected bytes for application/x-protobuf body, got {type(data).__name__}") + + def loads(self, data): + return data + + +def _decode_error_body(body) -> str: + """Best-effort string-rep of an ApiError body for logging. ES returns a mix of UTF-8 text and + binary protobuf framing for OTLP errors (e.g. 429s come back as protobuf with embedded text).""" + if body is None: + return "" + if hasattr(body, "read"): + body = body.read() + if isinstance(body, (bytes, bytearray)): + return bytes(body).decode("utf-8", errors="replace") + return str(body) + + +class OtlpIngest(Runner): + """ + Sends a pre-serialized OTLP ExportMetricsServiceRequest (binary protobuf) to Elasticsearch. + """ + + DEFAULT_ENDPOINT = "/_otlp/v1/metrics" + _PROTOBUF_MIMETYPE = "application/x-protobuf" + # statuses we treat as transient and retry with backoff. Matches elastic-transport's + # default retry_on_status, but we add proper exponential backoff between attempts. + _RETRYABLE_STATUSES = frozenset({429, 502, 503, 504}) + _MAX_BACKOFF_SECONDS = 30.0 + + async def __call__(self, es, params): + """ + :param es: The Elasticsearch client. + :param params: A hash with all parameters. + + Mandatory parameters: + * ``body``: Raw binary protobuf payload (bytes). + + Optional parameters: + * ``endpoint``: OTLP endpoint path. Defaults to ``/_otlp/v1/metrics``. + * ``request-timeout``: Client-side timeout in seconds. + * ``retries-on-error``: Number of retries on retryable errors (429, 502, 503, 504, connection + errors). Defaults to 5. + * ``retry-wait-period``: Base seconds for exponential backoff with full jitter between + retries. Defaults to 0.5 (so attempts sleep up to 0.5, 1, 2, 4, 8 ... seconds, capped at 30s). + """ + body = mandatory(params, "body", self) + path = params.get("endpoint", self.DEFAULT_ENDPOINT) + max_retries = int(params.get("retries-on-error", 5)) + retry_wait_base = float(params.get("retry-wait-period", 0.5)) + + # max_retries=0 on the transport — our outer loop handles retries with proper backoff. + # Without this the transport would fire its own 4 fast back-to-back retries on 429 before + # our backoff ever gets a chance, hammering an already-overloaded ES. + transport_params = {"max_retries": 0} + if "request-timeout" in params: + transport_params["request_timeout"] = params["request-timeout"] + es = es.options(**transport_params) + + # elastic-transport has no built-in serializer for application/x-protobuf; register a + # passthrough serializer so it forwards the already-serialized bytes without modification. + serializers = es.transport.serializers.serializers + if self._PROTOBUF_MIMETYPE not in serializers: + serializers[self._PROTOBUF_MIMETYPE] = _ProtobufSerializer() + + # Local imports follow the existing pattern in this module for the elasticsearch dependency. + # pylint: disable=import-outside-toplevel + import elasticsearch + from elasticsearch import AsyncElasticsearch + + headers = {"Content-Type": self._PROTOBUF_MIMETYPE} + + max_attempts = max_retries + 1 + last_status: Optional[int] = None + last_error_type = "rejected" + for attempt in range(max_attempts): + try: + # Bypass RallyAsyncElasticsearch.perform_request to avoid injecting + # ES REST compatibility headers (Accept: application/vnd.elasticsearch+...) + # that OTLP endpoints do not understand. + await AsyncElasticsearch.perform_request( + es, + method="POST", + path=path, + headers=headers, + body=body, + ) + return { + "weight": 1, + "unit": "ops", + "success": True, + "request-status": 200, + "request-size-bytes": len(body), + } + except elasticsearch.ApiError as e: + last_status = e.status_code + if e.status_code in self._RETRYABLE_STATUSES: + last_error_type = "backpressure" if e.status_code == 429 else "transport" + if attempt < max_attempts - 1: + sleep_s = self._backoff(attempt, retry_wait_base) + self.logger.info( + "OTLP ingest got HTTP %s, retrying in %.2fs (attempt %d/%d).", + e.status_code, + sleep_s, + attempt + 2, + max_attempts, + ) + await asyncio.sleep(sleep_s) + continue + self.logger.warning( + "OTLP ingest failed after %d attempts: HTTP %s — %s", + max_attempts, + e.status_code, + _decode_error_body(e.body), + ) + else: + # non-retryable HTTP error (e.g. 400/401/403/404) — fail fast + last_error_type = "rejected" + self.logger.warning("OTLP ingest failed: HTTP %s — %s", e.status_code, _decode_error_body(e.body)) + break + except (elasticsearch.ConnectionError, elasticsearch.ConnectionTimeout) as e: + last_status = None + last_error_type = "transport" + if attempt < max_attempts - 1: + sleep_s = self._backoff(attempt, retry_wait_base) + self.logger.info( + "OTLP ingest connection error (%s), retrying in %.2fs (attempt %d/%d).", + type(e).__name__, + sleep_s, + attempt + 2, + max_attempts, + ) + await asyncio.sleep(sleep_s) + continue + self.logger.warning("OTLP ingest failed after %d attempts: %s", max_attempts, e) + break + + result = { + "weight": 1, + "unit": "ops", + "success": False, + "error-type": last_error_type, + "request-size-bytes": len(body), + } + if last_status is not None: + result["request-status"] = last_status + return result + + def _backoff(self, attempt: int, base: float) -> float: + """Exponential backoff with full jitter, capped at MAX_BACKOFF_SECONDS.""" + cap = min(self._MAX_BACKOFF_SECONDS, base * (2**attempt)) + return random.uniform(0, cap) + + def __repr__(self, *args, **kwargs): + return "otlp-ingest" + + class ForceMerge(Runner): """ Runs a force merge operation against Elasticsearch. diff --git a/esrally/track/loader.py b/esrally/track/loader.py index 443704ca2..6f929bc71 100644 --- a/esrally/track/loader.py +++ b/esrally/track/loader.py @@ -340,6 +340,20 @@ def first_existing(root_dirs, f): return p return None + def first_existing_with_suffix(root_dirs, f, suffix): + for root_dir in root_dirs: + candidate = os.path.join(root_dir, f) + if os.path.exists(candidate + suffix): + return candidate + return None + + def first_existing_with_any_suffix(root_dirs, f, suffixes): + for suffix in suffixes: + resolved = first_existing_with_suffix(root_dirs, f, suffix) + if resolved is not None: + return resolved + return None + for corpus in t.corpora: data_root = data_dir(cfg, t.name, corpus.name) for document_set in corpus.documents: @@ -347,7 +361,15 @@ def first_existing(root_dirs, f): if document_set.document_archive: document_set.document_archive = first_existing(data_root, document_set.document_archive) if document_set.document_file: - document_set.document_file = first_existing(data_root, document_set.document_file) + resolved = first_existing(data_root, document_set.document_file) + # For OTLP corpora, the hot path reads the .pb (and .offset) — the source JSON is + # only needed during prepare-track when generating the corpus file locally. If only + # the binary corpus has been downloaded, the JSON path won't exist; resolve + # document_file to the path the JSON *would* have so the derived ``.pb`` path is + # correct. + if resolved is None and document_set.is_otlp: + resolved = first_existing_with_any_suffix(data_root, document_set.document_file, (".pb",)) + document_set.document_file = resolved def is_simple_track_mode(cfg: types.Config): @@ -507,6 +529,14 @@ def on_prepare_track(self, track, data_root_dir) -> Generator[tuple[Callable, di "preparator": prep, "document_set": document_set, } + elif document_set.is_otlp: + yield prepare_otlp_document, { + "cfg": self.cfg, + "track": track, + "corpus": corpus, + "preparator": prep, + "document_set": document_set, + } def prepare_document(cfg: types.Config, track, corpus, preparator, document_set): @@ -519,6 +549,15 @@ def prepare_document(cfg: types.Config, track, corpus, preparator, document_set) preparator.prepare_document_set(document_set, data_root[1]) +def prepare_otlp_document(cfg: types.Config, track, corpus, preparator, document_set): + data_root = data_dir(cfg, track.name, corpus.name) + LOG.info("Resolved data root directory for OTLP corpus [%s] in track [%s] to [%s].", corpus.name, track.name, data_root) + if len(data_root) == 1: + preparator.prepare_otlp_document_set(document_set, data_root[0]) + elif not preparator.prepare_bundled_otlp_document_set(document_set, data_root[0]): + preparator.prepare_otlp_document_set(document_set, data_root[1]) + + class Decompressor: def decompress(self, archive_path, documents_path, uncompressed_size): @@ -751,6 +790,137 @@ def prepare_bundled_document_set(self, document_set, data_root): else: return False + def prepare_otlp_document_set(self, document_set, data_root): + """ + Prepares an OTLP binary protobuf corpus file locally. + + Strategy: + 1. If a valid corpus file already exists, nothing to do. + 2. Try downloading a compressed corpus (using the same compression as the JSON corpus, if any), + or the uncompressed corpus directly. Either avoids downloading the much larger JSON source. + 3. Download and decompress the JSON source, then convert it to ``.pb`` locally. + + :param document_set: A document set with source_format == SOURCE_FORMAT_OTLP_PROTOBUF. + :param data_root: The data root directory for this document set. + """ + doc_path = os.path.join(data_root, document_set.document_file) + archive_path = os.path.join(data_root, document_set.document_archive) if document_set.has_compressed_corpus() else None + pb_file = io.OtlpProtobufFile.for_source_file(doc_path) + + # 1. Valid corpus already present + if pb_file.is_valid(): + return + + # 2. Try downloading the corpus file directly — avoids downloading the larger JSON source. + # Prefer the compressed variant (matching the JSON corpus compression) since .pb files + # can be tens of GB and zstd-compressed protobuf is typically 2–4× smaller. + if document_set.base_url and self._try_download_pb(document_set, doc_path, pb_file): + return + + # 3. Ensure JSON source is available + while True: + if self.is_locally_available(doc_path) and self.has_expected_size(doc_path, document_set.uncompressed_size_in_bytes): + break + if ( + archive_path + and self.is_locally_available(archive_path) + and self.has_expected_size(archive_path, document_set.compressed_size_in_bytes) + ): + self.decompressor.decompress(archive_path, doc_path, document_set.uncompressed_size_in_bytes) + else: + if document_set.has_compressed_corpus(): + target_path = archive_path + expected_size = document_set.compressed_size_in_bytes + elif document_set.has_uncompressed_corpus(): + target_path = doc_path + expected_size = document_set.uncompressed_size_in_bytes + else: + raise exceptions.RallyAssertionError(f"Track {self.track_name} specifies documents but no corpus") + try: + self.downloader.download(document_set.base_url, target_path, expected_size) + except exceptions.DataError as e: + if e.message == "Cannot download data because no base URL is provided." and self.is_locally_available(target_path): + raise exceptions.DataError( + f"[{target_path}] is present but does not have the expected " + f"size of [{expected_size}] bytes and it cannot be downloaded " + f"because no base URL is provided." + ) from None + raise + + # 4. Convert JSON to .pb + pb_file.create() + + def _try_download_pb(self, document_set, doc_path, pb_file) -> bool: + """ + Try to fetch a pre-built corpus file from the corpus base URL. If the JSON corpus is + compressed (document_archive ends in .zst/.gz/.bz2/etc.), try the matching .{ext} first + and decompress on the fly; otherwise try the uncompressed corpus directly. + + :return: True if a valid corpus file is now on disk, False if nothing was downloaded. + """ + pb_path = pb_file.pb_path + + # try compressed first if the JSON corpus uses an archive + if document_set.has_compressed_corpus(): + _, archive_ext = io.splitext(document_set.document_archive) + if archive_ext and archive_ext in io.SUPPORTED_ARCHIVE_FORMATS: + pb_archive_path = pb_path + archive_ext + try: + self.downloader.download(document_set.base_url, pb_archive_path) + except exceptions.DataError: + LOG.debug("Compressed .pb%s not available remotely, will try uncompressed .pb.", archive_ext) + else: + self.decompressor.decompress(pb_archive_path, pb_path, uncompressed_size=None) + # archive no longer needed once decompressed + try: + os.remove(pb_archive_path) + except OSError: + pass + if pb_file.is_valid(): + return True + + # uncompressed .pb fallback + try: + self.downloader.download(document_set.base_url, pb_path) + except exceptions.DataError: + return False + return pb_file.is_valid() + + def prepare_bundled_otlp_document_set(self, document_set, data_root): + """ + Prepares a bundled OTLP document set (files in the same directory as the track). + + :return: True if the corpus file is ready, False if required files were not found locally. + """ + doc_path = os.path.join(data_root, document_set.document_file) + archive_path = os.path.join(data_root, document_set.document_archive) if document_set.has_compressed_corpus() else None + pb_file = io.OtlpProtobufFile.for_source_file(doc_path) + + if pb_file.is_valid(): + return True + + if self.is_locally_available(doc_path): + if self.has_expected_size(doc_path, document_set.uncompressed_size_in_bytes): + pb_file.create() + return True + else: + raise exceptions.DataError( + f"[{doc_path}] is present but does not have the expected size " f"of [{document_set.uncompressed_size_in_bytes}] bytes." + ) + + if archive_path and self.is_locally_available(archive_path): + if self.has_expected_size(archive_path, document_set.compressed_size_in_bytes): + self.decompressor.decompress(archive_path, doc_path, document_set.uncompressed_size_in_bytes) + pb_file.create() + return True + else: + raise exceptions.DataError( + f"[{archive_path}] is present but does not have " + f"the expected size of [{document_set.compressed_size_in_bytes}] bytes." + ) + + return False + class TemplateSource: """ @@ -1585,6 +1755,29 @@ def _create_corpora(self, corpora_specs, indices, data_streams): meta_data=doc_meta_data, ) corpus.documents.append(docs) + elif source_format == track.Documents.SOURCE_FORMAT_OTLP_PROTOBUF: + source_file = self._r(doc_spec, "source-file") + if io.is_archive(source_file): + document_archive = source_file + document_file = io.splitext(source_file)[0] + else: + document_archive = None + document_file = source_file + num_docs = self._r(doc_spec, "document-count") + compressed_bytes = self._r(doc_spec, "compressed-bytes", mandatory=False) + uncompressed_bytes = self._r(doc_spec, "uncompressed-bytes", mandatory=False) + doc_meta_data = self._r(doc_spec, "meta", error_ctx=name, mandatory=False) + docs = track.Documents( + source_format=source_format, + document_file=document_file, + document_archive=document_archive, + base_url=base_url, + number_of_documents=num_docs, + compressed_size_in_bytes=compressed_bytes, + uncompressed_size_in_bytes=uncompressed_bytes, + meta_data=doc_meta_data, + ) + corpus.documents.append(docs) else: self._error("Unknown source-format [%s] in document corpus [%s]." % (source_format, name)) document_corpora.append(corpus) diff --git a/esrally/track/params.py b/esrally/track/params.py index 91c7816de..e957d55f1 100644 --- a/esrally/track/params.py +++ b/esrally/track/params.py @@ -719,6 +719,191 @@ def params(self): raise exceptions.RallyError("Do not use a BulkIndexParamSource without partitioning") +class OtlpParamSource(ParamSource): + """ + Parameter source for OTLP binary protobuf corpus files (source_format: otlp-proto). + + Reads pre-generated ExportMetricsServiceRequest records from a .pb file. + Supports multi-client partitioning via the companion .pb.offset index. + """ + + def __init__(self, track_obj, params, **kwargs): + super().__init__(track_obj, params, **kwargs) + self._partition_index = 0 + self._total_partitions = 1 + # Streaming state — generators can't be pickled, so they're created lazily on the first + # params() call inside the worker process (after Rally has done its actor pickling). + # The cursor and end_record bounds let us track progress without materializing records. + self._record_iter = None + self._cursor = 0 + self._partition_size: int | None = None + self.looped = params.get("looped", False) + + otlp_docs = self._find_otlp_docs() + if not otlp_docs: + requested = self._params.get("corpora") + if requested: + raise exceptions.InvalidSyntax( + f"No OTLP corpus matching 'corpora'={requested!r} found in track [{track_obj}]. " + f"Available corpora: {[c.name for c in self.track.corpora]}." + ) + raise exceptions.InvalidSyntax( + f"No OTLP corpus found in track [{track_obj}]. " + f"Add at least one document corpus with source_format={track.Documents.SOURCE_FORMAT_OTLP_PROTOBUF!r}." + ) + # expose corpora so used_corpora() in loader.py includes OTLP corpora in the prepare-track phase + seen_names: set[str] = set() + self.corpora = [] + for corpus, _ in otlp_docs: + if corpus.name not in seen_names: + seen_names.add(corpus.name) + self.corpora.append(corpus) + # use the first matching document set + _, self._doc = otlp_docs[0] + + def _find_otlp_docs(self): + # honor the operation's "corpora" param so a track with multiple OTLP corpora can pick + # between them — without this we'd silently use whichever corpus comes first in track.corpora. + track_corpora_names = [corpus.name for corpus in self.track.corpora] + corpora_names = self._params.get("corpora", track_corpora_names) + if isinstance(corpora_names, str): + corpora_names = [corpora_names] + return [ + (corpus, doc) + for corpus in self.track.corpora + if corpus.name in corpora_names + for doc in corpus.documents + if doc.source_format == track.Documents.SOURCE_FORMAT_OTLP_PROTOBUF + ] + + def partition(self, partition_index, total_partitions): + # pylint: disable=protected-access + # the "copy" is another OtlpParamSource instance, so accessing its private state is fine + copy = OtlpParamSource.__new__(OtlpParamSource) + copy.__dict__.update(self.__dict__) + copy._partition_index = partition_index + copy._total_partitions = total_partitions + copy._record_iter = None # streaming iterator created lazily on first params() call + copy._cursor = 0 + copy._partition_size = None + return copy + + def _total_records(self): + """ + Source of truth for partitioning: count records in the .pb file directly. The track's + document-count is only used as a fallback (e.g. when the .pb doesn't exist yet, such as + during initial track parsing — corpora preparation comes later). Trusting the track value + unconditionally is a footgun: if it doesn't match the actual .pb, most workers either get + empty partitions or seek past EOF and the benchmark silently finishes after only one + client does any work. + """ + if not hasattr(self, "_cached_total_records"): + pb_file = io.OtlpProtobufFile.for_source_file(self._doc.document_file) + actual = pb_file.count_records() + self._cached_total_records = actual if actual is not None else self._doc.number_of_documents + return self._cached_total_records + + def size(self): + """ + Return the partition size so Rally treats this as a finite (non-infinite) source. + Without this, the base class returns None → infinite=True → Rally defaults + iterations=1 and each worker runs exactly one operation. + """ + total_records = self._total_records() + if total_records <= 0: + return None + if self._total_partitions > 1: + records_per_partition = total_records / self._total_partitions + start_record = round(records_per_partition * self._partition_index) + end_record = round(records_per_partition * (self._partition_index + 1)) + return end_record - start_record + return total_records + + @property + def percent_completed(self): + """ + Fraction of this partition's records that have been yielded so far. + + Rally pulls this per-client and averages across all clients to render the [N% done] bar. + Without this property the bar stays stuck at 0% because the loop control falls into the + ``infinite`` branch (we don't set a time_period/iterations explicitly — the param source + itself terminates the task by raising StopIteration in non-looped mode). + + Returns ``None`` in looped mode because the cursor cycles back to 0 indefinitely, so a + cursor-based percentage is meaningless. The schedule's time_period / iterations bounds + progress instead in that case. + """ + if self.looped: + return None + partition_size = self.size() + if not partition_size: + return None + return min(self._cursor / partition_size, 1.0) + + def _open_iter(self): + """ + Create a new streaming iterator over this partition's records. We do NOT materialize them + into memory — for large corpora (e.g. 1 MB protobuf records, 6k records per partition) that + would consume 6+ GB per worker. The iterator holds an open file handle and yields one + record at a time; the file handle closes when the generator completes or is GC'd. + """ + if not self._doc.document_file: + raise exceptions.SystemSetupError( + f"OTLP corpus document_file is unset for [{self._doc}]. This usually means neither the " + "source .json nor the pre-built .pb was found in any data root after prepare-track. " + "Check that the prepare-track phase completed successfully (look for the " + "'Successfully downloaded binary protobuf file from ...' log line) and that the " + "data directory is the same as Rally is reading from." + ) + pb_file = io.OtlpProtobufFile.for_source_file(self._doc.document_file) + total_records = self._total_records() + + if total_records > 0 and self._total_partitions > 1: + records_per_partition = total_records / self._total_partitions + start_record = round(records_per_partition * self._partition_index) + end_record = round(records_per_partition * (self._partition_index + 1)) + else: + start_record = 0 + end_record = None + + # cache the partition size so percent_completed/size don't have to recompute + if self._partition_size is None: + self._partition_size = (end_record - start_record) if end_record is not None else total_records + + logger = logging.getLogger(__name__) + logger.info( + "OtlpParamSource partition %d/%d: total_records=%s start=%d end=%s (streaming, not preloading)", + self._partition_index, + self._total_partitions, + total_records, + start_record, + end_record, + ) + return pb_file.read_records(start_record, end_record) + + def params(self): + if self._record_iter is None: + self._record_iter = self._open_iter() + + try: + payload = next(self._record_iter) + except StopIteration: + if not self.looped: + raise + # restart the iterator from the start of this partition. If even the fresh iterator + # is empty (partition has no records at all), the StopIteration here propagates. + self._record_iter = self._open_iter() + self._cursor = 0 + payload = next(self._record_iter) + + self._cursor += 1 + + result = {"body": payload} + if "request-timeout" in self._params: + result["request-timeout"] = self._params["request-timeout"] + return result + + class PartitionBulkIndexParamSource: def __init__( self, @@ -1431,6 +1616,7 @@ def read_bulk(self): register_param_source_for_operation(track.OperationType.Sleep, SleepParamSource) register_param_source_for_operation(track.OperationType.ForceMerge, ForceMergeParamSource) register_param_source_for_operation(track.OperationType.Downsample, DownsampleParamSource) +register_param_source_for_operation(track.OperationType.OtlpIngest, OtlpParamSource) # Also register by name, so users can use it too register_param_source_for_name("file-reader", BulkIndexParamSource) diff --git a/esrally/track/track.py b/esrally/track/track.py index 5d38af7a6..050797b14 100644 --- a/esrally/track/track.py +++ b/esrally/track/track.py @@ -181,6 +181,7 @@ def __eq__(self, other): class Documents: SOURCE_FORMAT_BULK = "bulk" + SOURCE_FORMAT_OTLP_PROTOBUF = "otlp-proto" def __init__( self, @@ -275,6 +276,10 @@ def number_of_lines(self): def is_bulk(self): return self.source_format == Documents.SOURCE_FORMAT_BULK + @property + def is_otlp(self): + return self.source_format == Documents.SOURCE_FORMAT_OTLP_PROTOBUF + def __str__(self): return "%s documents from %s" % (self.source_format, self.document_file) @@ -745,6 +750,7 @@ class OperationType(Enum): # this is classed the same as RawRequest, but could potentially be used to call endpoints that are blocked RunUntil = (58, AdminStatus.No, serverless.Status.Public) EnrichPolicy = (59, AdminStatus.Yes, serverless.Status.Public) + OtlpIngest = (60, AdminStatus.No, serverless.Status.Public) def __init__(self, id: int, admin_status: AdminStatus, serverless_status: serverless.Status): self.id = id @@ -884,6 +890,8 @@ def from_hyphenated_string(cls, v): return OperationType.RunUntil elif v == "enrich-policy": return OperationType.EnrichPolicy + elif v == "otlp-ingest": + return OperationType.OtlpIngest else: raise KeyError(f"No enum value for [{v}]") diff --git a/esrally/utils/io.py b/esrally/utils/io.py index 2a52c0719..de1450237 100644 --- a/esrally/utils/io.py +++ b/esrally/utils/io.py @@ -23,11 +23,12 @@ import mmap import os import shutil +import struct import subprocess import sys import tarfile import zipfile -from collections.abc import Collection, Mapping, Sequence +from collections.abc import Collection, Iterator, Mapping, Sequence from types import TracebackType from typing import IO, Any, AnyStr, Callable, Generic, Literal, Optional, overload @@ -37,6 +38,7 @@ # but they are treated the same by mypy, so I'm not going to use conditional imports here from typing_extensions import Self +from esrally import exceptions from esrally.utils import console, net SUPPORTED_ARCHIVE_FORMATS = [".zip", ".bz2", ".gz", ".tar", ".tar.gz", ".tgz", ".tar.bz2", ".zst"] @@ -705,6 +707,406 @@ def remove_file_offset_table(data_file_path: str) -> None: FileOffsetTable.remove(data_file_path) +def _convert_lines_batch(lines: list[str]) -> tuple[bytes, int]: + """ + Worker function for parallel OTLP JSON → binary protobuf conversion. + + Each worker process imports the protobuf bindings lazily on first call (this happens once + per worker process, since the function is a module-level callable that ``ProcessPoolExecutor`` + pickles by reference). Returns ``(concatenated_records_bytes, record_count)`` for the batch + in source order. + + Format of returned bytes: a concatenation of length-prefixed records, each ``4 bytes big-endian + length || payload``. This is exactly the on-disk format the main process appends to the corpus + file. + """ + # pylint: disable=import-outside-toplevel,no-name-in-module + from google.protobuf.json_format import Parse + from opentelemetry.proto.collector.metrics.v1.metrics_service_pb2 import ( + ExportMetricsServiceRequest, + ) + + parts: list[bytes] = [] + for line in lines: + msg = Parse(line, ExportMetricsServiceRequest()) + payload = msg.SerializeToString() + parts.append(struct.pack(">I", len(payload))) + parts.append(payload) + return b"".join(parts), len(lines) + + +class OtlpProtobufFile: + """ + Manages the binary protobuf corpus file derived from an OTLP JSON source. + + On-disk format: sequence of length-prefixed records — + 4-byte big-endian uint32 (payload length) + binary ExportMetricsServiceRequest bytes. + + A companion ``{pb_path}.offset`` file maps record numbers to byte offsets for efficient + multi-client partitioning, using the same ``record_number;byte_offset`` text format as + FileOffsetTable. One entry is written every OFFSET_SAMPLING_INTERVAL records. + """ + + OFFSET_SAMPLING_INTERVAL = 1000 + + def __init__(self, source_json_path: str, pb_path: str): + self.source_json_path = source_json_path + self.pb_path = pb_path + + def exists(self) -> bool: + return os.path.exists(self.pb_path) and os.path.getsize(self.pb_path) > 0 + + def is_valid(self) -> bool: + if not self.exists(): + return False + # if the source JSON is present, the .pb must be newer than it + if os.path.exists(self.source_json_path): + if os.path.getmtime(self.pb_path) < os.path.getmtime(self.source_json_path): + return False + return True + + def try_download_from_corpus_location(self, corpus_base_url: str | None) -> bool: + """ + Attempts to download the pre-built .pb file from the corpus URL. Also attempts to + download the companion .pb.offset file; if that's not available, partitioning will + still work by scanning the .pb from the start (just slightly slower on startup). + + :return: True if the .pb file was downloaded successfully, False otherwise. + """ + if not corpus_base_url: + return False + logger = logging.getLogger(__name__) + pb_name = os.path.basename(self.pb_path) + remote_url = f"{corpus_base_url.rstrip('/')}/{pb_name}" + logger.info("Attempting to download binary protobuf file from [%s]", remote_url) + os.makedirs(os.path.dirname(self.pb_path), exist_ok=True) + try: + net.download(remote_url, self.pb_path) + logger.info("Successfully downloaded binary protobuf file from [%s]", remote_url) + except Exception: + logger.debug("Could not download binary protobuf file from [%s]", remote_url) + return False + + # Best-effort: also fetch the offset index. Failure is non-fatal — read_records() + # falls back to scanning from the start of the .pb if .offset is missing. + offset_path = self.pb_path + ".offset" + offset_url = f"{corpus_base_url.rstrip('/')}/{os.path.basename(offset_path)}" + try: + net.download(offset_url, offset_path) + logger.info("Successfully downloaded offset index from [%s]", offset_url) + except Exception: + logger.debug("Could not download offset index from [%s] (will scan .pb directly)", offset_url) + + return True + + # batch size for parallel conversion. Smaller batches → less memory per in-flight batch. + # The actual memory cost per batch is ~5× the raw input size (Python object overhead + parsed + # protobuf message tree during conversion), so 500 lines × ~50 KB ≈ ~125 MB per in-flight batch. + _CONVERSION_BATCH_SIZE = 500 + # in-flight queue depth. Just enough to keep workers fed while the main thread writes the next + # completed batch to disk — adding more buffers grows memory without much throughput benefit. + _QUEUE_BUFFER = 4 + + def create(self, workers: int | None = None) -> int: + """ + Parse the source OTLP JSON file and write binary protobuf records to the .pb file, + also writing a companion .offset file for fast multi-client partitioning. + + JSON→protobuf conversion is parallelized across processes since each line is independent. + Results are gathered in source order so the .pb byte offsets stay correct. Memory usage + is bounded by limiting the number of in-flight batches — we do NOT buffer the whole input + file (which is what ``ProcessPoolExecutor.map`` would do, since it eagerly consumes its + iterable up front). + + :param workers: Number of worker processes for conversion. Defaults to ``os.cpu_count()``. + Peak memory ≈ ``workers × 325 MB`` regardless of source file size: + ~200 MB per worker process (interpreter + loaded protobuf bindings) plus + ~125 MB per in-flight batch (input strings + parsed proto tree + output). + Override via ``RALLY_OTLP_CONVERSION_WORKERS`` if you need to cap memory. + :return: Total number of records written. + :raises exceptions.SystemSetupError: if opentelemetry-proto is not installed. + """ + # opentelemetry-proto is an optional dependency, only needed when preparing an OTLP corpus. + # we probe the import here so we fail fast with a clear message rather than inside the worker. + # pylint: disable=import-outside-toplevel,unused-import + try: + import opentelemetry.proto.collector.metrics.v1.metrics_service_pb2 # noqa: F401 + except ImportError: + raise exceptions.SystemSetupError( + "The 'opentelemetry-proto' package is required to pre-process OTLP corpus files. " + "Install it with: pip install opentelemetry-proto" + ) + + import collections # pylint: disable=import-outside-toplevel + import concurrent.futures # pylint: disable=import-outside-toplevel + + if workers and workers > 0: + worker_count = workers + else: + worker_count = os.cpu_count() or 1 + # Bounded queue depth = workers + small buffer. Pickling each batch costs ~5× the raw input + # size (input list lives in main + queue + worker simultaneously, plus the parsed protobuf + # message tree dominates worker memory during JSON→proto conversion). Keeping the queue tight + # caps peak memory at roughly ``(workers + _QUEUE_BUFFER) × ~125 MB`` plus ~200 MB per worker + # process for the interpreter and loaded protobuf bindings. + max_in_flight = worker_count + self._QUEUE_BUFFER + + logger = logging.getLogger(__name__) + logger.info( + "Converting OTLP JSON to binary protobuf using %d worker(s) (max in-flight batches: %d).", + worker_count, + max_in_flight, + ) + + offset_path = self.pb_path + ".offset" + record_count = 0 + byte_offset = 0 + batch_index = 0 + # 8 MB output buffer for .pb — much larger than Python's default 8 KB, reduces syscall + # overhead noticeably for multi-gigabyte writes. + with ( + open(self.pb_path, "wb", buffering=8 * 1024 * 1024) as dst, + open(offset_path, "w", encoding="utf-8", buffering=64 * 1024) as off, + concurrent.futures.ProcessPoolExecutor(max_workers=worker_count) as pool, + ): + batch_iter = self._iter_line_batches(self._CONVERSION_BATCH_SIZE) + in_flight: collections.deque = collections.deque() + + # prime the pipeline with up to max_in_flight batches + for batch in batch_iter: + in_flight.append(pool.submit(_convert_lines_batch, batch)) + if len(in_flight) >= max_in_flight: + break + + # consume one result at a time (FIFO preserves source order) and submit a replacement + while in_flight: + chunk_bytes, chunk_record_count = in_flight.popleft().result() + # one offset entry at the start of each batch (BATCH_SIZE == OFFSET_SAMPLING_INTERVAL) + off.write(f"{record_count};{byte_offset}\n") + dst.write(chunk_bytes) + record_count += chunk_record_count + byte_offset += len(chunk_bytes) + batch_index += 1 + if batch_index % 50 == 0: + logger.info( + "OTLP conversion progress: %d records, %d MB written.", + record_count, + byte_offset // (1024 * 1024), + ) + # keep the pipeline full by submitting the next batch (if any) + try: + in_flight.append(pool.submit(_convert_lines_batch, next(batch_iter))) + except StopIteration: + pass + + return record_count + + def _iter_line_batches(self, batch_size: int) -> Iterator[list[str]]: + """Stream the source JSON file as batches of non-blank lines (each line stripped).""" + batch: list[str] = [] + with open(self.source_json_path, encoding="utf-8", buffering=8 * 1024 * 1024) as src: + for line in src: + line = line.strip() + if not line: + continue + batch.append(line) + if len(batch) >= batch_size: + yield batch + batch = [] + if batch: + yield batch + + def read_records(self, start_record: int, end_record: int | None = None) -> Iterator[bytes]: + """ + Generator yielding raw binary payloads from start_record up to (but not including) end_record. + Uses the companion .offset file to seek efficiently. + """ + seek_byte, records_to_skip = self._find_offset(start_record) + with open(self.pb_path, "rb") as f: + f.seek(seek_byte) + for _ in range(records_to_skip): + length_data = f.read(4) + if len(length_data) < 4: + return + (length,) = struct.unpack(">I", length_data) + f.seek(length, 1) + count = 0 + target = None if end_record is None else (end_record - start_record) + while target is None or count < target: + length_data = f.read(4) + if len(length_data) < 4: + return + (length,) = struct.unpack(">I", length_data) + payload = f.read(length) + if len(payload) < length: + return + count += 1 + yield payload + + def count_records(self) -> int | None: + """ + Return the exact number of records in the .pb file. Uses the companion .pb.offset index + (entries every OFFSET_SAMPLING_INTERVAL records) to jump near the end, then scans only the + final partial chunk by reading length prefixes — so this is fast even for multi-GB files. + + Side effect: if no .pb.offset is present, generates one while scanning the file. The + offset file lets subsequent workers (and future runs) seek directly to their partition's + start record instead of walking from byte 0 — a big win for high-index partitions on + multi-GB corpora. + + Returns ``None`` if the .pb file is missing. + """ + if not os.path.exists(self.pb_path): + return None + offset_path = self.pb_path + ".offset" + # if an offset file exists, use it to skip to the last sampled position before scanning the tail + last_record = 0 + last_byte = 0 + if os.path.exists(offset_path): + try: + with open(offset_path, encoding="utf-8") as f: + for line in f: + parts = line.strip().split(";") + if len(parts) != 2: + continue + try: + rec_num, byte_off = int(parts[0]), int(parts[1]) + except ValueError: + continue + if rec_num >= last_record: + last_record = rec_num + last_byte = byte_off + except OSError: + pass + + # If no offset file exists, generate one as we scan. Write to a temp file and atomically + # rename at the end so concurrent workers don't see a half-written file. If another worker + # beat us to the rename, that's fine — both versions of the file are byte-identical. + offset_tmp_path: str | None = None + offset_out: Optional[IO[str]] = None + if not os.path.exists(offset_path): + offset_tmp_path = f"{offset_path}.tmp.{os.getpid()}" + offset_out = open(offset_tmp_path, "w", encoding="utf-8", buffering=64 * 1024) + logging.getLogger(__name__).info("Generating %s while scanning .pb (one-time cost per machine).", offset_path) + + count = last_record + try: + with open(self.pb_path, "rb") as f: + f.seek(last_byte) + byte_offset = last_byte + while True: + if offset_out is not None and count % self.OFFSET_SAMPLING_INTERVAL == 0: + offset_out.write(f"{count};{byte_offset}\n") + header = f.read(4) + if len(header) < 4: + break + (length,) = struct.unpack(">I", header) + # skip the payload without reading it into memory + f.seek(length, 1) + byte_offset += 4 + length + count += 1 + except OSError: + if offset_out is not None and offset_tmp_path is not None: + offset_out.close() + try: + os.remove(offset_tmp_path) + except OSError: + pass + return None + + if offset_out is not None and offset_tmp_path is not None: + offset_out.close() + try: + os.replace(offset_tmp_path, offset_path) + except OSError: + # best-effort — if we can't rename (e.g. another worker beat us), clean up our temp + try: + os.remove(offset_tmp_path) + except OSError: + pass + return count + + def _find_offset(self, target_record: int) -> tuple[int, int]: + """Return (byte_offset, records_still_to_skip) for the sampled position closest to target_record.""" + offset_path = self.pb_path + ".offset" + if not os.path.exists(offset_path): + return 0, target_record + prior_byte = 0 + prior_remaining = target_record + try: + with open(offset_path, encoding="utf-8") as f: + for line in f: + parts = line.strip().split(";") + if len(parts) != 2: + continue + rec_num, byte_off = int(parts[0]), int(parts[1]) + if rec_num <= target_record: + prior_byte = byte_off + prior_remaining = target_record - rec_num + else: + break + except OSError: + pass + return prior_byte, prior_remaining + + @classmethod + def for_source_file(cls, source_json_path: str) -> "OtlpProtobufFile": + if not source_json_path: + raise ValueError( + "OtlpProtobufFile.for_source_file got an empty/None source path. " + "This usually means set_absolute_data_path could not resolve the corpus path: " + "neither the source .json nor the pre-built .pb was found in any data root. " + "Check that prepare-track ran successfully and the .pb is on disk where Rally expects it." + ) + return cls(source_json_path, f"{source_json_path}.pb") + + +def prepare_otlp_protobuf_file(source_json_path: str, corpus_base_url: str | None) -> int | None: + """ + Ensures a binary protobuf (.pb) file exists for the given OTLP JSON corpus. + + Strategy: + 1. If .pb is already valid locally, return None immediately. + 2. Try downloading the .pb from corpus_base_url (avoids downloading the larger JSON source). + 3. If JSON is present locally, convert it to .pb. + + Returns the record count if the .pb was created locally, or None if it already existed or + was downloaded. Returns None without creating the file if the JSON source is absent and the + download failed — the caller is responsible for ensuring the JSON is present if needed. + """ + pb_file = OtlpProtobufFile.for_source_file(source_json_path) + if pb_file.is_valid(): + return None + + if corpus_base_url: + console.info( + "Attempting to download binary protobuf file for [%s] ... " % os.path.basename(source_json_path), + end="", + flush=True, + ) + if pb_file.try_download_from_corpus_location(corpus_base_url) and pb_file.is_valid(): + console.println("[DOWNLOADED]") + return None + console.println("[NOT FOUND - will create locally]") + + if not os.path.exists(source_json_path): + return None + + console.info( + "Converting OTLP JSON to binary protobuf for [%s] ... " % os.path.basename(source_json_path), + end="", + flush=True, + ) + # honor RALLY_OTLP_CONVERSION_WORKERS for environments where the default (os.cpu_count()) needs + # tuning. Each worker process consumes ~200 MB for the interpreter + protobuf bindings, plus an + # in-flight batch of ~125 MB while it's parsing. Reduce on low-RAM machines, leave alone otherwise. + workers_env = os.environ.get("RALLY_OTLP_CONVERSION_WORKERS") + workers = int(workers_env) if workers_env and workers_env.isdigit() else None + record_count = pb_file.create(workers=workers) + console.println("[OK]") + return record_count + + def skip_lines(data_file_path: str, data_file: IO[AnyStr], number_of_lines_to_skip: int) -> None: """ Skips the first `number_of_lines_to_skip` lines in `data_file` as a side effect. diff --git a/pyproject.toml b/pyproject.toml index 141c96302..d4d30b416 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -101,6 +101,15 @@ dependencies = [ ] [project.optional-dependencies] +# These packages are required for pre-processing OTLP JSON corpus files into binary protobuf format. +# Only needed during corpus preparation (prepare-track phase), not during the benchmark hot path. +otlp = [ + # License: Apache 2.0 + "opentelemetry-proto==1.34.0", + # License: BSD + "protobuf>=5.29.0", +] + # These packages are required to download files from a private AWS S3 bucket s3 = [ # License: Apache 2.0 @@ -115,6 +124,9 @@ s3 = [ develop = [ # s3 "boto3==1.34.68", + # otlp — required for the OtlpProtobufFile / OtlpParamSource / OtlpIngest test suites + "opentelemetry-proto==1.34.0", + "protobuf>=5.29.0", # tests "ujson", "pytest==8.4.2", @@ -137,6 +149,7 @@ develop = [ "types-tabulate==0.8.9", "types-requests>=2.31.0.7,<3", "types-jsonschema==3.2.0", + "types-protobuf>=4.24,<8", # Python dead library removed in version 3.13 and used to build Rally docs. "standard-imghdr==3.13.0", ] diff --git a/tests/driver/runner_test.py b/tests/driver/runner_test.py index 66a35295c..9d6cba7d7 100644 --- a/tests/driver/runner_test.py +++ b/tests/driver/runner_test.py @@ -14,6 +14,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +# pylint: disable=protected-access import asyncio import collections @@ -8658,3 +8659,278 @@ async def test_delete_is_false(self, create_mock, refresh_mock, exec_mock): def test_str(self): assert str(runner.EnrichPolicy()) == "enrich-policy" + + +class TestProtobufSerializer: + def test_dumps_bytes_passes_through(self): + serializer = runner._ProtobufSerializer() + payload = b"\x08\x96\x01" # arbitrary protobuf-shaped bytes + assert serializer.dumps(payload) == payload + + def test_dumps_rejects_non_bytes(self): + serializer = runner._ProtobufSerializer() + with pytest.raises(TypeError): + serializer.dumps({"not": "bytes"}) + + def test_loads_returns_data_unchanged(self): + serializer = runner._ProtobufSerializer() + assert serializer.loads(b"\x00\x01") == b"\x00\x01" + + def test_mimetype(self): + assert runner._ProtobufSerializer.mimetype == "application/x-protobuf" + + +class TestOtlpIngestRunner: + _headers = HttpHeaders() + _node = NodeConfig(scheme="http", host="localhost", port=9200) + _OK_META = ApiResponseMeta(status=200, http_version="1.1", headers=_headers, duration=0.1, node=_node) + + def _make_es_mock(self): + es = mock.MagicMock() + # transport.serializers.serializers is a dict the runner mutates to register the protobuf serializer + es.transport.serializers.serializers = {} + # es.options(**kw) returns the same client so chained mutations stay observable + es.options.return_value = es + return es + + @pytest.mark.asyncio + async def test_posts_to_default_endpoint_with_protobuf_body(self): + es = self._make_es_mock() + body = b"\x0a\x05hello" + + with mock.patch( + "elasticsearch.AsyncElasticsearch.perform_request", + new=mock.AsyncMock(return_value=ApiResponse(body=io.BytesIO(b""), meta=self._OK_META)), + ) as pr: + result = await runner.OtlpIngest()(es, {"body": body}) + + pr.assert_awaited_once() + kwargs = pr.await_args.kwargs + assert kwargs["method"] == "POST" + assert kwargs["path"] == "/_otlp/v1/metrics" + assert kwargs["headers"] == {"Content-Type": "application/x-protobuf"} + assert kwargs["body"] is body + assert result == { + "weight": 1, + "unit": "ops", + "success": True, + "request-status": 200, + "request-size-bytes": len(body), + } + + @pytest.mark.asyncio + async def test_custom_endpoint(self): + es = self._make_es_mock() + body = b"\x0a\x05world" + + with mock.patch( + "elasticsearch.AsyncElasticsearch.perform_request", + new=mock.AsyncMock(return_value=ApiResponse(body=io.BytesIO(b""), meta=self._OK_META)), + ) as pr: + await runner.OtlpIngest()(es, {"body": body, "endpoint": "/custom/otlp"}) + + assert pr.await_args.kwargs["path"] == "/custom/otlp" + + @pytest.mark.asyncio + async def test_request_timeout_applied_via_options(self): + es = self._make_es_mock() + body = b"\x0a\x01" + + with mock.patch( + "elasticsearch.AsyncElasticsearch.perform_request", + new=mock.AsyncMock(return_value=ApiResponse(body=io.BytesIO(b""), meta=self._OK_META)), + ): + await runner.OtlpIngest()(es, {"body": body, "request-timeout": 42}) + + # both transport retry disable AND custom timeout flow through es.options + es.options.assert_called_once_with(max_retries=0, request_timeout=42) + + @pytest.mark.asyncio + async def test_transport_retries_always_disabled(self): + # Our outer retry loop handles backoff; the transport's same-node retries fire without delay + # and would just hammer an overloaded ES, so we always disable them. + es = self._make_es_mock() + + with mock.patch( + "elasticsearch.AsyncElasticsearch.perform_request", + new=mock.AsyncMock(return_value=ApiResponse(body=io.BytesIO(b""), meta=self._OK_META)), + ): + await runner.OtlpIngest()(es, {"body": b"\x0a"}) + + es.options.assert_called_once_with(max_retries=0) + + @pytest.mark.asyncio + async def test_registers_protobuf_serializer(self): + es = self._make_es_mock() + assert "application/x-protobuf" not in es.transport.serializers.serializers + + with mock.patch( + "elasticsearch.AsyncElasticsearch.perform_request", + new=mock.AsyncMock(return_value=ApiResponse(body=io.BytesIO(b""), meta=self._OK_META)), + ): + await runner.OtlpIngest()(es, {"body": b"\x0a"}) + + registered = es.transport.serializers.serializers["application/x-protobuf"] + assert isinstance(registered, runner._ProtobufSerializer) + + @pytest.mark.asyncio + async def test_does_not_replace_existing_serializer(self): + es = self._make_es_mock() + existing = runner._ProtobufSerializer() + es.transport.serializers.serializers["application/x-protobuf"] = existing + + with mock.patch( + "elasticsearch.AsyncElasticsearch.perform_request", + new=mock.AsyncMock(return_value=ApiResponse(body=io.BytesIO(b""), meta=self._OK_META)), + ): + await runner.OtlpIngest()(es, {"body": b"\x0a"}) + + # the existing serializer instance is preserved + assert es.transport.serializers.serializers["application/x-protobuf"] is existing + + @pytest.mark.asyncio + async def test_missing_body_raises(self): + es = self._make_es_mock() + with pytest.raises(exceptions.DataError): + await runner.OtlpIngest()(es, {}) + + @pytest.mark.asyncio + async def test_non_retryable_api_error_returns_failure_dict(self): + # 4xx errors other than 429 are not retryable (e.g. malformed request) — we surface them + # as a failed operation so the benchmark continues with a non-zero error rate. + es = self._make_es_mock() + err_meta = ApiResponseMeta(status=400, http_version="1.1", headers=self._headers, duration=0.1, node=self._node) + api_error = elasticsearch.BadRequestError(message="bad", meta=err_meta, body={"error": "boom"}) + + with mock.patch( + "elasticsearch.AsyncElasticsearch.perform_request", + new=mock.AsyncMock(side_effect=api_error), + ) as pr: + result = await runner.OtlpIngest()(es, {"body": b"\x0a"}) + + # only one attempt — no retry for non-retryable status + assert pr.await_count == 1 + assert result["success"] is False + assert result["request-status"] == 400 + assert result["error-type"] == "rejected" + + @pytest.mark.asyncio + async def test_429_retried_with_backoff_then_succeeds(self): + # 429 should trigger our retry loop with backoff. On the third attempt the mock returns OK + # and we expect a success result (with no leakage of the prior failed attempts). + es = self._make_es_mock() + err_meta = ApiResponseMeta(status=429, http_version="1.1", headers=self._headers, duration=0.1, node=self._node) + api_error = elasticsearch.ApiError(message="too many", meta=err_meta, body=b"\x12\xc5\x01rejected") + + ok_response = ApiResponse(body=io.BytesIO(b""), meta=self._OK_META) + # first two attempts → 429, third → 200 + perform = mock.AsyncMock(side_effect=[api_error, api_error, ok_response]) + + sleeps: list[float] = [] + + async def fake_sleep(s): + sleeps.append(s) + + with ( + mock.patch("elasticsearch.AsyncElasticsearch.perform_request", new=perform), + mock.patch("esrally.driver.runner.asyncio.sleep", new=fake_sleep), + ): + result = await runner.OtlpIngest()(es, {"body": b"\x0a"}) + + assert perform.await_count == 3 + # two sleeps for the two retries + assert len(sleeps) == 2 + # each sleep is non-negative (full-jitter exponential — value depends on random.uniform but is bounded) + assert all(s >= 0 for s in sleeps) + assert result["success"] is True + assert result["request-status"] == 200 + + @pytest.mark.asyncio + async def test_429_retries_exhausted_returns_backpressure_failure(self): + # All retries fail with 429 → we should return a failure dict with error-type=backpressure. + es = self._make_es_mock() + err_meta = ApiResponseMeta(status=429, http_version="1.1", headers=self._headers, duration=0.1, node=self._node) + api_error = elasticsearch.ApiError(message="too many", meta=err_meta, body=b"backpressure body") + + with ( + mock.patch( + "elasticsearch.AsyncElasticsearch.perform_request", + new=mock.AsyncMock(side_effect=api_error), + ) as pr, + mock.patch("esrally.driver.runner.asyncio.sleep", new=mock.AsyncMock()), + ): + result = await runner.OtlpIngest()(es, {"body": b"\x0a", "retries-on-error": 2}) + + # 2 retries + 1 initial attempt = 3 attempts + assert pr.await_count == 3 + assert result == { + "weight": 1, + "unit": "ops", + "success": False, + "error-type": "backpressure", + "request-status": 429, + "request-size-bytes": 1, + } + + @pytest.mark.asyncio + async def test_5xx_retried_like_429(self): + es = self._make_es_mock() + err_meta = ApiResponseMeta(status=503, http_version="1.1", headers=self._headers, duration=0.1, node=self._node) + api_error = elasticsearch.ApiError(message="unavailable", meta=err_meta, body=b"") + + with ( + mock.patch( + "elasticsearch.AsyncElasticsearch.perform_request", + new=mock.AsyncMock(side_effect=api_error), + ) as pr, + mock.patch("esrally.driver.runner.asyncio.sleep", new=mock.AsyncMock()), + ): + result = await runner.OtlpIngest()(es, {"body": b"\x0a", "retries-on-error": 1}) + + assert pr.await_count == 2 # 1 retry + 1 initial + assert result["error-type"] == "transport" + assert result["request-status"] == 503 + + @pytest.mark.asyncio + async def test_connection_error_retried(self): + es = self._make_es_mock() + with ( + mock.patch( + "elasticsearch.AsyncElasticsearch.perform_request", + new=mock.AsyncMock(side_effect=elasticsearch.ConnectionError("refused")), + ) as pr, + mock.patch("esrally.driver.runner.asyncio.sleep", new=mock.AsyncMock()), + ): + result = await runner.OtlpIngest()(es, {"body": b"\x0a", "retries-on-error": 2}) + + assert pr.await_count == 3 + assert result["success"] is False + assert result["error-type"] == "transport" + assert "request-status" not in result # connection errors have no HTTP status + + @pytest.mark.asyncio + async def test_retry_wait_period_param_honoured(self): + # base=10 with attempt=0 → uniform(0, 10) → at least we know it sleeps once and cap is 10 + es = self._make_es_mock() + err_meta = ApiResponseMeta(status=429, http_version="1.1", headers=self._headers, duration=0.1, node=self._node) + api_error = elasticsearch.ApiError(message="too many", meta=err_meta, body=b"") + ok = ApiResponse(body=io.BytesIO(b""), meta=self._OK_META) + perform = mock.AsyncMock(side_effect=[api_error, ok]) + + captured: list[float] = [] + + async def fake_sleep(s): + captured.append(s) + + with ( + mock.patch("elasticsearch.AsyncElasticsearch.perform_request", new=perform), + mock.patch("esrally.driver.runner.asyncio.sleep", new=fake_sleep), + ): + await runner.OtlpIngest()(es, {"body": b"\x0a", "retry-wait-period": 10}) + + assert len(captured) == 1 + # full-jitter: random.uniform(0, base*2^0) = uniform(0, 10) + assert 0 <= captured[0] <= 10 + + def test_repr(self): + assert repr(runner.OtlpIngest()) == "otlp-ingest" diff --git a/tests/track/loader_test.py b/tests/track/loader_test.py index 4c2ce8753..54b781837 100644 --- a/tests/track/loader_test.py +++ b/tests/track/loader_test.py @@ -915,6 +915,76 @@ def test_prepare_bundled_document_set_uncompressed_docs_wrong_size(self, is_file assert prepare_file_offset_table.call_count == 0 +class TestOtlpDocumentPreparation: + """Tests for the OTLP-specific path in DocumentSetPreparator — specifically the compressed + .pb download support that mirrors the JSON corpus's archive compression.""" + + def _doc_set(self, *, archive=None, compressed_size=0): + return track.Documents( + source_format=track.Documents.SOURCE_FORMAT_OTLP_PROTOBUF, + document_file="metrics.otlp.json", + document_archive=archive, + number_of_documents=10, + base_url="http://example.com/otlp", + uncompressed_size_in_bytes=2000, + compressed_size_in_bytes=compressed_size, + ) + + def _preparator(self): + return loader.DocumentSetPreparator( + track_name="unit-test", + downloader=mock.MagicMock(spec=loader.Downloader), + decompressor=mock.MagicMock(spec=loader.Decompressor), + ) + + def test_skips_when_pb_already_valid(self): + p = self._preparator() + with mock.patch.object(io.OtlpProtobufFile, "is_valid", return_value=True): + p.prepare_otlp_document_set(self._doc_set(), data_root="/tmp") + p.downloader.download.assert_not_called() + + def test_tries_compressed_pb_first_when_corpus_is_compressed(self): + p = self._preparator() + # is_valid: False initially, False before download, True after decompress + with mock.patch.object(io.OtlpProtobufFile, "is_valid", side_effect=[False, True]), mock.patch("os.remove"): + p.prepare_otlp_document_set( + self._doc_set(archive="metrics.otlp.json.zst", compressed_size=500), + data_root="/tmp", + ) + + # downloaded the .pb.zst from the base URL + p.downloader.download.assert_called_once_with("http://example.com/otlp", "/tmp/metrics.otlp.json.pb.zst") + # decompressed it into the .pb path + p.decompressor.decompress.assert_called_once_with( + "/tmp/metrics.otlp.json.pb.zst", "/tmp/metrics.otlp.json.pb", uncompressed_size=None + ) + + def test_falls_back_to_uncompressed_pb_when_compressed_unavailable(self): + p = self._preparator() + # first download (compressed) raises DataError, second (uncompressed) succeeds + p.downloader.download.side_effect = [exceptions.DataError("not found"), None] + with mock.patch.object(io.OtlpProtobufFile, "is_valid", side_effect=[False, True]): + p.prepare_otlp_document_set( + self._doc_set(archive="metrics.otlp.json.zst", compressed_size=500), + data_root="/tmp", + ) + + assert p.downloader.download.call_count == 2 + # second call is the uncompressed .pb + assert p.downloader.download.call_args_list[1] == mock.call("http://example.com/otlp", "/tmp/metrics.otlp.json.pb") + # never decompressed + p.decompressor.decompress.assert_not_called() + + def test_skips_compressed_attempt_when_corpus_is_uncompressed(self): + p = self._preparator() + with mock.patch.object(io.OtlpProtobufFile, "is_valid", side_effect=[False, True]): + p.prepare_otlp_document_set(self._doc_set(), data_root="/tmp") + + # only one download — the uncompressed .pb — no archive attempt + p.downloader.download.assert_called_once_with("http://example.com/otlp", "/tmp/metrics.otlp.json.pb") + p.decompressor.decompress.assert_not_called() + + class TestTemplateSource: @mock.patch("esrally.utils.io.dirname") @mock.patch.object(loader.TemplateSource, "read_glob_files") @@ -1542,6 +1612,40 @@ def test_sets_absolute_path(self, path_exists): assert t.corpora[0].documents[0].document_file == "/data/unittest/docs/documents.json" assert t.corpora[0].documents[0].document_archive == "/data/unittest/docs/documents.json.bz2" + @mock.patch("os.path.exists") + def test_otlp_resolves_via_pb_when_json_missing(self, path_exists): + # OTLP corpora are special: the hot path only needs the .pb (and .pb.offset). If we + # downloaded just the .pb (no source JSON), document_file would otherwise stay None and + # the param source would later try to open "None.pb". + def fake_exists(path: str) -> bool: + # JSON doesn't exist locally; only the .pb does + return path.endswith(".pb") + + path_exists.side_effect = fake_exists + + cfg = config.Config() + cfg.add(config.Scope.application, "benchmarks", "local.dataset.cache", "/data") + + t = track.Track( + name="u", + corpora=[ + track.DocumentCorpus( + "otlp", + documents=[ + track.Documents( + source_format=track.Documents.SOURCE_FORMAT_OTLP_PROTOBUF, + document_file="metrics.otlp.json", + ) + ], + ) + ], + ) + + loader.set_absolute_data_path(cfg, t) + + # document_file points to where the JSON would be — so the derived .pb path is correct + assert t.corpora[0].documents[0].document_file == "/data/otlp/metrics.otlp.json" + class TestTrackFilter: def filter(self, track_specification, *, include_tasks=None, exclude_tasks=None): diff --git a/tests/track/params_test.py b/tests/track/params_test.py index bd598f217..b59c53863 100644 --- a/tests/track/params_test.py +++ b/tests/track/params_test.py @@ -3450,3 +3450,295 @@ def test_downsample_empty_params(self): assert p["fixed-interval"] == "1h" assert p["target-index"] == f"{p['source-index']}-{p['fixed-interval']}" assert p.get("sampling-method") is None + + +class TestOtlpParamSource: + """Tests for OtlpParamSource — covers partitioning, finite size signalling, and looping.""" + + _SAMPLE_OTLP_JSON_LINE = ( + '{"resourceMetrics":[{"resource":{"attributes":[{"key":"host.name","value":{"stringValue":"host-0"}}]},' + '"scopeMetrics":[{"scope":{"name":"hostmetrics"},"metrics":[' + '{"name":"system.cpu.utilization","gauge":{"dataPoints":[' + '{"timeUnixNano":"1700000000000000000","asDouble":0.42,' + '"attributes":[{"key":"cpu","value":{"stringValue":"0"}}]}]}}]}]}]}' + ) + + def _build_corpus(self, tmp_path, num_records, corpus_name="otlp-corpus"): + json_path = tmp_path / "metrics.otlp.json" + json_path.write_text("\n".join([self._SAMPLE_OTLP_JSON_LINE] * num_records) + "\n") + pb = io.OtlpProtobufFile.for_source_file(str(json_path)) + pb.create() + corpus = track.DocumentCorpus( + name=corpus_name, + documents=[ + track.Documents( + source_format=track.Documents.SOURCE_FORMAT_OTLP_PROTOBUF, + number_of_documents=num_records, + document_file=str(json_path), + ) + ], + ) + return corpus + + def test_selects_corpus_by_operation_param(self, tmp_path): + # two OTLP corpora — make sure the operation's `corpora` param picks the right one + d_a = tmp_path / "a" + d_b = tmp_path / "b" + d_a.mkdir() + d_b.mkdir() + corpus_a = self._build_corpus(d_a, num_records=3, corpus_name="corpus-60m") + corpus_b = self._build_corpus(d_b, num_records=5, corpus_name="corpus-270m") + source = params.OtlpParamSource( + track_obj=track.Track(name="unit-test", corpora=[corpus_a, corpus_b]), + params={"corpora": "corpus-270m"}, + ) + # _doc must come from corpus-270m, not the first-listed one (corpus-60m) + assert source._doc.number_of_documents == 5 + # corpora exposed to prepare-track only contains the selected corpus + assert [c.name for c in source.corpora] == ["corpus-270m"] + + def test_selects_first_corpus_when_no_param(self, tmp_path): + # legacy behaviour — without an explicit `corpora` param we fall back to all matching corpora + d_a = tmp_path / "a" + d_b = tmp_path / "b" + d_a.mkdir() + d_b.mkdir() + corpus_a = self._build_corpus(d_a, num_records=3, corpus_name="corpus-60m") + corpus_b = self._build_corpus(d_b, num_records=5, corpus_name="corpus-270m") + source = params.OtlpParamSource( + track_obj=track.Track(name="unit-test", corpora=[corpus_a, corpus_b]), + params={}, + ) + # both corpora are visible to prepare-track… + assert [c.name for c in source.corpora] == ["corpus-60m", "corpus-270m"] + # …and we pick the first one as the active document set + assert source._doc.number_of_documents == 3 + + def test_raises_when_corpora_param_doesnt_match(self, tmp_path): + corpus = self._build_corpus(tmp_path, num_records=3, corpus_name="corpus-60m") + with pytest.raises(exceptions.InvalidSyntax) as exc: + params.OtlpParamSource( + track_obj=track.Track(name="unit-test", corpora=[corpus]), + params={"corpora": "corpus-270m"}, + ) + assert "corpus-270m" in exc.value.args[0] + assert "corpus-60m" in exc.value.args[0] + + def test_raises_when_no_otlp_corpus(self): + bulk_corpus = track.DocumentCorpus( + name="bulk-only", + documents=[ + track.Documents( + source_format=track.Documents.SOURCE_FORMAT_BULK, + number_of_documents=10, + ) + ], + ) + with pytest.raises(exceptions.InvalidSyntax) as exc: + params.OtlpParamSource( + track_obj=track.Track(name="unit-test", corpora=[bulk_corpus]), + params={}, + ) + assert "No OTLP corpus" in exc.value.args[0] + + def test_exposes_corpora_for_prepare_track(self, tmp_path): + corpus = self._build_corpus(tmp_path, num_records=3) + source = params.OtlpParamSource( + track_obj=track.Track(name="unit-test", corpora=[corpus]), + params={}, + ) + # used_corpora() in loader.py checks for this attribute + assert source.corpora == [corpus] + + def test_deduplicates_corpora_by_name(self, tmp_path): + # one corpus with two OTLP document sets — should appear only once + json_path = tmp_path / "metrics.otlp.json" + json_path.write_text(self._SAMPLE_OTLP_JSON_LINE + "\n") + io.OtlpProtobufFile.for_source_file(str(json_path)).create() + corpus = track.DocumentCorpus( + name="otlp-corpus", + documents=[ + track.Documents( + source_format=track.Documents.SOURCE_FORMAT_OTLP_PROTOBUF, + number_of_documents=1, + document_file=str(json_path), + ), + track.Documents( + source_format=track.Documents.SOURCE_FORMAT_OTLP_PROTOBUF, + number_of_documents=1, + document_file=str(json_path), + ), + ], + ) + source = params.OtlpParamSource( + track_obj=track.Track(name="unit-test", corpora=[corpus]), + params={}, + ) + assert source.corpora == [corpus] + + def test_size_uses_actual_pb_count_not_document_count(self, tmp_path): + # Critical correctness test: if the track's document-count doesn't match the actual .pb, + # we MUST use the actual count or partitioning silently breaks (most workers seek past EOF + # or get empty partitions, and only one client ends up doing any work). + json_path = tmp_path / "metrics.otlp.json" + json_path.write_text("\n".join([self._SAMPLE_OTLP_JSON_LINE] * 100) + "\n") + io.OtlpProtobufFile.for_source_file(str(json_path)).create() + # track claims 10 documents but the .pb actually has 100 + corpus = track.DocumentCorpus( + name="otlp-corpus", + documents=[ + track.Documents( + source_format=track.Documents.SOURCE_FORMAT_OTLP_PROTOBUF, + number_of_documents=10, + document_file=str(json_path), + ) + ], + ) + source = params.OtlpParamSource( + track_obj=track.Track(name="unit-test", corpora=[corpus]), + params={}, + ) + p = source.partition(0, 4) + # Partition size should be derived from the ACTUAL 100 records, not the (wrong) 10 + assert p.size() == 25 + # And all 100 records should actually be reachable across 4 partitions + sizes = [source.partition(i, 4).size() for i in range(4)] + assert sum(sizes) == 100 + + def test_size_returns_finite_value_not_none(self, tmp_path): + # critical: size() must NOT be None, otherwise Rally treats us as infinite and defaults iterations=1 + corpus = self._build_corpus(tmp_path, num_records=10) + source = params.OtlpParamSource( + track_obj=track.Track(name="unit-test", corpora=[corpus]), + params={}, + ) + assert source.size() == 10 + assert source.infinite is False + + def test_size_accounts_for_partition(self, tmp_path): + corpus = self._build_corpus(tmp_path, num_records=100) + source = params.OtlpParamSource( + track_obj=track.Track(name="unit-test", corpora=[corpus]), + params={}, + ) + # 100 records / 8 partitions = 12 or 13 per partition (depending on rounding) + sizes = [source.partition(i, 8).size() for i in range(8)] + assert sum(sizes) == 100 + # each worker should have at least 12, at most 13 + assert all(12 <= s <= 13 for s in sizes) + + def test_partition_returns_separate_instances(self, tmp_path): + corpus = self._build_corpus(tmp_path, num_records=8) + source = params.OtlpParamSource( + track_obj=track.Track(name="unit-test", corpora=[corpus]), + params={}, + ) + p0 = source.partition(0, 4) + p1 = source.partition(1, 4) + # different instances with different state + assert p0 is not p1 + assert p0._partition_index == 0 + assert p1._partition_index == 1 + # but shared (deduplicated) reference to the underlying document set + assert p0._doc is p1._doc + + def test_params_yields_full_corpus_across_partitions(self, tmp_path): + corpus = self._build_corpus(tmp_path, num_records=8) + source = params.OtlpParamSource( + track_obj=track.Track(name="unit-test", corpora=[corpus]), + params={}, + ) + all_bodies = [] + for i in range(4): + p = source.partition(i, 4) + while True: + try: + all_bodies.append(p.params()["body"]) + except StopIteration: + break + # 8 records total across 4 partitions + assert len(all_bodies) == 8 + + def test_params_raises_stop_iteration_when_exhausted(self, tmp_path): + corpus = self._build_corpus(tmp_path, num_records=2) + source = params.OtlpParamSource( + track_obj=track.Track(name="unit-test", corpora=[corpus]), + params={}, + ) + p = source.partition(0, 1) + p.params() + p.params() + with pytest.raises(StopIteration): + p.params() + + def test_params_loops_when_looped_true(self, tmp_path): + corpus = self._build_corpus(tmp_path, num_records=2) + source = params.OtlpParamSource( + track_obj=track.Track(name="unit-test", corpora=[corpus]), + params={"looped": True}, + ) + p = source.partition(0, 1) + # take 5 records from a 2-record corpus + bodies = [p.params()["body"] for _ in range(5)] + # should cycle through the 2 records + assert bodies[0] == bodies[2] == bodies[4] + assert bodies[1] == bodies[3] + + def test_percent_completed_progresses_with_cursor(self, tmp_path): + corpus = self._build_corpus(tmp_path, num_records=10) + source = params.OtlpParamSource( + track_obj=track.Track(name="unit-test", corpora=[corpus]), + params={}, + ) + p = source.partition(0, 1) + assert p.percent_completed == 0.0 # before any params() call + for i in range(1, 11): + p.params() + assert p.percent_completed == i / 10 + + def test_percent_completed_is_none_when_looped(self, tmp_path): + # in looped mode the cursor cycles back to 0 — Rally must fall back to time/iteration + # based progress, so we return None to avoid misleading numbers. + corpus = self._build_corpus(tmp_path, num_records=3) + source = params.OtlpParamSource( + track_obj=track.Track(name="unit-test", corpora=[corpus]), + params={"looped": True}, + ) + p = source.partition(0, 1) + assert p.percent_completed is None + for _ in range(7): + p.params() + assert p.percent_completed is None + + def test_params_propagates_request_timeout(self, tmp_path): + corpus = self._build_corpus(tmp_path, num_records=1) + source = params.OtlpParamSource( + track_obj=track.Track(name="unit-test", corpora=[corpus]), + params={"request-timeout": 30}, + ) + p = source.partition(0, 1) + result = p.params() + assert result["request-timeout"] == 30 + assert "body" in result + + def test_params_body_is_non_empty_bytes(self, tmp_path): + corpus = self._build_corpus(tmp_path, num_records=1) + source = params.OtlpParamSource( + track_obj=track.Track(name="unit-test", corpora=[corpus]), + params={}, + ) + p = source.partition(0, 1) + result = p.params() + assert isinstance(result["body"], bytes) + assert len(result["body"]) > 0 + + def test_registered_for_otlp_ingest_operation(self, tmp_path): + # ensure Rally picks up our class for the otlp-ingest operation type + corpus = self._build_corpus(tmp_path, num_records=1) + source = params.param_source_for_operation( + track.OperationType.OtlpIngest.to_hyphenated_string(), + track.Track(name="unit-test", corpora=[corpus]), + params={}, + task_name="unit-test-task", + ) + assert isinstance(source, params.OtlpParamSource) diff --git a/tests/utils/io_test.py b/tests/utils/io_test.py index d5cdd4557..79651a263 100644 --- a/tests/utils/io_test.py +++ b/tests/utils/io_test.py @@ -292,3 +292,289 @@ def test_returns_none_when_valid_offset_already_exists(self, tmp_path): mock_dl.assert_not_called() assert result is None + + +class TestOtlpProtobufFile: # pylint: disable=too-many-public-methods + """Tests for OTLP JSON → length-prefixed binary protobuf conversion + read-back.""" + + SAMPLE_OTLP_JSON_LINE = ( + '{"resourceMetrics":[{"resource":{"attributes":[{"key":"host.name","value":{"stringValue":"host-0"}}]},' + '"scopeMetrics":[{"scope":{"name":"hostmetrics"},"metrics":[' + '{"name":"system.cpu.utilization","gauge":{"dataPoints":[' + '{"timeUnixNano":"1700000000000000000","asDouble":0.42,' + '"attributes":[{"key":"cpu","value":{"stringValue":"0"}}]}]}}]}]}]}' + ) + + def _write_json_lines(self, tmp_path, lines): + json_path = tmp_path / "metrics.otlp.json" + json_path.write_text("\n".join(lines) + "\n") + return str(json_path) + + def test_for_source_file_derives_pb_path(self, tmp_path): + json_path = self._write_json_lines(tmp_path, [self.SAMPLE_OTLP_JSON_LINE]) + pb = io.OtlpProtobufFile.for_source_file(json_path) + assert pb.source_json_path == json_path + assert pb.pb_path == json_path + ".pb" + + def test_exists_false_when_pb_missing(self, tmp_path): + json_path = self._write_json_lines(tmp_path, [self.SAMPLE_OTLP_JSON_LINE]) + pb = io.OtlpProtobufFile.for_source_file(json_path) + assert pb.exists() is False + + def test_exists_false_when_pb_empty(self, tmp_path): + json_path = self._write_json_lines(tmp_path, [self.SAMPLE_OTLP_JSON_LINE]) + pb = io.OtlpProtobufFile.for_source_file(json_path) + with open(pb.pb_path, "wb"): + pass + assert pb.exists() is False + + def test_is_valid_rejects_pb_older_than_source(self, tmp_path): + json_path = self._write_json_lines(tmp_path, [self.SAMPLE_OTLP_JSON_LINE]) + pb = io.OtlpProtobufFile.for_source_file(json_path) + # write a non-empty .pb but with an older mtime than the source + with open(pb.pb_path, "wb") as f: + f.write(b"\x00\x00\x00\x01x") + json_mtime = os.path.getmtime(json_path) + os.utime(pb.pb_path, (json_mtime - 10, json_mtime - 10)) + assert pb.is_valid() is False + + def test_create_then_read_round_trip(self, tmp_path): + # write 3 identical lines so we get 3 distinct records back + lines = [self.SAMPLE_OTLP_JSON_LINE] * 3 + json_path = self._write_json_lines(tmp_path, lines) + pb = io.OtlpProtobufFile.for_source_file(json_path) + + record_count = pb.create() + + assert record_count == 3 + assert pb.is_valid() is True + # offset index is created alongside + assert os.path.exists(pb.pb_path + ".offset") + # no .count file (we rely on the track's document-count) + assert not os.path.exists(pb.pb_path + ".count") + + records = list(pb.read_records(0, None)) + assert len(records) == 3 + # every record should round-trip identically + assert all(len(r) > 0 for r in records) + assert records[0] == records[1] == records[2] + + def test_create_blank_lines_are_skipped(self, tmp_path): + json_path = self._write_json_lines(tmp_path, ["", self.SAMPLE_OTLP_JSON_LINE, "", self.SAMPLE_OTLP_JSON_LINE, ""]) + pb = io.OtlpProtobufFile.for_source_file(json_path) + assert pb.create() == 2 + + def test_create_with_single_worker_matches_multi_worker(self, tmp_path): + # source must span multiple batches to exercise parallel collection ordering + lines = [self.SAMPLE_OTLP_JSON_LINE] * (io.OtlpProtobufFile._CONVERSION_BATCH_SIZE * 2 + 17) + seq_dir = tmp_path / "seq" + par_dir = tmp_path / "par" + seq_dir.mkdir() + par_dir.mkdir() + json_path_seq = self._write_json_lines(seq_dir, lines) + json_path_par = self._write_json_lines(par_dir, lines) + + pb_seq = io.OtlpProtobufFile.for_source_file(json_path_seq) + pb_par = io.OtlpProtobufFile.for_source_file(json_path_par) + + assert pb_seq.create(workers=1) == len(lines) + assert pb_par.create(workers=4) == len(lines) + + # byte-for-byte identical output regardless of worker count → ordering is preserved + with open(pb_seq.pb_path, "rb") as f1, open(pb_par.pb_path, "rb") as f2: + assert f1.read() == f2.read() + + def test_count_records_returns_none_when_pb_missing(self, tmp_path): + json_path = self._write_json_lines(tmp_path, [self.SAMPLE_OTLP_JSON_LINE]) + pb = io.OtlpProtobufFile.for_source_file(json_path) + assert pb.count_records() is None + + def test_count_records_uses_offset_index(self, tmp_path): + lines = [self.SAMPLE_OTLP_JSON_LINE] * (io.OtlpProtobufFile._CONVERSION_BATCH_SIZE * 2 + 137) + json_path = self._write_json_lines(tmp_path, lines) + pb = io.OtlpProtobufFile.for_source_file(json_path) + pb.create() + assert pb.count_records() == len(lines) + + def test_count_records_generates_offset_index_when_missing(self, tmp_path): + # If the .pb was downloaded without its offset, count_records should regenerate the offset + # on the fly so subsequent runs/partitions can seek efficiently. + lines = [self.SAMPLE_OTLP_JSON_LINE] * (io.OtlpProtobufFile.OFFSET_SAMPLING_INTERVAL + 17) + json_path = self._write_json_lines(tmp_path, lines) + pb = io.OtlpProtobufFile.for_source_file(json_path) + pb.create() + # delete the offset file as if only the .pb was downloaded + os.remove(pb.pb_path + ".offset") + + # call count_records — it should produce an offset file as a side effect + assert pb.count_records() == len(lines) + assert os.path.exists(pb.pb_path + ".offset") + + # confirm the generated offset file is valid: subsequent reads partition correctly + records_via_offset = list(pb.read_records(io.OtlpProtobufFile.OFFSET_SAMPLING_INTERVAL, None)) + assert len(records_via_offset) == 17 + + def test_count_records_works_without_offset_index(self, tmp_path): + lines = [self.SAMPLE_OTLP_JSON_LINE] * 50 + json_path = self._write_json_lines(tmp_path, lines) + pb = io.OtlpProtobufFile.for_source_file(json_path) + pb.create() + os.remove(pb.pb_path + ".offset") + assert pb.count_records() == 50 + + def test_create_offset_file_aligns_with_batches(self, tmp_path): + # 2.5 batches: 3 offset entries (one at the start of each batch) + lines = [self.SAMPLE_OTLP_JSON_LINE] * (io.OtlpProtobufFile._CONVERSION_BATCH_SIZE * 2 + 50) + json_path = self._write_json_lines(tmp_path, lines) + pb = io.OtlpProtobufFile.for_source_file(json_path) + pb.create(workers=2) + + with open(pb.pb_path + ".offset") as f: + entries = [line.strip().split(";") for line in f if line.strip()] + assert len(entries) == 3 + # first record numbers are 0, BATCH_SIZE, 2*BATCH_SIZE + assert [int(r) for r, _ in entries] == [ + 0, + io.OtlpProtobufFile._CONVERSION_BATCH_SIZE, + 2 * io.OtlpProtobufFile._CONVERSION_BATCH_SIZE, + ] + # byte offsets are monotonically increasing + byte_offsets = [int(b) for _, b in entries] + assert byte_offsets[0] == 0 + assert byte_offsets[0] < byte_offsets[1] < byte_offsets[2] + + def test_read_records_respects_partition_range(self, tmp_path): + lines = [self.SAMPLE_OTLP_JSON_LINE] * 8 + json_path = self._write_json_lines(tmp_path, lines) + pb = io.OtlpProtobufFile.for_source_file(json_path) + pb.create() + + # 4 partitions across 8 records + slice0 = list(pb.read_records(0, 2)) + slice1 = list(pb.read_records(2, 4)) + slice2 = list(pb.read_records(4, 6)) + slice3 = list(pb.read_records(6, 8)) + assert [len(s) for s in (slice0, slice1, slice2, slice3)] == [2, 2, 2, 2] + # rejoined slices should equal the full read + full = list(pb.read_records(0, None)) + assert slice0 + slice1 + slice2 + slice3 == full + + def test_read_records_handles_start_past_end(self, tmp_path): + lines = [self.SAMPLE_OTLP_JSON_LINE] * 3 + json_path = self._write_json_lines(tmp_path, lines) + pb = io.OtlpProtobufFile.for_source_file(json_path) + pb.create() + # seeking past the end returns no records (does not error) + assert list(pb.read_records(100, 200)) == [] + + def test_read_records_falls_back_without_offset_index(self, tmp_path): + lines = [self.SAMPLE_OTLP_JSON_LINE] * 5 + json_path = self._write_json_lines(tmp_path, lines) + pb = io.OtlpProtobufFile.for_source_file(json_path) + pb.create() + + # remove the offset index — read should still work by scanning from the start + os.remove(pb.pb_path + ".offset") + records = list(pb.read_records(2, 4)) + assert len(records) == 2 + + def test_try_download_returns_false_without_base_url(self, tmp_path): + json_path = self._write_json_lines(tmp_path, [self.SAMPLE_OTLP_JSON_LINE]) + pb = io.OtlpProtobufFile.for_source_file(json_path) + assert pb.try_download_from_corpus_location(None) is False + + def test_try_download_pb_and_offset(self, tmp_path): + json_path = self._write_json_lines(tmp_path, [self.SAMPLE_OTLP_JSON_LINE]) + pb = io.OtlpProtobufFile.for_source_file(json_path) + + downloaded = [] + + def fake_download(url, dest, **kwargs): + downloaded.append(url) + with open(dest, "wb") as f: + f.write(b"\x00") + + with mock.patch("esrally.utils.net.download", side_effect=fake_download): + assert pb.try_download_from_corpus_location("http://example.com/corpus/") is True + + # both .pb and .pb.offset should have been attempted, with trailing slash stripped + assert downloaded == [ + "http://example.com/corpus/metrics.otlp.json.pb", + "http://example.com/corpus/metrics.otlp.json.pb.offset", + ] + + def test_try_download_offset_failure_is_non_fatal(self, tmp_path): + json_path = self._write_json_lines(tmp_path, [self.SAMPLE_OTLP_JSON_LINE]) + pb = io.OtlpProtobufFile.for_source_file(json_path) + + def fake_download(url, dest, **kwargs): + if url.endswith(".offset"): + raise RuntimeError("not found") + with open(dest, "wb") as f: + f.write(b"\x00") + + with mock.patch("esrally.utils.net.download", side_effect=fake_download): + # .pb succeeds, .offset fails → overall result is still True + assert pb.try_download_from_corpus_location("http://example.com/corpus") is True + assert os.path.exists(pb.pb_path) + assert not os.path.exists(pb.pb_path + ".offset") + + def test_try_download_pb_failure_returns_false(self, tmp_path): + json_path = self._write_json_lines(tmp_path, [self.SAMPLE_OTLP_JSON_LINE]) + pb = io.OtlpProtobufFile.for_source_file(json_path) + + with mock.patch("esrally.utils.net.download", side_effect=Exception("404")): + assert pb.try_download_from_corpus_location("http://example.com/corpus") is False + + def test_prepare_skips_when_pb_already_valid(self, tmp_path): + json_path = self._write_json_lines(tmp_path, [self.SAMPLE_OTLP_JSON_LINE] * 3) + # pre-create the .pb so it's already valid + io.OtlpProtobufFile.for_source_file(json_path).create() + + with mock.patch("esrally.utils.net.download") as mock_dl: + result = io.prepare_otlp_protobuf_file(json_path, "http://example.com/corpus") + + mock_dl.assert_not_called() + assert result is None + + def test_prepare_falls_back_to_local_when_download_fails(self, tmp_path): + json_path = self._write_json_lines(tmp_path, [self.SAMPLE_OTLP_JSON_LINE] * 2) + + with mock.patch("esrally.utils.net.download", side_effect=Exception("404")): + result = io.prepare_otlp_protobuf_file(json_path, "http://example.com/corpus") + + assert result == 2 + + def test_prepare_returns_none_when_no_local_json_and_download_fails(self, tmp_path): + # source JSON does not exist + json_path = str(tmp_path / "missing.otlp.json") + + with mock.patch("esrally.utils.net.download", side_effect=Exception("404")): + result = io.prepare_otlp_protobuf_file(json_path, "http://example.com/corpus") + + assert result is None + + def test_prepare_passes_workers_from_env(self, tmp_path): + # env var lets users dial up parallelism without code changes + json_path = self._write_json_lines(tmp_path, [self.SAMPLE_OTLP_JSON_LINE] * 2) + + with ( + mock.patch("esrally.utils.net.download", side_effect=Exception("404")), + mock.patch.object(io.OtlpProtobufFile, "create", return_value=2) as create_mock, + mock.patch.dict(os.environ, {"RALLY_OTLP_CONVERSION_WORKERS": "12"}), + ): + result = io.prepare_otlp_protobuf_file(json_path, None) + + assert result == 2 + create_mock.assert_called_once_with(workers=12) + + def test_prepare_ignores_invalid_workers_env(self, tmp_path): + json_path = self._write_json_lines(tmp_path, [self.SAMPLE_OTLP_JSON_LINE]) + + with ( + mock.patch.object(io.OtlpProtobufFile, "create", return_value=1) as create_mock, + mock.patch.dict(os.environ, {"RALLY_OTLP_CONVERSION_WORKERS": "not-a-number"}), + ): + io.prepare_otlp_protobuf_file(json_path, None) + + create_mock.assert_called_once_with(workers=None) diff --git a/uv.lock b/uv.lock index 737913cae..cf726871d 100644 --- a/uv.lock +++ b/uv.lock @@ -652,7 +652,9 @@ develop = [ { name = "github3-py" }, { name = "gitpython" }, { name = "mypy" }, + { name = "opentelemetry-proto" }, { name = "pre-commit" }, + { name = "protobuf" }, { name = "pylint" }, { name = "pytest" }, { name = "pytest-asyncio" }, @@ -662,11 +664,16 @@ develop = [ { name = "standard-imghdr" }, { name = "trustme" }, { name = "types-jsonschema" }, + { name = "types-protobuf" }, { name = "types-psutil" }, { name = "types-requests" }, { name = "types-tabulate" }, { name = "ujson" }, ] +otlp = [ + { name = "opentelemetry-proto" }, + { name = "protobuf" }, +] s3 = [ { name = "boto3" }, ] @@ -702,8 +709,12 @@ requires-dist = [ { name = "jsonschema", specifier = "==3.1.1" }, { name = "markupsafe", specifier = "==2.0.1" }, { name = "mypy", marker = "extra == 'develop'", specifier = "==1.15.0" }, + { name = "opentelemetry-proto", marker = "extra == 'develop'", specifier = "==1.34.0" }, + { name = "opentelemetry-proto", marker = "extra == 'otlp'", specifier = "==1.34.0" }, { name = "pip", specifier = "==26.1" }, { name = "pre-commit", marker = "extra == 'develop'", specifier = "==2.20.0" }, + { name = "protobuf", marker = "extra == 'develop'", specifier = ">=5.29.0" }, + { name = "protobuf", marker = "extra == 'otlp'", specifier = ">=5.29.0" }, { name = "psutil", specifier = "==5.9.4" }, { name = "py-cpuinfo", specifier = "==7.0.0" }, { name = "pylint", marker = "extra == 'develop'", specifier = "==3.3.8" }, @@ -719,6 +730,7 @@ requires-dist = [ { name = "thespian", specifier = "==4.0.1" }, { name = "trustme", marker = "extra == 'develop'", specifier = "==0.9.0" }, { name = "types-jsonschema", marker = "extra == 'develop'", specifier = "==3.2.0" }, + { name = "types-protobuf", marker = "extra == 'develop'", specifier = ">=4.24,<8" }, { name = "types-psutil", marker = "extra == 'develop'", specifier = "==5.9.4" }, { name = "types-requests", marker = "extra == 'develop'", specifier = ">=2.31.0.7,<3" }, { name = "types-tabulate", marker = "extra == 'develop'", specifier = "==0.8.9" }, @@ -729,7 +741,7 @@ requires-dist = [ { name = "yappi", specifier = "==1.6.10" }, { name = "zstandard", specifier = "==0.25.0" }, ] -provides-extras = ["develop", "s3"] +provides-extras = ["develop", "otlp", "s3"] [package.metadata.requires-dev] dev = [{ name = "esrally", extras = ["develop"], editable = "." }] @@ -1493,6 +1505,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/d2/1d/1b658dbd2b9fa9c4c9f32accbfc0205d532c8c6194dc0f2a4c0428e7128a/nodeenv-1.9.1-py2.py3-none-any.whl", hash = "sha256:ba11c9782d29c27c70ffbdda2d7415098754709be8a7056d79a737cd901155c9", size = 22314, upload-time = "2024-06-04T18:44:08.352Z" }, ] +[[package]] +name = "opentelemetry-proto" +version = "1.34.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "protobuf" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/95/19/45adb533d0a34990942d12eefb2077d59b22958940c71484a45e694f5dd7/opentelemetry_proto-1.34.0.tar.gz", hash = "sha256:73e40509b692630a47192888424f7e0b8fb19d9ecf2f04e6f708170cd3346dfe", size = 34343, upload-time = "2025-06-04T13:31:35.695Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/db/58/708881f5ad3c72954caa61ac970d3c01209dbebf5e534fb840dfb777bad2/opentelemetry_proto-1.34.0-py3-none-any.whl", hash = "sha256:ffb1f1b27552fda5a1cd581e34243cc0b6f134fb14c1c2a33cc3b4b208c9bf97", size = 55691, upload-time = "2025-06-04T13:31:20.333Z" }, +] + [[package]] name = "packaging" version = "25.0" @@ -1656,6 +1680,20 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/cc/35/cc0aaecf278bb4575b8555f2b137de5ab821595ddae9da9d3cd1da4072c7/propcache-0.3.2-py3-none-any.whl", hash = "sha256:98f1ec44fb675f5052cccc8e609c46ed23a35a1cfd18545ad4e29002d858a43f", size = 12663, upload-time = "2025-06-09T22:56:04.484Z" }, ] +[[package]] +name = "protobuf" +version = "5.29.6" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/7e/57/394a763c103e0edf87f0938dafcd918d53b4c011dfc5c8ae80f3b0452dbb/protobuf-5.29.6.tar.gz", hash = "sha256:da9ee6a5424b6b30fd5e45c5ea663aef540ca95f9ad99d1e887e819cdf9b8723", size = 425623, upload-time = "2026-02-04T22:54:40.584Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/d4/88/9ee58ff7863c479d6f8346686d4636dd4c415b0cbeed7a6a7d0617639c2a/protobuf-5.29.6-cp310-abi3-win32.whl", hash = "sha256:62e8a3114992c7c647bce37dcc93647575fc52d50e48de30c6fcb28a6a291eb1", size = 423357, upload-time = "2026-02-04T22:54:25.805Z" }, + { url = "https://files.pythonhosted.org/packages/1c/66/2dc736a4d576847134fb6d80bd995c569b13cdc7b815d669050bf0ce2d2c/protobuf-5.29.6-cp310-abi3-win_amd64.whl", hash = "sha256:7e6ad413275be172f67fdee0f43484b6de5a904cc1c3ea9804cb6fe2ff366eda", size = 435175, upload-time = "2026-02-04T22:54:28.592Z" }, + { url = "https://files.pythonhosted.org/packages/06/db/49b05966fd208ae3f44dcd33837b6243b4915c57561d730a43f881f24dea/protobuf-5.29.6-cp38-abi3-macosx_10_9_universal2.whl", hash = "sha256:b5a169e664b4057183a34bdc424540e86eea47560f3c123a0d64de4e137f9269", size = 418619, upload-time = "2026-02-04T22:54:30.266Z" }, + { url = "https://files.pythonhosted.org/packages/b7/d7/48cbf6b0c3c39761e47a99cb483405f0fde2be22cf00d71ef316ce52b458/protobuf-5.29.6-cp38-abi3-manylinux2014_aarch64.whl", hash = "sha256:a8866b2cff111f0f863c1b3b9e7572dc7eaea23a7fae27f6fc613304046483e6", size = 320284, upload-time = "2026-02-04T22:54:31.782Z" }, + { url = "https://files.pythonhosted.org/packages/e3/dd/cadd6ec43069247d91f6345fa7a0d2858bef6af366dbd7ba8f05d2c77d3b/protobuf-5.29.6-cp38-abi3-manylinux2014_x86_64.whl", hash = "sha256:e3387f44798ac1106af0233c04fb8abf543772ff241169946f698b3a9a3d3ab9", size = 320478, upload-time = "2026-02-04T22:54:32.909Z" }, + { url = "https://files.pythonhosted.org/packages/5a/cb/e3065b447186cb70aa65acc70c86baf482d82bf75625bf5a2c4f6919c6a3/protobuf-5.29.6-py3-none-any.whl", hash = "sha256:6b9edb641441b2da9fa8f428760fc136a49cf97a52076010cf22a2ff73438a86", size = 173126, upload-time = "2026-02-04T22:54:39.462Z" }, +] + [[package]] name = "psutil" version = "5.9.4" @@ -2312,6 +2350,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/88/f0/7558cd331217021e37975f4f0fb4f58548971d002c6401d5012194fd781b/types_jsonschema-3.2.0-py3-none-any.whl", hash = "sha256:4a8f2e87aa7001361b4c3666565f8684f0e016517228396ac1bffd397d8c3fd0", size = 6619, upload-time = "2021-07-15T18:19:34.203Z" }, ] +[[package]] +name = "types-protobuf" +version = "7.34.1.20260518" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/29/59/e2b13b499d15e6720150c4b1a8d91e31fcacf716b432397475b3151ff7e4/types_protobuf-7.34.1.20260518.tar.gz", hash = "sha256:28cfaded25889cb83ebfb63cfb0a43628f0b6f3785767bec17287dc6468795f2", size = 68936, upload-time = "2026-05-18T06:01:47.332Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/2a/1f/ec5caf72c2e3b688ca3927e0979a04ddad19e1afc4bf1c199bd743e0f419/types_protobuf-7.34.1.20260518-py3-none-any.whl", hash = "sha256:a0a5337413347166439c0e07cbc26c6164d091401c6f01b1dfd8cdb966c4dd8f", size = 85992, upload-time = "2026-05-18T06:01:45.696Z" }, +] + [[package]] name = "types-psutil" version = "5.9.4"