Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ jobs:
run: psql --echo-errors --quiet -c '\timing off' -c "CREATE DATABASE ${TEST_DATABASE_NAME};" ${ADMIN_DATABASE_URL}

- name: river migrate-up
run: river migrate-up --database-url "$TEST_DATABASE_URL" --max-steps 5 # temporarily include max steps so tests can pass with unique fixes
run: river migrate-up --database-url "$TEST_DATABASE_URL"

- name: Test
run: rye test
Expand Down Expand Up @@ -109,7 +109,7 @@ jobs:
run: psql --echo-errors --quiet -c '\timing off' -c "CREATE DATABASE ${DATABASE_NAME};" ${ADMIN_DATABASE_URL}

- name: river migrate-up
run: river migrate-up --database-url "$DATABASE_URL" --max-steps 5 # temporarily include max steps so tests can pass with unique fixes
run: river migrate-up --database-url "$DATABASE_URL"

- name: Run examples
run: rye run python3 -m examples.all
Expand Down
24 changes: 23 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,28 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Breaking

- **Breaking change:** The return type of `Client#insert_many` and `Client#insert_many_tx` has been changed. Rather than returning just the number of rows inserted, it returns an array of all the `InsertResult` values for each inserted row. Unique conflicts which are skipped as duplicates are indicated in the same fashion as single inserts (the `unique_skipped_as_duplicated` attribute), and in such cases the conflicting row will be returned instead. [PR #38](https://github.com/riverqueue/riverqueue-python/pull/38).
- **Breaking change:** Unique jobs no longer allow total customization of their states when using the `by_state` option. The pending, scheduled, available, and running states are required whenever customizing this list.

### Added

- The `UniqueOpts` class gains an `exclude_kind` option for cases where uniqueness needs to be guaranteed across multiple job types.
- Unique jobs utilizing `by_args` can now also opt to have a subset of the job's arguments considered for uniqueness. For example, you could choose to consider only the `customer_id` field while ignoring the other fields:

```python
UniqueOpts(by_args=["customer_id"])
```

Any fields considered in uniqueness are also sorted alphabetically in order to guarantee a consistent result across implementations, even if the encoded JSON isn't sorted consistently.

### Changed

- Unique jobs have been improved to allow bulk insertion of unique jobs via `Client#insert_many`.

This updated implementation is significantly faster due to the removal of advisory locks in favor of an index-backed uniqueness system, while allowing some flexibility in which job states are considered. However, not all states may be removed from consideration when using the `by_state` option; pending, scheduled, available, and running states are required whenever customizing this list.

## [0.7.0] - 2024-07-30

### Changed
Expand Down Expand Up @@ -79,4 +101,4 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- Initial release, supporting insertion through [SQLAlchemy](https://www.sqlalchemy.org/) and its underlying Postgres drivers like [`psycopg2`](https://pypi.org/project/psycopg2/) or [`asyncpg`](https://github.com/MagicStack/asyncpg) (for async).
- Initial release, supporting insertion through [SQLAlchemy](https://www.sqlalchemy.org/) and its underlying Postgres drivers like [`psycopg2`](https://pypi.org/project/psycopg2/) or [`asyncpg`](https://github.com/MagicStack/asyncpg) (for async).
18 changes: 5 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ class JobArgs(Protocol):
pass
```

* `kind` is a unique string that identifies them the job in the database, and which a Go worker will recognize.
* `to_json()` defines how the job will serialize to JSON, which of course will have to be parseable as an object in Go.
- `kind` is a unique string that identifies them the job in the database, and which a Go worker will recognize.
- `to_json()` defines how the job will serialize to JSON, which of course will have to be parseable as an object in Go.

They may also respond to `insert_opts()` with an instance of `InsertOpts` to define insertion options that'll be used for all jobs of the kind.

Expand Down Expand Up @@ -95,22 +95,14 @@ insert_res.job
insert_res.unique_skipped_as_duplicated
```

### Custom advisory lock prefix

Unique job insertion takes a Postgres advisory lock to make sure that its uniqueness check still works even if two conflicting insert operations are occurring in parallel. Postgres advisory locks share a global 64-bit namespace, which is a large enough space that it's unlikely for two advisory locks to ever conflict, but to _guarantee_ that River's advisory locks never interfere with an application's, River can be configured with a 32-bit advisory lock prefix which it will use for all its locks:

```python
client = riverqueue.Client(riversqlalchemy.Driver(engine), advisory_lock_prefix=123456)
```

Doing so has the downside of leaving only 32 bits for River's locks (64 bits total - 32-bit prefix), making them somewhat more likely to conflict with each other.
Unique jobs can also be inserted in bulk.

## Inserting jobs in bulk

Use `#insert_many` to bulk insert jobs as a single operation for improved efficiency:

```python
num_inserted = client.insert_many([
results = client.insert_many([
SimpleArgs(job_num=1),
SimpleArgs(job_num=2)
])
Expand All @@ -119,7 +111,7 @@ num_inserted = client.insert_many([
Or with `InsertManyParams`, which may include insertion options:

```python
num_inserted = client.insert_many([
results = client.insert_many([
InsertManyParams(args=SimpleArgs(job_num=1), insert_opts=riverqueue.InsertOpts(max_attempts=5)),
InsertManyParams(args=SimpleArgs(job_num=2), insert_opts=riverqueue.InsertOpts(queue="high_priority"))
])
Expand Down
1 change: 1 addition & 0 deletions requirements-dev.lock
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
# all-features: false
# with-sources: false
# generate-hashes: false
# universal: false

-e file:.
asyncpg==0.29.0
Expand Down
1 change: 1 addition & 0 deletions requirements.lock
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
# all-features: false
# with-sources: false
# generate-hashes: false
# universal: false

-e file:.
sqlalchemy==2.0.30
Expand Down
Loading
Loading