
Commit 202b500

feat: Add MySQL and MariaDB support for SqlStorageClient (#1749)
### Description

- Add `MySQL` and `MariaDB` support for `SqlStorageClient`.

### Issues

- Related: #1405
1 parent aad233f commit 202b500

File tree

6 files changed: +173 -43 lines

docs/guides/storage_clients.mdx

Lines changed: 10 additions & 7 deletions

```diff
@@ -28,7 +28,7 @@ Crawlee provides three main storage client implementations:
 - <ApiLink to="class/FileSystemStorageClient">`FileSystemStorageClient`</ApiLink> - Provides persistent file system storage with in-memory caching.
 - <ApiLink to="class/MemoryStorageClient">`MemoryStorageClient`</ApiLink> - Stores data in memory with no persistence.
-- <ApiLink to="class/SqlStorageClient">`SqlStorageClient`</ApiLink> - Provides persistent storage using a SQL database ([SQLite](https://sqlite.org/) or [PostgreSQL](https://www.postgresql.org/)). Requires installing the extra dependency: `crawlee[sql_sqlite]` for SQLite or `crawlee[sql_postgres]` for PostgreSQL.
+- <ApiLink to="class/SqlStorageClient">`SqlStorageClient`</ApiLink> - Provides persistent storage using a SQL database ([SQLite](https://sqlite.org/), [PostgreSQL](https://www.postgresql.org/), [MySQL](https://www.mysql.com/) or [MariaDB](https://mariadb.org/)). Requires installing the extra dependency: `crawlee[sql_sqlite]` for SQLite, `crawlee[sql_postgres]` for PostgreSQL or `crawlee[sql_mysql]` for MySQL and MariaDB.
 - <ApiLink to="class/RedisStorageClient">`RedisStorageClient`</ApiLink> - Provides persistent storage using a [Redis](https://redis.io/) database v8.0+. Requires installing the extra dependency `crawlee[redis]`.
 - [`ApifyStorageClient`](https://docs.apify.com/sdk/python/reference/class/ApifyStorageClient) - Manages storage on the [Apify platform](https://apify.com), implemented in the [Apify SDK](https://github.com/apify/apify-sdk-python).
@@ -144,7 +144,7 @@ The `MemoryStorageClient` does not persist data between runs. All data is lost w
 The `SqlStorageClient` is experimental. Its API and behavior may change in future releases.
 :::
 
-The <ApiLink to="class/SqlStorageClient">`SqlStorageClient`</ApiLink> provides persistent storage using a SQL database (SQLite by default, or PostgreSQL). It supports all Crawlee storage types and enables concurrent access from multiple independent clients or processes.
+The <ApiLink to="class/SqlStorageClient">`SqlStorageClient`</ApiLink> provides persistent storage using a SQL database (SQLite by default, or PostgreSQL, MySQL, MariaDB). It supports all Crawlee storage types and enables concurrent access from multiple independent clients or processes.
 
 :::note dependencies
 The <ApiLink to="class/SqlStorageClient">`SqlStorageClient`</ApiLink> is not included in the core Crawlee package.
@@ -154,10 +154,12 @@ To use it, you need to install Crawlee with the appropriate extra dependency:
   <code>pip install 'crawlee[sql_sqlite]'</code>
 - For PostgreSQL support, run:
   <code>pip install 'crawlee[sql_postgres]'</code>
+- For MySQL or MariaDB support, run:
+  <code>pip install 'crawlee[sql_mysql]'</code>
 :::
 
 By default, <ApiLink to="class/SqlStorageClient">SqlStorageClient</ApiLink> uses SQLite.
-To use PostgreSQL instead, just provide a PostgreSQL connection string via the `connection_string` parameter. No other code changes are needed—the same client works for both databases.
+To use a different database, just provide the appropriate connection string via the `connection_string` parameter. No other code changes are needed—the same client works for all supported databases.
 
 <RunnableCodeBlock className="language-python" language="python">
 {SQLStorageClientBasicExample}
@@ -214,7 +216,6 @@ class dataset_metadata_buffer {
 + id (PK)
 + accessed_at
 + modified_at
-+ dataset_id (FK)
 + delta_item_count
 }
@@ -247,7 +248,6 @@ class key_value_store_metadata_buffer {
 + id (PK)
 + accessed_at
 + modified_at
-+ key_value_store_id (FK)
 }
 
 %% ========================
@@ -321,7 +321,6 @@ class request_queue_metadata_buffer {
 + id (PK)
 + accessed_at
 + modified_at
-+ request_queue_id (FK)
 + client_id
 + delta_handled_count
 + delta_pending_count
@@ -346,11 +345,15 @@ Configuration options for the <ApiLink to="class/SqlStorageClient">`SqlStorageCl
 
 Configuration options for the <ApiLink to="class/SqlStorageClient">`SqlStorageClient`</ApiLink> can be set via constructor arguments:
 
-- **`connection_string`** (default: SQLite in <ApiLink to="class/Configuration">`Configuration`</ApiLink> storage dir) - SQLAlchemy connection string, e.g. `sqlite+aiosqlite:///my.db` or `postgresql+asyncpg://user:pass@host/db`.
+- **`connection_string`** (default: SQLite in <ApiLink to="class/Configuration">`Configuration`</ApiLink> storage dir) - SQLAlchemy connection string, e.g. `sqlite+aiosqlite:///my.db`, `postgresql+asyncpg://user:pass@host/db`, `mysql+aiomysql://user:pass@host/db` or `mariadb+aiomysql://user:pass@host/db`.
 - **`engine`** - Pre-configured SQLAlchemy AsyncEngine (optional).
 
 For advanced scenarios, you can configure <ApiLink to="class/SqlStorageClient">`SqlStorageClient`</ApiLink> with a custom SQLAlchemy engine and additional options via the <ApiLink to="class/Configuration">`Configuration`</ApiLink> class. This is useful, for example, when connecting to an external PostgreSQL database or customizing connection pooling.
 
+:::warning
+If you use MySQL or MariaDB, pass the `isolation_level='READ COMMITTED'` argument to `create_async_engine`. MySQL/MariaDB default to the `REPEATABLE READ` isolation level, which can cause unnecessary locking, deadlocks, or stale reads when multiple Crawlee workers access the same tables concurrently. Using `READ COMMITTED` ensures more predictable row-level locking and visibility semantics for `SqlStorageClient`.
+:::
+
 <CodeBlock className="language-python" language="python">
 {SQLStorageClientConfigurationExample}
 </CodeBlock>
```
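The extras and connection-string formats the docs changes describe can be summarized in a small sketch. This mapping is illustrative; the user, password, host, and database names are placeholders, not real defaults:

```python
# Sketch of the install extras and SQLAlchemy URL formats the docs describe.
# Credentials and hosts below are placeholders.
BACKENDS: dict[str, tuple[str, str]] = {
    'sqlite': ('crawlee[sql_sqlite]', 'sqlite+aiosqlite:///my.db'),
    'postgresql': ('crawlee[sql_postgres]', 'postgresql+asyncpg://user:pass@host/db'),
    'mysql': ('crawlee[sql_mysql]', 'mysql+aiomysql://user:pass@host/db'),
    'mariadb': ('crawlee[sql_mysql]', 'mariadb+aiomysql://user:pass@host/db'),
}

def install_command(backend: str) -> str:
    """Return the pip command that pulls in the extra for the given backend."""
    extra, _ = BACKENDS[backend]
    return f"pip install '{extra}'"

for name in BACKENDS:
    print(name, '->', install_command(name))
```

Note that MySQL and MariaDB share the single `sql_mysql` extra, since both use the `aiomysql` driver.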

pyproject.toml

Lines changed: 5 additions & 0 deletions

```diff
@@ -81,6 +81,11 @@ sql_sqlite = [
     "sqlalchemy[asyncio]>=2.0.0,<3.0.0",
     "aiosqlite>=0.21.0",
 ]
+sql_mysql = [
+    "sqlalchemy[asyncio]>=2.0.0,<3.0.0",
+    "aiomysql>=0.3.2",
+    "cryptography>=46.0.5",
+]
 redis = ["redis[hiredis] >= 7.0.0"]
 
 [project.scripts]
```

src/crawlee/storage_clients/_sql/_client_mixin.py

Lines changed: 23 additions & 3 deletions

```diff
@@ -8,9 +8,10 @@
 
 from sqlalchemy import CursorResult, delete, select, text, update
 from sqlalchemy import func as sql_func
+from sqlalchemy.dialects.mysql import insert as mysql_insert
 from sqlalchemy.dialects.postgresql import insert as pg_insert
 from sqlalchemy.dialects.sqlite import insert as lite_insert
-from sqlalchemy.exc import SQLAlchemyError
+from sqlalchemy.exc import OperationalError, SQLAlchemyError
 
 from crawlee._utils.crypto import crypto_random_object_id
 
@@ -227,6 +228,9 @@ def _build_insert_stmt_with_ignore(
         if dialect == 'sqlite':
             return lite_insert(table_model).values(insert_values).on_conflict_do_nothing()
 
+        if dialect in {'mysql', 'mariadb'}:
+            return mysql_insert(table_model).values(insert_values).prefix_with('IGNORE')
+
         raise NotImplementedError(f'Insert with ignore not supported for dialect: {dialect}')
 
     def _build_upsert_stmt(
@@ -260,6 +264,11 @@ def _build_upsert_stmt(
             set_ = {col: getattr(lite_stmt.excluded, col) for col in update_columns}
             return lite_stmt.on_conflict_do_update(index_elements=conflict_cols, set_=set_)
 
+        if dialect in {'mysql', 'mariadb'}:
+            mysql_stmt = mysql_insert(table_model).values(insert_values)
+            set_ = {col: getattr(mysql_stmt.inserted, col) for col in update_columns}
+            return mysql_stmt.on_duplicate_key_update(**set_)
+
         raise NotImplementedError(f'Upsert not supported for dialect: {dialect}')
 
     async def _purge(self, metadata_kwargs: MetadataUpdateParams) -> None:
@@ -402,11 +411,12 @@ async def _try_acquire_buffer_lock(self, session: AsyncSession) -> bool:
         Returns:
             True if lock was acquired, False if already locked by another process.
         """
+        capture_error_code = 1020  # MariaDB error code for "Record has changed since last read"
         now = datetime.now(timezone.utc)
         lock_until = now + self._BLOCK_BUFFER_TIME
         dialect = self._storage_client.get_dialect_name()
 
-        if dialect == 'postgresql':
+        if dialect in {'postgresql', 'mysql', 'mariadb'}:
             select_stmt = (
                 select(self._METADATA_TABLE)
                 .where(
@@ -417,7 +427,17 @@ async def _try_acquire_buffer_lock(self, session: AsyncSession) -> bool:
                 )
                 .with_for_update(skip_locked=True)
             )
-            result = await session.execute(select_stmt)
+
+            try:
+                result = await session.execute(select_stmt)
+            except OperationalError as e:
+                # MariaDB raises error 1020 ("Record has changed since last read") instead of
+                # silently skipping locked rows like MySQL/PostgreSQL. Treat it as lock not acquired.
+                error_code = getattr(e.orig, 'args', [None])[0]
+                if error_code == capture_error_code:
+                    return False
+                raise
+
         metadata_row = result.scalar_one_or_none()
 
         if metadata_row is None:
```
src/crawlee/storage_clients/_sql/_request_queue_client.py

Lines changed: 25 additions & 25 deletions

```diff
@@ -335,33 +335,33 @@ async def add_batch_of_requests(
                 )
             )
 
-            if insert_values:
-                if forefront:
-                    # If the request already exists in the database, we update the sequence_number by shifting request
-                    # to the left.
-                    upsert_stmt = self._build_upsert_stmt(
-                        self._ITEM_TABLE,
-                        insert_values,
-                        update_columns=['sequence_number'],
-                        conflict_cols=['request_id', 'request_queue_id'],
-                    )
-                    result = await session.execute(upsert_stmt)
-                else:
-                    # If the request already exists in the database, we ignore this request when inserting.
-                    insert_stmt_with_ignore = self._build_insert_stmt_with_ignore(self._ITEM_TABLE, insert_values)
-                    result = await session.execute(insert_stmt_with_ignore)
+            try:
+                if insert_values:
+                    if forefront:
+                        # If the request already exists in the database, we update the sequence_number
+                        # by shifting request to the left.
+                        upsert_stmt = self._build_upsert_stmt(
+                            self._ITEM_TABLE,
+                            insert_values,
+                            update_columns=['sequence_number'],
+                            conflict_cols=['request_id', 'request_queue_id'],
+                        )
+                        result = await session.execute(upsert_stmt)
+                    else:
+                        # If the request already exists in the database, we ignore this request when inserting.
+                        insert_stmt_with_ignore = self._build_insert_stmt_with_ignore(self._ITEM_TABLE, insert_values)
+                        result = await session.execute(insert_stmt_with_ignore)
 
-                result = cast('CursorResult', result) if not isinstance(result, CursorResult) else result
-                approximate_new_request += result.rowcount
+                    result = cast('CursorResult', result) if not isinstance(result, CursorResult) else result
+                    approximate_new_request += result.rowcount
 
-            await self._add_buffer_record(
-                session,
-                update_modified_at=True,
-                delta_pending_request_count=approximate_new_request,
-                delta_total_request_count=approximate_new_request,
-            )
+                await self._add_buffer_record(
+                    session,
+                    update_modified_at=True,
+                    delta_pending_request_count=approximate_new_request,
+                    delta_total_request_count=approximate_new_request,
+                )
 
-            try:
                 await session.commit()
                 processed_requests.extend(transaction_processed_requests)
             except SQLAlchemyError as e:
@@ -433,7 +433,7 @@ async def fetch_next_request(self) -> Request | None:
 
         async with self.get_session(with_simple_commit=True) as session:
             # We use the `skip_locked` database mechanism to prevent the 'interception' of requests by another client
-            if dialect == 'postgresql':
+            if dialect in {'postgresql', 'mysql', 'mariadb'}:
                 stmt = stmt.with_for_update(skip_locked=True)
             result = await session.execute(stmt)
             requests_db = result.scalars().all()
```
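The `skip_locked` gating added in `fetch_next_request` reduces to a simple dialect check. A stdlib sketch (the helper name is ours, not part of Crawlee):

```python
# Dialects where the queue applies SELECT ... FOR UPDATE SKIP LOCKED so that
# concurrent workers do not fetch the same requests. SQLite is excluded: it
# has no row-level locks, so a plain SELECT is used there instead.
LOCKING_DIALECTS = frozenset({'postgresql', 'mysql', 'mariadb'})

def use_skip_locked(dialect: str) -> bool:
    """Sketch mirroring the `if dialect in {...}` branch in fetch_next_request."""
    return dialect in LOCKING_DIALECTS

print([d for d in ('sqlite', 'postgresql', 'mysql', 'mariadb') if use_skip_locked(d)])
```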

src/crawlee/storage_clients/_sql/_storage_client.py

Lines changed: 20 additions & 7 deletions

```diff
@@ -3,7 +3,7 @@
 import warnings
 from logging import getLogger
 from pathlib import Path
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Any, ClassVar
 
 from sqlalchemy.exc import IntegrityError, OperationalError
 from sqlalchemy.ext.asyncio import AsyncEngine, async_sessionmaker, create_async_engine
@@ -49,6 +49,8 @@ class SqlStorageClient(StorageClient):
     _DEFAULT_DB_NAME = 'crawlee.db'
     """Default database name if not specified in connection string."""
 
+    _SUPPORTED_DIALECTS: ClassVar[set[str]] = {'sqlite', 'postgresql', 'mysql', 'mariadb'}
+
     def __init__(
         self,
         *,
@@ -116,10 +118,10 @@ async def initialize(self, configuration: Configuration) -> None:
         async with engine.begin() as conn:
             self._dialect_name = engine.dialect.name
 
-            if self._dialect_name not in ('sqlite', 'postgresql'):
+            if self._dialect_name not in self._SUPPORTED_DIALECTS:
                 raise ValueError(
-                    f'Unsupported database dialect: {self._dialect_name}. Supported: sqlite, postgresql. '
-                    'Consider using a different database.',
+                    f'Unsupported database dialect: {self._dialect_name}. Supported: '
+                    f'{", ".join(self._SUPPORTED_DIALECTS)}. Consider using a different database.',
                 )
 
             # Create tables if they don't exist.
@@ -256,11 +258,21 @@ def _get_or_create_engine(self, configuration: Configuration) -> AsyncEngine:
             # Create connection string with path to default database
             connection_string = f'sqlite+aiosqlite:///{db_path}'
 
-        if 'sqlite' not in connection_string and 'postgresql' not in connection_string:
+        if not any(connection_string.startswith(dialect) for dialect in self._SUPPORTED_DIALECTS):
             raise ValueError(
-                'Unsupported database. Supported: sqlite, postgresql. Consider using a different database.'
+                f'Unsupported database. Supported: {", ".join(self._SUPPORTED_DIALECTS)}. Consider using a different '
+                'database.'
             )
 
+        kwargs: dict[str, Any] = {}
+        if 'mysql' in connection_string or 'mariadb' in connection_string:
+            connect_args: dict[str, Any] = {'connect_timeout': 30}
+            # MySQL/MariaDB require READ COMMITTED isolation level for correct behavior in concurrent environments
+            # without deadlocks.
+            kwargs['isolation_level'] = 'READ COMMITTED'
+        else:
+            connect_args = {'timeout': 30}
+
         self._engine = create_async_engine(
             connection_string,
             future=True,
@@ -270,6 +282,7 @@ def _get_or_create_engine(self, configuration: Configuration) -> AsyncEngine:
             pool_recycle=600,
             pool_pre_ping=True,
             echo=False,
-            connect_args={'timeout': 30},
+            connect_args=connect_args,
+            **kwargs,
         )
         return self._engine
```
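The new branch in `_get_or_create_engine` can be summarized as a pure function for illustration (the function name is ours; the real method also resolves the default SQLite path and builds the engine):

```python
from typing import Any

SUPPORTED_DIALECTS = {'sqlite', 'postgresql', 'mysql', 'mariadb'}

def engine_options(connection_string: str) -> tuple[dict[str, Any], dict[str, Any]]:
    """Return (connect_args, extra engine kwargs) for create_async_engine."""
    if not any(connection_string.startswith(dialect) for dialect in SUPPORTED_DIALECTS):
        raise ValueError(
            f'Unsupported database. Supported: {", ".join(sorted(SUPPORTED_DIALECTS))}.'
        )
    if 'mysql' in connection_string or 'mariadb' in connection_string:
        # aiomysql expects `connect_timeout`; READ COMMITTED avoids the locking
        # problems of MySQL/MariaDB's default REPEATABLE READ isolation level.
        return {'connect_timeout': 30}, {'isolation_level': 'READ COMMITTED'}
    # aiosqlite/asyncpg take a plain `timeout` connect argument here.
    return {'timeout': 30}, {}

print(engine_options('mariadb+aiomysql://user:pass@host/db'))
```

Keeping the dialect-specific arguments in one place means the single `create_async_engine` call works unchanged across all four backends.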
