Skip to content

Commit 4bc41c7

Browse files
authored
Merge pull request #421 from splitio/async-sync-events
Added sync event async class
2 parents fe59457 + 093b15f commit 4bc41c7

File tree

2 files changed

+127
-2
lines changed

2 files changed

+127
-2
lines changed

splitio/sync/event.py

Lines changed: 63 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,13 @@
22
import queue
33

44
from splitio.api import APIException
5-
5+
from splitio.optional.loaders import asyncio
66

77
_LOGGER = logging.getLogger(__name__)
88

99

1010
class EventSynchronizer(object):
11+
"""Event Synchronizer class"""
1112
def __init__(self, events_api, storage, bulk_size):
1213
"""
1314
Class constructor.
@@ -65,3 +66,64 @@ def synchronize_events(self):
6566
_LOGGER.error('Exception raised while reporting events')
6667
_LOGGER.debug('Exception information: ', exc_info=True)
6768
self._add_to_failed_queue(to_send)
69+
70+
71+
class EventSynchronizerAsync(object):
72+
"""Event Synchronizer async class"""
73+
def __init__(self, events_api, storage, bulk_size):
74+
"""
75+
Class constructor.
76+
77+
:param events_api: Events Api object to send data to the backend
78+
:type events_api: splitio.api.events.EventsAPI
79+
:param storage: Events Storage
80+
:type storage: splitio.storage.EventStorage
81+
:param bulk_size: How many events to send per push.
82+
:type bulk_size: int
83+
84+
"""
85+
self._api = events_api
86+
self._event_storage = storage
87+
self._bulk_size = bulk_size
88+
self._failed = asyncio.Queue()
89+
90+
async def _get_failed(self):
91+
"""Return up to <BULK_SIZE> events stored in the failed eventes queue."""
92+
events = []
93+
count = 0
94+
while count < self._bulk_size and self._failed.qsize() > 0:
95+
try:
96+
events.append(await self._failed.get())
97+
count += 1
98+
except asyncio.QueueEmpty:
99+
# If no more items in queue, break the loop
100+
break
101+
return events
102+
103+
async def _add_to_failed_queue(self, events):
104+
"""
105+
Add events that were about to be sent to a secondary queue for failed sends.
106+
107+
:param events: List of events that failed to be pushed.
108+
:type events: list
109+
"""
110+
for event in events:
111+
await self._failed.put(event)
112+
113+
async def synchronize_events(self):
114+
"""Send events from both the failed and new queues."""
115+
to_send = await self._get_failed()
116+
if len(to_send) < self._bulk_size:
117+
# If the amount of previously failed items is less than the bulk
118+
# size, try to complete with new events from storage
119+
to_send.extend(await self._event_storage.pop_many(self._bulk_size - len(to_send)))
120+
121+
if not to_send:
122+
return
123+
124+
try:
125+
await self._api.flush_events(to_send)
126+
except APIException:
127+
_LOGGER.error('Exception raised while reporting events')
128+
_LOGGER.debug('Exception information: ', exc_info=True)
129+
await self._add_to_failed_queue(to_send)

tests/sync/test_events_synchronizer.py

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from splitio.api import APIException
99
from splitio.storage import EventStorage
1010
from splitio.models.events import Event
11-
from splitio.sync.event import EventSynchronizer
11+
from splitio.sync.event import EventSynchronizer, EventSynchronizerAsync
1212

1313

1414
class EventsSynchronizerTests(object):
@@ -66,3 +66,66 @@ def run(x):
6666
event_synchronizer.synchronize_events()
6767
assert run._called == 1
6868
assert event_synchronizer._failed.qsize() == 0
69+
70+
71+
class EventsSynchronizerAsyncTests(object):
72+
"""Events synchronizer async test cases."""
73+
74+
@pytest.mark.asyncio
75+
async def test_synchronize_events_error(self, mocker):
76+
storage = mocker.Mock(spec=EventStorage)
77+
async def pop_many(*args):
78+
return [
79+
Event('key1', 'user', 'purchase', 5.3, 123456, None),
80+
Event('key2', 'user', 'purchase', 5.3, 123456, None),
81+
]
82+
storage.pop_many = pop_many
83+
84+
api = mocker.Mock()
85+
async def run(x):
86+
raise APIException("something broke")
87+
88+
api.flush_events = run
89+
event_synchronizer = EventSynchronizerAsync(api, storage, 5)
90+
await event_synchronizer.synchronize_events()
91+
assert event_synchronizer._failed.qsize() == 2
92+
93+
@pytest.mark.asyncio
94+
async def test_synchronize_events_empty(self, mocker):
95+
storage = mocker.Mock(spec=EventStorage)
96+
async def pop_many(*args):
97+
return []
98+
storage.pop_many = pop_many
99+
100+
api = mocker.Mock()
101+
async def run(x):
102+
run._called += 1
103+
104+
run._called = 0
105+
api.flush_events = run
106+
event_synchronizer = EventSynchronizerAsync(api, storage, 5)
107+
await event_synchronizer.synchronize_events()
108+
assert run._called == 0
109+
110+
@pytest.mark.asyncio
111+
async def test_synchronize_impressions(self, mocker):
112+
storage = mocker.Mock(spec=EventStorage)
113+
async def pop_many(*args):
114+
return [
115+
Event('key1', 'user', 'purchase', 5.3, 123456, None),
116+
Event('key2', 'user', 'purchase', 5.3, 123456, None),
117+
]
118+
storage.pop_many = pop_many
119+
120+
api = mocker.Mock()
121+
async def run(x):
122+
run._called += 1
123+
return HttpResponse(200, '', {})
124+
125+
api.flush_events.side_effect = run
126+
run._called = 0
127+
128+
event_synchronizer = EventSynchronizerAsync(api, storage, 5)
129+
await event_synchronizer.synchronize_events()
130+
assert run._called == 1
131+
assert event_synchronizer._failed.qsize() == 0

0 commit comments

Comments
 (0)