Files
Ilia Denisov 118f7c17a2 phase 4: connectrpc on the gateway authenticated edge
Replace the native-gRPC server bootstrap with a single
`connectrpc.com/connect` HTTP/h2c listener. Connect-Go natively
serves Connect, gRPC, and gRPC-Web on the same port, so browsers can
now reach the authenticated surface without giving up the gRPC
framing native and desktop clients may use later. The decorator
stack (envelope → session → payload-hash → signature →
freshness/replay → rate-limit → routing/push) is reused unchanged
behind a small Connect → gRPC adapter and a `grpc.ServerStream`
shim around `*connect.ServerStream`.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-07 11:49:28 +02:00

130 lines
3.9 KiB
Go

package grpcapi
import (
"context"
"errors"
"galaxy/gateway/internal/session"
gatewayv1 "galaxy/gateway/proto/galaxy/gateway/v1"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// resolvedSessionFromContext returns the session record previously attached to
// ctx by the session-lookup gateway wrapper.
func resolvedSessionFromContext(ctx context.Context) (session.Record, bool) {
if ctx == nil {
return session.Record{}, false
}
record, ok := ctx.Value(resolvedSessionContextKey{}).(session.Record)
if !ok {
return session.Record{}, false
}
return cloneSessionRecord(record), true
}
// sessionLookupService resolves the authenticated session from SessionCache
// after envelope parsing succeeds and before later auth steps run.
type sessionLookupService struct {
gatewayv1.UnimplementedEdgeGatewayServer
delegate gatewayv1.EdgeGatewayServer
cache session.Cache
}
// ExecuteCommand resolves the cached session for req and only then forwards it
// to the configured delegate with the resolved session attached to ctx.
func (s sessionLookupService) ExecuteCommand(ctx context.Context, req *gatewayv1.ExecuteCommandRequest) (*gatewayv1.ExecuteCommandResponse, error) {
record, err := s.lookupSession(ctx)
if err != nil {
return nil, err
}
return s.delegate.ExecuteCommand(context.WithValue(ctx, resolvedSessionContextKey{}, cloneSessionRecord(record)), req)
}
// SubscribeEvents resolves the cached session for req and only then forwards it
// to the configured delegate with the resolved session attached to the stream
// context.
func (s sessionLookupService) SubscribeEvents(req *gatewayv1.SubscribeEventsRequest, stream grpc.ServerStreamingServer[gatewayv1.GatewayEvent]) error {
record, err := s.lookupSession(stream.Context())
if err != nil {
return err
}
return s.delegate.SubscribeEvents(req, resolvedSessionContextStream{
ServerStreamingServer: stream,
ctx: context.WithValue(stream.Context(), resolvedSessionContextKey{}, cloneSessionRecord(record)),
})
}
// newSessionLookupService wraps delegate with the session-cache lookup gate.
func newSessionLookupService(delegate gatewayv1.EdgeGatewayServer, cache session.Cache) gatewayv1.EdgeGatewayServer {
return sessionLookupService{
delegate: delegate,
cache: cache,
}
}
func (s sessionLookupService) lookupSession(ctx context.Context) (session.Record, error) {
envelope, ok := parsedEnvelopeFromContext(ctx)
if !ok {
return session.Record{}, status.Error(codes.Internal, "authenticated request context is incomplete")
}
record, err := s.cache.Lookup(ctx, envelope.DeviceSessionID)
switch {
case err == nil:
case errors.Is(err, session.ErrNotFound):
return session.Record{}, status.Error(codes.Unauthenticated, "unknown device session")
default:
return session.Record{}, status.Error(codes.Unavailable, "session cache is unavailable")
}
if record.Status == session.StatusRevoked {
return session.Record{}, status.Error(codes.FailedPrecondition, "device session is revoked")
}
return cloneSessionRecord(record), nil
}
func cloneSessionRecord(record session.Record) session.Record {
cloned := record
if record.RevokedAtMS != nil {
value := *record.RevokedAtMS
cloned.RevokedAtMS = &value
}
return cloned
}
type resolvedSessionContextKey struct{}
type resolvedSessionContextStream struct {
grpc.ServerStreamingServer[gatewayv1.GatewayEvent]
ctx context.Context
}
func (s resolvedSessionContextStream) Context() context.Context {
if s.ctx == nil {
return context.Background()
}
return s.ctx
}
type unavailableSessionCache struct{}
func (unavailableSessionCache) Lookup(context.Context, string) (session.Record, error) {
return session.Record{}, errors.New("session cache is unavailable")
}
func (unavailableSessionCache) MarkRevoked(string) {}
func (unavailableSessionCache) MarkAllRevokedForUser(string) {}
var _ gatewayv1.EdgeGatewayServer = sessionLookupService{}