diff options
Diffstat (limited to 'spec/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job_spec.rb')
-rw-r--r-- | spec/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job_spec.rb | 252 |
1 files changed, 245 insertions, 7 deletions
diff --git a/spec/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job_spec.rb b/spec/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job_spec.rb index d67cb95f483..cc69a11f7f8 100644 --- a/spec/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job_spec.rb +++ b/spec/lib/gitlab/sidekiq_middleware/duplicate_jobs/duplicate_job_spec.rb @@ -9,7 +9,14 @@ RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob, :clean_gi described_class.new(job, queue) end - let(:job) { { 'class' => 'AuthorizedProjectsWorker', 'args' => [1], 'jid' => '123' } } + let(:wal_locations) do + { + main: '0/D525E3A8', + ci: 'AB/12345' + } + end + + let(:job) { { 'class' => 'AuthorizedProjectsWorker', 'args' => [1], 'jid' => '123', 'wal_locations' => wal_locations } } let(:queue) { 'authorized_projects' } let(:idempotency_key) do @@ -74,13 +81,39 @@ RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob, :clean_gi context 'when there was no job in the queue yet' do it { expect(duplicate_job.check!).to eq('123') } - it "adds a key with ttl set to #{described_class::DUPLICATE_KEY_TTL}" do + it "adds a idempotency key with ttl set to #{described_class::DUPLICATE_KEY_TTL}" do expect { duplicate_job.check! } .to change { read_idempotency_key_with_ttl(idempotency_key) } .from([nil, -2]) .to(['123', be_within(1).of(described_class::DUPLICATE_KEY_TTL)]) end + context 'when wal locations is not empty' do + it "adds a existing wal locations key with ttl set to #{described_class::DUPLICATE_KEY_TTL}" do + expect { duplicate_job.check! } + .to change { read_idempotency_key_with_ttl(existing_wal_location_key(idempotency_key, :main)) } + .from([nil, -2]) + .to([wal_locations[:main], be_within(1).of(described_class::DUPLICATE_KEY_TTL)]) + .and change { read_idempotency_key_with_ttl(existing_wal_location_key(idempotency_key, :ci)) } + .from([nil, -2]) + .to([wal_locations[:ci], be_within(1).of(described_class::DUPLICATE_KEY_TTL)]) + end + end + + context 'when preserve_latest_wal_locations_for_idempotent_jobs feature flag is disabled' do + before do + stub_feature_flags(preserve_latest_wal_locations_for_idempotent_jobs: false) + end + + it "does not change the existing wal locations key's TTL" do + expect { duplicate_job.check! } + .to not_change { read_idempotency_key_with_ttl(existing_wal_location_key(idempotency_key, :main)) } + .from([nil, -2]) + .and not_change { read_idempotency_key_with_ttl(existing_wal_location_key(idempotency_key, :ci)) } + .from([nil, -2]) + end + end + it "adds the idempotency key to the jobs payload" do expect { duplicate_job.check! }.to change { job['idempotency_key'] }.from(nil).to(idempotency_key) end @@ -89,6 +122,9 @@ RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob, :clean_gi context 'when there was already a job with same arguments in the same queue' do before do set_idempotency_key(idempotency_key, 'existing-key') + wal_locations.each do |config_name, location| + set_idempotency_key(existing_wal_location_key(idempotency_key, config_name), location) + end end it { expect(duplicate_job.check!).to eq('existing-key') } @@ -99,6 +135,14 @@ RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob, :clean_gi .from(['existing-key', -1]) end + it "does not change the existing wal locations key's TTL" do + expect { duplicate_job.check! } + .to not_change { read_idempotency_key_with_ttl(existing_wal_location_key(idempotency_key, :main)) } + .from([wal_locations[:main], -1]) + .and not_change { read_idempotency_key_with_ttl(existing_wal_location_key(idempotency_key, :ci)) } + .from([wal_locations[:ci], -1]) + end + it 'sets the existing jid' do duplicate_job.check! @@ -107,6 +151,117 @@ RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob, :clean_gi end end + describe '#update_latest_wal_location!' do + let(:offset) { '1024' } + + before do + allow(duplicate_job).to receive(:pg_wal_lsn_diff).with(:main).and_return(offset) + allow(duplicate_job).to receive(:pg_wal_lsn_diff).with(:ci).and_return(offset) + end + + shared_examples 'updates wal location' do + it 'updates a wal location to redis with an offset' do + expect { duplicate_job.update_latest_wal_location! } + .to change { read_range_from_redis(wal_location_key(idempotency_key, :main)) } + .from(existing_wal_with_offset[:main]) + .to(new_wal_with_offset[:main]) + .and change { read_range_from_redis(wal_location_key(idempotency_key, :ci)) } + .from(existing_wal_with_offset[:ci]) + .to(new_wal_with_offset[:ci]) + end + end + + context 'when preserve_latest_wal_locations_for_idempotent_jobs feature flag is disabled' do + before do + stub_feature_flags(preserve_latest_wal_locations_for_idempotent_jobs: false) + end + + it "doesn't call Sidekiq.redis" do + expect(Sidekiq).not_to receive(:redis) + + duplicate_job.update_latest_wal_location! + end + + it "doesn't update a wal location to redis with an offset" do + expect { duplicate_job.update_latest_wal_location! } + .to not_change { read_range_from_redis(wal_location_key(idempotency_key, :main)) } + .from([]) + .and not_change { read_range_from_redis(wal_location_key(idempotency_key, :ci)) } + .from([]) + end + end + + context "when the key doesn't exists in redis" do + include_examples 'updates wal location' do + let(:existing_wal_with_offset) { { main: [], ci: [] } } + let(:new_wal_with_offset) { wal_locations.transform_values { |v| [v, offset] } } + end + end + + context "when the key exists in redis" do + let(:existing_offset) { '1023'} + let(:existing_wal_locations) do + { + main: '0/D525E3NM', + ci: 'AB/111112' + } + end + + before do + rpush_to_redis_key(wal_location_key(idempotency_key, :main), existing_wal_locations[:main], existing_offset) + rpush_to_redis_key(wal_location_key(idempotency_key, :ci), existing_wal_locations[:ci], existing_offset) + end + + context "when the new offset is bigger then the existing one" do + include_examples 'updates wal location' do + let(:existing_wal_with_offset) { existing_wal_locations.transform_values { |v| [v, existing_offset] } } + let(:new_wal_with_offset) { wal_locations.transform_values { |v| [v, offset] } } + end + end + + context "when the old offset is not bigger then the existing one" do + let(:existing_offset) { offset } + + it "does not update a wal location to redis with an offset" do + expect { duplicate_job.update_latest_wal_location! } + .to not_change { read_range_from_redis(wal_location_key(idempotency_key, :main)) } + .from([existing_wal_locations[:main], existing_offset]) + .and not_change { read_range_from_redis(wal_location_key(idempotency_key, :ci)) } + .from([existing_wal_locations[:ci], existing_offset]) + end + end + end + end + + describe '#latest_wal_locations' do + context 'when job was deduplicated and wal locations were already persisted' do + before do + rpush_to_redis_key(wal_location_key(idempotency_key, :main), wal_locations[:main], 1024) + rpush_to_redis_key(wal_location_key(idempotency_key, :ci), wal_locations[:ci], 1024) + end + + it { expect(duplicate_job.latest_wal_locations).to eq(wal_locations) } + end + + context 'when job is not deduplication and wal locations were not persisted' do + it { expect(duplicate_job.latest_wal_locations).to be_empty } + end + + context 'when preserve_latest_wal_locations_for_idempotent_jobs feature flag is disabled' do + before do + stub_feature_flags(preserve_latest_wal_locations_for_idempotent_jobs: false) + end + + it "doesn't call Sidekiq.redis" do + expect(Sidekiq).not_to receive(:redis) + + duplicate_job.latest_wal_locations + end + + it { expect(duplicate_job.latest_wal_locations).to eq({}) } + end + end + describe '#delete!' do context "when we didn't track the definition" do it { expect { duplicate_job.delete! }.not_to raise_error } @@ -115,14 +270,79 @@ RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob, :clean_gi context 'when the key exists in redis' do before do set_idempotency_key(idempotency_key, 'existing-jid') + wal_locations.each do |config_name, location| + set_idempotency_key(existing_wal_location_key(idempotency_key, config_name), location) + set_idempotency_key(wal_location_key(idempotency_key, config_name), location) + end end shared_examples 'deleting the duplicate job' do - it 'removes the key from redis' do - expect { duplicate_job.delete! } - .to change { read_idempotency_key_with_ttl(idempotency_key) } - .from(['existing-jid', -1]) - .to([nil, -2]) + shared_examples 'deleting keys from redis' do |key_name| + it "removes the #{key_name} from redis" do + expect { duplicate_job.delete! } + .to change { read_idempotency_key_with_ttl(key) } + .from([from_value, -1]) + .to([nil, -2]) + end + end + + shared_examples 'does not delete key from redis' do |key_name| + it "does not remove the #{key_name} from redis" do + expect { duplicate_job.delete! } + .to not_change { read_idempotency_key_with_ttl(key) } + .from([from_value, -1]) + end + end + + it_behaves_like 'deleting keys from redis', 'idempotent key' do + let(:key) { idempotency_key } + let(:from_value) { 'existing-jid' } + end + + it_behaves_like 'deleting keys from redis', 'existing wal location keys for main database' do + let(:key) { existing_wal_location_key(idempotency_key, :main) } + let(:from_value) { wal_locations[:main] } + end + + it_behaves_like 'deleting keys from redis', 'existing wal location keys for ci database' do + let(:key) { existing_wal_location_key(idempotency_key, :ci) } + let(:from_value) { wal_locations[:ci] } + end + + it_behaves_like 'deleting keys from redis', 'latest wal location keys for main database' do + let(:key) { wal_location_key(idempotency_key, :main) } + let(:from_value) { wal_locations[:main] } + end + + it_behaves_like 'deleting keys from redis', 'latest wal location keys for ci database' do + let(:key) { wal_location_key(idempotency_key, :ci) } + let(:from_value) { wal_locations[:ci] } + end + + context 'when preserve_latest_wal_locations_for_idempotent_jobs feature flag is disabled' do + before do + stub_feature_flags(preserve_latest_wal_locations_for_idempotent_jobs: false) + end + + it_behaves_like 'does not delete key from redis', 'latest wal location keys for main database' do + let(:key) { existing_wal_location_key(idempotency_key, :main) } + let(:from_value) { wal_locations[:main] } + end + + it_behaves_like 'does not delete key from redis', 'latest wal location keys for ci database' do + let(:key) { existing_wal_location_key(idempotency_key, :ci) } + let(:from_value) { wal_locations[:ci] } + end + + it_behaves_like 'does not delete key from redis', 'latest wal location keys for main database' do + let(:key) { wal_location_key(idempotency_key, :main) } + let(:from_value) { wal_locations[:main] } + end + + it_behaves_like 'does not delete key from redis', 'latest wal location keys for ci database' do + let(:key) { wal_location_key(idempotency_key, :ci) } + let(:from_value) { wal_locations[:ci] } + end end end @@ -254,10 +474,22 @@ RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob, :clean_gi end end + def existing_wal_location_key(idempotency_key, config_name) + "#{idempotency_key}:#{config_name}:existing_wal_location" + end + + def wal_location_key(idempotency_key, config_name) + "#{idempotency_key}:#{config_name}:wal_location" + end + def set_idempotency_key(key, value = '1') Sidekiq.redis { |r| r.set(key, value) } end + def rpush_to_redis_key(key, wal, offset) + Sidekiq.redis { |r| r.rpush(key, [wal, offset]) } + end + def read_idempotency_key_with_ttl(key) Sidekiq.redis do |redis| redis.pipelined do |p| @@ -266,4 +498,10 @@ RSpec.describe Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob, :clean_gi end end end + + def read_range_from_redis(key) + Sidekiq.redis do |redis| + redis.lrange(key, 0, -1) + end + end end |