diff --git a/docs/ARCHITECTURE.md b/docs/ARCHITECTURE.md index 275bc9b..aa76cb0 100644 --- a/docs/ARCHITECTURE.md +++ b/docs/ARCHITECTURE.md @@ -691,6 +691,55 @@ stream opens is `event_type = gateway.server_time`, reusing the opening `request_id` as `event_id` and carrying `server_time_ms` so clients can calibrate offset without a separate time request. +#### Unsigned `gateway.heartbeat` keepalive + +Browser fetch-streaming layers (notably WebKit/Safari) close response +bodies they consider idle after roughly 15-30 seconds without +incoming bytes. A push stream in a quiet game (no `game.turn.ready`, +no diplomatic mail) would otherwise be torn down and reopened +repeatedly; events that fire during the reconnect window vanish +because `push.Hub` queues are not persisted across subscription +closes. To keep the body active, the gateway emits a +`gateway.heartbeat` event after `GATEWAY_PUSH_HEARTBEAT_INTERVAL` of +stream silence (default `15s`; set to `0s` to disable). Every real +event resets the silence timer, so the heartbeat fires rarely on +busy streams. + +Heartbeats are sent **unsigned**: every field except `event_type` is +left at its protobuf default and no Ed25519 signature is computed. +The client short-circuits on the `gateway.heartbeat` type before +calling `verifyEvent` / `verifyPayloadHash` and never dispatches the +event to handlers. The security implication is intentional — +heartbeats carry no payload that the UI acts on, so an injected +heartbeat trivially fails to cause any user-visible state change. +TLS still protects the wire and the rest of the signed envelope is +unchanged for real events. + +##### Wire cost projections + +| Clients | 15 s | 30 s | 60 s | +| ------: | ---: | ---: | ---: | +| 100 | 25 MB/day | 13 MB/day | 6 MB/day | +| 1 000 | 250 MB/day | 125 MB/day | 62 MB/day | +| 10 000 | 2.5 GB/day | 1.3 GB/day | 0.6 GB/day | +| 100 000 | 25 GB/day | 12.5 GB/day | 6 GB/day | + +Per-heartbeat budget at ~45 bytes on the wire (proto + Connect +framing + HTTP/2 DATA header + amortised TLS overhead), worst case +when no real event ever displaces a tick. Active streams trade +heartbeat traffic for real-event traffic 1:1, so the table is the +upper bound at the chosen interval. Larger deployments that are +willing to take a marginally higher Safari reconnect risk should +raise `GATEWAY_PUSH_HEARTBEAT_INTERVAL` toward 30 s before paying +the full table; setting `0s` reclaims all bytes at the cost of the +visible Safari reconnect loop returning. + +Observability: every emission increments the +`gateway.push.heartbeats_sent{outcome}` counter, where +`outcome=sent` is the steady-state line item the operator budgets +bandwidth against and a sudden `outcome=error` bump means the +upstream connection is failing before the gateway can flush. + ### Verification order at gateway Before any payload is forwarded to backend, gateway must: diff --git a/docs/FUNCTIONAL.md b/docs/FUNCTIONAL.md index 3947aee..4ef3e42 100644 --- a/docs/FUNCTIONAL.md +++ b/docs/FUNCTIONAL.md @@ -820,6 +820,18 @@ internal hub. The first frame the client receives is a gateway-signed bootstrap event carrying the current server time, so the client can calibrate its local clock without a separate request. +While the stream is open, gateway tracks a silence timer; if no real +event has been forwarded for `GATEWAY_PUSH_HEARTBEAT_INTERVAL` +(default `15s`, `0s` disables), gateway emits an unsigned +`gateway.heartbeat` event to keep browser fetch-streaming layers +from closing the response body as idle. Real events reset the +timer, so on busy streams the heartbeat fires rarely. The UI client +short-circuits the heartbeat type before signature verification and +never dispatches it to handlers — see +[`docs/ARCHITECTURE.md` § 15](ARCHITECTURE.md#15-transport-security-model-gateway-boundary) +for the wire-cost projection and the security rationale of leaving +the heartbeat unsigned. + ### 7.3 Backend → gateway control Backend hosts a single gRPC service `Push.SubscribePush`, consumed diff --git a/docs/FUNCTIONAL_ru.md b/docs/FUNCTIONAL_ru.md index 58a64e9..220ba37 100644 --- a/docs/FUNCTIONAL_ru.md +++ b/docs/FUNCTIONAL_ru.md @@ -845,6 +845,18 @@ control-канал backend → gateway, который производит эт с текущим серверным временем, чтобы клиент мог калибровать свои локальные часы без отдельного запроса. +Пока стрим открыт, gateway отслеживает таймер тишины; если за +`GATEWAY_PUSH_HEARTBEAT_INTERVAL` (по умолчанию `15s`, `0s` +отключает) не пришло ни одного реального события, gateway +отправляет неподписанное `gateway.heartbeat`-событие, чтобы +браузерные fetch-streaming слои не закрыли response body как idle. +Реальные события сбрасывают таймер, поэтому на нагруженных +стримах heartbeat срабатывает редко. UI-клиент короткозамыкает +heartbeat-тип до верификации подписи и никогда не дотягивает его +до handlers — см. +[`docs/ARCHITECTURE.md` § 15](ARCHITECTURE.md#15-transport-security-model-gateway-boundary) +для расчёта траффика и обоснования отсутствия подписи у heartbeat. + ### 7.3 Управление backend → gateway Backend хостит единственный gRPC-сервис `Push.SubscribePush`, diff --git a/gateway/README.md b/gateway/README.md index b357877..878d8bf 100644 --- a/gateway/README.md +++ b/gateway/README.md @@ -294,7 +294,17 @@ the stream to the verified `user_id` and `device_session_id`, sends one signed `gateway.server_time` bootstrap event whose FlatBuffers payload carries `server_time_ms`, registers the active stream in the in-memory `PushHub`, and then forwards signed client-facing events consumed from the configured client -event Redis stream. User-targeted events fan out to every active stream for +event Redis stream. After the bootstrap, the stream is wrapped with a +silence-based heartbeat: when no real event has been forwarded for +`GATEWAY_PUSH_HEARTBEAT_INTERVAL` (default `15s`, set to `0s` to disable), +the gateway emits an unsigned `gateway.heartbeat` event so browser +fetch-streaming layers (Safari is the most aggressive) keep the response +body open and pending push events are not lost into the client-side +reconnect window. Each emission is counted by the +`gateway.push.heartbeats_sent{outcome}` metric; see +[`docs/ARCHITECTURE.md` § 15](../docs/ARCHITECTURE.md#15-transport-security-model-gateway-boundary) +for the bandwidth projection and the reasoning behind the unsigned +envelope. User-targeted events fan out to every active stream for that user. Session-targeted events fan out only to streams whose `user_id` and `device_session_id` both match the event target. Each active stream uses a bounded in-memory queue; when that queue overflows, only the diff --git a/gateway/internal/config/config.go b/gateway/internal/config/config.go index 128e400..3d697f6 100644 --- a/gateway/internal/config/config.go +++ b/gateway/internal/config/config.go @@ -125,6 +125,14 @@ const ( // gRPC requests. authenticatedGRPCFreshnessWindowEnvVar = "GATEWAY_AUTHENTICATED_GRPC_FRESHNESS_WINDOW" + // pushHeartbeatIntervalEnvVar names the environment variable that + // configures the silence-based heartbeat cadence for authenticated + // push streams. The heartbeat keeps idle SubscribeEvents responses + // alive across browser fetch-streaming idle timeouts (Safari is + // notably aggressive) so push events do not disappear into the + // reconnect window. A value of `0s` disables heartbeats entirely. + pushHeartbeatIntervalEnvVar = "GATEWAY_PUSH_HEARTBEAT_INTERVAL" + // authenticatedGRPCIPRateLimitRequestsEnvVar names the environment // variable that configures the authenticated gRPC per-IP request budget per // window. @@ -321,6 +329,13 @@ const ( defaultAuthenticatedGRPCDownstreamTimeout = 5 * time.Second defaultAuthenticatedGRPCFreshnessWindow = 5 * time.Minute + // defaultPushHeartbeatInterval is the silence window the push stream + // keeps open before emitting `gateway.heartbeat`. 15s is comfortably + // below the empirical Safari fetch-streaming idle threshold + // (~15-30s) and well above any realistic per-event rate, so the + // timer is almost always reset by a real event in active games. + defaultPushHeartbeatInterval = 15 * time.Second + defaultAuthenticatedGRPCIPRateLimitRequests = 120 defaultAuthenticatedGRPCIPRateLimitBurst = 40 @@ -549,6 +564,16 @@ type AuthenticatedGRPCConfig struct { // used for client request timestamps. FreshnessWindow time.Duration + // PushHeartbeatInterval is the silence window after which an open + // authenticated SubscribeEvents stream sends an unsigned + // `gateway.heartbeat` event. Every real Send resets the window, so + // in busy streams the heartbeat fires rarely. A zero or negative + // value disables the heartbeat — the stream then relies on + // transport-level keepalives only, which Safari's fetch-streaming + // layer ignores. See `docs/ARCHITECTURE.md` for the security + // rationale of leaving the heartbeat unsigned. + PushHeartbeatInterval time.Duration + // AntiAbuse configures the authenticated gRPC rate limits enforced after // the request passes the transport authenticity checks. AntiAbuse AuthenticatedGRPCAntiAbuseConfig @@ -719,6 +744,7 @@ func DefaultAuthenticatedGRPCConfig() AuthenticatedGRPCConfig { ConnectionTimeout: defaultAuthenticatedGRPCConnectionTimeout, DownstreamTimeout: defaultAuthenticatedGRPCDownstreamTimeout, FreshnessWindow: defaultAuthenticatedGRPCFreshnessWindow, + PushHeartbeatInterval: defaultPushHeartbeatInterval, AntiAbuse: AuthenticatedGRPCAntiAbuseConfig{ IP: AuthenticatedRateLimitConfig{ Requests: defaultAuthenticatedGRPCIPRateLimitRequests, @@ -928,6 +954,12 @@ func LoadFromEnv() (Config, error) { } cfg.AuthenticatedGRPC.DownstreamTimeout = authenticatedGRPCDownstreamTimeout + pushHeartbeatInterval, err := loadDurationEnvWithDefault(pushHeartbeatIntervalEnvVar, cfg.AuthenticatedGRPC.PushHeartbeatInterval) + if err != nil { + return Config{}, err + } + cfg.AuthenticatedGRPC.PushHeartbeatInterval = pushHeartbeatInterval + authenticatedGRPCFreshnessWindow, err := loadDurationEnvWithDefault(authenticatedGRPCFreshnessWindowEnvVar, cfg.AuthenticatedGRPC.FreshnessWindow) if err != nil { return Config{}, err @@ -1156,6 +1188,9 @@ func LoadFromEnv() (Config, error) { if cfg.AuthenticatedGRPC.FreshnessWindow <= 0 { return Config{}, fmt.Errorf("load gateway config: %s must be positive", authenticatedGRPCFreshnessWindowEnvVar) } + if cfg.AuthenticatedGRPC.PushHeartbeatInterval < 0 { + return Config{}, fmt.Errorf("load gateway config: %s must not be negative", pushHeartbeatIntervalEnvVar) + } if err := validateRateLimitConfig( cfg.AuthenticatedGRPC.AntiAbuse.IP, authenticatedGRPCIPRateLimitRequestsEnvVar, diff --git a/gateway/internal/config/config_test.go b/gateway/internal/config/config_test.go index 5e1e423..7ffa517 100644 --- a/gateway/internal/config/config_test.go +++ b/gateway/internal/config/config_test.go @@ -164,6 +164,30 @@ func TestLoadFromEnvAppliesPublicAndAuthGRPCDefaults(t *testing.T) { assert.Equal(t, defaultAuthenticatedGRPCConnectionTimeout, cfg.AuthenticatedGRPC.ConnectionTimeout) assert.Equal(t, defaultAuthenticatedGRPCDownstreamTimeout, cfg.AuthenticatedGRPC.DownstreamTimeout) assert.Equal(t, defaultAuthenticatedGRPCFreshnessWindow, cfg.AuthenticatedGRPC.FreshnessWindow) + assert.Equal(t, defaultPushHeartbeatInterval, cfg.AuthenticatedGRPC.PushHeartbeatInterval) +} + +func TestLoadFromEnvParsesPushHeartbeatInterval(t *testing.T) { + configEnvMu.Lock() + defer configEnvMu.Unlock() + + resetEnv(t) + setBaseRequiredEnv(t) + t.Setenv(pushHeartbeatIntervalEnvVar, "0s") + + cfg, err := LoadFromEnv() + require.NoError(t, err) + assert.Equal(t, time.Duration(0), cfg.AuthenticatedGRPC.PushHeartbeatInterval, "0s explicitly disables the heartbeat") + + t.Setenv(pushHeartbeatIntervalEnvVar, "25s") + cfg, err = LoadFromEnv() + require.NoError(t, err) + assert.Equal(t, 25*time.Second, cfg.AuthenticatedGRPC.PushHeartbeatInterval) + + t.Setenv(pushHeartbeatIntervalEnvVar, "-1s") + _, err = LoadFromEnv() + require.Error(t, err) + assert.Contains(t, err.Error(), pushHeartbeatIntervalEnvVar) } func TestLoadFromEnvParsesCORSAllowedOrigins(t *testing.T) { @@ -211,6 +235,7 @@ func resetEnv(t *testing.T) { authenticatedGRPCConnectionTimeoutEnvVar, authenticatedGRPCDownstreamTimeoutEnvVar, authenticatedGRPCFreshnessWindowEnvVar, + pushHeartbeatIntervalEnvVar, gatewayRedisMasterAddrEnvVar, gatewayRedisPasswordEnvVar, replayRedisKeyPrefixEnvVar, diff --git a/gateway/internal/grpcapi/push_heartbeat.go b/gateway/internal/grpcapi/push_heartbeat.go new file mode 100644 index 0000000..41ba3cf --- /dev/null +++ b/gateway/internal/grpcapi/push_heartbeat.go @@ -0,0 +1,163 @@ +package grpcapi + +import ( + "context" + "sync" + "time" + + "galaxy/gateway/internal/telemetry" + gatewayv1 "galaxy/gateway/proto/galaxy/gateway/v1" + + "go.opentelemetry.io/otel/attribute" + "google.golang.org/grpc" +) + +// heartbeatingStream wraps a server-streaming response so the inner +// stream stays alive across browser fetch-streaming idle timeouts. +// Every call to Send (a real event from a tail service) resets a +// silence timer; when the timer fires, Run emits an unsigned +// `gateway.heartbeat` event on its own. Send and the heartbeat +// goroutine serialise on the same mutex because grpc.ServerStream.Send +// is documented as not goroutine-safe. +// +// Wire-cost budgeting: each heartbeat is one GatewayEvent with only +// EventType populated (~17 bytes + protobuf tag), framed by Connect +// (~5 bytes) and HTTP/2 plus TLS overhead (~50 bytes). At the +// 15-second default a fully-idle stream costs ~840 KB/day per client; +// see `docs/ARCHITECTURE.md` for the per-scale projection. +type heartbeatingStream struct { + grpc.ServerStreamingServer[gatewayv1.GatewayEvent] + + interval time.Duration + metrics *telemetry.Runtime + + sendMu sync.Mutex + timer *time.Timer + + stopOnce sync.Once + done chan struct{} +} + +// newHeartbeatingStream wraps inner with a silence-based heartbeat +// emitter. A non-positive interval returns nil so the caller can skip +// the wrapping entirely; non-nil returns must have `Stop()` called once +// the stream lifecycle ends. +func newHeartbeatingStream( + inner grpc.ServerStreamingServer[gatewayv1.GatewayEvent], + interval time.Duration, + metrics *telemetry.Runtime, +) *heartbeatingStream { + if interval <= 0 { + return nil + } + + return &heartbeatingStream{ + ServerStreamingServer: inner, + interval: interval, + metrics: metrics, + timer: time.NewTimer(interval), + done: make(chan struct{}), + } +} + +// Send forwards event to the inner stream and resets the silence timer +// so the heartbeat goroutine waits a fresh interval before firing +// again. A Send that succeeds means the transport just delivered real +// bytes; the silence window restarts from "now". +func (s *heartbeatingStream) Send(event *gatewayv1.GatewayEvent) error { + s.sendMu.Lock() + defer s.sendMu.Unlock() + if err := s.ServerStreamingServer.Send(event); err != nil { + return err + } + s.resetTimerLocked() + + return nil +} + +// Run blocks until ctx is canceled or Stop is called, emitting one +// `gateway.heartbeat` event whenever the silence timer fires. Intended +// to run in its own goroutine alongside the tail service that owns the +// stream. A Send failure from the heartbeat path is recorded in +// telemetry and returned to the caller; production wiring discards it +// because the tail service will see the same transport failure on its +// next Send and propagate the real error to the gateway frame +// observability layer. +func (s *heartbeatingStream) Run(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + return nil + case <-s.done: + return nil + case <-s.timer.C: + err := s.sendHeartbeat() + if err != nil { + return err + } + } + } +} + +// Stop halts the heartbeat goroutine and drains the silence timer. +// Safe to call multiple times; subsequent calls are no-ops. +func (s *heartbeatingStream) Stop() { + s.stopOnce.Do(func() { + close(s.done) + if !s.timer.Stop() { + select { + case <-s.timer.C: + default: + } + } + }) +} + +// sendHeartbeat emits one heartbeat event, records the outcome in +// telemetry, and re-arms the silence timer. The outcome attribute +// makes a sudden bump of `error` easy to spot in dashboards — it +// usually means the upstream connection is failing before the gateway +// can flush, while a steady `sent` rate is the normal idle baseline +// the deployment operator budgets bandwidth against. +func (s *heartbeatingStream) sendHeartbeat() error { + s.sendMu.Lock() + defer s.sendMu.Unlock() + + err := s.ServerStreamingServer.Send(buildHeartbeatEvent()) + outcome := attribute.String("outcome", "sent") + if err != nil { + outcome = attribute.String("outcome", "error") + } + s.metrics.RecordPushHeartbeat(context.Background(), outcome) + if err != nil { + return err + } + s.resetTimerLocked() + + return nil +} + +// resetTimerLocked re-arms the silence timer. Caller must hold sendMu. +// The drain follows the canonical pattern from the time.Timer +// docstring: Stop may report `false` either because the timer already +// fired or because nothing was queued, so the non-blocking drain +// handles both states without deadlocking when the channel was already +// emptied by Run. +func (s *heartbeatingStream) resetTimerLocked() { + if !s.timer.Stop() { + select { + case <-s.timer.C: + default: + } + } + s.timer.Reset(s.interval) +} + +// buildHeartbeatEvent returns the minimal `gateway.heartbeat` +// GatewayEvent emitted into the push stream. Every field except +// EventType is left at its proto3 default so the wire frame stays as +// small as Connect framing allows. See `gatewayHeartbeatEventType` for +// the security rationale of leaving the event unsigned. +func buildHeartbeatEvent() *gatewayv1.GatewayEvent { + return &gatewayv1.GatewayEvent{EventType: gatewayHeartbeatEventType} +} diff --git a/gateway/internal/grpcapi/push_heartbeat_test.go b/gateway/internal/grpcapi/push_heartbeat_test.go new file mode 100644 index 0000000..9d571f7 --- /dev/null +++ b/gateway/internal/grpcapi/push_heartbeat_test.go @@ -0,0 +1,185 @@ +package grpcapi + +import ( + "context" + "errors" + "sync/atomic" + "testing" + "time" + + gatewayv1 "galaxy/gateway/proto/galaxy/gateway/v1" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" +) + +func TestNewHeartbeatingStreamZeroIntervalReturnsNil(t *testing.T) { + t.Parallel() + + stream := newHeartbeatingStream(newCapturingStream(t), 0, nil) + assert.Nil(t, stream, "zero interval must not allocate a wrapper") + + negative := newHeartbeatingStream(newCapturingStream(t), -time.Second, nil) + assert.Nil(t, negative, "negative interval must not allocate a wrapper") +} + +func TestHeartbeatingStreamSendsHeartbeatAfterSilence(t *testing.T) { + t.Parallel() + + inner := newCapturingStream(t) + hb := newHeartbeatingStream(inner, 30*time.Millisecond, nil) + require.NotNil(t, hb) + defer hb.Stop() + + go func() { _ = hb.Run(t.Context()) }() + + event := inner.recv(t, 200*time.Millisecond) + assert.Equal(t, gatewayHeartbeatEventType, event.GetEventType()) + // Heartbeat envelope: only the event type travels. Every other + // field stays at proto3 default so the wire frame stays minimal. + assert.Empty(t, event.GetEventId()) + assert.Zero(t, event.GetTimestampMs()) + assert.Empty(t, event.GetPayloadBytes()) + assert.Empty(t, event.GetPayloadHash()) + assert.Empty(t, event.GetSignature()) + assert.Empty(t, event.GetRequestId()) + assert.Empty(t, event.GetTraceId()) +} + +func TestHeartbeatingStreamRealSendResetsSilenceTimer(t *testing.T) { + t.Parallel() + + inner := newCapturingStream(t) + hb := newHeartbeatingStream(inner, 50*time.Millisecond, nil) + require.NotNil(t, hb) + defer hb.Stop() + + go func() { _ = hb.Run(t.Context()) }() + + // Reset the timer every 20ms for 120ms — the silence window never + // elapses, so the heartbeat goroutine must stay quiet and the + // channel must only carry the manual real-event Sends. + go func() { + ticker := time.NewTicker(20 * time.Millisecond) + defer ticker.Stop() + for range 6 { + <-ticker.C + if err := hb.Send(&gatewayv1.GatewayEvent{EventType: "real.event"}); err != nil { + t.Errorf("real Send failed: %v", err) + return + } + } + }() + + for range 6 { + event := inner.recv(t, 100*time.Millisecond) + assert.Equal(t, "real.event", event.GetEventType(), "only real events should appear while Send keeps resetting the silence window") + } +} + +func TestHeartbeatingStreamStopHaltsRun(t *testing.T) { + t.Parallel() + + inner := newCapturingStream(t) + hb := newHeartbeatingStream(inner, 20*time.Millisecond, nil) + require.NotNil(t, hb) + + runDone := make(chan error, 1) + go func() { runDone <- hb.Run(context.Background()) }() + + hb.Stop() + select { + case err := <-runDone: + require.NoError(t, err) + case <-time.After(200 * time.Millisecond): + t.Fatal("Run did not exit after Stop") + } + + // Stop is idempotent; the second call must not panic on the + // already-closed done channel. + assert.NotPanics(t, hb.Stop) +} + +func TestHeartbeatingStreamContextCancelHaltsRun(t *testing.T) { + t.Parallel() + + inner := newCapturingStream(t) + hb := newHeartbeatingStream(inner, 20*time.Millisecond, nil) + require.NotNil(t, hb) + defer hb.Stop() + + ctx, cancel := context.WithCancel(context.Background()) + runDone := make(chan error, 1) + go func() { runDone <- hb.Run(ctx) }() + + cancel() + select { + case err := <-runDone: + require.NoError(t, err) + case <-time.After(200 * time.Millisecond): + t.Fatal("Run did not exit after context cancel") + } +} + +func TestHeartbeatingStreamSendErrorPropagates(t *testing.T) { + t.Parallel() + + wantErr := errors.New("send failed") + inner := newCapturingStream(t) + inner.sendErr.Store(&errorBox{err: wantErr}) + + hb := newHeartbeatingStream(inner, time.Minute, nil) + require.NotNil(t, hb) + defer hb.Stop() + + err := hb.Send(&gatewayv1.GatewayEvent{EventType: "real.event"}) + require.ErrorIs(t, err, wantErr) +} + +// capturingStream is a minimal grpc.ServerStreamingServer that pushes +// every Send into a channel so tests can assert on the wire frame. +type capturingStream struct { + grpc.ServerStreamingServer[gatewayv1.GatewayEvent] + + events chan *gatewayv1.GatewayEvent + sendErr atomic.Pointer[errorBox] +} + +type errorBox struct{ err error } + +func newCapturingStream(t *testing.T) *capturingStream { + t.Helper() + + return &capturingStream{events: make(chan *gatewayv1.GatewayEvent, 16)} +} + +func (s *capturingStream) Send(event *gatewayv1.GatewayEvent) error { + if box := s.sendErr.Load(); box != nil { + return box.err + } + s.events <- event + + return nil +} + +func (s *capturingStream) Context() context.Context { return context.Background() } + +func (s *capturingStream) SetHeader(metadata.MD) error { return nil } +func (s *capturingStream) SendHeader(metadata.MD) error { return nil } +func (s *capturingStream) SetTrailer(metadata.MD) {} +func (s *capturingStream) SendMsg(any) error { return errors.New("capturingStream.SendMsg: unused") } +func (s *capturingStream) RecvMsg(any) error { return errors.New("capturingStream.RecvMsg: unused") } + +func (s *capturingStream) recv(t *testing.T, timeout time.Duration) *gatewayv1.GatewayEvent { + t.Helper() + + select { + case event := <-s.events: + return event + case <-time.After(timeout): + t.Fatalf("no event captured within %s", timeout) + return nil + } +} diff --git a/gateway/internal/grpcapi/push_stream.go b/gateway/internal/grpcapi/push_stream.go index 7189f8e..de7aada 100644 --- a/gateway/internal/grpcapi/push_stream.go +++ b/gateway/internal/grpcapi/push_stream.go @@ -4,9 +4,11 @@ import ( "bytes" "context" "crypto/sha256" + "time" "galaxy/gateway/authn" "galaxy/gateway/internal/clock" + "galaxy/gateway/internal/telemetry" gatewayv1 "galaxy/gateway/proto/galaxy/gateway/v1" gatewayfbs "galaxy/schema/fbs/gateway" @@ -16,7 +18,31 @@ import ( "google.golang.org/grpc/status" ) -const serverTimeEventType = "gateway.server_time" +const ( + serverTimeEventType = "gateway.server_time" + + // gatewayHeartbeatEventType labels the silence-filling event the + // authenticated push stream emits when no real event has been Send'd + // within `AuthenticatedGRPCConfig.PushHeartbeatInterval`. Browser + // fetch-streaming layers (notably Safari) close response bodies they + // consider idle; the heartbeat keeps the body active so push events + // land on the live stream instead of disappearing into the + // client-side reconnect window. + // + // Heartbeat events are sent UNSIGNED — `EventID`, `RequestID`, + // `TraceID`, `PayloadBytes`, `PayloadHash`, `Signature`, and + // `TimestampMs` are all left at their proto3 defaults so the wire + // frame stays under ~50 bytes. The UI's EventStream short-circuits + // on this event type before signature verification (see + // `ui/frontend/src/api/events.svelte.ts`) and never dispatches it to + // handlers. The security implication is intentional and documented + // in `docs/ARCHITECTURE.md` (§ authenticated edge): an attacker who + // could inject heartbeats gains nothing — they carry no payload and + // trigger no UI behaviour, the only practical effect is keeping a + // stream marginally more alive than transport-level keepalives + // would. Real events keep the signed envelope unchanged. + gatewayHeartbeatEventType = "gateway.heartbeat" +) // authenticatedStreamBinding captures the verified identity bound to one // authenticated SubscribeEvents stream after the full ingress pipeline @@ -47,12 +73,21 @@ func authenticatedStreamBindingFromContext(ctx context.Context) (authenticatedSt // authenticatedPushStreamService owns SubscribeEvents bootstrap behavior: // bind the authenticated stream, send the initial signed server-time event, // and then hand the stream lifecycle to the configured tail delegate. +// +// A positive `heartbeatInterval` wraps the bound stream with +// `heartbeatingStream` before delegating, so any tail implementation +// (fan-out, hold-open, future variants) gets the silence-based +// `gateway.heartbeat` for free. The wrapper observes every real Send +// the tail performs and only emits a heartbeat when the silence window +// elapses; tails remain heartbeat-unaware. type authenticatedPushStreamService struct { gatewayv1.UnimplementedEdgeGatewayServer - tailDelegate gatewayv1.EdgeGatewayServer - responseSigner authn.ResponseSigner - clock clock.Clock + tailDelegate gatewayv1.EdgeGatewayServer + responseSigner authn.ResponseSigner + clock clock.Clock + heartbeatInterval time.Duration + metrics *telemetry.Runtime } // SubscribeEvents binds the verified stream identity, sends the initial signed @@ -112,18 +147,40 @@ func (s authenticatedPushStreamService) SubscribeEvents(req *gatewayv1.Subscribe return err } - return s.tailDelegate.SubscribeEvents(req, boundStream) + var streamForTail grpc.ServerStreamingServer[gatewayv1.GatewayEvent] = boundStream + if hbStream := newHeartbeatingStream(boundStream, s.heartbeatInterval, s.metrics); hbStream != nil { + defer hbStream.Stop() + go func() { + // Heartbeat Send failures imply the transport is already + // dead — the tail's next Send will hit the same error and + // surface through the gateway observability layer, so we + // discard the returned error here and rely on that path + // for the canonical failure record. + _ = hbStream.Run(stream.Context()) + }() + streamForTail = hbStream + } + + return s.tailDelegate.SubscribeEvents(req, streamForTail) } -func newAuthenticatedPushStreamService(tailDelegate gatewayv1.EdgeGatewayServer, responseSigner authn.ResponseSigner, clk clock.Clock) gatewayv1.EdgeGatewayServer { +func newAuthenticatedPushStreamService( + tailDelegate gatewayv1.EdgeGatewayServer, + responseSigner authn.ResponseSigner, + clk clock.Clock, + heartbeatInterval time.Duration, + metrics *telemetry.Runtime, +) gatewayv1.EdgeGatewayServer { if tailDelegate == nil { tailDelegate = holdOpenSubscribeEventsService{} } return authenticatedPushStreamService{ - tailDelegate: tailDelegate, - responseSigner: responseSigner, - clock: clk, + tailDelegate: tailDelegate, + responseSigner: responseSigner, + clock: clk, + heartbeatInterval: heartbeatInterval, + metrics: metrics, } } diff --git a/gateway/internal/grpcapi/server.go b/gateway/internal/grpcapi/server.go index 93cf13e..d3dc748 100644 --- a/gateway/internal/grpcapi/server.go +++ b/gateway/internal/grpcapi/server.go @@ -111,7 +111,13 @@ func NewServer(cfg config.AuthenticatedGRPCConfig, deps ServerDependencies) *Ser deps = normalizeServerDependencies(deps) finalService := newCommandRoutingService( - newAuthenticatedPushStreamService(deps.Service, deps.ResponseSigner, deps.Clock), + newAuthenticatedPushStreamService( + deps.Service, + deps.ResponseSigner, + deps.Clock, + cfg.PushHeartbeatInterval, + deps.Telemetry, + ), deps.Router, deps.ResponseSigner, deps.Clock, diff --git a/gateway/internal/grpcapi/server_test.go b/gateway/internal/grpcapi/server_test.go index 819b02e..2dcdc57 100644 --- a/gateway/internal/grpcapi/server_test.go +++ b/gateway/internal/grpcapi/server_test.go @@ -174,6 +174,102 @@ func TestSubscribeEventsValidEnvelopeSendsBootstrapEventAndWaitsForCancellation( assert.Equal(t, connect.CodeCanceled, connect.CodeOf(recvErr)) } +func TestSubscribeEventsEmitsHeartbeatAfterSilenceWindow(t *testing.T) { + t.Parallel() + + grpcCfg := config.DefaultAuthenticatedGRPCConfig() + grpcCfg.Addr = "127.0.0.1:0" + grpcCfg.FreshnessWindow = testFreshnessWindow + // 30 ms keeps the test inside the standard 60-second go test + // timeout while still giving the heartbeat goroutine enough + // headroom to fire after the bootstrap server-time event lands. + grpcCfg.PushHeartbeatInterval = 30 * time.Millisecond + + server, runGateway := newTestGatewayWithGRPCConfig(t, grpcCfg, ServerDependencies{ + SessionCache: staticSessionCache{ + lookupFunc: func(context.Context, string) (session.Record, error) { + return newActiveSessionRecord(), nil + }, + }, + ReplayStore: staticReplayStore{}, + }) + defer runGateway.stop(t) + + addr := waitForListenAddr(t, server) + client := newEdgeClient(t, addr) + + stream, err := client.SubscribeEvents(t.Context(), connect.NewRequest(newValidSubscribeEventsRequest())) + require.NoError(t, err) + t.Cleanup(func() { _ = stream.Close() }) + + bootstrap := recvBootstrapEvent(t, stream) + assertServerTimeBootstrapEvent(t, bootstrap, newTestResponseSignerPublicKey(), "request-123", "trace-123", testCurrentTime.UnixMilli()) + + // The next frame must be the unsigned heartbeat. Every field + // except EventType is left at its proto3 default — the UI side + // short-circuits on EventType BEFORE signature verification, so + // any non-empty signature would be wasted bytes on the wire. + require.True(t, stream.Receive(), "stream did not deliver a heartbeat: %v", stream.Err()) + heartbeat := stream.Msg() + assert.Equal(t, gatewayHeartbeatEventType, heartbeat.GetEventType()) + assert.Empty(t, heartbeat.GetEventId()) + assert.Zero(t, heartbeat.GetTimestampMs()) + assert.Empty(t, heartbeat.GetPayloadBytes()) + assert.Empty(t, heartbeat.GetPayloadHash()) + assert.Empty(t, heartbeat.GetSignature()) + assert.Empty(t, heartbeat.GetRequestId()) + assert.Empty(t, heartbeat.GetTraceId()) +} + +func TestSubscribeEventsZeroHeartbeatIntervalDisablesEmission(t *testing.T) { + t.Parallel() + + grpcCfg := config.DefaultAuthenticatedGRPCConfig() + grpcCfg.Addr = "127.0.0.1:0" + grpcCfg.FreshnessWindow = testFreshnessWindow + grpcCfg.PushHeartbeatInterval = 0 + + server, runGateway := newTestGatewayWithGRPCConfig(t, grpcCfg, ServerDependencies{ + SessionCache: staticSessionCache{ + lookupFunc: func(context.Context, string) (session.Record, error) { + return newActiveSessionRecord(), nil + }, + }, + ReplayStore: staticReplayStore{}, + }) + defer runGateway.stop(t) + + addr := waitForListenAddr(t, server) + client := newEdgeClient(t, addr) + + stream, err := client.SubscribeEvents(t.Context(), connect.NewRequest(newValidSubscribeEventsRequest())) + require.NoError(t, err) + t.Cleanup(func() { _ = stream.Close() }) + + bootstrap := recvBootstrapEvent(t, stream) + assertServerTimeBootstrapEvent(t, bootstrap, newTestResponseSignerPublicKey(), "request-123", "trace-123", testCurrentTime.UnixMilli()) + + // No heartbeat is expected — the stream must stay silent. A + // background Receive races a deadline check so the test fails + // fast if the gateway ever sends a second frame on this stream. + recvResult := make(chan error, 1) + go func() { + if stream.Receive() { + recvResult <- errors.New("stream produced unexpected event") + return + } + recvResult <- stream.Err() + }() + require.Never(t, func() bool { + select { + case <-recvResult: + return true + default: + return false + } + }, 200*time.Millisecond, 20*time.Millisecond, "heartbeat fired despite zero interval") +} + func TestSubscribeEventsMissingReplayStoreFailsClosed(t *testing.T) { t.Parallel() diff --git a/gateway/internal/telemetry/runtime.go b/gateway/internal/telemetry/runtime.go index 5616c32..06507b3 100644 --- a/gateway/internal/telemetry/runtime.go +++ b/gateway/internal/telemetry/runtime.go @@ -46,6 +46,7 @@ type Runtime struct { // Push instruments. pushActiveStreams metric.Int64UpDownCounter pushStreamClosers metric.Int64Counter + pushHeartbeats metric.Int64Counter // Internal event consumer instruments. internalEventDrops metric.Int64Counter @@ -120,6 +121,10 @@ func New(ctx context.Context, logger *zap.Logger) (*Runtime, error) { if err != nil { return nil, err } + pushHeartbeats, err := meter.Int64Counter("gateway.push.heartbeats_sent") + if err != nil { + return nil, err + } internalEventDrops, err := meter.Int64Counter("gateway.internal_event_drops") if err != nil { return nil, err @@ -136,6 +141,7 @@ func New(ctx context.Context, logger *zap.Logger) (*Runtime, error) { grpcDuration: grpcDuration, pushActiveStreams: pushActiveStreams, pushStreamClosers: pushStreamClosers, + pushHeartbeats: pushHeartbeats, internalEventDrops: internalEventDrops, }, nil } @@ -228,6 +234,19 @@ func (r *Runtime) RecordPushStreamClosure(ctx context.Context, attrs ...attribut r.pushStreamClosers.Add(ctx, 1, metric.WithAttributes(attrs...)) } +// RecordPushHeartbeat records one outbound push-stream heartbeat event. +// The `outcome` attribute should distinguish a successful Send from a +// transport-level failure so the metric stays useful for bandwidth +// budgeting (most heartbeats are `sent`; a sudden bump of `error` means +// the upstream connection is failing before the gateway can flush). +func (r *Runtime) RecordPushHeartbeat(ctx context.Context, attrs ...attribute.KeyValue) { + if r == nil { + return + } + + r.pushHeartbeats.Add(ctx, 1, metric.WithAttributes(attrs...)) +} + // RecordInternalEventDrop records one malformed or rejected internal event. func (r *Runtime) RecordInternalEventDrop(ctx context.Context, attrs ...attribute.KeyValue) { if r == nil { diff --git a/ui/frontend/src/api/events.svelte.ts b/ui/frontend/src/api/events.svelte.ts index 56903a2..a928d72 100644 --- a/ui/frontend/src/api/events.svelte.ts +++ b/ui/frontend/src/api/events.svelte.ts @@ -24,7 +24,6 @@ import { ConnectError } from "@connectrpc/connect"; import type { Core } from "../platform/core/index"; import type { DeviceKeypair } from "../platform/store/index"; import { - GatewayEventSchema, SubscribeEventsRequestSchema, type GatewayEvent, } from "../proto/galaxy/gateway/v1/edge_gateway_pb"; @@ -35,6 +34,17 @@ import { createEdgeGatewayClient, type EdgeGatewayClient } from "./connect"; const PROTOCOL_VERSION = "v1"; const SUBSCRIBE_MESSAGE_TYPE = "gateway.subscribe"; +// HEARTBEAT_EVENT_TYPE matches `gatewayHeartbeatEventType` on the +// gateway side. The server emits this unsigned event with only the +// `eventType` field set when the push stream has been silent for the +// configured interval — its sole purpose is to keep browser fetch +// streaming layers (Safari is notably aggressive on idle timeouts) +// from closing the response body and causing push events to disappear +// into the reconnect window. The client SHORT-CIRCUITS on this event +// type before signature verification or handler dispatch: the payload +// is empty by design and there is no signature to validate. +const HEARTBEAT_EVENT_TYPE = "gateway.heartbeat"; + // Connect error code numerical values used by the watcher. The full // enum lives in `@connectrpc/connect` but importing the runtime enum // would pull a large surface into this small module. @@ -207,6 +217,20 @@ export class EventStream { if (signal.aborted) { return; } + if (event.eventType === HEARTBEAT_EVENT_TYPE) { + // Heartbeats are unsigned by design (see the + // constant's docstring) and carry no payload — skip + // verification + dispatch. They still count as + // connection proof: receiving one means the + // stream is healthy, so reset backoff like any + // other first event. + if (!firstEventSeen) { + firstEventSeen = true; + this.connectionStatus = "connected"; + attempt = 0; + } + continue; + } this.verifyEvent(event, opts); if (!firstEventSeen) { firstEventSeen = true; diff --git a/ui/frontend/tests/events.test.ts b/ui/frontend/tests/events.test.ts index f24ca97..507d289 100644 --- a/ui/frontend/tests/events.test.ts +++ b/ui/frontend/tests/events.test.ts @@ -321,6 +321,88 @@ describe("EventStream", () => { eventStream.stop(); }); + test("gateway.heartbeat is short-circuited before verification and dispatch", async () => { + const handler = vi.fn(); + eventStream.on("game.turn.ready", handler); + // A heartbeat with an empty signature (matching the unsigned + // wire shape the gateway emits) would normally trip + // `verifyEvent` and tear the stream down with a SignatureError. + // The short-circuit skips both verification and dispatch. + const heartbeat = create(GatewayEventSchema, { + eventType: "gateway.heartbeat", + eventId: "", + timestampMs: 0n, + payloadBytes: new Uint8Array(0), + payloadHash: new Uint8Array(0), + signature: new Uint8Array(0), + requestId: "", + traceId: "", + }); + const realEvent = buildEvent( + "game.turn.ready", + new TextEncoder().encode("{}"), + ); + + const verifyEventSpy = vi.fn(() => true); + const verifyPayloadHashSpy = vi.fn(() => true); + const core = mockCore({ + verifyEvent: verifyEventSpy, + verifyPayloadHash: verifyPayloadHashSpy, + }); + + const client = makeRouter(async function* () { + yield heartbeat; + yield realEvent; + }); + eventStream.start({ + core, + keypair: mockKeypair(), + deviceSessionId: "device-1", + gatewayResponsePublicKey: new Uint8Array(32), + client, + sleep: async () => {}, + random: () => 0, + }); + + await vi.waitFor(() => { + expect(handler).toHaveBeenCalled(); + }); + // Verification ran exactly once — for the real event, never + // for the heartbeat. The handler also only sees the real one. + expect(verifyEventSpy).toHaveBeenCalledTimes(1); + expect(verifyPayloadHashSpy).toHaveBeenCalledTimes(1); + expect(handler).toHaveBeenCalledTimes(1); + expect(handler.mock.calls[0]?.[0].eventType).toBe("game.turn.ready"); + eventStream.stop(); + }); + + test("a leading heartbeat still flips connectionStatus to connected", async () => { + const heartbeat = create(GatewayEventSchema, { + eventType: "gateway.heartbeat", + }); + // The stream yields only a heartbeat then waits for the test + // to abort the consumer — the `connected` transition must not + // require a real event behind the heartbeat. + const pending = new Promise(() => {}); + const client = makeRouter(async function* () { + yield heartbeat; + await pending; + }); + eventStream.start({ + core: mockCore(), + keypair: mockKeypair(), + deviceSessionId: "device-1", + gatewayResponsePublicKey: new Uint8Array(32), + client, + sleep: async () => {}, + random: () => 0, + }); + await vi.waitFor(() => { + expect(eventStream.connectionStatus).toBe("connected"); + }); + eventStream.stop(); + }); + test("connectionStatus transitions through connecting → connected → idle", async () => { expect(eventStream.connectionStatus).toBe("idle"); const event = buildEvent(