Skip to content

Commit 8196fec

Browse files
Replace httpx_client_factory with httpx_client parameter
Modernize streamable_http_client API by accepting httpx.AsyncClient instances directly instead of factory functions, following industry standards. - New API: httpx_client: httpx.AsyncClient | None parameter - Default client created with recommended timeouts if None - Deprecated wrapper provides backward compatibility - Updated examples to show custom client usage - Add MCP_DEFAULT_TIMEOUT constants to _httpx_utils
1 parent bcc4b19 commit 8196fec

File tree

7 files changed

+205
-136
lines changed

7 files changed

+205
-136
lines changed

README.md

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2375,6 +2375,7 @@ from pydantic import AnyUrl
23752375
from mcp import ClientSession
23762376
from mcp.client.auth import OAuthClientProvider, TokenStorage
23772377
from mcp.client.streamable_http import streamable_http_client
2378+
from mcp.shared._httpx_utils import create_mcp_http_client
23782379
from mcp.shared.auth import OAuthClientInformationFull, OAuthClientMetadata, OAuthToken
23792380

23802381

@@ -2428,15 +2429,16 @@ async def main():
24282429
callback_handler=handle_callback,
24292430
)
24302431

2431-
async with streamable_http_client("http://localhost:8001/mcp", auth=oauth_auth) as (read, write, _):
2432-
async with ClientSession(read, write) as session:
2433-
await session.initialize()
2432+
async with create_mcp_http_client(auth=oauth_auth) as custom_client:
2433+
async with streamable_http_client("http://localhost:8001/mcp", httpx_client=custom_client) as (read, write, _):
2434+
async with ClientSession(read, write) as session:
2435+
await session.initialize()
24342436

2435-
tools = await session.list_tools()
2436-
print(f"Available tools: {[tool.name for tool in tools.tools]}")
2437+
tools = await session.list_tools()
2438+
print(f"Available tools: {[tool.name for tool in tools.tools]}")
24372439

2438-
resources = await session.list_resources()
2439-
print(f"Available resources: {[r.uri for r in resources.resources]}")
2440+
resources = await session.list_resources()
2441+
print(f"Available resources: {[r.uri for r in resources.resources]}")
24402442

24412443

24422444
def run():

examples/clients/simple-auth-client/mcp_simple_auth_client/main.py

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,17 @@
1111
import threading
1212
import time
1313
import webbrowser
14-
from datetime import timedelta
1514
from http.server import BaseHTTPRequestHandler, HTTPServer
1615
from typing import Any
1716
from urllib.parse import parse_qs, urlparse
1817

18+
import httpx
19+
1920
from mcp.client.auth import OAuthClientProvider, TokenStorage
2021
from mcp.client.session import ClientSession
2122
from mcp.client.sse import sse_client
2223
from mcp.client.streamable_http import streamable_http_client
24+
from mcp.shared._httpx_utils import create_mcp_http_client
2325
from mcp.shared.auth import OAuthClientInformationFull, OAuthClientMetadata, OAuthToken
2426

2527

@@ -194,9 +196,7 @@ async def _default_redirect_handler(authorization_url: str) -> None:
194196
# Use client_metadata_url to enable CIMD when the server supports it
195197
oauth_auth = OAuthClientProvider(
196198
server_url=self.server_url.replace("/mcp", ""),
197-
client_metadata=OAuthClientMetadata.model_validate(
198-
client_metadata_dict
199-
),
199+
client_metadata=OAuthClientMetadata.model_validate(client_metadata_dict),
200200
storage=InMemoryTokenStorage(),
201201
redirect_handler=_default_redirect_handler,
202202
callback_handler=callback_handler,
@@ -214,12 +214,12 @@ async def _default_redirect_handler(authorization_url: str) -> None:
214214
await self._run_session(read_stream, write_stream, None)
215215
else:
216216
print("📡 Opening StreamableHTTP transport connection with auth...")
217-
async with streamable_http_client(
218-
url=self.server_url,
219-
auth=oauth_auth,
220-
timeout=timedelta(seconds=60),
221-
) as (read_stream, write_stream, get_session_id):
222-
await self._run_session(read_stream, write_stream, get_session_id)
217+
async with create_mcp_http_client(auth=oauth_auth) as custom_client:
218+
async with streamable_http_client(
219+
url=self.server_url,
220+
httpx_client=custom_client,
221+
) as (read_stream, write_stream, get_session_id):
222+
await self._run_session(read_stream, write_stream, get_session_id)
223223

224224
except Exception as e:
225225
print(f"❌ Failed to connect: {e}")
@@ -329,9 +329,7 @@ async def interactive_loop(self):
329329
await self.call_tool(tool_name, arguments)
330330

331331
else:
332-
print(
333-
"❌ Unknown command. Try 'list', 'call <tool_name>', or 'quit'"
334-
)
332+
print("❌ Unknown command. Try 'list', 'call <tool_name>', or 'quit'")
335333

336334
except KeyboardInterrupt:
337335
print("\n\n👋 Goodbye!")

examples/snippets/clients/oauth_client.py

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
from mcp import ClientSession
1616
from mcp.client.auth import OAuthClientProvider, TokenStorage
1717
from mcp.client.streamable_http import streamable_http_client
18+
from mcp.shared._httpx_utils import create_mcp_http_client
1819
from mcp.shared.auth import OAuthClientInformationFull, OAuthClientMetadata, OAuthToken
1920

2021

@@ -68,15 +69,16 @@ async def main():
6869
callback_handler=handle_callback,
6970
)
7071

