// Package userlifecycle implements the Redis-Streams consumer for the
// `user:lifecycle_events` topic. It wires the consumer behind the
// `ports.UserLifecycleConsumer` interface so the cascade worker can
// register a handler without depending on Redis directly.
//
// The consumer mirrors the reliability shape used by `worker/gmevents`:
// XREAD blocks for `BlockTimeout`, decoded events are dispatched to the
// registered handler, and the persisted offset advances only after the
// handler returns nil. Decoding errors and unknown event kinds are
// logged and absorbed (the offset advances) so a malformed entry never
// stalls the stream. Handler errors hold the offset on the current
// entry so the next loop iteration retries.
package userlifecycle

import (
	"context"
	"errors"
	"fmt"
	"log/slog"
	"strconv"
	"strings"
	"sync"
	"time"

	"galaxy/lobby/internal/ports"

	"github.com/redis/go-redis/v9"
)

// streamOffsetLabel identifies the user-lifecycle consumer in the
// stream-offset store. It stays stable when the underlying stream key
// is renamed via configuration.
const streamOffsetLabel = "user_lifecycle"

// Config groups the dependencies used by Consumer. NewConsumer rejects a
// Config whose Client or OffsetStore is nil, whose Stream is blank, or
// whose BlockTimeout is not positive; Clock and Logger are optional.
type Config struct {
	// Client provides XREAD access to the user-lifecycle stream.
	// Required.
	Client *redis.Client

	// Stream stores the Redis Streams key consumed by the worker. The
	// production default is `user:lifecycle_events`. Required; a value
	// that is empty after trimming whitespace is rejected.
	Stream string

	// BlockTimeout bounds the blocking XREAD window. Required; must be
	// greater than zero.
	BlockTimeout time.Duration

	// OffsetStore persists the last successfully processed entry id under
	// the `user_lifecycle` label. Required.
	OffsetStore ports.StreamOffsetStore

	// Clock supplies the wall-clock used for log timestamps. Defaults to
	// time.Now when nil.
	// NOTE(review): within this file the clock is stored on Consumer but
	// never invoked — confirm whether it is still needed.
	Clock func() time.Time

	// Logger receives structured worker-level events. Defaults to
	// slog.Default when nil.
	Logger *slog.Logger
}

