Skip to content

Commit 59840c9

Browse files
authored
Merge pull request #399 from splitio/async-redis-event-storage
added redis event async storage class
2 parents e59dcd6 + c0a708d commit 59840c9

File tree

2 files changed

+200
-25
lines changed

2 files changed

+200
-25
lines changed

splitio/storage/redis.py

Lines changed: 101 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -686,24 +686,12 @@ async def put(self, impressions):
686686
return False
687687

688688

689-
class RedisEventsStorage(EventStorage):
690-
"""Redis based event storage class."""
689+
class RedisEventsStorageBase(EventStorage):
690+
"""Redis based event storage base class."""
691691

692692
_EVENTS_KEY_TEMPLATE = 'SPLITIO.events'
693693
_EVENTS_KEY_DEFAULT_TTL = 3600
694694

695-
def __init__(self, redis_client, sdk_metadata):
696-
"""
697-
Class constructor.
698-
699-
:param redis_client: Redis client or compliant interface.
700-
:type redis_client: splitio.storage.adapters.redis.RedisAdapter
701-
:param sdk_metadata: SDK & Machine information.
702-
:type sdk_metadata: splitio.client.util.SdkMetadata
703-
"""
704-
self._redis = redis_client
705-
self._sdk_metadata = sdk_metadata
706-
707695
def add_events_to_pipe(self, events, pipe):
708696
"""
709697
Add put operation to pipeline
@@ -738,6 +726,59 @@ def _wrap_events(self, events):
738726
for e in events
739727
]
740728

729+
def put(self, events):
730+
"""
731+
Add an event to the redis storage.
732+
733+
:param event: Event to add to the queue.
734+
:type event: splitio.models.events.Event
735+
736+
:return: Whether the event has been added or not.
737+
:rtype: bool
738+
"""
739+
pass
740+
741+
def pop_many(self, count):
742+
"""
743+
Pop the oldest N events from storage.
744+
745+
:param count: Number of events to pop.
746+
:type count: int
747+
"""
748+
raise NotImplementedError('Only redis-consumer mode is supported.')
749+
750+
def clear(self):
751+
"""
752+
Clear data.
753+
"""
754+
raise NotImplementedError('Not supported for redis.')
755+
756+
def expire_keys(self, total_keys, inserted):
757+
"""
758+
Set expire
759+
760+
:param total_keys: length of keys.
761+
:type total_keys: int
762+
:param inserted: added keys.
763+
:type inserted: int
764+
"""
765+
pass
766+
767+
class RedisEventsStorage(RedisEventsStorageBase):
768+
"""Redis based event storage class."""
769+
770+
def __init__(self, redis_client, sdk_metadata):
771+
"""
772+
Class constructor.
773+
774+
:param redis_client: Redis client or compliant interface.
775+
:type redis_client: splitio.storage.adapters.redis.RedisAdapter
776+
:param sdk_metadata: SDK & Machine information.
777+
:type sdk_metadata: splitio.client.util.SdkMetadata
778+
"""
779+
self._redis = redis_client
780+
self._sdk_metadata = sdk_metadata
781+
741782
def put(self, events):
742783
"""
743784
Add an event to the redis storage.
@@ -760,22 +801,57 @@ def put(self, events):
760801
_LOGGER.debug('Error: ', exc_info=True)
761802
return False
762803

763-
def pop_many(self, count):
804+
def expire_keys(self, total_keys, inserted):
764805
"""
765-
Pop the oldest N events from storage.
806+
Set expire
766807
767-
:param count: Number of events to pop.
768-
:type count: int
808+
:param total_keys: length of keys.
809+
:type total_keys: int
810+
:param inserted: added keys.
811+
:type inserted: int
769812
"""
770-
raise NotImplementedError('Only redis-consumer mode is supported.')
813+
if total_keys == inserted:
814+
self._redis.expire(self._EVENTS_KEY_TEMPLATE, self._EVENTS_KEY_DEFAULT_TTL)
771815

772-
def clear(self):
816+
817+
class RedisEventsStorageAsync(RedisEventsStorageBase):
818+
"""Redis based event async storage class."""
819+
820+
def __init__(self, redis_client, sdk_metadata):
773821
"""
774-
Clear data.
822+
Class constructor.
823+
824+
:param redis_client: Redis client or compliant interface.
825+
:type redis_client: splitio.storage.adapters.redis.RedisAdapter
826+
:param sdk_metadata: SDK & Machine information.
827+
:type sdk_metadata: splitio.client.util.SdkMetadata
775828
"""
776-
raise NotImplementedError('Not supported for redis.')
829+
self._redis = redis_client
830+
self._sdk_metadata = sdk_metadata
777831

778-
def expire_keys(self, total_keys, inserted):
832+
async def put(self, events):
833+
"""
834+
Add an event to the redis storage.
835+
836+
:param event: Event to add to the queue.
837+
:type event: splitio.models.events.Event
838+
839+
:return: Whether the event has been added or not.
840+
:rtype: bool
841+
"""
842+
key = self._EVENTS_KEY_TEMPLATE
843+
to_store = self._wrap_events(events)
844+
try:
845+
_LOGGER.debug("Adding Events to redis key %s" % (key))
846+
_LOGGER.debug(to_store)
847+
await self._redis.rpush(key, *to_store)
848+
return True
849+
except RedisAdapterException:
850+
_LOGGER.error('Something went wrong when trying to add event to redis')
851+
_LOGGER.debug('Error: ', exc_info=True)
852+
return False
853+
854+
async def expire_keys(self, total_keys, inserted):
779855
"""
780856
Set expire
781857
@@ -785,7 +861,8 @@ def expire_keys(self, total_keys, inserted):
785861
:type inserted: int
786862
"""
787863
if total_keys == inserted:
788-
self._redis.expire(self._EVENTS_KEY_TEMPLATE, self._EVENTS_KEY_DEFAULT_TTL)
864+
await self._redis.expire(self._EVENTS_KEY_TEMPLATE, self._EVENTS_KEY_DEFAULT_TTL)
865+
789866

