
URL: https://deepwiki.com/hypervel/components/7-queue-and-job-processing



Last indexed: 7 March 2026 (96fbab)

Queue and Job Processing

Purpose and Scope

This document describes the queue and job processing system in Hypervel, which provides asynchronous task execution capabilities. The queue system allows applications to defer time-consuming tasks (such as sending emails, processing uploads, or performing API calls) to background workers, improving response times and user experience.

The queue component builds on Hyperf's coroutine runtime while providing a Laravel-compatible API for job dispatching, processing, and failure handling. It supports multiple queue drivers (Redis, Database, Sync) and integrates with the Bus package for command dispatching.

For information about the Bus package and command dispatching patterns, see Bus System. For event-driven job triggering, see Event Broadcasting.

Sources: src/queue/composer.json:1-64, composer.json:57, composer.json:175


Queue System Architecture

The queue system follows a manager pattern with pluggable drivers, allowing different storage backends for queue data. The architecture supports coroutine-safe job processing, serialization with encryption, and coordination with Hyperf's process management.

[Figure: Core Components Diagram]


Sources: src/queue/composer.json:22-36, src/bus/composer.json:22-34, composer.json:32, composer.json:57


Queue Manager and Configuration

The QueueManager class serves as the central factory for creating queue connections. It implements the manager pattern (see Manager Pattern) and resolves queue drivers based on configuration.

Configuration Structure

Queue connections are configured in the application's queue configuration file. Each connection specifies a driver and driver-specific options:

Configuration Key | Description | Example Values
default | Default queue connection | redis, database, sync
connections | Array of connection configurations | See below
connections.*.driver | Queue driver type | redis, database, sync, sqs, beanstalkd
connections.*.queue | Default queue name | default, high, low
connections.*.retry_after | Job timeout before retry (seconds) | 90
connections.*.block_for | Redis blocking pop timeout | null, 5
failed | Failed job storage configuration | Database table/connection

Connection Resolution

The QueueManager uses connector classes to instantiate queue drivers:

  • RedisConnector - Creates RedisQueue instances using Redis lists and sorted sets
  • DatabaseConnector - Creates DatabaseQueue instances using database tables
  • SyncConnector - Creates SyncQueue instances for synchronous execution
  • SqsConnector - Creates SqsQueue instances for AWS SQS (optional)
  • BeanstalkdConnector - Creates BeanstalkdQueue instances for Beanstalkd (optional)

The manager caches instantiated connections to avoid redundant creation within a coroutine context.
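The resolve-then-cache behaviour described above can be illustrated with a minimal, self-contained sketch. Every class and method name here (Queue, InMemoryQueue, SimpleQueueManager, addConnector) is a hypothetical stand-in for illustration and is not taken from Hypervel's source; the real QueueManager may differ in structure and in how it scopes its cache.

```php
<?php
declare(strict_types=1);

// Hypothetical stand-ins for illustration only; not Hypervel's classes.
interface Queue
{
    public function name(): string;
}

final class InMemoryQueue implements Queue
{
    public function __construct(private string $name) {}

    public function name(): string
    {
        return $this->name;
    }
}

final class SimpleQueueManager
{
    /** @var array<string, callable> connector factories keyed by driver name */
    private array $connectors = [];

    /** @var array<string, Queue> resolved connections keyed by connection name */
    private array $connections = [];

    public function __construct(private array $config) {}

    public function addConnector(string $driver, callable $factory): void
    {
        $this->connectors[$driver] = $factory;
    }

    public function connection(?string $name = null): Queue
    {
        $name ??= $this->config['default'];

        // Reuse a previously resolved connection instead of rebuilding it.
        if (isset($this->connections[$name])) {
            return $this->connections[$name];
        }

        $settings = $this->config['connections'][$name]
            ?? throw new InvalidArgumentException("Queue connection [{$name}] is not configured.");
        $driver = $settings['driver'];
        $factory = $this->connectors[$driver]
            ?? throw new InvalidArgumentException("No connector registered for driver [{$driver}].");

        return $this->connections[$name] = $factory($settings);
    }
}

$manager = new SimpleQueueManager([
    'default' => 'sync',
    'connections' => ['sync' => ['driver' => 'sync', 'queue' => 'default']],
]);
$manager->addConnector('sync', fn (array $settings): Queue => new InMemoryQueue($settings['queue']));
```

Calling $manager->connection() twice returns the same object, which is the property the caching is meant to provide.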

Sources: src/queue/composer.json:22-36, src/bus/composer.json:46-48, composer.json:261


Queue Drivers

Redis Queue Driver

The Redis-based queue uses Redis lists for queue storage and sorted sets for delayed jobs. This driver supports:

  • Atomic Operations - Uses Redis transactions for push/pop operations
  • Delayed Jobs - Stores jobs with future release times in sorted sets
  • Job Reservations - Moves jobs to a reserved list during processing
  • Blocking Pop - Optional blocking pop with configurable timeout
  • Multiple Queues - Named queues with priority support

Redis keys follow the pattern:

  • queues:{connection}:{queue} - Main queue list
  • queues:{connection}:{queue}:delayed - Sorted set for delayed jobs
  • queues:{connection}:{queue}:reserved - Reserved jobs during processing
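To make the interplay of the three keys concrete, the following sketch models them with plain PHP arrays instead of Redis. It is an in-memory approximation under stated assumptions: the class name FakeRedisQueue is hypothetical, and tracking an expiry time for reserved jobs is an assumption borrowed from how comparable drivers behave, not something confirmed by the text above.

```php
<?php
declare(strict_types=1);

// In-memory model of the three structures; a real driver would issue Redis commands.
final class FakeRedisQueue
{
    /** @var list<string> models the main queue list */
    public array $ready = [];

    /** @var array<string, int> payload => time it becomes available (models the delayed sorted set) */
    public array $delayed = [];

    /** @var array<string, int> payload => time the reservation expires (assumed behaviour) */
    public array $reserved = [];

    public function __construct(private int $retryAfter = 90) {}

    /** Builds key names following the pattern listed above. */
    public static function keys(string $connection, string $queue): array
    {
        $base = "queues:{$connection}:{$queue}";

        return ['ready' => $base, 'delayed' => "{$base}:delayed", 'reserved' => "{$base}:reserved"];
    }

    public function push(string $payload): void
    {
        $this->ready[] = $payload;
    }

    public function later(int $availableAt, string $payload): void
    {
        $this->delayed[$payload] = $availableAt;
    }

    public function pop(int $now): ?string
    {
        // Move due delayed jobs and expired reservations back onto the ready list.
        foreach (['delayed', 'reserved'] as $set) {
            foreach ($this->{$set} as $payload => $dueAt) {
                if ($dueAt <= $now) {
                    unset($this->{$set}[$payload]);
                    $this->ready[] = (string) $payload;
                }
            }
        }

        $payload = array_shift($this->ready);
        if ($payload === null) {
            return null;
        }

        // Reserve the job so it can be retried if the worker never finishes it.
        $this->reserved[$payload] = $now + $this->retryAfter;

        return $payload;
    }
}
```

In a real Redis driver these moves would have to happen atomically, which is why the driver relies on transactional operations rather than separate commands.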

Database Queue Driver

The database-based queue stores jobs in a jobs table with the following schema:

Column | Type | Purpose
id | bigint | Primary key
queue | string | Queue name
payload | text | Serialized job data
attempts | int | Number of processing attempts
reserved_at | timestamp | When job was reserved by worker
available_at | timestamp | When job becomes available
created_at | timestamp | Job creation time

This driver is useful when Redis is not available or when queue persistence to relational storage is required.

Sync Queue Driver

The SyncQueue driver executes jobs immediately in the current process/coroutine. This is useful for:

  • Local Development - Immediate execution for debugging
  • Testing - Synchronous job processing in tests
  • Simple Applications - When asynchronous processing is not needed

Jobs dispatched to the sync driver bypass serialization and execute directly using the container to resolve dependencies.

[Figure: Queue Driver Comparison]


Sources: src/queue/composer.json:51-58, composer.json:113, composer.json:209


Job Dispatching

Job Classes

Jobs are classes that implement the ShouldQueue interface and define a handle() method containing the job logic.
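A job class of this shape might look like the sketch below. To keep the file runnable on its own, ShouldQueue is declared locally as an empty marker interface; in an application it would be imported from the framework instead, and its exact namespace is not stated in this document. The ProcessOrder name follows the examples used elsewhere on this page, while the orderId property and the return value are illustrative assumptions.

```php
<?php
declare(strict_types=1);

// Local stand-in for the framework's ShouldQueue marker interface (assumption:
// a real application imports it from the queue package instead).
interface ShouldQueue {}

final class ProcessOrder implements ShouldQueue
{
    // Constructor arguments become the job's state and travel with the payload.
    public function __construct(public readonly int $orderId) {}

    // The worker calls handle() when the job is taken off the queue.
    public function handle(): string
    {
        return "processed order {$this->orderId}";
    }
}
```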


Dispatch Methods

Jobs can be dispatched through multiple interfaces:

Method | Description | Example
dispatch($job) | Global helper function | dispatch(new ProcessOrder($order))
Job::dispatch() | Static method on job class | ProcessOrder::dispatch($order)
$job->dispatch() | Instance method | (new ProcessOrder($order))->dispatch()
Bus::dispatch($job) | Facade method | Bus::dispatch(new ProcessOrder($order))

Job Chaining and Batching

Jobs can be chained to execute sequentially or batched to execute in parallel:

  • Chaining - Execute jobs one after another using withChain()
  • Batching - Group jobs together and track completion using Bus::batch()

Delayed and Priority Jobs

Jobs support delayed execution and queue prioritization:

  • dispatch($job)->delay(now()->addMinutes(10)) - Delay job execution
  • dispatch($job)->onQueue('high') - Specify queue name for priority
  • dispatch($job)->onConnection('redis-high') - Use specific connection
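The chained calls above work because each method records one option and returns the same pending object. The sketch below shows that mechanism in isolation; PendingDispatchSketch is a hypothetical class, and it takes the delay as a plain number of seconds rather than the date object used in the bullet above.

```php
<?php
declare(strict_types=1);

// Hypothetical stand-in for a pending dispatch; method names mirror the calls in the text.
final class PendingDispatchSketch
{
    public ?string $queue = null;
    public ?string $connection = null;
    public int $delaySeconds = 0;

    public function __construct(public readonly object $job) {}

    public function delay(int $seconds): static
    {
        $this->delaySeconds = $seconds;

        return $this; // returning $this is what makes the calls chainable
    }

    public function onQueue(string $queue): static
    {
        $this->queue = $queue;

        return $this;
    }

    public function onConnection(string $connection): static
    {
        $this->connection = $connection;

        return $this;
    }
}

$pending = (new PendingDispatchSketch(new stdClass()))
    ->delay(600)
    ->onQueue('high')
    ->onConnection('redis-high');
```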

Unique Jobs

Jobs can be made unique to prevent duplicate execution by implementing uniqueness constraints. The queue system checks for existing jobs with the same unique identifier before pushing.

Sources: src/bus/composer.json:22-34, src/queue/composer.json:31, composer.json:74


Job Processing and Worker Lifecycle

Worker Process

The Worker class manages the job processing loop within a Hyperf process. It continuously polls queues for available jobs and executes them.

[Figure: Job Execution Flow]


Coroutine Integration

The worker leverages Hyperf's coroutine capabilities:

  • Coordinator - Uses Hyperf\Coordinator\Coordinator for graceful shutdown coordination
  • Coroutine Isolation - Each job execution maintains isolated context
  • Non-blocking Operations - Database and Redis operations use coroutine-friendly drivers

Job Timeout and Memory Management

Workers monitor resource usage:

  • Execution Timeout - Jobs exceeding retry_after seconds are released back to queue
  • Memory Limits - Worker exits when memory usage exceeds configured threshold
  • Max Jobs - Worker exits after processing configured number of jobs
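The memory and max-jobs checks can be pictured as guards at the top of the processing loop. The function below is a simplified sketch, not the Worker class itself: runWorker and its parameters are hypothetical, jobs are plain callables, and the loop returns when the queue is empty, whereas a real worker would typically sleep or block and keep polling.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical worker loop: runs jobs until a limit is reached or no jobs remain.
 *
 * @param list<callable> $jobs
 * @return array{processed: int, reason: string}
 */
function runWorker(array $jobs, int $maxJobs, int $memoryLimitMb): array
{
    $processed = 0;

    while (true) {
        // Max Jobs: stop after the configured number of jobs.
        if ($processed >= $maxJobs) {
            return ['processed' => $processed, 'reason' => 'max_jobs'];
        }

        // Memory Limits: stop once usage reaches the threshold (in megabytes).
        if (memory_get_usage(true) / 1024 / 1024 >= $memoryLimitMb) {
            return ['processed' => $processed, 'reason' => 'memory'];
        }

        $job = array_shift($jobs);
        if ($job === null) {
            return ['processed' => $processed, 'reason' => 'empty'];
        }

        $job();
        ++$processed;
    }
}
```

Exiting on a limit is only useful when a supervisor restarts the worker afterwards, so these options are usually paired with process management.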

Sources: src/queue/composer.json:24-26, src/queue/composer.json:33


Job Serialization and Encryption

Serialization Process

Jobs are serialized using Laravel's SerializableClosure package, which allows serialization of:

  • Closure Jobs - Anonymous functions passed to dispatch(function() {...})
  • Class-based Jobs - Job class instances with properties
  • Nested Objects - Complex object graphs including Eloquent models

Encryption for Sensitive Data

Jobs containing sensitive data can be encrypted before storage:

  1. Job payload is serialized to JSON
  2. Encrypted using Hypervel\Encryption\Encrypter
  3. Stored with encryption marker
  4. Decrypted by worker before unserialization

This ensures sensitive data in queued jobs remains secure in Redis or database storage.
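The four steps can be traced with PHP's OpenSSL functions. This is a sketch under assumptions, not the behaviour of Hypervel\Encryption\Encrypter: the AES-256-GCM cipher, the envelope layout, and the 'encrypted' marker key are all chosen here for illustration, and the function names are hypothetical.

```php
<?php
declare(strict_types=1);

const PAYLOAD_CIPHER = 'aes-256-gcm'; // assumed cipher for this sketch

function encryptPayload(array $job, string $key): string
{
    // 1. Serialize the job payload to JSON.
    $json = json_encode($job, JSON_THROW_ON_ERROR);

    // 2. Encrypt it with a fresh random IV; GCM also yields an authentication tag.
    $iv = random_bytes(openssl_cipher_iv_length(PAYLOAD_CIPHER));
    $ciphertext = openssl_encrypt($json, PAYLOAD_CIPHER, $key, OPENSSL_RAW_DATA, $iv, $tag);
    if ($ciphertext === false) {
        throw new RuntimeException('Encryption failed.');
    }

    // 3. Store the result together with a marker so the worker knows to decrypt.
    return json_encode([
        'encrypted' => true,
        'data' => base64_encode($iv . $tag . $ciphertext),
    ], JSON_THROW_ON_ERROR);
}

function decryptPayload(string $stored, string $key): array
{
    $envelope = json_decode($stored, true, 512, JSON_THROW_ON_ERROR);
    if (($envelope['encrypted'] ?? false) !== true) {
        return $envelope; // payload was stored unencrypted
    }

    // 4. Split IV, tag, and ciphertext, then decrypt before decoding the job.
    $raw = base64_decode($envelope['data'], true);
    if ($raw === false) {
        throw new RuntimeException('Stored payload is not valid base64.');
    }
    $ivLength = openssl_cipher_iv_length(PAYLOAD_CIPHER);
    $iv = substr($raw, 0, $ivLength);
    $tag = substr($raw, $ivLength, 16);
    $ciphertext = substr($raw, $ivLength + 16);

    $json = openssl_decrypt($ciphertext, PAYLOAD_CIPHER, $key, OPENSSL_RAW_DATA, $iv, $tag);
    if ($json === false) {
        throw new RuntimeException('Decryption failed.');
    }

    return json_decode($json, true, 512, JSON_THROW_ON_ERROR);
}
```

An authenticated cipher is used in the sketch so that a tampered or wrongly keyed payload fails to decrypt instead of producing garbage that the worker might try to run.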

Sources: src/queue/composer.json:31, src/queue/composer.json:36, composer.json:127


Failed Job Handling

Failed Job Provider

The FailedJobProviderInterface defines storage for failed jobs. The default DatabaseFailedJobProvider stores failures in a failed_jobs table:

Column | Type | Purpose
id | bigint | Primary key
uuid | string | Unique identifier
connection | string | Queue connection name
queue | string | Queue name
payload | text | Serialized job data
exception | text | Exception stack trace
failed_at | timestamp | Failure time

[Figure: Failed Job Lifecycle]


Failed Job Methods

Job classes can define a failed() method that executes when the job permanently fails.
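The following sketch shows when such a hook would fire. The SendInvoice job, the failureReason property, and the runWithFailureHook function are hypothetical; the runner is a simplified stand-in for the worker and assumes that failed() receives the exception from the final attempt.

```php
<?php
declare(strict_types=1);

final class SendInvoice
{
    public ?string $failureReason = null;

    public function handle(): void
    {
        // Simulate a job that fails on every attempt.
        throw new RuntimeException('Mail server unreachable');
    }

    // Runs once, after the last attempt; typical uses are alerting or cleanup.
    public function failed(Throwable $e): void
    {
        $this->failureReason = $e->getMessage();
    }
}

// Hypothetical runner: tries the job up to $maxTries times, then reports the failure.
function runWithFailureHook(object $job, int $maxTries): bool
{
    for ($attempt = 1; $attempt <= $maxTries; ++$attempt) {
        try {
            $job->handle();

            return true;
        } catch (Throwable $e) {
            // Only the final failed attempt triggers the hook.
            if ($attempt === $maxTries && method_exists($job, 'failed')) {
                $job->failed($e);
            }
        }
    }

    return false;
}
```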


Retry Configuration

Jobs control retry behavior through properties:

  • $tries - Maximum number of attempts
  • $maxExceptions - Maximum number of unhandled exceptions before failure
  • $timeout - Maximum execution time
  • $retryAfter - Delay between retries (calculated based on attempts)
  • $backoff - Array of delays for exponential backoff
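How an array-valued $backoff could translate into a delay per attempt is sketched below. The helper names and the rule of reusing the last entry once attempts exceed the array length are assumptions modelled on comparable queue systems; the document does not spell out the exact rule.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: picks the delay in seconds before the next retry.
 *
 * @param int|list<int> $backoff a fixed delay, or one delay per attempt
 * @param int $attempt number of attempts made so far (1-based)
 */
function backoffDelay(int|array $backoff, int $attempt): int
{
    if (is_int($backoff)) {
        return $backoff;
    }

    $delays = array_values($backoff);
    if ($delays === []) {
        return 0;
    }

    // Clamp the index so later attempts reuse the final delay (assumed rule).
    $index = max(0, min($attempt, count($delays)) - 1);

    return $delays[$index];
}

// A job may be retried while it has used fewer attempts than $tries allows.
function shouldRetry(int $attempts, int $tries): bool
{
    return $attempts < $tries;
}
```

With a backoff of [1, 5, 10], the first three failures wait 1, 5, and 10 seconds, and any further failures keep waiting 10 seconds.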

Sources: src/queue/composer.json:30, src/queue/composer.json:54


Integration with Other Systems

Bus Package Integration

The Queue package integrates with the Bus package (hypervel/bus) for dispatching:

  • Dispatcher - Routes ShouldQueue jobs to queue system
  • PendingDispatch - Fluent interface for queue configuration
  • Batch Repository - Stores batch metadata for job groups

Event System Integration

Queue operations trigger events:

  • JobProcessing - Before job execution
  • JobProcessed - After successful execution
  • JobFailed - When job fails permanently
  • JobExceptionOccurred - On any exception during processing

Dependency Injection

The worker resolves job dependencies through the container:

  1. Job class constructor parameters are resolved automatically
  2. handle() method parameters are type-hinted and resolved
  3. Supports contextual binding for job-specific dependencies
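Resolving type-hinted handle() parameters can be approximated with PHP's reflection API. The sketch below is a toy resolver, not the framework's container: callHandle, Mailer, and WelcomeUser are hypothetical names, and unbound classes are simply instantiated without arguments.

```php
<?php
declare(strict_types=1);

final class Mailer
{
    public function send(string $to): string
    {
        return "mail sent to {$to}";
    }
}

final class WelcomeUser
{
    public function __construct(public readonly string $email) {}

    // The Mailer dependency is declared by type hint and supplied at call time.
    public function handle(Mailer $mailer): string
    {
        return $mailer->send($this->email);
    }
}

/**
 * Hypothetical resolver: builds handle() arguments from explicit bindings,
 * falling back to instantiating the hinted class with no arguments.
 *
 * @param array<class-string, object> $bindings
 */
function callHandle(object $job, array $bindings = []): mixed
{
    $arguments = [];

    foreach ((new ReflectionMethod($job, 'handle'))->getParameters() as $parameter) {
        $type = $parameter->getType();
        if (! $type instanceof ReflectionNamedType || $type->isBuiltin()) {
            throw new LogicException('Cannot resolve parameter $' . $parameter->getName());
        }

        $class = $type->getName();
        $arguments[] = $bindings[$class] ?? new $class();
    }

    return $job->handle(...$arguments);
}
```

Passing an entry in $bindings plays the role of a container binding: it lets a test or a specific job receive a different implementation than the default.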

Process Management

Queue workers run as Hyperf processes:

  • Configured in config/autoload/processes.php
  • Managed by Hyperf\Process\ProcessManager
  • Support graceful shutdown via signal handling
  • Can be deployed as separate worker processes or within the same server

Sources: src/bus/composer.json:22-34, src/event/composer.json:32-39, src/queue/composer.json:22-36


Configuration Example

A typical queue configuration structure:

Config Section | Purpose
queue.default | Default connection name
queue.connections.redis | Redis driver configuration
queue.connections.redis.driver | Set to 'redis'
queue.connections.redis.connection | Redis connection from config/redis.php
queue.connections.redis.queue | Default queue name
queue.connections.redis.retry_after | Job timeout in seconds
queue.connections.redis.block_for | Blocking pop timeout
queue.connections.database | Database driver configuration
queue.connections.database.driver | Set to 'database'
queue.connections.database.connection | Database connection name
queue.connections.database.table | Jobs table name (jobs)
queue.connections.database.queue | Default queue name
queue.connections.database.retry_after | Job timeout
queue.failed.driver | Failed job storage driver
queue.failed.database | Database connection for failed jobs
queue.failed.table | Failed jobs table name (failed_jobs)
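Put together, the keys in the table might form a configuration file along these lines. This is a sketch: the key names come from the table, while the concrete values (connection names such as 'default', the 90-second timeout, and 'database' as the failed-job driver) are illustrative assumptions rather than the framework's shipped defaults.

```php
<?php

// Sketch of a queue configuration array; values are illustrative assumptions.
return [
    'default' => 'redis',

    'connections' => [
        'sync' => [
            'driver' => 'sync',
        ],

        'redis' => [
            'driver' => 'redis',
            'connection' => 'default', // name of a connection in config/redis.php
            'queue' => 'default',
            'retry_after' => 90,       // seconds before a reserved job is retried
            'block_for' => null,       // null disables blocking pop
        ],

        'database' => [
            'driver' => 'database',
            'connection' => 'default', // database connection name (assumed)
            'table' => 'jobs',
            'queue' => 'default',
            'retry_after' => 90,
        ],
    ],

    'failed' => [
        'driver' => 'database',        // assumed driver value
        'database' => 'default',       // database connection for failed jobs (assumed)
        'table' => 'failed_jobs',
    ],
];
```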

Sources: src/queue/composer.json:1-64, composer.json:261