// Consumer drives the user-lifecycle processing loop.
type Consumer struct { client *redis.Client stream string blockTimeout time.Duration offsetStore ports.StreamOffsetStore clock func() time.Time logger *slog.Logger mu sync.Mutex handler ports.UserLifecycleHandler } // NewConsumer constructs one Consumer from cfg. func NewConsumer(cfg Config) (*Consumer, error) { switch { case cfg.Client == nil: return nil, errors.New("new user lifecycle consumer: nil redis client") case strings.TrimSpace(cfg.Stream) == "": return nil, errors.New("new user lifecycle consumer: stream must not be empty") case cfg.BlockTimeout <= 0: return nil, errors.New("new user lifecycle consumer: block timeout must be positive") case cfg.OffsetStore == nil: return nil, errors.New("new user lifecycle consumer: nil offset store") } clock := cfg.Clock if clock == nil { clock = time.Now } logger := cfg.Logger if logger == nil { logger = slog.Default() } return &Consumer{ client: cfg.Client, stream: cfg.Stream, blockTimeout: cfg.BlockTimeout, offsetStore: cfg.OffsetStore, clock: clock, logger: logger.With("worker", "lobby.userlifecycle", "stream", cfg.Stream), }, nil } // OnEvent installs handler as the sole dispatcher for decoded events. // A second call replaces the previous handler. Calling OnEvent // concurrently with Run is safe. func (consumer *Consumer) OnEvent(handler ports.UserLifecycleHandler) { if consumer == nil { return } consumer.mu.Lock() consumer.handler = handler consumer.mu.Unlock() } // Run drives the XREAD loop until ctx is cancelled. The offset advances // only after a successful handler return so a transient failure replays // the same entry on the next iteration. 
func (consumer *Consumer) Run(ctx context.Context) error { if consumer == nil || consumer.client == nil { return errors.New("run user lifecycle consumer: nil consumer") } if ctx == nil { return errors.New("run user lifecycle consumer: nil context") } if err := ctx.Err(); err != nil { return err } lastID, found, err := consumer.offsetStore.Load(ctx, streamOffsetLabel) if err != nil { return fmt.Errorf("run user lifecycle consumer: load offset: %w", err) } if !found { lastID = "0-0" } consumer.logger.Info("user lifecycle consumer started", "block_timeout", consumer.blockTimeout.String(), "start_entry_id", lastID, ) defer consumer.logger.Info("user lifecycle consumer stopped") for { streams, err := consumer.client.XRead(ctx, &redis.XReadArgs{ Streams: []string{consumer.stream, lastID}, Count: 1, Block: consumer.blockTimeout, }).Result() switch { case err == nil: for _, stream := range streams { for _, message := range stream.Messages { if !consumer.handleMessage(ctx, message) { continue } if err := consumer.offsetStore.Save(ctx, streamOffsetLabel, message.ID); err != nil { return fmt.Errorf("run user lifecycle consumer: save offset: %w", err) } lastID = message.ID } } case errors.Is(err, redis.Nil): continue case ctx.Err() != nil && (errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) || errors.Is(err, redis.ErrClosed)): return ctx.Err() case errors.Is(err, context.Canceled), errors.Is(err, context.DeadlineExceeded), errors.Is(err, redis.ErrClosed): return fmt.Errorf("run user lifecycle consumer: %w", err) default: return fmt.Errorf("run user lifecycle consumer: %w", err) } } } // Shutdown is a no-op; the consumer relies on context cancellation. func (consumer *Consumer) Shutdown(ctx context.Context) error { if ctx == nil { return errors.New("shutdown user lifecycle consumer: nil context") } return nil } // handleMessage decodes one Redis Stream entry and dispatches it to the // registered handler. 
It returns true when the offset is allowed to // advance, false when the consumer must hold the offset and retry on // the next iteration. Decoding errors and unknown event kinds advance // the offset so a malformed entry never stalls the stream. func (consumer *Consumer) handleMessage(ctx context.Context, message redis.XMessage) bool { event, err := decodeUserLifecycleEvent(message) if err != nil { consumer.logger.WarnContext(ctx, "decode user lifecycle event", "stream_entry_id", message.ID, "err", err.Error(), ) return true } if !event.EventType.IsKnown() { consumer.logger.InfoContext(ctx, "unknown user lifecycle event type", "stream_entry_id", message.ID, "event_type", event.EventType, ) return true } consumer.mu.Lock() handler := consumer.handler consumer.mu.Unlock() if handler == nil { consumer.logger.WarnContext(ctx, "no user lifecycle handler registered; entry dropped", "stream_entry_id", message.ID, ) return true } if err := handler(ctx, event); err != nil { consumer.logger.WarnContext(ctx, "handle user lifecycle event", "stream_entry_id", message.ID, "event_type", event.EventType, "user_id", event.UserID, "err", err.Error(), ) return false } consumer.logger.InfoContext(ctx, "user lifecycle event processed", "stream_entry_id", message.ID, "event_type", event.EventType, "user_id", event.UserID, ) return true } func decodeUserLifecycleEvent(message redis.XMessage) (ports.UserLifecycleEvent, error) { eventType := optionalString(message.Values, "event_type") userID := optionalString(message.Values, "user_id") occurredAtRaw := optionalString(message.Values, "occurred_at_ms") if strings.TrimSpace(eventType) == "" { return ports.UserLifecycleEvent{}, errors.New("missing event_type") } if strings.TrimSpace(userID) == "" { return ports.UserLifecycleEvent{}, errors.New("missing user_id") } if strings.TrimSpace(occurredAtRaw) == "" { return ports.UserLifecycleEvent{}, errors.New("missing occurred_at_ms") } ms, err := strconv.ParseInt(occurredAtRaw, 10, 64) if err != 
nil { return ports.UserLifecycleEvent{}, fmt.Errorf("invalid occurred_at_ms: %w", err) } if ms <= 0 { return ports.UserLifecycleEvent{}, fmt.Errorf("invalid occurred_at_ms: must be positive") } return ports.UserLifecycleEvent{ EntryID: message.ID, EventType: ports.UserLifecycleEventType(eventType), UserID: strings.TrimSpace(userID), OccurredAt: time.UnixMilli(ms).UTC(), Source: optionalString(message.Values, "source"), ActorType: optionalString(message.Values, "actor_type"), ActorID: optionalString(message.Values, "actor_id"), ReasonCode: optionalString(message.Values, "reason_code"), TraceID: optionalString(message.Values, "trace_id"), }, nil } func optionalString(values map[string]any, key string) string { raw, ok := values[key] if !ok { return "" } switch typed := raw.(type) { case string: return typed case []byte: return string(typed) default: return "" } } // Compile-time assertion: Consumer satisfies the port interface. var _ ports.UserLifecycleConsumer = (*Consumer)(nil)