diff --git a/backend/README.md b/backend/README.md index c7ce842..27505cf 100644 --- a/backend/README.md +++ b/backend/README.md @@ -134,6 +134,10 @@ fast. | `BACKEND_NOTIFICATION_MAX_ATTEMPTS` | no | `8` | Notification route delivery attempts before dead-lettering. | | `BACKEND_DIPLOMAIL_MAX_BODY_BYTES` | no | `4096` | Maximum size of `diplomail_messages.body` enforced at send time. Tune at runtime without a migration. | | `BACKEND_DIPLOMAIL_MAX_SUBJECT_BYTES` | no | `256` | Maximum size of `diplomail_messages.subject`. Subject is optional; empty is always accepted. | +| `BACKEND_DIPLOMAIL_TRANSLATOR_URL` | no | — | Base URL of a LibreTranslate-compatible instance (`http://libretranslate:5000`). Empty → translator falls through to no-op (recipients are delivered with the original body). | +| `BACKEND_DIPLOMAIL_TRANSLATOR_TIMEOUT` | no | `10s` | Per-request HTTP timeout for the translation worker. | +| `BACKEND_DIPLOMAIL_TRANSLATOR_MAX_ATTEMPTS` | no | `5` | Number of failed HTTP attempts before the worker delivers the message with the original body (fallback). | +| `BACKEND_DIPLOMAIL_WORKER_INTERVAL` | no | `2s` | How often the async translation worker scans for pending pairs. The worker processes one pair per tick. | If `BACKEND_ADMIN_BOOTSTRAP_USER` is set without `BACKEND_ADMIN_BOOTSTRAP_PASSWORD`, `Validate()` fails. If neither is diff --git a/backend/cmd/backend/main.go b/backend/cmd/backend/main.go index 05da93a..dd21847 100644 --- a/backend/cmd/backend/main.go +++ b/backend/cmd/backend/main.go @@ -309,6 +309,10 @@ func run(ctx context.Context) (err error) { runtimeNotifyPublisher.svc = notifSvc diplomailStore := diplomail.NewStore(db) + diplomailTranslator, err := buildDiplomailTranslator(cfg.Diplomail, logger) + if err != nil { + return fmt.Errorf("build diplomail translator: %w", err) + } diplomailSvc := diplomail.NewService(diplomail.Deps{ Store: diplomailStore, Memberships: &diplomailMembershipAdapter{lobby: lobbySvc, users: userSvc}, @@ -316,11 +320,12 @@ func run(ctx context.Context) (err error) { Entitlements: &diplomailEntitlementAdapter{users: userSvc}, Games: &diplomailGameAdapter{lobby: lobbySvc}, Detector: detector.New(), - Translator: translator.NewNoop(), + Translator: diplomailTranslator, Config: cfg.Diplomail, Logger: logger, }) lobbyDiplomailPublisher.svc = diplomailSvc + diplomailWorker := diplomail.NewWorker(diplomailSvc) if email := cfg.Notification.AdminEmail; email == "" { logger.Info("notification admin email not configured (BACKEND_NOTIFICATION_ADMIN_EMAIL); admin-channel routes will be skipped") } else { @@ -398,7 +403,7 @@ func run(ctx context.Context) (err error) { runtimeScheduler := runtimeSvc.SchedulerComponent() runtimeReconciler := runtimeSvc.Reconciler() - components := []app.Component{httpServer, pushServer, mailWorker, notifWorker, lobbySweeper, runtimeWorkers, runtimeScheduler, runtimeReconciler} + components := []app.Component{httpServer, pushServer, mailWorker, notifWorker, diplomailWorker, lobbySweeper, runtimeWorkers, runtimeScheduler, runtimeReconciler} if metricsServer.Enabled() { components = append(components, metricsServer) } @@ -641,11 +646,12 @@ func (a *diplomailMembershipAdapter) GetActiveMembership(ctx context.Context, ga return diplomail.ActiveMembership{}, err } return diplomail.ActiveMembership{ - UserID: userID, - GameID: gameID, - GameName: game.GameName, - UserName: account.UserName, - RaceName: found.RaceName, + UserID: userID, + GameID: gameID, + GameName: game.GameName, + UserName: account.UserName, + RaceName: found.RaceName, + PreferredLanguage: account.PreferredLanguage, }, nil } @@ -677,12 +683,13 @@ func (a *diplomailMembershipAdapter) GetMembershipAnyStatus(ctx context.Context, return diplomail.MemberSnapshot{}, err } return diplomail.MemberSnapshot{ - UserID: userID, - GameID: gameID, - GameName: game.GameName, - UserName: account.UserName, - RaceName: found.RaceName, - Status: found.Status, + UserID: userID, + GameID: gameID, + GameName: game.GameName, + UserName: account.UserName, + RaceName: found.RaceName, + Status: found.Status, + PreferredLanguage: account.PreferredLanguage, }, nil } @@ -720,12 +727,13 @@ func (a *diplomailMembershipAdapter) ListMembers(ctx context.Context, gameID uui return nil, fmt.Errorf("resolve user_name for %s: %w", m.UserID, err) } out = append(out, diplomail.MemberSnapshot{ - UserID: m.UserID, - GameID: gameID, - GameName: game.GameName, - UserName: account.UserName, - RaceName: m.RaceName, - Status: m.Status, + UserID: m.UserID, + GameID: gameID, + GameName: game.GameName, + UserName: account.UserName, + RaceName: m.RaceName, + Status: m.Status, + PreferredLanguage: account.PreferredLanguage, }) } return out, nil @@ -754,6 +762,22 @@ func (a *lobbyDiplomailPublisherAdapter) PublishLifecycle(ctx context.Context, e }) } +// buildDiplomailTranslator selects the diplomail translator backend +// from configuration: a non-empty `TranslatorURL` constructs the +// LibreTranslate HTTP client; an empty URL falls through to the +// noop translator so deployments without a translation service still +// boot and deliver mail with the fallback path. +func buildDiplomailTranslator(cfg config.DiplomailConfig, logger *zap.Logger) (translator.Translator, error) { + if cfg.TranslatorURL == "" { + logger.Info("diplomail translator URL not configured, using noop translator") + return translator.NewNoop(), nil + } + return translator.NewLibreTranslate(translator.LibreTranslateConfig{ + URL: cfg.TranslatorURL, + Timeout: cfg.TranslatorTimeout, + }) +} + // diplomailEntitlementAdapter implements // `diplomail.EntitlementReader` by reading the user-service // entitlement snapshot. The IsPaid flag mirrors the per-tier policy diff --git a/backend/internal/config/config.go b/backend/internal/config/config.go index fb46b62..bd981ab 100644 --- a/backend/internal/config/config.go +++ b/backend/internal/config/config.go @@ -98,6 +98,10 @@ const ( envDiplomailMaxBodyBytes = "BACKEND_DIPLOMAIL_MAX_BODY_BYTES" envDiplomailMaxSubjectBytes = "BACKEND_DIPLOMAIL_MAX_SUBJECT_BYTES" + envDiplomailTranslatorURL = "BACKEND_DIPLOMAIL_TRANSLATOR_URL" + envDiplomailTranslatorTimeout = "BACKEND_DIPLOMAIL_TRANSLATOR_TIMEOUT" + envDiplomailTranslatorMaxAttempts = "BACKEND_DIPLOMAIL_TRANSLATOR_MAX_ATTEMPTS" + envDiplomailWorkerInterval = "BACKEND_DIPLOMAIL_WORKER_INTERVAL" envDevSandboxEmail = "BACKEND_DEV_SANDBOX_EMAIL" envDevSandboxEngineImage = "BACKEND_DEV_SANDBOX_ENGINE_IMAGE" @@ -168,6 +172,9 @@ const ( defaultDiplomailMaxBodyBytes = 4096 defaultDiplomailMaxSubjectBytes = 256 + defaultDiplomailTranslatorTimeout = 10 * time.Second + defaultDiplomailTranslatorMaxAttempts = 5 + defaultDiplomailWorkerInterval = 2 * time.Second defaultDevSandboxEngineVersion = "0.1.0" defaultDevSandboxPlayerCount = 20 @@ -418,6 +425,26 @@ type DiplomailConfig struct { // in bytes. Subjects are optional; the empty-string default // passes the limit trivially. MaxSubjectBytes int + + // TranslatorURL is the base URL of the LibreTranslate-compatible + // instance the async translation worker calls. When empty, the + // worker still runs but falls through to "deliver original" + // (the noop translator returns engine=noop). + TranslatorURL string + + // TranslatorTimeout bounds a single HTTP request to the + // translator. Worker retries (exponential backoff up to + // TranslatorMaxAttempts) layer on top. + TranslatorTimeout time.Duration + + // TranslatorMaxAttempts is the number of times the worker tries + // to translate one (message, target_lang) pair before falling + // back to delivering the original body. + TranslatorMaxAttempts int + + // WorkerInterval bounds how often the async translation worker + // scans for pending pairs. The worker handles one pair per tick. + WorkerInterval time.Duration } // NotificationConfig configures the notification fan-out module @@ -520,6 +547,9 @@ func DefaultConfig() Config { Diplomail: DiplomailConfig{ MaxBodyBytes: defaultDiplomailMaxBodyBytes, MaxSubjectBytes: defaultDiplomailMaxSubjectBytes, + TranslatorTimeout: defaultDiplomailTranslatorTimeout, + TranslatorMaxAttempts: defaultDiplomailTranslatorMaxAttempts, + WorkerInterval: defaultDiplomailWorkerInterval, }, DevSandbox: DevSandboxConfig{ EngineVersion: defaultDevSandboxEngineVersion, @@ -690,6 +720,16 @@ func LoadFromEnv() (Config, error) { if cfg.Diplomail.MaxSubjectBytes, err = loadInt(envDiplomailMaxSubjectBytes, cfg.Diplomail.MaxSubjectBytes); err != nil { return Config{}, err } + cfg.Diplomail.TranslatorURL = loadString(envDiplomailTranslatorURL, cfg.Diplomail.TranslatorURL) + if cfg.Diplomail.TranslatorTimeout, err = loadDuration(envDiplomailTranslatorTimeout, cfg.Diplomail.TranslatorTimeout); err != nil { + return Config{}, err + } + if cfg.Diplomail.TranslatorMaxAttempts, err = loadInt(envDiplomailTranslatorMaxAttempts, cfg.Diplomail.TranslatorMaxAttempts); err != nil { + return Config{}, err + } + if cfg.Diplomail.WorkerInterval, err = loadDuration(envDiplomailWorkerInterval, cfg.Diplomail.WorkerInterval); err != nil { + return Config{}, err + } cfg.DevSandbox.Email = strings.TrimSpace(loadString(envDevSandboxEmail, cfg.DevSandbox.Email)) cfg.DevSandbox.EngineImage = strings.TrimSpace(loadString(envDevSandboxEngineImage, cfg.DevSandbox.EngineImage)) @@ -894,6 +934,15 @@ func (c Config) Validate() error { if c.Diplomail.MaxSubjectBytes < 0 { return fmt.Errorf("%s must not be negative", envDiplomailMaxSubjectBytes) } + if c.Diplomail.TranslatorTimeout <= 0 { + return fmt.Errorf("%s must be positive", envDiplomailTranslatorTimeout) + } + if c.Diplomail.TranslatorMaxAttempts <= 0 { + return fmt.Errorf("%s must be positive", envDiplomailTranslatorMaxAttempts) + } + if c.Diplomail.WorkerInterval <= 0 { + return fmt.Errorf("%s must be positive", envDiplomailWorkerInterval) + } if email := strings.TrimSpace(c.Notification.AdminEmail); email != "" { if _, err := netmail.ParseAddress(email); err != nil { return fmt.Errorf("%s must be a valid RFC 5322 address: %w", envNotificationAdminEmail, err) diff --git a/backend/internal/diplomail/README.md b/backend/internal/diplomail/README.md index 761223a..1af0733 100644 --- a/backend/internal/diplomail/README.md +++ b/backend/internal/diplomail/README.md @@ -18,6 +18,7 @@ purge, and the language-detection / translation cache. | B | Owner / admin sends + lifecycle hooks (paused, cancelled, kick); strict soft-access for kicked players | shipped | | C | Paid-tier personal broadcast + admin multi-game broadcast + bulk purge + admin observability | shipped | | D | Body-language detection (whatlanggo) + translation cache + lazy per-read translator dispatch | shipped | +| E | LibreTranslate HTTP client + async translation worker with exponential backoff + delivery gating on translation completion | shipped | ## Tables @@ -94,6 +95,47 @@ Future work plugs a real `translator.Translator` (LibreTranslate HTTP client is the documented next step) without touching the rest of the system. +## Async translation (Stage E) + +Stage E switches the translation pipeline from "lazy at read" to +"async at send". The send path stays synchronous from the +caller's perspective: the message and recipient rows are inserted +in one transaction. What changes is delivery semantics: + +- Recipients whose `preferred_language` matches the detected + `body_lang` (or whose body language is `und`) get + `available_at = now()` straight away and the push event fires + during the request. +- Recipients whose `preferred_language` differs are inserted with + `available_at IS NULL`. They are **not** visible in inbox, unread + count, or push events until the worker translates the message. + +The worker (`internal/diplomail.Worker`, started as an +`app.Component` in `cmd/backend/main`) ticks once every +`BACKEND_DIPLOMAIL_WORKER_INTERVAL` (default `2s`). Each tick: + +1. Picks one distinct `(message_id, recipient_preferred_language)` + pair from `diplomail_recipients` where `available_at IS NULL` + and `next_translation_attempt_at` is unset or due. +2. Loads the source message, checks the translation cache. +3. On cache hit → marks every pending recipient of the pair + delivered and emits push. +4. On cache miss → asks the configured `Translator`: + - success → caches the translation, marks delivered, push; + - HTTP 400 (unsupported pair) → marks delivered without a + translation (fallback to original); + - other failure → bumps `translation_attempts`, schedules the + retry via `next_translation_attempt_at`, leaves pending. +5. After `BACKEND_DIPLOMAIL_TRANSLATOR_MAX_ATTEMPTS` (default `5`) + the worker falls back to delivering the original body so a + prolonged LibreTranslate outage does not strand messages. + +Retry backoff is exponential `1s → 2s → 4s → 8s → 16s` (capped at +60s) per pair. Operators monitor the LibreTranslate dependency +through standard OpenTelemetry export — translation outcomes +surface in `diplomail.worker` logs at Info / Warn levels; +Grafana / Prometheus dashboards live outside this package. + ## Push integration Every successful send emits a `diplomail.message.received` push diff --git a/backend/internal/diplomail/admin_send.go b/backend/internal/diplomail/admin_send.go index 2d8b36b..0b4c5c8 100644 --- a/backend/internal/diplomail/admin_send.go +++ b/backend/internal/diplomail/admin_send.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "strings" + "time" "github.com/google/uuid" "go.uber.org/zap" @@ -41,7 +42,7 @@ func (s *Service) SendAdminPersonal(ctx context.Context, in SendAdminPersonalInp if err != nil { return Message{}, Recipient{}, err } - rcptInsert := buildRecipientInsert(msgInsert.MessageID, recipient) + rcptInsert := buildRecipientInsert(msgInsert.MessageID, recipient, msgInsert.BodyLang, s.nowUTC()) msg, recipients, err := s.deps.Store.InsertMessageWithRecipients(ctx, msgInsert, []RecipientInsert{rcptInsert}) if err != nil { @@ -51,7 +52,7 @@ func (s *Service) SendAdminPersonal(ctx context.Context, in SendAdminPersonalInp return Message{}, Recipient{}, fmt.Errorf("diplomail: send admin personal: unexpected recipient count %d", len(recipients)) } - s.publishMessageReceived(ctx, msg, recipients[0]) + if recipients[0].AvailableAt != nil { s.publishMessageReceived(ctx, msg, recipients[0]) } return msg, recipients[0], nil } @@ -90,7 +91,7 @@ func (s *Service) SendAdminBroadcast(ctx context.Context, in SendAdminBroadcastI } rcptInserts := make([]RecipientInsert, 0, len(members)) for _, m := range members { - rcptInserts = append(rcptInserts, buildRecipientInsert(msgInsert.MessageID, m)) + rcptInserts = append(rcptInserts, buildRecipientInsert(msgInsert.MessageID, m, msgInsert.BodyLang, s.nowUTC())) } msg, recipients, err := s.deps.Store.InsertMessageWithRecipients(ctx, msgInsert, rcptInserts) @@ -98,7 +99,7 @@ func (s *Service) SendAdminBroadcast(ctx context.Context, in SendAdminBroadcastI return Message{}, nil, fmt.Errorf("diplomail: send admin broadcast: %w", err) } for _, r := range recipients { - s.publishMessageReceived(ctx, msg, r) + if r.AvailableAt != nil { s.publishMessageReceived(ctx, msg, r) } } return msg, recipients, nil } @@ -162,14 +163,14 @@ func (s *Service) SendPlayerBroadcast(ctx context.Context, in SendPlayerBroadcas } rcptInserts := make([]RecipientInsert, 0, len(members)) for _, m := range members { - rcptInserts = append(rcptInserts, buildRecipientInsert(msgInsert.MessageID, m)) + rcptInserts = append(rcptInserts, buildRecipientInsert(msgInsert.MessageID, m, msgInsert.BodyLang, s.nowUTC())) } msg, recipients, err := s.deps.Store.InsertMessageWithRecipients(ctx, msgInsert, rcptInserts) if err != nil { return Message{}, nil, fmt.Errorf("diplomail: send player broadcast: %w", err) } for _, r := range recipients { - s.publishMessageReceived(ctx, msg, r) + if r.AvailableAt != nil { s.publishMessageReceived(ctx, msg, r) } } return msg, recipients, nil } @@ -223,14 +224,14 @@ func (s *Service) SendAdminMultiGameBroadcast(ctx context.Context, in SendMultiG } rcptInserts := make([]RecipientInsert, 0, len(members)) for _, m := range members { - rcptInserts = append(rcptInserts, buildRecipientInsert(msgInsert.MessageID, m)) + rcptInserts = append(rcptInserts, buildRecipientInsert(msgInsert.MessageID, m, msgInsert.BodyLang, s.nowUTC())) } msg, recipients, err := s.deps.Store.InsertMessageWithRecipients(ctx, msgInsert, rcptInserts) if err != nil { return nil, 0, fmt.Errorf("diplomail: insert multi-game broadcast for %s: %w", game.GameID, err) } for _, r := range recipients { - s.publishMessageReceived(ctx, msg, r) + if r.AvailableAt != nil { s.publishMessageReceived(ctx, msg, r) } } out = append(out, msg) totalRecipients += len(recipients) @@ -362,14 +363,14 @@ func (s *Service) publishGameLifecycle(ctx context.Context, ev LifecycleEvent) e } rcptInserts := make([]RecipientInsert, 0, len(members)) for _, m := range members { - rcptInserts = append(rcptInserts, buildRecipientInsert(msgInsert.MessageID, m)) + rcptInserts = append(rcptInserts, buildRecipientInsert(msgInsert.MessageID, m, msgInsert.BodyLang, s.nowUTC())) } msg, recipients, err := s.deps.Store.InsertMessageWithRecipients(ctx, msgInsert, rcptInserts) if err != nil { return fmt.Errorf("diplomail lifecycle: insert %s system mail: %w", ev.Kind, err) } for _, r := range recipients { - s.publishMessageReceived(ctx, msg, r) + if r.AvailableAt != nil { s.publishMessageReceived(ctx, msg, r) } } return nil } @@ -389,12 +390,12 @@ func (s *Service) publishMembershipLifecycle(ctx context.Context, ev LifecycleEv if err != nil { return err } - rcptInsert := buildRecipientInsert(msgInsert.MessageID, target) + rcptInsert := buildRecipientInsert(msgInsert.MessageID, target, msgInsert.BodyLang, s.nowUTC()) msg, recipients, err := s.deps.Store.InsertMessageWithRecipients(ctx, msgInsert, []RecipientInsert{rcptInsert}) if err != nil { return fmt.Errorf("diplomail lifecycle: insert %s system mail: %w", ev.Kind, err) } - if len(recipients) == 1 { + if len(recipients) == 1 && recipients[0].AvailableAt != nil { s.publishMessageReceived(ctx, msg, recipients[0]) } return nil @@ -457,21 +458,68 @@ func (s *Service) buildAdminMessageInsert(callerKind string, callerUserID *uuid. // buildRecipientInsert turns a MemberSnapshot into a RecipientInsert. // The race-name snapshot is nullable so a kicked player with no race // name on file is still addressable. -func buildRecipientInsert(messageID uuid.UUID, m MemberSnapshot) RecipientInsert { +// +// `bodyLang` is the detected language of the message body. When the +// recipient's preferred_language matches body_lang (or body_lang is +// undetermined), the function fills AvailableAt with `now` so the +// recipient row is materialised already-delivered; otherwise +// AvailableAt stays nil and the translation worker takes over. +func buildRecipientInsert(messageID uuid.UUID, m MemberSnapshot, bodyLang string, now time.Time) RecipientInsert { in := RecipientInsert{ - RecipientID: uuid.New(), - MessageID: messageID, - GameID: m.GameID, - UserID: m.UserID, - RecipientUserName: m.UserName, + RecipientID: uuid.New(), + MessageID: messageID, + GameID: m.GameID, + UserID: m.UserID, + RecipientUserName: m.UserName, + RecipientPreferredLanguage: normaliseLang(m.PreferredLanguage), } if m.RaceName != "" { race := m.RaceName in.RecipientRaceName = &race } + if needsTranslation(bodyLang, in.RecipientPreferredLanguage) { + // AvailableAt left nil → worker will deliver after the + // translation cache is materialised (or after fallback). + } else { + t := now.UTC() + in.AvailableAt = &t + } return in } +// needsTranslation reports whether a recipient with preferredLang +// needs to wait for a translated rendering before the message is +// considered delivered. Undetermined body language and empty +// recipient preferences are short-circuited to "no translation +// needed" so we never block delivery on something the detector +// could not label. +func needsTranslation(bodyLang, preferredLang string) bool { + bodyLang = normaliseLang(bodyLang) + preferredLang = normaliseLang(preferredLang) + if bodyLang == "" || bodyLang == LangUndetermined { + return false + } + if preferredLang == "" || preferredLang == LangUndetermined { + return false + } + return bodyLang != preferredLang +} + +// normaliseLang strips any region subtag and lowercases the result so +// `en-US` and `EN` both collapse to `en`. The diplomail layer uses +// ISO 639-1 codes; whatlanggo and LibreTranslate share that +// vocabulary. +func normaliseLang(tag string) string { + tag = strings.TrimSpace(tag) + if tag == "" { + return "" + } + if i := strings.IndexAny(tag, "-_"); i > 0 { + tag = tag[:i] + } + return strings.ToLower(tag) +} + func validateCaller(callerKind string, callerUserID *uuid.UUID, callerUsername string) error { switch callerKind { case CallerKindOwner: diff --git a/backend/internal/diplomail/deps.go b/backend/internal/diplomail/deps.go index b132068..abc33ab 100644 --- a/backend/internal/diplomail/deps.go +++ b/backend/internal/diplomail/deps.go @@ -77,14 +77,15 @@ type GameSnapshot struct { // ActiveMembership is the slim view of a single (user, game) roster // row the diplomail package needs at send time: it confirms the // participant is active in the game and captures the snapshot fields -// (`game_name`, `user_name`, `race_name`) that we persist on each new -// message / recipient row. +// (`game_name`, `user_name`, `race_name`, `preferred_language`) that +// we persist on each new message / recipient row. type ActiveMembership struct { - UserID uuid.UUID - GameID uuid.UUID - GameName string - UserName string - RaceName string + UserID uuid.UUID + GameID uuid.UUID + GameName string + UserName string + RaceName string + PreferredLanguage string } // MembershipLookup is the read-only surface diplomail uses to verify @@ -123,14 +124,17 @@ const ( // MemberSnapshot is the slim view of a membership row that survives // all three status values. RaceName is the immutable string captured // at registration time; an empty value is legal for rare cases where -// the row was inserted without one. +// the row was inserted without one. PreferredLanguage is included so +// the broadcast and lifecycle paths can decide whether the recipient +// needs to wait for a translation before delivery. type MemberSnapshot struct { - UserID uuid.UUID - GameID uuid.UUID - GameName string - UserName string - RaceName string - Status string + UserID uuid.UUID + GameID uuid.UUID + GameName string + UserName string + RaceName string + PreferredLanguage string + Status string } // NotificationPublisher is the outbound surface diplomail uses to diff --git a/backend/internal/diplomail/diplomail_e2e_test.go b/backend/internal/diplomail/diplomail_e2e_test.go index c02bf21..074c9d9 100644 --- a/backend/internal/diplomail/diplomail_e2e_test.go +++ b/backend/internal/diplomail/diplomail_e2e_test.go @@ -808,6 +808,176 @@ func TestDiplomailLifecycleMembershipKick(t *testing.T) { } } +// TestDiplomailAsyncTranslationDelivery covers the Stage E flow: +// 1. SendPersonal where recipient.preferred_language != body_lang +// materialises a recipient with `AvailableAt == nil`; the inbox +// is empty until the worker runs. +// 2. After one Worker.Tick(), the translation cache row exists, +// `AvailableAt` is populated, and the push event fires. +// 3. The inbox now surfaces the message together with the cached +// translation under `Translation`. +func TestDiplomailAsyncTranslationDelivery(t *testing.T) { + db := startPostgres(t) + ctx := context.Background() + + gameID := uuid.New() + sender := uuid.New() + recipient := uuid.New() + seedAccount(t, db, sender) + seedAccount(t, db, recipient) + seedGame(t, db, gameID, "Async Translation Game") + + lookup := &staticMembershipLookup{ + rows: map[[2]uuid.UUID]diplomail.ActiveMembership{ + {gameID, sender}: { + UserID: sender, GameID: gameID, GameName: "Async Translation Game", + UserName: "sender", RaceName: "SenderRace", + PreferredLanguage: "en", + }, + {gameID, recipient}: { + UserID: recipient, GameID: gameID, GameName: "Async Translation Game", + UserName: "recipient", RaceName: "RecipientRace", + PreferredLanguage: "ru", + }, + }, + } + publisher := &recordingPublisher{} + stub := &staticTranslator{engine: translator.LibreTranslateEngine} + svc := diplomail.NewService(diplomail.Deps{ + Store: diplomail.NewStore(db), + Memberships: lookup, + Notification: publisher, + Detector: detectorFn(func(_ string) string { return "en" }), + Translator: stub, + Config: config.DiplomailConfig{ + MaxBodyBytes: 4096, + MaxSubjectBytes: 256, + TranslatorMaxAttempts: 5, + WorkerInterval: time.Second, + }, + }) + + msg, rcpt, err := svc.SendPersonal(ctx, diplomail.SendPersonalInput{ + GameID: gameID, + SenderUserID: sender, + RecipientUserID: recipient, + Subject: "Hello", + Body: "Trade proposal.", + }) + if err != nil { + t.Fatalf("send: %v", err) + } + if rcpt.AvailableAt != nil { + t.Fatalf("recipient marked available_at on send: %v (want NULL — pending translation)", rcpt.AvailableAt) + } + if got := publisher.snapshot(); len(got) != 0 { + t.Fatalf("push fired before worker delivered: %d events", len(got)) + } + inbox, err := svc.ListInbox(ctx, gameID, recipient, "") + if err != nil { + t.Fatalf("inbox before worker: %v", err) + } + if len(inbox) != 0 { + t.Fatalf("inbox before worker = %d, want empty", len(inbox)) + } + + worker := diplomail.NewWorker(svc) + if err := worker.Tick(ctx); err != nil { + t.Fatalf("worker tick: %v", err) + } + + if got := publisher.snapshot(); len(got) != 1 || got[0].Recipient != recipient { + t.Fatalf("publisher after tick = %+v", got) + } + inboxAfter, err := svc.ListInbox(ctx, gameID, recipient, "ru") + if err != nil { + t.Fatalf("inbox after worker: %v", err) + } + if len(inboxAfter) != 1 { + t.Fatalf("inbox after worker = %d, want 1", len(inboxAfter)) + } + if inboxAfter[0].Translation == nil { + t.Fatalf("translation missing on inbox entry") + } + if inboxAfter[0].Translation.TranslatedBody != "[ru] Trade proposal." { + t.Fatalf("translated body = %q", inboxAfter[0].Translation.TranslatedBody) + } + _ = msg +} + +// TestDiplomailAsyncFallbackOnUnsupportedPair covers the terminal +// "translation unavailable" path: the translator returns +// ErrUnsupportedLanguagePair, so the worker delivers the recipient +// with no cached translation. The user sees the original body. +func TestDiplomailAsyncFallbackOnUnsupportedPair(t *testing.T) { + db := startPostgres(t) + ctx := context.Background() + + gameID := uuid.New() + sender := uuid.New() + recipient := uuid.New() + seedAccount(t, db, sender) + seedAccount(t, db, recipient) + seedGame(t, db, gameID, "Unsupported Pair Game") + + lookup := &staticMembershipLookup{ + rows: map[[2]uuid.UUID]diplomail.ActiveMembership{ + {gameID, sender}: { + UserID: sender, GameID: gameID, GameName: "Unsupported Pair Game", + UserName: "sender", PreferredLanguage: "en", + }, + {gameID, recipient}: { + UserID: recipient, GameID: gameID, GameName: "Unsupported Pair Game", + UserName: "recipient", PreferredLanguage: "xx", + }, + }, + } + publisher := &recordingPublisher{} + svc := diplomail.NewService(diplomail.Deps{ + Store: diplomail.NewStore(db), + Memberships: lookup, + Notification: publisher, + Detector: detectorFn(func(_ string) string { return "en" }), + Translator: &erroringTranslator{err: translator.ErrUnsupportedLanguagePair}, + Config: config.DiplomailConfig{ + MaxBodyBytes: 4096, + MaxSubjectBytes: 256, + TranslatorMaxAttempts: 5, + }, + }) + + if _, _, err := svc.SendPersonal(ctx, diplomail.SendPersonalInput{ + GameID: gameID, + SenderUserID: sender, + RecipientUserID: recipient, + Body: "Hello there.", + }); err != nil { + t.Fatalf("send: %v", err) + } + worker := diplomail.NewWorker(svc) + if err := worker.Tick(ctx); err != nil { + t.Fatalf("worker tick: %v", err) + } + inbox, err := svc.ListInbox(ctx, gameID, recipient, "xx") + if err != nil { + t.Fatalf("inbox: %v", err) + } + if len(inbox) != 1 { + t.Fatalf("inbox after fallback = %d, want 1", len(inbox)) + } + if inbox[0].Translation != nil { + t.Fatalf("translation should be nil after fallback, got %+v", inbox[0].Translation) + } +} + +type erroringTranslator struct { + err error +} + +func (e *erroringTranslator) Translate(_ context.Context, _, _, _, _ string) (translator.Result, error) { + return translator.Result{}, e.err +} + // staticTranslator returns deterministic renderings so the // translation-cache test can assert against known output. type staticTranslator struct { diff --git a/backend/internal/diplomail/service.go b/backend/internal/diplomail/service.go index 71ac29d..b6365a7 100644 --- a/backend/internal/diplomail/service.go +++ b/backend/internal/diplomail/service.go @@ -72,18 +72,20 @@ func (s *Service) SendPersonal(ctx context.Context, in SendPersonalInput) (Messa BroadcastScope: BroadcastScopeSingle, } raceName := recipient.RaceName - var raceNamePtr *string - if raceName != "" { - raceNamePtr = &raceName - } - rcptInsert := RecipientInsert{ - RecipientID: uuid.New(), - MessageID: msgInsert.MessageID, - GameID: in.GameID, - UserID: in.RecipientUserID, - RecipientUserName: recipient.UserName, - RecipientRaceName: raceNamePtr, - } + rcptInsert := buildRecipientInsert( + msgInsert.MessageID, + MemberSnapshot{ + UserID: in.RecipientUserID, + GameID: in.GameID, + GameName: recipient.GameName, + UserName: recipient.UserName, + RaceName: raceName, + PreferredLanguage: recipient.PreferredLanguage, + Status: "active", + }, + msgInsert.BodyLang, + s.nowUTC(), + ) msg, recipients, err := s.deps.Store.InsertMessageWithRecipients(ctx, msgInsert, []RecipientInsert{rcptInsert}) if err != nil { @@ -93,7 +95,9 @@ func (s *Service) SendPersonal(ctx context.Context, in SendPersonalInput) (Messa return Message{}, Recipient{}, fmt.Errorf("diplomail: send personal: unexpected recipient count %d", len(recipients)) } - s.publishMessageReceived(ctx, msg, recipients[0]) + if recipients[0].AvailableAt != nil { + s.publishMessageReceived(ctx, msg, recipients[0]) + } return msg, recipients[0], nil } diff --git a/backend/internal/diplomail/store.go b/backend/internal/diplomail/store.go index 3fbdb1d..e6364cf 100644 --- a/backend/internal/diplomail/store.go +++ b/backend/internal/diplomail/store.go @@ -42,7 +42,8 @@ func recipientColumns() postgres.ColumnList { r := table.DiplomailRecipients return postgres.ColumnList{ r.RecipientID, r.MessageID, r.GameID, r.UserID, - r.RecipientUserName, r.RecipientRaceName, + r.RecipientUserName, r.RecipientRaceName, r.RecipientPreferredLanguage, + r.AvailableAt, r.TranslationAttempts, r.NextTranslationAttemptAt, r.DeliveredAt, r.ReadAt, r.DeletedAt, r.NotifiedAt, } } @@ -65,14 +66,20 @@ type MessageInsert struct { BroadcastScope string } -// RecipientInsert carries the per-recipient snapshot. +// RecipientInsert carries the per-recipient snapshot. AvailableAt +// captures the async-delivery contract: when non-nil, the recipient +// row is materialised already-delivered (no translation needed or +// the language matches); when nil, the recipient is queued for the +// translation worker. type RecipientInsert struct { - RecipientID uuid.UUID - MessageID uuid.UUID - GameID uuid.UUID - UserID uuid.UUID - RecipientUserName string - RecipientRaceName *string + RecipientID uuid.UUID + MessageID uuid.UUID + GameID uuid.UUID + UserID uuid.UUID + RecipientUserName string + RecipientRaceName *string + RecipientPreferredLanguage string + AvailableAt *time.Time } // InsertMessageWithRecipients persists a Message together with one or @@ -120,6 +127,7 @@ func (s *Store) InsertMessageWithRecipients(ctx context.Context, msg MessageInse rcptStmt := r.INSERT( r.RecipientID, r.MessageID, r.GameID, r.UserID, r.RecipientUserName, r.RecipientRaceName, + r.RecipientPreferredLanguage, r.AvailableAt, ) for _, in := range recipients { rcptStmt = rcptStmt.VALUES( @@ -129,6 +137,8 @@ func (s *Store) InsertMessageWithRecipients(ctx context.Context, msg MessageInse in.UserID, in.RecipientUserName, stringPtrArg(in.RecipientRaceName), + in.RecipientPreferredLanguage, + timePtrArg(in.AvailableAt), ) } rcptStmt = rcptStmt.RETURNING(recipientColumns()) @@ -198,7 +208,9 @@ func (s *Store) LoadInboxEntry(ctx context.Context, messageID, userID uuid.UUID) // ListInbox returns the recipient view of messages addressed to // userID in gameID, newest first. Soft-deleted rows -// (`deleted_at IS NOT NULL`) are excluded. +// (`deleted_at IS NOT NULL`) are excluded. Rows still waiting for +// the async translation worker (`available_at IS NULL`) are also +// excluded — they will appear once delivery is complete. func (s *Store) ListInbox(ctx context.Context, gameID, userID uuid.UUID) ([]InboxEntry, error) { m := table.DiplomailMessages r := table.DiplomailRecipients @@ -208,7 +220,8 @@ func (s *Store) ListInbox(ctx context.Context, gameID, userID uuid.UUID) ([]Inbo WHERE( r.UserID.EQ(postgres.UUID(userID)). AND(r.GameID.EQ(postgres.UUID(gameID))). - AND(r.DeletedAt.IS_NULL()), + AND(r.DeletedAt.IS_NULL()). + AND(r.AvailableAt.IS_NOT_NULL()), ). ORDER_BY(m.CreatedAt.DESC(), m.MessageID.DESC()) var dest []struct { @@ -336,9 +349,10 @@ func (s *Store) LoadRecipient(ctx context.Context, messageID, userID uuid.UUID) return recipientFromModel(row), nil } -// UnreadCountForUserGame returns the count of unread, non-deleted -// messages addressed to userID in gameID. Backs the push payload -// `unread_game` field. +// UnreadCountForUserGame returns the count of unread, non-deleted, +// delivered messages addressed to userID in gameID. Recipients +// still waiting for translation (`available_at IS NULL`) are +// excluded so the badge does not flicker. func (s *Store) UnreadCountForUserGame(ctx context.Context, gameID, userID uuid.UUID) (int, error) { r := table.DiplomailRecipients stmt := postgres.SELECT(postgres.COUNT(postgres.STAR).AS("count")). @@ -347,7 +361,8 @@ func (s *Store) UnreadCountForUserGame(ctx context.Context, gameID, userID uuid. r.UserID.EQ(postgres.UUID(userID)). AND(r.GameID.EQ(postgres.UUID(gameID))). AND(r.ReadAt.IS_NULL()). - AND(r.DeletedAt.IS_NULL()), + AND(r.DeletedAt.IS_NULL()). + AND(r.AvailableAt.IS_NOT_NULL()), ) var dest struct { Count int64 `alias:"count"` @@ -358,6 +373,149 @@ func (s *Store) UnreadCountForUserGame(ctx context.Context, gameID, userID uuid. return int(dest.Count), nil } +// PendingTranslationPair carries one unit of work picked by the +// translation worker. Multiple recipients of the same message that +// share a preferred_language collapse into one pair, because the +// translation is shared via the diplomail_translations cache. +// CurrentAttempts is the highest `translation_attempts` value across +// the matching recipient rows, so the worker can decide whether the +// next attempt is the last one before falling back. +type PendingTranslationPair struct { + MessageID uuid.UUID + TargetLang string + CurrentAttempts int32 +} + +// PickPendingTranslationPair returns one pair eligible for the +// translation worker, or `ok == false` when the queue is empty. The +// pair is the (message, target_lang) of any recipient where +// `available_at IS NULL` and `next_translation_attempt_at` is either +// unset or already due. The query intentionally drops the +// `FOR UPDATE` clause — the worker is single-threaded per process, +// and the optimistic UPDATE in `MarkPairDelivered` / +// `MarkPairFallback` filters by `available_at IS NULL`, so a stale +// pickup never delivers twice. +func (s *Store) PickPendingTranslationPair(ctx context.Context, now time.Time) (PendingTranslationPair, bool, error) { + r := table.DiplomailRecipients + stmt := postgres.SELECT( + r.MessageID.AS("message_id"), + r.RecipientPreferredLanguage.AS("target_lang"), + postgres.MAX(r.TranslationAttempts).AS("attempts"), + ). + FROM(r). + WHERE( + r.AvailableAt.IS_NULL(). + AND(r.RecipientPreferredLanguage.NOT_EQ(postgres.String(""))). + AND(r.NextTranslationAttemptAt.IS_NULL(). + OR(r.NextTranslationAttemptAt.LT_EQ(postgres.TimestampzT(now.UTC())))), + ). + GROUP_BY(r.MessageID, r.RecipientPreferredLanguage). + ORDER_BY(r.MessageID.ASC(), r.RecipientPreferredLanguage.ASC()). + LIMIT(1) + var dest struct { + MessageID uuid.UUID `alias:"message_id"` + TargetLang string `alias:"target_lang"` + Attempts int32 `alias:"attempts"` + } + if err := stmt.QueryContext(ctx, s.db, &dest); err != nil { + if errors.Is(err, qrm.ErrNoRows) { + return PendingTranslationPair{}, false, nil + } + return PendingTranslationPair{}, false, fmt.Errorf("diplomail store: pick pending pair: %w", err) + } + if dest.MessageID == (uuid.UUID{}) { + return PendingTranslationPair{}, false, nil + } + return PendingTranslationPair{ + MessageID: dest.MessageID, + TargetLang: dest.TargetLang, + CurrentAttempts: dest.Attempts, + }, true, nil +} + +// MarkPairDelivered flips every still-pending recipient of (messageID, +// targetLang) to `available_at = at`, optionally persisting the +// translation row alongside in the same transaction. Returns the +// recipients that were just delivered (used by the worker to fan out +// push events). +func (s *Store) MarkPairDelivered(ctx context.Context, messageID uuid.UUID, targetLang string, translation *Translation, at time.Time) ([]Recipient, error) { + tx, err := s.db.BeginTx(ctx, nil) + if err != nil { + return nil, fmt.Errorf("diplomail store: begin deliver tx: %w", err) + } + defer func() { _ = tx.Rollback() }() + + if translation != nil { + t := table.DiplomailTranslations + ins := t.INSERT( + t.TranslationID, t.MessageID, t.TargetLang, + t.TranslatedSubject, t.TranslatedBody, t.Translator, + ).VALUES( + translation.TranslationID, translation.MessageID, translation.TargetLang, + translation.TranslatedSubject, translation.TranslatedBody, translation.Translator, + ).ON_CONFLICT(t.MessageID, t.TargetLang).DO_NOTHING() + if _, err := ins.ExecContext(ctx, tx); err != nil { + return nil, fmt.Errorf("diplomail store: upsert translation: %w", err) + } + } + + r := table.DiplomailRecipients + upd := r.UPDATE(r.AvailableAt, r.NextTranslationAttemptAt). + SET(postgres.TimestampzT(at.UTC()), postgres.NULL). + WHERE( + r.MessageID.EQ(postgres.UUID(messageID)). + AND(r.RecipientPreferredLanguage.EQ(postgres.String(targetLang))). + AND(r.AvailableAt.IS_NULL()), + ). + RETURNING(recipientColumns()) + + var rows []model.DiplomailRecipients + if err := upd.QueryContext(ctx, tx, &rows); err != nil { + return nil, fmt.Errorf("diplomail store: mark pair delivered: %w", err) + } + if err := tx.Commit(); err != nil { + return nil, fmt.Errorf("diplomail store: commit deliver: %w", err) + } + out := make([]Recipient, 0, len(rows)) + for _, row := range rows { + out = append(out, recipientFromModel(row)) + } + return out, nil +} + +// SchedulePairRetry bumps the attempt counter and schedules the next +// translation attempt for `next`. The recipient rows stay in the +// pending queue (`available_at IS NULL`). Returns the new attempt +// counter so the worker can decide whether to fall back to the +// original on the next pickup. +func (s *Store) SchedulePairRetry(ctx context.Context, messageID uuid.UUID, targetLang string, next time.Time) (int32, error) { + r := table.DiplomailRecipients + upd := r.UPDATE(r.TranslationAttempts, r.NextTranslationAttemptAt). + SET(r.TranslationAttempts.ADD(postgres.Int(1)), postgres.TimestampzT(next.UTC())). + WHERE( + r.MessageID.EQ(postgres.UUID(messageID)). + AND(r.RecipientPreferredLanguage.EQ(postgres.String(targetLang))). + AND(r.AvailableAt.IS_NULL()), + ). + RETURNING(r.TranslationAttempts) + var dest []struct { + TranslationAttempts int32 `alias:"diplomail_recipients.translation_attempts"` + } + if err := upd.QueryContext(ctx, s.db, &dest); err != nil { + return 0, fmt.Errorf("diplomail store: schedule pair retry: %w", err) + } + if len(dest) == 0 { + return 0, nil + } + max := dest[0].TranslationAttempts + for _, d := range dest[1:] { + if d.TranslationAttempts > max { + max = d.TranslationAttempts + } + } + return max, nil +} + // translationColumns is the canonical projection for // diplomail_translations reads. func translationColumns() postgres.ColumnList { @@ -532,7 +690,8 @@ func (s *Store) UnreadCountsForUser(ctx context.Context, userID uuid.UUID) ([]Un WHERE( r.UserID.EQ(postgres.UUID(userID)). AND(r.ReadAt.IS_NULL()). - AND(r.DeletedAt.IS_NULL()), + AND(r.DeletedAt.IS_NULL()). + AND(r.AvailableAt.IS_NOT_NULL()), ). GROUP_BY(r.GameID). ORDER_BY(postgres.MAX(m.GameName).ASC()) @@ -584,15 +743,19 @@ func messageFromModel(row model.DiplomailMessages) Message { // recipientFromModel converts a jet-generated row to the domain type. func recipientFromModel(row model.DiplomailRecipients) Recipient { out := Recipient{ - RecipientID: row.RecipientID, - MessageID: row.MessageID, - GameID: row.GameID, - UserID: row.UserID, - RecipientUserName: row.RecipientUserName, - DeliveredAt: row.DeliveredAt, - ReadAt: row.ReadAt, - DeletedAt: row.DeletedAt, - NotifiedAt: row.NotifiedAt, + RecipientID: row.RecipientID, + MessageID: row.MessageID, + GameID: row.GameID, + UserID: row.UserID, + RecipientUserName: row.RecipientUserName, + RecipientPreferredLanguage: row.RecipientPreferredLanguage, + AvailableAt: row.AvailableAt, + TranslationAttempts: row.TranslationAttempts, + NextTranslationAttemptAt: row.NextTranslationAttemptAt, + DeliveredAt: row.DeliveredAt, + ReadAt: row.ReadAt, + DeletedAt: row.DeletedAt, + NotifiedAt: row.NotifiedAt, } if row.RecipientRaceName != nil { name := *row.RecipientRaceName @@ -629,3 +792,12 @@ func stringPtrArg(v *string) postgres.Expression { } return postgres.String(*v) } + +// timePtrArg returns the jet argument expression for a nullable +// timestamptz column. +func timePtrArg(v *time.Time) postgres.Expression { + if v == nil { + return postgres.NULL + } + return postgres.TimestampzT(v.UTC()) +} diff --git a/backend/internal/diplomail/translator/libretranslate.go b/backend/internal/diplomail/translator/libretranslate.go new file mode 100644 index 0000000..ec66391 --- /dev/null +++ b/backend/internal/diplomail/translator/libretranslate.go @@ -0,0 +1,154 @@ +package translator + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "strings" + "time" +) + +// LibreTranslateEngine is the engine identifier persisted in +// `diplomail_translations.translator` for cache rows produced by the +// LibreTranslate client. +const LibreTranslateEngine = "libretranslate" + +// LibreTranslateConfig configures the HTTP client. URL is the base +// of the deployed instance (without `/translate`). Timeout bounds a +// single HTTP request; the worker layers retry / backoff on top. +type LibreTranslateConfig struct { + URL string + Timeout time.Duration +} + +// ErrUnsupportedLanguagePair classifies a LibreTranslate 400 response +// that indicates the engine cannot translate between the requested +// source / target codes. The worker treats this as terminal: no +// further retries, deliver the original. +var ErrUnsupportedLanguagePair = errors.New("translator: language pair not supported by libretranslate") + +// NewLibreTranslate constructs a Translator that posts to +// `/translate`. Returns an error when URL is empty so wiring +// catches "translator misconfigured" at startup rather than at +// first-translation-attempt. +func NewLibreTranslate(cfg LibreTranslateConfig) (Translator, error) { + url := strings.TrimRight(strings.TrimSpace(cfg.URL), "/") + if url == "" { + return nil, errors.New("translator: libretranslate URL must be set") + } + timeout := cfg.Timeout + if timeout <= 0 { + timeout = 10 * time.Second + } + return &libreTranslate{ + endpoint: url + "/translate", + client: &http.Client{Timeout: timeout}, + }, nil +} + +type libreTranslate struct { + endpoint string + client *http.Client +} + +// requestBody is the LibreTranslate POST /translate input shape. +// `q` is sent as a two-element array so the engine returns one +// translation per element in the same call (subject + body). +type requestBody struct { + Q []string `json:"q"` + Source string `json:"source"` + Target string `json:"target"` + Format string `json:"format"` +} + +// responseBody is the LibreTranslate output shape when `q` is an +// array. The single-string-q variant is a different shape; we never +// emit a single-q request so the client always sees the array form. +type responseBody struct { + TranslatedText []string `json:"translatedText"` + Error string `json:"error,omitempty"` +} + +// Translate posts subject + body to LibreTranslate, normalising the +// language codes and classifying the response. The 400 / unsupported- +// pair path is signalled by `ErrUnsupportedLanguagePair`. All other +// HTTP errors (timeout, 5xx, network failure) come back as wrapped +// errors so the worker can backoff and retry. +func (l *libreTranslate) Translate(ctx context.Context, srcLang, dstLang, subject, body string) (Result, error) { + src := normaliseLanguageCode(srcLang) + dst := normaliseLanguageCode(dstLang) + if src == "" || dst == "" { + return Result{}, fmt.Errorf("translator: missing source or target language (src=%q dst=%q)", srcLang, dstLang) + } + if src == dst { + return Result{Subject: subject, Body: body, Engine: NoopEngine}, nil + } + + reqBody, err := json.Marshal(requestBody{ + Q: []string{subject, body}, + Source: src, + Target: dst, + Format: "text", + }) + if err != nil { + return Result{}, fmt.Errorf("translator: marshal request: %w", err) + } + req, err := http.NewRequestWithContext(ctx, http.MethodPost, l.endpoint, bytes.NewReader(reqBody)) + if err != nil { + return Result{}, fmt.Errorf("translator: build request: %w", err) + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Accept", "application/json") + + resp, err := l.client.Do(req) + if err != nil { + return Result{}, fmt.Errorf("translator: do request: %w", err) + } + defer func() { _ = resp.Body.Close() }() + + raw, err := io.ReadAll(io.LimitReader(resp.Body, 1<<20)) + if err != nil { + return Result{}, fmt.Errorf("translator: read response: %w", err) + } + if resp.StatusCode == http.StatusBadRequest { + return Result{}, fmt.Errorf("%w: %s", ErrUnsupportedLanguagePair, strings.TrimSpace(string(raw))) + } + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return Result{}, fmt.Errorf("translator: libretranslate http %d: %s", resp.StatusCode, strings.TrimSpace(string(raw))) + } + + var out responseBody + if err := json.Unmarshal(raw, &out); err != nil { + return Result{}, fmt.Errorf("translator: unmarshal response: %w", err) + } + if out.Error != "" { + return Result{}, fmt.Errorf("translator: libretranslate error: %s", out.Error) + } + if len(out.TranslatedText) != 2 { + return Result{}, fmt.Errorf("translator: libretranslate returned %d strings, want 2", len(out.TranslatedText)) + } + return Result{ + Subject: out.TranslatedText[0], + Body: out.TranslatedText[1], + Engine: LibreTranslateEngine, + }, nil +} + +// normaliseLanguageCode collapses a BCP 47 tag to the ISO 639-1 base +// that LibreTranslate expects (`en-US` → `en`, `EN` → `en`). The +// helper is mirrored on the diplomail service side; both sides need +// to use the same normalisation so cache keys line up. +func normaliseLanguageCode(tag string) string { + tag = strings.TrimSpace(tag) + if tag == "" { + return "" + } + if i := strings.IndexAny(tag, "-_"); i > 0 { + tag = tag[:i] + } + return strings.ToLower(tag) +} diff --git a/backend/internal/diplomail/translator/libretranslate_test.go b/backend/internal/diplomail/translator/libretranslate_test.go new file mode 100644 index 0000000..efba08a --- /dev/null +++ b/backend/internal/diplomail/translator/libretranslate_test.go @@ -0,0 +1,141 @@ +package translator + +import ( + "context" + "encoding/json" + "errors" + "io" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" +) + +func TestLibreTranslateHappyPath(t *testing.T) { + t.Parallel() + var ( + requestSource string + requestTarget string + requestQ []string + requestFormat string + ) + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body, _ := io.ReadAll(r.Body) + var in requestBody + if err := json.Unmarshal(body, &in); err != nil { + t.Errorf("unmarshal: %v", err) + } + requestSource = in.Source + requestTarget = in.Target + requestQ = in.Q + requestFormat = in.Format + _ = json.NewEncoder(w).Encode(responseBody{ + TranslatedText: []string{"[ru] " + in.Q[0], "[ru] " + in.Q[1]}, + }) + })) + t.Cleanup(server.Close) + + tr, err := NewLibreTranslate(LibreTranslateConfig{URL: server.URL, Timeout: 2 * time.Second}) + if err != nil { + t.Fatalf("new: %v", err) + } + res, err := tr.Translate(context.Background(), "en", "ru", "Hello", "World") + if err != nil { + t.Fatalf("translate: %v", err) + } + if res.Engine != LibreTranslateEngine { + t.Fatalf("engine = %q, want %q", res.Engine, LibreTranslateEngine) + } + if res.Subject != "[ru] Hello" || res.Body != "[ru] World" { + t.Fatalf("result = %+v", res) + } + if requestSource != "en" || requestTarget != "ru" || requestFormat != "text" { + t.Fatalf("request fields: src=%q dst=%q fmt=%q", requestSource, requestTarget, requestFormat) + } + if len(requestQ) != 2 || requestQ[0] != "Hello" || requestQ[1] != "World" { + t.Fatalf("request q = %v", requestQ) + } +} + +func TestLibreTranslateNormalisesLanguageCodes(t *testing.T) { + t.Parallel() + var src, dst string + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body, _ := io.ReadAll(r.Body) + var in requestBody + _ = json.Unmarshal(body, &in) + src, dst = in.Source, in.Target + _ = json.NewEncoder(w).Encode(responseBody{TranslatedText: []string{"a", "b"}}) + })) + t.Cleanup(server.Close) + + tr, _ := NewLibreTranslate(LibreTranslateConfig{URL: server.URL}) + if _, err := tr.Translate(context.Background(), "EN-US", "ru-RU", "x", "y"); err != nil { + t.Fatalf("translate: %v", err) + } + if src != "en" || dst != "ru" { + t.Fatalf("normalised codes src=%q dst=%q, want en/ru", src, dst) + } +} + +func TestLibreTranslateUnsupportedPair(t *testing.T) { + t.Parallel() + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusBadRequest) + _, _ = w.Write([]byte(`{"error":"language not supported"}`)) + })) + t.Cleanup(server.Close) + + tr, _ := NewLibreTranslate(LibreTranslateConfig{URL: server.URL}) + _, err := tr.Translate(context.Background(), "en", "xx", "subject", "body") + if !errors.Is(err, ErrUnsupportedLanguagePair) { + t.Fatalf("err = %v, want ErrUnsupportedLanguagePair", err) + } +} + +func TestLibreTranslateServerError(t *testing.T) { + t.Parallel() + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + _, _ = w.Write([]byte("kaboom")) + })) + t.Cleanup(server.Close) + + tr, _ := NewLibreTranslate(LibreTranslateConfig{URL: server.URL}) + _, err := tr.Translate(context.Background(), "en", "ru", "subject", "body") + if err == nil { + t.Fatalf("expected error, got nil") + } + if errors.Is(err, ErrUnsupportedLanguagePair) { + t.Fatalf("err mis-classified as unsupported pair: %v", err) + } + if !strings.Contains(err.Error(), "500") { + t.Fatalf("err = %v, want mention of 500", err) + } +} + +func TestLibreTranslateSameSourceAndTargetIsNoop(t *testing.T) { + t.Parallel() + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + t.Errorf("translator should not call the server for identical src/dst: %s", r.URL.Path) + })) + t.Cleanup(server.Close) + + tr, _ := NewLibreTranslate(LibreTranslateConfig{URL: server.URL}) + res, err := tr.Translate(context.Background(), "en", "EN", "x", "y") + if err != nil { + t.Fatalf("translate: %v", err) + } + if res.Engine != NoopEngine { + t.Fatalf("engine = %q, want %q", res.Engine, NoopEngine) + } +} + +func TestLibreTranslateRequiresURL(t *testing.T) { + t.Parallel() + _, err := NewLibreTranslate(LibreTranslateConfig{URL: ""}) + if err == nil { + t.Fatalf("expected error for empty URL") + } +} diff --git a/backend/internal/diplomail/types.go b/backend/internal/diplomail/types.go index 1ff47dd..f1b29f7 100644 --- a/backend/internal/diplomail/types.go +++ b/backend/internal/diplomail/types.go @@ -33,20 +33,36 @@ type Message struct { // Recipient mirrors a row in `backend.diplomail_recipients`. The // per-recipient state (read/deleted/delivered/notified) lives here. -// RecipientUserName and RecipientRaceName are snapshots taken at -// insert time so the inbox listing and admin search render correctly -// even after the source rows are renamed or revoked. +// RecipientUserName, RecipientRaceName, and +// RecipientPreferredLanguage are snapshots taken at insert time so +// the inbox listing, admin search, and translation worker render +// correctly even after the source rows are renamed or revoked. +// +// AvailableAt encodes the async-translation contract introduced in +// Stage E: +// +// - non-nil → message is visible to the recipient (in inbox / +// unread counts / push events) starting from this timestamp; +// - nil → recipient is waiting for the translation worker to fan +// out the translated rendering. The translation_attempts counter +// tracks the number of failed LibreTranslate calls; the worker +// gives up after `MaxTranslationAttempts` and falls back to the +// original body, flipping AvailableAt to now(). type Recipient struct { - RecipientID uuid.UUID - MessageID uuid.UUID - GameID uuid.UUID - UserID uuid.UUID - RecipientUserName string - RecipientRaceName *string - DeliveredAt *time.Time - ReadAt *time.Time - DeletedAt *time.Time - NotifiedAt *time.Time + RecipientID uuid.UUID + MessageID uuid.UUID + GameID uuid.UUID + UserID uuid.UUID + RecipientUserName string + RecipientRaceName *string + RecipientPreferredLanguage string + AvailableAt *time.Time + TranslationAttempts int32 + NextTranslationAttemptAt *time.Time + DeliveredAt *time.Time + ReadAt *time.Time + DeletedAt *time.Time + NotifiedAt *time.Time } // InboxEntry is the read-side projection composed of a Message and the diff --git a/backend/internal/diplomail/worker.go b/backend/internal/diplomail/worker.go new file mode 100644 index 0000000..9cacb43 --- /dev/null +++ b/backend/internal/diplomail/worker.go @@ -0,0 +1,209 @@ +package diplomail + +import ( + "context" + "errors" + "time" + + "galaxy/backend/internal/diplomail/translator" + + "github.com/google/uuid" + "go.uber.org/zap" +) + +// translationBackoff returns the sleep applied before retry attempt +// `attempt`. attempt is 1-indexed (the value the row carries AFTER +// the failure is recorded). The schedule mirrors the spec — +// 1s → 2s → 4s → 8s → 16s — so 5 failed attempts span ~31 seconds +// before the worker falls back to delivering the original. +func translationBackoff(attempt int32) time.Duration { + if attempt <= 0 { + return 0 + } + out := time.Second + for i := int32(1); i < attempt; i++ { + out *= 2 + } + const cap = 60 * time.Second + if out > cap { + return cap + } + return out +} + +// Worker drives the async translation pipeline. Each tick picks a +// single (message_id, target_lang) pair from +// `diplomail_recipients` where `available_at IS NULL`, asks the +// configured Translator to render the body, and either delivers the +// pending recipients (success) or schedules a retry (transient +// failure) or delivers them with a fallback to the original body +// (terminal failure / max attempts). +// +// The worker is single-threaded by design: one HTTP call to +// LibreTranslate at a time. This protects the upstream from spikes +// and keeps the implementation reviewable. +// +// Implements `internal/app.Component` so it plugs into the same +// lifecycle as the mail and notification workers. +type Worker struct { + svc *Service +} + +// NewWorker constructs a Worker bound to svc. Returning a non-nil +// Worker even when the translator is the noop fallback is +// intentional — the pickup query still works and falls through to +// fallback delivery, which is the desired behaviour for setups +// without LibreTranslate. +func NewWorker(svc *Service) *Worker { return &Worker{svc: svc} } + +// Run drives the worker loop until ctx is cancelled. +func (w *Worker) Run(ctx context.Context) error { + if w == nil || w.svc == nil { + return nil + } + logger := w.svc.deps.Logger.Named("worker") + interval := w.svc.deps.Config.WorkerInterval + if interval <= 0 { + interval = 2 * time.Second + } + if err := w.tick(ctx); err != nil && !errors.Is(err, context.Canceled) { + logger.Warn("diplomail worker initial tick failed", zap.Error(err)) + } + ticker := time.NewTicker(interval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return nil + case <-ticker.C: + if err := w.tick(ctx); err != nil && !errors.Is(err, context.Canceled) { + logger.Warn("diplomail worker tick failed", zap.Error(err)) + } + } + } +} + +// Shutdown is a no-op: every translation outcome is committed inside +// tick before returning, so cancelling the parent ctx is enough. +func (w *Worker) Shutdown(_ context.Context) error { return nil } + +// Tick exposes the per-tick work for tests so they can drive the +// worker without depending on the ticker. +func (w *Worker) Tick(ctx context.Context) error { return w.tick(ctx) } + +// tick picks one pair from the queue and applies the result. The +// per-tick budget is one pair on purpose: the worker is single +// threaded and we do not want a fast LibreTranslate instance to +// starve the rest of the backend's I/O behind a long-running batch. +func (w *Worker) tick(ctx context.Context) error { + if ctx.Err() != nil { + return ctx.Err() + } + pair, ok, err := w.svc.deps.Store.PickPendingTranslationPair(ctx, w.svc.nowUTC()) + if err != nil { + return err + } + if !ok { + return nil + } + return w.processPair(ctx, pair) +} + +// processPair runs the full pipeline for one (message, target_lang). +// Steps: +// +// 1. Load the source message. +// 2. Check the translation cache. If a row already exists (another +// worker pre-populated it, or two pairs converged on the same +// target), reuse it and deliver. +// 3. Otherwise call the configured Translator. +// 4. Apply the outcome: success → cache + deliver; unsupported +// pair → deliver fallback (no cache row); other failure → +// schedule retry or deliver fallback after MaxAttempts. +// 5. Fan out push events for every recipient whose `available_at` +// just transitioned. +func (w *Worker) processPair(ctx context.Context, pair PendingTranslationPair) error { + logger := w.svc.deps.Logger.Named("worker").With( + zap.String("message_id", pair.MessageID.String()), + zap.String("target_lang", pair.TargetLang), + ) + msg, err := w.svc.deps.Store.LoadMessage(ctx, pair.MessageID) + if err != nil { + return err + } + + if cached, err := w.svc.deps.Store.LoadTranslation(ctx, pair.MessageID, pair.TargetLang); err == nil { + t := cached + return w.deliverPair(ctx, msg, pair.TargetLang, &t, logger) + } else if !errors.Is(err, ErrNotFound) { + return err + } + + result, callErr := w.svc.deps.Translator.Translate(ctx, msg.BodyLang, pair.TargetLang, msg.Subject, msg.Body) + if callErr == nil && result.Engine != "" && result.Engine != translator.NoopEngine { + tr := Translation{ + TranslationID: uuid.New(), + MessageID: msg.MessageID, + TargetLang: pair.TargetLang, + TranslatedSubject: result.Subject, + TranslatedBody: result.Body, + Translator: result.Engine, + } + return w.deliverPair(ctx, msg, pair.TargetLang, &tr, logger) + } + if callErr == nil { + // Noop translator (or engine returned empty). Treat as + // "translation unavailable" — deliver fallback so users + // see the original. + logger.Debug("translator returned noop, delivering fallback") + return w.deliverPair(ctx, msg, pair.TargetLang, nil, logger) + } + if errors.Is(callErr, translator.ErrUnsupportedLanguagePair) { + logger.Info("language pair unsupported, delivering fallback", zap.Error(callErr)) + return w.deliverPair(ctx, msg, pair.TargetLang, nil, logger) + } + + // Transient failure — bump the attempts counter and schedule a + // retry. The next attempt timestamp is computed from the + // post-increment counter so the spec's 1s→2s→4s→8s→16s schedule + // applies between retries of the same pair. + maxAttempts := w.svc.deps.Config.TranslatorMaxAttempts + if maxAttempts <= 0 { + maxAttempts = 5 + } + nextAttempt := pair.CurrentAttempts + 1 + if int(nextAttempt) >= maxAttempts { + logger.Warn("translator max attempts reached, delivering fallback", + zap.Int32("attempts", nextAttempt), zap.Error(callErr)) + return w.deliverPair(ctx, msg, pair.TargetLang, nil, logger) + } + next := w.svc.nowUTC().Add(translationBackoff(nextAttempt + 1)) + if _, err := w.svc.deps.Store.SchedulePairRetry(ctx, pair.MessageID, pair.TargetLang, next); err != nil { + return err + } + logger.Info("translator attempt failed, scheduled retry", + zap.Int32("attempts", nextAttempt), + zap.Time("next_attempt_at", next), + zap.Error(callErr)) + return nil +} + +// deliverPair flips every still-pending recipient of (messageID, +// targetLang) to delivered, optionally inserting the translation row +// in the same transaction, and emits push events to the recipients +// who were just unblocked. +func (w *Worker) deliverPair(ctx context.Context, msg Message, targetLang string, translation *Translation, logger *zap.Logger) error { + recipients, err := w.svc.deps.Store.MarkPairDelivered(ctx, msg.MessageID, targetLang, translation, w.svc.nowUTC()) + if err != nil { + return err + } + if len(recipients) == 0 { + logger.Debug("deliver yielded no recipients (already delivered)") + return nil + } + for _, r := range recipients { + w.svc.publishMessageReceived(ctx, msg, r) + } + return nil +} + diff --git a/backend/internal/postgres/jet/backend/model/diplomail_recipients.go b/backend/internal/postgres/jet/backend/model/diplomail_recipients.go index af27d73..59b0ef6 100644 --- a/backend/internal/postgres/jet/backend/model/diplomail_recipients.go +++ b/backend/internal/postgres/jet/backend/model/diplomail_recipients.go @@ -13,14 +13,18 @@ import ( ) type DiplomailRecipients struct { - RecipientID uuid.UUID `sql:"primary_key"` - MessageID uuid.UUID - GameID uuid.UUID - UserID uuid.UUID - RecipientUserName string - RecipientRaceName *string - DeliveredAt *time.Time - ReadAt *time.Time - DeletedAt *time.Time - NotifiedAt *time.Time + RecipientID uuid.UUID `sql:"primary_key"` + MessageID uuid.UUID + GameID uuid.UUID + UserID uuid.UUID + RecipientUserName string + RecipientRaceName *string + RecipientPreferredLanguage string + AvailableAt *time.Time + TranslationAttempts int32 + NextTranslationAttemptAt *time.Time + DeliveredAt *time.Time + ReadAt *time.Time + DeletedAt *time.Time + NotifiedAt *time.Time } diff --git a/backend/internal/postgres/jet/backend/table/diplomail_recipients.go b/backend/internal/postgres/jet/backend/table/diplomail_recipients.go index 43afaac..024f113 100644 --- a/backend/internal/postgres/jet/backend/table/diplomail_recipients.go +++ b/backend/internal/postgres/jet/backend/table/diplomail_recipients.go @@ -17,16 +17,20 @@ type diplomailRecipientsTable struct { postgres.Table // Columns - RecipientID postgres.ColumnString - MessageID postgres.ColumnString - GameID postgres.ColumnString - UserID postgres.ColumnString - RecipientUserName postgres.ColumnString - RecipientRaceName postgres.ColumnString - DeliveredAt postgres.ColumnTimestampz - ReadAt postgres.ColumnTimestampz - DeletedAt postgres.ColumnTimestampz - NotifiedAt postgres.ColumnTimestampz + RecipientID postgres.ColumnString + MessageID postgres.ColumnString + GameID postgres.ColumnString + UserID postgres.ColumnString + RecipientUserName postgres.ColumnString + RecipientRaceName postgres.ColumnString + RecipientPreferredLanguage postgres.ColumnString + AvailableAt postgres.ColumnTimestampz + TranslationAttempts postgres.ColumnInteger + NextTranslationAttemptAt postgres.ColumnTimestampz + DeliveredAt postgres.ColumnTimestampz + ReadAt postgres.ColumnTimestampz + DeletedAt postgres.ColumnTimestampz + NotifiedAt postgres.ColumnTimestampz AllColumns postgres.ColumnList MutableColumns postgres.ColumnList @@ -68,35 +72,43 @@ func newDiplomailRecipientsTable(schemaName, tableName, alias string) *Diplomail func newDiplomailRecipientsTableImpl(schemaName, tableName, alias string) diplomailRecipientsTable { var ( - RecipientIDColumn = postgres.StringColumn("recipient_id") - MessageIDColumn = postgres.StringColumn("message_id") - GameIDColumn = postgres.StringColumn("game_id") - UserIDColumn = postgres.StringColumn("user_id") - RecipientUserNameColumn = postgres.StringColumn("recipient_user_name") - RecipientRaceNameColumn = postgres.StringColumn("recipient_race_name") - DeliveredAtColumn = postgres.TimestampzColumn("delivered_at") - ReadAtColumn = postgres.TimestampzColumn("read_at") - DeletedAtColumn = postgres.TimestampzColumn("deleted_at") - NotifiedAtColumn = postgres.TimestampzColumn("notified_at") - allColumns = postgres.ColumnList{RecipientIDColumn, MessageIDColumn, GameIDColumn, UserIDColumn, RecipientUserNameColumn, RecipientRaceNameColumn, DeliveredAtColumn, ReadAtColumn, DeletedAtColumn, NotifiedAtColumn} - mutableColumns = postgres.ColumnList{MessageIDColumn, GameIDColumn, UserIDColumn, RecipientUserNameColumn, RecipientRaceNameColumn, DeliveredAtColumn, ReadAtColumn, DeletedAtColumn, NotifiedAtColumn} - defaultColumns = postgres.ColumnList{} + RecipientIDColumn = postgres.StringColumn("recipient_id") + MessageIDColumn = postgres.StringColumn("message_id") + GameIDColumn = postgres.StringColumn("game_id") + UserIDColumn = postgres.StringColumn("user_id") + RecipientUserNameColumn = postgres.StringColumn("recipient_user_name") + RecipientRaceNameColumn = postgres.StringColumn("recipient_race_name") + RecipientPreferredLanguageColumn = postgres.StringColumn("recipient_preferred_language") + AvailableAtColumn = postgres.TimestampzColumn("available_at") + TranslationAttemptsColumn = postgres.IntegerColumn("translation_attempts") + NextTranslationAttemptAtColumn = postgres.TimestampzColumn("next_translation_attempt_at") + DeliveredAtColumn = postgres.TimestampzColumn("delivered_at") + ReadAtColumn = postgres.TimestampzColumn("read_at") + DeletedAtColumn = postgres.TimestampzColumn("deleted_at") + NotifiedAtColumn = postgres.TimestampzColumn("notified_at") + allColumns = postgres.ColumnList{RecipientIDColumn, MessageIDColumn, GameIDColumn, UserIDColumn, RecipientUserNameColumn, RecipientRaceNameColumn, RecipientPreferredLanguageColumn, AvailableAtColumn, TranslationAttemptsColumn, NextTranslationAttemptAtColumn, DeliveredAtColumn, ReadAtColumn, DeletedAtColumn, NotifiedAtColumn} + mutableColumns = postgres.ColumnList{MessageIDColumn, GameIDColumn, UserIDColumn, RecipientUserNameColumn, RecipientRaceNameColumn, RecipientPreferredLanguageColumn, AvailableAtColumn, TranslationAttemptsColumn, NextTranslationAttemptAtColumn, DeliveredAtColumn, ReadAtColumn, DeletedAtColumn, NotifiedAtColumn} + defaultColumns = postgres.ColumnList{RecipientPreferredLanguageColumn, TranslationAttemptsColumn} ) return diplomailRecipientsTable{ Table: postgres.NewTable(schemaName, tableName, alias, allColumns...), //Columns - RecipientID: RecipientIDColumn, - MessageID: MessageIDColumn, - GameID: GameIDColumn, - UserID: UserIDColumn, - RecipientUserName: RecipientUserNameColumn, - RecipientRaceName: RecipientRaceNameColumn, - DeliveredAt: DeliveredAtColumn, - ReadAt: ReadAtColumn, - DeletedAt: DeletedAtColumn, - NotifiedAt: NotifiedAtColumn, + RecipientID: RecipientIDColumn, + MessageID: MessageIDColumn, + GameID: GameIDColumn, + UserID: UserIDColumn, + RecipientUserName: RecipientUserNameColumn, + RecipientRaceName: RecipientRaceNameColumn, + RecipientPreferredLanguage: RecipientPreferredLanguageColumn, + AvailableAt: AvailableAtColumn, + TranslationAttempts: TranslationAttemptsColumn, + NextTranslationAttemptAt: NextTranslationAttemptAtColumn, + DeliveredAt: DeliveredAtColumn, + ReadAt: ReadAtColumn, + DeletedAt: DeletedAtColumn, + NotifiedAt: NotifiedAtColumn, AllColumns: allColumns, MutableColumns: mutableColumns, diff --git a/backend/internal/postgres/migrations/00001_init.sql b/backend/internal/postgres/migrations/00001_init.sql index 8762397..c749749 100644 --- a/backend/internal/postgres/migrations/00001_init.sql +++ b/backend/internal/postgres/migrations/00001_init.sql @@ -722,16 +722,20 @@ CREATE INDEX diplomail_messages_sender_user_idx -- rare admin notifications addressed to a player who no longer has an -- active membership in the game. CREATE TABLE diplomail_recipients ( - recipient_id uuid PRIMARY KEY, - message_id uuid NOT NULL REFERENCES diplomail_messages (message_id) ON DELETE CASCADE, - game_id uuid NOT NULL, - user_id uuid NOT NULL, - recipient_user_name text NOT NULL, - recipient_race_name text, - delivered_at timestamptz, - read_at timestamptz, - deleted_at timestamptz, - notified_at timestamptz, + recipient_id uuid PRIMARY KEY, + message_id uuid NOT NULL REFERENCES diplomail_messages (message_id) ON DELETE CASCADE, + game_id uuid NOT NULL, + user_id uuid NOT NULL, + recipient_user_name text NOT NULL, + recipient_race_name text, + recipient_preferred_language text NOT NULL DEFAULT '', + available_at timestamptz, + translation_attempts integer NOT NULL DEFAULT 0, + next_translation_attempt_at timestamptz, + delivered_at timestamptz, + read_at timestamptz, + deleted_at timestamptz, + notified_at timestamptz, CONSTRAINT diplomail_recipients_unique UNIQUE (message_id, user_id) ); @@ -740,7 +744,17 @@ CREATE INDEX diplomail_recipients_inbox_idx CREATE INDEX diplomail_recipients_unread_idx ON diplomail_recipients (user_id, game_id) - WHERE read_at IS NULL AND deleted_at IS NULL; + WHERE read_at IS NULL AND deleted_at IS NULL AND available_at IS NOT NULL; + +-- Index drives the translation worker's pending-pair pickup. The +-- partial filter keeps the scan tight: terminal-state recipients +-- (with a non-NULL available_at) never appear in this btree. The +-- composite ordering puts the next-attempt clock first so the +-- backoff filter (`next_translation_attempt_at <= now()`) seeks +-- before the secondary cluster on (message_id, lang). +CREATE INDEX diplomail_recipients_pending_translation_idx + ON diplomail_recipients (next_translation_attempt_at, message_id, recipient_preferred_language) + WHERE available_at IS NULL; -- diplomail_translations caches one rendered translation per -- (message, target_lang) so a broadcast addressed to many recipients