package worker

import (
	"context"
	"errors"
	"fmt"
	"log/slog"
	"strings"
	"time"

	"galaxy/notification/internal/api/intentstream"
	"galaxy/notification/internal/logging"
	"galaxy/notification/internal/service/acceptintent"
	"galaxy/notification/internal/service/publishmail"
	"galaxy/notification/internal/service/routestate"
	"github.com/redis/go-redis/v9"
)

// Failure classifications that EmailPublisher attaches to a route when a
// publication attempt does not succeed. The same values are forwarded to
// telemetry and to the failure log entries.
const (
	// emailFailureClassificationPayloadEncoding marks an attempt whose command
	// encoder returned an error.
	emailFailureClassificationPayloadEncoding = "payload_encoding_failed"
	// emailFailureClassificationMailStreamWrite marks an attempt whose stream
	// write failed, or whose published-state completion failed with an error
	// other than routestate.ErrConflict.
	emailFailureClassificationMailStreamWrite = "mail_stream_publish_failed"
)

// EmailRouteStateStore describes the durable route-state operations required
// by EmailPublisher.
//
// The methods use unnamed parameters; the argument meanings documented below
// are the ones EmailPublisher passes at its call sites.
type EmailRouteStateStore interface {
	// ListDueRoutes loads due scheduled routes. EmailPublisher passes its
	// current time and its batch size as the second and third arguments.
	ListDueRoutes(context.Context, time.Time, int64) ([]routestate.ScheduledRoute, error)

	// TryAcquireRouteLease attempts to acquire one temporary route lease.
	// EmailPublisher passes, in order, the notification ID, the route ID, its
	// worker token, and the lease TTL. A false result without an error makes
	// the publisher skip the route.
	TryAcquireRouteLease(context.Context, string, string, string, time.Duration) (bool, error)

	// ReleaseRouteLease best-effort releases one temporary route lease.
	// EmailPublisher passes, in order, the notification ID, the route ID, and
	// its worker token.
	ReleaseRouteLease(context.Context, string, string, string) error

	// GetNotification loads one accepted notification by notification ID. The
	// boolean result reports whether the record was found.
	GetNotification(context.Context, string) (acceptintent.NotificationRecord, bool, error)

	// GetRoute loads one accepted notification route. EmailPublisher passes the
	// notification ID followed by the route ID. The boolean result reports
	// whether the route was found.
	GetRoute(context.Context, string, string) (acceptintent.NotificationRoute, bool, error)

	// CompleteRoutePublished records one successful publication.
	// EmailPublisher treats routestate.ErrConflict from this call as "route not
	// processed" rather than as a failure.
	CompleteRoutePublished(context.Context, routestate.CompleteRoutePublishedInput) error

	// CompleteRouteFailed records one retryable publication failure.
	// EmailPublisher treats routestate.ErrConflict from this call as "route not
	// processed" rather than as a failure.
	CompleteRouteFailed(context.Context, routestate.CompleteRouteFailedInput) error

	// CompleteRouteDeadLetter records one exhausted publication failure.
	// EmailPublisher treats routestate.ErrConflict from this call as "route not
	// processed" rather than as a failure.
	CompleteRouteDeadLetter(context.Context, routestate.CompleteRouteDeadLetterInput) error
}

// EmailCommandEncoder encodes one email-capable notification route into a
// Mail Service-compatible generic command.
type EmailCommandEncoder interface {
	// Encode converts notification plus route to one outbound command. A
	// non-nil error is recorded by EmailPublisher as a payload-encoding
	// failure of the route.
	Encode(acceptintent.NotificationRecord, acceptintent.NotificationRoute) (publishmail.Command, error)
}

