Skip to content

Commit 6b7544e

Browse files
committed
Forced async tasks to wait for completion
1 parent 41879f1 commit 6b7544e

File tree

6 files changed

+54
-25
lines changed

6 files changed

+54
-25
lines changed

splitio/tasks/events_sync.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,4 +73,4 @@ def __init__(self, synchronize_events, period):
7373

7474
async def stop(self, event=None):
7575
"""Stop executing the events synchronization task."""
76-
await self._task.stop()
76+
await self._task.stop(True)

splitio/tasks/impressions_sync.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ def __init__(self, synchronize_impressions, period):
7575

7676
async def stop(self, event=None):
7777
"""Stop executing the impressions synchronization task."""
78-
await self._task.stop()
78+
await self._task.stop(True)
7979

8080

8181
class ImpressionsCountSyncTaskBase(BaseSynchronizationTask):
@@ -136,6 +136,6 @@ def __init__(self, synchronize_counters):
136136
"""
137137
self._task = AsyncTaskAsync(synchronize_counters, self._PERIOD, on_stop=synchronize_counters)
138138

139-
async def stop(self, event=None):
139+
async def stop(self):
140140
"""Stop executing the impressions synchronization task."""
141-
await self._task.stop()
141+
await self._task.stop(True)

splitio/tasks/segment_sync.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,6 @@ def __init__(self, synchronize_segments, period):
6060
"""
6161
self._task = asynctask.AsyncTaskAsync(synchronize_segments, period, on_init=None)
6262

63-
async def stop(self, event=None):
63+
async def stop(self):
6464
"""Stop segment synchronization."""
65-
await self._task.stop(event)
65+
await self._task.stop(True)

splitio/tasks/split_sync.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,4 +66,4 @@ def __init__(self, synchronize_splits, period):
6666

6767
async def stop(self, event=None):
6868
"""Stop the task. Accept an optional event to set when the task has finished."""
69-
await self._task.stop()
69+
await self._task.stop(True)

splitio/tasks/telemetry_sync.py

Lines changed: 43 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,36 @@
22
import logging
33

44
from splitio.tasks import BaseSynchronizationTask
5-
from splitio.tasks.util.asynctask import AsyncTask
5+
from splitio.tasks.util.asynctask import AsyncTask, AsyncTaskAsync
66

77
_LOGGER = logging.getLogger(__name__)
88

9+
class TelemetrySyncTaskBase(BaseSynchronizationTask):
10+
"""Unique Keys synchronization task uses an asynctask.AsyncTask to send MTKs."""
11+
12+
def start(self):
13+
"""Start executing the telemetry synchronization task."""
14+
self._task.start()
15+
16+
def stop(self, event=None):
17+
"""Stop executing the unique telemetry synchronization task."""
18+
pass
19+
20+
def is_running(self):
21+
"""
22+
Return whether the task is running or not.
23+
24+
:return: True if the task is running. False otherwise.
25+
:rtype: bool
26+
"""
27+
return self._task.running()
28+
29+
def flush(self):
30+
"""Flush unique keys."""
31+
_LOGGER.debug('Forcing flush execution for telemetry')
32+
self._task.force_execution()
33+
34+
935
class TelemetrySyncTask(BaseSynchronizationTask):
1036
"""Unique Keys synchronization task uses an asynctask.AsyncTask to send MTKs."""
1137

@@ -22,24 +48,27 @@ def __init__(self, synchronize_telemetry, period):
2248
self._task = AsyncTask(synchronize_telemetry, period,
2349
on_stop=synchronize_telemetry)
2450

25-
def start(self):
26-
"""Start executing the telemetry synchronization task."""
27-
self._task.start()
28-
2951
def stop(self, event=None):
3052
"""Stop executing the unique telemetry synchronization task."""
3153
self._task.stop(event)
3254

33-
def is_running(self):
55+
56+
class TelemetrySyncTaskAsync(BaseSynchronizationTask):
57+
"""Unique Keys synchronization task uses an asynctask.AsyncTask to send MTKs."""
58+
59+
def __init__(self, synchronize_telemetry, period):
3460
"""
35-
Return whether the task is running or not.
61+
Class constructor.
3662
37-
:return: True if the task is running. False otherwise.
38-
:rtype: bool
63+
:param synchronize_telemetry: sender
64+
:type synchronize_telemetry: func
65+
:param period: How many seconds to wait between subsequent unique keys pushes to the BE.
66+
:type period: int
3967
"""
40-
return self._task.running()
4168

42-
def flush(self):
43-
"""Flush unique keys."""
44-
_LOGGER.debug('Forcing flush execution for telemetry')
45-
self._task.force_execution()
69+
self._task = AsyncTaskAsync(synchronize_telemetry, period,
70+
on_stop=synchronize_telemetry)
71+
72+
async def stop(self):
73+
"""Stop executing the unique telemetry synchronization task."""
74+
await self._task.stop(True)

splitio/tasks/unique_keys_sync.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -71,9 +71,9 @@ def __init__(self, synchronize_unique_keys, period = _UNIQUE_KEYS_SYNC_PERIOD):
7171
self._task = AsyncTaskAsync(synchronize_unique_keys, period,
7272
on_stop=synchronize_unique_keys)
7373

74-
async def stop(self, event=None):
74+
async def stop(self):
7575
"""Stop executing the unique keys synchronization task."""
76-
await self._task.stop(event)
76+
await self._task.stop(True)
7777

7878

7979
class ClearFilterSyncTaskBase(BaseSynchronizationTask):
@@ -123,6 +123,6 @@ def __init__(self, clear_filter, period = _CLEAR_FILTER_SYNC_PERIOD):
123123
self._task = AsyncTaskAsync(clear_filter, period,
124124
on_stop=clear_filter)
125125

126-
async def stop(self, event=None):
126+
async def stop(self):
127127
"""Stop executing the unique keys synchronization task."""
128-
await self._task.stop(event)
128+
await self._task.stop(True)

0 commit comments

Comments
 (0)