package geo import ( "context" "errors" "fmt" "time" "galaxy/backend/internal/postgres/jet/backend/model" "galaxy/backend/internal/postgres/jet/backend/table" "github.com/go-jet/jet/v2/postgres" "github.com/google/uuid" "go.uber.org/zap" ) // counterUpsertTimeout bounds the database call performed by a single // fire-and-forget counter goroutine. The upsert is a single statement on // a tiny table and should complete in well under a second; the timeout // exists to keep one slow Postgres node from accumulating leaked // goroutines under load. const counterUpsertTimeout = 5 * time.Second // CountryCounter is one row from `backend.user_country_counters` exposed // to the admin surface (`GET /api/v1/admin/geo/users/{user_id}/countries`). // // Country is the uppercase ISO 3166-1 alpha-2 code stored alongside the // running count. LastSeenAt is nullable on the table and therefore // optional; the admin response surfaces null when it is unset. type CountryCounter struct { Country string Count int64 LastSeenAt *time.Time } // IncrementCounterAsync upserts the per-country counter for userID as a // fire-and-forget goroutine: the country lookup is performed // synchronously (it is pure CPU plus an mmap read), then a goroutine // runs the database upsert against the Service-internal background // context. The caller never blocks on the database round-trip and never // observes errors directly — failures are logged via the Service logger // configured through SetLogger. // // Inputs that yield no useful data short-circuit without launching the // goroutine: a nil receiver, a zero userID, an empty sourceIP, or a // failed country lookup all return immediately. A Service whose // background context has already been cancelled (typically because Drain // or Close ran) also short-circuits — counters are not started during // shutdown, but live ones are awaited by Drain. // // The ctx parameter is intentionally unused for the database call: the // request-scoped context is cancelled the moment the response is // flushed to the gateway, which would race with the upsert. The // goroutine derives its context from the Service-internal one // instead. func (s *Service) IncrementCounterAsync(_ context.Context, userID uuid.UUID, sourceIP string) { if s == nil || userID == uuid.Nil || sourceIP == "" { return } if s.bgCtx == nil || s.bgCtx.Err() != nil { return } country := s.LookupCountry(sourceIP) if country == "" { return } s.wg.Go(func() { ctx, cancel := context.WithTimeout(s.bgCtx, counterUpsertTimeout) defer cancel() if err := s.upsertCounter(ctx, userID, country); err != nil { if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { return } s.logger.Warn("counter upsert failed", zap.String("user_id", userID.String()), zap.String("country", country), zap.Error(err), ) } }) } // upsertCounter executes the atomic INSERT...ON CONFLICT against // `backend.user_country_counters`. The compound primary key // `(user_id, country)` makes the upsert race-safe across concurrent // goroutines. func (s *Service) upsertCounter(ctx context.Context, userID uuid.UUID, country string) error { ucc := table.UserCountryCounters stmt := ucc.INSERT(ucc.UserID, ucc.Country, ucc.Count, ucc.LastSeenAt). VALUES(userID, country, postgres.Int(1), postgres.NOW()). ON_CONFLICT(ucc.UserID, ucc.Country). DO_UPDATE(postgres.SET( ucc.Count.SET(ucc.Count.ADD(postgres.Int(1))), ucc.LastSeenAt.SET(postgres.TimestampzExp(postgres.NOW())), )) if _, err := stmt.ExecContext(ctx, s.db); err != nil { return fmt.Errorf("geo: upsert counter for %s/%s: %w", userID, country, err) } return nil } // ListUserCounters returns every per-country counter recorded for // userID, ordered by country ASC. The list is empty (and the error is // nil) when the user has no rows; ListUserCounters does not check that // the user exists in `backend.accounts` because the admin surface gates // existence through a separate listing endpoint. func (s *Service) ListUserCounters(ctx context.Context, userID uuid.UUID) ([]CountryCounter, error) { if s == nil { return nil, errors.New("geo: nil service") } if userID == uuid.Nil { return nil, errors.New("geo: nil user id") } ucc := table.UserCountryCounters stmt := postgres.SELECT(ucc.Country, ucc.Count, ucc.LastSeenAt). FROM(ucc). WHERE(ucc.UserID.EQ(postgres.UUID(userID))). ORDER_BY(ucc.Country.ASC()) var dest []model.UserCountryCounters if err := stmt.QueryContext(ctx, s.db, &dest); err != nil { return nil, fmt.Errorf("geo: list counters for %s: %w", userID, err) } out := make([]CountryCounter, 0, len(dest)) for _, row := range dest { entry := CountryCounter{Country: row.Country, Count: row.Count} if row.LastSeenAt != nil { ts := row.LastSeenAt.UTC() entry.LastSeenAt = &ts } out = append(out, entry) } return out, nil }