// Package worker provides the long-lived background components used by the
// runnable Mail Service process.
package worker

import (
	"context"
	"errors"
	"fmt"
	"log/slog"
	"strings"
	"sync"
	"time"

	"galaxy/mail/internal/api/streamcommand"
	"galaxy/mail/internal/domain/malformedcommand"
	"galaxy/mail/internal/logging"
	"galaxy/mail/internal/service/acceptgenericdelivery"

	"github.com/redis/go-redis/v9"
)

// AcceptGenericDeliveryUseCase accepts one generic asynchronous delivery
// command.
type AcceptGenericDeliveryUseCase interface {
	// Execute durably accepts one normalized generic-delivery command.
	Execute(context.Context, streamcommand.Command) (acceptgenericdelivery.Result, error)
}

// MalformedCommandRecorder stores one operator-visible malformed async command
// record.
type MalformedCommandRecorder interface {
	// Record persists entry idempotently by stream entry id.
	Record(context.Context, malformedcommand.Entry) error
}

// StreamOffsetStore stores the last durably processed entry id of one plain
// XREAD consumer.
type StreamOffsetStore interface {
	// Load returns the last processed entry id for stream when one is stored.
	// The boolean result reports whether an offset was found.
	Load(context.Context, string) (string, bool, error)

	// Save stores the last processed entry id for stream. The first string is
	// the stream name and the second is the entry id.
	Save(context.Context, string, string) error
}

// CommandConsumerTelemetry records low-cardinality stream-consumer events.
type CommandConsumerTelemetry interface {
	// RecordMalformedCommand records one malformed or rejected async stream
	// command. The string argument carries the failure code of the rejection.
	RecordMalformedCommand(context.Context, string)
}

// Clock provides the current wall-clock time.
type Clock interface {
	// Now returns the current time.
	Now() time.Time
}

// systemClock is the Clock backed by time.Now. NewCommandConsumer installs it
// when the configuration does not supply a Clock.
type systemClock struct{}

// Now returns the current local time as reported by time.Now.
func (systemClock) Now() time.Time {
	return time.Now()
}

// CommandConsumerConfig stores the dependencies used by CommandConsumer.
type CommandConsumerConfig struct {
	// Client stores the Redis client used for XREAD. It is required. Note that
	// CommandConsumer.Shutdown closes this client.
	Client *redis.Client

	// Stream stores the Redis Stream name to consume. It must not be blank.
	Stream string

	// BlockTimeout stores the blocking XREAD timeout. It must be positive.
	BlockTimeout time.Duration

	// Acceptor durably accepts valid generic-delivery commands. It is
	// required.
	Acceptor AcceptGenericDeliveryUseCase

	// MalformedRecorder persists operator-visible malformed-command entries.
	// It is required.
	MalformedRecorder MalformedCommandRecorder

	// OffsetStore stores the last durably processed stream entry id. It is
	// required.
	OffsetStore StreamOffsetStore

	// Telemetry records malformed-command counters. It is optional; when nil
	// no counters are recorded.
	Telemetry CommandConsumerTelemetry

	// Clock provides wall-clock timestamps for malformed-command records. It
	// is optional; the system clock is used when nil.
	Clock Clock
}

// CommandConsumer stores the Redis Streams consumer used for generic
// asynchronous delivery intake.
type CommandConsumer struct {
	client            *redis.Client
	stream            string
	blockTimeout      time.Duration
	acceptor          AcceptGenericDeliveryUseCase
	malformedRecorder MalformedCommandRecorder
	offsetStore       StreamOffsetStore
	// telemetry may be nil; callers must check before use.
	telemetry CommandConsumerTelemetry
	clock     Clock
	// logger is pre-tagged with the component name and the stream name.
	logger *slog.Logger

	// closeOnce ensures Shutdown closes client at most once.
	closeOnce sync.Once
}

