119 lines
3.5 KiB
Go
package notification
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"time"
|
|
|
|
"github.com/google/uuid"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// Worker drains the notification routes table: per tick it walks due
// rows under `SELECT … FOR UPDATE SKIP LOCKED`, dispatches each through
// the matching channel, and atomically updates the route status.
// Implements `internal/app.Component`.
type Worker struct {
	// svc supplies everything the loop needs: store access (claim /
	// finalise transactions), channel dispatch, config, and logging.
	svc *Service
}
|
|
|
|
// NewWorker constructs a Worker bound to svc.
|
|
func NewWorker(svc *Service) *Worker { return &Worker{svc: svc} }
|
|
|
|
// Run drives the scan loop until ctx is cancelled. The first tick is
|
|
// the startup-drain pass: rows queued before the process restart get
|
|
// retried immediately rather than waiting for the first interval.
|
|
func (w *Worker) Run(ctx context.Context) error {
|
|
if w == nil {
|
|
return nil
|
|
}
|
|
logger := w.svc.deps.Logger.Named("worker")
|
|
if err := w.tick(ctx); err != nil && !errors.Is(err, context.Canceled) {
|
|
logger.Warn("initial notification routes drain failed", zap.Error(err))
|
|
}
|
|
interval := w.svc.deps.Config.WorkerInterval
|
|
if interval <= 0 {
|
|
interval = 5 * time.Second
|
|
}
|
|
ticker := time.NewTicker(interval)
|
|
defer ticker.Stop()
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil
|
|
case <-ticker.C:
|
|
if err := w.tick(ctx); err != nil && !errors.Is(err, context.Canceled) {
|
|
logger.Warn("notification routes tick failed", zap.Error(err))
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Shutdown is a no-op: each per-row transaction is self-contained, so
|
|
// a cancelled ctx above the loop is enough to stop the worker.
|
|
func (w *Worker) Shutdown(_ context.Context) error { return nil }
|
|
|
|
// Tick is exposed for tests so they can drive the worker without
|
|
// timing dependencies.
|
|
func (w *Worker) Tick(ctx context.Context) error { return w.tick(ctx) }
|
|
|
|
// tick processes up to claimBatchSize rows. Each row is handled in its
|
|
// own transaction so a slow channel only holds one row lock at a time.
|
|
// The loop exits as soon as a tick claims zero rows or ctx is
|
|
// cancelled. Rows already handled in this tick are tracked in `seen`
|
|
// and excluded from subsequent claims so a transient retry scheduled
|
|
// with next_attempt_at <= now() does not chew through MaxAttempts in a
|
|
// single tick (mirrors the mail-worker pattern).
|
|
func (w *Worker) tick(ctx context.Context) error {
|
|
seen := make([]uuid.UUID, 0, claimBatchSize)
|
|
for range claimBatchSize {
|
|
if ctx.Err() != nil {
|
|
return ctx.Err()
|
|
}
|
|
more, processed, err := w.processOne(ctx, seen)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if !more {
|
|
return nil
|
|
}
|
|
seen = append(seen, processed)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// processOne claims a single due route, dispatches it, and commits the
|
|
// state transition. Returns more=false when no row was due.
|
|
func (w *Worker) processOne(ctx context.Context, exclude []uuid.UUID) (bool, uuid.UUID, error) {
|
|
tx, err := w.svc.deps.Store.BeginTx(ctx)
|
|
if err != nil {
|
|
return false, uuid.Nil, err
|
|
}
|
|
defer func() { _ = tx.Rollback() }()
|
|
|
|
claimed, err := w.svc.deps.Store.ClaimDueRoutes(ctx, tx, 1, exclude...)
|
|
if err != nil {
|
|
return false, uuid.Nil, err
|
|
}
|
|
if len(claimed) == 0 {
|
|
return false, uuid.Nil, nil
|
|
}
|
|
c := claimed[0]
|
|
dispatchErr := w.svc.performDispatch(ctx, c)
|
|
at := w.svc.nowUTC()
|
|
if err := w.svc.finaliseDispatch(ctx, tx, c, dispatchErr, at); err != nil {
|
|
return false, uuid.Nil, err
|
|
}
|
|
if err := tx.Commit(); err != nil {
|
|
return false, uuid.Nil, err
|
|
}
|
|
return true, c.Route.RouteID, nil
|
|
}
|
|
|
|
// Compile-time check that Worker satisfies the lifecycle interface
// shape used elsewhere (Run + Shutdown). Fails the build, rather than
// a runtime assertion, if either method signature drifts.
var _ interface {
	Run(context.Context) error
	Shutdown(context.Context) error
} = (*Worker)(nil)
|