Skip to content

Commit f6a8441

Browse files
committed
added sync segment local class
1 parent 9167276 commit f6a8441

File tree

2 files changed

+268
-30
lines changed

2 files changed

+268
-30
lines changed

splitio/sync/segment.py

Lines changed: 144 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from splitio.tasks.util import workerpool
99
from splitio.models import segments
1010
from splitio.util.backoff import Backoff
11+
from splitio.optional.loaders import asyncio, aiofiles
1112
from splitio.sync import util
1213

1314
_LOGGER = logging.getLogger(__name__)
@@ -195,27 +196,57 @@ def segment_exist_in_storage(self, segment_name):
195196
"""
196197
return self._segment_storage.get(segment_name) != None
197198

198-
class LocalSegmentSynchronizer(object):
199-
"""Localhost mode segment synchronizer."""
199+
class LocalSegmentSynchronizerBase(object):
200+
"""Localhost mode segment base synchronizer."""
200201

201202
_DEFAULT_SEGMENT_TILL = -1
202203

203-
def __init__(self, segment_folder, split_storage, segment_storage):
204+
def _sanitize_segment(self, parsed):
205+
"""
206+
Sanitize json elements.
207+
208+
:param parsed: segment dict
209+
:type parsed: Dict
210+
211+
:return: sanitized segment structure dict
212+
:rtype: Dict
213+
"""
214+
if 'name' not in parsed or parsed['name'] is None:
215+
_LOGGER.warning("Segment does not have [name] element, skipping")
216+
raise Exception("Segment does not have [name] element")
217+
if parsed['name'].strip() == '':
218+
_LOGGER.warning("Segment [name] element is blank, skipping")
219+
raise Exception("Segment [name] element is blank")
220+
221+
for element in [('till', -1, -1, None, None, [0]),
222+
('added', [], None, None, None, None),
223+
('removed', [], None, None, None, None)
224+
]:
225+
parsed = util._sanitize_object_element(parsed, 'segment', element[0], element[1], lower_value=element[2], upper_value=element[3], in_list=None, not_in_list=element[5])
226+
parsed = util._sanitize_object_element(parsed, 'segment', 'since', parsed['till'], -1, parsed['till'], None, [0])
227+
228+
return parsed
229+
230+
231+
class LocalSegmentSynchronizer(LocalSegmentSynchronizerBase):
232+
"""Localhost mode segment synchronizer."""
233+
234+
def __init__(self, segment_folder, feature_flag_storage, segment_storage):
204235
"""
205236
Class constructor.
206237
207238
:param segment_folder: patch to the segment folder
208239
:type segment_folder: str
209240
210-
:param split_storage: Feature flag Storage.
211-
:type split_storage: splitio.storage.InMemorySplitStorage
241+
:param feature_flag_storage: Feature flag Storage.
242+
:type feature_flag_storage: splitio.storage.InMemorySplitStorage
212243
213244
:param segment_storage: Segment storage reference.
214245
:type segment_storage: splitio.storage.SegmentStorage
215246
216247
"""
217248
self._segment_folder = segment_folder
218-
self._split_storage = split_storage
249+
self._feature_flag_storage = feature_flag_storage
219250
self._segment_storage = segment_storage
220251
self._segment_sha = {}
221252

@@ -231,7 +262,7 @@ def synchronize_segments(self, segment_names = None):
231262
"""
232263
_LOGGER.info('Synchronizing segments now.')
233264
if segment_names is None:
234-
segment_names = self._split_storage.get_segment_names()
265+
segment_names = self._feature_flag_storage.get_segment_names()
235266

236267
return_flag = True
237268
for segment_name in segment_names:
@@ -295,33 +326,118 @@ def _read_segment_from_json_file(self, filename):
295326
except Exception as exc:
296327
raise ValueError("Error parsing file %s. Make sure it's readable." % filename) from exc
297328

