Files
scrabble-game/gateway/internal/connectsrv/server.go
T
Ilia Denisov 408da3f201
Tests · Go / test (push) Successful in 8s
Tests · Integration / integration (push) Successful in 11s
Tests · Go / test (pull_request) Successful in 6s
Tests · Integration / integration (pull_request) Successful in 10s
Stage 6: gateway edge (Connect/FlatBuffers over h2c, platform/email/guest auth, sessions, rate-limit, admin passthrough, live push bridge)
New public ingress and the first network edge. Framework + a vertical slice of
operations end-to-end; remaining ops reuse the same transcode pattern in Stage 7.

Contracts (new module scrabble/pkg):
- push.proto (backend->gateway gRPC server-stream) + scrabble.fbs (FlatBuffers
  edge payloads), committed generated Go; buf/flatc Makefiles (dev-time codegen).

Backend:
- REST handlers on the /api/v1 groups: internal session endpoints
  (telegram/guest/email login -> mint, resolve, revoke) and the user slice
  (profile, submit_play, state, lobby enqueue/poll, chat).
- internal/notify in-process Publisher hub + internal/pushgrpc gRPC server
  (BACKEND_GRPC_ADDR) streaming your_turn/opponent_moved/chat/nudge/match_found;
  emission in game.commit, social, matchmaker.
- migration 00005 accounts.is_guest; guests are durable rows excluded from stats;
  ProvisionGuest; email-as-login (RequestLoginCode/LoginWithCode).

Gateway (new module scrabble/gateway):
- Connect Gateway service over h2c (Execute + Subscribe), FlatBuffers<->JSON
  transcode registry, Telegram initData HMAC validator (seam), session cache,
  token-bucket rate limiter (3 classes), push fan-out hub, backend REST + push
  gRPC client, admin Basic-Auth reverse proxy.

