Skip to content

Commit bd9aebd

Browse files
committed
more tests
1 parent dac5ecc commit bd9aebd

File tree

5 files changed

+319
-64
lines changed

5 files changed

+319
-64
lines changed

splitio/push/manager.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -141,9 +141,7 @@ def _trigger_connection_flow(self):
141141
return
142142

143143
self._status_tracker.reset()
144-
res = self._sse_client.start(token)
145-
if res:
146-
print(res)
144+
if self._sse_client.start(token):
147145
self._setup_next_token_refresh(token)
148146
self._running = True
149147

splitio/push/parser.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,9 @@ class UpdateType(Enum):
3737
class ControlType(Enum):
3838
"""Control type enumeration."""
3939

40-
STREAMING_ENABLED = 0
41-
STREAMING_PAUSED = 1
42-
STREAMING_DISABLED = 2
40+
STREAMING_ENABLED = 'STREAMING_ENABLED'
41+
STREAMING_PAUSED = 'STREAMING_PAUSED'
42+
STREAMING_DISABLED = 'STREAMING_DISABLED'
4343

4444

4545
TAG_OCCUPANCY = '[meta]occupancy'

splitio/push/status_tracker.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
"""NotificationManagerKeeper implementation."""
2-
from collections import defaultdict
32
from enum import Enum
43
import logging
54
import six
@@ -90,7 +89,7 @@ def handle_control_message(self, event):
9089
:type event: splitio.push.parser.ControlMessage
9190
"""
9291
# we don't care about control messages if a disconnection is expected
93-
if self._shutdown_expected:
92+
if self._shutdown_expected:
9493
return None
9594

9695
if self._timestamps.control > event.timestamp:

splitio/sync/manager.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -103,11 +103,14 @@ def _streaming_feedback_handler(self):
103103
self._push.stop(True)
104104
self._synchronizer.sync_all()
105105
self._synchronizer.start_periodic_fetching()
106-
time.sleep(self._backoff.get())
106+
how_long = self._backoff.get()
107+
_LOGGER.info('error in streaming. restarting flow in %d seconds', how_long)
108+
time.sleep(how_long)
107109
self._push.start()
108-
_LOGGER.info('error in streaming. restarting flow')
109110
elif status == Status.PUSH_NONRETRYABLE_ERROR:
110-
self._synchronizer.start_periodic_fetching()
111+
self._push.update_workers_status(False)
111112
self._push.stop(False)
113+
self._synchronizer.sync_all()
114+
self._synchronizer.start_periodic_fetching()
112115
_LOGGER.info('non-recoverable error in streaming. switching to polling.')
113116
return

0 commit comments

Comments
 (0)