22"""
33Simple MCP streamable private gateway client example without authentication.
44
5- This client connects to an MCP server using streamable HTTP or SSE transport.
5+ This client connects to an MCP server using streamable HTTP or SSE transport
6+ with custom extensions for private gateway connectivity (SNI hostname support).
67
78"""
89
910import asyncio
10- import os
11+ from collections . abc import Callable
1112from datetime import timedelta
1213from typing import Any
1314
15+ from anyio .streams .memory import MemoryObjectReceiveStream , MemoryObjectSendStream
16+
1417from mcp .client .session import ClientSession
18+ from mcp .client .sse import sse_client
1519from mcp .client .streamable_http import streamablehttp_client
20+ from mcp .shared .message import SessionMessage
1621
1722
1823class SimpleStreamablePrivateGateway :
19- """Simple MCP streamable private gateway client without authentication."""
24+ """Simple MCP private gateway client supporting StreamableHTTP and SSE transports.
25+
26+ This client demonstrates how to use custom extensions (e.g., SNI hostname) for
27+ private gateway connectivity with both transport types.
28+ """
2029
2130 def __init__ (self , server_url : str , server_hostname : str , transport_type : str = "streamable-http" ):
2231 self .server_url = server_url
@@ -29,25 +38,42 @@ async def connect(self):
2938 print (f"🔗 Attempting to connect to { self .server_url } ..." )
3039
3140 try :
32- print ("📡 Opening StreamableHTTP transport connection..." )
33- # Note: terminate_on_close=False prevents SSL handshake failures during exit
34- # Some servers may not handle session termination gracefully over SSL
35- async with streamablehttp_client (
36- url = self .server_url ,
37- headers = {"Host" : self .server_hostname },
38- extensions = {"sni_hostname" : self .server_hostname },
39- timeout = timedelta (seconds = 60 ),
40- terminate_on_close = False , # Skip session termination to avoid SSL errors
41- ) as (read_stream , write_stream , get_session_id ):
42- await self ._run_session (read_stream , write_stream , get_session_id )
41+ # Create transport based on transport type
42+ if self .transport_type == "sse" :
43+ print ("📡 Opening SSE transport connection with extensions..." )
44+ # SSE transport with custom extensions for private gateway
45+ async with sse_client (
46+ url = self .server_url ,
47+ headers = {"Host" : self .server_hostname },
48+ extensions = {"sni_hostname" : self .server_hostname },
49+ timeout = 60 ,
50+ ) as (read_stream , write_stream ):
51+ await self ._run_session (read_stream , write_stream , None )
52+ else :
53+ print ("📡 Opening StreamableHTTP transport connection with extensions..." )
54+ # Note: terminate_on_close=False prevents SSL handshake failures during exit
55+ # Some servers may not handle session termination gracefully over SSL
56+ async with streamablehttp_client (
57+ url = self .server_url ,
58+ headers = {"Host" : self .server_hostname },
59+ extensions = {"sni_hostname" : self .server_hostname },
60+ timeout = timedelta (seconds = 60 ),
61+ terminate_on_close = False , # Skip session termination to avoid SSL errors
62+ ) as (read_stream , write_stream , get_session_id ):
63+ await self ._run_session (read_stream , write_stream , get_session_id )
4364
4465 except Exception as e :
4566 print (f"❌ Failed to connect: { e } " )
4667 import traceback
4768
4869 traceback .print_exc ()
4970
50- async def _run_session (self , read_stream , write_stream , get_session_id ):
71+ async def _run_session (
72+ self ,
73+ read_stream : MemoryObjectReceiveStream [SessionMessage | Exception ],
74+ write_stream : MemoryObjectSendStream [SessionMessage ],
75+ get_session_id : Callable [[], str | None ] | None ,
76+ ):
5177 """Run the MCP session with the given streams."""
5278 print ("🤝 Initializing MCP session..." )
5379 async with ClientSession (read_stream , write_stream ) as session :
@@ -107,7 +133,7 @@ async def call_tool(self, tool_name: str, arguments: dict[str, Any] | None = Non
107133
108134 async def interactive_loop (self ):
109135 """Run interactive command loop."""
110- print ("\n 🎯 Interactive Streamable Private Gateway" )
136+ print ("\n 🎯 Interactive MCP Client ( Private Gateway) " )
111137 print ("Commands:" )
112138 print (" list - List available tools" )
113139 print (" call <tool_name> [args] - Call a tool" )
@@ -160,23 +186,57 @@ async def interactive_loop(self):
160186 break
161187
162188
163- async def main ():
164- """Main entry point."""
165- # Default server URL - can be overridden with environment variable
166- # Most MCP streamable HTTP servers use /mcp as the endpoint
167- server_port = os .getenv ("MCP_SERVER_PORT" , "8081" )
168- server_hostname = os .getenv ("MCP_SERVER_HOSTNAME" , "mcp.deepwiki.com" )
169- transport_type = "streamable-http"
170- server_url = f"https://localhost:{ server_port } /mcp"
171-
189+ def get_user_input ():
190+ """Get server configuration from user input."""
172191 print ("🚀 Simple Streamable Private Gateway" )
173- print (f"Connecting to: { server_url } " )
174- print (f"Server hostname: { server_hostname } " )
175- print (f"Transport type: { transport_type } " )
192+ print ("\n 📝 Server Configuration" )
193+ print ("=" * 50 )
194+
195+ # Get server port
196+ server_port = input ("Server port [8081]: " ).strip () or "8081"
197+
198+ # Get server hostname
199+ server_hostname = input ("Server hostname [mcp.deepwiki.com]: " ).strip () or "mcp.deepwiki.com"
200+
201+ # Get transport type
202+ print ("\n Transport type:" )
203+ print (" 1. streamable-http (default)" )
204+ print (" 2. sse" )
205+ transport_choice = input ("Select transport [1]: " ).strip () or "1"
206+
207+ if transport_choice == "2" :
208+ transport_type = "sse"
209+ else :
210+ transport_type = "streamable-http"
211+
212+ print ("=" * 50 )
213+
214+ return server_port , server_hostname , transport_type
215+
176216
177- # Start connection flow
178- client = SimpleStreamablePrivateGateway (server_url , server_hostname , transport_type )
179- await client .connect ()
217+ async def main ():
218+ """Main entry point."""
219+ try :
220+ # Get configuration from user input
221+ server_port , server_hostname , transport_type = get_user_input ()
222+
223+ # Set URL endpoint based on transport type
224+ # StreamableHTTP servers typically use /mcp, SSE servers use /sse
225+ endpoint = "/mcp" if transport_type == "streamable-http" else "/sse"
226+ server_url = f"https://localhost:{ server_port } { endpoint } "
227+
228+ print (f"\n 🔗 Connecting to: { server_url } " )
229+ print (f"📡 Server hostname: { server_hostname } " )
230+ print (f"🚀 Transport type: { transport_type } \n " )
231+
232+ # Start connection flow
233+ client = SimpleStreamablePrivateGateway (server_url , server_hostname , transport_type )
234+ await client .connect ()
235+
236+ except KeyboardInterrupt :
237+ print ("\n \n 👋 Goodbye!" )
238+ except EOFError :
239+ print ("\n 👋 Goodbye!" )
180240
181241
182242def cli ():
0 commit comments