Skip to content

Commit 3518309

Browse files
authored
Merge pull request #183 from splitio/impression_counter
add counter and manager
2 parents e994cb7 + 4589b38 commit 3518309

File tree

3 files changed

+665
-7
lines changed

3 files changed

+665
-7
lines changed

splitio/engine/impmanager.py

Lines changed: 117 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,42 @@
11
"""Split evaluator module."""
22
import logging
3+
import threading
4+
from collections import defaultdict, namedtuple
5+
from enum import Enum
6+
7+
import six
38

49
from splitio.models.impressions import Impression
510
from splitio.engine.hashfns import murmur_128
611
from splitio.engine.cache.lru import SimpleLruCache
12+
from splitio import util
13+
14+
15+
_TIME_INTERVAL_MS = 3600 * 1000 # one hour
16+
_IMPRESSION_OBSERVER_CACHE_SIZE = 500000
17+
18+
19+
class ImpressionsMode(Enum):
20+
"""Impressions tracking mode."""
21+
22+
OPTIMIZED = "OPTIMIZED"
23+
DEBUG = "DEBUG"
24+
25+
26+
def truncate_time(timestamp_ms):
27+
"""
28+
Truncate a timestamp in milliseconds to have hour granularity.
29+
30+
:param timestamp_ms: timestamp generated in the impression.
31+
:type timestamp_ms: int
32+
33+
:returns: a timestamp with hour, min, seconds, and ms set to 0.
34+
:rtype: int
35+
"""
36+
return timestamp_ms - (timestamp_ms % _TIME_INTERVAL_MS)
737

838

9-
class Hasher(object):
39+
class Hasher(object): # pylint:disable=too-few-public-methods
1040
"""Impression hasher."""
1141

1242
_PATTERN = "%s:%s:%s:%s:%d"
@@ -53,7 +83,7 @@ def process(self, impression):
5383
return self._hash_fn(self._stringify(impression), self._seed)
5484

5585

56-
class Observer(object):
86+
class Observer(object): # pylint:disable=too-few-public-methods
5787
"""Observe impression and add a previous time if applicable."""
5888

5989
def __init__(self, size):
@@ -82,8 +112,90 @@ def test_and_set(self, impression):
82112
previous_time)
83113

84114

85-
class Manager(object):
115+
class Counter(object):
116+
"""Class that counts impressions per timeframe."""
117+
118+
CounterKey = namedtuple('Count', ['feature', 'timeframe'])
119+
CountPerFeature = namedtuple('Count', ['feature', 'timeframe', 'count'])
120+
121+
def __init__(self):
122+
"""Class constructor."""
123+
self._data = defaultdict(lambda: 0)
124+
self._lock = threading.Lock()
125+
126+
def track(self, impressions, inc=1):
127+
"""
128+
Register N new impressions for a feature in a specific timeframe.
129+
130+
:param impressions: generated impressions
131+
:type impressions: list[splitio.models.impressions.Impression]
132+
133+
:param inc: amount to increment (defaults to 1)
134+
:type inc: int
135+
"""
136+
keys = [Counter.CounterKey(i.feature_name, truncate_time(i.time)) for i in impressions]
137+
with self._lock:
138+
for key in keys:
139+
self._data[key] += inc
140+
141+
def pop_all(self):
142+
"""
143+
Clear and return all the counters currently stored.
144+
145+
:returns: List of count per feature/timeframe objects
146+
:rtype: list[ImpressionCounter.CountPerFeature]
147+
"""
148+
with self._lock:
149+
old = self._data
150+
self._data = defaultdict(lambda: 0)
151+
152+
return [Counter.CountPerFeature(k.feature, k.timeframe, v)
153+
for (k, v) in six.iteritems(old)]
154+
155+
156+
class Manager(object): # pylint:disable=too-few-public-methods
86157
"""Impression manager."""
87158

88-
#TODO: implement
89-
pass
159+
def __init__(self, forwarder, mode=ImpressionsMode.OPTIMIZED, standalone=True, listener=None):
160+
"""
161+
Construct a manger to track and forward impressions to the queue.
162+
163+
:param forwarder: function accepting a list of impressions to be added to the queue.
164+
:type forwarder: callable[list[splitio.models.impressions.Impression]]
165+
166+
:param mode: Impressions capturing mode.
167+
:type mode: ImpressionsMode
168+
169+
:param standalone: whether the SDK is running in standalone sending impressions by itself
170+
:type standalone: bool
171+
172+
:param listener: Optional impressions listener that will capture all seen impressions.
173+
:type listener: splitio.client.listener.ImpressionListenerWrapper
174+
"""
175+
self._forwarder = forwarder
176+
self._observer = Observer(_IMPRESSION_OBSERVER_CACHE_SIZE) if standalone else None
177+
self._counter = Counter() if standalone and mode == ImpressionsMode.OPTIMIZED else None
178+
self._listener = listener
179+
180+
def track(self, impressions):
181+
"""
182+
Track impressions.
183+
184+
Impressions are analyzed to see if they've been seen before and counted.
185+
186+
:param impressions: List of impression objects
187+
:type impressions: list[splitio.models.impression.Impression]
188+
"""
189+
imps = [self._observer.test_and_set(i) for i in impressions] if self._observer \
190+
else impressions
191+
192+
if self._counter:
193+
self._counter.track(imps)
194+
195+
if self._listener:
196+
for imp in imps:
197+
self._listener.log_impression(imp)
198+
199+
this_hour = truncate_time(util.utctime_ms())
200+
self._forwarder(imps if self._counter is None
201+
else [i for i in imps if i.previous_time is None or i.previous_time < this_hour]) # pylint:disable=line-too-long

splitio/util/__init__.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
"""Utilities."""
2+
from datetime import datetime
3+
4+
5+
EPOCH_DATETIME = datetime(1970, 1, 1)
6+
7+
def utctime():
8+
"""
9+
Return the utc time in nanoseconds.
10+
11+
:returns: utc time in nanoseconds.
12+
:rtype: float
13+
"""
14+
return (datetime.utcnow() - EPOCH_DATETIME).total_seconds()
15+
16+
17+
def utctime_ms():
18+
"""
19+
Return the utc time in milliseconds.
20+
21+
:returns: utc time in milliseconds.
22+
:rtype: int
23+
"""
24+
return int(utctime() * 1000)

0 commit comments

Comments
 (0)