package push

import (
	"strings"

	pushv1 "galaxy/backend/proto/push/v1"

	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// SubscribePush implements the server-streaming push subscription RPC.
//
// The call proceeds in three phases:
//
//  1. Validation: a nil request or a gateway client id that is empty after
//     trimming whitespace is rejected with codes.InvalidArgument.
//  2. Registration and replay: the subscription is registered under the
//     client id as supplied (not trimmed) together with the requested cursor,
//     and every event returned by the registration is sent to the stream
//     first. The subscription is unregistered when the handler returns.
//  3. Live streaming: events arriving on the subscription channel are
//     forwarded until one of the termination conditions below occurs.
//
// Return values:
//   - nil when the stream context is done (for example, the client cancelled).
//   - codes.Aborted when the subscription's done channel fires; per the
//     status message this means the subscription was replaced or the service
//     was stopped.
//   - the error from s.register or stream.Send, returned unchanged.
func (s *Service) SubscribePush(req *pushv1.GatewaySubscribeRequest, stream grpc.ServerStreamingServer[pushv1.PushEvent]) error {
	// Resolve the client id once; a nil request leaves it empty so that both
	// "no request" and "blank id" take the same rejection path.
	var clientID string
	if req != nil {
		clientID = req.GetGatewayClientId()
	}
	if strings.TrimSpace(clientID) == "" {
		return status.Error(codes.InvalidArgument, "gateway_client_id is required")
	}

	subscription, backlog, err := s.register(clientID, req.GetCursor())
	if err != nil {
		return err
	}
	defer s.unregister(subscription)

	// Deliver the replayed events before switching to live delivery.
	for _, event := range backlog {
		if sendErr := stream.Send(event); sendErr != nil {
			return sendErr
		}
	}

	// Forward live events until the stream or the subscription ends.
	streamDone := stream.Context().Done()
	for {
		select {
		case <-streamDone:
			return nil
		case <-subscription.done:
			return status.Error(codes.Aborted, "push subscription replaced or service stopped")
		case event := <-subscription.ch:
			if sendErr := stream.Send(event); sendErr != nil {
				return sendErr
			}
		}
	}
}