Skip to content

Commit 2dea20e

Browse files
authored
Merge pull request #217 from splitio/feature/uwsgihooks
initial handle_fork
2 parents cfc8411 + 0d308de commit 2dea20e

19 files changed

+426
-11
lines changed

CHANGES.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
8.3.2 (Dec XX, 2020)
22
- Added RecordStats for supporting pipelined recording in redis when treatment call is made.
3+
- Added hooks support for UWSGI.
34

45
8.3.1 (Nov 20, 2020)
56
- Fixed error handling when split server fails, so that it doesn't bring streaming down.

splitio/client/client.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,9 @@ def _make_evaluation(self, key, feature, attributes, method_name, metric_name):
9090
if self.destroyed:
9191
_LOGGER.error("Client has already been destroyed - no calls possible")
9292
return CONTROL, None
93+
if self._factory._waiting_fork():
94+
_LOGGER.error("Client is not ready - no calls possible")
95+
return CONTROL, None
9396

9497
start = int(round(time.time() * 1000))
9598

@@ -143,6 +146,9 @@ def _make_evaluations(self, key, features, attributes, method_name, metric_name)
143146
if self.destroyed:
144147
_LOGGER.error("Client has already been destroyed - no calls possible")
145148
return input_validator.generate_control_treatments(features, method_name)
149+
if self._factory._waiting_fork():
150+
_LOGGER.error("Client is not ready - no calls possible")
151+
return input_validator.generate_control_treatments(features, method_name)
146152

147153
start = int(round(time.time() * 1000))
148154

@@ -363,6 +369,9 @@ def track(self, key, traffic_type, event_type, value=None, properties=None):
363369
if self.destroyed:
364370
_LOGGER.error("Client has already been destroyed - no calls possible")
365371
return False
372+
if self._factory._waiting_fork():
373+
_LOGGER.error("Client is not ready - no calls possible")
374+
return False
366375

367376
key = input_validator.validate_track_key(key)
368377
event_type = input_validator.validate_event_type(event_type)

splitio/client/config.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,8 @@
5555
'redisMaxConnections': None,
5656
'machineName': None,
5757
'machineIp': None,
58-
'splitFile': os.path.join(os.path.expanduser('~'), '.split')
58+
'splitFile': os.path.join(os.path.expanduser('~'), '.split'),
59+
'preforkedInitialization': False,
5960
}
6061

6162

splitio/client/factory.py

Lines changed: 64 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ class Status(Enum):
7171
NOT_INITIALIZED = 'NOT_INITIALIZED'
7272
READY = 'READY'
7373
DESTROYED = 'DESTROYED'
74+
WAITING_FORK = 'WAITING_FORK'
7475

7576

7677
class TimeoutException(Exception):
@@ -90,6 +91,7 @@ def __init__( # pylint: disable=too-many-arguments
9091
recorder,
9192
sync_manager=None,
9293
sdk_ready_flag=None,
94+
preforked_initialization=False,
9395
):
9496
"""
9597
Class constructor.
@@ -106,18 +108,29 @@ def __init__( # pylint: disable=too-many-arguments
106108
:type sdk_ready_flag: threading.Event
107109
:param recorder: StatsRecorder instance
108110
:type recorder: StatsRecorder
111+
:param preforked_initialization: Whether should be instantiated as preforked or not.
112+
:type preforked_initialization: bool
109113
"""
110114
self._apikey = apikey
111115
self._storages = storages
112116
self._labels_enabled = labels_enabled
113117
self._sync_manager = sync_manager
114118
self._sdk_internal_ready_flag = sdk_ready_flag
115-
self._sdk_ready_flag = threading.Event()
116119
self._recorder = recorder
120+
self._preforked_initialization = preforked_initialization
121+
self._start_status_updater()
117122

