Files
galaxy-game/gateway/internal/grpcapi/push_fanout.go
T
Ilia Denisov 8565942392
Build · Site / build (push) Successful in 8s
Tests · Go / test (push) Successful in 2m22s
Tests · UI / test (push) Failing after 2m42s
feat(deploy): single-origin path-based deployment + project site
Serve the whole stack behind one host: site at /, game UI at /game/,
gateway REST at /api + /healthz, Connect at /rpc (prefix stripped by the
edge Caddy). The built artifact is domain-agnostic — the UI talks to the
gateway same-origin via relative URLs, so the same bundle runs under any
host with no rebuild and with CORS disabled.

- Rename the Connect proto service galaxy.gateway.v1.EdgeGateway ->
  edge.v1.Gateway; regenerate Go + TS; public path /rpc/edge.v1.Gateway.
- Move the game UI under base path /game (env BASE_PATH); make the
  manifest, service-worker scope, WASM loader, and all navigation
  base-aware via a withBase helper.
- Relative API + /rpc Connect prefix; Vite dev proxy mirrors the strip.
- Rewrite the edge Caddy (dev + prod) for path-based routing; empty CORS
  allow-lists (same-origin); single host.
- New VitePress project site (site/): i18n en/ru with switcher, LaTeX
  math, minimal monospace theme; built and served at /.
- dev-deploy compose/Makefile + CI (dev-deploy, prod-build, new
  site-build) build and seed the site; probes hit /, /game/, /healthz.
- Sync docs (ARCHITECTURE, gateway README/openapi, dev-deploy &
  local-dev READMEs, CLAUDE.md, ui/PLAN).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-23 18:19:07 +02:00

173 lines
5.5 KiB
Go

