Skip to content

Commit 0cb08b4

Browse files
committed
Removed counter and impression mode from ImpressionsManager and created strategy from factory instead
1 parent 37d55b6 commit 0cb08b4

File tree

9 files changed

+57
-51
lines changed

9 files changed

+57
-51
lines changed

splitio/client/factory.py

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
from splitio.engine.strategies.strategy_debug_mode import StrategyDebugMode
1616
from splitio.engine.strategies.strategy_optimized_mode import StrategyOptimizedMode
1717

18-
1918
# Storage
2019
from splitio.storage.inmemmory import InMemorySplitStorage, InMemorySegmentStorage, \
2120
InMemoryImpressionStorage, InMemoryEventStorage
@@ -317,11 +316,10 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
317316
'events': InMemoryEventStorage(cfg['eventsQueueSize']),
318317
}
319318

320-
imp_strategy = StrategyOptimizedMode(True) if cfg['ImpressionsMode'] == 'OPTIMIZED' else StrategyDebugMode(True)
319+
imp_counter = Counter() if cfg['impressionsMode'] == 'OPTIMIZED' else None
320+
imp_strategy = StrategyOptimizedMode(imp_counter) if cfg['impressionsMode'] == 'OPTIMIZED' else StrategyDebugMode()
321321

322322
imp_manager = ImpressionsManager(
323-
cfg['impressionsMode'],
324-
True,
325323
_wrap_impression_listener(cfg['impressionListener'], sdk_metadata),
326324
imp_strategy)
327325

@@ -331,8 +329,9 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
331329
ImpressionSynchronizer(apis['impressions'], storages['impressions'],
332330
cfg['impressionsBulkSize']),
333331
EventSynchronizer(apis['events'], storages['events'], cfg['eventsBulkSize']),
334-
ImpressionsCountSynchronizer(apis['impressions'], imp_manager),
332+
ImpressionsCountSynchronizer(apis['impressions'], imp_counter),
335333
)
334+
imp_count_sync_task = ImpressionsCountSyncTask(synchronizers.impressions_count_sync.synchronize_counters) if cfg['impressionsMode'] == 'OPTIMIZED' else None
336335

337336
tasks = SplitTasks(
338337
SplitSynchronizationTask(
@@ -348,7 +347,7 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
348347
cfg['impressionsRefreshRate'],
349348
),
350349
EventsSyncTask(synchronizers.events_sync.synchronize_events, cfg['eventsPushRate']),
351-
ImpressionsCountSyncTask(synchronizers.impressions_count_sync.synchronize_counters)
350+
imp_count_sync_task
352351
)
353352

354353
synchronizer = Synchronizer(synchronizers, tasks)
@@ -399,10 +398,15 @@ def _build_redis_factory(api_key, cfg):
399398
_LOGGER.warning("dataSampling cannot be less than %.2f, defaulting to minimum",
400399
_MIN_DEFAULT_DATA_SAMPLING_ALLOWED)
401400
data_sampling = _MIN_DEFAULT_DATA_SAMPLING_ALLOWED
401+
402+
imp_strategy = StrategyOptimizedMode(Counter()) if cfg['impressionsMode'] == 'OPTIMIZED' else StrategyDebugMode()
403+
imp_manager = ImpressionsManager(
404+
_wrap_impression_listener(cfg['impressionListener'], sdk_metadata),
405+
imp_strategy)
406+
402407
recorder = PipelinedRecorder(
403408
redis_adapter.pipeline,
404-
ImpressionsManager(cfg['impressionsMode'], False,
405-
_wrap_impression_listener(cfg['impressionListener'], sdk_metadata)),
409+
imp_manager,
406410
storages['events'],
407411
storages['impressions'],
408412
data_sampling,
@@ -414,7 +418,6 @@ def _build_redis_factory(api_key, cfg):
414418
recorder,
415419
)
416420

