Skip to content

Commit 85a5b6d

Browse files
committed
ported fixes from MTK and updated tests
1 parent b0bfba4 commit 85a5b6d

26 files changed

+370
-355
lines changed

splitio/client/factory.py

Lines changed: 27 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,12 @@
1111
from splitio.client.config import sanitize as sanitize_config, DEFAULT_DATA_SAMPLING
1212
from splitio.client import util
1313
from splitio.client.listener import ImpressionListenerWrapper
14-
from splitio.engine.impressions import Manager as ImpressionsManager
14+
from splitio.engine.impressions.impressions import Manager as ImpressionsManager
1515
from splitio.engine.impressions import ImpressionsMode
16-
from splitio.engine.manager import Counter as ImpressionsCounter
17-
from splitio.engine.strategies import StrategyNoneMode, StrategyDebugMode, StrategyOptimizedMode
18-
from splitio.engine.adapters import InMemorySenderAdapter, RedisSenderAdapter
16+
from splitio.engine.impressions.manager import Counter as ImpressionsCounter
17+
from splitio.engine.impressions.strategies import StrategyNoneMode, StrategyDebugMode, StrategyOptimizedMode
18+
from splitio.engine.impressions.adapters import InMemorySenderAdapter, RedisSenderAdapter
19+
from splitio.engine.impressions import set_classes
1920

2021
# Storage
2122
from splitio.storage.inmemmory import InMemorySplitStorage, InMemorySegmentStorage, \
@@ -291,17 +292,17 @@ def _wrap_impression_listener(listener, metadata):
291292
return None
292293

293294

294-
def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pylint:disable=too-many-arguments,too-many-locals
295+
def _build_in_memory_factory(api_key, cfg, extra_cfg, sdk_url=None, events_url=None, # pylint:disable=too-many-arguments,too-many-locals
295296
auth_api_base_url=None, streaming_api_base_url=None, telemetry_api_base_url=None):
296297
"""Build and return a split factory tailored to the supplied config."""
297298
if not input_validator.validate_factory_instantiation(api_key):
298299
return None
299300

300-
cfg['sdk_url'] = sdk_url
301-
cfg['events_url'] = events_url
302-
cfg['auth_url'] = auth_api_base_url
303-
cfg['streaming_url'] = streaming_api_base_url
304-
cfg['telemetry_api_url'] = telemetry_api_base_url
301+
extra_cfg['sdk_url'] = sdk_url
302+
extra_cfg['events_url'] = events_url
303+
extra_cfg['auth_url'] = auth_api_base_url
304+
extra_cfg['streaming_url'] = streaming_api_base_url
305+
extra_cfg['telemetry_api_url'] = telemetry_api_base_url
305306

306307
http_client = HttpClient(
307308
sdk_url=sdk_url,
@@ -330,30 +331,10 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
330331
'impressions': InMemoryImpressionStorage(cfg['impressionsQueueSize']),
331332
'events': InMemoryEventStorage(cfg['eventsQueueSize']),
332333
}
333-
imp_counter = ImpressionsCounter() if cfg['impressionsMode'] != ImpressionsMode.DEBUG else None
334-
335-
unique_keys_synchronizer = None
336-
clear_filter_sync = None
337-
unique_keys_task = None
338-
clear_filter_task = None
339-
impressions_count_sync = None
340-
impressions_count_task = None
341-
342-
if cfg['impressionsMode'] == ImpressionsMode.NONE:
343-
imp_strategy = StrategyNoneMode(imp_counter)
344-
clear_filter_sync = ClearFilterSynchronizer(imp_strategy.get_unique_keys_tracker())
345-
unique_keys_synchronizer = UniqueKeysSynchronizer(InMemorySenderAdapter(apis['telemetry']), imp_strategy.get_unique_keys_tracker())
346-
unique_keys_task = UniqueKeysSyncTask(unique_keys_synchronizer.send_all)
347-
clear_filter_task = ClearFilterSyncTask(clear_filter_sync.clear_all)
348-
imp_strategy.get_unique_keys_tracker().set_queue_full_hook(unique_keys_task.flush)
349-
impressions_count_sync = ImpressionsCountSynchronizer(apis['impressions'], imp_counter)
350-
impressions_count_task = ImpressionsCountSyncTask(impressions_count_sync.synchronize_counters)
351-
elif cfg['impressionsMode'] == ImpressionsMode.DEBUG:
352-
imp_strategy = StrategyDebugMode()
353-
else:
354-
imp_strategy = StrategyOptimizedMode(imp_counter)
355-
impressions_count_sync = ImpressionsCountSynchronizer(apis['impressions'], imp_counter)
356-
impressions_count_task = ImpressionsCountSyncTask(impressions_count_sync.synchronize_counters)
334+
335+
unique_keys_synchronizer, clear_filter_sync, unique_keys_task, \
336+
clear_filter_task, impressions_count_sync, impressions_count_task, \
337+
imp_strategy = set_classes('MEMORY', cfg['impressionsMode'], apis)
357338

