package grpcapi import ( "bytes" "context" "crypto/sha256" "errors" "galaxy/gateway/authn" "galaxy/gateway/internal/clock" "galaxy/gateway/internal/logging" "galaxy/gateway/internal/push" "galaxy/gateway/internal/telemetry" gatewayv1 "galaxy/gateway/proto/galaxy/gateway/v1" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) // NewFanOutPushStreamService constructs the authenticated SubscribeEvents tail // service that registers active streams in hub and forwards client-facing // events after the bootstrap event has been sent. func NewFanOutPushStreamService(hub *push.Hub, responseSigner authn.ResponseSigner, clk clock.Clock, logger *zap.Logger) gatewayv1.EdgeGatewayServer { if responseSigner == nil { responseSigner = unavailableResponseSigner{} } if clk == nil { clk = clock.System{} } if logger == nil { logger = zap.NewNop() } return fanOutPushStreamService{ hub: hub, responseSigner: responseSigner, clock: clk, logger: logger.Named("push_stream"), } } // fanOutPushStreamService owns the post-bootstrap authenticated push-stream // lifecycle backed by the in-memory push hub. type fanOutPushStreamService struct { gatewayv1.UnimplementedEdgeGatewayServer hub *push.Hub responseSigner authn.ResponseSigner clock clock.Clock logger *zap.Logger } // SubscribeEvents registers the verified stream in the push hub and forwards // matching client-facing events until the stream ends. func (s fanOutPushStreamService) SubscribeEvents(_ *gatewayv1.SubscribeEventsRequest, stream grpc.ServerStreamingServer[gatewayv1.GatewayEvent]) error { binding, ok := authenticatedStreamBindingFromContext(stream.Context()) if !ok { return status.Error(codes.Internal, "authenticated request context is incomplete") } if s.hub == nil { return status.Error(codes.Internal, "push hub is unavailable") } subscription, err := s.hub.Register(push.StreamBinding{ UserID: binding.UserID, DeviceSessionID: binding.DeviceSessionID, }) if err != nil { return status.Error(codes.Internal, "push stream registration failed") } defer subscription.Close() openFields := []zap.Field{ zap.String("component", "authenticated_grpc"), zap.String("transport", "grpc"), zap.String("rpc_method", authenticatedRPCSubscribeEvents), zap.String("message_type", binding.MessageType), zap.String("request_id", binding.RequestID), zap.String("trace_id", binding.TraceID), zap.String("device_session_id", binding.DeviceSessionID), zap.String("user_id", binding.UserID), } openFields = append(openFields, logging.TraceFieldsFromContext(stream.Context())...) s.logger.Info("push stream opened", openFields...) for { select { case <-stream.Context().Done(): s.logger.Info("push stream closed", append(openFields, zap.String("edge_outcome", string(mapSubscriptionOutcome(stream.Context().Err()))))...) return stream.Context().Err() case <-subscription.Done(): subscriptionErr := subscription.Err() s.logger.Warn("push stream closed", append(openFields, zap.String("edge_outcome", string(mapSubscriptionOutcome(subscriptionErr))), zap.String("reject_reason", string(mapSubscriptionOutcome(subscriptionErr))), )...) return mapSubscriptionError(subscriptionErr) case event := <-subscription.Events(): signedEvent, err := s.buildGatewayEvent(event) if err != nil { return err } if err := stream.Send(signedEvent); err != nil { return err } } } } func (s fanOutPushStreamService) buildGatewayEvent(event push.Event) (*gatewayv1.GatewayEvent, error) { timestampMS := s.clock.Now().UTC().UnixMilli() payloadHash := sha256.Sum256(event.PayloadBytes) signature, err := s.responseSigner.SignEvent(authn.EventSigningFields{ EventType: event.EventType, EventID: event.EventID, TimestampMS: timestampMS, RequestID: event.RequestID, TraceID: event.TraceID, PayloadHash: payloadHash[:], }) if err != nil { return nil, status.Error(codes.Unavailable, "response signer is unavailable") } return &gatewayv1.GatewayEvent{ EventType: event.EventType, EventId: event.EventID, TimestampMs: timestampMS, PayloadBytes: bytes.Clone(event.PayloadBytes), PayloadHash: bytes.Clone(payloadHash[:]), Signature: signature, RequestId: event.RequestID, TraceId: event.TraceID, }, nil } func mapSubscriptionError(err error) error { switch { case err == nil: return nil case errors.Is(err, push.ErrSubscriptionRevoked): return status.Error(codes.FailedPrecondition, "device session is revoked") case errors.Is(err, push.ErrSubscriptionOverflow): return status.Error(codes.ResourceExhausted, "push stream overflowed") case errors.Is(err, push.ErrHubShuttingDown): return status.Error(codes.Unavailable, "gateway is shutting down") default: return status.Error(codes.Internal, "push stream closed unexpectedly") } } func mapSubscriptionOutcome(err error) telemetry.EdgeOutcome { switch { case err == nil: return telemetry.EdgeOutcomeSuccess case errors.Is(err, context.Canceled), errors.Is(err, context.DeadlineExceeded): return telemetry.EdgeOutcomeSuccess case errors.Is(err, push.ErrSubscriptionRevoked): return telemetry.EdgeOutcomeRevokedSession case errors.Is(err, push.ErrSubscriptionOverflow): return telemetry.EdgeOutcomeRateLimited case errors.Is(err, push.ErrHubShuttingDown): return telemetry.EdgeOutcomeGatewayShuttingDown default: return telemetry.EdgeOutcomeInternalError } } var _ gatewayv1.EdgeGatewayServer = fanOutPushStreamService{}