package backendclient_test import ( "context" "net" "sync" "testing" "time" backendpush "galaxy/backend/push" pushv1 "galaxy/backend/proto/push/v1" "galaxy/gateway/internal/backendclient" "github.com/google/uuid" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/test/bufconn" ) // bufconnPushService starts an in-process backend push.Service backed by // a *grpc.Server on a bufconn listener and returns the dial option that // gateway PushClient should use to connect to it. type bufconnPushService struct { Service *backendpush.Service dial func(context.Context, string) (net.Conn, error) stop func() } func newBufconnPushService(t *testing.T) *bufconnPushService { t.Helper() service, err := backendpush.NewService(backendpush.ServiceConfig{ FreshnessWindow: time.Minute, RingCapacity: 16, PerConnBuffer: 8, }, nil, nil) require.NoError(t, err) listener := bufconn.Listen(1 << 16) server := grpc.NewServer() pushv1.RegisterPushServer(server, service) go func() { _ = server.Serve(listener) }() stop := func() { service.Close() server.Stop() _ = listener.Close() } t.Cleanup(stop) return &bufconnPushService{ Service: service, dial: func(_ context.Context, _ string) (net.Conn, error) { return listener.Dial() }, stop: stop, } } func TestPushClientDeliversClientEventsAndAdvancesCursor(t *testing.T) { t.Parallel() svc := newBufconnPushService(t) type received struct { event *pushv1.PushEvent cursor string } out := make(chan received, 4) cfg := backendclient.Config{ HTTPBaseURL: "http://example.invalid", GRPCPushURL: "passthrough://bufconn", GatewayClientID: "gw-1", HTTPTimeout: time.Second, PushReconnectBaseBackoff: 10 * time.Millisecond, PushReconnectMaxBackoff: 100 * time.Millisecond, } client, err := backendclient.NewPushClient(cfg) require.NoError(t, err) client.WithDialOptions( grpc.WithContextDialer(svc.dial), grpc.WithTransportCredentials(insecure.NewCredentials()), ) client.WithHandler(backendclient.EventHandlerFunc(func(_ context.Context, ev *pushv1.PushEvent) { out <- received{event: ev, cursor: ev.GetCursor()} })) ctx, cancel := context.WithCancel(context.Background()) defer cancel() var ( runErr error wg sync.WaitGroup ) wg.Add(1) go func() { defer wg.Done() runErr = client.Run(ctx) }() // Wait for backend service to register the subscription. require.Eventually(t, func() bool { return svc.Service.SubscriberCount() == 1 }, time.Second, 10*time.Millisecond) userID := uuid.New() require.NoError(t, svc.Service.PublishClientEvent(context.Background(), userID, nil, backendpush.JSONEvent{ EventKind: "lobby.invite.received", Payload: map[string]any{"x": 1.0}, }, "evt-1", "req-1", "trace-1")) select { case got := <-out: ce := got.event.GetClientEvent() require.NotNil(t, ce) assert.Equal(t, userID.String(), ce.GetUserId()) assert.Equal(t, "lobby.invite.received", ce.GetKind()) assert.Equal(t, "evt-1", ce.GetEventId()) assert.Equal(t, "req-1", ce.GetRequestId()) assert.Equal(t, "trace-1", ce.GetTraceId()) assert.NotEmpty(t, got.cursor) case <-time.After(2 * time.Second): t.Fatal("timed out waiting for client event") } require.Eventually(t, func() bool { return client.Cursor() != "" }, time.Second, 10*time.Millisecond) cancel() wg.Wait() if runErr != nil && runErr != context.Canceled { t.Fatalf("unexpected run error: %v", runErr) } }