358339
imp_manager = ImpressionsManager(
359340
_wrap_impression_listener(cfg['impressionListener'], sdk_metadata),
@@ -438,31 +419,9 @@ def _build_redis_factory(api_key, cfg):
438419
_MIN_DEFAULT_DATA_SAMPLING_ALLOWED)
439420
data_sampling = _MIN_DEFAULT_DATA_SAMPLING_ALLOWED
440421

441-
unique_keys_synchronizer = None
442-
clear_filter_sync = None
443-
unique_keys_task = None
444-
clear_filter_task = None
445-
impressions_count_sync = None
446-
impressions_count_task = None
447-
redis_sender_adapter = RedisSenderAdapter(redis_adapter)
448-
449-
if cfg['impressionsMode'] == ImpressionsMode.NONE:
450-
imp_counter = ImpressionsCounter()
451-
imp_strategy = StrategyNoneMode(imp_counter)
452-
clear_filter_sync = ClearFilterSynchronizer(imp_strategy.get_unique_keys_tracker())
453-
unique_keys_synchronizer = UniqueKeysSynchronizer(redis_sender_adapter, imp_strategy.get_unique_keys_tracker())
454-
unique_keys_task = UniqueKeysSyncTask(unique_keys_synchronizer.send_all)
455-
clear_filter_task = ClearFilterSyncTask(clear_filter_sync.clear_all)
456-
imp_strategy.get_unique_keys_tracker().set_queue_full_hook(unique_keys_task.flush)
457-
impressions_count_sync = ImpressionsCountSynchronizer(redis_sender_adapter, imp_counter)
458-
impressions_count_task = ImpressionsCountSyncTask(impressions_count_sync.synchronize_counters)
459-
elif cfg['impressionsMode'] == ImpressionsMode.DEBUG:
460-
imp_strategy = StrategyDebugMode()
461-
else:
462-
imp_counter = ImpressionsCounter()
463-
imp_strategy = StrategyOptimizedMode(imp_counter)
464-
impressions_count_sync = ImpressionsCountSynchronizer(redis_sender_adapter, imp_counter)
465-
impressions_count_task = ImpressionsCountSyncTask(impressions_count_sync.synchronize_counters)
422+
unique_keys_synchronizer, clear_filter_sync, unique_keys_task, \
423+
clear_filter_task, impressions_count_sync, impressions_count_task, \
424+
imp_strategy = set_classes('REDIS', cfg['impressionsMode'], redis_adapter)
466425

467426
imp_manager = ImpressionsManager(
468427
_wrap_impression_listener(cfg['impressionListener'], sdk_metadata),
@@ -543,17 +502,12 @@ def _build_localhost_factory(cfg):
543502
ready_event
544503
)
545504

546-
547505
def get_factory(api_key, **kwargs):
548506
"""Build and return the appropriate factory."""
549507
try:
550-
active_factory_count = 0
551-
redundant_factory_count = 0
552508
_INSTANTIATED_FACTORIES_LOCK.acquire()
553509
if _INSTANTIATED_FACTORIES:
554-
active_factory_count = active_factory_count + 1
555510
if api_key in _INSTANTIATED_FACTORIES:
556-
redundant_factory_count = redundant_factory_count + 1
557511
_LOGGER.warning(
558512
"factory instantiation: You already have %d %s with this API Key. "
559513
"We recommend keeping only one instance of the factory at all times "
@@ -570,8 +524,7 @@ def get_factory(api_key, **kwargs):
570524
)
571525

572526
config = sanitize_config(api_key, kwargs.get('config', {}))
573-
config['redundantFactoryCount'] = redundant_factory_count
574-
config['activeFactoryCount'] = active_factory_count
527+
extra_config = {}
575528

576529
if config['operationMode'] == 'localhost-standalone':
577530
return _build_localhost_factory(config)
@@ -582,12 +535,20 @@ def get_factory(api_key, **kwargs):
582535
return _build_in_memory_factory(
583536
api_key,
584537
config,
538+
extra_config,
585539
kwargs.get('sdk_api_base_url'),
586540
kwargs.get('events_api_base_url'),
587541
kwargs.get('auth_api_base_url'),
588542
kwargs.get('streaming_api_base_url'),
589543
kwargs.get('telemetry_api_base_url')
590544
)
591545
finally:
546+
redundant_factory_count = 0
547+
active_factory_count = 0
592548
_INSTANTIATED_FACTORIES.update([api_key])
549+
for item in _INSTANTIATED_FACTORIES:
550+
redundant_factory_count = redundant_factory_count + _INSTANTIATED_FACTORIES[item] - 1
551+
active_factory_count = active_factory_count + _INSTANTIATED_FACTORIES[item]
552+
extra_config['redundant_factory_count'] = redundant_factory_count
553+
extra_config['active_factory_count'] = active_factory_count
593554
_INSTANTIATED_FACTORIES_LOCK.release()

