Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 74 additions & 0 deletions langfuse/temporal/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
"""Langfuse integration for Temporal (``temporalio``).

This package provides a Temporal plugin that emits OpenTelemetry spans for
Temporal client/workflow/activity operations and routes them to Langfuse
with Langfuse-specific attributes (session id, user id, tags, metadata,
and optional input/output captures).

Public API::

from langfuse.temporal import LangfusePlugin, LangfusePluginConfig

Usage (minimal)::

from temporalio.client import Client
from langfuse.temporal import LangfusePlugin

client = await Client.connect(
"localhost:7233",
plugins=[LangfusePlugin()],
)

The plugin is worker-capable, so when attached to the ``Client`` it is
automatically carried over to any ``Worker`` constructed from that client.
Do not re-attach it on the worker — see the guide for details.

Framework presets::

from langfuse.temporal.presets import (
langfuse_openai_agents_plugins,
langfuse_pydantic_ai_plugins,
)

Installation::

pip install "langfuse[temporal]"

See the Langfuse docs for the full configuration surface, sandbox /
replay caveats, and framework-specific guides.
"""

from __future__ import annotations

from .attributes import (
LANGFUSE_INPUT,
LANGFUSE_METADATA_PREFIX,
LANGFUSE_OUTPUT,
LANGFUSE_SESSION_ID,
LANGFUSE_TAGS,
LANGFUSE_USER_ID,
)
from .config import (
CaptureConfig,
FactoryContext,
LangfusePluginConfig,
TracingConfig,
UIEnrichmentConfig,
)
from .plugin import PLUGIN_NAME, LangfusePlugin

__all__ = [
"LangfusePlugin",
"LangfusePluginConfig",
"CaptureConfig",
"TracingConfig",
"UIEnrichmentConfig",
"FactoryContext",
"PLUGIN_NAME",
"LANGFUSE_SESSION_ID",
"LANGFUSE_USER_ID",
"LANGFUSE_TAGS",
"LANGFUSE_INPUT",
"LANGFUSE_OUTPUT",
"LANGFUSE_METADATA_PREFIX",
]
69 changes: 69 additions & 0 deletions langfuse/temporal/attributes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
"""OpenTelemetry span attribute keys used by the Langfuse Temporal plugin.

These are the canonical attribute keys that the plugin writes onto the
OpenTelemetry spans produced by ``temporalio.contrib.opentelemetry``. They are
consumed by Langfuse's OTel ingestion path, which maps selected attributes
onto Langfuse trace / observation fields (``session_id``, ``user_id``,
``tags``, ``metadata``, ``input``, ``output``, etc.).

Keeping them centralized prevents typos and makes it easy to audit exactly
which attributes the plugin can emit.
"""

from __future__ import annotations

# Langfuse core correlation attributes. These mirror the attribute keys used
# by the Langfuse OTel SDK and must stay compatible with Langfuse ingestion.
LANGFUSE_SESSION_ID = "langfuse.session.id"
LANGFUSE_USER_ID = "langfuse.user.id"
LANGFUSE_TAGS = "langfuse.tags"
LANGFUSE_METADATA_PREFIX = "langfuse.metadata."
LANGFUSE_ENVIRONMENT = "langfuse.environment"
LANGFUSE_RELEASE = "langfuse.release"
LANGFUSE_VERSION = "langfuse.version"

# Optional payload capture. These are only written when the plugin is
# explicitly configured to capture payloads.
LANGFUSE_INPUT = "langfuse.observation.input"
LANGFUSE_OUTPUT = "langfuse.observation.output"

