VOOZH about

URL: https://deepwiki.com/hypervel/bus/4.3-queue-integration

⇱ Queue Integration | hypervel/bus | DeepWiki


Loading...
Menu

Queue Integration

Purpose and Scope

This document explains how the Dispatcher integrates with the queue system to route jobs for asynchronous execution. It covers the dependency injection setup through DispatcherFactory, the queue resolution mechanism, how the dispatcher decides between synchronous and asynchronous execution, and how jobs are submitted to queue connections with specific queue names and delays.

For information about configuring individual job dispatches, see Configuring Job Dispatch. For details on the Queueable trait that provides queue configuration properties to jobs, see Queueable Trait.


Dependency Injection Setup

The queue integration is established at the dependency injection level through two key components: ConfigProvider and DispatcherFactory.

ConfigProvider Registration

The ConfigProvider registers the Dispatcher contract binding in the Hyperf DI container:

ContractFactoryPurpose
DispatcherDispatcherFactoryCreates dispatcher with queue resolver
BatchRepositoryDatabaseBatchRepositoryFactoryProvides batch persistence

src/ConfigProvider.php11-23

The DispatcherContract::class binding points to DispatcherFactory::class, which ensures that whenever the dispatcher is resolved from the container, it receives proper queue integration.

DispatcherFactory

The DispatcherFactory creates a Dispatcher instance with a queue resolver closure:

Dispatcher(
 container: ContainerInterface,
 queueResolver: Closure
)

The queue resolver closure accepts an optional connection name and returns a Queue instance by calling QueueFactoryContract::connection($connection) src/DispatcherFactory.php12-18 This closure is stored in the dispatcher and invoked whenever a job needs to be queued.

Sources: src/ConfigProvider.php11-23 src/DispatcherFactory.php1-19


Queue Resolution Mechanism


Diagram: Dependency Injection and Queue Resolution Setup

