|
2 | 2 |
|
3 | 3 | import asyncio |
4 | 4 | import logging |
5 | | -from typing import Any, Optional |
| 5 | +from typing import Any, AsyncGenerator, Optional |
6 | 6 |
|
7 | 7 | from opentelemetry import trace |
8 | 8 | from uipath.runtime import ( |
9 | | - UiPathBaseRuntime, |
10 | 9 | UiPathExecuteOptions, |
11 | | - UiPathRuntimeFactory, |
| 10 | + UiPathRuntimeEvent, |
| 11 | + UiPathRuntimeProtocol, |
12 | 12 | UiPathRuntimeResult, |
13 | 13 | UiPathRuntimeStatus, |
| 14 | + UiPathStreamOptions, |
14 | 15 | ) |
15 | 16 | from uipath.runtime.schema import UiPathRuntimeSchema |
16 | 17 |
|
17 | 18 | logger = logging.getLogger(__name__) |
18 | 19 |
|
19 | 20 |
|
20 | | -class MockRuntime(UiPathBaseRuntime): |
| 21 | +class MockRuntime: |
21 | 22 | """A mock runtime that simulates a multi-step workflow with rich telemetry.""" |
22 | 23 |
|
23 | 24 | async def get_schema(self) -> UiPathRuntimeSchema: |
@@ -224,18 +225,30 @@ async def execute( |
224 | 225 | status=UiPathRuntimeStatus.SUCCESSFUL, |
225 | 226 | ) |
226 | 227 |
|
227 | | - async def cleanup(self) -> None: |
228 | | - logger.info("MockRuntime: cleanup() invoked") |
229 | | - print("[MockRuntime] cleanup() invoked") |
| 228 | + async def stream( |
| 229 | + self, |
| 230 | + input: Optional[dict[str, Any]] = None, |
| 231 | + options: Optional[UiPathStreamOptions] = None, |
| 232 | + ) -> AsyncGenerator[UiPathRuntimeEvent, None]: |
| 233 | + logger.info("MockRuntime: stream() invoked") |
| 234 | + print("[MockRuntime] stream() invoked") |
| 235 | + yield await self.execute(input=input, options=options) |
| 236 | + |
| 237 | + async def dispose(self) -> None: |
| 238 | + logger.info("MockRuntime: dispose() invoked") |
| 239 | + print("[MockRuntime] dispose() invoked") |
230 | 240 |
|
231 | 241 |
|
232 | | -class MockRuntimeFactory(UiPathRuntimeFactory[MockRuntime]): |
| 242 | +class MockRuntimeFactory: |
233 | 243 | """Runtime factory compatible with UiPathDevTerminal expectations.""" |
234 | 244 |
|
235 | 245 | # This is the method the Textual app calls here: |
236 | | - # runtime = self.runtime_factory.new_runtime(entrypoint=run.entrypoint) |
237 | | - def new_runtime(self, entrypoint: str) -> MockRuntime: |
| 246 | + # runtime = await self.runtime_factory.new_runtime(entrypoint=run.entrypoint) |
| 247 | + async def new_runtime(self, entrypoint: str) -> UiPathRuntimeProtocol: |
238 | 248 | return MockRuntime() |
239 | 249 |
|
240 | | - def discover_runtimes(self) -> list[MockRuntime]: |
| 250 | + def discover_runtimes(self) -> list[UiPathRuntimeProtocol]: |
| 251 | + return [] |
| 252 | + |
| 253 | + def discover_entrypoints(self) -> list[str]: |
241 | 254 | return [] |
0 commit comments