feat(deploy): single-origin path-based deployment + project site
Serve the whole stack behind one host: site at /, game UI at /game/, gateway REST at /api + /healthz, Connect at /rpc (prefix stripped by the edge Caddy). The built artifact is domain-agnostic — the UI talks to the gateway same-origin via relative URLs, so the same bundle runs under any host with no rebuild and with CORS disabled. - Rename the Connect proto service galaxy.gateway.v1.EdgeGateway -> edge.v1.Gateway; regenerate Go + TS; public path /rpc/edge.v1.Gateway. - Move the game UI under base path /game (env BASE_PATH); make the manifest, service-worker scope, WASM loader, and all navigation base-aware via a withBase helper. - Relative API + /rpc Connect prefix; Vite dev proxy mirrors the strip. - Rewrite the edge Caddy (dev + prod) for path-based routing; empty CORS allow-lists (same-origin); single host. - New VitePress project site (site/): i18n en/ru with switcher, LaTeX math, minimal monospace theme; built and served at /. - dev-deploy compose/Makefile + CI (dev-deploy, prod-build, new site-build) build and seed the site; probes hit /, /game/, /healthz. - Sync docs (ARCHITECTURE, gateway README/openapi, dev-deploy & local-dev READMEs, CLAUDE.md, ui/PLAN). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -11,7 +11,7 @@ import (
|
||||
"galaxy/gateway/authn"
|
||||
"galaxy/gateway/internal/clock"
|
||||
"galaxy/gateway/internal/downstream"
|
||||
gatewayv1 "galaxy/gateway/proto/galaxy/gateway/v1"
|
||||
edgev1 "galaxy/gateway/proto/edge/v1"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
@@ -21,9 +21,9 @@ import (
|
||||
// commandRoutingService translates the verified authenticated request context
|
||||
// into an internal downstream command and signs successful unary responses.
|
||||
type commandRoutingService struct {
|
||||
gatewayv1.UnimplementedEdgeGatewayServer
|
||||
edgev1.UnimplementedGatewayServer
|
||||
|
||||
subscribeDelegate gatewayv1.EdgeGatewayServer
|
||||
subscribeDelegate edgev1.GatewayServer
|
||||
router downstream.Router
|
||||
responseSigner authn.ResponseSigner
|
||||
clock clock.Clock
|
||||
@@ -32,7 +32,7 @@ type commandRoutingService struct {
|
||||
|
||||
// ExecuteCommand builds a verified downstream command, routes it by exact
|
||||
// message_type, executes it, and signs the resulting unary response.
|
||||
func (s commandRoutingService) ExecuteCommand(ctx context.Context, _ *gatewayv1.ExecuteCommandRequest) (*gatewayv1.ExecuteCommandResponse, error) {
|
||||
func (s commandRoutingService) ExecuteCommand(ctx context.Context, _ *edgev1.ExecuteCommandRequest) (*edgev1.ExecuteCommandResponse, error) {
|
||||
command, err := authenticatedCommandFromContext(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -80,7 +80,7 @@ func (s commandRoutingService) ExecuteCommand(ctx context.Context, _ *gatewayv1.
|
||||
return nil, status.Error(codes.Unavailable, "response signer is unavailable")
|
||||
}
|
||||
|
||||
return &gatewayv1.ExecuteCommandResponse{
|
||||
return &edgev1.ExecuteCommandResponse{
|
||||
ProtocolVersion: command.ProtocolVersion,
|
||||
RequestId: command.RequestID,
|
||||
TimestampMs: responseTimestampMS,
|
||||
@@ -93,13 +93,13 @@ func (s commandRoutingService) ExecuteCommand(ctx context.Context, _ *gatewayv1.
|
||||
|
||||
// SubscribeEvents delegates to the authenticated streaming service
|
||||
// implementation selected during server construction.
|
||||
func (s commandRoutingService) SubscribeEvents(req *gatewayv1.SubscribeEventsRequest, stream grpc.ServerStreamingServer[gatewayv1.GatewayEvent]) error {
|
||||
func (s commandRoutingService) SubscribeEvents(req *edgev1.SubscribeEventsRequest, stream grpc.ServerStreamingServer[edgev1.GatewayEvent]) error {
|
||||
return s.subscribeDelegate.SubscribeEvents(req, stream)
|
||||
}
|
||||
|
||||
// newCommandRoutingService constructs the final authenticated service that
|
||||
// owns verified unary routing while preserving the delegated streaming path.
|
||||
func newCommandRoutingService(subscribeDelegate gatewayv1.EdgeGatewayServer, router downstream.Router, responseSigner authn.ResponseSigner, clk clock.Clock, downstreamTimeout time.Duration) gatewayv1.EdgeGatewayServer {
|
||||
func newCommandRoutingService(subscribeDelegate edgev1.GatewayServer, router downstream.Router, responseSigner authn.ResponseSigner, clk clock.Clock, downstreamTimeout time.Duration) edgev1.GatewayServer {
|
||||
return commandRoutingService{
|
||||
subscribeDelegate: subscribeDelegate,
|
||||
router: router,
|
||||
@@ -142,4 +142,4 @@ func (unavailableResponseSigner) SignEvent(authn.EventSigningFields) ([]byte, er
|
||||
return nil, errors.New("response signer is unavailable")
|
||||
}
|
||||
|
||||
var _ gatewayv1.EdgeGatewayServer = commandRoutingService{}
|
||||
var _ edgev1.GatewayServer = commandRoutingService{}
|
||||
|
||||
@@ -5,8 +5,8 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
gatewayv1 "galaxy/gateway/proto/galaxy/gateway/v1"
|
||||
"galaxy/gateway/proto/galaxy/gateway/v1/gatewayv1connect"
|
||||
edgev1 "galaxy/gateway/proto/edge/v1"
|
||||
"galaxy/gateway/proto/edge/v1/edgev1connect"
|
||||
|
||||
"connectrpc.com/connect"
|
||||
"google.golang.org/grpc/codes"
|
||||
@@ -17,15 +17,15 @@ import (
|
||||
// connectEdgeAdapter exposes the existing gRPC-shaped authenticated edge
|
||||
// service decorator stack (envelope → session → payload-hash → signature →
|
||||
// freshness/replay → rate-limit → routing/push) through the
|
||||
// gatewayv1connect.EdgeGatewayHandler interface. It owns no logic of its
|
||||
// edgev1connect.GatewayHandler interface. It owns no logic of its
|
||||
// own; the underlying decorator stack carries the full ingress contract
|
||||
// unchanged.
|
||||
type connectEdgeAdapter struct {
|
||||
impl gatewayv1.EdgeGatewayServer
|
||||
impl edgev1.GatewayServer
|
||||
}
|
||||
|
||||
// newConnectEdgeAdapter wraps impl as a Connect handler.
|
||||
func newConnectEdgeAdapter(impl gatewayv1.EdgeGatewayServer) gatewayv1connect.EdgeGatewayHandler {
|
||||
func newConnectEdgeAdapter(impl edgev1.GatewayServer) edgev1connect.GatewayHandler {
|
||||
return &connectEdgeAdapter{impl: impl}
|
||||
}
|
||||
|
||||
@@ -33,7 +33,7 @@ func newConnectEdgeAdapter(impl gatewayv1.EdgeGatewayServer) gatewayv1connect.Ed
|
||||
// service, and wraps the typed response. gRPC `status.Error` values
|
||||
// returned by the decorator stack are translated to *connect.Error so
|
||||
// the Connect client receives the matching code and message.
|
||||
func (a *connectEdgeAdapter) ExecuteCommand(ctx context.Context, req *connect.Request[gatewayv1.ExecuteCommandRequest]) (*connect.Response[gatewayv1.ExecuteCommandResponse], error) {
|
||||
func (a *connectEdgeAdapter) ExecuteCommand(ctx context.Context, req *connect.Request[edgev1.ExecuteCommandRequest]) (*connect.Response[edgev1.ExecuteCommandResponse], error) {
|
||||
resp, err := a.impl.ExecuteCommand(ctx, req.Msg)
|
||||
if err != nil {
|
||||
return nil, translateGRPCStatusError(err)
|
||||
@@ -48,7 +48,7 @@ func (a *connectEdgeAdapter) ExecuteCommand(ctx context.Context, req *connect.Re
|
||||
// stream; the remaining grpc.ServerStream surface is satisfied by no-op
|
||||
// shims so the interface contract is met without panicking. Errors
|
||||
// returned by the decorator stack are translated to *connect.Error.
|
||||
func (a *connectEdgeAdapter) SubscribeEvents(ctx context.Context, req *connect.Request[gatewayv1.SubscribeEventsRequest], stream *connect.ServerStream[gatewayv1.GatewayEvent]) error {
|
||||
func (a *connectEdgeAdapter) SubscribeEvents(ctx context.Context, req *connect.Request[edgev1.SubscribeEventsRequest], stream *connect.ServerStream[edgev1.GatewayEvent]) error {
|
||||
wrapped := &connectEdgeStream{ctx: ctx, stream: stream}
|
||||
if err := a.impl.SubscribeEvents(req.Msg, wrapped); err != nil {
|
||||
return translateGRPCStatusError(err)
|
||||
@@ -83,19 +83,19 @@ func translateGRPCStatusError(err error) error {
|
||||
return connect.NewError(connect.Code(grpcStatus.Code()), errors.New(grpcStatus.Message()))
|
||||
}
|
||||
|
||||
// connectEdgeStream satisfies grpc.ServerStreamingServer[gatewayv1.GatewayEvent]
|
||||
// connectEdgeStream satisfies grpc.ServerStreamingServer[edgev1.GatewayEvent]
|
||||
// on top of *connect.ServerStream. The decorator stack reads the request
|
||||
// context and pushes outbound events through Send; the rest of the
|
||||
// grpc.ServerStream surface is not exercised in the gateway, so the no-op
|
||||
// implementations preserve the type contract without surprising behaviour.
|
||||
type connectEdgeStream struct {
|
||||
ctx context.Context
|
||||
stream *connect.ServerStream[gatewayv1.GatewayEvent]
|
||||
stream *connect.ServerStream[edgev1.GatewayEvent]
|
||||
}
|
||||
|
||||
// Send forwards a typed gateway event through the underlying Connect server
|
||||
// stream.
|
||||
func (s *connectEdgeStream) Send(event *gatewayv1.GatewayEvent) error {
|
||||
func (s *connectEdgeStream) Send(event *edgev1.GatewayEvent) error {
|
||||
return s.stream.Send(event)
|
||||
}
|
||||
|
||||
@@ -127,7 +127,7 @@ func (s *connectEdgeStream) SetTrailer(metadata.MD) {}
|
||||
// SendMsg directly; if a future caller does, the typed Send path is used
|
||||
// when the message is a GatewayEvent.
|
||||
func (s *connectEdgeStream) SendMsg(m any) error {
|
||||
event, ok := m.(*gatewayv1.GatewayEvent)
|
||||
event, ok := m.(*edgev1.GatewayEvent)
|
||||
if !ok {
|
||||
return fmt.Errorf("connectEdgeStream.SendMsg: unsupported message type %T", m)
|
||||
}
|
||||
|
||||
@@ -4,7 +4,7 @@ import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
gatewayv1 "galaxy/gateway/proto/galaxy/gateway/v1"
|
||||
edgev1 "galaxy/gateway/proto/edge/v1"
|
||||
|
||||
"buf.build/go/protovalidate"
|
||||
"google.golang.org/grpc"
|
||||
@@ -47,14 +47,14 @@ func parsedEnvelopeFromContext(ctx context.Context) (parsedEnvelope, bool) {
|
||||
// envelopeValidatingService applies envelope parsing and the protocol gate
|
||||
// before delegating to the configured service implementation.
|
||||
type envelopeValidatingService struct {
|
||||
gatewayv1.UnimplementedEdgeGatewayServer
|
||||
edgev1.UnimplementedGatewayServer
|
||||
|
||||
delegate gatewayv1.EdgeGatewayServer
|
||||
delegate edgev1.GatewayServer
|
||||
}
|
||||
|
||||
// ExecuteCommand validates req and only then forwards it to the configured
|
||||
// delegate with the parsed envelope attached to ctx.
|
||||
func (s envelopeValidatingService) ExecuteCommand(ctx context.Context, req *gatewayv1.ExecuteCommandRequest) (*gatewayv1.ExecuteCommandResponse, error) {
|
||||
func (s envelopeValidatingService) ExecuteCommand(ctx context.Context, req *edgev1.ExecuteCommandRequest) (*edgev1.ExecuteCommandResponse, error) {
|
||||
envelope, err := parseExecuteCommandRequest(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -65,7 +65,7 @@ func (s envelopeValidatingService) ExecuteCommand(ctx context.Context, req *gate
|
||||
|
||||
// SubscribeEvents validates req and only then forwards it to the configured
|
||||
// delegate with the parsed envelope attached to the stream context.
|
||||
func (s envelopeValidatingService) SubscribeEvents(req *gatewayv1.SubscribeEventsRequest, stream grpc.ServerStreamingServer[gatewayv1.GatewayEvent]) error {
|
||||
func (s envelopeValidatingService) SubscribeEvents(req *edgev1.SubscribeEventsRequest, stream grpc.ServerStreamingServer[edgev1.GatewayEvent]) error {
|
||||
envelope, err := parseSubscribeEventsRequest(req)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -79,7 +79,7 @@ func (s envelopeValidatingService) SubscribeEvents(req *gatewayv1.SubscribeEvent
|
||||
|
||||
// parseExecuteCommandRequest validates req according to the request-envelope
|
||||
// rules and returns a cloned parsed envelope suitable for later auth steps.
|
||||
func parseExecuteCommandRequest(req *gatewayv1.ExecuteCommandRequest) (parsedEnvelope, error) {
|
||||
func parseExecuteCommandRequest(req *edgev1.ExecuteCommandRequest) (parsedEnvelope, error) {
|
||||
if req == nil {
|
||||
return parsedEnvelope{}, newMalformedEnvelopeError("request envelope must not be nil")
|
||||
}
|
||||
@@ -105,7 +105,7 @@ func parseExecuteCommandRequest(req *gatewayv1.ExecuteCommandRequest) (parsedEnv
|
||||
|
||||
// parseSubscribeEventsRequest validates req according to the request-envelope
|
||||
// rules and returns a cloned parsed envelope suitable for later auth steps.
|
||||
func parseSubscribeEventsRequest(req *gatewayv1.SubscribeEventsRequest) (parsedEnvelope, error) {
|
||||
func parseSubscribeEventsRequest(req *edgev1.SubscribeEventsRequest) (parsedEnvelope, error) {
|
||||
if req == nil {
|
||||
return parsedEnvelope{}, newMalformedEnvelopeError("request envelope must not be nil")
|
||||
}
|
||||
@@ -131,13 +131,13 @@ func parseSubscribeEventsRequest(req *gatewayv1.SubscribeEventsRequest) (parsedE
|
||||
|
||||
// newEnvelopeValidatingService wraps delegate with the envelope-validation
|
||||
// gate.
|
||||
func newEnvelopeValidatingService(delegate gatewayv1.EdgeGatewayServer) gatewayv1.EdgeGatewayServer {
|
||||
func newEnvelopeValidatingService(delegate edgev1.GatewayServer) edgev1.GatewayServer {
|
||||
return envelopeValidatingService{delegate: delegate}
|
||||
}
|
||||
|
||||
// canonicalExecuteCommandValidationError maps any ExecuteCommand validation
|
||||
// failure into the stable canonical error chosen by field order.
|
||||
func canonicalExecuteCommandValidationError(req *gatewayv1.ExecuteCommandRequest) error {
|
||||
func canonicalExecuteCommandValidationError(req *edgev1.ExecuteCommandRequest) error {
|
||||
switch {
|
||||
case req.GetProtocolVersion() == "":
|
||||
return newMalformedEnvelopeError("protocol_version must not be empty")
|
||||
@@ -162,7 +162,7 @@ func canonicalExecuteCommandValidationError(req *gatewayv1.ExecuteCommandRequest
|
||||
|
||||
// canonicalSubscribeEventsValidationError maps any SubscribeEvents validation
|
||||
// failure into the stable canonical error chosen by field order.
|
||||
func canonicalSubscribeEventsValidationError(req *gatewayv1.SubscribeEventsRequest) error {
|
||||
func canonicalSubscribeEventsValidationError(req *edgev1.SubscribeEventsRequest) error {
|
||||
switch {
|
||||
case req.GetProtocolVersion() == "":
|
||||
return newMalformedEnvelopeError("protocol_version must not be empty")
|
||||
@@ -198,7 +198,7 @@ func newUnsupportedProtocolVersionError(version string) error {
|
||||
type parsedEnvelopeContextKey struct{}
|
||||
|
||||
type envelopeContextStream struct {
|
||||
grpc.ServerStreamingServer[gatewayv1.GatewayEvent]
|
||||
grpc.ServerStreamingServer[edgev1.GatewayEvent]
|
||||
ctx context.Context
|
||||
}
|
||||
|
||||
@@ -210,4 +210,4 @@ func (s envelopeContextStream) Context() context.Context {
|
||||
return s.ctx
|
||||
}
|
||||
|
||||
var _ gatewayv1.EdgeGatewayServer = envelopeValidatingService{}
|
||||
var _ edgev1.GatewayServer = envelopeValidatingService{}
|
||||
|
||||
@@ -4,7 +4,7 @@ import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
gatewayv1 "galaxy/gateway/proto/galaxy/gateway/v1"
|
||||
edgev1 "galaxy/gateway/proto/edge/v1"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
@@ -19,10 +19,10 @@ func TestParseExecuteCommandRequest(t *testing.T) {
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
mutate func(*gatewayv1.ExecuteCommandRequest)
|
||||
mutate func(*edgev1.ExecuteCommandRequest)
|
||||
wantCode codes.Code
|
||||
wantMessage string
|
||||
assertValid func(*testing.T, *gatewayv1.ExecuteCommandRequest, parsedEnvelope)
|
||||
assertValid func(*testing.T, *edgev1.ExecuteCommandRequest, parsedEnvelope)
|
||||
}{
|
||||
{
|
||||
name: "nil request",
|
||||
@@ -31,7 +31,7 @@ func TestParseExecuteCommandRequest(t *testing.T) {
|
||||
},
|
||||
{
|
||||
name: "empty protocol version",
|
||||
mutate: func(req *gatewayv1.ExecuteCommandRequest) {
|
||||
mutate: func(req *edgev1.ExecuteCommandRequest) {
|
||||
req.ProtocolVersion = ""
|
||||
},
|
||||
wantCode: codes.InvalidArgument,
|
||||
@@ -39,7 +39,7 @@ func TestParseExecuteCommandRequest(t *testing.T) {
|
||||
},
|
||||
{
|
||||
name: "empty device session id",
|
||||
mutate: func(req *gatewayv1.ExecuteCommandRequest) {
|
||||
mutate: func(req *edgev1.ExecuteCommandRequest) {
|
||||
req.DeviceSessionId = ""
|
||||
},
|
||||
wantCode: codes.InvalidArgument,
|
||||
@@ -47,7 +47,7 @@ func TestParseExecuteCommandRequest(t *testing.T) {
|
||||
},
|
||||
{
|
||||
name: "empty message type",
|
||||
mutate: func(req *gatewayv1.ExecuteCommandRequest) {
|
||||
mutate: func(req *edgev1.ExecuteCommandRequest) {
|
||||
req.MessageType = ""
|
||||
},
|
||||
wantCode: codes.InvalidArgument,
|
||||
@@ -55,7 +55,7 @@ func TestParseExecuteCommandRequest(t *testing.T) {
|
||||
},
|
||||
{
|
||||
name: "zero timestamp",
|
||||
mutate: func(req *gatewayv1.ExecuteCommandRequest) {
|
||||
mutate: func(req *edgev1.ExecuteCommandRequest) {
|
||||
req.TimestampMs = 0
|
||||
},
|
||||
wantCode: codes.InvalidArgument,
|
||||
@@ -63,7 +63,7 @@ func TestParseExecuteCommandRequest(t *testing.T) {
|
||||
},
|
||||
{
|
||||
name: "empty request id",
|
||||
mutate: func(req *gatewayv1.ExecuteCommandRequest) {
|
||||
mutate: func(req *edgev1.ExecuteCommandRequest) {
|
||||
req.RequestId = ""
|
||||
},
|
||||
wantCode: codes.InvalidArgument,
|
||||
@@ -71,7 +71,7 @@ func TestParseExecuteCommandRequest(t *testing.T) {
|
||||
},
|
||||
{
|
||||
name: "empty payload bytes",
|
||||
mutate: func(req *gatewayv1.ExecuteCommandRequest) {
|
||||
mutate: func(req *edgev1.ExecuteCommandRequest) {
|
||||
req.PayloadBytes = nil
|
||||
},
|
||||
wantCode: codes.InvalidArgument,
|
||||
@@ -79,7 +79,7 @@ func TestParseExecuteCommandRequest(t *testing.T) {
|
||||
},
|
||||
{
|
||||
name: "empty payload hash",
|
||||
mutate: func(req *gatewayv1.ExecuteCommandRequest) {
|
||||
mutate: func(req *edgev1.ExecuteCommandRequest) {
|
||||
req.PayloadHash = nil
|
||||
},
|
||||
wantCode: codes.InvalidArgument,
|
||||
@@ -87,7 +87,7 @@ func TestParseExecuteCommandRequest(t *testing.T) {
|
||||
},
|
||||
{
|
||||
name: "empty signature",
|
||||
mutate: func(req *gatewayv1.ExecuteCommandRequest) {
|
||||
mutate: func(req *edgev1.ExecuteCommandRequest) {
|
||||
req.Signature = nil
|
||||
},
|
||||
wantCode: codes.InvalidArgument,
|
||||
@@ -95,7 +95,7 @@ func TestParseExecuteCommandRequest(t *testing.T) {
|
||||
},
|
||||
{
|
||||
name: "unsupported protocol version",
|
||||
mutate: func(req *gatewayv1.ExecuteCommandRequest) {
|
||||
mutate: func(req *edgev1.ExecuteCommandRequest) {
|
||||
req.ProtocolVersion = "v2"
|
||||
},
|
||||
wantCode: codes.FailedPrecondition,
|
||||
@@ -104,7 +104,7 @@ func TestParseExecuteCommandRequest(t *testing.T) {
|
||||
{
|
||||
name: "valid request",
|
||||
wantCode: codes.OK,
|
||||
assertValid: func(t *testing.T, req *gatewayv1.ExecuteCommandRequest, envelope parsedEnvelope) {
|
||||
assertValid: func(t *testing.T, req *edgev1.ExecuteCommandRequest, envelope parsedEnvelope) {
|
||||
t.Helper()
|
||||
|
||||
assert.Equal(t, supportedProtocolVersion, envelope.ProtocolVersion)
|
||||
@@ -138,7 +138,7 @@ func TestParseExecuteCommandRequest(t *testing.T) {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
var req *gatewayv1.ExecuteCommandRequest
|
||||
var req *edgev1.ExecuteCommandRequest
|
||||
if tt.name != "nil request" {
|
||||
req = newValidExecuteCommandRequest()
|
||||
if tt.mutate != nil {
|
||||
@@ -166,10 +166,10 @@ func TestParseSubscribeEventsRequest(t *testing.T) {
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
mutate func(*gatewayv1.SubscribeEventsRequest)
|
||||
mutate func(*edgev1.SubscribeEventsRequest)
|
||||
wantCode codes.Code
|
||||
wantMessage string
|
||||
assertValid func(*testing.T, *gatewayv1.SubscribeEventsRequest, parsedEnvelope)
|
||||
assertValid func(*testing.T, *edgev1.SubscribeEventsRequest, parsedEnvelope)
|
||||
}{
|
||||
{
|
||||
name: "nil request",
|
||||
@@ -178,7 +178,7 @@ func TestParseSubscribeEventsRequest(t *testing.T) {
|
||||
},
|
||||
{
|
||||
name: "empty protocol version",
|
||||
mutate: func(req *gatewayv1.SubscribeEventsRequest) {
|
||||
mutate: func(req *edgev1.SubscribeEventsRequest) {
|
||||
req.ProtocolVersion = ""
|
||||
},
|
||||
wantCode: codes.InvalidArgument,
|
||||
@@ -186,7 +186,7 @@ func TestParseSubscribeEventsRequest(t *testing.T) {
|
||||
},
|
||||
{
|
||||
name: "empty device session id",
|
||||
mutate: func(req *gatewayv1.SubscribeEventsRequest) {
|
||||
mutate: func(req *edgev1.SubscribeEventsRequest) {
|
||||
req.DeviceSessionId = ""
|
||||
},
|
||||
wantCode: codes.InvalidArgument,
|
||||
@@ -194,7 +194,7 @@ func TestParseSubscribeEventsRequest(t *testing.T) {
|
||||
},
|
||||
{
|
||||
name: "empty message type",
|
||||
mutate: func(req *gatewayv1.SubscribeEventsRequest) {
|
||||
mutate: func(req *edgev1.SubscribeEventsRequest) {
|
||||
req.MessageType = ""
|
||||
},
|
||||
wantCode: codes.InvalidArgument,
|
||||
@@ -202,7 +202,7 @@ func TestParseSubscribeEventsRequest(t *testing.T) {
|
||||
},
|
||||
{
|
||||
name: "zero timestamp",
|
||||
mutate: func(req *gatewayv1.SubscribeEventsRequest) {
|
||||
mutate: func(req *edgev1.SubscribeEventsRequest) {
|
||||
req.TimestampMs = 0
|
||||
},
|
||||
wantCode: codes.InvalidArgument,
|
||||
@@ -210,7 +210,7 @@ func TestParseSubscribeEventsRequest(t *testing.T) {
|
||||
},
|
||||
{
|
||||
name: "empty request id",
|
||||
mutate: func(req *gatewayv1.SubscribeEventsRequest) {
|
||||
mutate: func(req *edgev1.SubscribeEventsRequest) {
|
||||
req.RequestId = ""
|
||||
},
|
||||
wantCode: codes.InvalidArgument,
|
||||
@@ -218,7 +218,7 @@ func TestParseSubscribeEventsRequest(t *testing.T) {
|
||||
},
|
||||
{
|
||||
name: "empty payload hash",
|
||||
mutate: func(req *gatewayv1.SubscribeEventsRequest) {
|
||||
mutate: func(req *edgev1.SubscribeEventsRequest) {
|
||||
req.PayloadHash = nil
|
||||
},
|
||||
wantCode: codes.InvalidArgument,
|
||||
@@ -226,7 +226,7 @@ func TestParseSubscribeEventsRequest(t *testing.T) {
|
||||
},
|
||||
{
|
||||
name: "empty signature",
|
||||
mutate: func(req *gatewayv1.SubscribeEventsRequest) {
|
||||
mutate: func(req *edgev1.SubscribeEventsRequest) {
|
||||
req.Signature = nil
|
||||
},
|
||||
wantCode: codes.InvalidArgument,
|
||||
@@ -234,7 +234,7 @@ func TestParseSubscribeEventsRequest(t *testing.T) {
|
||||
},
|
||||
{
|
||||
name: "unsupported protocol version",
|
||||
mutate: func(req *gatewayv1.SubscribeEventsRequest) {
|
||||
mutate: func(req *edgev1.SubscribeEventsRequest) {
|
||||
req.ProtocolVersion = "v2"
|
||||
},
|
||||
wantCode: codes.FailedPrecondition,
|
||||
@@ -243,7 +243,7 @@ func TestParseSubscribeEventsRequest(t *testing.T) {
|
||||
{
|
||||
name: "valid request with empty payload bytes",
|
||||
wantCode: codes.OK,
|
||||
assertValid: func(t *testing.T, req *gatewayv1.SubscribeEventsRequest, envelope parsedEnvelope) {
|
||||
assertValid: func(t *testing.T, req *edgev1.SubscribeEventsRequest, envelope parsedEnvelope) {
|
||||
t.Helper()
|
||||
|
||||
assert.Empty(t, req.GetPayloadBytes())
|
||||
@@ -260,7 +260,7 @@ func TestParseSubscribeEventsRequest(t *testing.T) {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
var req *gatewayv1.SubscribeEventsRequest
|
||||
var req *edgev1.SubscribeEventsRequest
|
||||
if tt.name != "nil request" {
|
||||
req = newValidSubscribeEventsRequest()
|
||||
if tt.mutate != nil {
|
||||
@@ -286,10 +286,10 @@ func TestParseSubscribeEventsRequest(t *testing.T) {
|
||||
func TestEnvelopeValidatingServiceExecuteCommandRejectsInvalidRequestBeforeDelegate(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
delegate := &recordingEdgeGatewayService{}
|
||||
delegate := &recordingGatewayService{}
|
||||
service := newEnvelopeValidatingService(delegate)
|
||||
|
||||
_, err := service.ExecuteCommand(context.Background(), &gatewayv1.ExecuteCommandRequest{})
|
||||
_, err := service.ExecuteCommand(context.Background(), &edgev1.ExecuteCommandRequest{})
|
||||
require.Error(t, err)
|
||||
|
||||
assert.Equal(t, codes.InvalidArgument, status.Code(err))
|
||||
@@ -299,10 +299,10 @@ func TestEnvelopeValidatingServiceExecuteCommandRejectsInvalidRequestBeforeDeleg
|
||||
func TestEnvelopeValidatingServiceSubscribeEventsRejectsInvalidRequestBeforeDelegate(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
delegate := &recordingEdgeGatewayService{}
|
||||
delegate := &recordingGatewayService{}
|
||||
service := newEnvelopeValidatingService(delegate)
|
||||
|
||||
err := service.SubscribeEvents(&gatewayv1.SubscribeEventsRequest{}, stubGatewayEventStream{})
|
||||
err := service.SubscribeEvents(&edgev1.SubscribeEventsRequest{}, stubGatewayEventStream{})
|
||||
require.Error(t, err)
|
||||
|
||||
assert.Equal(t, codes.InvalidArgument, status.Code(err))
|
||||
@@ -313,15 +313,15 @@ func TestEnvelopeValidatingServiceExecuteCommandAttachesParsedEnvelope(t *testin
|
||||
t.Parallel()
|
||||
|
||||
want := newValidExecuteCommandRequest()
|
||||
delegate := &recordingEdgeGatewayService{
|
||||
executeCommandFunc: func(ctx context.Context, req *gatewayv1.ExecuteCommandRequest) (*gatewayv1.ExecuteCommandResponse, error) {
|
||||
delegate := &recordingGatewayService{
|
||||
executeCommandFunc: func(ctx context.Context, req *edgev1.ExecuteCommandRequest) (*edgev1.ExecuteCommandResponse, error) {
|
||||
envelope, ok := parsedEnvelopeFromContext(ctx)
|
||||
require.True(t, ok)
|
||||
assert.Equal(t, want.GetRequestId(), envelope.RequestID)
|
||||
assert.Equal(t, want.GetDeviceSessionId(), envelope.DeviceSessionID)
|
||||
assert.Equal(t, want.GetMessageType(), envelope.MessageType)
|
||||
assert.Equal(t, want.GetPayloadBytes(), envelope.PayloadBytes)
|
||||
return &gatewayv1.ExecuteCommandResponse{RequestId: req.GetRequestId()}, nil
|
||||
return &edgev1.ExecuteCommandResponse{RequestId: req.GetRequestId()}, nil
|
||||
},
|
||||
}
|
||||
service := newEnvelopeValidatingService(delegate)
|
||||
@@ -337,8 +337,8 @@ func TestEnvelopeValidatingServiceSubscribeEventsAttachesParsedEnvelope(t *testi
|
||||
t.Parallel()
|
||||
|
||||
want := newValidSubscribeEventsRequest()
|
||||
delegate := &recordingEdgeGatewayService{
|
||||
subscribeEventsFunc: func(req *gatewayv1.SubscribeEventsRequest, stream grpc.ServerStreamingServer[gatewayv1.GatewayEvent]) error {
|
||||
delegate := &recordingGatewayService{
|
||||
subscribeEventsFunc: func(req *edgev1.SubscribeEventsRequest, stream grpc.ServerStreamingServer[edgev1.GatewayEvent]) error {
|
||||
envelope, ok := parsedEnvelopeFromContext(stream.Context())
|
||||
require.True(t, ok)
|
||||
assert.Equal(t, want.GetRequestId(), envelope.RequestID)
|
||||
@@ -357,25 +357,25 @@ func TestEnvelopeValidatingServiceSubscribeEventsAttachesParsedEnvelope(t *testi
|
||||
assert.Equal(t, 1, delegate.subscribeCalls)
|
||||
}
|
||||
|
||||
type recordingEdgeGatewayService struct {
|
||||
gatewayv1.UnimplementedEdgeGatewayServer
|
||||
type recordingGatewayService struct {
|
||||
edgev1.UnimplementedGatewayServer
|
||||
|
||||
executeCalls int
|
||||
subscribeCalls int
|
||||
executeCommandFunc func(context.Context, *gatewayv1.ExecuteCommandRequest) (*gatewayv1.ExecuteCommandResponse, error)
|
||||
subscribeEventsFunc func(*gatewayv1.SubscribeEventsRequest, grpc.ServerStreamingServer[gatewayv1.GatewayEvent]) error
|
||||
executeCommandFunc func(context.Context, *edgev1.ExecuteCommandRequest) (*edgev1.ExecuteCommandResponse, error)
|
||||
subscribeEventsFunc func(*edgev1.SubscribeEventsRequest, grpc.ServerStreamingServer[edgev1.GatewayEvent]) error
|
||||
}
|
||||
|
||||
func (s *recordingEdgeGatewayService) ExecuteCommand(ctx context.Context, req *gatewayv1.ExecuteCommandRequest) (*gatewayv1.ExecuteCommandResponse, error) {
|
||||
func (s *recordingGatewayService) ExecuteCommand(ctx context.Context, req *edgev1.ExecuteCommandRequest) (*edgev1.ExecuteCommandResponse, error) {
|
||||
s.executeCalls++
|
||||
if s.executeCommandFunc != nil {
|
||||
return s.executeCommandFunc(ctx, req)
|
||||
}
|
||||
|
||||
return &gatewayv1.ExecuteCommandResponse{}, nil
|
||||
return &edgev1.ExecuteCommandResponse{}, nil
|
||||
}
|
||||
|
||||
func (s *recordingEdgeGatewayService) SubscribeEvents(req *gatewayv1.SubscribeEventsRequest, stream grpc.ServerStreamingServer[gatewayv1.GatewayEvent]) error {
|
||||
func (s *recordingGatewayService) SubscribeEvents(req *edgev1.SubscribeEventsRequest, stream grpc.ServerStreamingServer[edgev1.GatewayEvent]) error {
|
||||
s.subscribeCalls++
|
||||
if s.subscribeEventsFunc != nil {
|
||||
return s.subscribeEventsFunc(req, stream)
|
||||
@@ -389,7 +389,7 @@ type stubGatewayEventStream struct {
|
||||
ctx context.Context
|
||||
}
|
||||
|
||||
func (s stubGatewayEventStream) Send(*gatewayv1.GatewayEvent) error {
|
||||
func (s stubGatewayEventStream) Send(*edgev1.GatewayEvent) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -7,7 +7,7 @@ import (
|
||||
|
||||
"galaxy/gateway/internal/clock"
|
||||
"galaxy/gateway/internal/replay"
|
||||
gatewayv1 "galaxy/gateway/proto/galaxy/gateway/v1"
|
||||
edgev1 "galaxy/gateway/proto/edge/v1"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
@@ -19,9 +19,9 @@ const minimumReplayReservationTTL = time.Millisecond
|
||||
// freshnessAndReplayService applies freshness and anti-replay checks after
|
||||
// client-signature verification and before later policy or routing steps run.
|
||||
type freshnessAndReplayService struct {
|
||||
gatewayv1.UnimplementedEdgeGatewayServer
|
||||
edgev1.UnimplementedGatewayServer
|
||||
|
||||
delegate gatewayv1.EdgeGatewayServer
|
||||
delegate edgev1.GatewayServer
|
||||
clock clock.Clock
|
||||
replayStore replay.Store
|
||||
freshnessWindow time.Duration
|
||||
@@ -29,7 +29,7 @@ type freshnessAndReplayService struct {
|
||||
|
||||
// ExecuteCommand verifies request freshness and replay protection before
|
||||
// delegating to the configured service implementation.
|
||||
func (s freshnessAndReplayService) ExecuteCommand(ctx context.Context, req *gatewayv1.ExecuteCommandRequest) (*gatewayv1.ExecuteCommandResponse, error) {
|
||||
func (s freshnessAndReplayService) ExecuteCommand(ctx context.Context, req *edgev1.ExecuteCommandRequest) (*edgev1.ExecuteCommandResponse, error) {
|
||||
if err := s.verifyFreshnessAndReplay(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -39,7 +39,7 @@ func (s freshnessAndReplayService) ExecuteCommand(ctx context.Context, req *gate
|
||||
|
||||
// SubscribeEvents verifies request freshness and replay protection before
|
||||
// delegating to the configured service implementation.
|
||||
func (s freshnessAndReplayService) SubscribeEvents(req *gatewayv1.SubscribeEventsRequest, stream grpc.ServerStreamingServer[gatewayv1.GatewayEvent]) error {
|
||||
func (s freshnessAndReplayService) SubscribeEvents(req *edgev1.SubscribeEventsRequest, stream grpc.ServerStreamingServer[edgev1.GatewayEvent]) error {
|
||||
if err := s.verifyFreshnessAndReplay(stream.Context()); err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -49,7 +49,7 @@ func (s freshnessAndReplayService) SubscribeEvents(req *gatewayv1.SubscribeEvent
|
||||
|
||||
// newFreshnessAndReplayService wraps delegate with the freshness and replay
|
||||
// gate.
|
||||
func newFreshnessAndReplayService(delegate gatewayv1.EdgeGatewayServer, clk clock.Clock, replayStore replay.Store, freshnessWindow time.Duration) gatewayv1.EdgeGatewayServer {
|
||||
func newFreshnessAndReplayService(delegate edgev1.GatewayServer, clk clock.Clock, replayStore replay.Store, freshnessWindow time.Duration) edgev1.GatewayServer {
|
||||
return freshnessAndReplayService{
|
||||
delegate: delegate,
|
||||
clock: clk,
|
||||
@@ -92,4 +92,4 @@ func (unavailableReplayStore) Reserve(context.Context, string, string, time.Dura
|
||||
return errors.New("replay store is unavailable")
|
||||
}
|
||||
|
||||
var _ gatewayv1.EdgeGatewayServer = freshnessAndReplayService{}
|
||||
var _ edgev1.GatewayServer = freshnessAndReplayService{}
|
||||
|
||||
@@ -9,7 +9,7 @@ import (
|
||||
|
||||
"galaxy/gateway/internal/replay"
|
||||
"galaxy/gateway/internal/session"
|
||||
gatewayv1 "galaxy/gateway/proto/galaxy/gateway/v1"
|
||||
edgev1 "galaxy/gateway/proto/edge/v1"
|
||||
|
||||
"connectrpc.com/connect"
|
||||
"github.com/stretchr/testify/assert"
|
||||
@@ -40,7 +40,7 @@ func TestExecuteCommandRejectsStaleTimestamp(t *testing.T) {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
delegate := &recordingEdgeGatewayService{}
|
||||
delegate := &recordingGatewayService{}
|
||||
server, runGateway := newTestGateway(t, ServerDependencies{
|
||||
Service: delegate,
|
||||
SessionCache: staticSessionCache{lookupFunc: func(context.Context, string) (session.Record, error) { return newActiveSessionRecord(), nil }},
|
||||
@@ -82,7 +82,7 @@ func TestSubscribeEventsRejectsStaleTimestamp(t *testing.T) {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
delegate := &recordingEdgeGatewayService{}
|
||||
delegate := &recordingGatewayService{}
|
||||
server, runGateway := newTestGateway(t, ServerDependencies{
|
||||
Service: delegate,
|
||||
SessionCache: staticSessionCache{lookupFunc: func(context.Context, string) (session.Record, error) { return newActiveSessionRecord(), nil }},
|
||||
@@ -104,7 +104,7 @@ func TestSubscribeEventsRejectsStaleTimestamp(t *testing.T) {
|
||||
func TestExecuteCommandRejectsReplay(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
delegate := &recordingEdgeGatewayService{}
|
||||
delegate := &recordingGatewayService{}
|
||||
server, runGateway := newTestGateway(t, ServerDependencies{
|
||||
Service: delegate,
|
||||
SessionCache: staticSessionCache{lookupFunc: func(context.Context, string) (session.Record, error) { return newActiveSessionRecord(), nil }},
|
||||
@@ -131,7 +131,7 @@ func TestExecuteCommandRejectsReplay(t *testing.T) {
|
||||
func TestSubscribeEventsRejectsReplay(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
delegate := &recordingEdgeGatewayService{}
|
||||
delegate := &recordingGatewayService{}
|
||||
server, runGateway := newTestGateway(t, ServerDependencies{
|
||||
Service: delegate,
|
||||
SessionCache: staticSessionCache{lookupFunc: func(context.Context, string) (session.Record, error) { return newActiveSessionRecord(), nil }},
|
||||
@@ -162,9 +162,9 @@ func TestSubscribeEventsRejectsReplay(t *testing.T) {
|
||||
func TestExecuteCommandAllowsSameRequestIDAcrossDistinctSessions(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
delegate := &recordingEdgeGatewayService{
|
||||
executeCommandFunc: func(ctx context.Context, req *gatewayv1.ExecuteCommandRequest) (*gatewayv1.ExecuteCommandResponse, error) {
|
||||
return &gatewayv1.ExecuteCommandResponse{RequestId: req.GetRequestId()}, nil
|
||||
delegate := &recordingGatewayService{
|
||||
executeCommandFunc: func(ctx context.Context, req *edgev1.ExecuteCommandRequest) (*edgev1.ExecuteCommandResponse, error) {
|
||||
return &edgev1.ExecuteCommandResponse{RequestId: req.GetRequestId()}, nil
|
||||
},
|
||||
}
|
||||
|
||||
@@ -196,8 +196,8 @@ func TestExecuteCommandAllowsSameRequestIDAcrossDistinctSessions(t *testing.T) {
|
||||
func TestSubscribeEventsAllowsSameRequestIDAcrossDistinctSessions(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
delegate := &recordingEdgeGatewayService{
|
||||
subscribeEventsFunc: func(req *gatewayv1.SubscribeEventsRequest, stream grpc.ServerStreamingServer[gatewayv1.GatewayEvent]) error {
|
||||
delegate := &recordingGatewayService{
|
||||
subscribeEventsFunc: func(req *edgev1.SubscribeEventsRequest, stream grpc.ServerStreamingServer[edgev1.GatewayEvent]) error {
|
||||
return nil
|
||||
},
|
||||
}
|
||||
@@ -238,7 +238,7 @@ func TestSubscribeEventsAllowsSameRequestIDAcrossDistinctSessions(t *testing.T)
|
||||
func TestExecuteCommandRejectsReplayStoreUnavailable(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
delegate := &recordingEdgeGatewayService{}
|
||||
delegate := &recordingGatewayService{}
|
||||
server, runGateway := newTestGateway(t, ServerDependencies{
|
||||
Service: delegate,
|
||||
SessionCache: staticSessionCache{lookupFunc: func(context.Context, string) (session.Record, error) { return newActiveSessionRecord(), nil }},
|
||||
@@ -262,7 +262,7 @@ func TestExecuteCommandRejectsReplayStoreUnavailable(t *testing.T) {
|
||||
func TestSubscribeEventsRejectsReplayStoreUnavailable(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
delegate := &recordingEdgeGatewayService{}
|
||||
delegate := &recordingGatewayService{}
|
||||
server, runGateway := newTestGateway(t, ServerDependencies{
|
||||
Service: delegate,
|
||||
SessionCache: staticSessionCache{lookupFunc: func(context.Context, string) (session.Record, error) { return newActiveSessionRecord(), nil }},
|
||||
@@ -286,9 +286,9 @@ func TestSubscribeEventsRejectsReplayStoreUnavailable(t *testing.T) {
|
||||
func TestExecuteCommandFreshRequestReachesDelegateAndUsesDynamicReplayTTL(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
delegate := &recordingEdgeGatewayService{
|
||||
executeCommandFunc: func(ctx context.Context, req *gatewayv1.ExecuteCommandRequest) (*gatewayv1.ExecuteCommandResponse, error) {
|
||||
return &gatewayv1.ExecuteCommandResponse{RequestId: req.GetRequestId()}, nil
|
||||
delegate := &recordingGatewayService{
|
||||
executeCommandFunc: func(ctx context.Context, req *edgev1.ExecuteCommandRequest) (*edgev1.ExecuteCommandResponse, error) {
|
||||
return &edgev1.ExecuteCommandResponse{RequestId: req.GetRequestId()}, nil
|
||||
},
|
||||
}
|
||||
|
||||
@@ -324,8 +324,8 @@ func TestExecuteCommandFreshRequestReachesDelegateAndUsesDynamicReplayTTL(t *tes
|
||||
func TestSubscribeEventsFreshRequestReachesDelegateAndUsesDynamicReplayTTL(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
delegate := &recordingEdgeGatewayService{
|
||||
subscribeEventsFunc: func(req *gatewayv1.SubscribeEventsRequest, stream grpc.ServerStreamingServer[gatewayv1.GatewayEvent]) error {
|
||||
delegate := &recordingGatewayService{
|
||||
subscribeEventsFunc: func(req *edgev1.SubscribeEventsRequest, stream grpc.ServerStreamingServer[edgev1.GatewayEvent]) error {
|
||||
return nil
|
||||
},
|
||||
}
|
||||
@@ -361,9 +361,9 @@ func TestSubscribeEventsFreshRequestReachesDelegateAndUsesDynamicReplayTTL(t *te
|
||||
func TestExecuteCommandFutureSkewUsesExtendedReplayTTL(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
delegate := &recordingEdgeGatewayService{
|
||||
executeCommandFunc: func(ctx context.Context, req *gatewayv1.ExecuteCommandRequest) (*gatewayv1.ExecuteCommandResponse, error) {
|
||||
return &gatewayv1.ExecuteCommandResponse{RequestId: req.GetRequestId()}, nil
|
||||
delegate := &recordingGatewayService{
|
||||
executeCommandFunc: func(ctx context.Context, req *edgev1.ExecuteCommandRequest) (*edgev1.ExecuteCommandResponse, error) {
|
||||
return &edgev1.ExecuteCommandResponse{RequestId: req.GetRequestId()}, nil
|
||||
},
|
||||
}
|
||||
|
||||
@@ -395,9 +395,9 @@ func TestExecuteCommandFutureSkewUsesExtendedReplayTTL(t *testing.T) {
|
||||
func TestExecuteCommandBoundaryFreshnessUsesMinimumReplayTTL(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
delegate := &recordingEdgeGatewayService{
|
||||
executeCommandFunc: func(ctx context.Context, req *gatewayv1.ExecuteCommandRequest) (*gatewayv1.ExecuteCommandResponse, error) {
|
||||
return &gatewayv1.ExecuteCommandResponse{RequestId: req.GetRequestId()}, nil
|
||||
delegate := &recordingGatewayService{
|
||||
executeCommandFunc: func(ctx context.Context, req *edgev1.ExecuteCommandRequest) (*edgev1.ExecuteCommandResponse, error) {
|
||||
return &edgev1.ExecuteCommandResponse{RequestId: req.GetRequestId()}, nil
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
@@ -8,7 +8,7 @@ import (
|
||||
|
||||
"galaxy/gateway/internal/logging"
|
||||
"galaxy/gateway/internal/telemetry"
|
||||
gatewayv1 "galaxy/gateway/proto/galaxy/gateway/v1"
|
||||
edgev1 "galaxy/gateway/proto/edge/v1"
|
||||
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.uber.org/zap"
|
||||
@@ -78,9 +78,9 @@ func recordEdgeRequest(logger *zap.Logger, metrics *telemetry.Runtime, ctx conte
|
||||
|
||||
func envelopeFieldsFromRequest(req any) (messageType string, requestID string, traceID string) {
|
||||
switch typed := req.(type) {
|
||||
case *gatewayv1.ExecuteCommandRequest:
|
||||
case *edgev1.ExecuteCommandRequest:
|
||||
return typed.GetMessageType(), typed.GetRequestId(), typed.GetTraceId()
|
||||
case *gatewayv1.SubscribeEventsRequest:
|
||||
case *edgev1.SubscribeEventsRequest:
|
||||
return typed.GetMessageType(), typed.GetRequestId(), typed.GetTraceId()
|
||||
default:
|
||||
return "", "", ""
|
||||
@@ -88,7 +88,7 @@ func envelopeFieldsFromRequest(req any) (messageType string, requestID string, t
|
||||
}
|
||||
|
||||
func resultCodeFromResponse(resp any) string {
|
||||
typed, ok := resp.(*gatewayv1.ExecuteCommandResponse)
|
||||
typed, ok := resp.(*edgev1.ExecuteCommandResponse)
|
||||
if !ok {
|
||||
return ""
|
||||
}
|
||||
|
||||
@@ -5,7 +5,7 @@ import (
|
||||
"errors"
|
||||
|
||||
"galaxy/gateway/authn"
|
||||
gatewayv1 "galaxy/gateway/proto/galaxy/gateway/v1"
|
||||
edgev1 "galaxy/gateway/proto/edge/v1"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
@@ -15,14 +15,14 @@ import (
|
||||
// payloadHashVerifyingService applies payload-hash verification after session
|
||||
// lookup and before any later auth or routing step runs.
|
||||
type payloadHashVerifyingService struct {
|
||||
gatewayv1.UnimplementedEdgeGatewayServer
|
||||
edgev1.UnimplementedGatewayServer
|
||||
|
||||
delegate gatewayv1.EdgeGatewayServer
|
||||
delegate edgev1.GatewayServer
|
||||
}
|
||||
|
||||
// ExecuteCommand verifies req payload integrity before delegating to the
|
||||
// configured service implementation.
|
||||
func (s payloadHashVerifyingService) ExecuteCommand(ctx context.Context, req *gatewayv1.ExecuteCommandRequest) (*gatewayv1.ExecuteCommandResponse, error) {
|
||||
func (s payloadHashVerifyingService) ExecuteCommand(ctx context.Context, req *edgev1.ExecuteCommandRequest) (*edgev1.ExecuteCommandResponse, error) {
|
||||
if err := verifyPayloadHash(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -32,7 +32,7 @@ func (s payloadHashVerifyingService) ExecuteCommand(ctx context.Context, req *ga
|
||||
|
||||
// SubscribeEvents verifies req payload integrity before delegating to the
|
||||
// configured service implementation.
|
||||
func (s payloadHashVerifyingService) SubscribeEvents(req *gatewayv1.SubscribeEventsRequest, stream grpc.ServerStreamingServer[gatewayv1.GatewayEvent]) error {
|
||||
func (s payloadHashVerifyingService) SubscribeEvents(req *edgev1.SubscribeEventsRequest, stream grpc.ServerStreamingServer[edgev1.GatewayEvent]) error {
|
||||
if err := verifyPayloadHash(stream.Context()); err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -42,7 +42,7 @@ func (s payloadHashVerifyingService) SubscribeEvents(req *gatewayv1.SubscribeEve
|
||||
|
||||
// newPayloadHashVerifyingService wraps delegate with the payload-hash
|
||||
// verification gate.
|
||||
func newPayloadHashVerifyingService(delegate gatewayv1.EdgeGatewayServer) gatewayv1.EdgeGatewayServer {
|
||||
func newPayloadHashVerifyingService(delegate edgev1.GatewayServer) edgev1.GatewayServer {
|
||||
return payloadHashVerifyingService{delegate: delegate}
|
||||
}
|
||||
|
||||
@@ -63,4 +63,4 @@ func verifyPayloadHash(ctx context.Context) error {
|
||||
}
|
||||
}
|
||||
|
||||
var _ gatewayv1.EdgeGatewayServer = payloadHashVerifyingService{}
|
||||
var _ edgev1.GatewayServer = payloadHashVerifyingService{}
|
||||
|
||||
@@ -15,7 +15,7 @@ import (
|
||||
func TestExecuteCommandRejectsPayloadHashWithInvalidLength(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
delegate := &recordingEdgeGatewayService{}
|
||||
delegate := &recordingGatewayService{}
|
||||
server, runGateway := newTestGateway(t, ServerDependencies{
|
||||
Service: delegate,
|
||||
SessionCache: staticSessionCache{lookupFunc: func(context.Context, string) (session.Record, error) { return newActiveSessionRecord(), nil }},
|
||||
@@ -38,7 +38,7 @@ func TestExecuteCommandRejectsPayloadHashWithInvalidLength(t *testing.T) {
|
||||
func TestExecuteCommandRejectsPayloadHashMismatch(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
delegate := &recordingEdgeGatewayService{}
|
||||
delegate := &recordingGatewayService{}
|
||||
server, runGateway := newTestGateway(t, ServerDependencies{
|
||||
Service: delegate,
|
||||
SessionCache: staticSessionCache{lookupFunc: func(context.Context, string) (session.Record, error) { return newActiveSessionRecord(), nil }},
|
||||
@@ -62,7 +62,7 @@ func TestExecuteCommandRejectsPayloadHashMismatch(t *testing.T) {
|
||||
func TestSubscribeEventsRejectsPayloadHashWithInvalidLength(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
delegate := &recordingEdgeGatewayService{}
|
||||
delegate := &recordingGatewayService{}
|
||||
server, runGateway := newTestGateway(t, ServerDependencies{
|
||||
Service: delegate,
|
||||
SessionCache: staticSessionCache{lookupFunc: func(context.Context, string) (session.Record, error) { return newActiveSessionRecord(), nil }},
|
||||
@@ -85,7 +85,7 @@ func TestSubscribeEventsRejectsPayloadHashWithInvalidLength(t *testing.T) {
|
||||
func TestSubscribeEventsRejectsPayloadHashMismatch(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
delegate := &recordingEdgeGatewayService{}
|
||||
delegate := &recordingGatewayService{}
|
||||
server, runGateway := newTestGateway(t, ServerDependencies{
|
||||
Service: delegate,
|
||||
SessionCache: staticSessionCache{lookupFunc: func(context.Context, string) (session.Record, error) { return newActiveSessionRecord(), nil }},
|
||||
|
||||
@@ -11,7 +11,7 @@ import (
|
||||
"galaxy/gateway/internal/logging"
|
||||
"galaxy/gateway/internal/push"
|
||||
"galaxy/gateway/internal/telemetry"
|
||||
gatewayv1 "galaxy/gateway/proto/galaxy/gateway/v1"
|
||||
edgev1 "galaxy/gateway/proto/edge/v1"
|
||||
|
||||
"go.uber.org/zap"
|
||||
"google.golang.org/grpc"
|
||||
@@ -22,7 +22,7 @@ import (
|
||||
// NewFanOutPushStreamService constructs the authenticated SubscribeEvents tail
|
||||
// service that registers active streams in hub and forwards client-facing
|
||||
// events after the bootstrap event has been sent.
|
||||
func NewFanOutPushStreamService(hub *push.Hub, responseSigner authn.ResponseSigner, clk clock.Clock, logger *zap.Logger) gatewayv1.EdgeGatewayServer {
|
||||
func NewFanOutPushStreamService(hub *push.Hub, responseSigner authn.ResponseSigner, clk clock.Clock, logger *zap.Logger) edgev1.GatewayServer {
|
||||
if responseSigner == nil {
|
||||
responseSigner = unavailableResponseSigner{}
|
||||
}
|
||||
@@ -44,7 +44,7 @@ func NewFanOutPushStreamService(hub *push.Hub, responseSigner authn.ResponseSign
|
||||
// fanOutPushStreamService owns the post-bootstrap authenticated push-stream
|
||||
// lifecycle backed by the in-memory push hub.
|
||||
type fanOutPushStreamService struct {
|
||||
gatewayv1.UnimplementedEdgeGatewayServer
|
||||
edgev1.UnimplementedGatewayServer
|
||||
|
||||
hub *push.Hub
|
||||
responseSigner authn.ResponseSigner
|
||||
@@ -54,7 +54,7 @@ type fanOutPushStreamService struct {
|
||||
|
||||
// SubscribeEvents registers the verified stream in the push hub and forwards
|
||||
// matching client-facing events until the stream ends.
|
||||
func (s fanOutPushStreamService) SubscribeEvents(_ *gatewayv1.SubscribeEventsRequest, stream grpc.ServerStreamingServer[gatewayv1.GatewayEvent]) error {
|
||||
func (s fanOutPushStreamService) SubscribeEvents(_ *edgev1.SubscribeEventsRequest, stream grpc.ServerStreamingServer[edgev1.GatewayEvent]) error {
|
||||
binding, ok := authenticatedStreamBindingFromContext(stream.Context())
|
||||
if !ok {
|
||||
return status.Error(codes.Internal, "authenticated request context is incomplete")
|
||||
@@ -109,7 +109,7 @@ func (s fanOutPushStreamService) SubscribeEvents(_ *gatewayv1.SubscribeEventsReq
|
||||
}
|
||||
}
|
||||
|
||||
func (s fanOutPushStreamService) buildGatewayEvent(event push.Event) (*gatewayv1.GatewayEvent, error) {
|
||||
func (s fanOutPushStreamService) buildGatewayEvent(event push.Event) (*edgev1.GatewayEvent, error) {
|
||||
timestampMS := s.clock.Now().UTC().UnixMilli()
|
||||
payloadHash := sha256.Sum256(event.PayloadBytes)
|
||||
|
||||
@@ -125,7 +125,7 @@ func (s fanOutPushStreamService) buildGatewayEvent(event push.Event) (*gatewayv1
|
||||
return nil, status.Error(codes.Unavailable, "response signer is unavailable")
|
||||
}
|
||||
|
||||
return &gatewayv1.GatewayEvent{
|
||||
return &edgev1.GatewayEvent{
|
||||
EventType: event.EventType,
|
||||
EventId: event.EventID,
|
||||
TimestampMs: timestampMS,
|
||||
@@ -169,4 +169,4 @@ func mapSubscriptionOutcome(err error) telemetry.EdgeOutcome {
|
||||
}
|
||||
}
|
||||
|
||||
var _ gatewayv1.EdgeGatewayServer = fanOutPushStreamService{}
|
||||
var _ edgev1.GatewayServer = fanOutPushStreamService{}
|
||||
|
||||
@@ -6,7 +6,7 @@ import (
|
||||
"time"
|
||||
|
||||
"galaxy/gateway/internal/telemetry"
|
||||
gatewayv1 "galaxy/gateway/proto/galaxy/gateway/v1"
|
||||
edgev1 "galaxy/gateway/proto/edge/v1"
|
||||
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"google.golang.org/grpc"
|
||||
@@ -26,7 +26,7 @@ import (
|
||||
// 15-second default a fully-idle stream costs ~840 KB/day per client;
|
||||
// see `docs/ARCHITECTURE.md` for the per-scale projection.
|
||||
type heartbeatingStream struct {
|
||||
grpc.ServerStreamingServer[gatewayv1.GatewayEvent]
|
||||
grpc.ServerStreamingServer[edgev1.GatewayEvent]
|
||||
|
||||
interval time.Duration
|
||||
metrics *telemetry.Runtime
|
||||
@@ -43,7 +43,7 @@ type heartbeatingStream struct {
|
||||
// the wrapping entirely; non-nil returns must have `Stop()` called once
|
||||
// the stream lifecycle ends.
|
||||
func newHeartbeatingStream(
|
||||
inner grpc.ServerStreamingServer[gatewayv1.GatewayEvent],
|
||||
inner grpc.ServerStreamingServer[edgev1.GatewayEvent],
|
||||
interval time.Duration,
|
||||
metrics *telemetry.Runtime,
|
||||
) *heartbeatingStream {
|
||||
@@ -64,7 +64,7 @@ func newHeartbeatingStream(
|
||||
// so the heartbeat goroutine waits a fresh interval before firing
|
||||
// again. A Send that succeeds means the transport just delivered real
|
||||
// bytes; the silence window restarts from "now".
|
||||
func (s *heartbeatingStream) Send(event *gatewayv1.GatewayEvent) error {
|
||||
func (s *heartbeatingStream) Send(event *edgev1.GatewayEvent) error {
|
||||
s.sendMu.Lock()
|
||||
defer s.sendMu.Unlock()
|
||||
if err := s.ServerStreamingServer.Send(event); err != nil {
|
||||
@@ -158,6 +158,6 @@ func (s *heartbeatingStream) resetTimerLocked() {
|
||||
// EventType is left at its proto3 default so the wire frame stays as
|
||||
// small as Connect framing allows. See `gatewayHeartbeatEventType` for
|
||||
// the security rationale of leaving the event unsigned.
|
||||
func buildHeartbeatEvent() *gatewayv1.GatewayEvent {
|
||||
return &gatewayv1.GatewayEvent{EventType: gatewayHeartbeatEventType}
|
||||
func buildHeartbeatEvent() *edgev1.GatewayEvent {
|
||||
return &edgev1.GatewayEvent{EventType: gatewayHeartbeatEventType}
|
||||
}
|
||||
|
||||
@@ -7,7 +7,7 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
gatewayv1 "galaxy/gateway/proto/galaxy/gateway/v1"
|
||||
edgev1 "galaxy/gateway/proto/edge/v1"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
@@ -66,7 +66,7 @@ func TestHeartbeatingStreamRealSendResetsSilenceTimer(t *testing.T) {
|
||||
defer ticker.Stop()
|
||||
for range 6 {
|
||||
<-ticker.C
|
||||
if err := hb.Send(&gatewayv1.GatewayEvent{EventType: "real.event"}); err != nil {
|
||||
if err := hb.Send(&edgev1.GatewayEvent{EventType: "real.event"}); err != nil {
|
||||
t.Errorf("real Send failed: %v", err)
|
||||
return
|
||||
}
|
||||
@@ -134,16 +134,16 @@ func TestHeartbeatingStreamSendErrorPropagates(t *testing.T) {
|
||||
require.NotNil(t, hb)
|
||||
defer hb.Stop()
|
||||
|
||||
err := hb.Send(&gatewayv1.GatewayEvent{EventType: "real.event"})
|
||||
err := hb.Send(&edgev1.GatewayEvent{EventType: "real.event"})
|
||||
require.ErrorIs(t, err, wantErr)
|
||||
}
|
||||
|
||||
// capturingStream is a minimal grpc.ServerStreamingServer that pushes
|
||||
// every Send into a channel so tests can assert on the wire frame.
|
||||
type capturingStream struct {
|
||||
grpc.ServerStreamingServer[gatewayv1.GatewayEvent]
|
||||
grpc.ServerStreamingServer[edgev1.GatewayEvent]
|
||||
|
||||
events chan *gatewayv1.GatewayEvent
|
||||
events chan *edgev1.GatewayEvent
|
||||
sendErr atomic.Pointer[errorBox]
|
||||
}
|
||||
|
||||
@@ -152,10 +152,10 @@ type errorBox struct{ err error }
|
||||
func newCapturingStream(t *testing.T) *capturingStream {
|
||||
t.Helper()
|
||||
|
||||
return &capturingStream{events: make(chan *gatewayv1.GatewayEvent, 16)}
|
||||
return &capturingStream{events: make(chan *edgev1.GatewayEvent, 16)}
|
||||
}
|
||||
|
||||
func (s *capturingStream) Send(event *gatewayv1.GatewayEvent) error {
|
||||
func (s *capturingStream) Send(event *edgev1.GatewayEvent) error {
|
||||
if box := s.sendErr.Load(); box != nil {
|
||||
return box.err
|
||||
}
|
||||
@@ -172,7 +172,7 @@ func (s *capturingStream) SetTrailer(metadata.MD) {}
|
||||
func (s *capturingStream) SendMsg(any) error { return errors.New("capturingStream.SendMsg: unused") }
|
||||
func (s *capturingStream) RecvMsg(any) error { return errors.New("capturingStream.RecvMsg: unused") }
|
||||
|
||||
func (s *capturingStream) recv(t *testing.T, timeout time.Duration) *gatewayv1.GatewayEvent {
|
||||
func (s *capturingStream) recv(t *testing.T, timeout time.Duration) *edgev1.GatewayEvent {
|
||||
t.Helper()
|
||||
|
||||
select {
|
||||
|
||||
@@ -9,7 +9,7 @@ import (
|
||||
"galaxy/gateway/authn"
|
||||
"galaxy/gateway/internal/clock"
|
||||
"galaxy/gateway/internal/telemetry"
|
||||
gatewayv1 "galaxy/gateway/proto/galaxy/gateway/v1"
|
||||
edgev1 "galaxy/gateway/proto/edge/v1"
|
||||
gatewayfbs "galaxy/schema/fbs/gateway"
|
||||
|
||||
flatbuffers "github.com/google/flatbuffers/go"
|
||||
@@ -81,9 +81,9 @@ func authenticatedStreamBindingFromContext(ctx context.Context) (authenticatedSt
|
||||
// the tail performs and only emits a heartbeat when the silence window
|
||||
// elapses; tails remain heartbeat-unaware.
|
||||
type authenticatedPushStreamService struct {
|
||||
gatewayv1.UnimplementedEdgeGatewayServer
|
||||
edgev1.UnimplementedGatewayServer
|
||||
|
||||
tailDelegate gatewayv1.EdgeGatewayServer
|
||||
tailDelegate edgev1.GatewayServer
|
||||
responseSigner authn.ResponseSigner
|
||||
clock clock.Clock
|
||||
heartbeatInterval time.Duration
|
||||
@@ -92,7 +92,7 @@ type authenticatedPushStreamService struct {
|
||||
|
||||
// SubscribeEvents binds the verified stream identity, sends the initial signed
|
||||
// server-time event, and then delegates the remaining lifecycle.
|
||||
func (s authenticatedPushStreamService) SubscribeEvents(req *gatewayv1.SubscribeEventsRequest, stream grpc.ServerStreamingServer[gatewayv1.GatewayEvent]) error {
|
||||
func (s authenticatedPushStreamService) SubscribeEvents(req *edgev1.SubscribeEventsRequest, stream grpc.ServerStreamingServer[edgev1.GatewayEvent]) error {
|
||||
envelope, ok := parsedEnvelopeFromContext(stream.Context())
|
||||
if !ok {
|
||||
return status.Error(codes.Internal, "authenticated request context is incomplete")
|
||||
@@ -134,7 +134,7 @@ func (s authenticatedPushStreamService) SubscribeEvents(req *gatewayv1.Subscribe
|
||||
return status.Error(codes.Unavailable, "response signer is unavailable")
|
||||
}
|
||||
|
||||
if err := boundStream.Send(&gatewayv1.GatewayEvent{
|
||||
if err := boundStream.Send(&edgev1.GatewayEvent{
|
||||
EventType: serverTimeEventType,
|
||||
EventId: envelope.RequestID,
|
||||
TimestampMs: serverTimeMS,
|
||||
@@ -147,7 +147,7 @@ func (s authenticatedPushStreamService) SubscribeEvents(req *gatewayv1.Subscribe
|
||||
return err
|
||||
}
|
||||
|
||||
var streamForTail grpc.ServerStreamingServer[gatewayv1.GatewayEvent] = boundStream
|
||||
var streamForTail grpc.ServerStreamingServer[edgev1.GatewayEvent] = boundStream
|
||||
if hbStream := newHeartbeatingStream(boundStream, s.heartbeatInterval, s.metrics); hbStream != nil {
|
||||
defer hbStream.Stop()
|
||||
go func() {
|
||||
@@ -165,12 +165,12 @@ func (s authenticatedPushStreamService) SubscribeEvents(req *gatewayv1.Subscribe
|
||||
}
|
||||
|
||||
func newAuthenticatedPushStreamService(
|
||||
tailDelegate gatewayv1.EdgeGatewayServer,
|
||||
tailDelegate edgev1.GatewayServer,
|
||||
responseSigner authn.ResponseSigner,
|
||||
clk clock.Clock,
|
||||
heartbeatInterval time.Duration,
|
||||
metrics *telemetry.Runtime,
|
||||
) gatewayv1.EdgeGatewayServer {
|
||||
) edgev1.GatewayServer {
|
||||
if tailDelegate == nil {
|
||||
tailDelegate = holdOpenSubscribeEventsService{}
|
||||
}
|
||||
@@ -197,7 +197,7 @@ func buildServerTimeEventPayload(serverTimeMS int64) []byte {
|
||||
type authenticatedStreamBindingContextKey struct{}
|
||||
|
||||
type authenticatedStreamContextStream struct {
|
||||
grpc.ServerStreamingServer[gatewayv1.GatewayEvent]
|
||||
grpc.ServerStreamingServer[edgev1.GatewayEvent]
|
||||
ctx context.Context
|
||||
}
|
||||
|
||||
@@ -210,12 +210,12 @@ func (s authenticatedStreamContextStream) Context() context.Context {
|
||||
}
|
||||
|
||||
type holdOpenSubscribeEventsService struct {
|
||||
gatewayv1.UnimplementedEdgeGatewayServer
|
||||
edgev1.UnimplementedGatewayServer
|
||||
}
|
||||
|
||||
func (holdOpenSubscribeEventsService) SubscribeEvents(_ *gatewayv1.SubscribeEventsRequest, stream grpc.ServerStreamingServer[gatewayv1.GatewayEvent]) error {
|
||||
func (holdOpenSubscribeEventsService) SubscribeEvents(_ *edgev1.SubscribeEventsRequest, stream grpc.ServerStreamingServer[edgev1.GatewayEvent]) error {
|
||||
<-stream.Context().Done()
|
||||
return stream.Context().Err()
|
||||
}
|
||||
|
||||
var _ gatewayv1.EdgeGatewayServer = authenticatedPushStreamService{}
|
||||
var _ edgev1.GatewayServer = authenticatedPushStreamService{}
|
||||
|
||||
@@ -7,7 +7,7 @@ import (
|
||||
"galaxy/gateway/internal/config"
|
||||
"galaxy/gateway/internal/ratelimit"
|
||||
"galaxy/gateway/internal/session"
|
||||
gatewayv1 "galaxy/gateway/proto/galaxy/gateway/v1"
|
||||
edgev1 "galaxy/gateway/proto/edge/v1"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
@@ -102,9 +102,9 @@ type AuthenticatedRequestPolicy interface {
|
||||
}
|
||||
|
||||
type authenticatedRateLimitService struct {
|
||||
gatewayv1.UnimplementedEdgeGatewayServer
|
||||
edgev1.UnimplementedGatewayServer
|
||||
|
||||
delegate gatewayv1.EdgeGatewayServer
|
||||
delegate edgev1.GatewayServer
|
||||
limiter AuthenticatedRequestLimiter
|
||||
policy AuthenticatedRequestPolicy
|
||||
cfg config.AuthenticatedGRPCAntiAbuseConfig
|
||||
@@ -112,7 +112,7 @@ type authenticatedRateLimitService struct {
|
||||
|
||||
// ExecuteCommand applies authenticated rate limits and edge policy before
|
||||
// delegating to the configured service implementation.
|
||||
func (s authenticatedRateLimitService) ExecuteCommand(ctx context.Context, req *gatewayv1.ExecuteCommandRequest) (*gatewayv1.ExecuteCommandResponse, error) {
|
||||
func (s authenticatedRateLimitService) ExecuteCommand(ctx context.Context, req *edgev1.ExecuteCommandRequest) (*edgev1.ExecuteCommandResponse, error) {
|
||||
if err := s.applyRateLimitsAndPolicy(ctx, authenticatedRPCExecuteCommand); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -122,7 +122,7 @@ func (s authenticatedRateLimitService) ExecuteCommand(ctx context.Context, req *
|
||||
|
||||
// SubscribeEvents applies authenticated rate limits and edge policy before
|
||||
// delegating to the configured service implementation.
|
||||
func (s authenticatedRateLimitService) SubscribeEvents(req *gatewayv1.SubscribeEventsRequest, stream grpc.ServerStreamingServer[gatewayv1.GatewayEvent]) error {
|
||||
func (s authenticatedRateLimitService) SubscribeEvents(req *edgev1.SubscribeEventsRequest, stream grpc.ServerStreamingServer[edgev1.GatewayEvent]) error {
|
||||
if err := s.applyRateLimitsAndPolicy(stream.Context(), authenticatedRPCSubscribeEvents); err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -132,7 +132,7 @@ func (s authenticatedRateLimitService) SubscribeEvents(req *gatewayv1.SubscribeE
|
||||
|
||||
// newAuthenticatedRateLimitService wraps delegate with the authenticated
|
||||
// rate-limit and edge-policy gate.
|
||||
func newAuthenticatedRateLimitService(delegate gatewayv1.EdgeGatewayServer, limiter AuthenticatedRequestLimiter, policy AuthenticatedRequestPolicy, cfg config.AuthenticatedGRPCAntiAbuseConfig) gatewayv1.EdgeGatewayServer {
|
||||
func newAuthenticatedRateLimitService(delegate edgev1.GatewayServer, limiter AuthenticatedRequestLimiter, policy AuthenticatedRequestPolicy, cfg config.AuthenticatedGRPCAntiAbuseConfig) edgev1.GatewayServer {
|
||||
return authenticatedRateLimitService{
|
||||
delegate: delegate,
|
||||
limiter: limiter,
|
||||
@@ -279,4 +279,4 @@ func (noopAuthenticatedRequestPolicy) Evaluate(context.Context, AuthenticatedReq
|
||||
return nil
|
||||
}
|
||||
|
||||
var _ gatewayv1.EdgeGatewayServer = authenticatedRateLimitService{}
|
||||
var _ edgev1.GatewayServer = authenticatedRateLimitService{}
|
||||
|
||||
@@ -14,7 +14,7 @@ import (
|
||||
"galaxy/gateway/internal/ratelimit"
|
||||
"galaxy/gateway/internal/restapi"
|
||||
"galaxy/gateway/internal/session"
|
||||
gatewayv1 "galaxy/gateway/proto/galaxy/gateway/v1"
|
||||
edgev1 "galaxy/gateway/proto/edge/v1"
|
||||
|
||||
"connectrpc.com/connect"
|
||||
"github.com/stretchr/testify/assert"
|
||||
@@ -24,7 +24,7 @@ import (
|
||||
func TestExecuteCommandRateLimitsByIP(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
delegate := &recordingEdgeGatewayService{}
|
||||
delegate := &recordingGatewayService{}
|
||||
server, runGateway := newTestGatewayWithGRPCConfig(t, newAuthenticatedGRPCConfigForTest(func(cfg *config.AuthenticatedGRPCConfig) {
|
||||
cfg.AntiAbuse.IP = config.AuthenticatedRateLimitConfig{
|
||||
Requests: 1,
|
||||
@@ -54,7 +54,7 @@ func TestExecuteCommandRateLimitsByIP(t *testing.T) {
|
||||
func TestExecuteCommandRateLimitsBySession(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
delegate := &recordingEdgeGatewayService{}
|
||||
delegate := &recordingGatewayService{}
|
||||
server, runGateway := newTestGatewayWithGRPCConfig(t, newAuthenticatedGRPCConfigForTest(func(cfg *config.AuthenticatedGRPCConfig) {
|
||||
cfg.AntiAbuse.Session = config.AuthenticatedRateLimitConfig{
|
||||
Requests: 1,
|
||||
@@ -87,7 +87,7 @@ func TestExecuteCommandRateLimitsBySession(t *testing.T) {
|
||||
func TestExecuteCommandRateLimitsByUser(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
delegate := &recordingEdgeGatewayService{}
|
||||
delegate := &recordingGatewayService{}
|
||||
server, runGateway := newTestGatewayWithGRPCConfig(t, newAuthenticatedGRPCConfigForTest(func(cfg *config.AuthenticatedGRPCConfig) {
|
||||
cfg.AntiAbuse.User = config.AuthenticatedRateLimitConfig{
|
||||
Requests: 1,
|
||||
@@ -124,7 +124,7 @@ func TestExecuteCommandRateLimitsByUser(t *testing.T) {
|
||||
func TestExecuteCommandRateLimitsByMessageClass(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
delegate := &recordingEdgeGatewayService{}
|
||||
delegate := &recordingGatewayService{}
|
||||
server, runGateway := newTestGatewayWithGRPCConfig(t, newAuthenticatedGRPCConfigForTest(func(cfg *config.AuthenticatedGRPCConfig) {
|
||||
cfg.AntiAbuse.MessageClass = config.AuthenticatedRateLimitConfig{
|
||||
Requests: 1,
|
||||
@@ -161,7 +161,7 @@ func TestAuthenticatedPolicyHookReceivesVerifiedRequest(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
policy := &recordingAuthenticatedRequestPolicy{}
|
||||
delegate := &recordingEdgeGatewayService{}
|
||||
delegate := &recordingGatewayService{}
|
||||
server, runGateway := newTestGateway(t, ServerDependencies{
|
||||
Service: delegate,
|
||||
SessionCache: userMappedSessionCache(map[string]string{"device-session-123": "user-123"}),
|
||||
@@ -189,7 +189,7 @@ func TestAuthenticatedPolicyHookReceivesVerifiedRequest(t *testing.T) {
|
||||
func TestExecuteCommandPolicyRejectMapsToPermissionDenied(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
delegate := &recordingEdgeGatewayService{}
|
||||
delegate := &recordingGatewayService{}
|
||||
server, runGateway := newTestGateway(t, ServerDependencies{
|
||||
Service: delegate,
|
||||
SessionCache: userMappedSessionCache(map[string]string{"device-session-123": "user-123"}),
|
||||
@@ -212,7 +212,7 @@ func TestExecuteCommandPolicyRejectMapsToPermissionDenied(t *testing.T) {
|
||||
func TestSubscribeEventsRateLimitRejectsStream(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
delegate := &recordingEdgeGatewayService{}
|
||||
delegate := &recordingGatewayService{}
|
||||
server, runGateway := newTestGatewayWithGRPCConfig(t, newAuthenticatedGRPCConfigForTest(func(cfg *config.AuthenticatedGRPCConfig) {
|
||||
cfg.AntiAbuse.IP = config.AuthenticatedRateLimitConfig{
|
||||
Requests: 1,
|
||||
@@ -274,7 +274,7 @@ func TestAuthenticatedRateLimitsStayIsolatedFromPublicREST(t *testing.T) {
|
||||
AuthService: staticAuthServiceClient{},
|
||||
Limiter: publicLimiterAdapter{limiter: sharedLimiter},
|
||||
})
|
||||
delegate := &recordingEdgeGatewayService{}
|
||||
delegate := &recordingGatewayService{}
|
||||
grpcServer := NewServer(grpcCfg, ServerDependencies{
|
||||
Service: delegate,
|
||||
Router: executeCommandAdapterRouter{service: delegate},
|
||||
@@ -342,7 +342,7 @@ func newAuthenticatedGRPCConfigForTest(mutate func(*config.AuthenticatedGRPCConf
|
||||
return cfg
|
||||
}
|
||||
|
||||
func newValidExecuteCommandRequestWithMessageType(deviceSessionID string, requestID string, messageType string) *gatewayv1.ExecuteCommandRequest {
|
||||
func newValidExecuteCommandRequestWithMessageType(deviceSessionID string, requestID string, messageType string) *edgev1.ExecuteCommandRequest {
|
||||
req := newValidExecuteCommandRequestWithSessionAndRequestID(deviceSessionID, requestID)
|
||||
req.MessageType = messageType
|
||||
req.Signature = signRequest(
|
||||
|
||||
@@ -24,8 +24,8 @@ import (
|
||||
"galaxy/gateway/internal/replay"
|
||||
"galaxy/gateway/internal/session"
|
||||
"galaxy/gateway/internal/telemetry"
|
||||
gatewayv1 "galaxy/gateway/proto/galaxy/gateway/v1"
|
||||
"galaxy/gateway/proto/galaxy/gateway/v1/gatewayv1connect"
|
||||
edgev1 "galaxy/gateway/proto/edge/v1"
|
||||
"galaxy/gateway/proto/edge/v1/edgev1connect"
|
||||
|
||||
"connectrpc.com/connect"
|
||||
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
|
||||
@@ -42,7 +42,7 @@ type ServerDependencies struct {
|
||||
// after the initial authenticated service event has been sent. When nil, the
|
||||
// gateway keeps authenticated SubscribeEvents streams open until the client
|
||||
// cancels them, the server shuts down, or a later stream send fails.
|
||||
Service gatewayv1.EdgeGatewayServer
|
||||
Service edgev1.GatewayServer
|
||||
|
||||
// Router resolves the exact downstream unary client for the verified
|
||||
// message_type value. When nil, the authenticated unary surface uses an
|
||||
@@ -93,7 +93,7 @@ type ServerDependencies struct {
|
||||
// single net/http listener.
|
||||
type Server struct {
|
||||
cfg config.AuthenticatedGRPCConfig
|
||||
service gatewayv1.EdgeGatewayServer
|
||||
service edgev1.GatewayServer
|
||||
logger *zap.Logger
|
||||
pushHub *push.Hub
|
||||
metrics *telemetry.Runtime
|
||||
@@ -169,7 +169,7 @@ func (s *Server) Run(ctx context.Context) error {
|
||||
|
||||
mux := http.NewServeMux()
|
||||
connectHandler := newConnectEdgeAdapter(s.service)
|
||||
path, handler := gatewayv1connect.NewEdgeGatewayHandler(
|
||||
path, handler := edgev1connect.NewGatewayHandler(
|
||||
connectHandler,
|
||||
connect.WithInterceptors(observabilityConnectInterceptor(s.logger, s.metrics)),
|
||||
)
|
||||
|
||||
@@ -12,8 +12,8 @@ import (
|
||||
"galaxy/gateway/internal/app"
|
||||
"galaxy/gateway/internal/config"
|
||||
"galaxy/gateway/internal/session"
|
||||
gatewayv1 "galaxy/gateway/proto/galaxy/gateway/v1"
|
||||
"galaxy/gateway/proto/galaxy/gateway/v1/gatewayv1connect"
|
||||
edgev1 "galaxy/gateway/proto/edge/v1"
|
||||
"galaxy/gateway/proto/edge/v1/edgev1connect"
|
||||
|
||||
"connectrpc.com/connect"
|
||||
"github.com/stretchr/testify/assert"
|
||||
@@ -30,7 +30,7 @@ func TestExecuteCommandRejectsMalformedEnvelope(t *testing.T) {
|
||||
addr := waitForListenAddr(t, server)
|
||||
client := newEdgeClient(t, addr)
|
||||
|
||||
_, err := client.ExecuteCommand(context.Background(), connect.NewRequest(&gatewayv1.ExecuteCommandRequest{}))
|
||||
_, err := client.ExecuteCommand(context.Background(), connect.NewRequest(&edgev1.ExecuteCommandRequest{}))
|
||||
require.Error(t, err)
|
||||
assert.Equal(t, connect.CodeInvalidArgument, connect.CodeOf(err))
|
||||
}
|
||||
@@ -44,7 +44,7 @@ func TestSubscribeEventsRejectsMalformedEnvelope(t *testing.T) {
|
||||
addr := waitForListenAddr(t, server)
|
||||
client := newEdgeClient(t, addr)
|
||||
|
||||
err := subscribeEventsError(t, context.Background(), client, &gatewayv1.SubscribeEventsRequest{})
|
||||
err := subscribeEventsError(t, context.Background(), client, &edgev1.SubscribeEventsRequest{})
|
||||
require.Error(t, err)
|
||||
assert.Equal(t, connect.CodeInvalidArgument, connect.CodeOf(err))
|
||||
}
|
||||
@@ -58,7 +58,7 @@ func TestExecuteCommandRejectsUnsupportedProtocolVersion(t *testing.T) {
|
||||
addr := waitForListenAddr(t, server)
|
||||
client := newEdgeClient(t, addr)
|
||||
|
||||
_, err := client.ExecuteCommand(context.Background(), connect.NewRequest(&gatewayv1.ExecuteCommandRequest{
|
||||
_, err := client.ExecuteCommand(context.Background(), connect.NewRequest(&edgev1.ExecuteCommandRequest{
|
||||
ProtocolVersion: "v2",
|
||||
DeviceSessionId: "device-session-123",
|
||||
MessageType: "fleet.move",
|
||||
@@ -418,7 +418,7 @@ func waitForListenAddr(t *testing.T, server *Server) string {
|
||||
// authenticated edge listener. AllowHTTP forces the client to issue plain
|
||||
// HTTP/2 requests (h2c) instead of attempting TLS, which the gateway's
|
||||
// in-process test bootstrap does not configure.
|
||||
func newEdgeClient(t *testing.T, addr string) gatewayv1connect.EdgeGatewayClient {
|
||||
func newEdgeClient(t *testing.T, addr string) edgev1connect.GatewayClient {
|
||||
t.Helper()
|
||||
|
||||
httpClient := &http.Client{
|
||||
@@ -429,7 +429,7 @@ func newEdgeClient(t *testing.T, addr string) gatewayv1connect.EdgeGatewayClient
|
||||
},
|
||||
},
|
||||
}
|
||||
return gatewayv1connect.NewEdgeGatewayClient(httpClient, "http://"+addr)
|
||||
return edgev1connect.NewGatewayClient(httpClient, "http://"+addr)
|
||||
}
|
||||
|
||||
// connectErrorMessage extracts the *connect.Error message from err. It
|
||||
|
||||
@@ -5,7 +5,7 @@ import (
|
||||
"errors"
|
||||
|
||||
"galaxy/gateway/internal/session"
|
||||
gatewayv1 "galaxy/gateway/proto/galaxy/gateway/v1"
|
||||
edgev1 "galaxy/gateway/proto/edge/v1"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
@@ -30,15 +30,15 @@ func resolvedSessionFromContext(ctx context.Context) (session.Record, bool) {
|
||||
// sessionLookupService resolves the authenticated session from SessionCache
|
||||
// after envelope parsing succeeds and before later auth steps run.
|
||||
type sessionLookupService struct {
|
||||
gatewayv1.UnimplementedEdgeGatewayServer
|
||||
edgev1.UnimplementedGatewayServer
|
||||
|
||||
delegate gatewayv1.EdgeGatewayServer
|
||||
delegate edgev1.GatewayServer
|
||||
cache session.Cache
|
||||
}
|
||||
|
||||
// ExecuteCommand resolves the cached session for req and only then forwards it
|
||||
// to the configured delegate with the resolved session attached to ctx.
|
||||
func (s sessionLookupService) ExecuteCommand(ctx context.Context, req *gatewayv1.ExecuteCommandRequest) (*gatewayv1.ExecuteCommandResponse, error) {
|
||||
func (s sessionLookupService) ExecuteCommand(ctx context.Context, req *edgev1.ExecuteCommandRequest) (*edgev1.ExecuteCommandResponse, error) {
|
||||
record, err := s.lookupSession(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -50,7 +50,7 @@ func (s sessionLookupService) ExecuteCommand(ctx context.Context, req *gatewayv1
|
||||
// SubscribeEvents resolves the cached session for req and only then forwards it
|
||||
// to the configured delegate with the resolved session attached to the stream
|
||||
// context.
|
||||
func (s sessionLookupService) SubscribeEvents(req *gatewayv1.SubscribeEventsRequest, stream grpc.ServerStreamingServer[gatewayv1.GatewayEvent]) error {
|
||||
func (s sessionLookupService) SubscribeEvents(req *edgev1.SubscribeEventsRequest, stream grpc.ServerStreamingServer[edgev1.GatewayEvent]) error {
|
||||
record, err := s.lookupSession(stream.Context())
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -63,7 +63,7 @@ func (s sessionLookupService) SubscribeEvents(req *gatewayv1.SubscribeEventsRequ
|
||||
}
|
||||
|
||||
// newSessionLookupService wraps delegate with the session-cache lookup gate.
|
||||
func newSessionLookupService(delegate gatewayv1.EdgeGatewayServer, cache session.Cache) gatewayv1.EdgeGatewayServer {
|
||||
func newSessionLookupService(delegate edgev1.GatewayServer, cache session.Cache) edgev1.GatewayServer {
|
||||
return sessionLookupService{
|
||||
delegate: delegate,
|
||||
cache: cache,
|
||||
@@ -105,7 +105,7 @@ func cloneSessionRecord(record session.Record) session.Record {
|
||||
type resolvedSessionContextKey struct{}
|
||||
|
||||
type resolvedSessionContextStream struct {
|
||||
grpc.ServerStreamingServer[gatewayv1.GatewayEvent]
|
||||
grpc.ServerStreamingServer[edgev1.GatewayEvent]
|
||||
ctx context.Context
|
||||
}
|
||||
|
||||
@@ -126,4 +126,4 @@ func (unavailableSessionCache) Lookup(context.Context, string) (session.Record,
|
||||
func (unavailableSessionCache) MarkRevoked(string) {}
|
||||
func (unavailableSessionCache) MarkAllRevokedForUser(string) {}
|
||||
|
||||
var _ gatewayv1.EdgeGatewayServer = sessionLookupService{}
|
||||
var _ edgev1.GatewayServer = sessionLookupService{}
|
||||
|
||||
@@ -6,7 +6,7 @@ import (
|
||||
"testing"
|
||||
|
||||
"galaxy/gateway/internal/session"
|
||||
gatewayv1 "galaxy/gateway/proto/galaxy/gateway/v1"
|
||||
edgev1 "galaxy/gateway/proto/edge/v1"
|
||||
|
||||
"connectrpc.com/connect"
|
||||
"github.com/stretchr/testify/assert"
|
||||
@@ -17,7 +17,7 @@ import (
|
||||
func TestExecuteCommandRejectsUnknownSession(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
delegate := &recordingEdgeGatewayService{}
|
||||
delegate := &recordingGatewayService{}
|
||||
server, runGateway := newTestGateway(t, ServerDependencies{
|
||||
Service: delegate,
|
||||
SessionCache: staticSessionCache{
|
||||
@@ -40,7 +40,7 @@ func TestExecuteCommandRejectsUnknownSession(t *testing.T) {
|
||||
func TestSubscribeEventsRejectsUnknownSession(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
delegate := &recordingEdgeGatewayService{}
|
||||
delegate := &recordingGatewayService{}
|
||||
server, runGateway := newTestGateway(t, ServerDependencies{
|
||||
Service: delegate,
|
||||
SessionCache: staticSessionCache{
|
||||
@@ -63,7 +63,7 @@ func TestSubscribeEventsRejectsUnknownSession(t *testing.T) {
|
||||
func TestExecuteCommandRejectsRevokedSession(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
delegate := &recordingEdgeGatewayService{}
|
||||
delegate := &recordingGatewayService{}
|
||||
server, runGateway := newTestGateway(t, ServerDependencies{
|
||||
Service: delegate,
|
||||
SessionCache: staticSessionCache{lookupFunc: func(context.Context, string) (session.Record, error) { return newRevokedSessionRecord(), nil }},
|
||||
@@ -82,7 +82,7 @@ func TestExecuteCommandRejectsRevokedSession(t *testing.T) {
|
||||
func TestSubscribeEventsRejectsRevokedSession(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
delegate := &recordingEdgeGatewayService{}
|
||||
delegate := &recordingGatewayService{}
|
||||
server, runGateway := newTestGateway(t, ServerDependencies{
|
||||
Service: delegate,
|
||||
SessionCache: staticSessionCache{lookupFunc: func(context.Context, string) (session.Record, error) { return newRevokedSessionRecord(), nil }},
|
||||
@@ -101,7 +101,7 @@ func TestSubscribeEventsRejectsRevokedSession(t *testing.T) {
|
||||
func TestExecuteCommandRejectsSessionCacheUnavailable(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
delegate := &recordingEdgeGatewayService{}
|
||||
delegate := &recordingGatewayService{}
|
||||
server, runGateway := newTestGateway(t, ServerDependencies{
|
||||
Service: delegate,
|
||||
SessionCache: staticSessionCache{
|
||||
@@ -124,7 +124,7 @@ func TestExecuteCommandRejectsSessionCacheUnavailable(t *testing.T) {
|
||||
func TestSubscribeEventsRejectsSessionCacheUnavailable(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
delegate := &recordingEdgeGatewayService{}
|
||||
delegate := &recordingGatewayService{}
|
||||
server, runGateway := newTestGateway(t, ServerDependencies{
|
||||
Service: delegate,
|
||||
SessionCache: staticSessionCache{
|
||||
@@ -147,12 +147,12 @@ func TestSubscribeEventsRejectsSessionCacheUnavailable(t *testing.T) {
|
||||
func TestExecuteCommandAttachesResolvedSession(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
delegate := &recordingEdgeGatewayService{
|
||||
executeCommandFunc: func(ctx context.Context, req *gatewayv1.ExecuteCommandRequest) (*gatewayv1.ExecuteCommandResponse, error) {
|
||||
delegate := &recordingGatewayService{
|
||||
executeCommandFunc: func(ctx context.Context, req *edgev1.ExecuteCommandRequest) (*edgev1.ExecuteCommandResponse, error) {
|
||||
record, ok := resolvedSessionFromContext(ctx)
|
||||
require.True(t, ok)
|
||||
assert.Equal(t, newActiveSessionRecord(), record)
|
||||
return &gatewayv1.ExecuteCommandResponse{RequestId: req.GetRequestId()}, nil
|
||||
return &edgev1.ExecuteCommandResponse{RequestId: req.GetRequestId()}, nil
|
||||
},
|
||||
}
|
||||
|
||||
@@ -173,8 +173,8 @@ func TestExecuteCommandAttachesResolvedSession(t *testing.T) {
|
||||
func TestSubscribeEventsAttachesResolvedSession(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
delegate := &recordingEdgeGatewayService{
|
||||
subscribeEventsFunc: func(req *gatewayv1.SubscribeEventsRequest, stream grpc.ServerStreamingServer[gatewayv1.GatewayEvent]) error {
|
||||
delegate := &recordingGatewayService{
|
||||
subscribeEventsFunc: func(req *edgev1.SubscribeEventsRequest, stream grpc.ServerStreamingServer[edgev1.GatewayEvent]) error {
|
||||
record, ok := resolvedSessionFromContext(stream.Context())
|
||||
require.True(t, ok)
|
||||
assert.Equal(t, newActiveSessionRecord(), record)
|
||||
@@ -204,8 +204,8 @@ func TestSubscribeEventsAttachesResolvedSession(t *testing.T) {
|
||||
func TestSubscribeEventsAttachesAuthenticatedStreamBinding(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
delegate := &recordingEdgeGatewayService{
|
||||
subscribeEventsFunc: func(req *gatewayv1.SubscribeEventsRequest, stream grpc.ServerStreamingServer[gatewayv1.GatewayEvent]) error {
|
||||
delegate := &recordingGatewayService{
|
||||
subscribeEventsFunc: func(req *edgev1.SubscribeEventsRequest, stream grpc.ServerStreamingServer[edgev1.GatewayEvent]) error {
|
||||
binding, ok := authenticatedStreamBindingFromContext(stream.Context())
|
||||
require.True(t, ok)
|
||||
assert.Equal(t, authenticatedStreamBinding{
|
||||
|
||||
@@ -5,7 +5,7 @@ import (
|
||||
"errors"
|
||||
|
||||
"galaxy/gateway/authn"
|
||||
gatewayv1 "galaxy/gateway/proto/galaxy/gateway/v1"
|
||||
edgev1 "galaxy/gateway/proto/edge/v1"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
@@ -15,14 +15,14 @@ import (
|
||||
// signatureVerifyingService applies client-signature verification after
|
||||
// payload integrity checks and before later auth or routing steps run.
|
||||
type signatureVerifyingService struct {
|
||||
gatewayv1.UnimplementedEdgeGatewayServer
|
||||
edgev1.UnimplementedGatewayServer
|
||||
|
||||
delegate gatewayv1.EdgeGatewayServer
|
||||
delegate edgev1.GatewayServer
|
||||
}
|
||||
|
||||
// ExecuteCommand verifies req client signature before delegating to the
|
||||
// configured service implementation.
|
||||
func (s signatureVerifyingService) ExecuteCommand(ctx context.Context, req *gatewayv1.ExecuteCommandRequest) (*gatewayv1.ExecuteCommandResponse, error) {
|
||||
func (s signatureVerifyingService) ExecuteCommand(ctx context.Context, req *edgev1.ExecuteCommandRequest) (*edgev1.ExecuteCommandResponse, error) {
|
||||
if err := verifyRequestSignature(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -32,7 +32,7 @@ func (s signatureVerifyingService) ExecuteCommand(ctx context.Context, req *gate
|
||||
|
||||
// SubscribeEvents verifies req client signature before delegating to the
|
||||
// configured service implementation.
|
||||
func (s signatureVerifyingService) SubscribeEvents(req *gatewayv1.SubscribeEventsRequest, stream grpc.ServerStreamingServer[gatewayv1.GatewayEvent]) error {
|
||||
func (s signatureVerifyingService) SubscribeEvents(req *edgev1.SubscribeEventsRequest, stream grpc.ServerStreamingServer[edgev1.GatewayEvent]) error {
|
||||
if err := verifyRequestSignature(stream.Context()); err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -42,7 +42,7 @@ func (s signatureVerifyingService) SubscribeEvents(req *gatewayv1.SubscribeEvent
|
||||
|
||||
// newSignatureVerifyingService wraps delegate with the client-signature
|
||||
// verification gate.
|
||||
func newSignatureVerifyingService(delegate gatewayv1.EdgeGatewayServer) gatewayv1.EdgeGatewayServer {
|
||||
func newSignatureVerifyingService(delegate edgev1.GatewayServer) edgev1.GatewayServer {
|
||||
return signatureVerifyingService{delegate: delegate}
|
||||
}
|
||||
|
||||
@@ -77,4 +77,4 @@ func verifyRequestSignature(ctx context.Context) error {
|
||||
}
|
||||
}
|
||||
|
||||
var _ gatewayv1.EdgeGatewayServer = signatureVerifyingService{}
|
||||
var _ edgev1.GatewayServer = signatureVerifyingService{}
|
||||
|
||||
@@ -14,7 +14,7 @@ import (
|
||||
func TestExecuteCommandRejectsInvalidSignature(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
delegate := &recordingEdgeGatewayService{}
|
||||
delegate := &recordingGatewayService{}
|
||||
server, runGateway := newTestGateway(t, ServerDependencies{
|
||||
Service: delegate,
|
||||
SessionCache: staticSessionCache{lookupFunc: func(context.Context, string) (session.Record, error) { return newActiveSessionRecord(), nil }},
|
||||
@@ -37,7 +37,7 @@ func TestExecuteCommandRejectsInvalidSignature(t *testing.T) {
|
||||
func TestExecuteCommandRejectsWrongKey(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
delegate := &recordingEdgeGatewayService{}
|
||||
delegate := &recordingGatewayService{}
|
||||
server, runGateway := newTestGateway(t, ServerDependencies{
|
||||
Service: delegate,
|
||||
SessionCache: staticSessionCache{
|
||||
@@ -62,7 +62,7 @@ func TestExecuteCommandRejectsWrongKey(t *testing.T) {
|
||||
func TestExecuteCommandRejectsInvalidCachedPublicKey(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
delegate := &recordingEdgeGatewayService{}
|
||||
delegate := &recordingGatewayService{}
|
||||
server, runGateway := newTestGateway(t, ServerDependencies{
|
||||
Service: delegate,
|
||||
SessionCache: staticSessionCache{
|
||||
@@ -87,7 +87,7 @@ func TestExecuteCommandRejectsInvalidCachedPublicKey(t *testing.T) {
|
||||
func TestSubscribeEventsRejectsInvalidSignature(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
delegate := &recordingEdgeGatewayService{}
|
||||
delegate := &recordingGatewayService{}
|
||||
server, runGateway := newTestGateway(t, ServerDependencies{
|
||||
Service: delegate,
|
||||
SessionCache: staticSessionCache{lookupFunc: func(context.Context, string) (session.Record, error) { return newActiveSessionRecord(), nil }},
|
||||
@@ -110,7 +110,7 @@ func TestSubscribeEventsRejectsInvalidSignature(t *testing.T) {
|
||||
func TestSubscribeEventsRejectsWrongKey(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
delegate := &recordingEdgeGatewayService{}
|
||||
delegate := &recordingGatewayService{}
|
||||
server, runGateway := newTestGateway(t, ServerDependencies{
|
||||
Service: delegate,
|
||||
SessionCache: staticSessionCache{
|
||||
@@ -135,7 +135,7 @@ func TestSubscribeEventsRejectsWrongKey(t *testing.T) {
|
||||
func TestSubscribeEventsRejectsInvalidCachedPublicKey(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
delegate := &recordingEdgeGatewayService{}
|
||||
delegate := &recordingGatewayService{}
|
||||
server, runGateway := newTestGateway(t, ServerDependencies{
|
||||
Service: delegate,
|
||||
SessionCache: staticSessionCache{
|
||||
|
||||
@@ -13,8 +13,8 @@ import (
|
||||
"galaxy/gateway/authn"
|
||||
"galaxy/gateway/internal/downstream"
|
||||
"galaxy/gateway/internal/session"
|
||||
gatewayv1 "galaxy/gateway/proto/galaxy/gateway/v1"
|
||||
"galaxy/gateway/proto/galaxy/gateway/v1/gatewayv1connect"
|
||||
edgev1 "galaxy/gateway/proto/edge/v1"
|
||||
"galaxy/gateway/proto/edge/v1/edgev1connect"
|
||||
|
||||
gatewayfbs "galaxy/schema/fbs/gateway"
|
||||
|
||||
@@ -29,19 +29,19 @@ var (
|
||||
testFreshnessWindow = 5 * time.Minute
|
||||
)
|
||||
|
||||
func newValidExecuteCommandRequest() *gatewayv1.ExecuteCommandRequest {
|
||||
func newValidExecuteCommandRequest() *edgev1.ExecuteCommandRequest {
|
||||
return newValidExecuteCommandRequestWithSessionAndRequestID("device-session-123", "request-123")
|
||||
}
|
||||
|
||||
func newValidExecuteCommandRequestWithSessionAndRequestID(deviceSessionID string, requestID string) *gatewayv1.ExecuteCommandRequest {
|
||||
func newValidExecuteCommandRequestWithSessionAndRequestID(deviceSessionID string, requestID string) *edgev1.ExecuteCommandRequest {
|
||||
return newValidExecuteCommandRequestWithTimestamp(deviceSessionID, requestID, testCurrentTime.UnixMilli())
|
||||
}
|
||||
|
||||
func newValidExecuteCommandRequestWithTimestamp(deviceSessionID string, requestID string, timestampMS int64) *gatewayv1.ExecuteCommandRequest {
|
||||
func newValidExecuteCommandRequestWithTimestamp(deviceSessionID string, requestID string, timestampMS int64) *edgev1.ExecuteCommandRequest {
|
||||
payloadBytes := []byte("payload")
|
||||
payloadHash := sha256.Sum256(payloadBytes)
|
||||
|
||||
req := &gatewayv1.ExecuteCommandRequest{
|
||||
req := &edgev1.ExecuteCommandRequest{
|
||||
ProtocolVersion: supportedProtocolVersion,
|
||||
DeviceSessionId: deviceSessionID,
|
||||
MessageType: "fleet.move",
|
||||
@@ -56,18 +56,18 @@ func newValidExecuteCommandRequestWithTimestamp(deviceSessionID string, requestI
|
||||
return req
|
||||
}
|
||||
|
||||
func newValidSubscribeEventsRequest() *gatewayv1.SubscribeEventsRequest {
|
||||
func newValidSubscribeEventsRequest() *edgev1.SubscribeEventsRequest {
|
||||
return newValidSubscribeEventsRequestWithSessionAndRequestID("device-session-123", "request-123")
|
||||
}
|
||||
|
||||
func newValidSubscribeEventsRequestWithSessionAndRequestID(deviceSessionID string, requestID string) *gatewayv1.SubscribeEventsRequest {
|
||||
func newValidSubscribeEventsRequestWithSessionAndRequestID(deviceSessionID string, requestID string) *edgev1.SubscribeEventsRequest {
|
||||
return newValidSubscribeEventsRequestWithTimestamp(deviceSessionID, requestID, testCurrentTime.UnixMilli())
|
||||
}
|
||||
|
||||
func newValidSubscribeEventsRequestWithTimestamp(deviceSessionID string, requestID string, timestampMS int64) *gatewayv1.SubscribeEventsRequest {
|
||||
func newValidSubscribeEventsRequestWithTimestamp(deviceSessionID string, requestID string, timestampMS int64) *edgev1.SubscribeEventsRequest {
|
||||
payloadHash := sha256.Sum256(nil)
|
||||
|
||||
req := &gatewayv1.SubscribeEventsRequest{
|
||||
req := &edgev1.SubscribeEventsRequest{
|
||||
ProtocolVersion: supportedProtocolVersion,
|
||||
DeviceSessionId: deviceSessionID,
|
||||
MessageType: "gateway.subscribe",
|
||||
@@ -172,7 +172,7 @@ func (c fixedClock) Now() time.Time {
|
||||
func recvBootstrapEvent(t interface {
|
||||
require.TestingT
|
||||
Helper()
|
||||
}, stream *connect.ServerStreamForClient[gatewayv1.GatewayEvent]) *gatewayv1.GatewayEvent {
|
||||
}, stream *connect.ServerStreamForClient[edgev1.GatewayEvent]) *edgev1.GatewayEvent {
|
||||
t.Helper()
|
||||
|
||||
if !stream.Receive() {
|
||||
@@ -189,7 +189,7 @@ func recvBootstrapEvent(t interface {
|
||||
func subscribeEventsError(t interface {
|
||||
require.TestingT
|
||||
Helper()
|
||||
}, ctx context.Context, client gatewayv1connect.EdgeGatewayClient, req *gatewayv1.SubscribeEventsRequest) error {
|
||||
}, ctx context.Context, client edgev1connect.GatewayClient, req *edgev1.SubscribeEventsRequest) error {
|
||||
t.Helper()
|
||||
|
||||
stream, err := client.SubscribeEvents(ctx, connect.NewRequest(req))
|
||||
@@ -208,7 +208,7 @@ func subscribeEventsError(t interface {
|
||||
func assertServerTimeBootstrapEvent(t interface {
|
||||
require.TestingT
|
||||
Helper()
|
||||
}, event *gatewayv1.GatewayEvent, publicKey ed25519.PublicKey, wantRequestID string, wantTraceID string, wantTimestampMS int64) {
|
||||
}, event *edgev1.GatewayEvent, publicKey ed25519.PublicKey, wantRequestID string, wantTraceID string, wantTimestampMS int64) {
|
||||
t.Helper()
|
||||
|
||||
require.NotNil(t, event)
|
||||
@@ -244,7 +244,7 @@ func (s staticReplayStore) Reserve(ctx context.Context, deviceSessionID string,
|
||||
}
|
||||
|
||||
type executeCommandAdapterRouter struct {
|
||||
service gatewayv1.EdgeGatewayServer
|
||||
service edgev1.GatewayServer
|
||||
}
|
||||
|
||||
func (r executeCommandAdapterRouter) Route(string) (downstream.Client, error) {
|
||||
@@ -252,11 +252,11 @@ func (r executeCommandAdapterRouter) Route(string) (downstream.Client, error) {
|
||||
}
|
||||
|
||||
type executeCommandAdapterClient struct {
|
||||
service gatewayv1.EdgeGatewayServer
|
||||
service edgev1.GatewayServer
|
||||
}
|
||||
|
||||
func (c executeCommandAdapterClient) ExecuteCommand(ctx context.Context, command downstream.AuthenticatedCommand) (downstream.UnaryResult, error) {
|
||||
response, err := c.service.ExecuteCommand(ctx, &gatewayv1.ExecuteCommandRequest{
|
||||
response, err := c.service.ExecuteCommand(ctx, &edgev1.ExecuteCommandRequest{
|
||||
ProtocolVersion: command.ProtocolVersion,
|
||||
DeviceSessionId: command.DeviceSessionID,
|
||||
MessageType: command.MessageType,
|
||||
|
||||
Reference in New Issue
Block a user