// Package connectsrv implements the public Connect edge service over h2c.
//
// Execute looks the message type up in the transcode registry, then either
// resolves the Authorization bearer token to a user id (operations that require
// a session) and applies the per-user limiter, or applies the per-IP public
// (and, for e-mail operations, e-mail) limiter, and finally dispatches to the
// operation handler; the domain outcome is carried back in the ExecuteResponse
// result_code. Subscribe bridges the gateway push hub to a client server-stream
// with a keep-alive heartbeat.
package connectsrv

import (
	"context"
	"errors"
	"net"
	"net/http"
	"strings"
	"time"

	"connectrpc.com/connect"
	"go.opentelemetry.io/otel/metric"
	"go.uber.org/zap"
	"golang.org/x/net/http2"
	"golang.org/x/net/http2/h2c"

	"scrabble/gateway/internal/backendclient"
	"scrabble/gateway/internal/config"
	"scrabble/gateway/internal/push"
	"scrabble/gateway/internal/ratelimit"
	"scrabble/gateway/internal/session"
	"scrabble/gateway/internal/transcode"
	"scrabble/gateway/internal/webui"
	edgev1 "scrabble/gateway/proto/edge/v1"
	"scrabble/gateway/proto/edge/v1/edgev1connect"
)

// heartbeatKind is the live-stream keep-alive event kind. Subscribe sends one
// such event as soon as the stream opens and another on every heartbeat tick.
const heartbeatKind = "heartbeat"

// Limiter classes, the `class` attribute of gateway_rate_limited_total and the
// class field of the periodic rejection report (R3).
const (
	// classUser: authenticated Execute and Subscribe calls, keyed by user id.
	classUser = "user"
	// classPublic: unauthenticated Execute calls, keyed by client IP.
	classPublic = "public"
	// classEmail: unauthenticated e-mail operations, keyed by client IP.
	classEmail = "email"
	// classAdmin: the /_gm admin proxy mount, keyed by client IP.
	classAdmin = "admin"
)

// Explicit h2c server sizing (R3, after the R2 stress run questioned the
// implicit defaults).
const (
	// h2cMaxConcurrentStreams bounds the open streams per client connection — the
	// x/net default made explicit. A real client holds one Subscribe stream plus a
	// few unary calls; only a synthetic load multiplexing many players over one
	// transport approaches it. R7 revisits the sizing.
	h2cMaxConcurrentStreams = 250

	// h2cIdleTimeout closes a connection with no open streams. A live Subscribe
	// stream keeps its connection active, so long-lived clients are unaffected;
	// only abandoned connections are reaped.
	h2cIdleTimeout = 3 * time.Minute
)

// Server implements edgev1connect.GatewayHandler.
type Server struct {
	// registry maps an Execute message type to its operation.
	registry *transcode.Registry
	// sessions resolves a bearer token to a user id.
	sessions *session.Cache
	// limiter enforces the per-key policies below.
	limiter *ratelimit.Limiter
	// tracker accumulates rejections for the periodic report.
	tracker *ratelimit.Tracker
	// hub is the push hub Subscribe reads the user's events from.
	hub *push.Hub
	// heartbeat is the interval between Subscribe keep-alive events.
	heartbeat time.Duration
	log       *zap.Logger
	// adminProxy, when non-nil, is mounted at /_gm/ behind the admin limiter.
	adminProxy http.Handler
	metrics    *serverMetrics
	// maxBodyBytes caps both the HTTP request body and one Connect message read.
	maxBodyBytes int
	// Limiter policies, one per limiter class.
	publicPolicy ratelimit.Policy
	userPolicy   ratelimit.Policy
	emailPolicy  ratelimit.Policy
	adminPolicy  ratelimit.Policy
}

