Files
galaxy-game/backend/internal/geo/counter.go
T
2026-05-06 10:14:55 +03:00

137 lines
4.8 KiB
Go

package geo
import (
"context"
"errors"
"fmt"
"time"
"galaxy/backend/internal/postgres/jet/backend/model"
"galaxy/backend/internal/postgres/jet/backend/table"
"github.com/go-jet/jet/v2/postgres"
"github.com/google/uuid"
"go.uber.org/zap"
)
// counterUpsertTimeout is the upper bound on how long one fire-and-forget
// counter goroutine may spend in its database call. The upsert is a
// single statement against a tiny table and is expected to finish in
// well under a second; the cap is there so that a slow Postgres node
// cannot cause goroutines to pile up under load.
const counterUpsertTimeout time.Duration = 5 * time.Second
// CountryCounter mirrors a single row of `backend.user_country_counters`
// in the shape served by the admin surface
// (`GET /api/v1/admin/geo/users/{user_id}/countries`).
type CountryCounter struct {
	// Country is the uppercase ISO 3166-1 alpha-2 code the count is
	// stored under.
	Country string

	// Count is the running number of hits recorded for Country.
	Count int64

	// LastSeenAt is optional because the column is nullable; the admin
	// response surfaces null when it is unset.
	LastSeenAt *time.Time
}
// IncrementCounterAsync bumps the per-country counter for userID without
// making the caller wait on the database. The country for sourceIP is
// resolved synchronously via LookupCountry; the upsert itself is handed
// to a goroutine tracked by the Service wait group and bounded by
// counterUpsertTimeout. Errors never reach the caller — they are
// reported through the Service logger instead.
//
// Nothing is scheduled, and the call returns immediately, when any of
// the following holds:
//   - the receiver is nil, userID is the zero UUID, or sourceIP is empty;
//   - the Service background context is missing or already cancelled
//     (typically after Drain or Close), so no new counters start during
//     shutdown;
//   - LookupCountry yields an empty country code.
//
// The context parameter is deliberately ignored: a request-scoped
// context is cancelled as soon as the response is flushed to the
// gateway, which would race with the upsert. The goroutine derives its
// context from the Service-internal background context instead.
func (s *Service) IncrementCounterAsync(_ context.Context, userID uuid.UUID, sourceIP string) {
	// Cases are evaluated top to bottom, left to right, so the nil
	// receiver check always runs before any field of s is touched.
	switch {
	case s == nil, userID == uuid.Nil, sourceIP == "":
		return
	case s.bgCtx == nil, s.bgCtx.Err() != nil:
		return
	}

	code := s.LookupCountry(sourceIP)
	if code == "" {
		return
	}

	record := func() {
		dbCtx, release := context.WithTimeout(s.bgCtx, counterUpsertTimeout)
		defer release()

		err := s.upsertCounter(dbCtx, userID, code)
		switch {
		case err == nil:
			return
		case errors.Is(err, context.Canceled), errors.Is(err, context.DeadlineExceeded):
			// Context-driven aborts are dropped without logging.
			// NOTE(review): this also hides upserts that hit
			// counterUpsertTimeout — confirm that is intended.
			return
		}

		s.logger.Warn("counter upsert failed",
			zap.String("user_id", userID.String()),
			zap.String("country", code),
			zap.Error(err),
		)
	}
	s.wg.Go(record)
}
// upsertCounter runs a single INSERT ... ON CONFLICT statement against
// `backend.user_country_counters`. A fresh (user_id, country) pair is
// inserted with a count of 1 and the current database time; an existing
// pair has its count incremented by 1 and its last-seen timestamp
// refreshed. The compound primary key `(user_id, country)` is what makes
// the upsert race-safe across concurrent goroutines.
//
// Any execution error is returned wrapped with the user and country it
// was raised for.
func (s *Service) upsertCounter(ctx context.Context, userID uuid.UUID, country string) error {
	counters := table.UserCountryCounters

	// Applied only when the (user_id, country) row already exists.
	bump := postgres.SET(
		counters.Count.SET(counters.Count.ADD(postgres.Int(1))),
		counters.LastSeenAt.SET(postgres.TimestampzExp(postgres.NOW())),
	)

	upsert := counters.
		INSERT(counters.UserID, counters.Country, counters.Count, counters.LastSeenAt).
		VALUES(userID, country, postgres.Int(1), postgres.NOW()).
		ON_CONFLICT(counters.UserID, counters.Country).
		DO_UPDATE(bump)

	_, err := upsert.ExecContext(ctx, s.db)
	if err == nil {
		return nil
	}
	return fmt.Errorf("geo: upsert counter for %s/%s: %w", userID, country, err)
}
// ListUserCounters fetches all per-country counters stored for userID,
// sorted by country code ascending. A user without any rows produces an
// empty, non-nil slice and a nil error. Existence of the user in
// `backend.accounts` is not verified here; the admin surface gates that
// through a separate listing endpoint.
//
// An error is returned for a nil receiver, for the zero UUID, and for
// any query failure (wrapped with the user ID). LastSeenAt values that
// are present are converted to UTC before being returned.
func (s *Service) ListUserCounters(ctx context.Context, userID uuid.UUID) ([]CountryCounter, error) {
	switch {
	case s == nil:
		return nil, errors.New("geo: nil service")
	case userID == uuid.Nil:
		return nil, errors.New("geo: nil user id")
	}

	counters := table.UserCountryCounters
	query := postgres.SELECT(counters.Country, counters.Count, counters.LastSeenAt).
		FROM(counters).
		WHERE(counters.UserID.EQ(postgres.UUID(userID))).
		ORDER_BY(counters.Country.ASC())

	var rows []model.UserCountryCounters
	err := query.QueryContext(ctx, s.db, &rows)
	if err != nil {
		return nil, fmt.Errorf("geo: list counters for %s: %w", userID, err)
	}

	// Allocated at full length so the result is non-nil even when the
	// query matched nothing.
	result := make([]CountryCounter, len(rows))
	for i := range rows {
		result[i].Country = rows[i].Country
		result[i].Count = rows[i].Count
		if seen := rows[i].LastSeenAt; seen != nil {
			utc := seen.UTC()
			result[i].LastSeenAt = &utc
		}
	}
	return result, nil
}