Skip to content

Commit 00a4bdf

Browse files
authored
Merge pull request #428 from splitio/async-tasks-events
added tasks.event_sync async class
2 parents c81f5ed + 79836d4 commit 00a4bdf

File tree

2 files changed

+92
-16
lines changed

2 files changed

+92
-16
lines changed

splitio/tasks/events_sync.py

Lines changed: 44 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,39 @@
22
import logging
33

44
from splitio.tasks import BaseSynchronizationTask
5-
from splitio.tasks.util.asynctask import AsyncTask
5+
from splitio.tasks.util.asynctask import AsyncTask, AsyncTaskAsync
66

77

88
_LOGGER = logging.getLogger(__name__)
99

1010

11-
class EventsSyncTask(BaseSynchronizationTask):
11+
class EventsSyncTaskBase(BaseSynchronizationTask):
12+
"""Events synchronization task base uses an asynctask.AsyncTask to send events."""
13+
14+
def start(self):
15+
"""Start executing the events synchronization task."""
16+
self._task.start()
17+
18+
def stop(self, event=None):
19+
"""Stop executing the events synchronization task."""
20+
pass
21+
22+
def flush(self):
23+
"""Flush events in storage."""
24+
_LOGGER.debug('Forcing flush execution for events')
25+
self._task.force_execution()
26+
27+
def is_running(self):
28+
"""
29+
Return whether the task is running or not.
30+
31+
:return: True if the task is running. False otherwise.
32+
:rtype: bool
33+
"""
34+
return self._task.running()
35+
36+
37+
class EventsSyncTask(EventsSyncTaskBase):
1238
"""Events synchronization task uses an asynctask.AsyncTask to send events."""
1339

1440
def __init__(self, synchronize_events, period):
@@ -24,24 +50,27 @@ def __init__(self, synchronize_events, period):
2450
self._period = period
2551
self._task = AsyncTask(synchronize_events, self._period, on_stop=synchronize_events)
2652

27-
def start(self):
28-
"""Start executing the events synchronization task."""
29-
self._task.start()
30-
3153
def stop(self, event=None):
3254
"""Stop executing the events synchronization task."""
3355
self._task.stop(event)
3456

35-
def flush(self):
36-
"""Flush events in storage."""
37-
_LOGGER.debug('Forcing flush execution for events')
38-
self._task.force_execution()
3957

40-
def is_running(self):
58+
class EventsSyncTaskAsync(EventsSyncTaskBase):
59+
"""Events synchronization task uses an asynctask.AsyncTaskAsync to send events."""
60+
61+
def __init__(self, synchronize_events, period):
4162
"""
42-
Return whether the task is running or not.
63+
Class constructor.
64+
65+
:param synchronize_events: Events Api object to send data to the backend
66+
:type synchronize_events: splitio.api.events.EventsAPIAsync
67+
:param period: How many seconds to wait between subsequent event pushes to the BE.
68+
:type period: int
4369
44-
:return: True if the task is running. False otherwise.
45-
:rtype: bool
4670
"""
47-
return self._task.running()
71+
self._period = period
72+
self._task = AsyncTaskAsync(synchronize_events, self._period, on_stop=synchronize_events)
73+
74+
async def stop(self, event=None):
75+
"""Stop executing the events synchronization task."""
76+
await self._task.stop()

tests/tasks/test_events_sync.py

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,15 @@
22

33
import threading
44
import time
5+
import pytest
6+
57
from splitio.api.client import HttpResponse
68
from splitio.tasks import events_sync
79
from splitio.storage import EventStorage
810
from splitio.models.events import Event
911
from splitio.api.events import EventsAPI
10-
from splitio.sync.event import EventSynchronizer
12+
from splitio.sync.event import EventSynchronizer, EventSynchronizerAsync
13+
from splitio.optional.loaders import asyncio
1114

1215

1316
class EventsSyncTests(object):
@@ -40,3 +43,47 @@ def test_normal_operation(self, mocker):
4043
stop_event.wait(5)
4144
assert stop_event.is_set()
4245
assert len(api.flush_events.mock_calls) > calls_now
46+
47+
48+
class EventsSyncAsyncTests(object):
49+
"""Impressions Syncrhonization task async test cases."""
50+
51+
@pytest.mark.asyncio
52+
async def test_normal_operation(self, mocker):
53+
"""Test that the task works properly under normal circumstances."""
54+
self.events = [
55+
Event('key1', 'user', 'purchase', 5.3, 123456, None),
56+
Event('key2', 'user', 'purchase', 5.3, 123456, None),
57+
Event('key3', 'user', 'purchase', 5.3, 123456, None),
58+
Event('key4', 'user', 'purchase', 5.3, 123456, None),
59+
Event('key5', 'user', 'purchase', 5.3, 123456, None),
60+
]
61+
storage = mocker.Mock(spec=EventStorage)
62+
self.called = False
63+
async def pop_many(*args):
64+
self.called = True
65+
return self.events
66+
storage.pop_many = pop_many
67+
68+
api = mocker.Mock(spec=EventsAPI)
69+
self.flushed_events = None
70+
self.count = 0
71+
async def flush_events(events):
72+
self.count += 1
73+
self.flushed_events = events
74+
return HttpResponse(200, '', {})
75+
api.flush_events = flush_events
76+
77+
event_synchronizer = EventSynchronizerAsync(api, storage, 5)
78+
task = events_sync.EventsSyncTaskAsync(event_synchronizer.synchronize_events, 1)
79+
task.start()
80+
await asyncio.sleep(2)
81+
82+
assert task.is_running()
83+
assert self.called
84+
assert self.flushed_events == self.events
85+
86+
calls_now = self.count
87+
await task.stop()
88+
assert not task.is_running()
89+
assert self.count > calls_now

0 commit comments

Comments
 (0)