204 lines
6.0 KiB
Go
204 lines
6.0 KiB
Go
package runtime
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"strings"
|
|
"time"
|
|
|
|
"galaxy/backend/internal/dockerclient"
|
|
|
|
"github.com/google/uuid"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// Reconciler runs an immediate startup pass plus a periodic ticker
|
|
// (`BACKEND_RUNTIME_RECONCILE_INTERVAL`). On every pass it diffs
|
|
// labelled containers reported by Docker against
|
|
// `runtime_records`, adopts unrecorded labelled containers, marks
|
|
// recorded-but-missing as `removed`, and publishes a fresh snapshot
|
|
// for matched pairs.
|
|
//
|
|
// Implements `internal/app.Component`.
|
|
type Reconciler struct {
|
|
svc *Service
|
|
}
|
|
|
|
// NewReconciler builds a Reconciler bound to svc.
|
|
func NewReconciler(svc *Service) *Reconciler { return &Reconciler{svc: svc} }
|
|
|
|
// Run drives the reconciliation loop until ctx is cancelled.
|
|
func (r *Reconciler) Run(ctx context.Context) error {
|
|
if r == nil {
|
|
return nil
|
|
}
|
|
logger := r.svc.deps.Logger.Named("reconciler")
|
|
if err := r.tick(ctx); err != nil {
|
|
logger.Warn("initial reconcile tick failed", zap.Error(err))
|
|
}
|
|
ticker := time.NewTicker(r.svc.deps.Config.ReconcileInterval)
|
|
defer ticker.Stop()
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil
|
|
case <-ticker.C:
|
|
if err := r.tick(ctx); err != nil {
|
|
logger.Warn("reconcile tick failed", zap.Error(err))
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Shutdown is a no-op: each tick is synchronous inside Run.
|
|
func (r *Reconciler) Shutdown(_ context.Context) error { return nil }
|
|
|
|
// Tick runs a single reconciliation pass. Exposed for tests so they
|
|
// can drive the reconciler without timing dependencies.
|
|
func (r *Reconciler) Tick(ctx context.Context) error { return r.tick(ctx) }
|
|
|
|
func (r *Reconciler) tick(ctx context.Context) error {
|
|
containers, err := r.svc.deps.Docker.List(ctx, dockerclient.ListFilter{
|
|
Labels: map[string]string{dockerclient.ManagedLabel: dockerclient.ManagedLabelValue},
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("list managed containers: %w", err)
|
|
}
|
|
|
|
byContainerID := make(map[string]dockerclient.ContainerSummary, len(containers))
|
|
byGameID := make(map[uuid.UUID]dockerclient.ContainerSummary, len(containers))
|
|
for _, c := range containers {
|
|
byContainerID[c.ID] = c
|
|
gameID, ok := parseGameIDFromContainerName(c.Name)
|
|
if ok {
|
|
byGameID[gameID] = c
|
|
}
|
|
}
|
|
|
|
records, err := r.svc.deps.Store.ListAllRuntimeRecords(ctx)
|
|
if err != nil {
|
|
return fmt.Errorf("list runtime records: %w", err)
|
|
}
|
|
knownGames := make(map[uuid.UUID]struct{}, len(records))
|
|
|
|
var errs []error
|
|
for _, rec := range records {
|
|
knownGames[rec.GameID] = struct{}{}
|
|
if rec.IsTerminal() {
|
|
continue
|
|
}
|
|
c, matched := matchContainer(rec, byContainerID, byGameID)
|
|
if !matched {
|
|
if err := r.markRemoved(ctx, rec); err != nil {
|
|
errs = append(errs, fmt.Errorf("mark removed %s: %w", rec.GameID, err))
|
|
}
|
|
continue
|
|
}
|
|
if err := r.refreshSnapshot(ctx, rec, c); err != nil {
|
|
errs = append(errs, fmt.Errorf("refresh snapshot %s: %w", rec.GameID, err))
|
|
}
|
|
}
|
|
|
|
for gameID, c := range byGameID {
|
|
if _, ok := knownGames[gameID]; ok {
|
|
continue
|
|
}
|
|
if err := r.adopt(ctx, gameID, c); err != nil {
|
|
errs = append(errs, fmt.Errorf("adopt %s: %w", gameID, err))
|
|
}
|
|
}
|
|
return errors.Join(errs...)
|
|
}
|
|
|
|
func matchContainer(rec RuntimeRecord, byContainerID map[string]dockerclient.ContainerSummary, byGameID map[uuid.UUID]dockerclient.ContainerSummary) (dockerclient.ContainerSummary, bool) {
|
|
if rec.CurrentContainerID != "" {
|
|
if c, ok := byContainerID[rec.CurrentContainerID]; ok {
|
|
return c, true
|
|
}
|
|
}
|
|
if c, ok := byGameID[rec.GameID]; ok {
|
|
return c, true
|
|
}
|
|
return dockerclient.ContainerSummary{}, false
|
|
}
|
|
|
|
func (r *Reconciler) markRemoved(ctx context.Context, rec RuntimeRecord) error {
|
|
updated, err := r.svc.transitionRuntimeStatus(ctx, rec.GameID, RuntimeStatusRemoved, "")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
r.svc.deps.Cache.PutRuntime(updated)
|
|
if r.svc.deps.Lobby != nil {
|
|
err = r.svc.deps.Lobby.OnRuntimeJobResult(ctx, rec.GameID, JobResult{
|
|
Op: OpReconcile,
|
|
Status: RuntimeStatusRemoved,
|
|
Message: "container disappeared",
|
|
})
|
|
if err != nil {
|
|
r.svc.deps.Logger.Warn("lobby OnRuntimeJobResult failed",
|
|
zap.String("game_id", rec.GameID.String()),
|
|
zap.Error(err))
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (r *Reconciler) adopt(ctx context.Context, gameID uuid.UUID, c dockerclient.ContainerSummary) error {
|
|
endpoint := fmt.Sprintf("http://%s:%d", HostName(gameID.String()), 8080)
|
|
game, err := r.svc.deps.Store.LoadGameProjection(ctx, gameID)
|
|
if err != nil {
|
|
if errors.Is(err, ErrNotFound) {
|
|
r.svc.deps.Logger.Warn("orphan container, no matching game",
|
|
zap.String("game_id", gameID.String()),
|
|
zap.String("container_id", c.ID))
|
|
return nil
|
|
}
|
|
return err
|
|
}
|
|
rec, err := r.svc.upsertRuntimeRecord(ctx, runtimeRecordInsert{
|
|
GameID: gameID,
|
|
Status: RuntimeStatusRunning,
|
|
CurrentContainerID: c.ID,
|
|
CurrentImageRef: c.ImageRef,
|
|
CurrentEngineVersion: c.Labels["galaxy.engine_version"],
|
|
EngineEndpoint: endpoint,
|
|
DockerNetwork: r.svc.dockerNetwork(),
|
|
TurnSchedule: game.TurnSchedule,
|
|
}, runtimeRecordUpdate{
|
|
Status: strPtr(RuntimeStatusRunning),
|
|
CurrentContainerID: strPtr(c.ID),
|
|
CurrentImageRef: strPtr(c.ImageRef),
|
|
CurrentEngineVersion: strPtr(c.Labels["galaxy.engine_version"]),
|
|
EngineEndpoint: strPtr(endpoint),
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
r.svc.deps.Cache.PutRuntime(rec)
|
|
r.svc.scheduler.startGame(rec)
|
|
return nil
|
|
}
|
|
|
|
func (r *Reconciler) refreshSnapshot(ctx context.Context, rec RuntimeRecord, _ dockerclient.ContainerSummary) error {
|
|
state, err := r.svc.deps.Engine.Status(ctx, rec.EngineEndpoint)
|
|
if err != nil {
|
|
_, _ = r.svc.transitionRuntimeStatus(ctx, rec.GameID, RuntimeStatusEngineUnreachable, "")
|
|
return nil
|
|
}
|
|
return r.svc.publishSnapshot(ctx, rec.GameID, state)
|
|
}
|
|
|
|
func parseGameIDFromContainerName(name string) (uuid.UUID, bool) {
|
|
const prefix = "galaxy-game-"
|
|
suffix := strings.TrimPrefix(name, prefix)
|
|
if suffix == name {
|
|
return uuid.Nil, false
|
|
}
|
|
parsed, err := uuid.Parse(suffix)
|
|
if err != nil {
|
|
return uuid.Nil, false
|
|
}
|
|
return parsed, true
|
|
}
|