78 lines
2.2 KiB
Go
78 lines
2.2 KiB
Go
package push_test
|
|
|
|
import (
|
|
"testing"
|
|
"time"
|
|
|
|
"galaxy/gateway/internal/push"
|
|
"galaxy/gateway/internal/telemetry"
|
|
"galaxy/gateway/internal/testutil"
|
|
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
)
|
|
|
|
func TestHubObserverClassifiesClosureReasons(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
logger, _ := testutil.NewObservedLogger(t)
|
|
telemetryRuntime := testutil.NewTelemetryRuntime(t, logger)
|
|
hub := push.NewHubWithObserver(1, telemetry.NewPushObserver(telemetryRuntime))
|
|
|
|
overflow, err := hub.Register(push.StreamBinding{
|
|
UserID: "user-123",
|
|
DeviceSessionID: "device-session-overflow",
|
|
})
|
|
require.NoError(t, err)
|
|
revoked, err := hub.Register(push.StreamBinding{
|
|
UserID: "user-123",
|
|
DeviceSessionID: "device-session-revoked",
|
|
})
|
|
require.NoError(t, err)
|
|
shutdown, err := hub.Register(push.StreamBinding{
|
|
UserID: "user-123",
|
|
DeviceSessionID: "device-session-shutdown",
|
|
})
|
|
require.NoError(t, err)
|
|
|
|
hub.Publish(push.Event{
|
|
UserID: "user-123",
|
|
DeviceSessionID: "device-session-overflow",
|
|
EventType: "fleet.updated",
|
|
EventID: "event-1",
|
|
PayloadBytes: []byte("payload-1"),
|
|
})
|
|
hub.Publish(push.Event{
|
|
UserID: "user-123",
|
|
DeviceSessionID: "device-session-overflow",
|
|
EventType: "fleet.updated",
|
|
EventID: "event-2",
|
|
PayloadBytes: []byte("payload-2"),
|
|
})
|
|
hub.RevokeDeviceSession("device-session-revoked")
|
|
hub.Shutdown()
|
|
|
|
select {
|
|
case <-overflow.Done():
|
|
case <-time.After(time.Second):
|
|
require.FailNow(t, "overflow subscription did not close")
|
|
}
|
|
select {
|
|
case <-revoked.Done():
|
|
case <-time.After(time.Second):
|
|
require.FailNow(t, "revoked subscription did not close")
|
|
}
|
|
select {
|
|
case <-shutdown.Done():
|
|
case <-time.After(time.Second):
|
|
require.FailNow(t, "shutdown subscription did not close")
|
|
}
|
|
|
|
metricsText := testutil.ScrapeMetrics(t, telemetryRuntime.Handler())
|
|
assert.Contains(t, metricsText, `gateway_push_stream_closures_total`)
|
|
assert.Contains(t, metricsText, `reason="overflow"`)
|
|
assert.Contains(t, metricsText, `reason="revoked"`)
|
|
assert.Contains(t, metricsText, `reason="shutdown"`)
|
|
assert.Contains(t, metricsText, `gateway_push_active_streams`)
|
|
}
|