Files
galaxy-game/gateway/internal/grpcapi/freshness_replay_integration_test.go
T
Ilia Denisov 8565942392
Build · Site / build (push) Successful in 8s
Tests · Go / test (push) Successful in 2m22s
Tests · UI / test (push) Failing after 2m42s
feat(deploy): single-origin path-based deployment + project site
Serve the whole stack behind one host: site at /, game UI at /game/,
gateway REST at /api + /healthz, Connect at /rpc (prefix stripped by the
edge Caddy). The built artifact is domain-agnostic — the UI talks to the
gateway same-origin via relative URLs, so the same bundle runs under any
host with no rebuild and with CORS disabled.

- Rename the Connect proto service galaxy.gateway.v1.EdgeGateway ->
  edge.v1.Gateway; regenerate Go + TS; public path /rpc/edge.v1.Gateway.
- Move the game UI under base path /game (env BASE_PATH); make the
  manifest, service-worker scope, WASM loader, and all navigation
  base-aware via a withBase helper.
- Relative API + /rpc Connect prefix; Vite dev proxy mirrors the strip.
- Rewrite the edge Caddy (dev + prod) for path-based routing; empty CORS
  allow-lists (same-origin); single host.
- New VitePress project site (site/): i18n en/ru with switcher, LaTeX
  math, minimal monospace theme; built and served at /.
- dev-deploy compose/Makefile + CI (dev-deploy, prod-build, new
  site-build) build and seed the site; probes hit /, /game/, /healthz.
- Sync docs (ARCHITECTURE, gateway README/openapi, dev-deploy &
  local-dev READMEs, CLAUDE.md, ui/PLAN).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-23 18:19:07 +02:00

448 lines
16 KiB
Go

