galaxy-game/gateway/internal/grpcapi/server.go
Ilia Denisov 14b65389ef
feat(gateway): unsigned gateway.heartbeat keeps Safari push streams alive
Browser fetch-streaming layers close response bodies they consider
idle after roughly 15-30 s without incoming bytes. Safari is the
most aggressive, but the symptom matters everywhere: a quiet
SubscribeEvents stream (lobby, between turns, mailbox empty) gets
torn down by the browser, the EventStream singleton reconnects with
backoff, and any push event that fires inside the reconnect window
is lost because `push.Hub` queues are not persisted across
subscription closes. The user-visible failure mode is the
intermittent "Fetch API cannot load … due to access control checks"
console error (a misleading WebKit symptom — CORS headers are
actually present) plus missed turn-ready / mail-received toasts.

Server-side fix: a silence-based heartbeat at the
`authenticatedPushStreamService` wrapper layer. After the signed
`gateway.server_time` bootstrap event, gateway wraps the bound
stream with `heartbeatingStream`. Every tail Send (fan-out, future
variants) resets the silence timer; when the timer elapses, a
goroutine emits `gateway.heartbeat` with only `EventType` set —
everything else stays at proto3 defaults, so the wire frame is
~45 bytes amortised. A `sendMu` serialises the heartbeat goroutine
with tail Sends because grpc.ServerStream.Send is not goroutine-safe.

The heartbeat is intentionally UNSIGNED: heartbeats carry no
payload and dispatch to no handler on the client, so an injected
heartbeat causes no user-visible state change. TLS still
protects the wire and real events keep the signed envelope
unchanged. Documented in `docs/ARCHITECTURE.md` § 15 alongside the
per-scale bandwidth projection (100…100 000 clients × 15…60 s).
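
As a rough back-of-the-envelope check on that projection, the arithmetic below multiplies the client count by the ~45-byte frame and divides by the interval. It assumes the worst case where every stream is silent and ignores TLS and HTTP/2 framing overhead; the function name and the chosen grid points are illustrative and are not taken from `docs/ARCHITECTURE.md`.

```go
package main

import "fmt"

// heartbeatFrameBytes is the approximate frame size quoted in the commit message.
const heartbeatFrameBytes = 45

// heartbeatBytesPerSecond returns the worst-case outbound heartbeat traffic in
// bytes per second, assuming every connected stream is silent.
func heartbeatBytesPerSecond(clients int, intervalSeconds float64) float64 {
	return float64(clients) * heartbeatFrameBytes / intervalSeconds
}

func main() {
	for _, clients := range []int{100, 1_000, 10_000, 100_000} {
		for _, interval := range []float64{15, 30, 60} {
			fmt.Printf("%7d clients @ %2.0fs: %9.1f B/s\n",
				clients, interval, heartbeatBytesPerSecond(clients, interval))
		}
	}
}
```

Under these assumptions, 100 000 silent clients at the default 15 s interval come to 300 000 B/s, and 75 000 B/s at 60 s.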

Config: new `GATEWAY_PUSH_HEARTBEAT_INTERVAL` (default `15s`,
`0s` disables). Telemetry: new
`gateway.push.heartbeats_sent{outcome}` counter so operators can
budget bandwidth and spot a sudden `outcome=error` bump as an
upstream-failing-before-flush signal.

Client (`ui/frontend/src/api/events.svelte.ts`): early `continue`
on `event.eventType === "gateway.heartbeat"` before `verifyEvent`,
`verifyPayloadHash`, or dispatch, because the empty signature would
otherwise trip `SignatureError` and force a reconnect. A leading
heartbeat still flips
`connectionStatus` to `connected` and resets backoff, because
receiving one is proof the stream is healthy.

Tests:
- `push_heartbeat_test.go`: unit tests for the wrapper — zero
  interval returns nil, heartbeat fires after silence, real Send
  resets the timer, Stop / context-cancel halt the goroutine,
  Send errors propagate.
- `server_test.go`: integration tests through the full gateway
  pipeline — heartbeat fires after the configured silence window,
  zero interval keeps the stream silent.
- `config_test.go`: default applied, env-override parsed,
  negative value rejected.
- `events.test.ts`: heartbeat skipped before verification + not
  dispatched to handlers; leading heartbeat still flips
  `connectionStatus` to `connected`.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-19 09:29:29 +02:00


// Package grpcapi exposes the authenticated edge transport surface of the
// gateway. Despite the historical package name, the listener is built on
// `connectrpc.com/connect` and natively serves the Connect, gRPC, and
// gRPC-Web protocols on a single HTTP/h2c listener. The configured Go
// types and environment variable names retain the `gRPC` infix for
// operational stability — they describe the authenticated edge tier, not
// the wire protocol.
package grpcapi

import (
	"context"
	"errors"
	"fmt"
	"net"
	"net/http"
	"sync"

	"galaxy/gateway/authn"
	"galaxy/gateway/internal/clock"
	"galaxy/gateway/internal/config"
	"galaxy/gateway/internal/downstream"
	"galaxy/gateway/internal/push"
	"galaxy/gateway/internal/ratelimit"
	"galaxy/gateway/internal/replay"
	"galaxy/gateway/internal/session"
	"galaxy/gateway/internal/telemetry"
	gatewayv1 "galaxy/gateway/proto/galaxy/gateway/v1"
	"galaxy/gateway/proto/galaxy/gateway/v1/gatewayv1connect"

	"connectrpc.com/connect"
	"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
	"go.uber.org/zap"
	"golang.org/x/net/http2"
	"golang.org/x/net/http2/h2c"
)

// ServerDependencies describes the optional collaborators used by the
// authenticated edge server. The zero value is valid and keeps the process
// runnable with the built-in unimplemented service stub.
type ServerDependencies struct {
	// Service optionally handles the post-bootstrap SubscribeEvents lifecycle
	// after the initial authenticated service event has been sent. When nil, the
	// gateway keeps authenticated SubscribeEvents streams open until the client
	// cancels them, the server shuts down, or a later stream send fails.
	Service gatewayv1.EdgeGatewayServer

	// Router resolves the exact downstream unary client for the verified
	// message_type value. When nil, the authenticated unary surface uses an
	// empty exact-match router and returns UNIMPLEMENTED for unrouted commands.
	Router downstream.Router

	// ResponseSigner signs authenticated unary responses after downstream
	// execution succeeds. When nil, the unary surface fails closed once it needs
	// to sign a routed response.
	ResponseSigner authn.ResponseSigner

	// SessionCache resolves authenticated device sessions after the envelope
	// gate succeeds. When nil, the authenticated edge surface remains runnable
	// but valid envelopes fail closed as session-cache unavailable.
	SessionCache session.Cache

	// Clock provides current server time for freshness checks. When nil, the
	// authenticated edge surface uses the system clock.
	Clock clock.Clock

	// ReplayStore reserves authenticated request identifiers after signature
	// verification. When nil, valid requests fail closed as replay-store
	// unavailable.
	ReplayStore replay.Store

	// Limiter applies authenticated rate limits after the request passes the
	// transport authenticity checks. When nil, the authenticated edge surface
	// uses a process-local in-memory limiter.
	Limiter AuthenticatedRequestLimiter

	// Policy evaluates later authenticated edge policy after rate limits pass.
	// When nil, the authenticated edge surface applies a no-op allow policy.
	Policy AuthenticatedRequestPolicy

	// Logger writes structured logs for authenticated edge traffic.
	Logger *zap.Logger

	// Telemetry records low-cardinality edge metrics.
	Telemetry *telemetry.Runtime

	// PushHub is the active authenticated push-stream hub. When present, the
	// server closes active streams before HTTP graceful shutdown.
	PushHub *push.Hub
}

// Server owns the authenticated edge HTTP/h2c listener exposed by the
// gateway. It serves the Connect, gRPC, and gRPC-Web protocols from a
// single net/http listener.
type Server struct {
	cfg     config.AuthenticatedGRPCConfig
	service gatewayv1.EdgeGatewayServer
	logger  *zap.Logger
	pushHub *push.Hub
	metrics *telemetry.Runtime

	stateMu  sync.RWMutex
	server   *http.Server
	listener net.Listener
}

// NewServer constructs an authenticated edge server for the supplied listener
// configuration and dependency bundle. Nil dependencies are replaced with safe
// defaults so the gateway can expose the documented transport surface with the
// full auth pipeline wired from built-in fallbacks.
func NewServer(cfg config.AuthenticatedGRPCConfig, deps ServerDependencies) *Server {
	deps = normalizeServerDependencies(deps)

	finalService := newCommandRoutingService(
		newAuthenticatedPushStreamService(
			deps.Service,
			deps.ResponseSigner,
			deps.Clock,
			cfg.PushHeartbeatInterval,
			deps.Telemetry,
		),
		deps.Router,
		deps.ResponseSigner,
		deps.Clock,
		cfg.DownstreamTimeout,
	)

	return &Server{
		cfg: cfg,
		service: newEnvelopeValidatingService(
			newSessionLookupService(
				newPayloadHashVerifyingService(
					newSignatureVerifyingService(
						newFreshnessAndReplayService(
							newAuthenticatedRateLimitService(
								finalService,
								deps.Limiter,
								deps.Policy,
								cfg.AntiAbuse,
							),
							deps.Clock,
							deps.ReplayStore,
							cfg.FreshnessWindow,
						),
					),
				),
				deps.SessionCache,
			),
		),
		logger:  deps.Logger.Named("authenticated_edge"),
		pushHub: deps.PushHub,
		metrics: deps.Telemetry,
	}
}

// Run binds the configured listener and serves the authenticated edge
// surface until Shutdown closes the server.
func (s *Server) Run(ctx context.Context) error {
	if ctx == nil {
		return errors.New("run authenticated edge server: nil context")
	}
	if err := ctx.Err(); err != nil {
		return err
	}

	listener, err := net.Listen("tcp", s.cfg.Addr)
	if err != nil {
		return fmt.Errorf("run authenticated edge server: listen on %q: %w", s.cfg.Addr, err)
	}

	mux := http.NewServeMux()
	connectHandler := newConnectEdgeAdapter(s.service)
	path, handler := gatewayv1connect.NewEdgeGatewayHandler(
		connectHandler,
		connect.WithInterceptors(observabilityConnectInterceptor(s.logger, s.metrics)),
	)
	mux.Handle(path, handler)

	// CORS runs OUTSIDE the otelhttp wrapper so preflight OPTIONS calls
	// answer with 204 immediately and never enter the trace path.
	corsMux := withCORS(s.cfg.CORSAllowedOrigins, mux)
	tracedHandler := otelhttp.NewHandler(corsMux, "authenticated_edge")

	http2Server := &http2.Server{IdleTimeout: s.cfg.ConnectionTimeout}
	httpServer := &http.Server{
		Handler:           h2c.NewHandler(tracedHandler, http2Server),
		ReadHeaderTimeout: s.cfg.ConnectionTimeout,
	}

	s.stateMu.Lock()
	s.server = httpServer
	s.listener = listener
	s.stateMu.Unlock()

	s.logger.Info("authenticated edge server started", zap.String("addr", listener.Addr().String()))

	defer func() {
		s.stateMu.Lock()
		s.server = nil
		s.listener = nil
		s.stateMu.Unlock()
	}()

	err = httpServer.Serve(listener)
	switch {
	case err == nil, errors.Is(err, http.ErrServerClosed):
		s.logger.Info("authenticated edge server stopped")
		return nil
	default:
		return fmt.Errorf("run authenticated edge server: serve on %q: %w", s.cfg.Addr, err)
	}
}

// Shutdown gracefully stops the authenticated edge server within ctx. When the
// graceful stop exceeds ctx, the server is force-closed before returning the
// timeout to the caller.
func (s *Server) Shutdown(ctx context.Context) error {
	if ctx == nil {
		return errors.New("shutdown authenticated edge server: nil context")
	}

	s.stateMu.RLock()
	server := s.server
	s.stateMu.RUnlock()

	if server == nil {
		return nil
	}

	if s.pushHub != nil {
		s.pushHub.Shutdown()
	}

	err := server.Shutdown(ctx)
	if err == nil {
		return nil
	}
	if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
		_ = server.Close()
		return fmt.Errorf("shutdown authenticated edge server: %w", err)
	}
	return fmt.Errorf("shutdown authenticated edge server: %w", err)
}

func (s *Server) listenAddr() string {
	s.stateMu.RLock()
	defer s.stateMu.RUnlock()

	if s.listener == nil {
		return ""
	}
	return s.listener.Addr().String()
}

func normalizeServerDependencies(deps ServerDependencies) ServerDependencies {
	if deps.Router == nil {
		deps.Router = downstream.NewStaticRouter(nil)
	}
	if deps.ResponseSigner == nil {
		deps.ResponseSigner = unavailableResponseSigner{}
	}
	if deps.SessionCache == nil {
		deps.SessionCache = unavailableSessionCache{}
	}
	if deps.Clock == nil {
		deps.Clock = clock.System{}
	}
	if deps.ReplayStore == nil {
		deps.ReplayStore = unavailableReplayStore{}
	}
	if deps.Limiter == nil {
		deps.Limiter = ratelimit.NewInMemory()
	}
	if deps.Policy == nil {
		deps.Policy = noopAuthenticatedRequestPolicy{}
	}
	if deps.Logger == nil {
		deps.Logger = zap.NewNop()
	}
	return deps
}