Merge pull request 'fix(backend): retry migrations on transient connection errors' (#74) from feature/pg-migration-transient-retry into development
This commit was merged in pull request #74.
This commit is contained in:
@@ -10,7 +10,10 @@ package postgres
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
|
"database/sql/driver"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"galaxy/backend/internal/config"
|
"galaxy/backend/internal/config"
|
||||||
@@ -67,18 +70,84 @@ func Open(ctx context.Context, cfg config.PostgresConfig, runtime *telemetry.Run
|
|||||||
// backend table lives here.
|
// backend table lives here.
|
||||||
const schemaName = "backend"
|
const schemaName = "backend"
|
||||||
|
|
||||||
|
// Retry budget applied around ApplyMigrations for transient connection errors.
// A Postgres instance that has only just started — a test container in
// particular — may reset a pooled connection shortly after it reports ready,
// and the migration then fails midway with `driver: bad connection`. A few
// closely spaced retries absorb that hiccup without hiding genuine failures.
const (
	// migrationRetryBackoff is the pause between two consecutive attempts.
	migrationRetryBackoff = 250 * time.Millisecond
	// migrationRetryAttempts is the attempt budget handed to the retry helper.
	migrationRetryAttempts = 5
)
|
||||||
|
|
||||||
// ApplyMigrations runs every pending Up migration embedded in the backend
|
// ApplyMigrations runs every pending Up migration embedded in the backend
|
||||||
// binary against db. The schema is created upfront so goose's bookkeeping
|
// binary against db. The schema is created upfront so goose's bookkeeping
|
||||||
// table (`goose_db_version`, scoped to the DSN `search_path = backend`)
|
// table (`goose_db_version`, scoped to the DSN `search_path = backend`)
|
||||||
// has somewhere to land before the first migration runs; migration
|
// has somewhere to land before the first migration runs; migration
|
||||||
// `00001_init.sql` re-asserts the schema with `IF NOT EXISTS`, so the
|
// `00001_init.sql` re-asserts the schema with `IF NOT EXISTS`, so the
|
||||||
// double-create is idempotent.
|
// double-create is idempotent.
|
||||||
|
//
|
||||||
|
// The apply is retried on transient connection errors (see retryOnTransient).
|
||||||
|
// Both steps are idempotent — `CREATE SCHEMA IF NOT EXISTS` and goose's
|
||||||
|
// version tracking — so a retry after a dropped connection re-runs cleanly and
|
||||||
|
// resumes from the last committed migration.
|
||||||
func ApplyMigrations(ctx context.Context, db *sql.DB) error {
|
func ApplyMigrations(ctx context.Context, db *sql.DB) error {
|
||||||
if _, err := db.ExecContext(ctx, "CREATE SCHEMA IF NOT EXISTS "+schemaName); err != nil {
|
return retryOnTransient(ctx, migrationRetryAttempts, migrationRetryBackoff, func() error {
|
||||||
return fmt.Errorf("ensure backend schema: %w", err)
|
if _, err := db.ExecContext(ctx, "CREATE SCHEMA IF NOT EXISTS "+schemaName); err != nil {
|
||||||
}
|
return fmt.Errorf("ensure backend schema: %w", err)
|
||||||
if err := pgshared.RunMigrations(ctx, db, migrations.Migrations(), "."); err != nil {
|
}
|
||||||
return fmt.Errorf("apply backend migrations: %w", err)
|
if err := pgshared.RunMigrations(ctx, db, migrations.Migrations(), "."); err != nil {
|
||||||
}
|
return fmt.Errorf("apply backend migrations: %w", err)
|
||||||
return nil
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// retryOnTransient runs op up to attempts times, retrying only when op fails
|
||||||
|
// with a transient connection error (see isTransientConnError) — a dropped,
|
||||||
|
// reset, or refused connection, as opposed to a deterministic SQL error. It
|
||||||
|
// waits backoff between attempts and stops early if ctx is cancelled. A
|
||||||
|
// non-transient error, or the error from the final attempt, is returned as-is.
|
||||||
|
func retryOnTransient(ctx context.Context, attempts int, backoff time.Duration, op func() error) error {
|
||||||
|
var err error
|
||||||
|
for attempt := 1; attempt <= attempts; attempt++ {
|
||||||
|
if err = op(); err == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if attempt == attempts || !isTransientConnError(err) {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return errors.Join(err, ctx.Err())
|
||||||
|
case <-time.After(backoff):
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// isTransientConnError reports whether err looks like a connection-level
// failure that is worth retrying.
//
// It returns true when err wraps database/sql's driver.ErrBadConn, or when
// the lower-cased error text contains one of the connection-failure phrases
// listed below. The text match is needed because some layers flatten the
// driver error into a plain string, which hides it from errors.Is. Anything
// else — including nil and deterministic SQL errors such as syntax errors or
// constraint violations — yields false so that it fails fast.
func isTransientConnError(err error) bool {
	switch {
	case err == nil:
		return false
	case errors.Is(err, driver.ErrBadConn):
		return true
	}

	// Substrings compared against the lower-cased error message.
	markers := [...]string{
		"bad connection",
		"connection refused",
		"connection reset",
		"broken pipe",
		"server closed the connection",
	}

	text := strings.ToLower(err.Error())
	for i := range markers {
		if strings.Contains(text, markers[i]) {
			return true
		}
	}
	return false
}
|
||||||
|
|||||||
@@ -0,0 +1,103 @@
|
|||||||
|
package postgres
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"database/sql/driver"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestIsTransientConnError(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
err error
|
||||||
|
want bool
|
||||||
|
}{
|
||||||
|
{"nil", nil, false},
|
||||||
|
{"driver.ErrBadConn", driver.ErrBadConn, true},
|
||||||
|
{"wrapped ErrBadConn", fmt.Errorf("run migrations: %w", driver.ErrBadConn), true},
|
||||||
|
// The exact shape observed flaking CI: goose surfaces the driver
|
||||||
|
// error as a plain string, so errors.Is can't see ErrBadConn.
|
||||||
|
{"bad connection string", errors.New(`apply backend migrations: run migrations: ERROR 00001_init.sql: CREATE TABLE race_names: driver: bad connection`), true},
|
||||||
|
{"connection refused", errors.New("dial tcp 127.0.0.1:5432: connect: connection refused"), true},
|
||||||
|
{"connection reset", errors.New("read tcp: connection reset by peer"), true},
|
||||||
|
{"broken pipe", errors.New("write tcp: broken pipe"), true},
|
||||||
|
{"server closed", errors.New("pq: server closed the connection unexpectedly"), true},
|
||||||
|
{"syntax error is not transient", errors.New(`pq: syntax error at or near "TABL"`), false},
|
||||||
|
{"constraint violation is not transient", errors.New("pq: duplicate key value violates unique constraint"), false},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
assert.Equal(t, tt.want, isTransientConnError(tt.err))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRetryOnTransientSucceedsAfterTransientFailures(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
calls := 0
|
||||||
|
err := retryOnTransient(context.Background(), 5, time.Millisecond, func() error {
|
||||||
|
calls++
|
||||||
|
if calls < 3 {
|
||||||
|
return fmt.Errorf("attempt %d: %w", calls, driver.ErrBadConn)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Equal(t, 3, calls, "should retry until the transient error clears")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRetryOnTransientStopsOnNonTransient(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
sentinel := errors.New(`pq: syntax error at or near "TABL"`)
|
||||||
|
calls := 0
|
||||||
|
err := retryOnTransient(context.Background(), 5, time.Millisecond, func() error {
|
||||||
|
calls++
|
||||||
|
return sentinel
|
||||||
|
})
|
||||||
|
|
||||||
|
require.ErrorIs(t, err, sentinel)
|
||||||
|
assert.Equal(t, 1, calls, "a deterministic SQL error must not be retried")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRetryOnTransientExhaustsAttempts(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
calls := 0
|
||||||
|
err := retryOnTransient(context.Background(), 3, time.Millisecond, func() error {
|
||||||
|
calls++
|
||||||
|
return driver.ErrBadConn
|
||||||
|
})
|
||||||
|
|
||||||
|
require.ErrorIs(t, err, driver.ErrBadConn)
|
||||||
|
assert.Equal(t, 3, calls, "must stop after the attempt budget is spent")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRetryOnTransientRespectsContextCancellation(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
cancel()
|
||||||
|
|
||||||
|
calls := 0
|
||||||
|
err := retryOnTransient(ctx, 5, time.Hour, func() error {
|
||||||
|
calls++
|
||||||
|
return driver.ErrBadConn
|
||||||
|
})
|
||||||
|
|
||||||
|
require.ErrorIs(t, err, context.Canceled)
|
||||||
|
require.ErrorIs(t, err, driver.ErrBadConn, "the underlying transient error is preserved")
|
||||||
|
assert.Equal(t, 1, calls, "cancellation during backoff stops further attempts")
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user