Skip to content

[TKC-5244] fix: execution listing queries should be faster now#7384

Merged
de-wim merged 11 commits intomainfrom
TKC-5244-postgres-list-executions-slow
Mar 25, 2026
Merged

[TKC-5244] fix: execution listing queries should be faster now#7384
de-wim merged 11 commits intomainfrom
TKC-5244-postgres-list-executions-slow

Conversation

@de-wim
Copy link
Copy Markdown
Contributor

@de-wim de-wim commented Mar 20, 2026

Pull request description

Rewrote postgres execution listing queries, should be quicker.

Checklist (choose whats happened)

  • breaking change! (describe)
  • tested locally
  • tested on cluster
  • added new dependencies
  • updated the docs
  • added a test

Fixes

  • Old queries timed out (~18s for listing the executions of an organization)

de-wim added 4 commits March 20, 2026 21:37
test seemed to just validate generated code matched certain regexes, what's the point?
@de-wim de-wim marked this pull request as ready for review March 23, 2026 16:27
@de-wim de-wim requested a review from a team as a code owner March 23, 2026 16:27
@de-wim de-wim requested review from Copilot and niyiomotoso March 23, 2026 16:27
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR rewrites the Postgres execution listing/summary/totals queries to improve performance (notably by replacing JSONB-array based selector/tag/label filtering with text[]-based filtering), aiming to eliminate timeouts when listing executions for an organization.

Changes:

  • Reworked execution query filters for tags/labels/selectors from JSONB params to text[] params (and updated repository param-building accordingly).
  • Updated sqlc-generated code and bumped SQLC_VERSION to v1.30.0.
  • Simplified the sqlc execution query tests (but at the cost of removing many prior query tests).

Reviewed changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 4 comments.

Show a summary per file
File Description
pkg/repository/testworkflow/postgres/postgres.go Updates parameter building/parsing and maps params for the rewritten sqlc queries.
pkg/database/postgres/queries/executions.sql Rewrites execution queries to use unnest(text[])/split_part instead of jsonb_array_elements.
pkg/database/postgres/sqlc/executions.sql.go Regenerated sqlc output reflecting the new query params ([]string instead of []byte JSON blobs).
pkg/database/postgres/sqlc/executions.sql_test.go Replaces many sqlc query tests with a single permissive totals test.
Makefile Bumps sqlc version to v1.30.0.
Comments suppressed due to low confidence (1)

pkg/repository/testworkflow/postgres/postgres.go:2035

  • buildTestWorkflowExecutionTotalParams doesn’t populate HealthRanges even though GetTestWorkflowExecutionsTotals accepts a health_ranges parameter and the SQL applies a health-range filter. This means totals will ignore filter.HealthRanges() while the listing/summary paths honor it. Consider adding the same JSON marshalling logic used in buildTestWorkflowExecutionParams for filter.HealthRangesDefined() so totals and listings stay consistent.
