From 39c4f1f047721cfe996f5f37a97fdfad652f3aa8 Mon Sep 17 00:00:00 2001 From: Brandur Date: Wed, 10 Jun 2026 13:02:59 -0500 Subject: [PATCH] Validation to detect duplicate step names in resumable jobs Follows up #1226 to tighten things up a little with a validation. If a user accidentally puts in duplicate step names, detect this problem and return an error. A duplicate step name represents a step that can never be resumed from, so it makes sense to find out about this sooner rather than later. --- CHANGELOG.md | 1 + internal/rivermiddleware/middleware.go | 2 + resumable.go | 20 ++++++++ resumable_test.go | 70 ++++++++++++++++++++++++++ 4 files changed, 93 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index efad6e82..7b2d6527 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed - Change SQLite driver operations over to use bulk inserts where possible now that sqlc has better support for `json_each`. [PR #1276](https://github.com/riverqueue/river/pull/1276) +- Detect duplicate step names across `river.ResumableStep` and return a validation error. [PR #1281](https://github.com/riverqueue/river/pull/1281) ### Fixed diff --git a/internal/rivermiddleware/middleware.go b/internal/rivermiddleware/middleware.go index 00338906..81a6caa2 100644 --- a/internal/rivermiddleware/middleware.go +++ b/internal/rivermiddleware/middleware.go @@ -34,6 +34,7 @@ func (*ResumableMiddleware) Work(ctx context.Context, job *rivertype.JobRow, doI } state := &ResumableState{ + AllStepNames: make(map[string]struct{}), Cursors: make(map[string]json.RawMessage), ResumeMatched: true, ResumeStep: gjson.GetBytes(job.Metadata, rivercommon.MetadataKeyResumableStep).Str, @@ -80,6 +81,7 @@ func (*ResumableMiddleware) Work(ctx context.Context, job *rivertype.JobRow, doI // ResumableState holds the state for a resumable job execution. It is stored in // the context and accessed by ResumableStep and ResumableStepCursor. type ResumableState struct { + AllStepNames map[string]struct{} CompletedStep string Cursors map[string]json.RawMessage Err error diff --git a/resumable.go b/resumable.go index d286c6e2..447cff63 100644 --- a/resumable.go +++ b/resumable.go @@ -45,6 +45,8 @@ type StepOpts struct{} // ResumableStep runs a resumable step, skipping the step on a later retry if // an earlier attempt already completed it successfully. +// Step names must be unique across all ResumableStep and ResumableStepCursor +// calls in the same Worker execution. // // After a step returns an error, no subsequent steps will be run and the // overall job will be marked as failed with that error. Be careful to put all @@ -57,6 +59,9 @@ func ResumableStep(ctx context.Context, name string, opts *StepOpts, stepFunc fu if state.Err != nil { return } + if !registerResumableStepName(state, name) { + return + } if !state.ResumeMatched { if name == state.ResumeStep { @@ -81,6 +86,8 @@ func ResumableStep(ctx context.Context, name string, opts *StepOpts, stepFunc fu // ResumableStepCursor runs a resumable step that also receives a persisted // cursor value from an earlier failed attempt, if one was recorded with // ResumableSetCursor. +// Step names must be unique across all ResumableStep and ResumableStepCursor +// calls in the same Worker execution. // // The cursor type T is user-specified. It may be a primitive value like an // integer ID, or a more complex type like a struct with multiple fields. It's @@ -102,6 +109,9 @@ func ResumableStepCursor[TCursor any](ctx context.Context, name string, opts *St if state.Err != nil { return } + if !registerResumableStepName(state, name) { + return + } if !state.ResumeMatched { if name == state.ResumeStep { @@ -149,6 +159,16 @@ func mustResumableState(ctx context.Context) *rivermiddleware.ResumableState { return state } +func registerResumableStepName(state *rivermiddleware.ResumableState, name string) bool { + if _, ok := state.AllStepNames[name]; ok { + state.Err = fmt.Errorf("river: duplicate resumable step name %q", name) + return false + } + + state.AllStepNames[name] = struct{}{} + return true +} + func resumableStateFromContext(ctx context.Context) (*rivermiddleware.ResumableState, bool) { state := ctx.Value(rivermiddleware.ResumableContextKey{}) if state == nil { diff --git a/resumable_test.go b/resumable_test.go index e15eec0d..8c17b336 100644 --- a/resumable_test.go +++ b/resumable_test.go @@ -26,6 +26,54 @@ func TestResumableStep(t *testing.T) { return ctx, metadataUpdates, &rivertype.JobRow{Metadata: []byte(metadata)} } + t.Run("DuplicateStepName", func(t *testing.T) { + t.Parallel() + + ctx, _, job := setup(t, `{}`) + + var ran []string + err := (&rivermiddleware.ResumableMiddleware{}).Work(ctx, job, func(ctx context.Context) error { + ResumableStep(ctx, "step1", nil, func(ctx context.Context) error { + ran = append(ran, "first") + return nil + }) + ResumableStep(ctx, "step1", nil, func(ctx context.Context) error { + ran = append(ran, "second") + return nil + }) + + return nil + }) + require.EqualError(t, err, `river: duplicate resumable step name "step1"`) + require.Equal(t, []string{"first"}, ran) + }) + + t.Run("DuplicateStepNameWhenSkippingCompletedSteps", func(t *testing.T) { + t.Parallel() + + ctx, _, job := setup(t, `{"river:resumable_step":"step2"}`) + + var ran []string + err := (&rivermiddleware.ResumableMiddleware{}).Work(ctx, job, func(ctx context.Context) error { + ResumableStep(ctx, "step1", nil, func(ctx context.Context) error { + ran = append(ran, "first") + return nil + }) + ResumableStep(ctx, "step1", nil, func(ctx context.Context) error { + ran = append(ran, "second") + return nil + }) + ResumableStep(ctx, "step2", nil, func(ctx context.Context) error { + ran = append(ran, "third") + return nil + }) + + return nil + }) + require.EqualError(t, err, `river: duplicate resumable step name "step1"`) + require.Empty(t, ran) + }) + t.Run("PanicsOutsideWorker", func(t *testing.T) { t.Parallel() @@ -131,6 +179,28 @@ func TestResumableStepCursor(t *testing.T) { return ctx, metadataUpdates, &rivertype.JobRow{Metadata: []byte(metadata)} } + t.Run("DuplicateStepNameSharedWithCursorStep", func(t *testing.T) { + t.Parallel() + + ctx, _, job := setup(t, `{}`) + + var ran []string + err := (&rivermiddleware.ResumableMiddleware{}).Work(ctx, job, func(ctx context.Context) error { + ResumableStep(ctx, "step1", nil, func(ctx context.Context) error { + ran = append(ran, "step") + return nil + }) + ResumableStepCursor(ctx, "step1", nil, func(ctx context.Context, cursor resumableCursor) error { + ran = append(ran, "cursor") + return nil + }) + + return nil + }) + require.EqualError(t, err, `river: duplicate resumable step name "step1"`) + require.Equal(t, []string{"step"}, ran) + }) + t.Run("ResumesCursor", func(t *testing.T) { t.Parallel()