Skip to content

Commit d6f0e24

Browse files
authored
Merge pull request #99 from UiPath/fix/update_runtime_0_3
fix: storage scoped to runtime id
2 parents 7a3cc61 + 8cc3202 commit d6f0e24

File tree

7 files changed

+1431
-37
lines changed

7 files changed

+1431
-37
lines changed

pyproject.toml

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "uipath-llamaindex"
3-
version = "0.1.8"
3+
version = "0.2.0"
44
description = "UiPath LlamaIndex SDK"
55
readme = { file = "README.md", content-type = "text/markdown" }
66
requires-python = ">=3.11"
@@ -10,7 +10,8 @@ dependencies = [
1010
"llama-index-embeddings-azure-openai>=0.4.1",
1111
"llama-index-llms-azure-openai>=0.4.2",
1212
"openinference-instrumentation-llama-index>=4.3.9",
13-
"uipath>=2.2.26, <2.3.0",
13+
"uipath>=2.3.0, <2.4.0",
14+
"uipath-runtime>=0.3.2, <0.4.0",
1415
]
1516
classifiers = [
1617
"Intended Audience :: Developers",
@@ -58,6 +59,7 @@ dev = [
5859
"pytest-cov>=4.1.0",
5960
"pytest-mock>=3.11.1",
6061
"pre-commit>=4.1.0",
62+
"pytest-asyncio>=1.0.0",
6163
"numpy>=1.24.0",
6264
]
6365

@@ -98,10 +100,11 @@ disallow_untyped_defs = false
98100
testpaths = ["tests"]
99101
python_files = "test_*.py"
100102
addopts = "-ra -q"
103+
asyncio_default_fixture_loop_scope = "function"
104+
asyncio_mode = "auto"
101105

102106
[[tool.uv.index]]
103107
name = "testpypi"
104108
url = "https://test.pypi.org/simple/"
105109
publish-url = "https://test.pypi.org/legacy/"
106110
explicit = true
107-

src/uipath_llamaindex/runtime/factory.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,7 @@ async def _create_runtime_instance(
260260
delegate=base_runtime,
261261
storage=storage,
262262
trigger_manager=trigger_manager,
263+
runtime_id=runtime_id,
263264
)
264265

265266
async def new_runtime(

src/uipath_llamaindex/runtime/runtime.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,6 @@ async def _run_workflow(
138138
Core workflow execution logic used by both execute() and stream().
139139
"""
140140
workflow_input = input or {}
141-
142141
is_resuming = bool(options and options.resume)
143142

144143
if is_resuming:
@@ -154,7 +153,11 @@ async def _run_workflow(
154153
if is_resuming:
155154
handler: WorkflowHandler = self.workflow.run(ctx=self._context)
156155
if workflow_input:
157-
handler.ctx.send_event(HumanResponseEvent(**workflow_input))
156+
handler.ctx.send_event(
157+
HumanResponseEvent(
158+
**workflow_input.get(self.runtime_id, workflow_input)
159+
)
160+
)
158161
else:
159162
handler.ctx.send_event(BreakpointResumeEvent())
160163
else:
@@ -267,13 +270,14 @@ def _create_suspended_result(
267270
if hasattr(event, "_data") and "prefix" in event._data:
268271
prefix = event._data["prefix"]
269272

273+
resume_map = {self.runtime_id: prefix or ""}
270274
return UiPathRuntimeResult(
271-
output=prefix,
275+
output=resume_map,
272276
status=UiPathRuntimeStatus.SUSPENDED,
273277
)
274278

275279
return UiPathRuntimeResult(
276-
output=event,
280+
output={self.runtime_id: event},
277281
status=UiPathRuntimeStatus.SUSPENDED,
278282
)
279283

src/uipath_llamaindex/runtime/storage.py

Lines changed: 178 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,10 @@
33
import json
44
import os
55
import pickle
6-
from typing import Any
6+
from typing import Any, cast
77

88
import aiosqlite
9+
from pydantic import BaseModel
910
from uipath.core.errors import ErrorCategory, UiPathFaultedTriggerError
1011
from uipath.runtime import (
1112
UiPathApiTrigger,
@@ -34,7 +35,12 @@ async def setup(self) -> None:
3435
os.makedirs(dir_name, exist_ok=True)
3536

3637
try:
37-
async with aiosqlite.connect(self.storage_path) as conn:
38+
async with aiosqlite.connect(self.storage_path, timeout=30.0) as conn:
39+
await self._apply_connection_pragmas(conn)
40+
41+
# WAL mode is persistent (stored in DB file), only needs to be set once
42+
await conn.execute("PRAGMA journal_mode=WAL")
43+
3844
# Table for workflow contexts
3945
await conn.execute("""
4046
CREATE TABLE IF NOT EXISTS workflow_contexts (
@@ -47,54 +53,117 @@ async def setup(self) -> None:
4753
await conn.execute("""
4854
CREATE TABLE IF NOT EXISTS resume_triggers (
4955
id INTEGER PRIMARY KEY AUTOINCREMENT,
56+
runtime_id TEXT NOT NULL,
57+
interrupt_id TEXT NOT NULL,
5058
trigger_data TEXT NOT NULL,
51-
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
59+
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP
5260
)
5361
""")
5462

63+
await conn.execute(
64+
"""
65+
CREATE INDEX IF NOT EXISTS idx_resume_triggers_runtime_id
66+
ON resume_triggers(runtime_id)
67+
"""
68+
)
69+
70+
await conn.execute(
71+
"""
72+
CREATE TABLE IF NOT EXISTS runtime_kv (
73+
runtime_id TEXT NOT NULL,
74+
namespace TEXT NOT NULL,
75+
key TEXT NOT NULL,
76+
value TEXT,
77+
timestamp DATETIME DEFAULT (strftime('%Y-%m-%d %H:%M:%S', 'now', 'utc')),
78+
PRIMARY KEY (runtime_id, namespace, key)
79+
)
80+
"""
81+
)
82+
5583
await conn.commit()
5684
except aiosqlite.Error as exc:
5785
msg = f"Failed to initialize SQLite storage at {self.storage_path!r}: {exc.sqlite_errorname} {exc.sqlite_errorcode}"
5886
raise UiPathFaultedTriggerError(ErrorCategory.SYSTEM, msg) from exc
5987

60-
async def save_trigger(self, trigger: UiPathResumeTrigger) -> None:
88+
async def save_triggers(
89+
self, runtime_id: str, triggers: list[UiPathResumeTrigger]
90+
) -> None:
6191
"""Save resume trigger to SQLite database."""
62-
trigger_dict = self._serialize_trigger(trigger)
63-
trigger_json = json.dumps(trigger_dict)
64-
6592
try:
66-
async with aiosqlite.connect(self.storage_path) as conn:
93+
async with aiosqlite.connect(self.storage_path, timeout=30.0) as conn:
94+
await self._apply_connection_pragmas(conn)
95+
96+
# Delete all existing triggers for this runtime_id
6797
await conn.execute(
68-
"INSERT INTO resume_triggers (trigger_data) VALUES (?)",
69-
(trigger_json,),
98+
"""
99+
DELETE FROM resume_triggers
100+
WHERE runtime_id = ?
101+
""",
102+
(runtime_id,),
70103
)
104+
# Insert new triggers
105+
for trigger in triggers:
106+
trigger_dict = self._serialize_trigger(trigger)
107+
trigger_json = json.dumps(trigger_dict)
108+
await conn.execute(
109+
"INSERT INTO resume_triggers (runtime_id, interrupt_id, trigger_data) VALUES (?, ?, ?)",
110+
(runtime_id, trigger.interrupt_id, trigger_json),
111+
)
71112
await conn.commit()
72113
except aiosqlite.Error as exc:
73114
msg = (
74-
f"Failed to save resume trigger "
75-
f"(type={trigger.trigger_type}, name={trigger.trigger_name}) "
115+
f"Failed to save resume triggers "
76116
f"to database {self.storage_path!r}:"
77117
f" {exc.sqlite_errorname} {exc.sqlite_errorcode}"
78118
)
79119
raise UiPathFaultedTriggerError(ErrorCategory.SYSTEM, msg) from exc
80120

81-
async def get_latest_trigger(self) -> UiPathResumeTrigger | None:
121+
async def get_triggers(self, runtime_id: str) -> list[UiPathResumeTrigger] | None:
82122
"""Get most recent trigger from SQLite database."""
83123
try:
84-
async with aiosqlite.connect(self.storage_path) as conn:
124+
async with aiosqlite.connect(self.storage_path, timeout=30.0) as conn:
125+
await self._apply_connection_pragmas(conn)
126+
85127
cursor = await conn.execute(
86-
"SELECT trigger_data FROM resume_triggers ORDER BY created_at DESC LIMIT 1"
128+
"SELECT trigger_data FROM resume_triggers WHERE runtime_id = ? ORDER BY id ASC",
129+
(runtime_id,),
87130
)
88-
row = await cursor.fetchone()
131+
rows = await cursor.fetchall()
89132
except aiosqlite.Error as exc:
90-
msg = f"Failed to retrieve latest resume trigger from database {self.storage_path!r}: {exc.sqlite_errorname} {exc.sqlite_errorcode}"
133+
msg = f"Failed to retrieve resume triggers from database {self.storage_path!r}: {exc.sqlite_errorname} {exc.sqlite_errorcode}"
91134
raise UiPathFaultedTriggerError(ErrorCategory.SYSTEM, msg) from exc
92135

93-
if not row:
136+
if not rows:
94137
return None
95138

96-
trigger_dict = json.loads(row[0])
97-
return self._deserialize_trigger(trigger_dict)
139+
triggers = []
140+
for row in rows:
141+
trigger_dict = json.loads(row[0])
142+
triggers.append(self._deserialize_trigger(trigger_dict))
143+
return triggers
144+
145+
async def delete_trigger(
146+
self, runtime_id: str, trigger: UiPathResumeTrigger
147+
) -> None:
148+
"""Delete resume trigger from storage."""
149+
try:
150+
async with aiosqlite.connect(self.storage_path, timeout=30.0) as conn:
151+
await self._apply_connection_pragmas(conn)
152+
153+
await conn.execute(
154+
"""
155+
DELETE FROM resume_triggers
156+
WHERE runtime_id = ? AND interrupt_id = ?
157+
""",
158+
(
159+
runtime_id,
160+
trigger.interrupt_id,
161+
),
162+
)
163+
await conn.commit()
164+
except aiosqlite.Error as exc:
165+
msg = f"Failed to delete resume trigger from database {self.storage_path!r}: {exc.sqlite_errorname} {exc.sqlite_errorcode}"
166+
raise UiPathFaultedTriggerError(ErrorCategory.SYSTEM, msg) from exc
98167

99168
async def save_context(self, runtime_id: str, context_dict: dict[str, Any]) -> None:
100169
"""
@@ -107,7 +176,9 @@ async def save_context(self, runtime_id: str, context_dict: dict[str, Any]) -> N
107176
context_blob = pickle.dumps(context_dict)
108177

109178
try:
110-
async with aiosqlite.connect(self.storage_path) as conn:
179+
async with aiosqlite.connect(self.storage_path, timeout=30.0) as conn:
180+
await self._apply_connection_pragmas(conn)
181+
111182
await conn.execute(
112183
"""
113184
INSERT INTO workflow_contexts (runtime_id, context_data)
@@ -133,7 +204,9 @@ async def load_context(self, runtime_id: str) -> dict[str, Any] | None:
133204
Serialized workflow context dictionary or None if not found
134205
"""
135206
try:
136-
async with aiosqlite.connect(self.storage_path) as conn:
207+
async with aiosqlite.connect(self.storage_path, timeout=30.0) as conn:
208+
await self._apply_connection_pragmas(conn)
209+
137210
cursor = await conn.execute(
138211
"SELECT context_data FROM workflow_contexts WHERE runtime_id = ?",
139212
(runtime_id,),
@@ -148,6 +221,61 @@ async def load_context(self, runtime_id: str) -> dict[str, Any] | None:
148221

149222
return pickle.loads(row[0])
150223

224+
async def set_value(
225+
self,
226+
runtime_id: str,
227+
namespace: str,
228+
key: str,
229+
value: Any,
230+
) -> None:
231+
"""Save arbitrary key-value pair to database."""
232+
if not (
233+
isinstance(value, str)
234+
or isinstance(value, dict)
235+
or isinstance(value, BaseModel)
236+
or value is None
237+
):
238+
raise TypeError("Value must be str, dict, BaseModel or None.")
239+
240+
value_text = self._dump_value(value)
241+
242+
async with aiosqlite.connect(self.storage_path, timeout=30.0) as conn:
243+
await self._apply_connection_pragmas(conn)
244+
245+
await conn.execute(
246+
"""
247+
INSERT INTO runtime_kv (runtime_id, namespace, key, value)
248+
VALUES (?, ?, ?, ?)
249+
ON CONFLICT(runtime_id, namespace, key)
250+
DO UPDATE SET
251+
value = excluded.value,
252+
timestamp = (strftime('%Y-%m-%d %H:%M:%S', 'now', 'utc'))
253+
""",
254+
(runtime_id, namespace, key, value_text),
255+
)
256+
await conn.commit()
257+
258+
async def get_value(self, runtime_id: str, namespace: str, key: str) -> Any:
259+
"""Get arbitrary key-value pair from database (scoped by runtime_id + namespace)."""
260+
async with aiosqlite.connect(self.storage_path, timeout=30.0) as conn:
261+
await self._apply_connection_pragmas(conn)
262+
263+
cur = await conn.execute(
264+
"""
265+
SELECT value
266+
FROM runtime_kv
267+
WHERE runtime_id = ? AND namespace = ? AND key = ?
268+
LIMIT 1
269+
""",
270+
(runtime_id, namespace, key),
271+
)
272+
row = await cur.fetchone()
273+
274+
if not row:
275+
return None
276+
277+
return self._load_value(cast(str | None, row[0]))
278+
151279
def _serialize_trigger(self, trigger: UiPathResumeTrigger) -> dict[str, Any]:
152280
"""Serialize a resume trigger to a dictionary."""
153281
trigger_key = (
@@ -166,6 +294,7 @@ def _serialize_trigger(self, trigger: UiPathResumeTrigger) -> dict[str, Any]:
166294
"key": trigger_key,
167295
"name": trigger.trigger_name.value,
168296
"payload": payload,
297+
"interrupt_id": trigger.interrupt_id,
169298
"folder_path": trigger.folder_path,
170299
"folder_key": trigger.folder_key,
171300
}
@@ -178,6 +307,7 @@ def _deserialize_trigger(self, trigger_data: dict[str, Any]) -> UiPathResumeTrig
178307
folder_path = trigger_data.get("folder_path")
179308
folder_key = trigger_data.get("folder_key")
180309
payload = trigger_data.get("payload")
310+
interrupt_id = trigger_data.get("interrupt_id")
181311

182312
resume_trigger = UiPathResumeTrigger(
183313
trigger_type=UiPathResumeTriggerType(trigger_type),
@@ -186,6 +316,7 @@ def _deserialize_trigger(self, trigger_data: dict[str, Any]) -> UiPathResumeTrig
186316
folder_path=folder_path,
187317
folder_key=folder_key,
188318
payload=payload,
319+
interrupt_id=interrupt_id,
189320
)
190321

191322
if resume_trigger.trigger_type == UiPathResumeTriggerType.API:
@@ -194,3 +325,28 @@ def _deserialize_trigger(self, trigger_data: dict[str, Any]) -> UiPathResumeTrig
194325
)
195326

196327
return resume_trigger
328+
329+
def _dump_value(self, value: str | dict[str, Any] | BaseModel | None) -> str | None:
330+
if value is None:
331+
return None
332+
if isinstance(value, BaseModel):
333+
return "j:" + json.dumps(value.model_dump())
334+
if isinstance(value, dict):
335+
return "j:" + json.dumps(value)
336+
return "s:" + value
337+
338+
def _load_value(self, raw: str | None) -> Any:
339+
if raw is None:
340+
return None
341+
if raw.startswith("s:"):
342+
return raw[2:]
343+
if raw.startswith("j:"):
344+
return json.loads(raw[2:])
345+
return raw
346+
347+
async def _apply_connection_pragmas(self, conn: aiosqlite.Connection) -> None:
348+
"""Apply per-connection PRAGMA settings for optimal concurrency."""
349+
await conn.execute("PRAGMA busy_timeout=30000")
350+
await conn.execute("PRAGMA synchronous=NORMAL")
351+
await conn.execute("PRAGMA cache_size=10000")
352+
await conn.execute("PRAGMA temp_store=MEMORY")

0 commit comments

Comments
 (0)