71-
async with streamable_http_client("http://localhost:8001/mcp", auth=oauth_auth) as (read, write, _):
72-
async with ClientSession(read, write) as session:
73-
await session.initialize()
72+
async with create_mcp_http_client(auth=oauth_auth) as custom_client:
73+
async with streamable_http_client("http://localhost:8001/mcp", httpx_client=custom_client) as (read, write, _):
74+
async with ClientSession(read, write) as session:
75+
await session.initialize()
7476

75-
tools = await session.list_tools()
76-
print(f"Available tools: {[tool.name for tool in tools.tools]}")
77+
tools = await session.list_tools()
78+
print(f"Available tools: {[tool.name for tool in tools.tools]}")
7779

78-
resources = await session.list_resources()
79-
print(f"Available resources: {[r.uri for r in resources.resources]}")
80+
resources = await session.list_resources()
81+
print(f"Available resources: {[r.uri for r in resources.resources]}")
8082

8183

8284
def run():

src/mcp/client/session_group.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
from typing import Any, TypeAlias, overload
1818

1919
import anyio
20+
import httpx
2021
from pydantic import BaseModel
2122
from typing_extensions import Self, deprecated
2223

@@ -26,6 +27,7 @@
2627
from mcp.client.sse import sse_client
2728
from mcp.client.stdio import StdioServerParameters
2829
from mcp.client.streamable_http import streamable_http_client
30+
from mcp.shared._httpx_utils import create_mcp_http_client
2931
from mcp.shared.exceptions import McpError
3032
from mcp.shared.session import ProgressFnT
3133

@@ -309,11 +311,18 @@ async def _establish_session(
309311
)
310312
read, write = await session_stack.enter_async_context(client)
311313
else:
314+
httpx_client = create_mcp_http_client(
315+
headers=server_params.headers,
316+
timeout=httpx.Timeout(
317+
server_params.timeout.total_seconds(),
318+
read=server_params.sse_read_timeout.total_seconds(),
319+
),
320+
)
321+
await session_stack.enter_async_context(httpx_client)
322+
312323
client = streamable_http_client(
313324
url=server_params.url,
314-
headers=server_params.headers,
315-
timeout=server_params.timeout,
316-
sse_read_timeout=server_params.sse_read_timeout,
325+
httpx_client=httpx_client,
317326
terminate_on_close=server_params.terminate_on_close,
318327
)
319328
read, write, _ = await session_stack.enter_async_context(client)

src/mcp/client/streamable_http.py

Lines changed: 68 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
and session management.
77
"""
88

9+
import contextlib
910
import logging
1011
from collections.abc import AsyncGenerator, Awaitable, Callable
1112
from contextlib import asynccontextmanager
@@ -19,7 +20,12 @@
1920
from httpx_sse import EventSource, ServerSentEvent, aconnect_sse
2021
from typing_extensions import deprecated
2122

22-
from mcp.shared._httpx_utils import McpHttpClientFactory, create_mcp_http_client
23+
from mcp.shared._httpx_utils import (
24+
MCP_DEFAULT_SSE_READ_TIMEOUT,
25+
MCP_DEFAULT_TIMEOUT,
26+
McpHttpClientFactory,
27+
create_mcp_http_client,
28+
)
2329
from mcp.shared.message import ClientMessageMetadata, SessionMessage
2430
from mcp.types import (
2531
ErrorData,
@@ -106,9 +112,9 @@ def __init__(
106112
self.session_id = None
107113
self.protocol_version = None
108114
self.request_headers = {
115+
**self.headers,
109116
ACCEPT: f"{JSON}, {SSE}",
110117
CONTENT_TYPE: JSON,
111-
**self.headers,
112118
}
113119

114120
def _prepare_request_headers(self, base_headers: dict[str, str]) -> dict[str, str]:
@@ -563,12 +569,9 @@ def get_session_id(self) -> str | None:
563569
@asynccontextmanager
564570
async def streamable_http_client(
565571
url: str,
566-
headers: dict[str, str] | None = None,
567-
timeout: float | timedelta = 30,
568-
sse_read_timeout: float | timedelta = 60 * 5,
572+
*,
573+
httpx_client: httpx.AsyncClient | None = None,
569574
terminate_on_close: bool = True,
570-
httpx_client_factory: McpHttpClientFactory = create_mcp_http_client,
571-
auth: httpx.Auth | None = None,
572575
) -> AsyncGenerator[
573576
tuple[
574577
MemoryObjectReceiveStream[SessionMessage | Exception],
@@ -580,30 +583,57 @@ async def streamable_http_client(
580583
"""
581584
Client transport for StreamableHTTP.
582585
583-
`sse_read_timeout` determines how long (in seconds) the client will wait for a new
584-
event before disconnecting. All other HTTP operations are controlled by `timeout`.
586+
Args:
587+
url: The MCP server endpoint URL.
588+
httpx_client: Optional pre-configured httpx.AsyncClient. If None, a default
589+
client with recommended MCP timeouts will be created. To configure headers,
590+
authentication, or other HTTP settings, create an httpx.AsyncClient and pass it here.
591+
terminate_on_close: If True, send a DELETE request to terminate the session
592+
when the context exits.
585593
586594
Yields:
587595
Tuple containing:
588596
- read_stream: Stream for reading messages from the server
589597
- write_stream: Stream for sending messages to the server
590598
- get_session_id_callback: Function to retrieve the current session ID
591-
"""
592-
transport = StreamableHTTPTransport(url, headers, timeout, sse_read_timeout, auth)
593599
600+
Example:
601+
See examples/snippets/clients/ for usage patterns.
602+
"""
594603
read_stream_writer, read_stream = anyio.create_memory_object_stream[SessionMessage | Exception](0)
595604
write_stream, write_stream_reader = anyio.create_memory_object_stream[SessionMessage](0)
596605

