Skip to content

Commit 0a5597d

Browse files
author
Matias Melograno
committed
added clear methods
1 parent 41c169c commit 0a5597d

File tree

12 files changed

+153
-23
lines changed

12 files changed

+153
-23
lines changed

splitio/client/client.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,9 @@ def _make_evaluation(self, key, feature, attributes, method_name, metric_name):
9090
if self.destroyed:
9191
_LOGGER.error("Client has already been destroyed - no calls possible")
9292
return CONTROL, None
93+
if self._factory._waiting_fork():
94+
_LOGGER.error("Client is not ready - no calls possible")
95+
return CONTROL, None
9396

9497
start = int(round(time.time() * 1000))
9598

@@ -143,6 +146,9 @@ def _make_evaluations(self, key, features, attributes, method_name, metric_name)
143146
if self.destroyed:
144147
_LOGGER.error("Client has already been destroyed - no calls possible")
145148
return input_validator.generate_control_treatments(features, method_name)
149+
if self._factory._waiting_fork():
150+
_LOGGER.error("Client is not ready - no calls possible")
151+
return input_validator.generate_control_treatments(features, method_name)
146152

147153
start = int(round(time.time() * 1000))
148154

@@ -363,6 +369,9 @@ def track(self, key, traffic_type, event_type, value=None, properties=None):
363369
if self.destroyed:
364370
_LOGGER.error("Client has already been destroyed - no calls possible")
365371
return False
372+
if self._factory._waiting_fork():
373+
_LOGGER.error("Client is not ready - no calls possible")
374+
return False
366375

367376
key = input_validator.validate_track_key(key)
368377
event_type = input_validator.validate_event_type(event_type)

splitio/client/config.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@
5656
'machineName': None,
5757
'machineIp': None,
5858
'splitFile': os.path.join(os.path.expanduser('~'), '.split'),
59-
'shouldHandlePostFork': False,
59+
'preforkedInitialization': False,
6060
}
6161

6262

splitio/client/factory.py

Lines changed: 25 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ def __init__( # pylint: disable=too-many-arguments
9191
recorder,
9292
sync_manager=None,
9393
sdk_ready_flag=None,
94-
should_handle_post_fork=False,
94+
preforked_initialization=False,
9595
):
9696
"""
9797
Class constructor.
@@ -108,25 +108,27 @@ def __init__( # pylint: disable=too-many-arguments
108108
:type sdk_ready_flag: threading.Event
109109
:param recorder: StatsRecorder instance
110110
:type recorder: StatsRecorder
111+
:param preforked_initialization: Whether should be instantiated as preforked or not.
112+
:type preforked_initialization: bool
111113
"""
112114
self._apikey = apikey
113115
self._storages = storages
114116
self._labels_enabled = labels_enabled
115117
self._sync_manager = sync_manager
116118
self._sdk_internal_ready_flag = sdk_ready_flag
117119
self._recorder = recorder
118-
self._should_handle_post_fork = should_handle_post_fork
119-
self._start_ready_updater()
120+
self._preforked_initialization = preforked_initialization
121+
self._start_status_updater()
120122

