Files
galaxy-game/backend/internal/diplomail/store.go
T
Ilia Denisov 57d2286f5e
Tests · Go / test (push) Successful in 2m5s
Tests · Go / test (pull_request) Successful in 2m10s
Tests · Integration / integration (pull_request) Successful in 1m54s
Tests · UI / test (pull_request) Successful in 2m53s
Phase 28 (Step 3a): /sent returns full message detail per recipient
Phase 28's in-game mail UI threads sent messages by the recipient
race name, so the bulk `/sent` endpoint now returns the same
`UserMailMessageDetail` shape as `/inbox` — single sends contribute
one row per message, broadcasts contribute one row per addressee
and the UI collapses them by `message_id` into a stand-alone item.

- `Store.ListSent` / `Service.ListSent` switched from `[]Message`
  to `[]InboxEntry`. SQL grows an INNER JOIN with
  `diplomail_recipients`.
- Handler emits `userMailMessageDetailWire` items; the deprecated
  `userMailSentSummaryWire` is removed.
- `openapi.yaml`: `UserMailSentList.items` now reference
  `UserMailMessageDetail`; the standalone `UserMailSentSummary`
  schema is dropped.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-15 22:27:39 +02:00

823 lines
28 KiB
Go

package diplomail
import (
"context"
"database/sql"
"errors"
"fmt"
"time"
"galaxy/backend/internal/postgres/jet/backend/model"
"galaxy/backend/internal/postgres/jet/backend/table"
"github.com/go-jet/jet/v2/postgres"
"github.com/go-jet/jet/v2/qrm"
"github.com/google/uuid"
)
// Store is the Postgres-backed query surface for the diplomail
// package. All queries are built through go-jet against the generated
// table bindings under `backend/internal/postgres/jet/backend/table`.
type Store struct {
db *sql.DB
}
// NewStore constructs a Store wrapping db.
func NewStore(db *sql.DB) *Store { return &Store{db: db} }
// messageColumns is the canonical projection for diplomail_messages
// reads.
func messageColumns() postgres.ColumnList {
m := table.DiplomailMessages
return postgres.ColumnList{
m.MessageID, m.GameID, m.GameName, m.Kind, m.SenderKind,
m.SenderUserID, m.SenderUsername, m.SenderRaceName, m.SenderIP,
m.Subject, m.Body, m.BodyLang, m.BroadcastScope, m.CreatedAt,
}
}
// recipientColumns is the canonical projection for
// diplomail_recipients reads.
func recipientColumns() postgres.ColumnList {
r := table.DiplomailRecipients
return postgres.ColumnList{
r.RecipientID, r.MessageID, r.GameID, r.UserID,
r.RecipientUserName, r.RecipientRaceName, r.RecipientPreferredLanguage,
r.AvailableAt, r.TranslationAttempts, r.NextTranslationAttemptAt,
r.DeliveredAt, r.ReadAt, r.DeletedAt, r.NotifiedAt,
}
}
// MessageInsert carries the immutable per-message fields. The store
// fills MessageID, sets CreatedAt to `now()` via the column default,
// and leaves recipient-side state to InsertRecipient.
type MessageInsert struct {
MessageID uuid.UUID
GameID uuid.UUID
GameName string
Kind string
SenderKind string
SenderUserID *uuid.UUID
SenderUsername *string
SenderRaceName *string
SenderIP string
Subject string
Body string
BodyLang string
BroadcastScope string
}
// RecipientInsert carries the per-recipient snapshot. AvailableAt
// captures the async-delivery contract: when non-nil, the recipient
// row is materialised already-delivered (no translation needed or
// the language matches); when nil, the recipient is queued for the
// translation worker.
type RecipientInsert struct {
RecipientID uuid.UUID
MessageID uuid.UUID
GameID uuid.UUID
UserID uuid.UUID
RecipientUserName string
RecipientRaceName *string
RecipientPreferredLanguage string
AvailableAt *time.Time
}
// InsertMessageWithRecipients persists a Message together with one or
// more Recipient rows inside a single transaction. The function is
// the canonical write path for every send variant: Stage A passes a
// single-element slice; later stages reuse the same path for
// broadcasts.
func (s *Store) InsertMessageWithRecipients(ctx context.Context, msg MessageInsert, recipients []RecipientInsert) (Message, []Recipient, error) {
if len(recipients) == 0 {
return Message{}, nil, errors.New("diplomail store: at least one recipient required")
}
tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
return Message{}, nil, fmt.Errorf("diplomail store: begin tx: %w", err)
}
defer func() { _ = tx.Rollback() }()
m := table.DiplomailMessages
msgStmt := m.INSERT(
m.MessageID, m.GameID, m.GameName, m.Kind, m.SenderKind,
m.SenderUserID, m.SenderUsername, m.SenderRaceName, m.SenderIP,
m.Subject, m.Body, m.BodyLang, m.BroadcastScope,
).VALUES(
msg.MessageID,
msg.GameID,
msg.GameName,
msg.Kind,
msg.SenderKind,
uuidPtrArg(msg.SenderUserID),
stringPtrArg(msg.SenderUsername),
stringPtrArg(msg.SenderRaceName),
msg.SenderIP,
msg.Subject,
msg.Body,
msg.BodyLang,
msg.BroadcastScope,
).RETURNING(messageColumns())
var msgRow model.DiplomailMessages
if err := msgStmt.QueryContext(ctx, tx, &msgRow); err != nil {
return Message{}, nil, fmt.Errorf("diplomail store: insert message: %w", err)
}
r := table.DiplomailRecipients
rcptStmt := r.INSERT(
r.RecipientID, r.MessageID, r.GameID, r.UserID,
r.RecipientUserName, r.RecipientRaceName,
r.RecipientPreferredLanguage, r.AvailableAt,
)
for _, in := range recipients {
rcptStmt = rcptStmt.VALUES(
in.RecipientID,
in.MessageID,
in.GameID,
in.UserID,
in.RecipientUserName,
stringPtrArg(in.RecipientRaceName),
in.RecipientPreferredLanguage,
timePtrArg(in.AvailableAt),
)
}
rcptStmt = rcptStmt.RETURNING(recipientColumns())
var rcptRows []model.DiplomailRecipients
if err := rcptStmt.QueryContext(ctx, tx, &rcptRows); err != nil {
return Message{}, nil, fmt.Errorf("diplomail store: insert recipients: %w", err)
}
if err := tx.Commit(); err != nil {
return Message{}, nil, fmt.Errorf("diplomail store: commit: %w", err)
}
return messageFromModel(msgRow), recipientsFromModel(rcptRows), nil
}
// LoadMessage returns the Message row identified by messageID. The
// function is used by readers that already verified recipient
// authorisation; callers that need both the message and the
// recipient's per-user state should use LoadInboxEntry.
func (s *Store) LoadMessage(ctx context.Context, messageID uuid.UUID) (Message, error) {
m := table.DiplomailMessages
stmt := postgres.SELECT(messageColumns()).
FROM(m).
WHERE(m.MessageID.EQ(postgres.UUID(messageID))).
LIMIT(1)
var row model.DiplomailMessages
if err := stmt.QueryContext(ctx, s.db, &row); err != nil {
if errors.Is(err, qrm.ErrNoRows) {
return Message{}, ErrNotFound
}
return Message{}, fmt.Errorf("diplomail store: load message %s: %w", messageID, err)
}
return messageFromModel(row), nil
}
// LoadInboxEntry returns a Message together with the caller's
// Recipient row, both for messageID. Returns ErrNotFound when the
// caller is not a recipient of the message — this is also how the
// service layer enforces "only recipients may read".
func (s *Store) LoadInboxEntry(ctx context.Context, messageID, userID uuid.UUID) (InboxEntry, error) {
m := table.DiplomailMessages
r := table.DiplomailRecipients
cols := append(messageColumns(), recipientColumns()...)
stmt := postgres.SELECT(cols).
FROM(r.INNER_JOIN(m, m.MessageID.EQ(r.MessageID))).
WHERE(
r.MessageID.EQ(postgres.UUID(messageID)).
AND(r.UserID.EQ(postgres.UUID(userID))),
).
LIMIT(1)
var dest struct {
model.DiplomailMessages
Recipient model.DiplomailRecipients `alias:"diplomail_recipients"`
}
if err := stmt.QueryContext(ctx, s.db, &dest); err != nil {
if errors.Is(err, qrm.ErrNoRows) {
return InboxEntry{}, ErrNotFound
}
return InboxEntry{}, fmt.Errorf("diplomail store: load inbox entry %s/%s: %w", messageID, userID, err)
}
return InboxEntry{
Message: messageFromModel(dest.DiplomailMessages),
Recipient: recipientFromModel(dest.Recipient),
}, nil
}
// ListInbox returns the recipient view of messages addressed to
// userID in gameID, newest first. Soft-deleted rows
// (`deleted_at IS NOT NULL`) are excluded. Rows still waiting for
// the async translation worker (`available_at IS NULL`) are also
// excluded — they will appear once delivery is complete.
func (s *Store) ListInbox(ctx context.Context, gameID, userID uuid.UUID) ([]InboxEntry, error) {
m := table.DiplomailMessages
r := table.DiplomailRecipients
cols := append(messageColumns(), recipientColumns()...)
stmt := postgres.SELECT(cols).
FROM(r.INNER_JOIN(m, m.MessageID.EQ(r.MessageID))).
WHERE(
r.UserID.EQ(postgres.UUID(userID)).
AND(r.GameID.EQ(postgres.UUID(gameID))).
AND(r.DeletedAt.IS_NULL()).
AND(r.AvailableAt.IS_NOT_NULL()),
).
ORDER_BY(m.CreatedAt.DESC(), m.MessageID.DESC())
var dest []struct {
model.DiplomailMessages
Recipient model.DiplomailRecipients `alias:"diplomail_recipients"`
}
if err := stmt.QueryContext(ctx, s.db, &dest); err != nil {
return nil, fmt.Errorf("diplomail store: list inbox %s/%s: %w", gameID, userID, err)
}
out := make([]InboxEntry, 0, len(dest))
for _, row := range dest {
out = append(out, InboxEntry{
Message: messageFromModel(row.DiplomailMessages),
Recipient: recipientFromModel(row.Recipient),
})
}
return out, nil
}
// ListSent returns the sender-side view of personal messages
// authored by senderUserID in gameID, newest first. Each
// `InboxEntry` carries the message together with one of its
// recipient rows — single sends produce one entry per message;
// game broadcasts produce one entry per addressee (the in-game
// mail UI collapses broadcast entries into a single stand-alone
// item by `message_id`). Admin / system rows have
// `sender_user_id IS NULL` and are excluded by the WHERE clause.
func (s *Store) ListSent(ctx context.Context, gameID, senderUserID uuid.UUID) ([]InboxEntry, error) {
m := table.DiplomailMessages
r := table.DiplomailRecipients
cols := append(messageColumns(), recipientColumns()...)
stmt := postgres.SELECT(cols).
FROM(m.INNER_JOIN(r, r.MessageID.EQ(m.MessageID))).
WHERE(
m.GameID.EQ(postgres.UUID(gameID)).
AND(m.SenderUserID.EQ(postgres.UUID(senderUserID))),
).
ORDER_BY(m.CreatedAt.DESC(), m.MessageID.DESC(), r.RecipientID.ASC())
var dest []struct {
model.DiplomailMessages
Recipient model.DiplomailRecipients `alias:"diplomail_recipients"`
}
if err := stmt.QueryContext(ctx, s.db, &dest); err != nil {
return nil, fmt.Errorf("diplomail store: list sent %s/%s: %w", gameID, senderUserID, err)
}
out := make([]InboxEntry, 0, len(dest))
for _, row := range dest {
out = append(out, InboxEntry{
Message: messageFromModel(row.DiplomailMessages),
Recipient: recipientFromModel(row.Recipient),
})
}
return out, nil
}
// MarkRead sets `read_at = at` on the recipient row identified by
// (messageID, userID). Idempotent: a row that is already marked read
// is left untouched but the existing Recipient is returned.
// Returns ErrNotFound when the user is not a recipient of the message.
func (s *Store) MarkRead(ctx context.Context, messageID, userID uuid.UUID, at time.Time) (Recipient, error) {
r := table.DiplomailRecipients
stmt := r.UPDATE(r.ReadAt).
SET(postgres.TimestampzT(at.UTC())).
WHERE(
r.MessageID.EQ(postgres.UUID(messageID)).
AND(r.UserID.EQ(postgres.UUID(userID))).
AND(r.ReadAt.IS_NULL()),
).
RETURNING(recipientColumns())
var row model.DiplomailRecipients
if err := stmt.QueryContext(ctx, s.db, &row); err != nil {
if !errors.Is(err, qrm.ErrNoRows) {
return Recipient{}, fmt.Errorf("diplomail store: mark read %s/%s: %w", messageID, userID, err)
}
// The row exists but read_at was already set, or the row
// does not exist at all. Fetch to disambiguate.
existing, loadErr := s.LoadRecipient(ctx, messageID, userID)
if loadErr != nil {
return Recipient{}, loadErr
}
return existing, nil
}
return recipientFromModel(row), nil
}
// SoftDelete sets `deleted_at = at` on the recipient row identified by
// (messageID, userID). The row must already have `read_at` set;
// otherwise the call returns ErrConflict so a hostile client cannot
// erase a message before opening it (item 10 of the spec).
// Returns ErrNotFound when the user is not a recipient.
func (s *Store) SoftDelete(ctx context.Context, messageID, userID uuid.UUID, at time.Time) (Recipient, error) {
r := table.DiplomailRecipients
stmt := r.UPDATE(r.DeletedAt).
SET(postgres.TimestampzT(at.UTC())).
WHERE(
r.MessageID.EQ(postgres.UUID(messageID)).
AND(r.UserID.EQ(postgres.UUID(userID))).
AND(r.ReadAt.IS_NOT_NULL()).
AND(r.DeletedAt.IS_NULL()),
).
RETURNING(recipientColumns())
var row model.DiplomailRecipients
if err := stmt.QueryContext(ctx, s.db, &row); err != nil {
if !errors.Is(err, qrm.ErrNoRows) {
return Recipient{}, fmt.Errorf("diplomail store: soft delete %s/%s: %w", messageID, userID, err)
}
existing, loadErr := s.LoadRecipient(ctx, messageID, userID)
if loadErr != nil {
return Recipient{}, loadErr
}
if existing.ReadAt == nil {
return Recipient{}, fmt.Errorf("%w: message must be read before delete", ErrConflict)
}
// Already deleted: return the existing row idempotently.
return existing, nil
}
return recipientFromModel(row), nil
}
// LoadRecipient fetches the Recipient row keyed on (messageID, userID).
// Returns ErrNotFound when no such recipient exists.
func (s *Store) LoadRecipient(ctx context.Context, messageID, userID uuid.UUID) (Recipient, error) {
r := table.DiplomailRecipients
stmt := postgres.SELECT(recipientColumns()).
FROM(r).
WHERE(
r.MessageID.EQ(postgres.UUID(messageID)).
AND(r.UserID.EQ(postgres.UUID(userID))),
).
LIMIT(1)
var row model.DiplomailRecipients
if err := stmt.QueryContext(ctx, s.db, &row); err != nil {
if errors.Is(err, qrm.ErrNoRows) {
return Recipient{}, ErrNotFound
}
return Recipient{}, fmt.Errorf("diplomail store: load recipient %s/%s: %w", messageID, userID, err)
}
return recipientFromModel(row), nil
}
// UnreadCountForUserGame returns the count of unread, non-deleted,
// delivered messages addressed to userID in gameID. Recipients
// still waiting for translation (`available_at IS NULL`) are
// excluded so the badge does not flicker.
func (s *Store) UnreadCountForUserGame(ctx context.Context, gameID, userID uuid.UUID) (int, error) {
r := table.DiplomailRecipients
stmt := postgres.SELECT(postgres.COUNT(postgres.STAR).AS("count")).
FROM(r).
WHERE(
r.UserID.EQ(postgres.UUID(userID)).
AND(r.GameID.EQ(postgres.UUID(gameID))).
AND(r.ReadAt.IS_NULL()).
AND(r.DeletedAt.IS_NULL()).
AND(r.AvailableAt.IS_NOT_NULL()),
)
var dest struct {
Count int64 `alias:"count"`
}
if err := stmt.QueryContext(ctx, s.db, &dest); err != nil {
return 0, fmt.Errorf("diplomail store: unread count %s/%s: %w", gameID, userID, err)
}
return int(dest.Count), nil
}
// PendingTranslationPair carries one unit of work picked by the
// translation worker. Multiple recipients of the same message that
// share a preferred_language collapse into one pair, because the
// translation is shared via the diplomail_translations cache.
// CurrentAttempts is the highest `translation_attempts` value across
// the matching recipient rows, so the worker can decide whether the
// next attempt is the last one before falling back.
type PendingTranslationPair struct {
MessageID uuid.UUID
TargetLang string
CurrentAttempts int32
}
// PickPendingTranslationPair returns one pair eligible for the
// translation worker, or `ok == false` when the queue is empty. The
// pair is the (message, target_lang) of any recipient where
// `available_at IS NULL` and `next_translation_attempt_at` is either
// unset or already due. The query intentionally drops the
// `FOR UPDATE` clause — the worker is single-threaded per process,
// and the optimistic UPDATE in `MarkPairDelivered` /
// `MarkPairFallback` filters by `available_at IS NULL`, so a stale
// pickup never delivers twice.
func (s *Store) PickPendingTranslationPair(ctx context.Context, now time.Time) (PendingTranslationPair, bool, error) {
r := table.DiplomailRecipients
stmt := postgres.SELECT(
r.MessageID.AS("message_id"),
r.RecipientPreferredLanguage.AS("target_lang"),
postgres.MAX(r.TranslationAttempts).AS("attempts"),
).
FROM(r).
WHERE(
r.AvailableAt.IS_NULL().
AND(r.RecipientPreferredLanguage.NOT_EQ(postgres.String(""))).
AND(r.NextTranslationAttemptAt.IS_NULL().
OR(r.NextTranslationAttemptAt.LT_EQ(postgres.TimestampzT(now.UTC())))),
).
GROUP_BY(r.MessageID, r.RecipientPreferredLanguage).
ORDER_BY(r.MessageID.ASC(), r.RecipientPreferredLanguage.ASC()).
LIMIT(1)
var dest struct {
MessageID uuid.UUID `alias:"message_id"`
TargetLang string `alias:"target_lang"`
Attempts int32 `alias:"attempts"`
}
if err := stmt.QueryContext(ctx, s.db, &dest); err != nil {
if errors.Is(err, qrm.ErrNoRows) {
return PendingTranslationPair{}, false, nil
}
return PendingTranslationPair{}, false, fmt.Errorf("diplomail store: pick pending pair: %w", err)
}
if dest.MessageID == (uuid.UUID{}) {
return PendingTranslationPair{}, false, nil
}
return PendingTranslationPair{
MessageID: dest.MessageID,
TargetLang: dest.TargetLang,
CurrentAttempts: dest.Attempts,
}, true, nil
}
// MarkPairDelivered flips every still-pending recipient of (messageID,
// targetLang) to `available_at = at`, optionally persisting the
// translation row alongside in the same transaction. Returns the
// recipients that were just delivered (used by the worker to fan out
// push events).
func (s *Store) MarkPairDelivered(ctx context.Context, messageID uuid.UUID, targetLang string, translation *Translation, at time.Time) ([]Recipient, error) {
tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
return nil, fmt.Errorf("diplomail store: begin deliver tx: %w", err)
}
defer func() { _ = tx.Rollback() }()
if translation != nil {
t := table.DiplomailTranslations
ins := t.INSERT(
t.TranslationID, t.MessageID, t.TargetLang,
t.TranslatedSubject, t.TranslatedBody, t.Translator,
).VALUES(
translation.TranslationID, translation.MessageID, translation.TargetLang,
translation.TranslatedSubject, translation.TranslatedBody, translation.Translator,
).ON_CONFLICT(t.MessageID, t.TargetLang).DO_NOTHING()
if _, err := ins.ExecContext(ctx, tx); err != nil {
return nil, fmt.Errorf("diplomail store: upsert translation: %w", err)
}
}
r := table.DiplomailRecipients
upd := r.UPDATE(r.AvailableAt, r.NextTranslationAttemptAt).
SET(postgres.TimestampzT(at.UTC()), postgres.NULL).
WHERE(
r.MessageID.EQ(postgres.UUID(messageID)).
AND(r.RecipientPreferredLanguage.EQ(postgres.String(targetLang))).
AND(r.AvailableAt.IS_NULL()),
).
RETURNING(recipientColumns())
var rows []model.DiplomailRecipients
if err := upd.QueryContext(ctx, tx, &rows); err != nil {
return nil, fmt.Errorf("diplomail store: mark pair delivered: %w", err)
}
if err := tx.Commit(); err != nil {
return nil, fmt.Errorf("diplomail store: commit deliver: %w", err)
}
out := make([]Recipient, 0, len(rows))
for _, row := range rows {
out = append(out, recipientFromModel(row))
}
return out, nil
}
// SchedulePairRetry bumps the attempt counter and schedules the next
// translation attempt for `next`. The recipient rows stay in the
// pending queue (`available_at IS NULL`). Returns the new attempt
// counter so the worker can decide whether to fall back to the
// original on the next pickup.
func (s *Store) SchedulePairRetry(ctx context.Context, messageID uuid.UUID, targetLang string, next time.Time) (int32, error) {
r := table.DiplomailRecipients
upd := r.UPDATE(r.TranslationAttempts, r.NextTranslationAttemptAt).
SET(r.TranslationAttempts.ADD(postgres.Int(1)), postgres.TimestampzT(next.UTC())).
WHERE(
r.MessageID.EQ(postgres.UUID(messageID)).
AND(r.RecipientPreferredLanguage.EQ(postgres.String(targetLang))).
AND(r.AvailableAt.IS_NULL()),
).
RETURNING(r.TranslationAttempts)
var dest []struct {
TranslationAttempts int32 `alias:"diplomail_recipients.translation_attempts"`
}
if err := upd.QueryContext(ctx, s.db, &dest); err != nil {
return 0, fmt.Errorf("diplomail store: schedule pair retry: %w", err)
}
if len(dest) == 0 {
return 0, nil
}
max := dest[0].TranslationAttempts
for _, d := range dest[1:] {
if d.TranslationAttempts > max {
max = d.TranslationAttempts
}
}
return max, nil
}
// translationColumns is the canonical projection for
// diplomail_translations reads.
func translationColumns() postgres.ColumnList {
t := table.DiplomailTranslations
return postgres.ColumnList{
t.TranslationID, t.MessageID, t.TargetLang,
t.TranslatedSubject, t.TranslatedBody, t.Translator, t.TranslatedAt,
}
}
// LoadTranslation returns the cached translation row for
// (messageID, targetLang). Returns ErrNotFound when no cache row
// exists yet — the caller decides whether to compute and persist
// one.
func (s *Store) LoadTranslation(ctx context.Context, messageID uuid.UUID, targetLang string) (Translation, error) {
t := table.DiplomailTranslations
stmt := postgres.SELECT(translationColumns()).
FROM(t).
WHERE(t.MessageID.EQ(postgres.UUID(messageID)).
AND(t.TargetLang.EQ(postgres.String(targetLang)))).
LIMIT(1)
var row model.DiplomailTranslations
if err := stmt.QueryContext(ctx, s.db, &row); err != nil {
if errors.Is(err, qrm.ErrNoRows) {
return Translation{}, ErrNotFound
}
return Translation{}, fmt.Errorf("diplomail store: load translation %s/%s: %w", messageID, targetLang, err)
}
return translationFromModel(row), nil
}
// InsertTranslation persists a new translation cache row. The unique
// constraint on (message_id, target_lang) prevents duplicate
// renderings. Callers that race on the same (message, lang) pair
// should be prepared for a UNIQUE violation; the second writer can
// fall back to LoadTranslation.
func (s *Store) InsertTranslation(ctx context.Context, in Translation) (Translation, error) {
t := table.DiplomailTranslations
stmt := t.INSERT(
t.TranslationID, t.MessageID, t.TargetLang,
t.TranslatedSubject, t.TranslatedBody, t.Translator,
).VALUES(
in.TranslationID, in.MessageID, in.TargetLang,
in.TranslatedSubject, in.TranslatedBody, in.Translator,
).RETURNING(translationColumns())
var row model.DiplomailTranslations
if err := stmt.QueryContext(ctx, s.db, &row); err != nil {
return Translation{}, fmt.Errorf("diplomail store: insert translation %s/%s: %w", in.MessageID, in.TargetLang, err)
}
return translationFromModel(row), nil
}
func translationFromModel(row model.DiplomailTranslations) Translation {
return Translation{
TranslationID: row.TranslationID,
MessageID: row.MessageID,
TargetLang: row.TargetLang,
TranslatedSubject: row.TranslatedSubject,
TranslatedBody: row.TranslatedBody,
Translator: row.Translator,
TranslatedAt: row.TranslatedAt,
}
}
// DeleteMessagesForGames removes every diplomail_messages row whose
// game_id falls in the supplied set. The cascade defined on the
// `diplomail_recipients` and `diplomail_translations` foreign keys
// removes the per-recipient state and the cached translations in
// the same transaction. Returns the count of messages removed.
//
// Used by the admin bulk-purge endpoint; callers are expected to
// have already filtered the input set to terminal-state games.
func (s *Store) DeleteMessagesForGames(ctx context.Context, gameIDs []uuid.UUID) (int, error) {
if len(gameIDs) == 0 {
return 0, nil
}
args := make([]postgres.Expression, 0, len(gameIDs))
for _, id := range gameIDs {
args = append(args, postgres.UUID(id))
}
m := table.DiplomailMessages
stmt := m.DELETE().WHERE(m.GameID.IN(args...))
res, err := stmt.ExecContext(ctx, s.db)
if err != nil {
return 0, fmt.Errorf("diplomail store: bulk delete messages: %w", err)
}
affected, err := res.RowsAffected()
if err != nil {
return 0, fmt.Errorf("diplomail store: rows affected: %w", err)
}
return int(affected), nil
}
// ListMessagesForAdmin returns a paginated slice of messages
// matching filter. The result is ordered by created_at DESC,
// message_id DESC. Total is the count without pagination so the
// caller can render a "page X of N" envelope.
func (s *Store) ListMessagesForAdmin(ctx context.Context, filter AdminMessageListing) ([]Message, int, error) {
m := table.DiplomailMessages
page := filter.Page
if page < 1 {
page = 1
}
pageSize := filter.PageSize
if pageSize < 1 {
pageSize = 50
}
conditions := postgres.BoolExpression(nil)
addCondition := func(cond postgres.BoolExpression) {
if conditions == nil {
conditions = cond
return
}
conditions = conditions.AND(cond)
}
if filter.GameID != nil {
addCondition(m.GameID.EQ(postgres.UUID(*filter.GameID)))
}
if filter.Kind != "" {
addCondition(m.Kind.EQ(postgres.String(filter.Kind)))
}
if filter.SenderKind != "" {
addCondition(m.SenderKind.EQ(postgres.String(filter.SenderKind)))
}
countStmt := postgres.SELECT(postgres.COUNT(postgres.STAR).AS("count")).FROM(m)
if conditions != nil {
countStmt = countStmt.WHERE(conditions)
}
var countDest struct {
Count int64 `alias:"count"`
}
if err := countStmt.QueryContext(ctx, s.db, &countDest); err != nil {
return nil, 0, fmt.Errorf("diplomail store: count admin messages: %w", err)
}
listStmt := postgres.SELECT(messageColumns()).FROM(m)
if conditions != nil {
listStmt = listStmt.WHERE(conditions)
}
listStmt = listStmt.
ORDER_BY(m.CreatedAt.DESC(), m.MessageID.DESC()).
LIMIT(int64(pageSize)).
OFFSET(int64((page - 1) * pageSize))
var rows []model.DiplomailMessages
if err := listStmt.QueryContext(ctx, s.db, &rows); err != nil {
return nil, 0, fmt.Errorf("diplomail store: list admin messages: %w", err)
}
out := make([]Message, 0, len(rows))
for _, row := range rows {
out = append(out, messageFromModel(row))
}
return out, int(countDest.Count), nil
}
// UnreadCountsForUser returns a per-game breakdown of unread messages
// addressed to userID, plus the matching game names so the lobby
// badge UI can render entries even after the recipient's membership
// has been revoked. The slice is ordered by game name.
func (s *Store) UnreadCountsForUser(ctx context.Context, userID uuid.UUID) ([]UnreadCount, error) {
r := table.DiplomailRecipients
m := table.DiplomailMessages
stmt := postgres.SELECT(
r.GameID.AS("game_id"),
postgres.MAX(m.GameName).AS("game_name"),
postgres.COUNT(postgres.STAR).AS("count"),
).
FROM(r.INNER_JOIN(m, m.MessageID.EQ(r.MessageID))).
WHERE(
r.UserID.EQ(postgres.UUID(userID)).
AND(r.ReadAt.IS_NULL()).
AND(r.DeletedAt.IS_NULL()).
AND(r.AvailableAt.IS_NOT_NULL()),
).
GROUP_BY(r.GameID).
ORDER_BY(postgres.MAX(m.GameName).ASC())
var dest []struct {
GameID uuid.UUID `alias:"game_id"`
GameName string `alias:"game_name"`
Count int64 `alias:"count"`
}
if err := stmt.QueryContext(ctx, s.db, &dest); err != nil {
return nil, fmt.Errorf("diplomail store: unread counts %s: %w", userID, err)
}
out := make([]UnreadCount, 0, len(dest))
for _, row := range dest {
out = append(out, UnreadCount{
GameID: row.GameID,
GameName: row.GameName,
Unread: int(row.Count),
})
}
return out, nil
}
// messageFromModel converts a jet-generated row to the domain type.
func messageFromModel(row model.DiplomailMessages) Message {
out := Message{
MessageID: row.MessageID,
GameID: row.GameID,
GameName: row.GameName,
Kind: row.Kind,
SenderKind: row.SenderKind,
SenderIP: row.SenderIP,
Subject: row.Subject,
Body: row.Body,
BodyLang: row.BodyLang,
BroadcastScope: row.BroadcastScope,
CreatedAt: row.CreatedAt,
}
if row.SenderUserID != nil {
id := *row.SenderUserID
out.SenderUserID = &id
}
if row.SenderUsername != nil {
name := *row.SenderUsername
out.SenderUsername = &name
}
if row.SenderRaceName != nil {
name := *row.SenderRaceName
out.SenderRaceName = &name
}
return out
}
// recipientFromModel converts a jet-generated row to the domain type.
func recipientFromModel(row model.DiplomailRecipients) Recipient {
out := Recipient{
RecipientID: row.RecipientID,
MessageID: row.MessageID,
GameID: row.GameID,
UserID: row.UserID,
RecipientUserName: row.RecipientUserName,
RecipientPreferredLanguage: row.RecipientPreferredLanguage,
AvailableAt: row.AvailableAt,
TranslationAttempts: row.TranslationAttempts,
NextTranslationAttemptAt: row.NextTranslationAttemptAt,
DeliveredAt: row.DeliveredAt,
ReadAt: row.ReadAt,
DeletedAt: row.DeletedAt,
NotifiedAt: row.NotifiedAt,
}
if row.RecipientRaceName != nil {
name := *row.RecipientRaceName
out.RecipientRaceName = &name
}
return out
}
// recipientsFromModel converts a slice in place. Used by
// InsertMessageWithRecipients.
func recipientsFromModel(rows []model.DiplomailRecipients) []Recipient {
out := make([]Recipient, 0, len(rows))
for _, row := range rows {
out = append(out, recipientFromModel(row))
}
return out
}
// uuidPtrArg returns the jet argument expression for a nullable UUID.
// Pre-NULL handling here avoids a custom NULL literal at every call
// site.
func uuidPtrArg(v *uuid.UUID) postgres.Expression {
if v == nil {
return postgres.NULL
}
return postgres.UUID(*v)
}
// stringPtrArg returns the jet argument expression for a nullable
// text column.
func stringPtrArg(v *string) postgres.Expression {
if v == nil {
return postgres.NULL
}
return postgres.String(*v)
}
// timePtrArg returns the jet argument expression for a nullable
// timestamptz column.
func timePtrArg(v *time.Time) postgres.Expression {
if v == nil {
return postgres.NULL
}
return postgres.TimestampzT(v.UTC())
}