Files
galaxy-game/gateway/internal/grpcapi/push_heartbeat.go
T
Ilia Denisov 8565942392
Build · Site / build (push) Successful in 8s
Tests · Go / test (push) Successful in 2m22s
Tests · UI / test (push) Failing after 2m42s
feat(deploy): single-origin path-based deployment + project site
Serve the whole stack behind one host: site at /, game UI at /game/,
gateway REST at /api + /healthz, Connect at /rpc (prefix stripped by the
edge Caddy). The built artifact is domain-agnostic — the UI talks to the
gateway same-origin via relative URLs, so the same bundle runs under any
host with no rebuild and with CORS disabled.

- Rename the Connect proto service galaxy.gateway.v1.EdgeGateway ->
  edge.v1.Gateway; regenerate Go + TS; public path /rpc/edge.v1.Gateway.
- Move the game UI under base path /game (env BASE_PATH); make the
  manifest, service-worker scope, WASM loader, and all navigation
  base-aware via a withBase helper.
- Relative API + /rpc Connect prefix; Vite dev proxy mirrors the strip.
- Rewrite the edge Caddy (dev + prod) for path-based routing; empty CORS
  allow-lists (same-origin); single host.
- New VitePress project site (site/): i18n en/ru with switcher, LaTeX
  math, minimal monospace theme; built and served at /.
- dev-deploy compose/Makefile + CI (dev-deploy, prod-build, new
  site-build) build and seed the site; probes hit /, /game/, /healthz.
- Sync docs (ARCHITECTURE, gateway README/openapi, dev-deploy &
  local-dev READMEs, CLAUDE.md, ui/PLAN).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-23 18:19:07 +02:00

164 lines
5.0 KiB
Go

package grpcapi
import (
"context"
"sync"
"time"
"galaxy/gateway/internal/telemetry"
edgev1 "galaxy/gateway/proto/edge/v1"
"go.opentelemetry.io/otel/attribute"
"google.golang.org/grpc"
)
// heartbeatingStream wraps a server-streaming response so the inner
// stream stays alive across browser fetch-streaming idle timeouts.
// Every call to Send (a real event from a tail service) resets a
// silence timer; when the timer fires, Run emits an unsigned
// `gateway.heartbeat` event on its own. Send and the heartbeat
// goroutine serialise on the same mutex because grpc.ServerStream.Send
// is documented as not goroutine-safe.
//
// Wire-cost budgeting: each heartbeat is one GatewayEvent with only
// EventType populated (~17 bytes + protobuf tag), framed by Connect
// (~5 bytes) and HTTP/2 plus TLS overhead (~50 bytes). At the
// 15-second default a fully-idle stream costs ~840 KB/day per client;
// see `docs/ARCHITECTURE.md` for the per-scale projection.
type heartbeatingStream struct {
grpc.ServerStreamingServer[edgev1.GatewayEvent]
interval time.Duration
metrics *telemetry.Runtime
sendMu sync.Mutex
timer *time.Timer
stopOnce sync.Once
done chan struct{}
}
// newHeartbeatingStream wraps inner with a silence-based heartbeat
// emitter. A non-positive interval returns nil so the caller can skip
// the wrapping entirely; non-nil returns must have `Stop()` called once
// the stream lifecycle ends.
func newHeartbeatingStream(
inner grpc.ServerStreamingServer[edgev1.GatewayEvent],
interval time.Duration,
metrics *telemetry.Runtime,
) *heartbeatingStream {
if interval <= 0 {
return nil
}
return &heartbeatingStream{
ServerStreamingServer: inner,
interval: interval,
metrics: metrics,
timer: time.NewTimer(interval),
done: make(chan struct{}),
}
}
// Send forwards event to the inner stream and resets the silence timer
// so the heartbeat goroutine waits a fresh interval before firing
// again. A Send that succeeds means the transport just delivered real
// bytes; the silence window restarts from "now".
func (s *heartbeatingStream) Send(event *edgev1.GatewayEvent) error {
s.sendMu.Lock()
defer s.sendMu.Unlock()
if err := s.ServerStreamingServer.Send(event); err != nil {
return err
}
s.resetTimerLocked()
return nil
}
// Run blocks until ctx is canceled or Stop is called, emitting one
// `gateway.heartbeat` event whenever the silence timer fires. Intended
// to run in its own goroutine alongside the tail service that owns the
// stream. A Send failure from the heartbeat path is recorded in
// telemetry and returned to the caller; production wiring discards it
// because the tail service will see the same transport failure on its
// next Send and propagate the real error to the gateway frame
// observability layer.
func (s *heartbeatingStream) Run(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return nil
case <-s.done:
return nil
case <-s.timer.C:
err := s.sendHeartbeat()
if err != nil {
return err
}
}
}
}
// Stop halts the heartbeat goroutine and drains the silence timer.
// Safe to call multiple times; subsequent calls are no-ops.
func (s *heartbeatingStream) Stop() {
s.stopOnce.Do(func() {
close(s.done)
if !s.timer.Stop() {
select {
case <-s.timer.C:
default:
}
}
})
}
// sendHeartbeat emits one heartbeat event, records the outcome in
// telemetry, and re-arms the silence timer. The outcome attribute
// makes a sudden bump of `error` easy to spot in dashboards — it
// usually means the upstream connection is failing before the gateway
// can flush, while a steady `sent` rate is the normal idle baseline
// the deployment operator budgets bandwidth against.
func (s *heartbeatingStream) sendHeartbeat() error {
s.sendMu.Lock()
defer s.sendMu.Unlock()
err := s.ServerStreamingServer.Send(buildHeartbeatEvent())
outcome := attribute.String("outcome", "sent")
if err != nil {
outcome = attribute.String("outcome", "error")
}
s.metrics.RecordPushHeartbeat(context.Background(), outcome)
if err != nil {
return err
}
s.resetTimerLocked()
return nil
}
// resetTimerLocked re-arms the silence timer. Caller must hold sendMu.
// The drain follows the canonical pattern from the time.Timer
// docstring: Stop may report `false` either because the timer already
// fired or because nothing was queued, so the non-blocking drain
// handles both states without deadlocking when the channel was already
// emptied by Run.
func (s *heartbeatingStream) resetTimerLocked() {
if !s.timer.Stop() {
select {
case <-s.timer.C:
default:
}
}
s.timer.Reset(s.interval)
}
// buildHeartbeatEvent returns the minimal `gateway.heartbeat`
// GatewayEvent emitted into the push stream. Every field except
// EventType is left at its proto3 default so the wire frame stays as
// small as Connect framing allows. See `gatewayHeartbeatEventType` for
// the security rationale of leaving the event unsigned.
func buildHeartbeatEvent() *edgev1.GatewayEvent {
return &edgev1.GatewayEvent{EventType: gatewayHeartbeatEventType}
}