Skip to content

Commit c8fcadd

Browse files
committed
added telemetry sync task async class and tests
1 parent 6b7544e commit c8fcadd

File tree

3 files changed

+70
-7
lines changed

3 files changed

+70
-7
lines changed

splitio/tasks/telemetry_sync.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
_LOGGER = logging.getLogger(__name__)
88

99
class TelemetrySyncTaskBase(BaseSynchronizationTask):
10-
"""Unique Keys synchronization task uses an asynctask.AsyncTask to send MTKs."""
10+
"""Telemetry synchronization task uses an asynctask.AsyncTask to send MTKs."""
1111

1212
def start(self):
1313
"""Start executing the telemetry synchronization task."""
@@ -32,8 +32,8 @@ def flush(self):
3232
self._task.force_execution()
3333

3434

35-
class TelemetrySyncTask(BaseSynchronizationTask):
36-
"""Unique Keys synchronization task uses an asynctask.AsyncTask to send MTKs."""
35+
class TelemetrySyncTask(TelemetrySyncTaskBase):
36+
"""Unique Telemetry task uses an asynctask.AsyncTask to send MTKs."""
3737

3838
def __init__(self, synchronize_telemetry, period):
3939
"""
@@ -53,8 +53,8 @@ def stop(self, event=None):
5353
self._task.stop(event)
5454

5555

56-
class TelemetrySyncTaskAsync(BaseSynchronizationTask):
57-
"""Unique Keys synchronization task uses an asynctask.AsyncTask to send MTKs."""
56+
class TelemetrySyncTaskAsync(TelemetrySyncTaskBase):
57+
"""Telemetry synchronization task uses an asynctask.AsyncTask to send MTKs."""
5858

5959
def __init__(self, synchronize_telemetry, period):
6060
"""

splitio/tasks/util/asynctask.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,7 @@ def __init__(self, main, period, on_init=None, on_stop=None):
218218
self._period = period
219219
self._messages = asyncio.Queue()
220220
self._running = False
221-
self._completion_event = None
221+
self._completion_event = asyncio.Event()
222222

223223
async def _execution_wrapper(self):
224224
"""
@@ -284,7 +284,7 @@ def start(self):
284284
_LOGGER.warning("Task is already running. Ignoring .start() call")
285285
return
286286
# Start execution
287-
self._completion_event = asyncio.Event()
287+
self._completion_event.clear()
288288
asyncio.get_running_loop().create_task(self._execution_wrapper())
289289

290290
async def stop(self, wait_for_completion=False):

tests/tasks/test_telemetry_sync.py

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
"""Impressions synchronization task test module."""
2+
import pytest
3+
import threading
4+
import time
5+
from splitio.api.client import HttpResponse
6+
from splitio.tasks.telemetry_sync import TelemetrySyncTask, TelemetrySyncTaskAsync
7+
from splitio.api.telemetry import TelemetryAPI, TelemetryAPIAsync
8+
from splitio.sync.telemetry import TelemetrySynchronizer, TelemetrySynchronizerAsync, InMemoryTelemetrySubmitter, InMemoryTelemetrySubmitterAsync
9+
from splitio.storage.inmemmory import InMemoryTelemetryStorage, InMemoryTelemetryStorageAsync
10+
from splitio.engine.telemetry import TelemetryStorageConsumer, TelemetryStorageConsumerAsync
11+
from splitio.optional.loaders import asyncio
12+
13+
14+
class TelemetrySyncTaskTests(object):
15+
"""Unique Keys Syncrhonization task test cases."""
16+
17+
def test_record_stats(self, mocker):
18+
"""Test that the task works properly under normal circumstances."""
19+
api = mocker.Mock(spec=TelemetryAPI)
20+
api.record_stats.return_value = HttpResponse(200, '', {})
21+
telemetry_storage = InMemoryTelemetryStorage()
22+
telemetry_consumer = TelemetryStorageConsumer(telemetry_storage)
23+
24+
telemetry_synchronizer = TelemetrySynchronizer(InMemoryTelemetrySubmitter(telemetry_consumer, mocker.Mock(), mocker.Mock(),api))
25+
task = TelemetrySyncTask(telemetry_synchronizer.synchronize_stats, 1)
26+
task.start()
27+
time.sleep(2)
28+
assert task.is_running()
29+
assert len(api.record_stats.mock_calls) == 1
30+
stop_event = threading.Event()
31+
task.stop(stop_event)
32+
stop_event.wait(5)
33+
assert stop_event.is_set()
34+
35+
36+
class TelemetrySyncTaskAsyncTests(object):
37+
"""Unique Keys Syncrhonization task test cases."""
38+
39+
@pytest.mark.asyncio
40+
async def test_record_stats(self, mocker):
41+
"""Test that the task works properly under normal circumstances."""
42+
api = mocker.Mock(spec=TelemetryAPIAsync)
43+
self.called = False
44+
async def record_stats(stats):
45+
self.called = True
46+
return HttpResponse(200, '', {})
47+
api.record_stats = record_stats
48+
49+
telemetry_storage = await InMemoryTelemetryStorageAsync.create()
50+
telemetry_consumer = TelemetryStorageConsumerAsync(telemetry_storage)
51+
telemetry_submitter = InMemoryTelemetrySubmitterAsync(telemetry_consumer, mocker.Mock(), mocker.Mock(),api)
52+
async def _build_stats():
53+
return {}
54+
telemetry_submitter._build_stats = _build_stats
55+
56+
telemetry_synchronizer = TelemetrySynchronizerAsync(telemetry_submitter)
57+
task = TelemetrySyncTaskAsync(telemetry_synchronizer.synchronize_stats, 1)
58+
task.start()
59+
await asyncio.sleep(2)
60+
assert task.is_running()
61+
assert self.called
62+
await task.stop()
63+
assert not task.is_running()

0 commit comments

Comments
 (0)