package diplomail

import (
	"context"
	"errors"
	"time"

	"galaxy/backend/internal/diplomail/translator"

	"github.com/google/uuid"
	"go.uber.org/zap"
)

// translationBackoff returns the sleep applied before retry attempt
// `attempt`. attempt is 1-indexed (the value the row carries AFTER
// the failure is recorded). The schedule mirrors the spec —
// 1s → 2s → 4s → 8s → 16s — so with the default of five attempts the
// retries are separated by ~15 seconds of sleeps (1+2+4+8) before the
// worker falls back to delivering the original.
func translationBackoff(attempt int32) time.Duration {
	if attempt <= 0 {
		return 0
	}
	out := time.Second
	for i := int32(1); i < attempt; i++ {
		out *= 2
	}
	const maxBackoff = 60 * time.Second
	if out > maxBackoff {
		return maxBackoff
	}
	return out
}
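
// The schedule, spelled out (a sketch of the expected values, given
// the post-increment attempt counter described above):
//
//	translationBackoff(1) // 1s  (after the 1st failure)
//	translationBackoff(2) // 2s
//	translationBackoff(3) // 4s
//	translationBackoff(4) // 8s
//	translationBackoff(5) // 16s
//	translationBackoff(7) // 60s (64s capped; only reachable if
//	                      // TranslatorMaxAttempts is raised above 5)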

// Worker drives the async translation pipeline. Each tick picks a
// single (message_id, target_lang) pair from
// `diplomail_recipients` where `available_at IS NULL`, asks the
// configured Translator to render the body, and either delivers the
// pending recipients (success), schedules a retry (transient
// failure), or delivers them with a fallback to the original body
// (terminal failure / max attempts).
//
// The worker is single-threaded by design: one HTTP call to
// LibreTranslate at a time. This protects the upstream from spikes
// and keeps the implementation reviewable.
//
// Implements `internal/app.Component` so it plugs into the same
// lifecycle as the mail and notification workers.
type Worker struct {
	svc *Service
}

// NewWorker constructs a Worker bound to svc. Returning a non-nil
// Worker even when the translator is the noop fallback is
// intentional — the pickup query still works and falls through to
// fallback delivery, which is the desired behaviour for setups
// without LibreTranslate.
func NewWorker(svc *Service) *Worker {
	return &Worker{svc: svc}
}

// Run drives the worker loop until ctx is cancelled.
func (w *Worker) Run(ctx context.Context) error {
	if w == nil || w.svc == nil {
		return nil
	}
	logger := w.svc.deps.Logger.Named("worker")
	interval := w.svc.deps.Config.WorkerInterval
	if interval <= 0 {
		interval = 2 * time.Second
	}
	if err := w.tick(ctx); err != nil && !errors.Is(err, context.Canceled) {
		logger.Warn("diplomail worker initial tick failed", zap.Error(err))
	}
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return nil
		case <-ticker.C:
			if err := w.tick(ctx); err != nil && !errors.Is(err, context.Canceled) {
				logger.Warn("diplomail worker tick failed", zap.Error(err))
			}
		}
	}
}

// Shutdown is a no-op: every translation outcome is committed inside
// tick before returning, so cancelling the parent ctx is enough.
func (w *Worker) Shutdown(_ context.Context) error { return nil }

// Tick exposes the per-tick work for tests so they can drive the
// worker without depending on the ticker.
func (w *Worker) Tick(ctx context.Context) error { return w.tick(ctx) }

// tick picks one pair from the queue and applies the result. The
// per-tick budget is one pair on purpose: the worker is
// single-threaded and we do not want a fast LibreTranslate instance
// to starve the rest of the backend's I/O behind a long-running
// batch.
func (w *Worker) tick(ctx context.Context) error {
	if ctx.Err() != nil {
		return ctx.Err()
	}
	pair, ok, err := w.svc.deps.Store.PickPendingTranslationPair(ctx, w.svc.nowUTC())
	if err != nil {
		return err
	}
	if !ok {
		return nil
	}
	return w.processPair(ctx, pair)
}

// processPair runs the full pipeline for one (message, target_lang).
// Steps:
//
//  1. Load the source message.
//  2. Check the translation cache. If a row already exists (another
//     worker pre-populated it, or two pairs converged on the same
//     target), reuse it and deliver.
//  3. Otherwise call the configured Translator.
//  4. Apply the outcome: success → cache + deliver; unsupported
//     pair → deliver fallback (no cache row); other failure →
//     schedule retry or deliver fallback after MaxAttempts.
//  5. Fan out push events for every recipient whose `available_at`
//     just transitioned.
func (w *Worker) processPair(ctx context.Context, pair PendingTranslationPair) error {
	logger := w.svc.deps.Logger.Named("worker").With(
		zap.String("message_id", pair.MessageID.String()),
		zap.String("target_lang", pair.TargetLang),
	)

	msg, err := w.svc.deps.Store.LoadMessage(ctx, pair.MessageID)
	if err != nil {
		return err
	}

	if cached, err := w.svc.deps.Store.LoadTranslation(ctx, pair.MessageID, pair.TargetLang); err == nil {
		return w.deliverPair(ctx, msg, pair.TargetLang, &cached, logger)
	} else if !errors.Is(err, ErrNotFound) {
		return err
	}

	result, callErr := w.svc.deps.Translator.Translate(ctx, msg.BodyLang, pair.TargetLang, msg.Subject, msg.Body)
	if callErr == nil && result.Engine != "" && result.Engine != translator.NoopEngine {
		tr := Translation{
			TranslationID:     uuid.New(),
			MessageID:         msg.MessageID,
			TargetLang:        pair.TargetLang,
			TranslatedSubject: result.Subject,
			TranslatedBody:    result.Body,
			Translator:        result.Engine,
		}
		return w.deliverPair(ctx, msg, pair.TargetLang, &tr, logger)
	}
	if callErr == nil {
		// Noop translator (or engine returned empty). Treat as
		// "translation unavailable" — deliver fallback so users
		// see the original.
		logger.Debug("translator returned noop, delivering fallback")
		return w.deliverPair(ctx, msg, pair.TargetLang, nil, logger)
	}
	if errors.Is(callErr, translator.ErrUnsupportedLanguagePair) {
		logger.Info("language pair unsupported, delivering fallback", zap.Error(callErr))
		return w.deliverPair(ctx, msg, pair.TargetLang, nil, logger)
	}

	// Transient failure — bump the attempts counter and schedule a
	// retry. The next attempt timestamp is computed from the
	// post-increment counter so the spec's 1s→2s→4s→8s→16s schedule
	// applies between retries of the same pair.
	maxAttempts := w.svc.deps.Config.TranslatorMaxAttempts
	if maxAttempts <= 0 {
		maxAttempts = 5
	}
	nextAttempt := pair.CurrentAttempts + 1
	if int(nextAttempt) >= maxAttempts {
		logger.Warn("translator max attempts reached, delivering fallback",
			zap.Int32("attempts", nextAttempt), zap.Error(callErr))
		return w.deliverPair(ctx, msg, pair.TargetLang, nil, logger)
	}
	next := w.svc.nowUTC().Add(translationBackoff(nextAttempt))
	if _, err := w.svc.deps.Store.SchedulePairRetry(ctx, pair.MessageID, pair.TargetLang, next); err != nil {
		return err
	}
	logger.Info("translator attempt failed, scheduled retry",
		zap.Int32("attempts", nextAttempt), zap.Time("next_attempt_at", next), zap.Error(callErr))
	return nil
}
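
// The retry timeline for one pair under the defaults (a sketch,
// assuming TranslatorMaxAttempts = 5 and that the worker picks the
// pair back up as soon as its retry timestamp is due; in practice the
// ticker interval adds some jitter):
//
//	t≈0s   attempt 1 fails → attempts=1, retry scheduled at t≈1s
//	t≈1s   attempt 2 fails → attempts=2, retry scheduled at t≈3s
//	t≈3s   attempt 3 fails → attempts=3, retry scheduled at t≈7s
//	t≈7s   attempt 4 fails → attempts=4, retry scheduled at t≈15s
//	t≈15s  attempt 5 fails → attempts=5 ≥ max, fallback delivery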

// deliverPair flips every still-pending recipient of (messageID,
// targetLang) to delivered, optionally inserting the translation row
// in the same transaction, and emits push events to the recipients
// who were just unblocked.
func (w *Worker) deliverPair(ctx context.Context, msg Message, targetLang string, translation *Translation, logger *zap.Logger) error {
	recipients, err := w.svc.deps.Store.MarkPairDelivered(ctx, msg.MessageID, targetLang, translation, w.svc.nowUTC())
	if err != nil {
		return err
	}
	if len(recipients) == 0 {
		logger.Debug("deliver yielded no recipients (already delivered)")
		return nil
	}
	for _, r := range recipients {
		w.svc.publishMessageReceived(ctx, msg, r)
	}
	return nil
}
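
// Driving a single pass from a test without waiting on the ticker (a
// sketch; `newTestService` and `seedPendingPair` are hypothetical test
// helpers, not part of this package):
//
//	svc := newTestService(t)             // wires a Store + noop deps
//	seedPendingPair(t, svc, msgID, "de") // one due (message, lang) pair
//	w := NewWorker(svc)
//	if err := w.Tick(context.Background()); err != nil {
//		t.Fatalf("Tick: %v", err)
//	}
//	// Assert via the Store that the pair was delivered or rescheduled.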