diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index 22df5060e..b74a62f0c 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -22,21 +22,23 @@ import ( const statsTTL = 90 * 24 * time.Hour // 90 days -// LeaseDuration is the duration used to initially create a lease and to extend it thereafter. -const LeaseDuration = 30 * time.Second +// DefaultLeaseDuration is the lease duration (30 seconds) used to create a lease and to extend it thereafter unless SetLeaseDuration overrides it. +const DefaultLeaseDuration = 30 * time.Second // RDB is a client interface to query and mutate task queues. type RDB struct { client redis.UniversalClient clock timeutil.Clock queuesPublished sync.Map + leaseDuration time.Duration // configurable lease duration } // NewRDB returns a new instance of RDB. func NewRDB(client redis.UniversalClient) *RDB { return &RDB{ - client: client, - clock: timeutil.NewRealClock(), + client: client, + clock: timeutil.NewRealClock(), + leaseDuration: DefaultLeaseDuration, } } @@ -57,6 +59,15 @@ func (r *RDB) SetClock(c timeutil.Clock) { r.clock = c } +// SetLeaseDuration sets the lease duration for this RDB instance. +// If d is zero or negative, the default duration (30s) is used. +func (r *RDB) SetLeaseDuration(d time.Duration) { + if d <= 0 { + d = DefaultLeaseDuration + } + r.leaseDuration = d +} + // Ping checks the connection with redis server. func (r *RDB) Ping() error { return r.client.Ping(context.Background()).Err() @@ -250,7 +261,7 @@ func (r *RDB) Dequeue(qnames ...string) (msg *base.TaskMessage, leaseExpirationT base.ActiveKey(qname), base.LeaseKey(qname), } - leaseExpirationTime = r.clock.Now().Add(LeaseDuration) + leaseExpirationTime = r.clock.Now().Add(r.leaseDuration) argv := []interface{}{ leaseExpirationTime.Unix(), base.TaskKeyPrefix(qname), @@ -1352,10 +1363,10 @@ func (r *RDB) ListLeaseExpired(cutoff time.Time, qnames ...string) ([]*base.Task return msgs, nil } -// ExtendLease extends the lease for the given tasks by LeaseDuration (30s). 
+// ExtendLease extends the lease for the given tasks by the configured lease duration. // It returns a new expiration time if the operation was successful. func (r *RDB) ExtendLease(qname string, ids ...string) (expirationTime time.Time, err error) { - expireAt := r.clock.Now().Add(LeaseDuration) + expireAt := r.clock.Now().Add(r.leaseDuration) var zs []redis.Z for _, id := range ids { zs = append(zs, redis.Z{Member: id, Score: float64(expireAt.Unix())}) diff --git a/server.go b/server.go index cde6516a1..879f96be0 100644 --- a/server.go +++ b/server.go @@ -252,6 +252,25 @@ type Config struct { // If unset or zero, default batch size of 100 is used. // Make sure to not put a big number as the batch size to prevent a long-running script. JanitorBatchSize int + + // LeaseDuration specifies the duration for which a worker can hold a lease on a task. + // The lease must be extended by a heartbeat (see HeartbeatInterval) before it expires. + // + // Longer lease duration provides better tolerance for network instability (e.g., cross-region VPS deployments) + // but delays task recovery when a worker crashes. + // + // Recommended range: 30s to 120s; a warning is logged for values outside [10s, 5m]. + // If unset, zero, or negative, the default duration of 30 seconds is used. + LeaseDuration time.Duration + + // HeartbeatInterval specifies the interval between heartbeats sent by the server to extend task leases. + // + // Shorter intervals provide faster detection of worker failures but increase Redis load. + // LeaseDuration must be at least 3x HeartbeatInterval to allow for network failures; server construction panics otherwise. + // + // Recommended range: 3s to 10s; a warning is logged for values below 1s or above 30s. + // If unset, zero, or negative, the default interval of 5 seconds is used. + HeartbeatInterval time.Duration } // GroupAggregator aggregates a group of tasks into one before the tasks are passed to the Handler. 
@@ -503,7 +522,34 @@ func NewServerFromRedisClient(c redis.UniversalClient, cfg Config) *Server { } logger.SetLevel(toInternalLogLevel(loglevel)) + // Process LeaseDuration configuration + leaseDuration := cfg.LeaseDuration + if leaseDuration <= 0 { + leaseDuration = rdb.DefaultLeaseDuration // rdb is still the package here; the local rdb variable is declared further down + } + if leaseDuration < 10*time.Second || leaseDuration > 5*time.Minute { + logger.Warnf("LeaseDuration %v is out of recommended range [10s, 5m]", leaseDuration) + } + + // Process HeartbeatInterval configuration + heartbeatInterval := cfg.HeartbeatInterval + if heartbeatInterval <= 0 { + heartbeatInterval = 5 * time.Second // default + } + if heartbeatInterval < 1*time.Second { + logger.Warnf("HeartbeatInterval %v is too short, may cause high Redis load", heartbeatInterval) + } + if heartbeatInterval > 30*time.Second { + logger.Warnf("HeartbeatInterval %v is too long, may delay lease expiration detection", heartbeatInterval) + } + + // Validate: LeaseDuration must be at least 3x HeartbeatInterval; a violation is a configuration error and panics. + if leaseDuration < heartbeatInterval*3 { + panic(fmt.Sprintf("LeaseDuration (%v) must be at least 3x HeartbeatInterval (%v)", leaseDuration, heartbeatInterval)) + } + rdb := rdb.NewRDB(c) + rdb.SetLeaseDuration(leaseDuration) starting := make(chan *workerInfo) finished := make(chan *base.TaskMessage) syncCh := make(chan *syncRequest) @@ -513,12 +559,12 @@ func NewServerFromRedisClient(c redis.UniversalClient, cfg Config) *Server { syncer := newSyncer(syncerParams{ logger: logger, requestsCh: syncCh, - interval: 5 * time.Second, + interval: heartbeatInterval, }) heartbeater := newHeartbeater(heartbeaterParams{ logger: logger, broker: rdb, - interval: 5 * time.Second, + interval: heartbeatInterval, concurrency: n, queues: queues, strictPriority: cfg.StrictPriority,