Files
scrabble-game/loadtest/internal/edge/client.go
T
Ilia Denisov 04263a17ca R7: per-player transports + drop finished games in the load harness
Each virtual player now builds its own edge.Client (its own h2c connection
carrying both the Subscribe stream and the Execute calls), instead of every
player multiplexing over a single shared http2.Transport. The R2 trip report
traced the ~14% transport_error on game.state at 500 players to that single
shared transport; per-player connections mirror real clients and isolate the
artifact. The assembly burst and the gateway-hammer each get their own client.

playTurn now reports when a game has finished so playerLoop drops it from the
rotation (slices.DeleteFunc); once no active game remains the player idles while
still holding its stream. This stops secondary ops from hammering game_finished
on already-ended games (the other R2 harness finding).
2026-06-10 18:53:07 +02:00

139 lines
4.8 KiB
Go

// Package edge is the load harness's client of the gateway edge protocol: the
// Connect Execute envelope carrying FlatBuffers payloads, plus the Subscribe live
// stream, over h2c. It exposes typed wrappers for the operations the driver
// exercises, decoding responses into plain Go structs so the scenario layer never
// touches FlatBuffers directly.
package edge
import (
"context"
"crypto/tls"
"net"
"net/http"
"time"
"connectrpc.com/connect"
"golang.org/x/net/http2"
edgev1 "scrabble/gateway/proto/edge/v1"
"scrabble/gateway/proto/edge/v1/edgev1connect"
)
// Message types the driver uses, mirroring gateway/internal/transcode's catalog.
// Each value is a dotted "<domain>.<operation>" string; presumably the typed
// wrappers (not visible in this part of the file) pass one of these as the msgType
// argument of execute, which sends it as ExecuteRequest.MessageType.
const (
	// In-game operations ("game." / "games." prefix).
	msgSubmitPlay = "game.submit_play"
	msgPass = "game.pass"
	msgExchange = "game.exchange"
	msgState = "game.state"
	msgHistory = "game.history"
	msgGamesList = "games.list"
	msgCheckWord = "game.check_word"
	// Chat operations.
	msgNudge = "chat.nudge"
	msgChatPost = "chat.post"
	// Draft operations.
	msgDraftSave = "draft.save"
	msgDraftGet = "draft.get"
	// Profile and statistics operations.
	msgProfileGet = "profile.get"
	msgProfileUpd = "profile.update"
	msgStatsGet = "stats.get"
	// Invitation operations.
	msgInvCreate = "invitation.create"
	msgInvAccept = "invitation.accept"
	msgInvList = "invitation.list"
	// Lobby (matchmaking queue) operation.
	msgEnqueue = "lobby.enqueue"
)
// Client speaks the edge protocol to a single gateway base URL over h2c. The harness
// builds one Client per virtual player, so each player owns its h2c connection (its
// Subscribe stream and Execute calls share it) the way a real client does; a single
// Client is safe for that player's own concurrent goroutines.
type Client struct {
	// rpc is the generated Connect client for the gateway service. New binds it to
	// an http.Client backed by an http2.Transport and to the gateway base URL; both
	// Execute and Subscribe calls go through it.
	rpc edgev1connect.GatewayClient
}
// New returns a Client bound to baseURL (for example http://gateway:8081).
//
// The underlying http2.Transport is configured for HTTP/2 cleartext (h2c) to match
// the gateway: AllowHTTP permits the http:// scheme, and the "TLS" dial hook opens
// a plain TCP connection instead of performing a handshake (the *tls.Config it is
// handed is ignored). Every call builds a fresh transport, so each virtual player
// that gets its own Client also gets its own connection; the load then mirrors real
// clients rather than multiplexing every player over one shared transport.
func New(baseURL string) *Client {
	// Plaintext dialer plugged into the transport's TLS dial hook.
	dialPlain := func(ctx context.Context, network, addr string, _ *tls.Config) (net.Conn, error) {
		return (&net.Dialer{}).DialContext(ctx, network, addr)
	}

	transport := &http2.Transport{
		AllowHTTP:      true,
		DialTLSContext: dialPlain,
	}
	httpClient := &http.Client{Transport: transport}

	return &Client{rpc: edgev1connect.NewGatewayClient(httpClient, baseURL)}
}
// Result is the decoded Execute envelope: Code is "ok" or a stable domain error
// code (a non-ok Code is a domain outcome, not a transport failure); Payload is the
// FlatBuffers response body (empty on error).
//
// When execute fails at the transport level it still returns a Result alongside
// the non-nil error: Code then holds the short label produced by connectCode and
// Payload is left nil.
type Result struct {
	// Code is the response's ResultCode on a completed call, or the connectCode
	// label when the call itself failed.
	Code string
	// Payload is the raw response body copied from the Execute response; it is
	// not decoded at this layer.
	Payload []byte
}
// execute sends a single Execute call carrying msgType and payload. A non-empty
// token is attached as an "Authorization: Bearer <token>" header; an empty token
// sends the request without credentials (an unauthenticated op).
//
// Outcomes:
//   - the call completes: the response's ResultCode and Payload are returned with a
//     nil error, so a domain rejection surfaces only through Result.Code;
//   - the call fails (transport or connection error): the error is returned as-is,
//     and Result.Code is set to the short label from connectCode so callers can
//     tally failures without inspecting the error.
func (c *Client) execute(ctx context.Context, token, msgType string, payload []byte) (Result, error) {
	envelope := &edgev1.ExecuteRequest{
		MessageType: msgType,
		Payload:     payload,
	}
	call := connect.NewRequest(envelope)
	if token != "" {
		call.Header().Set("Authorization", "Bearer "+token)
	}

	reply, err := c.rpc.Execute(ctx, call)
	if err != nil {
		failed := Result{Code: connectCode(err)}
		return failed, err
	}

	body := reply.Msg
	return Result{Code: body.ResultCode, Payload: body.Payload}, nil
}
// connectCode maps a failed call's error to a short, stable label for the report,
// based on the Connect code extracted from err:
//
//	resource exhausted -> "rate_limited"
//	unauthenticated    -> "unauthenticated"
//	unavailable        -> "unavailable"
//	deadline exceeded  -> "deadline"
//	anything else      -> "transport_error"
//
// This lets the gateway-hammer tally limiter rejections without inspecting full
// errors.
func connectCode(err error) string {
	label := "transport_error"
	switch connect.CodeOf(err) {
	case connect.CodeResourceExhausted:
		label = "rate_limited"
	case connect.CodeUnauthenticated:
		label = "unauthenticated"
	case connect.CodeUnavailable:
		label = "unavailable"
	case connect.CodeDeadlineExceeded:
		label = "deadline"
	}
	return label
}
// Event is one decoded live event. Only the event's kind is carried; the payload of
// the underlying stream message is not exposed, because the driver reacts to kind
// alone — your_turn / match_found drive a state fetch.
type Event struct {
	// Kind is copied from the Kind field of the received stream message.
	Kind string
}
// Subscribe opens the live-event stream authenticated as token and delivers every
// received event to onEvent, in arrival order, on the calling goroutine. It blocks
// until the stream stops producing messages (context cancelled or stream ended), so
// run it in its own goroutine.
//
// A nil onEvent is allowed: events are still read off the stream and discarded.
// The stream is closed before returning. The returned error is either the failure
// to open the stream or whatever the stream reports once receiving stops; the
// caller decides whether to count it and (optionally) reconnect.
func (c *Client) Subscribe(ctx context.Context, token string, onEvent func(Event)) error {
	open := connect.NewRequest(&edgev1.SubscribeRequest{})
	open.Header().Set("Authorization", "Bearer "+token)

	events, err := c.rpc.Subscribe(ctx, open)
	if err != nil {
		return err
	}
	defer events.Close()

	for events.Receive() {
		if onEvent == nil {
			// No consumer: keep draining so the stream does not stall.
			continue
		}
		onEvent(Event{Kind: events.Msg().Kind})
	}
	return events.Err()
}
// DefaultPollInterval bounds how often a player re-checks one game's state (once
// every 3 seconds). It is exported for the scenario's pacing math so a virtual
// player stays under the per-user rate limit.
const DefaultPollInterval = 3 * time.Second