func (r *PostgresRepository) buildTestWorkflowExecutionTotalParams(filter testworkflow.Filter) (sqlc.GetTestWorkflowExecutionsTotalsParams, error) {
	params := sqlc.GetTestWorkflowExecutionsTotalsParams{
		OrganizationID: r.organizationID,
		EnvironmentID:  r.environmentID,
	}

	// Basic filters
	if filter.NameDefined() {
		params.WorkflowName = filter.Name()
	}


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread pkg/repository/testworkflow/postgres/postgres.go Outdated
Comment on lines +1972 to 2005
func (r *PostgresRepository) parseSelectorToText(selector string) ([]string, []string) {
keys := make([]string, 0)
conditions := make([]string, 0)
items := strings.Split(selector, ",")
for _, item := range items {
elements := strings.Split(item, "=")
if len(elements) == 2 {
values[utils.EscapeDots(elements[0])] = append(values[utils.EscapeDots(elements[0])], elements[1])
conditions = append(conditions, utils.EscapeDots(elements[0])+"="+elements[1])
} else if len(elements) == 1 {
condType := "exists"
keys = append(keys, KeyCondition{
Operator: condType,
Key: utils.EscapeDots(elements[0]),
})
key := utils.EscapeDots(elements[0])
keys = append(keys, key)
}
}

for key, value := range values {
conditions = append(conditions, ValueCondition{
Key: key,
Values: value,
})
}

return keys, conditions
}

// Parse label selector into conditions
func (r *PostgresRepository) parseLabelSelector(labelSelector *testworkflow.LabelSelector) ([]KeyCondition, []ValueCondition) {
keys := make([]KeyCondition, 0)
conditions := make([]ValueCondition, 0)
values := make(map[string][]string, 0)
func (r *PostgresRepository) parseLabelSelectorToText(labelSelector *testworkflow.LabelSelector) ([]string, []string) {
keys := make([]string, 0)
conditions := make([]string, 0)
for _, label := range labelSelector.Or {
if label.Value != nil {
values[utils.EscapeDots(label.Key)] = append(values[utils.EscapeDots(label.Key)], *label.Value)
conditions = append(conditions, utils.EscapeDots(label.Key)+"="+*label.Value)
} else if label.Exists != nil {
// Label exists/not exists
condType := "exists"
key := utils.EscapeDots(label.Key)
if !*label.Exists {
condType = "not_exists"
key = key + ":not_exists"
}
keys = append(keys, KeyCondition{
Operator: condType,
Key: utils.EscapeDots(label.Key),
})
keys = append(keys, key)
}
}

for key, value := range values {
conditions = append(conditions, ValueCondition{
Key: key,
Values: value,
})
}

return keys, conditions
}
Copy link

Copilot AI Mar 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The selector/tag/label parsing was refactored to a new text[] encoding (key, key=value, and :not_exists suffix). There are currently no unit tests covering these encodings (e.g., multi-value same key, not-exists, whitespace handling), but they are critical to query correctness. Please add focused tests for buildTestWorkflowExecutionParams (and totals) to verify the produced TagKeys/TagConditions/LabelKeys/... slices for representative filters.

Copilot uses AI. Check for mistakes.
Comment on lines 13 to 21
@@ -22,1238 +19,65 @@ func TestSQLCTestWorkflowExecutionQueries_GetTestWorkflowExecution(t *testing.T)
queries := New(mock)
ctx := context.Background()

Copy link

Copilot AI Mar 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This file previously contained tests for many sqlc execution queries, but now only tests GetTestWorkflowExecutionsTotals. That’s a significant test coverage regression for query generation/argument ordering across the other execution query methods. Please restore the removed tests or add replacement coverage for the other query functions impacted by the query/param-type rewrite.

Copilot uses AI. Check for mistakes.
Comment on lines +25 to +49
// Expect any query - we just want to verify the function can be called
mock.ExpectQuery("SELECT").WithArgs(
"org-id",
"env-id",
"", // workflow_name
[]string{}, // workflow_names
"", // text_search
pgtype.Timestamptz{}, // start_date
pgtype.Timestamptz{}, // end_date
int32(0), // last_n_days
[]string{}, // statuses
"", // runner_id
pgtype.Bool{}, // assigned
"", // actor_name
"", // actor_type
"", // group_id
pgtype.Bool{}, // initialized
[]byte("[]"), // health_ranges
[]string{}, // tag_keys
[]string{}, // tag_conditions
[]string{}, // label_keys
[]string{}, // label_conditions
[]string{}, // selector_keys
[]string{}, // selector_conditions
).WillReturnRows(rows)
Copy link

Copilot AI Mar 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mock.ExpectQuery("SELECT") is very permissive and no longer verifies that the correct sqlc query string is being executed (it will match any query containing the substring “SELECT”). It would be more robust to match the specific query constant (e.g., using regexp.QuoteMeta(getTestWorkflowExecutionsTotals) or the generated constant name) so the test fails if the SQL changes unexpectedly or a different query is called.

Copilot uses AI. Check for mistakes.
@greptile-apps
Copy link
Copy Markdown
Contributor

greptile-apps Bot commented Mar 23, 2026

Greptile Summary

This PR rewrites the Postgres execution listing queries to replace jsonb-typed filter parameters (tag_keys, tag_conditions, label_keys, label_conditions, selector_keys, selector_conditions) with text[] parameters using unnest. The motivation is a major latency reduction (~18 s → fast) for org-level execution listing, likely because Postgres can evaluate unnest of a native array far more efficiently than jsonb_array_elements on a serialised JSON parameter.

What's good:

  • The semantic equivalence of the rewritten filters is maintained throughout: AND logic for key-existence checks (= array_length), OR logic for label-key and value conditions (> 0), and the COUNT(DISTINCT) parity check for selector conditions correctly mirrors the old all-keys-must-match behaviour.
  • Removing the intermediate JSON marshal/unmarshal round-trip in Go simplifies the calling code and eliminates a class of serialisation errors.

Findings:

  • Warning (test regression): 18 test functions covering unmodified CRUD operations (Insert, Update, Delete, Abort, Assign, GetTestWorkflowExecution, etc.) were removed from executions.sql_test.go. The single surviving test uses mock.ExpectQuery("SELECT"), which matches any SELECT query and does not verify the SQL or parameter shape. These tests should be restored with the updated []string parameter types.
  • Info: The SQL replace(key_condition, ':not_exists', '') approach strips all occurrences of the :not_exists substring from a key string, not just the trailing suffix. For standard Kubernetes label keys this is harmless, but it could misbehave if a user-defined key ever contained that literal substring.

Confidence Score: 4/5

  • Safe to merge for the performance fix; the test regression is the main remaining concern but does not affect production behaviour of the changed queries.
  • The SQL rewrite is semantically correct and the performance goal (replacing expensive jsonb_array_elements with unnest on a native array) is sound. The one concrete issue is the removal of 18 unrelated tests, which reduces safety confidence but does not introduce a production bug. Restoring or updating those tests is a follow-up action.
  • pkg/database/postgres/sqlc/executions.sql_test.go — 18 tests covering unchanged CRUD operations were removed and should be restored.

Important Files Changed

Filename Overview
pkg/database/postgres/queries/executions.sql Rewrites tag_keys, tag_conditions, label_keys, label_conditions, selector_keys, and selector_conditions filter parameters from jsonb to text[] with unnest for all four listing queries (CountTestWorkflowExecutions, GetTestWorkflowExecutionsTotals, GetTestWorkflowExecutions, GetTestWorkflowExecutionsSummary). The semantics are preserved: _keys checks use = array_length (AND, all must exist), label_keys uses > 0 (OR), and selector_conditions uses a COUNT(DISTINCT) parity check. health_ranges stays as jsonb.
pkg/database/postgres/sqlc/executions.sql.go Auto-generated by sqlc from the updated .sql file. Parameter structs for the four affected queries have six fields changed from []byte (raw JSON) to []string. The embedded SQL string constants are updated consistently with the .sql source.
pkg/database/postgres/sqlc/executions.sql_test.go 18 test functions covering unmodified CRUD operations were removed. The single remaining test uses mock.ExpectQuery("SELECT") (matches any SELECT), providing minimal coverage of the new query structure. This represents a significant regression in test coverage.
Makefile Bumps SQLC_VERSION from v1.29.0 to v1.30.0 to support the new query generation.

Flowchart

%%{init: {'theme': 'neutral'}}%%
flowchart TD
    A[Filter input] --> B[parseSelectorToText]
    A --> C[parseLabelSelectorToText]
    A --> D[parseTagSelectorToText]

    B --> E["SelectorKeys: text[]\nSelectorConditions: text[]"]
    C --> F["LabelKeys: text[]\nLabelConditions: text[]"]
    D --> G["TagKeys: text[]\nTagConditions: text[]"]

    E --> H[SQL unnest + COUNT]
    F --> H
    G --> H

    H --> I["tag_keys / selector_keys:\nCOUNT = array_length\nALL must match"]
    H --> J["label_keys / tag_conditions /\nlabel_conditions:\nCOUNT > 0\nANY match"]
    H --> K["selector_conditions:\nCOUNT DISTINCT matching keys\n= COUNT DISTINCT total keys\nEach key has at least one match"]
Loading

Reviews (1): Last reviewed commit: "fix queries" | Re-trigger Greptile

Comment on lines 13 to 83
@@ -22,1238 +19,65 @@ func TestSQLCTestWorkflowExecutionQueries_GetTestWorkflowExecution(t *testing.T)
queries := New(mock)
ctx := context.Background()

// Define expected query pattern
expectedQuery := `SELECT
e\.id, e\.group_id, e\.runner_id, e\.runner_target, e\.runner_original_target, e\.name, e\.namespace, e\.number, e\.scheduled_at, e\.assigned_at, e\.status_at, e\.test_workflow_execution_name, e\.disable_webhooks, e\.tags, e\.running_context, e\.config_params, e\.runtime, e\.created_at, e\.updated_at,
r\.status, r\.predicted_status, r\.queued_at, r\.started_at, r\.finished_at,
r\.duration, r\.total_duration, r\.duration_ms, r\.paused_ms, r\.total_duration_ms,
r\.pauses, r\.initialization, r\.steps,
w\.name as workflow_name, w\.namespace as workflow_namespace, w\.description as workflow_description,
w\.labels as workflow_labels, w\.annotations as workflow_annotations, w\.created as workflow_created,
w\.updated as workflow_updated, w\.spec as workflow_spec, w\.read_only as workflow_read_only,
w\.status as workflow_status,
rw\.name as resolved_workflow_name, rw\.namespace as resolved_workflow_namespace,
rw\.description as resolved_workflow_description, rw\.labels as resolved_workflow_labels,
rw\.annotations as resolved_workflow_annotations, rw\.created as resolved_workflow_created,
rw\.updated as resolved_workflow_updated, rw\.spec as resolved_workflow_spec,
rw\.read_only as resolved_workflow_read_only, rw\.status as resolved_workflow_status,
COALESCE\(
\(SELECT json_agg\(
json_build_object\(
'id', s\.id,
'ref', s\.ref,
'name', s\.name,
'category', s\.category,
'optional', s\.optional,
'negative', s\.negative,
'parent_id', s\.parent_id
\) ORDER BY s\.sig_order
\) FROM test_workflow_signatures s WHERE s\.execution_id = e\.id\),
'\[\]'::json
\)::json as signatures_json,
COALESCE\(
\(SELECT json_agg\(
json_build_object\(
'id', o\.id,
'ref', o\.ref,
'name', o\.name,
'value', o\.value
\) ORDER BY o\.out_order
\) FROM test_workflow_outputs o WHERE o\.execution_id = e\.id\),
'\[\]'::json
\)::json as outputs_json,
COALESCE\(
\(SELECT json_agg\(
json_build_object\(
'id', rep\.id,
'ref', rep\.ref,
'kind', rep\.kind,
'file', rep\.file,
'summary', rep\.summary
\) ORDER BY rep\.rep_order
\) FROM test_workflow_reports rep WHERE rep\.execution_id = e\.id\),
'\[\]'::json
\)::json as reports_json,
ra\.global as resource_aggregations_global,
ra\.step as resource_aggregations_step
FROM test_workflow_executions e
LEFT JOIN test_workflow_results r ON e\.id = r\.execution_id
LEFT JOIN test_workflows w ON e\.id = w\.execution_id AND w\.workflow_type = 'workflow'
LEFT JOIN test_workflows rw ON e\.id = rw\.execution_id AND rw\.workflow_type = 'resolved_workflow'
LEFT JOIN test_workflow_resource_aggregations ra ON e\.id = ra\.execution_id
WHERE \(e\.id = \$1 OR e\.name = \$1\) AND \(e\.organization_id = \$2 AND e\.environment_id = \$3\)`

// Mock expected result
rows := mock.NewRows([]string{
"id", "group_id", "runner_id", "runner_target", "runner_original_target", "name", "namespace", "number",
"scheduled_at", "assigned_at", "status_at", "test_workflow_execution_name", "disable_webhooks",
"tags", "running_context", "config_params", "runtime", "created_at", "updated_at",
"status", "predicted_status", "queued_at", "started_at", "finished_at",
"duration", "total_duration", "duration_ms", "paused_ms", "total_duration_ms",
"pauses", "initialization", "steps",
"workflow_name", "workflow_namespace", "workflow_description", "workflow_labels", "workflow_annotations",
"workflow_created", "workflow_updated", "workflow_spec", "workflow_read_only", "workflow_status",
"resolved_workflow_name", "resolved_workflow_namespace", "resolved_workflow_description",
"resolved_workflow_labels", "resolved_workflow_annotations", "resolved_workflow_created",
"resolved_workflow_updated", "resolved_workflow_spec", "resolved_workflow_read_only", "resolved_workflow_status",
"signatures_json", "outputs_json", "reports_json", "resource_aggregations_global", "resource_aggregations_step",
}).AddRow(
"test-id", "group-1", "runner-1", []byte(`{}`), []byte(`{}`), "test-execution", "default", int64(1),
time.Now(), time.Now(), time.Now(), "test-execution-name", false,
[]byte(`{"env":"test"}`), []byte(`{}`), []byte(`{}`), []byte(`{}`), time.Now(), time.Now(),
"passed", "passed", time.Now(), time.Now(), time.Now(),
"5m", "5m", int64(300000), int64(0), int64(300000),
[]byte(`[]`), []byte(`{}`), []byte(`{}`),
"test-workflow", "default", "Test workflow", []byte(`{}`), []byte(`{}`),
time.Now(), time.Now(), []byte(`{}`), false, []byte(`{}`),
nil, nil, nil, nil, nil, nil, nil, nil, nil, nil,
[]byte(`[]`), []byte(`[]`), []byte(`[]`), []byte(`{}`), []byte(`{}`),
)

mock.ExpectQuery(expectedQuery).WithArgs("test-id", "org-id", "env-id").WillReturnRows(rows)

// Execute query
result, err := queries.GetTestWorkflowExecution(ctx, GetTestWorkflowExecutionParams{
ID: "test-id",
OrganizationID: "org-id",
EnvironmentID: "env-id",
})

// Assertions
assert.NoError(t, err)
assert.Equal(t, "test-id", result.ID)
assert.Equal(t, "test-execution", result.Name)
assert.NoError(t, mock.ExpectationsWereMet())
}

func TestSQLCTestWorkflowExecutionQueries_GetTestWorkflowExecutionByNameAndTestWorkflow(t *testing.T) {
mock, err := pgxmock.NewPool()
require.NoError(t, err)
defer mock.Close()

queries := New(mock)
ctx := context.Background()

expectedQuery := `SELECT
e\.id, e\.group_id, e\.runner_id, e\.runner_target, e\.runner_original_target, e\.name, e\.namespace, e\.number, e\.scheduled_at, e\.assigned_at, e\.status_at, e\.test_workflow_execution_name, e\.disable_webhooks, e\.tags, e\.running_context, e\.config_params, e\.runtime, e\.created_at, e\.updated_at,
r\.status, r\.predicted_status, r\.queued_at, r\.started_at, r\.finished_at,
r\.duration, r\.total_duration, r\.duration_ms, r\.paused_ms, r\.total_duration_ms,
r\.pauses, r\.initialization, r\.steps,
w\.name as workflow_name, w\.namespace as workflow_namespace, w\.description as workflow_description,
w\.labels as workflow_labels, w\.annotations as workflow_annotations, w\.created as workflow_created,
w\.updated as workflow_updated, w\.spec as workflow_spec, w\.read_only as workflow_read_only,
w\.status as workflow_status,
rw\.name as resolved_workflow_name, rw\.namespace as resolved_workflow_namespace,
rw\.description as resolved_workflow_description, rw\.labels as resolved_workflow_labels,
rw\.annotations as resolved_workflow_annotations, rw\.created as resolved_workflow_created,
rw\.updated as resolved_workflow_updated, rw\.spec as resolved_workflow_spec,
rw\.read_only as resolved_workflow_read_only, rw\.status as resolved_workflow_status,
COALESCE\(
\(SELECT json_agg\(
json_build_object\(
'id', s\.id,
'ref', s\.ref,
'name', s\.name,
'category', s\.category,
'optional', s\.optional,
'negative', s\.negative,
'parent_id', s\.parent_id
\) ORDER BY s\.sig_order
\) FROM test_workflow_signatures s WHERE s\.execution_id = e\.id\),
'\[\]'::json
\)::json as signatures_json,
COALESCE\(
\(SELECT json_agg\(
json_build_object\(
'id', o\.id,
'ref', o\.ref,
'name', o\.name,
'value', o\.value
\) ORDER BY o\.out_order
\) FROM test_workflow_outputs o WHERE o\.execution_id = e\.id\),
'\[\]'::json
\)::json as outputs_json,
COALESCE\(
\(SELECT json_agg\(
json_build_object\(
'id', rep\.id,
'ref', rep\.ref,
'kind', rep\.kind,
'file', rep\.file,
'summary', rep\.summary
\) ORDER BY rep\.rep_order
\) FROM test_workflow_reports rep WHERE rep\.execution_id = e\.id\),
'\[\]'::json
\)::json as reports_json,
ra\.global as resource_aggregations_global,
ra\.step as resource_aggregations_step
FROM test_workflow_executions e
LEFT JOIN test_workflow_results r ON e\.id = r\.execution_id
LEFT JOIN test_workflows w ON e\.id = w\.execution_id AND w\.workflow_type = 'workflow'
LEFT JOIN test_workflows rw ON e\.id = rw\.execution_id AND rw\.workflow_type = 'resolved_workflow'
LEFT JOIN test_workflow_resource_aggregations ra ON e\.id = ra\.execution_id
WHERE \(e\.id = \$1 OR e\.name = \$1\) AND w\.name = \$2::text AND \(e\.organization_id = \$3 AND e\.environment_id = \$4\)`

rows := mock.NewRows([]string{
"id", "group_id", "runner_id", "runner_target", "runner_original_target", "name", "namespace", "number",
"scheduled_at", "assigned_at", "status_at", "test_workflow_execution_name", "disable_webhooks",
"tags", "running_context", "config_params", "runtime", "created_at", "updated_at",
"status", "predicted_status", "queued_at", "started_at", "finished_at",
"duration", "total_duration", "duration_ms", "paused_ms", "total_duration_ms",
"pauses", "initialization", "steps",
"workflow_name", "workflow_namespace", "workflow_description", "workflow_labels", "workflow_annotations",
"workflow_created", "workflow_updated", "workflow_spec", "workflow_read_only", "workflow_status",
"resolved_workflow_name", "resolved_workflow_namespace", "resolved_workflow_description",
"resolved_workflow_labels", "resolved_workflow_annotations", "resolved_workflow_created",
"resolved_workflow_updated", "resolved_workflow_spec", "resolved_workflow_read_only", "resolved_workflow_status",
"signatures_json", "outputs_json", "reports_json", "resource_aggregations_global", "resource_aggregations_step",
}).AddRow(
"test-id", "group-1", "runner-1", []byte(`{}`), []byte(`{}`), "test-execution", "default", int64(1),
time.Now(), time.Now(), time.Now(), "test-execution-name", false,
[]byte(`{"env":"test"}`), []byte(`{}`), []byte(`{}`), []byte(`{}`), time.Now(), time.Now(),
"passed", "passed", time.Now(), time.Now(), time.Now(),
"5m", "5m", int64(300000), int64(0), int64(300000),
[]byte(`[]`), []byte(`{}`), []byte(`{}`),
"test-workflow", "default", "Test workflow", []byte(`{}`), []byte(`{}`),
time.Now(), time.Now(), []byte(`{}`), false, []byte(`{}`),
nil, nil, nil, nil, nil, nil, nil, nil, nil, nil,
[]byte(`[]`), []byte(`[]`), []byte(`[]`), []byte(`{}`), []byte(`{}`),
)

mock.ExpectQuery(expectedQuery).WithArgs("test-execution", "test-workflow", "org-id", "env-id").WillReturnRows(rows)

// Execute query
params := GetTestWorkflowExecutionByNameAndTestWorkflowParams{
Name: "test-execution",
WorkflowName: "test-workflow",
OrganizationID: "org-id",
EnvironmentID: "env-id",
}
result, err := queries.GetTestWorkflowExecutionByNameAndTestWorkflow(ctx, params)

// Assertions
assert.NoError(t, err)
assert.Equal(t, "test-id", result.ID)
assert.Equal(t, "test-execution", result.Name)
assert.NoError(t, mock.ExpectationsWereMet())
}

func TestSQLCTestWorkflowExecutionQueries_GetLatestTestWorkflowExecutionByTestWorkflow(t *testing.T) {
mock, err := pgxmock.NewPool()
require.NoError(t, err)
defer mock.Close()

queries := New(mock)
ctx := context.Background()

rows := mock.NewRows([]string{
"id", "group_id", "runner_id", "runner_target", "runner_original_target", "name", "namespace", "number",
"scheduled_at", "assigned_at", "status_at", "test_workflow_execution_name", "disable_webhooks",
"tags", "running_context", "config_params", "runtime", "created_at", "updated_at",
"status", "predicted_status", "queued_at", "started_at", "finished_at",
"duration", "total_duration", "duration_ms", "paused_ms", "total_duration_ms",
"pauses", "initialization", "steps",
"workflow_name", "workflow_namespace", "workflow_description", "workflow_labels", "workflow_annotations",
"workflow_created", "workflow_updated", "workflow_spec", "workflow_read_only", "workflow_status",
"resolved_workflow_name", "resolved_workflow_namespace", "resolved_workflow_description",
"resolved_workflow_labels", "resolved_workflow_annotations", "resolved_workflow_created",
"resolved_workflow_updated", "resolved_workflow_spec", "resolved_workflow_read_only", "resolved_workflow_status",
"signatures_json", "outputs_json", "reports_json", "resource_aggregations_global", "resource_aggregations_step",
}).AddRow(
"test-id", "group-1", "runner-1", []byte(`{}`), []byte(`{}`), "test-execution", "default", int64(1),
time.Now(), time.Now(), time.Now(), "test-execution-name", false,
[]byte(`{"env":"test"}`), []byte(`{}`), []byte(`{}`), []byte(`{}`), time.Now(), time.Now(),
"passed", "passed", time.Now(), time.Now(), time.Now(),
"5m", "5m", int64(300000), int64(0), int64(300000),
[]byte(`[]`), []byte(`{}`), []byte(`{}`),
"test-workflow", "default", "Test workflow", []byte(`{}`), []byte(`{}`),
time.Now(), time.Now(), []byte(`{}`), false, []byte(`{}`),
nil, nil, nil, nil, nil, nil, nil, nil, nil, nil,
[]byte(`[]`), []byte(`[]`), []byte(`[]`), []byte(`{}`), []byte(`{}`),
)

mock.ExpectQuery(regexp.QuoteMeta(getLatestTestWorkflowExecutionByTestWorkflow)).WithArgs("test-workflow", "org-id", "env-id", true, false).WillReturnRows(rows)

// Execute query
result, err := queries.GetLatestTestWorkflowExecutionByTestWorkflow(ctx, GetLatestTestWorkflowExecutionByTestWorkflowParams{
WorkflowName: "test-workflow",
SortByNumber: true,
OrganizationID: "org-id",
EnvironmentID: "env-id",
})

// Assertions
assert.NoError(t, err)
assert.Equal(t, "test-id", result.ID)
assert.Equal(t, "test-execution", result.Name)
assert.NoError(t, mock.ExpectationsWereMet())
}

func TestSQLCTestWorkflowExecutionQueries_GetLatestTestWorkflowExecutionsByTestWorkflows(t *testing.T) {
mock, err := pgxmock.NewPool()
require.NoError(t, err)
defer mock.Close()

queries := New(mock)
ctx := context.Background()

expectedQuery := `SELECT DISTINCT ON \(w\.name\)
e\.id, e\.group_id, e\.runner_id, e\.runner_target, e\.runner_original_target, e\.name, e\.namespace, e\.number, e\.scheduled_at, e\.assigned_at, e\.status_at, e\.test_workflow_execution_name, e\.disable_webhooks, e\.tags, e\.running_context, e\.config_params, e\.runtime, e\.created_at, e\.updated_at,
r\.status, r\.predicted_status, r\.queued_at, r\.started_at, r\.finished_at,
r\.duration, r\.total_duration, r\.duration_ms, r\.paused_ms, r\.total_duration_ms,
r\.pauses, r\.initialization, r\.steps,
w\.name as workflow_name, w\.namespace as workflow_namespace, w\.description as workflow_description,
w\.labels as workflow_labels, w\.annotations as workflow_annotations, w\.created as workflow_created,
w\.updated as workflow_updated, w\.spec as workflow_spec, w\.read_only as workflow_read_only,
w\.status as workflow_status,
rw\.name as resolved_workflow_name, rw\.namespace as resolved_workflow_namespace,
rw\.description as resolved_workflow_description, rw\.labels as resolved_workflow_labels,
rw\.annotations as resolved_workflow_annotations, rw\.created as resolved_workflow_created,
rw\.updated as resolved_workflow_updated, rw\.spec as resolved_workflow_spec,
rw\.read_only as resolved_workflow_read_only, rw\.status as resolved_workflow_status,
COALESCE\(
\(SELECT json_agg\(
json_build_object\(
'id', s\.id,
'ref', s\.ref,
'name', s\.name,
'category', s\.category,
'optional', s\.optional,
'negative', s\.negative,
'parent_id', s\.parent_id
\) ORDER BY s\.sig_order
\) FROM test_workflow_signatures s WHERE s\.execution_id = e\.id\),
'\[\]'::json
\)::json as signatures_json,
COALESCE\(
\(SELECT json_agg\(
json_build_object\(
'id', o\.id,
'ref', o\.ref,
'name', o\.name,
'value', o\.value
\) ORDER BY o\.out_order
\) FROM test_workflow_outputs o WHERE o\.execution_id = e\.id\),
'\[\]'::json
\)::json as outputs_json,
COALESCE\(
\(SELECT json_agg\(
json_build_object\(
'id', rep\.id,
'ref', rep\.ref,
'kind', rep\.kind,
'file', rep\.file,
'summary', rep\.summary
\) ORDER BY rep\.rep_order
\) FROM test_workflow_reports rep WHERE rep\.execution_id = e\.id\),
'\[\]'::json
\)::json as reports_json,
ra\.global as resource_aggregations_global,
ra\.step as resource_aggregations_step
FROM test_workflow_executions e
LEFT JOIN test_workflow_results r ON e\.id = r\.execution_id
LEFT JOIN test_workflows w ON e\.id = w\.execution_id AND w\.workflow_type = 'workflow'
LEFT JOIN test_workflows rw ON e\.id = rw\.execution_id AND rw\.workflow_type = 'resolved_workflow'
LEFT JOIN test_workflow_resource_aggregations ra ON e\.id = ra\.execution_id
WHERE w\.name = ANY\(\$1::text\[\]\) AND \(e\.organization_id = \$2 AND e\.environment_id = \$3\)
ORDER BY w\.name, e\.status_at DESC`

rows := mock.NewRows([]string{
"id", "group_id", "runner_id", "runner_target", "runner_original_target", "name", "namespace", "number",
"scheduled_at", "assigned_at", "status_at", "test_workflow_execution_name", "disable_webhooks",
"tags", "running_context", "config_params", "runtime", "created_at", "updated_at",
"status", "predicted_status", "queued_at", "started_at", "finished_at",
"duration", "total_duration", "duration_ms", "paused_ms", "total_duration_ms",
"pauses", "initialization", "steps",
"workflow_name", "workflow_namespace", "workflow_description", "workflow_labels", "workflow_annotations",
"workflow_created", "workflow_updated", "workflow_spec", "workflow_read_only", "workflow_status",
"resolved_workflow_name", "resolved_workflow_namespace", "resolved_workflow_description",
"resolved_workflow_labels", "resolved_workflow_annotations", "resolved_workflow_created",
"resolved_workflow_updated", "resolved_workflow_spec", "resolved_workflow_read_only", "resolved_workflow_status",
"signatures_json", "outputs_json", "reports_json", "resource_aggregations_global", "resource_aggregations_step",
}).AddRow(
"test-id", "group-1", "runner-1", []byte(`{}`), []byte(`{}`), "test-execution", "default", int64(1),
time.Now(), time.Now(), time.Now(), "test-execution-name", false,
[]byte(`{"env":"test"}`), []byte(`{}`), []byte(`{}`), []byte(`{}`), time.Now(), time.Now(),
"passed", "passed", time.Now(), time.Now(), time.Now(),
"5m", "5m", int64(300000), int64(0), int64(300000),
[]byte(`[]`), []byte(`{}`), []byte(`{}`),
"test-workflow", "default", "Test workflow", []byte(`{}`), []byte(`{}`),
time.Now(), time.Now(), []byte(`{}`), false, []byte(`{}`),
nil, nil, nil, nil, nil, nil, nil, nil, nil, nil,
[]byte(`[]`), []byte(`[]`), []byte(`[]`), []byte(`{}`), []byte(`{}`),
)

workflowNames := []string{"workflow1", "workflow2"}
mock.ExpectQuery(expectedQuery).WithArgs(workflowNames, "org-id", "env-id").WillReturnRows(rows)

// Execute query
result, err := queries.GetLatestTestWorkflowExecutionsByTestWorkflows(ctx, GetLatestTestWorkflowExecutionsByTestWorkflowsParams{
WorkflowNames: workflowNames,
OrganizationID: "org-id",
EnvironmentID: "env-id",
})

// Assertions
assert.NoError(t, err)
assert.Len(t, result, 1)
assert.Equal(t, "test-id", result[0].ID)
assert.NoError(t, mock.ExpectationsWereMet())
}

func TestSQLCTestWorkflowExecutionQueries_GetRunningTestWorkflowExecutions(t *testing.T) {
mock, err := pgxmock.NewPool()
require.NoError(t, err)
defer mock.Close()

queries := New(mock)
ctx := context.Background()

expectedQuery := `SELECT
e\.id, e\.group_id, e\.runner_id, e\.runner_target, e\.runner_original_target, e\.name, e\.namespace, e\.number, e\.scheduled_at, e\.assigned_at, e\.status_at, e\.test_workflow_execution_name, e\.disable_webhooks, e\.tags, e\.running_context, e\.config_params, e\.runtime, e\.created_at, e\.updated_at,
r\.status, r\.predicted_status, r\.queued_at, r\.started_at, r\.finished_at,
r\.duration, r\.total_duration, r\.duration_ms, r\.paused_ms, r\.total_duration_ms,
r\.pauses, r\.initialization, r\.steps,
w\.name as workflow_name, w\.namespace as workflow_namespace, w\.description as workflow_description,
w\.labels as workflow_labels, w\.annotations as workflow_annotations, w\.created as workflow_created,
w\.updated as workflow_updated, w\.spec as workflow_spec, w\.read_only as workflow_read_only,
w\.status as workflow_status,
rw\.name as resolved_workflow_name, rw\.namespace as resolved_workflow_namespace,
rw\.description as resolved_workflow_description, rw\.labels as resolved_workflow_labels,
rw\.annotations as resolved_workflow_annotations, rw\.created as resolved_workflow_created,
rw\.updated as resolved_workflow_updated, rw\.spec as resolved_workflow_spec,
rw\.read_only as resolved_workflow_read_only, rw\.status as resolved_workflow_status,
COALESCE\(
\(SELECT json_agg\(
json_build_object\(
'id', s\.id,
'ref', s\.ref,
'name', s\.name,
'category', s\.category,
'optional', s\.optional,
'negative', s\.negative,
'parent_id', s\.parent_id
\) ORDER BY s\.sig_order
\) FROM test_workflow_signatures s WHERE s\.execution_id = e\.id\),
'\[\]'::json
\)::json as signatures_json,
COALESCE\(
\(SELECT json_agg\(
json_build_object\(
'id', o\.id,
'ref', o\.ref,
'name', o\.name,
'value', o\.value
\) ORDER BY o\.out_order
\) FROM test_workflow_outputs o WHERE o\.execution_id = e\.id\),
'\[\]'::json
\)::json as outputs_json,
COALESCE\(
\(SELECT json_agg\(
json_build_object\(
'id', rep\.id,
'ref', rep\.ref,
'kind', rep\.kind,
'file', rep\.file,
'summary', rep\.summary
\) ORDER BY rep\.rep_order
\) FROM test_workflow_reports rep WHERE rep\.execution_id = e\.id\),
'\[\]'::json
\)::json as reports_json,
ra\.global as resource_aggregations_global,
ra\.step as resource_aggregations_step
FROM test_workflow_executions e
LEFT JOIN test_workflow_results r ON e\.id = r\.execution_id
LEFT JOIN test_workflows w ON e\.id = w\.execution_id AND w\.workflow_type = 'workflow'
LEFT JOIN test_workflows rw ON e\.id = rw\.execution_id AND rw\.workflow_type = 'resolved_workflow'
LEFT JOIN test_workflow_resource_aggregations ra ON e\.id = ra\.execution_id
WHERE r\.status IN \('queued', 'assigned', 'starting', 'running', 'pausing', 'paused', 'resuming'\) AND \(e\.organization_id = \$1 AND e\.environment_id = \$2\)
ORDER BY e\.id DESC`

rows := mock.NewRows([]string{
"id", "group_id", "runner_id", "runner_target", "runner_original_target", "name", "namespace", "number",
"scheduled_at", "assigned_at", "status_at", "test_workflow_execution_name", "disable_webhooks",
"tags", "running_context", "config_params", "runtime", "created_at", "updated_at",
"status", "predicted_status", "queued_at", "started_at", "finished_at",
"duration", "total_duration", "duration_ms", "paused_ms", "total_duration_ms",
"pauses", "initialization", "steps",
"workflow_name", "workflow_namespace", "workflow_description", "workflow_labels", "workflow_annotations",
"workflow_created", "workflow_updated", "workflow_spec", "workflow_read_only", "workflow_status",
"resolved_workflow_name", "resolved_workflow_namespace", "resolved_workflow_description",
"resolved_workflow_labels", "resolved_workflow_annotations", "resolved_workflow_created",
"resolved_workflow_updated", "resolved_workflow_spec", "resolved_workflow_read_only", "resolved_workflow_status",
"signatures_json", "outputs_json", "reports_json", "resource_aggregations_global", "resource_aggregations_step",
}).AddRow(
"test-id", "group-1", "runner-1", []byte(`{}`), []byte(`{}`), "test-execution", "default", int64(1),
time.Now(), time.Now(), time.Now(), "test-execution-name", false,
[]byte(`{"env":"test"}`), []byte(`{}`), []byte(`{}`), []byte(`{}`), time.Now(), time.Now(),
"running", "running", time.Now(), time.Now(), time.Now(),
"5m", "5m", int64(300000), int64(0), int64(300000),
[]byte(`[]`), []byte(`{}`), []byte(`{}`),
"test-workflow", "default", "Test workflow", []byte(`{}`), []byte(`{}`),
time.Now(), time.Now(), []byte(`{}`), false, []byte(`{}`),
nil, nil, nil, nil, nil, nil, nil, nil, nil, nil,
[]byte(`[]`), []byte(`[]`), []byte(`[]`), []byte(`{}`), []byte(`{}`),
)

mock.ExpectQuery(expectedQuery).WithArgs("org-id", "env-id").WillReturnRows(rows)
rows := mock.NewRows([]string{"status", "count"}).AddRow("passed", 10)

// Expect any query - we just want to verify the function can be called
mock.ExpectQuery("SELECT").WithArgs(
"org-id",
"env-id",
"", // workflow_name
[]string{}, // workflow_names
"", // text_search
pgtype.Timestamptz{}, // start_date
pgtype.Timestamptz{}, // end_date
int32(0), // last_n_days
[]string{}, // statuses
"", // runner_id
pgtype.Bool{}, // assigned
"", // actor_name
"", // actor_type
"", // group_id
pgtype.Bool{}, // initialized
[]byte("[]"), // health_ranges
[]string{}, // tag_keys
[]string{}, // tag_conditions
[]string{}, // label_keys
[]string{}, // label_conditions
[]string{}, // selector_keys
[]string{}, // selector_conditions
).WillReturnRows(rows)

// Execute query
result, err := queries.GetRunningTestWorkflowExecutions(ctx, GetRunningTestWorkflowExecutionsParams{
OrganizationID: "org-id",
EnvironmentID: "env-id",
})

// Assertions
assert.NoError(t, err)
assert.Len(t, result, 1)
assert.Equal(t, "test-id", result[0].ID)
assert.NoError(t, mock.ExpectationsWereMet())
}

func TestSQLCTestWorkflowExecutionQueries_GetTestWorkflowExecutionsTotals(t *testing.T) {
mock, err := pgxmock.NewPool()
require.NoError(t, err)
defer mock.Close()

queries := New(mock)
ctx := context.Background()

expectedQuery := `SELECT
r\.status,
COUNT\(\*\) as count
FROM test_workflow_executions e
LEFT JOIN test_workflow_results r ON e\.id = r\.execution_id
LEFT JOIN test_workflows w ON e\.id = w\.execution_id AND w\.workflow_type = 'workflow'
WHERE \(e\.organization_id = \$1 AND e\.environment_id = \$2\)
AND \(COALESCE\(\$3::text, ''\) = '' OR w.name = \$3::text\)
AND \(COALESCE\(\$4::text\[\], ARRAY\[\]::text\[\]\) = ARRAY\[\]::text\[\] OR w.name = ANY\(\$4::text\[\]\)\)
AND \(COALESCE\(\$5::text, ''\) = '' OR e.name ILIKE '%' \|\| \$5::text \|\| '%'\)
AND \(COALESCE\(\$6::timestamptz, '1900-01-01'::timestamptz\) = '1900-01-01'::timestamptz OR e.scheduled_at >= \$6::timestamptz\)
AND \(COALESCE\(\$7::timestamptz, '2100-01-01'::timestamptz\) = '2100-01-01'::timestamptz OR e.scheduled_at <= \$7::timestamptz\)
AND \(COALESCE\(\$8::integer, 0\) = 0 OR e.scheduled_at >= NOW\(\) - \(COALESCE\(\$8::integer, 0\) \|\| ' days'\)::interval\)
AND \(COALESCE\(\$9::text\[\], ARRAY\[\]::text\[\]\) = ARRAY\[\]::text\[\] OR r.status = ANY\(\$9::text\[\]\)\)
AND \(COALESCE\(\$10::text, ''\) = '' OR e.runner_id = \$10::text\)
AND \(COALESCE\(\$11, NULL\) IS NULL OR
\(\$11::boolean = true AND e.runner_id IS NOT NULL AND e.runner_id != ''\) OR
\(\$11::boolean = false AND \(e.runner_id IS NULL OR e.runner_id = ''\)\)\)
AND \(COALESCE\(\$12::text, ''\) = '' OR e.running_context->'actor'->>'name' = \$12::text\)
AND \(COALESCE\(\$13::text, ''\) = '' OR e.running_context->'actor'->>'type_' = \$13::text\)
AND \(COALESCE\(\$14::text, ''\) = '' OR e.id = \$14::text OR e.group_id = \$14::text\)
AND \(COALESCE\(\$15, NULL\) IS NULL OR
\(\$15::boolean = true AND \(r.status != 'queued' OR r.steps IS NOT NULL\)\) OR
\(\$15::boolean = false AND r.status = 'queued' AND \(r.steps IS NULL OR r.steps = '\{\}'::jsonb\)\)\)
AND \(COALESCE\(\$16::jsonb, '\[\]'::jsonb\) = '\[\]'::jsonb OR
EXISTS \(
SELECT 1 FROM jsonb_array_elements\(\$16::jsonb\) AS range_obj
WHERE \(w.status->>'health'\)::jsonb->>'overallHealth' IS NOT NULL
AND \(\(w.status->>'health'\)::jsonb->>'overallHealth'\)::double precision >= \(range_obj->>'min'\)::double precision
AND \(\(w.status->>'health'\)::jsonb->>'overallHealth'\)::double precision <= \(range_obj->>'max'\)::double precision
\)
\)
AND \(
\(COALESCE\(\$17::jsonb, '\[\]'::jsonb\) = '\[\]'::jsonb OR
\(SELECT COUNT\(\*\) FROM jsonb_array_elements\(\$17::jsonb\) AS key_condition
WHERE
CASE
WHEN key_condition->>'operator' = 'not_exists' THEN
NOT \(e.tags \? \(key_condition->>'key'\)\)
ELSE
e.tags \? \(key_condition->>'key'\)
END
\) = jsonb_array_length\(\$17::jsonb\)
\)
AND
\(COALESCE\(\$18::jsonb, '\[\]'::jsonb\) = '\[\]'::jsonb OR
\(SELECT COUNT\(\*\) FROM jsonb_array_elements\(\$18::jsonb\) AS condition
WHERE e.tags->>\(condition->>'key'\) = ANY\(
SELECT jsonb_array_elements_text\(condition->'values'\)
\)
\) > 0
\)
\)
AND \(
\(COALESCE\(\$19::jsonb, '\[\]'::jsonb\) = '\[\]'::jsonb OR
\(SELECT COUNT\(\*\) FROM jsonb_array_elements\(\$19::jsonb\) AS key_condition
WHERE
CASE
WHEN key_condition->>'operator' = 'not_exists' THEN
NOT \(w.labels \? \(key_condition->>'key'\)\)
ELSE
w.labels \? \(key_condition->>'key'\)
END
\) > 0
\)
OR
\(COALESCE\(\$20::jsonb, '\[\]'::jsonb\) = '\[\]'::jsonb OR
\(SELECT COUNT\(\*\) FROM jsonb_array_elements\(\$20::jsonb\) AS condition
WHERE w.labels->>\(condition->>'key'\) = ANY\(
SELECT jsonb_array_elements_text\(condition->'values'\)
\)
\) > 0
\)
\)
AND \(
\(COALESCE\(\$21::jsonb, '\[\]'::jsonb\) = '\[\]'::jsonb OR
\(SELECT COUNT\(\*\) FROM jsonb_array_elements\(\$21::jsonb\) AS key_condition
WHERE
CASE
WHEN key_condition->>'operator' = 'not_exists' THEN
NOT \(w.labels \? \(key_condition->>'key'\)\)
ELSE
w.labels \? \(key_condition->>'key'\)
END
\) = jsonb_array_length\(\$21::jsonb\)
\)
AND
\(COALESCE\(\$22::jsonb, '\[\]'::jsonb\) = '\[\]'::jsonb OR
\(SELECT COUNT\(\*\) FROM jsonb_array_elements\(\$22::jsonb\) AS condition
WHERE w.labels->>\(condition->>'key'\) = ANY\(
SELECT jsonb_array_elements_text\(condition->'values'\)
\)
\) = jsonb_array_length\(\$22::jsonb\)
\)
\)
GROUP BY r\.status`

rows := mock.NewRows([]string{"status", "count"}).
AddRow("passed", int64(5)).
AddRow("failed", int64(2))

// Create parameters struct with all required fields
params := GetTestWorkflowExecutionsTotalsParams{
result, err := queries.GetTestWorkflowExecutionsTotals(ctx, GetTestWorkflowExecutionsTotalsParams{
OrganizationID: "org-id",
EnvironmentID: "env-id",
WorkflowName: "",
WorkflowNames: []string{},
TextSearch: "",
StartDate: pgtype.Timestamptz{Valid: false},
EndDate: pgtype.Timestamptz{Valid: false},
StartDate: pgtype.Timestamptz{},
EndDate: pgtype.Timestamptz{},
LastNDays: 0,
Statuses: []string{},
RunnerID: "",
Assigned: pgtype.Bool{Valid: false},
Assigned: pgtype.Bool{},
ActorName: "",
ActorType: "",
GroupID: "",
Initialized: pgtype.Bool{Valid: false},
Initialized: pgtype.Bool{},
HealthRanges: []byte("[]"),
TagKeys: []byte{},
TagConditions: []byte{},
LabelKeys: []byte{},
LabelConditions: []byte{},
SelectorKeys: []byte{},
SelectorConditions: []byte{},
OrganizationID: "org-id",
EnvironmentID: "env-id",
}

mock.ExpectQuery(expectedQuery).WithArgs(
params.OrganizationID,
params.EnvironmentID,
params.WorkflowName,
params.WorkflowNames,
params.TextSearch,
params.StartDate,
params.EndDate,
params.LastNDays,
params.Statuses,
params.RunnerID,
params.Assigned,
params.ActorName,
params.ActorType,
params.GroupID,
params.Initialized,
params.HealthRanges,
params.TagKeys,
params.TagConditions,
params.LabelKeys,
params.LabelConditions,
params.SelectorKeys,
params.SelectorConditions,
).WillReturnRows(rows)

// Execute query
result, err := queries.GetTestWorkflowExecutionsTotals(ctx, params)

// Assertions
assert.NoError(t, err)
assert.Len(t, result, 2)
assert.Equal(t, "passed", result[0].Status.String)
assert.Equal(t, int64(5), result[0].Count)
assert.NoError(t, mock.ExpectationsWereMet())
}

func TestSQLCTestWorkflowExecutionQueries_InsertTestWorkflowExecution(t *testing.T) {
mock, err := pgxmock.NewPool()
require.NoError(t, err)
defer mock.Close()

queries := New(mock)
ctx := context.Background()

expectedQuery := `INSERT INTO test_workflow_executions \(
id, group_id, runner_id, runner_target, runner_original_target, name, namespace, number,
scheduled_at, assigned_at, status_at, test_workflow_execution_name, disable_webhooks,
tags, running_context, config_params, organization_id, environment_id, runtime
\) VALUES \(
\$1, \$2, \$3, \$4, \$5, \$6, \$7, \$8,
\$9, \$10, \$11, \$12, \$13,
\$14, \$15, \$16, \$17, \$18, \$19
\)`

params := InsertTestWorkflowExecutionParams{
ID: "test-id",
GroupID: pgtype.Text{String: "group-1", Valid: true},
RunnerID: pgtype.Text{String: "runner-1", Valid: true},
RunnerTarget: []byte(`{}`),
RunnerOriginalTarget: []byte(`{}`),
Name: "test-execution",
Namespace: pgtype.Text{String: "default", Valid: true},
Number: pgtype.Int4{Int32: 1, Valid: true},
ScheduledAt: pgtype.Timestamptz{Time: time.Now(), Valid: true},
AssignedAt: pgtype.Timestamptz{Valid: false},
StatusAt: pgtype.Timestamptz{Time: time.Now(), Valid: true},
TestWorkflowExecutionName: pgtype.Text{Valid: false},
DisableWebhooks: pgtype.Bool{Bool: false, Valid: true},
Tags: []byte(`{"env":"test"}`),
RunningContext: []byte(`{}`),
ConfigParams: []byte(`{}`),
OrganizationID: "org-id",
EnvironmentID: "env-id",
Runtime: []byte(`{}`),
}

mock.ExpectExec(expectedQuery).WithArgs(
params.ID,
params.GroupID,
params.RunnerID,
params.RunnerTarget,
params.RunnerOriginalTarget,
params.Name,
params.Namespace,
params.Number,
params.ScheduledAt,
params.AssignedAt,
params.StatusAt,
params.TestWorkflowExecutionName,
params.DisableWebhooks,
params.Tags,
params.RunningContext,
params.ConfigParams,
params.OrganizationID,
params.EnvironmentID,
params.Runtime,
).WillReturnResult(pgxmock.NewResult("INSERT", 1))

// Execute query
err = queries.InsertTestWorkflowExecution(ctx, params)

// Assertions
assert.NoError(t, err)
assert.NoError(t, mock.ExpectationsWereMet())
}

func TestSQLCTestWorkflowExecutionQueries_UpdateTestWorkflowExecutionResult(t *testing.T) {
mock, err := pgxmock.NewPool()
require.NoError(t, err)
defer mock.Close()

queries := New(mock)
ctx := context.Background()

expectedQuery := `UPDATE test_workflow_results
SET
status = \$1,
predicted_status = \$2,
queued_at = \$3,
started_at = \$4,
finished_at = \$5,
duration = \$6,
total_duration = \$7,
duration_ms = \$8,
paused_ms = \$9,
total_duration_ms = \$10,
pauses = \$11,
initialization = \$12,
steps = \$13
WHERE execution_id = \$14`

params := UpdateTestWorkflowExecutionResultParams{
Status: pgtype.Text{String: "passed", Valid: true},
PredictedStatus: pgtype.Text{String: "passed", Valid: true},
QueuedAt: pgtype.Timestamptz{Time: time.Now(), Valid: true},
StartedAt: pgtype.Timestamptz{Time: time.Now(), Valid: true},
FinishedAt: pgtype.Timestamptz{Time: time.Now(), Valid: true},
Duration: pgtype.Text{String: "5m", Valid: true},
TotalDuration: pgtype.Text{String: "5m", Valid: true},
DurationMs: pgtype.Int4{Int32: 300000, Valid: true},
PausedMs: pgtype.Int4{Int32: 0, Valid: true},
TotalDurationMs: pgtype.Int4{Int32: 300000, Valid: true},
Pauses: []byte(`[]`),
Initialization: []byte(`{}`),
Steps: []byte(`{}`),
ExecutionID: "test-id",
}

mock.ExpectExec(expectedQuery).WithArgs(
params.Status,
params.PredictedStatus,
params.QueuedAt,
params.StartedAt,
params.FinishedAt,
params.Duration,
params.TotalDuration,
params.DurationMs,
params.PausedMs,
params.TotalDurationMs,
params.Pauses,
params.Initialization,
params.Steps,
params.ExecutionID,
).WillReturnResult(pgxmock.NewResult("UPDATE", 1))

// Execute query
err = queries.UpdateTestWorkflowExecutionResult(ctx, params)

// Assertions
assert.NoError(t, err)
assert.NoError(t, mock.ExpectationsWereMet())
}

func TestSQLCTestWorkflowExecutionQueries_DeleteTestWorkflowExecutionsByTestWorkflow(t *testing.T) {
mock, err := pgxmock.NewPool()
require.NoError(t, err)
defer mock.Close()

queries := New(mock)
ctx := context.Background()

expectedQuery := `DELETE FROM test_workflow_executions e
USING test_workflows w
WHERE e\.id = w\.execution_id AND \(e\.organization_id = \$1 AND e\.environment_id = \$2\)
AND w\.workflow_type = 'workflow'
AND w\.name = \$3`

mock.ExpectExec(expectedQuery).WithArgs("org-id", "env-id", "test-workflow").WillReturnResult(pgxmock.NewResult("DELETE", 1))

// Execute query
err = queries.DeleteTestWorkflowExecutionsByTestWorkflow(ctx, DeleteTestWorkflowExecutionsByTestWorkflowParams{
OrganizationID: "org-id",
EnvironmentID: "env-id",
WorkflowName: "test-workflow",
})

// Assertions
assert.NoError(t, err)
assert.NoError(t, mock.ExpectationsWereMet())
}

func TestSQLCTestWorkflowExecutionQueries_DeleteAllTestWorkflowExecutions(t *testing.T) {
mock, err := pgxmock.NewPool()
require.NoError(t, err)
defer mock.Close()

queries := New(mock)
ctx := context.Background()

expectedQuery := `DELETE FROM test_workflow_executions WHERE organization_id = \$1 AND environment_id = \$2`

mock.ExpectExec(expectedQuery).WithArgs("org-id", "env-id").WillReturnResult(pgxmock.NewResult("DELETE", 5))

// Execute query
err = queries.DeleteAllTestWorkflowExecutions(ctx, DeleteAllTestWorkflowExecutionsParams{
OrganizationID: "org-id",
EnvironmentID: "env-id",
})

// Assertions
assert.NoError(t, err)
assert.NoError(t, mock.ExpectationsWereMet())
}

func TestSQLCTestWorkflowExecutionQueries_AssignTestWorkflowExecution(t *testing.T) {
mock, err := pgxmock.NewPool()
require.NoError(t, err)
defer mock.Close()

queries := New(mock)
ctx := context.Background()

expectedQuery := `UPDATE test_workflow_executions
SET
runner_id = \$1::text,
assigned_at = \$2
FROM test_workflow_results r
WHERE test_workflow_executions\.id = \$3 AND \(test_workflow_executions\.organization_id = \$4 AND test_workflow_executions\.environment_id = \$5\)
AND test_workflow_executions\.id = r\.execution_id
AND r\.status = 'queued'
AND \(\(test_workflow_executions\.runner_id IS NULL OR test_workflow_executions\.runner_id = ''\)
OR \(test_workflow_executions\.runner_id = \$1::text AND assigned_at < \$2\)
OR \(test_workflow_executions\.runner_id = \$6::text AND assigned_at < NOW\(\) - INTERVAL '1 minute' AND assigned_at < \$2\)\)
RETURNING test_workflow_executions\.id`

params := AssignTestWorkflowExecutionParams{
NewRunnerID: "new-runner",
AssignedAt: pgtype.Timestamptz{Time: time.Now(), Valid: true},
ID: "test-id",
PrevRunnerID: "old-runner",
OrganizationID: "org-id",
EnvironmentID: "env-id",
}

rows := mock.NewRows([]string{"id"}).AddRow("test-id")
mock.ExpectQuery(expectedQuery).WithArgs(
params.NewRunnerID,
params.AssignedAt,
params.ID,
params.OrganizationID,
params.EnvironmentID,
params.PrevRunnerID,
).WillReturnRows(rows)

// Execute query
result, err := queries.AssignTestWorkflowExecution(ctx, params)

// Assertions
assert.NoError(t, err)
assert.Equal(t, "test-id", result)
assert.NoError(t, mock.ExpectationsWereMet())
}

func TestSQLCTestWorkflowExecutionQueries_AbortTestWorkflowExecutionIfQueued(t *testing.T) {
mock, err := pgxmock.NewPool()
require.NoError(t, err)
defer mock.Close()

queries := New(mock)
ctx := context.Background()

expectedQuery := `UPDATE test_workflow_executions
SET status_at = \$1
FROM test_workflow_results r
WHERE test_workflow_executions\.id = \$2 AND \(test_workflow_executions\.organization_id = \$3 AND test_workflow_executions\.environment_id = \$4\)
AND test_workflow_executions\.id = r\.execution_id
AND r\.status IN \('queued', 'assigned', 'starting', 'running', 'paused', 'resuming'\)
AND \(test_workflow_executions\.runner_id IS NULL OR test_workflow_executions\.runner_id = ''\)
RETURNING test_workflow_executions\.id`

params := AbortTestWorkflowExecutionIfQueuedParams{
AbortTime: pgtype.Timestamptz{Time: time.Now(), Valid: true},
ID: "test-id",
OrganizationID: "org-id",
EnvironmentID: "env-id",
}

rows := mock.NewRows([]string{"id"}).AddRow("test-id")
mock.ExpectQuery(expectedQuery).WithArgs(params.AbortTime, params.ID, "org-id", "env-id").WillReturnRows(rows)

// Execute query
result, err := queries.AbortTestWorkflowExecutionIfQueued(ctx, params)

// Assertions
assert.NoError(t, err)
assert.Equal(t, "test-id", result)
assert.NoError(t, mock.ExpectationsWereMet())
}

func TestSQLCTestWorkflowExecutionQueries_AbortTestWorkflowResultIfQueued(t *testing.T) {
mock, err := pgxmock.NewPool()
require.NoError(t, err)
defer mock.Close()

queries := New(mock)
ctx := context.Background()

expectedQuery := `UPDATE test_workflow_results
SET
status = 'aborted',
predicted_status = 'aborted',
finished_at = \$1,
initialization = jsonb_set\(
jsonb_set\(
jsonb_set\(COALESCE\(initialization, '\{\}'::jsonb\), '\{status\}', '"aborted"'\),
'\{errormessage\}', '"Aborted before initialization\."'
\),
'\{finishedat\}', to_jsonb\(\$1::timestamp\)
\)
WHERE execution_id = \$2
AND status IN \('queued', 'running', 'paused'\)`

params := AbortTestWorkflowResultIfQueuedParams{
AbortTime: pgtype.Timestamptz{Time: time.Now(), Valid: true},
ID: "test-id",
}

mock.ExpectExec(expectedQuery).WithArgs(params.AbortTime, params.ID).WillReturnResult(pgxmock.NewResult("UPDATE", 1))

// Execute query
err = queries.AbortTestWorkflowResultIfQueued(ctx, params)

// Assertions
assert.NoError(t, err)
assert.NoError(t, mock.ExpectationsWereMet())
}

func TestSQLCTestWorkflowExecutionQueries_GetPreviousFinishedState(t *testing.T) {
mock, err := pgxmock.NewPool()
require.NoError(t, err)
defer mock.Close()

queries := New(mock)
ctx := context.Background()

expectedQuery := `SELECT r\.status
FROM test_workflow_executions e
LEFT JOIN test_workflow_results r ON e\.id = r\.execution_id
LEFT JOIN test_workflows w ON e\.id = w\.execution_id AND w\.workflow_type = 'workflow'
WHERE w\.name = \$1::text AND \(e\.organization_id = \$2 AND e\.environment_id = \$3\)
AND r\.finished_at < \$4
AND r\.status IN \('passed', 'failed', 'skipped', 'aborted', 'canceled', 'timeout'\)
ORDER BY r\.finished_at DESC
LIMIT 1`

params := GetPreviousFinishedStateParams{
WorkflowName: "test-workflow",
Date: pgtype.Timestamptz{Time: time.Now(), Valid: true},
OrganizationID: "org-id",
EnvironmentID: "env-id",
}

rows := mock.NewRows([]string{"status"}).AddRow("passed")
mock.ExpectQuery(expectedQuery).WithArgs(params.WorkflowName, params.OrganizationID, params.EnvironmentID, params.Date).WillReturnRows(rows)

// Execute query
result, err := queries.GetPreviousFinishedState(ctx, params)

// Assertions
assert.NoError(t, err)
assert.Equal(t, "passed", result.String)
assert.NoError(t, mock.ExpectationsWereMet())
}

func TestSQLCTestWorkflowExecutionQueries_GetTestWorkflowExecutionTags(t *testing.T) {
mock, err := pgxmock.NewPool()
require.NoError(t, err)
defer mock.Close()

queries := New(mock)
ctx := context.Background()

expectedQuery := `WITH tag_extracts AS \(
SELECT
e.id,
w.name as workflow_name,
tag_pair.key as tag_key,
tag_pair.value as tag_value
FROM test_workflow_executions e
LEFT JOIN test_workflows w ON e.id = w.execution_id AND w.workflow_type = 'workflow'
CROSS JOIN LATERAL jsonb_each_text\(e.tags\) AS tag_pair\(key, value\)
WHERE e.tags IS NOT NULL AND \(e\.organization_id = \$2 AND e\.environment_id = \$3\)
AND e.tags != '\{\}'::jsonb
AND jsonb_typeof\(e.tags\) = 'object'
\)
SELECT
tag_key::text,
array_agg\(DISTINCT tag_value ORDER BY tag_value\)::text\[\] as values
FROM tag_extracts
WHERE \(COALESCE\(\$1::text, ''\) = '' OR workflow_name = \$1::text\)
GROUP BY tag_key
ORDER BY tag_key`

rows := mock.NewRows([]string{"tag_key", "values"}).
AddRow("env", []string{"test", "prod"}).
AddRow("version", []string{"1.0", "2.0"})

mock.ExpectQuery(expectedQuery).WithArgs("test-workflow", "org-id", "env-id").WillReturnRows(rows)

// Execute query
result, err := queries.GetTestWorkflowExecutionTags(ctx, GetTestWorkflowExecutionTagsParams{
WorkflowName: "test-workflow",
OrganizationID: "org-id",
EnvironmentID: "env-id",
TagKeys: []string{},
TagConditions: []string{},
LabelKeys: []string{},
LabelConditions: []string{},
SelectorKeys: []string{},
SelectorConditions: []string{},
})

// Assertions
assert.NoError(t, err)
assert.Len(t, result, 2)
assert.Equal(t, "env", result[0].TagKey)
assert.NoError(t, mock.ExpectationsWereMet())
}

func TestSQLCTestWorkflowExecutionQueries_GetTestWorkflowMetrics(t *testing.T) {
mock, err := pgxmock.NewPool()
require.NoError(t, err)
defer mock.Close()

queries := New(mock)
ctx := context.Background()

expectedQuery := `SELECT
e\.id as execution_id,
e\.group_id,
r\.duration,
r\.duration_ms,
r\.status,
e\.name,
e\.scheduled_at as start_time,
e\.runner_id
FROM test_workflow_executions e
LEFT JOIN test_workflow_results r ON e\.id = r\.execution_id
LEFT JOIN test_workflows w ON e\.id = w\.execution_id AND w\.workflow_type = 'workflow'
WHERE w\.name = \$1::text AND \(e\.organization_id = \$2 AND e\.environment_id = \$3\)
AND \(\$4::integer = 0 OR e\.scheduled_at >= NOW\(\) - \(\$4::integer \|\| ' days'\)::interval\)
ORDER BY e\.scheduled_at DESC
LIMIT NULLIF\(\$5, 0\)`

params := GetTestWorkflowMetricsParams{
WorkflowName: "test-workflow",
LastNDays: 7,
Lmt: 10,
OrganizationID: "org-id",
EnvironmentID: "env-id",
}

rows := mock.NewRows([]string{
"execution_id", "group_id", "duration", "duration_ms", "status", "name", "start_time", "runner_id",
}).AddRow(
"exec-1", "group-1", "5m", int64(300000), "passed", "test-execution", time.Now(), "runner-1",
)

mock.ExpectQuery(expectedQuery).WithArgs(params.WorkflowName, params.OrganizationID, params.EnvironmentID, params.LastNDays, params.Lmt).WillReturnRows(rows)

// Execute query
result, err := queries.GetTestWorkflowMetrics(ctx, params)

// Assertions
assert.NoError(t, err)
assert.Len(t, result, 1)
assert.Equal(t, "exec-1", result[0].ExecutionID)
assert.NoError(t, mock.ExpectationsWereMet())
}

func TestSQLCTestWorkflowExecutionQueries_InsertTestWorkflowSignature(t *testing.T) {
mock, err := pgxmock.NewPool()
require.NoError(t, err)
defer mock.Close()

queries := New(mock)
ctx := context.Background()

expectedQuery := `INSERT INTO test_workflow_signatures \(
execution_id, ref, name, category, optional, negative, parent_id, sig_order
\) VALUES \(
\$1, \$2, \$3, \$4, \$5, \$6, \$7, \$8
\)
RETURNING test_workflow_signatures\.id`

params := InsertTestWorkflowSignatureParams{
ExecutionID: "test-id",
Ref: pgtype.Text{String: "step1", Valid: true},
Name: pgtype.Text{String: "Test Step", Valid: true},
Category: pgtype.Text{String: "test", Valid: true},
Optional: pgtype.Bool{Bool: false, Valid: true},
Negative: pgtype.Bool{Bool: false, Valid: true},
ParentID: pgtype.UUID{Valid: false},
SigOrder: 0,
}

id := uuid.New()
rows := mock.NewRows([]string{"id"}).AddRow(pgtype.UUID{Valid: true, Bytes: id})
mock.ExpectQuery(expectedQuery).WithArgs(
params.ExecutionID,
params.Ref,
params.Name,
params.Category,
params.Optional,
params.Negative,
params.ParentID,
params.SigOrder,
).WillReturnRows(rows)

// Execute query
result, err := queries.InsertTestWorkflowSignature(ctx, params)

// Assertions
assert.NoError(t, err)
assert.Equal(t, pgtype.UUID{Valid: true, Bytes: id}, result)
assert.NoError(t, mock.ExpectationsWereMet())
}

func TestSQLCTestWorkflowExecutionQueries_UpdateTestWorkflowExecution(t *testing.T) {
mock, err := pgxmock.NewPool()
require.NoError(t, err)
defer mock.Close()

queries := New(mock)
ctx := context.Background()

expectedQuery := `UPDATE test_workflow_executions
SET
group_id = \$1,
runner_id = \$2,
runner_target = \$3,
runner_original_target = \$4,
name = \$5,
namespace = \$6,
number = \$7,
scheduled_at = \$8,
assigned_at = \$9,
status_at = \$10,
test_workflow_execution_name = \$11,
disable_webhooks = \$12,
tags = \$13,
running_context = \$14,
config_params = \$15,
runtime = \$16
WHERE id = \$17 AND \(organization_id = \$18 AND environment_id = \$19\)`

params := UpdateTestWorkflowExecutionParams{
GroupID: pgtype.Text{String: "group-1", Valid: true},
RunnerID: pgtype.Text{String: "runner-1", Valid: true},
RunnerTarget: []byte(`{}`),
RunnerOriginalTarget: []byte(`{}`),
Name: "updated-execution",
Namespace: pgtype.Text{String: "default", Valid: true},
Number: pgtype.Int4{Int32: 2, Valid: true},
ScheduledAt: pgtype.Timestamptz{Time: time.Now(), Valid: true},
AssignedAt: pgtype.Timestamptz{Time: time.Now(), Valid: true},
StatusAt: pgtype.Timestamptz{Time: time.Now(), Valid: true},
TestWorkflowExecutionName: pgtype.Text{String: "test-execution", Valid: true},
DisableWebhooks: pgtype.Bool{Bool: false, Valid: true},
Tags: []byte(`{"env":"prod"}`),
RunningContext: []byte(`{}`),
ConfigParams: []byte(`{}`),
Runtime: []byte(`{}`),
ID: "test-id",
OrganizationID: "org-id",
EnvironmentID: "env-id",
}

mock.ExpectExec(expectedQuery).WithArgs(
params.GroupID,
params.RunnerID,
params.RunnerTarget,
params.RunnerOriginalTarget,
params.Name,
params.Namespace,
params.Number,
params.ScheduledAt,
params.AssignedAt,
params.StatusAt,
params.TestWorkflowExecutionName,
params.DisableWebhooks,
params.Tags,
params.RunningContext,
params.ConfigParams,
params.Runtime,
params.ID,
params.OrganizationID,
params.EnvironmentID,
).WillReturnResult(pgxmock.NewResult("UPDATE", 1))

// Execute query
err = queries.UpdateTestWorkflowExecution(ctx, params)

// Assertions
assert.NoError(t, err)
require.Len(t, result, 1)
assert.Equal(t, "passed", result[0].Status.String)
assert.Equal(t, int64(10), result[0].Count)
assert.NoError(t, mock.ExpectationsWereMet())
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Warning: large test coverage regression

This PR removes 18 test functions that covered critical CRUD operations (e.g., GetTestWorkflowExecution, InsertTestWorkflowExecution, UpdateTestWorkflowExecutionResult, AbortTestWorkflowResultIfQueued, AssignTestWorkflowExecution, DeleteAllTestWorkflowExecutions, and more) — none of which were changed by this PR.

The single surviving test uses mock.ExpectQuery("SELECT"), which matches any SELECT query and does not verify the actual SQL or parameter shape at all. The comment on line 25 even says "we just want to verify the function can be called". This means:

  • Regressions in the query text for the affected listing queries will not be caught.
  • Regressions in any of the removed queries (insert, update, delete, assign, abort, etc.) are now completely uncovered.

If these tests were removed because they were hard to maintain after the type change from []byte to []string, they should be restored with updated parameter types rather than deleted.

Comment on lines 1997 to 1999
if !*label.Exists {
condType = "not_exists"
key = key + ":not_exists"
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Info: replace removes all occurrences of :not_exists, not just the suffix

replace(key_condition, ':not_exists', '') in the SQL (matched by the label stored as key + ":not_exists" here) replaces all occurrences of the literal :not_exists substring. For standard Kubernetes label keys (which follow [a-zA-Z0-9_.-] / prefix/name format and cannot contain :), this is harmless. However, if a user-defined label key ever contains :not_exists as a substring, the stripped key would be incorrect.

A more robust approach would be to use left(key_condition, length(key_condition) - length(':not_exists')) in the SQL, or encode the suffix differently (e.g., use a separate boolean flag or a different delimiter character that cannot appear in valid label keys).

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
@de-wim
Copy link
Copy Markdown
Contributor Author

de-wim commented Mar 23, 2026

@copilot add unit tests covering all the queries we've changed in this PR

Copy link
Copy Markdown
Contributor

Copilot AI commented Mar 23, 2026

@de-wim I've opened a new pull request, #7402, to work on those changes. Once the pull request is ready, I'll request review from you.

Copy link
Copy Markdown
Collaborator

@vsukhin vsukhin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks good, but return all the removed tests

@de-wim
Copy link
Copy Markdown
Contributor Author

de-wim commented Mar 24, 2026

looks good, but return all the removed tests

Do we really want to test what query it will run? These tests will break every time we change the slightest thing in the queries? What is the value of such tests?

@de-wim de-wim requested a review from a team as a code owner March 24, 2026 22:08
@de-wim de-wim requested a review from ypoplavs March 24, 2026 22:08
@de-wim de-wim force-pushed the TKC-5244-postgres-list-executions-slow branch from 5dec566 to 9b145fd Compare March 24, 2026 22:33
@vsukhin
Copy link
Copy Markdown
Collaborator

vsukhin commented Mar 25, 2026

@de-wim also please add [TKC-...] issue to the PR title

@de-wim de-wim changed the title fix: execution listing queries should be faster now [TKC-5244] fix: execution listing queries should be faster now Mar 25, 2026
@de-wim de-wim merged commit cf3d825 into main Mar 25, 2026
10 of 11 checks passed
@de-wim de-wim deleted the TKC-5244-postgres-list-executions-slow branch March 25, 2026 11:13
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants