package grpcapi import ( "context" "errors" "sync" "testing" "time" "galaxy/gateway/internal/replay" "galaxy/gateway/internal/session" gatewayv1 "galaxy/gateway/proto/galaxy/gateway/v1" "connectrpc.com/connect" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "google.golang.org/grpc" ) func TestExecuteCommandRejectsStaleTimestamp(t *testing.T) { t.Parallel() tests := []struct { name string timestampMS int64 }{ { name: "past window", timestampMS: testCurrentTime.Add(-testFreshnessWindow - time.Millisecond).UnixMilli(), }, { name: "future window", timestampMS: testCurrentTime.Add(testFreshnessWindow + time.Millisecond).UnixMilli(), }, } for _, tt := range tests { tt := tt t.Run(tt.name, func(t *testing.T) { t.Parallel() delegate := &recordingEdgeGatewayService{} server, runGateway := newTestGateway(t, ServerDependencies{ Service: delegate, SessionCache: staticSessionCache{lookupFunc: func(context.Context, string) (session.Record, error) { return newActiveSessionRecord(), nil }}, ReplayStore: staticReplayStore{}, }) defer runGateway.stop(t) addr := waitForListenAddr(t, server) client := newEdgeClient(t, addr) _, err := client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequestWithTimestamp("device-session-123", "request-123", tt.timestampMS))) require.Error(t, err) assert.Equal(t, connect.CodeFailedPrecondition, connect.CodeOf(err)) assert.Equal(t, "request timestamp is outside the freshness window", connectErrorMessage(t, err)) assert.Zero(t, delegate.executeCalls) }) } } func TestSubscribeEventsRejectsStaleTimestamp(t *testing.T) { t.Parallel() tests := []struct { name string timestampMS int64 }{ { name: "past window", timestampMS: testCurrentTime.Add(-testFreshnessWindow - time.Millisecond).UnixMilli(), }, { name: "future window", timestampMS: testCurrentTime.Add(testFreshnessWindow + time.Millisecond).UnixMilli(), }, } for _, tt := range tests { tt := tt t.Run(tt.name, func(t *testing.T) { t.Parallel() delegate := &recordingEdgeGatewayService{} server, runGateway := newTestGateway(t, ServerDependencies{ Service: delegate, SessionCache: staticSessionCache{lookupFunc: func(context.Context, string) (session.Record, error) { return newActiveSessionRecord(), nil }}, ReplayStore: staticReplayStore{}, }) defer runGateway.stop(t) addr := waitForListenAddr(t, server) client := newEdgeClient(t, addr) err := subscribeEventsError(t, context.Background(), client, newValidSubscribeEventsRequestWithTimestamp("device-session-123", "request-123", tt.timestampMS)) require.Error(t, err) assert.Equal(t, connect.CodeFailedPrecondition, connect.CodeOf(err)) assert.Equal(t, "request timestamp is outside the freshness window", connectErrorMessage(t, err)) assert.Zero(t, delegate.subscribeCalls) }) } } func TestExecuteCommandRejectsReplay(t *testing.T) { t.Parallel() delegate := &recordingEdgeGatewayService{} server, runGateway := newTestGateway(t, ServerDependencies{ Service: delegate, SessionCache: staticSessionCache{lookupFunc: func(context.Context, string) (session.Record, error) { return newActiveSessionRecord(), nil }}, ReplayStore: staticReplayStore{ reserveFunc: replayDuplicateBySessionAndRequest(), }, }) defer runGateway.stop(t) addr := waitForListenAddr(t, server) client := newEdgeClient(t, addr) req := newValidExecuteCommandRequest() _, err := client.ExecuteCommand(context.Background(), connect.NewRequest(req)) require.NoError(t, err) _, err = client.ExecuteCommand(context.Background(), connect.NewRequest(req)) require.Error(t, err) assert.Equal(t, connect.CodeFailedPrecondition, connect.CodeOf(err)) assert.Equal(t, "request replay detected", connectErrorMessage(t, err)) assert.Equal(t, 1, delegate.executeCalls) } func TestSubscribeEventsRejectsReplay(t *testing.T) { t.Parallel() delegate := &recordingEdgeGatewayService{} server, runGateway := newTestGateway(t, ServerDependencies{ Service: delegate, SessionCache: staticSessionCache{lookupFunc: func(context.Context, string) (session.Record, error) { return newActiveSessionRecord(), nil }}, ReplayStore: staticReplayStore{ reserveFunc: replayDuplicateBySessionAndRequest(), }, }) defer runGateway.stop(t) addr := waitForListenAddr(t, server) client := newEdgeClient(t, addr) req := newValidSubscribeEventsRequest() stream, err := client.SubscribeEvents(context.Background(), connect.NewRequest(req)) require.NoError(t, err) event := recvBootstrapEvent(t, stream) assertServerTimeBootstrapEvent(t, event, newTestResponseSignerPublicKey(), "request-123", "trace-123", testCurrentTime.UnixMilli()) require.False(t, stream.Receive()) require.NoError(t, stream.Err()) err = subscribeEventsError(t, context.Background(), client, req) require.Error(t, err) assert.Equal(t, connect.CodeFailedPrecondition, connect.CodeOf(err)) assert.Equal(t, "request replay detected", connectErrorMessage(t, err)) assert.Equal(t, 1, delegate.subscribeCalls) } func TestExecuteCommandAllowsSameRequestIDAcrossDistinctSessions(t *testing.T) { t.Parallel() delegate := &recordingEdgeGatewayService{ executeCommandFunc: func(ctx context.Context, req *gatewayv1.ExecuteCommandRequest) (*gatewayv1.ExecuteCommandResponse, error) { return &gatewayv1.ExecuteCommandResponse{RequestId: req.GetRequestId()}, nil }, } server, runGateway := newTestGateway(t, ServerDependencies{ Service: delegate, SessionCache: staticSessionCache{ lookupFunc: func(ctx context.Context, deviceSessionID string) (session.Record, error) { return newActiveSessionRecordWithSessionID(deviceSessionID), nil }, }, ReplayStore: staticReplayStore{ reserveFunc: replayDuplicateBySessionAndRequest(), }, }) defer runGateway.stop(t) addr := waitForListenAddr(t, server) client := newEdgeClient(t, addr) _, err := client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequestWithSessionAndRequestID("device-session-123", "request-shared"))) require.NoError(t, err) _, err = client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequestWithSessionAndRequestID("device-session-456", "request-shared"))) require.NoError(t, err) assert.Equal(t, 2, delegate.executeCalls) } func TestSubscribeEventsAllowsSameRequestIDAcrossDistinctSessions(t *testing.T) { t.Parallel() delegate := &recordingEdgeGatewayService{ subscribeEventsFunc: func(req *gatewayv1.SubscribeEventsRequest, stream grpc.ServerStreamingServer[gatewayv1.GatewayEvent]) error { return nil }, } server, runGateway := newTestGateway(t, ServerDependencies{ Service: delegate, SessionCache: staticSessionCache{ lookupFunc: func(ctx context.Context, deviceSessionID string) (session.Record, error) { return newActiveSessionRecordWithSessionID(deviceSessionID), nil }, }, ReplayStore: staticReplayStore{ reserveFunc: replayDuplicateBySessionAndRequest(), }, }) defer runGateway.stop(t) addr := waitForListenAddr(t, server) client := newEdgeClient(t, addr) stream, err := client.SubscribeEvents(context.Background(), connect.NewRequest(newValidSubscribeEventsRequestWithSessionAndRequestID("device-session-123", "request-shared"))) require.NoError(t, err) event := recvBootstrapEvent(t, stream) assertServerTimeBootstrapEvent(t, event, newTestResponseSignerPublicKey(), "request-shared", "trace-123", testCurrentTime.UnixMilli()) require.False(t, stream.Receive()) require.NoError(t, stream.Err()) stream, err = client.SubscribeEvents(context.Background(), connect.NewRequest(newValidSubscribeEventsRequestWithSessionAndRequestID("device-session-456", "request-shared"))) require.NoError(t, err) event = recvBootstrapEvent(t, stream) assertServerTimeBootstrapEvent(t, event, newTestResponseSignerPublicKey(), "request-shared", "trace-123", testCurrentTime.UnixMilli()) require.False(t, stream.Receive()) require.NoError(t, stream.Err()) assert.Equal(t, 2, delegate.subscribeCalls) } func TestExecuteCommandRejectsReplayStoreUnavailable(t *testing.T) { t.Parallel() delegate := &recordingEdgeGatewayService{} server, runGateway := newTestGateway(t, ServerDependencies{ Service: delegate, SessionCache: staticSessionCache{lookupFunc: func(context.Context, string) (session.Record, error) { return newActiveSessionRecord(), nil }}, ReplayStore: staticReplayStore{ reserveFunc: func(context.Context, string, string, time.Duration) error { return errors.New("redis down") }, }, }) defer runGateway.stop(t) addr := waitForListenAddr(t, server) client := newEdgeClient(t, addr) _, err := client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequest())) require.Error(t, err) assert.Equal(t, connect.CodeUnavailable, connect.CodeOf(err)) assert.Equal(t, "replay store is unavailable", connectErrorMessage(t, err)) assert.Zero(t, delegate.executeCalls) } func TestSubscribeEventsRejectsReplayStoreUnavailable(t *testing.T) { t.Parallel() delegate := &recordingEdgeGatewayService{} server, runGateway := newTestGateway(t, ServerDependencies{ Service: delegate, SessionCache: staticSessionCache{lookupFunc: func(context.Context, string) (session.Record, error) { return newActiveSessionRecord(), nil }}, ReplayStore: staticReplayStore{ reserveFunc: func(context.Context, string, string, time.Duration) error { return errors.New("redis down") }, }, }) defer runGateway.stop(t) addr := waitForListenAddr(t, server) client := newEdgeClient(t, addr) err := subscribeEventsError(t, context.Background(), client, newValidSubscribeEventsRequest()) require.Error(t, err) assert.Equal(t, connect.CodeUnavailable, connect.CodeOf(err)) assert.Equal(t, "replay store is unavailable", connectErrorMessage(t, err)) assert.Zero(t, delegate.subscribeCalls) } func TestExecuteCommandFreshRequestReachesDelegateAndUsesDynamicReplayTTL(t *testing.T) { t.Parallel() delegate := &recordingEdgeGatewayService{ executeCommandFunc: func(ctx context.Context, req *gatewayv1.ExecuteCommandRequest) (*gatewayv1.ExecuteCommandResponse, error) { return &gatewayv1.ExecuteCommandResponse{RequestId: req.GetRequestId()}, nil }, } var reservedDeviceSessionID string var reservedRequestID string var reservedTTL time.Duration server, runGateway := newTestGateway(t, ServerDependencies{ Service: delegate, SessionCache: staticSessionCache{lookupFunc: func(context.Context, string) (session.Record, error) { return newActiveSessionRecord(), nil }}, ReplayStore: staticReplayStore{ reserveFunc: func(ctx context.Context, deviceSessionID string, requestID string, ttl time.Duration) error { reservedDeviceSessionID = deviceSessionID reservedRequestID = requestID reservedTTL = ttl return nil }, }, }) defer runGateway.stop(t) addr := waitForListenAddr(t, server) client := newEdgeClient(t, addr) response, err := client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequest())) require.NoError(t, err) assert.Equal(t, "request-123", response.Msg.GetRequestId()) assert.Equal(t, "device-session-123", reservedDeviceSessionID) assert.Equal(t, "request-123", reservedRequestID) assert.Equal(t, testFreshnessWindow, reservedTTL) assert.Equal(t, 1, delegate.executeCalls) } func TestSubscribeEventsFreshRequestReachesDelegateAndUsesDynamicReplayTTL(t *testing.T) { t.Parallel() delegate := &recordingEdgeGatewayService{ subscribeEventsFunc: func(req *gatewayv1.SubscribeEventsRequest, stream grpc.ServerStreamingServer[gatewayv1.GatewayEvent]) error { return nil }, } var reservedTTL time.Duration server, runGateway := newTestGateway(t, ServerDependencies{ Service: delegate, SessionCache: staticSessionCache{lookupFunc: func(context.Context, string) (session.Record, error) { return newActiveSessionRecord(), nil }}, ReplayStore: staticReplayStore{ reserveFunc: func(ctx context.Context, deviceSessionID string, requestID string, ttl time.Duration) error { assert.Equal(t, "device-session-123", deviceSessionID) assert.Equal(t, "request-123", requestID) reservedTTL = ttl return nil }, }, }) defer runGateway.stop(t) addr := waitForListenAddr(t, server) client := newEdgeClient(t, addr) stream, err := client.SubscribeEvents(context.Background(), connect.NewRequest(newValidSubscribeEventsRequest())) require.NoError(t, err) event := recvBootstrapEvent(t, stream) assertServerTimeBootstrapEvent(t, event, newTestResponseSignerPublicKey(), "request-123", "trace-123", testCurrentTime.UnixMilli()) require.False(t, stream.Receive()) require.NoError(t, stream.Err()) assert.Equal(t, testFreshnessWindow, reservedTTL) assert.Equal(t, 1, delegate.subscribeCalls) } func TestExecuteCommandFutureSkewUsesExtendedReplayTTL(t *testing.T) { t.Parallel() delegate := &recordingEdgeGatewayService{ executeCommandFunc: func(ctx context.Context, req *gatewayv1.ExecuteCommandRequest) (*gatewayv1.ExecuteCommandResponse, error) { return &gatewayv1.ExecuteCommandResponse{RequestId: req.GetRequestId()}, nil }, } var reservedTTL time.Duration server, runGateway := newTestGateway(t, ServerDependencies{ Service: delegate, SessionCache: staticSessionCache{lookupFunc: func(context.Context, string) (session.Record, error) { return newActiveSessionRecord(), nil }}, ReplayStore: staticReplayStore{ reserveFunc: func(ctx context.Context, deviceSessionID string, requestID string, ttl time.Duration) error { reservedTTL = ttl return nil }, }, }) defer runGateway.stop(t) addr := waitForListenAddr(t, server) client := newEdgeClient(t, addr) _, err := client.ExecuteCommand( context.Background(), connect.NewRequest(newValidExecuteCommandRequestWithTimestamp("device-session-123", "request-123", testCurrentTime.Add(2*time.Minute).UnixMilli())), ) require.NoError(t, err) assert.Equal(t, 7*time.Minute, reservedTTL) assert.Equal(t, 1, delegate.executeCalls) } func TestExecuteCommandBoundaryFreshnessUsesMinimumReplayTTL(t *testing.T) { t.Parallel() delegate := &recordingEdgeGatewayService{ executeCommandFunc: func(ctx context.Context, req *gatewayv1.ExecuteCommandRequest) (*gatewayv1.ExecuteCommandResponse, error) { return &gatewayv1.ExecuteCommandResponse{RequestId: req.GetRequestId()}, nil }, } var reservedTTL time.Duration server, runGateway := newTestGateway(t, ServerDependencies{ Service: delegate, SessionCache: staticSessionCache{lookupFunc: func(context.Context, string) (session.Record, error) { return newActiveSessionRecord(), nil }}, ReplayStore: staticReplayStore{ reserveFunc: func(ctx context.Context, deviceSessionID string, requestID string, ttl time.Duration) error { reservedTTL = ttl return nil }, }, }) defer runGateway.stop(t) addr := waitForListenAddr(t, server) client := newEdgeClient(t, addr) _, err := client.ExecuteCommand( context.Background(), connect.NewRequest(newValidExecuteCommandRequestWithTimestamp("device-session-123", "request-123", testCurrentTime.Add(-testFreshnessWindow).UnixMilli())), ) require.NoError(t, err) assert.Equal(t, minimumReplayReservationTTL, reservedTTL) assert.Equal(t, 1, delegate.executeCalls) } func replayDuplicateBySessionAndRequest() func(context.Context, string, string, time.Duration) error { var ( mu sync.Mutex seen = make(map[string]struct{}) ) return func(ctx context.Context, deviceSessionID string, requestID string, ttl time.Duration) error { mu.Lock() defer mu.Unlock() key := deviceSessionID + "\x00" + requestID if _, ok := seen[key]; ok { return replay.ErrDuplicate } seen[key] = struct{}{} return nil } }