Skip to content

Commit 3047f61

Browse files
authored
Merge pull request #186 from splitio/feature/imp_dedupe
Feature/imp dedupe
2 parents b8bddff + 5e15293 commit 3047f61

31 files changed

+101860
-202
lines changed

setup.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,7 @@
2323
'futures>=3.0.5;python_version<"3"'
2424
]
2525

26-
with open(path.join(path.abspath(path.dirname(__file__)),
27-
'splitio', 'version.py')) as f:
26+
with open(path.join(path.abspath(path.dirname(__file__)), 'splitio', 'version.py')) as f:
2827
exec(f.read()) # pylint: disable=exec-used
2928

3029
setup(
@@ -38,11 +37,13 @@
3837
license='Apache License 2.0',
3938
install_requires=INSTALL_REQUIRES,
4039
tests_require=TESTS_REQUIRES,
40+
# dependency_links=['https://github.com/splitio/mmh3cffi/tarball/feature/development#egg=mmh3cffi-0.2.0'],
4141
extras_require={
4242
'test': TESTS_REQUIRES,
4343
'redis': ['redis>=2.10.5'],
4444
'uwsgi': ['uwsgi>=2.0.0'],
45-
'cpphash': ['mmh3cffi>=0.1.5']
45+
# 'cpphash': ['mmh3cffi==0.2.0']
46+
'cpphash': ['mmh3cffi@git+https://github.com/splitio/mmh3cffi@development#egg=mmh3cffi']
4647
},
4748
setup_requires=['pytest-runner'],
4849
classifiers=[

splitio/api/impressions.py

Lines changed: 57 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,13 @@
77

88
from splitio.api import APIException, headers_from_metadata
99
from splitio.api.client import HttpClientException
10+
from splitio.engine.impressions import ImpressionsMode
1011

1112

1213
class ImpressionsAPI(object): # pylint: disable=too-few-public-methods
1314
"""Class that uses an httpClient to communicate with the impressions API."""
1415

15-
def __init__(self, client, apikey, sdk_metadata):
16+
def __init__(self, client, apikey, sdk_metadata, mode=ImpressionsMode.OPTIMIZED):
1617
"""
1718
Class constructor.
1819
@@ -25,6 +26,7 @@ def __init__(self, client, apikey, sdk_metadata):
2526
self._client = client
2627
self._apikey = apikey
2728
self._metadata = headers_from_metadata(sdk_metadata)
29+
self._metadata['SplitSDKImpressionsMode'] = mode.name
2830

2931
@staticmethod
3032
def _build_bulk(impressions):
@@ -35,19 +37,20 @@ def _build_bulk(impressions):
3537
:type impressions: list(splitio.models.impressions.Impression)
3638
3739
:return: List of dictionaries, each pairing a feature name with its list of impressions.
38-
:rtype: dict
40+
:rtype: list
3941
"""
4042
return [
4143
{
42-
'testName': test_name,
43-
'keyImpressions': [
44+
'f': test_name,
45+
'i': [
4446
{
45-
'keyName': impression.matching_key,
46-
'treatment': impression.treatment,
47-
'time': impression.time,
48-
'changeNumber': impression.change_number,
49-
'label': impression.label,
50-
'bucketingKey': impression.bucketing_key
47+
'k': impression.matching_key,
48+
't': impression.treatment,
49+
'm': impression.time,
50+
'c': impression.change_number,
51+
'r': impression.label,
52+
'b': impression.bucketing_key,
53+
'pt': impression.previous_time
5154
}
5255
for impression in imps
5356
]
@@ -58,6 +61,27 @@ def _build_bulk(impressions):
5861
)
5962
]
6063

64+
@staticmethod
65+
def _build_counters(counters):
66+
"""
67+
Build an impression counters payload formatted as the API expects it.
68+
69+
:param counters: List of impression counters per feature.
70+
:type counters: list[splitio.engine.impressions.Counter.CountPerFeature]
71+
72+
:return: dict with list of impression count dtos
73+
:rtype: dict
74+
"""
75+
return {
76+
'pf': [
77+
{
78+
'f': pf_count.feature,
79+
'm': pf_count.timeframe,
80+
'rc': pf_count.count
81+
} for pf_count in counters
82+
]
83+
}
84+
6185
def flush_impressions(self, impressions):
6286
"""
6387
Send impressions to the backend.
@@ -80,3 +104,26 @@ def flush_impressions(self, impressions):
80104
self._logger.error('Http client is throwing exceptions')
81105
self._logger.debug('Error: ', exc_info=True)
82106
raise_from(APIException('Impressions not flushed properly.'), exc)
107+
108+
def flush_counters(self, counters):
109+
"""
110+
Send impression counters to the backend.
111+
112+
:param counters: List of impression counters per feature.
113+
:type counters: list[splitio.engine.impressions.Counter.CountPerFeature]
114+
"""
115+
bulk = self._build_counters(counters)
116+
try:
117+
response = self._client.post(
118+
'events',
119+
'/testImpressions/count',
120+
self._apikey,
121+
body=bulk,
122+
extra_headers=self._metadata
123+
)
124+
if not 200 <= response.status_code < 300:
125+
raise APIException(response.body, response.status_code)
126+
except HttpClientException as exc:
127+
self._logger.error('Http client is throwing exceptions')
128+
self._logger.debug('Error: ', exc_info=True)
129+
raise_from(APIException('Impressions not flushed properly.'), exc)

splitio/client/client.py

Lines changed: 18 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
from splitio.models.events import Event, EventWrapper
1212
from splitio.models.telemetry import get_latency_bucket_index
1313
from splitio.client import input_validator
14-
from splitio.client.listener import ImpressionListenerException
14+
from splitio.util import utctime_ms
1515

1616

1717
class Client(object): # pylint: disable=too-many-instance-attributes
@@ -22,7 +22,7 @@ class Client(object): # pylint: disable=too-many-instance-attributes
2222
_METRIC_GET_TREATMENT_WITH_CONFIG = 'sdk.getTreatmentWithConfig'
2323
_METRIC_GET_TREATMENTS_WITH_CONFIG = 'sdk.getTreatmentsWithConfig'
2424

25-
def __init__(self, factory, labels_enabled=True, impression_listener=None):
25+
def __init__(self, factory, impressions_manager, labels_enabled=True):
2626
"""
2727
Construct a Client instance.
2828
@@ -32,15 +32,15 @@ def __init__(self, factory, labels_enabled=True, impression_listener=None):
3232
:param labels_enabled: Whether to store labels on impressions
3333
:type labels_enabled: bool
3434
35-
:param impression_listener: impression listener implementation
36-
:type impression_listener: ImpressionListener
35+
:param impressions_manager: impression manager instance
36+
:type impressions_manager: splitio.engine.impressions.Manager
3737
3838
:rtype: Client
3939
"""
4040
self._logger = logging.getLogger(self.__class__.__name__)
4141
self._factory = factory
4242
self._labels_enabled = labels_enabled
43-
self._impression_listener = impression_listener
43+
self._impressions_manager = impressions_manager
4444

4545
self._splitter = Splitter()
4646
self._split_storage = factory._get_storage('splits') # pylint: disable=protected-access
@@ -68,25 +68,6 @@ def destroyed(self):
6868
"""Return whether the factory holding this client has been destroyed."""
6969
return self._factory.destroyed
7070

71-
def _send_impression_to_listener(self, impression, attributes):
72-
"""
73-
Send impression result to custom listener.
74-
75-
:param impression: Generated impression
76-
:type impression: Impression
77-
78-
:param attributes: An optional dictionary of attributes
79-
:type attributes: dict
80-
"""
81-
if self._impression_listener is not None:
82-
try:
83-
self._impression_listener.log_impression(impression, attributes)
84-
except ImpressionListenerException:
85-
self._logger.error(
86-
'An exception was raised while calling user-custom impression listener'
87-
)
88-
self._logger.debug('Error', exc_info=True)
89-
9071
def _evaluate_if_ready(self, matching_key, bucketing_key, feature, attributes=None):
9172
if not self.ready:
9273
return {
@@ -135,11 +116,10 @@ def _make_evaluation(self, key, feature, attributes, method_name, metric_name):
135116
result['impression']['label'],
136117
result['impression']['change_number'],
137118
bucketing_key,
138-
start
119+
utctime_ms(),
139120
)
140121

141-
self._record_stats([impression], start, metric_name)
142-
self._send_impression_to_listener(impression, attributes)
122+
self._record_stats([(impression, attributes)], start, metric_name)
143123
return result['treatment'], result['configurations']
144124
except Exception: # pylint: disable=broad-except
145125
self._logger.error('Error getting treatment for feature')
@@ -152,10 +132,9 @@ def _make_evaluation(self, key, feature, attributes, method_name, metric_name):
152132
Label.EXCEPTION,
153133
self._split_storage.get_change_number(),
154134
bucketing_key,
155-
start
135+
utctime_ms(),
156136
)
157-
self._record_stats([impression], start, metric_name)
158-
self._send_impression_to_listener(impression, attributes)
137+
self._record_stats([(impression, attributes)], start, metric_name)
159138
except Exception: # pylint: disable=broad-except
160139
self._logger.error('Error reporting impression into get_treatment exception block')
161140
self._logger.debug('Error: ', exc_info=True)
@@ -200,7 +179,7 @@ def _make_evaluations(self, key, features, attributes, method_name, metric_name)
200179
result['impression']['label'],
201180
result['impression']['change_number'],
202181
bucketing_key,
203-
start)
182+
utctime_ms())
204183

205184
bulk_impressions.append(impression)
206185
treatments[feature] = (result['treatment'], result['configurations'])
@@ -215,9 +194,11 @@ def _make_evaluations(self, key, features, attributes, method_name, metric_name)
215194
# Register impressions
216195
try:
217196
if bulk_impressions:
218-
self._record_stats(bulk_impressions, start, self._METRIC_GET_TREATMENTS)
219-
for impression in bulk_impressions:
220-
self._send_impression_to_listener(impression, attributes)
197+
self._record_stats(
198+
[(i, attributes) for i in bulk_impressions],
199+
start,
200+
metric_name
201+
)
221202
except Exception: # pylint: disable=broad-except
222203
self._logger.error('%s: An exception when trying to store '
223204
'impressions.' % method_name)
@@ -350,7 +331,7 @@ def _record_stats(self, impressions, start, operation):
350331
Record impressions and metrics.
351332
352333
:param impressions: Generated impressions
353-
:type impressions: list||Impression
334+
:type impressions: list[tuple[splitio.models.impressions.Impression, dict]]
354335
355336
:param start: timestamp when get_treatment or get_treatments was called
356337
:type start: int
@@ -360,7 +341,7 @@ def _record_stats(self, impressions, start, operation):
360341
"""
361342
try:
362343
end = int(round(time.time() * 1000))
363-
self._impressions_storage.put(impressions)
344+
self._impressions_manager.track(impressions)
364345
self._telemetry_storage.inc_latency(operation, get_latency_bucket_index(end - start))
365346
except Exception: # pylint: disable=broad-except
366347
self._logger.error('Error recording impressions and metrics')
@@ -409,7 +390,7 @@ def track(self, key, traffic_type, event_type, value=None, properties=None):
409390
traffic_type_name=traffic_type,
410391
event_type_id=event_type,
411392
value=value,
412-
timestamp=int(time.time()*1000),
393+
timestamp=utctime_ms(),
413394
properties=properties,
414395
)
415396
return self._events_storage.put([EventWrapper(

splitio/client/config.py

Lines changed: 81 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,32 @@
11
"""Default settings for the Split.IO SDK Python client."""
22
from __future__ import absolute_import, division, print_function, unicode_literals
3+
34
import os.path
5+
import logging
6+
7+
from splitio.engine.impressions import ImpressionsMode
8+
9+
10+
_LOGGER = logging.getLogger(__name__)
11+
412

513
DEFAULT_CONFIG = {
14+
'operationMode': 'in-memory',
615
'connectionTimeout': 1500,
716
'splitSdkMachineName': None,
817
'splitSdkMachineIp': None,
918
'featuresRefreshRate': 5,
1019
'segmentsRefreshRate': 60,
1120
'metricsRefreshRate': 60,
12-
'impressionsRefreshRate': 10,
21+
'impressionsRefreshRate': 5 * 60,
1322
'impressionsBulkSize': 5000,
1423
'impressionsQueueSize': 10000,
1524
'eventsPushRate': 10,
1625
'eventsBulkSize': 5000,
1726
'eventsQueueSize': 10000,
1827
'labelsEnabled': True,
1928
'IPAddressesEnabled': True,
29+
'impressionsMode': 'OPTIMIZED',
2030
'impressionListener': None,
2131
'redisLocalCacheEnabled': False,
2232
'redisLocalCacheTTL': 5,
@@ -46,3 +56,73 @@
4656
'machineIp': None,
4757
'splitFile': os.path.join(os.path.expanduser('~'), '.split')
4858
}
59+
60+
61+
def _parse_operation_mode(apikey, config):
62+
"""
63+
Process incoming apikey and config to determine operation mode.
64+
65+
:param config: user supplied config
66+
:type config: dict
67+
68+
:returns: operation mode
69+
:rtype: str
70+
"""
71+
if apikey == 'localhost':
72+
return 'localhost-standalone'
73+
74+
if 'redisHost' in config or 'redisSentinels' in config:
75+
return 'redis-consumer'
76+
77+
if 'uwsgiClient' in config:
78+
return 'uwsgi-consumer'
79+
80+
return 'inmemory-standalone'
81+
82+
def _sanitize_impressions_mode(mode, refresh_rate=None):
83+
"""
84+
Check supplied impressions mode and adjust refresh rate.
85+
86+
:param mode: supplied impressions mode (`refresh_rate` is the supplied refresh rate, or None to use the mode's default)
87+
:type mode: ImpressionsMode or str
88+
89+
:returns: sanitized impressions mode & refresh rate
90+
:rtype: tuple
91+
"""
92+
if not isinstance(mode, ImpressionsMode):
93+
try:
94+
mode = ImpressionsMode(mode.upper())
95+
except (ValueError, AttributeError):
96+
_LOGGER.warning('You passed an invalid impressionsMode, impressionsMode should be '
97+
'one of the following values: `debug` or `optimized`. '
98+
'Defaulting to `optimized` mode.')
99+
mode = ImpressionsMode.OPTIMIZED
100+
101+
if mode == ImpressionsMode.DEBUG:
102+
refresh_rate = max(1, refresh_rate) if refresh_rate is not None else 60
103+
else:
104+
refresh_rate = max(60, refresh_rate) if refresh_rate is not None else 5 * 60
105+
106+
return mode, refresh_rate
107+
108+
def sanitize(apikey, config):
109+
"""
110+
Look for inconsistencies or ill-formed configs and tune it accordingly.
111+
112+
:param apikey: customer's apikey
113+
:type apikey: str
114+
115+
:param config: DEFAULT + user supplied config
116+
:type config: dict
117+
118+
:returns: sanitized config
119+
:rtype: dict
120+
"""
121+
config['operationMode'] = _parse_operation_mode(apikey, config)
122+
processed = DEFAULT_CONFIG.copy()
123+
processed.update(config)
124+
imp_mode, imp_rate = _sanitize_impressions_mode(config.get('impressionsMode'),
125+
config.get('impressionsRefreshRate'))
126+
processed['impressionsMode'] = imp_mode
127+
processed['impressionsRefreshRate'] = imp_rate
128+
return processed

0 commit comments

Comments
 (0)