package mail

import (
	"context"
	"errors"
	"math"
	"math/rand/v2"
	"time"

	"github.com/google/uuid"
	"go.uber.org/zap"
)

// Worker drains the mail outbox: per tick it walks due rows under
// `SELECT … FOR UPDATE SKIP LOCKED`, dispatches each through the SMTP
// sender, and atomically updates the delivery + attempt rows.
// Implements `internal/app.Component`.
type Worker struct {
	svc *Service
}

// NewWorker constructs a Worker bound to svc.
func NewWorker(svc *Service) *Worker {
	return &Worker{svc: svc}
}

// claimBatchSize bounds how many rows the worker processes per tick.
// 16 keeps each tick under a second on a developer machine while
// leaving headroom for transient SMTP back-pressure.
const claimBatchSize = 16

// Run drives the scan loop until ctx is cancelled. The first tick is
// the startup-drain pass mandated by `PLAN.md` §5.6.
func (w *Worker) Run(ctx context.Context) error {
	if w == nil {
		return nil
	}
	log := w.svc.deps.Logger.Named("worker")

	// Both the startup drain and the periodic passes share the same
	// shape: run one tick, warn on any failure that is not a plain
	// context cancellation (cancellation is the normal shutdown path).
	runTick := func(failMsg string) {
		if err := w.tick(ctx); err != nil && !errors.Is(err, context.Canceled) {
			log.Warn(failMsg, zap.Error(err))
		}
	}

	runTick("initial mail outbox drain failed")

	ticker := time.NewTicker(w.svc.deps.Config.WorkerInterval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return nil
		case <-ticker.C:
			runTick("mail outbox tick failed")
		}
	}
}

// Shutdown is a no-op: each per-row transaction is self-contained, so
// a cancelled ctx above is enough to stop the loop. Any row already
// inside a Send call finishes its commit (or rolls back on context
// cancel) before the worker returns.
func (w *Worker) Shutdown(_ context.Context) error { return nil }

// Tick is exposed for tests so they can drive the worker without
// timing dependencies.
func (w *Worker) Tick(ctx context.Context) error { return w.tick(ctx) }

// tick processes up to claimBatchSize rows.
Each row is handled in its // own transaction so a slow SMTP send only holds one row lock at a // time. The loop exits as soon as a tick claims zero rows or ctx is // cancelled. Rows already handled in this tick are tracked in the // `seen` set and excluded from subsequent claims so a transient retry // scheduled with next_attempt_at in the past does not chew through a // delivery's MaxAttempts budget within a single tick. func (w *Worker) tick(ctx context.Context) error { seen := make([]uuid.UUID, 0, claimBatchSize) for range claimBatchSize { if ctx.Err() != nil { return ctx.Err() } more, processed, err := w.processOne(ctx, seen) if err != nil { return err } if !more { return nil } seen = append(seen, processed) } return nil } // processOne claims a single due row, dispatches it, and commits the // state transition. Returns more=false when no row was due, so the // caller can short-circuit the tick loop. The delivery_id of the // processed row is returned so the tick loop can skip it on // subsequent iterations. func (w *Worker) processOne(ctx context.Context, exclude []uuid.UUID) (bool, uuid.UUID, error) { tx, err := w.svc.deps.Store.BeginTx(ctx) if err != nil { return false, uuid.Nil, err } defer func() { // Rollback is a no-op after Commit; this catches every error // path inside the function. _ = tx.Rollback() }() claimed, err := w.svc.deps.Store.ClaimDue(ctx, tx, 1, exclude...) 
if err != nil { return false, uuid.Nil, err } if len(claimed) == 0 { return false, uuid.Nil, nil } c := claimed[0] logger := w.svc.deps.Logger.Named("worker").With( zap.String("delivery_id", c.Delivery.DeliveryID.String()), zap.String("template_id", c.Delivery.TemplateID), ) now := w.svc.deps.Now() addresses := make([]string, 0, len(c.Recipients)) for _, r := range c.Recipients { addresses = append(addresses, r.Address) } subject := "" if c.Payload.Subject != nil { subject = *c.Payload.Subject } out := OutboundMessage{ To: addresses, Subject: subject, ContentType: c.Payload.ContentType, Body: c.Payload.Body, } sendErr := w.svc.deps.SMTP.Send(ctx, out) finishedAt := w.svc.deps.Now() cycleAttempt := c.Delivery.Attempts + 1 if sendErr == nil { attemptNo, err := w.svc.deps.Store.RecordAttempt(ctx, tx, c.Delivery.DeliveryID, now, finishedAt, OutcomeSuccess, "") if err != nil { return false, uuid.Nil, err } if err := w.svc.deps.Store.MarkSent(ctx, tx, c.Delivery.DeliveryID, finishedAt); err != nil { return false, uuid.Nil, err } logger.Info("mail delivery sent", zap.Int32("cycle_attempt", cycleAttempt), zap.Int32("attempt_no", attemptNo), ) } else { permanent := IsPermanent(sendErr) outcome := OutcomeTransientError if permanent { outcome = OutcomePermanentError } attemptNo, err := w.svc.deps.Store.RecordAttempt(ctx, tx, c.Delivery.DeliveryID, now, finishedAt, outcome, sendErr.Error()) if err != nil { return false, uuid.Nil, err } maxAttempts := int32(w.svc.deps.Config.MaxAttempts) giveUp := permanent || cycleAttempt >= maxAttempts if giveUp { reason := sendErr.Error() if permanent { reason = "permanent: " + reason } if err := w.svc.deps.Store.MarkDeadLettered(ctx, tx, c.Delivery.DeliveryID, finishedAt, reason); err != nil { return false, uuid.Nil, err } logger.Warn("mail delivery dead-lettered", zap.Int32("cycle_attempt", cycleAttempt), zap.Int32("attempt_no", attemptNo), zap.Int32("max_attempts", maxAttempts), zap.Bool("permanent", permanent), zap.String("reason", 
reason), ) } else { nextAt := finishedAt.Add(nextBackoff(int(cycleAttempt))) if err := w.svc.deps.Store.ScheduleRetry(ctx, tx, c.Delivery.DeliveryID, finishedAt, nextAt, sendErr.Error()); err != nil { return false, uuid.Nil, err } logger.Info("mail delivery retry scheduled", zap.Int32("cycle_attempt", cycleAttempt), zap.Int32("attempt_no", attemptNo), zap.Time("next_attempt_at", nextAt), ) } } if err := tx.Commit(); err != nil { return false, uuid.Nil, err } if sendErr != nil { permanent := IsPermanent(sendErr) giveUp := permanent || (c.Delivery.Attempts+1) >= int32(w.svc.deps.Config.MaxAttempts) if giveUp { w.svc.deps.Admin.OnDeadLetter(ctx, c.Delivery.DeliveryID, c.Delivery.TemplateID, sendErr.Error()) } } return true, c.Delivery.DeliveryID, nil } // nextBackoff returns the wait between attempt N (1-indexed) and the // next try. The schedule grows exponentially up to backoffMax with a // uniform ±backoffJitter shake to prevent retry storms. func nextBackoff(attempt int) time.Duration { if attempt < 1 { attempt = 1 } scaled := float64(backoffBase) * math.Pow(backoffFactor, float64(attempt-1)) if scaled > float64(backoffMax) { scaled = float64(backoffMax) } // Symmetric jitter in [-backoffJitter, +backoffJitter]. jitter := (rand.Float64()*2 - 1) * backoffJitter final := scaled * (1 + jitter) if final < float64(backoffBase) { final = float64(backoffBase) } return time.Duration(final) } // Compile-time check that Worker satisfies the lifecycle interface // shape used elsewhere (Run + Shutdown). var _ interface { Run(context.Context) error Shutdown(context.Context) error } = (*Worker)(nil)