R3: backend rate-limit observability — ratewatch, auto-flag, admin throttled view
- accounts.flagged_high_rate_at baked into the R1 baseline (no prod data; the contour schema is wiped after merge); jet regenerated — the regen also picks up the previously missing game_drafts/game_hidden models. - account.Store: FlagHighRate (set-once), ClearHighRateFlag, the flag in GetByID/ListUsers and a ListFlaggedHighRate review queue. - New internal/ratewatch: ingests the gateway rejection reports, keeps a bounded in-memory episode window for the console and applies the conservative auto-flag (1000 rejected / 10 min, BACKEND_HIGHRATE_FLAG_*). - POST /api/v1/internal/ratelimit/report (network-trusted, like sessions/resolve). - Admin console: Throttled page (episodes + flagged accounts), a high-rate badge in the user list, the marker + operator clear action on the user card. - Tests: ratewatch unit suite, report-route handler test, renderer cases, integration coverage for the store round-trip and the console flow.
This commit is contained in:
@@ -0,0 +1,235 @@
|
||||
// Package ratewatch ingests the gateway's periodic rate-limiter rejection
|
||||
// reports (R3). It keeps an in-memory window of recent throttle episodes for
|
||||
// the admin console's view and applies the conservative high-rate auto-flag:
|
||||
// when one account's rejections within the rolling window cross the threshold,
|
||||
// the account store stamps the soft, reversible flagged_high_rate_at marker
|
||||
// (set-once; an operator clears it; never an automatic ban). Like the gateway's
|
||||
// active_users gauge it is single-instance and resets on restart by design —
|
||||
// the durable part is the account flag, not the episode window.
|
||||
package ratewatch
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// ClassUser is the limiter class whose keys are account ids — the only class
|
||||
// the auto-flag applies to (the others are keyed by client IP).
|
||||
const ClassUser = "user"
|
||||
|
||||
const (
|
||||
// maxSeries bounds the distinct (class, key) series kept for the console
|
||||
// view, so a key-spraying client cannot grow the map: past the bound the
|
||||
// least-recently-throttled series is evicted.
|
||||
maxSeries = 200
|
||||
// minRetention keeps an episode visible in the console for at least an hour
|
||||
// after its last rejection (longer when the flag window is longer).
|
||||
minRetention = time.Hour
|
||||
)
|
||||
|
||||
// Config tunes the conservative high-rate auto-flag.
|
||||
type Config struct {
|
||||
// FlagThreshold is the rejected-call count within FlagWindow past which a
|
||||
// user account is flagged.
|
||||
FlagThreshold int
|
||||
// FlagWindow is the rolling window the rejections accumulate over.
|
||||
FlagWindow time.Duration
|
||||
}
|
||||
|
||||
// DefaultConfig returns the agreed conservative defaults — 1000 rejected calls
|
||||
// within a rolling 10 minutes (~1.7/s sustained, far above the client's
|
||||
// capped-backoff retry noise yet a fraction of an abusive loop).
|
||||
func DefaultConfig() Config {
|
||||
return Config{FlagThreshold: 1000, FlagWindow: 10 * time.Minute}
|
||||
}
|
||||
|
||||
// Validate reports whether the configuration values are acceptable.
|
||||
func (c Config) Validate() error {
|
||||
if c.FlagThreshold <= 0 {
|
||||
return fmt.Errorf("ratewatch: flag threshold must be positive")
|
||||
}
|
||||
if c.FlagWindow <= 0 {
|
||||
return fmt.Errorf("ratewatch: flag window must be positive")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Flagger stamps the account-level high-rate marker; account.Store satisfies it.
|
||||
type Flagger interface {
|
||||
FlagHighRate(ctx context.Context, id uuid.UUID, at time.Time) (bool, error)
|
||||
}
|
||||
|
||||
// Entry is one reported aggregate: the rejections of one limiter key within one
|
||||
// gateway report window (the wire mirror of the gateway's rejection summary).
|
||||
type Entry struct {
|
||||
Class string
|
||||
Key string
|
||||
Rejected int
|
||||
}
|
||||
|
||||
// Episode is one key's recent-throttle aggregate for the admin view.
|
||||
type Episode struct {
|
||||
Class string
|
||||
Key string
|
||||
Rejected int
|
||||
FirstSeen time.Time
|
||||
LastSeen time.Time
|
||||
}
|
||||
|
||||
// Watch accumulates reports and applies the auto-flag rule.
|
||||
type Watch struct {
|
||||
cfg Config
|
||||
flagger Flagger
|
||||
log *zap.Logger
|
||||
now func() time.Time
|
||||
|
||||
mu sync.Mutex
|
||||
series map[seriesKey]*series
|
||||
}
|
||||
|
||||
type seriesKey struct{ class, key string }
|
||||
|
||||
type series struct {
|
||||
points []point // ascending by time
|
||||
}
|
||||
|
||||
type point struct {
|
||||
at time.Time
|
||||
n int
|
||||
}
|
||||
|
||||
// New constructs a Watch over flagger with cfg. A nil logger is replaced by a
|
||||
// no-op one; a nil flagger disables the auto-flag (the view still works).
|
||||
func New(cfg Config, flagger Flagger, log *zap.Logger) *Watch {
|
||||
if log == nil {
|
||||
log = zap.NewNop()
|
||||
}
|
||||
return &Watch{
|
||||
cfg: cfg,
|
||||
flagger: flagger,
|
||||
log: log,
|
||||
now: time.Now,
|
||||
series: make(map[seriesKey]*series),
|
||||
}
|
||||
}
|
||||
|
||||
// Ingest records one gateway report. Entries with an empty class or key or a
|
||||
// non-positive count are skipped. When a user-class series crosses the flag
|
||||
// threshold within the flag window, the account is flagged (the store keeps it
|
||||
// set-once, so a sustained episode costs one no-op UPDATE per report).
|
||||
func (w *Watch) Ingest(ctx context.Context, entries []Entry) {
|
||||
if len(entries) == 0 {
|
||||
return
|
||||
}
|
||||
now := w.now()
|
||||
var flag []uuid.UUID
|
||||
w.mu.Lock()
|
||||
for _, e := range entries {
|
||||
if e.Class == "" || e.Key == "" || e.Rejected <= 0 {
|
||||
continue
|
||||
}
|
||||
k := seriesKey{class: e.Class, key: e.Key}
|
||||
s := w.series[k]
|
||||
if s == nil {
|
||||
s = &series{}
|
||||
w.series[k] = s
|
||||
}
|
||||
s.points = append(s.points, point{at: now, n: e.Rejected})
|
||||
if e.Class == ClassUser && s.sumSince(now.Add(-w.cfg.FlagWindow)) >= w.cfg.FlagThreshold {
|
||||
if id, err := uuid.Parse(e.Key); err == nil {
|
||||
flag = append(flag, id)
|
||||
}
|
||||
}
|
||||
}
|
||||
w.pruneLocked(now)
|
||||
w.mu.Unlock()
|
||||
|
||||
if w.flagger == nil {
|
||||
return
|
||||
}
|
||||
for _, id := range flag {
|
||||
set, err := w.flagger.FlagHighRate(ctx, id, now)
|
||||
switch {
|
||||
case err != nil:
|
||||
w.log.Warn("high-rate flag failed", zap.String("account_id", id.String()), zap.Error(err))
|
||||
case set:
|
||||
w.log.Info("account flagged high-rate",
|
||||
zap.String("account_id", id.String()),
|
||||
zap.Int("threshold", w.cfg.FlagThreshold),
|
||||
zap.Duration("window", w.cfg.FlagWindow))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Config returns the active auto-flag tuning (the admin console captions it).
|
||||
func (w *Watch) Config() Config { return w.cfg }
|
||||
|
||||
// Recent returns the retained throttle episodes, most recently throttled first.
|
||||
func (w *Watch) Recent() []Episode {
|
||||
w.mu.Lock()
|
||||
defer w.mu.Unlock()
|
||||
out := make([]Episode, 0, len(w.series))
|
||||
for k, s := range w.series {
|
||||
if len(s.points) == 0 {
|
||||
continue
|
||||
}
|
||||
ep := Episode{
|
||||
Class: k.class,
|
||||
Key: k.key,
|
||||
FirstSeen: s.points[0].at,
|
||||
LastSeen: s.points[len(s.points)-1].at,
|
||||
}
|
||||
for _, p := range s.points {
|
||||
ep.Rejected += p.n
|
||||
}
|
||||
out = append(out, ep)
|
||||
}
|
||||
sort.Slice(out, func(i, j int) bool { return out[i].LastSeen.After(out[j].LastSeen) })
|
||||
return out
|
||||
}
|
||||
|
||||
// sumSince totals the points at or after cutoff.
|
||||
func (s *series) sumSince(cutoff time.Time) int {
|
||||
sum := 0
|
||||
for i := len(s.points) - 1; i >= 0; i-- {
|
||||
if s.points[i].at.Before(cutoff) {
|
||||
break
|
||||
}
|
||||
sum += s.points[i].n
|
||||
}
|
||||
return sum
|
||||
}
|
||||
|
||||
// pruneLocked drops points past retention, empty series, and — past maxSeries —
|
||||
// the least-recently-throttled series. The caller holds w.mu.
|
||||
func (w *Watch) pruneLocked(now time.Time) {
|
||||
cutoff := now.Add(-max(minRetention, w.cfg.FlagWindow))
|
||||
for k, s := range w.series {
|
||||
i := 0
|
||||
for i < len(s.points) && s.points[i].at.Before(cutoff) {
|
||||
i++
|
||||
}
|
||||
s.points = s.points[i:]
|
||||
if len(s.points) == 0 {
|
||||
delete(w.series, k)
|
||||
}
|
||||
}
|
||||
for len(w.series) > maxSeries {
|
||||
var oldest seriesKey
|
||||
var oldestAt time.Time
|
||||
first := true
|
||||
for k, s := range w.series {
|
||||
last := s.points[len(s.points)-1].at
|
||||
if first || last.Before(oldestAt) {
|
||||
oldest, oldestAt, first = k, last, false
|
||||
}
|
||||
}
|
||||
delete(w.series, oldest)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,140 @@
|
||||
package ratewatch
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
// fakeFlagger records flag calls and reports them as newly set.
|
||||
type fakeFlagger struct {
|
||||
calls []uuid.UUID
|
||||
}
|
||||
|
||||
func (f *fakeFlagger) FlagHighRate(_ context.Context, id uuid.UUID, _ time.Time) (bool, error) {
|
||||
f.calls = append(f.calls, id)
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// watchAt returns a Watch with a controllable clock.
|
||||
func watchAt(cfg Config, flagger Flagger, at *time.Time) *Watch {
|
||||
w := New(cfg, flagger, nil)
|
||||
w.now = func() time.Time { return *at }
|
||||
return w
|
||||
}
|
||||
|
||||
// TestIngestAggregatesAndRecent verifies episodes accumulate per (class, key),
|
||||
// invalid entries are skipped, and Recent orders by last rejection.
|
||||
func TestIngestAggregatesAndRecent(t *testing.T) {
|
||||
now := time.Date(2026, 6, 10, 12, 0, 0, 0, time.UTC)
|
||||
w := watchAt(DefaultConfig(), nil, &now)
|
||||
ctx := context.Background()
|
||||
|
||||
w.Ingest(ctx, []Entry{
|
||||
{Class: "public", Key: "10.0.0.1", Rejected: 3},
|
||||
{Class: "user", Key: "u-1", Rejected: 5},
|
||||
{Class: "", Key: "x", Rejected: 1},
|
||||
{Class: "user", Key: "", Rejected: 1},
|
||||
{Class: "user", Key: "u-1", Rejected: 0},
|
||||
})
|
||||
now = now.Add(30 * time.Second)
|
||||
w.Ingest(ctx, []Entry{{Class: "public", Key: "10.0.0.1", Rejected: 4}})
|
||||
|
||||
got := w.Recent()
|
||||
if len(got) != 2 {
|
||||
t.Fatalf("Recent returned %d episodes, want 2", len(got))
|
||||
}
|
||||
if got[0].Class != "public" || got[0].Key != "10.0.0.1" || got[0].Rejected != 7 {
|
||||
t.Errorf("first episode = %+v, want public/10.0.0.1 rejected=7", got[0])
|
||||
}
|
||||
if !got[0].LastSeen.After(got[0].FirstSeen) {
|
||||
t.Errorf("episode span = [%v, %v], want a positive span", got[0].FirstSeen, got[0].LastSeen)
|
||||
}
|
||||
if got[1].Class != "user" || got[1].Rejected != 5 {
|
||||
t.Errorf("second episode = %+v, want user rejected=5", got[1])
|
||||
}
|
||||
}
|
||||
|
||||
// TestAutoFlagThreshold verifies the flag fires only for a user-class series
|
||||
// crossing the threshold within the window, with a parseable account id.
|
||||
func TestAutoFlagThreshold(t *testing.T) {
|
||||
now := time.Date(2026, 6, 10, 12, 0, 0, 0, time.UTC)
|
||||
flagged := &fakeFlagger{}
|
||||
id := uuid.New()
|
||||
w := watchAt(Config{FlagThreshold: 100, FlagWindow: 10 * time.Minute}, flagged, &now)
|
||||
ctx := context.Background()
|
||||
|
||||
w.Ingest(ctx, []Entry{
|
||||
{Class: "user", Key: id.String(), Rejected: 99},
|
||||
{Class: "public", Key: "10.0.0.1", Rejected: 1000},
|
||||
{Class: "user", Key: "not-a-uuid", Rejected: 1000},
|
||||
})
|
||||
if len(flagged.calls) != 0 {
|
||||
t.Fatalf("flagged %v below the threshold", flagged.calls)
|
||||
}
|
||||
|
||||
now = now.Add(30 * time.Second)
|
||||
w.Ingest(ctx, []Entry{{Class: "user", Key: id.String(), Rejected: 1}})
|
||||
if len(flagged.calls) != 1 || flagged.calls[0] != id {
|
||||
t.Fatalf("flag calls = %v, want exactly [%s]", flagged.calls, id)
|
||||
}
|
||||
}
|
||||
|
||||
// TestAutoFlagWindowExpiry verifies rejections age out of the rolling window.
|
||||
func TestAutoFlagWindowExpiry(t *testing.T) {
|
||||
now := time.Date(2026, 6, 10, 12, 0, 0, 0, time.UTC)
|
||||
flagged := &fakeFlagger{}
|
||||
id := uuid.New()
|
||||
w := watchAt(Config{FlagThreshold: 100, FlagWindow: 10 * time.Minute}, flagged, &now)
|
||||
ctx := context.Background()
|
||||
|
||||
w.Ingest(ctx, []Entry{{Class: "user", Key: id.String(), Rejected: 60}})
|
||||
now = now.Add(11 * time.Minute)
|
||||
w.Ingest(ctx, []Entry{{Class: "user", Key: id.String(), Rejected: 60}})
|
||||
if len(flagged.calls) != 0 {
|
||||
t.Fatalf("flagged %v across an expired window", flagged.calls)
|
||||
}
|
||||
now = now.Add(time.Minute)
|
||||
w.Ingest(ctx, []Entry{{Class: "user", Key: id.String(), Rejected: 50}})
|
||||
if len(flagged.calls) != 1 {
|
||||
t.Fatalf("flag calls = %v, want one in-window crossing", flagged.calls)
|
||||
}
|
||||
}
|
||||
|
||||
// TestSeriesBound verifies the episode map stays bounded by evicting the
|
||||
// least-recently-throttled series.
|
||||
func TestSeriesBound(t *testing.T) {
|
||||
now := time.Date(2026, 6, 10, 12, 0, 0, 0, time.UTC)
|
||||
w := watchAt(DefaultConfig(), nil, &now)
|
||||
ctx := context.Background()
|
||||
|
||||
for i := range maxSeries + 10 {
|
||||
now = now.Add(time.Second)
|
||||
w.Ingest(ctx, []Entry{{Class: "public", Key: fmt.Sprintf("10.0.%d.%d", i/256, i%256), Rejected: 1}})
|
||||
}
|
||||
got := w.Recent()
|
||||
if len(got) != maxSeries {
|
||||
t.Fatalf("retained %d series, want %d", len(got), maxSeries)
|
||||
}
|
||||
for _, ep := range got {
|
||||
if ep.Key == "10.0.0.0" {
|
||||
t.Fatal("the least-recently-throttled series survived the bound")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestConfigValidate covers the tuning guards.
|
||||
func TestConfigValidate(t *testing.T) {
|
||||
if err := DefaultConfig().Validate(); err != nil {
|
||||
t.Errorf("default config invalid: %v", err)
|
||||
}
|
||||
if err := (Config{FlagThreshold: 0, FlagWindow: time.Minute}).Validate(); err == nil {
|
||||
t.Error("zero threshold passed validation")
|
||||
}
|
||||
if err := (Config{FlagThreshold: 1, FlagWindow: 0}).Validate(); err == nil {
|
||||
t.Error("zero window passed validation")
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user