Stage 12: observability & performance (OTel/OTLP, domain metrics, guest GC)
- pkg/telemetry: shared OTel provider bootstrap (none/stdout/otlp + W3C
propagators + Go runtime metrics); backend/internal/telemetry becomes a thin
facade keeping its gin middleware.
- Telemetry parity: gateway and the Telegram connector gain telemetry runtimes
and config (GATEWAY_/TELEGRAM_ SERVICE_NAME + OTEL_*); otelgrpc instruments the
backend push server, the gateway's backend+connector clients and the connector
server. Default exporter stays none (collector/dashboards are Stage 14).
- Operational metrics (variant attribute on game-scoped ones): game_replay_duration,
game_move_validate_duration, games_started_total, games_abandoned_total,
game_cache_active, chat_messages_total{kind}, gateway edge_request_duration.
Wired via the SetMetrics setter pattern (default no-op meter).
- TODO-3: account.GuestReaper deletes guests with no game seat past
BACKEND_GUEST_RETENTION (default 30d, swept every BACKEND_GUEST_REAP_INTERVAL).
- Tests: pkg/telemetry exporter selection; game/social/edge metric recording via
a manual reader; config (otlp accepted, guest knobs); inttest guest reaper.
- Docs: PLAN.md re-scopes Stage 12 and adds Stage 13 (alphabet-on-wire) + Stage 14
(CI/deploy) with the agreed dictionary-versioning resolution; ARCHITECTURE 11/13,
TESTING, the three READMEs and FUNCTIONAL(+ru) updated.
This commit is contained in:
@@ -14,6 +14,7 @@ import (
|
||||
"time"
|
||||
|
||||
"connectrpc.com/connect"
|
||||
"go.opentelemetry.io/otel/metric"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/net/http2"
|
||||
"golang.org/x/net/http2/h2c"
|
||||
@@ -39,6 +40,7 @@ type Server struct {
|
||||
heartbeat time.Duration
|
||||
log *zap.Logger
|
||||
adminProxy http.Handler
|
||||
metrics *serverMetrics
|
||||
|
||||
publicPolicy ratelimit.Policy
|
||||
userPolicy ratelimit.Policy
|
||||
@@ -55,6 +57,7 @@ type Deps struct {
|
||||
Heartbeat time.Duration
|
||||
Logger *zap.Logger
|
||||
AdminProxy http.Handler
|
||||
Meter metric.Meter
|
||||
}
|
||||
|
||||
// NewServer constructs the edge service.
|
||||
@@ -71,6 +74,7 @@ func NewServer(d Deps) *Server {
|
||||
heartbeat: d.Heartbeat,
|
||||
log: log,
|
||||
adminProxy: d.AdminProxy,
|
||||
metrics: newServerMetrics(d.Meter),
|
||||
publicPolicy: ratelimit.PerMinute(d.RateLimit.PublicPerMinute, d.RateLimit.PublicBurst),
|
||||
userPolicy: ratelimit.PerMinute(d.RateLimit.UserPerMinute, d.RateLimit.UserBurst),
|
||||
emailPolicy: ratelimit.Per(d.RateLimit.EmailPer10Min, 10*time.Minute, d.RateLimit.EmailBurst),
|
||||
@@ -95,9 +99,14 @@ func (s *Server) HTTPHandler() http.Handler {
|
||||
// (result_code != "ok", HTTP 200); only edge failures (rate limit, missing
|
||||
// session, unknown type, internal) become Connect errors.
|
||||
func (s *Server) Execute(ctx context.Context, req *connect.Request[edgev1.ExecuteRequest]) (*connect.Response[edgev1.ExecuteResponse], error) {
|
||||
start := time.Now()
|
||||
msgType := req.Msg.GetMessageType()
|
||||
result := "internal"
|
||||
defer func() { s.metrics.recordEdge(ctx, msgType, result, start) }()
|
||||
|
||||
op, ok := s.registry.Lookup(msgType)
|
||||
if !ok {
|
||||
result = "unknown_type"
|
||||
return nil, connect.NewError(connect.CodeNotFound, errUnknownMessageType(msgType))
|
||||
}
|
||||
clientIP := peerIP(req.Peer().Addr, req.Header())
|
||||
@@ -106,17 +115,21 @@ func (s *Server) Execute(ctx context.Context, req *connect.Request[edgev1.Execut
|
||||
if op.Auth {
|
||||
uid, err := s.resolve(ctx, req.Header())
|
||||
if err != nil {
|
||||
result = "unauthenticated"
|
||||
return nil, err
|
||||
}
|
||||
if !s.limiter.Allow("user:"+uid, s.userPolicy) {
|
||||
result = "rate_limited"
|
||||
return nil, connect.NewError(connect.CodeResourceExhausted, errRateLimited)
|
||||
}
|
||||
tr.UserID = uid
|
||||
} else {
|
||||
if !s.limiter.Allow("ip:"+clientIP, s.publicPolicy) {
|
||||
result = "rate_limited"
|
||||
return nil, connect.NewError(connect.CodeResourceExhausted, errRateLimited)
|
||||
}
|
||||
if op.Email && !s.limiter.Allow("email:"+clientIP, s.emailPolicy) {
|
||||
result = "rate_limited"
|
||||
return nil, connect.NewError(connect.CodeResourceExhausted, errRateLimited)
|
||||
}
|
||||
}
|
||||
@@ -124,6 +137,7 @@ func (s *Server) Execute(ctx context.Context, req *connect.Request[edgev1.Execut
|
||||
payload, err := op.Handler(ctx, tr)
|
||||
if err != nil {
|
||||
if code, domain := transcode.DomainCode(err); domain {
|
||||
result = "domain"
|
||||
return connect.NewResponse(&edgev1.ExecuteResponse{
|
||||
RequestId: req.Msg.GetRequestId(),
|
||||
ResultCode: code,
|
||||
@@ -132,6 +146,7 @@ func (s *Server) Execute(ctx context.Context, req *connect.Request[edgev1.Execut
|
||||
s.log.Error("execute failed", zap.String("message_type", msgType), zap.Error(err))
|
||||
return nil, connect.NewError(connect.CodeInternal, errInternal)
|
||||
}
|
||||
result = "ok"
|
||||
return connect.NewResponse(&edgev1.ExecuteResponse{
|
||||
RequestId: req.Msg.GetRequestId(),
|
||||
ResultCode: "ok",
|
||||
|
||||
Reference in New Issue
Block a user