// Package push hosts the backend gRPC SubscribePush server and the // publisher API consumed by other backend domains. // // Service implements pushv1.PushServer. It maintains: // // - a connection registry keyed by GatewaySubscribeRequest.gateway_client_id; // - an in-memory ring buffer of recent PushEvent values with TTL equal // to BACKEND_FRESHNESS_WINDOW; // - a monotonic cursor generator stamped on every published event. // // Publisher methods (PublishClientEvent, PublishSessionInvalidation) // satisfy the SessionInvalidator interface in internal/auth and the // PushPublisher interface in internal/notification — main.go injects // a single *Service into both wiring sites. // // See `backend/README.md` §7 and `backend/docs/flows.md` for cursor, // ring buffer, and backpressure semantics. package push import ( "context" "encoding/json" "errors" "fmt" "strings" "sync" "time" "galaxy/backend/internal/telemetry" pushv1 "galaxy/backend/proto/push/v1" "github.com/google/uuid" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" "go.uber.org/zap" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) // Default sizing for the ring buffer and per-connection delivery queue. // The values are intentionally hard-coded: ring TTL is the operational // dial (BACKEND_FRESHNESS_WINDOW) and the buffer sizes are chosen to // comfortably absorb a freshness window of traffic at MVP rates. const ( defaultRingCapacity = 1024 defaultPerConnBuffer = 256 ) // ServiceConfig configures a Service. FreshnessWindow is required and // fixes the ring buffer's per-event TTL. RingCapacity and PerConnBuffer // fall back to the package defaults when zero. Now overrides time.Now // for deterministic tests. type ServiceConfig struct { FreshnessWindow time.Duration RingCapacity int PerConnBuffer int Now func() time.Time } // Service implements pushv1.PushServer and exposes the publisher API. // One Service is shared by every backend domain that needs to push; // it is safe for concurrent use. type Service struct { pushv1.UnimplementedPushServer logger *zap.Logger now func() time.Time perConnBuffer int mu sync.Mutex closed bool subs map[string]*subscription ring *ring cursorGen cursorGenerator eventsTotal metric.Int64Counter droppedTotal metric.Int64Counter } // NewService constructs a Service. A nil logger falls back to // zap.NewNop. A nil runtime disables metric emission so tests can // instantiate the service without the OpenTelemetry runtime. func NewService(cfg ServiceConfig, logger *zap.Logger, runtime *telemetry.Runtime) (*Service, error) { if cfg.FreshnessWindow <= 0 { return nil, errors.New("push.NewService: FreshnessWindow must be positive") } if logger == nil { logger = zap.NewNop() } if cfg.Now == nil { cfg.Now = time.Now } if cfg.RingCapacity <= 0 { cfg.RingCapacity = defaultRingCapacity } if cfg.PerConnBuffer <= 0 { cfg.PerConnBuffer = defaultPerConnBuffer } s := &Service{ logger: logger.Named("push"), now: cfg.Now, perConnBuffer: cfg.PerConnBuffer, subs: make(map[string]*subscription), ring: newRing(cfg.RingCapacity, cfg.FreshnessWindow), } if runtime != nil { if err := s.registerMetrics(runtime); err != nil { return nil, fmt.Errorf("push.NewService: register metrics: %w", err) } } return s, nil } // Close drops every active subscription and refuses new ones. It is // safe to call multiple times. The owning Server must call Close before // initiating GracefulStop so streaming handlers exit promptly. func (s *Service) Close() { s.mu.Lock() defer s.mu.Unlock() if s.closed { return } s.closed = true for clientID, sub := range s.subs { close(sub.done) delete(s.subs, clientID) } } // PublishClientEvent enqueues a ClientEvent for delivery. payload is // marshalled to JSON; deviceSessionID is optional. eventID, requestID // and traceID are correlation identifiers that gateway forwards // verbatim into the signed client envelope (typically the producing // route id, the originating client request id, and the trace id of the // span that produced the event); empty strings are forwarded // unchanged. The method satisfies notification.PushPublisher. func (s *Service) PublishClientEvent(_ context.Context, userID uuid.UUID, deviceSessionID *uuid.UUID, kind string, payload map[string]any, eventID, requestID, traceID string) error { if userID == uuid.Nil { return errors.New("push.PublishClientEvent: userID is required") } if strings.TrimSpace(kind) == "" { return errors.New("push.PublishClientEvent: kind is required") } encoded, err := json.Marshal(payload) if err != nil { return fmt.Errorf("push.PublishClientEvent: marshal payload: %w", err) } ev := &pushv1.PushEvent{ Kind: &pushv1.PushEvent_ClientEvent{ ClientEvent: &pushv1.ClientEvent{ UserId: userID.String(), Kind: kind, Payload: encoded, EventId: eventID, RequestId: requestID, TraceId: traceID, }, }, } if deviceSessionID != nil { ev.GetClientEvent().DeviceSessionId = deviceSessionID.String() } s.publish(ev, "client_event") return nil } // PublishSessionInvalidation enqueues a SessionInvalidation event. It // satisfies auth.SessionInvalidator. deviceSessionID may be uuid.Nil to // invalidate every session of userID. func (s *Service) PublishSessionInvalidation(_ context.Context, deviceSessionID, userID uuid.UUID, reason string) { if userID == uuid.Nil { s.logger.Warn("push session invalidation skipped: userID is required", zap.String("device_session_id", deviceSessionID.String()), zap.String("reason", reason), ) return } ev := &pushv1.PushEvent{ Kind: &pushv1.PushEvent_SessionInvalidation{ SessionInvalidation: &pushv1.SessionInvalidation{ UserId: userID.String(), Reason: reason, }, }, } if deviceSessionID != uuid.Nil { ev.GetSessionInvalidation().DeviceSessionId = deviceSessionID.String() } s.publish(ev, "session_invalidation") } func (s *Service) publish(ev *pushv1.PushEvent, kindLabel string) { s.mu.Lock() defer s.mu.Unlock() if s.closed { return } cursor := s.cursorGen.next() ev.Cursor = formatCursor(cursor) s.ring.append(cursor, ev, s.now()) if s.eventsTotal != nil { s.eventsTotal.Add(context.Background(), 1, metric.WithAttributes(attribute.String("kind", kindLabel))) } for clientID, sub := range s.subs { if dropped := sub.deliver(ev); dropped { if s.droppedTotal != nil { s.droppedTotal.Add(context.Background(), 1, metric.WithAttributes(attribute.String("gateway_client_id", clientID))) } s.logger.Warn("push subscription dropped event", zap.String("gateway_client_id", clientID), zap.String("cursor", ev.Cursor), zap.String("event_kind", kindLabel), ) } } } // register installs a new subscription for clientID and returns the // replay slice the caller must send before draining the live channel. // An existing subscription for the same clientID is closed first so // the previous reader goroutine exits. func (s *Service) register(clientID, cursor string) (*subscription, []*pushv1.PushEvent, error) { s.mu.Lock() defer s.mu.Unlock() if s.closed { return nil, nil, status.Error(codes.Unavailable, "push service stopped") } if existing, ok := s.subs[clientID]; ok { close(existing.done) delete(s.subs, clientID) s.logger.Info("push subscription replaced", zap.String("gateway_client_id", clientID), ) } sub := &subscription{ clientID: clientID, ch: make(chan *pushv1.PushEvent, s.perConnBuffer), done: make(chan struct{}), } s.subs[clientID] = sub from, ok := parseCursor(cursor) if !ok { s.logger.Warn("push subscribe with malformed cursor; resuming from live tail", zap.String("gateway_client_id", clientID), zap.String("cursor", cursor), ) } replay, stale := s.ring.since(from, s.now()) if stale { s.logger.Info("push subscribe cursor stale; replay skipped", zap.String("gateway_client_id", clientID), zap.String("cursor", cursor), ) } else if len(replay) > 0 { s.logger.Info("push subscribe replay", zap.String("gateway_client_id", clientID), zap.String("cursor", cursor), zap.Int("events", len(replay)), ) } return sub, replay, nil } // unregister removes sub from the registry when the reader goroutine // exits. It is a no-op when sub has already been replaced — the // replacement subscription owns the entry under the same clientID. func (s *Service) unregister(sub *subscription) { s.mu.Lock() defer s.mu.Unlock() if cur, ok := s.subs[sub.clientID]; ok && cur == sub { delete(s.subs, sub.clientID) } } // SubscriberCount reports the number of active subscriptions; used by // metrics callbacks and tests. func (s *Service) SubscriberCount() int { s.mu.Lock() defer s.mu.Unlock() return len(s.subs) } func (s *Service) registerMetrics(runtime *telemetry.Runtime) error { meter := runtime.MeterProvider().Meter("galaxy.backend/push") subscribers, err := meter.Int64ObservableGauge( "grpc_push_subscribers", metric.WithDescription("Number of gateway clients currently subscribed to the backend push stream."), metric.WithUnit("1"), ) if err != nil { return err } if _, err := meter.RegisterCallback(func(_ context.Context, o metric.Observer) error { o.ObserveInt64(subscribers, int64(s.SubscriberCount())) return nil }, subscribers); err != nil { return err } eventsTotal, err := meter.Int64Counter( "grpc_push_events_total", metric.WithDescription("Number of push events published, partitioned by event kind."), metric.WithUnit("1"), ) if err != nil { return err } s.eventsTotal = eventsTotal droppedTotal, err := meter.Int64Counter( "grpc_push_dropped_total", metric.WithDescription("Number of push events dropped because a subscriber buffer was full, partitioned by gateway client id."), metric.WithUnit("1"), ) if err != nil { return err } s.droppedTotal = droppedTotal return nil }