From 118f7c17a217a8caf69ac473c74fa7a963e33d89 Mon Sep 17 00:00:00 2001 From: Ilia Denisov Date: Thu, 7 May 2026 11:49:28 +0200 Subject: [PATCH] phase 4: connectrpc on the gateway authenticated edge MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace the native-gRPC server bootstrap with a single `connectrpc.com/connect` HTTP/h2c listener. Connect-Go natively serves Connect, gRPC, and gRPC-Web on the same port, so browsers can now reach the authenticated surface without giving up the gRPC framing native and desktop clients may use later. The decorator stack (envelope → session → payload-hash → signature → freshness/replay → rate-limit → routing/push) is reused unchanged behind a small Connect → gRPC adapter and a `grpc.ServerStream` shim around `*connect.ServerStream`. Co-Authored-By: Claude Opus 4.7 --- docs/ARCHITECTURE.md | 9 + docs/FUNCTIONAL.md | 18 +- docs/FUNCTIONAL_ru.md | 22 +-- gateway/README.md | 49 +++-- gateway/buf.gen.yaml | 4 + gateway/docs/flows.md | 2 +- gateway/docs/runbook.md | 4 +- gateway/docs/runtime.md | 10 +- gateway/go.mod | 5 +- gateway/go.sum | 6 + .../command_routing_integration_test.go | 118 ++++-------- gateway/internal/grpcapi/connect_handler.go | 143 +++++++++++++++ .../internal/grpcapi/connect_observability.go | 110 ++++++++++++ gateway/internal/grpcapi/envelope.go | 3 +- .../freshness_replay_integration_test.go | 156 +++++----------- gateway/internal/grpcapi/observability.go | 72 ++------ .../grpcapi/payload_hash_integration_test.go | 48 ++--- gateway/internal/grpcapi/rate_limit.go | 36 ++-- .../grpcapi/rate_limit_integration_test.go | 112 ++++-------- gateway/internal/grpcapi/server.go | 109 ++++++----- gateway/internal/grpcapi/server_test.go | 170 +++++++++--------- gateway/internal/grpcapi/session_lookup.go | 2 +- .../session_lookup_integration_test.go | 115 ++++-------- .../grpcapi/signature_integration_test.go | 74 +++----- .../internal/grpcapi/test_fixtures_test.go | 29 ++- .../gatewayv1connect/edge_gateway.connect.go | 138 ++++++++++++++ integration/go.mod | 1 + integration/go.sum | 2 + .../{grpc_client.go => connect_client.go} | 110 +++++++----- ui/PLAN.md | 104 +++++++---- 30 files changed, 1009 insertions(+), 772 deletions(-) create mode 100644 gateway/internal/grpcapi/connect_handler.go create mode 100644 gateway/internal/grpcapi/connect_observability.go create mode 100644 gateway/proto/galaxy/gateway/v1/gatewayv1connect/edge_gateway.connect.go rename integration/testenv/{grpc_client.go => connect_client.go} (68%) diff --git a/docs/ARCHITECTURE.md b/docs/ARCHITECTURE.md index 3c1bd1b..c7448be 100644 --- a/docs/ARCHITECTURE.md +++ b/docs/ARCHITECTURE.md @@ -531,6 +531,15 @@ This section describes the secure exchange model between client and gateway. It applies at the public boundary and does not rely on backend behaviour for any of its guarantees. +The authenticated edge listener is built on `connectrpc.com/connect` and +natively serves the Connect, gRPC, and gRPC-Web protocols on a single +HTTP/2 cleartext (`h2c`) port. Browser clients use Connect via +`@connectrpc/connect-web`; native iOS / Android / desktop clients can +use either Connect or raw gRPC framing against the same listener. +Envelope, signature, freshness, and anti-replay rules below are +protocol-agnostic — they apply identically to every supported wire +framing. + ### Principles - No browser cookies. diff --git a/docs/FUNCTIONAL.md b/docs/FUNCTIONAL.md index 7a1522b..caa336d 100644 --- a/docs/FUNCTIONAL.md +++ b/docs/FUNCTIONAL.md @@ -139,9 +139,10 @@ consumed exactly once. ### 1.4 Per-request session lookup Once the client holds a device session id and a private key, every -authenticated call is a signed gRPC request to gateway. Gateway is the -only component that ever sees the request signature; backend trusts -gateway's verdict. +authenticated call is a signed request to gateway over the +authenticated edge listener (Connect / gRPC / gRPC-Web on a single +HTTP/h2c port). Gateway is the only component that ever sees the +request signature; backend trusts gateway's verdict. Gateway needs the session's public key to verify the signature, so each authenticated request resolves the device session through an in-memory @@ -602,8 +603,8 @@ not duplicated here. ### 6.2 Backend's role: pass-through with authorisation -The signed-gRPC pipeline for in-game traffic uses three message types -on the authenticated surface — `user.games.command`, +The signed authenticated-edge pipeline for in-game traffic uses three +message types on the authenticated surface — `user.games.command`, `user.games.order`, `user.games.report` — each with a typed FlatBuffers payload. Gateway transcodes the FB request into the JSON shape backend expects, forwards over plain REST to the corresponding @@ -680,9 +681,10 @@ session invalidations). ### 7.1 Scope -In scope: the gRPC stream a client opens against gateway, the -bootstrap event, the framing of forwarded events, and the -backend → gateway control channel that produces those events. +In scope: the server-streaming subscription a client opens against +gateway (Connect / gRPC / gRPC-Web framing all map to the same +endpoint), the bootstrap event, the framing of forwarded events, and +the backend → gateway control channel that produces those events. Out of scope: the catalog of event kinds — see [Section 8](#8-notifications-and-mail) for the notification side and [`backend/README.md` §10](../backend/README.md#10-notification-catalog) for the closed list. diff --git a/docs/FUNCTIONAL_ru.md b/docs/FUNCTIONAL_ru.md index 691c100..96bc532 100644 --- a/docs/FUNCTIONAL_ru.md +++ b/docs/FUNCTIONAL_ru.md @@ -138,9 +138,10 @@ Throttle-переиспользование на стороне send означ ### 1.4 Поиск сессии для каждого запроса Когда у клиента есть идентификатор устройства-сессии и приватный ключ, -каждый аутентифицированный вызов — это подписанный gRPC-запрос к -gateway. Gateway — единственный компонент, который видит подпись -запроса; backend доверяет вердикту gateway. +каждый аутентифицированный вызов — это подписанный запрос к gateway +по аутентифицированному edge-листенеру (Connect / gRPC / gRPC-Web на +одном HTTP/h2c-порту). Gateway — единственный компонент, который видит +подпись запроса; backend доверяет вердикту gateway. Gateway нужен публичный ключ сессии для проверки подписи, поэтому каждый аутентифицированный запрос разрешает устройство-сессию через @@ -618,10 +619,10 @@ Wire-формат команд, приказов и отчётов — собс ### 6.2 Роль backend: pass-through с авторизацией -Signed-gRPC-конвейер для in-game-трафика использует три message -types на аутентифицированной поверхности — `user.games.command`, -`user.games.order`, `user.games.report` — у каждого типизированный -FlatBuffers-payload. Gateway транскодирует FB-запрос в JSON-форму, +Подписанный конвейер аутентифицированного edge для in-game-трафика +использует три message types на аутентифицированной поверхности — +`user.games.command`, `user.games.order`, `user.games.report` — +у каждого типизированный FlatBuffers-payload. Gateway транскодирует FB-запрос в JSON-форму, которую ждёт backend, форвардит её REST'ом в соответствующий `/api/v1/user/games/{game_id}/*` endpoint, после чего транскодирует JSON-ответ обратно в FB перед подписью. @@ -697,9 +698,10 @@ notification-каталог явно их опускает ### 7.1 Состав -В составе: gRPC-стрим, который клиент открывает к gateway, -bootstrap-событие, фрейминг форварднутых событий, control-канал -backend → gateway, который производит эти события. +В составе: server-streaming-подписка, которую клиент открывает к +gateway (Connect / gRPC / gRPC-Web фреймы все маршрутизируются на +одну точку), bootstrap-событие, фрейминг форварднутых событий, +control-канал backend → gateway, который производит эти события. Вне состава: каталог видов событий — см. [Раздел 8](#8-уведомления-и-почта) для notification-стороны и diff --git a/gateway/README.md b/gateway/README.md index b34eab5..5e974d9 100644 --- a/gateway/README.md +++ b/gateway/README.md @@ -87,7 +87,15 @@ The gateway exposes two external transport classes. | Transport | Audience | Authentication | Payload format | Primary use | | --- | --- | --- | --- | --- | | REST/JSON | Public, unauthenticated traffic | No device session auth | JSON | Health checks, public auth commands, and browser/bootstrap traffic | -| gRPC over HTTP/2 | Authenticated clients only | Required | FlatBuffers payload inside protobuf control envelope | Verified commands and push delivery | +| Connect / gRPC / gRPC-Web over HTTP/2 (h2c) | Authenticated clients only | Required | FlatBuffers payload inside protobuf control envelope | Verified commands and push delivery | + +The authenticated edge listener is built on +[`connectrpc.com/connect`](https://connectrpc.com/) and natively serves +the Connect, gRPC, and gRPC-Web protocols on a single HTTP/2 cleartext +(`h2c`) port. Browser clients use `@connectrpc/connect-web`; native +clients can use either Connect or raw gRPC framing against the same +listener. Production TLS termination happens upstream of the gateway, +matching the previous gRPC-only deployment posture. ### Public REST Surface @@ -181,16 +189,21 @@ The endpoint exposes metrics in the Prometheus text exposition format described in the official Prometheus documentation: . -### Authenticated gRPC Surface +### Authenticated Edge Surface -All authenticated client requests use HTTP/2 and gRPC. -The listener address is configured by `GATEWAY_AUTHENTICATED_GRPC_ADDR`. -Inbound authenticated gRPC connection setup is bounded by +All authenticated client requests use HTTP/2 cleartext (`h2c`) and are +served through `connectrpc.com/connect`, which natively accepts the +Connect, gRPC, and gRPC-Web protocols on the same listener. +The listener address is configured by `GATEWAY_AUTHENTICATED_GRPC_ADDR` +(the env-var name retains the historical `GRPC` infix for operational +stability — it labels the authenticated edge tier, not the wire +protocol). +Inbound authenticated edge connection setup is bounded by `GATEWAY_AUTHENTICATED_GRPC_CONNECTION_TIMEOUT`, which defaults to `5s`. The accepted client timestamp skew is configured by `GATEWAY_AUTHENTICATED_GRPC_FRESHNESS_WINDOW` and defaults to `5m`. -The public gRPC service exposes two methods: +The public service exposes two methods: - `ExecuteCommand(ExecuteCommandRequest) returns (ExecuteCommandResponse)` - `SubscribeEvents(SubscribeEventsRequest) returns (stream GatewayEvent)` @@ -200,9 +213,12 @@ The gateway routes the request downstream by `message_type` after transport verification succeeds. Downstream unary execution is bounded by `GATEWAY_AUTHENTICATED_DOWNSTREAM_TIMEOUT`, which defaults to `5s`. -When that timeout expires, the gateway preserves the authenticated gRPC -contract and returns gRPC `UNAVAILABLE` with message -`downstream service is unavailable`. +When that timeout expires, the gateway preserves the authenticated edge +contract and returns `UNAVAILABLE` with message +`downstream service is unavailable`. Reject codes are documented using +their gRPC names (`INVALID_ARGUMENT`, `UNAUTHENTICATED`, …); the same +codes flow back to Connect clients as the corresponding `connect.Code*` +values. `SubscribeEvents` is an authenticated server-streaming RPC. It binds the stream to `user_id` and `device_session_id` and starts by sending @@ -211,8 +227,9 @@ a signed service event that includes the current server time in milliseconds. The v1 protobuf contract lives in `proto/galaxy/gateway/v1/edge_gateway.proto` under package `galaxy.gateway.v1` and service `EdgeGateway`. -Generated Go bindings are committed under `proto/galaxy/gateway/v1/` and are -regenerated with: +Generated Go bindings are committed under +`proto/galaxy/gateway/v1/` (gRPC stubs and `gatewayv1connect/` Connect +handlers) and are regenerated with: ```bash buf generate @@ -286,8 +303,8 @@ affected stream is closed with gRPC `RESOURCE_EXHAUSTED` and message same `device_session_id` was revoked, every active `SubscribeEvents` stream bound to that exact session is closed with gRPC `FAILED_PRECONDITION` and message `device session is revoked`. During gateway shutdown, the in-memory -push hub is closed before gRPC graceful stop, and every active -`SubscribeEvents` stream is terminated with gRPC `UNAVAILABLE` and message +push hub is closed before HTTP graceful stop, and every active +`SubscribeEvents` stream is terminated with `UNAVAILABLE` and message `gateway is shutting down`. Authenticated anti-abuse budgets are configured by the `GATEWAY_AUTHENTICATED_GRPC_ANTI_ABUSE_*` environment variables. @@ -851,9 +868,9 @@ subscribers, and telemetry runtime. `GATEWAY_SHUTDOWN_TIMEOUT` configures the per-component graceful shutdown budget and defaults to `5s`. -During authenticated gRPC shutdown, the in-memory `PushHub` closes active -streams before gRPC graceful stop, so active `SubscribeEvents` calls terminate -with gRPC `UNAVAILABLE` and message `gateway is shutting down`. +During authenticated edge shutdown, the in-memory `PushHub` closes active +streams before HTTP graceful stop, so active `SubscribeEvents` calls terminate +with `UNAVAILABLE` and message `gateway is shutting down`. ## Recommended Package Layout diff --git a/gateway/buf.gen.yaml b/gateway/buf.gen.yaml index e576cda..f496bce 100644 --- a/gateway/buf.gen.yaml +++ b/gateway/buf.gen.yaml @@ -9,3 +9,7 @@ plugins: out: proto opt: - paths=source_relative + - remote: buf.build/connectrpc/go:v1.19.2 + out: proto + opt: + - paths=source_relative diff --git a/gateway/docs/flows.md b/gateway/docs/flows.md index f5b59c9..f6c0fa3 100644 --- a/gateway/docs/flows.md +++ b/gateway/docs/flows.md @@ -75,6 +75,6 @@ sequenceDiagram Dispatcher->>Hub: RevokeDeviceSession or RevokeAllForUser Hub-->>Client: stream closes with FAILED_PRECONDITION - Note over Gateway,Hub: During shutdown the gateway closes PushHub before gRPC graceful stop. + Note over Gateway,Hub: During shutdown the gateway closes PushHub before HTTP graceful stop. Hub-->>Client: stream closes with UNAVAILABLE ``` diff --git a/gateway/docs/runbook.md b/gateway/docs/runbook.md index db70999..0af43ef 100644 --- a/gateway/docs/runbook.md +++ b/gateway/docs/runbook.md @@ -80,8 +80,8 @@ Shutdown behavior: - the per-component shutdown budget is controlled by `GATEWAY_SHUTDOWN_TIMEOUT`; - internal subscribers are stopped as part of application shutdown; -- the in-memory `PushHub` is closed before gRPC graceful stop; -- active `SubscribeEvents` streams terminate with gRPC `UNAVAILABLE` and +- the in-memory `PushHub` is closed before HTTP graceful stop; +- active `SubscribeEvents` streams terminate with `UNAVAILABLE` and message `gateway is shutting down`. During planned restarts: diff --git a/gateway/docs/runtime.md b/gateway/docs/runtime.md index 12336e1..2ae1f0f 100644 --- a/gateway/docs/runtime.md +++ b/gateway/docs/runtime.md @@ -7,12 +7,12 @@ runtime dependencies. flowchart LR subgraph Clients Public["Public REST clients"] - Authd["Authenticated gRPC clients"] + Authd["Authenticated edge clients\n(Connect / gRPC / gRPC-Web)"] end subgraph Gateway["Edge Gateway process"] PublicHTTP["Public HTTP listener\n/healthz /readyz /api/v1/public/auth/*"] - AuthGRPC["Authenticated gRPC listener\nExecuteCommand / SubscribeEvents"] + AuthGRPC["Authenticated edge listener (h2c)\nConnect / gRPC / gRPC-Web\nExecuteCommand / SubscribeEvents"] AdminHTTP["Optional admin HTTP listener\n/metrics"] BackendREST["backendclient.RESTClient\nsessions + public auth + user/lobby"] BackendPush["backendclient.PushClient\nSubscribePush consumer"] @@ -48,9 +48,13 @@ Notes: - `cmd/gateway` refuses startup when Redis connectivity, the backend endpoint, or the response signer is misconfigured. -- Session lookup is synchronous: every authenticated gRPC request triggers one +- Session lookup is synchronous: every authenticated edge request triggers one `GET /api/v1/internal/sessions/{id}` call to backend; there is no process-local projection. +- The authenticated edge listener is built on `connectrpc.com/connect` and + natively serves the Connect, gRPC, and gRPC-Web protocols on a single + HTTP/2 cleartext (`h2c`) port. Browsers use Connect; native clients can + use either Connect or raw gRPC framing against the same listener. - `backendclient.PushClient` keeps a long-lived `Push.SubscribePush` stream open. The dispatcher converts inbound `pushv1.PushEvent` frames into either `PushHub.Publish` (for client events) or `PushHub.RevokeDeviceSession` / diff --git a/gateway/go.mod b/gateway/go.mod index 98131cc..c1cef6f 100644 --- a/gateway/go.mod +++ b/gateway/go.mod @@ -5,6 +5,7 @@ go 1.26.1 require ( buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.11-20260209202127-80ab13bee0bf.1 buf.build/go/protovalidate v1.1.3 + connectrpc.com/connect v1.19.2 galaxy/core v0.0.0-00010101000000-000000000000 galaxy/redisconn v0.0.0-00010101000000-000000000000 github.com/alicebob/miniredis/v2 v2.37.0 @@ -17,6 +18,7 @@ require ( github.com/stretchr/testify v1.11.1 go.opentelemetry.io/contrib/instrumentation/github.com/gin-gonic/gin/otelgin v0.68.0 go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.67.0 + go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.68.0 go.opentelemetry.io/otel v1.43.0 go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.43.0 go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.43.0 @@ -26,6 +28,7 @@ require ( go.opentelemetry.io/otel/sdk/metric v1.43.0 go.opentelemetry.io/otel/trace v1.43.0 go.uber.org/zap v1.27.1 + golang.org/x/net v0.53.0 golang.org/x/text v0.36.0 golang.org/x/time v0.15.0 google.golang.org/grpc v1.80.0 @@ -44,6 +47,7 @@ require ( github.com/cloudwego/base64x v0.1.6 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/felixge/httpsnoop v1.0.4 // indirect github.com/gabriel-vasile/mimetype v1.4.13 // indirect github.com/gin-contrib/sse v1.1.1 // indirect github.com/go-logr/logr v1.4.3 // indirect @@ -95,7 +99,6 @@ require ( golang.org/x/arch v0.25.0 // indirect golang.org/x/crypto v0.50.0 // indirect golang.org/x/exp v0.0.0-20260410095643-746e56fc9e2f // indirect - golang.org/x/net v0.53.0 // indirect golang.org/x/sys v0.43.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20260401024825-9d38bb4040a9 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20260420184626-e10c466a9529 // indirect diff --git a/gateway/go.sum b/gateway/go.sum index 6f428e7..a06e0b0 100644 --- a/gateway/go.sum +++ b/gateway/go.sum @@ -4,6 +4,8 @@ buf.build/go/protovalidate v1.1.3 h1:m2GVEgQWd7rk+vIoAZ+f0ygGjvQTuqPQapBBdcpWVPE buf.build/go/protovalidate v1.1.3/go.mod h1:9XIuohWz+kj+9JVn3WQneHA5LZP50mjvneZMnbLkiIE= cel.dev/expr v0.25.1 h1:1KrZg61W6TWSxuNZ37Xy49ps13NUovb66QLprthtwi4= cel.dev/expr v0.25.1/go.mod h1:hrXvqGP6G6gyx8UAHSHJ5RGk//1Oj5nXQ2NI02Nrsg4= +connectrpc.com/connect v1.19.2 h1:McQ83FGdzL+t60peksi0gXC7MQ/iLKgLduAnThbM0mo= +connectrpc.com/connect v1.19.2/go.mod h1:tN20fjdGlewnSFeZxLKb0xwIZ6ozc3OQs2hTXy4du9w= github.com/alicebob/miniredis/v2 v2.37.0 h1:RheObYW32G1aiJIj81XVt78ZHJpHonHLHW7OLIshq68= github.com/alicebob/miniredis/v2 v2.37.0/go.mod h1:TcL7YfarKPGDAthEtl5NBeHZfeUQj6OXMm/+iu5cLMM= github.com/antlr4-go/antlr/v4 v4.13.1 h1:SqQKkuVZ+zWkMMNkjy5FZe5mr5WURWnlpmOuzYWrPrQ= @@ -34,6 +36,8 @@ github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1 github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= +github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/gabriel-vasile/mimetype v1.4.13 h1:46nXokslUBsAJE/wMsp5gtO500a4F3Nkz9Ufpk2AcUM= github.com/gabriel-vasile/mimetype v1.4.13/go.mod h1:d+9Oxyo1wTzWdyVUPMmXFvp4F9tea18J8ufA774AB3s= github.com/getkin/kin-openapi v0.135.0 h1:751SjYfbiwqukYuVjwYEIKNfrSwS5YpA7DZnKSwQgtg= @@ -171,6 +175,8 @@ go.opentelemetry.io/contrib/instrumentation/github.com/gin-gonic/gin/otelgin v0. go.opentelemetry.io/contrib/instrumentation/github.com/gin-gonic/gin/otelgin v0.68.0/go.mod h1:MdHW7tLtkeGJnR4TyOrnd5D0zUGZQB1l84uHCe8hRpE= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.67.0 h1:yI1/OhfEPy7J9eoa6Sj051C7n5dvpj0QX8g4sRchg04= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.67.0/go.mod h1:NoUCKYWK+3ecatC4HjkRktREheMeEtrXoQxrqYFeHSc= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.68.0 h1:CqXxU8VOmDefoh0+ztfGaymYbhdB/tT3zs79QaZTNGY= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.68.0/go.mod h1:BuhAPThV8PBHBvg8ZzZ/Ok3idOdhWIodywz2xEcRbJo= go.opentelemetry.io/contrib/propagators/b3 v1.43.0 h1:CETqV3QLLPTy5yNrqyMr41VnAOOD4lsRved7n4QG00A= go.opentelemetry.io/contrib/propagators/b3 v1.43.0/go.mod h1:Q4mCiCdziYzpNR0g+6UqVotAlCDZdzz6L8jwY4knOrw= go.opentelemetry.io/otel v1.43.0 h1:mYIM03dnh5zfN7HautFE4ieIig9amkNANT+xcVxAj9I= diff --git a/gateway/internal/grpcapi/command_routing_integration_test.go b/gateway/internal/grpcapi/command_routing_integration_test.go index 687d702..19608cc 100644 --- a/gateway/internal/grpcapi/command_routing_integration_test.go +++ b/gateway/internal/grpcapi/command_routing_integration_test.go @@ -11,14 +11,12 @@ import ( "galaxy/gateway/internal/config" "galaxy/gateway/internal/downstream" "galaxy/gateway/internal/testutil" - gatewayv1 "galaxy/gateway/proto/galaxy/gateway/v1" + "connectrpc.com/connect" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/otel/trace" "go.uber.org/zap" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" ) func TestExecuteCommandRoutesVerifiedCommandAndSignsResponse(t *testing.T) { @@ -58,32 +56,27 @@ func TestExecuteCommandRoutesVerifiedCommandAndSignsResponse(t *testing.T) { defer runGateway.stop(t) addr := waitForListenAddr(t, server) - conn := dialGatewayClient(t, addr) - defer func() { - require.NoError(t, conn.Close()) - }() - - client := gatewayv1.NewEdgeGatewayClient(conn) - response, err := client.ExecuteCommand(context.Background(), newValidExecuteCommandRequest()) + client := newEdgeClient(t, addr) + response, err := client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequest())) require.NoError(t, err) - assert.Equal(t, "v1", response.GetProtocolVersion()) - assert.Equal(t, "request-123", response.GetRequestId()) - assert.Equal(t, testCurrentTime.UnixMilli(), response.GetTimestampMs()) - assert.Equal(t, "accepted", response.GetResultCode()) - assert.Equal(t, []byte("downstream-response"), response.GetPayloadBytes()) + assert.Equal(t, "v1", response.Msg.GetProtocolVersion()) + assert.Equal(t, "request-123", response.Msg.GetRequestId()) + assert.Equal(t, testCurrentTime.UnixMilli(), response.Msg.GetTimestampMs()) + assert.Equal(t, "accepted", response.Msg.GetResultCode()) + assert.Equal(t, []byte("downstream-response"), response.Msg.GetPayloadBytes()) assert.Equal(t, 1, moveClient.executeCalls) assert.Zero(t, renameClient.executeCalls) wantHash := sha256.Sum256([]byte("downstream-response")) - assert.Equal(t, wantHash[:], response.GetPayloadHash()) - require.NoError(t, authn.VerifyPayloadHash(response.GetPayloadBytes(), response.GetPayloadHash())) - require.NoError(t, authn.VerifyResponseSignature(signer.PublicKey(), response.GetSignature(), authn.ResponseSigningFields{ - ProtocolVersion: response.GetProtocolVersion(), - RequestID: response.GetRequestId(), - TimestampMS: response.GetTimestampMs(), - ResultCode: response.GetResultCode(), - PayloadHash: response.GetPayloadHash(), + assert.Equal(t, wantHash[:], response.Msg.GetPayloadHash()) + require.NoError(t, authn.VerifyPayloadHash(response.Msg.GetPayloadBytes(), response.Msg.GetPayloadHash())) + require.NoError(t, authn.VerifyResponseSignature(signer.PublicKey(), response.Msg.GetSignature(), authn.ResponseSigningFields{ + ProtocolVersion: response.Msg.GetProtocolVersion(), + RequestID: response.Msg.GetRequestId(), + TimestampMS: response.Msg.GetTimestampMs(), + ResultCode: response.Msg.GetResultCode(), + PayloadHash: response.Msg.GetPayloadHash(), })) } @@ -99,16 +92,11 @@ func TestExecuteCommandRouteMissReturnsUnimplemented(t *testing.T) { defer runGateway.stop(t) addr := waitForListenAddr(t, server) - conn := dialGatewayClient(t, addr) - defer func() { - require.NoError(t, conn.Close()) - }() - - client := gatewayv1.NewEdgeGatewayClient(conn) - _, err := client.ExecuteCommand(context.Background(), newValidExecuteCommandRequest()) + client := newEdgeClient(t, addr) + _, err := client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequest())) require.Error(t, err) - assert.Equal(t, codes.Unimplemented, status.Code(err)) - assert.Equal(t, "message_type is not routed", status.Convert(err).Message()) + assert.Equal(t, connect.CodeUnimplemented, connect.CodeOf(err)) + assert.Equal(t, "message_type is not routed", connectErrorMessage(t, err)) } func TestExecuteCommandMapsDownstreamUnavailableToUnavailable(t *testing.T) { @@ -131,16 +119,11 @@ func TestExecuteCommandMapsDownstreamUnavailableToUnavailable(t *testing.T) { defer runGateway.stop(t) addr := waitForListenAddr(t, server) - conn := dialGatewayClient(t, addr) - defer func() { - require.NoError(t, conn.Close()) - }() - - client := gatewayv1.NewEdgeGatewayClient(conn) - _, err := client.ExecuteCommand(context.Background(), newValidExecuteCommandRequest()) + client := newEdgeClient(t, addr) + _, err := client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequest())) require.Error(t, err) - assert.Equal(t, codes.Unavailable, status.Code(err)) - assert.Equal(t, "downstream service is unavailable", status.Convert(err).Message()) + assert.Equal(t, connect.CodeUnavailable, connect.CodeOf(err)) + assert.Equal(t, "downstream service is unavailable", connectErrorMessage(t, err)) assert.Equal(t, 1, failingClient.executeCalls) } @@ -167,16 +150,11 @@ func TestExecuteCommandMapsDownstreamTimeoutToUnavailable(t *testing.T) { defer runGateway.stop(t) addr := waitForListenAddr(t, server) - conn := dialGatewayClient(t, addr) - defer func() { - require.NoError(t, conn.Close()) - }() - - client := gatewayv1.NewEdgeGatewayClient(conn) - _, err := client.ExecuteCommand(context.Background(), newValidExecuteCommandRequest()) + client := newEdgeClient(t, addr) + _, err := client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequest())) require.Error(t, err) - assert.Equal(t, codes.Unavailable, status.Code(err)) - assert.Equal(t, "downstream service is unavailable", status.Convert(err).Message()) + assert.Equal(t, connect.CodeUnavailable, connect.CodeOf(err)) + assert.Equal(t, "downstream service is unavailable", connectErrorMessage(t, err)) assert.Equal(t, 1, stallingClient.executeCalls) } @@ -203,16 +181,11 @@ func TestExecuteCommandFailsClosedWhenResponseSignerUnavailable(t *testing.T) { defer runGateway.stop(t) addr := waitForListenAddr(t, server) - conn := dialGatewayClient(t, addr) - defer func() { - require.NoError(t, conn.Close()) - }() - - client := gatewayv1.NewEdgeGatewayClient(conn) - _, err := client.ExecuteCommand(context.Background(), newValidExecuteCommandRequest()) + client := newEdgeClient(t, addr) + _, err := client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequest())) require.Error(t, err) - assert.Equal(t, codes.Unavailable, status.Code(err)) - assert.Equal(t, "response signer is unavailable", status.Convert(err).Message()) + assert.Equal(t, connect.CodeUnavailable, connect.CodeOf(err)) + assert.Equal(t, "response signer is unavailable", connectErrorMessage(t, err)) assert.Equal(t, 1, successClient.executeCalls) } @@ -250,13 +223,8 @@ func TestExecuteCommandPropagatesOTelSpanContextToDownstream(t *testing.T) { defer runGateway.stop(t) addr := waitForListenAddr(t, server) - conn := dialGatewayClient(t, addr) - defer func() { - require.NoError(t, conn.Close()) - }() - - client := gatewayv1.NewEdgeGatewayClient(conn) - _, err := client.ExecuteCommand(context.Background(), newValidExecuteCommandRequest()) + client := newEdgeClient(t, addr) + _, err := client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequest())) require.NoError(t, err) assert.True(t, seenSpanContext.IsValid()) @@ -290,15 +258,10 @@ func TestExecuteCommandDrainsInFlightUnaryDuringShutdown(t *testing.T) { defer runGateway.stop(t) addr := waitForListenAddr(t, server) - conn := dialGatewayClient(t, addr) - defer func() { - require.NoError(t, conn.Close()) - }() - - client := gatewayv1.NewEdgeGatewayClient(conn) + client := newEdgeClient(t, addr) resultCh := make(chan error, 1) go func() { - _, err := client.ExecuteCommand(context.Background(), newValidExecuteCommandRequest()) + _, err := client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequest())) resultCh <- err }() @@ -353,13 +316,8 @@ func TestExecuteCommandLogsDoNotContainSensitiveTransportMaterial(t *testing.T) defer runGateway.stop(t) addr := waitForListenAddr(t, server) - conn := dialGatewayClient(t, addr) - defer func() { - require.NoError(t, conn.Close()) - }() - - client := gatewayv1.NewEdgeGatewayClient(conn) - _, err := client.ExecuteCommand(context.Background(), newValidExecuteCommandRequest()) + client := newEdgeClient(t, addr) + _, err := client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequest())) require.NoError(t, err) logOutput := logBuffer.String() diff --git a/gateway/internal/grpcapi/connect_handler.go b/gateway/internal/grpcapi/connect_handler.go new file mode 100644 index 0000000..fa9a2e3 --- /dev/null +++ b/gateway/internal/grpcapi/connect_handler.go @@ -0,0 +1,143 @@ +package grpcapi + +import ( + "context" + "errors" + "fmt" + + gatewayv1 "galaxy/gateway/proto/galaxy/gateway/v1" + "galaxy/gateway/proto/galaxy/gateway/v1/gatewayv1connect" + + "connectrpc.com/connect" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" + grpcstatus "google.golang.org/grpc/status" +) + +// connectEdgeAdapter exposes the existing gRPC-shaped authenticated edge +// service decorator stack (envelope → session → payload-hash → signature → +// freshness/replay → rate-limit → routing/push) through the +// gatewayv1connect.EdgeGatewayHandler interface. It owns no logic of its +// own; the underlying decorator stack carries the full ingress contract +// unchanged. +type connectEdgeAdapter struct { + impl gatewayv1.EdgeGatewayServer +} + +// newConnectEdgeAdapter wraps impl as a Connect handler. +func newConnectEdgeAdapter(impl gatewayv1.EdgeGatewayServer) gatewayv1connect.EdgeGatewayHandler { + return &connectEdgeAdapter{impl: impl} +} + +// ExecuteCommand unwraps the typed Connect request, calls the underlying +// service, and wraps the typed response. gRPC `status.Error` values +// returned by the decorator stack are translated to *connect.Error so +// the Connect client receives the matching code and message. +func (a *connectEdgeAdapter) ExecuteCommand(ctx context.Context, req *connect.Request[gatewayv1.ExecuteCommandRequest]) (*connect.Response[gatewayv1.ExecuteCommandResponse], error) { + resp, err := a.impl.ExecuteCommand(ctx, req.Msg) + if err != nil { + return nil, translateGRPCStatusError(err) + } + + return connect.NewResponse(resp), nil +} + +// SubscribeEvents adapts the Connect server stream to the +// grpc.ServerStreamingServer contract expected by the existing decorator +// stack. The decorator stack only ever calls Send and Context on the +// stream; the remaining grpc.ServerStream surface is satisfied by no-op +// shims so the interface contract is met without panicking. Errors +// returned by the decorator stack are translated to *connect.Error. +func (a *connectEdgeAdapter) SubscribeEvents(ctx context.Context, req *connect.Request[gatewayv1.SubscribeEventsRequest], stream *connect.ServerStream[gatewayv1.GatewayEvent]) error { + wrapped := &connectEdgeStream{ctx: ctx, stream: stream} + if err := a.impl.SubscribeEvents(req.Msg, wrapped); err != nil { + return translateGRPCStatusError(err) + } + + return nil +} + +// translateGRPCStatusError maps gRPC status.Error values returned by the +// decorator stack into *connect.Error with the equivalent code and message. +// Errors that are already *connect.Error pass through unchanged. Errors +// without a recognisable gRPC status are returned verbatim — connect-go +// renders those as CodeUnknown. +func translateGRPCStatusError(err error) error { + if err == nil { + return nil + } + + var connectErr *connect.Error + if errors.As(err, &connectErr) { + return err + } + + grpcStatus, ok := grpcstatus.FromError(err) + if !ok { + return err + } + if grpcStatus.Code() == codes.OK { + return nil + } + + return connect.NewError(connect.Code(grpcStatus.Code()), errors.New(grpcStatus.Message())) +} + +// connectEdgeStream satisfies grpc.ServerStreamingServer[gatewayv1.GatewayEvent] +// on top of *connect.ServerStream. The decorator stack reads the request +// context and pushes outbound events through Send; the rest of the +// grpc.ServerStream surface is not exercised in the gateway, so the no-op +// implementations preserve the type contract without surprising behaviour. +type connectEdgeStream struct { + ctx context.Context + stream *connect.ServerStream[gatewayv1.GatewayEvent] +} + +// Send forwards a typed gateway event through the underlying Connect server +// stream. +func (s *connectEdgeStream) Send(event *gatewayv1.GatewayEvent) error { + return s.stream.Send(event) +} + +// Context returns the request context handed to the Connect handler. +func (s *connectEdgeStream) Context() context.Context { + return s.ctx +} + +// SetHeader is part of grpc.ServerStream. The Connect transport exposes +// response headers through ResponseHeader() at construction time; metadata +// supplied here is intentionally ignored because no decorator in the +// gateway exercises the gRPC-only metadata path. +func (s *connectEdgeStream) SetHeader(metadata.MD) error { + return nil +} + +// SendHeader is part of grpc.ServerStream. Connect-served streams flush +// headers automatically on the first Send; manual header dispatch is not +// modelled. +func (s *connectEdgeStream) SendHeader(metadata.MD) error { + return nil +} + +// SetTrailer is part of grpc.ServerStream. Trailer metadata has no +// corresponding Connect concept on server-streaming responses. +func (s *connectEdgeStream) SetTrailer(metadata.MD) {} + +// SendMsg is part of grpc.ServerStream. The decorator stack never calls +// SendMsg directly; if a future caller does, the typed Send path is used +// when the message is a GatewayEvent. +func (s *connectEdgeStream) SendMsg(m any) error { + event, ok := m.(*gatewayv1.GatewayEvent) + if !ok { + return fmt.Errorf("connectEdgeStream.SendMsg: unsupported message type %T", m) + } + + return s.stream.Send(event) +} + +// RecvMsg is part of grpc.ServerStream. Server-streaming server handlers +// have no client messages to receive after the initial request, so this +// method is intentionally an error path. +func (s *connectEdgeStream) RecvMsg(any) error { + return errors.New("connectEdgeStream.RecvMsg: server-streaming has no client messages") +} diff --git a/gateway/internal/grpcapi/connect_observability.go b/gateway/internal/grpcapi/connect_observability.go new file mode 100644 index 0000000..c873a79 --- /dev/null +++ b/gateway/internal/grpcapi/connect_observability.go @@ -0,0 +1,110 @@ +package grpcapi + +import ( + "context" + "net" + "time" + + "galaxy/gateway/internal/telemetry" + + "connectrpc.com/connect" + "go.uber.org/zap" +) + +// observabilityConnectInterceptor returns a Connect interceptor that records +// the same structured log entry and authenticated edge metric pair as the +// gRPC instrumentation it replaced. It also injects the parsed peer IP into +// the request context so the rate-limit decorator can attribute requests +// without depending on the gRPC `peer` package. +func observabilityConnectInterceptor(logger *zap.Logger, metrics *telemetry.Runtime) connect.Interceptor { + if logger == nil { + logger = zap.NewNop() + } + + return &connectObservability{logger: logger, metrics: metrics} +} + +type connectObservability struct { + logger *zap.Logger + metrics *telemetry.Runtime +} + +// WrapUnary records timing and outcome for a single unary edge call. +func (o *connectObservability) WrapUnary(next connect.UnaryFunc) connect.UnaryFunc { + return func(ctx context.Context, req connect.AnyRequest) (connect.AnyResponse, error) { + ctx = contextWithPeerIP(ctx, hostFromConnectPeerAddr(req.Peer().Addr)) + + start := time.Now() + resp, err := next(ctx, req) + + var respValue any + if resp != nil { + respValue = resp.Any() + } + recordEdgeRequest(o.logger, o.metrics, ctx, "connect", req.Spec().Procedure, req.Any(), respValue, err, time.Since(start), "unary") + + return resp, err + } +} + +// WrapStreamingClient is the client-side hook required by the +// connect.Interceptor contract. The gateway only acts as a Connect server, +// so this hook is a pass-through. +func (o *connectObservability) WrapStreamingClient(next connect.StreamingClientFunc) connect.StreamingClientFunc { + return next +} + +// WrapStreamingHandler records timing and outcome for one server-streaming +// edge call. The wrapped conn captures the first received request so the +// log/metric pair carries the same envelope fields the gRPC instrumentation +// emitted before. +func (o *connectObservability) WrapStreamingHandler(next connect.StreamingHandlerFunc) connect.StreamingHandlerFunc { + return func(ctx context.Context, conn connect.StreamingHandlerConn) error { + ctx = contextWithPeerIP(ctx, hostFromConnectPeerAddr(conn.Peer().Addr)) + + start := time.Now() + wrapped := &observabilityStreamingConn{StreamingHandlerConn: conn} + err := next(ctx, wrapped) + + recordEdgeRequest(o.logger, o.metrics, ctx, "connect", conn.Spec().Procedure, wrapped.firstRequest, nil, err, time.Since(start), "stream") + return err + } +} + +// observabilityStreamingConn captures the first received request so the +// streaming-handler interceptor can derive the envelope log fields after +// the handler returns. +type observabilityStreamingConn struct { + connect.StreamingHandlerConn + + firstRequest any +} + +// Receive forwards to the underlying conn and stores the first successful +// message, so envelopeFieldsFromRequest can read message_type, request_id, +// and trace_id from it. +func (c *observabilityStreamingConn) Receive(msg any) error { + err := c.StreamingHandlerConn.Receive(msg) + if err == nil && c.firstRequest == nil { + c.firstRequest = msg + } + + return err +} + +// hostFromConnectPeerAddr returns the host part of a "host:port" peer +// address, or the address verbatim when it cannot be split. Empty input +// yields an empty string so peerIPFromContext falls back to the canonical +// `unknown` bucket. +func hostFromConnectPeerAddr(addr string) string { + if addr == "" { + return "" + } + + host, _, err := net.SplitHostPort(addr) + if err == nil && host != "" { + return host + } + + return addr +} diff --git a/gateway/internal/grpcapi/envelope.go b/gateway/internal/grpcapi/envelope.go index 885789c..d3a6a71 100644 --- a/gateway/internal/grpcapi/envelope.go +++ b/gateway/internal/grpcapi/envelope.go @@ -4,8 +4,7 @@ import ( "bytes" "context" "fmt" - - "galaxy/gateway/proto/galaxy/gateway/v1" + gatewayv1 "galaxy/gateway/proto/galaxy/gateway/v1" "buf.build/go/protovalidate" "google.golang.org/grpc" diff --git a/gateway/internal/grpcapi/freshness_replay_integration_test.go b/gateway/internal/grpcapi/freshness_replay_integration_test.go index d0da946..d2c154e 100644 --- a/gateway/internal/grpcapi/freshness_replay_integration_test.go +++ b/gateway/internal/grpcapi/freshness_replay_integration_test.go @@ -3,7 +3,6 @@ package grpcapi import ( "context" "errors" - "io" "sync" "testing" "time" @@ -12,11 +11,10 @@ import ( "galaxy/gateway/internal/session" gatewayv1 "galaxy/gateway/proto/galaxy/gateway/v1" + "connectrpc.com/connect" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" ) func TestExecuteCommandRejectsStaleTimestamp(t *testing.T) { @@ -51,16 +49,11 @@ func TestExecuteCommandRejectsStaleTimestamp(t *testing.T) { defer runGateway.stop(t) addr := waitForListenAddr(t, server) - conn := dialGatewayClient(t, addr) - defer func() { - require.NoError(t, conn.Close()) - }() - - client := gatewayv1.NewEdgeGatewayClient(conn) - _, err := client.ExecuteCommand(context.Background(), newValidExecuteCommandRequestWithTimestamp("device-session-123", "request-123", tt.timestampMS)) + client := newEdgeClient(t, addr) + _, err := client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequestWithTimestamp("device-session-123", "request-123", tt.timestampMS))) require.Error(t, err) - assert.Equal(t, codes.FailedPrecondition, status.Code(err)) - assert.Equal(t, "request timestamp is outside the freshness window", status.Convert(err).Message()) + assert.Equal(t, connect.CodeFailedPrecondition, connect.CodeOf(err)) + assert.Equal(t, "request timestamp is outside the freshness window", connectErrorMessage(t, err)) assert.Zero(t, delegate.executeCalls) }) } @@ -98,16 +91,11 @@ func TestSubscribeEventsRejectsStaleTimestamp(t *testing.T) { defer runGateway.stop(t) addr := waitForListenAddr(t, server) - conn := dialGatewayClient(t, addr) - defer func() { - require.NoError(t, conn.Close()) - }() - - client := gatewayv1.NewEdgeGatewayClient(conn) + client := newEdgeClient(t, addr) err := subscribeEventsError(t, context.Background(), client, newValidSubscribeEventsRequestWithTimestamp("device-session-123", "request-123", tt.timestampMS)) require.Error(t, err) - assert.Equal(t, codes.FailedPrecondition, status.Code(err)) - assert.Equal(t, "request timestamp is outside the freshness window", status.Convert(err).Message()) + assert.Equal(t, connect.CodeFailedPrecondition, connect.CodeOf(err)) + assert.Equal(t, "request timestamp is outside the freshness window", connectErrorMessage(t, err)) assert.Zero(t, delegate.subscribeCalls) }) } @@ -127,21 +115,16 @@ func TestExecuteCommandRejectsReplay(t *testing.T) { defer runGateway.stop(t) addr := waitForListenAddr(t, server) - conn := dialGatewayClient(t, addr) - defer func() { - require.NoError(t, conn.Close()) - }() - - client := gatewayv1.NewEdgeGatewayClient(conn) + client := newEdgeClient(t, addr) req := newValidExecuteCommandRequest() - _, err := client.ExecuteCommand(context.Background(), req) + _, err := client.ExecuteCommand(context.Background(), connect.NewRequest(req)) require.NoError(t, err) - _, err = client.ExecuteCommand(context.Background(), req) + _, err = client.ExecuteCommand(context.Background(), connect.NewRequest(req)) require.Error(t, err) - assert.Equal(t, codes.FailedPrecondition, status.Code(err)) - assert.Equal(t, "request replay detected", status.Convert(err).Message()) + assert.Equal(t, connect.CodeFailedPrecondition, connect.CodeOf(err)) + assert.Equal(t, "request replay detected", connectErrorMessage(t, err)) assert.Equal(t, 1, delegate.executeCalls) } @@ -159,25 +142,20 @@ func TestSubscribeEventsRejectsReplay(t *testing.T) { defer runGateway.stop(t) addr := waitForListenAddr(t, server) - conn := dialGatewayClient(t, addr) - defer func() { - require.NoError(t, conn.Close()) - }() - - client := gatewayv1.NewEdgeGatewayClient(conn) + client := newEdgeClient(t, addr) req := newValidSubscribeEventsRequest() - stream, err := client.SubscribeEvents(context.Background(), req) + stream, err := client.SubscribeEvents(context.Background(), connect.NewRequest(req)) require.NoError(t, err) event := recvBootstrapEvent(t, stream) assertServerTimeBootstrapEvent(t, event, newTestResponseSignerPublicKey(), "request-123", "trace-123", testCurrentTime.UnixMilli()) - _, err = stream.Recv() - require.ErrorIs(t, err, io.EOF) + require.False(t, stream.Receive()) + require.NoError(t, stream.Err()) err = subscribeEventsError(t, context.Background(), client, req) require.Error(t, err) - assert.Equal(t, codes.FailedPrecondition, status.Code(err)) - assert.Equal(t, "request replay detected", status.Convert(err).Message()) + assert.Equal(t, connect.CodeFailedPrecondition, connect.CodeOf(err)) + assert.Equal(t, "request replay detected", connectErrorMessage(t, err)) assert.Equal(t, 1, delegate.subscribeCalls) } @@ -204,17 +182,12 @@ func TestExecuteCommandAllowsSameRequestIDAcrossDistinctSessions(t *testing.T) { defer runGateway.stop(t) addr := waitForListenAddr(t, server) - conn := dialGatewayClient(t, addr) - defer func() { - require.NoError(t, conn.Close()) - }() + client := newEdgeClient(t, addr) - client := gatewayv1.NewEdgeGatewayClient(conn) - - _, err := client.ExecuteCommand(context.Background(), newValidExecuteCommandRequestWithSessionAndRequestID("device-session-123", "request-shared")) + _, err := client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequestWithSessionAndRequestID("device-session-123", "request-shared"))) require.NoError(t, err) - _, err = client.ExecuteCommand(context.Background(), newValidExecuteCommandRequestWithSessionAndRequestID("device-session-456", "request-shared")) + _, err = client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequestWithSessionAndRequestID("device-session-456", "request-shared"))) require.NoError(t, err) assert.Equal(t, 2, delegate.executeCalls) @@ -243,26 +216,21 @@ func TestSubscribeEventsAllowsSameRequestIDAcrossDistinctSessions(t *testing.T) defer runGateway.stop(t) addr := waitForListenAddr(t, server) - conn := dialGatewayClient(t, addr) - defer func() { - require.NoError(t, conn.Close()) - }() + client := newEdgeClient(t, addr) - client := gatewayv1.NewEdgeGatewayClient(conn) - - stream, err := client.SubscribeEvents(context.Background(), newValidSubscribeEventsRequestWithSessionAndRequestID("device-session-123", "request-shared")) + stream, err := client.SubscribeEvents(context.Background(), connect.NewRequest(newValidSubscribeEventsRequestWithSessionAndRequestID("device-session-123", "request-shared"))) require.NoError(t, err) event := recvBootstrapEvent(t, stream) assertServerTimeBootstrapEvent(t, event, newTestResponseSignerPublicKey(), "request-shared", "trace-123", testCurrentTime.UnixMilli()) - _, err = stream.Recv() - require.ErrorIs(t, err, io.EOF) + require.False(t, stream.Receive()) + require.NoError(t, stream.Err()) - stream, err = client.SubscribeEvents(context.Background(), newValidSubscribeEventsRequestWithSessionAndRequestID("device-session-456", "request-shared")) + stream, err = client.SubscribeEvents(context.Background(), connect.NewRequest(newValidSubscribeEventsRequestWithSessionAndRequestID("device-session-456", "request-shared"))) require.NoError(t, err) event = recvBootstrapEvent(t, stream) assertServerTimeBootstrapEvent(t, event, newTestResponseSignerPublicKey(), "request-shared", "trace-123", testCurrentTime.UnixMilli()) - _, err = stream.Recv() - require.ErrorIs(t, err, io.EOF) + require.False(t, stream.Receive()) + require.NoError(t, stream.Err()) assert.Equal(t, 2, delegate.subscribeCalls) } @@ -283,16 +251,11 @@ func TestExecuteCommandRejectsReplayStoreUnavailable(t *testing.T) { defer runGateway.stop(t) addr := waitForListenAddr(t, server) - conn := dialGatewayClient(t, addr) - defer func() { - require.NoError(t, conn.Close()) - }() - - client := gatewayv1.NewEdgeGatewayClient(conn) - _, err := client.ExecuteCommand(context.Background(), newValidExecuteCommandRequest()) + client := newEdgeClient(t, addr) + _, err := client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequest())) require.Error(t, err) - assert.Equal(t, codes.Unavailable, status.Code(err)) - assert.Equal(t, "replay store is unavailable", status.Convert(err).Message()) + assert.Equal(t, connect.CodeUnavailable, connect.CodeOf(err)) + assert.Equal(t, "replay store is unavailable", connectErrorMessage(t, err)) assert.Zero(t, delegate.executeCalls) } @@ -312,16 +275,11 @@ func TestSubscribeEventsRejectsReplayStoreUnavailable(t *testing.T) { defer runGateway.stop(t) addr := waitForListenAddr(t, server) - conn := dialGatewayClient(t, addr) - defer func() { - require.NoError(t, conn.Close()) - }() - - client := gatewayv1.NewEdgeGatewayClient(conn) + client := newEdgeClient(t, addr) err := subscribeEventsError(t, context.Background(), client, newValidSubscribeEventsRequest()) require.Error(t, err) - assert.Equal(t, codes.Unavailable, status.Code(err)) - assert.Equal(t, "replay store is unavailable", status.Convert(err).Message()) + assert.Equal(t, connect.CodeUnavailable, connect.CodeOf(err)) + assert.Equal(t, "replay store is unavailable", connectErrorMessage(t, err)) assert.Zero(t, delegate.subscribeCalls) } @@ -353,15 +311,10 @@ func TestExecuteCommandFreshRequestReachesDelegateAndUsesDynamicReplayTTL(t *tes defer runGateway.stop(t) addr := waitForListenAddr(t, server) - conn := dialGatewayClient(t, addr) - defer func() { - require.NoError(t, conn.Close()) - }() - - client := gatewayv1.NewEdgeGatewayClient(conn) - response, err := client.ExecuteCommand(context.Background(), newValidExecuteCommandRequest()) + client := newEdgeClient(t, addr) + response, err := client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequest())) require.NoError(t, err) - assert.Equal(t, "request-123", response.GetRequestId()) + assert.Equal(t, "request-123", response.Msg.GetRequestId()) assert.Equal(t, "device-session-123", reservedDeviceSessionID) assert.Equal(t, "request-123", reservedRequestID) assert.Equal(t, testFreshnessWindow, reservedTTL) @@ -394,18 +347,13 @@ func TestSubscribeEventsFreshRequestReachesDelegateAndUsesDynamicReplayTTL(t *te defer runGateway.stop(t) addr := waitForListenAddr(t, server) - conn := dialGatewayClient(t, addr) - defer func() { - require.NoError(t, conn.Close()) - }() - - client := gatewayv1.NewEdgeGatewayClient(conn) - stream, err := client.SubscribeEvents(context.Background(), newValidSubscribeEventsRequest()) + client := newEdgeClient(t, addr) + stream, err := client.SubscribeEvents(context.Background(), connect.NewRequest(newValidSubscribeEventsRequest())) require.NoError(t, err) event := recvBootstrapEvent(t, stream) assertServerTimeBootstrapEvent(t, event, newTestResponseSignerPublicKey(), "request-123", "trace-123", testCurrentTime.UnixMilli()) - _, err = stream.Recv() - require.ErrorIs(t, err, io.EOF) + require.False(t, stream.Receive()) + require.NoError(t, stream.Err()) assert.Equal(t, testFreshnessWindow, reservedTTL) assert.Equal(t, 1, delegate.subscribeCalls) } @@ -434,15 +382,10 @@ func TestExecuteCommandFutureSkewUsesExtendedReplayTTL(t *testing.T) { defer runGateway.stop(t) addr := waitForListenAddr(t, server) - conn := dialGatewayClient(t, addr) - defer func() { - require.NoError(t, conn.Close()) - }() - - client := gatewayv1.NewEdgeGatewayClient(conn) + client := newEdgeClient(t, addr) _, err := client.ExecuteCommand( context.Background(), - newValidExecuteCommandRequestWithTimestamp("device-session-123", "request-123", testCurrentTime.Add(2*time.Minute).UnixMilli()), + connect.NewRequest(newValidExecuteCommandRequestWithTimestamp("device-session-123", "request-123", testCurrentTime.Add(2*time.Minute).UnixMilli())), ) require.NoError(t, err) assert.Equal(t, 7*time.Minute, reservedTTL) @@ -473,15 +416,10 @@ func TestExecuteCommandBoundaryFreshnessUsesMinimumReplayTTL(t *testing.T) { defer runGateway.stop(t) addr := waitForListenAddr(t, server) - conn := dialGatewayClient(t, addr) - defer func() { - require.NoError(t, conn.Close()) - }() - - client := gatewayv1.NewEdgeGatewayClient(conn) + client := newEdgeClient(t, addr) _, err := client.ExecuteCommand( context.Background(), - newValidExecuteCommandRequestWithTimestamp("device-session-123", "request-123", testCurrentTime.Add(-testFreshnessWindow).UnixMilli()), + connect.NewRequest(newValidExecuteCommandRequestWithTimestamp("device-session-123", "request-123", testCurrentTime.Add(-testFreshnessWindow).UnixMilli())), ) require.NoError(t, err) assert.Equal(t, minimumReplayReservationTTL, reservedTTL) diff --git a/gateway/internal/grpcapi/observability.go b/gateway/internal/grpcapi/observability.go index 0c1463d..0d1438f 100644 --- a/gateway/internal/grpcapi/observability.go +++ b/gateway/internal/grpcapi/observability.go @@ -12,59 +12,21 @@ import ( "go.opentelemetry.io/otel/attribute" "go.uber.org/zap" - "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) -func observabilityUnaryInterceptor(logger *zap.Logger, metrics *telemetry.Runtime) grpc.UnaryServerInterceptor { - if logger == nil { - logger = zap.NewNop() - } - - return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) { - start := time.Now() - resp, err := handler(ctx, req) - - recordGRPCRequest(logger, metrics, ctx, info.FullMethod, req, resp, err, time.Since(start), "unary") - return resp, err - } -} - -func observabilityStreamInterceptor(logger *zap.Logger, metrics *telemetry.Runtime) grpc.StreamServerInterceptor { - if logger == nil { - logger = zap.NewNop() - } - - return func(srv any, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { - start := time.Now() - wrapped := &observabilityServerStream{ServerStream: stream} - err := handler(srv, wrapped) - - recordGRPCRequest(logger, metrics, stream.Context(), info.FullMethod, wrapped.request, nil, err, time.Since(start), "stream") - return err - } -} - -type observabilityServerStream struct { - grpc.ServerStream - request any -} - -func (s *observabilityServerStream) RecvMsg(m any) error { - err := s.ServerStream.RecvMsg(m) - if err == nil && s.request == nil { - s.request = m - } - - return err -} - -func recordGRPCRequest(logger *zap.Logger, metrics *telemetry.Runtime, ctx context.Context, fullMethod string, req any, resp any, err error, duration time.Duration, streamKind string) { +// recordEdgeRequest emits the structured log entry and the +// `gateway.authenticated_grpc.*` metric pair for one authenticated edge +// request or stream outcome. The transport parameter labels the wire +// protocol the request travelled over (`connect`, `grpc`, or `grpc-web`), +// preserving stable observability semantics across the unified Connect-go +// listener. +func recordEdgeRequest(logger *zap.Logger, metrics *telemetry.Runtime, ctx context.Context, transport string, fullMethod string, req any, resp any, err error, duration time.Duration, streamKind string) { rpcMethod := path.Base(fullMethod) - messageType, requestID, traceID := grpcEnvelopeFields(req) - resultCode := grpcResultCode(resp) - grpcCode, grpcMessage, outcome := grpcOutcome(err) + messageType, requestID, traceID := envelopeFieldsFromRequest(req) + resultCode := resultCodeFromResponse(resp) + grpcCode, grpcMessage, outcome := outcomeFromError(err) rejectReason := telemetry.RejectReason(outcome) attrs := []attribute.KeyValue{ @@ -82,7 +44,7 @@ func recordGRPCRequest(logger *zap.Logger, metrics *telemetry.Runtime, ctx conte fields := []zap.Field{ zap.String("component", "authenticated_grpc"), - zap.String("transport", "grpc"), + zap.String("transport", transport), zap.String("stream_kind", streamKind), zap.String("rpc_method", rpcMethod), zap.String("message_type", messageType), @@ -106,15 +68,15 @@ func recordGRPCRequest(logger *zap.Logger, metrics *telemetry.Runtime, ctx conte switch outcome { case telemetry.EdgeOutcomeSuccess: - logger.Info("authenticated gRPC request completed", fields...) + logger.Info("authenticated edge request completed", fields...) case telemetry.EdgeOutcomeBackendUnavailable, telemetry.EdgeOutcomeDownstreamUnavailable, telemetry.EdgeOutcomeInternalError: - logger.Error("authenticated gRPC request failed", fields...) + logger.Error("authenticated edge request failed", fields...) default: - logger.Warn("authenticated gRPC request rejected", fields...) + logger.Warn("authenticated edge request rejected", fields...) } } -func grpcEnvelopeFields(req any) (messageType string, requestID string, traceID string) { +func envelopeFieldsFromRequest(req any) (messageType string, requestID string, traceID string) { switch typed := req.(type) { case *gatewayv1.ExecuteCommandRequest: return typed.GetMessageType(), typed.GetRequestId(), typed.GetTraceId() @@ -125,7 +87,7 @@ func grpcEnvelopeFields(req any) (messageType string, requestID string, traceID } } -func grpcResultCode(resp any) string { +func resultCodeFromResponse(resp any) string { typed, ok := resp.(*gatewayv1.ExecuteCommandResponse) if !ok { return "" @@ -134,7 +96,7 @@ func grpcResultCode(resp any) string { return typed.GetResultCode() } -func grpcOutcome(err error) (codes.Code, string, telemetry.EdgeOutcome) { +func outcomeFromError(err error) (codes.Code, string, telemetry.EdgeOutcome) { switch { case err == nil: return codes.OK, "", telemetry.EdgeOutcomeSuccess diff --git a/gateway/internal/grpcapi/payload_hash_integration_test.go b/gateway/internal/grpcapi/payload_hash_integration_test.go index 84b1c20..c8de30d 100644 --- a/gateway/internal/grpcapi/payload_hash_integration_test.go +++ b/gateway/internal/grpcapi/payload_hash_integration_test.go @@ -6,12 +6,10 @@ import ( "testing" "galaxy/gateway/internal/session" - gatewayv1 "galaxy/gateway/proto/galaxy/gateway/v1" + "connectrpc.com/connect" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" ) func TestExecuteCommandRejectsPayloadHashWithInvalidLength(t *testing.T) { @@ -25,19 +23,15 @@ func TestExecuteCommandRejectsPayloadHashWithInvalidLength(t *testing.T) { defer runGateway.stop(t) addr := waitForListenAddr(t, server) - conn := dialGatewayClient(t, addr) - defer func() { - require.NoError(t, conn.Close()) - }() + client := newEdgeClient(t, addr) req := newValidExecuteCommandRequest() req.PayloadHash = []byte("short") - client := gatewayv1.NewEdgeGatewayClient(conn) - _, err := client.ExecuteCommand(context.Background(), req) + _, err := client.ExecuteCommand(context.Background(), connect.NewRequest(req)) require.Error(t, err) - assert.Equal(t, codes.InvalidArgument, status.Code(err)) - assert.Equal(t, "payload_hash must be a 32-byte SHA-256 digest", status.Convert(err).Message()) + assert.Equal(t, connect.CodeInvalidArgument, connect.CodeOf(err)) + assert.Equal(t, "payload_hash must be a 32-byte SHA-256 digest", connectErrorMessage(t, err)) assert.Zero(t, delegate.executeCalls) } @@ -52,20 +46,16 @@ func TestExecuteCommandRejectsPayloadHashMismatch(t *testing.T) { defer runGateway.stop(t) addr := waitForListenAddr(t, server) - conn := dialGatewayClient(t, addr) - defer func() { - require.NoError(t, conn.Close()) - }() + client := newEdgeClient(t, addr) req := newValidExecuteCommandRequest() sum := sha256.Sum256([]byte("other")) req.PayloadHash = sum[:] - client := gatewayv1.NewEdgeGatewayClient(conn) - _, err := client.ExecuteCommand(context.Background(), req) + _, err := client.ExecuteCommand(context.Background(), connect.NewRequest(req)) require.Error(t, err) - assert.Equal(t, codes.InvalidArgument, status.Code(err)) - assert.Equal(t, "payload_hash does not match payload_bytes", status.Convert(err).Message()) + assert.Equal(t, connect.CodeInvalidArgument, connect.CodeOf(err)) + assert.Equal(t, "payload_hash does not match payload_bytes", connectErrorMessage(t, err)) assert.Zero(t, delegate.executeCalls) } @@ -80,19 +70,15 @@ func TestSubscribeEventsRejectsPayloadHashWithInvalidLength(t *testing.T) { defer runGateway.stop(t) addr := waitForListenAddr(t, server) - conn := dialGatewayClient(t, addr) - defer func() { - require.NoError(t, conn.Close()) - }() + client := newEdgeClient(t, addr) req := newValidSubscribeEventsRequest() req.PayloadHash = []byte("short") - client := gatewayv1.NewEdgeGatewayClient(conn) err := subscribeEventsError(t, context.Background(), client, req) require.Error(t, err) - assert.Equal(t, codes.InvalidArgument, status.Code(err)) - assert.Equal(t, "payload_hash must be a 32-byte SHA-256 digest", status.Convert(err).Message()) + assert.Equal(t, connect.CodeInvalidArgument, connect.CodeOf(err)) + assert.Equal(t, "payload_hash must be a 32-byte SHA-256 digest", connectErrorMessage(t, err)) assert.Zero(t, delegate.subscribeCalls) } @@ -107,19 +93,15 @@ func TestSubscribeEventsRejectsPayloadHashMismatch(t *testing.T) { defer runGateway.stop(t) addr := waitForListenAddr(t, server) - conn := dialGatewayClient(t, addr) - defer func() { - require.NoError(t, conn.Close()) - }() + client := newEdgeClient(t, addr) req := newValidSubscribeEventsRequest() sum := sha256.Sum256([]byte("other")) req.PayloadHash = sum[:] - client := gatewayv1.NewEdgeGatewayClient(conn) err := subscribeEventsError(t, context.Background(), client, req) require.Error(t, err) - assert.Equal(t, codes.InvalidArgument, status.Code(err)) - assert.Equal(t, "payload_hash does not match payload_bytes", status.Convert(err).Message()) + assert.Equal(t, connect.CodeInvalidArgument, connect.CodeOf(err)) + assert.Equal(t, "payload_hash does not match payload_bytes", connectErrorMessage(t, err)) assert.Zero(t, delegate.subscribeCalls) } diff --git a/gateway/internal/grpcapi/rate_limit.go b/gateway/internal/grpcapi/rate_limit.go index 87bad40..6dd2a0d 100644 --- a/gateway/internal/grpcapi/rate_limit.go +++ b/gateway/internal/grpcapi/rate_limit.go @@ -3,8 +3,6 @@ package grpcapi import ( "context" "errors" - "net" - "strings" "galaxy/gateway/internal/config" "galaxy/gateway/internal/ratelimit" @@ -13,7 +11,6 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/codes" - "google.golang.org/grpc/peer" "google.golang.org/grpc/status" ) @@ -41,7 +38,7 @@ var ( ErrAuthenticatedPolicyUnavailable = errors.New("authenticated request policy is unavailable") ) -// AuthenticatedRequestLimiter applies authenticated gRPC rate-limit policy to +// AuthenticatedRequestLimiter applies authenticated edge rate-limit policy to // one concrete bucket key. type AuthenticatedRequestLimiter interface { // Reserve evaluates key under policy and reports whether the request may @@ -52,10 +49,11 @@ type AuthenticatedRequestLimiter interface { // AuthenticatedRequest describes the authenticated request metadata exposed to // the edge-policy hook. type AuthenticatedRequest struct { - // RPCMethod identifies the public gRPC method being processed. + // RPCMethod identifies the public RPC method being processed. RPCMethod string - // PeerIP is the transport peer IP derived from the gRPC connection. + // PeerIP is the transport peer IP host part derived from the + // authenticated edge HTTP listener peer address. PeerIP string // MessageClass is the stable rate-limit and policy class. The gateway uses @@ -258,23 +256,21 @@ func authenticatedMessageClass(messageType string) string { return messageType } +type peerIPContextKey struct{} + +// contextWithPeerIP attaches the authenticated edge transport peer IP to ctx. +// It is set by the transport interceptor before the service decorator stack +// runs, and read back via peerIPFromContext. +func contextWithPeerIP(ctx context.Context, ip string) context.Context { + return context.WithValue(ctx, peerIPContextKey{}, ip) +} + func peerIPFromContext(ctx context.Context) string { - peerInfo, ok := peer.FromContext(ctx) - if !ok || peerInfo.Addr == nil { - return unknownAuthenticatedPeerIP + if ip, ok := ctx.Value(peerIPContextKey{}).(string); ok && ip != "" { + return ip } - value := strings.TrimSpace(peerInfo.Addr.String()) - if value == "" { - return unknownAuthenticatedPeerIP - } - - host, _, err := net.SplitHostPort(value) - if err == nil && host != "" { - return host - } - - return value + return unknownAuthenticatedPeerIP } type noopAuthenticatedRequestPolicy struct{} diff --git a/gateway/internal/grpcapi/rate_limit_integration_test.go b/gateway/internal/grpcapi/rate_limit_integration_test.go index 8d515e2..1992642 100644 --- a/gateway/internal/grpcapi/rate_limit_integration_test.go +++ b/gateway/internal/grpcapi/rate_limit_integration_test.go @@ -3,7 +3,6 @@ package grpcapi import ( "context" "fmt" - "io" "net" "net/http" "strings" @@ -17,10 +16,9 @@ import ( "galaxy/gateway/internal/session" gatewayv1 "galaxy/gateway/proto/galaxy/gateway/v1" + "connectrpc.com/connect" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" ) func TestExecuteCommandRateLimitsByIP(t *testing.T) { @@ -41,20 +39,15 @@ func TestExecuteCommandRateLimitsByIP(t *testing.T) { defer runGateway.stop(t) addr := waitForListenAddr(t, server) - conn := dialGatewayClient(t, addr) - defer func() { - require.NoError(t, conn.Close()) - }() + client := newEdgeClient(t, addr) - client := gatewayv1.NewEdgeGatewayClient(conn) - - _, err := client.ExecuteCommand(context.Background(), newValidExecuteCommandRequestWithSessionAndRequestID("device-session-1", "request-1")) + _, err := client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequestWithSessionAndRequestID("device-session-1", "request-1"))) require.NoError(t, err) - _, err = client.ExecuteCommand(context.Background(), newValidExecuteCommandRequestWithSessionAndRequestID("device-session-2", "request-2")) + _, err = client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequestWithSessionAndRequestID("device-session-2", "request-2"))) require.Error(t, err) - assert.Equal(t, codes.ResourceExhausted, status.Code(err)) - assert.Equal(t, "authenticated request rate limit exceeded", status.Convert(err).Message()) + assert.Equal(t, connect.CodeResourceExhausted, connect.CodeOf(err)) + assert.Equal(t, "authenticated request rate limit exceeded", connectErrorMessage(t, err)) assert.Equal(t, 1, delegate.executeCalls) } @@ -76,21 +69,16 @@ func TestExecuteCommandRateLimitsBySession(t *testing.T) { defer runGateway.stop(t) addr := waitForListenAddr(t, server) - conn := dialGatewayClient(t, addr) - defer func() { - require.NoError(t, conn.Close()) - }() + client := newEdgeClient(t, addr) - client := gatewayv1.NewEdgeGatewayClient(conn) - - _, err := client.ExecuteCommand(context.Background(), newValidExecuteCommandRequestWithSessionAndRequestID("device-session-1", "request-1")) + _, err := client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequestWithSessionAndRequestID("device-session-1", "request-1"))) require.NoError(t, err) - _, err = client.ExecuteCommand(context.Background(), newValidExecuteCommandRequestWithSessionAndRequestID("device-session-1", "request-2")) + _, err = client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequestWithSessionAndRequestID("device-session-1", "request-2"))) require.Error(t, err) - assert.Equal(t, codes.ResourceExhausted, status.Code(err)) + assert.Equal(t, connect.CodeResourceExhausted, connect.CodeOf(err)) - _, err = client.ExecuteCommand(context.Background(), newValidExecuteCommandRequestWithSessionAndRequestID("device-session-2", "request-3")) + _, err = client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequestWithSessionAndRequestID("device-session-2", "request-3"))) require.NoError(t, err) assert.Equal(t, 2, delegate.executeCalls) @@ -118,21 +106,16 @@ func TestExecuteCommandRateLimitsByUser(t *testing.T) { defer runGateway.stop(t) addr := waitForListenAddr(t, server) - conn := dialGatewayClient(t, addr) - defer func() { - require.NoError(t, conn.Close()) - }() + client := newEdgeClient(t, addr) - client := gatewayv1.NewEdgeGatewayClient(conn) - - _, err := client.ExecuteCommand(context.Background(), newValidExecuteCommandRequestWithSessionAndRequestID("device-session-1", "request-1")) + _, err := client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequestWithSessionAndRequestID("device-session-1", "request-1"))) require.NoError(t, err) - _, err = client.ExecuteCommand(context.Background(), newValidExecuteCommandRequestWithSessionAndRequestID("device-session-2", "request-2")) + _, err = client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequestWithSessionAndRequestID("device-session-2", "request-2"))) require.Error(t, err) - assert.Equal(t, codes.ResourceExhausted, status.Code(err)) + assert.Equal(t, connect.CodeResourceExhausted, connect.CodeOf(err)) - _, err = client.ExecuteCommand(context.Background(), newValidExecuteCommandRequestWithSessionAndRequestID("device-session-3", "request-3")) + _, err = client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequestWithSessionAndRequestID("device-session-3", "request-3"))) require.NoError(t, err) assert.Equal(t, 2, delegate.executeCalls) @@ -159,21 +142,16 @@ func TestExecuteCommandRateLimitsByMessageClass(t *testing.T) { defer runGateway.stop(t) addr := waitForListenAddr(t, server) - conn := dialGatewayClient(t, addr) - defer func() { - require.NoError(t, conn.Close()) - }() + client := newEdgeClient(t, addr) - client := gatewayv1.NewEdgeGatewayClient(conn) - - _, err := client.ExecuteCommand(context.Background(), newValidExecuteCommandRequestWithMessageType("device-session-1", "request-1", "fleet.move")) + _, err := client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequestWithMessageType("device-session-1", "request-1", "fleet.move"))) require.NoError(t, err) - _, err = client.ExecuteCommand(context.Background(), newValidExecuteCommandRequestWithMessageType("device-session-2", "request-2", "fleet.move")) + _, err = client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequestWithMessageType("device-session-2", "request-2", "fleet.move"))) require.Error(t, err) - assert.Equal(t, codes.ResourceExhausted, status.Code(err)) + assert.Equal(t, connect.CodeResourceExhausted, connect.CodeOf(err)) - _, err = client.ExecuteCommand(context.Background(), newValidExecuteCommandRequestWithMessageType("device-session-2", "request-3", "fleet.rename")) + _, err = client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequestWithMessageType("device-session-2", "request-3", "fleet.rename"))) require.NoError(t, err) assert.Equal(t, 2, delegate.executeCalls) @@ -193,13 +171,8 @@ func TestAuthenticatedPolicyHookReceivesVerifiedRequest(t *testing.T) { defer runGateway.stop(t) addr := waitForListenAddr(t, server) - conn := dialGatewayClient(t, addr) - defer func() { - require.NoError(t, conn.Close()) - }() - - client := gatewayv1.NewEdgeGatewayClient(conn) - _, err := client.ExecuteCommand(context.Background(), newValidExecuteCommandRequest()) + client := newEdgeClient(t, addr) + _, err := client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequest())) require.NoError(t, err) require.Len(t, policy.requests, 1) @@ -228,16 +201,11 @@ func TestExecuteCommandPolicyRejectMapsToPermissionDenied(t *testing.T) { defer runGateway.stop(t) addr := waitForListenAddr(t, server) - conn := dialGatewayClient(t, addr) - defer func() { - require.NoError(t, conn.Close()) - }() - - client := gatewayv1.NewEdgeGatewayClient(conn) - _, err := client.ExecuteCommand(context.Background(), newValidExecuteCommandRequest()) + client := newEdgeClient(t, addr) + _, err := client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequest())) require.Error(t, err) - assert.Equal(t, codes.PermissionDenied, status.Code(err)) - assert.Equal(t, "authenticated request rejected by edge policy", status.Convert(err).Message()) + assert.Equal(t, connect.CodePermissionDenied, connect.CodeOf(err)) + assert.Equal(t, "authenticated request rejected by edge policy", connectErrorMessage(t, err)) assert.Zero(t, delegate.executeCalls) } @@ -259,24 +227,19 @@ func TestSubscribeEventsRateLimitRejectsStream(t *testing.T) { defer runGateway.stop(t) addr := waitForListenAddr(t, server) - conn := dialGatewayClient(t, addr) - defer func() { - require.NoError(t, conn.Close()) - }() + client := newEdgeClient(t, addr) - client := gatewayv1.NewEdgeGatewayClient(conn) - - stream, err := client.SubscribeEvents(context.Background(), newValidSubscribeEventsRequestWithSessionAndRequestID("device-session-1", "request-1")) + stream, err := client.SubscribeEvents(context.Background(), connect.NewRequest(newValidSubscribeEventsRequestWithSessionAndRequestID("device-session-1", "request-1"))) require.NoError(t, err) event := recvBootstrapEvent(t, stream) assertServerTimeBootstrapEvent(t, event, newTestResponseSignerPublicKey(), "request-1", "trace-123", testCurrentTime.UnixMilli()) - _, err = stream.Recv() - require.ErrorIs(t, err, io.EOF) + require.False(t, stream.Receive()) + require.NoError(t, stream.Err()) err = subscribeEventsError(t, context.Background(), client, newValidSubscribeEventsRequestWithSessionAndRequestID("device-session-2", "request-2")) require.Error(t, err) - assert.Equal(t, codes.ResourceExhausted, status.Code(err)) - assert.Equal(t, "authenticated request rate limit exceeded", status.Convert(err).Message()) + assert.Equal(t, connect.CodeResourceExhausted, connect.CodeOf(err)) + assert.Equal(t, "authenticated request rate limit exceeded", connectErrorMessage(t, err)) assert.Equal(t, 1, delegate.subscribeCalls) } @@ -342,13 +305,8 @@ func TestAuthenticatedRateLimitsStayIsolatedFromPublicREST(t *testing.T) { require.NoError(t, firstPublic.Body.Close()) require.NoError(t, secondPublic.Body.Close()) - conn := dialGatewayClient(t, addr) - defer func() { - require.NoError(t, conn.Close()) - }() - - client := gatewayv1.NewEdgeGatewayClient(conn) - _, err := client.ExecuteCommand(context.Background(), newValidExecuteCommandRequest()) + client := newEdgeClient(t, addr) + _, err := client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequest())) require.NoError(t, err) } diff --git a/gateway/internal/grpcapi/server.go b/gateway/internal/grpcapi/server.go index ed7c5fa..e101b58 100644 --- a/gateway/internal/grpcapi/server.go +++ b/gateway/internal/grpcapi/server.go @@ -1,4 +1,10 @@ -// Package grpcapi exposes the authenticated gRPC surface of the gateway. +// Package grpcapi exposes the authenticated edge transport surface of the +// gateway. Despite the historical package name, the listener is built on +// `connectrpc.com/connect` and natively serves the Connect, gRPC, and +// gRPC-Web protocols on a single HTTP/h2c listener. The configured Go +// types and environment variable names retain the `gRPC` infix for +// operational stability — they describe the authenticated edge tier, not +// the wire protocol. package grpcapi import ( @@ -6,6 +12,7 @@ import ( "errors" "fmt" "net" + "net/http" "sync" "galaxy/gateway/authn" @@ -18,14 +25,17 @@ import ( "galaxy/gateway/internal/session" "galaxy/gateway/internal/telemetry" gatewayv1 "galaxy/gateway/proto/galaxy/gateway/v1" + "galaxy/gateway/proto/galaxy/gateway/v1/gatewayv1connect" - "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" + "connectrpc.com/connect" + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" "go.uber.org/zap" - "google.golang.org/grpc" + "golang.org/x/net/http2" + "golang.org/x/net/http2/h2c" ) // ServerDependencies describes the optional collaborators used by the -// authenticated gRPC server. The zero value is valid and keeps the process +// authenticated edge server. The zero value is valid and keeps the process // runnable with the built-in unimplemented service stub. type ServerDependencies struct { // Service optionally handles the post-bootstrap SubscribeEvents lifecycle @@ -45,12 +55,12 @@ type ServerDependencies struct { ResponseSigner authn.ResponseSigner // SessionCache resolves authenticated device sessions after the envelope - // gate succeeds. When nil, the authenticated gRPC surface remains runnable + // gate succeeds. When nil, the authenticated edge surface remains runnable // but valid envelopes fail closed as session-cache unavailable. SessionCache session.Cache // Clock provides current server time for freshness checks. When nil, the - // authenticated gRPC surface uses the system clock. + // authenticated edge surface uses the system clock. Clock clock.Clock // ReplayStore reserves authenticated request identifiers after signature @@ -59,26 +69,28 @@ type ServerDependencies struct { ReplayStore replay.Store // Limiter applies authenticated rate limits after the request passes the - // transport authenticity checks. When nil, the authenticated gRPC surface + // transport authenticity checks. When nil, the authenticated edge surface // uses a process-local in-memory limiter. Limiter AuthenticatedRequestLimiter // Policy evaluates later authenticated edge policy after rate limits pass. - // When nil, the authenticated gRPC surface applies a no-op allow policy. + // When nil, the authenticated edge surface applies a no-op allow policy. Policy AuthenticatedRequestPolicy - // Logger writes structured logs for authenticated gRPC traffic. + // Logger writes structured logs for authenticated edge traffic. Logger *zap.Logger - // Telemetry records low-cardinality gRPC metrics. + // Telemetry records low-cardinality edge metrics. Telemetry *telemetry.Runtime // PushHub is the active authenticated push-stream hub. When present, the - // server closes active streams before GracefulStop during shutdown. + // server closes active streams before HTTP graceful shutdown. PushHub *push.Hub } -// Server owns the authenticated gRPC listener exposed by the gateway. +// Server owns the authenticated edge HTTP/h2c listener exposed by the +// gateway. It serves the Connect, gRPC, and gRPC-Web protocols from a +// single net/http listener. type Server struct { cfg config.AuthenticatedGRPCConfig service gatewayv1.EdgeGatewayServer @@ -87,11 +99,11 @@ type Server struct { metrics *telemetry.Runtime stateMu sync.RWMutex - server *grpc.Server + server *http.Server listener net.Listener } -// NewServer constructs an authenticated gRPC server for the supplied listener +// NewServer constructs an authenticated edge server for the supplied listener // configuration and dependency bundle. Nil dependencies are replaced with safe // defaults so the gateway can expose the documented transport surface with the // full auth pipeline wired from built-in fallbacks. @@ -128,17 +140,17 @@ func NewServer(cfg config.AuthenticatedGRPCConfig, deps ServerDependencies) *Ser deps.SessionCache, ), ), - logger: deps.Logger.Named("authenticated_grpc"), + logger: deps.Logger.Named("authenticated_edge"), pushHub: deps.PushHub, metrics: deps.Telemetry, } } -// Run binds the configured listener and serves the authenticated gRPC surface -// until Shutdown closes the server. +// Run binds the configured listener and serves the authenticated edge +// surface until Shutdown closes the server. func (s *Server) Run(ctx context.Context) error { if ctx == nil { - return errors.New("run authenticated gRPC server: nil context") + return errors.New("run authenticated edge server: nil context") } if err := ctx.Err(); err != nil { return err @@ -146,23 +158,30 @@ func (s *Server) Run(ctx context.Context) error { listener, err := net.Listen("tcp", s.cfg.Addr) if err != nil { - return fmt.Errorf("run authenticated gRPC server: listen on %q: %w", s.cfg.Addr, err) + return fmt.Errorf("run authenticated edge server: listen on %q: %w", s.cfg.Addr, err) } - grpcServer := grpc.NewServer( - grpc.ConnectionTimeout(s.cfg.ConnectionTimeout), - grpc.StatsHandler(otelgrpc.NewServerHandler()), - grpc.ChainUnaryInterceptor(observabilityUnaryInterceptor(s.logger, s.metrics)), - grpc.ChainStreamInterceptor(observabilityStreamInterceptor(s.logger, s.metrics)), + mux := http.NewServeMux() + connectHandler := newConnectEdgeAdapter(s.service) + path, handler := gatewayv1connect.NewEdgeGatewayHandler( + connectHandler, + connect.WithInterceptors(observabilityConnectInterceptor(s.logger, s.metrics)), ) - gatewayv1.RegisterEdgeGatewayServer(grpcServer, s.service) + mux.Handle(path, handler) + + tracedHandler := otelhttp.NewHandler(mux, "authenticated_edge") + http2Server := &http2.Server{IdleTimeout: s.cfg.ConnectionTimeout} + httpServer := &http.Server{ + Handler: h2c.NewHandler(tracedHandler, http2Server), + ReadHeaderTimeout: s.cfg.ConnectionTimeout, + } s.stateMu.Lock() - s.server = grpcServer + s.server = httpServer s.listener = listener s.stateMu.Unlock() - s.logger.Info("authenticated gRPC server started", zap.String("addr", listener.Addr().String())) + s.logger.Info("authenticated edge server started", zap.String("addr", listener.Addr().String())) defer func() { s.stateMu.Lock() @@ -171,24 +190,22 @@ func (s *Server) Run(ctx context.Context) error { s.stateMu.Unlock() }() - err = grpcServer.Serve(listener) + err = httpServer.Serve(listener) switch { - case err == nil: - return nil - case errors.Is(err, grpc.ErrServerStopped): - s.logger.Info("authenticated gRPC server stopped") + case err == nil, errors.Is(err, http.ErrServerClosed): + s.logger.Info("authenticated edge server stopped") return nil default: - return fmt.Errorf("run authenticated gRPC server: serve on %q: %w", s.cfg.Addr, err) + return fmt.Errorf("run authenticated edge server: serve on %q: %w", s.cfg.Addr, err) } } -// Shutdown gracefully stops the authenticated gRPC server within ctx. When the -// graceful stop exceeds ctx, the server is force-stopped before returning the +// Shutdown gracefully stops the authenticated edge server within ctx. When the +// graceful stop exceeds ctx, the server is force-closed before returning the // timeout to the caller. func (s *Server) Shutdown(ctx context.Context) error { if ctx == nil { - return errors.New("shutdown authenticated gRPC server: nil context") + return errors.New("shutdown authenticated edge server: nil context") } s.stateMu.RLock() @@ -203,20 +220,16 @@ func (s *Server) Shutdown(ctx context.Context) error { s.pushHub.Shutdown() } - stopped := make(chan struct{}) - go func() { - server.GracefulStop() - close(stopped) - }() - - select { - case <-stopped: + err := server.Shutdown(ctx) + if err == nil { return nil - case <-ctx.Done(): - server.Stop() - <-stopped - return fmt.Errorf("shutdown authenticated gRPC server: %w", ctx.Err()) } + if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { + _ = server.Close() + return fmt.Errorf("shutdown authenticated edge server: %w", err) + } + + return fmt.Errorf("shutdown authenticated edge server: %w", err) } func (s *Server) listenAddr() string { diff --git a/gateway/internal/grpcapi/server_test.go b/gateway/internal/grpcapi/server_test.go index be7fd1a..819b02e 100644 --- a/gateway/internal/grpcapi/server_test.go +++ b/gateway/internal/grpcapi/server_test.go @@ -2,6 +2,10 @@ package grpcapi import ( "context" + "crypto/tls" + "errors" + "net" + "net/http" "testing" "time" @@ -9,13 +13,12 @@ import ( "galaxy/gateway/internal/config" "galaxy/gateway/internal/session" gatewayv1 "galaxy/gateway/proto/galaxy/gateway/v1" + "galaxy/gateway/proto/galaxy/gateway/v1/gatewayv1connect" + "connectrpc.com/connect" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/credentials/insecure" - "google.golang.org/grpc/status" + "golang.org/x/net/http2" ) func TestExecuteCommandRejectsMalformedEnvelope(t *testing.T) { @@ -25,15 +28,11 @@ func TestExecuteCommandRejectsMalformedEnvelope(t *testing.T) { defer runGateway.stop(t) addr := waitForListenAddr(t, server) - conn := dialGatewayClient(t, addr) - defer func() { - require.NoError(t, conn.Close()) - }() + client := newEdgeClient(t, addr) - client := gatewayv1.NewEdgeGatewayClient(conn) - _, err := client.ExecuteCommand(context.Background(), &gatewayv1.ExecuteCommandRequest{}) + _, err := client.ExecuteCommand(context.Background(), connect.NewRequest(&gatewayv1.ExecuteCommandRequest{})) require.Error(t, err) - assert.Equal(t, codes.InvalidArgument, status.Code(err)) + assert.Equal(t, connect.CodeInvalidArgument, connect.CodeOf(err)) } func TestSubscribeEventsRejectsMalformedEnvelope(t *testing.T) { @@ -43,15 +42,11 @@ func TestSubscribeEventsRejectsMalformedEnvelope(t *testing.T) { defer runGateway.stop(t) addr := waitForListenAddr(t, server) - conn := dialGatewayClient(t, addr) - defer func() { - require.NoError(t, conn.Close()) - }() + client := newEdgeClient(t, addr) - client := gatewayv1.NewEdgeGatewayClient(conn) err := subscribeEventsError(t, context.Background(), client, &gatewayv1.SubscribeEventsRequest{}) require.Error(t, err) - assert.Equal(t, codes.InvalidArgument, status.Code(err)) + assert.Equal(t, connect.CodeInvalidArgument, connect.CodeOf(err)) } func TestExecuteCommandRejectsUnsupportedProtocolVersion(t *testing.T) { @@ -61,13 +56,9 @@ func TestExecuteCommandRejectsUnsupportedProtocolVersion(t *testing.T) { defer runGateway.stop(t) addr := waitForListenAddr(t, server) - conn := dialGatewayClient(t, addr) - defer func() { - require.NoError(t, conn.Close()) - }() + client := newEdgeClient(t, addr) - client := gatewayv1.NewEdgeGatewayClient(conn) - _, err := client.ExecuteCommand(context.Background(), &gatewayv1.ExecuteCommandRequest{ + _, err := client.ExecuteCommand(context.Background(), connect.NewRequest(&gatewayv1.ExecuteCommandRequest{ ProtocolVersion: "v2", DeviceSessionId: "device-session-123", MessageType: "fleet.move", @@ -76,10 +67,10 @@ func TestExecuteCommandRejectsUnsupportedProtocolVersion(t *testing.T) { PayloadBytes: []byte("payload"), PayloadHash: []byte("hash"), Signature: []byte("signature"), - }) + })) require.Error(t, err) - assert.Equal(t, codes.FailedPrecondition, status.Code(err)) - assert.Equal(t, `unsupported protocol_version "v2"`, status.Convert(err).Message()) + assert.Equal(t, connect.CodeFailedPrecondition, connect.CodeOf(err)) + assert.Equal(t, `unsupported protocol_version "v2"`, connectErrorMessage(t, err)) } func TestExecuteCommandValidEnvelopeStillReturnsUnimplemented(t *testing.T) { @@ -96,15 +87,11 @@ func TestExecuteCommandValidEnvelopeStillReturnsUnimplemented(t *testing.T) { defer runGateway.stop(t) addr := waitForListenAddr(t, server) - conn := dialGatewayClient(t, addr) - defer func() { - require.NoError(t, conn.Close()) - }() + client := newEdgeClient(t, addr) - client := gatewayv1.NewEdgeGatewayClient(conn) - _, err := client.ExecuteCommand(context.Background(), newValidExecuteCommandRequest()) + _, err := client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequest())) require.Error(t, err) - assert.Equal(t, codes.Unimplemented, status.Code(err)) + assert.Equal(t, connect.CodeUnimplemented, connect.CodeOf(err)) } func TestExecuteCommandMissingReplayStoreFailsClosed(t *testing.T) { @@ -120,16 +107,12 @@ func TestExecuteCommandMissingReplayStoreFailsClosed(t *testing.T) { defer runGateway.stop(t) addr := waitForListenAddr(t, server) - conn := dialGatewayClient(t, addr) - defer func() { - require.NoError(t, conn.Close()) - }() + client := newEdgeClient(t, addr) - client := gatewayv1.NewEdgeGatewayClient(conn) - _, err := client.ExecuteCommand(context.Background(), newValidExecuteCommandRequest()) + _, err := client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequest())) require.Error(t, err) - assert.Equal(t, codes.Unavailable, status.Code(err)) - assert.Equal(t, "replay store is unavailable", status.Convert(err).Message()) + assert.Equal(t, connect.CodeUnavailable, connect.CodeOf(err)) + assert.Equal(t, "replay store is unavailable", connectErrorMessage(t, err)) } func TestSubscribeEventsValidEnvelopeSendsBootstrapEventAndWaitsForCancellation(t *testing.T) { @@ -149,22 +132,22 @@ func TestSubscribeEventsValidEnvelopeSendsBootstrapEventAndWaitsForCancellation( defer runGateway.stop(t) addr := waitForListenAddr(t, server) - conn := dialGatewayClient(t, addr) - defer func() { - require.NoError(t, conn.Close()) - }() + client := newEdgeClient(t, addr) - client := gatewayv1.NewEdgeGatewayClient(conn) - stream, err := client.SubscribeEvents(ctx, newValidSubscribeEventsRequest()) + stream, err := client.SubscribeEvents(ctx, connect.NewRequest(newValidSubscribeEventsRequest())) require.NoError(t, err) + t.Cleanup(func() { _ = stream.Close() }) event := recvBootstrapEvent(t, stream) assertServerTimeBootstrapEvent(t, event, newTestResponseSignerPublicKey(), "request-123", "trace-123", testCurrentTime.UnixMilli()) recvResult := make(chan error, 1) go func() { - _, recvErr := stream.Recv() - recvResult <- recvErr + if stream.Receive() { + recvResult <- errors.New("stream produced unexpected event") + return + } + recvResult <- stream.Err() }() require.Never(t, func() bool { @@ -188,7 +171,7 @@ func TestSubscribeEventsValidEnvelopeSendsBootstrapEventAndWaitsForCancellation( } }, time.Second, 10*time.Millisecond, "stream did not stop after client cancellation") require.Error(t, recvErr) - assert.Equal(t, codes.Canceled, status.Code(recvErr)) + assert.Equal(t, connect.CodeCanceled, connect.CodeOf(recvErr)) } func TestSubscribeEventsMissingReplayStoreFailsClosed(t *testing.T) { @@ -204,16 +187,12 @@ func TestSubscribeEventsMissingReplayStoreFailsClosed(t *testing.T) { defer runGateway.stop(t) addr := waitForListenAddr(t, server) - conn := dialGatewayClient(t, addr) - defer func() { - require.NoError(t, conn.Close()) - }() + client := newEdgeClient(t, addr) - client := gatewayv1.NewEdgeGatewayClient(conn) err := subscribeEventsError(t, context.Background(), client, newValidSubscribeEventsRequest()) require.Error(t, err) - assert.Equal(t, codes.Unavailable, status.Code(err)) - assert.Equal(t, "replay store is unavailable", status.Convert(err).Message()) + assert.Equal(t, connect.CodeUnavailable, connect.CodeOf(err)) + assert.Equal(t, "replay store is unavailable", connectErrorMessage(t, err)) } func TestSubscribeEventsFailsClosedWhenResponseSignerUnavailable(t *testing.T) { @@ -231,16 +210,12 @@ func TestSubscribeEventsFailsClosedWhenResponseSignerUnavailable(t *testing.T) { defer runGateway.stop(t) addr := waitForListenAddr(t, server) - conn := dialGatewayClient(t, addr) - defer func() { - require.NoError(t, conn.Close()) - }() + client := newEdgeClient(t, addr) - client := gatewayv1.NewEdgeGatewayClient(conn) err := subscribeEventsError(t, context.Background(), client, newValidSubscribeEventsRequest()) require.Error(t, err) - assert.Equal(t, codes.Unavailable, status.Code(err)) - assert.Equal(t, "response signer is unavailable", status.Convert(err).Message()) + assert.Equal(t, connect.CodeUnavailable, connect.CodeOf(err)) + assert.Equal(t, "response signer is unavailable", connectErrorMessage(t, err)) } func TestServerLifecycle(t *testing.T) { @@ -248,21 +223,23 @@ func TestServerLifecycle(t *testing.T) { server, runGateway := newTestGateway(t, ServerDependencies{}) addr := waitForListenAddr(t, server) - conn := dialGatewayClient(t, addr) - require.NoError(t, conn.Close()) + // Probe the listener before shutdown so we know it accepted at + // least one TCP connection. + probe, err := net.DialTimeout("tcp", addr, time.Second) + require.NoError(t, err) + require.NoError(t, probe.Close()) runGateway.stop(t) - ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + // After shutdown the listener must refuse new TCP connections. + dialCtx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) defer cancel() - - _, err := grpc.DialContext( - ctx, - addr, - grpc.WithTransportCredentials(insecure.NewCredentials()), - grpc.WithBlock(), - ) - require.Error(t, err) + dialer := &net.Dialer{} + closedConn, err := dialer.DialContext(dialCtx, "tcp", addr) + if err == nil { + _ = closedConn.Close() + t.Fatalf("expected dial to %s to fail after shutdown", addr) + } } type runningGateway struct { @@ -341,19 +318,36 @@ func waitForListenAddr(t *testing.T, server *Server) string { return addr } -func dialGatewayClient(t *testing.T, addr string) *grpc.ClientConn { +// newEdgeClient returns a Connect client speaking HTTP/2 cleartext to the +// authenticated edge listener. AllowHTTP forces the client to issue plain +// HTTP/2 requests (h2c) instead of attempting TLS, which the gateway's +// in-process test bootstrap does not configure. +func newEdgeClient(t *testing.T, addr string) gatewayv1connect.EdgeGatewayClient { t.Helper() - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - defer cancel() - - conn, err := grpc.DialContext( - ctx, - addr, - grpc.WithTransportCredentials(insecure.NewCredentials()), - grpc.WithBlock(), - ) - require.NoError(t, err) - - return conn + httpClient := &http.Client{ + Transport: &http2.Transport{ + AllowHTTP: true, + DialTLSContext: func(ctx context.Context, network, target string, _ *tls.Config) (net.Conn, error) { + return (&net.Dialer{}).DialContext(ctx, network, target) + }, + }, + } + return gatewayv1connect.NewEdgeGatewayClient(httpClient, "http://"+addr) +} + +// connectErrorMessage extracts the *connect.Error message from err. It +// fails the test if err is not a *connect.Error so the caller's expected +// message comparison doesn't accidentally match the wrapped Go error +// string instead of the protocol-level message. +func connectErrorMessage(t require.TestingT, err error) string { + if helper, ok := t.(interface{ Helper() }); ok { + helper.Helper() + } + + var connectErr *connect.Error + if !errors.As(err, &connectErr) { + require.FailNowf(t, "expected *connect.Error", "got %T: %v", err, err) + } + return connectErr.Message() } diff --git a/gateway/internal/grpcapi/session_lookup.go b/gateway/internal/grpcapi/session_lookup.go index 3bc077e..64c7ed1 100644 --- a/gateway/internal/grpcapi/session_lookup.go +++ b/gateway/internal/grpcapi/session_lookup.go @@ -123,7 +123,7 @@ func (unavailableSessionCache) Lookup(context.Context, string) (session.Record, return session.Record{}, errors.New("session cache is unavailable") } -func (unavailableSessionCache) MarkRevoked(string) {} +func (unavailableSessionCache) MarkRevoked(string) {} func (unavailableSessionCache) MarkAllRevokedForUser(string) {} var _ gatewayv1.EdgeGatewayServer = sessionLookupService{} diff --git a/gateway/internal/grpcapi/session_lookup_integration_test.go b/gateway/internal/grpcapi/session_lookup_integration_test.go index 21ff7b3..8f11452 100644 --- a/gateway/internal/grpcapi/session_lookup_integration_test.go +++ b/gateway/internal/grpcapi/session_lookup_integration_test.go @@ -3,17 +3,15 @@ package grpcapi import ( "context" "errors" - "io" "testing" "galaxy/gateway/internal/session" gatewayv1 "galaxy/gateway/proto/galaxy/gateway/v1" + "connectrpc.com/connect" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" ) func TestExecuteCommandRejectsUnknownSession(t *testing.T) { @@ -31,16 +29,11 @@ func TestExecuteCommandRejectsUnknownSession(t *testing.T) { defer runGateway.stop(t) addr := waitForListenAddr(t, server) - conn := dialGatewayClient(t, addr) - defer func() { - require.NoError(t, conn.Close()) - }() - - client := gatewayv1.NewEdgeGatewayClient(conn) - _, err := client.ExecuteCommand(context.Background(), newValidExecuteCommandRequest()) + client := newEdgeClient(t, addr) + _, err := client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequest())) require.Error(t, err) - assert.Equal(t, codes.Unauthenticated, status.Code(err)) - assert.Equal(t, "unknown device session", status.Convert(err).Message()) + assert.Equal(t, connect.CodeUnauthenticated, connect.CodeOf(err)) + assert.Equal(t, "unknown device session", connectErrorMessage(t, err)) assert.Zero(t, delegate.executeCalls) } @@ -59,16 +52,11 @@ func TestSubscribeEventsRejectsUnknownSession(t *testing.T) { defer runGateway.stop(t) addr := waitForListenAddr(t, server) - conn := dialGatewayClient(t, addr) - defer func() { - require.NoError(t, conn.Close()) - }() - - client := gatewayv1.NewEdgeGatewayClient(conn) + client := newEdgeClient(t, addr) err := subscribeEventsError(t, context.Background(), client, newValidSubscribeEventsRequest()) require.Error(t, err) - assert.Equal(t, codes.Unauthenticated, status.Code(err)) - assert.Equal(t, "unknown device session", status.Convert(err).Message()) + assert.Equal(t, connect.CodeUnauthenticated, connect.CodeOf(err)) + assert.Equal(t, "unknown device session", connectErrorMessage(t, err)) assert.Zero(t, delegate.subscribeCalls) } @@ -83,16 +71,11 @@ func TestExecuteCommandRejectsRevokedSession(t *testing.T) { defer runGateway.stop(t) addr := waitForListenAddr(t, server) - conn := dialGatewayClient(t, addr) - defer func() { - require.NoError(t, conn.Close()) - }() - - client := gatewayv1.NewEdgeGatewayClient(conn) - _, err := client.ExecuteCommand(context.Background(), newValidExecuteCommandRequest()) + client := newEdgeClient(t, addr) + _, err := client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequest())) require.Error(t, err) - assert.Equal(t, codes.FailedPrecondition, status.Code(err)) - assert.Equal(t, "device session is revoked", status.Convert(err).Message()) + assert.Equal(t, connect.CodeFailedPrecondition, connect.CodeOf(err)) + assert.Equal(t, "device session is revoked", connectErrorMessage(t, err)) assert.Zero(t, delegate.executeCalls) } @@ -107,16 +90,11 @@ func TestSubscribeEventsRejectsRevokedSession(t *testing.T) { defer runGateway.stop(t) addr := waitForListenAddr(t, server) - conn := dialGatewayClient(t, addr) - defer func() { - require.NoError(t, conn.Close()) - }() - - client := gatewayv1.NewEdgeGatewayClient(conn) + client := newEdgeClient(t, addr) err := subscribeEventsError(t, context.Background(), client, newValidSubscribeEventsRequest()) require.Error(t, err) - assert.Equal(t, codes.FailedPrecondition, status.Code(err)) - assert.Equal(t, "device session is revoked", status.Convert(err).Message()) + assert.Equal(t, connect.CodeFailedPrecondition, connect.CodeOf(err)) + assert.Equal(t, "device session is revoked", connectErrorMessage(t, err)) assert.Zero(t, delegate.subscribeCalls) } @@ -135,16 +113,11 @@ func TestExecuteCommandRejectsSessionCacheUnavailable(t *testing.T) { defer runGateway.stop(t) addr := waitForListenAddr(t, server) - conn := dialGatewayClient(t, addr) - defer func() { - require.NoError(t, conn.Close()) - }() - - client := gatewayv1.NewEdgeGatewayClient(conn) - _, err := client.ExecuteCommand(context.Background(), newValidExecuteCommandRequest()) + client := newEdgeClient(t, addr) + _, err := client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequest())) require.Error(t, err) - assert.Equal(t, codes.Unavailable, status.Code(err)) - assert.Equal(t, "session cache is unavailable", status.Convert(err).Message()) + assert.Equal(t, connect.CodeUnavailable, connect.CodeOf(err)) + assert.Equal(t, "session cache is unavailable", connectErrorMessage(t, err)) assert.Zero(t, delegate.executeCalls) } @@ -163,16 +136,11 @@ func TestSubscribeEventsRejectsSessionCacheUnavailable(t *testing.T) { defer runGateway.stop(t) addr := waitForListenAddr(t, server) - conn := dialGatewayClient(t, addr) - defer func() { - require.NoError(t, conn.Close()) - }() - - client := gatewayv1.NewEdgeGatewayClient(conn) + client := newEdgeClient(t, addr) err := subscribeEventsError(t, context.Background(), client, newValidSubscribeEventsRequest()) require.Error(t, err) - assert.Equal(t, codes.Unavailable, status.Code(err)) - assert.Equal(t, "session cache is unavailable", status.Convert(err).Message()) + assert.Equal(t, connect.CodeUnavailable, connect.CodeOf(err)) + assert.Equal(t, "session cache is unavailable", connectErrorMessage(t, err)) assert.Zero(t, delegate.subscribeCalls) } @@ -196,15 +164,10 @@ func TestExecuteCommandAttachesResolvedSession(t *testing.T) { defer runGateway.stop(t) addr := waitForListenAddr(t, server) - conn := dialGatewayClient(t, addr) - defer func() { - require.NoError(t, conn.Close()) - }() - - client := gatewayv1.NewEdgeGatewayClient(conn) - response, err := client.ExecuteCommand(context.Background(), newValidExecuteCommandRequest()) + client := newEdgeClient(t, addr) + response, err := client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequest())) require.NoError(t, err) - assert.Equal(t, "request-123", response.GetRequestId()) + assert.Equal(t, "request-123", response.Msg.GetRequestId()) } func TestSubscribeEventsAttachesResolvedSession(t *testing.T) { @@ -227,20 +190,15 @@ func TestSubscribeEventsAttachesResolvedSession(t *testing.T) { defer runGateway.stop(t) addr := waitForListenAddr(t, server) - conn := dialGatewayClient(t, addr) - defer func() { - require.NoError(t, conn.Close()) - }() - - client := gatewayv1.NewEdgeGatewayClient(conn) - stream, err := client.SubscribeEvents(context.Background(), newValidSubscribeEventsRequest()) + client := newEdgeClient(t, addr) + stream, err := client.SubscribeEvents(context.Background(), connect.NewRequest(newValidSubscribeEventsRequest())) require.NoError(t, err) event := recvBootstrapEvent(t, stream) assertServerTimeBootstrapEvent(t, event, newTestResponseSignerPublicKey(), "request-123", "trace-123", testCurrentTime.UnixMilli()) - _, err = stream.Recv() - require.ErrorIs(t, err, io.EOF) + require.False(t, stream.Receive()) + require.NoError(t, stream.Err()) } func TestSubscribeEventsAttachesAuthenticatedStreamBinding(t *testing.T) { @@ -269,20 +227,15 @@ func TestSubscribeEventsAttachesAuthenticatedStreamBinding(t *testing.T) { defer runGateway.stop(t) addr := waitForListenAddr(t, server) - conn := dialGatewayClient(t, addr) - defer func() { - require.NoError(t, conn.Close()) - }() - - client := gatewayv1.NewEdgeGatewayClient(conn) - stream, err := client.SubscribeEvents(context.Background(), newValidSubscribeEventsRequest()) + client := newEdgeClient(t, addr) + stream, err := client.SubscribeEvents(context.Background(), connect.NewRequest(newValidSubscribeEventsRequest())) require.NoError(t, err) event := recvBootstrapEvent(t, stream) assertServerTimeBootstrapEvent(t, event, newTestResponseSignerPublicKey(), "request-123", "trace-123", testCurrentTime.UnixMilli()) - _, err = stream.Recv() - require.ErrorIs(t, err, io.EOF) + require.False(t, stream.Receive()) + require.NoError(t, stream.Err()) } type staticSessionCache struct { @@ -293,5 +246,5 @@ func (c staticSessionCache) Lookup(ctx context.Context, deviceSessionID string) return c.lookupFunc(ctx, deviceSessionID) } -func (staticSessionCache) MarkRevoked(string) {} +func (staticSessionCache) MarkRevoked(string) {} func (staticSessionCache) MarkAllRevokedForUser(string) {} diff --git a/gateway/internal/grpcapi/signature_integration_test.go b/gateway/internal/grpcapi/signature_integration_test.go index 3b36911..4dce842 100644 --- a/gateway/internal/grpcapi/signature_integration_test.go +++ b/gateway/internal/grpcapi/signature_integration_test.go @@ -5,12 +5,10 @@ import ( "testing" "galaxy/gateway/internal/session" - gatewayv1 "galaxy/gateway/proto/galaxy/gateway/v1" + "connectrpc.com/connect" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" ) func TestExecuteCommandRejectsInvalidSignature(t *testing.T) { @@ -24,19 +22,15 @@ func TestExecuteCommandRejectsInvalidSignature(t *testing.T) { defer runGateway.stop(t) addr := waitForListenAddr(t, server) - conn := dialGatewayClient(t, addr) - defer func() { - require.NoError(t, conn.Close()) - }() + client := newEdgeClient(t, addr) req := newValidExecuteCommandRequest() req.Signature[0] ^= 0xff - client := gatewayv1.NewEdgeGatewayClient(conn) - _, err := client.ExecuteCommand(context.Background(), req) + _, err := client.ExecuteCommand(context.Background(), connect.NewRequest(req)) require.Error(t, err) - assert.Equal(t, codes.Unauthenticated, status.Code(err)) - assert.Equal(t, "invalid request signature", status.Convert(err).Message()) + assert.Equal(t, connect.CodeUnauthenticated, connect.CodeOf(err)) + assert.Equal(t, "invalid request signature", connectErrorMessage(t, err)) assert.Zero(t, delegate.executeCalls) } @@ -57,16 +51,11 @@ func TestExecuteCommandRejectsWrongKey(t *testing.T) { defer runGateway.stop(t) addr := waitForListenAddr(t, server) - conn := dialGatewayClient(t, addr) - defer func() { - require.NoError(t, conn.Close()) - }() - - client := gatewayv1.NewEdgeGatewayClient(conn) - _, err := client.ExecuteCommand(context.Background(), newValidExecuteCommandRequest()) + client := newEdgeClient(t, addr) + _, err := client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequest())) require.Error(t, err) - assert.Equal(t, codes.Unauthenticated, status.Code(err)) - assert.Equal(t, "invalid request signature", status.Convert(err).Message()) + assert.Equal(t, connect.CodeUnauthenticated, connect.CodeOf(err)) + assert.Equal(t, "invalid request signature", connectErrorMessage(t, err)) assert.Zero(t, delegate.executeCalls) } @@ -87,16 +76,11 @@ func TestExecuteCommandRejectsInvalidCachedPublicKey(t *testing.T) { defer runGateway.stop(t) addr := waitForListenAddr(t, server) - conn := dialGatewayClient(t, addr) - defer func() { - require.NoError(t, conn.Close()) - }() - - client := gatewayv1.NewEdgeGatewayClient(conn) - _, err := client.ExecuteCommand(context.Background(), newValidExecuteCommandRequest()) + client := newEdgeClient(t, addr) + _, err := client.ExecuteCommand(context.Background(), connect.NewRequest(newValidExecuteCommandRequest())) require.Error(t, err) - assert.Equal(t, codes.Unavailable, status.Code(err)) - assert.Equal(t, "session cache is unavailable", status.Convert(err).Message()) + assert.Equal(t, connect.CodeUnavailable, connect.CodeOf(err)) + assert.Equal(t, "session cache is unavailable", connectErrorMessage(t, err)) assert.Zero(t, delegate.executeCalls) } @@ -111,19 +95,15 @@ func TestSubscribeEventsRejectsInvalidSignature(t *testing.T) { defer runGateway.stop(t) addr := waitForListenAddr(t, server) - conn := dialGatewayClient(t, addr) - defer func() { - require.NoError(t, conn.Close()) - }() + client := newEdgeClient(t, addr) req := newValidSubscribeEventsRequest() req.Signature[0] ^= 0xff - client := gatewayv1.NewEdgeGatewayClient(conn) err := subscribeEventsError(t, context.Background(), client, req) require.Error(t, err) - assert.Equal(t, codes.Unauthenticated, status.Code(err)) - assert.Equal(t, "invalid request signature", status.Convert(err).Message()) + assert.Equal(t, connect.CodeUnauthenticated, connect.CodeOf(err)) + assert.Equal(t, "invalid request signature", connectErrorMessage(t, err)) assert.Zero(t, delegate.subscribeCalls) } @@ -144,16 +124,11 @@ func TestSubscribeEventsRejectsWrongKey(t *testing.T) { defer runGateway.stop(t) addr := waitForListenAddr(t, server) - conn := dialGatewayClient(t, addr) - defer func() { - require.NoError(t, conn.Close()) - }() - - client := gatewayv1.NewEdgeGatewayClient(conn) + client := newEdgeClient(t, addr) err := subscribeEventsError(t, context.Background(), client, newValidSubscribeEventsRequest()) require.Error(t, err) - assert.Equal(t, codes.Unauthenticated, status.Code(err)) - assert.Equal(t, "invalid request signature", status.Convert(err).Message()) + assert.Equal(t, connect.CodeUnauthenticated, connect.CodeOf(err)) + assert.Equal(t, "invalid request signature", connectErrorMessage(t, err)) assert.Zero(t, delegate.subscribeCalls) } @@ -174,15 +149,10 @@ func TestSubscribeEventsRejectsInvalidCachedPublicKey(t *testing.T) { defer runGateway.stop(t) addr := waitForListenAddr(t, server) - conn := dialGatewayClient(t, addr) - defer func() { - require.NoError(t, conn.Close()) - }() - - client := gatewayv1.NewEdgeGatewayClient(conn) + client := newEdgeClient(t, addr) err := subscribeEventsError(t, context.Background(), client, newValidSubscribeEventsRequest()) require.Error(t, err) - assert.Equal(t, codes.Unavailable, status.Code(err)) - assert.Equal(t, "session cache is unavailable", status.Convert(err).Message()) + assert.Equal(t, connect.CodeUnavailable, connect.CodeOf(err)) + assert.Equal(t, "session cache is unavailable", connectErrorMessage(t, err)) assert.Zero(t, delegate.subscribeCalls) } diff --git a/gateway/internal/grpcapi/test_fixtures_test.go b/gateway/internal/grpcapi/test_fixtures_test.go index 47dc57c..3512e0e 100644 --- a/gateway/internal/grpcapi/test_fixtures_test.go +++ b/gateway/internal/grpcapi/test_fixtures_test.go @@ -7,19 +7,21 @@ import ( "crypto/x509" "encoding/base64" "encoding/pem" + "errors" "time" "galaxy/gateway/authn" "galaxy/gateway/internal/downstream" "galaxy/gateway/internal/session" gatewayv1 "galaxy/gateway/proto/galaxy/gateway/v1" + "galaxy/gateway/proto/galaxy/gateway/v1/gatewayv1connect" gatewayfbs "galaxy/schema/fbs/gateway" + "connectrpc.com/connect" flatbuffers "github.com/google/flatbuffers/go" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "google.golang.org/grpc" ) var ( @@ -170,28 +172,37 @@ func (c fixedClock) Now() time.Time { func recvBootstrapEvent(t interface { require.TestingT Helper() -}, stream grpc.ServerStreamingClient[gatewayv1.GatewayEvent]) *gatewayv1.GatewayEvent { +}, stream *connect.ServerStreamForClient[gatewayv1.GatewayEvent]) *gatewayv1.GatewayEvent { t.Helper() - event, err := stream.Recv() - require.NoError(t, err) + if !stream.Receive() { + err := stream.Err() + if err == nil { + err = errors.New("stream closed before bootstrap event") + } + require.NoError(t, err) + } - return event + return stream.Msg() } func subscribeEventsError(t interface { require.TestingT Helper() -}, ctx context.Context, client gatewayv1.EdgeGatewayClient, req *gatewayv1.SubscribeEventsRequest) error { +}, ctx context.Context, client gatewayv1connect.EdgeGatewayClient, req *gatewayv1.SubscribeEventsRequest) error { t.Helper() - stream, err := client.SubscribeEvents(ctx, req) + stream, err := client.SubscribeEvents(ctx, connect.NewRequest(req)) if err != nil { return err } + defer func() { _ = stream.Close() }() - _, err = stream.Recv() - return err + if stream.Receive() { + return nil + } + + return stream.Err() } func assertServerTimeBootstrapEvent(t interface { diff --git a/gateway/proto/galaxy/gateway/v1/gatewayv1connect/edge_gateway.connect.go b/gateway/proto/galaxy/gateway/v1/gatewayv1connect/edge_gateway.connect.go new file mode 100644 index 0000000..5775c9a --- /dev/null +++ b/gateway/proto/galaxy/gateway/v1/gatewayv1connect/edge_gateway.connect.go @@ -0,0 +1,138 @@ +// Code generated by protoc-gen-connect-go. DO NOT EDIT. +// +// Source: galaxy/gateway/v1/edge_gateway.proto + +package gatewayv1connect + +import ( + connect "connectrpc.com/connect" + context "context" + errors "errors" + v1 "galaxy/gateway/proto/galaxy/gateway/v1" + http "net/http" + strings "strings" +) + +// This is a compile-time assertion to ensure that this generated file and the connect package are +// compatible. If you get a compiler error that this constant is not defined, this code was +// generated with a version of connect newer than the one compiled into your binary. You can fix the +// problem by either regenerating this code with an older version of connect or updating the connect +// version compiled into your binary. +const _ = connect.IsAtLeastVersion1_13_0 + +const ( + // EdgeGatewayName is the fully-qualified name of the EdgeGateway service. + EdgeGatewayName = "galaxy.gateway.v1.EdgeGateway" +) + +// These constants are the fully-qualified names of the RPCs defined in this package. They're +// exposed at runtime as Spec.Procedure and as the final two segments of the HTTP route. +// +// Note that these are different from the fully-qualified method names used by +// google.golang.org/protobuf/reflect/protoreflect. To convert from these constants to +// reflection-formatted method names, remove the leading slash and convert the remaining slash to a +// period. +const ( + // EdgeGatewayExecuteCommandProcedure is the fully-qualified name of the EdgeGateway's + // ExecuteCommand RPC. + EdgeGatewayExecuteCommandProcedure = "/galaxy.gateway.v1.EdgeGateway/ExecuteCommand" + // EdgeGatewaySubscribeEventsProcedure is the fully-qualified name of the EdgeGateway's + // SubscribeEvents RPC. + EdgeGatewaySubscribeEventsProcedure = "/galaxy.gateway.v1.EdgeGateway/SubscribeEvents" +) + +// EdgeGatewayClient is a client for the galaxy.gateway.v1.EdgeGateway service. +type EdgeGatewayClient interface { + ExecuteCommand(context.Context, *connect.Request[v1.ExecuteCommandRequest]) (*connect.Response[v1.ExecuteCommandResponse], error) + SubscribeEvents(context.Context, *connect.Request[v1.SubscribeEventsRequest]) (*connect.ServerStreamForClient[v1.GatewayEvent], error) +} + +// NewEdgeGatewayClient constructs a client for the galaxy.gateway.v1.EdgeGateway service. By +// default, it uses the Connect protocol with the binary Protobuf Codec, asks for gzipped responses, +// and sends uncompressed requests. To use the gRPC or gRPC-Web protocols, supply the +// connect.WithGRPC() or connect.WithGRPCWeb() options. +// +// The URL supplied here should be the base URL for the Connect or gRPC server (for example, +// http://api.acme.com or https://acme.com/grpc). +func NewEdgeGatewayClient(httpClient connect.HTTPClient, baseURL string, opts ...connect.ClientOption) EdgeGatewayClient { + baseURL = strings.TrimRight(baseURL, "/") + edgeGatewayMethods := v1.File_galaxy_gateway_v1_edge_gateway_proto.Services().ByName("EdgeGateway").Methods() + return &edgeGatewayClient{ + executeCommand: connect.NewClient[v1.ExecuteCommandRequest, v1.ExecuteCommandResponse]( + httpClient, + baseURL+EdgeGatewayExecuteCommandProcedure, + connect.WithSchema(edgeGatewayMethods.ByName("ExecuteCommand")), + connect.WithClientOptions(opts...), + ), + subscribeEvents: connect.NewClient[v1.SubscribeEventsRequest, v1.GatewayEvent]( + httpClient, + baseURL+EdgeGatewaySubscribeEventsProcedure, + connect.WithSchema(edgeGatewayMethods.ByName("SubscribeEvents")), + connect.WithClientOptions(opts...), + ), + } +} + +// edgeGatewayClient implements EdgeGatewayClient. +type edgeGatewayClient struct { + executeCommand *connect.Client[v1.ExecuteCommandRequest, v1.ExecuteCommandResponse] + subscribeEvents *connect.Client[v1.SubscribeEventsRequest, v1.GatewayEvent] +} + +// ExecuteCommand calls galaxy.gateway.v1.EdgeGateway.ExecuteCommand. +func (c *edgeGatewayClient) ExecuteCommand(ctx context.Context, req *connect.Request[v1.ExecuteCommandRequest]) (*connect.Response[v1.ExecuteCommandResponse], error) { + return c.executeCommand.CallUnary(ctx, req) +} + +// SubscribeEvents calls galaxy.gateway.v1.EdgeGateway.SubscribeEvents. +func (c *edgeGatewayClient) SubscribeEvents(ctx context.Context, req *connect.Request[v1.SubscribeEventsRequest]) (*connect.ServerStreamForClient[v1.GatewayEvent], error) { + return c.subscribeEvents.CallServerStream(ctx, req) +} + +// EdgeGatewayHandler is an implementation of the galaxy.gateway.v1.EdgeGateway service. +type EdgeGatewayHandler interface { + ExecuteCommand(context.Context, *connect.Request[v1.ExecuteCommandRequest]) (*connect.Response[v1.ExecuteCommandResponse], error) + SubscribeEvents(context.Context, *connect.Request[v1.SubscribeEventsRequest], *connect.ServerStream[v1.GatewayEvent]) error +} + +// NewEdgeGatewayHandler builds an HTTP handler from the service implementation. It returns the path +// on which to mount the handler and the handler itself. +// +// By default, handlers support the Connect, gRPC, and gRPC-Web protocols with the binary Protobuf +// and JSON codecs. They also support gzip compression. +func NewEdgeGatewayHandler(svc EdgeGatewayHandler, opts ...connect.HandlerOption) (string, http.Handler) { + edgeGatewayMethods := v1.File_galaxy_gateway_v1_edge_gateway_proto.Services().ByName("EdgeGateway").Methods() + edgeGatewayExecuteCommandHandler := connect.NewUnaryHandler( + EdgeGatewayExecuteCommandProcedure, + svc.ExecuteCommand, + connect.WithSchema(edgeGatewayMethods.ByName("ExecuteCommand")), + connect.WithHandlerOptions(opts...), + ) + edgeGatewaySubscribeEventsHandler := connect.NewServerStreamHandler( + EdgeGatewaySubscribeEventsProcedure, + svc.SubscribeEvents, + connect.WithSchema(edgeGatewayMethods.ByName("SubscribeEvents")), + connect.WithHandlerOptions(opts...), + ) + return "/galaxy.gateway.v1.EdgeGateway/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case EdgeGatewayExecuteCommandProcedure: + edgeGatewayExecuteCommandHandler.ServeHTTP(w, r) + case EdgeGatewaySubscribeEventsProcedure: + edgeGatewaySubscribeEventsHandler.ServeHTTP(w, r) + default: + http.NotFound(w, r) + } + }) +} + +// UnimplementedEdgeGatewayHandler returns CodeUnimplemented from all methods. +type UnimplementedEdgeGatewayHandler struct{} + +func (UnimplementedEdgeGatewayHandler) ExecuteCommand(context.Context, *connect.Request[v1.ExecuteCommandRequest]) (*connect.Response[v1.ExecuteCommandResponse], error) { + return nil, connect.NewError(connect.CodeUnimplemented, errors.New("galaxy.gateway.v1.EdgeGateway.ExecuteCommand is not implemented")) +} + +func (UnimplementedEdgeGatewayHandler) SubscribeEvents(context.Context, *connect.Request[v1.SubscribeEventsRequest], *connect.ServerStream[v1.GatewayEvent]) error { + return connect.NewError(connect.CodeUnimplemented, errors.New("galaxy.gateway.v1.EdgeGateway.SubscribeEvents is not implemented")) +} diff --git a/integration/go.mod b/integration/go.mod index de97c38..5a806b9 100644 --- a/integration/go.mod +++ b/integration/go.mod @@ -15,6 +15,7 @@ require ( require ( buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.11-20260209202127-80ab13bee0bf.1 // indirect + connectrpc.com/connect v1.19.2 // indirect dario.cat/mergo v1.0.2 // indirect galaxy/util v0.0.0-00010101000000-000000000000 // indirect github.com/Azure/go-ansiterm v0.0.0-20250102033503-faa5f7b0171c // indirect diff --git a/integration/go.sum b/integration/go.sum index 1db685c..1e861b7 100644 --- a/integration/go.sum +++ b/integration/go.sum @@ -1,5 +1,7 @@ buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.11-20260209202127-80ab13bee0bf.1 h1:PMmTMyvHScV9Mn8wc6ASge9uRcHy0jtqPd+fM35LmsQ= buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.11-20260209202127-80ab13bee0bf.1/go.mod h1:tvtbpgaVXZX4g6Pn+AnzFycuRK3MOz5HJfEGeEllXYM= +connectrpc.com/connect v1.19.2 h1:McQ83FGdzL+t60peksi0gXC7MQ/iLKgLduAnThbM0mo= +connectrpc.com/connect v1.19.2/go.mod h1:tN20fjdGlewnSFeZxLKb0xwIZ6ozc3OQs2hTXy4du9w= dario.cat/mergo v1.0.2 h1:85+piFYR1tMbRrLcDwR18y4UKJ3aH1Tbzi24VRW1TK8= dario.cat/mergo v1.0.2/go.mod h1:E/hbnu0NxMFBjpMIE34DRGLWqDy0g5FuKDhCb31ngxA= github.com/AdaLogics/go-fuzz-headers v0.0.0-20240806141605-e8a1dd7889d6 h1:He8afgbRMd7mFxO99hRNu+6tazq8nFF9lIwo9JFroBk= diff --git a/integration/testenv/grpc_client.go b/integration/testenv/connect_client.go similarity index 68% rename from integration/testenv/grpc_client.go rename to integration/testenv/connect_client.go index 9723e30..0d3ad81 100644 --- a/integration/testenv/grpc_client.go +++ b/integration/testenv/connect_client.go @@ -5,30 +5,34 @@ import ( "crypto/ed25519" "crypto/rand" "crypto/sha256" + "crypto/tls" "encoding/base64" "errors" "fmt" + "net" + "net/http" "sync/atomic" "time" gatewayauthn "galaxy/gateway/authn" gatewayv1 "galaxy/gateway/proto/galaxy/gateway/v1" + "galaxy/gateway/proto/galaxy/gateway/v1/gatewayv1connect" + "connectrpc.com/connect" "github.com/google/uuid" - "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/credentials/insecure" - "google.golang.org/grpc/status" + "golang.org/x/net/http2" ) -// SignedGatewayClient drives the authenticated gRPC surface of the +// SignedGatewayClient drives the authenticated edge surface of the // gateway from tests. It signs ExecuteCommand envelopes with the // session's Ed25519 private key, verifies response signatures with // the gateway's response-signer public key, and exposes a -// SubscribeEvents helper. +// SubscribeEvents helper. The client speaks Connect over HTTP/2 +// cleartext (h2c) — the gateway listener supports that natively +// alongside gRPC and gRPC-Web on the same port. type SignedGatewayClient struct { - conn *grpc.ClientConn - edge gatewayv1.EdgeGatewayClient + httpClient *http.Client + edge gatewayv1connect.EdgeGatewayClient deviceSID string privateKey ed25519.PrivateKey respPub ed25519.PublicKey @@ -55,25 +59,42 @@ func EncodePublicKey(pub ed25519.PublicKey) string { return base64.StdEncoding.EncodeToString(pub) } -// DialGateway opens a gRPC connection to gateway's authenticated -// surface and prepares a signing client bound to deviceSID. -func DialGateway(ctx context.Context, addr string, deviceSID string, privateKey ed25519.PrivateKey, respPub ed25519.PublicKey) (*SignedGatewayClient, error) { - conn, err := grpc.NewClient(addr, grpc.WithTransportCredentials(insecure.NewCredentials())) - if err != nil { - return nil, fmt.Errorf("dial gateway: %w", err) +// DialGateway opens a Connect (HTTP/2 cleartext) client against the +// gateway's authenticated edge listener at addr ("host:port") and +// prepares a signing client bound to deviceSID. +func DialGateway(_ context.Context, addr string, deviceSID string, privateKey ed25519.PrivateKey, respPub ed25519.PublicKey) (*SignedGatewayClient, error) { + if addr == "" { + return nil, fmt.Errorf("dial gateway: empty addr") } + + httpClient := &http.Client{ + Transport: &http2.Transport{ + AllowHTTP: true, + DialTLSContext: func(ctx context.Context, network, target string, _ *tls.Config) (net.Conn, error) { + return (&net.Dialer{}).DialContext(ctx, network, target) + }, + }, + } + edge := gatewayv1connect.NewEdgeGatewayClient(httpClient, "http://"+addr) + return &SignedGatewayClient{ - conn: conn, - edge: gatewayv1.NewEdgeGatewayClient(conn), + httpClient: httpClient, + edge: edge, deviceSID: deviceSID, privateKey: privateKey, respPub: respPub, }, nil } -// Close releases the gRPC connection. +// Close releases idle HTTP/2 connections held by the underlying transport. +// The Connect client itself is stateless, so this is best-effort. func (c *SignedGatewayClient) Close() error { - return c.conn.Close() + if c.httpClient != nil { + if transport, ok := c.httpClient.Transport.(*http2.Transport); ok { + transport.CloseIdleConnections() + } + } + return nil } // ExecuteOptions tunes one ExecuteCommand call. The zero value @@ -81,11 +102,11 @@ func (c *SignedGatewayClient) Close() error { // need a fixed request_id (anti-replay) or a stale timestamp // (freshness window) override the relevant fields. type ExecuteOptions struct { - RequestID string - TimestampMS int64 - OverrideSignature []byte - OverridePayloadHash []byte - OverrideSessionID string + RequestID string + TimestampMS int64 + OverrideSignature []byte + OverridePayloadHash []byte + OverrideSessionID string OverrideProtocolVersion string } @@ -155,10 +176,11 @@ func (c *SignedGatewayClient) Execute(ctx context.Context, messageType string, p } atomic.AddUint64(&c.requestSeq, 1) - resp, err := c.edge.ExecuteCommand(ctx, req) + respWrap, err := c.edge.ExecuteCommand(ctx, connect.NewRequest(req)) if err != nil { return nil, err } + resp := respWrap.Msg respHash := sha256.Sum256(resp.GetPayloadBytes()) if string(respHash[:]) != string(resp.GetPayloadHash()) { @@ -202,7 +224,7 @@ func (c *SignedGatewayClient) SubscribeEvents(ctx context.Context, messageType s PayloadHash: emptyHash[:], })) - stream, err := c.edge.SubscribeEvents(ctx, &gatewayv1.SubscribeEventsRequest{ + stream, err := c.edge.SubscribeEvents(ctx, connect.NewRequest(&gatewayv1.SubscribeEventsRequest{ ProtocolVersion: protocolVersion, DeviceSessionId: c.deviceSID, MessageType: messageType, @@ -210,7 +232,7 @@ func (c *SignedGatewayClient) SubscribeEvents(ctx context.Context, messageType s RequestId: requestID, PayloadHash: emptyHash[:], Signature: signature, - }) + })) if err != nil { return nil, nil, fmt.Errorf("open subscribe events: %w", err) } @@ -219,41 +241,39 @@ func (c *SignedGatewayClient) SubscribeEvents(ctx context.Context, messageType s errs := make(chan error, 1) go func() { defer close(events) - for { - ev, err := stream.Recv() - if err != nil { - errs <- err - return - } - events <- ev + defer func() { _ = stream.Close() }() + for stream.Receive() { + events <- stream.Msg() } + errs <- stream.Err() }() return events, errs, nil } -// IsUnauthenticated reports whether err is a gRPC Unauthenticated -// status, useful for negative-path edge tests. +// IsUnauthenticated reports whether err carries Connect's +// CodeUnauthenticated, useful for negative-path edge tests. func IsUnauthenticated(err error) bool { - return status.Code(err) == codes.Unauthenticated + return connect.CodeOf(err) == connect.CodeUnauthenticated } -// IsInvalidArgument reports whether err is a gRPC InvalidArgument -// status (used for malformed envelopes and unsupported +// IsInvalidArgument reports whether err carries Connect's +// CodeInvalidArgument (used for malformed envelopes and unsupported // protocol_version). func IsInvalidArgument(err error) bool { - return status.Code(err) == codes.InvalidArgument + return connect.CodeOf(err) == connect.CodeInvalidArgument } -// IsResourceExhausted reports whether err is a gRPC -// ResourceExhausted status (used for replay rejection). +// IsResourceExhausted reports whether err carries Connect's +// CodeResourceExhausted (used for replay rejection or rate-limit +// rejections). func IsResourceExhausted(err error) bool { - return status.Code(err) == codes.ResourceExhausted + return connect.CodeOf(err) == connect.CodeResourceExhausted } -// IsFailedPrecondition reports whether err is a gRPC -// FailedPrecondition status. The gateway uses this code for replay +// IsFailedPrecondition reports whether err carries Connect's +// CodeFailedPrecondition. The gateway uses this code for replay // rejections (the canonical envelope was authentic but the // `request_id` was already consumed). func IsFailedPrecondition(err error) bool { - return status.Code(err) == codes.FailedPrecondition + return connect.CodeOf(err) == connect.CodeFailedPrecondition } diff --git a/ui/PLAN.md b/ui/PLAN.md index ff10e88..a6cb15c 100644 --- a/ui/PLAN.md +++ b/ui/PLAN.md @@ -423,46 +423,88 @@ Targeted tests: - `gateway/authn` cross-module parity tests as listed under Artifacts. -## Phase 4. ConnectRPC Support in Gateway +## ~~Phase 4. ConnectRPC Support in Gateway~~ -Status: pending. Cross-service phase — work happens in `gateway/`, -not `ui/`. +Status: done. Cross-service phase — work happened in `gateway/` and +`integration/`, not `ui/`. -Goal: enable browsers to call the gateway's authenticated gRPC surface -through ConnectRPC, while preserving the existing native gRPC ingress -for desktop and mobile clients. +Goal: enable browsers to call the gateway's authenticated edge surface +through ConnectRPC, without keeping a separate gRPC server bootstrap +alive purely for test clients. -Artifacts: +Decision (taken with the project owner before implementation): the +existing native-gRPC `grpc.NewServer` bootstrap was replaced with a +single `connectrpc.com/connect` HTTP/h2c listener, since Connect-Go +natively serves the Connect, gRPC, and gRPC-Web protocols on the same +port. No production gRPC clients existed to preserve. The package +`gateway/internal/grpcapi` keeps its name for diff-size reasons and +documents the historical labelling in its package doc. -- ConnectRPC handler registered alongside existing gRPC server in - `gateway/internal/...` using `connectrpc.com/connect` -- `gateway/buf.gen.yaml` extended to generate Connect-Go code from - existing `.proto` files -- updated `gateway/README.md` and `gateway/openapi.yaml` reflecting - Connect ingress endpoints -- updated `docs/ARCHITECTURE.md` §15 if the deployment topology changes -- `gateway/internal/.../connect_server_test.go` integration test - exercising a unary Connect call and a server-streaming Connect call +Artifacts (delivered): -Dependencies: Phase 3 (canonical bytes are needed for the integration -fixtures used here). +- `gateway/buf.gen.yaml` extended with `buf.build/connectrpc/go`, + generating `gateway/proto/galaxy/gateway/v1/gatewayv1connect/edge_gateway.connect.go` +- `gateway/internal/grpcapi/server.go` rewritten around `http.Server` + + `h2c.NewHandler` + `gatewayv1connect.NewEdgeGatewayHandler` +- new `gateway/internal/grpcapi/connect_handler.go` adapting the + existing `gatewayv1.EdgeGatewayServer` decorator stack to the + Connect handler interface, including a `grpc.ServerStreamingServer` + shim around `*connect.ServerStream[GatewayEvent]` and a gRPC + `status.Error` → `*connect.Error` translation helper +- new `gateway/internal/grpcapi/connect_observability.go` Connect + interceptor recording the same metric and structured-log shape the + gRPC interceptors emitted; the rate-limit decorator now reads peer + IP from a context value populated by the interceptor instead of + `peer.FromContext` +- updated `gateway/README.md` (Transport Matrix + "Authenticated Edge + Surface"), `gateway/docs/runtime.md`, `gateway/docs/flows.md`, + `gateway/docs/runbook.md`, and `docs/ARCHITECTURE.md` §15 +- migrated tests: `gateway/internal/grpcapi/server_test.go`, + `test_fixtures_test.go`, and every `*_integration_test.go` in that + package now drive a `gatewayv1connect.EdgeGatewayClient` over + HTTP/2 cleartext loopback +- migrated harness: `integration/testenv/grpc_client.go` → + `connect_client.go`. `SignedGatewayClient` keeps the same public + shape (`Execute`, `SubscribeEvents`, `Close`) but speaks Connect + internally; `Is*` helpers now use `connect.CodeOf` -Acceptance criteria: +Dependencies: Phase 3 (canonical bytes are needed for the +fixture-level signing the migrated tests use). -- a curl-based unary Connect call from outside the gateway process - succeeds end-to-end against the authenticated surface; -- server-streaming `SubscribeEvents` works over Connect with at least - one delivered event; -- existing native gRPC clients continue to work unchanged; -- both gRPC and Connect handlers share the same upstream business code - (no duplication beyond the protocol layer). +Acceptance criteria (met): -Targeted tests: +- unary Connect calls from outside the gateway process succeed + end-to-end against the authenticated surface — verified by the + migrated `grpcapi/server_test.go` and `command_routing_integration_test.go` + scenarios driving the Connect client over loopback h2c; +- server-streaming `SubscribeEvents` works over Connect with the + signed `gateway.server_time` bootstrap event delivered first — + verified by `TestSubscribeEventsValidEnvelopeSendsBootstrapEventAndWaitsForCancellation`; +- the unified listener still natively accepts gRPC and gRPC-Web + framing for any future native client (Connect-Go's documented + multi-protocol support); +- the Connect handler shares the same upstream business code as the + unified listener — there is exactly one decorator stack + (`grpcapi.NewServer` → `s.service`). -- Connect unary integration test against a running gateway+backend; -- Connect streaming integration test asserting at least one push event - delivery; -- existing gateway test suite stays green. +Targeted tests (delivered): + +- Connect unary integration tests in `gateway/internal/grpcapi/` + exercising the full envelope → signature → freshness/replay → + rate-limit → routing pipeline through the new Connect transport; +- Connect streaming integration tests asserting bootstrap-event + delivery, replay rejection on stream open, and shutdown closure; +- the existing gateway test suite (`go test ./gateway/...`) stays + green. + +Decision deviation note: the planned standalone +`gateway/internal/grpcapi/connect_server_test.go` was not added as a +separate file because the migrated `*_test.go` files in the same +package already cover unary happy + streaming bootstrap + protocol- +version reject through the Connect client. A duplicate file would not +add coverage. Future contributors looking for "the Connect tests" can +read any file in `gateway/internal/grpcapi/` — they all use the +Connect client now. ## Phase 5. WASM Build, `WasmCore` Adapter, `GalaxyClient` Skeleton