Turn the console landing page into an operational dashboard.

- new internal/opsstatus: read-only Postgres projection via go-jet — ping + per-status COUNT/GROUP BY on runtime_records, mail_deliveries, notification_routes, and a malformed-intent count; degrades per-probe into Snapshot.Errors rather than failing the page
- dashboard renders backend readiness, database health, the three status tables, the malformed count, and any collection errors; falls back to a "monitoring not wired" note when no reader is injected
- AdminConsoleHandlers now takes an AdminConsoleDeps struct (Monitor + Ready added) so later stages add service refs without churning the signature

Tests: opsstatus store test against a Postgres testcontainer (empty schema + one enqueued delivery); dashboard render tests with a fake reader (with and without monitoring).

Docs: ARCHITECTURE 14.1 + FUNCTIONAL 10.2.1 (+ru) describe the dashboard.

(Prometheus /metrics exporters were already enabled in dev-deploy in Stage 1.)
This commit is contained in:
@@ -0,0 +1,139 @@
|
||||
// Package opsstatus reads point-in-time operational signals from Postgres for
|
||||
// the admin console dashboard: database reachability, per-status counts of game
|
||||
// runtimes, mail deliveries, and notification routes, plus the malformed
|
||||
// notification-intent count.
|
||||
//
|
||||
// It is a read-only projection built entirely through the go-jet query builder
|
||||
// against the generated table bindings; it owns no business logic and mutates
|
||||
// nothing. Richer, historical metrics are out of scope — those belong to the
|
||||
// Prometheus exporters wired on `backend` and `gateway`.
|
||||
package opsstatus
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"galaxy/backend/internal/postgres/jet/backend/table"
|
||||
|
||||
"github.com/go-jet/jet/v2/postgres"
|
||||
)
|
||||
|
||||
// defaultCollectTimeout is the deadline NewStore applies to every Collect
// call, so that a slow or wedged database cannot hang the dashboard request.
const defaultCollectTimeout time.Duration = 3 * time.Second
|
||||
|
||||
// StatusCount is one row of a per-status tally: a status value together with
// the number of rows currently in that status.
type StatusCount struct {
	// Status is the status value the rows were grouped by.
	Status string
	// Count is the number of rows found with that status.
	Count int64
}
|
||||
|
||||
// Snapshot is a point-in-time view of the operational signals rendered on the
|
||||
// dashboard. Errors collects per-query failures so a single failing probe
|
||||
// degrades to a visible note rather than failing the whole page.
|
||||
type Snapshot struct {
|
||||
PostgresHealthy bool
|
||||
Runtimes []StatusCount
|
||||
MailDeliveries []StatusCount
|
||||
NotificationRoutes []StatusCount
|
||||
NotificationMalformed int64
|
||||
Errors []string
|
||||
}
|
||||
|
||||
// Reader collects an operational Snapshot. The admin console depends on this
|
||||
// interface so the dashboard can be tested without a database.
|
||||
type Reader interface {
|
||||
Collect(ctx context.Context) Snapshot
|
||||
}
|
||||
|
||||
// Store is the Reader implementation backed by Postgres.
type Store struct {
	// db is the connection pool every probe runs against.
	db *sql.DB
	// timeout bounds a single Collect call.
	timeout time.Duration
}
|
||||
|
||||
// NewStore constructs a Store reading from db.
|
||||
func NewStore(db *sql.DB) *Store {
|
||||
return &Store{db: db, timeout: defaultCollectTimeout}
|
||||
}
|
||||
|
||||
// Collect gathers the dashboard signals within a bounded timeout. It never
|
||||
// returns an error: a failed probe is recorded in Snapshot.Errors and the
|
||||
// remaining probes still run, except that a failed Postgres ping short-circuits
|
||||
// the rest (the dependent queries would only fail the same way).
|
||||
func (s *Store) Collect(ctx context.Context) Snapshot {
|
||||
ctx, cancel := context.WithTimeout(ctx, s.timeout)
|
||||
defer cancel()
|
||||
|
||||
var snap Snapshot
|
||||
|
||||
if err := s.db.PingContext(ctx); err != nil {
|
||||
snap.Errors = append(snap.Errors, fmt.Sprintf("postgres ping: %v", err))
|
||||
return snap
|
||||
}
|
||||
snap.PostgresHealthy = true
|
||||
|
||||
if counts, err := s.statusCounts(ctx, table.RuntimeRecords.Status, table.RuntimeRecords); err != nil {
|
||||
snap.Errors = append(snap.Errors, fmt.Sprintf("runtime status counts: %v", err))
|
||||
} else {
|
||||
snap.Runtimes = counts
|
||||
}
|
||||
|
||||
if counts, err := s.statusCounts(ctx, table.MailDeliveries.Status, table.MailDeliveries); err != nil {
|
||||
snap.Errors = append(snap.Errors, fmt.Sprintf("mail delivery counts: %v", err))
|
||||
} else {
|
||||
snap.MailDeliveries = counts
|
||||
}
|
||||
|
||||
if counts, err := s.statusCounts(ctx, table.NotificationRoutes.Status, table.NotificationRoutes); err != nil {
|
||||
snap.Errors = append(snap.Errors, fmt.Sprintf("notification route counts: %v", err))
|
||||
} else {
|
||||
snap.NotificationRoutes = counts
|
||||
}
|
||||
|
||||
if n, err := s.countAll(ctx, table.NotificationMalformedIntents); err != nil {
|
||||
snap.Errors = append(snap.Errors, fmt.Sprintf("malformed notification count: %v", err))
|
||||
} else {
|
||||
snap.NotificationMalformed = n
|
||||
}
|
||||
|
||||
return snap
|
||||
}
|
||||
|
||||
// statusCounts runs `SELECT status, COUNT(*) FROM <from> GROUP BY status`
|
||||
// through jet and returns the rows ordered by status.
|
||||
func (s *Store) statusCounts(ctx context.Context, status postgres.ColumnString, from postgres.ReadableTable) ([]StatusCount, error) {
|
||||
stmt := postgres.SELECT(
|
||||
status.AS("status_count.status"),
|
||||
postgres.COUNT(postgres.STAR).AS("status_count.count"),
|
||||
).FROM(from).GROUP_BY(status).ORDER_BY(status.ASC())
|
||||
|
||||
var rows []struct {
|
||||
Status string `alias:"status_count.status"`
|
||||
Count int64 `alias:"status_count.count"`
|
||||
}
|
||||
if err := stmt.QueryContext(ctx, s.db, &rows); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
out := make([]StatusCount, len(rows))
|
||||
for i, row := range rows {
|
||||
out[i] = StatusCount{Status: row.Status, Count: row.Count}
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// countAll runs `SELECT COUNT(*) FROM <from>` through jet.
|
||||
func (s *Store) countAll(ctx context.Context, from postgres.ReadableTable) (int64, error) {
|
||||
stmt := postgres.SELECT(postgres.COUNT(postgres.STAR).AS("count")).FROM(from)
|
||||
|
||||
var row struct {
|
||||
Count int64 `alias:"count"`
|
||||
}
|
||||
if err := stmt.QueryContext(ctx, s.db, &row); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return row.Count, nil
|
||||
}
|
||||
@@ -0,0 +1,155 @@
|
||||
package opsstatus_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"net/url"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"galaxy/backend/internal/mail"
|
||||
"galaxy/backend/internal/opsstatus"
|
||||
backendpg "galaxy/backend/internal/postgres"
|
||||
pgshared "galaxy/postgres"
|
||||
|
||||
"github.com/google/uuid"
|
||||
testcontainers "github.com/testcontainers/testcontainers-go"
|
||||
tcpostgres "github.com/testcontainers/testcontainers-go/modules/postgres"
|
||||
"github.com/testcontainers/testcontainers-go/wait"
|
||||
)
|
||||
|
||||
// Settings for the throwaway Postgres container started by startPostgres.
const (
	// pgImage is the container image the test database runs from.
	pgImage = "postgres:16-alpine"
	// pgUser and pgPassword are the credentials the container is created with.
	pgUser     = "galaxy"
	pgPassword = "galaxy"
	// pgDatabase is the database created inside the container.
	pgDatabase = "galaxy_backend"
	// pgSchema is the schema the connection's search_path is scoped to.
	pgSchema = "backend"
)

// Time limits used while bringing the test database up.
const (
	// pgStartup is how long to wait for the container to report readiness.
	pgStartup = 90 * time.Second
	// pgOpTO is the per-operation timeout set on the connection config.
	pgOpTO = 10 * time.Second
)
|
||||
|
||||
// startPostgres mirrors the per-package scaffolding used by the other store
|
||||
// tests: spin up Postgres, apply migrations, return *sql.DB.
|
||||
func startPostgres(t *testing.T) *sql.DB {
|
||||
t.Helper()
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
|
||||
t.Cleanup(cancel)
|
||||
|
||||
pgContainer, err := tcpostgres.Run(ctx, pgImage,
|
||||
tcpostgres.WithDatabase(pgDatabase),
|
||||
tcpostgres.WithUsername(pgUser),
|
||||
tcpostgres.WithPassword(pgPassword),
|
||||
testcontainers.WithWaitStrategy(
|
||||
wait.ForLog("database system is ready to accept connections").
|
||||
WithOccurrence(2).
|
||||
WithStartupTimeout(pgStartup),
|
||||
),
|
||||
)
|
||||
if err != nil {
|
||||
t.Skipf("postgres testcontainer unavailable, skipping: %v", err)
|
||||
}
|
||||
t.Cleanup(func() {
|
||||
if termErr := testcontainers.TerminateContainer(pgContainer); termErr != nil {
|
||||
t.Errorf("terminate postgres container: %v", termErr)
|
||||
}
|
||||
})
|
||||
|
||||
baseDSN, err := pgContainer.ConnectionString(ctx, "sslmode=disable")
|
||||
if err != nil {
|
||||
t.Fatalf("connection string: %v", err)
|
||||
}
|
||||
scopedDSN, err := dsnWithSearchPath(baseDSN, pgSchema)
|
||||
if err != nil {
|
||||
t.Fatalf("scope dsn: %v", err)
|
||||
}
|
||||
|
||||
cfg := pgshared.DefaultConfig()
|
||||
cfg.PrimaryDSN = scopedDSN
|
||||
cfg.OperationTimeout = pgOpTO
|
||||
|
||||
db, err := pgshared.OpenPrimary(ctx, cfg, backendpg.NoObservabilityOptions()...)
|
||||
if err != nil {
|
||||
t.Fatalf("open primary: %v", err)
|
||||
}
|
||||
t.Cleanup(func() { _ = db.Close() })
|
||||
|
||||
if err := pgshared.Ping(ctx, db, cfg.OperationTimeout); err != nil {
|
||||
t.Fatalf("ping: %v", err)
|
||||
}
|
||||
if err := backendpg.ApplyMigrations(ctx, db); err != nil {
|
||||
t.Fatalf("apply migrations: %v", err)
|
||||
}
|
||||
return db
|
||||
}
|
||||
|
||||
// dsnWithSearchPath returns baseDSN with its search_path query parameter set
// to schema, replacing any existing value. When the DSN carries no non-empty
// sslmode parameter, sslmode=disable is added. baseDSN must be a URL-style
// DSN; a parse failure is returned unchanged with an empty string.
func dsnWithSearchPath(baseDSN, schema string) (string, error) {
	dsn, parseErr := url.Parse(baseDSN)
	if parseErr != nil {
		return "", parseErr
	}

	params := dsn.Query()
	if sslmode := params.Get("sslmode"); sslmode == "" {
		params.Set("sslmode", "disable")
	}
	params.Set("search_path", schema)

	// Encode emits the parameters sorted by key, so the output is stable.
	dsn.RawQuery = params.Encode()
	return dsn.String(), nil
}
|
||||
|
||||
func TestStoreCollect(t *testing.T) {
|
||||
db := startPostgres(t)
|
||||
store := opsstatus.NewStore(db)
|
||||
ctx := context.Background()
|
||||
|
||||
// Empty schema: queries must execute cleanly with zero counts.
|
||||
empty := store.Collect(ctx)
|
||||
if !empty.PostgresHealthy {
|
||||
t.Fatal("PostgresHealthy must be true against a reachable database")
|
||||
}
|
||||
if len(empty.Errors) != 0 {
|
||||
t.Fatalf("unexpected collection errors: %v", empty.Errors)
|
||||
}
|
||||
if got := totalCount(empty.MailDeliveries); got != 0 {
|
||||
t.Fatalf("mail deliveries total = %d, want 0", got)
|
||||
}
|
||||
if len(empty.Runtimes) != 0 || len(empty.NotificationRoutes) != 0 {
|
||||
t.Fatalf("expected empty status slices, got runtimes=%v routes=%v", empty.Runtimes, empty.NotificationRoutes)
|
||||
}
|
||||
if empty.NotificationMalformed != 0 {
|
||||
t.Fatalf("malformed notifications = %d, want 0", empty.NotificationMalformed)
|
||||
}
|
||||
|
||||
// Enqueue one mail delivery and confirm the GROUP BY count reflects it.
|
||||
mailStore := mail.NewStore(db)
|
||||
inserted, err := mailStore.InsertEnqueue(ctx, mail.EnqueueArgs{
|
||||
DeliveryID: uuid.New(),
|
||||
TemplateID: mail.TemplateLoginCode,
|
||||
IdempotencyKey: uuid.NewString(),
|
||||
Recipients: []string{"ops@example.test"},
|
||||
ContentType: "text/plain",
|
||||
Subject: "hello",
|
||||
Body: []byte("hi"),
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("insert mail delivery: %v", err)
|
||||
}
|
||||
if !inserted {
|
||||
t.Fatal("expected the delivery to be inserted")
|
||||
}
|
||||
|
||||
after := store.Collect(ctx)
|
||||
if len(after.Errors) != 0 {
|
||||
t.Fatalf("unexpected collection errors after insert: %v", after.Errors)
|
||||
}
|
||||
if got := totalCount(after.MailDeliveries); got != 1 {
|
||||
t.Fatalf("mail deliveries total after insert = %d, want 1 (statuses: %v)", got, after.MailDeliveries)
|
||||
}
|
||||
}
|
||||
|
||||
func totalCount(counts []opsstatus.StatusCount) int64 {
|
||||
var total int64
|
||||
for _, c := range counts {
|
||||
total += c.Count
|
||||
}
|
||||
return total
|
||||
}
|
||||
Reference in New Issue
Block a user