diff options
Diffstat (limited to 'app/workers/bulk_import_worker.rb')
-rw-r--r-- | app/workers/bulk_import_worker.rb | 53 |
1 files changed, 51 insertions, 2 deletions
diff --git a/app/workers/bulk_import_worker.rb b/app/workers/bulk_import_worker.rb index 7828d046036..81099d4e5f7 100644 --- a/app/workers/bulk_import_worker.rb +++ b/app/workers/bulk_import_worker.rb @@ -7,9 +7,58 @@ class BulkImportWorker # rubocop:disable Scalability/IdempotentWorker sidekiq_options retry: false, dead: false - worker_has_external_dependencies! + PERFORM_DELAY = 5.seconds + DEFAULT_BATCH_SIZE = 5 def perform(bulk_import_id) - BulkImports::Importers::GroupsImporter.new(bulk_import_id).execute + @bulk_import = BulkImport.find_by_id(bulk_import_id) + + return unless @bulk_import + return if @bulk_import.finished? + return @bulk_import.finish! if all_entities_processed? && @bulk_import.started? + return re_enqueue if max_batch_size_exceeded? # Do not start more jobs if max allowed are already running + + @bulk_import.start! if @bulk_import.created? + + created_entities.first(next_batch_size).each do |entity| + entity.start! + + BulkImports::EntityWorker.perform_async(entity.id) + end + + re_enqueue + end + + private + + def entities + @entities ||= @bulk_import.entities + end + + def started_entities + entities.with_status(:started) + end + + def created_entities + entities.with_status(:created) + end + + def all_entities_processed? + entities.all? { |entity| entity.finished? || entity.failed? } + end + + def max_batch_size_exceeded? + started_entities.count >= DEFAULT_BATCH_SIZE + end + + def next_batch_size + [DEFAULT_BATCH_SIZE - started_entities.count, 0].max + end + + # A new BulkImportWorker job is enqueued to either + # - Process the new BulkImports::Entity created during import (e.g. for the subgroups) + # - Or to mark the `bulk_import` as finished + def re_enqueue + BulkImportWorker.perform_in(PERFORM_DELAY, @bulk_import.id) end end |