Skip to content

Commit 42df386

Browse files
committed
Added NONE implementationa and ported Strategies code.
1 parent 8367d59 commit 42df386

15 files changed

+430
-332
lines changed

splitio/client/factory.py

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@
1212
from splitio.client import util
1313
from splitio.client.listener import ImpressionListenerWrapper
1414
from splitio.engine.impressions import Manager as ImpressionsManager
15+
from splitio.engine.strategies.strategy_debug_mode import StrategyDebugMode
16+
from splitio.engine.strategies.strategy_optimized_mode import StrategyOptimizedMode
17+
from splitio.engine.strategies.strategy_none_mode import StrategyNoneMode
1518

1619
# Storage
1720
from splitio.storage.inmemmory import InMemorySplitStorage, InMemorySegmentStorage, \
@@ -320,10 +323,18 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
320323
'events': InMemoryEventStorage(cfg['eventsQueueSize']),
321324
}
322325

326+
imp_counter = Counter() if cfg['impressionsMode'] == 'OPTIMIZED' else None
327+
328+
strategies = {
329+
'OPTIMIZED': StrategyOptimizedMode(imp_counter),
330+
'DEBUG' : StrategyDebugMode(),
331+
'NONE' : StrategyNoneMode(imp_counter),
332+
}
333+
imp_strategy = strategies[cfg['impressionsMode']]
334+
323335
imp_manager = ImpressionsManager(
324-
cfg['impressionsMode'],
325-
True,
326-
_wrap_impression_listener(cfg['impressionListener'], sdk_metadata))
336+
_wrap_impression_listener(cfg['impressionListener'], sdk_metadata),
337+
imp_strategy)
327338

328339
synchronizers = SplitSynchronizers(
329340
SplitSynchronizer(apis['splits'], storages['splits']),
@@ -332,8 +343,6 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
332343
cfg['impressionsBulkSize']),
333344
EventSynchronizer(apis['events'], storages['events'], cfg['eventsBulkSize']),
334345
ImpressionsCountSynchronizer(apis['impressions'], imp_manager),
335-
UniqueKeysSynchronizer(), # TODO: Pass the UniqueKeysTracker instance fetched from Strategy instance created above.
336-
ClearFilterSynchronizer(), # TODO: Pass the UniqueKeysTracker instance fetched from Strategy instance created above.
337346
)
338347

339348
tasks = SplitTasks(
@@ -351,10 +360,18 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
351360
),
352361
EventsSyncTask(synchronizers.events_sync.synchronize_events, cfg['eventsPushRate']),
353362
ImpressionsCountSyncTask(synchronizers.impressions_count_sync.synchronize_counters),
354-
UniqueKeysSyncTask(synchronizers.unique_keys_sync.SendAll),
355-
ClearFilterSyncTask(synchronizers.clear_filter_sync.clearAll)
356363
)
357364

365+
if cfg['impressionsMode'] == 'NONE':
366+
synchronizers.set_none_syncs(
367+
UniqueKeysSynchronizer(imp_strategy._unique_keys_tracker),
368+
ClearFilterSynchronizer(imp_strategy._unique_keys_tracker),
369+
)
370+
tasks.set_none_tasks(
371+
UniqueKeysSyncTask(synchronizers.unique_keys_sync.SendAll),
372+
ClearFilterSyncTask(synchronizers.clear_filter_sync.clearAll)
373+
)
374+
358375
synchronizer = Synchronizer(synchronizers, tasks)
359376

360377
preforked_initialization = cfg.get('preforkedInitialization', False)

splitio/engine/impressions.py

Lines changed: 7 additions & 165 deletions
Original file line numberDiff line numberDiff line change
@@ -1,160 +1,19 @@
11
"""Split evaluator module."""
2-
import threading
3-
from collections import defaultdict, namedtuple
42
from enum import Enum
53

6-
from splitio.models.impressions import Impression
7-
from splitio.engine.hashfns import murmur_128
8-
from splitio.engine.cache.lru import SimpleLruCache
94
from splitio.client.listener import ImpressionListenerException
10-
from splitio import util
11-
12-
13-
_TIME_INTERVAL_MS = 3600 * 1000 # one hour
14-
_IMPRESSION_OBSERVER_CACHE_SIZE = 500000
15-
165

176
class ImpressionsMode(Enum):
187
"""Impressions tracking mode."""
198

