Skip to content

Commit bca16ac

Browse files
authored
Merge pull request #210 from splitio/tests/push_integration
Tests/push integration
2 parents d32dcb5 + 76d531a commit bca16ac

File tree

18 files changed

+1777
-196
lines changed

18 files changed

+1777
-196
lines changed

splitio/client/config.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@
1616
'splitSdkMachineName': None,
1717
'splitSdkMachineIp': None,
1818
'streamingEnabled': True,
19-
'featuresRefreshRate': 5,
20-
'segmentsRefreshRate': 60,
19+
'featuresRefreshRate': 30,
20+
'segmentsRefreshRate': 30,
2121
'metricsRefreshRate': 60,
2222
'impressionsRefreshRate': 5 * 60,
2323
'impressionsBulkSize': 5000,

splitio/push/manager.py

Lines changed: 94 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -51,84 +51,46 @@ def __init__(self, auth_api, synchronizer, feedback_loop, sse_url=None):
5151
MessageType.OCCUPANCY: self._handle_occupancy
5252
}
5353

54-
self._sse_client = SplitSSEClient(self._event_handler) if sse_url is None \
55-
else SplitSSEClient(self._event_handler, sse_url)
54+
kwargs = {} if sse_url is None else {'base_url': sse_url}
55+
self._sse_client = SplitSSEClient(self._event_handler, self._handle_connection_ready,
56+
self._handle_connection_end, **kwargs)
5657
self._running = False
5758
self._next_refresh = Timer(0, lambda: 0)
5859

59-
def _handle_message(self, event):
60-
"""
61-
Handle incoming update message.
62-
63-
:param event: Incoming Update message
64-
:type event: splitio.push.sse.parser.Update
65-
"""
66-
try:
67-
handle = self._message_handlers[event.message_type]
68-
except KeyError:
69-
_LOGGER.error('no handler for message of type %s', event.message_type)
70-
_LOGGER.debug(str(event), exc_info=True)
71-
return
72-
73-
handle(event)
74-
75-
def _handle_update(self, event):
76-
"""
77-
Handle incoming update message.
78-
79-
:param event: Incoming Update message
80-
:type event: splitio.push.sse.parser.Update
81-
"""
82-
_LOGGER.debug('handling update event: %s', str(event))
83-
self._processor.handle(event)
84-
85-
def _handle_control(self, event):
86-
"""
87-
Handle incoming control message.
88-
89-
:param event: Incoming control message.
90-
:type event: splitio.push.sse.parser.ControlMessage
60+
def update_workers_status(self, enabled):
9161
"""
92-
_LOGGER.debug('handling control event: %s', str(event))
93-
feedback = self._status_tracker.handle_control_message(event)
94-
if feedback is not None:
95-
self._feedback_loop.put(feedback)
62+
Enable/Disable push update workers.
9663
97-
def _handle_occupancy(self, event):
64+
:param enabled: if True, enable workers. If False, disable them.
65+
:type enabled: bool
9866
"""
99-
Handle incoming notification message.
67+
self._processor.update_workers_status(enabled)
10068

101-
:param event: Incoming occupancy message.
102-
:type event: splitio.push.sse.parser.Occupancy
103-
"""
104-
_LOGGER.debug('handling occupancy event: %s', str(event))
105-
feedback = self._status_tracker.handle_occupancy(event)
106-
if feedback is not None:
107-
self._feedback_loop.put(feedback)
10869

