R3: gateway edge hardening — body cap, h2c sizing, rate-limit observability

- GATEWAY_MAX_BODY_BYTES (1 MiB): connect WithReadMaxBytes + http.MaxBytesReader
  on the public mux; explicit http2.Server MaxConcurrentStreams/IdleTimeout and
  an http.Server ReadHeaderTimeout (R2 report follow-up).
- gateway_rate_limited_total{class} counter, Debug per rejection, a rejection
  tracker drained every 30 s into a Warn summary per key and a report POST to
  /api/v1/internal/ratelimit/report (feeds the admin view + auto-flag).
- The dead AdminPerMinute/AdminBurst policy now guards the /_gm mount (429),
  ahead of its Basic-Auth.
- resolve() logs the cause of infra session-resolve failures at Warn (the
  transient unauthenticated dips from the R2 run); unknown tokens stay silent.
This commit is contained in:
Ilia Denisov
2026-06-10 01:58:48 +02:00
parent c23ac94c4e
commit 8878711cf3
12 changed files with 549 additions and 35 deletions
+14 -3
View File
@@ -24,8 +24,9 @@ var activeUserWindows = []struct {
// serverMetrics holds the edge's operational instruments. It defaults to no-ops;
// NewServer installs the real meter when one is supplied in Deps.
type serverMetrics struct {
edge metric.Float64Histogram
active *activeUsers
edge metric.Float64Histogram
rateLimited metric.Int64Counter
active *activeUsers
}
// newServerMetrics builds the instruments on meter (nil selects a no-op meter),
@@ -42,7 +43,12 @@ func newServerMetrics(meter metric.Meter) *serverMetrics {
if err != nil {
h, _ = noop.NewMeterProvider().Meter(meterName).Float64Histogram("edge_request_duration")
}
m := &serverMetrics{edge: h, active: newActiveUsers()}
c, err := meter.Int64Counter("gateway_rate_limited_total",
metric.WithDescription("Rate-limiter rejections at the edge, by limiter class (user, public, email or admin) — aggregate only, no per-user attributes."))
if err != nil {
c, _ = noop.NewMeterProvider().Meter(meterName).Int64Counter("gateway_rate_limited_total")
}
m := &serverMetrics{edge: h, rateLimited: c, active: newActiveUsers()}
gauge, err := meter.Int64ObservableGauge("active_users",
metric.WithDescription("Distinct accounts that performed an authenticated action within the window (in-memory, single gateway instance)."))
@@ -75,3 +81,8 @@ func (m *serverMetrics) recordEdge(ctx context.Context, msgType, result string,
func (m *serverMetrics) recordActive(uid string) {
m.active.seen(uid)
}
// recordRateLimited counts one limiter rejection under class.
func (m *serverMetrics) recordRateLimited(ctx context.Context, class string) {
m.rateLimited.Add(ctx, 1, metric.WithAttributes(attribute.String("class", class)))
}
@@ -52,3 +52,41 @@ func TestEdgeMetric(t *testing.T) {
t.Errorf("edge auth.guest/domain = %d, want 1", got)
}
}
// TestRateLimitedMetric records limiter rejections through a manual reader and
// asserts gateway_rate_limited_total splits by class.
func TestRateLimitedMetric(t *testing.T) {
ctx := context.Background()
reader := sdkmetric.NewManualReader()
meter := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader)).Meter("test")
m := newServerMetrics(meter)
m.recordRateLimited(ctx, "user")
m.recordRateLimited(ctx, "user")
m.recordRateLimited(ctx, "public")
var rm metricdata.ResourceMetrics
if err := reader.Collect(ctx, &rm); err != nil {
t.Fatalf("collect: %v", err)
}
counts := map[string]int64{}
for _, sm := range rm.ScopeMetrics {
for _, md := range sm.Metrics {
if md.Name != "gateway_rate_limited_total" {
continue
}
sum, ok := md.Data.(metricdata.Sum[int64])
if !ok {
t.Fatalf("gateway_rate_limited_total is not an int64 sum")
}
for _, dp := range sum.DataPoints {
class, _ := dp.Attributes.Value(attribute.Key("class"))
counts[class.AsString()] += dp.Value
}
}
}
if counts["user"] != 2 || counts["public"] != 1 {
t.Errorf("rate_limited counts = %v, want user=2 public=1", counts)
}
}
+137 -20
View File
@@ -8,6 +8,7 @@ package connectsrv
import (
"context"
"errors"
"net"
"net/http"
"strings"
@@ -19,6 +20,7 @@ import (
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"
"scrabble/gateway/internal/backendclient"
"scrabble/gateway/internal/config"
"scrabble/gateway/internal/push"
"scrabble/gateway/internal/ratelimit"
@@ -32,33 +34,68 @@ import (
// heartbeatKind is the live-stream keep-alive event kind.
const heartbeatKind = "heartbeat"
// Limiter classes, the `class` attribute of gateway_rate_limited_total and the
// class field of the periodic rejection report (R3).
const (
classUser = "user"
classPublic = "public"
classEmail = "email"
classAdmin = "admin"
)
// Explicit h2c server sizing (R3, after the R2 stress run questioned the
// implicit defaults).
const (
// h2cMaxConcurrentStreams bounds the open streams per client connection — the
// x/net default made explicit. A real client holds one Subscribe stream plus a
// few unary calls; only a synthetic load multiplexing many players over one
// transport approaches it. R7 revisits the sizing.
h2cMaxConcurrentStreams = 250
// h2cIdleTimeout closes a connection with no open streams. A live Subscribe
// stream keeps its connection active, so long-lived clients are unaffected;
// only abandoned connections are reaped.
h2cIdleTimeout = 3 * time.Minute
)
// Server implements edgev1connect.GatewayHandler.
type Server struct {
registry *transcode.Registry
sessions *session.Cache
limiter *ratelimit.Limiter
tracker *ratelimit.Tracker
hub *push.Hub
heartbeat time.Duration
log *zap.Logger
adminProxy http.Handler
metrics *serverMetrics
maxBodyBytes int
publicPolicy ratelimit.Policy
userPolicy ratelimit.Policy
emailPolicy ratelimit.Policy
adminPolicy ratelimit.Policy
}
// Deps carries the Server's dependencies.
// Deps carries the Server's dependencies. A nil Limiter, nil Tracker, zero
// RateLimit and non-positive MaxBodyBytes each select a safe default.
type Deps struct {
Registry *transcode.Registry
Sessions *session.Cache
Limiter *ratelimit.Limiter
Registry *transcode.Registry
Sessions *session.Cache
Limiter *ratelimit.Limiter
// Tracker accumulates limiter rejections for the periodic report; nil
// selects a private tracker (rejections are then only counted, never
// reported).
Tracker *ratelimit.Tracker
Hub *push.Hub
RateLimit config.RateLimitConfig
Heartbeat time.Duration
Logger *zap.Logger
AdminProxy http.Handler
Meter metric.Meter
// MaxBodyBytes caps one inbound request body and one Connect message read;
// zero or negative selects config.DefaultMaxBodyBytes.
MaxBodyBytes int
}
// NewServer constructs the edge service.
@@ -67,33 +104,55 @@ func NewServer(d Deps) *Server {
if log == nil {
log = zap.NewNop()
}
maxBody := d.MaxBodyBytes
if maxBody <= 0 {
maxBody = config.DefaultMaxBodyBytes
}
tracker := d.Tracker
if tracker == nil {
tracker = ratelimit.NewTracker()
}
limiter := d.Limiter
if limiter == nil {
limiter = ratelimit.New()
}
rl := d.RateLimit
if rl == (config.RateLimitConfig{}) {
rl = config.DefaultRateLimit()
}
return &Server{
registry: d.Registry,
sessions: d.Sessions,
limiter: d.Limiter,
limiter: limiter,
tracker: tracker,
hub: d.Hub,
heartbeat: d.Heartbeat,
log: log,
adminProxy: d.AdminProxy,
metrics: newServerMetrics(d.Meter),
publicPolicy: ratelimit.PerMinute(d.RateLimit.PublicPerMinute, d.RateLimit.PublicBurst),
userPolicy: ratelimit.PerMinute(d.RateLimit.UserPerMinute, d.RateLimit.UserBurst),
emailPolicy: ratelimit.Per(d.RateLimit.EmailPer10Min, 10*time.Minute, d.RateLimit.EmailBurst),
maxBodyBytes: maxBody,
publicPolicy: ratelimit.PerMinute(rl.PublicPerMinute, rl.PublicBurst),
userPolicy: ratelimit.PerMinute(rl.UserPerMinute, rl.UserBurst),
emailPolicy: ratelimit.Per(rl.EmailPer10Min, 10*time.Minute, rl.EmailBurst),
adminPolicy: ratelimit.PerMinute(rl.AdminPerMinute, rl.AdminBurst),
}
}
// HTTPHandler returns the h2c-wrapped Connect handler ready to serve.
func (s *Server) HTTPHandler() http.Handler {
mux := http.NewServeMux()
path, h := edgev1connect.NewGatewayHandler(s)
// The Connect read cap mirrors the HTTP-level body cap below; an oversized
// Execute message is refused (resource_exhausted) instead of buffered.
path, h := edgev1connect.NewGatewayHandler(s, connect.WithReadMaxBytes(s.maxBodyBytes))
mux.Handle(path, h)
if s.adminProxy != nil {
// The admin console (backend /_gm) is served on the public listener behind
// the proxy's Basic-Auth, mounted below the h2c wrap so the Connect edge keeps
// working over h2c (docs/ARCHITECTURE.md §12). In the deployed contour the
// front caddy owns the /_gm Basic-Auth and Grafana routing; this mount serves
// a non-caddy (local) setup.
mux.Handle("/_gm/", s.adminProxy)
// a non-caddy (local) setup. The per-IP admin limiter class guards it —
// notably a Basic-Auth brute force (R3).
mux.Handle("/_gm/", s.limitAdmin(s.adminProxy))
} else {
// With the console disabled here, keep /_gm a 404 so the SPA catch-all below
// does not serve the app shell at the operator path.
@@ -107,7 +166,21 @@ func (s *Server) HTTPHandler() http.Handler {
mux.Handle("/telegram/", webui.Handler("/telegram/", "index.html"))
mux.Handle("/app/", webui.Handler("/app/", "index.html"))
mux.Handle("/", webui.Handler("", "landing.html"))
return h2c.NewHandler(mux, &http2.Server{})
// Every request body on the public listener is capped (the admin proxy POSTs
// included); the h2c server carries explicit stream/idle sizing (R3).
return h2c.NewHandler(maxBodyHandler(s.maxBodyBytes, mux), &http2.Server{
MaxConcurrentStreams: h2cMaxConcurrentStreams,
IdleTimeout: h2cIdleTimeout,
})
}
// maxBodyHandler caps every inbound request body at limit bytes: a read past the
// cap fails with *http.MaxBytesError and the connection is marked to close.
func maxBodyHandler(limit int, next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
r.Body = http.MaxBytesReader(w, r.Body, int64(limit))
next.ServeHTTP(w, r)
})
}
// Execute runs one unary operation. Domain failures are returned in the envelope
@@ -138,17 +211,17 @@ func (s *Server) Execute(ctx context.Context, req *connect.Request[edgev1.Execut
s.metrics.recordActive(uid)
if !s.limiter.Allow("user:"+uid, s.userPolicy) {
result = "rate_limited"
return nil, connect.NewError(connect.CodeResourceExhausted, errRateLimited)
return nil, s.rejectRateLimited(ctx, classUser, uid, msgType)
}
tr.UserID = uid
} else {
if !s.limiter.Allow("ip:"+clientIP, s.publicPolicy) {
result = "rate_limited"
return nil, connect.NewError(connect.CodeResourceExhausted, errRateLimited)
return nil, s.rejectRateLimited(ctx, classPublic, clientIP, msgType)
}
if op.Email && !s.limiter.Allow("email:"+clientIP, s.emailPolicy) {
result = "rate_limited"
return nil, connect.NewError(connect.CodeResourceExhausted, errRateLimited)
return nil, s.rejectRateLimited(ctx, classEmail, clientIP, msgType)
}
}
@@ -180,7 +253,7 @@ func (s *Server) Subscribe(ctx context.Context, req *connect.Request[edgev1.Subs
return err
}
if !s.limiter.Allow("user:"+uid, s.userPolicy) {
return connect.NewError(connect.CodeResourceExhausted, errRateLimited)
return s.rejectRateLimited(ctx, classUser, uid, "subscribe")
}
events, cancel := s.hub.Subscribe(uid)
@@ -216,6 +289,43 @@ func (s *Server) Subscribe(ctx context.Context, req *connect.Request[edgev1.Subs
}
}
// noteRateLimited accounts one limiter rejection: the aggregate counter, the
// per-rejection Debug line and the periodic-report tracker. The operational
// signal is the reporter's Warn summary; per-rejection logging stays at Debug so
// a rejection flood cannot flood the log (R3).
func (s *Server) noteRateLimited(ctx context.Context, class, key, msgType string) {
s.metrics.recordRateLimited(ctx, class)
s.tracker.Add(class, key)
s.log.Debug("rate limited",
zap.String("class", class),
zap.String("key", key),
zap.String("message_type", msgType))
}
// rejectRateLimited accounts one limiter rejection and returns the Connect error
// for the caller.
func (s *Server) rejectRateLimited(ctx context.Context, class, key, msgType string) error {
s.noteRateLimited(ctx, class, key, msgType)
return connect.NewError(connect.CodeResourceExhausted, errRateLimited)
}
// limitAdmin guards the admin proxy with the per-IP admin limiter class, ahead
// of its Basic-Auth check (a credential brute force is exactly what it bounds).
// It covers the gateway-fronted /_gm mount; in the deployed contour /_gm reaches
// the backend through caddy, whose Basic-Auth has no limiter (stock caddy) — see
// docs/ARCHITECTURE.md §12 (R3).
func (s *Server) limitAdmin(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ip := peerIP(r.RemoteAddr, r.Header)
if !s.limiter.Allow("admin:"+ip, s.adminPolicy) {
s.noteRateLimited(r.Context(), classAdmin, ip, "admin")
http.Error(w, "rate limited", http.StatusTooManyRequests)
return
}
next.ServeHTTP(w, r)
})
}
// resolve extracts and resolves the Authorization bearer token to an account id,
// returning a Connect Unauthenticated error when it is missing or unknown.
func (s *Server) resolve(ctx context.Context, h http.Header) (string, error) {
@@ -225,6 +335,15 @@ func (s *Server) resolve(ctx context.Context, h http.Header) (string, error) {
}
uid, err := s.sessions.Resolve(ctx, token)
if err != nil {
// An unknown or expired token (a backend 4xx) is the client's problem and
// stays silent; anything else — a resolve timeout, a refused connection, a
// backend 5xx — is an infra failure misread as "unauthenticated" by the
// client, so surface the cause (the transient resolves seen under load in
// the R2 stress run). The token itself is never logged.
var apiErr *backendclient.APIError
if !errors.As(err, &apiErr) || apiErr.Status >= http.StatusInternalServerError {
s.log.Warn("session resolve failed", zap.Error(err))
}
return "", connect.NewError(connect.CodeUnauthenticated, errInvalidSession)
}
return uid, nil
@@ -247,10 +366,8 @@ func bearerToken(header string) string {
// peer address (host part).
func peerIP(peerAddr string, h http.Header) string {
if xff := h.Get("X-Forwarded-For"); xff != "" {
if i := strings.IndexByte(xff, ','); i >= 0 {
return strings.TrimSpace(xff[:i])
}
return strings.TrimSpace(xff)
first, _, _ := strings.Cut(xff, ",")
return strings.TrimSpace(first)
}
if host, _, err := net.SplitHostPort(peerAddr); err == nil {
return host
+114
View File
@@ -83,6 +83,120 @@ func TestExecuteAuthedRequiresSession(t *testing.T) {
}
}
// TestExecuteRateLimitedTracked verifies a limiter rejection returns
// ResourceExhausted and lands in the rejection tracker under the public class,
// keyed by the client IP (R3).
func TestExecuteRateLimitedTracked(t *testing.T) {
backendSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, _ = w.Write([]byte(`{"token":"tok","user_id":"u-1","is_guest":true,"display_name":"Guest"}`))
}))
defer backendSrv.Close()
backend, err := backendclient.New(backendSrv.URL, "localhost:9090", 2*time.Second)
if err != nil {
t.Fatalf("backendclient: %v", err)
}
defer func() { _ = backend.Close() }()
limits := config.DefaultRateLimit()
limits.PublicPerMinute, limits.PublicBurst = 1, 1
tracker := ratelimit.NewTracker()
edge := connectsrv.NewServer(connectsrv.Deps{
Registry: transcode.NewRegistry(backend, nil),
Sessions: session.NewCache(backend, time.Minute, 100),
Limiter: ratelimit.New(),
Tracker: tracker,
Hub: push.NewHub(0),
RateLimit: limits,
Heartbeat: 15 * time.Second,
})
edgeSrv := httptest.NewServer(edge.HTTPHandler())
defer edgeSrv.Close()
client := edgev1connect.NewGatewayClient(http.DefaultClient, edgeSrv.URL)
if _, err := client.Execute(context.Background(), connect.NewRequest(&edgev1.ExecuteRequest{
MessageType: transcode.MsgAuthGuest,
})); err != nil {
t.Fatalf("first execute: %v", err)
}
_, err = client.Execute(context.Background(), connect.NewRequest(&edgev1.ExecuteRequest{
MessageType: transcode.MsgAuthGuest,
}))
if connect.CodeOf(err) != connect.CodeResourceExhausted {
t.Fatalf("code = %v, want ResourceExhausted", connect.CodeOf(err))
}
entries := tracker.Drain()
if len(entries) != 1 {
t.Fatalf("tracker drained %d entries, want 1", len(entries))
}
if e := entries[0]; e.Class != "public" || e.Key != "127.0.0.1" || e.Rejected != 1 {
t.Fatalf("tracked %+v, want public/127.0.0.1 rejected=1", e)
}
}
// TestAdminMountRateLimited verifies the /_gm mount is guarded by the per-IP
// admin limiter class ahead of the proxy's Basic-Auth (R3).
func TestAdminMountRateLimited(t *testing.T) {
backendSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {}))
defer backendSrv.Close()
backend, err := backendclient.New(backendSrv.URL, "localhost:9090", 2*time.Second)
if err != nil {
t.Fatalf("backendclient: %v", err)
}
defer func() { _ = backend.Close() }()
limits := config.DefaultRateLimit()
limits.AdminPerMinute, limits.AdminBurst = 1, 1
edge := connectsrv.NewServer(connectsrv.Deps{
Registry: transcode.NewRegistry(backend, nil),
Sessions: session.NewCache(backend, time.Minute, 100),
Limiter: ratelimit.New(),
Hub: push.NewHub(0),
RateLimit: limits,
Heartbeat: 15 * time.Second,
AdminProxy: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}),
})
edgeSrv := httptest.NewServer(edge.HTTPHandler())
defer edgeSrv.Close()
first, err := http.Get(edgeSrv.URL + "/_gm/")
if err != nil {
t.Fatalf("first /_gm: %v", err)
}
_ = first.Body.Close()
if first.StatusCode != http.StatusOK {
t.Fatalf("first /_gm = %d, want 200", first.StatusCode)
}
second, err := http.Get(edgeSrv.URL + "/_gm/")
if err != nil {
t.Fatalf("second /_gm: %v", err)
}
_ = second.Body.Close()
if second.StatusCode != http.StatusTooManyRequests {
t.Fatalf("second /_gm = %d, want 429", second.StatusCode)
}
}
// TestExecuteOversizedPayloadRejected verifies the request-body cap: an Execute
// message above GATEWAY_MAX_BODY_BYTES is refused at the edge without reaching
// the backend (R3).
func TestExecuteOversizedPayloadRejected(t *testing.T) {
client, cleanup := newEdge(t, func(w http.ResponseWriter, r *http.Request) {
t.Error("backend must not be called for an oversized payload")
})
defer cleanup()
_, err := client.Execute(context.Background(), connect.NewRequest(&edgev1.ExecuteRequest{
MessageType: transcode.MsgAuthGuest,
Payload: make([]byte, config.DefaultMaxBodyBytes+1),
}))
if connect.CodeOf(err) != connect.CodeResourceExhausted {
t.Fatalf("code = %v, want ResourceExhausted", connect.CodeOf(err))
}
}
func TestExecuteUnknownMessageType(t *testing.T) {
client, cleanup := newEdge(t, func(w http.ResponseWriter, r *http.Request) {})
defer cleanup()