137 lines
4.8 KiB
Go
137 lines
4.8 KiB
Go
package geo
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"time"
|
|
|
|
"galaxy/backend/internal/postgres/jet/backend/model"
|
|
"galaxy/backend/internal/postgres/jet/backend/table"
|
|
|
|
"github.com/go-jet/jet/v2/postgres"
|
|
"github.com/google/uuid"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// counterUpsertTimeout caps how long one fire-and-forget counter upsert
// may spend in the database. The statement is a single-row upsert and is
// expected to finish far sooner; the cap is there so that a slow Postgres
// node cannot make counter goroutines pile up under load.
const counterUpsertTimeout time.Duration = 5 * time.Second
|
|
|
|
// CountryCounter is the admin-facing view of a single row of
// `backend.user_country_counters`, as served by
// `GET /api/v1/admin/geo/users/{user_id}/countries`.
type CountryCounter struct {
	// Country is the uppercase ISO 3166-1 alpha-2 code the count is
	// recorded under.
	Country string
	// Count is the running total stored for Country.
	Count int64
	// LastSeenAt mirrors a nullable column: it is nil when the table has
	// no value, and the admin response then surfaces null.
	LastSeenAt *time.Time
}
|
|
|
|
// IncrementCounterAsync upserts the per-country counter for userID as a
|
|
// fire-and-forget goroutine: the country lookup is performed
|
|
// synchronously (it is pure CPU plus an mmap read), then a goroutine
|
|
// runs the database upsert against the Service-internal background
|
|
// context. The caller never blocks on the database round-trip and never
|
|
// observes errors directly — failures are logged via the Service logger
|
|
// configured through SetLogger.
|
|
//
|
|
// Inputs that yield no useful data short-circuit without launching the
|
|
// goroutine: a nil receiver, a zero userID, an empty sourceIP, or a
|
|
// failed country lookup all return immediately. A Service whose
|
|
// background context has already been cancelled (typically because Drain
|
|
// or Close ran) also short-circuits — counters are not started during
|
|
// shutdown, but live ones are awaited by Drain.
|
|
//
|
|
// The ctx parameter is intentionally unused for the database call: the
|
|
// request-scoped context is cancelled the moment the response is
|
|
// flushed to the gateway, which would race with the upsert. The
|
|
// goroutine derives its context from the Service-internal one
|
|
// instead.
|
|
func (s *Service) IncrementCounterAsync(_ context.Context, userID uuid.UUID, sourceIP string) {
|
|
if s == nil || userID == uuid.Nil || sourceIP == "" {
|
|
return
|
|
}
|
|
if s.bgCtx == nil || s.bgCtx.Err() != nil {
|
|
return
|
|
}
|
|
country := s.LookupCountry(sourceIP)
|
|
if country == "" {
|
|
return
|
|
}
|
|
|
|
s.wg.Go(func() {
|
|
ctx, cancel := context.WithTimeout(s.bgCtx, counterUpsertTimeout)
|
|
defer cancel()
|
|
|
|
if err := s.upsertCounter(ctx, userID, country); err != nil {
|
|
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
|
|
return
|
|
}
|
|
s.logger.Warn("counter upsert failed",
|
|
zap.String("user_id", userID.String()),
|
|
zap.String("country", country),
|
|
zap.Error(err),
|
|
)
|
|
}
|
|
})
|
|
}
|
|
|
|
// upsertCounter executes the atomic INSERT...ON CONFLICT against
|
|
// `backend.user_country_counters`. The compound primary key
|
|
// `(user_id, country)` makes the upsert race-safe across concurrent
|
|
// goroutines.
|
|
func (s *Service) upsertCounter(ctx context.Context, userID uuid.UUID, country string) error {
|
|
ucc := table.UserCountryCounters
|
|
stmt := ucc.INSERT(ucc.UserID, ucc.Country, ucc.Count, ucc.LastSeenAt).
|
|
VALUES(userID, country, postgres.Int(1), postgres.NOW()).
|
|
ON_CONFLICT(ucc.UserID, ucc.Country).
|
|
DO_UPDATE(postgres.SET(
|
|
ucc.Count.SET(ucc.Count.ADD(postgres.Int(1))),
|
|
ucc.LastSeenAt.SET(postgres.TimestampzExp(postgres.NOW())),
|
|
))
|
|
if _, err := stmt.ExecContext(ctx, s.db); err != nil {
|
|
return fmt.Errorf("geo: upsert counter for %s/%s: %w", userID, country, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// ListUserCounters returns every per-country counter recorded for
|
|
// userID, ordered by country ASC. The list is empty (and the error is
|
|
// nil) when the user has no rows; ListUserCounters does not check that
|
|
// the user exists in `backend.accounts` because the admin surface gates
|
|
// existence through a separate listing endpoint.
|
|
func (s *Service) ListUserCounters(ctx context.Context, userID uuid.UUID) ([]CountryCounter, error) {
|
|
if s == nil {
|
|
return nil, errors.New("geo: nil service")
|
|
}
|
|
if userID == uuid.Nil {
|
|
return nil, errors.New("geo: nil user id")
|
|
}
|
|
ucc := table.UserCountryCounters
|
|
stmt := postgres.SELECT(ucc.Country, ucc.Count, ucc.LastSeenAt).
|
|
FROM(ucc).
|
|
WHERE(ucc.UserID.EQ(postgres.UUID(userID))).
|
|
ORDER_BY(ucc.Country.ASC())
|
|
|
|
var dest []model.UserCountryCounters
|
|
if err := stmt.QueryContext(ctx, s.db, &dest); err != nil {
|
|
return nil, fmt.Errorf("geo: list counters for %s: %w", userID, err)
|
|
}
|
|
out := make([]CountryCounter, 0, len(dest))
|
|
for _, row := range dest {
|
|
entry := CountryCounter{Country: row.Country, Count: row.Count}
|
|
if row.LastSeenAt != nil {
|
|
ts := row.LastSeenAt.UTC()
|
|
entry.LastSeenAt = &ts
|
|
}
|
|
out = append(out, entry)
|
|
}
|
|
return out, nil
|
|
}
|