Skip to content

Commit 1d9acfc

Browse files
authored
Reorganize exports for fewer and more succint imports in real use (#5)
This one's aimed at moving some types/files around so that a program using the package has a more succinct set of River imports. Types that are meant to be public-facing largely end up in `riverqueue/__init__.py`, and more internal types get moved elsewhere. So what used to be this: from riverqueue.client import Client from riverqueue.models import InsertOpts, UniqueOpts from riverqueue.drivers.sqlalchemy.sqlalchemy_driver import SqlAlchemyDriver client = Client(SqlAlchemyDriver(engine)) result = client.insert( MyJobArgs(), insert_opts=InsertOpts(unique_opts=UniqueOpts(by_period=900)), ) Becomes: from riverqueue import Client, InsertOpts, UniqueOpts from riverqueue.driver import riversqlalchemy client = Client(riversqlalchemy.Driver(engine)) result = client.insert( MyJobArgs(), insert_opts=InsertOpts(unique_opts=UniqueOpts(by_period=900)), ) (It's a subtle change to a degree, but see the import lines in particular.) There's a few other changes to clean up naming somewhat: * `sqlalchemy_driver.SqlAlchemyDriver` -> `riversqlalchemy.Driver` * `riverqueue.drivers` -> `riverqueue.driver` (plural to singular) to better match up other River projects. * `riverqueue.models` -> `riverqueue.model` (plural to singular) to match the change for drivers. * `Driver` -> `DriverProtocol` so there's no ambiguity problems where both it and a concrete driver implementation are imported together.
1 parent 7e86175 commit 1d9acfc

File tree

16 files changed

+240
-246
lines changed

16 files changed

