VOOZH about

URL: https://deepwiki.com/hypervel/queue/4.1-synchronous-and-coroutine-drivers

⇱ Synchronous and Coroutine Drivers | hypervel/queue | DeepWiki


Loading...
Menu

Synchronous and Coroutine Drivers

Purpose and Scope

This document covers the SyncQueue and CoroutineQueue driver implementations, which provide immediate job execution without external storage backends. The SyncQueue executes jobs synchronously in the current process, while the CoroutineQueue executes jobs asynchronously using Hyperf's coroutine runtime. Both drivers support transaction-aware dispatching, deferring job execution until database transactions commit.

For configuration of all queue drivers, see Driver Configuration. For transaction-aware job dispatching behavior, see Transaction-Aware Dispatching.


Driver Overview

The synchronous and coroutine drivers are the simplest queue implementations, designed for immediate execution without persistent storage:

DriverExecution ModelUse CaseStorageDelayed Jobs
SyncQueueSynchronous, blockingTesting, simple tasksNoneNo (executes immediately)
CoroutineQueueAsynchronous, non-blockingConcurrent tasks, I/O operationsNoneNo (executes immediately in coroutine)

Both drivers extend the abstract Queue class and implement the QueueContract interface, but they never store jobs persistently. Instead, they execute jobs immediately upon dispatch.

Sources: src/SyncQueue.php20 src/CoroutineQueue.php11


Class Hierarchy


Sources: src/SyncQueue.php20 src/CoroutineQueue.php11


SyncQueue Implementation

Synchronous Execution Model

The SyncQueue class executes jobs immediately and synchronously within the current process. When a job is pushed, it is not stored but executed immediately using the executeJob method.

Key characteristics:

  • Jobs execute in the same execution context as the dispatcher
  • Exceptions propagate directly to the caller
  • No worker process required
  • Zero queue size (always returns 0)

Sources: src/SyncQueue.php20-192

Job Execution Flow


Sources: src/SyncQueue.php75-109

Core Methods

push Method

The push method is the primary entry point for job dispatching. It checks for transaction-aware dispatching before executing the job:

<FileRef file-url="https://github.com/hypervel/queue/blob/37a2c42f/src/SyncQueue.php#L75-L87" min=75 max=87 file-path="src/SyncQueue.php">Hii</FileRef>

The method:

  1. Checks if the job implements ShouldQueueAfterCommit or has $afterCommit = true
  2. If transaction-aware and TransactionManager is available, registers a callback with the transaction manager
  3. Otherwise, executes the job immediately via executeJob

Sources: src/SyncQueue.php75-87

executeJob Method

The protected executeJob method performs the actual job execution:

<FileRef file-url="https://github.com/hypervel/queue/blob/37a2c42f/src/SyncQueue.php#L94-L109" min=94 max=109 file-path="src/SyncQueue.php">Hii</FileRef>

This method:

  1. Creates the job payload using createPayload from the parent Queue class
  2. Resolves a SyncJob instance from the payload
  3. Dispatches the JobProcessing event
  4. Calls fire() on the job to execute it
  5. Dispatches the JobProcessed event on success
  6. Handles exceptions via handleException method

Sources: src/SyncQueue.php94-109

Event Dispatching

The SyncQueue dispatches three types of events during job execution:

EventMethodTiming
JobProcessingraiseBeforeJobEventBefore job execution
JobProcessedraiseAfterJobEventAfter successful execution
JobExceptionOccurredraiseExceptionOccurredJobEventWhen exception occurs

All event dispatching methods check for EventDispatcherInterface availability before dispatching:

<FileRef file-url="https://github.com/hypervel/queue/blob/37a2c42f/src/SyncQueue.php#L122-L150" min=122 max=150 file-path="src/SyncQueue.php">Hii</FileRef>

Sources: src/SyncQueue.php122-150

Exception Handling

When an exception occurs during job execution, the handleException method:

  1. Dispatches the JobExceptionOccurred event
  2. Calls fail() on the job to mark it as failed
  3. Reports the exception to the ExceptionHandler
  4. Re-throws the exception to the caller
