Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "uipath-runtime"
version = "0.8.5"
version = "0.8.6"
description = "Runtime abstractions and interfaces for building agents and automation scripts in the UiPath ecosystem"
readme = { file = "README.md", content-type = "text/markdown" }
requires-python = ">=3.11"
Expand Down
7 changes: 6 additions & 1 deletion src/uipath/runtime/events/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@
All events inherit from UiPathRuntimeEvent and can be filtered by execution_id.
"""

from uipath.runtime.events.base import UiPathRuntimeEvent, UiPathRuntimeEventType
from uipath.runtime.events.base import (
UiPathRuntimeEvent,
UiPathRuntimeEventType,
UiPathRuntimeStatePhase,
)
from uipath.runtime.events.state import (
UiPathRuntimeMessageEvent,
UiPathRuntimeStateEvent,
Expand All @@ -18,6 +22,7 @@
# Base
"UiPathRuntimeEvent",
"UiPathRuntimeEventType",
"UiPathRuntimeStatePhase",
# Runtime events
"UiPathRuntimeStateEvent",
"UiPathRuntimeMessageEvent",
Expand Down
11 changes: 10 additions & 1 deletion src/uipath/runtime/events/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,15 @@ class UiPathRuntimeEventType(str, Enum):
RUNTIME_RESULT = "runtime_result"


class UiPathRuntimeStatePhase(str, Enum):
"""Lifecycle phase of a state event."""

STARTED = "started"
UPDATED = "updated"
COMPLETED = "completed"
FAULTED = "faulted"


class UiPathRuntimeEvent(BaseModel):
"""Base class for all UiPath runtime events."""

Expand All @@ -29,4 +38,4 @@ class UiPathRuntimeEvent(BaseModel):
)


__all__ = ["UiPathRuntimeEventType", "UiPathRuntimeEvent"]
__all__ = ["UiPathRuntimeEventType", "UiPathRuntimeStatePhase", "UiPathRuntimeEvent"]
10 changes: 9 additions & 1 deletion src/uipath/runtime/events/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@

from pydantic import Field

from uipath.runtime.events.base import UiPathRuntimeEvent, UiPathRuntimeEventType
from uipath.runtime.events.base import (
UiPathRuntimeEvent,
UiPathRuntimeEventType,
UiPathRuntimeStatePhase,
)


class UiPathRuntimeMessageEvent(UiPathRuntimeEvent):
Expand Down Expand Up @@ -69,6 +73,10 @@ class UiPathRuntimeStateEvent(UiPathRuntimeEvent):
default=None,
description="Fully qualified node name including subgraph hierarchy prefix",
)
phase: UiPathRuntimeStatePhase = Field(
default=UiPathRuntimeStatePhase.UPDATED,
description="Lifecycle phase: started, updated, or completed",
)
event_type: UiPathRuntimeEventType = Field(
default=UiPathRuntimeEventType.RUNTIME_STATE, frozen=True
)
Expand Down
147 changes: 147 additions & 0 deletions tests/test_events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
from typing import Any, AsyncGenerator

import pytest

from uipath.runtime.base import UiPathStreamOptions
from uipath.runtime.events import (
UiPathRuntimeEvent,
UiPathRuntimeStateEvent,
UiPathRuntimeStatePhase,
)
from uipath.runtime.result import UiPathRuntimeResult, UiPathRuntimeStatus


def test_state_event_phase_defaults_to_updated() -> None:
"""Phase should default to UPDATED for backward compatibility."""
event = UiPathRuntimeStateEvent(payload={"messages": []})

assert event.phase == UiPathRuntimeStatePhase.UPDATED


def test_state_event_phase_can_be_set_explicitly() -> None:
"""Phase should accept any valid UiPathRuntimeStatePhase value."""
for phase in UiPathRuntimeStatePhase:
event = UiPathRuntimeStateEvent(payload={}, phase=phase)
assert event.phase == phase


class PhaseAwareMockRuntime:
"""Mock runtime that emits started/updated/completed state events per node."""

def __init__(self, nodes: list[str], *, failing_node: str | None = None) -> None:
self.nodes = nodes
self.failing_node = failing_node

async def stream(
self,
input: dict[str, Any] | None = None,
options: UiPathStreamOptions | None = None,
) -> AsyncGenerator[UiPathRuntimeEvent, None]:
for node in self.nodes:
yield UiPathRuntimeStateEvent(
node_name=node,
payload={},
phase=UiPathRuntimeStatePhase.STARTED,
)

if node == self.failing_node:
yield UiPathRuntimeStateEvent(
node_name=node,
payload={"error": f"{node} failed"},
phase=UiPathRuntimeStatePhase.FAULTED,
)
yield UiPathRuntimeResult(
status=UiPathRuntimeStatus.FAULTED,
output={"error": f"{node} failed"},
)
return

yield UiPathRuntimeStateEvent(
node_name=node,
payload={"key": f"{node}_value"},
phase=UiPathRuntimeStatePhase.UPDATED,
)
yield UiPathRuntimeStateEvent(
node_name=node,
payload={"key": f"{node}_value"},
phase=UiPathRuntimeStatePhase.COMPLETED,
)

yield UiPathRuntimeResult(
status=UiPathRuntimeStatus.SUCCESSFUL,
output={"done": True},
)


@pytest.mark.asyncio
async def test_runtime_stream_emits_phase_lifecycle() -> None:
"""A runtime streaming started/updated/completed phases per node."""
runtime = PhaseAwareMockRuntime(nodes=["planner", "executor"])

state_events: list[UiPathRuntimeStateEvent] = []
result: UiPathRuntimeResult | None = None

async for event in runtime.stream({}):
if isinstance(event, UiPathRuntimeResult):
result = event
elif isinstance(event, UiPathRuntimeStateEvent):
state_events.append(event)

# 2 nodes x 3 phases = 6 state events
assert len(state_events) == 6

# Verify per-node lifecycle ordering
for i, node in enumerate(["planner", "executor"]):
group = state_events[i * 3 : i * 3 + 3]
assert group[0].node_name == node
assert group[0].phase == UiPathRuntimeStatePhase.STARTED
assert group[0].payload == {}

assert group[1].node_name == node
assert group[1].phase == UiPathRuntimeStatePhase.UPDATED
assert group[1].payload == {"key": f"{node}_value"}

assert group[2].node_name == node
assert group[2].phase == UiPathRuntimeStatePhase.COMPLETED
assert group[2].payload == {"key": f"{node}_value"}

# Final result
assert result is not None
assert result.status == UiPathRuntimeStatus.SUCCESSFUL
assert result.output == {"done": True}


@pytest.mark.asyncio
async def test_runtime_stream_emits_faulted_phase_on_error() -> None:
"""A node that fails should emit started then faulted, and stop the stream."""
runtime = PhaseAwareMockRuntime(
nodes=["planner", "executor"], failing_node="executor"
)

state_events: list[UiPathRuntimeStateEvent] = []
result: UiPathRuntimeResult | None = None

async for event in runtime.stream({}):
if isinstance(event, UiPathRuntimeResult):
result = event
elif isinstance(event, UiPathRuntimeStateEvent):
state_events.append(event)

# planner: started, updated, completed (3) + executor: started, faulted (2) = 5
assert len(state_events) == 5

# planner completed normally
assert state_events[0].phase == UiPathRuntimeStatePhase.STARTED
assert state_events[1].phase == UiPathRuntimeStatePhase.UPDATED
assert state_events[2].phase == UiPathRuntimeStatePhase.COMPLETED

# executor started then faulted
assert state_events[3].node_name == "executor"
assert state_events[3].phase == UiPathRuntimeStatePhase.STARTED
assert state_events[4].node_name == "executor"
assert state_events[4].phase == UiPathRuntimeStatePhase.FAULTED
assert state_events[4].payload == {"error": "executor failed"}

# Result is faulted
assert result is not None
assert result.status == UiPathRuntimeStatus.FAULTED
2 changes: 1 addition & 1 deletion uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.