Skip to content

Commit bab7850

Browse files
authored
Merge pull request #262 from splitio/ImpressionsStrategyInterfaceImplementation
Implemented Impressions strategy classes for Impression Manager
2 parents 802a2f2 + 9fe0fb2 commit bab7850

File tree

13 files changed

+375
-255
lines changed

13 files changed

+375
-255
lines changed

splitio/client/factory.py

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,10 @@
1212
from splitio.client import util
1313
from splitio.client.listener import ImpressionListenerWrapper
1414
from splitio.engine.impressions import Manager as ImpressionsManager
15+
from splitio.engine.impressions import ImpressionsMode
16+
from splitio.engine.strategies import Counter as ImpressionsCounter
17+
from splitio.engine.strategies.strategy_debug_mode import StrategyDebugMode
18+
from splitio.engine.strategies.strategy_optimized_mode import StrategyOptimizedMode
1519

1620
# Storage
1721
from splitio.storage.inmemmory import InMemorySplitStorage, InMemorySegmentStorage, \
@@ -314,10 +318,17 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
314318
'events': InMemoryEventStorage(cfg['eventsQueueSize']),
315319
}
316320

321+
imp_counter = ImpressionsCounter() if cfg['impressionsMode'] != ImpressionsMode.DEBUG else None
322+
323+
strategies = {
324+
ImpressionsMode.OPTIMIZED : StrategyOptimizedMode(imp_counter),
325+
ImpressionsMode.DEBUG : StrategyDebugMode(),
326+
}
327+
imp_strategy = strategies[cfg['impressionsMode']]
328+
317329
imp_manager = ImpressionsManager(
318-
cfg['impressionsMode'],
319-
True,
320-
_wrap_impression_listener(cfg['impressionListener'], sdk_metadata))
330+
_wrap_impression_listener(cfg['impressionListener'], sdk_metadata),
331+
imp_strategy)
321332

322333
synchronizers = SplitSynchronizers(
323334
SplitSynchronizer(apis['splits'], storages['splits']),
@@ -327,6 +338,7 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
327338
EventSynchronizer(apis['events'], storages['events'], cfg['eventsBulkSize']),
328339
ImpressionsCountSynchronizer(apis['impressions'], imp_manager),
329340
)
341+
imp_count_sync_task = ImpressionsCountSyncTask(synchronizers.impressions_count_sync.synchronize_counters) if cfg['impressionsMode'] == 'OPTIMIZED' else None
330342

331343
tasks = SplitTasks(
332344
SplitSynchronizationTask(
@@ -342,7 +354,7 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
342354
cfg['impressionsRefreshRate'],
343355
),
344356
EventsSyncTask(synchronizers.events_sync.synchronize_events, cfg['eventsPushRate']),
345-
ImpressionsCountSyncTask(synchronizers.impressions_count_sync.synchronize_counters)
357+
imp_count_sync_task
346358
)
347359

348360
synchronizer = Synchronizer(synchronizers, tasks)
@@ -393,10 +405,14 @@ def _build_redis_factory(api_key, cfg):
393405
_LOGGER.warning("dataSampling cannot be less than %.2f, defaulting to minimum",
394406
_MIN_DEFAULT_DATA_SAMPLING_ALLOWED)
395407
data_sampling = _MIN_DEFAULT_DATA_SAMPLING_ALLOWED
408+
409+
imp_manager = ImpressionsManager(
410+
_wrap_impression_listener(cfg['impressionListener'], sdk_metadata),
411+
StrategyDebugMode())
412+
396413
recorder = PipelinedRecorder(
397414
redis_adapter.pipeline,
398-
ImpressionsManager(cfg['impressionsMode'], False,
399-
_wrap_impression_listener(cfg['impressionListener'], sdk_metadata)),
415+
imp_manager,
400416
storages['events'],
401417
storages['impressions'],
402418
data_sampling,
@@ -408,7 +424,6 @@ def _build_redis_factory(api_key, cfg):
408424
recorder,
409425
)
410426

