From c73b7ca449031d42d1cb5e877b4414bdf9b790d3 Mon Sep 17 00:00:00 2001 From: Ruiwen Chua Date: Wed, 23 Apr 2014 20:09:29 +0800 Subject: [PATCH 01/11] Avoid resetting 'attempts' to 1 each time we retrieve a job --- mongoqueue/mongoqueue.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/mongoqueue/mongoqueue.py b/mongoqueue/mongoqueue.py index 65a9022..17af583 100644 --- a/mongoqueue/mongoqueue.py +++ b/mongoqueue/mongoqueue.py @@ -83,9 +83,11 @@ def next(self): query={"locked_by": None, "locked_at": None, "attempts": {"$lt": self.max_attempts}}, - update={"$set": {"attempts": 1, - "locked_by": self.consumer_id, - "locked_at": datetime.now()}}, + update={ + "$set": { + "locked_by": self.consumer_id, + "locked_at": datetime.now() + }}, sort=[('priority', pymongo.DESCENDING)], new=1, limit=1 From e28a136d5b33fd91542ab81db5e0278df19a20d3 Mon Sep 17 00:00:00 2001 From: Ruiwen Chua Date: Wed, 23 Apr 2014 20:10:17 +0800 Subject: [PATCH 02/11] Documentation suggests that the 'new' parameter should be a boolean, not an int --- mongoqueue/mongoqueue.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mongoqueue/mongoqueue.py b/mongoqueue/mongoqueue.py index 17af583..a90b3f6 100644 --- a/mongoqueue/mongoqueue.py +++ b/mongoqueue/mongoqueue.py @@ -89,7 +89,7 @@ def next(self): "locked_at": datetime.now() }}, sort=[('priority', pymongo.DESCENDING)], - new=1, + new=True, limit=1 )) From 6928bf896c9d3390e38228b02cd6c51cf7393570 Mon Sep 17 00:00:00 2001 From: Ruiwen Chua Date: Wed, 23 Apr 2014 20:10:33 +0800 Subject: [PATCH 03/11] Return True to avoid exceptions propagating beyond the context manager --- mongoqueue/mongoqueue.py | 1 + 1 file changed, 1 insertion(+) diff --git a/mongoqueue/mongoqueue.py b/mongoqueue/mongoqueue.py index a90b3f6..dbe7d20 100644 --- a/mongoqueue/mongoqueue.py +++ b/mongoqueue/mongoqueue.py @@ -194,3 +194,4 @@ def __exit__(self, type, value, tb): else: error = traceback.format_exc() self.error(error) + return True From 04855b353b3f1cd8ac5e20fbbb94ffce7a72206b Mon Sep 17 00:00:00 2001 From: Ruiwen Chua Date: Wed, 23 Apr 2014 20:12:21 +0800 Subject: [PATCH 04/11] Add tests for ensuring a job can only be attempted 'max_attempts' times --- mongoqueue/test.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/mongoqueue/test.py b/mongoqueue/test.py index dfbdb3a..d6f4db4 100644 --- a/mongoqueue/test.py +++ b/mongoqueue/test.py @@ -111,6 +111,20 @@ def test_release(self): job = self.queue.next() self.assert_job_equal(job, data) + def test_max_attempts(self): + data = {"context_id": "alpha", + "ts": time.time()} + self.queue.put(dict(data)) + attempts = 0 + for i in xrange(0, self.queue.max_attempts): + job = self.queue.next() + if not job: + break + with job: + attempts += 1 + raise Exception() + self.assertEqual(attempts, self.queue.max_attempts) + def test_error(self): pass From ef186fba91068b18aa48047dca74ba9f0333baad Mon Sep 17 00:00:00 2001 From: Ruiwen Chua Date: Thu, 24 Apr 2014 23:39:09 +0800 Subject: [PATCH 05/11] Add abort() method for jobs that should be intentionally skipped --- mongoqueue/mongoqueue.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/mongoqueue/mongoqueue.py b/mongoqueue/mongoqueue.py index dbe7d20..ca8d26f 100644 --- a/mongoqueue/mongoqueue.py +++ b/mongoqueue/mongoqueue.py @@ -183,6 +183,13 @@ def release(self): update={"$set": {"locked_by": None, "locked_at": None}, "$inc": {"attempts": 1}}) + def abort(self): + """Intentionally terminate execution of a job, and remove it from the queue + """ + return self._queue.collection.find_and_modify( + {"_id": self.job_id, "locked_by": self._queue.consumer_id}, + remove=True) + ## Context Manager support def __enter__(self): From 0b76b6ed1367d35f637c6795b3988ffacbbc91c8 Mon Sep 17 00:00:00 2001 From: Jack Date: Wed, 30 Jul 2014 13:57:11 +0800 Subject: [PATCH 06/11] Add time for jobs and divide next function into 2 separated queries --- mongoqueue/mongoqueue.py | 64 +++++++++++++++++++++++++++------------- 1 file changed, 43 insertions(+), 21 deletions(-) diff --git a/mongoqueue/mongoqueue.py b/mongoqueue/mongoqueue.py index ca8d26f..b8796b6 100644 --- a/mongoqueue/mongoqueue.py +++ b/mongoqueue/mongoqueue.py @@ -1,18 +1,3 @@ -# Copyright 2012 Kapil Thangavelu -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - import pymongo from datetime import datetime, timedelta @@ -21,6 +6,7 @@ DEFAULT_INSERT = { "priority": 0, + "time": None, "attempts": 0, "locked_by": None, "locked_at": None, @@ -70,29 +56,65 @@ def repair(self): "$inc": {"attempts": 1}} ) - def put(self, payload, priority=0): + def put(self, payload, priority=0, time=None): """Place a job into the queue """ job = dict(DEFAULT_INSERT) job['priority'] = priority + job['time'] = time job['payload'] = payload return self.collection.insert(job) def next(self): - return self._wrap_one(self.collection.find_and_modify( - query={"locked_by": None, - "locked_at": None, - "attempts": {"$lt": self.max_attempts}}, + scheduled_job = self.next_scheduled_job() + free_job = self.next_free_job() + next_job = None + + if scheduled_job and scheduled_job['time'] < datetime.utcnow(): + next_job = scheduled_job + else: + next_job = free_job + + return self._wrap_one(self.collection.find_and_modify({ + "_id": next_job["_id"] + }, update={ "$set": { "locked_by": self.consumer_id, "locked_at": datetime.now() }}, - sort=[('priority', pymongo.DESCENDING)], new=True, limit=1 )) + def next_scheduled_job(self): + jobs = self.collection.find({ + "locked_by": None, + "locked_at": None, + "time": {"$ne": None}, + "attempts": {"$lt": self.max_attempts} + }, + sort=[('time', pymongo.ASCENDING)], + limit=1 + ) + for job in jobs: + return job + return None + + def next_free_job(self): + jobs = self.collection.find({ + "locked_by": None, + "locked_at": None, + "time": None, + "attempts": {"$lt": self.max_attempts} + }, + sort=[('priority', pymongo.DESCENDING)], + limit=1 + ) + for job in jobs: + return job + return None + def _jobs(self): return self.collection.find( query={"locked_by": None, From 2ffb74f91ff26a97b1d052c30e1998c492e36f30 Mon Sep 17 00:00:00 2001 From: Jack Date: Wed, 30 Jul 2014 19:13:08 +0800 Subject: [PATCH 07/11] Recover the header --- mongoqueue/mongoqueue.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/mongoqueue/mongoqueue.py b/mongoqueue/mongoqueue.py index b8796b6..51b348f 100644 --- a/mongoqueue/mongoqueue.py +++ b/mongoqueue/mongoqueue.py @@ -1,3 +1,18 @@ +# Copyright 2012 Kapil Thangavelu +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + import pymongo from datetime import datetime, timedelta From 0bd5fef8cf0482ecd77e1e2704a5d52046a99893 Mon Sep 17 00:00:00 2001 From: Jack Date: Thu, 31 Jul 2014 16:46:15 +0800 Subject: [PATCH 08/11] Add and handle periodic jobs --- mongoqueue/mongoqueue.py | 27 ++++++++++++++++++++++++++- 1 file changed, 26 insertions(+), 1 deletion(-) diff --git a/mongoqueue/mongoqueue.py b/mongoqueue/mongoqueue.py index 51b348f..6e64e4a 100644 --- a/mongoqueue/mongoqueue.py +++ b/mongoqueue/mongoqueue.py @@ -22,6 +22,7 @@ DEFAULT_INSERT = { "priority": 0, "time": None, + "period": 0, "attempts": 0, "locked_by": None, "locked_at": None, @@ -71,12 +72,16 @@ def repair(self): "$inc": {"attempts": 1}} ) - def put(self, payload, priority=0, time=None): + def put(self, payload, priority=0, time=None, period=None): """Place a job into the queue """ job = dict(DEFAULT_INSERT) job['priority'] = priority job['time'] = time + # Store period as an integer representing the number of seconds + # because BSON format doesn't support timedelta + if period and type(period) == timedelta: + job['period'] = period.total_seconds() job['payload'] = payload return self.collection.insert(job) @@ -192,6 +197,17 @@ def job_id(self): def complete(self): """Job has been completed. """ + if self._data['period']: + updated_time = self._data['time'] + timedelta(seconds=self._data['period']) + + return self._queue.collection.find_and_modify( + {"_id": self.job_id, "locked_by": self._queue.consumer_id}, + update={"$set":{ + "locked_by": None, + "locked_at": None, + "time": updated_time + }}) + return self._queue.collection.find_and_modify( {"_id": self.job_id, "locked_by": self._queue.consumer_id}, remove=True) @@ -205,6 +221,15 @@ def error(self, message=None): "locked_by": None, "locked_at": None, "last_error": message}, "$inc": {"attempts": 1}}) + if self._data['attempts'] == self._queue.max_attempts - 1 and self._data['period']: + updated_time = self._data['time'] + timedelta(seconds=self._data['period']) + + self._queue.put(self._data['payload'], + priority=self._data['priority'], + time=updated_time, + period=timedelta(seconds=self._data['period'])) + + def progress(self, count=0): """Note progress on a long running task. """ From 6fcb968a43e66a029e6aa2f422a772267d9d1c4b Mon Sep 17 00:00:00 2001 From: Jack Date: Mon, 4 Aug 2014 11:52:27 +0800 Subject: [PATCH 09/11] Check for dupe before inserting new job to collection --- mongoqueue/mongoqueue.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/mongoqueue/mongoqueue.py b/mongoqueue/mongoqueue.py index 6e64e4a..dfe1f87 100644 --- a/mongoqueue/mongoqueue.py +++ b/mongoqueue/mongoqueue.py @@ -83,8 +83,22 @@ def put(self, payload, priority=0, time=None, period=None): if period and type(period) == timedelta: job['period'] = period.total_seconds() job['payload'] = payload + if self.is_dupe(job): + return return self.collection.insert(job) + def is_dupe(self, job): + jobs = self.collection.find({ + 'payload': job['payload'], + 'time': job['time'], + 'period': job['period']}, + limit=1 + ) + for job in jobs: + if job: + return True + return False + def next(self): scheduled_job = self.next_scheduled_job() free_job = self.next_free_job() From 14299ef959ca098fda2a69be8294661caec67e1a Mon Sep 17 00:00:00 2001 From: Jack Date: Thu, 14 Aug 2014 16:08:05 +0800 Subject: [PATCH 10/11] Check duplicates with only unattempted jobs --- mongoqueue/mongoqueue.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/mongoqueue/mongoqueue.py b/mongoqueue/mongoqueue.py index dfe1f87..c561b64 100644 --- a/mongoqueue/mongoqueue.py +++ b/mongoqueue/mongoqueue.py @@ -91,7 +91,8 @@ def is_dupe(self, job): jobs = self.collection.find({ 'payload': job['payload'], 'time': job['time'], - 'period': job['period']}, + 'period': job['period'], + 'attempts': 0}, limit=1 ) for job in jobs: From c28c2215ecabfc42e3211f0cea48ea75b20dc557 Mon Sep 17 00:00:00 2001 From: Ruiwen Chua Date: Sun, 17 Aug 2014 15:59:17 +0800 Subject: [PATCH 11/11] fix(setup) version bump to 0.7.3 --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index a7e7ba4..468a596 100644 --- a/setup.py +++ b/setup.py @@ -1,7 +1,7 @@ from setuptools import setup, find_packages setup(name='mongoqueue', - version="0.7.2", + version="0.7.3", classifiers=[ 'Intended Audience :: Developers', 'Programming Language :: Python',