package telemetry import ( "context" "errors" "net/http" "os" "strings" "time" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" otelprom "go.opentelemetry.io/otel/exporters/prometheus" "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/propagation" sdkmetric "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/resource" sdktrace "go.opentelemetry.io/otel/sdk/trace" oteltrace "go.opentelemetry.io/otel/trace" "go.uber.org/zap" ) const defaultServiceName = "galaxy-edge-gateway" // Runtime owns the shared OpenTelemetry providers, the Prometheus metrics // handler, and the custom low-cardinality edge instruments. type Runtime struct { logger *zap.Logger tracerProvider *sdktrace.TracerProvider meterProvider *sdkmetric.MeterProvider promHandler http.Handler // Public REST instruments. publicRequests metric.Int64Counter publicDuration metric.Float64Histogram // Authenticated gRPC instruments. grpcRequests metric.Int64Counter grpcDuration metric.Float64Histogram // Push instruments. pushActiveStreams metric.Int64UpDownCounter pushStreamClosers metric.Int64Counter // Internal event consumer instruments. internalEventDrops metric.Int64Counter } // New constructs the gateway telemetry runtime, registers global providers, // and returns the Prometheus handler used by the admin listener. func New(ctx context.Context, logger *zap.Logger) (*Runtime, error) { if logger == nil { logger = zap.NewNop() } serviceName := strings.TrimSpace(os.Getenv("OTEL_SERVICE_NAME")) if serviceName == "" { serviceName = defaultServiceName } res, err := resource.New( ctx, resource.WithAttributes(attribute.String("service.name", serviceName)), ) if err != nil { return nil, err } tracerProvider, err := newTracerProvider(ctx, res) if err != nil { return nil, err } registry := prometheus.NewRegistry() exporter, err := otelprom.New(otelprom.WithRegisterer(registry)) if err != nil { return nil, err } meterProvider := sdkmetric.NewMeterProvider( sdkmetric.WithResource(res), sdkmetric.WithReader(exporter), ) otel.SetTracerProvider(tracerProvider) otel.SetMeterProvider(meterProvider) otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator( propagation.TraceContext{}, propagation.Baggage{}, )) meter := meterProvider.Meter("galaxy/gateway") publicRequests, err := meter.Int64Counter("gateway.public_http.requests") if err != nil { return nil, err } publicDuration, err := meter.Float64Histogram("gateway.public_http.duration", metric.WithUnit("ms")) if err != nil { return nil, err } grpcRequests, err := meter.Int64Counter("gateway.authenticated_grpc.requests") if err != nil { return nil, err } grpcDuration, err := meter.Float64Histogram("gateway.authenticated_grpc.duration", metric.WithUnit("ms")) if err != nil { return nil, err } pushActiveStreams, err := meter.Int64UpDownCounter("gateway.push.active_streams") if err != nil { return nil, err } pushStreamClosers, err := meter.Int64Counter("gateway.push.stream_closures") if err != nil { return nil, err } internalEventDrops, err := meter.Int64Counter("gateway.internal_event_drops") if err != nil { return nil, err } return &Runtime{ logger: logger, tracerProvider: tracerProvider, meterProvider: meterProvider, promHandler: promhttp.HandlerFor(registry, promhttp.HandlerOpts{}), publicRequests: publicRequests, publicDuration: publicDuration, grpcRequests: grpcRequests, grpcDuration: grpcDuration, pushActiveStreams: pushActiveStreams, pushStreamClosers: pushStreamClosers, internalEventDrops: internalEventDrops, }, nil } // Handler returns the Prometheus handler that should be mounted on the admin // listener. func (r *Runtime) Handler() http.Handler { if r == nil || r.promHandler == nil { return http.NotFoundHandler() } return r.promHandler } // TracerProvider returns the runtime tracer provider, falling back to the // global one when r is not initialised. func (r *Runtime) TracerProvider() oteltrace.TracerProvider { if r == nil || r.tracerProvider == nil { return otel.GetTracerProvider() } return r.tracerProvider } // MeterProvider returns the runtime meter provider, falling back to the // global one when r is not initialised. func (r *Runtime) MeterProvider() metric.MeterProvider { if r == nil || r.meterProvider == nil { return otel.GetMeterProvider() } return r.meterProvider } // Shutdown flushes the configured telemetry providers. func (r *Runtime) Shutdown(ctx context.Context) error { if r == nil { return nil } var shutdownErr error if r.meterProvider != nil { shutdownErr = errors.Join(shutdownErr, r.meterProvider.Shutdown(ctx)) } if r.tracerProvider != nil { shutdownErr = errors.Join(shutdownErr, r.tracerProvider.Shutdown(ctx)) } return shutdownErr } // RecordPublicRequest records one public REST request outcome. func (r *Runtime) RecordPublicRequest(ctx context.Context, attrs []attribute.KeyValue, duration time.Duration) { if r == nil { return } options := metric.WithAttributes(attrs...) r.publicRequests.Add(ctx, 1, options) r.publicDuration.Record(ctx, duration.Seconds()*1000, options) } // RecordAuthenticatedGRPC records one authenticated gRPC request or stream // outcome. func (r *Runtime) RecordAuthenticatedGRPC(ctx context.Context, attrs []attribute.KeyValue, duration time.Duration) { if r == nil { return } options := metric.WithAttributes(attrs...) r.grpcRequests.Add(ctx, 1, options) r.grpcDuration.Record(ctx, duration.Seconds()*1000, options) } // AddActivePushStream records one active-stream delta. func (r *Runtime) AddActivePushStream(ctx context.Context, delta int64, attrs ...attribute.KeyValue) { if r == nil { return } r.pushActiveStreams.Add(ctx, delta, metric.WithAttributes(attrs...)) } // RecordPushStreamClosure records one push-stream closure reason. func (r *Runtime) RecordPushStreamClosure(ctx context.Context, attrs ...attribute.KeyValue) { if r == nil { return } r.pushStreamClosers.Add(ctx, 1, metric.WithAttributes(attrs...)) } // RecordInternalEventDrop records one malformed or rejected internal event. func (r *Runtime) RecordInternalEventDrop(ctx context.Context, attrs ...attribute.KeyValue) { if r == nil { return } r.internalEventDrops.Add(ctx, 1, metric.WithAttributes(attrs...)) } func newTracerProvider(ctx context.Context, res *resource.Resource) (*sdktrace.TracerProvider, error) { exporterName := strings.TrimSpace(os.Getenv("OTEL_TRACES_EXPORTER")) if exporterName == "" || exporterName == "none" { return sdktrace.NewTracerProvider(sdktrace.WithResource(res)), nil } if exporterName != "otlp" { return nil, errors.New("unsupported OTEL_TRACES_EXPORTER value") } protocol := strings.TrimSpace(os.Getenv("OTEL_EXPORTER_OTLP_TRACES_PROTOCOL")) if protocol == "" { protocol = strings.TrimSpace(os.Getenv("OTEL_EXPORTER_OTLP_PROTOCOL")) } var ( exporter sdktrace.SpanExporter err error ) switch protocol { case "", "http/protobuf": exporter, err = otlptracehttp.New(ctx) case "grpc": exporter, err = otlptracegrpc.New(ctx) default: return nil, errors.New("unsupported OTEL exporter protocol") } if err != nil { return nil, err } return sdktrace.NewTracerProvider( sdktrace.WithBatcher(exporter), sdktrace.WithResource(res), ), nil }