ab58062565
- accounts.flagged_high_rate_at baked into the R1 baseline (no prod data; the contour schema is wiped after merge); jet regenerated — the regen also picks up the previously missing game_drafts/game_hidden models. - account.Store: FlagHighRate (set-once), ClearHighRateFlag, the flag in GetByID/ListUsers and a ListFlaggedHighRate review queue. - New internal/ratewatch: ingests the gateway rejection reports, keeps a bounded in-memory episode window for the console and applies the conservative auto-flag (1000 rejected / 10 min, BACKEND_HIGHRATE_FLAG_*). - POST /api/v1/internal/ratelimit/report (network-trusted, like sessions/resolve). - Admin console: Throttled page (episodes + flagged accounts), a high-rate badge in the user list, the marker + operator clear action on the user card. - Tests: ratewatch unit suite, report-route handler test, renderer cases, integration coverage for the store round-trip and the console flow.
236 lines
6.5 KiB
Go
236 lines
6.5 KiB
Go
// Package ratewatch ingests the gateway's periodic rate-limiter rejection
// reports (R3). It keeps an in-memory window of recent throttle episodes for
// the admin console's view and applies the conservative high-rate auto-flag:
// when one account's rejections within the rolling window cross the threshold,
// the account store stamps the soft, reversible flagged_high_rate_at marker
// (set-once; an operator clears it; never an automatic ban). Like the gateway's
// active_users gauge it is single-instance and resets on restart by design —
// the durable part is the account flag, not the episode window.
package ratewatch
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sort"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/google/uuid"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// ClassUser names the limiter class whose keys are account ids. Ingest applies
// the high-rate auto-flag to this class only; per the original design note the
// remaining classes are keyed by client IP.
const ClassUser = "user"
|
|
|
|
const (
	// maxSeries caps how many distinct (class, key) series are retained for
	// the console view. Once the map exceeds this bound, pruning evicts the
	// series whose latest rejection is oldest, so a client spraying fresh keys
	// cannot grow the map without limit.
	maxSeries = 200

	// minRetention is the floor on how long points are kept: pruning uses the
	// larger of this value and the configured flag window, so an episode stays
	// visible for at least an hour after its last rejection.
	minRetention = time.Hour
)
|
|
|
|
// Config holds the tuning knobs of the conservative high-rate auto-flag.
type Config struct {
	// FlagThreshold is the number of rejected calls that, once reached within
	// FlagWindow, gets a user account flagged.
	FlagThreshold int

	// FlagWindow is the length of the rolling window over which rejections are
	// summed against FlagThreshold.
	FlagWindow time.Duration
}
|
|
|
|
// DefaultConfig returns the agreed conservative defaults — 1000 rejected calls
|
|
// within a rolling 10 minutes (~1.7/s sustained, far above the client's
|
|
// capped-backoff retry noise yet a fraction of an abusive loop).
|
|
func DefaultConfig() Config {
|
|
return Config{FlagThreshold: 1000, FlagWindow: 10 * time.Minute}
|
|
}
|
|
|
|
// Validate reports whether the configuration values are acceptable.
|
|
func (c Config) Validate() error {
|
|
if c.FlagThreshold <= 0 {
|
|
return fmt.Errorf("ratewatch: flag threshold must be positive")
|
|
}
|
|
if c.FlagWindow <= 0 {
|
|
return fmt.Errorf("ratewatch: flag window must be positive")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Flagger stamps the account-level high-rate marker; account.Store satisfies it.
|
|
type Flagger interface {
|
|
FlagHighRate(ctx context.Context, id uuid.UUID, at time.Time) (bool, error)
|
|
}
|
|
|
|
// Entry is a single reported aggregate: how many calls one limiter key had
// rejected during one gateway report window. It mirrors the gateway's
// rejection summary on the wire.
type Entry struct {
	// Class is the limiter class the key belongs to (for example ClassUser).
	Class string
	// Key identifies the throttled subject within Class.
	Key string
	// Rejected is the rejection count for the report window; Ingest skips
	// entries whose count is not positive.
	Rejected int
}
|
|
|
|
// Episode summarises the recent throttling of one limiter key for the admin
// view.
type Episode struct {
	// Class and Key identify the throttled series.
	Class string
	Key   string
	// Rejected is the total rejection count over the points still retained.
	Rejected int
	// FirstSeen is the time of the oldest retained point of the series.
	FirstSeen time.Time
	// LastSeen is the time of the newest retained point of the series.
	LastSeen time.Time
}
|
|
|
|
// Watch accumulates reports and applies the auto-flag rule.
|
|
type Watch struct {
|
|
cfg Config
|
|
flagger Flagger
|
|
log *zap.Logger
|
|
now func() time.Time
|
|
|
|
mu sync.Mutex
|
|
series map[seriesKey]*series
|
|
}
|
|
|
|
// seriesKey is the map key of one retained series: a limiter class paired with
// a key inside that class.
type seriesKey struct {
	class string
	key   string
}
|
|
|
|
type series struct {
|
|
points []point // ascending by time
|
|
}
|
|
|
|
// point is one ingested report entry for a series.
type point struct {
	// at is the time the report was ingested.
	at time.Time
	// n is the rejection count carried by that report entry.
	n int
}
|
|
|
|
// New constructs a Watch over flagger with cfg. A nil logger is replaced by a
|
|
// no-op one; a nil flagger disables the auto-flag (the view still works).
|
|
func New(cfg Config, flagger Flagger, log *zap.Logger) *Watch {
|
|
if log == nil {
|
|
log = zap.NewNop()
|
|
}
|
|
return &Watch{
|
|
cfg: cfg,
|
|
flagger: flagger,
|
|
log: log,
|
|
now: time.Now,
|
|
series: make(map[seriesKey]*series),
|
|
}
|
|
}
|
|
|
|
// Ingest records one gateway report. Entries with an empty class or key or a
|
|
// non-positive count are skipped. When a user-class series crosses the flag
|
|
// threshold within the flag window, the account is flagged (the store keeps it
|
|
// set-once, so a sustained episode costs one no-op UPDATE per report).
|
|
func (w *Watch) Ingest(ctx context.Context, entries []Entry) {
|
|
if len(entries) == 0 {
|
|
return
|
|
}
|
|
now := w.now()
|
|
var flag []uuid.UUID
|
|
w.mu.Lock()
|
|
for _, e := range entries {
|
|
if e.Class == "" || e.Key == "" || e.Rejected <= 0 {
|
|
continue
|
|
}
|
|
k := seriesKey{class: e.Class, key: e.Key}
|
|
s := w.series[k]
|
|
if s == nil {
|
|
s = &series{}
|
|
w.series[k] = s
|
|
}
|
|
s.points = append(s.points, point{at: now, n: e.Rejected})
|
|
if e.Class == ClassUser && s.sumSince(now.Add(-w.cfg.FlagWindow)) >= w.cfg.FlagThreshold {
|
|
if id, err := uuid.Parse(e.Key); err == nil {
|
|
flag = append(flag, id)
|
|
}
|
|
}
|
|
}
|
|
w.pruneLocked(now)
|
|
w.mu.Unlock()
|
|
|
|
if w.flagger == nil {
|
|
return
|
|
}
|
|
for _, id := range flag {
|
|
set, err := w.flagger.FlagHighRate(ctx, id, now)
|
|
switch {
|
|
case err != nil:
|
|
w.log.Warn("high-rate flag failed", zap.String("account_id", id.String()), zap.Error(err))
|
|
case set:
|
|
w.log.Info("account flagged high-rate",
|
|
zap.String("account_id", id.String()),
|
|
zap.Int("threshold", w.cfg.FlagThreshold),
|
|
zap.Duration("window", w.cfg.FlagWindow))
|
|
}
|
|
}
|
|
}
|
|
|
|
// Config returns the active auto-flag tuning (the admin console captions it).
|
|
func (w *Watch) Config() Config { return w.cfg }
|
|
|
|
// Recent returns the retained throttle episodes, most recently throttled first.
|
|
func (w *Watch) Recent() []Episode {
|
|
w.mu.Lock()
|
|
defer w.mu.Unlock()
|
|
out := make([]Episode, 0, len(w.series))
|
|
for k, s := range w.series {
|
|
if len(s.points) == 0 {
|
|
continue
|
|
}
|
|
ep := Episode{
|
|
Class: k.class,
|
|
Key: k.key,
|
|
FirstSeen: s.points[0].at,
|
|
LastSeen: s.points[len(s.points)-1].at,
|
|
}
|
|
for _, p := range s.points {
|
|
ep.Rejected += p.n
|
|
}
|
|
out = append(out, ep)
|
|
}
|
|
sort.Slice(out, func(i, j int) bool { return out[i].LastSeen.After(out[j].LastSeen) })
|
|
return out
|
|
}
|
|
|
|
// sumSince totals the points at or after cutoff.
|
|
func (s *series) sumSince(cutoff time.Time) int {
|
|
sum := 0
|
|
for i := len(s.points) - 1; i >= 0; i-- {
|
|
if s.points[i].at.Before(cutoff) {
|
|
break
|
|
}
|
|
sum += s.points[i].n
|
|
}
|
|
return sum
|
|
}
|
|
|
|
// pruneLocked drops points past retention, empty series, and — past maxSeries —
|
|
// the least-recently-throttled series. The caller holds w.mu.
|
|
func (w *Watch) pruneLocked(now time.Time) {
|
|
cutoff := now.Add(-max(minRetention, w.cfg.FlagWindow))
|
|
for k, s := range w.series {
|
|
i := 0
|
|
for i < len(s.points) && s.points[i].at.Before(cutoff) {
|
|
i++
|
|
}
|
|
s.points = s.points[i:]
|
|
if len(s.points) == 0 {
|
|
delete(w.series, k)
|
|
}
|
|
}
|
|
for len(w.series) > maxSeries {
|
|
var oldest seriesKey
|
|
var oldestAt time.Time
|
|
first := true
|
|
for k, s := range w.series {
|
|
last := s.points[len(s.points)-1].at
|
|
if first || last.Before(oldestAt) {
|
|
oldest, oldestAt, first = k, last, false
|
|
}
|
|
}
|
|
delete(w.series, oldest)
|
|
}
|
|
}
|