Skip to content

Commit 1f2087b

Browse files
committed
Add pseudo listen/notify mechanism for SQLite
A fairly long-standing issue with SQLite compared to Postgres is the lack of a listen/notify system because SQLite doesn't offer such a mechanism. This has forced SQLite to operate in poll-only mode which means there's a longer delay before it sees jobs that it can work. Honker [1] hit Hacker News a few weeks/months back. It implements a listen/notify-like mechanism, and when reading about how it's implemented, I realized it wouldn't be all that crazy to do something similar for River. Basically, it's poll based, and relies on polling a single table fairly actively. That's something that's not great in general, but we already do that for poll-only, except that it's multiple tables instead of just one (i.e. `river_job` + `river_leader`). Here, I propose that we implement listen/notify using a similar strategy by adding a `river_notification` in SQLite. Like listen/notify, this table has a listen/notify, along with a timestamp that we can use to track event creation and prune older rows. We add a SQLite `Listener` similar to the one for Postgres, but which polls `river_notification`, remembering the last ID that it so it can make sure to only fetch new notifications in each River process. A new maintenance service that runs only in SQLite takes care of pruning events in the table older than 5 minutes. A nice property of `river_notification` is that like listen/notify, it all works transactionally as well. New rows are hidden until the transaction that inserted them commits, keeping them invisible from the listener. After commit, the listener sees them and responds. We track a `lastID` in the listener for more efficient reads, which seems like it might be a problem due to out-of-order IDs, but for better or worse this is not possible in SQLite due its single-writer model. IDs always commit in order. You might think that this would create a table that's overly high volume because 1,000 job insertions would create 1,000 notifications, but luckily we've been deduplicating notifications through the use of a notify limiter (i.e. `insertNotifyLimiter.ShouldTrigger(queue)`) for a long time now. Leadership notifications are also low volume, so this table should never have to be very big at any given time, which is nice. Requires a database migration, but that might dovetail fairly nicely with #1224 (convert SQLite json to jsonb), which also does. [1] https://github.com/russellromney/honker
1 parent 5535390 commit 1f2087b

23 files changed

Lines changed: 815 additions & 40 deletions

client.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -992,6 +992,14 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
992992
client.testSignals.queueCleaner = &queueCleaner.TestSignals
993993
}
994994

