Skip to content

Commit ccdc9b6

Browse files
authored
Merge pull request #449 from splitio/asyncio-sync-pr
several code polishing
2 parents 7b9ad8a + c8fcadd commit ccdc9b6

File tree

14 files changed

+137
-57
lines changed

14 files changed

+137
-57
lines changed

splitio/optional/loaders.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import asyncio
44
import aiohttp
55
import aiofiles
6+
from aiohttp import ClientConnectionError
67
except ImportError:
78
def missing_asyncio_dependencies(*_, **__):
89
"""Fail if missing dependencies are used."""

splitio/push/manager.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -331,7 +331,7 @@ async def update_workers_status(self, enabled):
331331
"""
332332
await self._processor.update_workers_status(enabled)
333333

334-
async def start(self):
334+
def start(self):
335335
"""Start a new connection if not already running."""
336336
if self._running:
337337
_LOGGER.warning('Push manager already has a connection running. Ignoring')

splitio/sync/manager.py

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -142,13 +142,10 @@ class ManagerAsync(object): # pylint:disable=too-many-instance-attributes
142142

143143
_CENTINEL_EVENT = object()
144144

145-
def __init__(self, ready_flag, synchronizer, auth_api, streaming_enabled, sdk_metadata, telemetry_runtime_producer, sse_url=None, client_key=None): # pylint:disable=too-many-arguments
145+
def __init__(self, synchronizer, auth_api, streaming_enabled, sdk_metadata, telemetry_runtime_producer, sse_url=None, client_key=None): # pylint:disable=too-many-arguments
146146
"""
147147
Construct Manager.
148148
149-
:param ready_flag: Flag to set when splits initial sync is complete.
150-
:type ready_flag: threading.Event
151-
152149
:param split_synchronizers: synchronizers for performing start/stop logic
153150
:type split_synchronizers: splitio.sync.synchronizer.Synchronizer
154151
@@ -168,7 +165,6 @@ def __init__(self, ready_flag, synchronizer, auth_api, streaming_enabled, sdk_me
168165
:type client_key: str
169166
"""
170167
self._streaming_enabled = streaming_enabled
171-
self._ready_flag = ready_flag
172168
self._synchronizer = synchronizer
173169
self._telemetry_runtime_producer = telemetry_runtime_producer
174170
if self._streaming_enabled:
@@ -178,15 +174,10 @@ def __init__(self, ready_flag, synchronizer, auth_api, streaming_enabled, sdk_me
178174
self._push = PushManagerAsync(auth_api, synchronizer, self._queue, sdk_metadata, telemetry_runtime_producer, sse_url, client_key)
179175
self._push_status_handler_task = None
180176

181-
def recreate(self):
182-
"""Recreate poolers for forked processes."""
183-
self._synchronizer._split_synchronizers._segment_sync.recreate()
184-
185177
async def start(self, max_retry_attempts=_SYNC_ALL_NO_RETRIES):
186178
"""Start the SDK synchronization tasks."""
187179
try:
188180
await self._synchronizer.sync_all(max_retry_attempts)
189-
self._ready_flag.set()
190181
self._synchronizer.start_periodic_data_recording()
191182
if self._streaming_enabled:
192183
self._push_status_handler_task = asyncio.get_running_loop().create_task(self._streaming_feedback_handler())

splitio/sync/synchronizer.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -606,11 +606,10 @@ async def stop_periodic_data_recording(self, blocking):
606606
:type blocking: bool
607607
"""
608608
_LOGGER.debug('Stopping periodic data recording')
609+
stop_periodic_data_recording_task = asyncio.get_running_loop().create_task(self._stop_periodic_data_recording())
609610
if blocking:
610-
await self._stop_periodic_data_recording()
611+
await stop_periodic_data_recording_task
611612
_LOGGER.debug('all tasks finished successfully.')
612-
else:
613-
self.stop_periodic_data_recording_task = asyncio.get_running_loop().create_task(self._stop_periodic_data_recording)
614613

615614
async def _stop_periodic_data_recording(self):
616615
"""

splitio/tasks/events_sync.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,4 +73,4 @@ def __init__(self, synchronize_events, period):
7373

7474
async def stop(self, event=None):
7575
"""Stop executing the events synchronization task."""
76-
await self._task.stop()
76+
await self._task.stop(True)

splitio/tasks/impressions_sync.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ def __init__(self, synchronize_impressions, period):
7575

7676
async def stop(self, event=None):
7777
"""Stop executing the impressions synchronization task."""
78-
await self._task.stop()
78+
await self._task.stop(True)
7979

8080

8181
class ImpressionsCountSyncTaskBase(BaseSynchronizationTask):
@@ -136,6 +136,6 @@ def __init__(self, synchronize_counters):
136136
"""
137137
self._task = AsyncTaskAsync(synchronize_counters, self._PERIOD, on_stop=synchronize_counters)
138138

139-
async def stop(self, event=None):
139+
async def stop(self):
140140
"""Stop executing the impressions synchronization task."""
141-
await self._task.stop()
141+
await self._task.stop(True)

splitio/tasks/segment_sync.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,6 @@ def __init__(self, synchronize_segments, period):
6060
"""
6161
self._task = asynctask.AsyncTaskAsync(synchronize_segments, period, on_init=None)
6262

63-
async def stop(self, event=None):
63+
async def stop(self):
6464
"""Stop segment synchronization."""
65-
await self._task.stop(event)
65+
await self._task.stop(True)

splitio/tasks/split_sync.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,4 +66,4 @@ def __init__(self, synchronize_splits, period):
6666

6767
async def stop(self, event=None):
6868
"""Stop the task. Accept an optional event to set when the task has finished."""
69-
await self._task.stop()
69+
await self._task.stop(True)

splitio/tasks/telemetry_sync.py

Lines changed: 45 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,38 @@
22
import logging
33

44
from splitio.tasks import BaseSynchronizationTask
5-
from splitio.tasks.util.asynctask import AsyncTask
5+
from splitio.tasks.util.asynctask import AsyncTask, AsyncTaskAsync
66

77
_LOGGER = logging.getLogger(__name__)
88

9-
class TelemetrySyncTask(BaseSynchronizationTask):
10-
"""Unique Keys synchronization task uses an asynctask.AsyncTask to send MTKs."""
9+
class TelemetrySyncTaskBase(BaseSynchronizationTask):
10+
"""Telemetry synchronization task uses an asynctask.AsyncTask to send MTKs."""
11+
12+
def start(self):
13+
"""Start executing the telemetry synchronization task."""
14+
self._task.start()
15+
16+
def stop(self, event=None):
17+
"""Stop executing the unique telemetry synchronization task."""
18+
pass
19+
20+
def is_running(self):
21+
"""
22+
Return whether the task is running or not.
23+
24+
:return: True if the task is running. False otherwise.
25+
:rtype: bool
26+
"""
27+
return self._task.running()
28+
29+
def flush(self):
30+
"""Flush unique keys."""
31+
_LOGGER.debug('Forcing flush execution for telemetry')
32+
self._task.force_execution()
33+
34+
35+
class TelemetrySyncTask(TelemetrySyncTaskBase):
36+
"""Unique Telemetry task uses an asynctask.AsyncTask to send MTKs."""
1137

1238
def __init__(self, synchronize_telemetry, period):
1339
"""
@@ -22,24 +48,27 @@ def __init__(self, synchronize_telemetry, period):
2248
self._task = AsyncTask(synchronize_telemetry, period,
2349
on_stop=synchronize_telemetry)
2450

25-
def start(self):
26-
"""Start executing the telemetry synchronization task."""
27-
self._task.start()
28-
2951
def stop(self, event=None):
3052
"""Stop executing the unique telemetry synchronization task."""
3153
self._task.stop(event)
3254

33-
def is_running(self):
55+
56+
class TelemetrySyncTaskAsync(TelemetrySyncTaskBase):
57+
"""Telemetry synchronization task uses an asynctask.AsyncTask to send MTKs."""
58+
59+
def __init__(self, synchronize_telemetry, period):
3460
"""
35-
Return whether the task is running or not.
61+
Class constructor.
3662
37-
:return: True if the task is running. False otherwise.
38-
:rtype: bool
63+
:param synchronize_telemetry: sender
64+
:type synchronize_telemetry: func
65+
:param period: How many seconds to wait between subsequent unique keys pushes to the BE.
66+
:type period: int
3967
"""
40-
return self._task.running()
4168

42-
def flush(self):
43-
"""Flush unique keys."""
44-
_LOGGER.debug('Forcing flush execution for telemetry')
45-
self._task.force_execution()
69+
self._task = AsyncTaskAsync(synchronize_telemetry, period,
70+
on_stop=synchronize_telemetry)
71+
72+
async def stop(self):
73+
"""Stop executing the unique telemetry synchronization task."""
74+
await self._task.stop(True)

splitio/tasks/unique_keys_sync.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -71,9 +71,9 @@ def __init__(self, synchronize_unique_keys, period = _UNIQUE_KEYS_SYNC_PERIOD):
7171
self._task = AsyncTaskAsync(synchronize_unique_keys, period,
7272
on_stop=synchronize_unique_keys)
7373

74-
async def stop(self, event=None):
74+
async def stop(self):
7575
"""Stop executing the unique keys synchronization task."""
76-
await self._task.stop(event)
76+
await self._task.stop(True)
7777

7878

7979
class ClearFilterSyncTaskBase(BaseSynchronizationTask):
@@ -123,6 +123,6 @@ def __init__(self, clear_filter, period = _CLEAR_FILTER_SYNC_PERIOD):
123123
self._task = AsyncTaskAsync(clear_filter, period,
124124
on_stop=clear_filter)
125125

126-
async def stop(self, event=None):
126+
async def stop(self):
127127
"""Stop executing the unique keys synchronization task."""
128-
await self._task.stop(event)
128+
await self._task.stop(True)

0 commit comments

Comments
 (0)