121-
def _start_ready_updater(self):
123+
def _start_status_updater(self):
122124
"""
123-
Perform ready updater
125+
Perform status updater
124126
"""
125-
# If we have a ready flag, it means we have sync tasks that need to finish
126-
# before the SDK client becomes ready.
127-
if self._should_handle_post_fork:
127+
if self._preforked_initialization:
128128
self._status = Status.WAITING_FORK
129129
return
130+
# If we have a ready flag, it means we have sync tasks that need to finish
131+
# before the SDK client becomes ready.
130132
if self._sdk_internal_ready_flag is not None:
131133
self._sdk_ready_flag = threading.Event()
132134
self._status = Status.NOT_INITIALIZED
@@ -243,35 +245,37 @@ def destroyed(self):
243245
"""
244246
return self._status == Status.DESTROYED
245247

246-
@property
247-
def waiting_fork(self):
248+
def _waiting_fork(self):
248249
"""
249-
Return whether the factory is waiting for fork recreation or not.
250+
Return whether the factory is waiting to be recreated by forking or not.
250251
251-
:return: True if the factory is waiting for fork recreation. False otherwise.
252+
:return: True if the factory is waiting to be recreated by forking. False otherwise.
252253
:rtype: bool
253254
"""
254255
return self._status == Status.WAITING_FORK
255256

256257
def handle_post_fork(self):
257258
"""
258-
Function capable to re-synchronize splits data on fork processes.
259+
Function in charge of starting periodic/realtime synchronization after a fork.
259260
"""
260-
if not self.waiting_fork:
261+
if not self._waiting_fork():
261262
_LOGGER.warning('Cannot call handle_post_fork')
262263
return
263-
self._should_handle_post_fork = False # Reset for updater
264264
self._sync_manager.recreate()
265265
sdk_ready_flag = threading.Event()
266266
self._sdk_internal_ready_flag = sdk_ready_flag
267267
self._sync_manager._ready_flag = sdk_ready_flag
268+
self._get_storage('telemetry').clear()
269+
self._get_storage('impressions').clear()
270+
self._get_storage('events').clear()
268271
initialization_thread = threading.Thread(
269272
target=self._sync_manager.start,
270273
name="SDKInitializer",
271274
)
272275
initialization_thread.setDaemon(True)
273276
initialization_thread.start()
274-
self._start_ready_updater()
277+
self._preforked_initialization = False # reset for status updater
278+
self._start_status_updater()
275279

276280

277281
def _wrap_impression_listener(listener, metadata):
@@ -360,9 +364,9 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
360364

361365
synchronizer = Synchronizer(synchronizers, tasks)
362366

363-
should_handle_post_fork = cfg.get('shouldHandlePostFork', False)
367+
preforked_initialization = cfg.get('preforkedInitialization', False)
364368

365-
sdk_ready_flag = threading.Event() if not should_handle_post_fork else None
369+
sdk_ready_flag = threading.Event() if not preforked_initialization else None
366370
manager = Manager(sdk_ready_flag, synchronizer, apis['auth'], cfg['streamingEnabled'],
367371
streaming_api_base_url)
368372

@@ -376,10 +380,11 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
376380
storages['impressions'],
377381
)
378382

379-
if should_handle_post_fork:
383+
if preforked_initialization:
380384
synchronizer.sync_all()
385+
synchronizer._split_synchronizers._segment_sync.shutdown()
381386
return SplitFactory(api_key, storages, cfg['labelsEnabled'],
382-
recorder, manager, should_handle_post_fork=should_handle_post_fork)
387+
recorder, manager, preforked_initialization=preforked_initialization)
383388

384389
initialization_thread = threading.Thread(target=manager.start, name="SDKInitializer")
385390
initialization_thread.setDaemon(True)

splitio/client/localhost.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@ def pop_many(self, *_, **__): # pylint: disable=arguments-differ
3030
"""Accept any arguments and do nothing."""
3131
pass
3232

33+
def clear(self, *_, **__): # pylint: disable=arguments-differ
34+
"""Accept any arguments and do nothing."""
35+
pass
36+
3337

3438
class LocalhostEventsStorage(EventStorage):
3539
"""Impression storage that doesn't cache anything."""
@@ -42,6 +46,10 @@ def pop_many(self, *_, **__): # pylint: disable=arguments-differ
4246
"""Accept any arguments and do nothing."""
4347
pass
4448

49+
def clear(self, *_, **__): # pylint: disable=arguments-differ
50+
"""Accept any arguments and do nothing."""
51+
pass
52+
4553

