diff --git a/tasktiger/task.py b/tasktiger/task.py index d5b5709c..10934735 100644 --- a/tasktiger/task.py +++ b/tasktiger/task.py @@ -298,7 +298,9 @@ def execute(self): try: if is_batch_func: - return func([{'args': self.args, 'kwargs': self.kwargs}]) + return func( + [{'id': self.id, 'args': self.args, 'kwargs': self.kwargs}] + ) else: return func(*self.args, **self.kwargs) finally: diff --git a/tasktiger/worker.py b/tasktiger/worker.py index 558f710b..7d02767e 100644 --- a/tasktiger/worker.py +++ b/tasktiger/worker.py @@ -371,7 +371,11 @@ def _execute_forked(self, tasks, log): if is_batch_func: # Batch process if the task supports it. params = [ - {'args': task.args, 'kwargs': task.kwargs} + { + 'id': task.id, + 'args': task.args, + 'kwargs': task.kwargs, + } for task in tasks ] task_timeouts = [ diff --git a/tests/tasks.py b/tests/tasks.py index 58da158b..546a2227 100644 --- a/tests/tasks.py +++ b/tests/tasks.py @@ -96,6 +96,8 @@ def locked_task(key, other=None): @tiger.task(queue='batch', batch=True) def batch_task(params): + assert all(p['id'] for p in params) + with redis.Redis( host=REDIS_HOST, db=TEST_DB, decode_responses=True ) as conn: diff --git a/tests/test_base.py b/tests/test_base.py index ca618fc6..c9a2d5c4 100644 --- a/tests/test_base.py +++ b/tests/test_base.py @@ -595,29 +595,29 @@ def test_retry_exception_2(self): pytest.raises(TaskNotFound, task.n_executions) def test_batch_1(self): - self.tiger.delay(batch_task, args=[1]) - self.tiger.delay(batch_task, args=[2]) - self.tiger.delay(batch_task, args=[3]) - self.tiger.delay(batch_task, args=[4]) + id1 = self.tiger.delay(batch_task, args=[1]).id + id2 = self.tiger.delay(batch_task, args=[2]).id + id3 = self.tiger.delay(batch_task, args=[3]).id + id4 = self.tiger.delay(batch_task, args=[4]).id self._ensure_queues(queued={'batch': 4}) Worker(self.tiger).run(once=True) self._ensure_queues(queued={'batch': 0}) data = [json.loads(d) for d in self.conn.lrange('batch_task', 0, -1)] assert data == [ [ - {'args': [1], 'kwargs': {}}, - {'args': [2], 'kwargs': {}}, - {'args': [3], 'kwargs': {}}, + {'id': id1, 'args': [1], 'kwargs': {}}, + {'id': id2, 'args': [2], 'kwargs': {}}, + {'id': id3, 'args': [3], 'kwargs': {}}, ], - [{'args': [4], 'kwargs': {}}], + [{'id': id4, 'args': [4], 'kwargs': {}}], ] def test_batch_2(self): - self.tiger.delay(batch_task, args=[1]) + id1 = self.tiger.delay(batch_task, args=[1]).id self.tiger.delay(non_batch_task, args=[5]) - self.tiger.delay(batch_task, args=[2]) - self.tiger.delay(batch_task, args=[3]) - self.tiger.delay(batch_task, args=[4]) + id2 = self.tiger.delay(batch_task, args=[2]).id + id3 = self.tiger.delay(batch_task, args=[3]).id + id4 = self.tiger.delay(batch_task, args=[4]).id self.tiger.delay(non_batch_task, args=[6]) self.tiger.delay(non_batch_task, args=[7]) self._ensure_queues(queued={'batch': 7}) @@ -625,45 +625,51 @@ def test_batch_2(self): self._ensure_queues(queued={'batch': 0}) data = [json.loads(d) for d in self.conn.lrange('batch_task', 0, -1)] assert data == [ - [{'args': [1], 'kwargs': {}}, {'args': [2], 'kwargs': {}}], + [ + {'id': id1, 'args': [1], 'kwargs': {}}, + {'id': id2, 'args': [2], 'kwargs': {}}, + ], 5, - [{'args': [3], 'kwargs': {}}, {'args': [4], 'kwargs': {}}], + [ + {'id': id3, 'args': [3], 'kwargs': {}}, + {'id': id4, 'args': [4], 'kwargs': {}}, + ], 6, 7, ] def test_batch_3(self): - self.tiger.delay(batch_task, queue='default', args=[1]) - self.tiger.delay(batch_task, queue='default', args=[2]) - self.tiger.delay(batch_task, queue='default', args=[3]) - self.tiger.delay(batch_task, queue='default', args=[4]) + id1 = self.tiger.delay(batch_task, queue='default', args=[1]).id + id2 = self.tiger.delay(batch_task, queue='default', args=[2]).id + id3 = self.tiger.delay(batch_task, queue='default', args=[3]).id + id4 = self.tiger.delay(batch_task, queue='default', args=[4]).id self._ensure_queues(queued={'default': 4}) Worker(self.tiger).run(once=True) self._ensure_queues(queued={'default': 0}) data = [json.loads(d) for d in self.conn.lrange('batch_task', 0, -1)] assert data == [ - [{'args': [1], 'kwargs': {}}], - [{'args': [2], 'kwargs': {}}], - [{'args': [3], 'kwargs': {}}], - [{'args': [4], 'kwargs': {}}], + [{'id': id1, 'args': [1], 'kwargs': {}}], + [{'id': id2, 'args': [2], 'kwargs': {}}], + [{'id': id3, 'args': [3], 'kwargs': {}}], + [{'id': id4, 'args': [4], 'kwargs': {}}], ] def test_batch_4(self): - self.tiger.delay(batch_task, queue='batch.sub', args=[1]) - self.tiger.delay(batch_task, queue='batch.sub', args=[2]) - self.tiger.delay(batch_task, queue='batch.sub', args=[3]) - self.tiger.delay(batch_task, queue='batch.sub', args=[4]) + id1 = self.tiger.delay(batch_task, queue='batch.sub', args=[1]).id + id2 = self.tiger.delay(batch_task, queue='batch.sub', args=[2]).id + id3 = self.tiger.delay(batch_task, queue='batch.sub', args=[3]).id + id4 = self.tiger.delay(batch_task, queue='batch.sub', args=[4]).id self._ensure_queues(queued={'batch.sub': 4}) Worker(self.tiger).run(once=True) self._ensure_queues(queued={'batch.sub': 0}) data = [json.loads(d) for d in self.conn.lrange('batch_task', 0, -1)] assert data == [ [ - {'args': [1], 'kwargs': {}}, - {'args': [2], 'kwargs': {}}, - {'args': [3], 'kwargs': {}}, + {'id': id1, 'args': [1], 'kwargs': {}}, + {'id': id2, 'args': [2], 'kwargs': {}}, + {'id': id3, 'args': [3], 'kwargs': {}}, ], - [{'args': [4], 'kwargs': {}}], + [{'id': id4, 'args': [4], 'kwargs': {}}], ] def test_batch_exception_1(self): @@ -1046,6 +1052,10 @@ def test_eager(self): task.delay(when=datetime.timedelta(seconds=5)) self._ensure_queues(scheduled={'default': 1}) + # Test batch task + task = Task(self.tiger, batch_task) + task.delay() + class TestCurrentTask(BaseTestCase): def test_current_task(self):