Skip to content

Commit c81f5ed

Browse files
authored
Merge pull request #429 from splitio/async-takss-impressions
added tasks.sync imps async classes
2 parents 9a2a477 + d43296e commit c81f5ed

File tree

2 files changed

+171
-22
lines changed

2 files changed

+171
-22
lines changed

splitio/tasks/impressions_sync.py

Lines changed: 75 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,39 @@
22
import logging
33

44
from splitio.tasks import BaseSynchronizationTask
5-
from splitio.tasks.util.asynctask import AsyncTask
5+
from splitio.tasks.util.asynctask import AsyncTask, AsyncTaskAsync
66

77

88
_LOGGER = logging.getLogger(__name__)
99

1010

11-
class ImpressionsSyncTask(BaseSynchronizationTask):
11+
class ImpressionsSyncTaskBase(BaseSynchronizationTask):
12+
"""Impressions synchronization task uses an asynctask.AsyncTask to send impressions."""
13+
14+
def start(self):
15+
"""Start executing the impressions synchronization task."""
16+
self._task.start()
17+
18+
def stop(self, event=None):
19+
"""Stop executing the impressions synchronization task."""
20+
pass
21+
22+
def is_running(self):
23+
"""
24+
Return whether the task is running or not.
25+
26+
:return: True if the task is running. False otherwise.
27+
:rtype: bool
28+
"""
29+
return self._task.running()
30+
31+
def flush(self):
32+
"""Flush impressions in storage."""
33+
_LOGGER.debug('Forcing flush execution for impressions')
34+
self._task.force_execution()
35+
36+
37+
class ImpressionsSyncTask(ImpressionsSyncTaskBase):
1238
"""Impressions synchronization task uses an asynctask.AsyncTask to send impressions."""
1339

1440
def __init__(self, synchronize_impressions, period):
@@ -25,13 +51,45 @@ def __init__(self, synchronize_impressions, period):
2551
self._task = AsyncTask(synchronize_impressions, self._period,
2652
on_stop=synchronize_impressions)
2753

54+
def stop(self, event=None):
55+
"""Stop executing the impressions synchronization task."""
56+
self._task.stop(event)
57+
58+
59+
class ImpressionsSyncTaskAsync(ImpressionsSyncTaskBase):
60+
"""Impressions synchronization task uses an asynctask.AsyncTask to send impressions."""
61+
62+
def __init__(self, synchronize_impressions, period):
63+
"""
64+
Class constructor.
65+
66+
:param synchronize_impressions: sender
67+
:type synchronize_impressions: func
68+
:param period: How many seconds to wait between subsequent impressions pushes to the BE.
69+
:type period: int
70+
71+
"""
72+
self._period = period
73+
self._task = AsyncTaskAsync(synchronize_impressions, self._period,
74+
on_stop=synchronize_impressions)
75+
76+
async def stop(self, event=None):
77+
"""Stop executing the impressions synchronization task."""
78+
await self._task.stop()
79+
80+
81+
class ImpressionsCountSyncTaskBase(BaseSynchronizationTask):
82+
"""Impressions synchronization task uses an asynctask.AsyncTask to send impressions."""
83+
84+
_PERIOD = 1800 # 30 * 60 # 30 minutes
85+
2886
def start(self):
2987
"""Start executing the impressions synchronization task."""
3088
self._task.start()
3189

3290
def stop(self, event=None):
3391
"""Stop executing the impressions synchronization task."""
34-
self._task.stop(event)
92+
pass
3593

