Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 27 additions & 2 deletions sentry_sdk/integrations/mcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
Supports the low-level `mcp.server.lowlevel.Server` API.
"""

from contextlib import contextmanager
import inspect
from functools import wraps
from typing import TYPE_CHECKING
Expand Down Expand Up @@ -352,6 +353,30 @@ def _prepare_handler_data(
)


@contextmanager
def ensure_span(*args, **kwargs):
    """Start a span, creating an enclosing transaction first if none is active.

    If the current scope already has a span bound to a transaction, a child
    span is started with the given arguments. Otherwise a transaction is
    started (with the same arguments) and a span is opened inside it, so the
    yielded span is always attached to a transaction and will be sent to
    Sentry rather than silently dropped.

    Yields:
        The started span.
    """
    current_span = sentry_sdk.get_current_span()
    # A span can exist without a containing transaction (e.g. a detached
    # span); only reuse the ambient context when it is actually bound to one.
    transaction_exists = (
        current_span is not None and current_span.containing_transaction is not None
    )

    if transaction_exists:
        with sentry_sdk.start_span(*args, **kwargs) as span:
            yield span
    else:
        # No active transaction: create one so the span is not orphaned.
        # NOTE: the same arguments are passed to both calls, so the
        # transaction and its child span share op/name/origin.
        with sentry_sdk.start_transaction(*args, **kwargs):
            with sentry_sdk.start_span(*args, **kwargs) as span:
                yield span


async def _async_handler_wrapper(
handler_type: str,
func: "Callable[..., Any]",
Expand Down Expand Up @@ -382,7 +407,7 @@ async def _async_handler_wrapper(
) = _prepare_handler_data(handler_type, original_args, original_kwargs)

# Start span and execute
with get_start_span_function()(
with ensure_span(
op=OP.MCP_SERVER,
name=span_name,
origin=MCPIntegration.origin,
Expand Down Expand Up @@ -454,7 +479,7 @@ def _sync_handler_wrapper(
) = _prepare_handler_data(handler_type, original_args)

# Start span and execute
with get_start_span_function()(
with ensure_span(
op=OP.MCP_SERVER,
name=span_name,
origin=MCPIntegration.origin,
Expand Down
9 changes: 7 additions & 2 deletions sentry_sdk/integrations/openai_agents/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
_create_get_model_wrapper,
_create_get_all_tools_wrapper,
_create_run_wrapper,
_create_run_streamed_wrapper,
_patch_agent_run,
_patch_error_tracing,
)
Expand All @@ -25,12 +26,16 @@ def _patch_runner() -> None:
# Create the root span for one full agent run (including eventual handoffs)
# Note agents.run.DEFAULT_AGENT_RUNNER.run_sync is a wrapper around
# agents.run.DEFAULT_AGENT_RUNNER.run. It does not need to be wrapped separately.
# TODO-anton: Also patch streaming runner: agents.Runner.run_streamed
agents.run.DEFAULT_AGENT_RUNNER.run = _create_run_wrapper(
agents.run.DEFAULT_AGENT_RUNNER.run
)

# Creating the actual spans for each agent run.
# Patch streaming runner
agents.run.DEFAULT_AGENT_RUNNER.run_streamed = _create_run_streamed_wrapper(
agents.run.DEFAULT_AGENT_RUNNER.run_streamed
)

# Creating the actual spans for each agent run (works for both streaming and non-streaming).
_patch_agent_run()


Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from .models import _create_get_model_wrapper # noqa: F401
from .tools import _create_get_all_tools_wrapper # noqa: F401
from .runner import _create_run_wrapper # noqa: F401
from .runner import _create_run_wrapper, _create_run_streamed_wrapper # noqa: F401
from .agent_run import _patch_agent_run # noqa: F401
from .error_tracing import _patch_error_tracing # noqa: F401
79 changes: 79 additions & 0 deletions sentry_sdk/integrations/openai_agents/patches/agent_run.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import sys
from functools import wraps

from sentry_sdk.consts import SPANDATA
from sentry_sdk.integrations import DidNotEnable
from sentry_sdk.utils import reraise
from ..spans import (
Expand Down Expand Up @@ -31,6 +32,7 @@ def _patch_agent_run() -> None:

# Store original methods
original_run_single_turn = agents.run.AgentRunner._run_single_turn
original_run_single_turn_streamed = agents.run.AgentRunner._run_single_turn_streamed
original_execute_handoffs = agents._run_impl.RunImpl.execute_handoffs
original_execute_final_output = agents._run_impl.RunImpl.execute_final_output

Expand Down Expand Up @@ -147,10 +149,87 @@ async def patched_execute_final_output(
if agent and context_wrapper and _has_active_agent_span(context_wrapper):
end_invoke_agent_span(context_wrapper, agent, final_output)

# For streaming: close the workflow span if it exists
# (For non-streaming, the workflow span is closed by the context manager in _create_run_wrapper)
if agent and hasattr(agent, "_sentry_workflow_span"):
workflow_span = agent._sentry_workflow_span
workflow_span.__exit__(None, None, None)
delattr(agent, "_sentry_workflow_span")

return result

# Preserve the metadata of the original (possibly bound classmethod) so the
# patch is transparent to introspection; unwrap __func__ when present.
@wraps(
    original_run_single_turn_streamed.__func__
    if hasattr(original_run_single_turn_streamed, "__func__")
    else original_run_single_turn_streamed
)
async def patched_run_single_turn_streamed(
    cls: "agents.Runner", *args: "Any", **kwargs: "Any"
) -> "Any":
    """Patched _run_single_turn_streamed that creates agent invocation spans for streaming.

    Note: Unlike _run_single_turn which uses keyword-only arguments (*,),
    _run_single_turn_streamed uses positional arguments. The call signature is:
    _run_single_turn_streamed(
        streamed_result,  # args[0]
        agent,  # args[1]
        hooks,  # args[2]
        context_wrapper,  # args[3]
        run_config,  # args[4]
        should_run_agent_start_hooks,  # args[5]
        tool_use_tracker,  # args[6]
        all_tools,  # args[7]
        server_conversation_tracker,  # args[8] (optional)
    )
    """
    # Extract positional arguments (streaming version doesn't use keyword-only args).
    # Each falls back to kwargs so a keyword-style caller still works.
    streamed_result = args[0] if len(args) > 0 else kwargs.get("streamed_result")
    agent = args[1] if len(args) > 1 else kwargs.get("agent")
    context_wrapper = args[3] if len(args) > 3 else kwargs.get("context_wrapper")
    should_run_agent_start_hooks = (
        args[5] if len(args) > 5 else kwargs.get("should_run_agent_start_hooks")
    )

    # Pre-existing span for this run, if a previous turn already opened one.
    span = getattr(context_wrapper, "_sentry_agent_span", None)
    # Start agent span when agent starts (but only once per agent)
    if should_run_agent_start_hooks and agent and context_wrapper:
        # End any existing span for a different agent (handoff happened)
        if _has_active_agent_span(context_wrapper):
            current_agent = _get_current_agent(context_wrapper)
            if current_agent and current_agent != agent:
                end_invoke_agent_span(context_wrapper, current_agent)

        # Build kwargs dict for span creation (for compatibility with _start_invoke_agent_span)
        # Include original_input from streamed_result for request messages
        span_kwargs = {
            "agent": agent,
            "context_wrapper": context_wrapper,
            "should_run_agent_start_hooks": should_run_agent_start_hooks,
        }
        if streamed_result and hasattr(streamed_result, "input"):
            span_kwargs["original_input"] = streamed_result.input

        span = _start_invoke_agent_span(context_wrapper, agent, span_kwargs)
        span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, True)
        # NOTE(review): the span is stored on the agent here, but read back
        # from context_wrapper._sentry_agent_span above — confirm
        # _start_invoke_agent_span also sets it on the context_wrapper,
        # otherwise the getattr above never finds it.
        agent._sentry_agent_span = span

    # Call original streaming method
    try:
        result = await original_run_single_turn_streamed(*args, **kwargs)
    except Exception as exc:
        # Only record on a still-open span (timestamp is set when finished).
        if span is not None and span.timestamp is None:
            _record_exception_on_span(span, exc)
        end_invoke_agent_span(context_wrapper, agent)

        # Re-raise with the original traceback intact.
        reraise(*sys.exc_info())

    return result

# Apply patches
agents.run.AgentRunner._run_single_turn = classmethod(patched_run_single_turn)
agents.run.AgentRunner._run_single_turn_streamed = classmethod(
patched_run_single_turn_streamed
)
agents._run_impl.RunImpl.execute_handoffs = classmethod(patched_execute_handoffs)
agents._run_impl.RunImpl.execute_final_output = classmethod(
patched_execute_final_output
Expand Down
84 changes: 77 additions & 7 deletions sentry_sdk/integrations/openai_agents/patches/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@

from sentry_sdk.integrations import DidNotEnable

from ..spans import ai_client_span, update_ai_client_span
from ..spans import (
ai_client_span,
update_ai_client_span,
update_ai_client_span_streaming,
)
from sentry_sdk.consts import SPANDATA

from typing import TYPE_CHECKING
Expand Down Expand Up @@ -37,15 +41,19 @@ def wrapped_get_model(
# because we only patch its direct methods, all underlying data can remain unchanged.
model = copy.copy(original_get_model(agent, run_config))

# Wrap _fetch_response if it exists (for OpenAI models) to capture raw response model
# Capture the request model name for spans (agent.model can be None when using defaults)
request_model_name = model.model if hasattr(model, "model") else str(model)
agent._sentry_request_model = request_model_name

# Wrap _fetch_response if it exists (for OpenAI models) to capture response model
if hasattr(model, "_fetch_response"):
original_fetch_response = model._fetch_response

@wraps(original_fetch_response)
async def wrapped_fetch_response(*args: "Any", **kwargs: "Any") -> "Any":
response = await original_fetch_response(*args, **kwargs)
if hasattr(response, "model"):
agent._sentry_raw_response_model = str(response.model)
if hasattr(response, "model") and response.model:
agent._sentry_response_model = str(response.model)
return response

model._fetch_response = wrapped_fetch_response
Expand All @@ -57,22 +65,84 @@ async def wrapped_get_response(*args: "Any", **kwargs: "Any") -> "Any":
with ai_client_span(agent, kwargs) as span:
result = await original_get_response(*args, **kwargs)

response_model = getattr(agent, "_sentry_raw_response_model", None)
# Get response model captured from _fetch_response
response_model = getattr(agent, "_sentry_response_model", None)
if response_model:
# Set response model on agent span
agent_span = getattr(agent, "_sentry_agent_span", None)
if agent_span:
agent_span.set_data(
SPANDATA.GEN_AI_RESPONSE_MODEL, response_model
)

delattr(agent, "_sentry_raw_response_model")
# Clean up after use
delattr(agent, "_sentry_response_model")

update_ai_client_span(span, agent, kwargs, result, response_model)

return result

model.get_response = wrapped_get_response

# Also wrap stream_response for streaming support
if hasattr(model, "stream_response"):
original_stream_response = model.stream_response

@wraps(original_stream_response)
async def wrapped_stream_response(*args: "Any", **kwargs: "Any") -> "Any":
    """
    Wrap stream_response to create an AI client span for streaming.
    stream_response is an async generator, so we yield events within the span.

    Note: stream_response is called with positional args unlike get_response
    which uses keyword args. The signature is:
    stream_response(
        system_instructions,  # args[0]
        input,  # args[1]
        model_settings,  # args[2]
        tools,  # args[3]
        output_schema,  # args[4]
        handoffs,  # args[5]
        tracing,  # args[6]
        *,
        previous_response_id,
        conversation_id,
        prompt,
    )
    """
    # Build kwargs dict from positional args for span data capture, so
    # ai_client_span sees the same keys regardless of call style.
    span_kwargs = dict(kwargs)
    if len(args) > 0:
        span_kwargs["system_instructions"] = args[0]
    if len(args) > 1:
        span_kwargs["input"] = args[1]

    # The span stays open for the lifetime of the generator; it is closed
    # when iteration finishes or the generator is closed/abandoned (the
    # context manager then exits via GeneratorExit).
    with ai_client_span(agent, span_kwargs) as span:
        span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, True)

        streaming_response = None
        async for event in original_stream_response(*args, **kwargs):
            # Capture the full response from the final event that carries
            # one (e.g. ResponseCompletedEvent); later events overwrite
            # earlier ones so we end with the last response seen.
            if hasattr(event, "response"):
                streaming_response = event.response
            yield event

        # Update span with response data (usage, output, model).
        # Skipped if the stream ended without a response-bearing event.
        if streaming_response:
            update_ai_client_span_streaming(span, agent, streaming_response)
            # Also propagate the response model onto the agent span, if
            # one was attached to this agent.
            if (
                hasattr(streaming_response, "model")
                and streaming_response.model
            ):
                agent_span = getattr(agent, "_sentry_agent_span", None)
                if agent_span:
                    agent_span.set_data(
                        SPANDATA.GEN_AI_RESPONSE_MODEL,
                        str(streaming_response.model),
                    )

model.stream_response = wrapped_stream_response

return model

return wrapped_get_model
48 changes: 48 additions & 0 deletions sentry_sdk/integrations/openai_agents/patches/runner.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import sys
from functools import wraps

import sentry_sdk
Expand Down Expand Up @@ -64,3 +65,50 @@ async def wrapper(*args: "Any", **kwargs: "Any") -> "Any":
return run_result

return wrapper


def _create_run_streamed_wrapper(
    original_func: "Callable[..., Any]",
) -> "Callable[..., Any]":
    """
    Wraps the agents.Runner.run_streamed method to create a root span for streaming agent workflow runs.

    Unlike run(), run_streamed() returns immediately with a RunResultStreaming object
    while execution continues in a background task. The workflow span must stay open
    throughout the streaming operation and close when streaming completes or is abandoned.

    Note: We don't use isolation_scope() here because it uses context variables that
    cannot span async boundaries (the __enter__ and __exit__ would be called from
    different async contexts, causing ValueError).

    :param original_func: The original run_streamed callable to wrap.
    :returns: A wrapper with the same call signature as ``original_func``.
    """

    @wraps(original_func)
    def wrapper(*args: "Any", **kwargs: "Any") -> "Any":
        # Clone the agent because agent invocation spans are attached per run;
        # mutating a shared Agent instance would leak state across runs.
        # The agent may arrive positionally or as the `starting_agent` keyword.
        if args:
            agent = args[0].clone()
            args = (agent, *args[1:])
        else:
            agent = kwargs["starting_agent"].clone()
            kwargs["starting_agent"] = agent

        # Start workflow span immediately (before run_streamed returns),
        # entering it manually because it must outlive this call frame.
        workflow_span = agent_workflow_span(agent)
        workflow_span.__enter__()

        # Store span on agent for cleanup by the streaming patches.
        agent._sentry_workflow_span = workflow_span

        try:
            # Call original function to get RunResultStreaming
            run_result = original_func(*args, **kwargs)
        except Exception as exc:
            # If run_streamed itself fails (not the background task),
            # close the span with the exception info and report it.
            workflow_span.__exit__(*sys.exc_info())
            _capture_exception(exc)
            # Bare re-raise keeps the original traceback and context.
            raise

        # Store references on the result for cleanup when streaming ends.
        run_result._sentry_workflow_span = workflow_span
        run_result._sentry_agent = agent

        return run_result

    return wrapper
6 changes: 5 additions & 1 deletion sentry_sdk/integrations/openai_agents/spans/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
from .agent_workflow import agent_workflow_span # noqa: F401
from .ai_client import ai_client_span, update_ai_client_span # noqa: F401
from .ai_client import (
ai_client_span,
update_ai_client_span,
update_ai_client_span_streaming,
) # noqa: F401
from .execute_tool import execute_tool_span, update_execute_tool_span # noqa: F401
from .handoff import handoff_span # noqa: F401
from .invoke_agent import (
Expand Down
Loading
Loading