- Migration 003: adds logged_at to sync_log for TTL pruning; migrates settings_history to UUID TEXT PK with updated_at column - SyncStore: Prune() deletes rows older than 30d and writes a '_pruned' marker at the boundary version; Pull() calls Prune lazily and returns ErrSyncStale (410) when the client's since_version is behind the marker - sync_handler.go: GET /api/sync/pull?since=N; POST /api/sync/push with last-updated_at-wins conflict resolution for entries, balance_adjustments, settings_history; closed_days/closed_weeks skipped (server-only mutations) - router.go: passes entryStore, adjustmentStore, settingsStore to SyncHandler - settings_store.go: UUID PK, updated_at column, Upsert() for push path - settings_service.go: generates UUID on create, sets updated_at on update - settings_handler.go: ID params changed from int64 to string - domain.go: Settings.ID string, Settings.UpdatedAt added - client.ts: all mutation methods catch TypeError (offline) and fall back to Dexie write + outbox enqueue; crypto.randomUUID() for offline creates; Settings.id type changed to string - db.ts: Dexie v3 — settings_history key path changed to string UUID; upgrade handler clears table for repopulation via pull - sync.ts: real pushOutbox to POST /api/sync/push; pullChanges uses GET with ?since=N; 410 triggers coldStart() + retry; coldStart() wipes all tables and resets last_version - 4 new Go store tests covering normal pull, stale client, empty prune, client-ahead-of-marker; all tests pass (store + service, 19 Vitest)
291 lines
7.6 KiB
Go
291 lines
7.6 KiB
Go
package handler
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"encoding/json"
|
|
"errors"
|
|
"net/http"
|
|
"strconv"
|
|
"time"
|
|
|
|
"github.com/go-chi/chi/v5"
|
|
"github.com/wotra/wotra/internal/domain"
|
|
"github.com/wotra/wotra/internal/store"
|
|
)
|
|
|
|
// SyncHandler serves /api/sync routes.
|
|
type SyncHandler struct {
|
|
sync *store.SyncStore
|
|
entries *store.EntryStore
|
|
adjustments *store.BalanceAdjustmentStore
|
|
settings *store.SettingsStore
|
|
}
|
|
|
|
func NewSyncHandler(
|
|
sync *store.SyncStore,
|
|
entries *store.EntryStore,
|
|
adjustments *store.BalanceAdjustmentStore,
|
|
settings *store.SettingsStore,
|
|
) *SyncHandler {
|
|
return &SyncHandler{
|
|
sync: sync,
|
|
entries: entries,
|
|
adjustments: adjustments,
|
|
settings: settings,
|
|
}
|
|
}
|
|
|
|
func (h *SyncHandler) Routes(r chi.Router) {
|
|
r.Get("/sync/pull", h.Pull)
|
|
r.Post("/sync/push", h.Push)
|
|
}
|
|
|
|
// Pull GET /api/sync/pull?since=N
|
|
func (h *SyncHandler) Pull(w http.ResponseWriter, r *http.Request) {
|
|
sinceStr := r.URL.Query().Get("since")
|
|
var since int64
|
|
if sinceStr != "" {
|
|
var err error
|
|
since, err = strconv.ParseInt(sinceStr, 10, 64)
|
|
if err != nil {
|
|
writeError(w, http.StatusBadRequest, "invalid since parameter")
|
|
return
|
|
}
|
|
}
|
|
|
|
changes, serverVersion, err := h.sync.Pull(r.Context(), since)
|
|
if err != nil {
|
|
if errors.Is(err, store.ErrSyncStale) {
|
|
writeError(w, http.StatusGone, "sync_stale")
|
|
return
|
|
}
|
|
writeError(w, http.StatusInternalServerError, err.Error())
|
|
return
|
|
}
|
|
|
|
// Return empty array rather than null.
|
|
if changes == nil {
|
|
changes = []store.SyncChange{}
|
|
}
|
|
|
|
writeJSON(w, http.StatusOK, map[string]any{
|
|
"changes": changes,
|
|
"server_version": serverVersion,
|
|
})
|
|
}
|
|
|
|
// pushItem is a single change submitted by the client.
|
|
type pushItem struct {
|
|
Entity string `json:"entity"`
|
|
EntityID string `json:"entity_id"`
|
|
Op string `json:"op"` // "upsert" | "delete"
|
|
Payload json.RawMessage `json:"payload"`
|
|
}
|
|
|
|
// Push POST /api/sync/push
|
|
func (h *SyncHandler) Push(w http.ResponseWriter, r *http.Request) {
|
|
var body struct {
|
|
Changes []pushItem `json:"changes"`
|
|
}
|
|
if err := decodeJSON(r, &body); err != nil {
|
|
writeError(w, http.StatusBadRequest, "invalid JSON")
|
|
return
|
|
}
|
|
|
|
ctx := r.Context()
|
|
var applied, skipped []string
|
|
|
|
for _, item := range body.Changes {
|
|
ok, err := h.applyPushItem(ctx, item)
|
|
if err != nil {
|
|
// Log and skip on unexpected errors; don't abort the whole push.
|
|
skipped = append(skipped, item.EntityID)
|
|
continue
|
|
}
|
|
if ok {
|
|
applied = append(applied, item.EntityID)
|
|
} else {
|
|
skipped = append(skipped, item.EntityID)
|
|
}
|
|
}
|
|
|
|
if applied == nil {
|
|
applied = []string{}
|
|
}
|
|
if skipped == nil {
|
|
skipped = []string{}
|
|
}
|
|
|
|
writeJSON(w, http.StatusOK, map[string]any{
|
|
"applied": applied,
|
|
"skipped": skipped,
|
|
})
|
|
}
|
|
|
|
// applyPushItem applies one client change. Returns (true, nil) if applied,
|
|
// (false, nil) if skipped (e.g. server row is newer), (false, err) on error.
|
|
func (h *SyncHandler) applyPushItem(ctx context.Context, item pushItem) (bool, error) {
|
|
switch item.Entity {
|
|
case "entries":
|
|
return h.applyEntry(ctx, item)
|
|
case "balance_adjustments":
|
|
return h.applyBalanceAdjustment(ctx, item)
|
|
case "settings_history":
|
|
return h.applySettings(ctx, item)
|
|
default:
|
|
// closed_days, closed_weeks — server-only mutations; skip silently.
|
|
return false, nil
|
|
}
|
|
}
|
|
|
|
// ── entries ───────────────────────────────────────────────────────────────────
|
|
|
|
func (h *SyncHandler) applyEntry(ctx context.Context, item pushItem) (bool, error) {
|
|
if item.Op == "delete" {
|
|
var payload struct {
|
|
ID string `json:"id"`
|
|
UpdatedAt int64 `json:"updated_at"`
|
|
}
|
|
if err := json.Unmarshal(item.Payload, &payload); err != nil {
|
|
return false, err
|
|
}
|
|
now := time.Now().UnixMilli()
|
|
// Only soft-delete if server row is not newer.
|
|
existing, err := h.entries.GetByID(ctx, item.EntityID)
|
|
if err != nil {
|
|
if errors.Is(err, sql.ErrNoRows) {
|
|
return true, nil // already gone
|
|
}
|
|
return false, err
|
|
}
|
|
if existing.UpdatedAt > payload.UpdatedAt {
|
|
return false, nil // server is newer
|
|
}
|
|
if err := h.entries.SoftDelete(ctx, item.EntityID, now); err != nil {
|
|
return false, err
|
|
}
|
|
if err := h.sync.LogEntryDelete(ctx, item.EntityID); err != nil {
|
|
return false, err
|
|
}
|
|
return true, nil
|
|
}
|
|
|
|
// upsert
|
|
var e domain.Entry
|
|
if err := json.Unmarshal(item.Payload, &e); err != nil {
|
|
return false, err
|
|
}
|
|
existing, err := h.entries.GetByID(ctx, e.ID)
|
|
if err != nil && !errors.Is(err, sql.ErrNoRows) {
|
|
return false, err
|
|
}
|
|
if existing != nil && existing.UpdatedAt >= e.UpdatedAt {
|
|
return false, nil // server is newer or equal
|
|
}
|
|
if existing == nil {
|
|
if err := h.entries.Create(ctx, &e); err != nil {
|
|
return false, err
|
|
}
|
|
} else {
|
|
if err := h.entries.Update(ctx, &e); err != nil {
|
|
return false, err
|
|
}
|
|
}
|
|
if err := h.sync.LogEntry(ctx, &e); err != nil {
|
|
return false, err
|
|
}
|
|
return true, nil
|
|
}
|
|
|
|
// ── balance_adjustments ───────────────────────────────────────────────────────
|
|
|
|
func (h *SyncHandler) applyBalanceAdjustment(ctx context.Context, item pushItem) (bool, error) {
|
|
if item.Op == "delete" {
|
|
var payload struct {
|
|
UpdatedAt int64 `json:"updated_at"`
|
|
}
|
|
if err := json.Unmarshal(item.Payload, &payload); err != nil {
|
|
return false, err
|
|
}
|
|
existing, err := h.adjustments.GetByID(ctx, item.EntityID)
|
|
if err != nil {
|
|
if errors.Is(err, store.ErrAdjustmentNotFound) {
|
|
return true, nil // already gone
|
|
}
|
|
return false, err
|
|
}
|
|
if existing.UpdatedAt > payload.UpdatedAt {
|
|
return false, nil // server is newer
|
|
}
|
|
if err := h.adjustments.Delete(ctx, item.EntityID); err != nil {
|
|
return false, err
|
|
}
|
|
if err := h.sync.LogBalanceAdjustmentDelete(ctx, item.EntityID); err != nil {
|
|
return false, err
|
|
}
|
|
return true, nil
|
|
}
|
|
|
|
// upsert
|
|
var a domain.BalanceAdjustment
|
|
if err := json.Unmarshal(item.Payload, &a); err != nil {
|
|
return false, err
|
|
}
|
|
existing, err := h.adjustments.GetByID(ctx, a.ID)
|
|
if err != nil && !errors.Is(err, store.ErrAdjustmentNotFound) {
|
|
return false, err
|
|
}
|
|
if existing != nil && existing.UpdatedAt >= a.UpdatedAt {
|
|
return false, nil
|
|
}
|
|
if existing == nil {
|
|
if err := h.adjustments.Create(ctx, &a); err != nil {
|
|
return false, err
|
|
}
|
|
} else {
|
|
if err := h.adjustments.Update(ctx, &a); err != nil {
|
|
return false, err
|
|
}
|
|
}
|
|
if err := h.sync.LogBalanceAdjustment(ctx, &a); err != nil {
|
|
return false, err
|
|
}
|
|
return true, nil
|
|
}
|
|
|
|
// ── settings_history ──────────────────────────────────────────────────────────
|
|
|
|
func (h *SyncHandler) applySettings(ctx context.Context, item pushItem) (bool, error) {
|
|
if item.Op == "delete" {
|
|
// Refuse to delete if it would leave zero rows (same rule as service).
|
|
count, err := h.settings.Count(ctx)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
if count <= 1 {
|
|
return false, nil // skip silently
|
|
}
|
|
if err := h.settings.Delete(ctx, item.EntityID); err != nil {
|
|
return false, err
|
|
}
|
|
if err := h.sync.LogSettingsDelete(ctx, item.EntityID); err != nil {
|
|
return false, err
|
|
}
|
|
return true, nil
|
|
}
|
|
|
|
// upsert — last updated_at wins via store.Upsert
|
|
var s domain.Settings
|
|
if err := json.Unmarshal(item.Payload, &s); err != nil {
|
|
return false, err
|
|
}
|
|
if err := h.settings.Upsert(ctx, &s); err != nil {
|
|
return false, err
|
|
}
|
|
if err := h.sync.LogSettings(ctx, &s); err != nil {
|
|
return false, err
|
|
}
|
|
return true, nil
|
|
}
|