package handler import ( "context" "database/sql" "encoding/json" "errors" "net/http" "strconv" "time" "github.com/go-chi/chi/v5" "github.com/wotra/wotra/internal/domain" "github.com/wotra/wotra/internal/store" ) // SyncHandler serves /api/sync routes. type SyncHandler struct { sync *store.SyncStore entries *store.EntryStore adjustments *store.BalanceAdjustmentStore settings *store.SettingsStore } func NewSyncHandler( sync *store.SyncStore, entries *store.EntryStore, adjustments *store.BalanceAdjustmentStore, settings *store.SettingsStore, ) *SyncHandler { return &SyncHandler{ sync: sync, entries: entries, adjustments: adjustments, settings: settings, } } func (h *SyncHandler) Routes(r chi.Router) { r.Get("/sync/pull", h.Pull) r.Post("/sync/push", h.Push) } // Pull GET /api/sync/pull?since=N func (h *SyncHandler) Pull(w http.ResponseWriter, r *http.Request) { sinceStr := r.URL.Query().Get("since") var since int64 if sinceStr != "" { var err error since, err = strconv.ParseInt(sinceStr, 10, 64) if err != nil { writeError(w, http.StatusBadRequest, "invalid since parameter") return } } changes, serverVersion, err := h.sync.Pull(r.Context(), since) if err != nil { if errors.Is(err, store.ErrSyncStale) { writeError(w, http.StatusGone, "sync_stale") return } writeError(w, http.StatusInternalServerError, err.Error()) return } // Return empty array rather than null. if changes == nil { changes = []store.SyncChange{} } writeJSON(w, http.StatusOK, map[string]any{ "changes": changes, "server_version": serverVersion, }) } // pushItem is a single change submitted by the client. type pushItem struct { Entity string `json:"entity"` EntityID string `json:"entity_id"` Op string `json:"op"` // "upsert" | "delete" Payload json.RawMessage `json:"payload"` } // Push POST /api/sync/push func (h *SyncHandler) Push(w http.ResponseWriter, r *http.Request) { var body struct { Changes []pushItem `json:"changes"` } if err := decodeJSON(r, &body); err != nil { writeError(w, http.StatusBadRequest, "invalid JSON") return } ctx := r.Context() var applied, skipped []string for _, item := range body.Changes { ok, err := h.applyPushItem(ctx, item) if err != nil { // Log and skip on unexpected errors; don't abort the whole push. skipped = append(skipped, item.EntityID) continue } if ok { applied = append(applied, item.EntityID) } else { skipped = append(skipped, item.EntityID) } } if applied == nil { applied = []string{} } if skipped == nil { skipped = []string{} } writeJSON(w, http.StatusOK, map[string]any{ "applied": applied, "skipped": skipped, }) } // applyPushItem applies one client change. Returns (true, nil) if applied, // (false, nil) if skipped (e.g. server row is newer), (false, err) on error. func (h *SyncHandler) applyPushItem(ctx context.Context, item pushItem) (bool, error) { switch item.Entity { case "entries": return h.applyEntry(ctx, item) case "balance_adjustments": return h.applyBalanceAdjustment(ctx, item) case "settings_history": return h.applySettings(ctx, item) default: // closed_days, closed_weeks — server-only mutations; skip silently. return false, nil } } // ── entries ─────────────────────────────────────────────────────────────────── func (h *SyncHandler) applyEntry(ctx context.Context, item pushItem) (bool, error) { if item.Op == "delete" { var payload struct { ID string `json:"id"` UpdatedAt int64 `json:"updated_at"` } if err := json.Unmarshal(item.Payload, &payload); err != nil { return false, err } now := time.Now().UnixMilli() // Only soft-delete if server row is not newer. existing, err := h.entries.GetByID(ctx, item.EntityID) if err != nil { if errors.Is(err, sql.ErrNoRows) { return true, nil // already gone } return false, err } if existing.UpdatedAt > payload.UpdatedAt { return false, nil // server is newer } if err := h.entries.SoftDelete(ctx, item.EntityID, now); err != nil { return false, err } if err := h.sync.LogEntryDelete(ctx, item.EntityID); err != nil { return false, err } return true, nil } // upsert var e domain.Entry if err := json.Unmarshal(item.Payload, &e); err != nil { return false, err } existing, err := h.entries.GetByID(ctx, e.ID) if err != nil && !errors.Is(err, sql.ErrNoRows) { return false, err } if existing != nil && existing.UpdatedAt >= e.UpdatedAt { return false, nil // server is newer or equal } if existing == nil { if err := h.entries.Create(ctx, &e); err != nil { return false, err } } else { if err := h.entries.Update(ctx, &e); err != nil { return false, err } } if err := h.sync.LogEntry(ctx, &e); err != nil { return false, err } return true, nil } // ── balance_adjustments ─────────────────────────────────────────────────────── func (h *SyncHandler) applyBalanceAdjustment(ctx context.Context, item pushItem) (bool, error) { if item.Op == "delete" { var payload struct { UpdatedAt int64 `json:"updated_at"` } if err := json.Unmarshal(item.Payload, &payload); err != nil { return false, err } existing, err := h.adjustments.GetByID(ctx, item.EntityID) if err != nil { if errors.Is(err, store.ErrAdjustmentNotFound) { return true, nil // already gone } return false, err } if existing.UpdatedAt > payload.UpdatedAt { return false, nil // server is newer } if err := h.adjustments.Delete(ctx, item.EntityID); err != nil { return false, err } if err := h.sync.LogBalanceAdjustmentDelete(ctx, item.EntityID); err != nil { return false, err } return true, nil } // upsert var a domain.BalanceAdjustment if err := json.Unmarshal(item.Payload, &a); err != nil { return false, err } existing, err := h.adjustments.GetByID(ctx, a.ID) if err != nil && !errors.Is(err, store.ErrAdjustmentNotFound) { return false, err } if existing != nil && existing.UpdatedAt >= a.UpdatedAt { return false, nil } if existing == nil { if err := h.adjustments.Create(ctx, &a); err != nil { return false, err } } else { if err := h.adjustments.Update(ctx, &a); err != nil { return false, err } } if err := h.sync.LogBalanceAdjustment(ctx, &a); err != nil { return false, err } return true, nil } // ── settings_history ────────────────────────────────────────────────────────── func (h *SyncHandler) applySettings(ctx context.Context, item pushItem) (bool, error) { if item.Op == "delete" { // Refuse to delete if it would leave zero rows (same rule as service). count, err := h.settings.Count(ctx) if err != nil { return false, err } if count <= 1 { return false, nil // skip silently } if err := h.settings.Delete(ctx, item.EntityID); err != nil { return false, err } if err := h.sync.LogSettingsDelete(ctx, item.EntityID); err != nil { return false, err } return true, nil } // upsert — last updated_at wins via store.Upsert var s domain.Settings if err := json.Unmarshal(item.Payload, &s); err != nil { return false, err } if err := h.settings.Upsert(ctx, &s); err != nil { return false, err } if err := h.sync.LogSettings(ctx, &s); err != nil { return false, err } return true, nil }