package worker

import (
	"context"
	"errors"
	"fmt"
	"log/slog"
	"strings"
	"time"

	"galaxy/notification/internal/api/intentstream"
	"galaxy/notification/internal/logging"
	"galaxy/notification/internal/service/acceptintent"
	"galaxy/notification/internal/service/malformedintent"

	"github.com/redis/go-redis/v9"
)

// AcceptIntentUseCase accepts one normalized notification intent.
type AcceptIntentUseCase interface {
	// Execute durably accepts one normalized notification intent.
	Execute(context.Context, acceptintent.AcceptInput) (acceptintent.Result, error)
}

// MalformedIntentRecorder stores one operator-visible malformed-intent record.
type MalformedIntentRecorder interface {
	// Record persists entry idempotently by stream entry id.
	Record(context.Context, malformedintent.Entry) error
}

// StreamOffsetStore stores the last durably processed entry id of one plain
// XREAD consumer.
type StreamOffsetStore interface {
	// Load returns the last processed entry id for stream when one is stored.
	// The boolean result reports whether an id was found.
	Load(context.Context, string) (string, bool, error)

	// Save stores the last processed entry id for stream. The arguments are,
	// in order, the stream name and the entry id.
	Save(context.Context, string, string) error
}

// IntentConsumerTelemetry records low-cardinality stream-consumer events.
type IntentConsumerTelemetry interface {
	// RecordMalformedIntent records one malformed or rejected notification
	// intent. The string arguments are, in order, the failure code, the
	// notification type, and the producer.
	RecordMalformedIntent(context.Context, string, string, string)
}

// Clock provides the current wall-clock time.
type Clock interface {
	// Now returns the current time.
	Now() time.Time
}

// systemClock is the default Clock backed by time.Now.
type systemClock struct{}

// Now returns the current local time as reported by time.Now.
func (systemClock) Now() time.Time {
	return time.Now()
}

// IntentConsumerConfig stores the dependencies used by IntentConsumer.
type IntentConsumerConfig struct {
	// Client stores the Redis client used for XREAD.
	Client *redis.Client

	// Stream stores the Redis Stream name to consume.
	Stream string

	// BlockTimeout stores the blocking XREAD timeout. It must be at least one
	// millisecond.
	BlockTimeout time.Duration

	// Acceptor durably accepts valid notification intents.
	Acceptor AcceptIntentUseCase

	// MalformedRecorder persists operator-visible malformed-intent entries.
	MalformedRecorder MalformedIntentRecorder

	// OffsetStore stores the last durably processed stream entry id.
	OffsetStore StreamOffsetStore

	// Telemetry records malformed-intent counters. It is optional; when nil no
	// counters are recorded.
	Telemetry IntentConsumerTelemetry

	// Clock provides wall-clock timestamps for malformed-intent records. When
	// nil, the system clock is used.
	Clock Clock
}

// IntentConsumer stores the Redis Streams consumer used for notification
// intent intake.
type IntentConsumer struct {
	client            *redis.Client
	stream            string
	blockTimeout      time.Duration
	acceptor          AcceptIntentUseCase
	malformedRecorder MalformedIntentRecorder
	offsetStore       StreamOffsetStore
	telemetry         IntentConsumerTelemetry
	clock             Clock
	logger            *slog.Logger
}

// NewIntentConsumer constructs the notification-intent consumer.
//
// It returns an error when a required dependency is missing, when the stream
// name is blank, or when the block timeout is not at least one millisecond. A
// nil cfg.Clock falls back to the system clock and a nil logger falls back to
// slog.Default. cfg.Telemetry may be nil.
func NewIntentConsumer(cfg IntentConsumerConfig, logger *slog.Logger) (*IntentConsumer, error) {
	switch {
	case cfg.Client == nil:
		return nil, errors.New("new intent consumer: nil redis client")
	case strings.TrimSpace(cfg.Stream) == "":
		return nil, errors.New("new intent consumer: stream must not be empty")
	case cfg.BlockTimeout <= 0:
		return nil, errors.New("new intent consumer: block timeout must be positive")
	case cfg.BlockTimeout < time.Millisecond:
		// go-redis sends the XREAD block timeout in whole milliseconds, so a
		// positive sub-millisecond value would be truncated to BLOCK 0, which
		// Redis interprets as "block indefinitely" rather than a bounded wait.
		return nil, errors.New("new intent consumer: block timeout must be at least 1ms")
	case cfg.Acceptor == nil:
		return nil, errors.New("new intent consumer: nil acceptor")
	case cfg.MalformedRecorder == nil:
		return nil, errors.New("new intent consumer: nil malformed recorder")
	case cfg.OffsetStore == nil:
		return nil, errors.New("new intent consumer: nil offset store")
	}

	if cfg.Clock == nil {
		cfg.Clock = systemClock{}
	}
	if logger == nil {
		logger = slog.Default()
	}

	return &IntentConsumer{
		client:            cfg.Client,
		stream:            cfg.Stream,
		blockTimeout:      cfg.BlockTimeout,
		acceptor:          cfg.Acceptor,
		malformedRecorder: cfg.MalformedRecorder,
		offsetStore:       cfg.OffsetStore,
		telemetry:         cfg.Telemetry,
		clock:             cfg.Clock,
		logger:            logger.With("component", "intent_consumer", "stream", cfg.Stream),
	}, nil
}

