Stage 6: gateway edge (Connect/FlatBuffers over h2c, platform/email/guest auth, sessions, rate-limit, admin passthrough, live push bridge)
New public ingress and the first network edge. Framework + a vertical slice of operations end-to-end; remaining ops reuse the same transcode pattern in Stage 7. Contracts (new module scrabble/pkg): - push.proto (backend->gateway gRPC server-stream) + scrabble.fbs (FlatBuffers edge payloads), committed generated Go; buf/flatc Makefiles (dev-time codegen). Backend: - REST handlers on the /api/v1 groups: internal session endpoints (telegram/guest/email login -> mint, resolve, revoke) and the user slice (profile, submit_play, state, lobby enqueue/poll, chat). - internal/notify in-process Publisher hub + internal/pushgrpc gRPC server (BACKEND_GRPC_ADDR) streaming your_turn/opponent_moved/chat/nudge/match_found; emission in game.commit, social, matchmaker. - migration 00005 accounts.is_guest; guests are durable rows excluded from stats; ProvisionGuest; email-as-login (RequestLoginCode/LoginWithCode). Gateway (new module scrabble/gateway): - Connect Gateway service over h2c (Execute + Subscribe), FlatBuffers<->JSON transcode registry, Telegram initData HMAC validator (seam), session cache, token-bucket rate limiter (3 classes), push fan-out hub, backend REST + push gRPC client, admin Basic-Auth reverse proxy. go.work: use ./pkg, ./gateway + replace scrabble/pkg. CI: gateway/**, pkg/** path filters; unit build/vet/test span all three modules. Docs (PLAN, ARCHITECTURE, FUNCTIONAL+ru, TESTING, READMEs) updated; gateway/pkg unit tests + guest/email-login integration tests.
This commit is contained in:
@@ -0,0 +1,20 @@
|
||||
package connectsrv
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
// Edge-level error values wrapped in Connect status codes. Domain outcomes are
|
||||
// not here — they ride back in the ExecuteResponse result_code.
|
||||
var (
|
||||
errRateLimited = errors.New("rate limit exceeded")
|
||||
errInternal = errors.New("internal error")
|
||||
errMissingToken = errors.New("missing session token")
|
||||
errInvalidSession = errors.New("invalid or expired session")
|
||||
)
|
||||
|
||||
// errUnknownMessageType reports an unregistered message type.
|
||||
func errUnknownMessageType(msgType string) error {
|
||||
return fmt.Errorf("unknown message type %q", msgType)
|
||||
}
|
||||
@@ -0,0 +1,209 @@
|
||||
// Package connectsrv implements the public Connect edge service over h2c. Execute
|
||||
// rate-limits, authenticates (resolving the Authorization bearer token to a user
|
||||
// id for non-auth operations), and dispatches to the transcode registry; the
|
||||
// domain outcome is carried back in the ExecuteResponse result_code. Subscribe
|
||||
// bridges the gateway push hub to a client server-stream with a keep-alive
|
||||
// heartbeat.
|
||||
package connectsrv
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"connectrpc.com/connect"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/net/http2"
|
||||
"golang.org/x/net/http2/h2c"
|
||||
|
||||
"scrabble/gateway/internal/config"
|
||||
"scrabble/gateway/internal/push"
|
||||
"scrabble/gateway/internal/ratelimit"
|
||||
"scrabble/gateway/internal/session"
|
||||
"scrabble/gateway/internal/transcode"
|
||||
edgev1 "scrabble/gateway/proto/edge/v1"
|
||||
"scrabble/gateway/proto/edge/v1/edgev1connect"
|
||||
)
|
||||
|
||||
// heartbeatKind is the live-stream keep-alive event kind.
|
||||
const heartbeatKind = "heartbeat"
|
||||
|
||||
// Server implements edgev1connect.GatewayHandler.
|
||||
type Server struct {
|
||||
registry *transcode.Registry
|
||||
sessions *session.Cache
|
||||
limiter *ratelimit.Limiter
|
||||
hub *push.Hub
|
||||
heartbeat time.Duration
|
||||
log *zap.Logger
|
||||
|
||||
publicPolicy ratelimit.Policy
|
||||
userPolicy ratelimit.Policy
|
||||
emailPolicy ratelimit.Policy
|
||||
}
|
||||
|
||||
// Deps carries the Server's dependencies.
|
||||
type Deps struct {
|
||||
Registry *transcode.Registry
|
||||
Sessions *session.Cache
|
||||
Limiter *ratelimit.Limiter
|
||||
Hub *push.Hub
|
||||
RateLimit config.RateLimitConfig
|
||||
Heartbeat time.Duration
|
||||
Logger *zap.Logger
|
||||
}
|
||||
|
||||
// NewServer constructs the edge service.
|
||||
func NewServer(d Deps) *Server {
|
||||
log := d.Logger
|
||||
if log == nil {
|
||||
log = zap.NewNop()
|
||||
}
|
||||
return &Server{
|
||||
registry: d.Registry,
|
||||
sessions: d.Sessions,
|
||||
limiter: d.Limiter,
|
||||
hub: d.Hub,
|
||||
heartbeat: d.Heartbeat,
|
||||
log: log,
|
||||
publicPolicy: ratelimit.PerMinute(d.RateLimit.PublicPerMinute, d.RateLimit.PublicBurst),
|
||||
userPolicy: ratelimit.PerMinute(d.RateLimit.UserPerMinute, d.RateLimit.UserBurst),
|
||||
emailPolicy: ratelimit.Per(d.RateLimit.EmailPer10Min, 10*time.Minute, d.RateLimit.EmailBurst),
|
||||
}
|
||||
}
|
||||
|
||||
// HTTPHandler returns the h2c-wrapped Connect handler ready to serve.
|
||||
func (s *Server) HTTPHandler() http.Handler {
|
||||
mux := http.NewServeMux()
|
||||
path, h := edgev1connect.NewGatewayHandler(s)
|
||||
mux.Handle(path, h)
|
||||
return h2c.NewHandler(mux, &http2.Server{})
|
||||
}
|
||||
|
||||
// Execute runs one unary operation. Domain failures are returned in the envelope
|
||||
// (result_code != "ok", HTTP 200); only edge failures (rate limit, missing
|
||||
// session, unknown type, internal) become Connect errors.
|
||||
func (s *Server) Execute(ctx context.Context, req *connect.Request[edgev1.ExecuteRequest]) (*connect.Response[edgev1.ExecuteResponse], error) {
|
||||
msgType := req.Msg.GetMessageType()
|
||||
op, ok := s.registry.Lookup(msgType)
|
||||
if !ok {
|
||||
return nil, connect.NewError(connect.CodeNotFound, errUnknownMessageType(msgType))
|
||||
}
|
||||
clientIP := peerIP(req.Peer().Addr, req.Header())
|
||||
|
||||
tr := transcode.Request{Payload: req.Msg.GetPayload(), ClientIP: clientIP}
|
||||
if op.Auth {
|
||||
uid, err := s.resolve(ctx, req.Header())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !s.limiter.Allow("user:"+uid, s.userPolicy) {
|
||||
return nil, connect.NewError(connect.CodeResourceExhausted, errRateLimited)
|
||||
}
|
||||
tr.UserID = uid
|
||||
} else {
|
||||
if !s.limiter.Allow("ip:"+clientIP, s.publicPolicy) {
|
||||
return nil, connect.NewError(connect.CodeResourceExhausted, errRateLimited)
|
||||
}
|
||||
if op.Email && !s.limiter.Allow("email:"+clientIP, s.emailPolicy) {
|
||||
return nil, connect.NewError(connect.CodeResourceExhausted, errRateLimited)
|
||||
}
|
||||
}
|
||||
|
||||
payload, err := op.Handler(ctx, tr)
|
||||
if err != nil {
|
||||
if code, domain := transcode.DomainCode(err); domain {
|
||||
return connect.NewResponse(&edgev1.ExecuteResponse{
|
||||
RequestId: req.Msg.GetRequestId(),
|
||||
ResultCode: code,
|
||||
}), nil
|
||||
}
|
||||
s.log.Error("execute failed", zap.String("message_type", msgType), zap.Error(err))
|
||||
return nil, connect.NewError(connect.CodeInternal, errInternal)
|
||||
}
|
||||
return connect.NewResponse(&edgev1.ExecuteResponse{
|
||||
RequestId: req.Msg.GetRequestId(),
|
||||
ResultCode: "ok",
|
||||
Payload: payload,
|
||||
}), nil
|
||||
}
|
||||
|
||||
// Subscribe streams the authenticated user's live events with a keep-alive
|
||||
// heartbeat until the client disconnects.
|
||||
func (s *Server) Subscribe(ctx context.Context, req *connect.Request[edgev1.SubscribeRequest], stream *connect.ServerStream[edgev1.Event]) error {
|
||||
uid, err := s.resolve(ctx, req.Header())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !s.limiter.Allow("user:"+uid, s.userPolicy) {
|
||||
return connect.NewError(connect.CodeResourceExhausted, errRateLimited)
|
||||
}
|
||||
|
||||
events, cancel := s.hub.Subscribe(uid)
|
||||
defer cancel()
|
||||
|
||||
ticker := time.NewTicker(s.heartbeat)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
case <-ticker.C:
|
||||
if err := stream.Send(&edgev1.Event{Kind: heartbeatKind}); err != nil {
|
||||
return err
|
||||
}
|
||||
case e, ok := <-events:
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
if err := stream.Send(&edgev1.Event{Kind: e.Kind, Payload: e.Payload, EventId: e.EventID}); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// resolve extracts and resolves the Authorization bearer token to an account id,
|
||||
// returning a Connect Unauthenticated error when it is missing or unknown.
|
||||
func (s *Server) resolve(ctx context.Context, h http.Header) (string, error) {
|
||||
token := bearerToken(h.Get("Authorization"))
|
||||
if token == "" {
|
||||
return "", connect.NewError(connect.CodeUnauthenticated, errMissingToken)
|
||||
}
|
||||
uid, err := s.sessions.Resolve(ctx, token)
|
||||
if err != nil {
|
||||
return "", connect.NewError(connect.CodeUnauthenticated, errInvalidSession)
|
||||
}
|
||||
return uid, nil
|
||||
}
|
||||
|
||||
// bearerToken extracts the token from an "Authorization: Bearer <token>" header,
|
||||
// tolerating a bare token for convenience.
|
||||
func bearerToken(header string) string {
|
||||
header = strings.TrimSpace(header)
|
||||
if header == "" {
|
||||
return ""
|
||||
}
|
||||
if rest, ok := strings.CutPrefix(header, "Bearer "); ok {
|
||||
return strings.TrimSpace(rest)
|
||||
}
|
||||
return header
|
||||
}
|
||||
|
||||
// peerIP prefers the X-Forwarded-For client hop, falling back to the connection
|
||||
// peer address (host part).
|
||||
func peerIP(peerAddr string, h http.Header) string {
|
||||
if xff := h.Get("X-Forwarded-For"); xff != "" {
|
||||
if i := strings.IndexByte(xff, ','); i >= 0 {
|
||||
return strings.TrimSpace(xff[:i])
|
||||
}
|
||||
return strings.TrimSpace(xff)
|
||||
}
|
||||
if host, _, err := net.SplitHostPort(peerAddr); err == nil {
|
||||
return host
|
||||
}
|
||||
return peerAddr
|
||||
}
|
||||
@@ -0,0 +1,96 @@
|
||||
package connectsrv_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"connectrpc.com/connect"
|
||||
|
||||
"scrabble/gateway/internal/backendclient"
|
||||
"scrabble/gateway/internal/config"
|
||||
"scrabble/gateway/internal/connectsrv"
|
||||
"scrabble/gateway/internal/push"
|
||||
"scrabble/gateway/internal/ratelimit"
|
||||
"scrabble/gateway/internal/session"
|
||||
"scrabble/gateway/internal/transcode"
|
||||
edgev1 "scrabble/gateway/proto/edge/v1"
|
||||
"scrabble/gateway/proto/edge/v1/edgev1connect"
|
||||
fb "scrabble/pkg/fbs/scrabblefb"
|
||||
)
|
||||
|
||||
// newEdge wires a connectsrv.Server over a fake backend and returns a Connect
|
||||
// client plus a cleanup func.
|
||||
func newEdge(t *testing.T, backendHandler http.HandlerFunc) (edgev1connect.GatewayClient, func()) {
|
||||
t.Helper()
|
||||
backendSrv := httptest.NewServer(backendHandler)
|
||||
backend, err := backendclient.New(backendSrv.URL, "localhost:9090", 2*time.Second)
|
||||
if err != nil {
|
||||
t.Fatalf("backendclient: %v", err)
|
||||
}
|
||||
edge := connectsrv.NewServer(connectsrv.Deps{
|
||||
Registry: transcode.NewRegistry(backend, nil),
|
||||
Sessions: session.NewCache(backend, time.Minute, 100),
|
||||
Limiter: ratelimit.New(),
|
||||
Hub: push.NewHub(0),
|
||||
RateLimit: config.DefaultRateLimit(),
|
||||
Heartbeat: 15 * time.Second,
|
||||
})
|
||||
edgeSrv := httptest.NewServer(edge.HTTPHandler())
|
||||
client := edgev1connect.NewGatewayClient(http.DefaultClient, edgeSrv.URL)
|
||||
return client, func() {
|
||||
edgeSrv.Close()
|
||||
_ = backend.Close()
|
||||
backendSrv.Close()
|
||||
}
|
||||
}
|
||||
|
||||
func TestExecuteGuestAuthOK(t *testing.T) {
|
||||
client, cleanup := newEdge(t, func(w http.ResponseWriter, r *http.Request) {
|
||||
_, _ = w.Write([]byte(`{"token":"tok","user_id":"u-1","is_guest":true,"display_name":"Guest"}`))
|
||||
})
|
||||
defer cleanup()
|
||||
|
||||
resp, err := client.Execute(context.Background(), connect.NewRequest(&edgev1.ExecuteRequest{
|
||||
MessageType: transcode.MsgAuthGuest,
|
||||
RequestId: "req-1",
|
||||
}))
|
||||
if err != nil {
|
||||
t.Fatalf("execute: %v", err)
|
||||
}
|
||||
if resp.Msg.GetResultCode() != "ok" || resp.Msg.GetRequestId() != "req-1" {
|
||||
t.Fatalf("result = %q req_id = %q", resp.Msg.GetResultCode(), resp.Msg.GetRequestId())
|
||||
}
|
||||
sess := fb.GetRootAsSession(resp.Msg.GetPayload(), 0)
|
||||
if string(sess.Token()) != "tok" || !sess.IsGuest() {
|
||||
t.Fatalf("session decoded wrong: %q guest=%v", sess.Token(), sess.IsGuest())
|
||||
}
|
||||
}
|
||||
|
||||
func TestExecuteAuthedRequiresSession(t *testing.T) {
|
||||
client, cleanup := newEdge(t, func(w http.ResponseWriter, r *http.Request) {
|
||||
t.Error("backend must not be called without a session")
|
||||
})
|
||||
defer cleanup()
|
||||
|
||||
_, err := client.Execute(context.Background(), connect.NewRequest(&edgev1.ExecuteRequest{
|
||||
MessageType: transcode.MsgProfileGet,
|
||||
}))
|
||||
if connect.CodeOf(err) != connect.CodeUnauthenticated {
|
||||
t.Fatalf("code = %v, want Unauthenticated", connect.CodeOf(err))
|
||||
}
|
||||
}
|
||||
|
||||
func TestExecuteUnknownMessageType(t *testing.T) {
|
||||
client, cleanup := newEdge(t, func(w http.ResponseWriter, r *http.Request) {})
|
||||
defer cleanup()
|
||||
|
||||
_, err := client.Execute(context.Background(), connect.NewRequest(&edgev1.ExecuteRequest{
|
||||
MessageType: "does.not.exist",
|
||||
}))
|
||||
if connect.CodeOf(err) != connect.CodeNotFound {
|
||||
t.Fatalf("code = %v, want NotFound", connect.CodeOf(err))
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user