Skip to content
This repository was archived by the owner on Feb 17, 2025. It is now read-only.

Commit cedcb6e

Browse files
committed
Add multi message parcel
1 parent 4aa9ca6 commit cedcb6e

File tree

3 files changed

+86
-1
lines changed

3 files changed

+86
-1
lines changed

src/Message/Parcel.php

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace ReactParallel\ObjectProxy\Message;
6+
7+
use function array_merge;
8+
9+
final class Parcel
10+
{
11+
/** @var array<Call|Existence|Notify> */
12+
private array $messages = [];
13+
14+
/** @var array<Destruct> */
15+
private array $destructions = [];
16+
17+
public function call(Call $call): void
18+
{
19+
$this->messages[] = $call;
20+
}
21+
22+
public function existence(Existence $existence): void
23+
{
24+
$this->messages[] = $existence;
25+
}
26+
27+
public function notify(Notify $notify): void
28+
{
29+
$this->messages[] = $notify;
30+
}
31+
32+
public function destruct(Destruct $destruct): void
33+
{
34+
$this->destructions[] = $destruct;
35+
}
36+
37+
/**
38+
* @return iterable<Call|Destruct|Existence|Notify>
39+
*/
40+
public function messages(): iterable
41+
{
42+
yield from array_merge($this->messages, $this->destructions);
43+
}
44+
}

src/Proxy.php

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,17 +11,21 @@
1111
use ReactParallel\ObjectProxy\Message\Destruct;
1212
use ReactParallel\ObjectProxy\Message\Existence;
1313
use ReactParallel\ObjectProxy\Message\Notify;
14+
use ReactParallel\ObjectProxy\Message\Parcel;
1415
use ReactParallel\ObjectProxy\Proxy\Instance;
16+
use Rx\Observable;
1517
use WyriHaximus\Metrics\Label;
1618
use WyriHaximus\Metrics\Registry;
1719
use WyriHaximus\Metrics\Registry\Counters;
1820

21+
use function ApiClients\Tools\Rx\observableFromArray;
1922
use function array_key_exists;
2023
use function bin2hex;
2124
use function get_class;
2225
use function is_object;
2326
use function random_bytes;
2427
use function var_export;
28+
use function WyriHaximus\iteratorOrArrayToArray;
2529

2630
final class Proxy extends ProxyList
2731
{
@@ -113,7 +117,7 @@ public function __destruct()
113117

114118
private function setUpHandlers(): void
115119
{
116-
$this->factory->streams()->channel($this->in)->subscribe(function (object $message): void {
120+
$this->factory->streams()->channel($this->in)->flatMap(static fn (object $message): Observable => observableFromArray($message instanceof Parcel ? iteratorOrArrayToArray($message->messages()) : [$message]))->subscribe(function (object $message): void {
117121
if ($message instanceof Existence) {
118122
$this->handleExistence($message);
119123

tests/Message/ParcelTest.php

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace ReactParallel\Tests\ObjectProxy\Message;
6+
7+
use parallel\Channel;
8+
use ReactParallel\ObjectProxy\Message\Call;
9+
use ReactParallel\ObjectProxy\Message\Destruct;
10+
use ReactParallel\ObjectProxy\Message\Existence;
11+
use ReactParallel\ObjectProxy\Message\Parcel;
12+
use WyriHaximus\AsyncTestUtilities\AsyncTestCase;
13+
14+
use function bin2hex;
15+
use function random_bytes;
16+
use function time;
17+
use function WyriHaximus\iteratorOrArrayToArray;
18+
19+
final class ParcelTest extends AsyncTestCase
20+
{
21+
/**
22+
* @test
23+
*/
24+
public function getters(): void
25+
{
26+
$destruct = new Destruct(bin2hex(random_bytes(1024)), bin2hex(random_bytes(1024)));
27+
$call = new Call(new Channel(1), bin2hex(random_bytes(1024)), bin2hex(random_bytes(1024)), 'hammer', [time()]);
28+
$existence = new Existence(bin2hex(random_bytes(1024)), bin2hex(random_bytes(1024)));
29+
30+
$parcel = new Parcel();
31+
$parcel->destruct($destruct);
32+
$parcel->call($call);
33+
$parcel->existence($existence);
34+
35+
self::assertSame([$call, $existence, $destruct], iteratorOrArrayToArray($parcel->messages()));
36+
}
37+
}

0 commit comments

Comments
 (0)