Skip to content

Commit 20e60ca

Browse files
authored
Merge pull request #400 from splitio/async-redis-telemetry-storage
added telemetry redis storage async
2 parents a0cfafe + 1beb1e9 commit 20e60ca

File tree

2 files changed

+349
-48
lines changed

2 files changed

+349
-48
lines changed

splitio/storage/redis.py

Lines changed: 209 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,14 @@
55

66
from splitio.models.impressions import Impression
77
from splitio.models import splits, segments
8-
from splitio.models.telemetry import MethodExceptions, MethodLatencies, TelemetryConfig, get_latency_bucket_index
8+
from splitio.models.telemetry import TelemetryConfig, get_latency_bucket_index, TelemetryConfigAsync
99
from splitio.storage import SplitStorage, SegmentStorage, ImpressionStorage, EventStorage, \
1010
ImpressionPipelinedStorage, TelemetryStorage
1111
from splitio.storage.adapters.redis import RedisAdapterException
1212
from splitio.storage.adapters.cache_trait import decorate as add_cache, DEFAULT_MAX_AGE
13+
from splitio.optional.loaders import asyncio
1314
from splitio.storage.adapters.cache_trait import LocalMemoryCache
1415

15-
1616
_LOGGER = logging.getLogger(__name__)
1717
MAX_TAGS = 10
1818

@@ -854,41 +854,21 @@ def expire_keys(self, total_keys, inserted):
854854
if total_keys == inserted:
855855
self._redis.expire(self._EVENTS_KEY_TEMPLATE, self._EVENTS_KEY_DEFAULT_TTL)
856856

857-
class RedisTelemetryStorage(TelemetryStorage):
857+
class RedisTelemetryStorageBase(TelemetryStorage):
858858
"""Redis based telemetry storage class."""
859859

860860
_TELEMETRY_CONFIG_KEY = 'SPLITIO.telemetry.init'
861861
_TELEMETRY_LATENCIES_KEY = 'SPLITIO.telemetry.latencies'
862862
_TELEMETRY_EXCEPTIONS_KEY = 'SPLITIO.telemetry.exceptions'
863863
_TELEMETRY_KEY_DEFAULT_TTL = 3600
864864

865-
def __init__(self, redis_client, sdk_metadata):
866-
"""
867-
Class constructor.
868-
869-
:param redis_client: Redis client or compliant interface.
870-
:type redis_client: splitio.storage.adapters.redis.RedisAdapter
871-
:param sdk_metadata: SDK & Machine information.
872-
:type sdk_metadata: splitio.client.util.SdkMetadata
873-
"""
874-
self._lock = threading.RLock()
875-
self._reset_config_tags()
876-
self._redis_client = redis_client
877-
self._sdk_metadata = sdk_metadata
878-
self._method_latencies = MethodLatencies()
879-
self._method_exceptions = MethodExceptions()
880-
self._tel_config = TelemetryConfig()
881-
self._make_pipe = redis_client.pipeline
882-
883865
def _reset_config_tags(self):
884-
with self._lock:
885-
self._config_tags = []
866+
"""Reset all config tags"""
867+
pass
886868

887869
def add_config_tag(self, tag):
888870
"""Record tag string."""
889-
with self._lock:
890-
if len(self._config_tags) < MAX_TAGS:
891-
self._config_tags.append(tag)
871+
pass
892872

893873
def record_config(self, config, extra_config):
894874
"""
@@ -897,35 +877,29 @@ def record_config(self, config, extra_config):
897877
:param congif: factory configuration parameters
898878
:type config: splitio.client.config
899879
"""
900-
self._tel_config.record_config(config, extra_config)
880+
pass
901881

902882
def pop_config_tags(self):
903883
"""Get and reset tags."""
904-
with self._lock:
905-
tags = self._config_tags
906-
self._reset_config_tags()
907-
return tags
884+
pass
908885

