-
Notifications
You must be signed in to change notification settings - Fork 5
feat: add streamable-http support #176
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -37,7 +37,7 @@ | |
| from .._utils._config import McpServer | ||
| from ._context import UiPathServerType | ||
| from ._exception import McpErrorCode, UiPathMcpRuntimeError | ||
| from ._session import SessionServer | ||
| from ._session import BaseSessionServer, SessionServer, StreamableHttpSessionServer | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
| tracer = trace.get_tracer(__name__) | ||
|
|
@@ -76,10 +76,14 @@ def __init__( | |
| self._server_slug = server_slug | ||
|
|
||
| self._signalr_client: SignalRClient | None = None | ||
| self._session_servers: dict[str, SessionServer] = {} | ||
| self._session_servers: dict[str, BaseSessionServer] = {} | ||
| self._session_output: str | None = None | ||
| self._cancel_event = asyncio.Event() | ||
| self._keep_alive_task: asyncio.Task[None] | None = None | ||
| self._http_server_process: asyncio.subprocess.Process | None = None | ||
| self._http_monitor_task: asyncio.Task[None] | None = None | ||
| self._http_stderr_drain_task: asyncio.Task[None] | None = None | ||
| self._http_server_stderr_lines: list[str] = [] | ||
| self._uipath = UiPath() | ||
| self._cleanup_done = False | ||
|
|
||
|
|
@@ -223,6 +227,12 @@ async def _run_server(self) -> UiPathRuntimeResult: | |
| # Register the local server with UiPath MCP Server | ||
| await self._register() | ||
|
|
||
| # Start HTTP server process monitor if using streamable-http | ||
| if self._server.is_streamable_http: | ||
| self._http_monitor_task = asyncio.create_task( | ||
| self._monitor_http_server_process() | ||
| ) | ||
|
|
||
| run_task = asyncio.create_task(self._signalr_client.run()) | ||
| cancel_task = asyncio.create_task(self._cancel_event.wait()) | ||
| self._keep_alive_task = asyncio.create_task(self._keep_alive()) | ||
|
|
@@ -300,6 +310,9 @@ async def _cleanup(self) -> None: | |
| except Exception as e: | ||
| logger.error(f"Error cleaning up session server {session_id}: {str(e)}") | ||
|
|
||
| # Stop the shared HTTP server process (streamable-http only) | ||
| await self._stop_http_server_process() | ||
|
|
||
| if self._signalr_client and hasattr(self._signalr_client, "_transport"): | ||
| transport = self._signalr_client._transport | ||
| if transport and hasattr(transport, "_ws") and transport._ws: | ||
|
|
@@ -358,8 +371,13 @@ async def _handle_signalr_message(self, args: list[str]) -> None: | |
| try: | ||
| # Check if we have a session server for this session_id | ||
| if session_id not in self._session_servers: | ||
| # Create and start a new session server | ||
| session_server = SessionServer(self._server, self.slug, session_id) | ||
| session_server: BaseSessionServer | ||
| if self._server.is_streamable_http: | ||
| session_server = StreamableHttpSessionServer( | ||
| self._server, self.slug, session_id | ||
| ) | ||
| else: | ||
| session_server = SessionServer(self._server, self.slug, session_id) | ||
| try: | ||
| await session_server.start() | ||
| except Exception as e: | ||
|
|
@@ -393,6 +411,156 @@ async def _handle_signalr_close(self) -> None: | |
| """Handle SignalR connection close event.""" | ||
| logger.info("Websocket connection closed.") | ||
|
|
||
| async def _start_http_server_process(self) -> None: | ||
| """Spawn the streamable-http server process. | ||
|
|
||
| The process is started once and shared across all sessions. | ||
| """ | ||
| env_vars = self._server.env.copy() | ||
| if self.server_type is UiPathServerType.Coded: | ||
| for name, value in os.environ.items(): | ||
| if name not in env_vars: | ||
| env_vars[name] = value | ||
|
|
||
| merged_env = {**os.environ, **env_vars} if env_vars else None | ||
| self._http_server_stderr_lines = [] | ||
| self._http_server_process = await asyncio.create_subprocess_exec( | ||
| self._server.command, | ||
| *self._server.args, | ||
| env=merged_env, | ||
| stdout=asyncio.subprocess.DEVNULL, | ||
| stderr=asyncio.subprocess.PIPE, | ||
| ) | ||
| self._http_stderr_drain_task = asyncio.create_task(self._drain_http_stderr()) | ||
| logger.info( | ||
| f"Started HTTP server process (PID: {self._http_server_process.pid}) " | ||
| f"for {self._server.url}" | ||
| ) | ||
|
|
||
| async def _drain_http_stderr(self) -> None: | ||
| """Continuously read and log stderr from the HTTP server process. | ||
|
|
||
| Accumulates output in _http_server_stderr_lines for error reporting. | ||
| """ | ||
| if not self._http_server_process or not self._http_server_process.stderr: | ||
| return | ||
| try: | ||
| async for line in self._http_server_process.stderr: | ||
| decoded = line.decode("utf-8", errors="replace").rstrip() | ||
| self._http_server_stderr_lines.append(decoded) | ||
| logger.debug(f"HTTP server stderr: {decoded}") | ||
| except asyncio.CancelledError: | ||
| pass | ||
|
|
||
| async def _wait_for_http_server_ready( | ||
| self, | ||
| max_retries: int = 30, | ||
| retry_delay: float = 1.0, | ||
| ) -> None: | ||
| """Wait for the HTTP server to start accepting connections.""" | ||
| import httpx | ||
|
|
||
| url = self._server.url | ||
| if not url: | ||
| raise ValueError("streamable-http transport requires url in config") | ||
|
|
||
| for attempt in range(max_retries): | ||
| # Check if process has crashed | ||
| if ( | ||
| self._http_server_process | ||
| and self._http_server_process.returncode is not None | ||
| ): | ||
| stderr_output = "\n".join(self._http_server_stderr_lines) | ||
| raise UiPathMcpRuntimeError( | ||
| McpErrorCode.INITIALIZATION_ERROR, | ||
| "HTTP server process exited unexpectedly", | ||
| f"Exit code: {self._http_server_process.returncode}\n{stderr_output}", | ||
| UiPathErrorCategory.SYSTEM, | ||
| ) | ||
|
|
||
| try: | ||
| async with httpx.AsyncClient() as client: | ||
| response = await client.get(url, timeout=2.0) | ||
| logger.info( | ||
| f"HTTP server is ready (status: {response.status_code})" | ||
| ) | ||
| return | ||
| except (httpx.ConnectError, httpx.ConnectTimeout) as err: | ||
| if attempt < max_retries - 1: | ||
| logger.debug( | ||
| f"HTTP server not ready yet, retrying in {retry_delay}s " | ||
| f"(attempt {attempt + 1}/{max_retries})" | ||
| ) | ||
| await asyncio.sleep(retry_delay) | ||
| else: | ||
| raise UiPathMcpRuntimeError( | ||
| McpErrorCode.INITIALIZATION_ERROR, | ||
| "HTTP server failed to start", | ||
| f"Server at {url} did not become ready after {max_retries} attempts", | ||
| UiPathErrorCategory.SYSTEM, | ||
| ) from err | ||
| except httpx.HTTPError: | ||
| # Any other HTTP error means server is listening | ||
| logger.info("HTTP server is ready (responded with error, but is up)") | ||
| return | ||
|
Comment on lines
+502
to
+505
|
||
|
|
||
| async def _stop_http_server_process(self) -> None: | ||
| """Stop the shared HTTP server process.""" | ||
| if self._http_monitor_task and not self._http_monitor_task.done(): | ||
| self._http_monitor_task.cancel() | ||
| try: | ||
| await self._http_monitor_task | ||
| except asyncio.CancelledError: | ||
| pass | ||
| self._http_monitor_task = None | ||
|
|
||
| if self._http_server_process: | ||
| try: | ||
| self._http_server_process.terminate() | ||
| try: | ||
| await asyncio.wait_for( | ||
| self._http_server_process.wait(), timeout=5.0 | ||
| ) | ||
| except asyncio.TimeoutError: | ||
| self._http_server_process.kill() | ||
| await self._http_server_process.wait() | ||
| except ProcessLookupError: | ||
| pass | ||
| finally: | ||
| logger.info("HTTP server process stopped") | ||
| self._http_server_process = None | ||
|
|
||
| if self._http_stderr_drain_task and not self._http_stderr_drain_task.done(): | ||
| self._http_stderr_drain_task.cancel() | ||
| try: | ||
| await self._http_stderr_drain_task | ||
| except asyncio.CancelledError: | ||
| pass | ||
| self._http_stderr_drain_task = None | ||
|
|
||
| async def _monitor_http_server_process(self) -> None: | ||
| """Monitor the HTTP server process and handle unexpected exits.""" | ||
| if not self._http_server_process: | ||
| return | ||
| try: | ||
| returncode = await self._http_server_process.wait() | ||
| if not self._cancel_event.is_set(): | ||
| logger.error( | ||
| f"HTTP server process exited unexpectedly with code {returncode}" | ||
| ) | ||
| # Stop all HTTP sessions, they will fail on next request anyway | ||
| for session_id, session_server in list(self._session_servers.items()): | ||
| if isinstance(session_server, StreamableHttpSessionServer): | ||
| try: | ||
| await session_server.stop() | ||
| except Exception as e: | ||
| logger.error( | ||
| f"Error stopping session {session_id} after process crash: {e}" | ||
| ) | ||
| self._session_servers.pop(session_id, None) | ||
| except asyncio.CancelledError: | ||
| pass | ||
|
|
||
| async def _register(self) -> None: | ||
| """Register the MCP server with UiPath.""" | ||
|
|
||
|
|
@@ -409,36 +577,63 @@ async def _register(self) -> None: | |
| env_vars[name] = value | ||
|
|
||
| try: | ||
| # Create a temporary session to get tools | ||
| server_params = StdioServerParameters( | ||
| command=self._server.command, | ||
| args=self._server.args, | ||
| env=env_vars, | ||
| ) | ||
| if self._server.is_streamable_http: | ||
| # spawn process, wait for readiness, connect via HTTP | ||
| await self._start_http_server_process() | ||
| await self._wait_for_http_server_ready() | ||
|
|
||
| # Start a temporary stdio client to get tools | ||
| # Use a temporary file to capture stderr | ||
| with tempfile.TemporaryFile(mode="w+b") as stderr_temp_binary: | ||
| stderr_temp = io.TextIOWrapper(stderr_temp_binary, encoding="utf-8") | ||
| async with stdio_client(server_params, errlog=stderr_temp) as ( | ||
| from mcp.client.streamable_http import streamable_http_client | ||
|
|
||
| if self._server.url is None: | ||
| raise UiPathMcpRuntimeError( | ||
| McpErrorCode.CONFIGURATION_ERROR, | ||
| "Missing URL for streamable-http server", | ||
| "Please specify a 'url' in the server configuration for streamable-http transport.", | ||
| UiPathErrorCategory.SYSTEM, | ||
| ) | ||
| async with streamable_http_client(self._server.url) as ( | ||
| read, | ||
| write, | ||
| _, | ||
| ): | ||
| async with ClientSession(read, write) as session: | ||
| logger.info("Initializing client session...") | ||
| # Try to initialize with timeout | ||
| logger.info("Initializing client session (streamable-http)...") | ||
| try: | ||
| await asyncio.wait_for(session.initialize(), timeout=30) | ||
| initialization_successful = True | ||
| logger.info("Initialization successful") | ||
|
|
||
| # Only proceed if initialization was successful | ||
| tools_result = await session.list_tools() | ||
| logger.info(f"Discovered {len(tools_result.tools)} tool(s)") | ||
| except Exception as err: | ||
| logger.error(f"Initialization error: {err}") | ||
radugheo marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| # Capture stderr output here, after the timeout | ||
| stderr_temp.seek(0) | ||
| server_stderr_output = stderr_temp.read() | ||
| server_stderr_output = "\n".join( | ||
| self._http_server_stderr_lines | ||
| ) | ||
| logger.info("Registration session closed (DELETE sent to server)") | ||
| else: | ||
| # spawn temporary process, discover tools, process dies with context | ||
| server_params = StdioServerParameters( | ||
| command=self._server.command, | ||
| args=self._server.args, | ||
| env=env_vars, | ||
| ) | ||
|
|
||
| with tempfile.TemporaryFile(mode="w+b") as stderr_temp_binary: | ||
| stderr_temp = io.TextIOWrapper(stderr_temp_binary, encoding="utf-8") | ||
| async with stdio_client(server_params, errlog=stderr_temp) as ( | ||
| read, | ||
| write, | ||
| ): | ||
| async with ClientSession(read, write) as session: | ||
| logger.info("Initializing client session...") | ||
| try: | ||
| await asyncio.wait_for(session.initialize(), timeout=30) | ||
| initialization_successful = True | ||
| logger.info("Initialization successful") | ||
| tools_result = await session.list_tools() | ||
| except Exception as err: | ||
| logger.error(f"Initialization error: {err}") | ||
| stderr_temp.seek(0) | ||
| server_stderr_output = stderr_temp.read() | ||
|
|
||
| except* Exception as eg: | ||
| for e in eg.exceptions: | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.