package worker

import (
	"context"
	"errors"
	"io"
	"log/slog"
	"testing"
	"time"

	redisstate "galaxy/notification/internal/adapters/redisstate"
	"galaxy/notification/internal/config"
	"galaxy/notification/internal/service/acceptintent"
	"galaxy/notification/internal/service/malformedintent"

	"github.com/alicebob/miniredis/v2"
	"github.com/redis/go-redis/v9"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

// TestIntentConsumerStartsFromZeroOffsetWhenNoStoredOffsetExists adds an
// intent to the stream before the consumer starts and before any offset has
// been saved, then expects the notification to appear in the acceptance store.
func TestIntentConsumerStartsFromZeroOffsetWhenNoStoredOffsetExists(t *testing.T) {
	t.Parallel()

	fixture := newIntentConsumerFixture(t, stubUserDirectory{
		records: map[string]acceptintent.UserRecord{
			"user-1": {Email: "pilot@example.com", PreferredLanguage: "en"},
		},
	})

	messageID := addValidIntent(t, fixture.client, fixture.stream, `{"turn_number":54,"game_name":"Nebula Clash","game_id":"game-123"}`)

	running := runIntentConsumer(t, fixture.consumer)
	defer running.stop(t)

	require.Eventually(t, func() bool {
		_, found, err := fixture.acceptanceStore.GetNotification(context.Background(), messageID)
		return err == nil && found
	}, time.Second, 10*time.Millisecond)
}

// TestIntentConsumerContinuesFromSavedOffsetAfterRestart saves the first
// entry ID as the stream offset before the consumer starts and expects only
// the second entry to be turned into a notification.
func TestIntentConsumerContinuesFromSavedOffsetAfterRestart(t *testing.T) {
	t.Parallel()

	fixture := newIntentConsumerFixture(t, stubUserDirectory{
		records: map[string]acceptintent.UserRecord{
			"user-1": {Email: "pilot@example.com", PreferredLanguage: "en"},
		},
	})

	firstID := addValidIntent(t, fixture.client, fixture.stream, `{"turn_number":54,"game_name":"Nebula Clash","game_id":"game-123"}`)
	require.NoError(t, fixture.offsetStore.Save(context.Background(), fixture.stream, firstID))
	secondID := addValidIntent(t, fixture.client, fixture.stream, `{"turn_number":55,"game_name":"Nebula Clash","game_id":"game-123"}`)

	running := runIntentConsumer(t, fixture.consumer)
	defer running.stop(t)

	require.Eventually(t, func() bool {
		_, found, err := fixture.acceptanceStore.GetNotification(context.Background(), secondID)
		return err == nil && found
	}, time.Second, 10*time.Millisecond)

	// The entry at the saved offset must not have been processed again.
	fixture.requireNotificationAbsent(t, firstID)
}

// TestIntentConsumerRecordsIdempotencyConflictsAndAdvancesOffset publishes
// two intents that share an idempotency key but carry different payloads. It
// expects the second to be recorded as a malformed intent with the
// idempotency-conflict failure code, the offset to move to the second entry,
// and only the first entry to produce a notification.
func TestIntentConsumerRecordsIdempotencyConflictsAndAdvancesOffset(t *testing.T) {
	t.Parallel()

	fixture := newIntentConsumerFixture(t, stubUserDirectory{
		records: map[string]acceptintent.UserRecord{
			"user-1": {Email: "pilot@example.com", PreferredLanguage: "en"},
		},
	})

	firstID := addValidIntent(t, fixture.client, fixture.stream, `{"turn_number":54,"game_name":"Nebula Clash","game_id":"game-123"}`)
	secondID := addValidIntent(t, fixture.client, fixture.stream, `{"turn_number":55,"game_name":"Nebula Clash","game_id":"game-123"}`)

	running := runIntentConsumer(t, fixture.consumer)
	defer running.stop(t)

	require.Eventually(t, func() bool {
		payload, err := fixture.client.Get(context.Background(), redisstate.Keyspace{}.MalformedIntent(secondID)).Bytes()
		if err != nil {
			return false
		}
		entry, err := redisstate.UnmarshalMalformedIntent(payload)
		if err != nil {
			return false
		}

		// Use the shared failure-code constant, as the other tests in this
		// file do, instead of a raw string literal.
		return entry.FailureCode == malformedintent.FailureCodeIdempotencyConflict
	}, time.Second, 10*time.Millisecond)

	fixture.requireOffset(t, secondID)
	fixture.requireNotificationStored(t, firstID)
	fixture.requireNotificationAbsent(t, secondID)
}

// TestIntentConsumerShutdownInterruptsBlockingRead cancels the run context
// while the consumer has nothing to read and expects Run to return
// context.Canceled within one second.
func TestIntentConsumerShutdownInterruptsBlockingRead(t *testing.T) {
	t.Parallel()

	fixture := newIntentConsumerFixture(t, stubUserDirectory{})

	ctx, cancel := context.WithCancel(context.Background())
	resultCh := make(chan error, 1)
	go func() {
		resultCh <- fixture.consumer.Run(ctx)
	}()

	// Give the goroutine time to start before cancelling; presumably the
	// consumer is inside its blocking stream read by then.
	time.Sleep(50 * time.Millisecond)
	cancel()

	select {
	case err := <-resultCh:
		require.ErrorIs(t, err, context.Canceled)
	case <-time.After(time.Second):
		require.FailNow(t, "intent consumer did not stop after shutdown")
	}
}

// TestIntentConsumerRecordsRecipientNotFoundAndAdvancesOffset uses an empty
// user directory, so the lookup for "user-1" yields
// acceptintent.ErrRecipientNotFound. It expects a malformed-intent record
// with the recipient-not-found code, an advanced offset and no notification.
func TestIntentConsumerRecordsRecipientNotFoundAndAdvancesOffset(t *testing.T) {
	t.Parallel()

	fixture := newIntentConsumerFixture(t, stubUserDirectory{})
	messageID := addValidIntent(t, fixture.client, fixture.stream, `{"turn_number":54,"game_name":"Nebula Clash","game_id":"game-123"}`)

	running := runIntentConsumer(t, fixture.consumer)
	defer running.stop(t)

	require.Eventually(t, func() bool {
		payload, err := fixture.client.Get(context.Background(), redisstate.Keyspace{}.MalformedIntent(messageID)).Bytes()
		if err != nil {
			return false
		}
		entry, err := redisstate.UnmarshalMalformedIntent(payload)
		if err != nil {
			return false
		}

		return entry.FailureCode == malformedintent.FailureCodeRecipientNotFound
	}, time.Second, 10*time.Millisecond)

	fixture.requireOffset(t, messageID)
	fixture.requireNotificationAbsent(t, messageID)
}

// TestIntentConsumerRecordsMalformedIntentAndAdvancesOffset publishes a
// stream entry without the "payload_json" field. It expects a
// malformed-intent record with the invalid-payload code that references the
// stream entry ID, an advanced offset and no notification.
func TestIntentConsumerRecordsMalformedIntentAndAdvancesOffset(t *testing.T) {
	t.Parallel()

	fixture := newIntentConsumerFixture(t, stubUserDirectory{
		records: map[string]acceptintent.UserRecord{
			"user-1": {Email: "pilot@example.com", PreferredLanguage: "en"},
		},
	})

	// Same fields as addValidIntent, minus "payload_json".
	messageID, err := fixture.client.XAdd(context.Background(), &redis.XAddArgs{
		Stream: fixture.stream,
		Values: map[string]any{
			"notification_type":       "game.turn.ready",
			"producer":                "game_master",
			"audience_kind":           "user",
			"recipient_user_ids_json": `["user-1"]`,
			"idempotency_key":         "game-123:turn-ready",
			"occurred_at_ms":          "1775121700000",
		},
	}).Result()
	require.NoError(t, err)

	running := runIntentConsumer(t, fixture.consumer)
	defer running.stop(t)

	require.Eventually(t, func() bool {
		payload, err := fixture.client.Get(context.Background(), redisstate.Keyspace{}.MalformedIntent(messageID)).Bytes()
		if err != nil {
			return false
		}
		entry, err := redisstate.UnmarshalMalformedIntent(payload)
		if err != nil {
			return false
		}

		return entry.FailureCode == malformedintent.FailureCodeInvalidPayload &&
			entry.StreamEntryID == messageID
	}, time.Second, 10*time.Millisecond)

	fixture.requireOffset(t, messageID)
	fixture.requireNotificationAbsent(t, messageID)
}

// TestIntentConsumerRecordsTelemetryForOutcomesAndMalformedIntents publishes
// two identical intents followed by one with the same idempotency key but a
// different payload. It expects the recording telemetry to report the
// "accepted" and "duplicate" outcomes and an "idempotency_conflict"
// malformed intent.
func TestIntentConsumerRecordsTelemetryForOutcomesAndMalformedIntents(t *testing.T) {
	t.Parallel()

	fixture := newIntentConsumerFixture(t, stubUserDirectory{
		records: map[string]acceptintent.UserRecord{
			"user-1": {Email: "pilot@example.com", PreferredLanguage: "en"},
		},
	})

	addValidIntent(t, fixture.client, fixture.stream, `{"turn_number":54,"game_name":"Nebula Clash","game_id":"game-123"}`)
	addValidIntent(t, fixture.client, fixture.stream, `{"turn_number":54,"game_name":"Nebula Clash","game_id":"game-123"}`)
	conflictID := addValidIntent(t, fixture.client, fixture.stream, `{"turn_number":55,"game_name":"Nebula Clash","game_id":"game-123"}`)

	running := runIntentConsumer(t, fixture.consumer)
	defer running.stop(t)

	// Wait until the last entry has been handled before inspecting telemetry.
	require.Eventually(t, func() bool {
		payload, err := fixture.client.Get(context.Background(), redisstate.Keyspace{}.MalformedIntent(conflictID)).Bytes()
		if err != nil {
			return false
		}
		entry, err := redisstate.UnmarshalMalformedIntent(payload)
		if err != nil {
			return false
		}

		return entry.FailureCode == malformedintent.FailureCodeIdempotencyConflict
	}, time.Second, 10*time.Millisecond)

	require.Eventually(t, func() bool {
		return fixture.telemetry.hasIntentOutcome("accepted") &&
			fixture.telemetry.hasIntentOutcome("duplicate") &&
			fixture.telemetry.hasMalformedIntent("idempotency_conflict")
	}, time.Second, 10*time.Millisecond)
}

// TestIntentConsumerStopsWithoutAdvancingOffsetWhenUserDirectoryIsUnavailable
// makes every user lookup fail with a generic error. It expects Run to return
// an error containing that message, with no offset saved and no notification
// stored.
func TestIntentConsumerStopsWithoutAdvancingOffsetWhenUserDirectoryIsUnavailable(t *testing.T) {
	t.Parallel()

	fixture := newIntentConsumerFixture(t, stubUserDirectory{
		err: errors.New("user service unavailable"),
	})
	messageID := addValidIntent(t, fixture.client, fixture.stream, `{"turn_number":54,"game_name":"Nebula Clash","game_id":"game-123"}`)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	resultCh := make(chan error, 1)
	go func() {
		resultCh <- fixture.consumer.Run(ctx)
	}()

	// Poll without blocking so the test fails by timeout rather than hanging
	// if Run never returns.
	var runErr error
	require.Eventually(t, func() bool {
		select {
		case runErr = <-resultCh:
			return true
		default:
			return false
		}
	}, time.Second, 10*time.Millisecond)

	require.Error(t, runErr)
	require.ErrorContains(t, runErr, "user service unavailable")

	_, found, err := fixture.offsetStore.Load(context.Background(), fixture.stream)
	require.NoError(t, err)
	require.False(t, found)

	fixture.requireNotificationAbsent(t, messageID)
}

// intentConsumerFixture bundles the consumer under test with the Redis
// client, stores and telemetry recorder the tests inspect.
type intentConsumerFixture struct {
	client          *redis.Client
	stream          string
	acceptanceStore *redisstate.AcceptanceStore
	offsetStore     *redisstate.StreamOffsetStore
	consumer        *IntentConsumer
	telemetry       *recordingWorkerTelemetry
}

// requireOffset asserts that an offset is saved for the fixture stream and
// that it equals want.
func (f intentConsumerFixture) requireOffset(t *testing.T, want string) {
	t.Helper()

	offset, found, err := f.offsetStore.Load(context.Background(), f.stream)
	require.NoError(t, err)
	require.True(t, found)
	require.Equal(t, want, offset)
}

// requireNotificationStored asserts that the acceptance store returns a
// notification for id.
func (f intentConsumerFixture) requireNotificationStored(t *testing.T, id string) {
	t.Helper()

	_, found, err := f.acceptanceStore.GetNotification(context.Background(), id)
	require.NoError(t, err)
	require.True(t, found)
}

// requireNotificationAbsent asserts that the acceptance store has no
// notification for id.
func (f intentConsumerFixture) requireNotificationAbsent(t *testing.T, id string) {
	t.Helper()

	_, found, err := f.acceptanceStore.GetNotification(context.Background(), id)
	require.NoError(t, err)
	require.False(t, found)
}

// newIntentConsumerFixture starts a miniredis server for the test and wires
// an IntentConsumer on top of Redis-backed stores and an acceptintent service
// that resolves recipients through userDirectory. The Redis client is closed
// via t.Cleanup.
func newIntentConsumerFixture(t *testing.T, userDirectory acceptintent.UserDirectory) intentConsumerFixture {
	t.Helper()

	// Single source of truth for the stream name shared by the consumer
	// configuration and the returned fixture.
	const stream = "notification:intents"

	server := miniredis.RunT(t)
	// NOTE(review): Protocol 2 and DisableIdentity look like miniredis
	// compatibility settings — confirm.
	client := redis.NewClient(&redis.Options{
		Addr:            server.Addr(),
		Protocol:        2,
		DisableIdentity: true,
	})
	t.Cleanup(func() {
		assert.NoError(t, client.Close())
	})

	acceptanceStore, err := redisstate.NewAcceptanceStore(client, redisstate.AcceptanceConfig{
		RecordTTL:      24 * time.Hour,
		DeadLetterTTL:  72 * time.Hour,
		IdempotencyTTL: 7 * 24 * time.Hour,
	})
	require.NoError(t, err)

	malformedStore, err := redisstate.NewMalformedIntentStore(client, 72*time.Hour)
	require.NoError(t, err)

	offsetStore, err := redisstate.NewStreamOffsetStore(client)
	require.NoError(t, err)

	telemetry := &recordingWorkerTelemetry{}

	// The service clock matches the occurred_at_ms value the test intents carry.
	service, err := acceptintent.New(acceptintent.Config{
		Store:            acceptanceStore,
		UserDirectory:    userDirectory,
		Clock:            fixedClock{now: time.UnixMilli(1775121700000).UTC()},
		Logger:           slog.New(slog.NewTextHandler(io.Discard, nil)),
		Telemetry:        telemetry,
		PushMaxAttempts:  3,
		EmailMaxAttempts: 7,
		IdempotencyTTL:   7 * 24 * time.Hour,
		AdminRouting:     config.AdminRoutingConfig{},
	})
	require.NoError(t, err)

	// The consumer clock is one millisecond after the service clock.
	consumer, err := NewIntentConsumer(IntentConsumerConfig{
		Client:            client,
		Stream:            stream,
		BlockTimeout:      25 * time.Millisecond,
		Acceptor:          service,
		MalformedRecorder: malformedStore,
		OffsetStore:       offsetStore,
		Telemetry:         telemetry,
		Clock:             fixedClock{now: time.UnixMilli(1775121700001).UTC()},
	}, slog.New(slog.NewTextHandler(io.Discard, nil)))
	require.NoError(t, err)

	return intentConsumerFixture{
		client:          client,
		stream:          stream,
		acceptanceStore: acceptanceStore,
		offsetStore:     offsetStore,
		consumer:        consumer,
		telemetry:       telemetry,
	}
}

// addValidIntent appends a "game.turn.ready" intent for recipient "user-1"
// with the fixed idempotency key "game-123:turn-ready" and the given payload
// to stream, and returns the stream entry ID assigned by Redis.
func addValidIntent(t *testing.T, client *redis.Client, stream string, payloadJSON string) string {
	t.Helper()

	messageID, err := client.XAdd(context.Background(), &redis.XAddArgs{
		Stream: stream,
		Values: map[string]any{
			"notification_type":       "game.turn.ready",
			"producer":                "game_master",
			"audience_kind":           "user",
			"recipient_user_ids_json": `["user-1"]`,
			"idempotency_key":         "game-123:turn-ready",
			"occurred_at_ms":          "1775121700000",
			"payload_json":            payloadJSON,
		},
	}).Result()
	require.NoError(t, err)

	return messageID
}

// runningIntentConsumer is a handle to a consumer started by
// runIntentConsumer.
type runningIntentConsumer struct {
	cancel   context.CancelFunc
	resultCh chan error
}

// runIntentConsumer starts consumer.Run in a goroutine with a cancellable
// context and returns a handle that stops it.
func runIntentConsumer(t *testing.T, consumer *IntentConsumer) runningIntentConsumer {
	t.Helper()

	ctx, cancel := context.WithCancel(context.Background())
	// Buffered so the goroutine can exit even if nobody reads the result.
	resultCh := make(chan error, 1)
	go func() {
		resultCh <- consumer.Run(ctx)
	}()

	// Give the goroutine time to start before the caller proceeds.
	time.Sleep(50 * time.Millisecond)

	return runningIntentConsumer{
		cancel:   cancel,
		resultCh: resultCh,
	}
}

// stop cancels the consumer context and requires Run to return
// context.Canceled within one second.
func (r runningIntentConsumer) stop(t *testing.T) {
	t.Helper()

	r.cancel()

	select {
	case err := <-r.resultCh:
		require.ErrorIs(t, err, context.Canceled)
	case <-time.After(time.Second):
		require.FailNow(t, "intent consumer did not stop")
	}
}

// fixedClock is a clock that always reports the same instant.
type fixedClock struct {
	now time.Time
}

// Now returns the configured instant.
func (c fixedClock) Now() time.Time {
	return c.now
}

// stubUserDirectory is an in-memory user directory. When err is set every
// lookup fails with it; otherwise lookups are served from records.
type stubUserDirectory struct {
	records map[string]acceptintent.UserRecord
	err     error
}

// GetUserByID returns the configured error if any, then the record stored
// for userID, and otherwise acceptintent.ErrRecipientNotFound.
func (d stubUserDirectory) GetUserByID(_ context.Context, userID string) (acceptintent.UserRecord, error) {
	if d.err != nil {
		return acceptintent.UserRecord{}, d.err
	}
	if record, ok := d.records[userID]; ok {
		return record, nil
	}

	return acceptintent.UserRecord{}, acceptintent.ErrRecipientNotFound
}