@@ -21,7 +21,31 @@ def record_unique_keys(self, data):
2121 """
2222 pass
2323
24- class InMemorySenderAdapter (ImpressionsSenderAdapter ):
24+ class InMemorySenderAdapterBase (ImpressionsSenderAdapter ):
25+ """In Memory Impressions Sender Adapter base class."""
26+
27+ def record_unique_keys (self , uniques ):
28+ """
29+ post the unique keys to split back end.
30+
31+ :param uniques: unique keys disctionary
32+ :type uniques: Dictionary {'feature_flag1': set(), 'feature_flag2': set(), .. }
33+ """
34+ pass
35+
36+ def _uniques_formatter (self , uniques ):
37+ """
38+ Format the unique keys dictionary array to a JSON body
39+
40+ :param uniques: unique keys disctionary
41+ :type uniques: Dictionary {'feature1_flag': set(), 'feature2_flag': set(), .. }
42+
43+ :return: unique keys JSON array
44+ :rtype: json
45+ """
46+ return [{'f' : feature , 'ks' : list (keys )} for feature , keys in uniques .items ()]
47+
48+ class InMemorySenderAdapter (InMemorySenderAdapterBase ):
2549 """In Memory Impressions Sender Adapter class."""
2650
2751 def __init__ (self , telemtry_http_client ):
@@ -42,24 +66,35 @@ def record_unique_keys(self, uniques):
4266 """
4367 self ._telemtry_http_client .record_unique_keys ({'keys' : self ._uniques_formatter (uniques )})
4468
45- def _uniques_formatter (self , uniques ):
69+
70+ class InMemorySenderAdapterAsync (InMemorySenderAdapterBase ):
71+ """In Memory Impressions Sender Adapter class."""
72+
73+ def __init__ (self , telemtry_http_client ):
4674 """
47- Format the unique keys dictionary array to a JSON body
75+ Initialize In memory sender adapter instance
4876
49- :param uniques: unique keys disctionary
50- :type uniques: Dictionary {'feature1_flag': set(), 'feature2_flag': set(), .. }
77+ :param telemtry_http_client: instance of telemetry http api
78+ :type telemtry_http_client: splitio.api.telemetry.TelemetryAPI
79+ """
80+ self ._telemtry_http_client = telemtry_http_client
5181
52- :return: unique keys JSON array
53- :rtype: json
82+ async def record_unique_keys (self , uniques ):
5483 """
55- return [{'f' : feature , 'ks' : list (keys )} for feature , keys in uniques .items ()]
84+ post the unique keys to split back end.
85+
86+ :param uniques: unique keys disctionary
87+ :type uniques: Dictionary {'feature_flag1': set(), 'feature_flag2': set(), .. }
88+ """
89+ await self ._telemtry_http_client .record_unique_keys ({'keys' : self ._uniques_formatter (uniques )})
90+
5691
5792class RedisSenderAdapter (ImpressionsSenderAdapter ):
58- """In Memory Impressions Sender Adapter class."""
93+ """Redis Impressions Sender Adapter class."""
5994
6095 def __init__ (self , redis_client ):
6196 """
62- Initialize In memory sender adapter instance
97+ Initialize Redis sender adapter instance
6398
6499 :param telemtry_http_client: instance of telemetry http api
65100 :type telemtry_http_client: splitio.api.telemetry.TelemetryAPI
@@ -118,8 +153,74 @@ def _expire_keys(self, queue_key, key_default_ttl, total_keys, inserted):
118153 if total_keys == inserted :
119154 self ._redis_client .expire (queue_key , key_default_ttl )
120155
156+
157+ class RedisSenderAdapterAsync (ImpressionsSenderAdapter ):
158+ """In Redis Impressions Sender Adapter async class."""
159+
160+ def __init__ (self , redis_client ):
161+ """
162+ Initialize Redis sender adapter instance
163+
164+ :param telemtry_http_client: instance of telemetry http api
165+ :type telemtry_http_client: splitio.api.telemetry.TelemetryAPI
166+ """
167+ self ._redis_client = redis_client
168+
169+ async def record_unique_keys (self , uniques ):
170+ """
171+ post the unique keys to redis.
172+
173+ :param uniques: unique keys disctionary
174+ :type uniques: Dictionary {'feature_flag1': set(), 'feature_flag2': set(), .. }
175+ """
176+ bulk_mtks = _uniques_formatter (uniques )
177+ try :
178+ inserted = await self ._redis_client .rpush (_MTK_QUEUE_KEY , * bulk_mtks )
179+ await self ._expire_keys (_MTK_QUEUE_KEY , _MTK_KEY_DEFAULT_TTL , inserted , len (bulk_mtks ))
180+ return True
181+ except RedisAdapterException :
182+ _LOGGER .error ('Something went wrong when trying to add mtks to redis' )
183+ _LOGGER .error ('Error: ' , exc_info = True )
184+ return False
185+
186+ async def flush_counters (self , to_send ):
187+ """
188+ post the impression counters to redis.
189+
190+ :param to_send: unique keys disctionary
191+ :type to_send: Dictionary {'feature_flag1': set(), 'feature_flag2': set(), .. }
192+ """
193+ try :
194+ resulted = 0
195+ counted = 0
196+ pipe = self ._redis_client .pipeline ()
197+ for pf_count in to_send :
198+ pipe .hincrby (_IMP_COUNT_QUEUE_KEY , pf_count .feature + "::" + str (pf_count .timeframe ), pf_count .count )
199+ counted += pf_count .count
200+ resulted = sum (await pipe .execute ())
201+ await self ._expire_keys (_IMP_COUNT_QUEUE_KEY ,
202+ _IMP_COUNT_KEY_DEFAULT_TTL , resulted , counted )
203+ return True
204+ except RedisAdapterException :
205+ _LOGGER .error ('Something went wrong when trying to add counters to redis' )
206+ _LOGGER .error ('Error: ' , exc_info = True )
207+ return False
208+
209+ async def _expire_keys (self , queue_key , key_default_ttl , total_keys , inserted ):
210+ """
211+ Set expire
212+
213+ :param total_keys: length of keys.
214+ :type total_keys: int
215+ :param inserted: added keys.
216+ :type inserted: int
217+ """
218+ if total_keys == inserted :
219+ await self ._redis_client .expire (queue_key , key_default_ttl )
220+
221+
121222class PluggableSenderAdapter (ImpressionsSenderAdapter ):
122- """In Memory Impressions Sender Adapter class."""
223+ """Pluggable Impressions Sender Adapter class."""
123224
124225 def __init__ (self , adapter_client , prefix = None ):
125226 """
0 commit comments