// PushClient — gateway-side gRPC consumer of `Push.SubscribePush`. // // One PushClient is wired for the gateway lifecycle. Run keeps the // subscription open, reconnects on every transport error with // exponential backoff (capped at PushReconnectMaxBackoff), and forwards // every received PushEvent to the configured EventHandler. The cursor // of the last successfully handled event is remembered in process // memory only (see `backend/README.md` and `backend/docs/` D2). On reconnect // it is replayed back to backend so any events still in the freshness- // window ring are received exactly once. package backendclient import ( "context" "errors" "fmt" "io" "math/rand/v2" "sync" "time" pushv1 "galaxy/backend/proto/push/v1" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/status" ) // EventHandler receives every PushEvent successfully drained from the // backend stream. Implementations must be concurrency-safe and must not // block; PushClient owns the calling goroutine and waits for Handle to // return before reading the next event. type EventHandler interface { Handle(context.Context, *pushv1.PushEvent) } // EventHandlerFunc adapts a plain function to the EventHandler // contract. type EventHandlerFunc func(context.Context, *pushv1.PushEvent) // Handle implements EventHandler. func (f EventHandlerFunc) Handle(ctx context.Context, ev *pushv1.PushEvent) { f(ctx, ev) } // PushClient is the gRPC adapter that owns the long-lived // SubscribePush stream. type PushClient struct { cfg Config dialOpts []grpc.DialOption clock func() time.Time sleep func(context.Context, time.Duration) error logger *zap.Logger handler EventHandler mu sync.Mutex cursor string connMu sync.Mutex conn *grpc.ClientConn } // NewPushClient constructs a PushClient. 
The default dial uses // transport credentials INSECURE; deployments behind TLS must wrap the // returned client with an alternative DialOption set via // WithDialOptions before calling Run. func NewPushClient(cfg Config) (*PushClient, error) { if err := cfg.Validate(); err != nil { return nil, err } return &PushClient{ cfg: cfg, dialOpts: []grpc.DialOption{ grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithStatsHandler(otelgrpc.NewClientHandler()), }, clock: time.Now, sleep: defaultSleep, logger: zap.NewNop(), }, nil } // WithDialOptions overrides the default dial options used when opening // the gRPC connection. Tests typically pass `grpc.WithContextDialer` so // `grpc.NewClient` connects to a `bufconn` listener. func (c *PushClient) WithDialOptions(opts ...grpc.DialOption) *PushClient { if c == nil { return nil } c.dialOpts = append([]grpc.DialOption(nil), opts...) return c } // WithLogger replaces the structured logger. func (c *PushClient) WithLogger(logger *zap.Logger) *PushClient { if c == nil { return nil } if logger == nil { logger = zap.NewNop() } c.logger = logger.Named("push_client") return c } // WithHandler installs the EventHandler. Run returns an error if no // handler has been installed. func (c *PushClient) WithHandler(handler EventHandler) *PushClient { if c == nil { return nil } c.handler = handler return c } // Cursor returns the cursor of the last event delivered to the handler. // Useful for tests and operator inspection. Returns the empty string // before any event has been processed. func (c *PushClient) Cursor() string { if c == nil { return "" } c.mu.Lock() defer c.mu.Unlock() return c.cursor } // Run opens the SubscribePush stream and forwards events until ctx is // cancelled. Network errors are retried with exponential backoff up to // PushReconnectMaxBackoff; ctx cancellation is the only terminal exit. 
func (c *PushClient) Run(ctx context.Context) error {
	if c == nil {
		return errors.New("backendclient.PushClient.Run: nil client")
	}
	if ctx == nil {
		return errors.New("backendclient.PushClient.Run: nil context")
	}
	if c.handler == nil {
		return errors.New("backendclient.PushClient.Run: handler is required")
	}

	conn, err := grpc.NewClient(c.cfg.GRPCPushURL, c.dialOpts...)
	if err != nil {
		return fmt.Errorf("backendclient.PushClient.Run: dial backend push: %w", err)
	}
	c.connMu.Lock()
	c.conn = conn
	c.connMu.Unlock()
	defer func() {
		c.connMu.Lock()
		defer c.connMu.Unlock()
		// Close may already have closed and cleared the connection while
		// Run was active; calling Close on the resulting nil *ClientConn
		// would panic, so only release it if it is still ours.
		if c.conn != conn {
			return
		}
		_ = conn.Close() // best effort: Run is returning regardless
		c.conn = nil
	}()

	pushAPI := pushv1.NewPushClient(conn)
	baseBackoff := c.cfg.PushReconnectBaseBackoff
	backoff := baseBackoff
	for {
		if err := ctx.Err(); err != nil {
			return err
		}

		cursorBefore := c.Cursor()
		err := c.runOnce(ctx, pushAPI)

		// Once ctx is done, whatever runOnce returned is fallout from the
		// cancellation. gRPC reports it as a codes.Canceled /
		// codes.DeadlineExceeded status rather than context.Canceled, so
		// without this check every shutdown would be logged as a stream
		// error before exiting.
		if ctxErr := ctx.Err(); ctxErr != nil {
			return ctxErr
		}

		// A stream that advanced the cursor was healthy: restart the
		// backoff schedule so a drop after long uptime reconnects quickly
		// instead of waiting the accumulated (possibly maximum) delay.
		if c.Cursor() != cursorBefore {
			backoff = baseBackoff
		}

		switch {
		case status.Code(err) == codes.Aborted:
			c.logger.Info("backend replaced push subscription; reconnecting")
		case errors.Is(err, io.EOF):
			c.logger.Info("backend push stream closed; reconnecting")
		default:
			c.logger.Warn("backend push stream error; reconnecting",
				zap.Error(err),
				zap.Duration("backoff", backoff),
			)
		}

		if err := c.sleep(ctx, jitter(backoff)); err != nil {
			return err
		}
		backoff = nextBackoff(backoff, c.cfg.PushReconnectMaxBackoff)
	}
}

