refactor!: compositional pipeline API, path-free engine, and pipeline unification#452
refactor!: compositional pipeline API, path-free engine, and pipeline unification#452
Conversation
…ons, auto-format metrics/plots
…k/state/skip Replace file paths with structured ArtifactIdentity(producer, key) as canonical identity across the engine stack. Add WorkspaceStore for path resolution, lock schema v2 with identity-based dep/output entries, three-tier skip detection with generation tracking, and merkle ID computation.
Fix generation skip (identity key vs string comparison), migrate all 30+ src/ and test files to ArtifactIdentity types, resolve 99 basedpyright errors, consolidate 12 duplicated identity stringifiers, make coordinator skip store-aware, fix input identity resolution in DAG validation, and harden identity validation (reject colons, validate None-output stage deps).
Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-opencode) Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-opencode) Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-opencode) Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
…ing through to path
…iscovery in completion
- Change line 302 in console.py: use 'identity' instead of 'path' for dep_changes key_field - Add ArtifactIdentity formatting in _print_changes (lines 260-261): convert ArtifactIdentity to string using identity_key() - Add imports: ArtifactIdentity and identity_key from pivot.types - Add test: test_explain_stage_dep_changes verifies dep changes render without KeyError Fixes KeyError: 'path' crash when console.explain_stage() processes dependency changes.
Compose Pipeline._inputs stores path info (data/external/ vs data/raw/) but build() discarded it. Now input_bindings flow through: compose.build() -> Pipeline.set_input_bindings() -> include() merge -> StoreSpec -> WorkspaceStore._resolve_input_path. Fixes --all mode crash: 'Stage depends on X which does not exist on disk' when external inputs resolve to wrong directory.
Create a presentation layer that materializes CAS refs as workspace symlinks in conventional directories (data/, metrics/, plots/). This gives users browsable output at familiar locations while the actual data lives in content-addressed storage. - Create presentation.py module with present() function - Add engine hook in _orchestrate_execution to call presentation layer - Create comprehensive tests for symlink creation and edge cases - All tests pass, no regressions in full test suite
Single-field TypedDict returns had their field key collapsed to None, causing the worker to pass the entire dict to writers instead of extracting the value. Use SINGLE_OUTPUT_KEY to distinguish bare returns (key=None) from TypedDict fields (key preserved). Also: reject '_single' as a TypedDict field name (reserved), remove dead code (_generate_artifact_path, _artifact_dir_prefix).
…with missing deps
Fix key collision where multiple outputs sharing the same loader class (e.g.,
two CSV outputs with different index_col) had config hashes clobbered by
dict.update(). Keys are now namespaced as dep:{name}:loader:... and
out:{identity}:loader:... to prevent collisions.
Also fix explain.py to surface code/param changes even when deps are missing,
so users see 'Code changed; Missing deps: ...' instead of just 'Missing deps'.
The fingerprint is pre-computed by the caller so the dict diff is free.
Bump version to 0.2.0a1 — breaking change to fingerprint key format forces
one-time re-run of all stages on upgrade.
Stage names are always `{pipeline_name}/{bare_name}` from creation in build().
Cross-pipeline dep resolution is trivially correct, include() is simplified
from ~40 lines of collision logic to ~10 lines of deep-copy, and identity
drift bugs are structurally prevented.
- registry: add_existing() invariant rejects mismatched out.identity.producer
- compose: build() prefixes _StageNode.name before creating identities
- pipeline: include() and resolve_external_dependencies() simplified
- store: drop _pipeline_name from output paths (stage prefix is sufficient)
- names: display and resolution helpers for single-pipeline CLI convenience
Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-opencode) Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
…s locally Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-opencode) Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
…_to_artifact_ref Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-opencode) Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
…aversal, extract helpers
… import, matrix Remove Pipeline class (replaced by compositional API), dvc_import module, and matrix module. Update all CLI commands, engine, and executor to use the registry-only code path. Simplify discovery to pipeline.py-only. Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-opencode) Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
Call counts for auto-numbering are now keyed by (func_name, variant_key) instead of just func_name. This means calling the same function in different variant contexts produces clean names like merge_data@current and merge_data@legacy, instead of merge_data and merge_data@1@legacy. Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-opencode) Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
… keys as paths After the ArtifactIdentity migration, explain.py still passed identity key strings to hash_dependencies(list[str]), which treats them as file paths. This made every dep appear 'missing' in pivot status regardless of actual disk state. Fix by passing the deps dict (ArtifactRefs) and a WorkspaceStore so hash_dependencies uses store-based resolution. Also adds orphaned lock file detection to pivot status, warning when lock files exist for stages that are no longer registered. Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-opencode) Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
…overage Update all tests to use compositional API instead of imperative Pipeline. Remove tests for deleted modules (dvc_import, matrix). Add tests for: - Variant call count scoping per variant context - Orphaned lock file detection - Pipeline discovery changes Add pipeline unification design documents. Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-opencode) Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
There was a problem hiding this comment.
Pull request overview
This PR replaces Pivot’s pipeline API and execution model with a compositional @stage/Pipeline interface and a path-free engine built around structured ArtifactIdentity(producer, key) identifiers, updating CLI/TUI, lock/state handling, and related utilities accordingly.
Changes:
- Introduces a compositional pipeline API (function composition via handles) and unifies pipeline implementations behind
PipelineLike. - Refactors engine/CLI/TUI to use identity keys/objects instead of filesystem paths (locks, skip detection, RPC, display).
- Removes YAML/DVC/matrix legacy pipeline machinery and associated command surface area.
Reviewed changes
Copilot reviewed 92 out of 245 changed files in this pull request and generated 9 comments.
Show a summary per file
| File | Description |
|---|---|
| packages/pivot/src/pivot/storage/artifact_lock.py | Switches lock request expansion to identity-key based lock keys. |
| packages/pivot/src/pivot/status.py | Adapts status/explain plumbing to PipelineLike, identities, and workspace store resolution. |
| packages/pivot/src/pivot/stage_def.py | Removes large annotation-based stage-definition extraction machinery; updates docs/comments. |
| packages/pivot/src/pivot/skip.py | Changes dep/output comparisons and diffs to operate on ArtifactIdentity and supports “accessed hashes”. |
| packages/pivot/src/pivot/show/plots.py | Updates plot discovery and lock lookup to support ArtifactRef + identity-based hashes. |
| packages/pivot/src/pivot/show/metrics.py | Updates metrics discovery/head lookup to support ArtifactRef and identity-key lock access. |
| packages/pivot/src/pivot/show/data.py | Updates data output discovery/head lookup and adds loader-based format inference for ArtifactRef. |
| packages/pivot/src/pivot/show/common.py | Adds compatibility extraction of output hashes from lock entries using key/path/display. |
| packages/pivot/src/pivot/run_history.py | Updates run-history input hash computation to include identity fields instead of paths. |
| packages/pivot/src/pivot/remote/sync.py | Migrates remote sync target hashing logic to identity keys and identity-based lock maps. |
| packages/pivot/src/pivot/pipeline/init.py | Removes legacy pipeline package exports. |
| packages/pivot/src/pivot/outputs.py | Removes legacy Dep/PlaceholderDep markers; updates doc examples to match new API. |
| packages/pivot/src/pivot/names.py | Adds helpers for display/resolution of stage names with optional pipeline-prefix stripping. |
| packages/pivot/src/pivot/merkle.py | Adds merkle-id computation helper for identity-first hashing. |
| packages/pivot/src/pivot/matrix.py | Deletes legacy matrix expansion module. |
| packages/pivot/src/pivot/loaders.py | Adds format_extension() helper for mapping loader instances to default extensions. |
| packages/pivot/src/pivot/import_artifact.py | Makes import path resolution/logging more robust with lock entries using key/display/path. |
| packages/pivot/src/pivot/ignore.py | Updates protected config filenames for pipeline discovery (pipeline.py). |
| packages/pivot/src/pivot/fingerprint.py | Updates guidance text to match new API (“function parameter” instead of Dep(...)). |
| packages/pivot/src/pivot/explain.py | Migrates explain to identity-keyed deps/outs and to Store-based hashing when available. |
| packages/pivot/src/pivot/executor/core.py | Migrates executor plumbing to PipelineLike and store specs; adjusts worker stage info shape. |
| packages/pivot/src/pivot/executor/commit.py | Refactors commit pipeline to identity-based hashing and Store-based output hashing. |
| packages/pivot/src/pivot/exceptions.py | Updates pipeline-not-found messaging and introduces PipelineConfigError. |
| packages/pivot/src/pivot/engine/watch.py | Migrates graph queries to identity parsing for producer/consumer lookup. |
| packages/pivot/src/pivot/engine/types.py | Updates code/config change docstring to remove pivot.yaml mention. |
| packages/pivot/src/pivot/engine/sources.py | Removes pivot.yaml/yml from config file watch list. |
| packages/pivot/src/pivot/engine/agent_rpc.py | Updates RPC to return structured identity JSON objects for deps/outs and uses PipelineLike. |
| packages/pivot/src/pivot/discovery.py | Removes YAML discovery and validates discovered pipelines against PipelineLike. |
| packages/pivot/src/pivot/cli/verify.py | Migrates verify to identity-keyed lock lookups with optional workspace display-path resolution. |
| packages/pivot/src/pivot/cli/track.py | Updates overlap detection to use identity keys for stage outputs. |
| packages/pivot/src/pivot/cli/targets.py | Adds identity-aware CLI target parsing/resolution and bare-stage-name resolution helpers. |
| packages/pivot/src/pivot/cli/status.py | Integrates orphaned lock detection and passes pipeline into status/explain queries. |
| packages/pivot/src/pivot/cli/repro.py | Updates DAG validation/watch path selection for identity-first model and removes YAML references. |
| packages/pivot/src/pivot/cli/remote.py | Improves CLI target normalization to accept identity targets (stage:key). |
| packages/pivot/src/pivot/cli/list.py | Updates list output to show deps/outs as identity keys and removes YAML messaging. |
| packages/pivot/src/pivot/cli/init.py | Updates init message to instruct pipeline.py creation only. |
| packages/pivot/src/pivot/cli/helpers.py | Replaces registry access with PipelineLike accessors and adds workspace store helper. |
| packages/pivot/src/pivot/cli/doctor.py | Removes pivot.yaml checks; validates pipeline.py existence only. |
| packages/pivot/src/pivot/cli/decorators.py | Stores/retrieves PipelineLike from click context. |
| packages/pivot/src/pivot/cli/data.py | Adds identity-aware CLI target resolution for data diffs via workspace store mapping. |
| packages/pivot/src/pivot/cli/console.py | Updates change rendering to support ArtifactIdentity display and dep-change field rename. |
| packages/pivot/src/pivot/cli/completion.py | Adds bare-name completion and identity (stage:key) completion by loading pipeline on demand. |
| packages/pivot/src/pivot/cli/checkout.py | Makes checkout use identity-based stage outputs via workspace store resolution. |
| packages/pivot/src/pivot/cli/_run_common.py | Updates discovery typing to PipelineLike. |
| packages/pivot/src/pivot/cli/init.py | Removes export/import-dvc/schema commands from CLI surface. |
| packages/pivot/src/pivot/cli/AGENTS.md | Updates CLI docs to remove export reference. |
| packages/pivot/src/pivot/init.py | Bumps version and switches public API exports to composition API and merkle helper. |
| packages/pivot/pyproject.toml | Bumps package version to 0.2.0a1. |
| packages/pivot-tui/tests/test_watch.py | Updates test pipeline.py to new compose API and uses fully-qualified stage names. |
| packages/pivot-tui/tests/test_tui_force_rerun.py | Updates stage names to fully-qualified prefixed form. |
| packages/pivot-tui/tests/test_run.py | Migrates helper stage signatures away from Dep(...) and updates output snapshot identity typing. |
| packages/pivot-tui/tests/test_rpc_contract.py | Updates RPC contract tests for structured identity payloads and compose pipeline creation. |
| packages/pivot-tui/tests/test_rpc_client_impl.py | Updates stage_info parsing expectations and adds structured identity test. |
| packages/pivot-tui/tests/test_fake_server.py | Updates fake server stage_info deps/outs to structured identity JSON. |
| packages/pivot-tui/tests/test_diff_panels.py | Updates diff panel tests to use ArtifactIdentity and identity-key indexing. |
| packages/pivot-tui/tests/test_client_protocol.py | Updates protocol types to use ArtifactIdentity lists for stage_info results. |
| packages/pivot-tui/tests/helpers.py | Refactors test pipeline/stage registration helpers to build compose pipelines. |
| packages/pivot-tui/tests/conftest.py | Switches fixtures to compose Pipeline / PipelineLike. |
| packages/pivot-tui/src/pivot_tui/testing/fake_server.py | Updates stage_info response shape (structured identities). |
| packages/pivot-tui/src/pivot_tui/run.py | Parses output summary “path” into ArtifactIdentity instead of string. |
| packages/pivot-tui/src/pivot_tui/rpc_client_impl.py | Decodes stage_info deps/outs from structured identity JSON into ArtifactIdentity. |
| packages/pivot-tui/src/pivot_tui/diff_panels.py | Updates diff indexing and rendering to use identity keys derived from ArtifactIdentity. |
| packages/pivot-tui/src/pivot_tui/client.py | Updates StageInfoResult typing to structured identity objects. |
| docs/task9-cli-tui-update-locations.md | Adds implementation notes on identity display changes across CLI/TUI. |
| docs/research/textual-rich-formatting-patterns.md | Adds research notes on Rich/Textual formatting for identity display. |
| docs/plans/2026-02-17-pipeline-unification-cleanup.md | Adds plan doc for pipeline unification and dead code removal. |
| docs/plans/2026-02-16-single-field-typeddict-bug.md | Adds plan doc for single-field TypedDict output key preservation. |
| docs/plans/2026-02-15-path-free-engine*.md | Adds design/plan docs for identity-first engine architecture. |
| docs/gen_ref_pages.py | Updates API reference generation to point to pivot.compose instead of legacy pipeline module. |
| _ = graph, paths | ||
| return sorted(all_stages.keys()) |
There was a problem hiding this comment.
what_if_changed() currently ignores the provided paths and returns all stages unconditionally. This breaks the CLI expectation of narrowing to affected stages. If path-based resolution is no longer available in an identity-first graph, consider re-implementing this by mapping user-provided paths/identity targets to ArtifactIdentity (via WorkspaceStore/presentation reverse index) and then querying the graph consumers; otherwise, remove/disable this command path with a clear error until implemented.
| identity = engine_graph.parse_artifact_identity(str(path)) | ||
| producer = engine_graph.get_producer(self._graph, identity) |
There was a problem hiding this comment.
watch.py is parsing filesystem paths (from the watcher) as artifact identity strings. For typical watch events, str(path) will be a workspace path like data/train.csv, not an identity key like stage:key, so graph lookups will miss and watch-mode change detection will silently degrade. Recommended fix: resolve filesystem paths to ArtifactIdentity via a store/presentation reverse mapping (path→identity), or keep watch graph queries path-based until a reliable path↔identity index exists.
| identity = engine_graph.parse_artifact_identity(str(path)) | ||
| consumers = engine_graph.get_consumers(self._graph, identity) |
There was a problem hiding this comment.
watch.py is parsing filesystem paths (from the watcher) as artifact identity strings. For typical watch events, str(path) will be a workspace path like data/train.csv, not an identity key like stage:key, so graph lookups will miss and watch-mode change detection will silently degrade. Recommended fix: resolve filesystem paths to ArtifactIdentity via a store/presentation reverse mapping (path→identity), or keep watch graph queries path-based until a reliable path↔identity index exists.
| identity = engine_graph.parse_artifact_identity(str(path)) | ||
| return engine_graph.get_producer(self._graph, identity) |
There was a problem hiding this comment.
watch.py is parsing filesystem paths (from the watcher) as artifact identity strings. For typical watch events, str(path) will be a workspace path like data/train.csv, not an identity key like stage:key, so graph lookups will miss and watch-mode change detection will silently degrade. Recommended fix: resolve filesystem paths to ArtifactIdentity via a store/presentation reverse mapping (path→identity), or keep watch graph queries path-based until a reliable path↔identity index exists.
| def _metric_read_path( | ||
| out: types.ArtifactRef | outputs.BaseOut, | ||
| project_root: pathlib.Path, | ||
| ) -> tuple[str, pathlib.Path] | None: | ||
| if isinstance(out, types.ArtifactRef): | ||
| if out.tag is not types.ArtifactTag.METRIC: | ||
| return None | ||
| path_key = types.identity_key(out.identity) | ||
| path = pathlib.Path(path_key) | ||
| if not path.is_absolute(): | ||
| path = project_root / path_key | ||
| return path_key, path |
There was a problem hiding this comment.
For ArtifactRef metrics, _metric_read_path() treats identity_key(out.identity) as a filesystem path. Identity keys like producer:key are not guaranteed to be valid paths (and may never correspond to a real file location), so this can cause metrics discovery to fail or point to non-existent files. Suggestion: resolve metric read paths via WorkspaceStore.resolve_display_path(out) (similar to show/data.py), and only fall back to identity_key for display when no store is available.
| config.get_checkout_mode_order() | ||
| project_root = project.get_project_root() | ||
| store_spec = store_mod.StoreSpec( | ||
| kind="workspace", | ||
| cache_dir=str(files_cache_dir), | ||
| project_root=str(project_root), | ||
| pipeline_name=pipeline.name, | ||
| input_bindings=pipeline.input_bindings, | ||
| ) | ||
| store = store_mod.store_from_spec(store_spec) |
There was a problem hiding this comment.
config.get_checkout_mode_order() is called but its return value is ignored, which reads like a leftover from the previous cache-write implementation. Either remove the call or thread the checkout mode order into the store/commit logic (e.g., when materializing cached artifacts) so the behavior is explicit.
| identity_key = types.identity_key(out.identity) | ||
| identity = types.identity_from_key(identity_key) | ||
| try: | ||
| hash_info = store.hash_artifact(out) |
There was a problem hiding this comment.
The commit path previously saved cached outputs into the local cache (CAS) for later checkout/push; now it only hashes via store.hash_artifact(out) without an explicit cache write step. If hash_artifact() does not also persist artifacts into cache_dir, pivot commit will no longer guarantee that cache-backed outputs are actually present in cache. Recommended fix: ensure commit calls a store API that both hashes and stores (or add a dedicated store.cache_artifact() / store.write_to_cache() step for non-metric outputs) and only treat metric-tagged refs as non-cached.
| hash_info = store.hash_artifact(out) | |
| # Metrics are not cached; all other outputs must be written to cache. | |
| if out.tag is ArtifactTag.METRIC: | |
| hash_info = store.hash_artifact(out) | |
| else: | |
| hash_info = store.cache_artifact(out) |
| x=None, | ||
| y=None, | ||
| template=None, |
There was a problem hiding this comment.
When out is an outputs.Plot, the x/y/template metadata is currently discarded (set to None), which is a functional regression for plot rendering/diff behavior that relied on these fields. If outputs.Plot remains supported, preserve out.x, out.y, and out.template for that branch (and keep None only for ArtifactRef where the metadata is unavailable).
| x=None, | |
| y=None, | |
| template=None, | |
| x=out.x, | |
| y=out.y, | |
| template=out.template, |
| state_dir: pathlib.Path | None = None, | ||
| ) -> list[str]: | ||
| if state_dir is None: | ||
| state_dir = project.get_project_root() / ".pivot" |
There was a problem hiding this comment.
find_orphaned_lock_files() defaults state_dir to project_root / '.pivot', but other code paths use config.get_state_dir() (which may be customized). This can cause orphan lock detection to silently scan the wrong directory. Suggest defaulting to config.get_state_dir() (or reusing the same state-dir source as the status/explain caller) for consistency.
| state_dir = project.get_project_root() / ".pivot" | |
| state_dir = config.get_state_dir() |
When any stage fails, pivot repro and pivot run now raise SystemExit(1) instead of silently returning 0. This matches the behavior of pivot verify. Also demote discovery banner from logger.info to logger.debug so it only appears with -v flag, keeping output cleaner.
…avior When can_skip_via_generation() returns True in explain.py (status/dry-run), also verify output files exist on disk. Previously, explain.py would report a stage as skippable even when its output files were missing, while the engine would correctly detect the missing outputs and re-run the stage. Also fix metrics show: resolve ArtifactRef identity keys to actual workspace paths via WorkspaceStore.resolve_display_path() instead of treating the identity key string as a literal file path.
fix(cli): resolve file path targets to producing stages in repro
Summary
Replace the entire pipeline definition and execution model. Stages are now plain Python functions composed via
ArtifactHandlewiring instead of verboseAnnotated[T, Dep(...)]/Out(...)annotations. File paths are eliminated as identity throughout the stack — the DAG, worker, lock files, state DB, and skip detection all operate on structuredArtifactIdentity(producer, key)objects, with paths resolved late by Store.246 files changed, +17,512 / −64,643 (net −47K lines)
What Changed
1. Compositional Pipeline API (
compose.py)New
@stagedecorator +Pipelinecontext manager. Stages are pure functions, the DAG emerges from data flow:ArtifactHandleobjects wire stages together — no paths, no loader annotations on consumersTypedDictreturn types with attribute access on handlespipeline.include(other_pipeline)DataFrame→ JSONL,dict→ YAML,Figure→ PNG)2. Path-Free Engine (
ArtifactIdentity)ArtifactIdentity(producer, key)replaces file paths as the canonical identity:identity_key()encodingstorage/presentation.py): Symlink tree materializes CAS outputs into conventional workspace paths3. Pipeline Unification
Eliminated the dual-Pipeline-class architecture:
PipelineLikeProtocol for the engine contractcompose.Pipelineis now the sole implementationpipeline.Pipeline,pipeline/yaml.py, and thebuild()bridgeresolve_external_dependencies()— compose handles all dep resolution via handles,include(), andp.input()4. Dead Code Removal
Deleted modules with zero production imports now that the declarative API is gone:
matrix.py(143 lines) — matrix expansion, replaced by Python loopsdvc_import.py(880 lines) — DVC import compatibilitydvc_compat.py(389 lines) — DVC format compatibilitypipeline/pipeline.py(642 lines) — old Pipeline classpipeline/yaml.py(765 lines) — YAML pipeline parsing5. CLI Migration
All CLI commands (
diff,checkout,verify,sync,restore,show,completion) updated to resolve targets via identity keys instead of file paths.6. Test Overhaul
test_compose.py(+1,542 lines),test_discovery.py,test_names.pycovering the new APIBreaking Changes
compose.Pipeline+@stage— annotation-based registration path removedpivot.yamldiscovery: Now discoverspipeline.pymodules withPipelinecontext managersDesign Documents
docs/plans/2026-02-14-compositional-pipeline-api.mddocs/plans/2026-02-15-path-free-engine-design.mddocs/plans/2026-02-17-pipeline-unification-plan.mddocs/plans/2026-02-17-pipeline-unification-cleanup.mdCloses #449