diff options
Diffstat (limited to 'app/workers/bulk_import_worker.rb')
-rw-r--r-- | app/workers/bulk_import_worker.rb | 16 |
1 files changed, 15 insertions, 1 deletions
diff --git a/app/workers/bulk_import_worker.rb b/app/workers/bulk_import_worker.rb index d5eca86744e..6bce13c5ff0 100644 --- a/app/workers/bulk_import_worker.rb +++ b/app/workers/bulk_import_worker.rb @@ -4,6 +4,7 @@ class BulkImportWorker # rubocop:disable Scalability/IdempotentWorker include ApplicationWorker PERFORM_DELAY = 5.seconds + DEFAULT_BATCH_SIZE = 5 data_consistency :always feature_category :importers @@ -16,10 +17,11 @@ class BulkImportWorker # rubocop:disable Scalability/IdempotentWorker return if @bulk_import.finished? || @bulk_import.failed? return @bulk_import.fail_op! if all_entities_failed? return @bulk_import.finish! if all_entities_processed? && @bulk_import.started? + return re_enqueue if max_batch_size_exceeded? # Do not start more jobs if max allowed are already running @bulk_import.start! if @bulk_import.created? - created_entities.find_each do |entity| + created_entities.first(next_batch_size).each do |entity| BulkImports::CreatePipelineTrackersService.new(entity).execute! entity.start! @@ -58,4 +60,16 @@ class BulkImportWorker # rubocop:disable Scalability/IdempotentWorker def re_enqueue BulkImportWorker.perform_in(PERFORM_DELAY, @bulk_import.id) end + + def started_entities + entities.with_status(:started) + end + + def max_batch_size_exceeded? + started_entities.count >= DEFAULT_BATCH_SIZE + end + + def next_batch_size + [DEFAULT_BATCH_SIZE - started_entities.count, 0].max + end end |