From c1fc90cdbcc3794105c66aa21844d083612360a9 Mon Sep 17 00:00:00 2001 From: Radu Date: Fri, 20 Feb 2026 10:31:11 +0200 Subject: [PATCH] feat: add streamable-http support --- pyproject.toml | 2 +- src/uipath_mcp/_cli/_runtime/_factory.py | 19 +- src/uipath_mcp/_cli/_runtime/_runtime.py | 241 +++++++++++++++-- src/uipath_mcp/_cli/_runtime/_session.py | 331 +++++++++++++---------- src/uipath_mcp/_cli/_utils/_config.py | 20 +- uv.lock | 2 +- 6 files changed, 442 insertions(+), 173 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 2b0690a..8da4709 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "uipath-mcp" -version = "0.1.0" +version = "0.1.1" description = "UiPath MCP SDK" readme = { file = "README.md", content-type = "text/markdown" } requires-python = ">=3.11" diff --git a/src/uipath_mcp/_cli/_runtime/_factory.py b/src/uipath_mcp/_cli/_runtime/_factory.py index 5ef5832..4c144c0 100644 --- a/src/uipath_mcp/_cli/_runtime/_factory.py +++ b/src/uipath_mcp/_cli/_runtime/_factory.py @@ -110,9 +110,26 @@ async def new_runtime( McpErrorCode.SERVER_NOT_FOUND, "MCP server not found", f"Server '{entrypoint}' not found. Available: {available}", - UiPathErrorCategory.DEPLOYMENT, + UiPathErrorCategory.USER, ) + # Validate streamable-http configuration + if server.is_streamable_http: + if not server.url: + raise UiPathMcpRuntimeError( + McpErrorCode.CONFIGURATION_ERROR, + "Invalid configuration", + f"Server '{entrypoint}' uses streamable-http transport but 'url' is not specified in mcp.json", + UiPathErrorCategory.USER, + ) + if not server.command or server.command == "None": + raise UiPathMcpRuntimeError( + McpErrorCode.CONFIGURATION_ERROR, + "Invalid configuration", + f"Server '{entrypoint}' uses streamable-http transport but 'command' is not specified in mcp.json", + UiPathErrorCategory.USER, + ) + # Validate runtime_id is a valid UUID, generate new one if not try: uuid.UUID(runtime_id) diff --git a/src/uipath_mcp/_cli/_runtime/_runtime.py b/src/uipath_mcp/_cli/_runtime/_runtime.py index bf967d7..83b87de 100644 --- a/src/uipath_mcp/_cli/_runtime/_runtime.py +++ b/src/uipath_mcp/_cli/_runtime/_runtime.py @@ -37,7 +37,7 @@ from .._utils._config import McpServer from ._context import UiPathServerType from ._exception import McpErrorCode, UiPathMcpRuntimeError -from ._session import SessionServer +from ._session import BaseSessionServer, SessionServer, StreamableHttpSessionServer logger = logging.getLogger(__name__) tracer = trace.get_tracer(__name__) @@ -76,10 +76,14 @@ def __init__( self._server_slug = server_slug self._signalr_client: SignalRClient | None = None - self._session_servers: dict[str, SessionServer] = {} + self._session_servers: dict[str, BaseSessionServer] = {} self._session_output: str | None = None self._cancel_event = asyncio.Event() self._keep_alive_task: asyncio.Task[None] | None = None + self._http_server_process: asyncio.subprocess.Process | None = None + self._http_monitor_task: asyncio.Task[None] | None = None + self._http_stderr_drain_task: asyncio.Task[None] | None = None + self._http_server_stderr_lines: list[str] = [] self._uipath = UiPath() self._cleanup_done = False @@ -223,6 +227,12 @@ async def _run_server(self) -> UiPathRuntimeResult: # Register the local server with UiPath MCP Server await self._register() + # Start HTTP server process monitor if using streamable-http + if self._server.is_streamable_http: + self._http_monitor_task = asyncio.create_task( + self._monitor_http_server_process() + ) + run_task = asyncio.create_task(self._signalr_client.run()) cancel_task = asyncio.create_task(self._cancel_event.wait()) self._keep_alive_task = asyncio.create_task(self._keep_alive()) @@ -300,6 +310,9 @@ async def _cleanup(self) -> None: except Exception as e: logger.error(f"Error cleaning up session server {session_id}: {str(e)}") + # Stop the shared HTTP server process (streamable-http only) + await self._stop_http_server_process() + if self._signalr_client and hasattr(self._signalr_client, "_transport"): transport = self._signalr_client._transport if transport and hasattr(transport, "_ws") and transport._ws: @@ -358,8 +371,13 @@ async def _handle_signalr_message(self, args: list[str]) -> None: try: # Check if we have a session server for this session_id if session_id not in self._session_servers: - # Create and start a new session server - session_server = SessionServer(self._server, self.slug, session_id) + session_server: BaseSessionServer + if self._server.is_streamable_http: + session_server = StreamableHttpSessionServer( + self._server, self.slug, session_id + ) + else: + session_server = SessionServer(self._server, self.slug, session_id) try: await session_server.start() except Exception as e: @@ -393,6 +411,156 @@ async def _handle_signalr_close(self) -> None: """Handle SignalR connection close event.""" logger.info("Websocket connection closed.") + async def _start_http_server_process(self) -> None: + """Spawn the streamable-http server process. + + The process is started once and shared across all sessions. + """ + env_vars = self._server.env.copy() + if self.server_type is UiPathServerType.Coded: + for name, value in os.environ.items(): + if name not in env_vars: + env_vars[name] = value + + merged_env = {**os.environ, **env_vars} if env_vars else None + self._http_server_stderr_lines = [] + self._http_server_process = await asyncio.create_subprocess_exec( + self._server.command, + *self._server.args, + env=merged_env, + stdout=asyncio.subprocess.DEVNULL, + stderr=asyncio.subprocess.PIPE, + ) + self._http_stderr_drain_task = asyncio.create_task(self._drain_http_stderr()) + logger.info( + f"Started HTTP server process (PID: {self._http_server_process.pid}) " + f"for {self._server.url}" + ) + + async def _drain_http_stderr(self) -> None: + """Continuously read and log stderr from the HTTP server process. + + Accumulates output in _http_server_stderr_lines for error reporting. + """ + if not self._http_server_process or not self._http_server_process.stderr: + return + try: + async for line in self._http_server_process.stderr: + decoded = line.decode("utf-8", errors="replace").rstrip() + self._http_server_stderr_lines.append(decoded) + logger.debug(f"HTTP server stderr: {decoded}") + except asyncio.CancelledError: + pass + + async def _wait_for_http_server_ready( + self, + max_retries: int = 30, + retry_delay: float = 1.0, + ) -> None: + """Wait for the HTTP server to start accepting connections.""" + import httpx + + url = self._server.url + if not url: + raise ValueError("streamable-http transport requires url in config") + + for attempt in range(max_retries): + # Check if process has crashed + if ( + self._http_server_process + and self._http_server_process.returncode is not None + ): + stderr_output = "\n".join(self._http_server_stderr_lines) + raise UiPathMcpRuntimeError( + McpErrorCode.INITIALIZATION_ERROR, + "HTTP server process exited unexpectedly", + f"Exit code: {self._http_server_process.returncode}\n{stderr_output}", + UiPathErrorCategory.SYSTEM, + ) + + try: + async with httpx.AsyncClient() as client: + response = await client.get(url, timeout=2.0) + logger.info( + f"HTTP server is ready (status: {response.status_code})" + ) + return + except (httpx.ConnectError, httpx.ConnectTimeout) as err: + if attempt < max_retries - 1: + logger.debug( + f"HTTP server not ready yet, retrying in {retry_delay}s " + f"(attempt {attempt + 1}/{max_retries})" + ) + await asyncio.sleep(retry_delay) + else: + raise UiPathMcpRuntimeError( + McpErrorCode.INITIALIZATION_ERROR, + "HTTP server failed to start", + f"Server at {url} did not become ready after {max_retries} attempts", + UiPathErrorCategory.SYSTEM, + ) from err + except httpx.HTTPError: + # Any other HTTP error means server is listening + logger.info("HTTP server is ready (responded with error, but is up)") + return + + async def _stop_http_server_process(self) -> None: + """Stop the shared HTTP server process.""" + if self._http_monitor_task and not self._http_monitor_task.done(): + self._http_monitor_task.cancel() + try: + await self._http_monitor_task + except asyncio.CancelledError: + pass + self._http_monitor_task = None + + if self._http_server_process: + try: + self._http_server_process.terminate() + try: + await asyncio.wait_for( + self._http_server_process.wait(), timeout=5.0 + ) + except asyncio.TimeoutError: + self._http_server_process.kill() + await self._http_server_process.wait() + except ProcessLookupError: + pass + finally: + logger.info("HTTP server process stopped") + self._http_server_process = None + + if self._http_stderr_drain_task and not self._http_stderr_drain_task.done(): + self._http_stderr_drain_task.cancel() + try: + await self._http_stderr_drain_task + except asyncio.CancelledError: + pass + self._http_stderr_drain_task = None + + async def _monitor_http_server_process(self) -> None: + """Monitor the HTTP server process and handle unexpected exits.""" + if not self._http_server_process: + return + try: + returncode = await self._http_server_process.wait() + if not self._cancel_event.is_set(): + logger.error( + f"HTTP server process exited unexpectedly with code {returncode}" + ) + # Stop all HTTP sessions, they will fail on next request anyway + for session_id, session_server in list(self._session_servers.items()): + if isinstance(session_server, StreamableHttpSessionServer): + try: + await session_server.stop() + except Exception as e: + logger.error( + f"Error stopping session {session_id} after process crash: {e}" + ) + self._session_servers.pop(session_id, None) + except asyncio.CancelledError: + pass + async def _register(self) -> None: """Register the MCP server with UiPath.""" @@ -409,36 +577,63 @@ async def _register(self) -> None: env_vars[name] = value try: - # Create a temporary session to get tools - server_params = StdioServerParameters( - command=self._server.command, - args=self._server.args, - env=env_vars, - ) + if self._server.is_streamable_http: + # spawn process, wait for readiness, connect via HTTP + await self._start_http_server_process() + await self._wait_for_http_server_ready() - # Start a temporary stdio client to get tools - # Use a temporary file to capture stderr - with tempfile.TemporaryFile(mode="w+b") as stderr_temp_binary: - stderr_temp = io.TextIOWrapper(stderr_temp_binary, encoding="utf-8") - async with stdio_client(server_params, errlog=stderr_temp) as ( + from mcp.client.streamable_http import streamable_http_client + + if self._server.url is None: + raise UiPathMcpRuntimeError( + McpErrorCode.CONFIGURATION_ERROR, + "Missing URL for streamable-http server", + "Please specify a 'url' in the server configuration for streamable-http transport.", + UiPathErrorCategory.SYSTEM, + ) + async with streamable_http_client(self._server.url) as ( read, write, + _, ): async with ClientSession(read, write) as session: - logger.info("Initializing client session...") - # Try to initialize with timeout + logger.info("Initializing client session (streamable-http)...") try: await asyncio.wait_for(session.initialize(), timeout=30) initialization_successful = True - logger.info("Initialization successful") - - # Only proceed if initialization was successful tools_result = await session.list_tools() + logger.info(f"Discovered {len(tools_result.tools)} tool(s)") except Exception as err: logger.error(f"Initialization error: {err}") - # Capture stderr output here, after the timeout - stderr_temp.seek(0) - server_stderr_output = stderr_temp.read() + server_stderr_output = "\n".join( + self._http_server_stderr_lines + ) + logger.info("Registration session closed (DELETE sent to server)") + else: + # spawn temporary process, discover tools, process dies with context + server_params = StdioServerParameters( + command=self._server.command, + args=self._server.args, + env=env_vars, + ) + + with tempfile.TemporaryFile(mode="w+b") as stderr_temp_binary: + stderr_temp = io.TextIOWrapper(stderr_temp_binary, encoding="utf-8") + async with stdio_client(server_params, errlog=stderr_temp) as ( + read, + write, + ): + async with ClientSession(read, write) as session: + logger.info("Initializing client session...") + try: + await asyncio.wait_for(session.initialize(), timeout=30) + initialization_successful = True + logger.info("Initialization successful") + tools_result = await session.list_tools() + except Exception as err: + logger.error(f"Initialization error: {err}") + stderr_temp.seek(0) + server_stderr_output = stderr_temp.read() except* Exception as eg: for e in eg.exceptions: diff --git a/src/uipath_mcp/_cli/_runtime/_session.py b/src/uipath_mcp/_cli/_runtime/_session.py index c7a6735..1e86a5f 100644 --- a/src/uipath_mcp/_cli/_runtime/_session.py +++ b/src/uipath_mcp/_cli/_runtime/_session.py @@ -5,6 +5,7 @@ from typing import Any from mcp import StdioServerParameters, stdio_client +from mcp.client.streamable_http import streamable_http_client from mcp.shared.message import SessionMessage from mcp.types import ( ErrorData, @@ -26,8 +27,8 @@ RETRY_DELAY = 1 -class SessionServer: - """Manages a server process for a specific session.""" +class BaseSessionServer: + """Base class with transport-agnostic message relay logic.""" def __init__(self, server_config: McpServer, server_slug: str, session_id: str): self._server_config = server_config @@ -35,7 +36,6 @@ def __init__(self, server_config: McpServer, server_slug: str, session_id: str): self._session_id = session_id self._read_stream: Any = None self._write_stream: Any = None - self._mcp_session: Any = None self._run_task: asyncio.Task[None] | None = None self._message_queue: asyncio.Queue[JSONRPCMessage] = asyncio.Queue() self._active_requests: dict[str, str] = {} @@ -43,32 +43,32 @@ def __init__(self, server_config: McpServer, server_slug: str, session_id: str): self._last_message_id: str | None = None self._uipath = UiPath() self._mcp_tracer = McpTracer(tracer, logger) - self._server_stderr_output: str | None = None @property def output(self) -> str | None: - """Returns the captured stderr output from the MCP server process.""" - return self._server_stderr_output + """Returns captured output from the server process, if any.""" + return None async def start(self) -> None: - """Start the server process in a separate task.""" - try: - server_params = StdioServerParameters( - command=str(self._server_config.command), - args=self._server_config.args, - env=self._server_config.env, - ) + """Start the session. Must be implemented by subclasses.""" + raise NotImplementedError - # Start the server process in a separate task - self._run_task = asyncio.create_task(self._run_server(server_params)) - self._run_task.add_done_callback(self._run_server_callback) + async def stop(self) -> None: + """Clean up resources and stop the session.""" + if self._run_task and not self._run_task.done(): + self._run_task.cancel() + try: + await asyncio.wait_for(asyncio.shield(self._run_task), timeout=3.0) + except (asyncio.TimeoutError, asyncio.CancelledError): + pass + except Exception as e: + logger.error( + f"Error during task cancellation for session {self._session_id}: {e}" + ) - except Exception as e: - logger.error( - f"Error starting session {self._session_id}: {e}", exc_info=True - ) - await self.stop() - raise + self._run_task = None + self._read_stream = None + self._write_stream = None async def on_message_received(self, request_id: str) -> None: """Get new incoming messages from UiPath MCP Server.""" @@ -89,128 +89,79 @@ async def on_message_received(self, request_id: str) -> None: ) raise - async def stop(self) -> None: - """Clean up resources and stop the server.""" - # Cancel the context task if it exists - if self._run_task and not self._run_task.done(): - self._run_task.cancel() - try: - await asyncio.wait_for(asyncio.shield(self._run_task), timeout=3.0) - except (asyncio.TimeoutError, asyncio.CancelledError): - pass - except Exception as e: - logger.error( - f"Error during task cancellation for session {self._session_id}: {e}" - ) - - # The context managers in _run_server will handle resource cleanup - self._run_task = None - self._read_stream = None - self._write_stream = None - self._mcp_session = None + async def _relay_messages(self) -> None: + """Transport-agnostic message relay loop. - async def _run_server(self, server_params: StdioServerParameters) -> None: - """Run the local MCP server process.""" - logger.info(f"Starting local MCP Server process for session {self._session_id}") - self._server_stderr_output = None - with tempfile.TemporaryFile(mode="w+b") as stderr_temp_binary: - stderr_temp = io.TextIOWrapper(stderr_temp_binary, encoding="utf-8") - try: - async with stdio_client(server_params, errlog=stderr_temp) as ( - read, - write, - ): - self._read_stream, self._write_stream = read, write + Reads messages from the local server's read stream, matches responses + to request IDs, and sends them back. + """ + consumer_task = asyncio.create_task(self._consume_messages()) - # Start the message consumer task - consumer_task = asyncio.create_task(self._consume_messages()) - - # Process incoming messages from the local server - try: - while True: - # Get message from local server - session_message = None - try: - if self._read_stream is None: - logger.error("Read stream is not initialized") - break - - session_message = await self._read_stream.receive() - if isinstance(session_message, Exception): - logger.error(f"Received error: {session_message}") - continue - message = session_message.message - # For responses, determine which request_id to use - if self._is_response(message): - message_id = self._get_message_id(message) - if ( - message_id - and message_id in self._active_requests - ): - # Use the stored request_id for this response - request_id = self._active_requests[message_id] - # Send with the matched request_id - await self._send_message(message, request_id) - # Clean up the mapping after use - del self._active_requests[message_id] - else: - # If no mapping found, use the last known request_id - if self._last_request_id is not None: - await self._send_message( - message, self._last_request_id - ) - else: - # For non-responses, use the last known request_id - if self._last_request_id is not None: - await self._send_message( - message, self._last_request_id - ) - except Exception as e: - if session_message: - logger.info(session_message) - logger.error( - f"Error processing message for session {self._session_id}: {e}", - exc_info=True, - ) - if self._last_request_id is not None: - await self._send_message( - JSONRPCMessage( - root=JSONRPCError( - jsonrpc="2.0", - # Use the last known message id for error reporting - id=self._last_message_id, - error=ErrorData( - code=-32000, - message=f"Error processing message: {e}", - ), - ) - ), - self._last_request_id, - ) - continue - finally: - # Cancel the consumer when we exit the loop - consumer_task.cancel() - try: - await asyncio.wait_for(consumer_task, timeout=2.0) - except (asyncio.CancelledError, asyncio.TimeoutError): - pass + try: + while True: + session_message = None + try: + if self._read_stream is None: + logger.error("Read stream is not initialized") + break - except* Exception as eg: - for exception in eg.exceptions: + session_message = await self._read_stream.receive() + if isinstance(session_message, Exception): + logger.error(f"Received error: {session_message}") + continue + message = session_message.message + # For responses, determine which request_id to use + if self._is_response(message): + message_id = self._get_message_id(message) + if message_id and message_id in self._active_requests: + # Use the stored request_id for this response + request_id = self._active_requests[message_id] + # Send with the matched request_id + await self._send_message(message, request_id) + # Clean up the mapping after use + del self._active_requests[message_id] + else: + # If no mapping found, use the last known request_id + if self._last_request_id is not None: + await self._send_message(message, self._last_request_id) + else: + # For non-responses, use the last known request_id + if self._last_request_id is not None: + await self._send_message(message, self._last_request_id) + except Exception as e: + if session_message: + logger.info(session_message) logger.error( - f"Unexpected error for session {self._session_id}: {exception}", + f"Error processing message for session {self._session_id}: {e}", exc_info=True, ) - finally: - stderr_temp.seek(0) - self._server_stderr_output = stderr_temp.read() - logger.error(self._server_stderr_output) + if self._last_request_id is not None: + await self._send_message( + JSONRPCMessage( + root=JSONRPCError( + jsonrpc="2.0", + # Use the last known message id for error reporting + id=self._last_message_id, + error=ErrorData( + code=-32000, + message=f"Error processing message: {e}", + ), + ) + ), + self._last_request_id, + ) + continue + finally: + # Cancel the consumer when we exit the loop + consumer_task.cancel() + try: + await asyncio.wait_for(consumer_task, timeout=2.0) + except (asyncio.CancelledError, asyncio.TimeoutError): + pass - def _run_server_callback(self, task): + def _run_server_callback(self, task: asyncio.Task[None]) -> None: """Handle task completion.""" try: - # Get the result to propagate any exceptions task.result() except asyncio.CancelledError: pass @@ -219,7 +170,7 @@ def _run_server_callback(self, task): f"Server task for session {self._session_id} failed: {e}", exc_info=True ) - async def _consume_messages(self): + async def _consume_messages(self) -> None: """Consume messages from the queue and send them to the local server.""" try: while True: @@ -292,13 +243,17 @@ async def _get_messages_internal(self, request_id: str) -> None: self._last_request_id = request_id messages = response.json() for message in messages: - logger.debug(f"Received message: {message}") json_message = JSONRPCMessage.model_validate(message) - if self._is_request(json_message): + if isinstance(json_message.root, JSONRPCRequest): + logger.info( + f"Session {self._session_id[:8]}: {json_message.root.method}" + ) message_id = self._get_message_id(json_message) if message_id: self._last_message_id = message_id self._active_requests[message_id] = request_id + else: + logger.debug(f"Received message: {message}") with self._mcp_tracer.create_span_for_message( json_message, session_id=self._session_id, @@ -309,13 +264,6 @@ async def _get_messages_internal(self, request_id: str) -> None: elif 500 <= response.status_code < 600: raise Exception(f"{response.status_code} - {response.text}") - def _is_request(self, message: JSONRPCMessage) -> bool: - """Check if a message is a JSONRPCRequest.""" - if hasattr(message, "root"): - root = message.root - return isinstance(root, JSONRPCRequest) - return False - def _is_response(self, message: JSONRPCMessage) -> bool: """Check if a message is a JSONRPCResponse or JSONRPCError.""" if hasattr(message, "root"): @@ -328,3 +276,96 @@ def _get_message_id(self, message: JSONRPCMessage) -> str: if hasattr(message, "root") and hasattr(message.root, "id"): return str(message.root.id) return "" + + +class SessionServer(BaseSessionServer): + """Manages a stdio server process for a specific session.""" + + def __init__(self, server_config: McpServer, server_slug: str, session_id: str): + super().__init__(server_config, server_slug, session_id) + self._server_stderr_output: str | None = None + + @property + def output(self) -> str | None: + """Returns the captured stderr output from the MCP server process.""" + return self._server_stderr_output + + async def start(self) -> None: + """Start the server process in a separate task.""" + try: + server_params = StdioServerParameters( + command=str(self._server_config.command), + args=self._server_config.args, + env=self._server_config.env, + ) + + self._run_task = asyncio.create_task(self._run_server(server_params)) + self._run_task.add_done_callback(self._run_server_callback) + + except Exception as e: + logger.error( + f"Error starting session {self._session_id}: {e}", exc_info=True + ) + await self.stop() + raise + + async def _run_server(self, server_params: StdioServerParameters) -> None: + """Run the local MCP server process.""" + logger.info(f"Starting local MCP Server process for session {self._session_id}") + self._server_stderr_output = None + with tempfile.TemporaryFile(mode="w+b") as stderr_temp_binary: + stderr_temp = io.TextIOWrapper(stderr_temp_binary, encoding="utf-8") + try: + async with stdio_client(server_params, errlog=stderr_temp) as ( + read, + write, + ): + self._read_stream, self._write_stream = read, write + await self._relay_messages() + + except* Exception as eg: + for exception in eg.exceptions: + logger.error( + f"Unexpected error for session {self._session_id}: {exception}", + exc_info=True, + ) + finally: + stderr_temp.seek(0) + self._server_stderr_output = stderr_temp.read() + logger.error(self._server_stderr_output) + + +class StreamableHttpSessionServer(BaseSessionServer): + """Manages an HTTP connection to a shared streamable-http server for a specific session.""" + + async def start(self) -> None: + """Start an HTTP session to the running server.""" + try: + self._run_task = asyncio.create_task(self._run_http_session()) + self._run_task.add_done_callback(self._run_server_callback) + + except Exception as e: + logger.error( + f"Error starting HTTP session {self._session_id}: {e}", exc_info=True + ) + await self.stop() + raise + + async def _run_http_session(self) -> None: + """Connect to the streamable HTTP server and run the message relay.""" + url = self._server_config.url + if not url: + raise ValueError("streamable-http transport requires a url in config") + + logger.info( + f"Connecting to streamable HTTP server at {url} for session {self._session_id}" + ) + try: + async with streamable_http_client(url) as (read, write, _): + self._read_stream, self._write_stream = read, write + await self._relay_messages() + except Exception as e: + logger.error( + f"Unexpected error for HTTP session {self._session_id}: {e}", + exc_info=True, + ) diff --git a/src/uipath_mcp/_cli/_utils/_config.py b/src/uipath_mcp/_cli/_utils/_config.py index 3c87c10..ec8a0e0 100644 --- a/src/uipath_mcp/_cli/_utils/_config.py +++ b/src/uipath_mcp/_cli/_utils/_config.py @@ -16,6 +16,8 @@ def __init__( ): self.name = name self.type = server_config.get("type") + self.transport: str = server_config.get("transport", "stdio") + self.url: str | None = server_config.get("url") self.command: str = str(server_config.get("command")) self.args = server_config.get("args", []) self.env = server_config.get("env", {}) @@ -23,6 +25,11 @@ def __init__( if key in os.environ: self.env[key] = os.environ[key] + @property + def is_streamable_http(self) -> bool: + """Whether this server uses streamable-http transport.""" + return self.transport == "streamable-http" + @property def file_path(self) -> str | None: """Get the file path from args if available.""" @@ -30,10 +37,19 @@ def file_path(self) -> str | None: def to_dict(self) -> dict[str, Any]: """Convert the server model back to a dictionary.""" - return {"type": self.type, "command": self.command, "args": self.args} + result: dict[str, Any] = { + "type": self.type, + "command": self.command, + "args": self.args, + } + if self.transport: + result["transport"] = self.transport + if self.url: + result["url"] = self.url + return result def __repr__(self) -> str: - return f"McpServer(name='{self.name}', type='{self.type}', command='{self.command}', args={self.args})" + return f"McpServer(name='{self.name}', type='{self.type}', transport='{self.transport}', command='{self.command}', args={self.args}, url='{self.url}')" class McpConfig: diff --git a/uv.lock b/uv.lock index 0154ab1..1408437 100644 --- a/uv.lock +++ b/uv.lock @@ -1761,7 +1761,7 @@ wheels = [ [[package]] name = "uipath-mcp" -version = "0.1.0" +version = "0.1.1" source = { editable = "." } dependencies = [ { name = "mcp" },