+240
-246
lines changed

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
[project]
2-
name = "riverqueue-py"
2+
name = "riverqueue"
33
version = "0.1.0"
44
description = "Add your description here"
55
authors = [

src/riverqueue/__init__.py

Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
from dataclasses import dataclass
2+
from datetime import datetime, timezone, timedelta
3+
from typing import Any, Optional, Protocol, Tuple, List, Callable
4+
5+
from .driver import GetParams, JobInsertParams, DriverProtocol
6+
from .model import InsertResult
7+
from .fnv import fnv1_hash
8+
9+
MAX_ATTEMPTS_DEFAULT = 25
10+
PRIORITY_DEFAULT = 1
11+
QUEUE_DEFAULT = "default"
12+
13+
DEFAULT_UNIQUE_STATES = ["available", "completed", "running", "retryable", "scheduled"]
14+
15+
16+
class Args(Protocol):
17+
kind: str
18+
19+
def to_json(self) -> str:
20+
pass
21+
22+
23+
@dataclass
24+
class InsertManyParams:
25+
args: Args
26+
insert_opts: Optional["InsertOpts"] = None
27+
28+
29+
@dataclass
30+
class InsertOpts:
31+
scheduled_at: Optional[datetime] = None
32+
unique_opts: Optional["UniqueOpts"] = None
33+
max_attempts: Optional[int] = None
34+
priority: Optional[int] = None
35+
queue: Optional[str] = None
36+
tags: Optional[List[Any]] = None
37+
38+
39+
@dataclass
40+
class UniqueOpts:
41+
by_args: Optional[Any] = None
42+
by_period: Optional[Any] = None
43+
by_queue: Optional[Any] = None
44+
by_state: Optional[Any] = None
45+
46+
47+
class Client:
48+
def __init__(self, driver: DriverProtocol, advisory_lock_prefix=None):
49+
self.driver = driver
50+
self.advisory_lock_prefix = advisory_lock_prefix
51+
self.time_now_utc = lambda: datetime.now(timezone.utc) # for test time stubbing
52+
53+
def insert(
54+
self, args: Args, insert_opts: Optional[InsertOpts] = None
55+
) -> InsertResult:
56+
if not insert_opts:
57+
insert_opts = InsertOpts()
58+
insert_params, unique_opts = self.make_insert_params(args, insert_opts)
59+
60+
def insert():
61+
print(self.driver)
62+
return InsertResult(self.driver.job_insert(insert_params))
63+
64+
return self.check_unique_job(insert_params, unique_opts, insert)
65+
66+
def insert_many(self, args: List[Args]) -> List[InsertResult]:
67+
all_params = [
68+
self.make_insert_params(
69+
arg.args, arg.insert_opts or InsertOpts(), is_insert_many=True
70+
)[0]
71+
if isinstance(arg, InsertManyParams)
72+
else self.make_insert_params(arg, InsertOpts(), is_insert_many=True)[0]
73+
for arg in args
74+
]
75+
return [InsertResult(x) for x in self.driver.job_insert_many(all_params)]
76+
77+
def check_unique_job(
78+
self,
79+
insert_params: JobInsertParams,
80+
unique_opts: Optional[UniqueOpts],
81+
insert_func: Callable[[], InsertResult],
82+
) -> InsertResult:
83+
if unique_opts is None:
84+
return insert_func()
85+
86+
any_unique_opts = False
87+
get_params = GetParams(kind=insert_params.kind)
88+
89+
lock_str = f"unique_keykind={insert_params.kind}"
90+
91+
if unique_opts.by_args:
92+
any_unique_opts = True
93+
get_params.by_args = True
94+
get_params.args = insert_params.args
95+
lock_str += f"&args={insert_params.args}"
96+
97+
if unique_opts.by_period:
98+
lower_period_bound = self.truncate_time(
99+
self.time_now_utc(), unique_opts.by_period
100+
)
101+
102+
any_unique_opts = True
103+
get_params.by_created_at = True
104+
get_params.created_at = [
105+
lower_period_bound,
106+
lower_period_bound + timedelta(seconds=unique_opts.by_period),
107+
]
108+
lock_str += f"&period={lower_period_bound.strftime('%FT%TZ')}"
109+
110+
if unique_opts.by_queue:
111+
any_unique_opts = True
112+
get_params.by_queue = True
113+
get_params.queue = insert_params.queue
114+
lock_str += f"&queue={insert_params.queue}"
115+
116+
if unique_opts.by_state:
117+
any_unique_opts = True
118+
get_params.by_state = True
119+
get_params.state = unique_opts.by_state
120+
lock_str += f"&state={','.join(unique_opts.by_state)}"
121+
else:
122+
get_params.state = DEFAULT_UNIQUE_STATES
123+
lock_str += f"&state={','.join(DEFAULT_UNIQUE_STATES)}"
124+
125+
if not any_unique_opts:
126+
return insert_func()
127+
128+
with self.driver.transaction():
129+
if self.advisory_lock_prefix is None:
130+
lock_key = fnv1_hash(lock_str.encode("utf-8"), 64)
131+
else:
132+
prefix = self.advisory_lock_prefix
133+
lock_key = (prefix << 32) | fnv1_hash(lock_str.encode("utf-8"), 32)
134+
135+
lock_key = self.uint64_to_int64(lock_key)
136+
self.driver.advisory_lock(lock_key)
137+
138+
existing_job = self.driver.job_get_by_kind_and_unique_properties(get_params)
139+
if existing_job:
140+
return InsertResult(existing_job, unique_skipped_as_duplicated=True)
141+
142+
return insert_func()
143+
144+
@staticmethod
145+
def make_insert_params(
146+
args: Args, insert_opts: InsertOpts, is_insert_many: bool = False
147+
) -> Tuple[JobInsertParams, Optional[UniqueOpts]]:
148+
if not hasattr(args, "kind"):
149+
raise Exception("args should respond to `kind`")
150+
151+
args_json = args.to_json()
152+
if args_json is None:
153+
raise Exception("args should return non-nil from `to_json`")
154+
155+
args_insert_opts = getattr(args, "insert_opts", InsertOpts())
156+
157+
scheduled_at = insert_opts.scheduled_at or args_insert_opts.scheduled_at
158+
unique_opts = insert_opts.unique_opts or args_insert_opts.unique_opts
159+
160+
if is_insert_many and unique_opts:
161+
raise ValueError("unique opts can't be used with `insert_many`")
162+
163+
insert_params = JobInsertParams(
164+
args=args_json,
165+
kind=args.kind,
166+
max_attempts=insert_opts.max_attempts
167+
or args_insert_opts.max_attempts
168+
or MAX_ATTEMPTS_DEFAULT,
169+
priority=insert_opts.priority
170+
or args_insert_opts.priority
171+
or PRIORITY_DEFAULT,
172+
queue=insert_opts.queue or args_insert_opts.queue or QUEUE_DEFAULT,
173+
scheduled_at=scheduled_at and scheduled_at.astimezone(timezone.utc),
174+
state="scheduled" if scheduled_at else "available",
175+
tags=insert_opts.tags or args_insert_opts.tags,
176+
)
177+
178+
return insert_params, unique_opts
179+
180+
@staticmethod
181+
def truncate_time(time, interval_seconds) -> datetime:
182+
return datetime.fromtimestamp(
183+
(time.timestamp() // interval_seconds) * interval_seconds, tz=timezone.utc
184+
)
185+
186+
@staticmethod
187+
def uint64_to_int64(uint64):
188+
# Packs a uint64 then unpacks to int64 to fit within Postgres bigint
189+
return (uint64 + (1 << 63)) % (1 << 64) - (1 << 63)

src/riverqueue/client.py

Lines changed: 0 additions & 164 deletions
This file was deleted.

src/riverqueue/driver.py

Lines changed: 0 additions & 20 deletions
This file was deleted.

0 commit comments

Comments
 (0)