Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "uipath-llamaindex"
version = "0.1.8"
version = "0.2.0"
description = "UiPath LlamaIndex SDK"
readme = { file = "README.md", content-type = "text/markdown" }
requires-python = ">=3.11"
Expand All @@ -10,7 +10,8 @@ dependencies = [
"llama-index-embeddings-azure-openai>=0.4.1",
"llama-index-llms-azure-openai>=0.4.2",
"openinference-instrumentation-llama-index>=4.3.9",
"uipath>=2.2.26, <2.3.0",
"uipath>=2.3.0, <2.4.0",
"uipath-runtime>=0.3.2, <0.4.0",
]
classifiers = [
"Intended Audience :: Developers",
Expand Down Expand Up @@ -58,6 +59,7 @@ dev = [
"pytest-cov>=4.1.0",
"pytest-mock>=3.11.1",
"pre-commit>=4.1.0",
"pytest-asyncio>=1.0.0",
"numpy>=1.24.0",
]

Expand Down Expand Up @@ -98,10 +100,11 @@ disallow_untyped_defs = false
testpaths = ["tests"]
python_files = "test_*.py"
addopts = "-ra -q"
asyncio_default_fixture_loop_scope = "function"
asyncio_mode = "auto"

[[tool.uv.index]]
name = "testpypi"
url = "https://test.pypi.org/simple/"
publish-url = "https://test.pypi.org/legacy/"
explicit = true

1 change: 1 addition & 0 deletions src/uipath_llamaindex/runtime/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ async def _create_runtime_instance(
delegate=base_runtime,
storage=storage,
trigger_manager=trigger_manager,
runtime_id=runtime_id,
)

async def new_runtime(
Expand Down
12 changes: 8 additions & 4 deletions src/uipath_llamaindex/runtime/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,6 @@ async def _run_workflow(
Core workflow execution logic used by both execute() and stream().
"""
workflow_input = input or {}

is_resuming = bool(options and options.resume)

if is_resuming:
Expand All @@ -154,7 +153,11 @@ async def _run_workflow(
if is_resuming:
handler: WorkflowHandler = self.workflow.run(ctx=self._context)
if workflow_input:
handler.ctx.send_event(HumanResponseEvent(**workflow_input))
handler.ctx.send_event(
HumanResponseEvent(
**workflow_input.get(self.runtime_id, workflow_input)
)
)
else:
handler.ctx.send_event(BreakpointResumeEvent())
else:
Expand Down Expand Up @@ -267,13 +270,14 @@ def _create_suspended_result(
if hasattr(event, "_data") and "prefix" in event._data:
prefix = event._data["prefix"]

resume_map = {self.runtime_id: prefix or ""}
return UiPathRuntimeResult(
output=prefix,
output=resume_map,
status=UiPathRuntimeStatus.SUSPENDED,
)

return UiPathRuntimeResult(
output=event,
output={self.runtime_id: event},
status=UiPathRuntimeStatus.SUSPENDED,
)

Expand Down
200 changes: 178 additions & 22 deletions src/uipath_llamaindex/runtime/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
import json
import os
import pickle
from typing import Any
from typing import Any, cast

import aiosqlite
from pydantic import BaseModel
from uipath.core.errors import ErrorCategory, UiPathFaultedTriggerError
from uipath.runtime import (
UiPathApiTrigger,
Expand Down Expand Up @@ -34,7 +35,12 @@ async def setup(self) -> None:
os.makedirs(dir_name, exist_ok=True)

try:
async with aiosqlite.connect(self.storage_path) as conn:
async with aiosqlite.connect(self.storage_path, timeout=30.0) as conn:
await self._apply_connection_pragmas(conn)

# WAL mode is persistent (stored in DB file), only needs to be set once
await conn.execute("PRAGMA journal_mode=WAL")

# Table for workflow contexts
await conn.execute("""
CREATE TABLE IF NOT EXISTS workflow_contexts (
Expand All @@ -47,54 +53,117 @@ async def setup(self) -> None:
await conn.execute("""
CREATE TABLE IF NOT EXISTS resume_triggers (
id INTEGER PRIMARY KEY AUTOINCREMENT,
runtime_id TEXT NOT NULL,
interrupt_id TEXT NOT NULL,
trigger_data TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")

await conn.execute(
"""
CREATE INDEX IF NOT EXISTS idx_resume_triggers_runtime_id
ON resume_triggers(runtime_id)
"""
)

await conn.execute(
"""
CREATE TABLE IF NOT EXISTS runtime_kv (
runtime_id TEXT NOT NULL,
namespace TEXT NOT NULL,
key TEXT NOT NULL,
value TEXT,
timestamp DATETIME DEFAULT (strftime('%Y-%m-%d %H:%M:%S', 'now', 'utc')),
PRIMARY KEY (runtime_id, namespace, key)
)
"""
)

await conn.commit()
except aiosqlite.Error as exc:
msg = f"Failed to initialize SQLite storage at {self.storage_path!r}: {exc.sqlite_errorname} {exc.sqlite_errorcode}"
raise UiPathFaultedTriggerError(ErrorCategory.SYSTEM, msg) from exc

async def save_trigger(self, trigger: UiPathResumeTrigger) -> None:
async def save_triggers(
self, runtime_id: str, triggers: list[UiPathResumeTrigger]
) -> None:
"""Save resume trigger to SQLite database."""
trigger_dict = self._serialize_trigger(trigger)
trigger_json = json.dumps(trigger_dict)

try:
async with aiosqlite.connect(self.storage_path) as conn:
async with aiosqlite.connect(self.storage_path, timeout=30.0) as conn:
await self._apply_connection_pragmas(conn)

# Delete all existing triggers for this runtime_id
await conn.execute(
"INSERT INTO resume_triggers (trigger_data) VALUES (?)",
(trigger_json,),
"""
DELETE FROM resume_triggers
WHERE runtime_id = ?
""",
(runtime_id,),
)
# Insert new triggers
for trigger in triggers:
trigger_dict = self._serialize_trigger(trigger)
trigger_json = json.dumps(trigger_dict)
await conn.execute(
"INSERT INTO resume_triggers (runtime_id, interrupt_id, trigger_data) VALUES (?, ?, ?)",
(runtime_id, trigger.interrupt_id, trigger_json),
)
await conn.commit()
except aiosqlite.Error as exc:
msg = (
f"Failed to save resume trigger "
f"(type={trigger.trigger_type}, name={trigger.trigger_name}) "
f"Failed to save resume triggers "
f"to database {self.storage_path!r}:"
f" {exc.sqlite_errorname} {exc.sqlite_errorcode}"
)
raise UiPathFaultedTriggerError(ErrorCategory.SYSTEM, msg) from exc

async def get_latest_trigger(self) -> UiPathResumeTrigger | None:
async def get_triggers(self, runtime_id: str) -> list[UiPathResumeTrigger] | None:
"""Get most recent trigger from SQLite database."""
try:
async with aiosqlite.connect(self.storage_path) as conn:
async with aiosqlite.connect(self.storage_path, timeout=30.0) as conn:
await self._apply_connection_pragmas(conn)

cursor = await conn.execute(
"SELECT trigger_data FROM resume_triggers ORDER BY created_at DESC LIMIT 1"
"SELECT trigger_data FROM resume_triggers WHERE runtime_id = ? ORDER BY id ASC",
(runtime_id,),
)
row = await cursor.fetchone()
rows = await cursor.fetchall()
except aiosqlite.Error as exc:
msg = f"Failed to retrieve latest resume trigger from database {self.storage_path!r}: {exc.sqlite_errorname} {exc.sqlite_errorcode}"
msg = f"Failed to retrieve resume triggers from database {self.storage_path!r}: {exc.sqlite_errorname} {exc.sqlite_errorcode}"
raise UiPathFaultedTriggerError(ErrorCategory.SYSTEM, msg) from exc

if not row:
if not rows:
return None

trigger_dict = json.loads(row[0])
return self._deserialize_trigger(trigger_dict)
triggers = []
for row in rows:
trigger_dict = json.loads(row[0])
triggers.append(self._deserialize_trigger(trigger_dict))
return triggers

async def delete_trigger(
    self, runtime_id: str, trigger: UiPathResumeTrigger
) -> None:
    """Remove the stored resume trigger for this runtime/interrupt pair.

    Deletes the row matching (runtime_id, trigger.interrupt_id); a no-op
    when no such row exists.

    Raises:
        UiPathFaultedTriggerError: If the underlying SQLite operation fails.
    """
    try:
        async with aiosqlite.connect(self.storage_path, timeout=30.0) as db:
            await self._apply_connection_pragmas(db)
            await db.execute(
                """
                DELETE FROM resume_triggers
                WHERE runtime_id = ? AND interrupt_id = ?
                """,
                (runtime_id, trigger.interrupt_id),
            )
            await db.commit()
    except aiosqlite.Error as exc:
        raise UiPathFaultedTriggerError(
            ErrorCategory.SYSTEM,
            f"Failed to delete resume trigger from database {self.storage_path!r}: {exc.sqlite_errorname} {exc.sqlite_errorcode}",
        ) from exc

async def save_context(self, runtime_id: str, context_dict: dict[str, Any]) -> None:
"""
Expand All @@ -107,7 +176,9 @@ async def save_context(self, runtime_id: str, context_dict: dict[str, Any]) -> N
context_blob = pickle.dumps(context_dict)

try:
async with aiosqlite.connect(self.storage_path) as conn:
async with aiosqlite.connect(self.storage_path, timeout=30.0) as conn:
await self._apply_connection_pragmas(conn)

await conn.execute(
"""
INSERT INTO workflow_contexts (runtime_id, context_data)
Expand All @@ -133,7 +204,9 @@ async def load_context(self, runtime_id: str) -> dict[str, Any] | None:
Serialized workflow context dictionary or None if not found
"""
try:
async with aiosqlite.connect(self.storage_path) as conn:
async with aiosqlite.connect(self.storage_path, timeout=30.0) as conn:
await self._apply_connection_pragmas(conn)

cursor = await conn.execute(
"SELECT context_data FROM workflow_contexts WHERE runtime_id = ?",
(runtime_id,),
Expand All @@ -148,6 +221,61 @@ async def load_context(self, runtime_id: str) -> dict[str, Any] | None:

return pickle.loads(row[0])

async def set_value(
    self,
    runtime_id: str,
    namespace: str,
    key: str,
    value: Any,
) -> None:
    """Save an arbitrary key-value pair to the database.

    The value is encoded to tagged text (see ``_dump_value``) and upserted
    on the (runtime_id, namespace, key) primary key; on update the row's
    UTC timestamp is refreshed.

    Args:
        runtime_id: Runtime instance the value is scoped to.
        namespace: Logical grouping for the key within the runtime.
        key: Key within the namespace.
        value: A ``str``, ``dict``, pydantic ``BaseModel``, or ``None``.

    Raises:
        TypeError: If ``value`` is not one of the supported types.
    """
    # Single isinstance call with a tuple instead of a chained `or`
    # of isinstance checks; reject unsupported payloads before any I/O.
    if not (isinstance(value, (str, dict, BaseModel)) or value is None):
        raise TypeError("Value must be str, dict, BaseModel or None.")

    value_text = self._dump_value(value)

    async with aiosqlite.connect(self.storage_path, timeout=30.0) as conn:
        await self._apply_connection_pragmas(conn)

        await conn.execute(
            """
            INSERT INTO runtime_kv (runtime_id, namespace, key, value)
            VALUES (?, ?, ?, ?)
            ON CONFLICT(runtime_id, namespace, key)
            DO UPDATE SET
                value = excluded.value,
                timestamp = (strftime('%Y-%m-%d %H:%M:%S', 'now', 'utc'))
            """,
            (runtime_id, namespace, key, value_text),
        )
        await conn.commit()

async def get_value(self, runtime_id: str, namespace: str, key: str) -> Any:
    """Fetch a stored key-value entry, or None when no row matches.

    Lookup is scoped by (runtime_id, namespace, key); the stored text is
    decoded back to its original form via ``_load_value``.
    """
    async with aiosqlite.connect(self.storage_path, timeout=30.0) as conn:
        await self._apply_connection_pragmas(conn)

        cursor = await conn.execute(
            """
            SELECT value
            FROM runtime_kv
            WHERE runtime_id = ? AND namespace = ? AND key = ?
            LIMIT 1
            """,
            (runtime_id, namespace, key),
        )
        row = await cursor.fetchone()

    if row is None:
        return None

    return self._load_value(cast(str | None, row[0]))

def _serialize_trigger(self, trigger: UiPathResumeTrigger) -> dict[str, Any]:
"""Serialize a resume trigger to a dictionary."""
trigger_key = (
Expand All @@ -166,6 +294,7 @@ def _serialize_trigger(self, trigger: UiPathResumeTrigger) -> dict[str, Any]:
"key": trigger_key,
"name": trigger.trigger_name.value,
"payload": payload,
"interrupt_id": trigger.interrupt_id,
"folder_path": trigger.folder_path,
"folder_key": trigger.folder_key,
}
Expand All @@ -178,6 +307,7 @@ def _deserialize_trigger(self, trigger_data: dict[str, Any]) -> UiPathResumeTrig
folder_path = trigger_data.get("folder_path")
folder_key = trigger_data.get("folder_key")
payload = trigger_data.get("payload")
interrupt_id = trigger_data.get("interrupt_id")

resume_trigger = UiPathResumeTrigger(
trigger_type=UiPathResumeTriggerType(trigger_type),
Expand All @@ -186,6 +316,7 @@ def _deserialize_trigger(self, trigger_data: dict[str, Any]) -> UiPathResumeTrig
folder_path=folder_path,
folder_key=folder_key,
payload=payload,
interrupt_id=interrupt_id,
)

if resume_trigger.trigger_type == UiPathResumeTriggerType.API:
Expand All @@ -194,3 +325,28 @@ def _deserialize_trigger(self, trigger_data: dict[str, Any]) -> UiPathResumeTrig
)

return resume_trigger

def _dump_value(self, value: str | dict[str, Any] | BaseModel | None) -> str | None:
if value is None:
return None
if isinstance(value, BaseModel):
return "j:" + json.dumps(value.model_dump())
if isinstance(value, dict):
return "j:" + json.dumps(value)
return "s:" + value

def _load_value(self, raw: str | None) -> Any:
if raw is None:
return None
if raw.startswith("s:"):
return raw[2:]
if raw.startswith("j:"):
return json.loads(raw[2:])
return raw

async def _apply_connection_pragmas(self, conn: aiosqlite.Connection) -> None:
"""Apply per-connection PRAGMA settings for optimal concurrency."""
await conn.execute("PRAGMA busy_timeout=30000")
await conn.execute("PRAGMA synchronous=NORMAL")
await conn.execute("PRAGMA cache_size=10000")
await conn.execute("PRAGMA temp_store=MEMORY")
Loading