Skip to content

Commit 903f9e3

Browse files
committed
more integration tests and UT fixing
1 parent 74b450d commit 903f9e3

File tree

7 files changed

+590
-124
lines changed

7 files changed

+590
-124
lines changed

splitio/push/manager.py

Lines changed: 95 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -51,84 +51,46 @@ def __init__(self, auth_api, synchronizer, feedback_loop, sse_url=None):
5151
MessageType.OCCUPANCY: self._handle_occupancy
5252
}
5353

54-
self._sse_client = SplitSSEClient(self._event_handler) if sse_url is None \
55-
else SplitSSEClient(self._event_handler, sse_url)
54+
kwargs = {} if sse_url is None else {'base_url': sse_url}
55+
self._sse_client = SplitSSEClient(self._event_handler, self._handle_connection_ready,
56+
self._handle_connection_end, **kwargs)
5657
self._running = False
5758
self._next_refresh = Timer(0, lambda: 0)
5859

59-
def _handle_message(self, event):
60-
"""
61-
Handle incoming update message.
62-
63-
:param event: Incoming Update message
64-
:type event: splitio.push.sse.parser.Update
65-
"""
66-
try:
67-
handle = self._message_handlers[event.message_type]
68-
except KeyError:
69-
_LOGGER.error('no handler for message of type %s', event.message_type)
70-
_LOGGER.debug(str(event), exc_info=True)
71-
return
72-
73-
handle(event)
74-
75-
def _handle_update(self, event):
76-
"""
77-
Handle incoming update message.
78-
79-
:param event: Incoming Update message
80-
:type event: splitio.push.sse.parser.Update
81-
"""
82-
_LOGGER.debug('handling update event: %s', str(event))
83-
self._processor.handle(event)
84-
85-
def _handle_control(self, event):
60+
def update_workers_status(self, enabled):
8661
"""
87-
Handle incoming control message.
62+
Enable/Disable push update workers.
8863
89-
:param event: Incoming control message.
90-
:type event: splitio.push.sse.parser.ControlMessage
64+
:param enabled: if True, enable workers. If False, disable them.
65+
:type enabled: bool
9166
"""
92-
_LOGGER.debug('handling control event: %s', str(event))
93-
feedback = self._status_tracker.handle_control_message(event)
94-
if feedback is not None:
95-
self._feedback_loop.put(feedback)
67+
self._processor.update_workers_status(enabled)
9668

97-
def _handle_occupancy(self, event):
98-
"""
99-
Handle incoming notification message.
10069

101-
:param event: Incoming occupancy message.
102-
:type event: splitio.push.sse.parser.Occupancy
103-
"""
104-
_LOGGER.debug('handling occupancy event: %s', str(event))
105-
feedback = self._status_tracker.handle_occupancy(event)
106-
if feedback is not None:
107-
self._feedback_loop.put(feedback)
108-
109-
def _handle_connection_end(self, shutdown_requested):
110-
"""
111-
Handle a connection ending.
70+
def start(self):
71+
"""Start a new connection if not already running."""
72+
if self._running:
73+
_LOGGER.warning('Push manager already has a connection running. Ignoring')
74+
return
11275

113-
If the connection shutdown was not requested, trigger a restart.
76+
self._trigger_connection_flow()
11477

115-
:param shutdown_requested: whether the shutdown was requested or unexpected.
116-
:type shutdown_requested: True
78+
def stop(self, blocking=False):
11779
"""
118-
if not shutdown_requested:
119-
self._feedback_loop.put(Status.PUSH_RETRYABLE_ERROR)
80+
Stop the current ongoing connection.
12081
121-
def _handle_error(self, event):
82+
:param blocking: whether to wait for the connection to be successfully closed or not
83+
:type blocking: bool
12284
"""
123-
Handle incoming error message.
85+
if not self._running:
86+
_LOGGER.warning('Push manager does not have an open SSE connection. Ignoring')
87+
return
12488

125-
:param event: Incoming ably error
126-
:type event: splitio.push.sse.parser.AblyError
127-
"""
128-
_LOGGER.debug('handling ably error event: %s', str(event))
129-
feedback = self._status_tracker.handle_ably_error(event)
130-
if feedback is not None:
131-
self._feedback_loop.put(feedback)
89+
self._running = False
90+
self._processor.update_workers_status(False)
91+
self._status_tracker.notify_sse_shutdown_expected()
92+
self._next_refresh.cancel()
93+
self._sse_client.stop(blocking)
13294

