Files
wotra/internal/handler/sync_handler.go
Andreas Schneider d8366f5c25 Add sync redesign with offline fallback (M9)
- Migration 003: adds logged_at to sync_log for TTL pruning; migrates
  settings_history to UUID TEXT PK with updated_at column
- SyncStore: Prune() deletes rows older than 30d and writes a '_pruned'
  marker at the boundary version; Pull() calls Prune lazily and returns
  ErrSyncStale (410) when the client's since_version is behind the marker
- sync_handler.go: GET /api/sync/pull?since=N; POST /api/sync/push with
  last-updated_at-wins conflict resolution for entries, balance_adjustments,
  settings_history; closed_days/closed_weeks skipped (server-only mutations)
- router.go: passes entryStore, adjustmentStore, settingsStore to SyncHandler
- settings_store.go: UUID PK, updated_at column, Upsert() for push path
- settings_service.go: generates UUID on create, sets updated_at on update
- settings_handler.go: ID params changed from int64 to string
- domain.go: Settings.ID string, Settings.UpdatedAt added
- client.ts: all mutation methods catch TypeError (offline) and fall back
  to Dexie write + outbox enqueue; crypto.randomUUID() for offline creates;
  Settings.id type changed to string
- db.ts: Dexie v3 — settings_history key path changed to string UUID;
  upgrade handler clears table for repopulation via pull
- sync.ts: real pushOutbox to POST /api/sync/push; pullChanges uses GET
  with ?since=N; 410 triggers coldStart() + retry; coldStart() wipes all
  tables and resets last_version
- 4 new Go store tests covering normal pull, stale client, empty prune,
  client-ahead-of-marker; all tests pass (store + service, 19 Vitest)
2026-04-30 22:50:33 +02:00

291 lines
7.6 KiB
Go

