package grpcapi

import (
	"context"
	"sync"
	"time"

	"galaxy/gateway/internal/telemetry"
	gatewayv1 "galaxy/gateway/proto/galaxy/gateway/v1"

	"go.opentelemetry.io/otel/attribute"
	"google.golang.org/grpc"
)

// heartbeatingStream wraps a server-streaming response so the inner
// stream stays alive across browser fetch-streaming idle timeouts.
// Every call to Send (a real event from a tail service) resets a
// silence timer; when the timer fires, Run emits an unsigned
// `gateway.heartbeat` event on its own. Send and the heartbeat
// goroutine serialise on the same mutex because grpc.ServerStream.Send
// is documented as not goroutine-safe.
//
// Wire-cost budgeting: each heartbeat is one GatewayEvent with only
// EventType populated (~17 bytes + protobuf tag), framed by Connect
// (~5 bytes) and HTTP/2 plus TLS overhead (~50 bytes). At the
// 15-second default a fully-idle stream costs ~840 KB/day per client;
// see `docs/ARCHITECTURE.md` for the per-scale projection.
type heartbeatingStream struct {
	grpc.ServerStreamingServer[gatewayv1.GatewayEvent]

	interval time.Duration
	metrics  *telemetry.Runtime

	sendMu   sync.Mutex
	timer    *time.Timer
	stopOnce sync.Once
	done     chan struct{}
}

// newHeartbeatingStream wraps inner with a silence-based heartbeat
// emitter. A non-positive interval returns nil so the caller can skip
// the wrapping entirely; non-nil returns must have `Stop()` called once
// the stream lifecycle ends.
func newHeartbeatingStream(
	inner grpc.ServerStreamingServer[gatewayv1.GatewayEvent],
	interval time.Duration,
	metrics *telemetry.Runtime,
) *heartbeatingStream {
	if interval <= 0 {
		return nil
	}
	return &heartbeatingStream{
		ServerStreamingServer: inner,
		interval:              interval,
		metrics:               metrics,
		timer:                 time.NewTimer(interval),
		done:                  make(chan struct{}),
	}
}

// Send forwards event to the inner stream and resets the silence timer
// so the heartbeat goroutine waits a fresh interval before firing
// again.
// A Send that succeeds means the transport just delivered real
// bytes; the silence window restarts from "now".
func (s *heartbeatingStream) Send(event *gatewayv1.GatewayEvent) error {
	s.sendMu.Lock()
	defer s.sendMu.Unlock()
	if err := s.ServerStreamingServer.Send(event); err != nil {
		return err
	}
	s.resetTimerLocked()
	return nil
}

// Run blocks until ctx is canceled or Stop is called, emitting one
// `gateway.heartbeat` event whenever the silence timer fires. Intended
// to run in its own goroutine alongside the tail service that owns the
// stream. A Send failure from the heartbeat path is recorded in
// telemetry and returned to the caller; production wiring discards it
// because the tail service will see the same transport failure on its
// next Send and propagate the real error to the gateway frame
// observability layer.
func (s *heartbeatingStream) Run(ctx context.Context) error {
	for {
		select {
		case <-ctx.Done():
			return nil
		case <-s.done:
			return nil
		case <-s.timer.C:
			err := s.sendHeartbeat()
			if err != nil {
				return err
			}
		}
	}
}

// Stop halts the heartbeat goroutine and drains the silence timer.
// Safe to call multiple times; subsequent calls are no-ops.
func (s *heartbeatingStream) Stop() {
	s.stopOnce.Do(func() {
		close(s.done)
		if !s.timer.Stop() {
			select {
			case <-s.timer.C:
			default:
			}
		}
	})
}

// sendHeartbeat emits one heartbeat event, records the outcome in
// telemetry, and re-arms the silence timer. The outcome attribute
// makes a sudden bump of `error` easy to spot in dashboards: it
// usually means the upstream connection is failing before the gateway
// can flush, while a steady `sent` rate is the normal idle baseline
// the deployment operator budgets bandwidth against.
func (s *heartbeatingStream) sendHeartbeat() error {
	s.sendMu.Lock()
	defer s.sendMu.Unlock()
	err := s.ServerStreamingServer.Send(buildHeartbeatEvent())
	outcome := attribute.String("outcome", "sent")
	if err != nil {
		outcome = attribute.String("outcome", "error")
	}
	s.metrics.RecordPushHeartbeat(context.Background(), outcome)
	if err != nil {
		return err
	}
	s.resetTimerLocked()
	return nil
}

// resetTimerLocked re-arms the silence timer. Caller must hold sendMu.
// The drain follows the canonical pattern from the time.Timer
// docstring: Stop may report `false` either because the timer already
// fired or because nothing was queued, so the non-blocking drain
// handles both states without deadlocking when the channel was already
// emptied by Run.
func (s *heartbeatingStream) resetTimerLocked() {
	if !s.timer.Stop() {
		select {
		case <-s.timer.C:
		default:
		}
	}
	s.timer.Reset(s.interval)
}

// buildHeartbeatEvent returns the minimal `gateway.heartbeat`
// GatewayEvent emitted into the push stream. Every field except
// EventType is left at its proto3 default so the wire frame stays as
// small as Connect framing allows. See `gatewayHeartbeatEventType` for
// the security rationale of leaving the event unsigned.
func buildHeartbeatEvent() *gatewayv1.GatewayEvent {
	return &gatewayv1.GatewayEvent{EventType: gatewayHeartbeatEventType}
}