1616import httpx
1717from anyio .abc import TaskGroup
1818from anyio .streams .memory import MemoryObjectReceiveStream , MemoryObjectSendStream
19+ from httpx ._config import DEFAULT_TIMEOUT_CONFIG
1920from httpx_sse import EventSource , ServerSentEvent , aconnect_sse
2021
21- from mcp .shared ._httpx_utils import McpHttpClientFactory , create_mcp_http_client
22+ from mcp .shared ._httpx_utils import create_mcp_http_client
2223from mcp .shared .message import ClientMessageMetadata , SessionMessage
2324from mcp .types import (
2425 ErrorData ,
3334
3435logger = logging .getLogger (__name__ )
3536
37+ HTTPX_DEFAULT_TIMEOUT = DEFAULT_TIMEOUT_CONFIG
3638
3739SessionMessageOrError = SessionMessage | Exception
3840StreamWriter = MemoryObjectSendStream [SessionMessageOrError ]
@@ -448,8 +450,8 @@ async def streamablehttp_client(
448450 timeout : float | timedelta = 30 ,
449451 sse_read_timeout : float | timedelta = 60 * 5 ,
450452 terminate_on_close : bool = True ,
451- httpx_client_factory : McpHttpClientFactory = create_mcp_http_client ,
452453 auth : httpx .Auth | None = None ,
454+ httpx_client : httpx .AsyncClient | None = None ,
453455) -> AsyncGenerator [
454456 tuple [
455457 MemoryObjectReceiveStream [SessionMessage | Exception ],
@@ -464,6 +466,19 @@ async def streamablehttp_client(
464466 `sse_read_timeout` determines how long (in seconds) the client will wait for a new
465467 event before disconnecting. All other HTTP operations are controlled by `timeout`.
466468
469+ Args:
470+ url: The StreamableHTTP endpoint URL.
471+ headers: Optional headers to include in requests.
472+ timeout: HTTP timeout for regular operations. Defaults to 30 seconds.
473+ Can be specified as float (seconds) or timedelta object.
474+ sse_read_timeout: Timeout for SSE read operations. Defaults to 300 seconds (5 minutes).
475+ Can be specified as float (seconds) or timedelta object.
476+ terminate_on_close: Whether to send a terminate request when closing the connection.
477+ auth: Optional HTTPX authentication handler.
478+ httpx_client: Optional pre-configured httpx.AsyncClient. If provided, the client's
479+ existing configuration is preserved. Timeout is only overridden if the provided
480+ client uses httpx's default timeout configuration.
481+
467482 Yields:
468483 Tuple containing:
469484 - read_stream: Stream for reading messages from the server
@@ -475,15 +490,30 @@ async def streamablehttp_client(
475490 read_stream_writer , read_stream = anyio .create_memory_object_stream [SessionMessage | Exception ](0 )
476491 write_stream , write_stream_reader = anyio .create_memory_object_stream [SessionMessage ](0 )
477492
493+ if httpx_client is not None :
494+ client = httpx_client
495+ if not getattr (client , "follow_redirects" , False ):
496+ logger .warning ("httpx_client does not have follow_redirects=True, which is recommended for MCP" )
497+ if headers :
498+ existing_headers = dict (client .headers ) if client .headers else {}
499+ existing_headers .update (transport .request_headers )
500+ client .headers = existing_headers
501+ if auth and not client .auth :
502+ client .auth = auth
503+ if client .timeout == HTTPX_DEFAULT_TIMEOUT :
504+ client .timeout = httpx .Timeout (transport .timeout , read = transport .sse_read_timeout )
505+ else :
506+ client = create_mcp_http_client (
507+ headers = transport .request_headers ,
508+ timeout = httpx .Timeout (transport .timeout , read = transport .sse_read_timeout ),
509+ auth = transport .auth ,
510+ )
511+
478512 async with anyio .create_task_group () as tg :
479513 try :
480514 logger .debug (f"Connecting to StreamableHTTP endpoint: { url } " )
481515
482- async with httpx_client_factory (
483- headers = transport .request_headers ,
484- timeout = httpx .Timeout (transport .timeout , read = transport .sse_read_timeout ),
485- auth = transport .auth ,
486- ) as client :
516+ async with client :
487517 # Define callbacks that need access to tg
488518 def start_get_stream () -> None :
489519 tg .start_soon (transport .handle_get_stream , client , read_stream_writer )
0 commit comments