# Temporal-specific metadata. These are ingested by Langfuse as generic span
# attributes (and surfaced as observation metadata) and are always safe to
# emit because they are identifiers, not payload bodies.
TEMPORAL_WORKFLOW_ID = "temporal.workflow.id"
TEMPORAL_RUN_ID = "temporal.workflow.run_id"
TEMPORAL_WORKFLOW_TYPE = "temporal.workflow.type"
TEMPORAL_NAMESPACE = "temporal.namespace"
TEMPORAL_TASK_QUEUE = "temporal.task_queue"
TEMPORAL_ACTIVITY_ID = "temporal.activity.id"
TEMPORAL_ACTIVITY_TYPE = "temporal.activity.type"
TEMPORAL_ATTEMPT = "temporal.attempt"
TEMPORAL_PARENT_WORKFLOW_ID = "temporal.parent.workflow_id"
TEMPORAL_PARENT_RUN_ID = "temporal.parent.run_id"
TEMPORAL_IS_LOCAL_ACTIVITY = "temporal.activity.is_local"
TEMPORAL_IS_REPLAYING = "temporal.workflow.is_replaying"


__all__ = [
"LANGFUSE_SESSION_ID",
"LANGFUSE_USER_ID",
"LANGFUSE_TAGS",
"LANGFUSE_METADATA_PREFIX",
"LANGFUSE_ENVIRONMENT",
"LANGFUSE_RELEASE",
"LANGFUSE_VERSION",
"LANGFUSE_INPUT",
"LANGFUSE_OUTPUT",
"TEMPORAL_WORKFLOW_ID",
"TEMPORAL_RUN_ID",
"TEMPORAL_WORKFLOW_TYPE",
"TEMPORAL_NAMESPACE",
"TEMPORAL_TASK_QUEUE",
"TEMPORAL_ACTIVITY_ID",
"TEMPORAL_ACTIVITY_TYPE",
"TEMPORAL_ATTEMPT",
"TEMPORAL_PARENT_WORKFLOW_ID",
"TEMPORAL_PARENT_RUN_ID",
"TEMPORAL_IS_LOCAL_ACTIVITY",
"TEMPORAL_IS_REPLAYING",
]
188 changes: 188 additions & 0 deletions langfuse/temporal/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
"""Configuration surface for the Langfuse Temporal plugin.

Organized into logical groups so the main plugin constructor does not have
to explode into a giant positional API. Each knob has a conservative default
so that installing the plugin with no arguments gives you safe, metadata-only
Temporal tracing routed to Langfuse.
"""

from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any, Callable, List, Mapping, Optional, Sequence

from .redaction import RedactCallback

# Factory signatures. They are called at span-creation time inside the
# Langfuse enrichment interceptor and must be side-effect free: they run
# inside the Temporal workflow sandbox for workflow spans and must not touch
# the network, wall-clock time, or mutable globals.
IdFactory = Callable[["FactoryContext"], Optional[str]]
TagFactory = Callable[["FactoryContext"], Optional[Sequence[str]]]
MetadataFactory = Callable[["FactoryContext"], Optional[Mapping[str, Any]]]


@dataclass
class FactoryContext:
"""Deterministic context passed to user-provided factory callbacks.

``kind`` is one of ``"client_start_workflow"``, ``"workflow"``,
``"activity"``. ``info`` carries whatever Temporal info object is
available on the current side (the client input, ``workflow.Info``, or
``activity.Info``). ``input`` is the raw Temporal args tuple; it is made
available so factories can, e.g., pull a user_id off a request object,
but factories must not mutate it.
"""

kind: str
info: Any = None
input: Any = None


@dataclass
class CaptureConfig:
"""Controls for payload capture on Temporal spans.

All flags default to ``False`` so that installing the plugin never
accidentally exports sensitive workflow/activity payloads to Langfuse.
When a flag is enabled, the corresponding payload is serialized, passed
through :attr:`redact`, truncated to :attr:`size_limit_bytes`, and
attached to the relevant span.
"""

capture_workflow_inputs: bool = False
capture_workflow_outputs: bool = False
capture_activity_inputs: bool = False
capture_activity_outputs: bool = False

size_limit_bytes: Optional[int] = 32 * 1024
redact: Optional[RedactCallback] = None

