Skip to content

Commit e6189cb

Browse files
committed
Refactor telemetery storage dicts
1 parent d752a89 commit e6189cb

File tree

4 files changed

+283
-185
lines changed

4 files changed

+283
-185
lines changed

splitio/engine/telemetry.py

Lines changed: 73 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,25 @@ def get_config_stats(self):
150150
return self._telemetry_storage.get_config_stats()
151151

152152
def get_config_stats_to_json(self):
153-
return json.dumps(self._telemetry_storage.get_config_stats())
153+
config_stats = self._telemetry_storage.get_config_stats()
154+
return json.dumps({
155+
'oM': config_stats['operationMode'],
156+
'sT': config_stats['storageType'],
157+
'sE': config_stats['streamingEnabled'],
158+
'rR': config_stats['refreshRate'],
159+
'uO': config_stats['urlOverride'],
160+
'iQ': config_stats['impressionsQueueSize'],
161+
'eQ': config_stats['eventsQueueSize'],
162+
'iM': config_stats['impressionsMode'],
163+
'iL': config_stats['impressionListener'],
164+
'hP': config_stats['httpProxy'],
165+
'aF': config_stats['activeFactoryCount'],
166+
'rF': config_stats['redundantFactoryCount'],
167+
'bT': config_stats['blockUntilReadyTimeout'],
168+
'nR': config_stats['notReady'],
169+
'uC': config_stats['userConsent'],
170+
'tR': config_stats['timeUntilReady']}
171+
)
154172

155173
class TelemetryEvaluationConsumer(object):
156174
"""Telemetry evaluation consumer class."""
@@ -169,12 +187,23 @@ def pop_latencies(self):
169187

170188
def pop_formatted_stats(self):
171189
"""Get formatted and reset stats."""
190+
exceptions = self.pop_exceptions()
191+
latencies = self.pop_latencies()
172192
return {
173-
**{'mE': self.pop_exceptions()},
174-
**{'mL': self.pop_latencies()},
193+
**{'mE': {'t': exceptions['treatment'],
194+
'ts': exceptions['treatments'],
195+
'tc': exceptions['treatmentWithConfig'],
196+
'tcs': exceptions['treatmentsWithConfig'],
197+
'tr': exceptions['track']}
198+
},
199+
**{'mL': {'t': latencies['treatment'],
200+
'ts': latencies['treatments'],
201+
'tc': latencies['treatmentWithConfig'],
202+
'tcs': latencies['treatmentsWithConfig'],
203+
'tr': latencies['track']}
204+
},
175205
}
176206

177-
178207
class TelemetryRuntimeConsumer(object):
179208
"""Telemetry runtime consumer class."""
180209

@@ -224,18 +253,49 @@ def get_session_length(self):
224253

225254
def pop_formatted_stats(self):
226255
"""Get formatted and reset stats."""
256+
last_synchronization = self.get_last_synchronization()
257+
http_errors = self.pop_http_errors()
258+
http_latencies = self.pop_http_latencies()
227259
return {
228-
**{'iQ': self.get_impressions_stats('iQ')},
229-
**{'iDe': self.get_impressions_stats('iDe')},
230-
**{'iDr': self.get_impressions_stats('iDr')},
231-
**{'eQ': self.get_events_stats('eQ')},
232-
**{'eD': self.get_events_stats('eD')},
233-
**{'IS': self.get_last_synchronization()},
260+
**{'iQ': self.get_impressions_stats('impressionsQueued')},
261+
**{'iDe': self.get_impressions_stats('impressionsDeduped')},
262+
**{'iDr': self.get_impressions_stats('impressionsDropped')},
263+
**{'eQ': self.get_events_stats('eventsQueued')},
264+
**{'eD': self.get_events_stats('eventsDropped')},
265+
**{'lS': {'sp': last_synchronization['split'],
266+
'se': last_synchronization['segment'],
267+
'ms': last_synchronization['mySegment'],
268+
'im': last_synchronization['impression'],
269+
'ic': last_synchronization['impressionCount'],
270+
'ev': last_synchronization['event'],
271+
'te': last_synchronization['telemetry'],
272+
'to': last_synchronization['token']}
273+
},
234274
**{'t': self.pop_tags()},
235-
**{'hE': self.pop_http_errors()},
236-
**{'hL': self.pop_http_latencies()},
275+
**{'hE': {'sp': http_errors['split'],
276+
'se': http_errors['segment'],
277+
'ms': http_errors['mySegment'],
278+
'im': http_errors['impression'],
279+
'ic': http_errors['impressionCount'],
280+
'ev': http_errors['event'],
281+
'te': http_errors['telemetry'],
282+
'to': http_errors['token']}
283+
},
284+
**{'hL': {'sp': http_latencies['split'],
285+
'se': http_latencies['segment'],
286+
'ms': http_latencies['mySegment'],
287+
'im': http_latencies['impression'],
288+
'ic': http_latencies['impressionCount'],
289+
'ev': http_latencies['event'],
290+
'te': http_latencies['telemetry'],
291+
'to': http_latencies['token']}
292+
},
237293
**{'aR': self.pop_auth_rejections()},
238294
**{'tR': self.pop_token_refreshes()},
239-
**{'sE': self.pop_streaming_events()},
295+
**{'sE': [{'e': event['type'],
296+
'd': event['data'],
297+
't': event['time']
298+
} for event in self.pop_streaming_events()]
299+
},
240300
**{'sL': self.get_session_length()}
241301
}

