From 3d709ffefe3660617c3c06ddad24ae76afce918b Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Fri, 1 May 2026 10:42:26 +0900 Subject: [PATCH 1/2] feat(langfuse): add Langfuse tracing module for ai::Client agent calls Adds an opt-in ai::langfuse module that wraps generate_text calls and emits one trace per logical operation with: - one Generation observation (model, modelParameters, input messages, output text, aggregated usage, finish_reason / step count metadata), - one Span per tool invocation, parented to the generation, with the tool's args and result (or error) recorded as input/output. The module hooks into the existing on_tool_call_start / on_tool_call_finish callbacks in GenerateOptions so user-installed callbacks are preserved (chained). After generate_text returns, the caller invokes Trace::end() to flush the accumulated batch to POST /api/public/ingestion via the vendored httplib + Basic auth. Layout - include/ai/langfuse.h: public surface (Config, Tracer, Trace, generate_text helper). - src/langfuse/tracer.cpp: HTTP / batching / event-shape implementation. - examples/langfuse_tracing.cpp: end-to-end demo using OpenAI with lookup_user / get_weather tools across multiple steps. - .env.local.example: credentials template (the populated .env.local is gitignored). - CMakeLists.txt: new ai-sdk-cpp-langfuse target (alias ai::langfuse), AI_SDK_HAS_LANGFUSE=1, included in ai::sdk and the install/export set. The existing component targets (ai::core, ai::openai, ai::anthropic) are unchanged. Verified end-to-end: the example produced a trace with the expected nested generation+tool observations against us.cloud.langfuse.com. 
--- .env.local.example | 19 ++ CMakeLists.txt | 37 ++- examples/CMakeLists.txt | 3 + examples/langfuse_tracing.cpp | 116 ++++++++ include/ai/langfuse.h | 209 +++++++++++++ src/langfuse/tracer.cpp | 538 ++++++++++++++++++++++++++++++++++ 6 files changed, 920 insertions(+), 2 deletions(-) create mode 100644 .env.local.example create mode 100644 examples/langfuse_tracing.cpp create mode 100644 include/ai/langfuse.h create mode 100644 src/langfuse/tracer.cpp diff --git a/.env.local.example b/.env.local.example new file mode 100644 index 0000000..9976c0c --- /dev/null +++ b/.env.local.example @@ -0,0 +1,19 @@ +# Credentials for examples/langfuse_tracing.cpp. +# Copy to .env.local and fill in your keys, then: +# set -a; source .env.local; set +a +# ./build/examples/langfuse_tracing_debug + +# --- LLM provider (required by the example) --- +OPENAI_API_KEY= + +# --- Langfuse (required) --- +# Project keys from your Langfuse instance -> Settings -> API keys +LANGFUSE_PUBLIC_KEY= +LANGFUSE_SECRET_KEY= + +# --- Langfuse host (optional) --- +# EU cloud (default): https://cloud.langfuse.com +# US cloud: https://us.cloud.langfuse.com +# Self-hosted (langfuse/ docker-compose): http://localhost:3000 +# The example also accepts LANGFUSE_BASE_URL as a synonym. 
+LANGFUSE_HOST=https://cloud.langfuse.com diff --git a/CMakeLists.txt b/CMakeLists.txt index 8a10ea8..2eb379b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -45,6 +45,7 @@ set(HTTPLIB_COMPILE_DEFS add_library(ai-sdk-cpp-core) add_library(ai-sdk-cpp-openai) add_library(ai-sdk-cpp-anthropic) +add_library(ai-sdk-cpp-langfuse) add_library(ai-sdk-cpp INTERFACE) # Set library aliases @@ -52,11 +53,13 @@ add_library(ai::sdk ALIAS ai-sdk-cpp) add_library(ai::core ALIAS ai-sdk-cpp-core) add_library(ai::openai ALIAS ai-sdk-cpp-openai) add_library(ai::anthropic ALIAS ai-sdk-cpp-anthropic) +add_library(ai::langfuse ALIAS ai-sdk-cpp-langfuse) # Configure all component libraries configure_ai_component(ai-sdk-cpp-core) configure_ai_component(ai-sdk-cpp-openai) configure_ai_component(ai-sdk-cpp-anthropic) +configure_ai_component(ai-sdk-cpp-langfuse) # Configure main interface library target_include_directories(ai-sdk-cpp @@ -103,6 +106,12 @@ set(ANTHROPIC_SOURCES target_sources(ai-sdk-cpp-openai PRIVATE ${OPENAI_SOURCES}) target_sources(ai-sdk-cpp-anthropic PRIVATE ${ANTHROPIC_SOURCES}) +# Langfuse tracing component +target_sources(ai-sdk-cpp-langfuse + PRIVATE + src/langfuse/tracer.cpp +) + # Link dependencies target_link_libraries(ai-sdk-cpp-core PUBLIC @@ -124,7 +133,7 @@ foreach(provider openai anthropic) $ $ ) - + # Set component availability and HTTP definitions string(TOUPPER ${provider} PROVIDER_UPPER) target_compile_definitions(ai-sdk-cpp-${provider} @@ -135,6 +144,23 @@ foreach(provider openai anthropic) ) endforeach() +# Langfuse tracing component links to core and uses httplib + OpenSSL. 
+target_link_libraries(ai-sdk-cpp-langfuse + PUBLIC + ai::core + nlohmann_json::nlohmann_json + PRIVATE + $ + $ + $ +) +target_compile_definitions(ai-sdk-cpp-langfuse + PUBLIC + AI_SDK_HAS_LANGFUSE=1 + PRIVATE + ${HTTPLIB_COMPILE_DEFS} +) + # Core needs HTTP definitions too target_compile_definitions(ai-sdk-cpp-core PRIVATE @@ -147,6 +173,7 @@ target_link_libraries(ai-sdk-cpp ai::core ai::openai ai::anthropic + ai::langfuse ) # Define all component availability for main library @@ -154,10 +181,16 @@ target_compile_definitions(ai-sdk-cpp INTERFACE AI_SDK_HAS_OPENAI=1 AI_SDK_HAS_ANTHROPIC=1 + AI_SDK_HAS_LANGFUSE=1 ) # List of all concrete component targets -set(COMPONENT_TARGETS ai-sdk-cpp-core ai-sdk-cpp-openai ai-sdk-cpp-anthropic) +set(COMPONENT_TARGETS + ai-sdk-cpp-core + ai-sdk-cpp-openai + ai-sdk-cpp-anthropic + ai-sdk-cpp-langfuse +) # Common compile options if(MSVC) diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 87cedb9..9aa0784 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -53,6 +53,9 @@ add_ai_example(test_tool_integration test_tool_integration.cpp) # Embeddings example add_ai_example(embeddings_example embeddings_example.cpp) +# Langfuse tracing example +add_ai_example(langfuse_tracing langfuse_tracing.cpp) + # Component-specific examples add_subdirectory(components/openai) add_subdirectory(components/anthropic) diff --git a/examples/langfuse_tracing.cpp b/examples/langfuse_tracing.cpp new file mode 100644 index 0000000..a100017 --- /dev/null +++ b/examples/langfuse_tracing.cpp @@ -0,0 +1,116 @@ +/** + * Langfuse Tracing Example - AI SDK C++ + * + * Demonstrates how to wrap an `ai::Client::generate_text` call (with tools and + * multi-step) so the LLM call and each tool execution show up as a trace in + * Langfuse. 
+ * + * Required env: + * OPENAI_API_KEY (or use ai::anthropic instead) + * LANGFUSE_PUBLIC_KEY + * LANGFUSE_SECRET_KEY + * Optional: + * LANGFUSE_HOST (default: https://cloud.langfuse.com) + */ + +#include +#include +#include +#include + +#include +#include +#include + +namespace { + +const char* env_or(const char* name, const char* fallback) { + const char* v = std::getenv(name); + return (v && *v) ? v : fallback; +} + +ai::JsonValue lookup_user(const ai::JsonValue& args, + const ai::ToolExecutionContext&) { + static const std::map users = { + {"alice", {{"name", "Alice"}, {"city", "San Francisco"}}}, + {"bob", {{"name", "Bob"}, {"city", "New York"}}}, + }; + auto id = args.value("user_id", std::string{}); + auto it = users.find(id); + if (it != users.end()) return it->second; + return ai::JsonValue{{"error", "user not found"}}; +} + +ai::JsonValue get_weather(const ai::JsonValue& args, + const ai::ToolExecutionContext&) { + auto loc = args.value("location", std::string{"unknown"}); + return ai::JsonValue{{"location", loc}, {"temperature_c", 21}, {"sky", "clear"}}; +} + +} // namespace + +int main() { + const char* lf_pk = std::getenv("LANGFUSE_PUBLIC_KEY"); + const char* lf_sk = std::getenv("LANGFUSE_SECRET_KEY"); + if (!lf_pk || !lf_sk) { + std::cerr << "Set LANGFUSE_PUBLIC_KEY and LANGFUSE_SECRET_KEY first.\n"; + return 1; + } + + const char* host = std::getenv("LANGFUSE_HOST"); + if (!host || !*host) host = std::getenv("LANGFUSE_BASE_URL"); + if (!host || !*host) host = "https://cloud.langfuse.com"; + + ai::langfuse::Tracer tracer({ + .host = host, + .public_key = lf_pk, + .secret_key = lf_sk, + .environment = "ai-sdk-cpp-example", + }); + if (!tracer.is_valid()) { + std::cerr << "Langfuse tracer not configured.\n"; + return 1; + } + + auto client = ai::openai::create_client(); + if (!client.is_valid()) { + std::cerr << "OpenAI client not configured (set OPENAI_API_KEY).\n"; + return 1; + } + + ai::ToolSet tools; + tools["lookup_user"] = ai::create_tool( + 
"Look up a user's profile by id", + ai::create_object_schema({{"user_id", "string"}}), lookup_user); + tools["get_weather"] = ai::create_tool( + "Get the current weather for a location", + ai::create_object_schema({{"location", "string"}}), get_weather); + + ai::GenerateOptions options; + options.model = ai::openai::models::kGpt4oMini; + options.system = + "You are a concise assistant. Use the available tools when helpful."; + options.prompt = "Look up alice and tell me the weather where she lives."; + options.tools = std::move(tools); + options.max_steps = 4; + options.temperature = 0.0; + + auto trace = tracer.start_trace("langfuse_tracing_example"); + trace->set_input(options.prompt); + trace->set_metadata({{"example", "langfuse_tracing"}, + {"sdk", "ai-sdk-cpp"}}); + + auto result = ai::langfuse::generate_text(client, std::move(options), *trace); + + if (result) { + std::cout << "Output: " << result.text << "\n"; + trace->set_output(result.text); + } else { + std::cerr << "Generation failed: " << result.error_message() << "\n"; + trace->set_output(ai::JsonValue{{"error", result.error_message()}}); + } + + trace->end(); + std::cout << "Trace flushed: id=" << trace->id() << "\n"; + return result ? 0 : 2; +} diff --git a/include/ai/langfuse.h b/include/ai/langfuse.h new file mode 100644 index 0000000..8ee9f83 --- /dev/null +++ b/include/ai/langfuse.h @@ -0,0 +1,209 @@ +#pragma once + +/// Langfuse tracing for ai-sdk-cpp. +/// +/// Records traces, generations, and tool spans for `ai::Client::generate_text` +/// calls and POSTs them to Langfuse's `/api/public/ingestion` endpoint. 
+/// +/// Usage: +/// ```cpp +/// #include +/// #include +/// +/// ai::langfuse::Tracer tracer({ +/// .host = "https://cloud.langfuse.com", +/// .public_key = std::getenv("LANGFUSE_PUBLIC_KEY"), +/// .secret_key = std::getenv("LANGFUSE_SECRET_KEY"), +/// }); +/// +/// auto trace = tracer.start_trace("sql-generation"); +/// trace->set_input(user_prompt); +/// +/// ai::GenerateOptions options{...}; +/// auto result = ai::langfuse::generate_text(client, std::move(options), *trace); +/// +/// trace->set_output(result.text); +/// trace->end(); // synchronous flush +/// ``` + +#include "types/client.h" +#include "types/generate_options.h" +#include "types/tool.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +namespace ai { +namespace langfuse { + +using JsonValue = nlohmann::json; + +struct Config { + /// Base URL of the Langfuse instance, e.g. "https://cloud.langfuse.com" or + /// "https://us.cloud.langfuse.com" or a self-hosted address. + std::string host = "https://cloud.langfuse.com"; + + /// Public API key (`pk-lf-...`). + std::string public_key; + + /// Secret API key (`sk-lf-...`). + std::string secret_key; + + /// Optional release identifier to attach to all traces (commit sha, etc.). + std::string release; + + /// Environment to attach to all traces (Langfuse default is "default"). + std::string environment = "default"; + + /// HTTP timeouts for the ingestion request. + int connection_timeout_sec = 10; + int read_timeout_sec = 30; + + /// If true, suppress HTTP/JSON errors from `Trace::end()` (best-effort + /// tracing). Failures are still logged via the SDK logger. + bool best_effort = true; +}; + +class Trace; + +/// Tracer holds the Langfuse credentials and is the entry point for creating +/// traces. Construct one per process; it is safe to create multiple traces +/// concurrently from a single Tracer. 
+class Tracer { + public: + explicit Tracer(Config config); + + /// True iff host, public_key and secret_key are set. + bool is_valid() const; + + const Config& config() const { return config_; } + + /// Start a new trace. Returns a shared handle so callbacks attached via + /// `Trace::instrument()` can keep it alive until generate_text returns. + std::shared_ptr start_trace( + const std::string& name, + std::optional input = std::nullopt, + std::optional user_id = std::nullopt, + std::optional session_id = std::nullopt, + std::optional metadata = std::nullopt, + std::vector tags = {}); + + /// POST a batch of ingestion events to Langfuse. Returns true on 2xx. + /// Public so advanced callers can build their own event batches; most users + /// should rely on `Trace::end()`. + bool send_batch(const JsonValue& events); + + private: + Config config_; +}; + +/// A Trace owns a list of pending ingestion events and writes them all to +/// Langfuse on `end()`. Use one Trace per logical request (per call to a +/// user-facing API like `generateSQL`). +/// +/// Methods are thread-safe: callbacks fired from generate_text's worker +/// threads can record into the same Trace concurrently. +class Trace : public std::enable_shared_from_this { + public: + Trace(Tracer& tracer, std::string id, std::string name); + + const std::string& id() const { return id_; } + const std::string& name() const { return name_; } + + void set_input(JsonValue input); + void set_output(JsonValue output); + void set_user_id(std::string user_id); + void set_session_id(std::string session_id); + void set_metadata(JsonValue metadata); + void add_tag(std::string tag); + + /// Attach Langfuse callbacks to `options`. Records: + /// - a parent generation observation (model, modelParameters, input) + /// - one tool span per tool call (input=args, output=result) + /// + /// Existing callbacks on `options` are preserved (chained, called after the + /// tracer's bookkeeping). 
Caller must invoke `finish_generation()` after + /// `generate_text` returns to fill in end_time, output, and usage. + void instrument(GenerateOptions& options, + const std::string& generation_name = "generate_text"); + + /// Record the final output, usage, and finish_reason of the most recent + /// `instrument()`-ed generation. Safe to call once per `instrument()`. + void finish_generation(const GenerateResult& result); + + /// Flush all accumulated events to Langfuse. Idempotent; subsequent calls + /// are no-ops. Returns true on success (or when best_effort is enabled and + /// the request failed). + bool end(); + + /// Generate a new UUID v4. Exposed so callers (and `Tracer::start_trace`) + /// can mint trace ids without re-implementing UUID generation. + static std::string new_uuid(); + + private: + struct PendingGeneration { + std::string id; + std::string name; + std::chrono::system_clock::time_point start_time; + JsonValue input; + std::string model; + JsonValue model_parameters; + bool finalized = false; + }; + + // Records a tool-call span. Called from on_tool_call_start. + void record_tool_call_start(const ToolCall& call); + // Closes a tool-call span. Called from on_tool_call_finish. + void record_tool_call_finish(const ToolResult& result); + + // Builds and pushes the trace-create event with the current + // input/output/metadata/tags. + JsonValue build_trace_event() const; + + // Helpers + static std::string now_iso8601(); + + Tracer& tracer_; + std::string id_; + std::string name_; + + mutable std::mutex mu_; + std::chrono::system_clock::time_point trace_start_; + std::optional input_; + std::optional output_; + std::optional user_id_; + std::optional session_id_; + std::optional metadata_; + std::vector tags_; + + // Pending observation events accumulated until `end()` flushes them. + std::vector events_; + + // Active generation, set by instrument() and closed by finish_generation(). 
+ std::optional active_generation_; + + // Map tool_call_id -> open span event index in events_, used to update + // span end_time / output when the tool finishes. + std::unordered_map open_tool_spans_; + + std::atomic ended_{false}; +}; + +/// Convenience wrapper around `Trace::instrument` + `Client::generate_text` + +/// `Trace::finish_generation`. Does not call `Trace::end()` so the caller can +/// still attach output/metadata/tags to the trace. +GenerateResult generate_text(Client& client, + GenerateOptions options, + Trace& trace, + const std::string& generation_name = "generate_text"); + +} // namespace langfuse +} // namespace ai diff --git a/src/langfuse/tracer.cpp b/src/langfuse/tracer.cpp new file mode 100644 index 0000000..4954a42 --- /dev/null +++ b/src/langfuse/tracer.cpp @@ -0,0 +1,538 @@ +#include "ai/langfuse.h" + +#include "ai/logger.h" + +#include +#include +#include + +#include + +namespace ai { +namespace langfuse { + +namespace { + +constexpr const char* kIngestionPath = "/api/public/ingestion"; + +struct ParsedHost { + std::string host; + int port = -1; // -1 means "default for scheme" + bool use_ssl = true; + std::string base_path; // e.g. "/langfuse" if hosted under a sub-path +}; + +ParsedHost parse_host(const std::string& url) { + ParsedHost out; + std::string s = url; + if (s.starts_with("https://")) { + s = s.substr(8); + out.use_ssl = true; + } else if (s.starts_with("http://")) { + s = s.substr(7); + out.use_ssl = false; + } + + auto slash = s.find('/'); + std::string host_port = (slash == std::string::npos) ? s : s.substr(0, slash); + out.base_path = (slash == std::string::npos) ? "" : s.substr(slash); + if (!out.base_path.empty() && out.base_path.back() == '/') + out.base_path.pop_back(); + + auto colon = host_port.find(':'); + if (colon != std::string::npos) { + out.host = host_port.substr(0, colon); + try { + out.port = std::stoi(host_port.substr(colon + 1)); + } catch (...) 
{ + out.port = -1; + } + } else { + out.host = host_port; + } + + return out; +} + +std::string base64_encode(const std::string& in) { + static const char* kAlphabet = + "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"; + std::string out; + out.reserve(((in.size() + 2) / 3) * 4); + int val = 0; + int valb = -6; + for (unsigned char c : in) { + val = (val << 8) + c; + valb += 8; + while (valb >= 0) { + out.push_back(kAlphabet[(val >> valb) & 0x3F]); + valb -= 6; + } + } + if (valb > -6) { + out.push_back(kAlphabet[((val << 8) >> (valb + 8)) & 0x3F]); + } + while (out.size() % 4) { + out.push_back('='); + } + return out; +} + +JsonValue model_parameters_from(const GenerateOptions& options) { + JsonValue params = JsonValue::object(); + if (options.temperature) params["temperature"] = *options.temperature; + if (options.max_tokens) params["max_tokens"] = *options.max_tokens; + if (options.top_p) params["top_p"] = *options.top_p; + if (options.seed) params["seed"] = *options.seed; + if (options.frequency_penalty) + params["frequency_penalty"] = *options.frequency_penalty; + if (options.presence_penalty) + params["presence_penalty"] = *options.presence_penalty; + if (options.max_steps > 1) params["max_steps"] = options.max_steps; + return params; +} + +JsonValue messages_input_from(const GenerateOptions& options) { + // Prefer explicit messages; otherwise synthesise from system+prompt. 
+ JsonValue arr = JsonValue::array(); + if (!options.system.empty()) { + arr.push_back({{"role", "system"}, {"content", options.system}}); + } + if (!options.messages.empty()) { + for (const auto& m : options.messages) { + JsonValue msg; + msg["role"] = m.roleToString(); + msg["content"] = m.get_text(); + arr.push_back(std::move(msg)); + } + } else if (!options.prompt.empty()) { + arr.push_back({{"role", "user"}, {"content", options.prompt}}); + } + return arr; +} + +JsonValue usage_to_langfuse(const Usage& u) { + // Langfuse accepts both legacy `usage` and `usageDetails`. Send legacy form + // for broad compatibility. + return { + {"input", u.prompt_tokens}, + {"output", u.completion_tokens}, + {"total", u.total_tokens}, + {"unit", "TOKENS"}, + }; +} + +} // namespace + +// --------------------------------------------------------------------------- +// Tracer +// --------------------------------------------------------------------------- + +Tracer::Tracer(Config config) : config_(std::move(config)) {} + +bool Tracer::is_valid() const { + return !config_.host.empty() && !config_.public_key.empty() && + !config_.secret_key.empty(); +} + +std::shared_ptr Tracer::start_trace( + const std::string& name, + std::optional input, + std::optional user_id, + std::optional session_id, + std::optional metadata, + std::vector tags) { + auto trace = std::make_shared(*this, Trace::new_uuid(), name); + if (input) trace->set_input(std::move(*input)); + if (user_id) trace->set_user_id(std::move(*user_id)); + if (session_id) trace->set_session_id(std::move(*session_id)); + if (metadata) trace->set_metadata(std::move(*metadata)); + for (auto& t : tags) trace->add_tag(std::move(t)); + return trace; +} + +bool Tracer::send_batch(const JsonValue& events) { + if (!is_valid()) { + ai::logger::log_warn( + "Langfuse tracer not configured (missing host/public_key/secret_key); " + "dropping {} events", + events.is_array() ? 
events.size() : 0); + return false; + } + + ParsedHost p = parse_host(config_.host); + // httplib::Client(scheme_host_port) handles http/https + port automatically. + std::string scheme_host_port = + std::string(p.use_ssl ? "https://" : "http://") + p.host; + if (p.port > 0) + scheme_host_port += ":" + std::to_string(p.port); + httplib::Client client(scheme_host_port); + client.enable_server_certificate_verification(true); + client.set_connection_timeout(config_.connection_timeout_sec, 0); + client.set_read_timeout(config_.read_timeout_sec, 0); + + std::string auth = + "Basic " + base64_encode(config_.public_key + ":" + config_.secret_key); + + JsonValue body; + body["batch"] = events; + std::string serialized = body.dump(); + + std::string path = p.base_path + kIngestionPath; + httplib::Headers headers = { + {"Authorization", auth}, + {"User-Agent", "ai-sdk-cpp-langfuse/0.1"}, + {"X-Langfuse-Sdk-Name", "ai-sdk-cpp"}, + {"X-Langfuse-Sdk-Variant", "ai-sdk-cpp"}, + }; + + auto res = client.Post(path.c_str(), headers, serialized, "application/json"); + if (!res) { + ai::logger::log_error("Langfuse ingestion failed: {}", + httplib::to_string(res.error())); + return false; + } + if (res->status >= 200 && res->status < 300) { + ai::logger::log_debug("Langfuse ingestion accepted ({}): {}", res->status, + res->body); + return true; + } + ai::logger::log_error("Langfuse ingestion non-2xx ({}): {}", res->status, + res->body); + return false; +} + +// --------------------------------------------------------------------------- +// Trace +// --------------------------------------------------------------------------- + +Trace::Trace(Tracer& tracer, std::string id, std::string name) + : tracer_(tracer), + id_(std::move(id)), + name_(std::move(name)), + trace_start_(std::chrono::system_clock::now()) {} + +void Trace::set_input(JsonValue input) { + std::lock_guard lock(mu_); + input_ = std::move(input); +} + +void Trace::set_output(JsonValue output) { + std::lock_guard lock(mu_); + 
output_ = std::move(output); +} + +void Trace::set_user_id(std::string user_id) { + std::lock_guard lock(mu_); + user_id_ = std::move(user_id); +} + +void Trace::set_session_id(std::string session_id) { + std::lock_guard lock(mu_); + session_id_ = std::move(session_id); +} + +void Trace::set_metadata(JsonValue metadata) { + std::lock_guard lock(mu_); + metadata_ = std::move(metadata); +} + +void Trace::add_tag(std::string tag) { + std::lock_guard lock(mu_); + tags_.push_back(std::move(tag)); +} + +void Trace::instrument(GenerateOptions& options, + const std::string& generation_name) { + PendingGeneration gen; + gen.id = new_uuid(); + gen.name = generation_name; + gen.start_time = std::chrono::system_clock::now(); + gen.input = messages_input_from(options); + gen.model = options.model; + gen.model_parameters = model_parameters_from(options); + + { + std::lock_guard lock(mu_); + active_generation_ = std::move(gen); + } + + // Chain tool callbacks so we can record per-tool spans, preserving any + // user-installed callbacks. Capture a weak_ptr so we do not extend the + // Trace's lifetime beyond the caller's intent. 
+ std::weak_ptr self = shared_from_this(); + + auto user_tool_start = options.on_tool_call_start; + options.on_tool_call_start = + [self, user_tool_start](const ToolCall& call) { + if (auto sp = self.lock()) sp->record_tool_call_start(call); + if (user_tool_start) (*user_tool_start)(call); + }; + + auto user_tool_finish = options.on_tool_call_finish; + options.on_tool_call_finish = + [self, user_tool_finish](const ToolResult& result) { + if (auto sp = self.lock()) sp->record_tool_call_finish(result); + if (user_tool_finish) (*user_tool_finish)(result); + }; +} + +void Trace::record_tool_call_start(const ToolCall& call) { + JsonValue body; + body["id"] = new_uuid(); + body["traceId"] = id_; + body["name"] = call.tool_name; + body["startTime"] = now_iso8601(); + body["input"] = call.arguments; + body["environment"] = tracer_.config().environment; + + JsonValue event; + event["id"] = new_uuid(); + event["timestamp"] = now_iso8601(); + event["type"] = "span-create"; + event["body"] = body; + + std::lock_guard lock(mu_); + if (active_generation_) { + event["body"]["parentObservationId"] = active_generation_->id; + } + size_t idx = events_.size(); + events_.push_back(std::move(event)); + open_tool_spans_[call.id] = idx; +} + +void Trace::record_tool_call_finish(const ToolResult& result) { + std::lock_guard lock(mu_); + auto it = open_tool_spans_.find(result.tool_call_id); + if (it == open_tool_spans_.end()) { + // No matching start (shouldn't happen, but be defensive). + JsonValue body; + body["id"] = new_uuid(); + body["traceId"] = id_; + body["name"] = result.tool_name; + body["startTime"] = now_iso8601(); + body["endTime"] = now_iso8601(); + body["input"] = result.arguments; + body["output"] = result.is_success() ? result.result + : JsonValue(result.error_message()); + body["level"] = result.is_success() ? 
"DEFAULT" : "ERROR"; + if (active_generation_) + body["parentObservationId"] = active_generation_->id; + body["environment"] = tracer_.config().environment; + JsonValue event; + event["id"] = new_uuid(); + event["timestamp"] = now_iso8601(); + event["type"] = "span-create"; + event["body"] = body; + events_.push_back(std::move(event)); + return; + } + + // Close the open span by emitting a span-update event referencing the same id. + JsonValue& open = events_[it->second]; + std::string span_id = open["body"]["id"].get(); + + JsonValue body; + body["id"] = span_id; + body["traceId"] = id_; + body["endTime"] = now_iso8601(); + body["output"] = + result.is_success() ? result.result : JsonValue(result.error_message()); + if (!result.is_success()) { + body["level"] = "ERROR"; + body["statusMessage"] = result.error_message(); + } + + JsonValue update; + update["id"] = new_uuid(); + update["timestamp"] = now_iso8601(); + update["type"] = "span-update"; + update["body"] = body; + events_.push_back(std::move(update)); + open_tool_spans_.erase(it); +} + +void Trace::finish_generation(const GenerateResult& result) { + std::lock_guard lock(mu_); + if (!active_generation_ || active_generation_->finalized) return; + auto& gen = *active_generation_; + gen.finalized = true; + + // Aggregate usage across multi-step results when present. 
+ Usage total = result.usage; + if (!result.steps.empty() && total.total_tokens == 0) { + int p = 0, c = 0; + for (const auto& s : result.steps) { + p += s.usage.prompt_tokens; + c += s.usage.completion_tokens; + } + total = Usage(p, c); + } + + auto fmt = [](std::chrono::system_clock::time_point t) { + auto tt = std::chrono::system_clock::to_time_t(t); + auto ms = std::chrono::duration_cast( + t.time_since_epoch()) + .count() % + 1000; + std::tm tm{}; +#if defined(_WIN32) + gmtime_s(&tm, &tt); +#else + gmtime_r(&tt, &tm); +#endif + char buf[40]; + std::snprintf(buf, sizeof(buf), "%04d-%02d-%02dT%02d:%02d:%02d.%03lldZ", + tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday, tm.tm_hour, + tm.tm_min, tm.tm_sec, static_cast(ms)); + return std::string(buf); + }; + + JsonValue body; + body["id"] = gen.id; + body["traceId"] = id_; + body["name"] = gen.name; + body["startTime"] = fmt(gen.start_time); + body["endTime"] = fmt(std::chrono::system_clock::now()); + body["model"] = gen.model; + if (!gen.model_parameters.empty()) + body["modelParameters"] = gen.model_parameters; + body["input"] = gen.input; + body["output"] = result.text; + body["usage"] = usage_to_langfuse(total); + body["environment"] = tracer_.config().environment; + + JsonValue meta = JsonValue::object(); + meta["finish_reason"] = result.finishReasonToString(); + if (!result.steps.empty()) meta["steps"] = result.steps.size(); + if (!result.warnings.empty()) meta["warnings"] = result.warnings; + body["metadata"] = std::move(meta); + + if (!result.is_success() && result.error) { + body["level"] = "ERROR"; + body["statusMessage"] = *result.error; + } + + JsonValue event; + event["id"] = new_uuid(); + event["timestamp"] = now_iso8601(); + event["type"] = "generation-create"; + event["body"] = std::move(body); + events_.push_back(std::move(event)); +} + +JsonValue Trace::build_trace_event() const { + // Caller must hold mu_. 
+ JsonValue body; + body["id"] = id_; + body["name"] = name_; + body["timestamp"] = [this] { + auto tt = std::chrono::system_clock::to_time_t(trace_start_); + auto ms = std::chrono::duration_cast( + trace_start_.time_since_epoch()) + .count() % + 1000; + std::tm tm{}; +#if defined(_WIN32) + gmtime_s(&tm, &tt); +#else + gmtime_r(&tt, &tm); +#endif + char buf[40]; + std::snprintf(buf, sizeof(buf), "%04d-%02d-%02dT%02d:%02d:%02d.%03lldZ", + tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday, tm.tm_hour, + tm.tm_min, tm.tm_sec, static_cast(ms)); + return std::string(buf); + }(); + body["environment"] = tracer_.config().environment; + if (!tracer_.config().release.empty()) + body["release"] = tracer_.config().release; + if (input_) body["input"] = *input_; + if (output_) body["output"] = *output_; + if (user_id_) body["userId"] = *user_id_; + if (session_id_) body["sessionId"] = *session_id_; + if (metadata_) body["metadata"] = *metadata_; + if (!tags_.empty()) body["tags"] = tags_; + + JsonValue event; + event["id"] = new_uuid(); + event["timestamp"] = now_iso8601(); + event["type"] = "trace-create"; + event["body"] = std::move(body); + return event; +} + +bool Trace::end() { + if (ended_.exchange(true)) return true; + + JsonValue batch = JsonValue::array(); + { + std::lock_guard lock(mu_); + batch.push_back(build_trace_event()); + for (auto& ev : events_) batch.push_back(std::move(ev)); + events_.clear(); + } + + bool ok = tracer_.send_batch(batch); + return ok || tracer_.config().best_effort; +} + +std::string Trace::now_iso8601() { + auto now = std::chrono::system_clock::now(); + auto tt = std::chrono::system_clock::to_time_t(now); + auto ms = std::chrono::duration_cast( + now.time_since_epoch()) + .count() % + 1000; + std::tm tm{}; +#if defined(_WIN32) + gmtime_s(&tm, &tt); +#else + gmtime_r(&tt, &tm); +#endif + char buf[40]; + std::snprintf(buf, sizeof(buf), "%04d-%02d-%02dT%02d:%02d:%02d.%03lldZ", + tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday, tm.tm_hour, + tm.tm_min, 
tm.tm_sec, static_cast(ms)); + return std::string(buf); +} + +std::string Trace::new_uuid() { + // RFC 4122 v4-compatible UUID. + static thread_local std::mt19937_64 rng{std::random_device{}()}; + std::uniform_int_distribution dist; + uint64_t a = dist(rng); + uint64_t b = dist(rng); + + // Version 4 + a = (a & 0xFFFFFFFFFFFF0FFFULL) | 0x0000000000004000ULL; + // Variant 10xx + b = (b & 0x3FFFFFFFFFFFFFFFULL) | 0x8000000000000000ULL; + + char buf[37]; + std::snprintf(buf, sizeof(buf), + "%08x-%04x-%04x-%04x-%012llx", + static_cast(a >> 32), + static_cast((a >> 16) & 0xFFFF), + static_cast(a & 0xFFFF), + static_cast(b >> 48), + static_cast(b & 0xFFFFFFFFFFFFULL)); + return std::string(buf); +} + +// --------------------------------------------------------------------------- +// Free function helper +// --------------------------------------------------------------------------- + +GenerateResult generate_text(Client& client, + GenerateOptions options, + Trace& trace, + const std::string& generation_name) { + trace.instrument(options, generation_name); + GenerateResult result = client.generate_text(options); + trace.finish_generation(result); + return result; +} + +} // namespace langfuse +} // namespace ai From 91d4de68f06442927ed43ce18cd3c3b1cee1db4e Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Fri, 1 May 2026 10:45:06 +0900 Subject: [PATCH 2/2] style: apply clang-format --- examples/langfuse_tracing.cpp | 15 +++-- include/ai/langfuse.h | 12 ++-- src/langfuse/tracer.cpp | 109 ++++++++++++++++++++-------------- 3 files changed, 81 insertions(+), 55 deletions(-) diff --git a/examples/langfuse_tracing.cpp b/examples/langfuse_tracing.cpp index a100017..261d268 100644 --- a/examples/langfuse_tracing.cpp +++ b/examples/langfuse_tracing.cpp @@ -37,14 +37,16 @@ ai::JsonValue lookup_user(const ai::JsonValue& args, }; auto id = args.value("user_id", std::string{}); auto it = users.find(id); - if (it != users.end()) return it->second; + if (it != users.end()) + return 
it->second; return ai::JsonValue{{"error", "user not found"}}; } ai::JsonValue get_weather(const ai::JsonValue& args, const ai::ToolExecutionContext&) { auto loc = args.value("location", std::string{"unknown"}); - return ai::JsonValue{{"location", loc}, {"temperature_c", 21}, {"sky", "clear"}}; + return ai::JsonValue{ + {"location", loc}, {"temperature_c", 21}, {"sky", "clear"}}; } } // namespace @@ -58,8 +60,10 @@ int main() { } const char* host = std::getenv("LANGFUSE_HOST"); - if (!host || !*host) host = std::getenv("LANGFUSE_BASE_URL"); - if (!host || !*host) host = "https://cloud.langfuse.com"; + if (!host || !*host) + host = std::getenv("LANGFUSE_BASE_URL"); + if (!host || !*host) + host = "https://cloud.langfuse.com"; ai::langfuse::Tracer tracer({ .host = host, @@ -97,8 +101,7 @@ int main() { auto trace = tracer.start_trace("langfuse_tracing_example"); trace->set_input(options.prompt); - trace->set_metadata({{"example", "langfuse_tracing"}, - {"sdk", "ai-sdk-cpp"}}); + trace->set_metadata({{"example", "langfuse_tracing"}, {"sdk", "ai-sdk-cpp"}}); auto result = ai::langfuse::generate_text(client, std::move(options), *trace); diff --git a/include/ai/langfuse.h b/include/ai/langfuse.h index 8ee9f83..99d6e74 100644 --- a/include/ai/langfuse.h +++ b/include/ai/langfuse.h @@ -20,7 +20,8 @@ /// trace->set_input(user_prompt); /// /// ai::GenerateOptions options{...}; -/// auto result = ai::langfuse::generate_text(client, std::move(options), *trace); +/// auto result = ai::langfuse::generate_text(client, std::move(options), +/// *trace); /// /// trace->set_output(result.text); /// trace->end(); // synchronous flush @@ -200,10 +201,11 @@ class Trace : public std::enable_shared_from_this { /// Convenience wrapper around `Trace::instrument` + `Client::generate_text` + /// `Trace::finish_generation`. Does not call `Trace::end()` so the caller can /// still attach output/metadata/tags to the trace. 
-GenerateResult generate_text(Client& client, - GenerateOptions options, - Trace& trace, - const std::string& generation_name = "generate_text"); +GenerateResult generate_text( + Client& client, + GenerateOptions options, + Trace& trace, + const std::string& generation_name = "generate_text"); } // namespace langfuse } // namespace ai diff --git a/src/langfuse/tracer.cpp b/src/langfuse/tracer.cpp index 4954a42..d4a51be 100644 --- a/src/langfuse/tracer.cpp +++ b/src/langfuse/tracer.cpp @@ -1,12 +1,10 @@ #include "ai/langfuse.h" - #include "ai/logger.h" #include #include -#include - #include +#include namespace ai { namespace langfuse { @@ -80,15 +78,20 @@ std::string base64_encode(const std::string& in) { JsonValue model_parameters_from(const GenerateOptions& options) { JsonValue params = JsonValue::object(); - if (options.temperature) params["temperature"] = *options.temperature; - if (options.max_tokens) params["max_tokens"] = *options.max_tokens; - if (options.top_p) params["top_p"] = *options.top_p; - if (options.seed) params["seed"] = *options.seed; + if (options.temperature) + params["temperature"] = *options.temperature; + if (options.max_tokens) + params["max_tokens"] = *options.max_tokens; + if (options.top_p) + params["top_p"] = *options.top_p; + if (options.seed) + params["seed"] = *options.seed; if (options.frequency_penalty) params["frequency_penalty"] = *options.frequency_penalty; if (options.presence_penalty) params["presence_penalty"] = *options.presence_penalty; - if (options.max_steps > 1) params["max_steps"] = options.max_steps; + if (options.max_steps > 1) + params["max_steps"] = options.max_steps; return params; } @@ -143,11 +146,16 @@ std::shared_ptr Tracer::start_trace( std::optional metadata, std::vector tags) { auto trace = std::make_shared(*this, Trace::new_uuid(), name); - if (input) trace->set_input(std::move(*input)); - if (user_id) trace->set_user_id(std::move(*user_id)); - if (session_id) trace->set_session_id(std::move(*session_id)); 
- if (metadata) trace->set_metadata(std::move(*metadata)); - for (auto& t : tags) trace->add_tag(std::move(t)); + if (input) + trace->set_input(std::move(*input)); + if (user_id) + trace->set_user_id(std::move(*user_id)); + if (session_id) + trace->set_session_id(std::move(*session_id)); + if (metadata) + trace->set_metadata(std::move(*metadata)); + for (auto& t : tags) + trace->add_tag(std::move(t)); return trace; } @@ -263,18 +271,21 @@ void Trace::instrument(GenerateOptions& options, std::weak_ptr self = shared_from_this(); auto user_tool_start = options.on_tool_call_start; - options.on_tool_call_start = - [self, user_tool_start](const ToolCall& call) { - if (auto sp = self.lock()) sp->record_tool_call_start(call); - if (user_tool_start) (*user_tool_start)(call); - }; + options.on_tool_call_start = [self, user_tool_start](const ToolCall& call) { + if (auto sp = self.lock()) + sp->record_tool_call_start(call); + if (user_tool_start) + (*user_tool_start)(call); + }; auto user_tool_finish = options.on_tool_call_finish; - options.on_tool_call_finish = - [self, user_tool_finish](const ToolResult& result) { - if (auto sp = self.lock()) sp->record_tool_call_finish(result); - if (user_tool_finish) (*user_tool_finish)(result); - }; + options.on_tool_call_finish = [self, + user_tool_finish](const ToolResult& result) { + if (auto sp = self.lock()) + sp->record_tool_call_finish(result); + if (user_tool_finish) + (*user_tool_finish)(result); + }; } void Trace::record_tool_call_start(const ToolCall& call) { @@ -313,8 +324,8 @@ void Trace::record_tool_call_finish(const ToolResult& result) { body["startTime"] = now_iso8601(); body["endTime"] = now_iso8601(); body["input"] = result.arguments; - body["output"] = result.is_success() ? result.result - : JsonValue(result.error_message()); + body["output"] = + result.is_success() ? result.result : JsonValue(result.error_message()); body["level"] = result.is_success() ? 
"DEFAULT" : "ERROR"; if (active_generation_) body["parentObservationId"] = active_generation_->id; @@ -328,7 +339,8 @@ void Trace::record_tool_call_finish(const ToolResult& result) { return; } - // Close the open span by emitting a span-update event referencing the same id. + // Close the open span by emitting a span-update event referencing the same + // id. JsonValue& open = events_[it->second]; std::string span_id = open["body"]["id"].get(); @@ -354,7 +366,8 @@ void Trace::record_tool_call_finish(const ToolResult& result) { void Trace::finish_generation(const GenerateResult& result) { std::lock_guard lock(mu_); - if (!active_generation_ || active_generation_->finalized) return; + if (!active_generation_ || active_generation_->finalized) + return; auto& gen = *active_generation_; gen.finalized = true; @@ -404,8 +417,10 @@ void Trace::finish_generation(const GenerateResult& result) { JsonValue meta = JsonValue::object(); meta["finish_reason"] = result.finishReasonToString(); - if (!result.steps.empty()) meta["steps"] = result.steps.size(); - if (!result.warnings.empty()) meta["warnings"] = result.warnings; + if (!result.steps.empty()) + meta["steps"] = result.steps.size(); + if (!result.warnings.empty()) + meta["warnings"] = result.warnings; body["metadata"] = std::move(meta); if (!result.is_success() && result.error) { @@ -447,12 +462,18 @@ JsonValue Trace::build_trace_event() const { body["environment"] = tracer_.config().environment; if (!tracer_.config().release.empty()) body["release"] = tracer_.config().release; - if (input_) body["input"] = *input_; - if (output_) body["output"] = *output_; - if (user_id_) body["userId"] = *user_id_; - if (session_id_) body["sessionId"] = *session_id_; - if (metadata_) body["metadata"] = *metadata_; - if (!tags_.empty()) body["tags"] = tags_; + if (input_) + body["input"] = *input_; + if (output_) + body["output"] = *output_; + if (user_id_) + body["userId"] = *user_id_; + if (session_id_) + body["sessionId"] = 
*session_id_; + if (metadata_) + body["metadata"] = *metadata_; + if (!tags_.empty()) + body["tags"] = tags_; JsonValue event; event["id"] = new_uuid(); @@ -463,13 +484,15 @@ JsonValue Trace::build_trace_event() const { } bool Trace::end() { - if (ended_.exchange(true)) return true; + if (ended_.exchange(true)) + return true; JsonValue batch = JsonValue::array(); { std::lock_guard lock(mu_); batch.push_back(build_trace_event()); - for (auto& ev : events_) batch.push_back(std::move(ev)); + for (auto& ev : events_) + batch.push_back(std::move(ev)); events_.clear(); } @@ -510,13 +533,11 @@ std::string Trace::new_uuid() { b = (b & 0x3FFFFFFFFFFFFFFFULL) | 0x8000000000000000ULL; char buf[37]; - std::snprintf(buf, sizeof(buf), - "%08x-%04x-%04x-%04x-%012llx", - static_cast(a >> 32), - static_cast((a >> 16) & 0xFFFF), - static_cast(a & 0xFFFF), - static_cast(b >> 48), - static_cast(b & 0xFFFFFFFFFFFFULL)); + std::snprintf( + buf, sizeof(buf), "%08x-%04x-%04x-%04x-%012llx", + static_cast(a >> 32), static_cast((a >> 16) & 0xFFFF), + static_cast(a & 0xFFFF), static_cast(b >> 48), + static_cast(b & 0xFFFFFFFFFFFFULL)); return std::string(buf); }