eeaad62b10
- internal/postgres: pgx-over-database/sql pool (otelsql), embedded goose
migrations into schema 'backend', committed go-jet code + cmd/jetgen tool.
- internal/account: durable accounts + unified telegram/email identities
(UUIDv7 keys), find-or-create provisioning with unique-conflict handling.
- internal/session: opaque 256-bit tokens stored as a SHA-256 hash, revoke-only
(no TTL); write-through cache gating /readyz; store + service.
- internal/telemetry: OTel tracer/meter providers (none/stdout) + request-timing
middleware; internal/config gains Postgres + OTel env loading.
- internal/server: /api/v1 {public,user,internal,admin} skeleton + X-User-ID
middleware; /readyz checks DB ping + cache; main wires
telemetry -> db+migrate -> warm cache -> server.
- Tests: unit + integration (build tag 'integration', testcontainers
postgres:17) for migrations, accounts, sessions, readyz; new integration.yaml.
- Docs: ARCHITECTURE, TESTING, PLAN refinements, root + backend READMEs.
Session/account REST handlers deferred to Stage 6 (gateway); OTLP + dashboards
to Stage 11.
178 lines
5.7 KiB
Go
178 lines
5.7 KiB
Go
// Package postgres opens the backend's Postgres pool and applies the embedded
|
|
// goose migrations into the `backend` schema at startup.
|
|
//
|
|
// The pool is a standard library *sql.DB backed by the pgx driver (registered
|
|
// through pgx/stdlib) and instrumented with otelsql, so go-jet queries run over
|
|
// database/sql while statement spans and connection-pool metrics flow into
|
|
// OpenTelemetry. The DSN must pin search_path to the backend schema.
|
|
package postgres
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"errors"
|
|
"fmt"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/XSAM/otelsql"
|
|
"github.com/jackc/pgx/v5"
|
|
"github.com/jackc/pgx/v5/stdlib"
|
|
"go.opentelemetry.io/otel/attribute"
|
|
"go.opentelemetry.io/otel/metric"
|
|
"go.opentelemetry.io/otel/trace"
|
|
)
|
|
|
|
// Default pool tuning applied by DefaultConfig.
|
|
const (
|
|
DefaultMaxOpenConns = 25
|
|
DefaultMaxIdleConns = 5
|
|
DefaultConnMaxLifetime = 30 * time.Minute
|
|
DefaultOperationTimeout = 5 * time.Second
|
|
)
|
|
|
|
// dbSystemAttribute identifies the wrapped backend in OpenTelemetry spans
|
|
// without pinning the package to a specific semconv release.
|
|
var dbSystemAttribute = attribute.String("db.system", "postgresql")
|
|
|
|
// Config describes how to open the backend Postgres pool.
|
|
type Config struct {
|
|
// DSN is the pgx/libpq connection string. It must pin search_path to the
|
|
// backend schema, e.g. "postgres://…/db?search_path=backend&sslmode=disable".
|
|
DSN string
|
|
// MaxOpenConns bounds the pool's open connections (database/sql).
|
|
MaxOpenConns int
|
|
// MaxIdleConns bounds the pool's idle connections (database/sql).
|
|
MaxIdleConns int
|
|
// ConnMaxLifetime caps how long a pooled connection may be reused.
|
|
ConnMaxLifetime time.Duration
|
|
// OperationTimeout bounds a single connect attempt and the startup Ping.
|
|
OperationTimeout time.Duration
|
|
}
|
|
|
|
// DefaultConfig returns a Config carrying the default pool tuning and an empty
|
|
// DSN. Callers fill DSN from the environment before opening.
|
|
func DefaultConfig() Config {
|
|
return Config{
|
|
MaxOpenConns: DefaultMaxOpenConns,
|
|
MaxIdleConns: DefaultMaxIdleConns,
|
|
ConnMaxLifetime: DefaultConnMaxLifetime,
|
|
OperationTimeout: DefaultOperationTimeout,
|
|
}
|
|
}
|
|
|
|
// Validate reports whether the configuration is usable.
|
|
func (c Config) Validate() error {
|
|
if strings.TrimSpace(c.DSN) == "" {
|
|
return errors.New("postgres: DSN must not be empty")
|
|
}
|
|
if c.MaxOpenConns <= 0 {
|
|
return fmt.Errorf("postgres: MaxOpenConns must be positive, got %d", c.MaxOpenConns)
|
|
}
|
|
if c.MaxIdleConns < 0 {
|
|
return fmt.Errorf("postgres: MaxIdleConns must not be negative, got %d", c.MaxIdleConns)
|
|
}
|
|
if c.OperationTimeout <= 0 {
|
|
return fmt.Errorf("postgres: OperationTimeout must be positive, got %s", c.OperationTimeout)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Option configures the OpenTelemetry providers attached to a pool by Open.
|
|
// Unset providers fall back to the OpenTelemetry global providers.
|
|
type Option func(*options)
|
|
|
|
type options struct {
|
|
tracerProvider trace.TracerProvider
|
|
meterProvider metric.MeterProvider
|
|
}
|
|
|
|
// WithTracerProvider sets the tracer provider used for SQL statement spans.
|
|
func WithTracerProvider(tp trace.TracerProvider) Option {
|
|
return func(o *options) { o.tracerProvider = tp }
|
|
}
|
|
|
|
// WithMeterProvider sets the meter provider used for connection-pool metrics.
|
|
func WithMeterProvider(mp metric.MeterProvider) Option {
|
|
return func(o *options) { o.meterProvider = mp }
|
|
}
|
|
|
|
func evalOptions(opts []Option) options {
|
|
var resolved options
|
|
for _, opt := range opts {
|
|
if opt != nil {
|
|
opt(&resolved)
|
|
}
|
|
}
|
|
return resolved
|
|
}
|
|
|
|
func (o options) otelsqlOptions() []otelsql.Option {
|
|
out := []otelsql.Option{otelsql.WithAttributes(dbSystemAttribute)}
|
|
if o.tracerProvider != nil {
|
|
out = append(out, otelsql.WithTracerProvider(o.tracerProvider))
|
|
}
|
|
if o.meterProvider != nil {
|
|
out = append(out, otelsql.WithMeterProvider(o.meterProvider))
|
|
}
|
|
return out
|
|
}
|
|
|
|
// Open opens the instrumented pool described by cfg, registers connection-pool
|
|
// metrics, and verifies connectivity with a bounded Ping. Closing the returned
|
|
// *sql.DB is the caller's responsibility.
|
|
func Open(ctx context.Context, cfg Config, opts ...Option) (*sql.DB, error) {
|
|
if err := cfg.Validate(); err != nil {
|
|
return nil, fmt.Errorf("open postgres: %w", err)
|
|
}
|
|
resolved := evalOptions(opts)
|
|
|
|
pgxCfg, err := pgx.ParseConfig(cfg.DSN)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("open postgres: parse dsn: %w", err)
|
|
}
|
|
pgxCfg.ConnectTimeout = cfg.OperationTimeout
|
|
registeredName := stdlib.RegisterConnConfig(pgxCfg)
|
|
|
|
db, err := otelsql.Open("pgx", registeredName, resolved.otelsqlOptions()...)
|
|
if err != nil {
|
|
stdlib.UnregisterConnConfig(registeredName)
|
|
return nil, fmt.Errorf("open postgres: otelsql open: %w", err)
|
|
}
|
|
|
|
db.SetMaxOpenConns(cfg.MaxOpenConns)
|
|
db.SetMaxIdleConns(cfg.MaxIdleConns)
|
|
db.SetConnMaxLifetime(cfg.ConnMaxLifetime)
|
|
|
|
if _, err := otelsql.RegisterDBStatsMetrics(db, resolved.otelsqlOptions()...); err != nil {
|
|
_ = db.Close()
|
|
stdlib.UnregisterConnConfig(registeredName)
|
|
return nil, fmt.Errorf("open postgres: register db stats: %w", err)
|
|
}
|
|
|
|
if err := Ping(ctx, db, cfg.OperationTimeout); err != nil {
|
|
_ = db.Close()
|
|
stdlib.UnregisterConnConfig(registeredName)
|
|
return nil, err
|
|
}
|
|
return db, nil
|
|
}
|
|
|
|
// Ping bounds db.PingContext under timeout and wraps the error so startup
|
|
// failures are easy to spot in service logs. The same call backs the /readyz
|
|
// probe. timeout is typically Config.OperationTimeout.
|
|
func Ping(ctx context.Context, db *sql.DB, timeout time.Duration) error {
|
|
if db == nil {
|
|
return errors.New("ping postgres: nil db")
|
|
}
|
|
if timeout <= 0 {
|
|
return errors.New("ping postgres: timeout must be positive")
|
|
}
|
|
pingCtx, cancel := context.WithTimeout(ctx, timeout)
|
|
defer cancel()
|
|
if err := db.PingContext(pingCtx); err != nil {
|
|
return fmt.Errorf("ping postgres: %w", err)
|
|
}
|
|
return nil
|
|
}
|