splitio/storage/inmemmory.py

Lines changed: 64 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -308,7 +308,7 @@ def get_segments_keys_count(self):
308308
total_count = 0
309309
with self._lock:
310310
for segment in self._segments:
311-
total_count = total_count + len(segment)
311+
total_count = total_count + len(self._segments[segment]._keys)
312312
return total_count
313313

314314

@@ -463,43 +463,41 @@ def __init__(self):
463463
self._lock = threading.RLock()
464464

465465
def _reset_counters(self):
466-
self._counters = {'iQ': 0, 'iDe': 0, 'iDr': 0, 'eQ': 0, 'eD': 0, 'sL': 0,
467-
'aR': 0, 'tR': 0}
468-
self._exceptions = {'mE': {'t': 0, 'ts': 0, 'tc': 0, 'tcs': 0, 'tr': 0}}
469-
self._records = {'IS': {'sp': 0, 'se': 0, 'ms': 0, 'im': 0, 'ic': 0, 'ev': 0, 'te': 0, 'to': 0},
470-
'sL': 0}
471-
self._http_errors = {'sp': {}, 'se': {}, 'ms': {}, 'im': {}, 'ic': {}, 'ev': {}, 'te': {}, 'to': {}}
472-
self._config = {'bT':0, 'nR':0, 'uC': 0}
466+
self._counters = {'impressionsQueued': 0, 'impressionsDeduped': 0, 'impressionsDropped': 0, 'eventsQueued': 0, 'eventsDropped': 0,
467+
'authRejections': 0, 'tokenRefreshes': 0}
468+
self._exceptions = {'methodExceptions': {'treatment': 0, 'treatments': 0, 'treatmentWithConfig': 0, 'treatmentsWithConfig': 0, 'track': 0}}
469+
self._records = {'lastSynchronizations': {'split': 0, 'segment': 0, 'mySegment': 0, 'impression': 0, 'impressionCount': 0, 'event': 0, 'telemetry': 0, 'token': 0},
470+
'sessionLength': 0}
471+
self._http_errors = {'split': {}, 'segment': {}, 'mySegment': {}, 'impression': {}, 'impressionCount': {}, 'event': {}, 'telemetry': {}, 'token': {}}
472+
self._config = {'blockUntilReadyTimeout':0, 'notReady':0, 'userConsent': 0, 'timeUntilReady': 0}
473473
self._streaming_events = []
474474
self._tags = []
475475
self._integrations = {}
476476