splitio/engine/filters.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import abc
2+
import threading
23

34
from bloom_filter2 import BloomFilter as BloomFilter2
45

@@ -45,6 +46,7 @@ def __init__(self, max_elements=5000, error_rate=0.01):
4546
self._max_elements = max_elements
4647
self._error_rate = error_rate
4748
self._imps_bloom_filter = BloomFilter2(max_elements=self._max_elements, error_rate=self._error_rate)
49+
self._lock = threading.RLock()
4850

4951
def add(self, data):
5052
"""
@@ -56,8 +58,9 @@ def add(self, data):
5658
:return: True if successful
5759
:rtype: boolean
5860
"""
59-
self._imps_bloom_filter.add(data)
60-
return data in self._imps_bloom_filter
61+
with self._lock:
62+
self._imps_bloom_filter.add(data)
63+
return data in self._imps_bloom_filter
6164

6265
def contains(self, data):
6366
"""
@@ -69,12 +72,14 @@ def contains(self, data):
6972
:return: True if exist
7073
:rtype: boolean
7174
"""
72-
return data in self._imps_bloom_filter
75+
with self._lock:
76+
return data in self._imps_bloom_filter
7377

7478
def clear(self):
7579
"""
7680
Destroy the current filter instance and create new one.
7781
7882
"""
79-
self._imps_bloom_filter.close()
80-
self._imps_bloom_filter = BloomFilter2(max_elements=self._max_elements, error_rate=self._error_rate)
83+
with self._lock:
84+
self._imps_bloom_filter.close()
85+
self._imps_bloom_filter = BloomFilter2(max_elements=self._max_elements, error_rate=self._error_rate)
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
from splitio.engine.impressions.impressions import ImpressionsMode
2+
from splitio.engine.impressions.manager import Counter as ImpressionsCounter
3+
from splitio.engine.impressions.strategies import StrategyNoneMode, StrategyDebugMode, StrategyOptimizedMode
4+
from splitio.engine.impressions.adapters import InMemorySenderAdapter, RedisSenderAdapter
5+
from splitio.tasks.unique_keys_sync import UniqueKeysSyncTask, ClearFilterSyncTask
6+
from splitio.sync.unique_keys import UniqueKeysSynchronizer, ClearFilterSynchronizer
7+
from splitio.sync.impression import ImpressionsCountSynchronizer
8+
from splitio.tasks.impressions_sync import ImpressionsCountSyncTask
9+
10+
11+
def set_classes(storage_mode, impressions_mode, api_adapter):
12+
unique_keys_synchronizer = None
13+
clear_filter_sync = None
14+
unique_keys_task = None
15+
clear_filter_task = None
16+
impressions_count_sync = None
17+
impressions_count_task = None
18+
if storage_mode == 'REDIS':
19+
redis_sender_adapter = RedisSenderAdapter(api_adapter)
20+
api_telemetry_adapter = redis_sender_adapter
21+
api_impressions_adapter = redis_sender_adapter
22+
else:
23+
api_telemetry_adapter = api_adapter['telemetry']
24+
api_impressions_adapter = api_adapter['impressions']
25+
26+
if impressions_mode == ImpressionsMode.NONE:
27+
imp_counter = ImpressionsCounter()
28+
imp_strategy = StrategyNoneMode(imp_counter)
29+
clear_filter_sync = ClearFilterSynchronizer(imp_strategy.get_unique_keys_tracker())
30+
unique_keys_synchronizer = UniqueKeysSynchronizer(InMemorySenderAdapter(api_telemetry_adapter), imp_strategy.get_unique_keys_tracker())
31+
unique_keys_task = UniqueKeysSyncTask(unique_keys_synchronizer.send_all)
32+
clear_filter_task = ClearFilterSyncTask(clear_filter_sync.clear_all)
33+
imp_strategy.get_unique_keys_tracker().set_queue_full_hook(unique_keys_task.flush)
34+
impressions_count_sync = ImpressionsCountSynchronizer(api_impressions_adapter, imp_counter)
35+
impressions_count_task = ImpressionsCountSyncTask(impressions_count_sync.synchronize_counters)
36+
elif impressions_mode == ImpressionsMode.DEBUG:
37+
imp_strategy = StrategyDebugMode()
38+
else:
39+
imp_counter = ImpressionsCounter()
40+
imp_strategy = StrategyOptimizedMode(imp_counter)
41+
impressions_count_sync = ImpressionsCountSynchronizer(api_impressions_adapter, imp_counter)
42+
impressions_count_task = ImpressionsCountSyncTask(impressions_count_sync.synchronize_counters)
43+
44+
return unique_keys_synchronizer, clear_filter_sync, unique_keys_task, clear_filter_task, \
45+
impressions_count_sync, impressions_count_task, imp_strategy
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -135,12 +135,12 @@ def _build_counters(self, counters):
135135
:return: dict with list of impression count dtos
136136
:rtype: dict
137137
"""
138-
return [json.dumps({
138+
return json.dumps({
139139
'pf': [
140140
{
141141
'f': pf_count.feature,
142142
'm': pf_count.timeframe,
143143
'rc': pf_count.count
144144
} for pf_count in counters
145145
]
146-
})]
146+
})
Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,11 @@ def __init__(self, listener=None, strategy=None):
1717
"""
1818
Construct a manger to track and forward impressions to the queue.
1919
20-
:param mode: Impressions capturing mode.
21-
:type mode: ImpressionsMode
22-
23-
:param standalone: whether the SDK is running in standalone sending impressions by itself
24-
:type standalone: bool
25-
2620
:param listener: Optional impressions listener that will capture all seen impressions.
2721
:type listener: splitio.client.listener.ImpressionListenerWrapper
22+
23+
:param strategy: Impressions stragetgy instance
24+
:type strategy: (BaseStrategy)
2825
"""
2926

3027
self._strategy = strategy
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import abc
22

3-
from splitio.engine.manager import Observer, truncate_impressions_time, Counter, truncate_time
4-
from splitio.engine.unique_keys_tracker import UniqueKeysTracker
3+
from splitio.engine.impressions.manager import Observer, truncate_impressions_time, Counter, truncate_time
4+
from splitio.engine.impressions.unique_keys_tracker import UniqueKeysTracker
55
from splitio import util
66

77
_IMPRESSION_OBSERVER_CACHE_SIZE = 500000

splitio/engine/unique_keys_tracker.py renamed to splitio/engine/impressions/unique_keys_tracker.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,6 @@ def track(self, key, feature_name):
4848
with self._lock:
4949
if self._filter.contains(feature_name+key):
5050
return False
51-
52-
with self._lock:
5351
self._add_or_update(feature_name, key)
5452
self._filter.add(feature_name+key)
5553
self._current_cache_size = self._current_cache_size + 1
@@ -72,9 +70,11 @@ def _add_or_update(self, feature_name, key):
7270
:param key: key to be added to MTK list
7371
:type key: int
7472
"""
75-
if feature_name not in self._cache:
76-
self._cache[feature_name] = set()
77-
self._cache[feature_name].add(key)
73+
74+
with self._lock:
75+
if feature_name not in self._cache:
76+
self._cache[feature_name] = set()
77+
self._cache[feature_name].add(key)
7878

