VOOZH about

URL: https://deepwiki.com/friendsofhyperf/components/2.4-message-queue-and-async-job-tracing

⇱ Message Queue and Async Job Tracing | friendsofhyperf/components | DeepWiki


Loading...
Last indexed: 14 February 2026 (15d5ca)
Menu

Message Queue and Async Job Tracing

This document explains how Sentry traces messages and async jobs across AMQP, Kafka, and Hyperf's AsyncQueue systems. It covers the automatic injection of trace context into messages, trace continuation on the consumer side, and the mechanisms used to maintain distributed tracing across asynchronous boundaries.

For the overall distributed tracing architecture and transaction/span hierarchy, see Distributed Tracing Architecture. For details on the AOP aspects that implement this instrumentation, see AOP-Based Instrumentation. For coroutine-safe context storage, see Coroutine Context Management and SentryContext.

Supported Message Queue Systems

The Sentry integration provides automatic tracing for three message queue systems:

Queue SystemProducer AspectConfiguration KeyOperation
AMQP/RabbitMQAmqpProducerAspectsentry.tracing.amqp.enablequeue.publish
Apache KafkaKafkaProducerAspectsentry.tracing.kafka.enablequeue.publish
Hyperf AsyncQueueAsyncQueueJobMessageAspectsentry.tracing.async_queue.enablequeue.publish

Each aspect intercepts message publishing operations and automatically:

  1. Creates a tracing span with operation queue.publish
  2. Injects trace context (traceparent and baggage) into the message
  3. Records message metadata (size, destination, ID) in the span

Sources: src/sentry/src/Tracing/Aspect/AmqpProducerAspect.php1-119 src/sentry/src/Tracing/Aspect/KafkaProducerAspect.php1-128 src/sentry/src/Tracing/Aspect/AsyncQueueJobMessageAspect.php1-174

Trace Propagation Flow


Trace Propagation Mechanism:

  1. Producer Side: The aspect intercepts the publish operation, extracts the current span, creates a Carrier object containing trace context and metadata, and injects it into the message.

  2. Consumer Side: The message handler extracts the carrier from the message, deserializes it, and uses continueTrace() to create a child transaction linked to the original trace.

Sources: src/sentry/src/Tracing/Aspect/AmqpProducerAspect.php82-116 src/sentry/src/Tracing/Aspect/KafkaProducerAspect.php56-94 src/sentry/src/Tracing/Aspect/AsyncQueueJobMessageAspect.php96-119

Carrier Object and Trace Context

The Carrier utility class encapsulates trace context for cross-service propagation:


Carrier Structure:

  • sentry-trace: W3C Trace Context traceparent header (trace ID, span ID, sampled flag)
  • baggage: Dynamic sampling context and custom data
  • Metadata: Additional context added via with() method including publish timestamp, message ID, destination name, and body size

Sources: src/sentry/src/Util/Carrier.php (referenced in aspects), src/sentry/src/Tracing/Aspect/AmqpProducerAspect.php85-91 src/sentry/src/Tracing/Aspect/KafkaProducerAspect.php66-72

AMQP/RabbitMQ Tracing

The AmqpProducerAspect intercepts AMQP message publishing operations:


Implementation Details:

The aspect wraps the produceMessage call in a trace() function that creates a span with operation queue.publish src/sentry/src/Tracing/Aspect/AmqpProducerAspect.php82-116 It injects the carrier into the AMQP message's application_headers table using the key Constants::TRACE_CARRIER src/sentry/src/Tracing/Aspect/AmqpProducerAspect.php92-95

Span Data Recorded:

  • messaging.system: "amqp"
  • messaging.operation: "publish"
  • messaging.message.id: Generated Sentry UID
  • messaging.message.body.size: Payload size in bytes
  • messaging.destination.name: Routing key(s) or exchange name
  • messaging.amqp.message.type: Producer message type
  • messaging.amqp.message.routing_key: AMQP routing key
  • messaging.amqp.message.exchange: AMQP exchange name
  • messaging.amqp.message.pool_name: Connection pool name

The aspect reads @Producer annotation metadata when available to populate routing key, exchange, and pool name src/sentry/src/Tracing/Aspect/AmqpProducerAspect.php68-76

Sources: src/sentry/src/Tracing/Aspect/AmqpProducerAspect.php1-119 src/sentry/src/Constants.php

Kafka Tracing

The KafkaProducerAspect intercepts Kafka producer operations and supports both single and batch message publishing:


Single Message Publishing (sendAsync):

For single message operations src/sentry/src/Tracing/Aspect/KafkaProducerAspect.php56-94:

  1. Generates a unique message ID via SentryUid::generate()
  2. Creates a span with the topic as destination name
  3. Injects the carrier as a RecordHeader with key Constants::TRACE_CARRIER

Batch Message Publishing (sendBatchAsync):

For batch operations src/sentry/src/Tracing/Aspect/KafkaProducerAspect.php96-126:

  1. Creates a single parent span for the entire batch
  2. Iterates over each ProduceMessage in the array
  3. Injects a unique carrier into each message's headers
  4. Records the batch count in span data: message.count

