Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "uipath-mcp"
version = "0.1.0"
version = "0.1.1"
description = "UiPath MCP SDK"
readme = { file = "README.md", content-type = "text/markdown" }
requires-python = ">=3.11"
Expand Down
19 changes: 18 additions & 1 deletion src/uipath_mcp/_cli/_runtime/_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,26 @@ async def new_runtime(
McpErrorCode.SERVER_NOT_FOUND,
"MCP server not found",
f"Server '{entrypoint}' not found. Available: {available}",
UiPathErrorCategory.DEPLOYMENT,
UiPathErrorCategory.USER,
)

# Validate streamable-http configuration
if server.is_streamable_http:
if not server.url:
raise UiPathMcpRuntimeError(
McpErrorCode.CONFIGURATION_ERROR,
"Invalid configuration",
f"Server '{entrypoint}' uses streamable-http transport but 'url' is not specified in mcp.json",
UiPathErrorCategory.USER,
)
if not server.command or server.command == "None":
raise UiPathMcpRuntimeError(
McpErrorCode.CONFIGURATION_ERROR,
"Invalid configuration",
f"Server '{entrypoint}' uses streamable-http transport but 'command' is not specified in mcp.json",
UiPathErrorCategory.USER,
)

# Validate runtime_id is a valid UUID, generate new one if not
try:
uuid.UUID(runtime_id)
Expand Down
241 changes: 218 additions & 23 deletions src/uipath_mcp/_cli/_runtime/_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
from .._utils._config import McpServer
from ._context import UiPathServerType
from ._exception import McpErrorCode, UiPathMcpRuntimeError
from ._session import SessionServer
from ._session import BaseSessionServer, SessionServer, StreamableHttpSessionServer

