package runtime import ( "context" "errors" "sync" "sync/atomic" "go.uber.org/zap" ) // WorkerPool drains long-running runtime jobs (start, stop, restart, // patch). Implements `internal/app.Component` so the App lifecycle // drives Run/Shutdown. type WorkerPool struct { svc *Service jobs chan job stopping atomic.Bool wg sync.WaitGroup } // NewWorkerPool builds a worker pool sized by `cfg.WorkerPoolSize` // with a buffered channel of depth `cfg.JobQueueSize`. func NewWorkerPool(svc *Service) *WorkerPool { return &WorkerPool{ svc: svc, jobs: make(chan job, svc.deps.Config.JobQueueSize), } } // submit places j on the worker channel. Returns ErrJobQueueFull when // the channel is full and ErrShutdown when the pool is stopping. func (w *WorkerPool) submit(ctx context.Context, j job) error { if w == nil || w.stopping.Load() { return ErrShutdown } select { case <-ctx.Done(): return ctx.Err() case w.jobs <- j: return nil default: } // One last attempt with the caller's context; lets a fast worker // pick it up while we wait briefly. select { case <-ctx.Done(): return ctx.Err() case w.jobs <- j: return nil } } // Run starts the configured number of worker goroutines and blocks // until ctx is cancelled. func (w *WorkerPool) Run(ctx context.Context) error { if w == nil { return nil } count := w.svc.deps.Config.WorkerPoolSize if count <= 0 { count = 1 } for i := 0; i < count; i++ { w.wg.Add(1) go w.loop(ctx, i) } <-ctx.Done() return nil } // Shutdown signals the pool to stop accepting new work and waits for // in-flight workers to drain. The provided context bounds the wait; // any worker still running when ctx expires is left to finish on its // own and the pool returns. func (w *WorkerPool) Shutdown(ctx context.Context) error { if w == nil { return nil } if !w.stopping.CompareAndSwap(false, true) { return nil } close(w.jobs) done := make(chan struct{}) go func() { w.wg.Wait() close(done) }() select { case <-done: return nil case <-ctx.Done(): return ctx.Err() } } func (w *WorkerPool) loop(ctx context.Context, idx int) { defer w.wg.Done() logger := w.svc.deps.Logger.With(zap.Int("worker", idx)) for { select { case <-ctx.Done(): return case j, ok := <-w.jobs: if !ok { return } logger.Debug("runtime job picked", zap.String("game_id", j.GameID().String()), zap.String("op", j.Operation().Op), ) if err := j.Run(ctx, w.svc); err != nil { if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { return } logger.Warn("runtime job failed", zap.String("game_id", j.GameID().String()), zap.String("op", j.Operation().Op), zap.Error(err), ) } } } }