


在 ripple 中，你可以像往常一样使用任何组件。对于尚未支持异步的组件，如果你已经阅读过 ripple 的基础文档并对其有一定了解，可以根据 ripple 的特性对该组件进行封装，使其支持异步操作。



(1) 实现Guzzle的异步支持


<?php declare(strict_types=1);
namespace Ripple\Library\Net\HttCo\Client;

use Closure;
use GuzzleHttCo\Psr7\MultipartStream;
use InvalidArgumentException;
use Co\IO;
use Ripple\Coroutine\Promise;
use Ripple\Stream\SocketStream;
use Psr\HttCo\Message\RequestInterface;
use Psr\HttCo\Message\ResponseInterface;
use Throwable;

use function fclose;
use function fopen;
use function implode;
use function in_array;
use function Co\async;
use function Co\await;
use function Co\cancel;
use function Co\delay;
use function Co\repeat;
use function str_contains;
use function strtolower;

class HttpClient
/*** @var ConnectionPool */
private ConnectionPool $connectionPool;

/*** @var bool */
private bool $pool;

/**
 * Reads the "pool" option and, when pooling is enabled, creates the connection pool.
 *
 * Pooling is enabled only when the option is strictly one of true, 1 or 'on';
 * every other value (including the default 'off') leaves it disabled.
 *
 * @param array $config Client configuration; only the "pool" key is read here.
 */
public function __construct(private readonly array $config = [])
{
    $this->pool = in_array($this->config['pool'] ?? 'off', [true, 1, 'on'], true);

    if (!$this->pool) {
        return;
    }

    $this->connectionPool = new ConnectionPool();
}

