diff --git a/tasktiger/task.py b/tasktiger/task.py
index bca090de..01cf84e4 100644
--- a/tasktiger/task.py
+++ b/tasktiger/task.py
@@ -344,8 +344,8 @@ def _move(self, from_state=None, to_state=None, when=None, mode=None):
                 _key(from_state), queue, _key(from_state, queue), client=pipeline
             )
 
-        if to_state == QUEUED and self.tiger.config["PUBLISH_QUEUED_TASKS"]:
-            pipeline.publish(_key("activity"), queue)
+        if to_state == QUEUED:
+            self.tiger._notify_task_queued(queue, client=pipeline)
 
         try:
             scripts.execute_pipeline(pipeline)
@@ -420,8 +420,10 @@ def delay(self, when=None, max_queue_size=None):
             mode="nx",
             client=pipeline,
         )
-        if state == QUEUED and tiger.config["PUBLISH_QUEUED_TASKS"]:
-            pipeline.publish(tiger._key("activity"), self.queue)
+
+        if state == QUEUED:
+            tiger._notify_task_queued(self.queue, client=pipeline)
+
         pipeline.execute()
 
         self._state = state
diff --git a/tasktiger/tasktiger.py b/tasktiger/tasktiger.py
index 6c18647a..989d41e7 100644
--- a/tasktiger/tasktiger.py
+++ b/tasktiger/tasktiger.py
@@ -3,6 +3,7 @@
 import datetime
 import importlib
 import logging
+import secrets
 from collections import defaultdict
 
 import click
@@ -15,6 +16,7 @@
     QUEUED,
     SCHEDULED,
     classproperty,
+    dotted_parts,
     g,
     queue_matches,
     serialize_func_name,
@@ -76,6 +78,8 @@
 STRING :qlock: (Legacy queue locks that are no longer used)
 """
 
+# TODO - mention newly added keys
+
 
 class TaskTiger:
     def __init__(
@@ -203,6 +207,11 @@ def init(self, connection=None, config=None, setup_structlog=False):
             # subscribe to the activity channel. Use for more efficient task
             # processing with a large amount of workers.
             "POLL_TASK_QUEUES_INTERVAL": 0,
+            # Set to a value > 0 (seconds) to reduce queue polling: queuing a
+            # task updates cache token keys with this expiry, and workers skip
+            # rescanning the queue set while the tokens are unchanged. Note
+            # that this stores an extra Redis key per queue prefix.
+            "POLL_CACHE_TOKEN_KEY_EXPIRY": 0,
             # Whether to publish new tasks to the activity channel. Only set to
             # False if all the workers are polling queues.
             "PUBLISH_QUEUED_TASKS": True,
@@ -256,6 +265,24 @@ def _get_current_tasks(self):
     current_task = property(_get_current_task)
     current_tasks = property(_get_current_tasks)
 
+    def _notify_task_queued(self, queue, client=None):
+        _client = client or self.connection.pipeline(transaction=False)
+
+        if self.config["PUBLISH_QUEUED_TASKS"]:
+            _client.publish(self._key("activity"), queue)
+
+        cache_token_expiry = self.config["POLL_CACHE_TOKEN_KEY_EXPIRY"]
+        if cache_token_expiry > 0:
+            for queue_part in list(dotted_parts(queue)) + [""]:
+                _client.set(
+                    self._key("queued_cache_token", queue_part),
+                    secrets.token_hex(),
+                    ex=cache_token_expiry,
+                )
+
+        if _client is not client:
+            _client.execute()
+
     @classproperty
     def current_instance(self):
         """
diff --git a/tasktiger/worker.py b/tasktiger/worker.py
index 27ade63a..60cfcdcc 100644
--- a/tasktiger/worker.py
+++ b/tasktiger/worker.py
@@ -14,7 +14,7 @@
 import uuid
 from collections import OrderedDict
 from contextlib import ExitStack
-from typing import List, Set
+from typing import List, Optional, Set, Tuple
 
 from redis.exceptions import LockError
 
@@ -86,6 +86,8 @@ def __init__(
         self._key = tiger._key
         self._did_work = True
         self._last_task_check = 0.0
+        self._next_forced_queue_poll = 0.0
+        self._queued_cache_tokens: Optional[Tuple[Optional[str], ...]] = None
         self.stats_thread = None
         self.id = str(uuid.uuid4())
 
@@ -221,8 +223,7 @@ def _worker_queue_scheduled_tasks(self) -> None:
             # XXX: ideally this would be in the same pipeline, but we only want
             # to announce if there was a result.
             if result:
-                if self.config["PUBLISH_QUEUED_TASKS"]:
-                    self.connection.publish(self._key("activity"), queue)
+                self.tiger._notify_task_queued(queue)
                 self._did_work = True
 
     def _poll_for_queues(self) -> None:
@@ -235,7 +236,38 @@ def _poll_for_queues(self) -> None:
         """
         if not self._did_work:
             time.sleep(self.config["POLL_TASK_QUEUES_INTERVAL"])
-        self._refresh_queue_set()
+
+        if self._should_poll_for_queues():
+            self._refresh_queue_set()
+
+    def _should_poll_for_queues(self) -> bool:
+        cache_token_expiry = self.tiger.config["POLL_CACHE_TOKEN_KEY_EXPIRY"]
+        if cache_token_expiry <= 0:
+            return True
+
+        next_forced_queue_poll = self._next_forced_queue_poll
+        self._next_forced_queue_poll = time.monotonic() + cache_token_expiry
+
+        queued_cache_tokens = self._get_queued_cache_tokens()
+        if queued_cache_tokens != self._queued_cache_tokens:
+            self._queued_cache_tokens = queued_cache_tokens
+            return True
+
+        # Ensure that we poll queues when we haven't retrieved the cache
+        # tokens for at least as long as their expiry time.
+        if time.monotonic() >= next_forced_queue_poll:
+            self.log.info("Forcing a queue poll due to inactivity.")
+            return True
+
+        return False
+
+    def _get_queued_cache_tokens(self) -> Tuple[Optional[str], ...]:
+        keys = sorted(
+            self._key("queued_cache_token", queue)
+            for queue in self.only_queues or [""]
+        )
+
+        return tuple(self.connection.mget(keys))
 
     def _pubsub_for_queues(self, timeout=0, batch_timeout=0) -> None:
         """
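
A minimal usage sketch, not part of the patch above: it shows how the new
POLL_CACHE_TOKEN_KEY_EXPIRY setting could be enabled alongside queue polling.
The Redis connection arguments and the interval/expiry values are illustrative
assumptions, not taken from this diff.

    from redis import Redis
    from tasktiger import TaskTiger

    tiger = TaskTiger(
        connection=Redis(decode_responses=True),  # assumed local Redis
        config={
            # Workers poll for queues every few seconds instead of (or in
            # addition to) subscribing to the activity channel.
            "POLL_TASK_QUEUES_INTERVAL": 5,
            # With a token expiry > 0, workers compare cached queue tokens
            # and skip the full queue scan when nothing has been queued
            # since the previous poll.
            "POLL_CACHE_TOKEN_KEY_EXPIRY": 60,
        },
    )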