diplomail (Stage C): paid-tier broadcast + multi-game + cleanup
Tests · Go / test (pull_request) Successful in 1m59s
Tests · Go / test (push) Successful in 1m59s
Tests · Integration / integration (pull_request) Successful in 1m36s

Closes out the producer-side of the diplomail surface. Paid-tier
players can fan out one personal message to the rest of the active
roster (gated on entitlement_snapshots.is_paid). Site admins gain a
multi-game broadcast (POST /admin/mail/broadcast with `selected` /
`all_running` scopes) and the bulk-purge endpoint that wipes
diplomail rows tied to games finished more than N years ago. An
admin listing (GET /admin/mail/messages) rounds out the
observability surface.

EntitlementReader and GameLookup are new narrow deps wired from
`*user.Service` and `*lobby.Service` in cmd/backend/main; the lobby
service grows a one-off `ListFinishedGamesBefore` helper for the
cleanup path (the cache evicts terminal-state games so the cache
walk is not enough). Stage D will swap LangUndetermined for an
actual body-language detector and add the translation cache.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Ilia Denisov
2026-05-15 19:02:46 +02:00
parent b3f24cc440
commit 362f92e520
14 changed files with 1423 additions and 4 deletions
+219
View File
@@ -103,6 +103,225 @@ func (s *Service) SendAdminBroadcast(ctx context.Context, in SendAdminBroadcastI
return msg, recipients, nil
}
// SendPlayerBroadcast persists a paid-tier player broadcast and
// fans out the push event to every other active member of the game.
// The send is `kind="personal"`, `sender_kind="player"`,
// `broadcast_scope="game_broadcast"` — recipients reply to it as if
// it were a single-recipient personal send, and the reply targets
// only the broadcaster. The caller's entitlement tier is checked
// against `EntitlementReader`; free-tier callers are rejected with
// ErrForbidden.
func (s *Service) SendPlayerBroadcast(ctx context.Context, in SendPlayerBroadcastInput) (Message, []Recipient, error) {
subject, body, err := s.prepareContent(in.Subject, in.Body)
if err != nil {
return Message{}, nil, err
}
if s.deps.Entitlements == nil {
return Message{}, nil, fmt.Errorf("%w: entitlement reader is not wired", ErrForbidden)
}
paid, err := s.deps.Entitlements.IsPaidTier(ctx, in.SenderUserID)
if err != nil {
return Message{}, nil, fmt.Errorf("diplomail: entitlement lookup: %w", err)
}
if !paid {
return Message{}, nil, fmt.Errorf("%w: in-game broadcast requires a paid tier", ErrForbidden)
}
sender, err := s.deps.Memberships.GetActiveMembership(ctx, in.GameID, in.SenderUserID)
if err != nil {
if errors.Is(err, ErrNotFound) {
return Message{}, nil, fmt.Errorf("%w: sender is not an active member of the game", ErrForbidden)
}
return Message{}, nil, fmt.Errorf("diplomail: load sender membership: %w", err)
}
members, err := s.deps.Memberships.ListMembers(ctx, in.GameID, RecipientScopeActive)
if err != nil {
return Message{}, nil, fmt.Errorf("diplomail: list active members: %w", err)
}
callerID := in.SenderUserID
members = filterOutCaller(members, &callerID)
if len(members) == 0 {
return Message{}, nil, fmt.Errorf("%w: no other active members in this game", ErrInvalidInput)
}
username := sender.UserName
msgInsert := MessageInsert{
MessageID: uuid.New(),
GameID: in.GameID,
GameName: sender.GameName,
Kind: KindPersonal,
SenderKind: SenderKindPlayer,
SenderUserID: &callerID,
SenderUsername: &username,
SenderIP: in.SenderIP,
Subject: subject,
Body: body,
BodyLang: LangUndetermined,
BroadcastScope: BroadcastScopeGameBroadcast,
}
rcptInserts := make([]RecipientInsert, 0, len(members))
for _, m := range members {
rcptInserts = append(rcptInserts, buildRecipientInsert(msgInsert.MessageID, m))
}
msg, recipients, err := s.deps.Store.InsertMessageWithRecipients(ctx, msgInsert, rcptInserts)
if err != nil {
return Message{}, nil, fmt.Errorf("diplomail: send player broadcast: %w", err)
}
for _, r := range recipients {
s.publishMessageReceived(ctx, msg, r)
}
return msg, recipients, nil
}
// SendAdminMultiGameBroadcast emits one admin-kind message per game
// resolved from the input scope and fans out the push events. A
// recipient who plays in multiple addressed games receives one
// independently-deletable inbox entry per game; this avoids cross-
// game leakage of admin context and keeps the per-game unread badge
// honest.
func (s *Service) SendAdminMultiGameBroadcast(ctx context.Context, in SendMultiGameBroadcastInput) ([]Message, int, error) {
subject, body, err := s.prepareContent(in.Subject, in.Body)
if err != nil {
return nil, 0, err
}
if err := validateCaller(CallerKindAdmin, nil, in.CallerUsername); err != nil {
return nil, 0, err
}
scope, err := normaliseScope(in.RecipientScope)
if err != nil {
return nil, 0, err
}
if s.deps.Games == nil {
return nil, 0, fmt.Errorf("%w: game lookup is not wired", ErrInvalidInput)
}
games, err := s.resolveMultiGameTargets(ctx, in)
if err != nil {
return nil, 0, err
}
if len(games) == 0 {
return nil, 0, fmt.Errorf("%w: no games match the broadcast scope", ErrInvalidInput)
}
totalRecipients := 0
out := make([]Message, 0, len(games))
for _, game := range games {
members, err := s.deps.Memberships.ListMembers(ctx, game.GameID, scope)
if err != nil {
return nil, 0, fmt.Errorf("diplomail: list members for %s: %w", game.GameID, err)
}
if len(members) == 0 {
s.deps.Logger.Debug("multi-game broadcast skips empty game",
zap.String("game_id", game.GameID.String()),
zap.String("scope", scope))
continue
}
msgInsert, err := s.buildAdminMessageInsert(CallerKindAdmin, nil, in.CallerUsername,
game.GameID, game.GameName, subject, body, in.SenderIP, BroadcastScopeMultiGameBroadcast)
if err != nil {
return nil, 0, err
}
rcptInserts := make([]RecipientInsert, 0, len(members))
for _, m := range members {
rcptInserts = append(rcptInserts, buildRecipientInsert(msgInsert.MessageID, m))
}
msg, recipients, err := s.deps.Store.InsertMessageWithRecipients(ctx, msgInsert, rcptInserts)
if err != nil {
return nil, 0, fmt.Errorf("diplomail: insert multi-game broadcast for %s: %w", game.GameID, err)
}
for _, r := range recipients {
s.publishMessageReceived(ctx, msg, r)
}
out = append(out, msg)
totalRecipients += len(recipients)
}
return out, totalRecipients, nil
}
func (s *Service) resolveMultiGameTargets(ctx context.Context, in SendMultiGameBroadcastInput) ([]GameSnapshot, error) {
switch in.Scope {
case MultiGameScopeAllRunning:
games, err := s.deps.Games.ListRunningGames(ctx)
if err != nil {
return nil, fmt.Errorf("diplomail: list running games: %w", err)
}
return games, nil
case MultiGameScopeSelected, "":
if len(in.GameIDs) == 0 {
return nil, fmt.Errorf("%w: selected scope requires game_ids", ErrInvalidInput)
}
out := make([]GameSnapshot, 0, len(in.GameIDs))
for _, id := range in.GameIDs {
game, err := s.deps.Games.GetGame(ctx, id)
if err != nil {
if errors.Is(err, ErrNotFound) {
return nil, fmt.Errorf("%w: game %s not found", ErrInvalidInput, id)
}
return nil, fmt.Errorf("diplomail: load game %s: %w", id, err)
}
out = append(out, game)
}
return out, nil
default:
return nil, fmt.Errorf("%w: unknown multi-game scope %q", ErrInvalidInput, in.Scope)
}
}
// BulkCleanup deletes every diplomail_messages row tied to games that
// finished more than `OlderThanYears` years ago. Returns the affected
// game ids and the count of removed messages. The minimum allowed
// value is 1 year — finer-grained pruning would risk wiping live
// arbitration evidence.
func (s *Service) BulkCleanup(ctx context.Context, in BulkCleanupInput) (CleanupResult, error) {
if in.OlderThanYears < 1 {
return CleanupResult{}, fmt.Errorf("%w: older_than_years must be >= 1", ErrInvalidInput)
}
if s.deps.Games == nil {
return CleanupResult{}, fmt.Errorf("%w: game lookup is not wired", ErrInvalidInput)
}
cutoff := s.nowUTC().AddDate(-in.OlderThanYears, 0, 0)
games, err := s.deps.Games.ListFinishedGamesBefore(ctx, cutoff)
if err != nil {
return CleanupResult{}, fmt.Errorf("diplomail: list finished games: %w", err)
}
if len(games) == 0 {
return CleanupResult{}, nil
}
gameIDs := make([]uuid.UUID, 0, len(games))
for _, g := range games {
gameIDs = append(gameIDs, g.GameID)
}
deleted, err := s.deps.Store.DeleteMessagesForGames(ctx, gameIDs)
if err != nil {
return CleanupResult{}, fmt.Errorf("diplomail: bulk delete: %w", err)
}
return CleanupResult{GameIDs: gameIDs, MessagesDeleted: deleted}, nil
}
// ListMessagesForAdmin returns a paginated, optionally-filtered view
// of every persisted message. Used by the admin observability
// endpoint to inspect what has been sent and trace abuse reports.
func (s *Service) ListMessagesForAdmin(ctx context.Context, filter AdminMessageListing) (AdminMessagePage, error) {
rows, total, err := s.deps.Store.ListMessagesForAdmin(ctx, filter)
if err != nil {
return AdminMessagePage{}, err
}
page := filter.Page
if page < 1 {
page = 1
}
pageSize := filter.PageSize
if pageSize < 1 {
pageSize = 50
}
return AdminMessagePage{
Items: rows,
Total: total,
Page: page,
PageSize: pageSize,
}, nil
}
// PublishLifecycle persists a system-kind message in response to a
// lobby lifecycle transition and fan-outs push events to the
// affected recipients. Game-scoped transitions (`game.paused`,