feat: mail service
@@ -0,0 +1,148 @@
package worker

import (
	"context"
	"errors"
	"fmt"
	"log/slog"
	"sync"

	"galaxy/mail/internal/service/executeattempt"
)

// AttemptExecutionService executes one claimed in-progress attempt.
type AttemptExecutionService interface {
	// Execute runs one claimed attempt through provider execution and durable
	// state mutation.
	Execute(context.Context, executeattempt.WorkItem) error
}

// AttemptWorkerPoolConfig stores the dependencies used by AttemptWorkerPool.
type AttemptWorkerPoolConfig struct {
	// Concurrency stores how many workers run concurrently.
	Concurrency int

	// WorkQueue stores the claimed attempt handoff channel produced by the
	// scheduler.
	WorkQueue <-chan executeattempt.WorkItem

	// Service executes one claimed attempt.
	Service AttemptExecutionService
}

// AttemptWorkerPool executes claimed attempts concurrently.
type AttemptWorkerPool struct {
	concurrency int
	workQueue   <-chan executeattempt.WorkItem
	service     AttemptExecutionService
	logger      *slog.Logger
}

// NewAttemptWorkerPool constructs one attempt worker pool.
func NewAttemptWorkerPool(cfg AttemptWorkerPoolConfig, logger *slog.Logger) (*AttemptWorkerPool, error) {
	switch {
	case cfg.Concurrency <= 0:
		return nil, errors.New("new attempt worker pool: concurrency must be positive")
	case cfg.WorkQueue == nil:
		return nil, errors.New("new attempt worker pool: nil work queue")
	case cfg.Service == nil:
		return nil, errors.New("new attempt worker pool: nil attempt execution service")
	}
	if logger == nil {
		logger = slog.Default()
	}

	return &AttemptWorkerPool{
		concurrency: cfg.Concurrency,
		workQueue:   cfg.WorkQueue,
		service:     cfg.Service,
		logger:      logger.With("component", "attempt_worker_pool", "concurrency", cfg.Concurrency),
	}, nil
}

// Run starts the attempt worker pool and blocks until ctx is canceled or one
// worker returns an execution error.
func (pool *AttemptWorkerPool) Run(ctx context.Context) error {
	if ctx == nil {
		return errors.New("run attempt worker pool: nil context")
	}
	if err := ctx.Err(); err != nil {
		return err
	}
	if pool == nil {
		return errors.New("run attempt worker pool: nil pool")
	}

	pool.logger.Info("attempt worker pool started")
	defer pool.logger.Info("attempt worker pool stopped")

	runCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	errs := make(chan error, pool.concurrency)
	var waitGroup sync.WaitGroup

	for index := 0; index < pool.concurrency; index++ {
		waitGroup.Add(1)
		go func(workerIndex int) {
			defer waitGroup.Done()
			if err := pool.runWorker(runCtx, workerIndex); err != nil {
				errs <- err
			}
		}(index)
	}

	done := make(chan struct{})
	go func() {
		waitGroup.Wait()
		close(done)
	}()

	select {
	case <-ctx.Done():
		cancel()
		<-done
		return ctx.Err()
	case err := <-errs:
		cancel()
		<-done
		return err
	case <-done:
		if ctx.Err() != nil {
			return ctx.Err()
		}
		return errors.New("run attempt worker pool: workers exited without shutdown")
	}
}

func (pool *AttemptWorkerPool) runWorker(ctx context.Context, workerIndex int) error {
	pool.logger.Debug("attempt worker started", "worker_index", workerIndex)
	defer pool.logger.Debug("attempt worker stopped", "worker_index", workerIndex)

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case item, ok := <-pool.workQueue:
			if !ok {
				return nil
			}
			if err := pool.service.Execute(ctx, item); err != nil {
				return fmt.Errorf("attempt worker %d: %w", workerIndex, err)
			}
		}
	}
}

// Shutdown stops the attempt worker pool within ctx. The pool does not own
// additional resources beyond its run loop.
func (pool *AttemptWorkerPool) Shutdown(ctx context.Context) error {
	if ctx == nil {
		return errors.New("shutdown attempt worker pool: nil context")
	}
	if pool == nil {
		return nil
	}

	return nil
}
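Usage sketch (not part of the commit): how a caller is expected to wire the pool to the scheduler's handoff channel. The helper name, concurrency value, and channel capacity below are illustrative assumptions, and executionService stands for any AttemptExecutionService implementation.

// Sketch only: minimal wiring of the pool, assuming an executionService value
// that implements AttemptExecutionService. Not part of this change.
func runPoolExample(ctx context.Context, executionService AttemptExecutionService) error {
	workQueue := make(chan executeattempt.WorkItem, 1) // handoff channel shared with the scheduler
	pool, err := NewAttemptWorkerPool(AttemptWorkerPoolConfig{
		Concurrency: 4, // assumed value
		WorkQueue:   workQueue,
		Service:     executionService,
	}, slog.Default())
	if err != nil {
		return err
	}
	return pool.Run(ctx) // blocks until ctx is canceled or a worker fails
}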
@@ -0,0 +1,347 @@
package worker

import (
	"context"
	"errors"
	"io"
	"log/slog"
	"sync"
	"testing"
	"time"

	"galaxy/mail/internal/adapters/redisstate"
	"galaxy/mail/internal/adapters/stubprovider"
	"galaxy/mail/internal/domain/attempt"
	"galaxy/mail/internal/domain/common"
	deliverydomain "galaxy/mail/internal/domain/delivery"
	"galaxy/mail/internal/ports"
	"galaxy/mail/internal/service/executeattempt"
	"galaxy/mail/internal/service/renderdelivery"

	"github.com/alicebob/miniredis/v2"
	"github.com/redis/go-redis/v9"
	"github.com/stretchr/testify/require"
)

func TestAttemptWorkersSendImmediateFirstAttempt(t *testing.T) {
	t.Parallel()

	fixture := newAttemptWorkerFixture(t, nil)
	createAcceptedRenderedDelivery(t, fixture.client, common.DeliveryID("delivery-immediate"), fixture.clock.Now())

	cancel, wait := fixture.run(t)
	defer func() {
		cancel()
		wait()
	}()

	require.Eventually(t, func() bool {
		deliveryRecord := loadDeliveryRecord(t, fixture.client, common.DeliveryID("delivery-immediate"))
		return deliveryRecord.Status == deliverydomain.StatusSent
	}, 5*time.Second, 20*time.Millisecond)

	require.Len(t, fixture.provider.Inputs(), 1)
}

func TestAttemptWorkersRetryTransientFailuresUntilSuccess(t *testing.T) {
	t.Parallel()

	fixture := newAttemptWorkerFixture(t, []stubprovider.ScriptedOutcome{
		{
			Classification: ports.ClassificationTransientFailure,
			Script:         "retry_1",
		},
		{
			Classification: ports.ClassificationTransientFailure,
			Script:         "retry_2",
		},
		{
			Classification: ports.ClassificationAccepted,
			Script:         "accepted",
		},
	})
	createAcceptedRenderedDelivery(t, fixture.client, common.DeliveryID("delivery-retry-success"), fixture.clock.Now())

	cancel, wait := fixture.run(t)
	defer func() {
		cancel()
		wait()
	}()

	require.Eventually(t, func() bool {
		deliveryRecord := loadDeliveryRecord(t, fixture.client, common.DeliveryID("delivery-retry-success"))
		return deliveryRecord.AttemptCount == 2 && deliveryRecord.Status == deliverydomain.StatusQueued
	}, 5*time.Second, 20*time.Millisecond)

	fixture.clock.Advance(time.Minute)

	require.Eventually(t, func() bool {
		deliveryRecord := loadDeliveryRecord(t, fixture.client, common.DeliveryID("delivery-retry-success"))
		return deliveryRecord.AttemptCount == 3 && deliveryRecord.Status == deliverydomain.StatusQueued
	}, 5*time.Second, 20*time.Millisecond)

	fixture.clock.Advance(5 * time.Minute)

	require.Eventually(t, func() bool {
		deliveryRecord := loadDeliveryRecord(t, fixture.client, common.DeliveryID("delivery-retry-success"))
		return deliveryRecord.Status == deliverydomain.StatusSent
	}, 5*time.Second, 20*time.Millisecond)

	require.Len(t, fixture.provider.Inputs(), 3)
}

func TestAttemptWorkersDeadLetterAfterRetryExhaustion(t *testing.T) {
	t.Parallel()

	fixture := newAttemptWorkerFixture(t, []stubprovider.ScriptedOutcome{
		{Classification: ports.ClassificationTransientFailure, Script: "retry_1"},
		{Classification: ports.ClassificationTransientFailure, Script: "retry_2"},
		{Classification: ports.ClassificationTransientFailure, Script: "retry_3"},
		{Classification: ports.ClassificationTransientFailure, Script: "retry_4"},
	})
	deliveryID := common.DeliveryID("delivery-dead-letter")
	createAcceptedRenderedDelivery(t, fixture.client, deliveryID, fixture.clock.Now())

	cancel, wait := fixture.run(t)
	defer func() {
		cancel()
		wait()
	}()

	require.Eventually(t, func() bool {
		return loadDeliveryRecord(t, fixture.client, deliveryID).AttemptCount == 2
	}, 5*time.Second, 20*time.Millisecond)

	fixture.clock.Advance(time.Minute)
	require.Eventually(t, func() bool {
		return loadDeliveryRecord(t, fixture.client, deliveryID).AttemptCount == 3
	}, 5*time.Second, 20*time.Millisecond)

	fixture.clock.Advance(5 * time.Minute)
	require.Eventually(t, func() bool {
		return loadDeliveryRecord(t, fixture.client, deliveryID).AttemptCount == 4
	}, 5*time.Second, 20*time.Millisecond)

	fixture.clock.Advance(30 * time.Minute)
	require.Eventually(t, func() bool {
		return loadDeliveryRecord(t, fixture.client, deliveryID).Status == deliverydomain.StatusDeadLetter
	}, 5*time.Second, 20*time.Millisecond)

	deadLetter := loadDeadLetterRecord(t, fixture.client, deliveryID)
	require.Equal(t, "retry_exhausted", deadLetter.FailureClassification)
	require.Len(t, fixture.provider.Inputs(), 4)
}

func TestAttemptWorkersRecoverExpiredClaimAfterCrash(t *testing.T) {
	t.Parallel()

	fixture := newAttemptWorkerFixture(t, []stubprovider.ScriptedOutcome{
		{Classification: ports.ClassificationAccepted, Script: "accepted"},
	})
	deliveryID := common.DeliveryID("delivery-recovered")
	createAcceptedRenderedDelivery(t, fixture.client, deliveryID, fixture.clock.Now())

	claimed, found, err := fixture.store.ClaimDueAttempt(context.Background(), deliveryID, fixture.clock.Now())
	require.NoError(t, err)
	require.True(t, found)
	require.Equal(t, deliverydomain.StatusSending, claimed.Delivery.Status)

	fixture.clock.Advance(20 * time.Millisecond)

	cancel, wait := fixture.run(t)
	defer func() {
		cancel()
		wait()
	}()

	require.Eventually(t, func() bool {
		deliveryRecord := loadDeliveryRecord(t, fixture.client, deliveryID)
		return deliveryRecord.Status == deliverydomain.StatusQueued && deliveryRecord.AttemptCount == 2
	}, 5*time.Second, 20*time.Millisecond)

	fixture.clock.Advance(time.Minute)

	require.Eventually(t, func() bool {
		deliveryRecord := loadDeliveryRecord(t, fixture.client, deliveryID)
		return deliveryRecord.Status == deliverydomain.StatusSent
	}, 5*time.Second, 20*time.Millisecond)

	require.Len(t, fixture.provider.Inputs(), 1)
}

type attemptWorkerFixture struct {
	client    *redis.Client
	store     *redisstate.AttemptExecutionStore
	service   *executeattempt.Service
	scheduler *Scheduler
	pool      *AttemptWorkerPool
	provider  *stubprovider.Provider
	clock     *schedulerTestClock
}

func newAttemptWorkerFixture(t *testing.T, scripted []stubprovider.ScriptedOutcome) attemptWorkerFixture {
	t.Helper()

	server := miniredis.RunT(t)
	client := redis.NewClient(&redis.Options{Addr: server.Addr()})
	t.Cleanup(func() { require.NoError(t, client.Close()) })

	store, err := redisstate.NewAttemptExecutionStore(client)
	require.NoError(t, err)

	provider, err := stubprovider.New(scripted...)
	require.NoError(t, err)
	t.Cleanup(func() { require.NoError(t, provider.Close()) })

	clock := &schedulerTestClock{now: time.Unix(1_775_121_700, 0).UTC()}
	workQueue := make(chan executeattempt.WorkItem, 1)

	service, err := executeattempt.New(executeattempt.Config{
		Renderer:       noopRenderer{},
		Provider:       provider,
		PayloadLoader:  store,
		Store:          store,
		Clock:          clock,
		AttemptTimeout: 5 * time.Millisecond,
	})
	require.NoError(t, err)

	scheduler, err := NewScheduler(SchedulerConfig{
		Store:            store,
		Service:          service,
		WorkQueue:        workQueue,
		Clock:            clock,
		AttemptTimeout:   5 * time.Millisecond,
		PollInterval:     10 * time.Millisecond,
		RecoveryInterval: 10 * time.Millisecond,
		RecoveryGrace:    5 * time.Millisecond,
	}, testWorkerLogger())
	require.NoError(t, err)

	pool, err := NewAttemptWorkerPool(AttemptWorkerPoolConfig{
		Concurrency: 1,
		WorkQueue:   workQueue,
		Service:     service,
	}, testWorkerLogger())
	require.NoError(t, err)

	return attemptWorkerFixture{
		client:    client,
		store:     store,
		service:   service,
		scheduler: scheduler,
		pool:      pool,
		provider:  provider,
		clock:     clock,
	}
}

func (fixture attemptWorkerFixture) run(t *testing.T) (context.CancelFunc, func()) {
	t.Helper()

	ctx, cancel := context.WithCancel(context.Background())
	schedulerDone := make(chan error, 1)
	poolDone := make(chan error, 1)

	go func() {
		schedulerDone <- fixture.scheduler.Run(ctx)
	}()
	go func() {
		poolDone <- fixture.pool.Run(ctx)
	}()

	wait := func() {
		require.ErrorIs(t, <-schedulerDone, context.Canceled)
		require.ErrorIs(t, <-poolDone, context.Canceled)
	}

	return cancel, wait
}

type schedulerTestClock struct {
	mu  sync.Mutex
	now time.Time
}

func (clock *schedulerTestClock) Now() time.Time {
	clock.mu.Lock()
	defer clock.mu.Unlock()
	return clock.now
}

func (clock *schedulerTestClock) Advance(delta time.Duration) {
	clock.mu.Lock()
	defer clock.mu.Unlock()
	clock.now = clock.now.Add(delta)
}

type noopRenderer struct{}

func (noopRenderer) Execute(context.Context, renderdelivery.Input) (renderdelivery.Result, error) {
	return renderdelivery.Result{}, errors.New("unexpected render invocation")
}

func createAcceptedRenderedDelivery(t *testing.T, client *redis.Client, deliveryID common.DeliveryID, createdAt time.Time) {
	t.Helper()

	writer, err := redisstate.NewAtomicWriter(client)
	require.NoError(t, err)

	deliveryRecord := deliverydomain.Delivery{
		DeliveryID:  deliveryID,
		Source:      deliverydomain.SourceNotification,
		PayloadMode: deliverydomain.PayloadModeRendered,
		Envelope: deliverydomain.Envelope{
			To: []common.Email{common.Email("pilot@example.com")},
		},
		Content: deliverydomain.Content{
			Subject:  "Turn ready",
			TextBody: "Turn 54 is ready.",
		},
		IdempotencyKey: common.IdempotencyKey("notification:" + deliveryID.String()),
		Status:         deliverydomain.StatusQueued,
		AttemptCount:   1,
		CreatedAt:      createdAt.UTC().Truncate(time.Millisecond),
		UpdatedAt:      createdAt.UTC().Truncate(time.Millisecond),
	}
	require.NoError(t, deliveryRecord.Validate())

	firstAttempt := attempt.Attempt{
		DeliveryID:   deliveryID,
		AttemptNo:    1,
		ScheduledFor: createdAt.UTC().Truncate(time.Millisecond),
		Status:       attempt.StatusScheduled,
	}
	require.NoError(t, firstAttempt.Validate())

	require.NoError(t, writer.CreateAcceptance(context.Background(), redisstate.CreateAcceptanceInput{
		Delivery:     deliveryRecord,
		FirstAttempt: &firstAttempt,
	}))
}

func loadDeliveryRecord(t *testing.T, client *redis.Client, deliveryID common.DeliveryID) deliverydomain.Delivery {
	t.Helper()

	payload, err := client.Get(context.Background(), redisstate.Keyspace{}.Delivery(deliveryID)).Bytes()
	require.NoError(t, err)
	record, err := redisstate.UnmarshalDelivery(payload)
	require.NoError(t, err)

	return record
}

func loadDeadLetterRecord(t *testing.T, client *redis.Client, deliveryID common.DeliveryID) deliverydomain.DeadLetterEntry {
	t.Helper()

	payload, err := client.Get(context.Background(), redisstate.Keyspace{}.DeadLetter(deliveryID)).Bytes()
	require.NoError(t, err)
	record, err := redisstate.UnmarshalDeadLetter(payload)
	require.NoError(t, err)

	return record
}

func testWorkerLogger() *slog.Logger {
	return slog.New(slog.NewJSONHandler(io.Discard, nil))
}
@@ -0,0 +1,73 @@
package worker

import (
	"context"
	"errors"
	"log/slog"
	"time"

	"galaxy/mail/internal/adapters/redisstate"
)

const cleanupInterval = time.Hour

// CleanupWorker stores the idle index cleanup worker used by the Stage 6
// runtime skeleton.
type CleanupWorker struct {
	cleaner *redisstate.IndexCleaner
	logger  *slog.Logger
}

// NewCleanupWorker constructs the idle Stage 6 cleanup worker.
func NewCleanupWorker(cleaner *redisstate.IndexCleaner, logger *slog.Logger) (*CleanupWorker, error) {
	if cleaner == nil {
		return nil, errors.New("new cleanup worker: nil index cleaner")
	}
	if logger == nil {
		logger = slog.Default()
	}

	return &CleanupWorker{
		cleaner: cleaner,
		logger:  logger.With("component", "cleanup_worker"),
	}, nil
}

// Run starts the idle cleanup worker and blocks until ctx is canceled.
func (worker *CleanupWorker) Run(ctx context.Context) error {
	if ctx == nil {
		return errors.New("run cleanup worker: nil context")
	}
	if err := ctx.Err(); err != nil {
		return err
	}
	if worker == nil || worker.cleaner == nil {
		return errors.New("run cleanup worker: nil cleanup worker")
	}

	worker.logger.Info("cleanup worker started", "interval", cleanupInterval.String())
	ticker := time.NewTicker(cleanupInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			worker.logger.Info("cleanup worker stopped")
			return ctx.Err()
		case <-ticker.C:
		}
	}
}

// Shutdown stops the cleanup worker within ctx. The Stage 6 skeleton has no
// additional resources to release.
func (worker *CleanupWorker) Shutdown(ctx context.Context) error {
	if ctx == nil {
		return errors.New("shutdown cleanup worker: nil context")
	}
	if worker == nil {
		return nil
	}

	return nil
}
@@ -0,0 +1,326 @@
// Package worker provides the long-lived background components used by the
// runnable Mail Service process.
package worker

import (
	"context"
	"errors"
	"fmt"
	"log/slog"
	"strings"
	"sync"
	"time"

	"galaxy/mail/internal/api/streamcommand"
	"galaxy/mail/internal/domain/malformedcommand"
	"galaxy/mail/internal/logging"
	"galaxy/mail/internal/service/acceptgenericdelivery"

	"github.com/redis/go-redis/v9"
)

// AcceptGenericDeliveryUseCase accepts one generic asynchronous delivery
// command.
type AcceptGenericDeliveryUseCase interface {
	// Execute durably accepts one normalized generic-delivery command.
	Execute(context.Context, streamcommand.Command) (acceptgenericdelivery.Result, error)
}

// MalformedCommandRecorder stores one operator-visible malformed async command
// record.
type MalformedCommandRecorder interface {
	// Record persists entry idempotently by stream entry id.
	Record(context.Context, malformedcommand.Entry) error
}

// StreamOffsetStore stores the last durably processed entry id of one plain
// XREAD consumer.
type StreamOffsetStore interface {
	// Load returns the last processed entry id for stream when one is stored.
	Load(context.Context, string) (string, bool, error)

	// Save stores the last processed entry id for stream.
	Save(context.Context, string, string) error
}

// CommandConsumerTelemetry records low-cardinality stream-consumer events.
type CommandConsumerTelemetry interface {
	// RecordMalformedCommand records one malformed or rejected async stream
	// command.
	RecordMalformedCommand(context.Context, string)
}

// Clock provides the current wall-clock time.
type Clock interface {
	// Now returns the current time.
	Now() time.Time
}

type systemClock struct{}

func (systemClock) Now() time.Time {
	return time.Now()
}

// CommandConsumerConfig stores the dependencies used by CommandConsumer.
type CommandConsumerConfig struct {
	// Client stores the Redis client used for XREAD.
	Client *redis.Client

	// Stream stores the Redis Stream name to consume.
	Stream string

	// BlockTimeout stores the blocking XREAD timeout.
	BlockTimeout time.Duration

	// Acceptor durably accepts valid generic-delivery commands.
	Acceptor AcceptGenericDeliveryUseCase

	// MalformedRecorder persists operator-visible malformed-command entries.
	MalformedRecorder MalformedCommandRecorder

	// OffsetStore stores the last durably processed stream entry id.
	OffsetStore StreamOffsetStore

	// Telemetry records malformed-command counters.
	Telemetry CommandConsumerTelemetry

	// Clock provides wall-clock timestamps for malformed-command records.
	Clock Clock
}

// CommandConsumer stores the Redis Streams consumer used for generic
// asynchronous delivery intake.
type CommandConsumer struct {
	client            *redis.Client
	stream            string
	blockTimeout      time.Duration
	acceptor          AcceptGenericDeliveryUseCase
	malformedRecorder MalformedCommandRecorder
	offsetStore       StreamOffsetStore
	telemetry         CommandConsumerTelemetry
	clock             Clock
	logger            *slog.Logger
	closeOnce         sync.Once
}

// NewCommandConsumer constructs the generic-delivery command consumer.
func NewCommandConsumer(cfg CommandConsumerConfig, logger *slog.Logger) (*CommandConsumer, error) {
	switch {
	case cfg.Client == nil:
		return nil, errors.New("new command consumer: nil redis client")
	case strings.TrimSpace(cfg.Stream) == "":
		return nil, errors.New("new command consumer: stream must not be empty")
	case cfg.BlockTimeout <= 0:
		return nil, errors.New("new command consumer: block timeout must be positive")
	case cfg.Acceptor == nil:
		return nil, errors.New("new command consumer: nil acceptor")
	case cfg.MalformedRecorder == nil:
		return nil, errors.New("new command consumer: nil malformed recorder")
	case cfg.OffsetStore == nil:
		return nil, errors.New("new command consumer: nil offset store")
	}
	if cfg.Clock == nil {
		cfg.Clock = systemClock{}
	}
	if logger == nil {
		logger = slog.Default()
	}

	return &CommandConsumer{
		client:            cfg.Client,
		stream:            cfg.Stream,
		blockTimeout:      cfg.BlockTimeout,
		acceptor:          cfg.Acceptor,
		malformedRecorder: cfg.MalformedRecorder,
		offsetStore:       cfg.OffsetStore,
		telemetry:         cfg.Telemetry,
		clock:             cfg.Clock,
		logger:            logger.With("component", "command_consumer", "stream", cfg.Stream),
	}, nil
}

// Run starts the command consumer and blocks until ctx is canceled or Redis
// returns an unexpected error.
func (consumer *CommandConsumer) Run(ctx context.Context) error {
	if ctx == nil {
		return errors.New("run command consumer: nil context")
	}
	if err := ctx.Err(); err != nil {
		return err
	}
	if consumer == nil || consumer.client == nil {
		return errors.New("run command consumer: nil consumer")
	}

	lastID, found, err := consumer.offsetStore.Load(ctx, consumer.stream)
	if err != nil {
		return fmt.Errorf("run command consumer: load stream offset: %w", err)
	}
	if !found {
		lastID = "0-0"
	}

	consumer.logger.Info("command consumer started", "block_timeout", consumer.blockTimeout.String(), "start_entry_id", lastID)

	for {
		streams, err := consumer.client.XRead(ctx, &redis.XReadArgs{
			Streams: []string{consumer.stream, lastID},
			Count:   1,
			Block:   consumer.blockTimeout,
		}).Result()
		switch {
		case err == nil:
			for _, stream := range streams {
				for _, message := range stream.Messages {
					if err := consumer.handleMessage(ctx, message); err != nil {
						return err
					}
					if err := consumer.offsetStore.Save(ctx, consumer.stream, message.ID); err != nil {
						return fmt.Errorf("run command consumer: save stream offset: %w", err)
					}
					lastID = message.ID
				}
			}
		case errors.Is(err, redis.Nil):
			continue
		case ctx.Err() != nil && (errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) || errors.Is(err, redis.ErrClosed)):
			consumer.logger.Info("command consumer stopped")
			return ctx.Err()
		case errors.Is(err, context.Canceled), errors.Is(err, context.DeadlineExceeded), errors.Is(err, redis.ErrClosed):
			return fmt.Errorf("run command consumer: %w", err)
		default:
			return fmt.Errorf("run command consumer: %w", err)
		}
	}
}

func (consumer *CommandConsumer) handleMessage(ctx context.Context, message redis.XMessage) error {
	rawFields := cloneRawFields(message.Values)

	command, err := streamcommand.DecodeCommand(rawFields)
	if err != nil {
		return consumer.recordMalformed(ctx, message.ID, rawFields, streamcommand.ClassifyDecodeError(err), err)
	}

	result, err := consumer.acceptor.Execute(ctx, command)
	switch {
	case err == nil:
		logArgs := logging.CommandAttrs(command)
		logArgs = append(logArgs,
			"stream_entry_id", message.ID,
			"outcome", string(result.Outcome),
		)
		logArgs = append(logArgs, logging.TraceAttrsFromContext(ctx)...)
		consumer.logger.Info("generic command accepted", logArgs...)
		return nil
	case errors.Is(err, acceptgenericdelivery.ErrConflict):
		return consumer.recordMalformed(ctx, message.ID, rawFields, malformedcommand.FailureCodeIdempotencyConflict, err)
	case errors.Is(err, acceptgenericdelivery.ErrServiceUnavailable):
		return fmt.Errorf("handle command %q: %w", message.ID, err)
	default:
		return fmt.Errorf("handle command %q: %w", message.ID, err)
	}
}

func (consumer *CommandConsumer) recordMalformed(
	ctx context.Context,
	streamEntryID string,
	rawFields map[string]any,
	failureCode malformedcommand.FailureCode,
	cause error,
) error {
	entry := malformedcommand.Entry{
		StreamEntryID:  streamEntryID,
		DeliveryID:     optionalRawString(rawFields, "delivery_id"),
		Source:         optionalRawString(rawFields, "source"),
		IdempotencyKey: optionalRawString(rawFields, "idempotency_key"),
		FailureCode:    failureCode,
		FailureMessage: strings.TrimSpace(cause.Error()),
		RawFields:      cloneRawFields(rawFields),
		RecordedAt:     consumer.clock.Now().UTC().Truncate(time.Millisecond),
	}
	if err := consumer.malformedRecorder.Record(ctx, entry); err != nil {
		return fmt.Errorf("record malformed command %q: %w", streamEntryID, err)
	}
	if consumer.telemetry != nil {
		consumer.telemetry.RecordMalformedCommand(ctx, string(failureCode))
	}

	consumer.logger.Warn("stream command rejected",
		append([]any{
			"stream_entry_id", streamEntryID,
			"delivery_id", entry.DeliveryID,
			"source", entry.Source,
			"idempotency_key", entry.IdempotencyKey,
			"trace_id", optionalRawString(rawFields, "trace_id"),
			"failure_code", string(entry.FailureCode),
			"failure_message", entry.FailureMessage,
		}, logging.TraceAttrsFromContext(ctx)...)...,
	)

	return nil
}

func cloneRawFields(values map[string]any) map[string]any {
	if values == nil {
		return map[string]any{}
	}

	cloned := make(map[string]any, len(values))
	for key, value := range values {
		cloned[key] = cloneRawValue(value)
	}

	return cloned
}

func cloneRawValue(value any) any {
	switch typed := value.(type) {
	case map[string]any:
		return cloneRawFields(typed)
	case []any:
		cloned := make([]any, len(typed))
		for index, item := range typed {
			cloned[index] = cloneRawValue(item)
		}
		return cloned
	default:
		return typed
	}
}

func optionalRawString(values map[string]any, key string) string {
	raw, ok := values[key]
	if !ok {
		return ""
	}

	value, ok := raw.(string)
	if !ok {
		return ""
	}

	return value
}

// Shutdown stops the command consumer within ctx. The consumer uses the
// shared process Redis client and therefore has no dedicated resources to
// release here.
func (consumer *CommandConsumer) Shutdown(ctx context.Context) error {
	if ctx == nil {
		return errors.New("shutdown command consumer: nil context")
	}
	if consumer == nil {
		return nil
	}

	var err error
	consumer.closeOnce.Do(func() {
		if consumer.client != nil {
			err = consumer.client.Close()
		}
	})

	return err
}
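Producer-side sketch (not part of the commit): the entry shape the consumer expects, with field names taken from the test helpers later in this commit. The payload values are illustrative; the authoritative schema is whatever streamcommand.DecodeCommand accepts, and the helper name below is hypothetical.

// Sketch only: enqueue one rendered-delivery command for the consumer to read.
func enqueueExample(ctx context.Context, client *redis.Client) (string, error) {
	return client.XAdd(ctx, &redis.XAddArgs{
		Stream: redisstate.Keyspace{}.DeliveryCommands(), // same stream the consumer is configured with
		Values: map[string]any{
			"delivery_id":     "mail-1",
			"source":          "notification",
			"payload_mode":    "rendered",
			"idempotency_key": "notification:mail-1",
			"requested_at_ms": "1775121700000",
			"payload_json":    `{"to":["pilot@example.com"],"cc":[],"bcc":[],"reply_to":[],"subject":"Turn ready","text_body":"Turn 54 is ready.","html_body":"","attachments":[]}`,
		},
	}).Result()
}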
@@ -0,0 +1,391 @@
package worker

import (
	"context"
	"errors"
	"io"
	"log/slog"
	"testing"
	"time"

	"galaxy/mail/internal/adapters/redisstate"
	"galaxy/mail/internal/service/acceptgenericdelivery"

	"github.com/alicebob/miniredis/v2"
	"github.com/redis/go-redis/v9"
	"github.com/stretchr/testify/require"
)

func TestCommandConsumerAcceptsRenderedCommand(t *testing.T) {
	t.Parallel()

	fixture := newCommandConsumerFixture(t)
	messageID := addRenderedCommand(t, fixture.client, "mail-123", "notification:mail-123")

	ctx, cancel := context.WithCancel(context.Background())
	done := make(chan error, 1)
	go func() {
		done <- fixture.consumer.Run(ctx)
	}()

	require.Eventually(t, func() bool {
		delivery, found, err := fixture.acceptanceStore.GetDelivery(context.Background(), "mail-123")
		if err != nil || !found {
			return false
		}
		entryID, found, err := fixture.offsetStore.Load(context.Background(), fixture.stream)
		return err == nil && found && entryID == messageID && delivery.DeliveryID == "mail-123"
	}, 5*time.Second, 20*time.Millisecond)

	cancel()
	require.ErrorIs(t, <-done, context.Canceled)
}

func TestCommandConsumerAcceptsTemplateCommand(t *testing.T) {
	t.Parallel()

	fixture := newCommandConsumerFixture(t)
	messageID := addTemplateCommand(t, fixture.client, "mail-124", "notification:mail-124")

	ctx, cancel := context.WithCancel(context.Background())
	done := make(chan error, 1)
	go func() {
		done <- fixture.consumer.Run(ctx)
	}()

	require.Eventually(t, func() bool {
		delivery, found, err := fixture.acceptanceStore.GetDelivery(context.Background(), "mail-124")
		if err != nil || !found {
			return false
		}
		entryID, found, err := fixture.offsetStore.Load(context.Background(), fixture.stream)
		return err == nil && found && entryID == messageID && delivery.TemplateID == "game.turn_ready"
	}, 5*time.Second, 20*time.Millisecond)

	cancel()
	require.ErrorIs(t, <-done, context.Canceled)
}

func TestCommandConsumerRecordsMalformedCommandAndContinues(t *testing.T) {
	t.Parallel()

	fixture := newCommandConsumerFixture(t)
	malformedID := addMalformedRenderedCommand(t, fixture.client, "mail-bad", "notification:mail-bad")
	validID := addRenderedCommand(t, fixture.client, "mail-125", "notification:mail-125")

	ctx, cancel := context.WithCancel(context.Background())
	done := make(chan error, 1)
	go func() {
		done <- fixture.consumer.Run(ctx)
	}()

	require.Eventually(t, func() bool {
		_, deliveryFound, deliveryErr := fixture.acceptanceStore.GetDelivery(context.Background(), "mail-125")
		entry, malformedFound, malformedErr := fixture.malformedStore.Get(context.Background(), malformedID)
		entryID, offsetFound, offsetErr := fixture.offsetStore.Load(context.Background(), fixture.stream)
		return deliveryErr == nil &&
			malformedErr == nil &&
			offsetErr == nil &&
			deliveryFound &&
			malformedFound &&
			entry.FailureCode == "invalid_payload" &&
			offsetFound &&
			entryID == validID
	}, 5*time.Second, 20*time.Millisecond)

	cancel()
	require.ErrorIs(t, <-done, context.Canceled)
}

func TestCommandConsumerRestartsFromSavedOffset(t *testing.T) {
	t.Parallel()

	fixture := newCommandConsumerFixture(t)
	firstID := addRenderedCommand(t, fixture.client, "mail-126", "notification:mail-126")

	firstCtx, firstCancel := context.WithCancel(context.Background())
	firstDone := make(chan error, 1)
	go func() {
		firstDone <- fixture.consumer.Run(firstCtx)
	}()

	require.Eventually(t, func() bool {
		entryID, found, err := fixture.offsetStore.Load(context.Background(), fixture.stream)
		return err == nil && found && entryID == firstID
	}, 5*time.Second, 20*time.Millisecond)

	firstCancel()
	require.ErrorIs(t, <-firstDone, context.Canceled)

	secondID := addRenderedCommand(t, fixture.client, "mail-127", "notification:mail-127")

	secondCtx, secondCancel := context.WithCancel(context.Background())
	secondDone := make(chan error, 1)
	go func() {
		secondDone <- fixture.consumer.Run(secondCtx)
	}()

	require.Eventually(t, func() bool {
		_, firstFound, firstErr := fixture.acceptanceStore.GetDelivery(context.Background(), "mail-126")
		_, secondFound, secondErr := fixture.acceptanceStore.GetDelivery(context.Background(), "mail-127")
		entryID, offsetFound, offsetErr := fixture.offsetStore.Load(context.Background(), fixture.stream)
		return firstErr == nil &&
			secondErr == nil &&
			offsetErr == nil &&
			firstFound &&
			secondFound &&
			offsetFound &&
			entryID == secondID
	}, 5*time.Second, 20*time.Millisecond)

	secondCancel()
	require.ErrorIs(t, <-secondDone, context.Canceled)
}

func TestCommandConsumerDoesNotDuplicateAcceptanceAfterOffsetSaveFailure(t *testing.T) {
	t.Parallel()

	fixture := newCommandConsumerFixture(t)
	messageID := addRenderedCommand(t, fixture.client, "mail-128", "notification:mail-128")
	failingOffsetStore := &scriptedOffsetStore{
		saveErrs: []error{errors.New("offset unavailable")},
	}
	consumer := newCommandConsumerForTest(t, fixture.client, fixture.stream, fixture.acceptor, fixture.malformedStore, failingOffsetStore)

	err := consumer.Run(context.Background())
	require.Error(t, err)
	require.ErrorContains(t, err, "save stream offset")

	delivery, found, err := fixture.acceptanceStore.GetDelivery(context.Background(), "mail-128")
	require.NoError(t, err)
	require.True(t, found)
	require.Equal(t, "mail-128", delivery.DeliveryID.String())

	indexCard, err := fixture.client.ZCard(context.Background(), redisstate.Keyspace{}.CreatedAtIndex()).Result()
	require.NoError(t, err)
	require.EqualValues(t, 1, indexCard)

	replayConsumer := newCommandConsumerForTest(t, fixture.client, fixture.stream, fixture.acceptor, fixture.malformedStore, failingOffsetStore)
	replayCtx, replayCancel := context.WithCancel(context.Background())
	replayDone := make(chan error, 1)
	go func() {
		replayDone <- replayConsumer.Run(replayCtx)
	}()

	require.Eventually(t, func() bool {
		return failingOffsetStore.lastEntryID == messageID
	}, 5*time.Second, 20*time.Millisecond)

	replayCancel()
	require.ErrorIs(t, <-replayDone, context.Canceled)

	indexCard, err = fixture.client.ZCard(context.Background(), redisstate.Keyspace{}.CreatedAtIndex()).Result()
	require.NoError(t, err)
	require.EqualValues(t, 1, indexCard)

	scheduleCard, err := fixture.client.ZCard(context.Background(), redisstate.Keyspace{}.AttemptSchedule()).Result()
	require.NoError(t, err)
	require.EqualValues(t, 1, scheduleCard)
}

func TestCommandConsumerRecordsIdempotencyConflictAsMalformed(t *testing.T) {
	t.Parallel()

	fixture := newCommandConsumerFixture(t)
	addRenderedCommand(t, fixture.client, "mail-129", "notification:shared")
	conflictID := addRenderedCommandWithSubject(t, fixture.client, "mail-130", "notification:shared", "Different subject")

	ctx, cancel := context.WithCancel(context.Background())
	done := make(chan error, 1)
	go func() {
		done <- fixture.consumer.Run(ctx)
	}()

	require.Eventually(t, func() bool {
		_, firstFound, firstErr := fixture.acceptanceStore.GetDelivery(context.Background(), "mail-129")
		_, secondFound, secondErr := fixture.acceptanceStore.GetDelivery(context.Background(), "mail-130")
		entry, malformedFound, malformedErr := fixture.malformedStore.Get(context.Background(), conflictID)
		return firstErr == nil &&
			secondErr == nil &&
			malformedErr == nil &&
			firstFound &&
			!secondFound &&
			malformedFound &&
			entry.FailureCode == "idempotency_conflict"
	}, 5*time.Second, 20*time.Millisecond)

	cancel()
	require.ErrorIs(t, <-done, context.Canceled)
}

type commandConsumerFixture struct {
	client          *redis.Client
	stream          string
	consumer        *CommandConsumer
	acceptor        *acceptgenericdelivery.Service
	acceptanceStore *redisstate.GenericAcceptanceStore
	malformedStore  *redisstate.MalformedCommandStore
	offsetStore     *redisstate.StreamOffsetStore
}

func newCommandConsumerFixture(t *testing.T) commandConsumerFixture {
	t.Helper()

	server := miniredis.RunT(t)
	client := redis.NewClient(&redis.Options{Addr: server.Addr()})
	t.Cleanup(func() { require.NoError(t, client.Close()) })

	acceptanceStore, err := redisstate.NewGenericAcceptanceStore(client)
	require.NoError(t, err)
	now := time.Now().UTC().Truncate(time.Millisecond)
	acceptor, err := acceptgenericdelivery.New(acceptgenericdelivery.Config{
		Store:          acceptanceStore,
		Clock:          testClock{now: now},
		IdempotencyTTL: redisstate.IdempotencyTTL,
	})
	require.NoError(t, err)

	malformedStore, err := redisstate.NewMalformedCommandStore(client)
	require.NoError(t, err)
	offsetStore, err := redisstate.NewStreamOffsetStore(client)
	require.NoError(t, err)

	stream := redisstate.Keyspace{}.DeliveryCommands()
	consumer := newCommandConsumerForTest(t, client, stream, acceptor, malformedStore, offsetStore)

	return commandConsumerFixture{
		client:          client,
		stream:          stream,
		consumer:        consumer,
		acceptor:        acceptor,
		acceptanceStore: acceptanceStore,
		malformedStore:  malformedStore,
		offsetStore:     offsetStore,
	}
}

func newCommandConsumerForTest(
	t *testing.T,
	client *redis.Client,
	stream string,
	acceptor AcceptGenericDeliveryUseCase,
	malformedRecorder MalformedCommandRecorder,
	offsetStore StreamOffsetStore,
) *CommandConsumer {
	t.Helper()

	consumer, err := NewCommandConsumer(CommandConsumerConfig{
		Client:            client,
		Stream:            stream,
		BlockTimeout:      20 * time.Millisecond,
		Acceptor:          acceptor,
		MalformedRecorder: malformedRecorder,
		OffsetStore:       offsetStore,
		Clock:             testClock{now: time.Now().UTC().Truncate(time.Millisecond)},
	}, testLogger())
	require.NoError(t, err)

	return consumer
}

func addRenderedCommand(t *testing.T, client *redis.Client, deliveryID string, idempotencyKey string) string {
	t.Helper()

	return addRenderedCommandWithSubject(t, client, deliveryID, idempotencyKey, "Turn ready")
}

func addRenderedCommandWithSubject(t *testing.T, client *redis.Client, deliveryID string, idempotencyKey string, subject string) string {
	t.Helper()

	messageID, err := client.XAdd(context.Background(), &redis.XAddArgs{
		Stream: redisstate.Keyspace{}.DeliveryCommands(),
		Values: map[string]any{
			"delivery_id":     deliveryID,
			"source":          "notification",
			"payload_mode":    "rendered",
			"idempotency_key": idempotencyKey,
			"requested_at_ms": "1775121700000",
			"payload_json":    `{"to":["pilot@example.com"],"cc":[],"bcc":[],"reply_to":["noreply@example.com"],"subject":"` + subject + `","text_body":"Turn 54 is ready.","html_body":"<p>Turn 54 is ready.</p>","attachments":[]}`,
		},
	}).Result()
	require.NoError(t, err)

	return messageID
}

func addTemplateCommand(t *testing.T, client *redis.Client, deliveryID string, idempotencyKey string) string {
	t.Helper()

	messageID, err := client.XAdd(context.Background(), &redis.XAddArgs{
		Stream: redisstate.Keyspace{}.DeliveryCommands(),
		Values: map[string]any{
			"delivery_id":     deliveryID,
			"source":          "notification",
			"payload_mode":    "template",
			"idempotency_key": idempotencyKey,
			"requested_at_ms": "1775121700001",
			"payload_json":    `{"to":["pilot@example.com"],"cc":[],"bcc":[],"reply_to":[],"template_id":"game.turn_ready","locale":"fr-FR","variables":{"turn_number":54},"attachments":[]}`,
		},
	}).Result()
	require.NoError(t, err)

	return messageID
}

func addMalformedRenderedCommand(t *testing.T, client *redis.Client, deliveryID string, idempotencyKey string) string {
	t.Helper()

	messageID, err := client.XAdd(context.Background(), &redis.XAddArgs{
		Stream: redisstate.Keyspace{}.DeliveryCommands(),
		Values: map[string]any{
			"delivery_id":     deliveryID,
			"source":          "notification",
			"payload_mode":    "rendered",
			"idempotency_key": idempotencyKey,
			"requested_at_ms": "1775121700000",
			"payload_json":    `{"to":["pilot@example.com"],"cc":[],"bcc":[],"reply_to":[],"text_body":"Turn 54 is ready.","attachments":[]}`,
		},
	}).Result()
	require.NoError(t, err)

	return messageID
}

type testClock struct {
	now time.Time
}

func (clock testClock) Now() time.Time {
	return clock.now
}

type scriptedOffsetStore struct {
	lastEntryID string
	found       bool
	saveErrs    []error
	saveCalls   int
}

func (store *scriptedOffsetStore) Load(context.Context, string) (string, bool, error) {
	if !store.found {
		return "", false, nil
	}

	return store.lastEntryID, true, nil
}

func (store *scriptedOffsetStore) Save(_ context.Context, _ string, entryID string) error {
	if store.saveCalls < len(store.saveErrs) && store.saveErrs[store.saveCalls] != nil {
		store.saveCalls++
		return store.saveErrs[store.saveCalls-1]
	}

	store.saveCalls++
	store.lastEntryID = entryID
	store.found = true
	return nil
}

func testLogger() *slog.Logger {
	return slog.New(slog.NewJSONHandler(io.Discard, nil))
}
@@ -0,0 +1,347 @@
package worker

import (
	"context"
	"errors"
	"fmt"
	"log/slog"
	"time"

	"galaxy/mail/internal/domain/attempt"
	"galaxy/mail/internal/domain/common"
	deliverydomain "galaxy/mail/internal/domain/delivery"
	"galaxy/mail/internal/logging"
	"galaxy/mail/internal/service/executeattempt"
)

const (
	defaultSchedulePollInterval = 250 * time.Millisecond
	defaultRecoveryInterval     = 30 * time.Second
	defaultRecoveryGrace        = 30 * time.Second
)

// AttemptExecutionStore describes the durable state operations used by the
// attempt scheduler.
type AttemptExecutionStore interface {
	// NextDueDeliveryIDs returns up to limit due delivery identifiers.
	NextDueDeliveryIDs(context.Context, time.Time, int64) ([]common.DeliveryID, error)

	// SendingDeliveryIDs returns every delivery currently indexed as sending.
	SendingDeliveryIDs(context.Context) ([]common.DeliveryID, error)

	// LoadWorkItem loads the current delivery and active attempt for deliveryID.
	LoadWorkItem(context.Context, common.DeliveryID) (executeattempt.WorkItem, bool, error)

	// ClaimDueAttempt atomically claims the due scheduled attempt for
	// deliveryID.
	ClaimDueAttempt(context.Context, common.DeliveryID, time.Time) (executeattempt.WorkItem, bool, error)

	// RemoveScheduledDelivery removes deliveryID from the attempt schedule set.
	RemoveScheduledDelivery(context.Context, common.DeliveryID) error
}

// AttemptPreparationService prepares queued template deliveries and recovers
// stale claimed attempts.
type AttemptPreparationService interface {
	// Prepare renders one queued template delivery when needed and reports
	// whether the scheduler may continue to claim the attempt.
	Prepare(context.Context, executeattempt.WorkItem) (bool, error)

	// RecoverExpired marks one stale in-progress attempt as timed out.
	RecoverExpired(context.Context, executeattempt.WorkItem) error
}

// SchedulerTelemetry records low-cardinality scheduler-side delivery
// transitions.
type SchedulerTelemetry interface {
	// RecordDeliveryStatusTransition records one durable delivery status
	// transition.
	RecordDeliveryStatusTransition(context.Context, string, string)
}

// SchedulerConfig stores the dependencies used by Scheduler.
type SchedulerConfig struct {
	// Store owns the durable scheduled and in-progress attempt state.
	Store AttemptExecutionStore

	// Service prepares queued template deliveries and recovers stale claims.
	Service AttemptPreparationService

	// WorkQueue stores the claimed attempt handoff channel consumed by the
	// attempt worker pool.
	WorkQueue chan<- executeattempt.WorkItem

	// Clock provides the scheduler wall clock.
	Clock Clock

	// AttemptTimeout stores the provider execution budget used to derive claim
	// recovery deadlines.
	AttemptTimeout time.Duration

	// Telemetry records scheduler-side delivery transitions.
	Telemetry SchedulerTelemetry

	// PollInterval overrides the default due-attempt polling interval when
	// positive.
	PollInterval time.Duration

	// RecoveryInterval overrides the default stale-claim recovery interval when
	// positive.
	RecoveryInterval time.Duration

	// RecoveryGrace overrides the default stale-claim grace window when
	// positive.
	RecoveryGrace time.Duration
}

// Scheduler polls due attempts, optionally renders queued template
// deliveries, atomically claims runnable work, and recovers stale in-progress
// ownership.
type Scheduler struct {
	store            AttemptExecutionStore
	service          AttemptPreparationService
	workQueue        chan<- executeattempt.WorkItem
	clock            Clock
	attemptTimeout   time.Duration
	telemetry        SchedulerTelemetry
	pollInterval     time.Duration
	recoveryInterval time.Duration
	recoveryGrace    time.Duration
	logger           *slog.Logger
}

// NewScheduler constructs one attempt scheduler.
func NewScheduler(cfg SchedulerConfig, logger *slog.Logger) (*Scheduler, error) {
	switch {
	case cfg.Store == nil:
		return nil, errors.New("new scheduler: nil attempt execution store")
	case cfg.Service == nil:
		return nil, errors.New("new scheduler: nil attempt preparation service")
	case cfg.WorkQueue == nil:
		return nil, errors.New("new scheduler: nil work queue")
	case cfg.Clock == nil:
		return nil, errors.New("new scheduler: nil clock")
	case cfg.AttemptTimeout <= 0:
		return nil, errors.New("new scheduler: non-positive attempt timeout")
	}
	if logger == nil {
		logger = slog.Default()
	}

	pollInterval := cfg.PollInterval
	if pollInterval <= 0 {
		pollInterval = defaultSchedulePollInterval
	}

	recoveryInterval := cfg.RecoveryInterval
	if recoveryInterval <= 0 {
		recoveryInterval = defaultRecoveryInterval
	}

	recoveryGrace := cfg.RecoveryGrace
	if recoveryGrace <= 0 {
		recoveryGrace = defaultRecoveryGrace
	}

	return &Scheduler{
		store:            cfg.Store,
		service:          cfg.Service,
		workQueue:        cfg.WorkQueue,
		clock:            cfg.Clock,
		attemptTimeout:   cfg.AttemptTimeout,
		telemetry:        cfg.Telemetry,
		pollInterval:     pollInterval,
		recoveryInterval: recoveryInterval,
		recoveryGrace:    recoveryGrace,
		logger: logger.With(
			"component", "scheduler",
			"poll_interval", pollInterval.String(),
			"recovery_interval", recoveryInterval.String(),
			"recovery_grace", recoveryGrace.String(),
		),
	}, nil
}

// Run starts the scheduler loop and blocks until ctx is canceled or one
// durable state operation fails.
func (scheduler *Scheduler) Run(ctx context.Context) error {
	if ctx == nil {
		return errors.New("run scheduler: nil context")
	}
	if err := ctx.Err(); err != nil {
		return err
	}
	if scheduler == nil {
		return errors.New("run scheduler: nil scheduler")
	}

	scheduler.logger.Info("scheduler started")
	defer scheduler.logger.Info("scheduler stopped")

	if err := scheduler.recoverExpired(ctx); err != nil {
		return err
	}

	pollTicker := time.NewTicker(scheduler.pollInterval)
	defer pollTicker.Stop()

	recoveryTicker := time.NewTicker(scheduler.recoveryInterval)
	defer recoveryTicker.Stop()

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-pollTicker.C:
			if err := scheduler.dispatchDueAttempts(ctx); err != nil {
				return err
			}
		case <-recoveryTicker.C:
			if err := scheduler.recoverExpired(ctx); err != nil {
				return err
			}
		}
	}
}

// Shutdown stops the scheduler within ctx. The scheduler does not own
// additional resources beyond its run loop.
func (scheduler *Scheduler) Shutdown(ctx context.Context) error {
	if ctx == nil {
		return errors.New("shutdown scheduler: nil context")
	}
	if scheduler == nil {
		return nil
	}

	return nil
}

func (scheduler *Scheduler) dispatchDueAttempts(ctx context.Context) error {
	for {
		now := scheduler.clock.Now().UTC().Truncate(time.Millisecond)
		deliveryIDs, err := scheduler.store.NextDueDeliveryIDs(ctx, now, 1)
		if err != nil {
			return fmt.Errorf("dispatch due attempts: %w", err)
		}
		if len(deliveryIDs) == 0 {
			return nil
		}

		if err := scheduler.dispatchOne(ctx, deliveryIDs[0], now); err != nil {
			return err
		}
	}
}

func (scheduler *Scheduler) dispatchOne(ctx context.Context, deliveryID common.DeliveryID, now time.Time) error {
	workItem, found, err := scheduler.store.LoadWorkItem(ctx, deliveryID)
	if err != nil {
		return fmt.Errorf("dispatch due delivery %q: load work item: %w", deliveryID, err)
	}
	if !found {
		if err := scheduler.store.RemoveScheduledDelivery(ctx, deliveryID); err != nil {
			return fmt.Errorf("dispatch due delivery %q: remove stale schedule: %w", deliveryID, err)
		}
		return nil
	}
	if !isSchedulable(workItem) {
		if err := scheduler.store.RemoveScheduledDelivery(ctx, deliveryID); err != nil {
			return fmt.Errorf("dispatch due delivery %q: remove unschedulable entry: %w", deliveryID, err)
		}
		return nil
	}

	ready, err := scheduler.service.Prepare(ctx, workItem)
	if err != nil {
		return fmt.Errorf("dispatch due delivery %q: prepare attempt: %w", deliveryID, err)
	}
	if !ready {
		return nil
	}

	claimed, found, err := scheduler.store.ClaimDueAttempt(ctx, deliveryID, now)
	if err != nil {
		return fmt.Errorf("dispatch due delivery %q: claim attempt: %w", deliveryID, err)
	}
	if !found {
		return nil
	}
	scheduler.recordStatusTransition(ctx, claimed.Delivery)

	select {
	case <-ctx.Done():
		return ctx.Err()
	case scheduler.workQueue <- claimed:
		logArgs := logging.DeliveryAttemptAttrs(claimed.Delivery, claimed.Attempt)
		logArgs = append(logArgs, logging.TraceAttrsFromContext(ctx)...)
		scheduler.logger.Debug("attempt claimed", logArgs...)
		return nil
	}
}

func (scheduler *Scheduler) recoverExpired(ctx context.Context) error {
	now := scheduler.clock.Now().UTC().Truncate(time.Millisecond)
	deadline := now.Add(-(scheduler.attemptTimeout + scheduler.recoveryGrace))

	deliveryIDs, err := scheduler.store.SendingDeliveryIDs(ctx)
	if err != nil {
		return fmt.Errorf("recover expired attempts: %w", err)
	}

	for _, deliveryID := range deliveryIDs {
		workItem, found, err := scheduler.store.LoadWorkItem(ctx, deliveryID)
		if err != nil {
			return fmt.Errorf("recover expired delivery %q: load work item: %w", deliveryID, err)
		}
		if !found || !isRecoverable(workItem) || workItem.Attempt.StartedAt == nil {
			continue
		}
		if workItem.Attempt.StartedAt.After(deadline) {
			continue
		}

		if err := scheduler.service.RecoverExpired(ctx, workItem); err != nil {
			return fmt.Errorf("recover expired delivery %q: %w", deliveryID, err)
		}

		logArgs := logging.DeliveryAttemptAttrs(workItem.Delivery, workItem.Attempt)
		logArgs = append(logArgs, "started_at", workItem.Attempt.StartedAt)
		logArgs = append(logArgs, logging.TraceAttrsFromContext(ctx)...)
		scheduler.logger.Warn("attempt claim expired", logArgs...)
	}

	return nil
}

func (scheduler *Scheduler) recordStatusTransition(ctx context.Context, record deliverydomain.Delivery) {
	if scheduler == nil || scheduler.telemetry == nil {
		return
	}

	scheduler.telemetry.RecordDeliveryStatusTransition(ctx, string(record.Status), string(record.Source))
}

func isSchedulable(item executeattempt.WorkItem) bool {
	if item.Delivery.AttemptCount != item.Attempt.AttemptNo {
		return false
	}
	switch item.Delivery.Status {
	case deliverydomain.StatusQueued, deliverydomain.StatusRendered:
	default:
		return false
	}

	return item.Attempt.Status == attempt.StatusScheduled
}

func isRecoverable(item executeattempt.WorkItem) bool {
	if item.Delivery.AttemptCount != item.Attempt.AttemptNo {
		return false
	}
	if item.Delivery.Status != deliverydomain.StatusSending {
		return false
	}

	return item.Attempt.Status == attempt.StatusInProgress
}
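Composition sketch (not part of the commit): the process entry point is expected to drive the consumer, scheduler, worker pool, and cleanup worker as sibling run loops. The supervisor below assumes golang.org/x/sync/errgroup and already-constructed components; the helper name is hypothetical.

// Sketch only: supervise the Run loops until the first failure or shutdown.
func runAll(ctx context.Context, consumer *CommandConsumer, scheduler *Scheduler, pool *AttemptWorkerPool, cleanup *CleanupWorker) error {
	group, runCtx := errgroup.WithContext(ctx)
	group.Go(func() error { return consumer.Run(runCtx) })
	group.Go(func() error { return scheduler.Run(runCtx) })
	group.Go(func() error { return pool.Run(runCtx) })
	group.Go(func() error { return cleanup.Run(runCtx) })
	return group.Wait() // context.Canceled here indicates a clean shutdown
}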