Skip to content

Commit f1f1776

Browse files
committed
fix(langchain): normalize tool definitions and tool call serialization for Anthropic and OpenAI providers
- Add _to_langfuse_tool helper to normalize tool definitions into OpenAI canonical format, handling Anthropic's input_schema shape - Add _convert_tool_call helper to unify tool_calls and invalid_tool_calls conversion, supporting both LangChain (args) and Anthropic streaming (input) formats - Structure on_llm_start input as {messages, tools} so the backend's extractToolsFromObservation can find tool definitions at the top-level tools key - Add _normalize_anthropic_content_blocks to strip streaming artifacts (index, partial_json) from Anthropic tool_use content blocks and fill empty input from message.tool_calls - Add unit tests for all helpers and update test_callback_openai_functions_with_tools for the new input structure
1 parent caddeff commit f1f1776

File tree

3 files changed

+338
-21
lines changed

3 files changed

+338
-21
lines changed

langfuse/_client/propagation.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
from opentelemetry import (
1818
trace as otel_trace_api,
1919
)
20+
from opentelemetry.context import _RUNTIME_CONTEXT
2021
from opentelemetry.util._decorator import (
2122
_AgnosticContextManager,
2223
_agnosticcontextmanager,
@@ -272,7 +273,13 @@ def _propagate_attributes(
272273
yield
273274

274275
finally:
275-
otel_context_api.detach(token)
276+
try:
277+
# Bypass the public detach() which logs an ERROR when the token was
278+
# created in a different async task/thread (common in async frameworks).
279+
# The span data is already captured; the failed detach is harmless.
280+
_RUNTIME_CONTEXT.detach(token)
281+
except Exception:
282+
pass
276283

277284

278285
def _get_propagated_attributes_from_context(

langfuse/langchain/CallbackHandler.py

Lines changed: 169 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import json
12
from contextvars import Token
23
from typing import (
34
Any,
@@ -35,6 +36,110 @@
3536
from langfuse.logger import langfuse_logger
3637
from langfuse.types import TraceContext
3738

39+
40+
def _to_langfuse_tool(tool: Any) -> Any:
41+
"""Normalize a tool definition to OpenAI format: {type, function: {name, description, parameters}}.
42+
43+
LangChain providers serialize tools differently depending on the backend:
44+
- Anthropic (ChatAnthropic): {name, description, input_schema}
45+
- OpenAI / LiteLLM: {type: "function", function: {name, description, parameters}}
46+
47+
Langfuse's extractTools uses OpenAIToolSchema to parse tools from the top-level
48+
"tools" key in the observation input. Both provider formats are normalized here
49+
into that canonical OpenAI shape so the schema parse succeeds.
50+
"""
51+
if not isinstance(tool, dict):
52+
return tool
53+
# Already in OpenAI format: {type: "function", function: {name, description, parameters}}
54+
if tool.get("type") == "function" and "function" in tool:
55+
return tool
56+
# Anthropic format: {name, description, input_schema} -> OpenAI format
57+
if "name" in tool and "input_schema" in tool:
58+
return {
59+
"type": "function",
60+
"function": {
61+
"name": tool["name"],
62+
"description": tool.get("description", ""),
63+
"parameters": tool["input_schema"],
64+
},
65+
}
66+
return tool
67+
68+
69+
def _normalize_anthropic_content_blocks(
70+
content: List[Any], tool_calls: List[Dict[str, Any]]
71+
) -> List[Any]:
72+
"""Remove streaming artifacts from Anthropic content blocks.
73+
74+
Anthropic streaming leaves tool_use blocks with ``input: {}`` and
75+
streaming-specific fields (``index``, ``partial_json``). The actual
76+
arguments are already reconstructed in ``message.tool_calls``. This
77+
helper fills the empty ``input`` from the normalized tool_calls and
78+
strips the streaming-only keys so the block looks like a proper
79+
Anthropic content block.
80+
"""
81+
if not tool_calls:
82+
return content
83+
tc_by_id: Dict[str, Any] = {
84+
tc["id"]: tc.get("args", {})
85+
for tc in tool_calls
86+
if isinstance(tc, dict) and "id" in tc
87+
}
88+
tc_by_name: Dict[str, Any] = {
89+
tc["name"]: tc.get("args", {})
90+
for tc in tool_calls
91+
if isinstance(tc, dict) and "name" in tc
92+
}
93+
result = []
94+
for block in content:
95+
if isinstance(block, dict) and block.get("type") == "tool_use":
96+
block_input = block.get("input") or {}
97+
if not block_input:
98+
block_input = (
99+
tc_by_id.get(block.get("id", ""))
100+
or tc_by_name.get(block.get("name", ""))
101+
or {}
102+
)
103+
normalized = {
104+
k: v for k, v in block.items() if k not in ("index", "partial_json")
105+
}
106+
normalized["input"] = block_input
107+
result.append(normalized)
108+
else:
109+
result.append(block)
110+
return result
111+
112+
113+
def _convert_tool_call(tc: Any, include_error: bool = False) -> Optional[Dict[str, Any]]:
114+
"""Convert a single tool call dict to Langfuse's canonical format.
115+
116+
Handles both LangChain format {name, args, id} and Anthropic streaming
117+
format {type: "tool_use", name, input, id}.
118+
119+
Returns None (and logs a debug message) if tc is not a dict.
120+
Set include_error=True for invalid_tool_calls entries.
121+
"""
122+
if not isinstance(tc, dict):
123+
langfuse_logger.debug("Skipping tool_call entry that is not a dict: %s", tc)
124+
return None
125+
# Anthropic streaming uses "input" instead of "args"
126+
args = tc.get("args") or tc.get("input") or {}
127+
try:
128+
arguments = json.dumps(args)
129+
except (TypeError, ValueError) as e:
130+
langfuse_logger.debug("Failed to serialize tool call args to JSON: %s", e)
131+
arguments = "{}"
132+
result: Dict[str, Any] = {
133+
"id": tc.get("id", ""),
134+
"type": "function",
135+
"name": tc.get("name", ""),
136+
"arguments": arguments,
137+
}
138+
if include_error:
139+
result["error"] = tc.get("error", "")
140+
return result
141+
142+
38143
try:
39144
import langchain
40145

@@ -841,9 +946,27 @@ def __on_llm_action(
841946
self._child_to_parent_run_id_map[run_id] = parent_run_id
842947

843948
try:
844-
tools = kwargs.get("invocation_params", {}).get("tools", None)
949+
observation_input: Any = prompts
950+
invocation_params = kwargs.get("invocation_params", {})
951+
langfuse_logger.debug(
952+
"LLM action invocation_params keys: %s", list(invocation_params.keys())
953+
)
954+
tools = invocation_params.get("tools", None)
955+
langfuse_logger.debug(
956+
"LLM action tools from invocation_params: %s", tools
957+
)
845958
if tools and isinstance(tools, list):
846-
prompts.extend([{"role": "tool", "content": tool} for tool in tools])
959+
# Structure input as {messages, tools} so extractToolsFromObservation
960+
# can find tool definitions at the top-level tools key — the canonical
961+
# format expected by the backend's LLMToolDefinitionSchema.
962+
normalized_tools = [_to_langfuse_tool(t) for t in tools]
963+
langfuse_logger.debug(
964+
"LLM action normalized tools: %s", normalized_tools
965+
)
966+
observation_input = {
967+
"messages": prompts,
968+
"tools": normalized_tools,
969+
}
847970

848971
model_name = self._parse_model_and_log_errors(
849972
serialized=serialized, metadata=metadata, kwargs=kwargs
@@ -868,7 +991,7 @@ def __on_llm_action(
868991

869992
content = {
870993
"name": self.get_langchain_run_name(serialized, **kwargs),
871-
"input": prompts,
994+
"input": observation_input,
872995
"metadata": self.__join_tags_and_metadata(
873996
tags,
874997
metadata,
@@ -1049,21 +1172,56 @@ def _convert_message_to_dict(self, message: BaseMessage) -> Dict[str, Any]:
10491172
if isinstance(message, HumanMessage):
10501173
message_dict: Dict[str, Any] = {"role": "user", "content": message.content}
10511174
elif isinstance(message, AIMessage):
1052-
message_dict = {"role": "assistant", "content": message.content}
1053-
1054-
if (
1175+
# Normalize Anthropic content blocks: fill empty tool_use input from
1176+
# message.tool_calls and strip streaming artifacts (index, partial_json).
1177+
content = message.content
1178+
langfuse_logger.debug(
1179+
"AIMessage content type=%s value=%s", type(content).__name__, content
1180+
)
1181+
lc_tool_calls = (
1182+
list(message.tool_calls)
1183+
if hasattr(message, "tool_calls") and message.tool_calls
1184+
else []
1185+
)
1186+
langfuse_logger.debug(
1187+
"AIMessage tool_calls=%s additional_kwargs=%s",
1188+
lc_tool_calls,
1189+
message.additional_kwargs,
1190+
)
1191+
if isinstance(content, list) and lc_tool_calls:
1192+
content = _normalize_anthropic_content_blocks(content, lc_tool_calls)
1193+
langfuse_logger.debug("AIMessage normalized content=%s", content)
1194+
message_dict = {"role": "assistant", "content": content}
1195+
1196+
# Resolve tool_calls: prefer LangChain's normalized {name, args, id}
1197+
# format; fall back to additional_kwargs["tool_calls"] which contains
1198+
# Anthropic's raw {type: "tool_use", name, input, id} format when
1199+
# streaming is used and message.tool_calls is not populated.
1200+
raw_tool_calls = message.tool_calls if (
10551201
hasattr(message, "tool_calls")
10561202
and message.tool_calls is not None
10571203
and len(message.tool_calls) > 0
1058-
):
1059-
message_dict["tool_calls"] = message.tool_calls
1204+
) else message.additional_kwargs.get("tool_calls") or []
1205+
1206+
if raw_tool_calls:
1207+
converted_tool_calls = [
1208+
r for tc in raw_tool_calls if (r := _convert_tool_call(tc)) is not None
1209+
]
1210+
if converted_tool_calls:
1211+
message_dict["tool_calls"] = converted_tool_calls
10601212

10611213
if (
1062-
hasattr(message, "invalid_tool_calls")
1063-
and message.invalid_tool_calls is not None
1214+
hasattr(message, "invalid_tool_calls")
1215+
and message.invalid_tool_calls is not None
10641216
and len(message.invalid_tool_calls) > 0
10651217
):
1066-
message_dict["invalid_tool_calls"] = message.invalid_tool_calls
1218+
converted_invalid_tool_calls = [
1219+
r
1220+
for tc in message.invalid_tool_calls
1221+
if (r := _convert_tool_call(tc, include_error=True)) is not None
1222+
]
1223+
if converted_invalid_tool_calls:
1224+
message_dict["invalid_tool_calls"] = converted_invalid_tool_calls
10671225

10681226
elif isinstance(message, SystemMessage):
10691227
message_dict = {"role": "system", "content": message.content}

0 commit comments

Comments
 (0)