package mail import ( "context" "database/sql" "errors" "fmt" "time" "galaxy/backend/internal/postgres/jet/backend/model" "galaxy/backend/internal/postgres/jet/backend/table" "github.com/go-jet/jet/v2/postgres" "github.com/go-jet/jet/v2/qrm" "github.com/google/uuid" ) // Store is the Postgres-backed query surface for the mail outbox // (`mail_deliveries`, `mail_recipients`, `mail_attempts`, // `mail_dead_letters`, `mail_payloads`). All queries are built through // go-jet against the generated table bindings under // `backend/internal/postgres/jet/backend/table`. type Store struct { db *sql.DB } // NewStore constructs a Store wrapping db. func NewStore(db *sql.DB) *Store { return &Store{db: db} } // Delivery mirrors a row in `backend.mail_deliveries`. Tests and // admin endpoints work against this struct directly. type Delivery struct { DeliveryID uuid.UUID TemplateID string IdempotencyKey string Status string Attempts int32 NextAttemptAt *time.Time PayloadID uuid.UUID LastError string CreatedAt time.Time UpdatedAt time.Time SentAt *time.Time DeadLetteredAt *time.Time } // Attempt mirrors a row in `backend.mail_attempts`. type Attempt struct { AttemptID uuid.UUID DeliveryID uuid.UUID AttemptNo int32 StartedAt time.Time FinishedAt *time.Time Outcome string Error string } // DeadLetter mirrors a row in `backend.mail_dead_letters`. type DeadLetter struct { DeadLetterID uuid.UUID DeliveryID uuid.UUID ArchivedAt time.Time Reason string } // Payload mirrors a row in `backend.mail_payloads`. Body is the raw // rendered bytes; Subject is nullable in the schema and is therefore a // pointer here. type Payload struct { PayloadID uuid.UUID ContentType string Subject *string Body []byte CreatedAt time.Time } // Recipient mirrors a row in `backend.mail_recipients`. type Recipient struct { RecipientID uuid.UUID DeliveryID uuid.UUID Address string Kind string } // EnqueueArgs aggregates the inputs to InsertEnqueue. Constructing the // struct by name keeps the call site readable when the Service grows // new optional fields (locale, headers, etc.). type EnqueueArgs struct { DeliveryID uuid.UUID TemplateID string IdempotencyKey string Recipients []string ContentType string Subject string Body []byte } // deliveryColumns lists the projection used by every read of // `mail_deliveries`. The order matches model.MailDeliveries field // layout for direct QRM scanning. func deliveryColumns() postgres.ColumnList { d := table.MailDeliveries return postgres.ColumnList{ d.DeliveryID, d.TemplateID, d.IdempotencyKey, d.Status, d.Attempts, d.NextAttemptAt, d.PayloadID, d.LastError, d.CreatedAt, d.UpdatedAt, d.SentAt, d.DeadLetteredAt, } } // InsertEnqueue persists a fresh delivery row together with its payload // and recipients in a single transaction. The (template_id, // idempotency_key) UNIQUE constraint handles duplicate enqueue: when // the conflict triggers, the transaction rolls back the payload insert // (so we do not leak orphaned payloads) and reports `inserted=false` // to the caller. func (s *Store) InsertEnqueue(ctx context.Context, args EnqueueArgs) (bool, error) { var inserted bool err := withTx(ctx, s.db, func(tx *sql.Tx) error { payloadID := uuid.New() payloadStmt := table.MailPayloads.INSERT( table.MailPayloads.PayloadID, table.MailPayloads.ContentType, table.MailPayloads.Subject, table.MailPayloads.Body, ).VALUES(payloadID, args.ContentType, args.Subject, args.Body) if _, err := payloadStmt.ExecContext(ctx, tx); err != nil { return fmt.Errorf("insert payload: %w", err) } deliveryStmt := table.MailDeliveries.INSERT( table.MailDeliveries.DeliveryID, table.MailDeliveries.TemplateID, table.MailDeliveries.IdempotencyKey, table.MailDeliveries.Status, table.MailDeliveries.NextAttemptAt, table.MailDeliveries.PayloadID, ).VALUES( args.DeliveryID, args.TemplateID, args.IdempotencyKey, StatusPending, postgres.NOW(), payloadID, ). ON_CONFLICT(table.MailDeliveries.TemplateID, table.MailDeliveries.IdempotencyKey). DO_NOTHING(). RETURNING(table.MailDeliveries.DeliveryID) var stored model.MailDeliveries if err := deliveryStmt.QueryContext(ctx, tx, &stored); err != nil { if errors.Is(err, qrm.ErrNoRows) { // Idempotent re-enqueue. Roll back the transaction so the // orphan payload insert does not survive. return errIdempotentNoop } return fmt.Errorf("insert delivery: %w", err) } for _, addr := range args.Recipients { recipientStmt := table.MailRecipients.INSERT( table.MailRecipients.RecipientID, table.MailRecipients.DeliveryID, table.MailRecipients.Address, table.MailRecipients.Kind, ).VALUES(uuid.New(), args.DeliveryID, addr, RecipientKindTo) if _, err := recipientStmt.ExecContext(ctx, tx); err != nil { return fmt.Errorf("insert recipient %q: %w", addr, err) } } inserted = true return nil }) if errors.Is(err, errIdempotentNoop) { return false, nil } if err != nil { return false, err } return inserted, nil } // errIdempotentNoop is an internal sentinel that tells withTx to roll // back the transaction without surfacing an error to the caller. It // must never escape this package — InsertEnqueue catches it on the // way out. var errIdempotentNoop = errors.New("mail store: idempotent noop") // ClaimDue locks up to `limit` due rows with FOR UPDATE SKIP LOCKED // and returns them with their full payload and recipient set. The // supplied tx must be the worker's per-row transaction; the caller // completes the work and commits. exclude is the list of delivery_ids // already handled in the current tick — they are filtered out so a // row whose retry lands at next_attempt_at <= now() is not re-claimed // inside the same tick loop. func (s *Store) ClaimDue(ctx context.Context, tx *sql.Tx, limit int, exclude ...uuid.UUID) ([]ClaimedDelivery, error) { d := table.MailDeliveries condition := d.Status.IN(postgres.String(StatusPending), postgres.String(StatusRetrying)). AND(d.NextAttemptAt.IS_NULL().OR(d.NextAttemptAt.LT_EQ(postgres.NOW()))) if len(exclude) > 0 { excludeExprs := make([]postgres.Expression, 0, len(exclude)) for _, id := range exclude { excludeExprs = append(excludeExprs, postgres.UUID(id)) } condition = condition.AND(d.DeliveryID.NOT_IN(excludeExprs...)) } stmt := postgres.SELECT(deliveryColumns()). FROM(d). WHERE(condition). ORDER_BY(postgres.COALESCE(d.NextAttemptAt, d.CreatedAt).ASC()). LIMIT(int64(limit)). FOR(postgres.UPDATE().SKIP_LOCKED()) var rows []model.MailDeliveries if err := stmt.QueryContext(ctx, tx, &rows); err != nil { return nil, fmt.Errorf("claim due: %w", err) } claimed := make([]ClaimedDelivery, 0, len(rows)) for _, row := range rows { delivery := modelToDelivery(row) payload, err := s.loadPayloadTx(ctx, tx, delivery.PayloadID) if err != nil { return nil, err } recipients, err := s.listRecipientsTx(ctx, tx, delivery.DeliveryID) if err != nil { return nil, err } claimed = append(claimed, ClaimedDelivery{ Delivery: delivery, Payload: payload, Recipients: recipients, }) } return claimed, nil } // ClaimedDelivery bundles a locked delivery row with its payload and // recipients so the worker has everything it needs in one structure. type ClaimedDelivery struct { Delivery Delivery Payload Payload Recipients []Recipient } // RecordAttempt inserts a row into `mail_attempts` for the given // delivery. attempt_no is derived from MAX(attempt_no) + 1 within the // transaction, which keeps the column monotonic across resend cycles // — the delivery's wire-visible `attempts` field counts only the // current cycle (and resets on resend), while `mail_attempts` stays // append-only forensic history. func (s *Store) RecordAttempt(ctx context.Context, tx *sql.Tx, deliveryID uuid.UUID, startedAt time.Time, finishedAt time.Time, outcome string, errMsg string) (int32, error) { a := table.MailAttempts // Read the current max attempt_no for this delivery first; the // surrounding worker transaction guarantees no concurrent inserts on // the same delivery_id, so a simple read-then-write is sufficient // (and avoids the awkward correlated subquery inside INSERT...VALUES // that jet does not parenthesise). maxStmt := postgres.SELECT(postgres.MAXi(a.AttemptNo).AS("max")). FROM(a). WHERE(a.DeliveryID.EQ(postgres.UUID(deliveryID))) var maxRow struct { Max *int32 `alias:"max"` } if err := maxStmt.QueryContext(ctx, tx, &maxRow); err != nil { return 0, fmt.Errorf("record attempt: read max attempt_no: %w", err) } nextNo := int32(1) if maxRow.Max != nil { nextNo = *maxRow.Max + 1 } insertStmt := a.INSERT( a.AttemptID, a.DeliveryID, a.AttemptNo, a.StartedAt, a.FinishedAt, a.Outcome, a.Error, ).VALUES( uuid.New(), deliveryID, nextNo, startedAt, finishedAt, outcome, errMsg, ).RETURNING(a.AttemptNo) var inserted model.MailAttempts if err := insertStmt.QueryContext(ctx, tx, &inserted); err != nil { return 0, fmt.Errorf("record attempt: %w", err) } return inserted.AttemptNo, nil } // MarkSent flips the delivery to status='sent' and stamps sent_at. func (s *Store) MarkSent(ctx context.Context, tx *sql.Tx, deliveryID uuid.UUID, at time.Time) error { d := table.MailDeliveries stmt := d.UPDATE(). SET( d.Status.SET(postgres.String(StatusSent)), d.Attempts.SET(d.Attempts.ADD(postgres.Int(1))), d.SentAt.SET(postgres.TimestampzT(at)), d.UpdatedAt.SET(postgres.TimestampzT(at)), d.NextAttemptAt.SET(postgres.TimestampzExp(postgres.NULL)), d.LastError.SET(postgres.String("")), ). WHERE(d.DeliveryID.EQ(postgres.UUID(deliveryID))) if _, err := stmt.ExecContext(ctx, tx); err != nil { return fmt.Errorf("mark sent: %w", err) } return nil } // ScheduleRetry flips the delivery to status='retrying', bumps // attempts, and arms next_attempt_at. func (s *Store) ScheduleRetry(ctx context.Context, tx *sql.Tx, deliveryID uuid.UUID, at time.Time, nextAt time.Time, errMsg string) error { d := table.MailDeliveries stmt := d.UPDATE(). SET( d.Status.SET(postgres.String(StatusRetrying)), d.Attempts.SET(d.Attempts.ADD(postgres.Int(1))), d.NextAttemptAt.SET(postgres.TimestampzT(nextAt)), d.UpdatedAt.SET(postgres.TimestampzT(at)), d.LastError.SET(postgres.String(errMsg)), ). WHERE(d.DeliveryID.EQ(postgres.UUID(deliveryID))) if _, err := stmt.ExecContext(ctx, tx); err != nil { return fmt.Errorf("schedule retry: %w", err) } return nil } // MarkDeadLettered moves the delivery to the terminal `dead_lettered` // state and inserts the matching row into `mail_dead_letters` under // the same transaction. func (s *Store) MarkDeadLettered(ctx context.Context, tx *sql.Tx, deliveryID uuid.UUID, at time.Time, reason string) error { d := table.MailDeliveries updateStmt := d.UPDATE(). SET( d.Status.SET(postgres.String(StatusDeadLettered)), d.Attempts.SET(d.Attempts.ADD(postgres.Int(1))), d.DeadLetteredAt.SET(postgres.TimestampzT(at)), d.UpdatedAt.SET(postgres.TimestampzT(at)), d.NextAttemptAt.SET(postgres.TimestampzExp(postgres.NULL)), d.LastError.SET(postgres.String(reason)), ). WHERE(d.DeliveryID.EQ(postgres.UUID(deliveryID))) if _, err := updateStmt.ExecContext(ctx, tx); err != nil { return fmt.Errorf("mark dead-lettered: %w", err) } dl := table.MailDeadLetters insertStmt := dl.INSERT( dl.DeadLetterID, dl.DeliveryID, dl.ArchivedAt, dl.Reason, ).VALUES(uuid.New(), deliveryID, at, reason) if _, err := insertStmt.ExecContext(ctx, tx); err != nil { return fmt.Errorf("insert dead-letter: %w", err) } return nil } // CountByStatus returns a map keyed by the four status values so the // worker can publish `mail_outbox_depth{state}` without scanning the // whole table per metric tick. func (s *Store) CountByStatus(ctx context.Context) (map[string]int64, error) { d := table.MailDeliveries stmt := postgres.SELECT( d.Status, postgres.COUNT(postgres.STAR).AS("count"), ).FROM(d).GROUP_BY(d.Status) var rows []struct { MailDeliveries model.MailDeliveries Count int64 `alias:"count"` } if err := stmt.QueryContext(ctx, s.db, &rows); err != nil { return nil, fmt.Errorf("count by status: %w", err) } out := map[string]int64{ StatusPending: 0, StatusRetrying: 0, StatusSent: 0, StatusDeadLettered: 0, } for _, row := range rows { out[row.MailDeliveries.Status] = row.Count } return out, nil } // GetDelivery loads a single row by primary key. ErrDeliveryNotFound // is returned when no row matches. func (s *Store) GetDelivery(ctx context.Context, deliveryID uuid.UUID) (Delivery, error) { stmt := postgres.SELECT(deliveryColumns()). FROM(table.MailDeliveries). WHERE(table.MailDeliveries.DeliveryID.EQ(postgres.UUID(deliveryID))). LIMIT(1) var row model.MailDeliveries if err := stmt.QueryContext(ctx, s.db, &row); err != nil { if errors.Is(err, qrm.ErrNoRows) { return Delivery{}, ErrDeliveryNotFound } return Delivery{}, fmt.Errorf("get delivery: %w", err) } return modelToDelivery(row), nil } // ListDeliveries returns the deliveries page in newest-first order // together with the total row count. func (s *Store) ListDeliveries(ctx context.Context, offset, limit int) ([]Delivery, int64, error) { total, err := countAll(ctx, s.db, table.MailDeliveries) if err != nil { return nil, 0, fmt.Errorf("count deliveries: %w", err) } d := table.MailDeliveries stmt := postgres.SELECT(deliveryColumns()). FROM(d). ORDER_BY(d.CreatedAt.DESC(), d.DeliveryID.DESC()). LIMIT(int64(limit)).OFFSET(int64(offset)) var rows []model.MailDeliveries if err := stmt.QueryContext(ctx, s.db, &rows); err != nil { return nil, 0, fmt.Errorf("list deliveries: %w", err) } out := make([]Delivery, 0, len(rows)) for _, row := range rows { out = append(out, modelToDelivery(row)) } return out, total, nil } // ListAttempts returns every attempt for the given delivery, ordered // by attempt_no. func (s *Store) ListAttempts(ctx context.Context, deliveryID uuid.UUID) ([]Attempt, error) { a := table.MailAttempts stmt := postgres.SELECT( a.AttemptID, a.DeliveryID, a.AttemptNo, a.StartedAt, a.FinishedAt, a.Outcome, a.Error, ). FROM(a). WHERE(a.DeliveryID.EQ(postgres.UUID(deliveryID))). ORDER_BY(a.AttemptNo.ASC()) var rows []model.MailAttempts if err := stmt.QueryContext(ctx, s.db, &rows); err != nil { return nil, fmt.Errorf("list attempts: %w", err) } out := make([]Attempt, 0, len(rows)) for _, row := range rows { out = append(out, modelToAttempt(row)) } return out, nil } // ListDeadLetters returns the dead-letter page newest-first. func (s *Store) ListDeadLetters(ctx context.Context, offset, limit int) ([]DeadLetter, int64, error) { total, err := countAll(ctx, s.db, table.MailDeadLetters) if err != nil { return nil, 0, fmt.Errorf("count dead-letters: %w", err) } dl := table.MailDeadLetters stmt := postgres.SELECT( dl.DeadLetterID, dl.DeliveryID, dl.ArchivedAt, dl.Reason, ). FROM(dl). ORDER_BY(dl.ArchivedAt.DESC(), dl.DeadLetterID.DESC()). LIMIT(int64(limit)).OFFSET(int64(offset)) var rows []model.MailDeadLetters if err := stmt.QueryContext(ctx, s.db, &rows); err != nil { return nil, 0, fmt.Errorf("list dead-letters: %w", err) } out := make([]DeadLetter, 0, len(rows)) for _, row := range rows { out = append(out, DeadLetter{ DeadLetterID: row.DeadLetterID, DeliveryID: row.DeliveryID, ArchivedAt: row.ArchivedAt, Reason: row.Reason, }) } return out, total, nil } // ResendNonSent re-arms the delivery for another attempt cycle. The // `status <> 'sent'` clause makes it the storage-level guard that // matches the contract: ErrResendOnSent is returned when the row is // already terminal-sent. ErrDeliveryNotFound surfaces when no row // matches. func (s *Store) ResendNonSent(ctx context.Context, deliveryID uuid.UUID, at time.Time) (Delivery, error) { var d Delivery err := withTx(ctx, s.db, func(tx *sql.Tx) error { md := table.MailDeliveries lockStmt := postgres.SELECT(md.Status). FROM(md). WHERE(md.DeliveryID.EQ(postgres.UUID(deliveryID))). FOR(postgres.UPDATE()) var locked model.MailDeliveries if err := lockStmt.QueryContext(ctx, tx, &locked); err != nil { if errors.Is(err, qrm.ErrNoRows) { return ErrDeliveryNotFound } return fmt.Errorf("lock delivery: %w", err) } if locked.Status == StatusSent { return ErrResendOnSent } updateStmt := md.UPDATE(). SET( md.Status.SET(postgres.String(StatusPending)), md.Attempts.SET(postgres.Int(0)), md.NextAttemptAt.SET(postgres.TimestampzT(at)), md.DeadLetteredAt.SET(postgres.TimestampzExp(postgres.NULL)), md.LastError.SET(postgres.String("")), md.UpdatedAt.SET(postgres.TimestampzT(at)), ). WHERE(md.DeliveryID.EQ(postgres.UUID(deliveryID))) if _, err := updateStmt.ExecContext(ctx, tx); err != nil { return fmt.Errorf("re-arm delivery: %w", err) } reloadStmt := postgres.SELECT(deliveryColumns()). FROM(md). WHERE(md.DeliveryID.EQ(postgres.UUID(deliveryID))). LIMIT(1) var refreshed model.MailDeliveries if err := reloadStmt.QueryContext(ctx, tx, &refreshed); err != nil { return fmt.Errorf("reload delivery: %w", err) } d = modelToDelivery(refreshed) return nil }) if err != nil { return Delivery{}, err } return d, nil } func (s *Store) loadPayloadTx(ctx context.Context, tx *sql.Tx, payloadID uuid.UUID) (Payload, error) { p := table.MailPayloads stmt := postgres.SELECT( p.PayloadID, p.ContentType, p.Subject, p.Body, p.CreatedAt, ).FROM(p). WHERE(p.PayloadID.EQ(postgres.UUID(payloadID))). LIMIT(1) var row model.MailPayloads if err := stmt.QueryContext(ctx, tx, &row); err != nil { return Payload{}, fmt.Errorf("load payload: %w", err) } return Payload{ PayloadID: row.PayloadID, ContentType: row.ContentType, Subject: row.Subject, Body: row.Body, CreatedAt: row.CreatedAt, }, nil } func (s *Store) listRecipientsTx(ctx context.Context, tx *sql.Tx, deliveryID uuid.UUID) ([]Recipient, error) { r := table.MailRecipients stmt := postgres.SELECT( r.RecipientID, r.DeliveryID, r.Address, r.Kind, ).FROM(r). WHERE(r.DeliveryID.EQ(postgres.UUID(deliveryID))). ORDER_BY(r.RecipientID.ASC()) var rows []model.MailRecipients if err := stmt.QueryContext(ctx, tx, &rows); err != nil { return nil, fmt.Errorf("list recipients: %w", err) } out := make([]Recipient, 0, len(rows)) for _, row := range rows { out = append(out, Recipient{ RecipientID: row.RecipientID, DeliveryID: row.DeliveryID, Address: row.Address, Kind: row.Kind, }) } return out, nil } // withTx wraps fn in a Postgres transaction. fn's return value // determines commit (nil) vs rollback (non-nil). Rollback errors are // swallowed when fn already returned an error, since the latter is // more actionable. func withTx(ctx context.Context, db *sql.DB, fn func(tx *sql.Tx) error) error { tx, err := db.BeginTx(ctx, nil) if err != nil { return fmt.Errorf("mail store: begin tx: %w", err) } if err := fn(tx); err != nil { _ = tx.Rollback() return err } if err := tx.Commit(); err != nil { return fmt.Errorf("mail store: commit tx: %w", err) } return nil } // BeginTx exposes the package-level transaction helper to the worker // so it can scope ClaimDue + RecordAttempt + Mark* under a single // commit boundary. func (s *Store) BeginTx(ctx context.Context) (*sql.Tx, error) { return s.db.BeginTx(ctx, nil) } // modelToDelivery projects a generated model row onto the public // Delivery struct. Pointer fields are copied so callers cannot mutate // the underlying scan buffer. func modelToDelivery(row model.MailDeliveries) Delivery { d := Delivery{ DeliveryID: row.DeliveryID, TemplateID: row.TemplateID, IdempotencyKey: row.IdempotencyKey, Status: row.Status, Attempts: row.Attempts, PayloadID: row.PayloadID, LastError: row.LastError, CreatedAt: row.CreatedAt, UpdatedAt: row.UpdatedAt, } if row.NextAttemptAt != nil { t := *row.NextAttemptAt d.NextAttemptAt = &t } if row.SentAt != nil { t := *row.SentAt d.SentAt = &t } if row.DeadLetteredAt != nil { t := *row.DeadLetteredAt d.DeadLetteredAt = &t } return d } // modelToAttempt projects a generated model row onto the public Attempt // struct. func modelToAttempt(row model.MailAttempts) Attempt { a := Attempt{ AttemptID: row.AttemptID, DeliveryID: row.DeliveryID, AttemptNo: row.AttemptNo, StartedAt: row.StartedAt, Outcome: row.Outcome, Error: row.Error, } if row.FinishedAt != nil { t := *row.FinishedAt a.FinishedAt = &t } return a } // countAll runs `SELECT COUNT(*) FROM ` through jet and returns // the result as int64. The destination uses an alias-tagged scalar so // QRM can map the un-prefixed alias produced by AS("count"). func countAll(ctx context.Context, db qrm.DB, tbl postgres.ReadableTable) (int64, error) { stmt := postgres.SELECT(postgres.COUNT(postgres.STAR).AS("count")).FROM(tbl) var dest struct { Count int64 `alias:"count"` } if err := stmt.QueryContext(ctx, db, &dest); err != nil { return 0, err } return dest.Count, nil }