Skip to content

feat: workqueue finalization#324

Open
yarolegovich wants to merge 10 commits intomainfrom
yarolegovich/workqueue-finalization
Open

feat: workqueue finalization#324
yarolegovich wants to merge 10 commits intomainfrom
yarolegovich/workqueue-finalization

Conversation

@yarolegovich
Copy link
Copy Markdown
Member

  • Added CallContext field to workqueue.Message struct
  • Added workqueue.NewInMemory
  • Implemented limiter.ConcurrencyConfig handling in workqueues
  • Implemented a default CallContext codec so that service params and auth information can be restored for execution
  • Added BeforeExecutionCallback and AfterExecutionCallback for workqueue.PullQueueConfig as extension points for implementing custom context injection and execution result handling (complete or return a message)
  • Push queues have total control over invoking execution so no need for adding callbacks there. Added a workqueue.WriterFunc for creating writer decorators.
  • Fixed testutil.Logger to use AttachToContext instead of slog.SetDefault which was problematic for parallel tests

Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces cross-process context propagation for distributed task execution by adding a ContextCodec interface and implementing it for call contexts (user identity, service parameters, and tenant data). It also enhances the work queue infrastructure by implementing concurrency limiting via a new semaphore utility across in-memory, pull, and push queue variants, and adds execution callbacks to the pull queue. The review feedback highlights several critical issues: potential nil pointer dereferences in the work queues if used before handler registration, unreachable error handling logic in the pull queue, and data integrity concerns regarding shallow copies of service parameters and fragile type assertions during context decoding.

Comment thread a2asrv/ctxcodec.go Outdated
Comment thread a2asrv/workqueue/pullqueue.go Outdated
}

if handleErr != nil {
if returnErr := msg.Return(ctx, handleErr); returnErr != nil {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

If handleErr is ErrMalformedPayload, calling msg.Return contradicts the definition of ErrMalformedPayload as a non-retryable error. Returning it to the queue will cause it to be redelivered and fail again. Such errors should result in the message being completed (and optionally moved to a Dead Letter Queue).

Comment thread a2asrv/svcparams.go Outdated
Comment thread a2asrv/workqueue/inmemory.go
Comment thread a2asrv/workqueue/pushqueue.go
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant