// Package pushgrpc serves the backend -> gateway live-event stream: a gRPC // server exposing the scrabble.push.v1 Push service (docs/ARCHITECTURE.md §2). // It bridges the in-process notify.Hub to the wire — each Subscribe stream // drains a hub subscription and forwards every Intent as a push Event. The // gateway opens one long-lived Subscribe at startup and fans the events out to // its clients. package pushgrpc import ( "context" "fmt" "net" "go.uber.org/zap" "google.golang.org/grpc" "scrabble/backend/internal/notify" pushv1 "scrabble/pkg/proto/push/v1" ) // Service implements pushv1.PushServer over a notify.Hub. type Service struct { pushv1.UnimplementedPushServer hub *notify.Hub log *zap.Logger } // NewService constructs a Service that streams the hub's intents. func NewService(hub *notify.Hub, log *zap.Logger) *Service { if log == nil { log = zap.NewNop() } return &Service{hub: hub, log: log} } // Subscribe opens a hub subscription and forwards every intent to the gateway // until the stream's context ends (the gateway disconnected or the server is // shutting down). It returns nil on a clean disconnect. func (s *Service) Subscribe(req *pushv1.SubscribeRequest, stream grpc.ServerStreamingServer[pushv1.Event]) error { ch, cancel := s.hub.Subscribe() defer cancel() s.log.Info("gateway push subscription opened", zap.String("gateway_id", req.GetGatewayId())) defer s.log.Info("gateway push subscription closed", zap.String("gateway_id", req.GetGatewayId())) ctx := stream.Context() for { select { case <-ctx.Done(): return nil case in, ok := <-ch: if !ok { return nil } ev := &pushv1.Event{ UserId: in.UserID.String(), Kind: in.Kind, Payload: in.Payload, EventId: in.EventID, } if err := stream.Send(ev); err != nil { return err } } } } // Server wraps the gRPC listener serving the Push service. Its Run mirrors the // HTTP server's: serve until the context is cancelled, then stop gracefully. type Server struct { grpc *grpc.Server addr string log *zap.Logger } // NewServer builds a gRPC server bound to addr that streams hub events. func NewServer(addr string, hub *notify.Hub, log *zap.Logger) *Server { if log == nil { log = zap.NewNop() } gs := grpc.NewServer() pushv1.RegisterPushServer(gs, NewService(hub, log)) return &Server{grpc: gs, addr: addr, log: log} } // Run starts the listener and blocks until ctx is cancelled, then stops the // server gracefully. It returns the first error that is not a clean shutdown. func (s *Server) Run(ctx context.Context) error { lis, err := net.Listen("tcp", s.addr) if err != nil { return fmt.Errorf("pushgrpc: listen %s: %w", s.addr, err) } errc := make(chan error, 1) go func() { s.log.Info("push grpc listener starting", zap.String("addr", s.addr)) errc <- s.grpc.Serve(lis) }() select { case err := <-errc: return err case <-ctx.Done(): s.log.Info("push grpc listener stopping") s.grpc.GracefulStop() return nil } }