Skip to content

Commit 2c4cab0

Browse files
committed
Fixed several bugs and tested locally against staging, fixed existing tests.
TBD: creating new tests
1 parent 42df386 commit 2c4cab0

18 files changed

+193
-101
lines changed

splitio/api/telemetry.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88

99
_LOGGER = logging.getLogger(__name__)
1010

11-
1211
class TelemetryAPI(object): # pylint: disable=too-few-public-methods
1312
"""Class that uses an httpClient to communicate with the Telemetry API."""
1413

@@ -34,8 +33,8 @@ def record_unique_keys(self, uniques):
3433
"""
3534
try:
3635
response = self._client.post(
37-
'keys',
38-
'/ss',
36+
'telemetry',
37+
'/keys/ss',
3938
self._apikey,
4039
body=uniques,
4140
extra_headers=self._metadata

splitio/client/config.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ def _sanitize_impressions_mode(mode, refresh_rate=None):
9191
mode = ImpressionsMode(mode.upper())
9292
except (ValueError, AttributeError):
9393
_LOGGER.warning('You passed an invalid impressionsMode, impressionsMode should be '
94-
'one of the following values: `debug` or `optimized`. '
94+
'one of the following values: `debug`, `none` or `optimized`. '
9595
'Defaulting to `optimized` mode.')
9696
mode = ImpressionsMode.OPTIMIZED
9797

splitio/client/factory.py

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,12 @@
1212
from splitio.client import util
1313
from splitio.client.listener import ImpressionListenerWrapper
1414
from splitio.engine.impressions import Manager as ImpressionsManager
15+
from splitio.engine.impressions import ImpressionsMode
16+
from splitio.engine.strategies import Counter as ImpressionsCounter
1517
from splitio.engine.strategies.strategy_debug_mode import StrategyDebugMode
1618
from splitio.engine.strategies.strategy_optimized_mode import StrategyOptimizedMode
1719
from splitio.engine.strategies.strategy_none_mode import StrategyNoneMode
20+
from splitio.engine.sender_adapters.in_memory_sender_adapter import InMemorySenderAdapter
1821

1922
# Storage
2023
from splitio.storage.inmemmory import InMemorySplitStorage, InMemorySegmentStorage, \
@@ -310,7 +313,7 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
310313
'segments': SegmentsAPI(http_client, api_key, sdk_metadata),
311314
'impressions': ImpressionsAPI(http_client, api_key, sdk_metadata, cfg['impressionsMode']),
312315
'events': EventsAPI(http_client, api_key, sdk_metadata),
313-
'telemtery': TelemetryAPI(http_client, api_key, sdk_metadata),
316+
'telemetry': TelemetryAPI(http_client, api_key, sdk_metadata),
314317
}
315318

316319
if not input_validator.validate_apikey_type(apis['segments']):
@@ -322,13 +325,12 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
322325
'impressions': InMemoryImpressionStorage(cfg['impressionsQueueSize']),
323326
'events': InMemoryEventStorage(cfg['eventsQueueSize']),
324327
}
325-
326-
imp_counter = Counter() if cfg['impressionsMode'] == 'OPTIMIZED' else None
328+
imp_counter = ImpressionsCounter() if cfg['impressionsMode'] != ImpressionsMode.DEBUG else None
327329

328330
strategies = {
329-
'OPTIMIZED': StrategyOptimizedMode(imp_counter),
330-
'DEBUG' : StrategyDebugMode(),
331-
'NONE' : StrategyNoneMode(imp_counter),
331+
ImpressionsMode.OPTIMIZED : StrategyOptimizedMode(imp_counter),
332+
ImpressionsMode.DEBUG : StrategyDebugMode(),
333+
ImpressionsMode.NONE : StrategyNoneMode(imp_counter),
332334
}
333335
imp_strategy = strategies[cfg['impressionsMode']]
334336

@@ -362,9 +364,9 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
362364
ImpressionsCountSyncTask(synchronizers.impressions_count_sync.synchronize_counters),
363365
)
364366

365-
if cfg['impressionsMode'] == 'NONE':
367+
if cfg['impressionsMode'] == ImpressionsMode.NONE:
366368
synchronizers.set_none_syncs(
367-
UniqueKeysSynchronizer(imp_strategy._unique_keys_tracker),
369+
UniqueKeysSynchronizer(InMemorySenderAdapter(apis['telemetry']), imp_strategy._unique_keys_tracker),
368370
ClearFilterSynchronizer(imp_strategy._unique_keys_tracker),
369371
)
370372
tasks.set_none_tasks(
@@ -382,7 +384,8 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
382384

383385
storages['events'].set_queue_full_hook(tasks.events_task.flush)
384386
storages['impressions'].set_queue_full_hook(tasks.impressions_task.flush)
385-
# TODO: Add unique_keys_tracker.set_queue_full_hook(tasks.unique_keys.flush)
387+
if cfg['impressionsMode'] == ImpressionsMode.NONE:
388+
imp_strategy._unique_keys_tracker.set_queue_full_hook(tasks._unique_keys_task.flush)
386389

387390
recorder = StandardRecorder(
388391
imp_manager,
@@ -421,10 +424,14 @@ def _build_redis_factory(api_key, cfg):
421424
_LOGGER.warning("dataSampling cannot be less than %.2f, defaulting to minimum",
422425
_MIN_DEFAULT_DATA_SAMPLING_ALLOWED)
423426
data_sampling = _MIN_DEFAULT_DATA_SAMPLING_ALLOWED
427+
428+
imp_manager = ImpressionsManager(
429+
_wrap_impression_listener(cfg['impressionListener'], sdk_metadata),
430+
StrategyDebugMode())
431+
424432
recorder = PipelinedRecorder(
425433
redis_adapter.pipeline,
426-
ImpressionsManager(cfg['impressionsMode'], False,
427-
_wrap_impression_listener(cfg['impressionListener'], sdk_metadata)),
434+
imp_manager,
428435
storages['events'],
429436
storages['impressions'],
430437
data_sampling,
@@ -464,7 +471,7 @@ def _build_localhost_factory(cfg):
464471
manager = Manager(ready_event, synchronizer, None, False, sdk_metadata)
465472
manager.start()
466473
recorder = StandardRecorder(
467-
ImpressionsManager(cfg['impressionsMode'], True, None),
474+
ImpressionsManager(None, StrategyDebugMode()),
468475
storages['events'],
469476
storages['impressions'],
470477
)
@@ -513,7 +520,8 @@ def get_factory(api_key, **kwargs):
513520
kwargs.get('sdk_api_base_url'),
514521
kwargs.get('events_api_base_url'),
515522
kwargs.get('auth_api_base_url'),
516-
kwargs.get('streaming_api_base_url')
523+
kwargs.get('streaming_api_base_url'),
524+
kwargs.get('telemetry_api_base_url')
517525
)
518526
finally:
519527
_INSTANTIATED_FACTORIES.update([api_key])

splitio/engine/impressions.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ class ImpressionsMode(Enum):
88

99
OPTIMIZED = "OPTIMIZED"
1010
DEBUG = "DEBUG"
11-
NONE = 'NONE'
11+
NONE = "NONE"
1212

1313
class Manager(object): # pylint:disable=too-few-public-methods
1414
"""Impression manager."""

splitio/engine/sender_adapters/__init__.py

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,3 @@ def record_unique_keys(self, data):
1010
1111
"""
1212
pass
13-
14-
@abc.abstractmethod
15-
def record_impressions_count(self, data):
16-
"""
17-
No Return value
18-
19-
"""
20-
pass

