Skip to content

Commit 71d1717

Browse files
committed
Added engine.impressions.adapters async classes
1 parent 23ba8e0 commit 71d1717

File tree

2 files changed

+203
-11
lines changed

2 files changed

+203
-11
lines changed

splitio/engine/impressions/adapters.py

Lines changed: 109 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,31 @@ def record_unique_keys(self, data):
2121
"""
2222
pass
2323

24-
class InMemorySenderAdapter(ImpressionsSenderAdapter):
24+
class InMemorySenderAdapterBase(ImpressionsSenderAdapter):
25+
"""In Memory Impressions Sender Adapter base class."""
26+
27+
def record_unique_keys(self, uniques):
28+
"""
29+
post the unique keys to split back end.
30+
31+
:param uniques: unique keys disctionary
32+
:type uniques: Dictionary {'feature_flag1': set(), 'feature_flag2': set(), .. }
33+
"""
34+
pass
35+
36+
def _uniques_formatter(self, uniques):
37+
"""
38+
Format the unique keys dictionary array to a JSON body
39+
40+
:param uniques: unique keys disctionary
41+
:type uniques: Dictionary {'feature1_flag': set(), 'feature2_flag': set(), .. }
42+
43+
:return: unique keys JSON array
44+
:rtype: json
45+
"""
46+
return [{'f': feature, 'ks': list(keys)} for feature, keys in uniques.items()]
47+
48+
class InMemorySenderAdapter(InMemorySenderAdapterBase):
2549
"""In Memory Impressions Sender Adapter class."""
2650

2751
def __init__(self, telemtry_http_client):
@@ -42,17 +66,28 @@ def record_unique_keys(self, uniques):
4266
"""
4367
self._telemtry_http_client.record_unique_keys({'keys': self._uniques_formatter(uniques)})
4468

45-
def _uniques_formatter(self, uniques):
69+
70+
class InMemorySenderAdapterAsync(InMemorySenderAdapterBase):
71+
"""In Memory Impressions Sender Adapter class."""
72+
73+
def __init__(self, telemtry_http_client):
4674
"""
47-
Format the unique keys dictionary array to a JSON body
75+
Initialize In memory sender adapter instance
4876
49-
:param uniques: unique keys disctionary
50-
:type uniques: Dictionary {'feature1_flag': set(), 'feature2_flag': set(), .. }
77+
:param telemtry_http_client: instance of telemetry http api
78+
:type telemtry_http_client: splitio.api.telemetry.TelemetryAPI
79+
"""
80+
self._telemtry_http_client = telemtry_http_client
5181

52-
:return: unique keys JSON array
53-
:rtype: json
82+
async def record_unique_keys(self, uniques):
5483
"""
55-
return [{'f': feature, 'ks': list(keys)} for feature, keys in uniques.items()]
84+
post the unique keys to split back end.
85+
86+
:param uniques: unique keys disctionary
87+
:type uniques: Dictionary {'feature_flag1': set(), 'feature_flag2': set(), .. }
88+
"""
89+
await self._telemtry_http_client.record_unique_keys({'keys': self._uniques_formatter(uniques)})
90+
5691

5792
class RedisSenderAdapter(ImpressionsSenderAdapter):
5893
"""In Memory Impressions Sender Adapter class."""
@@ -118,6 +153,72 @@ def _expire_keys(self, queue_key, key_default_ttl, total_keys, inserted):
118153
if total_keys == inserted:
119154
self._redis_client.expire(queue_key, key_default_ttl)
120155

156+
157+
class RedisSenderAdapterAsync(ImpressionsSenderAdapter):
158+
"""In Memory Impressions Sender Adapter async class."""
159+
160+
def __init__(self, redis_client):
161+
"""
162+
Initialize In memory sender adapter instance
163+
164+
:param telemtry_http_client: instance of telemetry http api
165+
:type telemtry_http_client: splitio.api.telemetry.TelemetryAPI
166+
"""
167+
self._redis_client = redis_client
168+
169+
async def record_unique_keys(self, uniques):
170+
"""
171+
post the unique keys to redis.
172+
173+
:param uniques: unique keys disctionary
174+
:type uniques: Dictionary {'feature_flag1': set(), 'feature_flag2': set(), .. }
175+
"""
176+
bulk_mtks = _uniques_formatter(uniques)
177+
try:
178+
inserted = await self._redis_client.rpush(_MTK_QUEUE_KEY, *bulk_mtks)
179+
await self._expire_keys(_MTK_QUEUE_KEY, _MTK_KEY_DEFAULT_TTL, inserted, len(bulk_mtks))
180+
return True
181+
except RedisAdapterException:
182+
_LOGGER.error('Something went wrong when trying to add mtks to redis')
183+
_LOGGER.error('Error: ', exc_info=True)
184+
return False
185+
186+
async def flush_counters(self, to_send):
187+
"""
188+
post the impression counters to redis.
189+
190+
:param to_send: unique keys disctionary
191+
:type to_send: Dictionary {'feature_flag1': set(), 'feature_flag2': set(), .. }
192+
"""
193+
try:
194+
resulted = 0
195+
counted = 0
196+
pipe = self._redis_client.pipeline()
197+
for pf_count in to_send:
198+
pipe.hincrby(_IMP_COUNT_QUEUE_KEY, pf_count.feature + "::" + str(pf_count.timeframe), pf_count.count)
199+
counted += pf_count.count
200+
resulted = sum(await pipe.execute())
201+
await self._expire_keys(_IMP_COUNT_QUEUE_KEY,
202+
_IMP_COUNT_KEY_DEFAULT_TTL, resulted, counted)
203+
return True
204+
except RedisAdapterException:
205+
_LOGGER.error('Something went wrong when trying to add counters to redis')
206+
_LOGGER.error('Error: ', exc_info=True)
207+
return False
208+
209+
async def _expire_keys(self, queue_key, key_default_ttl, total_keys, inserted):
210+
"""
211+
Set expire
212+
213+
:param total_keys: length of keys.
214+
:type total_keys: int
215+
:param inserted: added keys.
216+
:type inserted: int
217+
"""
218+
if total_keys == inserted:
219+
await self._redis_client.expire(queue_key, key_default_ttl)
220+
221+
121222
class PluggableSenderAdapter(ImpressionsSenderAdapter):
122223
"""In Memory Impressions Sender Adapter class."""
123224

