package store

import (
	"context"
	"database/sql"
	"encoding/json"
	"errors"
	"fmt"
	"time"

	"github.com/wotra/wotra/internal/domain"
)

// ErrSyncStale is returned when the client's since_version is behind the prune marker.
var ErrSyncStale = errors.New("sync state stale: full re-sync required")

// Sentinel values that identify the prune marker row written by Prune.
const (
	pruneEntity = "_pruned"
	pruneOp     = "marker"
)

// SyncStore manages the sync_log.
type SyncStore struct {
	db *sql.DB
}

// NewSyncStore returns a SyncStore that reads and writes sync_log through db.
func NewSyncStore(db *sql.DB) *SyncStore {
	store := SyncStore{db: db}
	return &store
}

// SyncChange is a single sync_log row as returned by Pull.
type SyncChange struct {
	Entity   string `json:"entity"`
	EntityID string `json:"entity_id"`
	Op       string `json:"op"` // "upsert" | "delete" | "marker"
	Version  int64  `json:"version"`
	Payload  string `json:"payload"`
}

// Pull returns every sync_log row whose version is greater than sinceVersion,
// ordered by ascending version, together with the highest version seen (or
// sinceVersion itself when no newer rows exist).
//
// Prune is run first with a 30-day TTL. If any returned row is a prune marker,
// the client has missed pruned history and Pull returns ErrSyncStale.
func (s *SyncStore) Pull(ctx context.Context, sinceVersion int64) ([]SyncChange, int64, error) {
	const retention = 30 * 24 * time.Hour

	pruneErr := s.Prune(ctx, retention)
	if pruneErr != nil {
		return nil, 0, pruneErr
	}

	const query = `SELECT entity, entity_id, op, version, payload FROM sync_log WHERE version > ? ORDER BY version ASC`
	rows, err := s.db.QueryContext(ctx, query, sinceVersion)
	if err != nil {
		return nil, 0, err
	}
	defer rows.Close()

	var (
		pending []SyncChange
		latest  = sinceVersion
	)
	for rows.Next() {
		var row SyncChange
		scanErr := rows.Scan(&row.Entity, &row.EntityID, &row.Op, &row.Version, &row.Payload)
		if scanErr != nil {
			return nil, 0, scanErr
		}
		// A marker newer than sinceVersion means rows the client never saw
		// have already been deleted.
		if row.Entity == pruneEntity {
			return nil, 0, ErrSyncStale
		}
		if latest < row.Version {
			latest = row.Version
		}
		pending = append(pending, row)
	}

	// An iteration error is reported alongside whatever was collected so far.
	if iterErr := rows.Err(); iterErr != nil {
		return pending, latest, iterErr
	}
	return pending, latest, nil
}

// Prune deletes sync_log rows older than ttl and inserts a prune marker at the
// version boundary so stale clients can detect they need a full re-sync.
func (s *SyncStore) Prune(ctx context.Context, ttl time.Duration) error { cutoff := time.Now().Add(-ttl).UnixMilli() tx, err := s.db.BeginTx(ctx, nil) if err != nil { return err } defer tx.Rollback() //nolint:errcheck // Find max version among rows that will be pruned (excluding existing markers). var maxPruned sql.NullInt64 err = tx.QueryRowContext(ctx, `SELECT MAX(version) FROM sync_log WHERE logged_at < ? AND entity != ?`, cutoff, pruneEntity).Scan(&maxPruned) if err != nil { return err } if !maxPruned.Valid { // Nothing to prune. return tx.Commit() } // Delete old rows (but not the existing marker, if any). if _, err = tx.ExecContext(ctx, `DELETE FROM sync_log WHERE logged_at < ? AND entity != ?`, cutoff, pruneEntity); err != nil { return err } // Insert (or replace) the prune marker at the boundary version. now := time.Now().UnixMilli() if _, err = tx.ExecContext(ctx, `INSERT OR REPLACE INTO sync_log (entity, entity_id, op, version, payload, logged_at) VALUES (?, ?, ?, ?, '{}', ?)`, pruneEntity, pruneEntity, pruneOp, maxPruned.Int64, now); err != nil { return err } return tx.Commit() } // nextVersion returns the next monotonic version number. func (s *SyncStore) nextVersion(ctx context.Context) (int64, error) { var max sql.NullInt64 err := s.db.QueryRowContext(ctx, `SELECT MAX(version) FROM sync_log`).Scan(&max) if err != nil { return 0, err } if !max.Valid { return 1, nil } return max.Int64 + 1, nil } // LogEntry appends an entry upsert to the sync log. func (s *SyncStore) LogEntry(ctx context.Context, e *domain.Entry) error { payload, err := json.Marshal(e) if err != nil { return err } return s.log(ctx, "entries", e.ID, "upsert", string(payload)) } // LogEntryDelete appends an entry delete to the sync log. func (s *SyncStore) LogEntryDelete(ctx context.Context, id string) error { payload := fmt.Sprintf(`{"id":%q}`, id) return s.log(ctx, "entries", id, "delete", payload) } // LogClosedDay appends a closed_day upsert to the sync log. 
func (s *SyncStore) LogClosedDay(ctx context.Context, d *domain.ClosedDay) error { payload, err := json.Marshal(d) if err != nil { return err } return s.log(ctx, "closed_days", d.DayKey, "upsert", string(payload)) } // LogClosedWeek appends a closed_week upsert to the sync log. func (s *SyncStore) LogClosedWeek(ctx context.Context, w *domain.ClosedWeek) error { payload, err := json.Marshal(w) if err != nil { return err } return s.log(ctx, "closed_weeks", w.WeekKey, "upsert", string(payload)) } // LogSettings appends a settings upsert to the sync log. func (s *SyncStore) LogSettings(ctx context.Context, set *domain.Settings) error { payload, err := json.Marshal(set) if err != nil { return err } return s.log(ctx, "settings_history", set.ID, "upsert", string(payload)) } // LogSettingsDelete appends a settings delete to the sync log. func (s *SyncStore) LogSettingsDelete(ctx context.Context, id string) error { payload := fmt.Sprintf(`{"id":%q}`, id) return s.log(ctx, "settings_history", id, "delete", payload) } // LogBalanceAdjustment appends a balance_adjustment upsert to the sync log. func (s *SyncStore) LogBalanceAdjustment(ctx context.Context, a *domain.BalanceAdjustment) error { payload, err := json.Marshal(a) if err != nil { return err } return s.log(ctx, "balance_adjustments", a.ID, "upsert", string(payload)) } // LogBalanceAdjustmentDelete appends a balance_adjustment delete to the sync log. func (s *SyncStore) LogBalanceAdjustmentDelete(ctx context.Context, id string) error { payload := fmt.Sprintf(`{"id":%q}`, id) return s.log(ctx, "balance_adjustments", id, "delete", payload) } func (s *SyncStore) log(ctx context.Context, entity, entityID, op, payload string) error { version, err := s.nextVersion(ctx) if err != nil { return err } now := time.Now().UnixMilli() _, err = s.db.ExecContext(ctx, `INSERT INTO sync_log (entity, entity_id, op, version, payload, logged_at) VALUES (?, ?, ?, ?, ?, ?)`, entity, entityID, op, version, payload, now) return err }