package grpcapi import ( "context" "errors" "path" "time" "galaxy/gateway/internal/logging" "galaxy/gateway/internal/telemetry" gatewayv1 "galaxy/gateway/proto/galaxy/gateway/v1" "go.opentelemetry.io/otel/attribute" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) func observabilityUnaryInterceptor(logger *zap.Logger, metrics *telemetry.Runtime) grpc.UnaryServerInterceptor { if logger == nil { logger = zap.NewNop() } return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) { start := time.Now() resp, err := handler(ctx, req) recordGRPCRequest(logger, metrics, ctx, info.FullMethod, req, resp, err, time.Since(start), "unary") return resp, err } } func observabilityStreamInterceptor(logger *zap.Logger, metrics *telemetry.Runtime) grpc.StreamServerInterceptor { if logger == nil { logger = zap.NewNop() } return func(srv any, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { start := time.Now() wrapped := &observabilityServerStream{ServerStream: stream} err := handler(srv, wrapped) recordGRPCRequest(logger, metrics, stream.Context(), info.FullMethod, wrapped.request, nil, err, time.Since(start), "stream") return err } } type observabilityServerStream struct { grpc.ServerStream request any } func (s *observabilityServerStream) RecvMsg(m any) error { err := s.ServerStream.RecvMsg(m) if err == nil && s.request == nil { s.request = m } return err } func recordGRPCRequest(logger *zap.Logger, metrics *telemetry.Runtime, ctx context.Context, fullMethod string, req any, resp any, err error, duration time.Duration, streamKind string) { rpcMethod := path.Base(fullMethod) messageType, requestID, traceID := grpcEnvelopeFields(req) resultCode := grpcResultCode(resp) grpcCode, grpcMessage, outcome := grpcOutcome(err) rejectReason := telemetry.RejectReason(outcome) attrs := []attribute.KeyValue{ attribute.String("rpc_method", rpcMethod), attribute.String("message_type", messageType), attribute.String("edge_outcome", string(outcome)), } if resultCode != "" { attrs = append(attrs, attribute.String("result_code", resultCode)) } if rejectReason != "" { attrs = append(attrs, attribute.String("reject_reason", rejectReason)) } metrics.RecordAuthenticatedGRPC(ctx, attrs, duration) fields := []zap.Field{ zap.String("component", "authenticated_grpc"), zap.String("transport", "grpc"), zap.String("stream_kind", streamKind), zap.String("rpc_method", rpcMethod), zap.String("message_type", messageType), zap.String("grpc_code", grpcCode.String()), zap.Float64("duration_ms", float64(duration.Microseconds())/1000), zap.String("request_id", requestID), zap.String("trace_id", traceID), zap.String("peer_ip", peerIPFromContext(ctx)), zap.String("edge_outcome", string(outcome)), } if resultCode != "" { fields = append(fields, zap.String("result_code", resultCode)) } if rejectReason != "" { fields = append(fields, zap.String("reject_reason", rejectReason)) } if grpcMessage != "" { fields = append(fields, zap.String("grpc_message", grpcMessage)) } fields = append(fields, logging.TraceFieldsFromContext(ctx)...) switch outcome { case telemetry.EdgeOutcomeSuccess: logger.Info("authenticated gRPC request completed", fields...) case telemetry.EdgeOutcomeBackendUnavailable, telemetry.EdgeOutcomeDownstreamUnavailable, telemetry.EdgeOutcomeInternalError: logger.Error("authenticated gRPC request failed", fields...) default: logger.Warn("authenticated gRPC request rejected", fields...) } } func grpcEnvelopeFields(req any) (messageType string, requestID string, traceID string) { switch typed := req.(type) { case *gatewayv1.ExecuteCommandRequest: return typed.GetMessageType(), typed.GetRequestId(), typed.GetTraceId() case *gatewayv1.SubscribeEventsRequest: return typed.GetMessageType(), typed.GetRequestId(), typed.GetTraceId() default: return "", "", "" } } func grpcResultCode(resp any) string { typed, ok := resp.(*gatewayv1.ExecuteCommandResponse) if !ok { return "" } return typed.GetResultCode() } func grpcOutcome(err error) (codes.Code, string, telemetry.EdgeOutcome) { switch { case err == nil: return codes.OK, "", telemetry.EdgeOutcomeSuccess case errors.Is(err, context.Canceled), errors.Is(err, context.DeadlineExceeded): return codes.Canceled, err.Error(), telemetry.EdgeOutcomeSuccess default: grpcStatus := status.Convert(err) return grpcStatus.Code(), grpcStatus.Message(), telemetry.OutcomeFromGRPCStatus(grpcStatus.Code(), grpcStatus.Message()) } }