// Source: galaxy-game/backend/push/ring.go
// Snapshot: 2026-05-06 10:14:55 +03:00, 109 lines, 2.9 KiB, Go

package push
import (
"time"
pushv1 "galaxy/backend/proto/push/v1"
)
// ringEntry is one event stored in the in-memory replay buffer. The
// cursor is duplicated here for O(1) comparison without re-parsing
// event.Cursor.
type ringEntry struct {
// cursor is the numeric cursor passed to append for this event; since
// compares it against the cursor requested by the caller.
cursor uint64
// addedAt is the timestamp passed to append as now; TTL eviction
// measures the entry's age from it.
addedAt time.Time
// event is the stored event handed back by since.
event *pushv1.PushEvent
}
// ring is the in-memory replay buffer. Entries are evicted by either
// freshness-window TTL or capacity, whichever triggers first. The ring
// is not safe for concurrent use; the owning Service serialises access
// under its mutex.
type ring struct {
// capacity is the maximum number of entries kept; append evicts from
// the head until there is room for the new entry.
capacity int
// ttl is the freshness window. Entries whose addedAt is before
// now-ttl are evicted; a non-positive ttl disables TTL eviction.
ttl time.Duration
// entries holds the retained events, oldest at index 0 and newest at
// the end.
entries []ringEntry
// lastEvicted is the cursor of the entry most recently evicted; it is
// only meaningful when hasLastEvicted is true.
lastEvicted uint64 // largest cursor evicted from the buffer
// hasLastEvicted is set to true the first time any entry is evicted.
hasLastEvicted bool
}
// newRing builds an empty ring bounded by capacity entries and by the
// ttl freshness window. Backing storage for capacity entries is
// reserved up front.
func newRing(capacity int, ttl time.Duration) *ring {
	buf := new(ring)
	buf.capacity = capacity
	buf.ttl = ttl
	buf.entries = make([]ringEntry, 0, capacity)
	return buf
}
// append records ev with its cursor and evicts entries past TTL or
// capacity. The caller is responsible for setting ev.Cursor to
// formatCursor(cursor) before calling.
//
// now is used both to expire old entries and as the new entry's
// addedAt timestamp. When the ring's capacity is not positive nothing
// can be retained: the event is recorded as evicted and dropped.
func (r *ring) append(cursor uint64, ev *pushv1.PushEvent, now time.Time) {
	r.evictExpired(now)
	// Make room for the new entry. The len > 0 guard keeps the loop
	// finite when capacity is not positive; evictHead is a no-op on an
	// empty ring, so without it the condition would stay true forever.
	for len(r.entries) > 0 && len(r.entries) >= r.capacity {
		r.evictHead()
	}
	if r.capacity <= 0 {
		// No room to retain anything: treat the event as evicted on
		// arrival so lastEvicted still tracks the newest cursor seen.
		r.lastEvicted = cursor
		r.hasLastEvicted = true
		return
	}
	r.entries = append(r.entries, ringEntry{cursor: cursor, addedAt: now, event: ev})
}
// since returns the events with cursor strictly greater than fromCursor
// in stored order, which is ascending cursor order as long as append is
// called with increasing cursors. Entries older than the TTL relative
// to now are evicted first.
//
// The boolean is true when the requested cursor is "stale" — either
// older than the last evicted cursor, or more than one behind the
// oldest retained event — meaning the caller missed at least one event
// that the ring no longer holds. Stale callers receive no replay and
// must resume from the live tail. When the result is not stale and the
// ring is non-empty, the returned slice is non-nil (possibly empty).
func (r *ring) since(fromCursor uint64, now time.Time) ([]*pushv1.PushEvent, bool) {
	r.evictExpired(now)
	if len(r.entries) == 0 {
		// An empty ring is never stale: gateway is either fully caught
		// up or there has been no traffic.
		// NOTE(review): this branch is also taken when TTL eviction has
		// drained the ring, in which case a caller with
		// fromCursor < lastEvicted is not told it missed events —
		// confirm this is intended.
		return nil, false
	}
	if r.hasLastEvicted && fromCursor < r.lastEvicted {
		return nil, true
	}
	// Gap check written as a subtraction: fromCursor+1 would wrap to 0
	// for the maximum uint64 value and wrongly report stale.
	if first := r.entries[0].cursor; fromCursor < first && first-fromCursor > 1 {
		return nil, true
	}
	// Skip the leading entries the caller already has so the result can
	// be sized once instead of grown by repeated appends.
	start := 0
	for start < len(r.entries) && r.entries[start].cursor <= fromCursor {
		start++
	}
	out := make([]*pushv1.PushEvent, 0, len(r.entries)-start)
	for i := start; i < len(r.entries); i++ {
		if r.entries[i].cursor > fromCursor {
			out = append(out, r.entries[i].event)
		}
	}
	return out, false
}
// len returns how many entries the ring currently retains. No eviction
// is performed here, so entries past their TTL are still counted until
// the next append or since call. Intended for tests and metrics.
func (r *ring) len() int {
	retained := len(r.entries)
	return retained
}
// evictExpired drops the leading run of entries whose addedAt is
// strictly before now-ttl and records the cursor of the last one
// dropped in lastEvicted. It does nothing when ttl is not positive or
// when the oldest entry is still fresh.
func (r *ring) evictExpired(now time.Time) {
	if r.ttl <= 0 {
		return
	}
	cutoff := now.Add(-r.ttl)
	drop := 0
	for drop < len(r.entries) && r.entries[drop].addedAt.Before(cutoff) {
		drop++
	}
	if drop == 0 {
		return
	}
	r.lastEvicted = r.entries[drop-1].cursor
	r.hasLastEvicted = true
	// Shift the survivors to the front, reusing the backing array.
	kept := copy(r.entries, r.entries[drop:])
	// Zero the vacated tail: otherwise the backing array keeps stale
	// copies that still point at evicted events and pin them in memory.
	for i := kept; i < len(r.entries); i++ {
		r.entries[i] = ringEntry{}
	}
	r.entries = r.entries[:kept]
}
// evictHead removes the oldest entry, if any, and records its cursor in
// lastEvicted. It is a no-op on an empty ring.
func (r *ring) evictHead() {
	if len(r.entries) == 0 {
		return
	}
	r.lastEvicted = r.entries[0].cursor
	r.hasLastEvicted = true
	last := len(r.entries) - 1
	// Shift the remaining entries down by one, reusing the backing array.
	copy(r.entries, r.entries[1:])
	// Zero the vacated slot: otherwise the backing array keeps a stale
	// copy that still points at an event and pins it in memory.
	r.entries[last] = ringEntry{}
	r.entries = r.entries[:last]
}