Skip to content

Commit b3fea3c

Browse files
author
Matias Melograno
committed
added pipelined op for redis
1 parent 8ab97c1 commit b3fea3c

File tree

14 files changed

+659
-299
lines changed

14 files changed

+659
-299
lines changed

splitio/client/client.py

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from splitio.models.telemetry import get_latency_bucket_index
1313
from splitio.client import input_validator
1414
from splitio.util import utctime_ms
15+
from splitio.recorder.recorder import StandardRecorder
1516

1617

1718
_LOGGER = logging.getLogger(__name__)
@@ -25,7 +26,7 @@ class Client(object): # pylint: disable=too-many-instance-attributes
2526
_METRIC_GET_TREATMENT_WITH_CONFIG = 'sdk.getTreatmentWithConfig'
2627
_METRIC_GET_TREATMENTS_WITH_CONFIG = 'sdk.getTreatmentsWithConfig'
2728

28-
def __init__(self, factory, impressions_manager, labels_enabled=True):
29+
def __init__(self, factory, recorder, labels_enabled=True):
2930
"""
3031
Construct a Client instance.
3132
@@ -35,21 +36,18 @@ def __init__(self, factory, impressions_manager, labels_enabled=True):
3536
:param labels_enabled: Whether to store labels on impressions
3637
:type labels_enabled: bool
3738
38-
:param impressions_manager: impression manager instance
39-
:type impressions_manager: splitio.engine.impressions.Manager
39+
:param recorder: recorder instance
40+
:type recorder: splitio.recorder.StatsRecorder
4041
4142
:rtype: Client
4243
"""
4344
self._factory = factory
4445
self._labels_enabled = labels_enabled
45-
self._impressions_manager = impressions_manager
46-
46+
self._recorder = recorder
4747
self._splitter = Splitter()
4848
self._split_storage = factory._get_storage('splits') # pylint: disable=protected-access
4949
self._segment_storage = factory._get_storage('segments') # pylint: disable=protected-access
50-
self._impressions_storage = factory._get_storage('impressions') # pylint: disable=protected-access
5150
self._events_storage = factory._get_storage('events') # pylint: disable=protected-access
52-
self._telemetry_storage = factory._get_storage('telemetry') # pylint: disable=protected-access
5351
self._evaluator = Evaluator(self._split_storage, self._segment_storage, self._splitter)
5452

