roadrunner-php/centrifugo

RoadRunner: Centrifugo bridge

Package info

github.com/roadrunner-php/centrifugo

Homepage

Issues

Chat

Forum

Documentation

pkg:composer/roadrunner-php/centrifugo

Fund package maintenance!

roadrunner-server

Statistics

Installs: 1 925 547

Dependents: 6

Suggesters: 0

Stars: 14

2.2.1 2025-06-23 08:24 UTC

Requires

Requires (Dev)

Suggests

None

Provides

None

Conflicts

None

Replaces

None

MIT fb639ed0d8524ee5940c655f8c1b4a3d4d3a1b97

  • Anton Titov (wolfy-j) <wolfy-j.woop@spiralscout.com>
  • Pavel Buchnev (butschster) <pavel.buchnev.woop@spiralscout.com>
  • Aleksei Gagarin (roxblnfk) <alexey.gagarin.woop@spiralscout.com>
  • Maksim Smakouz (msmakouz) <maksim.smakouz.woop@spiralscout.com>
  • Kirill Nesmeyanov (SerafimArts) <kirill.nesmeyanov.woop@spiralscout.com>
  • RoadRunner Community

This package is auto-updated.

Last update: 2026-05-23 21:24:44 UTC


README

👁 Image

RoadRunner Centrifugo Bridge

👁 PHP Version Require
👁 Latest Stable Version
👁 phpunit
👁 psalm
👁 Codecov
👁 Total Downloads

This repository contains the codebase PHP bridge using RoadRunner centrifuge plugin.

Installation

To install application server and Jobs codebase

composer require roadrunner-php/centrifugo

You can use the convenient installer to download the latest available compatible version of RoadRunner assembly:

composer require spiral/roadrunner-cli --dev
vendor/bin/rr get

Proxy Centrifugo requests to a PHP application

It's possible to proxy some client connection events from Centrifugo to the RoadRunner application server and react to them in a custom way. For example, it's possible to authenticate connection via request from Centrifugo to application backend, refresh client sessions and answer to RPC calls sent by a client over bidirectional connection.

The list of events that can be proxied:

  • connect – called when a client connects to Centrifugo, so it's possible to authenticate user, return custom data to a client, subscribe connection to several channels, attach meta information to the connection, and so on. Works for bidirectional and unidirectional transports.
  • refresh - called when a client session is going to expire, so it's possible to prolong it or just let it expire. Can also be used just as a periodical connection liveness callback from Centrifugo to app backend. Works for bidirectional and unidirectional transports.
  • sub_refresh - called when it's time to refresh the subscription. Centrifugo itself will ask your backend about subscription validity instead of subscription refresh workflow on the client-side.
  • subscribe - called when clients try to subscribe on a channel, so it's possible to check permissions and return custom initial subscription data. Works for bidirectional transports only.
  • publish - called when a client tries to publish into a channel, so it's possible to check permissions and optionally modify publication data. Works for bidirectional transports only.
  • rpc - called when a client sends RPC, you can do whatever logic you need based on a client-provided RPC method and params. Works for bidirectional transports only.

First you need to add centrifuge section to your RoadRunner configuration. For example, such a configuration would be quite feasible to run:

rpc:
 listen: tcp://127.0.0.1:6001

server:
 command: "php app.php"
 relay: pipes

centrifuge:
 proxy_address: "tcp://0.0.0.0:10001" # Centrifugo address

and centrifugo config:

{
 "admin": true,
 "api_key": "secret",
 "admin_password": "password",
 "admin_secret": "admin_secret",
 "allowed_origins": [
 "*"
 ],
 "token_hmac_secret_key": "test",
 "publish": true,
 "proxy_publish": true,
 "proxy_subscribe": true,
 "proxy_connect": true,
 "allow_subscribe_for_client": true,
 "proxy_connect_endpoint": "grpc://127.0.0.1:10001",
 "proxy_connect_timeout": "10s",
 "proxy_publish_endpoint": "grpc://127.0.0.1:10001",
 "proxy_publish_timeout": "10s",
 "proxy_subscribe_endpoint": "grpc://127.0.0.1:10001",
 "proxy_subscribe_timeout": "10s",
 "proxy_refresh_endpoint": "grpc://127.0.0.1:10001",
 "proxy_refresh_timeout": "10s",
 "proxy_sub_refresh_endpoint": "grpc://127.0.0.1:10001",
 "proxy_sub_refresh_timeout": "1s",
 "proxy_rpc_endpoint": "grpc://127.0.0.1:10001",
 "proxy_rpc_timeout": "10s"
}

Note proxy_connect_endpoint, proxy_publish_endpoint, proxy_subscribe_endpoint, proxy_refresh_endpoint, proxy_sub_refresh_endpoint, proxy_rpc_endpoint - endpoint address of roadrunner server with activated centrifuge plugin.

To init abstract RoadRunner worker:

<?php

require __DIR__ . '/vendor/autoload.php';

use RoadRunner\Centrifugo\CentrifugoWorker;
use RoadRunner\Centrifugo\Payload;
use RoadRunner\Centrifugo\Request;
use RoadRunner\Centrifugo\Request\RequestFactory;
use Spiral\RoadRunner\Worker;

$worker = Worker::create();
$requestFactory = new RequestFactory($worker);

// Create a new Centrifugo Worker from global environment
$centrifugoWorker = new CentrifugoWorker($worker, $requestFactory);

while ($request = $centrifugoWorker->waitRequest()) {

 if ($request instanceof Request\Invalid) {
 $errorMessage = $request->getException()->getMessage();

 if ($request->getException() instanceof \RoadRunner\Centrifugo\Exception\InvalidRequestTypeException) {
 $payload = $request->getException()->payload;
 }

 // Handle invalid request
 // $logger->error($errorMessage, $payload ?? []);

 continue;
 }

 if ($request instanceof Request\Refresh) {
 try {
 // Do something
 $request->respond(new Payload\RefreshResponse(
 // ...
 ));
 } catch (\Throwable $e) {
 $request->error($e->getCode(), $e->getMessage());
 }

 continue;
 }

 if ($request instanceof Request\Subscribe) {
 try {
 // Do something
 $request->respond(new Payload\SubscribeResponse(
 // ...
 ));

 // You can also disconnect connection
 $request->disconnect('500', 'Connection is not allowed.');
 } catch (\Throwable $e) {
 $request->error($e->getCode(), $e->getMessage());
 }

 continue;
 }

 if ($request instanceof Request\Publish) {
 try {
 // Do something
 $request->respond(new Payload\PublishResponse(
 // ...
 ));

 // You can also disconnect connection
 $request->disconnect('500', 'Connection is not allowed.');
 } catch (\Throwable $e) {
 $request->error($e->getCode(), $e->getMessage());
 }

 continue;
 }

 if ($request instanceof Request\RPC) {
 try {
 $response = $router->handle(
 new Request(uri: $request->method, data: $request->data),
 ); // ['user' => ['id' => 1, 'username' => 'john_smith']]

 $request->respond(new Payload\RPCResponse(
 data: $response
 ));
 } catch (\Throwable $e) {
 $request->error($e->getCode(), $e->getMessage());
 }

 continue;
 }
}

Note You can find addition information about here.

👁 try Spiral Framework

License

The MIT License (MIT). Please see LICENSE for more information. Maintained by Spiral Scout.