Skip to content

Commit 9564eab

Browse files
committed
Implemented redis none mode
1 parent 4b5281f commit 9564eab

File tree

5 files changed

+137
-12
lines changed

5 files changed

+137
-12
lines changed

splitio/client/factory.py

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
from splitio.engine.impressions import ImpressionsMode
1616
from splitio.engine.manager import Counter as ImpressionsCounter
1717
from splitio.engine.strategies import StrategyNoneMode, StrategyDebugMode, StrategyOptimizedMode
18-
from splitio.engine.adapters import InMemorySenderAdapter
18+
from splitio.engine.adapters import InMemorySenderAdapter, RedisSenderAdapter
1919

2020
# Storage
2121
from splitio.storage.inmemmory import InMemorySplitStorage, InMemorySegmentStorage, \
@@ -43,7 +43,7 @@
4343
# Synchronizer
4444
from splitio.sync.synchronizer import SplitTasks, SplitSynchronizers, Synchronizer, \
4545
LocalhostSynchronizer
46-
from splitio.sync.manager import Manager
46+
from splitio.sync.manager import Manager, RedisManager
4747
from splitio.sync.split import SplitSynchronizer, LocalSplitSynchronizer
4848
from splitio.sync.segment import SegmentSynchronizer
4949
from splitio.sync.impression import ImpressionSynchronizer, ImpressionsCountSynchronizer
@@ -215,6 +215,7 @@ def destroy(self, destroyed_event=None):
215215
return
216216

217217
try:
218+
_LOGGER.info('Factory destroy called, stopping tasks.')
218219
if self._sync_manager is not None:
219220
if destroyed_event is not None:
220221

@@ -424,7 +425,23 @@ def _build_redis_factory(api_key, cfg):
424425
_MIN_DEFAULT_DATA_SAMPLING_ALLOWED)
425426
data_sampling = _MIN_DEFAULT_DATA_SAMPLING_ALLOWED
426427

427-
imp_strategy = StrategyDebugMode() if cfg['impressionsMode'] == ImpressionsMode.DEBUG else StrategyOptimizedMode(ImpressionsCounter())
428+
imp_counter = ImpressionsCounter() if cfg['impressionsMode'] != ImpressionsMode.DEBUG else None
429+
unique_keys_synchronizer = None
430+
clear_filter_sync = None
431+
unique_keys_task = None
432+
clear_filter_task = None
433+
if cfg['impressionsMode'] == ImpressionsMode.NONE:
434+
imp_strategy = StrategyNoneMode(imp_counter)
435+
clear_filter_sync = ClearFilterSynchronizer(imp_strategy.get_unique_keys_tracker())
436+
unique_keys_synchronizer = UniqueKeysSynchronizer(RedisSenderAdapter(redis_adapter), imp_strategy.get_unique_keys_tracker())
437+
unique_keys_task = UniqueKeysSyncTask(unique_keys_synchronizer.send_all)
438+
clear_filter_task = ClearFilterSyncTask(clear_filter_sync.clear_all)
439+
imp_strategy.get_unique_keys_tracker().set_queue_full_hook(unique_keys_task.flush)
440+
elif cfg['impressionsMode'] == ImpressionsMode.DEBUG:
441+
imp_strategy = StrategyDebugMode()
442+
else:
443+
imp_strategy = StrategyOptimizedMode(imp_counter)
444+
428445
imp_manager = ImpressionsManager(
429446
_wrap_impression_listener(cfg['impressionListener'], sdk_metadata),
430447
imp_strategy)
@@ -436,11 +453,17 @@ def _build_redis_factory(api_key, cfg):
436453
storages['impressions'],
437454
data_sampling,
438455
)
456+
manager = RedisManager(unique_keys_task, clear_filter_task)
457+
initialization_thread = threading.Thread(target=manager.start, name="SDKInitializer")
458+
initialization_thread.setDaemon(True)
459+
initialization_thread.start()
460+
439461
return SplitFactory(
440462
api_key,
441463
storages,
442464
cfg['labelsEnabled'],
443465
recorder,
466+
manager,
444467
)
445468

446469

splitio/engine/adapters.py

Lines changed: 52 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
import abc
2+
import logging
23
import json
34

5+
from splitio.storage.adapters.redis import RedisAdapterException
6+
7+
_LOGGER = logging.getLogger(__name__)
48

