Skip to content

Commit bcf8ead

Browse files
authored
Merge pull request #398 from splitio/async-redis-impression-storage
Added redis impressions async storage
2 parents a2ea494 + 6e8c098 commit bcf8ead

File tree

2 files changed

+263
-25
lines changed

2 files changed

+263
-25
lines changed

splitio/storage/redis.py

Lines changed: 101 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -562,24 +562,12 @@ def get_segments_keys_count(self):
562562
"""
563563
return 0
564564

565-
class RedisImpressionsStorage(ImpressionStorage, ImpressionPipelinedStorage):
566-
"""Redis based event storage class."""
565+
class RedisImpressionsStorageBase(ImpressionStorage, ImpressionPipelinedStorage):
566+
"""Redis based event storage base class."""
567567

568568
IMPRESSIONS_QUEUE_KEY = 'SPLITIO.impressions'
569569
IMPRESSIONS_KEY_DEFAULT_TTL = 3600
570570

571-
def __init__(self, redis_client, sdk_metadata):
572-
"""
573-
Class constructor.
574-
575-
:param redis_client: Redis client or compliant interface.
576-
:type redis_client: splitio.storage.adapters.redis.RedisAdapter
577-
:param sdk_metadata: SDK & Machine information.
578-
:type sdk_metadata: splitio.client.util.SdkMetadata
579-
"""
580-
self._redis = redis_client
581-
self._sdk_metadata = sdk_metadata
582-
583571
def _wrap_impressions(self, impressions):
584572
"""
585573
Wrap impressions to be stored in redis
@@ -621,8 +609,7 @@ def expire_key(self, total_keys, inserted):
621609
:param inserted: added keys.
622610
:type inserted: int
623611
"""
624-
if total_keys == inserted:
625-
self._redis.expire(self.IMPRESSIONS_QUEUE_KEY, self.IMPRESSIONS_KEY_DEFAULT_TTL)
612+
pass
626613

627614
def add_impressions_to_pipe(self, impressions, pipe):
628615
"""
@@ -638,6 +625,61 @@ def add_impressions_to_pipe(self, impressions, pipe):
638625
_LOGGER.debug(bulk_impressions)
639626
pipe.rpush(self.IMPRESSIONS_QUEUE_KEY, *bulk_impressions)
640627