123+
def _start_status_updater(self):
124+
"""
125+
Perform status updater
126+
"""
127+
if self._preforked_initialization:
128+
self._status = Status.WAITING_FORK
129+
return
118130
# If we have a ready flag, it means we have sync tasks that need to finish
119131
# before the SDK client becomes ready.
120132
if self._sdk_internal_ready_flag is not None:
133+
self._sdk_ready_flag = threading.Event()
121134
self._status = Status.NOT_INITIALIZED
122135
# add a listener that updates the status to READY once the flag is set.
123136
ready_updater = threading.Thread(target=self._update_status_when_ready,
@@ -232,6 +245,38 @@ def destroyed(self):
232245
"""
233246
return self._status == Status.DESTROYED
234247

248+
def _waiting_fork(self):
249+
"""
250+
Return whether the factory is waiting to be recreated by forking or not.
251+
252+
:return: True if the factory is waiting to be recreated by forking. False otherwise.
253+
:rtype: bool
254+
"""
255+
return self._status == Status.WAITING_FORK
256+
257+
def handle_post_fork(self):
258+
"""
259+
Function in charge of starting periodic/realtime synchronization after a fork.
260+
"""
261+
if not self._waiting_fork():
262+
_LOGGER.warning('Cannot call handle_post_fork')
263+
return
264+
self._sync_manager.recreate()
265+
sdk_ready_flag = threading.Event()
266+
self._sdk_internal_ready_flag = sdk_ready_flag
267+
self._sync_manager._ready_flag = sdk_ready_flag
268+
self._get_storage('telemetry').clear()
269+
self._get_storage('impressions').clear()
270+
self._get_storage('events').clear()
271+
initialization_thread = threading.Thread(
272+
target=self._sync_manager.start,
273+
name="SDKInitializer",
274+
)
275+
initialization_thread.setDaemon(True)
276+
initialization_thread.start()
277+
self._preforked_initialization = False # reset for status updater
278+
self._start_status_updater()
279+
235280

236281
def _wrap_impression_listener(listener, metadata):
237282
"""
@@ -319,14 +364,12 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
319364

320365
synchronizer = Synchronizer(synchronizers, tasks)
321366

322-
sdk_ready_flag = threading.Event()
367+
preforked_initialization = cfg.get('preforkedInitialization', False)
368+
369+
sdk_ready_flag = threading.Event() if not preforked_initialization else None
323370
manager = Manager(sdk_ready_flag, synchronizer, apis['auth'], cfg['streamingEnabled'],
324371
streaming_api_base_url)
325372

326-
initialization_thread = threading.Thread(target=manager.start, name="SDKInitializer")
327-
initialization_thread.setDaemon(True)
328-
initialization_thread.start()
329-
330373
storages['events'].set_queue_full_hook(tasks.events_task.flush)
331374
storages['impressions'].set_queue_full_hook(tasks.impressions_task.flush)
332375

@@ -336,6 +379,17 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
336379
storages['events'],
337380
storages['impressions'],
338381
)
382+
383+
if preforked_initialization:
384+
synchronizer.sync_all()
385+
synchronizer._split_synchronizers._segment_sync.shutdown()
386+
return SplitFactory(api_key, storages, cfg['labelsEnabled'],
387+
recorder, manager, preforked_initialization=preforked_initialization)
388+
389+
initialization_thread = threading.Thread(target=manager.start, name="SDKInitializer")
390+
initialization_thread.setDaemon(True)
391+
initialization_thread.start()
392+
339393
return SplitFactory(api_key, storages, cfg['labelsEnabled'],
340394
recorder, manager, sdk_ready_flag)
341395