4654
class LocalhostTelemetryStorage(TelemetryStorage):
4755
"""Impression storage that doesn't cache anything."""
@@ -69,3 +77,7 @@ def pop_counters(self, *_, **__): # pylint: disable=arguments-differ
6977
def pop_gauges(self, *_, **__): # pylint: disable=arguments-differ
7078
"""Accept any arguments and do nothing."""
7179
pass
80+
81+
def clear(self, *_, **__): # pylint: disable=arguments-differ
82+
"""Accept any arguments and do nothing."""
83+
pass

splitio/client/manager.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@ def split_names(self):
3232
if self._factory.destroyed:
3333
_LOGGER.error("Client has already been destroyed - no calls possible.")
3434
return []
35+
if self._factory._waiting_fork():
36+
_LOGGER.error("Client is not ready - no calls possible")
37+
return []
3538

3639
if not self._factory.ready:
3740
_LOGGER.warning(
@@ -51,6 +54,9 @@ def splits(self):
5154
if self._factory.destroyed:
5255
_LOGGER.error("Client has already been destroyed - no calls possible.")
5356
return []
57+
if self._factory._waiting_fork():
58+
_LOGGER.error("Client is not ready - no calls possible")
59+
return []
5460

5561
if not self._factory.ready:
5662
_LOGGER.warning(
@@ -72,7 +78,10 @@ def split(self, feature_name):
7278
"""
7379
if self._factory.destroyed:
7480
_LOGGER.error("Client has already been destroyed - no calls possible.")
75-
return []
81+
return None
82+
if self._factory._waiting_fork():
83+
_LOGGER.error("Client is not ready - no calls possible")
84+
return None
7685

7786
feature_name = input_validator.validate_manager_feature_name(
7887
feature_name,

splitio/storage/__init__.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,13 @@ def pop_many(self, count):
237237
"""
238238
pass
239239

240+
@abc.abstractmethod
241+
def clear(self):
242+
"""
243+
Clear data.
244+
"""
245+
pass
246+
240247

241248
@add_metaclass(abc.ABCMeta)
242249
class ImpressionPipelinedStorage(object):
@@ -279,6 +286,13 @@ def pop_many(self, count):
279286
"""
280287
pass
281288

289+
@abc.abstractmethod
290+
def clear(self):
291+
"""
292+
Clear data.
293+
"""
294+
pass
295+
282296

283297
@add_metaclass(abc.ABCMeta)
284298
class TelemetryStorage(object):
@@ -346,6 +360,13 @@ def pop_latencies(self):
346360
"""
347361
pass
348362

363+
@abc.abstractmethod
364+
def clear(self):
365+
"""
366+
Clear data.
367+
"""
368+
pass
369+
349370

350371
@add_metaclass(abc.ABCMeta)
351372
class TelemetryPipelinedStorage(object):

splitio/storage/inmemmory.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -291,6 +291,7 @@ def __init__(self, queue_size):
291291
292292
:param eventsQueueSize: How many events to queue before forcing a submission
293293
"""
294+
self._queue_size = queue_size
294295
self._impressions = queue.Queue(maxsize=queue_size)
295296
self._lock = threading.Lock()
296297
self._queue_full_hook = None
@@ -339,6 +340,13 @@ def pop_many(self, count):
339340
count -= 1
340341
return impressions
341342

343+
def clear(self):
344+
"""
345+
Clear data.
346+
"""
347+
with self._lock:
348+
self._impressions = queue.Queue(maxsize=self._queue_size)
349+
342350

343351
class InMemoryEventStorage(EventStorage):
344352
"""
@@ -353,6 +361,7 @@ def __init__(self, eventsQueueSize):
353361
354362
:param eventsQueueSize: How many events to queue before forcing a submission
355363
"""
364+
self._queue_size = eventsQueueSize
356365
self._lock = threading.Lock()
357366
self._events = queue.Queue(maxsize=eventsQueueSize)
358367
self._queue_full_hook = None
@@ -407,6 +416,13 @@ def pop_many(self, count):
407416
self._size = 0
408417
return events
409418

419+
def clear(self):
420+
"""
421+
Clear data.
422+
"""
423+
with self._lock:
424+
self._events = queue.Queue(maxsize=self._queue_size)
425+
410426

411427
class InMemoryTelemetryStorage(TelemetryStorage):
412428
"""In-Memory implementation of telemetry storage interface."""
@@ -498,3 +514,14 @@ def pop_latencies(self):
498514
return self._latencies
499515
finally:
500516
self._latencies = {}
517+
518+
def clear(self):
519+
"""
520+
Clear data.
521+
"""
522+
with self._latencies_lock:
523+
self._latencies = {}
524+
with self._gauges_lock:
525+
self._gauges = {}
526+
with self._counters_lock:
527+
self._counters = {}

splitio/storage/redis.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -453,6 +453,12 @@ def pop_many(self, count):
453453
"""
454454
raise NotImplementedError('Only redis-consumer mode is supported.')
455455

456+
def clear(self):
457+
"""
458+
Clear data.
459+
"""
460+
raise NotImplementedError('Not supported for redis.')
461+
456462

457463
class RedisEventsStorage(EventStorage):
458464
"""Redis based event storage class."""
@@ -517,6 +523,12 @@ def pop_many(self, count):
517523
"""
518524
raise NotImplementedError('Only redis-consumer mode is supported.')
519525

526+
def clear(self):
527+
"""
528+
Clear data.
529+
"""
530+
raise NotImplementedError('Not supported for redis.')
531+
520532

521533
class RedisTelemetryStorage(TelemetryStorage, TelemetryPipelinedStorage):
522534
"""Redis-based Telemetry storage."""
@@ -694,3 +706,9 @@ def pop_latencies(self):
694706
:rtype: list
695707
"""
696708
raise NotImplementedError('Only redis-consumer mode is supported.')
709+
710+
def clear(self):
711+
"""
712+
Clear data.
713+
"""
714+
raise NotImplementedError('Not supported for redis.')

splitio/storage/uwsgi.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -511,6 +511,12 @@ def acknowledge_flush(self):
511511
"""Acknowledge that a flush has been requested."""
512512
self._uwsgi.cache_del(self._IMPRESSIONS_FLUSH, _SPLITIO_LOCK_CACHE_NAMESPACE)
513513

514+
def clear(self):
515+
"""
516+
Clear data.
517+
"""
518+
raise NotImplementedError('Not supported for uwsgi.')
519+
514520

515521
class UWSGIEventStorage(EventStorage):
516522
"""Events storage interface."""
@@ -602,6 +608,12 @@ def acknowledge_flush(self):
602608
"""Acknowledge that a flush has been requested."""
603609
self._uwsgi.cache_del(self._EVENTS_FLUSH, _SPLITIO_LOCK_CACHE_NAMESPACE)
604610

611+
def clear(self):
612+
"""
613+
Clear data.
614+
"""
615+
raise NotImplementedError('Not supported for uwsgi.')
616+
605617

606618
class UWSGITelemetryStorage(TelemetryStorage):
607619
"""Telemetry storage interface."""
@@ -726,3 +738,9 @@ def pop_latencies(self):
726738
self._LATENCIES_KEY, _SPLITIO_METRICS_CACHE_NAMESPACE)
727739
self._uwsgi.cache_del(self._LATENCIES_KEY, _SPLITIO_METRICS_CACHE_NAMESPACE)
728740
return json.loads(latencies_raw) if latencies_raw else {}
741+
742+
def clear(self):
743+
"""
744+
Clear data.
745+
"""
746+
raise NotImplementedError('Not supported for uwsgi.')

splitio/sync/segment.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ def recreate(self):
3535
Create worker_pool on forked processes.
3636
3737
"""
38-
self.shutdown()
3938
self._worker_pool = workerpool.WorkerPool(10, self.synchronize_segment)
4039
self._worker_pool.start()
4140

0 commit comments

Comments
 (0)