feat(gateway): unsigned gateway.heartbeat keeps Safari push streams alive

Browser fetch-streaming layers close response bodies they consider
idle after roughly 15-30 s without incoming bytes. Safari is the
most aggressive, but the symptom matters everywhere: a quiet
SubscribeEvents stream (lobby, between turns, mailbox empty) gets
torn down by the browser, the EventStream singleton reconnects with
backoff, and any push event that fires inside the reconnect window
is lost because `push.Hub` queues are not persisted across
subscription closes. The user-visible failure mode is the
intermittent "Fetch API cannot load … due to access control checks"
console error (a misleading WebKit symptom — CORS headers are
actually present) plus missed turn-ready / mail-received toasts.

Server-side fix: a silence-based heartbeat at the
`authenticatedPushStreamService` wrapper layer. After the signed
`gateway.server_time` bootstrap event, gateway wraps the bound
stream with `heartbeatingStream`. Every tail Send (fan-out, future
variants) resets the silence timer; when the timer elapses, a
goroutine emits `gateway.heartbeat` with only `EventType` set —
everything else stays at proto3 defaults, so the wire frame is
~45 bytes amortised. A `sendMu` serialises the heartbeat goroutine
with tail Sends because gRPC does not allow concurrent sends on the
same stream from different goroutines.

The heartbeat is intentionally UNSIGNED: heartbeats carry no
payload and dispatch to no handler on the client, so an injected
heartbeat causes no user-visible state change. TLS still
protects the wire and real events keep the signed envelope
unchanged. Documented in `docs/ARCHITECTURE.md` § 15 alongside the
per-scale bandwidth projection (100…100 000 clients × 15…60 s).

Config: new `GATEWAY_PUSH_HEARTBEAT_INTERVAL` (default `15s`,
`0s` disables). Telemetry: new
`gateway.push.heartbeats_sent{outcome}` counter so operators can
budget bandwidth and spot a sudden `outcome=error` bump as an
upstream-failing-before-flush signal.

Client (`ui/frontend/src/api/events.svelte.ts`): early `continue`
on `event.eventType === "gateway.heartbeat"` before `verifyEvent`,
`verifyPayloadHash`, or dispatch, because the empty signature would
otherwise trip SignatureError and force a reconnect. A leading heartbeat still flips
`connectionStatus` to `connected` and resets backoff, because
receiving one is proof the stream is healthy.

Tests:
- `push_heartbeat_test.go`: unit tests for the wrapper — zero
  interval returns nil, heartbeat fires after silence, real Send
  resets the timer, Stop / context-cancel halt the goroutine,
  Send errors propagate.
- `server_test.go`: integration tests through the full gateway
  pipeline — heartbeat fires after the configured silence window,
  zero interval keeps the stream silent.
- `config_test.go`: default applied, env-override parsed,
  negative value rejected.
- `events.test.ts`: heartbeat skipped before verification + not
  dispatched to handlers; leading heartbeat still flips
  `connectionStatus` to `connected`.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Ilia Denisov