109-
def _handle_connection_end(self, shutdown_requested):
110-
"""
111-
Handle a connection ending.
70+
def start(self):
71+
"""Start a new connection if not already running."""
72+
if self._running:
73+
_LOGGER.warning('Push manager already has a connection running. Ignoring')
74+
return
11275

113-
If the connection shutdown was not requested, trigger a restart.
76+
self._trigger_connection_flow()
11477

115-
:param shutdown_requested: whether the shutdown was requested or unexpected.
116-
:type shutdown_requested: True
78+
def stop(self, blocking=False):
11779
"""
118-
if not shutdown_requested:
119-
self._feedback_loop.put(Status.PUSH_RETRYABLE_ERROR)
80+
Stop the current ongoing connection.
12081
121-
def _handle_error(self, event):
82+
:param blocking: whether to wait for the connection to be successfully closed or not
83+
:type blocking: bool
12284
"""
123-
Handle incoming error message.
85+
if not self._running:
86+
_LOGGER.warning('Push manager does not have an open SSE connection. Ignoring')
87+
return
12488

125-
:param event: Incoming ably error
126-
:type event: splitio.push.sse.parser.AblyError
127-
"""
128-
_LOGGER.debug('handling ably error event: %s', str(event))
129-
feedback = self._status_tracker.handle_ably_error(event)
130-
if feedback is not None:
131-
self._feedback_loop.put(feedback)
89+
self._running = False
90+
self._processor.update_workers_status(False)
91+
self._status_tracker.notify_sse_shutdown_expected()
92+
self._next_refresh.cancel()
93+
self._sse_client.stop(blocking)
13294

13395
def _event_handler(self, event):
13496
"""
@@ -178,14 +140,12 @@ def _trigger_connection_flow(self):
178140
self._feedback_loop.put(Status.PUSH_NONRETRYABLE_ERROR)
179141
return
180142

143+
_LOGGER.debug("auth token fetched. connecting to streaming.")
181144
self._status_tracker.reset()
182-
if self._sse_client.start(token):
145+
if self._sse_client.start(token):
146+
_LOGGER.debug("connected to streaming, scheduling next refresh")
183147
self._setup_next_token_refresh(token)
184148
self._running = True
185-
self._feedback_loop.put(Status.PUSH_SUBSYSTEM_UP)
186-
return
187-
188-
self._feedback_loop.put(Status.PUSH_RETRYABLE_ERROR)
189149

190150
def _setup_next_token_refresh(self, token):
191151
"""
@@ -201,36 +161,79 @@ def _setup_next_token_refresh(self, token):
201161
self._next_refresh.setName('TokenRefresh')
202162
self._next_refresh.start()
203163

204-
def update_workers_status(self, enabled):
164+
def _handle_message(self, event):
205165
"""
206-
Enable/Disable push update workers.
166+
Handle incoming update message.
207167
208-
:param enabled: if True, enable workers. If False, disable them.
209-
:type enabled: bool
168+
:param event: Incoming Update message
169+
:type event: splitio.push.sse.parser.Update
210170
"""
211-
self._processor.update_workers_status(enabled)
171+
try:
172+
handle = self._message_handlers[event.message_type]
173+
except KeyError:
174+
_LOGGER.error('no handler for message of type %s', event.message_type)
175+
_LOGGER.debug(str(event), exc_info=True)
176+
return
212177

178+
handle(event)
213179

214-
def start(self):
215-
"""Start a new connection if not already running."""
216-
if self._running:
217-
_LOGGER.warning('Push manager already has a connection running. Ignoring')
218-
return
180+
def _handle_update(self, event):
181+
"""
182+
Handle incoming update message.
219183
220-
self._trigger_connection_flow()
184+
:param event: Incoming Update message
185+
:type event: splitio.push.sse.parser.Update
186+
"""
187+
_LOGGER.debug('handling update event: %s', str(event))
188+
self._processor.handle(event)
221189

222-
def stop(self, blocking=False):
190+
def _handle_control(self, event):
223191
"""
224-
Stop the current ongoing connection.
192+
Handle incoming control message.
225193
226-
:param blocking: whether to wait for the connection to be successfully closed or not
227-
:type blocking: bool
194+
:param event: Incoming control message.
195+
:type event: splitio.push.sse.parser.ControlMessage
228196
"""
229-
if not self._running:
230-
_LOGGER.warning('Push manager does not have an open SSE connection. Ignoring')
231-
return
197+
_LOGGER.debug('handling control event: %s', str(event))
198+
feedback = self._status_tracker.handle_control_message(event)
199+
if feedback is not None:
200+
self._feedback_loop.put(feedback)
232201

