Skip to content

Commit c50621d

Browse files
committed
Added workerpool and sync.segment async classes
1 parent e1d4da8 commit c50621d

File tree

4 files changed

+619
-15
lines changed

4 files changed

+619
-15
lines changed

splitio/sync/segment.py

Lines changed: 195 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,34 +9,36 @@
99
from splitio.models import segments
1010
from splitio.util.backoff import Backoff
1111
from splitio.sync import util
12-
12+
from splitio.optional.loaders import asyncio
13+
import pytest
1314
_LOGGER = logging.getLogger(__name__)
1415

1516

1617
_ON_DEMAND_FETCH_BACKOFF_BASE = 10 # backoff base starting at 10 seconds
1718
_ON_DEMAND_FETCH_BACKOFF_MAX_WAIT = 60 # don't sleep for more than 1 minute
1819
_ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES = 10
20+
_MAX_WORKERS = 10
1921

2022

2123
class SegmentSynchronizer(object):
22-
def __init__(self, segment_api, split_storage, segment_storage):
24+
def __init__(self, segment_api, feature_flag_storage, segment_storage):
2325
"""
2426
Class constructor.
2527
2628
:param segment_api: API to retrieve segments from backend.
2729
:type segment_api: splitio.api.SegmentApi
2830
29-
:param split_storage: Feature Flag Storage.
30-
:type split_storage: splitio.storage.InMemorySplitStorage
31+
:param feature_flag_storage: Feature Flag Storage.
32+
:type feature_flag_storage: splitio.storage.InMemorySplitStorage
3133
3234
:param segment_storage: Segment storage reference.
3335
:type segment_storage: splitio.storage.SegmentStorage
3436
3537
"""
3638
self._api = segment_api
37-
self._split_storage = split_storage
39+
self._feature_flag_storage = feature_flag_storage
3840
self._segment_storage = segment_storage
39-
self._worker_pool = workerpool.WorkerPool(10, self.synchronize_segment)
41+
self._worker_pool = workerpool.WorkerPool(_MAX_WORKERS, self.synchronize_segment)
4042
self._worker_pool.start()
4143
self._backoff = Backoff(
4244
_ON_DEMAND_FETCH_BACKOFF_BASE,
@@ -47,7 +49,7 @@ def recreate(self):
4749
Create worker_pool on forked processes.
4850
4951
"""
50-
self._worker_pool = workerpool.WorkerPool(10, self.synchronize_segment)
52+
self._worker_pool = workerpool.WorkerPool(_MAX_WORKERS, self.synchronize_segment)
5153
self._worker_pool.start()
5254

5355
def shutdown(self):
@@ -175,7 +177,7 @@ def synchronize_segments(self, segment_names = None, dont_wait = False):
175177
:rtype: bool
176178
"""
177179
if segment_names is None:
178-
segment_names = self._split_storage.get_segment_names()
180+
segment_names = self._feature_flag_storage.get_segment_names()
179181

180182
for segment_name in segment_names:
181183
self._worker_pool.submit_work(segment_name)
@@ -195,27 +197,207 @@ def segment_exist_in_storage(self, segment_name):
195197
"""
196198
return self._segment_storage.get(segment_name) != None
197199

200+
201+
class SegmentSynchronizerAsync(object):
202+
def __init__(self, segment_api, feature_flag_storage, segment_storage):
203+
"""
204+
Class constructor.
205+
206+
:param segment_api: API to retrieve segments from backend.
207+
:type segment_api: splitio.api.SegmentApi
208+
209+
:param feature_flag_storage: Feature Flag Storage.
210+
:type feature_flag_storage: splitio.storage.InMemorySplitStorage
211+
212+
:param segment_storage: Segment storage reference.
213+
:type segment_storage: splitio.storage.SegmentStorage
214+
215+
"""
216+
self._api = segment_api
217+
self._feature_flag_storage = feature_flag_storage
218+
self._segment_storage = segment_storage
219+
self._worker_pool = workerpool.WorkerPoolAsync(_MAX_WORKERS, self.synchronize_segment)
220+
self._worker_pool.start()
221+
self._backoff = Backoff(
222+
_ON_DEMAND_FETCH_BACKOFF_BASE,
223+
_ON_DEMAND_FETCH_BACKOFF_MAX_WAIT)
224+
225+
def recreate(self):
226+
"""
227+
Create worker_pool on forked processes.
228+
229+
"""
230+
self._worker_pool = workerpool.WorkerPoolAsync(_MAX_WORKERS, self.synchronize_segment)
231+
self._worker_pool.start()
232+
233+
async def shutdown(self):
234+
"""
235+
Shutdown worker_pool
236+
237+
"""
238+
await self._worker_pool.stop()
239+
240+
async def _fetch_until(self, segment_name, fetch_options, till=None):
241+
"""
242+
Hit endpoint, update storage and return when since==till.
243+
244+
:param segment_name: Name of the segment to update.
245+
:type segment_name: str
246+
247+
:param fetch_options Fetch options for getting segment definitions.
248+
:type fetch_options splitio.api.FetchOptions
249+
250+
:param till: Passed till from Streaming.
251+
:type till: int
252+
253+
:return: last change number
254+
:rtype: int
255+
"""
256+
while True: # Fetch until since==till
257+
change_number = await self._segment_storage.get_change_number(segment_name)
258+
if change_number is None:
259+
change_number = -1
260+
if till is not None and till < change_number:
261+
# the passed till is less than change_number, no need to perform updates
262+
return change_number
263+
264+
try:
265+
segment_changes = await self._api.fetch_segment(segment_name, change_number,
266+
fetch_options)
267+
except APIException as exc:
268+
_LOGGER.error('Exception raised while fetching segment %s', segment_name)
269+
_LOGGER.debug('Exception information: ', exc_info=True)
270+
raise exc
271+
272+
if change_number == -1: # first time fetching the segment
273+
new_segment = segments.from_raw(segment_changes)
274+
await self._segment_storage.put(new_segment)
275+
else:
276+
await self._segment_storage.update(
277+
segment_name,
278+
segment_changes['added'],
279+
segment_changes['removed'],
280+
segment_changes['till']
281+
)
282+
283+
if segment_changes['till'] == segment_changes['since']:
284+
return segment_changes['till']
285+
286+
async def _attempt_segment_sync(self, segment_name, fetch_options, till=None):
287+
"""
288+
Hit endpoint, update storage and return True if sync is complete.
289+
290+
:param segment_name: Name of the segment to update.
291+
:type segment_name: str
292+
293+
:param fetch_options Fetch options for getting feature flag definitions.
294+
:type fetch_options splitio.api.FetchOptions
295+
296+
:param till: Passed till from Streaming.
297+
:type till: int
298+
299+
:return: Flags to check if it should perform bypass or operation ended
300+
:rtype: bool, int, int
301+
"""
302+
self._backoff.reset()
303+
remaining_attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES
304+
while True:
305+
remaining_attempts -= 1
306+
change_number = await self._fetch_until(segment_name, fetch_options, till)
307+
if till is None or till <= change_number:
308+
return True, remaining_attempts, change_number
309+
elif remaining_attempts <= 0:
310+
return False, remaining_attempts, change_number
311+
how_long = self._backoff.get()
312+
await asyncio.sleep(how_long)
313+
314+
async def synchronize_segment(self, segment_name, till=None):
315+
"""
316+
Update a segment from queue
317+
318+
:param segment_name: Name of the segment to update.
319+
:type segment_name: str
320+
321+
:param till: ChangeNumber received.
322+
:type till: int
323+
324+
:return: True if no error occurs. False otherwise.
325+
:rtype: bool
326+
"""
327+
fetch_options = FetchOptions(True) # Set Cache-Control to no-cache
328+
successful_sync, remaining_attempts, change_number = await self._attempt_segment_sync(segment_name, fetch_options, till)
329+
attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - remaining_attempts
330+
if successful_sync: # succedeed sync
331+
_LOGGER.debug('Refresh completed in %d attempts.', attempts)
332+
return True
333+
with_cdn_bypass = FetchOptions(True, change_number) # Set flag for bypassing CDN
334+
without_cdn_successful_sync, remaining_attempts, change_number = await self._attempt_segment_sync(segment_name, with_cdn_bypass, till)
335+
without_cdn_attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - remaining_attempts
336+
if without_cdn_successful_sync:
337+
_LOGGER.debug('Refresh completed bypassing the CDN in %d attempts.',
338+
without_cdn_attempts)
339+
return True
340+
_LOGGER.debug('No changes fetched after %d attempts with CDN bypassed.',
341+
without_cdn_attempts)
342+
return False
343+
344+
async def synchronize_segments(self, segment_names = None, dont_wait = False):
345+
"""
346+
Submit all current segments and wait for them to finish depend on dont_wait flag, then set the ready flag.
347+
348+
:param segment_names: Optional, array of segment names to update.
349+
:type segment_name: {str}
350+
351+
:param dont_wait: Optional, instruct the function to not wait for task completion
352+
:type segment_name: boolean
353+
354+
:return: True if no error occurs or dont_wait flag is True. False otherwise.
355+
:rtype: bool
356+
"""
357+
if segment_names is None:
358+
segment_names = await self._feature_flag_storage.get_segment_names()
359+
360+
for segment_name in segment_names:
361+
await self._worker_pool.submit_work(segment_name)
362+
if (dont_wait):
363+
return True
364+
await asyncio.sleep(.5)
365+
return not await self._worker_pool.wait_for_completion()
366+
367+
async def segment_exist_in_storage(self, segment_name):
368+
"""
369+
Check if a segment exists in the storage
370+
371+
:param segment_name: Name of the segment
372+
:type segment_name: str
373+
374+
:return: True if segment exist. False otherwise.
375+
:rtype: bool
376+
"""
377+
return await self._segment_storage.get(segment_name) != None
378+
379+
198380
class LocalSegmentSynchronizer(object):
199381
"""Localhost mode segment synchronizer."""
200382

201383
_DEFAULT_SEGMENT_TILL = -1
202384

203-
def __init__(self, segment_folder, split_storage, segment_storage):
385+
def __init__(self, segment_folder, feature_flag_storage, segment_storage):
204386
"""
205387
Class constructor.
206388
207389
:param segment_folder: patch to the segment folder
208390
:type segment_folder: str
209391
210-
:param split_storage: Feature flag Storage.
211-
:type split_storage: splitio.storage.InMemorySplitStorage
392+
:param feature_flag_storage: Feature flag Storage.
393+
:type feature_flag_storage: splitio.storage.InMemorySplitStorage
212394
213395
:param segment_storage: Segment storage reference.
214396
:type segment_storage: splitio.storage.SegmentStorage
215397
216398
"""
217399
self._segment_folder = segment_folder
218-
self._split_storage = split_storage
400+
self._feature_flag_storage = feature_flag_storage
219401
self._segment_storage = segment_storage
220402
self._segment_sha = {}
221403

@@ -231,7 +413,7 @@ def synchronize_segments(self, segment_names = None):
231413
"""
232414
_LOGGER.info('Synchronizing segments now.')
233415
if segment_names is None:
234-
segment_names = self._split_storage.get_segment_names()
416+
segment_names = self._feature_flag_storage.get_segment_names()
235417

236418
return_flag = True
237419
for segment_name in segment_names:

0 commit comments

Comments
 (0)