URL: https://deepwiki.com/hypervel/queue/4.3-sqs-driver

SQS Driver

Purpose and Scope

This document covers the SQS (Simple Queue Service) driver implementation for AWS integration. The SqsQueue class provides a queue backend that stores jobs in Amazon SQS, supporting both standard and FIFO queues. This driver handles AWS authentication, message operations, queue metrics, and connection pooling for efficient resource utilization.

For information about other queue drivers, see Queue Drivers. For configuration details across all drivers, see Driver Configuration.


Overview

The SQS driver integrates AWS Simple Queue Service as a queue backend through the SqsQueue class. This driver implements both the Queue contract and the ClearableQueue interface, providing standard queue operations plus bulk deletion capabilities.

Key Components

| Component | Purpose |
| --- | --- |
| SqsQueue | Main queue implementation that wraps the AWS SQS SDK client |
| SqsJob | Job representation for messages retrieved from SQS |
| SqsConnector | Factory that creates SqsQueue instances from configuration |
| SqsClient | AWS SDK client for SQS API operations |

The SQS driver is unique among queue backends in that it:

  • Requires external AWS service connectivity
  • Uses connection pooling for expensive client connections
  • Supports FIFO (First-In-First-Out) queues with ordering guarantees
  • Provides approximate queue metrics due to SQS's distributed nature
  • Cannot provide creation timestamps for oldest pending jobs

Sources: src/SqsQueue.php 1-244


Architecture Diagram


Sources: src/SqsQueue.php 1-244, High-Level Diagram 2


Configuration

The SQS driver is configured in the queue.php configuration file. Connection pooling is enabled by default for SQS to optimize AWS SDK client reuse.

Configuration Structure


Configuration Parameters

| Parameter | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| driver | string | Yes | - | Must be 'sqs' |
| key | string | Yes | - | AWS access key ID for authentication |
| secret | string | Yes | - | AWS secret access key for authentication |
| prefix | string | Yes | - | Base URL for queue (e.g., https://sqs.us-east-1.amazonaws.com/123456789) |
| queue | string | No | 'default' | Default queue name |
| suffix | string | No | '' | Optional suffix appended to queue names |
| region | string | No | 'us-east-1' | AWS region for SQS service |
| after_commit | boolean | No | false | Whether to defer job dispatching until DB transaction commits |
| pool.min_objects | int | No | 1 | Minimum number of pooled SQS client connections |
| pool.max_objects | int | No | 10 | Maximum number of pooled SQS client connections |
| pool.wait_timeout | float | No | 3.0 | Seconds to wait for available connection from pool |
| pool.max_lifetime | float | No | 60.0 | Maximum lifetime in seconds for pooled connections |

Sources: publish/queue.php 82-97
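As a rough illustration of how the parameters in the table fit together, the following is a minimal sketch of an SQS connection entry. The key names and defaults come from the table; the surrounding 'connections' wrapper, the connection name 'sqs', and the environment variable names are assumptions, not taken from the published config file.

```php
<?php
// Hypothetical excerpt of queue.php. Only the keys inside the 'sqs' entry
// are taken from the parameter table; everything else is assumed.
return [
    'connections' => [
        'sqs' => [
            'driver' => 'sqs',
            // Assumed environment variable names for the credentials.
            'key' => getenv('AWS_ACCESS_KEY_ID') ?: '',
            'secret' => getenv('AWS_SECRET_ACCESS_KEY') ?: '',
            // Base URL: endpoint plus AWS account ID (example value).
            'prefix' => 'https://sqs.us-east-1.amazonaws.com/123456789',
            'queue' => 'default',
            'suffix' => '',
            'region' => 'us-east-1',
            'after_commit' => false,
            'pool' => [
                'min_objects' => 1,
                'max_objects' => 10,
                'wait_timeout' => 3.0,
                'max_lifetime' => 60.0,
            ],
        ],
    ],
];
```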


Constructor and Initialization

The SqsQueue class is instantiated by the SqsConnector with the configured AWS SQS client and connection parameters.

Constructor Signature


Constructor Parameters

| Parameter | Purpose |
| --- | --- |
| $sqs | Configured AWS SQS SDK client instance |
| $default | Default queue name when none specified |
| $prefix | URL prefix prepended to queue names (typically the base SQS URL) |
| $suffix | Optional suffix appended to queue names (before .fifo for FIFO queues) |
| $dispatchAfterCommit | Whether to delay job dispatch until database transaction commits |

Sources: src/SqsQueue.php 28-40


Queue Operations

The SQS driver implements standard queue operations for job lifecycle management.

Message Flow Diagram


Push Operation

The push() method sends a job immediately to the queue.

Method signature:


Implementation: The method uses enqueueUsing() to handle payload creation and calls pushRaw() to send the message via SqsClient::sendMessage(). Returns the AWS Message ID.

Sources: src/SqsQueue.php 116-127

Push Raw Payload

The pushRaw() method sends a pre-serialized payload directly to SQS.

Method signature:


Implementation: Directly invokes SqsClient::sendMessage() with the queue URL and message body, returning the Message ID.

Sources: src/SqsQueue.php 132-138
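To make the shape of the underlying request concrete, here is a minimal sketch of the argument array an SQS SendMessage call takes. QueueUrl, MessageBody, and DelaySeconds are real SendMessage parameters; the helper send_message_args() is hypothetical and is not part of the driver.

```php
<?php
declare(strict_types=1);

/**
 * Build the argument array for an SQS SendMessage call.
 * Hypothetical helper for illustration only.
 */
function send_message_args(string $queueUrl, string $payload, int $delaySeconds = 0): array
{
    $args = ['QueueUrl' => $queueUrl, 'MessageBody' => $payload];

    // DelaySeconds is only included when a delay is requested.
    if ($delaySeconds > 0) {
        $args['DelaySeconds'] = $delaySeconds;
    }

    return $args;
}
```

With the AWS SDK for PHP, an array like this would be passed to SqsClient::sendMessage(), and the Message ID read from the result with get('MessageId').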

Delayed Push Operation

The later() method schedules a job to be available after a specified delay.

Method signature:


Implementation: Converts the delay to seconds using secondsUntil() and sends the message with the DelaySeconds parameter. SQS supports delays up to 15 minutes (900 seconds).

Sources: src/SqsQueue.php 143-158
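The delay conversion can be sketched as follows. This is an assumption about what a secondsUntil()-style helper does, written as a standalone function seconds_until(); the accepted input types and the explicit 900-second check in assert_sqs_delay() are illustrative choices, and the source does not say how the driver behaves when the limit is exceeded.

```php
<?php
declare(strict_types=1);

const SQS_MAX_DELAY_SECONDS = 900; // SQS per-message delay ceiling (15 minutes)

/**
 * Convert an absolute time, an interval, or a plain integer into whole
 * seconds from $now. Past times yield 0. Hypothetical helper.
 */
function seconds_until(DateTimeInterface|DateInterval|int $delay, ?DateTimeImmutable $now = null): int
{
    $now ??= new DateTimeImmutable();

    if ($delay instanceof DateInterval) {
        $delay = $now->add($delay); // interval -> absolute time
    }

    if ($delay instanceof DateTimeInterface) {
        return max(0, $delay->getTimestamp() - $now->getTimestamp());
    }

    return max(0, $delay);
}

/** Reject delays that SQS cannot represent (assumed policy, not the driver's). */
function assert_sqs_delay(int $seconds): int
{
    if ($seconds > SQS_MAX_DELAY_SECONDS) {
        throw new InvalidArgumentException("SQS accepts at most 900 seconds of delay, got {$seconds}.");
    }

    return $seconds;
}
```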

Bulk Push Operation

The bulk() method dispatches multiple jobs in a single operation.

Method signature:


Implementation: Iterates through the jobs array, calling either later() or push() depending on whether each job has a delay property. Returns null as no aggregate result is provided.

Note: This implementation does not use SQS batch operations (sendMessageBatch), instead sending messages individually.

Sources: src/SqsQueue.php 163-174
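The per-job dispatch described for bulk() can be sketched like this. dispatch_each() and its two callables are hypothetical stand-ins for the driver's push() and later() methods; the sketch assumes a job signals a delay through a non-null public delay property.

```php
<?php
declare(strict_types=1);

/**
 * Send each job individually: delayed jobs go through $later,
 * everything else through $push. One API call per job, no batching.
 */
function dispatch_each(array $jobs, callable $push, callable $later): void
{
    foreach ($jobs as $job) {
        if (isset($job->delay)) {
            $later($job->delay, $job);
        } else {
            $push($job);
        }
    }
}
```

Because every job results in its own request, dispatching N jobs costs N round trips to SQS.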

Pop Operation

The pop() method retrieves the next available job from the queue.

Method signature:


Implementation: Calls SqsClient::receiveMessage() requesting the ApproximateReceiveCount attribute. If a message is available, wraps it in an SqsJob instance. Returns null if the queue is empty.

Key details:

  • Retrieves one message per call (no batch receiving)
  • Message becomes invisible to other consumers (reserved) until processed or visibility timeout expires
  • Receipt handle is stored in the SqsJob for acknowledgment/deletion

Sources: src/SqsQueue.php 179-197
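A minimal sketch of the receive step follows. The request keys (QueueUrl, AttributeNames) and response keys (Messages, ReceiptHandle) are real ReceiveMessage fields; receive_one() and the injected $receiveMessage callable are hypothetical and stand in for the SDK client so the example runs without AWS access.

```php
<?php
declare(strict_types=1);

/**
 * Ask for a single message and return it as an array, or null when the
 * response contains no messages. ReceiveMessage returns at most one
 * message unless MaxNumberOfMessages is set.
 */
function receive_one(callable $receiveMessage, string $queueUrl): ?array
{
    $response = $receiveMessage([
        'QueueUrl' => $queueUrl,
        'AttributeNames' => ['ApproximateReceiveCount'],
    ]);

    $messages = $response['Messages'] ?? [];

    return $messages[0] ?? null;
}
```

The returned array carries the ReceiptHandle that a later delete or visibility change needs, which is why the job wrapper keeps the raw message.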


Queue Metrics

The SQS driver provides methods to query approximate queue sizes. These metrics are approximate due to SQS's distributed architecture.

Metrics Methods Table

| Method | Purpose | SQS Attributes Queried |
| --- | --- | --- |
| size() | Total jobs in queue (pending + delayed + reserved) | ApproximateNumberOfMessages, ApproximateNumberOfMessagesDelayed, ApproximateNumberOfMessagesNotVisible |
| pendingSize() | Jobs available for immediate processing | ApproximateNumberOfMessages |
| delayedSize() | Jobs waiting for delay period to expire | ApproximateNumberOfMessagesDelayed |
| reservedSize() | Jobs currently being processed (invisible) | ApproximateNumberOfMessagesNotVisible |
| creationTimeOfOldestPendingJob() | Timestamp of oldest pending job | Not supported - returns null |

Size Calculation Diagram


Size Method

Returns the total number of jobs across all states.

Implementation:


Sources: src/SqsQueue.php 45-61
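The total-size calculation amounts to summing three queue attributes. The sketch below assumes the attributes arrive as an array of numeric strings keyed by attribute name, which is how GetQueueAttributes returns them; total_size() and SIZE_ATTRIBUTES are hypothetical names.

```php
<?php
declare(strict_types=1);

// The three attributes that together cover pending, delayed, and reserved jobs.
const SIZE_ATTRIBUTES = [
    'ApproximateNumberOfMessages',
    'ApproximateNumberOfMessagesDelayed',
    'ApproximateNumberOfMessagesNotVisible',
];

/** Sum the approximate counts; missing attributes count as zero. */
function total_size(array $attributes): int
{
    $total = 0;

    foreach (SIZE_ATTRIBUTES as $name) {
        $total += (int) ($attributes[$name] ?? 0);
    }

    return $total;
}
```

Since each input is itself approximate, the sum is an estimate and can briefly disagree with what consumers observe.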

Individual Metric Methods

Pending Size:


Returns only jobs immediately available for processing.

Delayed Size:


Returns only jobs waiting for their delay period to expire.

Reserved Size:


Returns only jobs currently being processed (invisible to other consumers).

Sources: src/SqsQueue.php 66-100

Unsupported: Creation Time

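The creationTimeOfOldestPendingJob() method always returns null. SQS queue attributes do not include the age or creation time of the oldest message; the closest equivalent, ApproximateAgeOfOldestMessage, is published as a CloudWatch metric rather than through the SQS API.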

Sources: src/SqsQueue.php 107-111


FIFO Queue Support

The SQS driver supports both standard queues and FIFO (First-In-First-Out) queues. FIFO queues preserve message order within each message group and provide exactly-once processing.

FIFO Queue Identification

FIFO queues are identified by the .fifo suffix in their queue name. The driver automatically handles FIFO queue URL construction.

URL Construction for FIFO Queues


Suffix Queue Method

The suffixQueue() method handles FIFO queue URL construction specially to preserve the .fifo extension.

Implementation logic:

  1. Check if queue name ends with .fifo
  2. If FIFO:
    • Extract base name (remove .fifo)
    • Append configured suffix to base name
    • Re-append .fifo extension
  3. If standard:
    • Append configured suffix directly

Code:


Example:

  • Input: orders.fifo, suffix: -staging
  • Output: https://sqs.us-east-1.amazonaws.com/123456789/orders-staging.fifo

Sources: src/SqsQueue.php 226-235
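The suffix handling described above can be sketched as a standalone function. This is an illustration of the listed steps, assuming PHP 8; suffix_queue() is a hypothetical name, and unlike the driver's method it takes the prefix as an argument and always appends the suffix, even if the name already ends with it.

```php
<?php
declare(strict_types=1);

/**
 * Build a queue URL from a prefix, a queue name, and an optional suffix,
 * keeping any ".fifo" extension at the very end of the name.
 */
function suffix_queue(string $prefix, string $queue, string $suffix = ''): string
{
    $fifo = '';

    if (str_ends_with($queue, '.fifo')) {
        // Strip the extension so the suffix lands before it.
        $queue = substr($queue, 0, -strlen('.fifo'));
        $fifo = '.fifo';
    }

    return rtrim($prefix, '/') . '/' . $queue . $suffix . $fifo;
}
```

Keeping .fifo last matters because SQS requires FIFO queue names to end with that extension.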


Queue URL Construction

The SQS driver constructs full queue URLs from configuration and queue names using the getQueue() method.

URL Construction Flow


Get Queue Method

Method signature:


Implementation:

  1. Use provided queue name, or fall back to $this->default
  2. Validate if the queue name is already a full URL
  3. If full URL, return as-is
  4. If queue name, construct URL using suffixQueue()

This allows both queue names ('my-queue') and full URLs ('https://sqs.us-east-1.amazonaws.com/123456789/my-queue') to be used interchangeably.

Sources: src/SqsQueue.php 214-221

URL Construction Examples

| Configuration | Queue Parameter | Result URL |
| --- | --- | --- |
| prefix: 'https://sqs.us-east-1.amazonaws.com/123456789', suffix: '' | 'default' | https://sqs.us-east-1.amazonaws.com/123456789/default |
| prefix: 'https://sqs.us-east-1.amazonaws.com/123456789', suffix: '-prod' | 'orders' | https://sqs.us-east-1.amazonaws.com/123456789/orders-prod |
| prefix: 'https://sqs.us-east-1.amazonaws.com/123456789', suffix: '-staging' | 'notifications.fifo' | https://sqs.us-east-1.amazonaws.com/123456789/notifications-staging.fifo |
| Any config | 'https://sqs.eu-west-1.amazonaws.com/987654321/custom-queue' | https://sqs.eu-west-1.amazonaws.com/987654321/custom-queue |
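The name-versus-URL decision can be sketched as below. The source does not say how the driver detects a full URL; using PHP's filter_var() with FILTER_VALIDATE_URL is an assumption. resolve_queue() and the $buildUrl callable (standing in for the prefix and suffix step) are hypothetical.

```php
<?php
declare(strict_types=1);

/**
 * Resolve a queue argument to a full URL: fall back to the default name,
 * pass full URLs through untouched, and build a URL from bare names.
 */
function resolve_queue(?string $queue, string $default, callable $buildUrl): string
{
    $queue = ($queue === null || $queue === '') ? $default : $queue;

    // Anything that already parses as a URL is returned as-is.
    if (filter_var($queue, FILTER_VALIDATE_URL) !== false) {
        return $queue;
    }

    return $buildUrl($queue);
}
```

A bare name such as 'notifications.fifo' has no scheme, so it fails URL validation and is routed through the builder.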

Clear Operation

The SQS driver implements the ClearableQueue interface, providing the ability to delete all messages from a queue.

Clear Method

Method signature:


Implementation: Uses SqsClient::purgeQueue() to delete all messages. Returns the approximate queue size before purging.

Important notes:

  • Purging is asynchronous and may take up to 60 seconds to complete
  • Messages sent to the queue during the purge operation may be deleted
  • Recently sent messages (< 5 minutes) may not be purged immediately

Code:


Sources: src/SqsQueue.php 202-209
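The ordering of the clear operation (read the size first, then purge) can be sketched as follows. clear_queue() and both callables are hypothetical stand-ins for the driver's size lookup and the SDK's purgeQueue call; QueueUrl is the real PurgeQueue parameter.

```php
<?php
declare(strict_types=1);

/**
 * Capture the approximate size, then purge, and return the captured value.
 * The count is read before the purge because the queue reports zero afterwards.
 */
function clear_queue(callable $size, callable $purgeQueue, string $queueUrl): int
{
    $count = $size($queueUrl);
    $purgeQueue(['QueueUrl' => $queueUrl]);

    return $count;
}
```

The returned number is an estimate taken at call time, not a confirmed count of deleted messages.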


Connection Pooling

The SQS driver uses connection pooling to optimize AWS SDK client reuse, reducing the overhead of creating new authenticated connections for each operation.

Pool Configuration

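Connection pooling is configured in the pool section of the SQS connection configuration, using the min_objects, max_objects, wait_timeout, and max_lifetime keys listed under Configuration Parameters.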


Pooling Mechanism

The QueueManager wraps SQS connections with a PoolProxy when the connection is identified as "poolable". The proxy manages a pool of SqsClient instances, providing connections on demand and recycling them after use.

Benefits:

  • Reduced connection overhead for high-throughput applications
  • Better resource utilization in coroutine environments
  • Automatic connection lifecycle management

Sources: publish/queue.php 91-96, High-Level Diagram 2
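To illustrate what max_objects and max_lifetime control, here is a deliberately simplified object pool. It is not the PoolProxy implementation: SimplePool is a hypothetical class, it throws when exhausted instead of waiting up to wait_timeout, and it does not pre-create min_objects instances.

```php
<?php
declare(strict_types=1);

final class SimplePool
{
    /** @var object[] Objects currently available for reuse. */
    private array $idle = [];

    /** Creation time of every live object, idle or borrowed. */
    private SplObjectStorage $createdAt;

    public function __construct(
        private Closure $factory,
        private int $maxObjects = 10,
        private float $maxLifetime = 60.0,
    ) {
        $this->createdAt = new SplObjectStorage();
    }

    public function borrow(): object
    {
        while ($this->idle !== []) {
            $object = array_pop($this->idle);
            if (microtime(true) - $this->createdAt[$object] < $this->maxLifetime) {
                return $object; // still fresh: reuse it
            }
            unset($this->createdAt[$object]); // expired: drop and keep looking
        }

        if (count($this->createdAt) >= $this->maxObjects) {
            throw new RuntimeException('Pool exhausted');
        }

        $object = ($this->factory)();
        $this->createdAt[$object] = microtime(true);

        return $object;
    }

    public function release(object $object): void
    {
        // Only objects this pool created are accepted back.
        if (isset($this->createdAt[$object])) {
            $this->idle[] = $object;
        }
    }
}
```

Reuse avoids rebuilding a client for every operation, while the lifetime cap ensures long-lived clients are eventually replaced.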


Integration with SqsJob

When a message is retrieved via pop(), it is wrapped in an SqsJob instance that provides job-specific operations.

SqsJob Creation


SqsJob Constructor Parameters

The SqsJob constructor receives:

  • $container: DI container for resolving dependencies
  • $sqs: The SqsClient instance for delete/release operations
  • $message: Raw message data from SQS API
  • $connectionName: Name of the queue connection (e.g., 'sqs')
  • $queue: Full queue URL where the message originated

Sources: src/SqsQueue.php 187-193


Client Access

The SQS driver provides access to the underlying AWS SDK client for advanced operations.

Get SQS Client Method

Method signature:


Returns the configured SqsClient instance for direct AWS API access when needed for operations not exposed by the queue interface.

Sources: src/SqsQueue.php 240-243


Summary

The SQS driver uses Amazon SQS as a queue backend for Hypervel. Key capabilities include:

  • Standard and FIFO queue support with automatic URL construction
  • Approximate queue metrics for monitoring (size, pending, delayed, reserved)
  • Connection pooling for optimized AWS SDK client reuse
  • Delayed job dispatch up to 15 minutes
  • Queue clearing via purge operations
  • Flexible queue specification supporting both names and full URLs

The driver implements the standard Hypervel queue contracts while exposing SQS-specific features such as FIFO queues and approximate queue-size attributes.

Sources: src/SqsQueue.php 1-244, publish/queue.php 82-97
