diff --git a/.rubocop_todo.yml b/.rubocop_todo.yml index 82ed17f97d5..d8b4ab7a415 100644 --- a/.rubocop_todo.yml +++ b/.rubocop_todo.yml @@ -154,6 +154,7 @@ RSpec/BeforeAfterAll: - 'spec/integration/cors_spec.rb' - 'spec/unit/lib/vcap/rest_api/event_query_spec.rb' - 'spec/unit/lib/vcap/rest_api/query_spec.rb' + - 'spec/isolated_specs/inline_runner_spec.rb' # Offense count: 4 # This cop supports safe autocorrection (--autocorrect). diff --git a/app/jobs/diego/sync.rb b/app/jobs/diego/sync.rb index f13d2ad0858..0f75c805022 100644 --- a/app/jobs/diego/sync.rb +++ b/app/jobs/diego/sync.rb @@ -29,6 +29,10 @@ def perform end end + def inline? + true + end + private def logger diff --git a/app/jobs/enqueuer.rb b/app/jobs/enqueuer.rb index 0bda7fc8ce9..e52b6a613f6 100644 --- a/app/jobs/enqueuer.rb +++ b/app/jobs/enqueuer.rb @@ -29,10 +29,8 @@ def enqueue_pollable(job, existing_guid: nil, run_at: nil, priority_increment: n PollableJobModel.find_by_delayed_job(delayed_job) end - def run_inline(job) - run_immediately do - Delayed::Job.enqueue(TimeoutJob.new(job, job_timeout(job)), @opts) - end + def self.unwrap_job(job) + job.is_a?(WrappingJob) ? unwrap_job(job.handler) : job end private @@ -64,30 +62,18 @@ def load_delayed_job_plugins end def job_timeout(job) - unwrapped_job = unwrap_job(job) + unwrapped_job = self.class.unwrap_job(job) return @timeout_calculator.calculate(unwrapped_job.try(:job_name_in_configuration), @opts[:queue]) if @opts[:queue] @timeout_calculator.calculate(unwrapped_job.try(:job_name_in_configuration)) end def get_overwritten_job_priority_from_config(job) - unwrapped_job = unwrap_job(job) + unwrapped_job = self.class.unwrap_job(job) @priority_overwriter.get(unwrapped_job.try(:display_name)) || @priority_overwriter.get(unwrapped_job.try(:job_name_in_configuration)) || @priority_overwriter.get(unwrapped_job.class.name) end - - def unwrap_job(job) - job.is_a?(PollableJobWrapper) ? job.handler : job - end - - def run_immediately - cache = Delayed::Worker.delay_jobs - Delayed::Worker.delay_jobs = false - yield - ensure - Delayed::Worker.delay_jobs = cache - end end end end diff --git a/lib/cloud_controller/clock/clock.rb b/lib/cloud_controller/clock/clock.rb index 0260b3797f0..f491d23833b 100644 --- a/lib/cloud_controller/clock/clock.rb +++ b/lib/cloud_controller/clock/clock.rb @@ -1,5 +1,6 @@ require 'clockwork' require 'cloud_controller/clock/distributed_scheduler' +require 'cloud_controller/clock/inline_runner' module VCAP::CloudController class Clock @@ -10,6 +11,10 @@ class Clock MEDIUM_PRIORITY = 1 LOW_PRIORITY = 100 + def initialize + Jobs::InlineRunner.setup + end + def schedule_daily_job(name:, at:, priority:) job_opts = { name: name, @@ -48,7 +53,7 @@ def schedule_frequent_inline_job(name:, interval:, timeout:) schedule_job(job_opts) do job = yield - Jobs::Enqueuer.new(queue: name).run_inline(job) + Jobs::InlineRunner.new(queue: name).run(job) end end diff --git a/lib/cloud_controller/clock/inline_runner.rb b/lib/cloud_controller/clock/inline_runner.rb new file mode 100644 index 00000000000..9f2c299fcb5 --- /dev/null +++ b/lib/cloud_controller/clock/inline_runner.rb @@ -0,0 +1,20 @@ +module VCAP::CloudController + module Jobs + class InlineRunner < Enqueuer + class << self + def setup + Delayed::Worker.delay_jobs = lambda do |job| + unwrapped_job = unwrap_job(job.payload_object) + !(unwrapped_job.respond_to?(:inline?) && unwrapped_job.inline?) + end + end + end + + def run(job) + raise ArgumentError.new("job must define a method 'inline?' which returns 'true'") unless job.respond_to?(:inline?) && job.inline? + + Delayed::Job.enqueue(TimeoutJob.new(job, job_timeout(job)), @opts) + end + end + end +end diff --git a/spec/isolated_specs/inline_runner_spec.rb b/spec/isolated_specs/inline_runner_spec.rb new file mode 100644 index 00000000000..1677acc30c0 --- /dev/null +++ b/spec/isolated_specs/inline_runner_spec.rb @@ -0,0 +1,65 @@ +require 'spec_helper' +require 'cloud_controller/clock/inline_runner' + +########### Note ########### +# This test modifies the global setting Delayed::Worker.delay_jobs which might affect other tests running in parallel +# It is recommended to run this test separately +############################ + +module VCAP::CloudController + RSpec.describe Jobs::InlineRunner, job_context: :clock do + before(:all) do + @original_delay_jobs = Delayed::Worker.delay_jobs + end + + after(:all) do + Delayed::Worker.delay_jobs = @original_delay_jobs + end + + before do + Delayed::Worker.delay_jobs = nil + end + + describe '#setup' do + it 'sets up delay_jobs' do + expect(Delayed::Worker.delay_jobs).to be_nil + + Jobs::InlineRunner.setup + + expect(Delayed::Worker.delay_jobs).to be_instance_of(Proc) + end + end + + describe '#run' do + let(:job) { double('inline_job', inline?: true, perform: nil) } + let(:timeout) { 123 } + let(:opts) { { queue: 'queue' } } + + before do + Jobs::InlineRunner.setup + end + + it 'calls Delayed::Job.enqueue with the job wrapped in a TimeoutJob' do + expect_any_instance_of(Jobs::InlineRunner).to receive(:job_timeout).with(job).and_return(timeout) + expect(Jobs::TimeoutJob).to receive(:new).with(job, timeout).and_call_original + expect(Delayed::Job).to receive(:enqueue).with(instance_of(Jobs::TimeoutJob), opts).and_call_original + expect(Delayed::Worker).to receive(:delay_job?).and_wrap_original do |method, *args| + result = method.call(*args) + expect(result).to be(false) + result + end + expect(job).to receive(:perform) + + Jobs::InlineRunner.new(opts).run(job) + end + + context 'when the job does not define inline?' do + let(:job) { double('non_inline_job') } + + it 'raises an ArgumentError' do + expect { Jobs::InlineRunner.new.run(job) }.to raise_error(ArgumentError, "job must define a method 'inline?' which returns 'true'") + end + end + end + end +end diff --git a/spec/unit/jobs/diego/sync_spec.rb b/spec/unit/jobs/diego/sync_spec.rb index 52dfce9f634..ee9401a209d 100644 --- a/spec/unit/jobs/diego/sync_spec.rb +++ b/spec/unit/jobs/diego/sync_spec.rb @@ -43,6 +43,12 @@ module Jobs::Diego job.perform end end + + describe '#inline?' do + it 'returns true' do + expect(job.inline?).to be(true) + end + end end end end diff --git a/spec/unit/jobs/enqueuer_spec.rb b/spec/unit/jobs/enqueuer_spec.rb index 35ec490711c..1c3f274c285 100644 --- a/spec/unit/jobs/enqueuer_spec.rb +++ b/spec/unit/jobs/enqueuer_spec.rb @@ -290,39 +290,5 @@ module VCAP::CloudController::Jobs end end end - - describe '#run_inline' do - let(:wrapped_job) { Runtime::ModelDeletion.new('one', 'two') } - let(:opts) { {} } - - it 'schedules the job to run immediately in-process' do - expect(Delayed::Job).to receive(:enqueue) do - expect(Delayed::Worker.delay_jobs).to be(false) - end - - expect(Delayed::Worker.delay_jobs).to be(true) - Enqueuer.new(opts).run_inline(wrapped_job) - expect(Delayed::Worker.delay_jobs).to be(true) - end - - it 'uses the job timeout' do - expect(Delayed::Job).to receive(:enqueue) do |enqueued_job, _opts| - expect(enqueued_job).to be_a TimeoutJob - expect(enqueued_job.timeout).to eq(global_timeout) - end - Enqueuer.new(opts).run_inline(wrapped_job) - end - - context 'when executing the job fails' do - it 'still restores delay_jobs flag' do - expect(Delayed::Job).to receive(:enqueue).and_raise('Boom!') - expect(Delayed::Worker.delay_jobs).to be(true) - expect do - Enqueuer.new(opts).run_inline(wrapped_job) - end.to raise_error(/Boom!/) - expect(Delayed::Worker.delay_jobs).to be(true) - end - end - end end end diff --git a/spec/unit/lib/cloud_controller/clock/clock_spec.rb b/spec/unit/lib/cloud_controller/clock/clock_spec.rb index 32f1021eb67..3a71cc9d83f 100644 --- a/spec/unit/lib/cloud_controller/clock/clock_spec.rb +++ b/spec/unit/lib/cloud_controller/clock/clock_spec.rb @@ -10,10 +10,19 @@ module VCAP::CloudController def initialize(*args); end end end - let(:enqueuer) { instance_double(Jobs::Enqueuer, enqueue: nil, run_inline: nil) } + let(:enqueuer) { instance_double(Jobs::Enqueuer, enqueue: nil) } + let(:inline_runner) { instance_double(Jobs::InlineRunner, run: nil) } before do allow(Jobs::Enqueuer).to receive(:new).and_return(enqueuer) + allow(Jobs::InlineRunner).to receive(:new).and_return(inline_runner) + end + + describe 'setup' do + it 'sets up the Jobs::InlineRunner' do + expect(Jobs::InlineRunner).to receive(:setup) + clock + end end describe 'scheduling a daily job' do @@ -127,8 +136,8 @@ def initialize(*args); end clock.schedule_frequent_inline_job(**clock_opts) { some_job_class.new } expected_job_opts = { queue: job_name } - expect(Jobs::Enqueuer).to have_received(:new).with(expected_job_opts) - expect(enqueuer).to have_received(:run_inline).with(instance_of(some_job_class)) + expect(Jobs::InlineRunner).to have_received(:new).with(expected_job_opts) + expect(inline_runner).to have_received(:run).with(instance_of(some_job_class)) end end end