2026-05-19 09:29:29 +02:00
parent 8f84075c4b
commit 14b65389ef
14 changed files with 787 additions and 12 deletions
+11 -1
@@ -294,7 +294,17 @@ the stream to the verified `user_id` and `device_session_id`, sends one
signed `gateway.server_time` bootstrap event whose FlatBuffers payload carries
`server_time_ms`, registers the active stream in the in-memory `PushHub`, and
then forwards signed client-facing events consumed from the configured client
event Redis stream. User-targeted events fan out to every active stream for
event Redis stream. After the bootstrap, the stream is wrapped with a
silence-based heartbeat: when no real event has been forwarded for
`GATEWAY_PUSH_HEARTBEAT_INTERVAL` (default `15s`, set to `0s` to disable),
the gateway emits an unsigned `gateway.heartbeat` event so browser
fetch-streaming layers (Safari is the most aggressive) keep the response
body open and pending push events are not lost into the client-side
reconnect window. Each emission is counted by the
`gateway.push.heartbeats_sent{outcome}` metric; see
[`docs/ARCHITECTURE.md` § 15](../docs/ARCHITECTURE.md#15-transport-security-model-gateway-boundary)
for the bandwidth projection and the reasoning behind the unsigned
envelope. User-targeted events fan out to every active stream for
that user. Session-targeted events fan out only to streams whose
`user_id` and `device_session_id` both match the event target. Each active
stream uses a bounded in-memory queue; when that queue overflows, only the
+35
@@ -125,6 +125,14 @@ const (
// gRPC requests.
authenticatedGRPCFreshnessWindowEnvVar = "GATEWAY_AUTHENTICATED_GRPC_FRESHNESS_WINDOW"
// pushHeartbeatIntervalEnvVar names the environment variable that
// configures the silence-based heartbeat cadence for authenticated
// push streams. The heartbeat keeps idle SubscribeEvents responses
// alive across browser fetch-streaming idle timeouts (Safari is
// notably aggressive) so push events do not disappear into the
// reconnect window. A value of `0s` disables heartbeats entirely.
pushHeartbeatIntervalEnvVar = "GATEWAY_PUSH_HEARTBEAT_INTERVAL"
// authenticatedGRPCIPRateLimitRequestsEnvVar names the environment
// variable that configures the authenticated gRPC per-IP request budget per
// window.
@@ -321,6 +329,13 @@ const (
defaultAuthenticatedGRPCDownstreamTimeout = 5 * time.Second
defaultAuthenticatedGRPCFreshnessWindow = 5 * time.Minute
// defaultPushHeartbeatInterval is the silence window the push stream
// tolerates before emitting `gateway.heartbeat`. 15s sits at the lower
// edge of the empirical Safari fetch-streaming idle threshold
// (~15-30s) and is well above the gap between events in an active
// game, so the timer is almost always reset by a real event there.
defaultPushHeartbeatInterval = 15 * time.Second
defaultAuthenticatedGRPCIPRateLimitRequests = 120
defaultAuthenticatedGRPCIPRateLimitBurst = 40
@@ -549,6 +564,16 @@ type AuthenticatedGRPCConfig struct {
// used for client request timestamps.
FreshnessWindow time.Duration
// PushHeartbeatInterval is the silence window after which an open
// authenticated SubscribeEvents stream sends an unsigned
// `gateway.heartbeat` event. Every real Send resets the window, so
// in busy streams the heartbeat fires rarely. A zero or negative
// value disables the heartbeat — the stream then relies on
// transport-level keepalives only, which Safari's fetch-streaming
// layer ignores. See `docs/ARCHITECTURE.md` for the security
// rationale of leaving the heartbeat unsigned.
PushHeartbeatInterval time.Duration
// AntiAbuse configures the authenticated gRPC rate limits enforced after
// the request passes the transport authenticity checks.
AntiAbuse AuthenticatedGRPCAntiAbuseConfig
@@ -719,6 +744,7 @@ func DefaultAuthenticatedGRPCConfig() AuthenticatedGRPCConfig {
ConnectionTimeout: defaultAuthenticatedGRPCConnectionTimeout,
DownstreamTimeout: defaultAuthenticatedGRPCDownstreamTimeout,
FreshnessWindow: defaultAuthenticatedGRPCFreshnessWindow,
PushHeartbeatInterval: defaultPushHeartbeatInterval,
AntiAbuse: AuthenticatedGRPCAntiAbuseConfig{
IP: AuthenticatedRateLimitConfig{
Requests: defaultAuthenticatedGRPCIPRateLimitRequests,
@@ -928,6 +954,12 @@ func LoadFromEnv() (Config, error) {
}
cfg.AuthenticatedGRPC.DownstreamTimeout = authenticatedGRPCDownstreamTimeout
pushHeartbeatInterval, err := loadDurationEnvWithDefault(pushHeartbeatIntervalEnvVar, cfg.AuthenticatedGRPC.PushHeartbeatInterval)
if err != nil {
return Config{}, err
}
cfg.AuthenticatedGRPC.PushHeartbeatInterval = pushHeartbeatInterval
authenticatedGRPCFreshnessWindow, err := loadDurationEnvWithDefault(authenticatedGRPCFreshnessWindowEnvVar, cfg.AuthenticatedGRPC.FreshnessWindow)
if err != nil {
return Config{}, err
@@ -1156,6 +1188,9 @@ func LoadFromEnv() (Config, error) {
if cfg.AuthenticatedGRPC.FreshnessWindow <= 0 {
return Config{}, fmt.Errorf("load gateway config: %s must be positive", authenticatedGRPCFreshnessWindowEnvVar)
}
if cfg.AuthenticatedGRPC.PushHeartbeatInterval < 0 {
return Config{}, fmt.Errorf("load gateway config: %s must not be negative", pushHeartbeatIntervalEnvVar)
}
if err := validateRateLimitConfig(
cfg.AuthenticatedGRPC.AntiAbuse.IP,
authenticatedGRPCIPRateLimitRequestsEnvVar,
+25
@@ -164,6 +164,30 @@ func TestLoadFromEnvAppliesPublicAndAuthGRPCDefaults(t *testing.T) {
assert.Equal(t, defaultAuthenticatedGRPCConnectionTimeout, cfg.AuthenticatedGRPC.ConnectionTimeout)
assert.Equal(t, defaultAuthenticatedGRPCDownstreamTimeout, cfg.AuthenticatedGRPC.DownstreamTimeout)
assert.Equal(t, defaultAuthenticatedGRPCFreshnessWindow, cfg.AuthenticatedGRPC.FreshnessWindow)
assert.Equal(t, defaultPushHeartbeatInterval, cfg.AuthenticatedGRPC.PushHeartbeatInterval)
}
func TestLoadFromEnvParsesPushHeartbeatInterval(t *testing.T) {
configEnvMu.Lock()
defer configEnvMu.Unlock()
resetEnv(t)
setBaseRequiredEnv(t)
t.Setenv(pushHeartbeatIntervalEnvVar, "0s")
cfg, err := LoadFromEnv()
require.NoError(t, err)
assert.Equal(t, time.Duration(0), cfg.AuthenticatedGRPC.PushHeartbeatInterval, "0s explicitly disables the heartbeat")
t.Setenv(pushHeartbeatIntervalEnvVar, "25s")
cfg, err = LoadFromEnv()
require.NoError(t, err)
assert.Equal(t, 25*time.Second, cfg.AuthenticatedGRPC.PushHeartbeatInterval)
t.Setenv(pushHeartbeatIntervalEnvVar, "-1s")
_, err = LoadFromEnv()
require.Error(t, err)
assert.Contains(t, err.Error(), pushHeartbeatIntervalEnvVar)
}
func TestLoadFromEnvParsesCORSAllowedOrigins(t *testing.T) {
@@ -211,6 +235,7 @@ func resetEnv(t *testing.T) {
authenticatedGRPCConnectionTimeoutEnvVar,
authenticatedGRPCDownstreamTimeoutEnvVar,
authenticatedGRPCFreshnessWindowEnvVar,
pushHeartbeatIntervalEnvVar,
gatewayRedisMasterAddrEnvVar,
gatewayRedisPasswordEnvVar,
replayRedisKeyPrefixEnvVar,
+163
@@ -0,0 +1,163 @@
package grpcapi
import (
"context"
"sync"
"time"
"galaxy/gateway/internal/telemetry"
gatewayv1 "galaxy/gateway/proto/galaxy/gateway/v1"
"go.opentelemetry.io/otel/attribute"
"google.golang.org/grpc"
)
// heartbeatingStream wraps a server-streaming response so the inner
// stream stays alive across browser fetch-streaming idle timeouts.
// Every call to Send (a real event from a tail service) resets a
// silence timer; when the timer fires, Run emits an unsigned
// `gateway.heartbeat` event on its own. Send and the heartbeat
// goroutine serialise on the same mutex because gRPC documents
// concurrent SendMsg calls on one stream as unsafe.
//
// Wire-cost budgeting: each heartbeat is one GatewayEvent with only
// EventType populated (~17 bytes + protobuf tag), framed by Connect
// (~5 bytes) and HTTP/2 plus TLS overhead (~50 bytes). At the
// 15-second default a fully-idle stream costs ~840 KB/day per client;
// see `docs/ARCHITECTURE.md` for the per-scale projection.
type heartbeatingStream struct {
grpc.ServerStreamingServer[gatewayv1.GatewayEvent]
interval time.Duration
metrics *telemetry.Runtime
sendMu sync.Mutex
timer *time.Timer
stopOnce sync.Once
done chan struct{}
}
// newHeartbeatingStream wraps inner with a silence-based heartbeat
// emitter. A non-positive interval returns nil so the caller can skip
// the wrapping entirely; non-nil returns must have `Stop()` called once
// the stream lifecycle ends.
func newHeartbeatingStream(
inner grpc.ServerStreamingServer[gatewayv1.GatewayEvent],
interval time.Duration,
metrics *telemetry.Runtime,
) *heartbeatingStream {
if interval <= 0 {
return nil
}
return &heartbeatingStream{
ServerStreamingServer: inner,
interval: interval,
metrics: metrics,
timer: time.NewTimer(interval),
done: make(chan struct{}),
}
}
// Send forwards event to the inner stream and resets the silence timer
// so the heartbeat goroutine waits a fresh interval before firing
// again. A Send that succeeds means the transport just delivered real
// bytes; the silence window restarts from "now".
func (s *heartbeatingStream) Send(event *gatewayv1.GatewayEvent) error {
s.sendMu.Lock()
defer s.sendMu.Unlock()
if err := s.ServerStreamingServer.Send(event); err != nil {
return err
}
s.resetTimerLocked()
return nil
}
// Run blocks until ctx is canceled or Stop is called, emitting one
// `gateway.heartbeat` event whenever the silence timer fires. Intended
// to run in its own goroutine alongside the tail service that owns the
// stream. A Send failure from the heartbeat path is recorded in
// telemetry and returned to the caller; production wiring discards it
// because the tail service will see the same transport failure on its
// next Send and propagate the real error to the gateway frame
// observability layer.
func (s *heartbeatingStream) Run(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return nil
case <-s.done:
return nil
case <-s.timer.C:
err := s.sendHeartbeat()
if err != nil {
return err
}
}
}
}
// Stop halts the heartbeat goroutine and drains the silence timer.
// Safe to call multiple times; subsequent calls are no-ops.
func (s *heartbeatingStream) Stop() {
s.stopOnce.Do(func() {
close(s.done)
if !s.timer.Stop() {
select {
case <-s.timer.C:
default:
}
}
})
}
// sendHeartbeat emits one heartbeat event, records the outcome in
// telemetry, and re-arms the silence timer. The outcome attribute
// makes a sudden bump of `error` easy to spot in dashboards — it
// usually means the upstream connection is failing before the gateway
// can flush, while a steady `sent` rate is the normal idle baseline
// the deployment operator budgets bandwidth against.
func (s *heartbeatingStream) sendHeartbeat() error {
s.sendMu.Lock()
defer s.sendMu.Unlock()
err := s.ServerStreamingServer.Send(buildHeartbeatEvent())
outcome := attribute.String("outcome", "sent")
if err != nil {
outcome = attribute.String("outcome", "error")
}
s.metrics.RecordPushHeartbeat(context.Background(), outcome)
if err != nil {
return err
}
s.resetTimerLocked()
return nil
}
// resetTimerLocked re-arms the silence timer. Caller must hold sendMu.
// The drain follows the canonical pattern from the time.Timer
// docstring: Stop may report `false` either because the timer already
// fired or because nothing was queued, so the non-blocking drain
// handles both states without deadlocking when the channel was already
// emptied by Run.
func (s *heartbeatingStream) resetTimerLocked() {
if !s.timer.Stop() {
select {
case <-s.timer.C:
default:
}
}
s.timer.Reset(s.interval)
}
// buildHeartbeatEvent returns the minimal `gateway.heartbeat`
// GatewayEvent emitted into the push stream. Every field except
// EventType is left at its proto3 default so the wire frame stays as
// small as Connect framing allows. See `gatewayHeartbeatEventType` for
// the security rationale of leaving the event unsigned.
func buildHeartbeatEvent() *gatewayv1.GatewayEvent {
return &gatewayv1.GatewayEvent{EventType: gatewayHeartbeatEventType}
}
@@ -0,0 +1,185 @@
package grpcapi
import (
"context"
"errors"
"sync/atomic"
"testing"
"time"
gatewayv1 "galaxy/gateway/proto/galaxy/gateway/v1"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)
func TestNewHeartbeatingStreamZeroIntervalReturnsNil(t *testing.T) {
t.Parallel()
stream := newHeartbeatingStream(newCapturingStream(t), 0, nil)
assert.Nil(t, stream, "zero interval must not allocate a wrapper")
negative := newHeartbeatingStream(newCapturingStream(t), -time.Second, nil)
assert.Nil(t, negative, "negative interval must not allocate a wrapper")
}
func TestHeartbeatingStreamSendsHeartbeatAfterSilence(t *testing.T) {
t.Parallel()
inner := newCapturingStream(t)
hb := newHeartbeatingStream(inner, 30*time.Millisecond, nil)
require.NotNil(t, hb)
defer hb.Stop()
go func() { _ = hb.Run(t.Context()) }()
event := inner.recv(t, 200*time.Millisecond)
assert.Equal(t, gatewayHeartbeatEventType, event.GetEventType())
// Heartbeat envelope: only the event type travels. Every other
// field stays at proto3 default so the wire frame stays minimal.
assert.Empty(t, event.GetEventId())
assert.Zero(t, event.GetTimestampMs())
assert.Empty(t, event.GetPayloadBytes())
assert.Empty(t, event.GetPayloadHash())
assert.Empty(t, event.GetSignature())
assert.Empty(t, event.GetRequestId())
assert.Empty(t, event.GetTraceId())
}
func TestHeartbeatingStreamRealSendResetsSilenceTimer(t *testing.T) {
t.Parallel()
inner := newCapturingStream(t)
hb := newHeartbeatingStream(inner, 50*time.Millisecond, nil)
require.NotNil(t, hb)
defer hb.Stop()
go func() { _ = hb.Run(t.Context()) }()
// Reset the timer every 20ms for 120ms — the silence window never
// elapses, so the heartbeat goroutine must stay quiet and the
// channel must only carry the manual real-event Sends.
go func() {
ticker := time.NewTicker(20 * time.Millisecond)
defer ticker.Stop()
for range 6 {
<-ticker.C
if err := hb.Send(&gatewayv1.GatewayEvent{EventType: "real.event"}); err != nil {
t.Errorf("real Send failed: %v", err)
return
}
}
}()
for range 6 {
event := inner.recv(t, 100*time.Millisecond)
assert.Equal(t, "real.event", event.GetEventType(), "only real events should appear while Send keeps resetting the silence window")
}
}
func TestHeartbeatingStreamStopHaltsRun(t *testing.T) {
t.Parallel()
inner := newCapturingStream(t)
hb := newHeartbeatingStream(inner, 20*time.Millisecond, nil)
require.NotNil(t, hb)
runDone := make(chan error, 1)
go func() { runDone <- hb.Run(context.Background()) }()
hb.Stop()
select {
case err := <-runDone:
require.NoError(t, err)
case <-time.After(200 * time.Millisecond):
t.Fatal("Run did not exit after Stop")
}
// Stop is idempotent; the second call must not panic on the
// already-closed done channel.
assert.NotPanics(t, hb.Stop)
}
func TestHeartbeatingStreamContextCancelHaltsRun(t *testing.T) {
t.Parallel()
inner := newCapturingStream(t)
hb := newHeartbeatingStream(inner, 20*time.Millisecond, nil)
require.NotNil(t, hb)
defer hb.Stop()
ctx, cancel := context.WithCancel(context.Background())
runDone := make(chan error, 1)
go func() { runDone <- hb.Run(ctx) }()
cancel()
select {
case err := <-runDone:
require.NoError(t, err)
case <-time.After(200 * time.Millisecond):
t.Fatal("Run did not exit after context cancel")
}
}
func TestHeartbeatingStreamSendErrorPropagates(t *testing.T) {
t.Parallel()
wantErr := errors.New("send failed")
inner := newCapturingStream(t)
inner.sendErr.Store(&errorBox{err: wantErr})
hb := newHeartbeatingStream(inner, time.Minute, nil)
require.NotNil(t, hb)
defer hb.Stop()
err := hb.Send(&gatewayv1.GatewayEvent{EventType: "real.event"})
require.ErrorIs(t, err, wantErr)
}
// capturingStream is a minimal grpc.ServerStreamingServer that pushes
// every Send into a channel so tests can assert on the wire frame.
type capturingStream struct {
grpc.ServerStreamingServer[gatewayv1.GatewayEvent]
events chan *gatewayv1.GatewayEvent
sendErr atomic.Pointer[errorBox]
}
type errorBox struct{ err error }
func newCapturingStream(t *testing.T) *capturingStream {
t.Helper()
return &capturingStream{events: make(chan *gatewayv1.GatewayEvent, 16)}
}
func (s *capturingStream) Send(event *gatewayv1.GatewayEvent) error {
if box := s.sendErr.Load(); box != nil {
return box.err
}
s.events <- event
return nil
}
func (s *capturingStream) Context() context.Context { return context.Background() }
func (s *capturingStream) SetHeader(metadata.MD) error { return nil }
func (s *capturingStream) SendHeader(metadata.MD) error { return nil }
func (s *capturingStream) SetTrailer(metadata.MD) {}
func (s *capturingStream) SendMsg(any) error { return errors.New("capturingStream.SendMsg: unused") }
func (s *capturingStream) RecvMsg(any) error { return errors.New("capturingStream.RecvMsg: unused") }
func (s *capturingStream) recv(t *testing.T, timeout time.Duration) *gatewayv1.GatewayEvent {
t.Helper()
select {
case event := <-s.events:
return event
case <-time.After(timeout):
t.Fatalf("no event captured within %s", timeout)
return nil
}
}
+66 -9
@@ -4,9 +4,11 @@ import (
"bytes"
"context"
"crypto/sha256"
"time"
"galaxy/gateway/authn"
"galaxy/gateway/internal/clock"
"galaxy/gateway/internal/telemetry"
gatewayv1 "galaxy/gateway/proto/galaxy/gateway/v1"
gatewayfbs "galaxy/schema/fbs/gateway"
@@ -16,7 +18,31 @@ import (
"google.golang.org/grpc/status"
)
const serverTimeEventType = "gateway.server_time"
const (
serverTimeEventType = "gateway.server_time"
// gatewayHeartbeatEventType labels the silence-filling event the
// authenticated push stream emits when no real event has been sent
// within `AuthenticatedGRPCConfig.PushHeartbeatInterval`. Browser
// fetch-streaming layers (notably Safari) close response bodies they
// consider idle; the heartbeat keeps the body active so push events
// land on the live stream instead of disappearing into the
// client-side reconnect window.
//
// Heartbeat events are sent UNSIGNED — `EventID`, `RequestID`,
// `TraceID`, `PayloadBytes`, `PayloadHash`, `Signature`, and
// `TimestampMs` are all left at their proto3 defaults so the wire
// frame stays under ~50 bytes. The UI's EventStream short-circuits
// on this event type before signature verification (see
// `ui/frontend/src/api/events.svelte.ts`) and never dispatches it to
// handlers. The security implication is intentional and documented
// in `docs/ARCHITECTURE.md` (§ authenticated edge): an attacker who
// could inject heartbeats gains nothing: they carry no payload and
// trigger no UI behaviour, and the only practical effect is keeping a
// stream marginally more alive than transport-level keepalives
// would. Real events keep the signed envelope unchanged.
gatewayHeartbeatEventType = "gateway.heartbeat"
)
// authenticatedStreamBinding captures the verified identity bound to one
// authenticated SubscribeEvents stream after the full ingress pipeline
@@ -47,12 +73,21 @@ func authenticatedStreamBindingFromContext(ctx context.Context) (authenticatedSt
// authenticatedPushStreamService owns SubscribeEvents bootstrap behavior:
// bind the authenticated stream, send the initial signed server-time event,
// and then hand the stream lifecycle to the configured tail delegate.
//
// A positive `heartbeatInterval` wraps the bound stream with
// `heartbeatingStream` before delegating, so any tail implementation
// (fan-out, hold-open, future variants) gets the silence-based
// `gateway.heartbeat` for free. The wrapper observes every real Send
// the tail performs and only emits a heartbeat when the silence window
// elapses; tails remain heartbeat-unaware.
type authenticatedPushStreamService struct {
gatewayv1.UnimplementedEdgeGatewayServer
tailDelegate gatewayv1.EdgeGatewayServer
responseSigner authn.ResponseSigner
clock clock.Clock
tailDelegate gatewayv1.EdgeGatewayServer
responseSigner authn.ResponseSigner
clock clock.Clock
heartbeatInterval time.Duration
metrics *telemetry.Runtime
}
// SubscribeEvents binds the verified stream identity, sends the initial signed
@@ -112,18 +147,40 @@ func (s authenticatedPushStreamService) SubscribeEvents(req *gatewayv1.Subscribe
return err
}
return s.tailDelegate.SubscribeEvents(req, boundStream)
var streamForTail grpc.ServerStreamingServer[gatewayv1.GatewayEvent] = boundStream
if hbStream := newHeartbeatingStream(boundStream, s.heartbeatInterval, s.metrics); hbStream != nil {
defer hbStream.Stop()
go func() {
// Heartbeat Send failures imply the transport is already
// dead — the tail's next Send will hit the same error and
// surface through the gateway observability layer, so we
// discard the returned error here and rely on that path
// for the canonical failure record.
_ = hbStream.Run(stream.Context())
}()
streamForTail = hbStream
}
return s.tailDelegate.SubscribeEvents(req, streamForTail)
}
func newAuthenticatedPushStreamService(tailDelegate gatewayv1.EdgeGatewayServer, responseSigner authn.ResponseSigner, clk clock.Clock) gatewayv1.EdgeGatewayServer {
func newAuthenticatedPushStreamService(
tailDelegate gatewayv1.EdgeGatewayServer,
responseSigner authn.ResponseSigner,
clk clock.Clock,
heartbeatInterval time.Duration,
metrics *telemetry.Runtime,
) gatewayv1.EdgeGatewayServer {
if tailDelegate == nil {
tailDelegate = holdOpenSubscribeEventsService{}
}
return authenticatedPushStreamService{
tailDelegate: tailDelegate,
responseSigner: responseSigner,
clock: clk,
tailDelegate: tailDelegate,
responseSigner: responseSigner,
clock: clk,
heartbeatInterval: heartbeatInterval,
metrics: metrics,
}
}
+7 -1
@@ -111,7 +111,13 @@ func NewServer(cfg config.AuthenticatedGRPCConfig, deps ServerDependencies) *Ser
deps = normalizeServerDependencies(deps)
finalService := newCommandRoutingService(
newAuthenticatedPushStreamService(deps.Service, deps.ResponseSigner, deps.Clock),
newAuthenticatedPushStreamService(
deps.Service,
deps.ResponseSigner,
deps.Clock,
cfg.PushHeartbeatInterval,
deps.Telemetry,
),
deps.Router,
deps.ResponseSigner,
deps.Clock,
+96
@@ -174,6 +174,102 @@ func TestSubscribeEventsValidEnvelopeSendsBootstrapEventAndWaitsForCancellation(
assert.Equal(t, connect.CodeCanceled, connect.CodeOf(recvErr))
}
func TestSubscribeEventsEmitsHeartbeatAfterSilenceWindow(t *testing.T) {
t.Parallel()
grpcCfg := config.DefaultAuthenticatedGRPCConfig()
grpcCfg.Addr = "127.0.0.1:0"
grpcCfg.FreshnessWindow = testFreshnessWindow
// 30 ms keeps the test well inside the go test timeout while
// still giving the heartbeat goroutine enough headroom to fire
// after the bootstrap server-time event lands.
grpcCfg.PushHeartbeatInterval = 30 * time.Millisecond
server, runGateway := newTestGatewayWithGRPCConfig(t, grpcCfg, ServerDependencies{
SessionCache: staticSessionCache{
lookupFunc: func(context.Context, string) (session.Record, error) {
return newActiveSessionRecord(), nil
},
},
ReplayStore: staticReplayStore{},
})
defer runGateway.stop(t)
addr := waitForListenAddr(t, server)
client := newEdgeClient(t, addr)
stream, err := client.SubscribeEvents(t.Context(), connect.NewRequest(newValidSubscribeEventsRequest()))
require.NoError(t, err)
t.Cleanup(func() { _ = stream.Close() })
bootstrap := recvBootstrapEvent(t, stream)
assertServerTimeBootstrapEvent(t, bootstrap, newTestResponseSignerPublicKey(), "request-123", "trace-123", testCurrentTime.UnixMilli())
// The next frame must be the unsigned heartbeat. Every field
// except EventType is left at its proto3 default — the UI side
// short-circuits on EventType BEFORE signature verification, so
// any non-empty signature would be wasted bytes on the wire.
require.True(t, stream.Receive(), "stream did not deliver a heartbeat: %v", stream.Err())
heartbeat := stream.Msg()
assert.Equal(t, gatewayHeartbeatEventType, heartbeat.GetEventType())
assert.Empty(t, heartbeat.GetEventId())
assert.Zero(t, heartbeat.GetTimestampMs())
assert.Empty(t, heartbeat.GetPayloadBytes())
assert.Empty(t, heartbeat.GetPayloadHash())
assert.Empty(t, heartbeat.GetSignature())
assert.Empty(t, heartbeat.GetRequestId())
assert.Empty(t, heartbeat.GetTraceId())
}
func TestSubscribeEventsZeroHeartbeatIntervalDisablesEmission(t *testing.T) {
t.Parallel()
grpcCfg := config.DefaultAuthenticatedGRPCConfig()
grpcCfg.Addr = "127.0.0.1:0"
grpcCfg.FreshnessWindow = testFreshnessWindow
grpcCfg.PushHeartbeatInterval = 0
server, runGateway := newTestGatewayWithGRPCConfig(t, grpcCfg, ServerDependencies{
SessionCache: staticSessionCache{
lookupFunc: func(context.Context, string) (session.Record, error) {
return newActiveSessionRecord(), nil
},
},
ReplayStore: staticReplayStore{},
})
defer runGateway.stop(t)
addr := waitForListenAddr(t, server)
client := newEdgeClient(t, addr)
stream, err := client.SubscribeEvents(t.Context(), connect.NewRequest(newValidSubscribeEventsRequest()))
require.NoError(t, err)
t.Cleanup(func() { _ = stream.Close() })
bootstrap := recvBootstrapEvent(t, stream)
assertServerTimeBootstrapEvent(t, bootstrap, newTestResponseSignerPublicKey(), "request-123", "trace-123", testCurrentTime.UnixMilli())
// No heartbeat is expected — the stream must stay silent. A
// background Receive races a deadline check so the test fails
// fast if the gateway ever sends a second frame on this stream.
recvResult := make(chan error, 1)
go func() {
if stream.Receive() {
recvResult <- errors.New("stream produced unexpected event")
return
}
recvResult <- stream.Err()
}()
require.Never(t, func() bool {
select {
case <-recvResult:
return true
default:
return false
}
}, 200*time.Millisecond, 20*time.Millisecond, "heartbeat fired despite zero interval")
}
func TestSubscribeEventsMissingReplayStoreFailsClosed(t *testing.T) {
t.Parallel()
+19
@@ -46,6 +46,7 @@ type Runtime struct {
// Push instruments.
pushActiveStreams metric.Int64UpDownCounter
pushStreamClosers metric.Int64Counter
pushHeartbeats metric.Int64Counter
// Internal event consumer instruments.
internalEventDrops metric.Int64Counter
@@ -120,6 +121,10 @@ func New(ctx context.Context, logger *zap.Logger) (*Runtime, error) {
if err != nil {
return nil, err
}
pushHeartbeats, err := meter.Int64Counter("gateway.push.heartbeats_sent")
if err != nil {
return nil, err
}
internalEventDrops, err := meter.Int64Counter("gateway.internal_event_drops")
if err != nil {
return nil, err
@@ -136,6 +141,7 @@ func New(ctx context.Context, logger *zap.Logger) (*Runtime, error) {
grpcDuration: grpcDuration,
pushActiveStreams: pushActiveStreams,
pushStreamClosers: pushStreamClosers,
pushHeartbeats: pushHeartbeats,
internalEventDrops: internalEventDrops,
}, nil
}
@@ -228,6 +234,19 @@ func (r *Runtime) RecordPushStreamClosure(ctx context.Context, attrs ...attribut
r.pushStreamClosers.Add(ctx, 1, metric.WithAttributes(attrs...))
}
// RecordPushHeartbeat records one outbound push-stream heartbeat event.
// The `outcome` attribute should distinguish a successful Send from a
// transport-level failure so the metric stays useful for bandwidth
// budgeting (most heartbeats are `sent`; a sudden bump of `error` means
// the upstream connection is failing before the gateway can flush).
func (r *Runtime) RecordPushHeartbeat(ctx context.Context, attrs ...attribute.KeyValue) {
if r == nil {
return
}
r.pushHeartbeats.Add(ctx, 1, metric.WithAttributes(attrs...))
}
// RecordInternalEventDrop records one malformed or rejected internal event.
func (r *Runtime) RecordInternalEventDrop(ctx context.Context, attrs ...attribute.KeyValue) {
if r == nil {