package grpcapi

import (
	"bytes"
	"context"
	"crypto/sha256"
	"time"

	"galaxy/gateway/authn"
	"galaxy/gateway/internal/clock"
	"galaxy/gateway/internal/telemetry"
	gatewayv1 "galaxy/gateway/proto/galaxy/gateway/v1"
	gatewayfbs "galaxy/schema/fbs/gateway"
	flatbuffers "github.com/google/flatbuffers/go"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

const (
	serverTimeEventType = "gateway.server_time"

	// gatewayHeartbeatEventType labels the silence-filling event the
	// authenticated push stream emits when no real event has been Send'd
	// within `AuthenticatedGRPCConfig.PushHeartbeatInterval`. Browser
	// fetch-streaming layers (notably Safari) close response bodies they
	// consider idle; the heartbeat keeps the body active so push events
	// land on the live stream instead of disappearing into the
	// client-side reconnect window.
	//
	// Heartbeat events are sent UNSIGNED: `EventID`, `RequestID`,
	// `TraceID`, `PayloadBytes`, `PayloadHash`, `Signature`, and
	// `TimestampMs` are all left at their proto3 defaults so the wire
	// frame stays under ~50 bytes. The UI's EventStream short-circuits
	// on this event type before signature verification (see
	// `ui/frontend/src/api/events.svelte.ts`) and never dispatches it to
	// handlers. The security implication is intentional and documented
	// in `docs/ARCHITECTURE.md` (§ authenticated edge): an attacker who
	// could inject heartbeats gains nothing: they carry no payload and
	// trigger no UI behaviour, the only practical effect is keeping a
	// stream marginally more alive than transport-level keepalives
	// would. Real events keep the signed envelope unchanged.
	gatewayHeartbeatEventType = "gateway.heartbeat"
)

// authenticatedStreamBinding captures the verified identity bound to one
// authenticated SubscribeEvents stream after the full ingress pipeline
// succeeds.
type authenticatedStreamBinding struct {
	UserID          string
	DeviceSessionID string
	MessageType     string
	RequestID       string
	TraceID         string
}

// authenticatedStreamBindingFromContext returns the verified stream binding
// previously attached to ctx by the authenticated push-stream service.
func authenticatedStreamBindingFromContext(ctx context.Context) (authenticatedStreamBinding, bool) {
	if ctx == nil {
		return authenticatedStreamBinding{}, false
	}

	binding, ok := ctx.Value(authenticatedStreamBindingContextKey{}).(authenticatedStreamBinding)
	if !ok {
		return authenticatedStreamBinding{}, false
	}

	return binding, true
}

// authenticatedPushStreamService owns SubscribeEvents bootstrap behavior:
// bind the authenticated stream, send the initial signed server-time event,
// and then hand the stream lifecycle to the configured tail delegate.
//
// A positive `heartbeatInterval` wraps the bound stream with
// `heartbeatingStream` before delegating, so any tail implementation
// (fan-out, hold-open, future variants) gets the silence-based
// `gateway.heartbeat` for free. The wrapper observes every real Send
// the tail performs and only emits a heartbeat when the silence window
// elapses; tails remain heartbeat-unaware.
type authenticatedPushStreamService struct {
	gatewayv1.UnimplementedEdgeGatewayServer

	tailDelegate      gatewayv1.EdgeGatewayServer
	responseSigner    authn.ResponseSigner
	clock             clock.Clock
	heartbeatInterval time.Duration
	metrics           *telemetry.Runtime
}

// SubscribeEvents binds the verified stream identity, sends the initial signed
// server-time event, and then delegates the remaining lifecycle.
func (s authenticatedPushStreamService) SubscribeEvents(req *gatewayv1.SubscribeEventsRequest, stream grpc.ServerStreamingServer[gatewayv1.GatewayEvent]) error {
	envelope, ok := parsedEnvelopeFromContext(stream.Context())
	if !ok {
		return status.Error(codes.Internal, "authenticated request context is incomplete")
	}

	record, ok := resolvedSessionFromContext(stream.Context())
	if !ok {
		return status.Error(codes.Internal, "authenticated request context is incomplete")
	}

	binding := authenticatedStreamBinding{
		UserID:          record.UserID,
		DeviceSessionID: record.DeviceSessionID,
		MessageType:     envelope.MessageType,
		RequestID:       envelope.RequestID,
		TraceID:         envelope.TraceID,
	}

	boundStream := authenticatedStreamContextStream{
		ServerStreamingServer: stream,
		ctx: context.WithValue(
			stream.Context(),
			authenticatedStreamBindingContextKey{},
			binding,
		),
	}

	serverTimeMS := s.clock.Now().UTC().UnixMilli()
	payloadBytes := buildServerTimeEventPayload(serverTimeMS)
	payloadHash := sha256.Sum256(payloadBytes)

	signature, err := s.responseSigner.SignEvent(authn.EventSigningFields{
		EventType:   serverTimeEventType,
		EventID:     envelope.RequestID,
		TimestampMS: serverTimeMS,
		RequestID:   envelope.RequestID,
		TraceID:     envelope.TraceID,
		PayloadHash: payloadHash[:],
	})
	if err != nil {
		return status.Error(codes.Unavailable, "response signer is unavailable")
	}

	if err := boundStream.Send(&gatewayv1.GatewayEvent{
		EventType:    serverTimeEventType,
		EventId:      envelope.RequestID,
		TimestampMs:  serverTimeMS,
		PayloadBytes: bytes.Clone(payloadBytes),
		PayloadHash:  bytes.Clone(payloadHash[:]),
		Signature:    signature,
		RequestId:    envelope.RequestID,
		TraceId:      envelope.TraceID,
	}); err != nil {
		return err
	}

	var streamForTail grpc.ServerStreamingServer[gatewayv1.GatewayEvent] = boundStream
	if hbStream := newHeartbeatingStream(boundStream, s.heartbeatInterval, s.metrics); hbStream != nil {
		defer hbStream.Stop()
		go func() {
			// Heartbeat Send failures imply the transport is already
			// dead; the tail's next Send will hit the same error and
			// surface through the gateway observability layer, so we
			// discard the returned error here and rely on that path
			// for the canonical failure record.
			_ = hbStream.Run(stream.Context())
		}()
		streamForTail = hbStream
	}

	return s.tailDelegate.SubscribeEvents(req, streamForTail)
}

func newAuthenticatedPushStreamService(
	tailDelegate gatewayv1.EdgeGatewayServer,
	responseSigner authn.ResponseSigner,
	clk clock.Clock,
	heartbeatInterval time.Duration,
	metrics *telemetry.Runtime,
) gatewayv1.EdgeGatewayServer {
	if tailDelegate == nil {
		tailDelegate = holdOpenSubscribeEventsService{}
	}

	return authenticatedPushStreamService{
		tailDelegate:      tailDelegate,
		responseSigner:    responseSigner,
		clock:             clk,
		heartbeatInterval: heartbeatInterval,
		metrics:           metrics,
	}
}

func buildServerTimeEventPayload(serverTimeMS int64) []byte {
	builder := flatbuffers.NewBuilder(32)

	gatewayfbs.ServerTimeEventStart(builder)
	gatewayfbs.ServerTimeEventAddServerTimeMs(builder, serverTimeMS)
	eventOffset := gatewayfbs.ServerTimeEventEnd(builder)
	gatewayfbs.FinishServerTimeEventBuffer(builder, eventOffset)

	return bytes.Clone(builder.FinishedBytes())
}

type authenticatedStreamBindingContextKey struct{}

type authenticatedStreamContextStream struct {
	grpc.ServerStreamingServer[gatewayv1.GatewayEvent]
	ctx context.Context
}

func (s authenticatedStreamContextStream) Context() context.Context {
	if s.ctx == nil {
		return context.Background()
	}

	return s.ctx
}

type holdOpenSubscribeEventsService struct {
	gatewayv1.UnimplementedEdgeGatewayServer
}

func (holdOpenSubscribeEventsService) SubscribeEvents(_ *gatewayv1.SubscribeEventsRequest, stream grpc.ServerStreamingServer[gatewayv1.GatewayEvent]) error {
	<-stream.Context().Done()
	return stream.Context().Err()
}

var _ gatewayv1.EdgeGatewayServer = authenticatedPushStreamService{}