// Package gmevents implements the worker that consumes Game Master
// runtime events from the `gm:lobby_events` Redis Stream and drives the
// lobby-side surface area: keeping the denormalized runtime snapshot
// current, feeding the per-user stats aggregate, and dispatching
// capability evaluation at game finish.
//
// The consumer recognizes two event kinds documented in
// lobby/README.md §Runtime Snapshot:
//
//   - `runtime_snapshot_update` — applied to the game record snapshot
//     (current_turn, runtime_status, engine_health_summary) and to the
//     per-user stats aggregate (initial fields frozen on the first
//     observation, max fields raised by per-component maximum on every
//     event);
//   - `game_finished` — the same snapshot update plus a status
//     transition to `finished` and a capability-evaluation hand-off.
//
// Replay protection rests on three ingredients:
//
//  1. Status transitions (running/paused → finished) use the existing
//     ports.GameStore CAS guard, so a replayed game_finished finds the
//     game already in `finished` and the second pass is a no-op for the
//     status field.
//  2. Snapshot updates use a fresh `At` timestamp on each call but
//     overwrite a deterministic snapshot blob, so re-applying an older
//     event does not corrupt the record.
//  3. Capability evaluation uses ports.EvaluationGuardStore to recognise
//     replay and skip mutations.
//
// Stream-offset bookkeeping advances after each successfully handled
// event. A `game_finished` event that fails capability evaluation
// (transient store error) leaves the offset behind so the next loop
// iteration retries the same entry.
package gmevents

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"log/slog"
	"strconv"
	"strings"
	"time"

	"galaxy/lobby/internal/domain/common"
	"galaxy/lobby/internal/domain/game"
	"galaxy/lobby/internal/logging"
	"galaxy/lobby/internal/ports"
	"galaxy/lobby/internal/telemetry"

	"github.com/redis/go-redis/v9"
)

// streamOffsetLabel is the key under which the gmevents consumer records
// its position in the stream offset store. It is a fixed constant, so it
// does not change when the stream key itself is reconfigured.
const streamOffsetLabel = "gm_lobby_events"

// Values of the `kind` field on GM stream entries that this consumer
// handles.
const (
	kindRuntimeSnapshotUpdate = "runtime_snapshot_update"
	kindGameFinished          = "game_finished"
)

// CapabilityEvaluator is the slice of the capability evaluation service
// that the gmevents consumer depends on. It is declared on the consumer
// side, rather than in the service package, to avoid an import cycle.
type CapabilityEvaluator interface {
	Evaluate(ctx context.Context, gameID common.GameID, finishedAt time.Time) error
}

// Config carries everything NewConsumer needs to build a Consumer.
type Config struct {
	// Client is the Redis client used to XREAD the GM events stream.
	// Required.
	Client *redis.Client

	// Stream is the Redis Streams key the worker reads from. Must not be
	// blank.
	Stream string

	// BlockTimeout is the blocking window passed to each XREAD call. Must
	// be positive.
	BlockTimeout time.Duration

	// Games persists runtime snapshot updates and the game-finished
	// status transition. Required.
	Games ports.GameStore

	// Stats persists the per-user stats aggregate fed by every snapshot
	// event. Required.
	Stats ports.GameTurnStatsStore

	// Capability runs capability evaluation once a `game_finished` event
	// has been applied. Required.
	Capability CapabilityEvaluator

	// OffsetStore persists the id of the last successfully processed
	// entry. Required.
	OffsetStore ports.StreamOffsetStore

	// Clock supplies the wall-clock used for snapshot UpdatedAt and for
	// status transition timestamps when the GM event does not carry one.
	// time.Now is used when nil.
	Clock func() time.Time

	// Logger receives structured worker-level events. slog.Default is
	// used when nil.
	Logger *slog.Logger

	// Telemetry records the `lobby.game.transitions` counter on each
	// successful game-finished transition. Optional; nil disables
	// metric emission.
	Telemetry *telemetry.Runtime
}

// Consumer is the gmevents worker: it reads the GM stream, applies each
// event, and tracks the stream offset.
type Consumer struct {
	client       *redis.Client
	stream       string
	blockTimeout time.Duration

	games       ports.GameStore
	stats       ports.GameTurnStatsStore
	capability  CapabilityEvaluator
	offsetStore ports.StreamOffsetStore

	clock     func() time.Time
	logger    *slog.Logger
	telemetry *telemetry.Runtime
}

