// Package postgres opens the backend's primary Postgres pool and applies the
// embedded migrations.
//
// The package is a thin wrapper around galaxy/postgres: it adapts the backend
// configuration shape to galaxy/postgres.Config, plumbs the OpenTelemetry
// tracer and meter providers from the telemetry runtime, instruments the
// pool, and verifies connectivity with a bounded Ping.
package postgres

import (
	"context"
	"database/sql"
	"database/sql/driver"
	"errors"
	"fmt"
	"strings"
	"time"

	"galaxy/backend/internal/config"
	"galaxy/backend/internal/postgres/migrations"
	"galaxy/backend/internal/telemetry"
	pgshared "galaxy/postgres"
)

// connMaxLifetime caps the lifetime of an individual pooled connection; Open
// passes it as ConnMaxLifetime in the pgshared.Config it builds. Kept in sync
// with galaxy/postgres.DefaultConnMaxLifetime so behaviour matches the
// helper's defaults until backend has reason to deviate.
const connMaxLifetime = 30 * time.Minute

// Open constructs the primary Postgres pool, instruments it, pings it, and
// returns the *sql.DB. If instrumentation or the ping fails, the pool is
// closed before the wrapped error is returned. On success, closing the
// database is the caller's responsibility.
func Open(ctx context.Context, cfg config.PostgresConfig, runtime *telemetry.Runtime) (*sql.DB, error) { pgCfg := pgshared.Config{ PrimaryDSN: cfg.DSN, OperationTimeout: cfg.OperationTimeout, MaxOpenConns: cfg.MaxConns, MaxIdleConns: cfg.MinConns, ConnMaxLifetime: connMaxLifetime, } db, err := pgshared.OpenPrimary( ctx, pgCfg, pgshared.WithTracerProvider(runtime.TracerProvider()), pgshared.WithMeterProvider(runtime.MeterProvider()), ) if err != nil { return nil, fmt.Errorf("open backend postgres pool: %w", err) } if _, err := pgshared.InstrumentDBStats( db, pgshared.WithTracerProvider(runtime.TracerProvider()), pgshared.WithMeterProvider(runtime.MeterProvider()), ); err != nil { _ = db.Close() return nil, fmt.Errorf("instrument backend postgres pool: %w", err) } if err := pgshared.Ping(ctx, db, cfg.OperationTimeout); err != nil { _ = db.Close() return nil, fmt.Errorf("ping backend postgres pool: %w", err) } return db, nil } // schemaName is the Postgres schema owned by the backend service. Every // backend table lives here. const schemaName = "backend" // migrationRetryAttempts and migrationRetryBackoff bound the transient-error // retry around ApplyMigrations. A freshly started Postgres — notably a test // container — can reset a pooled connection moments after it reports ready, // which surfaces as `driver: bad connection` mid-migration; a handful of quick // retries rides over that without masking real failures. const ( migrationRetryAttempts = 5 migrationRetryBackoff = 250 * time.Millisecond ) // ApplyMigrations runs every pending Up migration embedded in the backend // binary against db. The schema is created upfront so goose's bookkeeping // table (`goose_db_version`, scoped to the DSN `search_path = backend`) // has somewhere to land before the first migration runs; migration // `00001_init.sql` re-asserts the schema with `IF NOT EXISTS`, so the // double-create is idempotent. // // The apply is retried on transient connection errors (see retryOnTransient). 
// Both steps are idempotent — `CREATE SCHEMA IF NOT EXISTS` and goose's // version tracking — so a retry after a dropped connection re-runs cleanly and // resumes from the last committed migration. func ApplyMigrations(ctx context.Context, db *sql.DB) error { return retryOnTransient(ctx, migrationRetryAttempts, migrationRetryBackoff, func() error { if _, err := db.ExecContext(ctx, "CREATE SCHEMA IF NOT EXISTS "+schemaName); err != nil { return fmt.Errorf("ensure backend schema: %w", err) } if err := pgshared.RunMigrations(ctx, db, migrations.Migrations(), "."); err != nil { return fmt.Errorf("apply backend migrations: %w", err) } return nil }) } // retryOnTransient runs op up to attempts times, retrying only when op fails // with a transient connection error (see isTransientConnError) — a dropped, // reset, or refused connection, as opposed to a deterministic SQL error. It // waits backoff between attempts and stops early if ctx is cancelled. A // non-transient error, or the error from the final attempt, is returned as-is. func retryOnTransient(ctx context.Context, attempts int, backoff time.Duration, op func() error) error { var err error for attempt := 1; attempt <= attempts; attempt++ { if err = op(); err == nil { return nil } if attempt == attempts || !isTransientConnError(err) { return err } select { case <-ctx.Done(): return errors.Join(err, ctx.Err()) case <-time.After(backoff): } } return err } // isTransientConnError reports whether err is a transient connection-level // failure worth retrying. It matches database/sql's driver.ErrBadConn and the // connection-failure messages Postgres drivers surface, while leaving // deterministic SQL errors (syntax, constraint violations) to fail fast. 
func isTransientConnError(err error) bool {
	switch {
	case err == nil:
		// No error at all is never a reason to retry.
		return false
	case errors.Is(err, driver.ErrBadConn):
		// database/sql's own sentinel for a connection that has gone bad,
		// matched anywhere in the wrap chain.
		return true
	}

	// Fall back to message matching for failures that only surface as text.
	// The comparison is case-insensitive.
	lowered := strings.ToLower(err.Error())
	markers := [...]string{
		"bad connection",
		"connection refused",
		"connection reset",
		"broken pipe",
		"server closed the connection",
	}
	for i := range markers {
		if strings.Contains(lowered, markers[i]) {
			return true
		}
	}
	return false
}