// Deps carries the Server's dependencies. A nil Limiter, nil Tracker, nil
// Logger, zero RateLimit and non-positive MaxBodyBytes each select a safe
// default in NewServer; every other field is used as given.
type Deps struct {
	Registry *transcode.Registry
	Sessions *session.Cache
	Limiter  *ratelimit.Limiter
	// Tracker accumulates limiter rejections for the periodic report; nil
	// selects a private tracker (rejections are then only counted, never
	// reported).
	Tracker *ratelimit.Tracker
	Hub     *push.Hub
	// RateLimit configures the four limiter policies; the zero value selects
	// config.DefaultRateLimit().
	RateLimit config.RateLimitConfig
	// Heartbeat is the Subscribe keep-alive interval. NewServer applies no
	// default to it.
	Heartbeat time.Duration
	// Logger receives the server's log lines; nil selects a no-op logger.
	Logger *zap.Logger
	// AdminProxy, when non-nil, is served at /_gm/; when nil that path is a 404.
	AdminProxy http.Handler
	// Meter is handed to newServerMetrics to build the server's instruments.
	Meter metric.Meter
	// MaxBodyBytes caps one inbound request body and one Connect message read;
	// zero or negative selects config.DefaultMaxBodyBytes.
	MaxBodyBytes int
}