13395
def _event_handler(self, event):
13496
"""
@@ -179,13 +141,11 @@ def _trigger_connection_flow(self):
179141
return
180142

181143
self._status_tracker.reset()
182-
if self._sse_client.start(token):
144+
res = self._sse_client.start(token)
145+
if res:
146+
print(res)
183147
self._setup_next_token_refresh(token)
184148
self._running = True
185-
self._feedback_loop.put(Status.PUSH_SUBSYSTEM_UP)
186-
return
187-
188-
self._feedback_loop.put(Status.PUSH_RETRYABLE_ERROR)
189149

190150
def _setup_next_token_refresh(self, token):
191151
"""
@@ -201,36 +161,81 @@ def _setup_next_token_refresh(self, token):
201161
self._next_refresh.setName('TokenRefresh')
202162
self._next_refresh.start()
203163

204-
def update_workers_status(self, enabled):
164+
def _handle_connection_ready(self):
165+
"""Handle a successful connection to SSE."""
166+
self._feedback_loop.put(Status.PUSH_SUBSYSTEM_UP)
167+
_LOGGER.info('sse initial event received. enabling')
168+
169+
def _handle_connection_end(self, shutdown_requested):
205170
"""
206-
Enable/Disable push update workers.
171+
Handle a connection ending.
207172
208-
:param enabled: if True, enable workers. If False, disable them.
209-
:type enabled: bool
173+
If the connection shutdown was not requested, trigger a restart.
174+
175+
:param shutdown_requested: whether the shutdown was requested or unexpected.
176+
:type shutdown_requested: True
210177
"""
211-
self._processor.update_workers_status(enabled)
178+
if not shutdown_requested:
179+
self._feedback_loop.put(Status.PUSH_RETRYABLE_ERROR)
212180

181+
def _handle_message(self, event):
182+
"""
183+
Handle incoming update message.
213184
214-
def start(self):
215-
"""Start a new connection if not already running."""
216-
if self._running:
217-
_LOGGER.warning('Push manager already has a connection running. Ignoring')
185+
:param event: Incoming Update message
186+
:type event: splitio.push.sse.parser.Update
187+
"""
188+
try:
189+
handle = self._message_handlers[event.message_type]
190+
except KeyError:
191+
_LOGGER.error('no handler for message of type %s', event.message_type)
192+
_LOGGER.debug(str(event), exc_info=True)
218193
return
219194

220-
self._trigger_connection_flow()
195+
handle(event)
221196

222-
def stop(self, blocking=False):
197+
def _handle_update(self, event):
223198
"""
224-
Stop the current ongoing connection.
199+
Handle incoming update message.
225200
226-
:param blocking: whether to wait for the connection to be successfully closed or not
227-
:type blocking: bool
201+
:param event: Incoming Update message
202+
:type event: splitio.push.sse.parser.Update
228203
"""
229-
if not self._running:
230-
_LOGGER.warning('Push manager does not have an open SSE connection. Ignoring')
231-
return
204+
_LOGGER.debug('handling update event: %s', str(event))
205+
self._processor.handle(event)
232206

233-
self._processor.update_workers_status(False)
234-
self._status_tracker.notify_sse_shutdown_expected()
235-
self._next_refresh.cancel()
236-
self._sse_client.stop(blocking)
207+
def _handle_control(self, event):
208+
"""
209+
Handle incoming control message.
210+
211+
:param event: Incoming control message.
212+
:type event: splitio.push.sse.parser.ControlMessage
213+
"""
214+
_LOGGER.debug('handling control event: %s', str(event))
215+
feedback = self._status_tracker.handle_control_message(event)
216+
if feedback is not None:
217+
self._feedback_loop.put(feedback)
218+
219+
def _handle_occupancy(self, event):
220+
"""
221+
Handle incoming notification message.
222+
223+
:param event: Incoming occupancy message.
224+
:type event: splitio.push.sse.parser.Occupancy
225+
"""
226+
_LOGGER.debug('handling occupancy event: %s', str(event))
227+
feedback = self._status_tracker.handle_occupancy(event)
228+
if feedback is not None:
229+
self._feedback_loop.put(feedback)
230+
231+
def _handle_error(self, event):
232+
"""
233+
Handle incoming error message.
234+
235+
:param event: Incoming ably error
236+
:type event: splitio.push.sse.parser.AblyError
237+
"""
238+
_LOGGER.debug('handling ably error event: %s', str(event))
239+
feedback = self._status_tracker.handle_ably_error(event)
240+
if feedback is not None:
241+
self._feedback_loop.put(feedback)

splitio/push/splitsse.py

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
_LOGGER = logging.getLogger(__name__)
1111

1212

13-
class SplitSSEClient(object):
13+
class SplitSSEClient(object): # pylint: disable=too-many-instance-attributes
1414
"""Split streaming endpoint SSE client."""
1515