logger = logging.getLogger(__name__)
tracer = trace.get_tracer(__name__)
Expand Down Expand Up @@ -76,10 +76,14 @@ def __init__(
self._server_slug = server_slug

self._signalr_client: SignalRClient | None = None
self._session_servers: dict[str, SessionServer] = {}
self._session_servers: dict[str, BaseSessionServer] = {}
self._session_output: str | None = None
self._cancel_event = asyncio.Event()
self._keep_alive_task: asyncio.Task[None] | None = None
self._http_server_process: asyncio.subprocess.Process | None = None
self._http_monitor_task: asyncio.Task[None] | None = None
self._http_stderr_drain_task: asyncio.Task[None] | None = None
self._http_server_stderr_lines: list[str] = []
self._uipath = UiPath()
self._cleanup_done = False

Expand Down Expand Up @@ -223,6 +227,12 @@ async def _run_server(self) -> UiPathRuntimeResult:
# Register the local server with UiPath MCP Server
await self._register()

# Start HTTP server process monitor if using streamable-http
if self._server.is_streamable_http:
self._http_monitor_task = asyncio.create_task(
self._monitor_http_server_process()
)

run_task = asyncio.create_task(self._signalr_client.run())
cancel_task = asyncio.create_task(self._cancel_event.wait())
self._keep_alive_task = asyncio.create_task(self._keep_alive())
Expand Down Expand Up @@ -300,6 +310,9 @@ async def _cleanup(self) -> None:
except Exception as e:
logger.error(f"Error cleaning up session server {session_id}: {str(e)}")

# Stop the shared HTTP server process (streamable-http only)
await self._stop_http_server_process()

if self._signalr_client and hasattr(self._signalr_client, "_transport"):
transport = self._signalr_client._transport
if transport and hasattr(transport, "_ws") and transport._ws:
Expand Down Expand Up @@ -358,8 +371,13 @@ async def _handle_signalr_message(self, args: list[str]) -> None:
try:
# Check if we have a session server for this session_id
if session_id not in self._session_servers:
# Create and start a new session server
session_server = SessionServer(self._server, self.slug, session_id)
session_server: BaseSessionServer
if self._server.is_streamable_http:
session_server = StreamableHttpSessionServer(
self._server, self.slug, session_id
)
else:
session_server = SessionServer(self._server, self.slug, session_id)
try:
await session_server.start()
except Exception as e:
Expand Down Expand Up @@ -393,6 +411,156 @@ async def _handle_signalr_close(self) -> None:
"""Handle SignalR connection close event."""
logger.info("Websocket connection closed.")

async def _start_http_server_process(self) -> None:
"""Spawn the streamable-http server process.

The process is started once and shared across all sessions.
"""
env_vars = self._server.env.copy()
if self.server_type is UiPathServerType.Coded:
for name, value in os.environ.items():
if name not in env_vars:
env_vars[name] = value

merged_env = {**os.environ, **env_vars} if env_vars else None
self._http_server_stderr_lines = []
self._http_server_process = await asyncio.create_subprocess_exec(
self._server.command,
*self._server.args,
env=merged_env,
stdout=asyncio.subprocess.DEVNULL,
stderr=asyncio.subprocess.PIPE,
)
self._http_stderr_drain_task = asyncio.create_task(self._drain_http_stderr())
logger.info(
f"Started HTTP server process (PID: {self._http_server_process.pid}) "
f"for {self._server.url}"
)

async def _drain_http_stderr(self) -> None:
"""Continuously read and log stderr from the HTTP server process.

Accumulates output in _http_server_stderr_lines for error reporting.
"""
if not self._http_server_process or not self._http_server_process.stderr:
return
try:
async for line in self._http_server_process.stderr:
decoded = line.decode("utf-8", errors="replace").rstrip()
self._http_server_stderr_lines.append(decoded)
logger.debug(f"HTTP server stderr: {decoded}")
except asyncio.CancelledError:
pass

async def _wait_for_http_server_ready(
self,
max_retries: int = 30,
retry_delay: float = 1.0,
) -> None:
"""Wait for the HTTP server to start accepting connections."""
import httpx

url = self._server.url
if not url:
raise ValueError("streamable-http transport requires url in config")

for attempt in range(max_retries):
# Check if process has crashed
if (
self._http_server_process
and self._http_server_process.returncode is not None
):
stderr_output = "\n".join(self._http_server_stderr_lines)
raise UiPathMcpRuntimeError(
McpErrorCode.INITIALIZATION_ERROR,
"HTTP server process exited unexpectedly",
f"Exit code: {self._http_server_process.returncode}\n{stderr_output}",
UiPathErrorCategory.SYSTEM,
)

try:
async with httpx.AsyncClient() as client:
response = await client.get(url, timeout=2.0)
logger.info(
f"HTTP server is ready (status: {response.status_code})"
)
return
except (httpx.ConnectError, httpx.ConnectTimeout) as err:
if attempt < max_retries - 1:
logger.debug(
f"HTTP server not ready yet, retrying in {retry_delay}s "
f"(attempt {attempt + 1}/{max_retries})"
)
await asyncio.sleep(retry_delay)
else:
raise UiPathMcpRuntimeError(
McpErrorCode.INITIALIZATION_ERROR,
"HTTP server failed to start",
f"Server at {url} did not become ready after {max_retries} attempts",
UiPathErrorCategory.SYSTEM,
) from err
except httpx.HTTPError:
# Any other HTTP error means server is listening
logger.info("HTTP server is ready (responded with error, but is up)")
return
Comment on lines +502 to +505
Copy link

Copilot AI Feb 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In _wait_for_http_server_ready, treating any httpx.HTTPError as “server is ready” can incorrectly mark readiness on ReadTimeout/WriteTimeout/protocol errors (port open but server not usable). Consider retrying on timeout-related errors and only treating an actual response (any status code) or HTTPStatusError as readiness.

Copilot uses AI. Check for mistakes.

async def _stop_http_server_process(self) -> None:
"""Stop the shared HTTP server process."""
if self._http_monitor_task and not self._http_monitor_task.done():
self._http_monitor_task.cancel()
try:
await self._http_monitor_task
except asyncio.CancelledError:
pass
self._http_monitor_task = None

if self._http_server_process:
try:
self._http_server_process.terminate()
try:
await asyncio.wait_for(
self._http_server_process.wait(), timeout=5.0
)
except asyncio.TimeoutError:
self._http_server_process.kill()
await self._http_server_process.wait()
except ProcessLookupError:
pass
finally:
logger.info("HTTP server process stopped")
self._http_server_process = None

if self._http_stderr_drain_task and not self._http_stderr_drain_task.done():
self._http_stderr_drain_task.cancel()
try:
await self._http_stderr_drain_task
except asyncio.CancelledError:
pass
self._http_stderr_drain_task = None

async def _monitor_http_server_process(self) -> None:
"""Monitor the HTTP server process and handle unexpected exits."""
if not self._http_server_process:
return
try:
returncode = await self._http_server_process.wait()
if not self._cancel_event.is_set():
logger.error(
f"HTTP server process exited unexpectedly with code {returncode}"
)
# Stop all HTTP sessions, they will fail on next request anyway
for session_id, session_server in list(self._session_servers.items()):
if isinstance(session_server, StreamableHttpSessionServer):
try:
await session_server.stop()
except Exception as e:
logger.error(
f"Error stopping session {session_id} after process crash: {e}"
)
self._session_servers.pop(session_id, None)
except asyncio.CancelledError:
pass

async def _register(self) -> None:
"""Register the MCP server with UiPath."""

Expand All @@ -409,36 +577,63 @@ async def _register(self) -> None:
env_vars[name] = value

try:
# Create a temporary session to get tools
server_params = StdioServerParameters(
command=self._server.command,
args=self._server.args,
env=env_vars,
)
if self._server.is_streamable_http:
# spawn process, wait for readiness, connect via HTTP
await self._start_http_server_process()
await self._wait_for_http_server_ready()

# Start a temporary stdio client to get tools
# Use a temporary file to capture stderr
with tempfile.TemporaryFile(mode="w+b") as stderr_temp_binary:
stderr_temp = io.TextIOWrapper(stderr_temp_binary, encoding="utf-8")
async with stdio_client(server_params, errlog=stderr_temp) as (
from mcp.client.streamable_http import streamable_http_client

if self._server.url is None:
raise UiPathMcpRuntimeError(
McpErrorCode.CONFIGURATION_ERROR,
"Missing URL for streamable-http server",
"Please specify a 'url' in the server configuration for streamable-http transport.",
UiPathErrorCategory.SYSTEM,
)
async with streamable_http_client(self._server.url) as (
read,
write,
_,
):
async with ClientSession(read, write) as session:
logger.info("Initializing client session...")
# Try to initialize with timeout
logger.info("Initializing client session (streamable-http)...")
try:
await asyncio.wait_for(session.initialize(), timeout=30)
initialization_successful = True
logger.info("Initialization successful")

# Only proceed if initialization was successful
tools_result = await session.list_tools()
logger.info(f"Discovered {len(tools_result.tools)} tool(s)")
except Exception as err:
logger.error(f"Initialization error: {err}")
# Capture stderr output here, after the timeout
stderr_temp.seek(0)
server_stderr_output = stderr_temp.read()
server_stderr_output = "\n".join(
self._http_server_stderr_lines
)
logger.info("Registration session closed (DELETE sent to server)")
else:
# spawn temporary process, discover tools, process dies with context
server_params = StdioServerParameters(
command=self._server.command,
args=self._server.args,
env=env_vars,
)

with tempfile.TemporaryFile(mode="w+b") as stderr_temp_binary:
stderr_temp = io.TextIOWrapper(stderr_temp_binary, encoding="utf-8")
async with stdio_client(server_params, errlog=stderr_temp) as (
read,
write,
):
async with ClientSession(read, write) as session:
logger.info("Initializing client session...")
try:
await asyncio.wait_for(session.initialize(), timeout=30)
initialization_successful = True
logger.info("Initialization successful")
tools_result = await session.list_tools()
except Exception as err:
logger.error(f"Initialization error: {err}")
stderr_temp.seek(0)
server_stderr_output = stderr_temp.read()

except* Exception as eg:
for e in eg.exceptions:
Expand Down
Loading