splitio/engine/sender_adapters/in_memory_sender_adapter.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,14 @@ def _uniques_formatter(self, uniques):
3333
:return: unique keys JSON
3434
:rtype: json
3535
"""
36-
formatted_uniques = json.load('{keys: []}')
36+
formatted_uniques = json.loads('{"keys": []}')
3737
if len(uniques) == 0:
38-
return formatted_uniques
39-
for key in uniques:
40-
formatted_uniques['keys'].append('{"f":"' + key +'", "ks:['+ json.dump(uniques[key])+']}')
41-
return formatted_uniques
38+
return json.loads('{"keys": []}')
39+
40+
return {
41+
'keys': [{'f': feature, 'ks': list(keys)} for feature, keys in uniques.items()]
42+
}
43+
# for key in uniques:
44+
# formatted_uniques["keys"].append(json.loads('{"f":"' + key +'", "ks":' + json.dumps(list(uniques[key])) + '}'))
45+
46+
# return formatted_uniques

splitio/engine/strategies/strategy_none_mode.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,6 @@ def process_impressions(self, impressions):
3434
imps = [(self._observer.test_and_set(imp), attrs) for imp, attrs in impressions]
3535
self._counter.track([imp for imp, _ in imps])
3636
this_hour = truncate_time(util.utctime_ms())
37-
[self._unique_keys_tracker(i.matching_key, i.feature_name) for i, _ in imps if i.previous_time is None or i.previous_time < this_hour], imps
37+
[self._unique_keys_tracker.track(i.matching_key, i.feature_name) for i, _ in imps if i.previous_time is None or i.previous_time < this_hour]
3838

39-
return []
39+
return [], imps

splitio/engine/strategies/strategy_optimized_mode.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,15 @@
77
class StrategyOptimizedMode(BaseStrategy):
88
"""Optimized mode strategy."""
99

10-
def __init__(self, counter=None):
10+
def __init__(self, counter):
1111
"""
1212
Construct a strategy instance for optimized mode.
1313
1414
"""
1515
self._observer = Observer(_IMPRESSION_OBSERVER_CACHE_SIZE)
1616
self._counter = counter
1717

18+
1819
def process_impressions(self, impressions):
1920
"""
2021
Process impressions.