package grpcapi
import (
"bytes"
"context"
"crypto/sha256"
"errors"
"galaxy/gateway/authn"
"galaxy/gateway/internal/clock"
"galaxy/gateway/internal/logging"
"galaxy/gateway/internal/push"
"galaxy/gateway/internal/telemetry"
edgev1 "galaxy/gateway/proto/edge/v1"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// NewFanOutPushStreamService builds the edgev1.GatewayServer that serves the
// authenticated SubscribeEvents tail: it registers live streams in hub and
// relays client-facing events once the bootstrap event has gone out.
//
// Optional dependencies fall back to defaults when nil: responseSigner becomes
// unavailableResponseSigner{}, clk becomes clock.System{}, and logger becomes a
// no-op zap logger. The logger is always scoped with the "push_stream" name.
// hub is stored as given; a nil hub is rejected per call by SubscribeEvents.
func NewFanOutPushStreamService(hub *push.Hub, responseSigner authn.ResponseSigner, clk clock.Clock, logger *zap.Logger) edgev1.GatewayServer {
	svc := fanOutPushStreamService{
		hub:            hub,
		responseSigner: responseSigner,
		clock:          clk,
		logger:         logger,
	}

	if svc.responseSigner == nil {
		svc.responseSigner = unavailableResponseSigner{}
	}
	if svc.clock == nil {
		svc.clock = clock.System{}
	}
	if svc.logger == nil {
		svc.logger = zap.NewNop()
	}
	svc.logger = svc.logger.Named("push_stream")

	return svc
}
// fanOutPushStreamService owns the post-bootstrap authenticated push-stream
// lifecycle backed by the in-memory push hub.
//
// Its methods use value receivers, so the struct is copied per call; all
// fields are set once by NewFanOutPushStreamService.
type fanOutPushStreamService struct {
	// Embedded generated base type; presumably supplies the GatewayServer
	// methods this type does not define itself (verify against the generated
	// code).
	edgev1.UnimplementedGatewayServer
	// hub is where each stream is registered; SubscribeEvents rejects the call
	// when it is nil.
	hub *push.Hub
	// responseSigner signs every outgoing event in buildGatewayEvent.
	responseSigner authn.ResponseSigner
	// clock supplies the timestamp stamped on each outgoing event.
	clock clock.Clock
	// logger records stream open/close lifecycle entries.
	logger *zap.Logger
}
// SubscribeEvents registers the verified stream in the push hub and forwards
// matching client-facing events until the stream ends.
//
// The request message is ignored; the stream identity comes from the
// authenticated binding stored in the stream context. The subscription is
// always closed on return.
//
// Returned errors:
//   - codes.Internal when the binding is missing, the hub is nil, or hub
//     registration fails;
//   - the stream context error once the stream context is done;
//   - mapSubscriptionError(subscription.Err()) when the subscription ends,
//     either through Done() or because its events channel was closed;
//   - the error from buildGatewayEvent or stream.Send, unchanged.
func (s fanOutPushStreamService) SubscribeEvents(_ *edgev1.SubscribeEventsRequest, stream grpc.ServerStreamingServer[edgev1.GatewayEvent]) error {
	ctx := stream.Context()

	binding, ok := authenticatedStreamBindingFromContext(ctx)
	if !ok {
		return status.Error(codes.Internal, "authenticated request context is incomplete")
	}
	if s.hub == nil {
		return status.Error(codes.Internal, "push hub is unavailable")
	}

	subscription, err := s.hub.Register(push.StreamBinding{
		UserID:          binding.UserID,
		DeviceSessionID: binding.DeviceSessionID,
	})
	if err != nil {
		return status.Error(codes.Internal, "push stream registration failed")
	}
	defer subscription.Close()

	openFields := []zap.Field{
		zap.String("component", "authenticated_grpc"),
		zap.String("transport", "grpc"),
		zap.String("rpc_method", authenticatedRPCSubscribeEvents),
		zap.String("message_type", binding.MessageType),
		zap.String("request_id", binding.RequestID),
		zap.String("trace_id", binding.TraceID),
		zap.String("device_session_id", binding.DeviceSessionID),
		zap.String("user_id", binding.UserID),
	}
	openFields = append(openFields, logging.TraceFieldsFromContext(ctx)...)
	s.logger.Info("push stream opened", openFields...)

	// endSubscription logs the subscription-driven closure and translates the
	// subscription error into the status returned to the client. It is shared
	// by the Done() path and the closed-events-channel path so both report the
	// same outcome.
	endSubscription := func() error {
		subscriptionErr := subscription.Err()
		outcome := string(mapSubscriptionOutcome(subscriptionErr))
		s.logger.Warn("push stream closed", append(openFields,
			zap.String("edge_outcome", outcome),
			zap.String("reject_reason", outcome),
		)...)
		return mapSubscriptionError(subscriptionErr)
	}

	for {
		select {
		case <-ctx.Done():
			ctxErr := ctx.Err()
			s.logger.Info("push stream closed", append(openFields, zap.String("edge_outcome", string(mapSubscriptionOutcome(ctxErr))))...)
			return ctxErr
		case <-subscription.Done():
			return endSubscription()
		case event, open := <-subscription.Events():
			if !open {
				// A closed channel yields zero-value events forever. Whether
				// the hub ever closes this channel is not visible here, so
				// treat closure as the subscription ending instead of signing
				// and sending empty events to the client.
				return endSubscription()
			}
			signedEvent, err := s.buildGatewayEvent(event)
			if err != nil {
				return err
			}
			if err := stream.Send(signedEvent); err != nil {
				return err
			}
		}
	}
}
// buildGatewayEvent turns one hub event into the signed message sent to the
// client. It stamps the event with the service clock's current time in Unix
// milliseconds, computes the SHA-256 digest of the payload, and has the
// response signer sign the event fields together with that digest.
//
// Any signer failure is reported as codes.Unavailable and no message is
// returned. The payload and digest stored in the message are copies, so the
// message does not alias the hub event's byte slice.
func (s fanOutPushStreamService) buildGatewayEvent(event push.Event) (*edgev1.GatewayEvent, error) {
	nowMS := s.clock.Now().UTC().UnixMilli()
	digest := sha256.Sum256(event.PayloadBytes)

	signingFields := authn.EventSigningFields{
		EventType:   event.EventType,
		EventID:     event.EventID,
		TimestampMS: nowMS,
		RequestID:   event.RequestID,
		TraceID:     event.TraceID,
		PayloadHash: digest[:],
	}
	sig, signErr := s.responseSigner.SignEvent(signingFields)
	if signErr != nil {
		return nil, status.Error(codes.Unavailable, "response signer is unavailable")
	}

	signed := &edgev1.GatewayEvent{
		EventType:    event.EventType,
		EventId:      event.EventID,
		TimestampMs:  nowMS,
		PayloadBytes: bytes.Clone(event.PayloadBytes),
		PayloadHash:  bytes.Clone(digest[:]),
		Signature:    sig,
		RequestId:    event.RequestID,
		TraceId:      event.TraceID,
	}
	return signed, nil
}
// mapSubscriptionError translates the error that ended a push subscription
// into the gRPC status returned to the client. A nil error stays nil; revoked
// sessions map to FailedPrecondition, overflow to ResourceExhausted, hub
// shutdown to Unavailable, and anything else to Internal.
func mapSubscriptionError(err error) error {
	if err == nil {
		return nil
	}
	if errors.Is(err, push.ErrSubscriptionRevoked) {
		return status.Error(codes.FailedPrecondition, "device session is revoked")
	}
	if errors.Is(err, push.ErrSubscriptionOverflow) {
		return status.Error(codes.ResourceExhausted, "push stream overflowed")
	}
	if errors.Is(err, push.ErrHubShuttingDown) {
		return status.Error(codes.Unavailable, "gateway is shutting down")
	}
	return status.Error(codes.Internal, "push stream closed unexpectedly")
}
// mapSubscriptionOutcome classifies the error that ended a push stream as a
// telemetry edge outcome. A nil error, context cancellation, and context
// deadline expiry all count as success; errors not recognised here are
// reported as an internal error.
func mapSubscriptionOutcome(err error) telemetry.EdgeOutcome {
	if err == nil || errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
		return telemetry.EdgeOutcomeSuccess
	}

	switch {
	case errors.Is(err, push.ErrSubscriptionRevoked):
		return telemetry.EdgeOutcomeRevokedSession
	case errors.Is(err, push.ErrSubscriptionOverflow):
		return telemetry.EdgeOutcomeRateLimited
	case errors.Is(err, push.ErrHubShuttingDown):
		return telemetry.EdgeOutcomeGatewayShuttingDown
	}
	return telemetry.EdgeOutcomeInternalError
}
// Compile-time check that the fanOutPushStreamService value type implements
// edgev1.GatewayServer.
var _ edgev1.GatewayServer = fanOutPushStreamService{}