package grpcapi

import (
	"bytes"
	"context"
	"crypto/sha256"

	"galaxy/gateway/internal/authn"
	"galaxy/gateway/internal/clock"
	gatewayv1 "galaxy/gateway/proto/galaxy/gateway/v1"
	gatewayfbs "galaxy/schema/fbs/gateway"

	flatbuffers "github.com/google/flatbuffers/go"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// serverTimeEventType is the event type stamped on the signed server-time
// event that SubscribeEvents sends first on every authenticated stream.
const serverTimeEventType = "gateway.server_time"

// authenticatedStreamBinding is the identity snapshot attached to an
// authenticated SubscribeEvents stream. SubscribeEvents fills UserID and
// DeviceSessionID from the resolved session record, and MessageType,
// RequestID, and TraceID from the parsed request envelope.
type authenticatedStreamBinding struct {
	UserID          string
	DeviceSessionID string
	MessageType     string
	RequestID       string
	TraceID         string
}

// authenticatedStreamBindingFromContext looks up the stream binding stored in
// ctx under authenticatedStreamBindingContextKey. The boolean result is false,
// and the returned binding is the zero value, when ctx is nil or carries no
// binding of the expected type.
func authenticatedStreamBindingFromContext(ctx context.Context) (authenticatedStreamBinding, bool) {
	var none authenticatedStreamBinding
	if ctx == nil {
		return none, false
	}

	if bound, found := ctx.Value(authenticatedStreamBindingContextKey{}).(authenticatedStreamBinding); found {
		return bound, true
	}
	return none, false
}

// authenticatedPushStreamService implements the SubscribeEvents bootstrap:
// it binds the verified identity to the stream, emits the initial signed
// server-time event, and then passes the stream to tailDelegate for the rest
// of its lifetime.
type authenticatedPushStreamService struct {
	gatewayv1.UnimplementedEdgeGatewayServer

	// tailDelegate takes over the stream once the bootstrap event is sent.
	tailDelegate gatewayv1.EdgeGatewayServer
	// responseSigner produces the signature carried by the bootstrap event.
	responseSigner authn.ResponseSigner
	// clock supplies the server time reported in the bootstrap event.
	clock clock.Clock
}

// SubscribeEvents binds the verified stream identity, sends the initial signed
// server-time event, and then delegates the remaining lifecycle.
func (s authenticatedPushStreamService) SubscribeEvents(req *gatewayv1.SubscribeEventsRequest, stream grpc.ServerStreamingServer[gatewayv1.GatewayEvent]) error { envelope, ok := parsedEnvelopeFromContext(stream.Context()) if !ok { return status.Error(codes.Internal, "authenticated request context is incomplete") } record, ok := resolvedSessionFromContext(stream.Context()) if !ok { return status.Error(codes.Internal, "authenticated request context is incomplete") } binding := authenticatedStreamBinding{ UserID: record.UserID, DeviceSessionID: record.DeviceSessionID, MessageType: envelope.MessageType, RequestID: envelope.RequestID, TraceID: envelope.TraceID, } boundStream := authenticatedStreamContextStream{ ServerStreamingServer: stream, ctx: context.WithValue( stream.Context(), authenticatedStreamBindingContextKey{}, binding, ), } serverTimeMS := s.clock.Now().UTC().UnixMilli() payloadBytes := buildServerTimeEventPayload(serverTimeMS) payloadHash := sha256.Sum256(payloadBytes) signature, err := s.responseSigner.SignEvent(authn.EventSigningFields{ EventType: serverTimeEventType, EventID: envelope.RequestID, TimestampMS: serverTimeMS, RequestID: envelope.RequestID, TraceID: envelope.TraceID, PayloadHash: payloadHash[:], }) if err != nil { return status.Error(codes.Unavailable, "response signer is unavailable") } if err := boundStream.Send(&gatewayv1.GatewayEvent{ EventType: serverTimeEventType, EventId: envelope.RequestID, TimestampMs: serverTimeMS, PayloadBytes: bytes.Clone(payloadBytes), PayloadHash: bytes.Clone(payloadHash[:]), Signature: signature, RequestId: envelope.RequestID, TraceId: envelope.TraceID, }); err != nil { return err } return s.tailDelegate.SubscribeEvents(req, boundStream) } func newAuthenticatedPushStreamService(tailDelegate gatewayv1.EdgeGatewayServer, responseSigner authn.ResponseSigner, clk clock.Clock) gatewayv1.EdgeGatewayServer { if tailDelegate == nil { tailDelegate = holdOpenSubscribeEventsService{} } return 
authenticatedPushStreamService{ tailDelegate: tailDelegate, responseSigner: responseSigner, clock: clk, } } func buildServerTimeEventPayload(serverTimeMS int64) []byte { builder := flatbuffers.NewBuilder(32) gatewayfbs.ServerTimeEventStart(builder) gatewayfbs.ServerTimeEventAddServerTimeMs(builder, serverTimeMS) eventOffset := gatewayfbs.ServerTimeEventEnd(builder) gatewayfbs.FinishServerTimeEventBuffer(builder, eventOffset) return bytes.Clone(builder.FinishedBytes()) } type authenticatedStreamBindingContextKey struct{} type authenticatedStreamContextStream struct { grpc.ServerStreamingServer[gatewayv1.GatewayEvent] ctx context.Context } func (s authenticatedStreamContextStream) Context() context.Context { if s.ctx == nil { return context.Background() } return s.ctx } type holdOpenSubscribeEventsService struct { gatewayv1.UnimplementedEdgeGatewayServer } func (holdOpenSubscribeEventsService) SubscribeEvents(_ *gatewayv1.SubscribeEventsRequest, stream grpc.ServerStreamingServer[gatewayv1.GatewayEvent]) error { <-stream.Context().Done() return stream.Context().Err() } var _ gatewayv1.EdgeGatewayServer = authenticatedPushStreamService{}