Skip to content

Commit c584967

Browse files
authored
Merge pull request #384 from splitio/async-segment-worker
Updated SegmentWorker
2 parents 219194a + 2b02438 commit c584967

File tree

5 files changed

+178
-76
lines changed

5 files changed

+178
-76
lines changed

splitio/push/processor.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@
33
from queue import Queue
44

55
from splitio.push.parser import UpdateType
6-
from splitio.push.splitworker import SplitWorker
7-
from splitio.push.segmentworker import SegmentWorker
6+
from splitio.push.workers import SplitWorker
7+
from splitio.push.workers import SegmentWorker
88

99

1010
class MessageProcessor(object):

splitio/push/segmentworker.py

Lines changed: 0 additions & 67 deletions
This file was deleted.
Lines changed: 124 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
1-
"""Feature Flag changes processing worker."""
1+
"""Segment changes processing worker."""
22
import logging
33
import threading
44
import abc
55

66
from splitio.optional.loaders import asyncio
77

8+
89
_LOGGER = logging.getLogger(__name__)
910

10-
class SplitWorkerBase(object, metaclass=abc.ABCMeta):
11-
"""HttpClient wrapper template."""
11+
class WorkerBase(object, metaclass=abc.ABCMeta):
12+
"""Worker template."""
1213

1314
@abc.abstractmethod
1415
def is_running(self):
@@ -22,7 +23,125 @@ def start(self):
2223
def stop(self):
2324
"""Stop worker."""
2425

25-
class SplitWorker(SplitWorkerBase):
26+
class SegmentWorker(WorkerBase):
27+
"""Segment Worker for processing updates."""
28+
29+
_centinel = object()
30+
31+
def __init__(self, synchronize_segment, segment_queue):
32+
"""
33+
Class constructor.
34+
35+
:param synchronize_segment: handler to perform segment synchronization on incoming event
36+
:type synchronize_segment: function
37+
38+
:param segment_queue: queue with segment updates notifications
39+
:type segment_queue: queue
40+
"""
41+
self._segment_queue = segment_queue
42+
self._handler = synchronize_segment
43+
self._running = False
44+
self._worker = None
45+
46+
def is_running(self):
47+
"""Return whether the working is running."""
48+
return self._running
49+
50+
def _run(self):
51+
"""Run worker handler."""
52+
while self.is_running():
53+
event = self._segment_queue.get()
54+
if not self.is_running():
55+
break
56+
if event == self._centinel:
57+
continue
58+
_LOGGER.debug('Processing segment_update: %s, change_number: %d',
59+
event.segment_name, event.change_number)
60+
try:
61+
self._handler(event.segment_name, event.change_number)
62+
except Exception:
63+
_LOGGER.error('Exception raised in segment synchronization')
64+
_LOGGER.debug('Exception information: ', exc_info=True)
65+
66+
def start(self):
67+
"""Start worker."""
68+
if self.is_running():
69+
_LOGGER.debug('Worker is already running')
70+
return
71+
self._running = True
72+
73+
_LOGGER.debug('Starting Segment Worker')
74+
self._worker = threading.Thread(target=self._run, name='PushSegmentWorker', daemon=True)
75+
self._worker.start()
76+
77+
def stop(self):
78+
"""Stop worker."""
79+
_LOGGER.debug('Stopping Segment Worker')
80+
if not self.is_running():
81+
_LOGGER.debug('Worker is not running. Ignoring.')
82+
return
83+
self._running = False
84+
self._segment_queue.put(self._centinel)
85+
86+
class SegmentWorkerAsync(WorkerBase):
87+
"""Segment Worker for processing updates."""
88+
89+
_centinel = object()
90+
91+
def __init__(self, synchronize_segment, segment_queue):
92+
"""
93+
Class constructor.
94+
95+
:param synchronize_segment: handler to perform segment synchronization on incoming event
96+
:type synchronize_segment: function
97+
98+
:param segment_queue: queue with segment updates notifications
99+
:type segment_queue: asyncio.Queue
100+
"""
101+
self._segment_queue = segment_queue
102+
self._handler = synchronize_segment
103+
self._running = False
104+
105+
def is_running(self):
106+
"""Return whether the working is running."""
107+
return self._running
108+
109+
async def _run(self):
110+
"""Run worker handler."""
111+
while self.is_running():
112+
event = await self._segment_queue.get()
113+
if not self.is_running():
114+
break
115+
if event == self._centinel:
116+
continue
117+
_LOGGER.debug('Processing segment_update: %s, change_number: %d',
118+
event.segment_name, event.change_number)
119+
try:
120+
await self._handler(event.segment_name, event.change_number)
121+
except Exception:
122+
_LOGGER.error('Exception raised in segment synchronization')
123+
_LOGGER.debug('Exception information: ', exc_info=True)
124+
125+
def start(self):
126+
"""Start worker."""
127+
if self.is_running():
128+
_LOGGER.debug('Worker is already running')
129+
return
130+
self._running = True
131+
132+
_LOGGER.debug('Starting Segment Worker')
133+
asyncio.get_event_loop().create_task(self._run())
134+
135+
async def stop(self):
136+
"""Stop worker."""
137+
_LOGGER.debug('Stopping Segment Worker')
138+
if not self.is_running():
139+
_LOGGER.debug('Worker is not running. Ignoring.')
140+
return
141+
self._running = False
142+
await self._segment_queue.put(self._centinel)
143+
144+
class SplitWorker(WorkerBase):
26145
"""Feature Flag Worker for processing updates."""
27146

