15d35f6f1f
Engine no longer mints its own game UUID. The orchestrator (backend)
generates the game UUID at game-create time and passes it in the
admin/init request body as the required `gameId` field, so the value
that names the engine container and host bind-mount directory also
ends up inside the engine's state.json.
The engine rejects the zero UUID with 400 and any init that conflicts
with an existing state.json with 409 (a second init on the same gameId
is also a conflict; full idempotency is not part of the contract).
Updates rest.InitRequest, openapi.yaml (schema + 409 response),
controller.GenerateGame/NewGame/buildGameOnMap signatures, the engine
HTTP handler/executor, the backend runtime worker, and the relevant
unit and contract tests. Documentation in game/README.md,
docs/ARCHITECTURE.md, backend/README.md, and backend/docs/{runtime,flows}.md
is updated in the same patch.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1005 lines
34 KiB
Go
1005 lines
34 KiB
Go
package runtime
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"os"
|
|
"path/filepath"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"galaxy/backend/internal/dockerclient"
|
|
"galaxy/model/rest"
|
|
|
|
"github.com/google/uuid"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// Service is the runtime-domain entry point. It owns the per-game
|
|
// lifecycle (start, stop, pause, resume, restart, patch,
|
|
// force-next-turn), the runtime cache, the player-mapping projection,
|
|
// and the operation log; it coordinates with the worker pool and the
|
|
// per-game scheduler goroutines.
|
|
type Service struct {
|
|
deps Deps
|
|
|
|
gameMu sync.Map // uuid.UUID -> *sync.Mutex
|
|
|
|
scheduler *Scheduler
|
|
workers *WorkerPool
|
|
}
|
|
|
|
// NewService constructs a Service. Logger and Now default sensibly. The
|
|
// `Service` is `app.Component`-shaped through the embedded WorkerPool /
|
|
// Scheduler / Reconciler that callers register separately.
|
|
func NewService(deps Deps) (*Service, error) {
|
|
if deps.Store == nil {
|
|
return nil, errors.New("runtime: store must not be nil")
|
|
}
|
|
if deps.Cache == nil {
|
|
return nil, errors.New("runtime: cache must not be nil")
|
|
}
|
|
if deps.EngineVersions == nil {
|
|
return nil, errors.New("runtime: engine version service must not be nil")
|
|
}
|
|
if deps.Docker == nil {
|
|
return nil, errors.New("runtime: docker client must not be nil")
|
|
}
|
|
if deps.Engine == nil {
|
|
return nil, errors.New("runtime: engine client must not be nil")
|
|
}
|
|
if deps.Logger == nil {
|
|
deps.Logger = zap.NewNop()
|
|
}
|
|
deps.Logger = deps.Logger.Named("runtime")
|
|
if deps.Notification == nil {
|
|
deps.Notification = NewNoopNotificationPublisher(deps.Logger)
|
|
}
|
|
if deps.Now == nil {
|
|
deps.Now = time.Now
|
|
}
|
|
if deps.Config.WorkerPoolSize <= 0 {
|
|
deps.Config.WorkerPoolSize = 1
|
|
}
|
|
if deps.Config.JobQueueSize <= 0 {
|
|
deps.Config.JobQueueSize = 1
|
|
}
|
|
if deps.Config.StopGracePeriod <= 0 {
|
|
deps.Config.StopGracePeriod = 10 * time.Second
|
|
}
|
|
if deps.Config.ReconcileInterval <= 0 {
|
|
deps.Config.ReconcileInterval = 60 * time.Second
|
|
}
|
|
if strings.TrimSpace(deps.Config.ContainerStateMount) == "" {
|
|
deps.Config.ContainerStateMount = "/var/lib/galaxy-game"
|
|
}
|
|
if !dockerclient.PullPolicy(deps.Config.ImagePullPolicy).IsKnown() {
|
|
return nil, fmt.Errorf("runtime: invalid image pull policy %q", deps.Config.ImagePullPolicy)
|
|
}
|
|
svc := &Service{deps: deps}
|
|
svc.scheduler = NewScheduler(svc)
|
|
svc.workers = NewWorkerPool(svc)
|
|
return svc, nil
|
|
}
|
|
|
|
// Logger exposes the named logger used by the service.
|
|
func (s *Service) Logger() *zap.Logger { return s.deps.Logger }
|
|
|
|
// Cache returns the in-memory projection.
|
|
func (s *Service) Cache() *Cache { return s.deps.Cache }
|
|
|
|
// EngineVersions returns the engine-version registry service.
|
|
func (s *Service) EngineVersions() *EngineVersionService { return s.deps.EngineVersions }
|
|
|
|
// Workers returns the runtime worker pool component.
|
|
func (s *Service) Workers() *WorkerPool { return s.workers }
|
|
|
|
// Reconciler builds an `app.Component` driving the periodic
|
|
// reconciliation loop documented in PLAN.md §5.5.
|
|
func (s *Service) Reconciler() *Reconciler { return NewReconciler(s) }
|
|
|
|
// SchedulerComponent returns the per-game scheduler bookkeeper. It
|
|
// implements `app.Component` so main.go can register it alongside the
|
|
// worker pool.
|
|
func (s *Service) SchedulerComponent() *Scheduler { return s.scheduler }
|
|
|
|
// gameLock returns a sync.Mutex unique to gameID. Used to serialise
|
|
// per-game runtime operations across goroutines.
|
|
func (s *Service) gameLock(gameID uuid.UUID) *sync.Mutex {
|
|
if v, ok := s.gameMu.Load(gameID); ok {
|
|
return v.(*sync.Mutex)
|
|
}
|
|
v, _ := s.gameMu.LoadOrStore(gameID, &sync.Mutex{})
|
|
return v.(*sync.Mutex)
|
|
}
|
|
|
|
// =====================================================================
|
|
// Lifecycle entry points (consumed by lobby.RuntimeGateway adapter)
|
|
// =====================================================================
|
|
|
|
// StartGame queues a start job for gameID. Returns once the operation
|
|
// is durably recorded; the actual pull / create / start runs on a
|
|
// worker goroutine.
|
|
func (s *Service) StartGame(ctx context.Context, gameID uuid.UUID) error {
|
|
op, err := s.beginOperation(ctx, gameID, OpStart, OpSourceLobby)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return s.enqueue(ctx, jobStart{operation: op})
|
|
}
|
|
|
|
// StopGame queues a stop job for gameID.
|
|
func (s *Service) StopGame(ctx context.Context, gameID uuid.UUID) error {
|
|
op, err := s.beginOperation(ctx, gameID, OpStop, OpSourceLobby)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return s.enqueue(ctx, jobStop{operation: op})
|
|
}
|
|
|
|
// PauseGame flips the runtime row's `paused` flag. The container
|
|
// keeps running; the scheduler short-circuits ticks while paused.
|
|
// Synchronous because no Docker call is involved.
|
|
func (s *Service) PauseGame(ctx context.Context, gameID uuid.UUID) error {
|
|
mu := s.gameLock(gameID)
|
|
mu.Lock()
|
|
defer mu.Unlock()
|
|
now := s.deps.Now().UTC()
|
|
paused := true
|
|
pausedAtPtr := &now
|
|
patch := runtimeRecordUpdate{Paused: &paused, PausedAt: &pausedAtPtr}
|
|
rec, err := s.deps.Store.UpdateRuntimeRecord(ctx, gameID, patch, now)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
s.deps.Cache.PutRuntime(rec)
|
|
s.recordSyncOperation(ctx, gameID, OpPause, OpSourceLobby, rec.CurrentImageRef, rec.CurrentContainerID, nil)
|
|
return nil
|
|
}
|
|
|
|
// ResumeGame clears the `paused` flag. Synchronous.
|
|
func (s *Service) ResumeGame(ctx context.Context, gameID uuid.UUID) error {
|
|
mu := s.gameLock(gameID)
|
|
mu.Lock()
|
|
defer mu.Unlock()
|
|
now := s.deps.Now().UTC()
|
|
paused := false
|
|
var nilTime *time.Time
|
|
cleared := &nilTime
|
|
patch := runtimeRecordUpdate{Paused: &paused, PausedAt: cleared}
|
|
rec, err := s.deps.Store.UpdateRuntimeRecord(ctx, gameID, patch, now)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
s.deps.Cache.PutRuntime(rec)
|
|
s.recordSyncOperation(ctx, gameID, OpResume, OpSourceLobby, rec.CurrentImageRef, rec.CurrentContainerID, nil)
|
|
return nil
|
|
}
|
|
|
|
// AdminRestart queues a restart job. Stop + remove + run with the
|
|
// same image_ref.
|
|
func (s *Service) AdminRestart(ctx context.Context, gameID uuid.UUID) (OperationLog, error) {
|
|
op, err := s.beginOperation(ctx, gameID, OpRestart, OpSourceAdmin)
|
|
if err != nil {
|
|
return OperationLog{}, err
|
|
}
|
|
if err := s.enqueue(ctx, jobRestart{operation: op}); err != nil {
|
|
return OperationLog{}, err
|
|
}
|
|
return op, nil
|
|
}
|
|
|
|
// AdminPatch validates the target version against the registry, then
|
|
// queues a stop + remove + run with the new image. Returns
|
|
// ErrPatchSemverIncompatible when the target crosses major/minor.
|
|
func (s *Service) AdminPatch(ctx context.Context, gameID uuid.UUID, targetVersion string) (OperationLog, error) {
|
|
rec, err := s.GetRuntime(ctx, gameID)
|
|
if err != nil {
|
|
return OperationLog{}, err
|
|
}
|
|
if rec.CurrentEngineVersion == "" {
|
|
return OperationLog{}, fmt.Errorf("%w: runtime has no current engine version", ErrConflict)
|
|
}
|
|
if err := CheckPatchCompatible(rec.CurrentEngineVersion, targetVersion); err != nil {
|
|
return OperationLog{}, err
|
|
}
|
|
target, err := s.deps.EngineVersions.Resolve(ctx, targetVersion)
|
|
if err != nil {
|
|
return OperationLog{}, err
|
|
}
|
|
op, err := s.beginOperation(ctx, gameID, OpPatch, OpSourceAdmin)
|
|
if err != nil {
|
|
return OperationLog{}, err
|
|
}
|
|
if err := s.enqueue(ctx, jobPatch{operation: op, target: target}); err != nil {
|
|
return OperationLog{}, err
|
|
}
|
|
return op, nil
|
|
}
|
|
|
|
// AdminForceNextTurn sets the skip_next_tick flag so the next
|
|
// scheduler tick fires immediately. Synchronous.
|
|
func (s *Service) AdminForceNextTurn(ctx context.Context, gameID uuid.UUID) (OperationLog, error) {
|
|
mu := s.gameLock(gameID)
|
|
mu.Lock()
|
|
defer mu.Unlock()
|
|
now := s.deps.Now().UTC()
|
|
skip := true
|
|
rec, err := s.deps.Store.UpdateRuntimeRecord(ctx, gameID, runtimeRecordUpdate{SkipNextTick: &skip}, now)
|
|
if err != nil {
|
|
return OperationLog{}, err
|
|
}
|
|
s.deps.Cache.PutRuntime(rec)
|
|
op := s.recordSyncOperation(ctx, gameID, OpForceNextTurn, OpSourceAdmin, rec.CurrentImageRef, rec.CurrentContainerID, nil)
|
|
return op, nil
|
|
}
|
|
|
|
// GetRuntime returns the runtime record for gameID, cache-first.
|
|
func (s *Service) GetRuntime(ctx context.Context, gameID uuid.UUID) (RuntimeRecord, error) {
|
|
if rec, ok := s.deps.Cache.GetRuntime(gameID); ok {
|
|
return rec, nil
|
|
}
|
|
rec, err := s.deps.Store.LoadRuntimeRecord(ctx, gameID)
|
|
if err != nil {
|
|
return RuntimeRecord{}, err
|
|
}
|
|
s.deps.Cache.PutRuntime(rec)
|
|
return rec, nil
|
|
}
|
|
|
|
// ResolvePlayerMapping returns the (race_name, engine_player_uuid)
|
|
// projection for the supplied (game_id, user_id). Used by the user
|
|
// game-proxy handlers to populate the engine `actor` field.
|
|
func (s *Service) ResolvePlayerMapping(ctx context.Context, gameID, userID uuid.UUID) (PlayerMapping, error) {
|
|
return s.deps.Store.LoadPlayerMapping(ctx, gameID, userID)
|
|
}
|
|
|
|
// CheckOrdersAccept verifies that the runtime is in a state that
|
|
// accepts user-games commands and orders. It is called by the user
|
|
// game-proxy handlers (`Commands`, `Orders`) before forwarding to
|
|
// engine, so the backend's turn-cutoff and pause guards run before
|
|
// network traffic leaves the host. The decision itself lives in the
|
|
// pure helper `OrdersAcceptStatus` so it can be unit-tested without
|
|
// constructing a full Service.
|
|
//
|
|
// A missing runtime row is surfaced as `ErrNotFound` so the handler
|
|
// keeps its existing 404 behaviour.
|
|
func (s *Service) CheckOrdersAccept(ctx context.Context, gameID uuid.UUID) error {
|
|
rec, err := s.GetRuntime(ctx, gameID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return OrdersAcceptStatus(rec)
|
|
}
|
|
|
|
// OrdersAcceptStatus inspects a runtime record and returns the
|
|
// matching sentinel for the user-games order/command pre-check:
|
|
//
|
|
// - `runtime_status = generation_in_progress` → `ErrTurnAlreadyClosed`.
|
|
// The cron-driven `Scheduler.tick` has flipped the row before
|
|
// calling the engine. The order window reopens once the tick
|
|
// completes successfully.
|
|
//
|
|
// - `runtime_status ∈ {engine_unreachable, generation_failed,
|
|
// stopped, finished, removed, starting}` → `ErrGamePaused`.
|
|
// The game is not in a state that accepts writes; the lobby
|
|
// state machine has either already flipped the game to
|
|
// `paused` / `finished` or is still bootstrapping.
|
|
//
|
|
// - `runtime.Paused = true` → `ErrGamePaused`. The lobby admin
|
|
// paused the game explicitly.
|
|
//
|
|
// - `runtime_status = running` and `Paused = false` → nil
|
|
// (forward).
|
|
func OrdersAcceptStatus(rec RuntimeRecord) error {
|
|
if rec.Paused {
|
|
return ErrGamePaused
|
|
}
|
|
switch rec.Status {
|
|
case RuntimeStatusRunning:
|
|
return nil
|
|
case RuntimeStatusGenerationInProgress:
|
|
return ErrTurnAlreadyClosed
|
|
default:
|
|
return ErrGamePaused
|
|
}
|
|
}
|
|
|
|
// EngineEndpoint returns the engine endpoint URL for gameID. Used by
|
|
// the user game-proxy handlers.
|
|
func (s *Service) EngineEndpoint(ctx context.Context, gameID uuid.UUID) (string, error) {
|
|
rec, err := s.GetRuntime(ctx, gameID)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
if rec.EngineEndpoint == "" {
|
|
return "", fmt.Errorf("%w: runtime has no engine endpoint", ErrConflict)
|
|
}
|
|
return rec.EngineEndpoint, nil
|
|
}
|
|
|
|
// =====================================================================
|
|
// Worker / job execution
|
|
// =====================================================================
|
|
|
|
// job is the internal interface implemented by every long-running
|
|
// runtime task. The worker pool dispatches them in order.
|
|
type job interface {
|
|
GameID() uuid.UUID
|
|
Run(ctx context.Context, s *Service) error
|
|
Operation() OperationLog
|
|
}
|
|
|
|
type jobStart struct{ operation OperationLog }
|
|
type jobStop struct{ operation OperationLog }
|
|
type jobRestart struct{ operation OperationLog }
|
|
type jobPatch struct {
|
|
operation OperationLog
|
|
target EngineVersion
|
|
}
|
|
|
|
func (j jobStart) GameID() uuid.UUID { return j.operation.GameID }
|
|
func (j jobStop) GameID() uuid.UUID { return j.operation.GameID }
|
|
func (j jobRestart) GameID() uuid.UUID { return j.operation.GameID }
|
|
func (j jobPatch) GameID() uuid.UUID { return j.operation.GameID }
|
|
func (j jobStart) Operation() OperationLog { return j.operation }
|
|
func (j jobStop) Operation() OperationLog { return j.operation }
|
|
func (j jobRestart) Operation() OperationLog { return j.operation }
|
|
func (j jobPatch) Operation() OperationLog { return j.operation }
|
|
|
|
func (j jobStart) Run(ctx context.Context, s *Service) error { return s.runStart(ctx, j.operation) }
|
|
func (j jobStop) Run(ctx context.Context, s *Service) error { return s.runStop(ctx, j.operation) }
|
|
func (j jobRestart) Run(ctx context.Context, s *Service) error {
|
|
return s.runRestart(ctx, j.operation)
|
|
}
|
|
func (j jobPatch) Run(ctx context.Context, s *Service) error {
|
|
return s.runPatch(ctx, j.operation, j.target)
|
|
}
|
|
|
|
// enqueue places job onto the worker channel. Returns ErrJobQueueFull
|
|
// when the channel is at capacity; ErrShutdown when the pool is
|
|
// stopped.
|
|
func (s *Service) enqueue(ctx context.Context, j job) error {
|
|
if s.workers == nil {
|
|
return ErrShutdown
|
|
}
|
|
return s.workers.submit(ctx, j)
|
|
}
|
|
|
|
// beginOperation persists a queued operation log row. Caller is
|
|
// responsible for transitioning it to running/succeeded/failed via
|
|
// completeOperation.
|
|
func (s *Service) beginOperation(ctx context.Context, gameID uuid.UUID, op, source string) (OperationLog, error) {
|
|
in := operationLogInsert{
|
|
OperationID: uuid.New(),
|
|
GameID: gameID,
|
|
Op: op,
|
|
Source: source,
|
|
Status: OpStatusQueued,
|
|
StartedAt: s.deps.Now().UTC(),
|
|
}
|
|
return s.deps.Store.InsertOperationLog(ctx, in)
|
|
}
|
|
|
|
// recordSyncOperation logs an operation that completed synchronously
|
|
// (pause / resume / force-next-turn). It writes both the queued and
|
|
// the terminal row to keep the audit trail consistent with worker
|
|
// jobs.
|
|
func (s *Service) recordSyncOperation(ctx context.Context, gameID uuid.UUID, op, source, imageRef, containerID string, runErr error) OperationLog {
|
|
in := operationLogInsert{
|
|
OperationID: uuid.New(),
|
|
GameID: gameID,
|
|
Op: op,
|
|
Source: source,
|
|
Status: OpStatusRunning,
|
|
ImageRef: imageRef,
|
|
ContainerID: containerID,
|
|
StartedAt: s.deps.Now().UTC(),
|
|
}
|
|
rec, err := s.deps.Store.InsertOperationLog(ctx, in)
|
|
if err != nil {
|
|
s.deps.Logger.Warn("operation log insert failed",
|
|
zap.String("game_id", gameID.String()),
|
|
zap.String("op", op),
|
|
zap.Error(err))
|
|
return OperationLog{}
|
|
}
|
|
status := OpStatusSucceeded
|
|
errCode := ""
|
|
errMsg := ""
|
|
if runErr != nil {
|
|
status = OpStatusFailed
|
|
errCode = "internal_error"
|
|
errMsg = runErr.Error()
|
|
}
|
|
completed, err := s.deps.Store.CompleteOperationLog(ctx, rec.OperationID, status, errCode, errMsg, s.deps.Now().UTC())
|
|
if err != nil {
|
|
s.deps.Logger.Warn("operation log complete failed",
|
|
zap.String("game_id", gameID.String()),
|
|
zap.String("op", op),
|
|
zap.Error(err))
|
|
return rec
|
|
}
|
|
return completed
|
|
}
|
|
|
|
// completeOperation flips the row to a terminal status. runErr is nil
|
|
// on success.
|
|
func (s *Service) completeOperation(ctx context.Context, op OperationLog, runErr error) {
|
|
status := OpStatusSucceeded
|
|
errCode := ""
|
|
errMsg := ""
|
|
if runErr != nil {
|
|
status = OpStatusFailed
|
|
errCode = "internal_error"
|
|
errMsg = runErr.Error()
|
|
}
|
|
if _, err := s.deps.Store.CompleteOperationLog(ctx, op.OperationID, status, errCode, errMsg, s.deps.Now().UTC()); err != nil {
|
|
s.deps.Logger.Warn("operation log complete failed",
|
|
zap.String("game_id", op.GameID.String()),
|
|
zap.String("op", op.Op),
|
|
zap.String("operation_id", op.OperationID.String()),
|
|
zap.Error(err))
|
|
}
|
|
}
|
|
|
|
// =====================================================================
|
|
// runStart — the heart of the package
|
|
// =====================================================================
|
|
|
|
func (s *Service) runStart(ctx context.Context, op OperationLog) error {
|
|
gameID := op.GameID
|
|
mu := s.gameLock(gameID)
|
|
mu.Lock()
|
|
defer mu.Unlock()
|
|
|
|
game, err := s.deps.Store.LoadGameProjection(ctx, gameID)
|
|
if err != nil {
|
|
s.completeOperation(ctx, op, err)
|
|
return err
|
|
}
|
|
if strings.TrimSpace(game.TargetEngineVersion) == "" {
|
|
err := fmt.Errorf("%w: game has no target_engine_version", ErrInvalidInput)
|
|
s.publishStartConfigInvalid(ctx, op, "target_engine_version is empty")
|
|
s.completeOperation(ctx, op, err)
|
|
return err
|
|
}
|
|
memberships, err := s.deps.Store.ListActiveMemberships(ctx, gameID)
|
|
if err != nil {
|
|
s.completeOperation(ctx, op, err)
|
|
return err
|
|
}
|
|
if len(memberships) == 0 {
|
|
err := fmt.Errorf("%w: game has no active memberships", ErrConflict)
|
|
s.publishStartConfigInvalid(ctx, op, "no active memberships")
|
|
s.completeOperation(ctx, op, err)
|
|
return err
|
|
}
|
|
|
|
version, err := s.deps.EngineVersions.Resolve(ctx, game.TargetEngineVersion)
|
|
if err != nil {
|
|
s.publishStartConfigInvalid(ctx, op, fmt.Sprintf("engine version %q: %v", game.TargetEngineVersion, err))
|
|
s.completeOperation(ctx, op, err)
|
|
return err
|
|
}
|
|
|
|
mappings := make([]PlayerMapping, 0, len(memberships))
|
|
races := make([]rest.InitRace, 0, len(memberships))
|
|
for _, m := range memberships {
|
|
mappings = append(mappings, PlayerMapping{
|
|
GameID: gameID,
|
|
UserID: m.UserID,
|
|
RaceName: m.RaceName,
|
|
EnginePlayerUUID: uuid.New(),
|
|
})
|
|
races = append(races, rest.InitRace{RaceName: m.RaceName})
|
|
}
|
|
if err := s.deps.Store.InsertPlayerMappings(ctx, mappings); err != nil {
|
|
s.completeOperation(ctx, op, err)
|
|
return err
|
|
}
|
|
|
|
statePath := filepath.Join(filepath.Clean(s.deps.Config.ContainerStateMount), gameID.String())
|
|
hostStatePath := filepath.Join(filepath.Clean(s.hostStateRoot()), gameID.String())
|
|
|
|
// Bind-mount sources are resolved by the Docker daemon against
|
|
// the host filesystem, not against the backend process namespace.
|
|
// Production deploys mount the same `BACKEND_GAME_STATE_ROOT`
|
|
// path into the backend container at the same path, so creating
|
|
// the per-game subdirectory inside backend makes it visible to
|
|
// the daemon at the same absolute path.
|
|
//
|
|
// The directory is created with mode 0o777 (and explicitly
|
|
// chmod-ed to override umask) because the engine container may
|
|
// run as a different uid than backend. Both processes need
|
|
// read-write access to the bind-mounted state path; backend has
|
|
// no way to know the engine container's uid ahead of time, so
|
|
// world-writable is the conservative default. Production
|
|
// deployments that pin both containers to the same user can
|
|
// tighten the mode through a future configuration knob.
|
|
if err := os.MkdirAll(hostStatePath, 0o777); err != nil {
|
|
s.completeOperation(ctx, op, fmt.Errorf("create host state path %q: %w", hostStatePath, err))
|
|
return err
|
|
}
|
|
if err := os.Chmod(hostStatePath, 0o777); err != nil {
|
|
s.completeOperation(ctx, op, fmt.Errorf("chmod host state path %q: %w", hostStatePath, err))
|
|
return err
|
|
}
|
|
|
|
spec := dockerclient.RunSpec{
|
|
Name: ContainerName(gameID.String()),
|
|
Image: version.ImageRef,
|
|
Hostname: HostName(gameID.String()),
|
|
Network: s.dockerNetwork(),
|
|
Env: map[string]string{
|
|
"GAME_STATE_PATH": statePath,
|
|
},
|
|
Labels: s.engineLabels(gameID.String(), version.Version),
|
|
BindMounts: []dockerclient.BindMount{
|
|
{
|
|
HostPath: hostStatePath,
|
|
MountPath: s.deps.Config.ContainerStateMount,
|
|
ReadOnly: false,
|
|
},
|
|
},
|
|
LogDriver: s.deps.Config.ContainerLogDriver,
|
|
LogOpts: s.deps.Config.ContainerLogOpts,
|
|
CPUQuota: s.deps.Config.ContainerCPUQuota,
|
|
Memory: s.deps.Config.ContainerMemory,
|
|
PIDsLimit: s.deps.Config.ContainerPIDsLimit,
|
|
PullPolicy: dockerclient.PullPolicy(s.deps.Config.ImagePullPolicy),
|
|
}
|
|
|
|
runResult, err := s.deps.Docker.Run(ctx, spec)
|
|
if err != nil {
|
|
s.publishStartFailure(ctx, op, version.ImageRef, err)
|
|
s.completeOperation(ctx, op, err)
|
|
return err
|
|
}
|
|
|
|
now := s.deps.Now().UTC()
|
|
startedAt := runResult.StartedAt
|
|
if startedAt.IsZero() {
|
|
startedAt = now
|
|
}
|
|
startedAtPtr := &startedAt
|
|
rec, err := s.upsertRuntimeRecord(ctx, runtimeRecordInsert{
|
|
GameID: gameID,
|
|
Status: RuntimeStatusStarting,
|
|
CurrentContainerID: runResult.ContainerID,
|
|
CurrentImageRef: version.ImageRef,
|
|
CurrentEngineVersion: version.Version,
|
|
EngineEndpoint: runResult.EngineEndpoint,
|
|
StatePath: statePath,
|
|
DockerNetwork: s.dockerNetwork(),
|
|
TurnSchedule: game.TurnSchedule,
|
|
StartedAt: &startedAt,
|
|
}, runtimeRecordUpdate{
|
|
Status: strPtr(RuntimeStatusStarting),
|
|
CurrentContainerID: strPtr(runResult.ContainerID),
|
|
CurrentImageRef: strPtr(version.ImageRef),
|
|
CurrentEngineVersion: strPtr(version.Version),
|
|
EngineEndpoint: strPtr(runResult.EngineEndpoint),
|
|
StatePath: strPtr(statePath),
|
|
DockerNetwork: strPtr(s.dockerNetwork()),
|
|
TurnSchedule: strPtr(game.TurnSchedule),
|
|
StartedAt: &startedAtPtr,
|
|
})
|
|
if err != nil {
|
|
s.completeOperation(ctx, op, err)
|
|
return err
|
|
}
|
|
|
|
// Wait for the engine HTTP listener before issuing init. Docker
|
|
// reports the container as running as soon as the entrypoint
|
|
// starts, but the Go binary inside may take a moment to bind
|
|
// the port; without this loop, Init races the listener and
|
|
// fails with `connection refused`.
|
|
if err := s.waitForEngineHealthz(ctx, runResult.EngineEndpoint, 30*time.Second); err != nil {
|
|
s.deps.Logger.Warn("engine healthz never succeeded",
|
|
zap.String("game_id", gameID.String()),
|
|
zap.Error(err))
|
|
s.transitionRuntimeStatus(ctx, gameID, RuntimeStatusEngineUnreachable, "")
|
|
s.completeOperation(ctx, op, err)
|
|
return err
|
|
}
|
|
|
|
initResp, err := s.deps.Engine.Init(ctx, runResult.EngineEndpoint, rest.InitRequest{GameID: gameID, Races: races})
|
|
if err != nil {
|
|
s.deps.Logger.Warn("engine init failed",
|
|
zap.String("game_id", gameID.String()),
|
|
zap.Error(err))
|
|
s.transitionRuntimeStatus(ctx, gameID, RuntimeStatusEngineUnreachable, "")
|
|
s.completeOperation(ctx, op, err)
|
|
return err
|
|
}
|
|
|
|
// Engine is up. Transition the runtime row to running and publish
|
|
// the snapshot into lobby.
|
|
rec, err = s.transitionRuntimeStatus(ctx, gameID, RuntimeStatusRunning, "ok")
|
|
if err != nil {
|
|
s.completeOperation(ctx, op, err)
|
|
return err
|
|
}
|
|
s.scheduler.startGame(rec)
|
|
if err := s.publishSnapshot(ctx, gameID, initResp); err != nil {
|
|
s.deps.Logger.Warn("publish init snapshot failed",
|
|
zap.String("game_id", gameID.String()),
|
|
zap.Error(err))
|
|
}
|
|
s.completeOperation(ctx, op, nil)
|
|
return nil
|
|
}
|
|
|
|
// runStop stops + removes the engine container and transitions the
|
|
// runtime row to `stopped`.
|
|
func (s *Service) runStop(ctx context.Context, op OperationLog) error {
|
|
gameID := op.GameID
|
|
mu := s.gameLock(gameID)
|
|
mu.Lock()
|
|
defer mu.Unlock()
|
|
|
|
rec, err := s.GetRuntime(ctx, gameID)
|
|
if err != nil {
|
|
s.completeOperation(ctx, op, err)
|
|
return err
|
|
}
|
|
s.scheduler.stopGame(gameID)
|
|
if rec.CurrentContainerID != "" {
|
|
if err := s.deps.Docker.Stop(ctx, rec.CurrentContainerID, int(s.deps.Config.StopGracePeriod/time.Second)); err != nil && !errors.Is(err, dockerclient.ErrContainerNotFound) {
|
|
s.completeOperation(ctx, op, err)
|
|
return err
|
|
}
|
|
if err := s.deps.Docker.Remove(ctx, rec.CurrentContainerID); err != nil {
|
|
s.completeOperation(ctx, op, err)
|
|
return err
|
|
}
|
|
}
|
|
now := s.deps.Now().UTC()
|
|
stoppedAtPtr := &now
|
|
updated, err := s.deps.Store.UpdateRuntimeRecord(ctx, gameID, runtimeRecordUpdate{
|
|
Status: strPtr(RuntimeStatusStopped),
|
|
StoppedAt: &stoppedAtPtr,
|
|
}, now)
|
|
if err != nil {
|
|
s.completeOperation(ctx, op, err)
|
|
return err
|
|
}
|
|
s.deps.Cache.PutRuntime(updated)
|
|
if err := s.deps.Store.DeletePlayerMappingsForGame(ctx, gameID); err != nil {
|
|
s.deps.Logger.Warn("delete player_mappings on stop failed",
|
|
zap.String("game_id", gameID.String()),
|
|
zap.Error(err))
|
|
}
|
|
s.completeOperation(ctx, op, nil)
|
|
return nil
|
|
}
|
|
|
|
// runRestart stops + removes + runs a fresh container with the same
|
|
// image_ref. Reuses runStart's logic via re-loading the lobby
|
|
// projection.
|
|
func (s *Service) runRestart(ctx context.Context, op OperationLog) error {
|
|
if err := s.runStop(ctx, op); err != nil {
|
|
return err
|
|
}
|
|
// Reuse runStart with a freshly minted operation row so the audit
|
|
// trail remains consistent.
|
|
startOp, err := s.beginOperation(ctx, op.GameID, OpStart, op.Source)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return s.runStart(ctx, startOp)
|
|
}
|
|
|
|
// runPatch stops + removes the current container, updates the engine
|
|
// version reference, and starts a fresh container.
|
|
func (s *Service) runPatch(ctx context.Context, op OperationLog, target EngineVersion) error {
|
|
mu := s.gameLock(op.GameID)
|
|
mu.Lock()
|
|
defer mu.Unlock()
|
|
|
|
rec, err := s.GetRuntime(ctx, op.GameID)
|
|
if err != nil {
|
|
s.completeOperation(ctx, op, err)
|
|
return err
|
|
}
|
|
s.scheduler.stopGame(op.GameID)
|
|
if rec.CurrentContainerID != "" {
|
|
if err := s.deps.Docker.Stop(ctx, rec.CurrentContainerID, int(s.deps.Config.StopGracePeriod/time.Second)); err != nil && !errors.Is(err, dockerclient.ErrContainerNotFound) {
|
|
s.completeOperation(ctx, op, err)
|
|
return err
|
|
}
|
|
if err := s.deps.Docker.Remove(ctx, rec.CurrentContainerID); err != nil {
|
|
s.completeOperation(ctx, op, err)
|
|
return err
|
|
}
|
|
}
|
|
|
|
statePath := rec.StatePath
|
|
if statePath == "" {
|
|
statePath = filepath.Join(filepath.Clean(s.deps.Config.ContainerStateMount), op.GameID.String())
|
|
}
|
|
hostStatePath := filepath.Join(filepath.Clean(s.hostStateRoot()), op.GameID.String())
|
|
|
|
spec := dockerclient.RunSpec{
|
|
Name: ContainerName(op.GameID.String()),
|
|
Image: target.ImageRef,
|
|
Hostname: HostName(op.GameID.String()),
|
|
Network: s.dockerNetwork(),
|
|
Env: map[string]string{
|
|
"GAME_STATE_PATH": statePath,
|
|
},
|
|
Labels: s.engineLabels(op.GameID.String(), target.Version),
|
|
BindMounts: []dockerclient.BindMount{
|
|
{HostPath: hostStatePath, MountPath: s.deps.Config.ContainerStateMount},
|
|
},
|
|
LogDriver: s.deps.Config.ContainerLogDriver,
|
|
LogOpts: s.deps.Config.ContainerLogOpts,
|
|
CPUQuota: s.deps.Config.ContainerCPUQuota,
|
|
Memory: s.deps.Config.ContainerMemory,
|
|
PIDsLimit: s.deps.Config.ContainerPIDsLimit,
|
|
PullPolicy: dockerclient.PullPolicy(s.deps.Config.ImagePullPolicy),
|
|
}
|
|
runResult, err := s.deps.Docker.Run(ctx, spec)
|
|
if err != nil {
|
|
s.publishStartFailure(ctx, op, target.ImageRef, err)
|
|
s.completeOperation(ctx, op, err)
|
|
return err
|
|
}
|
|
now := s.deps.Now().UTC()
|
|
startedAt := runResult.StartedAt
|
|
if startedAt.IsZero() {
|
|
startedAt = now
|
|
}
|
|
startedAtPtr := &startedAt
|
|
updated, err := s.deps.Store.UpdateRuntimeRecord(ctx, op.GameID, runtimeRecordUpdate{
|
|
Status: strPtr(RuntimeStatusRunning),
|
|
CurrentContainerID: strPtr(runResult.ContainerID),
|
|
CurrentImageRef: strPtr(target.ImageRef),
|
|
CurrentEngineVersion: strPtr(target.Version),
|
|
EngineEndpoint: strPtr(runResult.EngineEndpoint),
|
|
StartedAt: &startedAtPtr,
|
|
EngineHealth: strPtr("ok"),
|
|
}, now)
|
|
if err != nil {
|
|
s.completeOperation(ctx, op, err)
|
|
return err
|
|
}
|
|
s.deps.Cache.PutRuntime(updated)
|
|
s.scheduler.startGame(updated)
|
|
s.completeOperation(ctx, op, nil)
|
|
return nil
|
|
}
|
|
|
|
// =====================================================================
|
|
// Snapshot / status helpers
|
|
// =====================================================================
|
|
|
|
// publishSnapshot writes a runtime_health_snapshots row, refreshes the
|
|
// runtime cache from `current_turn` / `engine_health`, and forwards
|
|
// the snapshot to lobby.
|
|
func (s *Service) publishSnapshot(ctx context.Context, gameID uuid.UUID, state rest.StateResponse) error {
|
|
now := s.deps.Now().UTC()
|
|
payload, err := json.Marshal(state)
|
|
if err != nil {
|
|
return fmt.Errorf("marshal snapshot: %w", err)
|
|
}
|
|
if err := s.deps.Store.InsertHealthSnapshot(ctx, uuid.New(), gameID, now, payload); err != nil {
|
|
return err
|
|
}
|
|
currentTurn := int32(state.Turn)
|
|
patch := runtimeRecordUpdate{
|
|
CurrentTurn: ¤tTurn,
|
|
EngineHealth: strPtr("ok"),
|
|
LastObservedAt: dblTime(now),
|
|
}
|
|
if state.Finished {
|
|
patch.Status = strPtr(RuntimeStatusFinished)
|
|
finishedAtPtr := &now
|
|
patch.FinishedAt = &finishedAtPtr
|
|
}
|
|
rec, err := s.deps.Store.UpdateRuntimeRecord(ctx, gameID, patch, now)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
s.deps.Cache.PutRuntime(rec)
|
|
|
|
if s.deps.Lobby != nil {
|
|
mappings, err := s.deps.Store.ListPlayerMappingsForGame(ctx, gameID)
|
|
if err != nil {
|
|
s.deps.Logger.Warn("list player_mappings on snapshot failed",
|
|
zap.String("game_id", gameID.String()),
|
|
zap.Error(err))
|
|
}
|
|
userByEngine := make(map[uuid.UUID]uuid.UUID, len(mappings))
|
|
userByRace := make(map[string]uuid.UUID, len(mappings))
|
|
for _, m := range mappings {
|
|
userByEngine[m.EnginePlayerUUID] = m.UserID
|
|
userByRace[m.RaceName] = m.UserID
|
|
}
|
|
stats := make([]LobbyPlayerStats, 0, len(state.Players))
|
|
for _, p := range state.Players {
|
|
userID, ok := userByEngine[p.ID]
|
|
if !ok {
|
|
userID = userByRace[p.RaceName]
|
|
}
|
|
if userID == uuid.Nil {
|
|
continue
|
|
}
|
|
stats = append(stats, LobbyPlayerStats{
|
|
UserID: userID,
|
|
CurrentPlanets: int32(p.Planets),
|
|
CurrentPopulation: int32(p.Population),
|
|
MaxPlanets: int32(p.Planets),
|
|
MaxPopulation: int32(p.Population),
|
|
})
|
|
}
|
|
runtimeStatus := RuntimeStatusRunning
|
|
if state.Finished {
|
|
runtimeStatus = RuntimeStatusFinished
|
|
}
|
|
err = s.deps.Lobby.OnRuntimeSnapshot(ctx, gameID, LobbySnapshot{
|
|
CurrentTurn: currentTurn,
|
|
RuntimeStatus: runtimeStatus,
|
|
EngineHealth: "ok",
|
|
ObservedAt: now,
|
|
PlayerStats: stats,
|
|
})
|
|
if err != nil {
|
|
s.deps.Logger.Warn("lobby snapshot consumer failed",
|
|
zap.String("game_id", gameID.String()),
|
|
zap.Error(err))
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// publishFailureSnapshot forwards a runtime-failure observation to
|
|
// lobby so the game lifecycle can react (e.g. flipping `running` to
|
|
// `paused` on `engine_unreachable` / `generation_failed` per Phase
|
|
// 25). The snapshot carries the unchanged `current_turn` because no
|
|
// new turn has been produced; lobby uses the turn number to anchor
|
|
// the `game.paused` idempotency key.
|
|
//
|
|
// The call is best-effort: lobby errors are returned to the caller
|
|
// (the scheduler tick) so the warn-level logging stays in one place.
|
|
// A missing runtime cache entry (e.g. the row was just removed by
|
|
// the reconciler) collapses into a silent no-op.
|
|
func (s *Service) publishFailureSnapshot(ctx context.Context, gameID uuid.UUID, runtimeStatus string) error {
|
|
if s.deps.Lobby == nil {
|
|
return nil
|
|
}
|
|
rec, ok := s.deps.Cache.GetRuntime(gameID)
|
|
if !ok {
|
|
return nil
|
|
}
|
|
return s.deps.Lobby.OnRuntimeSnapshot(ctx, gameID, LobbySnapshot{
|
|
CurrentTurn: rec.CurrentTurn,
|
|
RuntimeStatus: runtimeStatus,
|
|
EngineHealth: "down",
|
|
ObservedAt: s.deps.Now().UTC(),
|
|
})
|
|
}
|
|
|
|
// transitionRuntimeStatus updates the status / engine_health columns
|
|
// and refreshes the cache.
|
|
func (s *Service) transitionRuntimeStatus(ctx context.Context, gameID uuid.UUID, status, health string) (RuntimeRecord, error) {
|
|
now := s.deps.Now().UTC()
|
|
patch := runtimeRecordUpdate{Status: &status}
|
|
if health != "" {
|
|
patch.EngineHealth = &health
|
|
}
|
|
if status == RuntimeStatusFinished {
|
|
finishedAtPtr := &now
|
|
patch.FinishedAt = &finishedAtPtr
|
|
}
|
|
if status == RuntimeStatusStopped {
|
|
stoppedAtPtr := &now
|
|
patch.StoppedAt = &stoppedAtPtr
|
|
}
|
|
rec, err := s.deps.Store.UpdateRuntimeRecord(ctx, gameID, patch, now)
|
|
if err != nil {
|
|
return RuntimeRecord{}, err
|
|
}
|
|
s.deps.Cache.PutRuntime(rec)
|
|
return rec, nil
|
|
}
|
|
|
|
// upsertRuntimeRecord inserts the record when no row exists; updates
|
|
// it otherwise. Used by runStart so a re-attempt after a worker crash
|
|
// stays idempotent.
|
|
func (s *Service) upsertRuntimeRecord(ctx context.Context, in runtimeRecordInsert, patch runtimeRecordUpdate) (RuntimeRecord, error) {
|
|
rec, err := s.deps.Store.InsertRuntimeRecord(ctx, in)
|
|
if err == nil {
|
|
s.deps.Cache.PutRuntime(rec)
|
|
return rec, nil
|
|
}
|
|
if !errors.Is(err, ErrConflict) {
|
|
return RuntimeRecord{}, err
|
|
}
|
|
updated, err := s.deps.Store.UpdateRuntimeRecord(ctx, in.GameID, patch, s.deps.Now().UTC())
|
|
if err != nil {
|
|
return RuntimeRecord{}, err
|
|
}
|
|
s.deps.Cache.PutRuntime(updated)
|
|
return updated, nil
|
|
}
|
|
|
|
// dockerNetwork returns the user-defined Docker network name engine
|
|
// containers attach to. Wired from cfg.Docker.Network through Deps.
|
|
func (s *Service) dockerNetwork() string { return s.deps.DockerNetwork }
|
|
|
|
// engineLabels returns the label set stamped on every engine container
|
|
// spawned for gameID running engineVersion. The runtime adapter merges
|
|
// `dockerclient.ManagedLabel` separately; this helper covers the
|
|
// game-scoped labels plus an optional `galaxy.stack=<value>` from the
|
|
// runtime config so host-side tooling can scope cleanup by dev stack
|
|
// without touching unrelated workloads.
|
|
func (s *Service) engineLabels(gameID, engineVersion string) map[string]string {
|
|
return engineLabels(gameID, engineVersion, s.deps.Config.StackLabel)
|
|
}
|
|
|
|
// engineLabels is the side-effect-free core of
// `(*Service).engineLabels`, kept at package scope so unit tests can
// exercise the labelling rules without building a full Service.
//
// The returned map always contains `galaxy.game_id` and
// `galaxy.engine_version`. `galaxy.stack` is added only when
// stackLabel is non-empty. Each call returns a fresh map.
func engineLabels(gameID, engineVersion, stackLabel string) map[string]string {
	// Sized for the two mandatory labels plus the optional stack label.
	out := make(map[string]string, 3)
	out["galaxy.game_id"] = gameID
	out["galaxy.engine_version"] = engineVersion
	if stackLabel == "" {
		return out
	}
	out["galaxy.stack"] = stackLabel
	return out
}
|
|
|
|
// waitForEngineHealthz polls the engine `/healthz` endpoint until it
|
|
// responds 2xx or until the timeout elapses. The Docker daemon
|
|
// reports a container as `running` as soon as the entrypoint starts,
|
|
// but the engine binary may need a moment to bind its TCP port; the
|
|
// retry loop bridges that gap so the immediately-following Init call
|
|
// does not race the listener.
|
|
func (s *Service) waitForEngineHealthz(ctx context.Context, baseURL string, timeout time.Duration) error {
|
|
deadline := time.Now().Add(timeout)
|
|
var lastErr error
|
|
for {
|
|
probeCtx, cancel := context.WithTimeout(ctx, time.Second)
|
|
err := s.deps.Engine.Healthz(probeCtx, baseURL)
|
|
cancel()
|
|
if err == nil {
|
|
return nil
|
|
}
|
|
lastErr = err
|
|
if time.Now().After(deadline) {
|
|
return fmt.Errorf("engine healthz never succeeded within %s: %w", timeout, lastErr)
|
|
}
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
case <-time.After(200 * time.Millisecond):
|
|
}
|
|
}
|
|
}
|
|
|
|
// hostStateRoot returns the host-side root directory under which the
|
|
// per-game state directory is created. Wired from cfg.Game.StateRoot
|
|
// through Deps.
|
|
func (s *Service) hostStateRoot() string {
|
|
if s.deps.HostStateRoot != "" {
|
|
return s.deps.HostStateRoot
|
|
}
|
|
return s.deps.Config.ContainerStateMount
|
|
}
|
|
|
|
// strPtr returns a pointer to a copy of s. It exists so that
// runtimeRecordUpdate values can be assembled inline from string
// literals and constants, which cannot be addressed directly.
func strPtr(s string) *string {
	ptr := &s
	return ptr
}
|
|
|
|
// dblTime wraps t in two levels of indirection and returns the
// resulting `**time.Time`. Used to clear / set the nullable timestamp
// columns of `runtime_records` through runtimeRecordUpdate. The
// returned pointers always refer to a private copy of t and are never
// nil.
func dblTime(t time.Time) **time.Time {
	inner := new(time.Time)
	*inner = t
	return &inner
}
|