Files
galaxy-game/backend/internal/user/store.go
T
2026-05-07 00:58:53 +03:00

817 lines
26 KiB
Go

package user
import (
"context"
"database/sql"
"errors"
"fmt"
"strings"
"time"
"galaxy/backend/internal/postgres/jet/backend/model"
"galaxy/backend/internal/postgres/jet/backend/table"
"github.com/go-jet/jet/v2/postgres"
"github.com/go-jet/jet/v2/qrm"
"github.com/google/uuid"
)
// Store provides the user package's Postgres queries. Every statement
// is assembled with go-jet from the generated table bindings under
// `backend/internal/postgres/jet/backend/table`.
type Store struct {
	db *sql.DB
}

// NewStore returns a Store whose queries run against db.
func NewStore(db *sql.DB) *Store {
	store := new(Store)
	store.db = db
	return store
}
// AccountRow mirrors a row in `backend.accounts` with the specific
// projection the user-package read paths need. It is not a full
// representation of the table; column subsets like the audit trail are
// folded into Account by the Service layer.
//
// Values are produced by modelToAccountRow from the generated
// model.Accounts row, one field per column listed in accountColumns.
type AccountRow struct {
UserID uuid.UUID
Email string
UserName string
DisplayName string
PreferredLanguage string
TimeZone string
// DeclaredCountry is "" when the underlying column is NULL.
DeclaredCountry string
// PermanentBlock is set to true by ApplySanctionTx when the sanction
// input asks for it.
PermanentBlock bool
CreatedAt time.Time
UpdatedAt time.Time
// DeletedAt is the soft-delete timestamp; nil when the column is NULL.
DeletedAt *time.Time
}
// accountInsert is the parameter struct for InsertAccountWithSnapshot.
// Each field supplies the value of the same-named `backend.accounts`
// column for the new row.
type accountInsert struct {
UserID uuid.UUID
// Email is the ON CONFLICT target of the insert: a row that collides on
// it is skipped and reported as errEmailRace.
Email string
UserName string
PreferredLanguage string
TimeZone string
// DeclaredCountry is written as SQL NULL when empty.
DeclaredCountry string
}
// settingsPatch holds the optional settings columns taken from an
// `UpdateSettingsInput`. A nil pointer means the matching column is
// left untouched.
type settingsPatch struct {
	PreferredLanguage *string
	TimeZone          *string
}

