Skip to content

Commit 630f890

Browse files
committed
Added redis impressions async storage
1 parent dfc55d4 commit 630f890

File tree

2 files changed

+222
-26
lines changed

2 files changed

+222
-26
lines changed

splitio/storage/redis.py

Lines changed: 101 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -385,24 +385,12 @@ def get_segments_keys_count(self):
385385
"""
386386
return 0
387387

388-
class RedisImpressionsStorage(ImpressionStorage, ImpressionPipelinedStorage):
389-
"""Redis based event storage class."""
388+
class RedisImpressionsStorageBase(ImpressionStorage, ImpressionPipelinedStorage):
389+
"""Redis based event storage base class."""
390390

391391
IMPRESSIONS_QUEUE_KEY = 'SPLITIO.impressions'
392392
IMPRESSIONS_KEY_DEFAULT_TTL = 3600
393393

394-
def __init__(self, redis_client, sdk_metadata):
395-
"""
396-
Class constructor.
397-
398-
:param redis_client: Redis client or compliant interface.
399-
:type redis_client: splitio.storage.adapters.redis.RedisAdapter
400-
:param sdk_metadata: SDK & Machine information.
401-
:type sdk_metadata: splitio.client.util.SdkMetadata
402-
"""
403-
self._redis = redis_client
404-
self._sdk_metadata = sdk_metadata
405-
406394
def _wrap_impressions(self, impressions):
407395
"""
408396
Wrap impressions to be stored in redis
@@ -444,8 +432,7 @@ def expire_key(self, total_keys, inserted):
444432
:param inserted: added keys.
445433
:type inserted: int
446434
"""
447-
if total_keys == inserted:
448-
self._redis.expire(self.IMPRESSIONS_QUEUE_KEY, self.IMPRESSIONS_KEY_DEFAULT_TTL)
435+
pass
449436

450437
def add_impressions_to_pipe(self, impressions, pipe):
451438
"""
@@ -461,6 +448,61 @@ def add_impressions_to_pipe(self, impressions, pipe):
461448
_LOGGER.debug(bulk_impressions)
462449
pipe.rpush(self.IMPRESSIONS_QUEUE_KEY, *bulk_impressions)
463450

