Skip to content

Commit e3d129f

Browse files
committed
Added thread lock
1 parent acbeb35 commit e3d129f

File tree

1 file changed

+137
-99
lines changed

1 file changed

+137
-99
lines changed

splitio/storage/inmemmory.py

Lines changed: 137 additions & 99 deletions
Original file line numberDiff line numberDiff line change
@@ -309,7 +309,7 @@ def get_segments_keys_count(self):
309309
with self._lock:
310310
for segment in self._segments:
311311
total_count = total_count + len(segment)
312-
return total_count
312+
return total_count
313313

314314

315315
class InMemoryImpressionStorage(ImpressionStorage):
@@ -471,206 +471,244 @@ def __init__(self):
471471
self._integrations = {}
472472
self._config = {'bT':0, 'nR':0, 'uC': 0}
473473
self._map_latencies = {'Treatment': 't', 'Treatments': 'ts', 'TreatmentWithConfig': 'tc', 'TreatmentsWithConfig': 'tcs', 'Track': 'tr'}
474+
self._lock = threading.RLock()
474475

475476
def record_config(self, config):
476477
"""Record configurations."""
477-
self._config['oM'] = self._get_operation_mode(config['operationMode'])
478-
self._config['st'] = self._get_storage_type(config['operationMode'])
479-
self._config['sE'] = config['streamingEnabled']
480-
self._config['rR'] = self._get_refresh_rates(config)
481-
self._config['uO'] = self._get_url_overrides(config)
482-
self._config['iQ'] = config['impressionsQueueSize']
483-
self._config['eQ'] = config['eventsQueueSize']
484-
self._config['iM'] = self._get_impressions_mode(config['impressionsMode'])
485-
self._config['iL'] = True if config['impressionListener'] is not None else False
486-
self._config['hp'] = self._check_if_proxy_detected()
487-
self._config['aF'] = config['activeFactoryCount']
488-
self._config['rF'] = config['redundantFactoryCount']
478+
with self._lock:
479+
self._config['oM'] = self._get_operation_mode(config['operationMode'])
480+
self._config['st'] = self._get_storage_type(config['operationMode'])
481+
self._config['sE'] = config['streamingEnabled']
482+
self._config['rR'] = self._get_refresh_rates(config)
483+
self._config['uO'] = self._get_url_overrides(config)
484+
self._config['iQ'] = config['impressionsQueueSize']
485+
self._config['eQ'] = config['eventsQueueSize']
486+
self._config['iM'] = self._get_impressions_mode(config['impressionsMode'])
487+
self._config['iL'] = True if config['impressionListener'] is not None else False
488+
self._config['hp'] = self._check_if_proxy_detected()
489+
self._config['aF'] = config['activeFactoryCount']
490+
self._config['rF'] = config['redundantFactoryCount']
489491

490492
def record_ready_time(self, ready_time):
491493
"""Record ready time."""
492-
self._config['tR'] = ready_time
494+
with self._lock:
495+
self._config['tR'] = ready_time
493496

494497
def add_tag(self, tag):
495498
"""Record tag string."""
496-
if len(self._tags) <= MAX_TAGS:
497-
self._tags.append(tag)
499+
with self._lock:
500+
if len(self._tags) <= MAX_TAGS:
501+
self._tags.append(tag)
498502

499503
def record_bur_timeout(self):
500504
"""Record block until ready timeout."""
501-
self._config['bT'] = self._config['bT'] + 1
505+
with self._lock:
506+
self._config['bT'] = self._config['bT'] + 1
502507

503508
def record_non_ready_usage(self):
504509
"""record non-ready usage."""
505-
self._config['nR'] = self._config['nR'] + 1
510+
with self._lock:
511+
self._config['nR'] = self._config['nR'] + 1
506512

507513
def record_latency(self, method, latency):
508514
"""Record method latency time."""
509-
if self._latencies['mL'][self._map_latencies[method]] < MAX_LATENCY_BUCKET_COUNT:
510-
self._latencies['mL'][self._map_latencies[method]].append(latency)
515+
with self._lock:
516+
if self._latencies['mL'][self._map_latencies[method]] < MAX_LATENCY_BUCKET_COUNT:
517+
self._latencies['mL'][self._map_latencies[method]].append(latency)
511518

512519
def record_exceptions(self, method):
513520
"""Record method exception."""
514-
self._exceptions['mE'][self._map_latencies[method]] = self._exceptions['mE'][self._map_latencies[method]] + 1
521+
with self._lock:
522+
self._exceptions['mE'][self._map_latencies[method]] = self._exceptions['mE'][self._map_latencies[method]] + 1
515523

516524
def record_impression_stats(self, data_type, count):
517525
"""Record impressions stats."""
518-
self._counters[data_type] = self._counters[data_type] + count
526+
with self._lock:
527+
self._counters[data_type] = self._counters[data_type] + count
519528

520529
def record_event_stats(self, data_type, count):
521530
"""Record events stats."""
522-
self._counters[data_type] = self._counters[data_type] + count
531+
with self._lock:
532+
self._counters[data_type] = self._counters[data_type] + count
523533

