workerman/redis-queue
Message queue system written in PHP based on workerman and backed by Redis.
v1.2.2
2026-01-20 14:57 UTC
Requires
- php: >=7.0
- workerman/redis: ^1.0||^2.0
- workerman/workerman: >=4.0.20
Requires (Dev)
None
Suggests
None
Provides
None
Conflicts
None
Replaces
None
MIT f0ba4ea9143ae02f39b998ed908d107354cb43c0
README
Install
composer require workerman/redis-queue
Usage
test.php
    <?php
    require __DIR__ . '/vendor/autoload.php';

    use Workerman\Worker;
    use Workerman\Timer;
    use Workerman\RedisQueue\Client;

    $worker = new Worker();
    $worker->onWorkerStart = function () {
        $client = new Client('redis://127.0.0.1:6379');

        $client->subscribe('user-1', function($data) {
            echo "user-1\n";
            var_export($data);
        });

        $client->subscribe('user-2', function($data) {
            echo "user-2\n";
            var_export($data);
        });

        $client->onConsumeFailure(function (\Throwable $exception, $package) {
            echo "consume failure\n";
            echo $exception->getMessage(), "\n";
            var_export($package);
        });

        Timer::add(1, function() use ($client) {
            $client->send('user-1', ['some', 'data']);
        });
    };

    Worker::runAll();
Run with the command php test.php start, or php test.php start -d to run in daemon mode.
API
- Client::__construct()
- Client::send()
- Client::subscribe()
- Client::unsubscribe()
- Client::onConsumeFailure()
__construct (string $address, [array $options])
Create an instance from $address and $options.
- $address is the Redis address, for example redis://ip:6379.
- $options are the client connection options:
  - auth: default ''
  - db: default 0
  - retry_seconds: Retry interval after consumption failure
  - max_attempts: Maximum number of retries after consumption failure
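As a minimal sketch of passing options to the constructor: the option keys below come from the list above, while the helper names buildQueueOptions and createQueueClient and the values chosen for retry_seconds and max_attempts are illustrative assumptions, not library defaults.

```php
<?php
use Workerman\RedisQueue\Client;

/**
 * Hypothetical helper: builds the $options array for Client.
 * Keys are the documented option names; values are examples only.
 */
function buildQueueOptions(string $auth = '', int $db = 0): array
{
    return [
        'auth'          => $auth, // Redis password, '' when none is set
        'db'            => $db,   // Redis database index
        'retry_seconds' => 5,     // example value: wait 5 s before retrying
        'max_attempts'  => 3,     // example value: stop after 3 retries
    ];
}

/**
 * Hypothetical helper: creates the client.
 * Intended to be called inside $worker->onWorkerStart, as in test.php.
 */
function createQueueClient(string $address = 'redis://127.0.0.1:6379'): Client
{
    return new Client($address, buildQueueOptions());
}
```

Keeping the options in one function makes it easy to reuse the same settings for producers and consumers.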
send(String $queue, Mixed $data, [int $delay=0])
Send a message to a queue
- $queue is the queue to publish to, String
- $data is the message to publish, Mixed
- $delay is the delay in seconds for delayed consumption, Int
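The following sketch shows an immediate send next to a delayed one. The helper names reminderPayload and sendReminders, the payload fields, and the 10-second delay are assumptions made for illustration; only send($queue, $data, $delay) comes from the API above.

```php
<?php
use Workerman\RedisQueue\Client;

/**
 * Hypothetical helper: builds the message body.
 * $data is Mixed, so an array like this is one possible shape.
 */
function reminderPayload(int $userId): array
{
    return ['user_id' => $userId, 'type' => 'reminder'];
}

/**
 * Hypothetical helper: publishes the same payload twice,
 * once for immediate consumption and once delayed.
 */
function sendReminders(Client $client, int $userId): void
{
    $payload = reminderPayload($userId);
    $client->send('user-1', $payload);     // $delay defaults to 0
    $client->send('user-1', $payload, 10); // available to consumers after a 10 s delay
}
```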
subscribe(mixed $queue, callable $callback)
Subscribe to a queue or queues
- $queue is a String queue name, or an Array whose keys are the queue names to subscribe to.
- $callback - function (Mixed $data), where $data is the data sent by send($queue, $data).
unsubscribe(mixed $queue)
Unsubscribe from a queue or queues
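One way subscribe and unsubscribe might be combined is to consume a queue for a limited time only. This is a sketch under assumptions: the helper name subscribeTemporarily is hypothetical, and it relies on Workerman's Timer::add with its fourth argument set to false for a one-shot timer.

```php
<?php
use Workerman\Timer;
use Workerman\RedisQueue\Client;

/**
 * Hypothetical helper: subscribes to $queue, then unsubscribes
 * after $seconds. Must run inside a Workerman worker process.
 */
function subscribeTemporarily(Client $client, string $queue, float $seconds): void
{
    $client->subscribe($queue, function ($data) use ($queue) {
        echo "received from {$queue}: ", var_export($data, true), "\n";
    });

    // One-shot timer: the last argument (persistent) is false.
    Timer::add($seconds, function () use ($client, $queue) {
        $client->unsubscribe($queue);
    }, [], false);
}
```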
onConsumeFailure(callable $callback)
Triggered when consumption of a message fails.
- $callback - function (\Throwable $exception, array $package), where $package contains information such as data, queue, and attempts.
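A failure callback could turn the package into a log line, as sketched below. The helper names describeFailure and registerFailureLogger are hypothetical, and the array keys queue and attempts are assumed from the description of $package above, so each is read with a fallback.

```php
<?php
use Workerman\RedisQueue\Client;

/**
 * Hypothetical helper: formats a failure for logging.
 * The keys 'queue' and 'attempts' are assumed, hence the fallbacks.
 */
function describeFailure(\Throwable $exception, array $package): string
{
    $queue    = $package['queue'] ?? 'unknown';
    $attempts = $package['attempts'] ?? 0;

    return sprintf(
        'queue=%s attempts=%d error=%s',
        $queue,
        $attempts,
        $exception->getMessage()
    );
}

/**
 * Hypothetical helper: registers the logger on a client.
 */
function registerFailureLogger(Client $client): void
{
    $client->onConsumeFailure(function (\Throwable $exception, $package) {
        error_log(describeFailure($exception, (array) $package));
    });
}
```

Separating the formatting from the registration keeps the formatting logic usable without a running Redis server.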
