Skip to content

Commit 335ffe8

Browse files
committed
updated sync.split, sync.synchronizer and tasks.util.asynctask classes
1 parent 14a7266 commit 335ffe8

File tree

3 files changed

+67
-69
lines changed

3 files changed

+67
-69
lines changed

splitio/sync/split.py

Lines changed: 53 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,11 @@
1010

1111
from splitio.api import APIException
1212
from splitio.api.commons import FetchOptions
13+
from splitio.client.input_validator import validate_flag_sets
1314
from splitio.models import splits
1415
from splitio.util.backoff import Backoff
1516
from splitio.util.time import get_current_epoch_time_ms
17+
from splitio.util.storage_helper import update_feature_flag_storage, update_feature_flag_storage_async
1618
from splitio.sync import util
1719
from splitio.optional.loaders import asyncio, aiofiles
1820

@@ -28,7 +30,7 @@
2830
_ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES = 10
2931

3032

31-
class SplitSynchronizer(object):
33+
class SplitSynchronizerBase(object):
3234
"""Feature Flag changes synchronizer."""
3335

3436
def __init__(self, feature_flag_api, feature_flag_storage):
@@ -52,6 +54,31 @@ def feature_flag_storage(self):
5254
"""Return Feature_flag storage object"""
5355
return self._feature_flag_storage
5456

57+
def _get_config_sets(self):
58+
"""
59+
Get all filter flag sets cnverrted to string, if no filter flagsets exist return None
60+
:return: string with flagsets
61+
:rtype: str
62+
"""
63+
if self._feature_flag_storage.flag_set_filter.flag_sets == set({}):
64+
return None
65+
return ','.join(self._feature_flag_storage.flag_set_filter.sorted_flag_sets)
66+
67+
class SplitSynchronizer(SplitSynchronizerBase):
68+
"""Feature Flag changes synchronizer."""
69+
70+
def __init__(self, feature_flag_api, feature_flag_storage):
71+
"""
72+
Class constructor.
73+
74+
:param feature_flag_api: Feature Flag API Client.
75+
:type feature_flag_api: splitio.api.splits.SplitsAPI
76+
77+
:param feature_flag_storage: Feature Flag Storage.
78+
:type feature_flag_storage: splitio.storage.InMemorySplitStorage
79+
"""
80+
super().__init__(feature_flag_api, feature_flag_storage)
81+
5582
def _fetch_until(self, fetch_options, till=None):
5683
"""
5784
Hit endpoint, update storage and return when since==till.
@@ -81,14 +108,9 @@ def _fetch_until(self, fetch_options, till=None):
81108
_LOGGER.debug('Exception information: ', exc_info=True)
82109
raise exc
83110

84-
for feature_flag in feature_flag_changes.get('splits', []):
85-
if feature_flag['status'] == splits.Status.ACTIVE.value:
86-
parsed = splits.from_raw(feature_flag)
87-
self._feature_flag_storage.put(parsed)
88-
segment_list.update(set(parsed.get_segment_names()))
89-
else:
90-
self._feature_flag_storage.remove(feature_flag['name'])
91-
self._feature_flag_storage.set_change_number(feature_flag_changes['till'])
111+
fetched_feature_flags = []
112+
[fetched_feature_flags.append(splits.from_raw(feature_flag)) for feature_flag in feature_flag_changes.get('splits', [])]
113+
segment_list = update_feature_flag_storage(self._feature_flag_storage, fetched_feature_flags, feature_flag_changes['till'])
92114
if feature_flag_changes['till'] == feature_flag_changes['since']:
93115
return feature_flag_changes['till'], segment_list
94116

@@ -127,15 +149,15 @@ def synchronize_splits(self, till=None):
127149
:type till: int
128150
"""
129151
final_segment_list = set()
130-
fetch_options = FetchOptions(True) # Set Cache-Control to no-cache
152+
fetch_options = FetchOptions(True, sets=self._get_config_sets()) # Set Cache-Control to no-cache
131153
successful_sync, remaining_attempts, change_number, segment_list = self._attempt_feature_flag_sync(fetch_options,
132154
till)
133155
final_segment_list.update(segment_list)
134156
attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - remaining_attempts
135157
if successful_sync: # succedeed sync
136158
_LOGGER.debug('Refresh completed in %d attempts.', attempts)
137159
return final_segment_list
138-
with_cdn_bypass = FetchOptions(True, change_number) # Set flag for bypassing CDN
160+
with_cdn_bypass = FetchOptions(True, change_number, sets=self._get_config_sets()) # Set flag for bypassing CDN
139161
without_cdn_successful_sync, remaining_attempts, change_number, segment_list = self._attempt_feature_flag_sync(with_cdn_bypass, till)
140162
final_segment_list.update(segment_list)
141163
without_cdn_attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - remaining_attempts
@@ -160,8 +182,7 @@ def kill_split(self, feature_flag_name, default_treatment, change_number):
160182
"""
161183
self._feature_flag_storage.kill_locally(feature_flag_name, default_treatment, change_number)
162184

163-
164-
class SplitSynchronizerAsync(object):
185+
class SplitSynchronizerAsync(SplitSynchronizerBase):
165186
"""Feature Flag changes synchronizer async."""
166187

167188
def __init__(self, feature_flag_api, feature_flag_storage):
@@ -174,16 +195,7 @@ def __init__(self, feature_flag_api, feature_flag_storage):
174195
:param feature_flag_storage: Feature Flag Storage.
175196
:type feature_flag_storage: splitio.storage.InMemorySplitStorage
176197
"""
177-
self._api = feature_flag_api
178-
self._feature_flag_storage = feature_flag_storage
179-
self._backoff = Backoff(
180-
_ON_DEMAND_FETCH_BACKOFF_BASE,
181-
_ON_DEMAND_FETCH_BACKOFF_MAX_WAIT)
182-
183-
@property
184-
def feature_flag_storage(self):
185-
"""Return Feature_flag storage object"""
186-
return self._feature_flag_storage
198+
super().__init__(feature_flag_api, feature_flag_storage)
187199

