Skip to content

Commit a0a443d

Browse files
committed
update configs
1 parent b93d090 commit a0a443d

File tree

8 files changed

+197
-85
lines changed

8 files changed

+197
-85
lines changed

splitio/client/config.py

Lines changed: 72 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,20 @@
11
"""Default settings for the Split.IO SDK Python client."""
22
from __future__ import absolute_import, division, print_function, unicode_literals
3+
34
import os.path
45

6+
from splitio.engine.impressions import ImpressionsMode
7+
8+
59
DEFAULT_CONFIG = {
10+
'operationMode': 'in-memory',
611
'connectionTimeout': 1500,
712
'splitSdkMachineName': None,
813
'splitSdkMachineIp': None,
914
'featuresRefreshRate': 5,
1015
'segmentsRefreshRate': 60,
1116
'metricsRefreshRate': 60,
12-
'impressionsRefreshRate': 10,
17+
'impressionsRefreshRate': 5 * 60,
1318
'impressionsBulkSize': 5000,
1419
'impressionsQueueSize': 10000,
1520
'eventsPushRate': 10,
@@ -47,3 +52,69 @@
4752
'machineIp': None,
4853
'splitFile': os.path.join(os.path.expanduser('~'), '.split')
4954
}
55+
56+
57+
def _parse_operation_mode(apikey, config):
58+
"""
59+
Process incoming config to determine operation mode.
60+
61+
:param config: user supplied config
62+
:type config: dict
63+
64+
:returns: operation mode
65+
:rtype: str
66+
"""
67+
if apikey == 'localhost':
68+
return 'localhost-standalone'
69+
70+
if 'redisHost' in config or 'redisSentinels' in config:
71+
return 'redis-consumer'
72+
73+
if 'uwsgiClient' in config:
74+
return 'uwsgi-consumer'
75+
76+
return 'inmemory-standalone'
77+
78+
def _sanitize_impressions_mode(mode, refresh_rate=None):
79+
"""
80+
Check supplied impressions mode and adjust refresh rate.
81+
82+
:param config: default + supplied config
83+
:type config: dict
84+
85+
:returns: config with sanitized impressions mode & refresh rate
86+
:rtype: config
87+
"""
88+
if not isinstance(mode, ImpressionsMode):
89+
try:
90+
mode = ImpressionsMode(mode)
91+
except ValueError:
92+
mode = ImpressionsMode.OPTIMIZED
93+
94+
if mode == ImpressionsMode.DEBUG:
95+
refresh_rate = max(1, refresh_rate) if refresh_rate is not None else 60
96+
else:
97+
refresh_rate = max(60, refresh_rate) if refresh_rate is not None else 5 * 60
98+
99+
return mode, refresh_rate
100+
101+
def sanitize(apikey, config):
102+
"""
103+
Look for inconsistencies or ill-formed configs and tune it accordingly.
104+
105+
:param apikey: customer's apikey
106+
:type apikey: str
107+
108+
:param config: DEFAULT + user supplied config
109+
:type config: dict
110+
111+
:returns: sanitized config
112+
:rtype: dict
113+
"""
114+
config['operationMode'] = _parse_operation_mode(apikey, config)
115+
processed = DEFAULT_CONFIG.copy()
116+
processed.update(config)
117+
imp_mode, imp_rate = _sanitize_impressions_mode(config.get('impressionsMode'), config.get('impressionsRefreshRate'))
118+
processed['impressionsMode'] = imp_mode
119+
processed['impressionsRefreshRate'] = imp_rate
120+
return processed

splitio/client/factory.py

Lines changed: 30 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
from splitio.client.client import Client
1313
from splitio.client import input_validator
1414
from splitio.client.manager import SplitManager
15-
from splitio.client.config import DEFAULT_CONFIG
15+
from splitio.client.config import sanitize as sanitize_config
1616
from splitio.client import util
1717
from splitio.client.listener import ImpressionListenerWrapper
1818
from splitio.engine.impressions import Manager as ImpressionsManager
@@ -38,7 +38,7 @@
3838
# Tasks
3939
from splitio.tasks.split_sync import SplitSynchronizationTask
4040
from splitio.tasks.segment_sync import SegmentSynchronizationTask
41-
from splitio.tasks.impressions_sync import ImpressionsSyncTask
41+
from splitio.tasks.impressions_sync import ImpressionsSyncTask, ImpressionsCountSyncTask
4242
from splitio.tasks.events_sync import EventsSyncTask
4343
from splitio.tasks.telemetry_sync import TelemetrySynchronizationTask
4444

