ui/phase-24: push events, turn-ready toast, single SubscribeEvents consumer

Wires the gateway's signed SubscribeEvents stream end-to-end:

- backend: emit game.turn.ready from lobby.OnRuntimeSnapshot on every
  current_turn advance, addressed to every active membership, push-only
  channel, idempotency key turn-ready:<game_id>:<turn>;
- ui: single EventStream singleton replaces revocation-watcher.ts and
  carries both per-event dispatch and revocation detection; toast
  primitive (store + host) lives in lib/; GameStateStore gains
  pendingTurn/markPendingTurn/advanceToPending and a persisted
  lastViewedTurn so a return after multiple turns surfaces the same
  "view now" affordance as a live push event;
- mandatory event-signature verification through ui/core
  (verifyPayloadHash + verifyEvent), full-jitter exponential backoff
  1s -> 30s on transient failure, signOut("revoked") on
  Unauthenticated or clean end-of-stream;
- catalog and migration accept the new kind; tests cover producer
  (testcontainers + capturing publisher), consumer (Vitest event
  stream, toast, game-state extensions), and a Playwright e2e
  delivering a signed frame to the live UI.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
Ilia Denisov
2026-05-11 16:16:31 +02:00
parent 5a2a977dc6
commit 5b07bb4e14
26 changed files with 2181 additions and 209 deletions
+376
View File
@@ -0,0 +1,376 @@
// `EventStream` is the single SubscribeEvents consumer for the
// authenticated UI session. It opens one server-streaming RPC against
// the gateway, verifies every incoming event (payload-hash +
// Ed25519 signature) through `ui/core`, dispatches verified events to
// type-keyed handlers, and reconnects with full-jitter exponential
// backoff on transient failure.
//
// Phase 24 introduces this module in place of `revocation-watcher.ts`.
// The watcher's revocation semantics are absorbed: a clean
// end-of-stream while the session is authenticated, or an
// `Unauthenticated` ConnectError, both call `session.signOut("revoked")`.
// Per-event-type dispatch (turn-ready toasts in this phase; battle and
// mail toasts in later phases) is registered through `on(eventType,
// handler)`.
//
// The store exposes `connectionStatus` as a Svelte rune so the
// connection-state indicator in the shell header (see PLAN.md
// cross-cutting shell) can subscribe without ceremony. The indicator
// itself is not part of Phase 24, but the rune is wired here so the
// next phase that adds the dot can read it directly.
import { create } from "@bufbuild/protobuf";
import { ConnectError } from "@connectrpc/connect";
import type { Core } from "../platform/core/index";
import type { DeviceKeypair } from "../platform/store/index";
import {
GatewayEventSchema,
SubscribeEventsRequestSchema,
type GatewayEvent,
} from "../proto/galaxy/gateway/v1/edge_gateway_pb";
import { GATEWAY_BASE_URL } from "../lib/env";
import { session } from "../lib/session-store.svelte";
import { createEdgeGatewayClient, type EdgeGatewayClient } from "./connect";
const PROTOCOL_VERSION = "v1";
const SUBSCRIBE_MESSAGE_TYPE = "gateway.subscribe";
// Connect error code numerical values used by the watcher. The full
// enum lives in `@connectrpc/connect` but importing the runtime enum
// would pull a large surface into this small module.
const CONNECT_CODE_CANCELED = 1;
const CONNECT_CODE_UNAUTHENTICATED = 16;
const BACKOFF_BASE_MS = 1_000;
const BACKOFF_MAX_MS = 30_000;
/**
* VerifiedEvent is the verified projection of a `GatewayEvent` handed
* to user handlers. The signature and payload-hash fields are dropped
* because verification has already succeeded; consumers only need the
* envelope plus the opaque payload bytes.
*/
export interface VerifiedEvent {
eventType: string;
eventId: string;
timestampMs: bigint;
requestId: string;
traceId: string;
payloadBytes: Uint8Array;
}
export type EventHandler = (event: VerifiedEvent) => void;
export type ConnectionStatus =
| "idle"
| "connecting"
| "connected"
| "reconnecting"
| "offline";
/**
* EventStreamStartOptions carries the live primitives the stream
* consumer cannot resolve by itself. Production code reads `core`,
* `keypair`, and `deviceSessionId` from the session store and the
* gateway public key from `lib/env`; tests inject a fake
* `EdgeGatewayClient` and deterministic `sleep`/`random` to drive
* backoff in fake-timer mode.
*/
export interface EventStreamStartOptions {
core: Core;
keypair: DeviceKeypair;
deviceSessionId: string;
gatewayResponsePublicKey: Uint8Array;
/** Custom transport client. Defaults to `createEdgeGatewayClient(GATEWAY_BASE_URL)`. */
client?: EdgeGatewayClient;
/** Sleep hook for tests; defaults to a real-time `setTimeout`. */
sleep?: (ms: number) => Promise<void>;
/** Random source for full-jitter backoff; defaults to `Math.random`. */
random?: () => number;
/** Function reporting `navigator.onLine`; defaults to the browser global. */
onlineProbe?: () => boolean;
}
/**
* SignatureError marks a verification failure (payload-hash mismatch
* or invalid Ed25519 signature). The stream loop classifies it as a
* forgery and reconnects through the same backoff path used for
* transient transport errors.
*/
export class SignatureError extends Error {
constructor(message: string) {
super(message);
this.name = "SignatureError";
}
}
export class EventStream {
connectionStatus: ConnectionStatus = $state("idle");
private handlers = new Map<string, Set<EventHandler>>();
private controller: AbortController | null = null;
private running = false;
/**
* on registers a handler for a specific event type. Returns a
* disposer that removes the handler. Multiple handlers per type
* are supported so future phases (battle, mail) can subscribe
* alongside turn-ready without coordination.
*/
on(eventType: string, handler: EventHandler): () => void {
let bucket = this.handlers.get(eventType);
if (bucket === undefined) {
bucket = new Set();
this.handlers.set(eventType, bucket);
}
bucket.add(handler);
return () => {
const current = this.handlers.get(eventType);
if (current === undefined) {
return;
}
current.delete(handler);
if (current.size === 0) {
this.handlers.delete(eventType);
}
};
}
/**
* start opens the stream. Calling start while the stream is
* already running is a no-op so the root layout's `$effect`-based
* lifecycle stays idempotent across re-renders.
*/
start(opts: EventStreamStartOptions): void {
if (this.running) {
return;
}
this.running = true;
this.controller = new AbortController();
void this.run(opts, this.controller.signal);
}
/**
* stop tears down the stream. Used by the root layout on logout
* or unmount. Re-calling start after stop opens a fresh stream.
*/
stop(): void {
this.running = false;
if (this.controller !== null) {
this.controller.abort();
this.controller = null;
}
this.connectionStatus = "idle";
}
/**
* resetForTests is used by the Vitest harness to forget all
* handlers and force the rune back to `idle` between cases.
*/
resetForTests(): void {
this.stop();
this.handlers.clear();
}
private async run(
opts: EventStreamStartOptions,
signal: AbortSignal,
): Promise<void> {
const sleep = opts.sleep ?? defaultSleep;
const random = opts.random ?? Math.random;
const onlineProbe = opts.onlineProbe ?? defaultOnlineProbe;
const client = opts.client ?? createEdgeGatewayClient(GATEWAY_BASE_URL);
let attempt = 0;
while (!signal.aborted && this.running) {
this.connectionStatus = "connecting";
let stream: AsyncIterable<GatewayEvent>;
try {
stream = await openStream(client, opts, signal);
} catch (err) {
if (signal.aborted) {
return;
}
if (handleAuthenticationError(err)) {
return;
}
this.connectionStatus = onlineProbe() ? "reconnecting" : "offline";
await sleep(backoffDelay(attempt, random));
attempt += 1;
continue;
}
let firstEventSeen = false;
let terminated = false;
try {
for await (const event of stream) {
if (signal.aborted) {
return;
}
this.verifyEvent(event, opts);
if (!firstEventSeen) {
firstEventSeen = true;
this.connectionStatus = "connected";
attempt = 0;
}
this.dispatch(event);
}
terminated = true;
} catch (err) {
if (signal.aborted) {
return;
}
if (handleAuthenticationError(err)) {
return;
}
this.connectionStatus = onlineProbe() ? "reconnecting" : "offline";
await sleep(backoffDelay(attempt, random));
attempt += 1;
continue;
}
if (terminated) {
// Clean end-of-stream on an authenticated session is the
// gateway's documented session-invalidation signal.
if (session.status === "authenticated") {
await session.signOut("revoked");
return;
}
this.connectionStatus = "idle";
return;
}
}
}
private verifyEvent(event: GatewayEvent, opts: EventStreamStartOptions): void {
if (!opts.core.verifyPayloadHash(event.payloadBytes, event.payloadHash)) {
throw new SignatureError("event payload_hash mismatch");
}
const ok = opts.core.verifyEvent(
opts.gatewayResponsePublicKey,
event.signature,
{
eventType: event.eventType,
eventId: event.eventId,
timestampMs: event.timestampMs,
requestId: event.requestId,
traceId: event.traceId,
payloadHash: event.payloadHash,
},
);
if (!ok) {
throw new SignatureError("event signature verification failed");
}
}
private dispatch(event: GatewayEvent): void {
const bucket = this.handlers.get(event.eventType);
if (bucket === undefined || bucket.size === 0) {
return;
}
const projection: VerifiedEvent = {
eventType: event.eventType,
eventId: event.eventId,
timestampMs: event.timestampMs,
requestId: event.requestId,
traceId: event.traceId,
payloadBytes: event.payloadBytes,
};
for (const handler of [...bucket]) {
try {
handler(projection);
} catch (err) {
console.info("events: handler threw", event.eventType, err);
}
}
}
}
async function openStream(
client: EdgeGatewayClient,
opts: EventStreamStartOptions,
signal: AbortSignal,
): Promise<AsyncIterable<GatewayEvent>> {
const requestId = newRequestId();
const timestampMs = BigInt(Date.now());
const emptyPayload = new Uint8Array();
const payloadHash = await sha256(emptyPayload);
const canonical = opts.core.signRequest({
protocolVersion: PROTOCOL_VERSION,
deviceSessionId: opts.deviceSessionId,
messageType: SUBSCRIBE_MESSAGE_TYPE,
timestampMs,
requestId,
payloadHash,
});
const signature = await opts.keypair.sign(canonical);
const request = create(SubscribeEventsRequestSchema, {
protocolVersion: PROTOCOL_VERSION,
deviceSessionId: opts.deviceSessionId,
messageType: SUBSCRIBE_MESSAGE_TYPE,
timestampMs,
requestId,
payloadHash,
signature,
payloadBytes: emptyPayload,
});
return client.subscribeEvents(request, { signal });
}
function handleAuthenticationError(err: unknown): boolean {
if (!(err instanceof ConnectError)) {
return false;
}
if (err.code === CONNECT_CODE_CANCELED) {
return true;
}
if (err.code === CONNECT_CODE_UNAUTHENTICATED) {
void session.signOut("revoked");
return true;
}
return false;
}
function backoffDelay(attempt: number, random: () => number): number {
const cap = Math.min(BACKOFF_MAX_MS, BACKOFF_BASE_MS * 2 ** attempt);
return Math.floor(random() * cap);
}
function defaultSleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
function defaultOnlineProbe(): boolean {
if (typeof navigator === "undefined") {
return true;
}
return navigator.onLine !== false;
}
async function sha256(payload: Uint8Array): Promise<Uint8Array> {
const digest = await crypto.subtle.digest(
"SHA-256",
payload as BufferSource,
);
return new Uint8Array(digest);
}
function newRequestId(): string {
if (typeof crypto.randomUUID === "function") {
return crypto.randomUUID();
}
const buf = new Uint8Array(16);
crypto.getRandomValues(buf);
let hex = "";
for (let i = 0; i < buf.length; i++) {
hex += buf[i]!.toString(16).padStart(2, "0");
}
return `${hex.slice(0, 8)}-${hex.slice(8, 12)}-${hex.slice(12, 16)}-${hex.slice(16, 20)}-${hex.slice(20, 32)}`;
}
/**
* eventStream is the singleton stream consumer the root layout starts
* once the session becomes authenticated and stops on logout. Tests
* call `resetForTests()` between cases.
*/
export const eventStream = new EventStream();