188200
async def _fetch_until(self, fetch_options, till=None):
189201
"""
@@ -214,13 +226,9 @@ async def _fetch_until(self, fetch_options, till=None):
214226
_LOGGER.debug('Exception information: ', exc_info=True)
215227
raise exc
216228

217-
for feature_flag in feature_flag_changes.get('splits', []):
218-
if feature_flag['status'] == splits.Status.ACTIVE.value:
219-
parsed = splits.from_raw(feature_flag)
220-
await self._feature_flag_storage.put(parsed)
221-
segment_list.update(set(parsed.get_segment_names()))
222-
else:
223-
await self._feature_flag_storage.remove(feature_flag['name'])
229+
fetched_feature_flags = []
230+
[fetched_feature_flags.append(splits.from_raw(feature_flag)) for feature_flag in feature_flag_changes.get('splits', [])]
231+
segment_list = await update_feature_flag_storage_async(self._feature_flag_storage, fetched_feature_flags, feature_flag_changes['till'])
224232
await self._feature_flag_storage.set_change_number(feature_flag_changes['till'])
225233
if feature_flag_changes['till'] == feature_flag_changes['since']:
226234
return feature_flag_changes['till'], segment_list
@@ -260,15 +268,15 @@ async def synchronize_splits(self, till=None):
260268
:type till: int
261269
"""
262270
final_segment_list = set()
263-
fetch_options = FetchOptions(True) # Set Cache-Control to no-cache
271+
fetch_options = FetchOptions(True, sets=self._get_config_sets()) # Set Cache-Control to no-cache
264272
successful_sync, remaining_attempts, change_number, segment_list = await self._attempt_feature_flag_sync(fetch_options,
265273
till)
266274
final_segment_list.update(segment_list)
267275
attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - remaining_attempts
268276
if successful_sync: # succedeed sync
269277
_LOGGER.debug('Refresh completed in %d attempts.', attempts)
270278
return final_segment_list
271-
with_cdn_bypass = FetchOptions(True, change_number) # Set flag for bypassing CDN
279+
with_cdn_bypass = FetchOptions(True, change_number, sets=self._get_config_sets()) # Set flag for bypassing CDN
272280
without_cdn_successful_sync, remaining_attempts, change_number, segment_list = await self._attempt_feature_flag_sync(with_cdn_bypass, till)
273281
final_segment_list.update(segment_list)
274282
without_cdn_attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - remaining_attempts
@@ -430,6 +438,9 @@ def _sanitize_feature_flag_elements(self, parsed_feature_flags):
430438
('algo', 2, 2, 2, None, None)]:
431439
feature_flag = util._sanitize_object_element(feature_flag, 'split', element[0], element[1], lower_value=element[2], upper_value=element[3], in_list=element[4], not_in_list=element[5])
432440
feature_flag = self._sanitize_condition(feature_flag)
441+
if 'sets' not in feature_flag:
442+
feature_flag['sets'] = []
443+
feature_flag['sets'] = validate_flag_sets(feature_flag['sets'], 'Localhost Validator')
433444
sanitized_feature_flags.append(feature_flag)
434445
return sanitized_feature_flags
435446

