- Add LogClosedDayDelete and LogClosedWeekDelete to SyncStore - Inject syncStore into EntryService; log Start, Stop, StopByID, Update, CreateInterval, Delete, AutoStopStalledEntries - Inject syncStore into DayService; log CloseDay, MarkDay, ReopenDay, and the recomputeWeek closed-week upsert - Inject syncStore into SettingsService; log Upsert, UpdateSettings, DeleteSettings - Add LogClosedWeek/LogClosedWeekDelete calls in WeekService.CloseWeek and ReopenWeek - Update main.go and all service test helpers for new constructor signatures - All Go tests and 19 Vitest tests pass
215 lines
6.5 KiB
Go
215 lines
6.5 KiB
Go
package store
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/wotra/wotra/internal/domain"
|
|
)
|
|
|
|
// ErrSyncStale is the sentinel error Pull returns when the rows newer than
// the client's since_version include a prune marker, meaning part of the
// history the client still needs has been deleted and it must re-sync fully.
var ErrSyncStale = errors.New("sync state stale: full re-sync required")

// Sentinel values that identify the prune marker row inside sync_log.
const (
	// pruneEntity is written as both the entity and the entity_id of the
	// marker row; Pull recognises the marker by this entity value.
	pruneEntity = "_pruned"

	// pruneOp is the op value stored on the marker row.
	pruneOp = "marker"
)
|
|
|
|
// SyncStore reads from and appends to the sync_log table, which records
// versioned upsert/delete changes for clients to pull.
type SyncStore struct {
	// db is the database handle every query in this store runs against.
	db *sql.DB
}
|
|
|
|
func NewSyncStore(db *sql.DB) *SyncStore {
|
|
return &SyncStore{db: db}
|
|
}
|
|
|
|
// SyncChange is a single sync_log row as returned by Pull and serialised to
// JSON for clients.
type SyncChange struct {
	// Entity is the logical table the change belongs to (for example
	// "entries" or "closed_days").
	Entity string `json:"entity"`
	// EntityID is the identifier of the changed record within Entity.
	EntityID string `json:"entity_id"`
	// Op is the kind of change: "upsert", "delete" or "marker".
	Op string `json:"op"`
	// Version is the position of the change in the log; Pull orders by it.
	Version int64 `json:"version"`
	// Payload is the JSON document stored with the change.
	Payload string `json:"payload"`
}
|
|
|
|
// Pull returns all sync_log rows with version > sinceVersion.
|
|
// It calls Prune first with a 30-day TTL.
|
|
// If the client is behind a prune marker it returns ErrSyncStale.
|
|
func (s *SyncStore) Pull(ctx context.Context, sinceVersion int64) ([]SyncChange, int64, error) {
|
|
if err := s.Prune(ctx, 30*24*time.Hour); err != nil {
|
|
return nil, 0, err
|
|
}
|
|
|
|
rows, err := s.db.QueryContext(ctx,
|
|
`SELECT entity, entity_id, op, version, payload FROM sync_log
|
|
WHERE version > ? ORDER BY version ASC`, sinceVersion)
|
|
if err != nil {
|
|
return nil, 0, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
var changes []SyncChange
|
|
var maxVersion int64 = sinceVersion
|
|
for rows.Next() {
|
|
var c SyncChange
|
|
if err := rows.Scan(&c.Entity, &c.EntityID, &c.Op, &c.Version, &c.Payload); err != nil {
|
|
return nil, 0, err
|
|
}
|
|
// First row with entity="_pruned" means client is stale.
|
|
if c.Entity == pruneEntity {
|
|
return nil, 0, ErrSyncStale
|
|
}
|
|
if c.Version > maxVersion {
|
|
maxVersion = c.Version
|
|
}
|
|
changes = append(changes, c)
|
|
}
|
|
return changes, maxVersion, rows.Err()
|
|
}
|
|
|
|
// Prune deletes sync_log rows older than ttl and inserts a prune marker at the
|
|
// version boundary so stale clients can detect they need a full re-sync.
|
|
func (s *SyncStore) Prune(ctx context.Context, ttl time.Duration) error {
|
|
cutoff := time.Now().Add(-ttl).UnixMilli()
|
|
|
|
tx, err := s.db.BeginTx(ctx, nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer tx.Rollback() //nolint:errcheck
|
|
|
|
// Find max version among rows that will be pruned (excluding existing markers).
|
|
var maxPruned sql.NullInt64
|
|
err = tx.QueryRowContext(ctx,
|
|
`SELECT MAX(version) FROM sync_log WHERE logged_at < ? AND entity != ?`,
|
|
cutoff, pruneEntity).Scan(&maxPruned)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if !maxPruned.Valid {
|
|
// Nothing to prune.
|
|
return tx.Commit()
|
|
}
|
|
|
|
// Delete old rows (but not the existing marker, if any).
|
|
if _, err = tx.ExecContext(ctx,
|
|
`DELETE FROM sync_log WHERE logged_at < ? AND entity != ?`,
|
|
cutoff, pruneEntity); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Insert (or replace) the prune marker at the boundary version.
|
|
now := time.Now().UnixMilli()
|
|
if _, err = tx.ExecContext(ctx,
|
|
`INSERT OR REPLACE INTO sync_log (entity, entity_id, op, version, payload, logged_at)
|
|
VALUES (?, ?, ?, ?, '{}', ?)`,
|
|
pruneEntity, pruneEntity, pruneOp, maxPruned.Int64, now); err != nil {
|
|
return err
|
|
}
|
|
|
|
return tx.Commit()
|
|
}
|
|
|
|
// nextVersion returns the next monotonic version number.
|
|
func (s *SyncStore) nextVersion(ctx context.Context) (int64, error) {
|
|
var max sql.NullInt64
|
|
err := s.db.QueryRowContext(ctx, `SELECT MAX(version) FROM sync_log`).Scan(&max)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
if !max.Valid {
|
|
return 1, nil
|
|
}
|
|
return max.Int64 + 1, nil
|
|
}
|
|
|
|
// LogEntry appends an entry upsert to the sync log.
|
|
func (s *SyncStore) LogEntry(ctx context.Context, e *domain.Entry) error {
|
|
payload, err := json.Marshal(e)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return s.log(ctx, "entries", e.ID, "upsert", string(payload))
|
|
}
|
|
|
|
// LogEntryDelete appends an entry delete to the sync log.
|
|
func (s *SyncStore) LogEntryDelete(ctx context.Context, id string) error {
|
|
payload := fmt.Sprintf(`{"id":%q}`, id)
|
|
return s.log(ctx, "entries", id, "delete", payload)
|
|
}
|
|
|
|
// LogClosedDay appends a closed_day upsert to the sync log.
|
|
func (s *SyncStore) LogClosedDay(ctx context.Context, d *domain.ClosedDay) error {
|
|
payload, err := json.Marshal(d)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return s.log(ctx, "closed_days", d.DayKey, "upsert", string(payload))
|
|
}
|
|
|
|
// LogClosedDayDelete appends a closed_day delete to the sync log.
|
|
func (s *SyncStore) LogClosedDayDelete(ctx context.Context, dayKey string) error {
|
|
payload := fmt.Sprintf(`{"day_key":%q}`, dayKey)
|
|
return s.log(ctx, "closed_days", dayKey, "delete", payload)
|
|
}
|
|
|
|
// LogClosedWeek appends a closed_week upsert to the sync log.
|
|
func (s *SyncStore) LogClosedWeek(ctx context.Context, w *domain.ClosedWeek) error {
|
|
payload, err := json.Marshal(w)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return s.log(ctx, "closed_weeks", w.WeekKey, "upsert", string(payload))
|
|
}
|
|
|
|
// LogClosedWeekDelete appends a closed_week delete to the sync log.
|
|
func (s *SyncStore) LogClosedWeekDelete(ctx context.Context, weekKey string) error {
|
|
payload := fmt.Sprintf(`{"week_key":%q}`, weekKey)
|
|
return s.log(ctx, "closed_weeks", weekKey, "delete", payload)
|
|
}
|
|
|
|
// LogSettings appends a settings upsert to the sync log.
|
|
func (s *SyncStore) LogSettings(ctx context.Context, set *domain.Settings) error {
|
|
payload, err := json.Marshal(set)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return s.log(ctx, "settings_history", set.ID, "upsert", string(payload))
|
|
}
|
|
|
|
// LogSettingsDelete appends a settings delete to the sync log.
|
|
func (s *SyncStore) LogSettingsDelete(ctx context.Context, id string) error {
|
|
payload := fmt.Sprintf(`{"id":%q}`, id)
|
|
return s.log(ctx, "settings_history", id, "delete", payload)
|
|
}
|
|
|
|
// LogBalanceAdjustment appends a balance_adjustment upsert to the sync log.
|
|
func (s *SyncStore) LogBalanceAdjustment(ctx context.Context, a *domain.BalanceAdjustment) error {
|
|
payload, err := json.Marshal(a)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return s.log(ctx, "balance_adjustments", a.ID, "upsert", string(payload))
|
|
}
|
|
|
|
// LogBalanceAdjustmentDelete appends a balance_adjustment delete to the sync log.
|
|
func (s *SyncStore) LogBalanceAdjustmentDelete(ctx context.Context, id string) error {
|
|
payload := fmt.Sprintf(`{"id":%q}`, id)
|
|
return s.log(ctx, "balance_adjustments", id, "delete", payload)
|
|
}
|
|
|
|
func (s *SyncStore) log(ctx context.Context, entity, entityID, op, payload string) error {
|
|
version, err := s.nextVersion(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
now := time.Now().UnixMilli()
|
|
_, err = s.db.ExecContext(ctx,
|
|
`INSERT INTO sync_log (entity, entity_id, op, version, payload, logged_at) VALUES (?, ?, ?, ?, ?, ?)`,
|
|
entity, entityID, op, version, payload, now)
|
|
return err
|
|
}
|