233-
self._processor.update_workers_status(False)
234-
self._status_tracker.notify_sse_shutdown_expected()
235-
self._next_refresh.cancel()
236-
self._sse_client.stop(blocking)
202+
def _handle_occupancy(self, event):
203+
"""
204+
Handle incoming notification message.
205+
206+
:param event: Incoming occupancy message.
207+
:type event: splitio.push.sse.parser.Occupancy
208+
"""
209+
_LOGGER.debug('handling occupancy event: %s', str(event))
210+
feedback = self._status_tracker.handle_occupancy(event)
211+
if feedback is not None:
212+
self._feedback_loop.put(feedback)
213+
214+
def _handle_error(self, event):
215+
"""
216+
Handle incoming error message.
217+
218+
:param event: Incoming ably error
219+
:type event: splitio.push.sse.parser.AblyError
220+
"""
221+
_LOGGER.debug('handling ably error event: %s', str(event))
222+
feedback = self._status_tracker.handle_ably_error(event)
223+
if feedback is not None:
224+
self._feedback_loop.put(feedback)
225+
226+
def _handle_connection_ready(self):
227+
"""Handle a successful connection to SSE."""
228+
self._feedback_loop.put(Status.PUSH_SUBSYSTEM_UP)
229+
_LOGGER.info('sse initial event received. enabling')
230+
231+
def _handle_connection_end(self):
232+
"""
233+
Handle a connection ending.
234+
235+
If the connection shutdown was not requested, trigger a restart.
236+
"""
237+
feedback = self._status_tracker.handle_disconnect()
238+
if feedback is not None:
239+
self._feedback_loop.put(feedback)

splitio/push/parser.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
"""SSE Notification definitions."""
22
import abc
33
import json
4-
import time
54
from enum import Enum
65

76
from future.utils import raise_from
87
from six import add_metaclass
98

109
from splitio.util.decorators import abstract_property
10+
from splitio.util import utctime_ms
1111
from splitio.push.sse import SSE_EVENT_ERROR, SSE_EVENT_MESSAGE
1212

1313

@@ -37,9 +37,9 @@ class UpdateType(Enum):
3737
class ControlType(Enum):
3838
"""Control type enumeration."""
3939

40-
STREAMING_ENABLED = 0
41-
STREAMING_PAUSED = 1
42-
STREAMING_DISABLED = 2
40+
STREAMING_ENABLED = 'STREAMING_ENABLED'
41+
STREAMING_PAUSED = 'STREAMING_PAUSED'
42+
STREAMING_DISABLED = 'STREAMING_DISABLED'
4343

4444

4545
TAG_OCCUPANCY = '[meta]occupancy'
@@ -89,7 +89,7 @@ def __init__(self, code, status_code, message, href):
8989
self._status_code = status_code
9090
self._message = message
9191
self._href = href
92-
self._timestamp = int(time.time() * 1000) # TODO: replace with UTC function after merge
92+
self._timestamp = utctime_ms()
9393

9494
@property
9595
def event_type(self): #pylint:disable=no-self-use

splitio/push/splitsse.py

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
_LOGGER = logging.getLogger(__name__)
1111

1212

13-
class SplitSSEClient(object):
13+
class SplitSSEClient(object): # pylint: disable=too-many-instance-attributes
1414
"""Split streaming endpoint SSE client."""
1515

1616
KEEPALIVE_TIMEOUT = 70
@@ -21,18 +21,27 @@ class _Status(Enum):
2121
ERRORED = 2
2222
CONNECTED = 3
2323

