Skip to content

Commit d81d230

Browse files
committed
fix(langchain): normalize tool definitions across all providers in callback handler
- Add _to_langfuse_tool helper returning a list to normalize tool definitions into OpenAI canonical format, handling:
  - OpenAI / LiteLLM / Ollama: {type: "function", function: {name, description, parameters}}
  - Anthropic: {name, description, input_schema}
  - Google / Vertex AI: {function_declarations: [{name, description, parameters}, ...]}
  - BaseTool / StructuredTool objects not yet converted to dict (fixes #11850)
- Flatten Google's function_declarations container (one object → N tools) at call site
- Add _convert_tool_call helper to unify tool_calls and invalid_tool_calls conversion, supporting both LangChain (args) and Anthropic streaming (input) formats
- Structure on_llm_start input as {messages, tools} so the backend's extractToolsFromObservation can find tool definitions at the top-level tools key
- Add _normalize_anthropic_content_blocks to strip streaming artifacts (index, partial_json) from Anthropic tool_use content blocks and fill empty input from message.tool_calls
- Add unit tests for all formats and helpers
1 parent caddeff commit d81d230

File tree

3 files changed

+461
-21
lines changed

3 files changed

+461
-21
lines changed

langfuse/_client/propagation.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
from opentelemetry import (
1818
trace as otel_trace_api,
1919
)
20+
from opentelemetry.context import _RUNTIME_CONTEXT
2021
from opentelemetry.util._decorator import (
2122
_AgnosticContextManager,
2223
_agnosticcontextmanager,
@@ -272,7 +273,13 @@ def _propagate_attributes(
272273
yield
273274

274275
finally:
275-
otel_context_api.detach(token)
276+
try:
277+
# Bypass the public detach() which logs an ERROR when the token was
278+
# created in a different async task/thread (common in async frameworks).
279+
# The span data is already captured; the failed detach is harmless.
280+
_RUNTIME_CONTEXT.detach(token)
281+
except Exception:
282+
pass
276283

277284

278285
def _get_propagated_attributes_from_context(

langfuse/langchain/CallbackHandler.py

Lines changed: 225 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import json
12
from contextvars import Token
23
from typing import (
34
Any,
@@ -35,6 +36,166 @@
3536
from langfuse.logger import langfuse_logger
3637
from langfuse.types import TraceContext
3738

39+
40+
def _to_langfuse_tool(tool: Any) -> List[Any]:
41+
"""Normalize a tool definition into a list of OpenAI-format tool dicts.
42+
43+
Returns a list because Google / Vertex AI's ``function_declarations`` format
44+
bundles multiple tools into a single object, so one input can expand to many.
45+
46+
LangChain providers serialize tools differently depending on the backend:
47+
- OpenAI / LiteLLM / Ollama: {type: "function", function: {name, description, parameters}}
48+
- Anthropic (ChatAnthropic): {name, description, input_schema}
49+
- Google / Vertex AI: {function_declarations: [{name, description, parameters}, ...]}
50+
- BaseTool / StructuredTool objects: LangChain objects not yet converted to dict
51+
52+
All formats are normalized to the canonical OpenAI shape so Langfuse's
53+
``extractToolsFromObservation`` (which uses ``OpenAIToolSchema``) can parse them.
54+
"""
55+
if not isinstance(tool, dict):
56+
# BaseTool / StructuredTool objects — passed without dict conversion (langfuse#11850).
57+
# Extract via duck typing to avoid a hard langchain-core dependency.
58+
if hasattr(tool, "name") and hasattr(tool, "description"):
59+
try:
60+
parameters: Dict[str, Any] = {}
61+
args_schema = getattr(tool, "args_schema", None)
62+
if args_schema is not None:
63+
if hasattr(args_schema, "model_json_schema"): # Pydantic v2
64+
schema = args_schema.model_json_schema()
65+
elif hasattr(args_schema, "schema"): # Pydantic v1
66+
schema = args_schema.schema()
67+
else:
68+
schema = {}
69+
parameters = {k: v for k, v in schema.items() if k != "title"}
70+
return [
71+
{
72+
"type": "function",
73+
"function": {
74+
"name": tool.name,
75+
"description": tool.description or "",
76+
"parameters": parameters,
77+
},
78+
}
79+
]
80+
except Exception:
81+
langfuse_logger.debug(
82+
"Failed to convert BaseTool object to dict: %s", tool
83+
)
84+
return [tool]
85+
86+
# Already in OpenAI format: {type: "function", function: {name, description, parameters}}
87+
if tool.get("type") == "function" and "function" in tool:
88+
return [tool]
89+
90+
# Anthropic format: {name, description, input_schema} -> OpenAI format
91+
if "name" in tool and "input_schema" in tool:
92+
return [
93+
{
94+
"type": "function",
95+
"function": {
96+
"name": tool["name"],
97+
"description": tool.get("description", ""),
98+
"parameters": tool["input_schema"],
99+
},
100+
}
101+
]
102+
103+
# Google / Vertex AI format: {function_declarations: [{name, description, parameters}, ...]}
104+
# One object bundles multiple tool definitions — expand to individual OpenAI-format tools.
105+
if "function_declarations" in tool:
106+
result = []
107+
for decl in tool.get("function_declarations", []):
108+
if not isinstance(decl, dict):
109+
continue
110+
result.append(
111+
{
112+
"type": "function",
113+
"function": {
114+
"name": decl.get("name", ""),
115+
"description": decl.get("description", ""),
116+
"parameters": decl.get("parameters", {}),
117+
},
118+
}
119+
)
120+
return result if result else [tool]
121+
122+
return [tool]
123+
124+
125+
def _normalize_anthropic_content_blocks(
126+
content: List[Any], tool_calls: List[Dict[str, Any]]
127+
) -> List[Any]:
128+
"""Remove streaming artifacts from Anthropic content blocks.
129+
130+
Anthropic streaming leaves tool_use blocks with ``input: {}`` and
131+
streaming-specific fields (``index``, ``partial_json``). The actual
132+
arguments are already reconstructed in ``message.tool_calls``. This
133+
helper fills the empty ``input`` from the normalized tool_calls and
134+
strips the streaming-only keys so the block looks like a proper
135+
Anthropic content block.
136+
"""
137+
if not tool_calls:
138+
return content
139+
tc_by_id: Dict[str, Any] = {
140+
tc["id"]: tc.get("args", {})
141+
for tc in tool_calls
142+
if isinstance(tc, dict) and "id" in tc
143+
}
144+
tc_by_name: Dict[str, Any] = {
145+
tc["name"]: tc.get("args", {})
146+
for tc in tool_calls
147+
if isinstance(tc, dict) and "name" in tc
148+
}
149+
result = []
150+
for block in content:
151+
if isinstance(block, dict) and block.get("type") == "tool_use":
152+
block_input = block.get("input") or {}
153+
if not block_input:
154+
block_input = (
155+
tc_by_id.get(block.get("id", ""))
156+
or tc_by_name.get(block.get("name", ""))
157+
or {}
158+
)
159+
normalized = {
160+
k: v for k, v in block.items() if k not in ("index", "partial_json")
161+
}
162+
normalized["input"] = block_input
163+
result.append(normalized)
164+
else:
165+
result.append(block)
166+
return result
167+
168+
169+
def _convert_tool_call(tc: Any, include_error: bool = False) -> Optional[Dict[str, Any]]:
170+
"""Convert a single tool call dict to Langfuse's canonical format.
171+
172+
Handles both LangChain format {name, args, id} and Anthropic streaming
173+
format {type: "tool_use", name, input, id}.
174+
175+
Returns None (and logs a debug message) if tc is not a dict.
176+
Set include_error=True for invalid_tool_calls entries.
177+
"""
178+
if not isinstance(tc, dict):
179+
langfuse_logger.debug("Skipping tool_call entry that is not a dict: %s", tc)
180+
return None
181+
# Anthropic streaming uses "input" instead of "args"
182+
args = tc.get("args") or tc.get("input") or {}
183+
try:
184+
arguments = json.dumps(args)
185+
except (TypeError, ValueError) as e:
186+
langfuse_logger.debug("Failed to serialize tool call args to JSON: %s", e)
187+
arguments = "{}"
188+
result: Dict[str, Any] = {
189+
"id": tc.get("id", ""),
190+
"type": "function",
191+
"name": tc.get("name", ""),
192+
"arguments": arguments,
193+
}
194+
if include_error:
195+
result["error"] = tc.get("error", "")
196+
return result
197+
198+
38199
try:
39200
import langchain
40201

@@ -841,9 +1002,27 @@ def __on_llm_action(
8411002
self._child_to_parent_run_id_map[run_id] = parent_run_id
8421003

8431004
try:
844-
tools = kwargs.get("invocation_params", {}).get("tools", None)
1005+
observation_input: Any = prompts
1006+
invocation_params = kwargs.get("invocation_params", {})
1007+
langfuse_logger.debug(
1008+
"LLM action invocation_params keys: %s", list(invocation_params.keys())
1009+
)
1010+
tools = invocation_params.get("tools", None)
1011+
langfuse_logger.debug(
1012+
"LLM action tools from invocation_params: %s", tools
1013+
)
8451014
if tools and isinstance(tools, list):
846-
prompts.extend([{"role": "tool", "content": tool} for tool in tools])
1015+
# Structure input as {messages, tools} so extractToolsFromObservation
1016+
# can find tool definitions at the top-level tools key — the canonical
1017+
# format expected by the backend's LLMToolDefinitionSchema.
1018+
normalized_tools = [n for t in tools for n in _to_langfuse_tool(t)]
1019+
langfuse_logger.debug(
1020+
"LLM action normalized tools: %s", normalized_tools
1021+
)
1022+
observation_input = {
1023+
"messages": prompts,
1024+
"tools": normalized_tools,
1025+
}
8471026

8481027
model_name = self._parse_model_and_log_errors(
8491028
serialized=serialized, metadata=metadata, kwargs=kwargs
@@ -868,7 +1047,7 @@ def __on_llm_action(
8681047

8691048
content = {
8701049
"name": self.get_langchain_run_name(serialized, **kwargs),
871-
"input": prompts,
1050+
"input": observation_input,
8721051
"metadata": self.__join_tags_and_metadata(
8731052
tags,
8741053
metadata,
@@ -1049,21 +1228,56 @@ def _convert_message_to_dict(self, message: BaseMessage) -> Dict[str, Any]:
10491228
if isinstance(message, HumanMessage):
10501229
message_dict: Dict[str, Any] = {"role": "user", "content": message.content}
10511230
elif isinstance(message, AIMessage):
1052-
message_dict = {"role": "assistant", "content": message.content}
1053-
1054-
if (
1231+
# Normalize Anthropic content blocks: fill empty tool_use input from
1232+
# message.tool_calls and strip streaming artifacts (index, partial_json).
1233+
content = message.content
1234+
langfuse_logger.debug(
1235+
"AIMessage content type=%s value=%s", type(content).__name__, content
1236+
)
1237+
lc_tool_calls = (
1238+
list(message.tool_calls)
1239+
if hasattr(message, "tool_calls") and message.tool_calls
1240+
else []
1241+
)
1242+
langfuse_logger.debug(
1243+
"AIMessage tool_calls=%s additional_kwargs=%s",
1244+
lc_tool_calls,
1245+
message.additional_kwargs,
1246+
)
1247+
if isinstance(content, list) and lc_tool_calls:
1248+
content = _normalize_anthropic_content_blocks(content, lc_tool_calls)
1249+
langfuse_logger.debug("AIMessage normalized content=%s", content)
1250+
message_dict = {"role": "assistant", "content": content}
1251+
1252+
# Resolve tool_calls: prefer LangChain's normalized {name, args, id}
1253+
# format; fall back to additional_kwargs["tool_calls"] which contains
1254+
# Anthropic's raw {type: "tool_use", name, input, id} format when
1255+
# streaming is used and message.tool_calls is not populated.
1256+
raw_tool_calls = message.tool_calls if (
10551257
hasattr(message, "tool_calls")
10561258
and message.tool_calls is not None
10571259
and len(message.tool_calls) > 0
1058-
):
1059-
message_dict["tool_calls"] = message.tool_calls
1260+
) else message.additional_kwargs.get("tool_calls") or []
1261+
1262+
if raw_tool_calls:
1263+
converted_tool_calls = [
1264+
r for tc in raw_tool_calls if (r := _convert_tool_call(tc)) is not None
1265+
]
1266+
if converted_tool_calls:
1267+
message_dict["tool_calls"] = converted_tool_calls
10601268

10611269
if (
1062-
hasattr(message, "invalid_tool_calls")
1063-
and message.invalid_tool_calls is not None
1270+
hasattr(message, "invalid_tool_calls")
1271+
and message.invalid_tool_calls is not None
10641272
and len(message.invalid_tool_calls) > 0
10651273
):
1066-
message_dict["invalid_tool_calls"] = message.invalid_tool_calls
1274+
converted_invalid_tool_calls = [
1275+
r
1276+
for tc in message.invalid_tool_calls
1277+
if (r := _convert_tool_call(tc, include_error=True)) is not None
1278+
]
1279+
if converted_invalid_tool_calls:
1280+
message_dict["invalid_tool_calls"] = converted_invalid_tool_calls
10671281

10681282
elif isinstance(message, SystemMessage):
10691283
message_dict = {"role": "system", "content": message.content}

0 commit comments

Comments
 (0)