
feat(pipes): resolve table dependencies for cache invalidation #343

Open
taitelee wants to merge 25 commits into main from pipe-cache-invalidation

Conversation

@taitelee

Contributor

Summary

Pipes were cached TTL-only because they couldn't report which tables they read, so an ingest never invalidated a stale pipe result (#178). This teaches a pipe to resolve the ingested base tables its SQL reads — through views and aliases — and version-invalidate on the same namespace-versioned path structured queries already use.

When a pipe is created or updated, PipesHandler.Put resolves its base tables by running EXPLAIN QUERY TREE over dummy-bound SQL (the table set depends only on the SQL, not on per-request parameter values) and keeping the names the schema registry knows in the configured database. The result is stored on the pipe as ResolvedTables, which is server-owned and never trusted from client input. Execute folds those tables into the cache key as whole-table namespaces, encoded exactly as the ingest worker encodes them, so an ingest into any of them invalidates the cached result. Resolution is best-effort and bounded: with no registry/ClickHouse wired, or on any failure, the pipe falls back to the previous TTL-only behavior.
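
To make the invalidation path concrete, here is a minimal sketch of version-keyed caching, assuming one counter per table. The `versions` type, its `QueryKey` signature, and the `|table@N` key encoding are hypothetical stand-ins for illustration, not the PR's actual VersionManager or namespace encoding.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
	"sync"
)

// versions is a hypothetical stand-in for a version manager:
// it keeps one monotonically increasing counter per table.
type versions struct {
	mu     sync.Mutex
	tables map[string]uint64
}

func newVersions() *versions { return &versions{tables: map[string]uint64{}} }

// BumpTable is what an ingest into the table would call.
func (v *versions) BumpTable(table string) {
	v.mu.Lock()
	defer v.mu.Unlock()
	v.tables[table]++
}

// QueryKey folds the current version of every dependency into the key.
// With no deps the key never changes, which corresponds to TTL-only caching.
func (v *versions) QueryKey(base string, deps []string) string {
	v.mu.Lock()
	defer v.mu.Unlock()
	sorted := append([]string(nil), deps...)
	sort.Strings(sorted) // key must not depend on dependency order
	var b strings.Builder
	b.WriteString(base)
	for _, t := range sorted {
		fmt.Fprintf(&b, "|%s@%d", t, v.tables[t])
	}
	return b.String()
}

func main() {
	v := newVersions()
	deps := []string{"events", "users"}
	before := v.QueryKey("pipe:top_users", deps)
	v.BumpTable("events") // simulated ingest
	after := v.QueryKey("pipe:top_users", deps)
	fmt.Println("key changed after ingest:", before != after)
}
```

Because the old key is never looked up again after a bump, the stale entry is simply left to expire; nothing has to be deleted eagerly in this sketch.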

Related Issues

Closes #178

@coderabbitai

coderabbitai Bot commented Jun 11, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

📝 Walkthrough


The PR implements dependency-aware pipe cache invalidation. VersionManager is refactored to a two-level tableVersions/namespaceVersions model with BumpTable/BumpNamespace APIs. A new pipe_deps.go module resolves a pipe's referenced base tables via EXPLAIN QUERY TREE with recursive view expansion and backtick-aware identifier parsing. NamedQuery stores resolved tables and DummyBind generates safe runnable SQL for analysis. PipesHandler populates resolved tables at Put time and uses them to key cache entries in Execute, enabling ingest-triggered invalidation to evict stale pipe results.
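
As a rough illustration of the extraction step, the sketch below pulls `table_name:` values out of EXPLAIN QUERY TREE text and deduplicates them. The sample output is illustrative rather than captured from a real server, `parseTables` is a hypothetical helper (not the PR's parseQueryTreeTables), and the regular expression deliberately ignores backtick quoting, which the PR handles separately.

```go
package main

import (
	"fmt"
	"regexp"
)

// tableNameRe captures the token after "table_name: " up to whitespace or a comma.
// Assumption: identifiers are unquoted; quoted names need a real parser.
var tableNameRe = regexp.MustCompile(`table_name: ([^\s,]+)`)

// parseTables returns the distinct table names in first-seen order.
func parseTables(explain string) []string {
	seen := map[string]bool{}
	var out []string
	for _, m := range tableNameRe.FindAllStringSubmatch(explain, -1) {
		if !seen[m[1]] {
			seen[m[1]] = true
			out = append(out, m[1])
		}
	}
	return out
}

func main() {
	// Illustrative shape only; real output carries many more node fields.
	sample := `QUERY id: 0
  JOIN TREE
    TABLE id: 3, alias: __table1, table_name: default.events
    TABLE id: 5, alias: __table2, table_name: default.users
    TABLE id: 7, alias: __table3, table_name: default.events`
	fmt.Println(parseTables(sample))
}
```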

Changes

Pipe dependency-based cache invalidation

Layer / File(s) Summary
VersionManager two-level versioning refactor
internal/cache/version_manager.go
Replaces single versions map with tableVersions and namespaceVersions; adds Namespace struct, NamespaceKey, QueryKey, BumpTable, BumpNamespace; removes GetCacheKey, GetVersion, IncrementVersion.
NamedQuery.ResolvedTables and DummyBind helper
internal/pipes/pipes.go, internal/pipes/dummybind_test.go
NamedQuery gains a server-owned ResolvedTables []string field; DummyBind generates runnable SQL for EXPLAIN analysis by substituting type-appropriate dummy values for all parameters; tests validate typed, boolean, bare inline, inline-default, and no-param cases.
Pipe SQL dependency extraction via EXPLAIN QUERY TREE
internal/api/pipe_deps.go
New pipe_deps.go implements best-effort base-table resolution through EXPLAIN QUERY TREE with recursive view expansion, cycle prevention, identifier parsing (backtick/escape handling), and schema-registry filtering; helpers include resolvePipeDeps, collectReadTables, explainQueryTreeTables, parseQueryTreeTables, filterKnownTables, splitQualified, unquoteIdent, and pipeDeps.
Pipe deps unit tests
internal/api/pipe_deps_test.go
Unit tests cover all parsing/normalization helpers and the pipeDeps namespace converter; fakeCache test double records cache dependency flow; handler tests assert deps pass through on cache HIT/MISS and server-side ownership of resolved_tables.
PipesHandler dependency-based cache wiring
internal/api/pipes.go
PipesHandler adds Registry and Database fields for best-effort resolution; Put calls resolvePipeDeps to populate ResolvedTables (server-owned); Execute computes deps from resolved tables and passes them to Cache.Get/Cache.Set instead of always nil, enabling ingest invalidation to reach pipe cache entries.
Main wiring and integration tests
cmd/wavehouse/main.go, tests/integration/pipe_deps_test.go, clients/ts/src/types.ts, docs/src/content/docs/pipes.mdx, tests/integration/setup_test.go, CHANGELOG.md
main.go constructs pipesHandler separately to assign Registry/Database; integration tests exercise view→base-table resolution, materialized view source tracking, direct table references, UNION multi-table resolution, and MISS→HIT→invalidation→MISS cache cycles against a real ClickHouse instance; client type adds resolved_tables field; docs describe cache invalidation behavior and TTL fallback; setup improves table-name sanitization; CHANGELOG documents two-level versioning and dependency resolution.

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant PipesHandler
    participant resolvePipeDeps
    participant ClickHouse
    participant SchemaRegistry
    participant Cache

    rect rgba(100, 149, 237, 0.5)
        Note over Client,Cache: PUT /v1/pipes/{name}
        Client->>PipesHandler: PUT pipe SQL
        PipesHandler->>resolvePipeDeps: pipe definition
        resolvePipeDeps->>ClickHouse: DummyBind SQL → EXPLAIN QUERY TREE
        ClickHouse-->>resolvePipeDeps: table_name identifiers
        resolvePipeDeps->>ClickHouse: system.tables as_select (view expansion)
        ClickHouse-->>resolvePipeDeps: view SQL (recursive)
        resolvePipeDeps->>SchemaRegistry: filterKnownTables
        SchemaRegistry-->>resolvePipeDeps: resolved base table names
        resolvePipeDeps-->>PipesHandler: ResolvedTables []string
        PipesHandler->>Cache: store pipe with ResolvedTables
    end

    rect rgba(60, 179, 113, 0.5)
        Note over Client,Cache: GET /v1/pipes/{name}/execute
        Client->>PipesHandler: Execute request
        PipesHandler->>PipesHandler: pipeDeps(q.ResolvedTables) → []Namespace
        PipesHandler->>Cache: Get(sha, deps)
        Cache-->>PipesHandler: HIT or MISS
        alt MISS
            PipesHandler->>ClickHouse: run pipe SQL
            ClickHouse-->>PipesHandler: results
            PipesHandler->>Cache: Set(sha, deps, results)
        end
        PipesHandler-->>Client: results + X-Cache header
    end

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

  • Wave-RF/WaveHouse#314: Introduces the cache.Namespace struct and foundational VersionManager.QueryKey/BumpTable/BumpNamespace APIs that this PR's pipe invalidation logic is built on.
  • Wave-RF/WaveHouse#177: Modifies PipesHandler cache wiring in internal/api/pipes.go, directly overlapping with this PR's changes to Execute and Put cache integration.
  • Wave-RF/WaveHouse#172: Changes PipesHandler authorization in Execute, colliding with the same handler code paths modified here for dependency-aware cache keying.

Suggested labels

area/query, area/ingest

Suggested reviewers

  • EricAndrechek
🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 50.94% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |
✅ Passed checks (4 passed)
| Check name | Status | Explanation |
| --- | --- | --- |
| Title check | ✅ Passed | The title 'feat(pipes): resolve table dependencies for cache invalidation' directly and clearly summarizes the main change: enabling pipes to resolve and use table dependencies for cache invalidation. |
| Description check | ✅ Passed | The description explains the problem (pipes cached TTL-only without table awareness) and the solution (resolving base tables via EXPLAIN QUERY TREE and version-invalidating), clearly relating to the changeset. |
| Linked Issues check | ✅ Passed | The PR fully addresses issue #178's acceptance criteria: pipe cache is now invalidated on table writes, resolved tables are extracted via EXPLAIN QUERY TREE at pipe registration, and multi-table pipes are supported with consistent cache namespace encoding. |
| Out of Scope Changes check | ✅ Passed | All changes are directly related to pipe dependency resolution and cache invalidation: query tree parsing, dependency discovery, version manager refactoring, and integration/unit tests validating the feature work. |

@github-actions github-actions Bot added the go, area/api, area/ingest, area/cache, area/pipes, area/docs, and area/infra labels Jun 11, 2026

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 4


ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro Plus

Run ID: d7ff65f4-c449-4c3d-a7b3-4ef586eb7bbe

📥 Commits

Reviewing files that changed from the base of the PR and between 9661c82 and 27ae1f1.

📒 Files selected for processing (18)
  • CHANGELOG.md
  • cmd/wavehouse/main.go
  • internal/api/pipe_deps.go
  • internal/api/pipe_deps_test.go
  • internal/api/pipes.go
  • internal/api/structured_query.go
  • internal/cache/cache.go
  • internal/cache/cache_test.go
  • internal/cache/local.go
  • internal/cache/local_test.go
  • internal/cache/version_manager.go
  • internal/cache/version_manager_test.go
  • internal/ingest/worker.go
  • internal/ingest/worker_test.go
  • internal/pipes/dummybind_test.go
  • internal/pipes/pipes.go
  • internal/testutil/mocks.go
  • tests/integration/pipe_deps_test.go
💤 Files with no reviewable changes (1)
  • internal/cache/cache_test.go
📜 Review details
🧰 Additional context used
📓 Path-based instructions (7)
**/*.go

📄 CodeRabbit inference engine (AGENTS.md)

**/*.go: Use Go 1.26 with strict formatting enforced by gofumpt
Use structured logging with log/slog (JSON handler)
Use Chi v5 for HTTP routing
Return errors, don't panic. Wrap with fmt.Errorf("context: %w", err)
Use package naming: lowercase, single word (or abbreviated). internal/ enforces module privacy
No global state: Dependencies are passed explicitly (constructor injection)
Comment the why, not the what. Add a comment only when the reason isn't obvious from the code; a line that matches the surrounding pattern needs none. Keep comments to 1–2 lines
DRY — one source of truth. Before adding logic, look for an existing helper, type, or constant to reuse; before duplicating a rule, factor it into one place every caller reads
Leave it neater than you found it — within reason. Fix small, safe things in passing: a stale comment, an obvious typo, a misnamed local, dead code on your path

Files:

  • internal/api/structured_query.go
  • internal/testutil/mocks.go
  • internal/ingest/worker.go
  • internal/pipes/pipes.go
  • internal/pipes/dummybind_test.go
  • cmd/wavehouse/main.go
  • tests/integration/pipe_deps_test.go
  • internal/ingest/worker_test.go
  • internal/api/pipe_deps.go
  • internal/cache/local.go
  • internal/api/pipe_deps_test.go
  • internal/cache/local_test.go
  • internal/cache/cache.go
  • internal/api/pipes.go
  • internal/cache/version_manager.go
  • internal/cache/version_manager_test.go
internal/api/**/*.go

📄 CodeRabbit inference engine (AGENTS.md)

Chi HTTP router, JWT/JWKS middleware (from auth/), ingest/query/structured-query/SSE/schema/DLQ/policy/pipes handlers, Hub

Files:

  • internal/api/structured_query.go
  • internal/api/pipe_deps.go
  • internal/api/pipe_deps_test.go
  • internal/api/pipes.go
internal/ingest/**/*.go

📄 CodeRabbit inference engine (AGENTS.md)

Ingest worker pipeline (worker.go): JetStream input → per-table batch INSERT with DLQ output. The pipeline is insert-only. Wire format EventMessage carries {table_name, scope, received_timestamp, data} and nothing else

Files:

  • internal/ingest/worker.go
  • internal/ingest/worker_test.go
internal/pipes/**/*.go

📄 CodeRabbit inference engine (AGENTS.md)

Named query pipes: NamedQuery type + NATS KV store (WAVEHOUSE_PIPES) + .sql file bootstrap. Pre-defined SQL templates with param binding + caching; per-pipe allowed_roles is the only execute-path gate via policy.RoleAllowed

Files:

  • internal/pipes/pipes.go
  • internal/pipes/dummybind_test.go
**/*_test.go

📄 CodeRabbit inference engine (AGENTS.md)

**/*_test.go: Use table-driven tests with tests := []struct{ name string; ... } and t.Run(tt.name, ...)
Use shared mocks from internal/testutil/ (MockPublisher, MockCache, MockDeduplicator, MockSubscriber) instead of creating ad-hoc mocks
Use testutil.MakeJWT(t, claims) and testutil.MakeExpiredJWT(t, claims) for auth tests
Use testutil.NewTestSchemaRegistry(tables) or discovery.NewSchemaRegistryFromMap(tables) for schema-aware tests
Use policy.NewMemoryStore(p) for in-memory policy testing without NATS
Use pipes.NewMemoryStore(queries...) for in-memory pipes testing without NATS
Use testutil.AssertJSONResponse(t, rec, status, expected) and testutil.AssertJSONContains(t, rec, status, substring) for response assertions

Files:

  • internal/pipes/dummybind_test.go
  • tests/integration/pipe_deps_test.go
  • internal/ingest/worker_test.go
  • internal/api/pipe_deps_test.go
  • internal/cache/local_test.go
  • internal/cache/version_manager_test.go
tests/integration/**/*_test.go

📄 CodeRabbit inference engine (AGENTS.md)

Go integration tests (//go:build integration; ClickHouse testcontainer); run via make test-integration

Files:

  • tests/integration/pipe_deps_test.go
internal/cache/**/*.go

📄 CodeRabbit inference engine (AGENTS.md)

Implement Cache interface → LocalCache (Ristretto) + SharedCache (TBD) + TieredCache (singleflight)

Files:

  • internal/cache/local.go
  • internal/cache/local_test.go
  • internal/cache/cache.go
  • internal/cache/version_manager.go
  • internal/cache/version_manager_test.go
🧠 Learnings (5)
📚 Learning: 2026-06-10T15:01:09.027Z
Learnt from: EricAndrechek
Repo: Wave-RF/WaveHouse PR: 312
File: docs/src/content/docs/development.md:0-0
Timestamp: 2026-06-10T15:01:09.027Z
Learning: In this repo’s Markdown review (all .md files), do not flag capitalization/style issues for literal paths starting with ".github/" (or any substring that is a path beginning with ".github/"). Treat ".github" as the correct lowercase dotfile directory name, even when it appears inside prose or code spans; automated checks such as LanguageTool’s "(GITHUB)" rule commonly produce false positives for this literal filesystem path.

Applied to files:

  • CHANGELOG.md
📚 Learning: 2026-05-25T11:24:21.130Z
Learnt from: EricAndrechek
Repo: Wave-RF/WaveHouse PR: 180
File: internal/cache/local.go:0-0
Timestamp: 2026-05-25T11:24:21.130Z
Learning: In WaveHouse’s cache packages (e.g., internal/cache/local.go), it’s acceptable to define package-level `var` constants that hold immutable OpenTelemetry metric attribute sets / `metric.MeasurementOption` values (for example: `cacheL1Attrs = metric.WithAttributes(attribute.String("tier","L1"))`). Treat these as stateless, pre-allocated option values (analogous to `regexp.MustCompile(...)`), not mutable global state. When applying the AGENTS.md “no global state / constructor injection” guideline, apply it to application dependencies (e.g., Cache, Publisher, Deduplicator) rather than to these immutable OTel attribute/measurement option variables—do not flag them as constructor-injection violations.

Applied to files:

  • internal/cache/local.go
  • internal/cache/local_test.go
  • internal/cache/cache.go
  • internal/cache/version_manager.go
  • internal/cache/version_manager_test.go
📚 Learning: 2026-05-20T01:02:00.784Z
Learnt from: EricAndrechek
Repo: Wave-RF/WaveHouse PR: 164
File: internal/api/router_test.go:289-350
Timestamp: 2026-05-20T01:02:00.784Z
Learning: In WaveHouse’s internal API tests (files matching internal/api/**/*_test.go), follow the existing separation-of-concerns convention for testing the RequireRole middleware: inject `ContextKeyRole` directly into the request `context.Context` instead of using `testutil.MakeJWT`/JWT-driven flows. Do not refactor role-gate tests to use JWT tokens—JWT parsing and token handling are covered separately in `middleware_test.go` (the dedicated JWT parsing tests), and mixing those concerns would expand the failure surface and reduce isolation.

Applied to files:

  • internal/api/pipe_deps_test.go
📚 Learning: 2026-05-23T01:23:59.268Z
Learnt from: EricAndrechek
Repo: Wave-RF/WaveHouse PR: 174
File: internal/api/ingest_test.go:111-111
Timestamp: 2026-05-23T01:23:59.268Z
Learning: In WaveHouse Go tests in internal/api/**/*_test.go, use internal/testutil.AssertJSONErrorResponse(t, w) for HTTP error-path JSON assertions. Do not use (or reintroduce) package-local assertJSONErrorResponse helpers. AssertJSONErrorResponse verifies the response Content-Type is application/json, includes the X-Content-Type-Options: nosniff header, and that the JSON body contains an "error" field.

Applied to files:

  • internal/api/pipe_deps_test.go
📚 Learning: 2026-05-20T20:30:15.808Z
Learnt from: taitelee
Repo: Wave-RF/WaveHouse PR: 172
File: internal/api/pipes_test.go:106-118
Timestamp: 2026-05-20T20:30:15.808Z
Learning: For WaveHouse pipes authorization allowlist checks, fix the empty-role fail-open behavior by (1) removing any outer guard that prevents allowlist evaluation when the incoming `role` is `""` (e.g., don’t short-circuit with `if role != "" { ... }`), and (2) during allowlist scanning, ensure only non-empty allowlist entries can match—e.g., require `ar != "" && ar == role` (so a malformed allowlist like `["" ]` cannot grant access to an empty incoming role via `"" == ""`).

Applied to files:

  • internal/api/pipes.go
🔇 Additional comments (30)
internal/cache/cache.go (1)

8-32: LGTM!

internal/cache/version_manager.go (3)

16-32: LGTM!


34-51: LGTM!


76-94: LGTM!

internal/cache/version_manager_test.go (1)

9-72: LGTM!

internal/cache/local.go (1)

31-75: LGTM!

internal/cache/local_test.go (1)

12-161: LGTM!

internal/api/structured_query.go (2)

129-143: LGTM!


172-174: LGTM!

internal/testutil/mocks.go (1)

117-136: LGTM!

internal/ingest/worker_test.go (4)

224-232: LGTM!


449-543: LGTM!


704-731: LGTM!


733-764: LGTM!

internal/ingest/worker.go (1)

461-509: LGTM!

internal/pipes/pipes.go (2)

29-37: LGTM!


204-243: LGTM!

internal/api/pipe_deps.go (7)

28-35: LGTM!


43-56: LGTM!


64-83: LGTM!


90-103: LGTM!


134-153: LGTM!


196-205: LGTM!


179-187: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Incorrect escape sequence handling for identifiers with both backslashes and backticks.

Lines 183-184 unescape ClickHouse backtick-quoted identifiers in the wrong order, causing incorrect results when an identifier contains both a literal backslash and a literal backtick. ClickHouse uses \\ for a backslash and \` for a backtick inside backtick-quoted identifiers.

Example: The ClickHouse identifier a\`b (a, backslash, backtick, b) is written in SQL as `a\\\`b` (where \\ → \ and \` → `). After stripping the outer backticks, the string is a\\\`b. The current code:

  1. Line 183 replaces \` with `: a\\\`b → a\\`b
  2. Line 184 tries to replace \\\\ with \\: no match (only two backslashes remain)
  3. Result: a\\`b (wrong; expected a\`b)

Sequential ReplaceAll is unsafe here because the replacements can interact. The correct approach is to process escape sequences in a single pass:

  • Iterate byte-by-byte; when \ is seen, check the next byte:
    • If \, output one \ and advance two bytes
    • If `, output one ` and advance two bytes
    • Otherwise, output \ (or handle as an error)
🔧 Proposed fix for correct escape handling
 func unquoteIdent(s string) string {
 	s = strings.TrimSpace(s)
 	if len(s) >= 2 && s[0] == '`' && s[len(s)-1] == '`' {
-		s = s[1 : len(s)-1]
-		s = strings.ReplaceAll(s, "\\`", "`")
-		s = strings.ReplaceAll(s, "\\\\", "\\")
+		s = s[1 : len(s)-1] // strip outer backticks
+		// Unescape in a single pass to avoid interaction between \\ and \`
+		var out strings.Builder
+		for i := 0; i < len(s); i++ {
+			if s[i] == '\\' && i+1 < len(s) {
+				next := s[i+1]
+				if next == '\\' || next == '`' {
+					out.WriteByte(next)
+					i++ // skip the next byte (already consumed)
+					continue
+				}
+			}
+			out.WriteByte(s[i])
+		}
+		s = out.String()
 	}
 	return s
 }
> Likely an incorrect or invalid review comment.
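
One way to check the claim is to run both variants on the example input. The sketch below is an assumption-laden reproduction: `unquoteSequential` mirrors the two ReplaceAll calls on the removed lines of the diff, `unquoteSinglePass` mirrors the proposed loop, and both function names are hypothetical. Note that the Go literal `"\\\\"` denotes two backslash bytes, so the second ReplaceAll does match when two backslashes remain.

```go
package main

import (
	"fmt"
	"strings"
)

// unquoteSequential mirrors the two ReplaceAll calls from the removed diff lines,
// applied to an identifier body whose outer backticks are already stripped.
func unquoteSequential(s string) string {
	s = strings.ReplaceAll(s, "\\`", "`")     // \` becomes `
	return strings.ReplaceAll(s, "\\\\", "\\") // \\ becomes \
}

// unquoteSinglePass mirrors the loop in the proposed fix.
func unquoteSinglePass(s string) string {
	var out strings.Builder
	for i := 0; i < len(s); i++ {
		if s[i] == '\\' && i+1 < len(s) && (s[i+1] == '\\' || s[i+1] == '`') {
			i++ // drop the escaping backslash, keep the escaped byte
		}
		out.WriteByte(s[i])
	}
	return out.String()
}

func main() {
	// Bytes: a \ \ \ ` b, i.e. the escaped form of the identifier a\`b.
	body := "a\\\\\\`b"
	fmt.Printf("sequential:  %q\n", unquoteSequential(body))
	fmt.Printf("single pass: %q\n", unquoteSinglePass(body))
}
```

On this particular input both variants return the same identifier (a, backslash, backtick, b), which is consistent with the note that the comment is likely invalid. The sketch says nothing about malformed input, where the two may still differ.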
internal/api/pipes.go (3)

27-33: LGTM!


76-80: LGTM!


155-193: LGTM!

cmd/wavehouse/main.go (1)

367-372: LGTM!

internal/pipes/dummybind_test.go (1)

1-63: LGTM!

internal/api/pipe_deps_test.go (1)

1-192: LGTM!

Comment thread CHANGELOG.md Outdated
Comment thread internal/api/pipe_deps.go Outdated
Comment thread internal/cache/version_manager.go
Comment thread tests/integration/pipe_deps_test.go Outdated
@github-project-automation github-project-automation Bot moved this from Backlog to In review in WaveHouse Task Board Jun 11, 2026
coderabbitai[bot]
coderabbitai Bot previously approved these changes Jun 14, 2026
coderabbitai[bot]
coderabbitai Bot previously approved these changes Jun 14, 2026
@github-code-quality

github-code-quality Bot commented Jun 14, 2026

Contributor

Code Coverage Overview

Languages: Go

Go

The overall coverage in the branch is 89%. The coverage in the branch is 90%.

Show a code coverage summary of the most impacted files.
| File | 725fdee | 09a37ee | +/- |
| --- | --- | --- | --- |
| internal/query/ident.go | 100% | 0% | -100% |
| internal/discov...ry/discovery.go | 98% | 93% | -5% |
| internal/cache/...sion_manager.go | 100% | 96% | -4% |
| internal/cache/local.go | 91% | 90% | -1% |
| cmd/wavehouse/main.go | 69% | 69% | 0% |
| internal/pipes/pipes.go | 83% | 83% | 0% |
| internal/chsql/chsql.go | 100% | 100% | 0% |
| internal/api/pipes.go | 86% | 89% | +3% |
| internal/discovery/explain.go | 0% | 72% | +72% |
| internal/chsql/nats.go | 0% | 100% | +100% |

Updated June 26, 2026 21:03 UTC

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
internal/pipes/pipes.go (1)

263-271: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Return an array-shaped dummy for declared array parameters.

ParamDef.Type documents "array", but dummyForType falls through to "x". A pipe like WHERE id IN {{ids}} dummy-binds as IN 'x', making EXPLAIN fail and leaving ResolvedTables nil/TTL-only for a common filtered-pipe shape.

Proposed fix
 func dummyForType(t string) any {
 	switch strings.ToLower(t) {
 	case "number":
 		return 0
 	case "boolean":
 		return false
+	case "array":
+		return []any{nil}
 	default: // "string" and anything unrecognized
 		return "x"
 	}
 }
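
A quick way to see the effect of the proposed case is to print the Go type each declared parameter type maps to. This sketch copies `dummyForType` from the diff above with the `"array"` case applied; how the real binder renders `[]any{nil}` into SQL is not shown in this thread and is not assumed here.

```go
package main

import (
	"fmt"
	"strings"
)

// dummyForType follows the proposed diff; only the "array" case is new.
func dummyForType(t string) any {
	switch strings.ToLower(t) {
	case "number":
		return 0
	case "boolean":
		return false
	case "array":
		return []any{nil}
	default: // "string" and anything unrecognized
		return "x"
	}
}

func main() {
	for _, typ := range []string{"number", "boolean", "array", "string", "uuid"} {
		fmt.Printf("%-8s -> %T\n", typ, dummyForType(typ))
	}
}
```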
internal/api/pipes.go (1)

27-33: 🛠️ Refactor suggestion | 🟠 Major | 🏗️ Heavy lift

Pass dependency-resolution collaborators through the constructor.

Registry and Database are required for the new invalidation behavior, but they are optional mutable fields after NewPipesHandler, so any missed call site silently falls back to TTL-only caching. Prefer constructor args or an options struct that initializes them with the rest of the handler dependencies.

As per coding guidelines, “No global state: Dependencies are passed explicitly (constructor injection)”.

Also applies to: 47-48

Source: Coding guidelines
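
A minimal sketch of the options-struct variant, assuming hypothetical names throughout: `TableRegistry`, `PipesHandlerOptions`, and this `NewPipesHandler` signature are illustrative and do not reflect the handler's real dependencies. It keeps the TTL-only fallback when nothing is wired, but rejects a half-wired handler at construction time instead of degrading silently.

```go
package main

import (
	"errors"
	"fmt"
)

// TableRegistry is a hypothetical stand-in for the schema registry.
type TableRegistry interface {
	Known(table string) bool
}

// PipesHandlerOptions groups the dependency-resolution collaborators (hypothetical).
type PipesHandlerOptions struct {
	Registry TableRegistry
	Database string
}

// PipesHandler keeps its collaborators unexported so they cannot be mutated later.
type PipesHandler struct {
	registry TableRegistry
	database string
}

// NewPipesHandler accepts either both collaborators or neither.
func NewPipesHandler(opts PipesHandlerOptions) (*PipesHandler, error) {
	if (opts.Registry == nil) != (opts.Database == "") {
		return nil, errors.New("pipes handler: Registry and Database must be set together")
	}
	return &PipesHandler{registry: opts.Registry, database: opts.Database}, nil
}

// ResolvesDeps reports whether dependency-based invalidation is wired.
func (h *PipesHandler) ResolvesDeps() bool { return h.registry != nil }

// mapRegistry is a tiny in-memory registry used only for this example.
type mapRegistry map[string]bool

func (m mapRegistry) Known(table string) bool { return m[table] }

func main() {
	h, err := NewPipesHandler(PipesHandlerOptions{
		Registry: mapRegistry{"events": true},
		Database: "default",
	})
	fmt.Println(h.ResolvesDeps(), err)

	_, err = NewPipesHandler(PipesHandlerOptions{Database: "default"})
	fmt.Println(err)
}
```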


ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro Plus

Run ID: bf950fb1-b7db-47a4-975e-0611d95c9907

📥 Commits

Reviewing files that changed from the base of the PR and between 038a205 and b1e4bcc.

📒 Files selected for processing (6)
  • CHANGELOG.md
  • cmd/wavehouse/main.go
  • internal/api/pipe_deps.go
  • internal/api/pipes.go
  • internal/pipes/pipes.go
  • tests/integration/pipe_deps_test.go
📜 Review details
⏰ Context from checks skipped due to timeout of 300000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: Coverage
  • GitHub Check: Integration tests
  • GitHub Check: Unit tests
  • GitHub Check: E2E tests
🧰 Additional context used
📓 Path-based instructions (5)
**/*.go

📄 CodeRabbit inference engine (AGENTS.md)

**/*.go: Use Go 1.26 with strict formatting enforced by gofumpt
Use structured logging with log/slog (JSON handler)
Use Chi v5 for HTTP routing
Return errors, don't panic. Wrap with fmt.Errorf("context: %w", err)
Use package naming: lowercase, single word (or abbreviated). internal/ enforces module privacy
No global state: Dependencies are passed explicitly (constructor injection)
Comment the why, not the what. Add a comment only when the reason isn't obvious from the code; a line that matches the surrounding pattern needs none. Keep comments to 1–2 lines
DRY — one source of truth. Before adding logic, look for an existing helper, type, or constant to reuse; before duplicating a rule, factor it into one place every caller reads
Leave it neater than you found it — within reason. Fix small, safe things in passing: a stale comment, an obvious typo, a misnamed local, dead code on your path

Files:

  • cmd/wavehouse/main.go
  • internal/pipes/pipes.go
  • internal/api/pipe_deps.go
  • internal/api/pipes.go
  • tests/integration/pipe_deps_test.go
internal/pipes/**/*.go

📄 CodeRabbit inference engine (AGENTS.md)

Named query pipes: NamedQuery type + NATS KV store (WAVEHOUSE_PIPES) + .sql file bootstrap. Pre-defined SQL templates with param binding + caching; per-pipe allowed_roles is the only execute-path gate via policy.RoleAllowed

Files:

  • internal/pipes/pipes.go
internal/api/**/*.go

📄 CodeRabbit inference engine (AGENTS.md)

Chi HTTP router, JWT/JWKS middleware (from auth/), ingest/query/structured-query/SSE/schema/DLQ/policy/pipes handlers, Hub

Files:

  • internal/api/pipe_deps.go
  • internal/api/pipes.go
**/*_test.go

📄 CodeRabbit inference engine (AGENTS.md)

**/*_test.go: Use table-driven tests with tests := []struct{ name string; ... } and t.Run(tt.name, ...)
Use shared mocks from internal/testutil/ (MockPublisher, MockCache, MockDeduplicator, MockSubscriber) instead of creating ad-hoc mocks
Use testutil.MakeJWT(t, claims) and testutil.MakeExpiredJWT(t, claims) for auth tests
Use testutil.NewTestSchemaRegistry(tables) or discovery.NewSchemaRegistryFromMap(tables) for schema-aware tests
Use policy.NewMemoryStore(p) for in-memory policy testing without NATS
Use pipes.NewMemoryStore(queries...) for in-memory pipes testing without NATS
Use testutil.AssertJSONResponse(t, rec, status, expected) and testutil.AssertJSONContains(t, rec, status, substring) for response assertions

Files:

  • tests/integration/pipe_deps_test.go
tests/integration/**/*_test.go

📄 CodeRabbit inference engine (AGENTS.md)

Go integration tests (//go:build integration; ClickHouse testcontainer); run via make test-integration

Files:

  • tests/integration/pipe_deps_test.go
🧠 Learnings (2)
📚 Learning: 2026-05-20T20:30:15.808Z
Learnt from: taitelee
Repo: Wave-RF/WaveHouse PR: 172
File: internal/api/pipes_test.go:106-118
Timestamp: 2026-05-20T20:30:15.808Z
Learning: For WaveHouse pipes authorization allowlist checks, fix the empty-role fail-open behavior by (1) removing any outer guard that prevents allowlist evaluation when the incoming `role` is `""` (e.g., don’t short-circuit with `if role != "" { ... }`), and (2) during allowlist scanning, ensure only non-empty allowlist entries can match—e.g., require `ar != "" && ar == role` (so a malformed allowlist like `["" ]` cannot grant access to an empty incoming role via `"" == ""`).

Applied to files:

  • internal/api/pipes.go
📚 Learning: 2026-06-10T15:01:09.027Z
Learnt from: EricAndrechek
Repo: Wave-RF/WaveHouse PR: 312
File: docs/src/content/docs/development.md:0-0
Timestamp: 2026-06-10T15:01:09.027Z
Learning: In this repo’s Markdown review (all .md files), do not flag capitalization/style issues for literal paths starting with ".github/" (or any substring that is a path beginning with ".github/"). Treat ".github" as the correct lowercase dotfile directory name, even when it appears inside prose or code spans; automated checks such as LanguageTool’s "(GITHUB)" rule commonly produce false positives for this literal filesystem path.

Applied to files:

  • CHANGELOG.md
🔇 Additional comments (6)
internal/pipes/pipes.go (1)

29-37: LGTM!

Also applies to: 42-48, 152-165, 172-225, 274-355, 381-384

internal/api/pipe_deps.go (1)

16-68: LGTM!

Also applies to: 70-82, 112-297

internal/api/pipes.go (1)

39-44: LGTM!

Also applies to: 77-97, 157-178, 187-225

cmd/wavehouse/main.go (2)

391-391: LGTM!


107-117: The OTLP endpoint migration is complete and correct. cfg.OTel.Addr has been fully removed from the codebase (no references remain except a comment in the test fixture explicitly noting its absence). The config.yaml, documentation, and code consistently use the standard OTEL_EXPORTER_OTLP_ENDPOINT env var; no config-only deployments can silently fall back to a stale field.

tests/integration/pipe_deps_test.go (1)

145-250: LGTM!

Comment thread CHANGELOG.md Outdated
Comment thread internal/api/pipe_deps.go Outdated

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 1

♻️ Duplicate comments (1)
internal/api/pipe_deps.go (1)

92-112: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Registry-known views can still survive as pipe dependencies. The resolver expands views, but it also keeps the original ref before expansion; the new test only checks that the base table is present, so it would not catch [base, view].

  • internal/api/pipe_deps.go#L92-L112: append r only on the non-view path, and continue after appending recursively expanded dependencies.
  • tests/integration/pipe_deps_test.go#L161-L165: assert exact resolved dependencies, or at least assert the view name is absent.
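
The requested behavior can be sketched as a recursive walk that records a name only when it is not a view. Everything here is hypothetical: `views` is an in-memory stand-in for looking up a view's SELECT in ClickHouse, and `resolveBase` is not the PR's collectReadTables.

```go
package main

import (
	"fmt"
	"sort"
)

// views maps a view name to the relations its SELECT reads.
// Any name absent from the map is treated as a base table.
type views map[string][]string

// resolveBase expands views recursively and returns only base tables, sorted.
func resolveBase(refs []string, vs views) []string {
	seen := map[string]bool{} // guards against cycles and repeated expansion
	base := map[string]bool{}
	var walk func(name string)
	walk = func(name string) {
		if seen[name] {
			return
		}
		seen[name] = true
		if children, ok := vs[name]; ok {
			for _, c := range children {
				walk(c)
			}
			return // a view is expanded, never recorded as a dependency
		}
		base[name] = true
	}
	for _, r := range refs {
		walk(r)
	}
	out := make([]string, 0, len(base))
	for t := range base {
		out = append(out, t)
	}
	sort.Strings(out)
	return out
}

func main() {
	vs := views{
		"v_daily": {"events"},
		"v_top":   {"v_daily", "users"},
	}
	fmt.Println(resolveBase([]string{"v_top"}, vs))
}
```

An exact-match assertion on the returned slice, rather than a contains check, is what would catch a `[base, view]` result.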

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro Plus

Run ID: db06d93b-bf91-4b2f-b8bb-2bd9b91e4f57

📥 Commits

Reviewing files that changed from the base of the PR and between b1e4bcc and 394420d.

📒 Files selected for processing (3)
  • CHANGELOG.md
  • internal/api/pipe_deps.go
  • tests/integration/pipe_deps_test.go
📜 Review details
⏰ Context from checks skipped due to timeout of 300000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: Coverage
  • GitHub Check: E2E tests
  • GitHub Check: Analyze (go)
  • GitHub Check: Lint
🧰 Additional context used
📓 Path-based instructions (4)
**/*.go

📄 CodeRabbit inference engine (AGENTS.md)

**/*.go: Use Go 1.26 with strict formatting enforced by gofumpt
Use structured logging with log/slog (JSON handler)
Use Chi v5 for HTTP routing
Return errors, don't panic. Wrap with fmt.Errorf("context: %w", err)
Use package naming: lowercase, single word (or abbreviated). internal/ enforces module privacy
No global state: Dependencies are passed explicitly (constructor injection)
Comment the why, not the what. Add a comment only when the reason isn't obvious from the code; a line that matches the surrounding pattern needs none. Keep comments to 1–2 lines
DRY — one source of truth. Before adding logic, look for an existing helper, type, or constant to reuse; before duplicating a rule, factor it into one place every caller reads
Leave it neater than you found it — within reason. Fix small, safe things in passing: a stale comment, an obvious typo, a misnamed local, dead code on your path

Files:

  • tests/integration/pipe_deps_test.go
  • internal/api/pipe_deps.go
**/*_test.go

📄 CodeRabbit inference engine (AGENTS.md)

**/*_test.go: Use table-driven tests with tests := []struct{ name string; ... } and t.Run(tt.name, ...)
Use shared mocks from internal/testutil/ (MockPublisher, MockCache, MockDeduplicator, MockSubscriber) instead of creating ad-hoc mocks
Use testutil.MakeJWT(t, claims) and testutil.MakeExpiredJWT(t, claims) for auth tests
Use testutil.NewTestSchemaRegistry(tables) or discovery.NewSchemaRegistryFromMap(tables) for schema-aware tests
Use policy.NewMemoryStore(p) for in-memory policy testing without NATS
Use pipes.NewMemoryStore(queries...) for in-memory pipes testing without NATS
Use testutil.AssertJSONResponse(t, rec, status, expected) and testutil.AssertJSONContains(t, rec, status, substring) for response assertions

Files:

  • tests/integration/pipe_deps_test.go
tests/integration/**/*_test.go

📄 CodeRabbit inference engine (AGENTS.md)

Go integration tests (//go:build integration; ClickHouse testcontainer); run via make test-integration

Files:

  • tests/integration/pipe_deps_test.go
internal/api/**/*.go

📄 CodeRabbit inference engine (AGENTS.md)

Chi HTTP router, JWT/JWKS middleware (from auth/), ingest/query/structured-query/SSE/schema/DLQ/policy/pipes handlers, Hub

Files:

  • internal/api/pipe_deps.go
🧠 Learnings (1)
📚 Learning: 2026-06-10T15:01:09.027Z
Learnt from: EricAndrechek
Repo: Wave-RF/WaveHouse PR: 312
File: docs/src/content/docs/development.md:0-0
Timestamp: 2026-06-10T15:01:09.027Z
Learning: In this repo’s Markdown review (all .md files), do not flag capitalization/style issues for literal paths starting with ".github/" (or any substring that is a path beginning with ".github/"). Treat ".github" as the correct lowercase dotfile directory name, even when it appears inside prose or code spans; automated checks such as LanguageTool’s "(GITHUB)" rule commonly produce false positives for this literal filesystem path.

Applied to files:

  • CHANGELOG.md
🔇 Additional comments (1)
CHANGELOG.md (1)

18-18: LGTM!

Also applies to: 63-64

Comment thread tests/integration/pipe_deps_test.go Outdated
coderabbitai[bot]
coderabbitai Bot previously approved these changes Jun 16, 2026
@github-actions github-actions Bot added documentation Improvements or additions to documentation area/sdk TypeScript SDK (clients/ts/) labels Jun 16, 2026
@github-actions

github-actions Bot commented Jun 16, 2026

📚 Docs preview is live: https://d26ce7f2-wavehouse-docs.wave-rf.workers.dev

  • Commit 09a37ee: Merge remote-tracking branch 'origin/main' into pipe-cache-invalidation
  • Author: @taitelee
  • Committed — 2026-06-26 16:58 (UTC-04:00)
  • Deployed — 2026-06-26 17:02 EDT

coderabbitai[bot]
coderabbitai Bot previously approved these changes Jun 16, 2026
@EricAndrechek

Member

Not sure this assertion in your summary is accurate, need to think about it some more but pretty sure the parameters could influence the table set used...

(the table set depends only on the SQL, not on per-request parameter values)

Comment thread docs/src/content/docs/architecture.md Outdated
Comment thread internal/api/pipe_deps.go Outdated
Comment thread internal/api/pipe_deps.go Outdated
Comment thread internal/api/pipe_deps.go Outdated
Comment on lines +70 to +72
// maxViewExpansions backstops view→view expansion against a pathological chain;
// the visited set already prevents cycles, this just bounds total work.
const maxViewExpansions = 32

Member

Fine, but I'd like this documented, and logged more clearly when this backstop fires, so an operator setting up WaveHouse & Pipes knows it has happened and can debug their schemas to see what happened and why.

Member

Actually, update: I think that, as implemented now, when this limit is hit we return nil and throw away everything collected up to that point; is that right? Per my reasoning that we should still collect dependencies on a best-effort basis (so we can invalidate before the TTL expires when a known dependency changes, even if we don't have ALL the dependencies and thus can't outlive the TTL), we would need to return what had been collected here and not just nil.
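
One way the best-effort behavior could look, as a sketch under assumptions: `expandBounded` and the `views` map are hypothetical, and only the `maxViewExpansions` constant comes from the diff. When the budget runs out it logs a warning through `log/slog` and returns the partial set together with a `complete` flag, rather than nil:

```go
package main

import (
	"fmt"
	"log/slog"
)

const maxViewExpansions = 32

// expandBounded walks view-to-view references until the budget runs out. It
// returns what it has collected so far plus a flag saying whether the set is
// complete. views is a hypothetical map from view name to the names it reads.
func expandBounded(refs []string, views map[string][]string, budget int) (deps []string, complete bool) {
	visited := make(map[string]bool)
	queue := append([]string(nil), refs...)
	expansions := 0

	for len(queue) > 0 {
		name := queue[0]
		queue = queue[1:]
		if visited[name] {
			continue
		}
		visited[name] = true

		children, isView := views[name]
		if !isView {
			deps = append(deps, name)
			continue
		}
		if expansions >= budget {
			// Surface the backstop to operators and keep the partial result.
			slog.Warn("pipe dependency resolution hit the view-expansion limit; keeping partial result",
				"limit", budget, "unexpanded_view", name, "collected", len(deps))
			return deps, false
		}
		expansions++
		queue = append(queue, children...)
	}
	return deps, true
}

func main() {
	views := map[string][]string{"v1": {"events", "v2"}, "v2": {"users"}}

	deps, complete := expandBounded([]string{"v1"}, views, 1)
	fmt.Println(deps, complete) // [events] false

	deps, complete = expandBounded([]string{"v1"}, views, maxViewExpansions)
	fmt.Println(deps, complete) // [events users] true
}
```

The `complete` flag is the piece that would matter later: a partial set is enough to invalidate early on a known write, while only a complete set could justify outliving the TTL.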

Comment thread internal/api/pipe_deps.go Outdated
Comment on lines +86 to +89
refs, err := explainQueryTreeTables(ctx, conn, sql)
if err != nil {
	return nil
}

Member

Do we not want/need error logging here? And is returning nil valid compared to an empty list or whatever? I notice the code that takes this result as input doesn't check for nil return values and just loops over it, so is this handled properly in said cases? Do we have test cases to cover this?
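
On the nil-versus-empty question, a small standalone sketch of the general Go behavior (not the PR's code; `countDeps` and `jsonOf` are illustrative helpers). Ranging over a nil slice is safe and runs zero iterations, but a nil slice and an empty slice differ when compared with nil and when JSON-encoded, which may matter if the resolved tables are serialized with the pipe:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// countDeps loops over deps the way a caller that never checks for nil would.
func countDeps(deps []string) int {
	n := 0
	for range deps {
		n++
	}
	return n
}

// jsonOf returns the JSON encoding of a dependency list.
func jsonOf(deps []string) string {
	b, err := json.Marshal(deps)
	if err != nil {
		return "error: " + err.Error()
	}
	return string(b)
}

func main() {
	var nilDeps []string    // what a bare `return nil` yields
	emptyDeps := []string{} // an explicit empty list

	fmt.Println(countDeps(nilDeps), countDeps(emptyDeps)) // 0 0
	fmt.Println(nilDeps == nil, emptyDeps == nil)         // true false
	fmt.Println(jsonOf(nilDeps), jsonOf(emptyDeps))       // null []
}
```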

Comment thread internal/api/pipe_deps.go Outdated
Comment thread internal/api/pipe_deps.go Outdated

@EricAndrechek EricAndrechek left a comment

Member

Ok, my notes:

  • What did we decide about the pipes being read-only vs not, and where/how to document and enforce that if at all? I don't remember the consensus we came to but I didn't see anything about it in this PR, unless that was in another one and I'm just losing my mind?
  • We need stored and bootstrapped tables/registries to re-resolve their dependencies on boot. Right now this only happens on Put and (as commented in this review) it needs to happen on bootstrap too. But what happens in the case where the pipe is Put and resolved and saved, and then WaveHouse is restarted? Does it persist with the old dependency chain it had? What happens if the schemas changed then during that time? That's why we have the persistent schema refresh interval and an API endpoint to force a schema discovery update – should this update the pipe dependencies too?
  • In cases where we can't determine all the dependencies, whether it's because another database is queried, we use an external S3 resource or URL, we nest too deeply, etc – I think we should still collect all the dependencies we DO have and know of. This way, if the TTL gets set to, say, 1 minute, but a KNOWN dependency is updated, we can still invalidate immediately. The only case/reason we NEED all the dependencies to be known is when we want to outlast the TTL, which we don't currently support anyway (but should, we need to open a new issue to track this!)
  • We likely eventually will want to support more than just the one database, which would break this current implementation (among other things, which is why I'm not gating the PR being accepted on this working, since support for that would require its own dedicated PR) – but it has made me realize, I think our NATS safe subject stuff, caching keys, and this dependency chain logic, will all fall apart when that is added/supported, since we don't even have a way for NATS to include the database name along with the table... We should make a two-part issue for this, to add in the support for multiple databases (and tracking their dependencies, inserting into them, etc) and to make sure caching, dependency calculations, and NATS subjects, URL param titles (like for streams), policy file configs, etc all still work with databases added on...
  • This PR made me realize: do we properly apply the policy roles on pipes, or do those intentionally not apply to pipes? Regardless, I feel like the path we've chosen should be clearer and explicitly documented...
  • The biggest thing: we NEED to have per-role, claim, parameter, etc caching and isolation, including potentially on the dependency path. Resolving deps once at Put() with dummy params can't express a table set that varies by parameter value. Example:
SELECT user_id, count() FROM (
  SELECT user_id FROM web_events    WHERE {{source}} = 'web'
  UNION ALL
  SELECT user_id FROM mobile_events WHERE {{source}} = 'mobile'
) GROUP BY user_id

?source=web genuinely depends only on web_events; ?source=mobile only on mobile_events. The dummy-bind resolver gets neither right — it over-resolves to both tables (every entry invalidated by either write) or, if EXPLAIN folds the constant-false dummy branches, resolves to an empty set (silent TTL-only). Since the cache is keyed per (sql, params), the correct dep set is per-entry and differs by parameter.
Some viable directions: (a) resolve at cache-miss time on the real bound SQL (synchronous EXPLAIN, deps known before Set, no re-keying); (b) async system.query_log.tables after flush_interval_milliseconds (≈7.5s) — more precise and gives base-table/view expansion for free, but the flush delay outlives most TTLs, requires re-keying the version-folded cache key (deps must be known at Get time, but aren't until after execution), and adds a staleness window. Notice though that none of these solve the scope stuff, which we also need to consider then too...
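
To make direction (a) concrete, here is a rough sketch assuming the dependency set is resolved per entry from the real bound SQL and that each table carries a version counter bumped on ingest. `cacheKey`, the `versions` map, and the key encoding are all hypothetical, not the encoding the ingest worker uses:

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"sort"
	"strings"
)

// cacheKey folds the bound SQL and the current version of every dependency
// table into one key. Bumping any dependency's version changes the key, so the
// old entry is never read again.
func cacheKey(boundSQL string, deps []string, versions map[string]uint64) string {
	sorted := append([]string(nil), deps...)
	sort.Strings(sorted) // key must not depend on resolution order

	var b strings.Builder
	b.WriteString(boundSQL)
	for _, t := range sorted {
		fmt.Fprintf(&b, "|%s@%d", t, versions[t])
	}
	sum := sha256.Sum256([]byte(b.String()))
	return hex.EncodeToString(sum[:])
}

func main() {
	versions := map[string]uint64{"web_events": 1, "mobile_events": 1}

	// Entry for ?source=web, whose per-entry dependency set is web_events only.
	webSQL := "SELECT user_id, count() FROM web_events GROUP BY user_id"
	webDeps := []string{"web_events"}
	before := cacheKey(webSQL, webDeps, versions)

	versions["mobile_events"]++ // ingest into an unrelated table
	fmt.Println(before == cacheKey(webSQL, webDeps, versions)) // true: entry survives

	versions["web_events"]++ // ingest into a real dependency
	fmt.Println(before == cacheKey(webSQL, webDeps, versions)) // false: entry invalidated
}
```

With a per-entry set, a write to `mobile_events` leaves the `?source=web` entry alone, which is the precision the dummy-bind resolver cannot provide. Role and claim scoping is deliberately left out of this sketch.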

Comment thread internal/api/pipe_deps.go Outdated
Comment thread internal/api/pipe_deps.go Outdated
Comment thread internal/api/pipe_deps.go Outdated
Comment thread internal/api/pipe_deps.go Outdated
Comment thread internal/api/pipe_deps.go Outdated
Comment thread internal/pipes/pipes.go Outdated
Comment thread internal/pipes/pipes.go Outdated
Comment on lines +384 to +386
// Pipes bootstrapped here are stored without table-dependency resolution (that runs
// in the API Put handler, which holds the ClickHouse connection), so they cache
// TTL-only until next saved via the API.

Member

Why? I don't like that... I think this should effectively act like a Put on bootup/start once ClickHouse is available and ready.

Contributor Author

Nicer than a boot-time Put now: resolution is lazy, so a .sql-bootstrapped pipe resolves on its first execution and re-resolves on every schema refresh (ClearResolvedDeps). No permanent TTL-only window. Worst case it's TTL-only until first run, then identical to an API-saved pipe. Would you still rather have pipes resolve at boot? Or is the lazy approach okay?
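
For readers following the thread, the lazy pattern described above could be sketched roughly as follows. `lazyDeps`, `Tables`, and `Clear` are hypothetical names (the PR's own hook is `ClearResolvedDeps`), and `resolve` stands in for the EXPLAIN-based resolver:

```go
package main

import (
	"fmt"
	"sync"
)

// lazyDeps caches a pipe's resolved tables and resolves them on first use.
// resolve is a hypothetical stand-in for the EXPLAIN-based resolver.
type lazyDeps struct {
	mu       sync.Mutex
	resolved bool
	tables   []string
	resolve  func() []string
}

// Tables returns the cached dependency set, resolving it on the first call.
// Holding the lock during resolve is a simplification for this sketch.
func (d *lazyDeps) Tables() []string {
	d.mu.Lock()
	defer d.mu.Unlock()
	if !d.resolved {
		d.tables = d.resolve()
		d.resolved = true
	}
	return d.tables
}

// Clear drops the cached set so the next Tables call re-resolves, for example
// after a schema refresh.
func (d *lazyDeps) Clear() {
	d.mu.Lock()
	defer d.mu.Unlock()
	d.resolved = false
	d.tables = nil
}

func main() {
	calls := 0
	d := &lazyDeps{resolve: func() []string { calls++; return []string{"events"} }}

	d.Tables()
	d.Tables()
	fmt.Println(calls) // 1: the second call hit the cache

	d.Clear() // simulated schema refresh
	d.Tables()
	fmt.Println(calls) // 2: re-resolved after the clear
}
```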

Comment on lines +88 to +89
case r >= 'A' && r <= 'Z':
	return r + ('a' - 'A')

Member

Why? What is the point of this, and how is it different from just including this range in the case above and doing a plain return r?
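
For context on the question, a self-contained sketch of an allowlist sanitizer shaped like the snippet under review. `sanitizeIdent` is a hypothetical name and the surrounding cases are assumed from the code comment in the diff. The uppercase branch differs from a plain `return r` in that it shifts the rune into the lowercase range, so merging it into the pass-through case would keep uppercase letters as-is:

```go
package main

import (
	"fmt"
	"strings"
)

// sanitizeIdent maps a test name onto [a-z0-9_]: uppercase ASCII is lowercased,
// allowed characters pass through, and everything else becomes '_'.
func sanitizeIdent(name string) string {
	return strings.Map(func(r rune) rune {
		switch {
		case r >= 'a' && r <= 'z', r >= '0' && r <= '9', r == '_':
			return r // already allowed: unchanged
		case r >= 'A' && r <= 'Z':
			return r + ('a' - 'A') // shift into the lowercase range
		default:
			return '_'
		}
	}, name)
}

func main() {
	fmt.Println(sanitizeIdent("TestPipe/View (nested)")) // testpipe_view__nested_
}
```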

Comment on lines +80 to +83
// Sanitize the test name into a valid CH identifier: lowercase, and replace every
// character that isn't a letter, digit, or underscore with '_'. An allowlist (not a
// denylist of a few known separators) so punctuation in a descriptive subtest name —
// commas, parens, colons — can't leak into the CREATE TABLE and cause a syntax error.

Member

I understand the desire/need for this change, but at the same time I wonder if we SHOULD even be restricting it at all... technically the identifiers can be pretty much anything as is, including backticks, slashes/backslashes, etc., so (especially given that this is for testing) it feels like it may be more productive/relevant to allow any/all characters and confirm it still all operates the same, no?

Was this change made because tests (whether old or newly added in this PR) started failing? If so, I'd like to see the cases that fail and investigate them further, as they feel like failures we should chase the root cause of and not just band-aid by limiting what table names can be used in our test cases.

Comment thread internal/api/pipe_deps.go Outdated
@github-actions github-actions Bot added area/ingest Ingest pipeline (Bento, batching, DLQ) area/query Structured query AST, SQL builder and removed area/sdk TypeScript SDK (clients/ts/) labels Jun 26, 2026
coderabbitai[bot]
coderabbitai Bot previously approved these changes Jun 26, 2026

Labels

  • area/api: HTTP handlers, routing, middleware
  • area/cache: Local / shared / tiered caching
  • area/docs: Documentation, site/, README
  • area/infra: CI, build, deploy, Docker, release
  • area/ingest: Ingest pipeline (Bento, batching, DLQ)
  • area/pipes: Named query pipes
  • area/query: Structured query AST, SQL builder
  • documentation: Improvements or additions to documentation
  • go: Pull requests that update go code

Projects

Status: In review

Development

Successfully merging this pull request may close these issues.

feat(cache): pipe cache invalidation via table/scope extraction from named queries

2 participants