// Package ratewatch ingests the gateway's periodic rate-limiter rejection // reports. It keeps an in-memory window of recent throttle episodes for // the admin console's view and applies the conservative high-rate auto-flag: // when one account's rejections within the rolling window cross the threshold, // the account store stamps the soft, reversible flagged_high_rate_at marker // (set-once; an operator clears it; never an automatic ban). Like the gateway's // active_users gauge it is single-instance and resets on restart by design — // the durable part is the account flag, not the episode window. package ratewatch import ( "context" "fmt" "sort" "sync" "time" "github.com/google/uuid" "go.uber.org/zap" ) // ClassUser is the limiter class whose keys are account ids — the only class // the auto-flag applies to (the others are keyed by client IP). const ClassUser = "user" const ( // maxSeries bounds the distinct (class, key) series kept for the console // view, so a key-spraying client cannot grow the map: past the bound the // least-recently-throttled series is evicted. maxSeries = 200 // minRetention keeps an episode visible in the console for at least an hour // after its last rejection (longer when the flag window is longer). minRetention = time.Hour ) // Config tunes the conservative high-rate auto-flag. type Config struct { // FlagThreshold is the rejected-call count within FlagWindow past which a // user account is flagged. FlagThreshold int // FlagWindow is the rolling window the rejections accumulate over. FlagWindow time.Duration } // DefaultConfig returns the agreed conservative defaults — 1000 rejected calls // within a rolling 10 minutes (~1.7/s sustained, far above the client's // capped-backoff retry noise yet a fraction of an abusive loop). func DefaultConfig() Config { return Config{FlagThreshold: 1000, FlagWindow: 10 * time.Minute} } // Validate reports whether the configuration values are acceptable. 
func (c Config) Validate() error {
	// Checked in field order; the first violation is returned.
	if c.FlagThreshold <= 0 {
		return fmt.Errorf("ratewatch: flag threshold must be positive")
	}
	if c.FlagWindow <= 0 {
		return fmt.Errorf("ratewatch: flag window must be positive")
	}
	return nil
}

// Flagger stamps the account-level high-rate marker; account.Store satisfies it.
// Ingest treats the returned bool as "this call newly set the marker" (it logs
// only then) — NOTE(review): confirm against the store implementation.
type Flagger interface {
	FlagHighRate(ctx context.Context, id uuid.UUID, at time.Time) (bool, error)
}

// Entry is one reported aggregate: the rejections of one limiter key within one
// gateway report window (the wire mirror of the gateway's rejection summary).
type Entry struct {
	Class    string
	Key      string
	Rejected int
}

// Episode is one key's recent-throttle aggregate for the admin view: the sum
// of its retained rejections and the times of its first and last retained
// report.
type Episode struct {
	Class     string
	Key       string
	Rejected  int
	FirstSeen time.Time
	LastSeen  time.Time
}

// Watch accumulates reports and applies the auto-flag rule.
type Watch struct {
	cfg     Config
	flagger Flagger
	log     *zap.Logger
	now     func() time.Time // clock; a field so it can be swapped (presumably by tests)

	mu     sync.Mutex
	series map[seriesKey]*series // guarded by mu
}

// seriesKey identifies one limiter series.
type seriesKey struct{ class, key string }

// series is the retained history of one (class, key).
type series struct {
	points []point // ascending by time
}

// point is the rejection count one report carried for a series.
type point struct {
	at time.Time
	n  int
}

// New constructs a Watch over flagger with cfg. A nil logger is replaced by a
// no-op one; a nil flagger disables the auto-flag (the view still works).
//
// Non-positive config fields are replaced by the DefaultConfig values (with a
// warning) instead of being used as-is: a threshold <= 0 would flag every
// throttled account on its first rejection, and a window <= 0 would either
// count only the current report or never flag at all.
func New(cfg Config, flagger Flagger, log *zap.Logger) *Watch {
	if log == nil {
		log = zap.NewNop()
	}
	if err := cfg.Validate(); err != nil {
		def := DefaultConfig()
		if cfg.FlagThreshold <= 0 {
			cfg.FlagThreshold = def.FlagThreshold
		}
		if cfg.FlagWindow <= 0 {
			cfg.FlagWindow = def.FlagWindow
		}
		log.Warn("ratewatch: invalid config, substituting defaults",
			zap.Error(err),
			zap.Int("threshold", cfg.FlagThreshold),
			zap.Duration("window", cfg.FlagWindow))
	}
	return &Watch{
		cfg:     cfg,
		flagger: flagger,
		log:     log,
		now:     time.Now,
		series:  make(map[seriesKey]*series),
	}
}