524534
def record_suceessful_sync(self, resource, time):
525535
"""Record successful sync."""
526-
self._records['IS'][resource] = time
536+
with self._lock:
537+
self._records['IS'][resource] = time
527538

528539
def record_sync_error(self, resource, status):
529540
"""Record sync http error."""
530-
self._http_errors[resource][status] = self._http_errors[resource][status] + 1
541+
with self._lock:
542+
self._http_errors[resource][status] = self._http_errors[resource][status] + 1
531543

532544
def record_sync_latency(self, resource, latency):
533545
"""Record latency time."""
534-
if self._latencies['hL'][self._map_latencies[resource]] < MAX_LATENCY_BUCKET_COUNT:
535-
self._latencies['hL'][self._map_latencies[resource]].append(latency)
546+
with self._lock:
547+
if self._latencies['hL'][self._map_latencies[resource]] < MAX_LATENCY_BUCKET_COUNT:
548+
self._latencies['hL'][self._map_latencies[resource]].append(latency)
536549

537550
def record_auth_rejections(self):
538551
"""Record auth rejection."""
539-
self._counters['aR'] = self._counters['aR'] + 1
552+
with self._lock:
553+
self._counters['aR'] = self._counters['aR'] + 1
540554

541555
def record_token_refreshes(self):
542556
"""Record sse token refresh."""
543-
self._counters['tR'] = self._counters['tR'] + 1
557+
with self._lock:
558+
self._counters['tR'] = self._counters['tR'] + 1
544559

545560
def record_streaming_event(self, streaming_event):
546561
"""Record incoming streaming event."""
547-
if len(self._streaming_events) < MAX_STREAMING_EVENTS:
548-
self._streaming_events.append({'e': streaming_event.type, 'd': streaming_event.data, 't': streaming_event.time})
562+
with self._lock:
563+
if len(self._streaming_events) < MAX_STREAMING_EVENTS:
564+
self._streaming_events.append({'e': streaming_event.type, 'd': streaming_event.data, 't': streaming_event.time})
549565

550566
def record_session_length(self, session):
551567
"""Record session length."""
552-
self._records['sL'] = session
568+
with self._lock:
569+
self._records['sL'] = session
553570

554571
def get_bur_timeouts(self):
555572
"""Get block until ready timeout."""
556-
return self._config['bT']
573+
with self._lock:
574+
return self._config['bT']
557575

558576
def get_non_ready_usage(self):
559577
"""Get non-ready usage."""
560-
return self._config['nR']
578+
with self._lock:
579+
return self._config['nR']
561580

562581
def get_config_stats(self):
563582
"""Get all config info."""
564-
return self._config
583+
with self._lock:
584+
return self._config
565585

566586
def pop_exceptions(self):
567587
"""Get and reset method exceptions."""
568-
exceptions = self._exceptions['mE']
569-
self._exceptions = {'mE': {'t': 0, 'ts': 0, 'tc': 0, 'tcs': 0, 'tr': 0}}
570-
return exceptions
588+
with self._lock:
589+
exceptions = self._exceptions['mE']
590+
self._exceptions = {'mE': {'t': 0, 'ts': 0, 'tc': 0, 'tcs': 0, 'tr': 0}}
591+
return exceptions
571592

572593
def pop_tags(self):
573594
"""Get and reset tags."""
574-
tags = self._tags
575-
self._tags = []
576-
return tags
595+
with self._lock:
596+
tags = self._tags
597+
self._tags = []
598+
return tags
577599

578600
def pop_latencies(self):
579601
"""Get and reset eval latencies."""
580-
latencies = self._latencies['mL']
581-
self._latencies['mL'] = {'t': [], 'ts': [], 'tc': [], 'tcs': [], 'tr': []}
582-
return latencies
602+
with self._lock:
603+
latencies = self._latencies['mL']
604+
self._latencies['mL'] = {'t': [], 'ts': [], 'tc': [], 'tcs': [], 'tr': []}
605+
return latencies
583606

584607
def get_impressions_stats(self, type):
585608
"""Get impressions stats"""
586-
return self._counters[type]
609+
with self._lock:
610+
return self._counters[type]
587611

588612
def get_events_stats(self, type):
589613
"""Get events stats"""
590-
return self._counters[type]
614+
with self._lock:
615+
return self._counters[type]
591616

592617
def get_last_synchronization(self):
593618
"""Get last sync"""
594-
return self._records['IS']
619+
with self._lock:
620+
return self._records['IS']
595621

596622
def pop_http_errors(self):
597623
"""Get and reset http errors."""
598-
https_errors = self._http_errors
599-
self._http_errors = {'sp': {}, 'se': {}, 'ms': {}, 'im': {}, 'ic': {}, 'ev': {}, 'te': {}, 'to': {}}
600-
return https_errors
624+
with self._lock:
625+
https_errors = self._http_errors
626+
self._http_errors = {'sp': {}, 'se': {}, 'ms': {}, 'im': {}, 'ic': {}, 'ev': {}, 'te': {}, 'to': {}}
627+
return https_errors
601628

