Files
Ilia Denisov 118f7c17a2 phase 4: connectrpc on the gateway authenticated edge
Replace the native-gRPC server bootstrap with a single
`connectrpc.com/connect` HTTP/h2c listener. Connect-Go natively
serves Connect, gRPC, and gRPC-Web on the same port, so browsers can
now reach the authenticated surface without giving up the gRPC
framing native and desktop clients may use later. The decorator
stack (envelope → session → payload-hash → signature →
freshness/replay → rate-limit → routing/push) is reused unchanged
behind a small Connect → gRPC adapter and a `grpc.ServerStream`
shim around `*connect.ServerStream`.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-07 11:49:28 +02:00

111 lines
3.5 KiB
Go

package grpcapi
import (
"context"
"net"
"time"
"galaxy/gateway/internal/telemetry"
"connectrpc.com/connect"
"go.uber.org/zap"
)
// observabilityConnectInterceptor returns a Connect interceptor that records
// the same structured log entry and authenticated edge metric pair as the
// gRPC instrumentation it replaced. It also injects the parsed peer IP into
// the request context so the rate-limit decorator can attribute requests
// without depending on the gRPC `peer` package.
func observabilityConnectInterceptor(logger *zap.Logger, metrics *telemetry.Runtime) connect.Interceptor {
if logger == nil {
logger = zap.NewNop()
}
return &connectObservability{logger: logger, metrics: metrics}
}
type connectObservability struct {
logger *zap.Logger
metrics *telemetry.Runtime
}
// WrapUnary records timing and outcome for a single unary edge call.
func (o *connectObservability) WrapUnary(next connect.UnaryFunc) connect.UnaryFunc {
return func(ctx context.Context, req connect.AnyRequest) (connect.AnyResponse, error) {
ctx = contextWithPeerIP(ctx, hostFromConnectPeerAddr(req.Peer().Addr))
start := time.Now()
resp, err := next(ctx, req)
var respValue any
if resp != nil {
respValue = resp.Any()
}
recordEdgeRequest(o.logger, o.metrics, ctx, "connect", req.Spec().Procedure, req.Any(), respValue, err, time.Since(start), "unary")
return resp, err
}
}
// WrapStreamingClient is the client-side hook required by the
// connect.Interceptor contract. The gateway only acts as a Connect server,
// so this hook is a pass-through.
func (o *connectObservability) WrapStreamingClient(next connect.StreamingClientFunc) connect.StreamingClientFunc {
return next
}
// WrapStreamingHandler records timing and outcome for one server-streaming
// edge call. The wrapped conn captures the first received request so the
// log/metric pair carries the same envelope fields the gRPC instrumentation
// emitted before.
func (o *connectObservability) WrapStreamingHandler(next connect.StreamingHandlerFunc) connect.StreamingHandlerFunc {
return func(ctx context.Context, conn connect.StreamingHandlerConn) error {
ctx = contextWithPeerIP(ctx, hostFromConnectPeerAddr(conn.Peer().Addr))
start := time.Now()
wrapped := &observabilityStreamingConn{StreamingHandlerConn: conn}
err := next(ctx, wrapped)
recordEdgeRequest(o.logger, o.metrics, ctx, "connect", conn.Spec().Procedure, wrapped.firstRequest, nil, err, time.Since(start), "stream")
return err
}
}
// observabilityStreamingConn captures the first received request so the
// streaming-handler interceptor can derive the envelope log fields after
// the handler returns.
type observabilityStreamingConn struct {
connect.StreamingHandlerConn
firstRequest any
}
// Receive forwards to the underlying conn and stores the first successful
// message, so envelopeFieldsFromRequest can read message_type, request_id,
// and trace_id from it.
func (c *observabilityStreamingConn) Receive(msg any) error {
err := c.StreamingHandlerConn.Receive(msg)
if err == nil && c.firstRequest == nil {
c.firstRequest = msg
}
return err
}
// hostFromConnectPeerAddr returns the host part of a "host:port" peer
// address, or the address verbatim when it cannot be split. Empty input
// yields an empty string so peerIPFromContext falls back to the canonical
// `unknown` bucket.
func hostFromConnectPeerAddr(addr string) string {
if addr == "" {
return ""
}
host, _, err := net.SplitHostPort(addr)
if err == nil && host != "" {
return host
}
return addr
}