Skip to content

Commit 2b02438

Browse files
committed
added workers
1 parent 702d309 commit 2b02438

File tree

1 file changed

+260
-0
lines changed

1 file changed

+260
-0
lines changed

splitio/push/workers.py

Lines changed: 260 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,260 @@
1+
"""Segment changes processing worker."""
2+
import logging
3+
import threading
4+
import abc
5+
6+
from splitio.optional.loaders import asyncio
7+
8+
9+
_LOGGER = logging.getLogger(__name__)
10+
11+
class WorkerBase(object, metaclass=abc.ABCMeta):
12+
"""Worker template."""
13+
14+
@abc.abstractmethod
15+
def is_running(self):
16+
"""Return whether the working is running."""
17+
18+
@abc.abstractmethod
19+
def start(self):
20+
"""Start worker."""
21+
22+
@abc.abstractmethod
23+
def stop(self):
24+
"""Stop worker."""
25+
26+
class SegmentWorker(WorkerBase):
27+
"""Segment Worker for processing updates."""
28+
29+
_centinel = object()
30+
31+
def __init__(self, synchronize_segment, segment_queue):
32+
"""
33+
Class constructor.
34+
35+
:param synchronize_segment: handler to perform segment synchronization on incoming event
36+
:type synchronize_segment: function
37+
38+
:param segment_queue: queue with segment updates notifications
39+
:type segment_queue: queue
40+
"""
41+
self._segment_queue = segment_queue
42+
self._handler = synchronize_segment
43+
self._running = False
44+
self._worker = None
45+
46+
def is_running(self):
47+
"""Return whether the working is running."""
48+
return self._running
49+
50+
def _run(self):
51+
"""Run worker handler."""
52+
while self.is_running():
53+
event = self._segment_queue.get()
54+
if not self.is_running():
55+
break
56+
if event == self._centinel:
57+
continue
58+
_LOGGER.debug('Processing segment_update: %s, change_number: %d',
59+
event.segment_name, event.change_number)
60+
try:
61+
self._handler(event.segment_name, event.change_number)
62+
except Exception:
63+
_LOGGER.error('Exception raised in segment synchronization')
64+
_LOGGER.debug('Exception information: ', exc_info=True)
65+
66+
def start(self):
67+
"""Start worker."""
68+
if self.is_running():
69+
_LOGGER.debug('Worker is already running')
70+
return
71+
self._running = True
72+
73+
_LOGGER.debug('Starting Segment Worker')
74+
self._worker = threading.Thread(target=self._run, name='PushSegmentWorker', daemon=True)
75+
self._worker.start()
76+
77+
def stop(self):
78+
"""Stop worker."""
79+
_LOGGER.debug('Stopping Segment Worker')
80+
if not self.is_running():
81+
_LOGGER.debug('Worker is not running. Ignoring.')
82+
return
83+
self._running = False
84+
self._segment_queue.put(self._centinel)
85+
86+
class SegmentWorkerAsync(WorkerBase):
87+
"""Segment Worker for processing updates."""
88+
89+
_centinel = object()
90+
91+
def __init__(self, synchronize_segment, segment_queue):
92+
"""
93+
Class constructor.
94+
95+
:param synchronize_segment: handler to perform segment synchronization on incoming event
96+
:type synchronize_segment: function
97+
98+
:param segment_queue: queue with segment updates notifications
99+
:type segment_queue: asyncio.Queue
100+
"""
101+
self._segment_queue = segment_queue
102+
self._handler = synchronize_segment
103+
self._running = False
104+
105+
def is_running(self):
106+
"""Return whether the working is running."""
107+
return self._running
108+
109+
async def _run(self):
110+
"""Run worker handler."""
111+
while self.is_running():
112+
event = await self._segment_queue.get()
113+
if not self.is_running():
114+
break
115+
if event == self._centinel:
116+
continue
117+
_LOGGER.debug('Processing segment_update: %s, change_number: %d',
118+
event.segment_name, event.change_number)
119+
try:
120+
await self._handler(event.segment_name, event.change_number)
121+
except Exception:
122+
_LOGGER.error('Exception raised in segment synchronization')
123+
_LOGGER.debug('Exception information: ', exc_info=True)
124+
125+
def start(self):
126+
"""Start worker."""
127+
if self.is_running():
128+
_LOGGER.debug('Worker is already running')
129+
return
130+
self._running = True
131+
132+
_LOGGER.debug('Starting Segment Worker')
133+
asyncio.get_event_loop().create_task(self._run())
134+
135+
async def stop(self):
136+
"""Stop worker."""
137+
_LOGGER.debug('Stopping Segment Worker')
138+
if not self.is_running():
139+
_LOGGER.debug('Worker is not running. Ignoring.')
140+
return
141+
self._running = False
142+
await self._segment_queue.put(self._centinel)
143+
144+
class SplitWorker(WorkerBase):
145+
"""Feature Flag Worker for processing updates."""
146+
147+
_centinel = object()
148+
149+
def __init__(self, synchronize_feature_flag, feature_flag_queue):
150+
"""
151+
Class constructor.
152+
153+
:param synchronize_feature_flag: handler to perform feature flag synchronization on incoming event
154+
:type synchronize_feature_flag: callable
155+
156+
:param feature_flag_queue: queue with feature flag updates notifications
157+
:type feature_flag_queue: queue
158+
"""
159+
self._feature_flag_queue = feature_flag_queue
160+
self._handler = synchronize_feature_flag
161+
self._running = False
162+
self._worker = None
163+
164+
def is_running(self):
165+
"""Return whether the working is running."""
166+
return self._running
167+
168+
def _run(self):
169+
"""Run worker handler."""
170+
while self.is_running():
171+
event = self._feature_flag_queue.get()
172+
if not self.is_running():
173+
break
174+
if event == self._centinel:
175+
continue
176+
_LOGGER.debug('Processing feature flag update %d', event.change_number)
177+
try:
178+
self._handler(event.change_number)
179+
except Exception: # pylint: disable=broad-except
180+
_LOGGER.error('Exception raised in feature flag synchronization')
181+
_LOGGER.debug('Exception information: ', exc_info=True)
182+
183+
def start(self):
184+
"""Start worker."""
185+
if self.is_running():
186+
_LOGGER.debug('Worker is already running')
187+
return
188+
self._running = True
189+
190+
_LOGGER.debug('Starting Feature Flag Worker')
191+
self._worker = threading.Thread(target=self._run, name='PushFeatureFlagWorker', daemon=True)
192+
self._worker.start()
193+
194+
def stop(self):
195+
"""Stop worker."""
196+
_LOGGER.debug('Stopping Feature Flag Worker')
197+
if not self.is_running():
198+
_LOGGER.debug('Worker is not running')
199+
return
200+
self._running = False
201+
self._feature_flag_queue.put(self._centinel)
202+
203+
class SplitWorkerAsync(WorkerBase):
204+
"""Split Worker for processing updates."""
205+
206+
_centinel = object()
207+
208+
def __init__(self, synchronize_split, split_queue):
209+
"""
210+
Class constructor.
211+
212+
:param synchronize_split: handler to perform split synchronization on incoming event
213+
:type synchronize_split: callable
214+
215+
:param split_queue: queue with split updates notifications
216+
:type split_queue: queue
217+
"""
218+
self._split_queue = split_queue
219+
self._handler = synchronize_split
220+
self._running = False
221+
222+
def is_running(self):
223+
"""Return whether the working is running."""
224+
return self._running
225+
226+
async def _run(self):
227+
"""Run worker handler."""
228+
while self.is_running():
229+
_LOGGER.error("_run")
230+
event = await self._split_queue.get()
231+
if not self.is_running():
232+
break
233+
if event == self._centinel:
234+
continue
235+
_LOGGER.debug('Processing split_update %d', event.change_number)
236+
try:
237+
_LOGGER.error(event.change_number)
238+
await self._handler(event.change_number)
239+
except Exception: # pylint: disable=broad-except
240+
_LOGGER.error('Exception raised in split synchronization')
241+
_LOGGER.debug('Exception information: ', exc_info=True)
242+
243+
def start(self):
244+
"""Start worker."""
245+
if self.is_running():
246+
_LOGGER.debug('Worker is already running')
247+
return
248+
self._running = True
249+
250+
_LOGGER.debug('Starting Split Worker')
251+
asyncio.get_event_loop().create_task(self._run())
252+
253+
async def stop(self):
254+
"""Stop worker."""
255+
_LOGGER.debug('Stopping Split Worker')
256+
if not self.is_running():
257+
_LOGGER.debug('Worker is not running')
258+
return
259+
self._running = False
260+
await self._split_queue.put(self._centinel)

0 commit comments

Comments
 (0)