package testenv import ( "context" "crypto/ed25519" "crypto/rand" "crypto/sha256" "encoding/base64" "errors" "fmt" "sync/atomic" "time" gatewayauthn "galaxy/gateway/authn" gatewayv1 "galaxy/gateway/proto/galaxy/gateway/v1" "github.com/google/uuid" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/status" ) // SignedGatewayClient drives the authenticated gRPC surface of the // gateway from tests. It signs ExecuteCommand envelopes with the // session's Ed25519 private key, verifies response signatures with // the gateway's response-signer public key, and exposes a // SubscribeEvents helper. type SignedGatewayClient struct { conn *grpc.ClientConn edge gatewayv1.EdgeGatewayClient deviceSID string privateKey ed25519.PrivateKey respPub ed25519.PublicKey requestSeq uint64 } // NewSession is the device-session shape returned by registration. type NewSession struct { DeviceSessionID string PrivateKey ed25519.PrivateKey PublicKey ed25519.PublicKey } // GenerateSessionKeyPair returns a fresh Ed25519 keypair for use in // `confirm-email-code`. func GenerateSessionKeyPair() (ed25519.PublicKey, ed25519.PrivateKey, error) { return ed25519.GenerateKey(rand.Reader) } // EncodePublicKey base64-encodes the raw 32-byte Ed25519 public key // for the `client_public_key` field. func EncodePublicKey(pub ed25519.PublicKey) string { return base64.StdEncoding.EncodeToString(pub) } // DialGateway opens a gRPC connection to gateway's authenticated // surface and prepares a signing client bound to deviceSID. 
func DialGateway(ctx context.Context, addr string, deviceSID string, privateKey ed25519.PrivateKey, respPub ed25519.PublicKey) (*SignedGatewayClient, error) { conn, err := grpc.NewClient(addr, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { return nil, fmt.Errorf("dial gateway: %w", err) } return &SignedGatewayClient{ conn: conn, edge: gatewayv1.NewEdgeGatewayClient(conn), deviceSID: deviceSID, privateKey: privateKey, respPub: respPub, }, nil } // Close releases the gRPC connection. func (c *SignedGatewayClient) Close() error { return c.conn.Close() } // ExecuteOptions tunes one ExecuteCommand call. The zero value // produces a fresh `request_id` and the current timestamp; tests that // need a fixed request_id (anti-replay) or a stale timestamp // (freshness window) override the relevant fields. type ExecuteOptions struct { RequestID string TimestampMS int64 OverrideSignature []byte OverridePayloadHash []byte OverrideSessionID string OverrideProtocolVersion string } // ExecuteResult is the verified response of a successful // ExecuteCommand. PayloadBytes is the authenticated FlatBuffers // blob; tests decode it via galaxy/transcoder. type ExecuteResult struct { ResultCode string PayloadBytes []byte RequestID string TimestampMS int64 } // Execute signs the supplied payload, calls ExecuteCommand, verifies // the response signature against the gateway response signer, and // returns the decoded result. 
func (c *SignedGatewayClient) Execute(ctx context.Context, messageType string, payload []byte, opts ExecuteOptions) (*ExecuteResult, error) { if len(payload) == 0 { return nil, errors.New("ExecuteCommand requires non-empty payload") } requestID := opts.RequestID if requestID == "" { requestID = uuid.NewString() } timestampMS := opts.TimestampMS if timestampMS == 0 { timestampMS = time.Now().UnixMilli() } protocolVersion := opts.OverrideProtocolVersion if protocolVersion == "" { protocolVersion = "v1" } deviceSID := opts.OverrideSessionID if deviceSID == "" { deviceSID = c.deviceSID } hash := opts.OverridePayloadHash if hash == nil { sum := sha256.Sum256(payload) hash = sum[:] } signature := opts.OverrideSignature if signature == nil { input := gatewayauthn.BuildRequestSigningInput(gatewayauthn.RequestSigningFields{ ProtocolVersion: protocolVersion, DeviceSessionID: deviceSID, MessageType: messageType, TimestampMS: timestampMS, RequestID: requestID, PayloadHash: hash, }) signature = ed25519.Sign(c.privateKey, input) } req := &gatewayv1.ExecuteCommandRequest{ ProtocolVersion: protocolVersion, DeviceSessionId: deviceSID, MessageType: messageType, TimestampMs: timestampMS, RequestId: requestID, PayloadBytes: payload, PayloadHash: hash, Signature: signature, } atomic.AddUint64(&c.requestSeq, 1) resp, err := c.edge.ExecuteCommand(ctx, req) if err != nil { return nil, err } respHash := sha256.Sum256(resp.GetPayloadBytes()) if string(respHash[:]) != string(resp.GetPayloadHash()) { return nil, fmt.Errorf("response payload_hash mismatch") } if err := gatewayauthn.VerifyResponseSignature(c.respPub, resp.GetSignature(), gatewayauthn.ResponseSigningFields{ ProtocolVersion: resp.GetProtocolVersion(), RequestID: resp.GetRequestId(), TimestampMS: resp.GetTimestampMs(), ResultCode: resp.GetResultCode(), PayloadHash: resp.GetPayloadHash(), }); err != nil { return nil, fmt.Errorf("response signature verification failed: %w", err) } return &ExecuteResult{ ResultCode: 
resp.GetResultCode(), PayloadBytes: resp.GetPayloadBytes(), RequestID: resp.GetRequestId(), TimestampMS: resp.GetTimestampMs(), }, nil } // SubscribeEvents opens the authenticated server-streaming // SubscribeEvents RPC. The returned channel receives every // authenticated event the gateway delivers; the channel closes when // the stream ends or when ctx is done. Errors land on the err // channel. func (c *SignedGatewayClient) SubscribeEvents(ctx context.Context, messageType string) (<-chan *gatewayv1.GatewayEvent, <-chan error, error) { requestID := uuid.NewString() timestampMS := time.Now().UnixMilli() protocolVersion := "v1" emptyHash := sha256.Sum256(nil) signature := ed25519.Sign(c.privateKey, gatewayauthn.BuildRequestSigningInput(gatewayauthn.RequestSigningFields{ ProtocolVersion: protocolVersion, DeviceSessionID: c.deviceSID, MessageType: messageType, TimestampMS: timestampMS, RequestID: requestID, PayloadHash: emptyHash[:], })) stream, err := c.edge.SubscribeEvents(ctx, &gatewayv1.SubscribeEventsRequest{ ProtocolVersion: protocolVersion, DeviceSessionId: c.deviceSID, MessageType: messageType, TimestampMs: timestampMS, RequestId: requestID, PayloadHash: emptyHash[:], Signature: signature, }) if err != nil { return nil, nil, fmt.Errorf("open subscribe events: %w", err) } events := make(chan *gatewayv1.GatewayEvent, 16) errs := make(chan error, 1) go func() { defer close(events) for { ev, err := stream.Recv() if err != nil { errs <- err return } events <- ev } }() return events, errs, nil } // IsUnauthenticated reports whether err is a gRPC Unauthenticated // status, useful for negative-path edge tests. func IsUnauthenticated(err error) bool { return status.Code(err) == codes.Unauthenticated } // IsInvalidArgument reports whether err is a gRPC InvalidArgument // status (used for malformed envelopes and unsupported // protocol_version). 
func IsInvalidArgument(err error) bool {
	code := status.Code(err)
	return code == codes.InvalidArgument
}

// IsResourceExhausted tells whether err carries the gRPC
// ResourceExhausted status code (used for replay rejection).
//
// NOTE(review): IsFailedPrecondition is also documented as the
// replay-rejection code; confirm which one the gateway actually
// returns.
func IsResourceExhausted(err error) bool {
	code := status.Code(err)
	return code == codes.ResourceExhausted
}

// IsFailedPrecondition tells whether err carries the gRPC
// FailedPrecondition status code. The gateway uses this code for
// replay rejections (the canonical envelope was authentic but the
// `request_id` was already consumed).
func IsFailedPrecondition(err error) bool {
	code := status.Code(err)
	return code == codes.FailedPrecondition
}