// NewServer constructs the edge service.
func NewServer(d Deps) *Server { log := d.Logger if log == nil { log = zap.NewNop() } maxBody := d.MaxBodyBytes if maxBody <= 0 { maxBody = config.DefaultMaxBodyBytes } tracker := d.Tracker if tracker == nil { tracker = ratelimit.NewTracker() } limiter := d.Limiter if limiter == nil { limiter = ratelimit.New() } rl := d.RateLimit if rl == (config.RateLimitConfig{}) { rl = config.DefaultRateLimit() } return &Server{ registry: d.Registry, sessions: d.Sessions, limiter: limiter, tracker: tracker, hub: d.Hub, heartbeat: d.Heartbeat, log: log, adminProxy: d.AdminProxy, metrics: newServerMetrics(d.Meter), maxBodyBytes: maxBody, publicPolicy: ratelimit.PerMinute(rl.PublicPerMinute, rl.PublicBurst), userPolicy: ratelimit.PerMinute(rl.UserPerMinute, rl.UserBurst), emailPolicy: ratelimit.Per(rl.EmailPer10Min, 10*time.Minute, rl.EmailBurst), adminPolicy: ratelimit.PerMinute(rl.AdminPerMinute, rl.AdminBurst), } } // HTTPHandler returns the h2c-wrapped Connect handler ready to serve. func (s *Server) HTTPHandler() http.Handler { mux := http.NewServeMux() // The Connect read cap mirrors the HTTP-level body cap below; an oversized // Execute message is refused (resource_exhausted) instead of buffered. path, h := edgev1connect.NewGatewayHandler(s, connect.WithReadMaxBytes(s.maxBodyBytes)) mux.Handle(path, h) if s.adminProxy != nil { // The admin console (backend /_gm) is served on the public listener behind // the proxy's Basic-Auth, mounted below the h2c wrap so the Connect edge keeps // working over h2c (docs/ARCHITECTURE.md §12). In the deployed contour the // front caddy owns the /_gm Basic-Auth and Grafana routing; this mount serves // a non-caddy (local) setup. The per-IP admin limiter class guards it — // notably a Basic-Auth brute force (R3). mux.Handle("/_gm/", s.limitAdmin(s.adminProxy)) } else { // With the console disabled here, keep /_gm a 404 so the SPA catch-all below // does not serve the app shell at the operator path. 
mux.Handle("/_gm/", http.NotFoundHandler()) } // The embedded UI: the game SPA under /app/ (web) and /telegram/ (the Telegram // Mini App) — the single-origin model (docs/ARCHITECTURE.md §13). Both sit below // the h2c wrap so the Connect edge (a more specific prefix) keeps priority, and // each mount falls back to the app shell (index.html) for the hash router. The // public landing moved to its own static container behind the contour caddy // (R3), so the catch-all redirects a stray root hit to the app shell — which // keeps a local no-caddy run usable. mux.Handle("/telegram/", webui.Handler("/telegram/", "index.html")) mux.Handle("/app/", webui.Handler("/app/", "index.html")) mux.Handle("/", http.RedirectHandler("/app/", http.StatusPermanentRedirect)) // Every request body on the public listener is capped (the admin proxy POSTs // included); the h2c server carries explicit stream/idle sizing (R3). return h2c.NewHandler(maxBodyHandler(s.maxBodyBytes, mux), &http2.Server{ MaxConcurrentStreams: h2cMaxConcurrentStreams, IdleTimeout: h2cIdleTimeout, }) } // maxBodyHandler caps every inbound request body at limit bytes: a read past the // cap fails with *http.MaxBytesError and the connection is marked to close. func maxBodyHandler(limit int, next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { r.Body = http.MaxBytesReader(w, r.Body, int64(limit)) next.ServeHTTP(w, r) }) } // Execute runs one unary operation. Domain failures are returned in the envelope // (result_code != "ok", HTTP 200); only edge failures (rate limit, missing // session, unknown type, internal) become Connect errors. 
func (s *Server) Execute(ctx context.Context, req *connect.Request[edgev1.ExecuteRequest]) (*connect.Response[edgev1.ExecuteResponse], error) { start := time.Now() msgType := req.Msg.GetMessageType() result := "internal" defer func() { s.metrics.recordEdge(ctx, msgType, result, start) }() op, ok := s.registry.Lookup(msgType) if !ok { result = "unknown_type" return nil, connect.NewError(connect.CodeNotFound, errUnknownMessageType(msgType)) } clientIP := peerIP(req.Peer().Addr, req.Header()) tr := transcode.Request{Payload: req.Msg.GetPayload(), ClientIP: clientIP} if op.Auth { uid, err := s.resolve(ctx, req.Header()) if err != nil { result = "unauthenticated" return nil, err } // A valid session proving an authenticated request is an "action" for the // active_users gauge, counted before the rate-limit/domain outcome. s.metrics.recordActive(uid) if !s.limiter.Allow("user:"+uid, s.userPolicy) { result = "rate_limited" return nil, s.rejectRateLimited(ctx, classUser, uid, msgType) } tr.UserID = uid } else { if !s.limiter.Allow("ip:"+clientIP, s.publicPolicy) { result = "rate_limited" return nil, s.rejectRateLimited(ctx, classPublic, clientIP, msgType) } if op.Email && !s.limiter.Allow("email:"+clientIP, s.emailPolicy) { result = "rate_limited" return nil, s.rejectRateLimited(ctx, classEmail, clientIP, msgType) } } payload, err := op.Handler(ctx, tr) if err != nil { if code, domain := transcode.DomainCode(err); domain { result = "domain" return connect.NewResponse(&edgev1.ExecuteResponse{ RequestId: req.Msg.GetRequestId(), ResultCode: code, }), nil } s.log.Error("execute failed", zap.String("message_type", msgType), zap.Error(err)) return nil, connect.NewError(connect.CodeInternal, errInternal) } result = "ok" return connect.NewResponse(&edgev1.ExecuteResponse{ RequestId: req.Msg.GetRequestId(), ResultCode: "ok", Payload: payload, }), nil } // Subscribe streams the authenticated user's live events with a keep-alive // heartbeat until the client disconnects. 
func (s *Server) Subscribe(ctx context.Context, req *connect.Request[edgev1.SubscribeRequest], stream *connect.ServerStream[edgev1.Event]) error { uid, err := s.resolve(ctx, req.Header()) if err != nil { return err } if !s.limiter.Allow("user:"+uid, s.userPolicy) { return s.rejectRateLimited(ctx, classUser, uid, "subscribe") } events, cancel := s.hub.Subscribe(uid) defer cancel() // Send an immediate heartbeat so the stream's first byte flushes through the proxy chain // right away and resets edge/client idle timers, instead of the connection sitting silent // until the first tick — which otherwise raced a ~15 s idle timeout and forced a reconnect // every interval (Stage 17). if err := stream.Send(&edgev1.Event{Kind: heartbeatKind}); err != nil { return err } ticker := time.NewTicker(s.heartbeat) defer ticker.Stop() for { select { case <-ctx.Done(): return nil case <-ticker.C: if err := stream.Send(&edgev1.Event{Kind: heartbeatKind}); err != nil { return err } case e, ok := <-events: if !ok { return nil } if err := stream.Send(&edgev1.Event{Kind: e.Kind, Payload: e.Payload, EventId: e.EventID}); err != nil { return err } } } } // noteRateLimited accounts one limiter rejection: the aggregate counter, the // per-rejection Debug line and the periodic-report tracker. The operational // signal is the reporter's Warn summary; per-rejection logging stays at Debug so // a rejection flood cannot flood the log (R3). func (s *Server) noteRateLimited(ctx context.Context, class, key, msgType string) { s.metrics.recordRateLimited(ctx, class) s.tracker.Add(class, key) s.log.Debug("rate limited", zap.String("class", class), zap.String("key", key), zap.String("message_type", msgType)) } // rejectRateLimited accounts one limiter rejection and returns the Connect error // for the caller. 
func (s *Server) rejectRateLimited(ctx context.Context, class, key, msgType string) error { s.noteRateLimited(ctx, class, key, msgType) return connect.NewError(connect.CodeResourceExhausted, errRateLimited) } // limitAdmin guards the admin proxy with the per-IP admin limiter class, ahead // of its Basic-Auth check (a credential brute force is exactly what it bounds). // It covers the gateway-fronted /_gm mount; in the deployed contour /_gm reaches // the backend through caddy, whose Basic-Auth has no limiter (stock caddy) — see // docs/ARCHITECTURE.md §12 (R3). func (s *Server) limitAdmin(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { ip := peerIP(r.RemoteAddr, r.Header) if !s.limiter.Allow("admin:"+ip, s.adminPolicy) { s.noteRateLimited(r.Context(), classAdmin, ip, "admin") http.Error(w, "rate limited", http.StatusTooManyRequests) return } next.ServeHTTP(w, r) }) } // resolve extracts and resolves the Authorization bearer token to an account id, // returning a Connect Unauthenticated error when it is missing or unknown. func (s *Server) resolve(ctx context.Context, h http.Header) (string, error) { token := bearerToken(h.Get("Authorization")) if token == "" { return "", connect.NewError(connect.CodeUnauthenticated, errMissingToken) } uid, err := s.sessions.Resolve(ctx, token) if err != nil { // An unknown or expired token (a backend 4xx) is the client's problem and // stays silent; anything else — a resolve timeout, a refused connection, a // backend 5xx — is an infra failure misread as "unauthenticated" by the // client, so surface the cause (the transient resolves seen under load in // the R2 stress run). The token itself is never logged. 
var apiErr *backendclient.APIError if !errors.As(err, &apiErr) || apiErr.Status >= http.StatusInternalServerError { s.log.Warn("session resolve failed", zap.Error(err)) } return "", connect.NewError(connect.CodeUnauthenticated, errInvalidSession) } return uid, nil } // bearerToken extracts the token from an "Authorization: Bearer " header, // tolerating a bare token for convenience. func bearerToken(header string) string { header = strings.TrimSpace(header) if header == "" { return "" } if rest, ok := strings.CutPrefix(header, "Bearer "); ok { return strings.TrimSpace(rest) } return header } // peerIP prefers the X-Forwarded-For client hop, falling back to the connection // peer address (host part). func peerIP(peerAddr string, h http.Header) string { if xff := h.Get("X-Forwarded-For"); xff != "" { first, _, _ := strings.Cut(xff, ",") return strings.TrimSpace(first) } if host, _, err := net.SplitHostPort(peerAddr); err == nil { return host } return peerAddr }