Files
2026-05-06 10:14:55 +03:00

259 lines
8.3 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package notification
import (
"context"
"errors"
"fmt"
"time"
"galaxy/backend/internal/user"
"github.com/google/uuid"
"go.uber.org/zap"
)
// Submit validates a producer intent against the catalog, builds its
// route seeds, hands the notification and the seeds to the store in a
// single InsertNotification call, and then makes one synchronous,
// best-effort delivery attempt for every route that is still pending.
//
// Validation failures (unknown kind, blank idempotency key, no
// recipients for a non-admin kind) are never returned as errors: the
// intent is passed to recordMalformed and Submit returns
// (uuid.Nil, nil). An error from materialiseRoutes is returned as is;
// an error from the store is wrapped with "notification submit".
//
// When the store reports that nothing new was inserted (an idempotent
// re-submit of the same kind + idempotency_key), the id returned by
// the store is handed back and the freshly built seeds are not
// dispatched.
func (s *Service) Submit(ctx context.Context, intent Intent) (uuid.UUID, error) {
	entry, known := LookupCatalog(intent.Kind)

	// Validation gate: the first failing rule wins; the intent is recorded
	// as malformed and the producer still gets a nil error.
	switch {
	case !known:
		s.recordMalformed(ctx, intent, ErrUnknownKind.Error())
		return uuid.Nil, nil
	case trimSpace(intent.IdempotencyKey) == "":
		s.recordMalformed(ctx, intent, ErrEmptyIdempotencyKey.Error())
		return uuid.Nil, nil
	case !entry.Admin && len(intent.Recipients) == 0:
		s.recordMalformed(ctx, intent, ErrNoRecipients.Error())
		return uuid.Nil, nil
	}

	createdAt := s.nowUTC()
	freshID := uuid.New()

	// The notification row is attributed to a user only when a non-admin
	// kind targets exactly one recipient.
	var owner *uuid.UUID
	if len(intent.Recipients) == 1 && !entry.Admin {
		sole := intent.Recipients[0]
		owner = &sole
	}

	seeds, err := s.materialiseRoutes(ctx, freshID, entry, intent, createdAt)
	if err != nil {
		return uuid.Nil, err
	}

	args := InsertNotificationArgs{
		NotificationID: freshID,
		Kind:           intent.Kind,
		IdempotencyKey: intent.IdempotencyKey,
		UserID:         owner,
		Payload:        intent.Payload,
		Routes:         seeds,
	}
	existingID, created, err := s.deps.Store.InsertNotification(ctx, args)
	if err != nil {
		return uuid.Nil, fmt.Errorf("notification submit: %w", err)
	}
	if !created {
		s.deps.Logger.Debug("idempotent submit, returning existing notification",
			zap.String("kind", intent.Kind),
			zap.String("idempotency_key", intent.IdempotencyKey),
			zap.String("notification_id", existingID.String()),
		)
		return existingID, nil
	}

	// Best-effort synchronous dispatch: each pending route gets exactly
	// one attempt here. Nothing from the attempt is surfaced to the
	// producer; per the package contract, failures stay on the row for
	// the worker to retry.
	snapshot := Notification{
		NotificationID: freshID,
		Kind:           intent.Kind,
		IdempotencyKey: intent.IdempotencyKey,
		UserID:         owner,
		Payload:        intent.Payload,
		CreatedAt:      createdAt,
	}
	for _, seed := range seeds {
		if seed.Status == RouteStatusPending {
			s.bestEffortDispatch(ctx, snapshot, routeFromSeed(freshID, seed, createdAt))
		}
	}
	return freshID, nil
}
// materialiseRoutes builds the route seeds destined for
// `notification_routes`: one seed per channel for admin kinds, one seed
// per (recipient, channel) pair otherwise. Every seed leaves this
// function with its initial status already decided — `pending` with a
// next-attempt time of now, or `skipped` (with SkippedAt and LastError
// filled in) when the destination cannot be resolved.
//
// A recipient whose account lookup fails does not abort the call; the
// recipient's email and push seeds are marked skipped and carry the
// lookup error text. In the code as written the returned error is
// always nil.
func (s *Service) materialiseRoutes(ctx context.Context, notificationID uuid.UUID, entry CatalogEntry, intent Intent, now time.Time) ([]RouteSeed, error) {
	// notificationID is accepted but not used when building seeds.
	_ = notificationID

	// A non-positive configured limit still allows a single attempt.
	attempts := int32(s.deps.Config.MaxAttempts)
	if attempts < 1 {
		attempts = 1
	}
	firstAttemptAt := timePtr(now.UTC())

	// skip flips a seed from pending to skipped and records the reason.
	skip := func(seed *RouteSeed, reason string) {
		seed.Status = RouteStatusSkipped
		seed.NextAttemptAt = nil
		seed.SkippedAt = timePtr(now.UTC())
		seed.LastError = reason
	}

	if entry.Admin {
		// Admin kinds get one seed per channel and no per-user fan-out.
		out := make([]RouteSeed, 0, len(entry.Channels))
		for _, channel := range entry.Channels {
			seed := RouteSeed{
				RouteID:       uuid.New(),
				Channel:       channel,
				Status:        RouteStatusPending,
				MaxAttempts:   attempts,
				NextAttemptAt: firstAttemptAt,
			}
			if channel == ChannelEmail {
				seed.ResolvedEmail = s.adminEmail()
				if seed.ResolvedEmail == "" {
					skip(&seed, "BACKEND_NOTIFICATION_ADMIN_EMAIL not configured")
					s.deps.Logger.Warn("admin notification skipped: admin email not configured",
						zap.String("kind", intent.Kind),
						zap.String("idempotency_key", intent.IdempotencyKey),
					)
				}
			}
			out = append(out, seed)
		}
		return out, nil
	}

	// Per-user kinds fan out across recipient × channel.
	out := make([]RouteSeed, 0, len(intent.Recipients)*len(entry.Channels))
	for i := range intent.Recipients {
		// Private copy so that every seed of this recipient can share one
		// stable pointer.
		recipient := intent.Recipients[i]
		account, lookupErr := s.resolveAccount(ctx, recipient)

		for _, channel := range entry.Channels {
			seed := RouteSeed{
				RouteID:         uuid.New(),
				Channel:         channel,
				Status:          RouteStatusPending,
				MaxAttempts:     attempts,
				NextAttemptAt:   firstAttemptAt,
				UserID:          &recipient,
				DeviceSessionID: intent.DeviceSessionID,
			}
			switch {
			case channel != ChannelEmail && channel != ChannelPush:
				// Other channels take nothing from the account and stay pending.
			case lookupErr != nil:
				skip(&seed, lookupErr.Error())
			case channel == ChannelEmail:
				seed.ResolvedEmail = account.Email
				seed.ResolvedLocale = account.PreferredLanguage
				if trimSpace(seed.ResolvedEmail) == "" {
					skip(&seed, "recipient has no email on file")
				}
			case channel == ChannelPush:
				if account.PreferredLanguage != "" {
					seed.ResolvedLocale = account.PreferredLanguage
				}
			}
			out = append(out, seed)
		}
	}
	return out, nil
}
// resolveAccount loads the recipient's profile through the configured
// AccountResolver and returns it only when the account exists and is
// not soft-deleted.
//
// user.ErrAccountNotFound and a non-nil DeletedAt are both reported as
// plain, sentinel-free errors whose text ends up on the skipped route.
// Any other lookup failure is wrapped with the recipient id.
func (s *Service) resolveAccount(ctx context.Context, userID uuid.UUID) (user.Account, error) {
	account, err := s.deps.Accounts.GetAccount(ctx, userID)
	switch {
	case err == nil && account.DeletedAt != nil:
		return user.Account{}, errors.New("recipient account soft-deleted")
	case err == nil:
		return account, nil
	case errors.Is(err, user.ErrAccountNotFound):
		return user.Account{}, errors.New("recipient account not found")
	default:
		return user.Account{}, fmt.Errorf("resolve recipient %s: %w", userID, err)
	}
}
// recordMalformed stores an invalid intent, together with the reason
// it was rejected, on a best-effort basis. The stored record always
// carries the kind and idempotency key; payload, recipients and device
// session id are added only when present on the intent.
//
// A store failure is logged as a warning and swallowed so it never
// reaches the producer (README §10 contract); a successful write is
// logged at info level.
func (s *Service) recordMalformed(ctx context.Context, intent Intent, reason string) {
	record := map[string]any{
		"kind":            intent.Kind,
		"idempotency_key": intent.IdempotencyKey,
	}
	if len(intent.Payload) > 0 {
		record["payload"] = intent.Payload
	}
	if n := len(intent.Recipients); n > 0 {
		// Recipients are stored in their string form.
		ids := make([]string, n)
		for i, recipient := range intent.Recipients {
			ids[i] = recipient.String()
		}
		record["recipients"] = ids
	}
	if session := intent.DeviceSessionID; session != nil {
		record["device_session_id"] = session.String()
	}

	err := s.deps.Store.InsertMalformed(ctx, record, reason)
	if err != nil {
		s.deps.Logger.Warn("failed to persist malformed notification intent",
			zap.String("kind", intent.Kind),
			zap.String("reason", reason),
			zap.Error(err),
		)
		return
	}
	s.deps.Logger.Info("notification intent dropped as malformed",
		zap.String("kind", intent.Kind),
		zap.String("reason", reason),
	)
}
// routeFromSeed turns a RouteSeed (the pre-insert snapshot) into the
// Route value exchanged with the dispatcher. All seed fields are copied
// across unchanged; the route is bound to notificationID, starts with
// zero attempts, and has both timestamps set to now in UTC.
func routeFromSeed(notificationID uuid.UUID, seed RouteSeed, now time.Time) Route {
	stamp := now.UTC()
	return Route{
		// Identity.
		RouteID:        seed.RouteID,
		NotificationID: notificationID,
		Channel:        seed.Channel,

		// Delivery state.
		Status:        seed.Status,
		Attempts:      0,
		MaxAttempts:   seed.MaxAttempts,
		NextAttemptAt: seed.NextAttemptAt,
		SkippedAt:     seed.SkippedAt,
		LastError:     seed.LastError,

		// Resolved destination.
		ResolvedEmail:   seed.ResolvedEmail,
		ResolvedLocale:  seed.ResolvedLocale,
		UserID:          seed.UserID,
		DeviceSessionID: seed.DeviceSessionID,

		// Bookkeeping timestamps.
		CreatedAt: stamp,
		UpdatedAt: stamp,
	}
}