Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
b97ad8e
add tags to pull_dataset and experiment config
mehulsonowal Jan 16, 2026
1fcaad9
fix some constructor calls and add record tags to span
gary-huang Jan 26, 2026
9100cb5
Merge branch 'main' of github.com:DataDog/dd-trace-py into mehul.sono…
mehulsonowal Feb 4, 2026
b0e6f22
Merge branch 'main' of github.com:DataDog/dd-trace-py into mehul.sono…
mehulsonowal Feb 12, 2026
d26484e
update batch update and get to v2
mehulsonowal Feb 13, 2026
a65b99c
add unit tests, new fixture
mehulsonowal Feb 19, 2026
b284142
Merge branch 'main' of github.com:DataDog/dd-trace-py into mehul.sono…
mehulsonowal Feb 19, 2026
4d7e924
feat(llmobs): add distributed experiment support
gary-huang Feb 19, 2026
a16b0b8
Merge remote-tracking branch 'origin/main' into mehul.sonowal/add-tag…
gary-huang Feb 20, 2026
2e67dcb
Merge remote-tracking branch 'origin/gary/mlob-4860-v2' into gary/bai…
gary-huang Feb 20, 2026
9690c23
see if systems tests pass
gary-huang Feb 20, 2026
85211e8
new cassettes for existing tests
gary-huang Feb 20, 2026
1762458
add back tests
gary-huang Feb 20, 2026
7642532
undo claude suggestion?
gary-huang Feb 20, 2026
10fac2f
ruff, reno and more cassettes
gary-huang Feb 20, 2026
94e51a6
typing
gary-huang Feb 20, 2026
4eb394f
fix ts
gary-huang Feb 20, 2026
0bac1f2
feat(llmobs): add distributed experiment support
gary-huang Feb 19, 2026
9fd11e0
tests
gary-huang Feb 20, 2026
faa49fe
add example
gary-huang Feb 20, 2026
ee1cf97
Merge remote-tracking branch 'origin/mehul.sonowal/add-tags-to-pull-d…
gary-huang Feb 20, 2026
91c415e
Merge remote-tracking branch 'origin/gary/mlob-4860-v2' into gary/bai…
gary-huang Feb 20, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
110 changes: 109 additions & 1 deletion ddtrace/llmobs/_experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from dataclasses import field
import re
import sys
import time
import traceback
from typing import TYPE_CHECKING
from typing import Any
Expand Down Expand Up @@ -37,6 +38,7 @@
from ddtrace.llmobs import LLMObs
from ddtrace.llmobs._writer import LLMObsExperimentEvalMetricEvent
from ddtrace.llmobs._writer import LLMObsExperimentsClient
from ddtrace.llmobs.types import ExportedLLMObsSpan


logger = get_logger(__name__)
Expand Down Expand Up @@ -393,12 +395,14 @@ class DatasetRecordRaw(TypedDict):
input_data: DatasetRecordInputType
expected_output: JSONType
metadata: dict[str, Any]
tags: list[str]


class _UpdatableDatasetRecordOptional(TypedDict, total=False):
input_data: DatasetRecordInputType
expected_output: JSONType
metadata: dict[str, Any]
tags: list[str]


