From 646a0d57cbdc092d5dd2bbd945e399c762162bd3 Mon Sep 17 00:00:00 2001 From: Marcelo Trylesinski Date: Mon, 21 Jul 2025 10:43:04 +0200 Subject: [PATCH 01/13] Rename `streamablehttp_client` to `streamable_http_client` --- README.md | 8 +-- .../mcp_simple_auth_client/main.py | 4 +- examples/snippets/clients/oauth_client.py | 4 +- examples/snippets/clients/streamable_basic.py | 4 +- src/mcp/client/session_group.py | 6 +- src/mcp/client/streamable_http.py | 23 ++++++- tests/client/test_session_group.py | 4 +- tests/server/fastmcp/test_integration.py | 7 +-- tests/shared/test_streamable_http.py | 62 +++++++++---------- 9 files changed, 69 insertions(+), 53 deletions(-) diff --git a/README.md b/README.md index a439184640..c4bf0141a7 100644 --- a/README.md +++ b/README.md @@ -2241,12 +2241,12 @@ Run from the repository root: import asyncio from mcp import ClientSession -from mcp.client.streamable_http import streamablehttp_client +from mcp.client.streamable_http import streamable_http_client async def main(): # Connect to a streamable HTTP server - async with streamablehttp_client("http://localhost:8000/mcp") as ( + async with streamable_http_client("http://localhost:8000/mcp") as ( read_stream, write_stream, _, @@ -2374,7 +2374,7 @@ from pydantic import AnyUrl from mcp import ClientSession from mcp.client.auth import OAuthClientProvider, TokenStorage -from mcp.client.streamable_http import streamablehttp_client +from mcp.client.streamable_http import streamable_http_client from mcp.shared.auth import OAuthClientInformationFull, OAuthClientMetadata, OAuthToken @@ -2428,7 +2428,7 @@ async def main(): callback_handler=handle_callback, ) - async with streamablehttp_client("http://localhost:8001/mcp", auth=oauth_auth) as (read, write, _): + async with streamable_http_client("http://localhost:8001/mcp", auth=oauth_auth) as (read, write, _): async with ClientSession(read, write) as session: await session.initialize() diff --git a/examples/clients/simple-auth-client/mcp_simple_auth_client/main.py b/examples/clients/simple-auth-client/mcp_simple_auth_client/main.py index 38dc5a9167..2599dc932f 100644 --- a/examples/clients/simple-auth-client/mcp_simple_auth_client/main.py +++ b/examples/clients/simple-auth-client/mcp_simple_auth_client/main.py @@ -19,7 +19,7 @@ from mcp.client.auth import OAuthClientProvider, TokenStorage from mcp.client.session import ClientSession from mcp.client.sse import sse_client -from mcp.client.streamable_http import streamablehttp_client +from mcp.client.streamable_http import streamable_http_client from mcp.shared.auth import OAuthClientInformationFull, OAuthClientMetadata, OAuthToken @@ -212,7 +212,7 @@ async def _default_redirect_handler(authorization_url: str) -> None: await self._run_session(read_stream, write_stream, None) else: print("šŸ“” Opening StreamableHTTP transport connection with auth...") - async with streamablehttp_client( + async with streamable_http_client( url=self.server_url, auth=oauth_auth, timeout=timedelta(seconds=60), diff --git a/examples/snippets/clients/oauth_client.py b/examples/snippets/clients/oauth_client.py index 45026590a5..38bf7f95f3 100644 --- a/examples/snippets/clients/oauth_client.py +++ b/examples/snippets/clients/oauth_client.py @@ -14,7 +14,7 @@ from mcp import ClientSession from mcp.client.auth import OAuthClientProvider, TokenStorage -from mcp.client.streamable_http import streamablehttp_client +from mcp.client.streamable_http import streamable_http_client from mcp.shared.auth import OAuthClientInformationFull, OAuthClientMetadata, OAuthToken @@ -68,7 +68,7 @@ async def main(): callback_handler=handle_callback, ) - async with streamablehttp_client("http://localhost:8001/mcp", auth=oauth_auth) as (read, write, _): + async with streamable_http_client("http://localhost:8001/mcp", auth=oauth_auth) as (read, write, _): async with ClientSession(read, write) as session: await session.initialize() diff --git a/examples/snippets/clients/streamable_basic.py b/examples/snippets/clients/streamable_basic.py index 108439613e..071ea81553 100644 --- a/examples/snippets/clients/streamable_basic.py +++ b/examples/snippets/clients/streamable_basic.py @@ -6,12 +6,12 @@ import asyncio from mcp import ClientSession -from mcp.client.streamable_http import streamablehttp_client +from mcp.client.streamable_http import streamable_http_client async def main(): # Connect to a streamable HTTP server - async with streamablehttp_client("http://localhost:8000/mcp") as ( + async with streamable_http_client("http://localhost:8000/mcp") as ( read_stream, write_stream, _, diff --git a/src/mcp/client/session_group.py b/src/mcp/client/session_group.py index da45923e2a..8b86cf82bd 100644 --- a/src/mcp/client/session_group.py +++ b/src/mcp/client/session_group.py @@ -25,7 +25,7 @@ from mcp.client.session import ElicitationFnT, ListRootsFnT, LoggingFnT, MessageHandlerFnT, SamplingFnT from mcp.client.sse import sse_client from mcp.client.stdio import StdioServerParameters -from mcp.client.streamable_http import streamablehttp_client +from mcp.client.streamable_http import streamable_http_client from mcp.shared.exceptions import McpError from mcp.shared.session import ProgressFnT @@ -47,7 +47,7 @@ class SseServerParameters(BaseModel): class StreamableHttpParameters(BaseModel): - """Parameters for intializing a streamablehttp_client.""" + """Parameters for intializing a streamable_http_client.""" # The endpoint URL. url: str @@ -309,7 +309,7 @@ async def _establish_session( ) read, write = await session_stack.enter_async_context(client) else: - client = streamablehttp_client( + client = streamable_http_client( url=server_params.url, headers=server_params.headers, timeout=server_params.timeout, diff --git a/src/mcp/client/streamable_http.py b/src/mcp/client/streamable_http.py index fa0524e6ef..c2842e9d48 100644 --- a/src/mcp/client/streamable_http.py +++ b/src/mcp/client/streamable_http.py @@ -17,6 +17,7 @@ from anyio.abc import TaskGroup from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream from httpx_sse import EventSource, ServerSentEvent, aconnect_sse +from typing_extensions import deprecated from mcp.shared._httpx_utils import McpHttpClientFactory, create_mcp_http_client from mcp.shared.message import ClientMessageMetadata, SessionMessage @@ -560,7 +561,7 @@ def get_session_id(self) -> str | None: @asynccontextmanager -async def streamablehttp_client( +async def streamable_http_client( url: str, headers: dict[str, str] | None = None, timeout: float | timedelta = 30, @@ -629,3 +630,23 @@ def start_get_stream() -> None: finally: await read_stream_writer.aclose() await write_stream.aclose() + + +@deprecated("Use `streamable_http_client` instead.") +@asynccontextmanager +async def streamablehttp_client( + url: str, + headers: dict[str, str] | None = None, + timeout: float | timedelta = 30, + sse_read_timeout: float | timedelta = 60 * 5, + terminate_on_close: bool = True, +) -> AsyncGenerator[ + tuple[ + MemoryObjectReceiveStream[SessionMessage | Exception], + MemoryObjectSendStream[SessionMessage], + GetSessionIdCallback, + ], + None, +]: + async with streamable_http_client(url, headers, timeout, sse_read_timeout, terminate_on_close) as streams: + yield streams diff --git a/tests/client/test_session_group.py b/tests/client/test_session_group.py index e61ea572b4..669865fd60 100644 --- a/tests/client/test_session_group.py +++ b/tests/client/test_session_group.py @@ -280,7 +280,7 @@ async def test_disconnect_non_existent_server(self): ( StreamableHttpParameters(url="http://test.com/stream", terminate_on_close=False), "streamablehttp", - "mcp.client.session_group.streamablehttp_client", + "mcp.client.session_group.streamable_http_client", ), # url, headers, timeout, sse_read_timeout, terminate_on_close ], ) @@ -296,7 +296,7 @@ async def test_establish_session_parameterized( mock_read_stream = mock.AsyncMock(name=f"{client_type_name}Read") mock_write_stream = mock.AsyncMock(name=f"{client_type_name}Write") - # streamablehttp_client's __aenter__ returns three values + # streamable_http_client's __aenter__ returns three values if client_type_name == "streamablehttp": mock_extra_stream_val = mock.AsyncMock(name="StreamableExtra") mock_client_cm_instance.__aenter__.return_value = ( diff --git a/tests/server/fastmcp/test_integration.py b/tests/server/fastmcp/test_integration.py index b1cefca29c..887aa95887 100644 --- a/tests/server/fastmcp/test_integration.py +++ b/tests/server/fastmcp/test_integration.py @@ -34,10 +34,7 @@ ) from mcp.client.session import ClientSession from mcp.client.sse import sse_client -from mcp.client.streamable_http import GetSessionIdCallback, streamablehttp_client -from mcp.shared.context import RequestContext -from mcp.shared.message import SessionMessage -from mcp.shared.session import RequestResponder +from mcp.client.streamable_http import streamable_http_client from mcp.types import ( ClientResult, CreateMessageRequestParams, @@ -179,7 +176,7 @@ def create_client_for_transport(transport: str, server_url: str): return sse_client(endpoint) elif transport == "streamable-http": endpoint = f"{server_url}/mcp" - return streamablehttp_client(endpoint) + return streamable_http_client(endpoint) else: # pragma: no cover raise ValueError(f"Invalid transport: {transport}") diff --git a/tests/shared/test_streamable_http.py b/tests/shared/test_streamable_http.py index a626e73858..5e0780eea1 100644 --- a/tests/shared/test_streamable_http.py +++ b/tests/shared/test_streamable_http.py @@ -25,7 +25,7 @@ import mcp.types as types from mcp.client.session import ClientSession -from mcp.client.streamable_http import StreamableHTTPTransport, streamablehttp_client +from mcp.client.streamable_http import StreamableHTTPTransport, streamable_http_client from mcp.server import Server from mcp.server.streamable_http import ( MCP_PROTOCOL_VERSION_HEADER, @@ -972,7 +972,7 @@ async def http_client(basic_server: None, basic_server_url: str): # pragma: no @pytest.fixture async def initialized_client_session(basic_server: None, basic_server_url: str): """Create initialized StreamableHTTP client session.""" - async with streamablehttp_client(f"{basic_server_url}/mcp") as ( + async with streamable_http_client(f"{basic_server_url}/mcp") as ( read_stream, write_stream, _, @@ -986,9 +986,9 @@ async def initialized_client_session(basic_server: None, basic_server_url: str): @pytest.mark.anyio -async def test_streamablehttp_client_basic_connection(basic_server: None, basic_server_url: str): +async def test_streamable_http_client_basic_connection(basic_server, basic_server_url): """Test basic client connection with initialization.""" - async with streamablehttp_client(f"{basic_server_url}/mcp") as ( + async with streamable_http_client(f"{basic_server_url}/mcp") as ( read_stream, write_stream, _, @@ -1004,7 +1004,7 @@ async def test_streamablehttp_client_basic_connection(basic_server: None, basic_ @pytest.mark.anyio -async def test_streamablehttp_client_resource_read(initialized_client_session: ClientSession): +async def test_streamable_http_client_resource_read(initialized_client_session): """Test client resource read functionality.""" response = await initialized_client_session.read_resource(uri=AnyUrl("foobar://test-resource")) assert len(response.contents) == 1 @@ -1014,7 +1014,7 @@ async def test_streamablehttp_client_resource_read(initialized_client_session: C @pytest.mark.anyio -async def test_streamablehttp_client_tool_invocation(initialized_client_session: ClientSession): +async def test_streamable_http_client_tool_invocation(initialized_client_session): """Test client tool invocation.""" # First list tools tools = await initialized_client_session.list_tools() @@ -1029,7 +1029,7 @@ async def test_streamablehttp_client_tool_invocation(initialized_client_session: @pytest.mark.anyio -async def test_streamablehttp_client_error_handling(initialized_client_session: ClientSession): +async def test_streamable_http_client_error_handling(initialized_client_session): """Test error handling in client.""" with pytest.raises(McpError) as exc_info: await initialized_client_session.read_resource(uri=AnyUrl("unknown://test-error")) @@ -1038,9 +1038,9 @@ async def test_streamablehttp_client_error_handling(initialized_client_session: @pytest.mark.anyio -async def test_streamablehttp_client_session_persistence(basic_server: None, basic_server_url: str): +async def test_streamable_http_client_session_persistence(basic_server, basic_server_url): """Test that session ID persists across requests.""" - async with streamablehttp_client(f"{basic_server_url}/mcp") as ( + async with streamable_http_client(f"{basic_server_url}/mcp") as ( read_stream, write_stream, _, @@ -1066,9 +1066,9 @@ async def test_streamablehttp_client_session_persistence(basic_server: None, bas @pytest.mark.anyio -async def test_streamablehttp_client_json_response(json_response_server: None, json_server_url: str): +async def test_streamable_http_client_json_response(json_response_server, json_server_url): """Test client with JSON response mode.""" - async with streamablehttp_client(f"{json_server_url}/mcp") as ( + async with streamable_http_client(f"{json_server_url}/mcp") as ( read_stream, write_stream, _, @@ -1094,7 +1094,7 @@ async def test_streamablehttp_client_json_response(json_response_server: None, j @pytest.mark.anyio -async def test_streamablehttp_client_get_stream(basic_server: None, basic_server_url: str): +async def test_streamable_http_client_get_stream(basic_server, basic_server_url): """Test GET stream functionality for server-initiated messages.""" import mcp.types as types @@ -1107,7 +1107,7 @@ async def message_handler( # pragma: no branch if isinstance(message, types.ServerNotification): # pragma: no branch notifications_received.append(message) - async with streamablehttp_client(f"{basic_server_url}/mcp") as ( + async with streamable_http_client(f"{basic_server_url}/mcp") as ( read_stream, write_stream, _, @@ -1134,13 +1134,13 @@ async def message_handler( # pragma: no branch @pytest.mark.anyio -async def test_streamablehttp_client_session_termination(basic_server: None, basic_server_url: str): +async def test_streamable_http_client_session_termination(basic_server, basic_server_url): """Test client session termination functionality.""" captured_session_id = None - # Create the streamablehttp_client with a custom httpx client to capture headers - async with streamablehttp_client(f"{basic_server_url}/mcp") as ( + # Create the streamable_http_client with a custom httpx client to capture headers + async with streamable_http_client(f"{basic_server_url}/mcp") as ( read_stream, write_stream, get_session_id, @@ -1160,7 +1160,7 @@ async def test_streamablehttp_client_session_termination(basic_server: None, bas if captured_session_id: # pragma: no cover headers[MCP_SESSION_ID_HEADER] = captured_session_id - async with streamablehttp_client(f"{basic_server_url}/mcp", headers=headers) as ( + async with streamable_http_client(f"{basic_server_url}/mcp", headers=headers) as ( read_stream, write_stream, _, @@ -1175,9 +1175,7 @@ async def test_streamablehttp_client_session_termination(basic_server: None, bas @pytest.mark.anyio -async def test_streamablehttp_client_session_termination_204( - basic_server: None, basic_server_url: str, monkeypatch: pytest.MonkeyPatch -): +async def test_streamable_http_client_session_termination_204(basic_server, basic_server_url, monkeypatch): """Test client session termination functionality with a 204 response. This test patches the httpx client to return a 204 response for DELETEs. @@ -1205,8 +1203,8 @@ async def mock_delete(self: httpx.AsyncClient, *args: Any, **kwargs: Any) -> htt captured_session_id = None - # Create the streamablehttp_client with a custom httpx client to capture headers - async with streamablehttp_client(f"{basic_server_url}/mcp") as ( + # Create the streamable_http_client with a custom httpx client to capture headers + async with streamable_http_client(f"{basic_server_url}/mcp") as ( read_stream, write_stream, get_session_id, @@ -1226,7 +1224,7 @@ async def mock_delete(self: httpx.AsyncClient, *args: Any, **kwargs: Any) -> htt if captured_session_id: # pragma: no cover headers[MCP_SESSION_ID_HEADER] = captured_session_id - async with streamablehttp_client(f"{basic_server_url}/mcp", headers=headers) as ( + async with streamable_http_client(f"{basic_server_url}/mcp", headers=headers) as ( read_stream, write_stream, _, @@ -1241,7 +1239,7 @@ async def mock_delete(self: httpx.AsyncClient, *args: Any, **kwargs: Any) -> htt @pytest.mark.anyio -async def test_streamablehttp_client_resumption(event_server: tuple[SimpleEventStore, str]): +async def test_streamable_http_client_resumption(event_server): """Test client session resumption using sync primitives for reliable coordination.""" _, server_url = event_server @@ -1268,7 +1266,7 @@ async def on_resumption_token_update(token: str) -> None: captured_resumption_token = token # First, start the client session and begin the tool that waits on lock - async with streamablehttp_client(f"{server_url}/mcp", terminate_on_close=False) as ( + async with streamable_http_client(f"{server_url}/mcp", terminate_on_close=False) as ( read_stream, write_stream, get_session_id, @@ -1324,7 +1322,7 @@ async def run_tool(): headers[MCP_SESSION_ID_HEADER] = captured_session_id if captured_protocol_version: # pragma: no cover headers[MCP_PROTOCOL_VERSION_HEADER] = captured_protocol_version - async with streamablehttp_client(f"{server_url}/mcp", headers=headers) as ( + async with streamable_http_client(f"{server_url}/mcp", headers=headers) as ( read_stream, write_stream, _, @@ -1391,7 +1389,7 @@ async def sampling_callback( ) # Create client with sampling callback - async with streamablehttp_client(f"{basic_server_url}/mcp") as ( + async with streamable_http_client(f"{basic_server_url}/mcp") as ( read_stream, write_stream, _, @@ -1536,7 +1534,7 @@ async def test_streamablehttp_request_context_propagation(context_aware_server: "X-Trace-Id": "trace-123", } - async with streamablehttp_client(f"{basic_server_url}/mcp", headers=custom_headers) as ( + async with streamable_http_client(f"{basic_server_url}/mcp", headers=custom_headers) as ( read_stream, write_stream, _, @@ -1573,7 +1571,7 @@ async def test_streamablehttp_request_context_isolation(context_aware_server: No "Authorization": f"Bearer token-{i}", } - async with streamablehttp_client(f"{basic_server_url}/mcp", headers=headers) as (read_stream, write_stream, _): + async with streamable_http_client(f"{basic_server_url}/mcp", headers=headers) as (read_stream, write_stream, _): async with ClientSession(read_stream, write_stream) as session: await session.initialize() @@ -1597,7 +1595,7 @@ async def test_streamablehttp_request_context_isolation(context_aware_server: No @pytest.mark.anyio async def test_client_includes_protocol_version_header_after_init(context_aware_server: None, basic_server_url: str): """Test that client includes mcp-protocol-version header after initialization.""" - async with streamablehttp_client(f"{basic_server_url}/mcp") as ( + async with streamable_http_client(f"{basic_server_url}/mcp") as ( read_stream, write_stream, _, @@ -1713,7 +1711,7 @@ async def test_client_crash_handled(basic_server: None, basic_server_url: str): # Simulate bad client that crashes after init async def bad_client(): """Client that triggers ClosedResourceError""" - async with streamablehttp_client(f"{basic_server_url}/mcp") as ( + async with streamable_http_client(f"{basic_server_url}/mcp") as ( read_stream, write_stream, _, @@ -1731,7 +1729,7 @@ async def bad_client(): await anyio.sleep(0.1) # Try a good client, it should still be able to connect and list tools - async with streamablehttp_client(f"{basic_server_url}/mcp") as ( + async with streamable_http_client(f"{basic_server_url}/mcp") as ( read_stream, write_stream, _, From 433da3e0273738c0dab21b7fd23514c3e3752770 Mon Sep 17 00:00:00 2001 From: Marcelo Trylesinski Date: Mon, 21 Jul 2025 10:44:37 +0200 Subject: [PATCH 02/13] Apply pre-commit --- .../simple-auth-client/mcp_simple_auth_client/main.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/examples/clients/simple-auth-client/mcp_simple_auth_client/main.py b/examples/clients/simple-auth-client/mcp_simple_auth_client/main.py index 2599dc932f..1439cff238 100644 --- a/examples/clients/simple-auth-client/mcp_simple_auth_client/main.py +++ b/examples/clients/simple-auth-client/mcp_simple_auth_client/main.py @@ -193,8 +193,10 @@ async def _default_redirect_handler(authorization_url: str) -> None: # Create OAuth authentication handler using the new interface # Use client_metadata_url to enable CIMD when the server supports it oauth_auth = OAuthClientProvider( - server_url=self.server_url, - client_metadata=OAuthClientMetadata.model_validate(client_metadata_dict), + server_url=self.server_url.replace("/mcp", ""), + client_metadata=OAuthClientMetadata.model_validate( + client_metadata_dict + ), storage=InMemoryTokenStorage(), redirect_handler=_default_redirect_handler, callback_handler=callback_handler, @@ -327,7 +329,9 @@ async def interactive_loop(self): await self.call_tool(tool_name, arguments) else: - print("āŒ Unknown command. Try 'list', 'call ', or 'quit'") + print( + "āŒ Unknown command. Try 'list', 'call ', or 'quit'" + ) except KeyboardInterrupt: print("\n\nšŸ‘‹ Goodbye!") From bcc4b19ce572473ab69bf612d557c01fa0cd2ffc Mon Sep 17 00:00:00 2001 From: Marcelo Trylesinski Date: Mon, 21 Jul 2025 12:17:21 +0200 Subject: [PATCH 03/13] Add missing paramters --- src/mcp/client/streamable_http.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/mcp/client/streamable_http.py b/src/mcp/client/streamable_http.py index c2842e9d48..9dec707a6f 100644 --- a/src/mcp/client/streamable_http.py +++ b/src/mcp/client/streamable_http.py @@ -640,6 +640,8 @@ async def streamablehttp_client( timeout: float | timedelta = 30, sse_read_timeout: float | timedelta = 60 * 5, terminate_on_close: bool = True, + httpx_client_factory: McpHttpClientFactory = create_mcp_http_client, + auth: httpx.Auth | None = None, ) -> AsyncGenerator[ tuple[ MemoryObjectReceiveStream[SessionMessage | Exception], @@ -648,5 +650,7 @@ async def streamablehttp_client( ], None, ]: - async with streamable_http_client(url, headers, timeout, sse_read_timeout, terminate_on_close) as streams: + async with streamable_http_client( + url, headers, timeout, sse_read_timeout, terminate_on_close, httpx_client_factory, auth + ) as streams: yield streams From 8196fec0e80b342b210e96914f2b193ef51af5cd Mon Sep 17 00:00:00 2001 From: Felix Weinberger Date: Wed, 1 Oct 2025 16:11:13 +0100 Subject: [PATCH 04/13] Replace httpx_client_factory with httpx_client parameter Modernize streamable_http_client API by accepting httpx.AsyncClient instances directly instead of factory functions, following industry standards. - New API: httpx_client: httpx.AsyncClient | None parameter - Default client created with recommended timeouts if None - Deprecated wrapper provides backward compatibility - Updated examples to show custom client usage - Add MCP_DEFAULT_TIMEOUT constants to _httpx_utils --- README.md | 16 +- .../mcp_simple_auth_client/main.py | 24 ++- examples/snippets/clients/oauth_client.py | 16 +- src/mcp/client/session_group.py | 15 +- src/mcp/client/streamable_http.py | 89 ++++++--- src/mcp/shared/_httpx_utils.py | 8 +- tests/shared/test_streamable_http.py | 173 +++++++++--------- 7 files changed, 205 insertions(+), 136 deletions(-) diff --git a/README.md b/README.md index c4bf0141a7..f8ac1fe13f 100644 --- a/README.md +++ b/README.md @@ -2375,6 +2375,7 @@ from pydantic import AnyUrl from mcp import ClientSession from mcp.client.auth import OAuthClientProvider, TokenStorage from mcp.client.streamable_http import streamable_http_client +from mcp.shared._httpx_utils import create_mcp_http_client from mcp.shared.auth import OAuthClientInformationFull, OAuthClientMetadata, OAuthToken @@ -2428,15 +2429,16 @@ async def main(): callback_handler=handle_callback, ) - async with streamable_http_client("http://localhost:8001/mcp", auth=oauth_auth) as (read, write, _): - async with ClientSession(read, write) as session: - await session.initialize() + async with create_mcp_http_client(auth=oauth_auth) as custom_client: + async with streamable_http_client("http://localhost:8001/mcp", httpx_client=custom_client) as (read, write, _): + async with ClientSession(read, write) as session: + await session.initialize() - tools = await session.list_tools() - print(f"Available tools: {[tool.name for tool in tools.tools]}") + tools = await session.list_tools() + print(f"Available tools: {[tool.name for tool in tools.tools]}") - resources = await session.list_resources() - print(f"Available resources: {[r.uri for r in resources.resources]}") + resources = await session.list_resources() + print(f"Available resources: {[r.uri for r in resources.resources]}") def run(): diff --git a/examples/clients/simple-auth-client/mcp_simple_auth_client/main.py b/examples/clients/simple-auth-client/mcp_simple_auth_client/main.py index 1439cff238..4d413d54a6 100644 --- a/examples/clients/simple-auth-client/mcp_simple_auth_client/main.py +++ b/examples/clients/simple-auth-client/mcp_simple_auth_client/main.py @@ -11,15 +11,17 @@ import threading import time import webbrowser -from datetime import timedelta from http.server import BaseHTTPRequestHandler, HTTPServer from typing import Any from urllib.parse import parse_qs, urlparse +import httpx + from mcp.client.auth import OAuthClientProvider, TokenStorage from mcp.client.session import ClientSession from mcp.client.sse import sse_client from mcp.client.streamable_http import streamable_http_client +from mcp.shared._httpx_utils import create_mcp_http_client from mcp.shared.auth import OAuthClientInformationFull, OAuthClientMetadata, OAuthToken @@ -194,9 +196,7 @@ async def _default_redirect_handler(authorization_url: str) -> None: # Use client_metadata_url to enable CIMD when the server supports it oauth_auth = OAuthClientProvider( server_url=self.server_url.replace("/mcp", ""), - client_metadata=OAuthClientMetadata.model_validate( - client_metadata_dict - ), + client_metadata=OAuthClientMetadata.model_validate(client_metadata_dict), storage=InMemoryTokenStorage(), redirect_handler=_default_redirect_handler, callback_handler=callback_handler, @@ -214,12 +214,12 @@ async def _default_redirect_handler(authorization_url: str) -> None: await self._run_session(read_stream, write_stream, None) else: print("šŸ“” Opening StreamableHTTP transport connection with auth...") - async with streamable_http_client( - url=self.server_url, - auth=oauth_auth, - timeout=timedelta(seconds=60), - ) as (read_stream, write_stream, get_session_id): - await self._run_session(read_stream, write_stream, get_session_id) + async with create_mcp_http_client(auth=oauth_auth) as custom_client: + async with streamable_http_client( + url=self.server_url, + httpx_client=custom_client, + ) as (read_stream, write_stream, get_session_id): + await self._run_session(read_stream, write_stream, get_session_id) except Exception as e: print(f"āŒ Failed to connect: {e}") @@ -329,9 +329,7 @@ async def interactive_loop(self): await self.call_tool(tool_name, arguments) else: - print( - "āŒ Unknown command. Try 'list', 'call ', or 'quit'" - ) + print("āŒ Unknown command. Try 'list', 'call ', or 'quit'") except KeyboardInterrupt: print("\n\nšŸ‘‹ Goodbye!") diff --git a/examples/snippets/clients/oauth_client.py b/examples/snippets/clients/oauth_client.py index 38bf7f95f3..68ecf1a7d6 100644 --- a/examples/snippets/clients/oauth_client.py +++ b/examples/snippets/clients/oauth_client.py @@ -15,6 +15,7 @@ from mcp import ClientSession from mcp.client.auth import OAuthClientProvider, TokenStorage from mcp.client.streamable_http import streamable_http_client +from mcp.shared._httpx_utils import create_mcp_http_client from mcp.shared.auth import OAuthClientInformationFull, OAuthClientMetadata, OAuthToken @@ -68,15 +69,16 @@ async def main(): callback_handler=handle_callback, ) - async with streamable_http_client("http://localhost:8001/mcp", auth=oauth_auth) as (read, write, _): - async with ClientSession(read, write) as session: - await session.initialize() + async with create_mcp_http_client(auth=oauth_auth) as custom_client: + async with streamable_http_client("http://localhost:8001/mcp", httpx_client=custom_client) as (read, write, _): + async with ClientSession(read, write) as session: + await session.initialize() - tools = await session.list_tools() - print(f"Available tools: {[tool.name for tool in tools.tools]}") + tools = await session.list_tools() + print(f"Available tools: {[tool.name for tool in tools.tools]}") - resources = await session.list_resources() - print(f"Available resources: {[r.uri for r in resources.resources]}") + resources = await session.list_resources() + print(f"Available resources: {[r.uri for r in resources.resources]}") def run(): diff --git a/src/mcp/client/session_group.py b/src/mcp/client/session_group.py index 8b86cf82bd..aaf55e0c0a 100644 --- a/src/mcp/client/session_group.py +++ b/src/mcp/client/session_group.py @@ -17,6 +17,7 @@ from typing import Any, TypeAlias, overload import anyio +import httpx from pydantic import BaseModel from typing_extensions import Self, deprecated @@ -26,6 +27,7 @@ from mcp.client.sse import sse_client from mcp.client.stdio import StdioServerParameters from mcp.client.streamable_http import streamable_http_client +from mcp.shared._httpx_utils import create_mcp_http_client from mcp.shared.exceptions import McpError from mcp.shared.session import ProgressFnT @@ -309,11 +311,18 @@ async def _establish_session( ) read, write = await session_stack.enter_async_context(client) else: + httpx_client = create_mcp_http_client( + headers=server_params.headers, + timeout=httpx.Timeout( + server_params.timeout.total_seconds(), + read=server_params.sse_read_timeout.total_seconds(), + ), + ) + await session_stack.enter_async_context(httpx_client) + client = streamable_http_client( url=server_params.url, - headers=server_params.headers, - timeout=server_params.timeout, - sse_read_timeout=server_params.sse_read_timeout, + httpx_client=httpx_client, terminate_on_close=server_params.terminate_on_close, ) read, write, _ = await session_stack.enter_async_context(client) diff --git a/src/mcp/client/streamable_http.py b/src/mcp/client/streamable_http.py index 9dec707a6f..e2590cc653 100644 --- a/src/mcp/client/streamable_http.py +++ b/src/mcp/client/streamable_http.py @@ -6,6 +6,7 @@ and session management. """ +import contextlib import logging from collections.abc import AsyncGenerator, Awaitable, Callable from contextlib import asynccontextmanager @@ -19,7 +20,12 @@ from httpx_sse import EventSource, ServerSentEvent, aconnect_sse from typing_extensions import deprecated -from mcp.shared._httpx_utils import McpHttpClientFactory, create_mcp_http_client +from mcp.shared._httpx_utils import ( + MCP_DEFAULT_SSE_READ_TIMEOUT, + MCP_DEFAULT_TIMEOUT, + McpHttpClientFactory, + create_mcp_http_client, +) from mcp.shared.message import ClientMessageMetadata, SessionMessage from mcp.types import ( ErrorData, @@ -106,9 +112,9 @@ def __init__( self.session_id = None self.protocol_version = None self.request_headers = { + **self.headers, ACCEPT: f"{JSON}, {SSE}", CONTENT_TYPE: JSON, - **self.headers, } def _prepare_request_headers(self, base_headers: dict[str, str]) -> dict[str, str]: @@ -563,12 +569,9 @@ def get_session_id(self) -> str | None: @asynccontextmanager async def streamable_http_client( url: str, - headers: dict[str, str] | None = None, - timeout: float | timedelta = 30, - sse_read_timeout: float | timedelta = 60 * 5, + *, + httpx_client: httpx.AsyncClient | None = None, terminate_on_close: bool = True, - httpx_client_factory: McpHttpClientFactory = create_mcp_http_client, - auth: httpx.Auth | None = None, ) -> AsyncGenerator[ tuple[ MemoryObjectReceiveStream[SessionMessage | Exception], @@ -580,30 +583,57 @@ async def streamable_http_client( """ Client transport for StreamableHTTP. - `sse_read_timeout` determines how long (in seconds) the client will wait for a new - event before disconnecting. All other HTTP operations are controlled by `timeout`. + Args: + url: The MCP server endpoint URL. + httpx_client: Optional pre-configured httpx.AsyncClient. If None, a default + client with recommended MCP timeouts will be created. To configure headers, + authentication, or other HTTP settings, create an httpx.AsyncClient and pass it here. + terminate_on_close: If True, send a DELETE request to terminate the session + when the context exits. Yields: Tuple containing: - read_stream: Stream for reading messages from the server - write_stream: Stream for sending messages to the server - get_session_id_callback: Function to retrieve the current session ID - """ - transport = StreamableHTTPTransport(url, headers, timeout, sse_read_timeout, auth) + Example: + See examples/snippets/clients/ for usage patterns. + """ read_stream_writer, read_stream = anyio.create_memory_object_stream[SessionMessage | Exception](0) write_stream, write_stream_reader = anyio.create_memory_object_stream[SessionMessage](0) + # Determine if we need to create and manage the client + client_provided = httpx_client is not None + client = httpx_client + + if client is None: + # Create default client with recommended MCP timeouts + client = create_mcp_http_client() + + # Extract configuration from the client to pass to transport + headers_dict = dict(client.headers) if client.headers else None + timeout = client.timeout.connect if (client.timeout and client.timeout.connect is not None) else MCP_DEFAULT_TIMEOUT + sse_read_timeout = ( + client.timeout.read if (client.timeout and client.timeout.read is not None) else MCP_DEFAULT_SSE_READ_TIMEOUT + ) + auth = client.auth + + # Create transport with extracted configuration + transport = StreamableHTTPTransport(url, headers_dict, timeout, sse_read_timeout, auth) + + # Sync client headers with transport's merged headers (includes MCP protocol requirements) + client.headers.update(transport.request_headers) + async with anyio.create_task_group() as tg: try: logger.debug(f"Connecting to StreamableHTTP endpoint: {url}") - async with httpx_client_factory( - headers=transport.request_headers, - timeout=httpx.Timeout(transport.timeout, read=transport.sse_read_timeout), - auth=transport.auth, - ) as client: - # Define callbacks that need access to tg + async with contextlib.AsyncExitStack() as stack: + # Only manage client lifecycle if we created it + if not client_provided: + await stack.enter_async_context(client) + def start_get_stream() -> None: tg.start_soon(transport.handle_get_stream, client, read_stream_writer) @@ -650,7 +680,24 @@ async def streamablehttp_client( ], None, ]: - async with streamable_http_client( - url, headers, timeout, sse_read_timeout, terminate_on_close, httpx_client_factory, auth - ) as streams: - yield streams + # Convert timeout parameters + timeout_seconds = timeout.total_seconds() if isinstance(timeout, timedelta) else timeout + sse_read_timeout_seconds = ( + sse_read_timeout.total_seconds() if isinstance(sse_read_timeout, timedelta) else sse_read_timeout + ) + + # Create httpx client using the factory with old-style parameters + client = httpx_client_factory( + headers=headers, + timeout=httpx.Timeout(timeout_seconds, read=sse_read_timeout_seconds), + auth=auth, + ) + + # Manage client lifecycle since we created it + async with client: + async with streamable_http_client( + url, + httpx_client=client, + terminate_on_close=terminate_on_close, + ) as streams: + yield streams diff --git a/src/mcp/shared/_httpx_utils.py b/src/mcp/shared/_httpx_utils.py index 3af64ad1eb..945ef80955 100644 --- a/src/mcp/shared/_httpx_utils.py +++ b/src/mcp/shared/_httpx_utils.py @@ -4,7 +4,11 @@ import httpx -__all__ = ["create_mcp_http_client"] +__all__ = ["create_mcp_http_client", "MCP_DEFAULT_TIMEOUT", "MCP_DEFAULT_SSE_READ_TIMEOUT"] + +# Default MCP timeout configuration +MCP_DEFAULT_TIMEOUT = 30.0 # General operations (seconds) +MCP_DEFAULT_SSE_READ_TIMEOUT = 300.0 # SSE streams - 5 minutes (seconds) class McpHttpClientFactory(Protocol): # pragma: no branch @@ -68,7 +72,7 @@ def create_mcp_http_client( # Handle timeout if timeout is None: - kwargs["timeout"] = httpx.Timeout(30.0) + kwargs["timeout"] = httpx.Timeout(MCP_DEFAULT_TIMEOUT, read=MCP_DEFAULT_SSE_READ_TIMEOUT) else: kwargs["timeout"] = timeout diff --git a/tests/shared/test_streamable_http.py b/tests/shared/test_streamable_http.py index 5e0780eea1..3650ffcac4 100644 --- a/tests/shared/test_streamable_http.py +++ b/tests/shared/test_streamable_http.py @@ -27,6 +27,7 @@ from mcp.client.session import ClientSession from mcp.client.streamable_http import StreamableHTTPTransport, streamable_http_client from mcp.server import Server +from mcp.shared._httpx_utils import create_mcp_http_client from mcp.server.streamable_http import ( MCP_PROTOCOL_VERSION_HEADER, MCP_SESSION_ID_HEADER, @@ -1160,18 +1161,19 @@ async def test_streamable_http_client_session_termination(basic_server, basic_se if captured_session_id: # pragma: no cover headers[MCP_SESSION_ID_HEADER] = captured_session_id - async with streamable_http_client(f"{basic_server_url}/mcp", headers=headers) as ( - read_stream, - write_stream, - _, - ): - async with ClientSession(read_stream, write_stream) as session: - # Attempt to make a request after termination - with pytest.raises( # pragma: no branch - McpError, - match="Session terminated", - ): - await session.list_tools() + async with create_mcp_http_client(headers=headers) as httpx_client: + async with streamable_http_client(f"{basic_server_url}/mcp", http_client=httpx_client) as ( + read_stream, + write_stream, + _, + ): + async with ClientSession(read_stream, write_stream) as session: + # Attempt to make a request after termination + with pytest.raises( # pragma: no branch + McpError, + match="Session terminated", + ): + await session.list_tools() @pytest.mark.anyio @@ -1224,18 +1226,19 @@ async def mock_delete(self: httpx.AsyncClient, *args: Any, **kwargs: Any) -> htt if captured_session_id: # pragma: no cover headers[MCP_SESSION_ID_HEADER] = captured_session_id - async with streamable_http_client(f"{basic_server_url}/mcp", headers=headers) as ( - read_stream, - write_stream, - _, - ): - async with ClientSession(read_stream, write_stream) as session: - # Attempt to make a request after termination - with pytest.raises( # pragma: no branch - McpError, - match="Session terminated", - ): - await session.list_tools() + async with create_mcp_http_client(headers=headers) as httpx_client: + async with streamable_http_client(f"{basic_server_url}/mcp", http_client=httpx_client) as ( + read_stream, + write_stream, + _, + ): + async with ClientSession(read_stream, write_stream) as session: + # Attempt to make a request after termination + with pytest.raises( # pragma: no branch + McpError, + match="Session terminated", + ): + await session.list_tools() @pytest.mark.anyio @@ -1322,39 +1325,41 @@ async def run_tool(): headers[MCP_SESSION_ID_HEADER] = captured_session_id if captured_protocol_version: # pragma: no cover headers[MCP_PROTOCOL_VERSION_HEADER] = captured_protocol_version - async with streamable_http_client(f"{server_url}/mcp", headers=headers) as ( - read_stream, - write_stream, - _, - ): - async with ClientSession(read_stream, write_stream, message_handler=message_handler) as session: - result = await session.send_request( - types.ClientRequest( - types.CallToolRequest( - params=types.CallToolRequestParams(name="release_lock", arguments={}), - ) - ), - types.CallToolResult, - ) - metadata = ClientMessageMetadata( - resumption_token=captured_resumption_token, - ) - result = await session.send_request( - types.ClientRequest( - types.CallToolRequest( - params=types.CallToolRequestParams(name="wait_for_lock_with_notification", arguments={}), - ) - ), - types.CallToolResult, - metadata=metadata, - ) - assert len(result.content) == 1 - assert result.content[0].type == "text" - assert result.content[0].text == "Completed" + async with create_mcp_http_client(headers=headers) as httpx_client: + async with streamable_http_client(f"{server_url}/mcp", httpx_client=httpx_client) as ( + read_stream, + write_stream, + _, + ): + async with ClientSession(read_stream, write_stream, message_handler=message_handler) as session: + result = await session.send_request( + types.ClientRequest( + types.CallToolRequest( + params=types.CallToolRequestParams(name="release_lock", arguments={}), + ) + ), + types.CallToolResult, + ) + metadata = ClientMessageMetadata( + resumption_token=captured_resumption_token, + ) - # We should have received the remaining notifications - assert len(captured_notifications) == 1 + result = await session.send_request( + types.ClientRequest( + types.CallToolRequest( + params=types.CallToolRequestParams(name="wait_for_lock_with_notification", arguments={}), + ) + ), + types.CallToolResult, + metadata=metadata, + ) + assert len(result.content) == 1 + assert result.content[0].type == "text" + assert result.content[0].text == "Completed" + + # We should have received the remaining notifications + assert len(captured_notifications) == 1 assert isinstance(captured_notifications[0].root, types.LoggingMessageNotification) assert captured_notifications[0].root.params.data == "Second notification after lock" @@ -1534,28 +1539,29 @@ async def test_streamablehttp_request_context_propagation(context_aware_server: "X-Trace-Id": "trace-123", } - async with streamable_http_client(f"{basic_server_url}/mcp", headers=custom_headers) as ( - read_stream, - write_stream, - _, - ): - async with ClientSession(read_stream, write_stream) as session: - result = await session.initialize() - assert isinstance(result, InitializeResult) - assert result.serverInfo.name == "ContextAwareServer" + async with httpx.AsyncClient(headers=custom_headers) as httpx_client: + async with streamable_http_client(f"{basic_server_url}/mcp", httpx_client=httpx_client) as ( + read_stream, + write_stream, + _, + ): + async with ClientSession(read_stream, write_stream) as session: + result = await session.initialize() + assert isinstance(result, InitializeResult) + assert result.serverInfo.name == "ContextAwareServer" - # Call the tool that echoes headers back - tool_result = await session.call_tool("echo_headers", {}) + # Call the tool that echoes headers back + tool_result = await session.call_tool("echo_headers", {}) - # Parse the JSON response - assert len(tool_result.content) == 1 - assert isinstance(tool_result.content[0], TextContent) - headers_data = json.loads(tool_result.content[0].text) + # Parse the JSON response + assert len(tool_result.content) == 1 + assert isinstance(tool_result.content[0], TextContent) + headers_data = json.loads(tool_result.content[0].text) - # Verify headers were propagated - assert headers_data.get("authorization") == "Bearer test-token" - assert headers_data.get("x-custom-header") == "test-value" - assert headers_data.get("x-trace-id") == "trace-123" + # Verify headers were propagated + assert headers_data.get("authorization") == "Bearer test-token" + assert headers_data.get("x-custom-header") == "test-value" + assert headers_data.get("x-trace-id") == "trace-123" @pytest.mark.anyio @@ -1571,17 +1577,18 @@ async def test_streamablehttp_request_context_isolation(context_aware_server: No "Authorization": f"Bearer token-{i}", } - async with streamable_http_client(f"{basic_server_url}/mcp", headers=headers) as (read_stream, write_stream, _): - async with ClientSession(read_stream, write_stream) as session: - await session.initialize() + async with httpx.AsyncClient(headers=headers) as httpx_client: + async with streamable_http_client(f"{basic_server_url}/mcp", httpx_client=httpx_client) as (read_stream, write_stream, _): + async with ClientSession(read_stream, write_stream) as session: + await session.initialize() - # Call the tool that echoes context - tool_result = await session.call_tool("echo_context", {"request_id": f"request-{i}"}) + # Call the tool that echoes context + tool_result = await session.call_tool("echo_context", {"request_id": f"request-{i}"}) - assert len(tool_result.content) == 1 - assert isinstance(tool_result.content[0], TextContent) - context_data = json.loads(tool_result.content[0].text) - contexts.append(context_data) + assert len(tool_result.content) == 1 + assert isinstance(tool_result.content[0], TextContent) + context_data = json.loads(tool_result.content[0].text) + contexts.append(context_data) # Verify each request had its own context assert len(contexts) == 3 # pragma: no cover From 51e6a9ad28809a25525bb022419a399f8f54245b Mon Sep 17 00:00:00 2001 From: Felix Weinberger Date: Wed, 1 Oct 2025 19:36:03 +0100 Subject: [PATCH 05/13] Fix test failures from httpx_client API changes This commit fixes all test failures introduced by the API change from httpx_client_factory to direct httpx_client parameter: 1. Updated deprecated imports: Changed streamablehttp_client to streamable_http_client in test files 2. Fixed 307 redirect errors: Replaced httpx.AsyncClient with create_mcp_http_client which includes follow_redirects=True by default 3. Fixed test assertion: Updated test_session_group.py to mock create_mcp_http_client and verify the new API signature where streamable_http_client receives httpx_client parameter instead of individual headers/timeout parameters 4. Removed unused httpx import from main.py after inlining client creation All tests now pass with the new API. --- .../mcp_simple_auth_client/main.py | 2 -- tests/client/test_notification_response.py | 4 +-- tests/client/test_session_group.py | 35 +++++++++++++++++-- tests/shared/test_streamable_http.py | 12 ++++--- 4 files changed, 42 insertions(+), 11 deletions(-) diff --git a/examples/clients/simple-auth-client/mcp_simple_auth_client/main.py b/examples/clients/simple-auth-client/mcp_simple_auth_client/main.py index 4d413d54a6..346b866fe3 100644 --- a/examples/clients/simple-auth-client/mcp_simple_auth_client/main.py +++ b/examples/clients/simple-auth-client/mcp_simple_auth_client/main.py @@ -15,8 +15,6 @@ from typing import Any from urllib.parse import parse_qs, urlparse -import httpx - from mcp.client.auth import OAuthClientProvider, TokenStorage from mcp.client.session import ClientSession from mcp.client.sse import sse_client diff --git a/tests/client/test_notification_response.py b/tests/client/test_notification_response.py index 19d5374007..7500abee73 100644 --- a/tests/client/test_notification_response.py +++ b/tests/client/test_notification_response.py @@ -18,7 +18,7 @@ from starlette.routing import Route from mcp import ClientSession, types -from mcp.client.streamable_http import streamablehttp_client +from mcp.client.streamable_http import streamable_http_client from mcp.shared.session import RequestResponder from mcp.types import ClientNotification, RootsListChangedNotification from tests.test_helpers import wait_for_server @@ -127,7 +127,7 @@ async def message_handler( # pragma: no cover if isinstance(message, Exception): returned_exception = message - async with streamablehttp_client(server_url) as (read_stream, write_stream, _): + async with streamable_http_client(server_url) as (read_stream, write_stream, _): async with ClientSession( read_stream, write_stream, diff --git a/tests/client/test_session_group.py b/tests/client/test_session_group.py index 669865fd60..b3e20ef198 100644 --- a/tests/client/test_session_group.py +++ b/tests/client/test_session_group.py @@ -292,6 +292,20 @@ async def test_establish_session_parameterized( ): with mock.patch("mcp.client.session_group.mcp.ClientSession") as mock_ClientSession_class: with mock.patch(patch_target_for_client_func) as mock_specific_client_func: + # For streamablehttp, also need to mock create_mcp_http_client + if client_type_name == "streamablehttp": + mock_create_http_client = mock.patch("mcp.client.session_group.create_mcp_http_client") + mock_create_http_client_func = mock_create_http_client.start() + # Mock httpx_client returned by create_mcp_http_client + mock_httpx_client = mock.AsyncMock(name="MockHttpxClient") + mock_httpx_client.__aenter__.return_value = mock_httpx_client + mock_httpx_client.__aexit__ = mock.AsyncMock(return_value=None) + mock_create_http_client_func.return_value = mock_httpx_client + else: + mock_create_http_client = None + mock_create_http_client_func = None + mock_httpx_client = None + mock_client_cm_instance = mock.AsyncMock(name=f"{client_type_name}ClientCM") mock_read_stream = mock.AsyncMock(name=f"{client_type_name}Read") mock_write_stream = mock.AsyncMock(name=f"{client_type_name}Write") @@ -354,11 +368,22 @@ async def test_establish_session_parameterized( ) elif client_type_name == "streamablehttp": # pragma: no branch assert isinstance(server_params_instance, StreamableHttpParameters) + # Verify create_mcp_http_client was called with headers and timeout + import httpx + + assert mock_create_http_client_func is not None + expected_timeout = httpx.Timeout( + server_params_instance.timeout.total_seconds(), + read=server_params_instance.sse_read_timeout.total_seconds(), + ) + mock_create_http_client_func.assert_called_once_with( + headers=server_params_instance.headers, + timeout=expected_timeout, + ) + # Verify streamable_http_client was called with url, httpx_client, and terminate_on_close mock_specific_client_func.assert_called_once_with( url=server_params_instance.url, - headers=server_params_instance.headers, - timeout=server_params_instance.timeout, - sse_read_timeout=server_params_instance.sse_read_timeout, + httpx_client=mock_httpx_client, terminate_on_close=server_params_instance.terminate_on_close, ) @@ -382,3 +407,7 @@ async def test_establish_session_parameterized( # 3. Assert returned values assert returned_server_info is mock_initialize_result.serverInfo assert returned_session is mock_entered_session + + # Clean up streamablehttp-specific mock + if mock_create_http_client: + mock_create_http_client.stop() diff --git a/tests/shared/test_streamable_http.py b/tests/shared/test_streamable_http.py index 3650ffcac4..0418e79f9c 100644 --- a/tests/shared/test_streamable_http.py +++ b/tests/shared/test_streamable_http.py @@ -27,7 +27,6 @@ from mcp.client.session import ClientSession from mcp.client.streamable_http import StreamableHTTPTransport, streamable_http_client from mcp.server import Server -from mcp.shared._httpx_utils import create_mcp_http_client from mcp.server.streamable_http import ( MCP_PROTOCOL_VERSION_HEADER, MCP_SESSION_ID_HEADER, @@ -41,6 +40,7 @@ ) from mcp.server.streamable_http_manager import StreamableHTTPSessionManager from mcp.server.transport_security import TransportSecuritySettings +from mcp.shared._httpx_utils import create_mcp_http_client from mcp.shared.context import RequestContext from mcp.shared.exceptions import McpError from mcp.shared.message import ClientMessageMetadata, ServerMessageMetadata, SessionMessage @@ -1539,7 +1539,7 @@ async def test_streamablehttp_request_context_propagation(context_aware_server: "X-Trace-Id": "trace-123", } - async with httpx.AsyncClient(headers=custom_headers) as httpx_client: + async with create_mcp_http_client(headers=custom_headers) as httpx_client: async with streamable_http_client(f"{basic_server_url}/mcp", httpx_client=httpx_client) as ( read_stream, write_stream, @@ -1577,8 +1577,12 @@ async def test_streamablehttp_request_context_isolation(context_aware_server: No "Authorization": f"Bearer token-{i}", } - async with httpx.AsyncClient(headers=headers) as httpx_client: - async with streamable_http_client(f"{basic_server_url}/mcp", httpx_client=httpx_client) as (read_stream, write_stream, _): + async with create_mcp_http_client(headers=headers) as httpx_client: + async with streamable_http_client(f"{basic_server_url}/mcp", httpx_client=httpx_client) as ( + read_stream, + write_stream, + _, + ): async with ClientSession(read_stream, write_stream) as session: await session.initialize() From ac0e3dfda55f2cc484491d437956a59ccd3c6085 Mon Sep 17 00:00:00 2001 From: Felix Weinberger Date: Wed, 1 Oct 2025 19:45:46 +0100 Subject: [PATCH 06/13] Add missing imports to test_integration.py Added missing type annotation imports that were causing NameError and preventing test collection: - RequestResponder from mcp.shared.session - SessionMessage from mcp.shared.message - GetSessionIdCallback from mcp.client.streamable_http - RequestContext from mcp.shared.context This fixes 4 NameError collection failures and 10 F821 ruff errors, allowing all 20 tests in the file to be properly collected and executed. --- tests/server/fastmcp/test_integration.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/server/fastmcp/test_integration.py b/tests/server/fastmcp/test_integration.py index 887aa95887..70948bd7e2 100644 --- a/tests/server/fastmcp/test_integration.py +++ b/tests/server/fastmcp/test_integration.py @@ -34,7 +34,10 @@ ) from mcp.client.session import ClientSession from mcp.client.sse import sse_client -from mcp.client.streamable_http import streamable_http_client +from mcp.client.streamable_http import GetSessionIdCallback, streamable_http_client +from mcp.shared.context import RequestContext +from mcp.shared.message import SessionMessage +from mcp.shared.session import RequestResponder from mcp.types import ( ClientResult, CreateMessageRequestParams, From 0f373e41362dce3bdc0526513099c77fa1ad28dd Mon Sep 17 00:00:00 2001 From: Felix Weinberger Date: Wed, 1 Oct 2025 19:50:26 +0100 Subject: [PATCH 07/13] Add missing type annotations to test functions Restore type annotations on test function parameters in test_streamable_http.py that were accidentally removed during the function renaming from streamablehttp_client to streamable_http_client. Added type annotations to: - Fixture parameters: basic_server, basic_server_url, json_response_server, json_server_url, event_server, monkeypatch - Test function parameters: initialized_client_session This fixes all 61 pyright errors and ensures type safety matches the main branch standards. --- tests/shared/test_streamable_http.py | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/tests/shared/test_streamable_http.py b/tests/shared/test_streamable_http.py index 0418e79f9c..313137d178 100644 --- a/tests/shared/test_streamable_http.py +++ b/tests/shared/test_streamable_http.py @@ -987,7 +987,7 @@ async def initialized_client_session(basic_server: None, basic_server_url: str): @pytest.mark.anyio -async def test_streamable_http_client_basic_connection(basic_server, basic_server_url): +async def test_streamable_http_client_basic_connection(basic_server: None, basic_server_url: str): """Test basic client connection with initialization.""" async with streamable_http_client(f"{basic_server_url}/mcp") as ( read_stream, @@ -1005,7 +1005,7 @@ async def test_streamable_http_client_basic_connection(basic_server, basic_serve @pytest.mark.anyio -async def test_streamable_http_client_resource_read(initialized_client_session): +async def test_streamable_http_client_resource_read(initialized_client_session: ClientSession): """Test client resource read functionality.""" response = await initialized_client_session.read_resource(uri=AnyUrl("foobar://test-resource")) assert len(response.contents) == 1 @@ -1015,7 +1015,7 @@ async def test_streamable_http_client_resource_read(initialized_client_session): @pytest.mark.anyio -async def test_streamable_http_client_tool_invocation(initialized_client_session): +async def test_streamable_http_client_tool_invocation(initialized_client_session: ClientSession): """Test client tool invocation.""" # First list tools tools = await initialized_client_session.list_tools() @@ -1030,7 +1030,7 @@ async def test_streamable_http_client_tool_invocation(initialized_client_session @pytest.mark.anyio -async def test_streamable_http_client_error_handling(initialized_client_session): +async def test_streamable_http_client_error_handling(initialized_client_session: ClientSession): """Test error handling in client.""" with pytest.raises(McpError) as exc_info: await initialized_client_session.read_resource(uri=AnyUrl("unknown://test-error")) @@ -1039,7 +1039,7 @@ async def test_streamable_http_client_error_handling(initialized_client_session) @pytest.mark.anyio -async def test_streamable_http_client_session_persistence(basic_server, basic_server_url): +async def test_streamable_http_client_session_persistence(basic_server: None, basic_server_url: str): """Test that session ID persists across requests.""" async with streamable_http_client(f"{basic_server_url}/mcp") as ( read_stream, @@ -1067,7 +1067,7 @@ async def test_streamable_http_client_session_persistence(basic_server, basic_se @pytest.mark.anyio -async def test_streamable_http_client_json_response(json_response_server, json_server_url): +async def test_streamable_http_client_json_response(json_response_server: None, json_server_url: str): """Test client with JSON response mode.""" async with streamable_http_client(f"{json_server_url}/mcp") as ( read_stream, @@ -1095,7 +1095,7 @@ async def test_streamable_http_client_json_response(json_response_server, json_s @pytest.mark.anyio -async def test_streamable_http_client_get_stream(basic_server, basic_server_url): +async def test_streamable_http_client_get_stream(basic_server: None, basic_server_url: str): """Test GET stream functionality for server-initiated messages.""" import mcp.types as types @@ -1135,7 +1135,7 @@ async def message_handler( # pragma: no branch @pytest.mark.anyio -async def test_streamable_http_client_session_termination(basic_server, basic_server_url): +async def test_streamable_http_client_session_termination(basic_server: None, basic_server_url: str): """Test client session termination functionality.""" captured_session_id = None @@ -1177,7 +1177,9 @@ async def test_streamable_http_client_session_termination(basic_server, basic_se @pytest.mark.anyio -async def test_streamable_http_client_session_termination_204(basic_server, basic_server_url, monkeypatch): +async def test_streamable_http_client_session_termination_204( + basic_server: None, basic_server_url: str, monkeypatch: pytest.MonkeyPatch +): """Test client session termination functionality with a 204 response. This test patches the httpx client to return a 204 response for DELETEs. @@ -1242,7 +1244,7 @@ async def mock_delete(self: httpx.AsyncClient, *args: Any, **kwargs: Any) -> htt @pytest.mark.anyio -async def test_streamable_http_client_resumption(event_server): +async def test_streamable_http_client_resumption(event_server: tuple[SimpleEventStore, str]): """Test client session resumption using sync primitives for reliable coordination.""" _, server_url = event_server From 80a48bf10784732387f5944ee539fe800abcace0 Mon Sep 17 00:00:00 2001 From: Felix Weinberger Date: Wed, 1 Oct 2025 20:00:15 +0100 Subject: [PATCH 08/13] Simplify test_session_group.py mocking Remove complex mocking of create_mcp_http_client in the streamablehttp test case. Instead, let the real create_mcp_http_client execute and only verify that streamable_http_client receives the correct parameters including a real httpx.AsyncClient instance. This simplifies the test by: - Removing 13 lines of mock setup code - Removing 14 lines of mock verification code - Removing 3 lines of mock cleanup code - Trusting that create_mcp_http_client works (it has its own tests) The test now focuses on verifying the integration between session_group and streamable_http_client rather than re-testing create_mcp_http_client. --- tests/client/test_session_group.py | 40 +++++------------------------- 1 file changed, 6 insertions(+), 34 deletions(-) diff --git a/tests/client/test_session_group.py b/tests/client/test_session_group.py index b3e20ef198..53a954a022 100644 --- a/tests/client/test_session_group.py +++ b/tests/client/test_session_group.py @@ -292,20 +292,6 @@ async def test_establish_session_parameterized( ): with mock.patch("mcp.client.session_group.mcp.ClientSession") as mock_ClientSession_class: with mock.patch(patch_target_for_client_func) as mock_specific_client_func: - # For streamablehttp, also need to mock create_mcp_http_client - if client_type_name == "streamablehttp": - mock_create_http_client = mock.patch("mcp.client.session_group.create_mcp_http_client") - mock_create_http_client_func = mock_create_http_client.start() - # Mock httpx_client returned by create_mcp_http_client - mock_httpx_client = mock.AsyncMock(name="MockHttpxClient") - mock_httpx_client.__aenter__.return_value = mock_httpx_client - mock_httpx_client.__aexit__ = mock.AsyncMock(return_value=None) - mock_create_http_client_func.return_value = mock_httpx_client - else: - mock_create_http_client = None - mock_create_http_client_func = None - mock_httpx_client = None - mock_client_cm_instance = mock.AsyncMock(name=f"{client_type_name}ClientCM") mock_read_stream = mock.AsyncMock(name=f"{client_type_name}Read") mock_write_stream = mock.AsyncMock(name=f"{client_type_name}Write") @@ -368,24 +354,14 @@ async def test_establish_session_parameterized( ) elif client_type_name == "streamablehttp": # pragma: no branch assert isinstance(server_params_instance, StreamableHttpParameters) - # Verify create_mcp_http_client was called with headers and timeout + # Verify streamable_http_client was called with url, httpx_client, and terminate_on_close + # The httpx_client is created by the real create_mcp_http_client import httpx - assert mock_create_http_client_func is not None - expected_timeout = httpx.Timeout( - server_params_instance.timeout.total_seconds(), - read=server_params_instance.sse_read_timeout.total_seconds(), - ) - mock_create_http_client_func.assert_called_once_with( - headers=server_params_instance.headers, - timeout=expected_timeout, - ) - # Verify streamable_http_client was called with url, httpx_client, and terminate_on_close - mock_specific_client_func.assert_called_once_with( - url=server_params_instance.url, - httpx_client=mock_httpx_client, - terminate_on_close=server_params_instance.terminate_on_close, - ) + call_args = mock_specific_client_func.call_args + assert call_args.kwargs["url"] == server_params_instance.url + assert call_args.kwargs["terminate_on_close"] == server_params_instance.terminate_on_close + assert isinstance(call_args.kwargs["httpx_client"], httpx.AsyncClient) mock_client_cm_instance.__aenter__.assert_awaited_once() @@ -407,7 +383,3 @@ async def test_establish_session_parameterized( # 3. Assert returned values assert returned_server_info is mock_initialize_result.serverInfo assert returned_session is mock_entered_session - - # Clean up streamablehttp-specific mock - if mock_create_http_client: - mock_create_http_client.stop() From e0f780724513b5b6ccb9eb36ede2cf3af6d39a33 Mon Sep 17 00:00:00 2001 From: Felix Weinberger Date: Mon, 13 Oct 2025 14:12:18 +0100 Subject: [PATCH 09/13] refactor: Address API design improvements in StreamableHTTP client This commit addresses two API design concerns: 1. Remove private module usage in examples: Examples no longer import from the private mcp.shared._httpx_utils module. Instead, they create httpx clients directly using the public httpx library. 2. Rename httpx_client parameter to http_client: The 'httpx_client' parameter name was redundant since the type annotation already specifies it's an httpx.AsyncClient. Renaming to 'http_client' provides a cleaner, more concise API. Changes: - Updated oauth_client.py and simple-auth-client examples to use public APIs - Renamed httpx_client to http_client in function signatures - Updated all internal callers and tests - Updated deprecated streamablehttp_client wrapper function --- README.md | 6 +++--- .../simple-auth-client/mcp_simple_auth_client/main.py | 6 +++--- examples/snippets/clients/oauth_client.py | 6 +++--- src/mcp/client/session_group.py | 2 +- src/mcp/client/streamable_http.py | 10 +++++----- tests/client/test_session_group.py | 4 ++-- tests/shared/test_streamable_http.py | 6 +++--- 7 files changed, 20 insertions(+), 20 deletions(-) diff --git a/README.md b/README.md index f8ac1fe13f..e7a6e955b9 100644 --- a/README.md +++ b/README.md @@ -2370,12 +2370,12 @@ cd to the `examples/snippets` directory and run: import asyncio from urllib.parse import parse_qs, urlparse +import httpx from pydantic import AnyUrl from mcp import ClientSession from mcp.client.auth import OAuthClientProvider, TokenStorage from mcp.client.streamable_http import streamable_http_client -from mcp.shared._httpx_utils import create_mcp_http_client from mcp.shared.auth import OAuthClientInformationFull, OAuthClientMetadata, OAuthToken @@ -2429,8 +2429,8 @@ async def main(): callback_handler=handle_callback, ) - async with create_mcp_http_client(auth=oauth_auth) as custom_client: - async with streamable_http_client("http://localhost:8001/mcp", httpx_client=custom_client) as (read, write, _): + async with httpx.AsyncClient(auth=oauth_auth, follow_redirects=True) as custom_client: + async with streamable_http_client("http://localhost:8001/mcp", http_client=custom_client) as (read, write, _): async with ClientSession(read, write) as session: await session.initialize() diff --git a/examples/clients/simple-auth-client/mcp_simple_auth_client/main.py b/examples/clients/simple-auth-client/mcp_simple_auth_client/main.py index 346b866fe3..a88c4ea6b6 100644 --- a/examples/clients/simple-auth-client/mcp_simple_auth_client/main.py +++ b/examples/clients/simple-auth-client/mcp_simple_auth_client/main.py @@ -15,11 +15,11 @@ from typing import Any from urllib.parse import parse_qs, urlparse +import httpx from mcp.client.auth import OAuthClientProvider, TokenStorage from mcp.client.session import ClientSession from mcp.client.sse import sse_client from mcp.client.streamable_http import streamable_http_client -from mcp.shared._httpx_utils import create_mcp_http_client from mcp.shared.auth import OAuthClientInformationFull, OAuthClientMetadata, OAuthToken @@ -212,10 +212,10 @@ async def _default_redirect_handler(authorization_url: str) -> None: await self._run_session(read_stream, write_stream, None) else: print("šŸ“” Opening StreamableHTTP transport connection with auth...") - async with create_mcp_http_client(auth=oauth_auth) as custom_client: + async with httpx.AsyncClient(auth=oauth_auth, follow_redirects=True) as custom_client: async with streamable_http_client( url=self.server_url, - httpx_client=custom_client, + http_client=custom_client, ) as (read_stream, write_stream, get_session_id): await self._run_session(read_stream, write_stream, get_session_id) diff --git a/examples/snippets/clients/oauth_client.py b/examples/snippets/clients/oauth_client.py index 68ecf1a7d6..140b38aedb 100644 --- a/examples/snippets/clients/oauth_client.py +++ b/examples/snippets/clients/oauth_client.py @@ -10,12 +10,12 @@ import asyncio from urllib.parse import parse_qs, urlparse +import httpx from pydantic import AnyUrl from mcp import ClientSession from mcp.client.auth import OAuthClientProvider, TokenStorage from mcp.client.streamable_http import streamable_http_client -from mcp.shared._httpx_utils import create_mcp_http_client from mcp.shared.auth import OAuthClientInformationFull, OAuthClientMetadata, OAuthToken @@ -69,8 +69,8 @@ async def main(): callback_handler=handle_callback, ) - async with create_mcp_http_client(auth=oauth_auth) as custom_client: - async with streamable_http_client("http://localhost:8001/mcp", httpx_client=custom_client) as (read, write, _): + async with httpx.AsyncClient(auth=oauth_auth, follow_redirects=True) as custom_client: + async with streamable_http_client("http://localhost:8001/mcp", http_client=custom_client) as (read, write, _): async with ClientSession(read, write) as session: await session.initialize() diff --git a/src/mcp/client/session_group.py b/src/mcp/client/session_group.py index aaf55e0c0a..f82677d27c 100644 --- a/src/mcp/client/session_group.py +++ b/src/mcp/client/session_group.py @@ -322,7 +322,7 @@ async def _establish_session( client = streamable_http_client( url=server_params.url, - httpx_client=httpx_client, + http_client=httpx_client, terminate_on_close=server_params.terminate_on_close, ) read, write, _ = await session_stack.enter_async_context(client) diff --git a/src/mcp/client/streamable_http.py b/src/mcp/client/streamable_http.py index e2590cc653..b1e4e9ab62 100644 --- a/src/mcp/client/streamable_http.py +++ b/src/mcp/client/streamable_http.py @@ -570,7 +570,7 @@ def get_session_id(self) -> str | None: async def streamable_http_client( url: str, *, - httpx_client: httpx.AsyncClient | None = None, + http_client: httpx.AsyncClient | None = None, terminate_on_close: bool = True, ) -> AsyncGenerator[ tuple[ @@ -585,7 +585,7 @@ async def streamable_http_client( Args: url: The MCP server endpoint URL. - httpx_client: Optional pre-configured httpx.AsyncClient. If None, a default + http_client: Optional pre-configured httpx.AsyncClient. If None, a default client with recommended MCP timeouts will be created. To configure headers, authentication, or other HTTP settings, create an httpx.AsyncClient and pass it here. terminate_on_close: If True, send a DELETE request to terminate the session @@ -604,8 +604,8 @@ async def streamable_http_client( write_stream, write_stream_reader = anyio.create_memory_object_stream[SessionMessage](0) # Determine if we need to create and manage the client - client_provided = httpx_client is not None - client = httpx_client + client_provided = http_client is not None + client = http_client if client is None: # Create default client with recommended MCP timeouts @@ -697,7 +697,7 @@ async def streamablehttp_client( async with client: async with streamable_http_client( url, - httpx_client=client, + http_client=client, terminate_on_close=terminate_on_close, ) as streams: yield streams diff --git a/tests/client/test_session_group.py b/tests/client/test_session_group.py index 53a954a022..755c613d9b 100644 --- a/tests/client/test_session_group.py +++ b/tests/client/test_session_group.py @@ -355,13 +355,13 @@ async def test_establish_session_parameterized( elif client_type_name == "streamablehttp": # pragma: no branch assert isinstance(server_params_instance, StreamableHttpParameters) # Verify streamable_http_client was called with url, httpx_client, and terminate_on_close - # The httpx_client is created by the real create_mcp_http_client + # The http_client is created by the real create_mcp_http_client import httpx call_args = mock_specific_client_func.call_args assert call_args.kwargs["url"] == server_params_instance.url assert call_args.kwargs["terminate_on_close"] == server_params_instance.terminate_on_close - assert isinstance(call_args.kwargs["httpx_client"], httpx.AsyncClient) + assert isinstance(call_args.kwargs["http_client"], httpx.AsyncClient) mock_client_cm_instance.__aenter__.assert_awaited_once() diff --git a/tests/shared/test_streamable_http.py b/tests/shared/test_streamable_http.py index 313137d178..102bd9b7ac 100644 --- a/tests/shared/test_streamable_http.py +++ b/tests/shared/test_streamable_http.py @@ -1329,7 +1329,7 @@ async def run_tool(): headers[MCP_PROTOCOL_VERSION_HEADER] = captured_protocol_version async with create_mcp_http_client(headers=headers) as httpx_client: - async with streamable_http_client(f"{server_url}/mcp", httpx_client=httpx_client) as ( + async with streamable_http_client(f"{server_url}/mcp", http_client=httpx_client) as ( read_stream, write_stream, _, @@ -1542,7 +1542,7 @@ async def test_streamablehttp_request_context_propagation(context_aware_server: } async with create_mcp_http_client(headers=custom_headers) as httpx_client: - async with streamable_http_client(f"{basic_server_url}/mcp", httpx_client=httpx_client) as ( + async with streamable_http_client(f"{basic_server_url}/mcp", http_client=httpx_client) as ( read_stream, write_stream, _, @@ -1580,7 +1580,7 @@ async def test_streamablehttp_request_context_isolation(context_aware_server: No } async with create_mcp_http_client(headers=headers) as httpx_client: - async with streamable_http_client(f"{basic_server_url}/mcp", httpx_client=httpx_client) as ( + async with streamable_http_client(f"{basic_server_url}/mcp", http_client=httpx_client) as ( read_stream, write_stream, _, From 5004eda88b9d6bb3f0418224b15f481895ff572b Mon Sep 17 00:00:00 2001 From: Felix Weinberger Date: Thu, 23 Oct 2025 14:28:27 -0700 Subject: [PATCH 10/13] refactor: Remove header mutation in streamable_http_client Remove client.headers.update() call that was unnecessarily mutating user-provided httpx.AsyncClient instances. The mutation was defensive but unnecessary since: 1. All transport methods pass headers explicitly to httpx requests 2. httpx merges request headers with client defaults, with request headers taking precedence 3. HTTP requests are identical with or without the mutation 4. Not mutating respects user's client object integrity Add comprehensive test coverage for header behavior: - Verify client headers are not mutated after use - Verify MCP protocol headers override httpx defaults in requests - Verify custom and MCP headers coexist correctly in requests All existing tests pass, confirming no behavior change to actual HTTP requests. --- src/mcp/client/streamable_http.py | 3 - tests/shared/test_streamable_http.py | 137 ++++++++++++++++++++++++--- 2 files changed, 123 insertions(+), 17 deletions(-) diff --git a/src/mcp/client/streamable_http.py b/src/mcp/client/streamable_http.py index b1e4e9ab62..c4c7e50b88 100644 --- a/src/mcp/client/streamable_http.py +++ b/src/mcp/client/streamable_http.py @@ -622,9 +622,6 @@ async def streamable_http_client( # Create transport with extracted configuration transport = StreamableHTTPTransport(url, headers_dict, timeout, sse_read_timeout, auth) - # Sync client headers with transport's merged headers (includes MCP protocol requirements) - client.headers.update(transport.request_headers) - async with anyio.create_task_group() as tg: try: logger.debug(f"Connecting to StreamableHTTP endpoint: {url}") diff --git a/tests/shared/test_streamable_http.py b/tests/shared/test_streamable_http.py index 102bd9b7ac..7de3ed80ca 100644 --- a/tests/shared/test_streamable_http.py +++ b/tests/shared/test_streamable_http.py @@ -1891,7 +1891,7 @@ async def test_close_sse_stream_callback_not_provided_for_old_protocol_version() @pytest.mark.anyio -async def test_streamablehttp_client_receives_priming_event( +async def test_streamable_http_client_receives_priming_event( event_server: tuple[SimpleEventStore, str], ) -> None: """Client should receive priming event (resumption token update) on POST SSE stream.""" @@ -1902,7 +1902,7 @@ async def test_streamablehttp_client_receives_priming_event( async def on_resumption_token_update(token: str) -> None: captured_resumption_tokens.append(token) - async with streamablehttp_client(f"{server_url}/mcp") as ( + async with streamable_http_client(f"{server_url}/mcp") as ( read_stream, write_stream, _, @@ -1943,7 +1943,7 @@ async def test_server_close_sse_stream_via_context( """Server tool can call ctx.close_sse_stream() to close connection.""" _, server_url = event_server - async with streamablehttp_client(f"{server_url}/mcp") as ( + async with streamable_http_client(f"{server_url}/mcp") as ( read_stream, write_stream, _, @@ -1964,7 +1964,7 @@ async def test_server_close_sse_stream_via_context( @pytest.mark.anyio -async def test_streamablehttp_client_auto_reconnects( +async def test_streamable_http_client_auto_reconnects( event_server: tuple[SimpleEventStore, str], ) -> None: """Client should auto-reconnect with Last-Event-ID when server closes after priming event.""" @@ -1980,7 +1980,7 @@ async def message_handler( if isinstance(message.root, types.LoggingMessageNotification): # pragma: no branch captured_notifications.append(str(message.root.params.data)) - async with streamablehttp_client(f"{server_url}/mcp") as ( + async with streamable_http_client(f"{server_url}/mcp") as ( read_stream, write_stream, _, @@ -2009,13 +2009,13 @@ async def message_handler( @pytest.mark.anyio -async def test_streamablehttp_client_respects_retry_interval( +async def test_streamable_http_client_respects_retry_interval( event_server: tuple[SimpleEventStore, str], ) -> None: """Client MUST respect retry field, waiting specified ms before reconnecting.""" _, server_url = event_server - async with streamablehttp_client(f"{server_url}/mcp") as ( + async with streamable_http_client(f"{server_url}/mcp") as ( read_stream, write_stream, _, @@ -2040,7 +2040,7 @@ async def test_streamablehttp_client_respects_retry_interval( @pytest.mark.anyio -async def test_streamablehttp_sse_polling_full_cycle( +async def test_streamable_http_sse_polling_full_cycle( event_server: tuple[SimpleEventStore, str], ) -> None: """End-to-end test: server closes stream, client reconnects, receives all events.""" @@ -2056,7 +2056,7 @@ async def message_handler( if isinstance(message.root, types.LoggingMessageNotification): # pragma: no branch all_notifications.append(str(message.root.params.data)) - async with streamablehttp_client(f"{server_url}/mcp") as ( + async with streamable_http_client(f"{server_url}/mcp") as ( read_stream, write_stream, _, @@ -2088,7 +2088,7 @@ async def message_handler( @pytest.mark.anyio -async def test_streamablehttp_events_replayed_after_disconnect( +async def test_streamable_http_events_replayed_after_disconnect( event_server: tuple[SimpleEventStore, str], ) -> None: """Events sent while client is disconnected should be replayed on reconnect.""" @@ -2104,7 +2104,7 @@ async def message_handler( if isinstance(message.root, types.LoggingMessageNotification): # pragma: no branch notification_data.append(str(message.root.params.data)) - async with streamablehttp_client(f"{server_url}/mcp") as ( + async with streamable_http_client(f"{server_url}/mcp") as ( read_stream, write_stream, _, @@ -2136,7 +2136,7 @@ async def message_handler( @pytest.mark.anyio -async def test_streamablehttp_multiple_reconnections( +async def test_streamable_http_multiple_reconnections( event_server: tuple[SimpleEventStore, str], ): """Verify multiple close_sse_stream() calls each trigger a client reconnect. @@ -2156,7 +2156,7 @@ async def test_streamablehttp_multiple_reconnections( async def on_resumption_token(token: str) -> None: resumption_tokens.append(token) - async with streamablehttp_client(f"{server_url}/mcp") as (read_stream, write_stream, _): + async with streamable_http_client(f"{server_url}/mcp") as (read_stream, write_stream, _): async with ClientSession(read_stream, write_stream) as session: await session.initialize() @@ -2216,7 +2216,7 @@ async def message_handler( if isinstance(message.root, types.ResourceUpdatedNotification): # pragma: no branch received_notifications.append(str(message.root.params.uri)) - async with streamablehttp_client(f"{server_url}/mcp") as ( + async with streamable_http_client(f"{server_url}/mcp") as ( read_stream, write_stream, _, @@ -2247,3 +2247,112 @@ async def message_handler( assert "http://notification_2/" in received_notifications, ( f"Should receive notification 2 after reconnect, got: {received_notifications}" ) + + +@pytest.mark.anyio +async def test_streamable_http_client_does_not_mutate_provided_client( + basic_server: None, basic_server_url: str +) -> None: + """Test that streamable_http_client does not mutate the provided httpx client's headers.""" + # Create a client with custom headers + original_headers = { + "X-Custom-Header": "custom-value", + "Authorization": "Bearer test-token", + } + + async with httpx.AsyncClient(headers=original_headers, follow_redirects=True) as custom_client: + # Use the client with streamable_http_client + async with streamable_http_client(f"{basic_server_url}/mcp", http_client=custom_client) as ( + read_stream, + write_stream, + _, + ): + async with ClientSession(read_stream, write_stream) as session: + result = await session.initialize() + assert isinstance(result, InitializeResult) + + # Verify client headers were not mutated with MCP protocol headers + # If accept header exists, it should still be httpx default, not MCP's + if "accept" in custom_client.headers: # pragma: no branch + assert custom_client.headers.get("accept") == "*/*" + # MCP content-type should not have been added + assert custom_client.headers.get("content-type") != "application/json" + + # Verify custom headers are still present and unchanged + assert custom_client.headers.get("X-Custom-Header") == "custom-value" + assert custom_client.headers.get("Authorization") == "Bearer test-token" + + +@pytest.mark.anyio +async def test_streamable_http_client_mcp_headers_override_defaults( + context_aware_server: None, basic_server_url: str +) -> None: + """Test that MCP protocol headers override httpx.AsyncClient default headers.""" + # httpx.AsyncClient has default "accept: */*" header + # We need to verify that our MCP accept header overrides it in actual requests + + async with httpx.AsyncClient(follow_redirects=True) as client: + # Verify client has default accept header + assert client.headers.get("accept") == "*/*" + + async with streamable_http_client(f"{basic_server_url}/mcp", http_client=client) as ( + read_stream, + write_stream, + _, + ): + async with ClientSession(read_stream, write_stream) as session: + await session.initialize() + + # Use echo_headers tool to see what headers the server actually received + tool_result = await session.call_tool("echo_headers", {}) + assert len(tool_result.content) == 1 + assert isinstance(tool_result.content[0], TextContent) + headers_data = json.loads(tool_result.content[0].text) + + # Verify MCP protocol headers were sent (not httpx defaults) + assert "accept" in headers_data + assert "application/json" in headers_data["accept"] + assert "text/event-stream" in headers_data["accept"] + + assert "content-type" in headers_data + assert headers_data["content-type"] == "application/json" + + +@pytest.mark.anyio +async def test_streamable_http_client_preserves_custom_with_mcp_headers( + context_aware_server: None, basic_server_url: str +) -> None: + """Test that both custom headers and MCP protocol headers are sent in requests.""" + custom_headers = { + "X-Custom-Header": "custom-value", + "X-Request-Id": "req-123", + "Authorization": "Bearer test-token", + } + + async with httpx.AsyncClient(headers=custom_headers, follow_redirects=True) as client: + async with streamable_http_client(f"{basic_server_url}/mcp", http_client=client) as ( + read_stream, + write_stream, + _, + ): + async with ClientSession(read_stream, write_stream) as session: + await session.initialize() + + # Use echo_headers tool to verify both custom and MCP headers are present + tool_result = await session.call_tool("echo_headers", {}) + assert len(tool_result.content) == 1 + assert isinstance(tool_result.content[0], TextContent) + headers_data = json.loads(tool_result.content[0].text) + + # Verify custom headers are present + assert headers_data.get("x-custom-header") == "custom-value" + assert headers_data.get("x-request-id") == "req-123" + assert headers_data.get("authorization") == "Bearer test-token" + + # Verify MCP protocol headers are also present + assert "accept" in headers_data + assert "application/json" in headers_data["accept"] + assert "text/event-stream" in headers_data["accept"] + + assert "content-type" in headers_data + assert headers_data["content-type"] == "application/json" From 862c22fec7e6a459a6cb2f76f37f0eae336203a2 Mon Sep 17 00:00:00 2001 From: Marcelo Trylesinski Date: Tue, 28 Oct 2025 16:17:37 +0100 Subject: [PATCH 11/13] Add overload for deprecation --- src/mcp/client/streamable_http.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/src/mcp/client/streamable_http.py b/src/mcp/client/streamable_http.py index c4c7e50b88..181625ea3d 100644 --- a/src/mcp/client/streamable_http.py +++ b/src/mcp/client/streamable_http.py @@ -12,6 +12,8 @@ from contextlib import asynccontextmanager from dataclasses import dataclass from datetime import timedelta +from typing import Any, overload +from warnings import warn import anyio import httpx @@ -85,6 +87,20 @@ class RequestContext: class StreamableHTTPTransport: """StreamableHTTP client transport implementation.""" + @overload + def __init__(self, url: str) -> None: ... + + @deprecated("Those parameters are deprecated. Use the url parameter instead.") + @overload + def __init__( + self, + url: str, + headers: dict[str, str] | None = None, + timeout: float | timedelta = 30, + sse_read_timeout: float | timedelta = 60 * 5, + auth: httpx.Auth | None = None, + ) -> None: ... + def __init__( self, url: str, @@ -92,6 +108,7 @@ def __init__( timeout: float | timedelta = 30, sse_read_timeout: float | timedelta = 60 * 5, auth: httpx.Auth | None = None, + **deprecated: dict[str, Any], ) -> None: """Initialize the StreamableHTTP transport. @@ -102,6 +119,8 @@ def __init__( sse_read_timeout: Timeout for SSE read operations. auth: Optional HTTPX authentication handler. """ + if deprecated: + warn(f"Deprecated parameters: {deprecated}", DeprecationWarning) self.url = url self.headers = headers or {} self.timeout = timeout.total_seconds() if isinstance(timeout, timedelta) else timeout From d64d191b6aaf59dbafb5f8d7015a77fb8f8069b5 Mon Sep 17 00:00:00 2001 From: Felix Weinberger Date: Tue, 4 Nov 2025 11:33:00 +0000 Subject: [PATCH 12/13] Fix StreamableHTTP transport API for backwards compatibility and cleaner header handling This commit addresses three issues identified in PR review: 1. **Restore RequestContext fields for backwards compatibility** - Re-add `headers` and `sse_read_timeout` fields as optional with None defaults - Mark them as deprecated in docstring since they're no longer used internally - Prevents breaking changes for any code accessing these fields 2. **Add runtime deprecation warnings for StreamableHTTPTransport constructor** - Use sentinel value pattern to detect when deprecated parameters are passed - Issue DeprecationWarning at runtime when headers, timeout, sse_read_timeout, or auth are provided - Complements existing @deprecated decorator for type checkers with actual runtime warnings - Improve deprecation message clarity 3. **Simplify header handling by removing redundant client parameter** - Remove `client` parameter from `_prepare_headers()` method - Stop extracting and re-passing client.headers since httpx automatically merges them - Only build MCP-specific headers (Accept, Content-Type, session headers) - httpx merges these with client.headers automatically, with our headers taking precedence - Reduces code complexity and eliminates unnecessary header extraction The header handling change leverages httpx's built-in header merging behavior, similar to how headers were handled before the refactoring but without the redundant extraction-and-repass pattern. --- src/mcp/client/streamable_http.py | 85 ++++++++++++++++------------ tests/client/test_http_unicode.py | 6 +- tests/shared/test_streamable_http.py | 40 ++++++++++++- 3 files changed, 92 insertions(+), 39 deletions(-) diff --git a/src/mcp/client/streamable_http.py b/src/mcp/client/streamable_http.py index 181625ea3d..a414d6a85b 100644 --- a/src/mcp/client/streamable_http.py +++ b/src/mcp/client/streamable_http.py @@ -62,6 +62,9 @@ JSON = "application/json" SSE = "text/event-stream" +# Sentinel value for detecting unset optional parameters +_UNSET = object() + class StreamableHTTPError(Exception): """Base exception for StreamableHTTP transport errors.""" @@ -81,7 +84,8 @@ class RequestContext: session_message: SessionMessage metadata: ClientMessageMetadata | None read_stream_writer: StreamWriter - sse_read_timeout: float + headers: dict[str, str] | None = None # Deprecated - no longer used + sse_read_timeout: float | None = None # Deprecated - no longer used class StreamableHTTPTransport: @@ -90,8 +94,11 @@ class StreamableHTTPTransport: @overload def __init__(self, url: str) -> None: ... - @deprecated("Those parameters are deprecated. Use the url parameter instead.") @overload + @deprecated( + "Parameters headers, timeout, sse_read_timeout, and auth are deprecated. " + "Configure these on the httpx.AsyncClient instead." + ) def __init__( self, url: str, @@ -104,11 +111,10 @@ def __init__( def __init__( self, url: str, - headers: dict[str, str] | None = None, - timeout: float | timedelta = 30, - sse_read_timeout: float | timedelta = 60 * 5, - auth: httpx.Auth | None = None, - **deprecated: dict[str, Any], + headers: Any = _UNSET, + timeout: Any = _UNSET, + sse_read_timeout: Any = _UNSET, + auth: Any = _UNSET, ) -> None: """Initialize the StreamableHTTP transport. @@ -119,26 +125,40 @@ def __init__( sse_read_timeout: Timeout for SSE read operations. auth: Optional HTTPX authentication handler. """ - if deprecated: - warn(f"Deprecated parameters: {deprecated}", DeprecationWarning) + # Check for deprecated parameters and issue runtime warning + deprecated_params: list[str] = [] + if headers is not _UNSET: + deprecated_params.append("headers") + if timeout is not _UNSET: + deprecated_params.append("timeout") + if sse_read_timeout is not _UNSET: + deprecated_params.append("sse_read_timeout") + if auth is not _UNSET: + deprecated_params.append("auth") + + if deprecated_params: + warn( + f"Parameters {', '.join(deprecated_params)} are deprecated and will be ignored. " + "Configure these on the httpx.AsyncClient instead.", + DeprecationWarning, + stacklevel=2, + ) + self.url = url - self.headers = headers or {} - self.timeout = timeout.total_seconds() if isinstance(timeout, timedelta) else timeout - self.sse_read_timeout = ( - sse_read_timeout.total_seconds() if isinstance(sse_read_timeout, timedelta) else sse_read_timeout - ) - self.auth = auth self.session_id = None self.protocol_version = None - self.request_headers = { - **self.headers, - ACCEPT: f"{JSON}, {SSE}", - CONTENT_TYPE: JSON, - } - - def _prepare_request_headers(self, base_headers: dict[str, str]) -> dict[str, str]: - """Update headers with session ID and protocol version if available.""" - headers = base_headers.copy() + + def _prepare_headers(self) -> dict[str, str]: + """Build MCP-specific request headers. + + These headers will be merged with the httpx.AsyncClient's default headers, + with these MCP-specific headers taking precedence. + """ + headers: dict[str, str] = {} + # Add MCP protocol headers + headers[ACCEPT] = f"{JSON}, {SSE}" + headers[CONTENT_TYPE] = JSON + # Add session headers if available if self.session_id: headers[MCP_SESSION_ID] = self.session_id if self.protocol_version: @@ -242,7 +262,7 @@ async def handle_get_stream( if not self.session_id: return - headers = self._prepare_request_headers(self.request_headers) + headers = self._prepare_headers() if last_event_id: headers[LAST_EVENT_ID] = last_event_id # pragma: no cover @@ -251,7 +271,6 @@ async def handle_get_stream( "GET", self.url, headers=headers, - timeout=httpx.Timeout(self.timeout, read=self.sse_read_timeout), ) as event_source: event_source.response.raise_for_status() logger.debug("GET SSE connection established") @@ -284,7 +303,7 @@ async def handle_get_stream( async def _handle_resumption_request(self, ctx: RequestContext) -> None: """Handle a resumption request using GET with SSE.""" - headers = self._prepare_request_headers(ctx.headers) + headers = self._prepare_headers() if ctx.metadata and ctx.metadata.resumption_token: headers[LAST_EVENT_ID] = ctx.metadata.resumption_token else: @@ -300,7 +319,6 @@ async def _handle_resumption_request(self, ctx: RequestContext) -> None: "GET", self.url, headers=headers, - timeout=httpx.Timeout(self.timeout, read=self.sse_read_timeout), ) as event_source: event_source.response.raise_for_status() logger.debug("Resumption GET SSE connection established") @@ -318,7 +336,7 @@ async def _handle_resumption_request(self, ctx: RequestContext) -> None: async def _handle_post_request(self, ctx: RequestContext) -> None: """Handle a POST request with response processing.""" - headers = self._prepare_request_headers(ctx.headers) + headers = self._prepare_headers() message = ctx.session_message.message is_initialization = self._is_initialization_request(message) @@ -436,7 +454,7 @@ async def _handle_reconnection( delay_ms = retry_interval_ms if retry_interval_ms is not None else DEFAULT_RECONNECTION_DELAY_MS await anyio.sleep(delay_ms / 1000.0) - headers = self._prepare_request_headers(ctx.headers) + headers = self._prepare_headers() headers[LAST_EVENT_ID] = last_event_id # Extract original request ID to map responses @@ -450,7 +468,6 @@ async def _handle_reconnection( "GET", self.url, headers=headers, - timeout=httpx.Timeout(self.timeout, read=self.sse_read_timeout), ) as event_source: event_source.response.raise_for_status() logger.info("Reconnected to SSE stream") @@ -538,12 +555,10 @@ async def post_writer( ctx = RequestContext( client=client, - headers=self.request_headers, session_id=self.session_id, session_message=session_message, metadata=metadata, read_stream_writer=read_stream_writer, - sse_read_timeout=self.sse_read_timeout, ) async def handle_request_async(): @@ -570,7 +585,7 @@ async def terminate_session(self, client: httpx.AsyncClient) -> None: # pragma: return try: - headers = self._prepare_request_headers(self.request_headers) + headers = self._prepare_headers() response = await client.delete(self.url, headers=headers) if response.status_code == 405: @@ -678,8 +693,8 @@ def start_get_stream() -> None: await write_stream.aclose() -@deprecated("Use `streamable_http_client` instead.") @asynccontextmanager +@deprecated("Use `streamable_http_client` instead.") async def streamablehttp_client( url: str, headers: dict[str, str] | None = None, diff --git a/tests/client/test_http_unicode.py b/tests/client/test_http_unicode.py index 95e01ce577..ec38f35838 100644 --- a/tests/client/test_http_unicode.py +++ b/tests/client/test_http_unicode.py @@ -12,7 +12,7 @@ import pytest from mcp.client.session import ClientSession -from mcp.client.streamable_http import streamablehttp_client +from mcp.client.streamable_http import streamable_http_client from tests.test_helpers import wait_for_server # Test constants with various Unicode characters @@ -178,7 +178,7 @@ async def test_streamable_http_client_unicode_tool_call(running_unicode_server: base_url = running_unicode_server endpoint_url = f"{base_url}/mcp" - async with streamablehttp_client(endpoint_url) as (read_stream, write_stream, _get_session_id): + async with streamable_http_client(endpoint_url) as (read_stream, write_stream, _get_session_id): async with ClientSession(read_stream, write_stream) as session: await session.initialize() @@ -210,7 +210,7 @@ async def test_streamable_http_client_unicode_prompts(running_unicode_server: st base_url = running_unicode_server endpoint_url = f"{base_url}/mcp" - async with streamablehttp_client(endpoint_url) as (read_stream, write_stream, _get_session_id): + async with streamable_http_client(endpoint_url) as (read_stream, write_stream, _get_session_id): async with ClientSession(read_stream, write_stream) as session: await session.initialize() diff --git a/tests/shared/test_streamable_http.py b/tests/shared/test_streamable_http.py index 7de3ed80ca..174d33b753 100644 --- a/tests/shared/test_streamable_http.py +++ b/tests/shared/test_streamable_http.py @@ -9,6 +9,7 @@ import socket import time from collections.abc import Generator +from datetime import timedelta from typing import Any from unittest.mock import MagicMock @@ -25,7 +26,11 @@ import mcp.types as types from mcp.client.session import ClientSession -from mcp.client.streamable_http import StreamableHTTPTransport, streamable_http_client +from mcp.client.streamable_http import ( + StreamableHTTPTransport, + streamable_http_client, + streamablehttp_client, # pyright: ignore[reportDeprecated] +) from mcp.server import Server from mcp.server.streamable_http import ( MCP_PROTOCOL_VERSION_HEADER, @@ -2356,3 +2361,36 @@ async def test_streamable_http_client_preserves_custom_with_mcp_headers( assert "content-type" in headers_data assert headers_data["content-type"] == "application/json" + + +@pytest.mark.anyio +async def test_streamable_http_transport_deprecated_params_ignored(basic_server: None, basic_server_url: str) -> None: + """Test that deprecated parameters passed to StreamableHTTPTransport are properly ignored.""" + with pytest.warns(DeprecationWarning): + transport = StreamableHTTPTransport( # pyright: ignore[reportDeprecated] + url=f"{basic_server_url}/mcp", + headers={"X-Should-Be-Ignored": "ignored"}, + timeout=999, + sse_read_timeout=timedelta(seconds=999), + auth=None, + ) + + headers = transport._prepare_headers() + assert "X-Should-Be-Ignored" not in headers + assert headers["accept"] == "application/json, text/event-stream" + assert headers["content-type"] == "application/json" + + +@pytest.mark.anyio +async def test_streamablehttp_client_deprecation_warning(basic_server: None, basic_server_url: str) -> None: + """Test that the old streamablehttp_client() function issues a deprecation warning.""" + with pytest.warns(DeprecationWarning, match="Use `streamable_http_client` instead"): + async with streamablehttp_client(f"{basic_server_url}/mcp") as ( # pyright: ignore[reportDeprecated] + read_stream, + write_stream, + _, + ): + async with ClientSession(read_stream, write_stream) as session: + await session.initialize() + tools = await session.list_tools() + assert len(tools.tools) > 0 From f897932af86e5f032e7116046135b75443d97a73 Mon Sep 17 00:00:00 2001 From: Felix Weinberger Date: Mon, 1 Dec 2025 19:49:17 +0000 Subject: [PATCH 13/13] Add pragma comments for coverage on anyio 4.12.0 --- src/mcp/client/streamable_http.py | 14 +------------- tests/shared/test_streamable_http.py | 18 +++++++++--------- 2 files changed, 10 insertions(+), 22 deletions(-) diff --git a/src/mcp/client/streamable_http.py b/src/mcp/client/streamable_http.py index a414d6a85b..ed28fcc275 100644 --- a/src/mcp/client/streamable_http.py +++ b/src/mcp/client/streamable_http.py @@ -23,8 +23,6 @@ from typing_extensions import deprecated from mcp.shared._httpx_utils import ( - MCP_DEFAULT_SSE_READ_TIMEOUT, - MCP_DEFAULT_TIMEOUT, McpHttpClientFactory, create_mcp_http_client, ) @@ -79,7 +77,6 @@ class RequestContext: """Context for a request operation.""" client: httpx.AsyncClient - headers: dict[str, str] session_id: str | None session_message: SessionMessage metadata: ClientMessageMetadata | None @@ -645,16 +642,7 @@ async def streamable_http_client( # Create default client with recommended MCP timeouts client = create_mcp_http_client() - # Extract configuration from the client to pass to transport - headers_dict = dict(client.headers) if client.headers else None - timeout = client.timeout.connect if (client.timeout and client.timeout.connect is not None) else MCP_DEFAULT_TIMEOUT - sse_read_timeout = ( - client.timeout.read if (client.timeout and client.timeout.read is not None) else MCP_DEFAULT_SSE_READ_TIMEOUT - ) - auth = client.auth - - # Create transport with extracted configuration - transport = StreamableHTTPTransport(url, headers_dict, timeout, sse_read_timeout, auth) + transport = StreamableHTTPTransport(url) async with anyio.create_task_group() as tg: try: diff --git a/tests/shared/test_streamable_http.py b/tests/shared/test_streamable_http.py index 174d33b753..731dd20dd3 100644 --- a/tests/shared/test_streamable_http.py +++ b/tests/shared/test_streamable_http.py @@ -1172,7 +1172,7 @@ async def test_streamable_http_client_session_termination(basic_server: None, ba write_stream, _, ): - async with ClientSession(read_stream, write_stream) as session: + async with ClientSession(read_stream, write_stream) as session: # pragma: no branch # Attempt to make a request after termination with pytest.raises( # pragma: no branch McpError, @@ -1239,7 +1239,7 @@ async def mock_delete(self: httpx.AsyncClient, *args: Any, **kwargs: Any) -> htt write_stream, _, ): - async with ClientSession(read_stream, write_stream) as session: + async with ClientSession(read_stream, write_stream) as session: # pragma: no branch # Attempt to make a request after termination with pytest.raises( # pragma: no branch McpError, @@ -1368,8 +1368,8 @@ async def run_tool(): # We should have received the remaining notifications assert len(captured_notifications) == 1 - assert isinstance(captured_notifications[0].root, types.LoggingMessageNotification) - assert captured_notifications[0].root.params.data == "Second notification after lock" + assert isinstance(captured_notifications[0].root, types.LoggingMessageNotification) # pragma: no cover + assert captured_notifications[0].root.params.data == "Second notification after lock" # pragma: no cover @pytest.mark.anyio @@ -1552,7 +1552,7 @@ async def test_streamablehttp_request_context_propagation(context_aware_server: write_stream, _, ): - async with ClientSession(read_stream, write_stream) as session: + async with ClientSession(read_stream, write_stream) as session: # pragma: no branch result = await session.initialize() assert isinstance(result, InitializeResult) assert result.serverInfo.name == "ContextAwareServer" @@ -1590,7 +1590,7 @@ async def test_streamablehttp_request_context_isolation(context_aware_server: No write_stream, _, ): - async with ClientSession(read_stream, write_stream) as session: + async with ClientSession(read_stream, write_stream) as session: # pragma: no branch await session.initialize() # Call the tool that echoes context @@ -2305,7 +2305,7 @@ async def test_streamable_http_client_mcp_headers_override_defaults( write_stream, _, ): - async with ClientSession(read_stream, write_stream) as session: + async with ClientSession(read_stream, write_stream) as session: # pragma: no branch await session.initialize() # Use echo_headers tool to see what headers the server actually received @@ -2340,7 +2340,7 @@ async def test_streamable_http_client_preserves_custom_with_mcp_headers( write_stream, _, ): - async with ClientSession(read_stream, write_stream) as session: + async with ClientSession(read_stream, write_stream) as session: # pragma: no branch await session.initialize() # Use echo_headers tool to verify both custom and MCP headers are present @@ -2390,7 +2390,7 @@ async def test_streamablehttp_client_deprecation_warning(basic_server: None, bas write_stream, _, ): - async with ClientSession(read_stream, write_stream) as session: + async with ClientSession(read_stream, write_stream) as session: # pragma: no branch await session.initialize() tools = await session.list_tools() assert len(tools.tools) > 0