// Shutdown is a no-op kept for `app.Component` compatibility. The
// SubscribePush call exits when its parent context is cancelled.
func (c *PushClient) Shutdown(_ context.Context) error {
	return nil
}

// Close closes the gRPC connection created by Run, if one is currently
// open. Calling it again, or when no connection is open, returns nil.
// Close does not stop Run's loop; cancel the context passed to Run for
// that.
func (c *PushClient) Close() error {
	if c == nil {
		return nil
	}
	c.connMu.Lock()
	defer c.connMu.Unlock()

	conn := c.conn
	if conn == nil {
		return nil
	}
	closeErr := conn.Close()
	c.conn = nil
	return closeErr
}

// runOnce performs a single SubscribePush attempt that resumes from the
// remembered cursor, then feeds every received event to the handler until
// the stream fails. It only ever returns a non-nil error: the SubscribePush
// failure (wrapped with context) or the first Recv error, returned as-is
// so the caller can match io.EOF and gRPC status codes.
func (c *PushClient) runOnce(ctx context.Context, pushAPI pushv1.PushClient) error {
	req := &pushv1.GatewaySubscribeRequest{
		GatewayClientId: c.cfg.GatewayClientID,
		Cursor:          c.Cursor(),
	}
	stream, err := pushAPI.SubscribePush(ctx, req)
	if err != nil {
		return fmt.Errorf("subscribe push: %w", err)
	}

	for {
		event, recvErr := stream.Recv()
		if recvErr != nil {
			return recvErr
		}
		c.handler.Handle(ctx, event)
		// The resume point moves only after the handler has seen the event,
		// and an event without a cursor leaves it unchanged.
		if next := event.GetCursor(); next != "" {
			c.setCursor(next)
		}
	}
}

// setCursor records cursor as the latest resume point under mu.
func (c *PushClient) setCursor(cursor string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.cursor = cursor
}

// nextBackoff doubles current, clamping the result to ceiling. A doubled
// value that is non-positive (zero input or overflow) also yields ceiling.
func nextBackoff(current, ceiling time.Duration) time.Duration {
	switch doubled := current * 2; {
	case doubled > ceiling, doubled <= 0:
		return ceiling
	default:
		return doubled
	}
}

// jitter scales d by a random factor in [0.8, 1.2) so several gateway
// instances do not retry in lockstep after a backend restart. Non-positive
// durations are returned unchanged.
func jitter(d time.Duration) time.Duration {
	if d <= 0 {
		return d
	}
	scale := 1 + (rand.Float64()-0.5)*0.4
	return time.Duration(float64(d) * scale)
}

// defaultSleep blocks for d or until ctx is done, whichever comes first,
// returning ctx.Err() in the latter case. Non-positive durations return
// nil immediately.
func defaultSleep(ctx context.Context, d time.Duration) error {
	if d <= 0 {
		return nil
	}
	t := time.NewTimer(d)
	defer t.Stop()

	select {
	case <-t.C:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}