Files
scrabble-game/backend/internal/pushgrpc/server.go
T
Ilia Denisov dcd8de8b00
Tests · Go / test (push) Successful in 11s
Tests · Integration / integration (push) Successful in 12s
Tests · Go / test (pull_request) Successful in 10s
Tests · Integration / integration (pull_request) Successful in 11s
Stage 12: observability & performance (OTel/OTLP, domain metrics, guest GC)
- pkg/telemetry: shared OTel provider bootstrap (none/stdout/otlp + W3C
  propagators + Go runtime metrics); backend/internal/telemetry becomes a thin
  facade keeping its gin middleware.
- Telemetry parity: gateway and the Telegram connector gain telemetry runtimes
  and config (GATEWAY_/TELEGRAM_ SERVICE_NAME + OTEL_*); otelgrpc instruments the
  backend push server, the gateway's backend+connector clients and the connector
  server. Default exporter stays none (collector/dashboards are Stage 14).
- Operational metrics (variant attribute on game-scoped ones): game_replay_duration,
  game_move_validate_duration, games_started_total, games_abandoned_total,
  game_cache_active, chat_messages_total{kind}, gateway edge_request_duration.
  Wired via the SetMetrics setter pattern (default no-op meter).
- TODO-3: account.GuestReaper deletes guests with no game seat past
  BACKEND_GUEST_RETENTION (default 30d, swept every BACKEND_GUEST_REAP_INTERVAL).
- Tests: pkg/telemetry exporter selection; game/social/edge metric recording via
  a manual reader; config (otlp accepted, guest knobs); inttest guest reaper.
- Docs: PLAN.md re-scopes Stage 12 and adds Stage 13 (alphabet-on-wire) + Stage 14
  (CI/deploy) with the agreed dictionary-versioning resolution; ARCHITECTURE 11/13,
  TESTING, the three READMEs and FUNCTIONAL(+ru) updated.
2026-06-04 14:22:15 +02:00

109 lines
3.1 KiB
Go

// Package pushgrpc serves the backend -> gateway live-event stream: a gRPC
// server exposing the scrabble.push.v1 Push service (docs/ARCHITECTURE.md §2).
// It bridges the in-process notify.Hub to the wire — each Subscribe stream
// drains a hub subscription and forwards every Intent as a push Event. The
// gateway opens one long-lived Subscribe at startup and fans the events out to
// its clients.
package pushgrpc
import (
	"context"
	"fmt"
	"net"
	"time"

	"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
	"go.uber.org/zap"
	"google.golang.org/grpc"

	"scrabble/backend/internal/notify"
	pushv1 "scrabble/pkg/proto/push/v1"
)
// Service is the pushv1.PushServer implementation backed by a notify.Hub.
// It embeds pushv1.UnimplementedPushServer and overrides Subscribe.
type Service struct {
	pushv1.UnimplementedPushServer

	// hub is the source of the subscriptions drained by Subscribe.
	hub *notify.Hub
	// log records subscription open/close events.
	log *zap.Logger
}
// NewService returns a Service that forwards intents from hub. A nil log is
// replaced with a no-op logger, so callers may omit it.
func NewService(hub *notify.Hub, log *zap.Logger) *Service {
	svc := &Service{hub: hub, log: log}
	if svc.log == nil {
		svc.log = zap.NewNop()
	}
	return svc
}
// Subscribe registers a hub subscription and relays each received intent to
// the caller as a pushv1.Event. It returns nil once the stream's context is
// done or the hub closes the subscription channel, and returns the Send error
// unchanged if forwarding an event fails. The hub subscription is released on
// every return path.
func (s *Service) Subscribe(req *pushv1.SubscribeRequest, stream grpc.ServerStreamingServer[pushv1.Event]) error {
	intents, unsubscribe := s.hub.Subscribe()
	defer unsubscribe()

	// The same log field tags both the "opened" and the deferred "closed" line.
	gatewayID := zap.String("gateway_id", req.GetGatewayId())
	s.log.Info("gateway push subscription opened", gatewayID)
	defer s.log.Info("gateway push subscription closed", gatewayID)

	done := stream.Context().Done()
	for {
		select {
		case <-done:
			return nil
		case intent, open := <-intents:
			if !open {
				// The hub closed the channel; nothing more will arrive.
				return nil
			}
			sendErr := stream.Send(&pushv1.Event{
				UserId:  intent.UserID.String(),
				Kind:    intent.Kind,
				Payload: intent.Payload,
				EventId: intent.EventID,
			})
			if sendErr != nil {
				return sendErr
			}
		}
	}
}
// Server owns the gRPC server that exposes the Push service together with the
// address it listens on. Run serves until its context is cancelled and then
// stops the gRPC server.
type Server struct {
	// grpc is the underlying gRPC server with the Push service registered.
	grpc *grpc.Server
	// addr is the TCP address passed to net.Listen by Run.
	addr string
	// log records listener lifecycle events.
	log *zap.Logger
}
// NewServer assembles a Server for addr: it creates a gRPC server with the
// otelgrpc stats handler installed and registers a Service over hub on it.
// A nil log is replaced with a no-op logger. The listener itself is opened
// later, by Run.
func NewServer(addr string, hub *notify.Hub, log *zap.Logger) *Server {
	if log == nil {
		log = zap.NewNop()
	}
	srv := &Server{
		grpc: grpc.NewServer(grpc.StatsHandler(otelgrpc.NewServerHandler())),
		addr: addr,
		log:  log,
	}
	pushv1.RegisterPushServer(srv.grpc, NewService(hub, log))
	return srv
}
// Run opens a TCP listener on the server's address and serves gRPC on it
// until ctx is cancelled or Serve returns.
//
// It returns a wrapped error if the listener cannot be opened, the result of
// Serve if serving ends on its own, and nil after a shutdown triggered by ctx.
//
// On cancellation the server is first stopped gracefully. GracefulStop waits
// for pending RPCs and does not cancel their contexts, so a long-lived
// Subscribe stream could keep it blocked indefinitely. The wait is therefore
// bounded: once the grace period elapses the server is stopped forcefully,
// which closes the remaining connections and lets GracefulStop return.
func (s *Server) Run(ctx context.Context) error {
	// stopTimeout is the grace period given to in-flight RPCs on shutdown.
	const stopTimeout = 10 * time.Second

	lis, err := net.Listen("tcp", s.addr)
	if err != nil {
		return fmt.Errorf("pushgrpc: listen %s: %w", s.addr, err)
	}

	// Buffered so the serving goroutine can always deliver its result and
	// exit, even when Run has already returned via the ctx branch.
	errc := make(chan error, 1)
	go func() {
		s.log.Info("push grpc listener starting", zap.String("addr", s.addr))
		errc <- s.grpc.Serve(lis)
	}()

	select {
	case err := <-errc:
		return err
	case <-ctx.Done():
		s.log.Info("push grpc listener stopping")
	}

	stopped := make(chan struct{})
	go func() {
		defer close(stopped)
		s.grpc.GracefulStop()
	}()

	timer := time.NewTimer(stopTimeout)
	defer timer.Stop()
	select {
	case <-stopped:
	case <-timer.C:
		s.log.Warn("push grpc graceful stop timed out, forcing stop", zap.Duration("timeout", stopTimeout))
		s.grpc.Stop()
		// Stop unblocks the pending GracefulStop; wait so no goroutine outlives Run.
		<-stopped
	}
	return nil
}