Diffstat (limited to 'spec/lib/bulk_imports/pipeline/runner_spec.rb')
-rw-r--r--  spec/lib/bulk_imports/pipeline/runner_spec.rb | 97
1 file changed, 67 insertions(+), 30 deletions(-)
diff --git a/spec/lib/bulk_imports/pipeline/runner_spec.rb b/spec/lib/bulk_imports/pipeline/runner_spec.rb
index 60833e83dcc..76e4e64a7d6 100644
--- a/spec/lib/bulk_imports/pipeline/runner_spec.rb
+++ b/spec/lib/bulk_imports/pipeline/runner_spec.rb
@@ -39,56 +39,94 @@ RSpec.describe BulkImports::Pipeline::Runner do
extractor BulkImports::Extractor
transformer BulkImports::Transformer
loader BulkImports::Loader
+
+ def after_run(_); end
end
stub_const('BulkImports::MyPipeline', pipeline)
end
context 'when entity is not marked as failed' do
- let(:context) do
- instance_double(
- BulkImports::Pipeline::Context,
- entity: instance_double(BulkImports::Entity, id: 1, source_type: 'group', failed?: false)
- )
- end
+ let(:entity) { create(:bulk_import_entity) }
+ let(:context) { BulkImports::Pipeline::Context.new(entity) }
it 'runs pipeline extractor, transformer, loader' do
- entries = [{ foo: :bar }]
+ extracted_data = BulkImports::Pipeline::ExtractedData.new(data: { foo: :bar })
expect_next_instance_of(BulkImports::Extractor) do |extractor|
- expect(extractor).to receive(:extract).with(context).and_return(entries)
+ expect(extractor)
+ .to receive(:extract)
+ .with(context)
+ .and_return(extracted_data)
end
expect_next_instance_of(BulkImports::Transformer) do |transformer|
- expect(transformer).to receive(:transform).with(context, entries.first).and_return(entries.first)
+ expect(transformer)
+ .to receive(:transform)
+ .with(context, extracted_data.data.first)
+ .and_return(extracted_data.data.first)
end
expect_next_instance_of(BulkImports::Loader) do |loader|
- expect(loader).to receive(:load).with(context, entries.first)
+ expect(loader)
+ .to receive(:load)
+ .with(context, extracted_data.data.first)
end
expect_next_instance_of(Gitlab::Import::Logger) do |logger|
expect(logger).to receive(:info)
.with(
+ bulk_import_entity_id: entity.id,
+ bulk_import_entity_type: 'group_entity',
message: 'Pipeline started',
+ pipeline_class: 'BulkImports::MyPipeline'
+ )
+ expect(logger).to receive(:info)
+ .with(
+ bulk_import_entity_id: entity.id,
+ bulk_import_entity_type: 'group_entity',
pipeline_class: 'BulkImports::MyPipeline',
- bulk_import_entity_id: 1,
- bulk_import_entity_type: 'group'
+ pipeline_step: :extractor,
+ step_class: 'BulkImports::Extractor'
)
expect(logger).to receive(:info)
- .with(bulk_import_entity_id: 1, bulk_import_entity_type: 'group', extractor: 'BulkImports::Extractor')
+ .with(
+ bulk_import_entity_id: entity.id,
+ bulk_import_entity_type: 'group_entity',
+ pipeline_class: 'BulkImports::MyPipeline',
+ pipeline_step: :transformer,
+ step_class: 'BulkImports::Transformer'
+ )
expect(logger).to receive(:info)
- .with(bulk_import_entity_id: 1, bulk_import_entity_type: 'group', transformer: 'BulkImports::Transformer')
+ .with(
+ bulk_import_entity_id: entity.id,
+ bulk_import_entity_type: 'group_entity',
+ pipeline_class: 'BulkImports::MyPipeline',
+ pipeline_step: :loader,
+ step_class: 'BulkImports::Loader'
+ )
+ expect(logger).to receive(:info)
+ .with(
+ bulk_import_entity_id: entity.id,
+ bulk_import_entity_type: 'group_entity',
+ pipeline_class: 'BulkImports::MyPipeline',
+ pipeline_step: :after_run
+ )
expect(logger).to receive(:info)
- .with(bulk_import_entity_id: 1, bulk_import_entity_type: 'group', loader: 'BulkImports::Loader')
+ .with(
+ bulk_import_entity_id: entity.id,
+ bulk_import_entity_type: 'group_entity',
+ message: 'Pipeline finished',
+ pipeline_class: 'BulkImports::MyPipeline'
+ )
end
- BulkImports::MyPipeline.new.run(context)
+ BulkImports::MyPipeline.new(context).run
end
context 'when exception is raised' do
let(:entity) { create(:bulk_import_entity, :created) }
- let(:context) { BulkImports::Pipeline::Context.new(entity: entity) }
+ let(:context) { BulkImports::Pipeline::Context.new(entity) }
before do
allow_next_instance_of(BulkImports::Extractor) do |extractor|
@@ -97,12 +135,13 @@ RSpec.describe BulkImports::Pipeline::Runner do
end
it 'logs import failure' do
- BulkImports::MyPipeline.new.run(context)
+ BulkImports::MyPipeline.new(context).run
failure = entity.failures.first
expect(failure).to be_present
expect(failure.pipeline_class).to eq('BulkImports::MyPipeline')
+ expect(failure.pipeline_step).to eq('extractor')
expect(failure.exception_class).to eq('StandardError')
expect(failure.exception_message).to eq('Error!')
end
@@ -113,7 +152,7 @@ RSpec.describe BulkImports::Pipeline::Runner do
end
it 'marks entity as failed' do
- BulkImports::MyPipeline.new.run(context)
+ BulkImports::MyPipeline.new(context).run
expect(entity.failed?).to eq(true)
end
@@ -129,13 +168,13 @@ RSpec.describe BulkImports::Pipeline::Runner do
)
end
- BulkImports::MyPipeline.new.run(context)
+ BulkImports::MyPipeline.new(context).run
end
end
context 'when pipeline is not marked to abort on failure' do
it 'marks entity as failed' do
- BulkImports::MyPipeline.new.run(context)
+ BulkImports::MyPipeline.new(context).run
expect(entity.failed?).to eq(false)
end
@@ -144,25 +183,23 @@ RSpec.describe BulkImports::Pipeline::Runner do
end
context 'when entity is marked as failed' do
- let(:context) do
- instance_double(
- BulkImports::Pipeline::Context,
- entity: instance_double(BulkImports::Entity, id: 1, source_type: 'group', failed?: true)
- )
- end
+ let(:entity) { create(:bulk_import_entity) }
+ let(:context) { BulkImports::Pipeline::Context.new(entity) }
it 'logs and returns without execution' do
+ allow(entity).to receive(:failed?).and_return(true)
+
expect_next_instance_of(Gitlab::Import::Logger) do |logger|
expect(logger).to receive(:info)
.with(
message: 'Skipping due to failed pipeline status',
pipeline_class: 'BulkImports::MyPipeline',
- bulk_import_entity_id: 1,
- bulk_import_entity_type: 'group'
+ bulk_import_entity_id: entity.id,
+ bulk_import_entity_type: 'group_entity'
)
end
- BulkImports::MyPipeline.new.run(context)
+ BulkImports::MyPipeline.new(context).run
end
end
end
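
For reference, a minimal caller-side sketch (not part of the diff) of the interface change this spec now exercises: the pipeline is constructed with a BulkImports::Pipeline::Context built from an entity passed positionally (previously an entity: keyword), and run is called with no arguments. The entity lookup below is hypothetical, and BulkImports::MyPipeline is the stubbed test pipeline defined in the spec.

# Hypothetical usage mirroring the spec above; assumes a persisted entity exists.
entity  = BulkImports::Entity.last                    # hypothetical lookup of any persisted entity
context = BulkImports::Pipeline::Context.new(entity)  # positional argument, no entity: keyword
BulkImports::MyPipeline.new(context).run              # run no longer receives the context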