602629
def pop_http_latencies(self):
603630
"""Get and reset http latencies."""
604-
latencies = self._latencies['hL']
605-
self._latencies['hL'] = {'sp': [], 'se': [], 'ms': [], 'im': [], 'ic': [], 'ev': [], 'te': [], 'to': []}
606-
return latencies
631+
with self._lock:
632+
latencies = self._latencies['hL']
633+
self._latencies['hL'] = {'sp': [], 'se': [], 'ms': [], 'im': [], 'ic': [], 'ev': [], 'te': [], 'to': []}
634+
return latencies
607635

608636
def pop_auth_rejections(self):
609637
"""Get and reset auth rejections."""
610-
auth_rejections = self._counters['aR']
611-
self._counters['aR'] = 0
612-
return auth_rejections
638+
with self._lock:
639+
auth_rejections = self._counters['aR']
640+
self._counters['aR'] = 0
641+
return auth_rejections
613642

614643
def pop_token_refreshes(self):
615644
"""Get and reset token refreshes."""
616-
token_refreshes = self._counters['tR']
617-
self._counters['tR'] = 0
618-
return token_refreshes
645+
with self._lock:
646+
token_refreshes = self._counters['tR']
647+
self._counters['tR'] = 0
648+
return token_refreshes
619649

620650
def pop_streaming_events(self):
621651
"""Get and reset streaming events."""
622-
streaming_events = self._streaming_events
623-
self._streaming_events = []
624-
return streaming_events
652+
with self._lock:
653+
streaming_events = self._streaming_events
654+
self._streaming_events = []
655+
return streaming_events
625656

626657
def get_session_length(self):
627658
"""Get session length"""
628-
return self._records['sL']
659+
with self._lock:
660+
return self._records['sL']
629661

630662
def _get_operation_mode(self, op_mode):
631-
if 'in-memory' in op_mode:
632-
return 0
633-
elif op_mode == 'redis-consumer':
634-
return 1
635-
else:
636-
return 2
663+
with self._lock:
664+
if 'in-memory' in op_mode:
665+
return 0
666+
elif op_mode == 'redis-consumer':
667+
return 1
668+
else:
669+
return 2
637670

638671
def _get_storage_type(self, op_mode):
639-
if 'in-memory' in op_mode:
640-
return 'memory'
641-
elif 'redis' in op_mode:
642-
return 'redis'
643-
else:
644-
return 'localstorage'
672+
with self._lock:
673+
if 'in-memory' in op_mode:
674+
return 'memory'
675+
elif 'redis' in op_mode:
676+
return 'redis'
677+
else:
678+
return 'localstorage'
645679

646680
def _get_refresh_rates(self, config):
647-
rr = {}
648-
rr['sp'] == config['featuresRefreshRate']
649-
rr['se'] == config['segmentsRefreshRate']
650-
rr['im'] == config['impressionsRefreshRate']
651-
rr['ev'] == config['eventsPushRate']
652-
rr['te'] == config['metrcsRefreshRate']
653-
return rr
681+
with self._lock:
682+
rr = {}
683+
rr['sp'] == config['featuresRefreshRate']
684+
rr['se'] == config['segmentsRefreshRate']
685+
rr['im'] == config['impressionsRefreshRate']
686+
rr['ev'] == config['eventsPushRate']
687+
rr['te'] == config['metrcsRefreshRate']
688+
return rr
654689

655690
def _get_url_overrides(self, config):
656-
rr = {}
657-
rr['s'] == True if config['sdk_url'] is not None else False
658-
rr['e'] == True if config['events_url'] is not None else False
659-
rr['a'] == True if config['auth_url'] is not None else False
660-
rr['st'] == True if config['streaming_url'] is not None else False
661-
rr['t'] == True if config['telemetry_url'] is not None else False
662-
return rr
691+
with self._lock:
692+
rr = {}
693+
rr['s'] == True if config['sdk_url'] is not None else False
694+
rr['e'] == True if config['events_url'] is not None else False
695+
rr['a'] == True if config['auth_url'] is not None else False
696+
rr['st'] == True if config['streaming_url'] is not None else False
697+
rr['t'] == True if config['telemetry_url'] is not None else False
698+
return rr
663699

664700
def _get_impressions_mode(self, imp_mode):
665-
if imp_mode == 'DEBUG':
666-
return 1
667-
elif imp_mode == 'OPTIMIZED':
668-
return 0
669-
else:
670-
return 3
701+
with self._lock:
702+
if imp_mode == 'DEBUG':
703+
return 1
704+
elif imp_mode == 'OPTIMIZED':
705+
return 0
706+
else:
707+
return 3
671708

672709
def _check_if_proxy_detected(self):
673-
for x in os.environ:
674-
if 'https_proxy' in os.getenv(x):
675-
return True
676-
return False
710+
with self._lock:
711+
for x in os.environ:
712+
if 'https_proxy' in os.getenv(x):
713+
return True
714+
return False

0 commit comments

Comments
 (0)