Skip to content

Commit cdf52c5

Browse files
fix: Atomic job reservation to prevent race condition (#1398)
Replace the non-atomic SELECT-then-UPDATE pattern in Job.reserve() with a single atomic UPDATE whose WHERE clause includes status='pending' (and scheduled_time <= CURRENT_TIMESTAMP(3)). The method now checks cursor.rowcount to determine whether the reservation succeeded. This eliminates the race window in which multiple workers could reserve the same job simultaneously. In the previous implementation, two concurrent workers could both read status='pending' and then both UPDATE successfully, because the UPDATE's WHERE clause matched only on the primary key. Now only the first UPDATE succeeds; all others see rowcount=0 and return False. This also reduces three database round-trips (the pending-job SELECT, the SELECT CURRENT_TIMESTAMP, and the UPDATE) to one. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 3e75a16 commit cdf52c5

File tree

1 file changed

+27
-29
lines changed

1 file changed

+27
-29
lines changed

src/datajoint/jobs.py

Lines changed: 27 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
import platform
1414
import subprocess
1515

16-
from .condition import AndList, Not
16+
from .condition import AndList, Not, make_condition
1717
from .errors import DataJointError, DuplicateError
1818
from .heading import Heading
1919
from .table import Table
@@ -431,8 +431,10 @@ def reserve(self, key: dict) -> bool:
431431
"""
432432
Attempt to reserve a pending job for processing.
433433
434-
Updates status to ``'reserved'`` if currently ``'pending'`` and
435-
``scheduled_time <= now``.
434+
Atomically updates status to ``'reserved'`` if currently ``'pending'``
435+
and ``scheduled_time <= now``, using a single UPDATE with a WHERE clause
436+
that includes the status check. This prevents race conditions where
437+
multiple workers could reserve the same job simultaneously.
436438
437439
Parameters
438440
----------
@@ -444,33 +446,29 @@ def reserve(self, key: dict) -> bool:
444446
bool
445447
True if reservation successful, False if job not available.
446448
"""
447-
# Check if job is pending and scheduled (use CURRENT_TIMESTAMP(3) for datetime(3) precision)
448-
job = (self & key & "status='pending'" & "scheduled_time <= CURRENT_TIMESTAMP(3)").to_dicts()
449-
450-
if not job:
451-
return False
452-
453-
# Get server time for reserved_time
454-
server_now = self.connection.query("SELECT CURRENT_TIMESTAMP").fetchone()[0]
455-
456-
# Build update row with primary key and new values
457449
pk = self._get_pk(key)
458-
update_row = {
459-
**pk,
460-
"status": "reserved",
461-
"reserved_time": server_now,
462-
"host": platform.node(),
463-
"pid": os.getpid(),
464-
"connection_id": self.connection.connection_id,
465-
"user": self.connection.get_user(),
466-
"version": _get_job_version(),
467-
}
468-
469-
try:
470-
self.update1(update_row)
471-
return True
472-
except Exception:
473-
return False
450+
where = make_condition(self, pk, set())
451+
qi = self.adapter.quote_identifier
452+
assignments = ", ".join(
453+
f"{qi(k)}=%s"
454+
for k in ("status", "host", "pid", "connection_id", "user", "version")
455+
)
456+
query = (
457+
f"UPDATE {self.full_table_name} "
458+
f"SET {assignments}, {qi('reserved_time')}=CURRENT_TIMESTAMP(3) "
459+
f"WHERE {where} AND {qi('status')}='pending' "
460+
f"AND {qi('scheduled_time')} <= CURRENT_TIMESTAMP(3)"
461+
)
462+
args = [
463+
"reserved",
464+
platform.node(),
465+
os.getpid(),
466+
self.connection.connection_id,
467+
self.connection.get_user(),
468+
_get_job_version(),
469+
]
470+
cursor = self.connection.query(query, args=args)
471+
return cursor.rowcount == 1
474472

475473
def complete(self, key: dict, duration: float | None = None) -> None:
476474
"""

0 commit comments

Comments
 (0)