Skip to content

Commit 6f18570

Browse files
authored
Merge pull request #388 from splitio/async-processor
Added processor async class
2 parents 6655769 + b9107f3 commit 6f18570

File tree

3 files changed

+165
-7
lines changed

3 files changed

+165
-7
lines changed

splitio/push/processor.py

Lines changed: 102 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,28 @@
11
"""Message processor & Notification manager keeper implementations."""
22

33
from queue import Queue
4+
import abc
45

56
from splitio.push.parser import UpdateType
6-
from splitio.push.workers import SplitWorker
7-
from splitio.push.workers import SegmentWorker
7+
from splitio.push.workers import SplitWorker, SplitWorkerAsync, SegmentWorker, SegmentWorkerAsync
8+
from splitio.optional.loaders import asyncio
89

10+
class MessageProcessorBase(object, metaclass=abc.ABCMeta):
11+
"""Message processor template."""
912

10-
class MessageProcessor(object):
13+
@abc.abstractmethod
14+
def update_workers_status(self, enabled):
15+
"""Enable/Disable push update workers."""
16+
17+
@abc.abstractmethod
18+
def handle(self, event):
19+
"""Handle incoming update event."""
20+
21+
@abc.abstractmethod
22+
def shutdown(self):
23+
"""Stop splits & segments workers."""
24+
25+
class MessageProcessor(MessageProcessorBase):
1126
"""Message processor class."""
1227

1328
def __init__(self, synchronizer):
@@ -89,3 +104,87 @@ def shutdown(self):
89104
"""Stop splits & segments workers."""
90105
self._split_worker.stop()
91106
self._segments_worker.stop()
107+
108+
109+
class MessageProcessorAsync(MessageProcessorBase):
110+
"""Message processor class."""
111+
112+
def __init__(self, synchronizer):
113+
"""
114+
Class constructor.
115+
116+
:param synchronizer: synchronizer component
117+
:type synchronizer: splitio.sync.synchronizer.Synchronizer
118+
"""
119+
self._split_queue = asyncio.Queue()
120+
self._segments_queue = asyncio.Queue()
121+
self._synchronizer = synchronizer
122+
self._split_worker = SplitWorkerAsync(synchronizer.synchronize_splits, self._split_queue)
123+
self._segments_worker = SegmentWorkerAsync(synchronizer.synchronize_segment, self._segments_queue)
124+
self._handlers = {
125+
UpdateType.SPLIT_UPDATE: self._handle_split_update,
126+
UpdateType.SPLIT_KILL: self._handle_split_kill,
127+
UpdateType.SEGMENT_UPDATE: self._handle_segment_change
128+
}
129+
130+
async def _handle_split_update(self, event):
131+
"""
132+
Handle incoming split update notification.
133+
134+
:param event: Incoming split change event
135+
:type event: splitio.push.parser.SplitChangeUpdate
136+
"""
137+
await self._split_queue.put(event)
138+
139+
async def _handle_split_kill(self, event):
140+
"""
141+
Handle incoming split kill notification.
142+
143+
:param event: Incoming split kill event
144+
:type event: splitio.push.parser.SplitKillUpdate
145+
"""
146+
await self._synchronizer.kill_split(event.split_name, event.default_treatment,
147+
event.change_number)
148+
await self._split_queue.put(event)
149+
150+
async def _handle_segment_change(self, event):
151+
"""
152+
Handle incoming segment update notification.
153+
154+
:param event: Incoming segment change event
155+
:type event: splitio.push.parser.Update
156+
"""
157+
await self._segments_queue.put(event)
158+
159+
async def update_workers_status(self, enabled):
160+
"""
161+
Enable/Disable push update workers.
162+
163+
:param enabled: if True, enable workers. If False, disable them.
164+
:type enabled: bool
165+
"""
166+
if enabled:
167+
self._split_worker.start()
168+
self._segments_worker.start()
169+
else:
170+
await self._split_worker.stop()
171+
await self._segments_worker.stop()
172+
173+
async def handle(self, event):
174+
"""
175+
Handle incoming update event.
176+
177+
:param event: incoming data update event.
178+
:type event: splitio.push.BaseUpdate
179+
"""
180+
try:
181+
handle = self._handlers[event.update_type]
182+
except KeyError as exc:
183+
raise Exception('no handler for notification type: %s' % event.update_type) from exc
184+
185+
await handle(event)
186+
187+
async def shutdown(self):
188+
"""Stop splits & segments workers."""
189+
await self._split_worker.stop()
190+
await self._segments_worker.stop()

