Skip to content

Add Langfuse integration for Temporal workflows#1629

Draft
jannikmaierhoefer wants to merge 1 commit intomainfrom
claude/langfuse-temporal-plugin-Fbowg
Draft

Add Langfuse integration for Temporal workflows#1629
jannikmaierhoefer wants to merge 1 commit intomainfrom
claude/langfuse-temporal-plugin-Fbowg

Conversation

@jannikmaierhoefer
Copy link
Copy Markdown
Member

Summary

This PR adds a comprehensive Langfuse integration for Temporal (temporalio), enabling observability of Temporal workflows and activities through OpenTelemetry spans routed to Langfuse. The integration enriches Temporal spans with Langfuse-specific metadata (session ID, user ID, tags, metadata) and optional input/output payloads.

Key Changes

Core Plugin Implementation

  • langfuse/temporal/plugin.py: Main LangfusePlugin class that extends temporalio.plugin.SimplePlugin. Registers both the Temporal OpenTelemetry interceptor and a Langfuse enrichment interceptor. Handles client initialization and graceful shutdown with flushing.

  • langfuse/temporal/interceptor.py: Span enrichment interceptor that decorates OpenTelemetry spans with Langfuse attributes. Implements hooks for:

    • Client-side workflow starts
    • Workflow execution (with Temporal identifiers like workflow_id, run_id)
    • Activity execution (with activity-specific metadata)
    • Completion handlers for optional input/output capture
    • Factory-based dynamic session/user/tag/metadata resolution

Configuration System

  • langfuse/temporal/config.py: Comprehensive configuration dataclasses:
    • LangfusePluginConfig: Main configuration with sensible defaults (metadata-only by default)
    • CaptureConfig: Controls for payload capture with allowlist/denylist support per workflow/activity
    • TracingConfig: Toggles for which Temporal surfaces produce spans
    • UIEnrichmentConfig: Correlation with Temporal UI
    • FactoryContext: Context passed to user-provided factory callbacks for dynamic metadata

Supporting Modules

  • langfuse/temporal/attributes.py: Centralized OpenTelemetry span attribute key definitions for Langfuse and Temporal metadata
  • langfuse/temporal/redaction.py: Payload serialization, redaction, and size-limiting utilities. Supports custom redaction callbacks and graceful handling of non-JSON-serializable objects
  • langfuse/temporal/presets.py: Framework-specific plugin composition helpers for OpenAI Agents SDK and Pydantic AI integrations
  • langfuse/temporal/__init__.py: Public API exports

Testing

  • tests/unit/temporal/test_enrichment.py: Unit tests for span enrichment logic with fake spans (no Temporal dependency required)
  • tests/unit/temporal/test_redaction.py: Tests for payload serialization and redaction
  • tests/unit/temporal/test_plugin.py: Plugin instantiation and configuration tests (skipped if temporalio not installed)
  • tests/unit/temporal/test_config.py: Configuration dataclass validation tests

Notable Implementation Details

  • Defensive Design: All enrichment operations are wrapped in try-catch blocks and logged at debug level. Tracing errors never break application workflows or activities.
  • Lazy Imports: Temporal is imported lazily inside class bodies so the base langfuse.temporal module can be imported and tested without Temporal installed.
  • Sandbox-Safe: Workflow-side code only touches OpenTelemetry API (sandbox-compatible) and temporalio.workflow.info(). The Langfuse client is used exclusively from the worker process.
  • Metadata-Only by Default: All payload capture flags default to False to prevent accidental export of sensitive data.
  • Session Defaulting: When no session factory is provided, defaults to workflow_id per the trace model requirements.
  • Flexible Metadata: Supports static metadata, factory-based dynamic metadata, and static/dynamic tags with proper JSON serialization and OTel attribute coercion.
  • Allowlist/Denylist Support: Fine-grained control over which workflows/activities have payloads captured.

Dependencies

Added optional temporal dependency group requiring temporalio>=1.18.0,<2.

https://claude.ai/code/session_013bfLpjTrf8qPkTftZHb9Hf

Introduces langfuse.temporal.LangfusePlugin, a Temporal SimplePlugin that
enriches the OTel spans produced by temporalio.contrib.opentelemetry with
Langfuse session/user/tag/metadata attributes and optional payload
captures. The plugin defaults to metadata-only capture, stays
sandbox/replay safe, and composes with framework plugins
(OpenAIAgentsPlugin, PydanticAIPlugin) rather than replacing them.

https://claude.ai/code/session_013bfLpjTrf8qPkTftZHb9Hf
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants