Skip to content

Commit 2d96d47

Browse files
committed
added async for sse class
1 parent c584967 commit 2d96d47

File tree

3 files changed

+524
-22
lines changed

3 files changed

+524
-22
lines changed

splitio/push/manager.py

Lines changed: 263 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
import logging
44
from threading import Timer
55

6+
import abc
7+
68
from splitio.api import APIException
79
from splitio.util.time import get_current_epoch_time_ms
810
from splitio.push.splitsse import SplitSSEClient
@@ -11,13 +13,49 @@
1113
from splitio.push.processor import MessageProcessor
1214
from splitio.push.status_tracker import PushStatusTracker, Status
1315
from splitio.models.telemetry import StreamingEventTypes
16+
from splitio.optional.loaders import asyncio
17+
1418
_TOKEN_REFRESH_GRACE_PERIOD = 10 * 60 # 10 minutes
1519

1620

1721
_LOGGER = logging.getLogger(__name__)
1822

23+
def _get_parsed_event(event):
24+
"""
25+
Parse an incoming event.
26+
27+
:param event: Incoming event
28+
:type event: splitio.push.sse.SSEEvent
29+
30+
:returns: an event parsed to it's concrete type.
31+
:rtype: BaseEvent
32+
"""
33+
try:
34+
parsed = parse_incoming_event(event)
35+
except EventParsingException:
36+
_LOGGER.error('error parsing event of type %s', event.event_type)
37+
_LOGGER.debug(str(event), exc_info=True)
38+
raise
39+
40+
return parsed
41+
42+
class PushManagerBase(object, metaclass=abc.ABCMeta):
43+
"""Worker template."""
44+
45+
@abc.abstractmethod
46+
def update_workers_status(self, enabled):
47+
"""Enable/Disable push update workers."""
48+
49+
@abc.abstractmethod
50+
def start(self):
51+
"""Start a new connection if not already running."""
52+
53+
@abc.abstractmethod
54+
def stop(self, blocking=False):
55+
"""Stop the current ongoing connection."""
1956

20-
class PushManager(object): # pylint:disable=too-many-instance-attributes
57+
58+
class PushManager(PushManagerBase): # pylint:disable=too-many-instance-attributes
2159
"""Push notifications susbsytem manager."""
2260

