// Package notify is the backend's in-process live-event seam. Domain services // publish Intents after a successful commit; the gRPC push server (internal // /pushgrpc) subscribes to the hub and streams them to the gateway, which fans // them out to clients (docs/ARCHITECTURE.md §10). Event payloads are // FlatBuffers-encoded by the typed constructors in events.go, so the domain // services stay free of the wire schema and only depend on this package. // // Publishing is best-effort and non-blocking: a live event is a convenience, not // a correctness requirement, so a slow or absent subscriber never blocks a game // transition. The default Publisher is Nop, which keeps every domain service (and // its tests) runnable without a live channel. package notify import ( "sync" "github.com/google/uuid" ) // Notification kinds — the catalog in docs/ARCHITECTURE.md §10. const ( KindYourTurn = "your_turn" KindOpponentMoved = "opponent_moved" KindChatMessage = "chat_message" KindNudge = "nudge" KindMatchFound = "match_found" ) // Intent is one live event destined for a single user. Payload is the // FlatBuffers-encoded body (a scrabblefb.* table) that the gateway forwards // verbatim to the client; EventID is a correlation id carried through unchanged. type Intent struct { UserID uuid.UUID Kind string Payload []byte EventID string } // Publisher accepts live-event intents. Implementations must be safe for // concurrent use and must not block the caller. type Publisher interface { Publish(intents ...Intent) } // Nop is the default Publisher: it discards every intent. type Nop struct{} // Publish discards the intents. func (Nop) Publish(...Intent) {} // Hub is the in-process fan-in/fan-out between the domain publishers and the // push subscribers (the gRPC stream). It is safe for concurrent use. type Hub struct { mu sync.Mutex subs map[int]chan Intent nextID int bufSize int } // defaultBuffer is the per-subscriber queue depth used when NewHub is given a // non-positive size. const defaultBuffer = 256 // NewHub returns a Hub whose per-subscriber buffer holds bufSize intents before // dropping (a slow subscriber never blocks a publisher). func NewHub(bufSize int) *Hub { if bufSize <= 0 { bufSize = defaultBuffer } return &Hub{subs: make(map[int]chan Intent), bufSize: bufSize} } // Publish delivers each intent to every current subscriber, dropping it for any // subscriber whose buffer is full (best-effort live delivery). func (h *Hub) Publish(intents ...Intent) { h.mu.Lock() defer h.mu.Unlock() for _, in := range intents { for _, ch := range h.subs { select { case ch <- in: default: } } } } // Subscribe registers a new subscriber and returns its intent channel and an // unsubscribe func that closes the channel. The caller reads the channel until // it is closed or its own context ends, then calls unsubscribe. func (h *Hub) Subscribe() (<-chan Intent, func()) { h.mu.Lock() defer h.mu.Unlock() id := h.nextID h.nextID++ ch := make(chan Intent, h.bufSize) h.subs[id] = ch return ch, func() { h.unsubscribe(id) } } // unsubscribe removes and closes the subscriber's channel. It holds the same // lock as Publish, so it never closes a channel mid-send. func (h *Hub) unsubscribe(id int) { h.mu.Lock() defer h.mu.Unlock() if ch, ok := h.subs[id]; ok { delete(h.subs, id) close(ch) } }