go.work: use ./pkg, ./gateway + replace scrabble/pkg. CI: gateway/**, pkg/**
path filters; unit build/vet/test span all three modules. Docs (PLAN,
ARCHITECTURE, FUNCTIONAL+ru, TESTING, READMEs) updated; gateway/pkg unit tests +
guest/email-login integration tests.
2026-06-02 22:38:24 +02:00

210 lines
6.3 KiB
Go

// Package connectsrv implements the public Connect edge service over h2c. Execute
// rate-limits, authenticates (resolving the Authorization bearer token to a user
// id for non-auth operations), and dispatches to the transcode registry; the
// domain outcome is carried back in the ExecuteResponse result_code. Subscribe
// bridges the gateway push hub to a client server-stream with a keep-alive
// heartbeat.
package connectsrv
import (
"context"
"net"
"net/http"
"strings"
"time"
"connectrpc.com/connect"
"go.uber.org/zap"
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"
"scrabble/gateway/internal/config"
"scrabble/gateway/internal/push"
"scrabble/gateway/internal/ratelimit"
"scrabble/gateway/internal/session"
"scrabble/gateway/internal/transcode"
edgev1 "scrabble/gateway/proto/edge/v1"
"scrabble/gateway/proto/edge/v1/edgev1connect"
)
// heartbeatKind is the Kind value of the keep-alive events that Subscribe sends
// on every heartbeat tick. Those events carry no payload and no event id.
const heartbeatKind = "heartbeat"
// Server is the edgev1connect.GatewayHandler implementation behind the public
// edge. Build it with NewServer; every field is fixed at construction time.
type Server struct {
	// registry maps a request's message type to its operation (auth/email
	// flags plus handler).
	registry *transcode.Registry
	// sessions resolves a bearer token to an account id.
	sessions *session.Cache
	// limiter enforces the policies below against "user:", "ip:" and
	// "email:" prefixed keys.
	limiter *ratelimit.Limiter
	// hub is the source of per-user live events consumed by Subscribe.
	hub *push.Hub
	// heartbeat is the interval between keep-alive events on a Subscribe
	// stream.
	heartbeat time.Duration
	// log is never nil after NewServer (a no-op logger is the fallback).
	log *zap.Logger

	// publicPolicy throttles unauthenticated operations per client IP,
	// userPolicy throttles authenticated calls per user id, and emailPolicy
	// additionally throttles email operations per client IP.
	publicPolicy, userPolicy, emailPolicy ratelimit.Policy
}
// Deps carries the Server's dependencies. NewServer copies each field into the
// Server it returns.
type Deps struct {
// Registry is consulted by Execute to look up the operation for a message type.
Registry *transcode.Registry
// Sessions resolves the Authorization bearer token to a user id.
Sessions *session.Cache
// Limiter performs the rate-limit checks for Execute and Subscribe.
Limiter *ratelimit.Limiter
// Hub provides the per-user event subscriptions streamed by Subscribe.
Hub *push.Hub
// RateLimit supplies the rates and bursts from which the public, user and
// email policies are derived.
RateLimit config.RateLimitConfig
// Heartbeat is the keep-alive interval used on Subscribe streams.
Heartbeat time.Duration
// Logger may be nil, in which case a no-op logger is used.
Logger *zap.Logger
}
// NewServer builds the edge service from d. A nil d.Logger is replaced with a
// no-op logger, and the public, user and email rate-limit policies are derived
// once from d.RateLimit so request handling never recomputes them.
func NewServer(d Deps) *Server {
	srv := &Server{
		registry:  d.Registry,
		sessions:  d.Sessions,
		limiter:   d.Limiter,
		hub:       d.Hub,
		heartbeat: d.Heartbeat,
		log:       d.Logger,
	}
	if srv.log == nil {
		srv.log = zap.NewNop()
	}

	rl := d.RateLimit
	srv.publicPolicy = ratelimit.PerMinute(rl.PublicPerMinute, rl.PublicBurst)
	srv.userPolicy = ratelimit.PerMinute(rl.UserPerMinute, rl.UserBurst)
	// The email budget is expressed per ten-minute window rather than per minute.
	srv.emailPolicy = ratelimit.Per(rl.EmailPer10Min, 10*time.Minute, rl.EmailBurst)
	return srv
}
// HTTPHandler mounts the generated Connect handler for this Server on a fresh
// mux and wraps the mux in h2c, so the result can be served directly over
// cleartext HTTP/2.
func (s *Server) HTTPHandler() http.Handler {
	route, handler := edgev1connect.NewGatewayHandler(s)

	mux := http.NewServeMux()
	mux.Handle(route, handler)

	return h2c.NewHandler(mux, new(http2.Server))
}
// Execute runs one unary operation.
//
// The message type selects an operation from the registry. Authenticated
// operations resolve the bearer token and are throttled per user; all other
// operations are throttled per client IP, with an extra per-IP email budget for
// email operations.
//
// Domain failures travel back inside the envelope (result_code != "ok") with a
// nil error. Only edge failures become Connect errors: unknown message type
// (NotFound), missing or invalid session (Unauthenticated), rate limiting
// (ResourceExhausted) and anything unexpected from the handler (Internal).
func (s *Server) Execute(ctx context.Context, req *connect.Request[edgev1.ExecuteRequest]) (*connect.Response[edgev1.ExecuteResponse], error) {
	msgType := req.Msg.GetMessageType()
	op, found := s.registry.Lookup(msgType)
	if !found {
		return nil, connect.NewError(connect.CodeNotFound, errUnknownMessageType(msgType))
	}

	ip := peerIP(req.Peer().Addr, req.Header())
	in := transcode.Request{Payload: req.Msg.GetPayload(), ClientIP: ip}

	switch {
	case op.Auth:
		// The caller must be identified before the per-user bucket can be charged.
		uid, err := s.resolve(ctx, req.Header())
		if err != nil {
			return nil, err
		}
		if !s.limiter.Allow("user:"+uid, s.userPolicy) {
			return nil, connect.NewError(connect.CodeResourceExhausted, errRateLimited)
		}
		in.UserID = uid
	case !s.limiter.Allow("ip:"+ip, s.publicPolicy):
		return nil, connect.NewError(connect.CodeResourceExhausted, errRateLimited)
	case op.Email && !s.limiter.Allow("email:"+ip, s.emailPolicy):
		// The email bucket is only charged once the public bucket has admitted the call.
		return nil, connect.NewError(connect.CodeResourceExhausted, errRateLimited)
	}

	out, err := op.Handler(ctx, in)
	if err == nil {
		return connect.NewResponse(&edgev1.ExecuteResponse{
			RequestId:  req.Msg.GetRequestId(),
			ResultCode: "ok",
			Payload:    out,
		}), nil
	}

	code, isDomain := transcode.DomainCode(err)
	if !isDomain {
		// The cause is logged here and hidden from the client behind a generic error.
		s.log.Error("execute failed", zap.String("message_type", msgType), zap.Error(err))
		return nil, connect.NewError(connect.CodeInternal, errInternal)
	}
	return connect.NewResponse(&edgev1.ExecuteResponse{
		RequestId:  req.Msg.GetRequestId(),
		ResultCode: code,
	}), nil
}
// Subscribe streams the authenticated user's live events until the client
// disconnects (ctx is done) or the hub closes the subscription channel; both
// end the stream with a nil error.
//
// The bearer token is resolved first (Unauthenticated on failure) and the call
// is charged against the per-user rate-limit bucket (ResourceExhausted when
// exhausted). While the stream is open, a heartbeatKind event is sent every
// heartbeat interval. A non-positive interval disables the heartbeat instead of
// panicking. Any stream.Send error terminates the stream and is returned.
func (s *Server) Subscribe(ctx context.Context, req *connect.Request[edgev1.SubscribeRequest], stream *connect.ServerStream[edgev1.Event]) error {
	uid, err := s.resolve(ctx, req.Header())
	if err != nil {
		return err
	}
	if !s.limiter.Allow("user:"+uid, s.userPolicy) {
		return connect.NewError(connect.CodeResourceExhausted, errRateLimited)
	}

	events, cancel := s.hub.Subscribe(uid)
	defer cancel()

	// time.NewTicker panics on a non-positive duration, which is what a
	// zero-valued Deps.Heartbeat would give. A nil channel never becomes ready
	// in a select, so leaving beat nil simply switches the heartbeat off.
	var beat <-chan time.Time
	if s.heartbeat > 0 {
		ticker := time.NewTicker(s.heartbeat)
		defer ticker.Stop()
		beat = ticker.C
	}

	for {
		select {
		case <-ctx.Done():
			return nil
		case <-beat:
			if err := stream.Send(&edgev1.Event{Kind: heartbeatKind}); err != nil {
				return err
			}
		case e, ok := <-events:
			if !ok {
				// The hub closed the subscription; end the stream cleanly.
				return nil
			}
			if err := stream.Send(&edgev1.Event{Kind: e.Kind, Payload: e.Payload, EventId: e.EventID}); err != nil {
				return err
			}
		}
	}
}
// resolve turns the request's Authorization header into an account id. It
// returns a Connect Unauthenticated error when no token is present or when the
// session cache cannot resolve the token; the underlying lookup error is not
// exposed to the caller.
func (s *Server) resolve(ctx context.Context, h http.Header) (string, error) {
	tok := bearerToken(h.Get("Authorization"))
	if len(tok) == 0 {
		return "", connect.NewError(connect.CodeUnauthenticated, errMissingToken)
	}
	if uid, err := s.sessions.Resolve(ctx, tok); err == nil {
		return uid, nil
	}
	return "", connect.NewError(connect.CodeUnauthenticated, errInvalidSession)
}
// bearerToken extracts the token from an "Authorization: Bearer <token>" header
// value. The scheme is matched case-insensitively ("bearer", "BEARER", ...), as
// HTTP authentication schemes are case-insensitive. A value without the Bearer
// scheme is returned as-is (trimmed), tolerating a bare token for convenience.
// An empty or whitespace-only header yields "".
func bearerToken(header string) string {
	header = strings.TrimSpace(header)
	if header == "" {
		return ""
	}
	// Split at the first space only, so the scheme is compared on its own and
	// the remainder is taken as the token.
	if scheme, rest, ok := strings.Cut(header, " "); ok && strings.EqualFold(scheme, "Bearer") {
		return strings.TrimSpace(rest)
	}
	return header
}
// peerIP returns the client address used for rate-limit keys and passed to
// handlers. It prefers the first (left-most) X-Forwarded-For hop and falls back
// to the host part of the connection peer address, or to peerAddr unchanged
// when it is not in host:port form.
//
// A blank first hop (for example " , 10.0.0.2" or a whitespace-only header) is
// ignored, so such requests are keyed by their real peer instead of all
// sharing an empty key.
//
// NOTE(review): the header value is taken on trust. Unless a trusted proxy in
// front of the gateway overwrites X-Forwarded-For, clients can choose their own
// rate-limit key — confirm the deployment guarantees this.
func peerIP(peerAddr string, h http.Header) string {
	if xff := h.Get("X-Forwarded-For"); xff != "" {
		first, _, _ := strings.Cut(xff, ",")
		if hop := strings.TrimSpace(first); hop != "" {
			return hop
		}
	}
	if host, _, err := net.SplitHostPort(peerAddr); err == nil {
		return host
	}
	return peerAddr
}