715 lines
24 KiB
Go
715 lines
24 KiB
Go
package runtime
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"errors"
|
|
"fmt"
|
|
"time"
|
|
|
|
"galaxy/backend/internal/postgres/jet/backend/model"
|
|
"galaxy/backend/internal/postgres/jet/backend/table"
|
|
|
|
"github.com/go-jet/jet/v2/postgres"
|
|
"github.com/go-jet/jet/v2/qrm"
|
|
"github.com/google/uuid"
|
|
)
|
|
|
|
// Constraint names the store compares against when it classifies a
// failed write as a unique violation (see the isUniqueViolation call
// sites in this file).
const (
	// engineVersionsPK is the constraint name surfaced when a duplicate
	// `version` is inserted. Postgres synthesises `<table>_pkey` for the
	// primary-key constraint, matching the migration in
	// `backend/internal/postgres/migrations/00001_init.sql:407`.
	engineVersionsPK = "engine_versions_pkey"

	// runtimeRecordsPK is the constraint name surfaced when a duplicate
	// `runtime_records.game_id` insert hits the primary key.
	runtimeRecordsPK = "runtime_records_pkey"

	// playerMappingsRaceUnique mirrors `player_mappings_game_race_uidx`,
	// the partial UNIQUE that enforces the one-race-per-game invariant.
	playerMappingsRaceUnique = "player_mappings_game_race_uidx"
)
|
|
|
|
// Store is the Postgres-backed query surface for the runtime package.
// Every statement is assembled with go-jet on top of the generated
// table bindings under `backend/internal/postgres/jet/backend/table`
// and executed against the wrapped *sql.DB.
type Store struct {
	// db is the handle every query and transaction in this file runs on.
	db *sql.DB
}

