Skip to content

Commit 219194a

Browse files
authored
Merge pull request #382 from splitio/async-splitworker
Async splitworker
2 parents c1a9eb2 + 6e61275 commit 219194a

File tree

6 files changed

+155
-17
lines changed

6 files changed

+155
-17
lines changed

splitio/api/client.py

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,7 @@
44
import urllib
55
import abc
66

7-
try:
8-
import aiohttp
9-
except ImportError:
10-
def missing_asyncio_dependencies(*_, **__):
11-
"""Fail if missing dependencies are used."""
12-
raise NotImplementedError(
13-
'Missing aiohttp dependency. '
14-
'Please use `pip install splitio_client[asyncio]` to install the sdk with asyncio support'
15-
)
16-
aiohttp = missing_asyncio_dependencies
7+
from splitio.optional.loaders import aiohttp
178

189
SDK_URL = 'https://sdk.split.io/api'
1910
EVENTS_URL = 'https://events.split.io/api'
@@ -256,5 +247,5 @@ async def post(self, server, path, apikey, body, query=None, extra_headers=None)
256247
) as response:
257248
body = await response.text()
258249
return HttpResponse(response.status, body, response.headers)
259-
except Exception as exc: # pylint: disable=broad-except
250+
except aiohttp.ClientError as exc: # pylint: disable=broad-except
260251
raise HttpClientException('aiohttp library is throwing exceptions') from exc

splitio/optional/__init__.py

Whitespace-only changes.

splitio/optional/loaders.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
try:
2+
import asyncio
3+
import aiohttp
4+
except ImportError:
5+
def missing_asyncio_dependencies(*_, **__):
6+
"""Fail if missing dependencies are used."""
7+
raise NotImplementedError(
8+
'Missing aiohttp dependency. '
9+
'Please use `pip install splitio_client[asyncio]` to install the sdk with asyncio support'
10+
)
11+
aiohttp = missing_asyncio_dependencies
12+
asyncio = missing_asyncio_dependencies

splitio/push/splitworker.py

Lines changed: 76 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,28 @@
11
"""Feature Flag changes processing worker."""
22
import logging
33
import threading
4+
import abc
45

6+
from splitio.optional.loaders import asyncio
57

68
_LOGGER = logging.getLogger(__name__)
79

10+
class SplitWorkerBase(object, metaclass=abc.ABCMeta):
11+
"""HttpClient wrapper template."""
812

9-
class SplitWorker(object):
13+
@abc.abstractmethod
14+
def is_running(self):
15+
"""Return whether the working is running."""
16+
17+
@abc.abstractmethod
18+
def start(self):
19+
"""Start worker."""
20+
21+
@abc.abstractmethod
22+
def stop(self):
23+
"""Stop worker."""
24+
25+
class SplitWorker(SplitWorkerBase):
1026
"""Feature Flag Worker for processing updates."""
1127

1228
_centinel = object()
@@ -64,3 +80,62 @@ def stop(self):
6480
return
6581
self._running = False
6682
self._feature_flag_queue.put(self._centinel)
83+
84+
class SplitWorkerAsync(SplitWorkerBase):
85+
"""Split Worker for processing updates."""
86+
87+
_centinel = object()
88+
89+
def __init__(self, synchronize_split, split_queue):
90+
"""
91+
Class constructor.
92+
93+
:param synchronize_split: handler to perform split synchronization on incoming event
94+
:type synchronize_split: callable
95+
96+
:param split_queue: queue with split updates notifications
97+
:type split_queue: queue
98+
"""
99+
self._split_queue = split_queue
100+
self._handler = synchronize_split
101+
self._running = False
102+
103+
def is_running(self):
104+
"""Return whether the working is running."""
105+
return self._running
106+
107+
async def _run(self):
108+
"""Run worker handler."""
109+
while self.is_running():
110+
_LOGGER.error("_run")
111+
event = await self._split_queue.get()
112+
if not self.is_running():
113+
break
114+
if event == self._centinel:
115+
continue
116+
_LOGGER.debug('Processing split_update %d', event.change_number)
117+
try:
118+
_LOGGER.error(event.change_number)
119+
await self._handler(event.change_number)
120+
except Exception: # pylint: disable=broad-except
121+
_LOGGER.error('Exception raised in split synchronization')
122+
_LOGGER.debug('Exception information: ', exc_info=True)
123+
124+
def start(self):
125+
"""Start worker."""
126+
if self.is_running():
127+
_LOGGER.debug('Worker is already running')
128+
return
129+
self._running = True
130+
131+
_LOGGER.debug('Starting Split Worker')
132+
asyncio.get_event_loop().create_task(self._run())
133+
134+
async def stop(self):
135+
"""Stop worker."""
136+
_LOGGER.debug('Stopping Split Worker')
137+
if not self.is_running():
138+
_LOGGER.debug('Worker is not running')
139+
return
140+
self._running = False
141+
await self._split_queue.put(self._centinel)

