Files
galaxy-game/backend/internal/runtime/workers.go
T
2026-05-06 10:14:55 +03:00

125 lines
2.7 KiB
Go

package runtime
import (
"context"
"errors"
"sync"
"sync/atomic"
"go.uber.org/zap"
)
// WorkerPool drains long-running runtime jobs (start, stop, restart,
// patch). Implements `internal/app.Component` so the App lifecycle
// drives Run/Shutdown.
type WorkerPool struct {
svc *Service // owning runtime service; supplies config, logger, and is passed to each job's Run
jobs chan job // buffered job queue (depth cfg.JobQueueSize); closed exactly once by Shutdown
stopping atomic.Bool // set by Shutdown; once true, submit rejects new work with ErrShutdown
wg sync.WaitGroup // tracks the worker goroutines started by Run; waited on in Shutdown
}
// NewWorkerPool constructs a pool bound to svc. The job channel is
// buffered to depth cfg.JobQueueSize. No goroutines are started here;
// Run launches cfg.WorkerPoolSize workers when the App lifecycle begins.
func NewWorkerPool(svc *Service) *WorkerPool {
	pool := &WorkerPool{svc: svc}
	pool.jobs = make(chan job, svc.deps.Config.JobQueueSize)
	return pool
}
// submit places j on the worker channel.
//
// Returns ErrShutdown when the pool is stopping, ctx.Err() when the
// caller's context is already cancelled, and ErrJobQueueFull when the
// buffered channel is at capacity — surfacing backpressure to the
// caller instead of blocking for an unbounded time. (The previous
// implementation documented ErrJobQueueFull but never returned it: its
// `default` branch fell through to a second, fully blocking select.)
func (w *WorkerPool) submit(ctx context.Context, j job) error {
	if w == nil || w.stopping.Load() {
		return ErrShutdown
	}
	// NOTE(review): there is a small window between the stopping check
	// above and the send below in which Shutdown may close w.jobs; a
	// send on a closed channel panics. This assumes all submitters are
	// quiesced before Shutdown runs — confirm with the App lifecycle.
	select {
	case <-ctx.Done():
		return ctx.Err()
	case w.jobs <- j:
		return nil
	default:
		// Queue is full: report backpressure rather than blocking.
		return ErrJobQueueFull
	}
}
// Run launches the configured number of worker goroutines (at least
// one, even when cfg.WorkerPoolSize is zero or negative) and then
// parks until ctx is cancelled. Always returns nil; draining the
// workers is Shutdown's job.
func (w *WorkerPool) Run(ctx context.Context) error {
	if w == nil {
		return nil
	}
	workers := w.svc.deps.Config.WorkerPoolSize
	if workers < 1 {
		// Guarantee forward progress even with a misconfigured size.
		workers = 1
	}
	for id := 0; id < workers; id++ {
		w.wg.Add(1)
		go w.loop(ctx, id)
	}
	// Block here for the lifetime of the component.
	<-ctx.Done()
	return nil
}
// Shutdown flips the pool into stopping mode exactly once, closes the
// job channel so workers drain any queued work and exit, then waits
// for them — bounded by ctx. Any worker still running when ctx expires
// is left to finish on its own and ctx's error is returned. Repeated
// calls are no-ops.
func (w *WorkerPool) Shutdown(ctx context.Context) error {
	if w == nil {
		return nil
	}
	// Only the first caller performs the teardown.
	if !w.stopping.CompareAndSwap(false, true) {
		return nil
	}
	// NOTE(review): closing jobs can race a concurrent submit's send
	// (panic on send to a closed channel); this assumes submitters are
	// stopped before Shutdown — confirm with the App lifecycle.
	close(w.jobs)

	drained := make(chan struct{})
	go func() {
		defer close(drained)
		w.wg.Wait()
	}()

	select {
	case <-drained:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}
// loop is the body of one worker goroutine. It pulls jobs off the
// channel until the pool context is cancelled or the channel is closed
// and drained (Shutdown), logging each pick and any non-cancellation
// failure.
func (w *WorkerPool) loop(ctx context.Context, idx int) {
	defer w.wg.Done()
	logger := w.svc.deps.Logger.With(zap.Int("worker", idx))
	for {
		select {
		case <-ctx.Done():
			return
		case j, ok := <-w.jobs:
			if !ok {
				// Channel closed by Shutdown and fully drained.
				return
			}
			logger.Debug("runtime job picked",
				zap.String("game_id", j.GameID().String()),
				zap.String("op", j.Operation().Op),
			)
			if err := j.Run(ctx, w.svc); err != nil {
				// Exit only when the POOL's context is done. The old
				// check exited on any Canceled/DeadlineExceeded error,
				// so a job hitting its own per-job deadline would kill
				// this worker and silently shrink the pool over time.
				if (errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)) && ctx.Err() != nil {
					return
				}
				logger.Warn("runtime job failed",
					zap.String("game_id", j.GameID().String()),
					zap.String("op", j.Operation().Op),
					zap.Error(err),
				)
			}
		}
	}
}