Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- Added `MetadataSet` to stage job metadata updates from worker middleware, `HookWorkBegin`, workers, or `HookWorkEnd`, with changes persisted when the job is completed. [PR #1269](https://github.com/riverqueue/river/pull/1269)

### Fixed

⚠️ **Breaking API change:** `rivermigrate.Migrator.Validate` and `rivermigrate.Migrator.ValidateTx` now take a `*rivermigrate.ValidateOpts` parameter. Pass `nil` to preserve previous behavior. We normally endeavor not to make any breaking API changes, but this one will keep the API in a much nicer state, and is on an ancillary function that most installations won't be using. [PR #1259](https://github.com/riverqueue/river/pull/1259)
Expand Down
141 changes: 139 additions & 2 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1293,6 +1293,37 @@ func Test_Client_Common(t *testing.T) {
require.True(t, workEndHookCalled)
})

t.Run("WithWorkerSettingMetadata", func(t *testing.T) {
t.Parallel()

_, bundle := setup(t)

type JobArgs struct {
testutil.JobArgsReflectKind[JobArgs]
}

AddWorkerArgs(bundle.config.Workers, JobArgs{}, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error {
return MetadataSet(ctx, "worker_key", "worker_value")
}))
Comment thread
peter941221 marked this conversation as resolved.

client, err := NewClient(riverpgxv5.New(bundle.dbPool), bundle.config)
require.NoError(t, err)

subscribeChan := subscribe(t, client)
startClient(ctx, t, client)

insertRes, err := client.Insert(ctx, JobArgs{}, nil)
require.NoError(t, err)

event := riversharedtest.WaitOrTimeout(t, subscribeChan)
require.Equal(t, EventKindJobCompleted, event.Kind)
require.Equal(t, insertRes.Job.ID, event.Job.ID)

var metadata map[string]any
require.NoError(t, json.Unmarshal(event.Job.Metadata, &metadata))
require.Equal(t, "worker_value", metadata["worker_key"])
})

