Skip to content

Commit 9a2a477

Browse files
authored
Merge pull request #430 from splitio/sync-tasks-split
added tasks.sync split async class
2 parents 6913a71 + d79646a commit 9a2a477

File tree

2 files changed

+164
-47
lines changed

2 files changed

+164
-47
lines changed

splitio/tasks/split_sync.py

Lines changed: 40 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,36 @@
22

33
import logging
44
from splitio.tasks import BaseSynchronizationTask
5-
from splitio.tasks.util.asynctask import AsyncTask
5+
from splitio.tasks.util.asynctask import AsyncTask, AsyncTaskAsync
66

77

88
_LOGGER = logging.getLogger(__name__)
99

1010

11-
class SplitSynchronizationTask(BaseSynchronizationTask):
11+
class SplitSynchronizationTaskBase(BaseSynchronizationTask):
1212
"""Split Synchronization task class."""
13+
14+
def start(self):
15+
"""Start the task."""
16+
self._task.start()
17+
18+
def stop(self, event=None):
19+
"""Stop the task. Accept an optional event to set when the task has finished."""
20+
pass
21+
22+
def is_running(self):
23+
"""
24+
Return whether the task is running.
25+
26+
:return: True if the task is running. False otherwise.
27+
:rtype bool
28+
"""
29+
return self._task.running()
30+
31+
32+
class SplitSynchronizationTask(SplitSynchronizationTaskBase):
33+
"""Split Synchronization task class."""
34+
1335
def __init__(self, synchronize_splits, period):
1436
"""
1537
Class constructor.
@@ -22,19 +44,26 @@ def __init__(self, synchronize_splits, period):
2244
self._period = period
2345
self._task = AsyncTask(synchronize_splits, period, on_init=None)
2446

25-
def start(self):
26-
"""Start the task."""
27-
self._task.start()
28-
2947
def stop(self, event=None):
3048
"""Stop the task. Accept an optional event to set when the task has finished."""
3149
self._task.stop(event)
3250

33-
def is_running(self):
51+
52+
class SplitSynchronizationTaskAsync(SplitSynchronizationTaskBase):
53+
"""Split Synchronization async task class."""
54+
55+
def __init__(self, synchronize_splits, period):
3456
"""
35-
Return whether the task is running.
57+
Class constructor.
3658
37-
:return: True if the task is running. False otherwise.
38-
:rtype bool
59+
:param synchronize_splits: Handler
60+
:type synchronize_splits: func
61+
:param period: Period of task
62+
:type period: int
3963
"""
40-
return self._task.running()
64+
self._period = period
65+
self._task = AsyncTaskAsync(synchronize_splits, period, on_init=None)
66+
67+
async def stop(self, event=None):
68+
"""Stop the task. Accept an optional event to set when the task has finished."""
69+
await self._task.stop()

tests/tasks/test_split_sync.py

Lines changed: 124 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,50 @@
11
"""Split syncrhonization task test module."""
2-
32
import threading
43
import time
4+
import pytest
5+
56
from splitio.api import APIException
67
from splitio.api.commons import FetchOptions
78
from splitio.tasks import split_sync
89
from splitio.storage import SplitStorage
910
from splitio.models.splits import Split
10-
from splitio.sync.split import SplitSynchronizer
11+
from splitio.sync.split import SplitSynchronizer, SplitSynchronizerAsync
12+
from splitio.optional.loaders import asyncio
13+
14+
splits = [{
15+
'changeNumber': 123,
16+
'trafficTypeName': 'user',
17+
'name': 'some_name',
18+
'trafficAllocation': 100,
19+
'trafficAllocationSeed': 123456,
20+
'seed': 321654,
21+
'status': 'ACTIVE',
22+
'killed': False,
23+
'defaultTreatment': 'off',
24+
'algo': 2,
25+
'conditions': [
26+
{
27+
'partitions': [
28+
{'treatment': 'on', 'size': 50},
29+
{'treatment': 'off', 'size': 50}
30+
],
31+
'contitionType': 'WHITELIST',
32+
'label': 'some_label',
33+
'matcherGroup': {
34+
'matchers': [
35+
{
36+
'matcherType': 'WHITELIST',
37+
'whitelistMatcherData': {
38+
'whitelist': ['k1', 'k2', 'k3']
39+
},
40+
'negate': False,
41+
}
42+
],
43+
'combiner': 'AND'
44+
}
45+
}
46+
]
47+
}]
1148

1249

1350
class SplitSynchronizationTests(object):
@@ -26,40 +63,6 @@ def change_number_mock():
2663
storage.get_change_number.side_effect = change_number_mock
2764

2865
api = mocker.Mock()
29-
splits = [{
30-
'changeNumber': 123,
31-
'trafficTypeName': 'user',
32-
'name': 'some_name',
33-
'trafficAllocation': 100,
34-
'trafficAllocationSeed': 123456,
35-
'seed': 321654,
36-
'status': 'ACTIVE',
37-
'killed': False,
38-
'defaultTreatment': 'off',
39-
'algo': 2,
40-
'conditions': [
41-
{
42-
'partitions': [
43-
{'treatment': 'on', 'size': 50},
44-
{'treatment': 'off', 'size': 50}
45-
],
46-
'contitionType': 'WHITELIST',
47-
'label': 'some_label',
48-
'matcherGroup': {
49-
'matchers': [
50-
{
51-
'matcherType': 'WHITELIST',
52-
'whitelistMatcherData': {
53-
'whitelist': ['k1', 'k2', 'k3']
54-
},
55-
'negate': False,
56-
}
57-
],
58-
'combiner': 'AND'
59-
}
60-
}
61-
]
62-
}]
6366

6467
def get_changes(*args, **kwargs):
6568
get_changes.called += 1
@@ -120,3 +123,88 @@ def run(x):
120123
time.sleep(1)
121124
assert task.is_running()
122125
task.stop()
126+
127+
128+
class SplitSynchronizationAsyncTests(object):
129+
"""Split synchronization task async test cases."""
130+
131+
@pytest.mark.asyncio
132+
async def test_normal_operation(self, mocker):
133+
"""Test the normal operation flow."""
134+
storage = mocker.Mock(spec=SplitStorage)
135+
136+
async def change_number_mock():
137+
change_number_mock._calls += 1
138+
if change_number_mock._calls == 1:
139+
return -1
140+
return 123
141+
change_number_mock._calls = 0
142+
storage.get_change_number = change_number_mock
143+
144+
api = mocker.Mock()
145+
self.change_number = []
146+
self.fetch_options = []
147+
async def get_changes(change_number, fetch_options):
148+
self.change_number.append(change_number)
149+
self.fetch_options.append(fetch_options)
150+
get_changes.called += 1
151+
if get_changes.called == 1:
152+
return {
153+
'splits': splits,
154+
'since': -1,
155+
'till': 123
156+
}
157+
else:
158+
return {
159+
'splits': [],
160+
'since': 123,
161+
'till': 123
162+
}
163+
api.fetch_splits = get_changes
164+
get_changes.called = 0
165+
self.inserted_split = None
166+
async def put(split):
167+
self.inserted_split = split
168+
storage.put = put
169+
170+
fetch_options = FetchOptions(True)
171+
split_synchronizer = SplitSynchronizerAsync(api, storage)
172+
task = split_sync.SplitSynchronizationTaskAsync(split_synchronizer.synchronize_splits, 0.5)
173+
task.start()
174+
await asyncio.sleep(0.7)
175+
assert task.is_running()
176+
await task.stop()
177+
assert not task.is_running()
178+
assert (self.change_number[0], self.fetch_options[0]) == (-1, fetch_options)
179+
assert (self.change_number[1], self.fetch_options[1]) == (123, fetch_options)
180+
assert isinstance(self.inserted_split, Split)
181+
assert self.inserted_split.name == 'some_name'
182+
183+
@pytest.mark.asyncio
184+
async def test_that_errors_dont_stop_task(self, mocker):
185+
"""Test that if fetching splits fails at some_point, the task will continue running."""
186+
storage = mocker.Mock(spec=SplitStorage)
187+
api = mocker.Mock()
188+
189+
async def run(x):
190+
run._calls += 1
191+
if run._calls == 1:
192+
return {'splits': [], 'since': -1, 'till': -1}
193+
if run._calls == 2:
194+
return {'splits': [], 'since': -1, 'till': -1}
195+
raise APIException("something broke")
196+
run._calls = 0
197+
api.fetch_splits = run
198+
199+
async def get_change_number():
200+
return -1
201+
storage.get_change_number = get_change_number
202+
203+
split_synchronizer = SplitSynchronizerAsync(api, storage)
204+
task = split_sync.SplitSynchronizationTaskAsync(split_synchronizer.synchronize_splits, 0.5)
205+
task.start()
206+
await asyncio.sleep(0.1)
207+
assert task.is_running()
208+
await asyncio.sleep(1)
209+
assert task.is_running()
210+
await task.stop()

0 commit comments

Comments
 (0)