feat(gateway): unsigned gateway.heartbeat keeps Safari push streams alive #17

Merged
developer merged 1 commits from feature/subscribe-events-heartbeat into development 2026-05-19 07:35:35 +00:00
14 changed files with 787 additions and 12 deletions
+49
View File
@@ -691,6 +691,55 @@ stream opens is `event_type = gateway.server_time`, reusing the opening
`request_id` as `event_id` and carrying `server_time_ms` so clients can `request_id` as `event_id` and carrying `server_time_ms` so clients can
calibrate offset without a separate time request. calibrate offset without a separate time request.
#### Unsigned `gateway.heartbeat` keepalive
Browser fetch-streaming layers (notably WebKit/Safari) close response
bodies they consider idle after roughly 15-30 seconds without
incoming bytes. A push stream in a quiet game (no `game.turn.ready`,
no diplomatic mail) would otherwise be torn down and reopened
repeatedly; events that fire during the reconnect window vanish
because `push.Hub` queues are not persisted across subscription
closes. To keep the body active, the gateway emits a
`gateway.heartbeat` event after `GATEWAY_PUSH_HEARTBEAT_INTERVAL` of
stream silence (default `15s`; set to `0s` to disable). Every real
event resets the silence timer, so the heartbeat fires rarely on
busy streams.
Heartbeats are sent **unsigned**: every field except `event_type` is
left at its protobuf default and no Ed25519 signature is computed.
The client short-circuits on the `gateway.heartbeat` type before
calling `verifyEvent` / `verifyPayloadHash` and never dispatches the
event to handlers. The security implication is intentional —
heartbeats carry no payload that the UI acts on, so an injected
heartbeat trivially fails to cause any user-visible state change.
TLS still protects the wire and the rest of the signed envelope is
unchanged for real events.
##### Wire cost projections
| Clients | 15 s | 30 s | 60 s |
| ------: | ---: | ---: | ---: |
| 100 | 25 MB/day | 13 MB/day | 6 MB/day |
| 1 000 | 250 MB/day | 125 MB/day | 62 MB/day |
| 10 000 | 2.5 GB/day | 1.3 GB/day | 0.6 GB/day |
| 100 000 | 25 GB/day | 12.5 GB/day | 6 GB/day |
Per-heartbeat budget at ~45 bytes on the wire (proto + Connect
framing + HTTP/2 DATA header + amortised TLS overhead), worst case
when no real event ever displaces a tick. Active streams trade
heartbeat traffic for real-event traffic 1:1, so the table is the
upper bound at the chosen interval. Larger deployments that are
willing to take a marginally higher Safari reconnect risk should
raise `GATEWAY_PUSH_HEARTBEAT_INTERVAL` toward 30 s before paying
the full table; setting `0s` reclaims all bytes at the cost of the
visible Safari reconnect loop returning.
Observability: every emission increments the
`gateway.push.heartbeats_sent{outcome}` counter, where
`outcome=sent` is the steady-state line item the operator budgets
bandwidth against and a sudden `outcome=error` bump means the
upstream connection is failing before the gateway can flush.
### Verification order at gateway ### Verification order at gateway
Before any payload is forwarded to backend, gateway must: Before any payload is forwarded to backend, gateway must:
+12
View File
@@ -820,6 +820,18 @@ internal hub. The first frame the client receives is a
gateway-signed bootstrap event carrying the current server time, so gateway-signed bootstrap event carrying the current server time, so
the client can calibrate its local clock without a separate request. the client can calibrate its local clock without a separate request.
While the stream is open, gateway tracks a silence timer; if no real
event has been forwarded for `GATEWAY_PUSH_HEARTBEAT_INTERVAL`
(default `15s`, `0s` disables), gateway emits an unsigned
`gateway.heartbeat` event to keep browser fetch-streaming layers
from closing the response body as idle. Real events reset the
timer, so on busy streams the heartbeat fires rarely. The UI client
short-circuits the heartbeat type before signature verification and
never dispatches it to handlers — see
[`docs/ARCHITECTURE.md` § 15](ARCHITECTURE.md#15-transport-security-model-gateway-boundary)
for the wire-cost projection and the security rationale of leaving
the heartbeat unsigned.
### 7.3 Backend → gateway control ### 7.3 Backend → gateway control
Backend hosts a single gRPC service `Push.SubscribePush`, consumed Backend hosts a single gRPC service `Push.SubscribePush`, consumed
+12
View File
@@ -845,6 +845,18 @@ control-канал backend → gateway, который производит эт
с текущим серверным временем, чтобы клиент мог калибровать свои с текущим серверным временем, чтобы клиент мог калибровать свои
локальные часы без отдельного запроса. локальные часы без отдельного запроса.
Пока стрим открыт, gateway отслеживает таймер тишины; если за
`GATEWAY_PUSH_HEARTBEAT_INTERVAL` (по умолчанию `15s`, `0s`
отключает) не пришло ни одного реального события, gateway
отправляет неподписанное `gateway.heartbeat`-событие, чтобы
браузерные fetch-streaming слои не закрыли response body как idle.
Реальные события сбрасывают таймер, поэтому на нагруженных
стримах heartbeat срабатывает редко. UI-клиент короткозамыкает
heartbeat-тип до верификации подписи и никогда не дотягивает его
до handlers — см.
[`docs/ARCHITECTURE.md` § 15](ARCHITECTURE.md#15-transport-security-model-gateway-boundary)
для расчёта траффика и обоснования отсутствия подписи у heartbeat.
### 7.3 Управление backend → gateway ### 7.3 Управление backend → gateway
Backend хостит единственный gRPC-сервис `Push.SubscribePush`, Backend хостит единственный gRPC-сервис `Push.SubscribePush`,
+11 -1
View File
@@ -294,7 +294,17 @@ the stream to the verified `user_id` and `device_session_id`, sends one
signed `gateway.server_time` bootstrap event whose FlatBuffers payload carries signed `gateway.server_time` bootstrap event whose FlatBuffers payload carries
`server_time_ms`, registers the active stream in the in-memory `PushHub`, and `server_time_ms`, registers the active stream in the in-memory `PushHub`, and
then forwards signed client-facing events consumed from the configured client then forwards signed client-facing events consumed from the configured client
event Redis stream. User-targeted events fan out to every active stream for event Redis stream. After the bootstrap, the stream is wrapped with a
silence-based heartbeat: when no real event has been forwarded for
`GATEWAY_PUSH_HEARTBEAT_INTERVAL` (default `15s`, set to `0s` to disable),
the gateway emits an unsigned `gateway.heartbeat` event so browser
fetch-streaming layers (Safari is the most aggressive) keep the response
body open and pending push events are not lost into the client-side
reconnect window. Each emission is counted by the
`gateway.push.heartbeats_sent{outcome}` metric; see
[`docs/ARCHITECTURE.md` § 15](../docs/ARCHITECTURE.md#15-transport-security-model-gateway-boundary)
for the bandwidth projection and the reasoning behind the unsigned
envelope. User-targeted events fan out to every active stream for
that user. Session-targeted events fan out only to streams whose that user. Session-targeted events fan out only to streams whose
`user_id` and `device_session_id` both match the event target. Each active `user_id` and `device_session_id` both match the event target. Each active
stream uses a bounded in-memory queue; when that queue overflows, only the stream uses a bounded in-memory queue; when that queue overflows, only the
+35
View File
@@ -125,6 +125,14 @@ const (
// gRPC requests. // gRPC requests.
authenticatedGRPCFreshnessWindowEnvVar = "GATEWAY_AUTHENTICATED_GRPC_FRESHNESS_WINDOW" authenticatedGRPCFreshnessWindowEnvVar = "GATEWAY_AUTHENTICATED_GRPC_FRESHNESS_WINDOW"
// pushHeartbeatIntervalEnvVar names the environment variable that
// configures the silence-based heartbeat cadence for authenticated
// push streams. The heartbeat keeps idle SubscribeEvents responses
// alive across browser fetch-streaming idle timeouts (Safari is
// notably aggressive) so push events do not disappear into the
// reconnect window. A value of `0s` disables heartbeats entirely.
pushHeartbeatIntervalEnvVar = "GATEWAY_PUSH_HEARTBEAT_INTERVAL"
// authenticatedGRPCIPRateLimitRequestsEnvVar names the environment // authenticatedGRPCIPRateLimitRequestsEnvVar names the environment
// variable that configures the authenticated gRPC per-IP request budget per // variable that configures the authenticated gRPC per-IP request budget per
// window. // window.
@@ -321,6 +329,13 @@ const (
defaultAuthenticatedGRPCDownstreamTimeout = 5 * time.Second defaultAuthenticatedGRPCDownstreamTimeout = 5 * time.Second
defaultAuthenticatedGRPCFreshnessWindow = 5 * time.Minute defaultAuthenticatedGRPCFreshnessWindow = 5 * time.Minute
// defaultPushHeartbeatInterval is the silence window the push stream
// keeps open before emitting `gateway.heartbeat`. 15s is comfortably
// below the empirical Safari fetch-streaming idle threshold
// (~15-30s) and well above any realistic per-event rate, so the
// timer is almost always reset by a real event in active games.
defaultPushHeartbeatInterval = 15 * time.Second
defaultAuthenticatedGRPCIPRateLimitRequests = 120 defaultAuthenticatedGRPCIPRateLimitRequests = 120
defaultAuthenticatedGRPCIPRateLimitBurst = 40 defaultAuthenticatedGRPCIPRateLimitBurst = 40
@@ -549,6 +564,16 @@ type AuthenticatedGRPCConfig struct {
// used for client request timestamps. // used for client request timestamps.
FreshnessWindow time.Duration FreshnessWindow time.Duration
// PushHeartbeatInterval is the silence window after which an open
// authenticated SubscribeEvents stream sends an unsigned
// `gateway.heartbeat` event. Every real Send resets the window, so
// in busy streams the heartbeat fires rarely. A zero or negative
// value disables the heartbeat — the stream then relies on
// transport-level keepalives only, which Safari's fetch-streaming
// layer ignores. See `docs/ARCHITECTURE.md` for the security
// rationale of leaving the heartbeat unsigned.
PushHeartbeatInterval time.Duration
// AntiAbuse configures the authenticated gRPC rate limits enforced after // AntiAbuse configures the authenticated gRPC rate limits enforced after
// the request passes the transport authenticity checks. // the request passes the transport authenticity checks.
AntiAbuse AuthenticatedGRPCAntiAbuseConfig AntiAbuse AuthenticatedGRPCAntiAbuseConfig
@@ -719,6 +744,7 @@ func DefaultAuthenticatedGRPCConfig() AuthenticatedGRPCConfig {
ConnectionTimeout: defaultAuthenticatedGRPCConnectionTimeout, ConnectionTimeout: defaultAuthenticatedGRPCConnectionTimeout,
DownstreamTimeout: defaultAuthenticatedGRPCDownstreamTimeout, DownstreamTimeout: defaultAuthenticatedGRPCDownstreamTimeout,
FreshnessWindow: defaultAuthenticatedGRPCFreshnessWindow, FreshnessWindow: defaultAuthenticatedGRPCFreshnessWindow,
PushHeartbeatInterval: defaultPushHeartbeatInterval,
AntiAbuse: AuthenticatedGRPCAntiAbuseConfig{ AntiAbuse: AuthenticatedGRPCAntiAbuseConfig{
IP: AuthenticatedRateLimitConfig{ IP: AuthenticatedRateLimitConfig{
Requests: defaultAuthenticatedGRPCIPRateLimitRequests, Requests: defaultAuthenticatedGRPCIPRateLimitRequests,
@@ -928,6 +954,12 @@ func LoadFromEnv() (Config, error) {
} }
cfg.AuthenticatedGRPC.DownstreamTimeout = authenticatedGRPCDownstreamTimeout cfg.AuthenticatedGRPC.DownstreamTimeout = authenticatedGRPCDownstreamTimeout
pushHeartbeatInterval, err := loadDurationEnvWithDefault(pushHeartbeatIntervalEnvVar, cfg.AuthenticatedGRPC.PushHeartbeatInterval)
if err != nil {
return Config{}, err
}
cfg.AuthenticatedGRPC.PushHeartbeatInterval = pushHeartbeatInterval
authenticatedGRPCFreshnessWindow, err := loadDurationEnvWithDefault(authenticatedGRPCFreshnessWindowEnvVar, cfg.AuthenticatedGRPC.FreshnessWindow) authenticatedGRPCFreshnessWindow, err := loadDurationEnvWithDefault(authenticatedGRPCFreshnessWindowEnvVar, cfg.AuthenticatedGRPC.FreshnessWindow)
if err != nil { if err != nil {
return Config{}, err return Config{}, err
@@ -1156,6 +1188,9 @@ func LoadFromEnv() (Config, error) {
if cfg.AuthenticatedGRPC.FreshnessWindow <= 0 { if cfg.AuthenticatedGRPC.FreshnessWindow <= 0 {
return Config{}, fmt.Errorf("load gateway config: %s must be positive", authenticatedGRPCFreshnessWindowEnvVar) return Config{}, fmt.Errorf("load gateway config: %s must be positive", authenticatedGRPCFreshnessWindowEnvVar)
} }
if cfg.AuthenticatedGRPC.PushHeartbeatInterval < 0 {
return Config{}, fmt.Errorf("load gateway config: %s must not be negative", pushHeartbeatIntervalEnvVar)
}
if err := validateRateLimitConfig( if err := validateRateLimitConfig(
cfg.AuthenticatedGRPC.AntiAbuse.IP, cfg.AuthenticatedGRPC.AntiAbuse.IP,
authenticatedGRPCIPRateLimitRequestsEnvVar, authenticatedGRPCIPRateLimitRequestsEnvVar,
+25
View File
@@ -164,6 +164,30 @@ func TestLoadFromEnvAppliesPublicAndAuthGRPCDefaults(t *testing.T) {
assert.Equal(t, defaultAuthenticatedGRPCConnectionTimeout, cfg.AuthenticatedGRPC.ConnectionTimeout) assert.Equal(t, defaultAuthenticatedGRPCConnectionTimeout, cfg.AuthenticatedGRPC.ConnectionTimeout)
assert.Equal(t, defaultAuthenticatedGRPCDownstreamTimeout, cfg.AuthenticatedGRPC.DownstreamTimeout) assert.Equal(t, defaultAuthenticatedGRPCDownstreamTimeout, cfg.AuthenticatedGRPC.DownstreamTimeout)
assert.Equal(t, defaultAuthenticatedGRPCFreshnessWindow, cfg.AuthenticatedGRPC.FreshnessWindow) assert.Equal(t, defaultAuthenticatedGRPCFreshnessWindow, cfg.AuthenticatedGRPC.FreshnessWindow)
assert.Equal(t, defaultPushHeartbeatInterval, cfg.AuthenticatedGRPC.PushHeartbeatInterval)
}
func TestLoadFromEnvParsesPushHeartbeatInterval(t *testing.T) {
configEnvMu.Lock()
defer configEnvMu.Unlock()
resetEnv(t)
setBaseRequiredEnv(t)
t.Setenv(pushHeartbeatIntervalEnvVar, "0s")
cfg, err := LoadFromEnv()
require.NoError(t, err)
assert.Equal(t, time.Duration(0), cfg.AuthenticatedGRPC.PushHeartbeatInterval, "0s explicitly disables the heartbeat")
t.Setenv(pushHeartbeatIntervalEnvVar, "25s")
cfg, err = LoadFromEnv()
require.NoError(t, err)
assert.Equal(t, 25*time.Second, cfg.AuthenticatedGRPC.PushHeartbeatInterval)
t.Setenv(pushHeartbeatIntervalEnvVar, "-1s")
_, err = LoadFromEnv()
require.Error(t, err)
assert.Contains(t, err.Error(), pushHeartbeatIntervalEnvVar)
} }
func TestLoadFromEnvParsesCORSAllowedOrigins(t *testing.T) { func TestLoadFromEnvParsesCORSAllowedOrigins(t *testing.T) {
@@ -211,6 +235,7 @@ func resetEnv(t *testing.T) {
authenticatedGRPCConnectionTimeoutEnvVar, authenticatedGRPCConnectionTimeoutEnvVar,
authenticatedGRPCDownstreamTimeoutEnvVar, authenticatedGRPCDownstreamTimeoutEnvVar,
authenticatedGRPCFreshnessWindowEnvVar, authenticatedGRPCFreshnessWindowEnvVar,
pushHeartbeatIntervalEnvVar,
gatewayRedisMasterAddrEnvVar, gatewayRedisMasterAddrEnvVar,
gatewayRedisPasswordEnvVar, gatewayRedisPasswordEnvVar,
replayRedisKeyPrefixEnvVar, replayRedisKeyPrefixEnvVar,
+163
View File
@@ -0,0 +1,163 @@
package grpcapi
import (
"context"
"sync"
"time"
"galaxy/gateway/internal/telemetry"
gatewayv1 "galaxy/gateway/proto/galaxy/gateway/v1"
"go.opentelemetry.io/otel/attribute"
"google.golang.org/grpc"
)
// heartbeatingStream wraps a server-streaming response so the inner
// stream stays alive across browser fetch-streaming idle timeouts.
// Every call to Send (a real event from a tail service) resets a
// silence timer; when the timer fires, Run emits an unsigned
// `gateway.heartbeat` event on its own. Send and the heartbeat
// goroutine serialise on the same mutex because grpc.ServerStream.Send
// is documented as not goroutine-safe.
//
// Wire-cost budgeting: each heartbeat is one GatewayEvent with only
// EventType populated (~17 bytes + protobuf tag), framed by Connect
// (~5 bytes) and HTTP/2 plus TLS overhead (~50 bytes). At the
// 15-second default a fully-idle stream costs ~840 KB/day per client;
// see `docs/ARCHITECTURE.md` for the per-scale projection.
type heartbeatingStream struct {
grpc.ServerStreamingServer[gatewayv1.GatewayEvent]
interval time.Duration
metrics *telemetry.Runtime
sendMu sync.Mutex
timer *time.Timer
stopOnce sync.Once
done chan struct{}
}
// newHeartbeatingStream wraps inner with a silence-based heartbeat
// emitter. A non-positive interval returns nil so the caller can skip
// the wrapping entirely; non-nil returns must have `Stop()` called once
// the stream lifecycle ends.
func newHeartbeatingStream(
inner grpc.ServerStreamingServer[gatewayv1.GatewayEvent],
interval time.Duration,
metrics *telemetry.Runtime,
) *heartbeatingStream {
if interval <= 0 {
return nil
}
return &heartbeatingStream{
ServerStreamingServer: inner,
interval: interval,
metrics: metrics,
timer: time.NewTimer(interval),
done: make(chan struct{}),
}
}
// Send forwards event to the inner stream and resets the silence timer
// so the heartbeat goroutine waits a fresh interval before firing
// again. A Send that succeeds means the transport just delivered real
// bytes; the silence window restarts from "now".
func (s *heartbeatingStream) Send(event *gatewayv1.GatewayEvent) error {
s.sendMu.Lock()
defer s.sendMu.Unlock()
if err := s.ServerStreamingServer.Send(event); err != nil {
return err
}
s.resetTimerLocked()
return nil
}
// Run blocks until ctx is canceled or Stop is called, emitting one
// `gateway.heartbeat` event whenever the silence timer fires. Intended
// to run in its own goroutine alongside the tail service that owns the
// stream. A Send failure from the heartbeat path is recorded in
// telemetry and returned to the caller; production wiring discards it
// because the tail service will see the same transport failure on its
// next Send and propagate the real error to the gateway frame
// observability layer.
func (s *heartbeatingStream) Run(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return nil
case <-s.done:
return nil
case <-s.timer.C:
err := s.sendHeartbeat()
if err != nil {
return err
}
}
}
}
// Stop halts the heartbeat goroutine and drains the silence timer.
// Safe to call multiple times; subsequent calls are no-ops.
func (s *heartbeatingStream) Stop() {
s.stopOnce.Do(func() {
close(s.done)
if !s.timer.Stop() {
select {
case <-s.timer.C:
default:
}
}
})
}
// sendHeartbeat emits one heartbeat event, records the outcome in
// telemetry, and re-arms the silence timer. The outcome attribute
// makes a sudden bump of `error` easy to spot in dashboards — it
// usually means the upstream connection is failing before the gateway
// can flush, while a steady `sent` rate is the normal idle baseline
// the deployment operator budgets bandwidth against.
func (s *heartbeatingStream) sendHeartbeat() error {
s.sendMu.Lock()
defer s.sendMu.Unlock()
err := s.ServerStreamingServer.Send(buildHeartbeatEvent())
outcome := attribute.String("outcome", "sent")
if err != nil {
outcome = attribute.String("outcome", "error")
}
s.metrics.RecordPushHeartbeat(context.Background(), outcome)
if err != nil {
return err
}
s.resetTimerLocked()
return nil
}
// resetTimerLocked re-arms the silence timer. Caller must hold sendMu.
// The drain follows the canonical pattern from the time.Timer
// docstring: Stop may report `false` either because the timer already
// fired or because nothing was queued, so the non-blocking drain
// handles both states without deadlocking when the channel was already
// emptied by Run.
func (s *heartbeatingStream) resetTimerLocked() {
if !s.timer.Stop() {
select {
case <-s.timer.C:
default:
}
}
s.timer.Reset(s.interval)
}
// buildHeartbeatEvent returns the minimal `gateway.heartbeat`
// GatewayEvent emitted into the push stream. Every field except
// EventType is left at its proto3 default so the wire frame stays as
// small as Connect framing allows. See `gatewayHeartbeatEventType` for
// the security rationale of leaving the event unsigned.
func buildHeartbeatEvent() *gatewayv1.GatewayEvent {
return &gatewayv1.GatewayEvent{EventType: gatewayHeartbeatEventType}
}
@@ -0,0 +1,185 @@
package grpcapi
import (
"context"
"errors"
"sync/atomic"
"testing"
"time"
gatewayv1 "galaxy/gateway/proto/galaxy/gateway/v1"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)
func TestNewHeartbeatingStreamZeroIntervalReturnsNil(t *testing.T) {
t.Parallel()
stream := newHeartbeatingStream(newCapturingStream(t), 0, nil)
assert.Nil(t, stream, "zero interval must not allocate a wrapper")
negative := newHeartbeatingStream(newCapturingStream(t), -time.Second, nil)
assert.Nil(t, negative, "negative interval must not allocate a wrapper")
}
func TestHeartbeatingStreamSendsHeartbeatAfterSilence(t *testing.T) {
t.Parallel()
inner := newCapturingStream(t)
hb := newHeartbeatingStream(inner, 30*time.Millisecond, nil)
require.NotNil(t, hb)
defer hb.Stop()
go func() { _ = hb.Run(t.Context()) }()
event := inner.recv(t, 200*time.Millisecond)
assert.Equal(t, gatewayHeartbeatEventType, event.GetEventType())
// Heartbeat envelope: only the event type travels. Every other
// field stays at proto3 default so the wire frame stays minimal.
assert.Empty(t, event.GetEventId())
assert.Zero(t, event.GetTimestampMs())
assert.Empty(t, event.GetPayloadBytes())
assert.Empty(t, event.GetPayloadHash())
assert.Empty(t, event.GetSignature())
assert.Empty(t, event.GetRequestId())
assert.Empty(t, event.GetTraceId())
}
func TestHeartbeatingStreamRealSendResetsSilenceTimer(t *testing.T) {
t.Parallel()
inner := newCapturingStream(t)
hb := newHeartbeatingStream(inner, 50*time.Millisecond, nil)
require.NotNil(t, hb)
defer hb.Stop()
go func() { _ = hb.Run(t.Context()) }()
// Reset the timer every 20ms for 120ms — the silence window never
// elapses, so the heartbeat goroutine must stay quiet and the
// channel must only carry the manual real-event Sends.
go func() {
ticker := time.NewTicker(20 * time.Millisecond)
defer ticker.Stop()
for range 6 {
<-ticker.C
if err := hb.Send(&gatewayv1.GatewayEvent{EventType: "real.event"}); err != nil {
t.Errorf("real Send failed: %v", err)
return
}
}
}()
for range 6 {
event := inner.recv(t, 100*time.Millisecond)
assert.Equal(t, "real.event", event.GetEventType(), "only real events should appear while Send keeps resetting the silence window")
}
}
func TestHeartbeatingStreamStopHaltsRun(t *testing.T) {
t.Parallel()
inner := newCapturingStream(t)
hb := newHeartbeatingStream(inner, 20*time.Millisecond, nil)
require.NotNil(t, hb)
runDone := make(chan error, 1)
go func() { runDone <- hb.Run(context.Background()) }()
hb.Stop()
select {
case err := <-runDone:
require.NoError(t, err)
case <-time.After(200 * time.Millisecond):
t.Fatal("Run did not exit after Stop")
}
// Stop is idempotent; the second call must not panic on the
// already-closed done channel.
assert.NotPanics(t, hb.Stop)
}
func TestHeartbeatingStreamContextCancelHaltsRun(t *testing.T) {
t.Parallel()
inner := newCapturingStream(t)
hb := newHeartbeatingStream(inner, 20*time.Millisecond, nil)
require.NotNil(t, hb)
defer hb.Stop()
ctx, cancel := context.WithCancel(context.Background())
runDone := make(chan error, 1)
go func() { runDone <- hb.Run(ctx) }()
cancel()
select {
case err := <-runDone:
require.NoError(t, err)
case <-time.After(200 * time.Millisecond):
t.Fatal("Run did not exit after context cancel")
}
}
func TestHeartbeatingStreamSendErrorPropagates(t *testing.T) {
t.Parallel()
wantErr := errors.New("send failed")
inner := newCapturingStream(t)
inner.sendErr.Store(&errorBox{err: wantErr})
hb := newHeartbeatingStream(inner, time.Minute, nil)
require.NotNil(t, hb)
defer hb.Stop()
err := hb.Send(&gatewayv1.GatewayEvent{EventType: "real.event"})
require.ErrorIs(t, err, wantErr)
}
// capturingStream is a minimal grpc.ServerStreamingServer that pushes
// every Send into a channel so tests can assert on the wire frame.
type capturingStream struct {
grpc.ServerStreamingServer[gatewayv1.GatewayEvent]
events chan *gatewayv1.GatewayEvent
sendErr atomic.Pointer[errorBox]
}
type errorBox struct{ err error }
func newCapturingStream(t *testing.T) *capturingStream {
t.Helper()
return &capturingStream{events: make(chan *gatewayv1.GatewayEvent, 16)}
}
func (s *capturingStream) Send(event *gatewayv1.GatewayEvent) error {
if box := s.sendErr.Load(); box != nil {
return box.err
}
s.events <- event
return nil
}
func (s *capturingStream) Context() context.Context { return context.Background() }
func (s *capturingStream) SetHeader(metadata.MD) error { return nil }
func (s *capturingStream) SendHeader(metadata.MD) error { return nil }
func (s *capturingStream) SetTrailer(metadata.MD) {}
func (s *capturingStream) SendMsg(any) error { return errors.New("capturingStream.SendMsg: unused") }
func (s *capturingStream) RecvMsg(any) error { return errors.New("capturingStream.RecvMsg: unused") }
func (s *capturingStream) recv(t *testing.T, timeout time.Duration) *gatewayv1.GatewayEvent {
t.Helper()
select {
case event := <-s.events:
return event
case <-time.After(timeout):
t.Fatalf("no event captured within %s", timeout)
return nil
}
}
+66 -9
View File
@@ -4,9 +4,11 @@ import (
"bytes" "bytes"
"context" "context"
"crypto/sha256" "crypto/sha256"
"time"
"galaxy/gateway/authn" "galaxy/gateway/authn"
"galaxy/gateway/internal/clock" "galaxy/gateway/internal/clock"
"galaxy/gateway/internal/telemetry"
gatewayv1 "galaxy/gateway/proto/galaxy/gateway/v1" gatewayv1 "galaxy/gateway/proto/galaxy/gateway/v1"
gatewayfbs "galaxy/schema/fbs/gateway" gatewayfbs "galaxy/schema/fbs/gateway"
@@ -16,7 +18,31 @@ import (
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
) )
const serverTimeEventType = "gateway.server_time" const (
serverTimeEventType = "gateway.server_time"
// gatewayHeartbeatEventType labels the silence-filling event the
// authenticated push stream emits when no real event has been Send'd
// within `AuthenticatedGRPCConfig.PushHeartbeatInterval`. Browser
// fetch-streaming layers (notably Safari) close response bodies they
// consider idle; the heartbeat keeps the body active so push events
// land on the live stream instead of disappearing into the
// client-side reconnect window.
//
// Heartbeat events are sent UNSIGNED — `EventID`, `RequestID`,
// `TraceID`, `PayloadBytes`, `PayloadHash`, `Signature`, and
// `TimestampMs` are all left at their proto3 defaults so the wire
// frame stays under ~50 bytes. The UI's EventStream short-circuits
// on this event type before signature verification (see
// `ui/frontend/src/api/events.svelte.ts`) and never dispatches it to
// handlers. The security implication is intentional and documented
// in `docs/ARCHITECTURE.md` (§ authenticated edge): an attacker who
// could inject heartbeats gains nothing — they carry no payload and
// trigger no UI behaviour, the only practical effect is keeping a
// stream marginally more alive than transport-level keepalives
// would. Real events keep the signed envelope unchanged.
gatewayHeartbeatEventType = "gateway.heartbeat"
)
// authenticatedStreamBinding captures the verified identity bound to one // authenticatedStreamBinding captures the verified identity bound to one
// authenticated SubscribeEvents stream after the full ingress pipeline // authenticated SubscribeEvents stream after the full ingress pipeline
@@ -47,12 +73,21 @@ func authenticatedStreamBindingFromContext(ctx context.Context) (authenticatedSt
// authenticatedPushStreamService owns SubscribeEvents bootstrap behavior: // authenticatedPushStreamService owns SubscribeEvents bootstrap behavior:
// bind the authenticated stream, send the initial signed server-time event, // bind the authenticated stream, send the initial signed server-time event,
// and then hand the stream lifecycle to the configured tail delegate. // and then hand the stream lifecycle to the configured tail delegate.
//
// A positive `heartbeatInterval` wraps the bound stream with
// `heartbeatingStream` before delegating, so any tail implementation
// (fan-out, hold-open, future variants) gets the silence-based
// `gateway.heartbeat` for free. The wrapper observes every real Send
// the tail performs and only emits a heartbeat when the silence window
// elapses; tails remain heartbeat-unaware.
type authenticatedPushStreamService struct { type authenticatedPushStreamService struct {
gatewayv1.UnimplementedEdgeGatewayServer gatewayv1.UnimplementedEdgeGatewayServer
tailDelegate gatewayv1.EdgeGatewayServer tailDelegate gatewayv1.EdgeGatewayServer
responseSigner authn.ResponseSigner responseSigner authn.ResponseSigner
clock clock.Clock clock clock.Clock
heartbeatInterval time.Duration
metrics *telemetry.Runtime
} }
// SubscribeEvents binds the verified stream identity, sends the initial signed // SubscribeEvents binds the verified stream identity, sends the initial signed
@@ -112,18 +147,40 @@ func (s authenticatedPushStreamService) SubscribeEvents(req *gatewayv1.Subscribe
return err return err
} }
return s.tailDelegate.SubscribeEvents(req, boundStream) var streamForTail grpc.ServerStreamingServer[gatewayv1.GatewayEvent] = boundStream
if hbStream := newHeartbeatingStream(boundStream, s.heartbeatInterval, s.metrics); hbStream != nil {
defer hbStream.Stop()
go func() {
// Heartbeat Send failures imply the transport is already
// dead — the tail's next Send will hit the same error and
// surface through the gateway observability layer, so we
// discard the returned error here and rely on that path
// for the canonical failure record.
_ = hbStream.Run(stream.Context())
}()
streamForTail = hbStream
}
return s.tailDelegate.SubscribeEvents(req, streamForTail)
} }
func newAuthenticatedPushStreamService(tailDelegate gatewayv1.EdgeGatewayServer, responseSigner authn.ResponseSigner, clk clock.Clock) gatewayv1.EdgeGatewayServer { func newAuthenticatedPushStreamService(
tailDelegate gatewayv1.EdgeGatewayServer,
responseSigner authn.ResponseSigner,
clk clock.Clock,
heartbeatInterval time.Duration,
metrics *telemetry.Runtime,
) gatewayv1.EdgeGatewayServer {
if tailDelegate == nil { if tailDelegate == nil {
tailDelegate = holdOpenSubscribeEventsService{} tailDelegate = holdOpenSubscribeEventsService{}
} }
return authenticatedPushStreamService{ return authenticatedPushStreamService{
tailDelegate: tailDelegate, tailDelegate: tailDelegate,
responseSigner: responseSigner, responseSigner: responseSigner,
clock: clk, clock: clk,
heartbeatInterval: heartbeatInterval,
metrics: metrics,
} }
} }
+7 -1
View File
@@ -111,7 +111,13 @@ func NewServer(cfg config.AuthenticatedGRPCConfig, deps ServerDependencies) *Ser
deps = normalizeServerDependencies(deps) deps = normalizeServerDependencies(deps)
finalService := newCommandRoutingService( finalService := newCommandRoutingService(
newAuthenticatedPushStreamService(deps.Service, deps.ResponseSigner, deps.Clock), newAuthenticatedPushStreamService(
deps.Service,
deps.ResponseSigner,
deps.Clock,
cfg.PushHeartbeatInterval,
deps.Telemetry,
),
deps.Router, deps.Router,
deps.ResponseSigner, deps.ResponseSigner,
deps.Clock, deps.Clock,
+96
View File
@@ -174,6 +174,102 @@ func TestSubscribeEventsValidEnvelopeSendsBootstrapEventAndWaitsForCancellation(
assert.Equal(t, connect.CodeCanceled, connect.CodeOf(recvErr)) assert.Equal(t, connect.CodeCanceled, connect.CodeOf(recvErr))
} }
func TestSubscribeEventsEmitsHeartbeatAfterSilenceWindow(t *testing.T) {
t.Parallel()
grpcCfg := config.DefaultAuthenticatedGRPCConfig()
grpcCfg.Addr = "127.0.0.1:0"
grpcCfg.FreshnessWindow = testFreshnessWindow
// 30 ms keeps the test inside the standard 60-second go test
// timeout while still giving the heartbeat goroutine enough
// headroom to fire after the bootstrap server-time event lands.
grpcCfg.PushHeartbeatInterval = 30 * time.Millisecond
server, runGateway := newTestGatewayWithGRPCConfig(t, grpcCfg, ServerDependencies{
SessionCache: staticSessionCache{
lookupFunc: func(context.Context, string) (session.Record, error) {
return newActiveSessionRecord(), nil
},
},
ReplayStore: staticReplayStore{},
})
defer runGateway.stop(t)
addr := waitForListenAddr(t, server)
client := newEdgeClient(t, addr)
stream, err := client.SubscribeEvents(t.Context(), connect.NewRequest(newValidSubscribeEventsRequest()))
require.NoError(t, err)
t.Cleanup(func() { _ = stream.Close() })
bootstrap := recvBootstrapEvent(t, stream)
assertServerTimeBootstrapEvent(t, bootstrap, newTestResponseSignerPublicKey(), "request-123", "trace-123", testCurrentTime.UnixMilli())
// The next frame must be the unsigned heartbeat. Every field
// except EventType is left at its proto3 default — the UI side
// short-circuits on EventType BEFORE signature verification, so
// any non-empty signature would be wasted bytes on the wire.
require.True(t, stream.Receive(), "stream did not deliver a heartbeat: %v", stream.Err())
heartbeat := stream.Msg()
assert.Equal(t, gatewayHeartbeatEventType, heartbeat.GetEventType())
assert.Empty(t, heartbeat.GetEventId())
assert.Zero(t, heartbeat.GetTimestampMs())
assert.Empty(t, heartbeat.GetPayloadBytes())
assert.Empty(t, heartbeat.GetPayloadHash())
assert.Empty(t, heartbeat.GetSignature())
assert.Empty(t, heartbeat.GetRequestId())
assert.Empty(t, heartbeat.GetTraceId())
}
func TestSubscribeEventsZeroHeartbeatIntervalDisablesEmission(t *testing.T) {
t.Parallel()
grpcCfg := config.DefaultAuthenticatedGRPCConfig()
grpcCfg.Addr = "127.0.0.1:0"
grpcCfg.FreshnessWindow = testFreshnessWindow
grpcCfg.PushHeartbeatInterval = 0
server, runGateway := newTestGatewayWithGRPCConfig(t, grpcCfg, ServerDependencies{
SessionCache: staticSessionCache{
lookupFunc: func(context.Context, string) (session.Record, error) {
return newActiveSessionRecord(), nil
},
},
ReplayStore: staticReplayStore{},
})
defer runGateway.stop(t)
addr := waitForListenAddr(t, server)
client := newEdgeClient(t, addr)
stream, err := client.SubscribeEvents(t.Context(), connect.NewRequest(newValidSubscribeEventsRequest()))
require.NoError(t, err)
t.Cleanup(func() { _ = stream.Close() })
bootstrap := recvBootstrapEvent(t, stream)
assertServerTimeBootstrapEvent(t, bootstrap, newTestResponseSignerPublicKey(), "request-123", "trace-123", testCurrentTime.UnixMilli())
// No heartbeat is expected — the stream must stay silent. A
// background Receive races a deadline check so the test fails
// fast if the gateway ever sends a second frame on this stream.
recvResult := make(chan error, 1)
go func() {
if stream.Receive() {
recvResult <- errors.New("stream produced unexpected event")
return
}
recvResult <- stream.Err()
}()
require.Never(t, func() bool {
select {
case <-recvResult:
return true
default:
return false
}
}, 200*time.Millisecond, 20*time.Millisecond, "heartbeat fired despite zero interval")
}
func TestSubscribeEventsMissingReplayStoreFailsClosed(t *testing.T) { func TestSubscribeEventsMissingReplayStoreFailsClosed(t *testing.T) {
t.Parallel() t.Parallel()
+19
View File
@@ -46,6 +46,7 @@ type Runtime struct {
// Push instruments. // Push instruments.
pushActiveStreams metric.Int64UpDownCounter pushActiveStreams metric.Int64UpDownCounter
pushStreamClosers metric.Int64Counter pushStreamClosers metric.Int64Counter
pushHeartbeats metric.Int64Counter
// Internal event consumer instruments. // Internal event consumer instruments.
internalEventDrops metric.Int64Counter internalEventDrops metric.Int64Counter
@@ -120,6 +121,10 @@ func New(ctx context.Context, logger *zap.Logger) (*Runtime, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
pushHeartbeats, err := meter.Int64Counter("gateway.push.heartbeats_sent")
if err != nil {
return nil, err
}
internalEventDrops, err := meter.Int64Counter("gateway.internal_event_drops") internalEventDrops, err := meter.Int64Counter("gateway.internal_event_drops")
if err != nil { if err != nil {
return nil, err return nil, err
@@ -136,6 +141,7 @@ func New(ctx context.Context, logger *zap.Logger) (*Runtime, error) {
grpcDuration: grpcDuration, grpcDuration: grpcDuration,
pushActiveStreams: pushActiveStreams, pushActiveStreams: pushActiveStreams,
pushStreamClosers: pushStreamClosers, pushStreamClosers: pushStreamClosers,
pushHeartbeats: pushHeartbeats,
internalEventDrops: internalEventDrops, internalEventDrops: internalEventDrops,
}, nil }, nil
} }
@@ -228,6 +234,19 @@ func (r *Runtime) RecordPushStreamClosure(ctx context.Context, attrs ...attribut
r.pushStreamClosers.Add(ctx, 1, metric.WithAttributes(attrs...)) r.pushStreamClosers.Add(ctx, 1, metric.WithAttributes(attrs...))
} }
// RecordPushHeartbeat records one outbound push-stream heartbeat event.
// The `outcome` attribute should distinguish a successful Send from a
// transport-level failure so the metric stays useful for bandwidth
// budgeting (most heartbeats are `sent`; a sudden bump of `error` means
// the upstream connection is failing before the gateway can flush).
func (r *Runtime) RecordPushHeartbeat(ctx context.Context, attrs ...attribute.KeyValue) {
if r == nil {
return
}
r.pushHeartbeats.Add(ctx, 1, metric.WithAttributes(attrs...))
}
// RecordInternalEventDrop records one malformed or rejected internal event. // RecordInternalEventDrop records one malformed or rejected internal event.
func (r *Runtime) RecordInternalEventDrop(ctx context.Context, attrs ...attribute.KeyValue) { func (r *Runtime) RecordInternalEventDrop(ctx context.Context, attrs ...attribute.KeyValue) {
if r == nil { if r == nil {
+25 -1
View File
@@ -24,7 +24,6 @@ import { ConnectError } from "@connectrpc/connect";
import type { Core } from "../platform/core/index"; import type { Core } from "../platform/core/index";
import type { DeviceKeypair } from "../platform/store/index"; import type { DeviceKeypair } from "../platform/store/index";
import { import {
GatewayEventSchema,
SubscribeEventsRequestSchema, SubscribeEventsRequestSchema,
type GatewayEvent, type GatewayEvent,
} from "../proto/galaxy/gateway/v1/edge_gateway_pb"; } from "../proto/galaxy/gateway/v1/edge_gateway_pb";
@@ -35,6 +34,17 @@ import { createEdgeGatewayClient, type EdgeGatewayClient } from "./connect";
const PROTOCOL_VERSION = "v1"; const PROTOCOL_VERSION = "v1";
const SUBSCRIBE_MESSAGE_TYPE = "gateway.subscribe"; const SUBSCRIBE_MESSAGE_TYPE = "gateway.subscribe";
// HEARTBEAT_EVENT_TYPE matches `gatewayHeartbeatEventType` on the
// gateway side. The server emits this unsigned event with only the
// `eventType` field set when the push stream has been silent for the
// configured interval — its sole purpose is to keep browser fetch
// streaming layers (Safari is notably aggressive on idle timeouts)
// from closing the response body and causing push events to disappear
// into the reconnect window. The client SHORT-CIRCUITS on this event
// type before signature verification or handler dispatch: the payload
// is empty by design and there is no signature to validate.
const HEARTBEAT_EVENT_TYPE = "gateway.heartbeat";
// Connect error code numerical values used by the watcher. The full // Connect error code numerical values used by the watcher. The full
// enum lives in `@connectrpc/connect` but importing the runtime enum // enum lives in `@connectrpc/connect` but importing the runtime enum
// would pull a large surface into this small module. // would pull a large surface into this small module.
@@ -207,6 +217,20 @@ export class EventStream {
if (signal.aborted) { if (signal.aborted) {
return; return;
} }
if (event.eventType === HEARTBEAT_EVENT_TYPE) {
// Heartbeats are unsigned by design (see the
// constant's docstring) and carry no payload — skip
// verification + dispatch. They still count as
// connection proof: receiving one means the
// stream is healthy, so reset backoff like any
// other first event.
if (!firstEventSeen) {
firstEventSeen = true;
this.connectionStatus = "connected";
attempt = 0;
}
continue;
}
this.verifyEvent(event, opts); this.verifyEvent(event, opts);
if (!firstEventSeen) { if (!firstEventSeen) {
firstEventSeen = true; firstEventSeen = true;
+82
View File
@@ -321,6 +321,88 @@ describe("EventStream", () => {
eventStream.stop(); eventStream.stop();
}); });
test("gateway.heartbeat is short-circuited before verification and dispatch", async () => {
const handler = vi.fn();
eventStream.on("game.turn.ready", handler);
// A heartbeat with an empty signature (matching the unsigned
// wire shape the gateway emits) would normally trip
// `verifyEvent` and tear the stream down with a SignatureError.
// The short-circuit skips both verification and dispatch.
const heartbeat = create(GatewayEventSchema, {
eventType: "gateway.heartbeat",
eventId: "",
timestampMs: 0n,
payloadBytes: new Uint8Array(0),
payloadHash: new Uint8Array(0),
signature: new Uint8Array(0),
requestId: "",
traceId: "",
});
const realEvent = buildEvent(
"game.turn.ready",
new TextEncoder().encode("{}"),
);
const verifyEventSpy = vi.fn(() => true);
const verifyPayloadHashSpy = vi.fn(() => true);
const core = mockCore({
verifyEvent: verifyEventSpy,
verifyPayloadHash: verifyPayloadHashSpy,
});
const client = makeRouter(async function* () {
yield heartbeat;
yield realEvent;
});
eventStream.start({
core,
keypair: mockKeypair(),
deviceSessionId: "device-1",
gatewayResponsePublicKey: new Uint8Array(32),
client,
sleep: async () => {},
random: () => 0,
});
await vi.waitFor(() => {
expect(handler).toHaveBeenCalled();
});
// Verification ran exactly once — for the real event, never
// for the heartbeat. The handler also only sees the real one.
expect(verifyEventSpy).toHaveBeenCalledTimes(1);
expect(verifyPayloadHashSpy).toHaveBeenCalledTimes(1);
expect(handler).toHaveBeenCalledTimes(1);
expect(handler.mock.calls[0]?.[0].eventType).toBe("game.turn.ready");
eventStream.stop();
});
test("a leading heartbeat still flips connectionStatus to connected", async () => {
const heartbeat = create(GatewayEventSchema, {
eventType: "gateway.heartbeat",
});
// The stream yields only a heartbeat then waits for the test
// to abort the consumer — the `connected` transition must not
// require a real event behind the heartbeat.
const pending = new Promise<never>(() => {});
const client = makeRouter(async function* () {
yield heartbeat;
await pending;
});
eventStream.start({
core: mockCore(),
keypair: mockKeypair(),
deviceSessionId: "device-1",
gatewayResponsePublicKey: new Uint8Array(32),
client,
sleep: async () => {},
random: () => 0,
});
await vi.waitFor(() => {
expect(eventStream.connectionStatus).toBe("connected");
});
eventStream.stop();
});
test("connectionStatus transitions through connecting → connected → idle", async () => { test("connectionStatus transitions through connecting → connected → idle", async () => {
expect(eventStream.connectionStatus).toBe("idle"); expect(eventStream.connectionStatus).toBe("idle");
const event = buildEvent( const event = buildEvent(