<FileRef file-url="https://github.com/hypervel/queue/blob/37a2c42f/src/SyncQueue.php#L157-L167" min=157 max=167 file-path="src/SyncQueue.php">Hii</FileRef>

This behavior differs from other queue drivers, which handle exceptions internally and may retry jobs.

Sources: src/SyncQueue.php157-167

Queue Size Methods

Since SyncQueue never stores jobs, all size-related methods return 0:

<FileRef file-url="https://github.com/hypervel/queue/blob/37a2c42f/src/SyncQueue.php#L33-L68" min=33 max=68 file-path="src/SyncQueue.php">Hii</FileRef>

These methods implement the QueueContract interface but have no meaningful values for synchronous execution.

Sources: src/SyncQueue.php33-68


CoroutineQueue Implementation

Asynchronous Execution Model

The CoroutineQueue class extends SyncQueue to provide asynchronous execution using Hyperf's coroutine runtime. Jobs are executed in separate coroutines, allowing concurrent execution without blocking the dispatcher.

Key characteristics:

  • Jobs execute in separate coroutines
  • Non-blocking dispatch
  • Exceptions caught within coroutines (do not propagate to caller)
  • Configurable exception callback for error handling

Sources: src/CoroutineQueue.php11-67

Coroutine Execution Flow


Sources: src/CoroutineQueue.php23-66

Overridden Methods

push Method

The push method is overridden to provide the same transaction-aware dispatching as SyncQueue, but with coroutine execution:

<FileRef file-url="https://github.com/hypervel/queue/blob/37a2c42f/src/CoroutineQueue.php#L23-L38" min=23 max=38 file-path="src/CoroutineQueue.php">Hii</FileRef>

The method follows the same transaction-aware logic but calls executeJob, which is overridden to use coroutines.

Sources: src/CoroutineQueue.php23-38

executeJob Method

The executeJob method is overridden to wrap job execution in a coroutine:

<FileRef file-url="https://github.com/hypervel/queue/blob/37a2c42f/src/CoroutineQueue.php#L53-L66" min=53 max=66 file-path="src/CoroutineQueue.php">Hii</FileRef>

This method:

  1. Creates a new coroutine via Coroutine::create()
  2. Calls the parent SyncQueue::executeJob() within the coroutine
  3. Catches any exceptions thrown during execution
  4. Invokes the exceptionCallback if configured
  5. Returns immediately (without waiting for coroutine completion)

Sources: src/CoroutineQueue.php53-66

Exception Handling Callback

The CoroutineQueue provides a mechanism for handling uncaught exceptions via the setExceptionCallback method:

<FileRef file-url="https://github.com/hypervel/queue/blob/37a2c42f/src/CoroutineQueue.php#L43-L48" min=43 max=48 file-path="src/CoroutineQueue.php">Hii</FileRef>

This callback is invoked when an exception occurs within a coroutine. Unlike SyncQueue, exceptions do not propagate to the caller since the coroutine executes asynchronously.

The callback is typically set by the QueueManagerFactory when an ExceptionHandler is available:


Sources: src/CoroutineQueue.php14-61


Transaction-Aware Dispatching

Both SyncQueue and CoroutineQueue support transaction-aware dispatching, which defers job execution until the current database transaction commits. This ensures that jobs only execute if the transaction succeeds, preventing inconsistent state.

Dispatching Logic


Implementation Details

The shouldDispatchAfterCommit method (inherited from the Queue abstract class) determines if a job should be deferred:

  1. Checks if the job implements the ShouldQueueAfterCommit interface
  2. Checks if the job has an $afterCommit property set to true
  3. Falls back to the queue connection's after_commit configuration value

If dispatching should be deferred and the TransactionManager is available, the job execution is registered as a callback:

<FileRef file-url="https://github.com/hypervel/queue/blob/37a2c42f/src/SyncQueue.php#L77-L84" min=77 max=84 file-path="src/SyncQueue.php">Hii</FileRef>

The TransactionManager::addCallback() method stores the callback and invokes it when the transaction commits successfully. If the transaction rolls back, the callback is discarded and the job never executes.