451+
def put(self, impressions):
452+
"""
453+
Add an impression to the redis storage.
454+
455+
:param impressions: Impression to add to the queue.
456+
:type impressions: splitio.models.impressions.Impression
457+
458+
:return: Whether the impression has been added or not.
459+
:rtype: bool
460+
"""
461+
pass
462+
463+
def pop_many(self, count):
464+
"""
465+
Pop the oldest N events from storage.
466+
467+
:param count: Number of events to pop.
468+
:type count: int
469+
"""
470+
raise NotImplementedError('Only redis-consumer mode is supported.')
471+
472+
def clear(self):
473+
"""
474+
Clear data.
475+
"""
476+
raise NotImplementedError('Not supported for redis.')
477+
478+
479+
class RedisImpressionsStorage(RedisImpressionsStorageBase):
480+
"""Redis based event storage class."""
481+
482+
def __init__(self, redis_client, sdk_metadata):
483+
"""
484+
Class constructor.
485+
486+
:param redis_client: Redis client or compliant interface.
487+
:type redis_client: splitio.storage.adapters.redis.RedisAdapter
488+
:param sdk_metadata: SDK & Machine information.
489+
:type sdk_metadata: splitio.client.util.SdkMetadata
490+
"""
491+
self._redis = redis_client
492+
self._sdk_metadata = sdk_metadata
493+
494+
def expire_key(self, total_keys, inserted):
495+
"""
496+
Set expire
497+
498+
:param total_keys: length of keys.
499+
:type total_keys: int
500+
:param inserted: added keys.
501+
:type inserted: int
502+
"""
503+
if total_keys == inserted:
504+
self._redis.expire(self.IMPRESSIONS_QUEUE_KEY, self.IMPRESSIONS_KEY_DEFAULT_TTL)
505+
464506
def put(self, impressions):
465507
"""
466508
Add an impression to the redis storage.
@@ -483,20 +525,55 @@ def put(self, impressions):
483525
_LOGGER.error('Error: ', exc_info=True)
484526
return False
485527

486-
def pop_many(self, count):
528+
529+
class RedisImpressionsStorageAsync(RedisImpressionsStorageBase):
530+
"""Redis based event storage async class."""
531+
532+
def __init__(self, redis_client, sdk_metadata):
487533
"""
488-
Pop the oldest N events from storage.
534+
Class constructor.
489535
490-
:param count: Number of events to pop.
491-
:type count: int
536+
:param redis_client: Redis client or compliant interface.
537+
:type redis_client: splitio.storage.adapters.redis.RedisAdapter
538+
:param sdk_metadata: SDK & Machine information.
539+
:type sdk_metadata: splitio.client.util.SdkMetadata
492540
"""
493-
raise NotImplementedError('Only redis-consumer mode is supported.')
541+
self._redis = redis_client
542+
self._sdk_metadata = sdk_metadata
494543

495-
def clear(self):
544+
async def expire_key(self, total_keys, inserted):
496545
"""
497-
Clear data.
546+
Set expire
547+
548+
:param total_keys: length of keys.
549+
:type total_keys: int
550+
:param inserted: added keys.
551+
:type inserted: int
498552
"""
499-
raise NotImplementedError('Not supported for redis.')
553+
if total_keys == inserted:
554+
await self._redis.expire(self.IMPRESSIONS_QUEUE_KEY, self.IMPRESSIONS_KEY_DEFAULT_TTL)
555+
556+
async def put(self, impressions):
557+
"""
558+
Add an impression to the redis storage.
559+
560+
:param impressions: Impression to add to the queue.
561+
:type impressions: splitio.models.impressions.Impression
562+
563+
:return: Whether the impression has been added or not.
564+
:rtype: bool
565+
"""
566+
bulk_impressions = self._wrap_impressions(impressions)
567+
try:
568+
_LOGGER.debug("Adding Impressions to redis key %s" % (self.IMPRESSIONS_QUEUE_KEY))
569+
_LOGGER.debug(bulk_impressions)
570+
inserted = await self._redis.rpush(self.IMPRESSIONS_QUEUE_KEY, *bulk_impressions)
571+
await self.expire_key(inserted, len(bulk_impressions))
572+
return True
573+
except RedisAdapterException:
574+
_LOGGER.error('Something went wrong when trying to add impression to redis')
575+
_LOGGER.error('Error: ', exc_info=True)
576+
return False
500577

501578

502579
class RedisEventsStorage(EventStorage):

tests/storage/test_redis.py

Lines changed: 121 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,11 @@
66
import unittest.mock as mock
77
import pytest
88

9+
from splitio.optional.loaders import asyncio
910
from splitio.client.util import get_metadata, SdkMetadata
10-
from splitio.storage.redis import RedisEventsStorage, RedisImpressionsStorage, \
11+
from splitio.storage.redis import RedisEventsStorage, RedisImpressionsStorage, RedisImpressionsStorageAsync, \
1112
RedisSegmentStorage, RedisSplitStorage, RedisTelemetryStorage
12-
from splitio.storage.adapters.redis import RedisAdapter, RedisAdapterException, build
13+
from splitio.storage.adapters.redis import RedisAdapter, RedisAdapterAsync, RedisAdapterException, build
1314
from splitio.models.segments import Segment
1415
from splitio.models.impressions import Impression
1516
from splitio.models.events import Event, EventWrapper
@@ -334,6 +335,124 @@ def test_add_impressions_to_pipe(self, mocker):
334335
storage.add_impressions_to_pipe(impressions, adapter)
335336
assert adapter.rpush.mock_calls == [mocker.call('SPLITIO.impressions', *to_validate)]
336337

338+
class RedisImpressionsStorageAsyncTests(object): # pylint: disable=too-few-public-methods
339+
"""Redis Impressions async storage test cases."""
340+
341+
def test_wrap_impressions(self, mocker):
342+
"""Test wrap impressions."""
343+
adapter = mocker.Mock(spec=RedisAdapterAsync)
344+
metadata = get_metadata({})
345+
storage = RedisImpressionsStorageAsync(adapter, metadata)
346+
347+
impressions = [
348+
Impression('key1', 'feature1', 'on', 'some_label', 123456, 'buck1', 321654),
349+
Impression('key2', 'feature2', 'on', 'some_label', 123456, 'buck1', 321654),
350+
Impression('key3', 'feature2', 'on', 'some_label', 123456, 'buck1', 321654),
351+
Impression('key4', 'feature1', 'on', 'some_label', 123456, 'buck1', 321654)
352+
]
353+
354+
to_validate = [json.dumps({
355+
'm': { # METADATA PORTION
356+
's': metadata.sdk_version,
357+
'n': metadata.instance_name,
358+
'i': metadata.instance_ip,
359+
},
360+
'i': { # IMPRESSION PORTION
361+
'k': impression.matching_key,
362+
'b': impression.bucketing_key,
363+
'f': impression.feature_name,
364+
't': impression.treatment,
365+
'r': impression.label,
366+
'c': impression.change_number,
367+
'm': impression.time,
368+
}
369+
}) for impression in impressions]
370+
371+
assert storage._wrap_impressions(impressions) == to_validate
372+
373+
@pytest.mark.asyncio
374+
async def test_add_impressions(self, mocker):
375+
"""Test that adding impressions to storage works."""
376+
adapter = mocker.Mock(spec=RedisAdapterAsync)
377+
metadata = get_metadata({})
378+
storage = RedisImpressionsStorageAsync(adapter, metadata)
379+
380+
impressions = [
381+
Impression('key1', 'feature1', 'on', 'some_label', 123456, 'buck1', 321654),
382+
Impression('key2', 'feature2', 'on', 'some_label', 123456, 'buck1', 321654),
383+
Impression('key3', 'feature2', 'on', 'some_label', 123456, 'buck1', 321654),
384+
Impression('key4', 'feature1', 'on', 'some_label', 123456, 'buck1', 321654)
385+
]
386+
self.key = None
387+
self.imps = None
388+
async def rpush(key, *imps):
389+
self.key = key
390+
self.imps = imps
391+
392+
adapter.rpush = rpush
393+
assert await storage.put(impressions) is True
394+
395+
to_validate = [json.dumps({
396+
'm': { # METADATA PORTION
397+
's': metadata.sdk_version,
398+
'n': metadata.instance_name,
399+
'i': metadata.instance_ip,
400+
},
401+
'i': { # IMPRESSION PORTION
402+
'k': impression.matching_key,
403+
'b': impression.bucketing_key,
404+
'f': impression.feature_name,
405+
't': impression.treatment,
406+
'r': impression.label,
407+
'c': impression.change_number,
408+
'm': impression.time,
409+
}
410+
}) for impression in impressions]
411+
412+
assert self.key == 'SPLITIO.impressions'
413+
assert self.imps == tuple(to_validate)
414+
415+
# Assert that if an exception is thrown it's caught and False is returned
416+
adapter.reset_mock()
417+
418+
async def rpush2(key, *imps):
419+
raise RedisAdapterException('something')
420+
adapter.rpush = rpush2
421+
assert await storage.put(impressions) is False
422+
423+
def test_add_impressions_to_pipe(self, mocker):
424+
"""Test that adding impressions to storage works."""
425+
adapter = mocker.Mock(spec=RedisAdapterAsync)
426+
metadata = get_metadata({})
427+
storage = RedisImpressionsStorageAsync(adapter, metadata)
428+
429+
impressions = [
430+
Impression('key1', 'feature1', 'on', 'some_label', 123456, 'buck1', 321654),
431+
Impression('key2', 'feature2', 'on', 'some_label', 123456, 'buck1', 321654),
432+
Impression('key3', 'feature2', 'on', 'some_label', 123456, 'buck1', 321654),
433+
Impression('key4', 'feature1', 'on', 'some_label', 123456, 'buck1', 321654)
434+
]
435+
436+
to_validate = [json.dumps({
437+
'm': { # METADATA PORTION
438+
's': metadata.sdk_version,
439+
'n': metadata.instance_name,
440+
'i': metadata.instance_ip,
441+
},
442+
'i': { # IMPRESSION PORTION
443+
'k': impression.matching_key,
444+
'b': impression.bucketing_key,
445+
'f': impression.feature_name,
446+
't': impression.treatment,
447+
'r': impression.label,
448+
'c': impression.change_number,
449+
'm': impression.time,
450+
}
451+
}) for impression in impressions]
452+
453+
storage.add_impressions_to_pipe(impressions, adapter)
454+
assert adapter.rpush.mock_calls == [mocker.call('SPLITIO.impressions', *to_validate)]
455+
337456

338457
class RedisEventsStorageTests(object): # pylint: disable=too-few-public-methods
339458
"""Redis Impression storage test cases."""

0 commit comments

Comments
 (0)