8565942392
Serve the whole stack behind one host: site at /, game UI at /game/, gateway REST at /api + /healthz, Connect at /rpc (prefix stripped by the edge Caddy). The built artifact is domain-agnostic — the UI talks to the gateway same-origin via relative URLs, so the same bundle runs under any host with no rebuild and with CORS disabled. - Rename the Connect proto service galaxy.gateway.v1.EdgeGateway -> edge.v1.Gateway; regenerate Go + TS; public path /rpc/edge.v1.Gateway. - Move the game UI under base path /game (env BASE_PATH); make the manifest, service-worker scope, WASM loader, and all navigation base-aware via a withBase helper. - Relative API + /rpc Connect prefix; Vite dev proxy mirrors the strip. - Rewrite the edge Caddy (dev + prod) for path-based routing; empty CORS allow-lists (same-origin); single host. - New VitePress project site (site/): i18n en/ru with switcher, LaTeX math, minimal monospace theme; built and served at /. - dev-deploy compose/Makefile + CI (dev-deploy, prod-build, new site-build) build and seed the site; probes hit /, /game/, /healthz. - Sync docs (ARCHITECTURE, gateway README/openapi, dev-deploy & local-dev READMEs, CLAUDE.md, ui/PLAN). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
173 lines
5.5 KiB
Go
173 lines
5.5 KiB
Go
package grpcapi
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"crypto/sha256"
|
|
"errors"
|
|
|
|
"galaxy/gateway/authn"
|
|
"galaxy/gateway/internal/clock"
|
|
"galaxy/gateway/internal/logging"
|
|
"galaxy/gateway/internal/push"
|
|
"galaxy/gateway/internal/telemetry"
|
|
edgev1 "galaxy/gateway/proto/edge/v1"
|
|
|
|
"go.uber.org/zap"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/codes"
|
|
"google.golang.org/grpc/status"
|
|
)
|
|
|
|
// NewFanOutPushStreamService constructs the authenticated SubscribeEvents tail
|
|
// service that registers active streams in hub and forwards client-facing
|
|
// events after the bootstrap event has been sent.
|
|
func NewFanOutPushStreamService(hub *push.Hub, responseSigner authn.ResponseSigner, clk clock.Clock, logger *zap.Logger) edgev1.GatewayServer {
|
|
if responseSigner == nil {
|
|
responseSigner = unavailableResponseSigner{}
|
|
}
|
|
if clk == nil {
|
|
clk = clock.System{}
|
|
}
|
|
if logger == nil {
|
|
logger = zap.NewNop()
|
|
}
|
|
|
|
return fanOutPushStreamService{
|
|
hub: hub,
|
|
responseSigner: responseSigner,
|
|
clock: clk,
|
|
logger: logger.Named("push_stream"),
|
|
}
|
|
}
|
|
|
|
// fanOutPushStreamService owns the post-bootstrap authenticated push-stream
|
|
// lifecycle backed by the in-memory push hub.
|
|
type fanOutPushStreamService struct {
|
|
edgev1.UnimplementedGatewayServer
|
|
|
|
hub *push.Hub
|
|
responseSigner authn.ResponseSigner
|
|
clock clock.Clock
|
|
logger *zap.Logger
|
|
}
|
|
|
|
// SubscribeEvents registers the verified stream in the push hub and forwards
|
|
// matching client-facing events until the stream ends.
|
|
func (s fanOutPushStreamService) SubscribeEvents(_ *edgev1.SubscribeEventsRequest, stream grpc.ServerStreamingServer[edgev1.GatewayEvent]) error {
|
|
binding, ok := authenticatedStreamBindingFromContext(stream.Context())
|
|
if !ok {
|
|
return status.Error(codes.Internal, "authenticated request context is incomplete")
|
|
}
|
|
if s.hub == nil {
|
|
return status.Error(codes.Internal, "push hub is unavailable")
|
|
}
|
|
|
|
subscription, err := s.hub.Register(push.StreamBinding{
|
|
UserID: binding.UserID,
|
|
DeviceSessionID: binding.DeviceSessionID,
|
|
})
|
|
if err != nil {
|
|
return status.Error(codes.Internal, "push stream registration failed")
|
|
}
|
|
defer subscription.Close()
|
|
|
|
openFields := []zap.Field{
|
|
zap.String("component", "authenticated_grpc"),
|
|
zap.String("transport", "grpc"),
|
|
zap.String("rpc_method", authenticatedRPCSubscribeEvents),
|
|
zap.String("message_type", binding.MessageType),
|
|
zap.String("request_id", binding.RequestID),
|
|
zap.String("trace_id", binding.TraceID),
|
|
zap.String("device_session_id", binding.DeviceSessionID),
|
|
zap.String("user_id", binding.UserID),
|
|
}
|
|
openFields = append(openFields, logging.TraceFieldsFromContext(stream.Context())...)
|
|
s.logger.Info("push stream opened", openFields...)
|
|
|
|
for {
|
|
select {
|
|
case <-stream.Context().Done():
|
|
s.logger.Info("push stream closed", append(openFields, zap.String("edge_outcome", string(mapSubscriptionOutcome(stream.Context().Err()))))...)
|
|
return stream.Context().Err()
|
|
case <-subscription.Done():
|
|
subscriptionErr := subscription.Err()
|
|
s.logger.Warn("push stream closed", append(openFields,
|
|
zap.String("edge_outcome", string(mapSubscriptionOutcome(subscriptionErr))),
|
|
zap.String("reject_reason", string(mapSubscriptionOutcome(subscriptionErr))),
|
|
)...)
|
|
return mapSubscriptionError(subscriptionErr)
|
|
case event := <-subscription.Events():
|
|
signedEvent, err := s.buildGatewayEvent(event)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if err := stream.Send(signedEvent); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s fanOutPushStreamService) buildGatewayEvent(event push.Event) (*edgev1.GatewayEvent, error) {
|
|
timestampMS := s.clock.Now().UTC().UnixMilli()
|
|
payloadHash := sha256.Sum256(event.PayloadBytes)
|
|
|
|
signature, err := s.responseSigner.SignEvent(authn.EventSigningFields{
|
|
EventType: event.EventType,
|
|
EventID: event.EventID,
|
|
TimestampMS: timestampMS,
|
|
RequestID: event.RequestID,
|
|
TraceID: event.TraceID,
|
|
PayloadHash: payloadHash[:],
|
|
})
|
|
if err != nil {
|
|
return nil, status.Error(codes.Unavailable, "response signer is unavailable")
|
|
}
|
|
|
|
return &edgev1.GatewayEvent{
|
|
EventType: event.EventType,
|
|
EventId: event.EventID,
|
|
TimestampMs: timestampMS,
|
|
PayloadBytes: bytes.Clone(event.PayloadBytes),
|
|
PayloadHash: bytes.Clone(payloadHash[:]),
|
|
Signature: signature,
|
|
RequestId: event.RequestID,
|
|
TraceId: event.TraceID,
|
|
}, nil
|
|
}
|
|
|
|
func mapSubscriptionError(err error) error {
|
|
switch {
|
|
case err == nil:
|
|
return nil
|
|
case errors.Is(err, push.ErrSubscriptionRevoked):
|
|
return status.Error(codes.FailedPrecondition, "device session is revoked")
|
|
case errors.Is(err, push.ErrSubscriptionOverflow):
|
|
return status.Error(codes.ResourceExhausted, "push stream overflowed")
|
|
case errors.Is(err, push.ErrHubShuttingDown):
|
|
return status.Error(codes.Unavailable, "gateway is shutting down")
|
|
default:
|
|
return status.Error(codes.Internal, "push stream closed unexpectedly")
|
|
}
|
|
}
|
|
|
|
func mapSubscriptionOutcome(err error) telemetry.EdgeOutcome {
|
|
switch {
|
|
case err == nil:
|
|
return telemetry.EdgeOutcomeSuccess
|
|
case errors.Is(err, context.Canceled), errors.Is(err, context.DeadlineExceeded):
|
|
return telemetry.EdgeOutcomeSuccess
|
|
case errors.Is(err, push.ErrSubscriptionRevoked):
|
|
return telemetry.EdgeOutcomeRevokedSession
|
|
case errors.Is(err, push.ErrSubscriptionOverflow):
|
|
return telemetry.EdgeOutcomeRateLimited
|
|
case errors.Is(err, push.ErrHubShuttingDown):
|
|
return telemetry.EdgeOutcomeGatewayShuttingDown
|
|
default:
|
|
return telemetry.EdgeOutcomeInternalError
|
|
}
|
|
}
|
|
|
|
var _ edgev1.GatewayServer = fanOutPushStreamService{}
|