Skip to content

Commit 1069ad3

Browse files
committed
more tests
1 parent bd9aebd commit 1069ad3

File tree

1 file changed

+235
-1
lines changed

1 file changed

+235
-1
lines changed

tests/integration/test_streaming_e2e.py

Lines changed: 235 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
"""Streaming integration tests."""
22
# pylint:disable=no-self-use,invalid-name,too-many-arguments,too-few-public-methods,line-too-long
3-
# pylint:disable=too-many-statements,too-many-locals
3+
# pylint:disable=too-many-statements,too-many-locals,too-many-lines
44
import threading
55
import time
66
import json
@@ -404,6 +404,18 @@ def test_occupancy_flicker(self):
404404
assert req.path == '/api/splitChanges?since=4'
405405
assert req.headers['authorization'] == 'Bearer some_apikey'
406406

407+
# Split kill
408+
req = split_backend_requests.get()
409+
assert req.method == 'GET'
410+
assert req.path == '/api/splitChanges?since=4'
411+
assert req.headers['authorization'] == 'Bearer some_apikey'
412+
413+
# Iteration until since == till
414+
req = split_backend_requests.get()
415+
assert req.method == 'GET'
416+
assert req.path == '/api/splitChanges?since=5'
417+
assert req.headers['authorization'] == 'Bearer some_apikey'
418+
407419
# Cleanup
408420
destroy_event = threading.Event()
409421
factory.destroy(destroy_event)
@@ -782,6 +794,228 @@ def test_streaming_status_changes(self):
782794
sse_server.stop()
783795
split_backend.stop()
784796

