package grpcapi import ( "context" "errors" "galaxy/gateway/internal/config" "galaxy/gateway/internal/ratelimit" "galaxy/gateway/internal/session" gatewayv1 "galaxy/gateway/proto/galaxy/gateway/v1" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) const ( authenticatedGRPCBaseBucketKeyPrefix = "authenticated_grpc/" authenticatedGRPCIPBucketKeySegment = authenticatedGRPCBaseBucketKeyPrefix + "ip=" authenticatedGRPCSessionBucketKeySegment = authenticatedGRPCBaseBucketKeyPrefix + "session=" authenticatedGRPCUserBucketKeySegment = authenticatedGRPCBaseBucketKeyPrefix + "user=" authenticatedGRPCMessageClassBucketKeySegment = authenticatedGRPCBaseBucketKeyPrefix + "message_class=" unknownAuthenticatedPeerIP = "unknown" authenticatedRPCExecuteCommand = "ExecuteCommand" authenticatedRPCSubscribeEvents = "SubscribeEvents" ) var ( // ErrAuthenticatedPolicyDenied reports that the authenticated request was // rejected by later edge policy after transport authenticity succeeded. ErrAuthenticatedPolicyDenied = errors.New("authenticated request rejected by edge policy") // ErrAuthenticatedPolicyUnavailable reports that authenticated policy could // not be evaluated because its backing dependency is unavailable. ErrAuthenticatedPolicyUnavailable = errors.New("authenticated request policy is unavailable") ) // AuthenticatedRequestLimiter applies authenticated edge rate-limit policy to // one concrete bucket key. type AuthenticatedRequestLimiter interface { // Reserve evaluates key under policy and reports whether the request may // proceed immediately. Reserve(key string, policy ratelimit.Policy) ratelimit.Decision } // AuthenticatedRequest describes the authenticated request metadata exposed to // the edge-policy hook. type AuthenticatedRequest struct { // RPCMethod identifies the public RPC method being processed. RPCMethod string // PeerIP is the transport peer IP host part derived from the // authenticated edge HTTP listener peer address. PeerIP string // MessageClass is the stable rate-limit and policy class. The gateway uses // the full message_type literal because the v1 transport does not yet define // a coarser authenticated class taxonomy. MessageClass string // Envelope contains the verified transport envelope fields used by later // edge policy. Envelope AuthenticatedRequestEnvelope // Session contains the authenticated identity resolved from SessionCache. Session session.Record } // AuthenticatedRequestEnvelope describes the verified request envelope fields // exposed to the edge-policy hook. type AuthenticatedRequestEnvelope struct { // ProtocolVersion is the supported transport protocol version literal. ProtocolVersion string // DeviceSessionID is the authenticated device-session identifier. DeviceSessionID string // MessageType is the verified downstream routing key supplied by the client. MessageType string // TimestampMS is the client timestamp that already passed freshness checks. TimestampMS int64 // RequestID is the authenticated transport request identifier. RequestID string // TraceID is the optional client-supplied correlation identifier. TraceID string } // AuthenticatedRequestPolicy evaluates later authenticated edge policy after // transport authenticity and rate-limit checks succeed. type AuthenticatedRequestPolicy interface { // Evaluate returns nil when the authenticated request may proceed. It should // wrap ErrAuthenticatedPolicyDenied for stable reject mapping and // ErrAuthenticatedPolicyUnavailable when its backing dependency is // temporarily unavailable. Evaluate(ctx context.Context, request AuthenticatedRequest) error } type authenticatedRateLimitService struct { gatewayv1.UnimplementedEdgeGatewayServer delegate gatewayv1.EdgeGatewayServer limiter AuthenticatedRequestLimiter policy AuthenticatedRequestPolicy cfg config.AuthenticatedGRPCAntiAbuseConfig } // ExecuteCommand applies authenticated rate limits and edge policy before // delegating to the configured service implementation. func (s authenticatedRateLimitService) ExecuteCommand(ctx context.Context, req *gatewayv1.ExecuteCommandRequest) (*gatewayv1.ExecuteCommandResponse, error) { if err := s.applyRateLimitsAndPolicy(ctx, authenticatedRPCExecuteCommand); err != nil { return nil, err } return s.delegate.ExecuteCommand(ctx, req) } // SubscribeEvents applies authenticated rate limits and edge policy before // delegating to the configured service implementation. func (s authenticatedRateLimitService) SubscribeEvents(req *gatewayv1.SubscribeEventsRequest, stream grpc.ServerStreamingServer[gatewayv1.GatewayEvent]) error { if err := s.applyRateLimitsAndPolicy(stream.Context(), authenticatedRPCSubscribeEvents); err != nil { return err } return s.delegate.SubscribeEvents(req, stream) } // newAuthenticatedRateLimitService wraps delegate with the authenticated // rate-limit and edge-policy gate. func newAuthenticatedRateLimitService(delegate gatewayv1.EdgeGatewayServer, limiter AuthenticatedRequestLimiter, policy AuthenticatedRequestPolicy, cfg config.AuthenticatedGRPCAntiAbuseConfig) gatewayv1.EdgeGatewayServer { return authenticatedRateLimitService{ delegate: delegate, limiter: limiter, policy: policy, cfg: cfg, } } func (s authenticatedRateLimitService) applyRateLimitsAndPolicy(ctx context.Context, rpcMethod string) error { request, err := authenticatedRequestFromContext(ctx, rpcMethod) if err != nil { return err } if err := s.applyRateLimits(request); err != nil { return err } if err := s.applyPolicy(ctx, request); err != nil { return err } return nil } func (s authenticatedRateLimitService) applyRateLimits(request AuthenticatedRequest) error { checks := []struct { key string policy config.AuthenticatedRateLimitConfig }{ { key: authenticatedGRPCIPBucketKey(request.PeerIP), policy: s.cfg.IP, }, { key: authenticatedGRPCSessionBucketKey(request.Envelope.DeviceSessionID), policy: s.cfg.Session, }, { key: authenticatedGRPCUserBucketKey(request.Session.UserID), policy: s.cfg.User, }, { key: authenticatedGRPCMessageClassBucketKey(request.MessageClass), policy: s.cfg.MessageClass, }, } for _, check := range checks { decision := s.limiter.Reserve(check.key, ratelimit.Policy{ Requests: check.policy.Requests, Window: check.policy.Window, Burst: check.policy.Burst, }) if !decision.Allowed { return status.Error(codes.ResourceExhausted, "authenticated request rate limit exceeded") } } return nil } func (s authenticatedRateLimitService) applyPolicy(ctx context.Context, request AuthenticatedRequest) error { err := s.policy.Evaluate(ctx, request) switch { case err == nil: return nil case errors.Is(err, ErrAuthenticatedPolicyDenied): return status.Error(codes.PermissionDenied, "authenticated request rejected by edge policy") case errors.Is(err, ErrAuthenticatedPolicyUnavailable): return status.Error(codes.Unavailable, "authenticated request policy is unavailable") default: return status.Error(codes.Internal, "authenticated request policy evaluation failed") } } func authenticatedRequestFromContext(ctx context.Context, rpcMethod string) (AuthenticatedRequest, error) { envelope, ok := parsedEnvelopeFromContext(ctx) if !ok { return AuthenticatedRequest{}, status.Error(codes.Internal, "authenticated request context is incomplete") } record, ok := resolvedSessionFromContext(ctx) if !ok { return AuthenticatedRequest{}, status.Error(codes.Internal, "authenticated request context is incomplete") } return AuthenticatedRequest{ RPCMethod: rpcMethod, PeerIP: peerIPFromContext(ctx), MessageClass: authenticatedMessageClass(envelope.MessageType), Envelope: AuthenticatedRequestEnvelope{ ProtocolVersion: envelope.ProtocolVersion, DeviceSessionID: envelope.DeviceSessionID, MessageType: envelope.MessageType, TimestampMS: envelope.TimestampMS, RequestID: envelope.RequestID, TraceID: envelope.TraceID, }, Session: record, }, nil } func authenticatedGRPCIPBucketKey(peerIP string) string { return authenticatedGRPCIPBucketKeySegment + peerIP } func authenticatedGRPCSessionBucketKey(deviceSessionID string) string { return authenticatedGRPCSessionBucketKeySegment + deviceSessionID } func authenticatedGRPCUserBucketKey(userID string) string { return authenticatedGRPCUserBucketKeySegment + userID } func authenticatedGRPCMessageClassBucketKey(messageClass string) string { return authenticatedGRPCMessageClassBucketKeySegment + messageClass } func authenticatedMessageClass(messageType string) string { return messageType } type peerIPContextKey struct{} // contextWithPeerIP attaches the authenticated edge transport peer IP to ctx. // It is set by the transport interceptor before the service decorator stack // runs, and read back via peerIPFromContext. func contextWithPeerIP(ctx context.Context, ip string) context.Context { return context.WithValue(ctx, peerIPContextKey{}, ip) } func peerIPFromContext(ctx context.Context) string { if ip, ok := ctx.Value(peerIPContextKey{}).(string); ok && ip != "" { return ip } return unknownAuthenticatedPeerIP } type noopAuthenticatedRequestPolicy struct{} func (noopAuthenticatedRequestPolicy) Evaluate(context.Context, AuthenticatedRequest) error { return nil } var _ gatewayv1.EdgeGatewayServer = authenticatedRateLimitService{}