// Package push hosts the backend gRPC listener used by gateway. // // Server owns the TCP listener and gRPC machinery. Service implements // the PushServer interface and is registered against the gRPC server // before Serve begins. On shutdown the server signals the service to // drop its subscriptions, then performs the usual GracefulStop / // forced-stop sequence. package push import ( "context" "errors" "fmt" "net" "sync" "galaxy/backend/internal/config" "galaxy/backend/internal/telemetry" pushv1 "galaxy/backend/proto/push/v1" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "go.uber.org/zap" "google.golang.org/grpc" ) // Server owns the gRPC push listener. type Server struct { cfg config.GRPCPushConfig svc *Service logger *zap.Logger runtime *telemetry.Runtime stateMu sync.RWMutex server *grpc.Server listener net.Listener } // NewServer constructs a gRPC push server bound to cfg. svc must not be // nil; it is registered as the pushv1.PushServer implementation when // Run starts. func NewServer(cfg config.GRPCPushConfig, svc *Service, logger *zap.Logger, runtime *telemetry.Runtime) *Server { if logger == nil { logger = zap.NewNop() } return &Server{ cfg: cfg, svc: svc, logger: logger.Named("grpc_push"), runtime: runtime, } } // Run binds the listener and serves the gRPC surface until Shutdown closes // the server. func (s *Server) Run(ctx context.Context) error { if ctx == nil { return errors.New("run backend gRPC push server: nil context") } if err := ctx.Err(); err != nil { return err } if s.svc == nil { return errors.New("run backend gRPC push server: nil service") } listener, err := net.Listen("tcp", s.cfg.Addr) if err != nil { return fmt.Errorf("run backend gRPC push server: listen on %q: %w", s.cfg.Addr, err) } grpcServer := grpc.NewServer( grpc.StatsHandler(otelgrpc.NewServerHandler()), ) pushv1.RegisterPushServer(grpcServer, s.svc) s.stateMu.Lock() s.server = grpcServer s.listener = listener s.stateMu.Unlock() s.logger.Info("backend gRPC push server started", zap.String("addr", listener.Addr().String())) defer func() { s.stateMu.Lock() s.server = nil s.listener = nil s.stateMu.Unlock() }() err = grpcServer.Serve(listener) switch { case err == nil: return nil case errors.Is(err, grpc.ErrServerStopped): s.logger.Info("backend gRPC push server stopped") return nil default: return fmt.Errorf("run backend gRPC push server: serve on %q: %w", s.cfg.Addr, err) } } // Shutdown attempts a graceful stop within ctx, falling back to a forced stop // when ctx expires before GracefulStop returns. The configured per-listener // timeout further bounds the wait. Active SubscribePush streams are closed // first so GracefulStop is not blocked by long-lived server-streaming RPCs. func (s *Server) Shutdown(ctx context.Context) error { if ctx == nil { return errors.New("shutdown backend gRPC push server: nil context") } s.stateMu.RLock() server := s.server s.stateMu.RUnlock() if server == nil { return nil } if s.svc != nil { s.svc.Close() } shutdownCtx, cancel := context.WithCancel(ctx) defer cancel() if s.cfg.ShutdownTimeout > 0 { shutdownCtx, cancel = context.WithTimeout(ctx, s.cfg.ShutdownTimeout) defer cancel() } stopped := make(chan struct{}) go func() { server.GracefulStop() close(stopped) }() select { case <-stopped: return nil case <-shutdownCtx.Done(): server.Stop() <-stopped return fmt.Errorf("shutdown backend gRPC push server: %w", shutdownCtx.Err()) } }