tests/api/test_httpclient.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ async def test_get(self, mocker):
166166
response_mock = MockResponse('ok', 200, {})
167167
get_mock = mocker.Mock()
168168
get_mock.return_value = response_mock
169-
mocker.patch('splitio.api.client.aiohttp.ClientSession.get', new=get_mock)
169+
mocker.patch('splitio.optional.loaders.aiohttp.ClientSession.get', new=get_mock)
170170
httpclient = client.HttpClientAsync()
171171
response = await httpclient.get('sdk', 'test1', 'some_api_key', {'param1': 123}, {'h1': 'abc'})
172172
assert response.status_code == 200
@@ -197,7 +197,7 @@ async def test_get_custom_urls(self, mocker):
197197
response_mock = MockResponse('ok', 200, {})
198198
get_mock = mocker.Mock()
199199
get_mock.return_value = response_mock
200-
mocker.patch('splitio.api.client.aiohttp.ClientSession.get', new=get_mock)
200+
mocker.patch('splitio.optional.loaders.aiohttp.ClientSession.get', new=get_mock)
201201
httpclient = client.HttpClientAsync(sdk_url='https://sdk.com', events_url='https://events.com')
202202
response = await httpclient.get('sdk', 'test1', 'some_api_key', {'param1': 123}, {'h1': 'abc'})
203203
call = mocker.call(
@@ -228,7 +228,7 @@ async def test_post(self, mocker):
228228
response_mock = MockResponse('ok', 200, {})
229229
get_mock = mocker.Mock()
230230
get_mock.return_value = response_mock
231-
mocker.patch('splitio.api.client.aiohttp.ClientSession.post', new=get_mock)
231+
mocker.patch('splitio.optional.loaders.aiohttp.ClientSession.post', new=get_mock)
232232
httpclient = client.HttpClientAsync()
233233
response = await httpclient.post('sdk', 'test1', 'some_api_key', {'p1': 'a'}, {'param1': 123}, {'h1': 'abc'})
234234
call = mocker.call(
@@ -260,7 +260,7 @@ async def test_post_custom_urls(self, mocker):
260260
response_mock = MockResponse('ok', 200, {})
261261
get_mock = mocker.Mock()
262262
get_mock.return_value = response_mock
263-
mocker.patch('splitio.api.client.aiohttp.ClientSession.post', new=get_mock)
263+
mocker.patch('splitio.optional.loaders.aiohttp.ClientSession.post', new=get_mock)
264264
httpclient = client.HttpClientAsync(sdk_url='https://sdk.com', events_url='https://events.com')
265265
response = await httpclient.post('sdk', 'test1', 'some_api_key', {'p1': 'a'}, {'param1': 123}, {'h1': 'abc'})
266266
call = mocker.call(

tests/push/test_split_worker.py

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,9 @@
44
import pytest
55

66
from splitio.api import APIException
7-
from splitio.push.splitworker import SplitWorker
7+
from splitio.push.splitworker import SplitWorker, SplitWorkerAsync
88
from splitio.models.notification import SplitChangeNotification
9+
from splitio.optional.loaders import asyncio
910

1011
change_number_received = None
1112

@@ -15,6 +16,11 @@ def handler_sync(change_number):
1516
change_number_received = change_number
1617
return
1718

19+
async def handler_async(change_number):
20+
global change_number_received
21+
change_number_received = change_number
22+
return
23+
1824

1925
class SplitWorkerTests(object):
2026

@@ -55,3 +61,57 @@ def test_handler(self):
5561

5662
split_worker.stop()
5763
assert not split_worker.is_running()
64+
65+
class SplitWorkerAsyncTests(object):
66+
67+
async def test_on_error(self):
68+
q = asyncio.Queue()
69+
70+
def handler_sync(change_number):
71+
raise APIException('some')
72+
73+
split_worker = SplitWorkerAsync(handler_sync, q)
74+
split_worker.start()
75+
assert split_worker.is_running()
76+
77+
await q.put(SplitChangeNotification('some', 'SPLIT_UPDATE', 123456789))
78+
with pytest.raises(Exception):
79+
split_worker._handler()
80+
81+
assert split_worker.is_running()
82+
assert(self._worker_running())
83+
84+
await split_worker.stop()
85+
await asyncio.sleep(.1)
86+
87+
assert not split_worker.is_running()
88+
assert(not self._worker_running())
89+
90+
def _worker_running(self):
91+
worker_running = False
92+
for task in asyncio.Task.all_tasks():
93+
if task._coro.cr_code.co_name == '_run' and not task.done():
94+
worker_running = True
95+
break
96+
return worker_running
97+
98+
async def test_handler(self):
99+
q = asyncio.Queue()
100+
split_worker = SplitWorkerAsync(handler_async, q)
101+
102+
assert not split_worker.is_running()
103+
split_worker.start()
104+
assert split_worker.is_running()
105+
assert(self._worker_running())
106+
107+
global change_number_received
108+
await q.put(SplitChangeNotification('some', 'SPLIT_UPDATE', 123456789))
109+
await asyncio.sleep(1)
110+
111+
assert change_number_received == 123456789
112+
113+
await split_worker.stop()
114+
await asyncio.sleep(.1)
115+
116+
assert not split_worker.is_running()
117+
assert(not self._worker_running())

0 commit comments

Comments
 (0)