# asynqpg

Distributed task queue for Go, backed by PostgreSQL.
- Features
- Installation
- Database Setup
- Quick Start
- Architecture
- Producer
- Consumer
- Client
- Web UI
- Observability
- Demo
- Performance
- Testing
- Contributing
- Support
- Project Status
- License
## Features

- PostgreSQL-native – no Redis or external broker required
- Safe concurrent processing – multiple consumers with no duplicate work
- Flexible retries – exponential or constant backoff, skip retry, reschedule for later
- Delayed & scheduled tasks – run tasks at a future time
- Batch & transactional enqueue – bulk insert with deduplication, enqueue within your DB transaction
- Per-type worker pools – independent concurrency and timeout settings per task type
- Automatic maintenance – leader election, stuck task rescue, old task cleanup
- Web dashboard – Web UI with task inspection, filtering, retry/cancel/delete, and auth (Basic/OAuth)
- OpenTelemetry – built-in metrics and distributed tracing
## Installation

```shell
go get github.com/yakser/asynqpg
```

Requires Go 1.25+ and PostgreSQL 14+.
## Database Setup

Apply migrations to create the `asynqpg_tasks` table:
```shell
make up      # start PostgreSQL in Docker
make migrate # apply migrations
```

Or use testcontainers-go in tests – no manual setup needed.
## Quick Start

Enqueue a task:

```go
package main

import (
	"context"
	"encoding/json"
	"log"
	"time"

	"github.com/jmoiron/sqlx"
	_ "github.com/lib/pq"

	"github.com/yakser/asynqpg"
	"github.com/yakser/asynqpg/producer"
)

func main() {
	db, err := sqlx.Connect("postgres", "postgres://postgres:password@localhost:5432/asynqpg?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}

	p, err := producer.New(producer.Config{Pool: db})
	if err != nil {
		log.Fatal(err)
	}

	payload, _ := json.Marshal(map[string]string{"to": "user@example.com", "subject": "Hello"})

	_, err = p.Enqueue(context.Background(), asynqpg.NewTask("email:send", payload,
		asynqpg.WithMaxRetry(5),
		asynqpg.WithDelay(10*time.Second),
	))
	if err != nil {
		log.Fatal(err)
	}
}
```

Process tasks with a consumer:

```go
package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/jmoiron/sqlx"
	_ "github.com/lib/pq"

	"github.com/yakser/asynqpg"
	"github.com/yakser/asynqpg/consumer"
)

func main() {
	db, err := sqlx.Connect("postgres", "postgres://postgres:password@localhost:5432/asynqpg?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}

	c, err := consumer.New(consumer.Config{Pool: db})
	if err != nil {
		log.Fatal(err)
	}

	if err := c.RegisterTaskHandler("email:send",
		consumer.TaskHandlerFunc(func(ctx context.Context, task *asynqpg.TaskInfo) error {
			fmt.Printf("Processing task %d: %s\n", task.ID, task.Type)
			// process task...
			return nil
		}),
		consumer.WithWorkersCount(5),
		consumer.WithTimeout(30*time.Second),
	); err != nil {
		log.Fatal(err)
	}

	if err := c.Start(); err != nil {
		log.Fatal(err)
	}

	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
	<-sigCh

	if err := c.Stop(); err != nil {
		log.Printf("shutdown error: %v", err)
	}
}
```

## Architecture

```mermaid
flowchart LR
    %% Styles
    classDef pending fill:#f59e0b,stroke:#92400e,color:#000
    classDef running fill:#3b82f6,stroke:#1e40af,color:#fff
    classDef completed fill:#22c55e,stroke:#166534,color:#000
    classDef failed fill:#ef4444,stroke:#991b1b,color:#fff
    classDef cancelled fill:#6b7280,stroke:#374151,color:#fff
    classDef edge fill:#f3f4f6,stroke:#d1d5db,color:#374151

    %% States
    P["Pending"]:::pending
    T["Running"]:::running
    subgraph Final
        C["Completed"]:::completed
        F["Failed"]:::failed
        Ca["Cancelled"]:::cancelled
    end

    %% Entry points
    Enqueue(["Enqueue"]):::edge --> P
    EnqueueDelayed(["Enqueue (with delay)"]):::edge --> P

    %% Normal processing
    P -- "fetched by consumer" --> T
    T -- "handler success" --> C
    T -- "handler error, attempts_left > 0" --> P
    T -- "handler error, attempts_left = 0" --> F

    %% Rescuer (stuck tasks)
    T -- "stuck, attempts_left > 0 (Rescuer)" --> P
    T -- "stuck, attempts_left = 0 (Rescuer)" --> F

    %% Manual actions via client API
    F -- "manual retry" --> P
    Ca -- "manual retry" --> P
    P -- "manual cancel" --> Ca
    F -- "manual cancel" --> Ca

    %% Cleaner (maintenance, leader-only)
    C -. "auto-delete (Cleaner)" .-> Deleted(["Deleted"]):::edge
    F -. "auto-delete (Cleaner)" .-> Deleted
    Ca -. "auto-delete (Cleaner)" .-> Deleted

    %% Manual delete via client API
    P -- "manual delete" --> Deleted
    C -- "manual delete" --> Deleted
    F -- "manual delete" --> Deleted
    Ca -- "manual delete" --> Deleted
```

Tasks are fetched using `SELECT ... FOR NO KEY UPDATE SKIP LOCKED`, enabling safe concurrent processing across multiple consumers. The `blocked_till` column serves as both a delay mechanism (delayed enqueue, retry backoff) and a distributed lock expiry – a task reappears for fetching only after `blocked_till` passes.
| Package | Purpose |
|---|---|
| `producer/` | Enqueue tasks: `Enqueue`, `EnqueueTx`, `EnqueueMany`, `EnqueueManyTx` |
| `consumer/` | Fetch and process tasks with configurable worker pools |
| `client/` | Task inspection and management (get, list, cancel, retry, delete) |
| `ui/` | HTTP handler serving the REST API + embedded React SPA |
## Producer

Create a producer with `producer.New`:

```go
p, err := producer.New(producer.Config{
	Pool:            db,             // required: *sqlx.DB
	Logger:          slog.Default(), // optional
	DefaultMaxRetry: 3,              // optional, default: 3
	MeterProvider:   mp,             // optional: OTel metrics
	TracerProvider:  tp,             // optional: OTel traces
})
```

Enqueue variants:

```go
// Single task
id, err := p.Enqueue(ctx, task)

// Within an existing transaction (atomic with your business logic)
id, err = p.EnqueueTx(ctx, tx, task)

// Batch enqueue (skips duplicates by idempotency token)
ids, err := p.EnqueueMany(ctx, tasks)

// Batch within a transaction
ids, err := p.EnqueueManyTx(ctx, tx, tasks)
```

Task options:

```go
asynqpg.NewTask("type", payload,
	asynqpg.WithMaxRetry(5),                      // max retry attempts
	asynqpg.WithDelay(10*time.Second),            // delay before first processing
	asynqpg.WithIdempotencyToken("unique-token"), // deduplicate enqueues
)

// Schedule a task for a specific time
task := asynqpg.NewTask("report:generate", payload)
task.ProcessAt = time.Date(2026, 1, 1, 9, 0, 0, 0, time.UTC)
```

## Consumer

Create a consumer with `consumer.New`:
```go
c, err := consumer.New(consumer.Config{
	Pool:            db,          // required
	ClientID:        "worker-1",  // optional, for leader election
	RetryPolicy:     retryPolicy, // optional, default: exponential backoff
	FetchInterval:   100 * time.Millisecond,
	ShutdownTimeout: 30 * time.Second,

	// Retention for completed/failed/cancelled tasks
	CompletedRetention: 24 * time.Hour,
	FailedRetention:    7 * 24 * time.Hour,
	CancelledRetention: 24 * time.Hour,

	// DisableMaintenance:    true, // set to disable rescuer + cleaner
	// DisableBatchCompleter: true, // set to disable batch completions
})
```

Register handlers with per-task-type options:

```go
err := c.RegisterTaskHandler("email:send", handler,
	consumer.WithWorkersCount(10),        // goroutines for this task type
	consumer.WithMaxAttempts(5),          // override default
	consumer.WithTimeout(30*time.Second), // per-task execution timeout
)
```

Implement `consumer.TaskHandler` or use the `TaskHandlerFunc` adapter:

```go
type TaskHandler interface {
	Handle(ctx context.Context, task *asynqpg.TaskInfo) error
}
```

Exponential backoff (default) – `attempt^4` seconds with 10% jitter, capped at 24h (e.g., the 3rd attempt waits roughly 81 seconds):

```go
&asynqpg.DefaultRetryPolicy{MaxRetryDelay: 24 * time.Hour}
```

Constant delay:

```go
&asynqpg.ConstantRetryPolicy{Delay: 5 * time.Second}
```

If a handler encounters a permanent error that should not be retried (e.g., an invalid payload or a business-logic rejection), it can return `asynqpg.ErrSkipRetry` to fail the task immediately, skipping all remaining retry attempts:
```go
func (h *EmailHandler) Handle(ctx context.Context, task *asynqpg.TaskInfo) error {
	var payload EmailPayload
	if err := json.Unmarshal(task.Payload, &payload); err != nil {
		// Invalid payload – retrying won't help
		return fmt.Errorf("bad payload: %w", asynqpg.ErrSkipRetry)
	}

	// process task...
	return nil
}
```

`ErrSkipRetry` works with `errors.Is`, so it can be wrapped with additional context via `fmt.Errorf("...: %w", asynqpg.ErrSkipRetry)`.
Sometimes a handler needs to defer processing without it counting as a failure. `TaskSnooze` reschedules the task after a given duration without consuming an attempt – `attempts_left` and `attempts_elapsed` remain unchanged:

```go
func (h *MyHandler) Handle(ctx context.Context, task *asynqpg.TaskInfo) error {
	if !isExternalServiceReady() {
		// Try again in 30 seconds; doesn't count as a failed attempt
		return asynqpg.TaskSnooze(30 * time.Second)
	}

	// process task...
	return nil
}
```

If you want to reschedule and count it as a failed attempt (with the error message stored), use `TaskSnoozeWithError`:
```go
func (h *MyHandler) Handle(ctx context.Context, task *asynqpg.TaskInfo) error {
	if err := callExternalAPI(); err != nil {
		// Retry in 1 minute; counts as an attempt, error message is stored
		return fmt.Errorf("api unavailable: %w", asynqpg.TaskSnoozeWithError(1*time.Minute))
	}

	// process task...
	return nil
}
```

Key differences:
| | `TaskSnooze` | `TaskSnoozeWithError` |
|---|---|---|
| Counts as attempt | No | Yes |
| Stores error message | No | Yes |
| Respects max retries | No (unlimited snoozes) | Yes (fails when exhausted) |
| Use case | External dependency not ready | Transient error with custom delay |
Both helpers work with `errors.As` and can be wrapped with `fmt.Errorf`. A negative duration panics; a zero duration makes the task immediately available again.
The library uses two distinct structs to separate concerns:
- `Task` (root package) – the input struct for enqueueing. Contains only the fields you set when creating a task: `Type`, `Payload`, `Delay`, `MaxRetry`, `IdempotencyToken`.
- `TaskInfo` (root package) – the runtime struct passed to handlers. Contains all database-assigned fields needed during processing:
```go
func (h *MyHandler) Handle(ctx context.Context, task *asynqpg.TaskInfo) error {
	task.ID              // database ID
	task.Type            // task type
	task.Payload         // task payload
	task.AttemptsLeft    // remaining retry attempts
	task.AttemptsElapsed // number of attempts already made
	task.CreatedAt       // when the task was first enqueued
	task.Messages        // error messages from previous failed attempts
	task.AttemptedAt     // when the current processing attempt started
	// ...
}
```

The `client` package has its own `client.TaskInfo` – a full database read model returned by `client.GetTask` / `client.ListTasks` for inspection and management. It includes additional fields such as `Status`, `BlockedTill`, `UpdatedAt`, and `FinalizedAt`.
Task metadata is also available via context helpers, useful in middleware and utilities:
```go
func (h *MyHandler) Handle(ctx context.Context, task *asynqpg.TaskInfo) error {
	id, _ := asynqpg.GetTaskID(ctx)           // database ID
	retry, _ := asynqpg.GetRetryCount(ctx)    // attempts already elapsed
	max, _ := asynqpg.GetMaxRetry(ctx)        // total max retry count
	createdAt, _ := asynqpg.GetCreatedAt(ctx) // task creation time

	// Or get all metadata at once:
	meta, ok := asynqpg.GetTaskMetadata(ctx)
	// meta.ID, meta.RetryCount, meta.MaxRetry, meta.CreatedAt
	// ...
}
```

For testing handlers, use `asynqpg.WithTaskMetadata` to create a context with metadata:
```go
ctx := asynqpg.WithTaskMetadata(context.Background(), asynqpg.TaskMetadata{
	ID: 42, RetryCount: 0, MaxRetry: 3, CreatedAt: time.Now(),
})
err := handler.Handle(ctx, task)
```

The consumer supports composable middleware for cross-cutting concerns. Middleware wraps task handlers using the `func(TaskHandler) TaskHandler` pattern, similar to `net/http` middleware.
Global middleware applies to all task types:
```go
c, _ := consumer.New(config)

_ = c.Use(func(next consumer.TaskHandler) consumer.TaskHandler {
	return consumer.TaskHandlerFunc(func(ctx context.Context, task *asynqpg.TaskInfo) error {
		slog.Info("processing task", "type", task.Type, "id", task.ID)
		err := next.Handle(ctx, task)
		slog.Info("task done", "type", task.Type, "id", task.ID, "error", err)
		return err
	})
})
```

Per-task-type middleware applies only to a specific handler:
```go
c.RegisterTaskHandler("email:send", emailHandler,
	consumer.WithMiddleware(rateLimitMiddleware),
	consumer.WithWorkersCount(5),
)
```
)Execution order: global middleware (outermost, first registered runs first) wraps per-task middleware, which wraps the handler. A middleware can short-circuit by returning an error without calling next.Handle.
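The wrapping order can be demonstrated self-contained with minimal stand-ins that mirror the `func(TaskHandler) TaskHandler` shape (these types are simplified for the sketch, not the library's):

```go
package main

import "fmt"

// Minimal stand-ins mirroring the consumer's middleware shape.
type TaskHandler interface{ Handle(task string) error }

type TaskHandlerFunc func(task string) error

func (f TaskHandlerFunc) Handle(task string) error { return f(task) }

type Middleware func(TaskHandler) TaskHandler

// tag wraps a handler with enter/exit logging to expose execution order.
func tag(name string) Middleware {
	return func(next TaskHandler) TaskHandler {
		return TaskHandlerFunc(func(task string) error {
			fmt.Println("enter", name)
			err := next.Handle(task)
			fmt.Println("exit", name)
			return err
		})
	}
}

func main() {
	handler := TaskHandler(TaskHandlerFunc(func(task string) error {
		fmt.Println("handle", task)
		return nil
	}))

	// Per-task middleware wraps the handler first; global middleware wraps
	// the result, so global runs outermost.
	wrapped := tag("global")(tag("per-task")(handler))

	_ = wrapped.Handle("email:send")
	// Output:
	// enter global
	// enter per-task
	// handle email:send
	// exit per-task
	// exit global
}
```

A middleware that returns an error before calling `next.Handle` short-circuits the chain in exactly the same way an outer `net/http` middleware would.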
```go
c.Start()           // start processing
c.Stop()            // graceful shutdown (uses configured ShutdownTimeout)
c.Shutdown(timeout) // graceful shutdown with custom timeout
```

## Client

Inspect and manage tasks:
```go
cl, err := client.New(client.Config{Pool: db})

// Get a single task
info, err := cl.GetTask(ctx, taskID)

// List tasks with filtering
result, err := cl.ListTasks(ctx, client.NewListParams().
	States(asynqpg.TaskStatusFailed, asynqpg.TaskStatusPending).
	Types("email:send").
	Limit(50).
	OrderBy(client.OrderByCreatedAt, client.SortDesc),
)
// result.Tasks, result.Total

// Manage tasks
_, err = cl.CancelTask(ctx, id) // pending/failed → cancelled
_, err = cl.RetryTask(ctx, id)  // failed/cancelled → pending
_, err = cl.DeleteTask(ctx, id) // remove from database
```

All methods have `*Tx` variants for transactional use.
## Web UI

Mount the dashboard as an HTTP handler:

```go
handler, err := ui.NewHandler(ui.HandlerOpts{
	Pool:   db,
	Prefix: "/asynqpg",
})
http.Handle("/asynqpg/", handler)
```

Basic Auth:

```go
ui.HandlerOpts{
	BasicAuth: &ui.BasicAuth{Username: "admin", Password: "secret"},
}
```

OAuth providers (e.g., GitHub):

```go
ui.HandlerOpts{
	AuthProviders: []ui.AuthProvider{githubProvider},
	SecureCookies: true, // for HTTPS
	SessionMaxAge: 24 * time.Hour,
}
```

Implement the `ui.AuthProvider` interface to add any OAuth/SSO provider. See `examples/demo/github_provider.go` for a complete GitHub OAuth implementation.
## Observability

All public components accept optional `MeterProvider` and `TracerProvider` options. When nil, the global OpenTelemetry provider is used.
| Metric | Type | Description |
|---|---|---|
| `asynqpg.tasks.enqueued` | Counter | Tasks enqueued |
| `asynqpg.tasks.processed` | Counter | Tasks finished processing |
| `asynqpg.tasks.errors` | Counter | Processing or enqueue errors |
| `asynqpg.task.duration` | Histogram | Handler execution duration (seconds) |
| `asynqpg.task.enqueue_duration` | Histogram | Enqueue latency (seconds) |
| `asynqpg.tasks.in_flight` | UpDownCounter | Currently executing tasks |

All metrics are tagged with `task_type` and `status` attributes.
Spans are created for all enqueue and processing operations with proper SpanKind (Producer/Consumer/Client).
A pre-built Grafana dashboard is included at deploy/grafana/dashboards/asynqpg-overview.json. It is provisioned automatically when you run make demo-up.
The dashboard refreshes every 10 seconds and includes:
- Stat panels – total tasks enqueued, processed, errors, and currently in-flight
- Enqueue Rate – tasks enqueued per second over time
- Processing Rate by Status – throughput broken down by completed/failed/cancelled
- Task Duration Quantiles – p50/p95/p99 handler execution latency
- Enqueue Duration Quantiles – p50/p95/p99 enqueue latency
- Error Rate by Type – error count per task type over time
- Error Ratio – fraction of processed tasks that errored
- In-Flight Over Time – concurrently executing tasks over time
All panels support a Task Type variable filter to drill down into a specific task type.
## Demo

The demo includes a full observability stack (Jaeger, Prometheus, Grafana, OTel Collector) in `deploy/`:
```shell
make demo-up # start PostgreSQL + observability stack
```

- Jaeger – http://localhost:16686
- Prometheus – http://localhost:9090
- Grafana – http://localhost:3000
Run the full demo with producers, consumers, web UI, and observability:
```shell
make demo      # start everything
make demo-down # stop all services
```

## Performance

The library includes Go benchmarks covering SQL-level operations, batch completion, producer throughput, and consumer processing.
```shell
make bench # run benchmarks (uses testcontainers, requires Docker)
```

## Testing

```shell
make test             # unit tests (go test -race -count=1 ./...)
make test-integration # integration tests (uses testcontainers, no manual DB needed)
make test-all         # both
make bench            # benchmarks (integration, requires Docker)
make lint             # golangci-lint
```

## Contributing

Contributions are welcome, including bug reports, feature requests,
documentation improvements, and code changes. See CONTRIBUTING.md
for local setup, contribution workflow, testing expectations, and pull request
guidelines.
## Support

- Bug reports & feature requests: GitHub Issues
- Move `testutils` to a separate module to avoid pulling `testcontainers-go` into consumer projects
## Project Status

Under active development. The API may change before v1.0. Bug reports and feature requests are welcome.








