Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 6 additions & 9 deletions arq/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from dataclasses import dataclass
from datetime import datetime, timedelta
from operator import attrgetter
from typing import TYPE_CHECKING, Any, Callable, List, Optional, Tuple, Union, cast
from typing import TYPE_CHECKING, Any, Callable, Optional, Union, cast
from urllib.parse import parse_qs, urlparse
from uuid import uuid4

Expand All @@ -28,7 +28,7 @@ class RedisSettings:
Used by :func:`arq.connections.create_pool` and :class:`arq.worker.Worker`.
"""

host: Union[str, List[Tuple[str, int]]] = 'localhost'
host: Union[str, list[tuple[str, int]]] = 'localhost'
port: int = 6379
unix_socket_path: Optional[str] = None
database: int = 0
Expand All @@ -50,7 +50,7 @@ class RedisSettings:
sentinel_master: str = 'mymaster'

retry_on_timeout: bool = False
retry_on_error: Optional[List[Exception]] = None
retry_on_error: Optional[list[Exception]] = None
retry: Optional[Retry] = None

@classmethod
Expand Down Expand Up @@ -189,7 +189,7 @@ async def _get_job_result(self, key: bytes) -> JobResult:
r.job_id = job_id
return r

async def all_job_results(self) -> List[JobResult]:
async def all_job_results(self) -> list[JobResult]:
"""
Get results for all jobs in redis.
"""
Expand All @@ -207,7 +207,7 @@ async def _get_job_def(self, job_id: bytes, score: int) -> JobDef:
jd.job_id = job_id.decode()
return jd

async def queued_jobs(self, *, queue_name: Optional[str] = None) -> List[JobDef]:
async def queued_jobs(self, *, queue_name: Optional[str] = None) -> list[JobDef]:
"""
Get information about queued, mostly useful when testing.
"""
Expand Down Expand Up @@ -312,8 +312,5 @@ async def log_redis_info(redis: 'Redis[bytes]', log_func: Callable[[str], Any])
clients_connected = info_clients.get('connected_clients', '?')

log_func(
f'redis_version={redis_version} '
f'mem_usage={mem_usage} '
f'clients_connected={clients_connected} '
f'db_keys={key_count}'
f'redis_version={redis_version} mem_usage={mem_usage} clients_connected={clients_connected} db_keys={key_count}'
)
20 changes: 10 additions & 10 deletions arq/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Any, Callable, Dict, Optional, Tuple
from typing import Any, Callable, Optional

from redis.asyncio import Redis

Expand All @@ -14,8 +14,8 @@

logger = logging.getLogger('arq.jobs')

Serializer = Callable[[Dict[str, Any]], bytes]
Deserializer = Callable[[bytes], Dict[str, Any]]
Serializer = Callable[[dict[str, Any]], bytes]
Deserializer = Callable[[bytes], dict[str, Any]]


class ResultNotFound(RuntimeError):
Expand All @@ -42,8 +42,8 @@ class JobStatus(str, Enum):
@dataclass
class JobDef:
function: str
args: Tuple[Any, ...]
kwargs: Dict[str, Any]
args: tuple[Any, ...]
kwargs: dict[str, Any]
job_try: int
enqueue_time: datetime
score: Optional[int]
Expand Down Expand Up @@ -210,8 +210,8 @@ class DeserializationError(SerializationError):

def serialize_job(
function_name: str,
args: Tuple[Any, ...],
kwargs: Dict[str, Any],
args: tuple[Any, ...],
kwargs: dict[str, Any],
job_try: Optional[int],
enqueue_time_ms: int,
*,
Expand All @@ -228,8 +228,8 @@ def serialize_job(

def serialize_result(
function: str,
args: Tuple[Any, ...],
kwargs: Dict[str, Any],
args: tuple[Any, ...],
kwargs: dict[str, Any],
job_try: int,
enqueue_time_ms: int,
success: bool,
Expand Down Expand Up @@ -291,7 +291,7 @@ def deserialize_job(r: bytes, *, deserializer: Optional[Deserializer] = None) ->

def deserialize_job_raw(
r: bytes, *, deserializer: Optional[Deserializer] = None
) -> Tuple[str, Tuple[Any, ...], Dict[str, Any], int, int]:
) -> tuple[str, tuple[Any, ...], dict[str, Any], int, int]:
if deserializer is None:
deserializer = pickle.loads
try:
Expand Down
4 changes: 2 additions & 2 deletions arq/logs.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from typing import Any, Dict
from typing import Any


def default_log_config(verbose: bool) -> Dict[str, Any]:
def default_log_config(verbose: bool) -> dict[str, Any]:
"""
Setup default config. for dictConfig.

