From b58f6039714bf9c0b35d5ef995578eedd6e8b62d Mon Sep 17 00:00:00 2001 From: Shijie Sheng Date: Thu, 14 Aug 2025 12:56:59 -0700 Subject: [PATCH 1/7] WIP --- cadence/async/event_loop.py | 40 +++++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) create mode 100644 cadence/async/event_loop.py diff --git a/cadence/async/event_loop.py b/cadence/async/event_loop.py new file mode 100644 index 0000000..577552c --- /dev/null +++ b/cadence/async/event_loop.py @@ -0,0 +1,40 @@ + + +from asyncio import AbstractEventLoop +from asyncio import Future +import asyncio + +class EventLoop(AbstractEventLoop): + __running = False + + def __init__(self): + super().__init__() + + def run_until_complete(self, future: Future): + pass + + def start(self): + self.__running = True + + def is_running(self): + return self.__running + +async def workflow(): + asyncio.set_event_loop(EventLoop()) + + task1 = asyncio.create_task(delay_print("Hello, world!", 1)) + task2 = asyncio.create_task(delay_print("Hello, world!", 1)) + await task1 + await task2 + + +async def delay_print(message, delay): + await asyncio.sleep(delay) + print(message) + + +if __name__ == "__main__": + print("Starting workflow") + loop = asyncio.new_event_loop() + loop.start() + loop.run_until_complete(workflow()) From 976be37238d901bf8602f8c7cdd153e02cf1333c Mon Sep 17 00:00:00 2001 From: Shijie Sheng Date: Tue, 19 Aug 2025 09:31:52 -0700 Subject: [PATCH 2/7] wip --- cadence/async/event_loop.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/cadence/async/event_loop.py b/cadence/async/event_loop.py index 577552c..16b56ab 100644 --- a/cadence/async/event_loop.py +++ b/cadence/async/event_loop.py @@ -1,10 +1,10 @@ -from asyncio import AbstractEventLoop +from asyncio import AbstractEventLoop, BaseEventLoop, EventLoop from asyncio import Future import asyncio -class EventLoop(AbstractEventLoop): +class EventLoop(BaseEventLoop): __running = False def __init__(self): @@ -33,6 +33,11 @@ async def delay_print(message, delay): print(message) +class DeterministicRunner(): + runner = None + + def run_util_block + if __name__ == "__main__": print("Starting workflow") loop = asyncio.new_event_loop() From bddf396885dbc0b318201740e9a75271f11b6174 Mon Sep 17 00:00:00 2001 From: Shijie Sheng Date: Wed, 20 Aug 2025 15:53:51 -0700 Subject: [PATCH 3/7] deterministic-el --- cadence/async/event_loop.py | 45 ----- cadence/workflow/deterministic_event_loop.py | 171 +++++++++++++++++++ 2 files changed, 171 insertions(+), 45 deletions(-) delete mode 100644 cadence/async/event_loop.py create mode 100644 cadence/workflow/deterministic_event_loop.py diff --git a/cadence/async/event_loop.py b/cadence/async/event_loop.py deleted file mode 100644 index 16b56ab..0000000 --- a/cadence/async/event_loop.py +++ /dev/null @@ -1,45 +0,0 @@ - - -from asyncio import AbstractEventLoop, BaseEventLoop, EventLoop -from asyncio import Future -import asyncio - -class EventLoop(BaseEventLoop): - __running = False - - def __init__(self): - super().__init__() - - def run_until_complete(self, future: Future): - pass - - def start(self): - self.__running = True - - def is_running(self): - return self.__running - -async def workflow(): - asyncio.set_event_loop(EventLoop()) - - task1 = asyncio.create_task(delay_print("Hello, world!", 1)) - task2 = asyncio.create_task(delay_print("Hello, world!", 1)) - await task1 - await task2 - - -async def delay_print(message, delay): - await asyncio.sleep(delay) - print(message) - - -class DeterministicRunner(): - runner = None - - def run_util_block - -if __name__ == "__main__": - print("Starting workflow") - loop = asyncio.new_event_loop() - loop.start() - loop.run_until_complete(workflow()) diff --git a/cadence/workflow/deterministic_event_loop.py b/cadence/workflow/deterministic_event_loop.py new file mode 100644 index 0000000..53ef90c --- /dev/null +++ b/cadence/workflow/deterministic_event_loop.py @@ -0,0 +1,171 @@ +from asyncio import AbstractEventLoop, BaseEventLoop, EventLoop, futures, tasks +from asyncio import Future +import logging +import collections +import asyncio.events as events +import threading + +logger = logging.getLogger(__name__) + + +class DeterministicEventLoop(AbstractEventLoop): + """ + This is a basic FIFO implementation of event loop that does not allow I/O or timer operations. + As a result, it's theoretically deterministic. This event loop is not useful directly without async events processing inside the loop. + + Code is mostly copied from asyncio.BaseEventLoop without I/O or timer operations. + """ + + def __init__(self): + self._thread_id: int = None # indicate if the event loop is running + self._debug = False + self._ready = collections.deque[events.Handle]() + self._stopping = False + self._closed = False + + def call_soon(self, callback, *args, context=None): + self._call_soon(callback, args, context) + + def _call_soon(self, callback, args, context): + handle = events.Handle(callback, args, self, context) + if handle._source_traceback: + del handle._source_traceback[-1] + self._ready.append(handle) + return handle + + def get_debug(self): + return self._debug + + + def run_forever(self): + """Run until stop() is called.""" + self._run_forever_setup() + try: + while True: + self._run_once() + if self._stopping: + break + finally: + self._run_forever_cleanup() + + + def run_until_complete(self, future: Future): + """Run until the Future is done. + + If the argument is a coroutine, it is wrapped in a Task. + + WARNING: It would be disastrous to call run_until_complete() + with the same coroutine twice -- it would wrap it in two + different Tasks and that can't be good. + + Return the Future's result, or raise its exception. + """ + self._check_closed() + self._check_running() + + new_task = not futures.isfuture(future) + future = tasks.ensure_future(future, loop=self) + if new_task: + # An exception is raised if the future didn't complete, so there + # is no need to log the "destroy pending task" message + future._log_destroy_pending = False + + future.add_done_callback(_run_until_complete_cb) + try: + self.run_forever() + except: + if new_task and future.done() and not future.cancelled(): + # The coroutine raised a BaseException. Consume the exception + # to not log a warning, the caller doesn't have access to the + # local task. + future.exception() + raise + finally: + future.remove_done_callback(_run_until_complete_cb) + if not future.done(): + raise RuntimeError('Event loop stopped before Future completed.') + + return future.result() + + def create_task(self, coro, **kwargs): + """Schedule a coroutine object. + + Return a task object. + """ + self._check_closed() + + # NOTE: eager_start is not supported for deterministic event loop + if kwargs.get("eager_start", False): + raise RuntimeError("eager_start in create_task is not supported for deterministic event loop") + + task = tasks.Task(coro, loop=self, **kwargs) + if task._source_traceback: + del task._source_traceback[-1] + try: + return task + finally: + # gh-128552: prevent a refcycle of + # task.exception().__traceback__->BaseEventLoop.create_task->task + del task + + def _run_once(self): + ntodo = len(self._ready) + for i in range(ntodo): + handle = self._ready.popleft() + if handle._cancelled: + continue + handle._run() + + def _run_forever_setup(self): + self._check_closed() + self._check_running() + self._thread_id = threading.get_ident() + events._set_running_loop(self) + + def _run_forever_cleanup(self): + self._stopping = False + self._thread_id = None + events._set_running_loop(None) + + def stop(self): + self._stopping = True + + def _check_closed(self): + if self._closed: + raise RuntimeError('Event loop is closed') + + def _check_running(self): + if self.is_running(): + raise RuntimeError('This event loop is already running') + if events._get_running_loop() is not None: + raise RuntimeError( + 'Cannot run the event loop while another loop is running') + + def is_running(self): + return (self._thread_id is not None) + + def close(self): + """Close the event loop. + The event loop must not be running. + """ + if self.is_running(): + raise RuntimeError("Cannot close a running event loop") + if self._closed: + return + if self._debug: + logger.debug("Close %r", self) + self._closed = True + self._ready.clear() + + def is_closed(self): + """Returns True if the event loop was closed.""" + return self._closed + +def _run_until_complete_cb(fut: Future): + if not fut.cancelled(): + exc = fut.exception() + if isinstance(exc, (SystemExit, KeyboardInterrupt)): + # Issue #22429: run_forever() already finished, no need to + # stop it. + return + fut.get_loop().stop() From 280c9ca1bfac95c58fb9d93ab3aa8e1a8709d262 Mon Sep 17 00:00:00 2001 From: Shijie Sheng Date: Wed, 27 Aug 2025 16:21:41 -0700 Subject: [PATCH 4/7] add unit test --- cadence/workflow/deterministic_event_loop.py | 7 +- .../workflow/test_deterministic_event_loop.py | 72 +++++++++++++++++++ 2 files changed, 76 insertions(+), 3 deletions(-) create mode 100644 tests/cadence/workflow/test_deterministic_event_loop.py diff --git a/cadence/workflow/deterministic_event_loop.py b/cadence/workflow/deterministic_event_loop.py index 53ef90c..c6d9821 100644 --- a/cadence/workflow/deterministic_event_loop.py +++ b/cadence/workflow/deterministic_event_loop.py @@ -1,4 +1,4 @@ -from asyncio import AbstractEventLoop, BaseEventLoop, EventLoop, futures, tasks +from asyncio import AbstractEventLoop, futures, tasks from asyncio import Future import logging import collections @@ -108,6 +108,9 @@ def create_task(self, coro, **kwargs): # task.exception().__traceback__->BaseEventLoop.create_task->task del task + def create_future(self): + return futures.Future(loop=self) + def _run_once(self): ntodo = len(self._ready) for i in range(ntodo): @@ -165,7 +168,5 @@ def _run_until_complete_cb(fut: Future): if not fut.cancelled(): exc = fut.exception() if isinstance(exc, (SystemExit, KeyboardInterrupt)): - # Issue #22429: run_forever() already finished, no need to - # stop it. return fut.get_loop().stop() diff --git a/tests/cadence/workflow/test_deterministic_event_loop.py b/tests/cadence/workflow/test_deterministic_event_loop.py new file mode 100644 index 0000000..51fcb68 --- /dev/null +++ b/tests/cadence/workflow/test_deterministic_event_loop.py @@ -0,0 +1,72 @@ +import pytest +import asyncio +from cadence.workflow.deterministic_event_loop import DeterministicEventLoop + + +async def coro_append(results: list, i: int): + results.append(i) + +async def coro_await(size: int): + results = [] + for i in range(size): + await coro_append(results, i) + return results + +async def coro_await_future(future: asyncio.Future): + return await future + +async def coro_await_task(size: int): + results = [] + for i in range(size): + asyncio.create_task(coro_append(results, i)) + return results + +class TestDeterministicEventLoop: + """Test suite for DeterministicEventLoop using table-driven tests.""" + + def setup_method(self): + """Setup method called before each test.""" + self.loop = DeterministicEventLoop() + + def teardown_method(self): + """Teardown method called after each test.""" + if not self.loop.is_closed(): + self.loop.close() + assert self.loop.is_closed() is True + + def test_call_soon(self): + """Test _run_once executes single callback.""" + results = [] + expected = [] + for i in range(10000): + expected.append(i) + self.loop.call_soon(lambda x=i: results.append(x)) + + self.loop._run_once() + + assert results == expected + assert self.loop.is_running() is False + + def test_run_until_complete(self): + size = 10000 + results = self.loop.run_until_complete(coro_await(size)) + assert results == list(range(size)) + assert self.loop.is_running() is False + assert self.loop.is_closed() is False + + @pytest.mark.parametrize("result, exception, expected, expected_exception", + [(10000, None, 10000, None), (None, ValueError("test"), None, ValueError)]) + def test_create_future(self, result, exception, expected, expected_exception): + future = self.loop.create_future() + if expected_exception is not None: + with pytest.raises(expected_exception): + future.set_exception(exception) + self.loop.run_until_complete(coro_await_future(future)) + else: + future.set_result(result) + assert self.loop.run_until_complete(coro_await_future(future)) == expected + + def test_create_task(self): + size = 10000 + results = self.loop.run_until_complete(coro_await_task(size)) + assert results == list(range(size)) From 266c20eba2d2adc15115952d3aa7d0d794b85e7b Mon Sep 17 00:00:00 2001 From: Shijie Sheng Date: Thu, 28 Aug 2025 14:44:53 -0700 Subject: [PATCH 5/7] wip --- cadence/workflow/deterministic_event_loop.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/cadence/workflow/deterministic_event_loop.py b/cadence/workflow/deterministic_event_loop.py index c6d9821..0cc9468 100644 --- a/cadence/workflow/deterministic_event_loop.py +++ b/cadence/workflow/deterministic_event_loop.py @@ -1,9 +1,11 @@ -from asyncio import AbstractEventLoop, futures, tasks +from asyncio import AbstractEventLoop, Handle, futures, tasks from asyncio import Future +from contextvars import Context import logging import collections import asyncio.events as events import threading +from typing import Callable logger = logging.getLogger(__name__) @@ -23,7 +25,7 @@ def __init__(self): self._stopping = False self._closed = False - def call_soon(self, callback, *args, context=None): + def call_soon(self, callback: Callable, *args, context : Context | None = None) -> Handle: self._call_soon(callback, args, context) def _call_soon(self, callback, args, context): From c41d075fe7eae8735494607f14e087af3c71f712 Mon Sep 17 00:00:00 2001 From: Shijie Sheng Date: Thu, 28 Aug 2025 15:28:37 -0700 Subject: [PATCH 6/7] fix lint and type --- cadence/workflow/deterministic_event_loop.py | 32 ++++++-------------- 1 file changed, 9 insertions(+), 23 deletions(-) diff --git a/cadence/workflow/deterministic_event_loop.py b/cadence/workflow/deterministic_event_loop.py index 0cc9468..2fce400 100644 --- a/cadence/workflow/deterministic_event_loop.py +++ b/cadence/workflow/deterministic_event_loop.py @@ -1,14 +1,15 @@ from asyncio import AbstractEventLoop, Handle, futures, tasks -from asyncio import Future from contextvars import Context import logging import collections import asyncio.events as events import threading from typing import Callable +from typing_extensions import Unpack, TypeVarTuple logger = logging.getLogger(__name__) +_Ts = TypeVarTuple("_Ts") class DeterministicEventLoop(AbstractEventLoop): """ @@ -19,19 +20,17 @@ class DeterministicEventLoop(AbstractEventLoop): """ def __init__(self): - self._thread_id: int = None # indicate if the event loop is running + self._thread_id = None # indicate if the event loop is running self._debug = False self._ready = collections.deque[events.Handle]() self._stopping = False self._closed = False - def call_soon(self, callback: Callable, *args, context : Context | None = None) -> Handle: - self._call_soon(callback, args, context) + def call_soon(self, callback: Callable[[Unpack[_Ts]], object], *args: Unpack[_Ts], context: Context | None = None) -> Handle: + return self._call_soon(callback, args, context) - def _call_soon(self, callback, args, context): + def _call_soon(self, callback, args, context) -> Handle: handle = events.Handle(callback, args, self, context) - if handle._source_traceback: - del handle._source_traceback[-1] self._ready.append(handle) return handle @@ -50,8 +49,7 @@ def run_forever(self): finally: self._run_forever_cleanup() - - def run_until_complete(self, future: Future): + def run_until_complete(self, future): """Run until the Future is done. If the argument is a coroutine, it is wrapped in a Task. @@ -67,10 +65,6 @@ def run_until_complete(self, future: Future): new_task = not futures.isfuture(future) future = tasks.ensure_future(future, loop=self) - if new_task: - # An exception is raised if the future didn't complete, so there - # is no need to log the "destroy pending task" message - future._log_destroy_pending = False future.add_done_callback(_run_until_complete_cb) try: @@ -100,15 +94,7 @@ def create_task(self, coro, **kwargs): if kwargs.get("eager_start", False): raise RuntimeError("eager_start in create_task is not supported for deterministic event loop") - task = tasks.Task(coro, loop=self, **kwargs) - if task._source_traceback: - del task._source_traceback[-1] - try: - return task - finally: - # gh-128552: prevent a refcycle of - # task.exception().__traceback__->BaseEventLoop.create_task->task - del task + return tasks.Task(coro, loop=self, **kwargs) def create_future(self): return futures.Future(loop=self) @@ -166,7 +152,7 @@ def is_closed(self): """Returns True if the event loop was closed.""" return self._closed -def _run_until_complete_cb(fut: Future): +def _run_until_complete_cb(fut): if not fut.cancelled(): exc = fut.exception() if isinstance(exc, (SystemExit, KeyboardInterrupt)): From 52914ba9459fbfe3f4a7f6af23679eb0588b7d51 Mon Sep 17 00:00:00 2001 From: Shijie Sheng Date: Thu, 28 Aug 2025 15:32:46 -0700 Subject: [PATCH 7/7] add set_debug method --- cadence/workflow/deterministic_event_loop.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cadence/workflow/deterministic_event_loop.py b/cadence/workflow/deterministic_event_loop.py index 2fce400..f0af8e3 100644 --- a/cadence/workflow/deterministic_event_loop.py +++ b/cadence/workflow/deterministic_event_loop.py @@ -37,6 +37,8 @@ def _call_soon(self, callback, args, context) -> Handle: def get_debug(self): return self._debug + def set_debug(self, enabled: bool): + self._debug = enabled def run_forever(self): """Run until stop() is called."""