165 lines
5.0 KiB
Go
165 lines
5.0 KiB
Go
package grpcapi
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"crypto/sha256"
|
|
|
|
"galaxy/gateway/authn"
|
|
"galaxy/gateway/internal/clock"
|
|
gatewayv1 "galaxy/gateway/proto/galaxy/gateway/v1"
|
|
gatewayfbs "galaxy/schema/fbs/gateway"
|
|
|
|
flatbuffers "github.com/google/flatbuffers/go"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/codes"
|
|
"google.golang.org/grpc/status"
|
|
)
|
|
|
|
// serverTimeEventType is the event type stamped on the initial signed
// server-time event; the same value is used in the signing fields and in the
// GatewayEvent written to the stream.
const serverTimeEventType = "gateway.server_time"
|
|
|
|
// authenticatedStreamBinding is the verified identity tied to a single
// authenticated SubscribeEvents stream. It is assembled only after the full
// ingress pipeline has succeeded, from the resolved session record and the
// parsed request envelope.
type authenticatedStreamBinding struct {
	// UserID is taken from the resolved session record.
	UserID string
	// DeviceSessionID is taken from the resolved session record.
	DeviceSessionID string
	// MessageType is taken from the parsed request envelope.
	MessageType string
	// RequestID is taken from the parsed request envelope.
	RequestID string
	// TraceID is taken from the parsed request envelope.
	TraceID string
}
|
|
|
|
// authenticatedStreamBindingFromContext returns the verified stream binding
|
|
// previously attached to ctx by the authenticated push-stream service.
|
|
func authenticatedStreamBindingFromContext(ctx context.Context) (authenticatedStreamBinding, bool) {
|
|
if ctx == nil {
|
|
return authenticatedStreamBinding{}, false
|
|
}
|
|
|
|
binding, ok := ctx.Value(authenticatedStreamBindingContextKey{}).(authenticatedStreamBinding)
|
|
if !ok {
|
|
return authenticatedStreamBinding{}, false
|
|
}
|
|
|
|
return binding, true
|
|
}
|
|
|
|
// authenticatedPushStreamService owns SubscribeEvents bootstrap behavior:
|
|
// bind the authenticated stream, send the initial signed server-time event,
|
|
// and then hand the stream lifecycle to the configured tail delegate.
|
|
type authenticatedPushStreamService struct {
|
|
gatewayv1.UnimplementedEdgeGatewayServer
|
|
|
|
tailDelegate gatewayv1.EdgeGatewayServer
|
|
responseSigner authn.ResponseSigner
|
|
clock clock.Clock
|
|
}
|
|
|
|
// SubscribeEvents binds the verified stream identity, sends the initial signed
|
|
// server-time event, and then delegates the remaining lifecycle.
|
|
func (s authenticatedPushStreamService) SubscribeEvents(req *gatewayv1.SubscribeEventsRequest, stream grpc.ServerStreamingServer[gatewayv1.GatewayEvent]) error {
|
|
envelope, ok := parsedEnvelopeFromContext(stream.Context())
|
|
if !ok {
|
|
return status.Error(codes.Internal, "authenticated request context is incomplete")
|
|
}
|
|
|
|
record, ok := resolvedSessionFromContext(stream.Context())
|
|
if !ok {
|
|
return status.Error(codes.Internal, "authenticated request context is incomplete")
|
|
}
|
|
|
|
binding := authenticatedStreamBinding{
|
|
UserID: record.UserID,
|
|
DeviceSessionID: record.DeviceSessionID,
|
|
MessageType: envelope.MessageType,
|
|
RequestID: envelope.RequestID,
|
|
TraceID: envelope.TraceID,
|
|
}
|
|
boundStream := authenticatedStreamContextStream{
|
|
ServerStreamingServer: stream,
|
|
ctx: context.WithValue(
|
|
stream.Context(),
|
|
authenticatedStreamBindingContextKey{},
|
|
binding,
|
|
),
|
|
}
|
|
|
|
serverTimeMS := s.clock.Now().UTC().UnixMilli()
|
|
payloadBytes := buildServerTimeEventPayload(serverTimeMS)
|
|
payloadHash := sha256.Sum256(payloadBytes)
|
|
signature, err := s.responseSigner.SignEvent(authn.EventSigningFields{
|
|
EventType: serverTimeEventType,
|
|
EventID: envelope.RequestID,
|
|
TimestampMS: serverTimeMS,
|
|
RequestID: envelope.RequestID,
|
|
TraceID: envelope.TraceID,
|
|
PayloadHash: payloadHash[:],
|
|
})
|
|
if err != nil {
|
|
return status.Error(codes.Unavailable, "response signer is unavailable")
|
|
}
|
|
|
|
if err := boundStream.Send(&gatewayv1.GatewayEvent{
|
|
EventType: serverTimeEventType,
|
|
EventId: envelope.RequestID,
|
|
TimestampMs: serverTimeMS,
|
|
PayloadBytes: bytes.Clone(payloadBytes),
|
|
PayloadHash: bytes.Clone(payloadHash[:]),
|
|
Signature: signature,
|
|
RequestId: envelope.RequestID,
|
|
TraceId: envelope.TraceID,
|
|
}); err != nil {
|
|
return err
|
|
}
|
|
|
|
return s.tailDelegate.SubscribeEvents(req, boundStream)
|
|
}
|
|
|
|
func newAuthenticatedPushStreamService(tailDelegate gatewayv1.EdgeGatewayServer, responseSigner authn.ResponseSigner, clk clock.Clock) gatewayv1.EdgeGatewayServer {
|
|
if tailDelegate == nil {
|
|
tailDelegate = holdOpenSubscribeEventsService{}
|
|
}
|
|
|
|
return authenticatedPushStreamService{
|
|
tailDelegate: tailDelegate,
|
|
responseSigner: responseSigner,
|
|
clock: clk,
|
|
}
|
|
}
|
|
|
|
func buildServerTimeEventPayload(serverTimeMS int64) []byte {
|
|
builder := flatbuffers.NewBuilder(32)
|
|
gatewayfbs.ServerTimeEventStart(builder)
|
|
gatewayfbs.ServerTimeEventAddServerTimeMs(builder, serverTimeMS)
|
|
eventOffset := gatewayfbs.ServerTimeEventEnd(builder)
|
|
gatewayfbs.FinishServerTimeEventBuffer(builder, eventOffset)
|
|
|
|
return bytes.Clone(builder.FinishedBytes())
|
|
}
|
|
|
|
// authenticatedStreamBindingContextKey is the unexported context key under
// which the authenticatedStreamBinding is stored on the stream context.
type authenticatedStreamBindingContextKey struct{}
|
|
|
|
type authenticatedStreamContextStream struct {
|
|
grpc.ServerStreamingServer[gatewayv1.GatewayEvent]
|
|
ctx context.Context
|
|
}
|
|
|
|
func (s authenticatedStreamContextStream) Context() context.Context {
|
|
if s.ctx == nil {
|
|
return context.Background()
|
|
}
|
|
|
|
return s.ctx
|
|
}
|
|
|
|
type holdOpenSubscribeEventsService struct {
|
|
gatewayv1.UnimplementedEdgeGatewayServer
|
|
}
|
|
|
|
func (holdOpenSubscribeEventsService) SubscribeEvents(_ *gatewayv1.SubscribeEventsRequest, stream grpc.ServerStreamingServer[gatewayv1.GatewayEvent]) error {
|
|
<-stream.Context().Done()
|
|
return stream.Context().Err()
|
|
}
|
|
|
|
// Compile-time check that authenticatedPushStreamService satisfies
// gatewayv1.EdgeGatewayServer.
var _ gatewayv1.EdgeGatewayServer = authenticatedPushStreamService{}
|