411-
412427
def _build_localhost_factory(cfg):
413428
"""Build and return a localhost factory for testing/development purposes."""
414429
storages = {
@@ -435,8 +450,9 @@ def _build_localhost_factory(cfg):
435450
synchronizer = LocalhostSynchronizer(synchronizers, tasks)
436451
manager = Manager(ready_event, synchronizer, None, False, sdk_metadata)
437452
manager.start()
453+
438454
recorder = StandardRecorder(
439-
ImpressionsManager(cfg['impressionsMode'], True, None),
455+
ImpressionsManager(cfg['impressionsMode'], StrategyDebugMode()),
440456
storages['events'],
441457
storages['impressions'],
442458
)

splitio/engine/impressions.py

Lines changed: 6 additions & 165 deletions
Original file line numberDiff line numberDiff line change
@@ -1,160 +1,18 @@
11
"""Split evaluator module."""
2-
import threading
3-
from collections import defaultdict, namedtuple
42
from enum import Enum
53

6-
from splitio.models.impressions import Impression
7-
from splitio.engine.hashfns import murmur_128
8-
from splitio.engine.cache.lru import SimpleLruCache
94
from splitio.client.listener import ImpressionListenerException
10-
from splitio import util
11-
12-
13-
_TIME_INTERVAL_MS = 3600 * 1000 # one hour
14-
_IMPRESSION_OBSERVER_CACHE_SIZE = 500000
15-
165

176
class ImpressionsMode(Enum):
187
"""Impressions tracking mode."""
198

209
OPTIMIZED = "OPTIMIZED"
2110
DEBUG = "DEBUG"
2211

23-
24-
def truncate_time(timestamp_ms):
25-
"""
26-
Truncate a timestamp in milliseconds to have hour granularity.
27-
28-
:param timestamp_ms: timestamp generated in the impression.
29-
:type timestamp_ms: int
30-
31-
:returns: a timestamp with hour, min, seconds, and ms set to 0.
32-
:rtype: int
33-
"""
34-
return timestamp_ms - (timestamp_ms % _TIME_INTERVAL_MS)
35-
36-
37-
class Hasher(object): # pylint:disable=too-few-public-methods
38-
"""Impression hasher."""
39-
40-
_PATTERN = "%s:%s:%s:%s:%d"
41-
42-
def __init__(self, hash_fn=murmur_128, seed=0):
43-
"""
44-
Class constructor.
45-
46-
:param hash_fn: Hash function to apply (str, int) -> int
47-
:type hash_fn: callable
48-
49-
:param seed: seed to be provided when hashing
50-
:type seed: int
51-
"""
52-
self._hash_fn = hash_fn
53-
self._seed = seed
54-
55-
def _stringify(self, impression):
56-
"""
57-
Stringify an impression.
58-
59-
:param impression: Impression to stringify using _PATTERN
60-
:type impression: splitio.models.impressions.Impression
61-
62-
:returns: a string representation of the impression
63-
:rtype: str
64-
"""
65-
return self._PATTERN % (impression.matching_key if impression.matching_key else 'UNKNOWN',
66-
impression.feature_name if impression.feature_name else 'UNKNOWN',
67-
impression.treatment if impression.treatment else 'UNKNOWN',
68-
impression.label if impression.label else 'UNKNOWN',
69-
impression.change_number if impression.change_number else 0)
70-
71-
def process(self, impression):
72-
"""
73-
Hash an impression.
74-
75-
:param impression: Impression to hash.
76-
:type impression: splitio.models.impressions.Impression
77-
78-
:returns: a hash of the supplied impression's relevant fields.
79-
:rtype: int
80-
"""
81-
return self._hash_fn(self._stringify(impression), self._seed)
82-
83-
84-
class Observer(object): # pylint:disable=too-few-public-methods
85-
"""Observe impression and add a previous time if applicable."""
86-
87-
def __init__(self, size):
88-
"""Class constructor."""
89-
self._hasher = Hasher()
90-
self._cache = SimpleLruCache(size)
91-
92-
def test_and_set(self, impression):
93-
"""
94-
Examine an impression to determine and set it's previous time accordingly.
95-
96-
:param impression: Impression to track
97-
:type impression: splitio.models.impressions.Impression
98-
99-
:returns: Impression with populated previous time
100-
:rtype: splitio.models.impressions.Impression
101-
"""
102-
previous_time = self._cache.test_and_set(self._hasher.process(impression), impression.time)
103-
return Impression(impression.matching_key,
104-
impression.feature_name,
105-
impression.treatment,
106-
impression.label,
107-
impression.change_number,
108-
impression.bucketing_key,
109-
impression.time,
110-
previous_time)
111-
112-
113-
class Counter(object):
114-
"""Class that counts impressions per timeframe."""
115-
116-
CounterKey = namedtuple('Count', ['feature', 'timeframe'])
117-
CountPerFeature = namedtuple('CountPerFeature', ['feature', 'timeframe', 'count'])
118-
119-
def __init__(self):
120-
"""Class constructor."""
121-
self._data = defaultdict(lambda: 0)
122-
self._lock = threading.Lock()
123-
124-
def track(self, impressions, inc=1):
125-
"""
126-
Register N new impressions for a feature in a specific timeframe.
127-
128-
:param impressions: generated impressions
129-
:type impressions: list[splitio.models.impressions.Impression]
130-
131-
:param inc: amount to increment (defaults to 1)
132-
:type inc: int
133-
"""
134-
keys = [Counter.CounterKey(i.feature_name, truncate_time(i.time)) for i in impressions]
135-
with self._lock:
136-
for key in keys:
137-
self._data[key] += inc
138-
139-
def pop_all(self):
140-
"""
141-
Clear and return all the counters currently stored.
142-
143-
:returns: List of count per feature/timeframe objects
144-
:rtype: list[ImpressionCounter.CountPerFeature]
145-
"""
146-
with self._lock:
147-
old = self._data
148-
self._data = defaultdict(lambda: 0)
149-
150-
return [Counter.CountPerFeature(k.feature, k.timeframe, v)
151-
for (k, v) in old.items()]
152-
153-
15412
class Manager(object): # pylint:disable=too-few-public-methods
15513
"""Impression manager."""
15614

157-
def __init__(self, mode=ImpressionsMode.OPTIMIZED, standalone=True, listener=None):
15+
def __init__(self, listener=None, strategy=None):
15816
"""
15917
Construct a manger to track and forward impressions to the queue.
16018
@@ -167,8 +25,8 @@ def __init__(self, mode=ImpressionsMode.OPTIMIZED, standalone=True, listener=Non
16725
:param listener: Optional impressions listener that will capture all seen impressions.
16826
:type listener: splitio.client.listener.ImpressionListenerWrapper
16927
"""
170-
self._observer = Observer(_IMPRESSION_OBSERVER_CACHE_SIZE) if standalone else None
171-
self._counter = Counter() if standalone and mode == ImpressionsMode.OPTIMIZED else None
28+
29+
self._strategy = strategy
17230
self._listener = listener
17331

17432
def process_impressions(self, impressions):
@@ -180,26 +38,9 @@ def process_impressions(self, impressions):
18038
:param impressions: List of impression objects with attributes
18139
:type impressions: list[tuple[splitio.models.impression.Impression, dict]]
18240
"""
183-
imps = [(self._observer.test_and_set(imp), attrs) for imp, attrs in impressions] \
184-
if self._observer else impressions
185-
186-
if self._counter:
187-
self._counter.track([imp for imp, _ in imps])
188-
189-
self._send_impressions_to_listener(imps)
190-
191-
this_hour = truncate_time(util.utctime_ms())
192-
return [imp for imp, _ in imps] if self._counter is None \
193-
else [i for i, _ in imps if i.previous_time is None or i.previous_time < this_hour]
194-
195-
def get_counts(self):
196-
"""
197-
Return counts of impressions per features.
198-
199-
:returns: A list of counter objects.
200-
:rtype: list[Counter.CountPerFeature]
201-
"""
202-
return self._counter.pop_all() if self._counter is not None else []
41+
for_log, for_listener = self._strategy.process_impressions(impressions)
42+
self._send_impressions_to_listener(for_listener)
43+
return for_log
20344

20445
def _send_impressions_to_listener(self, impressions):
20546
"""

0 commit comments

Comments
 (0)