Files
Ilia Denisov 118f7c17a2 phase 4: connectrpc on the gateway authenticated edge
Replace the native-gRPC server bootstrap with a single
`connectrpc.com/connect` HTTP/h2c listener. Connect-Go natively
serves Connect, gRPC, and gRPC-Web on the same port, so browsers can
now reach the authenticated surface without giving up the gRPC
framing native and desktop clients may use later. The decorator
stack (envelope → session → payload-hash → signature →
freshness/replay → rate-limit → routing/push) is reused unchanged
behind a small Connect → gRPC adapter and a `grpc.ServerStream`
shim around `*connect.ServerStream`.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-07 11:49:28 +02:00

144 lines
5.3 KiB
Go

package grpcapi
import (
"context"
"errors"
"fmt"
gatewayv1 "galaxy/gateway/proto/galaxy/gateway/v1"
"galaxy/gateway/proto/galaxy/gateway/v1/gatewayv1connect"
"connectrpc.com/connect"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
grpcstatus "google.golang.org/grpc/status"
)
// connectEdgeAdapter exposes the existing gRPC-shaped authenticated edge
// service decorator stack (envelope → session → payload-hash → signature →
// freshness/replay → rate-limit → routing/push) through the
// gatewayv1connect.EdgeGatewayHandler interface. It owns no logic of its
// own; the underlying decorator stack carries the full ingress contract
// unchanged.
type connectEdgeAdapter struct {
impl gatewayv1.EdgeGatewayServer
}
// newConnectEdgeAdapter wraps impl as a Connect handler.
func newConnectEdgeAdapter(impl gatewayv1.EdgeGatewayServer) gatewayv1connect.EdgeGatewayHandler {
return &connectEdgeAdapter{impl: impl}
}
// ExecuteCommand unwraps the typed Connect request, calls the underlying
// service, and wraps the typed response. gRPC `status.Error` values
// returned by the decorator stack are translated to *connect.Error so
// the Connect client receives the matching code and message.
func (a *connectEdgeAdapter) ExecuteCommand(ctx context.Context, req *connect.Request[gatewayv1.ExecuteCommandRequest]) (*connect.Response[gatewayv1.ExecuteCommandResponse], error) {
resp, err := a.impl.ExecuteCommand(ctx, req.Msg)
if err != nil {
return nil, translateGRPCStatusError(err)
}
return connect.NewResponse(resp), nil
}
// SubscribeEvents adapts the Connect server stream to the
// grpc.ServerStreamingServer contract expected by the existing decorator
// stack. The decorator stack only ever calls Send and Context on the
// stream; the remaining grpc.ServerStream surface is satisfied by no-op
// shims so the interface contract is met without panicking. Errors
// returned by the decorator stack are translated to *connect.Error.
func (a *connectEdgeAdapter) SubscribeEvents(ctx context.Context, req *connect.Request[gatewayv1.SubscribeEventsRequest], stream *connect.ServerStream[gatewayv1.GatewayEvent]) error {
wrapped := &connectEdgeStream{ctx: ctx, stream: stream}
if err := a.impl.SubscribeEvents(req.Msg, wrapped); err != nil {
return translateGRPCStatusError(err)
}
return nil
}
// translateGRPCStatusError maps gRPC status.Error values returned by the
// decorator stack into *connect.Error with the equivalent code and message.
// Errors that are already *connect.Error pass through unchanged. Errors
// without a recognisable gRPC status are returned verbatim — connect-go
// renders those as CodeUnknown.
func translateGRPCStatusError(err error) error {
if err == nil {
return nil
}
var connectErr *connect.Error
if errors.As(err, &connectErr) {
return err
}
grpcStatus, ok := grpcstatus.FromError(err)
if !ok {
return err
}
if grpcStatus.Code() == codes.OK {
return nil
}
return connect.NewError(connect.Code(grpcStatus.Code()), errors.New(grpcStatus.Message()))
}
// connectEdgeStream satisfies grpc.ServerStreamingServer[gatewayv1.GatewayEvent]
// on top of *connect.ServerStream. The decorator stack reads the request
// context and pushes outbound events through Send; the rest of the
// grpc.ServerStream surface is not exercised in the gateway, so the no-op
// implementations preserve the type contract without surprising behaviour.
type connectEdgeStream struct {
ctx context.Context
stream *connect.ServerStream[gatewayv1.GatewayEvent]
}
// Send forwards a typed gateway event through the underlying Connect server
// stream.
func (s *connectEdgeStream) Send(event *gatewayv1.GatewayEvent) error {
return s.stream.Send(event)
}
// Context returns the request context handed to the Connect handler.
func (s *connectEdgeStream) Context() context.Context {
return s.ctx
}
// SetHeader is part of grpc.ServerStream. The Connect transport exposes
// response headers through ResponseHeader() at construction time; metadata
// supplied here is intentionally ignored because no decorator in the
// gateway exercises the gRPC-only metadata path.
func (s *connectEdgeStream) SetHeader(metadata.MD) error {
return nil
}
// SendHeader is part of grpc.ServerStream. Connect-served streams flush
// headers automatically on the first Send; manual header dispatch is not
// modelled.
func (s *connectEdgeStream) SendHeader(metadata.MD) error {
return nil
}
// SetTrailer is part of grpc.ServerStream. Trailer metadata has no
// corresponding Connect concept on server-streaming responses.
func (s *connectEdgeStream) SetTrailer(metadata.MD) {}
// SendMsg is part of grpc.ServerStream. The decorator stack never calls
// SendMsg directly; if a future caller does, the typed Send path is used
// when the message is a GatewayEvent.
func (s *connectEdgeStream) SendMsg(m any) error {
event, ok := m.(*gatewayv1.GatewayEvent)
if !ok {
return fmt.Errorf("connectEdgeStream.SendMsg: unsupported message type %T", m)
}
return s.stream.Send(event)
}
// RecvMsg is part of grpc.ServerStream. Server-streaming server handlers
// have no client messages to receive after the initial request, so this
// method is intentionally an error path.
func (s *connectEdgeStream) RecvMsg(any) error {
return errors.New("connectEdgeStream.RecvMsg: server-streaming has no client messages")
}