VOOZH about

URL: https://deepwiki.com/hypervel/queue/2.1-core-components

⇱ Core Components | hypervel/queue | DeepWiki


Loading...
Menu

Core Components

This document details the primary structural components of the Hypervel Queue system: the QueueManager factory and monitor, the Queue abstract class, the connector pattern for driver instantiation, and the dependency injection container integration. These components form the foundational architecture that enables flexible, driver-agnostic queue operations.

For information about specific queue driver implementations, see Queue Drivers. For worker and job processing details, see Worker System. For job lifecycle and state management, see Job Lifecycle.


Component Overview

The Hypervel Queue package is built around four core structural components that work together to provide a flexible, extensible queue system:


Core Component Responsibilities

ComponentFilePrimary Responsibility
QueueManagersrc/QueueManager.phpFactory for creating queue connections; monitors queue sizes; manages connection lifecycle
Queue (abstract)src/Queue.phpBase class providing shared queue functionality: payload creation, encryption, transaction-aware dispatching
Connectorssrc/Connectors/*.phpFactory implementations that instantiate driver-specific queue instances with proper configuration
ConfigProvidersrc/ConfigProvider.phpRegisters all queue services with the DI container; configures SerializableClosure behavior
QueueManagerFactorysrc/QueueManagerFactory.phpFactory that creates configured QueueManager instances with exception handling setup

Sources: src/QueueManager.php1-390 src/Queue.php1-404 src/ConfigProvider.php1-95


QueueManager: Factory and Monitor

The QueueManager class serves as both a factory for creating queue connections and a monitor for tracking queue metrics. It implements the FactoryContract and MonitorContract interfaces, providing a unified facade for all queue operations.

Instantiation and Registration


The QueueManager constructor automatically registers all built-in connectors via registerConnectors() src/QueueManager.php281-292 Each connector is registered as a closure that instantiates the appropriate connector class when invoked:

Connector Registration Methods

MethodDriverConnector ClassLine Reference
registerNullConnector()nullNullConnectorsrc/QueueManager.php297-302
registerSyncConnector()syncSyncConnectorsrc/QueueManager.php307-312
registerDatabaseConnector()databaseDatabaseConnectorsrc/QueueManager.php317-324
registerRedisConnector()redisRedisConnectorsrc/QueueManager.php329-336
registerBeanstalkdConnector()beanstalkdBeanstalkdConnectorsrc/QueueManager.php341-346
registerSqsConnector()sqsSqsConnectorsrc/QueueManager.php351-356
registerDeferConnector()deferDeferConnectorsrc/QueueManager.php361-366
registerCoroutineConnector()coroutineCoroutineConnectorsrc/QueueManager.php371-376
registerFailoverConnector()failoverFailoverConnectorsrc/QueueManager.php381-389

Sources: src/QueueManager.php64-70 src/QueueManager.php281-389 src/QueueManagerFactory.php14-33

Connection Management

The QueueManager maintains a registry of resolved connections in the $connections array property src/QueueManager.php44 Connections are lazily instantiated on first access:


The connection resolution process src/QueueManager.php156-180:

  1. Retrieve Configuration: Load driver configuration from queue.connections.{name} src/QueueManager.php215-222
  2. Get Connector: Retrieve the registered connector closure for the driver src/QueueManager.php187-194
  3. Instantiate Queue: Call connector->connect(config) to create the queue instance
  4. Configure Queue: Set connection name, container reference, and configuration array
  5. Apply Pooling: If driver is in $poolables array (['beanstalkd', 'sqs']), wrap in QueuePoolProxy src/QueueManager.php171-177
  6. Cache: Store resolved connection in $connections array

Sources: src/QueueManager.php137-180 src/QueueManager.php187-222

Event Registration Helpers

The QueueManager provides convenience methods for registering event listeners. These methods delegate to the EventDispatcherInterface src/QueueManager.php75-124:


Event Listener Registration Methods

MethodEvent ClassPurposeLine Reference
before()Events\JobProcessingFired before job executionsrc/QueueManager.php75-79
after()Events\JobProcessedFired after successful job executionsrc/QueueManager.php84-88
exceptionOccurred()Events\JobExceptionOccurredFired when job throws exceptionsrc/QueueManager.php93-97
looping()Events\LoopingFired on each daemon loop iterationsrc/QueueManager.php102-106
failing()Events\JobFailedFired when job fails permanentlysrc/QueueManager.php111-115
stopping()Events\WorkerStoppingFired when worker is stoppingsrc/QueueManager.php120-124

Sources: src/QueueManager.php75-124

Dynamic Method Delegation

The QueueManager implements the __call() magic method to proxy all unhandled method calls to the default queue connection src/QueueManager.php273-276 This allows treating the manager as if it were a queue instance:


Sources: src/QueueManager.php273-276


Queue Abstract Class

The Queue abstract class provides shared functionality for all queue driver implementations. It handles payload creation, encryption, transaction-aware dispatching, and event dispatching.

Class Structure


Sources: src/Queue.php28-404 src/Contracts/Queue.php10-81

Payload Creation

The payload creation system transforms job objects into JSON-encoded strings for storage. The process differs based on whether the job is an object or a string handler:


Object Payload Structure src/Queue.php128-156:


String Payload Structure src/Queue.php233-246:


Payload Customization: Applications can register custom payload hooks via Queue::createPayloadUsing() src/Queue.php251-258 These callbacks receive the connection name, queue name, and payload array, allowing modification before encoding.

Sources: src/Queue.php97-272

Job Metadata Extraction

The Queue class provides methods to extract retry behavior and timeout configuration from job objects:

Job Metadata Methods

MethodExtractsFallbackLine Reference
getJobTries()$job->tries or $job->tries()nullsrc/Queue.php170-181
getJobBackoff()$job->backoff or $job->backoff()nullsrc/Queue.php186-201
getJobExpiration()$job->retryUntil or $job->retryUntil()nullsrc/Queue.php206-216
getDisplayName()$job->displayName()get_class($job)src/Queue.php161-165

The getJobBackoff() method normalizes backoff values to a comma-separated string, converting DateTimeInterface instances to seconds src/Queue.php196-200

Sources: src/Queue.php161-216

Transaction-Aware Dispatching

The enqueueUsing() method implements transaction-aware job dispatching src/Queue.php279-301 When a job implements ShouldQueueAfterCommit or has $afterCommit = true, and the TransactionManager is available, the job is deferred until the current database transaction commits:


Dispatch Conditions src/Queue.php308-319:

  1. Job implements ShouldQueueAfterCommit interface
  2. Job is object with $afterCommit = true property
  3. Queue has $dispatchAfterCommit = true property

Sources: src/Queue.php279-319

Event Dispatching

The Queue class dispatches events at key points in the queueing lifecycle:

Both events receive the connection name, queue name, job object, payload, and delay.

Sources: src/Queue.php326-349


Connector Pattern

Connectors implement the factory pattern, encapsulating the logic for instantiating driver-specific queue instances. Each connector implements the ConnectorInterface which defines a single connect() method.

Connector Architecture


Connector Responsibilities

ConnectorDependenciesQueue CreatedConfiguration Required
SyncConnectorNoneSyncQueueafterCommit flag
CoroutineConnectorNoneCoroutineQueueafterCommit flag, exception callback
RedisConnectorRedisFactoryRedisQueueconnection, queue, retry_after, block_for, after_commit, migration_batch_size
DatabaseConnectorConnectionResolverInterfaceDatabaseQueueconnection, table, queue, retry_after, after_commit
SqsConnectorNoneSqsQueuekey, secret, prefix, queue, region, suffix, after_commit
BeanstalkdConnectorNoneBeanstalkdQueuehost, queue, retry_after, block_for, after_commit
DeferConnectorNoneDeferQueueException callback
FailoverConnectorQueueManager, EventDispatcherFailoverQueueconnections array
NullConnectorNoneNullQueueNone

Sources: src/QueueManager.php281-389 src/Connectors/*.php

Connector Instantiation Example

The RedisConnector demonstrates typical connector implementation src/Connectors/RedisConnector.php:


Each connector extracts relevant configuration values and passes them to the queue constructor. The connector is responsible for resolving any dependencies from the container (e.g., RedisFactory for Redis, ConnectionResolverInterface for Database).

Sources: src/Connectors/RedisConnector.php


Dependency Injection Integration

The ConfigProvider class registers all queue components with the Hyperf DI container, making them available throughout the application.

Service Registration


Service Registration Details src/ConfigProvider.php33-39:


Sources: src/ConfigProvider.php28-63

SerializableClosure Configuration

The ConfigProvider configures SerializableClosure to properly handle variable serialization in job closures src/ConfigProvider.php68-87:


This ensures that closure variables are properly serialized when jobs are queued and restored when jobs are processed.

Sources: src/ConfigProvider.php68-87

Console Command Registration

All console commands are registered in the commands array src/ConfigProvider.php40-53:

Registered Commands

Command ClassCommand NameCategory
WorkCommandqueue:workJob Processing
ListenCommandqueue:listenJob Processing
MonitorCommandqueue:monitorMonitoring
RestartCommandqueue:restartManagement
ClearCommandqueue:clearMaintenance
ListFailedCommandqueue:failedFailed Jobs
RetryCommandqueue:retryFailed Jobs
ForgetFailedCommandqueue:forgetFailed Jobs
FlushFailedCommandqueue:flushFailed Jobs
PruneFailedJobsCommandqueue:prune-failedFailed Jobs
RetryBatchCommandqueue:retry-batchBatching
PruneBatchesCommandqueue:prune-batchesBatching

Sources: src/ConfigProvider.php40-53


Connection Pooling

For drivers with expensive connection overhead (SQS, Beanstalkd), the QueueManager automatically wraps connections in a QueuePoolProxy for connection pooling.

Pool Proxy Architecture


Poolable Drivers src/QueueManager.php59:


When a driver is marked as poolable, the QueueManager wraps it using the HasPoolProxy trait's createPoolProxy() method src/QueueManager.php171-177:


The QueuePoolProxy class implements the Queue contract and delegates all method calls to a pooled connection src/QueuePoolProxy.php13-126:


Pool Configuration: Connection pools are configured via the pool key in the driver configuration. The pool manages connection lifecycle, borrowing connections for operations and returning them when complete.

Sources: src/QueueManager.php34 src/QueueManager.php54 src/QueueManager.php59 src/QueueManager.php171-177 src/QueuePoolProxy.php13-126


Summary

The core components work together to provide a flexible, extensible queue system:

  1. QueueManager acts as the central factory, managing connection lifecycle and providing event registration helpers
  2. Queue abstract class provides shared functionality for payload creation, encryption, and transaction-aware dispatching
  3. Connectors implement the factory pattern, isolating driver instantiation logic
  4. ConfigProvider registers all services with the DI container, enabling dependency injection throughout the system
  5. Connection pooling optimizes performance for expensive connections via QueuePoolProxy

This architecture enables seamless switching between queue backends by simply changing configuration, while maintaining a consistent API across all drivers. The system integrates deeply with the Hyperf framework's DI container, event system, and coroutine runtime.

Sources: src/QueueManager.php1-390 src/Queue.php1-404 src/ConfigProvider.php1-95 src/QueueManagerFactory.php1-34 src/QueuePoolProxy.php1-126