Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ Unlike other SQLite replication solutions that require a leader-follower archite
![Fault Tolerant](https://img.shields.io/badge/Fault%20Tolerant-✔️-green)
![Built on NATS](https://img.shields.io/badge/Built%20on%20NATS-✔️-green)

- **Schema Versioning**: Safe rolling upgrades with automatic pause/resume on schema mismatch
- Multiple snapshot storage options:
- NATS Blob Storage
- WebDAV
Expand All @@ -77,6 +78,7 @@ Unlike other SQLite replication solutions that require a leader-follower archite

Future plans for HarmonyLite include:
- Improved documentation and examples
- Enhanced observability and metrics

## Documentation

Expand Down
6 changes: 4 additions & 2 deletions cfg/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ type Configuration struct {
// Command-line flags controlling harmonylite's run mode and cluster topology.
// One-shot modes (cleanup, save-snapshot, schema-status*) run their action and exit.
var ConfigPathFlag = flag.String("config", "", "Path to configuration file")
var CleanupFlag = flag.Bool("cleanup", false, "Only cleanup harmonylite triggers and changelogs")
var SaveSnapshotFlag = flag.Bool("save-snapshot", false, "Only take snapshot and upload")
var SchemaStatusFlag = flag.Bool("schema-status", false, "Display schema status")
var SchemaStatusClusterFlag = flag.Bool("schema-status-cluster", false, "Display cluster-wide schema status")
var ClusterAddrFlag = flag.String("cluster-addr", "", "Cluster listening address")
var ClusterPeersFlag = flag.String("cluster-peers", "", "Comma separated list of clusters")
var LeafServerFlag = flag.String("leaf-servers", "", "Comma separated list of leaf servers")
Expand Down Expand Up @@ -186,9 +188,9 @@ var Config = &Configuration{
Namespace: "harmonylite",
Subsystem: "",
},

HealthCheck: &HealthCheckConfiguration{
Enable: false, // Disabled by default
Enable: false, // Disabled by default
Bind: "0.0.0.0:8090",
Path: "/health",
Detailed: true,
Expand Down
11 changes: 6 additions & 5 deletions db/change_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,11 +520,12 @@ func (conn *SqliteStreamDB) consumeChangeLogs(tableName string, changes []*chang

if conn.OnChange != nil {
err = conn.OnChange(&ChangeLogEvent{
Id: changeRowID,
Type: changeRow.Type,
TableName: tableName,
Row: row,
tableInfo: conn.watchTablesSchema[tableName],
Id: changeRowID,
Type: changeRow.Type,
TableName: tableName,
Row: row,
SchemaHash: conn.GetSchemaHash(),
tableInfo: conn.watchTablesSchema[tableName],
})

if err != nil {
Expand Down
11 changes: 6 additions & 5 deletions db/change_log_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ type sensitiveTypeWrapper struct {
}

// ChangeLogEvent is a single captured row change delivered to OnChange
// subscribers and serialized with CBOR for replication.
//
// NOTE: the pasted diff showed both the pre- and post-change field sets;
// this is the post-change struct (with SchemaHash) only.
type ChangeLogEvent struct {
	Id         int64
	Type       string
	TableName  string
	Row        map[string]any
	SchemaHash string        `cbor:"sh,omitempty"` // hash of all watched tables at event creation
	tableInfo  []*ColumnInfo `cbor:"-"`            // not serialized; resolved locally per table
}

func init() {
Expand Down
63 changes: 63 additions & 0 deletions db/schema_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package db

import (
"context"
"fmt"
"sync"
)

// SchemaCache holds the precomputed schema hash for fast validation
type SchemaCache struct {
	mu            sync.RWMutex   // guards all fields below
	schemaHash    string         // cached hex-encoded SHA-256 of the watched tables' schema
	schemaManager *SchemaManager // used by Recompute to re-inspect the database
	tables        []string       // watched table names the hash covers
}

// NewSchemaCache returns an empty, uninitialized cache.
// Call Initialize before reading the hash or calling Recompute.
func NewSchemaCache() *SchemaCache {
	return new(SchemaCache)
}

// Initialize computes and caches the schema hash for watched tables.
//
// The (potentially slow) database inspection now runs before the write
// lock is taken, so concurrent GetSchemaHash readers are not blocked
// behind DB I/O. A nil SchemaManager is rejected up front instead of
// panicking later in Recompute.
func (sc *SchemaCache) Initialize(ctx context.Context, sm *SchemaManager, tables []string) error {
	if sm == nil {
		return fmt.Errorf("schema manager is nil")
	}

	// Inspect the database outside the critical section.
	hash, err := sm.ComputeSchemaHash(ctx, tables)
	if err != nil {
		return fmt.Errorf("computing schema hash: %w", err)
	}

	sc.mu.Lock()
	defer sc.mu.Unlock()
	sc.schemaHash = hash
	sc.schemaManager = sm
	sc.tables = tables
	return nil
}

// GetSchemaHash returns the cached schema hash (O(1)); empty string
// until Initialize has run.
func (sc *SchemaCache) GetSchemaHash() string {
	sc.mu.RLock()
	hash := sc.schemaHash
	sc.mu.RUnlock()
	return hash
}

// Recompute recalculates the schema hash from the database.
// Called during pause state to detect if local DDL has been applied.
//
// If the cache was never initialized, this returns an error instead of
// dereferencing a nil schemaManager (which previously panicked).
func (sc *SchemaCache) Recompute(ctx context.Context) (string, error) {
	sc.mu.Lock()
	defer sc.mu.Unlock()

	if sc.schemaManager == nil {
		return "", fmt.Errorf("schema cache not initialized")
	}

	hash, err := sc.schemaManager.ComputeSchemaHash(ctx, sc.tables)
	if err != nil {
		return "", fmt.Errorf("recomputing schema hash: %w", err)
	}
	sc.schemaHash = hash
	return hash, nil
}

// IsInitialized reports whether Initialize has populated the cache
// with a manager and a non-empty table list.
func (sc *SchemaCache) IsInitialized() bool {
	sc.mu.RLock()
	defer sc.mu.RUnlock()
	if sc.schemaManager == nil {
		return false
	}
	return len(sc.tables) > 0
}
126 changes: 126 additions & 0 deletions db/schema_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package db

import (
"context"
"crypto/sha256"
"database/sql"
"encoding/hex"
"fmt"
"io"
"sort"

"ariga.io/atlas/sql/migrate"
"ariga.io/atlas/sql/schema"
"ariga.io/atlas/sql/sqlite"
)

// SchemaManager wraps Atlas's SQLite driver for schema operations
type SchemaManager struct {
	driver migrate.Driver // Atlas driver used for realm inspection
	db     *sql.DB        // underlying connection; lifetime is managed by the caller
}

// NewSchemaManager creates a new SchemaManager using the provided database
// connection. Closing the connection remains the caller's responsibility.
func NewSchemaManager(db *sql.DB) (*SchemaManager, error) {
	drv, err := sqlite.Open(db)
	if err != nil {
		return nil, fmt.Errorf("opening atlas driver: %w", err)
	}
	sm := &SchemaManager{driver: drv, db: db}
	return sm, nil
}

// InspectTables returns Atlas schema.Table objects for the specified tables.
// Requested tables that do not exist in the database are silently omitted.
func (sm *SchemaManager) InspectTables(ctx context.Context, tables []string) ([]*schema.Table, error) {
	// Inspect the whole "main" schema realm in one round trip.
	realm, err := sm.driver.InspectRealm(ctx, &schema.InspectRealmOption{
		Schemas: []string{"main"},
	})
	if err != nil {
		return nil, fmt.Errorf("inspecting realm: %w", err)
	}
	if len(realm.Schemas) == 0 {
		return nil, nil
	}

	// Membership set of the requested table names.
	wanted := make(map[string]struct{}, len(tables))
	for _, name := range tables {
		wanted[name] = struct{}{}
	}

	var matched []*schema.Table
	for _, tbl := range realm.Schemas[0].Tables {
		if _, ok := wanted[tbl.Name]; ok {
			matched = append(matched, tbl)
		}
	}
	return matched, nil
}

// ComputeSchemaHash computes a deterministic SHA-256 hash of the specified
// tables' schemas. Tables are hashed in name order so the result is stable
// across inspection ordering.
func (sm *SchemaManager) ComputeSchemaHash(ctx context.Context, tables []string) (string, error) {
	inspected, err := sm.InspectTables(ctx, tables)
	if err != nil {
		return "", err
	}

	// Deterministic ordering before hashing.
	sort.Slice(inspected, func(a, b int) bool { return inspected[a].Name < inspected[b].Name })

	digest := sha256.New()
	for _, tbl := range inspected {
		if err := hashTable(digest, tbl); err != nil {
			return "", err
		}
	}
	return hex.EncodeToString(digest.Sum(nil)), nil
}

// hashTable writes a deterministic representation of a table to the hasher.
//
// Columns are sorted by name and each is encoded as "|name:type:notnull:pk".
// The function previously declared an error return but always returned nil,
// silently ignoring write failures on the destination io.Writer; write
// errors are now propagated. (hash.Hash writers never fail, but the
// signature accepts any io.Writer.)
func hashTable(h io.Writer, table *schema.Table) error {
	// Sort a copy of the columns by name for determinism; do not mutate
	// the table's own slice.
	cols := make([]*schema.Column, len(table.Columns))
	copy(cols, table.Columns)
	sort.Slice(cols, func(i, j int) bool {
		return cols[i].Name < cols[j].Name
	})

	// Write table name
	if _, err := io.WriteString(h, table.Name); err != nil {
		return err
	}

	// Write each column: |name:type:notnull:pk
	for _, col := range cols {
		// Determine whether this column participates in the primary key.
		isPK := false
		if table.PrimaryKey != nil {
			for _, pkCol := range table.PrimaryKey.Parts {
				if pkCol.C != nil && pkCol.C.Name == col.Name {
					isPK = true
					break
				}
			}
		}

		// Normalize type string using Atlas's type representation; fall
		// back to the Go type name when the raw SQL type is absent.
		typeStr := col.Type.Raw
		if typeStr == "" && col.Type.Type != nil {
			typeStr = fmt.Sprintf("%T", col.Type.Type)
		}

		if _, err := fmt.Fprintf(h, "|%s:%s:%t:%t",
			col.Name, typeStr, !col.Type.Null, isPK); err != nil {
			return err
		}
	}
	if _, err := io.WriteString(h, "\n"); err != nil {
		return err
	}
	return nil
}

// Close is a no-op: Atlas's migrate.Driver interface exposes no Close
// method, and the underlying *sql.DB connection is owned by the caller.
func (sm *SchemaManager) Close() error {
	return nil
}
91 changes: 91 additions & 0 deletions db/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type SqliteStreamDB struct {
prefix string
watchTablesSchema map[string][]*ColumnInfo
stats *statsSqliteStreamDB
schemaCache *SchemaCache
}

type ColumnInfo struct {
Expand Down Expand Up @@ -179,12 +180,56 @@ func (conn *SqliteStreamDB) InstallCDC(tables []string) error {
conn.watchTablesSchema[n] = colInfo
}

// Create schema version table
createSchemaVersionTable := `
CREATE TABLE IF NOT EXISTS __harmonylite__schema_version (
id INTEGER PRIMARY KEY CHECK (id = 1),
schema_hash TEXT NOT NULL,
updated_at INTEGER NOT NULL,
harmonylite_version TEXT
);`
_, err := tx.Exec(createSchemaVersionTable)
if err != nil {
return fmt.Errorf("creating schema version table: %w", err)
}

return nil
})
if err != nil {
return err
}

// Initialize schema cache
ctx := context.Background()
rawDB, ok := sqlConn.DB().Db.(*sql.DB)
if !ok {
return fmt.Errorf("failed to get underlying *sql.DB connection")
}
schemaManager, err := NewSchemaManager(rawDB)
if err != nil {
return fmt.Errorf("creating schema manager: %w", err)
}

conn.schemaCache = NewSchemaCache()
if err := conn.schemaCache.Initialize(ctx, schemaManager, tables); err != nil {
return fmt.Errorf("initializing schema cache: %w", err)
}

// Store schema hash in database
hash := conn.schemaCache.GetSchemaHash()
log.Info().Str("schema_hash", hash[:16]+"...").Msg("Computed schema hash")

err = sqlConn.DB().WithTx(func(tx *goqu.TxDatabase) error {
_, err := tx.Exec(`
INSERT OR REPLACE INTO __harmonylite__schema_version (id, schema_hash, updated_at, harmonylite_version)
VALUES (1, ?, ?, ?)
`, hash, time.Now().Unix(), "dev")
return err
})
if err != nil {
return fmt.Errorf("storing schema hash: %w", err)
}

err = conn.installChangeLogTriggers()
if err != nil {
return err
Expand Down Expand Up @@ -333,6 +378,52 @@ func (conn *SqliteStreamDB) GetRawConnection() *sqlite3.SQLiteConn {
return conn.rawConnection
}

// GetSchemaHash returns the cached schema hash, or the empty string when
// the schema cache has not been set up yet.
func (conn *SqliteStreamDB) GetSchemaHash() string {
	if cache := conn.schemaCache; cache != nil {
		return cache.GetSchemaHash()
	}
	return ""
}

// GetSchemaCache returns the schema cache for direct access. It may be
// nil until InstallCDC has initialized it.
func (conn *SqliteStreamDB) GetSchemaCache() *SchemaCache {
	cache := conn.schemaCache
	return cache
}

// UpdateSchemaState recomputes the schema hash and updates the version table.
//
// The log-message truncation is now guarded so an unexpectedly short hash
// cannot panic on the [:16] slice (the original indexed unconditionally).
func (conn *SqliteStreamDB) UpdateSchemaState() error {
	if conn.schemaCache == nil || !conn.schemaCache.IsInitialized() {
		return fmt.Errorf("schema cache not initialized")
	}

	ctx := context.Background()
	newHash, err := conn.schemaCache.Recompute(ctx)
	if err != nil {
		return fmt.Errorf("recomputing schema hash: %w", err)
	}

	sqlConn, err := conn.pool.Borrow()
	if err != nil {
		return err
	}
	defer sqlConn.Return()

	err = sqlConn.DB().WithTx(func(tx *goqu.TxDatabase) error {
		// TODO(review): "dev" is a hardcoded placeholder for the
		// harmonylite build version — wire in the real version string.
		_, err := tx.Exec(`
		INSERT OR REPLACE INTO __harmonylite__schema_version (id, schema_hash, updated_at, harmonylite_version)
		VALUES (1, ?, ?, ?)
		`, newHash, time.Now().Unix(), "dev")
		return err
	})
	if err != nil {
		return fmt.Errorf("updating schema version table: %w", err)
	}

	// Truncate defensively for the log line.
	shortHash := newHash
	if len(shortHash) > 16 {
		shortHash = shortHash[:16] + "..."
	}
	log.Info().Str("schema_hash", shortHash).Msg("Updated schema hash")
	return nil
}

func (conn *SqliteStreamDB) GetPath() string {
return conn.dbPath
}
Expand Down
Loading