arquivei/php-kafka-consumer

A consumer of Kafka in PHP

Maintainers

arquivei

Package info

github.com/arquivei/php-kafka-consumer

Type: project

pkg:composer/arquivei/php-kafka-consumer

Statistics

Installs: 79 222

Dependents: 1

Suggesters: 0

Stars: 5

Open Issues: 11

2.6.4 2026-05-20 14:25 UTC

Requires

  • php: ~7.2 || ~7.3 || ~7.4 || ^8.0
  • ext-rdkafka: ~3.0 || ~3.1 || ~4.0 || ~5.0 || ~6.0
  • illuminate/console: ~6 || ~7 || ~8 || ~9 || ~10 || ~11
  • monolog/monolog: ~1 || ~2 || ~3

Requires (Dev)

Suggests

None

Provides

None

Conflicts

None

Replaces

None

MIT 5654dc448c2b1e237f27976aec7e4c29152c1ad5

phpkafkaconsumer


README

An Apache Kafka consumer in PHP. Subscribe to topics and define callbacks to handle the messages.

Requirements

To use this library, you need the php-rdkafka PECL extension. Note that the extension itself requires the librdkafka C library.

Minimum requirements:

Dependency    Version
librdkafka    v1.5.3
PHP           7.4+
ext-rdkafka   3.0+
Laravel       6+
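
The PHP and ext-rdkafka minimums above can be checked from a script before installing. The following is a minimal sketch, not part of the library: the function name `checkRequirements` is hypothetical, and the librdkafka version is not checked here.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper (not provided by the library): reports whether the
 * running PHP and the rdkafka extension meet the minimums listed above.
 *
 * @return array<string, bool>
 */
function checkRequirements(): array
{
    // phpversion() returns false when the named extension is not loaded.
    $extVersion = phpversion('rdkafka');

    return [
        'php' => version_compare(PHP_VERSION, '7.4.0', '>='),
        'ext-rdkafka' => $extVersion !== false
            && version_compare($extVersion, '3.0.0', '>='),
    ];
}

$requirements = checkRequirements();

foreach ($requirements as $name => $ok) {
    echo sprintf('%-12s %s', $name, $ok ? 'ok' : 'missing or too old') . PHP_EOL;
}
```

Running `php --ri rdkafka` on the command line prints the extension's own information, which is another way to confirm that it is loaded.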

Install

Using composer:

composer require arquivei/php-kafka-consumer

Usage

<?php

require_once 'vendor/autoload.php';

use Kafka\Consumer\ConsumerBuilder;
use Kafka\Consumer\Entities\Config\Sasl;

class DefaultConsumer
{
    public function __invoke(string $message): void
    {
        print 'Init: ' . date('Y-m-d H:i:s') . PHP_EOL;
        sleep(2);
        print 'Finish: ' . date('Y-m-d H:i:s') . PHP_EOL;
    }
}

$consumer = ConsumerBuilder::create('broker:port', 'php-kafka-consumer-group-id', ['topic'])
    ->withSasl(new Sasl('username', 'password', 'mechanisms'))
    ->withCommitBatchSize(1)
    ->withSecurityProtocol('security-protocol')
    ->withHandler(new DefaultConsumer()) // or any callable
    ->build();

$consumer->consume();
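
Since the handler can be any callable that takes the message as a string, it can be written and exercised without a broker. The sketch below assumes JSON-encoded messages. The class name `JsonMessageHandler`, the `$handled` property, and the `id` field are hypothetical, and how the library reacts to an exception thrown by a handler is not covered here.

```php
<?php

declare(strict_types=1);

// Hypothetical handler: the names used here are illustrative only.
final class JsonMessageHandler
{
    /** @var array<int, array> Payloads handled so far. */
    public array $handled = [];

    public function __invoke(string $message): void
    {
        // JSON_THROW_ON_ERROR turns malformed JSON into a JsonException.
        $payload = json_decode($message, true, 512, JSON_THROW_ON_ERROR);

        if (!is_array($payload)) {
            throw new InvalidArgumentException('Expected a JSON object or array.');
        }

        $this->handled[] = $payload;
    }
}

// Call the handler directly, the same way any callable would be invoked.
$handler = new JsonMessageHandler();
$handler('{"id": 42}');
```

An instance of such a class could then be passed to `withHandler()` in place of `DefaultConsumer` in the example above.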

Or by using the legacy API:

<?php

require_once 'vendor/autoload.php';

use Kafka\Consumer\Contracts\Consumer;
use Kafka\Consumer\Entities\Config;
use Kafka\Consumer\Entities\Config\Sasl;

class DefaultConsumer extends Consumer
{
    public function handle(string $message): void
    {
        print 'Init: ' . date('Y-m-d H:i:s') . PHP_EOL;
        sleep(2);
        print 'Finish: ' . date('Y-m-d H:i:s') . PHP_EOL;
    }
}

$config = new Config(
    new Sasl(
        'username',
        'password',
        'mechanisms'
    ),
    ['topic'],
    'broker:port',
    1,
    'php-kafka-consumer-group-id',
    new DefaultConsumer(),
    'PLAINTEXT',
    'topic-dlq',
    1,
    6
);

$consumer = new \Kafka\Consumer\Consumer($config);
$consumer->consume();

Usage with Laravel

Add a php-kafka-config.php file to your config path:

<?php

return [
    'topic' => 'topic',
    'broker' => 'broker',
    'groupId' => 'group-id',
    'securityProtocol' => 'security-protocol',
    'sasl' => [
        'mechanisms' => 'mechanisms',
        'username' => 'username',
        'password' => 'password',
    ],
];

Use the command to execute the consumer:

$ php artisan arquivei:php-kafka-consumer --consumer="App\Consumers\YourConsumer" --commit=1

Middlewares

Middlewares are simple callables that receive two arguments: the message being handled and the next handler. Possible use cases include message transformation, filtering, logging, and transaction handling.

<?php

use Kafka\Consumer\ConsumerBuilder;

$consumer = ConsumerBuilder::create('broker:port', 'php-kafka-consumer-group-id', ['topic'])
    ->withHandler(function ($message) {/** ... */})
    // You may add any number of middlewares; they are executed in the order provided
    ->withMiddleware(function (string $rawMessage, callable $next): void {
        $decoded = json_decode($rawMessage, true);
        $next($decoded);
    })
    ->withMiddleware(function (array $message, callable $next): void {
        if (! isset($message['foo'])) {
            return;
        }
        $next($message);
    })
    ->build();

$consumer->consume();
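
To see why the order matters, it can help to look at how a chain of such callables might be composed. The following is a standalone sketch of the general technique, not the library's actual implementation. The function `composeMiddlewares` and the `$received` variable are hypothetical names introduced for illustration.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper, not part of the library: wraps $handler with
 * $middlewares so that the first middleware in the list runs first.
 */
function composeMiddlewares(callable $handler, array $middlewares): callable
{
    // Wrap from last to first so the first middleware ends up outermost.
    foreach (array_reverse($middlewares) as $middleware) {
        $next = $handler;
        $handler = function ($message) use ($middleware, $next) {
            return $middleware($message, $next);
        };
    }

    return $handler;
}

$received = [];

$pipeline = composeMiddlewares(
    // Final handler: records every message that reaches it.
    function (array $message) use (&$received): void {
        $received[] = $message;
    },
    [
        // Runs first: decodes the raw JSON string.
        function (string $rawMessage, callable $next): void {
            $next(json_decode($rawMessage, true));
        },
        // Runs second: drops messages without a "foo" key.
        function (array $message, callable $next): void {
            if (!isset($message['foo'])) {
                return;
            }
            $next($message);
        },
    ]
);

$pipeline('{"foo": "bar"}'); // Reaches the handler.
$pipeline('{"baz": 1}');     // Filtered out by the second middleware.
```

Because each middleware decides whether to call `$next`, a middleware that returns early stops the message from reaching both the later middlewares and the handler.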

Build and test

If you want to contribute, there are a few utilities that will help.

First create a container:

docker compose up -d --build

If you have make, you can use the predefined commands in the Makefile:

make build

Then install the dependencies:

docker compose exec php-fpm composer install

or with make:

make composer install

You can run tests locally:

docker compose exec php-fpm ./vendor/phpunit/phpunit/phpunit tests

or with make:

make test

and check for coverage:

docker compose exec php-fpm phpdbg -qrr ./vendor/bin/phpunit --whitelist src/ --coverage-html coverage/

or with make:

make coverage