Skip to content

Commit a240324

Browse files
committed
added telemetry redis storage async
1 parent 1feb071 commit a240324

File tree

2 files changed

+321
-43
lines changed

2 files changed

+321
-43
lines changed

splitio/storage/redis.py

Lines changed: 188 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
ImpressionPipelinedStorage, TelemetryStorage
1111
from splitio.storage.adapters.redis import RedisAdapterException
1212
from splitio.storage.adapters.cache_trait import decorate as add_cache, DEFAULT_MAX_AGE
13-
13+
from splitio.optional.loaders import asyncio
1414

1515
_LOGGER = logging.getLogger(__name__)
1616
MAX_TAGS = 10
@@ -600,41 +600,21 @@ def expire_keys(self, total_keys, inserted):
600600
if total_keys == inserted:
601601
self._redis.expire(self._EVENTS_KEY_TEMPLATE, self._EVENTS_KEY_DEFAULT_TTL)
602602

603-
class RedisTelemetryStorage(TelemetryStorage):
603+
class RedisTelemetryStorageBase(TelemetryStorage):
604604
"""Redis based telemetry storage class."""
605605

606606
_TELEMETRY_CONFIG_KEY = 'SPLITIO.telemetry.init'
607607
_TELEMETRY_LATENCIES_KEY = 'SPLITIO.telemetry.latencies'
608608
_TELEMETRY_EXCEPTIONS_KEY = 'SPLITIO.telemetry.exceptions'
609609
_TELEMETRY_KEY_DEFAULT_TTL = 3600
610610

611-
def __init__(self, redis_client, sdk_metadata):
612-
"""
613-
Class constructor.
614-
615-
:param redis_client: Redis client or compliant interface.
616-
:type redis_client: splitio.storage.adapters.redis.RedisAdapter
617-
:param sdk_metadata: SDK & Machine information.
618-
:type sdk_metadata: splitio.client.util.SdkMetadata
619-
"""
620-
self._lock = threading.RLock()
621-
self._reset_config_tags()
622-
self._redis_client = redis_client
623-
self._sdk_metadata = sdk_metadata
624-
self._method_latencies = MethodLatencies()
625-
self._method_exceptions = MethodExceptions()
626-
self._tel_config = TelemetryConfig()
627-
self._make_pipe = redis_client.pipeline
628-
629611
def _reset_config_tags(self):
630-
with self._lock:
631-
self._config_tags = []
612+
"""Reset all config tags"""
613+
pass
632614

633615
def add_config_tag(self, tag):
634616
"""Record tag string."""
635-
with self._lock:
636-
if len(self._config_tags) < MAX_TAGS:
637-
self._config_tags.append(tag)
617+
pass
638618

639619
def record_config(self, config, extra_config):
640620
"""
@@ -647,26 +627,21 @@ def record_config(self, config, extra_config):
647627

648628
def pop_config_tags(self):
649629
"""Get and reset tags."""
650-
with self._lock:
651-
tags = self._config_tags
652-
self._reset_config_tags()
653-
return tags
630+
pass
654631

655632
def push_config_stats(self):
656633
"""push config stats to redis."""
657-
_LOGGER.debug("Adding Config stats to redis key %s" % (self._TELEMETRY_CONFIG_KEY))
658-
_LOGGER.debug(str(self._format_config_stats()))
659-
self._redis_client.hset(self._TELEMETRY_CONFIG_KEY, self._sdk_metadata.sdk_version + '/' + self._sdk_metadata.instance_name + '/' + self._sdk_metadata.instance_ip, str(self._format_config_stats()))
634+
pass
660635

661-
def _format_config_stats(self):
636+
def _format_config_stats(self, tags):
662637
"""format only selected config stats to json"""
663638
config_stats = self._tel_config.get_stats()
664639
return json.dumps({
665640
'aF': config_stats['aF'],
666641
'rF': config_stats['rF'],
667642
'sT': config_stats['sT'],
668643
'oM': config_stats['oM'],
669-
't': self.pop_config_tags()
644+
't': tags
670645
})
671646

672647
def record_active_and_redundant_factories(self, active_factory_count, redundant_factory_count):
@@ -703,14 +678,7 @@ def record_exception(self, method):
703678
:param method: method name
704679
:type method: string
705680
"""
706-
_LOGGER.debug("Adding Excepction stats to redis key %s" % (self._TELEMETRY_EXCEPTIONS_KEY))
707-
_LOGGER.debug(self._sdk_metadata.sdk_version + '/' + self._sdk_metadata.instance_name + '/' + self._sdk_metadata.instance_ip + '/' +
708-
method.value)
709-
pipe = self._make_pipe()
710-
pipe.hincrby(self._TELEMETRY_EXCEPTIONS_KEY, self._sdk_metadata.sdk_version + '/' + self._sdk_metadata.instance_name + '/' + self._sdk_metadata.instance_ip + '/' +
711-
method.value, 1)
712-
result = pipe.execute()
713-
self.expire_keys(self._TELEMETRY_EXCEPTIONS_KEY, self._TELEMETRY_KEY_DEFAULT_TTL, 1, result[0])
681+
pass
714682

