summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLamont Granquist <lamont@opscode.com>2021-04-21 17:27:33 -0700
committerGitHub <noreply@github.com>2021-04-21 17:27:33 -0700
commit7ca520d143796128ad1bec1890275beff3628c65 (patch)
tree9763189b3d99204c44275774dfad2ac6c2b576de
parent256eda3e006627b38f429418beb4056ff772f51f (diff)
parenta01f9f49b65ae49a1881da7d4fbe812e1fbf2dbb (diff)
downloadchef-7ca520d143796128ad1bec1890275beff3628c65.tar.gz
Merge pull request #11397 from chef/lcg/replace-parallelizer
Replace the ChefFS parallelizer with parallel_map helper
-rw-r--r--Gemfile.lock1
-rw-r--r--chef-utils/chef-utils.gemspec4
-rw-r--r--chef-utils/lib/chef-utils/parallel_map.rb131
-rw-r--r--chef-utils/spec/unit/parallel_map_spec.rb156
-rw-r--r--knife/lib/chef/chef_fs/knife.rb8
-rw-r--r--lib/chef/chef_fs/command_line.rb7
-rw-r--r--lib/chef/chef_fs/file_system.rb17
-rw-r--r--lib/chef/chef_fs/parallelizer.rb102
-rw-r--r--lib/chef/chef_fs/parallelizer/flatten_enumerable.rb35
-rw-r--r--lib/chef/chef_fs/parallelizer/parallel_enumerable.rb278
-rw-r--r--spec/unit/chef_fs/parallelizer_spec.rb479
11 files changed, 310 insertions, 908 deletions
diff --git a/Gemfile.lock b/Gemfile.lock
index 0084b38a95..e1b5afb023 100644
--- a/Gemfile.lock
+++ b/Gemfile.lock
@@ -140,6 +140,7 @@ PATH
remote: chef-utils
specs:
chef-utils (17.0.219)
+ concurrent-ruby
GEM
remote: https://rubygems.org/
diff --git a/chef-utils/chef-utils.gemspec b/chef-utils/chef-utils.gemspec
index a72b32936d..8cbae405e3 100644
--- a/chef-utils/chef-utils.gemspec
+++ b/chef-utils/chef-utils.gemspec
@@ -41,6 +41,10 @@ Gem::Specification.new do |spec|
# ABSOLUTELY NO EXCEPTIONS
#
+ # concurrent-ruby is: 1. lightweight, 2. has zero deps, 3. is external to chef
+ # this is used for the parallel_map enumerable extension for lightweight threading
+ spec.add_dependency "concurrent-ruby"
+
spec.files = %w{Rakefile LICENSE} + Dir.glob("*.gemspec") +
Dir.glob("{lib,spec}/**/*", File::FNM_DOTMATCH).reject { |f| File.directory?(f) }
end
diff --git a/chef-utils/lib/chef-utils/parallel_map.rb b/chef-utils/lib/chef-utils/parallel_map.rb
new file mode 100644
index 0000000000..abc8279cc5
--- /dev/null
+++ b/chef-utils/lib/chef-utils/parallel_map.rb
@@ -0,0 +1,131 @@
+# frozen_string_literal: true
+#
+# Copyright:: Copyright (c) Chef Software Inc.
+# License:: Apache License, Version 2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+require "concurrent/executors"
+require "concurrent/future"
+require "singleton" unless defined?(Singleton)
+
+module ChefUtils
+ #
+  # This module contains ruby refinements that add several methods to the Enumerable
+ # class which are useful for parallel processing.
+ #
+ module ParallelMap
+ refine Enumerable do
+
+ # Enumerates through the collection in parallel using the thread pool provided
+ # or the default thread pool. By using the default thread pool this supports
+ # recursively calling the method without deadlocking while using a globally
+ # fixed number of workers. This method supports lazy collections. It returns
+ # synchronously, waiting until all the work is done. Failures are only reported
+ # after the collection has executed and only the first exception is raised.
+ #
+ # (0..).lazy.parallel_map { |i| i*i }.first(5)
+ #
+ # @return [Array] output results
+ #
+ def parallel_map(pool: nil)
+ return self unless block_given?
+
+ pool ||= ChefUtils::DefaultThreadPool.instance.pool
+
+ futures = map do |item|
+ future = Concurrent::Future.execute(executor: pool) do
+ yield item
+ end
+ end
+
+ futures.map(&:value!)
+ end
+
+ # This has the same behavior as parallel_map but returns the enumerator instead of
+ # the return values.
+ #
+ # @return [Enumerable] the enumerable for method chaining
+ #
+ def parallel_each(pool: nil, &block)
+ return self unless block_given?
+
+ parallel_map(pool: pool, &block)
+
+ self
+ end
+
+ # The flat_each method is tightly coupled to the usage of parallel_map within the
+ # ChefFS implementation. It is not itself a parallel method, but it is used to
+ # iterate through the 2nd level of nested structure, which is tied to the nested
+ # structures that ChefFS returns.
+ #
+ # This is different from Enumerable#flat_map because that behaves like map.flatten(1) while
+ # this behaves more like flatten(1).each. We need this on an Enumerable, so we have no
+ # Enumerable#flatten method to call.
+ #
+ # [ [ 1, 2 ], [ 3, 4 ] ].flat_each(&block) calls block four times with 1, 2, 3, 4
+ #
+ # [ [ 1, 2 ], [ 3, 4 ] ].flat_map(&block) calls block twice with [1, 2] and [3,4]
+ #
+ def flat_each(&block)
+ map do |value|
+ if value.is_a?(Enumerable)
+ value.each(&block)
+ else
+ yield value
+ end
+ end
+ end
+ end
+ end
+
+ # The DefaultThreadPool has a fixed thread size and has no
+ # queue of work and the behavior on failure to find a thread is for the
+ # caller to run the work. This contract means that the thread pool can
+ # be called recursively without deadlocking and while keeping the fixed
+ # number of threads (and not exponentially growing the thread pool with
+ # the depth of recursion).
+ #
+ class DefaultThreadPool
+ include Singleton
+
+ DEFAULT_THREAD_SIZE = 10
+
+ # Size of the thread pool, must be set before getting the thread pool or
+ # calling parallel_map/parallel_each. Does not (but could be modified to)
+ # support dynamic resizing. To get fully synchronous behavior set this equal to
+ # zero rather than one since the caller will get work if the threads are
+ # busy.
+ #
+ # @return [Integer] number of threads
+ attr_accessor :threads
+
+ # Memoizing accessor for the thread pool
+ #
+ # @return [Concurrent::ThreadPoolExecutor] the thread pool
+ def pool
+ @pool ||= Concurrent::ThreadPoolExecutor.new(
+ min_threads: threads || DEFAULT_THREAD_SIZE,
+ max_threads: threads || DEFAULT_THREAD_SIZE,
+ max_queue: 0,
+ # "synchronous" redefines the 0 in max_queue to mean 'no queue' instead of 'infinite queue'
+ # it does not mean synchronous execution (no threads) but synchronous offload to the threads.
+ synchronous: true,
+ # this prevents deadlocks on recursive parallel usage
+ fallback_policy: :caller_runs
+ )
+ end
+ end
+end
diff --git a/chef-utils/spec/unit/parallel_map_spec.rb b/chef-utils/spec/unit/parallel_map_spec.rb
new file mode 100644
index 0000000000..94a1b5342c
--- /dev/null
+++ b/chef-utils/spec/unit/parallel_map_spec.rb
@@ -0,0 +1,156 @@
+# frozen_string_literal: true
+#
+# Copyright:: Copyright (c) Chef Software Inc.
+# License:: Apache License, Version 2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+require "chef-utils/parallel_map"
+
+using ChefUtils::ParallelMap
+
+RSpec.describe ChefUtils::ParallelMap do
+
+ shared_examples_for "common parallel API tests" do
+
+ before(:each) do
+ ChefUtils::DefaultThreadPool.instance.instance_variable_set(:@pool, nil)
+ ChefUtils::DefaultThreadPool.instance.threads = threads
+ end
+
+ after(:each) do
+ ChefUtils::DefaultThreadPool.instance.instance_variable_set(:@pool, nil)
+ end
+
+ it "parallel_map runs in parallel" do
+ # this is implicitly also testing that we run in the caller when we exhaust threads by running threads+1
+ val = threads + 1
+ ret = []
+ start = Time.now
+ (1..val).parallel_map do |i|
+ loop do
+ if val == i
+ ret << i
+ val -= 1
+ break
+ end
+          # we spin for quite a while to wait for very slow testers if we have to
+ if Time.now - start > 30
+ raise "timed out; deadlocked due to lack of parallelization?"
+ end
+
+ # need to sleep a tiny bit to let other threads schedule
+ sleep 0.000001
+ end
+ end
+ expected = (1..threads + 1).to_a.reverse
+ expect(ret).to eql(expected)
+ end
+
+ it "parallel_each runs in parallel" do
+ # this is implicitly also testing that we run in the caller when we exhaust threads by running threads+1
+ val = threads + 1
+ ret = []
+ start = Time.now
+ (1..val).parallel_each do |i|
+ loop do
+ if val == i
+ ret << i
+ val -= 1
+ break
+ end
+          # we spin for quite a while to wait for very slow testers if we have to
+ if Time.now - start > 30
+ raise "timed out; deadlocked due to lack of parallelization?"
+ end
+
+ # need to sleep a tiny bit to let other threads schedule
+ sleep 0.000001
+ end
+ end
+ expected = (1..threads + 1).to_a.reverse
+ expect(ret).to eql(expected)
+ end
+
+ it "parallel_map throws exceptions" do
+ expect { (0..10).parallel_map { |i| raise "boom" } }.to raise_error(RuntimeError)
+ end
+
+ it "parallel_each throws exceptions" do
+ expect { (0..10).parallel_each { |i| raise "boom" } }.to raise_error(RuntimeError)
+ end
+
+ it "parallel_map runs" do
+ ans = Timeout.timeout(30) do
+ (1..10).parallel_map { |i| i }
+ end
+ expect(ans).to eql((1..10).to_a)
+ end
+
+ it "parallel_each runs" do
+ Timeout.timeout(30) do
+ (1..10).parallel_each { |i| i }
+ end
+ end
+
+ it "recursive parallel_map will not deadlock" do
+ ans = Timeout.timeout(30) do
+ (1..2).parallel_map { |i| (1..2).parallel_map { |i| i } }
+ end
+ expect(ans).to eql([[1, 2], [1, 2]])
+ end
+
+ it "recursive parallel_each will not deadlock" do
+ ans = Timeout.timeout(30) do
+ (1..2).parallel_each { |i| (1..2).parallel_each { |i| i } }
+ end
+ end
+
+ it "parallel_map is lazy" do
+ ans = Timeout.timeout(30) do
+ (1..).lazy.parallel_map { |i| i }.first(5)
+ end
+ expect(ans).to eql((1..5).to_a)
+ end
+
+ it "parallel_each is lazy" do
+ ans = Timeout.timeout(30) do
+ (1..).lazy.parallel_each { |i| i }.first(5)
+ end
+ end
+ end
+
+ context "with 10 threads" do
+ let(:threads) { 10 }
+ it_behaves_like "common parallel API tests"
+ end
+
+ context "with 0 threads" do
+ let(:threads) { 0 }
+ it_behaves_like "common parallel API tests"
+ end
+
+ context "with 1 threads" do
+ let(:threads) { 1 }
+ it_behaves_like "common parallel API tests"
+ end
+
+ context "flat_each" do
+ it "runs each over items which are nested one level" do
+ sum = 0
+ [ [ 1, 2 ], [3, 4]].flat_each { |i| sum += i }
+ expect(sum).to eql(10)
+ end
+ end
+end
diff --git a/knife/lib/chef/chef_fs/knife.rb b/knife/lib/chef/chef_fs/knife.rb
index 9e165ab7ea..55473da8cd 100644
--- a/knife/lib/chef/chef_fs/knife.rb
+++ b/knife/lib/chef/chef_fs/knife.rb
@@ -19,6 +19,9 @@
require_relative "../knife"
require "pathname" unless defined?(Pathname)
require "chef-utils/dist" unless defined?(ChefUtils::Dist)
+require "chef-utils/parallel_map" unless defined?(ChefUtils::ParallelMap)
+
+using ChefUtils::ParallelMap
class Chef
module ChefFS
@@ -27,7 +30,6 @@ class Chef
def self.deps
super do
require "chef/config" unless defined?(Chef::Config)
- require "chef/chef_fs/parallelizer" unless defined?(Chef::ChefFS::Parallelizer)
require "chef/chef_fs/config" unless defined?(Chef::ChefFS::Config)
require "chef/chef_fs/file_pattern" unless defined?(Chef::ChefFS::FilePattern)
require "chef/chef_fs/path_utils" unless defined?(Chef::ChefFS::PathUtils)
@@ -70,7 +72,7 @@ class Chef
@chef_fs_config = Chef::ChefFS::Config.new(Chef::Config, Dir.pwd, config, ui)
- Chef::ChefFS::Parallelizer.threads = (Chef::Config[:concurrency] || 10) - 1
+ ChefUtils::DefaultThreadPool.instance.threads = (Chef::Config[:concurrency] || 10) - 1
end
def chef_fs
@@ -140,7 +142,7 @@ class Chef
end
def parallelize(inputs, options = {}, &block)
- Chef::ChefFS::Parallelizer.parallelize(inputs, options, &block)
+ inputs.parallel_map(&block)
end
def discover_repo_dir(dir)
diff --git a/lib/chef/chef_fs/command_line.rb b/lib/chef/chef_fs/command_line.rb
index 1e3aa137cd..16d9bd93f4 100644
--- a/lib/chef/chef_fs/command_line.rb
+++ b/lib/chef/chef_fs/command_line.rb
@@ -19,6 +19,9 @@
require_relative "file_system"
require_relative "file_system/exceptions"
require_relative "../util/diff"
+require "chef-utils/parallel_map" unless defined?(ChefUtils::ParallelMap)
+
+using ChefUtils::ParallelMap
class Chef
module ChefFS
@@ -140,7 +143,7 @@ class Chef
end
def self.diff(pattern, old_root, new_root, recurse_depth, get_content)
- Chef::ChefFS::Parallelizer.parallelize(Chef::ChefFS::FileSystem.list_pairs(pattern, old_root, new_root)) do |old_entry, new_entry|
+ Chef::ChefFS::FileSystem.list_pairs(pattern, old_root, new_root).parallel_map do |old_entry, new_entry|
diff_entries(old_entry, new_entry, recurse_depth, get_content)
end.flatten(1)
end
@@ -153,7 +156,7 @@ class Chef
if recurse_depth == 0
[ [ :common_subdirectories, old_entry, new_entry ] ]
else
- Chef::ChefFS::Parallelizer.parallelize(Chef::ChefFS::FileSystem.child_pairs(old_entry, new_entry)) do |old_child, new_child|
+ Chef::ChefFS::FileSystem.child_pairs(old_entry, new_entry).parallel_map do |old_child, new_child|
Chef::ChefFS::CommandLine.diff_entries(old_child, new_child, recurse_depth ? recurse_depth - 1 : nil, get_content)
end.flatten(1)
end
diff --git a/lib/chef/chef_fs/file_system.rb b/lib/chef/chef_fs/file_system.rb
index 73c3f0090a..5c4278c100 100644
--- a/lib/chef/chef_fs/file_system.rb
+++ b/lib/chef/chef_fs/file_system.rb
@@ -18,7 +18,9 @@
require_relative "path_utils"
require_relative "file_system/exceptions"
-require_relative "parallelizer"
+require "chef-utils/parallel_map" unless defined?(ChefUtils::ParallelMap)
+
+using ChefUtils::ParallelMap
class Chef
module ChefFS
@@ -70,8 +72,8 @@ class Chef
# Otherwise, go through all children and find any matches
elsif entry.dir?
- results = Parallelizer.parallelize(entry.children) { |child| Chef::ChefFS::FileSystem.list(child, pattern) }
- results.flatten(1).each(&block)
+ results = entry.children.parallel_map { |child| Chef::ChefFS::FileSystem.list(child, pattern) }
+ results.flat_each(&block)
end
end
end
@@ -138,7 +140,7 @@ class Chef
def self.copy_to(pattern, src_root, dest_root, recurse_depth, options, ui = nil, format_path = nil)
found_result = false
error = false
- parallel_do(list_pairs(pattern, src_root, dest_root)) do |src, dest|
+ list_pairs(pattern, src_root, dest_root).parallel_each do |src, dest|
found_result = true
new_dest_parent = get_or_create_parent(dest, options, ui, format_path)
child_error = copy_entries(src, dest, new_dest_parent, recurse_depth, options, ui, format_path)
@@ -319,7 +321,7 @@ class Chef
end
# Directory creation is recursive.
if recurse_depth != 0
- parallel_do(src_entry.children) do |src_child|
+ src_entry.children.parallel_each do |src_child|
new_dest_child = new_dest_dir.child(src_child.name)
child_error = copy_entries(src_child, new_dest_child, new_dest_dir, recurse_depth ? recurse_depth - 1 : recurse_depth, options, ui, format_path)
error ||= child_error
@@ -356,7 +358,7 @@ class Chef
if dest_entry.dir?
# If both are directories, recurse into their children
if recurse_depth != 0
- parallel_do(child_pairs(src_entry, dest_entry)) do |src_child, dest_child|
+ child_pairs(src_entry, dest_entry).parallel_each do |src_child, dest_child|
child_error = copy_entries(src_child, dest_child, dest_entry, recurse_depth ? recurse_depth - 1 : recurse_depth, options, ui, format_path)
error ||= child_error
end
@@ -423,9 +425,6 @@ class Chef
parent
end
- def parallel_do(enum, options = {}, &block)
- Chef::ChefFS::Parallelizer.parallel_do(enum, options, &block)
- end
end
end
end
diff --git a/lib/chef/chef_fs/parallelizer.rb b/lib/chef/chef_fs/parallelizer.rb
deleted file mode 100644
index c4d17a842d..0000000000
--- a/lib/chef/chef_fs/parallelizer.rb
+++ /dev/null
@@ -1,102 +0,0 @@
-require_relative "parallelizer/parallel_enumerable"
-
-class Chef
- module ChefFS
- # Tries to balance several guarantees, in order of priority:
- # - don't get deadlocked
- # - provide results in desired order
- # - provide results as soon as they are available
- # - process input as soon as possible
- class Parallelizer
- @@parallelizer = nil
- @@threads = 0
-
- def self.threads=(value)
- @@threads = value
- @@parallelizer.resize(value) if @@parallelizer
- end
-
- def self.parallelizer
- @@parallelizer ||= Parallelizer.new(@@threads)
- end
-
- def self.parallelize(enumerable, options = {}, &block)
- parallelizer.parallelize(enumerable, options, &block)
- end
-
- def self.parallel_do(enumerable, options = {}, &block)
- parallelizer.parallel_do(enumerable, options, &block)
- end
-
- def initialize(num_threads)
- @tasks = Queue.new
- @threads = []
- @stop_thread = {}
- resize(num_threads)
- end
-
- def num_threads
- @threads.size
- end
-
- def parallelize(enumerable, options = {}, &block)
- ParallelEnumerable.new(@tasks, enumerable, options, &block)
- end
-
- def parallel_do(enumerable, options = {}, &block)
- ParallelEnumerable.new(@tasks, enumerable, options.merge(ordered: false), &block).wait
- end
-
- def stop(wait = true, timeout = nil)
- resize(0, wait, timeout)
- end
-
- def resize(to_threads, wait = true, timeout = nil)
- if to_threads < num_threads
- threads_to_stop = @threads[to_threads..num_threads - 1]
- @threads = @threads.slice(0, to_threads)
- threads_to_stop.each do |thread|
- @stop_thread[thread] = true
- end
-
- if wait
- start_time = Time.now
- threads_to_stop.each do |thread|
- thread_timeout = timeout ? timeout - (Time.now - start_time) : nil
- thread.join(thread_timeout)
- end
- end
-
- else
- num_threads.upto(to_threads - 1) do |i|
- @threads[i] = Thread.new(&method(:worker_loop))
- end
- end
- end
-
- def kill
- @threads.each do |thread|
- Thread.kill(thread)
- @stop_thread.delete(thread)
- end
- @threads = []
- end
-
- private
-
- def worker_loop
- until @stop_thread[Thread.current]
- begin
- task = @tasks.pop
- task.call
- rescue
- puts "ERROR #{$!}"
- puts $!.backtrace
- end
- end
- ensure
- @stop_thread.delete(Thread.current)
- end
- end
- end
-end
diff --git a/lib/chef/chef_fs/parallelizer/flatten_enumerable.rb b/lib/chef/chef_fs/parallelizer/flatten_enumerable.rb
deleted file mode 100644
index 84db2c2053..0000000000
--- a/lib/chef/chef_fs/parallelizer/flatten_enumerable.rb
+++ /dev/null
@@ -1,35 +0,0 @@
-class Chef
- module ChefFS
- class Parallelizer
- class FlattenEnumerable
- include Enumerable
-
- def initialize(enum, levels = nil)
- @enum = enum
- @levels = levels
- end
-
- attr_reader :enum
- attr_reader :levels
-
- def each(&block)
- enum.each do |value|
- flatten(value, levels, &block)
- end
- end
-
- private
-
- def flatten(value, levels, &block)
- if levels != 0 && value.respond_to?(:each) && !value.is_a?(String)
- value.each do |child|
- flatten(child, levels.nil? ? levels : levels - 1, &block)
- end
- else
- yield(value)
- end
- end
- end
- end
- end
-end
diff --git a/lib/chef/chef_fs/parallelizer/parallel_enumerable.rb b/lib/chef/chef_fs/parallelizer/parallel_enumerable.rb
deleted file mode 100644
index 5fafc5c88f..0000000000
--- a/lib/chef/chef_fs/parallelizer/parallel_enumerable.rb
+++ /dev/null
@@ -1,278 +0,0 @@
-require_relative "flatten_enumerable"
-
-class Chef
- module ChefFS
- class Parallelizer
- class ParallelEnumerable
- include Enumerable
-
- # options:
- # :ordered [true|false] - whether the output should stay in the same order
- # as the input (even though it may not actually be processed in that
- # order). Default: true
- # :stop_on_exception [true|false] - if true, when an exception occurs in either
- # input or output, we wait for any outstanding processing to complete,
- # but will not process any new inputs. Default: false
- # :main_thread_processing [true|false] - whether the main thread pulling
- # on each() is allowed to process inputs. Default: true
- # NOTE: If you set this to false, parallelizer.kill will stop each()
- # in its tracks, so you need to know for sure that won't happen.
- def initialize(parent_task_queue, input_enumerable, options = {}, &block)
- @parent_task_queue = parent_task_queue
- @input_enumerable = input_enumerable
- @options = options
- @block = block
-
- @unconsumed_input = Queue.new
- @in_process = {}
- @unconsumed_output = Queue.new
- end
-
- attr_reader :parent_task_queue
- attr_reader :input_enumerable
- attr_reader :options
- attr_reader :block
-
- def each
- each_with_input do |output, index, input, type|
- yield output
- end
- end
-
- def each_with_index
- each_with_input do |output, index, input|
- yield output, index
- end
- end
-
- def each_with_input
- exception = nil
- each_with_exceptions do |output, index, input, type|
- if type == :exception
- if @options[:ordered] == false
- exception ||= output
- else
- raise output
- end
- else
- yield output, index, input
- end
- end
- raise exception if exception
- end
-
- def each_with_exceptions(&block)
- if @options[:ordered] == false
- each_with_exceptions_unordered(&block)
- else
- each_with_exceptions_ordered(&block)
- end
- end
-
- def wait
- exception = nil
- each_with_exceptions_unordered do |output, index, input, type|
- exception ||= output if type == :exception
- end
- raise exception if exception
- end
-
- # Enumerable methods
- def restricted_copy(enumerable)
- ParallelEnumerable.new(@parent_task_queue, enumerable, @options, &@block)
- end
-
- alias :original_count :count
-
- def count(*args, &block)
- if args.size == 0 && block.nil?
- @input_enumerable.count
- else
- original_count(*args, &block)
- end
- end
-
- def first(n = nil)
- if n
- restricted_copy(@input_enumerable.first(n)).to_a
- else
- first(1)[0]
- end
- end
-
- def drop(n)
- restricted_copy(@input_enumerable.drop(n)).to_a
- end
-
- def flatten(levels = nil)
- FlattenEnumerable.new(self, levels)
- end
-
- def take(n)
- restricted_copy(@input_enumerable.take(n)).to_a
- end
-
- if Enumerable.method_defined?(:lazy)
- class RestrictedLazy
- def initialize(parallel_enumerable, actual_lazy)
- @parallel_enumerable = parallel_enumerable
- @actual_lazy = actual_lazy
- end
-
- def drop(*args, &block)
- input = @parallel_enumerable.input_enumerable.lazy.drop(*args, &block)
- @parallel_enumerable.restricted_copy(input)
- end
-
- def take(*args, &block)
- input = @parallel_enumerable.input_enumerable.lazy.take(*args, &block)
- @parallel_enumerable.restricted_copy(input)
- end
-
- def method_missing(method, *args, &block)
- @actual_lazy.send(:method, *args, &block)
- end
- end
-
- alias :original_lazy :lazy
-
- def lazy
- RestrictedLazy.new(self, original_lazy)
- end
- end
-
- private
-
- def each_with_exceptions_unordered
- if @each_running
- raise "each() called on parallel enumerable twice simultaneously! Bad mojo"
- end
-
- @each_running = true
- begin
- # Grab all the inputs, yielding any responses during enumeration
- # in case the enumeration itself takes time
- begin
- @input_enumerable.each_with_index do |input, index|
- @unconsumed_input.push([ input, index ])
- @parent_task_queue.push(method(:process_one))
-
- stop_processing_input = false
- until @unconsumed_output.empty?
- output, index, input, type = @unconsumed_output.pop
- yield output, index, input, type
- if type == :exception && @options[:stop_on_exception]
- stop_processing_input = true
- break
- end
- end
-
- if stop_processing_input
- break
- end
- end
- rescue
- # We still want to wait for the rest of the outputs to process
- @unconsumed_output.push([$!, nil, nil, :exception])
- if @options[:stop_on_exception]
- @unconsumed_input.clear
- end
- end
-
- until finished?
- # yield thread to others (for 1.8.7)
- if @unconsumed_output.empty?
- sleep(0.01)
- end
-
- yield @unconsumed_output.pop until @unconsumed_output.empty?
-
- # If no one is working on our tasks and we're allowed to
- # work on them in the main thread, process an input to
- # move things forward.
- if @in_process.size == 0 && !(@options[:main_thread_processing] == false)
- process_one
- end
- end
- rescue
- # If we exited early, perhaps due to any? finding a result, we want
- # to make sure and throw away any extra results (gracefully) so that
- # the next enumerator can start over.
- unless finished?
- stop
- end
- raise
- ensure
- @each_running = false
- end
- end
-
- def each_with_exceptions_ordered
- next_to_yield = 0
- unconsumed = {}
- each_with_exceptions_unordered do |output, index, input, type|
- unconsumed[index] = [ output, input, type ]
- while unconsumed[next_to_yield]
- input_output = unconsumed.delete(next_to_yield)
- yield input_output[0], next_to_yield, input_output[1], input_output[2]
- next_to_yield += 1
- end
- end
- input_exception = unconsumed.delete(nil)
- if input_exception
- yield input_exception[0], next_to_yield, input_exception[1], input_exception[2]
- end
- end
-
- def stop
- @unconsumed_input.clear
- sleep(0.05) while @in_process.size > 0
- @unconsumed_output.clear
- end
-
- #
- # This is thread safe only if called from the main thread pulling on each().
- # The order of these checks is important, as well, to be thread safe.
- # 1. If @unconsumed_input.empty? is true, then we will never have any more
- # work legitimately picked up.
- # 2. If @in_process == 0, then there is no work in process, and because of when unconsumed_input is empty, it will never go back up, because
- # this is called after the input enumerator is finished. Note that switching #2 and #1
- # could cause a race, because in_process is incremented *before* consuming input.
- # 3. If @unconsumed_output.empty? is true, then we are done with outputs.
- # Thus, 1+2 means no more output will ever show up, and 3 means we've passed all
- # existing outputs to the user.
- #
- def finished?
- @unconsumed_input.empty? && @in_process.size == 0 && @unconsumed_output.empty?
- end
-
- def process_one
- @in_process[Thread.current] = true
- begin
- begin
- input, index = @unconsumed_input.pop(true)
- process_input(input, index)
- rescue ThreadError
- end
- ensure
- @in_process.delete(Thread.current)
- end
- end
-
- def process_input(input, index)
- begin
- output = @block.call(input)
- @unconsumed_output.push([ output, index, input, :result ])
- rescue StandardError, ScriptError
- if @options[:stop_on_exception]
- @unconsumed_input.clear
- end
- @unconsumed_output.push([ $!, index, input, :exception ])
- end
-
- index
- end
- end
- end
- end
-end
diff --git a/spec/unit/chef_fs/parallelizer_spec.rb b/spec/unit/chef_fs/parallelizer_spec.rb
deleted file mode 100644
index 519a628347..0000000000
--- a/spec/unit/chef_fs/parallelizer_spec.rb
+++ /dev/null
@@ -1,479 +0,0 @@
-require "spec_helper"
-require "chef/chef_fs/parallelizer"
-
-# FIXME: these are disabled on MacOS due to timing issues in our anka build cluster
-# these issues should be fixed and the tests should be re-eenabled. If we are getting
-# omnibus test phases on mac tests which are reasonable and not ~3 hours long, then the
-# condition to avoid this testing on macs can be deleted
-describe Chef::ChefFS::Parallelizer, :not_supported_on_macos do
- before :each do
- @start_time = Time.now
- end
-
- def elapsed_time
- Time.now - @start_time
- end
-
- after :each do
- parallelizer.kill
- end
-
- context "With a Parallelizer with 5 threads" do
- let :parallelizer do
- Chef::ChefFS::Parallelizer.new(5)
- end
-
- def parallelize(inputs, options = {}, &block)
- parallelizer.parallelize(inputs, { main_thread_processing: false }.merge(options), &block)
- end
-
- it "parallel_do creates unordered output as soon as it is available" do
- outputs = []
- parallelizer.parallel_do([0.5, 0.3, 0.1]) do |val|
- sleep val
- outputs << val
- end
- expect(elapsed_time).to be < 0.6
- expect(outputs).to eq([ 0.1, 0.3, 0.5 ])
- end
-
- context "With :ordered => false (unordered output)" do
- it "An empty input produces an empty output" do
- expect(parallelize([], ordered: false) do
- sleep 10
- end.to_a).to eql([])
- expect(elapsed_time).to be < 0.1
- end
-
- it "10 sleep(0.2)s complete within 0.5 seconds" do
- expect(parallelize(1.upto(10), ordered: false) do |i|
- sleep 0.2
- "x"
- end.to_a).to eq(%w{x x x x x x x x x x})
- expect(elapsed_time).to be < 0.5
- end
-
- it "The output comes as soon as it is available" do
- enum = parallelize([0.5, 0.3, 0.1], ordered: false) do |val|
- sleep val
- val
- end
- expect(enum.map do |value|
- expect(elapsed_time).to be < value + 0.1
- value
- end).to eq([ 0.1, 0.3, 0.5 ])
- end
-
- it "An exception in input is passed through but does NOT stop processing" do
- input = TestEnumerable.new(0.5, 0.3, 0.1) do
- raise "hi"
- end
- enum = parallelize(input, ordered: false) { |x| sleep(x); x }
- results = []
- expect { enum.each { |value| results << value } }.to raise_error "hi"
- expect(results).to eq([ 0.1, 0.3, 0.5 ])
- expect(elapsed_time).to be < 0.6
- end
-
- it "Exceptions in output are raised after all processing is done" do
- processed = 0
- enum = parallelize([1, 2, "x", 3], ordered: false) do |x|
- if x == "x"
- sleep 0.1
- raise "hi"
- end
- sleep 0.2
- processed += 1
- x
- end
- results = []
- expect { enum.each { |value| results << value } }.to raise_error "hi"
- expect(results.sort).to eq([ 1, 2, 3 ])
- expect(elapsed_time).to be < 0.3
- expect(processed).to eq(3)
- end
-
- it "Exceptions with :stop_on_exception are raised after all processing is done" do
- processed = 0
- parallelized = parallelize([0.3, 0.3, "x", 0.3, 0.3, 0.3, 0.3, 0.3], ordered: false, stop_on_exception: true) do |x|
- if x == "x"
- sleep(0.1)
- raise "hi"
- end
- sleep(x)
- processed += 1
- x
- end
- expect { parallelized.to_a }.to raise_error "hi"
- expect(processed).to eq(4)
- end
- end
-
- context "With :ordered => true (ordered output)" do
- it "An empty input produces an empty output" do
- expect(parallelize([]) do
- sleep 10
- end.to_a).to eql([])
- expect(elapsed_time).to be < 0.1
- end
-
- it "10 sleep(0.2)s complete within 0.5 seconds" do
- expect(parallelize(1.upto(10), ordered: true) do |i|
- sleep 0.2
- "x"
- end.to_a).to eq(%w{x x x x x x x x x x})
- expect(elapsed_time).to be < 0.5
- end
-
- it "Output comes in the order of the input" do
- enum = parallelize([0.5, 0.3, 0.1]) do |val|
- sleep val
- val
- end.enum_for(:each_with_index)
- expect(enum.next).to eq([ 0.5, 0 ])
- expect(enum.next).to eq([ 0.3, 1 ])
- expect(enum.next).to eq([ 0.1, 2 ])
- expect(elapsed_time).to be < 0.6
- end
-
- it "Exceptions in input are raised in the correct sequence but do NOT stop processing" do
- input = TestEnumerable.new(0.5, 0.3, 0.1) do
- raise "hi"
- end
- results = []
- enum = parallelize(input) { |x| sleep(x); x }
- expect { enum.each { |value| results << value } }.to raise_error "hi"
- expect(elapsed_time).to be < 0.6
- expect(results).to eq([ 0.5, 0.3, 0.1 ])
- end
-
- it "Exceptions in output are raised in the correct sequence and running processes do NOT stop processing" do
- processed = 0
- enum = parallelize([1, 2, "x", 3]) do |x|
- if x == "x"
- sleep(0.1)
- raise "hi"
- end
- sleep(0.2)
- processed += 1
- x
- end
- results = []
- expect { enum.each { |value| results << value } }.to raise_error "hi"
- expect(results).to eq([ 1, 2 ])
- expect(elapsed_time).to be < 0.3
- expect(processed).to eq(3)
- end
-
- it "Exceptions with :stop_on_exception are raised after all processing is done" do
- processed = 0
- parallelized = parallelize([0.3, 0.3, "x", 0.3, 0.3, 0.3, 0.3, 0.3], ordered: false, stop_on_exception: true) do |x|
- if x == "x"
- sleep(0.1)
- raise "hi"
- end
- sleep(x)
- processed += 1
- x
- end
- expect { parallelized.to_a }.to raise_error "hi"
- expect(processed).to eq(4)
- end
- end
-
- it "When the input is slow, output still proceeds" do
- input = TestEnumerable.new do |&block|
- block.call(1)
- sleep 0.1
- block.call(2)
- sleep 0.1
- block.call(3)
- sleep 0.1
- end
- enum = parallelize(input) { |x| x }
- expect(enum.map do |value|
- expect(elapsed_time).to be < (value + 1) * 0.1
- value
- end).to eq([ 1, 2, 3 ])
- end
- end
-
- context "With a Parallelizer with 1 thread" do
- let :parallelizer do
- Chef::ChefFS::Parallelizer.new(1)
- end
-
- context "when the thread is occupied with a job" do
- before :each do
- parallelizer
- started = false
- @occupying_job_finished = occupying_job_finished = [ false ]
- @thread = Thread.new do
- parallelizer.parallelize([0], main_thread_processing: false) do |x|
- started = true
- sleep(0.3)
- occupying_job_finished[0] = true
- end.wait
- end
- sleep(0.01) until started
- end
-
- after :each do
- Thread.kill(@thread)
- end
-
- it "parallelize with :main_thread_processing = true does not block" do
- expect(parallelizer.parallelize([1]) do |x|
- sleep(0.1)
- x
- end.to_a).to eq([ 1 ])
- expect(elapsed_time).to be < 0.2
- end
-
- it "parallelize with :main_thread_processing = false waits for the job to finish" do
- expect(parallelizer.parallelize([1], main_thread_processing: false) do |x|
- sleep(0.1)
- x + 1
- end.to_a).to eq([ 2 ])
- expect(elapsed_time).to be > 0.3
- end
-
- it "resizing the Parallelizer to 0 waits for the job to stop" do
- expect(elapsed_time).to be < 0.2
- parallelizer.resize(0)
- expect(parallelizer.num_threads).to eq(0)
- expect(elapsed_time).to be > 0.25
- expect(@occupying_job_finished).to eq([ true ])
- end
-
- it "stopping the Parallelizer waits for the job to finish" do
- expect(elapsed_time).to be < 0.2
- parallelizer.stop
- expect(parallelizer.num_threads).to eq(0)
- expect(elapsed_time).to be > 0.25
- expect(@occupying_job_finished).to eq([ true ])
- end
-
- it "resizing the Parallelizer to 2 does not stop the job" do
- expect(elapsed_time).to be < 0.2
- parallelizer.resize(2)
- expect(parallelizer.num_threads).to eq(2)
- expect(elapsed_time).to be < 0.2
- sleep(0.3)
- expect(@occupying_job_finished).to eq([ true ])
- end
- end
-
- context "enumerable methods should run efficiently" do
- it ".count does not process anything" do
- outputs_processed = 0
- input_mapper = TestEnumerable.new(1, 2, 3, 4, 5, 6)
- enum = parallelizer.parallelize(input_mapper) do |x|
- outputs_processed += 1
- sleep(0.05) # Just enough to yield and get other inputs in the queue
- x
- end
- expect(enum.count).to eq(6)
- expect(outputs_processed).to eq(0)
- expect(input_mapper.num_processed).to eq(6)
- end
-
- it ".count with arguments works normally" do
- outputs_processed = 0
- input_mapper = TestEnumerable.new(1, 1, 1, 1, 2, 2, 2, 3, 3, 4)
- enum = parallelizer.parallelize(input_mapper) do |x|
- outputs_processed += 1
- x
- end
- expect(enum.count { |x| x > 1 }).to eq(6)
- expect(enum.count(2)).to eq(3)
- expect(outputs_processed).to eq(20)
- expect(input_mapper.num_processed).to eq(20)
- end
-
- it ".first does not enumerate anything other than the first result(s)" do
- outputs_processed = 0
- input_mapper = TestEnumerable.new(1, 2, 3, 4, 5, 6)
- enum = parallelizer.parallelize(input_mapper) do |x|
- outputs_processed += 1
- sleep(0.05) # Just enough to yield and get other inputs in the queue
- x
- end
- expect(enum.first).to eq(1)
- expect(enum.first(2)).to eq([1, 2])
- expect(outputs_processed).to eq(3)
- expect(input_mapper.num_processed).to eq(3)
- end
-
- it ".take does not enumerate anything other than the first result(s)" do
- outputs_processed = 0
- input_mapper = TestEnumerable.new(1, 2, 3, 4, 5, 6)
- enum = parallelizer.parallelize(input_mapper) do |x|
- outputs_processed += 1
- sleep(0.05) # Just enough to yield and get other inputs in the queue
- x
- end
- expect(enum.take(2)).to eq([1, 2])
- expect(outputs_processed).to eq(2)
- expect(input_mapper.num_processed).to eq(2)
- end
-
- it ".drop does not process anything other than the last result(s)" do
- outputs_processed = 0
- input_mapper = TestEnumerable.new(1, 2, 3, 4, 5, 6)
- enum = parallelizer.parallelize(input_mapper) do |x|
- outputs_processed += 1
- sleep(0.05) # Just enough to yield and get other inputs in the queue
- x
- end
- expect(enum.drop(2)).to eq([3, 4, 5, 6])
- expect(outputs_processed).to eq(4)
- expect(input_mapper.num_processed).to eq(6)
- end
-
- if Enumerable.method_defined?(:lazy)
- it ".lazy.take does not enumerate anything other than the first result(s)" do
- outputs_processed = 0
- input_mapper = TestEnumerable.new(1, 2, 3, 4, 5, 6)
- enum = parallelizer.parallelize(input_mapper) do |x|
- outputs_processed += 1
- sleep(0.05) # Just enough to yield and get other inputs in the queue
- x
- end
- expect(enum.lazy.take(2).to_a).to eq([1, 2])
- expect(outputs_processed).to eq(2)
- expect(input_mapper.num_processed).to eq(2)
- end
-
- it ".drop does not process anything other than the last result(s)" do
- outputs_processed = 0
- input_mapper = TestEnumerable.new(1, 2, 3, 4, 5, 6)
- enum = parallelizer.parallelize(input_mapper) do |x|
- outputs_processed += 1
- sleep(0.05) # Just enough to yield and get other inputs in the queue
- x
- end
- expect(enum.lazy.drop(2).to_a).to eq([3, 4, 5, 6])
- expect(outputs_processed).to eq(4)
- expect(input_mapper.num_processed).to eq(6)
- end
-
- it "lazy enumerable is actually lazy" do
- outputs_processed = 0
- input_mapper = TestEnumerable.new(1, 2, 3, 4, 5, 6)
- enum = parallelizer.parallelize(input_mapper) do |x|
- outputs_processed += 1
- sleep(0.05) # Just enough to yield and get other inputs in the queue
- x
- end
- enum.lazy.take(2)
- enum.lazy.drop(2)
- sleep(0.1)
- expect(outputs_processed).to eq(0)
- expect(input_mapper.num_processed).to eq(0)
- end
- end
- end
-
- context "running enumerable multiple times should function correctly" do
- it ".map twice on the same parallel enumerable returns the correct results and re-processes the input" do
- outputs_processed = 0
- input_mapper = TestEnumerable.new(1, 2, 3)
- enum = parallelizer.parallelize(input_mapper) do |x|
- outputs_processed += 1
- x
- end
- expect(enum.map { |x| x }).to eq([1, 2, 3])
- expect(enum.map { |x| x }).to eq([1, 2, 3])
- expect(outputs_processed).to eq(6)
- expect(input_mapper.num_processed).to eq(6)
- end
-
- it ".first and then .map on the same parallel enumerable returns the correct results and re-processes the input" do
- outputs_processed = 0
- input_mapper = TestEnumerable.new(1, 2, 3)
- enum = parallelizer.parallelize(input_mapper) do |x|
- outputs_processed += 1
- x
- end
- expect(enum.first).to eq(1)
- expect(enum.map { |x| x }).to eq([1, 2, 3])
- expect(outputs_processed).to be >= 4
- expect(input_mapper.num_processed).to be >= 4
- end
-
- it "two simultaneous enumerations throws an exception" do
- enum = parallelizer.parallelize([1, 2, 3]) { |x| x }
- a = enum.enum_for(:each)
- a.next
- expect do
- b = enum.enum_for(:each)
- b.next
- end.to raise_error(RuntimeError, "each() called on parallel enumerable twice simultaneously! Bad mojo")
- end
- end
- end
-
- context "With a Parallelizer with 0 threads" do
- let :parallelizer do
- Chef::ChefFS::Parallelizer.new(0)
- end
-
- context "And main_thread_processing on" do
- it "succeeds in running" do
- expect(parallelizer.parallelize([0.5]) { |x| x * 2 }.to_a).to eq([1])
- end
- end
- end
-
- context "With a Parallelizer with 10 threads" do
- let :parallelizer do
- Chef::ChefFS::Parallelizer.new(10)
- end
-
- it "does not have contention issues with large numbers of inputs" do
- expect(parallelizer.parallelize(1.upto(500)) { |x| x + 1 }.to_a).to eq(2.upto(501).to_a)
- end
-
- it "does not have contention issues with large numbers of inputs with ordering off" do
- expect(parallelizer.parallelize(1.upto(500), ordered: false) { |x| x + 1 }.to_a.sort).to eq(2.upto(501).to_a)
- end
-
- it "does not have contention issues with large numbers of jobs and inputs with ordering off" do
- parallelizers = 0.upto(99).map do
- parallelizer.parallelize(1.upto(500)) { |x| x + 1 }
- end
- outputs = []
- threads = 0.upto(99).map do |i|
- Thread.new { outputs[i] = parallelizers[i].to_a }
- end
- threads.each(&:join)
- outputs.each { |output| expect(output.sort).to eq(2.upto(501).to_a) }
- end
- end
-
- class TestEnumerable
- include Enumerable
-
- def initialize(*values, &block)
- @values = values
- @block = block
- @num_processed = 0
- end
-
- attr_reader :num_processed
-
- def each
- @values.each do |value|
- @num_processed += 1
- yield(value)
- end
- if @block
- @block.call do |value|
- @num_processed += 1
- yield(value)
- end
- end
- end
- end
-end