Files
galaxy-game/integration/testenv/connect_client.go
T
Ilia Denisov 8565942392
Build · Site / build (push) Successful in 8s
Tests · Go / test (push) Successful in 2m22s
Tests · UI / test (push) Failing after 2m42s
feat(deploy): single-origin path-based deployment + project site
Serve the whole stack behind one host: site at /, game UI at /game/,
gateway REST at /api + /healthz, Connect at /rpc (prefix stripped by the
edge Caddy). The built artifact is domain-agnostic — the UI talks to the
gateway same-origin via relative URLs, so the same bundle runs under any
host with no rebuild and with CORS disabled.

- Rename the Connect proto service galaxy.gateway.v1.EdgeGateway ->
  edge.v1.Gateway; regenerate Go + TS; public path /rpc/edge.v1.Gateway.
- Move the game UI under base path /game (env BASE_PATH); make the
  manifest, service-worker scope, WASM loader, and all navigation
  base-aware via a withBase helper.
- Relative API + /rpc Connect prefix; Vite dev proxy mirrors the strip.
- Rewrite the edge Caddy (dev + prod) for path-based routing; empty CORS
  allow-lists (same-origin); single host.
- New VitePress project site (site/): i18n en/ru with switcher, LaTeX
  math, minimal monospace theme; built and served at /.
- dev-deploy compose/Makefile + CI (dev-deploy, prod-build, new
  site-build) build and seed the site; probes hit /, /game/, /healthz.
- Sync docs (ARCHITECTURE, gateway README/openapi, dev-deploy &
  local-dev READMEs, CLAUDE.md, ui/PLAN).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-23 18:19:07 +02:00

280 lines
8.5 KiB
Go

