4cb03736de
Adds the gateway-side translation layer that maps the eight new
ConnectRPC mail commands onto the backend's
`/api/v1/user/games/{game_id}/mail/*` REST endpoints.
- `gateway/internal/backendclient/mail_commands.go` defines
`ExecuteMailCommand` and one helper per command (inbox, sent,
message.get, send, broadcast, admin, read, delete). Each helper
decodes the FlatBuffers request envelope, issues the REST call
via the existing `*RESTClient.do`, decodes the JSON body, and
re-encodes a typed FlatBuffers response. Recipient identifiers
travel through unchanged so the new `recipient_race_name`
shortcut introduced in Step 1 reaches the backend untouched.
- `routes.go` exposes a `MailRoutes` constructor and a matching
`mailCommandClient` implementing `downstream.Client`.
- `cmd/gateway/main.go` registers the new routes alongside the
existing user / lobby / game-engine routes.
- `mail_commands_test.go` covers the inbox, send-by-race-name, and
read-state paths end-to-end against an `httptest.Server`,
asserting request shapes (path, body, X-User-ID) and the
decoded FlatBuffers response.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
306 lines
9.4 KiB
Go
306 lines
9.4 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"os"
|
|
"os/signal"
|
|
"syscall"
|
|
|
|
"galaxy/gateway/internal/adminapi"
|
|
"galaxy/gateway/internal/app"
|
|
"galaxy/gateway/authn"
|
|
"galaxy/gateway/internal/backendclient"
|
|
"galaxy/gateway/internal/config"
|
|
"galaxy/gateway/internal/downstream"
|
|
"galaxy/gateway/internal/events"
|
|
"galaxy/gateway/internal/grpcapi"
|
|
"galaxy/gateway/internal/logging"
|
|
"galaxy/gateway/internal/push"
|
|
"galaxy/gateway/internal/redisclient"
|
|
"galaxy/gateway/internal/replay"
|
|
"galaxy/gateway/internal/restapi"
|
|
"galaxy/gateway/internal/session"
|
|
"galaxy/gateway/internal/telemetry"
|
|
|
|
"github.com/redis/go-redis/v9"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// errNoopClose is a close function that does nothing and always returns nil.
// NOTE(review): despite the "err" prefix this is a func value, not an error.
// Within this file it is only referenced by a blank assignment, so it looks
// like a leftover placeholder — confirm before renaming or removing.
var errNoopClose = func() error { return nil }
|
|
|
|
// main loads the gateway configuration, runs the process lifecycle, and exits
|
|
// with a non-zero status when startup or runtime fails.
|
|
func main() {
|
|
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
|
|
defer cancel()
|
|
|
|
if err := run(ctx); err != nil {
|
|
fmt.Fprintln(os.Stderr, err)
|
|
os.Exit(1)
|
|
}
|
|
}
|
|
|
|
func run(ctx context.Context) (err error) {
|
|
cfg, err := config.LoadFromEnv()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
logger, err := logging.New(cfg.Logging)
|
|
if err != nil {
|
|
return fmt.Errorf("build gateway logger: %w", err)
|
|
}
|
|
|
|
telemetryRuntime, err := telemetry.New(ctx, logger)
|
|
if err != nil {
|
|
return fmt.Errorf("build gateway telemetry: %w", err)
|
|
}
|
|
|
|
backend, err := backendclient.NewClient(backendclient.Config{
|
|
HTTPBaseURL: cfg.Backend.HTTPBaseURL,
|
|
GRPCPushURL: cfg.Backend.GRPCPushURL,
|
|
GatewayClientID: cfg.Backend.GatewayClientID,
|
|
HTTPTimeout: cfg.Backend.HTTPTimeout,
|
|
PushReconnectBaseBackoff: cfg.Backend.PushReconnectBaseBackoff,
|
|
PushReconnectMaxBackoff: cfg.Backend.PushReconnectMaxBackoff,
|
|
})
|
|
if err != nil {
|
|
_ = telemetryRuntime.Shutdown(context.Background())
|
|
_ = logging.Sync(logger)
|
|
return fmt.Errorf("build backend client: %w", err)
|
|
}
|
|
|
|
publicRESTDeps := restapi.ServerDependencies{
|
|
Logger: logger,
|
|
Telemetry: telemetryRuntime,
|
|
AuthService: authServiceAdapter{rest: backend.REST()},
|
|
}
|
|
|
|
grpcDeps, components, cleanup, err := newAuthenticatedGRPCDependencies(ctx, cfg, logger, telemetryRuntime, backend)
|
|
if err != nil {
|
|
_ = backend.Close()
|
|
_ = telemetryRuntime.Shutdown(context.Background())
|
|
_ = logging.Sync(logger)
|
|
return err
|
|
}
|
|
defer func() {
|
|
shutdownCtx, cancel := context.WithTimeout(context.Background(), cfg.ShutdownTimeout)
|
|
defer cancel()
|
|
|
|
err = errors.Join(
|
|
err,
|
|
cleanup(),
|
|
backend.Close(),
|
|
telemetryRuntime.Shutdown(shutdownCtx),
|
|
logging.Sync(logger),
|
|
)
|
|
}()
|
|
|
|
restServer := restapi.NewServer(cfg.PublicHTTP, publicRESTDeps)
|
|
grpcServer := grpcapi.NewServer(cfg.AuthenticatedGRPC, grpcDeps)
|
|
|
|
applicationComponents := []app.Component{
|
|
restServer,
|
|
grpcServer,
|
|
}
|
|
if adminServer := adminapi.NewServer(cfg.AdminHTTP, telemetryRuntime.Handler(), logger); adminServer.Enabled() {
|
|
applicationComponents = append(applicationComponents, adminServer)
|
|
}
|
|
applicationComponents = append(applicationComponents, components...)
|
|
|
|
logger.Info("gateway application starting",
|
|
zap.String("public_http_addr", cfg.PublicHTTP.Addr),
|
|
zap.String("authenticated_grpc_addr", cfg.AuthenticatedGRPC.Addr),
|
|
zap.String("admin_http_addr", cfg.AdminHTTP.Addr),
|
|
zap.String("backend_http_url", cfg.Backend.HTTPBaseURL),
|
|
zap.String("backend_grpc_push_url", cfg.Backend.GRPCPushURL),
|
|
)
|
|
|
|
application := app.New(cfg, applicationComponents...)
|
|
|
|
err = application.Run(ctx)
|
|
return err
|
|
}
|
|
|
|
func newAuthenticatedGRPCDependencies(ctx context.Context, cfg config.Config, logger *zap.Logger, telemetryRuntime *telemetry.Runtime, backend *backendclient.Client) (grpcapi.ServerDependencies, []app.Component, func() error, error) {
|
|
responseSigner, err := authn.LoadEd25519ResponseSignerFromPEMFile(cfg.ResponseSigner.PrivateKeyPEMPath)
|
|
if err != nil {
|
|
return grpcapi.ServerDependencies{}, nil, nil, fmt.Errorf("build authenticated grpc dependencies: load response signer: %w", err)
|
|
}
|
|
|
|
redisClient := redisclient.NewClient(cfg.Redis)
|
|
if err := redisclient.InstrumentClient(redisClient, telemetryRuntime); err != nil {
|
|
closeErr := redisClient.Close()
|
|
return grpcapi.ServerDependencies{}, nil, nil, errors.Join(
|
|
fmt.Errorf("build authenticated grpc dependencies: %w", err),
|
|
closeErr,
|
|
)
|
|
}
|
|
closeRedisClient := func() error {
|
|
err := redisClient.Close()
|
|
if errors.Is(err, redis.ErrClosed) {
|
|
return nil
|
|
}
|
|
return err
|
|
}
|
|
if err := redisclient.Ping(ctx, cfg.Redis, redisClient); err != nil {
|
|
closeErr := closeRedisClient()
|
|
return grpcapi.ServerDependencies{}, nil, nil, errors.Join(
|
|
fmt.Errorf("build authenticated grpc dependencies: %w", err),
|
|
closeErr,
|
|
)
|
|
}
|
|
|
|
sessionCache, err := session.NewMemoryCache(backend.REST(), session.MemoryCacheOptions{
|
|
MaxEntries: cfg.SessionCache.MaxEntries,
|
|
TTL: cfg.SessionCache.TTL,
|
|
Logger: logger,
|
|
})
|
|
if err != nil {
|
|
return grpcapi.ServerDependencies{}, nil, nil, errors.Join(
|
|
fmt.Errorf("build authenticated grpc dependencies: %w", err),
|
|
closeRedisClient(),
|
|
)
|
|
}
|
|
|
|
replayStore, err := replay.NewRedisStore(redisClient, cfg.ReplayRedis)
|
|
if err != nil {
|
|
return grpcapi.ServerDependencies{}, nil, nil, errors.Join(
|
|
fmt.Errorf("build authenticated grpc dependencies: %w", err),
|
|
closeRedisClient(),
|
|
)
|
|
}
|
|
|
|
pushHub := push.NewHubWithObserver(0, telemetry.NewPushObserver(telemetryRuntime))
|
|
|
|
// Composite invalidator: every session_invalidation event flips the
|
|
// cached record to revoked AND closes any active push subscription.
|
|
invalidator := &cacheAndHubInvalidator{cache: sessionCache, hub: pushHub}
|
|
dispatcher := events.NewDispatcher(pushHub, invalidator, logger, telemetryRuntime)
|
|
pushClient := backend.Push().
|
|
WithLogger(logger).
|
|
WithHandler(dispatcher)
|
|
|
|
userRoutes := backendclient.UserRoutes(backend.REST())
|
|
lobbyRoutes := backendclient.LobbyRoutes(backend.REST())
|
|
gameRoutes := backendclient.GameRoutes(backend.REST())
|
|
mailRoutes := backendclient.MailRoutes(backend.REST())
|
|
allRoutes := make(map[string]downstream.Client, len(userRoutes)+len(lobbyRoutes)+len(gameRoutes)+len(mailRoutes))
|
|
for k, v := range userRoutes {
|
|
allRoutes[k] = v
|
|
}
|
|
for k, v := range lobbyRoutes {
|
|
allRoutes[k] = v
|
|
}
|
|
for k, v := range gameRoutes {
|
|
allRoutes[k] = v
|
|
}
|
|
for k, v := range mailRoutes {
|
|
allRoutes[k] = v
|
|
}
|
|
|
|
cleanup := func() error {
|
|
return closeRedisClient()
|
|
}
|
|
|
|
return grpcapi.ServerDependencies{
|
|
Service: grpcapi.NewFanOutPushStreamService(pushHub, responseSigner, nil, logger),
|
|
Router: downstream.NewStaticRouter(allRoutes),
|
|
ResponseSigner: responseSigner,
|
|
SessionCache: sessionCache,
|
|
ReplayStore: replayStore,
|
|
Logger: logger,
|
|
Telemetry: telemetryRuntime,
|
|
PushHub: pushHub,
|
|
}, []app.Component{pushClient}, cleanup, nil
|
|
}
|
|
|
|
// cacheAndHubInvalidator fans every session-invalidation push frame
|
|
// out to both the session cache (so subsequent Lookups see the
|
|
// session as revoked without a backend round-trip) and the push hub
|
|
// (so any active SubscribeEvents stream bound to the session is
|
|
// closed immediately). The shape matches `events.SessionInvalidator`.
|
|
type cacheAndHubInvalidator struct {
|
|
cache session.Cache
|
|
hub *push.Hub
|
|
}
|
|
|
|
func (c *cacheAndHubInvalidator) RevokeDeviceSession(deviceSessionID string) {
|
|
if c == nil {
|
|
return
|
|
}
|
|
if c.cache != nil {
|
|
c.cache.MarkRevoked(deviceSessionID)
|
|
}
|
|
if c.hub != nil {
|
|
c.hub.RevokeDeviceSession(deviceSessionID)
|
|
}
|
|
}
|
|
|
|
func (c *cacheAndHubInvalidator) RevokeAllForUser(userID string) {
|
|
if c == nil {
|
|
return
|
|
}
|
|
if c.cache != nil {
|
|
c.cache.MarkAllRevokedForUser(userID)
|
|
}
|
|
if c.hub != nil {
|
|
c.hub.RevokeAllForUser(userID)
|
|
}
|
|
}
|
|
|
|
// authServiceAdapter adapts backendclient.RESTClient to the
|
|
// restapi.AuthServiceClient interface so the public REST handlers can stay
|
|
// unchanged. The two surfaces share the same JSON wire shape; only the Go
|
|
// type names differ.
|
|
type authServiceAdapter struct {
|
|
rest *backendclient.RESTClient
|
|
}
|
|
|
|
func (a authServiceAdapter) SendEmailCode(ctx context.Context, input restapi.SendEmailCodeInput) (restapi.SendEmailCodeResult, error) {
|
|
if a.rest == nil {
|
|
return restapi.SendEmailCodeResult{}, errors.New("auth service adapter: nil backend client")
|
|
}
|
|
out, err := a.rest.SendEmailCode(ctx, backendclient.SendEmailCodeInput{
|
|
Email: input.Email,
|
|
PreferredLanguage: input.PreferredLanguage,
|
|
})
|
|
if err != nil {
|
|
return restapi.SendEmailCodeResult{}, mapAuthError(err)
|
|
}
|
|
return restapi.SendEmailCodeResult{ChallengeID: out.ChallengeID}, nil
|
|
}
|
|
|
|
func (a authServiceAdapter) ConfirmEmailCode(ctx context.Context, input restapi.ConfirmEmailCodeInput) (restapi.ConfirmEmailCodeResult, error) {
|
|
if a.rest == nil {
|
|
return restapi.ConfirmEmailCodeResult{}, errors.New("auth service adapter: nil backend client")
|
|
}
|
|
out, err := a.rest.ConfirmEmailCode(ctx, backendclient.ConfirmEmailCodeInput{
|
|
ChallengeID: input.ChallengeID,
|
|
Code: input.Code,
|
|
ClientPublicKey: input.ClientPublicKey,
|
|
TimeZone: input.TimeZone,
|
|
})
|
|
if err != nil {
|
|
return restapi.ConfirmEmailCodeResult{}, mapAuthError(err)
|
|
}
|
|
return restapi.ConfirmEmailCodeResult{DeviceSessionID: out.DeviceSessionID}, nil
|
|
}
|
|
|
|
func mapAuthError(err error) error {
|
|
var ae *backendclient.AuthError
|
|
if errors.As(err, &ae) {
|
|
return &restapi.AuthServiceError{
|
|
StatusCode: ae.StatusCode,
|
|
Code: ae.Code,
|
|
Message: ae.Message,
|
|
}
|
|
}
|
|
return err
|
|
}
|
|
|
|
var _ restapi.AuthServiceClient = authServiceAdapter{}
|
|
var _ = errNoopClose
|