@@ -1015,6 +1015,243 @@ def test_server_closes_connection(self):
10151015 sse_server .stop ()
10161016 split_backend .stop ()
10171017
1018+ def test_ably_errors_handling (self ):
1019+ """Test incoming ably errors and validate its handling."""
1020+ import logging
1021+ logger = logging .getLogger ('splitio' )
1022+ handler = logging .StreamHandler ()
1023+ formatter = logging .Formatter ('%(asctime)s %(name)-12s %(levelname)-8s %(message)s' )
1024+ handler .setFormatter (formatter )
1025+ logger .addHandler (handler )
1026+ logger .setLevel (logging .DEBUG )
1027+ auth_server_response = {
1028+ 'pushEnabled' : True ,
1029+ 'token' : ('eyJhbGciOiJIUzI1NiIsImtpZCI6IjVZOU05US45QnJtR0EiLCJ0eXAiOiJKV1QifQ.'
1030+ 'eyJ4LWFibHktY2FwYWJpbGl0eSI6IntcIk1UWXlNVGN4T1RRNE13PT1fTWpBNE16Y3pO'
1031+ 'RFUxTWc9PV9zZWdtZW50c1wiOltcInN1YnNjcmliZVwiXSxcIk1UWXlNVGN4T1RRNE13P'
1032+ 'T1fTWpBNE16Y3pORFUxTWc9PV9zcGxpdHNcIjpbXCJzdWJzY3JpYmVcIl0sXCJjb250cm'
1033+ '9sX3ByaVwiOltcInN1YnNjcmliZVwiLFwiY2hhbm5lbC1tZXRhZGF0YTpwdWJsaXNoZXJ'
1034+ 'zXCJdLFwiY29udHJvbF9zZWNcIjpbXCJzdWJzY3JpYmVcIixcImNoYW5uZWwtbWV0YWRh'
1035+ 'dGE6cHVibGlzaGVyc1wiXX0iLCJ4LWFibHktY2xpZW50SWQiOiJjbGllbnRJZCIsImV4c'
1036+ 'CI6MTYwNDEwMDU5MSwiaWF0IjoxNjA0MDk2OTkxfQ.aP9BfR534K6J9h8gfDWg_CQgpz5E'
1037+ 'vJh17WlOlAKhcD0' )
1038+ }
1039+
1040+ split_changes = {
1041+ - 1 : {
1042+ 'since' : - 1 ,
1043+ 'till' : 1 ,
1044+ 'splits' : [make_simple_split ('split1' , 1 , True , False , 'off' , 'user' , True )]
1045+ },
1046+ 1 : {'since' : 1 , 'till' : 1 , 'splits' : []}
1047+ }
1048+
1049+ segment_changes = {}
1050+ split_backend_requests = Queue ()
1051+ split_backend = SplitMockServer (split_changes , segment_changes , split_backend_requests ,
1052+ auth_server_response )
1053+ sse_requests = Queue ()
1054+ sse_server = SSEMockServer (sse_requests )
1055+
1056+ split_backend .start ()
1057+ sse_server .start ()
1058+ sse_server .publish (make_initial_event ())
1059+ sse_server .publish (make_occupancy ('control_pri' , 2 ))
1060+ sse_server .publish (make_occupancy ('control_sec' , 2 ))
1061+
1062+ kwargs = {
1063+ 'sdk_api_base_url' : 'http://localhost:%d/api' % split_backend .port (),
1064+ 'events_api_base_url' : 'http://localhost:%d/api' % split_backend .port (),
1065+ 'auth_api_base_url' : 'http://localhost:%d/api' % split_backend .port (),
1066+ 'streaming_api_base_url' : 'http://localhost:%d' % sse_server .port (),
1067+ 'config' : {'connectTimeout' : 10000 , 'featuresRefreshRate' : 10 }
1068+ }
1069+
1070+ factory = get_factory ('some_apikey' , ** kwargs )
1071+ factory .block_until_ready (1 )
1072+ assert factory .ready
1073+ time .sleep (2 )
1074+
1075+ # Get a hook of the task so we can query its status
1076+ task = factory ._sync_manager ._synchronizer ._split_tasks .split_task ._task # pylint:disable=protected-access
1077+ assert not task .running ()
1078+
1079+ assert factory .client ().get_treatment ('maldo' , 'split1' ) == 'on'
1080+
1081+ # Make a change in the BE but don't send the event.
1082+ # We'll send an ignorable error and check it has nothing happened
1083+ split_changes [1 ] = {
1084+ 'since' : 1 ,
1085+ 'till' : 2 ,
1086+ 'splits' : [make_simple_split ('split1' , 2 , True , False , 'off' , 'user' , False )]
1087+ }
1088+ split_changes [2 ] = {'since' : 2 , 'till' : 2 , 'splits' : []}
1089+
1090+ sse_server .publish (make_ably_error_event (60000 , 600 ))
1091+ time .sleep (1 )
1092+ assert factory .client ().get_treatment ('maldo' , 'split1' ) == 'on'
1093+ assert not task .running ()
1094+
1095+ sse_server .publish (make_ably_error_event (40145 , 401 ))
1096+ sse_server .publish (sse_server .GRACEFUL_REQUEST_END )
1097+ time .sleep (3 )
1098+ assert task .running ()
1099+ assert factory .client ().get_treatment ('maldo' , 'split1' ) == 'off'
1100+
1101+ # Re-publish initial events so that the retry succeeds
1102+ sse_server .publish (make_initial_event ())
1103+ sse_server .publish (make_occupancy ('control_pri' , 2 ))
1104+ sse_server .publish (make_occupancy ('control_sec' , 2 ))
1105+ time .sleep (3 )
1106+ assert not task .running ()
1107+
1108+ # Assert streaming is working properly
1109+ split_changes [2 ] = {
1110+ 'since' : 2 ,
1111+ 'till' : 3 ,
1112+ 'splits' : [make_simple_split ('split1' , 3 , True , False , 'off' , 'user' , True )]
1113+ }
1114+ split_changes [3 ] = {'since' : 3 , 'till' : 3 , 'splits' : []}
1115+ sse_server .publish (make_split_change_event (3 ))
1116+ time .sleep (2 )
1117+ assert factory .client ().get_treatment ('maldo' , 'split1' ) == 'on'
1118+ assert not task .running ()
1119+
1120+ # Send a non-retryable ably error
1121+ sse_server .publish (make_ably_error_event (40200 , 402 ))
1122+ sse_server .publish (sse_server .GRACEFUL_REQUEST_END )
1123+ time .sleep (3 )
1124+
1125+ # Assert sync-task is running and the streaming status handler thread is over
1126+ assert task .running ()
1127+ assert 'PushStatusHandler' not in [t .name for t in threading .enumerate ()]
1128+
1129+ # Validate the SSE requests
1130+ sse_request = sse_requests .get ()
1131+ assert sse_request .method == 'GET'
1132+ path , qs = sse_request .path .split ('?' , 1 )
1133+ assert path == '/event-stream'
1134+ qs = parse_qs (qs )
1135+ assert qs ['accessToken' ][0 ] == (
1136+ 'eyJhbGciOiJIUzI1NiIsImtpZCI6IjVZOU05'
1137+ 'US45QnJtR0EiLCJ0eXAiOiJKV1QifQ.eyJ4LWFibHktY2FwYWJpbGl0eSI6IntcIk1UW'
1138+ 'XlNVGN4T1RRNE13PT1fTWpBNE16Y3pORFUxTWc9PV9zZWdtZW50c1wiOltcInN1YnNjc'
1139+ 'mliZVwiXSxcIk1UWXlNVGN4T1RRNE13PT1fTWpBNE16Y3pORFUxTWc9PV9zcGxpdHNcI'
1140+ 'jpbXCJzdWJzY3JpYmVcIl0sXCJjb250cm9sX3ByaVwiOltcInN1YnNjcmliZVwiLFwiY'
1141+ '2hhbm5lbC1tZXRhZGF0YTpwdWJsaXNoZXJzXCJdLFwiY29udHJvbF9zZWNcIjpbXCJzd'
1142+ 'WJzY3JpYmVcIixcImNoYW5uZWwtbWV0YWRhdGE6cHVibGlzaGVyc1wiXX0iLCJ4LWFib'
1143+ 'HktY2xpZW50SWQiOiJjbGllbnRJZCIsImV4cCI6MTYwNDEwMDU5MSwiaWF0IjoxNjA0M'
1144+ 'Dk2OTkxfQ.aP9BfR534K6J9h8gfDWg_CQgpz5EvJh17WlOlAKhcD0'
1145+ )
1146+
1147+ assert set (qs ['channels' ][0 ].split (',' )) == set (['MTYyMTcxOTQ4Mw==_MjA4MzczNDU1Mg==_splits' ,
1148+ 'MTYyMTcxOTQ4Mw==_MjA4MzczNDU1Mg==_segments' ,
1149+ '[?occupancy=metrics.publishers]control_pri' ,
1150+ '[?occupancy=metrics.publishers]control_sec' ])
1151+ assert qs ['v' ][0 ] == '1.1'
1152+
1153+ assert sse_request .method == 'GET'
1154+ path , qs = sse_request .path .split ('?' , 1 )
1155+ assert path == '/event-stream'
1156+ qs = parse_qs (qs )
1157+ assert qs ['accessToken' ][0 ] == (
1158+ 'eyJhbGciOiJIUzI1NiIsImtpZCI6IjVZOU05'
1159+ 'US45QnJtR0EiLCJ0eXAiOiJKV1QifQ.eyJ4LWFibHktY2FwYWJpbGl0eSI6IntcIk1UW'
1160+ 'XlNVGN4T1RRNE13PT1fTWpBNE16Y3pORFUxTWc9PV9zZWdtZW50c1wiOltcInN1YnNjc'
1161+ 'mliZVwiXSxcIk1UWXlNVGN4T1RRNE13PT1fTWpBNE16Y3pORFUxTWc9PV9zcGxpdHNcI'
1162+ 'jpbXCJzdWJzY3JpYmVcIl0sXCJjb250cm9sX3ByaVwiOltcInN1YnNjcmliZVwiLFwiY'
1163+ '2hhbm5lbC1tZXRhZGF0YTpwdWJsaXNoZXJzXCJdLFwiY29udHJvbF9zZWNcIjpbXCJzd'
1164+ 'WJzY3JpYmVcIixcImNoYW5uZWwtbWV0YWRhdGE6cHVibGlzaGVyc1wiXX0iLCJ4LWFib'
1165+ 'HktY2xpZW50SWQiOiJjbGllbnRJZCIsImV4cCI6MTYwNDEwMDU5MSwiaWF0IjoxNjA0M'
1166+ 'Dk2OTkxfQ.aP9BfR534K6J9h8gfDWg_CQgpz5EvJh17WlOlAKhcD0'
1167+ )
1168+
1169+ assert set (qs ['channels' ][0 ].split (',' )) == set (['MTYyMTcxOTQ4Mw==_MjA4MzczNDU1Mg==_splits' ,
1170+ 'MTYyMTcxOTQ4Mw==_MjA4MzczNDU1Mg==_segments' ,
1171+ '[?occupancy=metrics.publishers]control_pri' ,
1172+ '[?occupancy=metrics.publishers]control_sec' ])
1173+ assert qs ['v' ][0 ] == '1.1'
1174+
1175+ # Initial apikey validation
1176+ req = split_backend_requests .get ()
1177+ assert req .method == 'GET'
1178+ assert req .path == '/api/segmentChanges/__SOME_INVALID_SEGMENT__?since=-1'
1179+ assert req .headers ['authorization' ] == 'Bearer some_apikey'
1180+
1181+ # Initial splits fetch
1182+ req = split_backend_requests .get ()
1183+ assert req .method == 'GET'
1184+ assert req .path == '/api/splitChanges?since=-1'
1185+ assert req .headers ['authorization' ] == 'Bearer some_apikey'
1186+
1187+ # Iteration until since == till
1188+ req = split_backend_requests .get ()
1189+ assert req .method == 'GET'
1190+ assert req .path == '/api/splitChanges?since=1'
1191+ assert req .headers ['authorization' ] == 'Bearer some_apikey'
1192+
1193+ # Auth
1194+ req = split_backend_requests .get ()
1195+ assert req .method == 'GET'
1196+ assert req .path == '/api/auth'
1197+ assert req .headers ['authorization' ] == 'Bearer some_apikey'
1198+
1199+ # SyncAll after streaming connected
1200+ req = split_backend_requests .get ()
1201+ assert req .method == 'GET'
1202+ assert req .path == '/api/splitChanges?since=1'
1203+ assert req .headers ['authorization' ] == 'Bearer some_apikey'
1204+
1205+ # SyncAll retriable error
1206+ req = split_backend_requests .get ()
1207+ assert req .method == 'GET'
1208+ assert req .path == '/api/splitChanges?since=1'
1209+ assert req .headers ['authorization' ] == 'Bearer some_apikey'
1210+
1211+ # Iteration until since == till
1212+ req = split_backend_requests .get ()
1213+ assert req .method == 'GET'
1214+ assert req .path == '/api/splitChanges?since=2'
1215+ assert req .headers ['authorization' ] == 'Bearer some_apikey'
1216+
1217+ # Auth again
1218+ req = split_backend_requests .get ()
1219+ assert req .method == 'GET'
1220+ assert req .path == '/api/auth'
1221+ assert req .headers ['authorization' ] == 'Bearer some_apikey'
1222+
1223+ # SyncAll after push is up
1224+ req = split_backend_requests .get ()
1225+ assert req .method == 'GET'
1226+ assert req .path == '/api/splitChanges?since=2'
1227+ assert req .headers ['authorization' ] == 'Bearer some_apikey'
1228+
1229+ # Fetch after notification
1230+ req = split_backend_requests .get ()
1231+ assert req .method == 'GET'
1232+ assert req .path == '/api/splitChanges?since=2'
1233+ assert req .headers ['authorization' ] == 'Bearer some_apikey'
1234+
1235+ # Iteration until since == till
1236+ req = split_backend_requests .get ()
1237+ assert req .method == 'GET'
1238+ assert req .path == '/api/splitChanges?since=3'
1239+ assert req .headers ['authorization' ] == 'Bearer some_apikey'
1240+
1241+ # SyncAll after non recoverable ably error
1242+ req = split_backend_requests .get ()
1243+ assert req .method == 'GET'
1244+ assert req .path == '/api/splitChanges?since=3'
1245+ assert req .headers ['authorization' ] == 'Bearer some_apikey'
1246+
1247+ # Cleanup
1248+ destroy_event = threading .Event ()
1249+ factory .destroy (destroy_event )
1250+ destroy_event .wait ()
1251+ sse_server .publish (sse_server .GRACEFUL_REQUEST_END )
1252+ sse_server .stop ()
1253+ split_backend .stop ()
1254+
10181255
10191256def make_split_change_event (change_number ):
10201257 """Make a split change event."""
@@ -1105,6 +1342,18 @@ def make_control_event(control_type, timestamp):
11051342 })
11061343 }
11071344
1345+ def make_ably_error_event (code , status ):
1346+ """Make a control event."""
1347+ return {
1348+ 'event' : 'error' ,
1349+ 'data' : json .dumps ({
1350+ 'message' :'Invalid accessToken in request: sarasa' ,
1351+ 'code' : code ,
1352+ 'statusCode' : status ,
1353+ 'href' :"https://help.ably.io/error/%d" % code
1354+ })
1355+ }
1356+
11081357def make_simple_split (name , cn , active , killed , default_treatment , tt , on ):
11091358 """Make a simple split."""
11101359 return {
0 commit comments