14b65389ef
Tests · UI / test (push) Successful in 2m35s
Tests · Go / test (push) Successful in 1m56s
Tests · UI / test (pull_request) Has been cancelled
Tests · Integration / integration (pull_request) Successful in 1m42s
Tests · Go / test (pull_request) Successful in 2m0s
Browser fetch-streaming layers close response bodies they consider
idle after roughly 15-30 s without incoming bytes. Safari is the
most aggressive, but the symptom matters everywhere: a quiet
SubscribeEvents stream (lobby, between turns, mailbox empty) gets
torn down by the browser, the EventStream singleton reconnects with
backoff, and any push event that fires inside the reconnect window
is lost because `push.Hub` queues are not persisted across
subscription closes. The user-visible failure mode is the
intermittent "Fetch API cannot load … due to access control checks"
console error (a misleading WebKit symptom — CORS headers are
actually present) plus missed turn-ready / mail-received toasts.
Server-side fix: a silence-based heartbeat at the
`authenticatedPushStreamService` wrapper layer. After the signed
`gateway.server_time` bootstrap event, gateway wraps the bound
stream with `heartbeatingStream`. Every tail Send (fan-out, future
variants) resets the silence timer; when the timer elapses, a
goroutine emits `gateway.heartbeat` with only `EventType` set —
everything else stays at proto3 defaults, so the wire frame is
~45 bytes amortised. A `sendMu` serialises the heartbeat goroutine
with tail Sends because grpc.ServerStream.Send is not goroutine-safe.
The heartbeat is intentionally UNSIGNED: heartbeats carry no
payload, dispatch to no handler on the client, and an injected
heartbeat trivially causes no user-visible state change. TLS still
protects the wire and real events keep the signed envelope
unchanged. Documented in `docs/ARCHITECTURE.md` § 15 alongside the
per-scale bandwidth projection (100…100 000 clients × 15…60 s).
Config: new `GATEWAY_PUSH_HEARTBEAT_INTERVAL` (default `15s`,
`0s` disables). Telemetry: new
`gateway.push.heartbeats_sent{outcome}` counter so operators can
budget bandwidth and spot a sudden `outcome=error` bump as an
upstream-failing-before-flush signal.
Client (`ui/frontend/src/api/events.svelte.ts`): early `continue`
on `event.eventType === "gateway.heartbeat"` before `verifyEvent`,
`verifyPayloadHash`, or dispatch — empty signature would otherwise
trip SignatureError and reconnect. A leading heartbeat still flips
`connectionStatus` to `connected` and resets backoff, because
receiving one is proof the stream is healthy.
Tests:
- `push_heartbeat_test.go`: unit tests for the wrapper — zero
interval returns nil, heartbeat fires after silence, real Send
resets the timer, Stop / context-cancel halt the goroutine,
Send errors propagate.
- `server_test.go`: integration tests through the full gateway
pipeline — heartbeat fires after the configured silence window,
zero interval keeps the stream silent.
- `config_test.go`: default applied, env-override parsed,
negative value rejected.
- `events.test.ts`: heartbeat skipped before verification + not
dispatched to handlers; leading heartbeat still flips
`connectionStatus` to `connected`.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
295 lines
8.2 KiB
Go
295 lines
8.2 KiB
Go
package telemetry
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"net/http"
|
|
"os"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
"github.com/prometheus/client_golang/prometheus/promhttp"
|
|
"go.opentelemetry.io/otel"
|
|
"go.opentelemetry.io/otel/attribute"
|
|
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
|
|
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
|
|
otelprom "go.opentelemetry.io/otel/exporters/prometheus"
|
|
"go.opentelemetry.io/otel/metric"
|
|
"go.opentelemetry.io/otel/propagation"
|
|
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
|
|
"go.opentelemetry.io/otel/sdk/resource"
|
|
sdktrace "go.opentelemetry.io/otel/sdk/trace"
|
|
oteltrace "go.opentelemetry.io/otel/trace"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// defaultServiceName is the service.name resource attribute applied when the
// OTEL_SERVICE_NAME environment variable is unset or blank.
const defaultServiceName = "galaxy-edge-gateway"
|
|
|
|
// Runtime owns the shared OpenTelemetry providers, the Prometheus metrics
|
|
// handler, and the custom low-cardinality edge instruments.
|
|
type Runtime struct {
|
|
logger *zap.Logger
|
|
|
|
tracerProvider *sdktrace.TracerProvider
|
|
meterProvider *sdkmetric.MeterProvider
|
|
promHandler http.Handler
|
|
|
|
// Public REST instruments.
|
|
publicRequests metric.Int64Counter
|
|
publicDuration metric.Float64Histogram
|
|
|
|
// Authenticated gRPC instruments.
|
|
grpcRequests metric.Int64Counter
|
|
grpcDuration metric.Float64Histogram
|
|
|
|
// Push instruments.
|
|
pushActiveStreams metric.Int64UpDownCounter
|
|
pushStreamClosers metric.Int64Counter
|
|
pushHeartbeats metric.Int64Counter
|
|
|
|
// Internal event consumer instruments.
|
|
internalEventDrops metric.Int64Counter
|
|
}
|
|
|
|
// New constructs the gateway telemetry runtime, registers global providers,
|
|
// and returns the Prometheus handler used by the admin listener.
|
|
func New(ctx context.Context, logger *zap.Logger) (*Runtime, error) {
|
|
if logger == nil {
|
|
logger = zap.NewNop()
|
|
}
|
|
|
|
serviceName := strings.TrimSpace(os.Getenv("OTEL_SERVICE_NAME"))
|
|
if serviceName == "" {
|
|
serviceName = defaultServiceName
|
|
}
|
|
|
|
res, err := resource.New(
|
|
ctx,
|
|
resource.WithAttributes(attribute.String("service.name", serviceName)),
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
tracerProvider, err := newTracerProvider(ctx, res)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
registry := prometheus.NewRegistry()
|
|
exporter, err := otelprom.New(otelprom.WithRegisterer(registry))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
meterProvider := sdkmetric.NewMeterProvider(
|
|
sdkmetric.WithResource(res),
|
|
sdkmetric.WithReader(exporter),
|
|
)
|
|
|
|
otel.SetTracerProvider(tracerProvider)
|
|
otel.SetMeterProvider(meterProvider)
|
|
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(
|
|
propagation.TraceContext{},
|
|
propagation.Baggage{},
|
|
))
|
|
|
|
meter := meterProvider.Meter("galaxy/gateway")
|
|
|
|
publicRequests, err := meter.Int64Counter("gateway.public_http.requests")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
publicDuration, err := meter.Float64Histogram("gateway.public_http.duration", metric.WithUnit("ms"))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
grpcRequests, err := meter.Int64Counter("gateway.authenticated_grpc.requests")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
grpcDuration, err := meter.Float64Histogram("gateway.authenticated_grpc.duration", metric.WithUnit("ms"))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
pushActiveStreams, err := meter.Int64UpDownCounter("gateway.push.active_streams")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
pushStreamClosers, err := meter.Int64Counter("gateway.push.stream_closures")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
pushHeartbeats, err := meter.Int64Counter("gateway.push.heartbeats_sent")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
internalEventDrops, err := meter.Int64Counter("gateway.internal_event_drops")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &Runtime{
|
|
logger: logger,
|
|
tracerProvider: tracerProvider,
|
|
meterProvider: meterProvider,
|
|
promHandler: promhttp.HandlerFor(registry, promhttp.HandlerOpts{}),
|
|
publicRequests: publicRequests,
|
|
publicDuration: publicDuration,
|
|
grpcRequests: grpcRequests,
|
|
grpcDuration: grpcDuration,
|
|
pushActiveStreams: pushActiveStreams,
|
|
pushStreamClosers: pushStreamClosers,
|
|
pushHeartbeats: pushHeartbeats,
|
|
internalEventDrops: internalEventDrops,
|
|
}, nil
|
|
}
|
|
|
|
// Handler returns the Prometheus handler that should be mounted on the admin
|
|
// listener.
|
|
func (r *Runtime) Handler() http.Handler {
|
|
if r == nil || r.promHandler == nil {
|
|
return http.NotFoundHandler()
|
|
}
|
|
|
|
return r.promHandler
|
|
}
|
|
|
|
// TracerProvider returns the runtime tracer provider, falling back to the
|
|
// global one when r is not initialised.
|
|
func (r *Runtime) TracerProvider() oteltrace.TracerProvider {
|
|
if r == nil || r.tracerProvider == nil {
|
|
return otel.GetTracerProvider()
|
|
}
|
|
|
|
return r.tracerProvider
|
|
}
|
|
|
|
// MeterProvider returns the runtime meter provider, falling back to the
|
|
// global one when r is not initialised.
|
|
func (r *Runtime) MeterProvider() metric.MeterProvider {
|
|
if r == nil || r.meterProvider == nil {
|
|
return otel.GetMeterProvider()
|
|
}
|
|
|
|
return r.meterProvider
|
|
}
|
|
|
|
// Shutdown flushes the configured telemetry providers.
|
|
func (r *Runtime) Shutdown(ctx context.Context) error {
|
|
if r == nil {
|
|
return nil
|
|
}
|
|
|
|
var shutdownErr error
|
|
if r.meterProvider != nil {
|
|
shutdownErr = errors.Join(shutdownErr, r.meterProvider.Shutdown(ctx))
|
|
}
|
|
if r.tracerProvider != nil {
|
|
shutdownErr = errors.Join(shutdownErr, r.tracerProvider.Shutdown(ctx))
|
|
}
|
|
|
|
return shutdownErr
|
|
}
|
|
|
|
// RecordPublicRequest records one public REST request outcome.
|
|
func (r *Runtime) RecordPublicRequest(ctx context.Context, attrs []attribute.KeyValue, duration time.Duration) {
|
|
if r == nil {
|
|
return
|
|
}
|
|
|
|
options := metric.WithAttributes(attrs...)
|
|
r.publicRequests.Add(ctx, 1, options)
|
|
r.publicDuration.Record(ctx, duration.Seconds()*1000, options)
|
|
}
|
|
|
|
// RecordAuthenticatedGRPC records one authenticated gRPC request or stream
|
|
// outcome.
|
|
func (r *Runtime) RecordAuthenticatedGRPC(ctx context.Context, attrs []attribute.KeyValue, duration time.Duration) {
|
|
if r == nil {
|
|
return
|
|
}
|
|
|
|
options := metric.WithAttributes(attrs...)
|
|
r.grpcRequests.Add(ctx, 1, options)
|
|
r.grpcDuration.Record(ctx, duration.Seconds()*1000, options)
|
|
}
|
|
|
|
// AddActivePushStream records one active-stream delta.
|
|
func (r *Runtime) AddActivePushStream(ctx context.Context, delta int64, attrs ...attribute.KeyValue) {
|
|
if r == nil {
|
|
return
|
|
}
|
|
|
|
r.pushActiveStreams.Add(ctx, delta, metric.WithAttributes(attrs...))
|
|
}
|
|
|
|
// RecordPushStreamClosure records one push-stream closure reason.
|
|
func (r *Runtime) RecordPushStreamClosure(ctx context.Context, attrs ...attribute.KeyValue) {
|
|
if r == nil {
|
|
return
|
|
}
|
|
|
|
r.pushStreamClosers.Add(ctx, 1, metric.WithAttributes(attrs...))
|
|
}
|
|
|
|
// RecordPushHeartbeat records one outbound push-stream heartbeat event.
|
|
// The `outcome` attribute should distinguish a successful Send from a
|
|
// transport-level failure so the metric stays useful for bandwidth
|
|
// budgeting (most heartbeats are `sent`; a sudden bump of `error` means
|
|
// the upstream connection is failing before the gateway can flush).
|
|
func (r *Runtime) RecordPushHeartbeat(ctx context.Context, attrs ...attribute.KeyValue) {
|
|
if r == nil {
|
|
return
|
|
}
|
|
|
|
r.pushHeartbeats.Add(ctx, 1, metric.WithAttributes(attrs...))
|
|
}
|
|
|
|
// RecordInternalEventDrop records one malformed or rejected internal event.
|
|
func (r *Runtime) RecordInternalEventDrop(ctx context.Context, attrs ...attribute.KeyValue) {
|
|
if r == nil {
|
|
return
|
|
}
|
|
|
|
r.internalEventDrops.Add(ctx, 1, metric.WithAttributes(attrs...))
|
|
}
|
|
|
|
func newTracerProvider(ctx context.Context, res *resource.Resource) (*sdktrace.TracerProvider, error) {
|
|
exporterName := strings.TrimSpace(os.Getenv("OTEL_TRACES_EXPORTER"))
|
|
if exporterName == "" || exporterName == "none" {
|
|
return sdktrace.NewTracerProvider(sdktrace.WithResource(res)), nil
|
|
}
|
|
|
|
if exporterName != "otlp" {
|
|
return nil, errors.New("unsupported OTEL_TRACES_EXPORTER value")
|
|
}
|
|
|
|
protocol := strings.TrimSpace(os.Getenv("OTEL_EXPORTER_OTLP_TRACES_PROTOCOL"))
|
|
if protocol == "" {
|
|
protocol = strings.TrimSpace(os.Getenv("OTEL_EXPORTER_OTLP_PROTOCOL"))
|
|
}
|
|
|
|
var (
|
|
exporter sdktrace.SpanExporter
|
|
err error
|
|
)
|
|
switch protocol {
|
|
case "", "http/protobuf":
|
|
exporter, err = otlptracehttp.New(ctx)
|
|
case "grpc":
|
|
exporter, err = otlptracegrpc.New(ctx)
|
|
default:
|
|
return nil, errors.New("unsupported OTEL exporter protocol")
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return sdktrace.NewTracerProvider(
|
|
sdktrace.WithBatcher(exporter),
|
|
sdktrace.WithResource(res),
|
|
), nil
|
|
}
|