334 lines
9.9 KiB
Go
334 lines
9.9 KiB
Go
// Package push hosts the backend gRPC SubscribePush server and the
// publisher API consumed by other backend domains.
//
// Service implements pushv1.PushServer. It maintains:
//
//   - a connection registry keyed by GatewaySubscribeRequest.gateway_client_id;
//   - an in-memory ring buffer of recent PushEvent values with TTL equal
//     to BACKEND_FRESHNESS_WINDOW;
//   - a monotonic cursor generator stamped on every published event.
//
// Publisher methods (PublishClientEvent, PublishSessionInvalidation)
// satisfy the SessionInvalidator interface in internal/auth and the
// PushPublisher interface in internal/notification — main.go injects
// a single *Service into both wiring sites.
//
// See `backend/README.md` §7 and `backend/docs/flows.md` for cursor,
// ring buffer, and backpressure semantics.
|
|
package push
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"galaxy/backend/internal/telemetry"
|
|
pushv1 "galaxy/backend/proto/push/v1"
|
|
|
|
"github.com/google/uuid"
|
|
"go.opentelemetry.io/otel/attribute"
|
|
"go.opentelemetry.io/otel/metric"
|
|
"go.uber.org/zap"
|
|
"google.golang.org/grpc/codes"
|
|
"google.golang.org/grpc/status"
|
|
)
|
|
|
|
// Package defaults that NewService substitutes when the matching
// ServiceConfig field is zero or negative.
//
// They are intentionally not exposed as environment configuration: the
// ring's TTL (BACKEND_FRESHNESS_WINDOW) is the operational dial, and the
// sizes below were picked to comfortably absorb a full freshness window
// of traffic at MVP rates.
const (
	// defaultRingCapacity is handed to newRing as the replay ring's
	// capacity.
	defaultRingCapacity = 1024

	// defaultPerConnBuffer is the channel capacity of each
	// subscription's delivery queue.
	defaultPerConnBuffer = 256
)
|
|
|
|
// ServiceConfig carries the tunables accepted by NewService.
//
// Only FreshnessWindow is mandatory; NewService rejects a non-positive
// value. Every other field is optional and is replaced by a default when
// left at its zero value.
type ServiceConfig struct {
	// FreshnessWindow is the per-event TTL of the replay ring. It must
	// be greater than zero.
	FreshnessWindow time.Duration

	// RingCapacity is passed to the replay ring constructor; values <= 0
	// select defaultRingCapacity.
	RingCapacity int

	// PerConnBuffer is the channel capacity of each subscription's
	// delivery queue; values <= 0 select defaultPerConnBuffer.
	PerConnBuffer int

	// Now replaces time.Now so tests can drive a deterministic clock;
	// nil selects time.Now.
	Now func() time.Time
}
|
|
|
|
// Service implements pushv1.PushServer and exposes the publisher API.
|
|
// One Service is shared by every backend domain that needs to push;
|
|
// it is safe for concurrent use.
|
|
type Service struct {
|
|
pushv1.UnimplementedPushServer
|
|
|
|
logger *zap.Logger
|
|
now func() time.Time
|
|
|
|
perConnBuffer int
|
|
|
|
mu sync.Mutex
|
|
closed bool
|
|
subs map[string]*subscription
|
|
ring *ring
|
|
cursorGen cursorGenerator
|
|
|
|
eventsTotal metric.Int64Counter
|
|
droppedTotal metric.Int64Counter
|
|
}
|
|
|
|
// NewService constructs a Service. A nil logger falls back to
|
|
// zap.NewNop. A nil runtime disables metric emission so tests can
|
|
// instantiate the service without the OpenTelemetry runtime.
|
|
func NewService(cfg ServiceConfig, logger *zap.Logger, runtime *telemetry.Runtime) (*Service, error) {
|
|
if cfg.FreshnessWindow <= 0 {
|
|
return nil, errors.New("push.NewService: FreshnessWindow must be positive")
|
|
}
|
|
if logger == nil {
|
|
logger = zap.NewNop()
|
|
}
|
|
if cfg.Now == nil {
|
|
cfg.Now = time.Now
|
|
}
|
|
if cfg.RingCapacity <= 0 {
|
|
cfg.RingCapacity = defaultRingCapacity
|
|
}
|
|
if cfg.PerConnBuffer <= 0 {
|
|
cfg.PerConnBuffer = defaultPerConnBuffer
|
|
}
|
|
|
|
s := &Service{
|
|
logger: logger.Named("push"),
|
|
now: cfg.Now,
|
|
perConnBuffer: cfg.PerConnBuffer,
|
|
subs: make(map[string]*subscription),
|
|
ring: newRing(cfg.RingCapacity, cfg.FreshnessWindow),
|
|
}
|
|
|
|
if runtime != nil {
|
|
if err := s.registerMetrics(runtime); err != nil {
|
|
return nil, fmt.Errorf("push.NewService: register metrics: %w", err)
|
|
}
|
|
}
|
|
|
|
return s, nil
|
|
}
|
|
|
|
// Close drops every active subscription and refuses new ones. It is
|
|
// safe to call multiple times. The owning Server must call Close before
|
|
// initiating GracefulStop so streaming handlers exit promptly.
|
|
func (s *Service) Close() {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
if s.closed {
|
|
return
|
|
}
|
|
s.closed = true
|
|
for clientID, sub := range s.subs {
|
|
close(sub.done)
|
|
delete(s.subs, clientID)
|
|
}
|
|
}
|
|
|
|
// PublishClientEvent enqueues a ClientEvent for delivery. The typed
|
|
// `event` carries both the catalog kind and the payload bytes;
|
|
// push.Service invokes event.Marshal() at publish time so producers
|
|
// stay decoupled from the wire encoding. deviceSessionID is optional.
|
|
// eventID, requestID and traceID are correlation identifiers that
|
|
// gateway forwards verbatim into the signed client envelope (typically
|
|
// the producing route id, the originating client request id, and the
|
|
// trace id of the span that produced the event); empty strings are
|
|
// forwarded unchanged. The method satisfies
|
|
// notification.PushPublisher.
|
|
func (s *Service) PublishClientEvent(_ context.Context, userID uuid.UUID, deviceSessionID *uuid.UUID, event Event, eventID, requestID, traceID string) error {
|
|
if event == nil {
|
|
return errors.New("push.PublishClientEvent: event is required")
|
|
}
|
|
if userID == uuid.Nil {
|
|
return errors.New("push.PublishClientEvent: userID is required")
|
|
}
|
|
kind := event.Kind()
|
|
if strings.TrimSpace(kind) == "" {
|
|
return errors.New("push.PublishClientEvent: event kind is required")
|
|
}
|
|
encoded, err := event.Marshal()
|
|
if err != nil {
|
|
return fmt.Errorf("push.PublishClientEvent: marshal event: %w", err)
|
|
}
|
|
ev := &pushv1.PushEvent{
|
|
Kind: &pushv1.PushEvent_ClientEvent{
|
|
ClientEvent: &pushv1.ClientEvent{
|
|
UserId: userID.String(),
|
|
Kind: kind,
|
|
Payload: encoded,
|
|
EventId: eventID,
|
|
RequestId: requestID,
|
|
TraceId: traceID,
|
|
},
|
|
},
|
|
}
|
|
if deviceSessionID != nil {
|
|
ev.GetClientEvent().DeviceSessionId = deviceSessionID.String()
|
|
}
|
|
s.publish(ev, "client_event")
|
|
return nil
|
|
}
|
|
|
|
// PublishSessionInvalidation enqueues a SessionInvalidation event. It
|
|
// satisfies auth.SessionInvalidator. deviceSessionID may be uuid.Nil to
|
|
// invalidate every session of userID.
|
|
func (s *Service) PublishSessionInvalidation(_ context.Context, deviceSessionID, userID uuid.UUID, reason string) {
|
|
if userID == uuid.Nil {
|
|
s.logger.Warn("push session invalidation skipped: userID is required",
|
|
zap.String("device_session_id", deviceSessionID.String()),
|
|
zap.String("reason", reason),
|
|
)
|
|
return
|
|
}
|
|
ev := &pushv1.PushEvent{
|
|
Kind: &pushv1.PushEvent_SessionInvalidation{
|
|
SessionInvalidation: &pushv1.SessionInvalidation{
|
|
UserId: userID.String(),
|
|
Reason: reason,
|
|
},
|
|
},
|
|
}
|
|
if deviceSessionID != uuid.Nil {
|
|
ev.GetSessionInvalidation().DeviceSessionId = deviceSessionID.String()
|
|
}
|
|
s.publish(ev, "session_invalidation")
|
|
}
|
|
|
|
func (s *Service) publish(ev *pushv1.PushEvent, kindLabel string) {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
if s.closed {
|
|
return
|
|
}
|
|
cursor := s.cursorGen.next()
|
|
ev.Cursor = formatCursor(cursor)
|
|
s.ring.append(cursor, ev, s.now())
|
|
if s.eventsTotal != nil {
|
|
s.eventsTotal.Add(context.Background(), 1, metric.WithAttributes(attribute.String("kind", kindLabel)))
|
|
}
|
|
for clientID, sub := range s.subs {
|
|
if dropped := sub.deliver(ev); dropped {
|
|
if s.droppedTotal != nil {
|
|
s.droppedTotal.Add(context.Background(), 1, metric.WithAttributes(attribute.String("gateway_client_id", clientID)))
|
|
}
|
|
s.logger.Warn("push subscription dropped event",
|
|
zap.String("gateway_client_id", clientID),
|
|
zap.String("cursor", ev.Cursor),
|
|
zap.String("event_kind", kindLabel),
|
|
)
|
|
}
|
|
}
|
|
}
|
|
|
|
// register installs a new subscription for clientID and returns the
|
|
// replay slice the caller must send before draining the live channel.
|
|
// An existing subscription for the same clientID is closed first so
|
|
// the previous reader goroutine exits.
|
|
func (s *Service) register(clientID, cursor string) (*subscription, []*pushv1.PushEvent, error) {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
if s.closed {
|
|
return nil, nil, status.Error(codes.Unavailable, "push service stopped")
|
|
}
|
|
if existing, ok := s.subs[clientID]; ok {
|
|
close(existing.done)
|
|
delete(s.subs, clientID)
|
|
s.logger.Info("push subscription replaced",
|
|
zap.String("gateway_client_id", clientID),
|
|
)
|
|
}
|
|
sub := &subscription{
|
|
clientID: clientID,
|
|
ch: make(chan *pushv1.PushEvent, s.perConnBuffer),
|
|
done: make(chan struct{}),
|
|
}
|
|
s.subs[clientID] = sub
|
|
|
|
from, ok := parseCursor(cursor)
|
|
if !ok {
|
|
s.logger.Warn("push subscribe with malformed cursor; resuming from live tail",
|
|
zap.String("gateway_client_id", clientID),
|
|
zap.String("cursor", cursor),
|
|
)
|
|
}
|
|
replay, stale := s.ring.since(from, s.now())
|
|
if stale {
|
|
s.logger.Info("push subscribe cursor stale; replay skipped",
|
|
zap.String("gateway_client_id", clientID),
|
|
zap.String("cursor", cursor),
|
|
)
|
|
} else if len(replay) > 0 {
|
|
s.logger.Info("push subscribe replay",
|
|
zap.String("gateway_client_id", clientID),
|
|
zap.String("cursor", cursor),
|
|
zap.Int("events", len(replay)),
|
|
)
|
|
}
|
|
return sub, replay, nil
|
|
}
|
|
|
|
// unregister removes sub from the registry when the reader goroutine
|
|
// exits. It is a no-op when sub has already been replaced — the
|
|
// replacement subscription owns the entry under the same clientID.
|
|
func (s *Service) unregister(sub *subscription) {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
if cur, ok := s.subs[sub.clientID]; ok && cur == sub {
|
|
delete(s.subs, sub.clientID)
|
|
}
|
|
}
|
|
|
|
// SubscriberCount reports the number of active subscriptions; used by
|
|
// metrics callbacks and tests.
|
|
func (s *Service) SubscriberCount() int {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
return len(s.subs)
|
|
}
|
|
|
|
func (s *Service) registerMetrics(runtime *telemetry.Runtime) error {
|
|
meter := runtime.MeterProvider().Meter("galaxy.backend/push")
|
|
|
|
subscribers, err := meter.Int64ObservableGauge(
|
|
"grpc_push_subscribers",
|
|
metric.WithDescription("Number of gateway clients currently subscribed to the backend push stream."),
|
|
metric.WithUnit("1"),
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if _, err := meter.RegisterCallback(func(_ context.Context, o metric.Observer) error {
|
|
o.ObserveInt64(subscribers, int64(s.SubscriberCount()))
|
|
return nil
|
|
}, subscribers); err != nil {
|
|
return err
|
|
}
|
|
|
|
eventsTotal, err := meter.Int64Counter(
|
|
"grpc_push_events_total",
|
|
metric.WithDescription("Number of push events published, partitioned by event kind."),
|
|
metric.WithUnit("1"),
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
s.eventsTotal = eventsTotal
|
|
|
|
droppedTotal, err := meter.Int64Counter(
|
|
"grpc_push_dropped_total",
|
|
metric.WithDescription("Number of push events dropped because a subscriber buffer was full, partitioned by gateway client id."),
|
|
metric.WithUnit("1"),
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
s.droppedTotal = droppedTotal
|
|
|
|
return nil
|
|
}
|