Skip to content

Commit 963e9bd

Browse files
authored
Implement #insert_many for batch job insertion (#5)
Implement `#insert_many` to support inserting jobs in bulk, similar to what the Go client supports. For better ergonomics, takes either an array of job args, or an array of `InsertManyParams`, which is a tuple of job args and insert opts (again, very similar to the Go API). Also, I seem to have found a way to reimplement the ActiveRecord driver so that it doesn't have to drop down to Arel, which ends up simplifying the code quite a bit and making it more alike the Sequel driver. Lastly, tighten up documentation somewhat by including code example for the important API methods.
1 parent a64617f commit 963e9bd

File tree

16 files changed

+492
-102
lines changed

16 files changed

+492
-102
lines changed

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## [Unreleased]
99

10+
## [0.2.0] - 2024-04-27
11+
12+
### Added
13+
14+
- Implement #insert_many for batch job insertion. [PR #5](https://github.com/riverqueue/riverqueue-ruby/pull/5).
15+
1016
## [0.1.0] - 2024-04-25
1117

1218
### Added

Makefile

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,5 +20,8 @@ standardrb:
2020
steep:
2121
bundle exec steep check
2222

23+
.PHONY: test
24+
test: spec
25+
2326
.PHONY: type-check
2427
type-check: steep

docs/README.md

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ class SortArgs
3434
end
3535

3636
insert_res = client.insert(SimpleArgs.new(strings: ["whale", "tiger", "bear"]))
37+
insert_res.job # inserted job row
3738
```
3839

3940
Job args should:
@@ -69,6 +70,26 @@ insert_res = client.insert(River::JobArgsHash.new("hash_kind", {
6970
}))
7071
```
7172

73+
### Bulk inserting jobs
74+
75+
Use `#insert_many` to bulk insert jobs as a single operation for improved efficiency:
76+
77+
```ruby
78+
num_inserted = client.insert_many([
79+
SimpleArgs.new(job_num: 1),
80+
SimpleArgs.new(job_num: 2)
81+
])
82+
```
83+
84+
Or with `InsertManyParams`, which may include insertion options:
85+
86+
```ruby
87+
num_inserted = client.insert_many([
88+
River::InsertManyParams.new(SimpleArgs.new(job_num: 1), insert_opts: InsertOpts.new(max_attempts: 5)),
89+
River::InsertManyParams.new(SimpleArgs.new(job_num: 2), insert_opts: InsertOpts.new(queue: "high_priority"))
90+
])
91+
```
92+
7293
## Drivers
7394

7495
### ActiveRecord

docs/development.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ $ open coverage/index.html
4444

4545
## Publish gems
4646

47+
Update `CHANGELOG.md` to include the new version and open a pull request with the changes.
48+
4749
```shell
4850
git checkout master && git pull --rebase
4951
export VERSION=v0.0.x

drivers/riverqueue-activerecord/lib/driver.rb

Lines changed: 49 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -22,66 +22,72 @@ def self.dangerous_attribute_method?(method_name)
2222
return false if method_name == "errors"
2323
super
2424
end
25+
26+
# See comment above, but since we force allowed `errors` as an
27+
# attribute name, ActiveRecord would otherwise fail to save a row as
28+
# it checked for its own `errors` hash and finding no values.
29+
def errors = {}
2530
end)
2631
end
2732
end
2833

2934
def insert(insert_params)
35+
to_job_row(RiverJob.create(insert_params_to_hash(insert_params)))
36+
end
37+
38+
def insert_many(insert_params_many)
39+
RiverJob.insert_all(insert_params_many.map { |p| insert_params_to_hash(p) })
40+
insert_params_many.count
41+
end
42+
43+
private def insert_params_to_hash(insert_params)
3044
# the call to `#compact` is important so that we remove nils and table
3145
# default values get picked up instead
32-
to_job_row(
33-
RiverJob.insert(
34-
{
35-
args: insert_params.encoded_args,
36-
kind: insert_params.kind,
37-
max_attempts: insert_params.max_attempts,
38-
priority: insert_params.priority,
39-
queue: insert_params.queue,
40-
state: insert_params.state,
41-
scheduled_at: insert_params.scheduled_at,
42-
tags: insert_params.tags
43-
}.compact,
44-
returning: Arel.sql("*")
45-
).first
46-
)
46+
{
47+
args: insert_params.encoded_args,
48+
kind: insert_params.kind,
49+
max_attempts: insert_params.max_attempts,
50+
priority: insert_params.priority,
51+
queue: insert_params.queue,
52+
state: insert_params.state,
53+
scheduled_at: insert_params.scheduled_at,
54+
tags: insert_params.tags
55+
}.compact
4756
end
4857

