666 lines
21 KiB
Go
666 lines
21 KiB
Go
package mail
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"errors"
|
|
"fmt"
|
|
"time"
|
|
|
|
"galaxy/backend/internal/postgres/jet/backend/model"
|
|
"galaxy/backend/internal/postgres/jet/backend/table"
|
|
|
|
"github.com/go-jet/jet/v2/postgres"
|
|
"github.com/go-jet/jet/v2/qrm"
|
|
"github.com/google/uuid"
|
|
)
|
|
|
|
// Store provides the Postgres query surface for the mail outbox tables
// (`mail_deliveries`, `mail_recipients`, `mail_attempts`,
// `mail_dead_letters`, `mail_payloads`). Statements are assembled with
// go-jet from the generated table bindings in
// `backend/internal/postgres/jet/backend/table`.
type Store struct {
	// db is the database handle that the Store's queries and
	// transactions run against.
	db *sql.DB
}
|
|
|
|
// NewStore constructs a Store wrapping db.
|
|
func NewStore(db *sql.DB) *Store {
|
|
return &Store{db: db}
|
|
}
|
|
|
|
// Delivery mirrors a row in `backend.mail_deliveries`. Tests and
|
|
// admin endpoints work against this struct directly.
|
|
type Delivery struct {
|
|
DeliveryID uuid.UUID
|
|
TemplateID string
|
|
IdempotencyKey string
|
|
Status string
|
|
Attempts int32
|
|
NextAttemptAt *time.Time
|
|
PayloadID uuid.UUID
|
|
LastError string
|
|
CreatedAt time.Time
|
|
UpdatedAt time.Time
|
|
SentAt *time.Time
|
|
DeadLetteredAt *time.Time
|
|
}
|
|
|
|
// Attempt mirrors a row in `backend.mail_attempts`.
|
|
type Attempt struct {
|
|
AttemptID uuid.UUID
|
|
DeliveryID uuid.UUID
|
|
AttemptNo int32
|
|
StartedAt time.Time
|
|
FinishedAt *time.Time
|
|
Outcome string
|
|
Error string
|
|
}
|
|
|
|
// DeadLetter mirrors a row in `backend.mail_dead_letters`.
|
|
type DeadLetter struct {
|
|
DeadLetterID uuid.UUID
|
|
DeliveryID uuid.UUID
|
|
ArchivedAt time.Time
|
|
Reason string
|
|
}
|
|
|
|
// Payload mirrors a row in `backend.mail_payloads`. Body is the raw
|
|
// rendered bytes; Subject is nullable in the schema and is therefore a
|
|
// pointer here.
|
|
type Payload struct {
|
|
PayloadID uuid.UUID
|
|
ContentType string
|
|
Subject *string
|
|
Body []byte
|
|
CreatedAt time.Time
|
|
}
|
|
|
|
// Recipient mirrors a row in `backend.mail_recipients`.
|
|
type Recipient struct {
|
|
RecipientID uuid.UUID
|
|
DeliveryID uuid.UUID
|
|
Address string
|
|
Kind string
|
|
}
|
|
|
|
// EnqueueArgs aggregates the inputs to InsertEnqueue. Constructing the
|
|
// struct by name keeps the call site readable when the Service grows
|
|
// new optional fields (locale, headers, etc.).
|
|
type EnqueueArgs struct {
|
|
DeliveryID uuid.UUID
|
|
TemplateID string
|
|
IdempotencyKey string
|
|
Recipients []string
|
|
ContentType string
|
|
Subject string
|
|
Body []byte
|
|
}
|
|
|
|
// deliveryColumns lists the projection used by every read of
|
|
// `mail_deliveries`. The order matches model.MailDeliveries field
|
|
// layout for direct QRM scanning.
|
|
func deliveryColumns() postgres.ColumnList {
|
|
d := table.MailDeliveries
|
|
return postgres.ColumnList{
|
|
d.DeliveryID, d.TemplateID, d.IdempotencyKey, d.Status,
|
|
d.Attempts, d.NextAttemptAt, d.PayloadID, d.LastError,
|
|
d.CreatedAt, d.UpdatedAt, d.SentAt, d.DeadLetteredAt,
|
|
}
|
|
}
|
|
|
|
// InsertEnqueue persists a fresh delivery row together with its payload
|
|
// and recipients in a single transaction. The (template_id,
|
|
// idempotency_key) UNIQUE constraint handles duplicate enqueue: when
|
|
// the conflict triggers, the transaction rolls back the payload insert
|
|
// (so we do not leak orphaned payloads) and reports `inserted=false`
|
|
// to the caller.
|
|
func (s *Store) InsertEnqueue(ctx context.Context, args EnqueueArgs) (bool, error) {
|
|
var inserted bool
|
|
err := withTx(ctx, s.db, func(tx *sql.Tx) error {
|
|
payloadID := uuid.New()
|
|
payloadStmt := table.MailPayloads.INSERT(
|
|
table.MailPayloads.PayloadID,
|
|
table.MailPayloads.ContentType,
|
|
table.MailPayloads.Subject,
|
|
table.MailPayloads.Body,
|
|
).VALUES(payloadID, args.ContentType, args.Subject, args.Body)
|
|
if _, err := payloadStmt.ExecContext(ctx, tx); err != nil {
|
|
return fmt.Errorf("insert payload: %w", err)
|
|
}
|
|
|
|
deliveryStmt := table.MailDeliveries.INSERT(
|
|
table.MailDeliveries.DeliveryID,
|
|
table.MailDeliveries.TemplateID,
|
|
table.MailDeliveries.IdempotencyKey,
|
|
table.MailDeliveries.Status,
|
|
table.MailDeliveries.NextAttemptAt,
|
|
table.MailDeliveries.PayloadID,
|
|
).VALUES(
|
|
args.DeliveryID, args.TemplateID, args.IdempotencyKey, StatusPending,
|
|
postgres.NOW(), payloadID,
|
|
).
|
|
ON_CONFLICT(table.MailDeliveries.TemplateID, table.MailDeliveries.IdempotencyKey).
|
|
DO_NOTHING().
|
|
RETURNING(table.MailDeliveries.DeliveryID)
|
|
|
|
var stored model.MailDeliveries
|
|
if err := deliveryStmt.QueryContext(ctx, tx, &stored); err != nil {
|
|
if errors.Is(err, qrm.ErrNoRows) {
|
|
// Idempotent re-enqueue. Roll back the transaction so the
|
|
// orphan payload insert does not survive.
|
|
return errIdempotentNoop
|
|
}
|
|
return fmt.Errorf("insert delivery: %w", err)
|
|
}
|
|
|
|
for _, addr := range args.Recipients {
|
|
recipientStmt := table.MailRecipients.INSERT(
|
|
table.MailRecipients.RecipientID,
|
|
table.MailRecipients.DeliveryID,
|
|
table.MailRecipients.Address,
|
|
table.MailRecipients.Kind,
|
|
).VALUES(uuid.New(), args.DeliveryID, addr, RecipientKindTo)
|
|
if _, err := recipientStmt.ExecContext(ctx, tx); err != nil {
|
|
return fmt.Errorf("insert recipient %q: %w", addr, err)
|
|
}
|
|
}
|
|
inserted = true
|
|
return nil
|
|
})
|
|
if errors.Is(err, errIdempotentNoop) {
|
|
return false, nil
|
|
}
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
return inserted, nil
|
|
}
|
|
|
|
// errIdempotentNoop is a package-private sentinel. Returning it from a
// withTx callback forces a rollback; InsertEnqueue then translates it
// into a (false, nil) result, so it is not meant to be visible to
// callers outside this package.
var errIdempotentNoop = errors.New("mail store: idempotent noop")
|
|
|
|
// ClaimDue locks up to `limit` due rows with FOR UPDATE SKIP LOCKED
|
|
// and returns them with their full payload and recipient set. The
|
|
// supplied tx must be the worker's per-row transaction; the caller
|
|
// completes the work and commits. exclude is the list of delivery_ids
|
|
// already handled in the current tick — they are filtered out so a
|
|
// row whose retry lands at next_attempt_at <= now() is not re-claimed
|
|
// inside the same tick loop.
|
|
func (s *Store) ClaimDue(ctx context.Context, tx *sql.Tx, limit int, exclude ...uuid.UUID) ([]ClaimedDelivery, error) {
|
|
d := table.MailDeliveries
|
|
condition := d.Status.IN(postgres.String(StatusPending), postgres.String(StatusRetrying)).
|
|
AND(d.NextAttemptAt.IS_NULL().OR(d.NextAttemptAt.LT_EQ(postgres.NOW())))
|
|
if len(exclude) > 0 {
|
|
excludeExprs := make([]postgres.Expression, 0, len(exclude))
|
|
for _, id := range exclude {
|
|
excludeExprs = append(excludeExprs, postgres.UUID(id))
|
|
}
|
|
condition = condition.AND(d.DeliveryID.NOT_IN(excludeExprs...))
|
|
}
|
|
|
|
stmt := postgres.SELECT(deliveryColumns()).
|
|
FROM(d).
|
|
WHERE(condition).
|
|
ORDER_BY(postgres.COALESCE(d.NextAttemptAt, d.CreatedAt).ASC()).
|
|
LIMIT(int64(limit)).
|
|
FOR(postgres.UPDATE().SKIP_LOCKED())
|
|
|
|
var rows []model.MailDeliveries
|
|
if err := stmt.QueryContext(ctx, tx, &rows); err != nil {
|
|
return nil, fmt.Errorf("claim due: %w", err)
|
|
}
|
|
|
|
claimed := make([]ClaimedDelivery, 0, len(rows))
|
|
for _, row := range rows {
|
|
delivery := modelToDelivery(row)
|
|
payload, err := s.loadPayloadTx(ctx, tx, delivery.PayloadID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
recipients, err := s.listRecipientsTx(ctx, tx, delivery.DeliveryID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
claimed = append(claimed, ClaimedDelivery{
|
|
Delivery: delivery,
|
|
Payload: payload,
|
|
Recipients: recipients,
|
|
})
|
|
}
|
|
return claimed, nil
|
|
}
|
|
|
|
// ClaimedDelivery bundles a locked delivery row with its payload and
|
|
// recipients so the worker has everything it needs in one structure.
|
|
type ClaimedDelivery struct {
|
|
Delivery Delivery
|
|
Payload Payload
|
|
Recipients []Recipient
|
|
}
|
|
|
|
// RecordAttempt inserts a row into `mail_attempts` for the given
|
|
// delivery. attempt_no is derived from MAX(attempt_no) + 1 within the
|
|
// transaction, which keeps the column monotonic across resend cycles
|
|
// — the delivery's wire-visible `attempts` field counts only the
|
|
// current cycle (and resets on resend), while `mail_attempts` stays
|
|
// append-only forensic history.
|
|
func (s *Store) RecordAttempt(ctx context.Context, tx *sql.Tx, deliveryID uuid.UUID, startedAt time.Time, finishedAt time.Time, outcome string, errMsg string) (int32, error) {
|
|
a := table.MailAttempts
|
|
|
|
// Read the current max attempt_no for this delivery first; the
|
|
// surrounding worker transaction guarantees no concurrent inserts on
|
|
// the same delivery_id, so a simple read-then-write is sufficient
|
|
// (and avoids the awkward correlated subquery inside INSERT...VALUES
|
|
// that jet does not parenthesise).
|
|
maxStmt := postgres.SELECT(postgres.MAXi(a.AttemptNo).AS("max")).
|
|
FROM(a).
|
|
WHERE(a.DeliveryID.EQ(postgres.UUID(deliveryID)))
|
|
|
|
var maxRow struct {
|
|
Max *int32 `alias:"max"`
|
|
}
|
|
if err := maxStmt.QueryContext(ctx, tx, &maxRow); err != nil {
|
|
return 0, fmt.Errorf("record attempt: read max attempt_no: %w", err)
|
|
}
|
|
nextNo := int32(1)
|
|
if maxRow.Max != nil {
|
|
nextNo = *maxRow.Max + 1
|
|
}
|
|
|
|
insertStmt := a.INSERT(
|
|
a.AttemptID, a.DeliveryID, a.AttemptNo,
|
|
a.StartedAt, a.FinishedAt, a.Outcome, a.Error,
|
|
).VALUES(
|
|
uuid.New(), deliveryID, nextNo,
|
|
startedAt, finishedAt, outcome, errMsg,
|
|
).RETURNING(a.AttemptNo)
|
|
|
|
var inserted model.MailAttempts
|
|
if err := insertStmt.QueryContext(ctx, tx, &inserted); err != nil {
|
|
return 0, fmt.Errorf("record attempt: %w", err)
|
|
}
|
|
return inserted.AttemptNo, nil
|
|
}
|
|
|
|
// MarkSent flips the delivery to status='sent' and stamps sent_at.
|
|
func (s *Store) MarkSent(ctx context.Context, tx *sql.Tx, deliveryID uuid.UUID, at time.Time) error {
|
|
d := table.MailDeliveries
|
|
stmt := d.UPDATE().
|
|
SET(
|
|
d.Status.SET(postgres.String(StatusSent)),
|
|
d.Attempts.SET(d.Attempts.ADD(postgres.Int(1))),
|
|
d.SentAt.SET(postgres.TimestampzT(at)),
|
|
d.UpdatedAt.SET(postgres.TimestampzT(at)),
|
|
d.NextAttemptAt.SET(postgres.TimestampzExp(postgres.NULL)),
|
|
d.LastError.SET(postgres.String("")),
|
|
).
|
|
WHERE(d.DeliveryID.EQ(postgres.UUID(deliveryID)))
|
|
if _, err := stmt.ExecContext(ctx, tx); err != nil {
|
|
return fmt.Errorf("mark sent: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// ScheduleRetry flips the delivery to status='retrying', bumps
|
|
// attempts, and arms next_attempt_at.
|
|
func (s *Store) ScheduleRetry(ctx context.Context, tx *sql.Tx, deliveryID uuid.UUID, at time.Time, nextAt time.Time, errMsg string) error {
|
|
d := table.MailDeliveries
|
|
stmt := d.UPDATE().
|
|
SET(
|
|
d.Status.SET(postgres.String(StatusRetrying)),
|
|
d.Attempts.SET(d.Attempts.ADD(postgres.Int(1))),
|
|
d.NextAttemptAt.SET(postgres.TimestampzT(nextAt)),
|
|
d.UpdatedAt.SET(postgres.TimestampzT(at)),
|
|
d.LastError.SET(postgres.String(errMsg)),
|
|
).
|
|
WHERE(d.DeliveryID.EQ(postgres.UUID(deliveryID)))
|
|
if _, err := stmt.ExecContext(ctx, tx); err != nil {
|
|
return fmt.Errorf("schedule retry: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// MarkDeadLettered moves the delivery to the terminal `dead_lettered`
|
|
// state and inserts the matching row into `mail_dead_letters` under
|
|
// the same transaction.
|
|
func (s *Store) MarkDeadLettered(ctx context.Context, tx *sql.Tx, deliveryID uuid.UUID, at time.Time, reason string) error {
|
|
d := table.MailDeliveries
|
|
updateStmt := d.UPDATE().
|
|
SET(
|
|
d.Status.SET(postgres.String(StatusDeadLettered)),
|
|
d.Attempts.SET(d.Attempts.ADD(postgres.Int(1))),
|
|
d.DeadLetteredAt.SET(postgres.TimestampzT(at)),
|
|
d.UpdatedAt.SET(postgres.TimestampzT(at)),
|
|
d.NextAttemptAt.SET(postgres.TimestampzExp(postgres.NULL)),
|
|
d.LastError.SET(postgres.String(reason)),
|
|
).
|
|
WHERE(d.DeliveryID.EQ(postgres.UUID(deliveryID)))
|
|
if _, err := updateStmt.ExecContext(ctx, tx); err != nil {
|
|
return fmt.Errorf("mark dead-lettered: %w", err)
|
|
}
|
|
|
|
dl := table.MailDeadLetters
|
|
insertStmt := dl.INSERT(
|
|
dl.DeadLetterID, dl.DeliveryID, dl.ArchivedAt, dl.Reason,
|
|
).VALUES(uuid.New(), deliveryID, at, reason)
|
|
if _, err := insertStmt.ExecContext(ctx, tx); err != nil {
|
|
return fmt.Errorf("insert dead-letter: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// CountByStatus returns a map keyed by the four status values so the
|
|
// worker can publish `mail_outbox_depth{state}` without scanning the
|
|
// whole table per metric tick.
|
|
func (s *Store) CountByStatus(ctx context.Context) (map[string]int64, error) {
|
|
d := table.MailDeliveries
|
|
stmt := postgres.SELECT(
|
|
d.Status,
|
|
postgres.COUNT(postgres.STAR).AS("count"),
|
|
).FROM(d).GROUP_BY(d.Status)
|
|
|
|
var rows []struct {
|
|
MailDeliveries model.MailDeliveries
|
|
Count int64 `alias:"count"`
|
|
}
|
|
if err := stmt.QueryContext(ctx, s.db, &rows); err != nil {
|
|
return nil, fmt.Errorf("count by status: %w", err)
|
|
}
|
|
out := map[string]int64{
|
|
StatusPending: 0,
|
|
StatusRetrying: 0,
|
|
StatusSent: 0,
|
|
StatusDeadLettered: 0,
|
|
}
|
|
for _, row := range rows {
|
|
out[row.MailDeliveries.Status] = row.Count
|
|
}
|
|
return out, nil
|
|
}
|
|
|
|
// GetDelivery loads a single row by primary key. ErrDeliveryNotFound
|
|
// is returned when no row matches.
|
|
func (s *Store) GetDelivery(ctx context.Context, deliveryID uuid.UUID) (Delivery, error) {
|
|
stmt := postgres.SELECT(deliveryColumns()).
|
|
FROM(table.MailDeliveries).
|
|
WHERE(table.MailDeliveries.DeliveryID.EQ(postgres.UUID(deliveryID))).
|
|
LIMIT(1)
|
|
|
|
var row model.MailDeliveries
|
|
if err := stmt.QueryContext(ctx, s.db, &row); err != nil {
|
|
if errors.Is(err, qrm.ErrNoRows) {
|
|
return Delivery{}, ErrDeliveryNotFound
|
|
}
|
|
return Delivery{}, fmt.Errorf("get delivery: %w", err)
|
|
}
|
|
return modelToDelivery(row), nil
|
|
}
|
|
|
|
// ListDeliveries returns the deliveries page in newest-first order
|
|
// together with the total row count.
|
|
func (s *Store) ListDeliveries(ctx context.Context, offset, limit int) ([]Delivery, int64, error) {
|
|
total, err := countAll(ctx, s.db, table.MailDeliveries)
|
|
if err != nil {
|
|
return nil, 0, fmt.Errorf("count deliveries: %w", err)
|
|
}
|
|
d := table.MailDeliveries
|
|
stmt := postgres.SELECT(deliveryColumns()).
|
|
FROM(d).
|
|
ORDER_BY(d.CreatedAt.DESC(), d.DeliveryID.DESC()).
|
|
LIMIT(int64(limit)).OFFSET(int64(offset))
|
|
|
|
var rows []model.MailDeliveries
|
|
if err := stmt.QueryContext(ctx, s.db, &rows); err != nil {
|
|
return nil, 0, fmt.Errorf("list deliveries: %w", err)
|
|
}
|
|
out := make([]Delivery, 0, len(rows))
|
|
for _, row := range rows {
|
|
out = append(out, modelToDelivery(row))
|
|
}
|
|
return out, total, nil
|
|
}
|
|
|
|
// ListAttempts returns every attempt for the given delivery, ordered
|
|
// by attempt_no.
|
|
func (s *Store) ListAttempts(ctx context.Context, deliveryID uuid.UUID) ([]Attempt, error) {
|
|
a := table.MailAttempts
|
|
stmt := postgres.SELECT(
|
|
a.AttemptID, a.DeliveryID, a.AttemptNo,
|
|
a.StartedAt, a.FinishedAt, a.Outcome, a.Error,
|
|
).
|
|
FROM(a).
|
|
WHERE(a.DeliveryID.EQ(postgres.UUID(deliveryID))).
|
|
ORDER_BY(a.AttemptNo.ASC())
|
|
|
|
var rows []model.MailAttempts
|
|
if err := stmt.QueryContext(ctx, s.db, &rows); err != nil {
|
|
return nil, fmt.Errorf("list attempts: %w", err)
|
|
}
|
|
out := make([]Attempt, 0, len(rows))
|
|
for _, row := range rows {
|
|
out = append(out, modelToAttempt(row))
|
|
}
|
|
return out, nil
|
|
}
|
|
|
|
// ListDeadLetters returns the dead-letter page newest-first.
|
|
func (s *Store) ListDeadLetters(ctx context.Context, offset, limit int) ([]DeadLetter, int64, error) {
|
|
total, err := countAll(ctx, s.db, table.MailDeadLetters)
|
|
if err != nil {
|
|
return nil, 0, fmt.Errorf("count dead-letters: %w", err)
|
|
}
|
|
dl := table.MailDeadLetters
|
|
stmt := postgres.SELECT(
|
|
dl.DeadLetterID, dl.DeliveryID, dl.ArchivedAt, dl.Reason,
|
|
).
|
|
FROM(dl).
|
|
ORDER_BY(dl.ArchivedAt.DESC(), dl.DeadLetterID.DESC()).
|
|
LIMIT(int64(limit)).OFFSET(int64(offset))
|
|
|
|
var rows []model.MailDeadLetters
|
|
if err := stmt.QueryContext(ctx, s.db, &rows); err != nil {
|
|
return nil, 0, fmt.Errorf("list dead-letters: %w", err)
|
|
}
|
|
out := make([]DeadLetter, 0, len(rows))
|
|
for _, row := range rows {
|
|
out = append(out, DeadLetter{
|
|
DeadLetterID: row.DeadLetterID,
|
|
DeliveryID: row.DeliveryID,
|
|
ArchivedAt: row.ArchivedAt,
|
|
Reason: row.Reason,
|
|
})
|
|
}
|
|
return out, total, nil
|
|
}
|
|
|
|
// ResendNonSent re-arms the delivery for another attempt cycle. The
|
|
// `status <> 'sent'` clause makes it the storage-level guard that
|
|
// matches the contract: ErrResendOnSent is returned when the row is
|
|
// already terminal-sent. ErrDeliveryNotFound surfaces when no row
|
|
// matches.
|
|
func (s *Store) ResendNonSent(ctx context.Context, deliveryID uuid.UUID, at time.Time) (Delivery, error) {
|
|
var d Delivery
|
|
err := withTx(ctx, s.db, func(tx *sql.Tx) error {
|
|
md := table.MailDeliveries
|
|
lockStmt := postgres.SELECT(md.Status).
|
|
FROM(md).
|
|
WHERE(md.DeliveryID.EQ(postgres.UUID(deliveryID))).
|
|
FOR(postgres.UPDATE())
|
|
|
|
var locked model.MailDeliveries
|
|
if err := lockStmt.QueryContext(ctx, tx, &locked); err != nil {
|
|
if errors.Is(err, qrm.ErrNoRows) {
|
|
return ErrDeliveryNotFound
|
|
}
|
|
return fmt.Errorf("lock delivery: %w", err)
|
|
}
|
|
if locked.Status == StatusSent {
|
|
return ErrResendOnSent
|
|
}
|
|
updateStmt := md.UPDATE().
|
|
SET(
|
|
md.Status.SET(postgres.String(StatusPending)),
|
|
md.Attempts.SET(postgres.Int(0)),
|
|
md.NextAttemptAt.SET(postgres.TimestampzT(at)),
|
|
md.DeadLetteredAt.SET(postgres.TimestampzExp(postgres.NULL)),
|
|
md.LastError.SET(postgres.String("")),
|
|
md.UpdatedAt.SET(postgres.TimestampzT(at)),
|
|
).
|
|
WHERE(md.DeliveryID.EQ(postgres.UUID(deliveryID)))
|
|
if _, err := updateStmt.ExecContext(ctx, tx); err != nil {
|
|
return fmt.Errorf("re-arm delivery: %w", err)
|
|
}
|
|
reloadStmt := postgres.SELECT(deliveryColumns()).
|
|
FROM(md).
|
|
WHERE(md.DeliveryID.EQ(postgres.UUID(deliveryID))).
|
|
LIMIT(1)
|
|
var refreshed model.MailDeliveries
|
|
if err := reloadStmt.QueryContext(ctx, tx, &refreshed); err != nil {
|
|
return fmt.Errorf("reload delivery: %w", err)
|
|
}
|
|
d = modelToDelivery(refreshed)
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return Delivery{}, err
|
|
}
|
|
return d, nil
|
|
}
|
|
|
|
func (s *Store) loadPayloadTx(ctx context.Context, tx *sql.Tx, payloadID uuid.UUID) (Payload, error) {
|
|
p := table.MailPayloads
|
|
stmt := postgres.SELECT(
|
|
p.PayloadID, p.ContentType, p.Subject, p.Body, p.CreatedAt,
|
|
).FROM(p).
|
|
WHERE(p.PayloadID.EQ(postgres.UUID(payloadID))).
|
|
LIMIT(1)
|
|
|
|
var row model.MailPayloads
|
|
if err := stmt.QueryContext(ctx, tx, &row); err != nil {
|
|
return Payload{}, fmt.Errorf("load payload: %w", err)
|
|
}
|
|
return Payload{
|
|
PayloadID: row.PayloadID,
|
|
ContentType: row.ContentType,
|
|
Subject: row.Subject,
|
|
Body: row.Body,
|
|
CreatedAt: row.CreatedAt,
|
|
}, nil
|
|
}
|
|
|
|
func (s *Store) listRecipientsTx(ctx context.Context, tx *sql.Tx, deliveryID uuid.UUID) ([]Recipient, error) {
|
|
r := table.MailRecipients
|
|
stmt := postgres.SELECT(
|
|
r.RecipientID, r.DeliveryID, r.Address, r.Kind,
|
|
).FROM(r).
|
|
WHERE(r.DeliveryID.EQ(postgres.UUID(deliveryID))).
|
|
ORDER_BY(r.RecipientID.ASC())
|
|
|
|
var rows []model.MailRecipients
|
|
if err := stmt.QueryContext(ctx, tx, &rows); err != nil {
|
|
return nil, fmt.Errorf("list recipients: %w", err)
|
|
}
|
|
out := make([]Recipient, 0, len(rows))
|
|
for _, row := range rows {
|
|
out = append(out, Recipient{
|
|
RecipientID: row.RecipientID,
|
|
DeliveryID: row.DeliveryID,
|
|
Address: row.Address,
|
|
Kind: row.Kind,
|
|
})
|
|
}
|
|
return out, nil
|
|
}
|
|
|
|
// withTx runs fn inside a transaction started on db with default
// options.
//
//   - If fn returns nil the transaction is committed; a commit failure
//     is returned wrapped with "mail store: commit tx".
//   - If fn returns an error the transaction is rolled back and fn's
//     error is returned unchanged. The rollback error is discarded
//     because fn's error is the more actionable one.
//   - If fn panics the transaction is rolled back before the panic
//     continues to propagate, so the connection is not left holding an
//     open transaction.
//
// A failure to begin the transaction is returned wrapped with
// "mail store: begin tx" and fn is not called.
func withTx(ctx context.Context, db *sql.DB, fn func(tx *sql.Tx) error) error {
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return fmt.Errorf("mail store: begin tx: %w", err)
	}
	// The deferred rollback covers both the error return and a panic in
	// fn. After a successful Commit it is a no-op that reports
	// sql.ErrTxDone, which is deliberately ignored.
	defer func() { _ = tx.Rollback() }()

	if err := fn(tx); err != nil {
		return err
	}
	if err := tx.Commit(); err != nil {
		return fmt.Errorf("mail store: commit tx: %w", err)
	}
	return nil
}
|
|
|
|
// BeginTx exposes the package-level transaction helper to the worker
|
|
// so it can scope ClaimDue + RecordAttempt + Mark* under a single
|
|
// commit boundary.
|
|
func (s *Store) BeginTx(ctx context.Context) (*sql.Tx, error) {
|
|
return s.db.BeginTx(ctx, nil)
|
|
}
|
|
|
|
// modelToDelivery projects a generated model row onto the public
|
|
// Delivery struct. Pointer fields are copied so callers cannot mutate
|
|
// the underlying scan buffer.
|
|
func modelToDelivery(row model.MailDeliveries) Delivery {
|
|
d := Delivery{
|
|
DeliveryID: row.DeliveryID,
|
|
TemplateID: row.TemplateID,
|
|
IdempotencyKey: row.IdempotencyKey,
|
|
Status: row.Status,
|
|
Attempts: row.Attempts,
|
|
PayloadID: row.PayloadID,
|
|
LastError: row.LastError,
|
|
CreatedAt: row.CreatedAt,
|
|
UpdatedAt: row.UpdatedAt,
|
|
}
|
|
if row.NextAttemptAt != nil {
|
|
t := *row.NextAttemptAt
|
|
d.NextAttemptAt = &t
|
|
}
|
|
if row.SentAt != nil {
|
|
t := *row.SentAt
|
|
d.SentAt = &t
|
|
}
|
|
if row.DeadLetteredAt != nil {
|
|
t := *row.DeadLetteredAt
|
|
d.DeadLetteredAt = &t
|
|
}
|
|
return d
|
|
}
|
|
|
|
// modelToAttempt projects a generated model row onto the public Attempt
|
|
// struct.
|
|
func modelToAttempt(row model.MailAttempts) Attempt {
|
|
a := Attempt{
|
|
AttemptID: row.AttemptID,
|
|
DeliveryID: row.DeliveryID,
|
|
AttemptNo: row.AttemptNo,
|
|
StartedAt: row.StartedAt,
|
|
Outcome: row.Outcome,
|
|
Error: row.Error,
|
|
}
|
|
if row.FinishedAt != nil {
|
|
t := *row.FinishedAt
|
|
a.FinishedAt = &t
|
|
}
|
|
return a
|
|
}
|
|
|
|
// countAll runs `SELECT COUNT(*) FROM <tbl>` through jet and returns
|
|
// the result as int64. The destination uses an alias-tagged scalar so
|
|
// QRM can map the un-prefixed alias produced by AS("count").
|
|
func countAll(ctx context.Context, db qrm.DB, tbl postgres.ReadableTable) (int64, error) {
|
|
stmt := postgres.SELECT(postgres.COUNT(postgres.STAR).AS("count")).FROM(tbl)
|
|
var dest struct {
|
|
Count int64 `alias:"count"`
|
|
}
|
|
if err := stmt.QueryContext(ctx, db, &dest); err != nil {
|
|
return 0, err
|
|
}
|
|
return dest.Count, nil
|
|
}
|