
[BUG]: Elasticsearch/OpenSearch integration: span left open on TransportError causes zombie spans #17100

@garrettladley

Description


Tracer Version(s)

4.3.2

Python Version(s)

Python 3.12

Pip Version(s)

uv-pip 0.5.20

Bug Report

When an OpenSearch/Elasticsearch TransportError (e.g. a 400) is raised during perform_request, the span is not finished when the error occurs. The generator in _get_perform_request_async is abandoned without calling coro.close(), so the __exit__ of the with pin.tracer.trace(...) context manager does not run until Python's garbage collector finalizes the generator. This produces zombie spans that report durations of minutes (the GC delay) instead of the actual millisecond request time.
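The underlying behavior is ordinary Python generator semantics and can be reproduced without ddtrace. The sketch below is a minimal illustration, assuming a hypothetical fake_span context manager in place of tracer.trace(...) (it is not ddtrace's API). A generator suspended inside a with block is abandoned, and because it is reachable only through a reference cycle, its cleanup waits for the cyclic garbage collector. Without a cycle, CPython's reference counting would typically finalize the generator as soon as its last reference is dropped.

```python
import gc
from contextlib import contextmanager

events = []  # order in which the fake span starts and finishes


@contextmanager
def fake_span(name):
    # Hypothetical stand-in for tracer.trace(...); not ddtrace's API.
    events.append(f"start {name}")
    try:
        yield
    finally:
        events.append(f"finish {name}")  # plays the role of span.finish()


def traced_request():
    # Generator that suspends inside the with block, like the integration's wrapper.
    with fake_span("request"):
        yield "awaitable"


gc.disable()  # keep the cyclic collector from running on its own
try:
    gen = traced_request()
    next(gen)        # enter the with block and suspend at the yield
    box = [gen]
    box.append(box)  # self-referencing list: gen is now reachable only through a cycle
    del gen, box     # the caller abandons the generator without calling close()
    before_gc = list(events)
    gc.collect()     # finalizing the generator throws GeneratorExit, so the finally block runs
    after_gc = list(events)
finally:
    gc.enable()

print(before_gc)  # ['start request']
print(after_gc)   # ['start request', 'finish request']
```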

ddtrace/contrib/internal/elasticsearch/patch.py:

async def _perform_request(func, instance, args, kwargs):
    coro = _perform_request_coro(func, instance, args, kwargs)
    result = await next(coro)    # <-- raises TransportError
    try:
        coro.send(result)        # <-- never reached
    except StopIteration:
        pass
    return result

When await next(coro) raises, coro.send(result) is skipped, and the generator is abandoned while still suspended inside the trace span's with block. span.finish() runs only when the garbage collector eventually finalizes the generator, which throws GeneratorExit into it.
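A self-contained sketch of the failing path, assuming simplified stand-ins: trace() replaces pin.tracer.trace(...), TransportError and failing_request() replace the client call, and the wrapper signatures are reduced to a single func argument instead of (func, instance, args, kwargs). None of these names are ddtrace or opensearch-py APIs. The script records which spans have finished while the error is still being handled.

```python
import asyncio
from contextlib import contextmanager

finished = []  # names of spans whose with block has exited


@contextmanager
def trace(name):
    # Hypothetical stand-in for pin.tracer.trace(...); records when the span finishes.
    try:
        yield
    finally:
        finished.append(name)


class TransportError(Exception):
    """Hypothetical stand-in for the client library's TransportError."""


async def failing_request():
    raise TransportError(400, "index_not_found_exception")


def _perform_request_coro(func):
    # Same shape as the integration: yield the awaitable, get its result back via send().
    with trace("opensearch.query"):
        result = yield func()
        # response tags would be set from `result` here


async def _perform_request(func):
    coro = _perform_request_coro(func)
    result = await next(coro)  # raises TransportError; coro stays suspended in the with block
    try:
        coro.send(result)
    except StopIteration:
        pass
    return result


async def main():
    try:
        await _perform_request(failing_request)
    except TransportError:
        # The live traceback references _perform_request's frame, and therefore `coro`.
        return list(finished)


spans_finished_at_error = asyncio.run(main())
print(spans_finished_at_error)  # [] (the span is still open while the error is handled)
```

In this small script, reference counting frees the generator shortly after the except block ends; in the real client the delay comes from whatever keeps the generator alive longer, such as a reference cycle or a stored traceback.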

Suggested Fix

async def _perform_request(func, instance, args, kwargs):
    coro = _perform_request_coro(func, instance, args, kwargs)
    try:
        result = await next(coro)
    except Exception:
        coro.close()
        raise
    try:
        coro.send(result)
    except StopIteration:
        pass
    return result
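The same stand-in harness with the suggested fix applied, again assuming the hypothetical trace(), TransportError and failing_request() helpers and the reduced single-argument signature. Calling coro.close() throws GeneratorExit at the suspended yield, so the with block (and therefore the span) exits before the TransportError is re-raised.

```python
import asyncio
from contextlib import contextmanager

finished = []  # names of spans whose with block has exited


@contextmanager
def trace(name):
    # Hypothetical stand-in for pin.tracer.trace(...); records when the span finishes.
    try:
        yield
    finally:
        finished.append(name)


class TransportError(Exception):
    """Hypothetical stand-in for the client library's TransportError."""


async def failing_request():
    raise TransportError(400, "index_not_found_exception")


def _perform_request_coro(func):
    with trace("opensearch.query"):
        result = yield func()
        # response tags would be set from `result` here


async def _perform_request(func):
    coro = _perform_request_coro(func)
    try:
        result = await next(coro)
    except Exception:
        coro.close()  # throws GeneratorExit at the yield, so the with block exits now
        raise         # re-raise the original TransportError
    try:
        coro.send(result)
    except StopIteration:
        pass
    return result


async def main():
    try:
        await _perform_request(failing_request)
    except TransportError:
        return list(finished)


spans_finished_at_error = asyncio.run(main())
print(spans_finished_at_error)  # ['opensearch.query']
```

One design point to check: with close(), the with block exits via GeneratorExit rather than the TransportError itself, so any error tagging done in the span's __exit__ sees GeneratorExit. If the span should carry the TransportError's error tags, throwing the original exception into the generator with coro.throw(exc) is a variant worth comparing; how ddtrace's span handles either case has not been checked here.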

Happy to open a PR with this fix if it's helpful.

Reproduction Code

DD_TRACE_DEBUG=true DD_SERVICE=ddtrace-opensearch-repro uv run ddtrace-run python repro.py

repro.py

import asyncio
import gc

import ddtrace.auto  # important: enables ddtrace patching before opensearchpy import
from opensearchpy import AsyncOpenSearch


gc.disable()  # makes the orphaned span easier to observe deterministically


async def main():
    client = AsyncOpenSearch(hosts=[{"host": "localhost", "port": 9200}])

    try:
        await client.search(
            index="nonexistent_index",
            body={"query": {"match_all": {}}},
        )
    except Exception as exc:
        print(f"caught: {type(exc).__name__}: {exc}")
        # Keep the exception/traceback alive so the abandoned generator is not finalized yet.
        await asyncio.sleep(30)
    finally:
        await client.close()


if __name__ == "__main__":
    asyncio.run(main())

Error Logs

No response

Libraries in Use

opensearch-py==2.8.0

Operating System

No response
