Files
Ilia Denisov 8881214213 R6(a): de-stage code, docs, READMEs; split stage6_test
Mechanical, behaviour-preserving removal of Stage N / TODO-N / phase (RN)
references from comments, doc-comments, service READMEs, the current-state docs
(ARCHITECTURE, FUNCTIONAL+_ru, TESTING, UI_DESIGN), config-file comments, and the
.fbs/.proto schema comments. PLAN.md / PRERELEASE.md / CLAUDE.md keep the stage
history.

- Rename the only stage-named identifiers: registerStage8 -> registerSocialOps,
  registerStage11 -> registerLinkOps (gateway transcode).
- Split stage6_test.go: TestEmailLoginFlow -> email_test.go,
  TestGuestAutoMatchLeavesNoStats (+ provisionGuest) -> account_test.go.
- Regenerated proto bindings (push.pb.go, telegram_grpc.pb.go) from the de-staged
  .proto comments; FB Go/TS bindings unchanged (flatc strips schema comments).

go build/vet/gofmt clean across modules; integration typecheck and pnpm check green.
2026-06-10 16:56:03 +02:00

236 lines
6.5 KiB
Go

// Package ratewatch ingests the gateway's periodic rate-limiter rejection
// reports. It keeps an in-memory window of recent throttle episodes for
// the admin console's view and applies the conservative high-rate auto-flag:
// when one account's rejections within the rolling window cross the threshold,
// the account store stamps the soft, reversible flagged_high_rate_at marker
// (set-once; an operator clears it; never an automatic ban). Like the gateway's
// active_users gauge it is single-instance and resets on restart by design —
// the durable part is the account flag, not the episode window.
package ratewatch
import (
"context"
"fmt"
"sort"
"sync"
"time"
"github.com/google/uuid"
"go.uber.org/zap"
)
// ClassUser is the limiter class whose keys are account ids — the only class
// the auto-flag applies to (the others are keyed by client IP).
const ClassUser = "user"
const (
// maxSeries bounds the distinct (class, key) series kept for the console
// view, so a key-spraying client cannot grow the map: past the bound the
// least-recently-throttled series is evicted.
maxSeries = 200
// minRetention keeps an episode visible in the console for at least an hour
// after its last rejection (longer when the flag window is longer).
minRetention = time.Hour
)
// Config tunes the conservative high-rate auto-flag.
type Config struct {
// FlagThreshold is the rejected-call count within FlagWindow past which a
// user account is flagged.
FlagThreshold int
// FlagWindow is the rolling window the rejections accumulate over.
FlagWindow time.Duration
}
// DefaultConfig returns the agreed conservative defaults — 1000 rejected calls
// within a rolling 10 minutes (~1.7/s sustained, far above the client's
// capped-backoff retry noise yet a fraction of an abusive loop).
func DefaultConfig() Config {
return Config{FlagThreshold: 1000, FlagWindow: 10 * time.Minute}
}
// Validate reports whether the configuration values are acceptable.
func (c Config) Validate() error {
if c.FlagThreshold <= 0 {
return fmt.Errorf("ratewatch: flag threshold must be positive")
}
if c.FlagWindow <= 0 {
return fmt.Errorf("ratewatch: flag window must be positive")
}
return nil
}
// Flagger stamps the account-level high-rate marker; account.Store satisfies it.
type Flagger interface {
FlagHighRate(ctx context.Context, id uuid.UUID, at time.Time) (bool, error)
}
// Entry is one reported aggregate: the rejections of one limiter key within one
// gateway report window (the wire mirror of the gateway's rejection summary).
type Entry struct {
Class string
Key string
Rejected int
}
// Episode is one key's recent-throttle aggregate for the admin view.
type Episode struct {
Class string
Key string
Rejected int
FirstSeen time.Time
LastSeen time.Time
}
// Watch accumulates reports and applies the auto-flag rule.
type Watch struct {
cfg Config
flagger Flagger
log *zap.Logger
now func() time.Time
mu sync.Mutex
series map[seriesKey]*series
}
type seriesKey struct{ class, key string }
type series struct {
points []point // ascending by time
}
type point struct {
at time.Time
n int
}
// New constructs a Watch over flagger with cfg. A nil logger is replaced by a
// no-op one; a nil flagger disables the auto-flag (the view still works).
func New(cfg Config, flagger Flagger, log *zap.Logger) *Watch {
if log == nil {
log = zap.NewNop()
}
return &Watch{
cfg: cfg,
flagger: flagger,
log: log,
now: time.Now,
series: make(map[seriesKey]*series),
}
}
// Ingest records one gateway report. Entries with an empty class or key or a
// non-positive count are skipped. When a user-class series crosses the flag
// threshold within the flag window, the account is flagged (the store keeps it
// set-once, so a sustained episode costs one no-op UPDATE per report).
func (w *Watch) Ingest(ctx context.Context, entries []Entry) {
if len(entries) == 0 {
return
}
now := w.now()
var flag []uuid.UUID
w.mu.Lock()
for _, e := range entries {
if e.Class == "" || e.Key == "" || e.Rejected <= 0 {
continue
}
k := seriesKey{class: e.Class, key: e.Key}
s := w.series[k]
if s == nil {
s = &series{}
w.series[k] = s
}
s.points = append(s.points, point{at: now, n: e.Rejected})
if e.Class == ClassUser && s.sumSince(now.Add(-w.cfg.FlagWindow)) >= w.cfg.FlagThreshold {
if id, err := uuid.Parse(e.Key); err == nil {
flag = append(flag, id)
}
}
}
w.pruneLocked(now)
w.mu.Unlock()
if w.flagger == nil {
return
}
for _, id := range flag {
set, err := w.flagger.FlagHighRate(ctx, id, now)
switch {
case err != nil:
w.log.Warn("high-rate flag failed", zap.String("account_id", id.String()), zap.Error(err))
case set:
w.log.Info("account flagged high-rate",
zap.String("account_id", id.String()),
zap.Int("threshold", w.cfg.FlagThreshold),
zap.Duration("window", w.cfg.FlagWindow))
}
}
}
// Config returns the active auto-flag tuning (the admin console captions it).
func (w *Watch) Config() Config { return w.cfg }
// Recent returns the retained throttle episodes, most recently throttled first.
func (w *Watch) Recent() []Episode {
w.mu.Lock()
defer w.mu.Unlock()
out := make([]Episode, 0, len(w.series))
for k, s := range w.series {
if len(s.points) == 0 {
continue
}
ep := Episode{
Class: k.class,
Key: k.key,
FirstSeen: s.points[0].at,
LastSeen: s.points[len(s.points)-1].at,
}
for _, p := range s.points {
ep.Rejected += p.n
}
out = append(out, ep)
}
sort.Slice(out, func(i, j int) bool { return out[i].LastSeen.After(out[j].LastSeen) })
return out
}
// sumSince totals the points at or after cutoff.
func (s *series) sumSince(cutoff time.Time) int {
sum := 0
for i := len(s.points) - 1; i >= 0; i-- {
if s.points[i].at.Before(cutoff) {
break
}
sum += s.points[i].n
}
return sum
}
// pruneLocked drops points past retention, empty series, and — past maxSeries —
// the least-recently-throttled series. The caller holds w.mu.
func (w *Watch) pruneLocked(now time.Time) {
cutoff := now.Add(-max(minRetention, w.cfg.FlagWindow))
for k, s := range w.series {
i := 0
for i < len(s.points) && s.points[i].at.Before(cutoff) {
i++
}
s.points = s.points[i:]
if len(s.points) == 0 {
delete(w.series, k)
}
}
for len(w.series) > maxSeries {
var oldest seriesKey
var oldestAt time.Time
first := true
for k, s := range w.series {
last := s.points[len(s.points)-1].at
if first || last.Before(oldestAt) {
oldest, oldestAt, first = k, last, false
}
}
delete(w.series, oldest)
}
}