417-
418421
def _build_localhost_factory(cfg):
419422
"""Build and return a localhost factory for testing/development purposes."""
420423
storages = {
@@ -441,8 +444,9 @@ def _build_localhost_factory(cfg):
441444
synchronizer = LocalhostSynchronizer(synchronizers, tasks)
442445
manager = Manager(ready_event, synchronizer, None, False, sdk_metadata)
443446
manager.start()
447+
444448
recorder = StandardRecorder(
445-
ImpressionsManager(cfg['impressionsMode'], True, None),
449+
ImpressionsManager(cfg['impressionsMode'], StrategyDebugMode()),
446450
storages['events'],
447451
storages['impressions'],
448452
)

splitio/engine/impressions.py

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from splitio.client.listener import ImpressionListenerException
55
from splitio.engine.strategies.strategy_debug_mode import StrategyDebugMode
66
from splitio.engine.strategies.strategy_optimized_mode import StrategyOptimizedMode
7-
from splitio.engine.strategies import Observer, Counter
7+
#from splitio.engine.strategies import Observer, Counter
88

99
class ImpressionsMode(Enum):
1010
"""Impressions tracking mode."""
@@ -15,7 +15,7 @@ class ImpressionsMode(Enum):
1515
class Manager(object): # pylint:disable=too-few-public-methods
1616
"""Impression manager."""
1717

18-
def __init__(self, mode=ImpressionsMode.OPTIMIZED, standalone=True, listener=None, strategy=None):
18+
def __init__(self, listener=None, strategy=None):
1919
"""
2020
Construct a manger to track and forward impressions to the queue.
2121
@@ -29,7 +29,6 @@ def __init__(self, mode=ImpressionsMode.OPTIMIZED, standalone=True, listener=Non
2929
:type listener: splitio.client.listener.ImpressionListenerWrapper
3030
"""
3131

32-
self._counter = Counter() if standalone else None
3332
self._strategy = strategy
3433
self._listener = listener
3534

@@ -42,19 +41,10 @@ def process_impressions(self, impressions):
4241
:param impressions: List of impression objects with attributes
4342
:type impressions: list[tuple[splitio.models.impression.Impression, dict]]
4443
"""
45-
for_log, for_listener = self._strategy.process_impressions(impressions, self._counter)
44+
for_log, for_listener = self._strategy.process_impressions(impressions)
4645
self._send_impressions_to_listener(for_listener)
4746
return for_log
4847

49-
def get_counts(self):
50-
"""
51-
Return counts of impressions per features.
52-
53-
:returns: A list of counter objects.
54-
:rtype: list[Counter.CountPerFeature]
55-
"""
56-
return self._counter.pop_all() if self._counter is not None else []
57-
5848
def _send_impressions_to_listener(self, impressions):
5949
"""
6050
Send impression result to custom listener.

splitio/engine/strategies/strategy_debug_mode.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,14 @@
66
class StrategyDebugMode(BaseStrategy):
77
"""Debug mode strategy."""
88

9-
def __init__(self, standalone=True):
9+
def __init__(self):
1010
"""
1111
Construct a strategy instance for debug mode.
1212
1313
"""
14-
self._observer = Observer(_IMPRESSION_OBSERVER_CACHE_SIZE) if standalone else None
14+
self._observer = Observer(_IMPRESSION_OBSERVER_CACHE_SIZE)
1515

16-
def process_impressions(self, impressions, counter=None):
16+
def process_impressions(self, impressions):
1717
"""
1818
Process impressions.
1919

splitio/engine/strategies/strategy_optimized_mode.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,15 @@
77
class StrategyOptimizedMode(BaseStrategy):
88
"""Optimized mode strategy."""
99

10-
def __init__(self, standalone=True):
10+
def __init__(self, counter=None):
1111
"""
1212
Construct a strategy instance for optimized mode.
1313
1414
"""
15-
self._observer = Observer(_IMPRESSION_OBSERVER_CACHE_SIZE) if standalone else None
15+
self._observer = Observer(_IMPRESSION_OBSERVER_CACHE_SIZE)
16+
self._counter = counter
1617

17-
def process_impressions(self, impressions, counter):
18+
def process_impressions(self, impressions):
1819
"""
1920
Process impressions.
2021
@@ -27,6 +28,6 @@ def process_impressions(self, impressions, counter):
2728
:rtype: list[tuple[splitio.models.impression.Impression, dict]]
2829
"""
2930
imps = [(self._observer.test_and_set(imp), attrs) for imp, attrs in impressions] if self._observer else impressions
30-
counter.track([imp for imp, _ in imps])
31+
self._counter.track([imp for imp, _ in imps])
3132
this_hour = truncate_time(util.utctime_ms())
3233
return [i for i, _ in imps if i.previous_time is None or i.previous_time < this_hour], imps

splitio/sync/impression.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import queue
33

44
from splitio.api import APIException
5+
from splitio.engine.strategies import Counter
56

67

78
_LOGGER = logging.getLogger(__name__)
@@ -68,22 +69,22 @@ def synchronize_impressions(self):
6869

6970

7071
class ImpressionsCountSynchronizer(object):
71-
def __init__(self, impressions_api, impressions_manager):
72+
def __init__(self, impressions_api, impressions_counter):
7273
"""
7374
Class constructor.
7475
7576
:param impressions_api: Impressions Api object to send data to the backend
7677
:type impressions_api: splitio.api.impressions.ImpressionsAPI
7778
:param impressions_manager: Impressions manager instance
78-
:type impressions_manager: splitio.engine.impressions.Manager
79+
:type impressions_counter: splitio.engine.strategies
7980
8081
"""
8182
self._impressions_api = impressions_api
82-
self._impressions_manager = impressions_manager
83+
self._impressions_counter = impressions_counter
8384

8485
def synchronize_counters(self):
8586
"""Send impressions from both the failed and new queues."""
86-
to_send = self._impressions_manager.get_counts()
87+
to_send = self._impressions_counter.pop_all()
8788
if not to_send:
8889
return
8990

splitio/sync/synchronizer.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -291,7 +291,8 @@ def start_periodic_data_recording(self):
291291
_LOGGER.debug('Starting periodic data recording')
292292
self._split_tasks.impressions_task.start()
293293
self._split_tasks.events_task.start()
294-
self._split_tasks.impressions_count_task.start()
294+
if self._split_tasks.impressions_count_task is not None:
295+
self._split_tasks.impressions_count_task.start()
295296

296297
def stop_periodic_data_recording(self, blocking):
297298
"""
@@ -303,9 +304,11 @@ def stop_periodic_data_recording(self, blocking):
303304
_LOGGER.debug('Stopping periodic data recording')
304305
if blocking:
305306
events = []
306-
for task in [self._split_tasks.impressions_task,
307-
self._split_tasks.events_task,
308-
self._split_tasks.impressions_count_task]:
307+
tasks = [self._split_tasks.impressions_task,
308+
self._split_tasks.events_task]
309+
if self._split_tasks.impressions_count_task is not None:
310+
tasks.append(self._split_tasks.impressions_count_task)
311+
for task in tasks:
309312
stop_event = threading.Event()
310313
task.stop(stop_event)
311314
events.append(stop_event)
@@ -314,7 +317,8 @@ def stop_periodic_data_recording(self, blocking):
314317
else:
315318
self._split_tasks.impressions_task.stop()
316319
self._split_tasks.events_task.stop()
317-
self._split_tasks.impressions_count_task.stop()
320+
if self._split_tasks.impressions_count_task is not None:
321+
self._split_tasks.impressions_count_task.stop()
318322

319323
def kill_split(self, split_name, default_treatment, change_number):
320324
"""

tests/integration/test_client_e2e.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@
1515
from splitio.storage.adapters.redis import build, RedisAdapter
1616
from splitio.models import splits, segments
1717
from splitio.engine.impressions import Manager as ImpressionsManager, ImpressionsMode
18+
from splitio.engine.strategies.strategy_debug_mode import StrategyDebugMode
19+
from splitio.engine.strategies.strategy_optimized_mode import StrategyOptimizedMode
20+
from splitio.engine.strategies import Counter
1821
from splitio.recorder.recorder import StandardRecorder, PipelinedRecorder
1922
from splitio.client.config import DEFAULT_CONFIG
2023

@@ -48,7 +51,8 @@ def setup_method(self):
4851
'impressions': InMemoryImpressionStorage(5000),
4952
'events': InMemoryEventStorage(5000),
5053
}
51-
impmanager = ImpressionsManager(storages['impressions'].put, ImpressionsMode.DEBUG)
54+
# impmanager = ImpressionsManager(storages['impressions'].put, StrategyDebugMode())
55+
impmanager = ImpressionsManager(None, StrategyDebugMode()) # no listener
5256
recorder = StandardRecorder(impmanager, storages['events'], storages['impressions'])
5357
self.factory = SplitFactory('some_api_key', storages, True, recorder) # pylint:disable=attribute-defined-outside-init
5458

@@ -296,7 +300,8 @@ def setup_method(self):
296300
'impressions': InMemoryImpressionStorage(5000),
297301
'events': InMemoryEventStorage(5000),
298302
}
299-
impmanager = ImpressionsManager(ImpressionsMode.OPTIMIZED, True)
303+
# impmanager = ImpressionsManager(ImpressionsMode.OPTIMIZED, True)
304+
impmanager = ImpressionsManager(None, StrategyOptimizedMode(Counter()))
300305
recorder = StandardRecorder(impmanager, storages['events'], storages['impressions'])
301306
self.factory = SplitFactory('some_api_key', storages, True, recorder) # pylint:disable=attribute-defined-outside-init
302307

@@ -514,7 +519,7 @@ def setup_method(self):
514519
'impressions': RedisImpressionsStorage(redis_client, metadata),
515520
'events': RedisEventsStorage(redis_client, metadata),
516521
}
517-
impmanager = ImpressionsManager(ImpressionsMode.DEBUG, False)
522+
impmanager = ImpressionsManager(None, StrategyDebugMode())
518523
recorder = PipelinedRecorder(redis_client.pipeline, impmanager,
519524
storages['events'], storages['impressions'])
520525
self.factory = SplitFactory('some_api_key', storages, True, recorder) # pylint:disable=attribute-defined-outside-init
@@ -791,7 +796,8 @@ def setup_method(self):
791796
'impressions': RedisImpressionsStorage(redis_client, metadata),
792797
'events': RedisEventsStorage(redis_client, metadata),
793798
}
794-
impmanager = ImpressionsManager(storages['impressions'].put, ImpressionsMode.DEBUG)
799+
# impmanager = ImpressionsManager(storages['impressions'].put, ImpressionsMode.DEBUG)
800+
impmanager = ImpressionsManager(None, StrategyDebugMode())
795801
recorder = PipelinedRecorder(redis_client.pipeline, impmanager,
796802
storages['events'], storages['impressions'])
797803
self.factory = SplitFactory('some_api_key', storages, True, recorder) # pylint:disable=attribute-defined-outside-init

tests/sync/test_impressions_count_synchronizer.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ class ImpressionsCountSynchronizerTests(object):
1616
"""ImpressionsCount synchronizer test cases."""
1717

1818
def test_synchronize_impressions_counts(self, mocker):
19-
manager = mocker.Mock(spec=ImpressionsManager)
19+
counter = mocker.Mock(spec=Counter)
2020

2121
counters = [
2222
Counter.CountPerFeature('f1', 123, 2),
@@ -25,13 +25,13 @@ def test_synchronize_impressions_counts(self, mocker):
2525
Counter.CountPerFeature('f2', 456, 222)
2626
]
2727

28-
manager.get_counts.return_value = counters
28+
counter.pop_all.return_value = counters
2929
api = mocker.Mock(spec=ImpressionsAPI)
3030
api.flush_counters.return_value = HttpResponse(200, '')
31-
impression_count_synchronizer = ImpressionsCountSynchronizer(api, manager)
31+
impression_count_synchronizer = ImpressionsCountSynchronizer(api, counter)
3232
impression_count_synchronizer.synchronize_counters()
3333

34-
assert manager.get_counts.mock_calls[0] == mocker.call()
34+
assert counter.pop_all.mock_calls[0] == mocker.call()
3535
assert api.flush_counters.mock_calls[0] == mocker.call(counters)
3636

3737
assert len(api.flush_counters.mock_calls) == 1

tests/tasks/test_impressions_sync.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ class ImpressionsCountSyncTests(object):
5151

5252
def test_normal_operation(self, mocker):
5353
"""Test that the task works properly under normal circumstances."""
54-
manager = mocker.Mock(spec=ImpressionsManager)
54+
counter = mocker.Mock(spec=Counter)
5555

5656
counters = [
5757
Counter.CountPerFeature('f1', 123, 2),
@@ -60,18 +60,18 @@ def test_normal_operation(self, mocker):
6060
Counter.CountPerFeature('f2', 456, 222)
6161
]
6262

63-
manager.get_counts.return_value = counters
63+
counter.pop_all.return_value = counters
6464
api = mocker.Mock(spec=ImpressionsAPI)
6565
api.flush_counters.return_value = HttpResponse(200, '')
6666
impressions_sync.ImpressionsCountSyncTask._PERIOD = 1
67-
impression_synchronizer = ImpressionsCountSynchronizer(api, manager)
67+
impression_synchronizer = ImpressionsCountSynchronizer(api, counter)
6868
task = impressions_sync.ImpressionsCountSyncTask(
6969
impression_synchronizer.synchronize_counters
7070
)
7171
task.start()
7272
time.sleep(2)
7373
assert task.is_running()
74-
assert manager.get_counts.mock_calls[0] == mocker.call()
74+
assert counter.pop_all.mock_calls[0] == mocker.call()
7575
assert api.flush_counters.mock_calls[0] == mocker.call(counters)
7676
stop_event = threading.Event()
7777
calls_now = len(api.flush_counters.mock_calls)

0 commit comments

Comments
 (0)