Files
galaxy-game/gateway/internal/backendclient/push_client.go
T
2026-05-06 10:14:55 +03:00

267 lines
6.9 KiB
Go

// PushClient — gateway-side gRPC consumer of `Push.SubscribePush`.
//
// One PushClient is wired for the gateway lifecycle. Run keeps the
// subscription open, reconnects on every transport error with
// exponential backoff (capped at PushReconnectMaxBackoff), and forwards
// every received PushEvent to the configured EventHandler. The cursor
// of the last successfully handled event is remembered in process
// memory only (see `backend/README.md` and `backend/docs/` D2). On reconnect
// it is replayed back to backend so any events still in the freshness-
// window ring are received exactly once.
package backendclient
import (
"context"
"errors"
"fmt"
"io"
"math/rand/v2"
"sync"
"time"
pushv1 "galaxy/backend/proto/push/v1"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"
)
// EventHandler is the sink for every PushEvent drained from the backend
// stream. PushClient calls Handle on the goroutine that reads the
// stream and does not read the next event until Handle returns, so
// implementations must return promptly. Implementations must also be
// safe for concurrent use.
type EventHandler interface {
	Handle(ctx context.Context, event *pushv1.PushEvent)
}
// EventHandlerFunc lets an ordinary function satisfy the EventHandler
// contract.
type EventHandlerFunc func(context.Context, *pushv1.PushEvent)

// Handle implements EventHandler by invoking the wrapped function with
// the same arguments.
func (f EventHandlerFunc) Handle(ctx context.Context, ev *pushv1.PushEvent) {
	f(ctx, ev)
}
// PushClient is the gRPC adapter that owns the long-lived
// SubscribePush stream.
type PushClient struct {
	// cfg is the configuration validated by NewPushClient.
	cfg Config
	// dialOpts are handed to grpc.NewClient when Run opens the connection.
	dialOpts []grpc.DialOption
	// clock is the time source; NewPushClient sets it to time.Now.
	clock func() time.Time
	// sleep waits between reconnect attempts; NewPushClient sets it to
	// defaultSleep.
	sleep func(context.Context, time.Duration) error
	// logger receives reconnect diagnostics; a no-op logger by default.
	logger *zap.Logger
	// handler receives every event read from the stream.
	handler EventHandler

	// mu guards cursor.
	mu sync.Mutex
	// cursor is the last non-empty event cursor recorded by setCursor.
	cursor string

	// connMu guards conn.
	connMu sync.Mutex
	// conn is the connection opened by Run; nil while no connection is held.
	conn *grpc.ClientConn
}
// NewPushClient validates cfg and builds a PushClient with default
// collaborators: the wall clock, a context-aware sleep, and a no-op
// logger. The default dial options use INSECURE transport credentials
// plus the OpenTelemetry client stats handler. Deployments that need
// TLS must replace them through WithDialOptions before calling Run;
// WithDialOptions replaces the whole option set, so re-add the stats
// handler there if it is still wanted.
//
// It returns the error from cfg.Validate unchanged when validation
// fails.
func NewPushClient(cfg Config) (*PushClient, error) {
	if err := cfg.Validate(); err != nil {
		return nil, err
	}

	defaultDialOpts := []grpc.DialOption{
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithStatsHandler(otelgrpc.NewClientHandler()),
	}
	client := &PushClient{
		cfg:      cfg,
		dialOpts: defaultDialOpts,
		clock:    time.Now,
		sleep:    defaultSleep,
		logger:   zap.NewNop(),
	}
	return client, nil
}
// WithDialOptions replaces the dial options used when Run opens the
// gRPC connection and returns the receiver for chaining. The options
// are copied, so later changes to the caller's slice have no effect.
// Tests typically pass `grpc.WithContextDialer` so `grpc.NewClient`
// connects to a `bufconn` listener. A nil receiver is returned as is.
func (c *PushClient) WithDialOptions(opts ...grpc.DialOption) *PushClient {
	if c != nil {
		c.dialOpts = append([]grpc.DialOption(nil), opts...)
	}
	return c
}
// WithLogger installs the structured logger, scoped under the
// "push_client" name, and returns the receiver for chaining. A nil
// logger is replaced by a no-op logger. A nil receiver yields nil.
func (c *PushClient) WithLogger(logger *zap.Logger) *PushClient {
	if c == nil {
		return nil
	}
	base := logger
	if base == nil {
		base = zap.NewNop()
	}
	c.logger = base.Named("push_client")
	return c
}
// WithHandler sets the EventHandler that receives every event and
// returns the receiver for chaining. Run refuses to start while no
// handler is installed. A nil receiver is returned as is.
func (c *PushClient) WithHandler(handler EventHandler) *PushClient {
	if c != nil {
		c.handler = handler
	}
	return c
}
// Cursor reports the most recently recorded event cursor, i.e. the last
// non-empty cursor carried by an event that was passed to the handler.
// It is intended for tests and operator inspection. The result is the
// empty string before any cursor has been recorded, and for a nil
// receiver.
func (c *PushClient) Cursor() string {
	if c == nil {
		return ""
	}
	c.mu.Lock()
	current := c.cursor
	c.mu.Unlock()
	return current
}
// Run opens the SubscribePush stream and forwards events to the
// installed handler until ctx is cancelled.
//
// Run fails fast, before dialing, when the receiver or ctx is nil or no
// handler is installed, and it returns a wrapped error when
// grpc.NewClient rejects the target or dial options. After that, every
// stream failure is retried with jittered exponential backoff capped at
// PushReconnectMaxBackoff; the backoff returns to
// PushReconnectBaseBackoff once a session has advanced the cursor.
// Cancellation (or expiry) of ctx is the only terminal exit, and Run
// then returns ctx.Err().
//
// The connection opened here is closed when Run returns. It is safe for
// Close to be called while Run is active.
func (c *PushClient) Run(ctx context.Context) error {
	if c == nil {
		return errors.New("backendclient.PushClient.Run: nil client")
	}
	if ctx == nil {
		return errors.New("backendclient.PushClient.Run: nil context")
	}
	if c.handler == nil {
		return errors.New("backendclient.PushClient.Run: handler is required")
	}

	conn, err := grpc.NewClient(c.cfg.GRPCPushURL, c.dialOpts...)
	if err != nil {
		return fmt.Errorf("backendclient.PushClient.Run: dial backend push: %w", err)
	}
	c.connMu.Lock()
	c.conn = conn
	c.connMu.Unlock()
	defer func() {
		// Close may already have cleared c.conn; always operate on the
		// local conn so a nil field is never dereferenced.
		c.connMu.Lock()
		if c.conn == conn {
			c.conn = nil
		}
		c.connMu.Unlock()
		// Best effort: closing a connection that Close already shut
		// down only reports that it is closing, which is not actionable.
		_ = conn.Close()
	}()

	pushAPI := pushv1.NewPushClient(conn)
	backoff := c.cfg.PushReconnectBaseBackoff
	for {
		if err := ctx.Err(); err != nil {
			return err
		}

		cursorBefore := c.Cursor()
		err := c.runOnce(ctx, pushAPI)

		// gRPC reports cancellation as a status error rather than
		// context.Canceled, so consult ctx directly. This also avoids
		// logging a "reconnecting" message during shutdown.
		if ctxErr := ctx.Err(); ctxErr != nil {
			return ctxErr
		}

		// An advanced cursor means the session handled at least one
		// event, so the link was healthy: start the backoff over
		// instead of staying pinned at the cap after an old outage.
		if c.Cursor() != cursorBefore {
			backoff = c.cfg.PushReconnectBaseBackoff
		}

		switch {
		case status.Code(err) == codes.Aborted:
			c.logger.Info("backend replaced push subscription; reconnecting")
		case errors.Is(err, io.EOF):
			c.logger.Info("backend push stream closed; reconnecting")
		default:
			c.logger.Warn("backend push stream error; reconnecting",
				zap.Error(err),
				zap.Duration("backoff", backoff),
			)
		}

		if err := c.sleep(ctx, jitter(backoff)); err != nil {
			return err
		}
		backoff = nextBackoff(backoff, c.cfg.PushReconnectMaxBackoff)
	}
}
// Shutdown does nothing and always returns nil; it exists only so
// PushClient satisfies `app.Component`. The SubscribePush call exits
// when its parent context is cancelled.
func (c *PushClient) Shutdown(context.Context) error {
	return nil
}
// Close shuts down the gRPC connection currently held by the client, if
// any, and returns the error reported by the connection. It is
// idempotent: with no connection held, or on a nil receiver, it returns
// nil.
func (c *PushClient) Close() error {
	if c == nil {
		return nil
	}
	c.connMu.Lock()
	defer c.connMu.Unlock()

	held := c.conn
	if held == nil {
		return nil
	}
	c.conn = nil
	return held.Close()
}
// runOnce performs a single subscription session: it subscribes with
// the current cursor, then passes each received event to the handler
// and records the event's cursor when it is non-empty. It only returns
// on failure, yielding either a wrapped subscribe error or the error
// from stream.Recv unchanged.
func (c *PushClient) runOnce(ctx context.Context, pushAPI pushv1.PushClient) error {
	req := &pushv1.GatewaySubscribeRequest{
		GatewayClientId: c.cfg.GatewayClientID,
		Cursor:          c.Cursor(),
	}
	stream, err := pushAPI.SubscribePush(ctx, req)
	if err != nil {
		return fmt.Errorf("subscribe push: %w", err)
	}

	for {
		event, recvErr := stream.Recv()
		if recvErr != nil {
			return recvErr
		}
		c.handler.Handle(ctx, event)

		// Record the cursor only after the handler has returned.
		next := event.GetCursor()
		if next == "" {
			continue
		}
		c.setCursor(next)
	}
}
// setCursor stores cursor as the latest recorded position, under mu.
func (c *PushClient) setCursor(cursor string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.cursor = cursor
}
// nextBackoff returns the delay that follows current: twice current,
// clamped to max. When doubling yields a non-positive value (zero or
// negative input, or int64 overflow) the result is max as well.
func nextBackoff(current, max time.Duration) time.Duration {
	next := 2 * current
	if next <= 0 || next > max {
		return max
	}
	return next
}
// jitter returns d with ±20% multiplicative noise so multiple gateway
// instances do not retry in lockstep after a backend restart.
func jitter(d time.Duration) time.Duration {
if d <= 0 {
return d
}
noise := 1 + (rand.Float64()-0.5)*0.4
return time.Duration(float64(d) * noise)
}
// defaultSleep blocks for d or until ctx is done, whichever comes
// first. It returns nil once the full duration has elapsed and
// ctx.Err() if the context ends first. A non-positive d returns nil
// immediately without consulting ctx.
func defaultSleep(ctx context.Context, d time.Duration) error {
	if d <= 0 {
		return nil
	}

	wait := time.NewTimer(d)
	defer wait.Stop()

	select {
	case <-wait.C:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}