// Package connectsrv implements the public Connect edge service over h2c.
//
// Execute rate-limits, authenticates (resolving the Authorization bearer token
// to a user id for non-auth operations), and dispatches to the transcode
// registry; the domain outcome is carried back in the ExecuteResponse
// result_code. Subscribe bridges the gateway push hub to a client server-stream
// with a keep-alive heartbeat.
package connectsrv

import (
	"context"
	"net"
	"net/http"
	"strings"
	"time"

	"connectrpc.com/connect"
	"go.uber.org/zap"
	"golang.org/x/net/http2"
	"golang.org/x/net/http2/h2c"

	"scrabble/gateway/internal/config"
	"scrabble/gateway/internal/push"
	"scrabble/gateway/internal/ratelimit"
	"scrabble/gateway/internal/session"
	"scrabble/gateway/internal/transcode"
	edgev1 "scrabble/gateway/proto/edge/v1"
	"scrabble/gateway/proto/edge/v1/edgev1connect"
)

// heartbeatKind is the live-stream keep-alive event kind.
const heartbeatKind = "heartbeat"

// Server implements edgev1connect.GatewayHandler.
type Server struct {
	// registry maps an Execute message type to the operation that handles it.
	registry *transcode.Registry
	// sessions resolves a bearer token to a user id.
	sessions *session.Cache
	// limiter enforces the three policies below, keyed by user id or client IP.
	limiter *ratelimit.Limiter
	// hub is the push hub Subscribe reads a user's live events from.
	hub *push.Hub
	// heartbeat is the interval between keep-alive events sent by Subscribe.
	heartbeat time.Duration
	// log is never nil after NewServer; it defaults to a no-op logger.
	log *zap.Logger
	// adminProxy, when non-nil, is mounted under /_gm/ by HTTPHandler.
	adminProxy http.Handler
	// publicPolicy limits unauthenticated operations, keyed by client IP.
	publicPolicy ratelimit.Policy
	// userPolicy limits authenticated operations and Subscribe, keyed by user id.
	userPolicy ratelimit.Policy
	// emailPolicy additionally limits operations flagged Email, keyed by client IP.
	emailPolicy ratelimit.Policy
}

// Deps carries the Server's dependencies.
type Deps struct {
	// Registry is the operation registry Execute dispatches to.
	Registry *transcode.Registry
	// Sessions resolves bearer tokens to user ids.
	Sessions *session.Cache
	// Limiter is the shared rate limiter.
	Limiter *ratelimit.Limiter
	// Hub is the push hub backing Subscribe.
	Hub *push.Hub
	// RateLimit supplies the per-minute/burst figures the policies are built from.
	RateLimit config.RateLimitConfig
	// Heartbeat is the Subscribe keep-alive interval.
	Heartbeat time.Duration
	// Logger is optional; nil selects a no-op logger.
	Logger *zap.Logger
	// AdminProxy is optional; nil leaves /_gm/ unmounted.
	AdminProxy http.Handler
}

