14b65389ef
Tests · UI / test (push) Successful in 2m35s
Tests · Go / test (push) Successful in 1m56s
Tests · UI / test (pull_request) Has been cancelled
Tests · Integration / integration (pull_request) Successful in 1m42s
Tests · Go / test (pull_request) Successful in 2m0s
Browser fetch-streaming layers close response bodies they consider
idle after roughly 15-30 s without incoming bytes. Safari is the
most aggressive, but the symptom matters everywhere: a quiet
SubscribeEvents stream (lobby, between turns, mailbox empty) gets
torn down by the browser, the EventStream singleton reconnects with
backoff, and any push event that fires inside the reconnect window
is lost because `push.Hub` queues are not persisted across
subscription closes. The user-visible failure mode is the
intermittent "Fetch API cannot load … due to access control checks"
console error (a misleading WebKit symptom — CORS headers are
actually present) plus missed turn-ready / mail-received toasts.

Server-side fix: a silence-based heartbeat at the
`authenticatedPushStreamService` wrapper layer. After the signed
`gateway.server_time` bootstrap event, the gateway wraps the bound
stream with `heartbeatingStream`. Every tail Send (fan-out, future
variants) resets the silence timer; when the timer elapses, a
goroutine emits `gateway.heartbeat` with only `EventType` set.
Everything else stays at proto3 defaults, so the wire frame is
~45 bytes amortised. A `sendMu` serialises the heartbeat goroutine
with tail Sends because calling `Send` (`SendMsg`) on the same gRPC
stream from different goroutines is not safe.
The heartbeat is intentionally UNSIGNED: heartbeats carry no
payload, dispatch to no handler on the client, and an injected
heartbeat trivially causes no user-visible state change. TLS still
protects the wire and real events keep the signed envelope
unchanged. Documented in `docs/ARCHITECTURE.md` § 15 alongside the
per-scale bandwidth projection (100…100 000 clients × 15…60 s).
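The bandwidth cost scales linearly: every idle client receives one frame per interval. A quick sketch of that arithmetic, using the ~45-byte frame figure quoted above; the function name and the intermediate client counts are illustrative, and this is not the projection table from `docs/ARCHITECTURE.md`.

```go
package main

import "fmt"

// heartbeatBytesPerSecond is an upper bound for a fully idle fleet: each
// client receives one heartbeat frame per interval. Any real event resets a
// client's silence timer, so busy streams send fewer heartbeats.
func heartbeatBytesPerSecond(clients int, frameBytes, intervalSeconds float64) float64 {
	return float64(clients) * frameBytes / intervalSeconds
}

func main() {
	const frameBytes = 45 // approximate per-heartbeat wire size quoted above
	for _, clients := range []int{100, 1_000, 10_000, 100_000} {
		for _, interval := range []float64{15, 30, 60} {
			bps := heartbeatBytesPerSecond(clients, frameBytes, interval)
			fmt.Printf("%7d clients every %2.0f s: %9.1f B/s, %7.3f GB/day\n",
				clients, interval, bps, bps*86400/1e9)
		}
	}
}
```

At the top end (100 000 idle clients every 15 s) this works out to 100 000 × 45 / 15 = 300 000 B/s, about 25.9 GB per day, before any HTTP/2, TLS or TCP/IP framing that the ~45-byte figure may not already include.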
Config: new `GATEWAY_PUSH_HEARTBEAT_INTERVAL` (default `15s`,
`0s` disables). Telemetry: new
`gateway.push.heartbeats_sent{outcome}` counter so operators can
budget bandwidth and spot a sudden `outcome=error` bump as an
upstream-failing-before-flush signal.
Client (`ui/frontend/src/api/events.svelte.ts`): early `continue`
on `event.eventType === "gateway.heartbeat"` before `verifyEvent`,
`verifyPayloadHash`, or dispatch; the empty signature would otherwise
trip a `SignatureError` and force a reconnect. A leading heartbeat still flips
`connectionStatus` to `connected` and resets backoff, because
receiving one is proof the stream is healthy.
Tests:
- `push_heartbeat_test.go`: unit tests for the wrapper — zero
interval returns nil, heartbeat fires after silence, real Send
resets the timer, Stop / context-cancel halt the goroutine,
Send errors propagate.
- `server_test.go`: integration tests through the full gateway
pipeline — heartbeat fires after the configured silence window,
zero interval keeps the stream silent.
- `config_test.go`: default applied, env-override parsed,
negative value rejected.
- `events.test.ts`: heartbeat skipped before verification and not
dispatched to handlers; leading heartbeat still flips
`connectionStatus` to `connected`.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
222 lines
7.5 KiB
Go
package grpcapi

import (
	"bytes"
	"context"
	"crypto/sha256"
	"time"

	"galaxy/gateway/authn"
	"galaxy/gateway/internal/clock"
	"galaxy/gateway/internal/telemetry"
	gatewayv1 "galaxy/gateway/proto/galaxy/gateway/v1"
	gatewayfbs "galaxy/schema/fbs/gateway"

	flatbuffers "github.com/google/flatbuffers/go"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

const (
	serverTimeEventType = "gateway.server_time"

	// gatewayHeartbeatEventType labels the silence-filling event the
	// authenticated push stream emits when no real event has been Send'd
	// within `AuthenticatedGRPCConfig.PushHeartbeatInterval`. Browser
	// fetch-streaming layers (notably Safari) close response bodies they
	// consider idle; the heartbeat keeps the body active so push events
	// land on the live stream instead of disappearing into the
	// client-side reconnect window.
	//
	// Heartbeat events are sent UNSIGNED — `EventID`, `RequestID`,
	// `TraceID`, `PayloadBytes`, `PayloadHash`, `Signature`, and
	// `TimestampMs` are all left at their proto3 defaults so the wire
	// frame stays under ~50 bytes. The UI's EventStream short-circuits
	// on this event type before signature verification (see
	// `ui/frontend/src/api/events.svelte.ts`) and never dispatches it to
	// handlers. The security implication is intentional and documented
	// in `docs/ARCHITECTURE.md` (§ authenticated edge): an attacker who
	// could inject heartbeats gains nothing — they carry no payload and
	// trigger no UI behaviour, the only practical effect is keeping a
	// stream marginally more alive than transport-level keepalives
	// would. Real events keep the signed envelope unchanged.
	gatewayHeartbeatEventType = "gateway.heartbeat"
)

// authenticatedStreamBinding captures the verified identity bound to one
// authenticated SubscribeEvents stream after the full ingress pipeline
// succeeds.
type authenticatedStreamBinding struct {
	UserID          string
	DeviceSessionID string
	MessageType     string
	RequestID       string
	TraceID         string
}

// authenticatedStreamBindingFromContext returns the verified stream binding
// previously attached to ctx by the authenticated push-stream service.
func authenticatedStreamBindingFromContext(ctx context.Context) (authenticatedStreamBinding, bool) {
	if ctx == nil {
		return authenticatedStreamBinding{}, false
	}

	binding, ok := ctx.Value(authenticatedStreamBindingContextKey{}).(authenticatedStreamBinding)
	if !ok {
		return authenticatedStreamBinding{}, false
	}

	return binding, true
}

// authenticatedPushStreamService owns SubscribeEvents bootstrap behavior:
// bind the authenticated stream, send the initial signed server-time event,
// and then hand the stream lifecycle to the configured tail delegate.
//
// A positive `heartbeatInterval` wraps the bound stream with
// `heartbeatingStream` before delegating, so any tail implementation
// (fan-out, hold-open, future variants) gets the silence-based
// `gateway.heartbeat` for free. The wrapper observes every real Send
// the tail performs and only emits a heartbeat when the silence window
// elapses; tails remain heartbeat-unaware.
type authenticatedPushStreamService struct {
	gatewayv1.UnimplementedEdgeGatewayServer

	tailDelegate      gatewayv1.EdgeGatewayServer
	responseSigner    authn.ResponseSigner
	clock             clock.Clock
	heartbeatInterval time.Duration
	metrics           *telemetry.Runtime
}

// SubscribeEvents binds the verified stream identity, sends the initial signed
// server-time event, and then delegates the remaining lifecycle.
func (s authenticatedPushStreamService) SubscribeEvents(req *gatewayv1.SubscribeEventsRequest, stream grpc.ServerStreamingServer[gatewayv1.GatewayEvent]) error {
	envelope, ok := parsedEnvelopeFromContext(stream.Context())
	if !ok {
		return status.Error(codes.Internal, "authenticated request context is incomplete")
	}

	record, ok := resolvedSessionFromContext(stream.Context())
	if !ok {
		return status.Error(codes.Internal, "authenticated request context is incomplete")
	}

	binding := authenticatedStreamBinding{
		UserID:          record.UserID,
		DeviceSessionID: record.DeviceSessionID,
		MessageType:     envelope.MessageType,
		RequestID:       envelope.RequestID,
		TraceID:         envelope.TraceID,
	}
	boundStream := authenticatedStreamContextStream{
		ServerStreamingServer: stream,
		ctx: context.WithValue(
			stream.Context(),
			authenticatedStreamBindingContextKey{},
			binding,
		),
	}

	serverTimeMS := s.clock.Now().UTC().UnixMilli()
	payloadBytes := buildServerTimeEventPayload(serverTimeMS)
	payloadHash := sha256.Sum256(payloadBytes)
	signature, err := s.responseSigner.SignEvent(authn.EventSigningFields{
		EventType:   serverTimeEventType,
		EventID:     envelope.RequestID,
		TimestampMS: serverTimeMS,
		RequestID:   envelope.RequestID,
		TraceID:     envelope.TraceID,
		PayloadHash: payloadHash[:],
	})
	if err != nil {
		return status.Error(codes.Unavailable, "response signer is unavailable")
	}

	if err := boundStream.Send(&gatewayv1.GatewayEvent{
		EventType:    serverTimeEventType,
		EventId:      envelope.RequestID,
		TimestampMs:  serverTimeMS,
		PayloadBytes: bytes.Clone(payloadBytes),
		PayloadHash:  bytes.Clone(payloadHash[:]),
		Signature:    signature,
		RequestId:    envelope.RequestID,
		TraceId:      envelope.TraceID,
	}); err != nil {
		return err
	}

	var streamForTail grpc.ServerStreamingServer[gatewayv1.GatewayEvent] = boundStream
	if hbStream := newHeartbeatingStream(boundStream, s.heartbeatInterval, s.metrics); hbStream != nil {
		defer hbStream.Stop()
		go func() {
			// Heartbeat Send failures imply the transport is already
			// dead — the tail's next Send will hit the same error and
			// surface through the gateway observability layer, so we
			// discard the returned error here and rely on that path
			// for the canonical failure record.
			_ = hbStream.Run(stream.Context())
		}()
		streamForTail = hbStream
	}

	return s.tailDelegate.SubscribeEvents(req, streamForTail)
}

func newAuthenticatedPushStreamService(
	tailDelegate gatewayv1.EdgeGatewayServer,
	responseSigner authn.ResponseSigner,
	clk clock.Clock,
	heartbeatInterval time.Duration,
	metrics *telemetry.Runtime,
) gatewayv1.EdgeGatewayServer {
	if tailDelegate == nil {
		tailDelegate = holdOpenSubscribeEventsService{}
	}

	return authenticatedPushStreamService{
		tailDelegate:      tailDelegate,
		responseSigner:    responseSigner,
		clock:             clk,
		heartbeatInterval: heartbeatInterval,
		metrics:           metrics,
	}
}

func buildServerTimeEventPayload(serverTimeMS int64) []byte {
	builder := flatbuffers.NewBuilder(32)
	gatewayfbs.ServerTimeEventStart(builder)
	gatewayfbs.ServerTimeEventAddServerTimeMs(builder, serverTimeMS)
	eventOffset := gatewayfbs.ServerTimeEventEnd(builder)
	gatewayfbs.FinishServerTimeEventBuffer(builder, eventOffset)

	return bytes.Clone(builder.FinishedBytes())
}

type authenticatedStreamBindingContextKey struct{}

type authenticatedStreamContextStream struct {
	grpc.ServerStreamingServer[gatewayv1.GatewayEvent]
	ctx context.Context
}

func (s authenticatedStreamContextStream) Context() context.Context {
	if s.ctx == nil {
		return context.Background()
	}

	return s.ctx
}

type holdOpenSubscribeEventsService struct {
	gatewayv1.UnimplementedEdgeGatewayServer
}

func (holdOpenSubscribeEventsService) SubscribeEvents(_ *gatewayv1.SubscribeEventsRequest, stream grpc.ServerStreamingServer[gatewayv1.GatewayEvent]) error {
	<-stream.Context().Done()
	return stream.Context().Err()
}

var _ gatewayv1.EdgeGatewayServer = authenticatedPushStreamService{}