298-
def _sanitize_segment(self, parsed):
329+
def segment_exist_in_storage(self, segment_name):
299330
"""
300-
Sanitize json elements.
331+
Check if a segment exists in the storage
301332
302-
:param parsed: segment dict
303-
:type parsed: Dict
333+
:param segment_name: Name of the segment
334+
:type segment_name: str
304335
305-
:return: sanitized segment structure dict
306-
:rtype: Dict
336+
:return: True if segment exist. False otherwise.
337+
:rtype: bool
307338
"""
308-
if 'name' not in parsed or parsed['name'] is None:
309-
_LOGGER.warning("Segment does not have [name] element, skipping")
310-
raise Exception("Segment does not have [name] element")
311-
if parsed['name'].strip() == '':
312-
_LOGGER.warning("Segment [name] element is blank, skipping")
313-
raise Exception("Segment [name] element is blank")
339+
return self._segment_storage.get(segment_name) != None
314340

315-
for element in [('till', -1, -1, None, None, [0]),
316-
('added', [], None, None, None, None),
317-
('removed', [], None, None, None, None)
318-
]:
319-
parsed = util._sanitize_object_element(parsed, 'segment', element[0], element[1], lower_value=element[2], upper_value=element[3], in_list=None, not_in_list=element[5])
320-
parsed = util._sanitize_object_element(parsed, 'segment', 'since', parsed['till'], -1, parsed['till'], None, [0])
321341

322-
return parsed
342+
class LocalSegmentSynchronizerAsync(LocalSegmentSynchronizerBase):
343+
"""Localhost mode segment async synchronizer."""
344+
345+
def __init__(self, segment_folder, feature_flag_storage, segment_storage):
346+
"""
347+
Class constructor.
348+
349+
:param segment_folder: patch to the segment folder
350+
:type segment_folder: str
351+
352+
:param feature_flag_storage: Feature flag Storage.
353+
:type feature_flag_storage: splitio.storage.InMemorySplitStorage
354+
355+
:param segment_storage: Segment storage reference.
356+
:type segment_storage: splitio.storage.SegmentStorage
323357
324-
def segment_exist_in_storage(self, segment_name):
358+
"""
359+
self._segment_folder = segment_folder
360+
self._feature_flag_storage = feature_flag_storage
361+
self._segment_storage = segment_storage
362+
self._segment_sha = {}
363+
364+
async def synchronize_segments(self, segment_names = None):
365+
"""
366+
Loop through given segment names and synchronize each one.
367+
368+
:param segment_names: Optional, array of segment names to update.
369+
:type segment_name: {str}
370+
371+
:return: True if no error occurs. False otherwise.
372+
:rtype: bool
373+
"""
374+
_LOGGER.info('Synchronizing segments now.')
375+
if segment_names is None:
376+
segment_names = await self._feature_flag_storage.get_segment_names()
377+
378+
return_flag = True
379+
for segment_name in segment_names:
380+
if not await self.synchronize_segment(segment_name):
381+
return_flag = False
382+
383+
return return_flag
384+
385+
async def synchronize_segment(self, segment_name, till=None):
386+
"""
387+
Update a segment from queue
388+
389+
:param segment_name: Name of the segment to update.
390+
:type segment_name: str
391+
392+
:param till: ChangeNumber received.
393+
:type till: int
394+
395+
:return: True if no error occurs. False otherwise.
396+
:rtype: bool
397+
"""
398+
try:
399+
fetched = await self._read_segment_from_json_file(segment_name)
400+
fetched_sha = util._get_sha(json.dumps(fetched))
401+
if not await self.segment_exist_in_storage(segment_name):
402+
self._segment_sha[segment_name] = fetched_sha
403+
await self._segment_storage.put(segments.from_raw(fetched))
404+
_LOGGER.debug("segment %s is added to storage", segment_name)
405+
return True
406+
407+
if fetched_sha == self._segment_sha[segment_name]:
408+
return True
409+
410+
self._segment_sha[segment_name] = fetched_sha
411+
if await self._segment_storage.get_change_number(segment_name) > fetched['till'] and fetched['till'] != self._DEFAULT_SEGMENT_TILL:
412+
return True
413+
414+
await self._segment_storage.update(segment_name, fetched['added'], fetched['removed'], fetched['till'])
415+
_LOGGER.debug("segment %s is updated", segment_name)
416+
except Exception as e:
417+
_LOGGER.error("Could not fetch segment: %s \n" + str(e), segment_name)
418+
return False
419+
420+
return True
421+
422+
async def _read_segment_from_json_file(self, filename):
423+
"""
424+
Parse a segment and store in segment storage.
425+
426+
:param filename: Path of the file containing Feature flag
427+
:type filename: str.
428+
429+
:return: Sanitized segment structure
430+
:rtype: Dict
431+
"""
432+
try:
433+
async with aiofiles.open(os.path.join(self._segment_folder, "%s.json" % filename), 'r') as flo:
434+
parsed = json.loads(await flo.read())
435+
santitized_segment = self._sanitize_segment(parsed)
436+
return santitized_segment
437+
except Exception as exc:
438+
raise ValueError("Error parsing file %s. Make sure it's readable." % filename) from exc
439+
440+
async def segment_exist_in_storage(self, segment_name):
325441
"""
326442
Check if a segment exists in the storage
327443
@@ -331,4 +447,4 @@ def segment_exist_in_storage(self, segment_name):
331447
:return: True if segment exist. False otherwise.
332448
:rtype: bool
333449
"""
334-
return self._segment_storage.get(segment_name) != None
450+
return await self._segment_storage.get(segment_name) != None