3694
def is_running(self):
3795
"""
@@ -44,15 +102,12 @@ def is_running(self):
44102

45103
def flush(self):
46104
"""Flush impressions in storage."""
47-
_LOGGER.debug('Forcing flush execution for impressions')
48105
self._task.force_execution()
49106

50107

51-
class ImpressionsCountSyncTask(BaseSynchronizationTask):
108+
class ImpressionsCountSyncTask(ImpressionsCountSyncTaskBase):
52109
"""Impressions synchronization task uses an asynctask.AsyncTask to send impressions."""
53110

54-
_PERIOD = 1800 # 30 * 60 # 30 minutes
55-
56111
def __init__(self, synchronize_counters):
57112
"""
58113
Class constructor.
@@ -63,23 +118,24 @@ def __init__(self, synchronize_counters):
63118
"""
64119
self._task = AsyncTask(synchronize_counters, self._PERIOD, on_stop=synchronize_counters)
65120

66-
def start(self):
67-
"""Start executing the impressions synchronization task."""
68-
self._task.start()
69-
70121
def stop(self, event=None):
71122
"""Stop executing the impressions synchronization task."""
72123
self._task.stop(event)
73124

74-
def is_running(self):
125+
126+
class ImpressionsCountSyncTaskAsync(ImpressionsCountSyncTaskBase):
127+
"""Impressions synchronization task uses an asynctask.AsyncTask to send impressions."""
128+
129+
def __init__(self, synchronize_counters):
75130
"""
76-
Return whether the task is running or not.
131+
Class constructor.
132+
133+
:param synchronize_counters: Handler
134+
:type synchronize_counters: func
77135
78-
:return: True if the task is running. False otherwise.
79-
:rtype: bool
80136
"""
81-
return self._task.running()
137+
self._task = AsyncTaskAsync(synchronize_counters, self._PERIOD, on_stop=synchronize_counters)
82138

83-
def flush(self):
84-
"""Flush impressions in storage."""
85-
self._task.force_execution()
139+
async def stop(self, event=None):
140+
"""Stop executing the impressions synchronization task."""
141+
await self._task.stop()

tests/tasks/test_impressions_sync.py

Lines changed: 96 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,18 @@
22

33
import threading
44
import time
5+
import pytest
6+
57
from splitio.api.client import HttpResponse
68
from splitio.tasks import impressions_sync
79
from splitio.storage import ImpressionStorage
810
from splitio.models.impressions import Impression
911
from splitio.api.impressions import ImpressionsAPI
10-
from splitio.sync.impression import ImpressionSynchronizer, ImpressionsCountSynchronizer
12+
from splitio.sync.impression import ImpressionSynchronizer, ImpressionsCountSynchronizer, ImpressionSynchronizerAsync, ImpressionsCountSynchronizerAsync
1113
from splitio.engine.impressions.manager import Counter
14+
from splitio.optional.loaders import asyncio
1215

13-
class ImpressionsSyncTests(object):
16+
class ImpressionsSyncTaskTests(object):
1417
"""Impressions Syncrhonization task test cases."""
1518

1619
def test_normal_operation(self, mocker):
@@ -44,7 +47,52 @@ def test_normal_operation(self, mocker):
4447
assert len(api.flush_impressions.mock_calls) > calls_now
4548

4649

47-
class ImpressionsCountSyncTests(object):
50+
class ImpressionsSyncTaskAsyncTests(object):
51+
"""Impressions Syncrhonization task test cases."""
52+
53+
@pytest.mark.asyncio
54+
async def test_normal_operation(self, mocker):
55+
"""Test that the task works properly under normal circumstances."""
56+
storage = mocker.Mock(spec=ImpressionStorage)
57+
impressions = [
58+
Impression('key1', 'split1', 'on', 'l1', 123456, 'b1', 321654),
59+
Impression('key2', 'split1', 'on', 'l1', 123456, 'b1', 321654),
60+
Impression('key3', 'split2', 'off', 'l1', 123456, 'b1', 321654),
61+
Impression('key4', 'split2', 'on', 'l1', 123456, 'b1', 321654),
62+
Impression('key5', 'split3', 'off', 'l1', 123456, 'b1', 321654)
63+
]
64+
self.pop_called = 0
65+
async def pop_many(*args):
66+
self.pop_called += 1
67+
return impressions
68+
storage.pop_many = pop_many
69+
70+
api = mocker.Mock(spec=ImpressionsAPI)
71+
self.flushed = None
72+
self.called = 0
73+
async def flush_impressions(imps):
74+
self.called += 1
75+
self.flushed = imps
76+
return HttpResponse(200, '', {})
77+
api.flush_impressions = flush_impressions
78+
79+
impression_synchronizer = ImpressionSynchronizerAsync(api, storage, 5)
80+
task = impressions_sync.ImpressionsSyncTaskAsync(
81+
impression_synchronizer.synchronize_impressions,
82+
1
83+
)
84+
task.start()
85+
await asyncio.sleep(2)
86+
assert task.is_running()
87+
assert self.pop_called == 1
88+
assert self.flushed == impressions
89+
90+
calls_now = self.called
91+
await task.stop()
92+
assert self.called > calls_now
93+
94+
95+
class ImpressionsCountSyncTaskTests(object):
4896
"""Impressions Syncrhonization task test cases."""
4997

5098
def test_normal_operation(self, mocker):
@@ -77,3 +125,48 @@ def test_normal_operation(self, mocker):
77125
stop_event.wait(5)
78126
assert stop_event.is_set()
79127
assert len(api.flush_counters.mock_calls) > calls_now
128+
129+
130+
class ImpressionsCountSyncTaskAsyncTests(object):
131+
"""Impressions Syncrhonization task test cases."""
132+
133+
@pytest.mark.asyncio
134+
async def test_normal_operation(self, mocker):
135+
"""Test that the task works properly under normal circumstances."""
136+
counter = mocker.Mock(spec=Counter)
137+
counters = [
138+
Counter.CountPerFeature('f1', 123, 2),
139+
Counter.CountPerFeature('f2', 123, 123),
140+
Counter.CountPerFeature('f1', 456, 111),
141+
Counter.CountPerFeature('f2', 456, 222)
142+
]
143+
self._pop_called = 0
144+
async def pop_all():
145+
self._pop_called += 1
146+
return counters
147+
counter.pop_all = pop_all
148+
149+
api = mocker.Mock(spec=ImpressionsAPI)
150+
self.flushed = None
151+
self.called = 0
152+
async def flush_counters(imps):
153+
self.called += 1
154+
self.flushed = imps
155+
return HttpResponse(200, '', {})
156+
api.flush_counters = flush_counters
157+
158+
impressions_sync.ImpressionsCountSyncTaskAsync._PERIOD = 1
159+
impression_synchronizer = ImpressionsCountSynchronizerAsync(api, counter)
160+
task = impressions_sync.ImpressionsCountSyncTaskAsync(
161+
impression_synchronizer.synchronize_counters
162+
)
163+
task.start()
164+
await asyncio.sleep(2)
165+
assert task.is_running()
166+
167+
assert self._pop_called == 1
168+
assert self.flushed == counters
169+
170+
calls_now = self.called
171+
await task.stop()
172+
assert self.called > calls_now

0 commit comments

Comments
 (0)