143 lines
4.4 KiB
Go
143 lines
4.4 KiB
Go
package lobby
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/google/uuid"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// Sweeper is the periodic lobby maintenance worker. Each tick it
|
|
// releases expired `pending_registration` race-name rows and
|
|
// auto-closes enrollment windows whose `enrollment_ends_at` has passed.
|
|
//
|
|
// Implements `internal/app.Component`. The sweeper Run loop terminates
|
|
// on the parent context cancellation; Shutdown is a no-op because
|
|
// every tick already completes synchronously inside Run.
|
|
type Sweeper struct {
|
|
svc *Service
|
|
interval time.Duration
|
|
logger *zap.Logger
|
|
now func() time.Time
|
|
}
|
|
|
|
// NewSweeper constructs the sweeper. The interval falls back to the
|
|
// service config when zero.
|
|
func NewSweeper(svc *Service) *Sweeper {
|
|
cfg := svc.Config()
|
|
return &Sweeper{
|
|
svc: svc,
|
|
interval: cfg.SweeperInterval,
|
|
logger: svc.Logger().Named("sweeper"),
|
|
now: svc.deps.Now,
|
|
}
|
|
}
|
|
|
|
// Run drives the sweeper goroutine until ctx is done.
|
|
func (s *Sweeper) Run(ctx context.Context) error {
|
|
ticker := time.NewTicker(s.interval)
|
|
defer ticker.Stop()
|
|
|
|
// Run one tick immediately so a fresh process catches up on missed
|
|
// work without waiting for the first interval. Tests rely on this
|
|
// for deterministic e2e flows.
|
|
if err := s.tick(ctx); err != nil {
|
|
s.logger.Warn("lobby sweeper tick failed", zap.Error(err))
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil
|
|
case <-ticker.C:
|
|
if err := s.tick(ctx); err != nil {
|
|
s.logger.Warn("lobby sweeper tick failed", zap.Error(err))
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Shutdown is a no-op: every tick is synchronous inside Run.
|
|
func (s *Sweeper) Shutdown(_ context.Context) error { return nil }
|
|
|
|
// Tick runs a single sweep iteration. Exposed for tests so they can
|
|
// drive the sweeper without timing dependencies.
|
|
func (s *Sweeper) Tick(ctx context.Context) error { return s.tick(ctx) }
|
|
|
|
func (s *Sweeper) tick(ctx context.Context) error {
|
|
now := s.now().UTC()
|
|
releaseErr := s.releaseExpiredPending(ctx, now)
|
|
closeErr := s.autoCloseEnrollment(ctx, now)
|
|
return errors.Join(releaseErr, closeErr)
|
|
}
|
|
|
|
func (s *Sweeper) releaseExpiredPending(ctx context.Context, now time.Time) error {
|
|
rows, err := s.svc.deps.Store.ListPendingRegistrationsExpired(ctx, now)
|
|
if err != nil {
|
|
return fmt.Errorf("lobby sweeper: list expired pending: %w", err)
|
|
}
|
|
var errs []error
|
|
for _, row := range rows {
|
|
if err := s.svc.deps.Store.DeleteRaceName(ctx, row.Canonical, row.GameID); err != nil {
|
|
errs = append(errs, fmt.Errorf("delete pending %s: %w", row.Canonical, err))
|
|
continue
|
|
}
|
|
s.svc.deps.Cache.RemoveRaceName(row.Canonical)
|
|
intent := LobbyNotification{
|
|
Kind: NotificationLobbyRaceNameExpired,
|
|
IdempotencyKey: "racename-expired:" + string(row.Canonical) + ":" + row.GameID.String(),
|
|
Recipients: []uuid.UUID{row.OwnerUserID},
|
|
Payload: map[string]any{
|
|
"race_name": row.Name,
|
|
},
|
|
}
|
|
if pubErr := s.svc.deps.Notification.PublishLobbyEvent(ctx, intent); pubErr != nil {
|
|
s.logger.Warn("expired notification failed",
|
|
zap.String("canonical", string(row.Canonical)),
|
|
zap.Error(pubErr))
|
|
}
|
|
}
|
|
return errors.Join(errs...)
|
|
}
|
|
|
|
func (s *Sweeper) autoCloseEnrollment(ctx context.Context, now time.Time) error {
|
|
games, err := s.svc.deps.Store.ListEnrollmentExpiredGames(ctx, now)
|
|
if err != nil {
|
|
return fmt.Errorf("lobby sweeper: list expired enrollments: %w", err)
|
|
}
|
|
var errs []error
|
|
for _, game := range games {
|
|
active, err := s.svc.deps.Store.CountActiveMemberships(ctx, game.GameID)
|
|
if err != nil {
|
|
errs = append(errs, fmt.Errorf("count memberships %s: %w", game.GameID, err))
|
|
continue
|
|
}
|
|
if int32(active) < game.MinPlayers {
|
|
// Below quorum — leave the game in enrollment_open. Admins
|
|
// can extend `enrollment_ends_at` or cancel manually.
|
|
s.logger.Debug("enrollment expired below quorum, leaving",
|
|
zap.String("game_id", game.GameID.String()),
|
|
zap.Int32("min_players", game.MinPlayers),
|
|
zap.Int("active", active))
|
|
continue
|
|
}
|
|
updated, err := s.svc.deps.Store.UpdateGameStatus(ctx, game.GameID, statusUpdate{
|
|
NewStatus: GameStatusReadyToStart,
|
|
UpdatedAt: now,
|
|
})
|
|
if err != nil {
|
|
errs = append(errs, fmt.Errorf("transition %s to ready_to_start: %w", game.GameID, err))
|
|
continue
|
|
}
|
|
s.svc.deps.Cache.PutGame(updated)
|
|
s.logger.Info("enrollment auto-closed",
|
|
zap.String("game_id", game.GameID.String()),
|
|
zap.Int32("min_players", game.MinPlayers),
|
|
zap.Int("active", active))
|
|
}
|
|
return errors.Join(errs...)
|
|
}
|