909886
def push_config_stats(self):
910887
"""push config stats to redis."""
911-
_LOGGER.debug("Adding Config stats to redis key %s" % (self._TELEMETRY_CONFIG_KEY))
912-
_LOGGER.debug(str(self._format_config_stats()))
913-
self._redis_client.hset(self._TELEMETRY_CONFIG_KEY, self._sdk_metadata.sdk_version + '/' + self._sdk_metadata.instance_name + '/' + self._sdk_metadata.instance_ip, str(self._format_config_stats()))
888+
pass
914889

915-
def _format_config_stats(self):
890+
def _format_config_stats(self, config_stats, tags):
916891
"""format only selected config stats to json"""
917-
config_stats = self._tel_config.get_stats()
918892
return json.dumps({
919893
'aF': config_stats['aF'],
920894
'rF': config_stats['rF'],
921895
'sT': config_stats['sT'],
922896
'oM': config_stats['oM'],
923-
't': self.pop_config_tags()
897+
't': tags
924898
})
925899

926900
def record_active_and_redundant_factories(self, active_factory_count, redundant_factory_count):
927901
"""Record active and redundant factories."""
928-
self._tel_config.record_active_and_redundant_factories(active_factory_count, redundant_factory_count)
902+
pass
929903

930904
def add_latency_to_pipe(self, method, bucket, pipe):
931905
"""
@@ -957,14 +931,7 @@ def record_exception(self, method):
957931
:param method: method name
958932
:type method: string
959933
"""
960-
_LOGGER.debug("Adding Excepction stats to redis key %s" % (self._TELEMETRY_EXCEPTIONS_KEY))
961-
_LOGGER.debug(self._sdk_metadata.sdk_version + '/' + self._sdk_metadata.instance_name + '/' + self._sdk_metadata.instance_ip + '/' +
962-
method.value)
963-
pipe = self._make_pipe()
964-
pipe.hincrby(self._TELEMETRY_EXCEPTIONS_KEY, self._sdk_metadata.sdk_version + '/' + self._sdk_metadata.instance_name + '/' + self._sdk_metadata.instance_ip + '/' +
965-
method.value, 1)
966-
result = pipe.execute()
967-
self.expire_keys(self._TELEMETRY_EXCEPTIONS_KEY, self._TELEMETRY_KEY_DEFAULT_TTL, 1, result[0])
934+
pass
968935