tests/sync/test_segments_synchronizer.py

Lines changed: 124 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,10 @@
66
from splitio.api import APIException
77
from splitio.api.commons import FetchOptions
88
from splitio.storage import SplitStorage, SegmentStorage
9-
from splitio.storage.inmemmory import InMemorySegmentStorage, InMemorySplitStorage
10-
from splitio.sync.segment import SegmentSynchronizer, LocalSegmentSynchronizer
9+
from splitio.storage.inmemmory import InMemorySegmentStorage, InMemorySegmentStorageAsync, InMemorySplitStorage, InMemorySplitStorageAsync
10+
from splitio.sync.segment import SegmentSynchronizer, LocalSegmentSynchronizer, LocalSegmentSynchronizerAsync
1111
from splitio.models.segments import Segment
12+
from splitio.optional.loaders import aiofiles
1213

1314
import pytest
1415

@@ -356,3 +357,124 @@ def test_json_elements_sanitization(self, mocker):
356357
segment3["till"] = 12
357358
segment2 = {"name": 'seg', "added": [], "removed": [], "since": 20, "till": 12}
358359
assert(segment_synchronizer._sanitize_segment(segment2) == segment3)
360+
361+
362+
class LocalSegmentsSynchronizerTests(object):
363+
"""Segments synchronizer test cases."""
364+
365+
@pytest.mark.asyncio
366+
async def test_synchronize_segments_error(self, mocker):
367+
"""On error."""
368+
split_storage = mocker.Mock(spec=SplitStorage)
369+
async def get_segment_names():
370+
return ['segmentA', 'segmentB', 'segmentC']
371+
split_storage.get_segment_names = get_segment_names
372+
373+
storage = mocker.Mock(spec=SegmentStorage)
374+
async def get_change_number():
375+
return -1
376+
storage.get_change_number = get_change_number
377+
378+
segments_synchronizer = LocalSegmentSynchronizerAsync('/,/,/invalid folder name/,/,/', split_storage, storage)
379+
assert not await segments_synchronizer.synchronize_segments()
380+
381+
@pytest.mark.asyncio
382+
async def test_synchronize_segments(self, mocker):
383+
"""Test the normal operation flow."""
384+
split_storage = mocker.Mock(spec=InMemorySplitStorage)
385+
async def get_segment_names():
386+
return ['segmentA', 'segmentB', 'segmentC']
387+
split_storage.get_segment_names = get_segment_names
388+
389+
storage = InMemorySegmentStorageAsync()
390+
391+
segment_a = {'name': 'segmentA', 'added': ['key1', 'key2', 'key3'], 'removed': [],
392+
'since': -1, 'till': 123}
393+
segment_b = {'name': 'segmentB', 'added': ['key4', 'key5', 'key6'], 'removed': [],
394+
'since': -1, 'till': 123}
395+
segment_c = {'name': 'segmentC', 'added': ['key7', 'key8', 'key9'], 'removed': [],
396+
'since': -1, 'till': 123}
397+
blank = {'added': [], 'removed': [], 'since': 123, 'till': 123}
398+
399+
async def read_segment_from_json_file(*args, **kwargs):
400+
if args[0] == 'segmentA':
401+
return segment_a
402+
if args[0] == 'segmentB':
403+
return segment_b
404+
if args[0] == 'segmentC':
405+
return segment_c
406+
return blank
407+
408+
segments_synchronizer = LocalSegmentSynchronizerAsync('segment_path', split_storage, storage)
409+
segments_synchronizer._read_segment_from_json_file = read_segment_from_json_file
410+
assert await segments_synchronizer.synchronize_segments()
411+
412+
segment = await storage.get('segmentA')
413+
assert segment.name == 'segmentA'
414+
assert segment.contains('key1')
415+
assert segment.contains('key2')
416+
assert segment.contains('key3')
417+
418+
segment = await storage.get('segmentB')
419+
assert segment.name == 'segmentB'
420+
assert segment.contains('key4')
421+
assert segment.contains('key5')
422+
assert segment.contains('key6')
423+
424+
segment = await storage.get('segmentC')
425+
assert segment.name == 'segmentC'
426+
assert segment.contains('key7')
427+
assert segment.contains('key8')
428+
assert segment.contains('key9')
429+
430+
# Should sync when changenumber is not changed
431+
segment_a['added'] = ['key111']
432+
await segments_synchronizer.synchronize_segments(['segmentA'])
433+
segment = await storage.get('segmentA')
434+
assert segment.contains('key111')
435+
436+
# Should not sync when changenumber below till
437+
segment_a['till'] = 122
438+
segment_a['added'] = ['key222']
439+
await segments_synchronizer.synchronize_segments(['segmentA'])
440+
segment = await storage.get('segmentA')
441+
assert not segment.contains('key222')
442+
443+
# Should sync when changenumber above till
444+
segment_a['till'] = 124
445+
await segments_synchronizer.synchronize_segments(['segmentA'])
446+
segment = await storage.get('segmentA')
447+
assert segment.contains('key222')
448+
449+
# Should sync when till is default (-1)
450+
segment_a['till'] = -1
451+
segment_a['added'] = ['key33']
452+
await segments_synchronizer.synchronize_segments(['segmentA'])
453+
segment = await storage.get('segmentA')
454+
assert segment.contains('key33')
455+
456+
# verify remove keys
457+
segment_a['added'] = []
458+
segment_a['removed'] = ['key111']
459+
segment_a['till'] = 125
460+
await segments_synchronizer.synchronize_segments(['segmentA'])
461+
segment = await storage.get('segmentA')
462+
assert not segment.contains('key111')
463+
464+
@pytest.mark.asyncio
465+
async def test_reading_json(self, mocker):
466+
"""Test reading json file."""
467+
async with aiofiles.open("./segmentA.json", "w") as f:
468+
await f.write('{"name": "segmentA", "added": ["key1", "key2", "key3"], "removed": [],"since": -1, "till": 123}')
469+
split_storage = mocker.Mock(spec=InMemorySplitStorageAsync)
470+
storage = InMemorySegmentStorageAsync()
471+
segments_synchronizer = LocalSegmentSynchronizerAsync('.', split_storage, storage)
472+
assert await segments_synchronizer.synchronize_segments(['segmentA'])
473+
474+
segment = await storage.get('segmentA')
475+
assert segment.name == 'segmentA'
476+
assert segment.contains('key1')
477+
assert segment.contains('key2')
478+
assert segment.contains('key3')
479+
480+
os.remove("./segmentA.json")

0 commit comments

Comments
 (0)