@@ -604,12 +615,8 @@ def _synchronize_legacy(self):
604615
fetched = self._read_feature_flags_from_legacy_file(self._filename)
605616
to_delete = [name for name in self._feature_flag_storage.get_split_names()
606617
if name not in fetched.keys()]
607-
for feature_flag in fetched.values():
608-
self._feature_flag_storage.put(feature_flag)
609-
610-
for feature_flag in to_delete:
611-
self._feature_flag_storage.remove(feature_flag)
612-
618+
to_add = [feature_flag for feature_flag in fetched.values()]
619+
self._feature_flag_storage.update(to_add, to_delete, 0)
613620
return []
614621

615622
def _synchronize_json(self):
@@ -628,18 +635,12 @@ def _synchronize_json(self):
628635
self._current_json_sha = fecthed_sha
629636
if self._feature_flag_storage.get_change_number() > till and till != self._DEFAULT_FEATURE_FLAG_TILL:
630637
return []
631-
for feature_flag in fetched:
632-
if feature_flag['status'] == splits.Status.ACTIVE.value:
633-
parsed = splits.from_raw(feature_flag)
634-
self._feature_flag_storage.put(parsed)
635-
_LOGGER.debug("feature flag %s is updated", parsed.name)
636-
segment_list.update(set(parsed.get_segment_names()))
637-
else:
638-
self._feature_flag_storage.remove(feature_flag['name'])
639638

640-
self._feature_flag_storage.set_change_number(till)
639+
fetched_feature_flags = [fetched_feature_flags.append(splits.from_raw(feature_flag)) for feature_flag in fetched]
640+
segment_list = update_feature_flag_storage(self._feature_flag_storage, fetched_feature_flags, till)
641641
return segment_list
642642
except Exception as exc:
643+
_LOGGER.debug(exc)
643644
raise ValueError("Error reading feature flags from json.") from exc
644645

645646
def _read_feature_flags_from_json_file(self, filename):
@@ -758,11 +759,8 @@ async def _synchronize_legacy(self):
758759
fetched = await self._read_feature_flags_from_legacy_file(self._filename)
759760
to_delete = [name for name in await self._feature_flag_storage.get_split_names()
760761
if name not in fetched.keys()]
761-
for feature_flag in fetched.values():
762-
await self._feature_flag_storage.put(feature_flag)
763-
764-
for feature_flag in to_delete:
765-
await self._feature_flag_storage.remove(feature_flag)
762+
to_add = [feature_flag for feature_flag in fetched.values()]
763+
await self._feature_flag_storage.update(to_add, to_delete, 0)
766764

767765
return []
768766

@@ -782,18 +780,11 @@ async def _synchronize_json(self):
782780
self._current_json_sha = fecthed_sha
783781
if await self._feature_flag_storage.get_change_number() > till and till != self._DEFAULT_FEATURE_FLAG_TILL:
784782
return []
785-
for feature_flag in fetched:
786-
if feature_flag['status'] == splits.Status.ACTIVE.value:
787-
parsed = splits.from_raw(feature_flag)
788-
await self._feature_flag_storage.put(parsed)
789-
_LOGGER.debug("feature flag %s is updated", parsed.name)
790-
segment_list.update(set(parsed.get_segment_names()))
791-
else:
792-
await self._feature_flag_storage.remove(feature_flag['name'])
793-
794-
await self._feature_flag_storage.set_change_number(till)
783+
fetched_feature_flags = [fetched_feature_flags.append(splits.from_raw(feature_flag)) for feature_flag in fetched]
784+
segment_list = await update_feature_flag_storage(self._feature_flag_storage, fetched_feature_flags, till)
795785
return segment_list
796786
except Exception as exc:
787+
_LOGGER.debug(exc)
797788
raise ValueError("Error reading feature flags from json.") from exc
798789

