// Package stopjobsconsumer drives the asynchronous half of the // Lobby ↔ Runtime Manager stop contract. The consumer XREADs from // `runtime:stop_jobs` (produced by Lobby), decodes the envelope frozen // in `rtmanager/api/runtime-jobs-asyncapi.yaml`, calls the production // stop orchestrator, and publishes one `runtime:job_results` outcome // per consumed envelope. // // Replay safety: the stop service surfaces an already-stopped or // already-removed record as `Outcome=success` with // `error_code=replay_no_op`. The consumer copies the result fields // into the wire payload verbatim. Per-message decode and publish // errors are logged and absorbed; the offset advances unconditionally // so a single poison message cannot pin the loop. Design rationale is // captured in `rtmanager/docs/workers.md`. package stopjobsconsumer import ( "context" "errors" "fmt" "log/slog" "strconv" "strings" "time" "galaxy/rtmanager/internal/domain/operation" "galaxy/rtmanager/internal/logging" "galaxy/rtmanager/internal/ports" "galaxy/rtmanager/internal/service/stopruntime" "github.com/redis/go-redis/v9" ) // streamOffsetLabel identifies the stop-jobs consumer in the stream // offset store. Matches the convention from // `rtmanager/README.md §Persistence Layout > Redis runtime-coordination state`. const streamOffsetLabel = "stopjobs" // Wire field names of the `RuntimeStopJob` payload. Frozen by // `rtmanager/api/runtime-jobs-asyncapi.yaml`. const ( fieldGameID = "game_id" fieldReason = "reason" fieldRequestedAtMS = "requested_at_ms" ) // StopService is the narrow surface the consumer needs from the stop // orchestrator. The concrete `*stopruntime.Service` satisfies this // interface and is wired in production. type StopService interface { Handle(ctx context.Context, input stopruntime.Input) (stopruntime.Result, error) } // Config groups the dependencies required to construct a Consumer. type Config struct { // Client provides XREAD access to the stop-jobs stream. Client *redis.Client // Stream stores the Redis Streams key consumed by the worker. Stream string // BlockTimeout bounds the blocking XREAD window. BlockTimeout time.Duration // StopService executes the stop lifecycle for each decoded envelope. StopService StopService // JobResults publishes one outcome entry per processed envelope. JobResults ports.JobResultPublisher // OffsetStore persists the last successfully processed entry id so // the consumer survives restarts without replaying processed // envelopes. OffsetStore ports.StreamOffsetStore // Logger receives structured worker-level events. Defaults to // `slog.Default` when nil. Logger *slog.Logger } // Consumer drives the stop-jobs processing loop. type Consumer struct { client *redis.Client stream string blockTimeout time.Duration stopService StopService jobResults ports.JobResultPublisher offsetStore ports.StreamOffsetStore logger *slog.Logger } // NewConsumer constructs one Consumer from cfg. func NewConsumer(cfg Config) (*Consumer, error) { switch { case cfg.Client == nil: return nil, errors.New("new stop jobs consumer: nil redis client") case strings.TrimSpace(cfg.Stream) == "": return nil, errors.New("new stop jobs consumer: stream must not be empty") case cfg.BlockTimeout <= 0: return nil, errors.New("new stop jobs consumer: block timeout must be positive") case cfg.StopService == nil: return nil, errors.New("new stop jobs consumer: nil stop service") case cfg.JobResults == nil: return nil, errors.New("new stop jobs consumer: nil job results publisher") case cfg.OffsetStore == nil: return nil, errors.New("new stop jobs consumer: nil offset store") } logger := cfg.Logger if logger == nil { logger = slog.Default() } return &Consumer{ client: cfg.Client, stream: cfg.Stream, blockTimeout: cfg.BlockTimeout, stopService: cfg.StopService, jobResults: cfg.JobResults, offsetStore: cfg.OffsetStore, logger: logger.With("worker", "rtmanager.stopjobs", "stream", cfg.Stream), }, nil } // Run drives the XREAD loop until ctx is cancelled. func (consumer *Consumer) Run(ctx context.Context) error { if consumer == nil || consumer.client == nil { return errors.New("run stop jobs consumer: nil consumer") } if ctx == nil { return errors.New("run stop jobs consumer: nil context") } if err := ctx.Err(); err != nil { return err } lastID, found, err := consumer.offsetStore.Load(ctx, streamOffsetLabel) if err != nil { return fmt.Errorf("run stop jobs consumer: load offset: %w", err) } if !found { lastID = "0-0" } consumer.logger.Info("stop jobs consumer started", "block_timeout", consumer.blockTimeout.String(), "start_entry_id", lastID, ) defer consumer.logger.Info("stop jobs consumer stopped") for { streams, err := consumer.client.XRead(ctx, &redis.XReadArgs{ Streams: []string{consumer.stream, lastID}, Count: 1, Block: consumer.blockTimeout, }).Result() switch { case err == nil: for _, stream := range streams { for _, message := range stream.Messages { consumer.HandleMessage(ctx, message) if err := consumer.offsetStore.Save(ctx, streamOffsetLabel, message.ID); err != nil { return fmt.Errorf("run stop jobs consumer: save offset: %w", err) } lastID = message.ID } } case errors.Is(err, redis.Nil): continue case ctx.Err() != nil && (errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) || errors.Is(err, redis.ErrClosed)): return ctx.Err() case errors.Is(err, context.Canceled), errors.Is(err, context.DeadlineExceeded), errors.Is(err, redis.ErrClosed): return fmt.Errorf("run stop jobs consumer: %w", err) default: return fmt.Errorf("run stop jobs consumer: %w", err) } } } // Shutdown is a no-op; the consumer relies on context cancellation. func (consumer *Consumer) Shutdown(ctx context.Context) error { if ctx == nil { return errors.New("shutdown stop jobs consumer: nil context") } return nil } // HandleMessage processes one Redis Stream message. Exported so tests // can drive the consumer deterministically without spinning up a real // XREAD loop. func (consumer *Consumer) HandleMessage(ctx context.Context, message redis.XMessage) { if consumer == nil { return } envelope, err := decodeStopJob(message) if err != nil { consumer.logger.WarnContext(ctx, "decode stop job", "stream_entry_id", message.ID, "err", err.Error(), ) return } input := stopruntime.Input{ GameID: envelope.GameID, Reason: envelope.Reason, OpSource: operation.OpSourceLobbyStream, SourceRef: message.ID, } result, err := consumer.stopService.Handle(ctx, input) if err != nil { consumer.logger.ErrorContext(ctx, "stop service returned go-level error", "stream_entry_id", message.ID, "game_id", envelope.GameID, "err", err.Error(), ) return } jobResult := buildJobResult(envelope.GameID, result) if err := consumer.jobResults.Publish(ctx, jobResult); err != nil { consumer.logger.ErrorContext(ctx, "publish job result", "stream_entry_id", message.ID, "game_id", envelope.GameID, "outcome", jobResult.Outcome, "error_code", jobResult.ErrorCode, "err", err.Error(), ) return } logArgs := []any{ "stream_entry_id", message.ID, "game_id", envelope.GameID, "reason", string(envelope.Reason), "outcome", jobResult.Outcome, "error_code", jobResult.ErrorCode, "requested_at_ms", envelope.RequestedAtMS, } logArgs = append(logArgs, logging.ContextAttrs(ctx)...) consumer.logger.InfoContext(ctx, "stop job processed", logArgs...) } // stopJobEnvelope stores the decoded shape of one `runtime:stop_jobs` // stream entry. type stopJobEnvelope struct { GameID string Reason stopruntime.StopReason RequestedAtMS int64 } func decodeStopJob(message redis.XMessage) (stopJobEnvelope, error) { gameID := strings.TrimSpace(optionalString(message.Values, fieldGameID)) if gameID == "" { return stopJobEnvelope{}, errors.New("missing game_id") } reasonRaw := strings.TrimSpace(optionalString(message.Values, fieldReason)) if reasonRaw == "" { return stopJobEnvelope{}, errors.New("missing reason") } reason := stopruntime.StopReason(reasonRaw) if !reason.IsKnown() { return stopJobEnvelope{}, fmt.Errorf("unsupported reason %q", reasonRaw) } requestedAtMS, err := optionalInt64(message.Values, fieldRequestedAtMS) if err != nil { return stopJobEnvelope{}, fmt.Errorf("invalid requested_at_ms: %w", err) } return stopJobEnvelope{ GameID: gameID, Reason: reason, RequestedAtMS: requestedAtMS, }, nil } // buildJobResult translates a stopruntime.Result into the wire payload // published on `runtime:job_results`. Stop replays for `status=removed` // records carry an empty `CurrentContainerID`; the consumer publishes // the empty fields verbatim, which the AsyncAPI contract permits. func buildJobResult(gameID string, result stopruntime.Result) ports.JobResult { jobResult := ports.JobResult{ GameID: gameID, Outcome: string(result.Outcome), ErrorCode: result.ErrorCode, ErrorMessage: result.ErrorMessage, } if result.Outcome == operation.OutcomeSuccess { jobResult.ContainerID = result.Record.CurrentContainerID jobResult.EngineEndpoint = result.Record.EngineEndpoint } return jobResult } func optionalString(values map[string]any, key string) string { raw, ok := values[key] if !ok { return "" } switch typed := raw.(type) { case string: return typed case []byte: return string(typed) default: return "" } } func optionalInt64(values map[string]any, key string) (int64, error) { raw, ok := values[key] if !ok { return 0, nil } var stringValue string switch typed := raw.(type) { case string: stringValue = typed case []byte: stringValue = string(typed) default: return 0, fmt.Errorf("unsupported type %T", raw) } stringValue = strings.TrimSpace(stringValue) if stringValue == "" { return 0, nil } parsed, err := strconv.ParseInt(stringValue, 10, 64) if err != nil { return 0, err } return parsed, nil }