59
class ImpressionsSenderAdapter(object, metaclass=abc.ABCMeta):
610
"""Impressions Sender Adapter interface."""
@@ -32,18 +36,60 @@ def record_unique_keys(self, uniques):
3236
:param uniques: unique keys disctionary
3337
:type uniques: Dictionary {'feature1': set(), 'feature2': set(), .. }
3438
"""
35-
self._telemtry_http_client.record_unique_keys(self._uniques_formatter(uniques))
39+
self._telemtry_http_client.record_unique_keys({'keys': self._uniques_formatter(uniques)})
40+
41+
def _uniques_formatter(self, uniques):
42+
"""
43+
Format the unique keys dictionary array to a JSON body
44+
45+
:param uniques: unique keys disctionary
46+
:type uniques: Dictionary {'feature1': set(), 'feature2': set(), .. }
47+
48+
:return: unique keys JSON array
49+
:rtype: json
50+
"""
51+
return [{'f': feature, 'ks': list(keys)} for feature, keys in uniques.items()]
52+
53+
class RedisSenderAdapter(ImpressionsSenderAdapter):
54+
"""In Memory Impressions Sender Adapter class."""
55+
56+
MTK_QUEUE_KEY = 'SPLITIO.uniquekeys'
57+
MTK_KEY_DEFAULT_TTL = 3600
58+
59+
def __init__(self, redis_client):
60+
"""
61+
Initialize In memory sender adapter instance
62+
63+
:param telemtry_http_client: instance of telemetry http api
64+
:type telemtry_http_client: splitio.api.telemetry.TelemetryAPI
65+
"""
66+
self._redis_client = redis_client
67+
68+
def record_unique_keys(self, uniques):
69+
"""
70+
post the unique keys to split back end.
71+
72+
:param uniques: unique keys disctionary
73+
:type uniques: Dictionary {'feature1': set(), 'feature2': set(), .. }
74+
"""
75+
bulk_mtks = self._uniques_formatter(uniques)
76+
try:
77+
self._redis_client.rpush(self.MTK_QUEUE_KEY, *bulk_mtks)
78+
self._redis_client.expire(self.MTK_QUEUE_KEY, self.MTK_KEY_DEFAULT_TTL)
79+
return True
80+
except RedisAdapterException:
81+
_LOGGER.error('Something went wrong when trying to add mtks to redis')
82+
_LOGGER.error('Error: ', exc_info=True)
83+
return False
3684

3785
def _uniques_formatter(self, uniques):
3886
"""
39-
Format the unique keys dictionary to a JSON body
87+
Format the unique keys dictionary array to a JSON body
4088
4189
:param uniques: unique keys disctionary
4290
:type uniques: Dictionary {'feature1': set(), 'feature2': set(), .. }
4391
44-
:return: unique keys JSON
92+
:return: unique keys JSON array
4593
:rtype: json
4694
"""
47-
return {
48-
'keys': [{'f': feature, 'ks': list(keys)} for feature, keys in uniques.items()]
49-
}
95+
return [json.dumps({'f': feature, 'ks': list(keys)}) for feature, keys in uniques.items()]

splitio/engine/unique_keys_tracker.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ def track(self, key, feature_name):
5959
'Unique Keys queue is full, flushing the current queue now.'
6060
)
6161
if self._queue_full_hook is not None and callable(self._queue_full_hook):
62+
_LOGGER.info('Calling hook.')
6263
self._queue_full_hook()
6364
return True
6465

splitio/sync/manager.py

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
"""Synchronization manager module."""
22
import logging
33
import time
4+
import threading
45
from threading import Thread
56
from queue import Queue
67
from splitio.push.manager import PushManager, Status
@@ -127,3 +128,60 @@ def _streaming_feedback_handler(self):
127128
self._synchronizer.start_periodic_fetching()
128129
_LOGGER.info('non-recoverable error in streaming. switching to polling.')
129130
return
131+
132+
class RedisManager(object): # pylint:disable=too-many-instance-attributes
133+
"""Manager Class."""
134+
135+
_CENTINEL_EVENT = object()
136+
137+
def __init__(self, unique_keys_task, clear_filter_task): # pylint:disable=too-many-arguments
138+
"""
139+
Construct Manager.
140+
141+
:param unique_keys_task: unique keys task instance
142+
:type unique_keys_task: splitio.tasks.unique_keys_sync.UniqueKeysSyncTask
143+
144+
:param clear_filter_task: clear filter task instance
145+
:type clear_filter_task: splitio.tasks.clear_filter_task.ClearFilterSynchronizer
146+
147+
"""
148+
self._unique_keys_task = unique_keys_task
149+
self._clear_filter_task = clear_filter_task
150+
self._ready_flag = True
151+
152+
def recreate(self):
153+
"""Not implemented"""
154+
return
155+
156+
def start(self):
157+
"""Start the SDK synchronization tasks."""
158+
try:
159+
self._unique_keys_task.start()
160+
self._clear_filter_task.start()
161+
162+
except (APIException, RuntimeError):
163+
_LOGGER.error('Exception raised starting Split Manager')
164+
_LOGGER.debug('Exception information: ', exc_info=True)
165+
raise
166+
167+
def stop(self, blocking):
168+
"""
169+
Stop manager logic.
170+
171+
:param blocking: flag to wait until tasks are stopped
172+
:type blocking: bool
173+
"""
174+
_LOGGER.info('Stopping manager tasks')
175+
if blocking:
176+
events = []
177+
tasks = [self._unique_keys_task,
178+
self._clear_filter_task]
179+
for task in tasks:
180+
stop_event = threading.Event()
181+
task.stop(stop_event)
182+
events.append(stop_event)
183+
if all(event.wait() for event in events):
184+
_LOGGER.debug('all tasks finished successfully.')
185+
else:
186+
self._unique_keys_task.stop()
187+
self._clear_filter_task.stop()

splitio/sync/unique_keys.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,3 @@
1-
from splitio.engine.filters import BloomFilter
2-
from splitio.engine.adapters import InMemorySenderAdapter
3-
41
_UNIQUE_KEYS_MAX_BULK_SIZE = 5000
52

63
class UniqueKeysSynchronizer(object):

0 commit comments

Comments
 (0)