t.Run("WithInsertBeginHookOnJobArgs", func(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -1394,6 +1425,110 @@ func Test_Client_Common(t *testing.T) {
require.True(t, hookWorkEndCalled.Load())
})

t.Run("WithWorkEndHookSettingMetadata", func(t *testing.T) {
Comment thread
peter941221 marked this conversation as resolved.
t.Parallel()

_, bundle := setup(t)

bundle.config.Hooks = []rivertype.Hook{
HookWorkEndFunc(func(ctx context.Context, job *rivertype.JobRow, err error) error {
require.NoError(t, MetadataSet(ctx, "hook_key", "hook_value"))
return err
}),
}

client, err := NewClient(riverpgxv5.New(bundle.dbPool), bundle.config)
require.NoError(t, err)

subscribeChan := subscribe(t, client)
startClient(ctx, t, client)

insertRes, err := client.Insert(ctx, noOpArgs{}, nil)
require.NoError(t, err)

event := riversharedtest.WaitOrTimeout(t, subscribeChan)
require.Equal(t, EventKindJobCompleted, event.Kind)
require.Equal(t, insertRes.Job.ID, event.Job.ID)

var metadata map[string]any
require.NoError(t, json.Unmarshal(event.Job.Metadata, &metadata))
require.Equal(t, "hook_value", metadata["hook_key"])
})

t.Run("WithWorkBeginHookSettingMetadata", func(t *testing.T) {
t.Parallel()

_, bundle := setup(t)

bundle.config.Hooks = []rivertype.Hook{
HookWorkBeginFunc(func(ctx context.Context, job *rivertype.JobRow) error {
return MetadataSet(ctx, "hook_begin_key", "hook_begin_value")
}),
}

client, err := NewClient(riverpgxv5.New(bundle.dbPool), bundle.config)
require.NoError(t, err)

subscribeChan := subscribe(t, client)
startClient(ctx, t, client)

insertRes, err := client.Insert(ctx, noOpArgs{}, nil)
require.NoError(t, err)

event := riversharedtest.WaitOrTimeout(t, subscribeChan)
require.Equal(t, EventKindJobCompleted, event.Kind)
require.Equal(t, insertRes.Job.ID, event.Job.ID)

var metadata map[string]any
require.NoError(t, json.Unmarshal(event.Job.Metadata, &metadata))
require.Equal(t, "hook_begin_value", metadata["hook_begin_key"])
})

t.Run("WithWorkerMiddlewareSettingMetadata", func(t *testing.T) {
t.Parallel()

_, bundle := setup(t)

type JobArgs struct {
testutil.JobArgsReflectKind[JobArgs]
}

worker := &workerWithMiddleware[JobArgs]{
workFunc: func(ctx context.Context, job *Job[JobArgs]) error {
return nil
},
middlewareFunc: func(job *rivertype.JobRow) []rivertype.WorkerMiddleware {
require.Equal(t, (JobArgs{}).Kind(), job.Kind)

return []rivertype.WorkerMiddleware{
WorkerMiddlewareFunc(func(ctx context.Context, job *rivertype.JobRow, doInner func(ctx context.Context) error) error {
require.NoError(t, MetadataSet(ctx, "middleware_key", "middleware_value"))
return doInner(ctx)
}),
}
},
}

AddWorker(bundle.config.Workers, worker)

client, err := NewClient(riverpgxv5.New(bundle.dbPool), bundle.config)
require.NoError(t, err)

subscribeChan := subscribe(t, client)
startClient(ctx, t, client)

insertRes, err := client.Insert(ctx, JobArgs{}, nil)
require.NoError(t, err)

event := riversharedtest.WaitOrTimeout(t, subscribeChan)
require.Equal(t, EventKindJobCompleted, event.Kind)
require.Equal(t, insertRes.Job.ID, event.Job.ID)

var metadata map[string]any
require.NoError(t, json.Unmarshal(event.Job.Metadata, &metadata))
require.Equal(t, "middleware_value", metadata["middleware_key"])
})

t.Run("WithGlobalWorkerMiddleware", func(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -1986,8 +2121,10 @@ func Test_Client_Stop_Common(t *testing.T) {

client := runNewTestClient(ctx, t, newTestConfig(t, ""))

// Should shut down quickly:
ctx, cancel := context.WithTimeout(ctx, time.Second)
// Shutdown should still complete promptly, but the client may need a full
// leadership resign attempt before stopping, which can take about a second
// under race instrumentation.
ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel()

require.NoError(t, client.Stop(ctx))
Expand Down
38 changes: 38 additions & 0 deletions metadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package river

import (
"context"
"errors"
"strings"

"github.com/riverqueue/river/internal/jobexecutor"
)

var errMetadataNotSettable = errors.New("MetadataSet must be called within a worker, worker middleware, or work hook")

// MetadataSet records a metadata value to be merged into the job's metadata
// when the current work attempt finishes.
//
// This function is only valid from a worker, worker middleware, or work hook
// like rivertype.HookWorkBegin or rivertype.HookWorkEnd.
//
// Metadata updates are stored on the work context and merged into the job row
// when the current work attempt finishes, whether the attempt succeeds or
// errors. Values must be JSON marshalable because metadata is stored in a
// jsonb column, and setting a key replaces any existing value at that key.
//
// Keys prefixed with `river:` are reserved for internal use and may not be set
// by user code.
func MetadataSet(ctx context.Context, key string, value any) error {
if strings.HasPrefix(key, "river:") {
return errors.New("MetadataSet cannot be used with keys prefixed with `river:`")
}

metadataUpdates, ok := jobexecutor.MetadataUpdatesFromWorkContext(ctx)
if !ok {
return errMetadataNotSettable
}

metadataUpdates[key] = value
return nil
}
40 changes: 40 additions & 0 deletions metadata_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package river

import (
"context"
"testing"

"github.com/riverqueue/river/internal/jobexecutor"
"github.com/stretchr/testify/require"
)

func TestMetadataSet(t *testing.T) {
t.Parallel()

t.Run("RejectsReservedPrefix", func(t *testing.T) {
t.Parallel()

ctx := context.WithValue(context.Background(), jobexecutor.ContextKeyMetadataUpdates, map[string]any{})

err := MetadataSet(ctx, "river:reserved", "value")
require.EqualError(t, err, "MetadataSet cannot be used with keys prefixed with `river:`")
})

t.Run("RequiresWorkContext", func(t *testing.T) {
t.Parallel()

err := MetadataSet(context.Background(), "key", "value")
require.EqualError(t, err, "MetadataSet must be called within a worker, worker middleware, or work hook")
})

t.Run("SetsValueOnWorkContext", func(t *testing.T) {
t.Parallel()

metadataUpdates := map[string]any{}
ctx := context.WithValue(context.Background(), jobexecutor.ContextKeyMetadataUpdates, metadataUpdates)

err := MetadataSet(ctx, "key", "value")
require.NoError(t, err)
require.Equal(t, "value", metadataUpdates["key"])
})
}
Loading