package runtime

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"time"

	"galaxy/backend/internal/dockerclient"
	"galaxy/model/rest"

	"github.com/google/uuid"
	"go.uber.org/zap"
)

// Service is the runtime-domain entry point. It owns the per-game
// lifecycle (start, stop, pause, resume, restart, patch,
// force-next-turn), the runtime cache, the player-mapping projection,
// and the operation log; it coordinates with the worker pool and the
// per-game scheduler goroutines.
type Service struct {
	deps      Deps
	gameMu    sync.Map // uuid.UUID -> *sync.Mutex
	scheduler *Scheduler
	workers   *WorkerPool
}

// NewService constructs a Service. Logger and Now default sensibly. The
// WorkerPool, Scheduler, and Reconciler the Service owns are
// `app.Component`-shaped; callers register them separately.
func NewService(deps Deps) (*Service, error) {
	if deps.Store == nil {
		return nil, errors.New("runtime: store must not be nil")
	}
	if deps.Cache == nil {
		return nil, errors.New("runtime: cache must not be nil")
	}
	if deps.EngineVersions == nil {
		return nil, errors.New("runtime: engine version service must not be nil")
	}
	if deps.Docker == nil {
		return nil, errors.New("runtime: docker client must not be nil")
	}
	if deps.Engine == nil {
		return nil, errors.New("runtime: engine client must not be nil")
	}
	if deps.Logger == nil {
		deps.Logger = zap.NewNop()
	}
	deps.Logger = deps.Logger.Named("runtime")
	if deps.Notification == nil {
		deps.Notification = NewNoopNotificationPublisher(deps.Logger)
	}
	if deps.Now == nil {
		deps.Now = time.Now
	}
	if deps.Config.WorkerPoolSize <= 0 {
		deps.Config.WorkerPoolSize = 1
	}
	if deps.Config.JobQueueSize <= 0 {
		deps.Config.JobQueueSize = 1
	}
	if deps.Config.StopGracePeriod <= 0 {
		deps.Config.StopGracePeriod = 10 * time.Second
	}
	if deps.Config.ReconcileInterval <= 0 {
		deps.Config.ReconcileInterval = 60 * time.Second
	}
	if strings.TrimSpace(deps.Config.ContainerStateMount) == "" {
		deps.Config.ContainerStateMount = "/var/lib/galaxy-game"
	}
	if !dockerclient.PullPolicy(deps.Config.ImagePullPolicy).IsKnown() {
		return nil, fmt.Errorf("runtime: invalid image pull policy %q", deps.Config.ImagePullPolicy)
	}
	svc := &Service{deps: deps}
	svc.scheduler = NewScheduler(svc)
	svc.workers = NewWorkerPool(svc)
	return svc, nil
}

// Logger exposes the named logger used by the service.
func (s *Service) Logger() *zap.Logger { return s.deps.Logger }

// Cache returns the in-memory projection.
func (s *Service) Cache() *Cache { return s.deps.Cache }

// EngineVersions returns the engine-version registry service.
func (s *Service) EngineVersions() *EngineVersionService { return s.deps.EngineVersions }

// Workers returns the runtime worker pool component.
func (s *Service) Workers() *WorkerPool { return s.workers }

// Reconciler builds an `app.Component` driving the periodic
// reconciliation loop documented in PLAN.md §5.5.
func (s *Service) Reconciler() *Reconciler { return NewReconciler(s) }

// SchedulerComponent returns the per-game scheduler bookkeeper. It
// implements `app.Component` so main.go can register it alongside the
// worker pool.
func (s *Service) SchedulerComponent() *Scheduler { return s.scheduler }
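// Typical wiring at startup looks like the sketch below. The Deps field
// names match the nil checks in NewService; `app.Register` and the
// dependency variables are illustrative assumptions about the calling
// application, not APIs defined here.
//
//	svc, err := runtime.NewService(runtime.Deps{
//		Store:          store,
//		Cache:          cache,
//		EngineVersions: versions,
//		Docker:         docker,
//		Engine:         engine,
//		Logger:         logger,
//	})
//	if err != nil {
//		log.Fatal(err)
//	}
//	app.Register(svc.Workers(), svc.SchedulerComponent(), svc.Reconciler())
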
// gameLock returns the *sync.Mutex unique to gameID, used to serialise
// per-game runtime operations across goroutines.
func (s *Service) gameLock(gameID uuid.UUID) *sync.Mutex {
	if v, ok := s.gameMu.Load(gameID); ok {
		return v.(*sync.Mutex)
	}
	v, _ := s.gameMu.LoadOrStore(gameID, &sync.Mutex{})
	return v.(*sync.Mutex)
}

// =====================================================================
// Lifecycle entry points (consumed by lobby.RuntimeGateway adapter)
// =====================================================================

// StartGame queues a start job for gameID. It returns once the
// operation is durably recorded; the actual pull / create / start runs
// on a worker goroutine.
func (s *Service) StartGame(ctx context.Context, gameID uuid.UUID) error {
	op, err := s.beginOperation(ctx, gameID, OpStart, OpSourceLobby)
	if err != nil {
		return err
	}
	return s.enqueue(ctx, jobStart{operation: op})
}

// StopGame queues a stop job for gameID.
func (s *Service) StopGame(ctx context.Context, gameID uuid.UUID) error {
	op, err := s.beginOperation(ctx, gameID, OpStop, OpSourceLobby)
	if err != nil {
		return err
	}
	return s.enqueue(ctx, jobStop{operation: op})
}

// PauseGame flips the runtime row's `paused` flag. The container keeps
// running; the scheduler short-circuits ticks while paused. Synchronous
// because no Docker call is involved.
func (s *Service) PauseGame(ctx context.Context, gameID uuid.UUID) error {
	mu := s.gameLock(gameID)
	mu.Lock()
	defer mu.Unlock()
	now := s.deps.Now().UTC()
	paused := true
	pausedAtPtr := &now
	patch := runtimeRecordUpdate{Paused: &paused, PausedAt: &pausedAtPtr}
	rec, err := s.deps.Store.UpdateRuntimeRecord(ctx, gameID, patch, now)
	if err != nil {
		return err
	}
	s.deps.Cache.PutRuntime(rec)
	s.recordSyncOperation(ctx, gameID, OpPause, OpSourceLobby, rec.CurrentImageRef, rec.CurrentContainerID, nil)
	return nil
}

// ResumeGame clears the `paused` flag. Synchronous.
func (s *Service) ResumeGame(ctx context.Context, gameID uuid.UUID) error {
	mu := s.gameLock(gameID)
	mu.Lock()
	defer mu.Unlock()
	now := s.deps.Now().UTC()
	paused := false
	var nilTime *time.Time
	cleared := &nilTime
	patch := runtimeRecordUpdate{Paused: &paused, PausedAt: cleared}
	rec, err := s.deps.Store.UpdateRuntimeRecord(ctx, gameID, patch, now)
	if err != nil {
		return err
	}
	s.deps.Cache.PutRuntime(rec)
	s.recordSyncOperation(ctx, gameID, OpResume, OpSourceLobby, rec.CurrentImageRef, rec.CurrentContainerID, nil)
	return nil
}

// AdminRestart queues a restart job: stop + remove + run with the same
// image_ref.
func (s *Service) AdminRestart(ctx context.Context, gameID uuid.UUID) (OperationLog, error) {
	op, err := s.beginOperation(ctx, gameID, OpRestart, OpSourceAdmin)
	if err != nil {
		return OperationLog{}, err
	}
	if err := s.enqueue(ctx, jobRestart{operation: op}); err != nil {
		return OperationLog{}, err
	}
	return op, nil
}
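// The double-pointer idiom in PauseGame / ResumeGame deserves a quick
// gloss: for a nullable timestamp column, a nil **time.Time field in
// runtimeRecordUpdate leaves the column untouched, a pointer to a nil
// *time.Time sets it to NULL, and a pointer to a non-nil *time.Time
// sets the value. A minimal sketch:
//
//	now := time.Now().UTC()
//	set := &now
//	patch.PausedAt = &set     // paused_at = now
//
//	var unset *time.Time
//	patch.PausedAt = &unset   // paused_at = NULL
//
//	patch.PausedAt = nil      // paused_at left as-is
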
// AdminPatch validates the target version against the registry, then
// queues a stop + remove + run with the new image. Returns
// ErrPatchSemverIncompatible when the target crosses major/minor.
func (s *Service) AdminPatch(ctx context.Context, gameID uuid.UUID, targetVersion string) (OperationLog, error) {
	rec, err := s.GetRuntime(ctx, gameID)
	if err != nil {
		return OperationLog{}, err
	}
	if rec.CurrentEngineVersion == "" {
		return OperationLog{}, fmt.Errorf("%w: runtime has no current engine version", ErrConflict)
	}
	if err := CheckPatchCompatible(rec.CurrentEngineVersion, targetVersion); err != nil {
		return OperationLog{}, err
	}
	target, err := s.deps.EngineVersions.Resolve(ctx, targetVersion)
	if err != nil {
		return OperationLog{}, err
	}
	op, err := s.beginOperation(ctx, gameID, OpPatch, OpSourceAdmin)
	if err != nil {
		return OperationLog{}, err
	}
	if err := s.enqueue(ctx, jobPatch{operation: op, target: target}); err != nil {
		return OperationLog{}, err
	}
	return op, nil
}

// AdminForceNextTurn sets the skip_next_tick flag so the next scheduler
// tick fires immediately. Synchronous.
func (s *Service) AdminForceNextTurn(ctx context.Context, gameID uuid.UUID) (OperationLog, error) {
	mu := s.gameLock(gameID)
	mu.Lock()
	defer mu.Unlock()
	now := s.deps.Now().UTC()
	skip := true
	rec, err := s.deps.Store.UpdateRuntimeRecord(ctx, gameID, runtimeRecordUpdate{SkipNextTick: &skip}, now)
	if err != nil {
		return OperationLog{}, err
	}
	s.deps.Cache.PutRuntime(rec)
	op := s.recordSyncOperation(ctx, gameID, OpForceNextTurn, OpSourceAdmin, rec.CurrentImageRef, rec.CurrentContainerID, nil)
	return op, nil
}

// GetRuntime returns the runtime record for gameID, cache-first.
func (s *Service) GetRuntime(ctx context.Context, gameID uuid.UUID) (RuntimeRecord, error) {
	if rec, ok := s.deps.Cache.GetRuntime(gameID); ok {
		return rec, nil
	}
	rec, err := s.deps.Store.LoadRuntimeRecord(ctx, gameID)
	if err != nil {
		return RuntimeRecord{}, err
	}
	s.deps.Cache.PutRuntime(rec)
	return rec, nil
}

// ResolvePlayerMapping returns the (race_name, engine_player_uuid)
// projection for the supplied (game_id, user_id). Used by the user
// game-proxy handlers to populate the engine `actor` field.
func (s *Service) ResolvePlayerMapping(ctx context.Context, gameID, userID uuid.UUID) (PlayerMapping, error) {
	return s.deps.Store.LoadPlayerMapping(ctx, gameID, userID)
}

// EngineEndpoint returns the engine endpoint URL for gameID. Used by
// the user game-proxy handlers.
func (s *Service) EngineEndpoint(ctx context.Context, gameID uuid.UUID) (string, error) {
	rec, err := s.GetRuntime(ctx, gameID)
	if err != nil {
		return "", err
	}
	if rec.EngineEndpoint == "" {
		return "", fmt.Errorf("%w: runtime has no engine endpoint", ErrConflict)
	}
	return rec.EngineEndpoint, nil
}
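// For orientation, the compatibility gate AdminPatch applies via
// CheckPatchCompatible (defined elsewhere in this package) only admits
// same-major, same-minor upgrades. A sketch of the expected behaviour,
// with illustrative version strings:
//
//	CheckPatchCompatible("1.4.2", "1.4.7") // nil: patch-level bump
//	CheckPatchCompatible("1.4.2", "1.5.0") // ErrPatchSemverIncompatible: crosses minor
//	CheckPatchCompatible("1.4.2", "2.0.0") // ErrPatchSemverIncompatible: crosses major
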
// =====================================================================
// Worker / job execution
// =====================================================================

// job is the internal interface implemented by every long-running
// runtime task. The worker pool dispatches them in order.
type job interface {
	GameID() uuid.UUID
	Run(ctx context.Context, s *Service) error
	Operation() OperationLog
}

type jobStart struct{ operation OperationLog }
type jobStop struct{ operation OperationLog }
type jobRestart struct{ operation OperationLog }
type jobPatch struct {
	operation OperationLog
	target    EngineVersion
}

func (j jobStart) GameID() uuid.UUID   { return j.operation.GameID }
func (j jobStop) GameID() uuid.UUID    { return j.operation.GameID }
func (j jobRestart) GameID() uuid.UUID { return j.operation.GameID }
func (j jobPatch) GameID() uuid.UUID   { return j.operation.GameID }

func (j jobStart) Operation() OperationLog   { return j.operation }
func (j jobStop) Operation() OperationLog    { return j.operation }
func (j jobRestart) Operation() OperationLog { return j.operation }
func (j jobPatch) Operation() OperationLog   { return j.operation }

func (j jobStart) Run(ctx context.Context, s *Service) error {
	return s.runStart(ctx, j.operation)
}
func (j jobStop) Run(ctx context.Context, s *Service) error {
	return s.runStop(ctx, j.operation)
}
func (j jobRestart) Run(ctx context.Context, s *Service) error {
	return s.runRestart(ctx, j.operation)
}
func (j jobPatch) Run(ctx context.Context, s *Service) error {
	return s.runPatch(ctx, j.operation, j.target)
}

// enqueue places j onto the worker channel. Returns ErrJobQueueFull
// when the channel is at capacity and ErrShutdown when the pool is
// stopped.
func (s *Service) enqueue(ctx context.Context, j job) error {
	if s.workers == nil {
		return ErrShutdown
	}
	return s.workers.submit(ctx, j)
}

// beginOperation persists a queued operation log row. The caller is
// responsible for transitioning it to a terminal status via
// completeOperation.
func (s *Service) beginOperation(ctx context.Context, gameID uuid.UUID, op, source string) (OperationLog, error) {
	in := operationLogInsert{
		OperationID: uuid.New(),
		GameID:      gameID,
		Op:          op,
		Source:      source,
		Status:      OpStatusQueued,
		StartedAt:   s.deps.Now().UTC(),
	}
	return s.deps.Store.InsertOperationLog(ctx, in)
}

// recordSyncOperation logs an operation that completed synchronously
// (pause / resume / force-next-turn). It inserts a running row and
// immediately flips it to a terminal status, keeping the audit trail
// consistent with worker jobs.
func (s *Service) recordSyncOperation(ctx context.Context, gameID uuid.UUID, op, source, imageRef, containerID string, runErr error) OperationLog {
	in := operationLogInsert{
		OperationID: uuid.New(),
		GameID:      gameID,
		Op:          op,
		Source:      source,
		Status:      OpStatusRunning,
		ImageRef:    imageRef,
		ContainerID: containerID,
		StartedAt:   s.deps.Now().UTC(),
	}
	rec, err := s.deps.Store.InsertOperationLog(ctx, in)
	if err != nil {
		s.deps.Logger.Warn("operation log insert failed",
			zap.String("game_id", gameID.String()), zap.String("op", op), zap.Error(err))
		return OperationLog{}
	}
	status := OpStatusSucceeded
	errCode := ""
	errMsg := ""
	if runErr != nil {
		status = OpStatusFailed
		errCode = "internal_error"
		errMsg = runErr.Error()
	}
	completed, err := s.deps.Store.CompleteOperationLog(ctx, rec.OperationID, status, errCode, errMsg, s.deps.Now().UTC())
	if err != nil {
		s.deps.Logger.Warn("operation log complete failed",
			zap.String("game_id", gameID.String()), zap.String("op", op), zap.Error(err))
		return rec
	}
	return completed
}
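// Adding a new long-running operation means implementing the
// three-method job interface above and enqueueing it from a lifecycle
// entry point. A hypothetical sketch (jobSnapshot and runSnapshot are
// illustrative names, not part of this package):
//
//	type jobSnapshot struct{ operation OperationLog }
//
//	func (j jobSnapshot) GameID() uuid.UUID       { return j.operation.GameID }
//	func (j jobSnapshot) Operation() OperationLog { return j.operation }
//	func (j jobSnapshot) Run(ctx context.Context, s *Service) error {
//		return s.runSnapshot(ctx, j.operation)
//	}
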
// completeOperation flips the row to a terminal status. runErr is nil
// on success.
func (s *Service) completeOperation(ctx context.Context, op OperationLog, runErr error) {
	status := OpStatusSucceeded
	errCode := ""
	errMsg := ""
	if runErr != nil {
		status = OpStatusFailed
		errCode = "internal_error"
		errMsg = runErr.Error()
	}
	if _, err := s.deps.Store.CompleteOperationLog(ctx, op.OperationID, status, errCode, errMsg, s.deps.Now().UTC()); err != nil {
		s.deps.Logger.Warn("operation log complete failed",
			zap.String("game_id", op.GameID.String()),
			zap.String("op", op.Op),
			zap.String("operation_id", op.OperationID.String()),
			zap.Error(err))
	}
}

// =====================================================================
// runStart — the heart of the package
// =====================================================================

func (s *Service) runStart(ctx context.Context, op OperationLog) error {
	gameID := op.GameID
	mu := s.gameLock(gameID)
	mu.Lock()
	defer mu.Unlock()

	game, err := s.deps.Store.LoadGameProjection(ctx, gameID)
	if err != nil {
		s.completeOperation(ctx, op, err)
		return err
	}
	if strings.TrimSpace(game.TargetEngineVersion) == "" {
		err := fmt.Errorf("%w: game has no target_engine_version", ErrInvalidInput)
		s.publishStartConfigInvalid(ctx, op, "target_engine_version is empty")
		s.completeOperation(ctx, op, err)
		return err
	}
	memberships, err := s.deps.Store.ListActiveMemberships(ctx, gameID)
	if err != nil {
		s.completeOperation(ctx, op, err)
		return err
	}
	if len(memberships) == 0 {
		err := fmt.Errorf("%w: game has no active memberships", ErrConflict)
		s.publishStartConfigInvalid(ctx, op, "no active memberships")
		s.completeOperation(ctx, op, err)
		return err
	}
	version, err := s.deps.EngineVersions.Resolve(ctx, game.TargetEngineVersion)
	if err != nil {
		s.publishStartConfigInvalid(ctx, op, fmt.Sprintf("engine version %q: %v", game.TargetEngineVersion, err))
		s.completeOperation(ctx, op, err)
		return err
	}

	mappings := make([]PlayerMapping, 0, len(memberships))
	races := make([]rest.InitRace, 0, len(memberships))
	for _, m := range memberships {
		mappings = append(mappings, PlayerMapping{
			GameID:           gameID,
			UserID:           m.UserID,
			RaceName:         m.RaceName,
			EnginePlayerUUID: uuid.New(),
		})
		races = append(races, rest.InitRace{RaceName: m.RaceName})
	}
	if err := s.deps.Store.InsertPlayerMappings(ctx, mappings); err != nil {
		s.completeOperation(ctx, op, err)
		return err
	}

	statePath := filepath.Join(filepath.Clean(s.deps.Config.ContainerStateMount), gameID.String())
	hostStatePath := filepath.Join(filepath.Clean(s.hostStateRoot()), gameID.String())
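	// A concrete reading of the two paths (values illustrative): with
	// the default ContainerStateMount "/var/lib/galaxy-game" and a
	// HostStateRoot of "/srv/galaxy/state", a game gets
	//   statePath     = /var/lib/galaxy-game/<game-uuid>  (inside the container)
	//   hostStatePath = /srv/galaxy/state/<game-uuid>     (host / daemon side)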
	// Bind-mount sources are resolved by the Docker daemon against
	// the host filesystem, not against the backend process namespace.
	// Production deploys mount the same `BACKEND_GAME_STATE_ROOT`
	// path into the backend container at the same path, so creating
	// the per-game subdirectory inside backend makes it visible to
	// the daemon at the same absolute path.
	//
	// The directory is created with mode 0o777 (and explicitly
	// chmod-ed to override umask) because the engine container may
	// run as a different uid than backend. Both processes need
	// read-write access to the bind-mounted state path; backend has
	// no way to know the engine container's uid ahead of time, so
	// world-writable is the conservative default. Production
	// deployments that pin both containers to the same user can
	// tighten the mode through a future configuration knob.
	if err := os.MkdirAll(hostStatePath, 0o777); err != nil {
		s.completeOperation(ctx, op, fmt.Errorf("create host state path %q: %w", hostStatePath, err))
		return err
	}
	if err := os.Chmod(hostStatePath, 0o777); err != nil {
		s.completeOperation(ctx, op, fmt.Errorf("chmod host state path %q: %w", hostStatePath, err))
		return err
	}

	spec := dockerclient.RunSpec{
		Name:     ContainerName(gameID.String()),
		Image:    version.ImageRef,
		Hostname: HostName(gameID.String()),
		Network:  s.dockerNetwork(),
		Env: map[string]string{
			"GAME_STATE_PATH": statePath,
		},
		Labels: map[string]string{
			"galaxy.game_id":        gameID.String(),
			"galaxy.engine_version": version.Version,
		},
		BindMounts: []dockerclient.BindMount{
			{
				HostPath:  hostStatePath,
				MountPath: s.deps.Config.ContainerStateMount,
				ReadOnly:  false,
			},
		},
		LogDriver:  s.deps.Config.ContainerLogDriver,
		LogOpts:    s.deps.Config.ContainerLogOpts,
		CPUQuota:   s.deps.Config.ContainerCPUQuota,
		Memory:     s.deps.Config.ContainerMemory,
		PIDsLimit:  s.deps.Config.ContainerPIDsLimit,
		PullPolicy: dockerclient.PullPolicy(s.deps.Config.ImagePullPolicy),
	}
	runResult, err := s.deps.Docker.Run(ctx, spec)
	if err != nil {
		s.publishStartFailure(ctx, op, version.ImageRef, err)
		s.completeOperation(ctx, op, err)
		return err
	}

	now := s.deps.Now().UTC()
	startedAt := runResult.StartedAt
	if startedAt.IsZero() {
		startedAt = now
	}
	startedAtPtr := &startedAt
	rec, err := s.upsertRuntimeRecord(ctx, runtimeRecordInsert{
		GameID:               gameID,
		Status:               RuntimeStatusStarting,
		CurrentContainerID:   runResult.ContainerID,
		CurrentImageRef:      version.ImageRef,
		CurrentEngineVersion: version.Version,
		EngineEndpoint:       runResult.EngineEndpoint,
		StatePath:            statePath,
		DockerNetwork:        s.dockerNetwork(),
		TurnSchedule:         game.TurnSchedule,
		StartedAt:            &startedAt,
	}, runtimeRecordUpdate{
		Status:               strPtr(RuntimeStatusStarting),
		CurrentContainerID:   strPtr(runResult.ContainerID),
		CurrentImageRef:      strPtr(version.ImageRef),
		CurrentEngineVersion: strPtr(version.Version),
		EngineEndpoint:       strPtr(runResult.EngineEndpoint),
		StatePath:            strPtr(statePath),
		DockerNetwork:        strPtr(s.dockerNetwork()),
		TurnSchedule:         strPtr(game.TurnSchedule),
		StartedAt:            &startedAtPtr,
	})
	if err != nil {
		s.completeOperation(ctx, op, err)
		return err
	}

	// Wait for the engine HTTP listener before issuing init. Docker
	// reports the container as running as soon as the entrypoint
	// starts, but the Go binary inside may take a moment to bind
	// the port; without this loop, Init races the listener and
	// fails with `connection refused`.
	if err := s.waitForEngineHealthz(ctx, runResult.EngineEndpoint, 30*time.Second); err != nil {
		s.deps.Logger.Warn("engine healthz never succeeded",
			zap.String("game_id", gameID.String()), zap.Error(err))
		s.transitionRuntimeStatus(ctx, gameID, RuntimeStatusEngineUnreachable, "")
		s.completeOperation(ctx, op, err)
		return err
	}

	initResp, err := s.deps.Engine.Init(ctx, runResult.EngineEndpoint, rest.InitRequest{Races: races})
	if err != nil {
		s.deps.Logger.Warn("engine init failed",
			zap.String("game_id", gameID.String()), zap.Error(err))
		s.transitionRuntimeStatus(ctx, gameID, RuntimeStatusEngineUnreachable, "")
		s.completeOperation(ctx, op, err)
		return err
	}
	// Engine is up. Transition the runtime row to running and publish
	// the snapshot into lobby.
	rec, err = s.transitionRuntimeStatus(ctx, gameID, RuntimeStatusRunning, "ok")
	if err != nil {
		s.completeOperation(ctx, op, err)
		return err
	}
	s.scheduler.startGame(rec)
	if err := s.publishSnapshot(ctx, gameID, initResp); err != nil {
		s.deps.Logger.Warn("publish init snapshot failed",
			zap.String("game_id", gameID.String()), zap.Error(err))
	}
	s.completeOperation(ctx, op, nil)
	return nil
}

// runStop stops and removes the engine container and transitions the
// runtime row to `stopped`.
func (s *Service) runStop(ctx context.Context, op OperationLog) error {
	gameID := op.GameID
	mu := s.gameLock(gameID)
	mu.Lock()
	defer mu.Unlock()

	rec, err := s.GetRuntime(ctx, gameID)
	if err != nil {
		s.completeOperation(ctx, op, err)
		return err
	}
	s.scheduler.stopGame(gameID)
	if rec.CurrentContainerID != "" {
		if err := s.deps.Docker.Stop(ctx, rec.CurrentContainerID, int(s.deps.Config.StopGracePeriod/time.Second)); err != nil && !errors.Is(err, dockerclient.ErrContainerNotFound) {
			s.completeOperation(ctx, op, err)
			return err
		}
		if err := s.deps.Docker.Remove(ctx, rec.CurrentContainerID); err != nil {
			s.completeOperation(ctx, op, err)
			return err
		}
	}

	now := s.deps.Now().UTC()
	stoppedAtPtr := &now
	updated, err := s.deps.Store.UpdateRuntimeRecord(ctx, gameID, runtimeRecordUpdate{
		Status:    strPtr(RuntimeStatusStopped),
		StoppedAt: &stoppedAtPtr,
	}, now)
	if err != nil {
		s.completeOperation(ctx, op, err)
		return err
	}
	s.deps.Cache.PutRuntime(updated)
	if err := s.deps.Store.DeletePlayerMappingsForGame(ctx, gameID); err != nil {
		s.deps.Logger.Warn("delete player_mappings on stop failed",
			zap.String("game_id", gameID.String()), zap.Error(err))
	}
	s.completeOperation(ctx, op, nil)
	return nil
}

// runRestart stops + removes the current container, then runs a fresh
// one with the same image_ref by delegating to runStop and runStart;
// runStart re-loads the lobby projection.
func (s *Service) runRestart(ctx context.Context, op OperationLog) error {
	if err := s.runStop(ctx, op); err != nil {
		return err
	}
	// Reuse runStart with a freshly minted operation row so the audit
	// trail remains consistent.
	startOp, err := s.beginOperation(ctx, op.GameID, OpStart, op.Source)
	if err != nil {
		return err
	}
	return s.runStart(ctx, startOp)
}
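// The resulting audit trail for a restart is therefore two rows: the
// original OpRestart row, driven to a terminal status by runStop while
// covering the stop phase, and a fresh OpStart row covering the start
// phase.
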
// runPatch stops + removes the current container, updates the engine
// version reference, and starts a fresh container.
func (s *Service) runPatch(ctx context.Context, op OperationLog, target EngineVersion) error {
	mu := s.gameLock(op.GameID)
	mu.Lock()
	defer mu.Unlock()

	rec, err := s.GetRuntime(ctx, op.GameID)
	if err != nil {
		s.completeOperation(ctx, op, err)
		return err
	}
	s.scheduler.stopGame(op.GameID)
	if rec.CurrentContainerID != "" {
		if err := s.deps.Docker.Stop(ctx, rec.CurrentContainerID, int(s.deps.Config.StopGracePeriod/time.Second)); err != nil && !errors.Is(err, dockerclient.ErrContainerNotFound) {
			s.completeOperation(ctx, op, err)
			return err
		}
		if err := s.deps.Docker.Remove(ctx, rec.CurrentContainerID); err != nil {
			s.completeOperation(ctx, op, err)
			return err
		}
	}

	statePath := rec.StatePath
	if statePath == "" {
		statePath = filepath.Join(filepath.Clean(s.deps.Config.ContainerStateMount), op.GameID.String())
	}
	hostStatePath := filepath.Join(filepath.Clean(s.hostStateRoot()), op.GameID.String())
	spec := dockerclient.RunSpec{
		Name:     ContainerName(op.GameID.String()),
		Image:    target.ImageRef,
		Hostname: HostName(op.GameID.String()),
		Network:  s.dockerNetwork(),
		Env: map[string]string{
			"GAME_STATE_PATH": statePath,
		},
		Labels: map[string]string{
			"galaxy.game_id":        op.GameID.String(),
			"galaxy.engine_version": target.Version,
		},
		BindMounts: []dockerclient.BindMount{
			{HostPath: hostStatePath, MountPath: s.deps.Config.ContainerStateMount},
		},
		LogDriver:  s.deps.Config.ContainerLogDriver,
		LogOpts:    s.deps.Config.ContainerLogOpts,
		CPUQuota:   s.deps.Config.ContainerCPUQuota,
		Memory:     s.deps.Config.ContainerMemory,
		PIDsLimit:  s.deps.Config.ContainerPIDsLimit,
		PullPolicy: dockerclient.PullPolicy(s.deps.Config.ImagePullPolicy),
	}
	runResult, err := s.deps.Docker.Run(ctx, spec)
	if err != nil {
		s.publishStartFailure(ctx, op, target.ImageRef, err)
		s.completeOperation(ctx, op, err)
		return err
	}

	now := s.deps.Now().UTC()
	startedAt := runResult.StartedAt
	if startedAt.IsZero() {
		startedAt = now
	}
	startedAtPtr := &startedAt
	updated, err := s.deps.Store.UpdateRuntimeRecord(ctx, op.GameID, runtimeRecordUpdate{
		Status:               strPtr(RuntimeStatusRunning),
		CurrentContainerID:   strPtr(runResult.ContainerID),
		CurrentImageRef:      strPtr(target.ImageRef),
		CurrentEngineVersion: strPtr(target.Version),
		EngineEndpoint:       strPtr(runResult.EngineEndpoint),
		StartedAt:            &startedAtPtr,
		EngineHealth:         strPtr("ok"),
	}, now)
	if err != nil {
		s.completeOperation(ctx, op, err)
		return err
	}
	s.deps.Cache.PutRuntime(updated)
	s.scheduler.startGame(updated)
	s.completeOperation(ctx, op, nil)
	return nil
}
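// Note that, unlike runStart, runPatch neither waits for healthz nor
// issues Init: the per-game state directory (rec.StatePath) is reused,
// so the replacement container is presumably expected to reload
// persisted state on its own, with the periodic reconciler picking up
// any container that never becomes healthy.
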
// =====================================================================
// Snapshot / status helpers
// =====================================================================

// publishSnapshot writes a runtime_health_snapshots row, refreshes the
// runtime cache from `current_turn` / `engine_health`, and forwards
// the snapshot to lobby.
func (s *Service) publishSnapshot(ctx context.Context, gameID uuid.UUID, state rest.StateResponse) error {
	now := s.deps.Now().UTC()
	payload, err := json.Marshal(state)
	if err != nil {
		return fmt.Errorf("marshal snapshot: %w", err)
	}
	if err := s.deps.Store.InsertHealthSnapshot(ctx, uuid.New(), gameID, now, payload); err != nil {
		return err
	}

	currentTurn := int32(state.Turn)
	patch := runtimeRecordUpdate{
		CurrentTurn:    &currentTurn,
		EngineHealth:   strPtr("ok"),
		LastObservedAt: dblTime(now),
	}
	if state.Finished {
		patch.Status = strPtr(RuntimeStatusFinished)
		finishedAtPtr := &now
		patch.FinishedAt = &finishedAtPtr
	}
	rec, err := s.deps.Store.UpdateRuntimeRecord(ctx, gameID, patch, now)
	if err != nil {
		return err
	}
	s.deps.Cache.PutRuntime(rec)

	if s.deps.Lobby != nil {
		mappings, err := s.deps.Store.ListPlayerMappingsForGame(ctx, gameID)
		if err != nil {
			s.deps.Logger.Warn("list player_mappings on snapshot failed",
				zap.String("game_id", gameID.String()), zap.Error(err))
		}
		userByEngine := make(map[uuid.UUID]uuid.UUID, len(mappings))
		userByRace := make(map[string]uuid.UUID, len(mappings))
		for _, m := range mappings {
			userByEngine[m.EnginePlayerUUID] = m.UserID
			userByRace[m.RaceName] = m.UserID
		}
		stats := make([]LobbyPlayerStats, 0, len(state.Players))
		for _, p := range state.Players {
			userID, ok := userByEngine[p.ID]
			if !ok {
				userID = userByRace[p.RaceName]
			}
			if userID == uuid.Nil {
				continue
			}
			stats = append(stats, LobbyPlayerStats{
				UserID:            userID,
				CurrentPlanets:    int32(p.Planets),
				CurrentPopulation: int32(p.Population),
				MaxPlanets:        int32(p.Planets),
				MaxPopulation:     int32(p.Population),
			})
		}
		runtimeStatus := RuntimeStatusRunning
		if state.Finished {
			runtimeStatus = RuntimeStatusFinished
		}
		err = s.deps.Lobby.OnRuntimeSnapshot(ctx, gameID, LobbySnapshot{
			CurrentTurn:   currentTurn,
			RuntimeStatus: runtimeStatus,
			EngineHealth:  "ok",
			ObservedAt:    now,
			PlayerStats:   stats,
		})
		if err != nil {
			s.deps.Logger.Warn("lobby snapshot consumer failed",
				zap.String("game_id", gameID.String()), zap.Error(err))
		}
	}
	return nil
}

// transitionRuntimeStatus updates the status / engine_health columns
// and refreshes the cache.
func (s *Service) transitionRuntimeStatus(ctx context.Context, gameID uuid.UUID, status, health string) (RuntimeRecord, error) {
	now := s.deps.Now().UTC()
	patch := runtimeRecordUpdate{Status: &status}
	if health != "" {
		patch.EngineHealth = &health
	}
	if status == RuntimeStatusFinished {
		finishedAtPtr := &now
		patch.FinishedAt = &finishedAtPtr
	}
	if status == RuntimeStatusStopped {
		stoppedAtPtr := &now
		patch.StoppedAt = &stoppedAtPtr
	}
	rec, err := s.deps.Store.UpdateRuntimeRecord(ctx, gameID, patch, now)
	if err != nil {
		return RuntimeRecord{}, err
	}
	s.deps.Cache.PutRuntime(rec)
	return rec, nil
}

// upsertRuntimeRecord inserts the record when no row exists and updates
// it otherwise. Used by runStart so a re-attempt after a worker crash
// stays idempotent.
func (s *Service) upsertRuntimeRecord(ctx context.Context, in runtimeRecordInsert, patch runtimeRecordUpdate) (RuntimeRecord, error) {
	rec, err := s.deps.Store.InsertRuntimeRecord(ctx, in)
	if err == nil {
		s.deps.Cache.PutRuntime(rec)
		return rec, nil
	}
	if !errors.Is(err, ErrConflict) {
		return RuntimeRecord{}, err
	}
	updated, err := s.deps.Store.UpdateRuntimeRecord(ctx, in.GameID, patch, s.deps.Now().UTC())
	if err != nil {
		return RuntimeRecord{}, err
	}
	s.deps.Cache.PutRuntime(updated)
	return updated, nil
}
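// The insert-then-update shape above is what makes runStart safe to
// re-run: if a worker dies before the insert, the retry simply inserts;
// if it dies after, the retry's insert returns ErrConflict and the
// update path refreshes the existing row instead.
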
// dockerNetwork returns the user-defined Docker network name engine
// containers attach to. Wired from cfg.Docker.Network through Deps.
func (s *Service) dockerNetwork() string { return s.deps.DockerNetwork }

// waitForEngineHealthz polls the engine `/healthz` endpoint until it
// responds 2xx or until the timeout elapses. The Docker daemon reports
// a container as `running` as soon as the entrypoint starts, but the
// engine binary may need a moment to bind its TCP port; the retry loop
// bridges that gap so the immediately-following Init call does not race
// the listener.
func (s *Service) waitForEngineHealthz(ctx context.Context, baseURL string, timeout time.Duration) error {
	deadline := time.Now().Add(timeout)
	var lastErr error
	for {
		probeCtx, cancel := context.WithTimeout(ctx, time.Second)
		err := s.deps.Engine.Healthz(probeCtx, baseURL)
		cancel()
		if err == nil {
			return nil
		}
		lastErr = err
		if time.Now().After(deadline) {
			return fmt.Errorf("engine healthz never succeeded within %s: %w", timeout, lastErr)
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(200 * time.Millisecond):
		}
	}
}

// hostStateRoot returns the host-side root directory under which the
// per-game state directory is created. Wired from cfg.Game.StateRoot
// through Deps; when unset, it falls back to the container mount path,
// which is correct when backend and the Docker daemon share the same
// filesystem view.
func (s *Service) hostStateRoot() string {
	if s.deps.HostStateRoot != "" {
		return s.deps.HostStateRoot
	}
	return s.deps.Config.ContainerStateMount
}

// strPtr returns a pointer to s. Helps assemble runtimeRecordUpdate
// values inline.
func strPtr(s string) *string { return &s }

// dblTime returns a **time.Time set to t. Used to set the nullable
// timestamp columns of `runtime_records` through runtimeRecordUpdate
// (clearing a column instead takes a pointer to a nil *time.Time).
func dblTime(t time.Time) **time.Time {
	p := &t
	return &p
}