Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .rubocop_todo.yml
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ RSpec/BeforeAfterAll:
- 'spec/integration/cors_spec.rb'
- 'spec/unit/lib/vcap/rest_api/event_query_spec.rb'
- 'spec/unit/lib/vcap/rest_api/query_spec.rb'
- 'spec/isolated_specs/inline_runner_spec.rb'

# Offense count: 4
# This cop supports safe autocorrection (--autocorrect).
Expand Down
4 changes: 4 additions & 0 deletions app/jobs/diego/sync.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ def perform
end
end

def inline?
true
end

private

def logger
Expand Down
22 changes: 4 additions & 18 deletions app/jobs/enqueuer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,8 @@ def enqueue_pollable(job, existing_guid: nil, run_at: nil, priority_increment: n
PollableJobModel.find_by_delayed_job(delayed_job)
end

def run_inline(job)
run_immediately do
Delayed::Job.enqueue(TimeoutJob.new(job, job_timeout(job)), @opts)
end
def self.unwrap_job(job)
job.is_a?(WrappingJob) ? unwrap_job(job.handler) : job
end

private
Expand Down Expand Up @@ -64,30 +62,18 @@ def load_delayed_job_plugins
end

def job_timeout(job)
unwrapped_job = unwrap_job(job)
unwrapped_job = self.class.unwrap_job(job)
return @timeout_calculator.calculate(unwrapped_job.try(:job_name_in_configuration), @opts[:queue]) if @opts[:queue]

@timeout_calculator.calculate(unwrapped_job.try(:job_name_in_configuration))
end

def get_overwritten_job_priority_from_config(job)
unwrapped_job = unwrap_job(job)
unwrapped_job = self.class.unwrap_job(job)
@priority_overwriter.get(unwrapped_job.try(:display_name)) ||
@priority_overwriter.get(unwrapped_job.try(:job_name_in_configuration)) ||
@priority_overwriter.get(unwrapped_job.class.name)
end

def unwrap_job(job)
job.is_a?(PollableJobWrapper) ? job.handler : job
end

def run_immediately
cache = Delayed::Worker.delay_jobs
Delayed::Worker.delay_jobs = false
yield
ensure
Delayed::Worker.delay_jobs = cache
end
end
end
end
7 changes: 6 additions & 1 deletion lib/cloud_controller/clock/clock.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
require 'clockwork'
require 'cloud_controller/clock/distributed_scheduler'
require 'cloud_controller/clock/inline_runner'

module VCAP::CloudController
class Clock
Expand All @@ -10,6 +11,10 @@ class Clock
MEDIUM_PRIORITY = 1
LOW_PRIORITY = 100

def initialize
Jobs::InlineRunner.setup
end

def schedule_daily_job(name:, at:, priority:)
job_opts = {
name: name,
Expand Down Expand Up @@ -48,7 +53,7 @@ def schedule_frequent_inline_job(name:, interval:, timeout:)

schedule_job(job_opts) do
job = yield
Jobs::Enqueuer.new(queue: name).run_inline(job)
Jobs::InlineRunner.new(queue: name).run(job)
end
end

Expand Down
20 changes: 20 additions & 0 deletions lib/cloud_controller/clock/inline_runner.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
module VCAP::CloudController
module Jobs
class InlineRunner < Enqueuer
class << self
def setup
Delayed::Worker.delay_jobs = lambda do |job|
unwrapped_job = unwrap_job(job.payload_object)
!(unwrapped_job.respond_to?(:inline?) && unwrapped_job.inline?)
end
end
end

def run(job)
raise ArgumentError.new("job must define a method 'inline?' which returns 'true'") unless job.respond_to?(:inline?) && job.inline?

Delayed::Job.enqueue(TimeoutJob.new(job, job_timeout(job)), @opts)
end
end
end
end
65 changes: 65 additions & 0 deletions spec/isolated_specs/inline_runner_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
require 'spec_helper'
require 'cloud_controller/clock/inline_runner'

########### Note ###########
# This test modifies the global setting Delayed::Worker.delay_jobs which might affect other tests running in parallel
# It is recommended to run this test separately
############################

module VCAP::CloudController
RSpec.describe Jobs::InlineRunner, job_context: :clock do
before(:all) do
@original_delay_jobs = Delayed::Worker.delay_jobs
end

after(:all) do
Delayed::Worker.delay_jobs = @original_delay_jobs
end

before do
Delayed::Worker.delay_jobs = nil
end

describe '#setup' do
it 'sets up delay_jobs' do
expect(Delayed::Worker.delay_jobs).to be_nil

Jobs::InlineRunner.setup

expect(Delayed::Worker.delay_jobs).to be_instance_of(Proc)
end
end

describe '#run' do
let(:job) { double('inline_job', inline?: true, perform: nil) }
let(:timeout) { 123 }
let(:opts) { { queue: 'queue' } }

before do
Jobs::InlineRunner.setup
end

it 'calls Delayed::Job.enqueue with the job wrapped in a TimeoutJob' do
expect_any_instance_of(Jobs::InlineRunner).to receive(:job_timeout).with(job).and_return(timeout)
expect(Jobs::TimeoutJob).to receive(:new).with(job, timeout).and_call_original
expect(Delayed::Job).to receive(:enqueue).with(instance_of(Jobs::TimeoutJob), opts).and_call_original
expect(Delayed::Worker).to receive(:delay_job?).and_wrap_original do |method, *args|
result = method.call(*args)
expect(result).to be(false)
result
end
expect(job).to receive(:perform)

Jobs::InlineRunner.new(opts).run(job)
end

context 'when the job does not define inline?' do
let(:job) { double('non_inline_job') }

it 'raises an ArgumentError' do
expect { Jobs::InlineRunner.new.run(job) }.to raise_error(ArgumentError, "job must define a method 'inline?' which returns 'true'")
end
end
end
end
end
6 changes: 6 additions & 0 deletions spec/unit/jobs/diego/sync_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ module Jobs::Diego
job.perform
end
end

describe '#inline?' do
it 'returns true' do
expect(job.inline?).to be(true)
end
end
end
end
end
34 changes: 0 additions & 34 deletions spec/unit/jobs/enqueuer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -290,39 +290,5 @@ module VCAP::CloudController::Jobs
end
end
end

describe '#run_inline' do
let(:wrapped_job) { Runtime::ModelDeletion.new('one', 'two') }
let(:opts) { {} }

it 'schedules the job to run immediately in-process' do
expect(Delayed::Job).to receive(:enqueue) do
expect(Delayed::Worker.delay_jobs).to be(false)
end

expect(Delayed::Worker.delay_jobs).to be(true)
Enqueuer.new(opts).run_inline(wrapped_job)
expect(Delayed::Worker.delay_jobs).to be(true)
end

it 'uses the job timeout' do
expect(Delayed::Job).to receive(:enqueue) do |enqueued_job, _opts|
expect(enqueued_job).to be_a TimeoutJob
expect(enqueued_job.timeout).to eq(global_timeout)
end
Enqueuer.new(opts).run_inline(wrapped_job)
end

context 'when executing the job fails' do
it 'still restores delay_jobs flag' do
expect(Delayed::Job).to receive(:enqueue).and_raise('Boom!')
expect(Delayed::Worker.delay_jobs).to be(true)
expect do
Enqueuer.new(opts).run_inline(wrapped_job)
end.to raise_error(/Boom!/)
expect(Delayed::Worker.delay_jobs).to be(true)
end
end
end
end
end
15 changes: 12 additions & 3 deletions spec/unit/lib/cloud_controller/clock/clock_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,19 @@ module VCAP::CloudController
def initialize(*args); end
end
end
let(:enqueuer) { instance_double(Jobs::Enqueuer, enqueue: nil, run_inline: nil) }
let(:enqueuer) { instance_double(Jobs::Enqueuer, enqueue: nil) }
let(:inline_runner) { instance_double(Jobs::InlineRunner, run: nil) }

before do
allow(Jobs::Enqueuer).to receive(:new).and_return(enqueuer)
allow(Jobs::InlineRunner).to receive(:new).and_return(inline_runner)
end

describe 'setup' do
it 'sets up the Jobs::InlineRunner' do
expect(Jobs::InlineRunner).to receive(:setup)
clock
end
end

describe 'scheduling a daily job' do
Expand Down Expand Up @@ -127,8 +136,8 @@ def initialize(*args); end
clock.schedule_frequent_inline_job(**clock_opts) { some_job_class.new }

expected_job_opts = { queue: job_name }
expect(Jobs::Enqueuer).to have_received(:new).with(expected_job_opts)
expect(enqueuer).to have_received(:run_inline).with(instance_of(some_job_class))
expect(Jobs::InlineRunner).to have_received(:new).with(expected_job_opts)
expect(inline_runner).to have_received(:run).with(instance_of(some_job_class))
end
end
end
Expand Down