8565942392
Serve the whole stack behind one host: site at /, game UI at /game/, gateway REST at /api + /healthz, Connect at /rpc (prefix stripped by the edge Caddy). The built artifact is domain-agnostic — the UI talks to the gateway same-origin via relative URLs, so the same bundle runs under any host with no rebuild and with CORS disabled. - Rename the Connect proto service galaxy.gateway.v1.EdgeGateway -> edge.v1.Gateway; regenerate Go + TS; public path /rpc/edge.v1.Gateway. - Move the game UI under base path /game (env BASE_PATH); make the manifest, service-worker scope, WASM loader, and all navigation base-aware via a withBase helper. - Relative API + /rpc Connect prefix; Vite dev proxy mirrors the strip. - Rewrite the edge Caddy (dev + prod) for path-based routing; empty CORS allow-lists (same-origin); single host. - New VitePress project site (site/): i18n en/ru with switcher, LaTeX math, minimal monospace theme; built and served at /. - dev-deploy compose/Makefile + CI (dev-deploy, prod-build, new site-build) build and seed the site; probes hit /, /game/, /healthz. - Sync docs (ARCHITECTURE, gateway README/openapi, dev-deploy & local-dev READMEs, CLAUDE.md, ui/PLAN). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
164 lines
5.0 KiB
Go
164 lines
5.0 KiB
Go
package grpcapi
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
"time"
|
|
|
|
"galaxy/gateway/internal/telemetry"
|
|
edgev1 "galaxy/gateway/proto/edge/v1"
|
|
|
|
"go.opentelemetry.io/otel/attribute"
|
|
"google.golang.org/grpc"
|
|
)
|
|
|
|
// heartbeatingStream wraps a server-streaming response so the inner
|
|
// stream stays alive across browser fetch-streaming idle timeouts.
|
|
// Every call to Send (a real event from a tail service) resets a
|
|
// silence timer; when the timer fires, Run emits an unsigned
|
|
// `gateway.heartbeat` event on its own. Send and the heartbeat
|
|
// goroutine serialise on the same mutex because grpc.ServerStream.Send
|
|
// is documented as not goroutine-safe.
|
|
//
|
|
// Wire-cost budgeting: each heartbeat is one GatewayEvent with only
|
|
// EventType populated (~17 bytes + protobuf tag), framed by Connect
|
|
// (~5 bytes) and HTTP/2 plus TLS overhead (~50 bytes). At the
|
|
// 15-second default a fully-idle stream costs ~840 KB/day per client;
|
|
// see `docs/ARCHITECTURE.md` for the per-scale projection.
|
|
type heartbeatingStream struct {
|
|
grpc.ServerStreamingServer[edgev1.GatewayEvent]
|
|
|
|
interval time.Duration
|
|
metrics *telemetry.Runtime
|
|
|
|
sendMu sync.Mutex
|
|
timer *time.Timer
|
|
|
|
stopOnce sync.Once
|
|
done chan struct{}
|
|
}
|
|
|
|
// newHeartbeatingStream wraps inner with a silence-based heartbeat
|
|
// emitter. A non-positive interval returns nil so the caller can skip
|
|
// the wrapping entirely; non-nil returns must have `Stop()` called once
|
|
// the stream lifecycle ends.
|
|
func newHeartbeatingStream(
|
|
inner grpc.ServerStreamingServer[edgev1.GatewayEvent],
|
|
interval time.Duration,
|
|
metrics *telemetry.Runtime,
|
|
) *heartbeatingStream {
|
|
if interval <= 0 {
|
|
return nil
|
|
}
|
|
|
|
return &heartbeatingStream{
|
|
ServerStreamingServer: inner,
|
|
interval: interval,
|
|
metrics: metrics,
|
|
timer: time.NewTimer(interval),
|
|
done: make(chan struct{}),
|
|
}
|
|
}
|
|
|
|
// Send forwards event to the inner stream and resets the silence timer
|
|
// so the heartbeat goroutine waits a fresh interval before firing
|
|
// again. A Send that succeeds means the transport just delivered real
|
|
// bytes; the silence window restarts from "now".
|
|
func (s *heartbeatingStream) Send(event *edgev1.GatewayEvent) error {
|
|
s.sendMu.Lock()
|
|
defer s.sendMu.Unlock()
|
|
if err := s.ServerStreamingServer.Send(event); err != nil {
|
|
return err
|
|
}
|
|
s.resetTimerLocked()
|
|
|
|
return nil
|
|
}
|
|
|
|
// Run blocks until ctx is canceled or Stop is called, emitting one
|
|
// `gateway.heartbeat` event whenever the silence timer fires. Intended
|
|
// to run in its own goroutine alongside the tail service that owns the
|
|
// stream. A Send failure from the heartbeat path is recorded in
|
|
// telemetry and returned to the caller; production wiring discards it
|
|
// because the tail service will see the same transport failure on its
|
|
// next Send and propagate the real error to the gateway frame
|
|
// observability layer.
|
|
func (s *heartbeatingStream) Run(ctx context.Context) error {
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil
|
|
case <-s.done:
|
|
return nil
|
|
case <-s.timer.C:
|
|
err := s.sendHeartbeat()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Stop halts the heartbeat goroutine and drains the silence timer.
|
|
// Safe to call multiple times; subsequent calls are no-ops.
|
|
func (s *heartbeatingStream) Stop() {
|
|
s.stopOnce.Do(func() {
|
|
close(s.done)
|
|
if !s.timer.Stop() {
|
|
select {
|
|
case <-s.timer.C:
|
|
default:
|
|
}
|
|
}
|
|
})
|
|
}
|
|
|
|
// sendHeartbeat emits one heartbeat event, records the outcome in
|
|
// telemetry, and re-arms the silence timer. The outcome attribute
|
|
// makes a sudden bump of `error` easy to spot in dashboards — it
|
|
// usually means the upstream connection is failing before the gateway
|
|
// can flush, while a steady `sent` rate is the normal idle baseline
|
|
// the deployment operator budgets bandwidth against.
|
|
func (s *heartbeatingStream) sendHeartbeat() error {
|
|
s.sendMu.Lock()
|
|
defer s.sendMu.Unlock()
|
|
|
|
err := s.ServerStreamingServer.Send(buildHeartbeatEvent())
|
|
outcome := attribute.String("outcome", "sent")
|
|
if err != nil {
|
|
outcome = attribute.String("outcome", "error")
|
|
}
|
|
s.metrics.RecordPushHeartbeat(context.Background(), outcome)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
s.resetTimerLocked()
|
|
|
|
return nil
|
|
}
|
|
|
|
// resetTimerLocked re-arms the silence timer. Caller must hold sendMu.
|
|
// The drain follows the canonical pattern from the time.Timer
|
|
// docstring: Stop may report `false` either because the timer already
|
|
// fired or because nothing was queued, so the non-blocking drain
|
|
// handles both states without deadlocking when the channel was already
|
|
// emptied by Run.
|
|
func (s *heartbeatingStream) resetTimerLocked() {
|
|
if !s.timer.Stop() {
|
|
select {
|
|
case <-s.timer.C:
|
|
default:
|
|
}
|
|
}
|
|
s.timer.Reset(s.interval)
|
|
}
|
|
|
|
// buildHeartbeatEvent returns the minimal `gateway.heartbeat`
|
|
// GatewayEvent emitted into the push stream. Every field except
|
|
// EventType is left at its proto3 default so the wire frame stays as
|
|
// small as Connect framing allows. See `gatewayHeartbeatEventType` for
|
|
// the security rationale of leaving the event unsigned.
|
|
func buildHeartbeatEvent() *edgev1.GatewayEvent {
|
|
return &edgev1.GatewayEvent{EventType: gatewayHeartbeatEventType}
|
|
}
|