1616
KEEPALIVE_TIMEOUT = 70
@@ -21,18 +21,27 @@ class _Status(Enum):
2121
ERRORED = 2
2222
CONNECTED = 3
2323

24-
def __init__(self, callback, base_url='https://streaming.split.io'):
24+
def __init__(self, event_callback, first_event_callback=None,
25+
connection_closed_callback=None, base_url='https://streaming.split.io'):
2526
"""
2627
Construct a split sse client.
2728
2829
:param callback: fuction to call when an event is received.
2930
:type callback: callable
3031
32+
:param first_event_callback: function to call when the first event is received.
33+
:type first_event_callback: callable
34+
35+
:param connection_closed_callback: funciton to call when the connection ends.
36+
:type connection_closed_callback: callable
37+
3138
:param base_url: scheme + :// + host
3239
:type base_url: str
3340
"""
3441
self._client = SSEClient(self._raw_event_handler)
35-
self._callback = callback
42+
self._callback = event_callback
43+
self._on_connected = first_event_callback
44+
self._on_disconnected = connection_closed_callback
3645
self._base_url = base_url
3746
self._status = SplitSSEClient._Status.IDLE
3847
self._sse_first_event = None
@@ -49,6 +58,8 @@ def _raw_event_handler(self, event):
4958
self._status = SplitSSEClient._Status.CONNECTED if event.event != SSE_EVENT_ERROR \
5059
else SplitSSEClient._Status.ERRORED
5160
self._sse_first_event.set()
61+
if self._on_connected is not None:
62+
self._on_connected()
5263

5364
if event.data is not None:
5465
self._callback(event)
@@ -106,11 +117,13 @@ def start(self, token):
106117

107118
def connect(url):
108119
"""Connect to sse in a blocking manner."""
120+
shutdown_requested = False
109121
try:
110-
self._client.start(url, timeout=self.KEEPALIVE_TIMEOUT)
122+
shutdown_requested = self._client.start(url, timeout=self.KEEPALIVE_TIMEOUT)
111123
finally:
112124
self._status = SplitSSEClient._Status.IDLE
113125
self._sse_connection_closed.set()
126+
self._on_disconnected(shutdown_requested)
114127

115128
url = self._build_url(token)
116129
task = threading.Thread(target=connect, name='SSEConnection', args=(url,))

splitio/sync/manager.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -88,23 +88,24 @@ def _streaming_feedback_handler(self):
8888
continue
8989

9090
if status == Status.PUSH_SUBSYSTEM_UP:
91-
_LOGGER.info('streaming up and running. disabling periodic fetching.')
9291
self._synchronizer.stop_periodic_fetching()
9392
self._synchronizer.sync_all()
9493
self._push.update_workers_status(True)
9594
self._backoff.reset()
95+
_LOGGER.info('streaming up and running. disabling periodic fetching.')
9696
elif status == Status.PUSH_SUBSYSTEM_DOWN:
97-
_LOGGER.info('streaming temporarily down. starting periodic fetching')
9897
self._push.update_workers_status(False)
98+
self._synchronizer.sync_all()
9999
self._synchronizer.start_periodic_fetching()
100+
_LOGGER.info('streaming temporarily down. starting periodic fetching')
100101
elif status == Status.PUSH_RETRYABLE_ERROR:
101-
_LOGGER.info('error in streaming. restarting flow')
102102
self._synchronizer.start_periodic_fetching()
103103
self._push.stop(True)
104104
time.sleep(self._backoff.get())
105105
self._push.start()
106+
_LOGGER.info('error in streaming. restarting flow')
106107
elif status == Status.PUSH_NONRETRYABLE_ERROR:
107-
_LOGGER.info('non-recoverable error in streaming. switching to polling.')
108108
self._synchronizer.start_periodic_fetching()
109109
self._push.stop(False)
110+
_LOGGER.info('non-recoverable error in streaming. switching to polling.')
110111
return

splitio/tasks/util/asynctask.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,10 +89,10 @@ def _execution_wrapper(self):
8989
try:
9090
msg = self._messages.get(True, self._period)
9191
if msg == __TASK_STOP__:
92-
_LOGGER.info("Stop signal received. finishing task execution")
92+
_LOGGER.debug("Stop signal received. finishing task execution")
9393
break
9494
elif msg == __TASK_FORCE_RUN__:
95-
_LOGGER.info("Force execution signal received. Running now")
95+
_LOGGER.debug("Force execution signal received. Running now")
9696
if not _safe_run(self._main):
9797
_LOGGER.error(
9898
"An error occurred when executing the task. "

0 commit comments

Comments
 (0)