Skip to content

Commit c102bbd

Browse files
committed
Fix the callback shutting itself off
The background task can't return obviously if we want the stream to continue operating.
1 parent f2d3dc5 commit c102bbd

File tree

2 files changed

+19
-8
lines changed

2 files changed

+19
-8
lines changed

streamz/core.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -752,25 +752,26 @@ def __init__(self, upstream, func, *args, **kwargs):
752752
stream_name = kwargs.pop('stream_name', None)
753753
self.kwargs = kwargs
754754
self.args = args
755-
self.work_queue = asyncio.Queue()
755+
self.input_queue = asyncio.Queue()
756756

757757
Stream.__init__(self, upstream, stream_name=stream_name, ensure_io_loop=True)
758-
self.cb_task = self.loop.asyncio_loop.create_task(self.cb())
758+
self.input_task = self.loop.asyncio_loop.create_task(self.input_callback())
759759

760760
def update(self, x, who=None, metadata=None):
761761
coro = self.func(x, *self.args, **self.kwargs)
762-
return self.work_queue.put_nowait((coro, metadata))
762+
self.input_queue.put_nowait((coro, metadata))
763763

764-
async def cb(self):
764+
async def input_callback(self):
765765
while True:
766-
coro, metadata = await self.work_queue.get()
767766
try:
767+
coro, metadata = await self.input_queue.get()
768+
self.input_queue.task_done()
768769
result = await coro
769770
except Exception as e:
770771
logger.exception(e)
771772
raise
772773
else:
773-
return self._emit(result, metadata=metadata)
774+
self._emit(result, metadata=metadata)
774775

775776

776777
@Stream.register_api()

streamz/tests/test_core.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -139,8 +139,13 @@ async def add_native(x=0, y=0):
139139
L = source.map_async(add_tor, y=1).map_async(add_native, y=2).sink_to_list()
140140

141141
yield source.emit(0)
142+
yield source.emit(1)
143+
yield source.emit(2)
142144

143-
yield await_for(lambda: L == [3], 1)
145+
def fail_func():
146+
assert L == [3, 4, 5]
147+
148+
yield await_for(lambda: L == [3, 4, 5], 1, fail_func=fail_func)
144149

145150

146151
@pytest.mark.asyncio
@@ -156,8 +161,13 @@ async def add_native(x=0, y=0):
156161
L = source.map_async(add_tor, y=1).map_async(add_native, y=2).sink_to_list()
157162

158163
await source.emit(0)
164+
await source.emit(1)
165+
await source.emit(2)
166+
167+
def fail_func():
168+
assert L == [3, 4, 5]
159169

160-
await await_for(lambda: L == [3], 1)
170+
await await_for(lambda: L == [3, 4, 5], 1, fail_func=fail_func)
161171

162172

163173
def test_map_args():

0 commit comments

Comments
 (0)