diff options
Diffstat (limited to 'spec/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job_spec.rb')
-rw-r--r-- | spec/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job_spec.rb | 285 |
1 files changed, 235 insertions, 50 deletions
diff --git a/spec/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job_spec.rb b/spec/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job_spec.rb index 5083ac514db..833de6ae624 100644 --- a/spec/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job_spec.rb +++ b/spec/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job_spec.rb @@ -24,6 +24,10 @@ RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob, :clean_gi "#{Gitlab::Redis::Queues::SIDEKIQ_NAMESPACE}:duplicate:#{queue}:#{hash}" end + let(:deduplicated_flag_key) do + "#{idempotency_key}:deduplicate_flag" + end + describe '#schedule' do shared_examples 'scheduling with deduplication class' do |strategy_class| it 'calls schedule on the strategy' do @@ -81,25 +85,43 @@ RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob, :clean_gi context 'when there was no job in the queue yet' do it { expect(duplicate_job.check!).to eq('123') } - it "adds a idempotency key with ttl set to #{described_class::DUPLICATE_KEY_TTL}" do - expect { duplicate_job.check! } - .to change { read_idempotency_key_with_ttl(idempotency_key) } - .from([nil, -2]) - .to(['123', be_within(1).of(described_class::DUPLICATE_KEY_TTL)]) - end - - context 'when wal locations is not empty' do - it "adds a existing wal locations key with ttl set to #{described_class::DUPLICATE_KEY_TTL}" do + shared_examples 'sets Redis keys with correct TTL' do + it "adds an idempotency key with correct ttl" do expect { duplicate_job.check! } - .to change { read_idempotency_key_with_ttl(existing_wal_location_key(idempotency_key, :main)) } - .from([nil, -2]) - .to([wal_locations[:main], be_within(1).of(described_class::DUPLICATE_KEY_TTL)]) - .and change { read_idempotency_key_with_ttl(existing_wal_location_key(idempotency_key, :ci)) } + .to change { read_idempotency_key_with_ttl(idempotency_key) } .from([nil, -2]) - .to([wal_locations[:ci], be_within(1).of(described_class::DUPLICATE_KEY_TTL)]) + .to(['123', be_within(1).of(expected_ttl)]) + end + + context 'when wal locations is not empty' do + it "adds an existing wal locations key with correct ttl" do + expect { duplicate_job.check! } + .to change { read_idempotency_key_with_ttl(existing_wal_location_key(idempotency_key, :main)) } + .from([nil, -2]) + .to([wal_locations[:main], be_within(1).of(expected_ttl)]) + .and change { read_idempotency_key_with_ttl(existing_wal_location_key(idempotency_key, :ci)) } + .from([nil, -2]) + .to([wal_locations[:ci], be_within(1).of(expected_ttl)]) + end end end + context 'with TTL option is not set' do + let(:expected_ttl) { described_class::DEFAULT_DUPLICATE_KEY_TTL } + + it_behaves_like 'sets Redis keys with correct TTL' + end + + context 'when TTL option is set' do + let(:expected_ttl) { 5.minutes } + + before do + allow(duplicate_job).to receive(:options).and_return({ ttl: expected_ttl }) + end + + it_behaves_like 'sets Redis keys with correct TTL' + end + context 'when preserve_latest_wal_locations_for_idempotent_jobs feature flag is disabled' do before do stub_feature_flags(preserve_latest_wal_locations_for_idempotent_jobs: false) @@ -152,26 +174,21 @@ RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob, :clean_gi end describe '#update_latest_wal_location!' do - let(:offset) { '1024' } - before do - allow(duplicate_job).to receive(:pg_wal_lsn_diff).with(:main).and_return(offset) - allow(duplicate_job).to receive(:pg_wal_lsn_diff).with(:ci).and_return(offset) - end + allow(Gitlab::Database).to receive(:database_base_models).and_return( + { main: ::ActiveRecord::Base, + ci: ::ActiveRecord::Base }) - shared_examples 'updates wal location' do - it 'updates a wal location to redis with an offset' do - expect { duplicate_job.update_latest_wal_location! } - .to change { read_range_from_redis(wal_location_key(idempotency_key, :main)) } - .from(existing_wal_with_offset[:main]) - .to(new_wal_with_offset[:main]) - .and change { read_range_from_redis(wal_location_key(idempotency_key, :ci)) } - .from(existing_wal_with_offset[:ci]) - .to(new_wal_with_offset[:ci]) - end + set_idempotency_key(existing_wal_location_key(idempotency_key, :main), existing_wal[:main]) + set_idempotency_key(existing_wal_location_key(idempotency_key, :ci), existing_wal[:ci]) + + # read existing_wal_locations + duplicate_job.check! end context 'when preserve_latest_wal_locations_for_idempotent_jobs feature flag is disabled' do + let(:existing_wal) { {} } + before do stub_feature_flags(preserve_latest_wal_locations_for_idempotent_jobs: false) end @@ -192,42 +209,107 @@ RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob, :clean_gi end context "when the key doesn't exists in redis" do - include_examples 'updates wal location' do - let(:existing_wal_with_offset) { { main: [], ci: [] } } - let(:new_wal_with_offset) { wal_locations.transform_values { |v| [v, offset] } } + let(:existing_wal) do + { + main: '0/D525E3A0', + ci: 'AB/12340' + } end - end - context "when the key exists in redis" do - let(:existing_offset) { '1023'} - let(:existing_wal_locations) do + let(:new_wal_location_with_offset) do { - main: '0/D525E3NM', - ci: 'AB/111112' + # offset is relative to `existing_wal` + main: ['0/D525E3A8', '8'], + ci: ['AB/12345', '5'] } end + let(:wal_locations) { new_wal_location_with_offset.transform_values(&:first) } + + it 'stores a wal location to redis with an offset relative to existing wal location' do + expect { duplicate_job.update_latest_wal_location! } + .to change { read_range_from_redis(wal_location_key(idempotency_key, :main)) } + .from([]) + .to(new_wal_location_with_offset[:main]) + .and change { read_range_from_redis(wal_location_key(idempotency_key, :ci)) } + .from([]) + .to(new_wal_location_with_offset[:ci]) + end + end + + context "when the key exists in redis" do before do - rpush_to_redis_key(wal_location_key(idempotency_key, :main), existing_wal_locations[:main], existing_offset) - rpush_to_redis_key(wal_location_key(idempotency_key, :ci), existing_wal_locations[:ci], existing_offset) + rpush_to_redis_key(wal_location_key(idempotency_key, :main), *stored_wal_location_with_offset[:main]) + rpush_to_redis_key(wal_location_key(idempotency_key, :ci), *stored_wal_location_with_offset[:ci]) end + let(:wal_locations) { new_wal_location_with_offset.transform_values(&:first) } + context "when the new offset is bigger then the existing one" do - include_examples 'updates wal location' do - let(:existing_wal_with_offset) { existing_wal_locations.transform_values { |v| [v, existing_offset] } } - let(:new_wal_with_offset) { wal_locations.transform_values { |v| [v, offset] } } + let(:existing_wal) do + { + main: '0/D525E3A0', + ci: 'AB/12340' + } + end + + let(:stored_wal_location_with_offset) do + { + # offset is relative to `existing_wal` + main: ['0/D525E3A3', '3'], + ci: ['AB/12342', '2'] + } + end + + let(:new_wal_location_with_offset) do + { + # offset is relative to `existing_wal` + main: ['0/D525E3A8', '8'], + ci: ['AB/12345', '5'] + } + end + + it 'updates a wal location to redis with an offset' do + expect { duplicate_job.update_latest_wal_location! } + .to change { read_range_from_redis(wal_location_key(idempotency_key, :main)) } + .from(stored_wal_location_with_offset[:main]) + .to(new_wal_location_with_offset[:main]) + .and change { read_range_from_redis(wal_location_key(idempotency_key, :ci)) } + .from(stored_wal_location_with_offset[:ci]) + .to(new_wal_location_with_offset[:ci]) end end context "when the old offset is not bigger then the existing one" do - let(:existing_offset) { offset } + let(:existing_wal) do + { + main: '0/D525E3A0', + ci: 'AB/12340' + } + end + + let(:stored_wal_location_with_offset) do + { + # offset is relative to `existing_wal` + main: ['0/D525E3A8', '8'], + ci: ['AB/12345', '5'] + } + end + + let(:new_wal_location_with_offset) do + { + # offset is relative to `existing_wal` + main: ['0/D525E3A2', '2'], + ci: ['AB/12342', '2'] + } + end it "does not update a wal location to redis with an offset" do expect { duplicate_job.update_latest_wal_location! } .to not_change { read_range_from_redis(wal_location_key(idempotency_key, :main)) } - .from([existing_wal_locations[:main], existing_offset]) + .from(stored_wal_location_with_offset[:main]) .and not_change { read_range_from_redis(wal_location_key(idempotency_key, :ci)) } - .from([existing_wal_locations[:ci], existing_offset]) + .from(stored_wal_location_with_offset[:ci]) end end end @@ -270,6 +352,7 @@ RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob, :clean_gi context 'when the key exists in redis' do before do set_idempotency_key(idempotency_key, 'existing-jid') + set_idempotency_key(deduplicated_flag_key, 1) wal_locations.each do |config_name, location| set_idempotency_key(existing_wal_location_key(idempotency_key, config_name), location) set_idempotency_key(wal_location_key(idempotency_key, config_name), location) @@ -299,6 +382,11 @@ RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob, :clean_gi let(:from_value) { 'existing-jid' } end + it_behaves_like 'deleting keys from redis', 'deduplication counter key' do + let(:key) { deduplicated_flag_key } + let(:from_value) { '1' } + end + it_behaves_like 'deleting keys from redis', 'existing wal location keys for main database' do let(:key) { existing_wal_location_key(idempotency_key, :main) } let(:from_value) { wal_locations[:main] } @@ -390,6 +478,103 @@ RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob, :clean_gi end end + describe '#reschedule' do + it 'reschedules the current job' do + fake_logger = instance_double(Gitlab::SidekiqLogging::DeduplicationLogger) + expect(Gitlab::SidekiqLogging::DeduplicationLogger).to receive(:instance).and_return(fake_logger) + expect(fake_logger).to receive(:rescheduled_log).with(a_hash_including({ 'jid' => '123' })) + expect(AuthorizedProjectsWorker).to receive(:perform_async).with(1).once + + duplicate_job.reschedule + end + end + + describe '#should_reschedule?' do + subject { duplicate_job.should_reschedule? } + + context 'when the job is reschedulable' do + before do + allow(duplicate_job).to receive(:reschedulable?) { true } + end + + it { is_expected.to eq(false) } + + context 'with deduplicated flag' do + before do + duplicate_job.set_deduplicated_flag! + end + + it { is_expected.to eq(true) } + end + end + + context 'when the job is not reschedulable' do + before do + allow(duplicate_job).to receive(:reschedulable?) { false } + end + + it { is_expected.to eq(false) } + + context 'with deduplicated flag' do + before do + duplicate_job.set_deduplicated_flag! + end + + it { is_expected.to eq(false) } + end + end + end + + describe '#set_deduplicated_flag!' do + context 'when the job is reschedulable' do + before do + allow(duplicate_job).to receive(:reschedulable?) { true } + end + + it 'sets the key in Redis' do + duplicate_job.set_deduplicated_flag! + + flag = Sidekiq.redis { |redis| redis.get(deduplicated_flag_key) } + + expect(flag).to eq(described_class::DEDUPLICATED_FLAG_VALUE.to_s) + end + + it 'sets, gets and cleans up the deduplicated flag' do + expect(duplicate_job.should_reschedule?).to eq(false) + + duplicate_job.set_deduplicated_flag! + expect(duplicate_job.should_reschedule?).to eq(true) + + duplicate_job.delete! + expect(duplicate_job.should_reschedule?).to eq(false) + end + end + + context 'when the job is not reschedulable' do + before do + allow(duplicate_job).to receive(:reschedulable?) { false } + end + + it 'does not set the key in Redis' do + duplicate_job.set_deduplicated_flag! + + flag = Sidekiq.redis { |redis| redis.get(deduplicated_flag_key) } + + expect(flag).to be_nil + end + + it 'does not set the deduplicated flag' do + expect(duplicate_job.should_reschedule?).to eq(false) + + duplicate_job.set_deduplicated_flag! + expect(duplicate_job.should_reschedule?).to eq(false) + + duplicate_job.delete! + expect(duplicate_job.should_reschedule?).to eq(false) + end + end + end + describe '#duplicate?' do it "raises an error if the check wasn't performed" do expect { duplicate_job.duplicate? }.to raise_error /Call `#check!` first/ @@ -494,12 +679,12 @@ RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob, :clean_gi end end - def existing_wal_location_key(idempotency_key, config_name) - "#{idempotency_key}:#{config_name}:existing_wal_location" + def existing_wal_location_key(idempotency_key, connection_name) + "#{idempotency_key}:#{connection_name}:existing_wal_location" end - def wal_location_key(idempotency_key, config_name) - "#{idempotency_key}:#{config_name}:wal_location" + def wal_location_key(idempotency_key, connection_name) + "#{idempotency_key}:#{connection_name}:wal_location" end def set_idempotency_key(key, value = '1') |