diff --git a/gateway/README.md b/gateway/README.md index af4201f..d162bc0 100644 --- a/gateway/README.md +++ b/gateway/README.md @@ -23,7 +23,7 @@ proto/edge/v1/ # Connect envelope contract (committed generated Go) internal/config/ # GATEWAY_* env config internal/backendclient/ # typed REST client (+ X-User-ID) and push gRPC client internal/session/ # in-memory session cache (LRU/TTL, backend fallback) -internal/ratelimit/ # token-bucket limiter (golang.org/x/time/rate) +internal/ratelimit/ # token-bucket limiter (golang.org/x/time/rate) + the rejection tracker (R3) internal/connector/ # gRPC client to the Telegram connector (initData validate, out-of-app push) + routing internal/push/ # live-event fan-out hub (per-user client streams) internal/transcode/ # FlatBuffers<->REST bridge + message_type registry @@ -79,12 +79,21 @@ connector (`ValidateLoginWidget`) and forward the trusted `external_id`. These | `GATEWAY_SESSION_TTL` | `10m` | cached session lifetime | | `GATEWAY_SESSION_CACHE_MAX` | `50000` | cached session cap | | `GATEWAY_PUSH_HEARTBEAT_INTERVAL` | `10s` | live-stream keep-alive (an immediate heartbeat also fires on open, under the ~15s edge idle timeout) | +| `GATEWAY_MAX_BODY_BYTES` | `1048576` | caps one request body and one Connect message read; an oversized Execute is refused with `resource_exhausted` (R3) | | `GATEWAY_SERVICE_NAME` | `scrabble-gateway` | OpenTelemetry `service.name` | | `GATEWAY_OTEL_TRACES_EXPORTER` | `none` | `none`, `stdout` or `otlp` (gRPC; endpoint from `OTEL_EXPORTER_OTLP_*`) | | `GATEWAY_OTEL_METRICS_EXPORTER` | `none` | `none`, `stdout` or `otlp` | Rate-limit defaults (built-in): public 30/min·IP (burst 10), authenticated -120/min·user (burst 40), admin 60/min·IP (burst 20), email-code 5/10 min·IP. +300/min·user (burst 80, raised in Stage 17 for multi-device play), admin +60/min·IP (burst 20, guarding the `/_gm` mount ahead of its Basic-Auth), +email-code 5/10 min·IP (burst 2). + +Every rejection increments `gateway_rate_limited_total{class}` +(`user`/`public`/`email`/`admin`) and logs one Debug line; a reporter drains the +per-key rejection tracker every 30 s, emits a Warn summary per throttled key and +posts the report to the backend (`/api/v1/internal/ratelimit/report`), feeding +the admin console's throttled view and the high-rate auto-flag (R3). ## Run diff --git a/gateway/cmd/gateway/main.go b/gateway/cmd/gateway/main.go index 458b287..d6439f0 100644 --- a/gateway/cmd/gateway/main.go +++ b/gateway/cmd/gateway/main.go @@ -39,6 +39,14 @@ const ( pushReconnectDelay = 2 * time.Second // gatewayID identifies this gateway instance to the backend push channel. gatewayID = "gateway" + // readHeaderTimeout bounds reading one request's headers on the public + // listener (a slowloris guard). Bodies and long-lived streams are governed by + // the h2c settings in connectsrv — Read/WriteTimeout stay unset on purpose, + // they would kill the Subscribe stream (R3). + readHeaderTimeout = 10 * time.Second + // throttleReportInterval is the cadence of the rate-limiter rejection + // summary: the Warn log per throttled key and the report to the backend (R3). + throttleReportInterval = 30 * time.Second ) func main() { @@ -89,6 +97,7 @@ func run(ctx context.Context, cfg config.Config, logger *zap.Logger) error { sessions := session.NewCache(backend, cfg.SessionTTL, cfg.SessionCacheMax) limiter := ratelimit.New() + tracker := ratelimit.NewTracker() hub := push.NewHub(0) var conn *connector.Client @@ -119,22 +128,26 @@ func run(ctx context.Context, cfg config.Config, logger *zap.Logger) error { registry := transcode.NewRegistry(backend, validator, cfg.DefaultSupportedLanguages...) edge := connectsrv.NewServer(connectsrv.Deps{ - Registry: registry, - Sessions: sessions, - Limiter: limiter, - Hub: hub, - RateLimit: cfg.RateLimit, - Heartbeat: cfg.PushHeartbeatInterval, - Logger: logger, - AdminProxy: adminProxy, - Meter: tel.MeterProvider().Meter("scrabble/gateway/edge"), + Registry: registry, + Sessions: sessions, + Limiter: limiter, + Tracker: tracker, + Hub: hub, + RateLimit: cfg.RateLimit, + Heartbeat: cfg.PushHeartbeatInterval, + Logger: logger, + AdminProxy: adminProxy, + Meter: tel.MeterProvider().Meter("scrabble/gateway/edge"), + MaxBodyBytes: cfg.MaxBodyBytes, }) // Bridge the backend push stream into the fan-out hub (and the out-of-app // channel via the connector). go runPushPump(ctx, backend, hub, conn, logger) + // Periodically summarise rate-limiter rejections (Warn log + backend report). + go runThrottleReporter(ctx, tracker, backend, logger) - public := &http.Server{Addr: cfg.HTTPAddr, Handler: edge.HTTPHandler()} + public := &http.Server{Addr: cfg.HTTPAddr, Handler: edge.HTTPHandler(), ReadHeaderTimeout: readHeaderTimeout} servers := []*namedServer{{name: "public", srv: public}} logger.Info("gateway starting", @@ -182,6 +195,37 @@ func runServers(ctx context.Context, cancel context.CancelFunc, servers []*named return first } +// runThrottleReporter drains the rate-limiter rejection tracker on a fixed +// cadence, emits one Warn summary per throttled key and forwards the report to +// the backend (which feeds the admin throttled view and the high-rate +// auto-flag), until the context is done. A failed delivery is logged and +// dropped — the next window reports fresh data anyway. +func runThrottleReporter(ctx context.Context, tracker *ratelimit.Tracker, backend *backendclient.Client, logger *zap.Logger) { + ticker := time.NewTicker(throttleReportInterval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + } + entries := tracker.Drain() + if len(entries) == 0 { + continue + } + for _, e := range entries { + logger.Warn("rate limited", + zap.String("class", e.Class), + zap.String("key", e.Key), + zap.Int("rejected", e.Rejected), + zap.Duration("window", throttleReportInterval)) + } + if err := backend.ReportRateLimited(ctx, int(throttleReportInterval.Seconds()), entries); err != nil { + logger.Warn("rate-limit report failed", zap.Error(err)) + } + } +} + // runPushPump keeps a backend push subscription open, forwarding every event to // the hub and re-subscribing after the stream ends, until the context is done. For // the out-of-app push kinds it also routes events whose recipient has no live diff --git a/gateway/internal/backendclient/client.go b/gateway/internal/backendclient/client.go index c63f2fb..d1caeac 100644 --- a/gateway/internal/backendclient/client.go +++ b/gateway/internal/backendclient/client.go @@ -18,6 +18,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" + "scrabble/gateway/internal/ratelimit" pushv1 "scrabble/pkg/proto/push/v1" ) @@ -124,3 +125,15 @@ func parseAPIError(status int, data []byte) *APIError { func (c *Client) SubscribePush(ctx context.Context, gatewayID string) (grpc.ServerStreamingClient[pushv1.Event], error) { return c.push.Subscribe(ctx, &pushv1.SubscribeRequest{GatewayId: gatewayID}) } + +// ReportRateLimited posts the gateway's periodic rate-limiter rejection summary +// to the backend, which feeds the admin console's throttled view and the +// high-rate auto-flag. The endpoint carries no user identity: like +// sessions/resolve it rides the trusted internal segment (R3). +func (c *Client) ReportRateLimited(ctx context.Context, windowSeconds int, entries []ratelimit.Rejection) error { + body := struct { + WindowSeconds int `json:"window_seconds"` + Entries []ratelimit.Rejection `json:"entries"` + }{WindowSeconds: windowSeconds, Entries: entries} + return c.do(ctx, http.MethodPost, "/api/v1/internal/ratelimit/report", "", "", body, nil) +} diff --git a/gateway/internal/backendclient/client_test.go b/gateway/internal/backendclient/client_test.go new file mode 100644 index 0000000..3ad3612 --- /dev/null +++ b/gateway/internal/backendclient/client_test.go @@ -0,0 +1,48 @@ +package backendclient_test + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + "time" + + "scrabble/gateway/internal/backendclient" + "scrabble/gateway/internal/ratelimit" +) + +// TestReportRateLimited verifies the rejection report reaches the backend's +// internal endpoint with the agreed JSON shape and no user identity. +func TestReportRateLimited(t *testing.T) { + var got struct { + WindowSeconds int `json:"window_seconds"` + Entries []ratelimit.Rejection `json:"entries"` + } + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost || r.URL.Path != "/api/v1/internal/ratelimit/report" { + t.Errorf("call = %s %s, want POST /api/v1/internal/ratelimit/report", r.Method, r.URL.Path) + } + if uid := r.Header.Get("X-User-ID"); uid != "" { + t.Errorf("X-User-ID = %q, want empty", uid) + } + if err := json.NewDecoder(r.Body).Decode(&got); err != nil { + t.Errorf("decode report: %v", err) + } + })) + defer srv.Close() + + c, err := backendclient.New(srv.URL, "localhost:9090", 2*time.Second) + if err != nil { + t.Fatalf("backendclient: %v", err) + } + defer func() { _ = c.Close() }() + + entries := []ratelimit.Rejection{{Class: "user", Key: "u-1", Rejected: 5}} + if err := c.ReportRateLimited(context.Background(), 30, entries); err != nil { + t.Fatalf("ReportRateLimited: %v", err) + } + if got.WindowSeconds != 30 || len(got.Entries) != 1 || got.Entries[0] != entries[0] { + t.Fatalf("backend received %+v, want window 30 + %+v", got, entries[0]) + } +} diff --git a/gateway/internal/config/config.go b/gateway/internal/config/config.go index e035687..3800be6 100644 --- a/gateway/internal/config/config.go +++ b/gateway/internal/config/config.go @@ -44,6 +44,9 @@ type Config struct { SessionCacheMax int // PushHeartbeatInterval is the idle keep-alive cadence on a client live stream. PushHeartbeatInterval time.Duration + // MaxBodyBytes caps one inbound request body on the public listener and one + // Connect message read; oversized requests are refused without buffering. + MaxBodyBytes int // RateLimit configures the in-memory anti-abuse limiter. RateLimit RateLimitConfig // Telemetry configures the OpenTelemetry providers (shared bootstrap). @@ -77,6 +80,11 @@ const ( defaultServiceName = "scrabble-gateway" ) +// DefaultMaxBodyBytes is the default request-body cap (GATEWAY_MAX_BODY_BYTES): +// 1 MiB — far above any legitimate edge payload (drafts and chat are a few KB) +// yet small enough to stop a cheap memory-amplification upload (R3). +const DefaultMaxBodyBytes = 1 << 20 + // supportedLanguages is the set of game languages a service may declare for the // New Game variant gating; defaultSupportedLanguages is the non-platform fallback. var ( @@ -130,6 +138,9 @@ func Load() (Config, error) { if c.PushHeartbeatInterval, err = envDuration("GATEWAY_PUSH_HEARTBEAT_INTERVAL", defaultPushHeartbeatInterval); err != nil { return Config{}, err } + if c.MaxBodyBytes, err = envInt("GATEWAY_MAX_BODY_BYTES", DefaultMaxBodyBytes); err != nil { + return Config{}, err + } if c.DefaultSupportedLanguages, err = envLanguages("GATEWAY_DEFAULT_SUPPORTED_LANGUAGES", defaultSupportedLanguages); err != nil { return Config{}, err } @@ -161,6 +172,9 @@ func (c Config) validate() error { if c.BackendGRPCAddr == "" { return fmt.Errorf("config: GATEWAY_BACKEND_GRPC_ADDR must not be empty") } + if c.MaxBodyBytes <= 0 { + return fmt.Errorf("config: GATEWAY_MAX_BODY_BYTES must be positive") + } if err := c.Telemetry.Validate(); err != nil { return fmt.Errorf("config: %w", err) } diff --git a/gateway/internal/config/config_test.go b/gateway/internal/config/config_test.go index 8f9afcd..9667ea4 100644 --- a/gateway/internal/config/config_test.go +++ b/gateway/internal/config/config_test.go @@ -29,3 +29,19 @@ func TestLoadRejectsUnsupportedExporter(t *testing.T) { t.Fatal("Load: expected an error for an unsupported exporter, got nil") } } + +// TestLoadMaxBodyBytes verifies the body-cap default and that a non-positive +// override fails validation. +func TestLoadMaxBodyBytes(t *testing.T) { + c, err := Load() + if err != nil { + t.Fatalf("Load: %v", err) + } + if c.MaxBodyBytes != DefaultMaxBodyBytes { + t.Errorf("MaxBodyBytes = %d, want %d", c.MaxBodyBytes, DefaultMaxBodyBytes) + } + t.Setenv("GATEWAY_MAX_BODY_BYTES", "0") + if _, err := Load(); err == nil { + t.Fatal("Load: expected an error for a non-positive body cap, got nil") + } +} diff --git a/gateway/internal/connectsrv/metrics.go b/gateway/internal/connectsrv/metrics.go index a08c9c2..ab18bf1 100644 --- a/gateway/internal/connectsrv/metrics.go +++ b/gateway/internal/connectsrv/metrics.go @@ -24,8 +24,9 @@ var activeUserWindows = []struct { // serverMetrics holds the edge's operational instruments. It defaults to no-ops; // NewServer installs the real meter when one is supplied in Deps. type serverMetrics struct { - edge metric.Float64Histogram - active *activeUsers + edge metric.Float64Histogram + rateLimited metric.Int64Counter + active *activeUsers } // newServerMetrics builds the instruments on meter (nil selects a no-op meter), @@ -42,7 +43,12 @@ func newServerMetrics(meter metric.Meter) *serverMetrics { if err != nil { h, _ = noop.NewMeterProvider().Meter(meterName).Float64Histogram("edge_request_duration") } - m := &serverMetrics{edge: h, active: newActiveUsers()} + c, err := meter.Int64Counter("gateway_rate_limited_total", + metric.WithDescription("Rate-limiter rejections at the edge, by limiter class (user, public, email or admin) — aggregate only, no per-user attributes.")) + if err != nil { + c, _ = noop.NewMeterProvider().Meter(meterName).Int64Counter("gateway_rate_limited_total") + } + m := &serverMetrics{edge: h, rateLimited: c, active: newActiveUsers()} gauge, err := meter.Int64ObservableGauge("active_users", metric.WithDescription("Distinct accounts that performed an authenticated action within the window (in-memory, single gateway instance).")) @@ -75,3 +81,8 @@ func (m *serverMetrics) recordEdge(ctx context.Context, msgType, result string, func (m *serverMetrics) recordActive(uid string) { m.active.seen(uid) } + +// recordRateLimited counts one limiter rejection under class. +func (m *serverMetrics) recordRateLimited(ctx context.Context, class string) { + m.rateLimited.Add(ctx, 1, metric.WithAttributes(attribute.String("class", class))) +} diff --git a/gateway/internal/connectsrv/metrics_test.go b/gateway/internal/connectsrv/metrics_test.go index 80987cc..2f0910c 100644 --- a/gateway/internal/connectsrv/metrics_test.go +++ b/gateway/internal/connectsrv/metrics_test.go @@ -52,3 +52,41 @@ func TestEdgeMetric(t *testing.T) { t.Errorf("edge auth.guest/domain = %d, want 1", got) } } + +// TestRateLimitedMetric records limiter rejections through a manual reader and +// asserts gateway_rate_limited_total splits by class. +func TestRateLimitedMetric(t *testing.T) { + ctx := context.Background() + reader := sdkmetric.NewManualReader() + meter := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader)).Meter("test") + m := newServerMetrics(meter) + + m.recordRateLimited(ctx, "user") + m.recordRateLimited(ctx, "user") + m.recordRateLimited(ctx, "public") + + var rm metricdata.ResourceMetrics + if err := reader.Collect(ctx, &rm); err != nil { + t.Fatalf("collect: %v", err) + } + + counts := map[string]int64{} + for _, sm := range rm.ScopeMetrics { + for _, md := range sm.Metrics { + if md.Name != "gateway_rate_limited_total" { + continue + } + sum, ok := md.Data.(metricdata.Sum[int64]) + if !ok { + t.Fatalf("gateway_rate_limited_total is not an int64 sum") + } + for _, dp := range sum.DataPoints { + class, _ := dp.Attributes.Value(attribute.Key("class")) + counts[class.AsString()] += dp.Value + } + } + } + if counts["user"] != 2 || counts["public"] != 1 { + t.Errorf("rate_limited counts = %v, want user=2 public=1", counts) + } +} diff --git a/gateway/internal/connectsrv/server.go b/gateway/internal/connectsrv/server.go index 455ad3d..5c6fef0 100644 --- a/gateway/internal/connectsrv/server.go +++ b/gateway/internal/connectsrv/server.go @@ -8,6 +8,7 @@ package connectsrv import ( "context" + "errors" "net" "net/http" "strings" @@ -19,6 +20,7 @@ import ( "golang.org/x/net/http2" "golang.org/x/net/http2/h2c" + "scrabble/gateway/internal/backendclient" "scrabble/gateway/internal/config" "scrabble/gateway/internal/push" "scrabble/gateway/internal/ratelimit" @@ -32,33 +34,68 @@ import ( // heartbeatKind is the live-stream keep-alive event kind. const heartbeatKind = "heartbeat" +// Limiter classes, the `class` attribute of gateway_rate_limited_total and the +// class field of the periodic rejection report (R3). +const ( + classUser = "user" + classPublic = "public" + classEmail = "email" + classAdmin = "admin" +) + +// Explicit h2c server sizing (R3, after the R2 stress run questioned the +// implicit defaults). +const ( + // h2cMaxConcurrentStreams bounds the open streams per client connection — the + // x/net default made explicit. A real client holds one Subscribe stream plus a + // few unary calls; only a synthetic load multiplexing many players over one + // transport approaches it. R7 revisits the sizing. + h2cMaxConcurrentStreams = 250 + // h2cIdleTimeout closes a connection with no open streams. A live Subscribe + // stream keeps its connection active, so long-lived clients are unaffected; + // only abandoned connections are reaped. + h2cIdleTimeout = 3 * time.Minute +) + // Server implements edgev1connect.GatewayHandler. type Server struct { registry *transcode.Registry sessions *session.Cache limiter *ratelimit.Limiter + tracker *ratelimit.Tracker hub *push.Hub heartbeat time.Duration log *zap.Logger adminProxy http.Handler metrics *serverMetrics + maxBodyBytes int + publicPolicy ratelimit.Policy userPolicy ratelimit.Policy emailPolicy ratelimit.Policy + adminPolicy ratelimit.Policy } -// Deps carries the Server's dependencies. +// Deps carries the Server's dependencies. A nil Limiter, nil Tracker, zero +// RateLimit and non-positive MaxBodyBytes each select a safe default. type Deps struct { - Registry *transcode.Registry - Sessions *session.Cache - Limiter *ratelimit.Limiter + Registry *transcode.Registry + Sessions *session.Cache + Limiter *ratelimit.Limiter + // Tracker accumulates limiter rejections for the periodic report; nil + // selects a private tracker (rejections are then only counted, never + // reported). + Tracker *ratelimit.Tracker Hub *push.Hub RateLimit config.RateLimitConfig Heartbeat time.Duration Logger *zap.Logger AdminProxy http.Handler Meter metric.Meter + // MaxBodyBytes caps one inbound request body and one Connect message read; + // zero or negative selects config.DefaultMaxBodyBytes. + MaxBodyBytes int } // NewServer constructs the edge service. @@ -67,33 +104,55 @@ func NewServer(d Deps) *Server { if log == nil { log = zap.NewNop() } + maxBody := d.MaxBodyBytes + if maxBody <= 0 { + maxBody = config.DefaultMaxBodyBytes + } + tracker := d.Tracker + if tracker == nil { + tracker = ratelimit.NewTracker() + } + limiter := d.Limiter + if limiter == nil { + limiter = ratelimit.New() + } + rl := d.RateLimit + if rl == (config.RateLimitConfig{}) { + rl = config.DefaultRateLimit() + } return &Server{ registry: d.Registry, sessions: d.Sessions, - limiter: d.Limiter, + limiter: limiter, + tracker: tracker, hub: d.Hub, heartbeat: d.Heartbeat, log: log, adminProxy: d.AdminProxy, metrics: newServerMetrics(d.Meter), - publicPolicy: ratelimit.PerMinute(d.RateLimit.PublicPerMinute, d.RateLimit.PublicBurst), - userPolicy: ratelimit.PerMinute(d.RateLimit.UserPerMinute, d.RateLimit.UserBurst), - emailPolicy: ratelimit.Per(d.RateLimit.EmailPer10Min, 10*time.Minute, d.RateLimit.EmailBurst), + maxBodyBytes: maxBody, + publicPolicy: ratelimit.PerMinute(rl.PublicPerMinute, rl.PublicBurst), + userPolicy: ratelimit.PerMinute(rl.UserPerMinute, rl.UserBurst), + emailPolicy: ratelimit.Per(rl.EmailPer10Min, 10*time.Minute, rl.EmailBurst), + adminPolicy: ratelimit.PerMinute(rl.AdminPerMinute, rl.AdminBurst), } } // HTTPHandler returns the h2c-wrapped Connect handler ready to serve. func (s *Server) HTTPHandler() http.Handler { mux := http.NewServeMux() - path, h := edgev1connect.NewGatewayHandler(s) + // The Connect read cap mirrors the HTTP-level body cap below; an oversized + // Execute message is refused (resource_exhausted) instead of buffered. + path, h := edgev1connect.NewGatewayHandler(s, connect.WithReadMaxBytes(s.maxBodyBytes)) mux.Handle(path, h) if s.adminProxy != nil { // The admin console (backend /_gm) is served on the public listener behind // the proxy's Basic-Auth, mounted below the h2c wrap so the Connect edge keeps // working over h2c (docs/ARCHITECTURE.md §12). In the deployed contour the // front caddy owns the /_gm Basic-Auth and Grafana routing; this mount serves - // a non-caddy (local) setup. - mux.Handle("/_gm/", s.adminProxy) + // a non-caddy (local) setup. The per-IP admin limiter class guards it — + // notably a Basic-Auth brute force (R3). + mux.Handle("/_gm/", s.limitAdmin(s.adminProxy)) } else { // With the console disabled here, keep /_gm a 404 so the SPA catch-all below // does not serve the app shell at the operator path. @@ -107,7 +166,21 @@ func (s *Server) HTTPHandler() http.Handler { mux.Handle("/telegram/", webui.Handler("/telegram/", "index.html")) mux.Handle("/app/", webui.Handler("/app/", "index.html")) mux.Handle("/", webui.Handler("", "landing.html")) - return h2c.NewHandler(mux, &http2.Server{}) + // Every request body on the public listener is capped (the admin proxy POSTs + // included); the h2c server carries explicit stream/idle sizing (R3). + return h2c.NewHandler(maxBodyHandler(s.maxBodyBytes, mux), &http2.Server{ + MaxConcurrentStreams: h2cMaxConcurrentStreams, + IdleTimeout: h2cIdleTimeout, + }) +} + +// maxBodyHandler caps every inbound request body at limit bytes: a read past the +// cap fails with *http.MaxBytesError and the connection is marked to close. +func maxBodyHandler(limit int, next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + r.Body = http.MaxBytesReader(w, r.Body, int64(limit)) + next.ServeHTTP(w, r) + }) } // Execute runs one unary operation. Domain failures are returned in the envelope @@ -138,17 +211,17 @@ func (s *Server) Execute(ctx context.Context, req *connect.Request[edgev1.Execut s.metrics.recordActive(uid) if !s.limiter.Allow("user:"+uid, s.userPolicy) { result = "rate_limited" - return nil, connect.NewError(connect.CodeResourceExhausted, errRateLimited) + return nil, s.rejectRateLimited(ctx, classUser, uid, msgType) } tr.UserID = uid } else { if !s.limiter.Allow("ip:"+clientIP, s.publicPolicy) { result = "rate_limited" - return nil, connect.NewError(connect.CodeResourceExhausted, errRateLimited) + return nil, s.rejectRateLimited(ctx, classPublic, clientIP, msgType) } if op.Email && !s.limiter.Allow("email:"+clientIP, s.emailPolicy) { result = "rate_limited" - return nil, connect.NewError(connect.CodeResourceExhausted, errRateLimited) + return nil, s.rejectRateLimited(ctx, classEmail, clientIP, msgType) } } @@ -180,7 +253,7 @@ func (s *Server) Subscribe(ctx context.Context, req *connect.Request[edgev1.Subs return err } if !s.limiter.Allow("user:"+uid, s.userPolicy) { - return connect.NewError(connect.CodeResourceExhausted, errRateLimited) + return s.rejectRateLimited(ctx, classUser, uid, "subscribe") } events, cancel := s.hub.Subscribe(uid) @@ -216,6 +289,43 @@ func (s *Server) Subscribe(ctx context.Context, req *connect.Request[edgev1.Subs } } +// noteRateLimited accounts one limiter rejection: the aggregate counter, the +// per-rejection Debug line and the periodic-report tracker. The operational +// signal is the reporter's Warn summary; per-rejection logging stays at Debug so +// a rejection flood cannot flood the log (R3). +func (s *Server) noteRateLimited(ctx context.Context, class, key, msgType string) { + s.metrics.recordRateLimited(ctx, class) + s.tracker.Add(class, key) + s.log.Debug("rate limited", + zap.String("class", class), + zap.String("key", key), + zap.String("message_type", msgType)) +} + +// rejectRateLimited accounts one limiter rejection and returns the Connect error +// for the caller. +func (s *Server) rejectRateLimited(ctx context.Context, class, key, msgType string) error { + s.noteRateLimited(ctx, class, key, msgType) + return connect.NewError(connect.CodeResourceExhausted, errRateLimited) +} + +// limitAdmin guards the admin proxy with the per-IP admin limiter class, ahead +// of its Basic-Auth check (a credential brute force is exactly what it bounds). +// It covers the gateway-fronted /_gm mount; in the deployed contour /_gm reaches +// the backend through caddy, whose Basic-Auth has no limiter (stock caddy) — see +// docs/ARCHITECTURE.md §12 (R3). +func (s *Server) limitAdmin(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + ip := peerIP(r.RemoteAddr, r.Header) + if !s.limiter.Allow("admin:"+ip, s.adminPolicy) { + s.noteRateLimited(r.Context(), classAdmin, ip, "admin") + http.Error(w, "rate limited", http.StatusTooManyRequests) + return + } + next.ServeHTTP(w, r) + }) +} + // resolve extracts and resolves the Authorization bearer token to an account id, // returning a Connect Unauthenticated error when it is missing or unknown. func (s *Server) resolve(ctx context.Context, h http.Header) (string, error) { @@ -225,6 +335,15 @@ func (s *Server) resolve(ctx context.Context, h http.Header) (string, error) { } uid, err := s.sessions.Resolve(ctx, token) if err != nil { + // An unknown or expired token (a backend 4xx) is the client's problem and + // stays silent; anything else — a resolve timeout, a refused connection, a + // backend 5xx — is an infra failure misread as "unauthenticated" by the + // client, so surface the cause (the transient resolves seen under load in + // the R2 stress run). The token itself is never logged. + var apiErr *backendclient.APIError + if !errors.As(err, &apiErr) || apiErr.Status >= http.StatusInternalServerError { + s.log.Warn("session resolve failed", zap.Error(err)) + } return "", connect.NewError(connect.CodeUnauthenticated, errInvalidSession) } return uid, nil @@ -247,10 +366,8 @@ func bearerToken(header string) string { // peer address (host part). func peerIP(peerAddr string, h http.Header) string { if xff := h.Get("X-Forwarded-For"); xff != "" { - if i := strings.IndexByte(xff, ','); i >= 0 { - return strings.TrimSpace(xff[:i]) - } - return strings.TrimSpace(xff) + first, _, _ := strings.Cut(xff, ",") + return strings.TrimSpace(first) } if host, _, err := net.SplitHostPort(peerAddr); err == nil { return host diff --git a/gateway/internal/connectsrv/server_test.go b/gateway/internal/connectsrv/server_test.go index 59fcadf..226f43b 100644 --- a/gateway/internal/connectsrv/server_test.go +++ b/gateway/internal/connectsrv/server_test.go @@ -83,6 +83,120 @@ func TestExecuteAuthedRequiresSession(t *testing.T) { } } +// TestExecuteRateLimitedTracked verifies a limiter rejection returns +// ResourceExhausted and lands in the rejection tracker under the public class, +// keyed by the client IP (R3). +func TestExecuteRateLimitedTracked(t *testing.T) { + backendSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _, _ = w.Write([]byte(`{"token":"tok","user_id":"u-1","is_guest":true,"display_name":"Guest"}`)) + })) + defer backendSrv.Close() + backend, err := backendclient.New(backendSrv.URL, "localhost:9090", 2*time.Second) + if err != nil { + t.Fatalf("backendclient: %v", err) + } + defer func() { _ = backend.Close() }() + + limits := config.DefaultRateLimit() + limits.PublicPerMinute, limits.PublicBurst = 1, 1 + tracker := ratelimit.NewTracker() + edge := connectsrv.NewServer(connectsrv.Deps{ + Registry: transcode.NewRegistry(backend, nil), + Sessions: session.NewCache(backend, time.Minute, 100), + Limiter: ratelimit.New(), + Tracker: tracker, + Hub: push.NewHub(0), + RateLimit: limits, + Heartbeat: 15 * time.Second, + }) + edgeSrv := httptest.NewServer(edge.HTTPHandler()) + defer edgeSrv.Close() + client := edgev1connect.NewGatewayClient(http.DefaultClient, edgeSrv.URL) + + if _, err := client.Execute(context.Background(), connect.NewRequest(&edgev1.ExecuteRequest{ + MessageType: transcode.MsgAuthGuest, + })); err != nil { + t.Fatalf("first execute: %v", err) + } + _, err = client.Execute(context.Background(), connect.NewRequest(&edgev1.ExecuteRequest{ + MessageType: transcode.MsgAuthGuest, + })) + if connect.CodeOf(err) != connect.CodeResourceExhausted { + t.Fatalf("code = %v, want ResourceExhausted", connect.CodeOf(err)) + } + + entries := tracker.Drain() + if len(entries) != 1 { + t.Fatalf("tracker drained %d entries, want 1", len(entries)) + } + if e := entries[0]; e.Class != "public" || e.Key != "127.0.0.1" || e.Rejected != 1 { + t.Fatalf("tracked %+v, want public/127.0.0.1 rejected=1", e) + } +} + +// TestAdminMountRateLimited verifies the /_gm mount is guarded by the per-IP +// admin limiter class ahead of the proxy's Basic-Auth (R3). +func TestAdminMountRateLimited(t *testing.T) { + backendSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {})) + defer backendSrv.Close() + backend, err := backendclient.New(backendSrv.URL, "localhost:9090", 2*time.Second) + if err != nil { + t.Fatalf("backendclient: %v", err) + } + defer func() { _ = backend.Close() }() + + limits := config.DefaultRateLimit() + limits.AdminPerMinute, limits.AdminBurst = 1, 1 + edge := connectsrv.NewServer(connectsrv.Deps{ + Registry: transcode.NewRegistry(backend, nil), + Sessions: session.NewCache(backend, time.Minute, 100), + Limiter: ratelimit.New(), + Hub: push.NewHub(0), + RateLimit: limits, + Heartbeat: 15 * time.Second, + AdminProxy: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + }), + }) + edgeSrv := httptest.NewServer(edge.HTTPHandler()) + defer edgeSrv.Close() + + first, err := http.Get(edgeSrv.URL + "/_gm/") + if err != nil { + t.Fatalf("first /_gm: %v", err) + } + _ = first.Body.Close() + if first.StatusCode != http.StatusOK { + t.Fatalf("first /_gm = %d, want 200", first.StatusCode) + } + second, err := http.Get(edgeSrv.URL + "/_gm/") + if err != nil { + t.Fatalf("second /_gm: %v", err) + } + _ = second.Body.Close() + if second.StatusCode != http.StatusTooManyRequests { + t.Fatalf("second /_gm = %d, want 429", second.StatusCode) + } +} + +// TestExecuteOversizedPayloadRejected verifies the request-body cap: an Execute +// message above GATEWAY_MAX_BODY_BYTES is refused at the edge without reaching +// the backend (R3). +func TestExecuteOversizedPayloadRejected(t *testing.T) { + client, cleanup := newEdge(t, func(w http.ResponseWriter, r *http.Request) { + t.Error("backend must not be called for an oversized payload") + }) + defer cleanup() + + _, err := client.Execute(context.Background(), connect.NewRequest(&edgev1.ExecuteRequest{ + MessageType: transcode.MsgAuthGuest, + Payload: make([]byte, config.DefaultMaxBodyBytes+1), + })) + if connect.CodeOf(err) != connect.CodeResourceExhausted { + t.Fatalf("code = %v, want ResourceExhausted", connect.CodeOf(err)) + } +} + func TestExecuteUnknownMessageType(t *testing.T) { client, cleanup := newEdge(t, func(w http.ResponseWriter, r *http.Request) {}) defer cleanup() diff --git a/gateway/internal/ratelimit/tracker.go b/gateway/internal/ratelimit/tracker.go new file mode 100644 index 0000000..92aa602 --- /dev/null +++ b/gateway/internal/ratelimit/tracker.go @@ -0,0 +1,52 @@ +package ratelimit + +import "sync" + +// Rejection aggregates the limiter rejections of one key within one report +// window. Class is the limiter class (user, public, email or admin); Key is the +// class-specific subject — an account id for the user class, a client IP for the +// others. The JSON shape is the gateway→backend rate-limit report wire contract. +type Rejection struct { + Class string `json:"class"` + Key string `json:"key"` + Rejected int `json:"rejected"` +} + +// Tracker accumulates limiter rejections between drains. The gateway's periodic +// reporter drains it to emit the per-key log summary and the backend report; the +// per-rejection cost is one map increment under a mutex, safe on the hot path. +type Tracker struct { + mu sync.Mutex + m map[trackerKey]int +} + +type trackerKey struct{ class, key string } + +// NewTracker constructs an empty Tracker. +func NewTracker() *Tracker { + return &Tracker{m: make(map[trackerKey]int)} +} + +// Add counts one rejection of key under class. +func (t *Tracker) Add(class, key string) { + t.mu.Lock() + defer t.mu.Unlock() + t.m[trackerKey{class: class, key: key}]++ +} + +// Drain returns the rejections accumulated since the previous drain, in +// unspecified order, and resets the tracker. It returns nil when nothing was +// rejected. +func (t *Tracker) Drain() []Rejection { + t.mu.Lock() + defer t.mu.Unlock() + if len(t.m) == 0 { + return nil + } + out := make([]Rejection, 0, len(t.m)) + for k, n := range t.m { + out = append(out, Rejection{Class: k.class, Key: k.key, Rejected: n}) + } + clear(t.m) + return out +} diff --git a/gateway/internal/ratelimit/tracker_test.go b/gateway/internal/ratelimit/tracker_test.go new file mode 100644 index 0000000..4f10f3d --- /dev/null +++ b/gateway/internal/ratelimit/tracker_test.go @@ -0,0 +1,38 @@ +package ratelimit_test + +import ( + "testing" + + "scrabble/gateway/internal/ratelimit" +) + +// TestTrackerDrain verifies rejections aggregate per (class, key) and that a +// drain resets the tracker. +func TestTrackerDrain(t *testing.T) { + tr := ratelimit.NewTracker() + if got := tr.Drain(); got != nil { + t.Fatalf("empty tracker drained %v, want nil", got) + } + + tr.Add("user", "u-1") + tr.Add("user", "u-1") + tr.Add("public", "10.0.0.1") + + got := map[string]ratelimit.Rejection{} + for _, r := range tr.Drain() { + got[r.Class+"/"+r.Key] = r + } + if len(got) != 2 { + t.Fatalf("drained %d entries, want 2", len(got)) + } + if r := got["user/u-1"]; r.Rejected != 2 { + t.Errorf("user/u-1 rejected = %d, want 2", r.Rejected) + } + if r := got["public/10.0.0.1"]; r.Rejected != 1 { + t.Errorf("public/10.0.0.1 rejected = %d, want 1", r.Rejected) + } + + if got := tr.Drain(); got != nil { + t.Fatalf("second drain = %v, want nil", got) + } +}