4958
# Type type injected to this method is not a `RiverJob`, but rather a raw
5059
# hash with stringified keys because we're inserting with the Arel framework
5160
# directly rather than generating a record from a model.
52-
private def to_job_row(raw_job)
53-
deserialize = ->(field) do
54-
RiverJob._default_attributes[field].type.deserialize(raw_job[field])
55-
end
56-
57-
# Errors is `jsonb[]` so the subtype here will decode `jsonb`.
58-
errors_subtype = RiverJob._default_attributes["errors"].type.subtype
61+
private def to_job_row(river_job)
62+
# needs to be accessed through values because `errors` is shadowed by both
63+
# ActiveRecord and the patch above
64+
errors = river_job.attributes["errors"]
5965

6066
River::JobRow.new(
61-
id: deserialize.call("id"),
62-
args: deserialize.call("args").yield_self { |a| a ? JSON.parse(a) : nil },
63-
attempt: deserialize.call("attempt"),
64-
attempted_at: deserialize.call("attempted_at"),
65-
attempted_by: deserialize.call("attempted_by"),
66-
created_at: deserialize.call("created_at"),
67-
errors: deserialize.call("errors")&.map do |e|
68-
deserialized_error = errors_subtype.deserialize(e)
67+
id: river_job.id,
68+
args: river_job.args ? JSON.parse(river_job.args) : nil,
69+
attempt: river_job.attempt,
70+
attempted_at: river_job.attempted_at,
71+
attempted_by: river_job.attempted_by,
72+
created_at: river_job.created_at,
73+
errors: errors&.map { |e|
74+
deserialized_error = JSON.parse(e, symbolize_names: true)
6975

7076
River::AttemptError.new(
71-
at: Time.parse(deserialized_error["at"]),
72-
attempt: deserialized_error["attempt"],
73-
error: deserialized_error["error"],
74-
trace: deserialized_error["trace"]
77+
at: Time.parse(deserialized_error[:at]),
78+
attempt: deserialized_error[:attempt],
79+
error: deserialized_error[:error],
80+
trace: deserialized_error[:trace]
7581
)
76-
end,
77-
finalized_at: deserialize.call("finalized_at"),
78-
kind: deserialize.call("kind"),
79-
max_attempts: deserialize.call("max_attempts"),
80-
priority: deserialize.call("priority"),
81-
queue: deserialize.call("queue"),
82-
scheduled_at: deserialize.call("scheduled_at"),
83-
state: deserialize.call("state"),
84-
tags: deserialize.call("tags")
82+
},
83+
finalized_at: river_job.finalized_at,
84+
kind: river_job.kind,
85+
max_attempts: river_job.max_attempts,
86+
priority: river_job.priority,
87+
queue: river_job.queue,
88+
scheduled_at: river_job.scheduled_at,
89+
state: river_job.state,
90+
tags: river_job.tags
8591
)
8692
end
8793
end

drivers/riverqueue-activerecord/spec/driver_spec.rb

Lines changed: 70 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -127,22 +127,86 @@ class SimpleArgsWithInsertOpts < SimpleArgs
127127
raise ActiveRecord::Rollback
128128
end
129129

130-
# Not visible because the job was rolled back.
130+
# Not present because the job was rolled back.
131131
river_job = River::Driver::ActiveRecord::RiverJob.find_by(id: insert_res.job.id)
132132
expect(river_job).to be_nil
133133
end
134134
end
135135

