diff --git a/loadtest/cmd/loadtest/main.go b/loadtest/cmd/loadtest/main.go index 9d58798..aaec899 100644 --- a/loadtest/cmd/loadtest/main.go +++ b/loadtest/cmd/loadtest/main.go @@ -24,7 +24,6 @@ import ( "syscall" "time" - "scrabble/loadtest/internal/edge" "scrabble/loadtest/internal/moves" "scrabble/loadtest/internal/report" "scrabble/loadtest/internal/scenario" @@ -114,7 +113,7 @@ func cmdRun(ctx context.Context, log *slog.Logger, args []string) error { log.Info("seeded", "durable", len(pool.Durables), "guest", len(pool.Guests)) rec := report.New() - drv := scenario.NewDriver(edge.New(*gateway), reg, rec, log) + drv := scenario.NewDriver(*gateway, reg, rec, log) cfg := scenario.RealisticConfig{ Steps: steps, StepDur: *stepDur, GamesPerPlayer: *gpp, Tick: *tick, SecondaryProb: *secProb, diff --git a/loadtest/internal/edge/client.go b/loadtest/internal/edge/client.go index a40ec8d..9b7d41b 100644 --- a/loadtest/internal/edge/client.go +++ b/loadtest/internal/edge/client.go @@ -41,16 +41,18 @@ const ( msgEnqueue = "lobby.enqueue" ) -// Client speaks the edge protocol to a single gateway base URL over h2c. It is safe -// for concurrent use by many virtual players (the underlying http2.Transport pools -// and multiplexes connections). +// Client speaks the edge protocol to a single gateway base URL over h2c. The harness +// builds one Client per virtual player, so each player owns its h2c connection (its +// Subscribe stream and Execute calls share it) the way a real client does; a single +// Client is safe for that player's own concurrent goroutines. type Client struct { rpc edgev1connect.GatewayClient } // New builds a Client for baseURL (for example http://gateway:8081). The transport // speaks HTTP/2 cleartext (h2c) to match the gateway, dialling plaintext TCP rather -// than TLS. +// than TLS. Each virtual player gets its own Client (hence its own connection), so the +// load mirrors real clients instead of multiplexing every player over one transport. func New(baseURL string) *Client { hc := &http.Client{ Transport: &http2.Transport{ diff --git a/loadtest/internal/scenario/assemble.go b/loadtest/internal/scenario/assemble.go index 33584c6..7cc9ece 100644 --- a/loadtest/internal/scenario/assemble.go +++ b/loadtest/internal/scenario/assemble.go @@ -37,6 +37,7 @@ func (d *Driver) assembleCohort(ctx context.Context, cohort []seed.Account, game if len(cohort) < 2 { return nil } + c := edge.New(d.gateway) // one client for the assembly burst; players play on their own gamesOf := make(map[string]int, len(cohort)) var games []*Game for i := range cohort { @@ -51,7 +52,7 @@ func (d *Driver) assembleCohort(ctx context.Context, cohort []seed.Account, game break } variant := moves.Variants()[rng.Intn(len(moves.Variants()))] - g, err := d.assemble(ctx, members, variant) + g, err := d.assemble(ctx, c, members, variant) if err != nil { d.log.Debug("assemble game", "err", err) break @@ -85,7 +86,7 @@ func pickMembers(cohort []seed.Account, inviter seed.Account, rng *rand.Rand) [] // assemble runs the invitation flow for one game: the inviter (members[0]) invites // the rest, each invitee accepts the pending invitation, and the completing accept // starts the game, which is then located in the inviter's game list. -func (d *Driver) assemble(ctx context.Context, members []seed.Account, variant string) (*Game, error) { +func (d *Driver) assemble(ctx context.Context, c *edge.Client, members []seed.Account, variant string) (*Game, error) { inviter := members[0] inviteeIDs := make([]string, len(members)-1) for i, m := range members[1:] { @@ -93,7 +94,7 @@ func (d *Driver) assemble(ctx context.Context, members []seed.Account, variant s } t0 := time.Now() - code, err := d.edge.CreateInvitation(ctx, inviter.Token, inviteeIDs, variant) + code, err := c.CreateInvitation(ctx, inviter.Token, inviteeIDs, variant) d.rec.Record("invitation.create", code, time.Since(t0)) if err != nil || code != "ok" { return nil, fmt.Errorf("invitation.create: %s", code) @@ -101,7 +102,7 @@ func (d *Driver) assemble(ctx context.Context, members []seed.Account, variant s for _, invitee := range members[1:] { t0 = time.Now() - list, lc, err := d.edge.ListInvitations(ctx, invitee.Token) + list, lc, err := c.ListInvitations(ctx, invitee.Token) d.rec.Record("invitation.list", lc, time.Since(t0)) if err != nil || lc != "ok" { return nil, fmt.Errorf("invitation.list: %s", lc) @@ -111,7 +112,7 @@ func (d *Driver) assemble(ctx context.Context, members []seed.Account, variant s return nil, fmt.Errorf("no pending invitation from %s", inviter.ID) } t0 = time.Now() - ac, err := d.edge.AcceptInvitation(ctx, invitee.Token, invID) + ac, err := c.AcceptInvitation(ctx, invitee.Token, invID) d.rec.Record("invitation.accept", ac, time.Since(t0)) if err != nil || ac != "ok" { return nil, fmt.Errorf("invitation.accept: %s", ac) @@ -119,7 +120,7 @@ func (d *Driver) assemble(ctx context.Context, members []seed.Account, variant s } t0 = time.Now() - games, gc, err := d.edge.GamesList(ctx, inviter.Token) + games, gc, err := c.GamesList(ctx, inviter.Token) d.rec.Record("games.list", gc, time.Since(t0)) if err != nil || gc != "ok" { return nil, fmt.Errorf("games.list: %s", gc) diff --git a/loadtest/internal/scenario/hammer.go b/loadtest/internal/scenario/hammer.go index 18e51a6..a6be366 100644 --- a/loadtest/internal/scenario/hammer.go +++ b/loadtest/internal/scenario/hammer.go @@ -5,6 +5,7 @@ import ( "sync" "time" + "scrabble/loadtest/internal/edge" "scrabble/loadtest/internal/seed" ) @@ -29,6 +30,7 @@ func (d *Driver) Hammer(ctx context.Context, acc seed.Account, cfg HammerConfig) runCtx, cancel := context.WithTimeout(ctx, cfg.Duration) defer cancel() d.log.Info("gateway-hammer", "workers", cfg.Workers, "duration", cfg.Duration) + c := edge.New(d.gateway) var wg sync.WaitGroup for w := 0; w < cfg.Workers; w++ { wg.Add(1) @@ -36,7 +38,7 @@ func (d *Driver) Hammer(ctx context.Context, acc seed.Account, cfg HammerConfig) defer wg.Done() for runCtx.Err() == nil { t0 := time.Now() - _, code, _ := d.edge.GamesList(runCtx, acc.Token) + _, code, _ := c.GamesList(runCtx, acc.Token) d.rec.Record("hammer:games.list", code, time.Since(t0)) } }() diff --git a/loadtest/internal/scenario/scenario.go b/loadtest/internal/scenario/scenario.go index ccd964a..e86a8ca 100644 --- a/loadtest/internal/scenario/scenario.go +++ b/loadtest/internal/scenario/scenario.go @@ -9,6 +9,7 @@ import ( "context" "log/slog" "math/rand" + "slices" "sync" "time" @@ -18,18 +19,20 @@ import ( "scrabble/loadtest/internal/seed" ) -// Driver ties the edge client, the local move generator and the run recorder -// together. All three are safe for concurrent use by many player goroutines. +// Driver ties the gateway endpoint, the local move generator and the run recorder +// together. It builds one edge client per virtual player, so each player owns its +// h2c connection (its Subscribe stream and Execute calls share it) the way a real +// client does, rather than multiplexing every player over a single shared transport. type Driver struct { - edge *edge.Client - moves *moves.Registry - rec *report.Recorder - log *slog.Logger + gateway string // gateway base URL, e.g. http://gateway:8081 + moves *moves.Registry + rec *report.Recorder + log *slog.Logger } -// NewDriver builds a Driver. -func NewDriver(c *edge.Client, m *moves.Registry, rec *report.Recorder, log *slog.Logger) *Driver { - return &Driver{edge: c, moves: m, rec: rec, log: log} +// NewDriver builds a Driver targeting the gateway base URL. +func NewDriver(gateway string, m *moves.Registry, rec *report.Recorder, log *slog.Logger) *Driver { + return &Driver{gateway: gateway, moves: m, rec: rec, log: log} } // RealisticConfig parameterises the under-the-limit ramp. @@ -98,11 +101,16 @@ func (d *Driver) RunRealistic(ctx context.Context, pool *seed.Pool, cfg Realisti return nil } -// playerLoop runs one virtual player: a live-event subscription (loads the push hub, -// counts events) plus a round-robin turn loop over the player's games. +// playerLoop runs one virtual player over its own edge client (its own h2c +// connection): a live-event subscription (loads the push hub, counts events) plus a +// round-robin turn loop over the player's games. A game that has finished is dropped +// from the rotation so secondary ops stop hitting an ended game; once no active game +// remains the player idles, still holding its stream, until the run ends. func (d *Driver) playerLoop(ctx context.Context, p seed.Account, games []*Game, cfg RealisticConfig, rng *rand.Rand) { - go d.subscribeLoop(ctx, p) - if len(games) == 0 { + c := edge.New(d.gateway) + go d.subscribeLoop(ctx, c, p) + active := games + if len(active) == 0 { <-ctx.Done() return } @@ -114,22 +122,30 @@ func (d *Driver) playerLoop(ctx context.Context, p seed.Account, games []*Game, case <-ctx.Done(): return case <-ticker.C: - g := games[gi%len(games)] + g := active[gi%len(active)] gi++ if rng.Float64() < cfg.SecondaryProb { - d.secondaryOp(ctx, p, g, rng) + d.secondaryOp(ctx, c, p, g, rng) continue } - d.playTurn(ctx, p, g, rng) + if d.playTurn(ctx, c, p, g, rng) { + active = slices.DeleteFunc(active, func(x *Game) bool { return x == g }) + gi = 0 + if len(active) == 0 { + <-ctx.Done() + return + } + } } } } -// subscribeLoop holds the player's live-event stream open, counting events and -// reconnecting with a brief backoff after a drop, until the run ends. -func (d *Driver) subscribeLoop(ctx context.Context, p seed.Account) { +// subscribeLoop holds the player's live-event stream open on the player's client, +// counting events and reconnecting with a brief backoff after a drop, until the run +// ends. +func (d *Driver) subscribeLoop(ctx context.Context, c *edge.Client, p seed.Account) { for ctx.Err() == nil { - err := d.edge.Subscribe(ctx, p.Token, func(e edge.Event) { d.rec.Event(e.Kind) }) + err := c.Subscribe(ctx, p.Token, func(e edge.Event) { d.rec.Event(e.Kind) }) if ctx.Err() != nil { return } @@ -144,80 +160,90 @@ func (d *Driver) subscribeLoop(ctx context.Context, p seed.Account) { } } -// playTurn plays one turn in g when it is the player's move: fetch state, replay -// history, pick a legal move and submit it (or exchange / pass). -func (d *Driver) playTurn(ctx context.Context, p seed.Account, g *Game, rng *rand.Rand) { +// playTurn plays one turn in g over the player's client when it is the player's +// move: fetch state, replay history, pick a legal move and submit it (or exchange / +// pass). It reports whether the game has finished, so the caller can drop it from the +// rotation. +func (d *Driver) playTurn(ctx context.Context, c *edge.Client, p seed.Account, g *Game, rng *rand.Rand) (finished bool) { seat := g.seatOf(p.ID.String()) if seat < 0 { - return + return false } t0 := time.Now() - st, code, err := d.edge.State(ctx, p.Token, g.ID) + st, code, err := c.State(ctx, p.Token, g.ID) d.rec.Record("game.state", code, time.Since(t0)) - if err != nil || code != "ok" || !st.Game.Active() || st.Game.ToMove != seat { - return + if err != nil || code != "ok" { + return false + } + if !st.Game.Active() { + return true + } + if st.Game.ToMove != seat { + return false } t0 = time.Now() - hist, hc, err := d.edge.History(ctx, p.Token, g.ID) + hist, hc, err := c.History(ctx, p.Token, g.ID) d.rec.Record("game.history", hc, time.Since(t0)) if err != nil || hc != "ok" { - return + return false } action, err := d.moves.Pick(g.Variant, hist, st.Rack, st.BagLen, rng) if err != nil { d.log.Debug("pick move", "variant", g.Variant, "err", err) - return + return false } switch action.Kind { case "play": t0 = time.Now() - _, c, _ := d.edge.SubmitPlay(ctx, p.Token, g.ID, action.Dir, action.Tiles) - d.rec.Record("game.submit_play", c, time.Since(t0)) + _, code, _ := c.SubmitPlay(ctx, p.Token, g.ID, action.Dir, action.Tiles) + d.rec.Record("game.submit_play", code, time.Since(t0)) case "exchange": t0 = time.Now() - _, c, _ := d.edge.Exchange(ctx, p.Token, g.ID, action.Exchange) - d.rec.Record("game.exchange", c, time.Since(t0)) + _, code, _ := c.Exchange(ctx, p.Token, g.ID, action.Exchange) + d.rec.Record("game.exchange", code, time.Since(t0)) default: t0 = time.Now() - _, c, _ := d.edge.Pass(ctx, p.Token, g.ID) - d.rec.Record("game.pass", c, time.Since(t0)) + _, code, _ := c.Pass(ctx, p.Token, g.ID) + d.rec.Record("game.pass", code, time.Since(t0)) } + return false } // secondaryOp exercises one of the non-move edge operations the plan calls out, so -// the run touches nudge / chat / check-word / draft / profile / stats too. -func (d *Driver) secondaryOp(ctx context.Context, p seed.Account, g *Game, rng *rand.Rand) { +// the run touches nudge / chat / check-word / draft / profile / stats too, over the +// player's own client. +func (d *Driver) secondaryOp(ctx context.Context, c *edge.Client, p seed.Account, g *Game, rng *rand.Rand) { t0 := time.Now() switch rng.Intn(7) { case 0: - c, _ := d.edge.Nudge(ctx, p.Token, g.ID) - d.rec.Record("chat.nudge", c, time.Since(t0)) + code, _ := c.Nudge(ctx, p.Token, g.ID) + d.rec.Record("chat.nudge", code, time.Since(t0)) case 1: - c, _ := d.edge.ChatPost(ctx, p.Token, g.ID, "gg") - d.rec.Record("chat.post", c, time.Since(t0)) + code, _ := c.ChatPost(ctx, p.Token, g.ID, "gg") + d.rec.Record("chat.post", code, time.Since(t0)) case 2: - c, _ := d.edge.CheckWord(ctx, p.Token, g.ID, []byte{0, 1, 2}) - d.rec.Record("game.check_word", c, time.Since(t0)) + code, _ := c.CheckWord(ctx, p.Token, g.ID, []byte{0, 1, 2}) + d.rec.Record("game.check_word", code, time.Since(t0)) case 3: // rack_order is an opaque string and board_tiles a (here empty) array, per the // backend draft DTO; a malformed shape is rejected as bad_request. - c, _ := d.edge.DraftSave(ctx, p.Token, g.ID, `{"rack_order":"","board_tiles":[]}`) - d.rec.Record("draft.save", c, time.Since(t0)) + code, _ := c.DraftSave(ctx, p.Token, g.ID, `{"rack_order":"","board_tiles":[]}`) + d.rec.Record("draft.save", code, time.Since(t0)) case 4: - c, _ := d.edge.DraftGet(ctx, p.Token, g.ID) - d.rec.Record("draft.get", c, time.Since(t0)) + code, _ := c.DraftGet(ctx, p.Token, g.ID) + d.rec.Record("draft.get", code, time.Since(t0)) case 5: lang := "en" if rng.Intn(2) == 1 { lang = "ru" } - c, _ := d.edge.ProfileUpdate(ctx, p.Token, p.Name, lang) - d.rec.Record("profile.update", c, time.Since(t0)) + code, _ := c.ProfileUpdate(ctx, p.Token, p.Name, lang) + d.rec.Record("profile.update", code, time.Since(t0)) default: - c, _ := d.edge.Stats(ctx, p.Token) - d.rec.Record("stats.get", c, time.Since(t0)) + code, _ := c.Stats(ctx, p.Token) + d.rec.Record("stats.get", code, time.Since(t0)) } }