|
14 | 14 | from splitio.util.backoff import Backoff |
15 | 15 | from splitio.util.time import get_current_epoch_time_ms |
16 | 16 | from splitio.sync import util |
| 17 | +from splitio.optional.loaders import asyncio |
17 | 18 |
|
18 | 19 | _LEGACY_COMMENT_LINE_RE = re.compile(r'^#.*$') |
19 | 20 | _LEGACY_DEFINITION_LINE_RE = re.compile(r'^(?<![^#])(?P<feature>[\w_-]+)\s+(?P<treatment>[\w_-]+)$') |
@@ -154,6 +155,135 @@ def kill_split(self, split_name, default_treatment, change_number): |
154 | 155 | """ |
155 | 156 | self._split_storage.kill_locally(split_name, default_treatment, change_number) |
156 | 157 |
|
| 158 | + |
| 159 | +class SplitSynchronizerAsync(object): |
| 160 | + """Feature Flag changes synchronizer async.""" |
| 161 | + |
| 162 | + def __init__(self, split_api, split_storage): |
| 163 | + """ |
| 164 | + Class constructor. |
| 165 | +
|
| 166 | + :param split_api: Feature Flag API Client. |
| 167 | + :type split_api: splitio.api.splits.SplitsAPI |
| 168 | +
|
| 169 | + :param split_storage: Feature Flag Storage. |
| 170 | + :type split_storage: splitio.storage.InMemorySplitStorage |
| 171 | + """ |
| 172 | + self._api = split_api |
| 173 | + self._split_storage = split_storage |
| 174 | + self._backoff = Backoff( |
| 175 | + _ON_DEMAND_FETCH_BACKOFF_BASE, |
| 176 | + _ON_DEMAND_FETCH_BACKOFF_MAX_WAIT) |
| 177 | + |
| 178 | + async def _fetch_until(self, fetch_options, till=None): |
| 179 | + """ |
| 180 | + Hit endpoint, update storage and return when since==till. |
| 181 | +
|
| 182 | + :param fetch_options Fetch options for getting feature flag definitions. |
| 183 | + :type fetch_options splitio.api.FetchOptions |
| 184 | +
|
| 185 | + :param till: Passed till from Streaming. |
| 186 | + :type till: int |
| 187 | +
|
| 188 | + :return: last change number |
| 189 | + :rtype: int |
| 190 | + """ |
| 191 | + segment_list = set() |
| 192 | + while True: # Fetch until since==till |
| 193 | + change_number = await self._split_storage.get_change_number() |
| 194 | + if change_number is None: |
| 195 | + change_number = -1 |
| 196 | + if till is not None and till < change_number: |
| 197 | + # the passed till is less than change_number, no need to perform updates |
| 198 | + return change_number, segment_list |
| 199 | + |
| 200 | + try: |
| 201 | + split_changes = await self._api.fetch_splits(change_number, fetch_options) |
| 202 | + except APIException as exc: |
| 203 | + _LOGGER.error('Exception raised while fetching feature flags') |
| 204 | + _LOGGER.debug('Exception information: ', exc_info=True) |
| 205 | + raise exc |
| 206 | + |
| 207 | + for split in split_changes.get('splits', []): |
| 208 | + if split['status'] == splits.Status.ACTIVE.value: |
| 209 | + parsed = splits.from_raw(split) |
| 210 | + await self._split_storage.put(parsed) |
| 211 | + segment_list.update(set(parsed.get_segment_names())) |
| 212 | + else: |
| 213 | + await self._split_storage.remove(split['name']) |
| 214 | + await self._split_storage.set_change_number(split_changes['till']) |
| 215 | + if split_changes['till'] == split_changes['since']: |
| 216 | + return split_changes['till'], segment_list |
| 217 | + |
| 218 | + async def _attempt_split_sync(self, fetch_options, till=None): |
| 219 | + """ |
| 220 | + Hit endpoint, update storage and return True if sync is complete. |
| 221 | +
|
| 222 | + :param fetch_options Fetch options for getting feature flag definitions. |
| 223 | + :type fetch_options splitio.api.FetchOptions |
| 224 | +
|
| 225 | + :param till: Passed till from Streaming. |
| 226 | + :type till: int |
| 227 | +
|
| 228 | + :return: Flags to check if it should perform bypass or operation ended |
| 229 | + :rtype: bool, int, int |
| 230 | + """ |
| 231 | + self._backoff.reset() |
| 232 | + final_segment_list = set() |
| 233 | + remaining_attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES |
| 234 | + while True: |
| 235 | + remaining_attempts -= 1 |
| 236 | + change_number, segment_list = await self._fetch_until(fetch_options, till) |
| 237 | + final_segment_list.update(segment_list) |
| 238 | + if till is None or till <= change_number: |
| 239 | + return True, remaining_attempts, change_number, final_segment_list |
| 240 | + elif remaining_attempts <= 0: |
| 241 | + return False, remaining_attempts, change_number, final_segment_list |
| 242 | + how_long = self._backoff.get() |
| 243 | + await asyncio.sleep(how_long) |
| 244 | + |
| 245 | + async def synchronize_splits(self, till=None): |
| 246 | + """ |
| 247 | + Hit endpoint, update storage and return True if sync is complete. |
| 248 | +
|
| 249 | + :param till: Passed till from Streaming. |
| 250 | + :type till: int |
| 251 | + """ |
| 252 | + final_segment_list = set() |
| 253 | + fetch_options = FetchOptions(True) # Set Cache-Control to no-cache |
| 254 | + successful_sync, remaining_attempts, change_number, segment_list = await self._attempt_split_sync(fetch_options, |
| 255 | + till) |
| 256 | + final_segment_list.update(segment_list) |
| 257 | + attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - remaining_attempts |
| 258 | + if successful_sync: # succedeed sync |
| 259 | + _LOGGER.debug('Refresh completed in %d attempts.', attempts) |
| 260 | + return final_segment_list |
| 261 | + with_cdn_bypass = FetchOptions(True, change_number) # Set flag for bypassing CDN |
| 262 | + without_cdn_successful_sync, remaining_attempts, change_number, segment_list = await self._attempt_split_sync(with_cdn_bypass, till) |
| 263 | + final_segment_list.update(segment_list) |
| 264 | + without_cdn_attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - remaining_attempts |
| 265 | + if without_cdn_successful_sync: |
| 266 | + _LOGGER.debug('Refresh completed bypassing the CDN in %d attempts.', |
| 267 | + without_cdn_attempts) |
| 268 | + return final_segment_list |
| 269 | + else: |
| 270 | + _LOGGER.debug('No changes fetched after %d attempts with CDN bypassed.', |
| 271 | + without_cdn_attempts) |
| 272 | + |
| 273 | + async def kill_split(self, split_name, default_treatment, change_number): |
| 274 | + """ |
| 275 | + Local kill for feature flag. |
| 276 | +
|
| 277 | + :param split_name: name of the feature flag to perform kill |
| 278 | + :type split_name: str |
| 279 | + :param default_treatment: name of the default treatment to return |
| 280 | + :type default_treatment: str |
| 281 | + :param change_number: change_number |
| 282 | + :type change_number: int |
| 283 | + """ |
| 284 | + await self._split_storage.kill_locally(split_name, default_treatment, change_number) |
| 285 | + |
| 286 | + |
157 | 287 | class LocalhostMode(Enum): |
158 | 288 | """types for localhost modes""" |
159 | 289 | LEGACY = 0 |
|
0 commit comments