// Ingest records one gateway report. Entries with an empty class or key or a
// non-positive count are skipped. When a user-class series reaches the flag
// threshold within the flag window, the account is flagged (the store keeps
// the marker set-once, so a sustained episode costs one no-op UPDATE per
// report). User keys that do not parse as UUIDs are never flagged, and the
// flagger is called only after the lock is released.
func (w *Watch) Ingest(ctx context.Context, entries []Entry) {
	if len(entries) == 0 {
		return
	}
	// One timestamp per report: every point appended below shares it.
	now := w.now()

	var (
		flag []uuid.UUID
		// seen dedupes flag candidates so a report repeating a user key does
		// not call FlagHighRate twice for the same account (nil until needed;
		// reads from a nil map are safe).
		seen map[uuid.UUID]struct{}
	)

	w.mu.Lock()
	for _, e := range entries {
		if e.Class == "" || e.Key == "" || e.Rejected <= 0 {
			continue
		}
		k := seriesKey{class: e.Class, key: e.Key}
		s := w.series[k]
		if s == nil {
			s = &series{}
			w.series[k] = s
		}
		s.points = append(s.points, point{at: now, n: e.Rejected})

		if e.Class != ClassUser || s.sumSince(now.Add(-w.cfg.FlagWindow)) < w.cfg.FlagThreshold {
			continue
		}
		id, err := uuid.Parse(e.Key)
		if err != nil {
			continue // not an account id; nothing to flag
		}
		if _, dup := seen[id]; dup {
			continue
		}
		if seen == nil {
			seen = make(map[uuid.UUID]struct{})
		}
		seen[id] = struct{}{}
		flag = append(flag, id)
	}
	w.pruneLocked(now)
	w.mu.Unlock()

	if w.flagger == nil {
		return
	}
	for i, id := range flag {
		// Stop once the caller has given up rather than logging one failure
		// per remaining id; a still-hot series is re-evaluated next report.
		if err := ctx.Err(); err != nil {
			w.log.Warn("high-rate flagging abandoned",
				zap.Int("pending", len(flag)-i), zap.Error(err))
			return
		}
		set, err := w.flagger.FlagHighRate(ctx, id, now)
		switch {
		case err != nil:
			w.log.Warn("high-rate flag failed",
				zap.String("account_id", id.String()), zap.Error(err))
		case set:
			w.log.Info("account flagged high-rate",
				zap.String("account_id", id.String()),
				zap.Int("threshold", w.cfg.FlagThreshold),
				zap.Duration("window", w.cfg.FlagWindow))
		}
	}
}

// Config returns the active auto-flag tuning (the admin console captions it).
func (w *Watch) Config() Config { return w.cfg }

// Recent returns the retained throttle episodes, most recently throttled first.
// Episodes with the same LastSeen (every series touched by one report shares
// its timestamp) are ordered by class, then key, so the listing is stable
// across calls.
func (w *Watch) Recent() []Episode {
	w.mu.Lock()
	defer w.mu.Unlock()

	out := make([]Episode, 0, len(w.series))
	for k, s := range w.series {
		if len(s.points) == 0 {
			continue
		}
		ep := Episode{
			Class:     k.class,
			Key:       k.key,
			FirstSeen: s.points[0].at,
			LastSeen:  s.points[len(s.points)-1].at,
		}
		for _, p := range s.points {
			ep.Rejected += p.n
		}
		out = append(out, ep)
	}
	sort.Slice(out, func(i, j int) bool {
		a, b := &out[i], &out[j]
		if !a.LastSeen.Equal(b.LastSeen) {
			return a.LastSeen.After(b.LastSeen)
		}
		if a.Class != b.Class {
			return a.Class < b.Class
		}
		return a.Key < b.Key
	})
	return out
}

// sumSince totals the rejections of the points at or after cutoff. It relies on
// the points being ascending by time and stops at the first older point.
func (s *series) sumSince(cutoff time.Time) int { sum := 0 for i := len(s.points) - 1; i >= 0; i-- { if s.points[i].at.Before(cutoff) { break } sum += s.points[i].n } return sum } // pruneLocked drops points past retention, empty series, and — past maxSeries — // the least-recently-throttled series. The caller holds w.mu. func (w *Watch) pruneLocked(now time.Time) { cutoff := now.Add(-max(minRetention, w.cfg.FlagWindow)) for k, s := range w.series { i := 0 for i < len(s.points) && s.points[i].at.Before(cutoff) { i++ } s.points = s.points[i:] if len(s.points) == 0 { delete(w.series, k) } } for len(w.series) > maxSeries { var oldest seriesKey var oldestAt time.Time first := true for k, s := range w.series { last := s.points[len(s.points)-1].at if first || last.Before(oldestAt) { oldest, oldestAt, first = k, last, false } } delete(w.series, oldest) } }