// Package projectionpublisher implements // ports.GatewaySessionProjectionPublisher with Redis-backed gateway-compatible // cache snapshots and session lifecycle events. package projectionpublisher import ( "context" "crypto/tls" "encoding/json" "errors" "fmt" "strings" "time" "galaxy/authsession/internal/domain/gatewayprojection" "galaxy/authsession/internal/ports" "github.com/redis/go-redis/v9" ) // Config configures one Redis-backed gateway session projection publisher. type Config struct { // Addr is the Redis network address in host:port form. Addr string // Username is the optional Redis ACL username. Username string // Password is the optional Redis ACL password. Password string // DB is the Redis logical database index. DB int // TLSEnabled enables TLS with a conservative minimum protocol version. TLSEnabled bool // SessionCacheKeyPrefix is the namespace prefix applied to gateway session // cache keys. The raw device session identifier is appended directly. SessionCacheKeyPrefix string // SessionEventsStream identifies the gateway session lifecycle Redis Stream. SessionEventsStream string // StreamMaxLen bounds the session lifecycle stream with approximate // trimming via XADD MAXLEN ~. StreamMaxLen int64 // OperationTimeout bounds each Redis round trip performed by the adapter. OperationTimeout time.Duration } // Publisher publishes gateway-compatible session projections into Redis cache // and stream namespaces. type Publisher struct { client *redis.Client sessionCacheKeyPrefix string sessionEventsStream string streamMaxLen int64 operationTimeout time.Duration } type cacheRecord struct { DeviceSessionID string `json:"device_session_id"` UserID string `json:"user_id"` ClientPublicKey string `json:"client_public_key"` Status gatewayprojection.Status `json:"status"` RevokedAtMS *int64 `json:"revoked_at_ms,omitempty"` } // New constructs a Redis-backed gateway session projection publisher from // cfg. func New(cfg Config) (*Publisher, error) { switch { case strings.TrimSpace(cfg.Addr) == "": return nil, errors.New("new redis projection publisher: redis addr must not be empty") case cfg.DB < 0: return nil, errors.New("new redis projection publisher: redis db must not be negative") case strings.TrimSpace(cfg.SessionCacheKeyPrefix) == "": return nil, errors.New("new redis projection publisher: session cache key prefix must not be empty") case strings.TrimSpace(cfg.SessionEventsStream) == "": return nil, errors.New("new redis projection publisher: session events stream must not be empty") case cfg.StreamMaxLen <= 0: return nil, errors.New("new redis projection publisher: stream max len must be positive") case cfg.OperationTimeout <= 0: return nil, errors.New("new redis projection publisher: operation timeout must be positive") } options := &redis.Options{ Addr: cfg.Addr, Username: cfg.Username, Password: cfg.Password, DB: cfg.DB, Protocol: 2, DisableIdentity: true, } if cfg.TLSEnabled { options.TLSConfig = &tls.Config{MinVersion: tls.VersionTLS12} } return &Publisher{ client: redis.NewClient(options), sessionCacheKeyPrefix: cfg.SessionCacheKeyPrefix, sessionEventsStream: cfg.SessionEventsStream, streamMaxLen: cfg.StreamMaxLen, operationTimeout: cfg.OperationTimeout, }, nil } // Close releases the underlying Redis client resources. func (p *Publisher) Close() error { if p == nil || p.client == nil { return nil } return p.client.Close() } // Ping verifies that the configured Redis backend is reachable within the // adapter operation timeout budget. func (p *Publisher) Ping(ctx context.Context) error { operationCtx, cancel, err := p.operationContext(ctx, "ping redis projection publisher") if err != nil { return err } defer cancel() if err := p.client.Ping(operationCtx).Err(); err != nil { return fmt.Errorf("ping redis projection publisher: %w", err) } return nil } // PublishSession writes one gateway-compatible session snapshot into the // gateway cache namespace and appends the same snapshot to the gateway session // event stream within one Redis transaction. func (p *Publisher) PublishSession(ctx context.Context, snapshot gatewayprojection.Snapshot) error { if err := snapshot.Validate(); err != nil { return fmt.Errorf("publish session projection to redis: %w", err) } payload, err := marshalCacheRecord(snapshot) if err != nil { return fmt.Errorf("publish session projection to redis: %w", err) } values := buildStreamValues(snapshot) operationCtx, cancel, err := p.operationContext(ctx, "publish session projection to redis") if err != nil { return err } defer cancel() key := p.sessionCacheKey(snapshot.DeviceSessionID) _, err = p.client.TxPipelined(operationCtx, func(pipe redis.Pipeliner) error { pipe.Set(operationCtx, key, payload, 0) pipe.XAdd(operationCtx, &redis.XAddArgs{ Stream: p.sessionEventsStream, MaxLen: p.streamMaxLen, Approx: true, Values: values, }) return nil }) if err != nil { return fmt.Errorf("publish session projection %q to redis: %w", snapshot.DeviceSessionID, err) } return nil } func (p *Publisher) operationContext(ctx context.Context, operation string) (context.Context, context.CancelFunc, error) { if p == nil || p.client == nil { return nil, nil, fmt.Errorf("%s: nil publisher", operation) } if ctx == nil { return nil, nil, fmt.Errorf("%s: nil context", operation) } operationCtx, cancel := context.WithTimeout(ctx, p.operationTimeout) return operationCtx, cancel, nil } func (p *Publisher) sessionCacheKey(deviceSessionID interface{ String() string }) string { return p.sessionCacheKeyPrefix + deviceSessionID.String() } func marshalCacheRecord(snapshot gatewayprojection.Snapshot) ([]byte, error) { record := cacheRecord{ DeviceSessionID: snapshot.DeviceSessionID.String(), UserID: snapshot.UserID.String(), ClientPublicKey: snapshot.ClientPublicKey, Status: snapshot.Status, } if snapshot.RevokedAt != nil { revokedAtMS := snapshot.RevokedAt.UTC().UnixMilli() record.RevokedAtMS = &revokedAtMS } payload, err := json.Marshal(record) if err != nil { return nil, fmt.Errorf("marshal gateway session cache record: %w", err) } return payload, nil } func buildStreamValues(snapshot gatewayprojection.Snapshot) map[string]any { values := map[string]any{ "device_session_id": snapshot.DeviceSessionID.String(), "user_id": snapshot.UserID.String(), "client_public_key": snapshot.ClientPublicKey, "status": string(snapshot.Status), } if snapshot.RevokedAt != nil { values["revoked_at_ms"] = fmt.Sprint(snapshot.RevokedAt.UTC().UnixMilli()) } return values } var _ ports.GatewaySessionProjectionPublisher = (*Publisher)(nil)