// NewCommandConsumer constructs the generic-delivery command consumer. It
// returns an error when a required dependency is missing or invalid; a nil
// logger falls back to slog.Default.
func NewCommandConsumer(cfg CommandConsumerConfig, logger *slog.Logger) (*CommandConsumer, error) { switch { case cfg.Client == nil: return nil, errors.New("new command consumer: nil redis client") case strings.TrimSpace(cfg.Stream) == "": return nil, errors.New("new command consumer: stream must not be empty") case cfg.BlockTimeout <= 0: return nil, errors.New("new command consumer: block timeout must be positive") case cfg.Acceptor == nil: return nil, errors.New("new command consumer: nil acceptor") case cfg.MalformedRecorder == nil: return nil, errors.New("new command consumer: nil malformed recorder") case cfg.OffsetStore == nil: return nil, errors.New("new command consumer: nil offset store") } if cfg.Clock == nil { cfg.Clock = systemClock{} } if logger == nil { logger = slog.Default() } return &CommandConsumer{ client: cfg.Client, stream: cfg.Stream, blockTimeout: cfg.BlockTimeout, acceptor: cfg.Acceptor, malformedRecorder: cfg.MalformedRecorder, offsetStore: cfg.OffsetStore, telemetry: cfg.Telemetry, clock: cfg.Clock, logger: logger.With("component", "command_consumer", "stream", cfg.Stream), }, nil } // Run starts the command consumer and blocks until ctx is canceled or Redis // returns an unexpected error. 
func (consumer *CommandConsumer) Run(ctx context.Context) error { if ctx == nil { return errors.New("run command consumer: nil context") } if err := ctx.Err(); err != nil { return err } if consumer == nil || consumer.client == nil { return errors.New("run command consumer: nil consumer") } lastID, found, err := consumer.offsetStore.Load(ctx, consumer.stream) if err != nil { return fmt.Errorf("run command consumer: load stream offset: %w", err) } if !found { lastID = "0-0" } consumer.logger.Info("command consumer started", "block_timeout", consumer.blockTimeout.String(), "start_entry_id", lastID) for { streams, err := consumer.client.XRead(ctx, &redis.XReadArgs{ Streams: []string{consumer.stream, lastID}, Count: 1, Block: consumer.blockTimeout, }).Result() switch { case err == nil: for _, stream := range streams { for _, message := range stream.Messages { if err := consumer.handleMessage(ctx, message); err != nil { return err } if err := consumer.offsetStore.Save(ctx, consumer.stream, message.ID); err != nil { return fmt.Errorf("run command consumer: save stream offset: %w", err) } lastID = message.ID } } case errors.Is(err, redis.Nil): continue case ctx.Err() != nil && (errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) || errors.Is(err, redis.ErrClosed)): consumer.logger.Info("command consumer stopped") return ctx.Err() case errors.Is(err, context.Canceled), errors.Is(err, context.DeadlineExceeded), errors.Is(err, redis.ErrClosed): return fmt.Errorf("run command consumer: %w", err) default: return fmt.Errorf("run command consumer: %w", err) } } } func (consumer *CommandConsumer) handleMessage(ctx context.Context, message redis.XMessage) error { rawFields := cloneRawFields(message.Values) command, err := streamcommand.DecodeCommand(rawFields) if err != nil { return consumer.recordMalformed(ctx, message.ID, rawFields, streamcommand.ClassifyDecodeError(err), err) } result, err := consumer.acceptor.Execute(ctx, command) switch { case err == 
nil: logArgs := logging.CommandAttrs(command) logArgs = append(logArgs, "stream_entry_id", message.ID, "outcome", string(result.Outcome), ) logArgs = append(logArgs, logging.TraceAttrsFromContext(ctx)...) consumer.logger.Info("generic command accepted", logArgs...) return nil case errors.Is(err, acceptgenericdelivery.ErrConflict): return consumer.recordMalformed(ctx, message.ID, rawFields, malformedcommand.FailureCodeIdempotencyConflict, err) case errors.Is(err, acceptgenericdelivery.ErrServiceUnavailable): return fmt.Errorf("handle command %q: %w", message.ID, err) default: return fmt.Errorf("handle command %q: %w", message.ID, err) } } func (consumer *CommandConsumer) recordMalformed( ctx context.Context, streamEntryID string, rawFields map[string]any, failureCode malformedcommand.FailureCode, cause error, ) error { entry := malformedcommand.Entry{ StreamEntryID: streamEntryID, DeliveryID: optionalRawString(rawFields, "delivery_id"), Source: optionalRawString(rawFields, "source"), IdempotencyKey: optionalRawString(rawFields, "idempotency_key"), FailureCode: failureCode, FailureMessage: strings.TrimSpace(cause.Error()), RawFields: cloneRawFields(rawFields), RecordedAt: consumer.clock.Now().UTC().Truncate(time.Millisecond), } if err := consumer.malformedRecorder.Record(ctx, entry); err != nil { return fmt.Errorf("record malformed command %q: %w", streamEntryID, err) } if consumer.telemetry != nil { consumer.telemetry.RecordMalformedCommand(ctx, string(failureCode)) } consumer.logger.Warn("stream command rejected", append([]any{ "stream_entry_id", streamEntryID, "delivery_id", entry.DeliveryID, "source", entry.Source, "idempotency_key", entry.IdempotencyKey, "trace_id", optionalRawString(rawFields, "trace_id"), "failure_code", string(entry.FailureCode), "failure_message", entry.FailureMessage, }, logging.TraceAttrsFromContext(ctx)...)..., ) return nil } func cloneRawFields(values map[string]any) map[string]any { if values == nil { return map[string]any{} } cloned := 
make(map[string]any, len(values)) for key, value := range values { cloned[key] = cloneRawValue(value) } return cloned } func cloneRawValue(value any) any { switch typed := value.(type) { case map[string]any: return cloneRawFields(typed) case []any: cloned := make([]any, len(typed)) for index, item := range typed { cloned[index] = cloneRawValue(item) } return cloned default: return typed } } func optionalRawString(values map[string]any, key string) string { raw, ok := values[key] if !ok { return "" } value, ok := raw.(string) if !ok { return "" } return value } // Shutdown stops the command consumer within ctx. The consumer borrows the // shared process Redis client and forcibly closes it during Shutdown so the // in-flight blocking XREAD returns immediately; the runtime owns the same // client and its cleanupFn is tolerant of ErrClosed. func (consumer *CommandConsumer) Shutdown(ctx context.Context) error { if ctx == nil { return errors.New("shutdown command consumer: nil context") } if consumer == nil { return nil } var err error consumer.closeOnce.Do(func() { if consumer.client != nil { if cerr := consumer.client.Close(); cerr != nil && !errors.Is(cerr, redis.ErrClosed) { err = cerr } } }) return err }