Skip to content

Commit 7dd43de

Browse files
authored
Merge pull request #9 from php-api-clients/feature-streaming-body
Streaming body
2 parents c88b34c + 71e0cfa commit 7dd43de

File tree

3 files changed

+52
-8
lines changed

3 files changed

+52
-8
lines changed

src/Transport/Client.php

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
use Psr\Http\Message\UriInterface;
1717
use React\Cache\CacheInterface;
1818
use React\EventLoop\LoopInterface;
19+
use React\EventLoop\Timer\TimerInterface;
1920
use React\Promise\PromiseInterface;
2021
use function React\Promise\reject;
2122
use React\Promise\RejectedPromise;
@@ -209,12 +210,9 @@ public function request(RequestInterface $request, array $options = [], bool $re
209210
));
210211
})->then(function (ResponseInterface $response) use ($request, $options, $refresh) {
211212
if (isset($options[RequestOptions::STREAM]) && $options[RequestOptions::STREAM] === true) {
212-
return resolve(
213-
new Response(
214-
'',
215-
$response
216-
)
217-
);
213+
$responseWrapper = new Response('', $response);
214+
$this->streamBody($responseWrapper);
215+
return resolve($responseWrapper);
218216
}
219217

220218
$contents = $response->getBody()->getContents();
@@ -245,6 +243,25 @@ public function request(RequestInterface $request, array $options = [], bool $re
245243
});
246244
}
247245

246+
protected function streamBody(Response $response)
247+
{
248+
$stream = $response->getResponse()->getBody();
249+
$this->loop->addPeriodicTimer(0.001, function (TimerInterface $timer) use ($stream, $response) {
250+
if ($stream->eof()) {
251+
$timer->cancel();
252+
$response->emit('end');
253+
return;
254+
}
255+
256+
$size = $stream->getSize();
257+
if ($size === 0) {
258+
return;
259+
}
260+
261+
$response->emit('data', [$stream->read($size)]);
262+
});
263+
}
264+
248265
protected function applyApiSettingsToRequest(RequestInterface $request): RequestInterface
249266
{
250267
$uri = $request->getUri();

src/Transport/Response.php

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,14 @@
22

33
namespace ApiClients\Foundation\Transport;
44

5+
use Evenement\EventEmitterInterface;
6+
use Evenement\EventEmitterTrait;
57
use Psr\Http\Message\ResponseInterface;
68

7-
final class Response
9+
final class Response implements EventEmitterInterface
810
{
11+
use EventEmitterTrait;
12+
913
/**
1014
* @var string
1115
*/

tests/Transport/ClientTest.php

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,22 @@ public function testRequestStreaming()
241241
$cache = Phake::mock(CacheInterface::class);
242242

243243
$stream = Phake::mock(StreamInterface::class);
244-
Phake::when($stream)->getContents()->thenReturn('{"foo":"bar"}');
244+
Phake::when($stream)->eof()
245+
->thenReturn(false)
246+
->thenReturn(false)
247+
->thenReturn(false)
248+
->thenReturn(true)
249+
;
250+
Phake::when($stream)->getSize()
251+
->thenReturn(1)
252+
->thenReturn(1)
253+
->thenReturn(1)
254+
;
255+
Phake::when($stream)->read(1)
256+
->thenReturn('a')
257+
->thenReturn('b')
258+
->thenReturn('c')
259+
;
245260

246261
$response = Phake::mock(Response::class);
247262
Phake::when($response)->getBody()->thenReturn($stream);
@@ -276,6 +291,14 @@ public function testRequestStreaming()
276291
$this->assertInstanceOf(TransportResponse::class, $result);
277292
$this->assertSame('', $result->getBody());
278293

294+
$buffer = '';
295+
$result->on('data', function ($data) use (&$buffer) {
296+
$buffer .= $data;
297+
});
298+
$loop->run();
299+
300+
$this->assertSame('abc', $buffer);
301+
279302
Phake::verifyNoInteraction($cache);
280303
}
281304

0 commit comments

Comments
 (0)