5553
def destroy(self):
@@ -341,13 +339,9 @@ def _record_stats(self, impressions, start, operation):
341339
:param operation: operation performed.
342340
:type operation: str
343341
"""
344-
try:
345-
end = int(round(time.time() * 1000))
346-
self._impressions_manager.track(impressions)
347-
self._telemetry_storage.inc_latency(operation, get_latency_bucket_index(end - start))
348-
except Exception: # pylint: disable=broad-except
349-
_LOGGER.error('Error recording impressions and metrics')
350-
_LOGGER.debug('Error: ', exc_info=True)
342+
end = int(round(time.time() * 1000))
343+
self._recorder.record_treatment_stats(impressions, get_latency_bucket_index(end - start),
344+
operation)
351345

352346
def track(self, key, traffic_type, event_type, value=None, properties=None):
353347
"""
@@ -395,7 +389,7 @@ def track(self, key, traffic_type, event_type, value=None, properties=None):
395389
timestamp=utctime_ms(),
396390
properties=properties,
397391
)
398-
return self._events_storage.put([EventWrapper(
392+
return self._recorder.record_track_stats([EventWrapper(
399393
event=event,
400394
size=size,
401395
)])

splitio/client/factory.py

Lines changed: 39 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535
from splitio.api.telemetry import TelemetryAPI
3636
from splitio.api.auth import AuthAPI
3737

38-
3938
# Tasks
4039
from splitio.tasks.split_sync import SplitSynchronizationTask
4140
from splitio.tasks.segment_sync import SegmentSynchronizationTask
@@ -53,6 +52,9 @@
5352
from splitio.sync.event import EventSynchronizer
5453
from splitio.sync.telemetry import TelemetrySynchronizer
5554

55+
# Recorder
56+
from splitio.recorder.recorder import StandardRecorder, PipelinedRecorder
57+
5658
# Localhost stuff
5759
from splitio.client.localhost import LocalhostEventsStorage, LocalhostImpressionsStorage, \
5860
LocalhostTelemetryStorage
@@ -85,7 +87,7 @@ def __init__( # pylint: disable=too-many-arguments
8587
apikey,
8688
storages,
8789
labels_enabled,
88-
impressions_manager,
90+
recorder,
8991
sync_manager=None,
9092
sdk_ready_flag=None,
9193
):
@@ -102,16 +104,16 @@ def __init__( # pylint: disable=too-many-arguments
102104
:type sync_manager: splitio.sync.manager.Manager
103105
:param sdk_ready_flag: Event to set when the sdk is ready.
104106
:type sdk_ready_flag: threading.Event
105-
:param impression_manager: Impressions manager instance
106-
:type impression_listener: ImpressionsManager
107+
:param recorder: StatsRecorder instance
108+
:type recorder: StatsRecorder
107109
"""
108110
self._apikey = apikey
109111
self._storages = storages
110112
self._labels_enabled = labels_enabled
111113
self._sync_manager = sync_manager
112114
self._sdk_internal_ready_flag = sdk_ready_flag
113115
self._sdk_ready_flag = threading.Event()
114-
self._impressions_manager = impressions_manager
116+
self._recorder = recorder
115117

116118
# If we have a ready flag, it means we have sync tasks that need to finish
117119
# before the SDK client becomes ready.
@@ -150,7 +152,7 @@ def client(self):
150152
This client is only a set of references to structures hold by the factory.
151153
Creating one a fast operation and safe to be used anywhere.
152154
"""
153-
return Client(self, self._impressions_manager, self._labels_enabled)
155+
return Client(self, self._recorder, self._labels_enabled)
154156

155157
def manager(self):
156158
"""
@@ -280,7 +282,6 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
280282
}
281283

282284
imp_manager = ImpressionsManager(
283-
storages['impressions'].put,
284285
cfg['impressionsMode'],
285286
True,
286287
_wrap_impression_listener(cfg['impressionListener'], sdk_metadata))
@@ -329,8 +330,14 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
329330
storages['events'].set_queue_full_hook(tasks.events_task.flush)
330331
storages['impressions'].set_queue_full_hook(tasks.impressions_task.flush)
331332

333+
recorder = StandardRecorder(
334+
imp_manager,
335+
storages['telemetry'],
336+
storages['events'],
337+
storages['impressions'],
338+
)
332339
return SplitFactory(api_key, storages, cfg['labelsEnabled'],
333-
imp_manager, manager, sdk_ready_flag)
340+
recorder, manager, sdk_ready_flag)
334341

335342

336343
def _build_redis_factory(api_key, cfg):
@@ -346,12 +353,19 @@ def _build_redis_factory(api_key, cfg):
346353
'events': RedisEventsStorage(redis_adapter, sdk_metadata),
347354
'telemetry': RedisTelemetryStorage(redis_adapter, sdk_metadata)
348355
}
356+
recorder = PipelinedRecorder(
357+
redis_adapter,
358+
ImpressionsManager(cfg['impressionsMode'], False,
359+
_wrap_impression_listener(cfg['impressionListener'], sdk_metadata)),
360+
storages['telemetry'],
361+
storages['events'],
362+
storages['impressions'],
363+
)
349364
return SplitFactory(
350365
api_key,
351366
storages,
352367
cfg['labelsEnabled'],
353-
ImpressionsManager(storages['impressions'].put, cfg['impressionsMode'], False,
354-
_wrap_impression_listener(cfg['impressionListener'], sdk_metadata))
368+
recorder,
355369
)
356370

357371

@@ -366,12 +380,18 @@ def _build_uwsgi_factory(api_key, cfg):
366380
'events': UWSGIEventStorage(uwsgi_adapter),
367381
'telemetry': UWSGITelemetryStorage(uwsgi_adapter)
368382
}
383+
recorder = StandardRecorder(
384+
ImpressionsManager(cfg['impressionsMode'], True,
385+
_wrap_impression_listener(cfg['impressionListener'], sdk_metadata)),
386+
storages['telemetry'],
387+
storages['events'],
388+
storages['impressions'],
389+
)
369390
return SplitFactory(
370391
api_key,
371392
storages,
372393
cfg['labelsEnabled'],
373-
ImpressionsManager(storages['impressions'].put, cfg['impressionsMode'], True,
374-
_wrap_impression_listener(cfg['impressionListener'], sdk_metadata))
394+
recorder,
375395
)
376396

377397

@@ -401,12 +421,17 @@ def _build_localhost_factory(cfg):
401421
synchronizer = LocalhostSynchronizer(synchronizers, tasks)
402422
manager = Manager(ready_event, synchronizer, None, False)
403423
manager.start()
404-
424+
recorder = StandardRecorder(
425+
ImpressionsManager(cfg['impressionsMode'], True, None),
426+
storages['telemetry'],
427+
storages['events'],
428+
storages['impressions'],
429+
)
405430
return SplitFactory(
406431
'localhost',
407432
storages,
408433
False,
409-
ImpressionsManager(storages['impressions'].put, cfg['impressionsMode'], True, None),
434+
recorder,
410435
manager,
411436
ready_event
412437
)

splitio/engine/impressions.py

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -157,13 +157,10 @@ def pop_all(self):
157157
class Manager(object): # pylint:disable=too-few-public-methods
158158
"""Impression manager."""
159159

160-
def __init__(self, forwarder, mode=ImpressionsMode.OPTIMIZED, standalone=True, listener=None):
160+
def __init__(self, mode=ImpressionsMode.OPTIMIZED, standalone=True, listener=None):
161161
"""
162162
Construct a manger to track and forward impressions to the queue.
163163
164-
:param forwarder: function accepting a list of impressions to be added to the queue.
165-
:type forwarder: callable[list[splitio.models.impressions.Impression]]
166-
167164
:param mode: Impressions capturing mode.
168165
:type mode: ImpressionsMode
169166
@@ -173,31 +170,30 @@ def __init__(self, forwarder, mode=ImpressionsMode.OPTIMIZED, standalone=True, l
173170
:param listener: Optional impressions listener that will capture all seen impressions.
174171
:type listener: splitio.client.listener.ImpressionListenerWrapper
175172
"""
176-
self._forwarder = forwarder
177173
self._observer = Observer(_IMPRESSION_OBSERVER_CACHE_SIZE) if standalone else None
178174
self._counter = Counter() if standalone and mode == ImpressionsMode.OPTIMIZED else None
179175
self._listener = listener
180176

181-
def track(self, impressions):
177+
def process_impressions(self, impressions):
182178
"""
183-
Track impressions.
179+
Process impressions.
184180
185181
Impressions are analyzed to see if they've been seen before and counted.
186182
187183
:param impressions: List of impression objects with attributes
188184
:type impressions: list[tuple[splitio.models.impression.Impression, dict]]
189185
"""
190-
imps = [(self._observer.test_and_set(imp), attrs) for imp, attrs in impressions] if self._observer \
191-
else impressions
186+
imps = [(self._observer.test_and_set(imp), attrs) for imp, attrs in impressions] \
187+
if self._observer else impressions
192188

193189
if self._counter:
194190
self._counter.track([imp for imp, _ in imps])
195191

196192
self._send_impressions_to_listener(imps)
197193

198194
this_hour = truncate_time(util.utctime_ms())
199-
self._forwarder([imp for imp, _ in imps] if self._counter is None
200-
else [i for i, _ in imps if i.previous_time is None or i.previous_time < this_hour])
195+
return [imp for imp, _ in imps] if self._counter is None \
196+
else [i for i, _ in imps if i.previous_time is None or i.previous_time < this_hour]
201197

202198
def get_counts(self):
203199
"""

splitio/recorder/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)