7979
def set_queue_full_hook(self, hook):
8080
"""
@@ -85,7 +85,7 @@ def set_queue_full_hook(self, hook):
8585
if callable(hook):
8686
self._queue_full_hook = hook
8787

88-
def filter_pop_all(self):
88+
def clear_filter(self):
8989
"""
9090
Delete the filter items
9191

splitio/recorder/recorder.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ def record_treatment_stats(self, impressions, latency, operation):
129129
if self._data_sampling < rnumber:
130130
return
131131
impressions = self._impressions_manager.process_impressions(impressions)
132-
if impressions == []:
132+
if not impressions:
133133
return
134134
# pipe = self._make_pipe()
135135
# self._impression_storage.add_impressions_to_pipe(impressions, pipe)

splitio/sync/synchronizer.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -229,9 +229,9 @@ def __init__(self, split_synchronizers, split_tasks):
229229
]
230230
if self._split_tasks.impressions_count_task:
231231
self._periodic_data_recording_tasks.append(self._split_tasks.impressions_count_task)
232-
if self._split_tasks.unique_keys_task is not None:
232+
if self._split_tasks.unique_keys_task:
233233
self._periodic_data_recording_tasks.append(self._split_tasks.unique_keys_task)
234-
if self._split_tasks.clear_filter_task is not None:
234+
if self._split_tasks.clear_filter_task:
235235
self._periodic_data_recording_tasks.append(self._split_tasks.clear_filter_task)
236236

237237

0 commit comments

Comments
 (0)