24-
def __init__(self, callback, base_url='https://streaming.split.io'):
24+
def __init__(self, event_callback, first_event_callback=None,
25+
connection_closed_callback=None, base_url='https://streaming.split.io'):
2526
"""
2627
Construct a split sse client.
2728
2829
:param callback: fuction to call when an event is received.
2930
:type callback: callable
3031
32+
:param first_event_callback: function to call when the first event is received.
33+
:type first_event_callback: callable
34+
35+
:param connection_closed_callback: funciton to call when the connection ends.
36+
:type connection_closed_callback: callable
37+
3138
:param base_url: scheme + :// + host
3239
:type base_url: str
3340
"""
3441
self._client = SSEClient(self._raw_event_handler)
35-
self._callback = callback
42+
self._callback = event_callback
43+
self._on_connected = first_event_callback
44+
self._on_disconnected = connection_closed_callback
3645
self._base_url = base_url
3746
self._status = SplitSSEClient._Status.IDLE
3847
self._sse_first_event = None
@@ -49,6 +58,8 @@ def _raw_event_handler(self, event):
4958
self._status = SplitSSEClient._Status.CONNECTED if event.event != SSE_EVENT_ERROR \
5059
else SplitSSEClient._Status.ERRORED
5160
self._sse_first_event.set()
61+
if self._on_connected is not None:
62+
self._on_connected()
5263

5364
if event.data is not None:
5465
self._callback(event)
@@ -111,6 +122,7 @@ def connect(url):
111122
finally:
112123
self._status = SplitSSEClient._Status.IDLE
113124
self._sse_connection_closed.set()
125+
self._on_disconnected()
114126

115127
url = self._build_url(token)
116128
task = threading.Thread(target=connect, name='SSEConnection', args=(url,))

splitio/push/splitworker.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ def _run(self):
4141
_LOGGER.debug('Processing split_update %d', event.change_number)
4242
try:
4343
self._handler(event.change_number)
44-
except Exception:
44+
except Exception: # pylint: disable=broad-except
4545
_LOGGER.error('Exception raised in split synchronization')
4646
_LOGGER.debug('Exception information: ', exc_info=True)
4747

splitio/push/status_tracker.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
"""NotificationManagerKeeper implementation."""
2-
from collections import defaultdict
32
from enum import Enum
43
import logging
54
import six
@@ -18,7 +17,7 @@ class Status(Enum):
1817
PUSH_NONRETRYABLE_ERROR = 3
1918

2019

21-
class LastEventTimestamps(object):
20+
class LastEventTimestamps(object): # pylint:disable=too-few-public-methods
2221
"""Simple class to keep track of the last time multiple events occurred."""
2322

2423
def __init__(self):
@@ -90,7 +89,7 @@ def handle_control_message(self, event):
9089
:type event: splitio.push.parser.ControlMessage
9190
"""
9291
# we don't care about control messages if a disconnection is expected
93-
if self._shutdown_expected:
92+
if self._shutdown_expected:
9493
return None
9594

9695
if self._timestamps.control > event.timestamp:
@@ -111,10 +110,10 @@ def handle_ably_error(self, event):
111110
:returns: A new status if required. None otherwise
112111
:rtype: Optional[Status]
113112
"""
114-
if self._shutdown_expected: # we don't care about occupancy if a disconnection is expected
113+
if self._shutdown_expected: # we don't care about an incoming error if a shutdown is expected
115114
return None
116115

117-
_LOGGER.debug('handling update event: %s', str(event))
116+
_LOGGER.debug('handling ably error event: %s', str(event))
118117
if event.should_be_ignored():
119118
_LOGGER.debug('ignoring sse error message: %s', event)
120119
return None
@@ -161,7 +160,7 @@ def _update_status(self):
161160

162161
return None
163162

164-
def _handle_disconnect(self):
163+
def handle_disconnect(self):
165164
"""
166165
Handle non-requested SSE disconnection.
167166

0 commit comments

Comments
 (0)