File tree Expand file tree Collapse file tree 3 files changed +6
-14
lines changed
Expand file tree Collapse file tree 3 files changed +6
-14
lines changed Original file line number Diff line number Diff line change @@ -99,8 +99,10 @@ def _streaming_feedback_handler(self):
9999 self ._synchronizer .start_periodic_fetching ()
100100 _LOGGER .info ('streaming temporarily down. starting periodic fetching' )
101101 elif status == Status .PUSH_RETRYABLE_ERROR :
102- self ._synchronizer . start_periodic_fetching ( )
102+ self ._push . update_workers_status ( False )
103103 self ._push .stop (True )
104+ self ._synchronizer .sync_all ()
105+ self ._synchronizer .start_periodic_fetching ()
104106 time .sleep (self ._backoff .get ())
105107 self ._push .start ()
106108 _LOGGER .info ('error in streaming. restarting flow' )
Original file line number Diff line number Diff line change @@ -94,10 +94,8 @@ def _execution_wrapper(self):
9494 elif msg == __TASK_FORCE_RUN__ :
9595 _LOGGER .debug ("Force execution signal received. Running now" )
9696 if not _safe_run (self ._main ):
97- _LOGGER .error (
98- "An error occurred when executing the task. "
99- "Retrying after perio expires"
100- )
97+ _LOGGER .error ("An error occurred when executing the task. "
98+ "Retrying after perio expires" )
10199 continue
102100 except queue .Empty :
103101 # If no message was received, the timeout has expired
Original file line number Diff line number Diff line change 1313except ImportError :
1414 from urlparse import parse_qs
1515
16- import pytest
1716
18- @pytest .mark .skip
1917class StreamingIntegrationTests (object ):
2018 """Test streaming operation and failover."""
2119
@@ -551,13 +549,7 @@ def test_start_without_occupancy(self): # pylint: disable=too-many-locals
551549 assert req .path == '/api/splitChanges?since=1'
552550 assert req .headers ['authorization' ] == 'Bearer some_apikey'
553551
554- # Fetch after first notification
555- req = split_backend_requests .get ()
556- assert req .method == 'GET'
557- assert req .path == '/api/splitChanges?since=1'
558- assert req .headers ['authorization' ] == 'Bearer some_apikey'
559-
560- # Iteration until since == till
552+ # Second iteration of previous syncAll
561553 req = split_backend_requests .get ()
562554 assert req .method == 'GET'
563555 assert req .path == '/api/splitChanges?since=2'
You can’t perform that action at this time.
0 commit comments