bf7dca0a09
CI / changes (pull_request) Successful in 1s
CI / unit (pull_request) Successful in 9s
CI / integration (pull_request) Successful in 11s
CI / ui (pull_request) Has been skipped
CI / gate (pull_request) Successful in 0s
CI / deploy (pull_request) Successful in 1m6s
Two owner-reported defects from a live contour game. A. Frequency: the robot's proactive nudge fired hourly for 12h+ (a 12h idle threshold then the 1h cooldown, uncapped). Replaced with a lengthening, randomized schedule (proactiveNudgeGap): the first nudge ~60-90 min into the human's turn, each later gap growing toward 1-6h (uniform sample in [60min, ceil], ceil ramping 90min->6h over 12h of idle, measured from the previous nudge), so a long wait gets a handful of increasingly-spaced reminders instead of a stream. B. Language: out-of-app push routed by the recipient's GLOBAL service_language (last-login-wins), so after re-logging via the RU bot an English game's nudges came from the RU bot. Now a game push (your_turn, game_over, nudge, match_found) carries the game's own language (engine.Variant.Language) on push.Event, and the gateway routes by it (falling back to service_language for non-game pushes). The New-Game variant-gating guarantees the game's bot is one the player has started, so delivery is never blocked. Tests: proactiveNudgeGap unit + retimed TestRobotProactiveNudge; TestVariantLanguage; emit your_turn/game_over language; TestNudgeRoutedByGameLanguage integration. Docs: ARCHITECTURE (§7 nudge, §10/§13 routing), FUNCTIONAL (+ _ru), PLAN tracker.
110 lines
3.1 KiB
Go
110 lines
3.1 KiB
Go
// Package pushgrpc serves the backend -> gateway live-event stream: a gRPC
|
|
// server exposing the scrabble.push.v1 Push service (docs/ARCHITECTURE.md §2).
|
|
// It bridges the in-process notify.Hub to the wire — each Subscribe stream
|
|
// drains a hub subscription and forwards every Intent as a push Event. The
|
|
// gateway opens one long-lived Subscribe at startup and fans the events out to
|
|
// its clients.
|
|
package pushgrpc
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net"
|
|
|
|
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
|
|
"go.uber.org/zap"
|
|
"google.golang.org/grpc"
|
|
|
|
"scrabble/backend/internal/notify"
|
|
pushv1 "scrabble/pkg/proto/push/v1"
|
|
)
|
|
|
|
// Service implements pushv1.PushServer over a notify.Hub.
|
|
type Service struct {
|
|
pushv1.UnimplementedPushServer
|
|
hub *notify.Hub
|
|
log *zap.Logger
|
|
}
|
|
|
|
// NewService constructs a Service that streams the hub's intents.
|
|
func NewService(hub *notify.Hub, log *zap.Logger) *Service {
|
|
if log == nil {
|
|
log = zap.NewNop()
|
|
}
|
|
return &Service{hub: hub, log: log}
|
|
}
|
|
|
|
// Subscribe opens a hub subscription and forwards every intent to the gateway
|
|
// until the stream's context ends (the gateway disconnected or the server is
|
|
// shutting down). It returns nil on a clean disconnect.
|
|
func (s *Service) Subscribe(req *pushv1.SubscribeRequest, stream grpc.ServerStreamingServer[pushv1.Event]) error {
|
|
ch, cancel := s.hub.Subscribe()
|
|
defer cancel()
|
|
|
|
s.log.Info("gateway push subscription opened", zap.String("gateway_id", req.GetGatewayId()))
|
|
defer s.log.Info("gateway push subscription closed", zap.String("gateway_id", req.GetGatewayId()))
|
|
|
|
ctx := stream.Context()
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil
|
|
case in, ok := <-ch:
|
|
if !ok {
|
|
return nil
|
|
}
|
|
ev := &pushv1.Event{
|
|
UserId: in.UserID.String(),
|
|
Kind: in.Kind,
|
|
Payload: in.Payload,
|
|
EventId: in.EventID,
|
|
Language: in.Language,
|
|
}
|
|
if err := stream.Send(ev); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Server wraps the gRPC listener serving the Push service. Its Run mirrors the
|
|
// HTTP server's: serve until the context is cancelled, then stop gracefully.
|
|
type Server struct {
|
|
grpc *grpc.Server
|
|
addr string
|
|
log *zap.Logger
|
|
}
|
|
|
|
// NewServer builds a gRPC server bound to addr that streams hub events.
|
|
func NewServer(addr string, hub *notify.Hub, log *zap.Logger) *Server {
|
|
if log == nil {
|
|
log = zap.NewNop()
|
|
}
|
|
gs := grpc.NewServer(grpc.StatsHandler(otelgrpc.NewServerHandler()))
|
|
pushv1.RegisterPushServer(gs, NewService(hub, log))
|
|
return &Server{grpc: gs, addr: addr, log: log}
|
|
}
|
|
|
|
// Run starts the listener and blocks until ctx is cancelled, then stops the
|
|
// server gracefully. It returns the first error that is not a clean shutdown.
|
|
func (s *Server) Run(ctx context.Context) error {
|
|
lis, err := net.Listen("tcp", s.addr)
|
|
if err != nil {
|
|
return fmt.Errorf("pushgrpc: listen %s: %w", s.addr, err)
|
|
}
|
|
errc := make(chan error, 1)
|
|
go func() {
|
|
s.log.Info("push grpc listener starting", zap.String("addr", s.addr))
|
|
errc <- s.grpc.Serve(lis)
|
|
}()
|
|
|
|
select {
|
|
case err := <-errc:
|
|
return err
|
|
case <-ctx.Done():
|
|
s.log.Info("push grpc listener stopping")
|
|
s.grpc.GracefulStop()
|
|
return nil
|
|
}
|
|
}
|