package grpcapi import ( "context" "errors" "fmt" gatewayv1 "galaxy/gateway/proto/galaxy/gateway/v1" "galaxy/gateway/proto/galaxy/gateway/v1/gatewayv1connect" "connectrpc.com/connect" "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" grpcstatus "google.golang.org/grpc/status" ) // connectEdgeAdapter exposes the existing gRPC-shaped authenticated edge // service decorator stack (envelope → session → payload-hash → signature → // freshness/replay → rate-limit → routing/push) through the // gatewayv1connect.EdgeGatewayHandler interface. It owns no logic of its // own; the underlying decorator stack carries the full ingress contract // unchanged. type connectEdgeAdapter struct { impl gatewayv1.EdgeGatewayServer } // newConnectEdgeAdapter wraps impl as a Connect handler. func newConnectEdgeAdapter(impl gatewayv1.EdgeGatewayServer) gatewayv1connect.EdgeGatewayHandler { return &connectEdgeAdapter{impl: impl} } // ExecuteCommand unwraps the typed Connect request, calls the underlying // service, and wraps the typed response. gRPC `status.Error` values // returned by the decorator stack are translated to *connect.Error so // the Connect client receives the matching code and message. func (a *connectEdgeAdapter) ExecuteCommand(ctx context.Context, req *connect.Request[gatewayv1.ExecuteCommandRequest]) (*connect.Response[gatewayv1.ExecuteCommandResponse], error) { resp, err := a.impl.ExecuteCommand(ctx, req.Msg) if err != nil { return nil, translateGRPCStatusError(err) } return connect.NewResponse(resp), nil } // SubscribeEvents adapts the Connect server stream to the // grpc.ServerStreamingServer contract expected by the existing decorator // stack. The decorator stack only ever calls Send and Context on the // stream; the remaining grpc.ServerStream surface is satisfied by no-op // shims so the interface contract is met without panicking. Errors // returned by the decorator stack are translated to *connect.Error. func (a *connectEdgeAdapter) SubscribeEvents(ctx context.Context, req *connect.Request[gatewayv1.SubscribeEventsRequest], stream *connect.ServerStream[gatewayv1.GatewayEvent]) error { wrapped := &connectEdgeStream{ctx: ctx, stream: stream} if err := a.impl.SubscribeEvents(req.Msg, wrapped); err != nil { return translateGRPCStatusError(err) } return nil } // translateGRPCStatusError maps gRPC status.Error values returned by the // decorator stack into *connect.Error with the equivalent code and message. // Errors that are already *connect.Error pass through unchanged. Errors // without a recognisable gRPC status are returned verbatim — connect-go // renders those as CodeUnknown. func translateGRPCStatusError(err error) error { if err == nil { return nil } var connectErr *connect.Error if errors.As(err, &connectErr) { return err } grpcStatus, ok := grpcstatus.FromError(err) if !ok { return err } if grpcStatus.Code() == codes.OK { return nil } return connect.NewError(connect.Code(grpcStatus.Code()), errors.New(grpcStatus.Message())) } // connectEdgeStream satisfies grpc.ServerStreamingServer[gatewayv1.GatewayEvent] // on top of *connect.ServerStream. The decorator stack reads the request // context and pushes outbound events through Send; the rest of the // grpc.ServerStream surface is not exercised in the gateway, so the no-op // implementations preserve the type contract without surprising behaviour. type connectEdgeStream struct { ctx context.Context stream *connect.ServerStream[gatewayv1.GatewayEvent] } // Send forwards a typed gateway event through the underlying Connect server // stream. func (s *connectEdgeStream) Send(event *gatewayv1.GatewayEvent) error { return s.stream.Send(event) } // Context returns the request context handed to the Connect handler. func (s *connectEdgeStream) Context() context.Context { return s.ctx } // SetHeader is part of grpc.ServerStream. The Connect transport exposes // response headers through ResponseHeader() at construction time; metadata // supplied here is intentionally ignored because no decorator in the // gateway exercises the gRPC-only metadata path. func (s *connectEdgeStream) SetHeader(metadata.MD) error { return nil } // SendHeader is part of grpc.ServerStream. Connect-served streams flush // headers automatically on the first Send; manual header dispatch is not // modelled. func (s *connectEdgeStream) SendHeader(metadata.MD) error { return nil } // SetTrailer is part of grpc.ServerStream. Trailer metadata has no // corresponding Connect concept on server-streaming responses. func (s *connectEdgeStream) SetTrailer(metadata.MD) {} // SendMsg is part of grpc.ServerStream. The decorator stack never calls // SendMsg directly; if a future caller does, the typed Send path is used // when the message is a GatewayEvent. func (s *connectEdgeStream) SendMsg(m any) error { event, ok := m.(*gatewayv1.GatewayEvent) if !ok { return fmt.Errorf("connectEdgeStream.SendMsg: unsupported message type %T", m) } return s.stream.Send(event) } // RecvMsg is part of grpc.ServerStream. Server-streaming server handlers // have no client messages to receive after the initial request, so this // method is intentionally an error path. func (s *connectEdgeStream) RecvMsg(any) error { return errors.New("connectEdgeStream.RecvMsg: server-streaming has no client messages") }