628+
def put(self, impressions):
629+
"""
630+
Add an impression to the redis storage.
631+
632+
:param impressions: Impression to add to the queue.
633+
:type impressions: splitio.models.impressions.Impression
634+
635+
:return: Whether the impression has been added or not.
636+
:rtype: bool
637+
"""
638+
pass
639+
640+
def pop_many(self, count):
641+
"""
642+
Pop the oldest N events from storage.
643+
644+
:param count: Number of events to pop.
645+
:type count: int
646+
"""
647+
raise NotImplementedError('Only redis-consumer mode is supported.')
648+
649+
def clear(self):
650+
"""
651+
Clear data.
652+
"""
653+
raise NotImplementedError('Not supported for redis.')
654+
655+
656+
class RedisImpressionsStorage(RedisImpressionsStorageBase):
657+
"""Redis based event storage class."""
658+
659+
def __init__(self, redis_client, sdk_metadata):
660+
"""
661+
Class constructor.
662+
663+
:param redis_client: Redis client or compliant interface.
664+
:type redis_client: splitio.storage.adapters.redis.RedisAdapter
665+
:param sdk_metadata: SDK & Machine information.
666+
:type sdk_metadata: splitio.client.util.SdkMetadata
667+
"""
668+
self._redis = redis_client
669+
self._sdk_metadata = sdk_metadata
670+
671+
def expire_key(self, total_keys, inserted):
672+
"""
673+
Set expire
674+
675+
:param total_keys: length of keys.
676+
:type total_keys: int
677+
:param inserted: added keys.
678+
:type inserted: int
679+
"""
680+
if total_keys == inserted:
681+
self._redis.expire(self.IMPRESSIONS_QUEUE_KEY, self.IMPRESSIONS_KEY_DEFAULT_TTL)
682+
641683
def put(self, impressions):
642684
"""
643685
Add an impression to the redis storage.
@@ -660,20 +702,55 @@ def put(self, impressions):
660702
_LOGGER.error('Error: ', exc_info=True)
661703
return False
662704

663-
def pop_many(self, count):
705+
706+
class RedisImpressionsStorageAsync(RedisImpressionsStorageBase):
707+
"""Redis based event storage async class."""
708+
709+
def __init__(self, redis_client, sdk_metadata):
664710
"""
665-
Pop the oldest N events from storage.
711+
Class constructor.
666712
667-
:param count: Number of events to pop.
668-
:type count: int
713+
:param redis_client: Redis client or compliant interface.
714+
:type redis_client: splitio.storage.adapters.redis.RedisAdapter
715+
:param sdk_metadata: SDK & Machine information.
716+
:type sdk_metadata: splitio.client.util.SdkMetadata
669717
"""
670-
raise NotImplementedError('Only redis-consumer mode is supported.')
718+
self._redis = redis_client
719+
self._sdk_metadata = sdk_metadata
671720

672-
def clear(self):
721+
async def expire_key(self, total_keys, inserted):
673722
"""
674-
Clear data.
723+
Set expire
724+
725+
:param total_keys: length of keys.
726+
:type total_keys: int
727+
:param inserted: added keys.
728+
:type inserted: int
675729
"""
676-
raise NotImplementedError('Not supported for redis.')
730+
if total_keys == inserted:
731+
await self._redis.expire(self.IMPRESSIONS_QUEUE_KEY, self.IMPRESSIONS_KEY_DEFAULT_TTL)
732+
733+
async def put(self, impressions):
734+
"""
735+
Add an impression to the redis storage.
736+
737+
:param impressions: Impression to add to the queue.
738+
:type impressions: splitio.models.impressions.Impression
739+
740+
:return: Whether the impression has been added or not.
741+
:rtype: bool
742+
"""
743+
bulk_impressions = self._wrap_impressions(impressions)
744+
try:
745+
_LOGGER.debug("Adding Impressions to redis key %s" % (self.IMPRESSIONS_QUEUE_KEY))
746+
_LOGGER.debug(bulk_impressions)
747+
inserted = await self._redis.rpush(self.IMPRESSIONS_QUEUE_KEY, *bulk_impressions)
748+
await self.expire_key(inserted, len(bulk_impressions))
749+
return True
750+
except RedisAdapterException:
751+
_LOGGER.error('Something went wrong when trying to add impression to redis')
752+
_LOGGER.error('Error: ', exc_info=True)
753+
return False
677754

678755

679756
class RedisEventsStorage(EventStorage):

tests/storage/test_redis.py

Lines changed: 162 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
from splitio.client.util import get_metadata, SdkMetadata
1010
from splitio.optional.loaders import asyncio
11-
from splitio.storage.redis import RedisEventsStorage, RedisImpressionsStorage, \
11+
from splitio.storage.redis import RedisEventsStorage, RedisImpressionsStorage, RedisImpressionsStorageAsync\
1212
RedisSegmentStorage, RedisSplitStorage, RedisSplitStorageAsync, RedisTelemetryStorage
1313
from splitio.storage.adapters.redis import RedisAdapter, RedisAdapterException, build
1414
from redis.asyncio.client import Redis as aioredis
@@ -590,6 +590,167 @@ def test_add_impressions_to_pipe(self, mocker):
590590
storage.add_impressions_to_pipe(impressions, adapter)
591591
assert adapter.rpush.mock_calls == [mocker.call('SPLITIO.impressions', *to_validate)]
592592

593+
def test_expire_key(self, mocker):
594+
adapter = mocker.Mock(spec=RedisAdapter)
595+
metadata = get_metadata({})
596+
storage = RedisImpressionsStorage(adapter, metadata)
597+
598+
self.key = None
599+
self.ttl = None
600+
def expire(key, ttl):
601+
self.key = key
602+
self.ttl = ttl
603+
adapter.expire = expire
604+
605+
storage.expire_key(2, 2)
606+
assert self.key == 'SPLITIO.impressions'
607+
assert self.ttl == 3600
608+
609+
self.key = None
610+
storage.expire_key(2, 1)
611+
assert self.key == None
612+
613+
614+
class RedisImpressionsStorageAsyncTests(object): # pylint: disable=too-few-public-methods
615+
"""Redis Impressions async storage test cases."""
616+
617+
def test_wrap_impressions(self, mocker):
618+
"""Test wrap impressions."""
619+
adapter = mocker.Mock(spec=RedisAdapterAsync)
620+
metadata = get_metadata({})
621+
storage = RedisImpressionsStorageAsync(adapter, metadata)
622+
623+
impressions = [
624+
Impression('key1', 'feature1', 'on', 'some_label', 123456, 'buck1', 321654),
625+
Impression('key2', 'feature2', 'on', 'some_label', 123456, 'buck1', 321654),
626+
Impression('key3', 'feature2', 'on', 'some_label', 123456, 'buck1', 321654),
627+
Impression('key4', 'feature1', 'on', 'some_label', 123456, 'buck1', 321654)
628+
]
629+
630+
to_validate = [json.dumps({
631+
'm': { # METADATA PORTION
632+
's': metadata.sdk_version,
633+
'n': metadata.instance_name,
634+
'i': metadata.instance_ip,
635+
},
636+
'i': { # IMPRESSION PORTION
637+
'k': impression.matching_key,
638+
'b': impression.bucketing_key,
639+
'f': impression.feature_name,
640+
't': impression.treatment,
641+
'r': impression.label,
642+
'c': impression.change_number,
643+
'm': impression.time,
644+
}
645+
}) for impression in impressions]
646+
647+
assert storage._wrap_impressions(impressions) == to_validate
648+
649+
@pytest.mark.asyncio
650+
async def test_add_impressions(self, mocker):
651+
"""Test that adding impressions to storage works."""
652+
adapter = mocker.Mock(spec=RedisAdapterAsync)
653+
metadata = get_metadata({})
654+
storage = RedisImpressionsStorageAsync(adapter, metadata)
655+
656+
impressions = [
657+
Impression('key1', 'feature1', 'on', 'some_label', 123456, 'buck1', 321654),
658+
Impression('key2', 'feature2', 'on', 'some_label', 123456, 'buck1', 321654),
659+
Impression('key3', 'feature2', 'on', 'some_label', 123456, 'buck1', 321654),
660+
Impression('key4', 'feature1', 'on', 'some_label', 123456, 'buck1', 321654)
661+
]
662+
self.key = None
663+
self.imps = None
664+
async def rpush(key, *imps):
665+
self.key = key
666+
self.imps = imps
667+
668+
adapter.rpush = rpush
669+
assert await storage.put(impressions) is True
670+
671+
to_validate = [json.dumps({
672+
'm': { # METADATA PORTION
673+
's': metadata.sdk_version,
674+
'n': metadata.instance_name,
675+
'i': metadata.instance_ip,
676+
},
677+
'i': { # IMPRESSION PORTION
678+
'k': impression.matching_key,
679+
'b': impression.bucketing_key,
680+
'f': impression.feature_name,
681+
't': impression.treatment,
682+
'r': impression.label,
683+
'c': impression.change_number,
684+
'm': impression.time,
685+
}
686+
}) for impression in impressions]
687+
688+
assert self.key == 'SPLITIO.impressions'
689+
assert self.imps == tuple(to_validate)
690+
691+
# Assert that if an exception is thrown it's caught and False is returned
692+
adapter.reset_mock()
693+
694+
async def rpush2(key, *imps):
695+
raise RedisAdapterException('something')
696+
adapter.rpush = rpush2
697+
assert await storage.put(impressions) is False
698+
699+
def test_add_impressions_to_pipe(self, mocker):
700+
"""Test that adding impressions to storage works."""
701+
adapter = mocker.Mock(spec=RedisAdapterAsync)
702+
metadata = get_metadata({})
703+
storage = RedisImpressionsStorageAsync(adapter, metadata)
704+
705+
impressions = [
706+
Impression('key1', 'feature1', 'on', 'some_label', 123456, 'buck1', 321654),
707+
Impression('key2', 'feature2', 'on', 'some_label', 123456, 'buck1', 321654),
708+
Impression('key3', 'feature2', 'on', 'some_label', 123456, 'buck1', 321654),
709+
Impression('key4', 'feature1', 'on', 'some_label', 123456, 'buck1', 321654)
710+
]
711+
712+
to_validate = [json.dumps({
713+
'm': { # METADATA PORTION
714+
's': metadata.sdk_version,
715+
'n': metadata.instance_name,
716+
'i': metadata.instance_ip,
717+
},
718+
'i': { # IMPRESSION PORTION
719+
'k': impression.matching_key,
720+
'b': impression.bucketing_key,
721+
'f': impression.feature_name,
722+
't': impression.treatment,
723+
'r': impression.label,
724+
'c': impression.change_number,
725+
'm': impression.time,
726+
}
727+
}) for impression in impressions]
728+
729+
storage.add_impressions_to_pipe(impressions, adapter)
730+
assert adapter.rpush.mock_calls == [mocker.call('SPLITIO.impressions', *to_validate)]
731+
732+
@pytest.mark.asyncio
733+
async def test_expire_key(self, mocker):
734+
adapter = mocker.Mock(spec=RedisAdapterAsync)
735+
metadata = get_metadata({})
736+
storage = RedisImpressionsStorageAsync(adapter, metadata)
737+
738+
self.key = None
739+
self.ttl = None
740+
async def expire(key, ttl):
741+
self.key = key
742+
self.ttl = ttl
743+
adapter.expire = expire
744+
745+
await storage.expire_key(2, 2)
746+
assert self.key == 'SPLITIO.impressions'
747+
assert self.ttl == 3600
748+
749+
self.key = None
750+
await storage.expire_key(2, 1)
751+
assert self.key == None
752+
753+
593754

594755
class RedisEventsStorageTests(object): # pylint: disable=too-few-public-methods
595756
"""Redis Impression storage test cases."""

0 commit comments

Comments
 (0)