Skip to content

Commit f16e413

Browse files
committed
add backhaul thread
1 parent 3d97430 commit f16e413

File tree

1 file changed

+30
-2
lines changed
  • src/py/reactpy/reactpy/backend

1 file changed

+30
-2
lines changed

src/py/reactpy/reactpy/backend/asgi.py

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import urllib.parse
77
from collections.abc import Coroutine, Sequence
88
from pathlib import Path
9+
from threading import Thread
910

1011
import aiofiles
1112
import orjson
@@ -26,6 +27,16 @@
2627

2728
DEFAULT_BLOCK_SIZE = 8192
2829
_logger = logging.getLogger(__name__)
30+
_backhaul_loop = asyncio.new_event_loop()
31+
32+
33+
def start_backhaul_loop():
34+
"""Starts the asyncio event loop that will perform component rendering tasks."""
35+
asyncio.set_event_loop(_backhaul_loop)
36+
_backhaul_loop.run_forever()
37+
38+
39+
_backhaul_thread = Thread(target=start_backhaul_loop, daemon=True)
2940

3041

3142
class ReactPy:
@@ -38,6 +49,7 @@ def __init__(
3849
static_path: str | None = "^reactpy/static/([^/]+)/?",
3950
static_dir: Path | str | None = None,
4051
head: Sequence[VdomDict] | VdomDict | str = "",
52+
backhaul_thread: bool = True,
4153
) -> None:
4254
self.component = (
4355
app_or_component
@@ -65,6 +77,10 @@ def __init__(
6577
self.head = vdom_head_elements_to_html(head)
6678
self._cached_index_html = ""
6779
self.connected = False
80+
self.backhaul_thread = backhaul_thread
81+
self.dispatcher_future = None
82+
if self.backhaul_thread and not _backhaul_thread.is_alive():
83+
_backhaul_thread.start()
6884

6985
async def __call__(self, scope, receive, send) -> None:
7086
"""The ASGI callable. This determines whether ReactPy should route the the
@@ -120,13 +136,25 @@ async def component_dispatch_app(self, scope, receive, send) -> None:
120136
if event["type"] == "websocket.connect" and not self.connected:
121137
self.connected = True
122138
await send({"type": "websocket.accept"})
123-
await self.run_dispatcher(scope, receive, send)
139+
run_dispatcher = self.run_dispatcher(scope, receive, send)
140+
if self.backhaul_thread:
141+
self.dispatcher_future = asyncio.run_coroutine_threadsafe(
142+
run_dispatcher, _backhaul_loop
143+
)
144+
else:
145+
await run_dispatcher
124146

125147
if event["type"] == "websocket.disconnect":
148+
if self.dispatcher_future:
149+
self.dispatcher_future.cancel()
126150
break
127151

128152
if event["type"] == "websocket.receive":
129-
await self.recv_queue.put(orjson.loads(event["text"]))
153+
recv_queue_put = self.recv_queue.put(orjson.loads(event["text"]))
154+
if self.backhaul_thread:
155+
asyncio.run_coroutine_threadsafe(recv_queue_put, _backhaul_loop)
156+
else:
157+
await recv_queue_put
130158

131159
async def js_modules_app(self, scope, receive, send) -> None:
132160
"""ASGI app for ReactPy web modules."""

0 commit comments

Comments
 (0)