Skip to content

Commit 7b9ad8a

Browse files
authored
Merge pull request #436 from splitio/async-sync-synchronizer
Added sync.synchronizer async class, updated tasks.unique_keys classes
2 parents 0a87c82 + baea887 commit 7b9ad8a

File tree

4 files changed

+777
-85
lines changed

4 files changed

+777
-85
lines changed

splitio/sync/segment.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -358,10 +358,10 @@ async def synchronize_segments(self, segment_names = None, dont_wait = False):
358358
if segment_names is None:
359359
segment_names = await self._feature_flag_storage.get_segment_names()
360360

361-
jobs = await self._worker_pool.submit_work(segment_names)
361+
self._jobs = await self._worker_pool.submit_work(segment_names)
362362
if (dont_wait):
363363
return True
364-
return await jobs.await_completion()
364+
return await self._jobs.await_completion()
365365

366366
async def segment_exist_in_storage(self, segment_name):
367367
"""

splitio/sync/synchronizer.py

Lines changed: 247 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,7 @@ def shutdown(self, blocking):
224224
pass
225225

226226

227-
class Synchronizer(BaseSynchronizer):
227+
class SynchronizerInMemoryBase(BaseSynchronizer):
228228
"""Synchronizer."""
229229

230230
def __init__(self, split_synchronizers, split_tasks):
@@ -253,6 +253,100 @@ def __init__(self, split_synchronizers, split_tasks):
253253
if self._split_tasks.clear_filter_task:
254254
self._periodic_data_recording_tasks.append(self._split_tasks.clear_filter_task)
255255

256+
def synchronize_segment(self, segment_name, till):
257+
"""
258+
Synchronize particular segment.
259+
260+
:param segment_name: segment associated
261+
:type segment_name: str
262+
:param till: to fetch
263+
:type till: int
264+
"""
265+
pass
266+
267+
def synchronize_splits(self, till, sync_segments=True):
268+
"""
269+
Synchronize all splits.
270+
271+
:param till: to fetch
272+
:type till: int
273+
274+
:returns: whether the synchronization was successful or not.
275+
:rtype: bool
276+
"""
277+
pass
278+
279+
def sync_all(self, max_retry_attempts=_SYNC_ALL_NO_RETRIES):
280+
"""
281+
Synchronize all splits.
282+
283+
:param max_retry_attempts: apply max attempts if it set to absilute integer.
284+
:type max_retry_attempts: int
285+
"""
286+
pass
287+
288+
def shutdown(self, blocking):
289+
"""
290+
Stop tasks.
291+
292+
:param blocking:flag to wait until tasks are stopped
293+
:type blocking: bool
294+
"""
295+
pass
296+
297+
def start_periodic_fetching(self):
298+
"""Start fetchers for splits and segments."""
299+
_LOGGER.debug('Starting periodic data fetching')
300+
self._split_tasks.split_task.start()
301+
self._split_tasks.segment_task.start()
302+
303+
def stop_periodic_fetching(self):
304+
"""Stop fetchers for splits and segments."""
305+
pass
306+
307+
def start_periodic_data_recording(self):
308+
"""Start recorders."""
309+
_LOGGER.debug('Starting periodic data recording')
310+
for task in self._periodic_data_recording_tasks:
311+
task.start()
312+
313+
def stop_periodic_data_recording(self, blocking):
314+
"""
315+
Stop recorders.
316+
317+
:param blocking: flag to wait until tasks are stopped
318+
:type blocking: bool
319+
"""
320+
pass
321+
322+
def kill_split(self, split_name, default_treatment, change_number):
323+
"""
324+
Kill a split locally.
325+
326+
:param split_name: name of the split to perform kill
327+
:type split_name: str
328+
:param default_treatment: name of the default treatment to return
329+
:type default_treatment: str
330+
:param change_number: change_number
331+
:type change_number: int
332+
"""
333+
pass
334+
335+
336+
class Synchronizer(SynchronizerInMemoryBase):
337+
"""Synchronizer."""
338+
339+
def __init__(self, split_synchronizers, split_tasks):
340+
"""
341+
Class constructor.
342+
343+
:param split_synchronizers: syncs for performing synchronization of segments and splits
344+
:type split_synchronizers: splitio.sync.synchronizer.SplitSynchronizers
345+
:param split_tasks: tasks for starting/stopping tasks
346+
:type split_tasks: splitio.sync.synchronizer.SplitTasks
347+
"""
348+
super().__init__(split_synchronizers, split_tasks)
349+
256350
def _synchronize_segments(self):
257351
_LOGGER.debug('Starting segments synchronization')
258352
return self._split_synchronizers.segment_sync.synchronize_segments()
@@ -334,9 +428,6 @@ def sync_all(self, max_retry_attempts=_SYNC_ALL_NO_RETRIES):
334428

335429
_LOGGER.error("Could not correctly synchronize splits and segments after %d attempts.", retry_attempts)
336430

337-
def _retry_block(self, max_retry_attempts, retry_attempts):
338-
return retry_attempts
339-
340431
def shutdown(self, blocking):
341432
"""
342433
Stop tasks.
@@ -349,24 +440,12 @@ def shutdown(self, blocking):
349440
self.stop_periodic_fetching()
350441
self.stop_periodic_data_recording(blocking)
351442