package handler
import (
"context"
"database/sql"
"encoding/json"
"errors"
"net/http"
"strconv"
"time"
"github.com/go-chi/chi/v5"
"github.com/wotra/wotra/internal/domain"
"github.com/wotra/wotra/internal/store"
)
// SyncHandler serves /api/sync routes.
type SyncHandler struct {
sync *store.SyncStore
entries *store.EntryStore
adjustments *store.BalanceAdjustmentStore
settings *store.SettingsStore
}
func NewSyncHandler(
sync *store.SyncStore,
entries *store.EntryStore,
adjustments *store.BalanceAdjustmentStore,
settings *store.SettingsStore,
) *SyncHandler {
return &SyncHandler{
sync: sync,
entries: entries,
adjustments: adjustments,
settings: settings,
}
}
func (h *SyncHandler) Routes(r chi.Router) {
r.Get("/sync/pull", h.Pull)
r.Post("/sync/push", h.Push)
}
// Pull GET /api/sync/pull?since=N
func (h *SyncHandler) Pull(w http.ResponseWriter, r *http.Request) {
sinceStr := r.URL.Query().Get("since")
var since int64
if sinceStr != "" {
var err error
since, err = strconv.ParseInt(sinceStr, 10, 64)
if err != nil {
writeError(w, http.StatusBadRequest, "invalid since parameter")
return
}
}
changes, serverVersion, err := h.sync.Pull(r.Context(), since)
if err != nil {
if errors.Is(err, store.ErrSyncStale) {
writeError(w, http.StatusGone, "sync_stale")
return
}
writeError(w, http.StatusInternalServerError, err.Error())
return
}
// Return empty array rather than null.
if changes == nil {
changes = []store.SyncChange{}
}
writeJSON(w, http.StatusOK, map[string]any{
"changes": changes,
"server_version": serverVersion,
})
}
// pushItem is a single change submitted by the client.
type pushItem struct {
Entity string `json:"entity"`
EntityID string `json:"entity_id"`
Op string `json:"op"` // "upsert" | "delete"
Payload json.RawMessage `json:"payload"`
}
// Push POST /api/sync/push
func (h *SyncHandler) Push(w http.ResponseWriter, r *http.Request) {
var body struct {
Changes []pushItem `json:"changes"`
}
if err := decodeJSON(r, &body); err != nil {
writeError(w, http.StatusBadRequest, "invalid JSON")
return
}
ctx := r.Context()
var applied, skipped []string
for _, item := range body.Changes {
ok, err := h.applyPushItem(ctx, item)
if err != nil {
// Log and skip on unexpected errors; don't abort the whole push.
skipped = append(skipped, item.EntityID)
continue
}
if ok {
applied = append(applied, item.EntityID)
} else {
skipped = append(skipped, item.EntityID)
}
}
if applied == nil {
applied = []string{}
}
if skipped == nil {
skipped = []string{}
}
writeJSON(w, http.StatusOK, map[string]any{
"applied": applied,
"skipped": skipped,
})
}
// applyPushItem applies one client change. Returns (true, nil) if applied,
// (false, nil) if skipped (e.g. server row is newer), (false, err) on error.
func (h *SyncHandler) applyPushItem(ctx context.Context, item pushItem) (bool, error) {
switch item.Entity {
case "entries":
return h.applyEntry(ctx, item)
case "balance_adjustments":
return h.applyBalanceAdjustment(ctx, item)
case "settings_history":
return h.applySettings(ctx, item)
default:
// closed_days, closed_weeks — server-only mutations; skip silently.
return false, nil
}
}
// ── entries ───────────────────────────────────────────────────────────────────
func (h *SyncHandler) applyEntry(ctx context.Context, item pushItem) (bool, error) {
if item.Op == "delete" {
var payload struct {
ID string `json:"id"`
UpdatedAt int64 `json:"updated_at"`
}
if err := json.Unmarshal(item.Payload, &payload); err != nil {
return false, err
}
now := time.Now().UnixMilli()
// Only soft-delete if server row is not newer.
existing, err := h.entries.GetByID(ctx, item.EntityID)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return true, nil // already gone
}
return false, err
}
if existing.UpdatedAt > payload.UpdatedAt {
return false, nil // server is newer
}
if err := h.entries.SoftDelete(ctx, item.EntityID, now); err != nil {
return false, err
}
if err := h.sync.LogEntryDelete(ctx, item.EntityID); err != nil {
return false, err
}
return true, nil
}
// upsert
var e domain.Entry
if err := json.Unmarshal(item.Payload, &e); err != nil {
return false, err
}
existing, err := h.entries.GetByID(ctx, e.ID)
if err != nil && !errors.Is(err, sql.ErrNoRows) {
return false, err
}
if existing != nil && existing.UpdatedAt >= e.UpdatedAt {
return false, nil // server is newer or equal
}
if existing == nil {
if err := h.entries.Create(ctx, &e); err != nil {
return false, err
}
} else {
if err := h.entries.Update(ctx, &e); err != nil {
return false, err
}
}
if err := h.sync.LogEntry(ctx, &e); err != nil {
return false, err
}
return true, nil
}
// ── balance_adjustments ───────────────────────────────────────────────────────
func (h *SyncHandler) applyBalanceAdjustment(ctx context.Context, item pushItem) (bool, error) {
if item.Op == "delete" {
var payload struct {
UpdatedAt int64 `json:"updated_at"`
}
if err := json.Unmarshal(item.Payload, &payload); err != nil {
return false, err
}
existing, err := h.adjustments.GetByID(ctx, item.EntityID)
if err != nil {
if errors.Is(err, store.ErrAdjustmentNotFound) {
return true, nil // already gone
}
return false, err
}
if existing.UpdatedAt > payload.UpdatedAt {
return false, nil // server is newer
}
if err := h.adjustments.Delete(ctx, item.EntityID); err != nil {
return false, err
}
if err := h.sync.LogBalanceAdjustmentDelete(ctx, item.EntityID); err != nil {
return false, err
}
return true, nil
}
// upsert
var a domain.BalanceAdjustment
if err := json.Unmarshal(item.Payload, &a); err != nil {
return false, err
}
existing, err := h.adjustments.GetByID(ctx, a.ID)
if err != nil && !errors.Is(err, store.ErrAdjustmentNotFound) {
return false, err
}
if existing != nil && existing.UpdatedAt >= a.UpdatedAt {
return false, nil
}
if existing == nil {
if err := h.adjustments.Create(ctx, &a); err != nil {
return false, err
}
} else {
if err := h.adjustments.Update(ctx, &a); err != nil {
return false, err
}
}
if err := h.sync.LogBalanceAdjustment(ctx, &a); err != nil {
return false, err
}
return true, nil
}
// ── settings_history ──────────────────────────────────────────────────────────
func (h *SyncHandler) applySettings(ctx context.Context, item pushItem) (bool, error) {
if item.Op == "delete" {
// Refuse to delete if it would leave zero rows (same rule as service).
count, err := h.settings.Count(ctx)
if err != nil {
return false, err
}
if count <= 1 {
return false, nil // skip silently
}
if err := h.settings.Delete(ctx, item.EntityID); err != nil {
return false, err
}
if err := h.sync.LogSettingsDelete(ctx, item.EntityID); err != nil {
return false, err
}
return true, nil
}
// upsert — last updated_at wins via store.Upsert
var s domain.Settings
if err := json.Unmarshal(item.Payload, &s); err != nil {
return false, err
}
if err := h.settings.Upsert(ctx, &s); err != nil {
return false, err
}
if err := h.sync.LogSettings(ctx, &s); err != nil {
return false, err
}
return true, nil
}