109 lines
2.9 KiB
Go
109 lines
2.9 KiB
Go
package push
|
|
|
|
import (
|
|
"time"
|
|
|
|
pushv1 "galaxy/backend/proto/push/v1"
|
|
)
|
|
|
|
// ringEntry is one event stored in the in-memory replay buffer. The
|
|
// cursor is duplicated here for O(1) comparison without re-parsing
|
|
// event.Cursor.
|
|
type ringEntry struct {
|
|
cursor uint64
|
|
addedAt time.Time
|
|
event *pushv1.PushEvent
|
|
}
|
|
|
|
// ring is the in-memory replay buffer. Entries are evicted by either
|
|
// freshness-window TTL or capacity, whichever triggers first. The ring
|
|
// is not safe for concurrent use; the owning Service serialises access
|
|
// under its mutex.
|
|
type ring struct {
|
|
capacity int
|
|
ttl time.Duration
|
|
entries []ringEntry
|
|
lastEvicted uint64 // largest cursor evicted from the buffer
|
|
hasLastEvicted bool
|
|
}
|
|
|
|
func newRing(capacity int, ttl time.Duration) *ring {
|
|
return &ring{
|
|
capacity: capacity,
|
|
ttl: ttl,
|
|
entries: make([]ringEntry, 0, capacity),
|
|
}
|
|
}
|
|
|
|
// append records ev with its cursor and evicts entries past TTL or
|
|
// capacity. The caller is responsible for setting ev.Cursor to
|
|
// formatCursor(cursor) before calling.
|
|
func (r *ring) append(cursor uint64, ev *pushv1.PushEvent, now time.Time) {
|
|
r.evictExpired(now)
|
|
for len(r.entries) >= r.capacity {
|
|
r.evictHead()
|
|
}
|
|
r.entries = append(r.entries, ringEntry{cursor: cursor, addedAt: now, event: ev})
|
|
}
|
|
|
|
// since returns the events with cursor strictly greater than fromCursor
|
|
// in ascending cursor order. The boolean is true when the requested
|
|
// cursor is "stale" — either older than the oldest retained event or
|
|
// older than the last evicted cursor — meaning the caller missed at
|
|
// least one event that the ring no longer holds. Stale callers receive
|
|
// no replay and must resume from the live tail.
|
|
func (r *ring) since(fromCursor uint64, now time.Time) ([]*pushv1.PushEvent, bool) {
|
|
r.evictExpired(now)
|
|
if len(r.entries) == 0 {
|
|
// An empty ring is never stale: gateway is either fully caught
|
|
// up or there has been no traffic.
|
|
return nil, false
|
|
}
|
|
if r.hasLastEvicted && fromCursor < r.lastEvicted {
|
|
return nil, true
|
|
}
|
|
first := r.entries[0].cursor
|
|
if fromCursor+1 < first {
|
|
return nil, true
|
|
}
|
|
out := make([]*pushv1.PushEvent, 0)
|
|
for i := range r.entries {
|
|
if r.entries[i].cursor > fromCursor {
|
|
out = append(out, r.entries[i].event)
|
|
}
|
|
}
|
|
return out, false
|
|
}
|
|
|
|
// len reports the current number of retained entries; intended for
|
|
// tests and metrics.
|
|
func (r *ring) len() int {
|
|
return len(r.entries)
|
|
}
|
|
|
|
func (r *ring) evictExpired(now time.Time) {
|
|
if r.ttl <= 0 {
|
|
return
|
|
}
|
|
cutoff := now.Add(-r.ttl)
|
|
drop := 0
|
|
for drop < len(r.entries) && r.entries[drop].addedAt.Before(cutoff) {
|
|
drop++
|
|
}
|
|
if drop == 0 {
|
|
return
|
|
}
|
|
r.lastEvicted = r.entries[drop-1].cursor
|
|
r.hasLastEvicted = true
|
|
r.entries = append(r.entries[:0], r.entries[drop:]...)
|
|
}
|
|
|
|
func (r *ring) evictHead() {
|
|
if len(r.entries) == 0 {
|
|
return
|
|
}
|
|
r.lastEvicted = r.entries[0].cursor
|
|
r.hasLastEvicted = true
|
|
r.entries = append(r.entries[:0], r.entries[1:]...)
|
|
}
|