// empty reports whether the patch sets no column at all.
func (p settingsPatch) empty() bool {
	if p.PreferredLanguage != nil {
		return false
	}
	return p.TimeZone == nil
}
// sanctionInsert is the parameter struct for ApplySanctionTx.
type sanctionInsert struct {
// UserID must match a live accounts row; ApplySanctionTx checks this
// before opening its transaction.
UserID uuid.UUID
// SanctionCode, together with UserID, is the conflict key of the
// sanction_active upsert.
SanctionCode string
Scope string
ReasonCode string
// Actor is split into the actor_type / actor_user_id / actor_username
// columns through actorToColumnArgs.
Actor ActorRef
AppliedAt time.Time
// ExpiresAt is written as SQL NULL when nil.
ExpiresAt *time.Time
// FlipPermanent additionally sets accounts.permanent_block to true and
// accounts.updated_at to AppliedAt in the same transaction.
FlipPermanent bool
}
// limitInsert is the parameter struct for ApplyLimitTx.
type limitInsert struct {
// UserID must match a live accounts row; ApplyLimitTx checks this
// before opening its transaction.
UserID uuid.UUID
// LimitCode, together with UserID, is the conflict key of the
// limit_active upsert.
LimitCode string
// Value is stored on both the limit_records row and the limit_active row.
Value int32
ReasonCode string
// Actor is split into the actor_type / actor_user_id / actor_username
// columns through actorToColumnArgs.
Actor ActorRef
AppliedAt time.Time
// ExpiresAt is written as SQL NULL when nil.
ExpiresAt *time.Time
}
// errEmailRace is a sentinel returned by InsertAccountWithSnapshot when
// the ON CONFLICT (email) DO NOTHING branch fires, i.e. the INSERT
// returned no row. The caller looks up the existing user_id and returns
// it instead. It is unexported, so only code inside this package can
// match it.
var errEmailRace = errors.New("user store: email already exists")
// accountColumns returns the one projection shared by all reads of
// `backend.accounts`, so the SELECT list and the model-row → AccountRow
// converter cannot drift apart.
func accountColumns() postgres.ColumnList {
	return postgres.ColumnList{
		table.Accounts.UserID,
		table.Accounts.Email,
		table.Accounts.UserName,
		table.Accounts.DisplayName,
		table.Accounts.PreferredLanguage,
		table.Accounts.TimeZone,
		table.Accounts.DeclaredCountry,
		table.Accounts.PermanentBlock,
		table.Accounts.CreatedAt,
		table.Accounts.UpdatedAt,
		table.Accounts.DeletedAt,
	}
}
// snapshotColumns returns the one projection shared by all reads of
// `backend.entitlement_snapshots`.
func snapshotColumns() postgres.ColumnList {
	return postgres.ColumnList{
		table.EntitlementSnapshots.UserID,
		table.EntitlementSnapshots.Tier,
		table.EntitlementSnapshots.IsPaid,
		table.EntitlementSnapshots.Source,
		table.EntitlementSnapshots.ActorType,
		table.EntitlementSnapshots.ActorUserID,
		table.EntitlementSnapshots.ActorUsername,
		table.EntitlementSnapshots.ReasonCode,
		table.EntitlementSnapshots.StartsAt,
		table.EntitlementSnapshots.EndsAt,
		table.EntitlementSnapshots.MaxRegisteredRaceNames,
		table.EntitlementSnapshots.UpdatedAt,
	}
}
// LookupAccountIDByEmail returns the user_id of the live account for
// email. The boolean reports whether a row was found. Soft-deleted
// rows are skipped, so a miss yields (uuid.Nil, false, nil).
//
// Any other query failure is returned wrapped with the store prefix,
// like the other read paths in this file; the underlying error stays
// reachable through errors.Is / errors.As.
func (s *Store) LookupAccountIDByEmail(ctx context.Context, email string) (uuid.UUID, bool, error) {
	a := table.Accounts
	stmt := postgres.SELECT(a.UserID).
		FROM(a).
		WHERE(
			a.Email.EQ(postgres.String(email)).
				AND(a.DeletedAt.IS_NULL()),
		).
		LIMIT(1)

	var row model.Accounts
	err := stmt.QueryContext(ctx, s.db, &row)
	switch {
	case err == nil:
		return row.UserID, true, nil
	case errors.Is(err, qrm.ErrNoRows):
		// No live row is an expected outcome, not a failure.
		return uuid.Nil, false, nil
	default:
		return uuid.Nil, false, fmt.Errorf("user store: lookup account id by email: %w", err)
	}
}
// LookupAccount loads the AccountRow projection of the live account
// identified by userID. A missing or soft-deleted row is reported as
// ErrAccountNotFound.
func (s *Store) LookupAccount(ctx context.Context, userID uuid.UUID) (AccountRow, error) {
	accounts := table.Accounts
	liveMatch := accounts.UserID.EQ(postgres.UUID(userID)).
		AND(accounts.DeletedAt.IS_NULL())
	query := postgres.SELECT(accountColumns()).
		FROM(accounts).
		WHERE(liveMatch).
		LIMIT(1)

	var found model.Accounts
	err := query.QueryContext(ctx, s.db, &found)
	switch {
	case err == nil:
		return modelToAccountRow(found), nil
	case errors.Is(err, qrm.ErrNoRows):
		return AccountRow{}, ErrAccountNotFound
	default:
		return AccountRow{}, fmt.Errorf("user store: scan account: %w", err)
	}
}
// ListAccountRows returns the requested page of live accounts together
// with the total live-row count for pagination.
//
// page is 1-based. Out-of-range inputs are clamped instead of being
// sent to the database: page values below 1 are treated as page 1 and
// a negative pageSize as 0. Rows are ordered by created_at DESC with
// user_id DESC as the tie-breaker.
//
// The count and the page are fetched by two separate statements, not
// inside one transaction, so the total may differ slightly from the
// page contents under concurrent writes.
func (s *Store) ListAccountRows(ctx context.Context, page, pageSize int) ([]AccountRow, int, error) {
	// Postgres rejects negative LIMIT and OFFSET values, so normalise
	// before building the query.
	if page < 1 {
		page = 1
	}
	if pageSize < 0 {
		pageSize = 0
	}

	a := table.Accounts
	totalStmt := postgres.SELECT(postgres.COUNT(postgres.STAR).AS("count")).
		FROM(a).
		WHERE(a.DeletedAt.IS_NULL())
	var totalDest struct {
		Count int64 `alias:"count"`
	}
	if err := totalStmt.QueryContext(ctx, s.db, &totalDest); err != nil {
		return nil, 0, fmt.Errorf("user store: count accounts: %w", err)
	}

	// Multiply in int64 so the product cannot wrap where int is 32 bits.
	offset := int64(page-1) * int64(pageSize)
	listStmt := postgres.SELECT(accountColumns()).
		FROM(a).
		WHERE(a.DeletedAt.IS_NULL()).
		ORDER_BY(a.CreatedAt.DESC(), a.UserID.DESC()).
		LIMIT(int64(pageSize)).OFFSET(offset)
	var rows []model.Accounts
	if err := listStmt.QueryContext(ctx, s.db, &rows); err != nil {
		return nil, 0, fmt.Errorf("user store: list accounts: %w", err)
	}

	out := make([]AccountRow, 0, len(rows))
	for _, row := range rows {
		out = append(out, modelToAccountRow(row))
	}
	return out, int(totalDest.Count), nil
}
// InsertAccountWithSnapshot writes a new accounts row and its default
// entitlement snapshot inside a single transaction and returns the
// inserted user_id.
//
// When the ON CONFLICT (email) DO NOTHING branch fires the INSERT
// returns no row and errEmailRace is reported, so the caller can
// recover the existing user_id. Any other insert failure — including a
// user_name UNIQUE violation — is returned as the underlying error,
// unwrapped, so the caller can inspect it and retry the suffix.
func (s *Store) InsertAccountWithSnapshot(ctx context.Context, account accountInsert, snapshot EntitlementSnapshot) (uuid.UUID, error) {
	accounts := table.Accounts

	// An empty declared country is persisted as SQL NULL.
	var country postgres.Expression
	if account.DeclaredCountry == "" {
		country = postgres.StringExp(postgres.NULL)
	} else {
		country = postgres.String(account.DeclaredCountry)
	}

	stmt := accounts.INSERT(
		accounts.UserID, accounts.Email, accounts.UserName,
		accounts.PreferredLanguage, accounts.TimeZone, accounts.DeclaredCountry,
	).VALUES(
		account.UserID, account.Email, account.UserName,
		account.PreferredLanguage, account.TimeZone, country,
	).
		ON_CONFLICT(accounts.Email).DO_NOTHING().
		RETURNING(accounts.UserID)

	newID := uuid.Nil
	txErr := withTx(ctx, s.db, func(tx *sql.Tx) error {
		var created model.Accounts
		err := stmt.QueryContext(ctx, tx, &created)
		if errors.Is(err, qrm.ErrNoRows) {
			return errEmailRace
		}
		if err != nil {
			return err
		}
		newID = created.UserID
		return insertSnapshotTx(ctx, tx, snapshot)
	})
	if txErr != nil {
		return uuid.Nil, txErr
	}
	return newID, nil
}
// LookupEntitlementSnapshot fetches the entitlement snapshot stored for
// userID. A missing row is reported as ErrAccountNotFound: the
// bootstrap path always inserts the default snapshot, so an account
// without one is treated as "account not found".
func (s *Store) LookupEntitlementSnapshot(ctx context.Context, userID uuid.UUID) (EntitlementSnapshot, error) {
	snapshots := table.EntitlementSnapshots
	query := postgres.SELECT(snapshotColumns()).
		FROM(snapshots).
		WHERE(snapshots.UserID.EQ(postgres.UUID(userID))).
		LIMIT(1)

	var found model.EntitlementSnapshots
	err := query.QueryContext(ctx, s.db, &found)
	switch {
	case err == nil:
		return modelToSnapshot(found), nil
	case errors.Is(err, qrm.ErrNoRows):
		return EntitlementSnapshot{}, ErrAccountNotFound
	default:
		return EntitlementSnapshot{}, fmt.Errorf("user store: lookup snapshot for %s: %w", userID, err)
	}
}
// ListEntitlementSnapshots returns all rows of the entitlement
// snapshot table, unfiltered. Cache.Warm calls this at process boot.
func (s *Store) ListEntitlementSnapshots(ctx context.Context) ([]EntitlementSnapshot, error) {
	query := postgres.SELECT(snapshotColumns()).
		FROM(table.EntitlementSnapshots)

	var scanned []model.EntitlementSnapshots
	if err := query.QueryContext(ctx, s.db, &scanned); err != nil {
		return nil, fmt.Errorf("user store: list snapshots: %w", err)
	}

	snapshots := make([]EntitlementSnapshot, len(scanned))
	for i := range scanned {
		snapshots[i] = modelToSnapshot(scanned[i])
	}
	return snapshots, nil
}
// ListActiveSanctions lists the sanctions currently active for userID.
// Each sanction_active row is joined to its sanction_records row, which
// supplies the audit columns. Results are sorted by applied_at DESC so
// the newest sanction comes first.
func (s *Store) ListActiveSanctions(ctx context.Context, userID uuid.UUID) ([]ActiveSanction, error) {
	active, records := table.SanctionActive, table.SanctionRecords
	joined := active.INNER_JOIN(records, records.RecordID.EQ(active.RecordID))
	query := postgres.SELECT(
		records.SanctionCode, records.Scope, records.ReasonCode,
		records.ActorType, records.ActorUserID, records.ActorUsername,
		records.AppliedAt, records.ExpiresAt,
	).
		FROM(joined).
		WHERE(active.UserID.EQ(postgres.UUID(userID))).
		ORDER_BY(records.AppliedAt.DESC())

	var scanned []model.SanctionRecords
	if err := query.QueryContext(ctx, s.db, &scanned); err != nil {
		return nil, fmt.Errorf("user store: list active sanctions: %w", err)
	}

	sanctions := make([]ActiveSanction, len(scanned))
	for i := range scanned {
		rec := scanned[i]
		item := ActiveSanction{
			SanctionCode: rec.SanctionCode,
			Scope:        rec.Scope,
			ReasonCode:   rec.ReasonCode,
			Actor:        actorFromColumns(rec.ActorType, rec.ActorUserID, rec.ActorUsername),
			AppliedAt:    rec.AppliedAt,
		}
		// Copy the timestamp so the result does not alias the scanned row.
		if rec.ExpiresAt != nil {
			expiry := *rec.ExpiresAt
			item.ExpiresAt = &expiry
		}
		sanctions[i] = item
	}
	return sanctions, nil
}
// ListActiveLimits lists the limits currently active for userID. Each
// limit_active row is joined to its limit_records row: the value comes
// from limit_active, the audit columns from limit_records. Results are
// sorted by applied_at DESC.
func (s *Store) ListActiveLimits(ctx context.Context, userID uuid.UUID) ([]ActiveLimit, error) {
	active, records := table.LimitActive, table.LimitRecords
	joined := active.INNER_JOIN(records, records.RecordID.EQ(active.RecordID))
	query := postgres.SELECT(
		records.LimitCode, active.Value, records.ReasonCode,
		records.ActorType, records.ActorUserID, records.ActorUsername,
		records.AppliedAt, records.ExpiresAt,
	).
		FROM(joined).
		WHERE(active.UserID.EQ(postgres.UUID(userID))).
		ORDER_BY(records.AppliedAt.DESC())

	// One nested model per joined table receives that table's columns.
	var scanned []struct {
		LimitRecords model.LimitRecords
		LimitActive  model.LimitActive
	}
	if err := query.QueryContext(ctx, s.db, &scanned); err != nil {
		return nil, fmt.Errorf("user store: list active limits: %w", err)
	}

	limits := make([]ActiveLimit, len(scanned))
	for i := range scanned {
		rec := scanned[i].LimitRecords
		item := ActiveLimit{
			LimitCode:  rec.LimitCode,
			Value:      scanned[i].LimitActive.Value,
			ReasonCode: rec.ReasonCode,
			Actor:      actorFromColumns(rec.ActorType, rec.ActorUserID, rec.ActorUsername),
			AppliedAt:  rec.AppliedAt,
		}
		// Copy the timestamp so the result does not alias the scanned row.
		if rec.ExpiresAt != nil {
			expiry := *rec.ExpiresAt
			item.ExpiresAt = &expiry
		}
		limits[i] = item
	}
	return limits, nil
}
// UpdateAccountDisplayName writes displayName to accounts.display_name
// and sets updated_at to now. Only a live row is touched; when none
// matches, ErrAccountNotFound is returned.
func (s *Store) UpdateAccountDisplayName(ctx context.Context, userID uuid.UUID, displayName string, now time.Time) error {
	accounts := table.Accounts
	liveMatch := accounts.UserID.EQ(postgres.UUID(userID)).
		AND(accounts.DeletedAt.IS_NULL())
	update := accounts.UPDATE(accounts.DisplayName, accounts.UpdatedAt).
		SET(displayName, now).
		WHERE(liveMatch)

	result, err := update.ExecContext(ctx, s.db)
	if err == nil {
		return rowsAffectedOrNotFound(result)
	}
	return fmt.Errorf("user store: update display_name: %w", err)
}
// UpdateAccountSettings writes the columns present in patch and sets
// updated_at to now. An empty patch is a caller error and is rejected
// before any query runs. Only a live row is touched; when none matches,
// ErrAccountNotFound is returned.
func (s *Store) UpdateAccountSettings(ctx context.Context, userID uuid.UUID, patch settingsPatch, now time.Time) error {
	if patch.empty() {
		return fmt.Errorf("user store: update settings: empty patch")
	}

	accounts := table.Accounts
	// Only the columns the caller supplied are assigned.
	var assignments []any
	if lang := patch.PreferredLanguage; lang != nil {
		assignments = append(assignments, accounts.PreferredLanguage.SET(postgres.String(*lang)))
	}
	if zone := patch.TimeZone; zone != nil {
		assignments = append(assignments, accounts.TimeZone.SET(postgres.String(*zone)))
	}

	liveMatch := accounts.UserID.EQ(postgres.UUID(userID)).
		AND(accounts.DeletedAt.IS_NULL())
	update := accounts.UPDATE().
		SET(accounts.UpdatedAt.SET(postgres.TimestampzT(now)), assignments...).
		WHERE(liveMatch)

	result, err := update.ExecContext(ctx, s.db)
	if err == nil {
		return rowsAffectedOrNotFound(result)
	}
	return fmt.Errorf("user store: update settings: %w", err)
}
// ApplyEntitlementTx appends an entitlement_records row and upserts
// the entitlement_snapshots row for the same user inside a single
// transaction. On success the input snapshot is returned unchanged; the
// record's created_at is taken from snap.UpdatedAt.
//
// ErrAccountNotFound is returned when snap.UserID has no live accounts
// row; that check runs before the transaction is opened.
func (s *Store) ApplyEntitlementTx(ctx context.Context, snap EntitlementSnapshot) (EntitlementSnapshot, error) {
	var none EntitlementSnapshot
	if err := s.assertAccountLive(ctx, snap.UserID); err != nil {
		return none, err
	}
	actorUID, actorName, err := actorToColumnArgs(snap.Actor)
	if err != nil {
		return none, err
	}

	// Left as a nil `any` when there is no end date, so the column is
	// written as NULL.
	var endsAt any
	if snap.EndsAt != nil {
		endsAt = *snap.EndsAt
	}

	records := table.EntitlementRecords
	txErr := withTx(ctx, s.db, func(tx *sql.Tx) error {
		id := uuid.New()
		insertRecord := records.INSERT(
			records.RecordID,
			records.UserID,
			records.Tier,
			records.IsPaid,
			records.Source,
			records.ActorType,
			records.ActorUserID,
			records.ActorUsername,
			records.ReasonCode,
			records.StartsAt,
			records.EndsAt,
			records.CreatedAt,
		).VALUES(
			id, snap.UserID, snap.Tier, snap.IsPaid, snap.Source,
			snap.Actor.Type, actorUID, actorName, snap.ReasonCode,
			snap.StartsAt, endsAt, snap.UpdatedAt,
		)
		if _, execErr := insertRecord.ExecContext(ctx, tx); execErr != nil {
			return fmt.Errorf("insert entitlement record: %w", execErr)
		}
		return upsertSnapshotTx(ctx, tx, snap, actorUID, actorName)
	})
	if txErr != nil {
		return none, txErr
	}
	return snap, nil
}
// ApplySanctionTx persists a fresh sanction_records row, upserts
// sanction_active, and (when input.FlipPermanent is set) flips
// accounts.permanent_block to true — all in one transaction.
//
// ErrAccountNotFound is returned when input.UserID has no live accounts
// row. That is checked before the transaction opens and, for the
// FlipPermanent case, again through the rows-affected count of the
// permanent_block UPDATE: if the account was soft-deleted in between,
// the UPDATE matches nothing and the whole transaction is rolled back
// rather than committing a sanction whose block flag was never set.
func (s *Store) ApplySanctionTx(ctx context.Context, input sanctionInsert) error {
	if err := s.assertAccountLive(ctx, input.UserID); err != nil {
		return err
	}
	actorUserID, actorUsername, err := actorToColumnArgs(input.Actor)
	if err != nil {
		return err
	}
	return withTx(ctx, s.db, func(tx *sql.Tx) error {
		recordID := uuid.New()
		// Left as a nil `any` when there is no expiry, so the column is
		// written as NULL.
		var expiresAt any
		if input.ExpiresAt != nil {
			expiresAt = *input.ExpiresAt
		}
		recordStmt := table.SanctionRecords.INSERT(
			table.SanctionRecords.RecordID,
			table.SanctionRecords.UserID,
			table.SanctionRecords.SanctionCode,
			table.SanctionRecords.Scope,
			table.SanctionRecords.ReasonCode,
			table.SanctionRecords.ActorType,
			table.SanctionRecords.ActorUserID,
			table.SanctionRecords.ActorUsername,
			table.SanctionRecords.AppliedAt,
			table.SanctionRecords.ExpiresAt,
		).VALUES(
			recordID, input.UserID, input.SanctionCode, input.Scope, input.ReasonCode,
			input.Actor.Type, actorUserID, actorUsername, input.AppliedAt, expiresAt,
		)
		if _, err := recordStmt.ExecContext(ctx, tx); err != nil {
			return fmt.Errorf("insert sanction record: %w", err)
		}

		// Point the (user_id, sanction_code) slot at the new record,
		// replacing whichever record it referenced before.
		sa := table.SanctionActive
		activeStmt := sa.INSERT(sa.UserID, sa.SanctionCode, sa.RecordID).
			VALUES(input.UserID, input.SanctionCode, recordID).
			ON_CONFLICT(sa.UserID, sa.SanctionCode).
			DO_UPDATE(postgres.SET(
				sa.RecordID.SET(sa.EXCLUDED.RecordID),
			))
		if _, err := activeStmt.ExecContext(ctx, tx); err != nil {
			return fmt.Errorf("upsert sanction_active: %w", err)
		}

		if !input.FlipPermanent {
			return nil
		}
		a := table.Accounts
		permStmt := a.UPDATE().
			SET(
				a.PermanentBlock.SET(postgres.Bool(true)),
				a.UpdatedAt.SET(postgres.TimestampzT(input.AppliedAt)),
			).
			WHERE(
				a.UserID.EQ(postgres.UUID(input.UserID)).
					AND(a.DeletedAt.IS_NULL()),
			)
		res, err := permStmt.ExecContext(ctx, tx)
		if err != nil {
			return fmt.Errorf("flip permanent_block: %w", err)
		}
		// Zero affected rows means the account stopped being live after
		// the pre-check; returning the error makes withTx roll back.
		return rowsAffectedOrNotFound(res)
	})
}
// ApplyLimitTx appends a limit_records row and upserts the
// limit_active row for the same (user_id, limit_code) pair inside a
// single transaction.
//
// ErrAccountNotFound is returned when input.UserID has no live accounts
// row; that check runs before the transaction is opened.
func (s *Store) ApplyLimitTx(ctx context.Context, input limitInsert) error {
	if err := s.assertAccountLive(ctx, input.UserID); err != nil {
		return err
	}
	actorUID, actorName, err := actorToColumnArgs(input.Actor)
	if err != nil {
		return err
	}

	// Left as a nil `any` when there is no expiry, so the column is
	// written as NULL.
	var expiry any
	if input.ExpiresAt != nil {
		expiry = *input.ExpiresAt
	}

	records, active := table.LimitRecords, table.LimitActive
	return withTx(ctx, s.db, func(tx *sql.Tx) error {
		id := uuid.New()
		insertRecord := records.INSERT(
			records.RecordID,
			records.UserID,
			records.LimitCode,
			records.Value,
			records.ReasonCode,
			records.ActorType,
			records.ActorUserID,
			records.ActorUsername,
			records.AppliedAt,
			records.ExpiresAt,
		).VALUES(
			id, input.UserID, input.LimitCode, input.Value, input.ReasonCode,
			input.Actor.Type, actorUID, actorName, input.AppliedAt, expiry,
		)
		if _, execErr := insertRecord.ExecContext(ctx, tx); execErr != nil {
			return fmt.Errorf("insert limit record: %w", execErr)
		}

		// Point the (user_id, limit_code) slot at the new record and value.
		upsertActive := active.INSERT(active.UserID, active.LimitCode, active.RecordID, active.Value).
			VALUES(input.UserID, input.LimitCode, id, input.Value).
			ON_CONFLICT(active.UserID, active.LimitCode).
			DO_UPDATE(postgres.SET(
				active.RecordID.SET(active.EXCLUDED.RecordID),
				active.Value.SET(active.EXCLUDED.Value),
			))
		if _, execErr := upsertActive.ExecContext(ctx, tx); execErr != nil {
			return fmt.Errorf("upsert limit_active: %w", execErr)
		}
		return nil
	})
}
// SoftDeleteAccount stamps the live account identified by userID as
// soft-deleted, recording the acting party in the deleted_actor_*
// columns and setting both deleted_at and updated_at to now.
//
// The boolean is true when a row was changed and false when nothing
// matched, i.e. the account was already soft-deleted or never existed.
// Callers that need to tell those two apart read the row separately;
// the cascade orchestration treats "no change" as a successful
// idempotent operation.
func (s *Store) SoftDeleteAccount(ctx context.Context, userID uuid.UUID, actor ActorRef, now time.Time) (bool, error) {
	actorUID, actorName, err := actorToColumnExprs(actor)
	if err != nil {
		return false, err
	}

	accounts := table.Accounts
	liveMatch := accounts.UserID.EQ(postgres.UUID(userID)).
		AND(accounts.DeletedAt.IS_NULL())
	update := accounts.UPDATE().
		SET(
			accounts.DeletedAt.SET(postgres.TimestampzT(now)),
			accounts.DeletedActorType.SET(postgres.String(actor.Type)),
			accounts.DeletedActorUserID.SET(actorUID),
			accounts.DeletedActorUsername.SET(actorName),
			accounts.UpdatedAt.SET(postgres.TimestampzT(now)),
		).
		WHERE(liveMatch)

	result, err := update.ExecContext(ctx, s.db)
	if err != nil {
		return false, fmt.Errorf("user store: soft delete %s: %w", userID, err)
	}
	changed, err := result.RowsAffected()
	if err != nil {
		return false, fmt.Errorf("user store: soft delete rows-affected: %w", err)
	}
	return changed > 0, nil
}
// assertAccountLive checks that userID refers to an accounts row that
// is not soft-deleted and returns ErrAccountNotFound otherwise. The
// mutation paths call it to fail fast before opening a transaction.
func (s *Store) assertAccountLive(ctx context.Context, userID uuid.UUID) error {
	accounts := table.Accounts
	liveMatch := accounts.UserID.EQ(postgres.UUID(userID)).
		AND(accounts.DeletedAt.IS_NULL())
	probe := postgres.SELECT(accounts.UserID).
		FROM(accounts).
		WHERE(liveMatch).
		LIMIT(1)

	var hit model.Accounts
	err := probe.QueryContext(ctx, s.db, &hit)
	if err == nil {
		return nil
	}
	if errors.Is(err, qrm.ErrNoRows) {
		return ErrAccountNotFound
	}
	return fmt.Errorf("user store: account live-check: %w", err)
}
// insertSnapshotTx inserts snap as a new entitlement_snapshots row
// using tx. It is a plain INSERT without a conflict clause. The actor
// columns are derived from snap.Actor via actorToColumnArgs.
func insertSnapshotTx(ctx context.Context, tx *sql.Tx, snap EntitlementSnapshot) error {
	actorUID, actorName, err := actorToColumnArgs(snap.Actor)
	if err != nil {
		return err
	}

	// Left as a nil `any` when there is no end date, so the column is
	// written as NULL.
	var endsAt any
	if snap.EndsAt != nil {
		endsAt = *snap.EndsAt
	}

	snapshots := table.EntitlementSnapshots
	insert := snapshots.INSERT(
		snapshots.UserID, snapshots.Tier, snapshots.IsPaid, snapshots.Source,
		snapshots.ActorType, snapshots.ActorUserID, snapshots.ActorUsername,
		snapshots.ReasonCode, snapshots.StartsAt, snapshots.EndsAt,
		snapshots.MaxRegisteredRaceNames, snapshots.UpdatedAt,
	).VALUES(
		snap.UserID, snap.Tier, snap.IsPaid, snap.Source,
		snap.Actor.Type, actorUID, actorName,
		snap.ReasonCode, snap.StartsAt, endsAt, snap.MaxRegisteredRaceNames, snap.UpdatedAt,
	)

	_, execErr := insert.ExecContext(ctx, tx)
	if execErr != nil {
		return fmt.Errorf("insert entitlement_snapshots: %w", execErr)
	}
	return nil
}
// upsertSnapshotTx writes snap to entitlement_snapshots using tx. When
// a row for the same user_id already exists, every non-key column is
// overwritten with the new values. actorUserID and actorUsername are
// the already-converted actor column values supplied by the caller.
func upsertSnapshotTx(ctx context.Context, tx *sql.Tx, snap EntitlementSnapshot, actorUserID, actorUsername any) error {
	// Left as a nil `any` when there is no end date, so the column is
	// written as NULL.
	var endsAt any
	if snap.EndsAt != nil {
		endsAt = *snap.EndsAt
	}

	snapshots := table.EntitlementSnapshots
	incoming := snapshots.EXCLUDED
	upsert := snapshots.INSERT(
		snapshots.UserID, snapshots.Tier, snapshots.IsPaid, snapshots.Source,
		snapshots.ActorType, snapshots.ActorUserID, snapshots.ActorUsername,
		snapshots.ReasonCode, snapshots.StartsAt, snapshots.EndsAt,
		snapshots.MaxRegisteredRaceNames, snapshots.UpdatedAt,
	).VALUES(
		snap.UserID, snap.Tier, snap.IsPaid, snap.Source,
		snap.Actor.Type, actorUserID, actorUsername,
		snap.ReasonCode, snap.StartsAt, endsAt, snap.MaxRegisteredRaceNames, snap.UpdatedAt,
	).
		ON_CONFLICT(snapshots.UserID).
		DO_UPDATE(postgres.SET(
			snapshots.Tier.SET(incoming.Tier),
			snapshots.IsPaid.SET(incoming.IsPaid),
			snapshots.Source.SET(incoming.Source),
			snapshots.ActorType.SET(incoming.ActorType),
			snapshots.ActorUserID.SET(incoming.ActorUserID),
			snapshots.ActorUsername.SET(incoming.ActorUsername),
			snapshots.ReasonCode.SET(incoming.ReasonCode),
			snapshots.StartsAt.SET(incoming.StartsAt),
			snapshots.EndsAt.SET(incoming.EndsAt),
			snapshots.MaxRegisteredRaceNames.SET(incoming.MaxRegisteredRaceNames),
			snapshots.UpdatedAt.SET(incoming.UpdatedAt),
		))

	_, execErr := upsert.ExecContext(ctx, tx)
	if execErr != nil {
		return fmt.Errorf("upsert entitlement_snapshots: %w", execErr)
	}
	return nil
}
// modelToAccountRow converts a generated model row into the public
// AccountRow struct. A NULL declared_country becomes "", and a non-nil
// deleted_at is copied so the result does not share the row's pointer.
func modelToAccountRow(row model.Accounts) AccountRow {
	var out AccountRow
	out.UserID = row.UserID
	out.Email = row.Email
	out.UserName = row.UserName
	out.DisplayName = row.DisplayName
	out.PreferredLanguage = row.PreferredLanguage
	out.TimeZone = row.TimeZone
	out.PermanentBlock = row.PermanentBlock
	out.CreatedAt = row.CreatedAt
	out.UpdatedAt = row.UpdatedAt

	if country := row.DeclaredCountry; country != nil {
		out.DeclaredCountry = *country
	}
	if deleted := row.DeletedAt; deleted != nil {
		copied := *deleted
		out.DeletedAt = &copied
	}
	return out
}
// modelToSnapshot converts a generated model row into the public
// EntitlementSnapshot struct. The three actor columns are folded into
// one ActorRef, and a non-nil ends_at is copied so the result does not
// share the row's pointer.
func modelToSnapshot(row model.EntitlementSnapshots) EntitlementSnapshot {
	var out EntitlementSnapshot
	out.UserID = row.UserID
	out.Tier = row.Tier
	out.IsPaid = row.IsPaid
	out.Source = row.Source
	out.Actor = actorFromColumns(row.ActorType, row.ActorUserID, row.ActorUsername)
	out.ReasonCode = row.ReasonCode
	out.StartsAt = row.StartsAt
	out.MaxRegisteredRaceNames = row.MaxRegisteredRaceNames
	out.UpdatedAt = row.UpdatedAt

	if ends := row.EndsAt; ends != nil {
		copied := *ends
		out.EndsAt = &copied
	}
	return out
}
// actorToColumnArgs maps an ActorRef to the (actor_user_id,
// actor_username) values passed to jet INSERT VALUES. A nil `any` ends
// up as SQL NULL through the database/sql driver.
//
//   - Type "user": the trimmed ID is parsed as a UUID and returned as
//     the first value; a malformed ID is an error.
//   - Type "admin": the ID is returned verbatim as the second value.
//   - Any other type (system, unknown): both values are nil.
//
// A blank ID yields two nils for every type, so synthetic system events
// that label themselves as "user" do not fail.
func actorToColumnArgs(actor ActorRef) (any, any, error) {
	trimmedID := strings.TrimSpace(actor.ID)
	if trimmedID == "" {
		return nil, nil, nil
	}

	kind := strings.TrimSpace(actor.Type)
	if kind == "user" {
		parsed, err := uuid.Parse(trimmedID)
		if err != nil {
			return nil, nil, fmt.Errorf("user store: actor id %q is not a uuid: %w", actor.ID, err)
		}
		return parsed, nil, nil
	}
	if kind == "admin" {
		// The username is stored untrimmed.
		return nil, actor.ID, nil
	}
	return nil, nil, nil
}
// actorToColumnExprs is the typed-expression counterpart of
// actorToColumnArgs, used at the UPDATE SET sites. jet's generated
// bindings type uuid columns as ColumnString (the dialect emits an
// explicit CAST), so both results are StringExpression. A value that
// actorToColumnArgs leaves nil becomes a NULL expression.
func actorToColumnExprs(actor ActorRef) (postgres.StringExpression, postgres.StringExpression, error) {
	rawUID, rawName, err := actorToColumnArgs(actor)
	if err != nil {
		return nil, nil, err
	}

	var uidExpr postgres.StringExpression
	if parsed, isUUID := rawUID.(uuid.UUID); isUUID {
		uidExpr = postgres.UUID(parsed)
	} else {
		uidExpr = postgres.StringExp(postgres.NULL)
	}

	var nameExpr postgres.StringExpression
	if username, isString := rawName.(string); isString {
		nameExpr = postgres.String(username)
	} else {
		nameExpr = postgres.StringExp(postgres.NULL)
	}

	return uidExpr, nameExpr, nil
}
// actorFromColumns rebuilds an ActorRef from the (actor_type,
// actor_user_id, actor_username) triple of an audit row. The user id
// takes precedence over the username when both are set; when both are
// nil the ID stays empty.
func actorFromColumns(actorType string, userID *uuid.UUID, username *string) ActorRef {
	ref := ActorRef{Type: actorType}
	if userID != nil {
		ref.ID = userID.String()
		return ref
	}
	if username != nil {
		ref.ID = *username
	}
	return ref
}
// rowsAffectedOrNotFound maps the result of an UPDATE to an error:
// ErrAccountNotFound when no row was affected, a wrapped error when the
// count itself cannot be read, and nil otherwise. A nil result is
// accepted and also yields nil. The account-mutation paths use it to
// fail fast on a missing or soft-deleted target.
func rowsAffectedOrNotFound(res sql.Result) error {
	if res == nil {
		return nil
	}
	switch count, err := res.RowsAffected(); {
	case err != nil:
		return fmt.Errorf("user store: rows affected: %w", err)
	case count == 0:
		return ErrAccountNotFound
	default:
		return nil
	}
}
// withTx wraps fn in a Postgres transaction. fn's return value
// determines commit (nil) vs rollback (non-nil). Rollback errors are
// swallowed when fn already returned an error, since the latter is
// more actionable.
//
// The rollback is deferred, so the transaction is also ended when fn
// panics; the panic itself keeps propagating to the caller. Errors from
// BeginTx and Commit are returned wrapped with the store prefix, while
// fn's own error is returned as-is.
func withTx(ctx context.Context, db *sql.DB, fn func(tx *sql.Tx) error) error {
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return fmt.Errorf("user store: begin tx: %w", err)
	}
	// Once Commit has run, Rollback only reports sql.ErrTxDone and does
	// nothing else, so deferring it unconditionally is safe. Its error
	// is ignored on purpose: the caller gets fn's or Commit's error.
	defer func() { _ = tx.Rollback() }()

	if err := fn(tx); err != nil {
		return err
	}
	if err := tx.Commit(); err != nil {
		return fmt.Errorf("user store: commit tx: %w", err)
	}
	return nil
}