splitio/push/workers.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ def start(self):
130130
self._running = True
131131

132132
_LOGGER.debug('Starting Segment Worker')
133-
asyncio.get_event_loop().create_task(self._run())
133+
asyncio.get_running_loop().create_task(self._run())
134134

135135
async def stop(self):
136136
"""Stop worker."""
@@ -248,7 +248,7 @@ def start(self):
248248
self._running = True
249249

250250
_LOGGER.debug('Starting Split Worker')
251-
asyncio.get_event_loop().create_task(self._run())
251+
asyncio.get_running_loop().create_task(self._run())
252252

253253
async def stop(self):
254254
"""Stop worker."""

tests/push/test_processor.py

Lines changed: 61 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
"""Message processor tests."""
22
from queue import Queue
3-
from splitio.push.processor import MessageProcessor
4-
from splitio.sync.synchronizer import Synchronizer
3+
import pytest
4+
5+
from splitio.push.processor import MessageProcessor, MessageProcessorAsync
6+
from splitio.sync.synchronizer import Synchronizer, SynchronizerAsync
57
from splitio.push.parser import SplitChangeUpdate, SegmentChangeUpdate, SplitKillUpdate
8+
from splitio.optional.loaders import asyncio
69

710

811
class ProcessorTests(object):
@@ -56,3 +59,59 @@ def test_segment_change(self, mocker):
5659
def test_todo(self):
5760
"""Fix previous tests so that we validate WHICH queue the update is pushed into."""
5861
assert NotImplementedError("DO THAT")
62+
63+
class ProcessorAsyncTests(object):
64+
"""Message processor test cases."""
65+
66+
@pytest.mark.asyncio
67+
async def test_split_change(self, mocker):
68+
"""Test split change is properly handled."""
69+
sync_mock = mocker.Mock(spec=Synchronizer)
70+
self._update = None
71+
async def put_mock(first, event):
72+
self._update = event
73+
74+
mocker.patch('splitio.push.processor.asyncio.Queue.put', new=put_mock)
75+
processor = MessageProcessorAsync(sync_mock)
76+
update = SplitChangeUpdate('sarasa', 123, 123)
77+
await processor.handle(update)
78+
assert update == self._update
79+
80+
@pytest.mark.asyncio
81+
async def test_split_kill(self, mocker):
82+
"""Test split kill is properly handled."""
83+
84+
self._killed_split = None
85+
async def kill_mock(se, split_name, default_treatment, change_number):
86+
self._killed_split = (split_name, default_treatment, change_number)
87+
88+
mocker.patch('splitio.sync.synchronizer.SynchronizerAsync.kill_split', new=kill_mock)
89+
sync_mock = SynchronizerAsync()
90+
91+
self._update = None
92+
async def put_mock(first, event):
93+
self._update = event
94+
95+
mocker.patch('splitio.push.processor.asyncio.Queue.put', new=put_mock)
96+
processor = MessageProcessorAsync(sync_mock)
97+
update = SplitKillUpdate('sarasa', 123, 456, 'some_split', 'off')
98+
await processor.handle(update)
99+
assert update == self._update
100+
assert ('some_split', 'off', 456) == self._killed_split
101+
102+
@pytest.mark.asyncio
103+
async def test_segment_change(self, mocker):
104+
"""Test segment change is properly handled."""
105+
106+
sync_mock = SynchronizerAsync()
107+
queue_mock = mocker.Mock(spec=asyncio.Queue)
108+
109+
self._update = None
110+
async def put_mock(first, event):
111+
self._update = event
112+
113+
mocker.patch('splitio.push.processor.asyncio.Queue.put', new=put_mock)
114+
processor = MessageProcessorAsync(sync_mock)
115+
update = SegmentChangeUpdate('sarasa', 123, 123, 'some_segment')
116+
await processor.handle(update)
117+
assert update == self._update

0 commit comments

Comments
 (0)