1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
|
# frozen_string_literal: true
require 'spec_helper'
# Specs for the Sidekiq server-side middleware that decides, per job, whether
# the job may read from a database replica or must stick to the primary,
# based on the worker's declared data consistency (:always / :sticky /
# :delayed) and the WAL location(s) recorded in the job hash.
RSpec.describe Gitlab::Database::LoadBalancing::SidekiqServerMiddleware do
let(:middleware) { described_class.new }
# Null-object double: any load-balancer call not explicitly stubbed is
# accepted silently instead of raising.
let(:load_balancer) { double.as_null_object }
# NOTE(review): `worker_class` is defined per-context (via the shared context
# below or an override), so `worker` is only resolvable inside those groups.
let(:worker) { worker_class.new }
# Default job payload; individual contexts override this to exercise the
# different WAL-location keys the middleware understands.
let(:job) { { "retry" => 3, "job_id" => "a180b47c-3fd6-41b8-81e9-34da61c3400e", 'database_replica_location' => '0/D525E3A8' } }
before do
# Test workers below use an ad-hoc feature flag, so skip the YAML checks
# that would otherwise require a definition file for it.
skip_feature_flags_yaml_validation
skip_default_enabled_yaml_check
# Route every load-balancer lookup through our double.
allow(::Gitlab::Database::LoadBalancing).to receive_message_chain(:proxy, :load_balancer).and_return(load_balancer)
# Default: replicas are caught up (select_up_to_date_host returns true).
replication_lag!(false)
end
after do
# The middleware mutates thread-local session state; reset it between specs.
Gitlab::Database::LoadBalancing::Session.clear_session
end
describe '#call' do
# Builds a throwaway ApplicationWorker subclass with the given
# data_consistency setting, gated behind the given feature flag.
shared_context 'data consistency worker class' do |data_consistency, feature_flag|
let(:worker_class) do
Class.new do
# Anonymous classes need an explicit name for Sidekiq/feature-flag lookup.
def self.name
'TestDataConsistencyWorker'
end
include ApplicationWorker
data_consistency data_consistency, feature_flag: feature_flag
def perform(*args)
end
end
end
before do
stub_const('TestDataConsistencyWorker', worker_class)
end
end
# Asserts the middleware annotates the job hash with the chosen strategy.
shared_examples_for 'load balancing strategy' do |strategy|
it "sets load balancing strategy to #{strategy}" do
run_middleware do
expect(job['load_balancing_strategy']).to eq(strategy)
end
end
end
# The job must run against the primary, recorded under expected_strategy.
shared_examples_for 'stick to the primary' do |expected_strategy|
it 'sticks to the primary' do
run_middleware do
expect(Gitlab::Database::LoadBalancing::Session.current.use_primary?).to be_truthy
end
end
include_examples 'load balancing strategy', expected_strategy
end
# A caught-up replica may serve the job; the session must NOT be pinned to
# the primary. `location`/`wal_locations` here are also visible to the
# including context's `let(:job)` because it_behaves_like nests the group.
shared_examples_for 'replica is up to date' do |expected_strategy|
let(:location) {'0/D525E3A8' }
let(:wal_locations) { { Gitlab::Database::MAIN_DATABASE_NAME.to_sym => location } }
it 'does not stick to the primary', :aggregate_failures do
expect(load_balancer).to receive(:select_up_to_date_host).with(location).and_return(true)
run_middleware do
expect(Gitlab::Database::LoadBalancing::Session.current.use_primary?).not_to be_truthy
end
end
include_examples 'load balancing strategy', expected_strategy
end
# Shared behavior for :sticky and :delayed workers: replica use is allowed
# only when the feature flag is on AND a WAL location (in any of the three
# supported job keys) proves the replica has caught up.
shared_examples_for 'sticks based on data consistency' do |data_consistency|
include_context 'data consistency worker class', data_consistency, :load_balancing_for_test_data_consistency_worker
context 'when load_balancing_for_test_data_consistency_worker is disabled' do
before do
stub_feature_flags(load_balancing_for_test_data_consistency_worker: false)
end
include_examples 'stick to the primary', 'primary'
end
# Modern key: 'wal_locations' hash keyed by database name.
context 'when database wal location is set' do
let(:job) { { 'job_id' => 'a180b47c-3fd6-41b8-81e9-34da61c3400e', 'wal_locations' => wal_locations } }
before do
allow(load_balancer).to receive(:select_up_to_date_host).with(location).and_return(true)
end
it_behaves_like 'replica is up to date', 'replica'
end
# Key written by the deduplication logic rather than the client middleware.
context 'when deduplication wal location is set' do
let(:job) { { 'job_id' => 'a180b47c-3fd6-41b8-81e9-34da61c3400e', 'dedup_wal_locations' => wal_locations } }
before do
allow(load_balancer).to receive(:select_up_to_date_host).with(wal_locations[:main]).and_return(true)
end
it_behaves_like 'replica is up to date', 'replica'
end
# Legacy single-value key kept for jobs enqueued by older code.
context 'when legacy wal location is set' do
let(:job) { { 'job_id' => 'a180b47c-3fd6-41b8-81e9-34da61c3400e', 'database_write_location' => '0/D525E3A8' } }
before do
allow(load_balancer).to receive(:select_up_to_date_host).with('0/D525E3A8').and_return(true)
end
it_behaves_like 'replica is up to date', 'replica'
end
# No WAL info at all: replica freshness is unknowable, so use the primary.
context 'when database location is not set' do
let(:job) { { 'job_id' => 'a180b47c-3fd6-41b8-81e9-34da61c3400e' } }
include_examples 'stick to the primary', 'primary_no_wal'
end
end
# ActiveJob wrapper jobs don't expose data_consistency; default to primary.
context 'when worker class does not include ApplicationWorker' do
let(:worker) { ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper.new }
include_examples 'stick to the primary', 'primary'
end
context 'when worker data consistency is :always' do
include_context 'data consistency worker class', :always, :load_balancing_for_test_data_consistency_worker
include_examples 'stick to the primary', 'primary'
end
context 'when worker data consistency is :delayed' do
include_examples 'sticks based on data consistency', :delayed
# :delayed-specific behavior: a lagging replica causes the job to be
# re-raised for a Sidekiq retry instead of silently using the primary.
context 'when replica is not up to date' do
before do
replication_lag!(true)
end
around do |example|
with_sidekiq_server_middleware do |chain|
chain.add described_class
# Run jobs through the real retry machinery, not the testing shim.
Sidekiq::Testing.disable! { example.run }
end
end
context 'when job is executed first' do
it 'raises an error and retries', :aggregate_failures do
# JobRetry::Skip is how Sidekiq signals "already handled, schedule retry".
expect do
process_job(job)
end.to raise_error(Sidekiq::JobRetry::Skip)
expect(job['error_class']).to eq('Gitlab::Database::LoadBalancing::SidekiqServerMiddleware::JobReplicaNotUpToDate')
end
include_examples 'load balancing strategy', 'retry'
end
context 'when job is retried' do
# Simulate the first (failed) attempt so the job hash carries retry state.
before do
expect do
process_job(job)
end.to raise_error(Sidekiq::JobRetry::Skip)
end
# A retried job gives up waiting and runs on the primary.
context 'and replica still lagging behind' do
include_examples 'stick to the primary', 'primary'
end
context 'and replica is now up-to-date' do
before do
replication_lag!(false)
end
include_examples 'replica is up to date', 'replica_retried'
end
end
end
end
context 'when worker data consistency is :sticky' do
include_examples 'sticks based on data consistency', :sticky
# Unlike :delayed, :sticky never retries — it falls back to the primary.
context 'when replica is not up to date' do
before do
allow(load_balancer).to receive(:select_up_to_date_host).and_return(false)
end
include_examples 'stick to the primary', 'primary'
end
end
end
# Pushes a job through Sidekiq's real retry handler so JobRetry::Skip and
# the job-hash error bookkeeping behave as in production.
def process_job(job)
Sidekiq::JobRetry.new.local(worker_class, job, 'default') do
worker_class.process_job(job)
end
end
# Invokes the middleware directly with the spec's worker/job; the block runs
# in place of the actual job body so examples can assert on session state.
def run_middleware
middleware.call(worker, job, double(:queue)) { yield }
rescue described_class::JobReplicaNotUpToDate
# we silence errors here that cause the job to retry
end
# Stubs replica freshness: lag present => select_up_to_date_host => false.
def replication_lag!(exists)
allow(load_balancer).to receive(:select_up_to_date_host).and_return(!exists)
end
end
|