Files
galaxy-game/backend/internal/mail/store.go
T
2026-05-06 10:14:55 +03:00

666 lines
21 KiB
Go

package mail
import (
"context"
"database/sql"
"errors"
"fmt"
"time"
"galaxy/backend/internal/postgres/jet/backend/model"
"galaxy/backend/internal/postgres/jet/backend/table"
"github.com/go-jet/jet/v2/postgres"
"github.com/go-jet/jet/v2/qrm"
"github.com/google/uuid"
)
// Store is the Postgres-backed query surface for the mail outbox
// (`mail_deliveries`, `mail_recipients`, `mail_attempts`,
// `mail_dead_letters`, `mail_payloads`). All queries are built through
// go-jet against the generated table bindings under
// `backend/internal/postgres/jet/backend/table`.
//
// Methods that accept a *sql.Tx execute inside the caller's
// transaction; the remaining methods either query db directly or open
// their own transaction through withTx.
type Store struct {
// db is the database handle used for the non-transactional reads and
// as the source of new transactions (see BeginTx and withTx).
db *sql.DB
}
// NewStore returns a Store whose queries run against db. The handle is
// stored as-is; no connection is opened or verified here.
func NewStore(db *sql.DB) *Store {
	store := new(Store)
	store.db = db
	return store
}
// Delivery mirrors a row in `backend.mail_deliveries`. Tests and
// admin endpoints work against this struct directly. Pointer fields
// are nil when the corresponding column is NULL.
type Delivery struct {
// DeliveryID is the key every by-id lookup and update filters on.
DeliveryID uuid.UUID
// TemplateID and IdempotencyKey together form the conflict target
// InsertEnqueue uses to detect a duplicate enqueue.
TemplateID string
IdempotencyKey string
// Status holds one of the Status* values written by this file
// (StatusPending, StatusRetrying, StatusSent, StatusDeadLettered).
Status string
// Attempts is bumped by MarkSent, ScheduleRetry and MarkDeadLettered
// and reset to 0 by ResendNonSent.
Attempts int32
// NextAttemptAt is when the row becomes claimable; ClaimDue treats
// NULL as due. MarkSent and MarkDeadLettered clear it.
NextAttemptAt *time.Time
// PayloadID identifies the `mail_payloads` row loaded by ClaimDue.
PayloadID uuid.UUID
// LastError is written by ScheduleRetry and MarkDeadLettered and
// emptied by MarkSent and ResendNonSent.
LastError string
// CreatedAt is not written by this file — presumably a column
// default; confirm against the schema.
CreatedAt time.Time
// UpdatedAt is stamped by every state-changing update in this file.
UpdatedAt time.Time
// SentAt is stamped by MarkSent.
SentAt *time.Time
// DeadLetteredAt is stamped by MarkDeadLettered and cleared by
// ResendNonSent.
DeadLetteredAt *time.Time
}
// Attempt mirrors a row in `backend.mail_attempts`. Rows are written
// by RecordAttempt and read back by ListAttempts.
type Attempt struct {
AttemptID uuid.UUID
// DeliveryID is the delivery this attempt belongs to.
DeliveryID uuid.UUID
// AttemptNo is assigned by RecordAttempt as the previous maximum for
// the delivery plus one (1 for the first attempt).
AttemptNo int32
StartedAt time.Time
// FinishedAt is nil when the column is NULL.
FinishedAt *time.Time
// Outcome and Error are stored verbatim from the RecordAttempt
// arguments.
Outcome string
Error string
}
// DeadLetter mirrors a row in `backend.mail_dead_letters`. Rows are
// inserted by MarkDeadLettered in the same transaction that flips the
// delivery's status.
type DeadLetter struct {
DeadLetterID uuid.UUID
// DeliveryID is the delivery that was dead-lettered.
DeliveryID uuid.UUID
// ArchivedAt and Reason carry the `at` and `reason` arguments passed
// to MarkDeadLettered.
ArchivedAt time.Time
Reason string
}
// Payload mirrors a row in `backend.mail_payloads`. Body is the raw
// rendered bytes; Subject is nullable in the schema and is therefore a
// pointer here.
type Payload struct {
// PayloadID is generated by InsertEnqueue and referenced from the
// delivery row.
PayloadID uuid.UUID
ContentType string
// Subject is nil when the column is NULL.
Subject *string
Body []byte
// CreatedAt is not written by this file — presumably a column
// default; confirm against the schema.
CreatedAt time.Time
}
// Recipient mirrors a row in `backend.mail_recipients`. InsertEnqueue
// writes one row per address of the enqueued delivery.
type Recipient struct {
RecipientID uuid.UUID
// DeliveryID is the delivery this recipient belongs to.
DeliveryID uuid.UUID
Address string
// Kind is the recipient role; InsertEnqueue always writes
// RecipientKindTo.
Kind string
}
// EnqueueArgs aggregates the inputs to InsertEnqueue. Constructing the
// struct by name keeps the call site readable when the Service grows
// new optional fields (locale, headers, etc.).
type EnqueueArgs struct {
// DeliveryID is the caller-chosen id of the new delivery row.
DeliveryID uuid.UUID
// TemplateID and IdempotencyKey form the pair on which a duplicate
// enqueue is detected.
TemplateID string
IdempotencyKey string
// Recipients lists the addresses; each one is stored with kind
// RecipientKindTo.
Recipients []string
// ContentType, Subject and Body are stored in the payload row.
ContentType string
Subject string
Body []byte
}
// deliveryColumns returns the column projection shared by every read
// of `mail_deliveries`, so that each query scans into
// model.MailDeliveries with the same set of columns in the same order.
func deliveryColumns() postgres.ColumnList {
	t := table.MailDeliveries
	cols := postgres.ColumnList{
		t.DeliveryID,
		t.TemplateID,
		t.IdempotencyKey,
		t.Status,
		t.Attempts,
		t.NextAttemptAt,
		t.PayloadID,
		t.LastError,
		t.CreatedAt,
		t.UpdatedAt,
		t.SentAt,
		t.DeadLetteredAt,
	}
	return cols
}
// InsertEnqueue stores a new delivery together with its payload and
// one recipient row per address, all inside a single transaction.
//
// Duplicate enqueues are detected through the (template_id,
// idempotency_key) conflict target: when the delivery insert hits the
// conflict it returns no row, the transaction is rolled back (so the
// payload inserted a moment earlier does not linger as an orphan) and
// the call reports (false, nil).
//
// Returns (true, nil) when the delivery was created, (false, nil) for
// an idempotent duplicate, and (false, err) on any database failure.
func (s *Store) InsertEnqueue(ctx context.Context, args EnqueueArgs) (bool, error) {
	enqueue := func(tx *sql.Tx) error {
		// 1. Payload first: the delivery row references it.
		payloads := table.MailPayloads
		newPayloadID := uuid.New()
		insertPayload := payloads.
			INSERT(payloads.PayloadID, payloads.ContentType, payloads.Subject, payloads.Body).
			VALUES(newPayloadID, args.ContentType, args.Subject, args.Body)
		if _, err := insertPayload.ExecContext(ctx, tx); err != nil {
			return fmt.Errorf("insert payload: %w", err)
		}

		// 2. Delivery row, armed for immediate pickup. DO NOTHING plus
		// RETURNING yields zero rows when the idempotency pair exists.
		deliveries := table.MailDeliveries
		insertDelivery := deliveries.
			INSERT(
				deliveries.DeliveryID,
				deliveries.TemplateID,
				deliveries.IdempotencyKey,
				deliveries.Status,
				deliveries.NextAttemptAt,
				deliveries.PayloadID,
			).
			VALUES(
				args.DeliveryID, args.TemplateID, args.IdempotencyKey, StatusPending,
				postgres.NOW(), newPayloadID,
			).
			ON_CONFLICT(deliveries.TemplateID, deliveries.IdempotencyKey).
			DO_NOTHING().
			RETURNING(deliveries.DeliveryID)
		var returned model.MailDeliveries
		err := insertDelivery.QueryContext(ctx, tx, &returned)
		switch {
		case errors.Is(err, qrm.ErrNoRows):
			// Duplicate: abort the transaction via the sentinel so the
			// payload insert above is undone.
			return errIdempotentNoop
		case err != nil:
			return fmt.Errorf("insert delivery: %w", err)
		}

		// 3. One recipient row per address.
		recipients := table.MailRecipients
		for _, addr := range args.Recipients {
			insertRecipient := recipients.
				INSERT(
					recipients.RecipientID,
					recipients.DeliveryID,
					recipients.Address,
					recipients.Kind,
				).
				VALUES(uuid.New(), args.DeliveryID, addr, RecipientKindTo)
			if _, err := insertRecipient.ExecContext(ctx, tx); err != nil {
				return fmt.Errorf("insert recipient %q: %w", addr, err)
			}
		}
		return nil
	}

	err := withTx(ctx, s.db, enqueue)
	switch {
	case errors.Is(err, errIdempotentNoop):
		return false, nil
	case err != nil:
		return false, err
	}
	return true, nil
}
// errIdempotentNoop is an internal sentinel returned from the
// InsertEnqueue transaction callback when the delivery insert hit the
// idempotency conflict. Returning a non-nil error makes withTx roll
// the transaction back; InsertEnqueue then recognises the sentinel
// with errors.Is and turns it into (false, nil), so it must never
// escape this package.
var errIdempotentNoop = errors.New("mail store: idempotent noop")
// ClaimDue selects up to `limit` deliveries that are pending or
// retrying and whose next_attempt_at is NULL or not in the future,
// locking them with FOR UPDATE SKIP LOCKED, and returns each one with
// its payload and recipients attached.
//
// All queries run on tx, which the caller owns and must commit or
// roll back. Rows are ordered by COALESCE(next_attempt_at, created_at)
// ascending. Any ids passed in exclude are filtered out, which lets
// the worker skip deliveries it already handled during the current
// tick even if their retry time is already due again.
func (s *Store) ClaimDue(ctx context.Context, tx *sql.Tx, limit int, exclude ...uuid.UUID) ([]ClaimedDelivery, error) {
	deliveries := table.MailDeliveries

	claimable := deliveries.Status.IN(postgres.String(StatusPending), postgres.String(StatusRetrying))
	due := deliveries.NextAttemptAt.IS_NULL().OR(deliveries.NextAttemptAt.LT_EQ(postgres.NOW()))
	where := claimable.AND(due)
	if len(exclude) > 0 {
		skipped := make([]postgres.Expression, len(exclude))
		for i := range exclude {
			skipped[i] = postgres.UUID(exclude[i])
		}
		where = where.AND(deliveries.DeliveryID.NOT_IN(skipped...))
	}

	query := postgres.SELECT(deliveryColumns()).
		FROM(deliveries).
		WHERE(where).
		ORDER_BY(postgres.COALESCE(deliveries.NextAttemptAt, deliveries.CreatedAt).ASC()).
		LIMIT(int64(limit)).
		FOR(postgres.UPDATE().SKIP_LOCKED())

	var locked []model.MailDeliveries
	if err := query.QueryContext(ctx, tx, &locked); err != nil {
		return nil, fmt.Errorf("claim due: %w", err)
	}

	result := make([]ClaimedDelivery, 0, len(locked))
	for i := range locked {
		item := ClaimedDelivery{Delivery: modelToDelivery(locked[i])}

		var err error
		if item.Payload, err = s.loadPayloadTx(ctx, tx, item.Delivery.PayloadID); err != nil {
			return nil, err
		}
		if item.Recipients, err = s.listRecipientsTx(ctx, tx, item.Delivery.DeliveryID); err != nil {
			return nil, err
		}
		result = append(result, item)
	}
	return result, nil
}
// ClaimedDelivery bundles a locked delivery row with its payload and
// recipients so the worker has everything it needs in one structure.
// Values are produced by ClaimDue.
type ClaimedDelivery struct {
// Delivery is the row selected under FOR UPDATE SKIP LOCKED.
Delivery Delivery
// Payload is the `mail_payloads` row referenced by Delivery.PayloadID.
Payload Payload
// Recipients are the delivery's recipient rows, ordered by
// recipient_id.
Recipients []Recipient
}
// RecordAttempt appends one row to `mail_attempts` for deliveryID and
// returns the attempt_no it was given.
//
// attempt_no is the delivery's current MAX(attempt_no) plus one, or 1
// when no attempt exists yet. Because it is derived from the attempts
// table rather than from the delivery's `attempts` counter, it keeps
// increasing across resend cycles even though ResendNonSent resets
// that counter — `mail_attempts` stays an append-only history.
//
// The number is computed with a read followed by a write on tx. That
// is only safe because the caller's transaction is expected to be the
// sole writer of attempts for this delivery while it runs.
func (s *Store) RecordAttempt(ctx context.Context, tx *sql.Tx, deliveryID uuid.UUID, startedAt time.Time, finishedAt time.Time, outcome string, errMsg string) (int32, error) {
	attempts := table.MailAttempts

	// Step 1: highest attempt_no so far (NULL when there is none). A
	// separate SELECT is used instead of a subquery inside VALUES,
	// which jet does not parenthesise.
	var latest struct {
		Max *int32 `alias:"max"`
	}
	latestQuery := postgres.SELECT(postgres.MAXi(attempts.AttemptNo).AS("max")).
		FROM(attempts).
		WHERE(attempts.DeliveryID.EQ(postgres.UUID(deliveryID)))
	if err := latestQuery.QueryContext(ctx, tx, &latest); err != nil {
		return 0, fmt.Errorf("record attempt: read max attempt_no: %w", err)
	}

	next := int32(1)
	if prev := latest.Max; prev != nil {
		next = *prev + 1
	}

	// Step 2: insert the attempt and read back the stored number.
	insert := attempts.
		INSERT(
			attempts.AttemptID, attempts.DeliveryID, attempts.AttemptNo,
			attempts.StartedAt, attempts.FinishedAt, attempts.Outcome, attempts.Error,
		).
		VALUES(
			uuid.New(), deliveryID, next,
			startedAt, finishedAt, outcome, errMsg,
		).
		RETURNING(attempts.AttemptNo)
	var stored model.MailAttempts
	if err := insert.QueryContext(ctx, tx, &stored); err != nil {
		return 0, fmt.Errorf("record attempt: %w", err)
	}
	return stored.AttemptNo, nil
}
// MarkSent records a successful send on tx: status becomes
// StatusSent, attempts is incremented, sent_at and updated_at are set
// to at, next_attempt_at is cleared and last_error is emptied. The
// number of affected rows is not checked.
func (s *Store) MarkSent(ctx context.Context, tx *sql.Tx, deliveryID uuid.UUID, at time.Time) error {
	deliveries := table.MailDeliveries
	target := deliveries.DeliveryID.EQ(postgres.UUID(deliveryID))

	update := deliveries.UPDATE().
		SET(
			deliveries.Status.SET(postgres.String(StatusSent)),
			deliveries.Attempts.SET(deliveries.Attempts.ADD(postgres.Int(1))),
			deliveries.SentAt.SET(postgres.TimestampzT(at)),
			deliveries.UpdatedAt.SET(postgres.TimestampzT(at)),
			deliveries.NextAttemptAt.SET(postgres.TimestampzExp(postgres.NULL)),
			deliveries.LastError.SET(postgres.String("")),
		).
		WHERE(target)

	_, err := update.ExecContext(ctx, tx)
	if err != nil {
		return fmt.Errorf("mark sent: %w", err)
	}
	return nil
}
// ScheduleRetry records a failed attempt that will be retried: on tx
// it sets status to StatusRetrying, increments attempts, arms
// next_attempt_at with nextAt, stamps updated_at with at and stores
// errMsg in last_error. The number of affected rows is not checked.
func (s *Store) ScheduleRetry(ctx context.Context, tx *sql.Tx, deliveryID uuid.UUID, at time.Time, nextAt time.Time, errMsg string) error {
	deliveries := table.MailDeliveries
	target := deliveries.DeliveryID.EQ(postgres.UUID(deliveryID))

	update := deliveries.UPDATE().
		SET(
			deliveries.Status.SET(postgres.String(StatusRetrying)),
			deliveries.Attempts.SET(deliveries.Attempts.ADD(postgres.Int(1))),
			deliveries.NextAttemptAt.SET(postgres.TimestampzT(nextAt)),
			deliveries.UpdatedAt.SET(postgres.TimestampzT(at)),
			deliveries.LastError.SET(postgres.String(errMsg)),
		).
		WHERE(target)

	_, err := update.ExecContext(ctx, tx)
	if err != nil {
		return fmt.Errorf("schedule retry: %w", err)
	}
	return nil
}
// MarkDeadLettered gives up on a delivery. On tx it first updates the
// delivery (status StatusDeadLettered, attempts incremented,
// dead_lettered_at and updated_at set to at, next_attempt_at cleared,
// last_error set to reason) and then inserts a `mail_dead_letters`
// row carrying the same timestamp and reason, so both changes commit
// or roll back together with the caller's transaction.
func (s *Store) MarkDeadLettered(ctx context.Context, tx *sql.Tx, deliveryID uuid.UUID, at time.Time, reason string) error {
	deliveries := table.MailDeliveries
	flip := deliveries.UPDATE().
		SET(
			deliveries.Status.SET(postgres.String(StatusDeadLettered)),
			deliveries.Attempts.SET(deliveries.Attempts.ADD(postgres.Int(1))),
			deliveries.DeadLetteredAt.SET(postgres.TimestampzT(at)),
			deliveries.UpdatedAt.SET(postgres.TimestampzT(at)),
			deliveries.NextAttemptAt.SET(postgres.TimestampzExp(postgres.NULL)),
			deliveries.LastError.SET(postgres.String(reason)),
		).
		WHERE(deliveries.DeliveryID.EQ(postgres.UUID(deliveryID)))
	_, err := flip.ExecContext(ctx, tx)
	if err != nil {
		return fmt.Errorf("mark dead-lettered: %w", err)
	}

	deadLetters := table.MailDeadLetters
	archive := deadLetters.
		INSERT(
			deadLetters.DeadLetterID,
			deadLetters.DeliveryID,
			deadLetters.ArchivedAt,
			deadLetters.Reason,
		).
		VALUES(uuid.New(), deliveryID, at, reason)
	_, err = archive.ExecContext(ctx, tx)
	if err != nil {
		return fmt.Errorf("insert dead-letter: %w", err)
	}
	return nil
}
// CountByStatus returns the number of deliveries per status using a
// single GROUP BY query, so the worker can publish
// `mail_outbox_depth{state}` from one round trip.
//
// The four known statuses (pending, retrying, sent, dead-lettered)
// are always present in the result, with 0 when no row has that
// status. Any other status value found in the table is reported under
// its own key as well.
func (s *Store) CountByStatus(ctx context.Context) (map[string]int64, error) {
	deliveries := table.MailDeliveries

	var groups []struct {
		MailDeliveries model.MailDeliveries
		Count          int64 `alias:"count"`
	}
	query := postgres.
		SELECT(deliveries.Status, postgres.COUNT(postgres.STAR).AS("count")).
		FROM(deliveries).
		GROUP_BY(deliveries.Status)
	if err := query.QueryContext(ctx, s.db, &groups); err != nil {
		return nil, fmt.Errorf("count by status: %w", err)
	}

	// Seed every known status so absent groups still show up as zero.
	known := []string{StatusPending, StatusRetrying, StatusSent, StatusDeadLettered}
	counts := make(map[string]int64, len(known))
	for _, status := range known {
		counts[status] = 0
	}
	for i := range groups {
		counts[groups[i].MailDeliveries.Status] = groups[i].Count
	}
	return counts, nil
}
// GetDelivery fetches the delivery whose delivery_id equals
// deliveryID. It returns ErrDeliveryNotFound when the query yields no
// row and a wrapped error for any other failure.
func (s *Store) GetDelivery(ctx context.Context, deliveryID uuid.UUID) (Delivery, error) {
	deliveries := table.MailDeliveries
	lookup := postgres.SELECT(deliveryColumns()).
		FROM(deliveries).
		WHERE(deliveries.DeliveryID.EQ(postgres.UUID(deliveryID))).
		LIMIT(1)

	var found model.MailDeliveries
	err := lookup.QueryContext(ctx, s.db, &found)
	switch {
	case err == nil:
		return modelToDelivery(found), nil
	case errors.Is(err, qrm.ErrNoRows):
		return Delivery{}, ErrDeliveryNotFound
	default:
		return Delivery{}, fmt.Errorf("get delivery: %w", err)
	}
}
// ListDeliveries returns one page of deliveries, newest first
// (created_at descending, delivery_id descending as tie-breaker),
// along with the total number of rows in the table. The count and the
// page are fetched by two separate queries outside a transaction.
func (s *Store) ListDeliveries(ctx context.Context, offset, limit int) ([]Delivery, int64, error) {
	deliveries := table.MailDeliveries

	total, err := countAll(ctx, s.db, deliveries)
	if err != nil {
		return nil, 0, fmt.Errorf("count deliveries: %w", err)
	}

	page := postgres.SELECT(deliveryColumns()).
		FROM(deliveries).
		ORDER_BY(deliveries.CreatedAt.DESC(), deliveries.DeliveryID.DESC()).
		LIMIT(int64(limit)).
		OFFSET(int64(offset))
	var rows []model.MailDeliveries
	if err = page.QueryContext(ctx, s.db, &rows); err != nil {
		return nil, 0, fmt.Errorf("list deliveries: %w", err)
	}

	result := make([]Delivery, len(rows))
	for i := range rows {
		result[i] = modelToDelivery(rows[i])
	}
	return result, total, nil
}
// ListAttempts returns all recorded attempts of the delivery
// identified by deliveryID in ascending attempt_no order. The result
// is an empty slice when the delivery has no attempts.
func (s *Store) ListAttempts(ctx context.Context, deliveryID uuid.UUID) ([]Attempt, error) {
	attempts := table.MailAttempts
	query := postgres.
		SELECT(
			attempts.AttemptID,
			attempts.DeliveryID,
			attempts.AttemptNo,
			attempts.StartedAt,
			attempts.FinishedAt,
			attempts.Outcome,
			attempts.Error,
		).
		FROM(attempts).
		WHERE(attempts.DeliveryID.EQ(postgres.UUID(deliveryID))).
		ORDER_BY(attempts.AttemptNo.ASC())

	var rows []model.MailAttempts
	if err := query.QueryContext(ctx, s.db, &rows); err != nil {
		return nil, fmt.Errorf("list attempts: %w", err)
	}

	result := make([]Attempt, len(rows))
	for i := range rows {
		result[i] = modelToAttempt(rows[i])
	}
	return result, nil
}
// ListDeadLetters returns one page of dead-letter rows, newest first
// (archived_at descending, dead_letter_id descending as tie-breaker),
// along with the total number of rows in the table. The count and the
// page are fetched by two separate queries outside a transaction.
func (s *Store) ListDeadLetters(ctx context.Context, offset, limit int) ([]DeadLetter, int64, error) {
	deadLetters := table.MailDeadLetters

	total, err := countAll(ctx, s.db, deadLetters)
	if err != nil {
		return nil, 0, fmt.Errorf("count dead-letters: %w", err)
	}

	page := postgres.
		SELECT(
			deadLetters.DeadLetterID,
			deadLetters.DeliveryID,
			deadLetters.ArchivedAt,
			deadLetters.Reason,
		).
		FROM(deadLetters).
		ORDER_BY(deadLetters.ArchivedAt.DESC(), deadLetters.DeadLetterID.DESC()).
		LIMIT(int64(limit)).
		OFFSET(int64(offset))
	var rows []model.MailDeadLetters
	if err = page.QueryContext(ctx, s.db, &rows); err != nil {
		return nil, 0, fmt.Errorf("list dead-letters: %w", err)
	}

	result := make([]DeadLetter, len(rows))
	for i := range rows {
		var entry DeadLetter
		entry.DeadLetterID = rows[i].DeadLetterID
		entry.DeliveryID = rows[i].DeliveryID
		entry.ArchivedAt = rows[i].ArchivedAt
		entry.Reason = rows[i].Reason
		result[i] = entry
	}
	return result, total, nil
}
// ResendNonSent puts a delivery back into the queue for a fresh
// attempt cycle and returns the row as it looks afterwards.
//
// Inside one transaction it locks the row with SELECT ... FOR UPDATE,
// refuses to continue when the status is already StatusSent
// (ErrResendOnSent) or when no row exists (ErrDeliveryNotFound), then
// resets the delivery: status StatusPending, attempts 0,
// next_attempt_at and updated_at set to at, dead_lettered_at cleared
// and last_error emptied. The row is re-read in the same transaction
// to build the returned Delivery.
func (s *Store) ResendNonSent(ctx context.Context, deliveryID uuid.UUID, at time.Time) (Delivery, error) {
	deliveries := table.MailDeliveries
	// byID builds a fresh predicate for each statement.
	byID := func() postgres.BoolExpression {
		return deliveries.DeliveryID.EQ(postgres.UUID(deliveryID))
	}

	var result Delivery
	rearm := func(tx *sql.Tx) error {
		// Lock the row first so the status check and the update see
		// the same state.
		var current model.MailDeliveries
		lock := postgres.SELECT(deliveries.Status).
			FROM(deliveries).
			WHERE(byID()).
			FOR(postgres.UPDATE())
		switch err := lock.QueryContext(ctx, tx, &current); {
		case errors.Is(err, qrm.ErrNoRows):
			return ErrDeliveryNotFound
		case err != nil:
			return fmt.Errorf("lock delivery: %w", err)
		}
		if current.Status == StatusSent {
			return ErrResendOnSent
		}

		reset := deliveries.UPDATE().
			SET(
				deliveries.Status.SET(postgres.String(StatusPending)),
				deliveries.Attempts.SET(postgres.Int(0)),
				deliveries.NextAttemptAt.SET(postgres.TimestampzT(at)),
				deliveries.DeadLetteredAt.SET(postgres.TimestampzExp(postgres.NULL)),
				deliveries.LastError.SET(postgres.String("")),
				deliveries.UpdatedAt.SET(postgres.TimestampzT(at)),
			).
			WHERE(byID())
		if _, err := reset.ExecContext(ctx, tx); err != nil {
			return fmt.Errorf("re-arm delivery: %w", err)
		}

		var fresh model.MailDeliveries
		reload := postgres.SELECT(deliveryColumns()).
			FROM(deliveries).
			WHERE(byID()).
			LIMIT(1)
		if err := reload.QueryContext(ctx, tx, &fresh); err != nil {
			return fmt.Errorf("reload delivery: %w", err)
		}
		result = modelToDelivery(fresh)
		return nil
	}

	if err := withTx(ctx, s.db, rearm); err != nil {
		return Delivery{}, err
	}
	return result, nil
}
// loadPayloadTx reads the `mail_payloads` row with the given id on tx
// and converts it to a Payload. Every failure, including a missing
// row, is returned wrapped with the "load payload" prefix.
func (s *Store) loadPayloadTx(ctx context.Context, tx *sql.Tx, payloadID uuid.UUID) (Payload, error) {
	payloads := table.MailPayloads
	query := postgres.
		SELECT(
			payloads.PayloadID,
			payloads.ContentType,
			payloads.Subject,
			payloads.Body,
			payloads.CreatedAt,
		).
		FROM(payloads).
		WHERE(payloads.PayloadID.EQ(postgres.UUID(payloadID))).
		LIMIT(1)

	var stored model.MailPayloads
	if err := query.QueryContext(ctx, tx, &stored); err != nil {
		return Payload{}, fmt.Errorf("load payload: %w", err)
	}

	var out Payload
	out.PayloadID = stored.PayloadID
	out.ContentType = stored.ContentType
	out.Subject = stored.Subject
	out.Body = stored.Body
	out.CreatedAt = stored.CreatedAt
	return out, nil
}
// listRecipientsTx reads, on tx, every `mail_recipients` row that
// belongs to deliveryID, ordered by recipient_id ascending. The result
// is an empty slice when the delivery has no recipients.
func (s *Store) listRecipientsTx(ctx context.Context, tx *sql.Tx, deliveryID uuid.UUID) ([]Recipient, error) {
	recipients := table.MailRecipients
	query := postgres.
		SELECT(
			recipients.RecipientID,
			recipients.DeliveryID,
			recipients.Address,
			recipients.Kind,
		).
		FROM(recipients).
		WHERE(recipients.DeliveryID.EQ(postgres.UUID(deliveryID))).
		ORDER_BY(recipients.RecipientID.ASC())

	var rows []model.MailRecipients
	if err := query.QueryContext(ctx, tx, &rows); err != nil {
		return nil, fmt.Errorf("list recipients: %w", err)
	}

	result := make([]Recipient, len(rows))
	for i := range rows {
		var entry Recipient
		entry.RecipientID = rows[i].RecipientID
		entry.DeliveryID = rows[i].DeliveryID
		entry.Address = rows[i].Address
		entry.Kind = rows[i].Kind
		result[i] = entry
	}
	return result, nil
}
// withTx runs fn inside a transaction opened on db with default
// options.
//
// The transaction is committed when fn returns nil and rolled back
// when fn returns an error; in the latter case fn's error is returned
// unchanged and the rollback error is discarded, since fn's error is
// the more actionable one. Failures to begin or commit are returned
// wrapped with a "mail store:" prefix.
//
// The rollback is deferred, so it also runs when fn panics: the
// transaction is released and the panic then continues to propagate
// to the caller.
func withTx(ctx context.Context, db *sql.DB, fn func(tx *sql.Tx) error) error {
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return fmt.Errorf("mail store: begin tx: %w", err)
	}

	// finished is set just before Commit; until then any exit from
	// this function (error return or panic) must undo the transaction.
	finished := false
	defer func() {
		if !finished {
			// Best-effort: the caller already gets fn's error or panic.
			_ = tx.Rollback()
		}
	}()

	if err := fn(tx); err != nil {
		return err
	}

	finished = true
	if err := tx.Commit(); err != nil {
		return fmt.Errorf("mail store: commit tx: %w", err)
	}
	return nil
}
// BeginTx opens a transaction with default options on the Store's
// database handle so the worker can scope ClaimDue + RecordAttempt +
// Mark* under a single commit boundary. The caller owns the returned
// transaction and is responsible for calling Commit or Rollback.
//
// A failure to begin is wrapped with the same "mail store: begin tx"
// prefix withTx uses; the underlying error stays reachable through
// errors.Is / errors.As.
func (s *Store) BeginTx(ctx context.Context) (*sql.Tx, error) {
	tx, err := s.db.BeginTx(ctx, nil)
	if err != nil {
		return nil, fmt.Errorf("mail store: begin tx: %w", err)
	}
	return tx, nil
}
// modelToDelivery converts a generated model row into the public
// Delivery struct. The nullable timestamps are deep-copied, so the
// returned value shares no pointers with row; a nil pointer stays nil.
func modelToDelivery(row model.MailDeliveries) Delivery {
	// clone returns a pointer to a private copy of *src, or nil.
	clone := func(src *time.Time) *time.Time {
		if src == nil {
			return nil
		}
		v := *src
		return &v
	}

	return Delivery{
		DeliveryID:     row.DeliveryID,
		TemplateID:     row.TemplateID,
		IdempotencyKey: row.IdempotencyKey,
		Status:         row.Status,
		Attempts:       row.Attempts,
		NextAttemptAt:  clone(row.NextAttemptAt),
		PayloadID:      row.PayloadID,
		LastError:      row.LastError,
		CreatedAt:      row.CreatedAt,
		UpdatedAt:      row.UpdatedAt,
		SentAt:         clone(row.SentAt),
		DeadLetteredAt: clone(row.DeadLetteredAt),
	}
}
// modelToAttempt converts a generated model row into the public
// Attempt struct. FinishedAt is deep-copied when set and left nil
// otherwise.
func modelToAttempt(row model.MailAttempts) Attempt {
	var finished *time.Time
	if src := row.FinishedAt; src != nil {
		v := *src
		finished = &v
	}

	return Attempt{
		AttemptID:  row.AttemptID,
		DeliveryID: row.DeliveryID,
		AttemptNo:  row.AttemptNo,
		StartedAt:  row.StartedAt,
		FinishedAt: finished,
		Outcome:    row.Outcome,
		Error:      row.Error,
	}
}
// countAll executes `SELECT COUNT(*) FROM <tbl>` through jet on db and
// returns the count. The scan target is an anonymous struct with an
// alias tag so QRM can match the un-prefixed "count" column alias.
// Query errors are returned as-is; callers add their own context.
func countAll(ctx context.Context, db qrm.DB, tbl postgres.ReadableTable) (int64, error) {
	var result struct {
		Count int64 `alias:"count"`
	}

	total := postgres.COUNT(postgres.STAR).AS("count")
	err := postgres.SELECT(total).FROM(tbl).QueryContext(ctx, db, &result)
	if err != nil {
		return 0, err
	}
	return result.Count, nil
}