Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
[project]
name = "uipath"
version = "2.2.6"
version = "2.2.7"
description = "Python SDK and CLI for UiPath Platform, enabling programmatic interaction with automation services, process management, and deployment tools."
readme = { file = "README.md", content-type = "text/markdown" }
requires-python = ">=3.11"
dependencies = [
"uipath-core>=0.0.5, <0.1.0",
"uipath-runtime>=0.0.23, <0.1.0",
"uipath-runtime>=0.1.0, <0.2.0",
"click>=8.3.1",
"httpx>=0.28.1",
"pyjwt>=2.10.1",
Expand Down
72 changes: 57 additions & 15 deletions src/uipath/_cli/_debug/_bridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
import json
import logging
import os
import signal
from concurrent.futures import ThreadPoolExecutor
from enum import Enum
from typing import Any, Literal

Expand Down Expand Up @@ -75,13 +77,20 @@ def __init__(self, verbose: bool = True):
self.verbose = verbose
self.state = DebuggerState()

self._stdin_executor = ThreadPoolExecutor(max_workers=1)
self._terminate_event: asyncio.Event | None = None

async def connect(self) -> None:
"""Connect to console debugger."""
"""Initialize the console debugger."""
self._terminate_event = asyncio.Event()
signal.signal(
signal.SIGINT, self._handle_sigint
) # We need to catch CTRL+C during polling
self.console.print()
self._print_help()

async def disconnect(self) -> None:
"""Cleanup."""
"""Clean up console debugger."""
self.console.print()
self.console.print("[dim]─" * 40)
self.console.print("[green]Debug session completed")
Expand Down Expand Up @@ -116,7 +125,6 @@ async def emit_breakpoint_hit(

self.console.print("[red]─" * 40)

# Display current state
if breakpoint_result.current_state:
self._print_json(breakpoint_result.current_state, label="state")

Expand Down Expand Up @@ -165,43 +173,72 @@ async def emit_execution_error(
self.console.print(f"[red]{error_display}[/red]")
self.console.print("[red]─" * 40)

async def wait_for_resume(self) -> Any:
async def wait_for_resume(self) -> dict[str, Any]:
"""Wait for user to press Enter or type commands."""
while True: # Keep looping until we get a resume command
self.console.print()

# Run input() in executor to not block async loop
loop = asyncio.get_running_loop()
user_input = await loop.run_in_executor(None, lambda: input("> "))
future = loop.run_in_executor(
self._stdin_executor, self._read_input_blocking
)

try:
user_input = await future
except asyncio.CancelledError:
return {"command": DebugCommand.CONTINUE, "args": None}

command_result = self._parse_command(user_input.strip())

# Handle commands that need another prompt
if command_result["command"] in [

if command_result["command"] in {
DebugCommand.BREAKPOINT,
DebugCommand.LIST_BREAKPOINTS,
DebugCommand.CLEAR_BREAKPOINT,
DebugCommand.HELP,
]:
}:
# These commands don't resume execution, loop again
continue

# Commands that resume execution: CONTINUE, STEP
self.console.print()
return command_result

async def wait_for_terminate(self) -> None:
"""Wait until user requests termination (Ctrl+C or 'q')."""
assert self._terminate_event is not None, "Debugger not connected"
await self._terminate_event.wait()

def get_breakpoints(self) -> list[str] | Literal["*"]:
"""Get nodes to suspend execution at."""
if self.state.step_mode:
return "*" # Suspend at all nodes
return list(self.state.breakpoints) # Only suspend at breakpoints

def _read_input_blocking(self) -> str:
assert self._terminate_event is not None, "Debugger not connected"
try:
return input("> ")
except KeyboardInterrupt as e:
self._terminate_event.set()
raise UiPathDebugQuitError("User pressed Ctrl+C") from e
except EOFError as e:
self._terminate_event.set()
raise UiPathDebugQuitError("STDIN closed by user") from e

def _handle_sigint(self, signum: int, frame: Any) -> None:
assert self._terminate_event is not None, "Debugger not connected"
asyncio.get_running_loop().call_soon_threadsafe(self._terminate_event.set)

def _parse_command(self, user_input: str) -> dict[str, Any]:
"""Parse user command input.

Returns:
Dict with 'command' and optional 'args'
"""
assert self._terminate_event is not None, "Debugger not connected"

if not user_input:
return {"command": DebugCommand.CONTINUE, "args": None}

Expand Down Expand Up @@ -246,6 +283,7 @@ def _parse_command(self, user_input: str) -> dict[str, Any]:
}

elif cmd in ["q", "quit", "exit"]:
self._terminate_event.set()
raise UiPathDebugQuitError("User requested exit")

elif cmd in ["h", "help", "?"]:
Expand Down Expand Up @@ -382,7 +420,7 @@ def __init__(
self._client: SignalRClient | None = None
self._connected_event = asyncio.Event()
self._resume_event: asyncio.Event | None = None
self._quit_event = asyncio.Event()
self._terminate_event = asyncio.Event()

async def connect(self) -> None:
"""Establish SignalR connection."""
Expand Down Expand Up @@ -538,10 +576,10 @@ async def wait_for_resume(self) -> None:
self._resume_event = asyncio.Event()

resume_task = asyncio.create_task(self._resume_event.wait())
quit_task = asyncio.create_task(self._quit_event.wait())
terminate_task = asyncio.create_task(self._terminate_event.wait())

done, pending = await asyncio.wait(
{resume_task, quit_task}, return_when=asyncio.FIRST_COMPLETED
{resume_task, terminate_task}, return_when=asyncio.FIRST_COMPLETED
)

for task in pending:
Expand All @@ -551,12 +589,16 @@ async def wait_for_resume(self) -> None:
except asyncio.CancelledError:
pass

if quit_task in done:
logger.info("Quit command received during wait")
raise UiPathDebugQuitError("Quit command received from server")
if terminate_task in done:
logger.info("Terminate command received during wait")
raise UiPathDebugQuitError("Terminate command received from server")

logger.info("Resume command received")

async def wait_for_terminate(self) -> None:
"""Wait for terminate command from server."""
await self._terminate_event.wait()

def get_breakpoints(self) -> list[str] | Literal["*"]:
"""Get nodes to suspend execution at."""
if self.state.step_mode:
Expand Down Expand Up @@ -693,7 +735,7 @@ async def _handle_remove_breakpoints(self, args: list[Any]) -> None:
async def _handle_quit(self, _args: list[Any]) -> None:
"""Handle Quit command from SignalR server."""
logger.info("Quit command received")
self._quit_event.set()
self._terminate_event.set()

async def _handle_open(self) -> None:
"""Handle SignalR connection open."""
Expand Down
6 changes: 6 additions & 0 deletions src/uipath/_cli/cli_debug.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,13 @@ async def execute_debug_runtime():
factory: UiPathRuntimeFactoryProtocol | None = None

try:
trigger_poll_interval: float = 5.0

if ctx.job_id:
trace_manager.add_span_exporter(LlmOpsHttpExporter())
trigger_poll_interval = (
0.0 # Polling disabled for production jobs
)

factory = UiPathRuntimeFactoryRegistry.get(context=ctx)

Expand All @@ -126,6 +131,7 @@ async def execute_debug_runtime():
debug_runtime = UiPathDebugRuntime(
delegate=runtime,
debug_bridge=debug_bridge,
trigger_poll_interval=trigger_poll_interval,
)

project_id = UiPathConfig.project_id
Expand Down
29 changes: 16 additions & 13 deletions src/uipath/platform/resume_triggers/_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,19 +97,22 @@ async def read_trigger(self, trigger: UiPathResumeTrigger) -> Any | None:
folder_key=trigger.folder_key,
folder_path=trigger.folder_path,
)
if (
job.state
and not job.state.lower()
== UiPathRuntimeStatus.SUCCESSFUL.value.lower()
):
raise UiPathRuntimeError(
UiPathErrorCode.INVOKED_PROCESS_FAILURE,
"Invoked process did not finish successfully.",
_try_convert_to_json_format(str(job.job_error or job.info))
or "Job error unavailable.",
)
output_data = await uipath.jobs.extract_output_async(job)
return _try_convert_to_json_format(output_data)
job_state = (job.state or "").lower()
successful_state = UiPathRuntimeStatus.SUCCESSFUL.value.lower()
faulted_state = UiPathRuntimeStatus.FAULTED.value.lower()

if job_state == successful_state:
output_data = await uipath.jobs.extract_output_async(job)
return _try_convert_to_json_format(output_data)

raise UiPathRuntimeError(
UiPathErrorCode.INVOKED_PROCESS_FAILURE,
"Invoked process did not finish successfully.",
_try_convert_to_json_format(str(job.job_error or job.info))
or "Job error unavailable."
if job_state == faulted_state
else f"Job {job.key} is {job_state}.",
)

case UiPathResumeTriggerType.API:
if trigger.api_resume and trigger.api_resume.inbox_id:
Expand Down
4 changes: 3 additions & 1 deletion tests/cli/test_hitl.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,9 @@ async def test_read_job_trigger_failed(
job_error_info = JobErrorInfo(code="error code")
job_id = 1234

mock_job = Job(id=job_id, key=job_key, state="Failed", job_error=job_error_info)
mock_job = Job(
id=job_id, key=job_key, state="Faulted", job_error=job_error_info
)
mock_retrieve_async = AsyncMock(return_value=mock_job)

with patch(
Expand Down
10 changes: 5 additions & 5 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading