package mail

import (
	"context"
	"errors"
	"math"
	"math/rand/v2"
	"time"

	"github.com/google/uuid"
	"go.uber.org/zap"
)

// Worker drains the mail outbox: per tick it walks due rows under
// `SELECT … FOR UPDATE SKIP LOCKED`, dispatches each through the SMTP
// sender, and atomically updates the delivery + attempt rows.
// Implements `internal/app.Component`.
type Worker struct {
	svc *Service
}

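// For orientation, the claim that Store.ClaimDue issues is shaped
// roughly like the sketch below. The table and column names are
// assumptions for illustration only; the real query lives in the
// store layer:
//
//	SELECT ... FROM mail_deliveries
//	WHERE next_attempt_at <= now()
//	  AND delivery_id <> ALL($excluded)
//	ORDER BY next_attempt_at
//	LIMIT $n
//	FOR UPDATE SKIP LOCKED
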
// NewWorker constructs a Worker bound to svc.
func NewWorker(svc *Service) *Worker { return &Worker{svc: svc} }

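// Wiring sketch (hypothetical: the registration call depends on the
// internal/app runtime, which is outside this package):
//
//	svc := mail.NewService(deps)
//	app.Register(mail.NewWorker(svc)) // Run on startup, Shutdown on exit
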
// claimBatchSize bounds how many rows the worker processes per tick.
// 16 keeps each tick under a second on a developer machine while
// leaving headroom for transient SMTP back-pressure.
const claimBatchSize = 16

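// Worst case, a tick performs claimBatchSize sequential SMTP sends, so
// tick latency is bounded by roughly claimBatchSize times whatever send
// timeout the configured SMTP sender enforces.
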
// Run drives the scan loop until ctx is cancelled. The first tick is
// the startup-drain pass mandated by `PLAN.md` §5.6.
func (w *Worker) Run(ctx context.Context) error {
	if w == nil {
		return nil
	}
	logger := w.svc.deps.Logger.Named("worker")
	if err := w.tick(ctx); err != nil && !errors.Is(err, context.Canceled) {
		logger.Warn("initial mail outbox drain failed", zap.Error(err))
	}
	ticker := time.NewTicker(w.svc.deps.Config.WorkerInterval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return nil
		case <-ticker.C:
			if err := w.tick(ctx); err != nil && !errors.Is(err, context.Canceled) {
				logger.Warn("mail outbox tick failed", zap.Error(err))
			}
		}
	}
}

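// Note that time.Ticker drops ticks when the receiver falls behind, so
// a slow tick (for example under SMTP back-pressure) never queues a
// backlog of catch-up ticks; the next scan simply starts late.
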
// Shutdown is a no-op: each per-row transaction is self-contained, so
// a cancelled ctx above is enough to stop the loop. Any row already
// inside a Send call finishes its commit (or rolls back on context
// cancel) before the worker returns.
func (w *Worker) Shutdown(_ context.Context) error { return nil }

// Tick is exposed for tests so they can drive the worker without
// timing dependencies.
func (w *Worker) Tick(ctx context.Context) error { return w.tick(ctx) }

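// A test can drain the outbox deterministically by calling Tick in a
// loop instead of running the ticker (a sketch; newTestWorker is a
// hypothetical fixture):
//
//	w := newTestWorker(t)
//	for i := 0; i < wantTicks; i++ {
//		require.NoError(t, w.Tick(ctx))
//	}
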
// tick processes up to claimBatchSize rows. Each row is handled in its
// own transaction so a slow SMTP send only holds one row lock at a
// time. The loop exits as soon as a claim comes back empty or ctx is
// cancelled. Rows already handled in this tick are tracked in the
// `seen` set and excluded from subsequent claims so a transient retry
// scheduled with next_attempt_at in the past does not chew through a
// delivery's MaxAttempts budget within a single tick.
func (w *Worker) tick(ctx context.Context) error {
	seen := make([]uuid.UUID, 0, claimBatchSize)
	for range claimBatchSize {
		if ctx.Err() != nil {
			return ctx.Err()
		}
		more, processed, err := w.processOne(ctx, seen)
		if err != nil {
			return err
		}
		if !more {
			return nil
		}
		seen = append(seen, processed)
	}
	return nil
}

// processOne claims a single due row, dispatches it, and commits the
// state transition. Returns more=false when no row was due, so the
// caller can short-circuit the tick loop. The delivery_id of the
// processed row is returned so the tick loop can skip it on
// subsequent iterations.
func (w *Worker) processOne(ctx context.Context, exclude []uuid.UUID) (bool, uuid.UUID, error) {
	tx, err := w.svc.deps.Store.BeginTx(ctx)
	if err != nil {
		return false, uuid.Nil, err
	}
	defer func() {
		// Rollback is a no-op after Commit; this catches every error
		// path inside the function.
		_ = tx.Rollback()
	}()

	claimed, err := w.svc.deps.Store.ClaimDue(ctx, tx, 1, exclude...)
	if err != nil {
		return false, uuid.Nil, err
	}
	if len(claimed) == 0 {
		return false, uuid.Nil, nil
	}
	c := claimed[0]
	logger := w.svc.deps.Logger.Named("worker").With(
		zap.String("delivery_id", c.Delivery.DeliveryID.String()),
		zap.String("template_id", c.Delivery.TemplateID),
	)

	now := w.svc.deps.Now()
	addresses := make([]string, 0, len(c.Recipients))
	for _, r := range c.Recipients {
		addresses = append(addresses, r.Address)
	}
	subject := ""
	if c.Payload.Subject != nil {
		subject = *c.Payload.Subject
	}
	out := OutboundMessage{
		To:          addresses,
		Subject:     subject,
		ContentType: c.Payload.ContentType,
		Body:        c.Payload.Body,
	}

	sendErr := w.svc.deps.SMTP.Send(ctx, out)
	finishedAt := w.svc.deps.Now()

	cycleAttempt := c.Delivery.Attempts + 1
	deadLettered := false
	if sendErr == nil {
		attemptNo, err := w.svc.deps.Store.RecordAttempt(ctx, tx, c.Delivery.DeliveryID, now, finishedAt, OutcomeSuccess, "")
		if err != nil {
			return false, uuid.Nil, err
		}
		if err := w.svc.deps.Store.MarkSent(ctx, tx, c.Delivery.DeliveryID, finishedAt); err != nil {
			return false, uuid.Nil, err
		}
		logger.Info("mail delivery sent",
			zap.Int32("cycle_attempt", cycleAttempt),
			zap.Int32("attempt_no", attemptNo),
		)
	} else {
		permanent := IsPermanent(sendErr)
		outcome := OutcomeTransientError
		if permanent {
			outcome = OutcomePermanentError
		}
		attemptNo, err := w.svc.deps.Store.RecordAttempt(ctx, tx, c.Delivery.DeliveryID, now, finishedAt, outcome, sendErr.Error())
		if err != nil {
			return false, uuid.Nil, err
		}

		maxAttempts := int32(w.svc.deps.Config.MaxAttempts)
		giveUp := permanent || cycleAttempt >= maxAttempts
		if giveUp {
			reason := sendErr.Error()
			if permanent {
				reason = "permanent: " + reason
			}
			if err := w.svc.deps.Store.MarkDeadLettered(ctx, tx, c.Delivery.DeliveryID, finishedAt, reason); err != nil {
				return false, uuid.Nil, err
			}
			deadLettered = true
			logger.Warn("mail delivery dead-lettered",
				zap.Int32("cycle_attempt", cycleAttempt),
				zap.Int32("attempt_no", attemptNo),
				zap.Int32("max_attempts", maxAttempts),
				zap.Bool("permanent", permanent),
				zap.String("reason", reason),
			)
		} else {
			nextAt := finishedAt.Add(nextBackoff(int(cycleAttempt)))
			if err := w.svc.deps.Store.ScheduleRetry(ctx, tx, c.Delivery.DeliveryID, finishedAt, nextAt, sendErr.Error()); err != nil {
				return false, uuid.Nil, err
			}
			logger.Info("mail delivery retry scheduled",
				zap.Int32("cycle_attempt", cycleAttempt),
				zap.Int32("attempt_no", attemptNo),
				zap.Time("next_attempt_at", nextAt),
			)
		}
	}

	if err := tx.Commit(); err != nil {
		return false, uuid.Nil, err
	}

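	// Notify the admin hook only after the dead-letter transition has
	// durably committed, so an operator alert never references a row
	// whose transaction later rolled back. OnDeadLetter returns
	// nothing, so it is presumably best-effort and non-transactional.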
	if deadLettered {
		w.svc.deps.Admin.OnDeadLetter(ctx, c.Delivery.DeliveryID, c.Delivery.TemplateID, sendErr.Error())
	}
	return true, c.Delivery.DeliveryID, nil
}

// nextBackoff returns the wait between attempt N (1-indexed) and the
// next try. The schedule grows exponentially up to backoffMax, with a
// uniform ±backoffJitter shake to prevent retry storms.
func nextBackoff(attempt int) time.Duration {
	if attempt < 1 {
		attempt = 1
	}
	scaled := float64(backoffBase) * math.Pow(backoffFactor, float64(attempt-1))
	if scaled > float64(backoffMax) {
		scaled = float64(backoffMax)
	}
	// Symmetric jitter in [-backoffJitter, +backoffJitter].
	jitter := (rand.Float64()*2 - 1) * backoffJitter
	final := scaled * (1 + jitter)
	if final < float64(backoffBase) {
		final = float64(backoffBase)
	}
	return time.Duration(final)
}

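// Ignoring jitter, the schedule is backoffBase, backoffBase*backoffFactor,
// backoffBase*backoffFactor^2, and so on, saturating at backoffMax. Jitter
// then scales each wait by a uniform factor in [1-backoffJitter,
// 1+backoffJitter], clamped below so no retry ever waits less than
// backoffBase.
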
// Compile-time check that Worker satisfies the lifecycle interface
// shape used elsewhere (Run + Shutdown).
var _ interface {
	Run(context.Context) error
	Shutdown(context.Context) error
} = (*Worker)(nil)