Skip to content

Commit 1b35d9d

Browse files
authored
Merge pull request #423 from splitio/async-sync-split
added sync split synchronizer async class
2 parents e1d4da8 + d4b5757 commit 1b35d9d

File tree

2 files changed

+341
-104
lines changed

2 files changed

+341
-104
lines changed

splitio/sync/split.py

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
from splitio.util.backoff import Backoff
1515
from splitio.util.time import get_current_epoch_time_ms
1616
from splitio.sync import util
17+
from splitio.optional.loaders import asyncio
1718

1819
_LEGACY_COMMENT_LINE_RE = re.compile(r'^#.*$')
1920
_LEGACY_DEFINITION_LINE_RE = re.compile(r'^(?<![^#])(?P<feature>[\w_-]+)\s+(?P<treatment>[\w_-]+)$')
@@ -154,6 +155,135 @@ def kill_split(self, split_name, default_treatment, change_number):
154155
"""
155156
self._split_storage.kill_locally(split_name, default_treatment, change_number)
156157

158+
159+
class SplitSynchronizerAsync(object):
160+
"""Feature Flag changes synchronizer async."""
161+
162+
def __init__(self, split_api, split_storage):
163+
"""
164+
Class constructor.
165+
166+
:param split_api: Feature Flag API Client.
167+
:type split_api: splitio.api.splits.SplitsAPI
168+
169+
:param split_storage: Feature Flag Storage.
170+
:type split_storage: splitio.storage.InMemorySplitStorage
171+
"""
172+
self._api = split_api
173+
self._split_storage = split_storage
174+
self._backoff = Backoff(
175+
_ON_DEMAND_FETCH_BACKOFF_BASE,
176+
_ON_DEMAND_FETCH_BACKOFF_MAX_WAIT)
177+
178+
async def _fetch_until(self, fetch_options, till=None):
179+
"""
180+
Hit endpoint, update storage and return when since==till.
181+
182+
:param fetch_options Fetch options for getting feature flag definitions.
183+
:type fetch_options splitio.api.FetchOptions
184+
185+
:param till: Passed till from Streaming.
186+
:type till: int
187+
188+
:return: last change number
189+
:rtype: int
190+
"""
191+
segment_list = set()
192+
while True: # Fetch until since==till
193+
change_number = await self._split_storage.get_change_number()
194+
if change_number is None:
195+
change_number = -1
196+
if till is not None and till < change_number:
197+
# the passed till is less than change_number, no need to perform updates
198+
return change_number, segment_list
199+
200+
try:
201+
split_changes = await self._api.fetch_splits(change_number, fetch_options)
202+
except APIException as exc:
203+
_LOGGER.error('Exception raised while fetching feature flags')
204+
_LOGGER.debug('Exception information: ', exc_info=True)
205+
raise exc
206+
207+
for split in split_changes.get('splits', []):
208+
if split['status'] == splits.Status.ACTIVE.value:
209+
parsed = splits.from_raw(split)
210+
await self._split_storage.put(parsed)
211+
segment_list.update(set(parsed.get_segment_names()))
212+
else:
213+
await self._split_storage.remove(split['name'])
214+
await self._split_storage.set_change_number(split_changes['till'])
215+
if split_changes['till'] == split_changes['since']:
216+
return split_changes['till'], segment_list
217+
218+
async def _attempt_split_sync(self, fetch_options, till=None):
219+
"""
220+
Hit endpoint, update storage and return True if sync is complete.
221+
222+
:param fetch_options Fetch options for getting feature flag definitions.
223+
:type fetch_options splitio.api.FetchOptions
224+
225+
:param till: Passed till from Streaming.
226+
:type till: int
227+
228+
:return: Flags to check if it should perform bypass or operation ended
229+
:rtype: bool, int, int
230+
"""
231+
self._backoff.reset()
232+
final_segment_list = set()
233+
remaining_attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES
234+
while True:
235+
remaining_attempts -= 1
236+
change_number, segment_list = await self._fetch_until(fetch_options, till)
237+
final_segment_list.update(segment_list)
238+
if till is None or till <= change_number:
239+
return True, remaining_attempts, change_number, final_segment_list
240+
elif remaining_attempts <= 0:
241+
return False, remaining_attempts, change_number, final_segment_list
242+
how_long = self._backoff.get()
243+
await asyncio.sleep(how_long)
244+
245+
async def synchronize_splits(self, till=None):
246+
"""
247+
Hit endpoint, update storage and return True if sync is complete.
248+
249+
:param till: Passed till from Streaming.
250+
:type till: int
251+
"""
252+
final_segment_list = set()
253+
fetch_options = FetchOptions(True) # Set Cache-Control to no-cache
254+
successful_sync, remaining_attempts, change_number, segment_list = await self._attempt_split_sync(fetch_options,
255+
till)
256+
final_segment_list.update(segment_list)
257+
attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - remaining_attempts
258+
if successful_sync: # succedeed sync
259+
_LOGGER.debug('Refresh completed in %d attempts.', attempts)
260+
return final_segment_list
261+
with_cdn_bypass = FetchOptions(True, change_number) # Set flag for bypassing CDN
262+
without_cdn_successful_sync, remaining_attempts, change_number, segment_list = await self._attempt_split_sync(with_cdn_bypass, till)
263+
final_segment_list.update(segment_list)
264+
without_cdn_attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - remaining_attempts
265+
if without_cdn_successful_sync:
266+
_LOGGER.debug('Refresh completed bypassing the CDN in %d attempts.',
267+
without_cdn_attempts)
268+
return final_segment_list
269+
else:
270+
_LOGGER.debug('No changes fetched after %d attempts with CDN bypassed.',
271+
without_cdn_attempts)
272+
273+
async def kill_split(self, split_name, default_treatment, change_number):
274+
"""
275+
Local kill for feature flag.
276+
277+
:param split_name: name of the feature flag to perform kill
278+
:type split_name: str
279+
:param default_treatment: name of the default treatment to return
280+
:type default_treatment: str
281+
:param change_number: change_number
282+
:type change_number: int
283+
"""
284+
await self._split_storage.kill_locally(split_name, default_treatment, change_number)
285+
286+
157287
class LocalhostMode(Enum):
158288
"""types for localhost modes"""
159289
LEGACY = 0

0 commit comments

Comments
 (0)