// NewServer constructs the edge service.
func NewServer(d Deps) *Server { log := d.Logger if log == nil { log = zap.NewNop() } return &Server{ registry: d.Registry, sessions: d.Sessions, limiter: d.Limiter, hub: d.Hub, heartbeat: d.Heartbeat, log: log, adminProxy: d.AdminProxy, publicPolicy: ratelimit.PerMinute(d.RateLimit.PublicPerMinute, d.RateLimit.PublicBurst), userPolicy: ratelimit.PerMinute(d.RateLimit.UserPerMinute, d.RateLimit.UserBurst), emailPolicy: ratelimit.Per(d.RateLimit.EmailPer10Min, 10*time.Minute, d.RateLimit.EmailBurst), } } // HTTPHandler returns the h2c-wrapped Connect handler ready to serve. func (s *Server) HTTPHandler() http.Handler { mux := http.NewServeMux() path, h := edgev1connect.NewGatewayHandler(s) mux.Handle(path, h) if s.adminProxy != nil { // The admin console (backend /_gm) is served on the public listener behind // the proxy's Basic-Auth, mounted below the h2c wrap so the Connect edge keeps // working over h2c (docs/ARCHITECTURE.md ยง12). mux.Handle("/_gm/", s.adminProxy) } return h2c.NewHandler(mux, &http2.Server{}) } // Execute runs one unary operation. Domain failures are returned in the envelope // (result_code != "ok", HTTP 200); only edge failures (rate limit, missing // session, unknown type, internal) become Connect errors. 
func (s *Server) Execute(ctx context.Context, req *connect.Request[edgev1.ExecuteRequest]) (*connect.Response[edgev1.ExecuteResponse], error) { msgType := req.Msg.GetMessageType() op, ok := s.registry.Lookup(msgType) if !ok { return nil, connect.NewError(connect.CodeNotFound, errUnknownMessageType(msgType)) } clientIP := peerIP(req.Peer().Addr, req.Header()) tr := transcode.Request{Payload: req.Msg.GetPayload(), ClientIP: clientIP} if op.Auth { uid, err := s.resolve(ctx, req.Header()) if err != nil { return nil, err } if !s.limiter.Allow("user:"+uid, s.userPolicy) { return nil, connect.NewError(connect.CodeResourceExhausted, errRateLimited) } tr.UserID = uid } else { if !s.limiter.Allow("ip:"+clientIP, s.publicPolicy) { return nil, connect.NewError(connect.CodeResourceExhausted, errRateLimited) } if op.Email && !s.limiter.Allow("email:"+clientIP, s.emailPolicy) { return nil, connect.NewError(connect.CodeResourceExhausted, errRateLimited) } } payload, err := op.Handler(ctx, tr) if err != nil { if code, domain := transcode.DomainCode(err); domain { return connect.NewResponse(&edgev1.ExecuteResponse{ RequestId: req.Msg.GetRequestId(), ResultCode: code, }), nil } s.log.Error("execute failed", zap.String("message_type", msgType), zap.Error(err)) return nil, connect.NewError(connect.CodeInternal, errInternal) } return connect.NewResponse(&edgev1.ExecuteResponse{ RequestId: req.Msg.GetRequestId(), ResultCode: "ok", Payload: payload, }), nil } // Subscribe streams the authenticated user's live events with a keep-alive // heartbeat until the client disconnects. 
func (s *Server) Subscribe(ctx context.Context, req *connect.Request[edgev1.SubscribeRequest], stream *connect.ServerStream[edgev1.Event]) error { uid, err := s.resolve(ctx, req.Header()) if err != nil { return err } if !s.limiter.Allow("user:"+uid, s.userPolicy) { return connect.NewError(connect.CodeResourceExhausted, errRateLimited) } events, cancel := s.hub.Subscribe(uid) defer cancel() ticker := time.NewTicker(s.heartbeat) defer ticker.Stop() for { select { case <-ctx.Done(): return nil case <-ticker.C: if err := stream.Send(&edgev1.Event{Kind: heartbeatKind}); err != nil { return err } case e, ok := <-events: if !ok { return nil } if err := stream.Send(&edgev1.Event{Kind: e.Kind, Payload: e.Payload, EventId: e.EventID}); err != nil { return err } } } } // resolve extracts and resolves the Authorization bearer token to an account id, // returning a Connect Unauthenticated error when it is missing or unknown. func (s *Server) resolve(ctx context.Context, h http.Header) (string, error) { token := bearerToken(h.Get("Authorization")) if token == "" { return "", connect.NewError(connect.CodeUnauthenticated, errMissingToken) } uid, err := s.sessions.Resolve(ctx, token) if err != nil { return "", connect.NewError(connect.CodeUnauthenticated, errInvalidSession) } return uid, nil } // bearerToken extracts the token from an "Authorization: Bearer " header, // tolerating a bare token for convenience. func bearerToken(header string) string { header = strings.TrimSpace(header) if header == "" { return "" } if rest, ok := strings.CutPrefix(header, "Bearer "); ok { return strings.TrimSpace(rest) } return header } // peerIP prefers the X-Forwarded-For client hop, falling back to the connection // peer address (host part). 
// A forwarded header whose first hop is blank is ignored, so the returned
// address (and any rate-limit key built from it) is never empty merely because
// a client sent a malformed X-Forwarded-For.
func peerIP(peerAddr string, h http.Header) string {
	if xff := h.Get("X-Forwarded-For"); xff != "" {
		// Only the left-most entry is of interest; Cut returns the whole value
		// when there is no comma.
		first, _, _ := strings.Cut(xff, ",")
		if hop := strings.TrimSpace(first); hop != "" {
			return hop
		}
	}
	// peerAddr is normally "host:port"; return it verbatim if it is not.
	if host, _, err := net.SplitHostPort(peerAddr); err == nil {
		return host
	}
	return peerAddr
}