Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion tasktiger/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,9 @@ def execute(self):

try:
if is_batch_func:
return func([{'args': self.args, 'kwargs': self.kwargs}])
return func(
[{'id': self.id, 'args': self.args, 'kwargs': self.kwargs}]
)
else:
return func(*self.args, **self.kwargs)
finally:
Expand Down
6 changes: 5 additions & 1 deletion tasktiger/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,11 @@ def _execute_forked(self, tasks, log):
if is_batch_func:
# Batch process if the task supports it.
params = [
{'args': task.args, 'kwargs': task.kwargs}
{
'id': task.id,
'args': task.args,
'kwargs': task.kwargs,
}
for task in tasks
]
task_timeouts = [
Expand Down
2 changes: 2 additions & 0 deletions tests/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ def locked_task(key, other=None):

@tiger.task(queue='batch', batch=True)
def batch_task(params):
assert all(p['id'] for p in params)

with redis.Redis(
host=REDIS_HOST, db=TEST_DB, decode_responses=True
) as conn:
Expand Down
70 changes: 40 additions & 30 deletions tests/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -595,75 +595,81 @@ def test_retry_exception_2(self):
pytest.raises(TaskNotFound, task.n_executions)

def test_batch_1(self):
self.tiger.delay(batch_task, args=[1])
self.tiger.delay(batch_task, args=[2])
self.tiger.delay(batch_task, args=[3])
self.tiger.delay(batch_task, args=[4])
id1 = self.tiger.delay(batch_task, args=[1]).id
id2 = self.tiger.delay(batch_task, args=[2]).id
id3 = self.tiger.delay(batch_task, args=[3]).id
id4 = self.tiger.delay(batch_task, args=[4]).id
self._ensure_queues(queued={'batch': 4})
Worker(self.tiger).run(once=True)
self._ensure_queues(queued={'batch': 0})
data = [json.loads(d) for d in self.conn.lrange('batch_task', 0, -1)]
assert data == [
[
{'args': [1], 'kwargs': {}},
{'args': [2], 'kwargs': {}},
{'args': [3], 'kwargs': {}},
{'id': id1, 'args': [1], 'kwargs': {}},
{'id': id2, 'args': [2], 'kwargs': {}},
{'id': id3, 'args': [3], 'kwargs': {}},
],
[{'args': [4], 'kwargs': {}}],
[{'id': id4, 'args': [4], 'kwargs': {}}],
]

def test_batch_2(self):
self.tiger.delay(batch_task, args=[1])
id1 = self.tiger.delay(batch_task, args=[1]).id
self.tiger.delay(non_batch_task, args=[5])
self.tiger.delay(batch_task, args=[2])
self.tiger.delay(batch_task, args=[3])
self.tiger.delay(batch_task, args=[4])
id2 = self.tiger.delay(batch_task, args=[2]).id
id3 = self.tiger.delay(batch_task, args=[3]).id
id4 = self.tiger.delay(batch_task, args=[4]).id
self.tiger.delay(non_batch_task, args=[6])
self.tiger.delay(non_batch_task, args=[7])
self._ensure_queues(queued={'batch': 7})
Worker(self.tiger).run(once=True)
self._ensure_queues(queued={'batch': 0})
data = [json.loads(d) for d in self.conn.lrange('batch_task', 0, -1)]
assert data == [
[{'args': [1], 'kwargs': {}}, {'args': [2], 'kwargs': {}}],
[
{'id': id1, 'args': [1], 'kwargs': {}},
{'id': id2, 'args': [2], 'kwargs': {}},
],
5,
[{'args': [3], 'kwargs': {}}, {'args': [4], 'kwargs': {}}],
[
{'id': id3, 'args': [3], 'kwargs': {}},
{'id': id4, 'args': [4], 'kwargs': {}},
],
6,
7,
]

def test_batch_3(self):
self.tiger.delay(batch_task, queue='default', args=[1])
self.tiger.delay(batch_task, queue='default', args=[2])
self.tiger.delay(batch_task, queue='default', args=[3])
self.tiger.delay(batch_task, queue='default', args=[4])
id1 = self.tiger.delay(batch_task, queue='default', args=[1]).id
id2 = self.tiger.delay(batch_task, queue='default', args=[2]).id
id3 = self.tiger.delay(batch_task, queue='default', args=[3]).id
id4 = self.tiger.delay(batch_task, queue='default', args=[4]).id
self._ensure_queues(queued={'default': 4})
Worker(self.tiger).run(once=True)
self._ensure_queues(queued={'default': 0})
data = [json.loads(d) for d in self.conn.lrange('batch_task', 0, -1)]
assert data == [
[{'args': [1], 'kwargs': {}}],
[{'args': [2], 'kwargs': {}}],
[{'args': [3], 'kwargs': {}}],
[{'args': [4], 'kwargs': {}}],
[{'id': id1, 'args': [1], 'kwargs': {}}],
[{'id': id2, 'args': [2], 'kwargs': {}}],
[{'id': id3, 'args': [3], 'kwargs': {}}],
[{'id': id4, 'args': [4], 'kwargs': {}}],
]

def test_batch_4(self):
self.tiger.delay(batch_task, queue='batch.sub', args=[1])
self.tiger.delay(batch_task, queue='batch.sub', args=[2])
self.tiger.delay(batch_task, queue='batch.sub', args=[3])
self.tiger.delay(batch_task, queue='batch.sub', args=[4])
id1 = self.tiger.delay(batch_task, queue='batch.sub', args=[1]).id
id2 = self.tiger.delay(batch_task, queue='batch.sub', args=[2]).id
id3 = self.tiger.delay(batch_task, queue='batch.sub', args=[3]).id
id4 = self.tiger.delay(batch_task, queue='batch.sub', args=[4]).id
self._ensure_queues(queued={'batch.sub': 4})
Worker(self.tiger).run(once=True)
self._ensure_queues(queued={'batch.sub': 0})
data = [json.loads(d) for d in self.conn.lrange('batch_task', 0, -1)]
assert data == [
[
{'args': [1], 'kwargs': {}},
{'args': [2], 'kwargs': {}},
{'args': [3], 'kwargs': {}},
{'id': id1, 'args': [1], 'kwargs': {}},
{'id': id2, 'args': [2], 'kwargs': {}},
{'id': id3, 'args': [3], 'kwargs': {}},
],
[{'args': [4], 'kwargs': {}}],
[{'id': id4, 'args': [4], 'kwargs': {}}],
]

def test_batch_exception_1(self):
Expand Down Expand Up @@ -1046,6 +1052,10 @@ def test_eager(self):
task.delay(when=datetime.timedelta(seconds=5))
self._ensure_queues(scheduled={'default': 1})

# Test batch task
task = Task(self.tiger, batch_task)
task.delay()


class TestCurrentTask(BaseTestCase):
def test_current_task(self):
Expand Down