swoole/etcd-client

Grpc PHP Client based on Swoole Http2 Coroutine

Package info

github.com/swoole/grpc

pkg:composer/swoole/etcd-client

v2.0.1 2022-09-26 15:34 UTC

Apache-2.0 15c129ec3b4758ea58f1169d1098cd9d186ddd3b

  • twosee <twose.woop@qq.com>

Keywords: php, http, client, coroutine, etcd, swoole, http2, gRPC

This package is auto-updated.

Last update: 2026-06-26 23:41:01 UTC


README

Introduction

A Grpc coroutine client powered by Swoole, built on Swoole's high-performance coroutine Http2 client.

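  • Synchronous code needs almost no changes
    • Automatic coroutine scheduling delivers asynchronous performance
    • Ships a "Plus" version of the Grpc code generator for zero-cost migration
  • Message production and consumption built on Channel
    • A single client connection can hold more than ten thousand in-flight requests and responses at once
    • Supports concurrency across coroutines; clients of different types can share one connection
  • Direct support for Etcd
    • Full-duplex communication over Http2 plus compact Protobuf encoding, avoiding the low performance of synchronous blocking calls and Json serialization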
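
To make the Http2 plus Protobuf point concrete, here is a minimal sketch of the length-prefixed framing that the gRPC wire protocol puts around each serialized message before it travels in an HTTP/2 DATA frame. It is plain PHP and is not taken from this library: frameMessage and unframeMessage are hypothetical helper names, and the payload bytes are a hand-written Protobuf encoding of a message whose field 1 is the string Swoole.

```php
<?php
declare(strict_types=1);

/**
 * Wrap an already-serialized message in gRPC's length-prefixed framing:
 * 1 byte compression flag + 4 byte big-endian length + payload.
 * (Hypothetical helper for illustration only.)
 */
function frameMessage(string $payload, bool $compressed = false): string
{
    return pack('CN', $compressed ? 1 : 0, strlen($payload)) . $payload;
}

/**
 * Split one framed message back into [compressed flag, payload].
 * Returns null when the buffer does not yet hold a complete message.
 * (Hypothetical helper for illustration only.)
 */
function unframeMessage(string $buffer): ?array
{
    if (strlen($buffer) < 5) {
        return null; // header not complete yet
    }
    $header = unpack('Cflag/Nlength', substr($buffer, 0, 5));
    if (strlen($buffer) < 5 + $header['length']) {
        return null; // payload not complete yet
    }
    return [$header['flag'] === 1, substr($buffer, 5, $header['length'])];
}

// Field 1, wire type 2 (length-delimited), 6 bytes: "Swoole".
$serialized = "\x0a\x06Swoole";
$framed = frameMessage($serialized);
[$compressed, $payload] = unframeMessage($framed);

echo bin2hex(substr($framed, 0, 5)), "\n"; // the 5-byte prefix in hex
```

The prefix costs five bytes per message regardless of payload size, which is part of why this encoding is more compact than a Json body carrying the same fields.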

Requirement

  • PHP 7 or later
  • Swoole: v4.4.0 or later; StreamingCall support requires v4.5.3 or later
  • Protobuf
  • grpc_php_plugin
  • Do not enable the grpc PHP extension; the grpc PHP library is not needed either

Usage

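The repository already ships the generated code for Etcd. To generate code from your own proto files, use the generator tool in the tools directory. It is used exactly like the protoc command, with one enhancement: it accepts a directory as an argument and automatically finds the proto files in that directory. For example, the Grpc code-generation script already provided in that directory: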

# it's generate_grpc.sh
./generator \
--proto_path=./../src/Grpc/Proto \
--php_out=./../src/Grpc \
--grpc_out=./../src/Grpc \
--plugin=protoc-gen-grpc=$1 \
./../src/Grpc/Proto

Just put your proto files under Grpc/Proto and run ./generate_grpc.sh ../../grpc/bins/opt/grpc_php_plugin (the argument is the location of your grpc PHP plugin, usually in the grpc/bins/opt directory) to generate the code.
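
Passing a directory instead of individual proto files can be pictured roughly as follows. This is only a sketch of the idea in plain PHP, not the generator's actual implementation: findProtoFiles is a hypothetical helper, and searching subdirectories recursively is an assumption.

```php
<?php
declare(strict_types=1);

/**
 * Recursively collect every *.proto file under $dir, sorted for stable output.
 * (Hypothetical helper; the real generator may behave differently.)
 */
function findProtoFiles(string $dir): array
{
    $files = [];
    $iterator = new RecursiveIteratorIterator(
        new RecursiveDirectoryIterator($dir, FilesystemIterator::SKIP_DOTS)
    );
    foreach ($iterator as $file) {
        /** @var SplFileInfo $file */
        if ($file->isFile() && $file->getExtension() === 'proto') {
            $files[] = $file->getPathname();
        }
    }
    sort($files);
    return $files;
}

// Demo on a throwaway directory so the sketch runs anywhere.
$root = sys_get_temp_dir() . '/proto_demo_' . uniqid();
mkdir($root . '/etcd', 0777, true);
touch($root . '/helloworld.proto');
touch($root . '/etcd/rpc.proto');
touch($root . '/README.md'); // ignored: not a proto file

$found = findProtoFiles($root);
foreach ($found as $path) {
    echo substr($path, strlen($root) + 1), "\n";
}
```

The resulting list is what would then be handed to protoc as its positional file arguments.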


Examples

All of the following examples can be found in the examples directory and run directly.

Grpc

HelloWorld

The classic official Grpc example, with more concise code.

$greeterClient = new GreeterClient('127.0.0.1:50051');
$request = new HelloRequest();
$request->setName('Swoole');
list($reply, $status) = $greeterClient->SayHello($request);
$message = $reply->getMessage();
echo "{$message}\n"; // Output: Hello Swoole
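
The example above ignores $status. A small helper like the one below can make failures easier to read. It is a sketch that assumes a non-zero $status carries one of the standard gRPC status codes; describeStatus is a hypothetical name, not part of this library, and any other value (for example a transport-level error code) falls through to a generic label.

```php
<?php
declare(strict_types=1);

// Canonical status codes from the gRPC specification.
const GRPC_STATUS_NAMES = [
    0  => 'OK',
    1  => 'CANCELLED',
    2  => 'UNKNOWN',
    3  => 'INVALID_ARGUMENT',
    4  => 'DEADLINE_EXCEEDED',
    5  => 'NOT_FOUND',
    6  => 'ALREADY_EXISTS',
    7  => 'PERMISSION_DENIED',
    8  => 'RESOURCE_EXHAUSTED',
    9  => 'FAILED_PRECONDITION',
    10 => 'ABORTED',
    11 => 'OUT_OF_RANGE',
    12 => 'UNIMPLEMENTED',
    13 => 'INTERNAL',
    14 => 'UNAVAILABLE',
    15 => 'DATA_LOSS',
    16 => 'UNAUTHENTICATED',
];

/**
 * Turn a numeric status into a readable label.
 * (Hypothetical helper; values outside 0..16 are reported as-is.)
 */
function describeStatus(int $status): string
{
    return GRPC_STATUS_NAMES[$status] ?? "NON_GRPC_STATUS({$status})";
}

// Usage idea: after `list($reply, $status) = $client->SomeCall($request);`
// log describeStatus($status) whenever $status !== 0.
echo describeStatus(14), "\n";
```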

Etcd

Usage of a few basic Etcd operations.

Put

use Swoole\Coroutine;

Coroutine::create(function () {
    $kvClient = new Etcdserverpb\KVClient(GRPC_SERVER_DEFAULT_URI);
    $request = new Etcdserverpb\PutRequest();
    $request->setPrevKv(true);
    $request->setKey('Hello');
    $request->setValue('Swoole');
    [$reply, $status] = $kvClient->Put($request);
    if ($status === 0) {
        echo "{$reply->getPrevKv()->getKey()}\n";
        echo "{$reply->getPrevKv()->getValue()}\n";
    } else {
        echo "Error#{$status}: {$reply}\n";
    }
    $kvClient->close();
});

Watch

One coroutine handles the Watch, while two more coroutines periodically write and delete the key so that the effect can be observed.

use Etcdserverpb\WatchCreateRequest;
use Etcdserverpb\WatchRequest;
use Mvccpb\Event\EventType;
use Swoole\Coroutine;

// The Watcher
Coroutine::create(function () {
    $watchClient = new Etcdserverpb\WatchClient(GRPC_SERVER_DEFAULT_URI);

    $watchCall = $watchClient->Watch();
    $request = new WatchRequest();
    $createRequest = new WatchCreateRequest();
    $createRequest->setKey('Hello');
    $request->setCreateRequest($createRequest);

    _retry:
    $watchCall->push($request);
    /**@var $reply Etcdserverpb\WatchResponse */
    while (true) {
        [$reply, $status] = $watchCall->recv();
        if ($status === 0) { // success
            if ($reply->getCreated() || $reply->getCanceled()) {
                continue;
            }
            foreach ($reply->getEvents() as $event) {
                /**@var $event Mvccpb\Event */
                $type = $event->getType();
                $kv = $event->getKv();
                // Compare against the event enum (PUT = 0, DELETE = 1), not the
                // watch FilterType enum; assumes the generated Mvccpb\Event\EventType class.
                if (EventType::PUT === $type) {
                    echo "Put key {$kv->getKey()} => {$kv->getValue()}\n";
                    break;
                } elseif (EventType::DELETE === $type) {
                    echo "Delete key {$kv->getKey()}\n";
                    break;
                }
            }
        } else { // failed
            static $retry_time = 0;
            if ($watchClient->isConnected()) {
                $retry_time++;
                echo "Retry#{$retry_time}\n";
                goto _retry;
            } else {
                echo "Error#{$status}: {$reply}\n";
                break;
            }
        }
    }
    $watchClient->close();
});

// The Writer Put and Delete
Coroutine::create(function () {
    $kvClient = new Etcdserverpb\KVClient(GRPC_SERVER_DEFAULT_URI);
    Coroutine::create(function () use ($kvClient) {
        $request = new Etcdserverpb\PutRequest();
        $request->setKey('Hello');
        $request->setPrevKv(true);
        while (true) {
            static $count = 0;
            Coroutine::sleep(.5);
            $request->setValue('Swoole#' . (++$count));
            [$reply, $status] = $kvClient->Put($request);
            if ($status !== 0) {
                echo "Error#{$status}: {$reply}\n";
                break;
            }
        }
        $kvClient->close();
    });
    Coroutine::create(function () use ($kvClient) {
        $request = new Etcdserverpb\DeleteRangeRequest();
        $request->setKey('Hello');
        $request->setPrevKv(true);
        while (true) {
            Coroutine::sleep(1);
            [$reply, $status] = $kvClient->DeleteRange($request);
            if ($status !== 0) {
                echo "Error#{$status}: {$reply}\n";
                break;
            }
        }
        $kvClient->close();
    });
});

Auth and Share Client

Adds, lists, and deletes users, and shows how EtcdClients of different types can use the same connection created by a single Grpc\Client.

use Swoole\Coroutine;

Coroutine::create(function () {
    $grpcClient = new Grpc\Client(GRPC_SERVER_DEFAULT_URI);
    // use in different type clients

    Coroutine::create(function () use ($grpcClient) {
        $kvClient = new Etcdserverpb\KVClient(GRPC_SERVER_DEFAULT_URI, ['use' => $grpcClient]);
        $request = new Etcdserverpb\PutRequest();
        $request->setPrevKv(true);
        $request->setKey('Hello');
        $request->setValue('Swoole');
        [$reply, $status] = $kvClient->Put($request);
        if ($status === 0) {
            echo "\n=== PUT KV OK ===\n";
        } else {
            echo "Error#{$status}: {$reply}\n";
        }
    });

    Coroutine::create(function () use ($grpcClient) {
        $authClient = new Etcdserverpb\AuthClient(GRPC_SERVER_DEFAULT_URI, ['use' => $grpcClient]);

        $userRequest = new Etcdserverpb\AuthUserAddRequest();
        $userNames = ['rango', 'twosee', 'gxh', 'stone', 'sjl'];
        foreach ($userNames as $username) {
            $userRequest->setName($username);
            [$reply, $status] = $authClient->UserAdd($userRequest);
            if ($status !== 0) {
                goto _error;
            }
        }

        $useListRequest = new Etcdserverpb\AuthUserListRequest();
        [$reply, $status] = $authClient->UserList($useListRequest);
        if ($status !== 0) {
            goto _error;
        }
        echo "\n=== SHOW USER LIST ===\n";
        foreach ($reply->getUsers() as $user) {
            /**@var \Authpb\User */
            echo "* {$user}\n";
        }
        echo "=== SHOW USER LIST OK ===\n";

        $userRequest = new Etcdserverpb\AuthUserDeleteRequest();
        foreach ($userNames as $username) {
            $userRequest->setName($username);
            [$reply, $status] = $authClient->UserDelete($userRequest);
            if ($status !== 0) {
                goto _error;
            }
        }

        // Only reachable through `goto _error`; skipped on the success path.
        if (false) {
            _error:
            echo "Error#{$status}: {$reply}\n";
        }

        echo "\n=== SHOW ALL CLIENT STATS ===\n";
        var_dump(grpc_client_num_stats());
        $grpcClient->close();
    });

});