tests/engine/test_send_adapters.py

Lines changed: 94 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,12 @@
22
import ast
33
import json
44
import pytest
5+
import redis.asyncio as aioredis
56

6-
from splitio.engine.impressions.adapters import InMemorySenderAdapter, RedisSenderAdapter, PluggableSenderAdapter
7+
from splitio.engine.impressions.adapters import InMemorySenderAdapter, RedisSenderAdapter, PluggableSenderAdapter, InMemorySenderAdapterAsync, RedisSenderAdapterAsync
78
from splitio.engine.impressions import adapters
8-
from splitio.api.telemetry import TelemetryAPI
9-
from splitio.storage.adapters.redis import RedisAdapter
9+
from splitio.api.telemetry import TelemetryAPI, TelemetryAPIAsync
10+
from splitio.storage.adapters.redis import RedisAdapter, RedisAdapterAsync
1011
from splitio.engine.impressions.manager import Counter
1112
from tests.storage.test_pluggable import StorageMockAdapter
1213

@@ -43,6 +44,28 @@ def test_record_unique_keys(self, mocker):
4344

4445
assert(mocker.called)
4546

47+
48+
class InMemorySenderAdapterAsyncTests(object):
49+
"""In memory sender adapter test."""
50+
51+
@pytest.mark.asyncio
52+
async def test_record_unique_keys(self, mocker):
53+
"""Test sending unique keys."""
54+
55+
uniques = {"feature1": set({'key1', 'key2', 'key3'}),
56+
"feature2": set({'key1', 'key2', 'key3'}),
57+
}
58+
telemetry_api = TelemetryAPIAsync(mocker.Mock(), 'some_api_key', mocker.Mock(), mocker.Mock())
59+
self.called = False
60+
async def record_unique_keys(*args):
61+
self.called = True
62+
63+
telemetry_api.record_unique_keys = record_unique_keys
64+
sender_adapter = InMemorySenderAdapterAsync(telemetry_api)
65+
await sender_adapter.record_unique_keys(uniques)
66+
assert(self.called)
67+
68+
4669
class RedisSenderAdapterTests(object):
4770
"""Redis sender adapter test."""
4871

@@ -103,6 +126,74 @@ def test_expire_keys(self, mocker):
103126
sender_adapter._expire_keys(mocker.Mock(), mocker.Mock(), total_keys, inserted)
104127
assert(mocker.called)
105128

129+
130+
class RedisSenderAdapterAsyncTests(object):
131+
"""Redis sender adapter test."""
132+
133+
@pytest.mark.asyncio
134+
async def test_record_unique_keys(self, mocker):
135+
"""Test sending unique keys."""
136+
137+
uniques = {"feature1": set({'key1', 'key2', 'key3'}),
138+
"feature2": set({'key1', 'key2', 'key3'}),
139+
}
140+
redis_client = RedisAdapterAsync(mocker.Mock(), mocker.Mock())
141+
sender_adapter = RedisSenderAdapterAsync(redis_client)
142+
143+
self.called = False
144+
async def rpush(*args):
145+
self.called = True
146+
147+
redis_client.rpush = rpush
148+
await sender_adapter.record_unique_keys(uniques)
149+
assert(self.called)
150+
151+
@pytest.mark.asyncio
152+
async def test_flush_counters(self, mocker):
153+
"""Test sending counters."""
154+
155+
counters = [
156+
Counter.CountPerFeature('f1', 123, 2),
157+
Counter.CountPerFeature('f2', 123, 123),
158+
]
159+
redis_client = await aioredis.from_url("redis://localhost")
160+
sender_adapter = RedisSenderAdapterAsync(redis_client)
161+
self.called = False
162+
def hincrby(*args):
163+
self.called = True
164+
self.called2 = False
165+
async def execute(*args):
166+
self.called2 = True
167+
return [1]
168+
169+
with mock.patch('redis.asyncio.client.Pipeline.hincrby', hincrby):
170+
with mock.patch('redis.asyncio.client.Pipeline.execute', execute):
171+
await sender_adapter.flush_counters(counters)
172+
assert(self.called)
173+
assert(self.called2)
174+
175+
@pytest.mark.asyncio
176+
async def test_expire_keys(self, mocker):
177+
"""Test set expire key."""
178+
179+
total_keys = 100
180+
inserted = 10
181+
redis_client = RedisAdapterAsync(mocker.Mock(), mocker.Mock())
182+
sender_adapter = RedisSenderAdapterAsync(redis_client)
183+
self.called = False
184+
async def expire(*args):
185+
self.called = True
186+
redis_client.expire = expire
187+
188+
await sender_adapter._expire_keys(mocker.Mock(), mocker.Mock(), total_keys, inserted)
189+
assert(not self.called)
190+
191+
total_keys = 100
192+
inserted = 100
193+
await sender_adapter._expire_keys(mocker.Mock(), mocker.Mock(), total_keys, inserted)
194+
assert(self.called)
195+
196+
106197
class PluggableSenderAdapterTests(object):
107198
"""Pluggable sender adapter test."""
108199

0 commit comments

Comments
 (0)