Files
2026-05-07 00:58:53 +03:00

351 lines
9.3 KiB
Go

package mail_test
import (
"context"
"database/sql"
"errors"
"net/url"
"testing"
"time"
"galaxy/backend/internal/mail"
backendpg "galaxy/backend/internal/postgres"
pgshared "galaxy/postgres"
"github.com/google/uuid"
testcontainers "github.com/testcontainers/testcontainers-go"
tcpostgres "github.com/testcontainers/testcontainers-go/modules/postgres"
"github.com/testcontainers/testcontainers-go/wait"
)
const (
pgImage = "postgres:16-alpine"
pgUser = "galaxy"
pgPassword = "galaxy"
pgDatabase = "galaxy_backend"
pgSchema = "backend"
pgStartup = 90 * time.Second
pgOpTO = 10 * time.Second
)
// startPostgres mirrors the auth_e2e_test scaffolding: spin up
// Postgres, apply migrations, return *sql.DB.
func startPostgres(t *testing.T) *sql.DB {
t.Helper()
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
t.Cleanup(cancel)
pgContainer, err := tcpostgres.Run(ctx, pgImage,
tcpostgres.WithDatabase(pgDatabase),
tcpostgres.WithUsername(pgUser),
tcpostgres.WithPassword(pgPassword),
testcontainers.WithWaitStrategy(
wait.ForLog("database system is ready to accept connections").
WithOccurrence(2).
WithStartupTimeout(pgStartup),
),
)
if err != nil {
t.Skipf("postgres testcontainer unavailable, skipping: %v", err)
}
t.Cleanup(func() {
if termErr := testcontainers.TerminateContainer(pgContainer); termErr != nil {
t.Errorf("terminate postgres container: %v", termErr)
}
})
baseDSN, err := pgContainer.ConnectionString(ctx, "sslmode=disable")
if err != nil {
t.Fatalf("connection string: %v", err)
}
scopedDSN, err := dsnWithSearchPath(baseDSN, pgSchema)
if err != nil {
t.Fatalf("scope dsn: %v", err)
}
cfg := pgshared.DefaultConfig()
cfg.PrimaryDSN = scopedDSN
cfg.OperationTimeout = pgOpTO
db, err := pgshared.OpenPrimary(ctx, cfg, backendpg.NoObservabilityOptions()...)
if err != nil {
t.Fatalf("open primary: %v", err)
}
t.Cleanup(func() { _ = db.Close() })
if err := pgshared.Ping(ctx, db, cfg.OperationTimeout); err != nil {
t.Fatalf("ping: %v", err)
}
if err := backendpg.ApplyMigrations(ctx, db); err != nil {
t.Fatalf("apply migrations: %v", err)
}
return db
}
func dsnWithSearchPath(baseDSN, schema string) (string, error) {
parsed, err := url.Parse(baseDSN)
if err != nil {
return "", err
}
values := parsed.Query()
values.Set("search_path", schema)
if values.Get("sslmode") == "" {
values.Set("sslmode", "disable")
}
parsed.RawQuery = values.Encode()
return parsed.String(), nil
}
func TestStoreInsertEnqueueRoundTrip(t *testing.T) {
t.Parallel()
db := startPostgres(t)
store := mail.NewStore(db)
ctx := context.Background()
args := mail.EnqueueArgs{
DeliveryID: uuid.New(),
TemplateID: mail.TemplateLoginCode,
IdempotencyKey: uuid.NewString(),
Recipients: []string{"alice@example.test"},
ContentType: "text/plain",
Subject: "hello",
Body: []byte("hi"),
}
inserted, err := store.InsertEnqueue(ctx, args)
if err != nil {
t.Fatalf("insert: %v", err)
}
if !inserted {
t.Fatal("first insert must report inserted=true")
}
// Same idempotency key must dedupe.
args2 := args
args2.DeliveryID = uuid.New()
inserted2, err := store.InsertEnqueue(ctx, args2)
if err != nil {
t.Fatalf("insert retry: %v", err)
}
if inserted2 {
t.Fatal("re-enqueue with same key must report inserted=false")
}
d, err := store.GetDelivery(ctx, args.DeliveryID)
if err != nil {
t.Fatalf("get delivery: %v", err)
}
if d.Status != mail.StatusPending {
t.Fatalf("status=%q want pending", d.Status)
}
if d.NextAttemptAt == nil {
t.Fatal("next_attempt_at must be set on insert")
}
}
func TestStoreClaimDueAndMarkSent(t *testing.T) {
t.Parallel()
db := startPostgres(t)
store := mail.NewStore(db)
ctx := context.Background()
deliveryID := uuid.New()
if _, err := store.InsertEnqueue(ctx, mail.EnqueueArgs{
DeliveryID: deliveryID,
TemplateID: mail.TemplateLoginCode,
IdempotencyKey: uuid.NewString(),
Recipients: []string{"bob@example.test"},
ContentType: "text/plain",
Subject: "hello",
Body: []byte("hi"),
}); err != nil {
t.Fatalf("insert: %v", err)
}
tx, err := store.BeginTx(ctx)
if err != nil {
t.Fatalf("begin: %v", err)
}
t.Cleanup(func() { _ = tx.Rollback() })
claimed, err := store.ClaimDue(ctx, tx, 5)
if err != nil {
t.Fatalf("claim: %v", err)
}
if len(claimed) != 1 {
t.Fatalf("got %d claimed, want 1", len(claimed))
}
if claimed[0].Delivery.DeliveryID != deliveryID {
t.Fatalf("claimed wrong delivery: %s", claimed[0].Delivery.DeliveryID)
}
if string(claimed[0].Payload.Body) != "hi" {
t.Fatalf("payload body lost in round trip: %q", claimed[0].Payload.Body)
}
if len(claimed[0].Recipients) != 1 || claimed[0].Recipients[0].Address != "bob@example.test" {
t.Fatalf("recipient lost: %+v", claimed[0].Recipients)
}
now := time.Now().UTC()
if _, err := store.RecordAttempt(ctx, tx, deliveryID, now, now, mail.OutcomeSuccess, ""); err != nil {
t.Fatalf("record attempt: %v", err)
}
if err := store.MarkSent(ctx, tx, deliveryID, now); err != nil {
t.Fatalf("mark sent: %v", err)
}
if err := tx.Commit(); err != nil {
t.Fatalf("commit: %v", err)
}
d, err := store.GetDelivery(ctx, deliveryID)
if err != nil {
t.Fatalf("get delivery: %v", err)
}
if d.Status != mail.StatusSent {
t.Fatalf("status=%q want sent", d.Status)
}
if d.SentAt == nil {
t.Fatal("sent_at must be set after MarkSent")
}
if d.Attempts != 1 {
t.Fatalf("attempts=%d want 1", d.Attempts)
}
attempts, err := store.ListAttempts(ctx, deliveryID)
if err != nil {
t.Fatalf("list attempts: %v", err)
}
if len(attempts) != 1 || attempts[0].Outcome != mail.OutcomeSuccess {
t.Fatalf("attempts=%+v", attempts)
}
}
func TestStoreScheduleRetryThenDeadLetter(t *testing.T) {
t.Parallel()
db := startPostgres(t)
store := mail.NewStore(db)
ctx := context.Background()
deliveryID := uuid.New()
if _, err := store.InsertEnqueue(ctx, mail.EnqueueArgs{
DeliveryID: deliveryID,
TemplateID: "test.template",
IdempotencyKey: uuid.NewString(),
Recipients: []string{"carol@example.test"},
ContentType: "text/plain",
Subject: "hi",
Body: []byte("body"),
}); err != nil {
t.Fatalf("insert: %v", err)
}
tx, err := store.BeginTx(ctx)
if err != nil {
t.Fatalf("begin tx 1: %v", err)
}
if _, err := store.ClaimDue(ctx, tx, 1); err != nil {
t.Fatalf("claim 1: %v", err)
}
now := time.Now().UTC()
if _, err := store.RecordAttempt(ctx, tx, deliveryID, now, now, mail.OutcomeTransientError, "boom"); err != nil {
t.Fatalf("record attempt: %v", err)
}
if err := store.ScheduleRetry(ctx, tx, deliveryID, now, now.Add(2*time.Second), "boom"); err != nil {
t.Fatalf("schedule retry: %v", err)
}
if err := tx.Commit(); err != nil {
t.Fatalf("commit 1: %v", err)
}
d, err := store.GetDelivery(ctx, deliveryID)
if err != nil {
t.Fatalf("get delivery: %v", err)
}
if d.Status != mail.StatusRetrying {
t.Fatalf("status=%q want retrying", d.Status)
}
if d.LastError != "boom" {
t.Fatalf("last_error=%q want boom", d.LastError)
}
tx2, err := store.BeginTx(ctx)
if err != nil {
t.Fatalf("begin tx 2: %v", err)
}
if err := store.MarkDeadLettered(ctx, tx2, deliveryID, now, "max attempts"); err != nil {
t.Fatalf("mark dead-lettered: %v", err)
}
if err := tx2.Commit(); err != nil {
t.Fatalf("commit 2: %v", err)
}
d, err = store.GetDelivery(ctx, deliveryID)
if err != nil {
t.Fatalf("get delivery 2: %v", err)
}
if d.Status != mail.StatusDeadLettered {
t.Fatalf("status=%q want dead_lettered", d.Status)
}
if d.DeadLetteredAt == nil {
t.Fatal("dead_lettered_at must be set")
}
_, total, err := store.ListDeadLetters(ctx, 0, 25)
if err != nil {
t.Fatalf("list dead letters: %v", err)
}
if total != 1 {
t.Fatalf("dead-letter total=%d want 1", total)
}
}
func TestStoreResendNonSent(t *testing.T) {
t.Parallel()
db := startPostgres(t)
store := mail.NewStore(db)
ctx := context.Background()
deliveryID := uuid.New()
if _, err := store.InsertEnqueue(ctx, mail.EnqueueArgs{
DeliveryID: deliveryID,
TemplateID: "test.template",
IdempotencyKey: uuid.NewString(),
Recipients: []string{"d@example.test"},
ContentType: "text/plain",
Subject: "hi",
Body: []byte("b"),
}); err != nil {
t.Fatalf("insert: %v", err)
}
// re-arm pending row -> ok.
if _, err := store.ResendNonSent(ctx, deliveryID, time.Now().UTC()); err != nil {
t.Fatalf("resend pending: %v", err)
}
// flip to sent and verify resend now errors.
tx, err := store.BeginTx(ctx)
if err != nil {
t.Fatalf("begin: %v", err)
}
if _, err := store.ClaimDue(ctx, tx, 1); err != nil {
t.Fatalf("claim: %v", err)
}
now := time.Now().UTC()
if _, err := store.RecordAttempt(ctx, tx, deliveryID, now, now, mail.OutcomeSuccess, ""); err != nil {
t.Fatalf("record attempt: %v", err)
}
if err := store.MarkSent(ctx, tx, deliveryID, now); err != nil {
t.Fatalf("mark sent: %v", err)
}
if err := tx.Commit(); err != nil {
t.Fatalf("commit: %v", err)
}
if _, err := store.ResendNonSent(ctx, deliveryID, time.Now().UTC()); !errors.Is(err, mail.ErrResendOnSent) {
t.Fatalf("resend on sent: want ErrResendOnSent, got %v", err)
}
if _, err := store.ResendNonSent(ctx, uuid.New(), time.Now().UTC()); !errors.Is(err, mail.ErrDeliveryNotFound) {
t.Fatalf("resend on missing: want ErrDeliveryNotFound, got %v", err)
}
}