package store import ( "context" "database/sql" "encoding/json" "fmt" "github.com/wotra/wotra/internal/domain" ) // SyncStore manages the sync_log and server_version. type SyncStore struct { db *sql.DB } func NewSyncStore(db *sql.DB) *SyncStore { return &SyncStore{db: db} } type SyncChange struct { Entity string `json:"entity"` EntityID string `json:"entity_id"` Op string `json:"op"` // "upsert" | "delete" Version int64 `json:"version"` Payload string `json:"payload"` } // Pull returns all sync_log rows with version > sinceVersion. func (s *SyncStore) Pull(ctx context.Context, sinceVersion int64) ([]SyncChange, int64, error) { rows, err := s.db.QueryContext(ctx, `SELECT entity, entity_id, op, version, payload FROM sync_log WHERE version > ? ORDER BY version ASC`, sinceVersion) if err != nil { return nil, 0, err } defer rows.Close() var changes []SyncChange var maxVersion int64 = sinceVersion for rows.Next() { var c SyncChange if err := rows.Scan(&c.Entity, &c.EntityID, &c.Op, &c.Version, &c.Payload); err != nil { return nil, 0, err } if c.Version > maxVersion { maxVersion = c.Version } changes = append(changes, c) } return changes, maxVersion, rows.Err() } // nextVersion returns the next monotonic version number. func (s *SyncStore) nextVersion(ctx context.Context) (int64, error) { var max sql.NullInt64 err := s.db.QueryRowContext(ctx, `SELECT MAX(version) FROM sync_log`).Scan(&max) if err != nil { return 0, err } if !max.Valid { return 1, nil } return max.Int64 + 1, nil } // LogEntry appends an entry upsert to the sync log. func (s *SyncStore) LogEntry(ctx context.Context, e *domain.Entry) error { payload, err := json.Marshal(e) if err != nil { return err } return s.log(ctx, "entries", e.ID, "upsert", string(payload)) } // LogEntryDelete appends an entry delete to the sync log. func (s *SyncStore) LogEntryDelete(ctx context.Context, id string) error { payload := fmt.Sprintf(`{"id":%q}`, id) return s.log(ctx, "entries", id, "delete", payload) } // LogClosedDay appends a closed_day upsert to the sync log. func (s *SyncStore) LogClosedDay(ctx context.Context, d *domain.ClosedDay) error { payload, err := json.Marshal(d) if err != nil { return err } return s.log(ctx, "closed_days", d.DayKey, "upsert", string(payload)) } // LogClosedWeek appends a closed_week upsert to the sync log. func (s *SyncStore) LogClosedWeek(ctx context.Context, w *domain.ClosedWeek) error { payload, err := json.Marshal(w) if err != nil { return err } return s.log(ctx, "closed_weeks", w.WeekKey, "upsert", string(payload)) } func (s *SyncStore) log(ctx context.Context, entity, entityID, op, payload string) error { version, err := s.nextVersion(ctx) if err != nil { return err } _, err = s.db.ExecContext(ctx, `INSERT INTO sync_log (entity, entity_id, op, version, payload) VALUES (?, ?, ?, ?, ?)`, entity, entityID, op, version, payload) return err }