Files
galaxy-game/gateway/internal/backendclient/push_client_test.go
T
2026-05-07 00:58:53 +03:00

136 lines
3.5 KiB
Go

package backendclient_test
import (
"context"
"net"
"sync"
"testing"
"time"
backendpush "galaxy/backend/push"
pushv1 "galaxy/backend/proto/push/v1"
"galaxy/gateway/internal/backendclient"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/test/bufconn"
)
// bufconnPushService starts an in-process backend push.Service backed by
// a *grpc.Server on a bufconn listener and returns the dial option that
// gateway PushClient should use to connect to it.
type bufconnPushService struct {
Service *backendpush.Service
dial func(context.Context, string) (net.Conn, error)
stop func()
}
func newBufconnPushService(t *testing.T) *bufconnPushService {
t.Helper()
service, err := backendpush.NewService(backendpush.ServiceConfig{
FreshnessWindow: time.Minute,
RingCapacity: 16,
PerConnBuffer: 8,
}, nil, nil)
require.NoError(t, err)
listener := bufconn.Listen(1 << 16)
server := grpc.NewServer()
pushv1.RegisterPushServer(server, service)
go func() {
_ = server.Serve(listener)
}()
stop := func() {
service.Close()
server.Stop()
_ = listener.Close()
}
t.Cleanup(stop)
return &bufconnPushService{
Service: service,
dial: func(_ context.Context, _ string) (net.Conn, error) { return listener.Dial() },
stop: stop,
}
}
func TestPushClientDeliversClientEventsAndAdvancesCursor(t *testing.T) {
t.Parallel()
svc := newBufconnPushService(t)
type received struct {
event *pushv1.PushEvent
cursor string
}
out := make(chan received, 4)
cfg := backendclient.Config{
HTTPBaseURL: "http://example.invalid",
GRPCPushURL: "passthrough://bufconn",
GatewayClientID: "gw-1",
HTTPTimeout: time.Second,
PushReconnectBaseBackoff: 10 * time.Millisecond,
PushReconnectMaxBackoff: 100 * time.Millisecond,
}
client, err := backendclient.NewPushClient(cfg)
require.NoError(t, err)
client.WithDialOptions(
grpc.WithContextDialer(svc.dial),
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
client.WithHandler(backendclient.EventHandlerFunc(func(_ context.Context, ev *pushv1.PushEvent) {
out <- received{event: ev, cursor: ev.GetCursor()}
}))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var (
runErr error
wg sync.WaitGroup
)
wg.Add(1)
go func() {
defer wg.Done()
runErr = client.Run(ctx)
}()
// Wait for backend service to register the subscription.
require.Eventually(t, func() bool { return svc.Service.SubscriberCount() == 1 }, time.Second, 10*time.Millisecond)
userID := uuid.New()
require.NoError(t, svc.Service.PublishClientEvent(context.Background(), userID, nil, backendpush.JSONEvent{
EventKind: "lobby.invite.received",
Payload: map[string]any{"x": 1.0},
}, "evt-1", "req-1", "trace-1"))
select {
case got := <-out:
ce := got.event.GetClientEvent()
require.NotNil(t, ce)
assert.Equal(t, userID.String(), ce.GetUserId())
assert.Equal(t, "lobby.invite.received", ce.GetKind())
assert.Equal(t, "evt-1", ce.GetEventId())
assert.Equal(t, "req-1", ce.GetRequestId())
assert.Equal(t, "trace-1", ce.GetTraceId())
assert.NotEmpty(t, got.cursor)
case <-time.After(2 * time.Second):
t.Fatal("timed out waiting for client event")
}
require.Eventually(t, func() bool { return client.Cursor() != "" }, time.Second, 10*time.Millisecond)
cancel()
wg.Wait()
if runErr != nil && runErr != context.Canceled {
t.Fatalf("unexpected run error: %v", runErr)
}
}