Ilia Denisov 2ca47eb4df ui/phase-25: backend turn-cutoff guard + auto-pause + UI sync protocol
Backend now owns the turn-cutoff and pause guards the order tab
relies on: the scheduler flips runtime_status between
generation_in_progress and running around every engine tick, a
failed tick auto-pauses the game through OnRuntimeSnapshot, and a
new game.paused notification kind fans out alongside
game.turn.ready. The user-games handlers reject submits with
HTTP 409 turn_already_closed or game_paused depending on the
runtime state.
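
The submit-side check boils down to a status switch before any order is accepted. A minimal sketch, assuming a stand-in record type and an invented reject helper (the real user-games handlers are not part of this file view):

```go
package sketch

import (
    "encoding/json"
    "net/http"
)

// Stand-in for the runtime record the real handler consults; field
// and status names here are illustrative only.
type runtimeRecord struct {
    Status string // e.g. "running", "generation_in_progress"
    Paused bool
}

// guardSubmit mirrors the HTTP 409 rejections described above:
// turn_already_closed while the engine is producing a turn, and
// game_paused after an auto-pause.
func guardSubmit(w http.ResponseWriter, rec runtimeRecord) bool {
    reject := func(code string) {
        w.WriteHeader(http.StatusConflict)
        _ = json.NewEncoder(w).Encode(map[string]string{"error": code})
    }
    switch {
    case rec.Status == "generation_in_progress":
        reject("turn_already_closed")
        return false
    case rec.Paused:
        reject("game_paused")
        return false
    }
    return true
}
```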

UI delegates auto-sync to a new OrderQueue: offline detection,
single retry on reconnect, conflict / paused classification.
OrderDraftStore surfaces conflictBanner / pausedBanner runes,
clears them on local mutation or on a game.turn.ready push via
resetForNewTurn. The order tab renders the matching banners and the
new per-row conflict badge; i18n bundles cover en + ru.
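
On the wire, the UI sync protocol reduces to two notification kinds arriving on one stream. A hedged Go sketch of the publisher-side fan-out, with an invented payload shape (the actual notification schema is not shown here):

```go
package sketch

// Notification kinds named in this commit; the payload fields are
// illustrative, not the real wire schema.
const (
    kindTurnReady = "game.turn.ready" // UI clears banners via resetForNewTurn
    kindPaused    = "game.paused"     // UI raises the paused banner
)

type notification struct {
    Kind   string `json:"kind"`
    GameID string `json:"game_id"`
}

// fanOut delivers a notification to every subscriber without letting
// a slow consumer block the publisher.
func fanOut(subs []chan<- notification, n notification) {
    for _, ch := range subs {
        select {
        case ch <- n:
        default: // drop for subscribers that are not keeping up
        }
    }
}
```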

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-11 22:00:16 +02:00


package runtime

import (
    "context"
    "errors"
    "sync"
    "time"

    "galaxy/backend/internal/dockerclient"
    "galaxy/backend/internal/engineclient"
    "galaxy/cronutil"

    "github.com/google/uuid"
    "go.uber.org/zap"
)

// Scheduler runs one goroutine per running game. Each goroutine holds
// a `cronutil.Schedule` parsed from `runtime_records.turn_schedule`
// and invokes `engineclient.Turn` on every tick (or when
// `skip_next_tick=true` short-circuits the timer).
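//
// For example, a `turn_schedule` of `0 */6 * * *` would tick every
// six hours, assuming `cronutil.Parse` accepts the common five-field
// cron dialect.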
//
// Implements `app.Component` so main.go can register the bookkeeper
// component alongside the worker pool and reconciler. Run blocks on
// ctx; per-game goroutines tear down when their game leaves the cache
// (stopGame is called) or when ctx is cancelled.
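//
// Typical wiring in main.go (the registration call shown here is
// hypothetical; only the app.Component contract is real):
//
//	sched := runtime.NewScheduler(svc)
//	app.Register(sched) // app drives Run at startup, Shutdown on exit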
type Scheduler struct {
    svc      *Service
    mu       sync.Mutex
    tickers  map[uuid.UUID]*scheduledGame
    parent   context.Context
    stopping bool
}
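
// scheduledGame tracks one per-game goroutine: cancel tears the
// goroutine down, and done is closed by the goroutine on exit so
// startGame and stopGame can wait for a full drain.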
type scheduledGame struct {
    cancel context.CancelFunc
    done   chan struct{}
}
// NewScheduler builds a Scheduler. The svc reference is held for the
// life of the Scheduler.
func NewScheduler(svc *Service) *Scheduler {
    return &Scheduler{
        svc:     svc,
        tickers: make(map[uuid.UUID]*scheduledGame),
    }
}
// Run installs ctx as the parent context and re-attaches scheduler
// goroutines for every active runtime record at startup. Blocks on
// ctx.
func (sch *Scheduler) Run(ctx context.Context) error {
    if sch == nil {
        return nil
    }
    sch.mu.Lock()
    sch.parent = ctx
    sch.stopping = false
    sch.mu.Unlock()

    // Re-attach schedulers for every running record.
    for _, rec := range sch.svc.deps.Cache.ActiveRuntimes() {
        if rec.Status != RuntimeStatusRunning {
            continue
        }
        sch.startGame(rec)
    }

    <-ctx.Done()
    return nil
}
// Shutdown cancels every per-game goroutine and waits for them to
// drain. The provided context bounds the wait.
func (sch *Scheduler) Shutdown(ctx context.Context) error {
    if sch == nil {
        return nil
    }
    sch.mu.Lock()
    sch.stopping = true
    games := make([]*scheduledGame, 0, len(sch.tickers))
    for _, g := range sch.tickers {
        games = append(games, g)
    }
    sch.tickers = make(map[uuid.UUID]*scheduledGame)
    sch.mu.Unlock()

    for _, g := range games {
        g.cancel()
    }
    for _, g := range games {
        select {
        case <-g.done:
        case <-ctx.Done():
            return ctx.Err()
        }
    }
    return nil
}
// startGame attaches a per-game scheduler goroutine. Idempotent: a
// repeated call replaces the old goroutine with a fresh one bound to
// the supplied record.
func (sch *Scheduler) startGame(rec RuntimeRecord) {
    if sch == nil {
        return
    }
    sch.mu.Lock()
    if sch.stopping || sch.parent == nil {
        sch.mu.Unlock()
        return
    }
    if existing, ok := sch.tickers[rec.GameID]; ok {
        existing.cancel()
        sch.mu.Unlock()
        <-existing.done
        sch.mu.Lock()
    }
    // Re-check after the unlock window above: Shutdown may have begun
    // while we waited for the old goroutine to drain.
    if sch.stopping || sch.parent == nil {
        sch.mu.Unlock()
        return
    }
    gameCtx, cancel := context.WithCancel(sch.parent)
    g := &scheduledGame{cancel: cancel, done: make(chan struct{})}
    sch.tickers[rec.GameID] = g
    sch.mu.Unlock()
    go sch.loop(gameCtx, rec, g.done)
}
// stopGame cancels the goroutine tied to gameID. Idempotent.
func (sch *Scheduler) stopGame(gameID uuid.UUID) {
    if sch == nil {
        return
    }
    sch.mu.Lock()
    g, ok := sch.tickers[gameID]
    if ok {
        delete(sch.tickers, gameID)
    }
    sch.mu.Unlock()
    if !ok {
        return
    }
    g.cancel()
    <-g.done
}
// activeCount reports how many games currently have a scheduler
// goroutine. Used by tests.
func (sch *Scheduler) activeCount() int {
    sch.mu.Lock()
    defer sch.mu.Unlock()
    return len(sch.tickers)
}
// loop is the per-game scheduler goroutine. It computes the wait for
// the next firing from the cron schedule; when the schedule fails to
// parse, it logs a warning and exits rather than ticking on a bogus
// interval.
func (sch *Scheduler) loop(ctx context.Context, rec RuntimeRecord, done chan struct{}) {
    defer close(done)
    logger := sch.svc.deps.Logger.With(zap.String("game_id", rec.GameID.String()))

    schedule, err := cronutil.Parse(rec.TurnSchedule)
    if err != nil {
        logger.Warn("invalid turn_schedule, scheduler stopping",
            zap.String("turn_schedule", rec.TurnSchedule),
            zap.Error(err))
        return
    }

    for {
        latest, ok := sch.svc.deps.Cache.GetRuntime(rec.GameID)
        if !ok {
            return
        }
        if latest.Status != RuntimeStatusRunning {
            return
        }

        now := sch.svc.deps.Now().UTC()
        next := schedule.Next(now)
        wait := next.Sub(now)
        // Honour skip_next_tick only while the game is not paused;
        // otherwise a zero wait plus the paused `continue` below would
        // spin this loop hot.
        if latest.SkipNextTick && !latest.Paused {
            wait = 0
        }
        if wait < 0 {
            wait = 0
        }

        timer := time.NewTimer(wait)
        select {
        case <-ctx.Done():
            timer.Stop()
            return
        case <-timer.C:
        }

        // Fresh fetch in case of pause / status change while waiting.
        current, ok := sch.svc.deps.Cache.GetRuntime(rec.GameID)
        if !ok {
            return
        }
        if current.Status != RuntimeStatusRunning {
            return
        }
        if current.Paused {
            continue
        }
        if err := sch.tick(ctx, current); err != nil {
            logger.Warn("scheduler tick failed", zap.Error(err))
        }
    }
}
// tick runs one engine /admin/turn call under the per-game mutex,
// publishes the resulting snapshot, and clears `skip_next_tick`.
//
// Phase 25 wraps the engine call between two runtime-status flips so
// the backend order handler can reject late submits while the engine
// is producing:
//
// - before `Engine.Turn`: runtime status moves to
// `generation_in_progress`; the loop's running-only guard tolerates
// this because the flip back happens inside the same tick.
// - on success: runtime status moves back to `running` (unless the
// engine reports `finished`, in which case `publishSnapshot` has
// already promoted the row to `finished`).
// - on error: runtime status moves to `generation_failed` (engine
// validation failure) or `engine_unreachable` (transport / 5xx).
// The matching snapshot is forwarded to lobby through
// `publishFailureSnapshot` so lobby can flip the game to `paused`
// and emit `game.paused`.
func (sch *Scheduler) tick(ctx context.Context, rec RuntimeRecord) error {
    mu := sch.svc.gameLock(rec.GameID)
    if !mu.TryLock() {
        return nil // another op is in flight; skip this tick
    }
    defer mu.Unlock()

    op, err := sch.svc.beginOperation(ctx, rec.GameID, OpTurn, OpSourceScheduler)
    if err != nil {
        return err
    }
    if _, err := sch.svc.transitionRuntimeStatus(ctx, rec.GameID, RuntimeStatusGenerationInProgress, ""); err != nil {
        sch.svc.completeOperation(ctx, op, err)
        return err
    }

    state, err := sch.svc.deps.Engine.Turn(ctx, rec.EngineEndpoint)
    if err != nil {
        sch.svc.completeOperation(ctx, op, err)
        failureStatus := RuntimeStatusEngineUnreachable
        if errors.Is(err, engineclient.ErrEngineValidation) {
            failureStatus = RuntimeStatusGenerationFailed
        }
        _, _ = sch.svc.transitionRuntimeStatus(ctx, rec.GameID, failureStatus, "down")
        if pubErr := sch.svc.publishFailureSnapshot(ctx, rec.GameID, failureStatus); pubErr != nil {
            sch.svc.deps.Logger.Warn("publish failure snapshot to lobby",
                zap.String("game_id", rec.GameID.String()),
                zap.String("runtime_status", failureStatus),
                zap.Error(pubErr))
        }
        // On engine unreachable, also clear skip_next_tick so the next
        // real tick can start fresh.
        _ = sch.clearSkipFlag(ctx, rec.GameID)
        // Best-effort: ask Docker whether the container is still
        // alive; if it's gone we mark the runtime row as removed.
        if rec.CurrentContainerID != "" {
            if _, inspErr := sch.svc.deps.Docker.InspectContainer(ctx, rec.CurrentContainerID); errors.Is(inspErr, dockerclient.ErrContainerNotFound) {
                _, _ = sch.svc.transitionRuntimeStatus(ctx, rec.GameID, RuntimeStatusRemoved, "")
            }
        }
        return err
    }

    if err := sch.svc.publishSnapshot(ctx, rec.GameID, state); err != nil {
        sch.svc.completeOperation(ctx, op, err)
        return err
    }
    if !state.Finished {
        // `publishSnapshot` patches CurrentTurn / EngineHealth but does
        // not reset the status column; reopen the orders window here so
        // the next loop iteration finds the runtime back in `running`.
        _, _ = sch.svc.transitionRuntimeStatus(ctx, rec.GameID, RuntimeStatusRunning, "ok")
    }
    sch.svc.completeOperation(ctx, op, nil)
    _ = sch.clearSkipFlag(ctx, rec.GameID)
    return nil
}
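
// clearSkipFlag resets `skip_next_tick` once a tick has fired (or
// failed with the engine unreachable) so a one-shot "run now" request
// cannot fire twice. No-op when the flag is already clear.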
func (sch *Scheduler) clearSkipFlag(ctx context.Context, gameID uuid.UUID) error {
    rec, ok := sch.svc.deps.Cache.GetRuntime(gameID)
    if !ok || !rec.SkipNextTick {
        return nil
    }
    skip := false
    now := sch.svc.deps.Now().UTC()
    updated, err := sch.svc.deps.Store.UpdateRuntimeRecord(ctx, gameID, runtimeRecordUpdate{SkipNextTick: &skip}, now)
    if err != nil {
        return err
    }
    sch.svc.deps.Cache.PutRuntime(updated)
    return nil
}