Skip to content

Commit 9565b9c

Browse files
authored
Better structure by putting types in more appropriate files and reexporting (#8)
Follows up #5 to keep its API, but undo a lot of the file structure changes. A big downside of #5 is that a lot o the core code ends up in `__init__.py` files, which are really hard to fast find because they all have the same name, and generally just awkward to work with. Python docs make this impossible to find, but I found a trick in SQLAlchemy where it's possible to reexport types from a module using an `as` in an `import` like this: # Reexport for more ergonomic use in calling code. from .driver_protocol import ( GetParams as GetParams, JobInsertParams as JobInsertParams, DriverProtocol as DriverProtocol, ) Here, move code back into files with names better suited for it (e.g. `Client` goes back in `client.py`) and use this trick to reexport user-facing types so that we keep the same import ergonomic benefits as laid out in #5.
1 parent 6214d1b commit 9565b9c

File tree

6 files changed

+290
-275
lines changed

6 files changed

+290
-275
lines changed

src/riverqueue/__init__.py

Lines changed: 7 additions & 189 deletions
Original file line numberDiff line numberDiff line change
@@ -1,189 +1,7 @@
1-
from dataclasses import dataclass
2-
from datetime import datetime, timezone, timedelta
3-
from typing import Any, Optional, Protocol, Tuple, List, Callable
4-
5-
from .driver import GetParams, JobInsertParams, DriverProtocol
6-
from .model import InsertResult
7-
from .fnv import fnv1_hash
8-
9-
MAX_ATTEMPTS_DEFAULT = 25
10-
PRIORITY_DEFAULT = 1
11-
QUEUE_DEFAULT = "default"
12-
13-
DEFAULT_UNIQUE_STATES = ["available", "completed", "running", "retryable", "scheduled"]
14-
15-
16-
class Args(Protocol):
17-
kind: str
18-
19-
def to_json(self) -> str:
20-
pass
21-
22-
23-
@dataclass
24-
class InsertManyParams:
25-
args: Args
26-
insert_opts: Optional["InsertOpts"] = None
27-
28-
29-
@dataclass
30-
class InsertOpts:
31-
scheduled_at: Optional[datetime] = None
32-
unique_opts: Optional["UniqueOpts"] = None
33-
max_attempts: Optional[int] = None
34-
priority: Optional[int] = None
35-
queue: Optional[str] = None
36-
tags: Optional[List[Any]] = None
37-
38-
39-
@dataclass
40-
class UniqueOpts:
41-
by_args: Optional[Any] = None
42-
by_period: Optional[Any] = None
43-
by_queue: Optional[Any] = None
44-
by_state: Optional[Any] = None
45-
46-
47-
class Client:
48-
def __init__(self, driver: DriverProtocol, advisory_lock_prefix=None):
49-
self.driver = driver
50-
self.advisory_lock_prefix = advisory_lock_prefix
51-
self.time_now_utc = lambda: datetime.now(timezone.utc) # for test time stubbing
52-
53-
def insert(
54-
self, args: Args, insert_opts: Optional[InsertOpts] = None
55-
) -> InsertResult:
56-
if not insert_opts:
57-
insert_opts = InsertOpts()
58-
insert_params, unique_opts = self.__make_insert_params(args, insert_opts)
59-
60-
def insert():
61-
print(self.driver)
62-
return InsertResult(self.driver.job_insert(insert_params))
63-
64-
return self.__check_unique_job(insert_params, unique_opts, insert)
65-
66-
def insert_many(self, args: List[Args]) -> List[InsertResult]:
67-
all_params = [
68-
self.__make_insert_params(
69-
arg.args, arg.insert_opts or InsertOpts(), is_insert_many=True
70-
)[0]
71-
if isinstance(arg, InsertManyParams)
72-
else self.__make_insert_params(arg, InsertOpts(), is_insert_many=True)[0]
73-
for arg in args
74-
]
75-
return [InsertResult(x) for x in self.driver.job_insert_many(all_params)]
76-
77-
def __check_unique_job(
78-
self,
79-
insert_params: JobInsertParams,
80-
unique_opts: Optional[UniqueOpts],
81-
insert_func: Callable[[], InsertResult],
82-
) -> InsertResult:
83-
if unique_opts is None:
84-
return insert_func()
85-
86-
any_unique_opts = False
87-
get_params = GetParams(kind=insert_params.kind)
88-
89-
lock_str = f"unique_keykind={insert_params.kind}"
90-
91-
if unique_opts.by_args:
92-
any_unique_opts = True
93-
get_params.by_args = True
94-
get_params.args = insert_params.args
95-
lock_str += f"&args={insert_params.args}"
96-
97-
if unique_opts.by_period:
98-
lower_period_bound = self.__truncate_time(
99-
self.time_now_utc(), unique_opts.by_period
100-
)
101-
102-
any_unique_opts = True
103-
get_params.by_created_at = True
104-
get_params.created_at = [
105-
lower_period_bound,
106-
lower_period_bound + timedelta(seconds=unique_opts.by_period),
107-
]
108-
lock_str += f"&period={lower_period_bound.strftime('%FT%TZ')}"
109-
110-
if unique_opts.by_queue:
111-
any_unique_opts = True
112-
get_params.by_queue = True
113-
get_params.queue = insert_params.queue
114-
lock_str += f"&queue={insert_params.queue}"
115-
116-
if unique_opts.by_state:
117-
any_unique_opts = True
118-
get_params.by_state = True
119-
get_params.state = unique_opts.by_state
120-
lock_str += f"&state={','.join(unique_opts.by_state)}"
121-
else:
122-
get_params.state = DEFAULT_UNIQUE_STATES
123-
lock_str += f"&state={','.join(DEFAULT_UNIQUE_STATES)}"
124-
125-
if not any_unique_opts:
126-
return insert_func()
127-
128-
with self.driver.transaction():
129-
if self.advisory_lock_prefix is None:
130-
lock_key = fnv1_hash(lock_str.encode("utf-8"), 64)
131-
else:
132-
prefix = self.advisory_lock_prefix
133-
lock_key = (prefix << 32) | fnv1_hash(lock_str.encode("utf-8"), 32)
134-
135-
lock_key = self.__uint64_to_int64(lock_key)
136-
self.driver.advisory_lock(lock_key)
137-
138-
existing_job = self.driver.job_get_by_kind_and_unique_properties(get_params)
139-
if existing_job:
140-
return InsertResult(existing_job, unique_skipped_as_duplicated=True)
141-
142-
return insert_func()
143-
144-
@staticmethod
145-
def __make_insert_params(
146-
args: Args, insert_opts: InsertOpts, is_insert_many: bool = False
147-
) -> Tuple[JobInsertParams, Optional[UniqueOpts]]:
148-
if not hasattr(args, "kind"):
149-
raise Exception("args should respond to `kind`")
150-
151-
args_json = args.to_json()
152-
if args_json is None:
153-
raise Exception("args should return non-nil from `to_json`")
154-
155-
args_insert_opts = getattr(args, "insert_opts", InsertOpts())
156-
157-
scheduled_at = insert_opts.scheduled_at or args_insert_opts.scheduled_at
158-
unique_opts = insert_opts.unique_opts or args_insert_opts.unique_opts
159-
160-
if is_insert_many and unique_opts:
161-
raise ValueError("unique opts can't be used with `insert_many`")
162-
163-
insert_params = JobInsertParams(
164-
args=args_json,
165-
kind=args.kind,
166-
max_attempts=insert_opts.max_attempts
167-
or args_insert_opts.max_attempts
168-
or MAX_ATTEMPTS_DEFAULT,
169-
priority=insert_opts.priority
170-
or args_insert_opts.priority
171-
or PRIORITY_DEFAULT,
172-
queue=insert_opts.queue or args_insert_opts.queue or QUEUE_DEFAULT,
173-
scheduled_at=scheduled_at and scheduled_at.astimezone(timezone.utc),
174-
state="scheduled" if scheduled_at else "available",
175-
tags=insert_opts.tags or args_insert_opts.tags,
176-
)
177-
178-
return insert_params, unique_opts
179-
180-
@staticmethod
181-
def __truncate_time(time, interval_seconds) -> datetime:
182-
return datetime.fromtimestamp(
183-
(time.timestamp() // interval_seconds) * interval_seconds, tz=timezone.utc
184-
)
185-
186-
@staticmethod
187-
def __uint64_to_int64(uint64):
188-
# Packs a uint64 then unpacks to int64 to fit within Postgres bigint
189-
return (uint64 + (1 << 63)) % (1 << 64) - (1 << 63)
1+
# Reexport for more ergonomic use in calling code.
2+
from .client import (
3+
Args as Args,
4+
Client as Client,
5+
InsertOpts as InsertOpts,
6+
UniqueOpts as UniqueOpts,
7+
)

src/riverqueue/client.py

Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
from dataclasses import dataclass
2+
from datetime import datetime, timezone, timedelta
3+
from typing import Any, Optional, Protocol, Tuple, List, Callable
4+
5+
from .driver import GetParams, JobInsertParams, DriverProtocol
6+
from .model import InsertResult
7+
from .fnv import fnv1_hash
8+
9+
MAX_ATTEMPTS_DEFAULT = 25
10+
PRIORITY_DEFAULT = 1
11+
QUEUE_DEFAULT = "default"
12+
13+
DEFAULT_UNIQUE_STATES = ["available", "completed", "running", "retryable", "scheduled"]
14+
15+
16+
class Args(Protocol):
17+
kind: str
18+
19+
def to_json(self) -> str:
20+
pass
21+
22+
23+
@dataclass
24+
class InsertManyParams:
25+
args: Args
26+
insert_opts: Optional["InsertOpts"] = None
27+
28+
29+
@dataclass
30+
class InsertOpts:
31+
scheduled_at: Optional[datetime] = None
32+
unique_opts: Optional["UniqueOpts"] = None
33+
max_attempts: Optional[int] = None
34+
priority: Optional[int] = None
35+
queue: Optional[str] = None
36+
tags: Optional[List[Any]] = None
37+
38+
39+
@dataclass
40+
class UniqueOpts:
41+
by_args: Optional[Any] = None
42+
by_period: Optional[Any] = None
43+
by_queue: Optional[Any] = None
44+
by_state: Optional[Any] = None
45+
46+
47+
class Client:
48+
def __init__(self, driver: DriverProtocol, advisory_lock_prefix=None):
49+
self.driver = driver
50+
self.advisory_lock_prefix = advisory_lock_prefix
51+
self.time_now_utc = lambda: datetime.now(timezone.utc) # for test time stubbing
52+
53+
def insert(
54+
self, args: Args, insert_opts: Optional[InsertOpts] = None
55+
) -> InsertResult:
56+
if not insert_opts:
57+
insert_opts = InsertOpts()
58+
insert_params, unique_opts = self.__make_insert_params(args, insert_opts)
59+
60+
def insert():
61+
print(self.driver)
62+
return InsertResult(self.driver.job_insert(insert_params))
63+
64+
return self.__check_unique_job(insert_params, unique_opts, insert)
65+
66+
def insert_many(self, args: List[Args]) -> List[InsertResult]:
67+
all_params = [
68+
self.__make_insert_params(
69+
arg.args, arg.insert_opts or InsertOpts(), is_insert_many=True
70+
)[0]
71+
if isinstance(arg, InsertManyParams)
72+
else self.__make_insert_params(arg, InsertOpts(), is_insert_many=True)[0]
73+
for arg in args
74+
]
75+
return [InsertResult(x) for x in self.driver.job_insert_many(all_params)]
76+
77+
def __check_unique_job(
78+
self,
79+
insert_params: JobInsertParams,
80+
unique_opts: Optional[UniqueOpts],
81+
insert_func: Callable[[], InsertResult],
82+
) -> InsertResult:
83+
if unique_opts is None:
84+
return insert_func()
85+
86+
any_unique_opts = False
87+
get_params = GetParams(kind=insert_params.kind)
88+
89+
lock_str = f"unique_keykind={insert_params.kind}"
90+
91+
if unique_opts.by_args:
92+
any_unique_opts = True
93+
get_params.by_args = True
94+
get_params.args = insert_params.args
95+
lock_str += f"&args={insert_params.args}"
96+
97+
if unique_opts.by_period:
98+
lower_period_bound = self.__truncate_time(
99+
self.time_now_utc(), unique_opts.by_period
100+
)
101+
102+
any_unique_opts = True
103+
get_params.by_created_at = True
104+
get_params.created_at = [
105+
lower_period_bound,
106+
lower_period_bound + timedelta(seconds=unique_opts.by_period),
107+
]
108+
lock_str += f"&period={lower_period_bound.strftime('%FT%TZ')}"
109+
110+
if unique_opts.by_queue:
111+
any_unique_opts = True
112+
get_params.by_queue = True
113+
get_params.queue = insert_params.queue
114+
lock_str += f"&queue={insert_params.queue}"
115+
116+
if unique_opts.by_state:
117+
any_unique_opts = True
118+
get_params.by_state = True
119+
get_params.state = unique_opts.by_state
120+
lock_str += f"&state={','.join(unique_opts.by_state)}"
121+
else:
122+
get_params.state = DEFAULT_UNIQUE_STATES
123+
lock_str += f"&state={','.join(DEFAULT_UNIQUE_STATES)}"
124+
125+
if not any_unique_opts:
126+
return insert_func()
127+
128+
with self.driver.transaction():
129+
if self.advisory_lock_prefix is None:
130+
lock_key = fnv1_hash(lock_str.encode("utf-8"), 64)
131+
else:
132+
prefix = self.advisory_lock_prefix
133+
lock_key = (prefix << 32) | fnv1_hash(lock_str.encode("utf-8"), 32)
134+
135+
lock_key = self.__uint64_to_int64(lock_key)
136+
self.driver.advisory_lock(lock_key)
137+
138+
existing_job = self.driver.job_get_by_kind_and_unique_properties(get_params)
139+
if existing_job:
140+
return InsertResult(existing_job, unique_skipped_as_duplicated=True)
141+
142+
return insert_func()
143+
144+
@staticmethod
145+
def __make_insert_params(
146+
args: Args, insert_opts: InsertOpts, is_insert_many: bool = False
147+
) -> Tuple[JobInsertParams, Optional[UniqueOpts]]:
148+
if not hasattr(args, "kind"):
149+
raise Exception("args should respond to `kind`")
150+
151+
args_json = args.to_json()
152+
if args_json is None:
153+
raise Exception("args should return non-nil from `to_json`")
154+
155+
args_insert_opts = getattr(args, "insert_opts", InsertOpts())
156+
157+
scheduled_at = insert_opts.scheduled_at or args_insert_opts.scheduled_at
158+
unique_opts = insert_opts.unique_opts or args_insert_opts.unique_opts
159+
160+
if is_insert_many and unique_opts:
161+
raise ValueError("unique opts can't be used with `insert_many`")
162+
163+
insert_params = JobInsertParams(
164+
args=args_json,
165+
kind=args.kind,
166+
max_attempts=insert_opts.max_attempts
167+
or args_insert_opts.max_attempts
168+
or MAX_ATTEMPTS_DEFAULT,
169+
priority=insert_opts.priority
170+
or args_insert_opts.priority
171+
or PRIORITY_DEFAULT,
172+
queue=insert_opts.queue or args_insert_opts.queue or QUEUE_DEFAULT,
173+
scheduled_at=scheduled_at and scheduled_at.astimezone(timezone.utc),
174+
state="scheduled" if scheduled_at else "available",
175+
tags=insert_opts.tags or args_insert_opts.tags,
176+
)
177+
178+
return insert_params, unique_opts
179+
180+
@staticmethod
181+
def __truncate_time(time, interval_seconds) -> datetime:
182+
return datetime.fromtimestamp(
183+
(time.timestamp() // interval_seconds) * interval_seconds, tz=timezone.utc
184+
)
185+
186+
@staticmethod
187+
def __uint64_to_int64(uint64):
188+
# Packs a uint64 then unpacks to int64 to fit within Postgres bigint
189+
return (uint64 + (1 << 63)) % (1 << 64) - (1 << 63)

0 commit comments

Comments
 (0)