Files
scrabble-game/gateway/cmd/gateway/main.go
T
Ilia Denisov 9814d78ae3
Tests · UI / test (push) Successful in 19s
Tests · Go / test (pull_request) Successful in 6s
Tests · Integration / integration (pull_request) Failing after 11s
Tests · UI / test (pull_request) Successful in 18s
Stage 9: Telegram integration (connector side-service, Mini App, out-of-app push)
New platform/telegram connector (own container, bot token only there):
- go-telegram/bot long-poll loop: /start deep-links + Mini App launch button.
- gRPC API pkg/proto/telegram/v1 (Telegram service): ValidateInitData, Notify
  (renders a localized message + deep-link button), SendToUser/SendToGameChannel
  (admin, wired in Stage 10). Generic methods are platform-agnostic (external_id).
- Bot API base override for Telegram's test environment; Dockerfile + compose
  (VPN sidecar, no public ingress); README.

Gateway:
- initData validation relocated from the gateway into the connector; the gateway
  calls ValidateInitData over gRPC (GATEWAY_CONNECTOR_ADDR), drops the bot token,
  and deletes internal/auth.
- Out-of-app push: runPushPump routes events whose recipient has no live in-app
  stream to connector.Notify, gated by /internal/push-target + the in-app-only
  flag (race-free de-dup); HasSubscribers added to the push hub.

Backend:
- Migration 00007 accounts.notifications_in_app_only (default true) + jetgen.
- ProvisionTelegram seeds a new account's language/display name from the launch
  fields; IdentityExternalID reverse lookup; /internal/push-target handler.

UI:
- Telegram Mini App launch: detect initData, apply themeParams, authTelegram,
  route the deep-link start_param (g/i/f); /telegram/ guard redirects outside
  Telegram. Vite relative base + telegram-web-app.js. In-app-only profile toggle;
  share-to-Telegram link for a friend code. Vitest + Playwright coverage.

Wire/docs/CI: fbs Profile/UpdateProfileRequest gain notifications_in_app_only
(Go + TS); go.work uses ./platform/telegram; go-unit.yaml covers it; PLAN,
ARCHITECTURE, FUNCTIONAL (+ru), UI_DESIGN, READMEs updated.
2026-06-04 07:05:00 +02:00

244 lines
7.5 KiB
Go

