From b9a5e87dbd91adb05096654609e3aeb065c23b01 Mon Sep 17 00:00:00 2001 From: Vishesh Bansal Date: Sun, 12 Nov 2023 00:19:59 +0530 Subject: [PATCH 1/3] Refactor Redis keys to avoid duplicate string literals --- tasktiger/constants.py | 11 +++++ tasktiger/migrations.py | 4 +- tests/test_base.py | 69 +++++++++++++++++++--------- tests/test_logging.py | 3 +- tests/test_migrations.py | 90 +++++++++++++++++++++++++++++-------- tests/test_redis_scripts.py | 22 ++++----- 6 files changed, 146 insertions(+), 53 deletions(-) create mode 100644 tasktiger/constants.py diff --git a/tasktiger/constants.py b/tasktiger/constants.py new file mode 100644 index 00000000..6baf92dc --- /dev/null +++ b/tasktiger/constants.py @@ -0,0 +1,11 @@ +# constants pertaining to Redis keys +REDIS_PREFIX = "t" +TASK = "task" +EXECUTIONS = "executions" +EXECUTIONS_COUNT = "executions_count" + + +QUEUED = "queued" +ACTIVE = "active" +SCHEDULED = "scheduled" +ERROR = "error" diff --git a/tasktiger/migrations.py b/tasktiger/migrations.py index 1b3a0a5b..3a80a2a8 100644 --- a/tasktiger/migrations.py +++ b/tasktiger/migrations.py @@ -1,5 +1,6 @@ from typing import TYPE_CHECKING +from .constants import EXECUTIONS, TASK from .utils import redis_glob_escape if TYPE_CHECKING: @@ -22,7 +23,8 @@ def migrate_executions_count(tiger: "TaskTiger") -> None: ) match = ( - redis_glob_escape(tiger.config["REDIS_PREFIX"]) + ":task:*:executions" + redis_glob_escape(tiger.config["REDIS_PREFIX"]) + + f":{TASK}:*:{EXECUTIONS}" ) for key in tiger.connection.scan_iter(count=100, match=match): diff --git a/tests/test_base.py b/tests/test_base.py index c8774999..4c5e32c8 100644 --- a/tests/test_base.py +++ b/tests/test_base.py @@ -21,6 +21,16 @@ linear, ) from tasktiger._internal import serialize_func_name +from tasktiger.constants import ( + ACTIVE, + ERROR, + EXECUTIONS, + EXECUTIONS_COUNT, + QUEUED, + REDIS_PREFIX, + SCHEDULED, + TASK, +) from .config import DELAY from .tasks import ( @@ -69,7 +79,6 @@ def teardown_method(self, method): def _ensure_queues( self, queued=None, active=None, error=None, scheduled=None ): - expected_queues = { "queued": {name for name, n in (queued or {}).items() if n}, "active": {name for name, n in (active or {}).items() if n}, @@ -89,7 +98,9 @@ def _ensure_queue(typ, data): task_ids = self.conn.zrange("t:%s:%s" % (typ, name), 0, -1) assert len(task_ids) == n ret[name] = [ - json.loads(self.conn.get("t:task:%s" % task_id)) + json.loads( + self.conn.get(f"{REDIS_PREFIX}:{TASK}:%s" % task_id) + ) for task_id in task_ids ] assert [task["id"] for task in ret[name]] == task_ids @@ -127,7 +138,7 @@ def test_simple_task(self): Worker(self.tiger).run(once=True) self._ensure_queues(queued={"default": 0}) - assert not self.conn.exists("t:task:%s" % task["id"]) + assert not self.conn.exists(f"{REDIS_PREFIX}:{TASK}:%s" % task["id"]) @pytest.mark.skipif( sys.version_info < (3, 3), reason="__qualname__ unavailable" @@ -140,7 +151,7 @@ def test_staticmethod_task(self): Worker(self.tiger).run(once=True) self._ensure_queues(queued={"default": 0}) - assert not self.conn.exists("t:task:%s" % task["id"]) + assert not self.conn.exists(f"{REDIS_PREFIX}:{TASK}:%s" % task["id"]) def test_task_delay(self): decorated_task.delay(1, 2, a=3, b=4) @@ -258,7 +269,7 @@ def test_exception_task(self, store_tracebacks): assert task["func"] == "tests.tasks:exception_task" executions = self.conn.lrange( - "t:task:%s:executions" % task["id"], 0, -1 + f"{REDIS_PREFIX}:{TASK}:%s:{EXECUTIONS}" % task["id"], 0, -1 ) assert len(executions) == 1 execution = json.loads(executions[0]) @@ -274,7 +285,9 @@ def test_exception_task(self, store_tracebacks): @pytest.mark.parametrize("max_stored_executions", [2, 3, 6, 11, None]) def test_max_stored_executions(self, max_stored_executions): def _get_stored_executions(): - return self.conn.llen(f"t:task:{task.id}:executions") + return self.conn.llen( + f"{REDIS_PREFIX}:{TASK}:{task.id}:{EXECUTIONS}" + ) task = self.tiger.delay( exception_task, @@ -309,7 +322,7 @@ def test_long_task_killed(self): assert task["func"] == "tests.tasks:long_task_killed" executions = self.conn.lrange( - "t:task:%s:executions" % task["id"], 0, -1 + f"{REDIS_PREFIX}:{TASK}:%s:{EXECUTIONS}" % task["id"], 0, -1 ) assert len(executions) == 1 execution = json.loads(executions[0]) @@ -664,9 +677,17 @@ def test_retry_executions_count(self, count): Worker(self.tiger).run(once=True) assert ( - int(self.conn.get(f"t:task:{task.id}:executions_count")) == count + int( + self.conn.get( + f"{REDIS_PREFIX}:{TASK}:{task.id}:{EXECUTIONS_COUNT}" + ) + ) + == count + ) + assert ( + self.conn.llen(f"{REDIS_PREFIX}:{TASK}:{task.id}:{EXECUTIONS}") + == count ) - assert self.conn.llen(f"t:task:{task.id}:executions") == count def test_batch_1(self): self.tiger.delay(batch_task, args=[1]) @@ -1059,11 +1080,15 @@ def test_update_scheduled_time(self): task = Task(self.tiger, simple_task, unique=True) task.delay(when=datetime.timedelta(minutes=5)) self._ensure_queues(scheduled={"default": 1}) - old_score = self.conn.zscore("t:scheduled:default", task.id) + old_score = self.conn.zscore( + f"{REDIS_PREFIX}:{SCHEDULED}:default", task.id + ) task.update_scheduled_time(when=datetime.timedelta(minutes=6)) self._ensure_queues(scheduled={"default": 1}) - new_score = self.conn.zscore("t:scheduled:default", task.id) + new_score = self.conn.zscore( + f"{REDIS_PREFIX}:{SCHEDULED}:default", task.id + ) # The difference can be slightly over 60 due to processing time, but # shouldn't be much higher. @@ -1331,16 +1356,16 @@ def test_task_disappears(self): time.sleep(DELAY) # Remove the task object while the task is processing. - assert self.conn.delete("t:task:{}".format(task.id)) == 1 + assert self.conn.delete(f"{REDIS_PREFIX}:{TASK}:{task.id}") == 1 # Kill the worker while it's still processing the task. os.kill(worker.pid, signal.SIGKILL) # _ensure_queues() breaks here because it can't find the task - assert self.conn.scard("t:queued") == 0 - assert self.conn.scard("t:active") == 1 - assert self.conn.scard("t:error") == 0 - assert self.conn.scard("t:scheduled") == 0 + assert self.conn.scard(f"{REDIS_PREFIX}:{QUEUED}") == 0 + assert self.conn.scard(f"{REDIS_PREFIX}:{ACTIVE}") == 1 + assert self.conn.scard(f"{REDIS_PREFIX}:{ERROR}") == 0 + assert self.conn.scard(f"{REDIS_PREFIX}:{SCHEDULED}") == 0 # Capture logger errors = [] @@ -1356,10 +1381,10 @@ def fake_error(msg): Worker(self.tiger).run(once=True) assert len(errors) == 0 - assert self.conn.scard("t:queued") == 0 - assert self.conn.scard("t:active") == 1 - assert self.conn.scard("t:error") == 0 - assert self.conn.scard("t:scheduled") == 0 + assert self.conn.scard(f"{REDIS_PREFIX}:{QUEUED}") == 0 + assert self.conn.scard(f"{REDIS_PREFIX}:{ACTIVE}") == 1 + assert self.conn.scard(f"{REDIS_PREFIX}:{ERROR}") == 0 + assert self.conn.scard(f"{REDIS_PREFIX}:{SCHEDULED}") == 0 # After waiting and re-running the worker, queues will clear. time.sleep(2 * DELAY) @@ -1407,7 +1432,7 @@ def test_child_hanging_forever(self): assert task["func"] == "tests.tasks:sleep_task" executions = self.conn.lrange( - "t:task:%s:executions" % task["id"], 0, -1 + f"{REDIS_PREFIX}:{TASK}:%s:{EXECUTIONS}" % task["id"], 0, -1 ) assert len(executions) == 1 execution = json.loads(executions[0]) @@ -1464,7 +1489,7 @@ def test_decorated_child_hard_timeout_precedence(self): assert task["func"] == "tests.tasks:decorated_task_sleep_timeout" executions = self.conn.lrange( - "t:task:%s:executions" % task["id"], 0, -1 + f"{REDIS_PREFIX}:{TASK}:%s:{EXECUTIONS}" % task["id"], 0, -1 ) assert len(executions) == 1 execution = json.loads(executions[0]) diff --git a/tests/test_logging.py b/tests/test_logging.py index d74ae914..67c44549 100644 --- a/tests/test_logging.py +++ b/tests/test_logging.py @@ -4,6 +4,7 @@ import structlog from tasktiger import TaskTiger, Worker +from tasktiger.constants import REDIS_PREFIX, TASK from tasktiger.logging import tasktiger_processor from .test_base import BaseTestCase @@ -55,7 +56,7 @@ def test_structlog_processor(self): Worker(self.tiger).run(once=True) self._ensure_queues(queued={"foo_qux": 0}) - assert not self.conn.exists("t:task:%s" % task["id"]) + assert not self.conn.exists(f"{REDIS_PREFIX}:{TASK}:%s" % task["id"]) class TestSetupStructlog(BaseTestCase): diff --git a/tests/test_migrations.py b/tests/test_migrations.py index ce44f287..fb17366b 100644 --- a/tests/test_migrations.py +++ b/tests/test_migrations.py @@ -2,6 +2,12 @@ import pytest +from tasktiger.constants import ( + EXECUTIONS, + EXECUTIONS_COUNT, + REDIS_PREFIX, + TASK, +) from tasktiger.migrations import migrate_executions_count from .test_base import BaseTestCase @@ -15,11 +21,11 @@ def test_migrate_nothing(self): @pytest.mark.parametrize( "key", [ - f"foot:task:{uuid.uuid4()}:executions", - f"foo:t:task:{uuid.uuid4()}:executions", - f"t:task:{uuid.uuid4()}:executionsfoo", - f"t:task:{uuid.uuid4()}:executions:foo", - f"t:task:{uuid.uuid4()}", + f"foot:{TASK}:{uuid.uuid4()}:{EXECUTIONS}", + f"foo:{REDIS_PREFIX}:{TASK}:{uuid.uuid4()}:{EXECUTIONS}", + f"{REDIS_PREFIX}:{TASK}:{uuid.uuid4()}:executionsfoo", + f"{REDIS_PREFIX}:{TASK}:{uuid.uuid4()}:{EXECUTIONS}:foo", + f"{REDIS_PREFIX}:{TASK}:{uuid.uuid4()}", ], ) def test_migrate_ignores_irrelevant_keys(self, key): @@ -33,47 +39,93 @@ def test_migrate(self): task_id_2 = uuid.uuid4() for __ in range(73): - self.conn.rpush(f"t:task:{task_id_1}:executions", "{}") + self.conn.rpush( + f"{REDIS_PREFIX}:{TASK}:{task_id_1}:{EXECUTIONS}", "{}" + ) for __ in range(35): - self.conn.rpush(f"t:task:{task_id_2}:executions", "{}") + self.conn.rpush( + f"{REDIS_PREFIX}:{TASK}:{task_id_2}:{EXECUTIONS}", "{}" + ) migrate_executions_count(self.tiger) - assert self.conn.get(f"t:task:{task_id_1}:executions_count") == "73" - assert self.conn.get(f"t:task:{task_id_2}:executions_count") == "35" + assert ( + self.conn.get( + f"{REDIS_PREFIX}:{TASK}:{task_id_1}:{EXECUTIONS_COUNT}" + ) + == "73" + ) + assert ( + self.conn.get( + f"{REDIS_PREFIX}:{TASK}:{task_id_2}:{EXECUTIONS_COUNT}" + ) + == "35" + ) def test_migrate_when_some_tasks_already_migrated(self): task_id_1 = uuid.uuid4() task_id_2 = uuid.uuid4() for __ in range(73): - self.conn.rpush(f"t:task:{task_id_1}:executions", "{}") + self.conn.rpush( + f"{REDIS_PREFIX}:{TASK}:{task_id_1}:{EXECUTIONS}", "{}" + ) - self.conn.set(f"t:task:{task_id_1}:executions_count", 91) + self.conn.set( + f"{REDIS_PREFIX}:{TASK}:{task_id_1}:{EXECUTIONS_COUNT}", 91 + ) for __ in range(35): - self.conn.rpush(f"t:task:{task_id_2}:executions", "{}") + self.conn.rpush( + f"{REDIS_PREFIX}:{TASK}:{task_id_2}:{EXECUTIONS}", "{}" + ) migrate_executions_count(self.tiger) - assert self.conn.get(f"t:task:{task_id_2}:executions_count") == "35" + assert ( + self.conn.get( + f"{REDIS_PREFIX}:{TASK}:{task_id_2}:{EXECUTIONS_COUNT}" + ) + == "35" + ) # looks migrated already - left untouched - assert self.conn.get(f"t:task:{task_id_1}:executions_count") == "91" + assert ( + self.conn.get( + f"{REDIS_PREFIX}:{TASK}:{task_id_1}:{EXECUTIONS_COUNT}" + ) + == "91" + ) def test_migrate_when_counter_is_behind(self): task_id_1 = uuid.uuid4() task_id_2 = uuid.uuid4() for __ in range(73): - self.conn.rpush(f"t:task:{task_id_1}:executions", "{}") + self.conn.rpush( + f"{REDIS_PREFIX}:{TASK}:{task_id_1}:{EXECUTIONS}", "{}" + ) - self.conn.set(f"t:task:{task_id_1}:executions_count", 10) + self.conn.set( + f"{REDIS_PREFIX}:{TASK}:{task_id_1}:{EXECUTIONS_COUNT}", 10 + ) for __ in range(35): - self.conn.rpush(f"t:task:{task_id_2}:executions", "{}") + self.conn.rpush( + f"{REDIS_PREFIX}:{TASK}:{task_id_2}:{EXECUTIONS}", "{}" + ) migrate_executions_count(self.tiger) - assert self.conn.get(f"t:task:{task_id_2}:executions_count") == "35" + assert ( + self.conn.get( + f"{REDIS_PREFIX}:{TASK}:{task_id_2}:{EXECUTIONS_COUNT}" + ) + == "35" + ) # updated because the counter value was less than the actual count - assert self.conn.get(f"t:task:{task_id_1}:executions_count") == "73" + assert ( + self.conn.get( + f"{REDIS_PREFIX}:{TASK}:{task_id_1}:{EXECUTIONS_COUNT}" + ) + == "73" + ) diff --git a/tests/test_redis_scripts.py b/tests/test_redis_scripts.py index 00879362..34af65a3 100644 --- a/tests/test_redis_scripts.py +++ b/tests/test_redis_scripts.py @@ -1,3 +1,5 @@ +from tasktiger.constants import ACTIVE, REDIS_PREFIX + from .utils import get_tiger @@ -125,7 +127,7 @@ def test_zpoppush_on_success_1(self, **kwargs): score=None, new_score=10, on_success=("update_sets", "val", "remove_set", "add_set"), - **kwargs + **kwargs, ) assert result == ["a", "b"] @@ -151,7 +153,7 @@ def test_zpoppush_on_success_2(self, **kwargs): score=0, new_score=10, on_success=("update_sets", "val", "remove_set", "add_set"), - **kwargs + **kwargs, ) assert result == [] @@ -177,7 +179,7 @@ def test_zpoppush_on_success_3(self, **kwargs): score=None, new_score=10, on_success=("update_sets", "val", "remove_set", "add_set"), - **kwargs + **kwargs, ) assert result == ["a", "b", "c", "d"] @@ -321,13 +323,13 @@ def test_delete_if_not_in_zsets_3(self): assert self.conn.exists("bar") == 0 def test_get_expired_tasks(self): - self.conn.sadd("t:active", "q1", "q2", "q3", "q4") - self.conn.zadd("t:active:q1", {"t1": 500}) - self.conn.zadd("t:active:q1", {"t2": 1000}) - self.conn.zadd("t:active:q1", {"t3": 1500}) - self.conn.zadd("t:active:q2", {"t4": 1200}) - self.conn.zadd("t:active:q3", {"t5": 1800}) - self.conn.zadd("t:active:q4", {"t6": 200}) + self.conn.sadd(f"{REDIS_PREFIX}:{ACTIVE}", "q1", "q2", "q3", "q4") + self.conn.zadd(f"{REDIS_PREFIX}:{ACTIVE}:q1", {"t1": 500}) + self.conn.zadd(f"{REDIS_PREFIX}:{ACTIVE}:q1", {"t2": 1000}) + self.conn.zadd(f"{REDIS_PREFIX}:{ACTIVE}:q1", {"t3": 1500}) + self.conn.zadd(f"{REDIS_PREFIX}:{ACTIVE}:q2", {"t4": 1200}) + self.conn.zadd(f"{REDIS_PREFIX}:{ACTIVE}:q3", {"t5": 1800}) + self.conn.zadd(f"{REDIS_PREFIX}:{ACTIVE}:q4", {"t6": 200}) expired_task_set = {("q1", "t1"), ("q1", "t2"), ("q4", "t6")} From 94eba11719b8ebbe722e02edbdfc48824fe480f2 Mon Sep 17 00:00:00 2001 From: Vishesh Bansal Date: Thu, 16 Nov 2023 23:40:26 +0530 Subject: [PATCH 2/3] Revert refactor of Redis keys in test cases --- tasktiger/redis_scripts.py | 7 +-- tasktiger/task.py | 15 ++++--- tasktiger/worker.py | 15 +++---- tests/test_base.py | 69 +++++++++------------------- tests/test_logging.py | 3 +- tests/test_migrations.py | 90 ++++++++----------------------------- tests/test_redis_scripts.py | 22 +++++---- 7 files changed, 71 insertions(+), 150 deletions(-) diff --git a/tasktiger/redis_scripts.py b/tasktiger/redis_scripts.py index 29e4c541..6c068e81 100644 --- a/tasktiger/redis_scripts.py +++ b/tasktiger/redis_scripts.py @@ -4,6 +4,7 @@ from redis import Redis from ._internal import ACTIVE, ERROR, QUEUED, SCHEDULED +from .constants import EXECUTIONS, EXECUTIONS_COUNT, TASK try: from redis.commands.core import Script @@ -595,9 +596,9 @@ def _bool_to_str(v: bool) -> str: def _none_to_empty_str(v: Optional[str]) -> str: return v or "" - key_task_id = key_func("task", id) - key_task_id_executions = key_func("task", id, "executions") - key_task_id_executions_count = key_func("task", id, "executions_count") + key_task_id = key_func(TASK, id) + key_task_id_executions = key_func(TASK, id, EXECUTIONS) + key_task_id_executions_count = key_func(TASK, id, EXECUTIONS_COUNT) key_from_state = key_func(from_state) key_to_state = key_func(to_state) if to_state else "" key_active_queue = key_func(ACTIVE, queue) diff --git a/tasktiger/task.py b/tasktiger/task.py index db034a6a..c723d32f 100644 --- a/tasktiger/task.py +++ b/tasktiger/task.py @@ -30,6 +30,7 @@ serialize_func_name, serialize_retry_method, ) +from .constants import EXECUTIONS, EXECUTIONS_COUNT, TASK from .exceptions import QueueFullException, TaskImportError, TaskNotFound from .runner import BaseRunner, get_runner_class from .types import RetryStrategy @@ -393,7 +394,7 @@ def delay( pipeline = tiger.connection.pipeline() pipeline.sadd(tiger._key(state), self.queue) - pipeline.set(tiger._key("task", self.id), serialized_task) + pipeline.set(tiger._key(TASK, self.id), serialized_task) # In case of unique tasks, don't update the score. tiger.scripts.zadd( tiger._key(state, self.queue), @@ -454,11 +455,11 @@ def from_id( latest). If the task doesn't exist, None is returned. """ pipeline = tiger.connection.pipeline() - pipeline.get(tiger._key("task", task_id)) + pipeline.get(tiger._key(TASK, task_id)) pipeline.zscore(tiger._key(state, queue), task_id) if load_executions: pipeline.lrange( - tiger._key("task", task_id, "executions"), -load_executions, -1 + tiger._key(TASK, task_id, EXECUTIONS), -load_executions, -1 ) ( serialized_data, @@ -526,10 +527,10 @@ def tasks_from_queue( ] if load_executions: pipeline = tiger.connection.pipeline() - pipeline.mget([tiger._key("task", item[0]) for item in items]) + pipeline.mget([tiger._key(TASK, item[0]) for item in items]) for item in items: pipeline.lrange( - tiger._key("task", item[0], "executions"), + tiger._key(TASK, item[0], EXECUTIONS), -load_executions, -1, ) @@ -586,8 +587,8 @@ def n_executions(self) -> int: Queries and returns the number of past task executions. """ pipeline = self.tiger.connection.pipeline() - pipeline.exists(self.tiger._key("task", self.id)) - pipeline.get(self.tiger._key("task", self.id, "executions_count")) + pipeline.exists(self.tiger._key(TASK, self.id)) + pipeline.get(self.tiger._key(TASK, self.id, EXECUTIONS_COUNT)) exists, executions_count = pipeline.execute() if not exists: diff --git a/tasktiger/worker.py b/tasktiger/worker.py index 63fa8fd7..b322540f 100644 --- a/tasktiger/worker.py +++ b/tasktiger/worker.py @@ -47,6 +47,7 @@ serialize_func_name, serialize_retry_method, ) +from .constants import EXECUTIONS, EXECUTIONS_COUNT, TASK from .exceptions import ( RetryException, StopRetry, @@ -345,7 +346,7 @@ def _worker_queue_expired_tasks(self) -> None: self.config["REQUEUE_EXPIRED_TASKS_BATCH_SIZE"], ) - for (queue, task_id) in task_data: + for queue, task_id in task_data: self.log.debug("expiring task", queue=queue, task_id=task_id) self._did_work = True try: @@ -374,7 +375,7 @@ def _worker_queue_expired_tasks(self) -> None: # have a task without a task object. # XXX: Ideally, the following block should be atomic. - if not self.connection.get(self._key("task", task_id)): + if not self.connection.get(self._key(TASK, task_id)): self.log.error("not found", queue=queue, task_id=task_id) task = Task( self.tiger, @@ -812,7 +813,7 @@ def _process_queue_tasks( # Get all tasks serialized_tasks = self.connection.mget( - [self._key("task", task_id) for task_id in task_ids] + [self._key(TASK, task_id) for task_id in task_ids] ) # Parse tasks @@ -1053,7 +1054,7 @@ def _mark_done() -> None: should_log_error = True # Get execution info (for logging and retry purposes) execution = self.connection.lindex( - self._key("task", task.id, "executions"), -1 + self._key(TASK, task.id, EXECUTIONS), -1 ) if execution: @@ -1242,10 +1243,8 @@ def _store_task_execution( serialized_execution = json.dumps(execution) for task in tasks: - executions_key = self._key("task", task.id, "executions") - executions_count_key = self._key( - "task", task.id, "executions_count" - ) + executions_key = self._key(TASK, task.id, EXECUTIONS) + executions_count_key = self._key(TASK, task.id, EXECUTIONS_COUNT) pipeline = self.connection.pipeline() pipeline.incr(executions_count_key) diff --git a/tests/test_base.py b/tests/test_base.py index 4c5e32c8..c8774999 100644 --- a/tests/test_base.py +++ b/tests/test_base.py @@ -21,16 +21,6 @@ linear, ) from tasktiger._internal import serialize_func_name -from tasktiger.constants import ( - ACTIVE, - ERROR, - EXECUTIONS, - EXECUTIONS_COUNT, - QUEUED, - REDIS_PREFIX, - SCHEDULED, - TASK, -) from .config import DELAY from .tasks import ( @@ -79,6 +69,7 @@ def teardown_method(self, method): def _ensure_queues( self, queued=None, active=None, error=None, scheduled=None ): + expected_queues = { "queued": {name for name, n in (queued or {}).items() if n}, "active": {name for name, n in (active or {}).items() if n}, @@ -98,9 +89,7 @@ def _ensure_queue(typ, data): task_ids = self.conn.zrange("t:%s:%s" % (typ, name), 0, -1) assert len(task_ids) == n ret[name] = [ - json.loads( - self.conn.get(f"{REDIS_PREFIX}:{TASK}:%s" % task_id) - ) + json.loads(self.conn.get("t:task:%s" % task_id)) for task_id in task_ids ] assert [task["id"] for task in ret[name]] == task_ids @@ -138,7 +127,7 @@ def test_simple_task(self): Worker(self.tiger).run(once=True) self._ensure_queues(queued={"default": 0}) - assert not self.conn.exists(f"{REDIS_PREFIX}:{TASK}:%s" % task["id"]) + assert not self.conn.exists("t:task:%s" % task["id"]) @pytest.mark.skipif( sys.version_info < (3, 3), reason="__qualname__ unavailable" @@ -151,7 +140,7 @@ def test_staticmethod_task(self): Worker(self.tiger).run(once=True) self._ensure_queues(queued={"default": 0}) - assert not self.conn.exists(f"{REDIS_PREFIX}:{TASK}:%s" % task["id"]) + assert not self.conn.exists("t:task:%s" % task["id"]) def test_task_delay(self): decorated_task.delay(1, 2, a=3, b=4) @@ -269,7 +258,7 @@ def test_exception_task(self, store_tracebacks): assert task["func"] == "tests.tasks:exception_task" executions = self.conn.lrange( - f"{REDIS_PREFIX}:{TASK}:%s:{EXECUTIONS}" % task["id"], 0, -1 + "t:task:%s:executions" % task["id"], 0, -1 ) assert len(executions) == 1 execution = json.loads(executions[0]) @@ -285,9 +274,7 @@ def test_exception_task(self, store_tracebacks): @pytest.mark.parametrize("max_stored_executions", [2, 3, 6, 11, None]) def test_max_stored_executions(self, max_stored_executions): def _get_stored_executions(): - return self.conn.llen( - f"{REDIS_PREFIX}:{TASK}:{task.id}:{EXECUTIONS}" - ) + return self.conn.llen(f"t:task:{task.id}:executions") task = self.tiger.delay( exception_task, @@ -322,7 +309,7 @@ def test_long_task_killed(self): assert task["func"] == "tests.tasks:long_task_killed" executions = self.conn.lrange( - f"{REDIS_PREFIX}:{TASK}:%s:{EXECUTIONS}" % task["id"], 0, -1 + "t:task:%s:executions" % task["id"], 0, -1 ) assert len(executions) == 1 execution = json.loads(executions[0]) @@ -677,17 +664,9 @@ def test_retry_executions_count(self, count): Worker(self.tiger).run(once=True) assert ( - int( - self.conn.get( - f"{REDIS_PREFIX}:{TASK}:{task.id}:{EXECUTIONS_COUNT}" - ) - ) - == count - ) - assert ( - self.conn.llen(f"{REDIS_PREFIX}:{TASK}:{task.id}:{EXECUTIONS}") - == count + int(self.conn.get(f"t:task:{task.id}:executions_count")) == count ) + assert self.conn.llen(f"t:task:{task.id}:executions") == count def test_batch_1(self): self.tiger.delay(batch_task, args=[1]) @@ -1080,15 +1059,11 @@ def test_update_scheduled_time(self): task = Task(self.tiger, simple_task, unique=True) task.delay(when=datetime.timedelta(minutes=5)) self._ensure_queues(scheduled={"default": 1}) - old_score = self.conn.zscore( - f"{REDIS_PREFIX}:{SCHEDULED}:default", task.id - ) + old_score = self.conn.zscore("t:scheduled:default", task.id) task.update_scheduled_time(when=datetime.timedelta(minutes=6)) self._ensure_queues(scheduled={"default": 1}) - new_score = self.conn.zscore( - f"{REDIS_PREFIX}:{SCHEDULED}:default", task.id - ) + new_score = self.conn.zscore("t:scheduled:default", task.id) # The difference can be slightly over 60 due to processing time, but # shouldn't be much higher. @@ -1356,16 +1331,16 @@ def test_task_disappears(self): time.sleep(DELAY) # Remove the task object while the task is processing. - assert self.conn.delete(f"{REDIS_PREFIX}:{TASK}:{task.id}") == 1 + assert self.conn.delete("t:task:{}".format(task.id)) == 1 # Kill the worker while it's still processing the task. os.kill(worker.pid, signal.SIGKILL) # _ensure_queues() breaks here because it can't find the task - assert self.conn.scard(f"{REDIS_PREFIX}:{QUEUED}") == 0 - assert self.conn.scard(f"{REDIS_PREFIX}:{ACTIVE}") == 1 - assert self.conn.scard(f"{REDIS_PREFIX}:{ERROR}") == 0 - assert self.conn.scard(f"{REDIS_PREFIX}:{SCHEDULED}") == 0 + assert self.conn.scard("t:queued") == 0 + assert self.conn.scard("t:active") == 1 + assert self.conn.scard("t:error") == 0 + assert self.conn.scard("t:scheduled") == 0 # Capture logger errors = [] @@ -1381,10 +1356,10 @@ def fake_error(msg): Worker(self.tiger).run(once=True) assert len(errors) == 0 - assert self.conn.scard(f"{REDIS_PREFIX}:{QUEUED}") == 0 - assert self.conn.scard(f"{REDIS_PREFIX}:{ACTIVE}") == 1 - assert self.conn.scard(f"{REDIS_PREFIX}:{ERROR}") == 0 - assert self.conn.scard(f"{REDIS_PREFIX}:{SCHEDULED}") == 0 + assert self.conn.scard("t:queued") == 0 + assert self.conn.scard("t:active") == 1 + assert self.conn.scard("t:error") == 0 + assert self.conn.scard("t:scheduled") == 0 # After waiting and re-running the worker, queues will clear. time.sleep(2 * DELAY) @@ -1432,7 +1407,7 @@ def test_child_hanging_forever(self): assert task["func"] == "tests.tasks:sleep_task" executions = self.conn.lrange( - f"{REDIS_PREFIX}:{TASK}:%s:{EXECUTIONS}" % task["id"], 0, -1 + "t:task:%s:executions" % task["id"], 0, -1 ) assert len(executions) == 1 execution = json.loads(executions[0]) @@ -1489,7 +1464,7 @@ def test_decorated_child_hard_timeout_precedence(self): assert task["func"] == "tests.tasks:decorated_task_sleep_timeout" executions = self.conn.lrange( - f"{REDIS_PREFIX}:{TASK}:%s:{EXECUTIONS}" % task["id"], 0, -1 + "t:task:%s:executions" % task["id"], 0, -1 ) assert len(executions) == 1 execution = json.loads(executions[0]) diff --git a/tests/test_logging.py b/tests/test_logging.py index 67c44549..d74ae914 100644 --- a/tests/test_logging.py +++ b/tests/test_logging.py @@ -4,7 +4,6 @@ import structlog from tasktiger import TaskTiger, Worker -from tasktiger.constants import REDIS_PREFIX, TASK from tasktiger.logging import tasktiger_processor from .test_base import BaseTestCase @@ -56,7 +55,7 @@ def test_structlog_processor(self): Worker(self.tiger).run(once=True) self._ensure_queues(queued={"foo_qux": 0}) - assert not self.conn.exists(f"{REDIS_PREFIX}:{TASK}:%s" % task["id"]) + assert not self.conn.exists("t:task:%s" % task["id"]) class TestSetupStructlog(BaseTestCase): diff --git a/tests/test_migrations.py b/tests/test_migrations.py index fb17366b..ce44f287 100644 --- a/tests/test_migrations.py +++ b/tests/test_migrations.py @@ -2,12 +2,6 @@ import pytest -from tasktiger.constants import ( - EXECUTIONS, - EXECUTIONS_COUNT, - REDIS_PREFIX, - TASK, -) from tasktiger.migrations import migrate_executions_count from .test_base import BaseTestCase @@ -21,11 +15,11 @@ def test_migrate_nothing(self): @pytest.mark.parametrize( "key", [ - f"foot:{TASK}:{uuid.uuid4()}:{EXECUTIONS}", - f"foo:{REDIS_PREFIX}:{TASK}:{uuid.uuid4()}:{EXECUTIONS}", - f"{REDIS_PREFIX}:{TASK}:{uuid.uuid4()}:executionsfoo", - f"{REDIS_PREFIX}:{TASK}:{uuid.uuid4()}:{EXECUTIONS}:foo", - f"{REDIS_PREFIX}:{TASK}:{uuid.uuid4()}", + f"foot:task:{uuid.uuid4()}:executions", + f"foo:t:task:{uuid.uuid4()}:executions", + f"t:task:{uuid.uuid4()}:executionsfoo", + f"t:task:{uuid.uuid4()}:executions:foo", + f"t:task:{uuid.uuid4()}", ], ) def test_migrate_ignores_irrelevant_keys(self, key): @@ -39,93 +33,47 @@ def test_migrate(self): task_id_2 = uuid.uuid4() for __ in range(73): - self.conn.rpush( - f"{REDIS_PREFIX}:{TASK}:{task_id_1}:{EXECUTIONS}", "{}" - ) + self.conn.rpush(f"t:task:{task_id_1}:executions", "{}") for __ in range(35): - self.conn.rpush( - f"{REDIS_PREFIX}:{TASK}:{task_id_2}:{EXECUTIONS}", "{}" - ) + self.conn.rpush(f"t:task:{task_id_2}:executions", "{}") migrate_executions_count(self.tiger) - assert ( - self.conn.get( - f"{REDIS_PREFIX}:{TASK}:{task_id_1}:{EXECUTIONS_COUNT}" - ) - == "73" - ) - assert ( - self.conn.get( - f"{REDIS_PREFIX}:{TASK}:{task_id_2}:{EXECUTIONS_COUNT}" - ) - == "35" - ) + assert self.conn.get(f"t:task:{task_id_1}:executions_count") == "73" + assert self.conn.get(f"t:task:{task_id_2}:executions_count") == "35" def test_migrate_when_some_tasks_already_migrated(self): task_id_1 = uuid.uuid4() task_id_2 = uuid.uuid4() for __ in range(73): - self.conn.rpush( - f"{REDIS_PREFIX}:{TASK}:{task_id_1}:{EXECUTIONS}", "{}" - ) + self.conn.rpush(f"t:task:{task_id_1}:executions", "{}") - self.conn.set( - f"{REDIS_PREFIX}:{TASK}:{task_id_1}:{EXECUTIONS_COUNT}", 91 - ) + self.conn.set(f"t:task:{task_id_1}:executions_count", 91) for __ in range(35): - self.conn.rpush( - f"{REDIS_PREFIX}:{TASK}:{task_id_2}:{EXECUTIONS}", "{}" - ) + self.conn.rpush(f"t:task:{task_id_2}:executions", "{}") migrate_executions_count(self.tiger) - assert ( - self.conn.get( - f"{REDIS_PREFIX}:{TASK}:{task_id_2}:{EXECUTIONS_COUNT}" - ) - == "35" - ) + assert self.conn.get(f"t:task:{task_id_2}:executions_count") == "35" # looks migrated already - left untouched - assert ( - self.conn.get( - f"{REDIS_PREFIX}:{TASK}:{task_id_1}:{EXECUTIONS_COUNT}" - ) - == "91" - ) + assert self.conn.get(f"t:task:{task_id_1}:executions_count") == "91" def test_migrate_when_counter_is_behind(self): task_id_1 = uuid.uuid4() task_id_2 = uuid.uuid4() for __ in range(73): - self.conn.rpush( - f"{REDIS_PREFIX}:{TASK}:{task_id_1}:{EXECUTIONS}", "{}" - ) + self.conn.rpush(f"t:task:{task_id_1}:executions", "{}") - self.conn.set( - f"{REDIS_PREFIX}:{TASK}:{task_id_1}:{EXECUTIONS_COUNT}", 10 - ) + self.conn.set(f"t:task:{task_id_1}:executions_count", 10) for __ in range(35): - self.conn.rpush( - f"{REDIS_PREFIX}:{TASK}:{task_id_2}:{EXECUTIONS}", "{}" - ) + self.conn.rpush(f"t:task:{task_id_2}:executions", "{}") migrate_executions_count(self.tiger) - assert ( - self.conn.get( - f"{REDIS_PREFIX}:{TASK}:{task_id_2}:{EXECUTIONS_COUNT}" - ) - == "35" - ) + assert self.conn.get(f"t:task:{task_id_2}:executions_count") == "35" # updated because the counter value was less than the actual count - assert ( - self.conn.get( - f"{REDIS_PREFIX}:{TASK}:{task_id_1}:{EXECUTIONS_COUNT}" - ) - == "73" - ) + assert self.conn.get(f"t:task:{task_id_1}:executions_count") == "73" diff --git a/tests/test_redis_scripts.py b/tests/test_redis_scripts.py index 34af65a3..00879362 100644 --- a/tests/test_redis_scripts.py +++ b/tests/test_redis_scripts.py @@ -1,5 +1,3 @@ -from tasktiger.constants import ACTIVE, REDIS_PREFIX - from .utils import get_tiger @@ -127,7 +125,7 @@ def test_zpoppush_on_success_1(self, **kwargs): score=None, new_score=10, on_success=("update_sets", "val", "remove_set", "add_set"), - **kwargs, + **kwargs ) assert result == ["a", "b"] @@ -153,7 +151,7 @@ def test_zpoppush_on_success_2(self, **kwargs): score=0, new_score=10, on_success=("update_sets", "val", "remove_set", "add_set"), - **kwargs, + **kwargs ) assert result == [] @@ -179,7 +177,7 @@ def test_zpoppush_on_success_3(self, **kwargs): score=None, new_score=10, on_success=("update_sets", "val", "remove_set", "add_set"), - **kwargs, + **kwargs ) assert result == ["a", "b", "c", "d"] @@ -323,13 +321,13 @@ def test_delete_if_not_in_zsets_3(self): assert self.conn.exists("bar") == 0 def test_get_expired_tasks(self): - self.conn.sadd(f"{REDIS_PREFIX}:{ACTIVE}", "q1", "q2", "q3", "q4") - self.conn.zadd(f"{REDIS_PREFIX}:{ACTIVE}:q1", {"t1": 500}) - self.conn.zadd(f"{REDIS_PREFIX}:{ACTIVE}:q1", {"t2": 1000}) - self.conn.zadd(f"{REDIS_PREFIX}:{ACTIVE}:q1", {"t3": 1500}) - self.conn.zadd(f"{REDIS_PREFIX}:{ACTIVE}:q2", {"t4": 1200}) - self.conn.zadd(f"{REDIS_PREFIX}:{ACTIVE}:q3", {"t5": 1800}) - self.conn.zadd(f"{REDIS_PREFIX}:{ACTIVE}:q4", {"t6": 200}) + self.conn.sadd("t:active", "q1", "q2", "q3", "q4") + self.conn.zadd("t:active:q1", {"t1": 500}) + self.conn.zadd("t:active:q1", {"t2": 1000}) + self.conn.zadd("t:active:q1", {"t3": 1500}) + self.conn.zadd("t:active:q2", {"t4": 1200}) + self.conn.zadd("t:active:q3", {"t5": 1800}) + self.conn.zadd("t:active:q4", {"t6": 200}) expired_task_set = {("q1", "t1"), ("q1", "t2"), ("q4", "t6")} From b5d0ffe93cfe5ac1e6ea2bea3200391ac915ebbc Mon Sep 17 00:00:00 2001 From: Vishesh Bansal Date: Tue, 12 Dec 2023 00:56:32 +0530 Subject: [PATCH 3/3] code refactor --- tasktiger/_internal.py | 5 +++++ tasktiger/constants.py | 11 ----------- tasktiger/migrations.py | 2 +- tasktiger/redis_scripts.py | 11 +++++++++-- tasktiger/task.py | 4 +++- tasktiger/worker.py | 4 +++- 6 files changed, 21 insertions(+), 16 deletions(-) delete mode 100644 tasktiger/constants.py diff --git a/tasktiger/_internal.py b/tasktiger/_internal.py index 6f31d239..7b00b575 100644 --- a/tasktiger/_internal.py +++ b/tasktiger/_internal.py @@ -27,6 +27,11 @@ from .task import Task from .tasktiger import TaskTiger +# Constants pertaining to Redis keys +TASK = "task" +EXECUTIONS = "executions" +EXECUTIONS_COUNT = "executions_count" + # Task states (represented by different queues) # Note some client code may rely on the string values (e.g. get_queue_stats). QUEUED = "queued" diff --git a/tasktiger/constants.py b/tasktiger/constants.py deleted file mode 100644 index 6baf92dc..00000000 --- a/tasktiger/constants.py +++ /dev/null @@ -1,11 +0,0 @@ -# constants pertaining to Redis keys -REDIS_PREFIX = "t" -TASK = "task" -EXECUTIONS = "executions" -EXECUTIONS_COUNT = "executions_count" - - -QUEUED = "queued" -ACTIVE = "active" -SCHEDULED = "scheduled" -ERROR = "error" diff --git a/tasktiger/migrations.py b/tasktiger/migrations.py index 3a80a2a8..68bbb2c7 100644 --- a/tasktiger/migrations.py +++ b/tasktiger/migrations.py @@ -1,6 +1,6 @@ from typing import TYPE_CHECKING -from .constants import EXECUTIONS, TASK +from ._internal import EXECUTIONS, TASK from .utils import redis_glob_escape if TYPE_CHECKING: diff --git a/tasktiger/redis_scripts.py b/tasktiger/redis_scripts.py index 6c068e81..127e1ef7 100644 --- a/tasktiger/redis_scripts.py +++ b/tasktiger/redis_scripts.py @@ -3,8 +3,15 @@ from redis import Redis -from ._internal import ACTIVE, ERROR, QUEUED, SCHEDULED -from .constants import EXECUTIONS, EXECUTIONS_COUNT, TASK +from ._internal import ( + ACTIVE, + ERROR, + EXECUTIONS, + EXECUTIONS_COUNT, + QUEUED, + SCHEDULED, + TASK, +) try: from redis.commands.core import Script diff --git a/tasktiger/task.py b/tasktiger/task.py index c723d32f..0e08fc8f 100644 --- a/tasktiger/task.py +++ b/tasktiger/task.py @@ -20,8 +20,11 @@ from ._internal import ( ERROR, + EXECUTIONS, + EXECUTIONS_COUNT, QUEUED, SCHEDULED, + TASK, g, gen_id, gen_unique_id, @@ -30,7 +33,6 @@ serialize_func_name, serialize_retry_method, ) -from .constants import EXECUTIONS, EXECUTIONS_COUNT, TASK from .exceptions import QueueFullException, TaskImportError, TaskNotFound from .runner import BaseRunner, get_runner_class from .types import RetryStrategy diff --git a/tasktiger/worker.py b/tasktiger/worker.py index b322540f..9f944e49 100644 --- a/tasktiger/worker.py +++ b/tasktiger/worker.py @@ -36,8 +36,11 @@ from ._internal import ( ACTIVE, ERROR, + EXECUTIONS, + EXECUTIONS_COUNT, QUEUED, SCHEDULED, + TASK, dotted_parts, g, g_fork_lock, @@ -47,7 +50,6 @@ serialize_func_name, serialize_retry_method, ) -from .constants import EXECUTIONS, EXECUTIONS_COUNT, TASK from .exceptions import ( RetryException, StopRetry,