259 lines
8.3 KiB
Go
259 lines
8.3 KiB
Go
package notification
|
||
|
||
import (
|
||
"context"
|
||
"errors"
|
||
"fmt"
|
||
"time"
|
||
|
||
"galaxy/backend/internal/user"
|
||
|
||
"github.com/google/uuid"
|
||
"go.uber.org/zap"
|
||
)
|
||
|
||
// Submit accepts a producer intent, validates it against the catalog,
|
||
// resolves recipients, materialises route rows, persists everything in
|
||
// one transaction, and best-effort dispatches the routes synchronously.
|
||
//
|
||
// The contract: producers never block on Submit, and Submit never
|
||
// surfaces a validation failure as an error — malformed intents go to
|
||
// `notification_malformed_intents` and the call returns nil. Real
|
||
// errors (encoder failure, Postgres trouble) are wrapped and returned.
|
||
//
|
||
// On idempotent re-submit (same kind + idempotency_key) the existing
|
||
// notification id is honoured and route materialisation is skipped.
|
||
func (s *Service) Submit(ctx context.Context, intent Intent) (uuid.UUID, error) {
|
||
entry, ok := LookupCatalog(intent.Kind)
|
||
if !ok {
|
||
s.recordMalformed(ctx, intent, ErrUnknownKind.Error())
|
||
return uuid.Nil, nil
|
||
}
|
||
if trimSpace(intent.IdempotencyKey) == "" {
|
||
s.recordMalformed(ctx, intent, ErrEmptyIdempotencyKey.Error())
|
||
return uuid.Nil, nil
|
||
}
|
||
if !entry.Admin && len(intent.Recipients) == 0 {
|
||
s.recordMalformed(ctx, intent, ErrNoRecipients.Error())
|
||
return uuid.Nil, nil
|
||
}
|
||
|
||
now := s.nowUTC()
|
||
notificationID := uuid.New()
|
||
var primaryUserID *uuid.UUID
|
||
if !entry.Admin && len(intent.Recipients) == 1 {
|
||
uid := intent.Recipients[0]
|
||
primaryUserID = &uid
|
||
}
|
||
|
||
routes, err := s.materialiseRoutes(ctx, notificationID, entry, intent, now)
|
||
if err != nil {
|
||
return uuid.Nil, err
|
||
}
|
||
|
||
storedID, inserted, err := s.deps.Store.InsertNotification(ctx, InsertNotificationArgs{
|
||
NotificationID: notificationID,
|
||
Kind: intent.Kind,
|
||
IdempotencyKey: intent.IdempotencyKey,
|
||
UserID: primaryUserID,
|
||
Payload: intent.Payload,
|
||
Routes: routes,
|
||
})
|
||
if err != nil {
|
||
return uuid.Nil, fmt.Errorf("notification submit: %w", err)
|
||
}
|
||
if !inserted {
|
||
s.deps.Logger.Debug("idempotent submit, returning existing notification",
|
||
zap.String("kind", intent.Kind),
|
||
zap.String("idempotency_key", intent.IdempotencyKey),
|
||
zap.String("notification_id", storedID.String()),
|
||
)
|
||
return storedID, nil
|
||
}
|
||
|
||
// Best-effort synchronous dispatch: any pending route gets a single
|
||
// attempt right now. Failures stay on the row for the worker to
|
||
// retry; they are not surfaced to producers.
|
||
for i := range routes {
|
||
if routes[i].Status != RouteStatusPending {
|
||
continue
|
||
}
|
||
s.bestEffortDispatch(ctx, Notification{
|
||
NotificationID: notificationID,
|
||
Kind: intent.Kind,
|
||
IdempotencyKey: intent.IdempotencyKey,
|
||
UserID: primaryUserID,
|
||
Payload: intent.Payload,
|
||
CreatedAt: now,
|
||
}, routeFromSeed(notificationID, routes[i], now))
|
||
}
|
||
|
||
return notificationID, nil
|
||
}
|
||
|
||
// materialiseRoutes builds the per-(recipient, channel) seeds that
|
||
// land in `notification_routes`. The function performs recipient
|
||
// resolution and the catalog-aware channel fan-out. Each seed already
|
||
// carries its terminal status (`pending` for live routes, `skipped`
|
||
// for cases where the destination cannot be resolved).
|
||
func (s *Service) materialiseRoutes(ctx context.Context, notificationID uuid.UUID, entry CatalogEntry, intent Intent, now time.Time) ([]RouteSeed, error) {
|
||
_ = notificationID
|
||
maxAttempts := int32(s.deps.Config.MaxAttempts)
|
||
if maxAttempts <= 0 {
|
||
maxAttempts = 1
|
||
}
|
||
pendingNext := timePtr(now.UTC())
|
||
|
||
if entry.Admin {
|
||
// Admin-channel kinds: one row per channel, no per-user fan-out.
|
||
seeds := make([]RouteSeed, 0, len(entry.Channels))
|
||
for _, ch := range entry.Channels {
|
||
seed := RouteSeed{
|
||
RouteID: uuid.New(),
|
||
Channel: ch,
|
||
Status: RouteStatusPending,
|
||
MaxAttempts: maxAttempts,
|
||
NextAttemptAt: pendingNext,
|
||
}
|
||
if ch == ChannelEmail {
|
||
seed.ResolvedEmail = s.adminEmail()
|
||
if seed.ResolvedEmail == "" {
|
||
seed.Status = RouteStatusSkipped
|
||
seed.NextAttemptAt = nil
|
||
seed.SkippedAt = timePtr(now.UTC())
|
||
seed.LastError = "BACKEND_NOTIFICATION_ADMIN_EMAIL not configured"
|
||
s.deps.Logger.Warn("admin notification skipped: admin email not configured",
|
||
zap.String("kind", intent.Kind),
|
||
zap.String("idempotency_key", intent.IdempotencyKey),
|
||
)
|
||
}
|
||
}
|
||
seeds = append(seeds, seed)
|
||
}
|
||
return seeds, nil
|
||
}
|
||
|
||
// Per-user kinds: fan out across (recipient × channel).
|
||
seeds := make([]RouteSeed, 0, len(intent.Recipients)*len(entry.Channels))
|
||
for _, userID := range intent.Recipients {
|
||
uid := userID
|
||
account, err := s.resolveAccount(ctx, userID)
|
||
for _, ch := range entry.Channels {
|
||
seed := RouteSeed{
|
||
RouteID: uuid.New(),
|
||
Channel: ch,
|
||
Status: RouteStatusPending,
|
||
MaxAttempts: maxAttempts,
|
||
NextAttemptAt: pendingNext,
|
||
UserID: &uid,
|
||
DeviceSessionID: intent.DeviceSessionID,
|
||
}
|
||
switch ch {
|
||
case ChannelEmail:
|
||
if err != nil {
|
||
seed.Status = RouteStatusSkipped
|
||
seed.NextAttemptAt = nil
|
||
seed.SkippedAt = timePtr(now.UTC())
|
||
seed.LastError = err.Error()
|
||
} else {
|
||
seed.ResolvedEmail = account.Email
|
||
seed.ResolvedLocale = account.PreferredLanguage
|
||
if trimSpace(seed.ResolvedEmail) == "" {
|
||
seed.Status = RouteStatusSkipped
|
||
seed.NextAttemptAt = nil
|
||
seed.SkippedAt = timePtr(now.UTC())
|
||
seed.LastError = "recipient has no email on file"
|
||
}
|
||
}
|
||
case ChannelPush:
|
||
if err != nil {
|
||
seed.Status = RouteStatusSkipped
|
||
seed.NextAttemptAt = nil
|
||
seed.SkippedAt = timePtr(now.UTC())
|
||
seed.LastError = err.Error()
|
||
} else if account.PreferredLanguage != "" {
|
||
seed.ResolvedLocale = account.PreferredLanguage
|
||
}
|
||
}
|
||
seeds = append(seeds, seed)
|
||
}
|
||
}
|
||
return seeds, nil
|
||
}
|
||
|
||
// resolveAccount fetches the recipient profile through the configured
|
||
// AccountResolver. user.ErrAccountNotFound is mapped to a sentinel-free
|
||
// error string so the route is skipped without a stack-trace log.
|
||
func (s *Service) resolveAccount(ctx context.Context, userID uuid.UUID) (user.Account, error) {
|
||
account, err := s.deps.Accounts.GetAccount(ctx, userID)
|
||
if err != nil {
|
||
if errors.Is(err, user.ErrAccountNotFound) {
|
||
return user.Account{}, errors.New("recipient account not found")
|
||
}
|
||
return user.Account{}, fmt.Errorf("resolve recipient %s: %w", userID, err)
|
||
}
|
||
if account.DeletedAt != nil {
|
||
return user.Account{}, errors.New("recipient account soft-deleted")
|
||
}
|
||
return account, nil
|
||
}
|
||
|
||
// recordMalformed best-effort persists an invalid intent. Logger is
|
||
// informational; a Postgres failure here is logged but never bubbles
|
||
// up to the producer, matching the README §10 contract.
|
||
func (s *Service) recordMalformed(ctx context.Context, intent Intent, reason string) {
|
||
payload := map[string]any{
|
||
"kind": intent.Kind,
|
||
"idempotency_key": intent.IdempotencyKey,
|
||
}
|
||
if len(intent.Payload) > 0 {
|
||
payload["payload"] = intent.Payload
|
||
}
|
||
if len(intent.Recipients) > 0 {
|
||
recipients := make([]string, 0, len(intent.Recipients))
|
||
for _, r := range intent.Recipients {
|
||
recipients = append(recipients, r.String())
|
||
}
|
||
payload["recipients"] = recipients
|
||
}
|
||
if intent.DeviceSessionID != nil {
|
||
payload["device_session_id"] = intent.DeviceSessionID.String()
|
||
}
|
||
if err := s.deps.Store.InsertMalformed(ctx, payload, reason); err != nil {
|
||
s.deps.Logger.Warn("failed to persist malformed notification intent",
|
||
zap.String("kind", intent.Kind),
|
||
zap.String("reason", reason),
|
||
zap.Error(err),
|
||
)
|
||
return
|
||
}
|
||
s.deps.Logger.Info("notification intent dropped as malformed",
|
||
zap.String("kind", intent.Kind),
|
||
zap.String("reason", reason),
|
||
)
|
||
}
|
||
|
||
// routeFromSeed converts a RouteSeed (the pre-insert snapshot the
|
||
// dispatcher needs) to a Route value the worker / dispatcher exchange
|
||
// after the row is durably persisted.
|
||
func routeFromSeed(notificationID uuid.UUID, seed RouteSeed, now time.Time) Route {
|
||
r := Route{
|
||
RouteID: seed.RouteID,
|
||
NotificationID: notificationID,
|
||
Channel: seed.Channel,
|
||
Status: seed.Status,
|
||
Attempts: 0,
|
||
MaxAttempts: seed.MaxAttempts,
|
||
NextAttemptAt: seed.NextAttemptAt,
|
||
ResolvedEmail: seed.ResolvedEmail,
|
||
ResolvedLocale: seed.ResolvedLocale,
|
||
UserID: seed.UserID,
|
||
DeviceSessionID: seed.DeviceSessionID,
|
||
CreatedAt: now.UTC(),
|
||
UpdatedAt: now.UTC(),
|
||
SkippedAt: seed.SkippedAt,
|
||
LastError: seed.LastError,
|
||
}
|
||
return r
|
||
}
|