// Run starts the intent consumer and blocks until ctx is canceled or Redis
// returns an unexpected error.
//
// Reading resumes after the entry id returned by the offset store, or from
// "0-0" when none is stored. Entries are read one at a time; the offset is
// saved only after an entry has been handled without error, so a failed entry
// is read again by the next Run. On cancellation Run returns ctx.Err().
func (consumer *IntentConsumer) Run(ctx context.Context) error {
	if ctx == nil {
		return errors.New("run intent consumer: nil context")
	}
	if err := ctx.Err(); err != nil {
		return err
	}
	if consumer == nil || consumer.client == nil {
		return errors.New("run intent consumer: nil consumer")
	}

	lastID, found, err := consumer.offsetStore.Load(ctx, consumer.stream)
	if err != nil {
		return fmt.Errorf("run intent consumer: load stream offset: %w", err)
	}
	if !found {
		lastID = "0-0"
	}

	consumer.logger.Info("intent consumer started",
		"block_timeout", consumer.blockTimeout.String(),
		"start_entry_id", lastID,
	)

	for {
		streams, err := consumer.client.XRead(ctx, &redis.XReadArgs{
			Streams: []string{consumer.stream, lastID},
			Count:   1,
			Block:   consumer.blockTimeout,
		}).Result()

		switch {
		case err == nil:
			for _, stream := range streams {
				for _, message := range stream.Messages {
					if err := consumer.handleMessage(ctx, message); err != nil {
						return err
					}
					if err := consumer.offsetStore.Save(ctx, consumer.stream, message.ID); err != nil {
						return fmt.Errorf("run intent consumer: save stream offset: %w", err)
					}
					lastID = message.ID
				}
			}
		case errors.Is(err, redis.Nil):
			// The blocking read returned no entries; poll again.
			continue
		case ctx.Err() != nil &&
			(errors.Is(err, context.Canceled) ||
				errors.Is(err, context.DeadlineExceeded) ||
				errors.Is(err, redis.ErrClosed)):
			// The caller canceled ctx, so this is a requested stop rather than
			// a Redis failure.
			consumer.logger.Info("intent consumer stopped")
			return ctx.Err()
		default:
			// Every other error is unexpected. That includes cancellation-like
			// errors and a closed client observed while ctx itself is still
			// live.
			return fmt.Errorf("run intent consumer: %w", err)
		}
	}
}

// handleMessage decodes one stream entry and passes it to the acceptor.
//
// Entries that cannot be decoded, conflict with an earlier intent, or name an
// unknown recipient are recorded as malformed, and nil is returned when the
// record is stored. Every other acceptor error is returned to the caller.
func (consumer *IntentConsumer) handleMessage(ctx context.Context, message redis.XMessage) error {
	rawFields := cloneRawFields(message.Values)

	intent, err := intentstream.DecodeIntent(rawFields)
	if err != nil {
		return consumer.recordMalformed(
			ctx,
			message.ID,
			rawFields,
			intentstream.ClassifyDecodeError(err),
			err,
		)
	}

	result, err := consumer.acceptor.Execute(ctx, acceptintent.AcceptInput{
		NotificationID: message.ID,
		Intent:         intent,
	})
	switch {
	case err == nil:
		logArgs := []any{
			"stream_entry_id", message.ID,
			"notification_id", message.ID,
		}
		logArgs = append(logArgs, logging.IntentAttrs(intent)...)
		logArgs = append(logArgs, "outcome", string(result.Outcome))
		logArgs = append(logArgs, logging.TraceAttrsFromContext(ctx)...)
		consumer.logger.Info("notification intent handled", logArgs...)
		return nil
	case errors.Is(err, acceptintent.ErrConflict):
		return consumer.recordMalformed(ctx, message.ID, rawFields, malformedintent.FailureCodeIdempotencyConflict, err)
	case errors.Is(err, acceptintent.ErrRecipientNotFound):
		return consumer.recordMalformed(ctx, message.ID, rawFields, malformedintent.FailureCodeRecipientNotFound, err)
	default:
		// acceptintent.ErrServiceUnavailable and any unclassified error are
		// returned rather than recorded as malformed, so Run stops before
		// saving the offset for this entry.
		return fmt.Errorf("handle intent %q: %w", message.ID, err)
	}
}

