Files
galaxy-game/backend/push/service_test.go
T
2026-05-07 00:58:53 +03:00

241 lines
7.3 KiB
Go

package push
import (
"context"
"net"
"testing"
"time"
pushv1 "galaxy/backend/proto/push/v1"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"
"google.golang.org/grpc/test/bufconn"
)
// bufconnBufferSize is the in-memory buffer size for the bufconn
// listener used by the in-process gRPC test server.
const bufconnBufferSize = 1024 * 1024
// startBufconnServer wires svc into an in-process gRPC server reachable
// through a bufconn dialer. The returned cleanup function stops the
// server and closes the listener.
// startBufconnServer wires svc into an in-process gRPC server reachable
// through a bufconn dialer. The returned cleanup function closes the
// client connection, stops the server, and closes the listener.
func startBufconnServer(t *testing.T, svc *Service) (pushv1.PushClient, func()) {
	t.Helper()

	lis := bufconn.Listen(bufconnBufferSize)
	server := grpc.NewServer()
	pushv1.RegisterPushServer(server, svc)
	go func() {
		_ = server.Serve(lis)
	}()

	conn, err := grpc.NewClient(
		"passthrough://bufnet",
		// Honor the dial context supplied by gRPC so connection attempts
		// respect cancellation and deadlines (the previous version dialed
		// with context.Background(), ignoring them).
		grpc.WithContextDialer(func(ctx context.Context, _ string) (net.Conn, error) {
			return lis.DialContext(ctx)
		}),
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	)
	require.NoError(t, err)

	cleanup := func() {
		_ = conn.Close()
		server.Stop()
		_ = lis.Close()
	}
	return pushv1.NewPushClient(conn), cleanup
}
// recvOne reads a single event from stream in a background goroutine and
// fails the test if nothing arrives within timeout.
func recvOne(t *testing.T, stream pushv1.Push_SubscribePushClient, timeout time.Duration) (*pushv1.PushEvent, error) {
	t.Helper()

	type recvResult struct {
		event *pushv1.PushEvent
		err   error
	}
	results := make(chan recvResult, 1)
	go func() {
		event, err := stream.Recv()
		results <- recvResult{event: event, err: err}
	}()

	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case res := <-results:
		return res.event, res.err
	case <-timer.C:
		t.Fatalf("timed out waiting for push event after %s", timeout)
		return nil, nil
	}
}
// TestSubscribePushDeliversLiveEvents verifies that an event published
// after a gateway subscribes is delivered live with cursor 1.
func TestSubscribePushDeliversLiveEvents(t *testing.T) {
	t.Parallel()

	svc, err := NewService(ServiceConfig{FreshnessWindow: time.Minute, RingCapacity: 16, PerConnBuffer: 8}, nil, nil)
	require.NoError(t, err)
	t.Cleanup(svc.Close)

	client, cleanup := startBufconnServer(t, svc)
	defer cleanup()

	stream, err := client.SubscribePush(t.Context(), &pushv1.GatewaySubscribeRequest{GatewayClientId: "gw-1"})
	require.NoError(t, err)
	require.Eventually(t, func() bool { return svc.SubscriberCount() == 1 }, time.Second, 5*time.Millisecond)

	subject := uuid.New()
	require.NoError(t, svc.PublishClientEvent(context.Background(), subject, nil, JSONEvent{EventKind: "k"}, "", "", ""))

	got, err := recvOne(t, stream, time.Second)
	require.NoError(t, err)
	assert.Equal(t, formatCursor(1), got.Cursor)
	assert.Equal(t, subject.String(), got.GetClientEvent().UserId)
}
// TestSubscribePushReplaysPastEventsOnReconnect verifies that a subscriber
// presenting a valid cursor receives the events published after it, in order.
func TestSubscribePushReplaysPastEventsOnReconnect(t *testing.T) {
	t.Parallel()

	svc, err := NewService(ServiceConfig{FreshnessWindow: time.Minute, RingCapacity: 16, PerConnBuffer: 8}, nil, nil)
	require.NoError(t, err)
	t.Cleanup(svc.Close)

	// Publish three events before any subscriber connects.
	subject := uuid.New()
	for range 3 {
		require.NoError(t, svc.PublishClientEvent(context.Background(), subject, nil, JSONEvent{EventKind: "k"}, "", "", ""))
	}

	client, cleanup := startBufconnServer(t, svc)
	defer cleanup()

	// Reconnecting with cursor 1 must replay events 2 and 3 in order.
	stream, err := client.SubscribePush(t.Context(), &pushv1.GatewaySubscribeRequest{GatewayClientId: "gw-1", Cursor: formatCursor(1)})
	require.NoError(t, err)
	for want := uint64(2); want <= 3; want++ {
		got, err := recvOne(t, stream, time.Second)
		require.NoError(t, err)
		assert.Equal(t, formatCursor(want), got.Cursor)
	}
}
// TestSubscribePushSkipsReplayWhenCursorStale verifies that a cursor older
// than the ring's retention yields no replay; only live events arrive.
func TestSubscribePushSkipsReplayWhenCursorStale(t *testing.T) {
	t.Parallel()

	svc, err := NewService(ServiceConfig{FreshnessWindow: time.Minute, RingCapacity: 2, PerConnBuffer: 8}, nil, nil)
	require.NoError(t, err)
	t.Cleanup(svc.Close)

	subject := uuid.New()
	for range 4 {
		require.NoError(t, svc.PublishClientEvent(context.Background(), subject, nil, JSONEvent{EventKind: "k"}, "", "", ""))
	}

	// Ring capacity 2 means cursors 1 and 2 are evicted.
	client, cleanup := startBufconnServer(t, svc)
	defer cleanup()

	stream, err := client.SubscribePush(t.Context(), &pushv1.GatewaySubscribeRequest{GatewayClientId: "gw-1", Cursor: formatCursor(1)})
	require.NoError(t, err)
	require.Eventually(t, func() bool { return svc.SubscriberCount() == 1 }, time.Second, 5*time.Millisecond)

	// Stale cursor → no replay; live publish must arrive.
	require.NoError(t, svc.PublishClientEvent(context.Background(), subject, nil, JSONEvent{EventKind: "k"}, "", "", ""))
	got, err := recvOne(t, stream, time.Second)
	require.NoError(t, err)
	assert.Equal(t, formatCursor(5), got.Cursor)
}
// TestSubscribePushReplacesExistingClientID verifies that a second
// subscription with the same gateway client ID aborts the first stream and
// takes over event delivery.
func TestSubscribePushReplacesExistingClientID(t *testing.T) {
	t.Parallel()

	svc, err := NewService(ServiceConfig{FreshnessWindow: time.Minute, RingCapacity: 8, PerConnBuffer: 8}, nil, nil)
	require.NoError(t, err)
	t.Cleanup(svc.Close)

	client, cleanup := startBufconnServer(t, svc)
	defer cleanup()

	original, err := client.SubscribePush(t.Context(), &pushv1.GatewaySubscribeRequest{GatewayClientId: "gw-1"})
	require.NoError(t, err)
	require.Eventually(t, func() bool { return svc.SubscriberCount() == 1 }, time.Second, 5*time.Millisecond)

	replacement, err := client.SubscribePush(t.Context(), &pushv1.GatewaySubscribeRequest{GatewayClientId: "gw-1"})
	require.NoError(t, err)

	// First stream must terminate with Aborted.
	_, err = recvOne(t, original, time.Second)
	require.Error(t, err)
	assert.Equal(t, codes.Aborted, status.Code(err))

	// Subscriber count returns to one (the replacement).
	require.Eventually(t, func() bool { return svc.SubscriberCount() == 1 }, time.Second, 5*time.Millisecond)

	// Live publish reaches the replacement.
	require.NoError(t, svc.PublishClientEvent(context.Background(), uuid.New(), nil, JSONEvent{EventKind: "k"}, "", "", ""))
	got, err := recvOne(t, replacement, time.Second)
	require.NoError(t, err)
	assert.NotEmpty(t, got.Cursor)
}
// TestSubscribePushRejectsEmptyClientID verifies that a subscribe request
// without a gateway client ID fails with InvalidArgument.
func TestSubscribePushRejectsEmptyClientID(t *testing.T) {
	t.Parallel()

	svc, err := NewService(ServiceConfig{FreshnessWindow: time.Minute, RingCapacity: 4, PerConnBuffer: 4}, nil, nil)
	require.NoError(t, err)
	t.Cleanup(svc.Close)

	client, cleanup := startBufconnServer(t, svc)
	defer cleanup()

	stream, err := client.SubscribePush(t.Context(), &pushv1.GatewaySubscribeRequest{})
	require.NoError(t, err)

	// The error surfaces on the first Recv, not on stream creation.
	_, err = stream.Recv()
	require.Error(t, err)
	assert.Equal(t, codes.InvalidArgument, status.Code(err))
}
// TestSubscriptionDeliverDropsOldestOnOverflow verifies that delivering into
// a full per-connection buffer evicts the oldest queued event and reports
// the drop.
func TestSubscriptionDeliverDropsOldestOnOverflow(t *testing.T) {
	t.Parallel()

	sub := &subscription{
		clientID: "gw-1",
		ch:       make(chan *pushv1.PushEvent, 2),
		done:     make(chan struct{}),
	}
	events := []*pushv1.PushEvent{mkEvent(1, "a"), mkEvent(2, "b"), mkEvent(3, "c")}

	assert.False(t, sub.deliver(events[0]))
	assert.False(t, sub.deliver(events[1]))
	assert.True(t, sub.deliver(events[2]), "third deliver must report a drop")

	assert.Equal(t, events[1], <-sub.ch, "oldest event (first) was dropped")
	assert.Equal(t, events[2], <-sub.ch)
}
// TestServiceCloseTerminatesActiveStream verifies that closing the service
// aborts any active subscriber stream.
func TestServiceCloseTerminatesActiveStream(t *testing.T) {
	t.Parallel()

	svc, err := NewService(ServiceConfig{FreshnessWindow: time.Minute, RingCapacity: 4, PerConnBuffer: 4}, nil, nil)
	require.NoError(t, err)

	client, cleanup := startBufconnServer(t, svc)
	defer cleanup()

	stream, err := client.SubscribePush(t.Context(), &pushv1.GatewaySubscribeRequest{GatewayClientId: "gw-1"})
	require.NoError(t, err)
	require.Eventually(t, func() bool { return svc.SubscriberCount() == 1 }, time.Second, 5*time.Millisecond)

	svc.Close()

	_, err = recvOne(t, stream, time.Second)
	require.Error(t, err)
	assert.Equal(t, codes.Aborted, status.Code(err))
}