Files
2026-05-06 10:14:55 +03:00

146 lines
3.5 KiB
Go

// Package push hosts the backend gRPC listener used by gateway.
//
// Server owns the TCP listener and gRPC machinery. Service implements
// the PushServer interface and is registered against the gRPC server
// before Serve begins. On shutdown the server signals the service to
// drop its subscriptions, then performs the usual GracefulStop /
// forced-stop sequence.
package push
import (
"context"
"errors"
"fmt"
"net"
"sync"
"galaxy/backend/internal/config"
"galaxy/backend/internal/telemetry"
pushv1 "galaxy/backend/proto/push/v1"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.uber.org/zap"
"google.golang.org/grpc"
)
// Server owns the gRPC push listener.
//
// cfg, svc, logger and runtime are assigned once by NewServer. The server and
// listener fields are populated by Run while it is serving and reset to nil
// when Run returns; stateMu guards both of them.
type Server struct {
	// cfg supplies the listen address (Addr) and the ShutdownTimeout.
	cfg config.GRPCPushConfig

	// svc is registered as the pushv1.PushServer implementation by Run and
	// closed at the start of Shutdown.
	svc *Service

	// logger is the logger handed to NewServer, named "grpc_push".
	logger *zap.Logger

	// runtime is the telemetry runtime handed to NewServer.
	runtime *telemetry.Runtime

	// stateMu guards server and listener.
	stateMu sync.RWMutex

	// server is the gRPC server created by Run, or nil when Run is not active.
	server *grpc.Server

	// listener is the TCP listener created by Run, or nil when Run is not
	// active.
	listener net.Listener
}
// NewServer builds a gRPC push server from cfg without binding anything yet.
//
// svc is the pushv1.PushServer implementation that Run registers; Run fails
// when it is nil. A nil logger is replaced with a no-op logger. The logger
// stored on the server is named "grpc_push". runtime is stored as given.
func NewServer(cfg config.GRPCPushConfig, svc *Service, logger *zap.Logger, runtime *telemetry.Runtime) *Server {
	base := logger
	if base == nil {
		// Fall back to a no-op logger so the server can always log safely.
		base = zap.NewNop()
	}

	srv := &Server{cfg: cfg, svc: svc, runtime: runtime}
	srv.logger = base.Named("grpc_push")
	return srv
}
// Run binds a TCP listener on cfg.Addr, registers the push service on a new
// gRPC server and serves until Shutdown stops that server.
//
// ctx is only validated before the listener is bound: a nil ctx or an
// already-finished ctx makes Run return immediately. Cancelling ctx later
// does not stop the server; call Shutdown for that.
//
// Run returns nil after an orderly stop, ctx.Err() when ctx is already done,
// and a wrapped error when the service is nil, the listener cannot be bound,
// or Serve fails for any other reason.
func (s *Server) Run(ctx context.Context) error {
	if ctx == nil {
		return errors.New("run backend gRPC push server: nil context")
	}
	if err := ctx.Err(); err != nil {
		return err
	}
	if s.svc == nil {
		return errors.New("run backend gRPC push server: nil service")
	}

	listener, err := net.Listen("tcp", s.cfg.Addr)
	if err != nil {
		return fmt.Errorf("run backend gRPC push server: listen on %q: %w", s.cfg.Addr, err)
	}

	grpcServer := grpc.NewServer(
		grpc.StatsHandler(otelgrpc.NewServerHandler()),
	)
	pushv1.RegisterPushServer(grpcServer, s.svc)

	// Publish the server so a concurrent Shutdown can find and stop it.
	s.stateMu.Lock()
	s.server = grpcServer
	s.listener = listener
	s.stateMu.Unlock()

	s.logger.Info("backend gRPC push server started", zap.String("addr", listener.Addr().String()))

	// Clear the published state once serving ends so that a later Shutdown
	// becomes a no-op.
	defer func() {
		s.stateMu.Lock()
		s.server = nil
		s.listener = nil
		s.stateMu.Unlock()
	}()

	err = grpcServer.Serve(listener)

	// Serve returns nil once Stop or GracefulStop has been called, and
	// ErrServerStopped when the server was stopped before Serve began. Both
	// are orderly stops, so both are logged and reported as success.
	if err == nil || errors.Is(err, grpc.ErrServerStopped) {
		s.logger.Info("backend gRPC push server stopped")
		return nil
	}
	return fmt.Errorf("run backend gRPC push server: serve on %q: %w", s.cfg.Addr, err)
}
// Shutdown stops the server started by Run, preferring a graceful stop and
// falling back to a forced stop when the wait is cut short.
//
// The wait is bounded by ctx and, when cfg.ShutdownTimeout is positive, by
// that timeout as well. The service is closed before GracefulStop is invoked
// so that long-lived SubscribePush streams do not keep GracefulStop blocked.
//
// Shutdown returns nil when no server is running or when the graceful stop
// completes in time. It returns an error for a nil ctx, and an error wrapping
// the context error when the stop had to be forced.
func (s *Server) Shutdown(ctx context.Context) error {
	if ctx == nil {
		return errors.New("shutdown backend gRPC push server: nil context")
	}

	// Snapshot the active server; nil means Run is not serving.
	s.stateMu.RLock()
	active := s.server
	s.stateMu.RUnlock()
	if active == nil {
		return nil
	}

	if s.svc != nil {
		s.svc.Close()
	}

	// Derive the context that bounds the graceful wait.
	var (
		waitCtx context.Context
		release context.CancelFunc
	)
	if limit := s.cfg.ShutdownTimeout; limit > 0 {
		waitCtx, release = context.WithTimeout(ctx, limit)
	} else {
		waitCtx, release = context.WithCancel(ctx)
	}
	defer release()

	// GracefulStop blocks, so run it in the background and signal completion.
	drained := make(chan struct{})
	go func() {
		active.GracefulStop()
		close(drained)
	}()

	select {
	case <-waitCtx.Done():
		// Out of time: force the stop, then wait for GracefulStop to return
		// so the goroutine above does not outlive this call.
		active.Stop()
		<-drained
		return fmt.Errorf("shutdown backend gRPC push server: %w", waitCtx.Err())
	case <-drained:
		return nil
	}
}