// NewConsumer validates cfg and builds a Consumer from it.
//
// It returns an error when a required dependency is nil, when Stream is
// blank, or when BlockTimeout is not positive. A nil Clock falls back to
// time.Now and a nil Logger to slog.Default; the stored logger is tagged
// with the worker name and the stream key. Telemetry is stored as given,
// nil included.
func NewConsumer(cfg Config) (*Consumer, error) {
	// Checks run in a fixed order so the first problem found is the one
	// reported.
	if cfg.Client == nil {
		return nil, errors.New("new gm events consumer: nil redis client")
	}
	if strings.TrimSpace(cfg.Stream) == "" {
		return nil, errors.New("new gm events consumer: stream must not be empty")
	}
	if cfg.BlockTimeout <= 0 {
		return nil, errors.New("new gm events consumer: block timeout must be positive")
	}
	if cfg.Games == nil {
		return nil, errors.New("new gm events consumer: nil game store")
	}
	if cfg.Stats == nil {
		return nil, errors.New("new gm events consumer: nil game turn stats store")
	}
	if cfg.Capability == nil {
		return nil, errors.New("new gm events consumer: nil capability evaluator")
	}
	if cfg.OffsetStore == nil {
		return nil, errors.New("new gm events consumer: nil offset store")
	}

	now := cfg.Clock
	if now == nil {
		now = time.Now
	}

	base := cfg.Logger
	if base == nil {
		base = slog.Default()
	}

	worker := &Consumer{
		client:       cfg.Client,
		stream:       cfg.Stream,
		blockTimeout: cfg.BlockTimeout,
		games:        cfg.Games,
		stats:        cfg.Stats,
		capability:   cfg.Capability,
		offsetStore:  cfg.OffsetStore,
		clock:        now,
		logger:       base.With("worker", "lobby.gmevents", "stream", cfg.Stream),
		telemetry:    cfg.Telemetry,
	}
	return worker, nil
}

