49 lines
1.3 KiB
Go
49 lines
1.3 KiB
Go
package push
|
|
|
|
import (
|
|
"strings"
|
|
|
|
pushv1 "galaxy/backend/proto/push/v1"
|
|
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/codes"
|
|
"google.golang.org/grpc/status"
|
|
)
|
|
|
|
// SubscribePush is the gRPC server handler. It registers the connection
|
|
// in the subscription registry, replays any in-buffer events newer than
|
|
// the requested cursor, and then streams live events until the client
|
|
// cancels, the subscription is replaced by a newer connection from the
|
|
// same gateway client id, or the Service is shut down.
|
|
func (s *Service) SubscribePush(req *pushv1.GatewaySubscribeRequest, stream grpc.ServerStreamingServer[pushv1.PushEvent]) error {
|
|
if req == nil || strings.TrimSpace(req.GetGatewayClientId()) == "" {
|
|
return status.Error(codes.InvalidArgument, "gateway_client_id is required")
|
|
}
|
|
|
|
sub, replay, err := s.register(req.GetGatewayClientId(), req.GetCursor())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer s.unregister(sub)
|
|
|
|
for _, ev := range replay {
|
|
if err := stream.Send(ev); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
ctx := stream.Context()
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil
|
|
case <-sub.done:
|
|
return status.Error(codes.Aborted, "push subscription replaced or service stopped")
|
|
case ev := <-sub.ch:
|
|
if err := stream.Send(ev); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
}
|