66
77use parallel \Channel ;
88use parallel \Events ;
9+ use parallel \Events \Event ;
910use parallel \Future ;
1011use React \EventLoop \Loop ;
1112use React \EventLoop \TimerInterface ;
@@ -35,21 +36,30 @@ final class EventLoopBridge
3536
3637 private Metrics |null $ metrics = null ;
3738
38- /** @var Events<Events\Event> */
39+ /**
40+ * @template T
41+ * @var Events<Events\Event<T>>
42+ */
3943 private Events $ events ;
4044
4145 private TimerInterface |null $ timer = null ;
4246
43- /** @var array<int, Subject> */
47+ /**
48+ * @template T
49+ * @var array<int, Subject<T>>
50+ */
4451 private array $ channels = [];
4552
46- /** @var array<int, Deferred> */
53+ /**
54+ * @template T
55+ * @var array<int, Deferred<T>>
56+ */
4757 private array $ futures = [];
4858
4959 /** @var array<float> */
5060 private array $ scaleRange = self ::DEFAULT_SCALE_RANGE ;
5161 private int $ scalePosition = self ::DEFAULT_SCALE_POSITION ;
52- private int $ scaleNoItemsCount = 0 ;
62+ private int $ scaleNoItemsCount = ZERO ;
5363
5464 public function __construct ()
5565 {
@@ -65,9 +75,16 @@ public function withMetrics(Metrics $metrics): self
6575 return $ self ;
6676 }
6777
68- /** @return iterable<mixed> */
78+ /**
79+ * @param Channel<T> $channel
80+ *
81+ * @return iterable<T>
82+ *
83+ * @template T
84+ */
6985 public function observe (Channel $ channel ): iterable
7086 {
87+ /** @var Subject<T> $subject */
7188 $ subject = new Subject ();
7289 $ this ->channels [spl_object_id ($ channel )] = $ subject ;
7390 $ this ->events ->addChannel ($ channel );
@@ -81,8 +98,16 @@ public function observe(Channel $channel): iterable
8198 return awaitObservable ($ subject );
8299 }
83100
101+ /**
102+ * @param Future<T> $futurea
103+ *
104+ * @return T
105+ *
106+ * @template T
107+ */
84108 public function await (Future $ future ): mixed
85109 {
110+ /** @var Deferred<T> $deferred */
86111 $ deferred = new Deferred ();
87112 $ this ->futures [spl_object_id ($ future )] = $ deferred ;
88113 $ this ->events ->addFuture (spl_object_hash ($ future ), $ future );
@@ -93,7 +118,6 @@ public function await(Future $future): mixed
93118
94119 $ this ->startTimer ();
95120
96- /** @phpstan-ignore-next-line */
97121 return await ($ deferred ->promise ());
98122 }
99123
@@ -199,6 +223,11 @@ private function runTimer(): void
199223 });
200224 }
201225
226+ /**
227+ * @param Event<T> $event
228+ *
229+ * @template T
230+ */
202231 private function handleReadEvent (Events \Event $ event ): void
203232 {
204233 if ($ event ->object instanceof Future) {
@@ -212,6 +241,11 @@ private function handleReadEvent(Events\Event $event): void
212241 $ this ->handleChannelReadEvent ($ event );
213242 }
214243
244+ /**
245+ * @param Events\Event<T> $event
246+ *
247+ * @template T
248+ */
215249 private function handleFutureReadEvent (Events \Event $ event ): void
216250 {
217251 $ this ->futures [spl_object_id ($ event ->object )]->resolve ($ event ->value );
@@ -226,6 +260,11 @@ private function handleFutureReadEvent(Events\Event $event): void
226260 $ futures ->gauge (new Label ('state ' , 'resolve ' ))->incr ();
227261 }
228262
263+ /**
264+ * @param Events\Event<T> $event
265+ *
266+ * @template T
267+ */
229268 private function handleChannelReadEvent (Events \Event $ event ): void
230269 {
231270 $ this ->channels [spl_object_id ($ event ->object )]->onNext ($ event ->value );
@@ -238,6 +277,11 @@ private function handleChannelReadEvent(Events\Event $event): void
238277 $ this ->metrics ->channelMessages ()->counter (new Label ('event ' , 'read ' ))->incr ();
239278 }
240279
280+ /**
281+ * @param Events\Event<T> $event
282+ *
283+ * @template T
284+ */
241285 private function handleCloseEvent (Events \Event $ event ): void
242286 {
243287 $ this ->channels [spl_object_id ($ event ->object )]->onCompleted ();
@@ -252,6 +296,11 @@ private function handleCloseEvent(Events\Event $event): void
252296 $ channels ->gauge (new Label ('state ' , 'close ' ))->incr ();
253297 }
254298
299+ /**
300+ * @param Events\Event<T> $event
301+ *
302+ * @template T
303+ */
255304 private function handleCancelEvent (Events \Event $ event ): void
256305 {
257306 $ this ->futures [spl_object_id ($ event ->object )]->reject (new CanceledFuture ());
@@ -266,6 +315,11 @@ private function handleCancelEvent(Events\Event $event): void
266315 $ futures ->gauge (new Label ('state ' , 'cancel ' ))->incr ();
267316 }
268317
318+ /**
319+ * @param Events\Event<T> $event
320+ *
321+ * @template T
322+ */
269323 private function handleKillEvent (Events \Event $ event ): void
270324 {
271325 $ this ->futures [spl_object_id ($ event ->object )]->reject (new KilledRuntime ());
@@ -280,6 +334,11 @@ private function handleKillEvent(Events\Event $event): void
280334 $ futures ->gauge (new Label ('state ' , 'kill ' ))->incr ();
281335 }
282336
337+ /**
338+ * @param Events\Event<T> $event
339+ *
340+ * @template T
341+ */
283342 private function handleErrorEvent (Events \Event $ event ): void
284343 {
285344 if (! ($ event ->object instanceof Future)) {
0 commit comments