diff options
Diffstat (limited to 'spec/workers/bulk_imports/pipeline_worker_spec.rb')
-rw-r--r-- | spec/workers/bulk_imports/pipeline_worker_spec.rb | 122 |
1 files changed, 121 insertions, 1 deletions
diff --git a/spec/workers/bulk_imports/pipeline_worker_spec.rb b/spec/workers/bulk_imports/pipeline_worker_spec.rb index 27151177634..972a4158194 100644 --- a/spec/workers/bulk_imports/pipeline_worker_spec.rb +++ b/spec/workers/bulk_imports/pipeline_worker_spec.rb @@ -8,10 +8,16 @@ RSpec.describe BulkImports::PipelineWorker do def initialize(_); end def run; end + + def self.ndjson_pipeline? + false + end end end - let_it_be(:entity) { create(:bulk_import_entity) } + let_it_be(:bulk_import) { create(:bulk_import) } + let_it_be(:config) { create(:bulk_import_configuration, bulk_import: bulk_import) } + let_it_be(:entity) { create(:bulk_import_entity, bulk_import: bulk_import) } before do stub_const('FakePipeline', pipeline_class) @@ -27,6 +33,7 @@ RSpec.describe BulkImports::PipelineWorker do expect(BulkImports::Stage) .to receive(:pipeline_exists?) .with('FakePipeline') + .twice .and_return(true) expect_next_instance_of(Gitlab::Import::Logger) do |logger| @@ -122,4 +129,117 @@ RSpec.describe BulkImports::PipelineWorker do expect(pipeline_tracker.jid).to eq('jid') end end + + context 'when ndjson pipeline' do + let(:ndjson_pipeline) do + Class.new do + def initialize(_); end + + def run; end + + def self.ndjson_pipeline? + true + end + + def self.relation + 'test' + end + end + end + + let(:pipeline_tracker) do + create( + :bulk_import_tracker, + entity: entity, + pipeline_name: 'NdjsonPipeline' + ) + end + + before do + stub_const('NdjsonPipeline', ndjson_pipeline) + allow(BulkImports::Stage) + .to receive(:pipeline_exists?) + .with('NdjsonPipeline') + .and_return(true) + end + + it 'runs the pipeline successfully' do + allow_next_instance_of(BulkImports::ExportStatus) do |status| + allow(status).to receive(:started?).and_return(false) + allow(status).to receive(:failed?).and_return(false) + end + + subject.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id) + + expect(pipeline_tracker.reload.status_name).to eq(:finished) + end + + context 'when export status is started' do + it 'reenqueues pipeline worker' do + allow_next_instance_of(BulkImports::ExportStatus) do |status| + allow(status).to receive(:started?).and_return(true) + allow(status).to receive(:failed?).and_return(false) + end + + expect(described_class) + .to receive(:perform_in) + .with( + described_class::NDJSON_PIPELINE_PERFORM_DELAY, + pipeline_tracker.id, + pipeline_tracker.stage, + entity.id + ) + + subject.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id) + end + end + + context 'when job reaches timeout' do + it 'marks as failed and logs the error' do + old_created_at = entity.created_at + entity.update!(created_at: (BulkImports::Pipeline::NDJSON_EXPORT_TIMEOUT + 1.hour).ago) + + expect_next_instance_of(Gitlab::Import::Logger) do |logger| + expect(logger) + .to receive(:error) + .with( + worker: described_class.name, + pipeline_name: 'NdjsonPipeline', + entity_id: entity.id, + message: 'Pipeline timeout' + ) + end + + subject.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id) + + expect(pipeline_tracker.reload.status_name).to eq(:failed) + + entity.update!(created_at: old_created_at) + end + end + + context 'when export status is failed' do + it 'marks as failed and logs the error' do + allow_next_instance_of(BulkImports::ExportStatus) do |status| + allow(status).to receive(:failed?).and_return(true) + allow(status).to receive(:error).and_return('Error!') + end + + expect_next_instance_of(Gitlab::Import::Logger) do |logger| + expect(logger) + .to receive(:error) + .with( + worker: described_class.name, + pipeline_name: 'NdjsonPipeline', + entity_id: entity.id, + message: 'Error!' + ) + end + + subject.perform(pipeline_tracker.id, pipeline_tracker.stage, entity.id) + + expect(pipeline_tracker.reload.status_name).to eq(:failed) + end + end + end end |