package diplomail import ( "context" "database/sql" "errors" "fmt" "time" "galaxy/backend/internal/postgres/jet/backend/model" "galaxy/backend/internal/postgres/jet/backend/table" "github.com/go-jet/jet/v2/postgres" "github.com/go-jet/jet/v2/qrm" "github.com/google/uuid" ) // Store is the Postgres-backed query surface for the diplomail // package. All queries are built through go-jet against the generated // table bindings under `backend/internal/postgres/jet/backend/table`. type Store struct { db *sql.DB } // NewStore constructs a Store wrapping db. func NewStore(db *sql.DB) *Store { return &Store{db: db} } // messageColumns is the canonical projection for diplomail_messages // reads. func messageColumns() postgres.ColumnList { m := table.DiplomailMessages return postgres.ColumnList{ m.MessageID, m.GameID, m.GameName, m.Kind, m.SenderKind, m.SenderUserID, m.SenderUsername, m.SenderIP, m.Subject, m.Body, m.BodyLang, m.BroadcastScope, m.CreatedAt, } } // recipientColumns is the canonical projection for // diplomail_recipients reads. func recipientColumns() postgres.ColumnList { r := table.DiplomailRecipients return postgres.ColumnList{ r.RecipientID, r.MessageID, r.GameID, r.UserID, r.RecipientUserName, r.RecipientRaceName, r.RecipientPreferredLanguage, r.AvailableAt, r.TranslationAttempts, r.NextTranslationAttemptAt, r.DeliveredAt, r.ReadAt, r.DeletedAt, r.NotifiedAt, } } // MessageInsert carries the immutable per-message fields. The store // fills MessageID, sets CreatedAt to `now()` via the column default, // and leaves recipient-side state to InsertRecipient. type MessageInsert struct { MessageID uuid.UUID GameID uuid.UUID GameName string Kind string SenderKind string SenderUserID *uuid.UUID SenderUsername *string SenderIP string Subject string Body string BodyLang string BroadcastScope string } // RecipientInsert carries the per-recipient snapshot. AvailableAt // captures the async-delivery contract: when non-nil, the recipient // row is materialised already-delivered (no translation needed or // the language matches); when nil, the recipient is queued for the // translation worker. type RecipientInsert struct { RecipientID uuid.UUID MessageID uuid.UUID GameID uuid.UUID UserID uuid.UUID RecipientUserName string RecipientRaceName *string RecipientPreferredLanguage string AvailableAt *time.Time } // InsertMessageWithRecipients persists a Message together with one or // more Recipient rows inside a single transaction. The function is // the canonical write path for every send variant: Stage A passes a // single-element slice; later stages reuse the same path for // broadcasts. func (s *Store) InsertMessageWithRecipients(ctx context.Context, msg MessageInsert, recipients []RecipientInsert) (Message, []Recipient, error) { if len(recipients) == 0 { return Message{}, nil, errors.New("diplomail store: at least one recipient required") } tx, err := s.db.BeginTx(ctx, nil) if err != nil { return Message{}, nil, fmt.Errorf("diplomail store: begin tx: %w", err) } defer func() { _ = tx.Rollback() }() m := table.DiplomailMessages msgStmt := m.INSERT( m.MessageID, m.GameID, m.GameName, m.Kind, m.SenderKind, m.SenderUserID, m.SenderUsername, m.SenderIP, m.Subject, m.Body, m.BodyLang, m.BroadcastScope, ).VALUES( msg.MessageID, msg.GameID, msg.GameName, msg.Kind, msg.SenderKind, uuidPtrArg(msg.SenderUserID), stringPtrArg(msg.SenderUsername), msg.SenderIP, msg.Subject, msg.Body, msg.BodyLang, msg.BroadcastScope, ).RETURNING(messageColumns()) var msgRow model.DiplomailMessages if err := msgStmt.QueryContext(ctx, tx, &msgRow); err != nil { return Message{}, nil, fmt.Errorf("diplomail store: insert message: %w", err) } r := table.DiplomailRecipients rcptStmt := r.INSERT( r.RecipientID, r.MessageID, r.GameID, r.UserID, r.RecipientUserName, r.RecipientRaceName, r.RecipientPreferredLanguage, r.AvailableAt, ) for _, in := range recipients { rcptStmt = rcptStmt.VALUES( in.RecipientID, in.MessageID, in.GameID, in.UserID, in.RecipientUserName, stringPtrArg(in.RecipientRaceName), in.RecipientPreferredLanguage, timePtrArg(in.AvailableAt), ) } rcptStmt = rcptStmt.RETURNING(recipientColumns()) var rcptRows []model.DiplomailRecipients if err := rcptStmt.QueryContext(ctx, tx, &rcptRows); err != nil { return Message{}, nil, fmt.Errorf("diplomail store: insert recipients: %w", err) } if err := tx.Commit(); err != nil { return Message{}, nil, fmt.Errorf("diplomail store: commit: %w", err) } return messageFromModel(msgRow), recipientsFromModel(rcptRows), nil } // LoadMessage returns the Message row identified by messageID. The // function is used by readers that already verified recipient // authorisation; callers that need both the message and the // recipient's per-user state should use LoadInboxEntry. func (s *Store) LoadMessage(ctx context.Context, messageID uuid.UUID) (Message, error) { m := table.DiplomailMessages stmt := postgres.SELECT(messageColumns()). FROM(m). WHERE(m.MessageID.EQ(postgres.UUID(messageID))). LIMIT(1) var row model.DiplomailMessages if err := stmt.QueryContext(ctx, s.db, &row); err != nil { if errors.Is(err, qrm.ErrNoRows) { return Message{}, ErrNotFound } return Message{}, fmt.Errorf("diplomail store: load message %s: %w", messageID, err) } return messageFromModel(row), nil } // LoadInboxEntry returns a Message together with the caller's // Recipient row, both for messageID. Returns ErrNotFound when the // caller is not a recipient of the message — this is also how the // service layer enforces "only recipients may read". func (s *Store) LoadInboxEntry(ctx context.Context, messageID, userID uuid.UUID) (InboxEntry, error) { m := table.DiplomailMessages r := table.DiplomailRecipients cols := append(messageColumns(), recipientColumns()...) stmt := postgres.SELECT(cols). FROM(r.INNER_JOIN(m, m.MessageID.EQ(r.MessageID))). WHERE( r.MessageID.EQ(postgres.UUID(messageID)). AND(r.UserID.EQ(postgres.UUID(userID))), ). LIMIT(1) var dest struct { model.DiplomailMessages Recipient model.DiplomailRecipients `alias:"diplomail_recipients"` } if err := stmt.QueryContext(ctx, s.db, &dest); err != nil { if errors.Is(err, qrm.ErrNoRows) { return InboxEntry{}, ErrNotFound } return InboxEntry{}, fmt.Errorf("diplomail store: load inbox entry %s/%s: %w", messageID, userID, err) } return InboxEntry{ Message: messageFromModel(dest.DiplomailMessages), Recipient: recipientFromModel(dest.Recipient), }, nil } // ListInbox returns the recipient view of messages addressed to // userID in gameID, newest first. Soft-deleted rows // (`deleted_at IS NOT NULL`) are excluded. Rows still waiting for // the async translation worker (`available_at IS NULL`) are also // excluded — they will appear once delivery is complete. func (s *Store) ListInbox(ctx context.Context, gameID, userID uuid.UUID) ([]InboxEntry, error) { m := table.DiplomailMessages r := table.DiplomailRecipients cols := append(messageColumns(), recipientColumns()...) stmt := postgres.SELECT(cols). FROM(r.INNER_JOIN(m, m.MessageID.EQ(r.MessageID))). WHERE( r.UserID.EQ(postgres.UUID(userID)). AND(r.GameID.EQ(postgres.UUID(gameID))). AND(r.DeletedAt.IS_NULL()). AND(r.AvailableAt.IS_NOT_NULL()), ). ORDER_BY(m.CreatedAt.DESC(), m.MessageID.DESC()) var dest []struct { model.DiplomailMessages Recipient model.DiplomailRecipients `alias:"diplomail_recipients"` } if err := stmt.QueryContext(ctx, s.db, &dest); err != nil { return nil, fmt.Errorf("diplomail store: list inbox %s/%s: %w", gameID, userID, err) } out := make([]InboxEntry, 0, len(dest)) for _, row := range dest { out = append(out, InboxEntry{ Message: messageFromModel(row.DiplomailMessages), Recipient: recipientFromModel(row.Recipient), }) } return out, nil } // ListSent returns messages authored by senderUserID in gameID, // newest first. Personal messages only — admin/system rows have // `sender_user_id IS NULL` and are filtered out by the WHERE clause. func (s *Store) ListSent(ctx context.Context, gameID, senderUserID uuid.UUID) ([]Message, error) { m := table.DiplomailMessages stmt := postgres.SELECT(messageColumns()). FROM(m). WHERE( m.GameID.EQ(postgres.UUID(gameID)). AND(m.SenderUserID.EQ(postgres.UUID(senderUserID))), ). ORDER_BY(m.CreatedAt.DESC(), m.MessageID.DESC()) var rows []model.DiplomailMessages if err := stmt.QueryContext(ctx, s.db, &rows); err != nil { return nil, fmt.Errorf("diplomail store: list sent %s/%s: %w", gameID, senderUserID, err) } out := make([]Message, 0, len(rows)) for _, row := range rows { out = append(out, messageFromModel(row)) } return out, nil } // MarkRead sets `read_at = at` on the recipient row identified by // (messageID, userID). Idempotent: a row that is already marked read // is left untouched but the existing Recipient is returned. // Returns ErrNotFound when the user is not a recipient of the message. func (s *Store) MarkRead(ctx context.Context, messageID, userID uuid.UUID, at time.Time) (Recipient, error) { r := table.DiplomailRecipients stmt := r.UPDATE(r.ReadAt). SET(postgres.TimestampzT(at.UTC())). WHERE( r.MessageID.EQ(postgres.UUID(messageID)). AND(r.UserID.EQ(postgres.UUID(userID))). AND(r.ReadAt.IS_NULL()), ). RETURNING(recipientColumns()) var row model.DiplomailRecipients if err := stmt.QueryContext(ctx, s.db, &row); err != nil { if !errors.Is(err, qrm.ErrNoRows) { return Recipient{}, fmt.Errorf("diplomail store: mark read %s/%s: %w", messageID, userID, err) } // The row exists but read_at was already set, or the row // does not exist at all. Fetch to disambiguate. existing, loadErr := s.LoadRecipient(ctx, messageID, userID) if loadErr != nil { return Recipient{}, loadErr } return existing, nil } return recipientFromModel(row), nil } // SoftDelete sets `deleted_at = at` on the recipient row identified by // (messageID, userID). The row must already have `read_at` set; // otherwise the call returns ErrConflict so a hostile client cannot // erase a message before opening it (item 10 of the spec). // Returns ErrNotFound when the user is not a recipient. func (s *Store) SoftDelete(ctx context.Context, messageID, userID uuid.UUID, at time.Time) (Recipient, error) { r := table.DiplomailRecipients stmt := r.UPDATE(r.DeletedAt). SET(postgres.TimestampzT(at.UTC())). WHERE( r.MessageID.EQ(postgres.UUID(messageID)). AND(r.UserID.EQ(postgres.UUID(userID))). AND(r.ReadAt.IS_NOT_NULL()). AND(r.DeletedAt.IS_NULL()), ). RETURNING(recipientColumns()) var row model.DiplomailRecipients if err := stmt.QueryContext(ctx, s.db, &row); err != nil { if !errors.Is(err, qrm.ErrNoRows) { return Recipient{}, fmt.Errorf("diplomail store: soft delete %s/%s: %w", messageID, userID, err) } existing, loadErr := s.LoadRecipient(ctx, messageID, userID) if loadErr != nil { return Recipient{}, loadErr } if existing.ReadAt == nil { return Recipient{}, fmt.Errorf("%w: message must be read before delete", ErrConflict) } // Already deleted: return the existing row idempotently. return existing, nil } return recipientFromModel(row), nil } // LoadRecipient fetches the Recipient row keyed on (messageID, userID). // Returns ErrNotFound when no such recipient exists. func (s *Store) LoadRecipient(ctx context.Context, messageID, userID uuid.UUID) (Recipient, error) { r := table.DiplomailRecipients stmt := postgres.SELECT(recipientColumns()). FROM(r). WHERE( r.MessageID.EQ(postgres.UUID(messageID)). AND(r.UserID.EQ(postgres.UUID(userID))), ). LIMIT(1) var row model.DiplomailRecipients if err := stmt.QueryContext(ctx, s.db, &row); err != nil { if errors.Is(err, qrm.ErrNoRows) { return Recipient{}, ErrNotFound } return Recipient{}, fmt.Errorf("diplomail store: load recipient %s/%s: %w", messageID, userID, err) } return recipientFromModel(row), nil } // UnreadCountForUserGame returns the count of unread, non-deleted, // delivered messages addressed to userID in gameID. Recipients // still waiting for translation (`available_at IS NULL`) are // excluded so the badge does not flicker. func (s *Store) UnreadCountForUserGame(ctx context.Context, gameID, userID uuid.UUID) (int, error) { r := table.DiplomailRecipients stmt := postgres.SELECT(postgres.COUNT(postgres.STAR).AS("count")). FROM(r). WHERE( r.UserID.EQ(postgres.UUID(userID)). AND(r.GameID.EQ(postgres.UUID(gameID))). AND(r.ReadAt.IS_NULL()). AND(r.DeletedAt.IS_NULL()). AND(r.AvailableAt.IS_NOT_NULL()), ) var dest struct { Count int64 `alias:"count"` } if err := stmt.QueryContext(ctx, s.db, &dest); err != nil { return 0, fmt.Errorf("diplomail store: unread count %s/%s: %w", gameID, userID, err) } return int(dest.Count), nil } // PendingTranslationPair carries one unit of work picked by the // translation worker. Multiple recipients of the same message that // share a preferred_language collapse into one pair, because the // translation is shared via the diplomail_translations cache. // CurrentAttempts is the highest `translation_attempts` value across // the matching recipient rows, so the worker can decide whether the // next attempt is the last one before falling back. type PendingTranslationPair struct { MessageID uuid.UUID TargetLang string CurrentAttempts int32 } // PickPendingTranslationPair returns one pair eligible for the // translation worker, or `ok == false` when the queue is empty. The // pair is the (message, target_lang) of any recipient where // `available_at IS NULL` and `next_translation_attempt_at` is either // unset or already due. The query intentionally drops the // `FOR UPDATE` clause — the worker is single-threaded per process, // and the optimistic UPDATE in `MarkPairDelivered` / // `MarkPairFallback` filters by `available_at IS NULL`, so a stale // pickup never delivers twice. func (s *Store) PickPendingTranslationPair(ctx context.Context, now time.Time) (PendingTranslationPair, bool, error) { r := table.DiplomailRecipients stmt := postgres.SELECT( r.MessageID.AS("message_id"), r.RecipientPreferredLanguage.AS("target_lang"), postgres.MAX(r.TranslationAttempts).AS("attempts"), ). FROM(r). WHERE( r.AvailableAt.IS_NULL(). AND(r.RecipientPreferredLanguage.NOT_EQ(postgres.String(""))). AND(r.NextTranslationAttemptAt.IS_NULL(). OR(r.NextTranslationAttemptAt.LT_EQ(postgres.TimestampzT(now.UTC())))), ). GROUP_BY(r.MessageID, r.RecipientPreferredLanguage). ORDER_BY(r.MessageID.ASC(), r.RecipientPreferredLanguage.ASC()). LIMIT(1) var dest struct { MessageID uuid.UUID `alias:"message_id"` TargetLang string `alias:"target_lang"` Attempts int32 `alias:"attempts"` } if err := stmt.QueryContext(ctx, s.db, &dest); err != nil { if errors.Is(err, qrm.ErrNoRows) { return PendingTranslationPair{}, false, nil } return PendingTranslationPair{}, false, fmt.Errorf("diplomail store: pick pending pair: %w", err) } if dest.MessageID == (uuid.UUID{}) { return PendingTranslationPair{}, false, nil } return PendingTranslationPair{ MessageID: dest.MessageID, TargetLang: dest.TargetLang, CurrentAttempts: dest.Attempts, }, true, nil } // MarkPairDelivered flips every still-pending recipient of (messageID, // targetLang) to `available_at = at`, optionally persisting the // translation row alongside in the same transaction. Returns the // recipients that were just delivered (used by the worker to fan out // push events). func (s *Store) MarkPairDelivered(ctx context.Context, messageID uuid.UUID, targetLang string, translation *Translation, at time.Time) ([]Recipient, error) { tx, err := s.db.BeginTx(ctx, nil) if err != nil { return nil, fmt.Errorf("diplomail store: begin deliver tx: %w", err) } defer func() { _ = tx.Rollback() }() if translation != nil { t := table.DiplomailTranslations ins := t.INSERT( t.TranslationID, t.MessageID, t.TargetLang, t.TranslatedSubject, t.TranslatedBody, t.Translator, ).VALUES( translation.TranslationID, translation.MessageID, translation.TargetLang, translation.TranslatedSubject, translation.TranslatedBody, translation.Translator, ).ON_CONFLICT(t.MessageID, t.TargetLang).DO_NOTHING() if _, err := ins.ExecContext(ctx, tx); err != nil { return nil, fmt.Errorf("diplomail store: upsert translation: %w", err) } } r := table.DiplomailRecipients upd := r.UPDATE(r.AvailableAt, r.NextTranslationAttemptAt). SET(postgres.TimestampzT(at.UTC()), postgres.NULL). WHERE( r.MessageID.EQ(postgres.UUID(messageID)). AND(r.RecipientPreferredLanguage.EQ(postgres.String(targetLang))). AND(r.AvailableAt.IS_NULL()), ). RETURNING(recipientColumns()) var rows []model.DiplomailRecipients if err := upd.QueryContext(ctx, tx, &rows); err != nil { return nil, fmt.Errorf("diplomail store: mark pair delivered: %w", err) } if err := tx.Commit(); err != nil { return nil, fmt.Errorf("diplomail store: commit deliver: %w", err) } out := make([]Recipient, 0, len(rows)) for _, row := range rows { out = append(out, recipientFromModel(row)) } return out, nil } // SchedulePairRetry bumps the attempt counter and schedules the next // translation attempt for `next`. The recipient rows stay in the // pending queue (`available_at IS NULL`). Returns the new attempt // counter so the worker can decide whether to fall back to the // original on the next pickup. func (s *Store) SchedulePairRetry(ctx context.Context, messageID uuid.UUID, targetLang string, next time.Time) (int32, error) { r := table.DiplomailRecipients upd := r.UPDATE(r.TranslationAttempts, r.NextTranslationAttemptAt). SET(r.TranslationAttempts.ADD(postgres.Int(1)), postgres.TimestampzT(next.UTC())). WHERE( r.MessageID.EQ(postgres.UUID(messageID)). AND(r.RecipientPreferredLanguage.EQ(postgres.String(targetLang))). AND(r.AvailableAt.IS_NULL()), ). RETURNING(r.TranslationAttempts) var dest []struct { TranslationAttempts int32 `alias:"diplomail_recipients.translation_attempts"` } if err := upd.QueryContext(ctx, s.db, &dest); err != nil { return 0, fmt.Errorf("diplomail store: schedule pair retry: %w", err) } if len(dest) == 0 { return 0, nil } max := dest[0].TranslationAttempts for _, d := range dest[1:] { if d.TranslationAttempts > max { max = d.TranslationAttempts } } return max, nil } // translationColumns is the canonical projection for // diplomail_translations reads. func translationColumns() postgres.ColumnList { t := table.DiplomailTranslations return postgres.ColumnList{ t.TranslationID, t.MessageID, t.TargetLang, t.TranslatedSubject, t.TranslatedBody, t.Translator, t.TranslatedAt, } } // LoadTranslation returns the cached translation row for // (messageID, targetLang). Returns ErrNotFound when no cache row // exists yet — the caller decides whether to compute and persist // one. func (s *Store) LoadTranslation(ctx context.Context, messageID uuid.UUID, targetLang string) (Translation, error) { t := table.DiplomailTranslations stmt := postgres.SELECT(translationColumns()). FROM(t). WHERE(t.MessageID.EQ(postgres.UUID(messageID)). AND(t.TargetLang.EQ(postgres.String(targetLang)))). LIMIT(1) var row model.DiplomailTranslations if err := stmt.QueryContext(ctx, s.db, &row); err != nil { if errors.Is(err, qrm.ErrNoRows) { return Translation{}, ErrNotFound } return Translation{}, fmt.Errorf("diplomail store: load translation %s/%s: %w", messageID, targetLang, err) } return translationFromModel(row), nil } // InsertTranslation persists a new translation cache row. The unique // constraint on (message_id, target_lang) prevents duplicate // renderings. Callers that race on the same (message, lang) pair // should be prepared for a UNIQUE violation; the second writer can // fall back to LoadTranslation. func (s *Store) InsertTranslation(ctx context.Context, in Translation) (Translation, error) { t := table.DiplomailTranslations stmt := t.INSERT( t.TranslationID, t.MessageID, t.TargetLang, t.TranslatedSubject, t.TranslatedBody, t.Translator, ).VALUES( in.TranslationID, in.MessageID, in.TargetLang, in.TranslatedSubject, in.TranslatedBody, in.Translator, ).RETURNING(translationColumns()) var row model.DiplomailTranslations if err := stmt.QueryContext(ctx, s.db, &row); err != nil { return Translation{}, fmt.Errorf("diplomail store: insert translation %s/%s: %w", in.MessageID, in.TargetLang, err) } return translationFromModel(row), nil } func translationFromModel(row model.DiplomailTranslations) Translation { return Translation{ TranslationID: row.TranslationID, MessageID: row.MessageID, TargetLang: row.TargetLang, TranslatedSubject: row.TranslatedSubject, TranslatedBody: row.TranslatedBody, Translator: row.Translator, TranslatedAt: row.TranslatedAt, } } // DeleteMessagesForGames removes every diplomail_messages row whose // game_id falls in the supplied set. The cascade defined on the // `diplomail_recipients` and `diplomail_translations` foreign keys // removes the per-recipient state and the cached translations in // the same transaction. Returns the count of messages removed. // // Used by the admin bulk-purge endpoint; callers are expected to // have already filtered the input set to terminal-state games. func (s *Store) DeleteMessagesForGames(ctx context.Context, gameIDs []uuid.UUID) (int, error) { if len(gameIDs) == 0 { return 0, nil } args := make([]postgres.Expression, 0, len(gameIDs)) for _, id := range gameIDs { args = append(args, postgres.UUID(id)) } m := table.DiplomailMessages stmt := m.DELETE().WHERE(m.GameID.IN(args...)) res, err := stmt.ExecContext(ctx, s.db) if err != nil { return 0, fmt.Errorf("diplomail store: bulk delete messages: %w", err) } affected, err := res.RowsAffected() if err != nil { return 0, fmt.Errorf("diplomail store: rows affected: %w", err) } return int(affected), nil } // ListMessagesForAdmin returns a paginated slice of messages // matching filter. The result is ordered by created_at DESC, // message_id DESC. Total is the count without pagination so the // caller can render a "page X of N" envelope. func (s *Store) ListMessagesForAdmin(ctx context.Context, filter AdminMessageListing) ([]Message, int, error) { m := table.DiplomailMessages page := filter.Page if page < 1 { page = 1 } pageSize := filter.PageSize if pageSize < 1 { pageSize = 50 } conditions := postgres.BoolExpression(nil) addCondition := func(cond postgres.BoolExpression) { if conditions == nil { conditions = cond return } conditions = conditions.AND(cond) } if filter.GameID != nil { addCondition(m.GameID.EQ(postgres.UUID(*filter.GameID))) } if filter.Kind != "" { addCondition(m.Kind.EQ(postgres.String(filter.Kind))) } if filter.SenderKind != "" { addCondition(m.SenderKind.EQ(postgres.String(filter.SenderKind))) } countStmt := postgres.SELECT(postgres.COUNT(postgres.STAR).AS("count")).FROM(m) if conditions != nil { countStmt = countStmt.WHERE(conditions) } var countDest struct { Count int64 `alias:"count"` } if err := countStmt.QueryContext(ctx, s.db, &countDest); err != nil { return nil, 0, fmt.Errorf("diplomail store: count admin messages: %w", err) } listStmt := postgres.SELECT(messageColumns()).FROM(m) if conditions != nil { listStmt = listStmt.WHERE(conditions) } listStmt = listStmt. ORDER_BY(m.CreatedAt.DESC(), m.MessageID.DESC()). LIMIT(int64(pageSize)). OFFSET(int64((page - 1) * pageSize)) var rows []model.DiplomailMessages if err := listStmt.QueryContext(ctx, s.db, &rows); err != nil { return nil, 0, fmt.Errorf("diplomail store: list admin messages: %w", err) } out := make([]Message, 0, len(rows)) for _, row := range rows { out = append(out, messageFromModel(row)) } return out, int(countDest.Count), nil } // UnreadCountsForUser returns a per-game breakdown of unread messages // addressed to userID, plus the matching game names so the lobby // badge UI can render entries even after the recipient's membership // has been revoked. The slice is ordered by game name. func (s *Store) UnreadCountsForUser(ctx context.Context, userID uuid.UUID) ([]UnreadCount, error) { r := table.DiplomailRecipients m := table.DiplomailMessages stmt := postgres.SELECT( r.GameID.AS("game_id"), postgres.MAX(m.GameName).AS("game_name"), postgres.COUNT(postgres.STAR).AS("count"), ). FROM(r.INNER_JOIN(m, m.MessageID.EQ(r.MessageID))). WHERE( r.UserID.EQ(postgres.UUID(userID)). AND(r.ReadAt.IS_NULL()). AND(r.DeletedAt.IS_NULL()). AND(r.AvailableAt.IS_NOT_NULL()), ). GROUP_BY(r.GameID). ORDER_BY(postgres.MAX(m.GameName).ASC()) var dest []struct { GameID uuid.UUID `alias:"game_id"` GameName string `alias:"game_name"` Count int64 `alias:"count"` } if err := stmt.QueryContext(ctx, s.db, &dest); err != nil { return nil, fmt.Errorf("diplomail store: unread counts %s: %w", userID, err) } out := make([]UnreadCount, 0, len(dest)) for _, row := range dest { out = append(out, UnreadCount{ GameID: row.GameID, GameName: row.GameName, Unread: int(row.Count), }) } return out, nil } // messageFromModel converts a jet-generated row to the domain type. func messageFromModel(row model.DiplomailMessages) Message { out := Message{ MessageID: row.MessageID, GameID: row.GameID, GameName: row.GameName, Kind: row.Kind, SenderKind: row.SenderKind, SenderIP: row.SenderIP, Subject: row.Subject, Body: row.Body, BodyLang: row.BodyLang, BroadcastScope: row.BroadcastScope, CreatedAt: row.CreatedAt, } if row.SenderUserID != nil { id := *row.SenderUserID out.SenderUserID = &id } if row.SenderUsername != nil { name := *row.SenderUsername out.SenderUsername = &name } return out } // recipientFromModel converts a jet-generated row to the domain type. func recipientFromModel(row model.DiplomailRecipients) Recipient { out := Recipient{ RecipientID: row.RecipientID, MessageID: row.MessageID, GameID: row.GameID, UserID: row.UserID, RecipientUserName: row.RecipientUserName, RecipientPreferredLanguage: row.RecipientPreferredLanguage, AvailableAt: row.AvailableAt, TranslationAttempts: row.TranslationAttempts, NextTranslationAttemptAt: row.NextTranslationAttemptAt, DeliveredAt: row.DeliveredAt, ReadAt: row.ReadAt, DeletedAt: row.DeletedAt, NotifiedAt: row.NotifiedAt, } if row.RecipientRaceName != nil { name := *row.RecipientRaceName out.RecipientRaceName = &name } return out } // recipientsFromModel converts a slice in place. Used by // InsertMessageWithRecipients. func recipientsFromModel(rows []model.DiplomailRecipients) []Recipient { out := make([]Recipient, 0, len(rows)) for _, row := range rows { out = append(out, recipientFromModel(row)) } return out } // uuidPtrArg returns the jet argument expression for a nullable UUID. // Pre-NULL handling here avoids a custom NULL literal at every call // site. func uuidPtrArg(v *uuid.UUID) postgres.Expression { if v == nil { return postgres.NULL } return postgres.UUID(*v) } // stringPtrArg returns the jet argument expression for a nullable // text column. func stringPtrArg(v *string) postgres.Expression { if v == nil { return postgres.NULL } return postgres.String(*v) } // timePtrArg returns the jet argument expression for a nullable // timestamptz column. func timePtrArg(v *time.Time) postgres.Expression { if v == nil { return postgres.NULL } return postgres.TimestampzT(v.UTC()) }