Skip to content

Commit 2abbec7

Browse files
committed
Updated SegmentWorker
1 parent 219194a commit 2abbec7

File tree

2 files changed

+127
-2
lines changed

2 files changed

+127
-2
lines changed

splitio/push/segmentworker.py

Lines changed: 76 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,29 @@
11
"""Segment changes processing worker."""
22
import logging
33
import threading
4+
import abc
5+
6+
from splitio.optional.loaders import asyncio
47

58

69
_LOGGER = logging.getLogger(__name__)
710

11+
class SegmentWorkerBase(object, metaclass=abc.ABCMeta):
12+
"""HttpClient wrapper template."""
13+
14+
@abc.abstractmethod
15+
def is_running(self):
16+
"""Return whether the working is running."""
17+
18+
@abc.abstractmethod
19+
def start(self):
20+
"""Start worker."""
821

9-
class SegmentWorker(object):
22+
@abc.abstractmethod
23+
def stop(self):
24+
"""Stop worker."""
25+
26+
class SegmentWorker(SegmentWorkerBase):
1027
"""Segment Worker for processing updates."""
1128

1229
_centinel = object()
@@ -65,3 +82,61 @@ def stop(self):
6582
return
6683
self._running = False
6784
self._segment_queue.put(self._centinel)
85+
86+
class SegmentWorkerAsync(SegmentWorkerBase):
87+
"""Segment Worker for processing updates."""
88+
89+
_centinel = object()
90+
91+
def __init__(self, synchronize_segment, segment_queue):
92+
"""
93+
Class constructor.
94+
95+
:param synchronize_segment: handler to perform segment synchronization on incoming event
96+
:type synchronize_segment: function
97+
98+
:param segment_queue: queue with segment updates notifications
99+
:type segment_queue: asyncio.Queue
100+
"""
101+
self._segment_queue = segment_queue
102+
self._handler = synchronize_segment
103+
self._running = False
104+
105+
def is_running(self):
106+
"""Return whether the working is running."""
107+
return self._running
108+
109+
async def _run(self):
110+
"""Run worker handler."""
111+
while self.is_running():
112+
event = await self._segment_queue.get()
113+
if not self.is_running():
114+
break
115+
if event == self._centinel:
116+
continue
117+
_LOGGER.debug('Processing segment_update: %s, change_number: %d',
118+
event.segment_name, event.change_number)
119+
try:
120+
await self._handler(event.segment_name, event.change_number)
121+
except Exception:
122+
_LOGGER.error('Exception raised in segment synchronization')
123+
_LOGGER.debug('Exception information: ', exc_info=True)
124+
125+
def start(self):
126+
"""Start worker."""
127+
if self.is_running():
128+
_LOGGER.debug('Worker is already running')
129+
return
130+
self._running = True
131+
132+
_LOGGER.debug('Starting Segment Worker')
133+
asyncio.get_event_loop().create_task(self._run())
134+
135+
async def stop(self):
136+
"""Stop worker."""
137+
_LOGGER.debug('Stopping Segment Worker')
138+
if not self.is_running():
139+
_LOGGER.debug('Worker is not running. Ignoring.')
140+
return
141+
self._running = False
142+
await self._segment_queue.put(self._centinel)

tests/push/test_segment_worker.py

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,9 @@
44
import pytest
55

66
from splitio.api import APIException
7-
from splitio.push.segmentworker import SegmentWorker
7+
from splitio.push.segmentworker import SegmentWorker, SegmentWorkerAsync
88
from splitio.models.notification import SegmentChangeNotification
9+
from splitio.optional.loaders import asyncio
910

1011
change_number_received = None
1112
segment_name_received = None
@@ -58,3 +59,52 @@ def test_handler(self):
5859

5960
segment_worker.stop()
6061
assert not segment_worker.is_running()
62+
63+
class SegmentWorkerAsyncTests(object):
64+
async def test_on_error(self):
65+
q = asyncio.Queue()
66+
67+
def handler_sync(change_number):
68+
raise APIException('some')
69+
70+
segment_worker = SegmentWorkerAsync(handler_sync, q)
71+
segment_worker.start()
72+
assert segment_worker.is_running()
73+
74+
await q.put(SegmentChangeNotification('some', 'SEGMENT_UPDATE', 123456789, 'some'))
75+
76+
with pytest.raises(Exception):
77+
segment_worker._handler()
78+
79+
assert segment_worker.is_running()
80+
assert(self._worker_running())
81+
await segment_worker.stop()
82+
await asyncio.sleep(.1)
83+
assert not segment_worker.is_running()
84+
assert(not self._worker_running())
85+
86+
def _worker_running(self):
87+
worker_running = False
88+
for task in asyncio.Task.all_tasks():
89+
if task._coro.cr_code.co_name == '_run' and not task.done():
90+
worker_running = True
91+
break
92+
return worker_running
93+
94+
async def test_handler(self):
95+
q = asyncio.Queue()
96+
segment_worker = SegmentWorkerAsync(handler_sync, q)
97+
global change_number_received
98+
assert not segment_worker.is_running()
99+
segment_worker.start()
100+
assert segment_worker.is_running()
101+
102+
await q.put(SegmentChangeNotification('some', 'SEGMENT_UPDATE', 123456789, 'some'))
103+
104+
await asyncio.sleep(.1)
105+
assert change_number_received == 123456789
106+
assert segment_name_received == 'some'
107+
108+
await segment_worker.stop()
109+
await asyncio.sleep(.1)
110+
assert(not self._worker_running())

0 commit comments

Comments
 (0)