// Source listing: 179 lines, 4.4 KiB, Go.

// Package app wires the gateway process lifecycle and coordinates component
|
|
// startup and graceful shutdown.
|
|
package app
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"sync"
|
|
|
|
"galaxy/gateway/internal/config"
|
|
)
|
|
|
|
// Component describes a long-running gateway subsystem that takes part in the
// coordinated startup and graceful shutdown of the process.
type Component interface {
	// Run starts the component and blocks until the component has stopped.
	Run(ctx context.Context) error

	// Shutdown stops the component. The supplied ctx is timeout-bounded and
	// limits how long the stop may take.
	Shutdown(ctx context.Context) error
}
|
|
|
|
// App owns the process-level lifecycle of the gateway and its registered
|
|
// components.
|
|
type App struct {
|
|
cfg config.Config
|
|
components []Component
|
|
}
|
|
|
|
// New constructs an App with a defensive copy of the supplied components.
|
|
func New(cfg config.Config, components ...Component) *App {
|
|
clonedComponents := append([]Component(nil), components...)
|
|
|
|
return &App{
|
|
cfg: cfg,
|
|
components: clonedComponents,
|
|
}
|
|
}
|
|
|
|
// Run starts all configured components, waits for cancellation or the first
|
|
// component failure, and then executes best-effort graceful shutdown for every
|
|
// component.
|
|
func (a *App) Run(ctx context.Context) error {
|
|
if ctx == nil {
|
|
return errors.New("run gateway app: nil context")
|
|
}
|
|
if err := a.validate(); err != nil {
|
|
return err
|
|
}
|
|
if len(a.components) == 0 {
|
|
<-ctx.Done()
|
|
return nil
|
|
}
|
|
|
|
runCtx, cancel := context.WithCancel(ctx)
|
|
defer cancel()
|
|
|
|
results := make(chan componentResult, len(a.components))
|
|
var runWG sync.WaitGroup
|
|
|
|
for idx, component := range a.components {
|
|
runWG.Add(1)
|
|
|
|
go func(index int, component Component) {
|
|
defer runWG.Done()
|
|
results <- componentResult{
|
|
index: index,
|
|
err: component.Run(runCtx),
|
|
}
|
|
}(idx, component)
|
|
}
|
|
|
|
var runErr error
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
case result := <-results:
|
|
runErr = classifyComponentResult(ctx, result)
|
|
}
|
|
|
|
cancel()
|
|
|
|
shutdownErr := a.shutdownComponents()
|
|
waitErr := a.waitForComponents(&runWG)
|
|
|
|
return errors.Join(runErr, shutdownErr, waitErr)
|
|
}
|
|
|
|
// componentResult records how one component's Run call ended. Every component
// goroutine produces one; Run only inspects the first to arrive.
type componentResult struct {
	// index is the component's position in App.components.
	index int

	// err is the value returned by the component's Run method.
	err error
}
|
|
|
|
// validate confirms that the App has a safe shutdown budget and no nil
|
|
// components before goroutines are started.
|
|
func (a *App) validate() error {
|
|
if a.cfg.ShutdownTimeout <= 0 {
|
|
return fmt.Errorf("run gateway app: shutdown timeout must be positive, got %s", a.cfg.ShutdownTimeout)
|
|
}
|
|
|
|
for idx, component := range a.components {
|
|
if component == nil {
|
|
return fmt.Errorf("run gateway app: component %d is nil", idx)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// classifyComponentResult maps the first component exit into the error that
|
|
// should control the application result.
|
|
func classifyComponentResult(parentCtx context.Context, result componentResult) error {
|
|
switch {
|
|
case result.err == nil:
|
|
if parentCtx.Err() != nil {
|
|
return nil
|
|
}
|
|
return fmt.Errorf("run gateway app: component %d exited without error before shutdown", result.index)
|
|
case errors.Is(result.err, context.Canceled) && parentCtx.Err() != nil:
|
|
return nil
|
|
default:
|
|
return fmt.Errorf("run gateway app: component %d: %w", result.index, result.err)
|
|
}
|
|
}
|
|
|
|
// shutdownComponents calls Shutdown on every registered component using a fresh
|
|
// timeout-bounded context per component and joins any shutdown failures.
|
|
func (a *App) shutdownComponents() error {
|
|
var shutdownWG sync.WaitGroup
|
|
errs := make(chan error, len(a.components))
|
|
|
|
for idx, component := range a.components {
|
|
shutdownWG.Add(1)
|
|
|
|
go func(index int, component Component) {
|
|
defer shutdownWG.Done()
|
|
|
|
shutdownCtx, cancel := context.WithTimeout(context.Background(), a.cfg.ShutdownTimeout)
|
|
defer cancel()
|
|
|
|
if err := component.Shutdown(shutdownCtx); err != nil {
|
|
errs <- fmt.Errorf("shutdown gateway component %d: %w", index, err)
|
|
}
|
|
}(idx, component)
|
|
}
|
|
|
|
shutdownWG.Wait()
|
|
close(errs)
|
|
|
|
var joined error
|
|
for err := range errs {
|
|
joined = errors.Join(joined, err)
|
|
}
|
|
|
|
return joined
|
|
}
|
|
|
|
// waitForComponents waits for running components to return after shutdown and
|
|
// reports when they outlive the configured shutdown budget.
|
|
func (a *App) waitForComponents(runWG *sync.WaitGroup) error {
|
|
done := make(chan struct{})
|
|
go func() {
|
|
runWG.Wait()
|
|
close(done)
|
|
}()
|
|
|
|
waitCtx, cancel := context.WithTimeout(context.Background(), a.cfg.ShutdownTimeout)
|
|
defer cancel()
|
|
|
|
select {
|
|
case <-done:
|
|
return nil
|
|
case <-waitCtx.Done():
|
|
return fmt.Errorf("wait for gateway components: %w", waitCtx.Err())
|
|
}
|
|
}
|