969936
def record_not_ready_usage(self):
970937
"""
@@ -984,6 +951,105 @@ def record_impression_stats(self, data_type, count):
984951
pass
985952

986953
def expire_latency_keys(self, total_keys, inserted):
954+
pass
955+
956+
def expire_keys(self, queue_key, key_default_ttl, total_keys, inserted):
957+
"""
958+
Set expire
959+
960+
:param total_keys: length of keys.
961+
:type total_keys: int
962+
:param inserted: added keys.
963+
:type inserted: int
964+
"""
965+
pass
966+
967+
968+
class RedisTelemetryStorage(RedisTelemetryStorageBase):
969+
"""Redis based telemetry storage class."""
970+
971+
def __init__(self, redis_client, sdk_metadata):
972+
"""
973+
Class constructor.
974+
975+
:param redis_client: Redis client or compliant interface.
976+
:type redis_client: splitio.storage.adapters.redis.RedisAdapter
977+
:param sdk_metadata: SDK & Machine information.
978+
:type sdk_metadata: splitio.client.util.SdkMetadata
979+
"""
980+
self._lock = threading.RLock()
981+
self._reset_config_tags()
982+
self._redis_client = redis_client
983+
self._sdk_metadata = sdk_metadata
984+
self._tel_config = TelemetryConfig()
985+
self._make_pipe = redis_client.pipeline
986+
987+
def _reset_config_tags(self):
988+
"""Reset all config tags"""
989+
with self._lock:
990+
self._config_tags = []
991+
992+
def add_config_tag(self, tag):
993+
"""Record tag string."""
994+
with self._lock:
995+
if len(self._config_tags) < MAX_TAGS:
996+
self._config_tags.append(tag)
997+
998+
def record_config(self, config, extra_config):
999+
"""
1000+
initilize telemetry objects
1001+
1002+
:param congif: factory configuration parameters
1003+
:type config: splitio.client.config
1004+
"""
1005+
self._tel_config.record_config(config, extra_config)
1006+
1007+
def pop_config_tags(self):
1008+
"""Get and reset tags."""
1009+
with self._lock:
1010+
tags = self._config_tags
1011+
self._reset_config_tags()
1012+
return tags
1013+
1014+
def push_config_stats(self):
1015+
"""push config stats to redis."""
1016+
_LOGGER.debug("Adding Config stats to redis key %s" % (self._TELEMETRY_CONFIG_KEY))
1017+
_LOGGER.debug(str(self._format_config_stats(self._tel_config.get_stats(), self.pop_config_tags())))
1018+
self._redis_client.hset(self._TELEMETRY_CONFIG_KEY, self._sdk_metadata.sdk_version + '/' + self._sdk_metadata.instance_name + '/' + self._sdk_metadata.instance_ip, str(self._format_config_stats(self._tel_config.get_stats(), self.pop_config_tags())))
1019+
1020+
def record_active_and_redundant_factories(self, active_factory_count, redundant_factory_count):
1021+
"""Record active and redundant factories."""
1022+
self._tel_config.record_active_and_redundant_factories(active_factory_count, redundant_factory_count)
1023+
1024+
def record_exception(self, method):
1025+
"""
1026+
record an exception
1027+
1028+
:param method: method name
1029+
:type method: string
1030+
"""
1031+
_LOGGER.debug("Adding Excepction stats to redis key %s" % (self._TELEMETRY_EXCEPTIONS_KEY))
1032+
_LOGGER.debug(self._sdk_metadata.sdk_version + '/' + self._sdk_metadata.instance_name + '/' + self._sdk_metadata.instance_ip + '/' +
1033+
method.value)
1034+
pipe = self._make_pipe()
1035+
pipe.hincrby(self._TELEMETRY_EXCEPTIONS_KEY, self._sdk_metadata.sdk_version + '/' + self._sdk_metadata.instance_name + '/' + self._sdk_metadata.instance_ip + '/' +
1036+
method.value, 1)
1037+
result = pipe.execute()
1038+
self.expire_keys(self._TELEMETRY_EXCEPTIONS_KEY, self._TELEMETRY_KEY_DEFAULT_TTL, 1, result[0])
1039+
1040+
def record_active_and_redundant_factories(self, active_factory_count, redundant_factory_count):
1041+
"""Record active and redundant factories."""
1042+
self._tel_config.record_active_and_redundant_factories(active_factory_count, redundant_factory_count)
1043+
1044+
def expire_latency_keys(self, total_keys, inserted):
1045+
"""
1046+
Expire lstency keys
1047+
1048+
:param total_keys: length of keys.
1049+
:type total_keys: int
1050+
:param inserted: added keys.
1051+
:type inserted: int
1052+
"""
9871053
self.expire_keys(self._TELEMETRY_LATENCIES_KEY, self._TELEMETRY_KEY_DEFAULT_TTL, total_keys, inserted)
9881054

9891055
def expire_keys(self, queue_key, key_default_ttl, total_keys, inserted):
@@ -997,3 +1063,100 @@ def expire_keys(self, queue_key, key_default_ttl, total_keys, inserted):
9971063
"""
9981064
if total_keys == inserted:
9991065
self._redis_client.expire(queue_key, key_default_ttl)
1066+
1067+
1068+
class RedisTelemetryStorageAsync(RedisTelemetryStorageBase):
1069+
"""Redis based telemetry async storage class."""
1070+
1071+
async def create(redis_client, sdk_metadata):
1072+
"""
1073+
Create instance and reset tags
1074+
1075+
:param redis_client: Redis client or compliant interface.
1076+
:type redis_client: splitio.storage.adapters.redis.RedisAdapter
1077+
:param sdk_metadata: SDK & Machine information.
1078+
:type sdk_metadata: splitio.client.util.SdkMetadata
1079+
1080+
:return: self instance.
1081+
:rtype: splitio.storage.redis.RedisTelemetryStorageAsync
1082+
"""
1083+
self = RedisTelemetryStorageAsync()
1084+
await self._reset_config_tags()
1085+
self._redis_client = redis_client
1086+
self._sdk_metadata = sdk_metadata
1087+
self._tel_config = await TelemetryConfigAsync.create()
1088+
self._make_pipe = redis_client.pipeline
1089+
return self
1090+
1091+
async def _reset_config_tags(self):
1092+
"""Reset all config tags"""
1093+
self._config_tags = []
1094+
1095+
async def add_config_tag(self, tag):
1096+
"""Record tag string."""
1097+
if len(self._config_tags) < MAX_TAGS:
1098+
self._config_tags.append(tag)
1099+
1100+
async def record_config(self, config, extra_config):
1101+
"""
1102+
initilize telemetry objects
1103+
1104+
:param congif: factory configuration parameters
1105+
:type config: splitio.client.config
1106+
"""
1107+
await self._tel_config.record_config(config, extra_config)
1108+
1109+
async def pop_config_tags(self):
1110+
"""Get and reset tags."""
1111+
tags = self._config_tags
1112+
await self._reset_config_tags()
1113+
return tags
1114+
1115+
async def push_config_stats(self):
1116+
"""push config stats to redis."""
1117+
_LOGGER.debug("Adding Config stats to redis key %s" % (self._TELEMETRY_CONFIG_KEY))
1118+
_LOGGER.debug(str(await self._format_config_stats(await self._tel_config.get_stats(), await self.pop_config_tags())))
1119+
await self._redis_client.hset(self._TELEMETRY_CONFIG_KEY, self._sdk_metadata.sdk_version + '/' + self._sdk_metadata.instance_name + '/' + self._sdk_metadata.instance_ip, str(await self._format_config_stats(await self._tel_config.get_stats(), await self.pop_config_tags())))
1120+
1121+
async def record_exception(self, method):
1122+
"""
1123+
record an exception
1124+
1125+
:param method: method name
1126+
:type method: string
1127+
"""
1128+
_LOGGER.debug("Adding Excepction stats to redis key %s" % (self._TELEMETRY_EXCEPTIONS_KEY))
1129+
_LOGGER.debug(self._sdk_metadata.sdk_version + '/' + self._sdk_metadata.instance_name + '/' + self._sdk_metadata.instance_ip + '/' +
1130+
method.value)
1131+
pipe = self._make_pipe()
1132+
pipe.hincrby(self._TELEMETRY_EXCEPTIONS_KEY, self._sdk_metadata.sdk_version + '/' + self._sdk_metadata.instance_name + '/' + self._sdk_metadata.instance_ip + '/' +
1133+
method.value, 1)
1134+
result = await pipe.execute()
1135+
await self.expire_keys(self._TELEMETRY_EXCEPTIONS_KEY, self._TELEMETRY_KEY_DEFAULT_TTL, 1, result[0])
1136+
1137+
async def record_active_and_redundant_factories(self, active_factory_count, redundant_factory_count):
1138+
"""Record active and redundant factories."""
1139+
await self._tel_config.record_active_and_redundant_factories(active_factory_count, redundant_factory_count)
1140+
1141+
async def expire_latency_keys(self, total_keys, inserted):
1142+
"""
1143+
Expire lstency keys
1144+
1145+
:param total_keys: length of keys.
1146+
:type total_keys: int
1147+
:param inserted: added keys.
1148+
:type inserted: int
1149+
"""
1150+
await self.expire_keys(self._TELEMETRY_LATENCIES_KEY, self._TELEMETRY_KEY_DEFAULT_TTL, total_keys, inserted)
1151+
1152+
async def expire_keys(self, queue_key, key_default_ttl, total_keys, inserted):
1153+
"""
1154+
Set expire
1155+
1156+
:param total_keys: length of keys.
1157+
:type total_keys: int
1158+
:param inserted: added keys.
1159+
:type inserted: int
1160+
"""
1161+
if total_keys == inserted:
1162+
await self._redis_client.expire(queue_key, key_default_ttl)

0 commit comments

Comments
 (0)