// Package grpcapi exposes the authenticated edge transport surface of the // gateway. Despite the historical package name, the listener is built on // `connectrpc.com/connect` and natively serves the Connect, gRPC, and // gRPC-Web protocols on a single HTTP/h2c listener. The configured Go // types and environment variable names retain the `gRPC` infix for // operational stability — they describe the authenticated edge tier, not // the wire protocol. package grpcapi import ( "context" "errors" "fmt" "net" "net/http" "sync" "galaxy/gateway/authn" "galaxy/gateway/internal/clock" "galaxy/gateway/internal/config" "galaxy/gateway/internal/downstream" "galaxy/gateway/internal/push" "galaxy/gateway/internal/ratelimit" "galaxy/gateway/internal/replay" "galaxy/gateway/internal/session" "galaxy/gateway/internal/telemetry" gatewayv1 "galaxy/gateway/proto/galaxy/gateway/v1" "galaxy/gateway/proto/galaxy/gateway/v1/gatewayv1connect" "connectrpc.com/connect" "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" "go.uber.org/zap" "golang.org/x/net/http2" "golang.org/x/net/http2/h2c" ) // ServerDependencies describes the optional collaborators used by the // authenticated edge server. The zero value is valid and keeps the process // runnable with the built-in unimplemented service stub. type ServerDependencies struct { // Service optionally handles the post-bootstrap SubscribeEvents lifecycle // after the initial authenticated service event has been sent. When nil, the // gateway keeps authenticated SubscribeEvents streams open until the client // cancels them, the server shuts down, or a later stream send fails. Service gatewayv1.EdgeGatewayServer // Router resolves the exact downstream unary client for the verified // message_type value. When nil, the authenticated unary surface uses an // empty exact-match router and returns UNIMPLEMENTED for unrouted commands. Router downstream.Router // ResponseSigner signs authenticated unary responses after downstream // execution succeeds. When nil, the unary surface fails closed once it needs // to sign a routed response. ResponseSigner authn.ResponseSigner // SessionCache resolves authenticated device sessions after the envelope // gate succeeds. When nil, the authenticated edge surface remains runnable // but valid envelopes fail closed as session-cache unavailable. SessionCache session.Cache // Clock provides current server time for freshness checks. When nil, the // authenticated edge surface uses the system clock. Clock clock.Clock // ReplayStore reserves authenticated request identifiers after signature // verification. When nil, valid requests fail closed as replay-store // unavailable. ReplayStore replay.Store // Limiter applies authenticated rate limits after the request passes the // transport authenticity checks. When nil, the authenticated edge surface // uses a process-local in-memory limiter. Limiter AuthenticatedRequestLimiter // Policy evaluates later authenticated edge policy after rate limits pass. // When nil, the authenticated edge surface applies a no-op allow policy. Policy AuthenticatedRequestPolicy // Logger writes structured logs for authenticated edge traffic. Logger *zap.Logger // Telemetry records low-cardinality edge metrics. Telemetry *telemetry.Runtime // PushHub is the active authenticated push-stream hub. When present, the // server closes active streams before HTTP graceful shutdown. PushHub *push.Hub } // Server owns the authenticated edge HTTP/h2c listener exposed by the // gateway. It serves the Connect, gRPC, and gRPC-Web protocols from a // single net/http listener. type Server struct { cfg config.AuthenticatedGRPCConfig service gatewayv1.EdgeGatewayServer logger *zap.Logger pushHub *push.Hub metrics *telemetry.Runtime stateMu sync.RWMutex server *http.Server listener net.Listener } // NewServer constructs an authenticated edge server for the supplied listener // configuration and dependency bundle. Nil dependencies are replaced with safe // defaults so the gateway can expose the documented transport surface with the // full auth pipeline wired from built-in fallbacks. func NewServer(cfg config.AuthenticatedGRPCConfig, deps ServerDependencies) *Server { deps = normalizeServerDependencies(deps) finalService := newCommandRoutingService( newAuthenticatedPushStreamService(deps.Service, deps.ResponseSigner, deps.Clock), deps.Router, deps.ResponseSigner, deps.Clock, cfg.DownstreamTimeout, ) return &Server{ cfg: cfg, service: newEnvelopeValidatingService( newSessionLookupService( newPayloadHashVerifyingService( newSignatureVerifyingService( newFreshnessAndReplayService( newAuthenticatedRateLimitService( finalService, deps.Limiter, deps.Policy, cfg.AntiAbuse, ), deps.Clock, deps.ReplayStore, cfg.FreshnessWindow, ), ), ), deps.SessionCache, ), ), logger: deps.Logger.Named("authenticated_edge"), pushHub: deps.PushHub, metrics: deps.Telemetry, } } // Run binds the configured listener and serves the authenticated edge // surface until Shutdown closes the server. func (s *Server) Run(ctx context.Context) error { if ctx == nil { return errors.New("run authenticated edge server: nil context") } if err := ctx.Err(); err != nil { return err } listener, err := net.Listen("tcp", s.cfg.Addr) if err != nil { return fmt.Errorf("run authenticated edge server: listen on %q: %w", s.cfg.Addr, err) } mux := http.NewServeMux() connectHandler := newConnectEdgeAdapter(s.service) path, handler := gatewayv1connect.NewEdgeGatewayHandler( connectHandler, connect.WithInterceptors(observabilityConnectInterceptor(s.logger, s.metrics)), ) mux.Handle(path, handler) tracedHandler := otelhttp.NewHandler(mux, "authenticated_edge") http2Server := &http2.Server{IdleTimeout: s.cfg.ConnectionTimeout} httpServer := &http.Server{ Handler: h2c.NewHandler(tracedHandler, http2Server), ReadHeaderTimeout: s.cfg.ConnectionTimeout, } s.stateMu.Lock() s.server = httpServer s.listener = listener s.stateMu.Unlock() s.logger.Info("authenticated edge server started", zap.String("addr", listener.Addr().String())) defer func() { s.stateMu.Lock() s.server = nil s.listener = nil s.stateMu.Unlock() }() err = httpServer.Serve(listener) switch { case err == nil, errors.Is(err, http.ErrServerClosed): s.logger.Info("authenticated edge server stopped") return nil default: return fmt.Errorf("run authenticated edge server: serve on %q: %w", s.cfg.Addr, err) } } // Shutdown gracefully stops the authenticated edge server within ctx. When the // graceful stop exceeds ctx, the server is force-closed before returning the // timeout to the caller. func (s *Server) Shutdown(ctx context.Context) error { if ctx == nil { return errors.New("shutdown authenticated edge server: nil context") } s.stateMu.RLock() server := s.server s.stateMu.RUnlock() if server == nil { return nil } if s.pushHub != nil { s.pushHub.Shutdown() } err := server.Shutdown(ctx) if err == nil { return nil } if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { _ = server.Close() return fmt.Errorf("shutdown authenticated edge server: %w", err) } return fmt.Errorf("shutdown authenticated edge server: %w", err) } func (s *Server) listenAddr() string { s.stateMu.RLock() defer s.stateMu.RUnlock() if s.listener == nil { return "" } return s.listener.Addr().String() } func normalizeServerDependencies(deps ServerDependencies) ServerDependencies { if deps.Router == nil { deps.Router = downstream.NewStaticRouter(nil) } if deps.ResponseSigner == nil { deps.ResponseSigner = unavailableResponseSigner{} } if deps.SessionCache == nil { deps.SessionCache = unavailableSessionCache{} } if deps.Clock == nil { deps.Clock = clock.System{} } if deps.ReplayStore == nil { deps.ReplayStore = unavailableReplayStore{} } if deps.Limiter == nil { deps.Limiter = ratelimit.NewInMemory() } if deps.Policy == nil { deps.Policy = noopAuthenticatedRequestPolicy{} } if deps.Logger == nil { deps.Logger = zap.NewNop() } return deps }