Skip to content

Commit d875e92

Browse files
committed
Add pseudo listen/notify mechanism for SQLite
A fairly long-standing issue with SQLite compared to Postgres is the lack of a listen/notify system because SQLite doesn't offer such a mechanism. This has forced SQLite to operate in poll-only mode which means there's a longer delay before it sees jobs that it can work. Honker [1] hit Hacker News a few weeks/months back. It implements a listen/notify-like mechanism, and when reading about how it's implemented, I realized it wouldn't be all that crazy to do something similar for River. Basically, it's poll based, and relies on polling a single table fairly actively. That's something that's not great in general, but we already do that for poll-only, except that it's multiple tables instead of just one (i.e. `river_job` + `river_leader`). Here, I propose that we implement listen/notify using a similar strategy by adding a `river_notification` in SQLite. Like listen/notify, this table has a listen/notify, along with a timestamp that we can use to track event creation and prune older rows. We add a SQLite `Listener` similar to the one for Postgres, but which polls `river_notification`, remembering the last ID that it so it can make sure to only fetch new notifications in each River process. A new maintenance service that runs only in SQLite takes care of pruning events in the table older than 5 minutes. A nice property of `river_notification` is that like listen/notify, it all works transactionally as well. New rows are hidden until the transaction that inserted them commits, keeping them invisible from the listener. After commit, the listener sees them and responds. We track a `lastID` in the listener for more efficient reads, which seems like it might be a problem due to out-of-order IDs, but for better or worse this is not possible in SQLite due its single-writer model. IDs always commit in order. You might think that this would create a table that's overly high volume because 1,000 job insertions would create 1,000 notifications, but luckily we've been deduplicating notifications through the use of a notify limiter (i.e. `insertNotifyLimiter.ShouldTrigger(queue)`) for a long time now. Leadership notifications are also low volume, so this table should never have to be very big at any given time, which is nice. Requires a database migration, but that might dovetail fairly nicely with #1224 (convert SQLite json to jsonb), which also does. I've experimented with a few versions of this. This one also adds a `river_notification` to Postgres, though doesn't use it. The rationale here is (1) that the Postgres `river_notification` makes it easier to test the client as if it was SQLite but without having to bring a SQLite driver into the top-level project as a dependency, and (2) it may be a useful alternative to poll-only even in Postgres down the line, where something like a bouncer makes listen/notify difficult to use. [1] https://github.com/russellromney/honker
1 parent a5db4c6 commit d875e92

34 files changed

Lines changed: 928 additions & 39 deletions

client.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -992,6 +992,13 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
992992
client.testSignals.queueCleaner = &queueCleaner.TestSignals
993993
}
994994