2361
def __init__(self, auth_api, synchronizer, feedback_loop, sdk_metadata, telemetry_runtime_producer, sse_url=None, client_key=None):
@@ -107,16 +145,10 @@ def _event_handler(self, event):
107145
:type event: splitio.push.sse.SSEEvent
108146
"""
109147
try:
110-
parsed = parse_incoming_event(event)
111-
except EventParsingException:
112-
_LOGGER.error('error parsing event of type %s', event.event_type)
113-
_LOGGER.debug(str(event), exc_info=True)
114-
return
115-
116-
try:
148+
parsed = _get_parsed_event(event)
117149
handle = self._event_handlers[parsed.event_type]
118-
except KeyError:
119-
_LOGGER.error('no handler for message of type %s', parsed.event_type)
150+
except (KeyError, EventParsingException):
151+
_LOGGER.error('Parsing exception or no handler for message of type %s', parsed.event_type)
120152
_LOGGER.debug(str(event), exc_info=True)
121153
return
122154

@@ -247,3 +279,224 @@ def _handle_connection_end(self):
247279
feedback = self._status_tracker.handle_disconnect()
248280
if feedback is not None:
249281
self._feedback_loop.put(feedback)
282+
283+
class PushManagerAsync(PushManagerBase): # pylint:disable=too-many-instance-attributes
284+
"""Push notifications susbsytem manager."""
285+
286+
def __init__(self, auth_api, synchronizer, feedback_loop, sdk_metadata, sse_url=None, client_key=None):
287+
"""
288+
Class constructor.
289+
290+
:param auth_api: sdk-auth-service api client
291+
:type auth_api: splitio.api.auth.AuthAPI
292+
293+
:param synchronizer: split data synchronizer facade
294+
:type synchronizer: splitio.sync.synchronizer.Synchronizer
295+
296+
:param feedback_loop: queue where push status updates are published.
297+
:type feedback_loop: queue.Queue
298+
299+
:param sdk_metadata: SDK version & machine name & IP.
300+
:type sdk_metadata: splitio.client.util.SdkMetadata
301+
302+
:param sse_url: streaming base url.
303+
:type sse_url: str
304+
305+
:param client_key: client key.
306+
:type client_key: str
307+
"""
308+
self._auth_api = auth_api
309+
self._feedback_loop = feedback_loop
310+
self._processor = MessageProcessor(synchronizer)
311+
self._status_tracker = PushStatusTracker()
312+
self._event_handlers = {
313+
EventType.MESSAGE: self._handle_message,
314+
EventType.ERROR: self._handle_error
315+
}
316+
317+
self._message_handlers = {
318+
MessageType.UPDATE: self._handle_update,
319+
MessageType.CONTROL: self._handle_control,
320+
MessageType.OCCUPANCY: self._handle_occupancy
321+
}
322+
323+
kwargs = {} if sse_url is None else {'base_url': sse_url}
324+
self._sse_client = SplitSSEClient(self._event_handler, sdk_metadata, self._handle_connection_ready,
325+
self._handle_connection_end, client_key, **kwargs)
326+
self._running = False
327+
self._next_refresh = Timer(0, lambda: 0)
328+
329+
async def update_workers_status(self, enabled):
330+
"""
331+
Enable/Disable push update workers.
332+
333+
:param enabled: if True, enable workers. If False, disable them.
334+
:type enabled: bool
335+
"""
336+
await self._processor.update_workers_status(enabled)
337+
338+
async def start(self):
339+
"""Start a new connection if not already running."""
340+
if self._running:
341+
_LOGGER.warning('Push manager already has a connection running. Ignoring')
342+
return
343+
344+
await self._trigger_connection_flow()
345+
346+
async def stop(self, blocking=False):
347+
"""
348+
Stop the current ongoing connection.
349+
350+
:param blocking: whether to wait for the connection to be successfully closed or not
351+
:type blocking: bool
352+
"""
353+
if not self._running:
354+
_LOGGER.warning('Push manager does not have an open SSE connection. Ignoring')
355+
return
356+
357+
self._running = False
358+
await self._processor.update_workers_status(False)
359+
self._status_tracker.notify_sse_shutdown_expected()
360+
self._next_refresh.cancel()
361+
await self._sse_client.stop(blocking)
362+
363+
async def _event_handler(self, event):
364+
"""
365+
Process an incoming event.
366+
367+
:param event: Incoming event
368+
:type event: splitio.push.sse.SSEEvent
369+
"""
370+
try:
371+
parsed = _get_parsed_event(event)
372+
handle = await self._event_handlers[parsed.event_type]
373+
except (KeyError, EventParsingException):
374+
_LOGGER.error('Parsing exception or no handler for message of type %s', parsed.event_type)
375+
_LOGGER.debug(str(event), exc_info=True)
376+
return
377+
378+
try:
379+
await handle(parsed)
380+
except Exception: # pylint:disable=broad-except
381+
_LOGGER.error('something went wrong when processing message of type %s',
382+
parsed.event_type)
383+
_LOGGER.debug(str(parsed), exc_info=True)
384+
385+
async def _token_refresh(self):
386+
"""Refresh auth token."""
387+
_LOGGER.info("retriggering authentication flow.")
388+
self.stop(True)
389+
await self._trigger_connection_flow()
390+
391+
async def _trigger_connection_flow(self):
392+
"""Authenticate and start a connection."""
393+
try:
394+
token = await self._auth_api.authenticate()
395+
except APIException:
396+
_LOGGER.error('error performing sse auth request.')
397+
_LOGGER.debug('stack trace: ', exc_info=True)
398+
await self._feedback_loop.put(Status.PUSH_RETRYABLE_ERROR)
399+
return
400+
401+
if not token.push_enabled:
402+
await self._feedback_loop.put(Status.PUSH_NONRETRYABLE_ERROR)
403+
return
404+
405+
_LOGGER.debug("auth token fetched. connecting to streaming.")
406+
self._status_tracker.reset()
407+
self._running = True
408+
if self._sse_client.start(token):
409+
_LOGGER.debug("connected to streaming, scheduling next refresh")
410+
await self._setup_next_token_refresh(token)
411+
self._running = True
412+
413+
async def _setup_next_token_refresh(self, token):
414+
"""
415+
Schedule next token refresh.
416+
417+
:param token: Last fetched token.
418+
:type token: splitio.models.token.Token
419+
"""
420+
if self._next_refresh is not None:
421+
self._next_refresh.cancel()
422+
self._next_refresh = Timer((token.exp - token.iat) - _TOKEN_REFRESH_GRACE_PERIOD,
423+
await self._token_refresh)
424+
self._next_refresh.setName('TokenRefresh')
425+
self._next_refresh.start()
426+
427+
async def _handle_message(self, event):
428+
"""
429+
Handle incoming update message.
430+
431+
:param event: Incoming Update message
432+
:type event: splitio.push.sse.parser.Update
433+
"""
434+
try:
435+
handle = await self._message_handlers[event.message_type]
436+
except KeyError:
437+
_LOGGER.error('no handler for message of type %s', event.message_type)
438+
_LOGGER.debug(str(event), exc_info=True)
439+
return
440+
441+
await handle(event)
442+
443+
async def _handle_update(self, event):
444+
"""
445+
Handle incoming update message.
446+
447+
:param event: Incoming Update message
448+
:type event: splitio.push.sse.parser.Update
449+
"""
450+
_LOGGER.debug('handling update event: %s', str(event))
451+
await self._processor.handle(event)
452+
453+
async def _handle_control(self, event):
454+
"""
455+
Handle incoming control message.
456+
457+
:param event: Incoming control message.
458+
:type event: splitio.push.sse.parser.ControlMessage
459+
"""
460+
_LOGGER.debug('handling control event: %s', str(event))
461+
feedback = self._status_tracker.handle_control_message(event)
462+
if feedback is not None:
463+
await self._feedback_loop.put(feedback)
464+
465+
async def _handle_occupancy(self, event):
466+
"""
467+
Handle incoming notification message.
468+
469+
:param event: Incoming occupancy message.
470+
:type event: splitio.push.sse.parser.Occupancy
471+
"""
472+
_LOGGER.debug('handling occupancy event: %s', str(event))
473+
feedback = self._status_tracker.handle_occupancy(event)
474+
if feedback is not None:
475+
await self._feedback_loop.put(feedback)
476+
477+
async def _handle_error(self, event):
478+
"""
479+
Handle incoming error message.
480+
481+
:param event: Incoming ably error
482+
:type event: splitio.push.sse.parser.AblyError
483+
"""
484+
_LOGGER.debug('handling ably error event: %s', str(event))
485+
feedback = self._status_tracker.handle_ably_error(event)
486+
if feedback is not None:
487+
await self._feedback_loop.put(feedback)
488+
489+
async def _handle_connection_ready(self):
490+
"""Handle a successful connection to SSE."""
491+
await self._feedback_loop.put(Status.PUSH_SUBSYSTEM_UP)
492+
_LOGGER.info('sse initial event received. enabling')
493+
494+
async def _handle_connection_end(self):
495+
"""
496+
Handle a connection ending.
497+
498+
If the connection shutdown was not requested, trigger a restart.
499+
"""
500+
feedback = self._status_tracker.handle_disconnect()
501+
if feedback is not None:
502+
await self._feedback_loop.put(feedback)

0 commit comments

Comments
 (0)