diff --git a/tasktiger/_internal.py b/tasktiger/_internal.py index 6f31d239..7b00b575 100644 --- a/tasktiger/_internal.py +++ b/tasktiger/_internal.py @@ -27,6 +27,11 @@ from .task import Task from .tasktiger import TaskTiger +# Constants pertaining to Redis keys +TASK = "task" +EXECUTIONS = "executions" +EXECUTIONS_COUNT = "executions_count" + # Task states (represented by different queues) # Note some client code may rely on the string values (e.g. get_queue_stats). QUEUED = "queued" diff --git a/tasktiger/migrations.py b/tasktiger/migrations.py index 1b3a0a5b..68bbb2c7 100644 --- a/tasktiger/migrations.py +++ b/tasktiger/migrations.py @@ -1,5 +1,6 @@ from typing import TYPE_CHECKING +from ._internal import EXECUTIONS, TASK from .utils import redis_glob_escape if TYPE_CHECKING: @@ -22,7 +23,8 @@ def migrate_executions_count(tiger: "TaskTiger") -> None: ) match = ( - redis_glob_escape(tiger.config["REDIS_PREFIX"]) + ":task:*:executions" + redis_glob_escape(tiger.config["REDIS_PREFIX"]) + + f":{TASK}:*:{EXECUTIONS}" ) for key in tiger.connection.scan_iter(count=100, match=match): diff --git a/tasktiger/redis_scripts.py b/tasktiger/redis_scripts.py index 29e4c541..127e1ef7 100644 --- a/tasktiger/redis_scripts.py +++ b/tasktiger/redis_scripts.py @@ -3,7 +3,15 @@ from redis import Redis -from ._internal import ACTIVE, ERROR, QUEUED, SCHEDULED +from ._internal import ( + ACTIVE, + ERROR, + EXECUTIONS, + EXECUTIONS_COUNT, + QUEUED, + SCHEDULED, + TASK, +) try: from redis.commands.core import Script @@ -595,9 +603,9 @@ def _bool_to_str(v: bool) -> str: def _none_to_empty_str(v: Optional[str]) -> str: return v or "" - key_task_id = key_func("task", id) - key_task_id_executions = key_func("task", id, "executions") - key_task_id_executions_count = key_func("task", id, "executions_count") + key_task_id = key_func(TASK, id) + key_task_id_executions = key_func(TASK, id, EXECUTIONS) + key_task_id_executions_count = key_func(TASK, id, EXECUTIONS_COUNT) key_from_state = key_func(from_state) key_to_state = key_func(to_state) if to_state else "" key_active_queue = key_func(ACTIVE, queue) diff --git a/tasktiger/task.py b/tasktiger/task.py index db034a6a..0e08fc8f 100644 --- a/tasktiger/task.py +++ b/tasktiger/task.py @@ -20,8 +20,11 @@ from ._internal import ( ERROR, + EXECUTIONS, + EXECUTIONS_COUNT, QUEUED, SCHEDULED, + TASK, g, gen_id, gen_unique_id, @@ -393,7 +396,7 @@ def delay( pipeline = tiger.connection.pipeline() pipeline.sadd(tiger._key(state), self.queue) - pipeline.set(tiger._key("task", self.id), serialized_task) + pipeline.set(tiger._key(TASK, self.id), serialized_task) # In case of unique tasks, don't update the score. tiger.scripts.zadd( tiger._key(state, self.queue), @@ -454,11 +457,11 @@ def from_id( latest). If the task doesn't exist, None is returned. """ pipeline = tiger.connection.pipeline() - pipeline.get(tiger._key("task", task_id)) + pipeline.get(tiger._key(TASK, task_id)) pipeline.zscore(tiger._key(state, queue), task_id) if load_executions: pipeline.lrange( - tiger._key("task", task_id, "executions"), -load_executions, -1 + tiger._key(TASK, task_id, EXECUTIONS), -load_executions, -1 ) ( serialized_data, @@ -526,10 +529,10 @@ def tasks_from_queue( ] if load_executions: pipeline = tiger.connection.pipeline() - pipeline.mget([tiger._key("task", item[0]) for item in items]) + pipeline.mget([tiger._key(TASK, item[0]) for item in items]) for item in items: pipeline.lrange( - tiger._key("task", item[0], "executions"), + tiger._key(TASK, item[0], EXECUTIONS), -load_executions, -1, ) @@ -586,8 +589,8 @@ def n_executions(self) -> int: Queries and returns the number of past task executions. """ pipeline = self.tiger.connection.pipeline() - pipeline.exists(self.tiger._key("task", self.id)) - pipeline.get(self.tiger._key("task", self.id, "executions_count")) + pipeline.exists(self.tiger._key(TASK, self.id)) + pipeline.get(self.tiger._key(TASK, self.id, EXECUTIONS_COUNT)) exists, executions_count = pipeline.execute() if not exists: diff --git a/tasktiger/worker.py b/tasktiger/worker.py index 63fa8fd7..9f944e49 100644 --- a/tasktiger/worker.py +++ b/tasktiger/worker.py @@ -36,8 +36,11 @@ from ._internal import ( ACTIVE, ERROR, + EXECUTIONS, + EXECUTIONS_COUNT, QUEUED, SCHEDULED, + TASK, dotted_parts, g, g_fork_lock, @@ -345,7 +348,7 @@ def _worker_queue_expired_tasks(self) -> None: self.config["REQUEUE_EXPIRED_TASKS_BATCH_SIZE"], ) - for (queue, task_id) in task_data: + for queue, task_id in task_data: self.log.debug("expiring task", queue=queue, task_id=task_id) self._did_work = True try: @@ -374,7 +377,7 @@ def _worker_queue_expired_tasks(self) -> None: # have a task without a task object. # XXX: Ideally, the following block should be atomic. - if not self.connection.get(self._key("task", task_id)): + if not self.connection.get(self._key(TASK, task_id)): self.log.error("not found", queue=queue, task_id=task_id) task = Task( self.tiger, @@ -812,7 +815,7 @@ def _process_queue_tasks( # Get all tasks serialized_tasks = self.connection.mget( - [self._key("task", task_id) for task_id in task_ids] + [self._key(TASK, task_id) for task_id in task_ids] ) # Parse tasks @@ -1053,7 +1056,7 @@ def _mark_done() -> None: should_log_error = True # Get execution info (for logging and retry purposes) execution = self.connection.lindex( - self._key("task", task.id, "executions"), -1 + self._key(TASK, task.id, EXECUTIONS), -1 ) if execution: @@ -1242,10 +1245,8 @@ def _store_task_execution( serialized_execution = json.dumps(execution) for task in tasks: - executions_key = self._key("task", task.id, "executions") - executions_count_key = self._key( - "task", task.id, "executions_count" - ) + executions_key = self._key(TASK, task.id, EXECUTIONS) + executions_count_key = self._key(TASK, task.id, EXECUTIONS_COUNT) pipeline = self.connection.pipeline() pipeline.incr(executions_count_key)