136+
describe "#insert_many" do
137+
it "inserts multiple jobs" do
138+
num_inserted = client.insert_many([
139+
SimpleArgs.new(job_num: 1),
140+
SimpleArgs.new(job_num: 2)
141+
])
142+
expect(num_inserted).to eq(2)
143+
144+
job1 = driver.send(:to_job_row, River::Driver::ActiveRecord::RiverJob.first)
145+
expect(job1).to have_attributes(
146+
attempt: 0,
147+
args: {"job_num" => 1},
148+
created_at: be_within(2).of(Time.now.utc),
149+
kind: "simple",
150+
max_attempts: River::MAX_ATTEMPTS_DEFAULT,
151+
queue: River::QUEUE_DEFAULT,
152+
priority: River::PRIORITY_DEFAULT,
153+
scheduled_at: be_within(2).of(Time.now.utc),
154+
state: River::JOB_STATE_AVAILABLE,
155+
tags: []
156+
)
157+
158+
job2 = driver.send(:to_job_row, River::Driver::ActiveRecord::RiverJob.offset(1).first)
159+
expect(job2).to have_attributes(
160+
attempt: 0,
161+
args: {"job_num" => 2},
162+
created_at: be_within(2).of(Time.now.utc),
163+
kind: "simple",
164+
max_attempts: River::MAX_ATTEMPTS_DEFAULT,
165+
queue: River::QUEUE_DEFAULT,
166+
priority: River::PRIORITY_DEFAULT,
167+
scheduled_at: be_within(2).of(Time.now.utc),
168+
state: River::JOB_STATE_AVAILABLE,
169+
tags: []
170+
)
171+
end
172+
173+
it "inserts multiple jobs in a transaction" do
174+
job1 = nil
175+
job2 = nil
176+
177+
ActiveRecord::Base.transaction(requires_new: true) do
178+
num_inserted = client.insert_many([
179+
SimpleArgs.new(job_num: 1),
180+
SimpleArgs.new(job_num: 2)
181+
])
182+
expect(num_inserted).to eq(2)
183+
184+
job1 = driver.send(:to_job_row, River::Driver::ActiveRecord::RiverJob.first)
185+
job2 = driver.send(:to_job_row, River::Driver::ActiveRecord::RiverJob.offset(1).first)
186+
187+
raise ActiveRecord::Rollback
188+
end
189+
190+
# Not present because the jobs were rolled back.
191+
expect do
192+
River::Driver::ActiveRecord::RiverJob.find(job1.id)
193+
end.to raise_error(ActiveRecord::RecordNotFound)
194+
expect do
195+
River::Driver::ActiveRecord::RiverJob.find(job2.id)
196+
end.to raise_error(ActiveRecord::RecordNotFound)
197+
end
198+
end
199+
136200
describe "#to_job_row" do
137201
it "converts a database record to `River::JobRow`" do
138202
now = Time.now.utc
139-
river_job = {
203+
river_job = River::Driver::ActiveRecord::RiverJob.new(
140204
id: 1,
141205
attempt: 1,
142206
attempted_at: now,
143207
attempted_by: ["client1"],
144208
created_at: now,
145-
args: JSON.generate(%({"job_num":1})), # encoded twice, like how ActiveRecord returns it
209+
args: %({"job_num":1}),
146210
finalized_at: now,
147211
kind: "simple",
148212
max_attempts: River::MAX_ATTEMPTS_DEFAULT,
@@ -151,7 +215,7 @@ class SimpleArgsWithInsertOpts < SimpleArgs
151215
scheduled_at: now,
152216
state: River::JOB_STATE_COMPLETED,
153217
tags: ["tag1"]
154-
}.transform_keys { |k| k.to_s }
218+
)
155219

156220
job_row = driver.send(:to_job_row, river_job)
157221

@@ -176,7 +240,7 @@ class SimpleArgsWithInsertOpts < SimpleArgs
176240

177241
it "with errors" do
178242
now = Time.now.utc
179-
river_job = {
243+
river_job = River::Driver::ActiveRecord::RiverJob.new(
180244
errors: [JSON.dump(
181245
{
182246
at: now,
@@ -185,7 +249,7 @@ class SimpleArgsWithInsertOpts < SimpleArgs
185249
trace: "error trace"
186250
}
187251
)]
188-
}.transform_keys { |k| k.to_s }
252+
)
189253

190254
job_row = driver.send(:to_job_row, river_job)
191255

drivers/riverqueue-sequel/lib/driver.rb

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -22,22 +22,27 @@ def initialize(db)
2222
end
2323

2424
def insert(insert_params)
25+
to_job_row(RiverJob.create(insert_params_to_hash(insert_params)))
26+
end
27+
28+
def insert_many(insert_params_many)
29+
RiverJob.multi_insert(insert_params_many.map { |p| insert_params_to_hash(p) })
30+
insert_params_many.count
31+
end
32+
33+
private def insert_params_to_hash(insert_params)
2534
# the call to `#compact` is important so that we remove nils and table
2635
# default values get picked up instead
27-
to_job_row(
28-
RiverJob.create(
29-
{
30-
args: insert_params.encoded_args,
31-
kind: insert_params.kind,
32-
max_attempts: insert_params.max_attempts,
33-
priority: insert_params.priority,
34-
queue: insert_params.queue,
35-
state: insert_params.state,
36-
scheduled_at: insert_params.scheduled_at,
37-
tags: insert_params.tags ? ::Sequel.pg_array(insert_params.tags) : nil
38-
}.compact
39-
)
40-
)
36+
{
37+
args: insert_params.encoded_args,
38+
kind: insert_params.kind,
39+
max_attempts: insert_params.max_attempts,
40+
priority: insert_params.priority,
41+
queue: insert_params.queue,
42+
state: insert_params.state,
43+
scheduled_at: insert_params.scheduled_at,
44+
tags: insert_params.tags ? ::Sequel.pg_array(insert_params.tags) : nil
45+
}.compact
4146
end
4247

4348
private def to_job_row(river_job)

0 commit comments

Comments
 (0)