790867
class RedisTelemetryStorageBase(TelemetryStorage):
791868
"""Redis based telemetry storage class."""

tests/storage/test_redis.py

Lines changed: 99 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,9 @@
88
import pytest
99

1010
from splitio.client.util import get_metadata, SdkMetadata
11+
from splitio.storage.adapters.redis import RedisAdapter, RedisAdapterAsync, RedisAdapterException, build
1112
from splitio.optional.loaders import asyncio
12-
from splitio.storage.redis import RedisEventsStorage, RedisImpressionsStorage, RedisImpressionsStorageAsync, \
13+
from splitio.storage.redis import RedisEventsStorage, RedisEventsStorageAsync, RedisImpressionsStorage, RedisImpressionsStorageAsync, \
1314
RedisSegmentStorage, RedisSegmentStorageAsync, RedisSplitStorage, RedisSplitStorageAsync, RedisTelemetryStorage, RedisTelemetryStorageAsync
1415
from splitio.storage.adapters.redis import RedisAdapter, RedisAdapterException, build
1516
from redis.asyncio.client import Redis as aioredis
@@ -627,6 +628,103 @@ def _raise_exc(*_):
627628
adapter.rpush.side_effect = _raise_exc
628629
assert storage.put(events) is False
629630

631+
def test_expire_keys(self, mocker):
632+
adapter = mocker.Mock(spec=RedisAdapter)
633+
metadata = get_metadata({})
634+
storage = RedisEventsStorage(adapter, metadata)
635+
636+
self.key = None
637+
self.ttl = None
638+
def expire(key, ttl):
639+
self.key = key
640+
self.ttl = ttl
641+
adapter.expire = expire
642+
643+
storage.expire_keys(2, 2)
644+
assert self.key == 'SPLITIO.events'
645+
assert self.ttl == 3600
646+
647+
self.key = None
648+
storage.expire_keys(2, 1)
649+
assert self.key == None
650+
651+
class RedisEventsStorageAsyncTests(object): # pylint: disable=too-few-public-methods
652+
"""Redis Impression async storage test cases."""
653+
654+
@pytest.mark.asyncio
655+
async def test_add_events(self, mocker):
656+
"""Test that adding impressions to storage works."""
657+
adapter = mocker.Mock(spec=RedisAdapterAsync)
658+
metadata = get_metadata({})
659+
660+
storage = RedisEventsStorageAsync(adapter, metadata)
661+
662+
events = [
663+
EventWrapper(event=Event('key1', 'user', 'purchase', 10, 123456, None), size=32768),
664+
EventWrapper(event=Event('key2', 'user', 'purchase', 10, 123456, None), size=32768),
665+
EventWrapper(event=Event('key3', 'user', 'purchase', 10, 123456, None), size=32768),
666+
EventWrapper(event=Event('key4', 'user', 'purchase', 10, 123456, None), size=32768),
667+
]
668+
self.key = None
669+
self.events = None
670+
async def rpush(key, *events):
671+
self.key = key
672+
self.events = events
673+
adapter.rpush = rpush
674+
675+
assert await storage.put(events) is True
676+
677+
list_of_raw_events = [json.dumps({
678+
'e': { # EVENT PORTION
679+
'key': e.event.key,
680+
'trafficTypeName': e.event.traffic_type_name,
681+
'eventTypeId': e.event.event_type_id,
682+
'value': e.event.value,
683+
'timestamp': e.event.timestamp,
684+
'properties': e.event.properties,
685+
},
686+
'm': { # METADATA PORTION
687+
's': metadata.sdk_version,
688+
'n': metadata.instance_name,
689+
'i': metadata.instance_ip,
690+
}
691+
}) for e in events]
692+
693+
assert self.events == tuple(list_of_raw_events)
694+
assert self.key == 'SPLITIO.events'
695+
assert storage._wrap_events(events) == list_of_raw_events
696+
697+
# Assert that if an exception is thrown it's caught and False is returned
698+
adapter.reset_mock()
699+
700+
async def rpush2(key, *events):
701+
raise RedisAdapterException('something')
702+
adapter.rpush = rpush2
703+
assert await storage.put(events) is False
704+
705+
706+
@pytest.mark.asyncio
707+
async def test_expire_keys(self, mocker):
708+
adapter = mocker.Mock(spec=RedisAdapterAsync)
709+
metadata = get_metadata({})
710+
storage = RedisEventsStorageAsync(adapter, metadata)
711+
712+
self.key = None
713+
self.ttl = None
714+
async def expire(key, ttl):
715+
self.key = key
716+
self.ttl = ttl
717+
adapter.expire = expire
718+
719+
await storage.expire_keys(2, 2)
720+
assert self.key == 'SPLITIO.events'
721+
assert self.ttl == 3600
722+
723+
self.key = None
724+
await storage.expire_keys(2, 1)
725+
assert self.key == None
726+
727+
630728
class RedisTelemetryStorageTests(object):
631729
"""Redis Telemetry storage test cases."""
632730

0 commit comments

Comments
 (0)