Files
2026-05-06 10:14:55 +03:00

49 lines
1.3 KiB
Go

package push
import (
"strings"
pushv1 "galaxy/backend/proto/push/v1"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// SubscribePush implements the server-streaming push RPC for a gateway.
//
// The call proceeds in three phases:
//  1. The request is validated: a nil request or a gateway client id that
//     is empty or whitespace-only is rejected with codes.InvalidArgument.
//  2. The connection is registered via s.register using the client id and
//     the requested cursor; the events returned for replay are sent to the
//     client in the order they were returned. The subscription is always
//     handed back to s.unregister when the handler returns.
//  3. Live events received from the subscription are forwarded to the
//     stream until one of the terminating conditions below occurs.
//
// Return values:
//   - nil when the stream context is done (for example, the client
//     cancelled the call);
//   - a codes.Aborted status when the subscription's done channel fires,
//     i.e. the subscription was replaced by a newer connection from the
//     same gateway client id or the Service is shutting down;
//   - the error from s.register or stream.Send, returned unchanged.
func (s *Service) SubscribePush(req *pushv1.GatewaySubscribeRequest, stream grpc.ServerStreamingServer[pushv1.PushEvent]) error {
	// Read the id once; a nil request is treated the same as a blank id.
	var clientID string
	if req != nil {
		clientID = req.GetGatewayClientId()
	}
	if strings.TrimSpace(clientID) == "" {
		return status.Error(codes.InvalidArgument, "gateway_client_id is required")
	}

	subscription, backlog, regErr := s.register(clientID, req.GetCursor())
	if regErr != nil {
		return regErr
	}
	defer s.unregister(subscription)

	// Phase 2: flush the replayed events before switching to live delivery.
	for _, past := range backlog {
		if sendErr := stream.Send(past); sendErr != nil {
			return sendErr
		}
	}

	// Phase 3: forward live events until the call or the subscription ends.
	cancelled := stream.Context().Done()
	for {
		select {
		case live := <-subscription.ch:
			if sendErr := stream.Send(live); sendErr != nil {
				return sendErr
			}
		case <-subscription.done:
			return status.Error(codes.Aborted, "push subscription replaced or service stopped")
		case <-cancelled:
			// The stream context ending is a normal end of stream, not an error.
			return nil
		}
	}
}