From 24b9ce25d1ae076ee584954e8b7249b169ed13cb Mon Sep 17 00:00:00 2001 From: Cristian Pufu Date: Mon, 10 Nov 2025 15:57:15 +0200 Subject: [PATCH] fix: reactive ui --- src/uipath/dev/__init__.py | 206 +++++------------- src/uipath/dev/services/__init__.py | 7 + src/uipath/dev/services/run_service.py | 203 +++++++++++++++++ src/uipath/dev/ui/panels/run_history_panel.py | 127 +++++++---- 4 files changed, 347 insertions(+), 196 deletions(-) create mode 100644 src/uipath/dev/services/__init__.py create mode 100644 src/uipath/dev/services/run_service.py diff --git a/src/uipath/dev/__init__.py b/src/uipath/dev/__init__.py index dd7bf5d..34d42e1 100644 --- a/src/uipath/dev/__init__.py +++ b/src/uipath/dev/__init__.py @@ -2,34 +2,24 @@ import asyncio import json -import traceback from datetime import datetime from pathlib import Path -from typing import Any, Optional +from typing import Any import pyperclip # type: ignore[import-untyped] -from pydantic import BaseModel -from rich.traceback import Traceback from textual import on from textual.app import App, ComposeResult from textual.binding import Binding from textual.containers import Container, Horizontal from textual.widgets import Button, Footer, Input, ListView, RichLog from uipath.core.tracing import UiPathTraceManager -from uipath.runtime import ( - UiPathExecuteOptions, - UiPathExecutionRuntime, - UiPathRuntimeFactoryProtocol, - UiPathRuntimeStatus, -) -from uipath.runtime.errors import UiPathErrorContract, UiPathRuntimeError +from uipath.runtime import UiPathRuntimeFactoryProtocol from uipath.dev.infrastructure import ( - RunContextExporter, - RunContextLogHandler, patch_textual_stderr, ) from uipath.dev.models import ExecutionRun, LogMessage, TraceMessage +from uipath.dev.services import RunService from uipath.dev.ui.panels import NewRunPanel, RunDetailsPanel, RunHistoryPanel @@ -37,7 +27,10 @@ class UiPathDeveloperConsole(App[Any]): """UiPath developer console interface.""" TITLE = "UiPath Developer Console" - SUB_TITLE = "Interactive terminal application for building, testing, and debugging UiPath Python runtimes, agents, and automation scripts." + SUB_TITLE = ( + "Interactive terminal application for building, testing, and debugging " + "UiPath Python runtimes, agents, and automation scripts." + ) CSS_PATH = Path(__file__).parent / "ui" / "styles" / "terminal.tcss" BINDINGS = [ @@ -56,23 +49,27 @@ def __init__( **kwargs, ): """Initialize the UiPath Dev Terminal App.""" + # Capture subprocess stderr lines and route to our log handler self._stderr_write_fd: int = patch_textual_stderr(self._add_subprocess_log) super().__init__(**kwargs) - self.initial_entrypoint: str = "main.py" - self.initial_input: str = '{\n "message": "Hello World"\n}' - self.runs: dict[str, ExecutionRun] = {} self.runtime_factory = runtime_factory self.trace_manager = trace_manager - self.trace_manager.add_span_exporter( - RunContextExporter( - on_trace=self._handle_trace_message, - on_log=self._handle_log_message, - ), - batch=False, + + # Core service: owns run state, logs, traces + self.run_service = RunService( + runtime_factory=self.runtime_factory, + trace_manager=self.trace_manager, + on_run_updated=self._on_run_updated, + on_log=self._on_log_for_ui, + on_trace=self._on_trace_for_ui, ) + # Just defaults for convenience + self.initial_entrypoint: str = "main.py" + self.initial_input: str = '{\n "message": "Hello World"\n}' + def compose(self) -> ComposeResult: """Compose the UI layout.""" with Horizontal(): @@ -127,8 +124,10 @@ async def handle_chat_input(self, event: Input.Submitted) -> None: "Wait for agent response...", timeout=1.5, severity="warning" ) return + if details_panel.current_run.status == "suspended": details_panel.current_run.resume_data = {"message": user_text} + asyncio.create_task(self._execute_runtime(details_panel.current_run)) event.input.clear() @@ -145,24 +144,24 @@ async def action_cancel(self) -> None: await self.action_new_run() async def action_execute_run(self) -> None: - """Execute a new run with UiPath runtime.""" + """Execute a new run based on NewRunPanel inputs.""" new_run_panel = self.query_one("#new-run-panel", NewRunPanel) entrypoint, input_data, conversational = new_run_panel.get_input_values() if not entrypoint: return - input: dict[str, Any] = {} try: - input = json.loads(input_data) + input_payload: dict[str, Any] = json.loads(input_data) except json.JSONDecodeError: return - run = ExecutionRun(entrypoint, input, conversational) + run = ExecutionRun(entrypoint, input_payload, conversational) - self.runs[run.id] = run + history_panel = self.query_one("#history-panel", RunHistoryPanel) + history_panel.add_run(run) - self._add_run_in_history(run) + self.run_service.register_run(run) self._show_run_details(run) @@ -187,140 +186,48 @@ def action_copy(self) -> None: else: self.app.notify("Nothing to copy here.", timeout=1.5, severity="warning") - async def _execute_runtime(self, run: ExecutionRun): - """Execute the script using UiPath runtime.""" - try: - execution_input: Optional[dict[str, Any]] = {} - execution_options: UiPathExecuteOptions = UiPathExecuteOptions() - if run.status == "suspended": - execution_input = run.resume_data - execution_options.resume = True - self._add_info_log(run, f"Resuming execution: {run.entrypoint}") - else: - execution_input = run.input_data - self._add_info_log(run, f"Starting execution: {run.entrypoint}") - - run.status = "running" - run.start_time = datetime.now() - log_handler = RunContextLogHandler( - run_id=run.id, - callback=self._handle_log_message, - ) - runtime = await self.runtime_factory.new_runtime(entrypoint=run.entrypoint) - execution_runtime = UiPathExecutionRuntime( - delegate=runtime, - trace_manager=self.trace_manager, - log_handler=log_handler, - execution_id=run.id, - ) - result = await execution_runtime.execute(execution_input, execution_options) - - if result is not None: - if ( - result.status == UiPathRuntimeStatus.SUSPENDED.value - and result.resume - ): - run.status = "suspended" - else: - if result.output is None: - run.output_data = {} - elif isinstance(result.output, BaseModel): - run.output_data = result.output.model_dump() - else: - run.output_data = result.output - run.status = "completed" - if run.output_data: - self._add_info_log(run, f"Execution result: {run.output_data}") - - self._add_info_log(run, "✅ Execution completed successfully") - run.end_time = datetime.now() - - except UiPathRuntimeError as e: - self._add_error_log(run) - run.status = "failed" - run.end_time = datetime.now() - run.error = e.error_info - - except Exception as e: - self._add_error_log(run) - run.status = "failed" - run.end_time = datetime.now() - run.error = UiPathErrorContract( - code="Unknown", title=str(e), detail=traceback.format_exc() - ) - - self._update_run_in_history(run) - self._update_run_details(run) - - def _show_run_details(self, run: ExecutionRun): + async def _execute_runtime(self, run: ExecutionRun) -> None: + """Wrapper that delegates execution to RunService.""" + await self.run_service.execute(run) + + def _on_run_updated(self, run: ExecutionRun) -> None: + """Called whenever a run changes (status, times, logs, traces).""" + # Update the run in history + history_panel = self.query_one("#history-panel", RunHistoryPanel) + history_panel.update_run(run) + + # If this run is currently shown, refresh details + details_panel = self.query_one("#details-panel", RunDetailsPanel) + if details_panel.current_run and details_panel.current_run.id == run.id: + details_panel.update_run_details(run) + + def _on_log_for_ui(self, log_msg: LogMessage) -> None: + """Append a log message to the logs UI.""" + details_panel = self.query_one("#details-panel", RunDetailsPanel) + details_panel.add_log(log_msg) + + def _on_trace_for_ui(self, trace_msg: TraceMessage) -> None: + """Append/refresh traces in the UI.""" + details_panel = self.query_one("#details-panel", RunDetailsPanel) + details_panel.add_trace(trace_msg) + + def _show_run_details(self, run: ExecutionRun) -> None: """Show details panel for a specific run.""" - # Hide new run panel, show details panel new_panel = self.query_one("#new-run-panel") details_panel = self.query_one("#details-panel", RunDetailsPanel) new_panel.add_class("hidden") details_panel.remove_class("hidden") - # Populate the details panel with run data details_panel.update_run(run) - def _focus_chat_input(self): + def _focus_chat_input(self) -> None: """Focus the chat input box.""" details_panel = self.query_one("#details-panel", RunDetailsPanel) details_panel.switch_tab("chat-tab") chat_input = details_panel.query_one("#chat-input", Input) chat_input.focus() - def _add_run_in_history(self, run: ExecutionRun): - """Add run to history panel.""" - history_panel = self.query_one("#history-panel", RunHistoryPanel) - history_panel.add_run(run) - - def _update_run_in_history(self, run: ExecutionRun): - """Update run display in history panel.""" - history_panel = self.query_one("#history-panel", RunHistoryPanel) - history_panel.update_run(run) - - def _update_run_details(self, run: ExecutionRun): - """Update the displayed run information.""" - details_panel = self.query_one("#details-panel", RunDetailsPanel) - details_panel.update_run_details(run) - - def _handle_trace_message(self, trace_msg: TraceMessage): - """Handle trace message from exporter.""" - run = self.runs[trace_msg.run_id] - for i, existing_trace in enumerate(run.traces): - if existing_trace.span_id == trace_msg.span_id: - run.traces[i] = trace_msg - break - else: - run.traces.append(trace_msg) - - details_panel = self.query_one("#details-panel", RunDetailsPanel) - details_panel.add_trace(trace_msg) - - def _handle_log_message(self, log_msg: LogMessage): - """Handle log message from exporter.""" - self.runs[log_msg.run_id].logs.append(log_msg) - details_panel = self.query_one("#details-panel", RunDetailsPanel) - details_panel.add_log(log_msg) - - def _add_info_log(self, run: ExecutionRun, message: str): - """Add info log to run.""" - timestamp = datetime.now() - log_msg = LogMessage(run.id, "INFO", message, timestamp) - self._handle_log_message(log_msg) - - def _add_error_log(self, run: ExecutionRun): - """Add error log to run.""" - timestamp = datetime.now() - tb = Traceback( - show_locals=False, - max_frames=4, - ) - log_msg = LogMessage(run.id, "ERROR", tb, timestamp) - self._handle_log_message(log_msg) - def _add_subprocess_log(self, level: str, message: str) -> None: """Handle a stderr line coming from subprocesses.""" @@ -329,6 +236,7 @@ def add_log() -> None: run = getattr(details_panel, "current_run", None) if run: log_msg = LogMessage(run.id, level, message, datetime.now()) - self._handle_log_message(log_msg) + # Route through RunService so state + UI stay in sync + self.run_service.handle_log(log_msg) self.call_from_thread(add_log) diff --git a/src/uipath/dev/services/__init__.py b/src/uipath/dev/services/__init__.py new file mode 100644 index 0000000..8ed5680 --- /dev/null +++ b/src/uipath/dev/services/__init__.py @@ -0,0 +1,7 @@ +"""UiPath Developer Console services module.""" + +from uipath.dev.services.run_service import RunService + +__all__ = [ + "RunService", +] diff --git a/src/uipath/dev/services/run_service.py b/src/uipath/dev/services/run_service.py new file mode 100644 index 0000000..e084d30 --- /dev/null +++ b/src/uipath/dev/services/run_service.py @@ -0,0 +1,203 @@ +"""UiPath Developer Console run service module.""" + +from __future__ import annotations + +import traceback +from datetime import datetime +from typing import Any, Callable, Dict, Optional + +from pydantic import BaseModel +from uipath.core.tracing import UiPathTraceManager +from uipath.runtime import ( + UiPathExecuteOptions, + UiPathExecutionRuntime, + UiPathRuntimeFactoryProtocol, + UiPathRuntimeStatus, +) +from uipath.runtime.errors import UiPathErrorContract, UiPathRuntimeError + +from uipath.dev.infrastructure import RunContextExporter, RunContextLogHandler +from uipath.dev.models import ExecutionRun, LogMessage, TraceMessage + +RunUpdatedCallback = Callable[[ExecutionRun], None] +LogCallback = Callable[[LogMessage], None] +TraceCallback = Callable[[TraceMessage], None] + + +class RunService: + """Orchestrates execution runs and keeps ExecutionRun state in sync. + + - Executes / resumes runtimes + - Updates run status, timings, output, and error + - Collects logs and traces + - Notifies observers via callbacks + """ + + def __init__( + self, + runtime_factory: UiPathRuntimeFactoryProtocol, + trace_manager: UiPathTraceManager, + on_run_updated: Optional[RunUpdatedCallback] = None, + on_log: Optional[LogCallback] = None, + on_trace: Optional[TraceCallback] = None, + ) -> None: + """Initialize RunService with runtime factory and trace manager.""" + self.runtime_factory = runtime_factory + self.trace_manager = trace_manager + self.runs: Dict[str, ExecutionRun] = {} + + self.on_run_updated = on_run_updated + self.on_log = on_log + self.on_trace = on_trace + + self.trace_manager.add_span_exporter( + RunContextExporter( + on_trace=self.handle_trace, + on_log=self.handle_log, + ), + batch=False, + ) + + def register_run(self, run: ExecutionRun) -> None: + """Register a new run and emit an initial update.""" + self.runs[run.id] = run + self._emit_run_updated(run) + + def get_run(self, run_id: str) -> Optional[ExecutionRun]: + """Get a registered run.""" + return self.runs.get(run_id) + + async def execute(self, run: ExecutionRun) -> None: + """Execute or resume a run. + + This is the extracted version of the old `_execute_runtime` method. + """ + try: + execution_input: Optional[dict[str, Any]] = {} + execution_options: UiPathExecuteOptions = UiPathExecuteOptions() + + if run.status == "suspended": + execution_input = run.resume_data + execution_options.resume = True + self._add_info_log(run, f"Resuming execution: {run.entrypoint}") + else: + execution_input = run.input_data + self._add_info_log(run, f"Starting execution: {run.entrypoint}") + + run.status = "running" + run.start_time = datetime.now() + self._emit_run_updated(run) + + # Attach log handler that goes back into this service + log_handler = RunContextLogHandler( + run_id=run.id, + callback=self.handle_log, + ) + + runtime = await self.runtime_factory.new_runtime(entrypoint=run.entrypoint) + + execution_runtime = UiPathExecutionRuntime( + delegate=runtime, + trace_manager=self.trace_manager, + log_handler=log_handler, + execution_id=run.id, + ) + + result = await execution_runtime.execute(execution_input, execution_options) + + if result is not None: + if ( + result.status == UiPathRuntimeStatus.SUSPENDED.value + and result.resume + ): + run.status = "suspended" + else: + if result.output is None: + run.output_data = {} + elif isinstance(result.output, BaseModel): + run.output_data = result.output.model_dump() + else: + run.output_data = result.output + run.status = "completed" + + if run.output_data: + self._add_info_log(run, f"Execution result: {run.output_data}") + + self._add_info_log(run, "✅ Execution completed successfully") + run.end_time = datetime.now() + + except UiPathRuntimeError as e: + self._add_error_log(run) + run.status = "failed" + run.end_time = datetime.now() + run.error = e.error_info + + except Exception as e: + self._add_error_log(run) + run.status = "failed" + run.end_time = datetime.now() + run.error = UiPathErrorContract( + code="Unknown", + title=str(e), + detail=traceback.format_exc(), + ) + + self.runs[run.id] = run + self._emit_run_updated(run) + + def handle_log(self, log_msg: LogMessage) -> None: + """Entry point for all logs (runtime, traces, stderr).""" + run = self.runs.get(log_msg.run_id) + if run is not None: + run.logs.append(log_msg) + self._emit_run_updated(run) + + if self.on_log is not None: + self.on_log(log_msg) + + def handle_trace(self, trace_msg: TraceMessage) -> None: + """Entry point for traces (from RunContextExporter).""" + run = self.runs.get(trace_msg.run_id) + if run is not None: + # Update or append trace + for i, existing_trace in enumerate(run.traces): + if existing_trace.span_id == trace_msg.span_id: + run.traces[i] = trace_msg + break + else: + run.traces.append(trace_msg) + + self._emit_run_updated(run) + + if self.on_trace is not None: + self.on_trace(trace_msg) + + def _emit_run_updated(self, run: ExecutionRun) -> None: + """Notify observers that a run's state changed.""" + self.runs[run.id] = run + if self.on_run_updated is not None: + self.on_run_updated(run) + + def _add_info_log(self, run: ExecutionRun, message: str) -> None: + log_msg = LogMessage( + run_id=run.id, + level="INFO", + message=message, + timestamp=datetime.now(), + ) + self.handle_log(log_msg) + + def _add_error_log(self, run: ExecutionRun) -> None: + from rich.traceback import Traceback + + tb = Traceback( + show_locals=False, + max_frames=4, + ) + log_msg = LogMessage( + run_id=run.id, + level="ERROR", + message=tb, + timestamp=datetime.now(), + ) + self.handle_log(log_msg) diff --git a/src/uipath/dev/ui/panels/run_history_panel.py b/src/uipath/dev/ui/panels/run_history_panel.py index e726bac..01b2d86 100644 --- a/src/uipath/dev/ui/panels/run_history_panel.py +++ b/src/uipath/dev/ui/panels/run_history_panel.py @@ -2,6 +2,7 @@ from typing import List, Optional +from rich.text import Text from textual.app import ComposeResult from textual.containers import Container, Vertical from textual.widgets import ( @@ -20,13 +21,13 @@ class RunHistoryPanel(Container): """Left panel showing execution run history.""" def __init__(self, **kwargs): - """Initialize RunHistoryPanel.""" + """Initialize RunHistoryPanel with empty run list.""" super().__init__(**kwargs) self.runs: List[ExecutionRun] = [] self.selected_run: Optional[ExecutionRun] = None def compose(self) -> ComposeResult: - """Compose the UI layout.""" + """Compose the RunHistoryPanel layout.""" with TabbedContent(): with TabPane("History", id="history-tab"): with Vertical(): @@ -39,72 +40,104 @@ def compose(self) -> ComposeResult: ) def on_mount(self) -> None: - """Set up periodic refresh on mount.""" - # Update only running items every 5 seconds + """Set up periodic refresh for running items.""" self.set_interval(5.0, self._refresh_running_items) - def add_run(self, run: ExecutionRun): - """Add a new run to history.""" - self.runs.insert(0, run) # Add to top - self.refresh_list() + def add_run(self, run: ExecutionRun) -> None: + """Add a new run to history (at the top).""" + self.runs.insert(0, run) + self._rebuild_list() - def update_run(self, run: ExecutionRun): - """Update an existing run.""" - self.refresh_list() - - def refresh_list(self): - """Refresh the run list display.""" - run_list = self.query_one("#run-list", ListView) - run_list.clear() - - for run in self.runs: - item = ListItem( - Static(run.display_name), classes=f"run-item run-{run.status}" - ) - # Store run id directly on the ListItem - item.run_id = run.id # type: ignore[attr-defined] - run_list.append(item) + def update_run(self, run: ExecutionRun) -> None: + """Update an existing run's row (does not insert new runs).""" + for index, existing in enumerate(self.runs): + if existing.id == run.id: + self.runs[index] = run + self._update_list_item(run) + break + # If run not found, just ignore; creation is done via add_run() def get_run_by_id(self, run_id: str) -> Optional[ExecutionRun]: - """Get run by id.""" + """Get a run.""" for run in self.runs: if run.id == run_id: return run return None - def clear_runs(self): + def clear_runs(self) -> None: """Clear all runs from history.""" self.runs.clear() - self.refresh_list() + self._rebuild_list() - def _refresh_running_items(self) -> None: - """Refresh display names for running items only.""" - if not any(run.status == "running" for run in self.runs): - return None + def _format_run_label(self, run: ExecutionRun) -> Text: + """Format the label for a run item. - try: - run_list = self.query_one("#run-list", ListView) - except Exception: - return None + - Preserves styling from `ExecutionRun.display_name` (rich.Text) + - Ensures exactly one leading space before the content + """ + base = run.display_name - # Take a snapshot of items to avoid mid-iteration changes - items_snapshot = list(run_list.children) + # Ensure we have a Text object + if not isinstance(base, Text): + base = Text(str(base)) - for item in items_snapshot: - if not hasattr(item, "run_id"): - continue + # Work on a copy so we don't mutate the model’s display_name + text = base.copy() - run = self.get_run_by_id(item.run_id) - if not run or run.status != "running": - continue + # We want exactly one leading space visually. + # Rich Text doesn't have an in-place "lstrip" that keeps spans perfect, + # so we just check the plain text and conditionally prepend. + if not text.plain.startswith(" "): + text = Text(" ") + text + + return text + + def _rebuild_list(self) -> None: + run_list = self.query_one("#run-list", ListView) + run_list.clear() + + for run in self.runs: + item = self._create_list_item(run) + run_list.append(item) + + def _create_list_item(self, run: ExecutionRun) -> ListItem: + item = ListItem( + Static(run.display_name), + classes=f"run-item run-{run.status}", + ) + item.run_id = run.id # type: ignore[attr-defined] + return item + + def _update_list_item(self, run: ExecutionRun) -> None: + """Update only the ListItem corresponding to a single run.""" + try: + run_list = self.query_one("#run-list", ListView) + except Exception: + return - # Check if item still exists in the list (wasn't removed) - if item not in run_list.children: + for item in list(run_list.children): + run_id = getattr(item, "run_id", None) + if run_id != run.id: continue + # Update label try: static = item.query_one(Static) - static.update(run.display_name) + static.update(self._format_run_label(run)) except Exception: - # Item structure changed or was removed continue + + # Update status-related CSS class + new_classes = [cls for cls in item.classes if not cls.startswith("run-")] + new_classes.append(f"run-{run.status}") + item.set_classes(" ".join(new_classes)) + break + + def _refresh_running_items(self) -> None: + """Refresh display names for running items only.""" + if not any(run.status == "running" for run in self.runs): + return None + + for run in self.runs: + if run.status == "running": + self._update_list_item(run)