Sources: src/SyncQueue.php77-84 src/CoroutineQueue.php26-32


Connector Implementations

SyncConnector

The SyncConnector creates SyncQueue instances with the configured after_commit setting:


Sources: Referenced in system diagrams but not in provided files

CoroutineConnector

The CoroutineConnector creates CoroutineQueue instances:

<FileRef file-url="https://github.com/hypervel/queue/blob/37a2c42f/src/Connectors/CoroutineConnector.php#L1-L19" min=1 max=19 file-path="src/Connectors/CoroutineConnector.php">Hii</FileRef>

Both connectors implement the ConnectorInterface and are used by the QueueManager to instantiate queue drivers based on configuration.

Sources: src/Connectors/CoroutineConnector.php15-18


Configuration

Queue Configuration File

The synchronous and coroutine drivers are configured in the queue.php configuration file:

<FileRef file-url="https://github.com/hypervel/queue/blob/37a2c42f/publish/queue.php#L46-L56" min=46 max=56 file-path="publish/queue.php">Hii</FileRef>

There is also a defer driver alias that points to the coroutine driver:

<FileRef file-url="https://github.com/hypervel/queue/blob/37a2c42f/publish/queue.php#L54-L56" min=54 max=56 file-path="publish/queue.php">Hii</FileRef>

Configuration Options

OptionTypeDefaultDescription
driverstringRequiredMust be 'sync', 'coroutine', or 'defer'
after_commitboolfalseDefer job execution until database transaction commits

Sources: publish/queue.php46-56

Setting Default Driver

To use synchronous or coroutine execution as the default queue driver:

<FileRef file-url="https://github.com/hypervel/queue/blob/37a2c42f/publish/queue.php#L19-L19" min=19 file-path="publish/queue.php">Hii</FileRef>

The QUEUE_CONNECTION environment variable can be set to 'sync', 'coroutine', or 'defer'.

Sources: publish/queue.php19


Driver Comparison

Execution Model Comparison


Sources: src/SyncQueue.php94-109 src/CoroutineQueue.php53-66

Feature Comparison Table

FeatureSyncQueueCoroutineQueue
Execution ModelSynchronous, blockingAsynchronous, non-blocking
ConcurrencyNoneMultiple coroutines
Exception PropagationYes, to callerNo, caught in coroutine
Exception CallbackNoYes, via setExceptionCallback
Transaction-AwareYesYes
Storage BackendNoneNone
Delayed JobsNo (executes immediately)No (executes immediately)
Queue SizeAlways 0Always 0
Worker RequiredNoNo
Use CaseTesting, simple synchronous tasksConcurrent tasks, I/O operations

Sources: src/SyncQueue.php1-192 src/CoroutineQueue.php1-67


Use Cases

When to Use SyncQueue

  1. Testing: Execute jobs synchronously to simplify test assertions
  2. Development: Immediate feedback without worker processes
  3. Simple Tasks: Tasks that don't benefit from asynchronous execution
  4. Debugging: Easier to trace execution flow and exceptions

Example configuration for testing:


Sources: publish/queue.php46-48

When to Use CoroutineQueue

  1. I/O-Bound Tasks: Tasks that involve network requests, database queries, or file operations
  2. Concurrent Execution: Multiple jobs that can run in parallel
  3. Non-Blocking Operations: Tasks that should not block the main application flow
  4. High Throughput: Processing multiple jobs without blocking

Example: Sending multiple HTTP notifications concurrently:


Sources: src/CoroutineQueue.php1-67 publish/queue.php50-52

When NOT to Use These Drivers

These drivers should not be used when:

  • Job persistence is required (use Database, Redis, SQS, or Beanstalkd drivers)
  • Delayed job execution is needed (jobs execute immediately)
  • Retry logic with backoff is required (exceptions are handled differently)
  • Multiple worker processes are needed for scaling
  • Job monitoring and management via console commands is required

For persistent queues with worker support, see Redis Driver, SQS Driver, or Database and Beanstalkd Drivers.

Sources: System architecture context