package grpcapi import ( "context" "net" "time" "galaxy/gateway/internal/telemetry" "connectrpc.com/connect" "go.uber.org/zap" ) // observabilityConnectInterceptor returns a Connect interceptor that records // the same structured log entry and authenticated edge metric pair as the // gRPC instrumentation it replaced. It also injects the parsed peer IP into // the request context so the rate-limit decorator can attribute requests // without depending on the gRPC `peer` package. func observabilityConnectInterceptor(logger *zap.Logger, metrics *telemetry.Runtime) connect.Interceptor { if logger == nil { logger = zap.NewNop() } return &connectObservability{logger: logger, metrics: metrics} } type connectObservability struct { logger *zap.Logger metrics *telemetry.Runtime } // WrapUnary records timing and outcome for a single unary edge call. func (o *connectObservability) WrapUnary(next connect.UnaryFunc) connect.UnaryFunc { return func(ctx context.Context, req connect.AnyRequest) (connect.AnyResponse, error) { ctx = contextWithPeerIP(ctx, hostFromConnectPeerAddr(req.Peer().Addr)) start := time.Now() resp, err := next(ctx, req) var respValue any if resp != nil { respValue = resp.Any() } recordEdgeRequest(o.logger, o.metrics, ctx, "connect", req.Spec().Procedure, req.Any(), respValue, err, time.Since(start), "unary") return resp, err } } // WrapStreamingClient is the client-side hook required by the // connect.Interceptor contract. The gateway only acts as a Connect server, // so this hook is a pass-through. func (o *connectObservability) WrapStreamingClient(next connect.StreamingClientFunc) connect.StreamingClientFunc { return next } // WrapStreamingHandler records timing and outcome for one server-streaming // edge call. The wrapped conn captures the first received request so the // log/metric pair carries the same envelope fields the gRPC instrumentation // emitted before. func (o *connectObservability) WrapStreamingHandler(next connect.StreamingHandlerFunc) connect.StreamingHandlerFunc { return func(ctx context.Context, conn connect.StreamingHandlerConn) error { ctx = contextWithPeerIP(ctx, hostFromConnectPeerAddr(conn.Peer().Addr)) start := time.Now() wrapped := &observabilityStreamingConn{StreamingHandlerConn: conn} err := next(ctx, wrapped) recordEdgeRequest(o.logger, o.metrics, ctx, "connect", conn.Spec().Procedure, wrapped.firstRequest, nil, err, time.Since(start), "stream") return err } } // observabilityStreamingConn captures the first received request so the // streaming-handler interceptor can derive the envelope log fields after // the handler returns. type observabilityStreamingConn struct { connect.StreamingHandlerConn firstRequest any } // Receive forwards to the underlying conn and stores the first successful // message, so envelopeFieldsFromRequest can read message_type, request_id, // and trace_id from it. func (c *observabilityStreamingConn) Receive(msg any) error { err := c.StreamingHandlerConn.Receive(msg) if err == nil && c.firstRequest == nil { c.firstRequest = msg } return err } // hostFromConnectPeerAddr returns the host part of a "host:port" peer // address, or the address verbatim when it cannot be split. Empty input // yields an empty string so peerIPFromContext falls back to the canonical // `unknown` bucket. func hostFromConnectPeerAddr(addr string) string { if addr == "" { return "" } host, _, err := net.SplitHostPort(addr) if err == nil && host != "" { return host } return addr }