diff --git a/src/uipath_llamaindex/runtime/_sqlite.py b/src/uipath_llamaindex/runtime/_sqlite.py new file mode 100644 index 0000000..fed31c5 --- /dev/null +++ b/src/uipath_llamaindex/runtime/_sqlite.py @@ -0,0 +1,190 @@ +"""Async SQLite connection manager with automatic serialization.""" + +from __future__ import annotations + +import asyncio +from collections.abc import Iterable +from contextlib import asynccontextmanager +from sqlite3 import Row +from typing import Any, AsyncIterator + +import aiosqlite + + +class AsyncSqlite: + """Async SQLite wrapper with automatic serialization via locks. + + Provides thread-safe access to a SQLite database using asyncio locks + to serialize operations. Maintains a single connection and ensures + proper WAL mode configuration. + """ + + def __init__(self, db_path: str, timeout: float = 30.0): + """ + Initialize AsyncSQLite manager. + + Args: + db_path: Path to the SQLite database file + timeout: Database connection timeout in seconds + """ + self.db_path = db_path + self.timeout = timeout + self.conn: aiosqlite.Connection | None = None + self.lock = asyncio.Lock() + self.is_setup = False + + async def __aenter__(self) -> AsyncSqlite: + """Async context manager entry.""" + await self.connect() + return self + + async def __aexit__(self, *args) -> None: + """Async context manager exit.""" + await self.close() + + async def connect(self) -> None: + """Establish database connection and apply initial pragmas.""" + if self.conn is not None: + return + + self.conn = await aiosqlite.connect(self.db_path, timeout=self.timeout) + await self._apply_connection_pragmas() + + # WAL mode is persistent, set once + await self.conn.execute("PRAGMA journal_mode=WAL") + await self.conn.commit() + + async def close(self) -> None: + """Close database connection.""" + if self.conn: + await self.conn.close() + self.conn = None + self.is_setup = False + + async def execute( + self, query: str, parameters: tuple[Any, ...] | None = None + ) -> aiosqlite.Cursor: + """ + Execute a single query with automatic locking. + + Args: + query: SQL query to execute + parameters: Query parameters + + Returns: + Cursor with query results + """ + if self.conn is None: + await self.connect() + + assert self.conn is not None + + async with self.lock: + return await self.conn.execute(query, parameters or ()) + + async def executemany( + self, query: str, parameters_list: list[tuple[Any, ...]] + ) -> None: + """ + Execute a query multiple times with different parameters. + + Args: + query: SQL query to execute + parameters_list: List of parameter tuples + """ + if self.conn is None: + await self.connect() + + assert self.conn is not None + + async with self.lock: + await self.conn.executemany(query, parameters_list) + await self.conn.commit() + + async def executescript(self, script: str) -> None: + """ + Execute a SQL script (multiple statements). + + Args: + script: SQL script to execute + """ + if self.conn is None: + await self.connect() + + assert self.conn is not None + + async with self.lock: + await self.conn.executescript(script) + await self.conn.commit() + + async def commit(self) -> None: + """Commit the current transaction.""" + if self.conn is None: + return + + assert self.conn is not None + + async with self.lock: + await self.conn.commit() + + @asynccontextmanager + async def cursor(self) -> AsyncIterator[aiosqlite.Cursor]: + """ + Get a cursor with automatic locking. + + Yields: + Database cursor + """ + if self.conn is None: + await self.connect() + + assert self.conn is not None + + async with self.lock: + cursor = await self.conn.cursor() + try: + yield cursor + finally: + await cursor.close() + + async def fetchone( + self, query: str, parameters: tuple[Any, ...] | None = None + ) -> Row | None: + """ + Execute query and fetch one result. + + Args: + query: SQL query to execute + parameters: Query parameters + + Returns: + Single row or None + """ + cursor = await self.execute(query, parameters) + return await cursor.fetchone() + + async def fetchall( + self, query: str, parameters: tuple[Any, ...] | None = None + ) -> Iterable[Row]: + """ + Execute query and fetch all results. + + Args: + query: SQL query to execute + parameters: Query parameters + + Returns: + List of rows + """ + cursor = await self.execute(query, parameters) + return await cursor.fetchall() + + async def _apply_connection_pragmas(self) -> None: + """Apply per-connection PRAGMA settings for optimal concurrency.""" + if self.conn is None: + return + + await self.conn.execute(f"PRAGMA busy_timeout={int(self.timeout * 1000)}") + await self.conn.execute("PRAGMA synchronous=NORMAL") + await self.conn.execute("PRAGMA cache_size=10000") + await self.conn.execute("PRAGMA temp_store=MEMORY") diff --git a/src/uipath_llamaindex/runtime/factory.py b/src/uipath_llamaindex/runtime/factory.py index 8e550f3..8975ed6 100644 --- a/src/uipath_llamaindex/runtime/factory.py +++ b/src/uipath_llamaindex/runtime/factory.py @@ -26,7 +26,7 @@ UiPathLlamaIndexRuntimeError, ) from uipath_llamaindex.runtime.runtime import UiPathLlamaIndexRuntime -from uipath_llamaindex.runtime.storage import SQLiteResumableStorage +from uipath_llamaindex.runtime.storage import SqliteResumableStorage from uipath_llamaindex.runtime.workflow import LlamaIndexWorkflowLoader @@ -51,7 +51,7 @@ def __init__( self._workflow_lock = asyncio.Lock() self._storage_lock = asyncio.Lock() - self._storage: SQLiteResumableStorage | None = None + self._storage: SqliteResumableStorage | None = None self._setup_instrumentation(self.context.trace_manager) @@ -80,7 +80,7 @@ def _get_storage_path(self) -> str: os.makedirs(os.path.dirname(default_path), exist_ok=True) return default_path - async def _get_storage(self) -> SQLiteResumableStorage: + async def _get_storage(self) -> SqliteResumableStorage: """Get or create the shared storage instance.""" if self._storage is not None: return self._storage @@ -90,7 +90,7 @@ async def _get_storage(self) -> SQLiteResumableStorage: return self._storage storage_path = self._get_storage_path() - self._storage = SQLiteResumableStorage(storage_path) + self._storage = SqliteResumableStorage(storage_path) await self._storage.setup() return self._storage @@ -291,3 +291,7 @@ async def dispose(self) -> None: self._workflow_loaders.clear() self._workflow_cache.clear() + + if self._storage: + await self._storage.dispose() + self._storage = None diff --git a/src/uipath_llamaindex/runtime/runtime.py b/src/uipath_llamaindex/runtime/runtime.py index 85716d3..fce571f 100644 --- a/src/uipath_llamaindex/runtime/runtime.py +++ b/src/uipath_llamaindex/runtime/runtime.py @@ -47,7 +47,7 @@ UiPathLlamaIndexRuntimeError, ) from uipath_llamaindex.runtime.schema import get_entrypoints_schema, get_workflow_schema -from uipath_llamaindex.runtime.storage import SQLiteResumableStorage +from uipath_llamaindex.runtime.storage import SqliteResumableStorage from ._serialize import serialize_output @@ -62,7 +62,7 @@ def __init__( workflow: Workflow, runtime_id: str | None = None, entrypoint: str | None = None, - storage: SQLiteResumableStorage | None = None, + storage: SqliteResumableStorage | None = None, debug_mode: bool = False, ): """ @@ -76,7 +76,7 @@ def __init__( self.workflow: Workflow = workflow self.runtime_id: str = runtime_id or "default" self.entrypoint: str | None = entrypoint - self.storage: SQLiteResumableStorage | None = storage + self.storage: SqliteResumableStorage | None = storage self.debug_mode: bool = debug_mode self._context: Context | None = None diff --git a/src/uipath_llamaindex/runtime/storage.py b/src/uipath_llamaindex/runtime/storage.py index 2a5c99f..b406868 100644 --- a/src/uipath_llamaindex/runtime/storage.py +++ b/src/uipath_llamaindex/runtime/storage.py @@ -1,11 +1,12 @@ """SQLite implementation of UiPathResumableStorageProtocol.""" +from __future__ import annotations + import json import os import pickle from typing import Any, cast -import aiosqlite from pydantic import BaseModel from uipath.core.errors import ErrorCategory, UiPathFaultedTriggerError from uipath.runtime import ( @@ -15,8 +16,10 @@ UiPathResumeTriggerType, ) +from ._sqlite import AsyncSqlite + -class SQLiteResumableStorage: +class SqliteResumableStorage: """SQLite database storage for resume triggers and workflow context.""" def __init__(self, storage_path: str): @@ -27,6 +30,29 @@ def __init__(self, storage_path: str): storage_path: Path to the SQLite database file """ self.storage_path = storage_path + self._db: AsyncSqlite | None = None + + async def _get_db(self) -> AsyncSqlite: + """Get or create database connection.""" + if self._db is None: + self._db = AsyncSqlite(self.storage_path, timeout=30.0) + await self._db.connect() + return self._db + + async def dispose(self) -> None: + """Dispose of the storage and close database connection.""" + if self._db: + await self._db.close() + self._db = None + + async def __aenter__(self) -> SqliteResumableStorage: + """Async context manager entry.""" + await self.setup() + return self + + async def __aexit__(self, *args) -> None: + """Async context manager exit.""" + await self.dispose() async def setup(self) -> None: """Ensure storage directory and database tables exist.""" @@ -35,54 +61,50 @@ async def setup(self) -> None: os.makedirs(dir_name, exist_ok=True) try: - async with aiosqlite.connect(self.storage_path, timeout=30.0) as conn: - await self._apply_connection_pragmas(conn) - - # WAL mode is persistent (stored in DB file), only needs to be set once - await conn.execute("PRAGMA journal_mode=WAL") - - # Table for workflow contexts - await conn.execute(""" - CREATE TABLE IF NOT EXISTS workflow_contexts ( - runtime_id TEXT PRIMARY KEY, - context_data BLOB NOT NULL - ) - """) - - # Table for resume triggers - await conn.execute(""" - CREATE TABLE IF NOT EXISTS resume_triggers ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - runtime_id TEXT NOT NULL, - interrupt_id TEXT NOT NULL, - trigger_data TEXT NOT NULL, - timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP - ) - """) - - await conn.execute( - """ - CREATE INDEX IF NOT EXISTS idx_resume_triggers_runtime_id - ON resume_triggers(runtime_id) - """ + db = await self._get_db() + + # Table for workflow contexts + await db.execute(""" + CREATE TABLE IF NOT EXISTS workflow_contexts ( + runtime_id TEXT PRIMARY KEY, + context_data BLOB NOT NULL ) + """) + + # Table for resume triggers + await db.execute(""" + CREATE TABLE IF NOT EXISTS resume_triggers ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + runtime_id TEXT NOT NULL, + interrupt_id TEXT NOT NULL, + trigger_data TEXT NOT NULL, + timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP + ) + """) - await conn.execute( - """ - CREATE TABLE IF NOT EXISTS runtime_kv ( - runtime_id TEXT NOT NULL, - namespace TEXT NOT NULL, - key TEXT NOT NULL, - value TEXT, - timestamp DATETIME DEFAULT (strftime('%Y-%m-%d %H:%M:%S', 'now', 'utc')), - PRIMARY KEY (runtime_id, namespace, key) - ) - """ + await db.execute( + """ + CREATE INDEX IF NOT EXISTS idx_resume_triggers_runtime_id + ON resume_triggers(runtime_id) + """ + ) + + await db.execute( + """ + CREATE TABLE IF NOT EXISTS runtime_kv ( + runtime_id TEXT NOT NULL, + namespace TEXT NOT NULL, + key TEXT NOT NULL, + value TEXT, + timestamp DATETIME DEFAULT (strftime('%Y-%m-%d %H:%M:%S', 'now', 'utc')), + PRIMARY KEY (runtime_id, namespace, key) ) + """ + ) - await conn.commit() - except aiosqlite.Error as exc: - msg = f"Failed to initialize SQLite storage at {self.storage_path!r}: {exc.sqlite_errorname} {exc.sqlite_errorcode}" + await db.commit() + except Exception as exc: + msg = f"Failed to initialize SQLite storage at {self.storage_path!r}: {exc}" raise UiPathFaultedTriggerError(ErrorCategory.SYSTEM, msg) from exc async def save_triggers( @@ -90,47 +112,41 @@ async def save_triggers( ) -> None: """Save resume trigger to SQLite database.""" try: - async with aiosqlite.connect(self.storage_path, timeout=30.0) as conn: - await self._apply_connection_pragmas(conn) - - # Delete all existing triggers for this runtime_id - await conn.execute( - """ - DELETE FROM resume_triggers - WHERE runtime_id = ? - """, - (runtime_id,), - ) - # Insert new triggers - for trigger in triggers: - trigger_dict = self._serialize_trigger(trigger) - trigger_json = json.dumps(trigger_dict) - await conn.execute( - "INSERT INTO resume_triggers (runtime_id, interrupt_id, trigger_data) VALUES (?, ?, ?)", - (runtime_id, trigger.interrupt_id, trigger_json), - ) - await conn.commit() - except aiosqlite.Error as exc: - msg = ( - f"Failed to save resume triggers " - f"to database {self.storage_path!r}:" - f" {exc.sqlite_errorname} {exc.sqlite_errorcode}" + db = await self._get_db() + + # Delete all existing triggers for this runtime_id + await db.execute( + """ + DELETE FROM resume_triggers + WHERE runtime_id = ? + """, + (runtime_id,), ) + + # Insert new triggers + for trigger in triggers: + trigger_dict = self._serialize_trigger(trigger) + trigger_json = json.dumps(trigger_dict) + await db.execute( + "INSERT INTO resume_triggers (runtime_id, interrupt_id, trigger_data) VALUES (?, ?, ?)", + (runtime_id, trigger.interrupt_id, trigger_json), + ) + + await db.commit() + except Exception as exc: + msg = f"Failed to save resume triggers to database {self.storage_path!r}: {exc}" raise UiPathFaultedTriggerError(ErrorCategory.SYSTEM, msg) from exc async def get_triggers(self, runtime_id: str) -> list[UiPathResumeTrigger] | None: """Get most recent trigger from SQLite database.""" try: - async with aiosqlite.connect(self.storage_path, timeout=30.0) as conn: - await self._apply_connection_pragmas(conn) - - cursor = await conn.execute( - "SELECT trigger_data FROM resume_triggers WHERE runtime_id = ? ORDER BY id ASC", - (runtime_id,), - ) - rows = await cursor.fetchall() - except aiosqlite.Error as exc: - msg = f"Failed to retrieve resume triggers from database {self.storage_path!r}: {exc.sqlite_errorname} {exc.sqlite_errorcode}" + db = await self._get_db() + rows = await db.fetchall( + "SELECT trigger_data FROM resume_triggers WHERE runtime_id = ? ORDER BY id ASC", + (runtime_id,), + ) + except Exception as exc: + msg = f"Failed to retrieve resume triggers from database {self.storage_path!r}: {exc}" raise UiPathFaultedTriggerError(ErrorCategory.SYSTEM, msg) from exc if not rows: @@ -147,22 +163,17 @@ async def delete_trigger( ) -> None: """Delete resume trigger from storage.""" try: - async with aiosqlite.connect(self.storage_path, timeout=30.0) as conn: - await self._apply_connection_pragmas(conn) - - await conn.execute( - """ - DELETE FROM resume_triggers - WHERE runtime_id = ? AND interrupt_id = ? - """, - ( - runtime_id, - trigger.interrupt_id, - ), - ) - await conn.commit() - except aiosqlite.Error as exc: - msg = f"Failed to delete resume trigger from database {self.storage_path!r}: {exc.sqlite_errorname} {exc.sqlite_errorcode}" + db = await self._get_db() + await db.execute( + """ + DELETE FROM resume_triggers + WHERE runtime_id = ? AND interrupt_id = ? + """, + (runtime_id, trigger.interrupt_id), + ) + await db.commit() + except Exception as exc: + msg = f"Failed to delete resume trigger from database {self.storage_path!r}: {exc}" raise UiPathFaultedTriggerError(ErrorCategory.SYSTEM, msg) from exc async def save_context(self, runtime_id: str, context_dict: dict[str, Any]) -> None: @@ -176,21 +187,19 @@ async def save_context(self, runtime_id: str, context_dict: dict[str, Any]) -> N context_blob = pickle.dumps(context_dict) try: - async with aiosqlite.connect(self.storage_path, timeout=30.0) as conn: - await self._apply_connection_pragmas(conn) - - await conn.execute( - """ - INSERT INTO workflow_contexts (runtime_id, context_data) - VALUES (?, ?) - ON CONFLICT(runtime_id) DO UPDATE SET - context_data = excluded.context_data - """, - (runtime_id, context_blob), - ) - await conn.commit() - except aiosqlite.Error as exc: - msg = f"Failed to save workflow context to database {self.storage_path!r}: {exc.sqlite_errorname} {exc.sqlite_errorcode}" + db = await self._get_db() + await db.execute( + """ + INSERT INTO workflow_contexts (runtime_id, context_data) + VALUES (?, ?) + ON CONFLICT(runtime_id) DO UPDATE SET + context_data = excluded.context_data + """, + (runtime_id, context_blob), + ) + await db.commit() + except Exception as exc: + msg = f"Failed to save workflow context to database {self.storage_path!r}: {exc}" raise UiPathFaultedTriggerError(ErrorCategory.SYSTEM, msg) from exc async def load_context(self, runtime_id: str) -> dict[str, Any] | None: @@ -204,16 +213,13 @@ async def load_context(self, runtime_id: str) -> dict[str, Any] | None: Serialized workflow context dictionary or None if not found """ try: - async with aiosqlite.connect(self.storage_path, timeout=30.0) as conn: - await self._apply_connection_pragmas(conn) - - cursor = await conn.execute( - "SELECT context_data FROM workflow_contexts WHERE runtime_id = ?", - (runtime_id,), - ) - row = await cursor.fetchone() - except aiosqlite.Error as exc: - msg = f"Failed to load workflow context from database {self.storage_path!r}: {exc.sqlite_errorname} {exc.sqlite_errorcode}" + db = await self._get_db() + row = await db.fetchone( + "SELECT context_data FROM workflow_contexts WHERE runtime_id = ?", + (runtime_id,), + ) + except Exception as exc: + msg = f"Failed to load workflow context from database {self.storage_path!r}: {exc}" raise UiPathFaultedTriggerError(ErrorCategory.SYSTEM, msg) from exc if not row: @@ -239,37 +245,32 @@ async def set_value( value_text = self._dump_value(value) - async with aiosqlite.connect(self.storage_path, timeout=30.0) as conn: - await self._apply_connection_pragmas(conn) - - await conn.execute( - """ - INSERT INTO runtime_kv (runtime_id, namespace, key, value) - VALUES (?, ?, ?, ?) - ON CONFLICT(runtime_id, namespace, key) - DO UPDATE SET - value = excluded.value, - timestamp = (strftime('%Y-%m-%d %H:%M:%S', 'now', 'utc')) - """, - (runtime_id, namespace, key, value_text), - ) - await conn.commit() + db = await self._get_db() + await db.execute( + """ + INSERT INTO runtime_kv (runtime_id, namespace, key, value) + VALUES (?, ?, ?, ?) + ON CONFLICT(runtime_id, namespace, key) + DO UPDATE SET + value = excluded.value, + timestamp = (strftime('%Y-%m-%d %H:%M:%S', 'now', 'utc')) + """, + (runtime_id, namespace, key, value_text), + ) + await db.commit() async def get_value(self, runtime_id: str, namespace: str, key: str) -> Any: """Get arbitrary key-value pair from database (scoped by runtime_id + namespace).""" - async with aiosqlite.connect(self.storage_path, timeout=30.0) as conn: - await self._apply_connection_pragmas(conn) - - cur = await conn.execute( - """ - SELECT value - FROM runtime_kv - WHERE runtime_id = ? AND namespace = ? AND key = ? - LIMIT 1 - """, - (runtime_id, namespace, key), - ) - row = await cur.fetchone() + db = await self._get_db() + row = await db.fetchone( + """ + SELECT value + FROM runtime_kv + WHERE runtime_id = ? AND namespace = ? AND key = ? + LIMIT 1 + """, + (runtime_id, namespace, key), + ) if not row: return None @@ -343,10 +344,3 @@ def _load_value(self, raw: str | None) -> Any: if raw.startswith("j:"): return json.loads(raw[2:]) return raw - - async def _apply_connection_pragmas(self, conn: aiosqlite.Connection) -> None: - """Apply per-connection PRAGMA settings for optimal concurrency.""" - await conn.execute("PRAGMA busy_timeout=30000") - await conn.execute("PRAGMA synchronous=NORMAL") - await conn.execute("PRAGMA cache_size=10000") - await conn.execute("PRAGMA temp_store=MEMORY") diff --git a/tests/storage/test_integration.py b/tests/storage/test_integration.py index c481227..c1d6bc9 100644 --- a/tests/storage/test_integration.py +++ b/tests/storage/test_integration.py @@ -2,7 +2,8 @@ import asyncio import json -from pathlib import Path +import os +import tempfile from typing import Any import pytest @@ -13,7 +14,7 @@ UiPathResumeTriggerType, ) -from uipath_llamaindex.runtime.storage import SQLiteResumableStorage +from uipath_llamaindex.runtime.storage import SqliteResumableStorage class ComplexModel(BaseModel): @@ -29,15 +30,21 @@ class TestIntegrationScenarios: """Test realistic integration scenarios.""" @pytest.fixture - async def storage(self, tmp_path: Path): - """Create and setup a storage instance.""" - db_path = tmp_path / "integration_test.db" - storage = SQLiteResumableStorage(str(db_path)) - await storage.setup() - return storage + async def storage(self): + """Create a SqliteResumableStorage instance with temporary database file.""" + temp_db = tempfile.NamedTemporaryFile(delete=False, suffix=".db") + temp_db.close() + + try: + async with SqliteResumableStorage(str(temp_db.name)) as storage: + await storage.setup() + yield storage + finally: + if os.path.exists(temp_db.name): + os.remove(temp_db.name) @pytest.mark.asyncio - async def test_full_workflow_lifecycle(self, storage: SQLiteResumableStorage): + async def test_full_workflow_lifecycle(self, storage: SqliteResumableStorage): """Test a complete workflow lifecycle with all storage operations.""" runtime_id = "workflow-123" @@ -103,7 +110,7 @@ async def test_full_workflow_lifecycle(self, storage: SQLiteResumableStorage): assert status == "in_progress" @pytest.mark.asyncio - async def test_multiple_parallel_workflows(self, storage: SQLiteResumableStorage): + async def test_multiple_parallel_workflows(self, storage: SqliteResumableStorage): """Test handling multiple parallel workflows.""" workflows: list[tuple[str, UiPathResumeTrigger | None, dict[str, Any]]] = [] trigger: UiPathResumeTrigger | None @@ -152,7 +159,7 @@ async def test_multiple_parallel_workflows(self, storage: SQLiteResumableStorage assert status == "active" @pytest.mark.asyncio - async def test_workflow_state_updates(self, storage: SQLiteResumableStorage): + async def test_workflow_state_updates(self, storage: SqliteResumableStorage): """Test updating workflow state multiple times.""" runtime_id = "stateful-workflow" @@ -183,15 +190,21 @@ class TestEdgeCases: """Test edge cases and boundary conditions.""" @pytest.fixture - async def storage(self, tmp_path: Path): - """Create and setup a storage instance.""" - db_path = tmp_path / "edge_test.db" - storage = SQLiteResumableStorage(str(db_path)) - await storage.setup() - return storage + async def storage(self): + """Create a SqliteResumableStorage instance with temporary database file.""" + temp_db = tempfile.NamedTemporaryFile(delete=False, suffix=".db") + temp_db.close() + + try: + async with SqliteResumableStorage(str(temp_db.name)) as storage: + await storage.setup() + yield storage + finally: + if os.path.exists(temp_db.name): + os.remove(temp_db.name) @pytest.mark.asyncio - async def test_very_large_context(self, storage: SQLiteResumableStorage): + async def test_very_large_context(self, storage: SqliteResumableStorage): """Test handling of very large context data.""" runtime_id = "large-context" @@ -210,7 +223,7 @@ async def test_very_large_context(self, storage: SQLiteResumableStorage): @pytest.mark.asyncio async def test_unicode_and_special_characters( - self, storage: SQLiteResumableStorage + self, storage: SqliteResumableStorage ): """Test handling of Unicode and special characters.""" runtime_id = "unicode-test" @@ -230,7 +243,7 @@ async def test_unicode_and_special_characters( assert loaded == context @pytest.mark.asyncio - async def test_empty_string_values(self, storage: SQLiteResumableStorage): + async def test_empty_string_values(self, storage: SqliteResumableStorage): """Test handling of empty strings.""" runtime_id = "empty-strings" @@ -240,7 +253,7 @@ async def test_empty_string_values(self, storage: SQLiteResumableStorage): assert value == "" @pytest.mark.asyncio - async def test_whitespace_only_values(self, storage: SQLiteResumableStorage): + async def test_whitespace_only_values(self, storage: SqliteResumableStorage): """Test handling of whitespace-only strings.""" runtime_id = "whitespace" @@ -259,7 +272,7 @@ async def test_whitespace_only_values(self, storage: SQLiteResumableStorage): assert value == expected @pytest.mark.asyncio - async def test_nested_dict_with_none_values(self, storage: SQLiteResumableStorage): + async def test_nested_dict_with_none_values(self, storage: SqliteResumableStorage): """Test handling of nested dictionaries with None values.""" runtime_id = "nested-none" @@ -274,7 +287,7 @@ async def test_nested_dict_with_none_values(self, storage: SQLiteResumableStorag assert loaded == nested @pytest.mark.asyncio - async def test_very_long_runtime_id(self, storage: SQLiteResumableStorage): + async def test_very_long_runtime_id(self, storage: SqliteResumableStorage): """Test handling of very long runtime IDs.""" runtime_id = "a" * 500 # Very long runtime ID @@ -293,7 +306,7 @@ async def test_very_long_runtime_id(self, storage: SQLiteResumableStorage): assert triggers[0].item_key == "test-key" @pytest.mark.asyncio - async def test_special_characters_in_keys(self, storage: SQLiteResumableStorage): + async def test_special_characters_in_keys(self, storage: SqliteResumableStorage): """Test handling of special characters in namespace and key names.""" runtime_id = "special-chars" @@ -313,7 +326,7 @@ async def test_special_characters_in_keys(self, storage: SQLiteResumableStorage) assert value == f"value-{ns}-{key}" @pytest.mark.asyncio - async def test_json_with_escaped_characters(self, storage: SQLiteResumableStorage): + async def test_json_with_escaped_characters(self, storage: SqliteResumableStorage): """Test handling of JSON with escaped characters.""" runtime_id = "escaped-json" @@ -330,7 +343,7 @@ async def test_json_with_escaped_characters(self, storage: SQLiteResumableStorag assert retrieved == data @pytest.mark.asyncio - async def test_deeply_nested_structures(self, storage: SQLiteResumableStorage): + async def test_deeply_nested_structures(self, storage: SqliteResumableStorage): """Test handling of deeply nested data structures.""" runtime_id = "deep-nest" @@ -348,7 +361,7 @@ def create_nested(depth): assert loaded == {"root": deep_structure} @pytest.mark.asyncio - async def test_trigger_with_complex_payload(self, storage: SQLiteResumableStorage): + async def test_trigger_with_complex_payload(self, storage: SqliteResumableStorage): """Test trigger with complex nested payload.""" runtime_id = "complex-payload" @@ -396,15 +409,21 @@ class TestDataConsistency: """Test data consistency and isolation.""" @pytest.fixture - async def storage(self, tmp_path: Path): - """Create and setup a storage instance.""" - db_path = tmp_path / "consistency_test.db" - storage = SQLiteResumableStorage(str(db_path)) - await storage.setup() - return storage + async def storage(self): + """Create a SqliteResumableStorage instance with temporary database file.""" + temp_db = tempfile.NamedTemporaryFile(delete=False, suffix=".db") + temp_db.close() + + try: + async with SqliteResumableStorage(str(temp_db.name)) as storage: + await storage.setup() + yield storage + finally: + if os.path.exists(temp_db.name): + os.remove(temp_db.name) @pytest.mark.asyncio - async def test_concurrent_reads(self, storage: SQLiteResumableStorage): + async def test_concurrent_reads(self, storage: SqliteResumableStorage): """Test concurrent reads of the same data.""" runtime_id = "concurrent-read-test" @@ -421,7 +440,7 @@ async def test_concurrent_reads(self, storage: SQLiteResumableStorage): assert result == context @pytest.mark.asyncio - async def test_kv_isolation_stress_test(self, storage: SQLiteResumableStorage): + async def test_kv_isolation_stress_test(self, storage: SqliteResumableStorage): """Stress test KV store isolation.""" tasks = [] @@ -451,7 +470,7 @@ async def test_kv_isolation_stress_test(self, storage: SQLiteResumableStorage): assert value == expected, f"Expected {expected}, got {value}" @pytest.mark.asyncio - async def test_update_race_condition(self, storage: SQLiteResumableStorage): + async def test_update_race_condition(self, storage: SqliteResumableStorage): """Test that concurrent updates don't cause data corruption.""" runtime_id = "race-condition-test" diff --git a/tests/storage/test_storage.py b/tests/storage/test_storage.py index 72f3215..7d73e14 100644 --- a/tests/storage/test_storage.py +++ b/tests/storage/test_storage.py @@ -1,6 +1,8 @@ -"""Tests for SQLiteResumableStorage class.""" +"""Tests for SqliteResumableStorage class.""" import json +import os +import tempfile from pathlib import Path from typing import Any @@ -14,7 +16,7 @@ UiPathResumeTriggerType, ) -from uipath_llamaindex.runtime.storage import SQLiteResumableStorage +from uipath_llamaindex.runtime.storage import SqliteResumableStorage class SampleModel(BaseModel): @@ -24,100 +26,94 @@ class SampleModel(BaseModel): value: int -class TestSQLiteResumableStorageInitialization: +class TestSqliteResumableStorageInitialization: """Test storage initialization and setup.""" @pytest.mark.asyncio async def test_setup_creates_database_file(self, tmp_path: Path): """Test that setup creates the database file.""" db_path = tmp_path / "test.db" - storage = SQLiteResumableStorage(str(db_path)) - - await storage.setup() - - assert db_path.exists() + async with SqliteResumableStorage(str(db_path)) as storage: + await storage.setup() + assert db_path.exists() @pytest.mark.asyncio async def test_setup_creates_directory_if_missing(self, tmp_path: Path): """Test that setup creates parent directories if they don't exist.""" db_path = tmp_path / "subdir" / "another" / "test.db" - storage = SQLiteResumableStorage(str(db_path)) - - await storage.setup() - - assert db_path.exists() - assert db_path.parent.exists() + async with SqliteResumableStorage(str(db_path)) as storage: + await storage.setup() + assert db_path.exists() + assert db_path.parent.exists() @pytest.mark.asyncio async def test_setup_creates_workflow_contexts_table(self, tmp_path: Path): """Test that setup creates the workflow_contexts table.""" db_path = tmp_path / "test.db" - storage = SQLiteResumableStorage(str(db_path)) - - await storage.setup() - - async with aiosqlite.connect(str(db_path)) as conn: - cursor = await conn.execute( - "SELECT name FROM sqlite_master WHERE type='table' AND name='workflow_contexts'" - ) - result = await cursor.fetchone() - assert result is not None + async with SqliteResumableStorage(str(db_path)) as storage: + await storage.setup() + async with aiosqlite.connect(str(db_path)) as conn: + cursor = await conn.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND name='workflow_contexts'" + ) + result = await cursor.fetchone() + assert result is not None @pytest.mark.asyncio async def test_setup_creates_resume_triggers_table(self, tmp_path: Path): """Test that setup creates the resume_triggers table.""" db_path = tmp_path / "test.db" - storage = SQLiteResumableStorage(str(db_path)) - - await storage.setup() - - async with aiosqlite.connect(str(db_path)) as conn: - cursor = await conn.execute( - "SELECT name FROM sqlite_master WHERE type='table' AND name='resume_triggers'" - ) - result = await cursor.fetchone() - assert result is not None + async with SqliteResumableStorage(str(db_path)) as storage: + await storage.setup() + async with aiosqlite.connect(str(db_path)) as conn: + cursor = await conn.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND name='resume_triggers'" + ) + result = await cursor.fetchone() + assert result is not None @pytest.mark.asyncio async def test_setup_creates_runtime_kv_table(self, tmp_path: Path): """Test that setup creates the runtime_kv table.""" db_path = tmp_path / "test.db" - storage = SQLiteResumableStorage(str(db_path)) - - await storage.setup() - - async with aiosqlite.connect(str(db_path)) as conn: - cursor = await conn.execute( - "SELECT name FROM sqlite_master WHERE type='table' AND name='runtime_kv'" - ) - result = await cursor.fetchone() - assert result is not None + async with SqliteResumableStorage(str(db_path)) as storage: + await storage.setup() + async with aiosqlite.connect(str(db_path)) as conn: + cursor = await conn.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND name='runtime_kv'" + ) + result = await cursor.fetchone() + assert result is not None @pytest.mark.asyncio async def test_setup_is_idempotent(self, tmp_path: Path): """Test that setup can be called multiple times safely.""" db_path = tmp_path / "test.db" - storage = SQLiteResumableStorage(str(db_path)) - - await storage.setup() - await storage.setup() # Should not raise - - assert db_path.exists() + async with SqliteResumableStorage(str(db_path)) as storage: + await storage.setup() + await storage.setup() # Should not raise + assert db_path.exists() class TestTriggerOperations: """Test resume trigger save and retrieval operations.""" @pytest.fixture - async def storage(self, tmp_path: Path): - """Create and setup a storage instance.""" - db_path = tmp_path / "test.db" - storage = SQLiteResumableStorage(str(db_path)) - await storage.setup() - return storage + async def storage(self): + """Create a SqliteResumableStorage instance with temporary database file.""" + temp_db = tempfile.NamedTemporaryFile(delete=False, suffix=".db") + temp_db.close() + + try: + async with SqliteResumableStorage(str(temp_db.name)) as storage: + await storage.setup() + yield storage + finally: + if os.path.exists(temp_db.name): + os.remove(temp_db.name) @pytest.mark.asyncio - async def test_save_trigger_basic(self, storage: SQLiteResumableStorage): + async def test_save_trigger_basic(self, storage: SqliteResumableStorage): """Test saving a basic resume trigger.""" trigger = UiPathResumeTrigger( trigger_type=UiPathResumeTriggerType.QUEUE_ITEM, @@ -140,7 +136,7 @@ async def test_save_trigger_basic(self, storage: SQLiteResumableStorage): assert triggers[0].item_key == "queue-123" @pytest.mark.asyncio - async def test_save_trigger_with_api_type(self, storage: SQLiteResumableStorage): + async def test_save_trigger_with_api_type(self, storage: SqliteResumableStorage): """Test saving an API type trigger.""" trigger = UiPathResumeTrigger( trigger_type=UiPathResumeTriggerType.API, @@ -166,7 +162,7 @@ async def test_save_trigger_with_api_type(self, storage: SQLiteResumableStorage) @pytest.mark.asyncio async def test_save_trigger_with_dict_payload( - self, storage: SQLiteResumableStorage + self, storage: SqliteResumableStorage ): """Test saving trigger with dictionary payload.""" payload_dict = {"key1": "value1", "key2": 123, "nested": {"a": "b"}} @@ -188,7 +184,7 @@ async def test_save_trigger_with_dict_payload( @pytest.mark.asyncio async def test_save_trigger_with_none_payload( - self, storage: SQLiteResumableStorage + self, storage: SqliteResumableStorage ): """Test saving trigger with None payload.""" trigger = UiPathResumeTrigger( @@ -208,7 +204,7 @@ async def test_save_trigger_with_none_payload( @pytest.mark.asyncio async def test_get_latest_trigger_multiple_triggers( - self, storage: SQLiteResumableStorage + self, storage: SqliteResumableStorage ): """Test that get_latest_trigger returns the most recent trigger.""" # Save multiple triggers for the same runtime @@ -242,7 +238,7 @@ async def test_get_latest_trigger_multiple_triggers( @pytest.mark.asyncio async def test_get_latest_trigger_nonexistent( - self, storage: SQLiteResumableStorage + self, storage: SqliteResumableStorage ): """Test getting trigger for non-existent runtime_id.""" result = await storage.get_triggers("nonexistent") @@ -250,7 +246,7 @@ async def test_get_latest_trigger_nonexistent( @pytest.mark.asyncio async def test_save_trigger_different_runtimes( - self, storage: SQLiteResumableStorage + self, storage: SqliteResumableStorage ): """Test that triggers are properly isolated by runtime_id.""" trigger1 = UiPathResumeTrigger( @@ -283,15 +279,21 @@ class TestContextOperations: """Test workflow context save and load operations.""" @pytest.fixture - async def storage(self, tmp_path: Path): - """Create and setup a storage instance.""" - db_path = tmp_path / "test.db" - storage = SQLiteResumableStorage(str(db_path)) - await storage.setup() - return storage + async def storage(self): + """Create a SqliteResumableStorage instance with temporary database file.""" + temp_db = tempfile.NamedTemporaryFile(delete=False, suffix=".db") + temp_db.close() + + try: + async with SqliteResumableStorage(str(temp_db.name)) as storage: + await storage.setup() + yield storage + finally: + if os.path.exists(temp_db.name): + os.remove(temp_db.name) @pytest.mark.asyncio - async def test_save_and_load_context_basic(self, storage: SQLiteResumableStorage): + async def test_save_and_load_context_basic(self, storage: SqliteResumableStorage): """Test saving and loading a basic context.""" context = {"step": 1, "data": "test data", "flags": {"active": True}} @@ -301,7 +303,7 @@ async def test_save_and_load_context_basic(self, storage: SQLiteResumableStorage assert loaded == context @pytest.mark.asyncio - async def test_save_and_load_context_complex(self, storage: SQLiteResumableStorage): + async def test_save_and_load_context_complex(self, storage: SqliteResumableStorage): """Test saving and loading complex context with nested structures.""" context = { "variables": {"counter": 42, "name": "test", "items": [1, 2, 3, 4, 5]}, @@ -318,7 +320,7 @@ async def test_save_and_load_context_complex(self, storage: SQLiteResumableStora @pytest.mark.asyncio async def test_save_context_overwrites_existing( - self, storage: SQLiteResumableStorage + self, storage: SqliteResumableStorage ): """Test that saving context overwrites existing context.""" context1 = {"step": 1} @@ -332,13 +334,13 @@ async def test_save_context_overwrites_existing( assert loaded != context1 @pytest.mark.asyncio - async def test_load_context_nonexistent(self, storage: SQLiteResumableStorage): + async def test_load_context_nonexistent(self, storage: SqliteResumableStorage): """Test loading context for non-existent runtime_id.""" result = await storage.load_context("nonexistent") assert result is None @pytest.mark.asyncio - async def test_save_context_empty_dict(self, storage: SQLiteResumableStorage): + async def test_save_context_empty_dict(self, storage: SqliteResumableStorage): """Test saving empty dictionary as context.""" context: dict[str, Any] = {} @@ -349,7 +351,7 @@ async def test_save_context_empty_dict(self, storage: SQLiteResumableStorage): @pytest.mark.asyncio async def test_contexts_isolated_by_runtime_id( - self, storage: SQLiteResumableStorage + self, storage: SqliteResumableStorage ): """Test that contexts are properly isolated by runtime_id.""" context_a = {"runtime": "a", "value": 100} @@ -369,15 +371,21 @@ class TestKeyValueOperations: """Test key-value storage operations.""" @pytest.fixture - async def storage(self, tmp_path: Path): - """Create and setup a storage instance.""" - db_path = tmp_path / "test.db" - storage = SQLiteResumableStorage(str(db_path)) - await storage.setup() - return storage + async def storage(self): + """Create a SqliteResumableStorage instance with temporary database file.""" + temp_db = tempfile.NamedTemporaryFile(delete=False, suffix=".db") + temp_db.close() + + try: + async with SqliteResumableStorage(str(temp_db.name)) as storage: + await storage.setup() + yield storage + finally: + if os.path.exists(temp_db.name): + os.remove(temp_db.name) @pytest.mark.asyncio - async def test_set_and_get_string_value(self, storage: SQLiteResumableStorage): + async def test_set_and_get_string_value(self, storage: SqliteResumableStorage): """Test setting and getting a string value.""" await storage.set_value("runtime-1", "namespace1", "key1", "test_value") @@ -385,7 +393,7 @@ async def test_set_and_get_string_value(self, storage: SQLiteResumableStorage): assert value == "test_value" @pytest.mark.asyncio - async def test_set_and_get_dict_value(self, storage: SQLiteResumableStorage): + async def test_set_and_get_dict_value(self, storage: SqliteResumableStorage): """Test setting and getting a dictionary value.""" test_dict = {"name": "John", "age": 30, "active": True} @@ -395,7 +403,7 @@ async def test_set_and_get_dict_value(self, storage: SQLiteResumableStorage): assert value == test_dict @pytest.mark.asyncio - async def test_set_and_get_pydantic_model(self, storage: SQLiteResumableStorage): + async def test_set_and_get_pydantic_model(self, storage: SqliteResumableStorage): """Test setting and getting a Pydantic model.""" model = SampleModel(name="test", value=42) @@ -405,7 +413,7 @@ async def test_set_and_get_pydantic_model(self, storage: SQLiteResumableStorage) assert value == model.model_dump() @pytest.mark.asyncio - async def test_set_and_get_none_value(self, storage: SQLiteResumableStorage): + async def test_set_and_get_none_value(self, storage: SqliteResumableStorage): """Test setting and getting None value.""" await storage.set_value("runtime-4", "namespace4", "key4", None) @@ -413,7 +421,7 @@ async def test_set_and_get_none_value(self, storage: SQLiteResumableStorage): assert value is None @pytest.mark.asyncio - async def test_set_value_invalid_type(self, storage: SQLiteResumableStorage): + async def test_set_value_invalid_type(self, storage: SqliteResumableStorage): """Test that setting invalid type raises TypeError.""" with pytest.raises( TypeError, match="Value must be str, dict, BaseModel or None" @@ -426,7 +434,7 @@ async def test_set_value_invalid_type(self, storage: SQLiteResumableStorage): await storage.set_value("runtime-5", "namespace5", "key5", [1, 2, 3]) @pytest.mark.asyncio - async def test_set_value_overwrites_existing(self, storage: SQLiteResumableStorage): + async def test_set_value_overwrites_existing(self, storage: SqliteResumableStorage): """Test that setting a value overwrites existing value.""" await storage.set_value("runtime-6", "namespace6", "key6", "first") await storage.set_value("runtime-6", "namespace6", "key6", "second") @@ -435,13 +443,13 @@ async def test_set_value_overwrites_existing(self, storage: SQLiteResumableStora assert value == "second" @pytest.mark.asyncio - async def test_get_value_nonexistent(self, storage: SQLiteResumableStorage): + async def test_get_value_nonexistent(self, storage: SqliteResumableStorage): """Test getting non-existent value returns None.""" value = await storage.get_value("nonexistent", "namespace", "key") assert value is None @pytest.mark.asyncio - async def test_values_isolated_by_runtime_id(self, storage: SQLiteResumableStorage): + async def test_values_isolated_by_runtime_id(self, storage: SqliteResumableStorage): """Test that values are isolated by runtime_id.""" await storage.set_value("runtime-a", "ns", "key", "value-a") await storage.set_value("runtime-b", "ns", "key", "value-b") @@ -453,7 +461,7 @@ async def test_values_isolated_by_runtime_id(self, storage: SQLiteResumableStora assert value_b == "value-b" @pytest.mark.asyncio - async def test_values_isolated_by_namespace(self, storage: SQLiteResumableStorage): + async def test_values_isolated_by_namespace(self, storage: SqliteResumableStorage): """Test that values are isolated by namespace.""" await storage.set_value("runtime-1", "ns-a", "key", "value-a") await storage.set_value("runtime-1", "ns-b", "key", "value-b") @@ -465,7 +473,7 @@ async def test_values_isolated_by_namespace(self, storage: SQLiteResumableStorag assert value_b == "value-b" @pytest.mark.asyncio - async def test_values_isolated_by_key(self, storage: SQLiteResumableStorage): + async def test_values_isolated_by_key(self, storage: SqliteResumableStorage): """Test that values are isolated by key.""" await storage.set_value("runtime-1", "ns", "key-a", "value-a") await storage.set_value("runtime-1", "ns", "key-b", "value-b") @@ -481,14 +489,20 @@ class TestSerializationMethods: """Test internal serialization/deserialization methods.""" @pytest.fixture - async def storage(self, tmp_path: Path): - """Create a storage instance.""" - db_path = tmp_path / "test.db" - storage = SQLiteResumableStorage(str(db_path)) - await storage.setup() - return storage - - def test_serialize_trigger_queue_type(self, storage: SQLiteResumableStorage): + async def storage(self): + """Create a SqliteResumableStorage instance with temporary database file.""" + temp_db = tempfile.NamedTemporaryFile(delete=False, suffix=".db") + temp_db.close() + + try: + async with SqliteResumableStorage(str(temp_db.name)) as storage: + await storage.setup() + yield storage + finally: + if os.path.exists(temp_db.name): + os.remove(temp_db.name) + + def test_serialize_trigger_queue_type(self, storage: SqliteResumableStorage): """Test serialization of queue type trigger.""" trigger = UiPathResumeTrigger( trigger_type=UiPathResumeTriggerType.QUEUE_ITEM, @@ -510,7 +524,7 @@ def test_serialize_trigger_queue_type(self, storage: SQLiteResumableStorage): assert serialized["interrupt_id"] == "interrupt-456" assert json.loads(serialized["payload"]) == {"test": "data"} - def test_serialize_trigger_api_type(self, storage: SQLiteResumableStorage): + def test_serialize_trigger_api_type(self, storage: SqliteResumableStorage): """Test serialization of API type trigger.""" trigger = UiPathResumeTrigger( trigger_type=UiPathResumeTriggerType.API, @@ -530,7 +544,7 @@ def test_serialize_trigger_api_type(self, storage: SQLiteResumableStorage): assert serialized["payload"] == "string payload" assert serialized["interrupt_id"] == "interrupt-123" - def test_deserialize_trigger_queue_type(self, storage: SQLiteResumableStorage): + def test_deserialize_trigger_queue_type(self, storage: SqliteResumableStorage): """Test deserialization of queue type trigger.""" trigger_data = { "type": UiPathResumeTriggerType.QUEUE_ITEM.value, @@ -550,7 +564,7 @@ def test_deserialize_trigger_queue_type(self, storage: SQLiteResumableStorage): assert trigger.folder_path == "/test" assert trigger.folder_key == "folder-123" - def test_deserialize_trigger_api_type(self, storage: SQLiteResumableStorage): + def test_deserialize_trigger_api_type(self, storage: SqliteResumableStorage): """Test deserialization of API type trigger.""" trigger_data = { "type": UiPathResumeTriggerType.API.value, @@ -566,43 +580,43 @@ def test_deserialize_trigger_api_type(self, storage: SQLiteResumableStorage): assert trigger.api_resume.inbox_id == "inbox-abc" assert trigger.api_resume.request == "request data" - def test_dump_value_string(self, storage: SQLiteResumableStorage): + def test_dump_value_string(self, storage: SqliteResumableStorage): """Test _dump_value with string.""" result = storage._dump_value("test string") assert result == "s:test string" - def test_dump_value_dict(self, storage: SQLiteResumableStorage): + def test_dump_value_dict(self, storage: SqliteResumableStorage): """Test _dump_value with dictionary.""" result = storage._dump_value({"key": "value"}) assert result == 'j:{"key": "value"}' - def test_dump_value_pydantic_model(self, storage: SQLiteResumableStorage): + def test_dump_value_pydantic_model(self, storage: SqliteResumableStorage): """Test _dump_value with Pydantic model.""" model = SampleModel(name="test", value=42) result = storage._dump_value(model) assert result == 'j:{"name": "test", "value": 42}' - def test_dump_value_none(self, storage: SQLiteResumableStorage): + def test_dump_value_none(self, storage: SqliteResumableStorage): """Test _dump_value with None.""" result = storage._dump_value(None) assert result is None - def test_load_value_string(self, storage: SQLiteResumableStorage): + def test_load_value_string(self, storage: SqliteResumableStorage): """Test _load_value with string.""" result = storage._load_value("s:test string") assert result == "test string" - def test_load_value_json(self, storage: SQLiteResumableStorage): + def test_load_value_json(self, storage: SqliteResumableStorage): """Test _load_value with JSON.""" result = storage._load_value('j:{"key": "value"}') assert result == {"key": "value"} - def test_load_value_none(self, storage: SQLiteResumableStorage): + def test_load_value_none(self, storage: SqliteResumableStorage): """Test _load_value with None.""" result = storage._load_value(None) assert result is None - def test_load_value_raw_string(self, storage: SQLiteResumableStorage): + def test_load_value_raw_string(self, storage: SqliteResumableStorage): """Test _load_value with raw string (no prefix).""" result = storage._load_value("raw string") assert result == "raw string" @@ -615,7 +629,7 @@ class TestErrorHandling: async def test_context_with_non_picklable_object(self, tmp_path: Path): """Test handling of non-picklable objects in context.""" db_path = tmp_path / "test.db" - storage = SQLiteResumableStorage(str(db_path)) + storage = SqliteResumableStorage(str(db_path)) await storage.setup() # Lambda functions are not picklable @@ -632,7 +646,7 @@ async def test_multiple_concurrent_operations(self, tmp_path: Path): import asyncio db_path = tmp_path / "test.db" - storage = SQLiteResumableStorage(str(db_path)) + storage = SqliteResumableStorage(str(db_path)) await storage.setup() async def save_trigger(runtime_id: str, key: str): @@ -674,7 +688,7 @@ class TestDatabaseSchema: async def test_runtime_kv_primary_key_constraint(self, tmp_path: Path): """Test that runtime_kv primary key constraint works.""" db_path = tmp_path / "test.db" - storage = SQLiteResumableStorage(str(db_path)) + storage = SqliteResumableStorage(str(db_path)) await storage.setup() # First insert @@ -690,7 +704,7 @@ async def test_runtime_kv_primary_key_constraint(self, tmp_path: Path): async def test_workflow_contexts_primary_key_constraint(self, tmp_path: Path): """Test that workflow_contexts primary key constraint works.""" db_path = tmp_path / "test.db" - storage = SQLiteResumableStorage(str(db_path)) + storage = SqliteResumableStorage(str(db_path)) await storage.setup() # First save @@ -706,7 +720,7 @@ async def test_workflow_contexts_primary_key_constraint(self, tmp_path: Path): async def test_resume_triggers_autoincrement(self, tmp_path: Path): """Test that resume_triggers id autoincrement works.""" db_path = tmp_path / "test.db" - storage = SQLiteResumableStorage(str(db_path)) + storage = SqliteResumableStorage(str(db_path)) await storage.setup() trigger1 = UiPathResumeTrigger(