// EmailPublisherConfig stores the dependencies and policies used by
// EmailPublisher. NewEmailPublisher validates the required fields and fills
// in defaults for the optional ones.
type EmailPublisherConfig struct {
	// Store owns the durable route-state transitions. Required.
	Store EmailRouteStateStore

	// MailDeliveryCommandsStream stores the outbound Mail Service command
	// stream name. Required; a blank (whitespace-only) value is rejected.
	MailDeliveryCommandsStream string

	// RouteLeaseTTL stores the temporary route-lease lifetime. Must be
	// positive. It also bounds the timeout of the deferred lease release.
	RouteLeaseTTL time.Duration

	// RouteBackoffMin stores the minimum retry backoff. Must be positive and
	// must not exceed RouteBackoffMax.
	RouteBackoffMin time.Duration

	// RouteBackoffMax stores the maximum retry backoff. Must be positive.
	RouteBackoffMax time.Duration

	// PollInterval stores how long the worker waits before the next due-route
	// scan when no progress was made. Non-positive values fall back to
	// defaultPushPublisherPollInterval.
	PollInterval time.Duration

	// BatchSize stores the maximum number of due schedule members loaded per
	// scan. Non-positive values fall back to defaultPushPublisherBatchSize.
	BatchSize int64

	// Encoder stores the email command encoder. A nil value falls back to
	// publishmail.Encoder{}.
	Encoder EmailCommandEncoder

	// Telemetry records route publication counters. Optional; a nil value
	// disables counter recording.
	Telemetry RoutePublisherTelemetry

	// Clock provides wall-clock timestamps. A nil value falls back to
	// systemClock{}.
	Clock Clock

	// StreamPublisher emits the outbound mail-delivery command before the
	// route's PostgreSQL state transition is committed. Required.
	StreamPublisher StreamPublisher
}

// EmailPublisher publishes due email routes into the Mail Service command
// stream with retry and dead-letter handling.
type EmailPublisher struct {
	// store performs lease handling, lookups, and route-state completions.
	store EmailRouteStateStore

	// mailDeliveryCommandsStream is the stream name commands are written to.
	mailDeliveryCommandsStream string

	// routeLeaseTTL is the lease lifetime requested for each route.
	routeLeaseTTL time.Duration

	// routeBackoffMin and routeBackoffMax bound the delay passed to
	// routeBackoffDelay when a failed route is rescheduled.
	routeBackoffMin time.Duration
	routeBackoffMax time.Duration

	// pollInterval is the wait between scans that made no progress.
	pollInterval time.Duration

	// batchSize is the limit passed to the due-route listing.
	batchSize int64

	// encoder turns a notification and route into an outbound command.
	encoder EmailCommandEncoder

	// telemetry may be nil, in which case counters are not recorded.
	telemetry RoutePublisherTelemetry

	// clock is the time source; readings are normalized to UTC milliseconds.
	clock Clock

	// streamPublisher performs the stream write for each command.
	streamPublisher StreamPublisher

	// workerToken is generated once by NewEmailPublisher and used as the lease
	// token for every acquire, release, and completion call.
	workerToken string

	// logger carries the "component" and "stream" attributes set at
	// construction.
	logger *slog.Logger
}

