Files
galaxy-game/gateway/internal/grpcapi/session_lookup_integration_test.go
Ilia Denisov 118f7c17a2 phase 4: connectrpc on the gateway authenticated edge
Replace the native-gRPC server bootstrap with a single
`connectrpc.com/connect` HTTP/h2c listener. Connect-Go natively
serves Connect, gRPC, and gRPC-Web on the same port, so browsers can
now reach the authenticated surface without giving up the gRPC
framing native and desktop clients may use later. The decorator
stack (envelope → session → payload-hash → signature →
freshness/replay → rate-limit → routing/push) is reused unchanged
behind a small Connect → gRPC adapter and a `grpc.ServerStream`
shim around `*connect.ServerStream`.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-07 11:49:28 +02:00

251 lines
8.7 KiB
Go

package grpcapi
import (
"context"
"errors"
"testing"
"galaxy/gateway/internal/session"
gatewayv1 "galaxy/gateway/proto/galaxy/gateway/v1"
"connectrpc.com/connect"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
)
func TestExecuteCommandRejectsUnknownSession(t *testing.T) {
t.Parallel()
delegate := &recordingEdgeGatewayService{}
server, runGateway := newTestGateway(t, ServerDependencies{
Service: delegate,
SessionCache: staticSessionCache{
lookupFunc: func(context.Context, string) (session.Record, error) {
return session.Record{}, session.ErrNotFound
},
},
})
defer runGateway.stop(t)
addr := waitForListenAddr(t, server)
client := newEdgeClient(t, addr)
_, err := client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequest()))
require.Error(t, err)
assert.Equal(t, connect.CodeUnauthenticated, connect.CodeOf(err))
assert.Equal(t, "unknown device session", connectErrorMessage(t, err))
assert.Zero(t, delegate.executeCalls)
}
func TestSubscribeEventsRejectsUnknownSession(t *testing.T) {
t.Parallel()
delegate := &recordingEdgeGatewayService{}
server, runGateway := newTestGateway(t, ServerDependencies{
Service: delegate,
SessionCache: staticSessionCache{
lookupFunc: func(context.Context, string) (session.Record, error) {
return session.Record{}, session.ErrNotFound
},
},
})
defer runGateway.stop(t)
addr := waitForListenAddr(t, server)
client := newEdgeClient(t, addr)
err := subscribeEventsError(t, context.Background(), client, newValidSubscribeEventsRequest())
require.Error(t, err)
assert.Equal(t, connect.CodeUnauthenticated, connect.CodeOf(err))
assert.Equal(t, "unknown device session", connectErrorMessage(t, err))
assert.Zero(t, delegate.subscribeCalls)
}
func TestExecuteCommandRejectsRevokedSession(t *testing.T) {
t.Parallel()
delegate := &recordingEdgeGatewayService{}
server, runGateway := newTestGateway(t, ServerDependencies{
Service: delegate,
SessionCache: staticSessionCache{lookupFunc: func(context.Context, string) (session.Record, error) { return newRevokedSessionRecord(), nil }},
})
defer runGateway.stop(t)
addr := waitForListenAddr(t, server)
client := newEdgeClient(t, addr)
_, err := client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequest()))
require.Error(t, err)
assert.Equal(t, connect.CodeFailedPrecondition, connect.CodeOf(err))
assert.Equal(t, "device session is revoked", connectErrorMessage(t, err))
assert.Zero(t, delegate.executeCalls)
}
func TestSubscribeEventsRejectsRevokedSession(t *testing.T) {
t.Parallel()
delegate := &recordingEdgeGatewayService{}
server, runGateway := newTestGateway(t, ServerDependencies{
Service: delegate,
SessionCache: staticSessionCache{lookupFunc: func(context.Context, string) (session.Record, error) { return newRevokedSessionRecord(), nil }},
})
defer runGateway.stop(t)
addr := waitForListenAddr(t, server)
client := newEdgeClient(t, addr)
err := subscribeEventsError(t, context.Background(), client, newValidSubscribeEventsRequest())
require.Error(t, err)
assert.Equal(t, connect.CodeFailedPrecondition, connect.CodeOf(err))
assert.Equal(t, "device session is revoked", connectErrorMessage(t, err))
assert.Zero(t, delegate.subscribeCalls)
}
func TestExecuteCommandRejectsSessionCacheUnavailable(t *testing.T) {
t.Parallel()
delegate := &recordingEdgeGatewayService{}
server, runGateway := newTestGateway(t, ServerDependencies{
Service: delegate,
SessionCache: staticSessionCache{
lookupFunc: func(context.Context, string) (session.Record, error) {
return session.Record{}, errors.New("redis down")
},
},
})
defer runGateway.stop(t)
addr := waitForListenAddr(t, server)
client := newEdgeClient(t, addr)
_, err := client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequest()))
require.Error(t, err)
assert.Equal(t, connect.CodeUnavailable, connect.CodeOf(err))
assert.Equal(t, "session cache is unavailable", connectErrorMessage(t, err))
assert.Zero(t, delegate.executeCalls)
}
func TestSubscribeEventsRejectsSessionCacheUnavailable(t *testing.T) {
t.Parallel()
delegate := &recordingEdgeGatewayService{}
server, runGateway := newTestGateway(t, ServerDependencies{
Service: delegate,
SessionCache: staticSessionCache{
lookupFunc: func(context.Context, string) (session.Record, error) {
return session.Record{}, errors.New("redis down")
},
},
})
defer runGateway.stop(t)
addr := waitForListenAddr(t, server)
client := newEdgeClient(t, addr)
err := subscribeEventsError(t, context.Background(), client, newValidSubscribeEventsRequest())
require.Error(t, err)
assert.Equal(t, connect.CodeUnavailable, connect.CodeOf(err))
assert.Equal(t, "session cache is unavailable", connectErrorMessage(t, err))
assert.Zero(t, delegate.subscribeCalls)
}
// TestExecuteCommandAttachesResolvedSession checks that, when the session cache
// returns the record built by newActiveSessionRecord, the delegate's
// ExecuteCommand callback can read that same record back through
// resolvedSessionFromContext, and that the response carries request ID
// "request-123".
func TestExecuteCommandAttachesResolvedSession(t *testing.T) {
	t.Parallel()

	delegate := &recordingEdgeGatewayService{
		executeCommandFunc: func(ctx context.Context, req *gatewayv1.ExecuteCommandRequest) (*gatewayv1.ExecuteCommandResponse, error) {
			// The gateway invokes this callback while serving the RPC, i.e. off
			// the test goroutine. require ends in t.FailNow, which is only valid
			// on the test goroutine, so record the failure with assert and
			// return an error that fails the client-side require.NoError below.
			record, ok := resolvedSessionFromContext(ctx)
			if !assert.True(t, ok, "resolved session missing from context") {
				return nil, errors.New("resolved session missing from context")
			}
			assert.Equal(t, newActiveSessionRecord(), record)

			return &gatewayv1.ExecuteCommandResponse{RequestId: req.GetRequestId()}, nil
		},
	}
	sessions := staticSessionCache{
		lookupFunc: func(context.Context, string) (session.Record, error) {
			return newActiveSessionRecord(), nil
		},
	}

	server, runGateway := newTestGateway(t, ServerDependencies{
		Service:      delegate,
		SessionCache: sessions,
		ReplayStore:  staticReplayStore{},
	})
	defer runGateway.stop(t)

	addr := waitForListenAddr(t, server)
	client := newEdgeClient(t, addr)

	response, err := client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequest()))
	require.NoError(t, err)
	assert.Equal(t, "request-123", response.Msg.GetRequestId())
}
// TestSubscribeEventsAttachesResolvedSession checks that, when the session
// cache returns the record built by newActiveSessionRecord, the delegate's
// SubscribeEvents callback can read that same record back from the stream
// context through resolvedSessionFromContext. On the client side it reads the
// first event with recvBootstrapEvent, validates it with
// assertServerTimeBootstrapEvent, and then expects the stream to end without
// an error.
func TestSubscribeEventsAttachesResolvedSession(t *testing.T) {
	t.Parallel()

	delegate := &recordingEdgeGatewayService{
		subscribeEventsFunc: func(req *gatewayv1.SubscribeEventsRequest, stream grpc.ServerStreamingServer[gatewayv1.GatewayEvent]) error {
			// The gateway invokes this callback while serving the stream, i.e.
			// off the test goroutine. require ends in t.FailNow, which is only
			// valid on the test goroutine, so record the failure with assert
			// and return an error that fails the client-side stream.Err check.
			record, ok := resolvedSessionFromContext(stream.Context())
			if !assert.True(t, ok, "resolved session missing from stream context") {
				return errors.New("resolved session missing from stream context")
			}
			assert.Equal(t, newActiveSessionRecord(), record)

			return nil
		},
	}
	sessions := staticSessionCache{
		lookupFunc: func(context.Context, string) (session.Record, error) {
			return newActiveSessionRecord(), nil
		},
	}

	server, runGateway := newTestGateway(t, ServerDependencies{
		Service:      delegate,
		SessionCache: sessions,
		ReplayStore:  staticReplayStore{},
	})
	defer runGateway.stop(t)

	addr := waitForListenAddr(t, server)
	client := newEdgeClient(t, addr)

	stream, err := client.SubscribeEvents(context.Background(), connect.NewRequest(newValidSubscribeEventsRequest()))
	require.NoError(t, err)

	event := recvBootstrapEvent(t, stream)
	assertServerTimeBootstrapEvent(t, event, newTestResponseSignerPublicKey(), "request-123", "trace-123", testCurrentTime.UnixMilli())

	// The delegate returns without sending anything, so the stream must end
	// cleanly after the first event.
	require.False(t, stream.Receive())
	require.NoError(t, stream.Err())
}
// TestSubscribeEventsAttachesAuthenticatedStreamBinding checks that the
// delegate's SubscribeEvents callback can read an authenticatedStreamBinding
// from the stream context through authenticatedStreamBindingFromContext, and
// that its user, device session, message type, request ID and trace ID match
// the expected fixture values. On the client side it reads the first event
// with recvBootstrapEvent, validates it with assertServerTimeBootstrapEvent,
// and then expects the stream to end without an error.
func TestSubscribeEventsAttachesAuthenticatedStreamBinding(t *testing.T) {
	t.Parallel()

	delegate := &recordingEdgeGatewayService{
		subscribeEventsFunc: func(req *gatewayv1.SubscribeEventsRequest, stream grpc.ServerStreamingServer[gatewayv1.GatewayEvent]) error {
			// The gateway invokes this callback while serving the stream, i.e.
			// off the test goroutine. require ends in t.FailNow, which is only
			// valid on the test goroutine, so record the failure with assert
			// and return an error that fails the client-side stream.Err check.
			binding, ok := authenticatedStreamBindingFromContext(stream.Context())
			if !assert.True(t, ok, "authenticated stream binding missing from stream context") {
				return errors.New("authenticated stream binding missing from stream context")
			}
			assert.Equal(t, authenticatedStreamBinding{
				UserID:          "user-123",
				DeviceSessionID: "device-session-123",
				MessageType:     "gateway.subscribe",
				RequestID:       "request-123",
				TraceID:         "trace-123",
			}, binding)

			return nil
		},
	}
	sessions := staticSessionCache{
		lookupFunc: func(context.Context, string) (session.Record, error) {
			return newActiveSessionRecord(), nil
		},
	}

	server, runGateway := newTestGateway(t, ServerDependencies{
		Service:      delegate,
		SessionCache: sessions,
		ReplayStore:  staticReplayStore{},
	})
	defer runGateway.stop(t)

	addr := waitForListenAddr(t, server)
	client := newEdgeClient(t, addr)

	stream, err := client.SubscribeEvents(context.Background(), connect.NewRequest(newValidSubscribeEventsRequest()))
	require.NoError(t, err)

	event := recvBootstrapEvent(t, stream)
	assertServerTimeBootstrapEvent(t, event, newTestResponseSignerPublicKey(), "request-123", "trace-123", testCurrentTime.UnixMilli())

	// The delegate returns without sending anything, so the stream must end
	// cleanly after the first event.
	require.False(t, stream.Receive())
	require.NoError(t, stream.Err())
}
// staticSessionCache is a test double for the gateway session cache whose
// lookup behavior is supplied by the test through lookupFunc.
type staticSessionCache struct {
	// lookupFunc is invoked by Lookup with the caller's context and device
	// session ID. It must be non-nil before Lookup is called.
	lookupFunc func(context.Context, string) (session.Record, error)
}
// Lookup forwards the call to the configured lookupFunc and returns its result
// unchanged.
func (c staticSessionCache) Lookup(ctx context.Context, deviceSessionID string) (session.Record, error) {
	record, err := c.lookupFunc(ctx, deviceSessionID)
	return record, err
}
// MarkRevoked is a no-op; the static cache keeps no state to update.
func (staticSessionCache) MarkRevoked(_ string) {
}
// MarkAllRevokedForUser is a no-op; the static cache keeps no state to update.
func (staticSessionCache) MarkAllRevokedForUser(_ string) {
}