summaryrefslogtreecommitdiff
path: root/spec/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job_spec.rb
diff options
context:
space:
mode:
Diffstat (limited to 'spec/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job_spec.rb')
-rw-r--r--spec/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job_spec.rb285
1 files changed, 235 insertions, 50 deletions
diff --git a/spec/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job_spec.rb b/spec/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job_spec.rb
index 5083ac514db..833de6ae624 100644
--- a/spec/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job_spec.rb
+++ b/spec/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job_spec.rb
@@ -24,6 +24,10 @@ RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob, :clean_gi
"#{Gitlab::Redis::Queues::SIDEKIQ_NAMESPACE}:duplicate:#{queue}:#{hash}"
end
+ let(:deduplicated_flag_key) do
+ "#{idempotency_key}:deduplicate_flag"
+ end
+
describe '#schedule' do
shared_examples 'scheduling with deduplication class' do |strategy_class|
it 'calls schedule on the strategy' do
@@ -81,25 +85,43 @@ RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob, :clean_gi
context 'when there was no job in the queue yet' do
it { expect(duplicate_job.check!).to eq('123') }
- it "adds a idempotency key with ttl set to #{described_class::DUPLICATE_KEY_TTL}" do
- expect { duplicate_job.check! }
- .to change { read_idempotency_key_with_ttl(idempotency_key) }
- .from([nil, -2])
- .to(['123', be_within(1).of(described_class::DUPLICATE_KEY_TTL)])
- end
-
- context 'when wal locations is not empty' do
- it "adds a existing wal locations key with ttl set to #{described_class::DUPLICATE_KEY_TTL}" do
+ shared_examples 'sets Redis keys with correct TTL' do
+ it "adds an idempotency key with correct ttl" do
expect { duplicate_job.check! }
- .to change { read_idempotency_key_with_ttl(existing_wal_location_key(idempotency_key, :main)) }
- .from([nil, -2])
- .to([wal_locations[:main], be_within(1).of(described_class::DUPLICATE_KEY_TTL)])
- .and change { read_idempotency_key_with_ttl(existing_wal_location_key(idempotency_key, :ci)) }
+ .to change { read_idempotency_key_with_ttl(idempotency_key) }
.from([nil, -2])
- .to([wal_locations[:ci], be_within(1).of(described_class::DUPLICATE_KEY_TTL)])
+ .to(['123', be_within(1).of(expected_ttl)])
+ end
+
+ context 'when wal locations is not empty' do
+ it "adds an existing wal locations key with correct ttl" do
+ expect { duplicate_job.check! }
+ .to change { read_idempotency_key_with_ttl(existing_wal_location_key(idempotency_key, :main)) }
+ .from([nil, -2])
+ .to([wal_locations[:main], be_within(1).of(expected_ttl)])
+ .and change { read_idempotency_key_with_ttl(existing_wal_location_key(idempotency_key, :ci)) }
+ .from([nil, -2])
+ .to([wal_locations[:ci], be_within(1).of(expected_ttl)])
+ end
end
end
+ context 'with TTL option is not set' do
+ let(:expected_ttl) { described_class::DEFAULT_DUPLICATE_KEY_TTL }
+
+ it_behaves_like 'sets Redis keys with correct TTL'
+ end
+
+ context 'when TTL option is set' do
+ let(:expected_ttl) { 5.minutes }
+
+ before do
+ allow(duplicate_job).to receive(:options).and_return({ ttl: expected_ttl })
+ end
+
+ it_behaves_like 'sets Redis keys with correct TTL'
+ end
+
context 'when preserve_latest_wal_locations_for_idempotent_jobs feature flag is disabled' do
before do
stub_feature_flags(preserve_latest_wal_locations_for_idempotent_jobs: false)
@@ -152,26 +174,21 @@ RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob, :clean_gi
end
describe '#update_latest_wal_location!' do
- let(:offset) { '1024' }
-
before do
- allow(duplicate_job).to receive(:pg_wal_lsn_diff).with(:main).and_return(offset)
- allow(duplicate_job).to receive(:pg_wal_lsn_diff).with(:ci).and_return(offset)
- end
+ allow(Gitlab::Database).to receive(:database_base_models).and_return(
+ { main: ::ActiveRecord::Base,
+ ci: ::ActiveRecord::Base })
- shared_examples 'updates wal location' do
- it 'updates a wal location to redis with an offset' do
- expect { duplicate_job.update_latest_wal_location! }
- .to change { read_range_from_redis(wal_location_key(idempotency_key, :main)) }
- .from(existing_wal_with_offset[:main])
- .to(new_wal_with_offset[:main])
- .and change { read_range_from_redis(wal_location_key(idempotency_key, :ci)) }
- .from(existing_wal_with_offset[:ci])
- .to(new_wal_with_offset[:ci])
- end
+ set_idempotency_key(existing_wal_location_key(idempotency_key, :main), existing_wal[:main])
+ set_idempotency_key(existing_wal_location_key(idempotency_key, :ci), existing_wal[:ci])
+
+ # read existing_wal_locations
+ duplicate_job.check!
end
context 'when preserve_latest_wal_locations_for_idempotent_jobs feature flag is disabled' do
+ let(:existing_wal) { {} }
+
before do
stub_feature_flags(preserve_latest_wal_locations_for_idempotent_jobs: false)
end
@@ -192,42 +209,107 @@ RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob, :clean_gi
end
context "when the key doesn't exists in redis" do
- include_examples 'updates wal location' do
- let(:existing_wal_with_offset) { { main: [], ci: [] } }
- let(:new_wal_with_offset) { wal_locations.transform_values { |v| [v, offset] } }
+ let(:existing_wal) do
+ {
+ main: '0/D525E3A0',
+ ci: 'AB/12340'
+ }
end
- end
- context "when the key exists in redis" do
- let(:existing_offset) { '1023'}
- let(:existing_wal_locations) do
+ let(:new_wal_location_with_offset) do
{
- main: '0/D525E3NM',
- ci: 'AB/111112'
+ # offset is relative to `existing_wal`
+ main: ['0/D525E3A8', '8'],
+ ci: ['AB/12345', '5']
}
end
+ let(:wal_locations) { new_wal_location_with_offset.transform_values(&:first) }
+
+ it 'stores a wal location to redis with an offset relative to existing wal location' do
+ expect { duplicate_job.update_latest_wal_location! }
+ .to change { read_range_from_redis(wal_location_key(idempotency_key, :main)) }
+ .from([])
+ .to(new_wal_location_with_offset[:main])
+ .and change { read_range_from_redis(wal_location_key(idempotency_key, :ci)) }
+ .from([])
+ .to(new_wal_location_with_offset[:ci])
+ end
+ end
+
+ context "when the key exists in redis" do
before do
- rpush_to_redis_key(wal_location_key(idempotency_key, :main), existing_wal_locations[:main], existing_offset)
- rpush_to_redis_key(wal_location_key(idempotency_key, :ci), existing_wal_locations[:ci], existing_offset)
+ rpush_to_redis_key(wal_location_key(idempotency_key, :main), *stored_wal_location_with_offset[:main])
+ rpush_to_redis_key(wal_location_key(idempotency_key, :ci), *stored_wal_location_with_offset[:ci])
end
+ let(:wal_locations) { new_wal_location_with_offset.transform_values(&:first) }
+
context "when the new offset is bigger then the existing one" do
- include_examples 'updates wal location' do
- let(:existing_wal_with_offset) { existing_wal_locations.transform_values { |v| [v, existing_offset] } }
- let(:new_wal_with_offset) { wal_locations.transform_values { |v| [v, offset] } }
+ let(:existing_wal) do
+ {
+ main: '0/D525E3A0',
+ ci: 'AB/12340'
+ }
+ end
+
+ let(:stored_wal_location_with_offset) do
+ {
+ # offset is relative to `existing_wal`
+ main: ['0/D525E3A3', '3'],
+ ci: ['AB/12342', '2']
+ }
+ end
+
+ let(:new_wal_location_with_offset) do
+ {
+ # offset is relative to `existing_wal`
+ main: ['0/D525E3A8', '8'],
+ ci: ['AB/12345', '5']
+ }
+ end
+
+ it 'updates a wal location to redis with an offset' do
+ expect { duplicate_job.update_latest_wal_location! }
+ .to change { read_range_from_redis(wal_location_key(idempotency_key, :main)) }
+ .from(stored_wal_location_with_offset[:main])
+ .to(new_wal_location_with_offset[:main])
+ .and change { read_range_from_redis(wal_location_key(idempotency_key, :ci)) }
+ .from(stored_wal_location_with_offset[:ci])
+ .to(new_wal_location_with_offset[:ci])
end
end
context "when the old offset is not bigger then the existing one" do
- let(:existing_offset) { offset }
+ let(:existing_wal) do
+ {
+ main: '0/D525E3A0',
+ ci: 'AB/12340'
+ }
+ end
+
+ let(:stored_wal_location_with_offset) do
+ {
+ # offset is relative to `existing_wal`
+ main: ['0/D525E3A8', '8'],
+ ci: ['AB/12345', '5']
+ }
+ end
+
+ let(:new_wal_location_with_offset) do
+ {
+ # offset is relative to `existing_wal`
+ main: ['0/D525E3A2', '2'],
+ ci: ['AB/12342', '2']
+ }
+ end
it "does not update a wal location to redis with an offset" do
expect { duplicate_job.update_latest_wal_location! }
.to not_change { read_range_from_redis(wal_location_key(idempotency_key, :main)) }
- .from([existing_wal_locations[:main], existing_offset])
+ .from(stored_wal_location_with_offset[:main])
.and not_change { read_range_from_redis(wal_location_key(idempotency_key, :ci)) }
- .from([existing_wal_locations[:ci], existing_offset])
+ .from(stored_wal_location_with_offset[:ci])
end
end
end
@@ -270,6 +352,7 @@ RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob, :clean_gi
context 'when the key exists in redis' do
before do
set_idempotency_key(idempotency_key, 'existing-jid')
+ set_idempotency_key(deduplicated_flag_key, 1)
wal_locations.each do |config_name, location|
set_idempotency_key(existing_wal_location_key(idempotency_key, config_name), location)
set_idempotency_key(wal_location_key(idempotency_key, config_name), location)
@@ -299,6 +382,11 @@ RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob, :clean_gi
let(:from_value) { 'existing-jid' }
end
+ it_behaves_like 'deleting keys from redis', 'deduplication counter key' do
+ let(:key) { deduplicated_flag_key }
+ let(:from_value) { '1' }
+ end
+
it_behaves_like 'deleting keys from redis', 'existing wal location keys for main database' do
let(:key) { existing_wal_location_key(idempotency_key, :main) }
let(:from_value) { wal_locations[:main] }
@@ -390,6 +478,103 @@ RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob, :clean_gi
end
end
+ describe '#reschedule' do
+ it 'reschedules the current job' do
+ fake_logger = instance_double(Gitlab::SidekiqLogging::DeduplicationLogger)
+ expect(Gitlab::SidekiqLogging::DeduplicationLogger).to receive(:instance).and_return(fake_logger)
+ expect(fake_logger).to receive(:rescheduled_log).with(a_hash_including({ 'jid' => '123' }))
+ expect(AuthorizedProjectsWorker).to receive(:perform_async).with(1).once
+
+ duplicate_job.reschedule
+ end
+ end
+
+ describe '#should_reschedule?' do
+ subject { duplicate_job.should_reschedule? }
+
+ context 'when the job is reschedulable' do
+ before do
+ allow(duplicate_job).to receive(:reschedulable?) { true }
+ end
+
+ it { is_expected.to eq(false) }
+
+ context 'with deduplicated flag' do
+ before do
+ duplicate_job.set_deduplicated_flag!
+ end
+
+ it { is_expected.to eq(true) }
+ end
+ end
+
+ context 'when the job is not reschedulable' do
+ before do
+ allow(duplicate_job).to receive(:reschedulable?) { false }
+ end
+
+ it { is_expected.to eq(false) }
+
+ context 'with deduplicated flag' do
+ before do
+ duplicate_job.set_deduplicated_flag!
+ end
+
+ it { is_expected.to eq(false) }
+ end
+ end
+ end
+
+ describe '#set_deduplicated_flag!' do
+ context 'when the job is reschedulable' do
+ before do
+ allow(duplicate_job).to receive(:reschedulable?) { true }
+ end
+
+ it 'sets the key in Redis' do
+ duplicate_job.set_deduplicated_flag!
+
+ flag = Sidekiq.redis { |redis| redis.get(deduplicated_flag_key) }
+
+ expect(flag).to eq(described_class::DEDUPLICATED_FLAG_VALUE.to_s)
+ end
+
+ it 'sets, gets and cleans up the deduplicated flag' do
+ expect(duplicate_job.should_reschedule?).to eq(false)
+
+ duplicate_job.set_deduplicated_flag!
+ expect(duplicate_job.should_reschedule?).to eq(true)
+
+ duplicate_job.delete!
+ expect(duplicate_job.should_reschedule?).to eq(false)
+ end
+ end
+
+ context 'when the job is not reschedulable' do
+ before do
+ allow(duplicate_job).to receive(:reschedulable?) { false }
+ end
+
+ it 'does not set the key in Redis' do
+ duplicate_job.set_deduplicated_flag!
+
+ flag = Sidekiq.redis { |redis| redis.get(deduplicated_flag_key) }
+
+ expect(flag).to be_nil
+ end
+
+ it 'does not set the deduplicated flag' do
+ expect(duplicate_job.should_reschedule?).to eq(false)
+
+ duplicate_job.set_deduplicated_flag!
+ expect(duplicate_job.should_reschedule?).to eq(false)
+
+ duplicate_job.delete!
+ expect(duplicate_job.should_reschedule?).to eq(false)
+ end
+ end
+ end
+
describe '#duplicate?' do
it "raises an error if the check wasn't performed" do
expect { duplicate_job.duplicate? }.to raise_error /Call `#check!` first/
@@ -494,12 +679,12 @@ RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob, :clean_gi
end
end
- def existing_wal_location_key(idempotency_key, config_name)
- "#{idempotency_key}:#{config_name}:existing_wal_location"
+ def existing_wal_location_key(idempotency_key, connection_name)
+ "#{idempotency_key}:#{connection_name}:existing_wal_location"
end
- def wal_location_key(idempotency_key, config_name)
- "#{idempotency_key}:#{config_name}:wal_location"
+ def wal_location_key(idempotency_key, connection_name)
+ "#{idempotency_key}:#{connection_name}:wal_location"
end
def set_idempotency_key(key, value = '1')