package testenv
import (
"context"
"crypto/ed25519"
"crypto/rand"
"crypto/sha256"
"crypto/tls"
"encoding/base64"
"errors"
"fmt"
"net"
"net/http"
"sync/atomic"
"time"
gatewayauthn "galaxy/gateway/authn"
edgev1 "galaxy/gateway/proto/edge/v1"
"galaxy/gateway/proto/edge/v1/edgev1connect"
"connectrpc.com/connect"
"github.com/google/uuid"
"golang.org/x/net/http2"
)
// SignedGatewayClient drives the authenticated edge surface of the
// gateway from tests. It signs ExecuteCommand envelopes with the
// session's Ed25519 private key, verifies response signatures with
// the gateway's response-signer public key, and exposes a
// SubscribeEvents helper. The client speaks Connect over HTTP/2
// cleartext (h2c) — the gateway listener supports that natively
// alongside gRPC and gRPC-Web on the same port.
//
// Instances are built by DialGateway; Close releases idle connections
// held by the underlying transport.
type SignedGatewayClient struct {
// httpClient is the HTTP client handed to the generated Connect
// client; Close inspects its Transport to drop idle connections.
httpClient *http.Client
// edge is the generated Connect client used for the ExecuteCommand
// and SubscribeEvents RPCs.
edge edgev1connect.GatewayClient
// deviceSID is the device session id placed in every request
// envelope unless a call overrides it.
deviceSID string
// privateKey signs the request signing input.
privateKey ed25519.PrivateKey
// respPub is the public key passed to response signature
// verification.
respPub ed25519.PublicKey
// requestSeq is incremented atomically once per Execute call, just
// before the RPC is issued. Nothing in the visible code reads it.
requestSeq uint64
}
// NewSession is the device-session shape returned by registration.
// It bundles the session identifier with the Ed25519 keypair that
// belongs to it.
type NewSession struct {
// DeviceSessionID identifies the device session.
// NOTE(review): presumably the value passed to DialGateway as
// deviceSID — confirm against callers.
DeviceSessionID string
// PrivateKey is the session's Ed25519 private key.
PrivateKey ed25519.PrivateKey
// PublicKey is the session's Ed25519 public key.
PublicKey ed25519.PublicKey
}
// GenerateSessionKeyPair creates a new Ed25519 keypair from
// crypto/rand for use in `confirm-email-code`.
//
// It returns the public key, the private key, and any error reported
// by the key generator, in that order.
func GenerateSessionKeyPair() (ed25519.PublicKey, ed25519.PrivateKey, error) {
	publicKey, privateKey, err := ed25519.GenerateKey(rand.Reader)
	return publicKey, privateKey, err
}
// EncodePublicKey renders the raw Ed25519 public key bytes as padded
// standard base64, the form expected by the `client_public_key`
// field.
func EncodePublicKey(pub ed25519.PublicKey) string {
	encoding := base64.StdEncoding
	encoded := make([]byte, encoding.EncodedLen(len(pub)))
	encoding.Encode(encoded, pub)
	return string(encoded)
}
// DialGateway builds a Connect client that talks HTTP/2 cleartext
// (h2c) to the gateway's authenticated edge listener at addr
// ("host:port") and wraps it in a signing client bound to deviceSID.
//
// The context argument is ignored. privateKey is used to sign
// requests and respPub to verify response signatures. An empty addr
// is rejected with an error.
func DialGateway(_ context.Context, addr string, deviceSID string, privateKey ed25519.PrivateKey, respPub ed25519.PublicKey) (*SignedGatewayClient, error) {
	if addr == "" {
		return nil, fmt.Errorf("dial gateway: empty addr")
	}

	// h2c: the HTTP/2 transport is allowed to use the http scheme and
	// its "TLS" dial hook opens a plain connection, ignoring the TLS
	// config it is handed.
	dialPlain := func(ctx context.Context, network, target string, _ *tls.Config) (net.Conn, error) {
		var dialer net.Dialer
		return dialer.DialContext(ctx, network, target)
	}
	transport := &http2.Transport{
		AllowHTTP:      true,
		DialTLSContext: dialPlain,
	}

	client := &SignedGatewayClient{
		httpClient: &http.Client{Transport: transport},
		deviceSID:  deviceSID,
		privateKey: privateKey,
		respPub:    respPub,
	}
	client.edge = edgev1connect.NewGatewayClient(client.httpClient, "http://"+addr)
	return client, nil
}
// Close drops idle HTTP/2 connections held by the client's transport.
// It is best-effort: the Connect client itself is stateless, nothing
// happens when the HTTP client is unset or its transport is not an
// *http2.Transport, and the returned error is always nil.
func (c *SignedGatewayClient) Close() error {
	if c.httpClient == nil {
		return nil
	}
	h2, isH2 := c.httpClient.Transport.(*http2.Transport)
	if isH2 {
		h2.CloseIdleConnections()
	}
	return nil
}
// ExecuteOptions tunes one ExecuteCommand call. The zero value
// produces a fresh `request_id` and the current timestamp; tests that
// need a fixed request_id (anti-replay) or a stale timestamp
// (freshness window) override the relevant fields.
type ExecuteOptions struct {
// RequestID is sent as the envelope request id. When empty, Execute
// generates a new UUID string.
RequestID string
// TimestampMS is the envelope timestamp in Unix milliseconds. When
// zero, Execute uses the current time.
TimestampMS int64
// OverrideSignature, when non-nil, is sent as-is and Execute does
// not compute a signature.
OverrideSignature []byte
// OverridePayloadHash, when non-nil, replaces the SHA-256 of the
// payload in both the signing input and the request.
OverridePayloadHash []byte
// OverrideSessionID, when non-empty, replaces the client's device
// session id in both the signing input and the request.
OverrideSessionID string
// OverrideProtocolVersion, when non-empty, replaces the default
// protocol version "v1".
OverrideProtocolVersion string
}
// ExecuteResult is the verified response of a successful
// ExecuteCommand. PayloadBytes is the authenticated FlatBuffers
// blob; tests decode it via galaxy/transcoder.
//
// Execute fills every field from the response message after the
// payload hash and the response signature have been checked.
type ExecuteResult struct {
// ResultCode is the result code reported by the response.
ResultCode string
// PayloadBytes is the response payload; its SHA-256 matched the
// response's payload hash.
PayloadBytes []byte
// RequestID is the request id carried by the response.
RequestID string
// TimestampMS is the timestamp carried by the response.
TimestampMS int64
}
// Execute signs the supplied payload, calls ExecuteCommand, verifies
// the response against the gateway response signer, and returns the
// verified response fields.
//
// Envelope fields are taken from opts when set and otherwise default
// to: a fresh UUID request id, the current time in Unix milliseconds,
// protocol version "v1", the client's device session id, the SHA-256
// of payload, and an Ed25519 signature over the request signing
// input.
//
// Errors are returned when payload is empty, when a signature has to
// be computed but the client's private key is not a valid Ed25519 key
// (ed25519.Sign would panic otherwise), when the RPC fails (the RPC
// error is returned as-is), when the response payload does not match
// its payload hash, and when response signature verification fails.
func (c *SignedGatewayClient) Execute(ctx context.Context, messageType string, payload []byte, opts ExecuteOptions) (*ExecuteResult, error) {
	if len(payload) == 0 {
		return nil, errors.New("ExecuteCommand requires non-empty payload")
	}

	requestID := opts.RequestID
	if requestID == "" {
		requestID = uuid.NewString()
	}
	timestampMS := opts.TimestampMS
	if timestampMS == 0 {
		timestampMS = time.Now().UnixMilli()
	}
	protocolVersion := opts.OverrideProtocolVersion
	if protocolVersion == "" {
		protocolVersion = "v1"
	}
	deviceSID := opts.OverrideSessionID
	if deviceSID == "" {
		deviceSID = c.deviceSID
	}

	// Only nil means "compute it": an explicitly empty override is
	// sent through unchanged.
	hash := opts.OverridePayloadHash
	if hash == nil {
		sum := sha256.Sum256(payload)
		hash = sum[:]
	}

	signature := opts.OverrideSignature
	if signature == nil {
		// ed25519.Sign panics on a key of the wrong length; report
		// that as an error instead. Callers that supply
		// OverrideSignature never reach this check.
		if len(c.privateKey) != ed25519.PrivateKeySize {
			return nil, fmt.Errorf("sign request: private key is %d bytes, want %d", len(c.privateKey), ed25519.PrivateKeySize)
		}
		input := gatewayauthn.BuildRequestSigningInput(gatewayauthn.RequestSigningFields{
			ProtocolVersion: protocolVersion,
			DeviceSessionID: deviceSID,
			MessageType:     messageType,
			TimestampMS:     timestampMS,
			RequestID:       requestID,
			PayloadHash:     hash,
		})
		signature = ed25519.Sign(c.privateKey, input)
	}

	req := &edgev1.ExecuteCommandRequest{
		ProtocolVersion: protocolVersion,
		DeviceSessionId: deviceSID,
		MessageType:     messageType,
		TimestampMs:     timestampMS,
		RequestId:       requestID,
		PayloadBytes:    payload,
		PayloadHash:     hash,
		Signature:       signature,
	}

	// Count the attempt before the RPC is issued.
	atomic.AddUint64(&c.requestSeq, 1)
	respWrap, err := c.edge.ExecuteCommand(ctx, connect.NewRequest(req))
	if err != nil {
		return nil, err
	}
	resp := respWrap.Msg

	// Check that the payload matches the hash covered by the
	// signature before verifying the signature itself.
	respHash := sha256.Sum256(resp.GetPayloadBytes())
	if string(respHash[:]) != string(resp.GetPayloadHash()) {
		return nil, errors.New("response payload_hash mismatch")
	}
	if err := gatewayauthn.VerifyResponseSignature(c.respPub, resp.GetSignature(), gatewayauthn.ResponseSigningFields{
		ProtocolVersion: resp.GetProtocolVersion(),
		RequestID:       resp.GetRequestId(),
		TimestampMS:     resp.GetTimestampMs(),
		ResultCode:      resp.GetResultCode(),
		PayloadHash:     resp.GetPayloadHash(),
	}); err != nil {
		return nil, fmt.Errorf("response signature verification failed: %w", err)
	}

	return &ExecuteResult{
		ResultCode:   resp.GetResultCode(),
		PayloadBytes: resp.GetPayloadBytes(),
		RequestID:    resp.GetRequestId(),
		TimestampMS:  resp.GetTimestampMs(),
	}, nil
}
// SubscribeEvents opens the authenticated server-streaming
// SubscribeEvents RPC. The request is signed with the client's
// private key over an envelope that uses a fresh UUID request id, the
// current time, protocol version "v1", and the SHA-256 of an empty
// payload.
//
// The first returned channel receives every event read from the
// stream and is closed when the stream ends or ctx is done. The
// second channel receives exactly one value before the events channel
// closes: the stream's terminal error (nil when the stream reported
// none), or ctx.Err() when ctx was cancelled while an event was
// waiting for the consumer. The error channel is never closed.
//
// The returned error is non-nil when the client's private key is not
// a valid Ed25519 key or when the stream cannot be opened; both
// channels are nil in that case.
func (c *SignedGatewayClient) SubscribeEvents(ctx context.Context, messageType string) (<-chan *edgev1.GatewayEvent, <-chan error, error) {
	// ed25519.Sign panics on a key of the wrong length; report that as
	// an error instead.
	if len(c.privateKey) != ed25519.PrivateKeySize {
		return nil, nil, fmt.Errorf("sign subscribe request: private key is %d bytes, want %d", len(c.privateKey), ed25519.PrivateKeySize)
	}

	requestID := uuid.NewString()
	timestampMS := time.Now().UnixMilli()
	protocolVersion := "v1"
	emptyHash := sha256.Sum256(nil)
	signature := ed25519.Sign(c.privateKey, gatewayauthn.BuildRequestSigningInput(gatewayauthn.RequestSigningFields{
		ProtocolVersion: protocolVersion,
		DeviceSessionID: c.deviceSID,
		MessageType:     messageType,
		TimestampMS:     timestampMS,
		RequestID:       requestID,
		PayloadHash:     emptyHash[:],
	}))

	stream, err := c.edge.SubscribeEvents(ctx, connect.NewRequest(&edgev1.SubscribeEventsRequest{
		ProtocolVersion: protocolVersion,
		DeviceSessionId: c.deviceSID,
		MessageType:     messageType,
		TimestampMs:     timestampMS,
		RequestId:       requestID,
		PayloadHash:     emptyHash[:],
		Signature:       signature,
	}))
	if err != nil {
		return nil, nil, fmt.Errorf("open subscribe events: %w", err)
	}

	events := make(chan *edgev1.GatewayEvent, 16)
	// Buffered so the single terminal send never blocks the goroutine
	// when nobody is listening for errors.
	errs := make(chan error, 1)
	go func() {
		defer close(events)
		defer func() { _ = stream.Close() }()
		for stream.Receive() {
			// Do not block forever on a consumer that stopped reading:
			// give up as soon as ctx is done so the goroutine and the
			// stream are released.
			select {
			case events <- stream.Msg():
			case <-ctx.Done():
				errs <- ctx.Err()
				return
			}
		}
		errs <- stream.Err()
	}()
	return events, errs, nil
}
// IsUnauthenticated tells whether the Connect code extracted from err
// is CodeUnauthenticated. It is intended for negative-path edge
// tests.
func IsUnauthenticated(err error) bool {
	code := connect.CodeOf(err)
	return code == connect.CodeUnauthenticated
}
// IsInvalidArgument tells whether the Connect code extracted from err
// is CodeInvalidArgument, the code used for malformed envelopes and
// an unsupported protocol_version.
func IsInvalidArgument(err error) bool {
	code := connect.CodeOf(err)
	return code == connect.CodeInvalidArgument
}
// IsResourceExhausted tells whether the Connect code extracted from
// err is CodeResourceExhausted (used for replay rejection or
// rate-limit rejections).
//
// NOTE(review): the IsFailedPrecondition comment attributes replay
// rejections to CodeFailedPrecondition — confirm which code the
// gateway actually returns for replays.
func IsResourceExhausted(err error) bool {
	code := connect.CodeOf(err)
	return code == connect.CodeResourceExhausted
}
// IsFailedPrecondition tells whether the Connect code extracted from
// err is CodeFailedPrecondition. The gateway uses this code for
// replay rejections: the canonical envelope was authentic but its
// `request_id` had already been consumed.
func IsFailedPrecondition(err error) bool {
	code := connect.CodeOf(err)
	return code == connect.CodeFailedPrecondition
}