From 1c8e0ca48e499f67aa36d34b3f77ffd57c85fca0 Mon Sep 17 00:00:00 2001 From: IliaDenisov Date: Thu, 9 Apr 2026 15:27:14 +0200 Subject: [PATCH] tests: integration suite --- gateway/.env.example | 9 +- gateway/README.md | 5 +- gateway/cmd/gateway/main.go | 37 +- gateway/cmd/gateway/main_test.go | 68 +++ gateway/internal/config/config.go | 42 ++ gateway/internal/config/config_test.go | 105 +++++ .../restapi/auth_service_http_client.go | 224 ++++++++++ .../restapi/auth_service_http_client_test.go | 346 +++++++++++++++ go.work | 1 + integration/README.md | 72 ++++ .../gateway_authsession_test.go | 265 ++++++++++++ .../gatewayauthsession/harness_test.go | 406 ++++++++++++++++++ integration/go.mod | 31 ++ integration/go.sum | 57 +++ .../internal/contracts/gatewayv1/contract.go | 184 ++++++++ integration/internal/harness/binary.go | 71 +++ integration/internal/harness/keys.go | 54 +++ integration/internal/harness/mail_stub.go | 182 ++++++++ integration/internal/harness/process.go | 276 ++++++++++++ integration/internal/harness/user_stub.go | 323 ++++++++++++++ 20 files changed, 2748 insertions(+), 10 deletions(-) create mode 100644 gateway/internal/restapi/auth_service_http_client.go create mode 100644 gateway/internal/restapi/auth_service_http_client_test.go create mode 100644 integration/README.md create mode 100644 integration/gatewayauthsession/gateway_authsession_test.go create mode 100644 integration/gatewayauthsession/harness_test.go create mode 100644 integration/go.mod create mode 100644 integration/go.sum create mode 100644 integration/internal/contracts/gatewayv1/contract.go create mode 100644 integration/internal/harness/binary.go create mode 100644 integration/internal/harness/keys.go create mode 100644 integration/internal/harness/mail_stub.go create mode 100644 integration/internal/harness/process.go create mode 100644 integration/internal/harness/user_stub.go diff --git a/gateway/.env.example b/gateway/.env.example index 004718f..9d493ef 100644 --- a/gateway/.env.example +++ b/gateway/.env.example @@ -1,7 +1,7 @@ # Required startup settings. GATEWAY_SESSION_CACHE_REDIS_ADDR=127.0.0.1:6379 -GATEWAY_SESSION_EVENTS_REDIS_STREAM=gateway:session-events -GATEWAY_CLIENT_EVENTS_REDIS_STREAM=gateway:client-events +GATEWAY_SESSION_EVENTS_REDIS_STREAM=gateway:session_events +GATEWAY_CLIENT_EVENTS_REDIS_STREAM=gateway:client_events GATEWAY_RESPONSE_SIGNER_PRIVATE_KEY_PEM_PATH=./secrets/response-signer.pem # Main listeners. @@ -17,8 +17,9 @@ GATEWAY_AUTHENTICATED_GRPC_ADDR=127.0.0.1:9090 # GATEWAY_REPLAY_REDIS_KEY_PREFIX=gateway:replay: # GATEWAY_SESSION_CACHE_REDIS_TLS_ENABLED=false -# Optional public-auth integration. Without an injected adapter the routes stay -# mounted and return 503 service_unavailable. +# Optional public-auth integration. Without a configured Auth / Session Service +# base URL the routes stay mounted and return 503 service_unavailable. +# GATEWAY_AUTH_SERVICE_BASE_URL=http://127.0.0.1:8081 # GATEWAY_PUBLIC_AUTH_UPSTREAM_TIMEOUT=3s # Optional shutdown and telemetry tuning. diff --git a/gateway/README.md b/gateway/README.md index d1473a0..c9073b7 100644 --- a/gateway/README.md +++ b/gateway/README.md @@ -21,13 +21,14 @@ Required startup environment variables: Optional integrations: - `GATEWAY_ADMIN_HTTP_ADDR` enables the private `/metrics` listener; -- an injected `AuthServiceClient` enables real public auth handling; +- `GATEWAY_AUTH_SERVICE_BASE_URL` enables real public auth handling through + Auth / Session Service public HTTP; - injected downstream routes are required for successful `ExecuteCommand`. Operational caveats: - public auth routes stay mounted and return `503 service_unavailable` until an - auth adapter is wired; + auth service base URL is configured; - authenticated gRPC starts without downstream routes, but `ExecuteCommand` returns gRPC `UNIMPLEMENTED` until routing is configured. diff --git a/gateway/cmd/gateway/main.go b/gateway/cmd/gateway/main.go index 61ca2a0..a099172 100644 --- a/gateway/cmd/gateway/main.go +++ b/gateway/cmd/gateway/main.go @@ -25,6 +25,8 @@ import ( "go.uber.org/zap" ) +var errNoopClose = func() error { return nil } + // main loads the gateway configuration, runs the process lifecycle, and exits // with a non-zero status when startup or runtime fails. func main() { @@ -53,8 +55,18 @@ func run(ctx context.Context) (err error) { return fmt.Errorf("build gateway telemetry: %w", err) } + publicRESTDeps, closePublicRESTDeps, err := newPublicRESTDependencies(cfg, logger, telemetryRuntime) + if err != nil { + _ = telemetryRuntime.Shutdown(context.Background()) + _ = logging.Sync(logger) + return err + } + grpcDeps, components, cleanup, err := newAuthenticatedGRPCDependencies(ctx, cfg, logger, telemetryRuntime) if err != nil { + _ = closePublicRESTDeps() + _ = telemetryRuntime.Shutdown(context.Background()) + _ = logging.Sync(logger) return err } defer func() { @@ -63,16 +75,14 @@ func run(ctx context.Context) (err error) { err = errors.Join( err, + closePublicRESTDeps(), cleanup(), telemetryRuntime.Shutdown(shutdownCtx), logging.Sync(logger), ) }() - restServer := restapi.NewServer(cfg.PublicHTTP, restapi.ServerDependencies{ - Logger: logger, - Telemetry: telemetryRuntime, - }) + restServer := restapi.NewServer(cfg.PublicHTTP, publicRESTDeps) grpcServer := grpcapi.NewServer(cfg.AuthenticatedGRPC, grpcDeps) applicationComponents := []app.Component{ @@ -96,6 +106,25 @@ func run(ctx context.Context) (err error) { return err } +func newPublicRESTDependencies(cfg config.Config, logger *zap.Logger, telemetryRuntime *telemetry.Runtime) (restapi.ServerDependencies, func() error, error) { + deps := restapi.ServerDependencies{ + Logger: logger, + Telemetry: telemetryRuntime, + } + + if cfg.AuthService.BaseURL == "" { + return deps, errNoopClose, nil + } + + authService, err := restapi.NewHTTPAuthServiceClient(cfg.AuthService.BaseURL) + if err != nil { + return restapi.ServerDependencies{}, nil, fmt.Errorf("build public REST dependencies: auth service client: %w", err) + } + + deps.AuthService = authService + return deps, authService.Close, nil +} + func newAuthenticatedGRPCDependencies(ctx context.Context, cfg config.Config, logger *zap.Logger, telemetryRuntime *telemetry.Runtime) (grpcapi.ServerDependencies, []app.Component, func() error, error) { responseSigner, err := authn.LoadEd25519ResponseSignerFromPEMFile(cfg.ResponseSigner.PrivateKeyPEMPath) if err != nil { diff --git a/gateway/cmd/gateway/main_test.go b/gateway/cmd/gateway/main_test.go index 60ac7f6..e005d20 100644 --- a/gateway/cmd/gateway/main_test.go +++ b/gateway/cmd/gateway/main_test.go @@ -7,12 +7,14 @@ import ( "crypto/x509" "encoding/pem" "net" + "net/http/httptest" "os" "path/filepath" "testing" "time" "galaxy/gateway/internal/config" + "galaxy/gateway/internal/restapi" "github.com/alicebob/miniredis/v2" "github.com/stretchr/testify/assert" @@ -20,6 +22,72 @@ import ( "go.uber.org/zap" ) +func TestNewPublicRESTDependencies(t *testing.T) { + t.Parallel() + + authServer := httptest.NewServer(nil) + defer authServer.Close() + + tests := []struct { + name string + cfg config.Config + assert func(*testing.T, restapi.ServerDependencies) + wantErr string + }{ + { + name: "default unavailable auth service when base url is empty", + cfg: config.Config{}, + assert: func(t *testing.T, deps restapi.ServerDependencies) { + t.Helper() + assert.Nil(t, deps.AuthService) + }, + }, + { + name: "real auth service client when base url is configured", + cfg: config.Config{ + AuthService: config.AuthServiceConfig{ + BaseURL: authServer.URL, + }, + }, + assert: func(t *testing.T, deps restapi.ServerDependencies) { + t.Helper() + require.NotNil(t, deps.AuthService) + _, ok := deps.AuthService.(*restapi.HTTPAuthServiceClient) + assert.True(t, ok) + }, + }, + { + name: "invalid auth service base url fails fast", + cfg: config.Config{ + AuthService: config.AuthServiceConfig{ + BaseURL: "/relative", + }, + }, + wantErr: "auth service client", + }, + } + + for _, tt := range tests { + tt := tt + + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + deps, cleanup, err := newPublicRESTDependencies(tt.cfg, zap.NewNop(), nil) + if tt.wantErr != "" { + require.Error(t, err) + assert.ErrorContains(t, err, tt.wantErr) + return + } + + require.NoError(t, err) + require.NotNil(t, cleanup) + tt.assert(t, deps) + assert.NoError(t, cleanup()) + }) + } +} + func TestNewAuthenticatedGRPCDependencies(t *testing.T) { t.Parallel() diff --git a/gateway/internal/config/config.go b/gateway/internal/config/config.go index d35081d..77b34b6 100644 --- a/gateway/internal/config/config.go +++ b/gateway/internal/config/config.go @@ -4,6 +4,7 @@ package config import ( "fmt" + "net/url" "os" "strconv" "strings" @@ -39,6 +40,11 @@ const ( // configures the timeout budget used for public auth upstream calls. publicAuthUpstreamTimeoutEnvVar = "GATEWAY_PUBLIC_AUTH_UPSTREAM_TIMEOUT" + // authServiceBaseURLEnvVar names the environment variable that configures + // the optional Auth / Session Service public HTTP base URL used by gateway + // public-auth delegation. + authServiceBaseURLEnvVar = "GATEWAY_AUTH_SERVICE_BASE_URL" + // adminHTTPAddrEnvVar names the environment variable that configures the // private admin HTTP listener address. When it is empty, the admin listener // remains disabled. @@ -464,6 +470,15 @@ type PublicHTTPConfig struct { AntiAbuse PublicHTTPAntiAbuseConfig } +// AuthServiceConfig describes the optional public-auth upstream used by the +// gateway runtime. +type AuthServiceConfig struct { + // BaseURL is the absolute base URL of the Auth / Session Service public + // HTTP API. When BaseURL is empty, the gateway keeps using its built-in + // unavailable public-auth adapter. + BaseURL string +} + // AdminHTTPConfig describes the private operational HTTP listener used for // metrics exposure. The listener remains disabled when Addr is empty. type AdminHTTPConfig struct { @@ -591,6 +606,10 @@ type Config struct { // PublicHTTP configures the public unauthenticated REST listener. PublicHTTP PublicHTTPConfig + // AuthService configures the optional public-auth delegation to the Auth / + // Session Service. + AuthService AuthServiceConfig + // AdminHTTP configures the optional private admin listener used for metrics // exposure. AdminHTTP AdminHTTPConfig @@ -766,6 +785,12 @@ func DefaultResponseSignerConfig() ResponseSignerConfig { return ResponseSignerConfig{} } +// DefaultAuthServiceConfig returns the default public-auth upstream settings. +// The zero value keeps the built-in unavailable adapter active. +func DefaultAuthServiceConfig() AuthServiceConfig { + return AuthServiceConfig{} +} + // LoadFromEnv loads Config from the process environment, applies defaults for // omitted settings, and validates the resulting values. func LoadFromEnv() (Config, error) { @@ -773,6 +798,7 @@ func LoadFromEnv() (Config, error) { ShutdownTimeout: defaultShutdownTimeout, Logging: DefaultLoggingConfig(), PublicHTTP: DefaultPublicHTTPConfig(), + AuthService: DefaultAuthServiceConfig(), AdminHTTP: DefaultAdminHTTPConfig(), AuthenticatedGRPC: DefaultAuthenticatedGRPCConfig(), SessionCacheRedis: DefaultSessionCacheRedisConfig(), @@ -825,6 +851,11 @@ func LoadFromEnv() (Config, error) { } cfg.PublicHTTP.AuthUpstreamTimeout = publicAuthUpstreamTimeout + rawAuthServiceBaseURL, ok := os.LookupEnv(authServiceBaseURLEnvVar) + if ok { + cfg.AuthService.BaseURL = rawAuthServiceBaseURL + } + rawAdminHTTPAddr, ok := os.LookupEnv(adminHTTPAddrEnvVar) if ok { cfg.AdminHTTP.Addr = rawAdminHTTPAddr @@ -1082,6 +1113,17 @@ func LoadFromEnv() (Config, error) { if cfg.PublicHTTP.AuthUpstreamTimeout <= 0 { return Config{}, fmt.Errorf("load gateway config: %s must be positive", publicAuthUpstreamTimeoutEnvVar) } + cfg.AuthService.BaseURL = strings.TrimSpace(cfg.AuthService.BaseURL) + if cfg.AuthService.BaseURL != "" { + parsedAuthServiceBaseURL, err := url.Parse(cfg.AuthService.BaseURL) + if err != nil { + return Config{}, fmt.Errorf("load gateway config: parse %s: %w", authServiceBaseURLEnvVar, err) + } + if parsedAuthServiceBaseURL.Scheme == "" || parsedAuthServiceBaseURL.Host == "" { + return Config{}, fmt.Errorf("load gateway config: %s must be an absolute URL", authServiceBaseURLEnvVar) + } + cfg.AuthService.BaseURL = strings.TrimRight(parsedAuthServiceBaseURL.String(), "/") + } if addr := strings.TrimSpace(cfg.AdminHTTP.Addr); addr != "" { cfg.AdminHTTP.Addr = addr } diff --git a/gateway/internal/config/config_test.go b/gateway/internal/config/config_test.go index caf74a1..e97ba6b 100644 --- a/gateway/internal/config/config_test.go +++ b/gateway/internal/config/config_test.go @@ -24,6 +24,9 @@ func TestLoadFromEnv(t *testing.T) { customPublicHTTPAddr := new(string) *customPublicHTTPAddr = "127.0.0.1:9090" + customAuthServiceBaseURL := new(string) + *customAuthServiceBaseURL = " http://127.0.0.1:8082/ " + customAuthenticatedGRPCAddr := new(string) *customAuthenticatedGRPCAddr = "127.0.0.1:9191" @@ -76,6 +79,7 @@ func TestLoadFromEnv(t *testing.T) { name string shutdownTimeout *string publicHTTPAddr *string + authServiceBaseURL *string authenticatedGRPCAddr *string authenticatedGRPCFreshnessWindow *string sessionCacheRedisAddr *string @@ -179,6 +183,40 @@ func TestLoadFromEnv(t *testing.T) { }, }, }, + { + name: "custom auth service base url", + authServiceBaseURL: customAuthServiceBaseURL, + sessionCacheRedisAddr: customSessionCacheRedisAddr, + responseSignerPrivateKeyPEMPath: customResponseSignerPrivateKeyPEMPath, + want: Config{ + ShutdownTimeout: 5 * time.Second, + Logging: DefaultLoggingConfig(), + PublicHTTP: DefaultPublicHTTPConfig(), + AuthService: AuthServiceConfig{ + BaseURL: "http://127.0.0.1:8082", + }, + AdminHTTP: DefaultAdminHTTPConfig(), + AuthenticatedGRPC: DefaultAuthenticatedGRPCConfig(), + SessionCacheRedis: SessionCacheRedisConfig{ + Addr: "127.0.0.1:6379", + DB: defaultSessionCacheRedisDB, + KeyPrefix: defaultSessionCacheRedisKeyPrefix, + LookupTimeout: defaultSessionCacheRedisLookupTimeout, + }, + ReplayRedis: DefaultReplayRedisConfig(), + SessionEventsRedis: SessionEventsRedisConfig{ + Stream: "gateway:session_events", + ReadBlockTimeout: defaultSessionEventsRedisReadBlockTimeout, + }, + ClientEventsRedis: ClientEventsRedisConfig{ + Stream: "gateway:client_events", + ReadBlockTimeout: defaultClientEventsRedisReadBlockTimeout, + }, + ResponseSigner: ResponseSignerConfig{ + PrivateKeyPEMPath: *customResponseSignerPrivateKeyPEMPath, + }, + }, + }, { name: "custom authenticated grpc address", authenticatedGRPCAddr: customAuthenticatedGRPCAddr, @@ -329,6 +367,7 @@ func TestLoadFromEnv(t *testing.T) { restoreEnvs(t, shutdownTimeoutEnvVar, publicHTTPAddrEnvVar, + authServiceBaseURLEnvVar, authenticatedGRPCAddrEnvVar, authenticatedGRPCFreshnessWindowEnvVar, sessionCacheRedisAddrEnvVar, @@ -339,6 +378,7 @@ func TestLoadFromEnv(t *testing.T) { setEnvValue(t, shutdownTimeoutEnvVar, tt.shutdownTimeout) setEnvValue(t, publicHTTPAddrEnvVar, tt.publicHTTPAddr) + setEnvValue(t, authServiceBaseURLEnvVar, tt.authServiceBaseURL) setEnvValue(t, authenticatedGRPCAddrEnvVar, tt.authenticatedGRPCAddr) setEnvValue(t, authenticatedGRPCFreshnessWindowEnvVar, tt.authenticatedGRPCFreshnessWindow) setEnvValue(t, sessionCacheRedisAddrEnvVar, tt.sessionCacheRedisAddr) @@ -477,6 +517,70 @@ func TestLoadFromEnvOperationalSettings(t *testing.T) { } } +func TestLoadFromEnvAuthService(t *testing.T) { + t.Parallel() + + customSessionCacheRedisAddr := new(string) + *customSessionCacheRedisAddr = "127.0.0.1:6379" + + customSessionEventsRedisStream := new(string) + *customSessionEventsRedisStream = "gateway:session_events" + + customClientEventsRedisStream := new(string) + *customClientEventsRedisStream = "gateway:client_events" + + customResponseSignerPrivateKeyPEMPath := new(string) + *customResponseSignerPrivateKeyPEMPath = writeTestResponseSignerPEMFile(t) + + invalidRelativeURL := new(string) + *invalidRelativeURL = "/authsession" + + invalidURL := new(string) + *invalidURL = "://bad" + + tests := []struct { + name string + value *string + wantErr string + }{ + { + name: "relative url rejected", + value: invalidRelativeURL, + wantErr: authServiceBaseURLEnvVar + " must be an absolute URL", + }, + { + name: "malformed url rejected", + value: invalidURL, + wantErr: "parse " + authServiceBaseURLEnvVar, + }, + } + + for _, tt := range tests { + tt := tt + + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + restoreEnvs(t, + authServiceBaseURLEnvVar, + sessionCacheRedisAddrEnvVar, + sessionEventsRedisStreamEnvVar, + clientEventsRedisStreamEnvVar, + responseSignerPrivateKeyPEMPathEnvVar, + ) + setEnvValue(t, authServiceBaseURLEnvVar, tt.value) + setEnvValue(t, sessionCacheRedisAddrEnvVar, customSessionCacheRedisAddr) + setEnvValue(t, sessionEventsRedisStreamEnvVar, customSessionEventsRedisStream) + setEnvValue(t, clientEventsRedisStreamEnvVar, customClientEventsRedisStream) + setEnvValue(t, responseSignerPrivateKeyPEMPathEnvVar, customResponseSignerPrivateKeyPEMPath) + + _, err := LoadFromEnv() + require.Error(t, err) + require.ErrorContains(t, err, tt.wantErr) + }) + } +} + func TestLoadFromEnvAuthenticatedGRPCAntiAbuse(t *testing.T) { customSessionCacheRedisAddr := new(string) *customSessionCacheRedisAddr = "127.0.0.1:6379" @@ -1212,6 +1316,7 @@ func operationalEnvVars() []string { publicHTTPReadTimeoutEnvVar, publicHTTPIdleTimeoutEnvVar, publicAuthUpstreamTimeoutEnvVar, + authServiceBaseURLEnvVar, adminHTTPAddrEnvVar, adminHTTPReadHeaderTimeoutEnvVar, adminHTTPReadTimeoutEnvVar, diff --git a/gateway/internal/restapi/auth_service_http_client.go b/gateway/internal/restapi/auth_service_http_client.go new file mode 100644 index 0000000..65eec90 --- /dev/null +++ b/gateway/internal/restapi/auth_service_http_client.go @@ -0,0 +1,224 @@ +package restapi + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "net/url" + "strings" +) + +const ( + authServiceSendEmailCodePath = "/api/v1/public/auth/send-email-code" + authServiceConfirmEmailCodePath = "/api/v1/public/auth/confirm-email-code" +) + +// HTTPAuthServiceClient implements AuthServiceClient over the Auth / Session +// Service public HTTP API using strict JSON request and response decoding. +type HTTPAuthServiceClient struct { + baseURL string + httpClient *http.Client +} + +type authServiceErrorEnvelope struct { + Error *authServiceErrorBody `json:"error"` +} + +type authServiceErrorBody struct { + Code string `json:"code"` + Message string `json:"message"` +} + +// NewHTTPAuthServiceClient constructs an AuthServiceClient that delegates the +// gateway public-auth routes to the Auth / Session Service public HTTP API at +// baseURL. The resulting client relies only on the caller-provided context for +// cancellation and timeout control. +func NewHTTPAuthServiceClient(baseURL string) (*HTTPAuthServiceClient, error) { + transport, ok := http.DefaultTransport.(*http.Transport) + if !ok { + return nil, errors.New("new auth service HTTP client: default transport is not *http.Transport") + } + + return newHTTPAuthServiceClient(baseURL, &http.Client{ + Transport: transport.Clone(), + }) +} + +func newHTTPAuthServiceClient(baseURL string, httpClient *http.Client) (*HTTPAuthServiceClient, error) { + if httpClient == nil { + return nil, errors.New("new auth service HTTP client: http client must not be nil") + } + + trimmedBaseURL := strings.TrimSpace(baseURL) + if trimmedBaseURL == "" { + return nil, errors.New("new auth service HTTP client: base URL must not be empty") + } + + parsedBaseURL, err := url.Parse(strings.TrimRight(trimmedBaseURL, "/")) + if err != nil { + return nil, fmt.Errorf("new auth service HTTP client: parse base URL: %w", err) + } + if parsedBaseURL.Scheme == "" || parsedBaseURL.Host == "" { + return nil, errors.New("new auth service HTTP client: base URL must be absolute") + } + + return &HTTPAuthServiceClient{ + baseURL: parsedBaseURL.String(), + httpClient: httpClient, + }, nil +} + +// Close releases idle HTTP connections owned by the client transport. +func (c *HTTPAuthServiceClient) Close() error { + if c == nil || c.httpClient == nil { + return nil + } + + type idleCloser interface { + CloseIdleConnections() + } + + if transport, ok := c.httpClient.Transport.(idleCloser); ok { + transport.CloseIdleConnections() + } + + return nil +} + +// SendEmailCode delegates the public send-email-code route to the configured +// Auth / Session Service public HTTP API. +func (c *HTTPAuthServiceClient) SendEmailCode(ctx context.Context, input SendEmailCodeInput) (SendEmailCodeResult, error) { + payload, statusCode, err := c.doJSONRequest(ctx, authServiceSendEmailCodePath, input) + if err != nil { + return SendEmailCodeResult{}, fmt.Errorf("send email code via auth service: %w", err) + } + + switch { + case statusCode == http.StatusOK: + var result SendEmailCodeResult + if err := decodeStrictJSONPayload(payload, &result); err != nil { + return SendEmailCodeResult{}, fmt.Errorf("send email code via auth service: decode success response: %w", err) + } + if err := validateSendEmailCodeResult(&result); err != nil { + return SendEmailCodeResult{}, fmt.Errorf("send email code via auth service: %w", err) + } + + return result, nil + case statusCode >= 400 && statusCode <= 599: + authErr, err := decodeAuthServiceError(statusCode, payload) + if err != nil { + return SendEmailCodeResult{}, fmt.Errorf("send email code via auth service: %w", err) + } + + return SendEmailCodeResult{}, authErr + default: + return SendEmailCodeResult{}, fmt.Errorf("send email code via auth service: unexpected HTTP status %d", statusCode) + } +} + +// ConfirmEmailCode delegates the public confirm-email-code route to the +// configured Auth / Session Service public HTTP API. +func (c *HTTPAuthServiceClient) ConfirmEmailCode(ctx context.Context, input ConfirmEmailCodeInput) (ConfirmEmailCodeResult, error) { + payload, statusCode, err := c.doJSONRequest(ctx, authServiceConfirmEmailCodePath, input) + if err != nil { + return ConfirmEmailCodeResult{}, fmt.Errorf("confirm email code via auth service: %w", err) + } + + switch { + case statusCode == http.StatusOK: + var result ConfirmEmailCodeResult + if err := decodeStrictJSONPayload(payload, &result); err != nil { + return ConfirmEmailCodeResult{}, fmt.Errorf("confirm email code via auth service: decode success response: %w", err) + } + if err := validateConfirmEmailCodeResult(&result); err != nil { + return ConfirmEmailCodeResult{}, fmt.Errorf("confirm email code via auth service: %w", err) + } + + return result, nil + case statusCode >= 400 && statusCode <= 599: + authErr, err := decodeAuthServiceError(statusCode, payload) + if err != nil { + return ConfirmEmailCodeResult{}, fmt.Errorf("confirm email code via auth service: %w", err) + } + + return ConfirmEmailCodeResult{}, authErr + default: + return ConfirmEmailCodeResult{}, fmt.Errorf("confirm email code via auth service: unexpected HTTP status %d", statusCode) + } +} + +func (c *HTTPAuthServiceClient) doJSONRequest(ctx context.Context, path string, requestBody any) ([]byte, int, error) { + if c == nil || c.httpClient == nil { + return nil, 0, errors.New("nil client") + } + if ctx == nil { + return nil, 0, errors.New("nil context") + } + if err := ctx.Err(); err != nil { + return nil, 0, err + } + + payload, err := json.Marshal(requestBody) + if err != nil { + return nil, 0, fmt.Errorf("marshal request body: %w", err) + } + + request, err := http.NewRequestWithContext(ctx, http.MethodPost, c.baseURL+path, bytes.NewReader(payload)) + if err != nil { + return nil, 0, fmt.Errorf("build request: %w", err) + } + request.Header.Set("Content-Type", "application/json") + + response, err := c.httpClient.Do(request) + if err != nil { + return nil, 0, err + } + defer response.Body.Close() + + responsePayload, err := io.ReadAll(response.Body) + if err != nil { + return nil, 0, fmt.Errorf("read response body: %w", err) + } + + return responsePayload, response.StatusCode, nil +} + +func decodeAuthServiceError(statusCode int, payload []byte) (*AuthServiceError, error) { + var envelope authServiceErrorEnvelope + if err := decodeStrictJSONPayload(payload, &envelope); err != nil { + return nil, fmt.Errorf("decode error response: %w", err) + } + if envelope.Error == nil { + return nil, errors.New("decode error response: missing error object") + } + + return &AuthServiceError{ + StatusCode: statusCode, + Code: envelope.Error.Code, + Message: envelope.Error.Message, + }, nil +} + +func decodeStrictJSONPayload(payload []byte, target any) error { + decoder := json.NewDecoder(bytes.NewReader(payload)) + decoder.DisallowUnknownFields() + + if err := decoder.Decode(target); err != nil { + return err + } + if err := decoder.Decode(&struct{}{}); err != io.EOF { + if err == nil { + return errors.New("unexpected trailing JSON input") + } + + return err + } + + return nil +} + +var _ AuthServiceClient = (*HTTPAuthServiceClient)(nil) diff --git a/gateway/internal/restapi/auth_service_http_client_test.go b/gateway/internal/restapi/auth_service_http_client_test.go new file mode 100644 index 0000000..d516807 --- /dev/null +++ b/gateway/internal/restapi/auth_service_http_client_test.go @@ -0,0 +1,346 @@ +package restapi + +import ( + "context" + "errors" + "io" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestNewHTTPAuthServiceClient(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + baseURL string + wantErr string + }{ + { + name: "success", + baseURL: " http://127.0.0.1:8080/ ", + }, + { + name: "empty base url", + wantErr: "base URL must not be empty", + }, + { + name: "relative base url", + baseURL: "/authsession", + wantErr: "base URL must be absolute", + }, + { + name: "malformed base url", + baseURL: "://bad", + wantErr: "parse base URL", + }, + } + + for _, tt := range tests { + tt := tt + + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + client, err := NewHTTPAuthServiceClient(tt.baseURL) + if tt.wantErr != "" { + require.Error(t, err) + assert.ErrorContains(t, err, tt.wantErr) + return + } + + require.NoError(t, err) + assert.Equal(t, "http://127.0.0.1:8080", client.baseURL) + assert.NoError(t, client.Close()) + }) + } +} + +func TestHTTPAuthServiceClientSendEmailCodeSuccess(t *testing.T) { + t.Parallel() + + var requestContentType string + var requestBody string + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + assert.Equal(t, http.MethodPost, r.Method) + assert.Equal(t, authServiceSendEmailCodePath, r.URL.Path) + + requestContentType = r.Header.Get("Content-Type") + payload, err := io.ReadAll(r.Body) + require.NoError(t, err) + requestBody = string(payload) + + w.Header().Set("Content-Type", "application/json") + _, err = io.WriteString(w, `{"challenge_id":"challenge-123"}`) + require.NoError(t, err) + })) + defer server.Close() + + client := newTestHTTPAuthServiceClient(t, server) + + result, err := client.SendEmailCode(context.Background(), SendEmailCodeInput{ + Email: "pilot@example.com", + }) + require.NoError(t, err) + assert.Equal(t, SendEmailCodeResult{ChallengeID: "challenge-123"}, result) + assert.Equal(t, "application/json", requestContentType) + assert.JSONEq(t, `{"email":"pilot@example.com"}`, requestBody) +} + +func TestHTTPAuthServiceClientConfirmEmailCodeSuccess(t *testing.T) { + t.Parallel() + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + assert.Equal(t, http.MethodPost, r.Method) + assert.Equal(t, authServiceConfirmEmailCodePath, r.URL.Path) + + payload, err := io.ReadAll(r.Body) + require.NoError(t, err) + assert.JSONEq(t, `{"challenge_id":"challenge-123","code":"123456","client_public_key":"public-key","time_zone":"Europe/Kaliningrad"}`, string(payload)) + + w.Header().Set("Content-Type", "application/json") + _, err = io.WriteString(w, `{"device_session_id":"device-session-123"}`) + require.NoError(t, err) + })) + defer server.Close() + + client := newTestHTTPAuthServiceClient(t, server) + + result, err := client.ConfirmEmailCode(context.Background(), ConfirmEmailCodeInput{ + ChallengeID: "challenge-123", + Code: "123456", + ClientPublicKey: "public-key", + TimeZone: "Europe/Kaliningrad", + }) + require.NoError(t, err) + assert.Equal(t, ConfirmEmailCodeResult{DeviceSessionID: "device-session-123"}, result) +} + +func TestHTTPAuthServiceClientProjectsAuthServiceErrors(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + statusCode int + responseBody string + call func(*HTTPAuthServiceClient) error + wantStatusCode int + wantCode string + wantMessage string + }{ + { + name: "send email code error", + statusCode: http.StatusServiceUnavailable, + responseBody: `{"error":{"code":"service_unavailable","message":"service is unavailable"}}`, + call: func(client *HTTPAuthServiceClient) error { + _, err := client.SendEmailCode(context.Background(), SendEmailCodeInput{Email: "pilot@example.com"}) + return err + }, + wantStatusCode: http.StatusServiceUnavailable, + wantCode: "service_unavailable", + wantMessage: "service is unavailable", + }, + { + name: "confirm email code error", + statusCode: http.StatusConflict, + responseBody: `{"error":{"code":"session_limit_exceeded","message":"active session limit would be exceeded"}}`, + call: func(client *HTTPAuthServiceClient) error { + _, err := client.ConfirmEmailCode(context.Background(), ConfirmEmailCodeInput{ + ChallengeID: "challenge-123", + Code: "123456", + ClientPublicKey: "public-key", + TimeZone: "Europe/Kaliningrad", + }) + return err + }, + wantStatusCode: http.StatusConflict, + wantCode: "session_limit_exceeded", + wantMessage: "active session limit would be exceeded", + }, + } + + for _, tt := range tests { + tt := tt + + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(tt.statusCode) + _, err := io.WriteString(w, tt.responseBody) + require.NoError(t, err) + })) + defer server.Close() + + client := newTestHTTPAuthServiceClient(t, server) + err := tt.call(client) + require.Error(t, err) + + var authErr *AuthServiceError + require.ErrorAs(t, err, &authErr) + assert.Equal(t, tt.wantStatusCode, authErr.StatusCode) + assert.Equal(t, tt.wantCode, authErr.Code) + assert.Equal(t, tt.wantMessage, authErr.Message) + }) + } +} + +func TestHTTPAuthServiceClientRejectsMalformedPayloads(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + path string + statusCode int + responseBody string + wantErr string + }{ + { + name: "send email code rejects unknown success field", + path: authServiceSendEmailCodePath, + statusCode: http.StatusOK, + responseBody: `{"challenge_id":"challenge-123","extra":true}`, + wantErr: "decode success response", + }, + { + name: "confirm email code rejects empty success field", + path: authServiceConfirmEmailCodePath, + statusCode: http.StatusOK, + responseBody: `{"device_session_id":" "}`, + wantErr: "empty device_session_id", + }, + { + name: "rejects missing error object", + path: authServiceSendEmailCodePath, + statusCode: http.StatusBadRequest, + responseBody: `{}`, + wantErr: "missing error object", + }, + { + name: "rejects malformed error envelope", + path: authServiceConfirmEmailCodePath, + statusCode: http.StatusBadRequest, + responseBody: `{"error":{"code":"invalid_code","message":"confirmation code is invalid","extra":true}}`, + wantErr: "decode error response", + }, + { + name: "rejects unexpected status", + path: authServiceSendEmailCodePath, + statusCode: http.StatusCreated, + responseBody: `{"challenge_id":"challenge-123"}`, + wantErr: "unexpected HTTP status 201", + }, + } + + for _, tt := range tests { + tt := tt + + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + assert.Equal(t, tt.path, r.URL.Path) + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(tt.statusCode) + _, err := io.WriteString(w, tt.responseBody) + require.NoError(t, err) + })) + defer server.Close() + + client := newTestHTTPAuthServiceClient(t, server) + + var err error + switch tt.path { + case authServiceSendEmailCodePath: + _, err = client.SendEmailCode(context.Background(), SendEmailCodeInput{Email: "pilot@example.com"}) + default: + _, err = client.ConfirmEmailCode(context.Background(), ConfirmEmailCodeInput{ + ChallengeID: "challenge-123", + Code: "123456", + ClientPublicKey: "public-key", + TimeZone: "Europe/Kaliningrad", + }) + } + + require.Error(t, err) + assert.ErrorContains(t, err, tt.wantErr) + assert.NotErrorAs(t, err, new(*AuthServiceError)) + }) + } +} + +func TestHTTPAuthServiceClientUsesCallerContext(t *testing.T) { + t.Parallel() + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + time.Sleep(100 * time.Millisecond) + w.Header().Set("Content-Type", "application/json") + _, _ = io.WriteString(w, `{"challenge_id":"challenge-123"}`) + })) + defer server.Close() + + client := newTestHTTPAuthServiceClient(t, server) + + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond) + defer cancel() + + _, err := client.SendEmailCode(ctx, SendEmailCodeInput{Email: "pilot@example.com"}) + require.Error(t, err) + assert.ErrorContains(t, err, "send email code via auth service") + assert.True(t, errors.Is(err, context.DeadlineExceeded)) +} + +func TestHTTPAuthServiceClientRejectsNilContext(t *testing.T) { + t.Parallel() + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + require.FailNow(t, "unexpected request", r.URL.Path) + })) + defer server.Close() + + client := newTestHTTPAuthServiceClient(t, server) + + _, err := client.SendEmailCode(nil, SendEmailCodeInput{Email: "pilot@example.com"}) + require.Error(t, err) + assert.ErrorContains(t, err, "nil context") +} + +func newTestHTTPAuthServiceClient(t *testing.T, server *httptest.Server) *HTTPAuthServiceClient { + t.Helper() + + client, err := newHTTPAuthServiceClient(server.URL, server.Client()) + require.NoError(t, err) + t.Cleanup(func() { + assert.NoError(t, client.Close()) + }) + + return client +} + +func TestDecodeStrictJSONPayloadRejectsTrailingJSON(t *testing.T) { + t.Parallel() + + var target struct { + Value string `json:"value"` + } + err := decodeStrictJSONPayload([]byte(`{"value":"ok"}{}`), &target) + require.Error(t, err) + assert.Equal(t, "unexpected trailing JSON input", err.Error()) +} + +func TestDecodeAuthServiceErrorPreservesBlankFieldsForLaterNormalization(t *testing.T) { + t.Parallel() + + authErr, err := decodeAuthServiceError(http.StatusBadGateway, []byte(`{"error":{"code":" ","message":" "}}`)) + require.NoError(t, err) + assert.Equal(t, http.StatusBadGateway, authErr.StatusCode) + assert.True(t, strings.TrimSpace(authErr.Code) == "") + assert.True(t, strings.TrimSpace(authErr.Message) == "") +} diff --git a/go.work b/go.work index 708a98b..bc4bffb 100644 --- a/go.work +++ b/go.work @@ -5,6 +5,7 @@ use ( ./client ./game ./gateway + ./integration ./pkg/calc ./pkg/connector ./pkg/error diff --git a/integration/README.md b/integration/README.md new file mode 100644 index 0000000..55b4df0 --- /dev/null +++ b/integration/README.md @@ -0,0 +1,72 @@ +# Integration Tests + +`integration` owns only true inter-service black-box tests. +Each suite must raise real service processes, speak only over public HTTP/gRPC/Redis contracts, and avoid imports from `internal/...` packages of tested services. + +## Layout + +```text +integration/ +├── README.md +├── go.mod +├── gatewayauthsession/ +│ ├── harness_test.go +│ └── gateway_authsession_test.go +└── internal/ + ├── contracts/ + │ └── gatewayv1/ + │ └── contract.go + └── harness/ + ├── binary.go + ├── keys.go + ├── mail_stub.go + ├── process.go + └── user_stub.go +``` + +## Rules + +- Keep suites black-box. Do not import `galaxy/gateway/internal/...`, `galaxy/authsession/internal/...`, or any other service-owned internal package. +- Start real binaries from `cmd/...` and talk to them only through their published HTTP, gRPC, and Redis contracts. +- Put boundary-specific orchestration and assertions into the owning suite package, not into shared helpers. +- Put only generic process/runtime utilities into `internal/harness`. +- Put only public-contract helpers into `internal/contracts/...`. + +## Current Boundary Suites + +- `gatewayauthsession` verifies the integration boundary between real `Edge Gateway` and real `Auth / Session Service`. + +The current fast suite uses one isolated `miniredis` instance plus external stateful HTTP stubs for mail and user services. + +## Running + +Run from the module directory: + +```bash +cd integration +go test ./gatewayauthsession/... +``` + +Useful regression commands after boundary changes: + +```bash +go test ./gatewayauthsession/... +cd ../gateway && go test ./... +cd ../authsession && go test ./... -run GatewayCompatibility +``` + +Do not use `go test ./...` from the repository root. The repository is organized through `go.work`, so verification should stay module-scoped. + +## Adding A New Boundary Suite + +1. Create `integration//` for the new inter-service boundary. +2. Keep suite-local fixtures, scenario helpers, and assertion helpers inside that package. +3. Reuse `internal/harness` only for generic concerns such as binary build/run, ports, keys, Redis, and shared external stubs. +4. Add new helpers to `internal/contracts//` only when they describe a reusable public wire contract. +5. Prefer fast deterministic infrastructure by default: in-memory test doubles, `httptest` stubs, and `miniredis`. + +## Future Real Redis Smoke Suites + +Fast suites stay on `miniredis` by default. +When a boundary needs one real Redis smoke suite later, keep it in the same boundary package and gate it explicitly with environment-driven configuration such as `INTEGRATION_REAL_REDIS_ADDR`. +That smoke suite should complement, not replace, the deterministic `miniredis` coverage. diff --git a/integration/gatewayauthsession/gateway_authsession_test.go b/integration/gatewayauthsession/gateway_authsession_test.go new file mode 100644 index 0000000..74fa372 --- /dev/null +++ b/integration/gatewayauthsession/gateway_authsession_test.go @@ -0,0 +1,265 @@ +package gatewayauthsession_test + +import ( + "context" + "crypto/ed25519" + "encoding/base64" + "net/http" + "testing" + "time" + + "galaxy/integration/internal/harness" + + gatewayv1 "galaxy/gateway/proto/galaxy/gateway/v1" + + "github.com/stretchr/testify/require" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +func TestGatewayAuthSessionSendEmailCodeReachesAuthsessionMailDelivery(t *testing.T) { + h := newGatewayAuthSessionHarness(t, gatewayAuthSessionOptions{}) + + response := postJSONValue(t, h.gatewayPublicURL+"/api/v1/public/auth/send-email-code", map[string]string{ + "email": testEmail, + }) + require.Equal(t, http.StatusOK, response.StatusCode) + + var body struct { + ChallengeID string `json:"challenge_id"` + } + require.NoError(t, decodeStrictJSONPayload([]byte(response.Body), &body)) + require.NotEmpty(t, body.ChallengeID) + + deliveries := h.mailStub.RecordedDeliveries() + require.Len(t, deliveries, 1) + require.Equal(t, testEmail, deliveries[0].Email) + require.Len(t, deliveries[0].Code, 6) +} + +func TestGatewayAuthSessionConfirmCreatesProjectionAndAllowsSubscribeEvents(t *testing.T) { + h := newGatewayAuthSessionHarness(t, gatewayAuthSessionOptions{}) + + clientPrivateKey := newClientPrivateKey("confirm-projection") + challengeID, code := h.sendChallenge(t, testEmail) + + response := h.confirmCode(t, challengeID, code, clientPrivateKey) + require.Equal(t, http.StatusOK, response.StatusCode) + + var confirmBody struct { + DeviceSessionID string `json:"device_session_id"` + } + require.NoError(t, decodeStrictJSONPayload([]byte(response.Body), &confirmBody)) + require.NotEmpty(t, confirmBody.DeviceSessionID) + + record := h.readGatewaySessionRecord(t, confirmBody.DeviceSessionID) + require.Equal(t, gatewaySessionRecord{ + DeviceSessionID: confirmBody.DeviceSessionID, + UserID: "user-1", + ClientPublicKey: base64.StdEncoding.EncodeToString(clientPrivateKey.Public().(ed25519.PublicKey)), + Status: "active", + }, record) + + ensureCalls := h.userStub.EnsureCalls() + require.Len(t, ensureCalls, 1) + require.Equal(t, testEmail, ensureCalls[0].Email) + require.Equal(t, "en", ensureCalls[0].PreferredLanguage) + require.Equal(t, testTimeZone, ensureCalls[0].TimeZone) + + conn := h.dialGateway(t) + client := gatewayv1.NewEdgeGatewayClient(conn) + + stream, err := client.SubscribeEvents(context.Background(), newSubscribeEventsRequest(confirmBody.DeviceSessionID, "request-bootstrap", clientPrivateKey)) + require.NoError(t, err) + + event, err := stream.Recv() + require.NoError(t, err) + assertBootstrapEvent(t, event, h.responseSignerPublicKey, "request-bootstrap") +} + +func TestGatewayAuthSessionRepeatedConfirmReturnsSameSessionID(t *testing.T) { + h := newGatewayAuthSessionHarness(t, gatewayAuthSessionOptions{}) + + clientPrivateKey := newClientPrivateKey("repeated-confirm") + challengeID, code := h.sendChallenge(t, testEmail) + + first := h.confirmCode(t, challengeID, code, clientPrivateKey) + second := h.confirmCode(t, challengeID, code, clientPrivateKey) + require.Equal(t, http.StatusOK, first.StatusCode) + require.Equal(t, http.StatusOK, second.StatusCode) + + var firstBody struct { + DeviceSessionID string `json:"device_session_id"` + } + var secondBody struct { + DeviceSessionID string `json:"device_session_id"` + } + require.NoError(t, decodeStrictJSONPayload([]byte(first.Body), &firstBody)) + require.NoError(t, decodeStrictJSONPayload([]byte(second.Body), &secondBody)) + require.Equal(t, firstBody.DeviceSessionID, secondBody.DeviceSessionID) +} + +func TestGatewayAuthSessionInvalidClientPublicKeyPassesThroughUnchanged(t *testing.T) { + h := newGatewayAuthSessionHarness(t, gatewayAuthSessionOptions{}) + + challengeID, _ := h.sendChallenge(t, testEmail) + + response := postJSONValue(t, h.gatewayPublicURL+"/api/v1/public/auth/confirm-email-code", map[string]string{ + "challenge_id": challengeID, + "code": "123456", + "client_public_key": "invalid", + "time_zone": testTimeZone, + }) + + require.Equal(t, http.StatusBadRequest, response.StatusCode) + require.JSONEq(t, `{"error":{"code":"invalid_client_public_key","message":"client_public_key is not a valid base64-encoded raw 32-byte Ed25519 public key"}}`, response.Body) +} + +func TestGatewayAuthSessionChallengeNotFoundPassesThroughUnchanged(t *testing.T) { + h := newGatewayAuthSessionHarness(t, gatewayAuthSessionOptions{}) + + response := h.confirmCode(t, "missing-challenge", "123456", newClientPrivateKey("missing-challenge")) + + require.Equal(t, http.StatusNotFound, response.StatusCode) + require.JSONEq(t, `{"error":{"code":"challenge_not_found","message":"challenge not found"}}`, response.Body) +} + +func TestGatewayAuthSessionInvalidCodePassesThroughUnchanged(t *testing.T) { + h := newGatewayAuthSessionHarness(t, gatewayAuthSessionOptions{}) + + clientPrivateKey := newClientPrivateKey("invalid-code") + challengeID, code := h.sendChallenge(t, testEmail) + invalidCode := "000000" + if code == invalidCode { + invalidCode = "111111" + } + + response := h.confirmCode(t, challengeID, invalidCode, clientPrivateKey) + + require.Equal(t, http.StatusBadRequest, response.StatusCode) + require.JSONEq(t, `{"error":{"code":"invalid_code","message":"confirmation code is invalid"}}`, response.Body) +} + +func TestGatewayAuthSessionBlockedSendRemainsSuccessShapedWithoutDelivery(t *testing.T) { + h := newGatewayAuthSessionHarness(t, gatewayAuthSessionOptions{}) + h.userStub.SeedBlockedEmail(testEmail, "policy_blocked") + + response := postJSONValue(t, h.gatewayPublicURL+"/api/v1/public/auth/send-email-code", map[string]string{ + "email": testEmail, + }) + + require.Equal(t, http.StatusOK, response.StatusCode) + var body struct { + ChallengeID string `json:"challenge_id"` + } + require.NoError(t, decodeStrictJSONPayload([]byte(response.Body), &body)) + require.NotEmpty(t, body.ChallengeID) + require.Empty(t, h.mailStub.RecordedDeliveries()) +} + +func TestGatewayAuthSessionSessionLimitExceededPassesThroughUnchanged(t *testing.T) { + h := newGatewayAuthSessionHarness(t, gatewayAuthSessionOptions{}) + h.seedSessionLimit(t, 1) + + firstClientPrivateKey := newClientPrivateKey("session-limit-first") + firstChallengeID, firstCode := h.sendChallenge(t, testEmail) + firstConfirm := h.confirmCode(t, firstChallengeID, firstCode, firstClientPrivateKey) + require.Equal(t, http.StatusOK, firstConfirm.StatusCode) + + const secondEmail = "pilot-second@example.com" + h.userStub.SeedExisting(secondEmail, "user-1") + + secondClientPrivateKey := newClientPrivateKey("session-limit-second") + secondChallengeID, secondCode := h.sendChallenge(t, secondEmail) + secondConfirm := h.confirmCode(t, secondChallengeID, secondCode, secondClientPrivateKey) + + require.Equal(t, http.StatusConflict, secondConfirm.StatusCode) + require.JSONEq(t, `{"error":{"code":"session_limit_exceeded","message":"active session limit would be exceeded"}}`, secondConfirm.Body) +} + +func TestGatewayAuthSessionRevokeClosesPushStreamAndRejectsReopen(t *testing.T) { + h := newGatewayAuthSessionHarness(t, gatewayAuthSessionOptions{}) + + clientPrivateKey := newClientPrivateKey("revoke") + challengeID, code := h.sendChallenge(t, testEmail) + confirm := h.confirmCode(t, challengeID, code, clientPrivateKey) + require.Equal(t, http.StatusOK, confirm.StatusCode) + + var confirmBody struct { + DeviceSessionID string `json:"device_session_id"` + } + require.NoError(t, decodeStrictJSONPayload([]byte(confirm.Body), &confirmBody)) + + conn := h.dialGateway(t) + client := gatewayv1.NewEdgeGatewayClient(conn) + + stream, err := client.SubscribeEvents(context.Background(), newSubscribeEventsRequest(confirmBody.DeviceSessionID, "request-revoke", clientPrivateKey)) + require.NoError(t, err) + + event, err := stream.Recv() + require.NoError(t, err) + assertBootstrapEvent(t, event, h.responseSignerPublicKey, "request-revoke") + + revokeResponse := postJSONValue(t, h.authsessionInternalURL+"/api/v1/internal/sessions/"+confirmBody.DeviceSessionID+"/revoke", map[string]any{ + "reason_code": "admin_revoke", + "actor": map[string]string{ + "type": "system", + }, + }) + require.Equal(t, http.StatusOK, revokeResponse.StatusCode) + + recvErrCh := make(chan error, 1) + go func() { + _, recvErr := stream.Recv() + recvErrCh <- recvErr + }() + + select { + case recvErr := <-recvErrCh: + require.Equal(t, codes.FailedPrecondition, status.Code(recvErr)) + require.Equal(t, "device session is revoked", status.Convert(recvErr).Message()) + case <-time.After(5 * time.Second): + t.Fatal("gateway stream did not close after authsession revoke") + } + + reopened, err := client.SubscribeEvents(context.Background(), newSubscribeEventsRequest(confirmBody.DeviceSessionID, "request-reopen", clientPrivateKey)) + if err == nil { + _, err = reopened.Recv() + } + + require.Equal(t, codes.FailedPrecondition, status.Code(err)) + require.Equal(t, "device session is revoked", status.Convert(err).Message()) +} + +func TestGatewayAuthSessionGatewayTimeoutMappingOverridesAuthsessionMessage(t *testing.T) { + h := newGatewayAuthSessionHarness(t, gatewayAuthSessionOptions{ + gatewayAuthUpstreamTimeout: 50 * time.Millisecond, + authsessionPublicHTTPTimeout: time.Second, + authsessionMailBehavior: harness.MailBehavior{ + Delay: 200 * time.Millisecond, + }, + }) + + response := postJSONValue(t, h.gatewayPublicURL+"/api/v1/public/auth/send-email-code", map[string]string{ + "email": testEmail, + }) + + require.Equal(t, http.StatusServiceUnavailable, response.StatusCode) + require.JSONEq(t, `{"error":{"code":"service_unavailable","message":"auth service is unavailable"}}`, response.Body) +} + +func TestGatewayAuthSessionAuthsessionServiceUnavailablePassesThroughUnchanged(t *testing.T) { + h := newGatewayAuthSessionHarness(t, gatewayAuthSessionOptions{ + authsessionMailBehavior: harness.MailBehavior{ + StatusCode: http.StatusServiceUnavailable, + RawBody: `{"error":"mail backend unavailable"}`, + }, + }) + + response := postJSONValue(t, h.gatewayPublicURL+"/api/v1/public/auth/send-email-code", map[string]string{ + "email": testEmail, + }) + + require.Equal(t, http.StatusServiceUnavailable, response.StatusCode) + require.JSONEq(t, `{"error":{"code":"service_unavailable","message":"service is unavailable"}}`, response.Body) +} diff --git a/integration/gatewayauthsession/harness_test.go b/integration/gatewayauthsession/harness_test.go new file mode 100644 index 0000000..3c5c6d7 --- /dev/null +++ b/integration/gatewayauthsession/harness_test.go @@ -0,0 +1,406 @@ +package gatewayauthsession_test + +import ( + "bytes" + "context" + "crypto/ed25519" + "crypto/sha256" + "encoding/base64" + "encoding/json" + "fmt" + "io" + "net/http" + "path/filepath" + "testing" + "time" + + contractsgatewayv1 "galaxy/integration/internal/contracts/gatewayv1" + "galaxy/integration/internal/harness" + + gatewayv1 "galaxy/gateway/proto/galaxy/gateway/v1" + + "github.com/redis/go-redis/v9" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +const ( + testEmail = "pilot@example.com" + testTimeZone = "Europe/Kaliningrad" + + defaultGatewayAuthUpstreamTimeout = 500 * time.Millisecond + defaultAuthsessionPublicHTTPTimeout = time.Second + defaultAuthsessionInternalHTTPTimeout = time.Second + defaultAuthsessionDependencyTimeout = time.Second +) + +type gatewayAuthSessionOptions struct { + gatewayAuthUpstreamTimeout time.Duration + authsessionPublicHTTPTimeout time.Duration + authsessionMailBehavior harness.MailBehavior +} + +type gatewayAuthSessionHarness struct { + redis *redis.Client + + mailStub *harness.MailStub + userStub *harness.UserStub + + authsessionPublicURL string + authsessionInternalURL string + gatewayPublicURL string + gatewayGRPCAddr string + + responseSignerPublicKey ed25519.PublicKey + + gatewayProcess *harness.Process + authsessionProcess *harness.Process +} + +func newGatewayAuthSessionHarness(t *testing.T, opts gatewayAuthSessionOptions) *gatewayAuthSessionHarness { + t.Helper() + + if opts.gatewayAuthUpstreamTimeout <= 0 { + opts.gatewayAuthUpstreamTimeout = defaultGatewayAuthUpstreamTimeout + } + if opts.authsessionPublicHTTPTimeout <= 0 { + opts.authsessionPublicHTTPTimeout = defaultAuthsessionPublicHTTPTimeout + } + + redisServer := harness.StartMiniredis(t) + redisClient := redis.NewClient(&redis.Options{ + Addr: redisServer.Addr(), + Protocol: 2, + DisableIdentity: true, + }) + t.Cleanup(func() { + require.NoError(t, redisClient.Close()) + }) + + mailStub := harness.NewMailStub(t) + mailStub.SetBehavior(opts.authsessionMailBehavior) + + userStub := harness.NewUserStub(t) + + responseSignerPath, responseSignerPublicKey := harness.WriteResponseSignerPEM(t, t.Name()) + authsessionPublicAddr := harness.FreeTCPAddress(t) + authsessionInternalAddr := harness.FreeTCPAddress(t) + gatewayPublicAddr := harness.FreeTCPAddress(t) + gatewayGRPCAddr := harness.FreeTCPAddress(t) + + authsessionBinary := harness.BuildBinary(t, "authsession", "./authsession/cmd/authsession") + gatewayBinary := harness.BuildBinary(t, "gateway", "./gateway/cmd/gateway") + + authsessionEnv := map[string]string{ + "AUTHSESSION_LOG_LEVEL": "info", + "AUTHSESSION_PUBLIC_HTTP_ADDR": authsessionPublicAddr, + "AUTHSESSION_PUBLIC_HTTP_REQUEST_TIMEOUT": opts.authsessionPublicHTTPTimeout.String(), + "AUTHSESSION_INTERNAL_HTTP_ADDR": authsessionInternalAddr, + "AUTHSESSION_INTERNAL_HTTP_REQUEST_TIMEOUT": defaultAuthsessionInternalHTTPTimeout.String(), + "AUTHSESSION_REDIS_ADDR": redisServer.Addr(), + "AUTHSESSION_USER_SERVICE_MODE": "rest", + "AUTHSESSION_USER_SERVICE_BASE_URL": userStub.BaseURL(), + "AUTHSESSION_USER_SERVICE_REQUEST_TIMEOUT": defaultAuthsessionDependencyTimeout.String(), + "AUTHSESSION_MAIL_SERVICE_MODE": "rest", + "AUTHSESSION_MAIL_SERVICE_BASE_URL": mailStub.BaseURL(), + "AUTHSESSION_MAIL_SERVICE_REQUEST_TIMEOUT": defaultAuthsessionDependencyTimeout.String(), + "AUTHSESSION_REDIS_GATEWAY_SESSION_CACHE_KEY_PREFIX": "gateway:session:", + "AUTHSESSION_REDIS_GATEWAY_SESSION_EVENTS_STREAM": "gateway:session_events", + "OTEL_TRACES_EXPORTER": "none", + "OTEL_METRICS_EXPORTER": "none", + } + authsessionProcess := harness.StartProcess(t, "authsession", authsessionBinary, authsessionEnv) + waitForAuthsessionPublicReady(t, authsessionProcess, "http://"+authsessionPublicAddr) + waitForAuthsessionInternalReady(t, authsessionProcess, "http://"+authsessionInternalAddr) + + gatewayEnv := map[string]string{ + "GATEWAY_LOG_LEVEL": "info", + "GATEWAY_PUBLIC_HTTP_ADDR": gatewayPublicAddr, + "GATEWAY_AUTHENTICATED_GRPC_ADDR": gatewayGRPCAddr, + "GATEWAY_SESSION_CACHE_REDIS_ADDR": redisServer.Addr(), + "GATEWAY_SESSION_CACHE_REDIS_KEY_PREFIX": "gateway:session:", + "GATEWAY_SESSION_EVENTS_REDIS_STREAM": "gateway:session_events", + "GATEWAY_CLIENT_EVENTS_REDIS_STREAM": "gateway:client_events", + "GATEWAY_REPLAY_REDIS_KEY_PREFIX": "gateway:replay:", + "GATEWAY_RESPONSE_SIGNER_PRIVATE_KEY_PEM_PATH": filepath.Clean(responseSignerPath), + "GATEWAY_AUTH_SERVICE_BASE_URL": "http://" + authsessionPublicAddr, + "GATEWAY_PUBLIC_AUTH_UPSTREAM_TIMEOUT": opts.gatewayAuthUpstreamTimeout.String(), + "GATEWAY_PUBLIC_HTTP_ANTI_ABUSE_PUBLIC_AUTH_RATE_LIMIT_REQUESTS": "100", + "GATEWAY_PUBLIC_HTTP_ANTI_ABUSE_PUBLIC_AUTH_RATE_LIMIT_WINDOW": "1s", + "GATEWAY_PUBLIC_HTTP_ANTI_ABUSE_PUBLIC_AUTH_RATE_LIMIT_BURST": "100", + "GATEWAY_PUBLIC_HTTP_ANTI_ABUSE_SEND_EMAIL_CODE_IDENTITY_RATE_LIMIT_REQUESTS": "100", + "GATEWAY_PUBLIC_HTTP_ANTI_ABUSE_SEND_EMAIL_CODE_IDENTITY_RATE_LIMIT_WINDOW": "1s", + "GATEWAY_PUBLIC_HTTP_ANTI_ABUSE_SEND_EMAIL_CODE_IDENTITY_RATE_LIMIT_BURST": "100", + "GATEWAY_PUBLIC_HTTP_ANTI_ABUSE_CONFIRM_EMAIL_CODE_IDENTITY_RATE_LIMIT_REQUESTS": "100", + "GATEWAY_PUBLIC_HTTP_ANTI_ABUSE_CONFIRM_EMAIL_CODE_IDENTITY_RATE_LIMIT_WINDOW": "1s", + "GATEWAY_PUBLIC_HTTP_ANTI_ABUSE_CONFIRM_EMAIL_CODE_IDENTITY_RATE_LIMIT_BURST": "100", + "OTEL_TRACES_EXPORTER": "none", + "OTEL_METRICS_EXPORTER": "none", + } + gatewayProcess := harness.StartProcess(t, "gateway", gatewayBinary, gatewayEnv) + harness.WaitForHTTPStatus(t, gatewayProcess, "http://"+gatewayPublicAddr+"/healthz", http.StatusOK) + harness.WaitForTCP(t, gatewayProcess, gatewayGRPCAddr) + + return &gatewayAuthSessionHarness{ + redis: redisClient, + mailStub: mailStub, + userStub: userStub, + authsessionPublicURL: "http://" + authsessionPublicAddr, + authsessionInternalURL: "http://" + authsessionInternalAddr, + gatewayPublicURL: "http://" + gatewayPublicAddr, + gatewayGRPCAddr: gatewayGRPCAddr, + responseSignerPublicKey: responseSignerPublicKey, + gatewayProcess: gatewayProcess, + authsessionProcess: authsessionProcess, + } +} + +func (h *gatewayAuthSessionHarness) dialGateway(t *testing.T) *grpc.ClientConn { + t.Helper() + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + conn, err := grpc.DialContext( + ctx, + h.gatewayGRPCAddr, + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithBlock(), + ) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, conn.Close()) + }) + + return conn +} + +func (h *gatewayAuthSessionHarness) seedSessionLimit(t *testing.T, limit int) { + t.Helper() + + require.NoError(t, h.redis.Set(context.Background(), "authsession:config:active-session-limit", fmt.Sprint(limit), 0).Err()) +} + +func (h *gatewayAuthSessionHarness) readGatewaySessionRecord(t *testing.T, deviceSessionID string) gatewaySessionRecord { + t.Helper() + + payload, err := h.redis.Get(context.Background(), "gateway:session:"+deviceSessionID).Bytes() + require.NoError(t, err) + + var record gatewaySessionRecord + require.NoError(t, decodeStrictJSONPayload(payload, &record)) + return record +} + +func (h *gatewayAuthSessionHarness) sendChallenge(t *testing.T, email string) (string, string) { + t.Helper() + + response := postJSONValue(t, h.gatewayPublicURL+"/api/v1/public/auth/send-email-code", map[string]string{ + "email": email, + }) + require.Equal(t, http.StatusOK, response.StatusCode) + + var body struct { + ChallengeID string `json:"challenge_id"` + } + require.NoError(t, decodeStrictJSONPayload([]byte(response.Body), &body)) + + deliveries := h.mailStub.RecordedDeliveries() + require.NotEmpty(t, deliveries) + return body.ChallengeID, deliveries[len(deliveries)-1].Code +} + +func (h *gatewayAuthSessionHarness) confirmCode(t *testing.T, challengeID string, code string, clientPrivateKey ed25519.PrivateKey) httpResponse { + t.Helper() + + return postJSONValue(t, h.gatewayPublicURL+"/api/v1/public/auth/confirm-email-code", map[string]string{ + "challenge_id": challengeID, + "code": code, + "client_public_key": encodePublicKey(clientPrivateKey.Public().(ed25519.PublicKey)), + "time_zone": testTimeZone, + }) +} + +func newClientPrivateKey(label string) ed25519.PrivateKey { + seed := sha256.Sum256([]byte("galaxy-integration-gateway-authsession-client-" + label)) + return ed25519.NewKeyFromSeed(seed[:]) +} + +func newSubscribeEventsRequest(deviceSessionID string, requestID string, clientPrivateKey ed25519.PrivateKey) *gatewayv1.SubscribeEventsRequest { + payloadHash := contractsgatewayv1.ComputePayloadHash(nil) + + request := &gatewayv1.SubscribeEventsRequest{ + ProtocolVersion: contractsgatewayv1.ProtocolVersionV1, + DeviceSessionId: deviceSessionID, + MessageType: contractsgatewayv1.SubscribeMessageType, + TimestampMs: time.Now().UnixMilli(), + RequestId: requestID, + PayloadHash: payloadHash, + TraceId: "trace-" + requestID, + } + request.Signature = contractsgatewayv1.SignRequest(clientPrivateKey, contractsgatewayv1.RequestSigningFields{ + ProtocolVersion: request.GetProtocolVersion(), + DeviceSessionID: request.GetDeviceSessionId(), + MessageType: request.GetMessageType(), + TimestampMS: request.GetTimestampMs(), + RequestID: request.GetRequestId(), + PayloadHash: request.GetPayloadHash(), + }) + return request +} + +func assertBootstrapEvent(t *testing.T, event *gatewayv1.GatewayEvent, responseSignerPublicKey ed25519.PublicKey, wantRequestID string) { + t.Helper() + + require.Equal(t, contractsgatewayv1.ServerTimeEventType, event.GetEventType()) + require.Equal(t, wantRequestID, event.GetEventId()) + require.Equal(t, wantRequestID, event.GetRequestId()) + require.NoError(t, contractsgatewayv1.VerifyPayloadHash(event.GetPayloadBytes(), event.GetPayloadHash())) + require.NoError(t, contractsgatewayv1.VerifyEventSignature(responseSignerPublicKey, event.GetSignature(), contractsgatewayv1.EventSigningFields{ + EventType: event.GetEventType(), + EventID: event.GetEventId(), + TimestampMS: event.GetTimestampMs(), + RequestID: event.GetRequestId(), + TraceID: event.GetTraceId(), + PayloadHash: event.GetPayloadHash(), + })) +} + +type httpResponse struct { + StatusCode int + Body string + Header http.Header +} + +type gatewaySessionRecord struct { + DeviceSessionID string `json:"device_session_id"` + UserID string `json:"user_id"` + ClientPublicKey string `json:"client_public_key"` + Status string `json:"status"` + RevokedAtMS *int64 `json:"revoked_at_ms,omitempty"` +} + +func postJSONValue(t *testing.T, targetURL string, body any) httpResponse { + t.Helper() + + payload, err := json.Marshal(body) + require.NoError(t, err) + + request, err := http.NewRequest(http.MethodPost, targetURL, bytes.NewReader(payload)) + require.NoError(t, err) + request.Header.Set("Content-Type", "application/json") + + client := &http.Client{Timeout: 5 * time.Second} + + response, err := client.Do(request) + require.NoError(t, err) + defer response.Body.Close() + + responseBody, err := io.ReadAll(response.Body) + require.NoError(t, err) + + return httpResponse{ + StatusCode: response.StatusCode, + Body: string(responseBody), + Header: response.Header.Clone(), + } +} + +func decodeStrictJSONPayload(payload []byte, target any) error { + decoder := json.NewDecoder(bytes.NewReader(payload)) + decoder.DisallowUnknownFields() + + if err := decoder.Decode(target); err != nil { + return err + } + if err := decoder.Decode(&struct{}{}); err != io.EOF { + if err == nil { + return fmt.Errorf("unexpected trailing JSON input") + } + return err + } + + return nil +} + +func encodePublicKey(publicKey ed25519.PublicKey) string { + return base64.StdEncoding.EncodeToString(publicKey) +} + +func waitForAuthsessionPublicReady(t *testing.T, process *harness.Process, baseURL string) { + t.Helper() + + client := &http.Client{Timeout: 250 * time.Millisecond} + deadline := time.Now().Add(10 * time.Second) + + for time.Now().Before(deadline) { + response, err := postJSONValueMaybe(client, baseURL+"/api/v1/public/auth/send-email-code", map[string]string{ + "email": "", + }) + if err == nil && response.StatusCode == http.StatusBadRequest { + return + } + + time.Sleep(25 * time.Millisecond) + } + + t.Fatalf("wait for authsession public readiness: timeout\n%s", process.Logs()) +} + +func waitForAuthsessionInternalReady(t *testing.T, process *harness.Process, baseURL string) { + t.Helper() + + client := &http.Client{Timeout: 250 * time.Millisecond} + deadline := time.Now().Add(10 * time.Second) + + for time.Now().Before(deadline) { + request, err := http.NewRequest(http.MethodGet, baseURL+"/api/v1/internal/sessions/missing", nil) + if err != nil { + t.Fatalf("build authsession internal readiness request: %v", err) + } + + response, err := client.Do(request) + if err == nil { + _, _ = io.Copy(io.Discard, response.Body) + response.Body.Close() + if response.StatusCode == http.StatusNotFound { + return + } + } + + time.Sleep(25 * time.Millisecond) + } + + t.Fatalf("wait for authsession internal readiness: timeout\n%s", process.Logs()) +} + +func postJSONValueMaybe(client *http.Client, targetURL string, body any) (httpResponse, error) { + payload, err := json.Marshal(body) + if err != nil { + return httpResponse{}, err + } + + request, err := http.NewRequest(http.MethodPost, targetURL, bytes.NewReader(payload)) + if err != nil { + return httpResponse{}, err + } + request.Header.Set("Content-Type", "application/json") + + response, err := client.Do(request) + if err != nil { + return httpResponse{}, err + } + defer response.Body.Close() + + responseBody, err := io.ReadAll(response.Body) + if err != nil { + return httpResponse{}, err + } + + return httpResponse{ + StatusCode: response.StatusCode, + Body: string(responseBody), + Header: response.Header.Clone(), + }, nil +} diff --git a/integration/go.mod b/integration/go.mod new file mode 100644 index 0000000..7a8d48b --- /dev/null +++ b/integration/go.mod @@ -0,0 +1,31 @@ +module galaxy/integration + +go 1.26.0 + +require ( + github.com/alicebob/miniredis/v2 v2.37.0 + github.com/redis/go-redis/v9 v9.18.0 + github.com/stretchr/testify v1.11.1 + google.golang.org/grpc v1.80.0 +) + +require ( + github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/klauspost/cpuid/v2 v2.3.0 // indirect + github.com/kr/pretty v0.3.1 // indirect + github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect + github.com/rogpeppe/go-internal v1.14.1 // indirect + github.com/yuin/gopher-lua v1.1.1 // indirect + go.opentelemetry.io/otel v1.42.0 // indirect + go.opentelemetry.io/otel/sdk/metric v1.42.0 // indirect + go.uber.org/atomic v1.11.0 // indirect + golang.org/x/net v0.52.0 // indirect + golang.org/x/sys v0.42.0 // indirect + golang.org/x/text v0.35.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20260401024825-9d38bb4040a9 // indirect + google.golang.org/protobuf v1.36.11 // indirect + gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/integration/go.sum b/integration/go.sum new file mode 100644 index 0000000..23fcc34 --- /dev/null +++ b/integration/go.sum @@ -0,0 +1,57 @@ +github.com/alicebob/miniredis/v2 v2.37.0 h1:RheObYW32G1aiJIj81XVt78ZHJpHonHLHW7OLIshq68= +github.com/alicebob/miniredis/v2 v2.37.0/go.mod h1:TcL7YfarKPGDAthEtl5NBeHZfeUQj6OXMm/+iu5cLMM= +github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= +github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= +github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= +github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= +github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/klauspost/cpuid/v2 v2.3.0 h1:S4CRMLnYUhGeDFDqkGriYKdfoFlDnMtqTiI/sFzhA9Y= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= +github.com/redis/go-redis/v9 v9.18.0 h1:pMkxYPkEbMPwRdenAzUNyFNrDgHx9U+DrBabWNfSRQs= +github.com/redis/go-redis/v9 v9.18.0/go.mod h1:k3ufPphLU5YXwNTUcCRXGxUoF1fqxnhFQmscfkCoDA0= +github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M= +github.com/yuin/gopher-lua v1.1.1/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw= +github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0= +github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA= +go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= +go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= +go.opentelemetry.io/otel v1.42.0 h1:lSQGzTgVR3+sgJDAU/7/ZMjN9Z+vUip7leaqBKy4sho= +go.opentelemetry.io/otel/metric v1.42.0 h1:2jXG+3oZLNXEPfNmnpxKDeZsFI5o4J+nz6xUlaFdF/4= +go.opentelemetry.io/otel/sdk v1.42.0 h1:LyC8+jqk6UJwdrI/8VydAq/hvkFKNHZVIWuslJXYsDo= +go.opentelemetry.io/otel/sdk/metric v1.42.0 h1:D/1QR46Clz6ajyZ3G8SgNlTJKBdGp84q9RKCAZ3YGuA= +go.opentelemetry.io/otel/trace v1.42.0 h1:OUCgIPt+mzOnaUTpOQcBiM/PLQ/Op7oq6g4LenLmOYY= +go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= +go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= +golang.org/x/net v0.52.0 h1:He/TN1l0e4mmR3QqHMT2Xab3Aj3L9qjbhRm78/6jrW0= +golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo= +golang.org/x/text v0.35.0 h1:JOVx6vVDFokkpaq1AEptVzLTpDe9KGpj5tR4/X+ybL8= +gonum.org/v1/gonum v0.17.0 h1:VbpOemQlsSMrYmn7T2OUvQ4dqxQXU+ouZFQsZOx50z4= +gonum.org/v1/gonum v0.17.0/go.mod h1:El3tOrEuMpv2UdMrbNlKEh9vd86bmQ6vqIcDwxEOc1E= +google.golang.org/genproto/googleapis/rpc v0.0.0-20260401024825-9d38bb4040a9 h1:m8qni9SQFH0tJc1X0vmnpw/0t+AImlSvp30sEupozUg= +google.golang.org/grpc v1.80.0 h1:Xr6m2WmWZLETvUNvIUmeD5OAagMw3FiKmMlTdViWsHM= +google.golang.org/grpc v1.80.0/go.mod h1:ho/dLnxwi3EDJA4Zghp7k2Ec1+c2jqup0bFkw07bwF4= +google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE= +google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/integration/internal/contracts/gatewayv1/contract.go b/integration/internal/contracts/gatewayv1/contract.go new file mode 100644 index 0000000..a5e6a1b --- /dev/null +++ b/integration/internal/contracts/gatewayv1/contract.go @@ -0,0 +1,184 @@ +// Package gatewayv1contract provides public-contract helpers for the gateway +// v1 authenticated transport without importing service-internal packages. +package gatewayv1contract + +import ( + "bytes" + "crypto/ed25519" + "crypto/sha256" + "encoding/binary" + "errors" +) + +const ( + // ProtocolVersionV1 is the supported public protocol version literal. + ProtocolVersionV1 = "v1" + + // SubscribeMessageType is the authenticated message type used to open the + // gateway push stream. + SubscribeMessageType = "gateway.subscribe" + + // ServerTimeEventType is the bootstrap event type emitted by the gateway + // immediately after a push stream is opened. + ServerTimeEventType = "gateway.server_time" + + requestDomainMarkerV1 = "galaxy-request-v1" + eventDomainMarkerV1 = "galaxy-event-v1" +) + +var ( + // ErrInvalidPayloadHash reports that payloadHash is not a raw SHA-256 + // digest. + ErrInvalidPayloadHash = errors.New("payload_hash must be a 32-byte SHA-256 digest") + + // ErrPayloadHashMismatch reports that payloadHash does not match + // payloadBytes. + ErrPayloadHashMismatch = errors.New("payload_hash does not match payload_bytes") + + // ErrInvalidEventSignature reports that one gateway event signature is not + // a raw Ed25519 signature for the canonical event signing input. + ErrInvalidEventSignature = errors.New("invalid event signature") +) + +// RequestSigningFields stores the canonical public request fields bound into +// one client signature input. +type RequestSigningFields struct { + // ProtocolVersion identifies the gateway transport envelope version. + ProtocolVersion string + + // DeviceSessionID identifies the authenticated device session bound to the + // request. + DeviceSessionID string + + // MessageType is the stable authenticated gateway message type. + MessageType string + + // TimestampMS carries the client request timestamp in milliseconds. + TimestampMS int64 + + // RequestID is the transport correlation and anti-replay identifier. + RequestID string + + // PayloadHash stores the raw SHA-256 digest of PayloadBytes. + PayloadHash []byte +} + +// EventSigningFields stores the canonical public stream-event fields bound +// into one gateway event signature input. +type EventSigningFields struct { + // EventType identifies the stable client-facing event category. + EventType string + + // EventID is the stable event correlation identifier. + EventID string + + // TimestampMS carries the gateway event timestamp in milliseconds. + TimestampMS int64 + + // RequestID optionally correlates the event to the opening client request. + RequestID string + + // TraceID optionally carries the client-supplied trace correlation value. + TraceID string + + // PayloadHash stores the raw SHA-256 digest of PayloadBytes. + PayloadHash []byte +} + +// ComputePayloadHash returns the canonical raw SHA-256 digest for payloadBytes. +func ComputePayloadHash(payloadBytes []byte) []byte { + sum := sha256.Sum256(payloadBytes) + return bytes.Clone(sum[:]) +} + +// VerifyPayloadHash reports whether payloadHash matches payloadBytes under the +// public gateway payload-hash contract. +func VerifyPayloadHash(payloadBytes, payloadHash []byte) error { + if len(payloadHash) != sha256.Size { + return ErrInvalidPayloadHash + } + + sum := sha256.Sum256(payloadBytes) + if !bytes.Equal(sum[:], payloadHash) { + return ErrPayloadHashMismatch + } + + return nil +} + +// BuildRequestSigningInput returns the canonical byte sequence the v1 client +// request signature covers. +func BuildRequestSigningInput(fields RequestSigningFields) []byte { + size := len(requestDomainMarkerV1) + + len(fields.ProtocolVersion) + + len(fields.DeviceSessionID) + + len(fields.MessageType) + + len(fields.RequestID) + + len(fields.PayloadHash) + + (6 * binary.MaxVarintLen64) + + 8 + + buf := make([]byte, 0, size) + buf = appendLengthPrefixedString(buf, requestDomainMarkerV1) + buf = appendLengthPrefixedString(buf, fields.ProtocolVersion) + buf = appendLengthPrefixedString(buf, fields.DeviceSessionID) + buf = appendLengthPrefixedString(buf, fields.MessageType) + buf = binary.BigEndian.AppendUint64(buf, uint64(fields.TimestampMS)) + buf = appendLengthPrefixedString(buf, fields.RequestID) + buf = appendLengthPrefixedBytes(buf, fields.PayloadHash) + + return buf +} + +// BuildEventSigningInput returns the canonical byte sequence the v1 gateway +// event signature covers. +func BuildEventSigningInput(fields EventSigningFields) []byte { + size := len(eventDomainMarkerV1) + + len(fields.EventType) + + len(fields.EventID) + + len(fields.RequestID) + + len(fields.TraceID) + + len(fields.PayloadHash) + + (6 * binary.MaxVarintLen64) + + 8 + + buf := make([]byte, 0, size) + buf = appendLengthPrefixedString(buf, eventDomainMarkerV1) + buf = appendLengthPrefixedString(buf, fields.EventType) + buf = appendLengthPrefixedString(buf, fields.EventID) + buf = binary.BigEndian.AppendUint64(buf, uint64(fields.TimestampMS)) + buf = appendLengthPrefixedString(buf, fields.RequestID) + buf = appendLengthPrefixedString(buf, fields.TraceID) + buf = appendLengthPrefixedBytes(buf, fields.PayloadHash) + + return buf +} + +// SignRequest returns one raw Ed25519 client signature for the canonical v1 +// request signing input. +func SignRequest(privateKey ed25519.PrivateKey, fields RequestSigningFields) []byte { + return ed25519.Sign(privateKey, BuildRequestSigningInput(fields)) +} + +// VerifyEventSignature reports whether signature authenticates fields under +// publicKey using the canonical gateway event signing input. +func VerifyEventSignature(publicKey ed25519.PublicKey, signature []byte, fields EventSigningFields) error { + if len(publicKey) != ed25519.PublicKeySize || len(signature) != ed25519.SignatureSize { + return ErrInvalidEventSignature + } + if !ed25519.Verify(publicKey, BuildEventSigningInput(fields), signature) { + return ErrInvalidEventSignature + } + + return nil +} + +func appendLengthPrefixedString(dst []byte, value string) []byte { + return appendLengthPrefixedBytes(dst, []byte(value)) +} + +func appendLengthPrefixedBytes(dst []byte, value []byte) []byte { + dst = binary.AppendUvarint(dst, uint64(len(value))) + dst = append(dst, value...) + return dst +} diff --git a/integration/internal/harness/binary.go b/integration/internal/harness/binary.go new file mode 100644 index 0000000..4205257 --- /dev/null +++ b/integration/internal/harness/binary.go @@ -0,0 +1,71 @@ +// Package harness provides reusable black-box integration helpers shared by +// inter-service suites. +package harness + +import ( + "os" + "os/exec" + "path/filepath" + "runtime" + "strings" + "sync" + "testing" +) + +var binaryCache struct { + mu sync.Mutex + paths map[string]string +} + +// BuildBinary builds packagePath once per test process and returns the +// resulting executable path. +func BuildBinary(t testing.TB, name string, packagePath string) string { + t.Helper() + + root := repositoryRoot(t) + key := name + ":" + packagePath + + binaryCache.mu.Lock() + if binaryCache.paths == nil { + binaryCache.paths = make(map[string]string) + } + if path, ok := binaryCache.paths[key]; ok { + binaryCache.mu.Unlock() + return path + } + + outputDir := filepath.Join(os.TempDir(), "galaxy-integration-binaries") + if err := os.MkdirAll(outputDir, 0o755); err != nil { + binaryCache.mu.Unlock() + t.Fatalf("create integration binary directory: %v", err) + } + + outputPath := filepath.Join(outputDir, sanitizeBinaryName(key)) + cmd := exec.Command("go", "build", "-o", outputPath, packagePath) + cmd.Dir = root + output, err := cmd.CombinedOutput() + if err != nil { + binaryCache.mu.Unlock() + t.Fatalf("build %s: %v\n%s", packagePath, err, output) + } + + binaryCache.paths[key] = outputPath + binaryCache.mu.Unlock() + return outputPath +} + +func repositoryRoot(t testing.TB) string { + t.Helper() + + _, file, _, ok := runtime.Caller(0) + if !ok { + t.Fatal("resolve harness repository root: runtime caller is unavailable") + } + + return filepath.Clean(filepath.Join(filepath.Dir(file), "..", "..", "..")) +} + +func sanitizeBinaryName(value string) string { + replacer := strings.NewReplacer("/", "_", "\\", "_", ":", "_", ".", "_") + return replacer.Replace(value) +} diff --git a/integration/internal/harness/keys.go b/integration/internal/harness/keys.go new file mode 100644 index 0000000..6a65656 --- /dev/null +++ b/integration/internal/harness/keys.go @@ -0,0 +1,54 @@ +package harness + +import ( + "crypto/ed25519" + "crypto/sha256" + "crypto/x509" + "encoding/pem" + "os" + "path/filepath" + "testing" + + "github.com/alicebob/miniredis/v2" +) + +// StartMiniredis starts one isolated Redis-compatible in-memory server and +// registers automatic cleanup. +func StartMiniredis(t testing.TB) *miniredis.Miniredis { + t.Helper() + + server, err := miniredis.Run() + if err != nil { + t.Fatalf("start miniredis: %v", err) + } + + t.Cleanup(server.Close) + return server +} + +// WriteResponseSignerPEM writes one deterministic PKCS#8 PEM-encoded Ed25519 +// private key for gateway response signing and returns the file path plus the +// matching public key. +func WriteResponseSignerPEM(t testing.TB, label string) (string, ed25519.PublicKey) { + t.Helper() + + seed := sha256.Sum256([]byte("galaxy-integration-response-signer-" + label)) + privateKey := ed25519.NewKeyFromSeed(seed[:]) + + encoded, err := x509.MarshalPKCS8PrivateKey(privateKey) + if err != nil { + t.Fatalf("marshal response signer private key: %v", err) + } + + pemBytes := pem.EncodeToMemory(&pem.Block{ + Type: "PRIVATE KEY", + Bytes: encoded, + }) + + path := filepath.Join(t.TempDir(), "response-signer.pem") + if err := os.WriteFile(path, pemBytes, 0o600); err != nil { + t.Fatalf("write response signer private key: %v", err) + } + + return path, privateKey.Public().(ed25519.PublicKey) +} diff --git a/integration/internal/harness/mail_stub.go b/integration/internal/harness/mail_stub.go new file mode 100644 index 0000000..e459dd7 --- /dev/null +++ b/integration/internal/harness/mail_stub.go @@ -0,0 +1,182 @@ +package harness + +import ( + "bytes" + "encoding/json" + "errors" + "io" + "net/http" + "net/http/httptest" + "sync" + "testing" + "time" +) + +const mailStubPath = "/api/v1/internal/login-code-deliveries" + +// LoginCodeDelivery stores one mail-delivery request received by the external +// mail stub. +type LoginCodeDelivery struct { + // Email identifies the target e-mail address requested by authsession. + Email string + + // Code stores the cleartext login code requested by authsession. + Code string +} + +// MailBehavior overrides one external mail-stub response. +type MailBehavior struct { + // Delay waits before the stub writes its response. + Delay time.Duration + + // StatusCode overrides the HTTP status returned by the stub. Zero keeps the + // default `200 OK`. + StatusCode int + + // RawBody overrides the exact response body returned by the stub. Empty + // value keeps the default JSON payload for the chosen status. + RawBody string +} + +// MailStub provides one stateful external HTTP mail-service stub. +type MailStub struct { + server *httptest.Server + + mu sync.Mutex + deliveries []LoginCodeDelivery + behavior MailBehavior +} + +// NewMailStub starts one stateful external HTTP mail-service stub. +func NewMailStub(t testing.TB) *MailStub { + t.Helper() + + stub := &MailStub{} + stub.server = httptest.NewServer(http.HandlerFunc(stub.handle)) + t.Cleanup(stub.server.Close) + return stub +} + +// BaseURL returns the stub base URL suitable for service runtime wiring. +func (s *MailStub) BaseURL() string { + if s == nil || s.server == nil { + return "" + } + return s.server.URL +} + +// SetBehavior replaces the current response behavior used by subsequent +// requests. +func (s *MailStub) SetBehavior(behavior MailBehavior) { + s.mu.Lock() + defer s.mu.Unlock() + s.behavior = behavior +} + +// RecordedDeliveries returns a snapshot of all delivery requests received by +// the stub so far. +func (s *MailStub) RecordedDeliveries() []LoginCodeDelivery { + s.mu.Lock() + defer s.mu.Unlock() + + cloned := make([]LoginCodeDelivery, len(s.deliveries)) + copy(cloned, s.deliveries) + return cloned +} + +// Reset clears the recorded deliveries and restores default behavior. +func (s *MailStub) Reset() { + s.mu.Lock() + defer s.mu.Unlock() + + s.deliveries = nil + s.behavior = MailBehavior{} +} + +func (s *MailStub) handle(writer http.ResponseWriter, request *http.Request) { + if request.Method != http.MethodPost || request.URL.Path != mailStubPath { + http.NotFound(writer, request) + return + } + + var payload struct { + Email string `json:"email"` + Code string `json:"code"` + } + if err := decodeStrictJSONRequest(request, &payload); err != nil { + http.Error(writer, err.Error(), http.StatusBadRequest) + return + } + + s.mu.Lock() + s.deliveries = append(s.deliveries, LoginCodeDelivery{ + Email: payload.Email, + Code: payload.Code, + }) + behavior := s.behavior + s.mu.Unlock() + + if behavior.Delay > 0 { + timer := time.NewTimer(behavior.Delay) + defer timer.Stop() + + select { + case <-request.Context().Done(): + return + case <-timer.C: + } + } + + statusCode := behavior.StatusCode + if statusCode == 0 { + statusCode = http.StatusOK + } + + body := behavior.RawBody + if body == "" { + switch statusCode { + case http.StatusOK: + body = `{"outcome":"sent"}` + default: + body = `{"error":"stubbed mail failure"}` + } + } + + writer.Header().Set("Content-Type", "application/json") + writer.WriteHeader(statusCode) + _, _ = io.WriteString(writer, body) +} + +func decodeStrictJSONRequest(request *http.Request, target any) error { + decoder := json.NewDecoder(request.Body) + decoder.DisallowUnknownFields() + + if err := decoder.Decode(target); err != nil { + return err + } + if err := decoder.Decode(&struct{}{}); err != io.EOF { + if err == nil { + return errors.New("unexpected trailing JSON input") + } + return err + } + + return nil +} + +func decodeStrictJSONPayload(payload []byte, target any) error { + decoder := json.NewDecoder(bytes.NewReader(payload)) + decoder.DisallowUnknownFields() + + if err := decoder.Decode(target); err != nil { + return err + } + if err := decoder.Decode(&struct{}{}); err != io.EOF { + if err == nil { + return errors.New("unexpected trailing JSON input") + } + return err + } + + return nil +} diff --git a/integration/internal/harness/process.go b/integration/internal/harness/process.go new file mode 100644 index 0000000..0d26fd3 --- /dev/null +++ b/integration/internal/harness/process.go @@ -0,0 +1,276 @@ +package harness + +import ( + "bytes" + "context" + "errors" + "fmt" + "io" + "net" + "net/http" + "os" + "os/exec" + "strings" + "sync" + "syscall" + "testing" + "time" +) + +const ( + defaultStartupWait = 10 * time.Second + defaultPollInterval = 25 * time.Millisecond + defaultStopWait = 5 * time.Second +) + +// Process represents one long-lived external service process started by an +// integration suite. +type Process struct { + name string + cmd *exec.Cmd + + logsMu sync.Mutex + logs bytes.Buffer + + doneCh chan struct{} + waitErr error +} + +// StartProcess starts binaryPath with envOverrides and registers cleanup that +// stops the process and prints captured logs on failed tests. +func StartProcess(t testing.TB, name string, binaryPath string, envOverrides map[string]string) *Process { + t.Helper() + + cmd := exec.Command(binaryPath) + cmd.Env = mergeEnvironment(os.Environ(), envOverrides) + + process := &Process{ + name: name, + cmd: cmd, + doneCh: make(chan struct{}), + } + cmd.Stdout = process.logWriter() + cmd.Stderr = process.logWriter() + + if err := cmd.Start(); err != nil { + t.Fatalf("start %s: %v", name, err) + } + + go func() { + process.waitErr = cmd.Wait() + close(process.doneCh) + }() + + t.Cleanup(func() { + process.Stop(t) + if t.Failed() { + t.Logf("%s logs:\n%s", name, process.Logs()) + } + }) + + return process +} + +// Stop asks the process to terminate gracefully and waits for completion. +func (p *Process) Stop(t testing.TB) { + t.Helper() + + if p == nil { + return + } + + select { + case <-p.doneCh: + err := p.waitErr + if err != nil && !isExpectedProcessExit(err) { + t.Errorf("%s exited unexpectedly: %v", p.name, err) + } + return + default: + } + + if p.cmd.Process != nil { + _ = p.cmd.Process.Signal(syscall.SIGTERM) + } + + select { + case <-p.doneCh: + err := p.waitErr + if err != nil && !isExpectedProcessExit(err) { + t.Errorf("%s exited unexpectedly: %v", p.name, err) + } + case <-time.After(defaultStopWait): + if p.cmd.Process != nil { + _ = p.cmd.Process.Kill() + } + <-p.doneCh + err := p.waitErr + if err != nil && !isExpectedProcessExit(err) { + t.Errorf("%s exited unexpectedly: %v", p.name, err) + } + } +} + +// Logs returns the captured combined stdout/stderr output of the process. +func (p *Process) Logs() string { + if p == nil { + return "" + } + + p.logsMu.Lock() + defer p.logsMu.Unlock() + return p.logs.String() +} + +// FreeTCPAddress reserves one ephemeral loopback TCP address and releases it +// immediately so a service process can bind to it. +func FreeTCPAddress(t testing.TB) string { + t.Helper() + + listener, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("reserve free TCP address: %v", err) + } + + addr := listener.Addr().String() + if err := listener.Close(); err != nil { + t.Fatalf("release reserved TCP address: %v", err) + } + + return addr +} + +// WaitForHTTPStatus waits until url responds with wantStatus or fails when the +// backing process exits early. +func WaitForHTTPStatus(t testing.TB, process *Process, url string, wantStatus int) { + t.Helper() + + client := &http.Client{ + Timeout: 250 * time.Millisecond, + Transport: &http.Transport{ + DisableKeepAlives: true, + }, + } + defer client.CloseIdleConnections() + + ctx, cancel := context.WithTimeout(context.Background(), defaultStartupWait) + defer cancel() + + ticker := time.NewTicker(defaultPollInterval) + defer ticker.Stop() + + for { + if err := processErr(process); err != nil { + t.Fatalf("%s exited before %s became ready: %v\n%s", process.name, url, err, process.Logs()) + } + + request, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + t.Fatalf("build readiness request for %s: %v", url, err) + } + + response, err := client.Do(request) + if err == nil { + _, _ = io.Copy(io.Discard, response.Body) + response.Body.Close() + if response.StatusCode == wantStatus { + return + } + } + + select { + case <-ctx.Done(): + t.Fatalf("wait for %s status %d: %v\n%s", url, wantStatus, ctx.Err(), process.Logs()) + case <-ticker.C: + } + } +} + +// WaitForTCP waits until address accepts TCP connections or fails when the +// backing process exits early. +func WaitForTCP(t testing.TB, process *Process, address string) { + t.Helper() + + ctx, cancel := context.WithTimeout(context.Background(), defaultStartupWait) + defer cancel() + + ticker := time.NewTicker(defaultPollInterval) + defer ticker.Stop() + + for { + if err := processErr(process); err != nil { + t.Fatalf("%s exited before %s became reachable: %v\n%s", process.name, address, err, process.Logs()) + } + + conn, err := net.DialTimeout("tcp", address, 100*time.Millisecond) + if err == nil { + _ = conn.Close() + return + } + + select { + case <-ctx.Done(): + t.Fatalf("wait for %s TCP readiness: %v\n%s", address, ctx.Err(), process.Logs()) + case <-ticker.C: + } + } +} + +func (p *Process) logWriter() io.Writer { + return writerFunc(func(data []byte) (int, error) { + p.logsMu.Lock() + defer p.logsMu.Unlock() + return p.logs.Write(data) + }) +} + +func mergeEnvironment(base []string, overrides map[string]string) []string { + values := make(map[string]string, len(base)+len(overrides)) + for _, entry := range base { + name, value, ok := strings.Cut(entry, "=") + if ok { + values[name] = value + } + } + for name, value := range overrides { + values[name] = value + } + + merged := make([]string, 0, len(values)) + for name, value := range values { + merged = append(merged, fmt.Sprintf("%s=%s", name, value)) + } + return merged +} + +func processErr(process *Process) error { + if process == nil { + return errors.New("nil process") + } + + select { + case <-process.doneCh: + return process.waitErr + default: + return nil + } +} + +func isExpectedProcessExit(err error) bool { + if err == nil { + return true + } + + var exitErr *exec.ExitError + if !errors.As(err, &exitErr) { + return false + } + + return exitErr.ExitCode() == -1 +} + +type writerFunc func([]byte) (int, error) + +func (f writerFunc) Write(data []byte) (int, error) { + return f(data) +} diff --git a/integration/internal/harness/user_stub.go b/integration/internal/harness/user_stub.go new file mode 100644 index 0000000..d238921 --- /dev/null +++ b/integration/internal/harness/user_stub.go @@ -0,0 +1,323 @@ +package harness + +import ( + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "net/url" + "strings" + "sync" + "testing" +) + +const ( + resolveByEmailPath = "/api/v1/internal/user-resolutions/by-email" + ensureByEmailPath = "/api/v1/internal/users/ensure-by-email" + blockByEmailPath = "/api/v1/internal/user-blocks/by-email" +) + +// EnsureUserCall stores one ensure-by-email request received by the external +// user-service stub. +type EnsureUserCall struct { + // Email identifies the requested login or registration e-mail. + Email string + + // PreferredLanguage stores the forwarded registration-context language. + PreferredLanguage string + + // TimeZone stores the forwarded registration-context time zone. + TimeZone string +} + +// UserStub provides one stateful external HTTP user-service stub. +type UserStub struct { + server *httptest.Server + + mu sync.Mutex + + emailToUserID map[string]string + userIDToEmail map[string]string + blockedEmails map[string]string + blockedUsers map[string]string + ensureCalls []EnsureUserCall + nextUserID int +} + +// NewUserStub starts one stateful external HTTP user-service stub. +func NewUserStub(t testing.TB) *UserStub { + t.Helper() + + stub := &UserStub{ + emailToUserID: make(map[string]string), + userIDToEmail: make(map[string]string), + blockedEmails: make(map[string]string), + blockedUsers: make(map[string]string), + nextUserID: 1, + } + stub.server = httptest.NewServer(http.HandlerFunc(stub.handle)) + t.Cleanup(stub.server.Close) + return stub +} + +// BaseURL returns the stub base URL suitable for authsession runtime wiring. +func (s *UserStub) BaseURL() string { + if s == nil || s.server == nil { + return "" + } + return s.server.URL +} + +// SeedExisting adds one existing unblocked user record into the stub state. +func (s *UserStub) SeedExisting(email string, userID string) { + s.mu.Lock() + defer s.mu.Unlock() + + s.emailToUserID[email] = userID + s.userIDToEmail[userID] = email +} + +// SeedBlockedEmail adds one blocked e-mail into the stub state. +func (s *UserStub) SeedBlockedEmail(email string, reasonCode string) { + s.mu.Lock() + defer s.mu.Unlock() + + s.blockedEmails[email] = reasonCode + if userID, ok := s.emailToUserID[email]; ok { + s.blockedUsers[userID] = reasonCode + } +} + +// EnsureCalls returns a snapshot of ensure-by-email requests observed by the +// stub so far. +func (s *UserStub) EnsureCalls() []EnsureUserCall { + s.mu.Lock() + defer s.mu.Unlock() + + cloned := make([]EnsureUserCall, len(s.ensureCalls)) + copy(cloned, s.ensureCalls) + return cloned +} + +// Reset clears all stub state and recorded calls. +func (s *UserStub) Reset() { + s.mu.Lock() + defer s.mu.Unlock() + + s.emailToUserID = make(map[string]string) + s.userIDToEmail = make(map[string]string) + s.blockedEmails = make(map[string]string) + s.blockedUsers = make(map[string]string) + s.ensureCalls = nil + s.nextUserID = 1 +} + +func (s *UserStub) handle(writer http.ResponseWriter, request *http.Request) { + switch { + case request.Method == http.MethodPost && request.URL.Path == resolveByEmailPath: + s.handleResolveByEmail(writer, request) + case request.Method == http.MethodGet && strings.HasPrefix(request.URL.Path, "/api/v1/internal/users/") && strings.HasSuffix(request.URL.Path, "/exists"): + s.handleExistsByUserID(writer, request) + case request.Method == http.MethodPost && request.URL.Path == ensureByEmailPath: + s.handleEnsureByEmail(writer, request) + case request.Method == http.MethodPost && strings.HasPrefix(request.URL.Path, "/api/v1/internal/users/") && strings.HasSuffix(request.URL.Path, "/block"): + s.handleBlockByUserID(writer, request) + case request.Method == http.MethodPost && request.URL.Path == blockByEmailPath: + s.handleBlockByEmail(writer, request) + default: + http.NotFound(writer, request) + } +} + +func (s *UserStub) handleResolveByEmail(writer http.ResponseWriter, request *http.Request) { + var payload struct { + Email string `json:"email"` + } + if err := decodeStrictJSONRequest(request, &payload); err != nil { + http.Error(writer, err.Error(), http.StatusBadRequest) + return + } + + s.mu.Lock() + defer s.mu.Unlock() + + if reason, ok := s.blockedEmails[payload.Email]; ok { + writeJSON(writer, http.StatusOK, map[string]any{ + "kind": "blocked", + "block_reason_code": reason, + }) + return + } + + if userID, ok := s.emailToUserID[payload.Email]; ok { + if reason, blocked := s.blockedUsers[userID]; blocked { + writeJSON(writer, http.StatusOK, map[string]any{ + "kind": "blocked", + "block_reason_code": reason, + }) + return + } + + writeJSON(writer, http.StatusOK, map[string]any{ + "kind": "existing", + "user_id": userID, + }) + return + } + + writeJSON(writer, http.StatusOK, map[string]any{"kind": "creatable"}) +} + +func (s *UserStub) handleExistsByUserID(writer http.ResponseWriter, request *http.Request) { + userIDValue := strings.TrimSuffix(strings.TrimPrefix(request.URL.Path, "/api/v1/internal/users/"), "/exists") + userIDValue, err := url.PathUnescape(userIDValue) + if err != nil { + http.Error(writer, err.Error(), http.StatusBadRequest) + return + } + + s.mu.Lock() + defer s.mu.Unlock() + + _, exists := s.userIDToEmail[userIDValue] + writeJSON(writer, http.StatusOK, map[string]bool{"exists": exists}) +} + +func (s *UserStub) handleEnsureByEmail(writer http.ResponseWriter, request *http.Request) { + var payload struct { + Email string `json:"email"` + RegistrationContext *struct { + PreferredLanguage string `json:"preferred_language"` + TimeZone string `json:"time_zone"` + } `json:"registration_context"` + } + if err := decodeStrictJSONRequest(request, &payload); err != nil { + http.Error(writer, err.Error(), http.StatusBadRequest) + return + } + if payload.RegistrationContext == nil { + http.Error(writer, "registration_context must be present", http.StatusBadRequest) + return + } + + s.mu.Lock() + defer s.mu.Unlock() + + s.ensureCalls = append(s.ensureCalls, EnsureUserCall{ + Email: payload.Email, + PreferredLanguage: payload.RegistrationContext.PreferredLanguage, + TimeZone: payload.RegistrationContext.TimeZone, + }) + + if reason, ok := s.blockedEmails[payload.Email]; ok { + writeJSON(writer, http.StatusOK, map[string]any{ + "outcome": "blocked", + "block_reason_code": reason, + }) + return + } + + if userID, ok := s.emailToUserID[payload.Email]; ok { + if reason, blocked := s.blockedUsers[userID]; blocked { + writeJSON(writer, http.StatusOK, map[string]any{ + "outcome": "blocked", + "block_reason_code": reason, + }) + return + } + + writeJSON(writer, http.StatusOK, map[string]any{ + "outcome": "existing", + "user_id": userID, + }) + return + } + + userID := fmt.Sprintf("user-%d", s.nextUserID) + s.nextUserID++ + s.emailToUserID[payload.Email] = userID + s.userIDToEmail[userID] = payload.Email + + writeJSON(writer, http.StatusOK, map[string]any{ + "outcome": "created", + "user_id": userID, + }) +} + +func (s *UserStub) handleBlockByUserID(writer http.ResponseWriter, request *http.Request) { + userIDValue := strings.TrimSuffix(strings.TrimPrefix(request.URL.Path, "/api/v1/internal/users/"), "/block") + userIDValue, err := url.PathUnescape(userIDValue) + if err != nil { + http.Error(writer, err.Error(), http.StatusBadRequest) + return + } + + var payload struct { + ReasonCode string `json:"reason_code"` + } + if err := decodeStrictJSONRequest(request, &payload); err != nil { + http.Error(writer, err.Error(), http.StatusBadRequest) + return + } + + s.mu.Lock() + defer s.mu.Unlock() + + email, exists := s.userIDToEmail[userIDValue] + if !exists { + writeJSON(writer, http.StatusNotFound, map[string]string{"error": "not found"}) + return + } + + outcome := "blocked" + if _, already := s.blockedUsers[userIDValue]; already { + outcome = "already_blocked" + } + s.blockedUsers[userIDValue] = payload.ReasonCode + s.blockedEmails[email] = payload.ReasonCode + + writeJSON(writer, http.StatusOK, map[string]any{ + "outcome": outcome, + "user_id": userIDValue, + }) +} + +func (s *UserStub) handleBlockByEmail(writer http.ResponseWriter, request *http.Request) { + var payload struct { + Email string `json:"email"` + ReasonCode string `json:"reason_code"` + } + if err := decodeStrictJSONRequest(request, &payload); err != nil { + http.Error(writer, err.Error(), http.StatusBadRequest) + return + } + + s.mu.Lock() + defer s.mu.Unlock() + + outcome := "blocked" + if _, already := s.blockedEmails[payload.Email]; already { + outcome = "already_blocked" + } + s.blockedEmails[payload.Email] = payload.ReasonCode + + response := map[string]any{"outcome": outcome} + if userID, ok := s.emailToUserID[payload.Email]; ok { + s.blockedUsers[userID] = payload.ReasonCode + response["user_id"] = userID + } + + writeJSON(writer, http.StatusOK, response) +} + +func writeJSON(writer http.ResponseWriter, statusCode int, value any) { + payload, err := json.Marshal(value) + if err != nil { + http.Error(writer, err.Error(), http.StatusInternalServerError) + return + } + + writer.Header().Set("Content-Type", "application/json") + writer.WriteHeader(statusCode) + _, _ = writer.Write(payload) +}