477477
def _reset_latencies(self):
478-
self._latencies = {'mL': {'t': [], 'ts': [], 'tc': [], 'tcs': [], 'tr': []},
479-
'hL': {'sp': [], 'se': [], 'ms': [], 'im': [], 'ic': [], 'ev': [], 'te': [], 'to': []}}
480-
self._map_latencies = {'Treatment': 't', 'Treatments': 'ts', 'TreatmentWithConfig': 'tc', 'TreatmentsWithConfig': 'tcs', 'Track': 'tr'}
481-
478+
self._latencies = {'methodLatencies': {'treatment': [], 'treatments': [], 'treatmentWithConfig': [], 'treatmentsWithConfig': [], 'track': []},
479+
'httpLatencies': {'split': [], 'segment': [], 'mySegment': [], 'impression': [], 'impressionCount': [], 'event': [], 'telemetry': [], 'token': []}}
482480

483481
def record_config(self, config):
484482
"""Record configurations."""
485483
with self._lock:
486-
self._config['oM'] = self._get_operation_mode(config['operationMode'])
487-
self._config['st'] = self._get_storage_type(config['operationMode'])
488-
self._config['sE'] = config['streamingEnabled']
489-
self._config['rR'] = self._get_refresh_rates(config)
490-
self._config['uO'] = self._get_url_overrides(config)
491-
self._config['iQ'] = config['impressionsQueueSize']
492-
self._config['eQ'] = config['eventsQueueSize']
493-
self._config['iM'] = self._get_impressions_mode(config['impressionsMode'])
494-
self._config['iL'] = True if config['impressionListener'] is not None else False
495-
self._config['hp'] = self._check_if_proxy_detected()
496-
self._config['aF'] = config['activeFactoryCount']
497-
self._config['rF'] = config['redundantFactoryCount']
484+
self._config['operationMode'] = self._get_operation_mode(config['operationMode'])
485+
self._config['storageType'] = self._get_storage_type(config['operationMode'])
486+
self._config['streamingEnabled'] = config['streamingEnabled']
487+
self._config['refreshRate'] = self._get_refresh_rates(config)
488+
self._config['urlOverride'] = self._get_url_overrides(config)
489+
self._config['impressionsQueueSize'] = config['impressionsQueueSize']
490+
self._config['eventsQueueSize'] = config['eventsQueueSize']
491+
self._config['impressionsMode'] = self._get_impressions_mode(config['impressionsMode'])
492+
self._config['impressionListener'] = True if config['impressionListener'] is not None else False
493+
self._config['httpProxy'] = self._check_if_proxy_detected()
494+
self._config['activeFactoryCount'] = config['activeFactoryCount']
495+
self._config['redundantFactoryCount'] = config['redundantFactoryCount']
498496

499497
def record_ready_time(self, ready_time):
500498
"""Record ready time."""
501499
with self._lock:
502-
self._config['tR'] = ready_time
500+
self._config['timeUntilReady'] = ready_time
503501

504502
def add_tag(self, tag):
505503
"""Record tag string."""
@@ -510,23 +508,23 @@ def add_tag(self, tag):
510508
def record_bur_time_out(self):
511509
"""Record block until ready timeout."""
512510
with self._lock:
513-
self._config['bT'] = self._config['bT'] + 1
511+
self._config['blockUntilReadyTimeout'] = self._config['blockUntilReadyTimeout'] + 1
514512

515513
def record_not_ready_usage(self):
516514
"""record non-ready usage."""
517515
with self._lock:
518-
self._config['nR'] = self._config['nR'] + 1
516+
self._config['notReady'] = self._config['notReady'] + 1
519517

520518
def record_latency(self, method, latency):
521519
"""Record method latency time."""
522520
with self._lock:
523-
if len(self._latencies['mL'][self._map_latencies[method]]) < MAX_LATENCY_BUCKET_COUNT:
524-
self._latencies['mL'][self._map_latencies[method]].append(latency)
521+
if len(self._latencies['methodLatencies'][method]) < MAX_LATENCY_BUCKET_COUNT:
522+
self._latencies['methodLatencies'][method].append(latency)
525523

526524
def record_exception(self, method):
527525
"""Record method exception."""
528526
with self._lock:
529-
self._exceptions['mE'][self._map_latencies[method]] = self._exceptions['mE'][self._map_latencies[method]] + 1
527+
self._exceptions['methodExceptions'][method] = self._exceptions['methodExceptions'][method] + 1
530528

