Files
galaxy-game/gateway/internal/grpcapi/rate_limit.go
T
Ilia Denisov 118f7c17a2 phase 4: connectrpc on the gateway authenticated edge
Replace the native-gRPC server bootstrap with a single
`connectrpc.com/connect` HTTP/h2c listener. Connect-Go natively
serves Connect, gRPC, and gRPC-Web on the same port, so browsers can
now reach the authenticated surface without giving up the gRPC
framing native and desktop clients may use later. The decorator
stack (envelope → session → payload-hash → signature →
freshness/replay → rate-limit → routing/push) is reused unchanged
behind a small Connect → gRPC adapter and a `grpc.ServerStream`
shim around `*connect.ServerStream`.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-07 11:49:28 +02:00

283 lines
9.4 KiB
Go

package grpcapi
import (
"context"
"errors"
"galaxy/gateway/internal/config"
"galaxy/gateway/internal/ratelimit"
"galaxy/gateway/internal/session"
gatewayv1 "galaxy/gateway/proto/galaxy/gateway/v1"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// Bucket-key fragments and fixed literals used by the authenticated edge
// rate-limit and policy gate in this file.
const (
// authenticatedGRPCBaseBucketKeyPrefix is the common prefix shared by every
// bucket-key segment declared below.
authenticatedGRPCBaseBucketKeyPrefix = "authenticated_grpc/"
// authenticatedGRPCIPBucketKeySegment prefixes the per-peer-IP bucket key
// built by authenticatedGRPCIPBucketKey.
authenticatedGRPCIPBucketKeySegment = authenticatedGRPCBaseBucketKeyPrefix + "ip="
// authenticatedGRPCSessionBucketKeySegment prefixes the per-device-session
// bucket key built by authenticatedGRPCSessionBucketKey.
authenticatedGRPCSessionBucketKeySegment = authenticatedGRPCBaseBucketKeyPrefix + "session="
// authenticatedGRPCUserBucketKeySegment prefixes the per-user bucket key
// built by authenticatedGRPCUserBucketKey.
authenticatedGRPCUserBucketKeySegment = authenticatedGRPCBaseBucketKeyPrefix + "user="
// authenticatedGRPCMessageClassBucketKeySegment prefixes the
// per-message-class bucket key built by
// authenticatedGRPCMessageClassBucketKey.
authenticatedGRPCMessageClassBucketKeySegment = authenticatedGRPCBaseBucketKeyPrefix + "message_class="
// unknownAuthenticatedPeerIP is the value peerIPFromContext returns when the
// context carries no non-empty peer IP.
unknownAuthenticatedPeerIP = "unknown"
// authenticatedRPCExecuteCommand is the RPCMethod value reported for
// ExecuteCommand calls.
authenticatedRPCExecuteCommand = "ExecuteCommand"
// authenticatedRPCSubscribeEvents is the RPCMethod value reported for
// SubscribeEvents calls.
authenticatedRPCSubscribeEvents = "SubscribeEvents"
)
// Sentinel errors an AuthenticatedRequestPolicy wraps so that applyPolicy can
// translate the outcome into a stable gRPC status code.
var (
// ErrAuthenticatedPolicyDenied reports that the authenticated request was
// rejected by later edge policy after transport authenticity succeeded.
// applyPolicy maps it to codes.PermissionDenied.
ErrAuthenticatedPolicyDenied = errors.New("authenticated request rejected by edge policy")
// ErrAuthenticatedPolicyUnavailable reports that authenticated policy could
// not be evaluated because its backing dependency is unavailable.
// applyPolicy maps it to codes.Unavailable.
ErrAuthenticatedPolicyUnavailable = errors.New("authenticated request policy is unavailable")
)
// AuthenticatedRequestLimiter applies authenticated edge rate-limit policy to
// one concrete bucket key.
//
// The gate in this file calls Reserve once per bucket (peer IP, device
// session, user, message class) and rejects the request as soon as one
// returned Decision has Allowed set to false.
type AuthenticatedRequestLimiter interface {
// Reserve evaluates key under policy and reports whether the request may
// proceed immediately.
Reserve(key string, policy ratelimit.Policy) ratelimit.Decision
}
// AuthenticatedRequest describes the authenticated request metadata exposed to
// the edge-policy hook. It is assembled by authenticatedRequestFromContext
// from values stored on the request context.
type AuthenticatedRequest struct {
// RPCMethod identifies the public RPC method being processed. In this file
// it is either "ExecuteCommand" or "SubscribeEvents".
RPCMethod string
// PeerIP is the transport peer IP host part derived from the
// authenticated edge HTTP listener peer address. It is "unknown" when the
// context carries no non-empty peer IP.
PeerIP string
// MessageClass is the stable rate-limit and policy class. The gateway uses
// the full message_type literal because the v1 transport does not yet define
// a coarser authenticated class taxonomy.
MessageClass string
// Envelope contains the verified transport envelope fields used by later
// edge policy.
Envelope AuthenticatedRequestEnvelope
// Session contains the authenticated identity resolved from SessionCache.
// Its UserID keys the per-user rate-limit bucket.
Session session.Record
}
// AuthenticatedRequestEnvelope describes the verified request envelope fields
// exposed to the edge-policy hook. authenticatedRequestFromContext copies each
// field from the parsed envelope found on the request context.
type AuthenticatedRequestEnvelope struct {
// ProtocolVersion is the supported transport protocol version literal.
ProtocolVersion string
// DeviceSessionID is the authenticated device-session identifier. It keys
// the per-session rate-limit bucket.
DeviceSessionID string
// MessageType is the verified downstream routing key supplied by the client.
// It is also the source of AuthenticatedRequest.MessageClass.
MessageType string
// TimestampMS is the client timestamp that already passed freshness checks.
TimestampMS int64
// RequestID is the authenticated transport request identifier.
RequestID string
// TraceID is the optional client-supplied correlation identifier.
TraceID string
}
// AuthenticatedRequestPolicy evaluates later authenticated edge policy after
// transport authenticity and rate-limit checks succeed.
type AuthenticatedRequestPolicy interface {
// Evaluate returns nil when the authenticated request may proceed. It should
// wrap ErrAuthenticatedPolicyDenied for stable reject mapping and
// ErrAuthenticatedPolicyUnavailable when its backing dependency is
// temporarily unavailable.
//
// The gate reports any other non-nil error to the client as codes.Internal
// with a fixed message, so the error text itself is not forwarded.
Evaluate(ctx context.Context, request AuthenticatedRequest) error
}
// authenticatedRateLimitService decorates an EdgeGatewayServer: every RPC is
// first charged against the authenticated rate-limit buckets and checked by
// the edge policy, and only then forwarded to delegate.
type authenticatedRateLimitService struct {
// Embedded so the type satisfies gatewayv1.EdgeGatewayServer; the two RPC
// methods declared on this type take precedence over the embedded ones.
gatewayv1.UnimplementedEdgeGatewayServer
// delegate receives the call once limits and policy have passed.
delegate gatewayv1.EdgeGatewayServer
// limiter decides, per bucket key, whether the request may proceed.
limiter AuthenticatedRequestLimiter
// policy is evaluated after all rate-limit buckets allowed the request.
policy AuthenticatedRequestPolicy
// cfg supplies the IP, Session, User, and MessageClass limit settings.
cfg config.AuthenticatedGRPCAntiAbuseConfig
}
// ExecuteCommand runs the authenticated rate-limit and edge-policy gate for
// the unary command RPC. When the gate rejects the call its error is returned
// with a nil response; otherwise the call is forwarded to the delegate and the
// delegate's result is returned unchanged.
func (s authenticatedRateLimitService) ExecuteCommand(ctx context.Context, req *gatewayv1.ExecuteCommandRequest) (*gatewayv1.ExecuteCommandResponse, error) {
	gateErr := s.applyRateLimitsAndPolicy(ctx, authenticatedRPCExecuteCommand)
	if gateErr != nil {
		return nil, gateErr
	}

	return s.delegate.ExecuteCommand(ctx, req)
}
// SubscribeEvents runs the authenticated rate-limit and edge-policy gate for
// the server-streaming RPC, using the stream's own context as the request
// context. A gate rejection is returned as-is; otherwise the stream is handed
// to the delegate.
func (s authenticatedRateLimitService) SubscribeEvents(req *gatewayv1.SubscribeEventsRequest, stream grpc.ServerStreamingServer[gatewayv1.GatewayEvent]) error {
	ctx := stream.Context()

	gateErr := s.applyRateLimitsAndPolicy(ctx, authenticatedRPCSubscribeEvents)
	if gateErr != nil {
		return gateErr
	}

	return s.delegate.SubscribeEvents(req, stream)
}
// newAuthenticatedRateLimitService returns an EdgeGatewayServer that applies
// the authenticated rate limits from cfg via limiter, then evaluates policy,
// before forwarding each RPC to delegate. The arguments are stored as given.
func newAuthenticatedRateLimitService(delegate gatewayv1.EdgeGatewayServer, limiter AuthenticatedRequestLimiter, policy AuthenticatedRequestPolicy, cfg config.AuthenticatedGRPCAntiAbuseConfig) gatewayv1.EdgeGatewayServer {
	var svc authenticatedRateLimitService

	svc.delegate = delegate
	svc.limiter = limiter
	svc.policy = policy
	svc.cfg = cfg

	return svc
}
// applyRateLimitsAndPolicy is the shared gate for both RPCs. It rebuilds the
// authenticated request description from ctx, charges the rate-limit buckets,
// and finally evaluates the edge policy. The first failing step's error is
// returned; nil means the call may be delegated.
func (s authenticatedRateLimitService) applyRateLimitsAndPolicy(ctx context.Context, rpcMethod string) error {
	request, err := authenticatedRequestFromContext(ctx, rpcMethod)
	if err != nil {
		return err
	}

	// Rate limits run before policy, so a throttled request never reaches the
	// policy hook.
	if err = s.applyRateLimits(request); err != nil {
		return err
	}

	return s.applyPolicy(ctx, request)
}
// applyRateLimits charges the request against the per-IP, per-session,
// per-user, and per-message-class buckets, in that order, and stops at the
// first bucket that refuses the reservation. A refusal is reported as
// codes.ResourceExhausted; nil means every bucket allowed the request.
func (s authenticatedRateLimitService) applyRateLimits(request AuthenticatedRequest) error {
	// admit reserves one slot in the bucket named by key under limits.
	admit := func(key string, limits config.AuthenticatedRateLimitConfig) bool {
		policy := ratelimit.Policy{
			Requests: limits.Requests,
			Window:   limits.Window,
			Burst:    limits.Burst,
		}
		return s.limiter.Reserve(key, policy).Allowed
	}

	// && short-circuits, so buckets after the first refusal are not charged.
	admitted := admit(authenticatedGRPCIPBucketKey(request.PeerIP), s.cfg.IP) &&
		admit(authenticatedGRPCSessionBucketKey(request.Envelope.DeviceSessionID), s.cfg.Session) &&
		admit(authenticatedGRPCUserBucketKey(request.Session.UserID), s.cfg.User) &&
		admit(authenticatedGRPCMessageClassBucketKey(request.MessageClass), s.cfg.MessageClass)
	if admitted {
		return nil
	}

	return status.Error(codes.ResourceExhausted, "authenticated request rate limit exceeded")
}
// applyPolicy evaluates the edge policy for request and converts its outcome
// into a gRPC status error with a fixed message:
//
//   - nil                                       -> nil (request may proceed)
//   - wraps ErrAuthenticatedPolicyDenied        -> codes.PermissionDenied
//   - wraps ErrAuthenticatedPolicyUnavailable   -> codes.Unavailable
//   - anything else                             -> codes.Internal
//
// The policy's own error text is never included in the returned status.
func (s authenticatedRateLimitService) applyPolicy(ctx context.Context, request AuthenticatedRequest) error {
	verdict := s.policy.Evaluate(ctx, request)
	if verdict == nil {
		return nil
	}

	if errors.Is(verdict, ErrAuthenticatedPolicyDenied) {
		return status.Error(codes.PermissionDenied, "authenticated request rejected by edge policy")
	}
	if errors.Is(verdict, ErrAuthenticatedPolicyUnavailable) {
		return status.Error(codes.Unavailable, "authenticated request policy is unavailable")
	}

	return status.Error(codes.Internal, "authenticated request policy evaluation failed")
}
// authenticatedRequestFromContext assembles the AuthenticatedRequest for
// rpcMethod from the parsed envelope, the resolved session, and the peer IP
// stored on ctx. When either the envelope or the session is absent it returns
// a zero request and a codes.Internal status error.
func authenticatedRequestFromContext(ctx context.Context, rpcMethod string) (AuthenticatedRequest, error) {
	envelope, haveEnvelope := parsedEnvelopeFromContext(ctx)
	if !haveEnvelope {
		return AuthenticatedRequest{}, status.Error(codes.Internal, "authenticated request context is incomplete")
	}

	record, haveSession := resolvedSessionFromContext(ctx)
	if !haveSession {
		return AuthenticatedRequest{}, status.Error(codes.Internal, "authenticated request context is incomplete")
	}

	var request AuthenticatedRequest
	request.RPCMethod = rpcMethod
	request.PeerIP = peerIPFromContext(ctx)
	request.MessageClass = authenticatedMessageClass(envelope.MessageType)
	request.Session = record

	// Copy the envelope field by field into the policy-facing view.
	request.Envelope.ProtocolVersion = envelope.ProtocolVersion
	request.Envelope.DeviceSessionID = envelope.DeviceSessionID
	request.Envelope.MessageType = envelope.MessageType
	request.Envelope.TimestampMS = envelope.TimestampMS
	request.Envelope.RequestID = envelope.RequestID
	request.Envelope.TraceID = envelope.TraceID

	return request, nil
}
// authenticatedGRPCIPBucketKey returns the rate-limit bucket key for peerIP:
// the IP key segment followed by peerIP verbatim.
func authenticatedGRPCIPBucketKey(peerIP string) string {
	key := authenticatedGRPCIPBucketKeySegment + peerIP
	return key
}
// authenticatedGRPCSessionBucketKey returns the rate-limit bucket key for
// deviceSessionID: the session key segment followed by the ID verbatim.
func authenticatedGRPCSessionBucketKey(deviceSessionID string) string {
	key := authenticatedGRPCSessionBucketKeySegment + deviceSessionID
	return key
}
// authenticatedGRPCUserBucketKey returns the rate-limit bucket key for userID:
// the user key segment followed by userID verbatim.
func authenticatedGRPCUserBucketKey(userID string) string {
	key := authenticatedGRPCUserBucketKeySegment + userID
	return key
}
// authenticatedGRPCMessageClassBucketKey returns the rate-limit bucket key for
// messageClass: the message-class key segment followed by the class verbatim.
func authenticatedGRPCMessageClassBucketKey(messageClass string) string {
	key := authenticatedGRPCMessageClassBucketKeySegment + messageClass
	return key
}
// authenticatedMessageClass maps a message type to its rate-limit and policy
// class. The mapping is currently the identity: the message type literal is
// returned unchanged.
func authenticatedMessageClass(messageType string) string {
	class := messageType
	return class
}
// peerIPContextKey is the unexported context key type under which the
// authenticated edge transport peer IP is stored.
type peerIPContextKey struct{}

// contextWithPeerIP returns a copy of ctx that carries ip as the authenticated
// edge transport peer IP. It is set by the transport interceptor before the
// service decorator stack runs, and read back via peerIPFromContext.
func contextWithPeerIP(ctx context.Context, ip string) context.Context {
	var key peerIPContextKey
	return context.WithValue(ctx, key, ip)
}
// peerIPFromContext returns the peer IP stored on ctx under peerIPContextKey.
// When no string value is present, or the stored string is empty, it returns
// unknownAuthenticatedPeerIP instead.
func peerIPFromContext(ctx context.Context) string {
	// A failed assertion leaves ip as "", which takes the fallback branch.
	ip, _ := ctx.Value(peerIPContextKey{}).(string)
	if ip == "" {
		return unknownAuthenticatedPeerIP
	}

	return ip
}
// noopAuthenticatedRequestPolicy is an AuthenticatedRequestPolicy that allows
// every request.
type noopAuthenticatedRequestPolicy struct{}

// Evaluate ignores its arguments and always returns nil.
func (noopAuthenticatedRequestPolicy) Evaluate(_ context.Context, _ AuthenticatedRequest) error {
	return nil
}
// Compile-time check that authenticatedRateLimitService (by value) satisfies
// gatewayv1.EdgeGatewayServer.
var _ gatewayv1.EdgeGatewayServer = authenticatedRateLimitService{}