class UpdatableDatasetRecord(_UpdatableDatasetRecordOptional):
Expand Down Expand Up @@ -469,6 +473,7 @@ class ExperimentResult(TypedDict):
class Dataset:
name: str
description: str
filter_tags: Optional[list[str]]
_id: str
_records: list[DatasetRecord]
_version: int
Expand All @@ -490,10 +495,12 @@ def __init__(
latest_version: int,
version: int,
_dne_client: "LLMObsExperimentsClient",
filter_tags: Optional[list[str]] = None,
) -> None:
self.name = name
self.project = project
self.description = description
self.filter_tags = filter_tags or []
self._id = dataset_id
self._latest_version = latest_version
self._version = version
Expand Down Expand Up @@ -554,6 +561,7 @@ def _push(
updated_records = list(self._updated_record_ids_to_new_fields.values())
new_version, new_record_ids, new_canonical_ids = self._dne_client.dataset_batch_update(
dataset_id=self._id,
project_id=self.project["_id"],
insert_records=list(self._new_records_by_record_id.values()),
update_records=updated_records,
delete_record_ids=self._deleted_record_ids,
Expand Down Expand Up @@ -737,6 +745,11 @@ class Experiment:
_evaluators: Sequence[Union[EvaluatorType, AsyncEvaluatorType]]
_summary_evaluators: Sequence[Union[SummaryEvaluatorType, AsyncSummaryEvaluatorType]]

@classmethod
def _NO_OP_TASK(cls, input_data, config):
"""No-op task used when initializing distributed experiment objects on remote hosts."""
return None

def __init__(
self,
name: str,
Expand All @@ -750,6 +763,7 @@ def __init__(
_llmobs_instance: Optional["LLMObs"] = None,
summary_evaluators: Optional[Sequence[Union[SummaryEvaluatorType, AsyncSummaryEvaluatorType]]] = None,
runs: Optional[int] = None,
is_distributed: Optional[bool] = False,
) -> None:
self.name = name
self._task = task
Expand All @@ -763,8 +777,12 @@ def __init__(
self._tags["dataset_name"] = dataset.name
self._tags["experiment_name"] = name
self._config: dict[str, JSONType] = config or {}
# Write dataset tags to experiment config
if dataset.filter_tags:
self._config["filtered_record_tags"] = cast(JSONType, dataset.filter_tags)
self._runs: int = runs or 1
self._llmobs_instance = _llmobs_instance
self._is_distributed = is_distributed

if not project_name:
raise ValueError(
Expand All @@ -776,6 +794,7 @@ def __init__(
self._project_id: Optional[str] = None
self._id: Optional[str] = None
self._run_name: Optional[str] = None
self.experiment_span: Optional["ExportedLLMObsSpan"] = None

@property
def url(self) -> str:
Expand Down Expand Up @@ -974,7 +993,7 @@ def _prepare_summary_evaluator_data(

return inputs, outputs, expected_outputs, metadata_list, eval_results_by_name

def _setup_experiment(self, llmobs_not_enabled_error: str) -> None:
def _setup_experiment(self, llmobs_not_enabled_error: str, ensure_unique: bool = True) -> None:
if not self._llmobs_instance or not self._llmobs_instance.enabled:
raise ValueError(llmobs_not_enabled_error)

Expand All @@ -991,6 +1010,7 @@ def _setup_experiment(self, llmobs_not_enabled_error: str) -> None:
convert_tags_dict_to_list(self._tags),
self._description,
self._runs,
ensure_unique,
)
self._id = experiment_id
self._tags["experiment_id"] = str(experiment_id)
Expand Down Expand Up @@ -1061,6 +1081,8 @@ async def _process_record(
experiment_name=self.name,
) as span:
span_context = self._llmobs_instance.export_span(span=span)
if self._is_distributed:
self.experiment_span = span_context
if span_context:
span_id = span_context.get("span_id", "")
trace_id = span_context.get("trace_id", "")
Expand Down Expand Up @@ -1327,6 +1349,92 @@ async def _evaluate_summary_single(summary_evaluator: Any) -> tuple[str, dict[st

return evaluations

async def _run_task_single_iteration(
    self,
    jobs: int = 1,
    raise_errors: bool = False,
    run_iteration: Optional[int] = 1,
) -> ExperimentResult:
    """Execute one run iteration of the experiment: task, evaluators, and metric upload.

    :param jobs: Number of concurrent jobs for the task and evaluator phases.
    :param raise_errors: If True, propagate task/evaluator errors instead of recording them.
    :param run_iteration: 1-based iteration index; falsy values are treated as 1.
    :return: An ``ExperimentResult`` containing only this iteration's run result
        (``rows`` and ``summary_evaluations`` are intentionally left empty).
    """
    run_info = _ExperimentRunInfo(run_iteration or 1)
    # Tag the experiment spans/metrics with this run's identity.
    self._tags["run_id"] = str(run_info._id)
    self._tags["run_iteration"] = str(run_info._run_iteration)
    results = await self._run_task(jobs, run_info, raise_errors, None)
    eval_results = await self._run_evaluators(results, raise_errors=raise_errors, jobs=jobs)
    merged = self._merge_results(run_info, results, eval_results, [])
    metric_events = self._generate_metrics_from_exp_results(merged)
    self._llmobs_instance._dne_client.experiment_eval_post(  # type: ignore[union-attr]
        cast(str, self._id), metric_events, convert_tags_dict_to_list(self._tags)
    )
    return {
        "summary_evaluations": {},
        "rows": [],
        "runs": [merged],
    }

def _submit_eval_metric(
    self,
    eval_name: str,
    eval_value: JSONType,
    span: Optional["ExportedLLMObsSpan"] = None,
    timestamp_ms: Optional[int] = None,
    is_summary_eval: Optional[bool] = None,
    reasoning: Optional[str] = None,
    assessment: Optional[str] = None,
    metadata: Optional[dict[str, JSONType]] = None,
    tags: Optional[dict[str, str]] = None,
) -> None:
    """Submit an evaluation metric for a distributed experiment.

    :param eval_name: Name of the evaluation metric
    :param eval_value: Value of the evaluation metric
    :param span: Optional span context dict with span_id and trace_id. If None and not a
        summary eval, uses the last span from _run_task_single_iteration.
    :param timestamp_ms: Optional timestamp in milliseconds; when omitted, the current
        wall-clock time is used.
    :param is_summary_eval: Whether this is a summary-level evaluation
    :param reasoning: Optional reasoning string
    :param assessment: Optional assessment string
    :param metadata: Optional metadata dict
    :param tags: Optional tags dict
    :raises ValueError: If this experiment was not created as a distributed experiment.
    :raises TypeError: If ``span`` is malformed, or if no span is available for a
        non-summary evaluation.
    """
    if not self._is_distributed:
        raise ValueError("this method is only used for distributed experiments")

    if span is not None and (
        not isinstance(span, dict)
        or not isinstance(span.get("span_id"), str)
        or not isinstance(span.get("trace_id"), str)
    ):
        raise TypeError(
            "`span` must be a dictionary containing both span_id and trace_id keys. "
            "LLMObs.export_span() can be used to generate this dictionary from a given span."
        )

    if span is None and not is_summary_eval and self.experiment_span is None:
        raise TypeError("unexpected state, must supply span or must run the experiment first")

    if span is None and not is_summary_eval:
        span = self.experiment_span

    # Fix: the caller-supplied timestamp_ms was previously accepted but silently
    # ignored. Honor it when provided; otherwise fall back to the current time.
    if timestamp_ms is not None:
        timestamp_ns = timestamp_ms * 1_000_000
    else:
        timestamp_ns = time.time_ns()

    eval_metric = self._generate_metric_from_evaluation(
        eval_name,
        eval_value,
        None,
        span.get("span_id", "") if span else "",
        span.get("trace_id", "") if span else "",
        timestamp_ns,
        # Summary evals attach at the experiment level; everything else is a custom metric.
        "summary" if is_summary_eval else "custom",
        reasoning,
        assessment,
        metadata,
        tags,
    )

    self._llmobs_instance._dne_client.experiment_eval_post(  # type: ignore[union-attr]
        cast(str, self._id), [eval_metric], convert_tags_dict_to_list(self._tags)
    )


class SyncExperiment:
"""Thin synchronous wrapper around the async-native ``Experiment``.
Expand Down
78 changes: 77 additions & 1 deletion ddtrace/llmobs/_llmobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,10 @@
from ddtrace.llmobs._experiment import Dataset
from ddtrace.llmobs._experiment import DatasetRecord
from ddtrace.llmobs._experiment import DatasetRecordInputType
from ddtrace.llmobs._experiment import EvaluatorResult
from ddtrace.llmobs._experiment import EvaluatorType
from ddtrace.llmobs._experiment import Experiment
from ddtrace.llmobs._experiment import ExperimentResult
from ddtrace.llmobs._experiment import JSONType
from ddtrace.llmobs._experiment import Project
from ddtrace.llmobs._experiment import SummaryEvaluatorType
Expand Down Expand Up @@ -892,9 +894,15 @@ def pull_dataset(
dataset_name: str,
project_name: Optional[str] = None,
version: Optional[int] = None,
tags: Optional[list[str]] = None,
) -> Dataset:
if tags is not None and not isinstance(tags, list):
raise ValueError(
"tags must be a list of strings in the format of tag key value pairs. "
'Example: tags=["key1:value1", "key2:value2"]'
)
ds = cls._instance._dne_client.dataset_get_with_records(
dataset_name, (project_name or cls._project_name), version
dataset_name, (project_name or cls._project_name), version, tags
)
return ds

Expand Down Expand Up @@ -1007,6 +1015,7 @@ def create_dataset_from_csv(
input_data={col: row[col] for col in input_data_columns},
expected_output={col: row[col] for col in expected_output_columns},
metadata={col: row[col] for col in metadata_columns},
tags=[],
record_id="",
canonical_id=None,
)
Expand Down Expand Up @@ -1321,6 +1330,73 @@ def async_experiment(
runs=runs,
)

@classmethod
def _distributed_experiment(
    cls,
    name: str,
    dataset: Optional[Dataset] = None,
    description: str = "",
    project_name: Optional[str] = None,
    tags: Optional[dict[str, str]] = None,
    config: Optional[ConfigType] = None,
    runs: Optional[int] = 1,
) -> Experiment:
    """Create and register a distributed experiment shell with a no-op task.

    The returned ``Experiment`` carries no real task or evaluators; remote hosts
    are expected to supply those via ``_run_for_experiment``. Registration does
    not enforce name uniqueness so that multiple hosts can attach to one experiment.
    """
    not_enabled_msg = "LLMObs is not enabled. Ensure LLM Observability is enabled via `LLMObs.enable(...)`"
    exp = Experiment(
        name,
        Experiment._NO_OP_TASK,
        dataset,  # type: ignore[arg-type]
        [],
        project_name=project_name or cls._project_name,
        tags=tags,
        description=description,
        config=config,
        _llmobs_instance=cls._instance,
        runs=runs,
        is_distributed=True,
    )
    exp._setup_experiment(not_enabled_msg, ensure_unique=False)
    return exp

@classmethod
def _run_for_experiment(
    cls,
    experiment_id: str,
    task: Callable[[DatasetRecordInputType, Optional[ConfigType]], JSONType],
    dataset_records: list[DatasetRecord],
    evaluators: list[
        Union[
            Callable[[DatasetRecordInputType, JSONType, JSONType], Union[JSONType, EvaluatorResult]],
            Callable[[], Union[JSONType, EvaluatorResult]],
        ]
    ],
    jobs: int = 1,
    raise_errors: bool = False,
    run_iteration: Optional[int] = 1,
    tags: Optional[dict[str, str]] = None,
) -> tuple[Experiment, ExperimentResult]:
    """Attach to an existing experiment by id and run one iteration locally.

    Fetches the experiment, swaps in the caller's task, evaluators, and dataset
    records, then executes a single run iteration synchronously.

    :raises ValueError: If LLM Observability is not enabled.
    :return: The hydrated experiment and the result of the iteration.
    """
    if not cls._instance or not cls._instance.enabled:
        raise ValueError("LLMObs is not enabled. Ensure LLM Observability is enabled via `LLMObs.enable(...)`")

    experiment = cls._instance._dne_client.experiment_get(experiment_id)
    experiment._llmobs_instance = cls._instance
    experiment._dataset._records = dataset_records
    experiment._task = task
    experiment._evaluators = evaluators  # type: ignore[assignment]

    iteration_coro = experiment._run_task_single_iteration(jobs, raise_errors, run_iteration)
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        running_in_loop = False
    else:
        running_in_loop = True

    if not running_in_loop:
        # No event loop on this thread: run the coroutine directly.
        run_results = asyncio.run(iteration_coro)
    else:
        # Already inside a running loop; hand the coroutine to a worker thread
        # with its own loop and block until it completes.
        import concurrent.futures

        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
            run_results = executor.submit(asyncio.run, iteration_coro).result()
    return experiment, run_results

@classmethod
def register_processor(cls, processor: Optional[Callable[[LLMObsSpan], Optional[LLMObsSpan]]] = None) -> None:
"""Register a processor to be called on each LLMObs span.
Expand Down
Loading
Loading