package grpcapi
import (
"context"
"errors"
"sync"
"testing"
"time"
"galaxy/gateway/internal/replay"
"galaxy/gateway/internal/session"
edgev1 "galaxy/gateway/proto/edge/v1"
"connectrpc.com/connect"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
)
func TestExecuteCommandRejectsStaleTimestamp(t *testing.T) {
t.Parallel()
tests := []struct {
name string
timestampMS int64
}{
{
name: "past window",
timestampMS: testCurrentTime.Add(-testFreshnessWindow - time.Millisecond).UnixMilli(),
},
{
name: "future window",
timestampMS: testCurrentTime.Add(testFreshnessWindow + time.Millisecond).UnixMilli(),
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
delegate := &recordingGatewayService{}
server, runGateway := newTestGateway(t, ServerDependencies{
Service: delegate,
SessionCache: staticSessionCache{lookupFunc: func(context.Context, string) (session.Record, error) { return newActiveSessionRecord(), nil }},
ReplayStore: staticReplayStore{},
})
defer runGateway.stop(t)
addr := waitForListenAddr(t, server)
client := newEdgeClient(t, addr)
_, err := client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequestWithTimestamp("device-session-123", "request-123", tt.timestampMS)))
require.Error(t, err)
assert.Equal(t, connect.CodeFailedPrecondition, connect.CodeOf(err))
assert.Equal(t, "request timestamp is outside the freshness window", connectErrorMessage(t, err))
assert.Zero(t, delegate.executeCalls)
})
}
}
func TestSubscribeEventsRejectsStaleTimestamp(t *testing.T) {
t.Parallel()
tests := []struct {
name string
timestampMS int64
}{
{
name: "past window",
timestampMS: testCurrentTime.Add(-testFreshnessWindow - time.Millisecond).UnixMilli(),
},
{
name: "future window",
timestampMS: testCurrentTime.Add(testFreshnessWindow + time.Millisecond).UnixMilli(),
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
delegate := &recordingGatewayService{}
server, runGateway := newTestGateway(t, ServerDependencies{
Service: delegate,
SessionCache: staticSessionCache{lookupFunc: func(context.Context, string) (session.Record, error) { return newActiveSessionRecord(), nil }},
ReplayStore: staticReplayStore{},
})
defer runGateway.stop(t)
addr := waitForListenAddr(t, server)
client := newEdgeClient(t, addr)
err := subscribeEventsError(t, context.Background(), client, newValidSubscribeEventsRequestWithTimestamp("device-session-123", "request-123", tt.timestampMS))
require.Error(t, err)
assert.Equal(t, connect.CodeFailedPrecondition, connect.CodeOf(err))
assert.Equal(t, "request timestamp is outside the freshness window", connectErrorMessage(t, err))
assert.Zero(t, delegate.subscribeCalls)
})
}
}
// TestExecuteCommandRejectsReplay sends the same ExecuteCommand request twice
// against a replay store that remembers (session, request) pairs. The first
// call must succeed; the second must fail with FailedPrecondition and the
// replay message, leaving the delegate invoked exactly once.
func TestExecuteCommandRejectsReplay(t *testing.T) {
	t.Parallel()

	svc := &recordingGatewayService{}
	deps := ServerDependencies{
		Service: svc,
		SessionCache: staticSessionCache{
			lookupFunc: func(context.Context, string) (session.Record, error) {
				return newActiveSessionRecord(), nil
			},
		},
		ReplayStore: staticReplayStore{reserveFunc: replayDuplicateBySessionAndRequest()},
	}
	srv, running := newTestGateway(t, deps)
	defer running.stop(t)

	ctx := context.Background()
	edge := newEdgeClient(t, waitForListenAddr(t, srv))
	req := newValidExecuteCommandRequest()

	// First delivery is accepted and reserves the (session, request) pair.
	_, firstErr := edge.ExecuteCommand(ctx, connect.NewRequest(req))
	require.NoError(t, firstErr)

	// Second delivery of the identical request must be flagged as a replay.
	_, replayErr := edge.ExecuteCommand(ctx, connect.NewRequest(req))
	require.Error(t, replayErr)
	assert.Equal(t, connect.CodeFailedPrecondition, connect.CodeOf(replayErr))
	assert.Equal(t, "request replay detected", connectErrorMessage(t, replayErr))
	assert.Equal(t, 1, svc.executeCalls)
}
// TestSubscribeEventsRejectsReplay opens a subscription, drains it, and then
// repeats the identical request. The first stream must deliver the server-time
// bootstrap event and end cleanly; the repeat must fail with
// FailedPrecondition and the replay message, leaving the delegate invoked
// exactly once.
func TestSubscribeEventsRejectsReplay(t *testing.T) {
	t.Parallel()

	svc := &recordingGatewayService{}
	deps := ServerDependencies{
		Service: svc,
		SessionCache: staticSessionCache{
			lookupFunc: func(context.Context, string) (session.Record, error) {
				return newActiveSessionRecord(), nil
			},
		},
		ReplayStore: staticReplayStore{reserveFunc: replayDuplicateBySessionAndRequest()},
	}
	srv, running := newTestGateway(t, deps)
	defer running.stop(t)

	ctx := context.Background()
	edge := newEdgeClient(t, waitForListenAddr(t, srv))
	req := newValidSubscribeEventsRequest()

	// First subscription: bootstrap event, then a clean end of stream.
	first, err := edge.SubscribeEvents(ctx, connect.NewRequest(req))
	require.NoError(t, err)
	bootstrap := recvBootstrapEvent(t, first)
	assertServerTimeBootstrapEvent(t, bootstrap, newTestResponseSignerPublicKey(), "request-123", "trace-123", testCurrentTime.UnixMilli())
	require.False(t, first.Receive())
	require.NoError(t, first.Err())

	// Repeating the identical request must be flagged as a replay.
	replayErr := subscribeEventsError(t, ctx, edge, req)
	require.Error(t, replayErr)
	assert.Equal(t, connect.CodeFailedPrecondition, connect.CodeOf(replayErr))
	assert.Equal(t, "request replay detected", connectErrorMessage(t, replayErr))
	assert.Equal(t, 1, svc.subscribeCalls)
}
func TestExecuteCommandAllowsSameRequestIDAcrossDistinctSessions(t *testing.T) {
t.Parallel()
delegate := &recordingGatewayService{
executeCommandFunc: func(ctx context.Context, req *edgev1.ExecuteCommandRequest) (*edgev1.ExecuteCommandResponse, error) {
return &edgev1.ExecuteCommandResponse{RequestId: req.GetRequestId()}, nil
},
}
server, runGateway := newTestGateway(t, ServerDependencies{
Service: delegate,
SessionCache: staticSessionCache{
lookupFunc: func(ctx context.Context, deviceSessionID string) (session.Record, error) {
return newActiveSessionRecordWithSessionID(deviceSessionID), nil
},
},
ReplayStore: staticReplayStore{
reserveFunc: replayDuplicateBySessionAndRequest(),
},
})
defer runGateway.stop(t)
addr := waitForListenAddr(t, server)
client := newEdgeClient(t, addr)
_, err := client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequestWithSessionAndRequestID("device-session-123", "request-shared")))
require.NoError(t, err)
_, err = client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequestWithSessionAndRequestID("device-session-456", "request-shared")))
require.NoError(t, err)
assert.Equal(t, 2, delegate.executeCalls)
}
func TestSubscribeEventsAllowsSameRequestIDAcrossDistinctSessions(t *testing.T) {
t.Parallel()
delegate := &recordingGatewayService{
subscribeEventsFunc: func(req *edgev1.SubscribeEventsRequest, stream grpc.ServerStreamingServer[edgev1.GatewayEvent]) error {
return nil
},
}
server, runGateway := newTestGateway(t, ServerDependencies{
Service: delegate,
SessionCache: staticSessionCache{
lookupFunc: func(ctx context.Context, deviceSessionID string) (session.Record, error) {
return newActiveSessionRecordWithSessionID(deviceSessionID), nil
},
},
ReplayStore: staticReplayStore{
reserveFunc: replayDuplicateBySessionAndRequest(),
},
})
defer runGateway.stop(t)
addr := waitForListenAddr(t, server)
client := newEdgeClient(t, addr)
stream, err := client.SubscribeEvents(context.Background(), connect.NewRequest(newValidSubscribeEventsRequestWithSessionAndRequestID("device-session-123", "request-shared")))
require.NoError(t, err)
event := recvBootstrapEvent(t, stream)
assertServerTimeBootstrapEvent(t, event, newTestResponseSignerPublicKey(), "request-shared", "trace-123", testCurrentTime.UnixMilli())
require.False(t, stream.Receive())
require.NoError(t, stream.Err())
stream, err = client.SubscribeEvents(context.Background(), connect.NewRequest(newValidSubscribeEventsRequestWithSessionAndRequestID("device-session-456", "request-shared")))
require.NoError(t, err)
event = recvBootstrapEvent(t, stream)
assertServerTimeBootstrapEvent(t, event, newTestResponseSignerPublicKey(), "request-shared", "trace-123", testCurrentTime.UnixMilli())
require.False(t, stream.Receive())
require.NoError(t, stream.Err())
assert.Equal(t, 2, delegate.subscribeCalls)
}
// TestExecuteCommandRejectsReplayStoreUnavailable checks that when the replay
// store's reserve call returns a generic error, ExecuteCommand fails with
// Unavailable and the "replay store is unavailable" message, and the delegate
// is never invoked.
func TestExecuteCommandRejectsReplayStoreUnavailable(t *testing.T) {
	t.Parallel()

	svc := &recordingGatewayService{}
	activeSession := func(context.Context, string) (session.Record, error) {
		return newActiveSessionRecord(), nil
	}
	// Any reservation attempt fails with an error that is not a duplicate signal.
	failingReserve := func(context.Context, string, string, time.Duration) error {
		return errors.New("redis down")
	}

	srv, running := newTestGateway(t, ServerDependencies{
		Service:      svc,
		SessionCache: staticSessionCache{lookupFunc: activeSession},
		ReplayStore:  staticReplayStore{reserveFunc: failingReserve},
	})
	defer running.stop(t)

	edge := newEdgeClient(t, waitForListenAddr(t, srv))
	req := newValidExecuteCommandRequest()

	_, err := edge.ExecuteCommand(context.Background(), connect.NewRequest(req))
	require.Error(t, err)
	assert.Equal(t, connect.CodeUnavailable, connect.CodeOf(err))
	assert.Equal(t, "replay store is unavailable", connectErrorMessage(t, err))
	assert.Zero(t, svc.executeCalls)
}
// TestSubscribeEventsRejectsReplayStoreUnavailable checks that when the replay
// store's reserve call returns a generic error, SubscribeEvents fails with
// Unavailable and the "replay store is unavailable" message, and the delegate
// is never invoked.
func TestSubscribeEventsRejectsReplayStoreUnavailable(t *testing.T) {
	t.Parallel()

	svc := &recordingGatewayService{}
	activeSession := func(context.Context, string) (session.Record, error) {
		return newActiveSessionRecord(), nil
	}
	// Any reservation attempt fails with an error that is not a duplicate signal.
	failingReserve := func(context.Context, string, string, time.Duration) error {
		return errors.New("redis down")
	}

	srv, running := newTestGateway(t, ServerDependencies{
		Service:      svc,
		SessionCache: staticSessionCache{lookupFunc: activeSession},
		ReplayStore:  staticReplayStore{reserveFunc: failingReserve},
	})
	defer running.stop(t)

	edge := newEdgeClient(t, waitForListenAddr(t, srv))
	req := newValidSubscribeEventsRequest()

	err := subscribeEventsError(t, context.Background(), edge, req)
	require.Error(t, err)
	assert.Equal(t, connect.CodeUnavailable, connect.CodeOf(err))
	assert.Equal(t, "replay store is unavailable", connectErrorMessage(t, err))
	assert.Zero(t, svc.subscribeCalls)
}
func TestExecuteCommandFreshRequestReachesDelegateAndUsesDynamicReplayTTL(t *testing.T) {
t.Parallel()
delegate := &recordingGatewayService{
executeCommandFunc: func(ctx context.Context, req *edgev1.ExecuteCommandRequest) (*edgev1.ExecuteCommandResponse, error) {
return &edgev1.ExecuteCommandResponse{RequestId: req.GetRequestId()}, nil
},
}
var reservedDeviceSessionID string
var reservedRequestID string
var reservedTTL time.Duration
server, runGateway := newTestGateway(t, ServerDependencies{
Service: delegate,
SessionCache: staticSessionCache{lookupFunc: func(context.Context, string) (session.Record, error) { return newActiveSessionRecord(), nil }},
ReplayStore: staticReplayStore{
reserveFunc: func(ctx context.Context, deviceSessionID string, requestID string, ttl time.Duration) error {
reservedDeviceSessionID = deviceSessionID
reservedRequestID = requestID
reservedTTL = ttl
return nil
},
},
})
defer runGateway.stop(t)
addr := waitForListenAddr(t, server)
client := newEdgeClient(t, addr)
response, err := client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequest()))
require.NoError(t, err)
assert.Equal(t, "request-123", response.Msg.GetRequestId())
assert.Equal(t, "device-session-123", reservedDeviceSessionID)
assert.Equal(t, "request-123", reservedRequestID)
assert.Equal(t, testFreshnessWindow, reservedTTL)
assert.Equal(t, 1, delegate.executeCalls)
}
// TestSubscribeEventsFreshRequestReachesDelegateAndUsesDynamicReplayTTL opens a
// subscription with a default valid request and records what the gateway
// passes to the replay store. The stream must deliver the server-time
// bootstrap event and end cleanly, the reservation must carry the session and
// request IDs with a TTL equal to testFreshnessWindow, and the delegate must
// be invoked once.
//
// The reservation arguments are captured in the callback and asserted on the
// test goroutine after the stream ends, mirroring the ExecuteCommand variant
// of this test. Asserting inside the callback would skip the ID checks
// entirely if the callback were never invoked.
func TestSubscribeEventsFreshRequestReachesDelegateAndUsesDynamicReplayTTL(t *testing.T) {
	t.Parallel()

	delegate := &recordingGatewayService{
		// The delegate returns immediately without sending any event itself.
		subscribeEventsFunc: func(req *edgev1.SubscribeEventsRequest, stream grpc.ServerStreamingServer[edgev1.GatewayEvent]) error {
			return nil
		},
	}

	var reservedDeviceSessionID string
	var reservedRequestID string
	var reservedTTL time.Duration

	server, runGateway := newTestGateway(t, ServerDependencies{
		Service: delegate,
		SessionCache: staticSessionCache{
			lookupFunc: func(context.Context, string) (session.Record, error) {
				return newActiveSessionRecord(), nil
			},
		},
		ReplayStore: staticReplayStore{
			reserveFunc: func(ctx context.Context, deviceSessionID string, requestID string, ttl time.Duration) error {
				reservedDeviceSessionID = deviceSessionID
				reservedRequestID = requestID
				reservedTTL = ttl
				return nil
			},
		},
	})
	defer runGateway.stop(t)

	addr := waitForListenAddr(t, server)
	client := newEdgeClient(t, addr)

	stream, err := client.SubscribeEvents(context.Background(), connect.NewRequest(newValidSubscribeEventsRequest()))
	require.NoError(t, err)

	event := recvBootstrapEvent(t, stream)
	assertServerTimeBootstrapEvent(t, event, newTestResponseSignerPublicKey(), "request-123", "trace-123", testCurrentTime.UnixMilli())

	// Nothing follows the bootstrap event and the stream ends without error.
	require.False(t, stream.Receive())
	require.NoError(t, stream.Err())

	assert.Equal(t, "device-session-123", reservedDeviceSessionID)
	assert.Equal(t, "request-123", reservedRequestID)
	assert.Equal(t, testFreshnessWindow, reservedTTL)
	assert.Equal(t, 1, delegate.subscribeCalls)
}
func TestExecuteCommandFutureSkewUsesExtendedReplayTTL(t *testing.T) {
t.Parallel()
delegate := &recordingGatewayService{
executeCommandFunc: func(ctx context.Context, req *edgev1.ExecuteCommandRequest) (*edgev1.ExecuteCommandResponse, error) {
return &edgev1.ExecuteCommandResponse{RequestId: req.GetRequestId()}, nil
},
}
var reservedTTL time.Duration
server, runGateway := newTestGateway(t, ServerDependencies{
Service: delegate,
SessionCache: staticSessionCache{lookupFunc: func(context.Context, string) (session.Record, error) { return newActiveSessionRecord(), nil }},
ReplayStore: staticReplayStore{
reserveFunc: func(ctx context.Context, deviceSessionID string, requestID string, ttl time.Duration) error {
reservedTTL = ttl
return nil
},
},
})
defer runGateway.stop(t)
addr := waitForListenAddr(t, server)
client := newEdgeClient(t, addr)
_, err := client.ExecuteCommand(
context.Background(),
connect.NewRequest(newValidExecuteCommandRequestWithTimestamp("device-session-123", "request-123", testCurrentTime.Add(2*time.Minute).UnixMilli())),
)
require.NoError(t, err)
assert.Equal(t, 7*time.Minute, reservedTTL)
assert.Equal(t, 1, delegate.executeCalls)
}
func TestExecuteCommandBoundaryFreshnessUsesMinimumReplayTTL(t *testing.T) {
t.Parallel()
delegate := &recordingGatewayService{
executeCommandFunc: func(ctx context.Context, req *edgev1.ExecuteCommandRequest) (*edgev1.ExecuteCommandResponse, error) {
return &edgev1.ExecuteCommandResponse{RequestId: req.GetRequestId()}, nil
},
}
var reservedTTL time.Duration
server, runGateway := newTestGateway(t, ServerDependencies{
Service: delegate,
SessionCache: staticSessionCache{lookupFunc: func(context.Context, string) (session.Record, error) { return newActiveSessionRecord(), nil }},
ReplayStore: staticReplayStore{
reserveFunc: func(ctx context.Context, deviceSessionID string, requestID string, ttl time.Duration) error {
reservedTTL = ttl
return nil
},
},
})
defer runGateway.stop(t)
addr := waitForListenAddr(t, server)
client := newEdgeClient(t, addr)
_, err := client.ExecuteCommand(
context.Background(),
connect.NewRequest(newValidExecuteCommandRequestWithTimestamp("device-session-123", "request-123", testCurrentTime.Add(-testFreshnessWindow).UnixMilli())),
)
require.NoError(t, err)
assert.Equal(t, minimumReplayReservationTTL, reservedTTL)
assert.Equal(t, 1, delegate.executeCalls)
}
// replayDuplicateBySessionAndRequest returns a reserve function, shaped for
// staticReplayStore.reserveFunc, that remembers every (deviceSessionID,
// requestID) pair it has been called with. The first call for a pair returns
// nil; every later call for the same pair returns replay.ErrDuplicate. The
// ctx and ttl arguments are ignored, so entries never expire.
//
// Each call to replayDuplicateBySessionAndRequest yields an independent
// store. The returned function guards its state with a mutex and may be
// called from multiple goroutines.
func replayDuplicateBySessionAndRequest() func(context.Context, string, string, time.Duration) error {
	// reservationKey keeps the two identifiers as separate fields so that no
	// two distinct (session, request) pairs can collapse to the same key,
	// which a separator-joined string cannot guarantee when an identifier
	// contains the separator.
	type reservationKey struct {
		deviceSessionID string
		requestID       string
	}

	var (
		mu   sync.Mutex
		seen = make(map[reservationKey]struct{})
	)

	return func(ctx context.Context, deviceSessionID string, requestID string, ttl time.Duration) error {
		mu.Lock()
		defer mu.Unlock()

		key := reservationKey{deviceSessionID: deviceSessionID, requestID: requestID}
		if _, ok := seen[key]; ok {
			return replay.ErrDuplicate
		}
		seen[key] = struct{}{}
		return nil
	}
}