209
OPTIMIZED = "OPTIMIZED"
2110
DEBUG = "DEBUG"
22-
23-
24-
def truncate_time(timestamp_ms):
25-
"""
26-
Truncate a timestamp in milliseconds to have hour granularity.
27-
28-
:param timestamp_ms: timestamp generated in the impression.
29-
:type timestamp_ms: int
30-
31-
:returns: a timestamp with hour, min, seconds, and ms set to 0.
32-
:rtype: int
33-
"""
34-
return timestamp_ms - (timestamp_ms % _TIME_INTERVAL_MS)
35-
36-
37-
class Hasher(object): # pylint:disable=too-few-public-methods
38-
"""Impression hasher."""
39-
40-
_PATTERN = "%s:%s:%s:%s:%d"
41-
42-
def __init__(self, hash_fn=murmur_128, seed=0):
43-
"""
44-
Class constructor.
45-
46-
:param hash_fn: Hash function to apply (str, int) -> int
47-
:type hash_fn: callable
48-
49-
:param seed: seed to be provided when hashing
50-
:type seed: int
51-
"""
52-
self._hash_fn = hash_fn
53-
self._seed = seed
54-
55-
def _stringify(self, impression):
56-
"""
57-
Stringify an impression.
58-
59-
:param impression: Impression to stringify using _PATTERN
60-
:type impression: splitio.models.impressions.Impression
61-
62-
:returns: a string representation of the impression
63-
:rtype: str
64-
"""
65-
return self._PATTERN % (impression.matching_key if impression.matching_key else 'UNKNOWN',
66-
impression.feature_name if impression.feature_name else 'UNKNOWN',
67-
impression.treatment if impression.treatment else 'UNKNOWN',
68-
impression.label if impression.label else 'UNKNOWN',
69-
impression.change_number if impression.change_number else 0)
70-
71-
def process(self, impression):
72-
"""
73-
Hash an impression.
74-
75-
:param impression: Impression to hash.
76-
:type impression: splitio.models.impressions.Impression
77-
78-
:returns: a hash of the supplied impression's relevant fields.
79-
:rtype: int
80-
"""
81-
return self._hash_fn(self._stringify(impression), self._seed)
82-
83-
84-
class Observer(object): # pylint:disable=too-few-public-methods
85-
"""Observe impression and add a previous time if applicable."""
86-
87-
def __init__(self, size):
88-
"""Class constructor."""
89-
self._hasher = Hasher()
90-
self._cache = SimpleLruCache(size)
91-
92-
def test_and_set(self, impression):
93-
"""
94-
Examine an impression to determine and set it's previous time accordingly.
95-
96-
:param impression: Impression to track
97-
:type impression: splitio.models.impressions.Impression
98-
99-
:returns: Impression with populated previous time
100-
:rtype: splitio.models.impressions.Impression
101-
"""
102-
previous_time = self._cache.test_and_set(self._hasher.process(impression), impression.time)
103-
return Impression(impression.matching_key,
104-
impression.feature_name,
105-
impression.treatment,
106-
impression.label,
107-
impression.change_number,
108-
impression.bucketing_key,
109-
impression.time,
110-
previous_time)
111-
112-
113-
class Counter(object):
114-
"""Class that counts impressions per timeframe."""
115-
116-
CounterKey = namedtuple('Count', ['feature', 'timeframe'])
117-
CountPerFeature = namedtuple('CountPerFeature', ['feature', 'timeframe', 'count'])
118-
119-
def __init__(self):
120-
"""Class constructor."""
121-
self._data = defaultdict(lambda: 0)
122-
self._lock = threading.Lock()
123-
124-
def track(self, impressions, inc=1):
125-
"""
126-
Register N new impressions for a feature in a specific timeframe.
127-
128-
:param impressions: generated impressions
129-
:type impressions: list[splitio.models.impressions.Impression]
130-
131-
:param inc: amount to increment (defaults to 1)
132-
:type inc: int
133-
"""
134-
keys = [Counter.CounterKey(i.feature_name, truncate_time(i.time)) for i in impressions]
135-
with self._lock:
136-
for key in keys:
137-
self._data[key] += inc
138-
139-
def pop_all(self):
140-
"""
141-
Clear and return all the counters currently stored.
142-
143-
:returns: List of count per feature/timeframe objects
144-
:rtype: list[ImpressionCounter.CountPerFeature]
145-
"""
146-
with self._lock:
147-
old = self._data
148-
self._data = defaultdict(lambda: 0)
149-
150-
return [Counter.CountPerFeature(k.feature, k.timeframe, v)
151-
for (k, v) in old.items()]
152-
11+
NONE = 'NONE'
15312

15413
class Manager(object): # pylint:disable=too-few-public-methods
15514
"""Impression manager."""
15615

157-
def __init__(self, mode=ImpressionsMode.OPTIMIZED, standalone=True, listener=None):
16+
def __init__(self, listener=None, strategy=None):
15817
"""
15918
Construct a manger to track and forward impressions to the queue.
16019
@@ -167,8 +26,8 @@ def __init__(self, mode=ImpressionsMode.OPTIMIZED, standalone=True, listener=Non
16726
:param listener: Optional impressions listener that will capture all seen impressions.
16827
:type listener: splitio.client.listener.ImpressionListenerWrapper
16928
"""
170-
self._observer = Observer(_IMPRESSION_OBSERVER_CACHE_SIZE) if standalone else None
171-
self._counter = Counter() if standalone and mode == ImpressionsMode.OPTIMIZED else None
29+
30+
self._strategy = strategy
17231
self._listener = listener
17332

17433
def process_impressions(self, impressions):
@@ -180,26 +39,9 @@ def process_impressions(self, impressions):
18039
:param impressions: List of impression objects with attributes
18140
:type impressions: list[tuple[splitio.models.impression.Impression, dict]]
18241
"""
183-
imps = [(self._observer.test_and_set(imp), attrs) for imp, attrs in impressions] \
184-
if self._observer else impressions
185-
186-
if self._counter:
187-
self._counter.track([imp for imp, _ in imps])
188-
189-
self._send_impressions_to_listener(imps)
190-
191-
this_hour = truncate_time(util.utctime_ms())
192-
return [imp for imp, _ in imps] if self._counter is None \
193-
else [i for i, _ in imps if i.previous_time is None or i.previous_time < this_hour]
194-
195-
def get_counts(self):
196-
"""
197-
Return counts of impressions per features.
198-
199-
:returns: A list of counter objects.
200-
:rtype: list[Counter.CountPerFeature]
201-
"""
202-
return self._counter.pop_all() if self._counter is not None else []
42+
for_log, for_listener = self._strategy.process_impressions(impressions)
43+
self._send_impressions_to_listener(for_listener)
44+
return for_log
20345

20446
def _send_impressions_to_listener(self, impressions):
20547
"""

0 commit comments

Comments
 (0)