Skip to content

Commit f5ba233

Browse files
authored
Merge pull request #427 from splitio/async-tasks-asynctask
added asynctask async class
2 parents 32da7b8 + 0b91679 commit f5ba233

File tree

2 files changed

+291
-3
lines changed

2 files changed

+291
-3
lines changed

splitio/tasks/util/asynctask.py

Lines changed: 151 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,13 @@
22
import threading
33
import logging
44
import queue
5-
5+
from splitio.optional.loaders import asyncio
66

77
__TASK_STOP__ = 0
88
__TASK_FORCE_RUN__ = 1
99

1010
_LOGGER = logging.getLogger(__name__)
1111

12-
1312
def _safe_run(func):
1413
"""
1514
Execute a function wrapped in a try-except block.
@@ -30,6 +29,26 @@ def _safe_run(func):
3029
_LOGGER.debug('Original traceback:', exc_info=True)
3130
return False
3231

32+
async def _safe_run_async(func):
33+
"""
34+
Execute a function wrapped in a try-except block.
35+
36+
If anything goes wrong returns false instead of propagating the exception.
37+
38+
:param func: Function to be executed, receives no arguments and it's return
39+
value is ignored.
40+
"""
41+
try:
42+
await func()
43+
return True
44+
except Exception: # pylint: disable=broad-except
45+
# Catch any exception that might happen to avoid the periodic task
46+
# from ending and allowing for a recovery, as well as preventing
47+
# an exception from propagating and breaking the main thread
48+
_LOGGER.error('Something went wrong when running passed function.')
49+
_LOGGER.debug('Original traceback:', exc_info=True)
50+
return False
51+
3352

3453
class AsyncTask(object): # pylint: disable=too-many-instance-attributes
3554
"""
@@ -166,3 +185,133 @@ def force_execution(self):
166185
def running(self):
167186
"""Return whether the task is running or not."""
168187
return self._running
188+
189+
190+
class AsyncTaskAsync(object): # pylint: disable=too-many-instance-attributes
191+
"""
192+
Asyncrhonous controllable task async class.
193+
194+
This class creates is used to wrap around a function to treat it as a
195+
periodic task. This task can be stopped, it's execution can be forced, and
196+
it's status (whether it's running or not) can be obtained from the task
197+
object.
198+
It also allows for "on init" and "on stop" functions to be passed.
199+
"""
200+
201+
202+
def __init__(self, main, period, on_init=None, on_stop=None):
203+
"""
204+
Class constructor.
205+
206+
:param main: Main function to be executed periodically
207+
:type main: callable
208+
:param period: How many seconds to wait between executions
209+
:type period: int
210+
:param on_init: Function to be executed ONCE before the main one
211+
:type on_init: callable
212+
:param on_stop: Function to be executed ONCE after the task has finished
213+
:type on_stop: callable
214+
"""
215+
self._on_init = on_init
216+
self._main = main
217+
self._on_stop = on_stop
218+
self._period = period
219+
self._messages = asyncio.Queue()
220+
self._running = False
221+
self._completion_event = None
222+
223+
async def _execution_wrapper(self):
224+
"""
225+
Execute user defined function in separate thread.
226+
227+
It will execute the "on init" hook is available. If an exception is
228+
raised it will abort execution, otherwise it will enter an infinite
229+
loop in which the main function is executed every <period> seconds.
230+
After stop has been called the "on stop" hook will be invoked if
231+
available.
232+
233+
All custom functions are run within a _safe_run() function which
234+
prevents exceptions from being propagated.
235+
"""
236+
try:
237+
if self._on_init is not None:
238+
if not await _safe_run_async(self._on_init):
239+
_LOGGER.error("Error running task initialization function, aborting execution")
240+
self._running = False
241+
return
242+
self._running = True
243+
244+
while self._running:
245+
try:
246+
msg = self._messages.get_nowait()
247+
if msg == __TASK_STOP__:
248+
_LOGGER.debug("Stop signal received. finishing task execution")
249+
break
250+
elif msg == __TASK_FORCE_RUN__:
251+
_LOGGER.debug("Force execution signal received. Running now")
252+
if not await _safe_run_async(self._main):
253+
_LOGGER.error("An error occurred when executing the task. "
254+
"Retrying after perio expires")
255+
continue
256+
except asyncio.QueueEmpty:
257+
# If no message was received, the timeout has expired
258+
# and we're ready for a new execution
259+
pass
260+
except asyncio.CancelledError:
261+
break
262+
263+
await asyncio.sleep(self._period)
264+
if not await _safe_run_async(self._main):
265+
_LOGGER.error(
266+
"An error occurred when executing the task. "
267+
"Retrying after period expires"
268+
)
269+
finally:
270+
await self._cleanup()
271+
272+
async def _cleanup(self):
273+
"""Execute on_stop callback, set event if needed, update status."""
274+
if self._on_stop is not None:
275+
if not await _safe_run_async(self._on_stop):
276+
_LOGGER.error("An error occurred when executing the task's OnStop hook. ")
277+
278+
self._running = False
279+
self._completion_event.set()
280+
281+
def start(self):
282+
"""Start the async task."""
283+
if self._running:
284+
_LOGGER.warning("Task is already running. Ignoring .start() call")
285+
return
286+
# Start execution
287+
self._completion_event = asyncio.Event()
288+
asyncio.get_running_loop().create_task(self._execution_wrapper())
289+
290+
async def stop(self, wait_for_completion=False):
291+
"""
292+
Send a signal to the thread in order to stop it. If the task is not running do nothing.
293+
294+
Optionally accept an event to be set upon task completion.
295+
296+
:param event: Event to set when the task completes.
297+
:type event: threading.Event
298+
"""
299+
if not self._running:
300+
return
301+
302+
# Queue is of infinite size, should not raise an exception
303+
self._messages.put_nowait(__TASK_STOP__)
304+
305+
if wait_for_completion:
306+
await self._completion_event.wait()
307+
308+
def force_execution(self):
309+
"""Force an execution of the task without waiting for the period to end."""
310+
if not self._running:
311+
return
312+
# Queue is of infinite size, should not raise an exception
313+
self._messages.put_nowait(__TASK_FORCE_RUN__)
314+
315+
def running(self):
316+
"""Return whether the task is running or not."""
317+
return self._running