@@ -234,13 +234,11 @@ def _wrap_impression_listener(listener, metadata):
234234
return None
235235

236236

237-
def _build_in_memory_factory(api_key, config, sdk_url=None, events_url=None): # pylint: disable=too-many-locals
237+
def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None): # pylint: disable=too-many-locals
238238
"""Build and return a split factory tailored to the supplied config."""
239239
if not input_validator.validate_factory_instantiation(api_key):
240240
return None
241241

242-
cfg = DEFAULT_CONFIG.copy()
243-
cfg.update(config)
244242
http_client = HttpClient(
245243
sdk_url=sdk_url,
246244
events_url=events_url,
@@ -272,6 +270,12 @@ def _build_in_memory_factory(api_key, config, sdk_url=None, events_url=None): #
272270
segments_ready_flag = threading.Event()
273271
sdk_ready_flag = threading.Event()
274272

273+
imp_manager = ImpressionsManager(
274+
storages['impressions'].put,
275+
cfg['impressionsMode'],
276+
True,
277+
_wrap_impression_listener(cfg['impressionListener'], sdk_metadata))
278+
275279
tasks = {
276280
'splits': SplitSynchronizationTask(
277281
apis['splits'],
@@ -295,6 +299,11 @@ def _build_in_memory_factory(api_key, config, sdk_url=None, events_url=None): #
295299
cfg['impressionsBulkSize']
296300
),
297301

302+
'impressions_count': ImpressionsCountSyncTask(
303+
apis['impressions'],
304+
imp_manager
305+
),
306+
298307
'events': EventsSyncTask(
299308
apis['events'],
300309
storages['events'],
@@ -312,6 +321,7 @@ def _build_in_memory_factory(api_key, config, sdk_url=None, events_url=None): #
312321
# Start tasks that have no dependencies
313322
tasks['splits'].start()
314323
tasks['impressions'].start()
324+
tasks['impressions_count'].start()
315325
tasks['events'].start()
316326
tasks['telemetry'].start()
317327

@@ -334,22 +344,13 @@ def segment_ready_task():
334344
segment_completion_thread = threading.Thread(target=segment_ready_task)
335345
segment_completion_thread.setDaemon(True)
336346
segment_completion_thread.start()
337-
return SplitFactory(
338-
api_key,
339-
storages,
340-
cfg['labelsEnabled'],
341-
ImpressionsManager(storages['impressions'].put, cfg['impressionsMode'], True,
342-
_wrap_impression_listener(cfg['impressionListener'], sdk_metadata)),
343-
apis,
344-
tasks,
345-
sdk_ready_flag,
346-
)
347347

348+
return SplitFactory(api_key, storages, cfg['labelsEnabled'],
349+
imp_manager, apis, tasks, sdk_ready_flag)
348350

349-
def _build_redis_factory(api_key, config):
351+
352+
def _build_redis_factory(api_key, cfg):
350353
"""Build and return a split factory with redis-based storage."""
351-
cfg = DEFAULT_CONFIG.copy()
352-
cfg.update(config)
353354
sdk_metadata = util.get_metadata(cfg)
354355
redis_adapter = redis.build(cfg)
355356
cache_enabled = cfg.get('redisLocalCacheEnabled', False)
@@ -370,10 +371,8 @@ def _build_redis_factory(api_key, config):
370371
)
371372

372373

373-
def _build_uwsgi_factory(api_key, config):
374+
def _build_uwsgi_factory(api_key, cfg):
374375
"""Build and return a split factory with redis-based storage."""
375-
cfg = DEFAULT_CONFIG.copy()
376-
cfg.update(config)
377376
sdk_metadata = util.get_metadata(cfg)
378377
uwsgi_adapter = get_uwsgi()
379378
storages = {
@@ -392,10 +391,8 @@ def _build_uwsgi_factory(api_key, config):
392391
)
393392

394393

395-
def _build_localhost_factory(config):
394+
def _build_localhost_factory(cfg):
396395
"""Build and return a localhost factory for testing/development purposes."""
397-
cfg = DEFAULT_CONFIG.copy()
398-
cfg.update(config)
399396
storages = {
400397
'splits': InMemorySplitStorage(),
401398
'segments': InMemorySegmentStorage(), # not used, just to avoid possible future errors.
@@ -413,12 +410,12 @@ def _build_localhost_factory(config):
413410
)}
414411
tasks['splits'].start()
415412
return SplitFactory(
416-
'localhost',
417-
storages,
418-
False,
413+
'localhost',
414+
storages,
415+
False,
419416
ImpressionsManager(storages['impressions'].put, cfg['impressionsMode'], True, None),
420-
None,
421-
tasks,
417+
None,
418+
tasks,
422419
ready_event
423420
)
424421

@@ -444,15 +441,15 @@ def get_factory(api_key, **kwargs):
444441
"(Singleton pattern) and reusing it throughout your application."
445442
)
446443