// NewStore constructs a Store wrapping db. The handle is stored as-is;
// nothing is opened or verified here.
func NewStore(db *sql.DB) *Store {
	store := new(Store)
	store.db = db
	return store
}
|
|
|
|
// engineVersionColumns is the canonical projection used by every
|
|
// engine-version read path.
|
|
func engineVersionColumns() postgres.ColumnList {
|
|
v := table.EngineVersions
|
|
return postgres.ColumnList{v.Version, v.ImageRef, v.Enabled, v.CreatedAt, v.UpdatedAt}
|
|
}
|
|
|
|
// runtimeRecordColumns is the canonical projection used by every
|
|
// runtime-record read path.
|
|
func runtimeRecordColumns() postgres.ColumnList {
|
|
r := table.RuntimeRecords
|
|
return postgres.ColumnList{
|
|
r.GameID, r.Status, r.CurrentContainerID, r.CurrentImageRef,
|
|
r.CurrentEngineVersion, r.EngineEndpoint, r.StatePath, r.DockerNetwork,
|
|
r.TurnSchedule, r.CurrentTurn, r.NextGenerationAt, r.SkipNextTick,
|
|
r.Paused, r.PausedAt, r.EngineHealth,
|
|
r.CreatedAt, r.UpdatedAt, r.StartedAt, r.StoppedAt, r.FinishedAt,
|
|
r.RemovedAt, r.LastObservedAt,
|
|
}
|
|
}
|
|
|
|
// operationLogColumns is the canonical projection used by every read
|
|
// of `backend.runtime_operation_log`.
|
|
func operationLogColumns() postgres.ColumnList {
|
|
o := table.RuntimeOperationLog
|
|
return postgres.ColumnList{
|
|
o.OperationID, o.GameID, o.Op, o.Source, o.Status, o.ImageRef,
|
|
o.ContainerID, o.ErrorCode, o.ErrorMessage, o.StartedAt, o.FinishedAt,
|
|
}
|
|
}
|
|
|
|
// =====================================================================
|
|
// Engine version registry
|
|
// =====================================================================
|
|
|
|
// ListEngineVersions returns every engine_versions row ordered by
|
|
// created_at DESC.
|
|
func (s *Store) ListEngineVersions(ctx context.Context) ([]EngineVersion, error) {
|
|
v := table.EngineVersions
|
|
stmt := postgres.SELECT(engineVersionColumns()).
|
|
FROM(v).
|
|
ORDER_BY(v.CreatedAt.DESC(), v.Version.DESC())
|
|
var rows []model.EngineVersions
|
|
if err := stmt.QueryContext(ctx, s.db, &rows); err != nil {
|
|
return nil, fmt.Errorf("runtime store: list engine versions: %w", err)
|
|
}
|
|
out := make([]EngineVersion, 0, len(rows))
|
|
for _, row := range rows {
|
|
out = append(out, modelToEngineVersion(row))
|
|
}
|
|
return out, nil
|
|
}
|
|
|
|
// GetEngineVersion returns the row for version. Returns ErrNotFound
|
|
// when no row matches.
|
|
func (s *Store) GetEngineVersion(ctx context.Context, version string) (EngineVersion, error) {
|
|
v := table.EngineVersions
|
|
stmt := postgres.SELECT(engineVersionColumns()).
|
|
FROM(v).
|
|
WHERE(v.Version.EQ(postgres.String(version))).
|
|
LIMIT(1)
|
|
var row model.EngineVersions
|
|
if err := stmt.QueryContext(ctx, s.db, &row); err != nil {
|
|
if errors.Is(err, qrm.ErrNoRows) {
|
|
return EngineVersion{}, ErrNotFound
|
|
}
|
|
return EngineVersion{}, fmt.Errorf("runtime store: load engine version %q: %w", version, err)
|
|
}
|
|
return modelToEngineVersion(row), nil
|
|
}
|
|
|
|
// InsertEngineVersion persists a fresh engine version row. Returns
|
|
// ErrEngineVersionTaken when the primary key collides.
|
|
func (s *Store) InsertEngineVersion(ctx context.Context, version, imageRef string, enabled bool, now time.Time) (EngineVersion, error) {
|
|
v := table.EngineVersions
|
|
stmt := v.INSERT(v.Version, v.ImageRef, v.Enabled, v.CreatedAt, v.UpdatedAt).
|
|
VALUES(version, imageRef, enabled, now, now).
|
|
RETURNING(engineVersionColumns())
|
|
var row model.EngineVersions
|
|
if err := stmt.QueryContext(ctx, s.db, &row); err != nil {
|
|
if isUniqueViolation(err, engineVersionsPK) {
|
|
return EngineVersion{}, ErrEngineVersionTaken
|
|
}
|
|
return EngineVersion{}, fmt.Errorf("runtime store: insert engine version %q: %w", version, err)
|
|
}
|
|
return modelToEngineVersion(row), nil
|
|
}
|
|
|
|
// engineVersionUpdate is the patch accepted by UpdateEngineVersion.
// A nil pointer means "leave the column alone"; a non-nil pointer
// overwrites the column with the pointed-to value.
type engineVersionUpdate struct {
	// ImageRef, when set, replaces engine_versions.image_ref.
	ImageRef *string

	// Enabled, when set, replaces engine_versions.enabled.
	Enabled *bool
}
|
|
|
|
// UpdateEngineVersion patches the supplied columns and bumps
|
|
// updated_at. Returns ErrNotFound when no row matches.
|
|
func (s *Store) UpdateEngineVersion(ctx context.Context, version string, patch engineVersionUpdate, now time.Time) (EngineVersion, error) {
|
|
v := table.EngineVersions
|
|
rest := []any{}
|
|
if patch.ImageRef != nil {
|
|
rest = append(rest, v.ImageRef.SET(postgres.String(*patch.ImageRef)))
|
|
}
|
|
if patch.Enabled != nil {
|
|
rest = append(rest, v.Enabled.SET(postgres.Bool(*patch.Enabled)))
|
|
}
|
|
stmt := v.UPDATE().
|
|
SET(v.UpdatedAt.SET(postgres.TimestampzT(now)), rest...).
|
|
WHERE(v.Version.EQ(postgres.String(version))).
|
|
RETURNING(engineVersionColumns())
|
|
|
|
var row model.EngineVersions
|
|
if err := stmt.QueryContext(ctx, s.db, &row); err != nil {
|
|
if errors.Is(err, qrm.ErrNoRows) {
|
|
return EngineVersion{}, ErrNotFound
|
|
}
|
|
return EngineVersion{}, fmt.Errorf("runtime store: update engine version %q: %w", version, err)
|
|
}
|
|
return modelToEngineVersion(row), nil
|
|
}
|
|
|
|
// =====================================================================
|
|
// Runtime records
|
|
// =====================================================================
|
|
|
|
// runtimeRecordInsert carries the parameters for InsertRuntimeRecord.
|
|
type runtimeRecordInsert struct {
|
|
GameID uuid.UUID
|
|
Status string
|
|
CurrentContainerID string
|
|
CurrentImageRef string
|
|
CurrentEngineVersion string
|
|
EngineEndpoint string
|
|
StatePath string
|
|
DockerNetwork string
|
|
TurnSchedule string
|
|
StartedAt *time.Time
|
|
}
|
|
|
|
// InsertRuntimeRecord creates a fresh row.
|
|
func (s *Store) InsertRuntimeRecord(ctx context.Context, in runtimeRecordInsert) (RuntimeRecord, error) {
|
|
r := table.RuntimeRecords
|
|
stmt := r.INSERT(
|
|
r.GameID, r.Status, r.CurrentContainerID, r.CurrentImageRef,
|
|
r.CurrentEngineVersion, r.EngineEndpoint, r.StatePath,
|
|
r.DockerNetwork, r.TurnSchedule, r.StartedAt,
|
|
).VALUES(
|
|
in.GameID, in.Status,
|
|
nullableString(in.CurrentContainerID),
|
|
nullableString(in.CurrentImageRef),
|
|
nullableString(in.CurrentEngineVersion),
|
|
in.EngineEndpoint,
|
|
nullableString(in.StatePath),
|
|
nullableString(in.DockerNetwork),
|
|
in.TurnSchedule,
|
|
nullableTime(in.StartedAt),
|
|
).RETURNING(runtimeRecordColumns())
|
|
|
|
var row model.RuntimeRecords
|
|
if err := stmt.QueryContext(ctx, s.db, &row); err != nil {
|
|
if isUniqueViolation(err, runtimeRecordsPK) {
|
|
return RuntimeRecord{}, ErrConflict
|
|
}
|
|
return RuntimeRecord{}, fmt.Errorf("runtime store: insert runtime_record %s: %w", in.GameID, err)
|
|
}
|
|
return modelToRuntimeRecord(row), nil
|
|
}
|
|
|
|
// LoadRuntimeRecord returns the row for gameID. Returns ErrNotFound
|
|
// when no row matches.
|
|
func (s *Store) LoadRuntimeRecord(ctx context.Context, gameID uuid.UUID) (RuntimeRecord, error) {
|
|
r := table.RuntimeRecords
|
|
stmt := postgres.SELECT(runtimeRecordColumns()).
|
|
FROM(r).
|
|
WHERE(r.GameID.EQ(postgres.UUID(gameID))).
|
|
LIMIT(1)
|
|
var row model.RuntimeRecords
|
|
if err := stmt.QueryContext(ctx, s.db, &row); err != nil {
|
|
if errors.Is(err, qrm.ErrNoRows) {
|
|
return RuntimeRecord{}, ErrNotFound
|
|
}
|
|
return RuntimeRecord{}, fmt.Errorf("runtime store: load runtime_record %s: %w", gameID, err)
|
|
}
|
|
return modelToRuntimeRecord(row), nil
|
|
}
|
|
|
|
// ListAllRuntimeRecords returns every row, used by Cache.Warm.
|
|
func (s *Store) ListAllRuntimeRecords(ctx context.Context) ([]RuntimeRecord, error) {
|
|
stmt := postgres.SELECT(runtimeRecordColumns()).FROM(table.RuntimeRecords)
|
|
var rows []model.RuntimeRecords
|
|
if err := stmt.QueryContext(ctx, s.db, &rows); err != nil {
|
|
return nil, fmt.Errorf("runtime store: list runtime_records: %w", err)
|
|
}
|
|
out := make([]RuntimeRecord, 0, len(rows))
|
|
for _, row := range rows {
|
|
out = append(out, modelToRuntimeRecord(row))
|
|
}
|
|
return out, nil
|
|
}
|
|
|
|
// runtimeRecordUpdate is the patch accepted by UpdateRuntimeRecord.
// A nil field means "leave the column alone". For the `**time.Time`
// fields a non-nil outer pointer with a nil inner pointer clears the
// column to NULL. For the string columns that UpdateRuntimeRecord
// routes through nullableStringSetExpr (CurrentContainerID,
// CurrentImageRef, CurrentEngineVersion, StatePath, DockerNetwork) a
// pointer to "" clears the column to NULL.
type runtimeRecordUpdate struct {
	// Status and container placement.
	Status               *string
	CurrentContainerID   *string
	CurrentImageRef      *string
	CurrentEngineVersion *string
	EngineEndpoint       *string
	StatePath            *string
	DockerNetwork        *string

	// Turn scheduling.
	TurnSchedule     *string
	CurrentTurn      *int32
	NextGenerationAt **time.Time
	SkipNextTick     *bool

	// Pause state and engine health.
	Paused       *bool
	PausedAt     **time.Time
	EngineHealth *string

	// Lifecycle timestamps.
	StartedAt      **time.Time
	StoppedAt      **time.Time
	FinishedAt     **time.Time
	RemovedAt      **time.Time
	LastObservedAt **time.Time
}
|
|
|
|
// UpdateRuntimeRecord patches the supplied columns. Pointer fields are
|
|
// translated into a dynamic SET list — only the fields the caller
|
|
// supplies are emitted in the UPDATE. Nullable timestamps use a
|
|
// `**time.Time` so callers can distinguish "leave alone" (outer nil)
|
|
// from "clear to NULL" (inner nil).
|
|
func (s *Store) UpdateRuntimeRecord(ctx context.Context, gameID uuid.UUID, patch runtimeRecordUpdate, now time.Time) (RuntimeRecord, error) {
|
|
r := table.RuntimeRecords
|
|
rest := []any{}
|
|
if patch.Status != nil {
|
|
rest = append(rest, r.Status.SET(postgres.String(*patch.Status)))
|
|
}
|
|
if patch.CurrentContainerID != nil {
|
|
rest = append(rest, r.CurrentContainerID.SET(nullableStringSetExpr(*patch.CurrentContainerID)))
|
|
}
|
|
if patch.CurrentImageRef != nil {
|
|
rest = append(rest, r.CurrentImageRef.SET(nullableStringSetExpr(*patch.CurrentImageRef)))
|
|
}
|
|
if patch.CurrentEngineVersion != nil {
|
|
rest = append(rest, r.CurrentEngineVersion.SET(nullableStringSetExpr(*patch.CurrentEngineVersion)))
|
|
}
|
|
if patch.EngineEndpoint != nil {
|
|
rest = append(rest, r.EngineEndpoint.SET(postgres.String(*patch.EngineEndpoint)))
|
|
}
|
|
if patch.StatePath != nil {
|
|
rest = append(rest, r.StatePath.SET(nullableStringSetExpr(*patch.StatePath)))
|
|
}
|
|
if patch.DockerNetwork != nil {
|
|
rest = append(rest, r.DockerNetwork.SET(nullableStringSetExpr(*patch.DockerNetwork)))
|
|
}
|
|
if patch.TurnSchedule != nil {
|
|
rest = append(rest, r.TurnSchedule.SET(postgres.String(*patch.TurnSchedule)))
|
|
}
|
|
if patch.CurrentTurn != nil {
|
|
rest = append(rest, r.CurrentTurn.SET(postgres.Int(int64(*patch.CurrentTurn))))
|
|
}
|
|
if patch.NextGenerationAt != nil {
|
|
rest = append(rest, r.NextGenerationAt.SET(timePtrSetExpr(*patch.NextGenerationAt)))
|
|
}
|
|
if patch.SkipNextTick != nil {
|
|
rest = append(rest, r.SkipNextTick.SET(postgres.Bool(*patch.SkipNextTick)))
|
|
}
|
|
if patch.Paused != nil {
|
|
rest = append(rest, r.Paused.SET(postgres.Bool(*patch.Paused)))
|
|
}
|
|
if patch.PausedAt != nil {
|
|
rest = append(rest, r.PausedAt.SET(timePtrSetExpr(*patch.PausedAt)))
|
|
}
|
|
if patch.EngineHealth != nil {
|
|
rest = append(rest, r.EngineHealth.SET(postgres.String(*patch.EngineHealth)))
|
|
}
|
|
if patch.StartedAt != nil {
|
|
rest = append(rest, r.StartedAt.SET(timePtrSetExpr(*patch.StartedAt)))
|
|
}
|
|
if patch.StoppedAt != nil {
|
|
rest = append(rest, r.StoppedAt.SET(timePtrSetExpr(*patch.StoppedAt)))
|
|
}
|
|
if patch.FinishedAt != nil {
|
|
rest = append(rest, r.FinishedAt.SET(timePtrSetExpr(*patch.FinishedAt)))
|
|
}
|
|
if patch.RemovedAt != nil {
|
|
rest = append(rest, r.RemovedAt.SET(timePtrSetExpr(*patch.RemovedAt)))
|
|
}
|
|
if patch.LastObservedAt != nil {
|
|
rest = append(rest, r.LastObservedAt.SET(timePtrSetExpr(*patch.LastObservedAt)))
|
|
}
|
|
|
|
stmt := r.UPDATE().
|
|
SET(r.UpdatedAt.SET(postgres.TimestampzT(now)), rest...).
|
|
WHERE(r.GameID.EQ(postgres.UUID(gameID))).
|
|
RETURNING(runtimeRecordColumns())
|
|
|
|
var row model.RuntimeRecords
|
|
if err := stmt.QueryContext(ctx, s.db, &row); err != nil {
|
|
if errors.Is(err, qrm.ErrNoRows) {
|
|
return RuntimeRecord{}, ErrNotFound
|
|
}
|
|
return RuntimeRecord{}, fmt.Errorf("runtime store: update runtime_record %s: %w", gameID, err)
|
|
}
|
|
return modelToRuntimeRecord(row), nil
|
|
}
|
|
|
|
// DeleteRuntimeRecord removes the row at gameID. Idempotent: nil when
|
|
// no row matched.
|
|
func (s *Store) DeleteRuntimeRecord(ctx context.Context, gameID uuid.UUID) error {
|
|
stmt := table.RuntimeRecords.DELETE().
|
|
WHERE(table.RuntimeRecords.GameID.EQ(postgres.UUID(gameID)))
|
|
if _, err := stmt.ExecContext(ctx, s.db); err != nil {
|
|
return fmt.Errorf("runtime store: delete runtime_record %s: %w", gameID, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// =====================================================================
|
|
// Player mappings
|
|
// =====================================================================
|
|
|
|
// InsertPlayerMappings persists a slice of mappings in a single
|
|
// transaction. Existing rows for the (game_id, user_id) pair are
|
|
// replaced (ON CONFLICT) so re-runs of StartGame after a transient
|
|
// failure stay idempotent.
|
|
func (s *Store) InsertPlayerMappings(ctx context.Context, mappings []PlayerMapping) error {
|
|
if len(mappings) == 0 {
|
|
return nil
|
|
}
|
|
tx, err := s.db.BeginTx(ctx, nil)
|
|
if err != nil {
|
|
return fmt.Errorf("runtime store: begin player_mappings tx: %w", err)
|
|
}
|
|
defer func() { _ = tx.Rollback() }()
|
|
pm := table.PlayerMappings
|
|
for _, m := range mappings {
|
|
stmt := pm.INSERT(pm.GameID, pm.UserID, pm.RaceName, pm.EnginePlayerUUID).
|
|
VALUES(m.GameID, m.UserID, m.RaceName, m.EnginePlayerUUID).
|
|
ON_CONFLICT(pm.GameID, pm.UserID).
|
|
DO_UPDATE(postgres.SET(
|
|
pm.RaceName.SET(pm.EXCLUDED.RaceName),
|
|
pm.EnginePlayerUUID.SET(pm.EXCLUDED.EnginePlayerUUID),
|
|
))
|
|
if _, err := stmt.ExecContext(ctx, tx); err != nil {
|
|
if isUniqueViolation(err, playerMappingsRaceUnique) {
|
|
return fmt.Errorf("%w: race name %q duplicated within game", ErrConflict, m.RaceName)
|
|
}
|
|
return fmt.Errorf("runtime store: insert player_mapping %s/%s: %w", m.GameID, m.UserID, err)
|
|
}
|
|
}
|
|
if err := tx.Commit(); err != nil {
|
|
return fmt.Errorf("runtime store: commit player_mappings: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// LoadPlayerMapping returns the mapping for (gameID, userID). Returns
|
|
// ErrNotFound when no row matches.
|
|
func (s *Store) LoadPlayerMapping(ctx context.Context, gameID, userID uuid.UUID) (PlayerMapping, error) {
|
|
pm := table.PlayerMappings
|
|
stmt := postgres.SELECT(pm.GameID, pm.UserID, pm.RaceName, pm.EnginePlayerUUID, pm.CreatedAt).
|
|
FROM(pm).
|
|
WHERE(
|
|
pm.GameID.EQ(postgres.UUID(gameID)).
|
|
AND(pm.UserID.EQ(postgres.UUID(userID))),
|
|
).
|
|
LIMIT(1)
|
|
var row model.PlayerMappings
|
|
if err := stmt.QueryContext(ctx, s.db, &row); err != nil {
|
|
if errors.Is(err, qrm.ErrNoRows) {
|
|
return PlayerMapping{}, ErrNotFound
|
|
}
|
|
return PlayerMapping{}, fmt.Errorf("runtime store: load player_mapping: %w", err)
|
|
}
|
|
return modelToPlayerMapping(row), nil
|
|
}
|
|
|
|
// ListPlayerMappingsForGame returns every mapping for gameID.
|
|
func (s *Store) ListPlayerMappingsForGame(ctx context.Context, gameID uuid.UUID) ([]PlayerMapping, error) {
|
|
pm := table.PlayerMappings
|
|
stmt := postgres.SELECT(pm.GameID, pm.UserID, pm.RaceName, pm.EnginePlayerUUID, pm.CreatedAt).
|
|
FROM(pm).
|
|
WHERE(pm.GameID.EQ(postgres.UUID(gameID))).
|
|
ORDER_BY(pm.RaceName.ASC())
|
|
var rows []model.PlayerMappings
|
|
if err := stmt.QueryContext(ctx, s.db, &rows); err != nil {
|
|
return nil, fmt.Errorf("runtime store: list player_mappings: %w", err)
|
|
}
|
|
out := make([]PlayerMapping, 0, len(rows))
|
|
for _, row := range rows {
|
|
out = append(out, modelToPlayerMapping(row))
|
|
}
|
|
return out, nil
|
|
}
|
|
|
|
// DeletePlayerMappingsForGame removes every mapping for gameID. Used
|
|
// on stop / cancel / reconciler-removal so a future StartGame can
|
|
// repopulate the projection without violating the per-game UNIQUE.
|
|
func (s *Store) DeletePlayerMappingsForGame(ctx context.Context, gameID uuid.UUID) error {
|
|
stmt := table.PlayerMappings.DELETE().
|
|
WHERE(table.PlayerMappings.GameID.EQ(postgres.UUID(gameID)))
|
|
if _, err := stmt.ExecContext(ctx, s.db); err != nil {
|
|
return fmt.Errorf("runtime store: delete player_mappings %s: %w", gameID, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// =====================================================================
|
|
// Operation log
|
|
// =====================================================================
|
|
|
|
// operationLogInsert carries the parameters for InsertOperationLog.
|
|
type operationLogInsert struct {
|
|
OperationID uuid.UUID
|
|
GameID uuid.UUID
|
|
Op string
|
|
Source string
|
|
Status string
|
|
ImageRef string
|
|
ContainerID string
|
|
StartedAt time.Time
|
|
}
|
|
|
|
// InsertOperationLog persists a queued / running operation row.
|
|
func (s *Store) InsertOperationLog(ctx context.Context, in operationLogInsert) (OperationLog, error) {
|
|
o := table.RuntimeOperationLog
|
|
stmt := o.INSERT(
|
|
o.OperationID, o.GameID, o.Op, o.Source, o.Status, o.ImageRef,
|
|
o.ContainerID, o.StartedAt,
|
|
).VALUES(
|
|
in.OperationID, in.GameID, in.Op, in.Source, in.Status, in.ImageRef,
|
|
in.ContainerID, in.StartedAt,
|
|
).RETURNING(operationLogColumns())
|
|
var row model.RuntimeOperationLog
|
|
if err := stmt.QueryContext(ctx, s.db, &row); err != nil {
|
|
return OperationLog{}, err
|
|
}
|
|
return modelToOperationLog(row), nil
|
|
}
|
|
|
|
// CompleteOperationLog updates the status / error fields on
|
|
// operationID. Returns the refreshed row.
|
|
func (s *Store) CompleteOperationLog(ctx context.Context, operationID uuid.UUID, status, errCode, errMsg string, finishedAt time.Time) (OperationLog, error) {
|
|
o := table.RuntimeOperationLog
|
|
stmt := o.UPDATE().
|
|
SET(
|
|
o.Status.SET(postgres.String(status)),
|
|
o.ErrorCode.SET(postgres.String(errCode)),
|
|
o.ErrorMessage.SET(postgres.String(errMsg)),
|
|
o.FinishedAt.SET(postgres.TimestampzT(finishedAt)),
|
|
).
|
|
WHERE(o.OperationID.EQ(postgres.UUID(operationID))).
|
|
RETURNING(operationLogColumns())
|
|
var row model.RuntimeOperationLog
|
|
if err := stmt.QueryContext(ctx, s.db, &row); err != nil {
|
|
if errors.Is(err, qrm.ErrNoRows) {
|
|
return OperationLog{}, ErrNotFound
|
|
}
|
|
return OperationLog{}, fmt.Errorf("runtime store: complete operation_log %s: %w", operationID, err)
|
|
}
|
|
return modelToOperationLog(row), nil
|
|
}
|
|
|
|
// =====================================================================
|
|
// Health snapshots
|
|
// =====================================================================
|
|
|
|
// InsertHealthSnapshot persists a JSON-encoded engine status snapshot.
|
|
func (s *Store) InsertHealthSnapshot(ctx context.Context, snapshotID, gameID uuid.UUID, observedAt time.Time, payload []byte) error {
|
|
hs := table.RuntimeHealthSnapshots
|
|
stmt := hs.INSERT(hs.SnapshotID, hs.GameID, hs.ObservedAt, hs.Payload).
|
|
VALUES(snapshotID, gameID, observedAt, string(payload))
|
|
if _, err := stmt.ExecContext(ctx, s.db); err != nil {
|
|
return fmt.Errorf("runtime store: insert health_snapshot %s: %w", gameID, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// =====================================================================
|
|
// Read-only lobby projection (per The implementation D2)
|
|
// =====================================================================
|
|
|
|
// LoadGameProjection reads `backend.games` for runtime's start/stop
|
|
// flow. Lobby remains the only writer of the table; runtime is a
|
|
// read-only consumer. Returns ErrNotFound on miss.
|
|
func (s *Store) LoadGameProjection(ctx context.Context, gameID uuid.UUID) (Game, error) {
|
|
g := table.Games
|
|
stmt := postgres.SELECT(
|
|
g.GameID, g.OwnerUserID, g.Visibility, g.Status, g.GameName,
|
|
g.TurnSchedule, g.TargetEngineVersion,
|
|
g.MinPlayers, g.MaxPlayers, g.StartGapHours, g.StartGapPlayers,
|
|
).
|
|
FROM(g).
|
|
WHERE(g.GameID.EQ(postgres.UUID(gameID))).
|
|
LIMIT(1)
|
|
var row model.Games
|
|
if err := stmt.QueryContext(ctx, s.db, &row); err != nil {
|
|
if errors.Is(err, qrm.ErrNoRows) {
|
|
return Game{}, ErrNotFound
|
|
}
|
|
return Game{}, fmt.Errorf("runtime store: load game %s: %w", gameID, err)
|
|
}
|
|
out := Game{
|
|
GameID: row.GameID,
|
|
Visibility: row.Visibility,
|
|
Status: row.Status,
|
|
GameName: row.GameName,
|
|
TurnSchedule: row.TurnSchedule,
|
|
TargetEngineVersion: row.TargetEngineVersion,
|
|
MinPlayers: row.MinPlayers,
|
|
MaxPlayers: row.MaxPlayers,
|
|
StartGapHours: row.StartGapHours,
|
|
StartGapPlayers: row.StartGapPlayers,
|
|
}
|
|
if row.OwnerUserID != nil {
|
|
owner := *row.OwnerUserID
|
|
out.OwnerUserID = &owner
|
|
}
|
|
return out, nil
|
|
}
|
|
|
|
// ListActiveMemberships reads active rows from `backend.memberships`
|
|
// for gameID.
|
|
func (s *Store) ListActiveMemberships(ctx context.Context, gameID uuid.UUID) ([]MembershipRow, error) {
|
|
m := table.Memberships
|
|
stmt := postgres.SELECT(m.MembershipID, m.GameID, m.UserID, m.RaceName).
|
|
FROM(m).
|
|
WHERE(
|
|
m.GameID.EQ(postgres.UUID(gameID)).
|
|
AND(m.Status.EQ(postgres.String("active"))),
|
|
).
|
|
ORDER_BY(m.JoinedAt.ASC())
|
|
var rows []model.Memberships
|
|
if err := stmt.QueryContext(ctx, s.db, &rows); err != nil {
|
|
return nil, fmt.Errorf("runtime store: list memberships %s: %w", gameID, err)
|
|
}
|
|
out := make([]MembershipRow, 0, len(rows))
|
|
for _, row := range rows {
|
|
out = append(out, MembershipRow{
|
|
MembershipID: row.MembershipID,
|
|
GameID: row.GameID,
|
|
UserID: row.UserID,
|
|
RaceName: row.RaceName,
|
|
})
|
|
}
|
|
return out, nil
|
|
}
|
|
|
|
// =====================================================================
|
|
// Model → domain converters
|
|
// =====================================================================
|
|
|
|
func modelToEngineVersion(row model.EngineVersions) EngineVersion {
|
|
return EngineVersion{
|
|
Version: row.Version,
|
|
ImageRef: row.ImageRef,
|
|
Enabled: row.Enabled,
|
|
CreatedAt: row.CreatedAt,
|
|
UpdatedAt: row.UpdatedAt,
|
|
}
|
|
}
|
|
|
|
func modelToRuntimeRecord(row model.RuntimeRecords) RuntimeRecord {
|
|
rec := RuntimeRecord{
|
|
GameID: row.GameID,
|
|
Status: row.Status,
|
|
EngineEndpoint: row.EngineEndpoint,
|
|
TurnSchedule: row.TurnSchedule,
|
|
CurrentTurn: row.CurrentTurn,
|
|
SkipNextTick: row.SkipNextTick,
|
|
Paused: row.Paused,
|
|
EngineHealth: row.EngineHealth,
|
|
CreatedAt: row.CreatedAt,
|
|
UpdatedAt: row.UpdatedAt,
|
|
CurrentContainerID: derefString(row.CurrentContainerID),
|
|
CurrentImageRef: derefString(row.CurrentImageRef),
|
|
CurrentEngineVersion: derefString(row.CurrentEngineVersion),
|
|
StatePath: derefString(row.StatePath),
|
|
DockerNetwork: derefString(row.DockerNetwork),
|
|
}
|
|
rec.NextGenerationAt = copyTimePtr(row.NextGenerationAt)
|
|
rec.PausedAt = copyTimePtr(row.PausedAt)
|
|
rec.StartedAt = copyTimePtr(row.StartedAt)
|
|
rec.StoppedAt = copyTimePtr(row.StoppedAt)
|
|
rec.FinishedAt = copyTimePtr(row.FinishedAt)
|
|
rec.RemovedAt = copyTimePtr(row.RemovedAt)
|
|
rec.LastObservedAt = copyTimePtr(row.LastObservedAt)
|
|
return rec
|
|
}
|
|
|
|
func modelToOperationLog(row model.RuntimeOperationLog) OperationLog {
|
|
op := OperationLog{
|
|
OperationID: row.OperationID,
|
|
GameID: row.GameID,
|
|
Op: row.Op,
|
|
Source: row.Source,
|
|
Status: row.Status,
|
|
ImageRef: row.ImageRef,
|
|
ContainerID: row.ContainerID,
|
|
ErrorCode: row.ErrorCode,
|
|
ErrorMessage: row.ErrorMessage,
|
|
StartedAt: row.StartedAt,
|
|
}
|
|
op.FinishedAt = copyTimePtr(row.FinishedAt)
|
|
return op
|
|
}
|
|
|
|
func modelToPlayerMapping(row model.PlayerMappings) PlayerMapping {
|
|
return PlayerMapping{
|
|
GameID: row.GameID,
|
|
UserID: row.UserID,
|
|
RaceName: row.RaceName,
|
|
EnginePlayerUUID: row.EnginePlayerUUID,
|
|
CreatedAt: row.CreatedAt,
|
|
}
|
|
}
|
|
|
|
// =====================================================================
|
|
// Scalar helpers
|
|
// =====================================================================
|
|
|
|
// nullableString prepares a Go string for a jet INSERT VALUES binding
// on a nullable column: the empty string yields an untyped nil (so the
// column receives NULL) and any other value is returned unchanged.
func nullableString(s string) any {
	if len(s) > 0 {
		return s
	}
	return nil
}
|
|
|
|
// nullableTime mirrors nullableString for *time.Time: a nil pointer
// yields an untyped nil (NULL), otherwise the pointed-to time.Time is
// returned by value.
func nullableTime(t *time.Time) any {
	if t != nil {
		return *t
	}
	return nil
}
|
|
|
|
// nullableStringSetExpr returns a typed jet expression suitable for
|
|
// UPDATE SET on a nullable text column. The empty string is mapped to
|
|
// SQL NULL, mirroring the INSERT-side semantics so a "" patch clears
|
|
// the column.
|
|
func nullableStringSetExpr(v string) postgres.StringExpression {
|
|
if v == "" {
|
|
return postgres.StringExp(postgres.NULL)
|
|
}
|
|
return postgres.String(v)
|
|
}
|
|
|
|
// timePtrSetExpr mirrors nullableStringSetExpr for *time.Time. nil
|
|
// clears the column; non-nil sets it.
|
|
func timePtrSetExpr(t *time.Time) postgres.TimestampzExpression {
|
|
if t == nil {
|
|
return postgres.TimestampzExp(postgres.NULL)
|
|
}
|
|
return postgres.TimestampzT(*t)
|
|
}
|
|
|
|
// derefString returns the string p points to, or "" when p is nil.
func derefString(p *string) string {
	var out string
	if p != nil {
		out = *p
	}
	return out
}
|
|
|
|
// copyTimePtr returns a pointer to a fresh copy of *p so the caller's
// value does not alias the source; a nil input yields nil.
func copyTimePtr(p *time.Time) *time.Time {
	if p != nil {
		dup := *p
		return &dup
	}
	return nil
}
|