Files
galaxy-game/gateway/internal/grpcapi/server.go
T
Ilia Denisov 8565942392
Build · Site / build (push) Successful in 8s
Tests · Go / test (push) Successful in 2m22s
Tests · UI / test (push) Failing after 2m42s
feat(deploy): single-origin path-based deployment + project site
Serve the whole stack behind one host: site at /, game UI at /game/,
gateway REST at /api + /healthz, Connect at /rpc (prefix stripped by the
edge Caddy). The built artifact is domain-agnostic — the UI talks to the
gateway same-origin via relative URLs, so the same bundle runs under any
host with no rebuild and with CORS disabled.

- Rename the Connect proto service galaxy.gateway.v1.EdgeGateway ->
  edge.v1.Gateway; regenerate Go + TS; public path /rpc/edge.v1.Gateway.
- Move the game UI under base path /game (env BASE_PATH); make the
  manifest, service-worker scope, WASM loader, and all navigation
  base-aware via a withBase helper.
- Relative API + /rpc Connect prefix; Vite dev proxy mirrors the strip.
- Rewrite the edge Caddy (dev + prod) for path-based routing; empty CORS
  allow-lists (same-origin); single host.
- New VitePress project site (site/): i18n en/ru with switcher, LaTeX
  math, minimal monospace theme; built and served at /.
- dev-deploy compose/Makefile + CI (dev-deploy, prod-build, new
  site-build) build and seed the site; probes hit /, /game/, /healthz.
- Sync docs (ARCHITECTURE, gateway README/openapi, dev-deploy &
  local-dev READMEs, CLAUDE.md, ui/PLAN).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-23 18:19:07 +02:00

283 lines
8.3 KiB
Go