995+
if driver.DatabaseName() == riverdriver.DatabaseNameSQLite {
996+
sqliteNotificationCleaner := maintenance.NewSQLiteNotificationCleaner(archetype, &maintenance.SQLiteNotificationCleanerConfig{
997+
DatabaseName: driver.DatabaseName(),
998+
Schema: config.Schema,
999+
}, driver.GetExecutor())
1000+
maintenanceServices = append(maintenanceServices, sqliteNotificationCleaner)
1001+
}
1002+
9951003
{
9961004
var scheduleFunc func(time.Time) time.Time
9971005
if config.ReindexerSchedule != nil {
@@ -2378,8 +2386,6 @@ type JobListResult struct {
23782386
LastCursor *JobListCursor
23792387
}
23802388

2381-
const databaseNameSQLite = "sqlite"
2382-
23832389
var errJobListParamsMetadataNotSupportedSQLite = errors.New("JobListParams.Metadata is not supported on SQLite")
23842390

23852391
// JobList returns a paginated list of jobs matching the provided filters. The
@@ -2401,7 +2407,7 @@ func (c *Client[TTx]) JobList(ctx context.Context, params *JobListParams) (*JobL
24012407
}
24022408
params.schema = c.config.Schema
24032409

2404-
if c.driver.DatabaseName() == databaseNameSQLite && params.metadataCalled {
2410+
if c.driver.DatabaseName() == riverdriver.DatabaseNameSQLite && params.metadataCalled {
24052411
return nil, errJobListParamsMetadataNotSupportedSQLite
24062412
}
24072413

@@ -2442,7 +2448,7 @@ func (c *Client[TTx]) JobListTx(ctx context.Context, tx TTx, params *JobListPara
24422448
}
24432449
params.schema = c.config.Schema
24442450

2445-
if c.driver.DatabaseName() == databaseNameSQLite && params.metadataCalled {
2451+
if c.driver.DatabaseName() == riverdriver.DatabaseNameSQLite && params.metadataCalled {
24462452
return nil, errJobListParamsMetadataNotSupportedSQLite
24472453
}
24482454

Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
package maintenance
2+
3+
import (
4+
"cmp"
5+
"context"
6+
"errors"
7+
"log/slog"
8+
"time"
9+
10+
"github.com/riverqueue/river/riverdriver"
11+
"github.com/riverqueue/river/rivershared/baseservice"
12+
"github.com/riverqueue/river/rivershared/riversharedmaintenance"
13+
"github.com/riverqueue/river/rivershared/startstop"
14+
"github.com/riverqueue/river/rivershared/testsignal"
15+
"github.com/riverqueue/river/rivershared/util/testutil"
16+
"github.com/riverqueue/river/rivershared/util/timeutil"
17+
)
18+
19+
const (
20+
SQLiteNotificationCleanerIntervalDefault = time.Minute
21+
SQLiteNotificationCleanerRetentionPeriodDefault = 5 * time.Minute
22+
)
23+
24+
// SQLiteNotificationCleanerTestSignals are internal signals used exclusively in tests.
25+
type SQLiteNotificationCleanerTestSignals struct {
26+
DeletedBatch testsignal.TestSignal[struct{}] // notifies when runOnce finishes a pass
27+
}
28+
29+
func (ts *SQLiteNotificationCleanerTestSignals) Init(tb testutil.TestingTB) {
30+
ts.DeletedBatch.Init(tb)
31+
}
32+
33+
type SQLiteNotificationCleanerConfig struct {
34+
// DatabaseName is the name returned by the active driver. The service only
35+
// starts when this is SQLite.
36+
DatabaseName string
37+
38+
// Interval is the amount of time to wait between cleaner runs.
39+
Interval time.Duration
40+
41+
// RetentionPeriod is the amount of time to keep notification rows around
42+
// before they're removed.
43+
RetentionPeriod time.Duration
44+
45+
// Schema where River tables are located. Empty string omits schema.
46+
Schema string
47+
48+
// Timeout is the timeout for each delete query.
49+
Timeout time.Duration
50+
}
51+
52+
func (c *SQLiteNotificationCleanerConfig) mustValidate() *SQLiteNotificationCleanerConfig {
53+
if c.Interval <= 0 {
54+
panic("SQLiteNotificationCleanerConfig.Interval must be above zero")
55+
}
56+
if c.RetentionPeriod <= 0 {
57+
panic("SQLiteNotificationCleanerConfig.RetentionPeriod must be above zero")
58+
}
59+
if c.Timeout <= 0 {
60+
panic("SQLiteNotificationCleanerConfig.Timeout must be above zero")
61+
}
62+
63+
return c
64+
}
65+
66+
// SQLiteNotificationCleaner periodically removes old rows from SQLite's
67+
// notification outbox. It is only needed for the SQLite driver's emulated
68+
// listen/notify support.
69+
type SQLiteNotificationCleaner struct {
70+
riversharedmaintenance.QueueMaintainerServiceBase
71+
startstop.BaseStartStop
72+
73+
// exported for test purposes
74+
Config *SQLiteNotificationCleanerConfig
75+
TestSignals SQLiteNotificationCleanerTestSignals
76+
77+
exec notificationDeleter
78+
}
79+
80+
func NewSQLiteNotificationCleaner(archetype *baseservice.Archetype, config *SQLiteNotificationCleanerConfig, exec riverdriver.Executor) *SQLiteNotificationCleaner {
81+
return newSQLiteNotificationCleaner(archetype, config, exec)
82+
}
83+
84+
type notificationDeleter interface {
85+
NotificationDeleteBefore(ctx context.Context, params *riverdriver.NotificationDeleteBeforeParams) (int, error)
86+
}
87+
88+
func newSQLiteNotificationCleaner(archetype *baseservice.Archetype, config *SQLiteNotificationCleanerConfig, exec notificationDeleter) *SQLiteNotificationCleaner {
89+
return baseservice.Init(archetype, &SQLiteNotificationCleaner{
90+
Config: (&SQLiteNotificationCleanerConfig{
91+
DatabaseName: config.DatabaseName,
92+
Interval: cmp.Or(config.Interval, SQLiteNotificationCleanerIntervalDefault),
93+
RetentionPeriod: cmp.Or(config.RetentionPeriod, SQLiteNotificationCleanerRetentionPeriodDefault),
94+
Schema: config.Schema,
95+
Timeout: cmp.Or(config.Timeout, riversharedmaintenance.TimeoutDefault),
96+
}).mustValidate(),
97+
exec: exec,
98+
})
99+
}
100+
101+
func (s *SQLiteNotificationCleaner) Start(ctx context.Context) error {
102+
ctx, shouldStart, started, stopped := s.StartInit(ctx)
103+
if !shouldStart {
104+
return nil
105+
}
106+
107+
// This service only applies to SQLite's notification outbox. If it's
108+
// accidentally wired into another driver, leave it inert.
109+
if s.Config.DatabaseName != riverdriver.DatabaseNameSQLite {
110+
started()
111+
stopped()
112+
return nil
113+
}
114+
115+
s.StaggerStart(ctx)
116+
117+
go func() {
118+
started()
119+
defer stopped() // this defer should come first so it's last out
120+
121+
s.Logger.DebugContext(ctx, s.Name+riversharedmaintenance.LogPrefixRunLoopStarted)
122+
defer s.Logger.DebugContext(ctx, s.Name+riversharedmaintenance.LogPrefixRunLoopStopped)
123+
124+
ticker := timeutil.NewTickerWithInitialTick(ctx, s.Config.Interval)
125+
for {
126+
select {
127+
case <-ctx.Done():
128+
return
129+
case <-ticker.C:
130+
}
131+
132+
res, err := s.runOnce(ctx)
133+
if err != nil {
134+
if !errors.Is(err, context.Canceled) {
135+
s.Logger.ErrorContext(ctx, s.Name+": Error cleaning SQLite notifications", slog.String("error", err.Error()))
136+
}
137+
continue
138+
}
139+
140+
if res.NumNotificationsDeleted > 0 {
141+
s.Logger.InfoContext(ctx, s.Name+riversharedmaintenance.LogPrefixRanSuccessfully,
142+
slog.Int("num_notifications_deleted", res.NumNotificationsDeleted),
143+
)
144+
}
145+
}
146+
}()
147+
148+
return nil
149+
}
150+
151+
type sqliteNotificationCleanerRunOnceResult struct {
152+
NumNotificationsDeleted int
153+
}
154+
155+
func (s *SQLiteNotificationCleaner) runOnce(ctx context.Context) (*sqliteNotificationCleanerRunOnceResult, error) {
156+
ctx, cancelFunc := context.WithTimeout(ctx, s.Config.Timeout)
157+
defer cancelFunc()
158+
159+
numDeleted, err := s.exec.NotificationDeleteBefore(ctx, &riverdriver.NotificationDeleteBeforeParams{
160+
CreatedAtHorizon: time.Now().Add(-s.Config.RetentionPeriod),
161+
Schema: s.Config.Schema,
162+
})
163+
if err != nil {
164+
return nil, err
165+
}
166+
167+
s.TestSignals.DeletedBatch.Signal(struct{}{})
168+
169+
return &sqliteNotificationCleanerRunOnceResult{
170+
NumNotificationsDeleted: numDeleted,
171+
}, nil
172+
}
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
package maintenance
2+
3+
import (
4+
"context"
5+
"sync"
6+
"testing"
7+
"time"
8+
9+
"github.com/stretchr/testify/require"
10+
11+
"github.com/riverqueue/river/riverdriver"
12+
"github.com/riverqueue/river/rivershared/riversharedtest"
13+
"github.com/riverqueue/river/rivershared/startstoptest"
14+
)
15+
16+
// Notably, these tests are a little unusual in that unlike other maintenance
17+
// services, they depend on a fake and don't execute real database operations.
18+
// We did this because we'd otherwise have to add a SQLite dependency to the
19+
// top-level project because it's only under the SQLite driver that we create
20+
// `river_notification`. The maintenance service is straightforward the database
21+
// operation exercised in driver tests, so this should be good enough to vet it.
22+
func TestSQLiteNotificationCleaner(t *testing.T) {
23+
t.Parallel()
24+
25+
ctx := context.Background()
26+
27+
setup := func(t *testing.T) (*SQLiteNotificationCleaner, *sqliteNotificationCleanerExecutorMock) {
28+
t.Helper()
29+
30+
exec := &sqliteNotificationCleanerExecutorMock{}
31+
32+
cleaner := newSQLiteNotificationCleaner(
33+
riversharedtest.BaseServiceArchetype(t),
34+
&SQLiteNotificationCleanerConfig{
35+
DatabaseName: riverdriver.DatabaseNameSQLite,
36+
Interval: time.Hour,
37+
RetentionPeriod: time.Hour,
38+
Timeout: time.Second,
39+
},
40+
exec,
41+
)
42+
cleaner.StaggerStartupDisable(true)
43+
t.Cleanup(cleaner.Stop)
44+
45+
return cleaner, exec
46+
}
47+
48+
t.Run("Defaults", func(t *testing.T) {
49+
t.Parallel()
50+
51+
cleaner := NewSQLiteNotificationCleaner(
52+
riversharedtest.BaseServiceArchetype(t),
53+
&SQLiteNotificationCleanerConfig{DatabaseName: riverdriver.DatabaseNameSQLite},
54+
nil,
55+
)
56+
57+
require.Equal(t, riverdriver.DatabaseNameSQLite, cleaner.Config.DatabaseName)
58+
require.Equal(t, SQLiteNotificationCleanerIntervalDefault, cleaner.Config.Interval)
59+
require.Equal(t, SQLiteNotificationCleanerRetentionPeriodDefault, cleaner.Config.RetentionPeriod)
60+
})
61+
62+
t.Run("StartStopStress", func(t *testing.T) {
63+
t.Parallel()
64+
65+
cleaner, _ := setup(t)
66+
cleaner.Logger = riversharedtest.LoggerWarn(t) // loop started/stop log is very noisy; suppress
67+
68+
startstoptest.Stress(ctx, t, cleaner)
69+
})
70+
71+
t.Run("DoesNotStartForNonSQLite", func(t *testing.T) {
72+
t.Parallel()
73+
74+
cleaner := NewSQLiteNotificationCleaner(
75+
riversharedtest.BaseServiceArchetype(t),
76+
&SQLiteNotificationCleanerConfig{
77+
DatabaseName: riverdriver.DatabaseNamePostgres,
78+
},
79+
nil,
80+
)
81+
82+
started := cleaner.Started()
83+
stopped := cleaner.Stopped()
84+
require.NoError(t, cleaner.Start(ctx))
85+
riversharedtest.WaitOrTimeout(t, started)
86+
riversharedtest.WaitOrTimeout(t, stopped)
87+
cleaner.Stop()
88+
})
89+
90+
t.Run("RunOnce", func(t *testing.T) {
91+
t.Parallel()
92+
93+
cleaner, exec := setup(t)
94+
cleaner.TestSignals.Init(t)
95+
96+
res, err := cleaner.runOnce(ctx)
97+
require.NoError(t, err)
98+
require.Equal(t, sqliteNotificationCleanerExecutorMockNumDeleted, res.NumNotificationsDeleted)
99+
cleaner.TestSignals.DeletedBatch.WaitOrTimeout()
100+
101+
params := exec.notificationDeleteBeforeParams()
102+
require.Equal(t, cleaner.Config.Schema, params.Schema)
103+
require.WithinDuration(t, time.Now().Add(-cleaner.Config.RetentionPeriod), params.CreatedAtHorizon, 5*time.Second)
104+
})
105+
}
106+
107+
const sqliteNotificationCleanerExecutorMockNumDeleted = 7
108+
109+
type sqliteNotificationCleanerExecutorMock struct {
110+
mu sync.Mutex
111+
notificationDeleteBeforeParam *riverdriver.NotificationDeleteBeforeParams
112+
}
113+
114+
func (m *sqliteNotificationCleanerExecutorMock) NotificationDeleteBefore(ctx context.Context, params *riverdriver.NotificationDeleteBeforeParams) (int, error) {
115+
_ = ctx
116+
117+
m.mu.Lock()
118+
defer m.mu.Unlock()
119+
120+
paramsCopy := *params
121+
m.notificationDeleteBeforeParam = &paramsCopy
122+
123+
return sqliteNotificationCleanerExecutorMockNumDeleted, nil
124+
}
125+
126+
func (m *sqliteNotificationCleanerExecutorMock) notificationDeleteBeforeParams() *riverdriver.NotificationDeleteBeforeParams {
127+
m.mu.Lock()
128+
defer m.mu.Unlock()
129+
130+
return m.notificationDeleteBeforeParam
131+
}

metadata_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,9 @@ import (
44
"context"
55
"testing"
66

7-
"github.com/riverqueue/river/internal/jobexecutor"
87
"github.com/stretchr/testify/require"
8+
9+
"github.com/riverqueue/river/internal/jobexecutor"
910
)
1011

1112
func TestMetadataSet(t *testing.T) {

0 commit comments

Comments
 (0)