49 lines
1.3 KiB
Go
49 lines
1.3 KiB
Go
package telemetry
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
|
|
"galaxy/gateway/internal/push"
|
|
|
|
"go.opentelemetry.io/otel/attribute"
|
|
)
|
|
|
|
// PushObserver adapts Runtime to the push.Observer interface.
|
|
type PushObserver struct {
|
|
runtime *Runtime
|
|
}
|
|
|
|
// NewPushObserver constructs a push stream observer backed by runtime.
|
|
func NewPushObserver(runtime *Runtime) *PushObserver {
|
|
return &PushObserver{runtime: runtime}
|
|
}
|
|
|
|
// Registered records one active push stream.
|
|
func (o *PushObserver) Registered(_ push.StreamBinding) {
|
|
if o == nil || o.runtime == nil {
|
|
return
|
|
}
|
|
|
|
o.runtime.AddActivePushStream(context.Background(), 1)
|
|
}
|
|
|
|
// Unregistered records one active-stream decrement and one closure reason for
|
|
// hub-enforced shutdown, overflow, or revocation.
|
|
func (o *PushObserver) Unregistered(_ push.StreamBinding, err error) {
|
|
if o == nil || o.runtime == nil {
|
|
return
|
|
}
|
|
|
|
o.runtime.AddActivePushStream(context.Background(), -1)
|
|
|
|
switch {
|
|
case errors.Is(err, push.ErrSubscriptionOverflow):
|
|
o.runtime.RecordPushStreamClosure(context.Background(), attribute.String("reason", "overflow"))
|
|
case errors.Is(err, push.ErrSubscriptionRevoked):
|
|
o.runtime.RecordPushStreamClosure(context.Background(), attribute.String("reason", "revoked"))
|
|
case errors.Is(err, push.ErrHubShuttingDown):
|
|
o.runtime.RecordPushStreamClosure(context.Background(), attribute.String("reason", "shutdown"))
|
|
}
|
|
}
|