// Package grpcapi exposes the authenticated edge transport surface of the
// gateway. Despite the historical package name, the listener is built on
// `connectrpc.com/connect` and natively serves the Connect, gRPC, and
// gRPC-Web protocols on a single HTTP/h2c listener. The configured Go
// types and environment variable names retain the `gRPC` infix for
// operational stability — they describe the authenticated edge tier, not
// the wire protocol.
package grpcapi
import (
"context"
"errors"
"fmt"
"net"
"net/http"
"sync"
"galaxy/gateway/authn"
"galaxy/gateway/internal/clock"
"galaxy/gateway/internal/config"
"galaxy/gateway/internal/downstream"
"galaxy/gateway/internal/push"
"galaxy/gateway/internal/ratelimit"
"galaxy/gateway/internal/replay"
"galaxy/gateway/internal/session"
"galaxy/gateway/internal/telemetry"
edgev1 "galaxy/gateway/proto/edge/v1"
"galaxy/gateway/proto/edge/v1/edgev1connect"
"connectrpc.com/connect"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.uber.org/zap"
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"
)
// ServerDependencies describes the optional collaborators used by the
// authenticated edge server. The zero value is valid and keeps the process
// runnable with the built-in unimplemented service stub.
type ServerDependencies struct {
// Service optionally handles the post-bootstrap SubscribeEvents lifecycle
// after the initial authenticated service event has been sent. When nil, the
// gateway keeps authenticated SubscribeEvents streams open until the client
// cancels them, the server shuts down, or a later stream send fails.
Service edgev1.GatewayServer
// Router resolves the exact downstream unary client for the verified
// message_type value. When nil, the authenticated unary surface uses an
// empty exact-match router and returns UNIMPLEMENTED for unrouted commands.
Router downstream.Router
// ResponseSigner signs authenticated unary responses after downstream
// execution succeeds. When nil, the unary surface fails closed once it needs
// to sign a routed response.
ResponseSigner authn.ResponseSigner
// SessionCache resolves authenticated device sessions after the envelope
// gate succeeds. When nil, the authenticated edge surface remains runnable
// but valid envelopes fail closed as session-cache unavailable.
SessionCache session.Cache
// Clock provides current server time for freshness checks. When nil, the
// authenticated edge surface uses the system clock.
Clock clock.Clock
// ReplayStore reserves authenticated request identifiers after signature
// verification. When nil, valid requests fail closed as replay-store
// unavailable.
ReplayStore replay.Store
// Limiter applies authenticated rate limits after the request passes the
// transport authenticity checks. When nil, the authenticated edge surface
// uses a process-local in-memory limiter.
Limiter AuthenticatedRequestLimiter
// Policy evaluates later authenticated edge policy after rate limits pass.
// When nil, the authenticated edge surface applies a no-op allow policy.
Policy AuthenticatedRequestPolicy
// Logger writes structured logs for authenticated edge traffic.
Logger *zap.Logger
// Telemetry records low-cardinality edge metrics.
Telemetry *telemetry.Runtime
// PushHub is the active authenticated push-stream hub. When present, the
// server closes active streams before HTTP graceful shutdown.
PushHub *push.Hub
}
// Server owns the authenticated edge HTTP/h2c listener exposed by the
// gateway. It serves the Connect, gRPC, and gRPC-Web protocols from a
// single net/http listener.
type Server struct {
cfg config.AuthenticatedGRPCConfig
service edgev1.GatewayServer
logger *zap.Logger
pushHub *push.Hub
metrics *telemetry.Runtime
stateMu sync.RWMutex
server *http.Server
listener net.Listener
}
// NewServer constructs an authenticated edge server for the supplied listener
// configuration and dependency bundle. Nil dependencies are replaced with safe
// defaults so the gateway can expose the documented transport surface with the
// full auth pipeline wired from built-in fallbacks.
func NewServer(cfg config.AuthenticatedGRPCConfig, deps ServerDependencies) *Server {
deps = normalizeServerDependencies(deps)
finalService := newCommandRoutingService(
newAuthenticatedPushStreamService(
deps.Service,
deps.ResponseSigner,
deps.Clock,
cfg.PushHeartbeatInterval,
deps.Telemetry,
),
deps.Router,
deps.ResponseSigner,
deps.Clock,
cfg.DownstreamTimeout,
)
return &Server{
cfg: cfg,
service: newEnvelopeValidatingService(
newSessionLookupService(
newPayloadHashVerifyingService(
newSignatureVerifyingService(
newFreshnessAndReplayService(
newAuthenticatedRateLimitService(
finalService,
deps.Limiter,
deps.Policy,
cfg.AntiAbuse,
),
deps.Clock,
deps.ReplayStore,
cfg.FreshnessWindow,
),
),
),
deps.SessionCache,
),
),
logger: deps.Logger.Named("authenticated_edge"),
pushHub: deps.PushHub,
metrics: deps.Telemetry,
}
}
// Run binds the configured listener and serves the authenticated edge
// surface until Shutdown closes the server.
func (s *Server) Run(ctx context.Context) error {
if ctx == nil {
return errors.New("run authenticated edge server: nil context")
}
if err := ctx.Err(); err != nil {
return err
}
listener, err := net.Listen("tcp", s.cfg.Addr)
if err != nil {
return fmt.Errorf("run authenticated edge server: listen on %q: %w", s.cfg.Addr, err)
}
mux := http.NewServeMux()
connectHandler := newConnectEdgeAdapter(s.service)
path, handler := edgev1connect.NewGatewayHandler(
connectHandler,
connect.WithInterceptors(observabilityConnectInterceptor(s.logger, s.metrics)),
)
mux.Handle(path, handler)
// CORS runs OUTSIDE the otelhttp wrapper so preflight OPTIONS calls
// answer with 204 immediately and never enter the trace path.
corsMux := withCORS(s.cfg.CORSAllowedOrigins, mux)
tracedHandler := otelhttp.NewHandler(corsMux, "authenticated_edge")
http2Server := &http2.Server{IdleTimeout: s.cfg.ConnectionTimeout}
httpServer := &http.Server{
Handler: h2c.NewHandler(tracedHandler, http2Server),
ReadHeaderTimeout: s.cfg.ConnectionTimeout,
}
s.stateMu.Lock()
s.server = httpServer
s.listener = listener
s.stateMu.Unlock()
s.logger.Info("authenticated edge server started", zap.String("addr", listener.Addr().String()))
defer func() {
s.stateMu.Lock()
s.server = nil
s.listener = nil
s.stateMu.Unlock()
}()
err = httpServer.Serve(listener)
switch {
case err == nil, errors.Is(err, http.ErrServerClosed):
s.logger.Info("authenticated edge server stopped")
return nil
default:
return fmt.Errorf("run authenticated edge server: serve on %q: %w", s.cfg.Addr, err)
}
}
// Shutdown gracefully stops the authenticated edge server within ctx. When the
// graceful stop exceeds ctx, the server is force-closed before returning the
// timeout to the caller.
func (s *Server) Shutdown(ctx context.Context) error {
if ctx == nil {
return errors.New("shutdown authenticated edge server: nil context")
}
s.stateMu.RLock()
server := s.server
s.stateMu.RUnlock()
if server == nil {
return nil
}
if s.pushHub != nil {
s.pushHub.Shutdown()
}
err := server.Shutdown(ctx)
if err == nil {
return nil
}
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
_ = server.Close()
return fmt.Errorf("shutdown authenticated edge server: %w", err)
}
return fmt.Errorf("shutdown authenticated edge server: %w", err)
}
func (s *Server) listenAddr() string {
s.stateMu.RLock()
defer s.stateMu.RUnlock()
if s.listener == nil {
return ""
}
return s.listener.Addr().String()
}
func normalizeServerDependencies(deps ServerDependencies) ServerDependencies {
if deps.Router == nil {
deps.Router = downstream.NewStaticRouter(nil)
}
if deps.ResponseSigner == nil {
deps.ResponseSigner = unavailableResponseSigner{}
}
if deps.SessionCache == nil {
deps.SessionCache = unavailableSessionCache{}
}
if deps.Clock == nil {
deps.Clock = clock.System{}
}
if deps.ReplayStore == nil {
deps.ReplayStore = unavailableReplayStore{}
}
if deps.Limiter == nil {
deps.Limiter = ratelimit.NewInMemory()
}
if deps.Policy == nil {
deps.Policy = noopAuthenticatedRequestPolicy{}
}
if deps.Logger == nil {
deps.Logger = zap.NewNop()
}
return deps
}