28147
_centinel = object()
@@ -81,7 +200,7 @@ def stop(self):
81200
self._running = False
82201
self._feature_flag_queue.put(self._centinel)
83202

84-
class SplitWorkerAsync(SplitWorkerBase):
203+
class SplitWorkerAsync(WorkerBase):
85204
"""Split Worker for processing updates."""
86205

87206
_centinel = object()

tests/push/test_segment_worker.py

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,9 @@
44
import pytest
55

66
from splitio.api import APIException
7-
from splitio.push.segmentworker import SegmentWorker
7+
from splitio.push.workers import SegmentWorker, SegmentWorkerAsync
88
from splitio.models.notification import SegmentChangeNotification
9+
from splitio.optional.loaders import asyncio
910

1011
change_number_received = None
1112
segment_name_received = None
@@ -58,3 +59,52 @@ def test_handler(self):
5859

5960
segment_worker.stop()
6061
assert not segment_worker.is_running()
62+
63+
class SegmentWorkerAsyncTests(object):
64+
async def test_on_error(self):
65+
q = asyncio.Queue()
66+
67+
def handler_sync(change_number):
68+
raise APIException('some')
69+
70+
segment_worker = SegmentWorkerAsync(handler_sync, q)
71+
segment_worker.start()
72+
assert segment_worker.is_running()
73+
74+
await q.put(SegmentChangeNotification('some', 'SEGMENT_UPDATE', 123456789, 'some'))
75+
76+
with pytest.raises(Exception):
77+
segment_worker._handler()
78+
79+
assert segment_worker.is_running()
80+
assert(self._worker_running())
81+
await segment_worker.stop()
82+
await asyncio.sleep(.1)
83+
assert not segment_worker.is_running()
84+
assert(not self._worker_running())
85+
86+
def _worker_running(self):
87+
worker_running = False
88+
for task in asyncio.Task.all_tasks():
89+
if task._coro.cr_code.co_name == '_run' and not task.done():
90+
worker_running = True
91+
break
92+
return worker_running
93+
94+
async def test_handler(self):
95+
q = asyncio.Queue()
96+
segment_worker = SegmentWorkerAsync(handler_sync, q)
97+
global change_number_received
98+
assert not segment_worker.is_running()
99+
segment_worker.start()
100+
assert segment_worker.is_running()
101+
102+
await q.put(SegmentChangeNotification('some', 'SEGMENT_UPDATE', 123456789, 'some'))
103+
104+
await asyncio.sleep(.1)
105+
assert change_number_received == 123456789
106+
assert segment_name_received == 'some'
107+
108+
await segment_worker.stop()
109+
await asyncio.sleep(.1)
110+
assert(not self._worker_running())

tests/push/test_split_worker.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import pytest
55

66
from splitio.api import APIException
7-
from splitio.push.splitworker import SplitWorker, SplitWorkerAsync
7+
from splitio.push.workers import SplitWorker, SplitWorkerAsync
88
from splitio.models.notification import SplitChangeNotification
99
from splitio.optional.loaders import asyncio
1010

0 commit comments

Comments
 (0)