The queueResolver is a closure that captures the container and provides lazy queue instantiation. When invoked, it:

  1. Accepts an optional $connection parameter (from the job's connection property)
  2. Calls QueueFactoryContract::connection($connection) to get the appropriate queue driver
  3. Returns a Queue instance configured for the specified connection

This lazy resolution pattern ensures that queue connections are only established when actually needed, and allows jobs to specify different connections dynamically.

Sources: src/DispatcherFactory.php12-18 src/Dispatcher.php40-48


Dispatch Decision Flow

The dispatcher's core dispatch() method determines whether to execute a job synchronously or queue it asynchronously.

Decision Logic


Diagram: Dispatch Decision Tree

The decision is made in two checks src/Dispatcher.php53-58:

  1. Queue Resolver Check: $this->queueResolver must be set (it will be if created via DispatcherFactory)
  2. ShouldQueue Interface Check: commandShouldBeQueued($command) returns true if the command implements ShouldQueue src/Dispatcher.php170-173

If both conditions are met, the job is dispatched to the queue. Otherwise, it executes immediately via dispatchNow().

Synchronous Dispatch Override

The dispatchSync() method provides explicit synchronous execution even for queueable jobs:

ScenarioBehavior
Non-queueable jobExecutes via dispatchNow()
Queueable job with onConnection() methodDispatches to sync queue connection
Queueable job without onConnection()Executes via dispatchNow()

src/Dispatcher.php65-75

The sync queue connection is a special synchronous queue driver that executes jobs immediately in the current process, but still runs them through the queue worker pipeline.

Sources: src/Dispatcher.php53-75 src/Dispatcher.php170-173


Queue Routing and Job Submission

When a job is determined to be queueable, the dispatcher routes it through a multi-stage process to determine the appropriate queue connection, queue name, and delay.

Dispatch to Queue Flow


Diagram: Queue Dispatch Sequence

The dispatchToQueue() method src/Dispatcher.php180-195 performs the following steps:

  1. Extract Connection: Reads $command->connection ?? null to determine the queue connection
  2. Resolve Queue: Calls the queueResolver closure with the connection name
  3. Validate Queue Instance: Throws RuntimeException if the resolver doesn't return a Queue implementation
  4. Custom Queue Method: If the command has a queue() method, delegates to it
  5. Standard Push: Otherwise, calls pushCommandToQueue() with the queue instance

Sources: src/Dispatcher.php180-195

Job Push Routing Logic

The pushCommandToQueue() method src/Dispatcher.php200-215 examines the job's properties to determine the appropriate queue method:


Diagram: Queue Push Method Selection

The routing logic creates a priority order:

Queue PropertyDelay PropertyQueue Method CalledDescription
SetSetlaterOn($queue, $delay, $command)Push to specific queue with delay
SetNot setpushOn($queue, $command)Push to specific queue immediately
Not setSetlater($delay, $command)Push to default queue with delay
Not setNot setpush($command)Push to default queue immediately

These properties (queue, delay, connection) are typically set through the Queueable trait (see Queueable Trait) and configured via PendingDispatch (see Configuring Job Dispatch).

Sources: src/Dispatcher.php200-215


Connection and Queue Configuration

Jobs control their queue routing through three properties managed by the Queueable trait:

Property Overview

PropertyTypeDefaultPurpose
connection`stringnull`null
queue`stringnull`null
delay`DateTimeInterfaceDateIntervalint

Runtime Modification

These properties can be set at dispatch time using the PendingDispatch fluent interface:

dispatch($job)
 ->onConnection('redis')
 ->onQueue('high-priority')
 ->delay(60)

When these methods are called, they modify the job object's properties before it reaches the dispatcher's dispatchToQueue() method.

Connection Resolution

The connection string is passed directly to QueueFactoryContract::connection($connection), which:

  1. Looks up the connection configuration in Hyperf's queue configuration
  2. Instantiates the appropriate queue driver (Redis, Database, Sync, etc.)
  3. Returns a Queue instance bound to that connection

If no connection is specified ($connection === null), the queue factory returns the default connection configured in the application.

Sources: src/Dispatcher.php180-215 src/DispatcherFactory.php12-18


Integration with Hypervel Queue

The Dispatcher depends on the hypervel/queue package for actual queue operations. The integration points are:

Contract Dependencies

ContractFrom PackagePurpose
Queuehypervel/queueQueue driver interface for pushing jobs
QueueFactoryContracthypervel/queueFactory for resolving queue connections
ShouldQueuehypervel/queueMarker interface for queueable jobs

Queue Method Calls

The dispatcher calls the following Queue interface methods:

  • push(mixed $job) - Push job to default queue
  • pushOn(string $queue, mixed $job) - Push job to specific queue
  • later(DateTimeInterface|DateInterval|int $delay, mixed $job) - Push job with delay
  • laterOn(string $queue, DateTimeInterface|DateInterval|int $delay, mixed $job) - Push to specific queue with delay

These methods serialize the job, store it in the queue backend (Redis, database, etc.), and return a job ID. Queue workers from hypervel/queue then pick up and execute these jobs.

Sources: src/Dispatcher.php180-215 src/DispatcherFactory.php12-18


Summary

The queue integration in hypervel/bus is established through:

  1. Dependency Injection: ConfigProvider registers DispatcherFactory to create dispatchers with queue resolution
  2. Lazy Queue Resolution: A closure captures QueueFactoryContract and resolves connections on-demand
  3. Automatic Routing: The dispatcher checks ShouldQueue interface and routes jobs accordingly
  4. Property-Based Configuration: Jobs control routing via connection, queue, and delay properties
  5. Flexible Push Logic: Four queue methods handle different combinations of queue names and delays
  6. External Integration: Delegates to hypervel/queue for actual queue operations and worker execution

This design separates concerns between job configuration (bus layer) and queue execution (queue layer), allowing the bus to focus on dispatching logic while leveraging the full queue infrastructure.

Sources: src/ConfigProvider.php11-23 src/Dispatcher.php1-244 src/DispatcherFactory.php1-19