From 02c5e25a5d0a2ca9a983091ac173f3b883c2fc8f Mon Sep 17 00:00:00 2001
From: David Skoog
Date: Thu, 18 Dec 2025 18:14:16 -0500
Subject: [PATCH 1/8] Create new map_async API

I tried `map` with a coroutine and it failed spectacularly:

```
@gen_test()
def test_map_async_tornado():
    @gen.coroutine
    def add_tor(x=0, y=0):
        return x + y

    source = Stream(asynchronous=True)
    L = source.map(add_tor, y=1).map(add_tor, y=2).sink_to_list()

    yield source.emit(0)

    yield gen.moment  # yield to the event loop to ensure it finished
>   assert L == [3]
E   assert [] == [3]
E
E   At index 0 diff:  != 3
E
E   Full diff:
E   [
E   - 3,
E   + ,
E   ]
```

So I made a new `map_async` that uses native asyncio plumbing to await the
coroutine before feeding its result downstream.
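
For reference, here is roughly how I expect the new operator to be used. This
is a minimal sketch, not part of the patch; `add_one` is just a stand-in
coroutine, and the trailing sleep only gives the background task a chance to
drain the work queue before we look at the result:

```
import asyncio
from streamz import Stream

async def add_one(x):
    # stand-in for a real async workload
    await asyncio.sleep(0.01)
    return x + 1

async def main():
    source = Stream(asynchronous=True)
    L = source.map_async(add_one).sink_to_list()
    await source.emit(1)
    await asyncio.sleep(0.1)  # let the background task await the coroutine
    print(L)                  # expect [2]

asyncio.run(main())
```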
---
 streamz/core.py            | 55 ++++++++++++++++++++++++++++++++++++++
 streamz/tests/test_core.py | 36 +++++++++++++++++++++++++
 2 files changed, 91 insertions(+)

diff --git a/streamz/core.py b/streamz/core.py
index ae7a27f9..4abc86ab 100644
--- a/streamz/core.py
+++ b/streamz/core.py
@@ -718,6 +718,61 @@ def update(self, x, who=None, metadata=None):
         return self._emit(result, metadata=metadata)


+@Stream.register_api()
+class map_async(Stream):
+    """ Apply an async function to every element in the stream
+
+    Parameters
+    ----------
+    func: async callable
+    *args :
+        The arguments to pass to the function.
+    **kwargs:
+        Keyword arguments to pass to func
+
+    Examples
+    --------
+    >>> async def mult(x, factor=1):
+    ...     return factor*x
+    >>> async def run():
+    ...     source = Stream(asynchronous=True)
+    ...     source.map_async(mult, factor=2).sink(print)
+    ...     for i in range(5):
+    ...         await source.emit(i)
+    >>> asyncio.run(run())
+    0
+    2
+    4
+    6
+    8
+
+    """
+    def __init__(self, upstream, func, *args, **kwargs):
+        self.func = func
+        stream_name = kwargs.pop('stream_name', None)
+        self.kwargs = kwargs
+        self.args = args
+        self.work_queue = asyncio.Queue()
+
+        Stream.__init__(self, upstream, stream_name=stream_name, ensure_io_loop=True)
+        self.cb_task = self.loop.asyncio_loop.create_task(self.cb())
+
+    def update(self, x, who=None, metadata=None):
+        coro = self.func(x, *self.args, **self.kwargs)
+        return self.work_queue.put_nowait((coro, metadata))
+
+    async def cb(self):
+        while True:
+            coro, metadata = await self.work_queue.get()
+            try:
+                result = await coro
+            except Exception as e:
+                logger.exception(e)
+                raise
+            else:
+                return self._emit(result, metadata=metadata)
+
+
 @Stream.register_api()
 class starmap(Stream):
     """ Apply a function to every element in the stream, splayed out
diff --git a/streamz/tests/test_core.py b/streamz/tests/test_core.py
index 96ab9a24..e5e95af7 100644
--- a/streamz/tests/test_core.py
+++ b/streamz/tests/test_core.py
@@ -126,6 +126,42 @@ def add(x=0, y=0):
     assert L[0] == 11


+@gen_test()
+def test_map_async_tornado():
+    @gen.coroutine
+    def add_tor(x=0, y=0):
+        return x + y
+
+    async def add_native(x=0, y=0):
+        return x + y
+
+    source = Stream(asynchronous=True)
+    L = source.map_async(add_tor, y=1).map_async(add_native, y=2).sink_to_list()
+
+    yield source.emit(0)
+
+    yield gen.moment  # Must yield to the event loop to ensure it finished
+    assert L == [3]
+
+
+@pytest.mark.asyncio
+async def test_map_async():
+    @gen.coroutine
+    def add_tor(x=0, y=0):
+        return x + y
+
+    async def add_native(x=0, y=0):
+        return x + y
+
+    source = Stream(asynchronous=True)
+    L = source.map_async(add_tor, y=1).map_async(add_native, y=2).sink_to_list()
+
+    await source.emit(0)
+
+    await asyncio.sleep(0)  # Must yield to the event loop to ensure it finished
+    assert L == [3]
+
+
 def test_map_args():
     source = Stream()
     L = source.map(operator.add, 10).sink_to_list()

From 1683bfc3781ad27d9350d87a3039465bc08728fe Mon Sep 17 00:00:00 2001
From: David Skoog
Date: Thu, 18 Dec 2025 18:22:25 -0500
Subject: [PATCH 2/8] Add map_async to api.rst

---
 docs/source/api.rst | 1 +
 1 file changed, 1 insertion(+)

diff --git a/docs/source/api.rst b/docs/source/api.rst
index c42abbc7..94c9ad8b 100644
--- a/docs/source/api.rst
+++ b/docs/source/api.rst
@@ -22,6 +22,7 @@ Stream
    filter
    flatten
    map
+   map_async
    partition
    rate_limit
    scatter

From aa122bdd52d177b77e90522d9f3ade0b216132c3 Mon Sep 17 00:00:00 2001
From: David Skoog
Date: Thu, 18 Dec 2025 18:39:10 -0500
Subject: [PATCH 3/8] Add documentation on map_async

---
 docs/source/async.rst | 25 +++++++++++++++++++++++--
 1 file changed, 23 insertions(+), 2 deletions(-)

diff --git a/docs/source/async.rst b/docs/source/async.rst
index 974ae376..8f32cff8 100644
--- a/docs/source/async.rst
+++ b/docs/source/async.rst
@@ -72,8 +72,8 @@ This would also work with async-await syntax in Python 3

 .. code-block:: python

+    import asyncio
     from streamz import Stream
-    from tornado.ioloop import IOLoop

     async def f():
         source = Stream(asynchronous=True)  # tell the stream we're working asynchronously
@@ -82,7 +82,28 @@ This would also work with async-await syntax in Python 3
         for x in range(10):
             await source.emit(x)

-    IOLoop().run_sync(f)
+    asyncio.run(f())
+
+When working asynchronously, we can also map asynchronous functions.
+
+.. code-block:: python
+
+    async def increment_async(x):
+        """ A "long-running" increment function
+
+        Simulates a function that does real asyncio work.
+        """
+        await asyncio.sleep(0.1)
+        return x + 1
+
+    async def f_inc():
+        source = Stream(asynchronous=True)  # tell the stream we're working asynchronously
+        source.map_async(increment_async).rate_limit(0.500).sink(write)
+
+        for x in range(10):
+            await source.emit(x)
+
+    asyncio.run(f_inc())


 Event Loop on a Separate Thread

From 5c2495082a09207333761d94be5bc9ba8c4cca71 Mon Sep 17 00:00:00 2001
From: David Skoog
Date: Thu, 18 Dec 2025 19:15:38 -0500
Subject: [PATCH 4/8] Use await_for idiom to make tests declarative
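
The idea, as a sketch (assuming `gen_test` and `await_for` are the helpers
already used by this test module, living in `streamz.utils_test`): poll for
the expected condition with a timeout instead of yielding to the loop once and
hoping the pipeline has finished.

```
from streamz import Stream
from streamz.utils_test import gen_test, await_for  # assumed location of these helpers

@gen_test()
def test_map_async_declarative():
    async def inc(x):
        return x + 1

    source = Stream(asynchronous=True)
    L = source.map_async(inc).sink_to_list()

    yield source.emit(2)

    # instead of `yield gen.moment` followed by a bare assert,
    # wait (up to 1 second) for the condition itself to become true
    yield await_for(lambda: L == [3], 1)
```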
---
 streamz/tests/test_core.py | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/streamz/tests/test_core.py b/streamz/tests/test_core.py
index e5e95af7..d869268e 100644
--- a/streamz/tests/test_core.py
+++ b/streamz/tests/test_core.py
@@ -140,8 +140,7 @@ async def add_native(x=0, y=0):

     yield source.emit(0)

-    yield gen.moment  # Must yield to the event loop to ensure it finished
-    assert L == [3]
+    yield await_for(lambda: L == [3], 1)


 @pytest.mark.asyncio
@@ -158,8 +157,7 @@ async def add_native(x=0, y=0):

     await source.emit(0)

-    await asyncio.sleep(0)  # Must yield to the event loop to ensure it finished
-    assert L == [3]
+    await await_for(lambda: L == [3], 1)


 def test_map_args():

From 39bcbd8ebc2a0f88549e9cd6ddba4aca751b5067 Mon Sep 17 00:00:00 2001
From: David Skoog
Date: Thu, 18 Dec 2025 19:15:14 -0500
Subject: [PATCH 5/8] Mark the dataframe GC test as flaky

---
 streamz/dataframe/tests/test_dataframes.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/streamz/dataframe/tests/test_dataframes.py b/streamz/dataframe/tests/test_dataframes.py
index f8345263..62029e82 100644
--- a/streamz/dataframe/tests/test_dataframes.py
+++ b/streamz/dataframe/tests/test_dataframes.py
@@ -8,6 +8,7 @@
 from dask.dataframe.utils import assert_eq
 import numpy as np
 import pandas as pd
+from flaky import flaky
 from tornado import gen

 from streamz import Stream
@@ -570,6 +571,7 @@ def test_cumulative_aggregations(op, getter, stream):
     assert_eq(pd.concat(L), expected)


+@flaky(max_runs=3, min_passes=1)
 @gen_test()
 def test_gc():
     sdf = sd.Random(freq='5ms', interval='100ms')

From 2506a92eadeddc51605b7bc968ca37d5deeff3af Mon Sep 17 00:00:00 2001
From: David Skoog
Date: Fri, 19 Dec 2025 11:40:24 -0500
Subject: [PATCH 6/8] Fix the callback shutting itself off

Obviously the background task can't return if we want the stream to continue
operating.
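
A standalone illustration of the pattern (plain asyncio, not the streamz code
itself): the consumer loops over the queue forever, so every queued coroutine
gets awaited rather than only the first one.

```
import asyncio

async def consumer(queue, out):
    # loop forever; a `return` in here would stop consuming after one element
    while True:
        coro = await queue.get()
        out.append(await coro)
        queue.task_done()

async def main():
    async def double(x):
        return 2 * x

    queue, out = asyncio.Queue(), []
    task = asyncio.create_task(consumer(queue, out))
    for i in range(3):
        queue.put_nowait(double(i))
    await queue.join()
    print(out)   # [0, 2, 4] -- every element processed, not just the first
    task.cancel()

asyncio.run(main())
```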
---
 streamz/core.py            | 13 +++++++------
 streamz/tests/test_core.py | 14 ++++++++++++--
 2 files changed, 19 insertions(+), 8 deletions(-)

diff --git a/streamz/core.py b/streamz/core.py
index 4abc86ab..54937671 100644
--- a/streamz/core.py
+++ b/streamz/core.py
@@ -752,25 +752,26 @@ def __init__(self, upstream, func, *args, **kwargs):
         stream_name = kwargs.pop('stream_name', None)
         self.kwargs = kwargs
         self.args = args
-        self.work_queue = asyncio.Queue()
+        self.input_queue = asyncio.Queue()

         Stream.__init__(self, upstream, stream_name=stream_name, ensure_io_loop=True)
-        self.cb_task = self.loop.asyncio_loop.create_task(self.cb())
+        self.input_task = self.loop.asyncio_loop.create_task(self.input_callback())

     def update(self, x, who=None, metadata=None):
         coro = self.func(x, *self.args, **self.kwargs)
-        return self.work_queue.put_nowait((coro, metadata))
+        self.input_queue.put_nowait((coro, metadata))

-    async def cb(self):
+    async def input_callback(self):
         while True:
-            coro, metadata = await self.work_queue.get()
             try:
+                coro, metadata = await self.input_queue.get()
+                self.input_queue.task_done()
                 result = await coro
             except Exception as e:
                 logger.exception(e)
                 raise
             else:
-                return self._emit(result, metadata=metadata)
+                self._emit(result, metadata=metadata)


 @Stream.register_api()
diff --git a/streamz/tests/test_core.py b/streamz/tests/test_core.py
index d869268e..6ac7d099 100644
--- a/streamz/tests/test_core.py
+++ b/streamz/tests/test_core.py
@@ -139,8 +139,13 @@ async def add_native(x=0, y=0):
     L = source.map_async(add_tor, y=1).map_async(add_native, y=2).sink_to_list()

     yield source.emit(0)
+    yield source.emit(1)
+    yield source.emit(2)

-    yield await_for(lambda: L == [3], 1)
+    def fail_func():
+        assert L == [3, 4, 5]
+
+    yield await_for(lambda: L == [3, 4, 5], 1, fail_func=fail_func)


 @pytest.mark.asyncio
@@ -156,8 +161,13 @@ async def add_native(x=0, y=0):
     L = source.map_async(add_tor, y=1).map_async(add_native, y=2).sink_to_list()

     await source.emit(0)
+    await source.emit(1)
+    await source.emit(2)
+
+    def fail_func():
+        assert L == [3, 4, 5]

-    await await_for(lambda: L == [3], 1)
+    await await_for(lambda: L == [3, 4, 5], 1, fail_func=fail_func)


 def test_map_args():

From 34509c779dcd4f2f56c0d61bc787c6701af4f018 Mon Sep 17 00:00:00 2001
From: David Skoog
Date: Fri, 19 Dec 2025 11:56:03 -0500
Subject: [PATCH 7/8] Take a hint from buffer about backpressure

---
 streamz/core.py | 19 ++++++++++++++-----
 1 file changed, 14 insertions(+), 5 deletions(-)

diff --git a/streamz/core.py b/streamz/core.py
index 54937671..6a172894 100644
--- a/streamz/core.py
+++ b/streamz/core.py
@@ -727,6 +727,8 @@ class map_async(Stream):
     func: async callable
     *args :
         The arguments to pass to the function.
+    buffer_size:
+        The max size of the input buffer, default value is unlimited
     **kwargs:
         Keyword arguments to pass to func

@@ -747,19 +749,23 @@ class map_async(Stream):
     8

     """
-    def __init__(self, upstream, func, *args, **kwargs):
+    def __init__(self, upstream, func, *args, buffer_size=0, **kwargs):
         self.func = func
         stream_name = kwargs.pop('stream_name', None)
         self.kwargs = kwargs
         self.args = args
-        self.input_queue = asyncio.Queue()
+        self.input_queue = asyncio.Queue(maxsize=buffer_size)

         Stream.__init__(self, upstream, stream_name=stream_name, ensure_io_loop=True)
-        self.input_task = self.loop.asyncio_loop.create_task(self.input_callback())
+        self.input_task = self._create_task(self.input_callback())

     def update(self, x, who=None, metadata=None):
         coro = self.func(x, *self.args, **self.kwargs)
-        self.input_queue.put_nowait((coro, metadata))
+        self._retain_refs(metadata)
+        return self._create_task(self.input_queue.put((coro, metadata)))
+
+    def _create_task(self, coro):
+        return self.loop.asyncio_loop.create_task(coro)

     async def input_callback(self):
         while True:
@@ -771,7 +777,10 @@ async def input_callback(self):
                 logger.exception(e)
                 raise
             else:
-                self._emit(result, metadata=metadata)
+                results = self._emit(result, metadata=metadata)
+                if results:
+                    await asyncio.gather(*results)
+                self._release_refs(metadata)


 @Stream.register_api()

From ea57a59c464e59f5b0a533034aae72eec3f3a6a3 Mon Sep 17 00:00:00 2001
From: David Skoog
Date: Fri, 19 Dec 2025 14:28:58 -0500
Subject: [PATCH 8/8] Allow map_async to run items in parallel

Use an asyncio.Queue of the tasks to ensure that the arrival and departure
order of elements match. Backpressure is asserted when a new value arrives via
update while the work queue is full.

Because asyncio.Queue cannot peek, the parallelism factor is not precise: the
worker callback holds either zero or one task in hand, but it must free up a
slot in the queue to take one. Under pressure, the effective parallelism will
generally be `(parallelism + 1)` rather than the `parallelism` given to
`__init__`, since one Future is being awaited in the worker callback while the
queue fills up from update calls.
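
A standalone sketch of the scheme (plain asyncio, not the streamz code
itself): jobs are only started once the bounded queue has room (that wait is
the backpressure), and the worker awaits the queued tasks in FIFO order, so
the output order matches the input order even though jobs finish out of
order. The worker holding one task while the queue is full is the
`parallelism + 1` effect described above.

```
import asyncio
import random

async def job(i):
    await asyncio.sleep(random.random() * 0.05)   # finishes in a random order
    return i

async def main():
    parallelism = 2
    work_queue = asyncio.Queue(maxsize=parallelism)
    results = []

    async def worker():
        while True:
            task = await work_queue.get()
            work_queue.task_done()
            results.append(await task)   # awaited in arrival order => ordered output

    w = asyncio.create_task(worker())
    for i in range(6):
        while work_queue.full():          # wait for a free slot: backpressure
            await asyncio.sleep(0)
        await work_queue.put(asyncio.create_task(job(i)))
    await work_queue.join()
    await asyncio.sleep(0.1)              # let the worker finish the final task
    print(results)                        # [0, 1, 2, 3, 4, 5]
    w.cancel()

asyncio.run(main())
```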
---
 streamz/core.py            | 43 ++++++++++++++++++++++++++-----------
 streamz/tests/test_core.py | 10 ++++++++--
 2 files changed, 37 insertions(+), 16 deletions(-)

diff --git a/streamz/core.py b/streamz/core.py
index 6a172894..00b5ed4c 100644
--- a/streamz/core.py
+++ b/streamz/core.py
@@ -720,15 +720,16 @@ def update(self, x, who=None, metadata=None):

 @Stream.register_api()
 class map_async(Stream):
-    """ Apply an async function to every element in the stream
+    """ Apply an async function to every element in the stream, preserving order
+    even when evaluating multiple inputs in parallel.

     Parameters
     ----------
     func: async callable
     *args :
         The arguments to pass to the function.
-    buffer_size:
-        The max size of the input buffer, default value is unlimited
+    parallelism:
+        The maximum number of parallel Tasks for evaluating func, default value is 1
     **kwargs:
         Keyword arguments to pass to func

@@ -747,32 +748,31 @@ class map_async(Stream):
     4
     6
     8
-
     """
-    def __init__(self, upstream, func, *args, buffer_size=0, **kwargs):
+    def __init__(self, upstream, func, *args, parallelism=1, **kwargs):
         self.func = func
         stream_name = kwargs.pop('stream_name', None)
         self.kwargs = kwargs
         self.args = args
-        self.input_queue = asyncio.Queue(maxsize=buffer_size)
+        self.work_queue = asyncio.Queue(maxsize=parallelism)

         Stream.__init__(self, upstream, stream_name=stream_name, ensure_io_loop=True)
-        self.input_task = self._create_task(self.input_callback())
+        self.work_task = self._create_task(self.work_callback())

     def update(self, x, who=None, metadata=None):
-        coro = self.func(x, *self.args, **self.kwargs)
-        self._retain_refs(metadata)
-        return self._create_task(self.input_queue.put((coro, metadata)))
+        return self._create_task(self._insert_job(x, metadata))

     def _create_task(self, coro):
+        if gen.is_future(coro):
+            return coro
         return self.loop.asyncio_loop.create_task(coro)

-    async def input_callback(self):
+    async def work_callback(self):
         while True:
             try:
-                coro, metadata = await self.input_queue.get()
-                self.input_queue.task_done()
-                result = await coro
+                task, metadata = await self.work_queue.get()
+                self.work_queue.task_done()
+                result = await task
             except Exception as e:
                 logger.exception(e)
                 raise
@@ -782,6 +782,21 @@ async def input_callback(self):
                     await asyncio.gather(*results)
                 self._release_refs(metadata)

+    async def _wait_for_work_slot(self):
+        while self.work_queue.full():
+            await asyncio.sleep(0)
+
+    async def _insert_job(self, x, metadata):
+        try:
+            await self._wait_for_work_slot()
+            coro = self.func(x, *self.args, **self.kwargs)
+            task = self._create_task(coro)
+            await self.work_queue.put((task, metadata))
+            self._retain_refs(metadata)
+        except Exception as e:
+            logger.exception(e)
+            raise
+

 @Stream.register_api()
 class starmap(Stream):
diff --git a/streamz/tests/test_core.py b/streamz/tests/test_core.py
index 6ac7d099..9245f2e6 100644
--- a/streamz/tests/test_core.py
+++ b/streamz/tests/test_core.py
@@ -133,11 +133,13 @@ def add_tor(x=0, y=0):
         return x + y

     async def add_native(x=0, y=0):
+        await asyncio.sleep(0.1)
         return x + y

     source = Stream(asynchronous=True)
-    L = source.map_async(add_tor, y=1).map_async(add_native, y=2).sink_to_list()
+    L = source.map_async(add_tor, y=1).map_async(add_native, parallelism=2, y=2).buffer(1).sink_to_list()

+    start = time()
     yield source.emit(0)
     yield source.emit(1)
     yield source.emit(2)
@@ -146,6 +148,7 @@ def fail_func():
         assert L == [3, 4, 5]

     yield await_for(lambda: L == [3, 4, 5], 1, fail_func=fail_func)
+    assert (time() - start) == pytest.approx(0.1, abs=4e-3)


 @pytest.mark.asyncio
@@ -155,11 +158,13 @@ def add_tor(x=0, y=0):
         return x + y

     async def add_native(x=0, y=0):
+        await asyncio.sleep(0.1)
         return x + y

     source = Stream(asynchronous=True)
-    L = source.map_async(add_tor, y=1).map_async(add_native, y=2).sink_to_list()
+    L = source.map_async(add_tor, y=1).map_async(add_native, parallelism=2, y=2).sink_to_list()

+    start = time()
     await source.emit(0)
     await source.emit(1)
     await source.emit(2)
@@ -168,6 +173,7 @@ def fail_func():
         assert L == [3, 4, 5]

     await await_for(lambda: L == [3, 4, 5], 1, fail_func=fail_func)
+    assert (time() - start) == pytest.approx(0.1, abs=4e-3)


 def test_map_args():