797+
def test_server_closes_connection(self):
798+
"""Test that if the server closes the connection, the whole flow is retried with BO."""
799+
auth_server_response = {
800+
'pushEnabled': True,
801+
'token': ('eyJhbGciOiJIUzI1NiIsImtpZCI6IjVZOU05US45QnJtR0EiLCJ0eXAiOiJKV1QifQ.'
802+
'eyJ4LWFibHktY2FwYWJpbGl0eSI6IntcIk1UWXlNVGN4T1RRNE13PT1fTWpBNE16Y3pO'
803+
'RFUxTWc9PV9zZWdtZW50c1wiOltcInN1YnNjcmliZVwiXSxcIk1UWXlNVGN4T1RRNE13P'
804+
'T1fTWpBNE16Y3pORFUxTWc9PV9zcGxpdHNcIjpbXCJzdWJzY3JpYmVcIl0sXCJjb250cm'
805+
'9sX3ByaVwiOltcInN1YnNjcmliZVwiLFwiY2hhbm5lbC1tZXRhZGF0YTpwdWJsaXNoZXJ'
806+
'zXCJdLFwiY29udHJvbF9zZWNcIjpbXCJzdWJzY3JpYmVcIixcImNoYW5uZWwtbWV0YWRh'
807+
'dGE6cHVibGlzaGVyc1wiXX0iLCJ4LWFibHktY2xpZW50SWQiOiJjbGllbnRJZCIsImV4c'
808+
'CI6MTYwNDEwMDU5MSwiaWF0IjoxNjA0MDk2OTkxfQ.aP9BfR534K6J9h8gfDWg_CQgpz5E'
809+
'vJh17WlOlAKhcD0')
810+
}
811+
812+
split_changes = {
813+
-1: {
814+
'since': -1,
815+
'till': 1,
816+
'splits': [make_simple_split('split1', 1, True, False, 'on', 'user', True)]
817+
},
818+
1: {
819+
'since': 1,
820+
'till': 1,
821+
'splits': []
822+
}
823+
}
824+
825+
segment_changes = {}
826+
split_backend_requests = Queue()
827+
split_backend = SplitMockServer(split_changes, segment_changes, split_backend_requests,
828+
auth_server_response)
829+
sse_requests = Queue()
830+
sse_server = SSEMockServer(sse_requests)
831+
832+
split_backend.start()
833+
sse_server.start()
834+
sse_server.publish(make_initial_event())
835+
sse_server.publish(make_occupancy('control_pri', 2))
836+
sse_server.publish(make_occupancy('control_sec', 2))
837+
838+
kwargs = {
839+
'sdk_api_base_url': 'http://localhost:%d/api' % split_backend.port(),
840+
'events_api_base_url': 'http://localhost:%d/api' % split_backend.port(),
841+
'auth_api_base_url': 'http://localhost:%d/api' % split_backend.port(),
842+
'streaming_api_base_url': 'http://localhost:%d' % sse_server.port(),
843+
'config': {'connectTimeout': 10000, 'featuresRefreshRate': 100,
844+
'segmentsRefreshRate': 100, 'metricsRefreshRate': 100,
845+
'impressionsRefreshRate': 100, 'eventsPushRate': 100}
846+
}
847+
848+
factory = get_factory('some_apikey', **kwargs)
849+
factory.block_until_ready(1)
850+
assert factory.ready
851+
assert factory.client().get_treatment('maldo', 'split1') == 'on'
852+
task = factory._sync_manager._synchronizer._split_tasks.split_task._task # pylint:disable=protected-access
853+
assert not task.running()
854+
855+
time.sleep(1)
856+
split_changes[1] = {
857+
'since': 1,
858+
'till': 2,
859+
'splits': [make_simple_split('split1', 2, True, False, 'off', 'user', False)]
860+
}
861+
split_changes[2] = {'since': 2, 'till': 2, 'splits': []}
862+
sse_server.publish(make_split_change_event(2))
863+
time.sleep(1)
864+
assert factory.client().get_treatment('maldo', 'split1') == 'off'
865+
866+
sse_server.publish(SSEMockServer.GRACEFUL_REQUEST_END)
867+
time.sleep(1)
868+
assert factory.client().get_treatment('maldo', 'split1') == 'off'
869+
assert task.running()
870+
871+
time.sleep(2) # wait for the backoff to expire so streaming gets re-attached
872+
873+
# re-send initial event AND occupancy
874+
sse_server.publish(make_initial_event())
875+
sse_server.publish(make_occupancy('control_pri', 2))
876+
sse_server.publish(make_occupancy('control_sec', 2))
877+
time.sleep(2)
878+
879+
assert not task.running()
880+
split_changes[2] = {
881+
'since': 2,
882+
'till': 3,
883+
'splits': [make_simple_split('split1', 3, True, False, 'off', 'user', True)]
884+
}
885+
split_changes[3] = {'since': 3, 'till': 3, 'splits': []}
886+
sse_server.publish(make_split_change_event(3))
887+
time.sleep(1)
888+
assert factory.client().get_treatment('maldo', 'split1') == 'on'
889+
assert not task.running()
890+
891+
# Validate the SSE requests
892+
sse_request = sse_requests.get()
893+
assert sse_request.method == 'GET'
894+
path, qs = sse_request.path.split('?', 1)
895+
assert path == '/event-stream'
896+
qs = parse_qs(qs)
897+
assert qs['accessToken'][0] == (
898+
'eyJhbGciOiJIUzI1NiIsImtpZCI6IjVZOU05'
899+
'US45QnJtR0EiLCJ0eXAiOiJKV1QifQ.eyJ4LWFibHktY2FwYWJpbGl0eSI6IntcIk1UW'
900+
'XlNVGN4T1RRNE13PT1fTWpBNE16Y3pORFUxTWc9PV9zZWdtZW50c1wiOltcInN1YnNjc'
901+
'mliZVwiXSxcIk1UWXlNVGN4T1RRNE13PT1fTWpBNE16Y3pORFUxTWc9PV9zcGxpdHNcI'
902+
'jpbXCJzdWJzY3JpYmVcIl0sXCJjb250cm9sX3ByaVwiOltcInN1YnNjcmliZVwiLFwiY'
903+
'2hhbm5lbC1tZXRhZGF0YTpwdWJsaXNoZXJzXCJdLFwiY29udHJvbF9zZWNcIjpbXCJzd'
904+
'WJzY3JpYmVcIixcImNoYW5uZWwtbWV0YWRhdGE6cHVibGlzaGVyc1wiXX0iLCJ4LWFib'
905+
'HktY2xpZW50SWQiOiJjbGllbnRJZCIsImV4cCI6MTYwNDEwMDU5MSwiaWF0IjoxNjA0M'
906+
'Dk2OTkxfQ.aP9BfR534K6J9h8gfDWg_CQgpz5EvJh17WlOlAKhcD0'
907+
)
908+
909+
assert set(qs['channels'][0].split(',')) == set(['MTYyMTcxOTQ4Mw==_MjA4MzczNDU1Mg==_splits',
910+
'MTYyMTcxOTQ4Mw==_MjA4MzczNDU1Mg==_segments',
911+
'[?occupancy=metrics.publishers]control_pri',
912+
'[?occupancy=metrics.publishers]control_sec'])
913+
assert qs['v'][0] == '1.1'
914+
915+
sse_request = sse_requests.get()
916+
assert sse_request.method == 'GET'
917+
path, qs = sse_request.path.split('?', 1)
918+
assert path == '/event-stream'
919+
qs = parse_qs(qs)
920+
assert qs['accessToken'][0] == (
921+
'eyJhbGciOiJIUzI1NiIsImtpZCI6IjVZOU05'
922+
'US45QnJtR0EiLCJ0eXAiOiJKV1QifQ.eyJ4LWFibHktY2FwYWJpbGl0eSI6IntcIk1UW'
923+
'XlNVGN4T1RRNE13PT1fTWpBNE16Y3pORFUxTWc9PV9zZWdtZW50c1wiOltcInN1YnNjc'
924+
'mliZVwiXSxcIk1UWXlNVGN4T1RRNE13PT1fTWpBNE16Y3pORFUxTWc9PV9zcGxpdHNcI'
925+
'jpbXCJzdWJzY3JpYmVcIl0sXCJjb250cm9sX3ByaVwiOltcInN1YnNjcmliZVwiLFwiY'
926+
'2hhbm5lbC1tZXRhZGF0YTpwdWJsaXNoZXJzXCJdLFwiY29udHJvbF9zZWNcIjpbXCJzd'
927+
'WJzY3JpYmVcIixcImNoYW5uZWwtbWV0YWRhdGE6cHVibGlzaGVyc1wiXX0iLCJ4LWFib'
928+
'HktY2xpZW50SWQiOiJjbGllbnRJZCIsImV4cCI6MTYwNDEwMDU5MSwiaWF0IjoxNjA0M'
929+
'Dk2OTkxfQ.aP9BfR534K6J9h8gfDWg_CQgpz5EvJh17WlOlAKhcD0'
930+
)
931+
932+
assert set(qs['channels'][0].split(',')) == set(['MTYyMTcxOTQ4Mw==_MjA4MzczNDU1Mg==_splits',
933+
'MTYyMTcxOTQ4Mw==_MjA4MzczNDU1Mg==_segments',
934+
'[?occupancy=metrics.publishers]control_pri',
935+
'[?occupancy=metrics.publishers]control_sec'])
936+
assert qs['v'][0] == '1.1'
937+
938+
# Initial apikey validation
939+
req = split_backend_requests.get()
940+
assert req.method == 'GET'
941+
assert req.path == '/api/segmentChanges/__SOME_INVALID_SEGMENT__?since=-1'
942+
assert req.headers['authorization'] == 'Bearer some_apikey'
943+
944+
# Initial splits fetch
945+
req = split_backend_requests.get()
946+
assert req.method == 'GET'
947+
assert req.path == '/api/splitChanges?since=-1'
948+
assert req.headers['authorization'] == 'Bearer some_apikey'
949+
950+
# Iteration until since == till
951+
req = split_backend_requests.get()
952+
assert req.method == 'GET'
953+
assert req.path == '/api/splitChanges?since=1'
954+
assert req.headers['authorization'] == 'Bearer some_apikey'
955+
956+
# Auth
957+
req = split_backend_requests.get()
958+
assert req.method == 'GET'
959+
assert req.path == '/api/auth'
960+
assert req.headers['authorization'] == 'Bearer some_apikey'
961+
962+
# SyncAll after streaming connected
963+
req = split_backend_requests.get()
964+
assert req.method == 'GET'
965+
assert req.path == '/api/splitChanges?since=1'
966+
assert req.headers['authorization'] == 'Bearer some_apikey'
967+
968+
# Fetch after first notification
969+
req = split_backend_requests.get()
970+
assert req.method == 'GET'
971+
assert req.path == '/api/splitChanges?since=1'
972+
assert req.headers['authorization'] == 'Bearer some_apikey'
973+
974+
# Iteration until since == till
975+
req = split_backend_requests.get()
976+
assert req.method == 'GET'
977+
assert req.path == '/api/splitChanges?since=2'
978+
assert req.headers['authorization'] == 'Bearer some_apikey'
979+
980+
# SyncAll on retryable error handling
981+
req = split_backend_requests.get()
982+
assert req.method == 'GET'
983+
assert req.path == '/api/splitChanges?since=2'
984+
assert req.headers['authorization'] == 'Bearer some_apikey'
985+
986+
# Auth after connection breaks
987+
req = split_backend_requests.get()
988+
assert req.method == 'GET'
989+
assert req.path == '/api/auth'
990+
assert req.headers['authorization'] == 'Bearer some_apikey'
991+
992+
# SyncAll after streaming connected again
993+
req = split_backend_requests.get()
994+
assert req.method == 'GET'
995+
assert req.path == '/api/splitChanges?since=2'
996+
assert req.headers['authorization'] == 'Bearer some_apikey'
997+
998+
# Fetch after new notification
999+
req = split_backend_requests.get()
1000+
assert req.method == 'GET'
1001+
assert req.path == '/api/splitChanges?since=2'
1002+
assert req.headers['authorization'] == 'Bearer some_apikey'
1003+
1004+
# Iteration until since == till
1005+
req = split_backend_requests.get()
1006+
assert req.method == 'GET'
1007+
assert req.path == '/api/splitChanges?since=3'
1008+
assert req.headers['authorization'] == 'Bearer some_apikey'
1009+
1010+
# Cleanup
1011+
destroy_event = threading.Event()
1012+
factory.destroy(destroy_event)
1013+
destroy_event.wait()
1014+
sse_server.publish(sse_server.GRACEFUL_REQUEST_END)
1015+
sse_server.stop()
1016+
split_backend.stop()
1017+
1018+
7851019
def make_split_change_event(change_number):
7861020
"""Make a split change event."""
7871021
return {

0 commit comments

Comments
 (0)