447-
config = kwargs.get('config', {})
444+
config = sanitize_config(api_key, kwargs.get('config', {}))
448445

449-
if api_key == 'localhost':
446+
if config['operationMode'] == 'localhost-standalone':
450447
return _build_localhost_factory(config)
451448

452-
if 'redisHost' in config or 'redisSentinels' in config:
449+
if config['operationMode'] == 'redis-consumer':
453450
return _build_redis_factory(api_key, config)
454451

455-
if 'uwsgiClient' in config:
452+
if config['operationMode'] == 'uwsgi-consumer':
456453
return _build_uwsgi_factory(api_key, config)
457454

458455
return _build_in_memory_factory(

splitio/engine/impressions.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -173,12 +173,6 @@ def __init__(self, forwarder, mode=ImpressionsMode.OPTIMIZED, standalone=True, l
173173
:param listener: Optional impressions listener that will capture all seen impressions.
174174
:type listener: splitio.client.listener.ImpressionListenerWrapper
175175
"""
176-
if not isinstance(mode, ImpressionsMode):
177-
try:
178-
mode = ImpressionsMode(mode)
179-
except ValueError:
180-
mode = ImpressionsMode.OPTIMIZED
181-
182176
self._forwarder = forwarder
183177
self._observer = Observer(_IMPRESSION_OBSERVER_CACHE_SIZE) if standalone else None
184178
self._counter = Counter() if standalone and mode == ImpressionsMode.OPTIMIZED else None

splitio/tasks/impressions_sync.py

Lines changed: 12 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
from splitio.api import APIException
1010
from splitio.tasks import BaseSynchronizationTask
1111
from splitio.tasks.util.asynctask import AsyncTask
12-
from splitio.engine.impressions import Manager as ImpressionsManager
1312

1413

1514
class ImpressionsSyncTask(BaseSynchronizationTask):
@@ -105,60 +104,34 @@ def flush(self):
105104
class ImpressionsCountSyncTask(BaseSynchronizationTask):
106105
"""Impressions synchronization task uses an asynctask.AsyncTask to send impressions."""
107106

108-
def __init__(self, impressions_manager):
107+
_PERIOD = 30 * 60 # 30 minutes
108+
109+
def __init__(self, impressions_api, impressions_manager):
109110
"""
110111
Class constructor.
111112
113+
:param impressions_api: Impressions Api object to send data to the backend
114+
:type impressions_api: splitio.api.impressions.ImpressionsAPI
115+
112116
:param impressions_manager: Impressions manager instance
113-
:type impressions_manager: ImpressionsManager
117+
:type impressions_manager: splitio.engine.impressions.Manager
114118
"""
115119
self._logger = logging.getLogger(self.__class__.__name__)
120+
self._impressions_api = impressions_api
116121
self._impressions_manager = impressions_manager
117-
self._task = AsyncTask(self._send_impressions, self._period, on_stop=self._send_impressions)
122+
self._task = AsyncTask(self._send_counters, self._PERIOD, on_stop=self._send_counters)
118123

119-
def _get_failed(self):
120-
"""Return up to <BULK_SIZE> impressions stored in the failed impressions queue."""
121-
imps = []
122-
count = 0
123-
while count < self._bulk_size:
124-
try:
125-
imps.append(self._failed.get(False))
126-
count += 1
127-
except queue.Empty:
128-
# If no more items in queue, break the loop
129-
break
130-
return imps
131-
132-
def _add_to_failed_queue(self, imps):
133-
"""
134-
Add impressions that were about to be sent to a secondary queue for failed sends.
135-
136-
:param imps: List of impressions that failed to be pushed.
137-
:type imps: list
138-
"""
139-
for impression in imps:
140-
self._failed.put(impression, False)
141-
142-
def _send_impressions(self):
124+
def _send_counters(self):
143125
"""Send impressions from both the failed and new queues."""
144-
to_send = self._get_failed()
145-
if len(to_send) < self._bulk_size:
146-
# If the amount of previously failed items is less than the bulk
147-
# size, try to complete with new impressions from storage
148-
to_send.extend(self._storage.pop_many(self._bulk_size - len(to_send)))
149-
150-
if not to_send:
151-
return
152-
126+
to_send = self._impressions_manager.get_counts()
153127
try:
154-
self._impressions_api.flush_impressions(to_send)
128+
self._impressions_api.flush_counters(to_send)
155129
except APIException as exc:
156130
self._logger.error(
157131
'Exception raised while reporting impressions: %s -- %d',
158132
exc.message,
159133
exc.status_code
160134
)
161-
self._add_to_failed_queue(to_send)
162135

163136
def start(self):
164137
"""Start executing the impressions synchronization task."""

splitio/version.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = '8.2.1'
1+
__version__ = '8.3.0-rc1'

tests/client/test_config.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
"""Configuration unit tests."""
2+
#pylint: disable=protected-access,no-self-use,line-too-long
3+
4+
from splitio.client import config
5+
from splitio.engine.impressions import ImpressionsMode
6+
7+
8+
class ConfigSanitizationTests(object):
9+
"""Inmemory storage-based integration tests."""
10+
11+
def test_parse_operation_mode(self):
12+
"""Make sure operation mode is correctly captured."""
13+
assert config._parse_operation_mode('some', {}) == 'inmemory-standalone'
14+
assert config._parse_operation_mode('localhost', {}) == 'localhost-standalone'
15+
assert config._parse_operation_mode('some', {'redisHost': 'x'}) == 'redis-consumer'
16+
assert config._parse_operation_mode('some', {'uwsgiClient': True}) == 'uwsgi-consumer'
17+
18+
def test_sanitize_imp_mode(self):
19+
"""Test sanitization of impressions mode."""
20+
mode, rate = config._sanitize_impressions_mode('OPTIMIZED', 1)
21+
assert mode == ImpressionsMode.OPTIMIZED
22+
assert rate == 60
23+
24+
mode, rate = config._sanitize_impressions_mode('DEBUG', 1)
25+
assert mode == ImpressionsMode.DEBUG
26+
assert rate == 1
27+
28+
mode, rate = config._sanitize_impressions_mode('ANYTHING', 200)
29+
assert mode == ImpressionsMode.OPTIMIZED
30+
assert rate == 200
31+
32+
mode, rate = config._sanitize_impressions_mode(43, -1)
33+
assert mode == ImpressionsMode.OPTIMIZED
34+
assert rate == 60
35+
36+
mode, rate = config._sanitize_impressions_mode('OPTIMIZED')
37+
assert mode == ImpressionsMode.OPTIMIZED
38+
assert rate == 300
39+
40+
mode, rate = config._sanitize_impressions_mode('DEBUG')
41+
assert mode == ImpressionsMode.DEBUG
42+
assert rate == 60

0 commit comments

Comments
 (0)