// Package postgres opens the backend's Postgres pool and applies the embedded // goose migrations into the `backend` schema at startup. // // The pool is a standard library *sql.DB backed by the pgx driver (registered // through pgx/stdlib) and instrumented with otelsql, so go-jet queries run over // database/sql while statement spans and connection-pool metrics flow into // OpenTelemetry. The DSN must pin search_path to the backend schema. package postgres import ( "context" "database/sql" "errors" "fmt" "strings" "time" "github.com/XSAM/otelsql" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/stdlib" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/trace" ) // Default pool tuning applied by DefaultConfig. const ( DefaultMaxOpenConns = 25 DefaultMaxIdleConns = 5 DefaultConnMaxLifetime = 30 * time.Minute DefaultOperationTimeout = 5 * time.Second ) // dbSystemAttribute identifies the wrapped backend in OpenTelemetry spans // without pinning the package to a specific semconv release. var dbSystemAttribute = attribute.String("db.system", "postgresql") // Config describes how to open the backend Postgres pool. type Config struct { // DSN is the pgx/libpq connection string. It must pin search_path to the // backend schema, e.g. "postgres://…/db?search_path=backend&sslmode=disable". DSN string // MaxOpenConns bounds the pool's open connections (database/sql). MaxOpenConns int // MaxIdleConns bounds the pool's idle connections (database/sql). MaxIdleConns int // ConnMaxLifetime caps how long a pooled connection may be reused. ConnMaxLifetime time.Duration // OperationTimeout bounds a single connect attempt and the startup Ping. OperationTimeout time.Duration } // DefaultConfig returns a Config carrying the default pool tuning and an empty // DSN. Callers fill DSN from the environment before opening. func DefaultConfig() Config { return Config{ MaxOpenConns: DefaultMaxOpenConns, MaxIdleConns: DefaultMaxIdleConns, ConnMaxLifetime: DefaultConnMaxLifetime, OperationTimeout: DefaultOperationTimeout, } } // Validate reports whether the configuration is usable. func (c Config) Validate() error { if strings.TrimSpace(c.DSN) == "" { return errors.New("postgres: DSN must not be empty") } if c.MaxOpenConns <= 0 { return fmt.Errorf("postgres: MaxOpenConns must be positive, got %d", c.MaxOpenConns) } if c.MaxIdleConns < 0 { return fmt.Errorf("postgres: MaxIdleConns must not be negative, got %d", c.MaxIdleConns) } if c.OperationTimeout <= 0 { return fmt.Errorf("postgres: OperationTimeout must be positive, got %s", c.OperationTimeout) } return nil } // Option configures the OpenTelemetry providers attached to a pool by Open. // Unset providers fall back to the OpenTelemetry global providers. type Option func(*options) type options struct { tracerProvider trace.TracerProvider meterProvider metric.MeterProvider } // WithTracerProvider sets the tracer provider used for SQL statement spans. func WithTracerProvider(tp trace.TracerProvider) Option { return func(o *options) { o.tracerProvider = tp } } // WithMeterProvider sets the meter provider used for connection-pool metrics. func WithMeterProvider(mp metric.MeterProvider) Option { return func(o *options) { o.meterProvider = mp } } func evalOptions(opts []Option) options { var resolved options for _, opt := range opts { if opt != nil { opt(&resolved) } } return resolved } func (o options) otelsqlOptions() []otelsql.Option { out := []otelsql.Option{otelsql.WithAttributes(dbSystemAttribute)} if o.tracerProvider != nil { out = append(out, otelsql.WithTracerProvider(o.tracerProvider)) } if o.meterProvider != nil { out = append(out, otelsql.WithMeterProvider(o.meterProvider)) } return out } // Open opens the instrumented pool described by cfg, registers connection-pool // metrics, and verifies connectivity with a bounded Ping. Closing the returned // *sql.DB is the caller's responsibility. func Open(ctx context.Context, cfg Config, opts ...Option) (*sql.DB, error) { if err := cfg.Validate(); err != nil { return nil, fmt.Errorf("open postgres: %w", err) } resolved := evalOptions(opts) pgxCfg, err := pgx.ParseConfig(cfg.DSN) if err != nil { return nil, fmt.Errorf("open postgres: parse dsn: %w", err) } pgxCfg.ConnectTimeout = cfg.OperationTimeout registeredName := stdlib.RegisterConnConfig(pgxCfg) db, err := otelsql.Open("pgx", registeredName, resolved.otelsqlOptions()...) if err != nil { stdlib.UnregisterConnConfig(registeredName) return nil, fmt.Errorf("open postgres: otelsql open: %w", err) } db.SetMaxOpenConns(cfg.MaxOpenConns) db.SetMaxIdleConns(cfg.MaxIdleConns) db.SetConnMaxLifetime(cfg.ConnMaxLifetime) if _, err := otelsql.RegisterDBStatsMetrics(db, resolved.otelsqlOptions()...); err != nil { _ = db.Close() stdlib.UnregisterConnConfig(registeredName) return nil, fmt.Errorf("open postgres: register db stats: %w", err) } if err := Ping(ctx, db, cfg.OperationTimeout); err != nil { _ = db.Close() stdlib.UnregisterConnConfig(registeredName) return nil, err } return db, nil } // Ping bounds db.PingContext under timeout and wraps the error so startup // failures are easy to spot in service logs. The same call backs the /readyz // probe. timeout is typically Config.OperationTimeout. func Ping(ctx context.Context, db *sql.DB, timeout time.Duration) error { if db == nil { return errors.New("ping postgres: nil db") } if timeout <= 0 { return errors.New("ping postgres: timeout must be positive") } pingCtx, cancel := context.WithTimeout(ctx, timeout) defer cancel() if err := db.PingContext(pingCtx); err != nil { return fmt.Errorf("ping postgres: %w", err) } return nil }