// recordMalformed persists one malformed-intent entry, reports it to the
// optional telemetry sink, and logs the rejection at warn level.
//
// It returns an error only when the recorder fails. A nil cause yields an
// empty failure message.
func (consumer *IntentConsumer) recordMalformed(
	ctx context.Context,
	streamEntryID string,
	rawFields map[string]any,
	failureCode malformedintent.FailureCode,
	cause error,
) error {
	// Guard against a nil cause so a future call site cannot trigger a nil
	// dereference while building the record.
	failureMessage := ""
	if cause != nil {
		failureMessage = strings.TrimSpace(cause.Error())
	}

	entry := malformedintent.Entry{
		StreamEntryID:    streamEntryID,
		NotificationType: optionalRawString(rawFields, "notification_type"),
		Producer:         optionalRawString(rawFields, "producer"),
		IdempotencyKey:   optionalRawString(rawFields, "idempotency_key"),
		FailureCode:      failureCode,
		FailureMessage:   failureMessage,
		RawFields:        cloneRawFields(rawFields),
		RecordedAt:       consumer.clock.Now().UTC().Truncate(time.Millisecond),
	}
	if err := consumer.malformedRecorder.Record(ctx, entry); err != nil {
		return fmt.Errorf("record malformed intent %q: %w", streamEntryID, err)
	}

	if consumer.telemetry != nil {
		consumer.telemetry.RecordMalformedIntent(ctx, string(failureCode), entry.NotificationType, entry.Producer)
	}

	logArgs := []any{
		"stream_entry_id", streamEntryID,
		"notification_type", entry.NotificationType,
		"producer", entry.Producer,
		"idempotency_key", entry.IdempotencyKey,
		"failure_code", string(entry.FailureCode),
		"failure_message", entry.FailureMessage,
	}
	if traceID := optionalRawString(rawFields, "trace_id"); traceID != "" {
		logArgs = append(logArgs, "trace_id", traceID)
	}
	logArgs = append(logArgs, logging.TraceAttrsFromContext(ctx)...)
	consumer.logger.Warn("notification intent rejected", logArgs...)

	return nil
}

// cloneRawFields returns a copy of values in which nested map[string]any and
// []any values are copied recursively. A nil input yields an empty, non-nil
// map.
func cloneRawFields(values map[string]any) map[string]any {
	if values == nil {
		return map[string]any{}
	}

	cloned := make(map[string]any, len(values))
	for key, value := range values {
		cloned[key] = cloneRawValue(value)
	}
	return cloned
}

// cloneRawValue copies value recursively when it is a map[string]any or a
// []any; any other value is returned as is.
func cloneRawValue(value any) any {
	switch typed := value.(type) {
	case map[string]any:
		return cloneRawFields(typed)
	case []any:
		cloned := make([]any, len(typed))
		for index, item := range typed {
			cloned[index] = cloneRawValue(item)
		}
		return cloned
	default:
		return typed
	}
}

// optionalRawString returns values[key] as a string when it holds a string or
// a []byte, and "" when the key is absent or holds any other type.
func optionalRawString(values map[string]any, key string) string {
	raw, ok := values[key]
	if !ok {
		return ""
	}

	switch typed := raw.(type) {
	case string:
		return typed
	case []byte:
		return string(typed)
	default:
		return ""
	}
}

// Shutdown stops the intent consumer within ctx. The consumer relies on
// context cancellation and a bounded block timeout, so it has no dedicated
// resources to release here.
//
// It returns an error only when ctx is nil; a nil consumer is accepted.
func (consumer *IntentConsumer) Shutdown(ctx context.Context) error {
	if ctx == nil {
		return errors.New("shutdown intent consumer: nil context")
	}
	return nil
}