package notification

import (
	"context"
	"errors"
	"time"

	"github.com/google/uuid"
	"go.uber.org/zap"
)

// Worker is the background consumer of the notification routes table.
// On every tick it walks the rows that are due (claimed under
// `SELECT … FOR UPDATE SKIP LOCKED`), hands each one to the matching
// channel, and atomically records the resulting route status.
// It implements `internal/app.Component`.
type Worker struct {
	svc *Service
}

// NewWorker returns a Worker that operates on behalf of svc.
func NewWorker(svc *Service) *Worker {
	w := new(Worker)
	w.svc = svc
	return w
}

// Run executes the scan loop and only returns (always with a nil error)
// once ctx is done. Calling Run on a nil *Worker is a no-op.
//
// One tick is performed immediately before the ticker starts: this is
// the startup-drain pass, so rows queued before a process restart are
// retried right away instead of waiting for the first interval. A
// non-positive configured WorkerInterval falls back to five seconds.
func (w *Worker) Run(ctx context.Context) error {
	if w == nil {
		return nil
	}

	workerLog := w.svc.deps.Logger.Named("worker")

	// warnUnlessCancelled logs a failed tick at warn level; a nil error
	// or one that wraps context.Canceled is dropped silently.
	warnUnlessCancelled := func(msg string, err error) {
		if err == nil || errors.Is(err, context.Canceled) {
			return
		}
		workerLog.Warn(msg, zap.Error(err))
	}

	warnUnlessCancelled("initial notification routes drain failed", w.tick(ctx))

	period := w.svc.deps.Config.WorkerInterval
	if period <= 0 {
		period = 5 * time.Second
	}

	tk := time.NewTicker(period)
	defer tk.Stop()

	for {
		select {
		case <-tk.C:
			warnUnlessCancelled("notification routes tick failed", w.tick(ctx))
		case <-ctx.Done():
			return nil
		}
	}
}

// Shutdown does nothing and always returns nil: every per-row
// transaction is self-contained, so cancelling the ctx passed to Run is
// sufficient to stop the worker.
func (w *Worker) Shutdown(_ context.Context) error {
	return nil
}

// Tick runs exactly one scan pass. It exists so tests can drive the
// worker deterministically, without depending on ticker timing.
func (w *Worker) Tick(ctx context.Context) error {
	return w.tick(ctx)
}

// tick processes up to claimBatchSize rows. Each row is handled in its
// own transaction so a slow channel only holds one row lock at a time.
// The loop exits as soon as a tick claims zero rows or ctx is
// cancelled.
Rows already handled in this tick are tracked in `seen` // and excluded from subsequent claims so a transient retry scheduled // with next_attempt_at <= now() does not chew through MaxAttempts in a // single tick (mirrors the mail-worker pattern). func (w *Worker) tick(ctx context.Context) error { seen := make([]uuid.UUID, 0, claimBatchSize) for range claimBatchSize { if ctx.Err() != nil { return ctx.Err() } more, processed, err := w.processOne(ctx, seen) if err != nil { return err } if !more { return nil } seen = append(seen, processed) } return nil } // processOne claims a single due route, dispatches it, and commits the // state transition. Returns more=false when no row was due. func (w *Worker) processOne(ctx context.Context, exclude []uuid.UUID) (bool, uuid.UUID, error) { tx, err := w.svc.deps.Store.BeginTx(ctx) if err != nil { return false, uuid.Nil, err } defer func() { _ = tx.Rollback() }() claimed, err := w.svc.deps.Store.ClaimDueRoutes(ctx, tx, 1, exclude...) if err != nil { return false, uuid.Nil, err } if len(claimed) == 0 { return false, uuid.Nil, nil } c := claimed[0] dispatchErr := w.svc.performDispatch(ctx, c) at := w.svc.nowUTC() if err := w.svc.finaliseDispatch(ctx, tx, c, dispatchErr, at); err != nil { return false, uuid.Nil, err } if err := tx.Commit(); err != nil { return false, uuid.Nil, err } return true, c.Route.RouteID, nil } // Compile-time check that Worker satisfies the lifecycle interface // shape used elsewhere (Run + Shutdown). var _ interface { Run(context.Context) error Shutdown(context.Context) error } = (*Worker)(nil)