// Builds an HTTP/1.1 request from $request, sends it over a connection obtained from
// pullConnection(), and returns a promise for the response (see the @return line below).
// Options read in the lines shown: 'timeout' and 'sink'.
// NOTE(review): several statements of this listing are missing (unclosed blocks, empty
// branches, truncated argument lists); the comments below describe only what is shown.
* @param RequestInterface $request
* @param array $option
* @return Promise<ResponseInterface>
public function request(RequestInterface $request, array $option = []): Promise
return async(function () use ($request, $option) {
// $r and $d are presumably the resolve / reject callbacks of the promise — TODO confirm.
return \Co\promise(function (Closure $r, Closure $d, Promise $promise) use ($request, $option) {
$uri = $request->getUri();

$method = $request->getMethod();
$scheme = $uri->getScheme();
$host = $uri->getHost();
// Fall back to the scheme's default port when the URI does not carry one.
$port = $uri->getPort() ?? ($scheme === 'https' ? 443 : 80);
// NOTE(review): only the URI path feeds the request line below; no query string is
// appended in the lines shown — confirm that "?query" is not lost.
$path = $uri->getPath() ?: '/';

* @var Connection $connection
$connection = await($this->pullConnection(
$scheme === 'https'

// Request line, followed by one "Name: v1, v2" line per request header.
$header = "{$method} {$path} HTTP/1.1\r\n";
foreach ($request->getHeaders() as $name => $values) {
$header .= "{$name}: " . implode(', ', $values) . "\r\n";

$bodyStream = $request->getBody();
// A body backed by php://temp is read into a string in one call.
if ($bodyStream->getMetadata('uri') === 'php://temp') {
$body = $bodyStream->getContents();
// A multipart body gets Content-Type / Content-Length headers when the request does not
// already carry them, and is then read in chunks of up to 8192 bytes from a repeat() callback.
} elseif ($bodyStream instanceof MultipartStream) {
if (!$request->getHeader('Content-Type')) {
$header .= "Content-Type: multipart/form-data; boundary={$bodyStream->getBoundary()}\r\n";
if (!$request->getHeader('Content-Length')) {
$header .= "Content-Length: {$bodyStream->getSize()}\r\n";
// presumably 0.1 is the repeat interval in seconds — TODO confirm against Co\repeat.
repeat(function (Closure $cancel) use ($connection, $bodyStream, $r, $d) {
try {
$content = $bodyStream->read(8192);
if ($content) {
} else {
// Any failure while reading the body is reported through $d as 'Invalid body stream'.
} catch (Throwable) {
$d(new InvalidArgumentException('Invalid body stream'));
}, 0.1);
// Any other kind of body stream is refused.
} else {
throw new InvalidArgumentException('Invalid body stream');

// Optional timeout: a delayed callback passes a 'Request timeout' error to $d.
// NOTE(review): the timeout is signalled with InvalidArgumentException — confirm this is intended.
if ($timeout = $option['timeout'] ?? null) {
$delay = delay(function () use ($connection, $d) {
$d(new InvalidArgumentException('Request timeout'));
}, $timeout);
$promise->finally(function () use ($delay) {

// Optional sink: the file at $sink is opened in 'wb' mode and handed to the connection as its output.
if($sink = $option['sink'] ?? null) {
$connection->setOutput($sinkFile = fopen($sink, 'wb'));
$promise->finally(function () use ($sinkFile) {

// On every readable event, read up to 1024 bytes and feed them to the response parser.
$connection->stream->onReadable(function (SocketStream $socketStream) use ($connection, $scheme, $r, $d) {
try {
$content = $socketStream->read(1024);
// tick() yields a response object only once the message is complete.
if ($response = $connection->tick($content)) {
$k = implode(', ', $response->getHeader('Connection'));
// Hand the connection back to the pool only for keep-alive responses and when pooling is enabled.
if (str_contains(strtolower($k), 'keep-alive') && $this->pool) {
$this->pushConnection($connection, $scheme === 'https');
} else {
} catch (Throwable $exception) {

/**
 * Obtains a connection to the given endpoint.
 *
 * With pooling enabled the connection is requested from the connection pool;
 * otherwise a new client socket is opened for this call: an SSL client socket
 * when $ssl is true, a plain TCP one when it is false.
 *
 * @param string $host
 * @param int    $port
 * @param bool   $ssl
 * @return Promise<Connection>
 * @throws Throwable
 */
private function pullConnection(string $host, int $port, bool $ssl): Promise
{
    return async(function () use ($host, $port, $ssl) {
        if ($this->pool) {
            return await($this->connectionPool->pullConnection($host, $port, $ssl));
        }

        if ($ssl) {
            return new Connection(await(IO::Socket()->streamSocketClientSSL("ssl://{$host}:{$port}")));
        }

        return new Connection(await(IO::Socket()->streamSocketClient("tcp://{$host}:{$port}")));
    });
}

/**
 * Hands a connection back to the connection pool; does nothing when pooling is disabled.
 *
 * @param Connection $connection
 * @param bool       $ssl Forwarded unchanged to the pool.
 * @return void
 */
private function pushConnection(Connection $connection, bool $ssl): void
{
    if (!$this->pool) {
        return;
    }

    $this->connectionPool->pushConnection($connection, $ssl);
}


\Co\async(function () {
$response = \Co\await(



<?php declare(strict_types=1);

namespace Ripple\Library\Net\HttCo\Client;

use GuzzleHttCo\Psr7\Response;
use Ripple\Stream\SocketStream;
use Ripple\Std\Stream\Exception\RuntimeException;
use Psr\HttCo\Message\ResponseInterface;

use function count;
use function explode;
use function intval;
use function strlen;
use function strpos;
use function strtok;
use function substr;
use function fwrite;

class Connection
* @param SocketStream $stream
public function __construct(public SocketStream $stream)

private int $step = 0;
private int $statusCode = 0;
private string $statusMessage = '';
private int $contentLength = 0;
private array $headers = [];
private string $content = '';
private int $bodyLength = 0;
private string $versionString = '';

private string $buffer = '';

// Feeds one chunk of received data into the response parser. Returns the response object
// once the message is complete, or null while more data is needed.
// Parser state is kept in $this->step: 0 = waiting for the end of the header block,
// 1 = counting body bytes, 2 = message complete.
// NOTE(review): parts of this listing are missing (e.g. the Response constructor arguments).
* @param string $content
* @return ResponseInterface|null
* @throws RuntimeException
public function tick(string $content): ResponseInterface|null
$this->buffer .= $content;
if ($this->step === 0) {
// NOTE(review): this condition is falsy both when the terminator is absent (false) and
// when it is found at offset 0 — confirm the offset-0 case cannot occur.
if ($headerEnd = strpos($this->buffer, "\r\n\r\n")) {
$buffer = $this->freeBuffer();

// Split the message into its head and body parts.
* 切割解析head与body部分
$this->step = 1;
$header = substr($buffer, 0, $headerEnd);
$base = strtok($header, "\r\n");

// The first line must split into at least three space-separated parts.
if (count($base = explode(' ', $base)) < 3) {
throw new RuntimeException('Request head is not match');

$this->versionString = $base[0];
$this->statusCode = intval($base[1]);
// NOTE(review): explode() above has no limit, so for a multi-word reason phrase
// (e.g. "Not Found") $base[2] holds only its first word.
$this->statusMessage = $base[2];

// Parse the header lines.
* 解析header
// strtok() continues on the header text; each "Name: value" line is stored under the
// name exactly as received, and a repeated name overwrites the earlier value.
while ($line = strtok("\r\n")) {
$lineParam = explode(': ', $line, 2);
if (count($lineParam) >= 2) {
$this->headers[$lineParam[0]] = $lineParam[1];

// NOTE(review): this lookup is case-sensitive; a differently-cased header name
// (e.g. "content-length") falls back to 0 here.
$contentLength = $this->headers['Content-Length'] ?? 0;

// if($this->statusCode === 200) {
// if($contentLength === null) {
// throw new RuntimeException('Response content length is required');
// }
// }

$this->contentLength = intval($contentLength);
// Whatever followed the header terminator in this chunk already belongs to the body.
$body = substr($buffer, $headerEnd + 4);
$this->bodyLength += strlen($body);
// The message is complete once the counted body bytes equal Content-Length exactly.
if($this->bodyLength === $this->contentLength) {
$this->step = 2;

// Later chunks (step 1) are drained from the buffer and added to the body byte count.
if($this->step === 1 && $buffer = $this->freeBuffer()) {
$this->bodyLength += strlen($buffer);
if($this->bodyLength === $this->contentLength) {
$this->step = 2;

// Step 2: build the response object and return it.
if($this->step === 2) {
$response = new Response(

return $response;
return null;

/**
 * Stores a piece of data either in the configured output resource or in memory.
 *
 * When no output resource has been set, the data is appended to $this->content;
 * otherwise it is written to the resource with fwrite().
 *
 * @param string $content
 * @return void
 */
private function output(string $content): void
{
    if (!$this->output) {
        $this->content .= $content;

        return;
    }

    fwrite($this->output, $content);
}

/*** @var mixed|null */
private mixed $output = null;

/**
 * Sets the resource that output() writes to instead of the in-memory content string.
 *
 * @param mixed $resource Value later passed to fwrite() by output(); null keeps data in memory.
 * @return void
 */
public function setOutput(mixed $resource): void
{
    $this->output = $resource;
}

/**
 * Empties the receive buffer and returns what it held.
 *
 * @return string The buffered data prior to the call.
 */
private function freeBuffer(): string
{
    // The right-hand side is evaluated first, so $drained receives the old contents.
    [$drained, $this->buffer] = [$this->buffer, ''];

    return $drained;
}

/**
 * Puts the parser fields and the output target back to their initial values.
 *
 * The receive buffer ($this->buffer) is not touched here.
 *
 * @return void
 */
private function reset(): void
{
    // Counters and parser step
    $this->step = $this->statusCode = $this->contentLength = $this->bodyLength = 0;

    // Text fields
    $this->statusMessage = $this->content = $this->versionString = '';

    // Collected headers and output target
    $this->headers = [];
    $this->output = null;
}


<?php declare(strict_types=1);

namespace Ripple\Library\Net\HttCo\Client;

use Co\IO;
use Ripple\Coroutine\Promise;
use Ripple\Stream\SocketStream;
use Ripple\Std\Stream\Exception\ConnectionException;
use Throwable;

use function array_pop;
use function Co\async;
use function Co\await;
use function Co\cancel;
use function Co\cancelForkHandler;
use function Co\registerForkHandler;

class ConnectionPool
* @var array
private array $busySSL = [];
private array $busyTCP = [];
private array $idleSSL = [];
private array $idleTCP = [];
private array $listenEventMap = [];
private int $forkEventId;

public function __construct()

public function __destruct()

// Returns a promise for an idle connection to host:port, opening a new one when no idle
// connection is registered under the key.
// NOTE(review): the closure's use (...) list and the closing braces are missing from this listing.
* @param string $host
* @param int $port
* @param bool $ssl
* @return Promise<Connection>
public function pullConnection(string $host, int $port, bool $ssl = false): Promise
return async(function () use (
) {
// NOTE(review): the lookup key uses the "tcp://" prefix for SSL connections too, while
// pushConnection() keys entries by the stream's getAddress() — confirm both produce the same string.
$key = "tcp://{$host}:{$port}";
if ($ssl) {
// No idle SSL connection: open one and register it as idle via pushConnection().
// (empty() alone already covers the unset case.)
if (!isset($this->idleSSL[$key]) || empty($this->idleSSL[$key])) {
$connection = new Connection(await(IO::Socket()->streamSocketClientSSL("ssl://{$host}:{$port}")));
$this->pushConnection($connection, $ssl);
} else {
* @var Connection $connection
// Take the most recently added idle connection.
$connection = array_pop($this->idleSSL[$key]);
return $connection;
} else {
// Same procedure for plain TCP connections.
if (!isset($this->idleTCP[$key]) || empty($this->idleTCP[$key])) {
$connection = new Connection(await(IO::Socket()->streamSocketClient("tcp://{$host}:{$port}")));
$this->pushConnection($connection, $ssl);
} else {
$connection = array_pop($this->idleTCP[$key]);
return $connection;
// Reached after a new connection was registered: call again to pick it up from the idle list.
return await($this->pullConnection($host, $port, $ssl));

// Registers a connection as idle (in idleSSL or idleTCP, depending on $ssl) and installs
// a readable-event callback on its stream, whose return value is kept in listenEventMap.
// NOTE(review): large parts of both branches are missing from this listing.
* @param Connection $connection
* @param bool $ssl
* @return void
public function pushConnection(Connection $connection, bool $ssl): void
// Idle lists are keyed by the stream's address, then by the stream id.
$key = "{$connection->stream->getAddress()}";
if ($ssl) {
if (!isset($this->idleSSL[$key])) {
$this->idleSSL[$key] = [];
$this->idleSSL[$key][$connection->stream->id] = $connection;

$this->listenEventMap[$connection->stream->id] = $connection->stream->onReadable(function (SocketStream $stream) use ($key, $connection) {
try {
// An empty read while the connection is idle is treated as the peer having closed it.
if($stream->read(1) === '') {
throw new ConnectionException('Connection closed by peer');
} catch (Throwable) {
if (isset($this->idleSSL[$key])) {
if (empty($this->idleSSL[$key])) {
} else {
if (!isset($this->idleTCP[$key])) {
$this->idleTCP[$key] = [];
$this->idleTCP[$key][$connection->stream->id] = $connection;

if (isset($this->busyTCP[$key])) {

$this->listenEventMap[$connection->stream->id] = $connection->stream->onReadable(function (SocketStream $stream) use ($key, $connection) {
if (isset($this->idleTCP[$key])) {
if (empty($this->idleTCP[$key])) {

// Registers a fork handler through Co\registerForkHandler() and remembers the returned id
// in $this->forkEventId.
// NOTE(review): the handler body is missing from this listing.
* @return void
private function registerForkHandler(): void
$this->forkEventId = registerForkHandler(function () {

// Clears the connection pool by closing all idle and busy connections.
// NOTE(review): the loop body and the calls that apply $closeConnections are missing from this listing.
* 通过关闭所有空闲和繁忙的连接来清除连接池。
* @return void
private function clearConnectionPool(): void
// Helper that walks a pool array (taken by reference): address key => list of connections.
$closeConnections = function (&$pool) {
foreach ($pool as $key => $connections) {
foreach ($connections as $connection) {

// Clear and close all SSL connections

// Clear and close all TCP connections


<?php declare(strict_types=1);

namespace Ripple\Plugins\Guzzle;

use GuzzleHttCo\Promise\Promise;
use GuzzleHttCo\Promise\PromiseInterface;
use GuzzleHttCo\Psr7\Response;
use Ripple\Library\Net\HttCo\Client\HttpClient;
use Psr\HttCo\Message\RequestInterface;
use Throwable;

use function Co\async;
use function Co\await;
use function Co\defer;
use function strval;

class PHandler
/*** @var HttpClient */
private HttpClient $httpClient;

/**
 * Creates the handler together with the HTTP client it delegates requests to.
 *
 * @param array $config Passed through unchanged to the HttpClient constructor.
 */
public function __construct(array $config = [])
{
    $this->httpClient = new HttpClient($config);
}

// Guzzle handler entry point: returns a Guzzle promise that is resolved with the result of
// HttpClient::request() for the given request and options.
// NOTE(review): the catch body, the defer() body and the closing braces are missing from this listing.
* @param RequestInterface $request
* @param array $options
* @return PromiseInterface
public function __invoke(RequestInterface $request, array $options): PromiseInterface
// $promise is captured by reference because it is still being assigned when the closure is created.
// presumably this closure is the promise's wait function — confirm against Guzzle's Promise API.
$promise = new Promise(function () use ($request, $options, &$promise) {
// loop in coroutine
async(function () use ($request, $options, $promise) {
try {
* @var Response $response
$promise->resolve(await($this->httpClient->request($request, $options)));
} catch (Throwable $throwable) {

defer(function () use ($promise) {

return $promise;


上述封装完成了流订阅、连接池、异步握手、跨进程资源回收等操作。此后，你可以像使用其他 ripple 组件一样使用 Guzzle：

// Route Guzzle's transport through the coroutine-aware handler.
// PHandler is declared in the Ripple\Plugins\Guzzle namespace in the listing above.
$handler = new \Ripple\Plugins\Guzzle\PHandler();
$client = new \GuzzleHttp\Client(['handler' => $handler]);

// Start 100 coroutines, each issuing one request.
for ($i = 0; $i < 100; $i++) {
    // $i has to be captured explicitly: a PHP closure only sees the variables named in use (...).
    \Co\async(function () use ($client, $i) {
        \Co\sleep(1); // simulate the coroutine being blocked for 1s

        $response = $client->get('https://www.baidu.com');
        echo "request {$i} status: " . $response->getStatusCode() . PHP_EOL;
    });
}



> ripple 已将上述封装内置为一个插件，你可以像使用其他 ripple 组件一样直接使用 Guzzle：

\Co\Plugin::Guzzle()->getAsync('https://www.baidu.com')->then(function ($response) {
echo $response->getStatusCode();


ripple 与 AMPHP 使用的都是 Revolt 的 EventLoop，所以你可以在 ripple 中直接使用 AMPHP 的组件。以 MySQL 为例：


composer require amphp/mysql
// Example: two coroutines run the same prepared query through one AMPHP MySQL connection pool.
// NOTE(review): "AmCo\Mysql" looks like a garbled form of the amphp/mysql namespace
// ("Amp\Mysql") — confirm before copying this listing.
// NOTE(review): closing parentheses/braces and the foreach bodies are missing from this listing.
use AmCo\Mysql\MysqlConfig;
use AmCo\Mysql\MysqlConnectionPool;
use function Co\async;
use function Co\run;

// Connection settings are parsed from a "key=value" string.
$config = MysqlConfig::fromString(
"host=localhost user=root password=aa123456 db=mysql"

$pool = new MysqlConnectionPool($config);

// First coroutine. The $r parameter is not used in the lines shown.
async(function ($r) use ($pool) {
// The :host placeholder is bound through the array passed to execute().
$statement = $pool->prepare("SELECT * FROM db WHERE Host = :host");
$result = $statement->execute(['host' => 'localhost']);
foreach ($result as $row) {

// Second coroutine, issuing the same query on the shared pool.
async(function ($r) use ($pool) {
$statement = $pool->prepare("SELECT * FROM db WHERE Host = :host");
$result = $statement->execute(['host' => 'localhost']);
foreach ($result as $row) {


(3) 更多...