799790
async def _read_feature_flags_from_json_file(self, filename):

splitio/sync/synchronizer.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,7 @@ def __init__(self, split_synchronizers, split_tasks):
252252
self._periodic_data_recording_tasks.append(self._split_tasks.unique_keys_task)
253253
if self._split_tasks.clear_filter_task:
254254
self._periodic_data_recording_tasks.append(self._split_tasks.clear_filter_task)
255+
self._break_sync_all = False
255256

256257
@property
257258
def split_sync(self):
@@ -384,6 +385,7 @@ def synchronize_splits(self, till, sync_segments=True):
384385
:returns: whether the synchronization was successful or not.
385386
:rtype: bool
386387
"""
388+
self._break_sync_all = False
387389
_LOGGER.debug('Starting splits synchronization')
388390
try:
389391
new_segments = []
@@ -399,7 +401,9 @@ def synchronize_splits(self, till, sync_segments=True):
399401
else:
400402
_LOGGER.debug('Segment sync scheduled.')
401403
return True
402-
except APIException:
404+
except APIException as exc:
405+
if exc._status_code is not None and exc._status_code == 414:
406+
self._break_sync_all = True
403407
_LOGGER.error('Failed syncing feature flags')
404408
_LOGGER.debug('Error: ', exc_info=True)
405409
return False
@@ -429,7 +433,7 @@ def sync_all(self, max_retry_attempts=_SYNC_ALL_NO_RETRIES):
429433
_LOGGER.debug('Error: ', exc_info=True)
430434
if max_retry_attempts != _SYNC_ALL_NO_RETRIES:
431435
retry_attempts += 1
432-
if retry_attempts > max_retry_attempts:
436+
if retry_attempts > max_retry_attempts or self._break_sync_all:
433437
break
434438
how_long = self._backoff.get()
435439
time.sleep(how_long)
@@ -536,6 +540,7 @@ async def synchronize_splits(self, till, sync_segments=True):
536540
:returns: whether the synchronization was successful or not.
537541
:rtype: bool
538542
"""
543+
self._break_sync_all = False
539544
_LOGGER.debug('Starting feature flags synchronization')
540545
try:
541546
new_segments = []
@@ -551,7 +556,9 @@ async def synchronize_splits(self, till, sync_segments=True):
551556
else:
552557
_LOGGER.debug('Segment sync scheduled.')
553558
return True
554-
except APIException:
559+
except APIException as exc:
560+
if exc._status_code is not None and exc._status_code == 414:
561+
self._break_sync_all = True
555562
_LOGGER.error('Failed syncing feature flags')
556563
_LOGGER.debug('Error: ', exc_info=True)
557564
return False
@@ -581,7 +588,7 @@ async def sync_all(self, max_retry_attempts=_SYNC_ALL_NO_RETRIES):
581588
_LOGGER.debug('Error: ', exc_info=True)
582589
if max_retry_attempts != _SYNC_ALL_NO_RETRIES:
583590
retry_attempts += 1
584-
if retry_attempts > max_retry_attempts:
591+
if retry_attempts > max_retry_attempts or self._break_sync_all:
585592
break
586593
how_long = self._backoff.get()
587594
time.sleep(how_long)

splitio/tasks/util/asynctask.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ def _execution_wrapper(self):
113113
_LOGGER.debug("Force execution signal received. Running now")
114114
if not _safe_run(self._main):
115115
_LOGGER.error("An error occurred when executing the task. "
116-
"Retrying after perio expires")
116+
"Retrying after period expires")
117117
continue
118118
except queue.Empty:
119119
# If no message was received, the timeout has expired
@@ -123,7 +123,7 @@ def _execution_wrapper(self):
123123
if not _safe_run(self._main):
124124
_LOGGER.error(
125125
"An error occurred when executing the task. "
126-
"Retrying after perio expires"
126+
"Retrying after period expires"
127127
)
128128
finally:
129129
self._cleanup()
@@ -252,7 +252,7 @@ async def _execution_wrapper(self):
252252
_LOGGER.debug("Force execution signal received. Running now")
253253
if not await _safe_run_async(self._main):
254254
_LOGGER.error("An error occurred when executing the task. "
255-
"Retrying after perio expires")
255+
"Retrying after period expires")
256256
continue
257257
except asyncio.QueueEmpty:
258258
# If no message was received, the timeout has expired

0 commit comments

Comments
 (0)