// NewEmailPublisher constructs the email publication worker. It rejects a
// configuration with missing required dependencies or non-positive lease and
// backoff policies, applies defaults to the optional fields, and generates
// the worker token used for route leases.
func NewEmailPublisher(cfg EmailPublisherConfig, logger *slog.Logger) (*EmailPublisher, error) { switch { case cfg.Store == nil: return nil, errors.New("new email publisher: nil store") case cfg.StreamPublisher == nil: return nil, errors.New("new email publisher: nil stream publisher") case strings.TrimSpace(cfg.MailDeliveryCommandsStream) == "": return nil, errors.New("new email publisher: mail delivery-commands stream must not be empty") case cfg.RouteLeaseTTL <= 0: return nil, errors.New("new email publisher: route lease ttl must be positive") case cfg.RouteBackoffMin <= 0: return nil, errors.New("new email publisher: route backoff min must be positive") case cfg.RouteBackoffMax <= 0: return nil, errors.New("new email publisher: route backoff max must be positive") case cfg.RouteBackoffMin > cfg.RouteBackoffMax: return nil, errors.New("new email publisher: route backoff min must not exceed route backoff max") } if cfg.PollInterval <= 0 { cfg.PollInterval = defaultPushPublisherPollInterval } if cfg.BatchSize <= 0 { cfg.BatchSize = defaultPushPublisherBatchSize } if cfg.Clock == nil { cfg.Clock = systemClock{} } if cfg.Encoder == nil { cfg.Encoder = publishmail.Encoder{} } if logger == nil { logger = slog.Default() } workerToken, err := newWorkerToken() if err != nil { return nil, fmt.Errorf("new email publisher: %w", err) } return &EmailPublisher{ store: cfg.Store, mailDeliveryCommandsStream: cfg.MailDeliveryCommandsStream, routeLeaseTTL: cfg.RouteLeaseTTL, routeBackoffMin: cfg.RouteBackoffMin, routeBackoffMax: cfg.RouteBackoffMax, pollInterval: cfg.PollInterval, batchSize: cfg.BatchSize, encoder: cfg.Encoder, telemetry: cfg.Telemetry, clock: cfg.Clock, streamPublisher: cfg.StreamPublisher, workerToken: workerToken, logger: logger.With("component", "email_publisher", "stream", cfg.MailDeliveryCommandsStream), }, nil } // Run starts the email publication loop and blocks until ctx is canceled or // an unexpected publication error occurs. 
func (publisher *EmailPublisher) Run(ctx context.Context) error { if ctx == nil { return errors.New("run email publisher: nil context") } if err := ctx.Err(); err != nil { return err } if publisher == nil { return errors.New("run email publisher: nil publisher") } publisher.logger.Info("email publisher started", "poll_interval", publisher.pollInterval.String(), "batch_size", publisher.batchSize, ) for { progress, err := publisher.publishDueRoutes(ctx) switch { case err == nil && progress: continue case err == nil: if waitErr := waitWithContext(ctx, publisher.pollInterval); waitErr != nil { publisher.logger.Info("email publisher stopped") return waitErr } case ctx.Err() != nil && (errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)): publisher.logger.Info("email publisher stopped") return ctx.Err() default: return fmt.Errorf("run email publisher: %w", err) } } } // Shutdown stops the email publisher within ctx. The worker relies on context // cancellation and a bounded polling interval, so it has no dedicated // resources to release here. 
func (publisher *EmailPublisher) Shutdown(ctx context.Context) error { if ctx == nil { return errors.New("shutdown email publisher: nil context") } if publisher == nil { return nil } return nil } func (publisher *EmailPublisher) publishDueRoutes(ctx context.Context) (bool, error) { now := publisher.now() dueRoutes, err := publisher.store.ListDueRoutes(ctx, now, publisher.batchSize) if err != nil { return false, err } progress := false for _, dueRoute := range dueRoutes { if !strings.HasPrefix(dueRoute.RouteID, "email:") { continue } processed, err := publisher.publishRoute(ctx, now, dueRoute) if err != nil { return progress, err } progress = progress || processed } return progress, nil } func (publisher *EmailPublisher) publishRoute(ctx context.Context, now time.Time, dueRoute routestate.ScheduledRoute) (bool, error) { acquired, err := publisher.store.TryAcquireRouteLease(ctx, dueRoute.NotificationID, dueRoute.RouteID, publisher.workerToken, publisher.routeLeaseTTL) if err != nil { return false, fmt.Errorf("acquire route lease %q: %w", dueRoute.RouteID, err) } if !acquired { return false, nil } defer func() { releaseCtx, cancel := context.WithTimeout(context.Background(), publisher.routeLeaseTTL) defer cancel() _ = publisher.store.ReleaseRouteLease(releaseCtx, dueRoute.NotificationID, dueRoute.RouteID, publisher.workerToken) }() notification, found, err := publisher.store.GetNotification(ctx, dueRoute.NotificationID) if err != nil { return false, fmt.Errorf("load notification %q: %w", dueRoute.NotificationID, err) } if !found { return false, fmt.Errorf("notification %q is missing for route %q", dueRoute.NotificationID, dueRoute.RouteID) } route, found, err := publisher.store.GetRoute(ctx, dueRoute.NotificationID, dueRoute.RouteID) if err != nil { return false, fmt.Errorf("load route %q: %w", dueRoute.RouteID, err) } if !found { return false, fmt.Errorf("route %q is missing for notification %q", dueRoute.RouteID, dueRoute.NotificationID) } if route.Channel != 
intentstream.ChannelEmail { return false, nil } switch route.Status { case acceptintent.RouteStatusPending, acceptintent.RouteStatusFailed: default: return false, nil } if route.NextAttemptAt.After(now) { return false, nil } command, err := publisher.encoder.Encode(notification, route) if err != nil { return publisher.recordFailure(ctx, notification, route, emailFailureClassificationPayloadEncoding, err.Error()) } if err := publisher.streamPublisher.XAdd(ctx, &redis.XAddArgs{ Stream: publisher.mailDeliveryCommandsStream, Values: command.Values(), }).Err(); err != nil { return publisher.recordFailure(ctx, notification, route, emailFailureClassificationMailStreamWrite, err.Error()) } err = publisher.store.CompleteRoutePublished(ctx, routestate.CompleteRoutePublishedInput{ ExpectedRoute: route, LeaseToken: publisher.workerToken, PublishedAt: publisher.now(), Stream: publisher.mailDeliveryCommandsStream, StreamMaxLen: 0, StreamValues: command.Values(), }) switch { case err == nil: publisher.recordPublishAttempt(ctx, notification, route, "published", "") logArgs := logging.RouteAttrs( notification.NotificationID, notification.NotificationType, notification.Producer, notification.AudienceKind, notification.IdempotencyKey, notification.RequestID, notification.TraceID, route.RouteID, route.Channel, ) logArgs = append(logArgs, "delivery_id", command.DeliveryID, "resolved_email", route.ResolvedEmail, ) logArgs = append(logArgs, logging.TraceAttrsFromContext(ctx)...) publisher.logger.Info("email route published", logArgs...) 
return true, nil case errors.Is(err, routestate.ErrConflict): return false, nil default: return publisher.recordFailure(ctx, notification, route, emailFailureClassificationMailStreamWrite, err.Error()) } } func (publisher *EmailPublisher) recordFailure( ctx context.Context, notification acceptintent.NotificationRecord, route acceptintent.NotificationRoute, classification string, message string, ) (bool, error) { failureAt := publisher.now() attemptNumber := route.AttemptCount + 1 logArgs := logging.RouteAttrs( notification.NotificationID, notification.NotificationType, notification.Producer, notification.AudienceKind, notification.IdempotencyKey, notification.RequestID, notification.TraceID, route.RouteID, route.Channel, ) logArgs = append(logArgs, "resolved_email", route.ResolvedEmail, "failure_classification", classification, "failure_message", strings.TrimSpace(message), "attempt_number", attemptNumber, "max_attempts", route.MaxAttempts, ) logArgs = append(logArgs, logging.TraceAttrsFromContext(ctx)...) if attemptNumber >= route.MaxAttempts { err := publisher.store.CompleteRouteDeadLetter(ctx, routestate.CompleteRouteDeadLetterInput{ ExpectedRoute: route, LeaseToken: publisher.workerToken, DeadLetteredAt: failureAt, FailureClassification: classification, FailureMessage: strings.TrimSpace(message), }) switch { case err == nil: publisher.recordPublishAttempt(ctx, notification, route, "dead_letter", classification) publisher.recordRouteDeadLetter(ctx, notification, route, classification) publisher.logger.Warn("email route dead-lettered", logArgs...) 
return true, nil case errors.Is(err, routestate.ErrConflict): return false, nil default: return false, fmt.Errorf("dead-letter route %q: %w", route.RouteID, err) } } nextAttemptAt := failureAt.Add(routeBackoffDelay(attemptNumber, publisher.routeBackoffMin, publisher.routeBackoffMax)).UTC().Truncate(time.Millisecond) err := publisher.store.CompleteRouteFailed(ctx, routestate.CompleteRouteFailedInput{ ExpectedRoute: route, LeaseToken: publisher.workerToken, FailedAt: failureAt, NextAttemptAt: nextAttemptAt, FailureClassification: classification, FailureMessage: strings.TrimSpace(message), }) switch { case err == nil: publisher.recordPublishAttempt(ctx, notification, route, "retry", classification) publisher.recordRouteRetry(ctx, notification, route) logArgs = append(logArgs, "next_attempt_at", nextAttemptAt) publisher.logger.Warn("email route failed and was rescheduled", logArgs...) return true, nil case errors.Is(err, routestate.ErrConflict): return false, nil default: return false, fmt.Errorf("reschedule route %q: %w", route.RouteID, err) } } func (publisher *EmailPublisher) now() time.Time { return publisher.clock.Now().UTC().Truncate(time.Millisecond) } func (publisher *EmailPublisher) recordPublishAttempt(ctx context.Context, notification acceptintent.NotificationRecord, route acceptintent.NotificationRoute, result string, classification string) { if publisher == nil || publisher.telemetry == nil { return } publisher.telemetry.RecordRoutePublishAttempt(ctx, string(route.Channel), string(notification.NotificationType), result, classification) } func (publisher *EmailPublisher) recordRouteRetry(ctx context.Context, notification acceptintent.NotificationRecord, route acceptintent.NotificationRoute) { if publisher == nil || publisher.telemetry == nil { return } publisher.telemetry.RecordRouteRetry(ctx, string(route.Channel), string(notification.NotificationType)) } func (publisher *EmailPublisher) recordRouteDeadLetter(ctx context.Context, notification 
acceptintent.NotificationRecord, route acceptintent.NotificationRoute, classification string) { if publisher == nil || publisher.telemetry == nil { return } publisher.telemetry.RecordRouteDeadLetter(ctx, string(route.Channel), string(notification.NotificationType), classification) }