Files
galaxy-game/gateway/internal/events/client_subscriber_test.go
T
2026-04-02 19:18:42 +02:00

295 lines
7.7 KiB
Go

package events
import (
"context"
"strings"
"sync"
"testing"
"time"
"galaxy/gateway/internal/config"
"galaxy/gateway/internal/push"
"galaxy/gateway/internal/testutil"
"github.com/alicebob/miniredis/v2"
"github.com/redis/go-redis/v9"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// TestRedisClientEventSubscriberPublishesValidEvent appends one stream entry
// with every field populated and checks that the recording publisher ends up
// with exactly one push.Event carrying the same values.
func TestRedisClientEventSubscriberPublishesValidEvent(t *testing.T) {
	t.Parallel()

	const stream = "gateway:client_events"

	redisServer := miniredis.RunT(t)
	recorder := &recordingClientEventPublisher{}
	sub := newTestRedisClientEventSubscriber(t, redisServer, recorder)

	handle := runTestClientEventSubscriber(t, sub)
	defer handle.stop(t)

	entry := map[string]any{
		"user_id":           "user-123",
		"device_session_id": "device-session-123",
		"event_type":        "fleet.updated",
		"event_id":          "event-123",
		"payload_bytes":     []byte("payload-123"),
		"request_id":        "request-123",
		"trace_id":          "trace-123",
	}
	addClientEvent(t, redisServer, stream, entry)

	// Delivery is asynchronous, so poll until the publisher has seen the event.
	delivered := func() bool {
		got := recorder.events()
		return len(got) == 1
	}
	require.Eventually(t, delivered, time.Second, 10*time.Millisecond)

	want := []push.Event{
		{
			UserID:          "user-123",
			DeviceSessionID: "device-session-123",
			EventType:       "fleet.updated",
			EventID:         "event-123",
			PayloadBytes:    []byte("payload-123"),
			RequestID:       "request-123",
			TraceID:         "trace-123",
		},
	}
	assert.Equal(t, want, recorder.events())
}
// TestRedisClientEventSubscriberSkipsMalformedEventAndContinues appends an
// entry carrying an extra "unexpected" field followed by a second entry
// without it, and expects only the second one ("event-good") to reach the
// publisher.
func TestRedisClientEventSubscriberSkipsMalformedEventAndContinues(t *testing.T) {
	t.Parallel()

	const stream = "gateway:client_events"

	redisServer := miniredis.RunT(t)
	recorder := &recordingClientEventPublisher{}
	sub := newTestRedisClientEventSubscriber(t, redisServer, recorder)

	handle := runTestClientEventSubscriber(t, sub)
	defer handle.stop(t)

	// Order matters: the rejected entry must precede the accepted one.
	entries := []map[string]any{
		{
			"user_id":       "user-123",
			"event_type":    "fleet.updated",
			"event_id":      "event-bad",
			"payload_bytes": []byte("payload-bad"),
			"unexpected":    "boom",
		},
		{
			"user_id":       "user-123",
			"event_type":    "fleet.updated",
			"event_id":      "event-good",
			"payload_bytes": []byte("payload-good"),
		},
	}
	for _, entry := range entries {
		addClientEvent(t, redisServer, stream, entry)
	}

	onlyGoodDelivered := func() bool {
		got := recorder.events()
		if len(got) != 1 {
			return false
		}
		return got[0].EventID == "event-good"
	}
	require.Eventually(t, onlyGoodDelivered, time.Second, 10*time.Millisecond)
}
// TestRedisClientEventSubscriberStartsFromCurrentTail appends an entry before
// the subscriber exists, checks that nothing is published for 100ms after it
// starts, and then checks that an entry appended afterwards ("event-new") is
// the only one delivered.
func TestRedisClientEventSubscriberStartsFromCurrentTail(t *testing.T) {
	t.Parallel()

	const stream = "gateway:client_events"

	redisServer := miniredis.RunT(t)
	recorder := &recordingClientEventPublisher{}

	// Written before the subscriber is constructed and started.
	preexisting := map[string]any{
		"user_id":       "user-123",
		"event_type":    "fleet.updated",
		"event_id":      "event-old",
		"payload_bytes": []byte("payload-old"),
	}
	addClientEvent(t, redisServer, stream, preexisting)

	sub := newTestRedisClientEventSubscriber(t, redisServer, recorder)
	handle := runTestClientEventSubscriber(t, sub)
	defer handle.stop(t)

	anythingPublished := func() bool {
		return len(recorder.events()) > 0
	}
	assert.Never(t, anythingPublished, 100*time.Millisecond, 10*time.Millisecond)

	fresh := map[string]any{
		"user_id":       "user-123",
		"event_type":    "fleet.updated",
		"event_id":      "event-new",
		"payload_bytes": []byte("payload-new"),
	}
	addClientEvent(t, redisServer, stream, fresh)

	onlyNewDelivered := func() bool {
		got := recorder.events()
		if len(got) != 1 {
			return false
		}
		return got[0].EventID == "event-new"
	}
	require.Eventually(t, onlyNewDelivered, time.Second, 10*time.Millisecond)
}
// TestRedisClientEventSubscriberShutdownInterruptsBlockingRead starts Run on a
// background goroutine, waits for the subscriber's started channel, then
// cancels the context and calls Shutdown. It expects Shutdown to succeed and
// Run to return an error matching context.Canceled within one second.
//
// NOTE(review): the context is cancelled before Shutdown is called, so this
// test cannot tell which of the two unblocks Run — confirm that is intended.
func TestRedisClientEventSubscriberShutdownInterruptsBlockingRead(t *testing.T) {
	t.Parallel()

	server := miniredis.RunT(t)
	publisher := &recordingClientEventPublisher{}
	subscriber := newTestRedisClientEventSubscriber(t, server, publisher)

	ctx, cancel := context.WithCancel(context.Background())
	// Guarantee the context is released on every exit path, including the
	// FailNow below; otherwise the Run goroutine would be left with a live
	// context when the test aborts early. Calling cancel twice is harmless.
	defer cancel()

	// Buffered so the goroutine can always deliver its result and exit.
	resultCh := make(chan error, 1)
	go func() {
		resultCh <- subscriber.Run(ctx)
	}()

	select {
	case <-subscriber.started:
	case <-time.After(time.Second):
		require.FailNow(t, "subscriber did not start")
	}

	cancel()
	require.NoError(t, subscriber.Shutdown(context.Background()))

	select {
	case err := <-resultCh:
		require.ErrorIs(t, err, context.Canceled)
	case <-time.After(time.Second):
		require.FailNow(t, "subscriber did not stop after shutdown")
	}
}
// TestRedisClientEventSubscriberLogsAndCountsMalformedEvents builds a
// subscriber with an observed logger and a telemetry runtime, appends an entry
// carrying an extra "unexpected" field, and then checks that the log output
// mentions the dropped event and that the scraped metrics contain the drop
// counter name together with the component and reason labels.
func TestRedisClientEventSubscriberLogsAndCountsMalformedEvents(t *testing.T) {
	t.Parallel()

	const stream = "gateway:client_events"

	redisServer := miniredis.RunT(t)
	recorder := &recordingClientEventPublisher{}
	observedLogger, logs := testutil.NewObservedLogger(t)
	telemetry := testutil.NewTelemetryRuntime(t, observedLogger)

	sessionCfg := config.SessionCacheRedisConfig{
		Addr:          redisServer.Addr(),
		LookupTimeout: 250 * time.Millisecond,
	}
	eventsCfg := config.ClientEventsRedisConfig{
		Stream:           stream,
		ReadBlockTimeout: 25 * time.Millisecond,
	}
	sub, err := NewRedisClientEventSubscriberWithObservability(
		sessionCfg,
		eventsCfg,
		recorder,
		observedLogger,
		telemetry,
	)
	require.NoError(t, err)
	t.Cleanup(func() {
		assert.NoError(t, sub.Close())
	})

	handle := runTestClientEventSubscriber(t, sub)
	defer handle.stop(t)

	badEntry := map[string]any{
		"user_id":       "user-123",
		"event_type":    "fleet.updated",
		"event_id":      "event-bad",
		"payload_bytes": []byte("payload-bad"),
		"unexpected":    "boom",
	}
	addClientEvent(t, redisServer, stream, badEntry)

	// Wait for the log line before scraping metrics.
	dropLogged := func() bool {
		return strings.Contains(logs.String(), "dropped malformed client event")
	}
	require.Eventually(t, dropLogged, time.Second, 10*time.Millisecond)

	scraped := testutil.ScrapeMetrics(t, telemetry.Handler())
	expectedFragments := []string{
		`gateway_internal_event_drops_total`,
		`component="client_event_subscriber"`,
		`reason="malformed_event"`,
	}
	for _, fragment := range expectedFragments {
		assert.Contains(t, scraped, fragment)
	}
}
// newTestRedisClientEventSubscriber constructs a RedisClientEventSubscriber
// pointed at the given miniredis server and the "gateway:client_events"
// stream, failing the test if construction returns an error. Close is
// registered as a test cleanup and its error is asserted to be nil.
func newTestRedisClientEventSubscriber(t *testing.T, server *miniredis.Miniredis, publisher ClientEventPublisher) *RedisClientEventSubscriber {
	t.Helper()

	sessionCfg := config.SessionCacheRedisConfig{
		Addr:          server.Addr(),
		LookupTimeout: 250 * time.Millisecond,
	}
	eventsCfg := config.ClientEventsRedisConfig{
		Stream:           "gateway:client_events",
		ReadBlockTimeout: 25 * time.Millisecond,
	}

	sub, err := NewRedisClientEventSubscriber(sessionCfg, eventsCfg, publisher)
	require.NoError(t, err)

	t.Cleanup(func() {
		closeErr := sub.Close()
		assert.NoError(t, closeErr)
	})

	return sub
}
// addClientEvent appends values as a single entry to stream on the given
// miniredis server via XADD. It uses a short-lived go-redis client that is
// closed before the helper returns, and fails the test if the XADD errors.
func addClientEvent(t *testing.T, server *miniredis.Miniredis, stream string, values map[string]any) {
	t.Helper()

	opts := &redis.Options{
		Addr:            server.Addr(),
		Protocol:        2,
		DisableIdentity: true,
	}
	rdb := redis.NewClient(opts)
	defer func() {
		closeErr := rdb.Close()
		assert.NoError(t, closeErr)
	}()

	args := &redis.XAddArgs{
		Stream: stream,
		Values: values,
	}
	require.NoError(t, rdb.XAdd(context.Background(), args).Err())
}
// runningClientEventSubscriber is the handle returned by
// runTestClientEventSubscriber for a subscriber whose Run method was started
// on a background goroutine.
type runningClientEventSubscriber struct {
// cancel cancels the context that was passed to Run.
cancel context.CancelFunc
// resultCh receives the error returned by Run.
resultCh chan error
}
// runTestClientEventSubscriber starts subscriber.Run on a background goroutine
// with a cancellable context and blocks until a receive on subscriber.started
// succeeds. If that does not happen within one second, the context is
// cancelled and the test fails immediately.
//
// The returned handle holds the cancel function and the channel on which the
// result of Run is delivered.
func runTestClientEventSubscriber(t *testing.T, subscriber *RedisClientEventSubscriber) runningClientEventSubscriber {
	t.Helper()

	ctx, cancel := context.WithCancel(context.Background())
	// Buffered so the goroutine can deliver Run's result and exit even when
	// nobody receives from the channel.
	resultCh := make(chan error, 1)
	go func() {
		resultCh <- subscriber.Run(ctx)
	}()

	select {
	case <-subscriber.started:
	case <-time.After(time.Second):
		// The handle is never returned on this path, so nothing else would
		// cancel the context; release it here so the Run goroutine is not
		// left behind with a live context after the test aborts.
		cancel()
		require.FailNow(t, "subscriber did not start")
	}

	return runningClientEventSubscriber{
		cancel:   cancel,
		resultCh: resultCh,
	}
}
// stop cancels the subscriber's context and waits up to one second for Run to
// return. The test fails if Run's error does not match context.Canceled or if
// no result arrives in time.
func (r runningClientEventSubscriber) stop(t *testing.T) {
	t.Helper()

	r.cancel()

	deadline := time.NewTimer(time.Second)
	defer deadline.Stop()

	select {
	case runErr := <-r.resultCh:
		require.ErrorIs(t, runErr, context.Canceled)
	case <-deadline.C:
		require.FailNow(t, "subscriber did not stop")
	}
}
// recordingClientEventPublisher is a test double that stores every event
// passed to Publish so tests can inspect them later. Access to the stored
// events is guarded by mu.
type recordingClientEventPublisher struct {
	mu      sync.Mutex
	records []push.Event
}

// Publish appends event to the recorded list while holding the mutex.
func (p *recordingClientEventPublisher) Publish(event push.Event) {
	p.mu.Lock()
	p.records = append(p.records, event)
	p.mu.Unlock()
}

// events returns a copy of the recorded events taken under the mutex, so the
// caller's slice is not affected by later Publish calls. The result is a
// non-nil slice even when nothing has been recorded.
func (p *recordingClientEventPublisher) events() []push.Event {
	p.mu.Lock()
	defer p.mu.Unlock()

	// Appending to an empty, non-nil slice always yields a fresh backing array.
	return append([]push.Event{}, p.records...)
}