// Run drives the XREAD loop until ctx is cancelled. Per-message outcomes
// are absorbed by HandleMessage; the loop only exits on context
// cancellation or a fatal Redis error.
The offset advances only after a // successful HandleMessage call so capability evaluation failure replays // the same entry on the next iteration. func (consumer *Consumer) Run(ctx context.Context) error { if consumer == nil || consumer.client == nil { return errors.New("run gm events consumer: nil consumer") } if ctx == nil { return errors.New("run gm events consumer: nil context") } if err := ctx.Err(); err != nil { return err } lastID, found, err := consumer.offsetStore.Load(ctx, streamOffsetLabel) if err != nil { return fmt.Errorf("run gm events consumer: load offset: %w", err) } if !found { lastID = "0-0" } consumer.logger.Info("gm events consumer started", "block_timeout", consumer.blockTimeout.String(), "start_entry_id", lastID, ) defer consumer.logger.Info("gm events consumer stopped") for { streams, err := consumer.client.XRead(ctx, &redis.XReadArgs{ Streams: []string{consumer.stream, lastID}, Count: 1, Block: consumer.blockTimeout, }).Result() switch { case err == nil: for _, stream := range streams { for _, message := range stream.Messages { if !consumer.HandleMessage(ctx, message) { continue } if err := consumer.offsetStore.Save(ctx, streamOffsetLabel, message.ID); err != nil { return fmt.Errorf("run gm events consumer: save offset: %w", err) } lastID = message.ID } } case errors.Is(err, redis.Nil): continue case ctx.Err() != nil && (errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) || errors.Is(err, redis.ErrClosed)): return ctx.Err() case errors.Is(err, context.Canceled), errors.Is(err, context.DeadlineExceeded), errors.Is(err, redis.ErrClosed): return fmt.Errorf("run gm events consumer: %w", err) default: return fmt.Errorf("run gm events consumer: %w", err) } } } // Shutdown is a no-op; the consumer relies on context cancellation. 
func (consumer *Consumer) Shutdown(ctx context.Context) error { if ctx == nil { return errors.New("shutdown gm events consumer: nil context") } return nil } // HandleMessage processes one Redis Stream message and reports whether // the offset is allowed to advance. Decoding errors and logical replays // return true (the message is consumed and the offset advances). A // `game_finished` whose capability evaluation fails returns false so the // caller leaves the offset behind and the next iteration retries. // // Exported so tests can drive the consumer deterministically without // spinning up a real XREAD loop. func (consumer *Consumer) HandleMessage(ctx context.Context, message redis.XMessage) bool { if consumer == nil { return false } event, err := decodeGMEvent(message) if err != nil { consumer.logger.WarnContext(ctx, "decode gm event", "stream_entry_id", message.ID, "err", err.Error(), ) return true } switch event.Kind { case kindRuntimeSnapshotUpdate: consumer.handleSnapshotUpdate(ctx, message.ID, event) return true case kindGameFinished: return consumer.handleGameFinished(ctx, message.ID, event) default: consumer.logger.WarnContext(ctx, "unknown gm event kind", "stream_entry_id", message.ID, "game_id", event.GameID.String(), "kind", event.Kind, ) return true } } // handleSnapshotUpdate applies the snapshot to the game record and the // stats aggregate. Errors are logged and absorbed: the message advances // in either case so a transient Redis hiccup does not stall the stream. // CAS-protected mutations naturally absorb replays without further // bookkeeping. 
func (consumer *Consumer) handleSnapshotUpdate(ctx context.Context, entryID string, event gmEvent) { at := consumer.clock().UTC() if err := consumer.games.UpdateRuntimeSnapshot(ctx, ports.UpdateRuntimeSnapshotInput{ GameID: event.GameID, Snapshot: event.Snapshot, At: at, }); err != nil && !errors.Is(err, game.ErrNotFound) { consumer.logger.WarnContext(ctx, "apply runtime snapshot", "stream_entry_id", entryID, "game_id", event.GameID.String(), "err", err.Error(), ) } if len(event.PlayerStats) == 0 { return } if err := consumer.stats.SaveInitial(ctx, event.GameID, event.PlayerStats); err != nil { consumer.logger.WarnContext(ctx, "save initial player stats", "stream_entry_id", entryID, "game_id", event.GameID.String(), "err", err.Error(), ) } if err := consumer.stats.UpdateMax(ctx, event.GameID, event.PlayerStats); err != nil { consumer.logger.WarnContext(ctx, "update max player stats", "stream_entry_id", entryID, "game_id", event.GameID.String(), "err", err.Error(), ) } } // handleGameFinished applies the final snapshot, transitions the game to // `finished` (or absorbs a replay where the game is already finished), // then drives capability evaluation. The function returns false to hold // the stream offset when a recoverable failure prevents capability // evaluation; the next loop iteration retries the same entry. 
func (consumer *Consumer) handleGameFinished(ctx context.Context, entryID string, event gmEvent) bool { at := consumer.clock().UTC() finishedAt := event.FinishedAt if finishedAt.IsZero() { finishedAt = at } if err := consumer.games.UpdateRuntimeSnapshot(ctx, ports.UpdateRuntimeSnapshotInput{ GameID: event.GameID, Snapshot: event.Snapshot, At: at, }); err != nil && !errors.Is(err, game.ErrNotFound) { consumer.logger.WarnContext(ctx, "apply final runtime snapshot", "stream_entry_id", entryID, "game_id", event.GameID.String(), "err", err.Error(), ) } if len(event.PlayerStats) > 0 { if err := consumer.stats.SaveInitial(ctx, event.GameID, event.PlayerStats); err != nil { consumer.logger.WarnContext(ctx, "save initial player stats on finish", "stream_entry_id", entryID, "game_id", event.GameID.String(), "err", err.Error(), ) } if err := consumer.stats.UpdateMax(ctx, event.GameID, event.PlayerStats); err != nil { consumer.logger.WarnContext(ctx, "update max player stats on finish", "stream_entry_id", entryID, "game_id", event.GameID.String(), "err", err.Error(), ) } } record, err := consumer.games.Get(ctx, event.GameID) switch { case err == nil: case errors.Is(err, game.ErrNotFound): consumer.logger.WarnContext(ctx, "game finished for unknown game id", "stream_entry_id", entryID, "game_id", event.GameID.String(), ) return true default: consumer.logger.WarnContext(ctx, "load game for finish", "stream_entry_id", entryID, "game_id", event.GameID.String(), "err", err.Error(), ) return false } switch record.Status { case game.StatusRunning, game.StatusPaused: if err := consumer.games.UpdateStatus(ctx, ports.UpdateStatusInput{ GameID: record.GameID, ExpectedFrom: record.Status, To: game.StatusFinished, Trigger: game.TriggerRuntimeEvent, At: finishedAt, }); err != nil { switch { case errors.Is(err, game.ErrConflict), errors.Is(err, game.ErrInvalidTransition): consumer.logger.InfoContext(ctx, "game finished transition absorbed by status conflict", "stream_entry_id", entryID, 
"game_id", record.GameID.String(), ) default: consumer.logger.WarnContext(ctx, "transition game to finished", "stream_entry_id", entryID, "game_id", record.GameID.String(), "err", err.Error(), ) return false } } else { consumer.telemetry.RecordGameTransition(ctx, string(record.Status), string(game.StatusFinished), string(game.TriggerRuntimeEvent), ) logArgs := []any{ "stream_entry_id", entryID, "game_id", record.GameID.String(), "from_status", string(record.Status), "to_status", string(game.StatusFinished), "trigger", string(game.TriggerRuntimeEvent), } logArgs = append(logArgs, logging.ContextAttrs(ctx)...) consumer.logger.InfoContext(ctx, "game finished", logArgs...) } case game.StatusFinished: consumer.logger.InfoContext(ctx, "game finished event observed for already finished game", "stream_entry_id", entryID, "game_id", record.GameID.String(), ) default: consumer.logger.InfoContext(ctx, "game finished event ignored for unexpected status", "stream_entry_id", entryID, "game_id", record.GameID.String(), "current_status", string(record.Status), ) return true } refreshed, err := consumer.games.Get(ctx, event.GameID) if err != nil { consumer.logger.WarnContext(ctx, "reload finished game record", "stream_entry_id", entryID, "game_id", event.GameID.String(), "err", err.Error(), ) return false } if refreshed.Status != game.StatusFinished { consumer.logger.WarnContext(ctx, "game record not finished after transition attempt", "stream_entry_id", entryID, "game_id", refreshed.GameID.String(), "current_status", string(refreshed.Status), ) return true } if refreshed.FinishedAt == nil { consumer.logger.WarnContext(ctx, "finished game missing finished_at", "stream_entry_id", entryID, "game_id", refreshed.GameID.String(), ) return true } if err := consumer.capability.Evaluate(ctx, refreshed.GameID, *refreshed.FinishedAt); err != nil { consumer.logger.WarnContext(ctx, "capability evaluation failed", "stream_entry_id", entryID, "game_id", refreshed.GameID.String(), "err", 
err.Error(), ) return false } consumer.logger.InfoContext(ctx, "game finished processed", "stream_entry_id", entryID, "game_id", refreshed.GameID.String(), ) return true } // gmEvent stores the decoded shape of one `gm:lobby_events` entry shared // by the snapshot and finish handlers. type gmEvent struct { Kind string GameID common.GameID Snapshot game.RuntimeSnapshot PlayerStats []ports.PlayerObservedStats FinishedAt time.Time } func decodeGMEvent(message redis.XMessage) (gmEvent, error) { kind := optionalString(message.Values, "kind") if kind != kindRuntimeSnapshotUpdate && kind != kindGameFinished { return gmEvent{}, fmt.Errorf("unsupported event kind %q", kind) } gameIDRaw := optionalString(message.Values, "game_id") if strings.TrimSpace(gameIDRaw) == "" { return gmEvent{}, errors.New("missing game_id") } gameID := common.GameID(gameIDRaw) if err := gameID.Validate(); err != nil { return gmEvent{}, fmt.Errorf("invalid game_id: %w", err) } snapshot := game.RuntimeSnapshot{ RuntimeStatus: optionalString(message.Values, "runtime_status"), EngineHealthSummary: optionalString(message.Values, "engine_health_summary"), } if turnRaw := optionalString(message.Values, "current_turn"); turnRaw != "" { parsed, err := strconv.Atoi(turnRaw) if err != nil { return gmEvent{}, fmt.Errorf("invalid current_turn: %w", err) } if parsed < 0 { return gmEvent{}, fmt.Errorf("invalid current_turn: must not be negative") } snapshot.CurrentTurn = parsed } playerStats, err := decodePlayerStats(optionalString(message.Values, "player_turn_stats")) if err != nil { return gmEvent{}, fmt.Errorf("invalid player_turn_stats: %w", err) } var finishedAt time.Time if raw := optionalString(message.Values, "finished_at_ms"); raw != "" { ms, err := strconv.ParseInt(raw, 10, 64) if err != nil { return gmEvent{}, fmt.Errorf("invalid finished_at_ms: %w", err) } if ms <= 0 { return gmEvent{}, fmt.Errorf("invalid finished_at_ms: must be positive") } finishedAt = time.UnixMilli(ms).UTC() } return gmEvent{ 
Kind: kind, GameID: gameID, Snapshot: snapshot, PlayerStats: playerStats, FinishedAt: finishedAt, }, nil } // playerStatsLine mirrors the JSON shape of one `player_turn_stats` // element on the GM stream. type playerStatsLine struct { UserID string `json:"user_id"` Planets int64 `json:"planets"` Population int64 `json:"population"` ShipsBuilt int64 `json:"ships_built"` } func decodePlayerStats(payload string) ([]ports.PlayerObservedStats, error) { if strings.TrimSpace(payload) == "" { return nil, nil } var lines []playerStatsLine if err := json.Unmarshal([]byte(payload), &lines); err != nil { return nil, err } stats := make([]ports.PlayerObservedStats, 0, len(lines)) for _, line := range lines { entry := ports.PlayerObservedStats{ UserID: strings.TrimSpace(line.UserID), Planets: line.Planets, Population: line.Population, ShipsBuilt: line.ShipsBuilt, } if err := entry.Validate(); err != nil { return nil, err } stats = append(stats, entry) } return stats, nil } func optionalString(values map[string]any, key string) string { raw, ok := values[key] if !ok { return "" } switch typed := raw.(type) { case string: return typed case []byte: return string(typed) default: return "" } }