Expand Down
11 changes: 6 additions & 5 deletions arq/typing.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from collections.abc import Sequence
from datetime import timedelta
from typing import TYPE_CHECKING, Any, Dict, Literal, Optional, Protocol, Sequence, Set, Type, Union
from typing import TYPE_CHECKING, Any, Literal, Optional, Protocol, Union

__all__ = (
'OptionType',
Expand All @@ -16,7 +17,7 @@
from .cron import CronJob
from .worker import Function

OptionType = Union[None, Set[int], int]
OptionType = Union[None, set[int], int]
WEEKDAYS = 'mon', 'tues', 'wed', 'thurs', 'fri', 'sat', 'sun'
WeekdayOptionType = Union[OptionType, Literal['mon', 'tues', 'wed', 'thurs', 'fri', 'sat', 'sun']]
SecondsTimedelta = Union[int, float, timedelta]
Expand All @@ -25,14 +26,14 @@
class WorkerCoroutine(Protocol):
__qualname__: str

async def __call__(self, ctx: Dict[Any, Any], *args: Any, **kwargs: Any) -> Any: # pragma: no cover
async def __call__(self, ctx: dict[Any, Any], *args: Any, **kwargs: Any) -> Any: # pragma: no cover
pass


class StartupShutdown(Protocol):
__qualname__: str

async def __call__(self, ctx: Dict[Any, Any]) -> Any: # pragma: no cover
async def __call__(self, ctx: dict[Any, Any]) -> Any: # pragma: no cover
pass


Expand All @@ -44,4 +45,4 @@ class WorkerSettingsBase(Protocol):
# and many more...


WorkerSettingsType = Union[Dict[str, Any], Type[WorkerSettingsBase]]
WorkerSettingsType = Union[dict[str, Any], type[WorkerSettingsBase]]
17 changes: 10 additions & 7 deletions arq/utils.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,25 @@
import asyncio
import logging
import os
from collections.abc import AsyncGenerator, Sequence
from datetime import datetime, timedelta, timezone
from functools import lru_cache
from time import time
from typing import TYPE_CHECKING, Any, AsyncGenerator, Dict, Optional, Sequence, overload
from typing import TYPE_CHECKING, Any, Optional, overload

from .constants import timezone_env_vars

try:
import pytz
except ImportError: # pragma: no cover
pytz = None # type: ignore

logger = logging.getLogger('arq.utils')

if TYPE_CHECKING:
import pytz

from .typing import SecondsTimedelta
else:
try:
import pytz
except ImportError: # pragma: no cover
pytz = None # type: ignore


def as_int(f: float) -> int:
Expand Down Expand Up @@ -121,7 +124,7 @@ def truncate(s: str, length: int = DEFAULT_CURTAIL) -> str:
return s


def args_to_string(args: Sequence[Any], kwargs: Dict[str, Any]) -> str:
def args_to_string(args: Sequence[Any], kwargs: dict[str, Any]) -> str:
arguments = ''
if args:
arguments = ', '.join(map(repr, args))
Expand Down
29 changes: 15 additions & 14 deletions arq/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@
import inspect
import logging
import signal
from collections.abc import Sequence
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from functools import partial
from signal import Signals
from time import time
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Sequence, Set, Tuple, Union, cast
from typing import TYPE_CHECKING, Any, Callable, Optional, Union, cast

from redis.exceptions import ResponseError, WatchError

Expand Down Expand Up @@ -81,7 +82,7 @@ def func(

if isinstance(coroutine, str):
name = name or coroutine
coroutine_: 'WorkerCoroutine' = import_string(coroutine)
coroutine_: WorkerCoroutine = import_string(coroutine)
else:
coroutine_ = coroutine

Expand Down Expand Up @@ -118,7 +119,7 @@ def __eq__(self, other: Any) -> bool:


class FailedJobs(RuntimeError):
def __init__(self, count: int, job_results: List[JobResult]):
def __init__(self, count: int, job_results: list[JobResult]):
self.count = count
self.job_results = job_results

Expand Down Expand Up @@ -208,7 +209,7 @@ def __init__(
max_tries: int = 5,
health_check_interval: 'SecondsTimedelta' = 3600,
health_check_key: Optional[str] = None,
ctx: Optional[Dict[Any, Any]] = None,
ctx: Optional[dict[Any, Any]] = None,
retry_jobs: bool = True,
allow_abort_jobs: bool = False,
max_burst_jobs: int = -1,
Expand All @@ -218,14 +219,14 @@ def __init__(
timezone: Optional[timezone] = None,
log_results: bool = True,
):
self.functions: Dict[str, Union[Function, CronJob]] = {f.name: f for f in map(func, functions)}
self.functions: dict[str, Union[Function, CronJob]] = {f.name: f for f in map(func, functions)}
if queue_name is None:
if redis_pool is not None:
queue_name = redis_pool.default_queue_name
else:
raise ValueError('If queue_name is absent, redis_pool must be present.')
self.queue_name = queue_name
self.cron_jobs: List[CronJob] = []
self.cron_jobs: list[CronJob] = []
if cron_jobs is not None:
if not all(isinstance(cj, CronJob) for cj in cron_jobs):
raise RuntimeError('cron_jobs, must be instances of CronJob')
Expand Down Expand Up @@ -262,9 +263,9 @@ def __init__(
else:
self.redis_settings = None
# self.tasks holds references to run_job coroutines currently running
self.tasks: Dict[str, asyncio.Task[Any]] = {}
self.tasks: dict[str, asyncio.Task[Any]] = {}
# self.job_tasks holds references the actual jobs running
self.job_tasks: Dict[str, asyncio.Task[Any]] = {}
self.job_tasks: dict[str, asyncio.Task[Any]] = {}
self.main_task: Optional[asyncio.Task[None]] = None
self.loop = asyncio.get_event_loop()
self.ctx = ctx or {}
Expand All @@ -289,7 +290,7 @@ def __init__(
self.retry_jobs = retry_jobs
self.allow_abort_jobs = allow_abort_jobs
self.allow_pick_jobs: bool = True
self.aborting_tasks: Set[str] = set()
self.aborting_tasks: set[str] = set()
self.max_burst_jobs = max_burst_jobs
self.job_serializer = job_serializer
self.job_deserializer = job_deserializer
Expand Down Expand Up @@ -409,7 +410,7 @@ async def _cancel_aborted_jobs(self) -> None:
pipe.zremrangebyscore(abort_jobs_ss, min=timestamp_ms() + abort_job_max_age, max=float('inf'))
abort_job_ids, _ = await pipe.execute()

aborted: Set[str] = set()
aborted: set[str] = set()
for job_id_bytes in abort_job_ids:
job_id = job_id_bytes.decode()
try:
Expand All @@ -428,7 +429,7 @@ def _release_sem_dec_counter_on_complete(self) -> None:
self.job_counter = self.job_counter - 1
self.sem.release()

async def start_jobs(self, job_ids: List[bytes]) -> None:
async def start_jobs(self, job_ids: list[bytes]) -> None:
"""
For each job id, get the job definition, check it's not running and start it in a task
"""
Expand Down Expand Up @@ -484,8 +485,8 @@ async def run_job(self, job_id: str, score: int) -> None: # noqa: C901
abort_job = False

function_name, enqueue_time_ms = '<unknown>', 0
args: Tuple[Any, ...] = ()
kwargs: Dict[Any, Any] = {}
args: tuple[Any, ...] = ()
kwargs: dict[Any, Any] = {}

async def job_failed(exc: BaseException) -> None:
self.jobs_failed += 1
Expand Down Expand Up @@ -879,7 +880,7 @@ def __repr__(self) -> str:
)


def get_kwargs(settings_cls: 'WorkerSettingsType') -> Dict[str, NameError]:
def get_kwargs(settings_cls: 'WorkerSettingsType') -> dict[str, NameError]:
worker_args = set(inspect.signature(Worker).parameters.keys())
d = settings_cls if isinstance(settings_cls, dict) else settings_cls.__dict__
return {k: v for k, v in d.items() if k in worker_args}
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ exclude_lines = [
]

[tool.ruff]
target-version = "py39"
line-length = 120

[tool.ruff.lint]
Expand Down
2 changes: 1 addition & 1 deletion requirements/linting.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ mypy-extensions==1.0.0
# via mypy
pycparser==2.22
# via cffi
ruff==0.3.4
ruff==0.14.14
# via -r requirements/linting.in
tomli==2.4.0
# via mypy
Expand Down
2 changes: 1 addition & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import functools
import os
import sys
from typing import Generator
from collections.abc import Generator

import msgpack
import pytest
Expand Down
2 changes: 1 addition & 1 deletion tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ async def test_job_old(arq_redis: ArqRedis, worker, caplog):
assert worker.jobs_retried == 0

log = re.sub(r'(\d+).\d\ds', r'\1.XXs', '\n'.join(r.message for r in caplog.records))
assert log.endswith(' 0.XXs → testing:foobar() delayed=2.XXs\n' ' 0.XXs ← testing:foobar ● 42')
assert log.endswith(' 0.XXs → testing:foobar() delayed=2.XXs\n 0.XXs ← testing:foobar ● 42')


async def test_retry_repr():
Expand Down