package runtime import ( "context" "errors" "fmt" "strings" "time" "galaxy/backend/internal/dockerclient" "github.com/google/uuid" "go.uber.org/zap" ) // Reconciler runs an immediate startup pass plus a periodic ticker // (`BACKEND_RUNTIME_RECONCILE_INTERVAL`). On every pass it diffs // labelled containers reported by Docker against // `runtime_records`, adopts unrecorded labelled containers, marks // recorded-but-missing as `removed`, and publishes a fresh snapshot // for matched pairs. // // Implements `internal/app.Component`. type Reconciler struct { svc *Service } // NewReconciler builds a Reconciler bound to svc. func NewReconciler(svc *Service) *Reconciler { return &Reconciler{svc: svc} } // Run drives the reconciliation loop until ctx is cancelled. func (r *Reconciler) Run(ctx context.Context) error { if r == nil { return nil } logger := r.svc.deps.Logger.Named("reconciler") if err := r.tick(ctx); err != nil { logger.Warn("initial reconcile tick failed", zap.Error(err)) } ticker := time.NewTicker(r.svc.deps.Config.ReconcileInterval) defer ticker.Stop() for { select { case <-ctx.Done(): return nil case <-ticker.C: if err := r.tick(ctx); err != nil { logger.Warn("reconcile tick failed", zap.Error(err)) } } } } // Shutdown is a no-op: each tick is synchronous inside Run. func (r *Reconciler) Shutdown(_ context.Context) error { return nil } // Tick runs a single reconciliation pass. Exposed for tests so they // can drive the reconciler without timing dependencies. func (r *Reconciler) Tick(ctx context.Context) error { return r.tick(ctx) } func (r *Reconciler) tick(ctx context.Context) error { containers, err := r.svc.deps.Docker.List(ctx, dockerclient.ListFilter{ Labels: map[string]string{dockerclient.ManagedLabel: dockerclient.ManagedLabelValue}, }) if err != nil { return fmt.Errorf("list managed containers: %w", err) } byContainerID := make(map[string]dockerclient.ContainerSummary, len(containers)) byGameID := make(map[uuid.UUID]dockerclient.ContainerSummary, len(containers)) for _, c := range containers { byContainerID[c.ID] = c gameID, ok := parseGameIDFromContainerName(c.Name) if ok { byGameID[gameID] = c } } records, err := r.svc.deps.Store.ListAllRuntimeRecords(ctx) if err != nil { return fmt.Errorf("list runtime records: %w", err) } knownGames := make(map[uuid.UUID]struct{}, len(records)) var errs []error for _, rec := range records { knownGames[rec.GameID] = struct{}{} if rec.IsTerminal() { continue } c, matched := matchContainer(rec, byContainerID, byGameID) if !matched { if err := r.markRemoved(ctx, rec); err != nil { errs = append(errs, fmt.Errorf("mark removed %s: %w", rec.GameID, err)) } continue } if err := r.refreshSnapshot(ctx, rec, c); err != nil { errs = append(errs, fmt.Errorf("refresh snapshot %s: %w", rec.GameID, err)) } } for gameID, c := range byGameID { if _, ok := knownGames[gameID]; ok { continue } if err := r.adopt(ctx, gameID, c); err != nil { errs = append(errs, fmt.Errorf("adopt %s: %w", gameID, err)) } } return errors.Join(errs...) } func matchContainer(rec RuntimeRecord, byContainerID map[string]dockerclient.ContainerSummary, byGameID map[uuid.UUID]dockerclient.ContainerSummary) (dockerclient.ContainerSummary, bool) { if rec.CurrentContainerID != "" { if c, ok := byContainerID[rec.CurrentContainerID]; ok { return c, true } } if c, ok := byGameID[rec.GameID]; ok { return c, true } return dockerclient.ContainerSummary{}, false } func (r *Reconciler) markRemoved(ctx context.Context, rec RuntimeRecord) error { updated, err := r.svc.transitionRuntimeStatus(ctx, rec.GameID, RuntimeStatusRemoved, "") if err != nil { return err } r.svc.deps.Cache.PutRuntime(updated) if r.svc.deps.Lobby != nil { err = r.svc.deps.Lobby.OnRuntimeJobResult(ctx, rec.GameID, JobResult{ Op: OpReconcile, Status: RuntimeStatusRemoved, Message: "container disappeared", }) if err != nil { r.svc.deps.Logger.Warn("lobby OnRuntimeJobResult failed", zap.String("game_id", rec.GameID.String()), zap.Error(err)) } } return nil } func (r *Reconciler) adopt(ctx context.Context, gameID uuid.UUID, c dockerclient.ContainerSummary) error { endpoint := fmt.Sprintf("http://%s:%d", HostName(gameID.String()), 8080) game, err := r.svc.deps.Store.LoadGameProjection(ctx, gameID) if err != nil { if errors.Is(err, ErrNotFound) { r.svc.deps.Logger.Warn("orphan container, no matching game", zap.String("game_id", gameID.String()), zap.String("container_id", c.ID)) return nil } return err } rec, err := r.svc.upsertRuntimeRecord(ctx, runtimeRecordInsert{ GameID: gameID, Status: RuntimeStatusRunning, CurrentContainerID: c.ID, CurrentImageRef: c.ImageRef, CurrentEngineVersion: c.Labels["galaxy.engine_version"], EngineEndpoint: endpoint, DockerNetwork: r.svc.dockerNetwork(), TurnSchedule: game.TurnSchedule, }, runtimeRecordUpdate{ Status: strPtr(RuntimeStatusRunning), CurrentContainerID: strPtr(c.ID), CurrentImageRef: strPtr(c.ImageRef), CurrentEngineVersion: strPtr(c.Labels["galaxy.engine_version"]), EngineEndpoint: strPtr(endpoint), }) if err != nil { return err } r.svc.deps.Cache.PutRuntime(rec) r.svc.scheduler.startGame(rec) return nil } func (r *Reconciler) refreshSnapshot(ctx context.Context, rec RuntimeRecord, _ dockerclient.ContainerSummary) error { state, err := r.svc.deps.Engine.Status(ctx, rec.EngineEndpoint) if err != nil { _, _ = r.svc.transitionRuntimeStatus(ctx, rec.GameID, RuntimeStatusEngineUnreachable, "") return nil } return r.svc.publishSnapshot(ctx, rec.GameID, state) } func parseGameIDFromContainerName(name string) (uuid.UUID, bool) { const prefix = "galaxy-game-" suffix := strings.TrimPrefix(name, prefix) if suffix == name { return uuid.Nil, false } parsed, err := uuid.Parse(suffix) if err != nil { return uuid.Nil, false } return parsed, true }