Skip to content

Commit 6113323

Browse files
committed
moved failed property to each task
1 parent c3cdaf2 commit 6113323

File tree

3 files changed

+14
-15
lines changed

3 files changed

+14
-15
lines changed

splitio/sync/segment.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -360,8 +360,7 @@ async def synchronize_segments(self, segment_names = None, dont_wait = False):
360360
jobs = await self._worker_pool.submit_work(segment_names)
361361
if (dont_wait):
362362
return True
363-
await jobs.await_completion()
364-
return not self._worker_pool.pop_failed()
363+
return await jobs.await_completion()
365364

366365
async def segment_exist_in_storage(self, segment_name):
367366
"""

splitio/tasks/util/workerpool.py

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,6 @@ def __init__(self, worker_count, worker_func):
152152
self._queue = asyncio.Queue()
153153
self._handler = worker_func
154154
self._aborted = False
155-
self._failed = False
156155

157156
async def _schedule_work(self):
158157
"""wrap the message handler execution."""
@@ -173,7 +172,7 @@ async def _do_work(self, message):
173172
except Exception:
174173
_LOGGER.error("Something went wrong when processing message %s", message)
175174
_LOGGER.debug('Original traceback: ', exc_info=True)
176-
self._failed = True
175+
message._failed = True
177176
message._complete.set()
178177
self._semaphore.release() # signal worker is idle
179178

@@ -204,17 +203,13 @@ async def stop(self, event=None):
204203
"""abort all execution (except currently running handlers)."""
205204
await self._queue.put(self._abort)
206205

207-
def pop_failed(self):
208-
old = self._failed
209-
self._failed = False
210-
return old
211-
212206

213207
class TaskCompletionWraper:
214208
"""Task completion class"""
215209
def __init__(self, message):
216210
self._message = message
217211
self._complete = asyncio.Event()
212+
self._failed = False
218213

219214
async def await_completion(self):
220215
await self._complete.wait()
@@ -230,3 +225,7 @@ def __init__(self, tasks):
230225

231226
async def await_completion(self):
232227
await asyncio.gather(*[task.await_completion() for task in self._tasks])
228+
for task in self._tasks:
229+
if task._failed:
230+
return False
231+
return True

tests/tasks/util/test_workerpool.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -94,11 +94,10 @@ async def worker_func(num):
9494
jobs.append(str(num))
9595

9696
task = await wpool.submit_work(jobs)
97-
await task.await_completion()
97+
assert await task.await_completion()
9898
await wpool.stop()
9999
for num in range(0, 11):
100100
assert str(num) in calls
101-
assert not wpool.pop_failed()
102101

103102
@pytest.mark.asyncio
104103
async def test_fail_in_msg_doesnt_break(self):
@@ -115,11 +114,13 @@ async def do_work(self, work):
115114
worker = Worker()
116115
wpool = workerpool.WorkerPoolAsync(50, worker.do_work)
117116
wpool.start()
117+
jobs = []
118118
for num in range(0, 100):
119-
await wpool.submit_work([str(num)])
120-
await asyncio.sleep(1)
119+
jobs.append(str(num))
120+
task = await wpool.submit_work(jobs)
121+
122+
assert not await task.await_completion()
121123
await wpool.stop()
122-
assert wpool.pop_failed()
123124

124125
for num in range(0, 100):
125126
if num != 55:
@@ -145,6 +146,6 @@ async def do_work(self, work):
145146
for num in range(0, 100):
146147
jobs.append(str(num))
147148
task = await wpool.submit_work(jobs)
148-
await task.await_completion()
149+
assert await task.await_completion()
149150
await wpool.stop()
150151
assert len(worker.worked) == 100

0 commit comments

Comments
 (0)