From cdf52c5e41129b64844bbff78680c68c48237d89 Mon Sep 17 00:00:00 2001 From: Dimitri Yatsenko Date: Tue, 17 Feb 2026 12:07:04 -0600 Subject: [PATCH 1/2] fix: Atomic job reservation to prevent race condition (#1398) Replace the non-atomic SELECT-then-UPDATE pattern in Job.reserve() with a single atomic UPDATE that includes status='pending' in the WHERE clause. Check cursor.rowcount to determine if the reservation succeeded. This eliminates the race window where multiple workers could simultaneously reserve the same job. The previous implementation allowed concurrent workers to both read status='pending' and then both successfully UPDATE (since the WHERE matched only on primary key). Now only the first UPDATE succeeds; all others see rowcount=0 and return False. Also reduces three database round-trips to one. Co-Authored-By: Claude Opus 4.6 --- src/datajoint/jobs.py | 56 +++++++++++++++++++++---------------------- 1 file changed, 27 insertions(+), 29 deletions(-) diff --git a/src/datajoint/jobs.py b/src/datajoint/jobs.py index e5499eb8e..82839f9a3 100644 --- a/src/datajoint/jobs.py +++ b/src/datajoint/jobs.py @@ -13,7 +13,7 @@ import platform import subprocess -from .condition import AndList, Not +from .condition import AndList, Not, make_condition from .errors import DataJointError, DuplicateError from .heading import Heading from .table import Table @@ -431,8 +431,10 @@ def reserve(self, key: dict) -> bool: """ Attempt to reserve a pending job for processing. - Updates status to ``'reserved'`` if currently ``'pending'`` and - ``scheduled_time <= now``. + Atomically updates status to ``'reserved'`` if currently ``'pending'`` + and ``scheduled_time <= now``, using a single UPDATE with a WHERE clause + that includes the status check. This prevents race conditions where + multiple workers could reserve the same job simultaneously. Parameters ---------- @@ -444,33 +446,29 @@ def reserve(self, key: dict) -> bool: bool True if reservation successful, False if job not available. """ - # Check if job is pending and scheduled (use CURRENT_TIMESTAMP(3) for datetime(3) precision) - job = (self & key & "status='pending'" & "scheduled_time <= CURRENT_TIMESTAMP(3)").to_dicts() - - if not job: - return False - - # Get server time for reserved_time - server_now = self.connection.query("SELECT CURRENT_TIMESTAMP").fetchone()[0] - - # Build update row with primary key and new values pk = self._get_pk(key) - update_row = { - **pk, - "status": "reserved", - "reserved_time": server_now, - "host": platform.node(), - "pid": os.getpid(), - "connection_id": self.connection.connection_id, - "user": self.connection.get_user(), - "version": _get_job_version(), - } - - try: - self.update1(update_row) - return True - except Exception: - return False + where = make_condition(self, pk, set()) + qi = self.adapter.quote_identifier + assignments = ", ".join( + f"{qi(k)}=%s" + for k in ("status", "host", "pid", "connection_id", "user", "version") + ) + query = ( + f"UPDATE {self.full_table_name} " + f"SET {assignments}, {qi('reserved_time')}=CURRENT_TIMESTAMP(3) " + f"WHERE {where} AND {qi('status')}='pending' " + f"AND {qi('scheduled_time')} <= CURRENT_TIMESTAMP(3)" + ) + args = [ + "reserved", + platform.node(), + os.getpid(), + self.connection.connection_id, + self.connection.get_user(), + _get_job_version(), + ] + cursor = self.connection.query(query, args=args) + return cursor.rowcount == 1 def complete(self, key: dict, duration: float | None = None) -> None: """ From 2bd8b2644a4aa5da2ff4034d37df3701ba0ece9f Mon Sep 17 00:00:00 2001 From: Dimitri Yatsenko Date: Tue, 17 Feb 2026 12:31:21 -0600 Subject: [PATCH 2/2] style: Format to satisfy ruff-format Co-Authored-By: Claude Opus 4.6 --- src/datajoint/jobs.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/datajoint/jobs.py b/src/datajoint/jobs.py index 82839f9a3..5a0eb2a86 100644 --- a/src/datajoint/jobs.py +++ b/src/datajoint/jobs.py @@ -449,10 +449,7 @@ def reserve(self, key: dict) -> bool: pk = self._get_pk(key) where = make_condition(self, pk, set()) qi = self.adapter.quote_identifier - assignments = ", ".join( - f"{qi(k)}=%s" - for k in ("status", "host", "pid", "connection_id", "user", "version") - ) + assignments = ", ".join(f"{qi(k)}=%s" for k in ("status", "host", "pid", "connection_id", "user", "version")) query = ( f"UPDATE {self.full_table_name} " f"SET {assignments}, {qi('reserved_time')}=CURRENT_TIMESTAMP(3) "