splitio/engine/unique_keys_tracker.py

Lines changed: 25 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
import threading
33
import logging
44
from splitio.engine.filters.bloom_filter import BloomFilter
5-
from splitio.engine.sender_adapters.in_memory_sender_adapter import InMemorySenderAdapter
65

76
_LOGGER = logging.getLogger(__name__)
87

@@ -32,6 +31,7 @@ def __init__(self, cache_size=30000):
3231
self._lock = threading.RLock()
3332
self._cache = {}
3433
self._queue_full_hook = None
34+
self._current_cache_size = 0
3535

3636
def track(self, key, feature_name):
3737
"""
@@ -45,33 +45,23 @@ def track(self, key, feature_name):
4545
:return: True if successful
4646
:rtype: boolean
4747
"""
48-
if self._filter.contains(feature_name+key):
49-
return False
48+
with self._lock:
49+
if self._filter.contains(feature_name+key):
50+
return False
5051

5152
with self._lock:
5253
self._add_or_update(feature_name, key)
5354
self._filter.add(feature_name+key)
55+
self._current_cache_size = self._current_cache_size + 1
5456

55-
if self._get_dict_size() > self._cache_size:
56-
if self._queue_full_hook is not None and callable(self._queue_full_hook):
57-
self._queue_full_hook()
57+
if self._current_cache_size > self._cache_size:
5858
_LOGGER.info(
5959
'Unique Keys queue is full, flushing the current queue now.'
6060
)
61+
if self._queue_full_hook is not None and callable(self._queue_full_hook):
62+
self._queue_full_hook()
6163
return True
6264

63-
def _get_dict_size(self):
64-
"""
65-
Return the size of unique keys dictionary (number of keys in all features)
66-
67-
:return: dictionary set() items count
68-
:rtype: int
69-
"""
70-
total_size = 0
71-
for key in self._uniqe_keys_tracker._cache:
72-
total_size = total_size + len(self._uniqe_keys_tracker._cache[key])
73-
return total_size
74-
7565
def _add_or_update(self, feature_name, key):
7666
"""
7767
Add the feature_name+key to both bloom filter and dictionary.
@@ -93,3 +83,20 @@ def set_queue_full_hook(self, hook):
9383
"""
9484
if callable(hook):
9585
self._queue_full_hook = hook
86+
87+
def filter_pop_all(self):
88+
"""
89+
Delete the filter items
90+
91+
"""
92+
with self._lock:
93+
self._filter.clear()
94+
95+
def get_cache_info_and_pop_all(self):
96+
with self._lock:
97+
temp_cach = self._cache.copy()
98+
temp_cache_size = self._current_cache_size
99+
self._cache = {}
100+
self._current_cache_size = 0
101+
102+
return temp_cach, temp_cache_size

splitio/sync/impression.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ def __init__(self, impressions_api, impressions_manager):
8383

8484
def synchronize_counters(self):
8585
"""Send impressions from both the failed and new queues."""
86-
to_send = self._impressions_manager.get_counts()
86+
to_send = self._impressions_manager._strategy._counter.pop_all()
8787
if not to_send:
8888
return
8989

0 commit comments

Comments
 (0)