|
3 | 3 | import logging |
4 | 4 | from threading import Timer |
5 | 5 |
|
6 | | -import abc |
7 | | - |
8 | 6 | from splitio.api import APIException |
9 | 7 | from splitio.util.time import get_current_epoch_time_ms |
10 | 8 | from splitio.push.splitsse import SplitSSEClient |
|
13 | 11 | from splitio.push.processor import MessageProcessor |
14 | 12 | from splitio.push.status_tracker import PushStatusTracker, Status |
15 | 13 | from splitio.models.telemetry import StreamingEventTypes |
16 | | -from splitio.optional.loaders import asyncio |
17 | | - |
18 | 14 | _TOKEN_REFRESH_GRACE_PERIOD = 10 * 60 # 10 minutes |
19 | 15 |
|
20 | 16 |
|
21 | 17 | _LOGGER = logging.getLogger(__name__) |
22 | 18 |
|
23 | | -def _get_parsed_event(event): |
24 | | - """ |
25 | | - Parse an incoming event. |
26 | | -
|
27 | | - :param event: Incoming event |
28 | | - :type event: splitio.push.sse.SSEEvent |
29 | | -
|
30 | | - :returns: an event parsed to it's concrete type. |
31 | | - :rtype: BaseEvent |
32 | | - """ |
33 | | - try: |
34 | | - parsed = parse_incoming_event(event) |
35 | | - except EventParsingException: |
36 | | - _LOGGER.error('error parsing event of type %s', event.event_type) |
37 | | - _LOGGER.debug(str(event), exc_info=True) |
38 | | - raise |
39 | | - |
40 | | - return parsed |
41 | | - |
42 | | -class PushManagerBase(object, metaclass=abc.ABCMeta): |
43 | | - """Worker template.""" |
44 | | - |
45 | | - @abc.abstractmethod |
46 | | - def update_workers_status(self, enabled): |
47 | | - """Enable/Disable push update workers.""" |
48 | | - |
49 | | - @abc.abstractmethod |
50 | | - def start(self): |
51 | | - """Start a new connection if not already running.""" |
52 | | - |
53 | | - @abc.abstractmethod |
54 | | - def stop(self, blocking=False): |
55 | | - """Stop the current ongoing connection.""" |
56 | 19 |
|
57 | | - |
58 | | -class PushManager(PushManagerBase): # pylint:disable=too-many-instance-attributes |
| 20 | +class PushManager(object): # pylint:disable=too-many-instance-attributes |
59 | 21 | """Push notifications susbsytem manager.""" |
60 | 22 |
|
61 | 23 | def __init__(self, auth_api, synchronizer, feedback_loop, sdk_metadata, telemetry_runtime_producer, sse_url=None, client_key=None): |
@@ -145,10 +107,16 @@ def _event_handler(self, event): |
145 | 107 | :type event: splitio.push.sse.SSEEvent |
146 | 108 | """ |
147 | 109 | try: |
148 | | - parsed = _get_parsed_event(event) |
| 110 | + parsed = parse_incoming_event(event) |
| 111 | + except EventParsingException: |
| 112 | + _LOGGER.error('error parsing event of type %s', event.event_type) |
| 113 | + _LOGGER.debug(str(event), exc_info=True) |
| 114 | + return |
| 115 | + |
| 116 | + try: |
149 | 117 | handle = self._event_handlers[parsed.event_type] |
150 | | - except (KeyError, EventParsingException): |
151 | | - _LOGGER.error('Parsing exception or no handler for message of type %s', parsed.event_type) |
| 118 | + except KeyError: |
| 119 | + _LOGGER.error('no handler for message of type %s', parsed.event_type) |
152 | 120 | _LOGGER.debug(str(event), exc_info=True) |
153 | 121 | return |
154 | 122 |
|
@@ -279,224 +247,3 @@ def _handle_connection_end(self): |
279 | 247 | feedback = self._status_tracker.handle_disconnect() |
280 | 248 | if feedback is not None: |
281 | 249 | self._feedback_loop.put(feedback) |
282 | | - |
283 | | -class PushManagerAsync(PushManagerBase): # pylint:disable=too-many-instance-attributes |
284 | | - """Push notifications susbsytem manager.""" |
285 | | - |
286 | | - def __init__(self, auth_api, synchronizer, feedback_loop, sdk_metadata, sse_url=None, client_key=None): |
287 | | - """ |
288 | | - Class constructor. |
289 | | -
|
290 | | - :param auth_api: sdk-auth-service api client |
291 | | - :type auth_api: splitio.api.auth.AuthAPI |
292 | | -
|
293 | | - :param synchronizer: split data synchronizer facade |
294 | | - :type synchronizer: splitio.sync.synchronizer.Synchronizer |
295 | | -
|
296 | | - :param feedback_loop: queue where push status updates are published. |
297 | | - :type feedback_loop: queue.Queue |
298 | | -
|
299 | | - :param sdk_metadata: SDK version & machine name & IP. |
300 | | - :type sdk_metadata: splitio.client.util.SdkMetadata |
301 | | -
|
302 | | - :param sse_url: streaming base url. |
303 | | - :type sse_url: str |
304 | | -
|
305 | | - :param client_key: client key. |
306 | | - :type client_key: str |
307 | | - """ |
308 | | - self._auth_api = auth_api |
309 | | - self._feedback_loop = feedback_loop |
310 | | - self._processor = MessageProcessor(synchronizer) |
311 | | - self._status_tracker = PushStatusTracker() |
312 | | - self._event_handlers = { |
313 | | - EventType.MESSAGE: self._handle_message, |
314 | | - EventType.ERROR: self._handle_error |
315 | | - } |
316 | | - |
317 | | - self._message_handlers = { |
318 | | - MessageType.UPDATE: self._handle_update, |
319 | | - MessageType.CONTROL: self._handle_control, |
320 | | - MessageType.OCCUPANCY: self._handle_occupancy |
321 | | - } |
322 | | - |
323 | | - kwargs = {} if sse_url is None else {'base_url': sse_url} |
324 | | - self._sse_client = SplitSSEClient(self._event_handler, sdk_metadata, self._handle_connection_ready, |
325 | | - self._handle_connection_end, client_key, **kwargs) |
326 | | - self._running = False |
327 | | - self._next_refresh = Timer(0, lambda: 0) |
328 | | - |
329 | | - async def update_workers_status(self, enabled): |
330 | | - """ |
331 | | - Enable/Disable push update workers. |
332 | | -
|
333 | | - :param enabled: if True, enable workers. If False, disable them. |
334 | | - :type enabled: bool |
335 | | - """ |
336 | | - await self._processor.update_workers_status(enabled) |
337 | | - |
338 | | - async def start(self): |
339 | | - """Start a new connection if not already running.""" |
340 | | - if self._running: |
341 | | - _LOGGER.warning('Push manager already has a connection running. Ignoring') |
342 | | - return |
343 | | - |
344 | | - await self._trigger_connection_flow() |
345 | | - |
346 | | - async def stop(self, blocking=False): |
347 | | - """ |
348 | | - Stop the current ongoing connection. |
349 | | -
|
350 | | - :param blocking: whether to wait for the connection to be successfully closed or not |
351 | | - :type blocking: bool |
352 | | - """ |
353 | | - if not self._running: |
354 | | - _LOGGER.warning('Push manager does not have an open SSE connection. Ignoring') |
355 | | - return |
356 | | - |
357 | | - self._running = False |
358 | | - await self._processor.update_workers_status(False) |
359 | | - self._status_tracker.notify_sse_shutdown_expected() |
360 | | - self._next_refresh.cancel() |
361 | | - await self._sse_client.stop(blocking) |
362 | | - |
363 | | - async def _event_handler(self, event): |
364 | | - """ |
365 | | - Process an incoming event. |
366 | | -
|
367 | | - :param event: Incoming event |
368 | | - :type event: splitio.push.sse.SSEEvent |
369 | | - """ |
370 | | - try: |
371 | | - parsed = _get_parsed_event(event) |
372 | | - handle = await self._event_handlers[parsed.event_type] |
373 | | - except (KeyError, EventParsingException): |
374 | | - _LOGGER.error('Parsing exception or no handler for message of type %s', parsed.event_type) |
375 | | - _LOGGER.debug(str(event), exc_info=True) |
376 | | - return |
377 | | - |
378 | | - try: |
379 | | - await handle(parsed) |
380 | | - except Exception: # pylint:disable=broad-except |
381 | | - _LOGGER.error('something went wrong when processing message of type %s', |
382 | | - parsed.event_type) |
383 | | - _LOGGER.debug(str(parsed), exc_info=True) |
384 | | - |
385 | | - async def _token_refresh(self): |
386 | | - """Refresh auth token.""" |
387 | | - _LOGGER.info("retriggering authentication flow.") |
388 | | - self.stop(True) |
389 | | - await self._trigger_connection_flow() |
390 | | - |
391 | | - async def _trigger_connection_flow(self): |
392 | | - """Authenticate and start a connection.""" |
393 | | - try: |
394 | | - token = await self._auth_api.authenticate() |
395 | | - except APIException: |
396 | | - _LOGGER.error('error performing sse auth request.') |
397 | | - _LOGGER.debug('stack trace: ', exc_info=True) |
398 | | - await self._feedback_loop.put(Status.PUSH_RETRYABLE_ERROR) |
399 | | - return |
400 | | - |
401 | | - if not token.push_enabled: |
402 | | - await self._feedback_loop.put(Status.PUSH_NONRETRYABLE_ERROR) |
403 | | - return |
404 | | - |
405 | | - _LOGGER.debug("auth token fetched. connecting to streaming.") |
406 | | - self._status_tracker.reset() |
407 | | - self._running = True |
408 | | - if self._sse_client.start(token): |
409 | | - _LOGGER.debug("connected to streaming, scheduling next refresh") |
410 | | - await self._setup_next_token_refresh(token) |
411 | | - self._running = True |
412 | | - |
413 | | - async def _setup_next_token_refresh(self, token): |
414 | | - """ |
415 | | - Schedule next token refresh. |
416 | | -
|
417 | | - :param token: Last fetched token. |
418 | | - :type token: splitio.models.token.Token |
419 | | - """ |
420 | | - if self._next_refresh is not None: |
421 | | - self._next_refresh.cancel() |
422 | | - self._next_refresh = Timer((token.exp - token.iat) - _TOKEN_REFRESH_GRACE_PERIOD, |
423 | | - await self._token_refresh) |
424 | | - self._next_refresh.setName('TokenRefresh') |
425 | | - self._next_refresh.start() |
426 | | - |
427 | | - async def _handle_message(self, event): |
428 | | - """ |
429 | | - Handle incoming update message. |
430 | | -
|
431 | | - :param event: Incoming Update message |
432 | | - :type event: splitio.push.sse.parser.Update |
433 | | - """ |
434 | | - try: |
435 | | - handle = await self._message_handlers[event.message_type] |
436 | | - except KeyError: |
437 | | - _LOGGER.error('no handler for message of type %s', event.message_type) |
438 | | - _LOGGER.debug(str(event), exc_info=True) |
439 | | - return |
440 | | - |
441 | | - await handle(event) |
442 | | - |
443 | | - async def _handle_update(self, event): |
444 | | - """ |
445 | | - Handle incoming update message. |
446 | | -
|
447 | | - :param event: Incoming Update message |
448 | | - :type event: splitio.push.sse.parser.Update |
449 | | - """ |
450 | | - _LOGGER.debug('handling update event: %s', str(event)) |
451 | | - await self._processor.handle(event) |
452 | | - |
453 | | - async def _handle_control(self, event): |
454 | | - """ |
455 | | - Handle incoming control message. |
456 | | -
|
457 | | - :param event: Incoming control message. |
458 | | - :type event: splitio.push.sse.parser.ControlMessage |
459 | | - """ |
460 | | - _LOGGER.debug('handling control event: %s', str(event)) |
461 | | - feedback = self._status_tracker.handle_control_message(event) |
462 | | - if feedback is not None: |
463 | | - await self._feedback_loop.put(feedback) |
464 | | - |
465 | | - async def _handle_occupancy(self, event): |
466 | | - """ |
467 | | - Handle incoming notification message. |
468 | | -
|
469 | | - :param event: Incoming occupancy message. |
470 | | - :type event: splitio.push.sse.parser.Occupancy |
471 | | - """ |
472 | | - _LOGGER.debug('handling occupancy event: %s', str(event)) |
473 | | - feedback = self._status_tracker.handle_occupancy(event) |
474 | | - if feedback is not None: |
475 | | - await self._feedback_loop.put(feedback) |
476 | | - |
477 | | - async def _handle_error(self, event): |
478 | | - """ |
479 | | - Handle incoming error message. |
480 | | -
|
481 | | - :param event: Incoming ably error |
482 | | - :type event: splitio.push.sse.parser.AblyError |
483 | | - """ |
484 | | - _LOGGER.debug('handling ably error event: %s', str(event)) |
485 | | - feedback = self._status_tracker.handle_ably_error(event) |
486 | | - if feedback is not None: |
487 | | - await self._feedback_loop.put(feedback) |
488 | | - |
489 | | - async def _handle_connection_ready(self): |
490 | | - """Handle a successful connection to SSE.""" |
491 | | - await self._feedback_loop.put(Status.PUSH_SUBSYSTEM_UP) |
492 | | - _LOGGER.info('sse initial event received. enabling') |
493 | | - |
494 | | - async def _handle_connection_end(self): |
495 | | - """ |
496 | | - Handle a connection ending. |
497 | | -
|
498 | | - If the connection shutdown was not requested, trigger a restart. |
499 | | - """ |
500 | | - feedback = self._status_tracker.handle_disconnect() |
501 | | - if feedback is not None: |
502 | | - await self._feedback_loop.put(feedback) |
0 commit comments