88from splitio .tasks .util import workerpool
99from splitio .models import segments
1010from splitio .util .backoff import Backoff
11+ from splitio .optional .loaders import asyncio , aiofiles
1112from splitio .sync import util
1213
1314_LOGGER = logging .getLogger (__name__ )
@@ -195,27 +196,57 @@ def segment_exist_in_storage(self, segment_name):
195196 """
196197 return self ._segment_storage .get (segment_name ) != None
197198
198- class LocalSegmentSynchronizer (object ):
199- """Localhost mode segment synchronizer."""
199+ class LocalSegmentSynchronizerBase (object ):
200+ """Localhost mode segment base synchronizer."""
200201
201202 _DEFAULT_SEGMENT_TILL = - 1
202203
203- def __init__ (self , segment_folder , split_storage , segment_storage ):
204+ def _sanitize_segment (self , parsed ):
205+ """
206+ Sanitize json elements.
207+
208+ :param parsed: segment dict
209+ :type parsed: Dict
210+
211+ :return: sanitized segment structure dict
212+ :rtype: Dict
213+ """
214+ if 'name' not in parsed or parsed ['name' ] is None :
215+ _LOGGER .warning ("Segment does not have [name] element, skipping" )
216+ raise Exception ("Segment does not have [name] element" )
217+ if parsed ['name' ].strip () == '' :
218+ _LOGGER .warning ("Segment [name] element is blank, skipping" )
219+ raise Exception ("Segment [name] element is blank" )
220+
221+ for element in [('till' , - 1 , - 1 , None , None , [0 ]),
222+ ('added' , [], None , None , None , None ),
223+ ('removed' , [], None , None , None , None )
224+ ]:
225+ parsed = util ._sanitize_object_element (parsed , 'segment' , element [0 ], element [1 ], lower_value = element [2 ], upper_value = element [3 ], in_list = None , not_in_list = element [5 ])
226+ parsed = util ._sanitize_object_element (parsed , 'segment' , 'since' , parsed ['till' ], - 1 , parsed ['till' ], None , [0 ])
227+
228+ return parsed
229+
230+
231+ class LocalSegmentSynchronizer (LocalSegmentSynchronizerBase ):
232+ """Localhost mode segment synchronizer."""
233+
234+ def __init__ (self , segment_folder , feature_flag_storage , segment_storage ):
204235 """
205236 Class constructor.
206237
207238 :param segment_folder: patch to the segment folder
208239 :type segment_folder: str
209240
210- :param split_storage : Feature flag Storage.
211- :type split_storage : splitio.storage.InMemorySplitStorage
241+ :param feature_flag_storage : Feature flag Storage.
242+ :type feature_flag_storage : splitio.storage.InMemorySplitStorage
212243
213244 :param segment_storage: Segment storage reference.
214245 :type segment_storage: splitio.storage.SegmentStorage
215246
216247 """
217248 self ._segment_folder = segment_folder
218- self ._split_storage = split_storage
249+ self ._feature_flag_storage = feature_flag_storage
219250 self ._segment_storage = segment_storage
220251 self ._segment_sha = {}
221252
@@ -231,7 +262,7 @@ def synchronize_segments(self, segment_names = None):
231262 """
232263 _LOGGER .info ('Synchronizing segments now.' )
233264 if segment_names is None :
234- segment_names = self ._split_storage .get_segment_names ()
265+ segment_names = self ._feature_flag_storage .get_segment_names ()
235266
236267 return_flag = True
237268 for segment_name in segment_names :
@@ -295,33 +326,118 @@ def _read_segment_from_json_file(self, filename):
295326 except Exception as exc :
296327 raise ValueError ("Error parsing file %s. Make sure it's readable." % filename ) from exc
297328
298- def _sanitize_segment (self , parsed ):
329+ def segment_exist_in_storage (self , segment_name ):
299330 """
300- Sanitize json elements.
331+ Check if a segment exists in the storage
301332
302- :param parsed: segment dict
303- :type parsed: Dict
333+ :param segment_name: Name of the segment
334+ :type segment_name: str
304335
305- :return: sanitized segment structure dict
306- :rtype: Dict
336+ :return: True if segment exist. False otherwise.
337+ :rtype: bool
307338 """
308- if 'name' not in parsed or parsed ['name' ] is None :
309- _LOGGER .warning ("Segment does not have [name] element, skipping" )
310- raise Exception ("Segment does not have [name] element" )
311- if parsed ['name' ].strip () == '' :
312- _LOGGER .warning ("Segment [name] element is blank, skipping" )
313- raise Exception ("Segment [name] element is blank" )
339+ return self ._segment_storage .get (segment_name ) != None
314340
315- for element in [('till' , - 1 , - 1 , None , None , [0 ]),
316- ('added' , [], None , None , None , None ),
317- ('removed' , [], None , None , None , None )
318- ]:
319- parsed = util ._sanitize_object_element (parsed , 'segment' , element [0 ], element [1 ], lower_value = element [2 ], upper_value = element [3 ], in_list = None , not_in_list = element [5 ])
320- parsed = util ._sanitize_object_element (parsed , 'segment' , 'since' , parsed ['till' ], - 1 , parsed ['till' ], None , [0 ])
321341
322- return parsed
342+ class LocalSegmentSynchronizerAsync (LocalSegmentSynchronizerBase ):
343+ """Localhost mode segment async synchronizer."""
344+
345+ def __init__ (self , segment_folder , feature_flag_storage , segment_storage ):
346+ """
347+ Class constructor.
348+
349+ :param segment_folder: patch to the segment folder
350+ :type segment_folder: str
351+
352+ :param feature_flag_storage: Feature flag Storage.
353+ :type feature_flag_storage: splitio.storage.InMemorySplitStorage
354+
355+ :param segment_storage: Segment storage reference.
356+ :type segment_storage: splitio.storage.SegmentStorage
323357
324- def segment_exist_in_storage (self , segment_name ):
358+ """
359+ self ._segment_folder = segment_folder
360+ self ._feature_flag_storage = feature_flag_storage
361+ self ._segment_storage = segment_storage
362+ self ._segment_sha = {}
363+
364+ async def synchronize_segments (self , segment_names = None ):
365+ """
366+ Loop through given segment names and synchronize each one.
367+
368+ :param segment_names: Optional, array of segment names to update.
369+ :type segment_name: {str}
370+
371+ :return: True if no error occurs. False otherwise.
372+ :rtype: bool
373+ """
374+ _LOGGER .info ('Synchronizing segments now.' )
375+ if segment_names is None :
376+ segment_names = await self ._feature_flag_storage .get_segment_names ()
377+
378+ return_flag = True
379+ for segment_name in segment_names :
380+ if not await self .synchronize_segment (segment_name ):
381+ return_flag = False
382+
383+ return return_flag
384+
385+ async def synchronize_segment (self , segment_name , till = None ):
386+ """
387+ Update a segment from queue
388+
389+ :param segment_name: Name of the segment to update.
390+ :type segment_name: str
391+
392+ :param till: ChangeNumber received.
393+ :type till: int
394+
395+ :return: True if no error occurs. False otherwise.
396+ :rtype: bool
397+ """
398+ try :
399+ fetched = await self ._read_segment_from_json_file (segment_name )
400+ fetched_sha = util ._get_sha (json .dumps (fetched ))
401+ if not await self .segment_exist_in_storage (segment_name ):
402+ self ._segment_sha [segment_name ] = fetched_sha
403+ await self ._segment_storage .put (segments .from_raw (fetched ))
404+ _LOGGER .debug ("segment %s is added to storage" , segment_name )
405+ return True
406+
407+ if fetched_sha == self ._segment_sha [segment_name ]:
408+ return True
409+
410+ self ._segment_sha [segment_name ] = fetched_sha
411+ if await self ._segment_storage .get_change_number (segment_name ) > fetched ['till' ] and fetched ['till' ] != self ._DEFAULT_SEGMENT_TILL :
412+ return True
413+
414+ await self ._segment_storage .update (segment_name , fetched ['added' ], fetched ['removed' ], fetched ['till' ])
415+ _LOGGER .debug ("segment %s is updated" , segment_name )
416+ except Exception as e :
417+ _LOGGER .error ("Could not fetch segment: %s \n " + str (e ), segment_name )
418+ return False
419+
420+ return True
421+
422+ async def _read_segment_from_json_file (self , filename ):
423+ """
424+ Parse a segment and store in segment storage.
425+
426+ :param filename: Path of the file containing Feature flag
427+ :type filename: str.
428+
429+ :return: Sanitized segment structure
430+ :rtype: Dict
431+ """
432+ try :
433+ async with aiofiles .open (os .path .join (self ._segment_folder , "%s.json" % filename ), 'r' ) as flo :
434+ parsed = json .loads (await flo .read ())
435+ santitized_segment = self ._sanitize_segment (parsed )
436+ return santitized_segment
437+ except Exception as exc :
438+ raise ValueError ("Error parsing file %s. Make sure it's readable." % filename ) from exc
439+
440+ async def segment_exist_in_storage (self , segment_name ):
325441 """
326442 Check if a segment exists in the storage
327443
@@ -331,4 +447,4 @@ def segment_exist_in_storage(self, segment_name):
331447 :return: True if segment exist. False otherwise.
332448 :rtype: bool
333449 """
334- return self ._segment_storage .get (segment_name ) != None
450+ return await self ._segment_storage .get (segment_name ) != None
0 commit comments