531529
def record_impression_stats(self, data_type, count):
532530
"""Record impressions stats."""
@@ -541,7 +539,7 @@ def record_event_stats(self, data_type, count):
541539
def record_suceessful_sync(self, resource, time):
542540
"""Record successful sync."""
543541
with self._lock:
544-
self._records['IS'][resource] = time
542+
self._records['lastSynchronizations'][resource] = time
545543

546544
def record_sync_error(self, resource, status):
547545
"""Record sync http error."""
@@ -553,39 +551,39 @@ def record_sync_error(self, resource, status):
553551
def record_sync_latency(self, resource, latency):
554552
"""Record latency time."""
555553
with self._lock:
556-
if len(self._latencies['hL'][resource]) < MAX_LATENCY_BUCKET_COUNT:
557-
self._latencies['hL'][resource].append(latency)
554+
if len(self._latencies['httpLatencies'][resource]) < MAX_LATENCY_BUCKET_COUNT:
555+
self._latencies['httpLatencies'][resource].append(latency)
558556

559557
def record_auth_rejections(self):
560558
"""Record auth rejection."""
561559
with self._lock:
562-
self._counters['aR'] = self._counters['aR'] + 1
560+
self._counters['authRejections'] = self._counters['authRejections'] + 1
563561

564562
def record_token_refreshes(self):
565563
"""Record sse token refresh."""
566564
with self._lock:
567-
self._counters['tR'] = self._counters['tR'] + 1
565+
self._counters['tokenRefreshes'] = self._counters['tokenRefreshes'] + 1
568566

569567
def record_streaming_event(self, streaming_event):
570568
"""Record incoming streaming event."""
571569
with self._lock:
572570
if len(self._streaming_events) < MAX_STREAMING_EVENTS:
573-
self._streaming_events.append({'e': streaming_event['type'], 'd': streaming_event['data'], 't': streaming_event['time']})
571+
self._streaming_events.append({'type': streaming_event['type'], 'data': streaming_event['data'], 'time': streaming_event['time']})
574572

575573
def record_session_length(self, session):
576574
"""Record session length."""
577575
with self._lock:
578-
self._records['sL'] = session
576+
self._records['sessionLength'] = session
579577

580578
def get_bur_time_outs(self):
581579
"""Get block until ready timeout."""
582580
with self._lock:
583-
return self._config['bT']
581+
return self._config['blockUntilReadyTimeout']
584582

585583
def get_non_ready_usage(self):
586584
"""Get non-ready usage."""
587585
with self._lock:
588-
return self._config['nR']
586+
return self._config['notReady']
589587

590588
def get_config_stats(self):
591589
"""Get all config info."""
@@ -595,8 +593,8 @@ def get_config_stats(self):
595593
def pop_exceptions(self):
596594
"""Get and reset method exceptions."""
597595
with self._lock:
598-
exceptions = self._exceptions['mE']
599-
self._exceptions = {'mE': {'t': 0, 'ts': 0, 'tc': 0, 'tcs': 0, 'tr': 0}}
596+
exceptions = self._exceptions['methodExceptions']
597+
self._exceptions = {'methodExceptions': {'treatment': 0, 'treatments': 0, 'treatmentWithConfig': 0, 'treatmentsWithConfig': 0, 'track': 0}}
600598
return exceptions
601599

602600
def pop_tags(self):
@@ -609,8 +607,8 @@ def pop_tags(self):
609607
def pop_latencies(self):
610608
"""Get and reset eval latencies."""
611609
with self._lock:
612-
latencies = self._latencies['mL']
613-
self._latencies['mL'] = {'t': [], 'ts': [], 'tc': [], 'tcs': [], 'tr': []}
610+
latencies = self._latencies['methodLatencies']
611+
self._latencies['methodLatencies'] = {'treatment': [], 'treatments': [], 'treatmentWithConfig': [], 'treatmentsWithConfig': [], 'track': []}
614612
return latencies
615613

616614
def get_impressions_stats(self, type):
@@ -626,34 +624,34 @@ def get_events_stats(self, type):
626624
def get_last_synchronization(self):
627625
"""Get last sync"""
628626
with self._lock:
629-
return self._records['IS']
627+
return self._records['lastSynchronizations']
630628

