Files
galaxy-game/gateway/internal/events/dispatcher.go
T
2026-05-06 10:14:55 +03:00

146 lines
4.2 KiB
Go

// Package events translates inbound `pushv1.PushEvent` frames received
// from backend into actions on the gateway-side push hub. It replaces
// the Stage <6.2 Redis Stream subscribers (`session_events`,
// `client_events`) with a single dispatcher driven by the gRPC
// SubscribePush stream.
package events
import (
"context"
"strings"
pushv1 "galaxy/backend/proto/push/v1"
"galaxy/gateway/internal/push"
"galaxy/gateway/internal/telemetry"
"go.opentelemetry.io/otel/attribute"
"go.uber.org/zap"
)
// SessionInvalidator closes every active push subscription bound to a
// (device_session_id) or every session of a user when the backend emits
// a SessionInvalidation frame. *push.Hub satisfies this contract.
type SessionInvalidator interface {
RevokeDeviceSession(deviceSessionID string)
RevokeAllForUser(userID string)
}
// EventPublisher fans out a translated client event to active push
// subscriptions. *push.Hub satisfies this contract.
type EventPublisher interface {
Publish(event push.Event)
}
// Dispatcher converts inbound `pushv1.PushEvent` frames into either a
// hub Publish or a hub revocation. Malformed frames are dropped and
// counted via telemetry; observability mirrors the previous
// RecordInternalEventDrop semantics.
type Dispatcher struct {
publisher EventPublisher
invalidator SessionInvalidator
logger *zap.Logger
metrics *telemetry.Runtime
}
// NewDispatcher constructs a Dispatcher. publisher and invalidator are
// required; logger and metrics may be nil.
func NewDispatcher(publisher EventPublisher, invalidator SessionInvalidator, logger *zap.Logger, metrics *telemetry.Runtime) *Dispatcher {
if logger == nil {
logger = zap.NewNop()
}
return &Dispatcher{
publisher: publisher,
invalidator: invalidator,
logger: logger.Named("push_dispatcher"),
metrics: metrics,
}
}
// Handle implements backendclient.EventHandler. It is safe for
// concurrent use; the caller serialises ev within its goroutine.
func (d *Dispatcher) Handle(ctx context.Context, ev *pushv1.PushEvent) {
if d == nil || ev == nil {
return
}
switch kind := ev.GetKind().(type) {
case *pushv1.PushEvent_ClientEvent:
d.handleClientEvent(ctx, kind.ClientEvent, ev.GetCursor())
case *pushv1.PushEvent_SessionInvalidation:
d.handleSessionInvalidation(kind.SessionInvalidation)
default:
d.logger.Warn("dropped malformed push event",
zap.String("cursor", ev.GetCursor()),
zap.String("reason", "unknown_kind"),
)
d.recordDrop(ctx, "unknown_kind")
}
}
func (d *Dispatcher) handleClientEvent(ctx context.Context, ce *pushv1.ClientEvent, cursor string) {
if ce == nil || d.publisher == nil {
return
}
userID := strings.TrimSpace(ce.GetUserId())
kind := strings.TrimSpace(ce.GetKind())
eventID := strings.TrimSpace(ce.GetEventId())
if userID == "" || kind == "" || eventID == "" {
d.logger.Warn("dropped malformed client event",
zap.String("cursor", cursor),
zap.String("user_id", userID),
zap.String("kind", kind),
zap.String("event_id", eventID),
)
d.recordDrop(ctx, "malformed_client_event")
return
}
d.publisher.Publish(push.Event{
UserID: userID,
DeviceSessionID: strings.TrimSpace(ce.GetDeviceSessionId()),
EventType: kind,
EventID: eventID,
PayloadBytes: cloneBytes(ce.GetPayload()),
RequestID: ce.GetRequestId(),
TraceID: ce.GetTraceId(),
})
}
func (d *Dispatcher) handleSessionInvalidation(si *pushv1.SessionInvalidation) {
if si == nil || d.invalidator == nil {
return
}
userID := strings.TrimSpace(si.GetUserId())
deviceSessionID := strings.TrimSpace(si.GetDeviceSessionId())
switch {
case deviceSessionID != "":
d.invalidator.RevokeDeviceSession(deviceSessionID)
case userID != "":
d.invalidator.RevokeAllForUser(userID)
default:
d.logger.Warn("dropped malformed session_invalidation: user_id and device_session_id both empty")
}
}
func (d *Dispatcher) recordDrop(ctx context.Context, reason string) {
if d.metrics == nil {
return
}
d.metrics.RecordInternalEventDrop(ctx,
attribute.String("component", "push_dispatcher"),
attribute.String("reason", reason),
)
}
func cloneBytes(in []byte) []byte {
if len(in) == 0 {
return nil
}
out := make([]byte, len(in))
copy(out, in)
return out
}