Files
2026-05-06 10:14:55 +03:00

173 lines
5.5 KiB
Go

package grpcapi
import (
"bytes"
"context"
"crypto/sha256"
"errors"
"galaxy/gateway/authn"
"galaxy/gateway/internal/clock"
"galaxy/gateway/internal/logging"
"galaxy/gateway/internal/push"
"galaxy/gateway/internal/telemetry"
gatewayv1 "galaxy/gateway/proto/galaxy/gateway/v1"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// NewFanOutPushStreamService builds the authenticated SubscribeEvents tail
// service: it registers active streams in hub and forwards client-facing
// events once the bootstrap event has been sent.
//
// Optional collaborators fall back to safe defaults when nil:
//   - responseSigner becomes unavailableResponseSigner{}
//   - clk becomes clock.System{}
//   - logger becomes a no-op zap logger
//
// The logger is always scoped under the "push_stream" name. hub is stored as
// given; a nil hub is rejected per call by SubscribeEvents.
func NewFanOutPushStreamService(hub *push.Hub, responseSigner authn.ResponseSigner, clk clock.Clock, logger *zap.Logger) gatewayv1.EdgeGatewayServer {
	svc := fanOutPushStreamService{
		hub:            hub,
		responseSigner: responseSigner,
		clock:          clk,
	}

	if svc.responseSigner == nil {
		svc.responseSigner = unavailableResponseSigner{}
	}
	if svc.clock == nil {
		svc.clock = clock.System{}
	}

	base := logger
	if base == nil {
		base = zap.NewNop()
	}
	svc.logger = base.Named("push_stream")

	return svc
}
// fanOutPushStreamService owns the post-bootstrap authenticated push-stream
// lifecycle backed by the in-memory push hub.
//
// Its methods use value receivers, so every call works on a copy of the
// fields below.
type fanOutPushStreamService struct {
// Embedded generated base type; presumably supplies default handlers for
// RPCs this service does not define itself — confirm against the generated
// gatewayv1 code.
gatewayv1.UnimplementedEdgeGatewayServer
// hub is where streams are registered. It may be nil, in which case
// SubscribeEvents fails the call with codes.Internal.
hub *push.Hub
// responseSigner signs every event before it is sent to the client.
responseSigner authn.ResponseSigner
// clock provides the timestamp stamped into (and signed with) each event.
clock clock.Clock
// logger records stream open/close entries; the constructor names it
// "push_stream".
logger *zap.Logger
}
// SubscribeEvents registers the verified stream in the push hub and forwards
// matching client-facing events until the stream ends.
//
// The request message itself is ignored; the stream identity comes from the
// authenticated binding stored in the stream context.
//
// Returns:
//   - codes.Internal when the authenticated binding is missing from the
//     context, the hub is nil, or hub registration fails;
//   - the stream context error once the stream context is done;
//   - mapSubscriptionError(subscription.Err()) once the subscription is done;
//   - the error from buildGatewayEvent or stream.Send, unchanged, when an
//     event cannot be signed or delivered.
//
// Every "push stream opened" log entry is paired with exactly one
// "push stream closed" entry, including on signing and send failures.
func (s fanOutPushStreamService) SubscribeEvents(_ *gatewayv1.SubscribeEventsRequest, stream grpc.ServerStreamingServer[gatewayv1.GatewayEvent]) error {
	ctx := stream.Context()

	binding, ok := authenticatedStreamBindingFromContext(ctx)
	if !ok {
		return status.Error(codes.Internal, "authenticated request context is incomplete")
	}
	if s.hub == nil {
		return status.Error(codes.Internal, "push hub is unavailable")
	}

	subscription, err := s.hub.Register(push.StreamBinding{
		UserID:          binding.UserID,
		DeviceSessionID: binding.DeviceSessionID,
	})
	if err != nil {
		return status.Error(codes.Internal, "push stream registration failed")
	}
	defer subscription.Close()

	openFields := []zap.Field{
		zap.String("component", "authenticated_grpc"),
		zap.String("transport", "grpc"),
		zap.String("rpc_method", authenticatedRPCSubscribeEvents),
		zap.String("message_type", binding.MessageType),
		zap.String("request_id", binding.RequestID),
		zap.String("trace_id", binding.TraceID),
		zap.String("device_session_id", binding.DeviceSessionID),
		zap.String("user_id", binding.UserID),
	}
	openFields = append(openFields, logging.TraceFieldsFromContext(ctx)...)
	s.logger.Info("push stream opened", openFields...)

	// Held in a local so the case can be disabled (set to nil) if the hub
	// ever closes the channel; see the receive below.
	events := subscription.Events()

	for {
		select {
		case <-ctx.Done():
			ctxErr := ctx.Err()
			s.logger.Info("push stream closed", append(openFields,
				zap.String("edge_outcome", string(mapSubscriptionOutcome(ctxErr))),
			)...)
			return ctxErr

		case <-subscription.Done():
			subscriptionErr := subscription.Err()
			outcome := string(mapSubscriptionOutcome(subscriptionErr))
			s.logger.Warn("push stream closed", append(openFields,
				zap.String("edge_outcome", outcome),
				zap.String("reject_reason", outcome),
			)...)
			return mapSubscriptionError(subscriptionErr)

		case event, open := <-events:
			if !open {
				// A closed channel yields zero-valued events forever. Never
				// sign or forward those: a nil channel blocks in select, so
				// only the context or subscription.Done() can end the stream
				// from here on.
				events = nil
				continue
			}

			signedEvent, buildErr := s.buildGatewayEvent(event)
			if buildErr != nil {
				s.logger.Warn("push stream closed", append(openFields,
					zap.String("edge_outcome", string(mapSubscriptionOutcome(buildErr))),
					zap.Error(buildErr),
				)...)
				return buildErr
			}

			if sendErr := stream.Send(signedEvent); sendErr != nil {
				s.logger.Warn("push stream closed", append(openFields,
					zap.String("edge_outcome", string(mapSubscriptionOutcome(sendErr))),
					zap.Error(sendErr),
				)...)
				return sendErr
			}
		}
	}
}
// buildGatewayEvent converts a hub event into the signed wire message sent to
// the client.
//
// The event is stamped with the service clock's current time in UTC
// milliseconds, and the SHA-256 digest of the payload is signed together with
// the event type, event ID, timestamp, request ID and trace ID. The payload
// and digest placed in the returned message are copies, so the message does
// not alias event.PayloadBytes.
//
// Any signer failure is reported as codes.Unavailable; the underlying signer
// error is not propagated.
func (s fanOutPushStreamService) buildGatewayEvent(event push.Event) (*gatewayv1.GatewayEvent, error) {
	nowMS := s.clock.Now().UTC().UnixMilli()
	digest := sha256.Sum256(event.PayloadBytes)

	signingFields := authn.EventSigningFields{
		EventType:   event.EventType,
		EventID:     event.EventID,
		TimestampMS: nowMS,
		RequestID:   event.RequestID,
		TraceID:     event.TraceID,
		PayloadHash: digest[:],
	}

	sig, signErr := s.responseSigner.SignEvent(signingFields)
	if signErr != nil {
		return nil, status.Error(codes.Unavailable, "response signer is unavailable")
	}

	out := &gatewayv1.GatewayEvent{
		EventType:    event.EventType,
		EventId:      event.EventID,
		TimestampMs:  nowMS,
		PayloadBytes: bytes.Clone(event.PayloadBytes),
		PayloadHash:  bytes.Clone(digest[:]),
		Signature:    sig,
		RequestId:    event.RequestID,
		TraceId:      event.TraceID,
	}
	return out, nil
}
// mapSubscriptionError translates the reason a hub subscription ended into
// the gRPC status error returned to the client:
//
//   - nil                          -> nil
//   - push.ErrSubscriptionRevoked  -> codes.FailedPrecondition
//   - push.ErrSubscriptionOverflow -> codes.ResourceExhausted
//   - push.ErrHubShuttingDown      -> codes.Unavailable
//   - anything else                -> codes.Internal
//
// Sentinels are matched with errors.Is, so wrapped errors map the same way.
func mapSubscriptionError(err error) error {
	if err == nil {
		return nil
	}
	if errors.Is(err, push.ErrSubscriptionRevoked) {
		return status.Error(codes.FailedPrecondition, "device session is revoked")
	}
	if errors.Is(err, push.ErrSubscriptionOverflow) {
		return status.Error(codes.ResourceExhausted, "push stream overflowed")
	}
	if errors.Is(err, push.ErrHubShuttingDown) {
		return status.Error(codes.Unavailable, "gateway is shutting down")
	}
	return status.Error(codes.Internal, "push stream closed unexpectedly")
}
// mapSubscriptionOutcome classifies why a push stream ended for telemetry and
// logging purposes.
//
// A nil error and both context termination errors (context.Canceled,
// context.DeadlineExceeded) are reported as success. The hub sentinels map to
// their dedicated outcomes, and any other error is an internal error.
// Sentinels are matched with errors.Is, so wrapped errors classify the same
// way.
func mapSubscriptionOutcome(err error) telemetry.EdgeOutcome {
	if err == nil || errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
		return telemetry.EdgeOutcomeSuccess
	}
	if errors.Is(err, push.ErrSubscriptionRevoked) {
		return telemetry.EdgeOutcomeRevokedSession
	}
	if errors.Is(err, push.ErrSubscriptionOverflow) {
		return telemetry.EdgeOutcomeRateLimited
	}
	if errors.Is(err, push.ErrHubShuttingDown) {
		return telemetry.EdgeOutcomeGatewayShuttingDown
	}
	return telemetry.EdgeOutcomeInternalError
}
// Compile-time assertion that fanOutPushStreamService (as a value, not a
// pointer) satisfies gatewayv1.EdgeGatewayServer.
var _ gatewayv1.EdgeGatewayServer = fanOutPushStreamService{}