631629
def pop_http_errors(self):
632630
"""Get and reset http errors."""
633631
with self._lock:
634632
https_errors = self._http_errors
635-
self._http_errors = {'sp': {}, 'se': {}, 'ms': {}, 'im': {}, 'ic': {}, 'ev': {}, 'te': {}, 'to': {}}
633+
self._http_errors = {'split': {}, 'segment': {}, 'mySegment': {}, 'impression': {}, 'impressionCount': {}, 'event': {}, 'telemetry': {}, 'token': {}}
636634
return https_errors
637635

638636
def pop_http_latencies(self):
639637
"""Get and reset http latencies."""
640638
with self._lock:
641-
latencies = self._latencies['hL']
642-
self._latencies['hL'] = {'sp': [], 'se': [], 'ms': [], 'im': [], 'ic': [], 'ev': [], 'te': [], 'to': []}
639+
latencies = self._latencies['httpLatencies']
640+
self._latencies['httpLatencies'] = {'split': [], 'segment': [], 'mySegment': [], 'impression': [], 'impressionCount': [], 'event': [], 'telemetry': [], 'token': []}
643641
return latencies
644642

645643
def pop_auth_rejections(self):
646644
"""Get and reset auth rejections."""
647645
with self._lock:
648-
auth_rejections = self._counters['aR']
649-
self._counters['aR'] = 0
646+
auth_rejections = self._counters['authRejections']
647+
self._counters['authRejections'] = 0
650648
return auth_rejections
651649

652650
def pop_token_refreshes(self):
653651
"""Get and reset token refreshes."""
654652
with self._lock:
655-
token_refreshes = self._counters['tR']
656-
self._counters['tR'] = 0
653+
token_refreshes = self._counters['tokenRefreshes']
654+
self._counters['tokenRefreshes'] = 0
657655
return token_refreshes
658656

659657
def pop_streaming_events(self):
@@ -666,7 +664,7 @@ def pop_streaming_events(self):
666664
def get_session_length(self):
667665
"""Get session length"""
668666
with self._lock:
669-
return self._records['sL']
667+
return self._records['sessionLength']
670668

671669
def _get_operation_mode(self, op_mode):
672670
with self._lock:
@@ -688,23 +686,23 @@ def _get_storage_type(self, op_mode):
688686

689687
def _get_refresh_rates(self, config):
690688
with self._lock:
691-
rr = {}
692-
rr['sp'] = config['featuresRefreshRate']
693-
rr['se'] = config['segmentsRefreshRate']
694-
rr['im'] = config['impressionsRefreshRate']
695-
rr['ev'] = config['eventsPushRate']
696-
rr['te'] = config['metrcsRefreshRate']
697-
return rr
689+
return {
690+
'featuresRefreshRate': config['featuresRefreshRate'],
691+
'segmentsRefreshRate': config['segmentsRefreshRate'],
692+
'impressionsRefreshRate': config['impressionsRefreshRate'],
693+
'eventsPushRate': config['eventsPushRate'],
694+
'metrcsRefreshRate': config['metrcsRefreshRate']
695+
}
698696

699697
def _get_url_overrides(self, config):
700698
with self._lock:
701-
rr = {}
702-
rr['s'] == True if 'sdk_url' in config else False
703-
rr['e'] == True if 'events_url' in config else False
704-
rr['a'] == True if 'auth_url' in config else False
705-
rr['st'] == True if 'streaming_url' in config else False
706-
rr['t'] == True if 'telemetry_url' in config else False
707-
return rr
699+
return {
700+
'sdk_url': True if 'sdk_url' in config else False,
701+
'events_url': True if 'events_url' in config else False,
702+
'auth_url': True if 'auth_url' in config else False,
703+
'streaming_url': True if 'streaming_url' in config else False,
704+
'telemetry_url': True if 'telemetry_url' in config else False
705+
}
708706

709707
def _get_impressions_mode(self, imp_mode):
710708
with self._lock:

0 commit comments

Comments
 (0)