995+
if driver.DatabaseName() == riverdriver.DatabaseNameSQLite {
996+
sqliteNotificationCleaner := maintenance.NewSQLiteNotificationCleaner(archetype, &maintenance.SQLiteNotificationCleanerConfig{
997+
Schema: config.Schema,
998+
}, driver.GetExecutor())
999+
maintenanceServices = append(maintenanceServices, sqliteNotificationCleaner)
1000+
}
1001+
9951002
{
9961003
var scheduleFunc func(time.Time) time.Time
9971004
if config.ReindexerSchedule != nil {
@@ -2378,8 +2385,6 @@ type JobListResult struct {
23782385
LastCursor *JobListCursor
23792386
}
23802387

2381-
const databaseNameSQLite = "sqlite"
2382-
23832388
var errJobListParamsMetadataNotSupportedSQLite = errors.New("JobListParams.Metadata is not supported on SQLite")
23842389

23852390
// JobList returns a paginated list of jobs matching the provided filters. The
@@ -2401,7 +2406,7 @@ func (c *Client[TTx]) JobList(ctx context.Context, params *JobListParams) (*JobL
24012406
}
24022407
params.schema = c.config.Schema
24032408

2404-
if c.driver.DatabaseName() == databaseNameSQLite && params.metadataCalled {
2409+
if c.driver.DatabaseName() == riverdriver.DatabaseNameSQLite && params.metadataCalled {
24052410
return nil, errJobListParamsMetadataNotSupportedSQLite
24062411
}
24072412

@@ -2442,7 +2447,7 @@ func (c *Client[TTx]) JobListTx(ctx context.Context, tx TTx, params *JobListPara
24422447
}
24432448
params.schema = c.config.Schema
24442449

2445-
if c.driver.DatabaseName() == databaseNameSQLite && params.metadataCalled {
2450+
if c.driver.DatabaseName() == riverdriver.DatabaseNameSQLite && params.metadataCalled {
24462451
return nil, errJobListParamsMetadataNotSupportedSQLite
24472452
}
24482453

Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
package maintenance
2+
3+
import (
4+
"cmp"
5+
"context"
6+
"errors"
7+
"log/slog"
8+
"time"
9+
10+
"github.com/riverqueue/river/riverdriver"
11+
"github.com/riverqueue/river/rivershared/baseservice"
12+
"github.com/riverqueue/river/rivershared/riversharedmaintenance"
13+
"github.com/riverqueue/river/rivershared/startstop"
14+
"github.com/riverqueue/river/rivershared/testsignal"
15+
"github.com/riverqueue/river/rivershared/util/testutil"
16+
"github.com/riverqueue/river/rivershared/util/timeutil"
17+
)
18+
19+
const (
20+
SQLiteNotificationCleanerIntervalDefault = time.Minute
21+
SQLiteNotificationCleanerRetentionPeriodDefault = 5 * time.Minute
22+
)
23+
24+
// SQLiteNotificationCleanerTestSignals are internal signals used exclusively in tests.
25+
type SQLiteNotificationCleanerTestSignals struct {
26+
DeletedBatch testsignal.TestSignal[struct{}] // notifies when runOnce finishes a pass
27+
}
28+
29+
func (ts *SQLiteNotificationCleanerTestSignals) Init(tb testutil.TestingTB) {
30+
ts.DeletedBatch.Init(tb)
31+
}
32+
33+
type SQLiteNotificationCleanerConfig struct {
34+
// Interval is the amount of time to wait between cleaner runs.
35+
Interval time.Duration
36+
37+
// RetentionPeriod is the amount of time to keep notification rows around
38+
// before they're removed.
39+
RetentionPeriod time.Duration
40+
41+
// Schema where River tables are located. Empty string omits schema.
42+
Schema string
43+
44+
// Timeout is the timeout for each delete query.
45+
Timeout time.Duration
46+
}
47+
48+
func (c *SQLiteNotificationCleanerConfig) mustValidate() *SQLiteNotificationCleanerConfig {
49+
if c.Interval <= 0 {
50+
panic("SQLiteNotificationCleanerConfig.Interval must be above zero")
51+
}
52+
if c.RetentionPeriod <= 0 {
53+
panic("SQLiteNotificationCleanerConfig.RetentionPeriod must be above zero")
54+
}
55+
if c.Timeout <= 0 {
56+
panic("SQLiteNotificationCleanerConfig.Timeout must be above zero")
57+
}
58+
59+
return c
60+
}
61+
62+
// SQLiteNotificationCleaner periodically removes old rows from SQLite's
63+
// notification outbox. It is only needed for the SQLite driver's emulated
64+
// listen/notify support.
65+
type SQLiteNotificationCleaner struct {
66+
riversharedmaintenance.QueueMaintainerServiceBase
67+
startstop.BaseStartStop
68+
69+
// exported for test purposes
70+
Config *SQLiteNotificationCleanerConfig
71+
TestSignals SQLiteNotificationCleanerTestSignals
72+
73+
exec riverdriver.Executor
74+
}
75+
76+
// NewSQLiteNotificationCleaner returns a SQLite notification cleaner.
77+
func NewSQLiteNotificationCleaner(archetype *baseservice.Archetype, config *SQLiteNotificationCleanerConfig, exec riverdriver.Executor) *SQLiteNotificationCleaner {
78+
return baseservice.Init(archetype, &SQLiteNotificationCleaner{
79+
Config: (&SQLiteNotificationCleanerConfig{
80+
Interval: cmp.Or(config.Interval, SQLiteNotificationCleanerIntervalDefault),
81+
RetentionPeriod: cmp.Or(config.RetentionPeriod, SQLiteNotificationCleanerRetentionPeriodDefault),
82+
Schema: config.Schema,
83+
Timeout: cmp.Or(config.Timeout, riversharedmaintenance.TimeoutDefault),
84+
}).mustValidate(),
85+
exec: exec,
86+
})
87+
}
88+
89+
func (s *SQLiteNotificationCleaner) Start(ctx context.Context) error { //nolint:dupl
90+
ctx, shouldStart, started, stopped := s.StartInit(ctx)
91+
if !shouldStart {
92+
return nil
93+
}
94+
95+
s.StaggerStart(ctx)
96+
97+
go func() {
98+
started()
99+
defer stopped() // this defer should come first so it's last out
100+
101+
s.Logger.DebugContext(ctx, s.Name+riversharedmaintenance.LogPrefixRunLoopStarted)
102+
defer s.Logger.DebugContext(ctx, s.Name+riversharedmaintenance.LogPrefixRunLoopStopped)
103+
104+
ticker := timeutil.NewTickerWithInitialTick(ctx, s.Config.Interval)
105+
for {
106+
select {
107+
case <-ctx.Done():
108+
return
109+
case <-ticker.C:
110+
}
111+
112+
res, err := s.runOnce(ctx)
113+
if err != nil {
114+
if !errors.Is(err, context.Canceled) {
115+
s.Logger.ErrorContext(ctx, s.Name+": Error cleaning SQLite notifications", slog.String("error", err.Error()))
116+
}
117+
continue
118+
}
119+
120+
if res.NumNotificationsDeleted > 0 {
121+
s.Logger.InfoContext(ctx, s.Name+riversharedmaintenance.LogPrefixRanSuccessfully,
122+
slog.Int("num_notifications_deleted", res.NumNotificationsDeleted),
123+
)
124+
}
125+
}
126+
}()
127+
128+
return nil
129+
}
130+
131+
type sqliteNotificationCleanerRunOnceResult struct {
132+
NumNotificationsDeleted int
133+
}
134+
135+
func (s *SQLiteNotificationCleaner) runOnce(ctx context.Context) (*sqliteNotificationCleanerRunOnceResult, error) {
136+
ctx, cancelFunc := context.WithTimeout(ctx, s.Config.Timeout)
137+
defer cancelFunc()
138+
139+
numDeleted, err := s.exec.NotificationDeleteBefore(ctx, &riverdriver.NotificationDeleteBeforeParams{
140+
CreatedAtHorizon: time.Now().Add(-s.Config.RetentionPeriod),
141+
Schema: s.Config.Schema,
142+
})
143+
if err != nil {
144+
return nil, err
145+
}
146+
147+
s.TestSignals.DeletedBatch.Signal(struct{}{})
148+
149+
return &sqliteNotificationCleanerRunOnceResult{
150+
NumNotificationsDeleted: numDeleted,
151+
}, nil
152+
}
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
package maintenance
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
8+
"github.com/stretchr/testify/require"
9+
10+
"github.com/riverqueue/river/riverdbtest"
11+
"github.com/riverqueue/river/riverdriver"
12+
"github.com/riverqueue/river/riverdriver/riverpgxv5"
13+
"github.com/riverqueue/river/rivershared/riversharedtest"
14+
"github.com/riverqueue/river/rivershared/startstoptest"
15+
)
16+
17+
func TestSQLiteNotificationCleaner(t *testing.T) {
18+
t.Parallel()
19+
20+
ctx := context.Background()
21+
22+
type testBundle struct {
23+
exec riverdriver.Executor
24+
schema string
25+
}
26+
27+
setup := func(t *testing.T) (*SQLiteNotificationCleaner, *testBundle) {
28+
t.Helper()
29+
30+
driver := riverpgxv5.New(riversharedtest.DBPool(ctx, t))
31+
tx, schema := riverdbtest.TestTxPgxDriver(ctx, t, driver, nil)
32+
33+
bundle := &testBundle{
34+
exec: driver.UnwrapExecutor(tx),
35+
schema: schema,
36+
}
37+
38+
cleaner := NewSQLiteNotificationCleaner(
39+
riversharedtest.BaseServiceArchetype(t),
40+
&SQLiteNotificationCleanerConfig{
41+
Interval: time.Hour,
42+
RetentionPeriod: time.Hour,
43+
Schema: bundle.schema,
44+
Timeout: time.Second,
45+
},
46+
bundle.exec,
47+
)
48+
cleaner.StaggerStartupDisable(true)
49+
t.Cleanup(cleaner.Stop)
50+
51+
return cleaner, bundle
52+
}
53+
54+
notificationCount := func(t *testing.T, exec riverdriver.Executor) int {
55+
t.Helper()
56+
57+
var count int
58+
require.NoError(t, exec.QueryRow(ctx, "SELECT count(*) FROM river_notification").Scan(&count))
59+
return count
60+
}
61+
62+
t.Run("Defaults", func(t *testing.T) {
63+
t.Parallel()
64+
65+
cleaner := NewSQLiteNotificationCleaner(
66+
riversharedtest.BaseServiceArchetype(t),
67+
&SQLiteNotificationCleanerConfig{},
68+
nil,
69+
)
70+
71+
require.Equal(t, SQLiteNotificationCleanerIntervalDefault, cleaner.Config.Interval)
72+
require.Equal(t, SQLiteNotificationCleanerRetentionPeriodDefault, cleaner.Config.RetentionPeriod)
73+
})
74+
75+
t.Run("DeletesExpiredNotifications", func(t *testing.T) {
76+
t.Parallel()
77+
78+
cleaner, bundle := setup(t)
79+
cleaner.TestSignals.Init(t)
80+
81+
now := time.Now()
82+
require.NoError(t, bundle.exec.Exec(ctx, `
83+
INSERT INTO river_notification (created_at, payload, topic)
84+
VALUES
85+
($1, 'old_payload_1', 'topic'),
86+
($2, 'old_payload_2', 'topic'),
87+
($3, 'new_payload', 'topic')
88+
`, now.Add(-2*time.Hour), now.Add(-61*time.Minute), now.Add(-30*time.Minute)))
89+
90+
res, err := cleaner.runOnce(ctx)
91+
require.NoError(t, err)
92+
require.Equal(t, 2, res.NumNotificationsDeleted)
93+
cleaner.TestSignals.DeletedBatch.WaitOrTimeout()
94+
require.Equal(t, 1, notificationCount(t, bundle.exec))
95+
})
96+
97+
t.Run("StartStopStress", func(t *testing.T) {
98+
t.Parallel()
99+
100+
cleaner, _ := setup(t)
101+
cleaner.Logger = riversharedtest.LoggerWarn(t) // loop started/stop log is very noisy; suppress
102+
103+
startstoptest.Stress(ctx, t, cleaner)
104+
})
105+
}

riverdriver/river_driver_interface.go

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,11 @@ import (
2424

2525
const AllQueuesString = "*"
2626

27+
const (
28+
DatabaseNamePostgres = "postgres"
29+
DatabaseNameSQLite = "sqlite"
30+
)
31+
2732
const MigrationLineMain = "main"
2833

2934
var (
@@ -138,12 +143,14 @@ type Driver[TTx any] interface {
138143
// API is not stable. DO NOT USE.
139144
SupportsListener() bool
140145

141-
// SupportsListenNotify indicates whether the underlying database supports
142-
// listen/notify. This differs from SupportsListener in that even if a
143-
// driver doesn't a support a listener but the database supports the
144-
// underlying listen/notify mechanism, it will still broadcast in case there
145-
// are other clients/drivers on the database that do support a listener. If
146-
// listen/notify can't be supported at all, no broadcast attempt is made.
146+
// SupportsListenNotify indicates whether the driver can broadcast
147+
// notifications that a listener can receive, either through a native
148+
// database mechanism like Postgres LISTEN/NOTIFY or a driver-specific
149+
// emulation. This differs from SupportsListener in that even if a driver
150+
// doesn't support a listener but the database supports the underlying
151+
// notification mechanism, it will still broadcast in case there are other
152+
// clients/drivers on the database that do support a listener. If
153+
// notifications can't be supported at all, no broadcast attempt is made.
147154
//
148155
// API is not stable. DO NOT USE.
149156
SupportsListenNotify() bool
@@ -256,6 +263,14 @@ type Executor interface {
256263
// the `line` column was added to the migrations table.
257264
MigrationInsertManyAssumingMain(ctx context.Context, params *MigrationInsertManyAssumingMainParams) ([]*Migration, error)
258265

266+
// NotificationDeleteBefore deletes notifications before a certain time
267+
// horizon.
268+
//
269+
// A "notification" in this context refers to a row in `river_notification`
270+
// which is a special table implemented in some databases (e.g. SQLite) that
271+
// simulates Postgres' listen/notify when not available.
272+
NotificationDeleteBefore(ctx context.Context, params *NotificationDeleteBeforeParams) (int, error)
273+
259274
NotifyMany(ctx context.Context, params *NotifyManyParams) error
260275
PGAdvisoryXactLock(ctx context.Context, key int64) (*struct{}, error)
261276

@@ -775,6 +790,11 @@ type NotifyManyParams struct {
775790
Schema string
776791
}
777792

793+
type NotificationDeleteBeforeParams struct {
794+
CreatedAtHorizon time.Time
795+
Schema string
796+
}
797+
778798
type ProducerKeepAliveParams struct {
779799
ID int64
780800
QueueName string
@@ -883,8 +903,10 @@ func MigrationLineMainTruncateTables(version int) []string {
883903
return []string{"river_job", "river_leader"}
884904
case 4:
885905
return []string{"river_job", "river_leader", "river_queue"}
886-
case 0, 5, 6:
906+
case 5, 6:
887907
return []string{"river_job", "river_leader", "river_queue", "river_client", "river_client_queue"}
908+
case 0, 7:
909+
return []string{"river_job", "river_leader", "river_queue", "river_client", "river_client_queue", "river_notification"}
888910
}
889911

890912
panic(fmt.Sprintf("unrecognized migration version: %d", version))

riverdriver/riverdatabasesql/internal/dbsqlc/models.go

Lines changed: 7 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)