VOOZH about

URL: https://deepwiki.com/friendsofhyperf/components/3.2-fluent-dispatch-api-and-job-routing

⇱ Fluent Dispatch API and Job Routing | friendsofhyperf/components | DeepWiki


Loading...
Last indexed: 14 February 2026 (15d5ca)
Menu

Fluent Dispatch API and Job Routing

This page documents the fluent dispatch API provided by the friendsofhyperf/support component, which offers a unified interface for dispatching jobs across multiple messaging systems. The dispatch system automatically routes jobs to the appropriate backend (AsyncQueue, AMQP, or Kafka) based on the job type and provides a fluent configuration API.

For information about the global helper functions that wrap this functionality, see Global Helper Functions. For retry strategies used with dispatched jobs, see Backoff Strategies.

Overview and Architecture

The dispatch system provides a single entry point (dispatch() function) that intelligently routes jobs to different backends based on type inspection. The system uses the "Pending Dispatch" pattern, where configuration is accumulated in a pending object before actual dispatch occurs when the object is destroyed.

Dispatch Routing Flow


Sources: src/support/README.md28-80 CHANGELOG-3.1.md96

Component Structure


Sources: src/support/README.md9-16 src/support/composer.json22-38

The dispatch() Function

The dispatch() helper function in the FriendsOfHyperf\Support namespace provides the primary entry point for job dispatch. It performs type inspection and creates the appropriate pending dispatch object.

Function Signature and Type Routing

Job TypePending ClassBackend
ClosurePendingAsyncQueueDispatch (with CallQueuedClosure)Hyperf AsyncQueue
ProducerMessageInterfacePendingAmqpProducerMessageDispatchHyperf AMQP
ProduceMessagePendingKafkaProducerMessageDispatchHyperf Kafka
Other objects implementing JobInterfacePendingAsyncQueueDispatchHyperf AsyncQueue

Deprecated Legacy dispatch()

Note that friendsofhyperf/helpers contains a deprecated dispatch() function at src/helpers/src/Functions.php194-225 which will be removed in v3.2. This older implementation returns a boolean and does not provide the fluent API. The new implementation is located in the friendsofhyperf/support package.

Sources: src/support/README.md169-177 src/helpers/src/Functions.php194-225 CHANGELOG-3.1.md96

CallQueuedClosure - Executing Closures as Jobs

CallQueuedClosure wraps PHP closures to make them compatible with Hyperf's async queue system. It serializes the closure using laravel/serializable-closure and handles dependency injection when the job is processed.

Class Structure


Usage Patterns

Basic Closure Job


With Dependency Injection


With Configuration


The CallQueuedClosure class implements JobInterface and integrates with Hyperf's async queue worker. When the worker processes the job, it calls handle(), which resolves dependencies from the DI container and invokes the original closure.

Sources: src/support/README.md30-54 src/support/README.md82-99 CHANGELOG-3.1.md79

PendingAsyncQueueDispatch

PendingAsyncQueueDispatch provides a fluent interface for configuring async queue jobs before dispatch. The actual job is pushed to the queue when the pending object is destroyed (in the __destruct() method).

Configuration Methods

MethodParameter TypeDescription
onConnection()string $connectionSet the queue connection name (e.g., 'default', 'high-priority')
delay()int $delayDelay job execution by specified seconds
setMaxAttempts()int $attemptsSet maximum retry attempts for the job
when()mixed $condition, callable $callbackApply callback conditionally
unless()mixed $condition, callable $callbackApply callback unless condition is true

Execution Flow


Internal Properties Access Pattern

The pending dispatch classes use a closure trick to access private properties of the job objects. For example, when determining the queue connection:


This pattern allows the dispatch system to read configuration from the job object without requiring public getters.

Sources: src/support/README.md103-120 src/helpers/src/Functions.php208-216

PendingAmqpProducerMessageDispatch

PendingAmqpProducerMessageDispatch handles AMQP message production with support for RabbitMQ-specific features like exchanges, routing keys, and publisher confirms.

Configuration Methods

MethodParameter TypeDescription
setPool()string $poolSet AMQP connection pool name
setExchange()string $exchangeSet target exchange name
setRoutingKey()array|string $routingKeySet routing key(s) for message routing
setTimeout()int $timeoutSet producer timeout in seconds
setConfirm()bool $confirmEnable/disable publisher confirms

AMQP Message Flow


Usage Example


The routing key determines which queue(s) receive the message based on the exchange binding configuration. Multiple routing keys can be provided as an array for fanout patterns.

Sources: src/support/README.md56-69 src/support/README.md124-138 src/support/README.md186-196

PendingKafkaProducerMessageDispatch

PendingKafkaProducerMessageDispatch provides a simpler interface for Kafka message production, primarily focused on pool selection.

Configuration Methods

MethodParameter TypeDescription
setPool()string $poolSet Kafka producer pool name

Kafka Message Flow


Usage Example


The ProduceMessage contains the topic, partition key, and message payload. The pool name corresponds to a Kafka producer configuration in the Hyperf configuration files.

Sources: src/support/README.md71-80 src/support/README.md140-151 src/support/README.md197-202

Conditional Execution with when() and unless()

All pending dispatch classes implement the Conditionable trait from Hyperf, providing when() and unless() methods for conditional configuration.

Method Signatures


The $condition is evaluated as a boolean. If the condition is met, the callback is invoked with the pending dispatch object as its parameter. The callback can chain additional configuration methods.

Conditional Configuration Flow


Usage Examples

Conditional Queue Selection


Environment-Based Configuration


The callbacks receive the pending dispatch object, allowing chained method calls within the callback scope. This pattern is particularly useful for environment-specific or feature-flag-based configuration.

Sources: src/support/README.md153-166 src/support/README.md186-202

Complete Job Lifecycle

The following diagram illustrates the complete lifecycle of a dispatched job from initial call to backend execution:


Key Lifecycle Points

  1. Dispatch Call: dispatch($job) inspects job type and creates appropriate pending object
  2. Configuration Phase: Fluent methods accumulate configuration in pending object
  3. Automatic Dispatch: When pending object goes out of scope, __destruct() triggers actual dispatch
  4. Backend Storage: Job is stored in the appropriate backend (Redis queue, RabbitMQ, Kafka)
  5. Worker Processing: Queue workers fetch and execute jobs
  6. Retry Logic: Failed jobs can be retried based on maxAttempts configuration

Deferred Execution Pattern

The pending dispatch pattern uses PHP's destructor to defer the actual dispatch until the object is destroyed. This allows:

  • Fluent API: Chain multiple configuration methods before dispatch
  • Conditional Logic: Apply configuration conditionally without intermediate variables
  • Lazy Evaluation: Dispatch doesn't occur until configuration is complete
  • Memory Safety: No manual dispatch call needed - happens automatically

Sources: src/support/README.md103-151 CHANGELOG-3.1.md96

Configuration Examples and Patterns

Async Queue Job with Full Configuration


AMQP Message with Routing


Kafka Event Publishing


Sources: src/support/README.md30-80 src/support/README.md153-166

Refresh this wiki

On this page