phase 4: connectrpc on the gateway authenticated edge

Replace the native-gRPC server bootstrap with a single
`connectrpc.com/connect` HTTP/h2c listener. Connect-Go natively
serves Connect, gRPC, and gRPC-Web on the same port, so browsers can
now reach the authenticated surface without giving up the gRPC
framing native and desktop clients may use later. The decorator
stack (envelope → session → payload-hash → signature →
freshness/replay → rate-limit → routing/push) is reused unchanged
behind a small Connect → gRPC adapter and a `grpc.ServerStream`
shim around `*connect.ServerStream`.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
Ilia Denisov
2026-05-07 11:49:28 +02:00
parent 39b7b2ef29
commit 118f7c17a2
30 changed files with 1009 additions and 772 deletions
+9
View File
@@ -531,6 +531,15 @@ This section describes the secure exchange model between client and
gateway. It applies at the public boundary and does not rely on backend
behaviour for any of its guarantees.
The authenticated edge listener is built on `connectrpc.com/connect` and
natively serves the Connect, gRPC, and gRPC-Web protocols on a single
HTTP/2 cleartext (`h2c`) port. Browser clients use Connect via
`@connectrpc/connect-web`; native iOS / Android / desktop clients can
use either Connect or raw gRPC framing against the same listener.
Envelope, signature, freshness, and anti-replay rules below are
protocol-agnostic — they apply identically to every supported wire
framing.
### Principles
- No browser cookies.
+10 -8
View File
@@ -139,9 +139,10 @@ consumed exactly once.
### 1.4 Per-request session lookup
Once the client holds a device session id and a private key, every
authenticated call is a signed gRPC request to gateway. Gateway is the
only component that ever sees the request signature; backend trusts
gateway's verdict.
authenticated call is a signed request to gateway over the
authenticated edge listener (Connect / gRPC / gRPC-Web on a single
HTTP/h2c port). Gateway is the only component that ever sees the
request signature; backend trusts gateway's verdict.
Gateway needs the session's public key to verify the signature, so each
authenticated request resolves the device session through an in-memory
@@ -602,8 +603,8 @@ not duplicated here.
### 6.2 Backend's role: pass-through with authorisation
The signed-gRPC pipeline for in-game traffic uses three message types
on the authenticated surface — `user.games.command`,
The signed authenticated-edge pipeline for in-game traffic uses three
message types on the authenticated surface — `user.games.command`,
`user.games.order`, `user.games.report` — each with a typed
FlatBuffers payload. Gateway transcodes the FB request into the JSON
shape backend expects, forwards over plain REST to the corresponding
@@ -680,9 +681,10 @@ session invalidations).
### 7.1 Scope
In scope: the gRPC stream a client opens against gateway, the
bootstrap event, the framing of forwarded events, and the
backend → gateway control channel that produces those events.
In scope: the server-streaming subscription a client opens against
gateway (Connect / gRPC / gRPC-Web framing all map to the same
endpoint), the bootstrap event, the framing of forwarded events, and
the backend → gateway control channel that produces those events.
Out of scope: the catalog of event kinds — see [Section 8](#8-notifications-and-mail) for the
notification side and [`backend/README.md` §10](../backend/README.md#10-notification-catalog) for the closed list.
+12 -10
View File
@@ -138,9 +138,10 @@ Throttle-переиспользование на стороне send означ
### 1.4 Поиск сессии для каждого запроса
Когда у клиента есть идентификатор устройства-сессии и приватный ключ,
каждый аутентифицированный вызов — это подписанный gRPC-запрос к
gateway. Gateway — единственный компонент, который видит подпись
запроса; backend доверяет вердикту gateway.
каждый аутентифицированный вызов — это подписанный запрос к gateway
по аутентифицированному edge-листенеру (Connect / gRPC / gRPC-Web на
одном HTTP/h2c-порту). Gateway — единственный компонент, который видит
подпись запроса; backend доверяет вердикту gateway.
Gateway нужен публичный ключ сессии для проверки подписи, поэтому
каждый аутентифицированный запрос разрешает устройство-сессию через
@@ -618,10 +619,10 @@ Wire-формат команд, приказов и отчётов — собс
### 6.2 Роль backend: pass-through с авторизацией
Signed-gRPC-конвейер для in-game-трафика использует три message
types на аутентифицированной поверхности — `user.games.command`,
`user.games.order`, `user.games.report` у каждого типизированный
FlatBuffers-payload. Gateway транскодирует FB-запрос в JSON-форму,
Подписанный конвейер аутентифицированного edge для in-game-трафика
использует три message types на аутентифицированной поверхности —
`user.games.command`, `user.games.order`, `user.games.report`
у каждого типизированный FlatBuffers-payload. Gateway транскодирует FB-запрос в JSON-форму,
которую ждёт backend, форвардит её REST'ом в соответствующий
`/api/v1/user/games/{game_id}/*` endpoint, после чего транскодирует
JSON-ответ обратно в FB перед подписью.
@@ -697,9 +698,10 @@ notification-каталог явно их опускает
### 7.1 Состав
В составе: gRPC-стрим, который клиент открывает к gateway,
bootstrap-событие, фрейминг форварднутых событий, control-канал
backend → gateway, который производит эти события.
В составе: server-streaming-подписка, которую клиент открывает к
gateway (Connect / gRPC / gRPC-Web фреймы все маршрутизируются на
одну точку), bootstrap-событие, фрейминг форварднутых событий,
control-канал backend → gateway, который производит эти события.
Вне состава: каталог видов событий — см.
[Раздел 8](#8-уведомления-и-почта) для notification-стороны и
+33 -16
View File
@@ -87,7 +87,15 @@ The gateway exposes two external transport classes.
| Transport | Audience | Authentication | Payload format | Primary use |
| --- | --- | --- | --- | --- |
| REST/JSON | Public, unauthenticated traffic | No device session auth | JSON | Health checks, public auth commands, and browser/bootstrap traffic |
| gRPC over HTTP/2 | Authenticated clients only | Required | FlatBuffers payload inside protobuf control envelope | Verified commands and push delivery |
| Connect / gRPC / gRPC-Web over HTTP/2 (h2c) | Authenticated clients only | Required | FlatBuffers payload inside protobuf control envelope | Verified commands and push delivery |
The authenticated edge listener is built on
[`connectrpc.com/connect`](https://connectrpc.com/) and natively serves
the Connect, gRPC, and gRPC-Web protocols on a single HTTP/2 cleartext
(`h2c`) port. Browser clients use `@connectrpc/connect-web`; native
clients can use either Connect or raw gRPC framing against the same
listener. Production TLS termination happens upstream of the gateway,
matching the previous gRPC-only deployment posture.
### Public REST Surface
@@ -181,16 +189,21 @@ The endpoint exposes metrics in the Prometheus text exposition format described
in the official Prometheus documentation:
<https://prometheus.io/docs/instrumenting/exposition_formats/>.
### Authenticated gRPC Surface
### Authenticated Edge Surface
All authenticated client requests use HTTP/2 and gRPC.
The listener address is configured by `GATEWAY_AUTHENTICATED_GRPC_ADDR`.
Inbound authenticated gRPC connection setup is bounded by
All authenticated client requests use HTTP/2 cleartext (`h2c`) and are
served through `connectrpc.com/connect`, which natively accepts the
Connect, gRPC, and gRPC-Web protocols on the same listener.
The listener address is configured by `GATEWAY_AUTHENTICATED_GRPC_ADDR`
(the env-var name retains the historical `GRPC` infix for operational
stability — it labels the authenticated edge tier, not the wire
protocol).
Inbound authenticated edge connection setup is bounded by
`GATEWAY_AUTHENTICATED_GRPC_CONNECTION_TIMEOUT`, which defaults to `5s`.
The accepted client timestamp skew is configured by
`GATEWAY_AUTHENTICATED_GRPC_FRESHNESS_WINDOW` and defaults to `5m`.
The public gRPC service exposes two methods:
The public service exposes two methods:
- `ExecuteCommand(ExecuteCommandRequest) returns (ExecuteCommandResponse)`
- `SubscribeEvents(SubscribeEventsRequest) returns (stream GatewayEvent)`
@@ -200,9 +213,12 @@ The gateway routes the request downstream by `message_type` after transport
verification succeeds.
Downstream unary execution is bounded by
`GATEWAY_AUTHENTICATED_DOWNSTREAM_TIMEOUT`, which defaults to `5s`.
When that timeout expires, the gateway preserves the authenticated gRPC
contract and returns gRPC `UNAVAILABLE` with message
`downstream service is unavailable`.
When that timeout expires, the gateway preserves the authenticated edge
contract and returns `UNAVAILABLE` with message
`downstream service is unavailable`. Reject codes are documented using
their gRPC names (`INVALID_ARGUMENT`, `UNAUTHENTICATED`, …); the same
codes flow back to Connect clients as the corresponding `connect.Code*`
values.
`SubscribeEvents` is an authenticated server-streaming RPC.
It binds the stream to `user_id` and `device_session_id` and starts by sending
@@ -211,8 +227,9 @@ a signed service event that includes the current server time in milliseconds.
The v1 protobuf contract lives in
`proto/galaxy/gateway/v1/edge_gateway.proto` under package
`galaxy.gateway.v1` and service `EdgeGateway`.
Generated Go bindings are committed under `proto/galaxy/gateway/v1/` and are
regenerated with:
Generated Go bindings are committed under
`proto/galaxy/gateway/v1/` (gRPC stubs and `gatewayv1connect/` Connect
handlers) and are regenerated with:
```bash
buf generate
@@ -286,8 +303,8 @@ affected stream is closed with gRPC `RESOURCE_EXHAUSTED` and message
same `device_session_id` was revoked, every active `SubscribeEvents` stream
bound to that exact session is closed with gRPC `FAILED_PRECONDITION` and
message `device session is revoked`. During gateway shutdown, the in-memory
push hub is closed before gRPC graceful stop, and every active
`SubscribeEvents` stream is terminated with gRPC `UNAVAILABLE` and message
push hub is closed before HTTP graceful stop, and every active
`SubscribeEvents` stream is terminated with `UNAVAILABLE` and message
`gateway is shutting down`.
Authenticated anti-abuse budgets are configured by the
`GATEWAY_AUTHENTICATED_GRPC_ANTI_ABUSE_*` environment variables.
@@ -851,9 +868,9 @@ subscribers, and telemetry runtime.
`GATEWAY_SHUTDOWN_TIMEOUT` configures the per-component graceful shutdown
budget and defaults to `5s`.
During authenticated gRPC shutdown, the in-memory `PushHub` closes active
streams before gRPC graceful stop, so active `SubscribeEvents` calls terminate
with gRPC `UNAVAILABLE` and message `gateway is shutting down`.
During authenticated edge shutdown, the in-memory `PushHub` closes active
streams before HTTP graceful stop, so active `SubscribeEvents` calls terminate
with `UNAVAILABLE` and message `gateway is shutting down`.
## Recommended Package Layout
+4
View File
@@ -9,3 +9,7 @@ plugins:
out: proto
opt:
- paths=source_relative
- remote: buf.build/connectrpc/go:v1.19.2
out: proto
opt:
- paths=source_relative
+1 -1
View File
@@ -75,6 +75,6 @@ sequenceDiagram
Dispatcher->>Hub: RevokeDeviceSession or RevokeAllForUser
Hub-->>Client: stream closes with FAILED_PRECONDITION
Note over Gateway,Hub: During shutdown the gateway closes PushHub before gRPC graceful stop.
Note over Gateway,Hub: During shutdown the gateway closes PushHub before HTTP graceful stop.
Hub-->>Client: stream closes with UNAVAILABLE
```
+2 -2
View File
@@ -80,8 +80,8 @@ Shutdown behavior:
- the per-component shutdown budget is controlled by
`GATEWAY_SHUTDOWN_TIMEOUT`;
- internal subscribers are stopped as part of application shutdown;
- the in-memory `PushHub` is closed before gRPC graceful stop;
- active `SubscribeEvents` streams terminate with gRPC `UNAVAILABLE` and
- the in-memory `PushHub` is closed before HTTP graceful stop;
- active `SubscribeEvents` streams terminate with `UNAVAILABLE` and
message `gateway is shutting down`.
During planned restarts:
+7 -3
View File
@@ -7,12 +7,12 @@ runtime dependencies.
flowchart LR
subgraph Clients
Public["Public REST clients"]
Authd["Authenticated gRPC clients"]
Authd["Authenticated edge clients\n(Connect / gRPC / gRPC-Web)"]
end
subgraph Gateway["Edge Gateway process"]
PublicHTTP["Public HTTP listener\n/healthz /readyz /api/v1/public/auth/*"]
AuthGRPC["Authenticated gRPC listener\nExecuteCommand / SubscribeEvents"]
AuthGRPC["Authenticated edge listener (h2c)\nConnect / gRPC / gRPC-Web\nExecuteCommand / SubscribeEvents"]
AdminHTTP["Optional admin HTTP listener\n/metrics"]
BackendREST["backendclient.RESTClient\nsessions + public auth + user/lobby"]
BackendPush["backendclient.PushClient\nSubscribePush consumer"]
@@ -48,9 +48,13 @@ Notes:
- `cmd/gateway` refuses startup when Redis connectivity, the backend endpoint,
or the response signer is misconfigured.
- Session lookup is synchronous: every authenticated gRPC request triggers one
- Session lookup is synchronous: every authenticated edge request triggers one
`GET /api/v1/internal/sessions/{id}` call to backend; there is no
process-local projection.
- The authenticated edge listener is built on `connectrpc.com/connect` and
natively serves the Connect, gRPC, and gRPC-Web protocols on a single
HTTP/2 cleartext (`h2c`) port. Browsers use Connect; native clients can
use either Connect or raw gRPC framing against the same listener.
- `backendclient.PushClient` keeps a long-lived `Push.SubscribePush` stream
open. The dispatcher converts inbound `pushv1.PushEvent` frames into either
`PushHub.Publish` (for client events) or `PushHub.RevokeDeviceSession` /
+4 -1
View File
@@ -5,6 +5,7 @@ go 1.26.1
require (
buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.11-20260209202127-80ab13bee0bf.1
buf.build/go/protovalidate v1.1.3
connectrpc.com/connect v1.19.2
galaxy/core v0.0.0-00010101000000-000000000000
galaxy/redisconn v0.0.0-00010101000000-000000000000
github.com/alicebob/miniredis/v2 v2.37.0
@@ -17,6 +18,7 @@ require (
github.com/stretchr/testify v1.11.1
go.opentelemetry.io/contrib/instrumentation/github.com/gin-gonic/gin/otelgin v0.68.0
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.67.0
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.68.0
go.opentelemetry.io/otel v1.43.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.43.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.43.0
@@ -26,6 +28,7 @@ require (
go.opentelemetry.io/otel/sdk/metric v1.43.0
go.opentelemetry.io/otel/trace v1.43.0
go.uber.org/zap v1.27.1
golang.org/x/net v0.53.0
golang.org/x/text v0.36.0
golang.org/x/time v0.15.0
google.golang.org/grpc v1.80.0
@@ -44,6 +47,7 @@ require (
github.com/cloudwego/base64x v0.1.6 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/gabriel-vasile/mimetype v1.4.13 // indirect
github.com/gin-contrib/sse v1.1.1 // indirect
github.com/go-logr/logr v1.4.3 // indirect
@@ -95,7 +99,6 @@ require (
golang.org/x/arch v0.25.0 // indirect
golang.org/x/crypto v0.50.0 // indirect
golang.org/x/exp v0.0.0-20260410095643-746e56fc9e2f // indirect
golang.org/x/net v0.53.0 // indirect
golang.org/x/sys v0.43.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20260401024825-9d38bb4040a9 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20260420184626-e10c466a9529 // indirect
+6
View File
@@ -4,6 +4,8 @@ buf.build/go/protovalidate v1.1.3 h1:m2GVEgQWd7rk+vIoAZ+f0ygGjvQTuqPQapBBdcpWVPE
buf.build/go/protovalidate v1.1.3/go.mod h1:9XIuohWz+kj+9JVn3WQneHA5LZP50mjvneZMnbLkiIE=
cel.dev/expr v0.25.1 h1:1KrZg61W6TWSxuNZ37Xy49ps13NUovb66QLprthtwi4=
cel.dev/expr v0.25.1/go.mod h1:hrXvqGP6G6gyx8UAHSHJ5RGk//1Oj5nXQ2NI02Nrsg4=
connectrpc.com/connect v1.19.2 h1:McQ83FGdzL+t60peksi0gXC7MQ/iLKgLduAnThbM0mo=
connectrpc.com/connect v1.19.2/go.mod h1:tN20fjdGlewnSFeZxLKb0xwIZ6ozc3OQs2hTXy4du9w=
github.com/alicebob/miniredis/v2 v2.37.0 h1:RheObYW32G1aiJIj81XVt78ZHJpHonHLHW7OLIshq68=
github.com/alicebob/miniredis/v2 v2.37.0/go.mod h1:TcL7YfarKPGDAthEtl5NBeHZfeUQj6OXMm/+iu5cLMM=
github.com/antlr4-go/antlr/v4 v4.13.1 h1:SqQKkuVZ+zWkMMNkjy5FZe5mr5WURWnlpmOuzYWrPrQ=
@@ -34,6 +36,8 @@ github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/gabriel-vasile/mimetype v1.4.13 h1:46nXokslUBsAJE/wMsp5gtO500a4F3Nkz9Ufpk2AcUM=
github.com/gabriel-vasile/mimetype v1.4.13/go.mod h1:d+9Oxyo1wTzWdyVUPMmXFvp4F9tea18J8ufA774AB3s=
github.com/getkin/kin-openapi v0.135.0 h1:751SjYfbiwqukYuVjwYEIKNfrSwS5YpA7DZnKSwQgtg=
@@ -171,6 +175,8 @@ go.opentelemetry.io/contrib/instrumentation/github.com/gin-gonic/gin/otelgin v0.
go.opentelemetry.io/contrib/instrumentation/github.com/gin-gonic/gin/otelgin v0.68.0/go.mod h1:MdHW7tLtkeGJnR4TyOrnd5D0zUGZQB1l84uHCe8hRpE=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.67.0 h1:yI1/OhfEPy7J9eoa6Sj051C7n5dvpj0QX8g4sRchg04=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.67.0/go.mod h1:NoUCKYWK+3ecatC4HjkRktREheMeEtrXoQxrqYFeHSc=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.68.0 h1:CqXxU8VOmDefoh0+ztfGaymYbhdB/tT3zs79QaZTNGY=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.68.0/go.mod h1:BuhAPThV8PBHBvg8ZzZ/Ok3idOdhWIodywz2xEcRbJo=
go.opentelemetry.io/contrib/propagators/b3 v1.43.0 h1:CETqV3QLLPTy5yNrqyMr41VnAOOD4lsRved7n4QG00A=
go.opentelemetry.io/contrib/propagators/b3 v1.43.0/go.mod h1:Q4mCiCdziYzpNR0g+6UqVotAlCDZdzz6L8jwY4knOrw=
go.opentelemetry.io/otel v1.43.0 h1:mYIM03dnh5zfN7HautFE4ieIig9amkNANT+xcVxAj9I=
@@ -11,14 +11,12 @@ import (
"galaxy/gateway/internal/config"
"galaxy/gateway/internal/downstream"
"galaxy/gateway/internal/testutil"
gatewayv1 "galaxy/gateway/proto/galaxy/gateway/v1"
"connectrpc.com/connect"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
func TestExecuteCommandRoutesVerifiedCommandAndSignsResponse(t *testing.T) {
@@ -58,32 +56,27 @@ func TestExecuteCommandRoutesVerifiedCommandAndSignsResponse(t *testing.T) {
defer runGateway.stop(t)
addr := waitForListenAddr(t, server)
conn := dialGatewayClient(t, addr)
defer func() {
require.NoError(t, conn.Close())
}()
client := gatewayv1.NewEdgeGatewayClient(conn)
response, err := client.ExecuteCommand(context.Background(), newValidExecuteCommandRequest())
client := newEdgeClient(t, addr)
response, err := client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequest()))
require.NoError(t, err)
assert.Equal(t, "v1", response.GetProtocolVersion())
assert.Equal(t, "request-123", response.GetRequestId())
assert.Equal(t, testCurrentTime.UnixMilli(), response.GetTimestampMs())
assert.Equal(t, "accepted", response.GetResultCode())
assert.Equal(t, []byte("downstream-response"), response.GetPayloadBytes())
assert.Equal(t, "v1", response.Msg.GetProtocolVersion())
assert.Equal(t, "request-123", response.Msg.GetRequestId())
assert.Equal(t, testCurrentTime.UnixMilli(), response.Msg.GetTimestampMs())
assert.Equal(t, "accepted", response.Msg.GetResultCode())
assert.Equal(t, []byte("downstream-response"), response.Msg.GetPayloadBytes())
assert.Equal(t, 1, moveClient.executeCalls)
assert.Zero(t, renameClient.executeCalls)
wantHash := sha256.Sum256([]byte("downstream-response"))
assert.Equal(t, wantHash[:], response.GetPayloadHash())
require.NoError(t, authn.VerifyPayloadHash(response.GetPayloadBytes(), response.GetPayloadHash()))
require.NoError(t, authn.VerifyResponseSignature(signer.PublicKey(), response.GetSignature(), authn.ResponseSigningFields{
ProtocolVersion: response.GetProtocolVersion(),
RequestID: response.GetRequestId(),
TimestampMS: response.GetTimestampMs(),
ResultCode: response.GetResultCode(),
PayloadHash: response.GetPayloadHash(),
assert.Equal(t, wantHash[:], response.Msg.GetPayloadHash())
require.NoError(t, authn.VerifyPayloadHash(response.Msg.GetPayloadBytes(), response.Msg.GetPayloadHash()))
require.NoError(t, authn.VerifyResponseSignature(signer.PublicKey(), response.Msg.GetSignature(), authn.ResponseSigningFields{
ProtocolVersion: response.Msg.GetProtocolVersion(),
RequestID: response.Msg.GetRequestId(),
TimestampMS: response.Msg.GetTimestampMs(),
ResultCode: response.Msg.GetResultCode(),
PayloadHash: response.Msg.GetPayloadHash(),
}))
}
@@ -99,16 +92,11 @@ func TestExecuteCommandRouteMissReturnsUnimplemented(t *testing.T) {
defer runGateway.stop(t)
addr := waitForListenAddr(t, server)
conn := dialGatewayClient(t, addr)
defer func() {
require.NoError(t, conn.Close())
}()
client := gatewayv1.NewEdgeGatewayClient(conn)
_, err := client.ExecuteCommand(context.Background(), newValidExecuteCommandRequest())
client := newEdgeClient(t, addr)
_, err := client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequest()))
require.Error(t, err)
assert.Equal(t, codes.Unimplemented, status.Code(err))
assert.Equal(t, "message_type is not routed", status.Convert(err).Message())
assert.Equal(t, connect.CodeUnimplemented, connect.CodeOf(err))
assert.Equal(t, "message_type is not routed", connectErrorMessage(t, err))
}
func TestExecuteCommandMapsDownstreamUnavailableToUnavailable(t *testing.T) {
@@ -131,16 +119,11 @@ func TestExecuteCommandMapsDownstreamUnavailableToUnavailable(t *testing.T) {
defer runGateway.stop(t)
addr := waitForListenAddr(t, server)
conn := dialGatewayClient(t, addr)
defer func() {
require.NoError(t, conn.Close())
}()
client := gatewayv1.NewEdgeGatewayClient(conn)
_, err := client.ExecuteCommand(context.Background(), newValidExecuteCommandRequest())
client := newEdgeClient(t, addr)
_, err := client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequest()))
require.Error(t, err)
assert.Equal(t, codes.Unavailable, status.Code(err))
assert.Equal(t, "downstream service is unavailable", status.Convert(err).Message())
assert.Equal(t, connect.CodeUnavailable, connect.CodeOf(err))
assert.Equal(t, "downstream service is unavailable", connectErrorMessage(t, err))
assert.Equal(t, 1, failingClient.executeCalls)
}
@@ -167,16 +150,11 @@ func TestExecuteCommandMapsDownstreamTimeoutToUnavailable(t *testing.T) {
defer runGateway.stop(t)
addr := waitForListenAddr(t, server)
conn := dialGatewayClient(t, addr)
defer func() {
require.NoError(t, conn.Close())
}()
client := gatewayv1.NewEdgeGatewayClient(conn)
_, err := client.ExecuteCommand(context.Background(), newValidExecuteCommandRequest())
client := newEdgeClient(t, addr)
_, err := client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequest()))
require.Error(t, err)
assert.Equal(t, codes.Unavailable, status.Code(err))
assert.Equal(t, "downstream service is unavailable", status.Convert(err).Message())
assert.Equal(t, connect.CodeUnavailable, connect.CodeOf(err))
assert.Equal(t, "downstream service is unavailable", connectErrorMessage(t, err))
assert.Equal(t, 1, stallingClient.executeCalls)
}
@@ -203,16 +181,11 @@ func TestExecuteCommandFailsClosedWhenResponseSignerUnavailable(t *testing.T) {
defer runGateway.stop(t)
addr := waitForListenAddr(t, server)
conn := dialGatewayClient(t, addr)
defer func() {
require.NoError(t, conn.Close())
}()
client := gatewayv1.NewEdgeGatewayClient(conn)
_, err := client.ExecuteCommand(context.Background(), newValidExecuteCommandRequest())
client := newEdgeClient(t, addr)
_, err := client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequest()))
require.Error(t, err)
assert.Equal(t, codes.Unavailable, status.Code(err))
assert.Equal(t, "response signer is unavailable", status.Convert(err).Message())
assert.Equal(t, connect.CodeUnavailable, connect.CodeOf(err))
assert.Equal(t, "response signer is unavailable", connectErrorMessage(t, err))
assert.Equal(t, 1, successClient.executeCalls)
}
@@ -250,13 +223,8 @@ func TestExecuteCommandPropagatesOTelSpanContextToDownstream(t *testing.T) {
defer runGateway.stop(t)
addr := waitForListenAddr(t, server)
conn := dialGatewayClient(t, addr)
defer func() {
require.NoError(t, conn.Close())
}()
client := gatewayv1.NewEdgeGatewayClient(conn)
_, err := client.ExecuteCommand(context.Background(), newValidExecuteCommandRequest())
client := newEdgeClient(t, addr)
_, err := client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequest()))
require.NoError(t, err)
assert.True(t, seenSpanContext.IsValid())
@@ -290,15 +258,10 @@ func TestExecuteCommandDrainsInFlightUnaryDuringShutdown(t *testing.T) {
defer runGateway.stop(t)
addr := waitForListenAddr(t, server)
conn := dialGatewayClient(t, addr)
defer func() {
require.NoError(t, conn.Close())
}()
client := gatewayv1.NewEdgeGatewayClient(conn)
client := newEdgeClient(t, addr)
resultCh := make(chan error, 1)
go func() {
_, err := client.ExecuteCommand(context.Background(), newValidExecuteCommandRequest())
_, err := client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequest()))
resultCh <- err
}()
@@ -353,13 +316,8 @@ func TestExecuteCommandLogsDoNotContainSensitiveTransportMaterial(t *testing.T)
defer runGateway.stop(t)
addr := waitForListenAddr(t, server)
conn := dialGatewayClient(t, addr)
defer func() {
require.NoError(t, conn.Close())
}()
client := gatewayv1.NewEdgeGatewayClient(conn)
_, err := client.ExecuteCommand(context.Background(), newValidExecuteCommandRequest())
client := newEdgeClient(t, addr)
_, err := client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequest()))
require.NoError(t, err)
logOutput := logBuffer.String()
+143
View File
@@ -0,0 +1,143 @@
package grpcapi
import (
"context"
"errors"
"fmt"
gatewayv1 "galaxy/gateway/proto/galaxy/gateway/v1"
"galaxy/gateway/proto/galaxy/gateway/v1/gatewayv1connect"
"connectrpc.com/connect"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
grpcstatus "google.golang.org/grpc/status"
)
// connectEdgeAdapter exposes the existing gRPC-shaped authenticated edge
// service decorator stack (envelope → session → payload-hash → signature →
// freshness/replay → rate-limit → routing/push) through the
// gatewayv1connect.EdgeGatewayHandler interface. It owns no logic of its
// own; the underlying decorator stack carries the full ingress contract
// unchanged.
type connectEdgeAdapter struct {
impl gatewayv1.EdgeGatewayServer
}
// newConnectEdgeAdapter wraps impl as a Connect handler.
func newConnectEdgeAdapter(impl gatewayv1.EdgeGatewayServer) gatewayv1connect.EdgeGatewayHandler {
return &connectEdgeAdapter{impl: impl}
}
// ExecuteCommand unwraps the typed Connect request, calls the underlying
// service, and wraps the typed response. gRPC `status.Error` values
// returned by the decorator stack are translated to *connect.Error so
// the Connect client receives the matching code and message.
func (a *connectEdgeAdapter) ExecuteCommand(ctx context.Context, req *connect.Request[gatewayv1.ExecuteCommandRequest]) (*connect.Response[gatewayv1.ExecuteCommandResponse], error) {
resp, err := a.impl.ExecuteCommand(ctx, req.Msg)
if err != nil {
return nil, translateGRPCStatusError(err)
}
return connect.NewResponse(resp), nil
}
// SubscribeEvents adapts the Connect server stream to the
// grpc.ServerStreamingServer contract expected by the existing decorator
// stack. The decorator stack only ever calls Send and Context on the
// stream; the remaining grpc.ServerStream surface is satisfied by no-op
// shims so the interface contract is met without panicking. Errors
// returned by the decorator stack are translated to *connect.Error.
func (a *connectEdgeAdapter) SubscribeEvents(ctx context.Context, req *connect.Request[gatewayv1.SubscribeEventsRequest], stream *connect.ServerStream[gatewayv1.GatewayEvent]) error {
wrapped := &connectEdgeStream{ctx: ctx, stream: stream}
if err := a.impl.SubscribeEvents(req.Msg, wrapped); err != nil {
return translateGRPCStatusError(err)
}
return nil
}
// translateGRPCStatusError maps gRPC status.Error values returned by the
// decorator stack into *connect.Error with the equivalent code and message.
// Errors that are already *connect.Error pass through unchanged. Errors
// without a recognisable gRPC status are returned verbatim — connect-go
// renders those as CodeUnknown.
func translateGRPCStatusError(err error) error {
if err == nil {
return nil
}
var connectErr *connect.Error
if errors.As(err, &connectErr) {
return err
}
grpcStatus, ok := grpcstatus.FromError(err)
if !ok {
return err
}
if grpcStatus.Code() == codes.OK {
return nil
}
return connect.NewError(connect.Code(grpcStatus.Code()), errors.New(grpcStatus.Message()))
}
// connectEdgeStream satisfies grpc.ServerStreamingServer[gatewayv1.GatewayEvent]
// on top of *connect.ServerStream. The decorator stack reads the request
// context and pushes outbound events through Send; the rest of the
// grpc.ServerStream surface is not exercised in the gateway, so the no-op
// implementations preserve the type contract without surprising behaviour.
type connectEdgeStream struct {
ctx context.Context
stream *connect.ServerStream[gatewayv1.GatewayEvent]
}
// Send forwards a typed gateway event through the underlying Connect server
// stream.
func (s *connectEdgeStream) Send(event *gatewayv1.GatewayEvent) error {
return s.stream.Send(event)
}
// Context returns the request context handed to the Connect handler.
func (s *connectEdgeStream) Context() context.Context {
return s.ctx
}
// SetHeader is part of grpc.ServerStream. The Connect transport exposes
// response headers through ResponseHeader() at construction time; metadata
// supplied here is intentionally ignored because no decorator in the
// gateway exercises the gRPC-only metadata path.
func (s *connectEdgeStream) SetHeader(metadata.MD) error {
return nil
}
// SendHeader is part of grpc.ServerStream. Connect-served streams flush
// headers automatically on the first Send; manual header dispatch is not
// modelled.
func (s *connectEdgeStream) SendHeader(metadata.MD) error {
return nil
}
// SetTrailer is part of grpc.ServerStream. Trailer metadata has no
// corresponding Connect concept on server-streaming responses.
func (s *connectEdgeStream) SetTrailer(metadata.MD) {}
// SendMsg is part of grpc.ServerStream. The decorator stack never calls
// SendMsg directly; if a future caller does, the typed Send path is used
// when the message is a GatewayEvent.
func (s *connectEdgeStream) SendMsg(m any) error {
event, ok := m.(*gatewayv1.GatewayEvent)
if !ok {
return fmt.Errorf("connectEdgeStream.SendMsg: unsupported message type %T", m)
}
return s.stream.Send(event)
}
// RecvMsg is part of grpc.ServerStream. Server-streaming server handlers
// have no client messages to receive after the initial request, so this
// method is intentionally an error path.
func (s *connectEdgeStream) RecvMsg(any) error {
return errors.New("connectEdgeStream.RecvMsg: server-streaming has no client messages")
}
@@ -0,0 +1,110 @@
package grpcapi
import (
"context"
"net"
"time"
"galaxy/gateway/internal/telemetry"
"connectrpc.com/connect"
"go.uber.org/zap"
)
// observabilityConnectInterceptor returns a Connect interceptor that records
// the same structured log entry and authenticated edge metric pair as the
// gRPC instrumentation it replaced. It also injects the parsed peer IP into
// the request context so the rate-limit decorator can attribute requests
// without depending on the gRPC `peer` package.
func observabilityConnectInterceptor(logger *zap.Logger, metrics *telemetry.Runtime) connect.Interceptor {
if logger == nil {
logger = zap.NewNop()
}
return &connectObservability{logger: logger, metrics: metrics}
}
type connectObservability struct {
logger *zap.Logger
metrics *telemetry.Runtime
}
// WrapUnary records timing and outcome for a single unary edge call.
func (o *connectObservability) WrapUnary(next connect.UnaryFunc) connect.UnaryFunc {
return func(ctx context.Context, req connect.AnyRequest) (connect.AnyResponse, error) {
ctx = contextWithPeerIP(ctx, hostFromConnectPeerAddr(req.Peer().Addr))
start := time.Now()
resp, err := next(ctx, req)
var respValue any
if resp != nil {
respValue = resp.Any()
}
recordEdgeRequest(o.logger, o.metrics, ctx, "connect", req.Spec().Procedure, req.Any(), respValue, err, time.Since(start), "unary")
return resp, err
}
}
// WrapStreamingClient is the client-side hook required by the
// connect.Interceptor contract. The gateway only acts as a Connect server,
// so this hook is a pass-through.
func (o *connectObservability) WrapStreamingClient(next connect.StreamingClientFunc) connect.StreamingClientFunc {
return next
}
// WrapStreamingHandler records timing and outcome for one server-streaming
// edge call. The wrapped conn captures the first received request so the
// log/metric pair carries the same envelope fields the gRPC instrumentation
// emitted before.
func (o *connectObservability) WrapStreamingHandler(next connect.StreamingHandlerFunc) connect.StreamingHandlerFunc {
return func(ctx context.Context, conn connect.StreamingHandlerConn) error {
ctx = contextWithPeerIP(ctx, hostFromConnectPeerAddr(conn.Peer().Addr))
start := time.Now()
wrapped := &observabilityStreamingConn{StreamingHandlerConn: conn}
err := next(ctx, wrapped)
recordEdgeRequest(o.logger, o.metrics, ctx, "connect", conn.Spec().Procedure, wrapped.firstRequest, nil, err, time.Since(start), "stream")
return err
}
}
// observabilityStreamingConn captures the first received request so the
// streaming-handler interceptor can derive the envelope log fields after
// the handler returns.
type observabilityStreamingConn struct {
connect.StreamingHandlerConn
firstRequest any
}
// Receive forwards to the underlying conn and stores the first successful
// message, so envelopeFieldsFromRequest can read message_type, request_id,
// and trace_id from it.
func (c *observabilityStreamingConn) Receive(msg any) error {
err := c.StreamingHandlerConn.Receive(msg)
if err == nil && c.firstRequest == nil {
c.firstRequest = msg
}
return err
}
// hostFromConnectPeerAddr returns the host part of a "host:port" peer
// address, or the address verbatim when it cannot be split. Empty input
// yields an empty string so peerIPFromContext falls back to the canonical
// `unknown` bucket.
func hostFromConnectPeerAddr(addr string) string {
if addr == "" {
return ""
}
host, _, err := net.SplitHostPort(addr)
if err == nil && host != "" {
return host
}
return addr
}
+1 -2
View File
@@ -4,8 +4,7 @@ import (
"bytes"
"context"
"fmt"
"galaxy/gateway/proto/galaxy/gateway/v1"
gatewayv1 "galaxy/gateway/proto/galaxy/gateway/v1"
"buf.build/go/protovalidate"
"google.golang.org/grpc"
@@ -3,7 +3,6 @@ package grpcapi
import (
"context"
"errors"
"io"
"sync"
"testing"
"time"
@@ -12,11 +11,10 @@ import (
"galaxy/gateway/internal/session"
gatewayv1 "galaxy/gateway/proto/galaxy/gateway/v1"
"connectrpc.com/connect"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
func TestExecuteCommandRejectsStaleTimestamp(t *testing.T) {
@@ -51,16 +49,11 @@ func TestExecuteCommandRejectsStaleTimestamp(t *testing.T) {
defer runGateway.stop(t)
addr := waitForListenAddr(t, server)
conn := dialGatewayClient(t, addr)
defer func() {
require.NoError(t, conn.Close())
}()
client := gatewayv1.NewEdgeGatewayClient(conn)
_, err := client.ExecuteCommand(context.Background(), newValidExecuteCommandRequestWithTimestamp("device-session-123", "request-123", tt.timestampMS))
client := newEdgeClient(t, addr)
_, err := client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequestWithTimestamp("device-session-123", "request-123", tt.timestampMS)))
require.Error(t, err)
assert.Equal(t, codes.FailedPrecondition, status.Code(err))
assert.Equal(t, "request timestamp is outside the freshness window", status.Convert(err).Message())
assert.Equal(t, connect.CodeFailedPrecondition, connect.CodeOf(err))
assert.Equal(t, "request timestamp is outside the freshness window", connectErrorMessage(t, err))
assert.Zero(t, delegate.executeCalls)
})
}
@@ -98,16 +91,11 @@ func TestSubscribeEventsRejectsStaleTimestamp(t *testing.T) {
defer runGateway.stop(t)
addr := waitForListenAddr(t, server)
conn := dialGatewayClient(t, addr)
defer func() {
require.NoError(t, conn.Close())
}()
client := gatewayv1.NewEdgeGatewayClient(conn)
client := newEdgeClient(t, addr)
err := subscribeEventsError(t, context.Background(), client, newValidSubscribeEventsRequestWithTimestamp("device-session-123", "request-123", tt.timestampMS))
require.Error(t, err)
assert.Equal(t, codes.FailedPrecondition, status.Code(err))
assert.Equal(t, "request timestamp is outside the freshness window", status.Convert(err).Message())
assert.Equal(t, connect.CodeFailedPrecondition, connect.CodeOf(err))
assert.Equal(t, "request timestamp is outside the freshness window", connectErrorMessage(t, err))
assert.Zero(t, delegate.subscribeCalls)
})
}
@@ -127,21 +115,16 @@ func TestExecuteCommandRejectsReplay(t *testing.T) {
defer runGateway.stop(t)
addr := waitForListenAddr(t, server)
conn := dialGatewayClient(t, addr)
defer func() {
require.NoError(t, conn.Close())
}()
client := gatewayv1.NewEdgeGatewayClient(conn)
client := newEdgeClient(t, addr)
req := newValidExecuteCommandRequest()
_, err := client.ExecuteCommand(context.Background(), req)
_, err := client.ExecuteCommand(context.Background(), connect.NewRequest(req))
require.NoError(t, err)
_, err = client.ExecuteCommand(context.Background(), req)
_, err = client.ExecuteCommand(context.Background(), connect.NewRequest(req))
require.Error(t, err)
assert.Equal(t, codes.FailedPrecondition, status.Code(err))
assert.Equal(t, "request replay detected", status.Convert(err).Message())
assert.Equal(t, connect.CodeFailedPrecondition, connect.CodeOf(err))
assert.Equal(t, "request replay detected", connectErrorMessage(t, err))
assert.Equal(t, 1, delegate.executeCalls)
}
@@ -159,25 +142,20 @@ func TestSubscribeEventsRejectsReplay(t *testing.T) {
defer runGateway.stop(t)
addr := waitForListenAddr(t, server)
conn := dialGatewayClient(t, addr)
defer func() {
require.NoError(t, conn.Close())
}()
client := gatewayv1.NewEdgeGatewayClient(conn)
client := newEdgeClient(t, addr)
req := newValidSubscribeEventsRequest()
stream, err := client.SubscribeEvents(context.Background(), req)
stream, err := client.SubscribeEvents(context.Background(), connect.NewRequest(req))
require.NoError(t, err)
event := recvBootstrapEvent(t, stream)
assertServerTimeBootstrapEvent(t, event, newTestResponseSignerPublicKey(), "request-123", "trace-123", testCurrentTime.UnixMilli())
_, err = stream.Recv()
require.ErrorIs(t, err, io.EOF)
require.False(t, stream.Receive())
require.NoError(t, stream.Err())
err = subscribeEventsError(t, context.Background(), client, req)
require.Error(t, err)
assert.Equal(t, codes.FailedPrecondition, status.Code(err))
assert.Equal(t, "request replay detected", status.Convert(err).Message())
assert.Equal(t, connect.CodeFailedPrecondition, connect.CodeOf(err))
assert.Equal(t, "request replay detected", connectErrorMessage(t, err))
assert.Equal(t, 1, delegate.subscribeCalls)
}
@@ -204,17 +182,12 @@ func TestExecuteCommandAllowsSameRequestIDAcrossDistinctSessions(t *testing.T) {
defer runGateway.stop(t)
addr := waitForListenAddr(t, server)
conn := dialGatewayClient(t, addr)
defer func() {
require.NoError(t, conn.Close())
}()
client := newEdgeClient(t, addr)
client := gatewayv1.NewEdgeGatewayClient(conn)
_, err := client.ExecuteCommand(context.Background(), newValidExecuteCommandRequestWithSessionAndRequestID("device-session-123", "request-shared"))
_, err := client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequestWithSessionAndRequestID("device-session-123", "request-shared")))
require.NoError(t, err)
_, err = client.ExecuteCommand(context.Background(), newValidExecuteCommandRequestWithSessionAndRequestID("device-session-456", "request-shared"))
_, err = client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequestWithSessionAndRequestID("device-session-456", "request-shared")))
require.NoError(t, err)
assert.Equal(t, 2, delegate.executeCalls)
@@ -243,26 +216,21 @@ func TestSubscribeEventsAllowsSameRequestIDAcrossDistinctSessions(t *testing.T)
defer runGateway.stop(t)
addr := waitForListenAddr(t, server)
conn := dialGatewayClient(t, addr)
defer func() {
require.NoError(t, conn.Close())
}()
client := newEdgeClient(t, addr)
client := gatewayv1.NewEdgeGatewayClient(conn)
stream, err := client.SubscribeEvents(context.Background(), newValidSubscribeEventsRequestWithSessionAndRequestID("device-session-123", "request-shared"))
stream, err := client.SubscribeEvents(context.Background(), connect.NewRequest(newValidSubscribeEventsRequestWithSessionAndRequestID("device-session-123", "request-shared")))
require.NoError(t, err)
event := recvBootstrapEvent(t, stream)
assertServerTimeBootstrapEvent(t, event, newTestResponseSignerPublicKey(), "request-shared", "trace-123", testCurrentTime.UnixMilli())
_, err = stream.Recv()
require.ErrorIs(t, err, io.EOF)
require.False(t, stream.Receive())
require.NoError(t, stream.Err())
stream, err = client.SubscribeEvents(context.Background(), newValidSubscribeEventsRequestWithSessionAndRequestID("device-session-456", "request-shared"))
stream, err = client.SubscribeEvents(context.Background(), connect.NewRequest(newValidSubscribeEventsRequestWithSessionAndRequestID("device-session-456", "request-shared")))
require.NoError(t, err)
event = recvBootstrapEvent(t, stream)
assertServerTimeBootstrapEvent(t, event, newTestResponseSignerPublicKey(), "request-shared", "trace-123", testCurrentTime.UnixMilli())
_, err = stream.Recv()
require.ErrorIs(t, err, io.EOF)
require.False(t, stream.Receive())
require.NoError(t, stream.Err())
assert.Equal(t, 2, delegate.subscribeCalls)
}
@@ -283,16 +251,11 @@ func TestExecuteCommandRejectsReplayStoreUnavailable(t *testing.T) {
defer runGateway.stop(t)
addr := waitForListenAddr(t, server)
conn := dialGatewayClient(t, addr)
defer func() {
require.NoError(t, conn.Close())
}()
client := gatewayv1.NewEdgeGatewayClient(conn)
_, err := client.ExecuteCommand(context.Background(), newValidExecuteCommandRequest())
client := newEdgeClient(t, addr)
_, err := client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequest()))
require.Error(t, err)
assert.Equal(t, codes.Unavailable, status.Code(err))
assert.Equal(t, "replay store is unavailable", status.Convert(err).Message())
assert.Equal(t, connect.CodeUnavailable, connect.CodeOf(err))
assert.Equal(t, "replay store is unavailable", connectErrorMessage(t, err))
assert.Zero(t, delegate.executeCalls)
}
@@ -312,16 +275,11 @@ func TestSubscribeEventsRejectsReplayStoreUnavailable(t *testing.T) {
defer runGateway.stop(t)
addr := waitForListenAddr(t, server)
conn := dialGatewayClient(t, addr)
defer func() {
require.NoError(t, conn.Close())
}()
client := gatewayv1.NewEdgeGatewayClient(conn)
client := newEdgeClient(t, addr)
err := subscribeEventsError(t, context.Background(), client, newValidSubscribeEventsRequest())
require.Error(t, err)
assert.Equal(t, codes.Unavailable, status.Code(err))
assert.Equal(t, "replay store is unavailable", status.Convert(err).Message())
assert.Equal(t, connect.CodeUnavailable, connect.CodeOf(err))
assert.Equal(t, "replay store is unavailable", connectErrorMessage(t, err))
assert.Zero(t, delegate.subscribeCalls)
}
@@ -353,15 +311,10 @@ func TestExecuteCommandFreshRequestReachesDelegateAndUsesDynamicReplayTTL(t *tes
defer runGateway.stop(t)
addr := waitForListenAddr(t, server)
conn := dialGatewayClient(t, addr)
defer func() {
require.NoError(t, conn.Close())
}()
client := gatewayv1.NewEdgeGatewayClient(conn)
response, err := client.ExecuteCommand(context.Background(), newValidExecuteCommandRequest())
client := newEdgeClient(t, addr)
response, err := client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequest()))
require.NoError(t, err)
assert.Equal(t, "request-123", response.GetRequestId())
assert.Equal(t, "request-123", response.Msg.GetRequestId())
assert.Equal(t, "device-session-123", reservedDeviceSessionID)
assert.Equal(t, "request-123", reservedRequestID)
assert.Equal(t, testFreshnessWindow, reservedTTL)
@@ -394,18 +347,13 @@ func TestSubscribeEventsFreshRequestReachesDelegateAndUsesDynamicReplayTTL(t *te
defer runGateway.stop(t)
addr := waitForListenAddr(t, server)
conn := dialGatewayClient(t, addr)
defer func() {
require.NoError(t, conn.Close())
}()
client := gatewayv1.NewEdgeGatewayClient(conn)
stream, err := client.SubscribeEvents(context.Background(), newValidSubscribeEventsRequest())
client := newEdgeClient(t, addr)
stream, err := client.SubscribeEvents(context.Background(), connect.NewRequest(newValidSubscribeEventsRequest()))
require.NoError(t, err)
event := recvBootstrapEvent(t, stream)
assertServerTimeBootstrapEvent(t, event, newTestResponseSignerPublicKey(), "request-123", "trace-123", testCurrentTime.UnixMilli())
_, err = stream.Recv()
require.ErrorIs(t, err, io.EOF)
require.False(t, stream.Receive())
require.NoError(t, stream.Err())
assert.Equal(t, testFreshnessWindow, reservedTTL)
assert.Equal(t, 1, delegate.subscribeCalls)
}
@@ -434,15 +382,10 @@ func TestExecuteCommandFutureSkewUsesExtendedReplayTTL(t *testing.T) {
defer runGateway.stop(t)
addr := waitForListenAddr(t, server)
conn := dialGatewayClient(t, addr)
defer func() {
require.NoError(t, conn.Close())
}()
client := gatewayv1.NewEdgeGatewayClient(conn)
client := newEdgeClient(t, addr)
_, err := client.ExecuteCommand(
context.Background(),
newValidExecuteCommandRequestWithTimestamp("device-session-123", "request-123", testCurrentTime.Add(2*time.Minute).UnixMilli()),
connect.NewRequest(newValidExecuteCommandRequestWithTimestamp("device-session-123", "request-123", testCurrentTime.Add(2*time.Minute).UnixMilli())),
)
require.NoError(t, err)
assert.Equal(t, 7*time.Minute, reservedTTL)
@@ -473,15 +416,10 @@ func TestExecuteCommandBoundaryFreshnessUsesMinimumReplayTTL(t *testing.T) {
defer runGateway.stop(t)
addr := waitForListenAddr(t, server)
conn := dialGatewayClient(t, addr)
defer func() {
require.NoError(t, conn.Close())
}()
client := gatewayv1.NewEdgeGatewayClient(conn)
client := newEdgeClient(t, addr)
_, err := client.ExecuteCommand(
context.Background(),
newValidExecuteCommandRequestWithTimestamp("device-session-123", "request-123", testCurrentTime.Add(-testFreshnessWindow).UnixMilli()),
connect.NewRequest(newValidExecuteCommandRequestWithTimestamp("device-session-123", "request-123", testCurrentTime.Add(-testFreshnessWindow).UnixMilli())),
)
require.NoError(t, err)
assert.Equal(t, minimumReplayReservationTTL, reservedTTL)
+17 -55
View File
@@ -12,59 +12,21 @@ import (
"go.opentelemetry.io/otel/attribute"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
func observabilityUnaryInterceptor(logger *zap.Logger, metrics *telemetry.Runtime) grpc.UnaryServerInterceptor {
if logger == nil {
logger = zap.NewNop()
}
return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
start := time.Now()
resp, err := handler(ctx, req)
recordGRPCRequest(logger, metrics, ctx, info.FullMethod, req, resp, err, time.Since(start), "unary")
return resp, err
}
}
func observabilityStreamInterceptor(logger *zap.Logger, metrics *telemetry.Runtime) grpc.StreamServerInterceptor {
if logger == nil {
logger = zap.NewNop()
}
return func(srv any, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
start := time.Now()
wrapped := &observabilityServerStream{ServerStream: stream}
err := handler(srv, wrapped)
recordGRPCRequest(logger, metrics, stream.Context(), info.FullMethod, wrapped.request, nil, err, time.Since(start), "stream")
return err
}
}
type observabilityServerStream struct {
grpc.ServerStream
request any
}
func (s *observabilityServerStream) RecvMsg(m any) error {
err := s.ServerStream.RecvMsg(m)
if err == nil && s.request == nil {
s.request = m
}
return err
}
func recordGRPCRequest(logger *zap.Logger, metrics *telemetry.Runtime, ctx context.Context, fullMethod string, req any, resp any, err error, duration time.Duration, streamKind string) {
// recordEdgeRequest emits the structured log entry and the
// `gateway.authenticated_grpc.*` metric pair for one authenticated edge
// request or stream outcome. The transport parameter labels the wire
// protocol the request travelled over (`connect`, `grpc`, or `grpc-web`),
// preserving stable observability semantics across the unified Connect-go
// listener.
func recordEdgeRequest(logger *zap.Logger, metrics *telemetry.Runtime, ctx context.Context, transport string, fullMethod string, req any, resp any, err error, duration time.Duration, streamKind string) {
rpcMethod := path.Base(fullMethod)
messageType, requestID, traceID := grpcEnvelopeFields(req)
resultCode := grpcResultCode(resp)
grpcCode, grpcMessage, outcome := grpcOutcome(err)
messageType, requestID, traceID := envelopeFieldsFromRequest(req)
resultCode := resultCodeFromResponse(resp)
grpcCode, grpcMessage, outcome := outcomeFromError(err)
rejectReason := telemetry.RejectReason(outcome)
attrs := []attribute.KeyValue{
@@ -82,7 +44,7 @@ func recordGRPCRequest(logger *zap.Logger, metrics *telemetry.Runtime, ctx conte
fields := []zap.Field{
zap.String("component", "authenticated_grpc"),
zap.String("transport", "grpc"),
zap.String("transport", transport),
zap.String("stream_kind", streamKind),
zap.String("rpc_method", rpcMethod),
zap.String("message_type", messageType),
@@ -106,15 +68,15 @@ func recordGRPCRequest(logger *zap.Logger, metrics *telemetry.Runtime, ctx conte
switch outcome {
case telemetry.EdgeOutcomeSuccess:
logger.Info("authenticated gRPC request completed", fields...)
logger.Info("authenticated edge request completed", fields...)
case telemetry.EdgeOutcomeBackendUnavailable, telemetry.EdgeOutcomeDownstreamUnavailable, telemetry.EdgeOutcomeInternalError:
logger.Error("authenticated gRPC request failed", fields...)
logger.Error("authenticated edge request failed", fields...)
default:
logger.Warn("authenticated gRPC request rejected", fields...)
logger.Warn("authenticated edge request rejected", fields...)
}
}
func grpcEnvelopeFields(req any) (messageType string, requestID string, traceID string) {
func envelopeFieldsFromRequest(req any) (messageType string, requestID string, traceID string) {
switch typed := req.(type) {
case *gatewayv1.ExecuteCommandRequest:
return typed.GetMessageType(), typed.GetRequestId(), typed.GetTraceId()
@@ -125,7 +87,7 @@ func grpcEnvelopeFields(req any) (messageType string, requestID string, traceID
}
}
func grpcResultCode(resp any) string {
func resultCodeFromResponse(resp any) string {
typed, ok := resp.(*gatewayv1.ExecuteCommandResponse)
if !ok {
return ""
@@ -134,7 +96,7 @@ func grpcResultCode(resp any) string {
return typed.GetResultCode()
}
func grpcOutcome(err error) (codes.Code, string, telemetry.EdgeOutcome) {
func outcomeFromError(err error) (codes.Code, string, telemetry.EdgeOutcome) {
switch {
case err == nil:
return codes.OK, "", telemetry.EdgeOutcomeSuccess
@@ -6,12 +6,10 @@ import (
"testing"
"galaxy/gateway/internal/session"
gatewayv1 "galaxy/gateway/proto/galaxy/gateway/v1"
"connectrpc.com/connect"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
func TestExecuteCommandRejectsPayloadHashWithInvalidLength(t *testing.T) {
@@ -25,19 +23,15 @@ func TestExecuteCommandRejectsPayloadHashWithInvalidLength(t *testing.T) {
defer runGateway.stop(t)
addr := waitForListenAddr(t, server)
conn := dialGatewayClient(t, addr)
defer func() {
require.NoError(t, conn.Close())
}()
client := newEdgeClient(t, addr)
req := newValidExecuteCommandRequest()
req.PayloadHash = []byte("short")
client := gatewayv1.NewEdgeGatewayClient(conn)
_, err := client.ExecuteCommand(context.Background(), req)
_, err := client.ExecuteCommand(context.Background(), connect.NewRequest(req))
require.Error(t, err)
assert.Equal(t, codes.InvalidArgument, status.Code(err))
assert.Equal(t, "payload_hash must be a 32-byte SHA-256 digest", status.Convert(err).Message())
assert.Equal(t, connect.CodeInvalidArgument, connect.CodeOf(err))
assert.Equal(t, "payload_hash must be a 32-byte SHA-256 digest", connectErrorMessage(t, err))
assert.Zero(t, delegate.executeCalls)
}
@@ -52,20 +46,16 @@ func TestExecuteCommandRejectsPayloadHashMismatch(t *testing.T) {
defer runGateway.stop(t)
addr := waitForListenAddr(t, server)
conn := dialGatewayClient(t, addr)
defer func() {
require.NoError(t, conn.Close())
}()
client := newEdgeClient(t, addr)
req := newValidExecuteCommandRequest()
sum := sha256.Sum256([]byte("other"))
req.PayloadHash = sum[:]
client := gatewayv1.NewEdgeGatewayClient(conn)
_, err := client.ExecuteCommand(context.Background(), req)
_, err := client.ExecuteCommand(context.Background(), connect.NewRequest(req))
require.Error(t, err)
assert.Equal(t, codes.InvalidArgument, status.Code(err))
assert.Equal(t, "payload_hash does not match payload_bytes", status.Convert(err).Message())
assert.Equal(t, connect.CodeInvalidArgument, connect.CodeOf(err))
assert.Equal(t, "payload_hash does not match payload_bytes", connectErrorMessage(t, err))
assert.Zero(t, delegate.executeCalls)
}
@@ -80,19 +70,15 @@ func TestSubscribeEventsRejectsPayloadHashWithInvalidLength(t *testing.T) {
defer runGateway.stop(t)
addr := waitForListenAddr(t, server)
conn := dialGatewayClient(t, addr)
defer func() {
require.NoError(t, conn.Close())
}()
client := newEdgeClient(t, addr)
req := newValidSubscribeEventsRequest()
req.PayloadHash = []byte("short")
client := gatewayv1.NewEdgeGatewayClient(conn)
err := subscribeEventsError(t, context.Background(), client, req)
require.Error(t, err)
assert.Equal(t, codes.InvalidArgument, status.Code(err))
assert.Equal(t, "payload_hash must be a 32-byte SHA-256 digest", status.Convert(err).Message())
assert.Equal(t, connect.CodeInvalidArgument, connect.CodeOf(err))
assert.Equal(t, "payload_hash must be a 32-byte SHA-256 digest", connectErrorMessage(t, err))
assert.Zero(t, delegate.subscribeCalls)
}
@@ -107,19 +93,15 @@ func TestSubscribeEventsRejectsPayloadHashMismatch(t *testing.T) {
defer runGateway.stop(t)
addr := waitForListenAddr(t, server)
conn := dialGatewayClient(t, addr)
defer func() {
require.NoError(t, conn.Close())
}()
client := newEdgeClient(t, addr)
req := newValidSubscribeEventsRequest()
sum := sha256.Sum256([]byte("other"))
req.PayloadHash = sum[:]
client := gatewayv1.NewEdgeGatewayClient(conn)
err := subscribeEventsError(t, context.Background(), client, req)
require.Error(t, err)
assert.Equal(t, codes.InvalidArgument, status.Code(err))
assert.Equal(t, "payload_hash does not match payload_bytes", status.Convert(err).Message())
assert.Equal(t, connect.CodeInvalidArgument, connect.CodeOf(err))
assert.Equal(t, "payload_hash does not match payload_bytes", connectErrorMessage(t, err))
assert.Zero(t, delegate.subscribeCalls)
}
+17 -21
View File
@@ -3,8 +3,6 @@ package grpcapi
import (
"context"
"errors"
"net"
"strings"
"galaxy/gateway/internal/config"
"galaxy/gateway/internal/ratelimit"
@@ -13,7 +11,6 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/status"
)
@@ -41,7 +38,7 @@ var (
ErrAuthenticatedPolicyUnavailable = errors.New("authenticated request policy is unavailable")
)
// AuthenticatedRequestLimiter applies authenticated gRPC rate-limit policy to
// AuthenticatedRequestLimiter applies authenticated edge rate-limit policy to
// one concrete bucket key.
type AuthenticatedRequestLimiter interface {
// Reserve evaluates key under policy and reports whether the request may
@@ -52,10 +49,11 @@ type AuthenticatedRequestLimiter interface {
// AuthenticatedRequest describes the authenticated request metadata exposed to
// the edge-policy hook.
type AuthenticatedRequest struct {
// RPCMethod identifies the public gRPC method being processed.
// RPCMethod identifies the public RPC method being processed.
RPCMethod string
// PeerIP is the transport peer IP derived from the gRPC connection.
// PeerIP is the transport peer IP host part derived from the
// authenticated edge HTTP listener peer address.
PeerIP string
// MessageClass is the stable rate-limit and policy class. The gateway uses
@@ -258,25 +256,23 @@ func authenticatedMessageClass(messageType string) string {
return messageType
}
type peerIPContextKey struct{}
// contextWithPeerIP attaches the authenticated edge transport peer IP to ctx.
// It is set by the transport interceptor before the service decorator stack
// runs, and read back via peerIPFromContext.
func contextWithPeerIP(ctx context.Context, ip string) context.Context {
return context.WithValue(ctx, peerIPContextKey{}, ip)
}
func peerIPFromContext(ctx context.Context) string {
peerInfo, ok := peer.FromContext(ctx)
if !ok || peerInfo.Addr == nil {
if ip, ok := ctx.Value(peerIPContextKey{}).(string); ok && ip != "" {
return ip
}
return unknownAuthenticatedPeerIP
}
value := strings.TrimSpace(peerInfo.Addr.String())
if value == "" {
return unknownAuthenticatedPeerIP
}
host, _, err := net.SplitHostPort(value)
if err == nil && host != "" {
return host
}
return value
}
type noopAuthenticatedRequestPolicy struct{}
func (noopAuthenticatedRequestPolicy) Evaluate(context.Context, AuthenticatedRequest) error {
@@ -3,7 +3,6 @@ package grpcapi
import (
"context"
"fmt"
"io"
"net"
"net/http"
"strings"
@@ -17,10 +16,9 @@ import (
"galaxy/gateway/internal/session"
gatewayv1 "galaxy/gateway/proto/galaxy/gateway/v1"
"connectrpc.com/connect"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
func TestExecuteCommandRateLimitsByIP(t *testing.T) {
@@ -41,20 +39,15 @@ func TestExecuteCommandRateLimitsByIP(t *testing.T) {
defer runGateway.stop(t)
addr := waitForListenAddr(t, server)
conn := dialGatewayClient(t, addr)
defer func() {
require.NoError(t, conn.Close())
}()
client := newEdgeClient(t, addr)
client := gatewayv1.NewEdgeGatewayClient(conn)
_, err := client.ExecuteCommand(context.Background(), newValidExecuteCommandRequestWithSessionAndRequestID("device-session-1", "request-1"))
_, err := client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequestWithSessionAndRequestID("device-session-1", "request-1")))
require.NoError(t, err)
_, err = client.ExecuteCommand(context.Background(), newValidExecuteCommandRequestWithSessionAndRequestID("device-session-2", "request-2"))
_, err = client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequestWithSessionAndRequestID("device-session-2", "request-2")))
require.Error(t, err)
assert.Equal(t, codes.ResourceExhausted, status.Code(err))
assert.Equal(t, "authenticated request rate limit exceeded", status.Convert(err).Message())
assert.Equal(t, connect.CodeResourceExhausted, connect.CodeOf(err))
assert.Equal(t, "authenticated request rate limit exceeded", connectErrorMessage(t, err))
assert.Equal(t, 1, delegate.executeCalls)
}
@@ -76,21 +69,16 @@ func TestExecuteCommandRateLimitsBySession(t *testing.T) {
defer runGateway.stop(t)
addr := waitForListenAddr(t, server)
conn := dialGatewayClient(t, addr)
defer func() {
require.NoError(t, conn.Close())
}()
client := newEdgeClient(t, addr)
client := gatewayv1.NewEdgeGatewayClient(conn)
_, err := client.ExecuteCommand(context.Background(), newValidExecuteCommandRequestWithSessionAndRequestID("device-session-1", "request-1"))
_, err := client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequestWithSessionAndRequestID("device-session-1", "request-1")))
require.NoError(t, err)
_, err = client.ExecuteCommand(context.Background(), newValidExecuteCommandRequestWithSessionAndRequestID("device-session-1", "request-2"))
_, err = client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequestWithSessionAndRequestID("device-session-1", "request-2")))
require.Error(t, err)
assert.Equal(t, codes.ResourceExhausted, status.Code(err))
assert.Equal(t, connect.CodeResourceExhausted, connect.CodeOf(err))
_, err = client.ExecuteCommand(context.Background(), newValidExecuteCommandRequestWithSessionAndRequestID("device-session-2", "request-3"))
_, err = client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequestWithSessionAndRequestID("device-session-2", "request-3")))
require.NoError(t, err)
assert.Equal(t, 2, delegate.executeCalls)
@@ -118,21 +106,16 @@ func TestExecuteCommandRateLimitsByUser(t *testing.T) {
defer runGateway.stop(t)
addr := waitForListenAddr(t, server)
conn := dialGatewayClient(t, addr)
defer func() {
require.NoError(t, conn.Close())
}()
client := newEdgeClient(t, addr)
client := gatewayv1.NewEdgeGatewayClient(conn)
_, err := client.ExecuteCommand(context.Background(), newValidExecuteCommandRequestWithSessionAndRequestID("device-session-1", "request-1"))
_, err := client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequestWithSessionAndRequestID("device-session-1", "request-1")))
require.NoError(t, err)
_, err = client.ExecuteCommand(context.Background(), newValidExecuteCommandRequestWithSessionAndRequestID("device-session-2", "request-2"))
_, err = client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequestWithSessionAndRequestID("device-session-2", "request-2")))
require.Error(t, err)
assert.Equal(t, codes.ResourceExhausted, status.Code(err))
assert.Equal(t, connect.CodeResourceExhausted, connect.CodeOf(err))
_, err = client.ExecuteCommand(context.Background(), newValidExecuteCommandRequestWithSessionAndRequestID("device-session-3", "request-3"))
_, err = client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequestWithSessionAndRequestID("device-session-3", "request-3")))
require.NoError(t, err)
assert.Equal(t, 2, delegate.executeCalls)
@@ -159,21 +142,16 @@ func TestExecuteCommandRateLimitsByMessageClass(t *testing.T) {
defer runGateway.stop(t)
addr := waitForListenAddr(t, server)
conn := dialGatewayClient(t, addr)
defer func() {
require.NoError(t, conn.Close())
}()
client := newEdgeClient(t, addr)
client := gatewayv1.NewEdgeGatewayClient(conn)
_, err := client.ExecuteCommand(context.Background(), newValidExecuteCommandRequestWithMessageType("device-session-1", "request-1", "fleet.move"))
_, err := client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequestWithMessageType("device-session-1", "request-1", "fleet.move")))
require.NoError(t, err)
_, err = client.ExecuteCommand(context.Background(), newValidExecuteCommandRequestWithMessageType("device-session-2", "request-2", "fleet.move"))
_, err = client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequestWithMessageType("device-session-2", "request-2", "fleet.move")))
require.Error(t, err)
assert.Equal(t, codes.ResourceExhausted, status.Code(err))
assert.Equal(t, connect.CodeResourceExhausted, connect.CodeOf(err))
_, err = client.ExecuteCommand(context.Background(), newValidExecuteCommandRequestWithMessageType("device-session-2", "request-3", "fleet.rename"))
_, err = client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequestWithMessageType("device-session-2", "request-3", "fleet.rename")))
require.NoError(t, err)
assert.Equal(t, 2, delegate.executeCalls)
@@ -193,13 +171,8 @@ func TestAuthenticatedPolicyHookReceivesVerifiedRequest(t *testing.T) {
defer runGateway.stop(t)
addr := waitForListenAddr(t, server)
conn := dialGatewayClient(t, addr)
defer func() {
require.NoError(t, conn.Close())
}()
client := gatewayv1.NewEdgeGatewayClient(conn)
_, err := client.ExecuteCommand(context.Background(), newValidExecuteCommandRequest())
client := newEdgeClient(t, addr)
_, err := client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequest()))
require.NoError(t, err)
require.Len(t, policy.requests, 1)
@@ -228,16 +201,11 @@ func TestExecuteCommandPolicyRejectMapsToPermissionDenied(t *testing.T) {
defer runGateway.stop(t)
addr := waitForListenAddr(t, server)
conn := dialGatewayClient(t, addr)
defer func() {
require.NoError(t, conn.Close())
}()
client := gatewayv1.NewEdgeGatewayClient(conn)
_, err := client.ExecuteCommand(context.Background(), newValidExecuteCommandRequest())
client := newEdgeClient(t, addr)
_, err := client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequest()))
require.Error(t, err)
assert.Equal(t, codes.PermissionDenied, status.Code(err))
assert.Equal(t, "authenticated request rejected by edge policy", status.Convert(err).Message())
assert.Equal(t, connect.CodePermissionDenied, connect.CodeOf(err))
assert.Equal(t, "authenticated request rejected by edge policy", connectErrorMessage(t, err))
assert.Zero(t, delegate.executeCalls)
}
@@ -259,24 +227,19 @@ func TestSubscribeEventsRateLimitRejectsStream(t *testing.T) {
defer runGateway.stop(t)
addr := waitForListenAddr(t, server)
conn := dialGatewayClient(t, addr)
defer func() {
require.NoError(t, conn.Close())
}()
client := newEdgeClient(t, addr)
client := gatewayv1.NewEdgeGatewayClient(conn)
stream, err := client.SubscribeEvents(context.Background(), newValidSubscribeEventsRequestWithSessionAndRequestID("device-session-1", "request-1"))
stream, err := client.SubscribeEvents(context.Background(), connect.NewRequest(newValidSubscribeEventsRequestWithSessionAndRequestID("device-session-1", "request-1")))
require.NoError(t, err)
event := recvBootstrapEvent(t, stream)
assertServerTimeBootstrapEvent(t, event, newTestResponseSignerPublicKey(), "request-1", "trace-123", testCurrentTime.UnixMilli())
_, err = stream.Recv()
require.ErrorIs(t, err, io.EOF)
require.False(t, stream.Receive())
require.NoError(t, stream.Err())
err = subscribeEventsError(t, context.Background(), client, newValidSubscribeEventsRequestWithSessionAndRequestID("device-session-2", "request-2"))
require.Error(t, err)
assert.Equal(t, codes.ResourceExhausted, status.Code(err))
assert.Equal(t, "authenticated request rate limit exceeded", status.Convert(err).Message())
assert.Equal(t, connect.CodeResourceExhausted, connect.CodeOf(err))
assert.Equal(t, "authenticated request rate limit exceeded", connectErrorMessage(t, err))
assert.Equal(t, 1, delegate.subscribeCalls)
}
@@ -342,13 +305,8 @@ func TestAuthenticatedRateLimitsStayIsolatedFromPublicREST(t *testing.T) {
require.NoError(t, firstPublic.Body.Close())
require.NoError(t, secondPublic.Body.Close())
conn := dialGatewayClient(t, addr)
defer func() {
require.NoError(t, conn.Close())
}()
client := gatewayv1.NewEdgeGatewayClient(conn)
_, err := client.ExecuteCommand(context.Background(), newValidExecuteCommandRequest())
client := newEdgeClient(t, addr)
_, err := client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequest()))
require.NoError(t, err)
}
+61 -48
View File
@@ -1,4 +1,10 @@
// Package grpcapi exposes the authenticated gRPC surface of the gateway.
// Package grpcapi exposes the authenticated edge transport surface of the
// gateway. Despite the historical package name, the listener is built on
// `connectrpc.com/connect` and natively serves the Connect, gRPC, and
// gRPC-Web protocols on a single HTTP/h2c listener. The configured Go
// types and environment variable names retain the `gRPC` infix for
// operational stability — they describe the authenticated edge tier, not
// the wire protocol.
package grpcapi
import (
@@ -6,6 +12,7 @@ import (
"errors"
"fmt"
"net"
"net/http"
"sync"
"galaxy/gateway/authn"
@@ -18,14 +25,17 @@ import (
"galaxy/gateway/internal/session"
"galaxy/gateway/internal/telemetry"
gatewayv1 "galaxy/gateway/proto/galaxy/gateway/v1"
"galaxy/gateway/proto/galaxy/gateway/v1/gatewayv1connect"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"connectrpc.com/connect"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.uber.org/zap"
"google.golang.org/grpc"
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"
)
// ServerDependencies describes the optional collaborators used by the
// authenticated gRPC server. The zero value is valid and keeps the process
// authenticated edge server. The zero value is valid and keeps the process
// runnable with the built-in unimplemented service stub.
type ServerDependencies struct {
// Service optionally handles the post-bootstrap SubscribeEvents lifecycle
@@ -45,12 +55,12 @@ type ServerDependencies struct {
ResponseSigner authn.ResponseSigner
// SessionCache resolves authenticated device sessions after the envelope
// gate succeeds. When nil, the authenticated gRPC surface remains runnable
// gate succeeds. When nil, the authenticated edge surface remains runnable
// but valid envelopes fail closed as session-cache unavailable.
SessionCache session.Cache
// Clock provides current server time for freshness checks. When nil, the
// authenticated gRPC surface uses the system clock.
// authenticated edge surface uses the system clock.
Clock clock.Clock
// ReplayStore reserves authenticated request identifiers after signature
@@ -59,26 +69,28 @@ type ServerDependencies struct {
ReplayStore replay.Store
// Limiter applies authenticated rate limits after the request passes the
// transport authenticity checks. When nil, the authenticated gRPC surface
// transport authenticity checks. When nil, the authenticated edge surface
// uses a process-local in-memory limiter.
Limiter AuthenticatedRequestLimiter
// Policy evaluates later authenticated edge policy after rate limits pass.
// When nil, the authenticated gRPC surface applies a no-op allow policy.
// When nil, the authenticated edge surface applies a no-op allow policy.
Policy AuthenticatedRequestPolicy
// Logger writes structured logs for authenticated gRPC traffic.
// Logger writes structured logs for authenticated edge traffic.
Logger *zap.Logger
// Telemetry records low-cardinality gRPC metrics.
// Telemetry records low-cardinality edge metrics.
Telemetry *telemetry.Runtime
// PushHub is the active authenticated push-stream hub. When present, the
// server closes active streams before GracefulStop during shutdown.
// server closes active streams before HTTP graceful shutdown.
PushHub *push.Hub
}
// Server owns the authenticated gRPC listener exposed by the gateway.
// Server owns the authenticated edge HTTP/h2c listener exposed by the
// gateway. It serves the Connect, gRPC, and gRPC-Web protocols from a
// single net/http listener.
type Server struct {
cfg config.AuthenticatedGRPCConfig
service gatewayv1.EdgeGatewayServer
@@ -87,11 +99,11 @@ type Server struct {
metrics *telemetry.Runtime
stateMu sync.RWMutex
server *grpc.Server
server *http.Server
listener net.Listener
}
// NewServer constructs an authenticated gRPC server for the supplied listener
// NewServer constructs an authenticated edge server for the supplied listener
// configuration and dependency bundle. Nil dependencies are replaced with safe
// defaults so the gateway can expose the documented transport surface with the
// full auth pipeline wired from built-in fallbacks.
@@ -128,17 +140,17 @@ func NewServer(cfg config.AuthenticatedGRPCConfig, deps ServerDependencies) *Ser
deps.SessionCache,
),
),
logger: deps.Logger.Named("authenticated_grpc"),
logger: deps.Logger.Named("authenticated_edge"),
pushHub: deps.PushHub,
metrics: deps.Telemetry,
}
}
// Run binds the configured listener and serves the authenticated gRPC surface
// until Shutdown closes the server.
// Run binds the configured listener and serves the authenticated edge
// surface until Shutdown closes the server.
func (s *Server) Run(ctx context.Context) error {
if ctx == nil {
return errors.New("run authenticated gRPC server: nil context")
return errors.New("run authenticated edge server: nil context")
}
if err := ctx.Err(); err != nil {
return err
@@ -146,23 +158,30 @@ func (s *Server) Run(ctx context.Context) error {
listener, err := net.Listen("tcp", s.cfg.Addr)
if err != nil {
return fmt.Errorf("run authenticated gRPC server: listen on %q: %w", s.cfg.Addr, err)
return fmt.Errorf("run authenticated edge server: listen on %q: %w", s.cfg.Addr, err)
}
grpcServer := grpc.NewServer(
grpc.ConnectionTimeout(s.cfg.ConnectionTimeout),
grpc.StatsHandler(otelgrpc.NewServerHandler()),
grpc.ChainUnaryInterceptor(observabilityUnaryInterceptor(s.logger, s.metrics)),
grpc.ChainStreamInterceptor(observabilityStreamInterceptor(s.logger, s.metrics)),
mux := http.NewServeMux()
connectHandler := newConnectEdgeAdapter(s.service)
path, handler := gatewayv1connect.NewEdgeGatewayHandler(
connectHandler,
connect.WithInterceptors(observabilityConnectInterceptor(s.logger, s.metrics)),
)
gatewayv1.RegisterEdgeGatewayServer(grpcServer, s.service)
mux.Handle(path, handler)
tracedHandler := otelhttp.NewHandler(mux, "authenticated_edge")
http2Server := &http2.Server{IdleTimeout: s.cfg.ConnectionTimeout}
httpServer := &http.Server{
Handler: h2c.NewHandler(tracedHandler, http2Server),
ReadHeaderTimeout: s.cfg.ConnectionTimeout,
}
s.stateMu.Lock()
s.server = grpcServer
s.server = httpServer
s.listener = listener
s.stateMu.Unlock()
s.logger.Info("authenticated gRPC server started", zap.String("addr", listener.Addr().String()))
s.logger.Info("authenticated edge server started", zap.String("addr", listener.Addr().String()))
defer func() {
s.stateMu.Lock()
@@ -171,24 +190,22 @@ func (s *Server) Run(ctx context.Context) error {
s.stateMu.Unlock()
}()
err = grpcServer.Serve(listener)
err = httpServer.Serve(listener)
switch {
case err == nil:
return nil
case errors.Is(err, grpc.ErrServerStopped):
s.logger.Info("authenticated gRPC server stopped")
case err == nil, errors.Is(err, http.ErrServerClosed):
s.logger.Info("authenticated edge server stopped")
return nil
default:
return fmt.Errorf("run authenticated gRPC server: serve on %q: %w", s.cfg.Addr, err)
return fmt.Errorf("run authenticated edge server: serve on %q: %w", s.cfg.Addr, err)
}
}
// Shutdown gracefully stops the authenticated gRPC server within ctx. When the
// graceful stop exceeds ctx, the server is force-stopped before returning the
// Shutdown gracefully stops the authenticated edge server within ctx. When the
// graceful stop exceeds ctx, the server is force-closed before returning the
// timeout to the caller.
func (s *Server) Shutdown(ctx context.Context) error {
if ctx == nil {
return errors.New("shutdown authenticated gRPC server: nil context")
return errors.New("shutdown authenticated edge server: nil context")
}
s.stateMu.RLock()
@@ -203,20 +220,16 @@ func (s *Server) Shutdown(ctx context.Context) error {
s.pushHub.Shutdown()
}
stopped := make(chan struct{})
go func() {
server.GracefulStop()
close(stopped)
}()
select {
case <-stopped:
err := server.Shutdown(ctx)
if err == nil {
return nil
case <-ctx.Done():
server.Stop()
<-stopped
return fmt.Errorf("shutdown authenticated gRPC server: %w", ctx.Err())
}
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
_ = server.Close()
return fmt.Errorf("shutdown authenticated edge server: %w", err)
}
return fmt.Errorf("shutdown authenticated edge server: %w", err)
}
func (s *Server) listenAddr() string {
+82 -88
View File
@@ -2,6 +2,10 @@ package grpcapi
import (
"context"
"crypto/tls"
"errors"
"net"
"net/http"
"testing"
"time"
@@ -9,13 +13,12 @@ import (
"galaxy/gateway/internal/config"
"galaxy/gateway/internal/session"
gatewayv1 "galaxy/gateway/proto/galaxy/gateway/v1"
"galaxy/gateway/proto/galaxy/gateway/v1/gatewayv1connect"
"connectrpc.com/connect"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"
"golang.org/x/net/http2"
)
func TestExecuteCommandRejectsMalformedEnvelope(t *testing.T) {
@@ -25,15 +28,11 @@ func TestExecuteCommandRejectsMalformedEnvelope(t *testing.T) {
defer runGateway.stop(t)
addr := waitForListenAddr(t, server)
conn := dialGatewayClient(t, addr)
defer func() {
require.NoError(t, conn.Close())
}()
client := newEdgeClient(t, addr)
client := gatewayv1.NewEdgeGatewayClient(conn)
_, err := client.ExecuteCommand(context.Background(), &gatewayv1.ExecuteCommandRequest{})
_, err := client.ExecuteCommand(context.Background(), connect.NewRequest(&gatewayv1.ExecuteCommandRequest{}))
require.Error(t, err)
assert.Equal(t, codes.InvalidArgument, status.Code(err))
assert.Equal(t, connect.CodeInvalidArgument, connect.CodeOf(err))
}
func TestSubscribeEventsRejectsMalformedEnvelope(t *testing.T) {
@@ -43,15 +42,11 @@ func TestSubscribeEventsRejectsMalformedEnvelope(t *testing.T) {
defer runGateway.stop(t)
addr := waitForListenAddr(t, server)
conn := dialGatewayClient(t, addr)
defer func() {
require.NoError(t, conn.Close())
}()
client := newEdgeClient(t, addr)
client := gatewayv1.NewEdgeGatewayClient(conn)
err := subscribeEventsError(t, context.Background(), client, &gatewayv1.SubscribeEventsRequest{})
require.Error(t, err)
assert.Equal(t, codes.InvalidArgument, status.Code(err))
assert.Equal(t, connect.CodeInvalidArgument, connect.CodeOf(err))
}
func TestExecuteCommandRejectsUnsupportedProtocolVersion(t *testing.T) {
@@ -61,13 +56,9 @@ func TestExecuteCommandRejectsUnsupportedProtocolVersion(t *testing.T) {
defer runGateway.stop(t)
addr := waitForListenAddr(t, server)
conn := dialGatewayClient(t, addr)
defer func() {
require.NoError(t, conn.Close())
}()
client := newEdgeClient(t, addr)
client := gatewayv1.NewEdgeGatewayClient(conn)
_, err := client.ExecuteCommand(context.Background(), &gatewayv1.ExecuteCommandRequest{
_, err := client.ExecuteCommand(context.Background(), connect.NewRequest(&gatewayv1.ExecuteCommandRequest{
ProtocolVersion: "v2",
DeviceSessionId: "device-session-123",
MessageType: "fleet.move",
@@ -76,10 +67,10 @@ func TestExecuteCommandRejectsUnsupportedProtocolVersion(t *testing.T) {
PayloadBytes: []byte("payload"),
PayloadHash: []byte("hash"),
Signature: []byte("signature"),
})
}))
require.Error(t, err)
assert.Equal(t, codes.FailedPrecondition, status.Code(err))
assert.Equal(t, `unsupported protocol_version "v2"`, status.Convert(err).Message())
assert.Equal(t, connect.CodeFailedPrecondition, connect.CodeOf(err))
assert.Equal(t, `unsupported protocol_version "v2"`, connectErrorMessage(t, err))
}
func TestExecuteCommandValidEnvelopeStillReturnsUnimplemented(t *testing.T) {
@@ -96,15 +87,11 @@ func TestExecuteCommandValidEnvelopeStillReturnsUnimplemented(t *testing.T) {
defer runGateway.stop(t)
addr := waitForListenAddr(t, server)
conn := dialGatewayClient(t, addr)
defer func() {
require.NoError(t, conn.Close())
}()
client := newEdgeClient(t, addr)
client := gatewayv1.NewEdgeGatewayClient(conn)
_, err := client.ExecuteCommand(context.Background(), newValidExecuteCommandRequest())
_, err := client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequest()))
require.Error(t, err)
assert.Equal(t, codes.Unimplemented, status.Code(err))
assert.Equal(t, connect.CodeUnimplemented, connect.CodeOf(err))
}
func TestExecuteCommandMissingReplayStoreFailsClosed(t *testing.T) {
@@ -120,16 +107,12 @@ func TestExecuteCommandMissingReplayStoreFailsClosed(t *testing.T) {
defer runGateway.stop(t)
addr := waitForListenAddr(t, server)
conn := dialGatewayClient(t, addr)
defer func() {
require.NoError(t, conn.Close())
}()
client := newEdgeClient(t, addr)
client := gatewayv1.NewEdgeGatewayClient(conn)
_, err := client.ExecuteCommand(context.Background(), newValidExecuteCommandRequest())
_, err := client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequest()))
require.Error(t, err)
assert.Equal(t, codes.Unavailable, status.Code(err))
assert.Equal(t, "replay store is unavailable", status.Convert(err).Message())
assert.Equal(t, connect.CodeUnavailable, connect.CodeOf(err))
assert.Equal(t, "replay store is unavailable", connectErrorMessage(t, err))
}
func TestSubscribeEventsValidEnvelopeSendsBootstrapEventAndWaitsForCancellation(t *testing.T) {
@@ -149,22 +132,22 @@ func TestSubscribeEventsValidEnvelopeSendsBootstrapEventAndWaitsForCancellation(
defer runGateway.stop(t)
addr := waitForListenAddr(t, server)
conn := dialGatewayClient(t, addr)
defer func() {
require.NoError(t, conn.Close())
}()
client := newEdgeClient(t, addr)
client := gatewayv1.NewEdgeGatewayClient(conn)
stream, err := client.SubscribeEvents(ctx, newValidSubscribeEventsRequest())
stream, err := client.SubscribeEvents(ctx, connect.NewRequest(newValidSubscribeEventsRequest()))
require.NoError(t, err)
t.Cleanup(func() { _ = stream.Close() })
event := recvBootstrapEvent(t, stream)
assertServerTimeBootstrapEvent(t, event, newTestResponseSignerPublicKey(), "request-123", "trace-123", testCurrentTime.UnixMilli())
recvResult := make(chan error, 1)
go func() {
_, recvErr := stream.Recv()
recvResult <- recvErr
if stream.Receive() {
recvResult <- errors.New("stream produced unexpected event")
return
}
recvResult <- stream.Err()
}()
require.Never(t, func() bool {
@@ -188,7 +171,7 @@ func TestSubscribeEventsValidEnvelopeSendsBootstrapEventAndWaitsForCancellation(
}
}, time.Second, 10*time.Millisecond, "stream did not stop after client cancellation")
require.Error(t, recvErr)
assert.Equal(t, codes.Canceled, status.Code(recvErr))
assert.Equal(t, connect.CodeCanceled, connect.CodeOf(recvErr))
}
func TestSubscribeEventsMissingReplayStoreFailsClosed(t *testing.T) {
@@ -204,16 +187,12 @@ func TestSubscribeEventsMissingReplayStoreFailsClosed(t *testing.T) {
defer runGateway.stop(t)
addr := waitForListenAddr(t, server)
conn := dialGatewayClient(t, addr)
defer func() {
require.NoError(t, conn.Close())
}()
client := newEdgeClient(t, addr)
client := gatewayv1.NewEdgeGatewayClient(conn)
err := subscribeEventsError(t, context.Background(), client, newValidSubscribeEventsRequest())
require.Error(t, err)
assert.Equal(t, codes.Unavailable, status.Code(err))
assert.Equal(t, "replay store is unavailable", status.Convert(err).Message())
assert.Equal(t, connect.CodeUnavailable, connect.CodeOf(err))
assert.Equal(t, "replay store is unavailable", connectErrorMessage(t, err))
}
func TestSubscribeEventsFailsClosedWhenResponseSignerUnavailable(t *testing.T) {
@@ -231,16 +210,12 @@ func TestSubscribeEventsFailsClosedWhenResponseSignerUnavailable(t *testing.T) {
defer runGateway.stop(t)
addr := waitForListenAddr(t, server)
conn := dialGatewayClient(t, addr)
defer func() {
require.NoError(t, conn.Close())
}()
client := newEdgeClient(t, addr)
client := gatewayv1.NewEdgeGatewayClient(conn)
err := subscribeEventsError(t, context.Background(), client, newValidSubscribeEventsRequest())
require.Error(t, err)
assert.Equal(t, codes.Unavailable, status.Code(err))
assert.Equal(t, "response signer is unavailable", status.Convert(err).Message())
assert.Equal(t, connect.CodeUnavailable, connect.CodeOf(err))
assert.Equal(t, "response signer is unavailable", connectErrorMessage(t, err))
}
func TestServerLifecycle(t *testing.T) {
@@ -248,21 +223,23 @@ func TestServerLifecycle(t *testing.T) {
server, runGateway := newTestGateway(t, ServerDependencies{})
addr := waitForListenAddr(t, server)
conn := dialGatewayClient(t, addr)
require.NoError(t, conn.Close())
// Probe the listener before shutdown so we know it accepted at
// least one TCP connection.
probe, err := net.DialTimeout("tcp", addr, time.Second)
require.NoError(t, err)
require.NoError(t, probe.Close())
runGateway.stop(t)
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
// After shutdown the listener must refuse new TCP connections.
dialCtx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
_, err := grpc.DialContext(
ctx,
addr,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock(),
)
require.Error(t, err)
dialer := &net.Dialer{}
closedConn, err := dialer.DialContext(dialCtx, "tcp", addr)
if err == nil {
_ = closedConn.Close()
t.Fatalf("expected dial to %s to fail after shutdown", addr)
}
}
type runningGateway struct {
@@ -341,19 +318,36 @@ func waitForListenAddr(t *testing.T, server *Server) string {
return addr
}
func dialGatewayClient(t *testing.T, addr string) *grpc.ClientConn {
// newEdgeClient returns a Connect client speaking HTTP/2 cleartext to the
// authenticated edge listener. AllowHTTP forces the client to issue plain
// HTTP/2 requests (h2c) instead of attempting TLS, which the gateway's
// in-process test bootstrap does not configure.
func newEdgeClient(t *testing.T, addr string) gatewayv1connect.EdgeGatewayClient {
t.Helper()
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
conn, err := grpc.DialContext(
ctx,
addr,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock(),
)
require.NoError(t, err)
return conn
httpClient := &http.Client{
Transport: &http2.Transport{
AllowHTTP: true,
DialTLSContext: func(ctx context.Context, network, target string, _ *tls.Config) (net.Conn, error) {
return (&net.Dialer{}).DialContext(ctx, network, target)
},
},
}
return gatewayv1connect.NewEdgeGatewayClient(httpClient, "http://"+addr)
}
// connectErrorMessage extracts the *connect.Error message from err. It
// fails the test if err is not a *connect.Error so the caller's expected
// message comparison doesn't accidentally match the wrapped Go error
// string instead of the protocol-level message.
func connectErrorMessage(t require.TestingT, err error) string {
if helper, ok := t.(interface{ Helper() }); ok {
helper.Helper()
}
var connectErr *connect.Error
if !errors.As(err, &connectErr) {
require.FailNowf(t, "expected *connect.Error", "got %T: %v", err, err)
}
return connectErr.Message()
}
@@ -3,17 +3,15 @@ package grpcapi
import (
"context"
"errors"
"io"
"testing"
"galaxy/gateway/internal/session"
gatewayv1 "galaxy/gateway/proto/galaxy/gateway/v1"
"connectrpc.com/connect"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
func TestExecuteCommandRejectsUnknownSession(t *testing.T) {
@@ -31,16 +29,11 @@ func TestExecuteCommandRejectsUnknownSession(t *testing.T) {
defer runGateway.stop(t)
addr := waitForListenAddr(t, server)
conn := dialGatewayClient(t, addr)
defer func() {
require.NoError(t, conn.Close())
}()
client := gatewayv1.NewEdgeGatewayClient(conn)
_, err := client.ExecuteCommand(context.Background(), newValidExecuteCommandRequest())
client := newEdgeClient(t, addr)
_, err := client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequest()))
require.Error(t, err)
assert.Equal(t, codes.Unauthenticated, status.Code(err))
assert.Equal(t, "unknown device session", status.Convert(err).Message())
assert.Equal(t, connect.CodeUnauthenticated, connect.CodeOf(err))
assert.Equal(t, "unknown device session", connectErrorMessage(t, err))
assert.Zero(t, delegate.executeCalls)
}
@@ -59,16 +52,11 @@ func TestSubscribeEventsRejectsUnknownSession(t *testing.T) {
defer runGateway.stop(t)
addr := waitForListenAddr(t, server)
conn := dialGatewayClient(t, addr)
defer func() {
require.NoError(t, conn.Close())
}()
client := gatewayv1.NewEdgeGatewayClient(conn)
client := newEdgeClient(t, addr)
err := subscribeEventsError(t, context.Background(), client, newValidSubscribeEventsRequest())
require.Error(t, err)
assert.Equal(t, codes.Unauthenticated, status.Code(err))
assert.Equal(t, "unknown device session", status.Convert(err).Message())
assert.Equal(t, connect.CodeUnauthenticated, connect.CodeOf(err))
assert.Equal(t, "unknown device session", connectErrorMessage(t, err))
assert.Zero(t, delegate.subscribeCalls)
}
@@ -83,16 +71,11 @@ func TestExecuteCommandRejectsRevokedSession(t *testing.T) {
defer runGateway.stop(t)
addr := waitForListenAddr(t, server)
conn := dialGatewayClient(t, addr)
defer func() {
require.NoError(t, conn.Close())
}()
client := gatewayv1.NewEdgeGatewayClient(conn)
_, err := client.ExecuteCommand(context.Background(), newValidExecuteCommandRequest())
client := newEdgeClient(t, addr)
_, err := client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequest()))
require.Error(t, err)
assert.Equal(t, codes.FailedPrecondition, status.Code(err))
assert.Equal(t, "device session is revoked", status.Convert(err).Message())
assert.Equal(t, connect.CodeFailedPrecondition, connect.CodeOf(err))
assert.Equal(t, "device session is revoked", connectErrorMessage(t, err))
assert.Zero(t, delegate.executeCalls)
}
@@ -107,16 +90,11 @@ func TestSubscribeEventsRejectsRevokedSession(t *testing.T) {
defer runGateway.stop(t)
addr := waitForListenAddr(t, server)
conn := dialGatewayClient(t, addr)
defer func() {
require.NoError(t, conn.Close())
}()
client := gatewayv1.NewEdgeGatewayClient(conn)
client := newEdgeClient(t, addr)
err := subscribeEventsError(t, context.Background(), client, newValidSubscribeEventsRequest())
require.Error(t, err)
assert.Equal(t, codes.FailedPrecondition, status.Code(err))
assert.Equal(t, "device session is revoked", status.Convert(err).Message())
assert.Equal(t, connect.CodeFailedPrecondition, connect.CodeOf(err))
assert.Equal(t, "device session is revoked", connectErrorMessage(t, err))
assert.Zero(t, delegate.subscribeCalls)
}
@@ -135,16 +113,11 @@ func TestExecuteCommandRejectsSessionCacheUnavailable(t *testing.T) {
defer runGateway.stop(t)
addr := waitForListenAddr(t, server)
conn := dialGatewayClient(t, addr)
defer func() {
require.NoError(t, conn.Close())
}()
client := gatewayv1.NewEdgeGatewayClient(conn)
_, err := client.ExecuteCommand(context.Background(), newValidExecuteCommandRequest())
client := newEdgeClient(t, addr)
_, err := client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequest()))
require.Error(t, err)
assert.Equal(t, codes.Unavailable, status.Code(err))
assert.Equal(t, "session cache is unavailable", status.Convert(err).Message())
assert.Equal(t, connect.CodeUnavailable, connect.CodeOf(err))
assert.Equal(t, "session cache is unavailable", connectErrorMessage(t, err))
assert.Zero(t, delegate.executeCalls)
}
@@ -163,16 +136,11 @@ func TestSubscribeEventsRejectsSessionCacheUnavailable(t *testing.T) {
defer runGateway.stop(t)
addr := waitForListenAddr(t, server)
conn := dialGatewayClient(t, addr)
defer func() {
require.NoError(t, conn.Close())
}()
client := gatewayv1.NewEdgeGatewayClient(conn)
client := newEdgeClient(t, addr)
err := subscribeEventsError(t, context.Background(), client, newValidSubscribeEventsRequest())
require.Error(t, err)
assert.Equal(t, codes.Unavailable, status.Code(err))
assert.Equal(t, "session cache is unavailable", status.Convert(err).Message())
assert.Equal(t, connect.CodeUnavailable, connect.CodeOf(err))
assert.Equal(t, "session cache is unavailable", connectErrorMessage(t, err))
assert.Zero(t, delegate.subscribeCalls)
}
@@ -196,15 +164,10 @@ func TestExecuteCommandAttachesResolvedSession(t *testing.T) {
defer runGateway.stop(t)
addr := waitForListenAddr(t, server)
conn := dialGatewayClient(t, addr)
defer func() {
require.NoError(t, conn.Close())
}()
client := gatewayv1.NewEdgeGatewayClient(conn)
response, err := client.ExecuteCommand(context.Background(), newValidExecuteCommandRequest())
client := newEdgeClient(t, addr)
response, err := client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequest()))
require.NoError(t, err)
assert.Equal(t, "request-123", response.GetRequestId())
assert.Equal(t, "request-123", response.Msg.GetRequestId())
}
func TestSubscribeEventsAttachesResolvedSession(t *testing.T) {
@@ -227,20 +190,15 @@ func TestSubscribeEventsAttachesResolvedSession(t *testing.T) {
defer runGateway.stop(t)
addr := waitForListenAddr(t, server)
conn := dialGatewayClient(t, addr)
defer func() {
require.NoError(t, conn.Close())
}()
client := gatewayv1.NewEdgeGatewayClient(conn)
stream, err := client.SubscribeEvents(context.Background(), newValidSubscribeEventsRequest())
client := newEdgeClient(t, addr)
stream, err := client.SubscribeEvents(context.Background(), connect.NewRequest(newValidSubscribeEventsRequest()))
require.NoError(t, err)
event := recvBootstrapEvent(t, stream)
assertServerTimeBootstrapEvent(t, event, newTestResponseSignerPublicKey(), "request-123", "trace-123", testCurrentTime.UnixMilli())
_, err = stream.Recv()
require.ErrorIs(t, err, io.EOF)
require.False(t, stream.Receive())
require.NoError(t, stream.Err())
}
func TestSubscribeEventsAttachesAuthenticatedStreamBinding(t *testing.T) {
@@ -269,20 +227,15 @@ func TestSubscribeEventsAttachesAuthenticatedStreamBinding(t *testing.T) {
defer runGateway.stop(t)
addr := waitForListenAddr(t, server)
conn := dialGatewayClient(t, addr)
defer func() {
require.NoError(t, conn.Close())
}()
client := gatewayv1.NewEdgeGatewayClient(conn)
stream, err := client.SubscribeEvents(context.Background(), newValidSubscribeEventsRequest())
client := newEdgeClient(t, addr)
stream, err := client.SubscribeEvents(context.Background(), connect.NewRequest(newValidSubscribeEventsRequest()))
require.NoError(t, err)
event := recvBootstrapEvent(t, stream)
assertServerTimeBootstrapEvent(t, event, newTestResponseSignerPublicKey(), "request-123", "trace-123", testCurrentTime.UnixMilli())
_, err = stream.Recv()
require.ErrorIs(t, err, io.EOF)
require.False(t, stream.Receive())
require.NoError(t, stream.Err())
}
type staticSessionCache struct {
@@ -5,12 +5,10 @@ import (
"testing"
"galaxy/gateway/internal/session"
gatewayv1 "galaxy/gateway/proto/galaxy/gateway/v1"
"connectrpc.com/connect"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
func TestExecuteCommandRejectsInvalidSignature(t *testing.T) {
@@ -24,19 +22,15 @@ func TestExecuteCommandRejectsInvalidSignature(t *testing.T) {
defer runGateway.stop(t)
addr := waitForListenAddr(t, server)
conn := dialGatewayClient(t, addr)
defer func() {
require.NoError(t, conn.Close())
}()
client := newEdgeClient(t, addr)
req := newValidExecuteCommandRequest()
req.Signature[0] ^= 0xff
client := gatewayv1.NewEdgeGatewayClient(conn)
_, err := client.ExecuteCommand(context.Background(), req)
_, err := client.ExecuteCommand(context.Background(), connect.NewRequest(req))
require.Error(t, err)
assert.Equal(t, codes.Unauthenticated, status.Code(err))
assert.Equal(t, "invalid request signature", status.Convert(err).Message())
assert.Equal(t, connect.CodeUnauthenticated, connect.CodeOf(err))
assert.Equal(t, "invalid request signature", connectErrorMessage(t, err))
assert.Zero(t, delegate.executeCalls)
}
@@ -57,16 +51,11 @@ func TestExecuteCommandRejectsWrongKey(t *testing.T) {
defer runGateway.stop(t)
addr := waitForListenAddr(t, server)
conn := dialGatewayClient(t, addr)
defer func() {
require.NoError(t, conn.Close())
}()
client := gatewayv1.NewEdgeGatewayClient(conn)
_, err := client.ExecuteCommand(context.Background(), newValidExecuteCommandRequest())
client := newEdgeClient(t, addr)
_, err := client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequest()))
require.Error(t, err)
assert.Equal(t, codes.Unauthenticated, status.Code(err))
assert.Equal(t, "invalid request signature", status.Convert(err).Message())
assert.Equal(t, connect.CodeUnauthenticated, connect.CodeOf(err))
assert.Equal(t, "invalid request signature", connectErrorMessage(t, err))
assert.Zero(t, delegate.executeCalls)
}
@@ -87,16 +76,11 @@ func TestExecuteCommandRejectsInvalidCachedPublicKey(t *testing.T) {
defer runGateway.stop(t)
addr := waitForListenAddr(t, server)
conn := dialGatewayClient(t, addr)
defer func() {
require.NoError(t, conn.Close())
}()
client := gatewayv1.NewEdgeGatewayClient(conn)
_, err := client.ExecuteCommand(context.Background(), newValidExecuteCommandRequest())
client := newEdgeClient(t, addr)
_, err := client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequest()))
require.Error(t, err)
assert.Equal(t, codes.Unavailable, status.Code(err))
assert.Equal(t, "session cache is unavailable", status.Convert(err).Message())
assert.Equal(t, connect.CodeUnavailable, connect.CodeOf(err))
assert.Equal(t, "session cache is unavailable", connectErrorMessage(t, err))
assert.Zero(t, delegate.executeCalls)
}
@@ -111,19 +95,15 @@ func TestSubscribeEventsRejectsInvalidSignature(t *testing.T) {
defer runGateway.stop(t)
addr := waitForListenAddr(t, server)
conn := dialGatewayClient(t, addr)
defer func() {
require.NoError(t, conn.Close())
}()
client := newEdgeClient(t, addr)
req := newValidSubscribeEventsRequest()
req.Signature[0] ^= 0xff
client := gatewayv1.NewEdgeGatewayClient(conn)
err := subscribeEventsError(t, context.Background(), client, req)
require.Error(t, err)
assert.Equal(t, codes.Unauthenticated, status.Code(err))
assert.Equal(t, "invalid request signature", status.Convert(err).Message())
assert.Equal(t, connect.CodeUnauthenticated, connect.CodeOf(err))
assert.Equal(t, "invalid request signature", connectErrorMessage(t, err))
assert.Zero(t, delegate.subscribeCalls)
}
@@ -144,16 +124,11 @@ func TestSubscribeEventsRejectsWrongKey(t *testing.T) {
defer runGateway.stop(t)
addr := waitForListenAddr(t, server)
conn := dialGatewayClient(t, addr)
defer func() {
require.NoError(t, conn.Close())
}()
client := gatewayv1.NewEdgeGatewayClient(conn)
client := newEdgeClient(t, addr)
err := subscribeEventsError(t, context.Background(), client, newValidSubscribeEventsRequest())
require.Error(t, err)
assert.Equal(t, codes.Unauthenticated, status.Code(err))
assert.Equal(t, "invalid request signature", status.Convert(err).Message())
assert.Equal(t, connect.CodeUnauthenticated, connect.CodeOf(err))
assert.Equal(t, "invalid request signature", connectErrorMessage(t, err))
assert.Zero(t, delegate.subscribeCalls)
}
@@ -174,15 +149,10 @@ func TestSubscribeEventsRejectsInvalidCachedPublicKey(t *testing.T) {
defer runGateway.stop(t)
addr := waitForListenAddr(t, server)
conn := dialGatewayClient(t, addr)
defer func() {
require.NoError(t, conn.Close())
}()
client := gatewayv1.NewEdgeGatewayClient(conn)
client := newEdgeClient(t, addr)
err := subscribeEventsError(t, context.Background(), client, newValidSubscribeEventsRequest())
require.Error(t, err)
assert.Equal(t, codes.Unavailable, status.Code(err))
assert.Equal(t, "session cache is unavailable", status.Convert(err).Message())
assert.Equal(t, connect.CodeUnavailable, connect.CodeOf(err))
assert.Equal(t, "session cache is unavailable", connectErrorMessage(t, err))
assert.Zero(t, delegate.subscribeCalls)
}
+19 -8
View File
@@ -7,19 +7,21 @@ import (
"crypto/x509"
"encoding/base64"
"encoding/pem"
"errors"
"time"
"galaxy/gateway/authn"
"galaxy/gateway/internal/downstream"
"galaxy/gateway/internal/session"
gatewayv1 "galaxy/gateway/proto/galaxy/gateway/v1"
"galaxy/gateway/proto/galaxy/gateway/v1/gatewayv1connect"
gatewayfbs "galaxy/schema/fbs/gateway"
"connectrpc.com/connect"
flatbuffers "github.com/google/flatbuffers/go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
)
var (
@@ -170,28 +172,37 @@ func (c fixedClock) Now() time.Time {
func recvBootstrapEvent(t interface {
require.TestingT
Helper()
}, stream grpc.ServerStreamingClient[gatewayv1.GatewayEvent]) *gatewayv1.GatewayEvent {
}, stream *connect.ServerStreamForClient[gatewayv1.GatewayEvent]) *gatewayv1.GatewayEvent {
t.Helper()
event, err := stream.Recv()
if !stream.Receive() {
err := stream.Err()
if err == nil {
err = errors.New("stream closed before bootstrap event")
}
require.NoError(t, err)
}
return event
return stream.Msg()
}
func subscribeEventsError(t interface {
require.TestingT
Helper()
}, ctx context.Context, client gatewayv1.EdgeGatewayClient, req *gatewayv1.SubscribeEventsRequest) error {
}, ctx context.Context, client gatewayv1connect.EdgeGatewayClient, req *gatewayv1.SubscribeEventsRequest) error {
t.Helper()
stream, err := client.SubscribeEvents(ctx, req)
stream, err := client.SubscribeEvents(ctx, connect.NewRequest(req))
if err != nil {
return err
}
defer func() { _ = stream.Close() }()
_, err = stream.Recv()
return err
if stream.Receive() {
return nil
}
return stream.Err()
}
func assertServerTimeBootstrapEvent(t interface {
@@ -0,0 +1,138 @@
// Code generated by protoc-gen-connect-go. DO NOT EDIT.
//
// Source: galaxy/gateway/v1/edge_gateway.proto
package gatewayv1connect
import (
connect "connectrpc.com/connect"
context "context"
errors "errors"
v1 "galaxy/gateway/proto/galaxy/gateway/v1"
http "net/http"
strings "strings"
)
// This is a compile-time assertion to ensure that this generated file and the connect package are
// compatible. If you get a compiler error that this constant is not defined, this code was
// generated with a version of connect newer than the one compiled into your binary. You can fix the
// problem by either regenerating this code with an older version of connect or updating the connect
// version compiled into your binary.
const _ = connect.IsAtLeastVersion1_13_0
const (
// EdgeGatewayName is the fully-qualified name of the EdgeGateway service.
EdgeGatewayName = "galaxy.gateway.v1.EdgeGateway"
)
// These constants are the fully-qualified names of the RPCs defined in this package. They're
// exposed at runtime as Spec.Procedure and as the final two segments of the HTTP route.
//
// Note that these are different from the fully-qualified method names used by
// google.golang.org/protobuf/reflect/protoreflect. To convert from these constants to
// reflection-formatted method names, remove the leading slash and convert the remaining slash to a
// period.
const (
// EdgeGatewayExecuteCommandProcedure is the fully-qualified name of the EdgeGateway's
// ExecuteCommand RPC.
EdgeGatewayExecuteCommandProcedure = "/galaxy.gateway.v1.EdgeGateway/ExecuteCommand"
// EdgeGatewaySubscribeEventsProcedure is the fully-qualified name of the EdgeGateway's
// SubscribeEvents RPC.
EdgeGatewaySubscribeEventsProcedure = "/galaxy.gateway.v1.EdgeGateway/SubscribeEvents"
)
// EdgeGatewayClient is a client for the galaxy.gateway.v1.EdgeGateway service.
type EdgeGatewayClient interface {
ExecuteCommand(context.Context, *connect.Request[v1.ExecuteCommandRequest]) (*connect.Response[v1.ExecuteCommandResponse], error)
SubscribeEvents(context.Context, *connect.Request[v1.SubscribeEventsRequest]) (*connect.ServerStreamForClient[v1.GatewayEvent], error)
}
// NewEdgeGatewayClient constructs a client for the galaxy.gateway.v1.EdgeGateway service. By
// default, it uses the Connect protocol with the binary Protobuf Codec, asks for gzipped responses,
// and sends uncompressed requests. To use the gRPC or gRPC-Web protocols, supply the
// connect.WithGRPC() or connect.WithGRPCWeb() options.
//
// The URL supplied here should be the base URL for the Connect or gRPC server (for example,
// http://api.acme.com or https://acme.com/grpc).
func NewEdgeGatewayClient(httpClient connect.HTTPClient, baseURL string, opts ...connect.ClientOption) EdgeGatewayClient {
baseURL = strings.TrimRight(baseURL, "/")
edgeGatewayMethods := v1.File_galaxy_gateway_v1_edge_gateway_proto.Services().ByName("EdgeGateway").Methods()
return &edgeGatewayClient{
executeCommand: connect.NewClient[v1.ExecuteCommandRequest, v1.ExecuteCommandResponse](
httpClient,
baseURL+EdgeGatewayExecuteCommandProcedure,
connect.WithSchema(edgeGatewayMethods.ByName("ExecuteCommand")),
connect.WithClientOptions(opts...),
),
subscribeEvents: connect.NewClient[v1.SubscribeEventsRequest, v1.GatewayEvent](
httpClient,
baseURL+EdgeGatewaySubscribeEventsProcedure,
connect.WithSchema(edgeGatewayMethods.ByName("SubscribeEvents")),
connect.WithClientOptions(opts...),
),
}
}
// edgeGatewayClient implements EdgeGatewayClient.
type edgeGatewayClient struct {
executeCommand *connect.Client[v1.ExecuteCommandRequest, v1.ExecuteCommandResponse]
subscribeEvents *connect.Client[v1.SubscribeEventsRequest, v1.GatewayEvent]
}
// ExecuteCommand calls galaxy.gateway.v1.EdgeGateway.ExecuteCommand.
func (c *edgeGatewayClient) ExecuteCommand(ctx context.Context, req *connect.Request[v1.ExecuteCommandRequest]) (*connect.Response[v1.ExecuteCommandResponse], error) {
return c.executeCommand.CallUnary(ctx, req)
}
// SubscribeEvents calls galaxy.gateway.v1.EdgeGateway.SubscribeEvents.
func (c *edgeGatewayClient) SubscribeEvents(ctx context.Context, req *connect.Request[v1.SubscribeEventsRequest]) (*connect.ServerStreamForClient[v1.GatewayEvent], error) {
return c.subscribeEvents.CallServerStream(ctx, req)
}
// EdgeGatewayHandler is an implementation of the galaxy.gateway.v1.EdgeGateway service.
type EdgeGatewayHandler interface {
ExecuteCommand(context.Context, *connect.Request[v1.ExecuteCommandRequest]) (*connect.Response[v1.ExecuteCommandResponse], error)
SubscribeEvents(context.Context, *connect.Request[v1.SubscribeEventsRequest], *connect.ServerStream[v1.GatewayEvent]) error
}
// NewEdgeGatewayHandler builds an HTTP handler from the service implementation. It returns the path
// on which to mount the handler and the handler itself.
//
// By default, handlers support the Connect, gRPC, and gRPC-Web protocols with the binary Protobuf
// and JSON codecs. They also support gzip compression.
func NewEdgeGatewayHandler(svc EdgeGatewayHandler, opts ...connect.HandlerOption) (string, http.Handler) {
edgeGatewayMethods := v1.File_galaxy_gateway_v1_edge_gateway_proto.Services().ByName("EdgeGateway").Methods()
edgeGatewayExecuteCommandHandler := connect.NewUnaryHandler(
EdgeGatewayExecuteCommandProcedure,
svc.ExecuteCommand,
connect.WithSchema(edgeGatewayMethods.ByName("ExecuteCommand")),
connect.WithHandlerOptions(opts...),
)
edgeGatewaySubscribeEventsHandler := connect.NewServerStreamHandler(
EdgeGatewaySubscribeEventsProcedure,
svc.SubscribeEvents,
connect.WithSchema(edgeGatewayMethods.ByName("SubscribeEvents")),
connect.WithHandlerOptions(opts...),
)
return "/galaxy.gateway.v1.EdgeGateway/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case EdgeGatewayExecuteCommandProcedure:
edgeGatewayExecuteCommandHandler.ServeHTTP(w, r)
case EdgeGatewaySubscribeEventsProcedure:
edgeGatewaySubscribeEventsHandler.ServeHTTP(w, r)
default:
http.NotFound(w, r)
}
})
}
// UnimplementedEdgeGatewayHandler returns CodeUnimplemented from all methods.
type UnimplementedEdgeGatewayHandler struct{}
func (UnimplementedEdgeGatewayHandler) ExecuteCommand(context.Context, *connect.Request[v1.ExecuteCommandRequest]) (*connect.Response[v1.ExecuteCommandResponse], error) {
return nil, connect.NewError(connect.CodeUnimplemented, errors.New("galaxy.gateway.v1.EdgeGateway.ExecuteCommand is not implemented"))
}
func (UnimplementedEdgeGatewayHandler) SubscribeEvents(context.Context, *connect.Request[v1.SubscribeEventsRequest], *connect.ServerStream[v1.GatewayEvent]) error {
return connect.NewError(connect.CodeUnimplemented, errors.New("galaxy.gateway.v1.EdgeGateway.SubscribeEvents is not implemented"))
}
+1
View File
@@ -15,6 +15,7 @@ require (
require (
buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.11-20260209202127-80ab13bee0bf.1 // indirect
connectrpc.com/connect v1.19.2 // indirect
dario.cat/mergo v1.0.2 // indirect
galaxy/util v0.0.0-00010101000000-000000000000 // indirect
github.com/Azure/go-ansiterm v0.0.0-20250102033503-faa5f7b0171c // indirect
+2
View File
@@ -1,5 +1,7 @@
buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.11-20260209202127-80ab13bee0bf.1 h1:PMmTMyvHScV9Mn8wc6ASge9uRcHy0jtqPd+fM35LmsQ=
buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.11-20260209202127-80ab13bee0bf.1/go.mod h1:tvtbpgaVXZX4g6Pn+AnzFycuRK3MOz5HJfEGeEllXYM=
connectrpc.com/connect v1.19.2 h1:McQ83FGdzL+t60peksi0gXC7MQ/iLKgLduAnThbM0mo=
connectrpc.com/connect v1.19.2/go.mod h1:tN20fjdGlewnSFeZxLKb0xwIZ6ozc3OQs2hTXy4du9w=
dario.cat/mergo v1.0.2 h1:85+piFYR1tMbRrLcDwR18y4UKJ3aH1Tbzi24VRW1TK8=
dario.cat/mergo v1.0.2/go.mod h1:E/hbnu0NxMFBjpMIE34DRGLWqDy0g5FuKDhCb31ngxA=
github.com/AdaLogics/go-fuzz-headers v0.0.0-20240806141605-e8a1dd7889d6 h1:He8afgbRMd7mFxO99hRNu+6tazq8nFF9lIwo9JFroBk=
@@ -5,30 +5,34 @@ import (
"crypto/ed25519"
"crypto/rand"
"crypto/sha256"
"crypto/tls"
"encoding/base64"
"errors"
"fmt"
"net"
"net/http"
"sync/atomic"
"time"
gatewayauthn "galaxy/gateway/authn"
gatewayv1 "galaxy/gateway/proto/galaxy/gateway/v1"
"galaxy/gateway/proto/galaxy/gateway/v1/gatewayv1connect"
"connectrpc.com/connect"
"github.com/google/uuid"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"
"golang.org/x/net/http2"
)
// SignedGatewayClient drives the authenticated gRPC surface of the
// SignedGatewayClient drives the authenticated edge surface of the
// gateway from tests. It signs ExecuteCommand envelopes with the
// session's Ed25519 private key, verifies response signatures with
// the gateway's response-signer public key, and exposes a
// SubscribeEvents helper.
// SubscribeEvents helper. The client speaks Connect over HTTP/2
// cleartext (h2c) — the gateway listener supports that natively
// alongside gRPC and gRPC-Web on the same port.
type SignedGatewayClient struct {
conn *grpc.ClientConn
edge gatewayv1.EdgeGatewayClient
httpClient *http.Client
edge gatewayv1connect.EdgeGatewayClient
deviceSID string
privateKey ed25519.PrivateKey
respPub ed25519.PublicKey
@@ -55,25 +59,42 @@ func EncodePublicKey(pub ed25519.PublicKey) string {
return base64.StdEncoding.EncodeToString(pub)
}
// DialGateway opens a gRPC connection to gateway's authenticated
// surface and prepares a signing client bound to deviceSID.
func DialGateway(ctx context.Context, addr string, deviceSID string, privateKey ed25519.PrivateKey, respPub ed25519.PublicKey) (*SignedGatewayClient, error) {
conn, err := grpc.NewClient(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return nil, fmt.Errorf("dial gateway: %w", err)
// DialGateway opens a Connect (HTTP/2 cleartext) client against the
// gateway's authenticated edge listener at addr ("host:port") and
// prepares a signing client bound to deviceSID.
func DialGateway(_ context.Context, addr string, deviceSID string, privateKey ed25519.PrivateKey, respPub ed25519.PublicKey) (*SignedGatewayClient, error) {
if addr == "" {
return nil, fmt.Errorf("dial gateway: empty addr")
}
httpClient := &http.Client{
Transport: &http2.Transport{
AllowHTTP: true,
DialTLSContext: func(ctx context.Context, network, target string, _ *tls.Config) (net.Conn, error) {
return (&net.Dialer{}).DialContext(ctx, network, target)
},
},
}
edge := gatewayv1connect.NewEdgeGatewayClient(httpClient, "http://"+addr)
return &SignedGatewayClient{
conn: conn,
edge: gatewayv1.NewEdgeGatewayClient(conn),
httpClient: httpClient,
edge: edge,
deviceSID: deviceSID,
privateKey: privateKey,
respPub: respPub,
}, nil
}
// Close releases the gRPC connection.
// Close releases idle HTTP/2 connections held by the underlying transport.
// The Connect client itself is stateless, so this is best-effort.
func (c *SignedGatewayClient) Close() error {
return c.conn.Close()
if c.httpClient != nil {
if transport, ok := c.httpClient.Transport.(*http2.Transport); ok {
transport.CloseIdleConnections()
}
}
return nil
}
// ExecuteOptions tunes one ExecuteCommand call. The zero value
@@ -155,10 +176,11 @@ func (c *SignedGatewayClient) Execute(ctx context.Context, messageType string, p
}
atomic.AddUint64(&c.requestSeq, 1)
resp, err := c.edge.ExecuteCommand(ctx, req)
respWrap, err := c.edge.ExecuteCommand(ctx, connect.NewRequest(req))
if err != nil {
return nil, err
}
resp := respWrap.Msg
respHash := sha256.Sum256(resp.GetPayloadBytes())
if string(respHash[:]) != string(resp.GetPayloadHash()) {
@@ -202,7 +224,7 @@ func (c *SignedGatewayClient) SubscribeEvents(ctx context.Context, messageType s
PayloadHash: emptyHash[:],
}))
stream, err := c.edge.SubscribeEvents(ctx, &gatewayv1.SubscribeEventsRequest{
stream, err := c.edge.SubscribeEvents(ctx, connect.NewRequest(&gatewayv1.SubscribeEventsRequest{
ProtocolVersion: protocolVersion,
DeviceSessionId: c.deviceSID,
MessageType: messageType,
@@ -210,7 +232,7 @@ func (c *SignedGatewayClient) SubscribeEvents(ctx context.Context, messageType s
RequestId: requestID,
PayloadHash: emptyHash[:],
Signature: signature,
})
}))
if err != nil {
return nil, nil, fmt.Errorf("open subscribe events: %w", err)
}
@@ -219,41 +241,39 @@ func (c *SignedGatewayClient) SubscribeEvents(ctx context.Context, messageType s
errs := make(chan error, 1)
go func() {
defer close(events)
for {
ev, err := stream.Recv()
if err != nil {
errs <- err
return
}
events <- ev
defer func() { _ = stream.Close() }()
for stream.Receive() {
events <- stream.Msg()
}
errs <- stream.Err()
}()
return events, errs, nil
}
// IsUnauthenticated reports whether err is a gRPC Unauthenticated
// status, useful for negative-path edge tests.
// IsUnauthenticated reports whether err carries Connect's
// CodeUnauthenticated, useful for negative-path edge tests.
func IsUnauthenticated(err error) bool {
return status.Code(err) == codes.Unauthenticated
return connect.CodeOf(err) == connect.CodeUnauthenticated
}
// IsInvalidArgument reports whether err is a gRPC InvalidArgument
// status (used for malformed envelopes and unsupported
// IsInvalidArgument reports whether err carries Connect's
// CodeInvalidArgument (used for malformed envelopes and unsupported
// protocol_version).
func IsInvalidArgument(err error) bool {
return status.Code(err) == codes.InvalidArgument
return connect.CodeOf(err) == connect.CodeInvalidArgument
}
// IsResourceExhausted reports whether err is a gRPC
// ResourceExhausted status (used for replay rejection).
// IsResourceExhausted reports whether err carries Connect's
// CodeResourceExhausted (used for replay rejection or rate-limit
// rejections).
func IsResourceExhausted(err error) bool {
return status.Code(err) == codes.ResourceExhausted
return connect.CodeOf(err) == connect.CodeResourceExhausted
}
// IsFailedPrecondition reports whether err is a gRPC
// FailedPrecondition status. The gateway uses this code for replay
// IsFailedPrecondition reports whether err carries Connect's
// CodeFailedPrecondition. The gateway uses this code for replay
// rejections (the canonical envelope was authentic but the
// `request_id` was already consumed).
func IsFailedPrecondition(err error) bool {
return status.Code(err) == codes.FailedPrecondition
return connect.CodeOf(err) == connect.CodeFailedPrecondition
}
+73 -31
View File
@@ -423,46 +423,88 @@ Targeted tests:
- `gateway/authn` cross-module parity tests as listed under
Artifacts.
## Phase 4. ConnectRPC Support in Gateway
## ~~Phase 4. ConnectRPC Support in Gateway~~
Status: pending. Cross-service phase — work happens in `gateway/`,
not `ui/`.
Status: done. Cross-service phase — work happened in `gateway/` and
`integration/`, not `ui/`.
Goal: enable browsers to call the gateway's authenticated gRPC surface
through ConnectRPC, while preserving the existing native gRPC ingress
for desktop and mobile clients.
Goal: enable browsers to call the gateway's authenticated edge surface
through ConnectRPC, without keeping a separate gRPC server bootstrap
alive purely for test clients.
Artifacts:
Decision (taken with the project owner before implementation): the
existing native-gRPC `grpc.NewServer` bootstrap was replaced with a
single `connectrpc.com/connect` HTTP/h2c listener, since Connect-Go
natively serves the Connect, gRPC, and gRPC-Web protocols on the same
port. No production gRPC clients existed to preserve. The package
`gateway/internal/grpcapi` keeps its name for diff-size reasons and
documents the historical labelling in its package doc.
- ConnectRPC handler registered alongside existing gRPC server in
`gateway/internal/...` using `connectrpc.com/connect`
- `gateway/buf.gen.yaml` extended to generate Connect-Go code from
existing `.proto` files
- updated `gateway/README.md` and `gateway/openapi.yaml` reflecting
Connect ingress endpoints
- updated `docs/ARCHITECTURE.md` §15 if the deployment topology changes
- `gateway/internal/.../connect_server_test.go` integration test
exercising a unary Connect call and a server-streaming Connect call
Artifacts (delivered):
Dependencies: Phase 3 (canonical bytes are needed for the integration
fixtures used here).
- `gateway/buf.gen.yaml` extended with `buf.build/connectrpc/go`,
generating `gateway/proto/galaxy/gateway/v1/gatewayv1connect/edge_gateway.connect.go`
- `gateway/internal/grpcapi/server.go` rewritten around `http.Server`
+ `h2c.NewHandler` + `gatewayv1connect.NewEdgeGatewayHandler`
- new `gateway/internal/grpcapi/connect_handler.go` adapting the
existing `gatewayv1.EdgeGatewayServer` decorator stack to the
Connect handler interface, including a `grpc.ServerStreamingServer`
shim around `*connect.ServerStream[GatewayEvent]` and a gRPC
`status.Error` → `*connect.Error` translation helper
- new `gateway/internal/grpcapi/connect_observability.go` Connect
interceptor recording the same metric and structured-log shape the
gRPC interceptors emitted; the rate-limit decorator now reads peer
IP from a context value populated by the interceptor instead of
`peer.FromContext`
- updated `gateway/README.md` (Transport Matrix + "Authenticated Edge
Surface"), `gateway/docs/runtime.md`, `gateway/docs/flows.md`,
`gateway/docs/runbook.md`, and `docs/ARCHITECTURE.md` §15
- migrated tests: `gateway/internal/grpcapi/server_test.go`,
`test_fixtures_test.go`, and every `*_integration_test.go` in that
package now drive a `gatewayv1connect.EdgeGatewayClient` over
HTTP/2 cleartext loopback
- migrated harness: `integration/testenv/grpc_client.go` →
`connect_client.go`. `SignedGatewayClient` keeps the same public
shape (`Execute`, `SubscribeEvents`, `Close`) but speaks Connect
internally; `Is*` helpers now use `connect.CodeOf`
Acceptance criteria:
Dependencies: Phase 3 (canonical bytes are needed for the
fixture-level signing the migrated tests use).
- a curl-based unary Connect call from outside the gateway process
succeeds end-to-end against the authenticated surface;
- server-streaming `SubscribeEvents` works over Connect with at least
one delivered event;
- existing native gRPC clients continue to work unchanged;
- both gRPC and Connect handlers share the same upstream business code
(no duplication beyond the protocol layer).
Acceptance criteria (met):
Targeted tests:
- unary Connect calls from outside the gateway process succeed
end-to-end against the authenticated surface — verified by the
migrated `grpcapi/server_test.go` and `command_routing_integration_test.go`
scenarios driving the Connect client over loopback h2c;
- server-streaming `SubscribeEvents` works over Connect with the
signed `gateway.server_time` bootstrap event delivered first —
verified by `TestSubscribeEventsValidEnvelopeSendsBootstrapEventAndWaitsForCancellation`;
- the unified listener still natively accepts gRPC and gRPC-Web
framing for any future native client (Connect-Go's documented
multi-protocol support);
- the Connect handler shares the same upstream business code as the
unified listener — there is exactly one decorator stack
(`grpcapi.NewServer` → `s.service`).
- Connect unary integration test against a running gateway+backend;
- Connect streaming integration test asserting at least one push event
delivery;
- existing gateway test suite stays green.
Targeted tests (delivered):
- Connect unary integration tests in `gateway/internal/grpcapi/`
exercising the full envelope → signature → freshness/replay →
rate-limit → routing pipeline through the new Connect transport;
- Connect streaming integration tests asserting bootstrap-event
delivery, replay rejection on stream open, and shutdown closure;
- the existing gateway test suite (`go test ./gateway/...`) stays
green.
Decision deviation note: the planned standalone
`gateway/internal/grpcapi/connect_server_test.go` was not added as a
separate file because the migrated `*_test.go` files in the same
package already cover unary happy + streaming bootstrap + protocol-
version reject through the Connect client. A duplicate file would not
add coverage. Future contributors looking for "the Connect tests" can
read any file in `gateway/internal/grpcapi/` — they all use the
Connect client now.
## Phase 5. WASM Build, `WasmCore` Adapter, `GalaxyClient` Skeleton