Skip to content

Commit 6913a71

Browse files
authored
Merge pull request #431 from splitio/async-tasks-segment
added tasks.sync segment async class
2 parents 3355508 + b25da23 commit 6913a71

File tree

2 files changed

+293
-13
lines changed

2 files changed

+293
-13
lines changed

splitio/tasks/segment_sync.py

Lines changed: 36 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,28 @@
88
_LOGGER = logging.getLogger(__name__)
99

1010

11-
class SegmentSynchronizationTask(BaseSynchronizationTask):
11+
class SegmentSynchronizationTaskBase(BaseSynchronizationTask):
12+
"""Segment Syncrhonization base class."""
13+
14+
def start(self):
15+
"""Start segment synchronization."""
16+
self._task.start()
17+
18+
def stop(self, event=None):
19+
"""Stop segment synchronization."""
20+
pass
21+
22+
def is_running(self):
23+
"""
24+
Return whether the task is running or not.
25+
26+
:return: True if the task is running. False otherwise.
27+
:rtype: bool
28+
"""
29+
return self._task.running()
30+
31+
32+
class SegmentSynchronizationTask(SegmentSynchronizationTaskBase):
1233
"""Segment Syncrhonization class."""
1334

1435
def __init__(self, synchronize_segments, period):
@@ -21,19 +42,24 @@ def __init__(self, synchronize_segments, period):
2142
"""
2243
self._task = asynctask.AsyncTask(synchronize_segments, period, on_init=None)
2344

24-
def start(self):
25-
"""Start segment synchronization."""
26-
self._task.start()
27-
2845
def stop(self, event=None):
2946
"""Stop segment synchronization."""
3047
self._task.stop(event)
3148

32-
def is_running(self):
49+
50+
class SegmentSynchronizationTaskAsync(SegmentSynchronizationTaskBase):
51+
"""Segment Syncrhonization async class."""
52+
53+
def __init__(self, synchronize_segments, period):
3354
"""
34-
Return whether the task is running or not.
55+
Clas constructor.
56+
57+
:param synchronize_segments: handler for syncing segments
58+
:type synchronize_segments: func
3559
36-
:return: True if the task is running. False otherwise.
37-
:rtype: bool
3860
"""
39-
return self._task.running()
61+
self._task = asynctask.AsyncTaskAsync(synchronize_segments, period, on_init=None)
62+
63+
async def stop(self, event=None):
64+
"""Stop segment synchronization."""
65+
await self._task.stop(event)

tests/tasks/test_segment_sync.py

Lines changed: 257 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,17 @@
22

33
import threading
44
import time
5+
import pytest
6+
57
from splitio.api.commons import FetchOptions
68
from splitio.tasks import segment_sync
79
from splitio.storage import SegmentStorage, SplitStorage
810
from splitio.models.splits import Split
911
from splitio.models.segments import Segment
1012
from splitio.models.grammar.condition import Condition
1113
from splitio.models.grammar.matchers import UserDefinedSegmentMatcher
12-
from splitio.sync.segment import SegmentSynchronizer
13-
14+
from splitio.sync.segment import SegmentSynchronizer, SegmentSynchronizerAsync
15+
from splitio.optional.loaders import asyncio
1416

1517
class SegmentSynchronizationTests(object):
1618
"""Split synchronization task test cases."""
@@ -95,4 +97,256 @@ def fetch_segment_mock(segment_name, change_number, fetch_options):
9597

9698
def test_that_errors_dont_stop_task(self, mocker):
9799
"""Test that if fetching segments fails at some_point, the task will continue running."""
98-
# TODO!
100+
split_storage = mocker.Mock(spec=SplitStorage)
101+
split_storage.get_segment_names.return_value = ['segmentA', 'segmentB', 'segmentC']
102+
103+
# Setup a mocked segment storage whose changenumber returns -1 on first fetch and
104+
# 123 afterwards.
105+
storage = mocker.Mock(spec=SegmentStorage)
106+
107+
def change_number_mock(segment_name):
108+
if segment_name == 'segmentA' and change_number_mock._count_a == 0:
109+
change_number_mock._count_a = 1
110+
return -1
111+
if segment_name == 'segmentB' and change_number_mock._count_b == 0:
112+
change_number_mock._count_b = 1
113+
return -1
114+
if segment_name == 'segmentC' and change_number_mock._count_c == 0:
115+
change_number_mock._count_c = 1
116+
return -1
117+
return 123
118+
change_number_mock._count_a = 0
119+
change_number_mock._count_b = 0
120+
change_number_mock._count_c = 0
121+
storage.get_change_number.side_effect = change_number_mock
122+
123+
# Setup a mocked segment api to return segments mentioned before.
124+
def fetch_segment_mock(segment_name, change_number, fetch_options):
125+
if segment_name == 'segmentA' and fetch_segment_mock._count_a == 0:
126+
fetch_segment_mock._count_a = 1
127+
return {'name': 'segmentA', 'added': ['key1', 'key2', 'key3'], 'removed': [],
128+
'since': -1, 'till': 123}
129+
if segment_name == 'segmentB' and fetch_segment_mock._count_b == 0:
130+
fetch_segment_mock._count_b = 1
131+
raise Exception("some exception")
132+
if segment_name == 'segmentC' and fetch_segment_mock._count_c == 0:
133+
fetch_segment_mock._count_c = 1
134+
return {'name': 'segmentC', 'added': ['key7', 'key8', 'key9'], 'removed': [],
135+
'since': -1, 'till': 123}
136+
return {'added': [], 'removed': [], 'since': 123, 'till': 123}
137+
fetch_segment_mock._count_a = 0
138+
fetch_segment_mock._count_b = 0
139+
fetch_segment_mock._count_c = 0
140+
141+
api = mocker.Mock()
142+
fetch_options = FetchOptions(True)
143+
api.fetch_segment.side_effect = fetch_segment_mock
144+
145+
segments_synchronizer = SegmentSynchronizer(api, split_storage, storage)
146+
task = segment_sync.SegmentSynchronizationTask(segments_synchronizer.synchronize_segments,
147+
0.5)
148+
task.start()
149+
time.sleep(0.7)
150+
151+
assert task.is_running()
152+
153+
stop_event = threading.Event()
154+
task.stop(stop_event)
155+
stop_event.wait()
156+
assert not task.is_running()
157+
158+
api_calls = [call for call in api.fetch_segment.mock_calls]
159+
assert mocker.call('segmentA', -1, fetch_options) in api_calls
160+
assert mocker.call('segmentB', -1, fetch_options) in api_calls
161+
assert mocker.call('segmentC', -1, fetch_options) in api_calls
162+
assert mocker.call('segmentA', 123, fetch_options) in api_calls
163+
assert mocker.call('segmentC', 123, fetch_options) in api_calls
164+
165+
segment_put_calls = storage.put.mock_calls
166+
segments_to_validate = set(['segmentA', 'segmentB', 'segmentC'])
167+
for call in segment_put_calls:
168+
_, positional_args, _ = call
169+
segment = positional_args[0]
170+
assert isinstance(segment, Segment)
171+
assert segment.name in segments_to_validate
172+
segments_to_validate.remove(segment.name)
173+
174+
175+
class SegmentSynchronizationAsyncTests(object):
176+
"""Split synchronization async task test cases."""
177+
178+
@pytest.mark.asyncio
179+
async def test_normal_operation(self, mocker):
180+
"""Test the normal operation flow."""
181+
split_storage = mocker.Mock(spec=SplitStorage)
182+
async def get_segment_names():
183+
return ['segmentA', 'segmentB', 'segmentC']
184+
split_storage.get_segment_names = get_segment_names
185+
186+
# Setup a mocked segment storage whose changenumber returns -1 on first fetch and
187+
# 123 afterwards.
188+
storage = mocker.Mock(spec=SegmentStorage)
189+
190+
async def change_number_mock(segment_name):
191+
if segment_name == 'segmentA' and change_number_mock._count_a == 0:
192+
change_number_mock._count_a = 1
193+
return -1
194+
if segment_name == 'segmentB' and change_number_mock._count_b == 0:
195+
change_number_mock._count_b = 1
196+
return -1
197+
if segment_name == 'segmentC' and change_number_mock._count_c == 0:
198+
change_number_mock._count_c = 1
199+
return -1
200+
return 123
201+
change_number_mock._count_a = 0
202+
change_number_mock._count_b = 0
203+
change_number_mock._count_c = 0
204+
storage.get_change_number = change_number_mock
205+
206+
self.segments = []
207+
async def put(segment):
208+
self.segments.append(segment)
209+
storage.put = put
210+
211+
async def update(*arg):
212+
pass
213+
storage.update = update
214+
215+
# Setup a mocked segment api to return segments mentioned before.
216+
self.segment_name = []
217+
self.change_number = []
218+
self.fetch_options = []
219+
async def fetch_segment_mock(segment_name, change_number, fetch_options):
220+
self.segment_name.append(segment_name)
221+
self.change_number.append(change_number)
222+
self.fetch_options.append(fetch_options)
223+
if segment_name == 'segmentA' and fetch_segment_mock._count_a == 0:
224+
fetch_segment_mock._count_a = 1
225+
return {'name': 'segmentA', 'added': ['key1', 'key2', 'key3'], 'removed': [],
226+
'since': -1, 'till': 123}
227+
if segment_name == 'segmentB' and fetch_segment_mock._count_b == 0:
228+
fetch_segment_mock._count_b = 1
229+
return {'name': 'segmentB', 'added': ['key4', 'key5', 'key6'], 'removed': [],
230+
'since': -1, 'till': 123}
231+
if segment_name == 'segmentC' and fetch_segment_mock._count_c == 0:
232+
fetch_segment_mock._count_c = 1
233+
return {'name': 'segmentC', 'added': ['key7', 'key8', 'key9'], 'removed': [],
234+
'since': -1, 'till': 123}
235+
return {'added': [], 'removed': [], 'since': 123, 'till': 123}
236+
fetch_segment_mock._count_a = 0
237+
fetch_segment_mock._count_b = 0
238+
fetch_segment_mock._count_c = 0
239+
240+
api = mocker.Mock()
241+
fetch_options = FetchOptions(True)
242+
api.fetch_segment = fetch_segment_mock
243+
244+
segments_synchronizer = SegmentSynchronizerAsync(api, split_storage, storage)
245+
task = segment_sync.SegmentSynchronizationTaskAsync(segments_synchronizer.synchronize_segments,
246+
0.5)
247+
task.start()
248+
await asyncio.sleep(0.7)
249+
assert task.is_running()
250+
251+
await task.stop()
252+
assert not task.is_running()
253+
254+
assert (self.segment_name[0], self.change_number[0], self.fetch_options[0]) == ('segmentA', -1, fetch_options)
255+
assert (self.segment_name[1], self.change_number[1], self.fetch_options[1]) == ('segmentA', 123, fetch_options)
256+
assert (self.segment_name[2], self.change_number[2], self.fetch_options[2]) == ('segmentB', -1, fetch_options)
257+
assert (self.segment_name[3], self.change_number[3], self.fetch_options[3]) == ('segmentB', 123, fetch_options)
258+
assert (self.segment_name[4], self.change_number[4], self.fetch_options[4]) == ('segmentC', -1, fetch_options)
259+
assert (self.segment_name[5], self.change_number[5], self.fetch_options[5]) == ('segmentC', 123, fetch_options)
260+
261+
segments_to_validate = set(['segmentA', 'segmentB', 'segmentC'])
262+
for segment in self.segments:
263+
assert isinstance(segment, Segment)
264+
assert segment.name in segments_to_validate
265+
segments_to_validate.remove(segment.name)
266+
267+
@pytest.mark.asyncio
268+
async def test_that_errors_dont_stop_task(self, mocker):
269+
"""Test that if fetching segments fails at some_point, the task will continue running."""
270+
split_storage = mocker.Mock(spec=SplitStorage)
271+
async def get_segment_names():
272+
return ['segmentA', 'segmentB', 'segmentC']
273+
split_storage.get_segment_names = get_segment_names
274+
275+
# Setup a mocked segment storage whose changenumber returns -1 on first fetch and
276+
# 123 afterwards.
277+
storage = mocker.Mock(spec=SegmentStorage)
278+
279+
async def change_number_mock(segment_name):
280+
if segment_name == 'segmentA' and change_number_mock._count_a == 0:
281+
change_number_mock._count_a = 1
282+
return -1
283+
if segment_name == 'segmentB' and change_number_mock._count_b == 0:
284+
change_number_mock._count_b = 1
285+
return -1
286+
if segment_name == 'segmentC' and change_number_mock._count_c == 0:
287+
change_number_mock._count_c = 1
288+
return -1
289+
return 123
290+
change_number_mock._count_a = 0
291+
change_number_mock._count_b = 0
292+
change_number_mock._count_c = 0
293+
storage.get_change_number = change_number_mock
294+
295+
self.segments = []
296+
async def put(segment):
297+
self.segments.append(segment)
298+
storage.put = put
299+
300+
async def update(*arg):
301+
pass
302+
storage.update = update
303+
304+
# Setup a mocked segment api to return segments mentioned before.
305+
self.segment_name = []
306+
self.change_number = []
307+
self.fetch_options = []
308+
async def fetch_segment_mock(segment_name, change_number, fetch_options):
309+
self.segment_name.append(segment_name)
310+
self.change_number.append(change_number)
311+
self.fetch_options.append(fetch_options)
312+
if segment_name == 'segmentA' and fetch_segment_mock._count_a == 0:
313+
fetch_segment_mock._count_a = 1
314+
return {'name': 'segmentA', 'added': ['key1', 'key2', 'key3'], 'removed': [],
315+
'since': -1, 'till': 123}
316+
if segment_name == 'segmentB' and fetch_segment_mock._count_b == 0:
317+
fetch_segment_mock._count_b = 1
318+
raise Exception("some exception")
319+
if segment_name == 'segmentC' and fetch_segment_mock._count_c == 0:
320+
fetch_segment_mock._count_c = 1
321+
return {'name': 'segmentC', 'added': ['key7', 'key8', 'key9'], 'removed': [],
322+
'since': -1, 'till': 123}
323+
return {'added': [], 'removed': [], 'since': 123, 'till': 123}
324+
fetch_segment_mock._count_a = 0
325+
fetch_segment_mock._count_b = 0
326+
fetch_segment_mock._count_c = 0
327+
328+
api = mocker.Mock()
329+
fetch_options = FetchOptions(True)
330+
api.fetch_segment = fetch_segment_mock
331+
332+
segments_synchronizer = SegmentSynchronizerAsync(api, split_storage, storage)
333+
task = segment_sync.SegmentSynchronizationTaskAsync(segments_synchronizer.synchronize_segments,
334+
0.5)
335+
task.start()
336+
await asyncio.sleep(0.7)
337+
assert task.is_running()
338+
339+
await task.stop()
340+
assert not task.is_running()
341+
342+
assert (self.segment_name[0], self.change_number[0], self.fetch_options[0]) == ('segmentA', -1, fetch_options)
343+
assert (self.segment_name[1], self.change_number[1], self.fetch_options[1]) == ('segmentA', 123, fetch_options)
344+
assert (self.segment_name[2], self.change_number[2], self.fetch_options[2]) == ('segmentB', -1, fetch_options)
345+
assert (self.segment_name[3], self.change_number[3], self.fetch_options[3]) == ('segmentC', -1, fetch_options)
346+
assert (self.segment_name[4], self.change_number[4], self.fetch_options[4]) == ('segmentC', 123, fetch_options)
347+
348+
segments_to_validate = set(['segmentA', 'segmentB', 'segmentC'])
349+
for segment in self.segments:
350+
assert isinstance(segment, Segment)
351+
assert segment.name in segments_to_validate
352+
segments_to_validate.remove(segment.name)

0 commit comments

Comments
 (0)