http connector first impl

This commit is contained in:
Ilia Denisov
2026-03-12 23:45:06 +02:00
committed by GitHub
parent f985370089
commit 9adadc3bbf
13 changed files with 962 additions and 41 deletions
+201
View File
@@ -3,12 +3,46 @@ package http
import (
"context"
"encoding/json"
"errors"
"fmt"
"galaxy/connector"
"math/rand/v2"
"net"
"net/http"
"net/url"
"path"
"strings"
"time"
)
const (
// checkConnectionPath is backend endpoint path used to test server reachability.
checkConnectionPath = "api/v1/status"
// checkVersionPath is backend endpoint path used to load available app versions.
checkVersionPath = "api/v1/versions"
// connectTimeout is max time for establishing TCP connection.
connectTimeout = 3 * time.Second
// responseTimeout is max time for waiting response headers from backend.
responseTimeout = 3 * time.Second
)
// defaultRetryCaps defines connect-timeout retry caps for full-jitter backoff.
var defaultRetryCaps = []time.Duration{
5 * time.Second,
15 * time.Second,
30 * time.Second,
60 * time.Second,
}
type httpConnector struct {
ctx context.Context
backendURL *url.URL // HTTP REST API Server URL
httpClient *http.Client
retryCaps []time.Duration
jitterFn func(time.Duration) time.Duration
sleepFn func(context.Context, time.Duration) error
}
func NewHttpConnector(ctx context.Context, backendURL string) (*httpConnector, error) {
@@ -19,6 +53,173 @@ func NewHttpConnector(ctx context.Context, backendURL string) (*httpConnector, e
h := &httpConnector{
ctx: ctx,
backendURL: u,
httpClient: newHTTPClient(connectTimeout, responseTimeout),
retryCaps: append([]time.Duration(nil), defaultRetryCaps...),
jitterFn: fullJitter,
sleepFn: sleepWithContext,
}
return h, nil
}
// newHTTPClient builds dedicated HTTP client with separate timeouts
// for connect and response phases.
func newHTTPClient(connectTimeout, responseTimeout time.Duration) *http.Client {
transport := http.DefaultTransport.(*http.Transport).Clone()
transport.DialContext = (&net.Dialer{
Timeout: connectTimeout,
KeepAlive: 30 * time.Second,
}).DialContext
transport.TLSHandshakeTimeout = connectTimeout
transport.ResponseHeaderTimeout = responseTimeout
return &http.Client{
Transport: transport,
}
}
func (h *httpConnector) requestContext() context.Context {
if h.ctx != nil {
return h.ctx
}
return context.Background()
}
// fullJitter calculates random wait duration in [0, cap].
func fullJitter(cap time.Duration) time.Duration {
if cap <= 0 {
return 0
}
return time.Duration(rand.Int64N(cap.Nanoseconds() + 1))
}
// sleepWithContext blocks for the given duration or until context cancellation.
func sleepWithContext(ctx context.Context, d time.Duration) error {
if d <= 0 {
select {
case <-ctx.Done():
return ctx.Err()
default:
return nil
}
}
timer := time.NewTimer(d)
defer timer.Stop()
select {
case <-ctx.Done():
return ctx.Err()
case <-timer.C:
return nil
}
}
// isConnectTimeout returns true for dial and TLS-handshake timeout errors.
func isConnectTimeout(err error) bool {
if err == nil {
return false
}
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return false
}
var urlErr *url.Error
if errors.As(err, &urlErr) {
err = urlErr.Err
}
if strings.Contains(err.Error(), "timeout awaiting response headers") {
return false
}
if strings.Contains(err.Error(), "TLS handshake timeout") {
return true
}
var opErr *net.OpError
if errors.As(err, &opErr) {
return opErr.Op == "dial" && opErr.Timeout()
}
return false
}
// CheckConnection probes backend status endpoint and reports whether server is reachable.
func (h *httpConnector) CheckConnection() bool {
resp, err := h.doRequest(h.requestContext(), checkConnectionPath)
if err != nil {
return false
}
defer resp.Body.Close()
return true
}
// CheckVersion loads available app versions from backend and returns parsed version metadata.
func (h *httpConnector) CheckVersion() ([]connector.VersionInfo, error) {
resp, err := h.doRequest(h.requestContext(), checkVersionPath)
if err != nil {
return nil, fmt.Errorf("request versions from backend: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("request versions from backend: unexpected status code %d", resp.StatusCode)
}
var versions []connector.VersionInfo
if err := json.NewDecoder(resp.Body).Decode(&versions); err != nil {
return nil, fmt.Errorf("decode versions response: %w", err)
}
return versions, nil
}
// doRequest performs GET request for a backend relative endpoint with passed context.
func (h *httpConnector) doRequest(ctx context.Context, relativePath string) (*http.Response, error) {
requestURL := *h.backendURL
requestURL.Path = path.Join(requestURL.Path, relativePath)
retryCaps := h.retryCaps
if retryCaps == nil {
retryCaps = defaultRetryCaps
}
jitterFn := h.jitterFn
if jitterFn == nil {
jitterFn = fullJitter
}
sleepFn := h.sleepFn
if sleepFn == nil {
sleepFn = sleepWithContext
}
var lastErr error
for attempt := 0; attempt <= len(retryCaps); attempt++ {
if attempt > 0 {
delay := jitterFn(retryCaps[attempt-1])
if delay < 0 {
delay = 0
}
if err := sleepFn(ctx, delay); err != nil {
return nil, err
}
}
req, err := http.NewRequestWithContext(ctx, http.MethodGet, requestURL.String(), nil)
if err != nil {
return nil, fmt.Errorf("create request: %w", err)
}
resp, err := h.httpClient.Do(req)
if err == nil {
return resp, nil
}
if !isConnectTimeout(err) {
return nil, err
}
lastErr = err
}
return nil, lastErr
}
+595
View File
@@ -0,0 +1,595 @@
package http
import (
"context"
"errors"
"galaxy/connector"
"io"
"net"
stdhttp "net/http"
"net/http/httptest"
"net/url"
"reflect"
"strings"
"testing"
"time"
)
// checkConnectionCase describes one CheckConnection behavior scenario.
type checkConnectionCase struct {
name string
setup func(t *testing.T) (*httpConnector, <-chan string)
want bool
wantPath string
}
// checkVersionCase describes one CheckVersion behavior scenario.
type checkVersionCase struct {
name string
setup func(t *testing.T) (*httpConnector, <-chan string)
want []connector.VersionInfo
wantErr bool
wantPath string
}
// TestCheckConnection verifies backend reachability probe behavior.
func TestCheckConnection(t *testing.T) {
tests := []checkConnectionCase{
{
name: "status 200 returns true",
setup: func(t *testing.T) (*httpConnector, <-chan string) {
return newServerConnector(t, context.Background(), stdhttp.StatusOK, "")
},
want: true,
wantPath: "/api/v1/status",
},
{
name: "non-2xx status returns true",
setup: func(t *testing.T) (*httpConnector, <-chan string) {
return newServerConnector(t, context.Background(), stdhttp.StatusServiceUnavailable, "")
},
want: true,
wantPath: "/api/v1/status",
},
{
name: "canceled context returns false",
setup: func(t *testing.T) (*httpConnector, <-chan string) {
ctx, cancel := context.WithCancel(context.Background())
cancel()
conn, err := NewHttpConnector(ctx, "http://127.0.0.1")
if err != nil {
t.Fatalf("NewHttpConnector() error = %v", err)
}
return conn, nil
},
want: false,
},
{
name: "transport failure returns false",
setup: func(t *testing.T) (*httpConnector, <-chan string) {
return newUnreachableConnector(t, context.Background()), nil
},
want: false,
},
{
name: "backend path prefix is preserved",
setup: func(t *testing.T) (*httpConnector, <-chan string) {
return newServerConnector(t, context.Background(), stdhttp.StatusOK, "/base")
},
want: true,
wantPath: "/base/api/v1/status",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
conn, pathCh := tt.setup(t)
got := conn.CheckConnection()
if got != tt.want {
t.Fatalf("CheckConnection() = %v, want %v", got, tt.want)
}
if tt.wantPath == "" {
return
}
select {
case gotPath := <-pathCh:
if gotPath != tt.wantPath {
t.Fatalf("request path = %q, want %q", gotPath, tt.wantPath)
}
default:
t.Fatalf("expected request path %q, got no request", tt.wantPath)
}
})
}
}
// TestCheckVersion verifies versions retrieval behavior.
func TestCheckVersion(t *testing.T) {
tests := []checkVersionCase{
{
name: "status 200 with valid body returns versions",
setup: func(t *testing.T) (*httpConnector, <-chan string) {
return newVersionServerConnector(
t,
context.Background(),
stdhttp.StatusOK,
`[{"os":"darwin","version":"1.2.3","url":"https://example.com/darwin"}]`,
"",
)
},
want: []connector.VersionInfo{
{OS: "darwin", Version: "1.2.3", URL: "https://example.com/darwin"},
},
wantPath: "/api/v1/versions",
},
{
name: "status 200 with invalid json returns error",
setup: func(t *testing.T) (*httpConnector, <-chan string) {
return newVersionServerConnector(
t,
context.Background(),
stdhttp.StatusOK,
`{"versions":`,
"",
)
},
wantErr: true,
wantPath: "/api/v1/versions",
},
{
name: "non-200 status returns error",
setup: func(t *testing.T) (*httpConnector, <-chan string) {
return newVersionServerConnector(
t,
context.Background(),
stdhttp.StatusServiceUnavailable,
`[]`,
"",
)
},
wantErr: true,
wantPath: "/api/v1/versions",
},
{
name: "canceled context returns error",
setup: func(t *testing.T) (*httpConnector, <-chan string) {
ctx, cancel := context.WithCancel(context.Background())
cancel()
conn, err := NewHttpConnector(ctx, "http://127.0.0.1")
if err != nil {
t.Fatalf("NewHttpConnector() error = %v", err)
}
return conn, nil
},
wantErr: true,
},
{
name: "transport failure returns error",
setup: func(t *testing.T) (*httpConnector, <-chan string) {
return newUnreachableConnector(t, context.Background()), nil
},
wantErr: true,
},
{
name: "backend path prefix is preserved",
setup: func(t *testing.T) (*httpConnector, <-chan string) {
return newVersionServerConnector(
t,
context.Background(),
stdhttp.StatusOK,
`[{"os":"linux","version":"2.0.0","url":"https://example.com/linux"}]`,
"/base",
)
},
want: []connector.VersionInfo{
{OS: "linux", Version: "2.0.0", URL: "https://example.com/linux"},
},
wantPath: "/base/api/v1/versions",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
conn, pathCh := tt.setup(t)
got, err := conn.CheckVersion()
if tt.wantErr {
if err == nil {
t.Fatal("CheckVersion() error = nil, want non-nil")
}
} else {
if err != nil {
t.Fatalf("CheckVersion() error = %v, want nil", err)
}
if !reflect.DeepEqual(got, tt.want) {
t.Fatalf("CheckVersion() = %#v, want %#v", got, tt.want)
}
}
if tt.wantPath == "" {
return
}
select {
case gotPath := <-pathCh:
if gotPath != tt.wantPath {
t.Fatalf("request path = %q, want %q", gotPath, tt.wantPath)
}
default:
t.Fatalf("expected request path %q, got no request", tt.wantPath)
}
})
}
}
// TestDoRequestUsesPassedContext verifies request context is provided by caller.
func TestDoRequestUsesPassedContext(t *testing.T) {
conn, pathCh := newServerConnector(t, context.Background(), stdhttp.StatusOK, "")
ctx, cancel := context.WithCancel(context.Background())
cancel()
_, err := conn.doRequest(ctx, checkConnectionPath)
if err == nil {
t.Fatal("doRequest() error = nil, want non-nil")
}
if !errors.Is(err, context.Canceled) {
t.Fatalf("doRequest() error = %v, want context canceled", err)
}
select {
case gotPath := <-pathCh:
t.Fatalf("expected no request with canceled context, got %q", gotPath)
default:
}
}
// TestDoRequestResponseHeaderTimeout verifies client distinguishes response timeout.
func TestDoRequestResponseHeaderTimeout(t *testing.T) {
const (
dialTimeout = time.Second
headerTimeout = 30 * time.Millisecond
serverHeaderDelayTime = 150 * time.Millisecond
)
server := httptest.NewServer(stdhttp.HandlerFunc(func(w stdhttp.ResponseWriter, r *stdhttp.Request) {
time.Sleep(serverHeaderDelayTime)
w.WriteHeader(stdhttp.StatusOK)
}))
t.Cleanup(server.Close)
backendURL, err := url.Parse(server.URL)
if err != nil {
t.Fatalf("parse backend URL error = %v", err)
}
conn := &httpConnector{
ctx: context.Background(),
backendURL: backendURL,
httpClient: newHTTPClient(dialTimeout, headerTimeout),
}
start := time.Now()
_, err = conn.doRequest(context.Background(), checkConnectionPath)
elapsed := time.Since(start)
if err == nil {
t.Fatal("doRequest() error = nil, want timeout")
}
if elapsed < headerTimeout {
t.Fatalf("doRequest() elapsed = %v, want >= %v", elapsed, headerTimeout)
}
var netErr net.Error
if !errors.As(err, &netErr) || !netErr.Timeout() {
t.Fatalf("doRequest() error = %v, want timeout error", err)
}
}
// TestDoRequestSuccessFirstAttemptNoRetry verifies successful call does not schedule retries.
func TestDoRequestSuccessFirstAttemptNoRetry(t *testing.T) {
attempts := 0
sleepCalls := 0
jitterCalls := 0
conn := newTransportConnector(t, func(req *stdhttp.Request) (*stdhttp.Response, error) {
attempts++
return &stdhttp.Response{
StatusCode: stdhttp.StatusOK,
Body: io.NopCloser(strings.NewReader("")),
}, nil
})
conn.jitterFn = func(cap time.Duration) time.Duration {
jitterCalls++
return cap
}
conn.sleepFn = func(ctx context.Context, d time.Duration) error {
sleepCalls++
return nil
}
resp, err := conn.doRequest(context.Background(), checkConnectionPath)
if err != nil {
t.Fatalf("doRequest() error = %v, want nil", err)
}
defer resp.Body.Close()
if attempts != 1 {
t.Fatalf("attempts = %d, want 1", attempts)
}
if jitterCalls != 0 {
t.Fatalf("jitter calls = %d, want 0", jitterCalls)
}
if sleepCalls != 0 {
t.Fatalf("sleep calls = %d, want 0", sleepCalls)
}
}
// TestDoRequestConnectTimeoutRetriesWithJitter verifies retries for connect timeout errors.
func TestDoRequestConnectTimeoutRetriesWithJitter(t *testing.T) {
attempts := 0
jitterCaps := make([]time.Duration, 0)
sleepDurations := make([]time.Duration, 0)
conn := newTransportConnector(t, func(req *stdhttp.Request) (*stdhttp.Response, error) {
attempts++
if attempts <= 2 {
return nil, newDialTimeoutError()
}
return &stdhttp.Response{
StatusCode: stdhttp.StatusOK,
Body: io.NopCloser(strings.NewReader("")),
}, nil
})
conn.jitterFn = func(cap time.Duration) time.Duration {
jitterCaps = append(jitterCaps, cap)
return cap
}
conn.sleepFn = func(ctx context.Context, d time.Duration) error {
sleepDurations = append(sleepDurations, d)
return nil
}
resp, err := conn.doRequest(context.Background(), checkConnectionPath)
if err != nil {
t.Fatalf("doRequest() error = %v, want nil", err)
}
defer resp.Body.Close()
if attempts != 3 {
t.Fatalf("attempts = %d, want 3", attempts)
}
wantCaps := []time.Duration{5 * time.Second, 15 * time.Second}
if !reflect.DeepEqual(jitterCaps, wantCaps) {
t.Fatalf("jitter caps = %v, want %v", jitterCaps, wantCaps)
}
if !reflect.DeepEqual(sleepDurations, wantCaps) {
t.Fatalf("sleep durations = %v, want %v", sleepDurations, wantCaps)
}
}
// TestDoRequestConnectTimeoutExhaustsRetries verifies retry count and final timeout error.
func TestDoRequestConnectTimeoutExhaustsRetries(t *testing.T) {
attempts := 0
jitterCaps := make([]time.Duration, 0)
sleepDurations := make([]time.Duration, 0)
conn := newTransportConnector(t, func(req *stdhttp.Request) (*stdhttp.Response, error) {
attempts++
return nil, newDialTimeoutError()
})
conn.jitterFn = func(cap time.Duration) time.Duration {
jitterCaps = append(jitterCaps, cap)
return cap
}
conn.sleepFn = func(ctx context.Context, d time.Duration) error {
sleepDurations = append(sleepDurations, d)
return nil
}
_, err := conn.doRequest(context.Background(), checkConnectionPath)
if err == nil {
t.Fatal("doRequest() error = nil, want timeout")
}
if !isConnectTimeout(err) {
t.Fatalf("doRequest() error = %v, want connect timeout", err)
}
wantCaps := append([]time.Duration(nil), defaultRetryCaps...)
if attempts != len(wantCaps)+1 {
t.Fatalf("attempts = %d, want %d", attempts, len(wantCaps)+1)
}
if !reflect.DeepEqual(jitterCaps, wantCaps) {
t.Fatalf("jitter caps = %v, want %v", jitterCaps, wantCaps)
}
if !reflect.DeepEqual(sleepDurations, wantCaps) {
t.Fatalf("sleep durations = %v, want %v", sleepDurations, wantCaps)
}
}
// TestDoRequestResponseTimeoutNoRetry verifies response timeout does not trigger retries.
func TestDoRequestResponseTimeoutNoRetry(t *testing.T) {
attempts := 0
sleepCalls := 0
jitterCalls := 0
conn := newTransportConnector(t, func(req *stdhttp.Request) (*stdhttp.Response, error) {
attempts++
return nil, newResponseHeaderTimeoutError()
})
conn.jitterFn = func(cap time.Duration) time.Duration {
jitterCalls++
return cap
}
conn.sleepFn = func(ctx context.Context, d time.Duration) error {
sleepCalls++
return nil
}
_, err := conn.doRequest(context.Background(), checkConnectionPath)
if err == nil {
t.Fatal("doRequest() error = nil, want timeout")
}
var netErr net.Error
if !errors.As(err, &netErr) || !netErr.Timeout() {
t.Fatalf("doRequest() error = %v, want timeout error", err)
}
if attempts != 1 {
t.Fatalf("attempts = %d, want 1", attempts)
}
if jitterCalls != 0 {
t.Fatalf("jitter calls = %d, want 0", jitterCalls)
}
if sleepCalls != 0 {
t.Fatalf("sleep calls = %d, want 0", sleepCalls)
}
}
// TestDoRequestContextCanceledDuringBackoff verifies cancellation interrupts retry wait.
func TestDoRequestContextCanceledDuringBackoff(t *testing.T) {
attempts := 0
sleepCalls := 0
conn := newTransportConnector(t, func(req *stdhttp.Request) (*stdhttp.Response, error) {
attempts++
return nil, newDialTimeoutError()
})
ctx, cancel := context.WithCancel(context.Background())
conn.jitterFn = func(cap time.Duration) time.Duration {
return cap
}
conn.sleepFn = func(ctx context.Context, d time.Duration) error {
sleepCalls++
cancel()
return ctx.Err()
}
_, err := conn.doRequest(ctx, checkConnectionPath)
if !errors.Is(err, context.Canceled) {
t.Fatalf("doRequest() error = %v, want context canceled", err)
}
if attempts != 1 {
t.Fatalf("attempts = %d, want 1", attempts)
}
if sleepCalls != 1 {
t.Fatalf("sleep calls = %d, want 1", sleepCalls)
}
}
type roundTripperFunc func(req *stdhttp.Request) (*stdhttp.Response, error)
// RoundTrip implements [http.RoundTripper].
func (f roundTripperFunc) RoundTrip(req *stdhttp.Request) (*stdhttp.Response, error) {
return f(req)
}
// timeoutError simulates a timeout error returned by transport.
type timeoutError struct {
message string
}
func (e timeoutError) Error() string {
return e.message
}
func (e timeoutError) Timeout() bool {
return true
}
func (e timeoutError) Temporary() bool {
return true
}
// newDialTimeoutError builds a connect timeout shaped like dial failure.
func newDialTimeoutError() error {
return &net.OpError{
Op: "dial",
Net: "tcp",
Err: timeoutError{message: "i/o timeout"},
}
}
// newResponseHeaderTimeoutError builds timeout error for response header wait.
func newResponseHeaderTimeoutError() error {
return timeoutError{message: "net/http: timeout awaiting response headers"}
}
// newTransportConnector creates connector with custom round tripper for request tests.
func newTransportConnector(t *testing.T, transport roundTripperFunc) *httpConnector {
t.Helper()
backendURL, err := url.Parse("http://example.com")
if err != nil {
t.Fatalf("parse backend URL error = %v", err)
}
return &httpConnector{
ctx: context.Background(),
backendURL: backendURL,
httpClient: &stdhttp.Client{Transport: transport},
retryCaps: append([]time.Duration(nil), defaultRetryCaps...),
}
}
// newServerConnector creates connector backed by an HTTP test server and captures requested path.
func newServerConnector(t *testing.T, ctx context.Context, status int, backendPath string) (*httpConnector, <-chan string) {
t.Helper()
pathCh := make(chan string, 1)
server := httptest.NewServer(stdhttp.HandlerFunc(func(w stdhttp.ResponseWriter, r *stdhttp.Request) {
pathCh <- r.URL.Path
w.WriteHeader(status)
}))
t.Cleanup(server.Close)
conn, err := NewHttpConnector(ctx, server.URL+backendPath)
if err != nil {
t.Fatalf("NewHttpConnector() error = %v", err)
}
return conn, pathCh
}
// newVersionServerConnector creates connector with configurable response body for versions endpoint tests.
func newVersionServerConnector(t *testing.T, ctx context.Context, status int, body, backendPath string) (*httpConnector, <-chan string) {
t.Helper()
pathCh := make(chan string, 1)
server := httptest.NewServer(stdhttp.HandlerFunc(func(w stdhttp.ResponseWriter, r *stdhttp.Request) {
pathCh <- r.URL.Path
w.WriteHeader(status)
_, _ = w.Write([]byte(body))
}))
t.Cleanup(server.Close)
conn, err := NewHttpConnector(ctx, server.URL+backendPath)
if err != nil {
t.Fatalf("NewHttpConnector() error = %v", err)
}
return conn, pathCh
}
// newUnreachableConnector creates connector pointing to a closed localhost TCP address.
func newUnreachableConnector(t *testing.T, ctx context.Context) *httpConnector {
t.Helper()
ln, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatalf("listen error = %v", err)
}
addr := ln.Addr().String()
if err := ln.Close(); err != nil {
t.Fatalf("close listener error = %v", err)
}
conn, err := NewHttpConnector(ctx, "http://"+addr)
if err != nil {
t.Fatalf("NewHttpConnector() error = %v", err)
}
return conn
}