VOOZH about

URL: https://deepwiki.com/rudderlabs/rudder-php-sdk/3.1-consumer-base-classes

⇱ Consumer Base Classes | rudderlabs/rudder-php-sdk | DeepWiki


Loading...
Menu

Consumer Base Classes

The Consumer transport layer is built upon two primary abstract classes: Consumer and QueueConsumer. These classes define the interface and shared logic for all data transmission strategies in the RudderStack PHP SDK. While the Consumer class provides a basic contract for event methods, the QueueConsumer introduces sophisticated batching, queue management, and size enforcement logic used by most network-based consumers.

The Consumer Abstract Class

The Rudder\Consumer\Consumer class is the base for all implementations. It establishes the requirement for the six core event types and provides a unified error handling mechanism.

Key Responsibilities

Error Handling Flow

When a consumer encounters a failure (e.g., a file cannot be opened or a network request fails), it invokes handleError. If an error_handler closure was provided in the Rudder::init options, it is executed with the error code and message lib/Consumer/Consumer.php95-98

Sources:


The QueueConsumer Abstract Class

The Rudder\Consumer\QueueConsumer extends Consumer to provide an in-memory buffering system. This allows the SDK to batch multiple events into a single request, significantly improving performance and reducing network overhead.

Queue and Batching Lifecycle

Events are not sent immediately. Instead, they pass through an enqueue process:

  1. Validation: The item is checked against max_item_size_bytes lib/Consumer/QueueConsumer.php164-169
  2. Queue Limits: The system checks if the total queue count exceeds max_queue_size or if the serialized queue size exceeds max_queue_size_bytes lib/Consumer/QueueConsumer.php153-162
  3. Buffering: The message is added to the $this->queue array lib/Consumer/QueueConsumer.php171
  4. Trigger: If the queue size reaches the flush_at threshold, the flush() method is automatically triggered lib/Consumer/QueueConsumer.php173-175

The Flush Process

The flush() method iterates through the queue, slicing it into batches of size flush_at. Each batch is passed to the implementation-specific flushBatch() method lib/Consumer/QueueConsumer.php106-131

Entity Interaction Diagram


Sources:


Configuration and Limits

QueueConsumer enforces several hard limits to prevent memory exhaustion and ensure compatibility with the RudderStack Data Plane API.

Option / PropertyDefault ValueDescription
max_queue_size10000Maximum number of items allowed in the queue lib/Consumer/QueueConsumer.php16
max_queue_size_bytes33554432 (32MB)Maximum serialized size of the entire queue lib/Consumer/QueueConsumer.php17
max_item_size_bytes32000 (32KB)Maximum size of a single event payload lib/Consumer/QueueConsumer.php20
max_batch_size_bytes512000 (500KB)Maximum size of a batched request sent to the server lib/Consumer/QueueConsumer.php19
flush_at100Number of items to buffer before triggering a flush lib/Consumer/QueueConsumer.php18
flush_interval10000 (10s)Milliseconds to wait between sending consecutive batches during a flush lib/Consumer/QueueConsumer.php24

Lifecycle Diagram: Code Entity Space

This diagram maps the internal class properties and methods to the data flow during an event lifecycle.


Sources:


Concrete Implementation Example: File Consumer

The File consumer is a non-queueing implementation of the Consumer base class. It demonstrates the simplest implementation of the transport layer by writing events directly to a local log file as line-delimited JSON.

Implementation Details

  • Constructor: Opens a file handle with ab (append binary) mode and sets file permissions (default 0644) lib/Consumer/File.php22-40
  • Write Logic: Every event call (track, identify, etc.) calls the private write() method, which json_encodes the message and appends a newline lib/Consumer/File.php68-78
  • Destructor: Ensures the file handle is closed when the consumer object is destroyed lib/Consumer/File.php42-50

Sources: