package push import ( "context" "net" "testing" "time" pushv1 "galaxy/backend/proto/push/v1" "github.com/google/uuid" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/status" "google.golang.org/grpc/test/bufconn" ) const bufconnBufferSize = 1024 * 1024 // startBufconnServer wires svc into an in-process gRPC server reachable // through a bufconn dialer. The returned cleanup function stops the // server and closes the listener. func startBufconnServer(t *testing.T, svc *Service) (pushv1.PushClient, func()) { t.Helper() lis := bufconn.Listen(bufconnBufferSize) server := grpc.NewServer() pushv1.RegisterPushServer(server, svc) go func() { _ = server.Serve(lis) }() conn, err := grpc.NewClient( "passthrough://bufnet", grpc.WithContextDialer(func(_ context.Context, _ string) (net.Conn, error) { return lis.DialContext(context.Background()) }), grpc.WithTransportCredentials(insecure.NewCredentials()), ) require.NoError(t, err) cleanup := func() { _ = conn.Close() server.Stop() _ = lis.Close() } return pushv1.NewPushClient(conn), cleanup } func recvOne(t *testing.T, stream pushv1.Push_SubscribePushClient, timeout time.Duration) (*pushv1.PushEvent, error) { t.Helper() type result struct { ev *pushv1.PushEvent err error } ch := make(chan result, 1) go func() { ev, err := stream.Recv() ch <- result{ev, err} }() select { case r := <-ch: return r.ev, r.err case <-time.After(timeout): t.Fatalf("timed out waiting for push event after %s", timeout) return nil, nil } } func TestSubscribePushDeliversLiveEvents(t *testing.T) { t.Parallel() svc, err := NewService(ServiceConfig{FreshnessWindow: time.Minute, RingCapacity: 16, PerConnBuffer: 8}, nil, nil) require.NoError(t, err) t.Cleanup(svc.Close) client, cleanup := startBufconnServer(t, svc) defer cleanup() stream, err := client.SubscribePush(t.Context(), &pushv1.GatewaySubscribeRequest{GatewayClientId: "gw-1"}) require.NoError(t, err) require.Eventually(t, func() bool { return svc.SubscriberCount() == 1 }, time.Second, 5*time.Millisecond) userID := uuid.New() require.NoError(t, svc.PublishClientEvent(context.Background(), userID, nil, JSONEvent{EventKind: "k"},"", "", "")) ev, err := recvOne(t, stream, time.Second) require.NoError(t, err) assert.Equal(t, formatCursor(1), ev.Cursor) assert.Equal(t, userID.String(), ev.GetClientEvent().UserId) } func TestSubscribePushReplaysPastEventsOnReconnect(t *testing.T) { t.Parallel() svc, err := NewService(ServiceConfig{FreshnessWindow: time.Minute, RingCapacity: 16, PerConnBuffer: 8}, nil, nil) require.NoError(t, err) t.Cleanup(svc.Close) userID := uuid.New() for range 3 { require.NoError(t, svc.PublishClientEvent(context.Background(), userID, nil, JSONEvent{EventKind: "k"},"", "", "")) } client, cleanup := startBufconnServer(t, svc) defer cleanup() stream, err := client.SubscribePush(t.Context(), &pushv1.GatewaySubscribeRequest{GatewayClientId: "gw-1", Cursor: formatCursor(1)}) require.NoError(t, err) for i := uint64(2); i <= 3; i++ { ev, err := recvOne(t, stream, time.Second) require.NoError(t, err) assert.Equal(t, formatCursor(i), ev.Cursor) } } func TestSubscribePushSkipsReplayWhenCursorStale(t *testing.T) { t.Parallel() svc, err := NewService(ServiceConfig{FreshnessWindow: time.Minute, RingCapacity: 2, PerConnBuffer: 8}, nil, nil) require.NoError(t, err) t.Cleanup(svc.Close) userID := uuid.New() for range 4 { require.NoError(t, svc.PublishClientEvent(context.Background(), userID, nil, JSONEvent{EventKind: "k"},"", "", "")) } // Ring capacity 2 means cursors 1 and 2 are evicted. client, cleanup := startBufconnServer(t, svc) defer cleanup() stream, err := client.SubscribePush(t.Context(), &pushv1.GatewaySubscribeRequest{GatewayClientId: "gw-1", Cursor: formatCursor(1)}) require.NoError(t, err) require.Eventually(t, func() bool { return svc.SubscriberCount() == 1 }, time.Second, 5*time.Millisecond) // Stale cursor → no replay; live publish must arrive. require.NoError(t, svc.PublishClientEvent(context.Background(), userID, nil, JSONEvent{EventKind: "k"},"", "", "")) ev, err := recvOne(t, stream, time.Second) require.NoError(t, err) assert.Equal(t, formatCursor(5), ev.Cursor) } func TestSubscribePushReplacesExistingClientID(t *testing.T) { t.Parallel() svc, err := NewService(ServiceConfig{FreshnessWindow: time.Minute, RingCapacity: 8, PerConnBuffer: 8}, nil, nil) require.NoError(t, err) t.Cleanup(svc.Close) client, cleanup := startBufconnServer(t, svc) defer cleanup() stream1, err := client.SubscribePush(t.Context(), &pushv1.GatewaySubscribeRequest{GatewayClientId: "gw-1"}) require.NoError(t, err) require.Eventually(t, func() bool { return svc.SubscriberCount() == 1 }, time.Second, 5*time.Millisecond) stream2, err := client.SubscribePush(t.Context(), &pushv1.GatewaySubscribeRequest{GatewayClientId: "gw-1"}) require.NoError(t, err) // First stream must terminate with Aborted. _, err = recvOne(t, stream1, time.Second) require.Error(t, err) assert.Equal(t, codes.Aborted, status.Code(err)) // Subscriber count returns to one (the replacement). require.Eventually(t, func() bool { return svc.SubscriberCount() == 1 }, time.Second, 5*time.Millisecond) // Live publish reaches the replacement. require.NoError(t, svc.PublishClientEvent(context.Background(), uuid.New(), nil, JSONEvent{EventKind: "k"},"", "", "")) ev, err := recvOne(t, stream2, time.Second) require.NoError(t, err) assert.NotEmpty(t, ev.Cursor) } func TestSubscribePushRejectsEmptyClientID(t *testing.T) { t.Parallel() svc, err := NewService(ServiceConfig{FreshnessWindow: time.Minute, RingCapacity: 4, PerConnBuffer: 4}, nil, nil) require.NoError(t, err) t.Cleanup(svc.Close) client, cleanup := startBufconnServer(t, svc) defer cleanup() stream, err := client.SubscribePush(t.Context(), &pushv1.GatewaySubscribeRequest{}) require.NoError(t, err) _, err = stream.Recv() require.Error(t, err) assert.Equal(t, codes.InvalidArgument, status.Code(err)) } func TestSubscriptionDeliverDropsOldestOnOverflow(t *testing.T) { t.Parallel() sub := &subscription{ clientID: "gw-1", ch: make(chan *pushv1.PushEvent, 2), done: make(chan struct{}), } first := mkEvent(1, "a") second := mkEvent(2, "b") third := mkEvent(3, "c") assert.False(t, sub.deliver(first)) assert.False(t, sub.deliver(second)) assert.True(t, sub.deliver(third), "third deliver must report a drop") got1 := <-sub.ch got2 := <-sub.ch assert.Equal(t, second, got1, "oldest event (first) was dropped") assert.Equal(t, third, got2) } func TestServiceCloseTerminatesActiveStream(t *testing.T) { t.Parallel() svc, err := NewService(ServiceConfig{FreshnessWindow: time.Minute, RingCapacity: 4, PerConnBuffer: 4}, nil, nil) require.NoError(t, err) client, cleanup := startBufconnServer(t, svc) defer cleanup() stream, err := client.SubscribePush(t.Context(), &pushv1.GatewaySubscribeRequest{GatewayClientId: "gw-1"}) require.NoError(t, err) require.Eventually(t, func() bool { return svc.SubscriberCount() == 1 }, time.Second, 5*time.Millisecond) svc.Close() _, err = recvOne(t, stream, time.Second) require.Error(t, err) assert.Equal(t, codes.Aborted, status.Code(err)) }