// Package events translates inbound `pushv1.PushEvent` frames received // from backend into actions on the gateway-side push hub. It replaces // the Stage <6.2 Redis Stream subscribers (`session_events`, // `client_events`) with a single dispatcher driven by the gRPC // SubscribePush stream. package events import ( "context" "strings" pushv1 "galaxy/backend/proto/push/v1" "galaxy/gateway/internal/push" "galaxy/gateway/internal/telemetry" "go.opentelemetry.io/otel/attribute" "go.uber.org/zap" ) // SessionInvalidator closes every active push subscription bound to a // (device_session_id) or every session of a user when the backend emits // a SessionInvalidation frame. *push.Hub satisfies this contract. type SessionInvalidator interface { RevokeDeviceSession(deviceSessionID string) RevokeAllForUser(userID string) } // EventPublisher fans out a translated client event to active push // subscriptions. *push.Hub satisfies this contract. type EventPublisher interface { Publish(event push.Event) } // Dispatcher converts inbound `pushv1.PushEvent` frames into either a // hub Publish or a hub revocation. Malformed frames are dropped and // counted via telemetry; observability mirrors the previous // RecordInternalEventDrop semantics. type Dispatcher struct { publisher EventPublisher invalidator SessionInvalidator logger *zap.Logger metrics *telemetry.Runtime } // NewDispatcher constructs a Dispatcher. publisher and invalidator are // required; logger and metrics may be nil. func NewDispatcher(publisher EventPublisher, invalidator SessionInvalidator, logger *zap.Logger, metrics *telemetry.Runtime) *Dispatcher { if logger == nil { logger = zap.NewNop() } return &Dispatcher{ publisher: publisher, invalidator: invalidator, logger: logger.Named("push_dispatcher"), metrics: metrics, } } // Handle implements backendclient.EventHandler. It is safe for // concurrent use; the caller serialises ev within its goroutine. func (d *Dispatcher) Handle(ctx context.Context, ev *pushv1.PushEvent) { if d == nil || ev == nil { return } switch kind := ev.GetKind().(type) { case *pushv1.PushEvent_ClientEvent: d.handleClientEvent(ctx, kind.ClientEvent, ev.GetCursor()) case *pushv1.PushEvent_SessionInvalidation: d.handleSessionInvalidation(kind.SessionInvalidation) default: d.logger.Warn("dropped malformed push event", zap.String("cursor", ev.GetCursor()), zap.String("reason", "unknown_kind"), ) d.recordDrop(ctx, "unknown_kind") } } func (d *Dispatcher) handleClientEvent(ctx context.Context, ce *pushv1.ClientEvent, cursor string) { if ce == nil || d.publisher == nil { return } userID := strings.TrimSpace(ce.GetUserId()) kind := strings.TrimSpace(ce.GetKind()) eventID := strings.TrimSpace(ce.GetEventId()) if userID == "" || kind == "" || eventID == "" { d.logger.Warn("dropped malformed client event", zap.String("cursor", cursor), zap.String("user_id", userID), zap.String("kind", kind), zap.String("event_id", eventID), ) d.recordDrop(ctx, "malformed_client_event") return } d.publisher.Publish(push.Event{ UserID: userID, DeviceSessionID: strings.TrimSpace(ce.GetDeviceSessionId()), EventType: kind, EventID: eventID, PayloadBytes: cloneBytes(ce.GetPayload()), RequestID: ce.GetRequestId(), TraceID: ce.GetTraceId(), }) } func (d *Dispatcher) handleSessionInvalidation(si *pushv1.SessionInvalidation) { if si == nil || d.invalidator == nil { return } userID := strings.TrimSpace(si.GetUserId()) deviceSessionID := strings.TrimSpace(si.GetDeviceSessionId()) switch { case deviceSessionID != "": d.invalidator.RevokeDeviceSession(deviceSessionID) case userID != "": d.invalidator.RevokeAllForUser(userID) default: d.logger.Warn("dropped malformed session_invalidation: user_id and device_session_id both empty") } } func (d *Dispatcher) recordDrop(ctx context.Context, reason string) { if d.metrics == nil { return } d.metrics.RecordInternalEventDrop(ctx, attribute.String("component", "push_dispatcher"), attribute.String("reason", reason), ) } func cloneBytes(in []byte) []byte { if len(in) == 0 { return nil } out := make([]byte, len(in)) copy(out, in) return out }