tests/tasks/util/test_asynctask.py

Lines changed: 140 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@
22

33
import time
44
import threading
5-
from splitio.tasks.util import asynctask
5+
import pytest
66

7+
from splitio.tasks.util import asynctask
8+
from splitio.optional.loaders import asyncio
79

810
class AsyncTaskTests(object):
911
"""AsyncTask test cases."""
@@ -116,3 +118,140 @@ def test_force_run(self, mocker):
116118
assert on_stop.mock_calls == [mocker.call()]
117119
assert len(main_func.mock_calls) == 2
118120
assert not task.running()
121+
122+
123+
class AsyncTaskAsyncTests(object):
124+
"""AsyncTask test cases."""
125+
126+
@pytest.mark.asyncio
127+
async def test_default_task_flow(self, mocker):
128+
"""Test the default execution flow of an asynctask."""
129+
self.main_called = 0
130+
async def main_func():
131+
self.main_called += 1
132+
133+
self.init_called = 0
134+
async def on_init():
135+
self.init_called += 1
136+
137+
self.stop_called = 0
138+
async def on_stop():
139+
self.stop_called += 1
140+
141+
task = asynctask.AsyncTaskAsync(main_func, 0.5, on_init, on_stop)
142+
task.start()
143+
await asyncio.sleep(1)
144+
assert task.running()
145+
await task.stop(True)
146+
147+
assert 0 < self.main_called <= 2
148+
assert self.init_called == 1
149+
assert self.stop_called == 1
150+
assert not task.running()
151+
152+
@pytest.mark.asyncio
153+
async def test_main_exception_skips_iteration(self, mocker):
154+
"""Test that an exception in the main func only skips current iteration."""
155+
self.main_called = 0
156+
async def raise_exception():
157+
self.main_called += 1
158+
raise Exception('something')
159+
main_func = raise_exception
160+
161+
self.init_called = 0
162+
async def on_init():
163+
self.init_called += 1
164+
165+
self.stop_called = 0
166+
async def on_stop():
167+
self.stop_called += 1
168+
169+
task = asynctask.AsyncTaskAsync(main_func, 0.1, on_init, on_stop)
170+
task.start()
171+
await asyncio.sleep(1)
172+
assert task.running()
173+
await task.stop(True)
174+
175+
assert 9 <= self.main_called <= 10
176+
assert self.init_called == 1
177+
assert self.stop_called == 1
178+
assert not task.running()
179+
180+
@pytest.mark.asyncio
181+
async def test_on_init_failure_aborts_task(self, mocker):
182+
"""Test that if the on_init callback fails, the task never runs."""
183+
self.main_called = 0
184+
async def main_func():
185+
self.main_called += 1
186+
187+
self.init_called = 0
188+
async def on_init():
189+
self.init_called += 1
190+
raise Exception('something')
191+
192+
self.stop_called = 0
193+
async def on_stop():
194+
self.stop_called += 1
195+
196+
task = asynctask.AsyncTaskAsync(main_func, 0.1, on_init, on_stop)
197+
task.start()
198+
await asyncio.sleep(0.5)
199+
assert not task.running() # Since on_init fails, task never starts
200+
await task.stop(True)
201+
202+
assert self.init_called == 1
203+
assert self.stop_called == 1
204+
assert self.main_called == 0
205+
assert not task.running()
206+
207+
@pytest.mark.asyncio
208+
async def test_on_stop_failure_ends_gacefully(self, mocker):
209+
"""Test that if the on_init callback fails, the task never runs."""
210+
self.main_called = 0
211+
async def main_func():
212+
self.main_called += 1
213+
214+
self.init_called = 0
215+
async def on_init():
216+
self.init_called += 1
217+
218+
self.stop_called = 0
219+
async def on_stop():
220+
self.stop_called += 1
221+
raise Exception('something')
222+
223+
task = asynctask.AsyncTaskAsync(main_func, 0.1, on_init, on_stop)
224+
task.start()
225+
await asyncio.sleep(1)
226+
await task.stop(True)
227+
assert 9 <= self.main_called <= 10
228+
assert self.init_called == 1
229+
assert self.stop_called == 1
230+
231+
@pytest.mark.asyncio
232+
async def test_force_run(self, mocker):
233+
"""Test that if the on_init callback fails, the task never runs."""
234+
self.main_called = 0
235+
async def main_func():
236+
self.main_called += 1
237+
238+
self.init_called = 0
239+
async def on_init():
240+
self.init_called += 1
241+
242+
self.stop_called = 0
243+
async def on_stop():
244+
self.stop_called += 1
245+
246+
task = asynctask.AsyncTaskAsync(main_func, 5, on_init, on_stop)
247+
task.start()
248+
await asyncio.sleep(1)
249+
assert task.running()
250+
task.force_execution()
251+
task.force_execution()
252+
await task.stop(True)
253+
254+
assert self.main_called == 3
255+
assert self.init_called == 1
256+
assert self.stop_called == 1
257+
assert not task.running()

0 commit comments

Comments
 (0)