118f7c17a2
Replace the native-gRPC server bootstrap with a single `connectrpc.com/connect` HTTP/h2c listener. Connect-Go natively serves Connect, gRPC, and gRPC-Web on the same port, so browsers can now reach the authenticated surface without giving up the gRPC framing native and desktop clients may use later. The decorator stack (envelope → session → payload-hash → signature → freshness/replay → rate-limit → routing/push) is reused unchanged behind a small Connect → gRPC adapter and a `grpc.ServerStream` shim around `*connect.ServerStream`. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
274 lines
8.1 KiB
Go
274 lines
8.1 KiB
Go
// Package grpcapi exposes the authenticated edge transport surface of the
|
|
// gateway. Despite the historical package name, the listener is built on
|
|
// `connectrpc.com/connect` and natively serves the Connect, gRPC, and
|
|
// gRPC-Web protocols on a single HTTP/h2c listener. The configured Go
|
|
// types and environment variable names retain the `gRPC` infix for
|
|
// operational stability — they describe the authenticated edge tier, not
|
|
// the wire protocol.
|
|
package grpcapi
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"net"
|
|
"net/http"
|
|
"sync"
|
|
|
|
"galaxy/gateway/authn"
|
|
"galaxy/gateway/internal/clock"
|
|
"galaxy/gateway/internal/config"
|
|
"galaxy/gateway/internal/downstream"
|
|
"galaxy/gateway/internal/push"
|
|
"galaxy/gateway/internal/ratelimit"
|
|
"galaxy/gateway/internal/replay"
|
|
"galaxy/gateway/internal/session"
|
|
"galaxy/gateway/internal/telemetry"
|
|
gatewayv1 "galaxy/gateway/proto/galaxy/gateway/v1"
|
|
"galaxy/gateway/proto/galaxy/gateway/v1/gatewayv1connect"
|
|
|
|
"connectrpc.com/connect"
|
|
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
|
|
"go.uber.org/zap"
|
|
"golang.org/x/net/http2"
|
|
"golang.org/x/net/http2/h2c"
|
|
)
|
|
|
|
// ServerDependencies describes the optional collaborators used by the
|
|
// authenticated edge server. The zero value is valid and keeps the process
|
|
// runnable with the built-in unimplemented service stub.
|
|
type ServerDependencies struct {
|
|
// Service optionally handles the post-bootstrap SubscribeEvents lifecycle
|
|
// after the initial authenticated service event has been sent. When nil, the
|
|
// gateway keeps authenticated SubscribeEvents streams open until the client
|
|
// cancels them, the server shuts down, or a later stream send fails.
|
|
Service gatewayv1.EdgeGatewayServer
|
|
|
|
// Router resolves the exact downstream unary client for the verified
|
|
// message_type value. When nil, the authenticated unary surface uses an
|
|
// empty exact-match router and returns UNIMPLEMENTED for unrouted commands.
|
|
Router downstream.Router
|
|
|
|
// ResponseSigner signs authenticated unary responses after downstream
|
|
// execution succeeds. When nil, the unary surface fails closed once it needs
|
|
// to sign a routed response.
|
|
ResponseSigner authn.ResponseSigner
|
|
|
|
// SessionCache resolves authenticated device sessions after the envelope
|
|
// gate succeeds. When nil, the authenticated edge surface remains runnable
|
|
// but valid envelopes fail closed as session-cache unavailable.
|
|
SessionCache session.Cache
|
|
|
|
// Clock provides current server time for freshness checks. When nil, the
|
|
// authenticated edge surface uses the system clock.
|
|
Clock clock.Clock
|
|
|
|
// ReplayStore reserves authenticated request identifiers after signature
|
|
// verification. When nil, valid requests fail closed as replay-store
|
|
// unavailable.
|
|
ReplayStore replay.Store
|
|
|
|
// Limiter applies authenticated rate limits after the request passes the
|
|
// transport authenticity checks. When nil, the authenticated edge surface
|
|
// uses a process-local in-memory limiter.
|
|
Limiter AuthenticatedRequestLimiter
|
|
|
|
// Policy evaluates later authenticated edge policy after rate limits pass.
|
|
// When nil, the authenticated edge surface applies a no-op allow policy.
|
|
Policy AuthenticatedRequestPolicy
|
|
|
|
// Logger writes structured logs for authenticated edge traffic.
|
|
Logger *zap.Logger
|
|
|
|
// Telemetry records low-cardinality edge metrics.
|
|
Telemetry *telemetry.Runtime
|
|
|
|
// PushHub is the active authenticated push-stream hub. When present, the
|
|
// server closes active streams before HTTP graceful shutdown.
|
|
PushHub *push.Hub
|
|
}
|
|
|
|
// Server owns the authenticated edge HTTP/h2c listener exposed by the
|
|
// gateway. It serves the Connect, gRPC, and gRPC-Web protocols from a
|
|
// single net/http listener.
|
|
type Server struct {
|
|
cfg config.AuthenticatedGRPCConfig
|
|
service gatewayv1.EdgeGatewayServer
|
|
logger *zap.Logger
|
|
pushHub *push.Hub
|
|
metrics *telemetry.Runtime
|
|
|
|
stateMu sync.RWMutex
|
|
server *http.Server
|
|
listener net.Listener
|
|
}
|
|
|
|
// NewServer constructs an authenticated edge server for the supplied listener
|
|
// configuration and dependency bundle. Nil dependencies are replaced with safe
|
|
// defaults so the gateway can expose the documented transport surface with the
|
|
// full auth pipeline wired from built-in fallbacks.
|
|
func NewServer(cfg config.AuthenticatedGRPCConfig, deps ServerDependencies) *Server {
|
|
deps = normalizeServerDependencies(deps)
|
|
|
|
finalService := newCommandRoutingService(
|
|
newAuthenticatedPushStreamService(deps.Service, deps.ResponseSigner, deps.Clock),
|
|
deps.Router,
|
|
deps.ResponseSigner,
|
|
deps.Clock,
|
|
cfg.DownstreamTimeout,
|
|
)
|
|
|
|
return &Server{
|
|
cfg: cfg,
|
|
service: newEnvelopeValidatingService(
|
|
newSessionLookupService(
|
|
newPayloadHashVerifyingService(
|
|
newSignatureVerifyingService(
|
|
newFreshnessAndReplayService(
|
|
newAuthenticatedRateLimitService(
|
|
finalService,
|
|
deps.Limiter,
|
|
deps.Policy,
|
|
cfg.AntiAbuse,
|
|
),
|
|
deps.Clock,
|
|
deps.ReplayStore,
|
|
cfg.FreshnessWindow,
|
|
),
|
|
),
|
|
),
|
|
deps.SessionCache,
|
|
),
|
|
),
|
|
logger: deps.Logger.Named("authenticated_edge"),
|
|
pushHub: deps.PushHub,
|
|
metrics: deps.Telemetry,
|
|
}
|
|
}
|
|
|
|
// Run binds the configured listener and serves the authenticated edge
|
|
// surface until Shutdown closes the server.
|
|
func (s *Server) Run(ctx context.Context) error {
|
|
if ctx == nil {
|
|
return errors.New("run authenticated edge server: nil context")
|
|
}
|
|
if err := ctx.Err(); err != nil {
|
|
return err
|
|
}
|
|
|
|
listener, err := net.Listen("tcp", s.cfg.Addr)
|
|
if err != nil {
|
|
return fmt.Errorf("run authenticated edge server: listen on %q: %w", s.cfg.Addr, err)
|
|
}
|
|
|
|
mux := http.NewServeMux()
|
|
connectHandler := newConnectEdgeAdapter(s.service)
|
|
path, handler := gatewayv1connect.NewEdgeGatewayHandler(
|
|
connectHandler,
|
|
connect.WithInterceptors(observabilityConnectInterceptor(s.logger, s.metrics)),
|
|
)
|
|
mux.Handle(path, handler)
|
|
|
|
tracedHandler := otelhttp.NewHandler(mux, "authenticated_edge")
|
|
http2Server := &http2.Server{IdleTimeout: s.cfg.ConnectionTimeout}
|
|
httpServer := &http.Server{
|
|
Handler: h2c.NewHandler(tracedHandler, http2Server),
|
|
ReadHeaderTimeout: s.cfg.ConnectionTimeout,
|
|
}
|
|
|
|
s.stateMu.Lock()
|
|
s.server = httpServer
|
|
s.listener = listener
|
|
s.stateMu.Unlock()
|
|
|
|
s.logger.Info("authenticated edge server started", zap.String("addr", listener.Addr().String()))
|
|
|
|
defer func() {
|
|
s.stateMu.Lock()
|
|
s.server = nil
|
|
s.listener = nil
|
|
s.stateMu.Unlock()
|
|
}()
|
|
|
|
err = httpServer.Serve(listener)
|
|
switch {
|
|
case err == nil, errors.Is(err, http.ErrServerClosed):
|
|
s.logger.Info("authenticated edge server stopped")
|
|
return nil
|
|
default:
|
|
return fmt.Errorf("run authenticated edge server: serve on %q: %w", s.cfg.Addr, err)
|
|
}
|
|
}
|
|
|
|
// Shutdown gracefully stops the authenticated edge server within ctx. When the
|
|
// graceful stop exceeds ctx, the server is force-closed before returning the
|
|
// timeout to the caller.
|
|
func (s *Server) Shutdown(ctx context.Context) error {
|
|
if ctx == nil {
|
|
return errors.New("shutdown authenticated edge server: nil context")
|
|
}
|
|
|
|
s.stateMu.RLock()
|
|
server := s.server
|
|
s.stateMu.RUnlock()
|
|
|
|
if server == nil {
|
|
return nil
|
|
}
|
|
|
|
if s.pushHub != nil {
|
|
s.pushHub.Shutdown()
|
|
}
|
|
|
|
err := server.Shutdown(ctx)
|
|
if err == nil {
|
|
return nil
|
|
}
|
|
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
|
|
_ = server.Close()
|
|
return fmt.Errorf("shutdown authenticated edge server: %w", err)
|
|
}
|
|
|
|
return fmt.Errorf("shutdown authenticated edge server: %w", err)
|
|
}
|
|
|
|
func (s *Server) listenAddr() string {
|
|
s.stateMu.RLock()
|
|
defer s.stateMu.RUnlock()
|
|
|
|
if s.listener == nil {
|
|
return ""
|
|
}
|
|
|
|
return s.listener.Addr().String()
|
|
}
|
|
|
|
func normalizeServerDependencies(deps ServerDependencies) ServerDependencies {
|
|
if deps.Router == nil {
|
|
deps.Router = downstream.NewStaticRouter(nil)
|
|
}
|
|
if deps.ResponseSigner == nil {
|
|
deps.ResponseSigner = unavailableResponseSigner{}
|
|
}
|
|
if deps.SessionCache == nil {
|
|
deps.SessionCache = unavailableSessionCache{}
|
|
}
|
|
if deps.Clock == nil {
|
|
deps.Clock = clock.System{}
|
|
}
|
|
if deps.ReplayStore == nil {
|
|
deps.ReplayStore = unavailableReplayStore{}
|
|
}
|
|
if deps.Limiter == nil {
|
|
deps.Limiter = ratelimit.NewInMemory()
|
|
}
|
|
if deps.Policy == nil {
|
|
deps.Policy = noopAuthenticatedRequestPolicy{}
|
|
}
|
|
if deps.Logger == nil {
|
|
deps.Logger = zap.NewNop()
|
|
}
|
|
|
|
return deps
|
|
}
|