55# pyright: reportUnknownLambdaType=false
66from collections .abc import Callable
77from dataclasses import dataclass
8+ import threading
9+ import time
810from typing import Annotated , Any , Final , TypedDict
911
1012import annotated_types
13+ import anyio
1114import pytest
1215from dirty_equals import IsPartialDict
1316from pydantic import BaseModel , Field
@@ -1202,3 +1205,118 @@ def func_with_metadata() -> Annotated[int, Field(gt=1)]: ... # pragma: no branc
12021205
12031206 assert meta .output_schema is not None
12041207 assert meta .output_schema ["properties" ]["result" ] == {"exclusiveMinimum" : 1 , "title" : "Result" , "type" : "integer" }
1208+
1209+ @pytest .mark .anyio
1210+ async def test_sync_function_runs_in_worker_thread ():
1211+ """
1212+ Ensure synchronous tools are executed in a worker thread via anyio.to_thread.run_sync,
1213+ instead of blocking the event loop thread.
1214+ """
1215+
1216+ def blocking_sync (delay : float ) -> int : # pragma: no cover
1217+ # Sleep to simulate a blocking sync tool
1218+ time .sleep (delay )
1219+ # Return the thread ID we are running on
1220+ return threading .get_ident ()
1221+
1222+ meta = func_metadata (blocking_sync )
1223+
1224+ # This is the event loop thread ID (where the test itself is running)
1225+ loop_thread_id = threading .get_ident ()
1226+
1227+ # Call the sync function through call_fn_with_arg_validation
1228+ result_thread_id = await meta .call_fn_with_arg_validation (
1229+ blocking_sync ,
1230+ fn_is_async = False ,
1231+ arguments_to_validate = {"delay" : 0.01 },
1232+ arguments_to_pass_directly = None ,
1233+ )
1234+
1235+ # The tool should have executed in a different worker thread
1236+ assert result_thread_id != loop_thread_id
1237+
1238+
1239+ @pytest .mark .anyio
1240+ async def test_sync_blocking_tool_does_not_block_event_loop ():
1241+ """
1242+ A blocking synchronous tool (time.sleep) should not prevent other tasks
1243+ on the event loop from running, because it is offloaded to a worker thread.
1244+ """
1245+
1246+ def blocking_tool (delay : float ) -> str : # pragma: no cover
1247+ time .sleep (delay )
1248+ return "done"
1249+
1250+ meta = func_metadata (blocking_tool )
1251+
1252+ flag = {"ran" : False }
1253+
1254+ async def run_tool ():
1255+ result = await meta .call_fn_with_arg_validation (
1256+ blocking_tool ,
1257+ fn_is_async = False ,
1258+ arguments_to_validate = {"delay" : 0.2 },
1259+ arguments_to_pass_directly = None ,
1260+ )
1261+ assert result == "done"
1262+
1263+ async def concurrent_task ():
1264+ # If the event loop is *not* blocked, this will run while the tool sleeps
1265+ await anyio .sleep (0.05 )
1266+ flag ["ran" ] = True
1267+
1268+ async with anyio .create_task_group () as tg :
1269+ tg .start_soon (run_tool )
1270+ tg .start_soon (concurrent_task )
1271+
1272+ # If the sync tool had blocked the event loop, concurrent_task would never
1273+ # have executed and flag["ran"] would still be False.
1274+ assert flag ["ran" ] is True
1275+
1276+ @pytest .mark .anyio
1277+ async def test_sync_tool_does_not_block_event_loop () -> None :
1278+ """
1279+ Regression test: sync tools must not run inline on the event loop.
1280+
1281+ If sync tools run inline, this test will fail because `fast_probe`
1282+ won't get scheduled until after `time.sleep`.
1283+ """
1284+
1285+ def slow_sync (x : int ) -> int :
1286+ time .sleep (0.30 ) # intentionally blocks if run on event loop
1287+ return x + 1
1288+
1289+ md = func_metadata (slow_sync )
1290+
1291+ start = anyio .current_time ()
1292+ fast_probe_elapsed : float | None = None
1293+ slow_result : int | None = None
1294+
1295+ async def run_slow () -> None :
1296+ nonlocal slow_result
1297+ # call_fn_with_arg_validation is the execution path used for tools
1298+ slow_result = await md .call_fn_with_arg_validation (
1299+ fn = slow_sync ,
1300+ fn_is_async = False ,
1301+ arguments_to_validate = {"x" : 1 },
1302+ arguments_to_pass_directly = None ,
1303+ )
1304+
1305+ async def fast_probe () -> None :
1306+ nonlocal fast_probe_elapsed
1307+ # If event loop is not blocked, this should run "immediately"
1308+ await anyio .sleep (0 )
1309+ fast_probe_elapsed = anyio .current_time () - start
1310+
1311+ # Keep the whole test bounded even if something regresses badly
1312+ with anyio .fail_after (2 ):
1313+ async with anyio .create_task_group () as tg :
1314+ tg .start_soon (run_slow )
1315+ tg .start_soon (fast_probe )
1316+
1317+ assert slow_result == 2
1318+
1319+ assert fast_probe_elapsed is not None
1320+ # If slow_sync blocks the loop, this will be ~0.30s and fail.
1321+ # If slow_sync is offloaded, this should typically be a few ms.
1322+ assert fast_probe_elapsed < 0.10
0 commit comments