package runtime import ( "context" "errors" "sync" "time" "galaxy/backend/internal/dockerclient" "galaxy/cronutil" "github.com/google/uuid" "go.uber.org/zap" ) // Scheduler runs one goroutine per running game. Each goroutine holds // a `cronutil.Schedule` parsed from `runtime_records.turn_schedule` // and invokes `engineclient.Turn` on every tick (or when // `skip_next_tick=true` short-circuits the timer). // // Implements `app.Component` so main.go can register the bookkeeper // component alongside the worker pool and reconciler. Run blocks on // ctx; per-game goroutines tear down when their game leaves the cache // (stopGame is called) or when ctx is cancelled. type Scheduler struct { svc *Service mu sync.Mutex tickers map[uuid.UUID]*scheduledGame parent context.Context stopping bool } type scheduledGame struct { cancel context.CancelFunc done chan struct{} } // NewScheduler builds a Scheduler. The svc reference is held for the // life of the Scheduler. func NewScheduler(svc *Service) *Scheduler { return &Scheduler{ svc: svc, tickers: make(map[uuid.UUID]*scheduledGame), } } // Run installs ctx as the parent context and re-attaches scheduler // goroutines for every active runtime record at startup. Blocks on // ctx. func (sch *Scheduler) Run(ctx context.Context) error { if sch == nil { return nil } sch.mu.Lock() sch.parent = ctx sch.stopping = false sch.mu.Unlock() // Re-attach schedulers for every running record. for _, rec := range sch.svc.deps.Cache.ActiveRuntimes() { if rec.Status != RuntimeStatusRunning { continue } sch.startGame(rec) } <-ctx.Done() return nil } // Shutdown cancels every per-game goroutine and waits for them to // drain. The provided context bounds the wait. func (sch *Scheduler) Shutdown(ctx context.Context) error { if sch == nil { return nil } sch.mu.Lock() sch.stopping = true games := make([]*scheduledGame, 0, len(sch.tickers)) for _, g := range sch.tickers { games = append(games, g) } sch.tickers = make(map[uuid.UUID]*scheduledGame) sch.mu.Unlock() for _, g := range games { g.cancel() } for _, g := range games { select { case <-g.done: case <-ctx.Done(): return ctx.Err() } } return nil } // startGame attaches a per-game scheduler goroutine. Idempotent: a // repeated call replaces the old goroutine with a fresh one bound to // the supplied record. func (sch *Scheduler) startGame(rec RuntimeRecord) { if sch == nil { return } sch.mu.Lock() if sch.stopping || sch.parent == nil { sch.mu.Unlock() return } if existing, ok := sch.tickers[rec.GameID]; ok { existing.cancel() sch.mu.Unlock() <-existing.done sch.mu.Lock() } parent := sch.parent if parent == nil { sch.mu.Unlock() return } gameCtx, cancel := context.WithCancel(parent) g := &scheduledGame{cancel: cancel, done: make(chan struct{})} sch.tickers[rec.GameID] = g sch.mu.Unlock() go sch.loop(gameCtx, rec, g.done) } // stopGame cancels the goroutine tied to gameID. Idempotent. func (sch *Scheduler) stopGame(gameID uuid.UUID) { if sch == nil { return } sch.mu.Lock() g, ok := sch.tickers[gameID] if ok { delete(sch.tickers, gameID) } sch.mu.Unlock() if !ok { return } g.cancel() <-g.done } // activeCount reports how many games currently have a scheduler // goroutine. Used by tests. func (sch *Scheduler) activeCount() int { sch.mu.Lock() defer sch.mu.Unlock() return len(sch.tickers) } // tickInterval computes the wait for the next scheduler firing. When // the cron schedule fails to parse the loop falls back to a one-hour // safety interval and logs a warning so operators notice. func (sch *Scheduler) loop(ctx context.Context, rec RuntimeRecord, done chan struct{}) { defer close(done) logger := sch.svc.deps.Logger.With(zap.String("game_id", rec.GameID.String())) schedule, err := cronutil.Parse(rec.TurnSchedule) if err != nil { logger.Warn("invalid turn_schedule, scheduler stopping", zap.String("turn_schedule", rec.TurnSchedule), zap.Error(err)) return } for { latest, ok := sch.svc.deps.Cache.GetRuntime(rec.GameID) if !ok { return } if latest.Status != RuntimeStatusRunning { return } now := sch.svc.deps.Now().UTC() next := schedule.Next(now) wait := next.Sub(now) if latest.SkipNextTick { wait = 0 } if wait < 0 { wait = 0 } timer := time.NewTimer(wait) select { case <-ctx.Done(): timer.Stop() return case <-timer.C: } // Fresh fetch in case of pause / status change while waiting. current, ok := sch.svc.deps.Cache.GetRuntime(rec.GameID) if !ok { return } if current.Status != RuntimeStatusRunning { return } if current.Paused { continue } if err := sch.tick(ctx, current); err != nil { logger.Warn("scheduler tick failed", zap.Error(err)) } } } // tick runs one engine /admin/turn call under the per-game mutex, // publishes the resulting snapshot, and clears `skip_next_tick`. func (sch *Scheduler) tick(ctx context.Context, rec RuntimeRecord) error { mu := sch.svc.gameLock(rec.GameID) if !mu.TryLock() { return nil // another op is in flight; skip this tick } defer mu.Unlock() op, err := sch.svc.beginOperation(ctx, rec.GameID, OpTurn, OpSourceScheduler) if err != nil { return err } state, err := sch.svc.deps.Engine.Turn(ctx, rec.EngineEndpoint) if err != nil { sch.svc.completeOperation(ctx, op, err) _, _ = sch.svc.transitionRuntimeStatus(ctx, rec.GameID, RuntimeStatusEngineUnreachable, "") // On engine unreachable, also clear skip_next_tick so the next // real tick can start fresh. _ = sch.clearSkipFlag(ctx, rec.GameID) // Best-effort: ask Docker whether the container is still // alive; if it's gone we mark the runtime row as removed. if rec.CurrentContainerID != "" { if _, inspErr := sch.svc.deps.Docker.InspectContainer(ctx, rec.CurrentContainerID); errors.Is(inspErr, dockerclient.ErrContainerNotFound) { _, _ = sch.svc.transitionRuntimeStatus(ctx, rec.GameID, RuntimeStatusRemoved, "") } } return err } if err := sch.svc.publishSnapshot(ctx, rec.GameID, state); err != nil { sch.svc.completeOperation(ctx, op, err) return err } sch.svc.completeOperation(ctx, op, nil) _ = sch.clearSkipFlag(ctx, rec.GameID) return nil } func (sch *Scheduler) clearSkipFlag(ctx context.Context, gameID uuid.UUID) error { rec, ok := sch.svc.deps.Cache.GetRuntime(gameID) if !ok || !rec.SkipNextTick { return nil } skip := false now := sch.svc.deps.Now().UTC() updated, err := sch.svc.deps.Store.UpdateRuntimeRecord(ctx, gameID, runtimeRecordUpdate{SkipNextTick: &skip}, now) if err != nil { return err } sch.svc.deps.Cache.PutRuntime(updated) return nil }