Merge pull request 'feat(gateway): unsigned gateway.heartbeat keeps Safari push streams alive' (#17) from feature/subscribe-events-heartbeat into development
This commit was merged in pull request #17.
This commit is contained in:
@@ -691,6 +691,55 @@ stream opens is `event_type = gateway.server_time`, reusing the opening
|
|||||||
`request_id` as `event_id` and carrying `server_time_ms` so clients can
|
`request_id` as `event_id` and carrying `server_time_ms` so clients can
|
||||||
calibrate offset without a separate time request.
|
calibrate offset without a separate time request.
|
||||||
|
|
||||||
|
#### Unsigned `gateway.heartbeat` keepalive
|
||||||
|
|
||||||
|
Browser fetch-streaming layers (notably WebKit/Safari) close response
|
||||||
|
bodies they consider idle after roughly 15-30 seconds without
|
||||||
|
incoming bytes. A push stream in a quiet game (no `game.turn.ready`,
|
||||||
|
no diplomatic mail) would otherwise be torn down and reopened
|
||||||
|
repeatedly; events that fire during the reconnect window vanish
|
||||||
|
because `push.Hub` queues are not persisted across subscription
|
||||||
|
closes. To keep the body active, the gateway emits a
|
||||||
|
`gateway.heartbeat` event after `GATEWAY_PUSH_HEARTBEAT_INTERVAL` of
|
||||||
|
stream silence (default `15s`; set to `0s` to disable). Every real
|
||||||
|
event resets the silence timer, so the heartbeat fires rarely on
|
||||||
|
busy streams.
|
||||||
|
|
||||||
|
Heartbeats are sent **unsigned**: every field except `event_type` is
|
||||||
|
left at its protobuf default and no Ed25519 signature is computed.
|
||||||
|
The client short-circuits on the `gateway.heartbeat` type before
|
||||||
|
calling `verifyEvent` / `verifyPayloadHash` and never dispatches the
|
||||||
|
event to handlers. The security implication is intentional —
|
||||||
|
heartbeats carry no payload that the UI acts on, so an injected
|
||||||
|
heartbeat trivially fails to cause any user-visible state change.
|
||||||
|
TLS still protects the wire and the rest of the signed envelope is
|
||||||
|
unchanged for real events.
|
||||||
|
|
||||||
|
##### Wire cost projections
|
||||||
|
|
||||||
|
| Clients | 15 s | 30 s | 60 s |
|
||||||
|
| ------: | ---: | ---: | ---: |
|
||||||
|
| 100 | 25 MB/day | 13 MB/day | 6 MB/day |
|
||||||
|
| 1 000 | 250 MB/day | 125 MB/day | 62 MB/day |
|
||||||
|
| 10 000 | 2.5 GB/day | 1.3 GB/day | 0.6 GB/day |
|
||||||
|
| 100 000 | 25 GB/day | 12.5 GB/day | 6 GB/day |
|
||||||
|
|
||||||
|
Per-heartbeat budget at ~45 bytes on the wire (proto + Connect
|
||||||
|
framing + HTTP/2 DATA header + amortised TLS overhead), worst case
|
||||||
|
when no real event ever displaces a tick. Active streams trade
|
||||||
|
heartbeat traffic for real-event traffic 1:1, so the table is the
|
||||||
|
upper bound at the chosen interval. Larger deployments that are
|
||||||
|
willing to take a marginally higher Safari reconnect risk should
|
||||||
|
raise `GATEWAY_PUSH_HEARTBEAT_INTERVAL` toward 30 s before paying
|
||||||
|
the full table; setting `0s` reclaims all bytes at the cost of the
|
||||||
|
visible Safari reconnect loop returning.
|
||||||
|
|
||||||
|
Observability: every emission increments the
|
||||||
|
`gateway.push.heartbeats_sent{outcome}` counter, where
|
||||||
|
`outcome=sent` is the steady-state line item the operator budgets
|
||||||
|
bandwidth against and a sudden `outcome=error` bump means the
|
||||||
|
upstream connection is failing before the gateway can flush.
|
||||||
|
|
||||||
### Verification order at gateway
|
### Verification order at gateway
|
||||||
|
|
||||||
Before any payload is forwarded to backend, gateway must:
|
Before any payload is forwarded to backend, gateway must:
|
||||||
|
|||||||
@@ -820,6 +820,18 @@ internal hub. The first frame the client receives is a
|
|||||||
gateway-signed bootstrap event carrying the current server time, so
|
gateway-signed bootstrap event carrying the current server time, so
|
||||||
the client can calibrate its local clock without a separate request.
|
the client can calibrate its local clock without a separate request.
|
||||||
|
|
||||||
|
While the stream is open, gateway tracks a silence timer; if no real
|
||||||
|
event has been forwarded for `GATEWAY_PUSH_HEARTBEAT_INTERVAL`
|
||||||
|
(default `15s`, `0s` disables), gateway emits an unsigned
|
||||||
|
`gateway.heartbeat` event to keep browser fetch-streaming layers
|
||||||
|
from closing the response body as idle. Real events reset the
|
||||||
|
timer, so on busy streams the heartbeat fires rarely. The UI client
|
||||||
|
short-circuits the heartbeat type before signature verification and
|
||||||
|
never dispatches it to handlers — see
|
||||||
|
[`docs/ARCHITECTURE.md` § 15](ARCHITECTURE.md#15-transport-security-model-gateway-boundary)
|
||||||
|
for the wire-cost projection and the security rationale of leaving
|
||||||
|
the heartbeat unsigned.
|
||||||
|
|
||||||
### 7.3 Backend → gateway control
|
### 7.3 Backend → gateway control
|
||||||
|
|
||||||
Backend hosts a single gRPC service `Push.SubscribePush`, consumed
|
Backend hosts a single gRPC service `Push.SubscribePush`, consumed
|
||||||
|
|||||||
@@ -845,6 +845,18 @@ control-канал backend → gateway, который производит эт
|
|||||||
с текущим серверным временем, чтобы клиент мог калибровать свои
|
с текущим серверным временем, чтобы клиент мог калибровать свои
|
||||||
локальные часы без отдельного запроса.
|
локальные часы без отдельного запроса.
|
||||||
|
|
||||||
|
Пока стрим открыт, gateway отслеживает таймер тишины; если за
|
||||||
|
`GATEWAY_PUSH_HEARTBEAT_INTERVAL` (по умолчанию `15s`, `0s`
|
||||||
|
отключает) не пришло ни одного реального события, gateway
|
||||||
|
отправляет неподписанное `gateway.heartbeat`-событие, чтобы
|
||||||
|
браузерные fetch-streaming слои не закрыли response body как idle.
|
||||||
|
Реальные события сбрасывают таймер, поэтому на нагруженных
|
||||||
|
стримах heartbeat срабатывает редко. UI-клиент короткозамыкает
|
||||||
|
heartbeat-тип до верификации подписи и никогда не дотягивает его
|
||||||
|
до handlers — см.
|
||||||
|
[`docs/ARCHITECTURE.md` § 15](ARCHITECTURE.md#15-transport-security-model-gateway-boundary)
|
||||||
|
для расчёта траффика и обоснования отсутствия подписи у heartbeat.
|
||||||
|
|
||||||
### 7.3 Управление backend → gateway
|
### 7.3 Управление backend → gateway
|
||||||
|
|
||||||
Backend хостит единственный gRPC-сервис `Push.SubscribePush`,
|
Backend хостит единственный gRPC-сервис `Push.SubscribePush`,
|
||||||
|
|||||||
+11
-1
@@ -294,7 +294,17 @@ the stream to the verified `user_id` and `device_session_id`, sends one
|
|||||||
signed `gateway.server_time` bootstrap event whose FlatBuffers payload carries
|
signed `gateway.server_time` bootstrap event whose FlatBuffers payload carries
|
||||||
`server_time_ms`, registers the active stream in the in-memory `PushHub`, and
|
`server_time_ms`, registers the active stream in the in-memory `PushHub`, and
|
||||||
then forwards signed client-facing events consumed from the configured client
|
then forwards signed client-facing events consumed from the configured client
|
||||||
event Redis stream. User-targeted events fan out to every active stream for
|
event Redis stream. After the bootstrap, the stream is wrapped with a
|
||||||
|
silence-based heartbeat: when no real event has been forwarded for
|
||||||
|
`GATEWAY_PUSH_HEARTBEAT_INTERVAL` (default `15s`, set to `0s` to disable),
|
||||||
|
the gateway emits an unsigned `gateway.heartbeat` event so browser
|
||||||
|
fetch-streaming layers (Safari is the most aggressive) keep the response
|
||||||
|
body open and pending push events are not lost into the client-side
|
||||||
|
reconnect window. Each emission is counted by the
|
||||||
|
`gateway.push.heartbeats_sent{outcome}` metric; see
|
||||||
|
[`docs/ARCHITECTURE.md` § 15](../docs/ARCHITECTURE.md#15-transport-security-model-gateway-boundary)
|
||||||
|
for the bandwidth projection and the reasoning behind the unsigned
|
||||||
|
envelope. User-targeted events fan out to every active stream for
|
||||||
that user. Session-targeted events fan out only to streams whose
|
that user. Session-targeted events fan out only to streams whose
|
||||||
`user_id` and `device_session_id` both match the event target. Each active
|
`user_id` and `device_session_id` both match the event target. Each active
|
||||||
stream uses a bounded in-memory queue; when that queue overflows, only the
|
stream uses a bounded in-memory queue; when that queue overflows, only the
|
||||||
|
|||||||
@@ -125,6 +125,14 @@ const (
|
|||||||
// gRPC requests.
|
// gRPC requests.
|
||||||
authenticatedGRPCFreshnessWindowEnvVar = "GATEWAY_AUTHENTICATED_GRPC_FRESHNESS_WINDOW"
|
authenticatedGRPCFreshnessWindowEnvVar = "GATEWAY_AUTHENTICATED_GRPC_FRESHNESS_WINDOW"
|
||||||
|
|
||||||
|
// pushHeartbeatIntervalEnvVar names the environment variable that
|
||||||
|
// configures the silence-based heartbeat cadence for authenticated
|
||||||
|
// push streams. The heartbeat keeps idle SubscribeEvents responses
|
||||||
|
// alive across browser fetch-streaming idle timeouts (Safari is
|
||||||
|
// notably aggressive) so push events do not disappear into the
|
||||||
|
// reconnect window. A value of `0s` disables heartbeats entirely.
|
||||||
|
pushHeartbeatIntervalEnvVar = "GATEWAY_PUSH_HEARTBEAT_INTERVAL"
|
||||||
|
|
||||||
// authenticatedGRPCIPRateLimitRequestsEnvVar names the environment
|
// authenticatedGRPCIPRateLimitRequestsEnvVar names the environment
|
||||||
// variable that configures the authenticated gRPC per-IP request budget per
|
// variable that configures the authenticated gRPC per-IP request budget per
|
||||||
// window.
|
// window.
|
||||||
@@ -321,6 +329,13 @@ const (
|
|||||||
defaultAuthenticatedGRPCDownstreamTimeout = 5 * time.Second
|
defaultAuthenticatedGRPCDownstreamTimeout = 5 * time.Second
|
||||||
defaultAuthenticatedGRPCFreshnessWindow = 5 * time.Minute
|
defaultAuthenticatedGRPCFreshnessWindow = 5 * time.Minute
|
||||||
|
|
||||||
|
// defaultPushHeartbeatInterval is the silence window the push stream
|
||||||
|
// keeps open before emitting `gateway.heartbeat`. 15s is comfortably
|
||||||
|
// below the empirical Safari fetch-streaming idle threshold
|
||||||
|
// (~15-30s) and well above any realistic per-event rate, so the
|
||||||
|
// timer is almost always reset by a real event in active games.
|
||||||
|
defaultPushHeartbeatInterval = 15 * time.Second
|
||||||
|
|
||||||
defaultAuthenticatedGRPCIPRateLimitRequests = 120
|
defaultAuthenticatedGRPCIPRateLimitRequests = 120
|
||||||
defaultAuthenticatedGRPCIPRateLimitBurst = 40
|
defaultAuthenticatedGRPCIPRateLimitBurst = 40
|
||||||
|
|
||||||
@@ -549,6 +564,16 @@ type AuthenticatedGRPCConfig struct {
|
|||||||
// used for client request timestamps.
|
// used for client request timestamps.
|
||||||
FreshnessWindow time.Duration
|
FreshnessWindow time.Duration
|
||||||
|
|
||||||
|
// PushHeartbeatInterval is the silence window after which an open
|
||||||
|
// authenticated SubscribeEvents stream sends an unsigned
|
||||||
|
// `gateway.heartbeat` event. Every real Send resets the window, so
|
||||||
|
// in busy streams the heartbeat fires rarely. A zero or negative
|
||||||
|
// value disables the heartbeat — the stream then relies on
|
||||||
|
// transport-level keepalives only, which Safari's fetch-streaming
|
||||||
|
// layer ignores. See `docs/ARCHITECTURE.md` for the security
|
||||||
|
// rationale of leaving the heartbeat unsigned.
|
||||||
|
PushHeartbeatInterval time.Duration
|
||||||
|
|
||||||
// AntiAbuse configures the authenticated gRPC rate limits enforced after
|
// AntiAbuse configures the authenticated gRPC rate limits enforced after
|
||||||
// the request passes the transport authenticity checks.
|
// the request passes the transport authenticity checks.
|
||||||
AntiAbuse AuthenticatedGRPCAntiAbuseConfig
|
AntiAbuse AuthenticatedGRPCAntiAbuseConfig
|
||||||
@@ -719,6 +744,7 @@ func DefaultAuthenticatedGRPCConfig() AuthenticatedGRPCConfig {
|
|||||||
ConnectionTimeout: defaultAuthenticatedGRPCConnectionTimeout,
|
ConnectionTimeout: defaultAuthenticatedGRPCConnectionTimeout,
|
||||||
DownstreamTimeout: defaultAuthenticatedGRPCDownstreamTimeout,
|
DownstreamTimeout: defaultAuthenticatedGRPCDownstreamTimeout,
|
||||||
FreshnessWindow: defaultAuthenticatedGRPCFreshnessWindow,
|
FreshnessWindow: defaultAuthenticatedGRPCFreshnessWindow,
|
||||||
|
PushHeartbeatInterval: defaultPushHeartbeatInterval,
|
||||||
AntiAbuse: AuthenticatedGRPCAntiAbuseConfig{
|
AntiAbuse: AuthenticatedGRPCAntiAbuseConfig{
|
||||||
IP: AuthenticatedRateLimitConfig{
|
IP: AuthenticatedRateLimitConfig{
|
||||||
Requests: defaultAuthenticatedGRPCIPRateLimitRequests,
|
Requests: defaultAuthenticatedGRPCIPRateLimitRequests,
|
||||||
@@ -928,6 +954,12 @@ func LoadFromEnv() (Config, error) {
|
|||||||
}
|
}
|
||||||
cfg.AuthenticatedGRPC.DownstreamTimeout = authenticatedGRPCDownstreamTimeout
|
cfg.AuthenticatedGRPC.DownstreamTimeout = authenticatedGRPCDownstreamTimeout
|
||||||
|
|
||||||
|
pushHeartbeatInterval, err := loadDurationEnvWithDefault(pushHeartbeatIntervalEnvVar, cfg.AuthenticatedGRPC.PushHeartbeatInterval)
|
||||||
|
if err != nil {
|
||||||
|
return Config{}, err
|
||||||
|
}
|
||||||
|
cfg.AuthenticatedGRPC.PushHeartbeatInterval = pushHeartbeatInterval
|
||||||
|
|
||||||
authenticatedGRPCFreshnessWindow, err := loadDurationEnvWithDefault(authenticatedGRPCFreshnessWindowEnvVar, cfg.AuthenticatedGRPC.FreshnessWindow)
|
authenticatedGRPCFreshnessWindow, err := loadDurationEnvWithDefault(authenticatedGRPCFreshnessWindowEnvVar, cfg.AuthenticatedGRPC.FreshnessWindow)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return Config{}, err
|
return Config{}, err
|
||||||
@@ -1156,6 +1188,9 @@ func LoadFromEnv() (Config, error) {
|
|||||||
if cfg.AuthenticatedGRPC.FreshnessWindow <= 0 {
|
if cfg.AuthenticatedGRPC.FreshnessWindow <= 0 {
|
||||||
return Config{}, fmt.Errorf("load gateway config: %s must be positive", authenticatedGRPCFreshnessWindowEnvVar)
|
return Config{}, fmt.Errorf("load gateway config: %s must be positive", authenticatedGRPCFreshnessWindowEnvVar)
|
||||||
}
|
}
|
||||||
|
if cfg.AuthenticatedGRPC.PushHeartbeatInterval < 0 {
|
||||||
|
return Config{}, fmt.Errorf("load gateway config: %s must not be negative", pushHeartbeatIntervalEnvVar)
|
||||||
|
}
|
||||||
if err := validateRateLimitConfig(
|
if err := validateRateLimitConfig(
|
||||||
cfg.AuthenticatedGRPC.AntiAbuse.IP,
|
cfg.AuthenticatedGRPC.AntiAbuse.IP,
|
||||||
authenticatedGRPCIPRateLimitRequestsEnvVar,
|
authenticatedGRPCIPRateLimitRequestsEnvVar,
|
||||||
|
|||||||
@@ -164,6 +164,30 @@ func TestLoadFromEnvAppliesPublicAndAuthGRPCDefaults(t *testing.T) {
|
|||||||
assert.Equal(t, defaultAuthenticatedGRPCConnectionTimeout, cfg.AuthenticatedGRPC.ConnectionTimeout)
|
assert.Equal(t, defaultAuthenticatedGRPCConnectionTimeout, cfg.AuthenticatedGRPC.ConnectionTimeout)
|
||||||
assert.Equal(t, defaultAuthenticatedGRPCDownstreamTimeout, cfg.AuthenticatedGRPC.DownstreamTimeout)
|
assert.Equal(t, defaultAuthenticatedGRPCDownstreamTimeout, cfg.AuthenticatedGRPC.DownstreamTimeout)
|
||||||
assert.Equal(t, defaultAuthenticatedGRPCFreshnessWindow, cfg.AuthenticatedGRPC.FreshnessWindow)
|
assert.Equal(t, defaultAuthenticatedGRPCFreshnessWindow, cfg.AuthenticatedGRPC.FreshnessWindow)
|
||||||
|
assert.Equal(t, defaultPushHeartbeatInterval, cfg.AuthenticatedGRPC.PushHeartbeatInterval)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestLoadFromEnvParsesPushHeartbeatInterval(t *testing.T) {
|
||||||
|
configEnvMu.Lock()
|
||||||
|
defer configEnvMu.Unlock()
|
||||||
|
|
||||||
|
resetEnv(t)
|
||||||
|
setBaseRequiredEnv(t)
|
||||||
|
t.Setenv(pushHeartbeatIntervalEnvVar, "0s")
|
||||||
|
|
||||||
|
cfg, err := LoadFromEnv()
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Equal(t, time.Duration(0), cfg.AuthenticatedGRPC.PushHeartbeatInterval, "0s explicitly disables the heartbeat")
|
||||||
|
|
||||||
|
t.Setenv(pushHeartbeatIntervalEnvVar, "25s")
|
||||||
|
cfg, err = LoadFromEnv()
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Equal(t, 25*time.Second, cfg.AuthenticatedGRPC.PushHeartbeatInterval)
|
||||||
|
|
||||||
|
t.Setenv(pushHeartbeatIntervalEnvVar, "-1s")
|
||||||
|
_, err = LoadFromEnv()
|
||||||
|
require.Error(t, err)
|
||||||
|
assert.Contains(t, err.Error(), pushHeartbeatIntervalEnvVar)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestLoadFromEnvParsesCORSAllowedOrigins(t *testing.T) {
|
func TestLoadFromEnvParsesCORSAllowedOrigins(t *testing.T) {
|
||||||
@@ -211,6 +235,7 @@ func resetEnv(t *testing.T) {
|
|||||||
authenticatedGRPCConnectionTimeoutEnvVar,
|
authenticatedGRPCConnectionTimeoutEnvVar,
|
||||||
authenticatedGRPCDownstreamTimeoutEnvVar,
|
authenticatedGRPCDownstreamTimeoutEnvVar,
|
||||||
authenticatedGRPCFreshnessWindowEnvVar,
|
authenticatedGRPCFreshnessWindowEnvVar,
|
||||||
|
pushHeartbeatIntervalEnvVar,
|
||||||
gatewayRedisMasterAddrEnvVar,
|
gatewayRedisMasterAddrEnvVar,
|
||||||
gatewayRedisPasswordEnvVar,
|
gatewayRedisPasswordEnvVar,
|
||||||
replayRedisKeyPrefixEnvVar,
|
replayRedisKeyPrefixEnvVar,
|
||||||
|
|||||||
@@ -0,0 +1,163 @@
|
|||||||
|
package grpcapi
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"galaxy/gateway/internal/telemetry"
|
||||||
|
gatewayv1 "galaxy/gateway/proto/galaxy/gateway/v1"
|
||||||
|
|
||||||
|
"go.opentelemetry.io/otel/attribute"
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
)
|
||||||
|
|
||||||
|
// heartbeatingStream wraps a server-streaming response so the inner
|
||||||
|
// stream stays alive across browser fetch-streaming idle timeouts.
|
||||||
|
// Every call to Send (a real event from a tail service) resets a
|
||||||
|
// silence timer; when the timer fires, Run emits an unsigned
|
||||||
|
// `gateway.heartbeat` event on its own. Send and the heartbeat
|
||||||
|
// goroutine serialise on the same mutex because grpc.ServerStream.Send
|
||||||
|
// is documented as not goroutine-safe.
|
||||||
|
//
|
||||||
|
// Wire-cost budgeting: each heartbeat is one GatewayEvent with only
|
||||||
|
// EventType populated (~17 bytes + protobuf tag), framed by Connect
|
||||||
|
// (~5 bytes) and HTTP/2 plus TLS overhead (~50 bytes). At the
|
||||||
|
// 15-second default a fully-idle stream costs ~840 KB/day per client;
|
||||||
|
// see `docs/ARCHITECTURE.md` for the per-scale projection.
|
||||||
|
type heartbeatingStream struct {
|
||||||
|
grpc.ServerStreamingServer[gatewayv1.GatewayEvent]
|
||||||
|
|
||||||
|
interval time.Duration
|
||||||
|
metrics *telemetry.Runtime
|
||||||
|
|
||||||
|
sendMu sync.Mutex
|
||||||
|
timer *time.Timer
|
||||||
|
|
||||||
|
stopOnce sync.Once
|
||||||
|
done chan struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// newHeartbeatingStream wraps inner with a silence-based heartbeat
|
||||||
|
// emitter. A non-positive interval returns nil so the caller can skip
|
||||||
|
// the wrapping entirely; non-nil returns must have `Stop()` called once
|
||||||
|
// the stream lifecycle ends.
|
||||||
|
func newHeartbeatingStream(
|
||||||
|
inner grpc.ServerStreamingServer[gatewayv1.GatewayEvent],
|
||||||
|
interval time.Duration,
|
||||||
|
metrics *telemetry.Runtime,
|
||||||
|
) *heartbeatingStream {
|
||||||
|
if interval <= 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return &heartbeatingStream{
|
||||||
|
ServerStreamingServer: inner,
|
||||||
|
interval: interval,
|
||||||
|
metrics: metrics,
|
||||||
|
timer: time.NewTimer(interval),
|
||||||
|
done: make(chan struct{}),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send forwards event to the inner stream and resets the silence timer
|
||||||
|
// so the heartbeat goroutine waits a fresh interval before firing
|
||||||
|
// again. A Send that succeeds means the transport just delivered real
|
||||||
|
// bytes; the silence window restarts from "now".
|
||||||
|
func (s *heartbeatingStream) Send(event *gatewayv1.GatewayEvent) error {
|
||||||
|
s.sendMu.Lock()
|
||||||
|
defer s.sendMu.Unlock()
|
||||||
|
if err := s.ServerStreamingServer.Send(event); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
s.resetTimerLocked()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Run blocks until ctx is canceled or Stop is called, emitting one
|
||||||
|
// `gateway.heartbeat` event whenever the silence timer fires. Intended
|
||||||
|
// to run in its own goroutine alongside the tail service that owns the
|
||||||
|
// stream. A Send failure from the heartbeat path is recorded in
|
||||||
|
// telemetry and returned to the caller; production wiring discards it
|
||||||
|
// because the tail service will see the same transport failure on its
|
||||||
|
// next Send and propagate the real error to the gateway frame
|
||||||
|
// observability layer.
|
||||||
|
func (s *heartbeatingStream) Run(ctx context.Context) error {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return nil
|
||||||
|
case <-s.done:
|
||||||
|
return nil
|
||||||
|
case <-s.timer.C:
|
||||||
|
err := s.sendHeartbeat()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stop halts the heartbeat goroutine and drains the silence timer.
|
||||||
|
// Safe to call multiple times; subsequent calls are no-ops.
|
||||||
|
func (s *heartbeatingStream) Stop() {
|
||||||
|
s.stopOnce.Do(func() {
|
||||||
|
close(s.done)
|
||||||
|
if !s.timer.Stop() {
|
||||||
|
select {
|
||||||
|
case <-s.timer.C:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// sendHeartbeat emits one heartbeat event, records the outcome in
|
||||||
|
// telemetry, and re-arms the silence timer. The outcome attribute
|
||||||
|
// makes a sudden bump of `error` easy to spot in dashboards — it
|
||||||
|
// usually means the upstream connection is failing before the gateway
|
||||||
|
// can flush, while a steady `sent` rate is the normal idle baseline
|
||||||
|
// the deployment operator budgets bandwidth against.
|
||||||
|
func (s *heartbeatingStream) sendHeartbeat() error {
|
||||||
|
s.sendMu.Lock()
|
||||||
|
defer s.sendMu.Unlock()
|
||||||
|
|
||||||
|
err := s.ServerStreamingServer.Send(buildHeartbeatEvent())
|
||||||
|
outcome := attribute.String("outcome", "sent")
|
||||||
|
if err != nil {
|
||||||
|
outcome = attribute.String("outcome", "error")
|
||||||
|
}
|
||||||
|
s.metrics.RecordPushHeartbeat(context.Background(), outcome)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
s.resetTimerLocked()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// resetTimerLocked re-arms the silence timer. Caller must hold sendMu.
|
||||||
|
// The drain follows the canonical pattern from the time.Timer
|
||||||
|
// docstring: Stop may report `false` either because the timer already
|
||||||
|
// fired or because nothing was queued, so the non-blocking drain
|
||||||
|
// handles both states without deadlocking when the channel was already
|
||||||
|
// emptied by Run.
|
||||||
|
func (s *heartbeatingStream) resetTimerLocked() {
|
||||||
|
if !s.timer.Stop() {
|
||||||
|
select {
|
||||||
|
case <-s.timer.C:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
s.timer.Reset(s.interval)
|
||||||
|
}
|
||||||
|
|
||||||
|
// buildHeartbeatEvent returns the minimal `gateway.heartbeat`
|
||||||
|
// GatewayEvent emitted into the push stream. Every field except
|
||||||
|
// EventType is left at its proto3 default so the wire frame stays as
|
||||||
|
// small as Connect framing allows. See `gatewayHeartbeatEventType` for
|
||||||
|
// the security rationale of leaving the event unsigned.
|
||||||
|
func buildHeartbeatEvent() *gatewayv1.GatewayEvent {
|
||||||
|
return &gatewayv1.GatewayEvent{EventType: gatewayHeartbeatEventType}
|
||||||
|
}
|
||||||
@@ -0,0 +1,185 @@
|
|||||||
|
package grpcapi
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"sync/atomic"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
gatewayv1 "galaxy/gateway/proto/galaxy/gateway/v1"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
"google.golang.org/grpc/metadata"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestNewHeartbeatingStreamZeroIntervalReturnsNil(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
stream := newHeartbeatingStream(newCapturingStream(t), 0, nil)
|
||||||
|
assert.Nil(t, stream, "zero interval must not allocate a wrapper")
|
||||||
|
|
||||||
|
negative := newHeartbeatingStream(newCapturingStream(t), -time.Second, nil)
|
||||||
|
assert.Nil(t, negative, "negative interval must not allocate a wrapper")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestHeartbeatingStreamSendsHeartbeatAfterSilence(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
inner := newCapturingStream(t)
|
||||||
|
hb := newHeartbeatingStream(inner, 30*time.Millisecond, nil)
|
||||||
|
require.NotNil(t, hb)
|
||||||
|
defer hb.Stop()
|
||||||
|
|
||||||
|
go func() { _ = hb.Run(t.Context()) }()
|
||||||
|
|
||||||
|
event := inner.recv(t, 200*time.Millisecond)
|
||||||
|
assert.Equal(t, gatewayHeartbeatEventType, event.GetEventType())
|
||||||
|
// Heartbeat envelope: only the event type travels. Every other
|
||||||
|
// field stays at proto3 default so the wire frame stays minimal.
|
||||||
|
assert.Empty(t, event.GetEventId())
|
||||||
|
assert.Zero(t, event.GetTimestampMs())
|
||||||
|
assert.Empty(t, event.GetPayloadBytes())
|
||||||
|
assert.Empty(t, event.GetPayloadHash())
|
||||||
|
assert.Empty(t, event.GetSignature())
|
||||||
|
assert.Empty(t, event.GetRequestId())
|
||||||
|
assert.Empty(t, event.GetTraceId())
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestHeartbeatingStreamRealSendResetsSilenceTimer(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
inner := newCapturingStream(t)
|
||||||
|
hb := newHeartbeatingStream(inner, 50*time.Millisecond, nil)
|
||||||
|
require.NotNil(t, hb)
|
||||||
|
defer hb.Stop()
|
||||||
|
|
||||||
|
go func() { _ = hb.Run(t.Context()) }()
|
||||||
|
|
||||||
|
// Reset the timer every 20ms for 120ms — the silence window never
|
||||||
|
// elapses, so the heartbeat goroutine must stay quiet and the
|
||||||
|
// channel must only carry the manual real-event Sends.
|
||||||
|
go func() {
|
||||||
|
ticker := time.NewTicker(20 * time.Millisecond)
|
||||||
|
defer ticker.Stop()
|
||||||
|
for range 6 {
|
||||||
|
<-ticker.C
|
||||||
|
if err := hb.Send(&gatewayv1.GatewayEvent{EventType: "real.event"}); err != nil {
|
||||||
|
t.Errorf("real Send failed: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
for range 6 {
|
||||||
|
event := inner.recv(t, 100*time.Millisecond)
|
||||||
|
assert.Equal(t, "real.event", event.GetEventType(), "only real events should appear while Send keeps resetting the silence window")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestHeartbeatingStreamStopHaltsRun(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
inner := newCapturingStream(t)
|
||||||
|
hb := newHeartbeatingStream(inner, 20*time.Millisecond, nil)
|
||||||
|
require.NotNil(t, hb)
|
||||||
|
|
||||||
|
runDone := make(chan error, 1)
|
||||||
|
go func() { runDone <- hb.Run(context.Background()) }()
|
||||||
|
|
||||||
|
hb.Stop()
|
||||||
|
select {
|
||||||
|
case err := <-runDone:
|
||||||
|
require.NoError(t, err)
|
||||||
|
case <-time.After(200 * time.Millisecond):
|
||||||
|
t.Fatal("Run did not exit after Stop")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stop is idempotent; the second call must not panic on the
|
||||||
|
// already-closed done channel.
|
||||||
|
assert.NotPanics(t, hb.Stop)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestHeartbeatingStreamContextCancelHaltsRun(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
inner := newCapturingStream(t)
|
||||||
|
hb := newHeartbeatingStream(inner, 20*time.Millisecond, nil)
|
||||||
|
require.NotNil(t, hb)
|
||||||
|
defer hb.Stop()
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
runDone := make(chan error, 1)
|
||||||
|
go func() { runDone <- hb.Run(ctx) }()
|
||||||
|
|
||||||
|
cancel()
|
||||||
|
select {
|
||||||
|
case err := <-runDone:
|
||||||
|
require.NoError(t, err)
|
||||||
|
case <-time.After(200 * time.Millisecond):
|
||||||
|
t.Fatal("Run did not exit after context cancel")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestHeartbeatingStreamSendErrorPropagates(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
wantErr := errors.New("send failed")
|
||||||
|
inner := newCapturingStream(t)
|
||||||
|
inner.sendErr.Store(&errorBox{err: wantErr})
|
||||||
|
|
||||||
|
hb := newHeartbeatingStream(inner, time.Minute, nil)
|
||||||
|
require.NotNil(t, hb)
|
||||||
|
defer hb.Stop()
|
||||||
|
|
||||||
|
err := hb.Send(&gatewayv1.GatewayEvent{EventType: "real.event"})
|
||||||
|
require.ErrorIs(t, err, wantErr)
|
||||||
|
}
|
||||||
|
|
||||||
|
// capturingStream is a minimal grpc.ServerStreamingServer that pushes
|
||||||
|
// every Send into a channel so tests can assert on the wire frame.
|
||||||
|
type capturingStream struct {
|
||||||
|
grpc.ServerStreamingServer[gatewayv1.GatewayEvent]
|
||||||
|
|
||||||
|
events chan *gatewayv1.GatewayEvent
|
||||||
|
sendErr atomic.Pointer[errorBox]
|
||||||
|
}
|
||||||
|
|
||||||
|
type errorBox struct{ err error }
|
||||||
|
|
||||||
|
func newCapturingStream(t *testing.T) *capturingStream {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
|
return &capturingStream{events: make(chan *gatewayv1.GatewayEvent, 16)}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *capturingStream) Send(event *gatewayv1.GatewayEvent) error {
|
||||||
|
if box := s.sendErr.Load(); box != nil {
|
||||||
|
return box.err
|
||||||
|
}
|
||||||
|
s.events <- event
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *capturingStream) Context() context.Context { return context.Background() }
|
||||||
|
|
||||||
|
func (s *capturingStream) SetHeader(metadata.MD) error { return nil }
|
||||||
|
func (s *capturingStream) SendHeader(metadata.MD) error { return nil }
|
||||||
|
func (s *capturingStream) SetTrailer(metadata.MD) {}
|
||||||
|
func (s *capturingStream) SendMsg(any) error { return errors.New("capturingStream.SendMsg: unused") }
|
||||||
|
func (s *capturingStream) RecvMsg(any) error { return errors.New("capturingStream.RecvMsg: unused") }
|
||||||
|
|
||||||
|
func (s *capturingStream) recv(t *testing.T, timeout time.Duration) *gatewayv1.GatewayEvent {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case event := <-s.events:
|
||||||
|
return event
|
||||||
|
case <-time.After(timeout):
|
||||||
|
t.Fatalf("no event captured within %s", timeout)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -4,9 +4,11 @@ import (
|
|||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"crypto/sha256"
|
"crypto/sha256"
|
||||||
|
"time"
|
||||||
|
|
||||||
"galaxy/gateway/authn"
|
"galaxy/gateway/authn"
|
||||||
"galaxy/gateway/internal/clock"
|
"galaxy/gateway/internal/clock"
|
||||||
|
"galaxy/gateway/internal/telemetry"
|
||||||
gatewayv1 "galaxy/gateway/proto/galaxy/gateway/v1"
|
gatewayv1 "galaxy/gateway/proto/galaxy/gateway/v1"
|
||||||
gatewayfbs "galaxy/schema/fbs/gateway"
|
gatewayfbs "galaxy/schema/fbs/gateway"
|
||||||
|
|
||||||
@@ -16,7 +18,31 @@ import (
|
|||||||
"google.golang.org/grpc/status"
|
"google.golang.org/grpc/status"
|
||||||
)
|
)
|
||||||
|
|
||||||
const serverTimeEventType = "gateway.server_time"
|
const (
|
||||||
|
serverTimeEventType = "gateway.server_time"
|
||||||
|
|
||||||
|
// gatewayHeartbeatEventType labels the silence-filling event the
|
||||||
|
// authenticated push stream emits when no real event has been Send'd
|
||||||
|
// within `AuthenticatedGRPCConfig.PushHeartbeatInterval`. Browser
|
||||||
|
// fetch-streaming layers (notably Safari) close response bodies they
|
||||||
|
// consider idle; the heartbeat keeps the body active so push events
|
||||||
|
// land on the live stream instead of disappearing into the
|
||||||
|
// client-side reconnect window.
|
||||||
|
//
|
||||||
|
// Heartbeat events are sent UNSIGNED — `EventID`, `RequestID`,
|
||||||
|
// `TraceID`, `PayloadBytes`, `PayloadHash`, `Signature`, and
|
||||||
|
// `TimestampMs` are all left at their proto3 defaults so the wire
|
||||||
|
// frame stays under ~50 bytes. The UI's EventStream short-circuits
|
||||||
|
// on this event type before signature verification (see
|
||||||
|
// `ui/frontend/src/api/events.svelte.ts`) and never dispatches it to
|
||||||
|
// handlers. The security implication is intentional and documented
|
||||||
|
// in `docs/ARCHITECTURE.md` (§ authenticated edge): an attacker who
|
||||||
|
// could inject heartbeats gains nothing — they carry no payload and
|
||||||
|
// trigger no UI behaviour, the only practical effect is keeping a
|
||||||
|
// stream marginally more alive than transport-level keepalives
|
||||||
|
// would. Real events keep the signed envelope unchanged.
|
||||||
|
gatewayHeartbeatEventType = "gateway.heartbeat"
|
||||||
|
)
|
||||||
|
|
||||||
// authenticatedStreamBinding captures the verified identity bound to one
|
// authenticatedStreamBinding captures the verified identity bound to one
|
||||||
// authenticated SubscribeEvents stream after the full ingress pipeline
|
// authenticated SubscribeEvents stream after the full ingress pipeline
|
||||||
@@ -47,12 +73,21 @@ func authenticatedStreamBindingFromContext(ctx context.Context) (authenticatedSt
|
|||||||
// authenticatedPushStreamService owns SubscribeEvents bootstrap behavior:
|
// authenticatedPushStreamService owns SubscribeEvents bootstrap behavior:
|
||||||
// bind the authenticated stream, send the initial signed server-time event,
|
// bind the authenticated stream, send the initial signed server-time event,
|
||||||
// and then hand the stream lifecycle to the configured tail delegate.
|
// and then hand the stream lifecycle to the configured tail delegate.
|
||||||
|
//
|
||||||
|
// A positive `heartbeatInterval` wraps the bound stream with
|
||||||
|
// `heartbeatingStream` before delegating, so any tail implementation
|
||||||
|
// (fan-out, hold-open, future variants) gets the silence-based
|
||||||
|
// `gateway.heartbeat` for free. The wrapper observes every real Send
|
||||||
|
// the tail performs and only emits a heartbeat when the silence window
|
||||||
|
// elapses; tails remain heartbeat-unaware.
|
||||||
type authenticatedPushStreamService struct {
|
type authenticatedPushStreamService struct {
|
||||||
gatewayv1.UnimplementedEdgeGatewayServer
|
gatewayv1.UnimplementedEdgeGatewayServer
|
||||||
|
|
||||||
tailDelegate gatewayv1.EdgeGatewayServer
|
tailDelegate gatewayv1.EdgeGatewayServer
|
||||||
responseSigner authn.ResponseSigner
|
responseSigner authn.ResponseSigner
|
||||||
clock clock.Clock
|
clock clock.Clock
|
||||||
|
heartbeatInterval time.Duration
|
||||||
|
metrics *telemetry.Runtime
|
||||||
}
|
}
|
||||||
|
|
||||||
// SubscribeEvents binds the verified stream identity, sends the initial signed
|
// SubscribeEvents binds the verified stream identity, sends the initial signed
|
||||||
@@ -112,10 +147,30 @@ func (s authenticatedPushStreamService) SubscribeEvents(req *gatewayv1.Subscribe
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return s.tailDelegate.SubscribeEvents(req, boundStream)
|
var streamForTail grpc.ServerStreamingServer[gatewayv1.GatewayEvent] = boundStream
|
||||||
|
if hbStream := newHeartbeatingStream(boundStream, s.heartbeatInterval, s.metrics); hbStream != nil {
|
||||||
|
defer hbStream.Stop()
|
||||||
|
go func() {
|
||||||
|
// Heartbeat Send failures imply the transport is already
|
||||||
|
// dead — the tail's next Send will hit the same error and
|
||||||
|
// surface through the gateway observability layer, so we
|
||||||
|
// discard the returned error here and rely on that path
|
||||||
|
// for the canonical failure record.
|
||||||
|
_ = hbStream.Run(stream.Context())
|
||||||
|
}()
|
||||||
|
streamForTail = hbStream
|
||||||
}
|
}
|
||||||
|
|
||||||
func newAuthenticatedPushStreamService(tailDelegate gatewayv1.EdgeGatewayServer, responseSigner authn.ResponseSigner, clk clock.Clock) gatewayv1.EdgeGatewayServer {
|
return s.tailDelegate.SubscribeEvents(req, streamForTail)
|
||||||
|
}
|
||||||
|
|
||||||
|
func newAuthenticatedPushStreamService(
|
||||||
|
tailDelegate gatewayv1.EdgeGatewayServer,
|
||||||
|
responseSigner authn.ResponseSigner,
|
||||||
|
clk clock.Clock,
|
||||||
|
heartbeatInterval time.Duration,
|
||||||
|
metrics *telemetry.Runtime,
|
||||||
|
) gatewayv1.EdgeGatewayServer {
|
||||||
if tailDelegate == nil {
|
if tailDelegate == nil {
|
||||||
tailDelegate = holdOpenSubscribeEventsService{}
|
tailDelegate = holdOpenSubscribeEventsService{}
|
||||||
}
|
}
|
||||||
@@ -124,6 +179,8 @@ func newAuthenticatedPushStreamService(tailDelegate gatewayv1.EdgeGatewayServer,
|
|||||||
tailDelegate: tailDelegate,
|
tailDelegate: tailDelegate,
|
||||||
responseSigner: responseSigner,
|
responseSigner: responseSigner,
|
||||||
clock: clk,
|
clock: clk,
|
||||||
|
heartbeatInterval: heartbeatInterval,
|
||||||
|
metrics: metrics,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -111,7 +111,13 @@ func NewServer(cfg config.AuthenticatedGRPCConfig, deps ServerDependencies) *Ser
|
|||||||
deps = normalizeServerDependencies(deps)
|
deps = normalizeServerDependencies(deps)
|
||||||
|
|
||||||
finalService := newCommandRoutingService(
|
finalService := newCommandRoutingService(
|
||||||
newAuthenticatedPushStreamService(deps.Service, deps.ResponseSigner, deps.Clock),
|
newAuthenticatedPushStreamService(
|
||||||
|
deps.Service,
|
||||||
|
deps.ResponseSigner,
|
||||||
|
deps.Clock,
|
||||||
|
cfg.PushHeartbeatInterval,
|
||||||
|
deps.Telemetry,
|
||||||
|
),
|
||||||
deps.Router,
|
deps.Router,
|
||||||
deps.ResponseSigner,
|
deps.ResponseSigner,
|
||||||
deps.Clock,
|
deps.Clock,
|
||||||
|
|||||||
@@ -174,6 +174,102 @@ func TestSubscribeEventsValidEnvelopeSendsBootstrapEventAndWaitsForCancellation(
|
|||||||
assert.Equal(t, connect.CodeCanceled, connect.CodeOf(recvErr))
|
assert.Equal(t, connect.CodeCanceled, connect.CodeOf(recvErr))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSubscribeEventsEmitsHeartbeatAfterSilenceWindow(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
grpcCfg := config.DefaultAuthenticatedGRPCConfig()
|
||||||
|
grpcCfg.Addr = "127.0.0.1:0"
|
||||||
|
grpcCfg.FreshnessWindow = testFreshnessWindow
|
||||||
|
// 30 ms keeps the test inside the standard 60-second go test
|
||||||
|
// timeout while still giving the heartbeat goroutine enough
|
||||||
|
// headroom to fire after the bootstrap server-time event lands.
|
||||||
|
grpcCfg.PushHeartbeatInterval = 30 * time.Millisecond
|
||||||
|
|
||||||
|
server, runGateway := newTestGatewayWithGRPCConfig(t, grpcCfg, ServerDependencies{
|
||||||
|
SessionCache: staticSessionCache{
|
||||||
|
lookupFunc: func(context.Context, string) (session.Record, error) {
|
||||||
|
return newActiveSessionRecord(), nil
|
||||||
|
},
|
||||||
|
},
|
||||||
|
ReplayStore: staticReplayStore{},
|
||||||
|
})
|
||||||
|
defer runGateway.stop(t)
|
||||||
|
|
||||||
|
addr := waitForListenAddr(t, server)
|
||||||
|
client := newEdgeClient(t, addr)
|
||||||
|
|
||||||
|
stream, err := client.SubscribeEvents(t.Context(), connect.NewRequest(newValidSubscribeEventsRequest()))
|
||||||
|
require.NoError(t, err)
|
||||||
|
t.Cleanup(func() { _ = stream.Close() })
|
||||||
|
|
||||||
|
bootstrap := recvBootstrapEvent(t, stream)
|
||||||
|
assertServerTimeBootstrapEvent(t, bootstrap, newTestResponseSignerPublicKey(), "request-123", "trace-123", testCurrentTime.UnixMilli())
|
||||||
|
|
||||||
|
// The next frame must be the unsigned heartbeat. Every field
|
||||||
|
// except EventType is left at its proto3 default — the UI side
|
||||||
|
// short-circuits on EventType BEFORE signature verification, so
|
||||||
|
// any non-empty signature would be wasted bytes on the wire.
|
||||||
|
require.True(t, stream.Receive(), "stream did not deliver a heartbeat: %v", stream.Err())
|
||||||
|
heartbeat := stream.Msg()
|
||||||
|
assert.Equal(t, gatewayHeartbeatEventType, heartbeat.GetEventType())
|
||||||
|
assert.Empty(t, heartbeat.GetEventId())
|
||||||
|
assert.Zero(t, heartbeat.GetTimestampMs())
|
||||||
|
assert.Empty(t, heartbeat.GetPayloadBytes())
|
||||||
|
assert.Empty(t, heartbeat.GetPayloadHash())
|
||||||
|
assert.Empty(t, heartbeat.GetSignature())
|
||||||
|
assert.Empty(t, heartbeat.GetRequestId())
|
||||||
|
assert.Empty(t, heartbeat.GetTraceId())
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSubscribeEventsZeroHeartbeatIntervalDisablesEmission(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
grpcCfg := config.DefaultAuthenticatedGRPCConfig()
|
||||||
|
grpcCfg.Addr = "127.0.0.1:0"
|
||||||
|
grpcCfg.FreshnessWindow = testFreshnessWindow
|
||||||
|
grpcCfg.PushHeartbeatInterval = 0
|
||||||
|
|
||||||
|
server, runGateway := newTestGatewayWithGRPCConfig(t, grpcCfg, ServerDependencies{
|
||||||
|
SessionCache: staticSessionCache{
|
||||||
|
lookupFunc: func(context.Context, string) (session.Record, error) {
|
||||||
|
return newActiveSessionRecord(), nil
|
||||||
|
},
|
||||||
|
},
|
||||||
|
ReplayStore: staticReplayStore{},
|
||||||
|
})
|
||||||
|
defer runGateway.stop(t)
|
||||||
|
|
||||||
|
addr := waitForListenAddr(t, server)
|
||||||
|
client := newEdgeClient(t, addr)
|
||||||
|
|
||||||
|
stream, err := client.SubscribeEvents(t.Context(), connect.NewRequest(newValidSubscribeEventsRequest()))
|
||||||
|
require.NoError(t, err)
|
||||||
|
t.Cleanup(func() { _ = stream.Close() })
|
||||||
|
|
||||||
|
bootstrap := recvBootstrapEvent(t, stream)
|
||||||
|
assertServerTimeBootstrapEvent(t, bootstrap, newTestResponseSignerPublicKey(), "request-123", "trace-123", testCurrentTime.UnixMilli())
|
||||||
|
|
||||||
|
// No heartbeat is expected — the stream must stay silent. A
|
||||||
|
// background Receive races a deadline check so the test fails
|
||||||
|
// fast if the gateway ever sends a second frame on this stream.
|
||||||
|
recvResult := make(chan error, 1)
|
||||||
|
go func() {
|
||||||
|
if stream.Receive() {
|
||||||
|
recvResult <- errors.New("stream produced unexpected event")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
recvResult <- stream.Err()
|
||||||
|
}()
|
||||||
|
require.Never(t, func() bool {
|
||||||
|
select {
|
||||||
|
case <-recvResult:
|
||||||
|
return true
|
||||||
|
default:
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}, 200*time.Millisecond, 20*time.Millisecond, "heartbeat fired despite zero interval")
|
||||||
|
}
|
||||||
|
|
||||||
func TestSubscribeEventsMissingReplayStoreFailsClosed(t *testing.T) {
|
func TestSubscribeEventsMissingReplayStoreFailsClosed(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
|
|||||||
@@ -46,6 +46,7 @@ type Runtime struct {
|
|||||||
// Push instruments.
|
// Push instruments.
|
||||||
pushActiveStreams metric.Int64UpDownCounter
|
pushActiveStreams metric.Int64UpDownCounter
|
||||||
pushStreamClosers metric.Int64Counter
|
pushStreamClosers metric.Int64Counter
|
||||||
|
pushHeartbeats metric.Int64Counter
|
||||||
|
|
||||||
// Internal event consumer instruments.
|
// Internal event consumer instruments.
|
||||||
internalEventDrops metric.Int64Counter
|
internalEventDrops metric.Int64Counter
|
||||||
@@ -120,6 +121,10 @@ func New(ctx context.Context, logger *zap.Logger) (*Runtime, error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
pushHeartbeats, err := meter.Int64Counter("gateway.push.heartbeats_sent")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
internalEventDrops, err := meter.Int64Counter("gateway.internal_event_drops")
|
internalEventDrops, err := meter.Int64Counter("gateway.internal_event_drops")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@@ -136,6 +141,7 @@ func New(ctx context.Context, logger *zap.Logger) (*Runtime, error) {
|
|||||||
grpcDuration: grpcDuration,
|
grpcDuration: grpcDuration,
|
||||||
pushActiveStreams: pushActiveStreams,
|
pushActiveStreams: pushActiveStreams,
|
||||||
pushStreamClosers: pushStreamClosers,
|
pushStreamClosers: pushStreamClosers,
|
||||||
|
pushHeartbeats: pushHeartbeats,
|
||||||
internalEventDrops: internalEventDrops,
|
internalEventDrops: internalEventDrops,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
@@ -228,6 +234,19 @@ func (r *Runtime) RecordPushStreamClosure(ctx context.Context, attrs ...attribut
|
|||||||
r.pushStreamClosers.Add(ctx, 1, metric.WithAttributes(attrs...))
|
r.pushStreamClosers.Add(ctx, 1, metric.WithAttributes(attrs...))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RecordPushHeartbeat records one outbound push-stream heartbeat event.
|
||||||
|
// The `outcome` attribute should distinguish a successful Send from a
|
||||||
|
// transport-level failure so the metric stays useful for bandwidth
|
||||||
|
// budgeting (most heartbeats are `sent`; a sudden bump of `error` means
|
||||||
|
// the upstream connection is failing before the gateway can flush).
|
||||||
|
func (r *Runtime) RecordPushHeartbeat(ctx context.Context, attrs ...attribute.KeyValue) {
|
||||||
|
if r == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
r.pushHeartbeats.Add(ctx, 1, metric.WithAttributes(attrs...))
|
||||||
|
}
|
||||||
|
|
||||||
// RecordInternalEventDrop records one malformed or rejected internal event.
|
// RecordInternalEventDrop records one malformed or rejected internal event.
|
||||||
func (r *Runtime) RecordInternalEventDrop(ctx context.Context, attrs ...attribute.KeyValue) {
|
func (r *Runtime) RecordInternalEventDrop(ctx context.Context, attrs ...attribute.KeyValue) {
|
||||||
if r == nil {
|
if r == nil {
|
||||||
|
|||||||
@@ -24,7 +24,6 @@ import { ConnectError } from "@connectrpc/connect";
|
|||||||
import type { Core } from "../platform/core/index";
|
import type { Core } from "../platform/core/index";
|
||||||
import type { DeviceKeypair } from "../platform/store/index";
|
import type { DeviceKeypair } from "../platform/store/index";
|
||||||
import {
|
import {
|
||||||
GatewayEventSchema,
|
|
||||||
SubscribeEventsRequestSchema,
|
SubscribeEventsRequestSchema,
|
||||||
type GatewayEvent,
|
type GatewayEvent,
|
||||||
} from "../proto/galaxy/gateway/v1/edge_gateway_pb";
|
} from "../proto/galaxy/gateway/v1/edge_gateway_pb";
|
||||||
@@ -35,6 +34,17 @@ import { createEdgeGatewayClient, type EdgeGatewayClient } from "./connect";
|
|||||||
const PROTOCOL_VERSION = "v1";
|
const PROTOCOL_VERSION = "v1";
|
||||||
const SUBSCRIBE_MESSAGE_TYPE = "gateway.subscribe";
|
const SUBSCRIBE_MESSAGE_TYPE = "gateway.subscribe";
|
||||||
|
|
||||||
|
// HEARTBEAT_EVENT_TYPE matches `gatewayHeartbeatEventType` on the
|
||||||
|
// gateway side. The server emits this unsigned event with only the
|
||||||
|
// `eventType` field set when the push stream has been silent for the
|
||||||
|
// configured interval — its sole purpose is to keep browser fetch
|
||||||
|
// streaming layers (Safari is notably aggressive on idle timeouts)
|
||||||
|
// from closing the response body and causing push events to disappear
|
||||||
|
// into the reconnect window. The client SHORT-CIRCUITS on this event
|
||||||
|
// type before signature verification or handler dispatch: the payload
|
||||||
|
// is empty by design and there is no signature to validate.
|
||||||
|
const HEARTBEAT_EVENT_TYPE = "gateway.heartbeat";
|
||||||
|
|
||||||
// Connect error code numerical values used by the watcher. The full
|
// Connect error code numerical values used by the watcher. The full
|
||||||
// enum lives in `@connectrpc/connect` but importing the runtime enum
|
// enum lives in `@connectrpc/connect` but importing the runtime enum
|
||||||
// would pull a large surface into this small module.
|
// would pull a large surface into this small module.
|
||||||
@@ -207,6 +217,20 @@ export class EventStream {
|
|||||||
if (signal.aborted) {
|
if (signal.aborted) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
if (event.eventType === HEARTBEAT_EVENT_TYPE) {
|
||||||
|
// Heartbeats are unsigned by design (see the
|
||||||
|
// constant's docstring) and carry no payload — skip
|
||||||
|
// verification + dispatch. They still count as
|
||||||
|
// connection proof: receiving one means the
|
||||||
|
// stream is healthy, so reset backoff like any
|
||||||
|
// other first event.
|
||||||
|
if (!firstEventSeen) {
|
||||||
|
firstEventSeen = true;
|
||||||
|
this.connectionStatus = "connected";
|
||||||
|
attempt = 0;
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
this.verifyEvent(event, opts);
|
this.verifyEvent(event, opts);
|
||||||
if (!firstEventSeen) {
|
if (!firstEventSeen) {
|
||||||
firstEventSeen = true;
|
firstEventSeen = true;
|
||||||
|
|||||||
@@ -321,6 +321,88 @@ describe("EventStream", () => {
|
|||||||
eventStream.stop();
|
eventStream.stop();
|
||||||
});
|
});
|
||||||
|
|
||||||
|
test("gateway.heartbeat is short-circuited before verification and dispatch", async () => {
|
||||||
|
const handler = vi.fn();
|
||||||
|
eventStream.on("game.turn.ready", handler);
|
||||||
|
// A heartbeat with an empty signature (matching the unsigned
|
||||||
|
// wire shape the gateway emits) would normally trip
|
||||||
|
// `verifyEvent` and tear the stream down with a SignatureError.
|
||||||
|
// The short-circuit skips both verification and dispatch.
|
||||||
|
const heartbeat = create(GatewayEventSchema, {
|
||||||
|
eventType: "gateway.heartbeat",
|
||||||
|
eventId: "",
|
||||||
|
timestampMs: 0n,
|
||||||
|
payloadBytes: new Uint8Array(0),
|
||||||
|
payloadHash: new Uint8Array(0),
|
||||||
|
signature: new Uint8Array(0),
|
||||||
|
requestId: "",
|
||||||
|
traceId: "",
|
||||||
|
});
|
||||||
|
const realEvent = buildEvent(
|
||||||
|
"game.turn.ready",
|
||||||
|
new TextEncoder().encode("{}"),
|
||||||
|
);
|
||||||
|
|
||||||
|
const verifyEventSpy = vi.fn(() => true);
|
||||||
|
const verifyPayloadHashSpy = vi.fn(() => true);
|
||||||
|
const core = mockCore({
|
||||||
|
verifyEvent: verifyEventSpy,
|
||||||
|
verifyPayloadHash: verifyPayloadHashSpy,
|
||||||
|
});
|
||||||
|
|
||||||
|
const client = makeRouter(async function* () {
|
||||||
|
yield heartbeat;
|
||||||
|
yield realEvent;
|
||||||
|
});
|
||||||
|
eventStream.start({
|
||||||
|
core,
|
||||||
|
keypair: mockKeypair(),
|
||||||
|
deviceSessionId: "device-1",
|
||||||
|
gatewayResponsePublicKey: new Uint8Array(32),
|
||||||
|
client,
|
||||||
|
sleep: async () => {},
|
||||||
|
random: () => 0,
|
||||||
|
});
|
||||||
|
|
||||||
|
await vi.waitFor(() => {
|
||||||
|
expect(handler).toHaveBeenCalled();
|
||||||
|
});
|
||||||
|
// Verification ran exactly once — for the real event, never
|
||||||
|
// for the heartbeat. The handler also only sees the real one.
|
||||||
|
expect(verifyEventSpy).toHaveBeenCalledTimes(1);
|
||||||
|
expect(verifyPayloadHashSpy).toHaveBeenCalledTimes(1);
|
||||||
|
expect(handler).toHaveBeenCalledTimes(1);
|
||||||
|
expect(handler.mock.calls[0]?.[0].eventType).toBe("game.turn.ready");
|
||||||
|
eventStream.stop();
|
||||||
|
});
|
||||||
|
|
||||||
|
test("a leading heartbeat still flips connectionStatus to connected", async () => {
|
||||||
|
const heartbeat = create(GatewayEventSchema, {
|
||||||
|
eventType: "gateway.heartbeat",
|
||||||
|
});
|
||||||
|
// The stream yields only a heartbeat then waits for the test
|
||||||
|
// to abort the consumer — the `connected` transition must not
|
||||||
|
// require a real event behind the heartbeat.
|
||||||
|
const pending = new Promise<never>(() => {});
|
||||||
|
const client = makeRouter(async function* () {
|
||||||
|
yield heartbeat;
|
||||||
|
await pending;
|
||||||
|
});
|
||||||
|
eventStream.start({
|
||||||
|
core: mockCore(),
|
||||||
|
keypair: mockKeypair(),
|
||||||
|
deviceSessionId: "device-1",
|
||||||
|
gatewayResponsePublicKey: new Uint8Array(32),
|
||||||
|
client,
|
||||||
|
sleep: async () => {},
|
||||||
|
random: () => 0,
|
||||||
|
});
|
||||||
|
await vi.waitFor(() => {
|
||||||
|
expect(eventStream.connectionStatus).toBe("connected");
|
||||||
|
});
|
||||||
|
eventStream.stop();
|
||||||
|
});
|
||||||
|
|
||||||
test("connectionStatus transitions through connecting → connected → idle", async () => {
|
test("connectionStatus transitions through connecting → connected → idle", async () => {
|
||||||
expect(eventStream.connectionStatus).toBe("idle");
|
expect(eventStream.connectionStatus).toBe("idle");
|
||||||
const event = buildEvent(
|
const event = buildEvent(
|
||||||
|
|||||||
Reference in New Issue
Block a user