352-
def start_periodic_fetching(self):
353-
"""Start fetchers for splits and segments."""
354-
_LOGGER.debug('Starting periodic data fetching')
355-
self._split_tasks.split_task.start()
356-
self._split_tasks.segment_task.start()
357-
358443
def stop_periodic_fetching(self):
359444
"""Stop fetchers for splits and segments."""
360445
_LOGGER.debug('Stopping periodic fetching')
361446
self._split_tasks.split_task.stop()
362447
self._split_tasks.segment_task.stop()
363448

364-
def start_periodic_data_recording(self):
365-
"""Start recorders."""
366-
_LOGGER.debug('Starting periodic data recording')
367-
for task in self._periodic_data_recording_tasks:
368-
task.start()
369-
370449
def stop_periodic_data_recording(self, blocking):
371450
"""
372451
Stop recorders.
@@ -405,6 +484,158 @@ def kill_split(self, split_name, default_treatment, change_number):
405484
self._split_synchronizers.split_sync.kill_split(split_name, default_treatment,
406485
change_number)
407486

487+
class SynchronizerAsync(SynchronizerInMemoryBase):
488+
"""Synchronizer async."""
489+
490+
def __init__(self, split_synchronizers, split_tasks):
491+
"""
492+
Class constructor.
493+
494+
:param split_synchronizers: syncs for performing synchronization of segments and splits
495+
:type split_synchronizers: splitio.sync.synchronizer.SplitSynchronizers
496+
:param split_tasks: tasks for starting/stopping tasks
497+
:type split_tasks: splitio.sync.synchronizer.SplitTasks
498+
"""
499+
super().__init__(split_synchronizers, split_tasks)
500+
self.stop_periodic_data_recording_task = None
501+
502+
async def _synchronize_segments(self):
503+
_LOGGER.debug('Starting segments synchronization')
504+
return await self._split_synchronizers.segment_sync.synchronize_segments()
505+
506+
async def synchronize_segment(self, segment_name, till):
507+
"""
508+
Synchronize particular segment.
509+
510+
:param segment_name: segment associated
511+
:type segment_name: str
512+
:param till: to fetch
513+
:type till: int
514+
"""
515+
_LOGGER.debug('Synchronizing segment %s', segment_name)
516+
success = await self._split_synchronizers.segment_sync.synchronize_segment(segment_name, till)
517+
if not success:
518+
_LOGGER.error('Failed to sync some segments.')
519+
return success
520+
521+
async def synchronize_splits(self, till, sync_segments=True):
522+
"""
523+
Synchronize all splits.
524+
525+
:param till: to fetch
526+
:type till: int
527+
528+
:returns: whether the synchronization was successful or not.
529+
:rtype: bool
530+
"""
531+
_LOGGER.debug('Starting splits synchronization')
532+
try:
533+
new_segments = []
534+
for segment in await self._split_synchronizers.split_sync.synchronize_splits(till):
535+
if not await self._split_synchronizers.segment_sync.segment_exist_in_storage(segment):
536+
new_segments.append(segment)
537+
if sync_segments and len(new_segments) != 0:
538+
_LOGGER.debug('Synching Segments: %s', ','.join(new_segments))
539+
success = await self._split_synchronizers.segment_sync.synchronize_segments(new_segments, True)
540+
if not success:
541+
_LOGGER.error('Failed to schedule sync one or all segment(s) below.')
542+
_LOGGER.error(','.join(new_segments))
543+
else:
544+
_LOGGER.debug('Segment sync scheduled.')
545+
return True
546+
except APIException:
547+
_LOGGER.error('Failed syncing splits')
548+
_LOGGER.debug('Error: ', exc_info=True)
549+
return False
550+
551+
async def sync_all(self, max_retry_attempts=_SYNC_ALL_NO_RETRIES):
552+
"""
553+
Synchronize all splits.
554+
555+
:param max_retry_attempts: apply max attempts if it set to absilute integer.
556+
:type max_retry_attempts: int
557+
"""
558+
retry_attempts = 0
559+
while True:
560+
try:
561+
if not await self.synchronize_splits(None, False):
562+
raise Exception("split sync failed")
563+
564+
# Only retrying splits, since segments may trigger too many calls.
565+
566+
if not await self._synchronize_segments():
567+
_LOGGER.warning('Segments failed to synchronize.')
568+
569+
# All is good
570+
return
571+
except Exception as exc: # pylint:disable=broad-except
572+
_LOGGER.error("Exception caught when trying to sync all data: %s", str(exc))
573+
_LOGGER.debug('Error: ', exc_info=True)
574+
if max_retry_attempts != _SYNC_ALL_NO_RETRIES:
575+
retry_attempts += 1
576+
if retry_attempts > max_retry_attempts:
577+
break
578+
how_long = self._backoff.get()
579+
time.sleep(how_long)
580+
581+
_LOGGER.error("Could not correctly synchronize splits and segments after %d attempts.", retry_attempts)
582+
583+
async def shutdown(self, blocking):
584+
"""
585+
Stop tasks.
586+
587+
:param blocking:flag to wait until tasks are stopped
588+
:type blocking: bool
589+
"""
590+
_LOGGER.debug('Shutting down tasks.')
591+
await self._split_synchronizers.segment_sync.shutdown()
592+
await self.stop_periodic_fetching()
593+
await self.stop_periodic_data_recording(blocking)
594+
595+
async def stop_periodic_fetching(self):
596+
"""Stop fetchers for splits and segments."""
597+
_LOGGER.debug('Stopping periodic fetching')
598+
await self._split_tasks.split_task.stop()
599+
await self._split_tasks.segment_task.stop()
600+
601+
async def stop_periodic_data_recording(self, blocking):
602+
"""
603+
Stop recorders.
604+
605+
:param blocking: flag to wait until tasks are stopped
606+
:type blocking: bool
607+
"""
608+
_LOGGER.debug('Stopping periodic data recording')
609+
if blocking:
610+
await self._stop_periodic_data_recording()
611+
_LOGGER.debug('all tasks finished successfully.')
612+
else:
613+
self.stop_periodic_data_recording_task = asyncio.get_running_loop().create_task(self._stop_periodic_data_recording)
614+
615+
async def _stop_periodic_data_recording(self):
616+
"""
617+
Stop recorders.
618+
619+
:param blocking: flag to wait until tasks are stopped
620+
:type blocking: bool
621+
"""
622+
for task in self._periodic_data_recording_tasks:
623+
await task.stop()
624+
625+
async def kill_split(self, split_name, default_treatment, change_number):
626+
"""
627+
Kill a split locally.
628+
629+
:param split_name: name of the split to perform kill
630+
:type split_name: str
631+
:param default_treatment: name of the default treatment to return
632+
:type default_treatment: str
633+
:param change_number: change_number
634+
:type change_number: int
635+
"""
636+
await self._split_synchronizers.split_sync.kill_split(split_name, default_treatment,
637+
change_number)
638+
408639
class RedisSynchronizerBase(BaseSynchronizer):
409640
"""Redis Synchronizer."""
410641

0 commit comments

Comments
 (0)