Files
galaxy-game/backend/internal/runtime/scheduler.go
T
2026-05-06 10:14:55 +03:00

267 lines
6.6 KiB
Go

package runtime
import (
"context"
"errors"
"sync"
"time"
"galaxy/backend/internal/dockerclient"
"galaxy/cronutil"
"github.com/google/uuid"
"go.uber.org/zap"
)
// Scheduler runs one goroutine per running game. Each goroutine holds
// a `cronutil.Schedule` parsed from `runtime_records.turn_schedule`
// and invokes `engineclient.Turn` on every tick (or when
// `skip_next_tick=true` short-circuits the timer).
//
// Implements `app.Component` so main.go can register the bookkeeper
// component alongside the worker pool and reconciler. Run blocks on
// ctx; per-game goroutines tear down when their game leaves the cache
// (stopGame is called) or when ctx is cancelled.
type Scheduler struct {
	svc *Service // owning service; held for the Scheduler's lifetime
	// mu guards tickers, parent and stopping. Goroutines themselves
	// run outside the lock; only the bookkeeping map is protected.
	mu      sync.Mutex
	tickers map[uuid.UUID]*scheduledGame // one entry per attached game goroutine
	parent  context.Context              // installed by Run; parent of every per-game ctx
	stopping bool                        // set by Shutdown; blocks new startGame calls
}
// scheduledGame is the handle the Scheduler keeps per attached game:
// cancel tears the goroutine's context down, done is closed by the
// goroutine on exit so callers can wait for it to drain.
type scheduledGame struct {
	cancel context.CancelFunc
	done   chan struct{}
}
// NewScheduler constructs a Scheduler bound to svc. The svc reference
// is retained for the Scheduler's whole lifetime; no goroutines are
// started until Run installs a parent context.
func NewScheduler(svc *Service) *Scheduler {
	sch := &Scheduler{svc: svc}
	sch.tickers = map[uuid.UUID]*scheduledGame{}
	return sch
}
// Run records ctx as the parent context for every per-game goroutine
// and re-attaches a scheduler goroutine for each runtime record that
// is currently running. Blocks until ctx is cancelled; always returns
// nil.
func (sch *Scheduler) Run(ctx context.Context) error {
	if sch == nil {
		return nil
	}

	sch.mu.Lock()
	sch.parent = ctx
	sch.stopping = false
	sch.mu.Unlock()

	// Re-attach a goroutine for every record already in the running
	// state so a restart picks up where the previous process left off.
	for _, rec := range sch.svc.deps.Cache.ActiveRuntimes() {
		if rec.Status == RuntimeStatusRunning {
			sch.startGame(rec)
		}
	}

	<-ctx.Done()
	return nil
}
// Shutdown cancels every per-game goroutine and waits for each one to
// exit. The provided context bounds the total wait; its error is
// returned if the deadline expires before all goroutines drain.
func (sch *Scheduler) Shutdown(ctx context.Context) error {
	if sch == nil {
		return nil
	}

	// Detach all tracked goroutines under the lock, then cancel and
	// drain outside it so exiting goroutines can still take the lock.
	sch.mu.Lock()
	sch.stopping = true
	draining := make([]*scheduledGame, 0, len(sch.tickers))
	for _, g := range sch.tickers {
		draining = append(draining, g)
	}
	sch.tickers = make(map[uuid.UUID]*scheduledGame)
	sch.mu.Unlock()

	for _, g := range draining {
		g.cancel()
	}
	for _, g := range draining {
		select {
		case <-g.done:
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return nil
}
// startGame attaches a per-game scheduler goroutine for rec.
// Idempotent: a repeated call for the same game cancels and drains the
// previous goroutine before installing a fresh one bound to the
// supplied record. No-op before Run has installed a parent context or
// once Shutdown has begun.
func (sch *Scheduler) startGame(rec RuntimeRecord) {
	if sch == nil {
		return
	}
	sch.mu.Lock()
	if sch.stopping || sch.parent == nil {
		sch.mu.Unlock()
		return
	}
	if existing, ok := sch.tickers[rec.GameID]; ok {
		// Replace: cancel the old goroutine and wait for it outside
		// the lock (it may need the lock to exit), then re-acquire.
		existing.cancel()
		sch.mu.Unlock()
		<-existing.done
		sch.mu.Lock()
	}
	// Re-check BOTH conditions after the unlocked window above:
	// Shutdown may have started while we drained the old goroutine.
	// The original code only re-checked parent == nil, so a goroutine
	// registered here after Shutdown's snapshot was never cancelled or
	// drained — a leak. Checking stopping closes that race.
	if sch.stopping || sch.parent == nil {
		sch.mu.Unlock()
		return
	}
	gameCtx, cancel := context.WithCancel(sch.parent)
	g := &scheduledGame{cancel: cancel, done: make(chan struct{})}
	sch.tickers[rec.GameID] = g
	sch.mu.Unlock()
	go sch.loop(gameCtx, rec, g.done)
}
// stopGame tears down the goroutine tied to gameID, blocking until it
// has exited. Safe to call for a game with no attached goroutine.
func (sch *Scheduler) stopGame(gameID uuid.UUID) {
	if sch == nil {
		return
	}
	sch.mu.Lock()
	g := sch.tickers[gameID] // nil when absent
	delete(sch.tickers, gameID)
	sch.mu.Unlock()
	if g == nil {
		return
	}
	g.cancel()
	<-g.done
}
// activeCount returns the number of games that currently have a
// scheduler goroutine attached. Test helper only.
func (sch *Scheduler) activeCount() int {
	sch.mu.Lock()
	n := len(sch.tickers)
	sch.mu.Unlock()
	return n
}
// loop is the per-game scheduler goroutine body. It parses the
// record's cron schedule once, then repeatedly waits until the next
// firing (or fires immediately when skip_next_tick is set) and runs a
// turn via tick. When the cron expression fails to parse it logs a
// warning and exits so operators notice. Exits when the game leaves
// the cache, leaves the running status, or ctx is cancelled; done is
// closed on exit so stopGame/Shutdown can drain.
//
// (The previous doc comment described a nonexistent `tickInterval`
// helper and a one-hour fallback that this code never implemented.)
func (sch *Scheduler) loop(ctx context.Context, rec RuntimeRecord, done chan struct{}) {
	defer close(done)
	logger := sch.svc.deps.Logger.With(zap.String("game_id", rec.GameID.String()))
	schedule, err := cronutil.Parse(rec.TurnSchedule)
	if err != nil {
		logger.Warn("invalid turn_schedule, scheduler stopping",
			zap.String("turn_schedule", rec.TurnSchedule),
			zap.Error(err))
		return
	}
	for {
		latest, ok := sch.svc.deps.Cache.GetRuntime(rec.GameID)
		if !ok {
			return
		}
		if latest.Status != RuntimeStatusRunning {
			return
		}
		now := sch.svc.deps.Now().UTC()
		wait := schedule.Next(now).Sub(now)
		// A pending skip_next_tick short-circuits the wait — but only
		// while the game is unpaused. A paused game never ticks and so
		// never clears the flag; zeroing the wait regardless (as the
		// old code did) made this goroutine busy-spin at 100% CPU for
		// as long as the game stayed paused with the flag set.
		if latest.SkipNextTick && !latest.Paused {
			wait = 0
		}
		if wait < 0 {
			wait = 0
		}
		timer := time.NewTimer(wait)
		select {
		case <-ctx.Done():
			timer.Stop()
			return
		case <-timer.C:
		}
		// Fresh fetch in case of pause / status change while waiting.
		current, ok := sch.svc.deps.Cache.GetRuntime(rec.GameID)
		if !ok {
			return
		}
		if current.Status != RuntimeStatusRunning {
			return
		}
		if current.Paused {
			continue
		}
		if err := sch.tick(ctx, current); err != nil {
			logger.Warn("scheduler tick failed", zap.Error(err))
		}
	}
}
// tick runs one engine /admin/turn call under the per-game mutex,
// publishes the resulting snapshot, and clears `skip_next_tick`.
//
// Ordering on the error path is deliberate: the operation record is
// completed first, then the runtime status transition, then the skip
// flag is cleared, and only then is Docker consulted — so the row
// always ends in engine-unreachable at minimum, upgraded to removed
// when the container is confirmed gone.
func (sch *Scheduler) tick(ctx context.Context, rec RuntimeRecord) error {
	mu := sch.svc.gameLock(rec.GameID)
	if !mu.TryLock() {
		return nil // another op is in flight; skip this tick
	}
	defer mu.Unlock()
	// Record the turn as a scheduler-sourced operation so it shows up
	// in the operations log alongside admin-triggered turns.
	op, err := sch.svc.beginOperation(ctx, rec.GameID, OpTurn, OpSourceScheduler)
	if err != nil {
		return err
	}
	state, err := sch.svc.deps.Engine.Turn(ctx, rec.EngineEndpoint)
	if err != nil {
		sch.svc.completeOperation(ctx, op, err)
		// Status transition and flag clear are best-effort: the tick
		// failure itself is what gets reported to the caller.
		_, _ = sch.svc.transitionRuntimeStatus(ctx, rec.GameID, RuntimeStatusEngineUnreachable, "")
		// On engine unreachable, also clear skip_next_tick so the next
		// real tick can start fresh.
		_ = sch.clearSkipFlag(ctx, rec.GameID)
		// Best-effort: ask Docker whether the container is still
		// alive; if it's gone we mark the runtime row as removed.
		if rec.CurrentContainerID != "" {
			if _, inspErr := sch.svc.deps.Docker.InspectContainer(ctx, rec.CurrentContainerID); errors.Is(inspErr, dockerclient.ErrContainerNotFound) {
				_, _ = sch.svc.transitionRuntimeStatus(ctx, rec.GameID, RuntimeStatusRemoved, "")
			}
		}
		return err
	}
	// Publish before completing the operation so a publish failure is
	// recorded against the op; skip flag stays set in that case.
	if err := sch.svc.publishSnapshot(ctx, rec.GameID, state); err != nil {
		sch.svc.completeOperation(ctx, op, err)
		return err
	}
	sch.svc.completeOperation(ctx, op, nil)
	_ = sch.clearSkipFlag(ctx, rec.GameID)
	return nil
}
// clearSkipFlag persists SkipNextTick=false for gameID and refreshes
// the cache entry with the updated row. A no-op when the game is
// unknown to the cache or the flag is already clear.
func (sch *Scheduler) clearSkipFlag(ctx context.Context, gameID uuid.UUID) error {
	cached, ok := sch.svc.deps.Cache.GetRuntime(gameID)
	if !ok || !cached.SkipNextTick {
		return nil
	}
	off := false
	updated, err := sch.svc.deps.Store.UpdateRuntimeRecord(
		ctx, gameID, runtimeRecordUpdate{SkipNextTick: &off}, sch.svc.deps.Now().UTC())
	if err != nil {
		return err
	}
	sch.svc.deps.Cache.PutRuntime(updated)
	return nil
}