606+
# Determine if we need to create and manage the client
607+
client_provided = httpx_client is not None
608+
client = httpx_client
609+
610+
if client is None:
611+
# Create default client with recommended MCP timeouts
612+
client = create_mcp_http_client()
613+
614+
# Extract configuration from the client to pass to transport
615+
headers_dict = dict(client.headers) if client.headers else None
616+
timeout = client.timeout.connect if (client.timeout and client.timeout.connect is not None) else MCP_DEFAULT_TIMEOUT
617+
sse_read_timeout = (
618+
client.timeout.read if (client.timeout and client.timeout.read is not None) else MCP_DEFAULT_SSE_READ_TIMEOUT
619+
)
620+
auth = client.auth
621+
622+
# Create transport with extracted configuration
623+
transport = StreamableHTTPTransport(url, headers_dict, timeout, sse_read_timeout, auth)
624+
625+
# Sync client headers with transport's merged headers (includes MCP protocol requirements)
626+
client.headers.update(transport.request_headers)
627+
597628
async with anyio.create_task_group() as tg:
598629
try:
599630
logger.debug(f"Connecting to StreamableHTTP endpoint: {url}")
600631

601-
async with httpx_client_factory(
602-
headers=transport.request_headers,
603-
timeout=httpx.Timeout(transport.timeout, read=transport.sse_read_timeout),
604-
auth=transport.auth,
605-
) as client:
606-
# Define callbacks that need access to tg
632+
async with contextlib.AsyncExitStack() as stack:
633+
# Only manage client lifecycle if we created it
634+
if not client_provided:
635+
await stack.enter_async_context(client)
636+
607637
def start_get_stream() -> None:
608638
tg.start_soon(transport.handle_get_stream, client, read_stream_writer)
609639

@@ -650,7 +680,24 @@ async def streamablehttp_client(
650680
],
651681
None,
652682
]:
653-
async with streamable_http_client(
654-
url, headers, timeout, sse_read_timeout, terminate_on_close, httpx_client_factory, auth
655-
) as streams:
656-
yield streams
683+
# Convert timeout parameters
684+
timeout_seconds = timeout.total_seconds() if isinstance(timeout, timedelta) else timeout
685+
sse_read_timeout_seconds = (
686+
sse_read_timeout.total_seconds() if isinstance(sse_read_timeout, timedelta) else sse_read_timeout
687+
)
688+
689+
# Create httpx client using the factory with old-style parameters
690+
client = httpx_client_factory(
691+
headers=headers,
692+
timeout=httpx.Timeout(timeout_seconds, read=sse_read_timeout_seconds),
693+
auth=auth,
694+
)
695+
696+
# Manage client lifecycle since we created it
697+
async with client:
698+
async with streamable_http_client(
699+
url,
700+
httpx_client=client,
701+
terminate_on_close=terminate_on_close,
702+
) as streams:
703+
yield streams

src/mcp/shared/_httpx_utils.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,11 @@
44

55
import httpx
66

7-
__all__ = ["create_mcp_http_client"]
7+
__all__ = ["create_mcp_http_client", "MCP_DEFAULT_TIMEOUT", "MCP_DEFAULT_SSE_READ_TIMEOUT"]
8+
9+
# Default MCP timeout configuration
10+
MCP_DEFAULT_TIMEOUT = 30.0 # General operations (seconds)
11+
MCP_DEFAULT_SSE_READ_TIMEOUT = 300.0 # SSE streams - 5 minutes (seconds)
812

913

1014
class McpHttpClientFactory(Protocol): # pragma: no branch
@@ -68,7 +72,7 @@ def create_mcp_http_client(
6872

6973
# Handle timeout
7074
if timeout is None:
71-
kwargs["timeout"] = httpx.Timeout(30.0)
75+
kwargs["timeout"] = httpx.Timeout(MCP_DEFAULT_TIMEOUT, read=MCP_DEFAULT_SSE_READ_TIMEOUT)
7276
else:
7377
kwargs["timeout"] = timeout
7478

0 commit comments

Comments
 (0)