715683
def record_not_ready_usage(self):
716684
"""
@@ -730,6 +698,94 @@ def record_impression_stats(self, data_type, count):
730698
pass
731699

732700
def expire_latency_keys(self, total_keys, inserted):
701+
pass
702+
703+
def expire_keys(self, queue_key, key_default_ttl, total_keys, inserted):
704+
"""
705+
Set expire
706+
707+
:param total_keys: length of keys.
708+
:type total_keys: int
709+
:param inserted: added keys.
710+
:type inserted: int
711+
"""
712+
pass
713+
714+
715+
class RedisTelemetryStorage(RedisTelemetryStorageBase):
716+
"""Redis based telemetry storage class."""
717+
718+
def __init__(self, redis_client, sdk_metadata):
719+
"""
720+
Class constructor.
721+
722+
:param redis_client: Redis client or compliant interface.
723+
:type redis_client: splitio.storage.adapters.redis.RedisAdapter
724+
:param sdk_metadata: SDK & Machine information.
725+
:type sdk_metadata: splitio.client.util.SdkMetadata
726+
"""
727+
self._lock = threading.RLock()
728+
self._reset_config_tags()
729+
self._redis_client = redis_client
730+
self._sdk_metadata = sdk_metadata
731+
self._method_latencies = MethodLatencies()
732+
self._method_exceptions = MethodExceptions()
733+
self._tel_config = TelemetryConfig()
734+
self._make_pipe = redis_client.pipeline
735+
736+
def _reset_config_tags(self):
737+
"""Reset all config tags"""
738+
with self._lock:
739+
self._config_tags = []
740+
741+
def add_config_tag(self, tag):
742+
"""Record tag string."""
743+
with self._lock:
744+
if len(self._config_tags) < MAX_TAGS:
745+
self._config_tags.append(tag)
746+
747+
def pop_config_tags(self):
748+
"""Get and reset tags."""
749+
with self._lock:
750+
tags = self._config_tags
751+
self._reset_config_tags()
752+
return tags
753+
754+
def push_config_stats(self):
755+
"""push config stats to redis."""
756+
_LOGGER.debug("Adding Config stats to redis key %s" % (self._TELEMETRY_CONFIG_KEY))
757+
_LOGGER.debug(str(self._format_config_stats(self.pop_config_tags())))
758+
self._redis_client.hset(self._TELEMETRY_CONFIG_KEY, self._sdk_metadata.sdk_version + '/' + self._sdk_metadata.instance_name + '/' + self._sdk_metadata.instance_ip, str(self._format_config_stats(self.pop_config_tags())))
759+
760+
def record_active_and_redundant_factories(self, active_factory_count, redundant_factory_count):
761+
"""Record active and redundant factories."""
762+
self._tel_config.record_active_and_redundant_factories(active_factory_count, redundant_factory_count)
763+
764+
def record_exception(self, method):
765+
"""
766+
record an exception
767+
768+
:param method: method name
769+
:type method: string
770+
"""
771+
_LOGGER.debug("Adding Excepction stats to redis key %s" % (self._TELEMETRY_EXCEPTIONS_KEY))
772+
_LOGGER.debug(self._sdk_metadata.sdk_version + '/' + self._sdk_metadata.instance_name + '/' + self._sdk_metadata.instance_ip + '/' +
773+
method.value)
774+
pipe = self._make_pipe()
775+
pipe.hincrby(self._TELEMETRY_EXCEPTIONS_KEY, self._sdk_metadata.sdk_version + '/' + self._sdk_metadata.instance_name + '/' + self._sdk_metadata.instance_ip + '/' +
776+
method.value, 1)
777+
result = pipe.execute()
778+
self.expire_keys(self._TELEMETRY_EXCEPTIONS_KEY, self._TELEMETRY_KEY_DEFAULT_TTL, 1, result[0])
779+
780+
def expire_latency_keys(self, total_keys, inserted):
781+
"""
782+
Expire lstency keys
783+
784+
:param total_keys: length of keys.
785+
:type total_keys: int
786+
:param inserted: added keys.
787+
:type inserted: int
788+
"""
733789
self.expire_keys(self._TELEMETRY_LATENCIES_KEY, self._TELEMETRY_KEY_DEFAULT_TTL, total_keys, inserted)
734790

735791
def expire_keys(self, queue_key, key_default_ttl, total_keys, inserted):
@@ -743,3 +799,93 @@ def expire_keys(self, queue_key, key_default_ttl, total_keys, inserted):
743799
"""
744800
if total_keys == inserted:
745801
self._redis_client.expire(queue_key, key_default_ttl)
802+
803+
804+
class RedisTelemetryStorageAsync(RedisTelemetryStorageBase):
805+
"""Redis based telemetry async storage class."""
806+
807+
async def create(redis_client, sdk_metadata):
808+
"""
809+
Create instance and reset tags
810+
811+
:param redis_client: Redis client or compliant interface.
812+
:type redis_client: splitio.storage.adapters.redis.RedisAdapter
813+
:param sdk_metadata: SDK & Machine information.
814+
:type sdk_metadata: splitio.client.util.SdkMetadata
815+
816+
:return: self instance.
817+
:rtype: splitio.storage.redis.RedisTelemetryStorageAsync
818+
"""
819+
self = RedisTelemetryStorageAsync()
820+
self._lock = asyncio.Lock()
821+
await self._reset_config_tags()
822+
self._redis_client = redis_client
823+
self._sdk_metadata = sdk_metadata
824+
self._method_latencies = MethodLatencies() # to be changed to async version class
825+
self._method_exceptions = MethodExceptions() # to be changed to async version class
826+
self._tel_config = TelemetryConfig() # to be changed to async version class
827+
self._make_pipe = redis_client.pipeline
828+
return self
829+
830+
async def _reset_config_tags(self):
831+
"""Reset all config tags"""
832+
async with self._lock:
833+
self._config_tags = []
834+
835+
async def add_config_tag(self, tag):
836+
"""Record tag string."""
837+
async with self._lock:
838+
if len(self._config_tags) < MAX_TAGS:
839+
self._config_tags.append(tag)
840+
841+
async def pop_config_tags(self):
842+
"""Get and reset tags."""
843+
async with self._lock:
844+
tags = self._config_tags
845+
await self._reset_config_tags()
846+
return tags
847+
848+
async def push_config_stats(self):
849+
"""push config stats to redis."""
850+
_LOGGER.debug("Adding Config stats to redis key %s" % (self._TELEMETRY_CONFIG_KEY))
851+
_LOGGER.debug(str(await self._format_config_stats(await self.pop_config_tags())))
852+
await self._redis_client.hset(self._TELEMETRY_CONFIG_KEY, self._sdk_metadata.sdk_version + '/' + self._sdk_metadata.instance_name + '/' + self._sdk_metadata.instance_ip, str(await self._format_config_stats(await self.pop_config_tags())))
853+
854+
async def record_exception(self, method):
855+
"""
856+
record an exception
857+
858+
:param method: method name
859+
:type method: string
860+
"""
861+
_LOGGER.debug("Adding Excepction stats to redis key %s" % (self._TELEMETRY_EXCEPTIONS_KEY))
862+
_LOGGER.debug(self._sdk_metadata.sdk_version + '/' + self._sdk_metadata.instance_name + '/' + self._sdk_metadata.instance_ip + '/' +
863+
method.value)
864+
pipe = self._make_pipe()
865+
pipe.hincrby(self._TELEMETRY_EXCEPTIONS_KEY, self._sdk_metadata.sdk_version + '/' + self._sdk_metadata.instance_name + '/' + self._sdk_metadata.instance_ip + '/' +
866+
method.value, 1)
867+
result = await pipe.execute()
868+
await self.expire_keys(self._TELEMETRY_EXCEPTIONS_KEY, self._TELEMETRY_KEY_DEFAULT_TTL, 1, result[0])
869+
870+
async def expire_latency_keys(self, total_keys, inserted):
871+
"""
872+
Expire lstency keys
873+
874+
:param total_keys: length of keys.
875+
:type total_keys: int
876+
:param inserted: added keys.
877+
:type inserted: int
878+
"""
879+
await self.expire_keys(self._TELEMETRY_LATENCIES_KEY, self._TELEMETRY_KEY_DEFAULT_TTL, total_keys, inserted)
880+
881+
async def expire_keys(self, queue_key, key_default_ttl, total_keys, inserted):
882+
"""
883+
Set expire
884+
885+
:param total_keys: length of keys.
886+
:type total_keys: int
887+
:param inserted: added keys.
888+
:type inserted: int
889+
"""
890+
if total_keys == inserted:
891+
await self._redis_client.expire(queue_key, key_default_ttl)

0 commit comments

Comments
 (0)