diff --git a/pyproject.toml b/pyproject.toml
index e55d971..76699cd 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [project]
 name = "uipath-llamaindex"
-version = "0.1.8"
+version = "0.2.0"
 description = "UiPath LlamaIndex SDK"
 readme = { file = "README.md", content-type = "text/markdown" }
 requires-python = ">=3.11"
@@ -10,7 +10,8 @@ dependencies = [
     "llama-index-embeddings-azure-openai>=0.4.1",
     "llama-index-llms-azure-openai>=0.4.2",
     "openinference-instrumentation-llama-index>=4.3.9",
-    "uipath>=2.2.26, <2.3.0",
+    "uipath>=2.3.0, <2.4.0",
+    "uipath-runtime>=0.3.2, <0.4.0",
 ]
 classifiers = [
     "Intended Audience :: Developers",
@@ -58,6 +59,7 @@ dev = [
     "pytest-cov>=4.1.0",
     "pytest-mock>=3.11.1",
     "pre-commit>=4.1.0",
+    "pytest-asyncio>=1.0.0",
     "numpy>=1.24.0",
 ]
 
@@ -98,10 +100,11 @@ disallow_untyped_defs = false
 testpaths = ["tests"]
 python_files = "test_*.py"
 addopts = "-ra -q"
+asyncio_default_fixture_loop_scope = "function"
+asyncio_mode = "auto"
 
 [[tool.uv.index]]
 name = "testpypi"
 url = "https://test.pypi.org/simple/"
 publish-url = "https://test.pypi.org/legacy/"
 explicit = true
-
diff --git a/src/uipath_llamaindex/runtime/factory.py b/src/uipath_llamaindex/runtime/factory.py
index 1d133e1..8e550f3 100644
--- a/src/uipath_llamaindex/runtime/factory.py
+++ b/src/uipath_llamaindex/runtime/factory.py
@@ -260,6 +260,7 @@ async def _create_runtime_instance(
         delegate=base_runtime,
         storage=storage,
         trigger_manager=trigger_manager,
+        runtime_id=runtime_id,
     )
 
 async def new_runtime(
diff --git a/src/uipath_llamaindex/runtime/runtime.py b/src/uipath_llamaindex/runtime/runtime.py
index 1207b69..85716d3 100644
--- a/src/uipath_llamaindex/runtime/runtime.py
+++ b/src/uipath_llamaindex/runtime/runtime.py
@@ -138,7 +138,6 @@ async def _run_workflow(
         Core workflow execution logic used by both execute() and stream().
         """
         workflow_input = input or {}
-        is_resuming = bool(options and options.resume)
 
         if is_resuming:
@@ -154,7 +153,11 @@
         if is_resuming:
             handler: WorkflowHandler = self.workflow.run(ctx=self._context)
             if workflow_input:
-                handler.ctx.send_event(HumanResponseEvent(**workflow_input))
+                handler.ctx.send_event(
+                    HumanResponseEvent(
+                        **workflow_input.get(self.runtime_id, workflow_input)
+                    )
+                )
             else:
                 handler.ctx.send_event(BreakpointResumeEvent())
         else:
@@ -267,13 +270,14 @@ def _create_suspended_result(
             if hasattr(event, "_data") and "prefix" in event._data:
                 prefix = event._data["prefix"]
 
+            resume_map = {self.runtime_id: prefix or ""}
             return UiPathRuntimeResult(
-                output=prefix,
+                output=resume_map,
                 status=UiPathRuntimeStatus.SUSPENDED,
             )
 
         return UiPathRuntimeResult(
-            output=event,
+            output={self.runtime_id: event},
             status=UiPathRuntimeStatus.SUSPENDED,
         )
diff --git a/src/uipath_llamaindex/runtime/storage.py b/src/uipath_llamaindex/runtime/storage.py
index 8dd9db7..2a5c99f 100644
--- a/src/uipath_llamaindex/runtime/storage.py
+++ b/src/uipath_llamaindex/runtime/storage.py
@@ -3,9 +3,10 @@
 import json
 import os
 import pickle
-from typing import Any
+from typing import Any, cast
 
 import aiosqlite
+from pydantic import BaseModel
 from uipath.core.errors import ErrorCategory, UiPathFaultedTriggerError
 from uipath.runtime import (
     UiPathApiTrigger,
@@ -34,7 +35,12 @@ async def setup(self) -> None:
             os.makedirs(dir_name, exist_ok=True)
 
         try:
-            async with aiosqlite.connect(self.storage_path) as conn:
+            async with aiosqlite.connect(self.storage_path, timeout=30.0) as conn:
+                await self._apply_connection_pragmas(conn)
+
+                # WAL mode is persistent (stored in DB file), only needs to be set once
+                await conn.execute("PRAGMA journal_mode=WAL")
+
                 # Table for workflow contexts
                 await conn.execute("""
                     CREATE TABLE IF NOT EXISTS workflow_contexts (
@@ -47,54 +53,117 @@ async def setup(self) -> None:
                 await conn.execute("""
                     CREATE TABLE IF NOT EXISTS resume_triggers (
                         id INTEGER PRIMARY KEY AUTOINCREMENT,
+                        runtime_id TEXT NOT NULL,
+                        interrupt_id TEXT NOT NULL,
                         trigger_data TEXT NOT NULL,
-                        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
+                        timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                     )
                 """)
 
+                await conn.execute(
+                    """
+                    CREATE INDEX IF NOT EXISTS idx_resume_triggers_runtime_id
+                    ON resume_triggers(runtime_id)
+                    """
+                )
+
+                await conn.execute(
+                    """
+                    CREATE TABLE IF NOT EXISTS runtime_kv (
+                        runtime_id TEXT NOT NULL,
+                        namespace TEXT NOT NULL,
+                        key TEXT NOT NULL,
+                        value TEXT,
+                        timestamp DATETIME DEFAULT (strftime('%Y-%m-%d %H:%M:%S', 'now', 'utc')),
+                        PRIMARY KEY (runtime_id, namespace, key)
+                    )
+                    """
+                )
+
                 await conn.commit()
         except aiosqlite.Error as exc:
             msg = f"Failed to initialize SQLite storage at {self.storage_path!r}: {exc.sqlite_errorname} {exc.sqlite_errorcode}"
             raise UiPathFaultedTriggerError(ErrorCategory.SYSTEM, msg) from exc
 
-    async def save_trigger(self, trigger: UiPathResumeTrigger) -> None:
-        """Save resume trigger to SQLite database."""
-        trigger_dict = self._serialize_trigger(trigger)
-        trigger_json = json.dumps(trigger_dict)
-
+    async def save_triggers(
+        self, runtime_id: str, triggers: list[UiPathResumeTrigger]
+    ) -> None:
+        """Save resume triggers to SQLite database, replacing any stored for this runtime."""
         try:
-            async with aiosqlite.connect(self.storage_path) as conn:
+            async with aiosqlite.connect(self.storage_path, timeout=30.0) as conn:
+                await self._apply_connection_pragmas(conn)
+
+                # Delete all existing triggers for this runtime_id
                 await conn.execute(
-                    "INSERT INTO resume_triggers (trigger_data) VALUES (?)",
-                    (trigger_json,),
+                    """
+                    DELETE FROM resume_triggers
+                    WHERE runtime_id = ?
+                    """,
+                    (runtime_id,),
                 )
+                # Insert new triggers
+                for trigger in triggers:
+                    trigger_dict = self._serialize_trigger(trigger)
+                    trigger_json = json.dumps(trigger_dict)
+                    await conn.execute(
+                        "INSERT INTO resume_triggers (runtime_id, interrupt_id, trigger_data) VALUES (?, ?, ?)",
+                        (runtime_id, trigger.interrupt_id, trigger_json),
+                    )
                 await conn.commit()
         except aiosqlite.Error as exc:
             msg = (
-                f"Failed to save resume trigger "
-                f"(type={trigger.trigger_type}, name={trigger.trigger_name}) "
+                f"Failed to save resume triggers "
                 f"to database {self.storage_path!r}:"
                 f" {exc.sqlite_errorname} {exc.sqlite_errorcode}"
             )
             raise UiPathFaultedTriggerError(ErrorCategory.SYSTEM, msg) from exc
 
-    async def get_latest_trigger(self) -> UiPathResumeTrigger | None:
-        """Get most recent trigger from SQLite database."""
+    async def get_triggers(self, runtime_id: str) -> list[UiPathResumeTrigger] | None:
+        """Get all resume triggers for a runtime from the SQLite database."""
         try:
-            async with aiosqlite.connect(self.storage_path) as conn:
+            async with aiosqlite.connect(self.storage_path, timeout=30.0) as conn:
+                await self._apply_connection_pragmas(conn)
+
                 cursor = await conn.execute(
-                    "SELECT trigger_data FROM resume_triggers ORDER BY created_at DESC LIMIT 1"
+                    "SELECT trigger_data FROM resume_triggers WHERE runtime_id = ? ORDER BY id ASC",
+                    (runtime_id,),
                 )
-                row = await cursor.fetchone()
+                rows = await cursor.fetchall()
         except aiosqlite.Error as exc:
-            msg = f"Failed to retrieve latest resume trigger from database {self.storage_path!r}: {exc.sqlite_errorname} {exc.sqlite_errorcode}"
+            msg = f"Failed to retrieve resume triggers from database {self.storage_path!r}: {exc.sqlite_errorname} {exc.sqlite_errorcode}"
             raise UiPathFaultedTriggerError(ErrorCategory.SYSTEM, msg) from exc
 
-        if not row:
+        if not rows:
             return None
 
-        trigger_dict = json.loads(row[0])
-        return self._deserialize_trigger(trigger_dict)
+        triggers = []
+        for row in rows:
+            trigger_dict = json.loads(row[0])
+            triggers.append(self._deserialize_trigger(trigger_dict))
+        return triggers
+
+    async def delete_trigger(
+        self, runtime_id: str, trigger: UiPathResumeTrigger
+    ) -> None:
+        """Delete resume trigger from storage."""
+        try:
+            async with aiosqlite.connect(self.storage_path, timeout=30.0) as conn:
+                await self._apply_connection_pragmas(conn)
+
+                await conn.execute(
+                    """
+                    DELETE FROM resume_triggers
+                    WHERE runtime_id = ? AND interrupt_id = ?
+                    """,
+                    (
+                        runtime_id,
+                        trigger.interrupt_id,
+                    ),
+                )
+                await conn.commit()
+        except aiosqlite.Error as exc:
+            msg = f"Failed to delete resume trigger from database {self.storage_path!r}: {exc.sqlite_errorname} {exc.sqlite_errorcode}"
+            raise UiPathFaultedTriggerError(ErrorCategory.SYSTEM, msg) from exc
 
     async def save_context(self, runtime_id: str, context_dict: dict[str, Any]) -> None:
         """
@@ -107,7 +176,9 @@ async def save_context(self, runtime_id: str, context_dict: dict[str, Any]) -> N
         context_blob = pickle.dumps(context_dict)
 
         try:
-            async with aiosqlite.connect(self.storage_path) as conn:
+            async with aiosqlite.connect(self.storage_path, timeout=30.0) as conn:
+                await self._apply_connection_pragmas(conn)
+
                 await conn.execute(
                     """
                     INSERT INTO workflow_contexts (runtime_id, context_data)
@@ -133,7 +204,9 @@ async def load_context(self, runtime_id: str) -> dict[str, Any] | None:
             Serialized workflow context dictionary or None if not found
         """
         try:
-            async with aiosqlite.connect(self.storage_path) as conn:
+            async with aiosqlite.connect(self.storage_path, timeout=30.0) as conn:
+                await self._apply_connection_pragmas(conn)
+
                 cursor = await conn.execute(
                     "SELECT context_data FROM workflow_contexts WHERE runtime_id = ?",
                     (runtime_id,),
@@ -148,6 +221,61 @@
 
         return pickle.loads(row[0])
 
+    async def set_value(
+        self,
+        runtime_id: str,
+        namespace: str,
+        key: str,
+        value: Any,
+    ) -> None:
+        """Save arbitrary key-value pair to database."""
+        if not (
+            isinstance(value, str)
+            or isinstance(value, dict)
+            or isinstance(value, BaseModel)
+            or value is None
+        ):
+            raise TypeError("Value must be str, dict, BaseModel or None.")
+
+        value_text = self._dump_value(value)
+
+        async with aiosqlite.connect(self.storage_path, timeout=30.0) as conn:
+            await self._apply_connection_pragmas(conn)
+
+            await conn.execute(
+                """
+                INSERT INTO runtime_kv (runtime_id, namespace, key, value)
+                VALUES (?, ?, ?, ?)
+                ON CONFLICT(runtime_id, namespace, key)
+                DO UPDATE SET
+                    value = excluded.value,
+                    timestamp = (strftime('%Y-%m-%d %H:%M:%S', 'now', 'utc'))
+                """,
+                (runtime_id, namespace, key, value_text),
+            )
+            await conn.commit()
+
+    async def get_value(self, runtime_id: str, namespace: str, key: str) -> Any:
+        """Get arbitrary key-value pair from database (scoped by runtime_id + namespace)."""
+        async with aiosqlite.connect(self.storage_path, timeout=30.0) as conn:
+            await self._apply_connection_pragmas(conn)
+
+            cur = await conn.execute(
+                """
+                SELECT value
+                FROM runtime_kv
+                WHERE runtime_id = ? AND namespace = ? AND key = ?
+                LIMIT 1
+                """,
+                (runtime_id, namespace, key),
+            )
+            row = await cur.fetchone()
+
+            if not row:
+                return None
+
+            return self._load_value(cast(str | None, row[0]))
+
     def _serialize_trigger(self, trigger: UiPathResumeTrigger) -> dict[str, Any]:
         """Serialize a resume trigger to a dictionary."""
         trigger_key = (
@@ -166,6 +294,7 @@ def _serialize_trigger(self, trigger: UiPathResumeTrigger) -> dict[str, Any]:
             "key": trigger_key,
             "name": trigger.trigger_name.value,
             "payload": payload,
+            "interrupt_id": trigger.interrupt_id,
             "folder_path": trigger.folder_path,
             "folder_key": trigger.folder_key,
         }
@@ -178,6 +307,7 @@ def _deserialize_trigger(self, trigger_data: dict[str, Any]) -> UiPathResumeTrig
         folder_path = trigger_data.get("folder_path")
         folder_key = trigger_data.get("folder_key")
         payload = trigger_data.get("payload")
+        interrupt_id = trigger_data.get("interrupt_id")
 
         resume_trigger = UiPathResumeTrigger(
             trigger_type=UiPathResumeTriggerType(trigger_type),
@@ -186,6 +316,7 @@ def _deserialize_trigger(self, trigger_data: dict[str, Any]) -> UiPathResumeTrig
             folder_path=folder_path,
             folder_key=folder_key,
             payload=payload,
+            interrupt_id=interrupt_id,
         )
 
         if resume_trigger.trigger_type == UiPathResumeTriggerType.API:
@@ -194,3 +325,28 @@ def _deserialize_trigger(self, trigger_data: dict[str, Any]) -> UiPathResumeTrig
             )
 
         return resume_trigger
+
+    def _dump_value(self, value: str | dict[str, Any] | BaseModel | None) -> str | None:
+        if value is None:
+            return None
+        if isinstance(value, BaseModel):
+            return "j:" + json.dumps(value.model_dump())
+        if isinstance(value, dict):
+            return "j:" + json.dumps(value)
+        return "s:" + value
+
+    def _load_value(self, raw: str | None) -> Any:
+        if raw is None:
+            return None
+        if raw.startswith("s:"):
+            return raw[2:]
+        if raw.startswith("j:"):
+            return json.loads(raw[2:])
+        return raw
+
+    async def _apply_connection_pragmas(self, conn: aiosqlite.Connection) -> None:
+        """Apply per-connection PRAGMA settings for optimal concurrency."""
+        await conn.execute("PRAGMA busy_timeout=30000")
+        await conn.execute("PRAGMA synchronous=NORMAL")
+        await conn.execute("PRAGMA cache_size=10000")
+        await conn.execute("PRAGMA temp_store=MEMORY")
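The storage rework scopes every table by runtime_id, opens each connection with a 30s timeout plus per-connection pragmas, and adds a runtime_kv table whose values carry a type prefix ("s:" for plain strings, "j:" for JSON-encoded dicts and Pydantic models). A minimal usage sketch under those semantics (the runtime normally drives these calls; the path and IDs here are illustrative):

    import asyncio

    from uipath.runtime import (
        UiPathResumeTrigger,
        UiPathResumeTriggerName,
        UiPathResumeTriggerType,
    )

    from uipath_llamaindex.runtime.storage import SQLiteResumableStorage


    async def main() -> None:
        storage = SQLiteResumableStorage("state.db")
        await storage.setup()  # creates tables and turns on WAL once

        trigger = UiPathResumeTrigger(
            trigger_type=UiPathResumeTriggerType.API,
            trigger_name=UiPathResumeTriggerName.API.value,
            item_key="inbox-1",
            interrupt_id="interrupt-1",
        )
        # save_triggers replaces whatever was previously stored for this runtime_id
        await storage.save_triggers("runtime-1", [trigger])
        triggers = await storage.get_triggers("runtime-1")  # insertion (id) order
        assert triggers is not None and triggers[0].item_key == "inbox-1"

        # KV values round-trip through the prefix encoding: str -> "s:", dict -> "j:"
        await storage.set_value("runtime-1", "meta", "status", {"state": "active"})
        assert await storage.get_value("runtime-1", "meta", "status") == {"state": "active"}


    asyncio.run(main())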
diff --git a/tests/storage/test_integration.py b/tests/storage/test_integration.py
new file mode 100644
index 0000000..c481227
--- /dev/null
+++ b/tests/storage/test_integration.py
@@ -0,0 +1,473 @@
+"""Additional integration and edge case tests for SQLiteResumableStorage."""
+
+import asyncio
+import json
+from pathlib import Path
+from typing import Any
+
+import pytest
+from pydantic import BaseModel
+from uipath.runtime import (
+    UiPathResumeTrigger,
+    UiPathResumeTriggerName,
+    UiPathResumeTriggerType,
+)
+
+from uipath_llamaindex.runtime.storage import SQLiteResumableStorage
+
+
+class ComplexModel(BaseModel):
+    """Complex nested Pydantic model for testing."""
+
+    id: str
+    data: dict[str, Any]
+    items: list[Any]
+    nested: dict[str, Any]
+
+
+class TestIntegrationScenarios:
+    """Test realistic integration scenarios."""
+
+    @pytest.fixture
+    async def storage(self, tmp_path: Path):
+        """Create and setup a storage instance."""
+        db_path = tmp_path / "integration_test.db"
+        storage = SQLiteResumableStorage(str(db_path))
+        await storage.setup()
+        return storage
+
+    @pytest.mark.asyncio
+    async def test_full_workflow_lifecycle(self, storage: SQLiteResumableStorage):
+        """Test a complete workflow lifecycle with all storage operations."""
+        runtime_id = "workflow-123"
+
+        # 1. Save initial trigger
+        initial_trigger = UiPathResumeTrigger(
+            trigger_type=UiPathResumeTriggerType.QUEUE_ITEM,
+            trigger_name=UiPathResumeTriggerName.QUEUE_ITEM.value,
+            item_key="queue-item-1",
+            folder_path="/production",
+            folder_key="prod-folder",
+            payload={"order_id": "12345"},
+            interrupt_id="interrupt-12345",
+        )
+        await storage.save_triggers(runtime_id, [initial_trigger])
+
+        # 2. Save workflow context
+        context = {
+            "current_step": "processing_order",
+            "order_id": "12345",
+            "customer_data": {"name": "John Doe", "email": "john@example.com"},
+            "items": ["item1", "item2", "item3"],
+            "total_amount": 299.99,
+        }
+        await storage.save_context(runtime_id, context)
+
+        # 3. Save some KV pairs
+        await storage.set_value(
+            runtime_id, "metadata", "processed_at", "2024-01-15T10:30:00Z"
+        )
+        await storage.set_value(runtime_id, "status", "current", "in_progress")
+        await storage.set_value(runtime_id, "metrics", "retry_count", "0")
+
+        # 4. Update workflow context (simulating workflow progress)
+        updated_context = context.copy()
+        updated_context["current_step"] = "awaiting_approval"
+        updated_context["approval_requested_at"] = "2024-01-15T10:35:00Z"
+        await storage.save_context(runtime_id, updated_context)
+
+        # 5. Save resume trigger (simulating workflow suspension)
+        resume_trigger = UiPathResumeTrigger(
+            trigger_type=UiPathResumeTriggerType.API,
+            trigger_name=UiPathResumeTriggerName.API.value,
+            item_key="approval-inbox-789",
+            payload='{"approval_status": "pending"}',
+            interrupt_id="interrupt-12345",
+        )
+        await storage.save_triggers(runtime_id, [resume_trigger])
+
+        # 6. Verify all data can be retrieved
+        triggers = await storage.get_triggers(runtime_id)
+        loaded_context = await storage.load_context(runtime_id)
+        processed_at = await storage.get_value(runtime_id, "metadata", "processed_at")
+        status = await storage.get_value(runtime_id, "status", "current")
+
+        assert triggers is not None
+        trigger = triggers[0]
+        assert trigger is not None
+        assert trigger.trigger_type == UiPathResumeTriggerType.API
+        assert trigger.item_key == "approval-inbox-789"
+        assert loaded_context is not None
+        assert loaded_context["current_step"] == "awaiting_approval"
+        assert processed_at == "2024-01-15T10:30:00Z"
+        assert status == "in_progress"
+
+    @pytest.mark.asyncio
+    async def test_multiple_parallel_workflows(self, storage: SQLiteResumableStorage):
+        """Test handling multiple parallel workflows."""
+        workflows: list[tuple[str, UiPathResumeTrigger | None, dict[str, Any]]] = []
+        trigger: UiPathResumeTrigger | None
+        context: dict[str, Any] | None
+
+        # Create multiple parallel workflows
+        for i in range(10):
+            runtime_id = f"workflow-{i}"
+
+            trigger = UiPathResumeTrigger(
+                trigger_type=UiPathResumeTriggerType.QUEUE_ITEM,
+                trigger_name=UiPathResumeTriggerName.QUEUE_ITEM.value,
+                item_key=f"queue-item-{i}",
+                interrupt_id=f"interrupt-{i}",
+            )
+
+            context = {
+                "workflow_id": i,
+                "data": f"data-{i}",
+                "step": i % 3,  # Distribute across 3 steps
+            }
+
+            workflows.append((runtime_id, trigger, context))
+
+        # Save all workflows concurrently
+        save_tasks = []
+        for runtime_id, trigger, context in workflows:
+            assert trigger is not None
+            save_tasks.append(storage.save_triggers(runtime_id, [trigger]))
+            save_tasks.append(storage.save_context(runtime_id, context))
+            save_tasks.append(storage.set_value(runtime_id, "meta", "status", "active"))
+
+        await asyncio.gather(*save_tasks)
+
+        # Verify all workflows were saved correctly
+        for runtime_id, expected_trigger, expected_context in workflows:
+            triggers = await storage.get_triggers(runtime_id)
+            context = await storage.load_context(runtime_id)
+            status = await storage.get_value(runtime_id, "meta", "status")
+
+            assert expected_trigger is not None
+            assert triggers is not None
+            assert triggers[0] is not None
+            assert triggers[0].item_key == expected_trigger.item_key
+            assert context == expected_context
+            assert status == "active"
+
+    @pytest.mark.asyncio
+    async def test_workflow_state_updates(self, storage: SQLiteResumableStorage):
+        """Test updating workflow state multiple times."""
+        runtime_id = "stateful-workflow"
+
+        # Simulate workflow progressing through multiple states
+        states = [
+            {"step": "initialize", "progress": 0},
+            {"step": "validate", "progress": 25},
+            {"step": "process", "progress": 50},
+            {"step": "verify", "progress": 75},
+            {"step": "complete", "progress": 100},
+        ]
+
+        for state in states:
+            await storage.save_context(runtime_id, state)
+            await storage.set_value(
+                runtime_id, "workflow", "last_update", state["step"]
+            )
+
+        # Final state should be the last one
+        final_context = await storage.load_context(runtime_id)
+        last_update = await storage.get_value(runtime_id, "workflow", "last_update")
+
+        assert final_context == {"step": "complete", "progress": 100}
+        assert last_update == "complete"
+
+
+class TestEdgeCases:
+    """Test edge cases and boundary conditions."""
+
+    @pytest.fixture
+    async def storage(self, tmp_path: Path):
+        """Create and setup a storage instance."""
+        db_path = tmp_path / "edge_test.db"
+        storage = SQLiteResumableStorage(str(db_path))
+        await storage.setup()
+        return storage
+
+    @pytest.mark.asyncio
+    async def test_very_large_context(self, storage: SQLiteResumableStorage):
+        """Test handling of very large context data."""
+        runtime_id = "large-context"
+
+        # Create a large context with many keys
+        large_context = {
+            f"key_{i}": f"value_{i}" * 100  # Each value is ~600 bytes
+            for i in range(1000)  # 1000 keys
+        }
+
+        await storage.save_context(runtime_id, large_context)
+        loaded = await storage.load_context(runtime_id)
+
+        assert loaded is not None
+        assert len(loaded) == 1000
+        assert loaded["key_0"] == "value_0" * 100
+
+    @pytest.mark.asyncio
+    async def test_unicode_and_special_characters(
+        self, storage: SQLiteResumableStorage
+    ):
+        """Test handling of Unicode and special characters."""
+        runtime_id = "unicode-test"
+
+        # Test various Unicode characters
+        context = {
+            "chinese": "你好世界",
+            "japanese": "こんにちは",
+            "emoji": "😀🎉🚀",
+            "arabic": "مرحبا",
+            "special": "!@#$%^&*()_+-={}[]|\\:;\"'<>,.?/",
+        }
+
+        await storage.save_context(runtime_id, context)
+        loaded = await storage.load_context(runtime_id)
+
+        assert loaded == context
+
+    @pytest.mark.asyncio
+    async def test_empty_string_values(self, storage: SQLiteResumableStorage):
+        """Test handling of empty strings."""
+        runtime_id = "empty-strings"
+
+        await storage.set_value(runtime_id, "ns", "empty", "")
+        value = await storage.get_value(runtime_id, "ns", "empty")
+
+        assert value == ""
+
+    @pytest.mark.asyncio
+    async def test_whitespace_only_values(self, storage: SQLiteResumableStorage):
+        """Test handling of whitespace-only strings."""
+        runtime_id = "whitespace"
+
+        test_values = [
+            " ",  # spaces
+            "\t\t",  # tabs
+            "\n\n",  # newlines
+            " \t\n ",  # mixed
+        ]
+
+        for i, val in enumerate(test_values):
+            await storage.set_value(runtime_id, "ws", f"key{i}", val)
+
+        for i, expected in enumerate(test_values):
+            value = await storage.get_value(runtime_id, "ws", f"key{i}")
+            assert value == expected
+
+    @pytest.mark.asyncio
+    async def test_nested_dict_with_none_values(self, storage: SQLiteResumableStorage):
+        """Test handling of nested dictionaries with None values."""
+        runtime_id = "nested-none"
+
+        nested = {
+            "level1": {"level2": {"value": None, "other": "data"}, "another": None},
+            "top_none": None,
+        }
+
+        await storage.save_context(runtime_id, nested)
+        loaded = await storage.load_context(runtime_id)
+
+        assert loaded == nested
+
+    @pytest.mark.asyncio
+    async def test_very_long_runtime_id(self, storage: SQLiteResumableStorage):
+        """Test handling of very long runtime IDs."""
+        runtime_id = "a" * 500  # Very long runtime ID
+
+        trigger = UiPathResumeTrigger(
+            trigger_type=UiPathResumeTriggerType.QUEUE_ITEM,
+            trigger_name=UiPathResumeTriggerName.QUEUE_ITEM.value,
+            item_key="test-key",
+            interrupt_id="interrupt-long",
+        )
+
+        await storage.save_triggers(runtime_id, [trigger])
+        triggers = await storage.get_triggers(runtime_id)
+
+        assert triggers is not None
+        assert triggers[0] is not None
+        assert triggers[0].item_key == "test-key"
+
+    @pytest.mark.asyncio
+    async def test_special_characters_in_keys(self, storage: SQLiteResumableStorage):
+        """Test handling of special characters in namespace and key names."""
+        runtime_id = "special-chars"
+
+        # Test various special characters
+        test_cases = [
+            ("namespace-with-dashes", "key-with-dashes"),
+            ("namespace_with_underscores", "key_with_underscores"),
+            ("namespace.with.dots", "key.with.dots"),
+            ("namespace:with:colons", "key:with:colons"),
+        ]
+
+        for ns, key in test_cases:
+            await storage.set_value(runtime_id, ns, key, f"value-{ns}-{key}")
+
+        for ns, key in test_cases:
+            value = await storage.get_value(runtime_id, ns, key)
+            assert value == f"value-{ns}-{key}"
+
+    @pytest.mark.asyncio
+    async def test_json_with_escaped_characters(self, storage: SQLiteResumableStorage):
+        """Test handling of JSON with escaped characters."""
+        runtime_id = "escaped-json"
+
+        data = {
+            "quoted": 'String with "quotes"',
+            "backslash": "Path\\to\\file",
+            "newline": "Line1\nLine2",
+            "tab": "Col1\tCol2",
+        }
+
+        await storage.set_value(runtime_id, "ns", "json", data)
+        retrieved = await storage.get_value(runtime_id, "ns", "json")
+
+        assert retrieved == data
+
+    @pytest.mark.asyncio
+    async def test_deeply_nested_structures(self, storage: SQLiteResumableStorage):
+        """Test handling of deeply nested data structures."""
+        runtime_id = "deep-nest"
+
+        # Create a deeply nested structure
+        def create_nested(depth):
+            if depth == 0:
+                return "leaf"
+            return {"level": depth, "child": create_nested(depth - 1)}
+
+        deep_structure = create_nested(20)
+
+        await storage.save_context(runtime_id, {"root": deep_structure})
+        loaded = await storage.load_context(runtime_id)
+
+        assert loaded == {"root": deep_structure}
+
+    @pytest.mark.asyncio
+    async def test_trigger_with_complex_payload(self, storage: SQLiteResumableStorage):
+        """Test trigger with complex nested payload."""
+        runtime_id = "complex-payload"
+
+        complex_payload = {
+            "transaction": {
+                "id": "tx-12345",
+                "items": [
+                    {"sku": "ITEM-001", "quantity": 2, "price": 29.99},
+                    {"sku": "ITEM-002", "quantity": 1, "price": 49.99},
+                ],
+                "customer": {
+                    "id": "CUST-789",
+                    "address": {
+                        "street": "123 Main St",
+                        "city": "Springfield",
+                        "country": "US",
+                    },
+                },
+                "metadata": {
+                    "tags": ["priority", "express"],
+                    "notes": "Handle with care",
+                },
+            }
+        }
+
+        trigger = UiPathResumeTrigger(
+            trigger_type=UiPathResumeTriggerType.QUEUE_ITEM,
+            trigger_name=UiPathResumeTriggerName.QUEUE_ITEM.value,
+            item_key="complex-item",
+            payload=complex_payload,
+            interrupt_id="interrupt-complex",
+        )
+
+        await storage.save_triggers(runtime_id, [trigger])
+        triggers = await storage.get_triggers(runtime_id)
+
+        assert triggers is not None
+        assert triggers[0] is not None
+        assert isinstance(triggers[0].payload, str)
+        # Payload gets JSON serialized and deserialized
+        assert json.loads(triggers[0].payload) == complex_payload
+
+
+class TestDataConsistency:
+    """Test data consistency and isolation."""
+
+    @pytest.fixture
+    async def storage(self, tmp_path: Path):
+        """Create and setup a storage instance."""
+        db_path = tmp_path / "consistency_test.db"
+        storage = SQLiteResumableStorage(str(db_path))
+        await storage.setup()
+        return storage
+
+    @pytest.mark.asyncio
+    async def test_concurrent_reads(self, storage: SQLiteResumableStorage):
+        """Test concurrent reads of the same data."""
+        runtime_id = "concurrent-read-test"
+
+        context = {"data": "test", "value": 42}
+        await storage.save_context(runtime_id, context)
+
+        # Perform multiple concurrent reads
+        results = await asyncio.gather(
+            *[storage.load_context(runtime_id) for _ in range(20)]
+        )
+
+        # All reads should return the same data
+        for result in results:
+            assert result == context
+
+    @pytest.mark.asyncio
+    async def test_kv_isolation_stress_test(self, storage: SQLiteResumableStorage):
+        """Stress test KV store isolation."""
+        tasks = []
+
+        # Create many KV operations with different combinations
+        for runtime in range(5):
+            for namespace in range(5):
+                for key in range(5):
+                    tasks.append(
+                        storage.set_value(
+                            f"runtime-{runtime}",
+                            f"ns-{namespace}",
+                            f"key-{key}",
+                            f"value-{runtime}-{namespace}-{key}",
+                        )
+                    )
+
+        await asyncio.gather(*tasks)
+
+        # Verify each value is correctly isolated
+        for runtime in range(5):
+            for namespace in range(5):
+                for key in range(5):
+                    value = await storage.get_value(
+                        f"runtime-{runtime}", f"ns-{namespace}", f"key-{key}"
+                    )
+                    expected = f"value-{runtime}-{namespace}-{key}"
+                    assert value == expected, f"Expected {expected}, got {value}"
+
+    @pytest.mark.asyncio
+    async def test_update_race_condition(self, storage: SQLiteResumableStorage):
+        """Test that concurrent updates don't cause data corruption."""
+        runtime_id = "race-condition-test"
+
+        # Initialize with a value
+        await storage.set_value(runtime_id, "counter", "value", "0")
+
+        # Simulate concurrent updates (not a real increment, just updates)
+        async def update_value(suffix):
+            await storage.set_value(runtime_id, "counter", "value", f"updated-{suffix}")
+
+        await asyncio.gather(*[update_value(i) for i in range(10)])
+
+        # Final value should be one of the updates (last write wins)
+        final_value: str = await storage.get_value(runtime_id, "counter", "value")
+        assert final_value.startswith("updated-")
+
+
+if __name__ == "__main__":
+    pytest.main([__file__, "-v"])
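A note on the test setup: because pyproject.toml above sets `asyncio_mode = "auto"`, pytest-asyncio collects `async def` tests without explicit markers, so the `@pytest.mark.asyncio` decorators in these suites are redundant (though harmless). A marker-free equivalent would look like this sketch (same storage class; the test name is hypothetical):

    from pathlib import Path

    from uipath_llamaindex.runtime.storage import SQLiteResumableStorage


    async def test_kv_roundtrip(tmp_path: Path):
        storage = SQLiteResumableStorage(str(tmp_path / "kv.db"))
        await storage.setup()
        await storage.set_value("rt", "ns", "k", "v")
        assert await storage.get_value("rt", "ns", "k") == "v"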
diff --git a/tests/storage/test_storage.py b/tests/storage/test_storage.py
new file mode 100644
index 0000000..72f3215
--- /dev/null
+++ b/tests/storage/test_storage.py
@@ -0,0 +1,740 @@
+"""Tests for SQLiteResumableStorage class."""
+
+import json
+from pathlib import Path
+from typing import Any
+
+import aiosqlite
+import pytest
+from pydantic import BaseModel
+from uipath.runtime import (
+    UiPathApiTrigger,
+    UiPathResumeTrigger,
+    UiPathResumeTriggerName,
+    UiPathResumeTriggerType,
+)
+
+from uipath_llamaindex.runtime.storage import SQLiteResumableStorage
+
+
+class SampleModel(BaseModel):
+    """Sample Pydantic model for testing."""
+
+    name: str
+    value: int
+
+
+class TestSQLiteResumableStorageInitialization:
+    """Test storage initialization and setup."""
+
+    @pytest.mark.asyncio
+    async def test_setup_creates_database_file(self, tmp_path: Path):
+        """Test that setup creates the database file."""
+        db_path = tmp_path / "test.db"
+        storage = SQLiteResumableStorage(str(db_path))
+
+        await storage.setup()
+
+        assert db_path.exists()
+
+    @pytest.mark.asyncio
+    async def test_setup_creates_directory_if_missing(self, tmp_path: Path):
+        """Test that setup creates parent directories if they don't exist."""
+        db_path = tmp_path / "subdir" / "another" / "test.db"
+        storage = SQLiteResumableStorage(str(db_path))
+
+        await storage.setup()
+
+        assert db_path.exists()
+        assert db_path.parent.exists()
+
+    @pytest.mark.asyncio
+    async def test_setup_creates_workflow_contexts_table(self, tmp_path: Path):
+        """Test that setup creates the workflow_contexts table."""
+        db_path = tmp_path / "test.db"
+        storage = SQLiteResumableStorage(str(db_path))
+
+        await storage.setup()
+
+        async with aiosqlite.connect(str(db_path)) as conn:
+            cursor = await conn.execute(
+                "SELECT name FROM sqlite_master WHERE type='table' AND name='workflow_contexts'"
+            )
+            result = await cursor.fetchone()
+            assert result is not None
+
+    @pytest.mark.asyncio
+    async def test_setup_creates_resume_triggers_table(self, tmp_path: Path):
+        """Test that setup creates the resume_triggers table."""
+        db_path = tmp_path / "test.db"
+        storage = SQLiteResumableStorage(str(db_path))
+
+        await storage.setup()
+
+        async with aiosqlite.connect(str(db_path)) as conn:
+            cursor = await conn.execute(
+                "SELECT name FROM sqlite_master WHERE type='table' AND name='resume_triggers'"
+            )
+            result = await cursor.fetchone()
+            assert result is not None
+
+    @pytest.mark.asyncio
+    async def test_setup_creates_runtime_kv_table(self, tmp_path: Path):
+        """Test that setup creates the runtime_kv table."""
+        db_path = tmp_path / "test.db"
+        storage = SQLiteResumableStorage(str(db_path))
+
+        await storage.setup()
+
+        async with aiosqlite.connect(str(db_path)) as conn:
+            cursor = await conn.execute(
+                "SELECT name FROM sqlite_master WHERE type='table' AND name='runtime_kv'"
+            )
+            result = await cursor.fetchone()
+            assert result is not None
+
+    @pytest.mark.asyncio
+    async def test_setup_is_idempotent(self, tmp_path: Path):
+        """Test that setup can be called multiple times safely."""
+        db_path = tmp_path / "test.db"
+        storage = SQLiteResumableStorage(str(db_path))
+
+        await storage.setup()
+        await storage.setup()  # Should not raise
+
+        assert db_path.exists()
+
+
+class TestTriggerOperations:
+    """Test resume trigger save and retrieval operations."""
+
+    @pytest.fixture
+    async def storage(self, tmp_path: Path):
+        """Create and setup a storage instance."""
+        db_path = tmp_path / "test.db"
+        storage = SQLiteResumableStorage(str(db_path))
+        await storage.setup()
+        return storage
+
+    @pytest.mark.asyncio
+    async def test_save_trigger_basic(self, storage: SQLiteResumableStorage):
+        """Test saving a basic resume trigger."""
+        trigger = UiPathResumeTrigger(
+            trigger_type=UiPathResumeTriggerType.QUEUE_ITEM,
+            trigger_name=UiPathResumeTriggerName.QUEUE_ITEM.value,
+            item_key="queue-123",
+            folder_path="/test/folder",
+            folder_key="folder-456",
+            payload={"data": "test"},
+            interrupt_id="interrupt-789",
+        )
+
+        await storage.save_triggers("runtime-1", [trigger])
+
+        # Verify it was saved
+        triggers = await storage.get_triggers("runtime-1")
+        assert triggers is not None
+        assert triggers[0] is not None
+        assert triggers[0].trigger_type == UiPathResumeTriggerType.QUEUE_ITEM
+        assert triggers[0].trigger_name == UiPathResumeTriggerName.QUEUE_ITEM
+        assert triggers[0].item_key == "queue-123"
+
+    @pytest.mark.asyncio
+    async def test_save_trigger_with_api_type(self, storage: SQLiteResumableStorage):
+        """Test saving an API type trigger."""
+        trigger = UiPathResumeTrigger(
+            trigger_type=UiPathResumeTriggerType.API,
+            trigger_name=UiPathResumeTriggerName.API.value,
+            item_key="inbox-789",
+            folder_path="/api/folder",
+            folder_key="folder-abc",
+            payload='{"request": "data"}',
+            interrupt_id="interrupt-123",
+        )
+        trigger.api_resume = UiPathApiTrigger(
+            inbox_id="inbox-789", request='{"request": "data"}'
+        )
+
+        await storage.save_triggers("runtime-2", [trigger])
+
+        retrieved = await storage.get_triggers("runtime-2")
+        assert retrieved is not None
+        assert retrieved[0] is not None
+        assert retrieved[0].trigger_type == UiPathResumeTriggerType.API
+        assert retrieved[0].api_resume is not None
+        assert retrieved[0].api_resume.inbox_id == "inbox-789"
+
+    @pytest.mark.asyncio
+    async def test_save_trigger_with_dict_payload(
+        self, storage: SQLiteResumableStorage
+    ):
+        """Test saving trigger with dictionary payload."""
+        payload_dict = {"key1": "value1", "key2": 123, "nested": {"a": "b"}}
+        trigger = UiPathResumeTrigger(
+            trigger_type=UiPathResumeTriggerType.QUEUE_ITEM,
+            trigger_name=UiPathResumeTriggerName.QUEUE_ITEM.value,
+            item_key="queue-dict",
+            payload=payload_dict,
+            interrupt_id="interrupt-456",
+        )
+
+        await storage.save_triggers("runtime-3", [trigger])
+
+        retrieved = await storage.get_triggers("runtime-3")
+        assert retrieved is not None
+        assert retrieved[0] is not None
+        # Payload should be JSON string after serialization/deserialization
+        assert retrieved[0].payload is not None
+
+    @pytest.mark.asyncio
+    async def test_save_trigger_with_none_payload(
+        self, storage: SQLiteResumableStorage
+    ):
+        """Test saving trigger with None payload."""
+        trigger = UiPathResumeTrigger(
+            trigger_type=UiPathResumeTriggerType.QUEUE_ITEM,
+            trigger_name=UiPathResumeTriggerName.QUEUE_ITEM.value,
+            item_key="queue-none",
+            payload=None,
+            interrupt_id="interrupt-789",
+        )
+
+        await storage.save_triggers("runtime-4", [trigger])
+
+        retrieved = await storage.get_triggers("runtime-4")
+        assert retrieved is not None
+        assert retrieved[0] is not None
+        assert retrieved[0].payload is None
+
+    @pytest.mark.asyncio
+    async def test_save_triggers_replaces_existing(
+        self, storage: SQLiteResumableStorage
+    ):
+        """Test that each save_triggers call replaces the previously stored triggers."""
+        # Save multiple triggers for the same runtime
+        trigger1 = UiPathResumeTrigger(
+            trigger_type=UiPathResumeTriggerType.QUEUE_ITEM,
+            trigger_name=UiPathResumeTriggerName.QUEUE_ITEM.value,
+            item_key="first",
+            interrupt_id="interrupt-1",
+        )
+        trigger2 = UiPathResumeTrigger(
+            trigger_type=UiPathResumeTriggerType.QUEUE_ITEM,
+            trigger_name=UiPathResumeTriggerName.QUEUE_ITEM.value,
+            item_key="second",
+            interrupt_id="interrupt-2",
+        )
+        trigger3 = UiPathResumeTrigger(
+            trigger_type=UiPathResumeTriggerType.QUEUE_ITEM,
+            trigger_name=UiPathResumeTriggerName.QUEUE_ITEM.value,
+            item_key="third",
+            interrupt_id="interrupt-3",
+        )
+
+        await storage.save_triggers("runtime-5", [trigger1])
+        await storage.save_triggers("runtime-5", [trigger2])
+        await storage.save_triggers("runtime-5", [trigger3])
+
+        retrieved = await storage.get_triggers("runtime-5")
+        assert retrieved is not None
+        assert retrieved[0] is not None
+        assert retrieved[0].item_key == "third"
+
+    @pytest.mark.asyncio
+    async def test_get_triggers_nonexistent(
+        self, storage: SQLiteResumableStorage
+    ):
+        """Test getting triggers for non-existent runtime_id."""
+        result = await storage.get_triggers("nonexistent")
+        assert result is None
+
+    @pytest.mark.asyncio
+    async def test_save_trigger_different_runtimes(
+        self, storage: SQLiteResumableStorage
+    ):
+        """Test that triggers are properly isolated by runtime_id."""
+        trigger1 = UiPathResumeTrigger(
+            trigger_type=UiPathResumeTriggerType.QUEUE_ITEM,
+            trigger_name=UiPathResumeTriggerName.QUEUE_ITEM.value,
+            item_key="runtime1-trigger",
+            interrupt_id="interrupt-1",
+        )
+        trigger2 = UiPathResumeTrigger(
+            trigger_type=UiPathResumeTriggerType.QUEUE_ITEM,
+            trigger_name=UiPathResumeTriggerName.QUEUE_ITEM.value,
+            item_key="runtime2-trigger",
+            interrupt_id="interrupt-2",
+        )
+
+        await storage.save_triggers("runtime-a", [trigger1])
+        await storage.save_triggers("runtime-b", [trigger2])
+
+        retrieved_a = await storage.get_triggers("runtime-a")
+        retrieved_b = await storage.get_triggers("runtime-b")
+        assert retrieved_a is not None
+        assert retrieved_b is not None
+        assert retrieved_a[0] is not None
+        assert retrieved_b[0] is not None
+        assert retrieved_a[0].item_key == "runtime1-trigger"
+        assert retrieved_b[0].item_key == "runtime2-trigger"
+
+
+class TestContextOperations:
+    """Test workflow context save and load operations."""
+
+    @pytest.fixture
+    async def storage(self, tmp_path: Path):
+        """Create and setup a storage instance."""
+        db_path = tmp_path / "test.db"
+        storage = SQLiteResumableStorage(str(db_path))
+        await storage.setup()
+        return storage
+
+    @pytest.mark.asyncio
+    async def test_save_and_load_context_basic(self, storage: SQLiteResumableStorage):
+        """Test saving and loading a basic context."""
+        context = {"step": 1, "data": "test data", "flags": {"active": True}}
+
+        await storage.save_context("runtime-1", context)
+        loaded = await storage.load_context("runtime-1")
+
+        assert loaded == context
+
+    @pytest.mark.asyncio
+    async def test_save_and_load_context_complex(self, storage: SQLiteResumableStorage):
+        """Test saving and loading complex context with nested structures."""
+        context = {
+            "variables": {"counter": 42, "name": "test", "items": [1, 2, 3, 4, 5]},
+            "state": {
+                "current_step": "processing",
+                "metadata": {"created": "2024-01-01", "tags": ["tag1", "tag2"]},
+            },
+        }
+
+        await storage.save_context("runtime-2", context)
+        loaded = await storage.load_context("runtime-2")
+
+        assert loaded == context
+
+    @pytest.mark.asyncio
+    async def test_save_context_overwrites_existing(
+        self, storage: SQLiteResumableStorage
+    ):
+        """Test that saving context overwrites existing context."""
+        context1 = {"step": 1}
+        context2 = {"step": 2, "new_field": "value"}
+
+        await storage.save_context("runtime-3", context1)
+        await storage.save_context("runtime-3", context2)
+
+        loaded = await storage.load_context("runtime-3")
+        assert loaded == context2
+        assert loaded != context1
+
+    @pytest.mark.asyncio
+    async def test_load_context_nonexistent(self, storage: SQLiteResumableStorage):
+        """Test loading context for non-existent runtime_id."""
+        result = await storage.load_context("nonexistent")
+        assert result is None
+
+    @pytest.mark.asyncio
+    async def test_save_context_empty_dict(self, storage: SQLiteResumableStorage):
+        """Test saving empty dictionary as context."""
+        context: dict[str, Any] = {}
+
+        await storage.save_context("runtime-4", context)
+        loaded = await storage.load_context("runtime-4")
+
+        assert loaded == {}
+
+    @pytest.mark.asyncio
+    async def test_contexts_isolated_by_runtime_id(
+        self, storage: SQLiteResumableStorage
+    ):
+        """Test that contexts are properly isolated by runtime_id."""
+        context_a = {"runtime": "a", "value": 100}
+        context_b = {"runtime": "b", "value": 200}
+
+        await storage.save_context("runtime-a", context_a)
+        await storage.save_context("runtime-b", context_b)
+
+        loaded_a = await storage.load_context("runtime-a")
+        loaded_b = await storage.load_context("runtime-b")
+
+        assert loaded_a == context_a
+        assert loaded_b == context_b
+
+
+class TestKeyValueOperations:
+    """Test key-value storage operations."""
+
+    @pytest.fixture
+    async def storage(self, tmp_path: Path):
+        """Create and setup a storage instance."""
+        db_path = tmp_path / "test.db"
+        storage = SQLiteResumableStorage(str(db_path))
+        await storage.setup()
+        return storage
+
+    @pytest.mark.asyncio
+    async def test_set_and_get_string_value(self, storage: SQLiteResumableStorage):
+        """Test setting and getting a string value."""
+        await storage.set_value("runtime-1", "namespace1", "key1", "test_value")
+
+        value = await storage.get_value("runtime-1", "namespace1", "key1")
+        assert value == "test_value"
+
+    @pytest.mark.asyncio
+    async def test_set_and_get_dict_value(self, storage: SQLiteResumableStorage):
+        """Test setting and getting a dictionary value."""
+        test_dict = {"name": "John", "age": 30, "active": True}
+
+        await storage.set_value("runtime-2", "namespace2", "key2", test_dict)
+
+        value = await storage.get_value("runtime-2", "namespace2", "key2")
+        assert value == test_dict
+
+    @pytest.mark.asyncio
+    async def test_set_and_get_pydantic_model(self, storage: SQLiteResumableStorage):
+        """Test setting and getting a Pydantic model."""
+        model = SampleModel(name="test", value=42)
+
+        await storage.set_value("runtime-3", "namespace3", "key3", model)
+
+        value = await storage.get_value("runtime-3", "namespace3", "key3")
+        assert value == model.model_dump()
+
+    @pytest.mark.asyncio
+    async def test_set_and_get_none_value(self, storage: SQLiteResumableStorage):
+        """Test setting and getting None value."""
+        await storage.set_value("runtime-4", "namespace4", "key4", None)
+
+        value = await storage.get_value("runtime-4", "namespace4", "key4")
+        assert value is None
+
+    @pytest.mark.asyncio
+    async def test_set_value_invalid_type(self, storage: SQLiteResumableStorage):
+        """Test that setting invalid type raises TypeError."""
+        with pytest.raises(
+            TypeError, match="Value must be str, dict, BaseModel or None"
+        ):
+            await storage.set_value("runtime-5", "namespace5", "key5", 123)
+
+        with pytest.raises(
+            TypeError, match="Value must be str, dict, BaseModel or None"
+        ):
+            await storage.set_value("runtime-5", "namespace5", "key5", [1, 2, 3])
+
+    @pytest.mark.asyncio
+    async def test_set_value_overwrites_existing(self, storage: SQLiteResumableStorage):
+        """Test that setting a value overwrites existing value."""
+        await storage.set_value("runtime-6", "namespace6", "key6", "first")
+        await storage.set_value("runtime-6", "namespace6", "key6", "second")
+
+        value = await storage.get_value("runtime-6", "namespace6", "key6")
+        assert value == "second"
+
+    @pytest.mark.asyncio
+    async def test_get_value_nonexistent(self, storage: SQLiteResumableStorage):
+        """Test getting non-existent value returns None."""
+        value = await storage.get_value("nonexistent", "namespace", "key")
+        assert value is None
+
+    @pytest.mark.asyncio
+    async def test_values_isolated_by_runtime_id(self, storage: SQLiteResumableStorage):
+        """Test that values are isolated by runtime_id."""
+        await storage.set_value("runtime-a", "ns", "key", "value-a")
+        await storage.set_value("runtime-b", "ns", "key", "value-b")
+
+        value_a = await storage.get_value("runtime-a", "ns", "key")
+        value_b = await storage.get_value("runtime-b", "ns", "key")
+
+        assert value_a == "value-a"
+        assert value_b == "value-b"
+
+    @pytest.mark.asyncio
+    async def test_values_isolated_by_namespace(self, storage: SQLiteResumableStorage):
+        """Test that values are isolated by namespace."""
+        await storage.set_value("runtime-1", "ns-a", "key", "value-a")
+        await storage.set_value("runtime-1", "ns-b", "key", "value-b")
+
+        value_a = await storage.get_value("runtime-1", "ns-a", "key")
+        value_b = await storage.get_value("runtime-1", "ns-b", "key")
+
+        assert value_a == "value-a"
+        assert value_b == "value-b"
+
+    @pytest.mark.asyncio
+    async def test_values_isolated_by_key(self, storage: SQLiteResumableStorage):
+        """Test that values are isolated by key."""
+        await storage.set_value("runtime-1", "ns", "key-a", "value-a")
+        await storage.set_value("runtime-1", "ns", "key-b", "value-b")
+
+        value_a = await storage.get_value("runtime-1", "ns", "key-a")
+        value_b = await storage.get_value("runtime-1", "ns", "key-b")
+
+        assert value_a == "value-a"
+        assert value_b == "value-b"
+
+
+class TestSerializationMethods:
+    """Test internal serialization/deserialization methods."""
+
+    @pytest.fixture
+    def storage(self, tmp_path: Path):
+        """Create a storage instance (no setup needed; these tests never touch the DB)."""
+        db_path = tmp_path / "test.db"
+        return SQLiteResumableStorage(str(db_path))
+
+    def test_serialize_trigger_queue_type(self, storage: SQLiteResumableStorage):
+        """Test serialization of queue type trigger."""
+        trigger = UiPathResumeTrigger(
+            trigger_type=UiPathResumeTriggerType.QUEUE_ITEM,
+            trigger_name=UiPathResumeTriggerName.QUEUE_ITEM.value,
+            item_key="queue-123",
+            folder_path="/folder",
+            folder_key="folder-key",
+            payload={"test": "data"},
+            interrupt_id="interrupt-456",
+        )
+
+        serialized = storage._serialize_trigger(trigger)
+
+        assert serialized["type"] == UiPathResumeTriggerType.QUEUE_ITEM.value
+        assert serialized["key"] == "queue-123"
+        assert serialized["name"] == UiPathResumeTriggerName.QUEUE_ITEM.value
+        assert serialized["folder_path"] == "/folder"
+        assert serialized["folder_key"] == "folder-key"
+        assert serialized["interrupt_id"] == "interrupt-456"
+        assert json.loads(serialized["payload"]) == {"test": "data"}
+
+    def test_serialize_trigger_api_type(self, storage: SQLiteResumableStorage):
+        """Test serialization of API type trigger."""
+        trigger = UiPathResumeTrigger(
+            trigger_type=UiPathResumeTriggerType.API,
+            trigger_name=UiPathResumeTriggerName.API.value,
+            item_key="inbox-456",
+            payload="string payload",
+            interrupt_id="interrupt-123",
+        )
+        trigger.api_resume = UiPathApiTrigger(
+            inbox_id="inbox-456", request="string payload"
+        )
+
+        serialized = storage._serialize_trigger(trigger)
+
+        assert serialized["type"] == UiPathResumeTriggerType.API.value
+        assert serialized["key"] == "inbox-456"
+        assert serialized["payload"] == "string payload"
+        assert serialized["interrupt_id"] == "interrupt-123"
+
+    def test_deserialize_trigger_queue_type(self, storage: SQLiteResumableStorage):
+        """Test deserialization of queue type trigger."""
+        trigger_data = {
+            "type": UiPathResumeTriggerType.QUEUE_ITEM.value,
+            "key": "queue-789",
+            "name": UiPathResumeTriggerName.QUEUE_ITEM.value,
+            "folder_path": "/test",
+            "folder_key": "folder-123",
+            "payload": '{"key": "value"}',
+        }
+
+        trigger = storage._deserialize_trigger(trigger_data)
+
+        assert trigger is not None
+        assert trigger.trigger_type == UiPathResumeTriggerType.QUEUE_ITEM
+        assert trigger.trigger_name == UiPathResumeTriggerName.QUEUE_ITEM
+        assert trigger.item_key == "queue-789"
+        assert trigger.folder_path == "/test"
+        assert trigger.folder_key == "folder-123"
+
+    def test_deserialize_trigger_api_type(self, storage: SQLiteResumableStorage):
+        """Test deserialization of API type trigger."""
+        trigger_data = {
+            "type": UiPathResumeTriggerType.API.value,
+            "key": "inbox-abc",
+            "name": UiPathResumeTriggerName.API.value,
+            "payload": "request data",
+        }
+
+        trigger = storage._deserialize_trigger(trigger_data)
+
+        assert trigger.trigger_type == UiPathResumeTriggerType.API
+        assert trigger.api_resume is not None
+        assert trigger.api_resume.inbox_id == "inbox-abc"
+        assert trigger.api_resume.request == "request data"
+
+    def test_dump_value_string(self, storage: SQLiteResumableStorage):
+        """Test _dump_value with string."""
+        result = storage._dump_value("test string")
+        assert result == "s:test string"
+
+    def test_dump_value_dict(self, storage: SQLiteResumableStorage):
+        """Test _dump_value with dictionary."""
+        result = storage._dump_value({"key": "value"})
+        assert result == 'j:{"key": "value"}'
+
+    def test_dump_value_pydantic_model(self, storage: SQLiteResumableStorage):
+        """Test _dump_value with Pydantic model."""
+        model = SampleModel(name="test", value=42)
+        result = storage._dump_value(model)
+        assert result == 'j:{"name": "test", "value": 42}'
+
+    def test_dump_value_none(self, storage: SQLiteResumableStorage):
+        """Test _dump_value with None."""
+        result = storage._dump_value(None)
+        assert result is None
+
+    def test_load_value_string(self, storage: SQLiteResumableStorage):
+        """Test _load_value with string."""
+        result = storage._load_value("s:test string")
+        assert result == "test string"
+
+    def test_load_value_json(self, storage: SQLiteResumableStorage):
+        """Test _load_value with JSON."""
+        result = storage._load_value('j:{"key": "value"}')
+        assert result == {"key": "value"}
+
+    def test_load_value_none(self, storage: SQLiteResumableStorage):
+        """Test _load_value with None."""
+        result = storage._load_value(None)
+        assert result is None
+
+    def test_load_value_raw_string(self, storage: SQLiteResumableStorage):
+        """Test _load_value with raw string (no prefix)."""
+        result = storage._load_value("raw string")
+        assert result == "raw string"
+
+
+class TestErrorHandling:
+    """Test error handling and edge cases."""
+
+    @pytest.mark.asyncio
+    async def test_context_with_non_picklable_object(self, tmp_path: Path):
+        """Test handling of non-picklable objects in context."""
+        db_path = tmp_path / "test.db"
+        storage = SQLiteResumableStorage(str(db_path))
+        await storage.setup()
+
+        # Lambda functions are not picklable
+        context = {"func": lambda x: x + 1}
+
+        with pytest.raises(
+            (AttributeError, TypeError)
+        ):  # pickle.PicklingError or similar
+            await storage.save_context("runtime-1", context)
+
+    @pytest.mark.asyncio
+    async def test_multiple_concurrent_operations(self, tmp_path: Path):
+        """Test that multiple concurrent operations work correctly."""
+        import asyncio
+
+        db_path = tmp_path / "test.db"
+        storage = SQLiteResumableStorage(str(db_path))
+        await storage.setup()
+
+        async def save_trigger(runtime_id: str, key: str):
+            trigger = UiPathResumeTrigger(
+                interrupt_id="interrupt-1",
+                trigger_type=UiPathResumeTriggerType.QUEUE_ITEM,
+                trigger_name=UiPathResumeTriggerName.QUEUE_ITEM.value,
+                item_key=key,
+            )
+            await storage.save_triggers(runtime_id, [trigger])
+
+        # Run multiple saves concurrently
+        await asyncio.gather(
+            save_trigger("runtime-1", "key-1"),
+            save_trigger("runtime-2", "key-2"),
+            save_trigger("runtime-3", "key-3"),
+        )
+
+        # Verify all were saved
+        trigger1 = await storage.get_triggers("runtime-1")
+        trigger2 = await storage.get_triggers("runtime-2")
+        trigger3 = await storage.get_triggers("runtime-3")
+
+        assert trigger1 is not None
+        assert trigger2 is not None
+        assert trigger3 is not None
+        assert trigger1[0] is not None
+        assert trigger2[0] is not None
+        assert trigger3[0] is not None
+        assert trigger1[0].item_key == "key-1"
+        assert trigger2[0].item_key == "key-2"
+        assert trigger3[0].item_key == "key-3"
+
+
+class TestDatabaseSchema:
+    """Test database schema and constraints."""
+
+    @pytest.mark.asyncio
+    async def test_runtime_kv_primary_key_constraint(self, tmp_path: Path):
+        """Test that runtime_kv primary key constraint works."""
+        db_path = tmp_path / "test.db"
+        storage = SQLiteResumableStorage(str(db_path))
+        await storage.setup()
+
+        # First insert
+        await storage.set_value("runtime-1", "ns", "key", "value1")
+
+        # Second insert with same primary key should update, not fail
+        await storage.set_value("runtime-1", "ns", "key", "value2")
+
+        value = await storage.get_value("runtime-1", "ns", "key")
+        assert value == "value2"
+
+    @pytest.mark.asyncio
+    async def test_workflow_contexts_primary_key_constraint(self, tmp_path: Path):
+        """Test that workflow_contexts primary key constraint works."""
+        db_path = tmp_path / "test.db"
+        storage = SQLiteResumableStorage(str(db_path))
+        await storage.setup()
+
+        # First save
+        await storage.save_context("runtime-1", {"step": 1})
+
+        # Second save with same runtime_id should update, not fail
+        await storage.save_context("runtime-1", {"step": 2})
+
+        context = await storage.load_context("runtime-1")
+        assert context == {"step": 2}
+
+    @pytest.mark.asyncio
+    async def test_resume_triggers_autoincrement(self, tmp_path: Path):
+        """Test that resume_triggers id autoincrement works."""
+        db_path = tmp_path / "test.db"
+        storage = SQLiteResumableStorage(str(db_path))
+        await storage.setup()
+
+        trigger1 = UiPathResumeTrigger(
+            interrupt_id="interrupt-1",
+            trigger_type=UiPathResumeTriggerType.QUEUE_ITEM,
+            trigger_name=UiPathResumeTriggerName.QUEUE_ITEM.value,
+            item_key="key-1",
+        )
+        trigger2 = UiPathResumeTrigger(
+            interrupt_id="interrupt-2",
+            trigger_type=UiPathResumeTriggerType.QUEUE_ITEM,
+            trigger_name=UiPathResumeTriggerName.QUEUE_ITEM.value,
+            item_key="key-2",
+        )
+
+        await storage.save_triggers("runtime-1", [trigger1, trigger2])
+
+        # Verify both were saved (autoincrement allowed multiple rows)
+        async with aiosqlite.connect(str(db_path)) as conn:
+            cursor = await conn.execute(
+                "SELECT COUNT(*) FROM resume_triggers WHERE runtime_id = ?",
+                ("runtime-1",),
+            )
+            row = await cursor.fetchone()
+            assert row is not None
+            count = row[0]
+            assert count == 2
+
+
+if __name__ == "__main__":
+    pytest.main([__file__, "-v"])
"pytest-asyncio" +version = "1.3.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "pytest" }, + { name = "typing-extensions", marker = "python_full_version < '3.13'" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/90/2c/8af215c0f776415f3590cac4f9086ccefd6fd463befeae41cd4d3f193e5a/pytest_asyncio-1.3.0.tar.gz", hash = "sha256:d7f52f36d231b80ee124cd216ffb19369aa168fc10095013c6b014a34d3ee9e5", size = 50087, upload-time = "2025-11-10T16:07:47.256Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/e5/35/f8b19922b6a25bc0880171a2f1a003eaeb93657475193ab516fd87cac9da/pytest_asyncio-1.3.0-py3-none-any.whl", hash = "sha256:611e26147c7f77640e6d0a92a38ed17c3e9848063698d5c93d5aa7aa11cebff5", size = 15075, upload-time = "2025-11-10T16:07:45.537Z" }, +] + [[package]] name = "pytest-cov" version = "7.0.0" @@ -3312,7 +3325,7 @@ wheels = [ [[package]] name = "uipath" -version = "2.2.36" +version = "2.3.0" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "click" }, @@ -3332,9 +3345,9 @@ dependencies = [ { name = "uipath-core" }, { name = "uipath-runtime" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/86/a9/6fe298f6de80540162440755ca63ba2e745752909a6bf1be214c7155a955/uipath-2.2.36.tar.gz", hash = "sha256:4f73d134e195caf04860bf65f24704a024632f88ee5392f1d039071ef50075c1", size = 3423899, upload-time = "2025-12-19T02:01:37.34Z" } +sdist = { url = "https://files.pythonhosted.org/packages/13/b6/10e30406786dce197d1f7235811ff19440793c835097376606c6500b3242/uipath-2.3.0.tar.gz", hash = "sha256:f46f034d5c29dd86240324118e2ebe51c48bd6bf393081e2ed3c6015963a7d2b", size = 3431649, upload-time = "2025-12-26T10:22:00.273Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/56/c7/9423ba674519d79793a74403e5e6b4fa791e2f177987cf3876cd7d3a84c0/uipath-2.2.36-py3-none-any.whl", hash = "sha256:a67f855bf3e1be6b290d1ba37808c65e76af2825661767c8facdd94b6245f4ed", size = 393750, upload-time = "2025-12-19T02:01:35.304Z" }, + { url = "https://files.pythonhosted.org/packages/f6/46/b77126a490a47251ac3deff0a88cbdf8c5ac1c69ebf2c71649e25b0999c1/uipath-2.3.0-py3-none-any.whl", hash = "sha256:3df06dca5ffc6304a3a5c61d128ea3c07e39267d2dd515b68c39f145b209aa26", size = 398558, upload-time = "2025-12-26T10:21:58.154Z" }, ] [[package]] @@ -3353,7 +3366,7 @@ wheels = [ [[package]] name = "uipath-llamaindex" -version = "0.1.8" +version = "0.2.0" source = { editable = "." 
} dependencies = [ { name = "aiosqlite" }, @@ -3362,6 +3375,7 @@ dependencies = [ { name = "llama-index-llms-azure-openai" }, { name = "openinference-instrumentation-llama-index" }, { name = "uipath" }, + { name = "uipath-runtime" }, ] [package.optional-dependencies] @@ -3382,6 +3396,7 @@ dev = [ { name = "numpy" }, { name = "pre-commit" }, { name = "pytest" }, + { name = "pytest-asyncio" }, { name = "pytest-cov" }, { name = "pytest-mock" }, { name = "ruff" }, @@ -3400,7 +3415,8 @@ requires-dist = [ { name = "llama-index-llms-bedrock-converse", marker = "extra == 'bedrock'", specifier = ">=0.3.0" }, { name = "llama-index-llms-google-genai", marker = "extra == 'vertex'", specifier = ">=0.8.0" }, { name = "openinference-instrumentation-llama-index", specifier = ">=4.3.9" }, - { name = "uipath", specifier = ">=2.2.26,<2.3.0" }, + { name = "uipath", specifier = ">=2.3.0,<2.4.0" }, + { name = "uipath-runtime", specifier = ">=0.3.2,<0.4.0" }, ] provides-extras = ["bedrock", "vertex"] @@ -3410,6 +3426,7 @@ dev = [ { name = "numpy", specifier = ">=1.24.0" }, { name = "pre-commit", specifier = ">=4.1.0" }, { name = "pytest", specifier = ">=7.4.0" }, + { name = "pytest-asyncio", specifier = ">=1.0.0" }, { name = "pytest-cov", specifier = ">=4.1.0" }, { name = "pytest-mock", specifier = ">=3.11.1" }, { name = "ruff", specifier = ">=0.9.4" }, @@ -3417,14 +3434,14 @@ dev = [ [[package]] name = "uipath-runtime" -version = "0.2.9" +version = "0.3.2" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "uipath-core" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/8d/61/52c1dca619f6903a5487265d890088f77e4af2b5d6bb13f5ddeec18fff63/uipath_runtime-0.2.9.tar.gz", hash = "sha256:551314539392e866aacffd9ee063324cd6a976d75fa1c7bfdc8dad756fbbb937", size = 96159, upload-time = "2025-12-15T11:04:15.645Z" } +sdist = { url = "https://files.pythonhosted.org/packages/4c/c1/3ebede98a90e5fc761bacbca65960238062a515efcdac78d7e862d5a3e07/uipath_runtime-0.3.2.tar.gz", hash = "sha256:16c07626c656f0db70dd0c4d3b9b8c58b9eda38af4d29309e5bfd11533b77b96", size = 99557, upload-time = "2025-12-26T13:37:31.061Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/97/b1/238ee2c81205bef4bc694151bc1c9863393fdb107d328240bdcb3ed51586/uipath_runtime-0.2.9-py3-none-any.whl", hash = "sha256:30555d0a1184eb8926d5e54bafcbf03e1355fca879aa2eeede6583a608f7b460", size = 37002, upload-time = "2025-12-15T11:04:14.111Z" }, + { url = "https://files.pythonhosted.org/packages/ea/ab/92b79fc1e2422a123ae80cced3c5f46ef9c21674498fad60fd4b284dc2d7/uipath_runtime-0.3.2-py3-none-any.whl", hash = "sha256:e30d250e80789a5f4ca62322d6d8574d0a4a4fd1610ed754a07296b0d96b7452", size = 38319, upload-time = "2025-12-26T13:37:29.432Z" }, ] [[package]]