@@ -387,6 +441,10 @@ def _build_uwsgi_factory(api_key, cfg):
387441
storages['events'],
388442
storages['impressions'],
389443
)
444+
_LOGGER.warning(
445+
"Beware: uwsgi-cache based operation mode is soon to be deprecated. Please consider " +
446+
"redis if you need a centralized point of syncrhonization, or in-memory (with preforking " +
447+
"support enabled) if running uwsgi with a master and several http workers)")
390448
return SplitFactory(
391449
api_key,
392450
storages,

splitio/client/localhost.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@ def pop_many(self, *_, **__): # pylint: disable=arguments-differ
3030
"""Accept any arguments and do nothing."""
3131
pass
3232

33+
def clear(self, *_, **__): # pylint: disable=arguments-differ
34+
"""Accept any arguments and do nothing."""
35+
pass
36+
3337

3438
class LocalhostEventsStorage(EventStorage):
3539
"""Impression storage that doesn't cache anything."""
@@ -42,6 +46,10 @@ def pop_many(self, *_, **__): # pylint: disable=arguments-differ
4246
"""Accept any arguments and do nothing."""
4347
pass
4448

49+
def clear(self, *_, **__): # pylint: disable=arguments-differ
50+
"""Accept any arguments and do nothing."""
51+
pass
52+
4553

4654
class LocalhostTelemetryStorage(TelemetryStorage):
4755
"""Impression storage that doesn't cache anything."""
@@ -69,3 +77,7 @@ def pop_counters(self, *_, **__): # pylint: disable=arguments-differ
6977
def pop_gauges(self, *_, **__): # pylint: disable=arguments-differ
7078
"""Accept any arguments and do nothing."""
7179
pass
80+
81+
def clear(self, *_, **__): # pylint: disable=arguments-differ
82+
"""Accept any arguments and do nothing."""
83+
pass

splitio/client/manager.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@ def split_names(self):
3232
if self._factory.destroyed:
3333
_LOGGER.error("Client has already been destroyed - no calls possible.")
3434
return []
35+
if self._factory._waiting_fork():
36+
_LOGGER.error("Client is not ready - no calls possible")
37+
return []
3538

3639
if not self._factory.ready:
3740
_LOGGER.warning(
@@ -51,6 +54,9 @@ def splits(self):
5154
if self._factory.destroyed:
5255
_LOGGER.error("Client has already been destroyed - no calls possible.")
5356
return []
57+
if self._factory._waiting_fork():
58+
_LOGGER.error("Client is not ready - no calls possible")
59+
return []
5460

5561
if not self._factory.ready:
5662
_LOGGER.warning(
@@ -72,7 +78,10 @@ def split(self, feature_name):
7278
"""
7379
if self._factory.destroyed:
7480
_LOGGER.error("Client has already been destroyed - no calls possible.")
75-
return []
81+
return None
82+
if self._factory._waiting_fork():
83+
_LOGGER.error("Client is not ready - no calls possible")
84+
return None
7685

7786
feature_name = input_validator.validate_manager_feature_name(
7887
feature_name,

splitio/storage/__init__.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,13 @@ def pop_many(self, count):
237237
"""
238238
pass
239239

240+
@abc.abstractmethod
241+
def clear(self):
242+
"""
243+
Clear data.
244+
"""
245+
pass
246+
240247

241248
@add_metaclass(abc.ABCMeta)
242249
class ImpressionPipelinedStorage(object):
@@ -279,6 +286,13 @@ def pop_many(self, count):
279286
"""
280287
pass
281288

289+
@abc.abstractmethod
290+
def clear(self):
291+
"""
292+
Clear data.
293+
"""
294+
pass
295+
282296

283297
@add_metaclass(abc.ABCMeta)
284298
class TelemetryStorage(object):
@@ -346,6 +360,13 @@ def pop_latencies(self):
346360
"""
347361
pass
348362

363+
@abc.abstractmethod
364+
def clear(self):
365+
"""
366+
Clear data.
367+
"""
368+
pass
369+
349370

350371
@add_metaclass(abc.ABCMeta)
351372
class TelemetryPipelinedStorage(object):

splitio/storage/inmemmory.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -291,6 +291,7 @@ def __init__(self, queue_size):
291291
292292
:param eventsQueueSize: How many events to queue before forcing a submission
293293
"""
294+
self._queue_size = queue_size
294295
self._impressions = queue.Queue(maxsize=queue_size)
295296
self._lock = threading.Lock()
296297
self._queue_full_hook = None
@@ -339,6 +340,13 @@ def pop_many(self, count):
339340
count -= 1
340341
return impressions
341342

343+
def clear(self):
344+
"""
345+
Clear data.
346+
"""
347+
with self._lock:
348+
self._impressions = queue.Queue(maxsize=self._queue_size)
349+
342350

343351
class InMemoryEventStorage(EventStorage):
344352
"""
@@ -353,6 +361,7 @@ def __init__(self, eventsQueueSize):
353361
354362
:param eventsQueueSize: How many events to queue before forcing a submission
355363
"""
364+
self._queue_size = eventsQueueSize
356365
self._lock = threading.Lock()
357366
self._events = queue.Queue(maxsize=eventsQueueSize)
358367
self._queue_full_hook = None
@@ -407,6 +416,13 @@ def pop_many(self, count):
407416
self._size = 0
408417
return events
409418

419+
def clear(self):
420+
"""
421+
Clear data.
422+
"""
423+
with self._lock:
424+
self._events = queue.Queue(maxsize=self._queue_size)
425+
410426

411427
class InMemoryTelemetryStorage(TelemetryStorage):
412428
"""In-Memory implementation of telemetry storage interface."""
@@ -498,3 +514,14 @@ def pop_latencies(self):
498514
return self._latencies
499515
finally:
500516
self._latencies = {}
517+
518+
def clear(self):
519+
"""
520+
Clear data.
521+
"""
522+
with self._latencies_lock:
523+
self._latencies = {}
524+
with self._gauges_lock:
525+
self._gauges = {}
526+
with self._counters_lock:
527+
self._counters = {}

splitio/storage/redis.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -453,6 +453,12 @@ def pop_many(self, count):
453453
"""
454454
raise NotImplementedError('Only redis-consumer mode is supported.')
455455

456+
def clear(self):
457+
"""
458+
Clear data.
459+
"""
460+
raise NotImplementedError('Not supported for redis.')
461+
456462

457463
class RedisEventsStorage(EventStorage):
458464
"""Redis based event storage class."""
@@ -517,6 +523,12 @@ def pop_many(self, count):
517523
"""
518524
raise NotImplementedError('Only redis-consumer mode is supported.')
519525

526+
def clear(self):
527+
"""
528+
Clear data.
529+
"""
530+
raise NotImplementedError('Not supported for redis.')
531+
520532

521533
class RedisTelemetryStorage(TelemetryStorage, TelemetryPipelinedStorage):
522534
"""Redis-based Telemetry storage."""
@@ -694,3 +706,9 @@ def pop_latencies(self):
694706
:rtype: list
695707
"""
696708
raise NotImplementedError('Only redis-consumer mode is supported.')
709+
710+
def clear(self):
711+
"""
712+
Clear data.
713+
"""
714+
raise NotImplementedError('Not supported for redis.')

0 commit comments

Comments
 (0)