Span Data Recorded:

  • messaging.system: "kafka"
  • messaging.operation: "publish"
  • messaging.message.id: Generated Sentry UID (per message)
  • messaging.message.body.size: Message value size in bytes
  • messaging.destination.name: Kafka topic name
  • message.count: Number of messages in batch (batch only)

Sources: src/sentry/src/Tracing/Aspect/KafkaProducerAspect.php1-128 src/sentry/src/Constants.php

AsyncQueue Tracing and Serialization

The AsyncQueueJobMessageAspect provides comprehensive tracing for Hyperf's AsyncQueue system by intercepting multiple lifecycle methods:


Aspect Interception Methods:

  1. handleGet src/sentry/src/Tracing/Aspect/AsyncQueueJobMessageAspect.php64-72: Captures the queue destination name from DriverFactory::get() and stores it in Context for later use by handlePush.

  2. handlePush src/sentry/src/Tracing/Aspect/AsyncQueueJobMessageAspect.php74-120:

    • Wraps the push operation in a trace() call
    • Extracts job class and metadata
    • Creates a Carrier and stores it in SentryContext for serialization
    • Records span data including pool configuration for RedisDriver
  3. handleSerialize src/sentry/src/Tracing/Aspect/AsyncQueueJobMessageAspect.php141-154:

    • Intercepts JobMessage::__serialize() after processing
    • Retrieves the carrier from SentryContext::getCarrier()
    • Injects carrier JSON into the serialized array
    • Handles both list-based and associative array formats
  4. handleUnserialize src/sentry/src/Tracing/Aspect/AsyncQueueJobMessageAspect.php156-172:

    • Intercepts JobMessage::__unserialize() before processing
    • Extracts carrier JSON from the serialized data
    • Restores carrier to SentryContext::setCarrier()
    • Enables trace continuation in the consumer

Serialization Format:


The aspect supports two serialization formats src/sentry/src/Tracing/Aspect/AsyncQueueJobMessageAspect.php143-149:

  • List format: Carrier JSON is appended as the last element
  • Associative format: Carrier JSON is stored under key Constants::TRACE_CARRIER

RedisDriver Span Data:

When the driver is a RedisDriver, additional span data is collected src/sentry/src/Tracing/Aspect/AsyncQueueJobMessageAspect.php122-139:

  • async_queue.channel: Redis channel name from ChannelConfig
  • async_queue.redis_pool: Redis pool name used by the driver

Span Data Recorded:

  • messaging.system: "async_queue"
  • messaging.operation: "publish"
  • messaging.message.id: Job ID (from getId() method or generated UID)
  • messaging.message.body.size: Packed job size in bytes
  • messaging.destination.name: Queue name from Context

Sources: src/sentry/src/Tracing/Aspect/AsyncQueueJobMessageAspect.php1-174 src/sentry/src/Constants.php src/sentry/src/SentryContext.php

Trace Continuation in Consumers

Consumer-side trace continuation is enabled through the carrier extraction mechanism:


Consumer Implementation Pattern:

  1. Extract Carrier: Read the carrier JSON from message headers/properties based on queue system
  2. Deserialize: Call Carrier::fromJson() to reconstruct the carrier object
  3. Store in Context: Use SentryContext::setCarrier() to make it available to EventHandleListener
  4. Continue Trace: The EventHandleListener (see Event Lifecycle and EventHandleListener) automatically reads the carrier and calls continueTrace() to create a linked transaction
  5. Execute Job: The job handler executes within the child transaction
  6. Finish Transaction: Transaction is automatically finished after job completion

Metadata Availability:

The carrier's metadata (publish time, message ID, destination name, body size, producer name) is available throughout job execution and can be used for:

  • Calculating message latency (current time - publish_time)
  • Correlating logs with message IDs
  • Recording destination information in spans
  • Tracking message origins

Sources: src/sentry/src/Tracing/Aspect/AsyncQueueJobMessageAspect.php156-172 src/sentry/src/Util/Carrier.php src/sentry/src/SentryContext.php

Configuration and Feature Flags

Message queue tracing is controlled by configuration flags in config/sentry.php:

Configuration KeyAspectDefault
sentry.tracing.amqp.enableAmqpProducerAspecttrue
sentry.tracing.kafka.enableKafkaProducerAspecttrue
sentry.tracing.async_queue.enableAsyncQueueJobMessageAspecttrue

Each aspect checks its corresponding flag before instrumenting operations:

When disabled, the aspects return immediately without creating spans or injecting trace context, allowing zero-overhead operation when tracing is not needed.

Sources: src/sentry/src/Feature.php src/sentry/src/Tracing/Aspect/AmqpProducerAspect.php39-47 src/sentry/src/Tracing/Aspect/KafkaProducerAspect.php39-47 src/sentry/src/Tracing/Aspect/AsyncQueueJobMessageAspect.php44-53