Skip to content

Commit fe59457

Browse files
authored
Merge pull request #422 from splitio/async-sync-imps
added impressions and impressions count sync classes
2 parents 1b35d9d + de13161 commit fe59457

File tree

3 files changed

+198
-2
lines changed

3 files changed

+198
-2
lines changed

splitio/sync/impression.py

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,13 @@
22
import queue
33

44
from splitio.api import APIException
5+
from splitio.optional.loaders import asyncio
56

67
_LOGGER = logging.getLogger(__name__)
78

89

910
class ImpressionSynchronizer(object):
11+
"""Impressions synchronizer class."""
1012
def __init__(self, impressions_api, storage, bulk_size):
1113
"""
1214
Class constructor.
@@ -95,3 +97,95 @@ def synchronize_counters(self):
9597
except APIException:
9698
_LOGGER.error('Exception raised while reporting impression counts')
9799
_LOGGER.debug('Exception information: ', exc_info=True)
100+
101+
102+
class ImpressionSynchronizerAsync(object):
103+
"""Impressions async synchronizer class."""
104+
def __init__(self, impressions_api, storage, bulk_size):
105+
"""
106+
Class constructor.
107+
108+
:param impressions_api: Impressions Api object to send data to the backend
109+
:type impressions_api: splitio.api.impressions.ImpressionsAPI
110+
:param storage: Impressions Storage
111+
:type storage: splitio.storage.ImpressionsStorage
112+
:param bulk_size: How many impressions to send per push.
113+
:type bulk_size: int
114+
115+
"""
116+
self._api = impressions_api
117+
self._impression_storage = storage
118+
self._bulk_size = bulk_size
119+
self._failed = asyncio.Queue()
120+
121+
async def _get_failed(self):
122+
"""Return up to <BULK_SIZE> impressions stored in the failed impressions queue."""
123+
imps = []
124+
count = 0
125+
while count < self._bulk_size and self._failed.qsize() > 0:
126+
try:
127+
imps.append(await self._failed.get())
128+
count += 1
129+
except asyncio.QueueEmpty:
130+
# If no more items in queue, break the loop
131+
break
132+
return imps
133+
134+
async def _add_to_failed_queue(self, imps):
135+
"""
136+
Add impressions that were about to be sent to a secondary queue for failed sends.
137+
138+
:param imps: List of impressions that failed to be pushed.
139+
:type imps: list
140+
"""
141+
for impression in imps:
142+
await self._failed.put(impression)
143+
144+
async def synchronize_impressions(self):
145+
"""Send impressions from both the failed and new queues."""
146+
to_send = await self._get_failed()
147+
if len(to_send) < self._bulk_size:
148+
# If the amount of previously failed items is less than the bulk
149+
# size, try to complete with new impressions from storage
150+
to_send.extend(await self._impression_storage.pop_many(self._bulk_size - len(to_send)))
151+
152+
if not to_send:
153+
return
154+
155+
try:
156+
await self._api.flush_impressions(to_send)
157+
except APIException:
158+
_LOGGER.error('Exception raised while reporting impressions')
159+
_LOGGER.debug('Exception information: ', exc_info=True)
160+
await self._add_to_failed_queue(to_send)
161+
162+
163+
class ImpressionsCountSynchronizerAsync(object):
164+
def __init__(self, impressions_api, imp_counter):
165+
"""
166+
Class constructor.
167+
168+
:param impressions_api: Impressions Api object to send data to the backend
169+
:type impressions_api: splitio.api.impressions.ImpressionsAPI
170+
:param impressions_manager: Impressions manager instance
171+
:type impressions_manager: splitio.engine.impressions.Manager
172+
173+
"""
174+
self._impressions_api = impressions_api
175+
self._impressions_counter = imp_counter
176+
177+
async def synchronize_counters(self):
178+
"""Send impressions from both the failed and new queues."""
179+
180+
if self._impressions_counter == None:
181+
return
182+
183+
to_send = await self._impressions_counter.pop_all()
184+
if not to_send:
185+
return
186+
187+
try:
188+
await self._impressions_api.flush_counters(to_send)
189+
except APIException:
190+
_LOGGER.error('Exception raised while reporting impression counts')
191+
_LOGGER.debug('Exception information: ', exc_info=True)

tests/sync/test_impressions_count_synchronizer.py

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from splitio.engine.impressions.impressions import Manager as ImpressionsManager
1010
from splitio.engine.impressions.manager import Counter
1111
from splitio.engine.impressions.strategies import StrategyOptimizedMode
12-
from splitio.sync.impression import ImpressionsCountSynchronizer
12+
from splitio.sync.impression import ImpressionsCountSynchronizer, ImpressionsCountSynchronizerAsync
1313
from splitio.api.impressions import ImpressionsAPI
1414

1515

@@ -36,3 +36,40 @@ def test_synchronize_impressions_counts(self, mocker):
3636
assert api.flush_counters.mock_calls[0] == mocker.call(counters)
3737

3838
assert len(api.flush_counters.mock_calls) == 1
39+
40+
41+
class ImpressionsCountSynchronizerAsyncTests(object):
42+
"""ImpressionsCount synchronizer test cases."""
43+
44+
@pytest.mark.asyncio
45+
async def test_synchronize_impressions_counts(self, mocker):
46+
counter = mocker.Mock(spec=Counter)
47+
48+
self.called = 0
49+
async def pop_all():
50+
self.called += 1
51+
return [
52+
Counter.CountPerFeature('f1', 123, 2),
53+
Counter.CountPerFeature('f2', 123, 123),
54+
Counter.CountPerFeature('f1', 456, 111),
55+
Counter.CountPerFeature('f2', 456, 222)
56+
]
57+
counter.pop_all = pop_all
58+
59+
self.counters = None
60+
async def flush_counters(counters):
61+
self.counters = counters
62+
return HttpResponse(200, '', {})
63+
api = mocker.Mock(spec=ImpressionsAPI)
64+
api.flush_counters = flush_counters
65+
66+
impression_count_synchronizer = ImpressionsCountSynchronizerAsync(api, counter)
67+
await impression_count_synchronizer.synchronize_counters()
68+
69+
assert self.counters == [
70+
Counter.CountPerFeature('f1', 123, 2),
71+
Counter.CountPerFeature('f2', 123, 123),
72+
Counter.CountPerFeature('f1', 456, 111),
73+
Counter.CountPerFeature('f2', 456, 222)
74+
]
75+
assert self.called == 1

tests/sync/test_impressions_synchronizer.py

Lines changed: 66 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from splitio.api import APIException
99
from splitio.storage import ImpressionStorage
1010
from splitio.models.impressions import Impression
11-
from splitio.sync.impression import ImpressionSynchronizer
11+
from splitio.sync.impression import ImpressionSynchronizer, ImpressionSynchronizerAsync
1212

1313

1414
class ImpressionsSynchronizerTests(object):
@@ -66,3 +66,68 @@ def run(x):
6666
impression_synchronizer.synchronize_impressions()
6767
assert run._called == 1
6868
assert impression_synchronizer._failed.qsize() == 0
69+
70+
71+
class ImpressionsSynchronizerAsyncTests(object):
72+
"""Impressions synchronizer test cases."""
73+
74+
@pytest.mark.asyncio
75+
async def test_synchronize_impressions_error(self, mocker):
76+
storage = mocker.Mock(spec=ImpressionStorage)
77+
async def pop_many(*args):
78+
return [
79+
Impression('key1', 'split1', 'on', 'l1', 123456, 'b1', 321654),
80+
Impression('key2', 'split1', 'on', 'l1', 123456, 'b1', 321654),
81+
]
82+
storage.pop_many = pop_many
83+
api = mocker.Mock()
84+
85+
async def run(x):
86+
raise APIException("something broke")
87+
api.flush_impressions = run
88+
89+
impression_synchronizer = ImpressionSynchronizerAsync(api, storage, 5)
90+
await impression_synchronizer.synchronize_impressions()
91+
assert impression_synchronizer._failed.qsize() == 2
92+
93+
@pytest.mark.asyncio
94+
async def test_synchronize_impressions_empty(self, mocker):
95+
storage = mocker.Mock(spec=ImpressionStorage)
96+
async def pop_many(*args):
97+
return []
98+
storage.pop_many = pop_many
99+
100+
api = mocker.Mock()
101+
102+
async def run(x):
103+
run._called += 1
104+
105+
run._called = 0
106+
api.flush_impressions = run
107+
impression_synchronizer = ImpressionSynchronizerAsync(api, storage, 5)
108+
await impression_synchronizer.synchronize_impressions()
109+
assert run._called == 0
110+
111+
@pytest.mark.asyncio
112+
async def test_synchronize_impressions(self, mocker):
113+
storage = mocker.Mock(spec=ImpressionStorage)
114+
async def pop_many(*args):
115+
return [
116+
Impression('key1', 'split1', 'on', 'l1', 123456, 'b1', 321654),
117+
Impression('key2', 'split1', 'on', 'l1', 123456, 'b1', 321654),
118+
]
119+
storage.pop_many = pop_many
120+
121+
api = mocker.Mock()
122+
123+
async def run(x):
124+
run._called += 1
125+
return HttpResponse(200, '', {})
126+
127+
api.flush_impressions = run
128+
run._called = 0
129+
130+
impression_synchronizer = ImpressionSynchronizerAsync(api, storage, 5)
131+
await impression_synchronizer.synchronize_impressions()
132+
assert run._called == 1
133+
assert impression_synchronizer._failed.qsize() == 0

0 commit comments

Comments
 (0)