From fbae326762f788e4114e0f66be3dbc9c7d757c2b Mon Sep 17 00:00:00 2001 From: Joe Kemp Date: Mon, 14 Dec 2020 09:52:48 -0500 Subject: [PATCH 1/3] Add task id to batch parameters --- tasktiger/worker.py | 11 ++++++-- tests/test_base.py | 66 ++++++++++++++++++++++++--------------------- 2 files changed, 45 insertions(+), 32 deletions(-) diff --git a/tasktiger/worker.py b/tasktiger/worker.py index 558f710b..50bd835c 100644 --- a/tasktiger/worker.py +++ b/tasktiger/worker.py @@ -119,7 +119,10 @@ def __init__( # Redis load. self.worker_group_name = hashlib.sha256( json.dumps( - [sorted(self.only_queues), sorted(self.exclude_queues),] + [ + sorted(self.only_queues), + sorted(self.exclude_queues), + ] ).encode('utf8') ).hexdigest() @@ -371,7 +374,11 @@ def _execute_forked(self, tasks, log): if is_batch_func: # Batch process if the task supports it. params = [ - {'args': task.args, 'kwargs': task.kwargs} + { + 'id': task.id, + 'args': task.args, + 'kwargs': task.kwargs, + } for task in tasks ] task_timeouts = [ diff --git a/tests/test_base.py b/tests/test_base.py index ca618fc6..7f597e30 100644 --- a/tests/test_base.py +++ b/tests/test_base.py @@ -595,29 +595,29 @@ def test_retry_exception_2(self): pytest.raises(TaskNotFound, task.n_executions) def test_batch_1(self): - self.tiger.delay(batch_task, args=[1]) - self.tiger.delay(batch_task, args=[2]) - self.tiger.delay(batch_task, args=[3]) - self.tiger.delay(batch_task, args=[4]) + id1 = self.tiger.delay(batch_task, args=[1]).id + id2 = self.tiger.delay(batch_task, args=[2]).id + id3 = self.tiger.delay(batch_task, args=[3]).id + id4 = self.tiger.delay(batch_task, args=[4]).id self._ensure_queues(queued={'batch': 4}) Worker(self.tiger).run(once=True) self._ensure_queues(queued={'batch': 0}) data = [json.loads(d) for d in self.conn.lrange('batch_task', 0, -1)] assert data == [ [ - {'args': [1], 'kwargs': {}}, - {'args': [2], 'kwargs': {}}, - {'args': [3], 'kwargs': {}}, + {'id': id1, 'args': [1], 'kwargs': {}}, + {'id': id2, 'args': [2], 'kwargs': {}}, + {'id': id3, 'args': [3], 'kwargs': {}}, ], - [{'args': [4], 'kwargs': {}}], + [{'id': id4, 'args': [4], 'kwargs': {}}], ] def test_batch_2(self): - self.tiger.delay(batch_task, args=[1]) + id1 = self.tiger.delay(batch_task, args=[1]).id self.tiger.delay(non_batch_task, args=[5]) - self.tiger.delay(batch_task, args=[2]) - self.tiger.delay(batch_task, args=[3]) - self.tiger.delay(batch_task, args=[4]) + id2 = self.tiger.delay(batch_task, args=[2]).id + id3 = self.tiger.delay(batch_task, args=[3]).id + id4 = self.tiger.delay(batch_task, args=[4]).id self.tiger.delay(non_batch_task, args=[6]) self.tiger.delay(non_batch_task, args=[7]) self._ensure_queues(queued={'batch': 7}) @@ -625,45 +625,51 @@ def test_batch_2(self): self._ensure_queues(queued={'batch': 0}) data = [json.loads(d) for d in self.conn.lrange('batch_task', 0, -1)] assert data == [ - [{'args': [1], 'kwargs': {}}, {'args': [2], 'kwargs': {}}], + [ + {'id': id1, 'args': [1], 'kwargs': {}}, + {'id': id2, 'args': [2], 'kwargs': {}}, + ], 5, - [{'args': [3], 'kwargs': {}}, {'args': [4], 'kwargs': {}}], + [ + {'id': id3, 'args': [3], 'kwargs': {}}, + {'id': id4, 'args': [4], 'kwargs': {}}, + ], 6, 7, ] def test_batch_3(self): - self.tiger.delay(batch_task, queue='default', args=[1]) - self.tiger.delay(batch_task, queue='default', args=[2]) - self.tiger.delay(batch_task, queue='default', args=[3]) - self.tiger.delay(batch_task, queue='default', args=[4]) + id1 = self.tiger.delay(batch_task, queue='default', args=[1]).id + id2 = self.tiger.delay(batch_task, queue='default', args=[2]).id + id3 = self.tiger.delay(batch_task, queue='default', args=[3]).id + id4 = self.tiger.delay(batch_task, queue='default', args=[4]).id self._ensure_queues(queued={'default': 4}) Worker(self.tiger).run(once=True) self._ensure_queues(queued={'default': 0}) data = [json.loads(d) for d in self.conn.lrange('batch_task', 0, -1)] assert data == [ - [{'args': [1], 'kwargs': {}}], - [{'args': [2], 'kwargs': {}}], - [{'args': [3], 'kwargs': {}}], - [{'args': [4], 'kwargs': {}}], + [{'id': id1, 'args': [1], 'kwargs': {}}], + [{'id': id2, 'args': [2], 'kwargs': {}}], + [{'id': id3, 'args': [3], 'kwargs': {}}], + [{'id': id4, 'args': [4], 'kwargs': {}}], ] def test_batch_4(self): - self.tiger.delay(batch_task, queue='batch.sub', args=[1]) - self.tiger.delay(batch_task, queue='batch.sub', args=[2]) - self.tiger.delay(batch_task, queue='batch.sub', args=[3]) - self.tiger.delay(batch_task, queue='batch.sub', args=[4]) + id1 = self.tiger.delay(batch_task, queue='batch.sub', args=[1]).id + id2 = self.tiger.delay(batch_task, queue='batch.sub', args=[2]).id + id3 = self.tiger.delay(batch_task, queue='batch.sub', args=[3]).id + id4 = self.tiger.delay(batch_task, queue='batch.sub', args=[4]).id self._ensure_queues(queued={'batch.sub': 4}) Worker(self.tiger).run(once=True) self._ensure_queues(queued={'batch.sub': 0}) data = [json.loads(d) for d in self.conn.lrange('batch_task', 0, -1)] assert data == [ [ - {'args': [1], 'kwargs': {}}, - {'args': [2], 'kwargs': {}}, - {'args': [3], 'kwargs': {}}, + {'id': id1, 'args': [1], 'kwargs': {}}, + {'id': id2, 'args': [2], 'kwargs': {}}, + {'id': id3, 'args': [3], 'kwargs': {}}, ], - [{'args': [4], 'kwargs': {}}], + [{'id': id4, 'args': [4], 'kwargs': {}}], ] def test_batch_exception_1(self): From 822a654b22f60c4786fe124dc2bded5e54a06dd6 Mon Sep 17 00:00:00 2001 From: Joe Kemp Date: Mon, 14 Dec 2020 10:05:27 -0500 Subject: [PATCH 2/3] Revert newer black change --- tasktiger/worker.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/tasktiger/worker.py b/tasktiger/worker.py index 50bd835c..7d02767e 100644 --- a/tasktiger/worker.py +++ b/tasktiger/worker.py @@ -119,10 +119,7 @@ def __init__( # Redis load. self.worker_group_name = hashlib.sha256( json.dumps( - [ - sorted(self.only_queues), - sorted(self.exclude_queues), - ] + [sorted(self.only_queues), sorted(self.exclude_queues),] ).encode('utf8') ).hexdigest() From b3d25670d6a9a5b3512d34bfb7b0a103df789e7d Mon Sep 17 00:00:00 2001 From: Joe Kemp Date: Mon, 14 Dec 2020 13:53:02 -0500 Subject: [PATCH 3/3] Handle eager processed batch tasks --- tasktiger/task.py | 4 +++- tests/tasks.py | 2 ++ tests/test_base.py | 4 ++++ 3 files changed, 9 insertions(+), 1 deletion(-) diff --git a/tasktiger/task.py b/tasktiger/task.py index d5b5709c..10934735 100644 --- a/tasktiger/task.py +++ b/tasktiger/task.py @@ -298,7 +298,9 @@ def execute(self): try: if is_batch_func: - return func([{'args': self.args, 'kwargs': self.kwargs}]) + return func( + [{'id': self.id, 'args': self.args, 'kwargs': self.kwargs}] + ) else: return func(*self.args, **self.kwargs) finally: diff --git a/tests/tasks.py b/tests/tasks.py index 58da158b..546a2227 100644 --- a/tests/tasks.py +++ b/tests/tasks.py @@ -96,6 +96,8 @@ def locked_task(key, other=None): @tiger.task(queue='batch', batch=True) def batch_task(params): + assert all(p['id'] for p in params) + with redis.Redis( host=REDIS_HOST, db=TEST_DB, decode_responses=True ) as conn: diff --git a/tests/test_base.py b/tests/test_base.py index 7f597e30..c9a2d5c4 100644 --- a/tests/test_base.py +++ b/tests/test_base.py @@ -1052,6 +1052,10 @@ def test_eager(self): task.delay(when=datetime.timedelta(seconds=5)) self._ensure_queues(scheduled={'default': 1}) + # Test batch task + task = Task(self.tiger, batch_task) + task.delay() + class TestCurrentTask(BaseTestCase): def test_current_task(self):