14b65389ef
Tests · UI / test (push) Successful in 2m35s
Tests · Go / test (push) Successful in 1m56s
Tests · UI / test (pull_request) Has been cancelled
Tests · Integration / integration (pull_request) Successful in 1m42s
Tests · Go / test (pull_request) Successful in 2m0s
Browser fetch-streaming layers close response bodies they consider
idle after roughly 15-30 s without incoming bytes. Safari is the
most aggressive, but the symptom matters everywhere: a quiet
SubscribeEvents stream (lobby, between turns, mailbox empty) gets
torn down by the browser, the EventStream singleton reconnects with
backoff, and any push event that fires inside the reconnect window
is lost because `push.Hub` queues are not persisted across
subscription closes. The user-visible failure mode is the
intermittent "Fetch API cannot load … due to access control checks"
console error (a misleading WebKit symptom — CORS headers are
actually present) plus missed turn-ready / mail-received toasts.
Server-side fix: a silence-based heartbeat at the
`authenticatedPushStreamService` wrapper layer. After the signed
`gateway.server_time` bootstrap event, gateway wraps the bound
stream with `heartbeatingStream`. Every tail Send (fan-out, future
variants) resets the silence timer; when the timer elapses, a
goroutine emits `gateway.heartbeat` with only `EventType` set —
everything else stays at proto3 defaults, so the wire frame is
~45 bytes amortised. A `sendMu` serialises the heartbeat goroutine
with tail Sends because grpc.ServerStream.Send is not goroutine-safe.
The heartbeat is intentionally UNSIGNED: heartbeats carry no
payload, dispatch to no handler on the client, and an injected
heartbeat trivially causes no user-visible state change. TLS still
protects the wire and real events keep the signed envelope
unchanged. Documented in `docs/ARCHITECTURE.md` § 15 alongside the
per-scale bandwidth projection (100…100 000 clients × 15…60 s).
Config: new `GATEWAY_PUSH_HEARTBEAT_INTERVAL` (default `15s`,
`0s` disables). Telemetry: new
`gateway.push.heartbeats_sent{outcome}` counter so operators can
budget bandwidth and spot a sudden `outcome=error` bump as an
upstream-failing-before-flush signal.
Client (`ui/frontend/src/api/events.svelte.ts`): early `continue`
on `event.eventType === "gateway.heartbeat"` before `verifyEvent`,
`verifyPayloadHash`, or dispatch — empty signature would otherwise
trip SignatureError and reconnect. A leading heartbeat still flips
`connectionStatus` to `connected` and resets backoff, because
receiving one is proof the stream is healthy.
Tests:
- `push_heartbeat_test.go`: unit tests for the wrapper — zero
interval returns nil, heartbeat fires after silence, real Send
resets the timer, Stop / context-cancel halt the goroutine,
Send errors propagate.
- `server_test.go`: integration tests through the full gateway
pipeline — heartbeat fires after the configured silence window,
zero interval keeps the stream silent.
- `config_test.go`: default applied, env-override parsed,
negative value rejected.
- `events.test.ts`: heartbeat skipped before verification + not
dispatched to handlers; leading heartbeat still flips
`connectionStatus` to `connected`.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
437 lines
12 KiB
TypeScript
437 lines
12 KiB
TypeScript
// Vitest coverage for the SubscribeEvents stream consumer in
|
|
// `src/api/events.svelte.ts`. The tests drive the singleton through
|
|
// its lifecycle with a `createRouterTransport` fake — the same
|
|
// pattern `galaxy-client.test.ts` uses for unary calls, extended to
|
|
// async-generator handlers for server-streaming RPCs.
|
|
//
|
|
// The session store is mocked so `signOut("revoked")` is observable
|
|
// without instantiating the real keystore/IndexedDB chain.
|
|
|
|
import { afterEach, beforeEach, describe, expect, test, vi } from "vitest";
|
|
import { create } from "@bufbuild/protobuf";
|
|
import {
|
|
Code,
|
|
ConnectError,
|
|
createClient,
|
|
createRouterTransport,
|
|
} from "@connectrpc/connect";
|
|
import {
|
|
EdgeGateway,
|
|
GatewayEventSchema,
|
|
type GatewayEvent,
|
|
} from "../src/proto/galaxy/gateway/v1/edge_gateway_pb";
|
|
|
|
let sessionStatus: "anonymous" | "authenticated" = "anonymous";
|
|
const signOutSpy = vi.fn();
|
|
vi.mock("../src/lib/session-store.svelte", () => ({
|
|
session: {
|
|
get status(): string {
|
|
return sessionStatus;
|
|
},
|
|
signOut: (...args: unknown[]) => signOutSpy(...args),
|
|
},
|
|
}));
|
|
|
|
// The import must come after vi.mock so the module reads the mocked
|
|
// session reference.
|
|
const {
|
|
eventStream,
|
|
} = await import("../src/api/events.svelte");
|
|
|
|
import type { Core } from "../src/platform/core/index";
|
|
import type { DeviceKeypair } from "../src/platform/store/index";
|
|
|
|
beforeEach(() => {
|
|
eventStream.resetForTests();
|
|
signOutSpy.mockReset();
|
|
sessionStatus = "anonymous";
|
|
});
|
|
|
|
afterEach(() => {
|
|
eventStream.resetForTests();
|
|
});
|
|
|
|
function mockCore(overrides?: Partial<Core>): Core {
|
|
return {
|
|
signRequest: () => new Uint8Array([1, 2, 3]),
|
|
verifyResponse: () => true,
|
|
verifyEvent: () => true,
|
|
verifyPayloadHash: () => true,
|
|
driveEffective: () => 0,
|
|
emptyMass: () => 0,
|
|
weaponsBlockMass: () => 0,
|
|
fullMass: () => 0,
|
|
speed: () => 0,
|
|
cargoCapacity: () => 0,
|
|
carryingMass: () => 0,
|
|
blockUpgradeCost: () => 0,
|
|
...overrides,
|
|
} as Core;
|
|
}
|
|
|
|
function mockKeypair(): DeviceKeypair {
|
|
return {
|
|
publicKey: new Uint8Array(32),
|
|
sign: async () => new Uint8Array(64),
|
|
};
|
|
}
|
|
|
|
function buildEvent(eventType: string, payload: Uint8Array): GatewayEvent {
|
|
return create(GatewayEventSchema, {
|
|
eventType,
|
|
eventId: `event-${eventType}-${Math.random().toString(16).slice(2, 8)}`,
|
|
timestampMs: 1n,
|
|
payloadBytes: payload,
|
|
payloadHash: new Uint8Array(32).fill(0xaa),
|
|
signature: new Uint8Array(64).fill(0xbb),
|
|
requestId: "req-1",
|
|
traceId: "trace-1",
|
|
});
|
|
}
|
|
|
|
function makeRouter(
|
|
streamFactory: () => AsyncIterable<GatewayEvent>,
|
|
): ReturnType<typeof createClient<typeof EdgeGateway>> {
|
|
const transport = createRouterTransport(({ service }) => {
|
|
service(EdgeGateway, {
|
|
executeCommand() {
|
|
throw new Error("not used in this test");
|
|
},
|
|
async *subscribeEvents() {
|
|
for await (const e of streamFactory()) {
|
|
yield e;
|
|
}
|
|
},
|
|
});
|
|
});
|
|
return createClient(EdgeGateway, transport);
|
|
}
|
|
|
|
describe("EventStream", () => {
|
|
test("verified events reach the registered handler", async () => {
|
|
const handler = vi.fn();
|
|
eventStream.on("game.turn.ready", handler);
|
|
|
|
const event = buildEvent(
|
|
"game.turn.ready",
|
|
new TextEncoder().encode(JSON.stringify({ game_id: "g", turn: 2 })),
|
|
);
|
|
const client = makeRouter(async function* () {
|
|
yield event;
|
|
});
|
|
|
|
const sleep = vi.fn(async () => {});
|
|
|
|
eventStream.start({
|
|
core: mockCore(),
|
|
keypair: mockKeypair(),
|
|
deviceSessionId: "device-1",
|
|
gatewayResponsePublicKey: new Uint8Array(32),
|
|
client,
|
|
sleep,
|
|
random: () => 0,
|
|
});
|
|
|
|
await vi.waitFor(() => {
|
|
expect(handler).toHaveBeenCalled();
|
|
});
|
|
expect(handler).toHaveBeenCalledTimes(1);
|
|
expect(handler.mock.calls[0]?.[0].eventType).toBe("game.turn.ready");
|
|
eventStream.stop();
|
|
});
|
|
|
|
test("handlers for other event types are not invoked", async () => {
|
|
const turnHandler = vi.fn();
|
|
const mailHandler = vi.fn();
|
|
eventStream.on("game.turn.ready", turnHandler);
|
|
eventStream.on("mail.received", mailHandler);
|
|
|
|
const event = buildEvent(
|
|
"game.turn.ready",
|
|
new TextEncoder().encode("{}"),
|
|
);
|
|
const client = makeRouter(async function* () {
|
|
yield event;
|
|
});
|
|
eventStream.start({
|
|
core: mockCore(),
|
|
keypair: mockKeypair(),
|
|
deviceSessionId: "device-1",
|
|
gatewayResponsePublicKey: new Uint8Array(32),
|
|
client,
|
|
sleep: async () => {},
|
|
random: () => 0,
|
|
});
|
|
|
|
await vi.waitFor(() => {
|
|
expect(turnHandler).toHaveBeenCalled();
|
|
});
|
|
expect(mailHandler).not.toHaveBeenCalled();
|
|
eventStream.stop();
|
|
});
|
|
|
|
test("unsubscribe removes the handler", async () => {
|
|
const handler = vi.fn();
|
|
const off = eventStream.on("game.turn.ready", handler);
|
|
off();
|
|
|
|
const event = buildEvent(
|
|
"game.turn.ready",
|
|
new TextEncoder().encode("{}"),
|
|
);
|
|
const client = makeRouter(async function* () {
|
|
yield event;
|
|
});
|
|
const sleepSpy = vi.fn(async () => {});
|
|
eventStream.start({
|
|
core: mockCore(),
|
|
keypair: mockKeypair(),
|
|
deviceSessionId: "device-1",
|
|
gatewayResponsePublicKey: new Uint8Array(32),
|
|
client,
|
|
sleep: sleepSpy,
|
|
random: () => 0,
|
|
});
|
|
|
|
await vi.waitFor(() => {
|
|
// Stream finished — either status became idle, or the loop
|
|
// is at backoff after a clean close on an anonymous
|
|
// session (which goes straight to idle as well).
|
|
expect(eventStream.connectionStatus).toBe("idle");
|
|
});
|
|
expect(handler).not.toHaveBeenCalled();
|
|
eventStream.stop();
|
|
});
|
|
|
|
test("bad signature tears down the stream and reconnects", async () => {
|
|
const handler = vi.fn();
|
|
eventStream.on("game.turn.ready", handler);
|
|
|
|
let verifyCalls = 0;
|
|
const core = mockCore({
|
|
verifyEvent: () => {
|
|
verifyCalls += 1;
|
|
return verifyCalls > 1; // first event fails, then passes
|
|
},
|
|
});
|
|
|
|
let streamCalls = 0;
|
|
const client = makeRouter(async function* () {
|
|
streamCalls += 1;
|
|
yield buildEvent(
|
|
"game.turn.ready",
|
|
new TextEncoder().encode("{}"),
|
|
);
|
|
});
|
|
|
|
const sleepCalls: number[] = [];
|
|
eventStream.start({
|
|
core,
|
|
keypair: mockKeypair(),
|
|
deviceSessionId: "device-1",
|
|
gatewayResponsePublicKey: new Uint8Array(32),
|
|
client,
|
|
sleep: async (ms) => {
|
|
sleepCalls.push(ms);
|
|
},
|
|
random: () => 0, // full-jitter = 0 → instant retry
|
|
});
|
|
|
|
await vi.waitFor(() => {
|
|
expect(handler).toHaveBeenCalled();
|
|
});
|
|
// Two stream openings: first one rejected on bad signature,
|
|
// second one delivered the good event.
|
|
expect(streamCalls).toBeGreaterThanOrEqual(2);
|
|
// Backoff was scheduled between attempts.
|
|
expect(sleepCalls.length).toBeGreaterThanOrEqual(1);
|
|
eventStream.stop();
|
|
});
|
|
|
|
test("unauthenticated error signs the session out", async () => {
|
|
sessionStatus = "authenticated";
|
|
const client = makeRouter(async function* () {
|
|
yield* [];
|
|
throw new ConnectError("revoked", Code.Unauthenticated);
|
|
});
|
|
eventStream.start({
|
|
core: mockCore(),
|
|
keypair: mockKeypair(),
|
|
deviceSessionId: "device-1",
|
|
gatewayResponsePublicKey: new Uint8Array(32),
|
|
client,
|
|
sleep: async () => {},
|
|
random: () => 0,
|
|
});
|
|
|
|
await vi.waitFor(() => {
|
|
expect(signOutSpy).toHaveBeenCalled();
|
|
});
|
|
expect(signOutSpy).toHaveBeenCalledWith("revoked");
|
|
eventStream.stop();
|
|
});
|
|
|
|
test("clean end-of-stream on an authenticated session is the revocation signal", async () => {
|
|
sessionStatus = "authenticated";
|
|
const client = makeRouter(async function* () {
|
|
yield* [];
|
|
});
|
|
eventStream.start({
|
|
core: mockCore(),
|
|
keypair: mockKeypair(),
|
|
deviceSessionId: "device-1",
|
|
gatewayResponsePublicKey: new Uint8Array(32),
|
|
client,
|
|
sleep: async () => {},
|
|
random: () => 0,
|
|
});
|
|
await vi.waitFor(() => {
|
|
expect(signOutSpy).toHaveBeenCalledWith("revoked");
|
|
});
|
|
eventStream.stop();
|
|
});
|
|
|
|
test("game.paused events dispatch to the matching handler (Phase 25)", async () => {
|
|
const handler = vi.fn();
|
|
eventStream.on("game.paused", handler);
|
|
const payload = new TextEncoder().encode(
|
|
JSON.stringify({
|
|
game_id: "11111111-2222-3333-4444-555555555555",
|
|
turn: 7,
|
|
reason: "generation_failed",
|
|
}),
|
|
);
|
|
const event = buildEvent("game.paused", payload);
|
|
const client = makeRouter(async function* () {
|
|
yield event;
|
|
});
|
|
eventStream.start({
|
|
core: mockCore(),
|
|
keypair: mockKeypair(),
|
|
deviceSessionId: "device-1",
|
|
gatewayResponsePublicKey: new Uint8Array(32),
|
|
client,
|
|
sleep: async () => {},
|
|
random: () => 0,
|
|
});
|
|
await vi.waitFor(() => {
|
|
expect(handler).toHaveBeenCalled();
|
|
});
|
|
expect(handler.mock.calls[0]?.[0].eventType).toBe("game.paused");
|
|
eventStream.stop();
|
|
});
|
|
|
|
test("gateway.heartbeat is short-circuited before verification and dispatch", async () => {
|
|
const handler = vi.fn();
|
|
eventStream.on("game.turn.ready", handler);
|
|
// A heartbeat with an empty signature (matching the unsigned
|
|
// wire shape the gateway emits) would normally trip
|
|
// `verifyEvent` and tear the stream down with a SignatureError.
|
|
// The short-circuit skips both verification and dispatch.
|
|
const heartbeat = create(GatewayEventSchema, {
|
|
eventType: "gateway.heartbeat",
|
|
eventId: "",
|
|
timestampMs: 0n,
|
|
payloadBytes: new Uint8Array(0),
|
|
payloadHash: new Uint8Array(0),
|
|
signature: new Uint8Array(0),
|
|
requestId: "",
|
|
traceId: "",
|
|
});
|
|
const realEvent = buildEvent(
|
|
"game.turn.ready",
|
|
new TextEncoder().encode("{}"),
|
|
);
|
|
|
|
const verifyEventSpy = vi.fn(() => true);
|
|
const verifyPayloadHashSpy = vi.fn(() => true);
|
|
const core = mockCore({
|
|
verifyEvent: verifyEventSpy,
|
|
verifyPayloadHash: verifyPayloadHashSpy,
|
|
});
|
|
|
|
const client = makeRouter(async function* () {
|
|
yield heartbeat;
|
|
yield realEvent;
|
|
});
|
|
eventStream.start({
|
|
core,
|
|
keypair: mockKeypair(),
|
|
deviceSessionId: "device-1",
|
|
gatewayResponsePublicKey: new Uint8Array(32),
|
|
client,
|
|
sleep: async () => {},
|
|
random: () => 0,
|
|
});
|
|
|
|
await vi.waitFor(() => {
|
|
expect(handler).toHaveBeenCalled();
|
|
});
|
|
// Verification ran exactly once — for the real event, never
|
|
// for the heartbeat. The handler also only sees the real one.
|
|
expect(verifyEventSpy).toHaveBeenCalledTimes(1);
|
|
expect(verifyPayloadHashSpy).toHaveBeenCalledTimes(1);
|
|
expect(handler).toHaveBeenCalledTimes(1);
|
|
expect(handler.mock.calls[0]?.[0].eventType).toBe("game.turn.ready");
|
|
eventStream.stop();
|
|
});
|
|
|
|
test("a leading heartbeat still flips connectionStatus to connected", async () => {
|
|
const heartbeat = create(GatewayEventSchema, {
|
|
eventType: "gateway.heartbeat",
|
|
});
|
|
// The stream yields only a heartbeat then waits for the test
|
|
// to abort the consumer — the `connected` transition must not
|
|
// require a real event behind the heartbeat.
|
|
const pending = new Promise<never>(() => {});
|
|
const client = makeRouter(async function* () {
|
|
yield heartbeat;
|
|
await pending;
|
|
});
|
|
eventStream.start({
|
|
core: mockCore(),
|
|
keypair: mockKeypair(),
|
|
deviceSessionId: "device-1",
|
|
gatewayResponsePublicKey: new Uint8Array(32),
|
|
client,
|
|
sleep: async () => {},
|
|
random: () => 0,
|
|
});
|
|
await vi.waitFor(() => {
|
|
expect(eventStream.connectionStatus).toBe("connected");
|
|
});
|
|
eventStream.stop();
|
|
});
|
|
|
|
test("connectionStatus transitions through connecting → connected → idle", async () => {
|
|
expect(eventStream.connectionStatus).toBe("idle");
|
|
const event = buildEvent(
|
|
"game.turn.ready",
|
|
new TextEncoder().encode("{}"),
|
|
);
|
|
const observed: string[] = [];
|
|
const client = makeRouter(async function* () {
|
|
yield event;
|
|
});
|
|
const handler = vi.fn(() => {
|
|
observed.push(eventStream.connectionStatus);
|
|
});
|
|
eventStream.on("game.turn.ready", handler);
|
|
eventStream.start({
|
|
core: mockCore(),
|
|
keypair: mockKeypair(),
|
|
deviceSessionId: "device-1",
|
|
gatewayResponsePublicKey: new Uint8Array(32),
|
|
client,
|
|
sleep: async () => {},
|
|
random: () => 0,
|
|
});
|
|
await vi.waitFor(() => {
|
|
expect(handler).toHaveBeenCalled();
|
|
});
|
|
// Inside the handler, status had already flipped to connected.
|
|
expect(observed).toContain("connected");
|
|
eventStream.stop();
|
|
});
|
|
});
|