Skip to content

Commit c4276ac

Browse files
committed
refactor: replace NOW() with CURRENT_TIMESTAMP for portability
CURRENT_TIMESTAMP is standard SQL that works on both MySQL and PostgreSQL. This is a step toward multi-backend support. Files changed: - jobs.py: all NOW() calls replaced - autopopulate.py: pending job queries updated Note: INTERVAL syntax still differs between backends and will be handled by the database adapter.
1 parent d071fb9 commit c4276ac

File tree

2 files changed

+14
-14
lines changed

2 files changed

+14
-14
lines changed

src/datajoint/autopopulate.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -486,11 +486,11 @@ def handler(signum, frame):
486486
refresh = config.jobs.auto_refresh
487487
if refresh:
488488
# Use delay=-1 to ensure jobs are immediately schedulable
489-
# (avoids race condition with scheduled_time <= NOW(3) check)
489+
# (avoids race condition with scheduled_time <= CURRENT_TIMESTAMP(3) check)
490490
self.jobs.refresh(*restrictions, priority=priority, delay=-1)
491491

492-
# Fetch pending jobs ordered by priority (use NOW(3) to match CURRENT_TIMESTAMP(3) precision)
493-
pending_query = self.jobs.pending & "scheduled_time <= NOW(3)"
492+
# Fetch pending jobs ordered by priority
493+
pending_query = self.jobs.pending & "scheduled_time <= CURRENT_TIMESTAMP(3)"
494494
if priority is not None:
495495
pending_query = pending_query & f"priority <= {priority}"
496496

src/datajoint/jobs.py

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -375,8 +375,8 @@ def refresh(
375375
new_key_list = new_keys.keys()
376376

377377
if new_key_list:
378-
# Always use MySQL server time for scheduling (NOW(3) matches datetime(3) precision)
379-
scheduled_time = self.connection.query(f"SELECT NOW(3) + INTERVAL {delay} SECOND").fetchone()[0]
378+
# Always use server time for scheduling
379+
scheduled_time = self.connection.query(f"SELECT CURRENT_TIMESTAMP(3) + INTERVAL {delay} SECOND").fetchone()[0]
380380

381381
for key in new_key_list:
382382
job_entry = {
@@ -402,19 +402,19 @@ def refresh(
402402
self.insert1({**key, "status": "pending", "priority": priority})
403403
result["re_pended"] += 1
404404

405-
# 3. Remove stale jobs (not ignore status) - use MySQL NOW() for consistent timing
405+
# 3. Remove stale jobs (not ignore status)
406406
if stale_timeout > 0:
407-
old_jobs = self & f"created_time < NOW() - INTERVAL {stale_timeout} SECOND" & 'status != "ignore"'
407+
old_jobs = self & f"created_time < CURRENT_TIMESTAMP - INTERVAL {stale_timeout} SECOND" & 'status != "ignore"'
408408

409409
for key in old_jobs.keys():
410410
# Check if key still in key_source
411411
if not (key_source & key):
412412
(self & key).delete_quick()
413413
result["removed"] += 1
414414

415-
# 4. Handle orphaned reserved jobs - use MySQL NOW() for consistent timing
415+
# 4. Handle orphaned reserved jobs
416416
if orphan_timeout is not None and orphan_timeout > 0:
417-
orphaned_jobs = self.reserved & f"reserved_time < NOW() - INTERVAL {orphan_timeout} SECOND"
417+
orphaned_jobs = self.reserved & f"reserved_time < CURRENT_TIMESTAMP - INTERVAL {orphan_timeout} SECOND"
418418

419419
for key in orphaned_jobs.keys():
420420
(self & key).delete_quick()
@@ -440,14 +440,14 @@ def reserve(self, key: dict) -> bool:
440440
bool
441441
True if reservation successful, False if job not available.
442442
"""
443-
# Check if job is pending and scheduled (use NOW(3) to match CURRENT_TIMESTAMP(3) precision)
444-
job = (self & key & 'status="pending"' & "scheduled_time <= NOW(3)").to_dicts()
443+
# Check if job is pending and scheduled
444+
job = (self & key & 'status="pending"' & "scheduled_time <= CURRENT_TIMESTAMP(3)").to_dicts()
445445

446446
if not job:
447447
return False
448448

449449
# Get MySQL server time for reserved_time
450-
server_now = self.connection.query("SELECT NOW()").fetchone()[0]
450+
server_now = self.connection.query("SELECT CURRENT_TIMESTAMP").fetchone()[0]
451451

452452
# Build update row with primary key and new values
453453
pk = self._get_pk(key)
@@ -490,7 +490,7 @@ def complete(self, key: dict, duration: float | None = None) -> None:
490490

491491
if config.jobs.keep_completed:
492492
# Use MySQL server time for completed_time
493-
server_now = self.connection.query("SELECT NOW()").fetchone()[0]
493+
server_now = self.connection.query("SELECT CURRENT_TIMESTAMP").fetchone()[0]
494494
pk = self._get_pk(key)
495495
update_row = {
496496
**pk,
@@ -520,7 +520,7 @@ def error(self, key: dict, error_message: str, error_stack: str | None = None) -
520520
error_message = error_message[: ERROR_MESSAGE_LENGTH - len(TRUNCATION_APPENDIX)] + TRUNCATION_APPENDIX
521521

522522
# Use MySQL server time for completed_time
523-
server_now = self.connection.query("SELECT NOW()").fetchone()[0]
523+
server_now = self.connection.query("SELECT CURRENT_TIMESTAMP").fetchone()[0]
524524

525525
pk = self._get_pk(key)
526526
update_row = {

0 commit comments

Comments
 (0)