// Command gateway is the Scrabble platform's only public ingress. It terminates
// the client's Connect-RPC/FlatBuffers traffic over h2c, validates platform /
// email / guest credentials and mints opaque sessions, rate-limits, injects
// X-User-ID when forwarding to the backend over REST, and bridges the backend's
// gRPC push stream to each client's in-app live channel. It also fronts the
// backend admin API behind HTTP Basic-Auth.
package main
import (
"context"
"errors"
"log"
"net/http"
"os/signal"
"syscall"
"time"
"go.uber.org/zap"
"scrabble/gateway/internal/admin"
"scrabble/gateway/internal/backendclient"
"scrabble/gateway/internal/config"
"scrabble/gateway/internal/connector"
"scrabble/gateway/internal/connectsrv"
"scrabble/gateway/internal/push"
"scrabble/gateway/internal/ratelimit"
"scrabble/gateway/internal/session"
"scrabble/gateway/internal/transcode"
)
const (
// shutdownTimeout bounds the graceful HTTP shutdown.
shutdownTimeout = 10 * time.Second
// pushReconnectDelay is the pause before re-subscribing to the backend push
// stream after it ends.
pushReconnectDelay = 2 * time.Second
// gatewayID identifies this gateway instance to the backend push channel.
gatewayID = "gateway"
)
func main() {
cfg, err := config.Load()
if err != nil {
log.Fatalf("gateway: load config: %v", err)
}
logger, err := newLogger(cfg.LogLevel)
if err != nil {
log.Fatalf("gateway: build logger: %v", err)
}
defer func() { _ = logger.Sync() }()
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer stop()
if err := run(ctx, cfg, logger); err != nil {
logger.Fatal("gateway: terminated", zap.Error(err))
}
}
// run wires the gateway dependencies and serves the public (and optional admin)
// listeners until the context is cancelled.
func run(ctx context.Context, cfg config.Config, logger *zap.Logger) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
backend, err := backendclient.New(cfg.BackendHTTPURL, cfg.BackendGRPCAddr, cfg.BackendTimeout)
if err != nil {
return err
}
defer func() { _ = backend.Close() }()
sessions := session.NewCache(backend, cfg.SessionTTL, cfg.SessionCacheMax)
limiter := ratelimit.New()
hub := push.NewHub(0)
var conn *connector.Client
var validator transcode.TelegramValidator
if cfg.ConnectorAddr != "" {
conn, err = connector.New(cfg.ConnectorAddr)
if err != nil {
return err
}
defer func() { _ = conn.Close() }()
validator = conn
} else {
logger.Warn("telegram disabled (GATEWAY_CONNECTOR_ADDR unset)")
}
registry := transcode.NewRegistry(backend, validator)
edge := connectsrv.NewServer(connectsrv.Deps{
Registry: registry,
Sessions: sessions,
Limiter: limiter,
Hub: hub,
RateLimit: cfg.RateLimit,
Heartbeat: cfg.PushHeartbeatInterval,
Logger: logger,
})
// Bridge the backend push stream into the fan-out hub (and the out-of-app
// channel via the connector).
go runPushPump(ctx, backend, hub, conn, logger)
public := &http.Server{Addr: cfg.HTTPAddr, Handler: edge.HTTPHandler()}
servers := []*namedServer{{name: "public", srv: public}}
if cfg.AdminEnabled() {
proxy, err := admin.NewProxy(cfg.BackendHTTPURL, cfg.AdminUser, cfg.AdminPassword, logger)
if err != nil {
return err
}
servers = append(servers, &namedServer{name: "admin", srv: &http.Server{Addr: cfg.AdminAddr, Handler: proxy}})
} else {
logger.Info("admin proxy disabled (set GATEWAY_ADMIN_USER and GATEWAY_ADMIN_PASSWORD)")
}
logger.Info("gateway starting",
zap.String("http_addr", cfg.HTTPAddr),
zap.String("backend_http", cfg.BackendHTTPURL),
zap.String("backend_grpc", cfg.BackendGRPCAddr))
return runServers(ctx, cancel, servers, logger)
}
// namedServer pairs an HTTP server with a label for diagnostics.
type namedServer struct {
name string
srv *http.Server
}
// runServers serves every listener and shuts them all down when the first one
// stops or the context is cancelled.
func runServers(ctx context.Context, cancel context.CancelFunc, servers []*namedServer, logger *zap.Logger) error {
errc := make(chan error, len(servers))
for _, s := range servers {
go func(s *namedServer) {
logger.Info("listener starting", zap.String("server", s.name), zap.String("addr", s.srv.Addr))
err := s.srv.ListenAndServe()
if errors.Is(err, http.ErrServerClosed) {
err = nil
}
errc <- err
}(s)
}
var first error
select {
case <-ctx.Done():
case first = <-errc:
}
cancel()
shutdownCtx, sc := context.WithTimeout(context.Background(), shutdownTimeout)
defer sc()
for _, s := range servers {
if err := s.srv.Shutdown(shutdownCtx); err != nil {
logger.Warn("listener shutdown", zap.String("server", s.name), zap.Error(err))
}
}
return first
}
// runPushPump keeps a backend push subscription open, forwarding every event to
// the hub and re-subscribing after the stream ends, until the context is done. For
// the out-of-app push kinds it also routes events whose recipient has no live
// in-app stream to the platform connector (a nil connector disables that channel).
func runPushPump(ctx context.Context, backend *backendclient.Client, hub *push.Hub, conn *connector.Client, logger *zap.Logger) {
for ctx.Err() == nil {
stream, err := backend.SubscribePush(ctx, gatewayID)
if err != nil {
logger.Warn("push subscribe failed", zap.Error(err))
if !sleep(ctx, pushReconnectDelay) {
return
}
continue
}
for {
ev, err := stream.Recv()
if err != nil {
if ctx.Err() == nil {
logger.Warn("push stream ended", zap.Error(err))
}
break
}
hub.Publish(push.Event{
UserID: ev.GetUserId(),
Kind: ev.GetKind(),
Payload: ev.GetPayload(),
EventID: ev.GetEventId(),
})
// Out-of-app fallback: when the recipient has no live in-app stream,
// deliver the event over the platform push channel. Done in a goroutine
// so a slow connector never stalls the in-app firehose.
if conn != nil && connector.OutOfAppKind(ev.GetKind()) && !hub.HasSubscribers(ev.GetUserId()) {
go deliverOutOfApp(ctx, backend, conn, ev.GetUserId(), ev.GetKind(), ev.GetPayload(), logger)
}
}
if !sleep(ctx, pushReconnectDelay) {
return
}
}
}
// deliverOutOfApp resolves the recipient's push target and, when they have a
// Telegram identity and have not confined notifications to the app, asks the
// connector to deliver the event. It is best-effort: every failure is logged and
// dropped (the in-app stream remains the primary channel).
func deliverOutOfApp(ctx context.Context, backend *backendclient.Client, conn *connector.Client, userID, kind string, payload []byte, logger *zap.Logger) {
target, err := backend.PushTarget(ctx, userID)
if err != nil {
logger.Warn("push target lookup failed", zap.String("user_id", userID), zap.Error(err))
return
}
if !connector.DeliverToTarget(target.ExternalID, target.NotificationsInAppOnly) {
return
}
if _, err := conn.Notify(ctx, target.ExternalID, kind, payload, target.Language); err != nil {
logger.Warn("out-of-app notify failed", zap.String("kind", kind), zap.Error(err))
}
}
// sleep waits for d or until ctx is cancelled, reporting whether it waited the
// full duration.
func sleep(ctx context.Context, d time.Duration) bool {
t := time.NewTimer(d)
defer t.Stop()
select {
case <-ctx.Done():
return false
case <-t.C:
return true
}
}
// newLogger builds a production JSON logger at the given level.
func newLogger(level string) (*zap.Logger, error) {
var lvl zap.AtomicLevel
if err := lvl.UnmarshalText([]byte(level)); err != nil {
return nil, err
}
cfg := zap.NewProductionConfig()
cfg.Level = lvl
return cfg.Build()
}