@@ -1117,6 +1117,189 @@ async def test_basic_auth_without_header_fails(
11171117 assert error_response ["error" ] == "unauthorized_client"
11181118 assert "Missing or invalid Basic authentication" in error_response ["error_description" ]
11191119
1120+ @pytest .mark .anyio
1121+ async def test_basic_auth_invalid_base64_fails (
1122+ self , test_client : httpx .AsyncClient , mock_oauth_provider : MockOAuthProvider , pkce_challenge : dict [str , str ]
1123+ ):
1124+ """Test that invalid base64 in Basic auth header fails."""
1125+ client_metadata = {
1126+ "redirect_uris" : ["https://client.example.com/callback" ],
1127+ "client_name" : "Basic Auth Client" ,
1128+ "token_endpoint_auth_method" : "client_secret_basic" ,
1129+ "grant_types" : ["authorization_code" , "refresh_token" ],
1130+ }
1131+
1132+ response = await test_client .post ("/register" , json = client_metadata )
1133+ assert response .status_code == 201
1134+ client_info = response .json ()
1135+
1136+ auth_code = f"code_{ int (time .time ())} "
1137+ mock_oauth_provider .auth_codes [auth_code ] = AuthorizationCode (
1138+ code = auth_code ,
1139+ client_id = client_info ["client_id" ],
1140+ code_challenge = pkce_challenge ["code_challenge" ],
1141+ redirect_uri = AnyUrl ("https://client.example.com/callback" ),
1142+ redirect_uri_provided_explicitly = True ,
1143+ scopes = ["read" , "write" ],
1144+ expires_at = time .time () + 600 ,
1145+ )
1146+
1147+ # Send invalid base64
1148+ response = await test_client .post (
1149+ "/token" ,
1150+ headers = {"Authorization" : "Basic !!!invalid-base64!!!" },
1151+ data = {
1152+ "grant_type" : "authorization_code" ,
1153+ "client_id" : client_info ["client_id" ],
1154+ "code" : auth_code ,
1155+ "code_verifier" : pkce_challenge ["code_verifier" ],
1156+ "redirect_uri" : "https://client.example.com/callback" ,
1157+ },
1158+ )
1159+ assert response .status_code == 401
1160+ error_response = response .json ()
1161+ assert error_response ["error" ] == "unauthorized_client"
1162+ assert "Invalid Basic authentication header" in error_response ["error_description" ]
1163+
1164+ @pytest .mark .anyio
1165+ async def test_basic_auth_no_colon_fails (
1166+ self , test_client : httpx .AsyncClient , mock_oauth_provider : MockOAuthProvider , pkce_challenge : dict [str , str ]
1167+ ):
1168+ """Test that Basic auth without colon separator fails."""
1169+ client_metadata = {
1170+ "redirect_uris" : ["https://client.example.com/callback" ],
1171+ "client_name" : "Basic Auth Client" ,
1172+ "token_endpoint_auth_method" : "client_secret_basic" ,
1173+ "grant_types" : ["authorization_code" , "refresh_token" ],
1174+ }
1175+
1176+ response = await test_client .post ("/register" , json = client_metadata )
1177+ assert response .status_code == 201
1178+ client_info = response .json ()
1179+
1180+ auth_code = f"code_{ int (time .time ())} "
1181+ mock_oauth_provider .auth_codes [auth_code ] = AuthorizationCode (
1182+ code = auth_code ,
1183+ client_id = client_info ["client_id" ],
1184+ code_challenge = pkce_challenge ["code_challenge" ],
1185+ redirect_uri = AnyUrl ("https://client.example.com/callback" ),
1186+ redirect_uri_provided_explicitly = True ,
1187+ scopes = ["read" , "write" ],
1188+ expires_at = time .time () + 600 ,
1189+ )
1190+
1191+ # Send base64 without colon (invalid format)
1192+ import base64
1193+
1194+ invalid_creds = base64 .b64encode (b"no-colon-here" ).decode ()
1195+ response = await test_client .post (
1196+ "/token" ,
1197+ headers = {"Authorization" : f"Basic { invalid_creds } " },
1198+ data = {
1199+ "grant_type" : "authorization_code" ,
1200+ "client_id" : client_info ["client_id" ],
1201+ "code" : auth_code ,
1202+ "code_verifier" : pkce_challenge ["code_verifier" ],
1203+ "redirect_uri" : "https://client.example.com/callback" ,
1204+ },
1205+ )
1206+ assert response .status_code == 401
1207+ error_response = response .json ()
1208+ assert error_response ["error" ] == "unauthorized_client"
1209+ assert "Invalid Basic authentication header" in error_response ["error_description" ]
1210+
1211+ @pytest .mark .anyio
1212+ async def test_basic_auth_client_id_mismatch_fails (
1213+ self , test_client : httpx .AsyncClient , mock_oauth_provider : MockOAuthProvider , pkce_challenge : dict [str , str ]
1214+ ):
1215+ """Test that client_id mismatch between body and Basic auth fails."""
1216+ client_metadata = {
1217+ "redirect_uris" : ["https://client.example.com/callback" ],
1218+ "client_name" : "Basic Auth Client" ,
1219+ "token_endpoint_auth_method" : "client_secret_basic" ,
1220+ "grant_types" : ["authorization_code" , "refresh_token" ],
1221+ }
1222+
1223+ response = await test_client .post ("/register" , json = client_metadata )
1224+ assert response .status_code == 201
1225+ client_info = response .json ()
1226+
1227+ auth_code = f"code_{ int (time .time ())} "
1228+ mock_oauth_provider .auth_codes [auth_code ] = AuthorizationCode (
1229+ code = auth_code ,
1230+ client_id = client_info ["client_id" ],
1231+ code_challenge = pkce_challenge ["code_challenge" ],
1232+ redirect_uri = AnyUrl ("https://client.example.com/callback" ),
1233+ redirect_uri_provided_explicitly = True ,
1234+ scopes = ["read" , "write" ],
1235+ expires_at = time .time () + 600 ,
1236+ )
1237+
1238+ # Send different client_id in Basic auth header
1239+ import base64
1240+
1241+ wrong_creds = base64 .b64encode (f"wrong-client-id:{ client_info ['client_secret' ]} " .encode ()).decode ()
1242+ response = await test_client .post (
1243+ "/token" ,
1244+ headers = {"Authorization" : f"Basic { wrong_creds } " },
1245+ data = {
1246+ "grant_type" : "authorization_code" ,
1247+ "client_id" : client_info ["client_id" ], # Correct client_id in body
1248+ "code" : auth_code ,
1249+ "code_verifier" : pkce_challenge ["code_verifier" ],
1250+ "redirect_uri" : "https://client.example.com/callback" ,
1251+ },
1252+ )
1253+ assert response .status_code == 401
1254+ error_response = response .json ()
1255+ assert error_response ["error" ] == "unauthorized_client"
1256+ assert "Client ID mismatch" in error_response ["error_description" ]
1257+
1258+ @pytest .mark .anyio
1259+ async def test_none_auth_method_public_client (
1260+ self , test_client : httpx .AsyncClient , mock_oauth_provider : MockOAuthProvider , pkce_challenge : dict [str , str ]
1261+ ):
1262+ """Test that 'none' authentication method works for public clients."""
1263+ client_metadata = {
1264+ "redirect_uris" : ["https://client.example.com/callback" ],
1265+ "client_name" : "Public Client" ,
1266+ "token_endpoint_auth_method" : "none" ,
1267+ "grant_types" : ["authorization_code" , "refresh_token" ],
1268+ }
1269+
1270+ response = await test_client .post ("/register" , json = client_metadata )
1271+ assert response .status_code == 201
1272+ client_info = response .json ()
1273+ assert client_info ["token_endpoint_auth_method" ] == "none"
1274+ # Public clients should not have a client_secret
1275+ assert "client_secret" not in client_info or client_info .get ("client_secret" ) is None
1276+
1277+ auth_code = f"code_{ int (time .time ())} "
1278+ mock_oauth_provider .auth_codes [auth_code ] = AuthorizationCode (
1279+ code = auth_code ,
1280+ client_id = client_info ["client_id" ],
1281+ code_challenge = pkce_challenge ["code_challenge" ],
1282+ redirect_uri = AnyUrl ("https://client.example.com/callback" ),
1283+ redirect_uri_provided_explicitly = True ,
1284+ scopes = ["read" , "write" ],
1285+ expires_at = time .time () + 600 ,
1286+ )
1287+
1288+ # Token request without any client secret
1289+ response = await test_client .post (
1290+ "/token" ,
1291+ data = {
1292+ "grant_type" : "authorization_code" ,
1293+ "client_id" : client_info ["client_id" ],
1294+ "code" : auth_code ,
1295+ "code_verifier" : pkce_challenge ["code_verifier" ],
1296+ "redirect_uri" : "https://client.example.com/callback" ,
1297+ },
1298+ )
1299+ assert response .status_code == 200
1300+ token_response = response .json ()
1301+ assert "access_token" in token_response
1302+
11201303
11211304class TestAuthorizeEndpointErrors :
11221305 """Test error handling in the OAuth authorization endpoint."""
0 commit comments