8700fbfae1
- backend + gateway multi-stage distroless Dockerfiles; the gateway embeds and
serves the SPA at / and /telegram/ via go:embed (committed dist placeholder,
real build baked in by the image's node stage)
- deploy/docker-compose.yml: backend + gateway + Postgres + Telegram connector
(VPN sidecar) + OTel Collector + Prometheus (15d retention) + Tempo (72h retention) + Grafana,
fronted by a Caddy reverse proxy that owns a single /_gm Basic-Auth (admin console + Grafana
subpath); inter-service traffic stays on a private network, and only Caddy sits on the edge network
- new metrics: backend accounts_created_total{kind} (robots excluded) and an
in-memory gateway active_users{window=24h,7d} gauge
- CI: a single .gitea/workflows/ci.yaml (unit/integration/ui jobs + a gated test-contour
deploy) following the new feature/* -> development -> master branching model; the old
go-unit/integration/ui-test workflows are folded into it, and the connector-scoped
compose file is retired (superseded by deploy/)
- docs: ARCHITECTURE §11/§12/§13, root + gateway READMEs, CLAUDE.md branching,
PLAN.md (stage 16 done + refinements + Stage 17 forward-notes)
250 lines
8.0 KiB
Go
250 lines
8.0 KiB
Go
// Package connectsrv implements the public Connect edge service over h2c. Execute
|
|
// rate-limits, authenticates (resolving the Authorization bearer token to a user
|
|
// id for non-auth operations), and dispatches to the transcode registry; the
|
|
// domain outcome is carried back in the ExecuteResponse result_code. Subscribe
|
|
// bridges the gateway push hub to a client server-stream with a keep-alive
|
|
// heartbeat.
|
|
package connectsrv
|
|
|
|
import (
|
|
"context"
|
|
"net"
|
|
"net/http"
|
|
"strings"
|
|
"time"
|
|
|
|
"connectrpc.com/connect"
|
|
"go.opentelemetry.io/otel/metric"
|
|
"go.uber.org/zap"
|
|
"golang.org/x/net/http2"
|
|
"golang.org/x/net/http2/h2c"
|
|
|
|
"scrabble/gateway/internal/config"
|
|
"scrabble/gateway/internal/push"
|
|
"scrabble/gateway/internal/ratelimit"
|
|
"scrabble/gateway/internal/session"
|
|
"scrabble/gateway/internal/transcode"
|
|
"scrabble/gateway/internal/webui"
|
|
edgev1 "scrabble/gateway/proto/edge/v1"
|
|
"scrabble/gateway/proto/edge/v1/edgev1connect"
|
|
)
|
|
|
|
const (
	// heartbeatKind is the Event.Kind carried by the keep-alive events that
	// Subscribe emits on every heartbeat tick.
	heartbeatKind = "heartbeat"
)
|
|
|
|
// Server implements edgev1connect.GatewayHandler.
|
|
type Server struct {
|
|
registry *transcode.Registry
|
|
sessions *session.Cache
|
|
limiter *ratelimit.Limiter
|
|
hub *push.Hub
|
|
heartbeat time.Duration
|
|
log *zap.Logger
|
|
adminProxy http.Handler
|
|
metrics *serverMetrics
|
|
|
|
publicPolicy ratelimit.Policy
|
|
userPolicy ratelimit.Policy
|
|
emailPolicy ratelimit.Policy
|
|
}
|
|
|
|
// Deps carries the Server's dependencies. NewServer copies them into the
// Server; only Logger is defaulted there.
type Deps struct {
	// Registry resolves an ExecuteRequest message_type to its operation.
	Registry *transcode.Registry

	// Sessions resolves bearer tokens to account ids.
	Sessions *session.Cache

	// Limiter enforces the rate-limit policies built from RateLimit.
	Limiter *ratelimit.Limiter

	// Hub supplies per-user live events to Subscribe.
	Hub *push.Hub

	// RateLimit configures the per-IP (PublicPerMinute/PublicBurst), per-user
	// (UserPerMinute/UserBurst) and email (EmailPer10Min/EmailBurst over a
	// 10-minute window) policies.
	RateLimit config.RateLimitConfig

	// Heartbeat is the interval between keep-alive events on Subscribe streams.
	// NOTE(review): NewServer neither defaults nor validates it; confirm callers
	// always supply a positive value.
	Heartbeat time.Duration

	// Logger is optional; NewServer substitutes a no-op logger when it is nil.
	Logger *zap.Logger

	// AdminProxy is optional. When set, HTTPHandler mounts it at /_gm/;
	// otherwise that path answers 404.
	AdminProxy http.Handler

	// Meter is handed to newServerMetrics.
	// NOTE(review): nil handling is decided there; not visible from here.
	Meter metric.Meter
}
|
|
|
|
// NewServer wires a Server from d. A nil d.Logger is replaced with a no-op
// logger. The public, per-user and email rate-limit policies are derived from
// d.RateLimit, with the email policy measured over a 10-minute window.
func NewServer(d Deps) *Server {
	logger := d.Logger
	if logger == nil {
		logger = zap.NewNop()
	}

	rl := d.RateLimit
	srv := &Server{
		registry:     d.Registry,
		sessions:     d.Sessions,
		limiter:      d.Limiter,
		hub:          d.Hub,
		heartbeat:    d.Heartbeat,
		log:          logger,
		adminProxy:   d.AdminProxy,
		metrics:      newServerMetrics(d.Meter),
		publicPolicy: ratelimit.PerMinute(rl.PublicPerMinute, rl.PublicBurst),
		userPolicy:   ratelimit.PerMinute(rl.UserPerMinute, rl.UserBurst),
		emailPolicy:  ratelimit.Per(rl.EmailPer10Min, 10*time.Minute, rl.EmailBurst),
	}
	return srv
}
|
|
|
|
// HTTPHandler returns the public listener's handler. It is a ServeMux holding
// the Connect edge, the /_gm/ operator path and the embedded SPA, wrapped in
// h2c so Connect clients can use cleartext HTTP/2.
func (s *Server) HTTPHandler() http.Handler {
	mux := http.NewServeMux()

	connectPath, connectHandler := edgev1connect.NewGatewayHandler(s)
	mux.Handle(connectPath, connectHandler)

	// /_gm/ is always claimed here, so the SPA catch-all never serves the app
	// shell at the operator path. With an admin proxy configured, the admin
	// console (backend /_gm) is served on the public listener behind the
	// proxy's Basic-Auth, mounted below the h2c wrap so the Connect edge keeps
	// working over h2c (docs/ARCHITECTURE.md §12). That mount serves a
	// non-caddy (local) setup; in the deployed contour the front caddy owns the
	// /_gm Basic-Auth and Grafana routing. Without a proxy the path is a 404.
	operator := http.NotFoundHandler()
	if s.adminProxy != nil {
		operator = s.adminProxy
	}
	mux.Handle("/_gm/", operator)

	// Single-origin model (docs/ARCHITECTURE.md §13): the embedded SPA is served
	// at the site root and, for the Telegram Mini App, under /telegram/. The
	// Connect edge's prefix is more specific, so it keeps priority. "/" is the
	// catch-all fallback for the hash router.
	mux.Handle("/telegram/", webui.Handler("/telegram/"))
	mux.Handle("/", webui.Handler(""))

	return h2c.NewHandler(mux, &http2.Server{})
}
|
|
|
|
// Execute runs one unary operation. Domain failures are returned in the envelope
// (result_code != "ok", HTTP 200). Only edge failures (rate limit, missing
// session, unknown type, internal) become Connect errors.
//
// Flow:
//  1. Look up the operation.
//  2. Auth operations resolve the bearer token, count the user as active and
//     apply the per-user limit. Other operations apply the per-IP limit, plus
//     the email limit when the operation is flagged Email.
//  3. Dispatch to the transcode handler.
//
// A deferred recordEdge call reports the outcome on every exit path.
func (s *Server) Execute(ctx context.Context, req *connect.Request[edgev1.ExecuteRequest]) (*connect.Response[edgev1.ExecuteResponse], error) {
	start := time.Now()
	msgType := req.Msg.GetMessageType()

	// The deferred recordEdge reads result and metricType at return time, so
	// each exit path below only assigns them. result defaults to "internal" so
	// an unexpected handler error is reported as such.
	result := "internal"
	// metricType is the message_type label. It holds a fixed placeholder until
	// the registry recognises msgType. Otherwise any caller could mint an
	// unbounded set of metric label values by sending made-up types.
	metricType := "unknown"
	defer func() { s.metrics.recordEdge(ctx, metricType, result, start) }()

	op, ok := s.registry.Lookup(msgType)
	if !ok {
		result = "unknown_type"
		return nil, connect.NewError(connect.CodeNotFound, errUnknownMessageType(msgType))
	}
	metricType = msgType

	clientIP := peerIP(req.Peer().Addr, req.Header())
	tr := transcode.Request{Payload: req.Msg.GetPayload(), ClientIP: clientIP}

	if op.Auth {
		uid, err := s.resolve(ctx, req.Header())
		if err != nil {
			result = "unauthenticated"
			return nil, err
		}
		// A valid session on an authenticated request counts as an "action" for
		// the active_users gauge. It is recorded before the rate-limit/domain
		// outcome is known.
		s.metrics.recordActive(uid)
		if !s.limiter.Allow("user:"+uid, s.userPolicy) {
			result = "rate_limited"
			return nil, connect.NewError(connect.CodeResourceExhausted, errRateLimited)
		}
		tr.UserID = uid
	} else {
		// Unauthenticated operations are limited per client IP. Operations
		// flagged Email also draw from the email policy, which is keyed by IP too.
		if !s.limiter.Allow("ip:"+clientIP, s.publicPolicy) {
			result = "rate_limited"
			return nil, connect.NewError(connect.CodeResourceExhausted, errRateLimited)
		}
		if op.Email && !s.limiter.Allow("email:"+clientIP, s.emailPolicy) {
			result = "rate_limited"
			return nil, connect.NewError(connect.CodeResourceExhausted, errRateLimited)
		}
	}

	payload, err := op.Handler(ctx, tr)
	if err != nil {
		// Domain errors are a normal outcome: return them in the envelope with
		// HTTP 200 and the domain result code.
		if code, domain := transcode.DomainCode(err); domain {
			result = "domain"
			return connect.NewResponse(&edgev1.ExecuteResponse{
				RequestId:  req.Msg.GetRequestId(),
				ResultCode: code,
			}), nil
		}
		// Anything else is logged here and hidden behind a generic internal error.
		s.log.Error("execute failed", zap.String("message_type", msgType), zap.Error(err))
		return nil, connect.NewError(connect.CodeInternal, errInternal)
	}

	result = "ok"
	return connect.NewResponse(&edgev1.ExecuteResponse{
		RequestId:  req.Msg.GetRequestId(),
		ResultCode: "ok",
		Payload:    payload,
	}), nil
}
|
|
|
|
// Subscribe streams the authenticated user's live events with a keep-alive
|
|
// heartbeat until the client disconnects.
|
|
func (s *Server) Subscribe(ctx context.Context, req *connect.Request[edgev1.SubscribeRequest], stream *connect.ServerStream[edgev1.Event]) error {
|
|
uid, err := s.resolve(ctx, req.Header())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if !s.limiter.Allow("user:"+uid, s.userPolicy) {
|
|
return connect.NewError(connect.CodeResourceExhausted, errRateLimited)
|
|
}
|
|
|
|
events, cancel := s.hub.Subscribe(uid)
|
|
defer cancel()
|
|
|
|
ticker := time.NewTicker(s.heartbeat)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil
|
|
case <-ticker.C:
|
|
if err := stream.Send(&edgev1.Event{Kind: heartbeatKind}); err != nil {
|
|
return err
|
|
}
|
|
case e, ok := <-events:
|
|
if !ok {
|
|
return nil
|
|
}
|
|
if err := stream.Send(&edgev1.Event{Kind: e.Kind, Payload: e.Payload, EventId: e.EventID}); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// resolve maps the request's Authorization header to an account id through the
// session cache. Both failure modes surface as Connect Unauthenticated errors:
// errMissingToken when no token is present, and errInvalidSession when the
// cache lookup fails. The underlying lookup error is not propagated.
//
// NOTE(review): if sessions.Resolve can fail for reasons other than an unknown
// token (e.g. a backend outage), those failures are also reported as
// Unauthenticated; confirm that is intended.
func (s *Server) resolve(ctx context.Context, h http.Header) (string, error) {
	unauthenticated := func(cause error) (string, error) {
		return "", connect.NewError(connect.CodeUnauthenticated, cause)
	}

	token := bearerToken(h.Get("Authorization"))
	if token == "" {
		return unauthenticated(errMissingToken)
	}

	accountID, lookupErr := s.sessions.Resolve(ctx, token)
	if lookupErr != nil {
		return unauthenticated(errInvalidSession)
	}
	return accountID, nil
}
|
|
|
|
// bearerToken extracts the token from an "Authorization: Bearer <token>" header,
|
|
// tolerating a bare token for convenience.
|
|
func bearerToken(header string) string {
|
|
header = strings.TrimSpace(header)
|
|
if header == "" {
|
|
return ""
|
|
}
|
|
if rest, ok := strings.CutPrefix(header, "Bearer "); ok {
|
|
return strings.TrimSpace(rest)
|
|
}
|
|
return header
|
|
}
|
|
|
|
// peerIP returns the client address used as the rate-limit key. It prefers the
// first (left-most) X-Forwarded-For hop. Otherwise it falls back to the host
// part of the connection peer address, or the address verbatim when it has
// no port. An empty first hop (e.g. ", 1.2.3.4") also falls back to the peer
// address, so such requests do not all collapse into one shared "ip:" key.
//
// NOTE(review): the left-most X-Forwarded-For entry is whatever the client or
// the first proxy wrote. It is only trustworthy when every request reaches this
// listener through a proxy that overwrites the header. On a directly exposed
// listener a client can choose its own rate-limit key; confirm the deployment
// guarantees the former.
func peerIP(peerAddr string, h http.Header) string {
	if xff := h.Get("X-Forwarded-For"); xff != "" {
		first, _, _ := strings.Cut(xff, ",")
		if ip := strings.TrimSpace(first); ip != "" {
			return ip
		}
	}
	if host, _, err := net.SplitHostPort(peerAddr); err == nil {
		return host
	}
	return peerAddr
}
|