# Allow/deny lists by Temporal name. A workflow or activity name appears
# on the denylist wins: it is never captured. When an allowlist is
# non-empty, only names on the allowlist are captured.
workflow_allowlist: Optional[Sequence[str]] = None
workflow_denylist: Optional[Sequence[str]] = None
activity_allowlist: Optional[Sequence[str]] = None
activity_denylist: Optional[Sequence[str]] = None

def should_capture_workflow(self, workflow_type: Optional[str]) -> bool:
return _should_capture(
workflow_type, self.workflow_allowlist, self.workflow_denylist
)

def should_capture_activity(self, activity_type: Optional[str]) -> bool:
return _should_capture(
activity_type, self.activity_allowlist, self.activity_denylist
)


def _should_capture(
name: Optional[str],
allowlist: Optional[Sequence[str]],
denylist: Optional[Sequence[str]],
) -> bool:
if name is None:
# With no name to match against, fall back to the user's allowlist
# intent: if they specified an allowlist we default-deny, otherwise
# default-allow.
return not allowlist
if denylist and name in denylist:
return False
if allowlist:
return name in allowlist
return True


@dataclass
class TracingConfig:
"""Controls for which Temporal surfaces produce spans.

The base plugin always instruments client ``start_workflow`` and worker
``execute_workflow`` / ``execute_activity``. The remaining surfaces
(signals, queries, updates, local activities) are opt-in because they
can be extremely high-volume in production and often duplicate info
already present on the parent workflow run.
"""

add_temporal_spans: bool = True
trace_signals: bool = False
trace_queries: bool = False
trace_updates: bool = True
trace_local_activities: bool = True


@dataclass
class UIEnrichmentConfig:
"""Controls for correlating Temporal UI fields with Langfuse.

``memo_trace_id`` asks the plugin to add the Langfuse ``trace_id`` to
the Temporal workflow memo so operators can jump from Temporal UI to
the Langfuse trace. ``search_attribute_key``, when set, does the same
thing via a custom search attribute (which must already be registered
in the target namespace).
"""

memo_trace_id: bool = False
search_attribute_key: Optional[str] = None


@dataclass
class LangfusePluginConfig:
"""Full configuration for :class:`langfuse.temporal.LangfusePlugin`.

Using a dataclass keeps the plugin constructor readable and lets
framework presets build a config once and reuse it.
"""

# Tracing ownership.
tracer_provider: Optional[Any] = None
use_existing_otel: bool = True

# Langfuse client. When omitted, the plugin uses :func:`langfuse.get_client`
# at worker startup. Workflow code never touches this object — it is
# used only for flushing at worker shutdown.
langfuse_client: Optional[Any] = None
flush_on_shutdown: bool = True

# Tracing scope.
tracing: TracingConfig = field(default_factory=TracingConfig)

# Privacy.
capture: CaptureConfig = field(default_factory=CaptureConfig)

# Correlation.
session_id_factory: Optional[IdFactory] = None
user_id_factory: Optional[IdFactory] = None
tags_factory: Optional[TagFactory] = None
metadata_factory: Optional[MetadataFactory] = None

# Static defaults applied to every span (cheap, always safe).
static_tags: Sequence[str] = field(default_factory=list)
static_metadata: Mapping[str, Any] = field(default_factory=dict)
environment: Optional[str] = None
release: Optional[str] = None
version: Optional[str] = None

# UI enrichment.
ui: UIEnrichmentConfig = field(default_factory=UIEnrichmentConfig)

# Deployment mode. When ``True`` the plugin installs tracing/context
# propagation but does not require/initialize a Langfuse exporter,
# which is the right shape for starter-only processes.
context_only: bool = False

def resolved_static_tags(self) -> List[str]:
return list(self.static_tags)


__all__ = [
"CaptureConfig",
"FactoryContext",
"IdFactory",
"LangfusePluginConfig",
"MetadataFactory",
"TagFactory",
"TracingConfig",
"UIEnrichmentConfig",
]
Loading
Loading