author | Lamont Granquist <lamont@scriptkiddie.org> | 2021-04-21 16:40:10 -0700 |
---|---|---|
committer | Lamont Granquist <lamont@scriptkiddie.org> | 2021-04-21 17:27:08 -0700 |
commit | a01f9f49b65ae49a1881da7d4fbe812e1fbf2dbb (patch) | |
tree | 9763189b3d99204c44275774dfad2ac6c2b576de | |
parent | 256eda3e006627b38f429418beb4056ff772f51f (diff) | |
download | chef-a01f9f49b65ae49a1881da7d4fbe812e1fbf2dbb.tar.gz | |
Replace the ChefFS parallelizer with parallel_map helper
The existing parallelizer was nearly impossible to understand and may have
had deadlocks that caused test failures for quite a long time. With
Ruby 3.0 the issues with the parallelizer tests became untenable. This
change replaces it with a simpler parallel_map implementation built on
concurrent-ruby, which provides the behavior the old Parallelizer code
required in a more readable and maintainable way.
Signed-off-by: Lamont Granquist <lamont@scriptkiddie.org>
-rw-r--r-- | Gemfile.lock | 1
-rw-r--r-- | chef-utils/chef-utils.gemspec | 4
-rw-r--r-- | chef-utils/lib/chef-utils/parallel_map.rb | 131
-rw-r--r-- | chef-utils/spec/unit/parallel_map_spec.rb | 156
-rw-r--r-- | knife/lib/chef/chef_fs/knife.rb | 8
-rw-r--r-- | lib/chef/chef_fs/command_line.rb | 7
-rw-r--r-- | lib/chef/chef_fs/file_system.rb | 17
-rw-r--r-- | lib/chef/chef_fs/parallelizer.rb | 102
-rw-r--r-- | lib/chef/chef_fs/parallelizer/flatten_enumerable.rb | 35
-rw-r--r-- | lib/chef/chef_fs/parallelizer/parallel_enumerable.rb | 278
-rw-r--r-- | spec/unit/chef_fs/parallelizer_spec.rb | 479
11 files changed, 310 insertions, 908 deletions
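For context, a minimal sketch of how the new helper is used, mirroring the refinement and the thread-pool sizing added in this patch (the thread count below is illustrative, taken from knife's `concurrency - 1` default):

```ruby
require "chef-utils/parallel_map"

# parallel_map/parallel_each are added to Enumerable via a refinement,
# so each file that calls them needs the `using` directive.
using ChefUtils::ParallelMap

# knife sizes the shared pool from Chef::Config[:concurrency]; 9 threads
# here stands in for a concurrency of 10 minus the calling thread.
ChefUtils::DefaultThreadPool.instance.threads = 9

# Runs the block across the pool, returns results in input order, and
# (per the patch's docs) raises only the first exception after the
# whole collection has executed.
squares = (1..10).parallel_map { |i| i * i }
# => [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]

# Lazy collections are supported: only the first five items are evaluated.
(0..).lazy.parallel_map { |i| i * i }.first(5)
# => [0, 1, 4, 9, 16]
```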
diff --git a/Gemfile.lock b/Gemfile.lock index 0084b38a95..e1b5afb023 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -140,6 +140,7 @@ PATH remote: chef-utils specs: chef-utils (17.0.219) + concurrent-ruby GEM remote: https://rubygems.org/ diff --git a/chef-utils/chef-utils.gemspec b/chef-utils/chef-utils.gemspec index a72b32936d..8cbae405e3 100644 --- a/chef-utils/chef-utils.gemspec +++ b/chef-utils/chef-utils.gemspec @@ -41,6 +41,10 @@ Gem::Specification.new do |spec| # ABSOLUTELY NO EXCEPTIONS # + # concurrent-ruby is: 1. lightweight, 2. has zero deps, 3. is external to chef + # this is used for the parallel_map enumerable extension for lightweight threading + spec.add_dependency "concurrent-ruby" + spec.files = %w{Rakefile LICENSE} + Dir.glob("*.gemspec") + Dir.glob("{lib,spec}/**/*", File::FNM_DOTMATCH).reject { |f| File.directory?(f) } end diff --git a/chef-utils/lib/chef-utils/parallel_map.rb b/chef-utils/lib/chef-utils/parallel_map.rb new file mode 100644 index 0000000000..abc8279cc5 --- /dev/null +++ b/chef-utils/lib/chef-utils/parallel_map.rb @@ -0,0 +1,131 @@ +# frozen_string_literal: true +# +# Copyright:: Copyright (c) Chef Software Inc. +# License:: Apache License, Version 2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require "concurrent/executors" +require "concurrent/future" +require "singleton" unless defined?(Singleton) + +module ChefUtils + # + # This module contains ruby refinements that adds several methods to the Enumerable + # class which are useful for parallel processing. + # + module ParallelMap + refine Enumerable do + + # Enumerates through the collection in parallel using the thread pool provided + # or the default thread pool. By using the default thread pool this supports + # recursively calling the method without deadlocking while using a globally + # fixed number of workers. This method supports lazy collections. It returns + # synchronously, waiting until all the work is done. Failures are only reported + # after the collection has executed and only the first exception is raised. + # + # (0..).lazy.parallel_map { |i| i*i }.first(5) + # + # @return [Array] output results + # + def parallel_map(pool: nil) + return self unless block_given? + + pool ||= ChefUtils::DefaultThreadPool.instance.pool + + futures = map do |item| + future = Concurrent::Future.execute(executor: pool) do + yield item + end + end + + futures.map(&:value!) + end + + # This has the same behavior as parallel_map but returns the enumerator instead of + # the return values. + # + # @return [Enumerable] the enumerable for method chaining + # + def parallel_each(pool: nil, &block) + return self unless block_given? + + parallel_map(pool: pool, &block) + + self + end + + # The flat_each method is tightly coupled to the usage of parallel_map within the + # ChefFS implementation. It is not itself a parallel method, but it is used to + # iterate through the 2nd level of nested structure, which is tied to the nested + # structures that ChefFS returns. 
+ # + # This is different from Enumerable#flat_map because that behaves like map.flatten(1) while + # this behaves more like flatten(1).each. We need this on an Enumerable, so we have no + # Enumerable#flatten method to call. + # + # [ [ 1, 2 ], [ 3, 4 ] ].flat_each(&block) calls block four times with 1, 2, 3, 4 + # + # [ [ 1, 2 ], [ 3, 4 ] ].flat_map(&block) calls block twice with [1, 2] and [3,4] + # + def flat_each(&block) + map do |value| + if value.is_a?(Enumerable) + value.each(&block) + else + yield value + end + end + end + end + end + + # The DefaultThreadPool has a fixed thread size and has no + # queue of work and the behavior on failure to find a thread is for the + # caller to run the work. This contract means that the thread pool can + # be called recursively without deadlocking and while keeping the fixed + # number of threads (and not exponentially growing the thread pool with + # the depth of recursion). + # + class DefaultThreadPool + include Singleton + + DEFAULT_THREAD_SIZE = 10 + + # Size of the thread pool, must be set before getting the thread pool or + # calling parallel_map/parallel_each. Does not (but could be modified to) + # support dynamic resizing. To get fully synchronous behavior set this equal to + # zero rather than one since the caller will get work if the threads are + # busy. + # + # @return [Integer] number of threads + attr_accessor :threads + + # Memoizing accessor for the thread pool + # + # @return [Concurrent::ThreadPoolExecutor] the thread pool + def pool + @pool ||= Concurrent::ThreadPoolExecutor.new( + min_threads: threads || DEFAULT_THREAD_SIZE, + max_threads: threads || DEFAULT_THREAD_SIZE, + max_queue: 0, + # "synchronous" redefines the 0 in max_queue to mean 'no queue' instead of 'infinite queue' + # it does not mean synchronous execution (no threads) but synchronous offload to the threads. + synchronous: true, + # this prevents deadlocks on recursive parallel usage + fallback_policy: :caller_runs + ) + end + end +end diff --git a/chef-utils/spec/unit/parallel_map_spec.rb b/chef-utils/spec/unit/parallel_map_spec.rb new file mode 100644 index 0000000000..94a1b5342c --- /dev/null +++ b/chef-utils/spec/unit/parallel_map_spec.rb @@ -0,0 +1,156 @@ +# frozen_string_literal: true +# +# Copyright:: Copyright (c) Chef Software Inc. +# License:: Apache License, Version 2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +require "chef-utils/parallel_map" + +using ChefUtils::ParallelMap + +RSpec.describe ChefUtils::ParallelMap do + + shared_examples_for "common parallel API tests" do + + before(:each) do + ChefUtils::DefaultThreadPool.instance.instance_variable_set(:@pool, nil) + ChefUtils::DefaultThreadPool.instance.threads = threads + end + + after(:each) do + ChefUtils::DefaultThreadPool.instance.instance_variable_set(:@pool, nil) + end + + it "parallel_map runs in parallel" do + # this is implicitly also testing that we run in the caller when we exhaust threads by running threads+1 + val = threads + 1 + ret = [] + start = Time.now + (1..val).parallel_map do |i| + loop do + if val == i + ret << i + val -= 1 + break + end + # we spin for quite awhile to wait for very slow testers if we have to + if Time.now - start > 30 + raise "timed out; deadlocked due to lack of parallelization?" + end + + # need to sleep a tiny bit to let other threads schedule + sleep 0.000001 + end + end + expected = (1..threads + 1).to_a.reverse + expect(ret).to eql(expected) + end + + it "parallel_each runs in parallel" do + # this is implicitly also testing that we run in the caller when we exhaust threads by running threads+1 + val = threads + 1 + ret = [] + start = Time.now + (1..val).parallel_each do |i| + loop do + if val == i + ret << i + val -= 1 + break + end + # we spin for quite awhile to wait for very slow testers if we have to + if Time.now - start > 30 + raise "timed out; deadlocked due to lack of parallelization?" + end + + # need to sleep a tiny bit to let other threads schedule + sleep 0.000001 + end + end + expected = (1..threads + 1).to_a.reverse + expect(ret).to eql(expected) + end + + it "parallel_map throws exceptions" do + expect { (0..10).parallel_map { |i| raise "boom" } }.to raise_error(RuntimeError) + end + + it "parallel_each throws exceptions" do + expect { (0..10).parallel_each { |i| raise "boom" } }.to raise_error(RuntimeError) + end + + it "parallel_map runs" do + ans = Timeout.timeout(30) do + (1..10).parallel_map { |i| i } + end + expect(ans).to eql((1..10).to_a) + end + + it "parallel_each runs" do + Timeout.timeout(30) do + (1..10).parallel_each { |i| i } + end + end + + it "recursive parallel_map will not deadlock" do + ans = Timeout.timeout(30) do + (1..2).parallel_map { |i| (1..2).parallel_map { |i| i } } + end + expect(ans).to eql([[1, 2], [1, 2]]) + end + + it "recursive parallel_each will not deadlock" do + ans = Timeout.timeout(30) do + (1..2).parallel_each { |i| (1..2).parallel_each { |i| i } } + end + end + + it "parallel_map is lazy" do + ans = Timeout.timeout(30) do + (1..).lazy.parallel_map { |i| i }.first(5) + end + expect(ans).to eql((1..5).to_a) + end + + it "parallel_each is lazy" do + ans = Timeout.timeout(30) do + (1..).lazy.parallel_each { |i| i }.first(5) + end + end + end + + context "with 10 threads" do + let(:threads) { 10 } + it_behaves_like "common parallel API tests" + end + + context "with 0 threads" do + let(:threads) { 0 } + it_behaves_like "common parallel API tests" + end + + context "with 1 threads" do + let(:threads) { 1 } + it_behaves_like "common parallel API tests" + end + + context "flat_each" do + it "runs each over items which are nested one level" do + sum = 0 + [ [ 1, 2 ], [3, 4]].flat_each { |i| sum += i } + expect(sum).to eql(10) + end + end +end diff --git a/knife/lib/chef/chef_fs/knife.rb b/knife/lib/chef/chef_fs/knife.rb index 9e165ab7ea..55473da8cd 100644 --- a/knife/lib/chef/chef_fs/knife.rb +++ b/knife/lib/chef/chef_fs/knife.rb @@ -19,6 +19,9 
@@ require_relative "../knife" require "pathname" unless defined?(Pathname) require "chef-utils/dist" unless defined?(ChefUtils::Dist) +require "chef-utils/parallel_map" unless defined?(ChefUtils::ParallelMap) + +using ChefUtils::ParallelMap class Chef module ChefFS @@ -27,7 +30,6 @@ class Chef def self.deps super do require "chef/config" unless defined?(Chef::Config) - require "chef/chef_fs/parallelizer" unless defined?(Chef::ChefFS::Parallelizer) require "chef/chef_fs/config" unless defined?(Chef::ChefFS::Config) require "chef/chef_fs/file_pattern" unless defined?(Chef::ChefFS::FilePattern) require "chef/chef_fs/path_utils" unless defined?(Chef::ChefFS::PathUtils) @@ -70,7 +72,7 @@ class Chef @chef_fs_config = Chef::ChefFS::Config.new(Chef::Config, Dir.pwd, config, ui) - Chef::ChefFS::Parallelizer.threads = (Chef::Config[:concurrency] || 10) - 1 + ChefUtils::DefaultThreadPool.instance.threads = (Chef::Config[:concurrency] || 10) - 1 end def chef_fs @@ -140,7 +142,7 @@ class Chef end def parallelize(inputs, options = {}, &block) - Chef::ChefFS::Parallelizer.parallelize(inputs, options, &block) + inputs.parallel_map(&block) end def discover_repo_dir(dir) diff --git a/lib/chef/chef_fs/command_line.rb b/lib/chef/chef_fs/command_line.rb index 1e3aa137cd..16d9bd93f4 100644 --- a/lib/chef/chef_fs/command_line.rb +++ b/lib/chef/chef_fs/command_line.rb @@ -19,6 +19,9 @@ require_relative "file_system" require_relative "file_system/exceptions" require_relative "../util/diff" +require "chef-utils/parallel_map" unless defined?(ChefUtils::ParallelMap) + +using ChefUtils::ParallelMap class Chef module ChefFS @@ -140,7 +143,7 @@ class Chef end def self.diff(pattern, old_root, new_root, recurse_depth, get_content) - Chef::ChefFS::Parallelizer.parallelize(Chef::ChefFS::FileSystem.list_pairs(pattern, old_root, new_root)) do |old_entry, new_entry| + Chef::ChefFS::FileSystem.list_pairs(pattern, old_root, new_root).parallel_map do |old_entry, new_entry| diff_entries(old_entry, new_entry, recurse_depth, get_content) end.flatten(1) end @@ -153,7 +156,7 @@ class Chef if recurse_depth == 0 [ [ :common_subdirectories, old_entry, new_entry ] ] else - Chef::ChefFS::Parallelizer.parallelize(Chef::ChefFS::FileSystem.child_pairs(old_entry, new_entry)) do |old_child, new_child| + Chef::ChefFS::FileSystem.child_pairs(old_entry, new_entry).parallel_map do |old_child, new_child| Chef::ChefFS::CommandLine.diff_entries(old_child, new_child, recurse_depth ? recurse_depth - 1 : nil, get_content) end.flatten(1) end diff --git a/lib/chef/chef_fs/file_system.rb b/lib/chef/chef_fs/file_system.rb index 73c3f0090a..5c4278c100 100644 --- a/lib/chef/chef_fs/file_system.rb +++ b/lib/chef/chef_fs/file_system.rb @@ -18,7 +18,9 @@ require_relative "path_utils" require_relative "file_system/exceptions" -require_relative "parallelizer" +require "chef-utils/parallel_map" unless defined?(ChefUtils::ParallelMap) + +using ChefUtils::ParallelMap class Chef module ChefFS @@ -70,8 +72,8 @@ class Chef # Otherwise, go through all children and find any matches elsif entry.dir? 
- results = Parallelizer.parallelize(entry.children) { |child| Chef::ChefFS::FileSystem.list(child, pattern) } - results.flatten(1).each(&block) + results = entry.children.parallel_map { |child| Chef::ChefFS::FileSystem.list(child, pattern) } + results.flat_each(&block) end end end @@ -138,7 +140,7 @@ class Chef def self.copy_to(pattern, src_root, dest_root, recurse_depth, options, ui = nil, format_path = nil) found_result = false error = false - parallel_do(list_pairs(pattern, src_root, dest_root)) do |src, dest| + list_pairs(pattern, src_root, dest_root).parallel_each do |src, dest| found_result = true new_dest_parent = get_or_create_parent(dest, options, ui, format_path) child_error = copy_entries(src, dest, new_dest_parent, recurse_depth, options, ui, format_path) @@ -319,7 +321,7 @@ class Chef end # Directory creation is recursive. if recurse_depth != 0 - parallel_do(src_entry.children) do |src_child| + src_entry.children.parallel_each do |src_child| new_dest_child = new_dest_dir.child(src_child.name) child_error = copy_entries(src_child, new_dest_child, new_dest_dir, recurse_depth ? recurse_depth - 1 : recurse_depth, options, ui, format_path) error ||= child_error @@ -356,7 +358,7 @@ class Chef if dest_entry.dir? # If both are directories, recurse into their children if recurse_depth != 0 - parallel_do(child_pairs(src_entry, dest_entry)) do |src_child, dest_child| + child_pairs(src_entry, dest_entry).parallel_each do |src_child, dest_child| child_error = copy_entries(src_child, dest_child, dest_entry, recurse_depth ? recurse_depth - 1 : recurse_depth, options, ui, format_path) error ||= child_error end @@ -423,9 +425,6 @@ class Chef parent end - def parallel_do(enum, options = {}, &block) - Chef::ChefFS::Parallelizer.parallel_do(enum, options, &block) - end end end end diff --git a/lib/chef/chef_fs/parallelizer.rb b/lib/chef/chef_fs/parallelizer.rb deleted file mode 100644 index c4d17a842d..0000000000 --- a/lib/chef/chef_fs/parallelizer.rb +++ /dev/null @@ -1,102 +0,0 @@ -require_relative "parallelizer/parallel_enumerable" - -class Chef - module ChefFS - # Tries to balance several guarantees, in order of priority: - # - don't get deadlocked - # - provide results in desired order - # - provide results as soon as they are available - # - process input as soon as possible - class Parallelizer - @@parallelizer = nil - @@threads = 0 - - def self.threads=(value) - @@threads = value - @@parallelizer.resize(value) if @@parallelizer - end - - def self.parallelizer - @@parallelizer ||= Parallelizer.new(@@threads) - end - - def self.parallelize(enumerable, options = {}, &block) - parallelizer.parallelize(enumerable, options, &block) - end - - def self.parallel_do(enumerable, options = {}, &block) - parallelizer.parallel_do(enumerable, options, &block) - end - - def initialize(num_threads) - @tasks = Queue.new - @threads = [] - @stop_thread = {} - resize(num_threads) - end - - def num_threads - @threads.size - end - - def parallelize(enumerable, options = {}, &block) - ParallelEnumerable.new(@tasks, enumerable, options, &block) - end - - def parallel_do(enumerable, options = {}, &block) - ParallelEnumerable.new(@tasks, enumerable, options.merge(ordered: false), &block).wait - end - - def stop(wait = true, timeout = nil) - resize(0, wait, timeout) - end - - def resize(to_threads, wait = true, timeout = nil) - if to_threads < num_threads - threads_to_stop = @threads[to_threads..num_threads - 1] - @threads = @threads.slice(0, to_threads) - threads_to_stop.each do |thread| - @stop_thread[thread] = 
true - end - - if wait - start_time = Time.now - threads_to_stop.each do |thread| - thread_timeout = timeout ? timeout - (Time.now - start_time) : nil - thread.join(thread_timeout) - end - end - - else - num_threads.upto(to_threads - 1) do |i| - @threads[i] = Thread.new(&method(:worker_loop)) - end - end - end - - def kill - @threads.each do |thread| - Thread.kill(thread) - @stop_thread.delete(thread) - end - @threads = [] - end - - private - - def worker_loop - until @stop_thread[Thread.current] - begin - task = @tasks.pop - task.call - rescue - puts "ERROR #{$!}" - puts $!.backtrace - end - end - ensure - @stop_thread.delete(Thread.current) - end - end - end -end diff --git a/lib/chef/chef_fs/parallelizer/flatten_enumerable.rb b/lib/chef/chef_fs/parallelizer/flatten_enumerable.rb deleted file mode 100644 index 84db2c2053..0000000000 --- a/lib/chef/chef_fs/parallelizer/flatten_enumerable.rb +++ /dev/null @@ -1,35 +0,0 @@ -class Chef - module ChefFS - class Parallelizer - class FlattenEnumerable - include Enumerable - - def initialize(enum, levels = nil) - @enum = enum - @levels = levels - end - - attr_reader :enum - attr_reader :levels - - def each(&block) - enum.each do |value| - flatten(value, levels, &block) - end - end - - private - - def flatten(value, levels, &block) - if levels != 0 && value.respond_to?(:each) && !value.is_a?(String) - value.each do |child| - flatten(child, levels.nil? ? levels : levels - 1, &block) - end - else - yield(value) - end - end - end - end - end -end diff --git a/lib/chef/chef_fs/parallelizer/parallel_enumerable.rb b/lib/chef/chef_fs/parallelizer/parallel_enumerable.rb deleted file mode 100644 index 5fafc5c88f..0000000000 --- a/lib/chef/chef_fs/parallelizer/parallel_enumerable.rb +++ /dev/null @@ -1,278 +0,0 @@ -require_relative "flatten_enumerable" - -class Chef - module ChefFS - class Parallelizer - class ParallelEnumerable - include Enumerable - - # options: - # :ordered [true|false] - whether the output should stay in the same order - # as the input (even though it may not actually be processed in that - # order). Default: true - # :stop_on_exception [true|false] - if true, when an exception occurs in either - # input or output, we wait for any outstanding processing to complete, - # but will not process any new inputs. Default: false - # :main_thread_processing [true|false] - whether the main thread pulling - # on each() is allowed to process inputs. Default: true - # NOTE: If you set this to false, parallelizer.kill will stop each() - # in its tracks, so you need to know for sure that won't happen. 
- def initialize(parent_task_queue, input_enumerable, options = {}, &block) - @parent_task_queue = parent_task_queue - @input_enumerable = input_enumerable - @options = options - @block = block - - @unconsumed_input = Queue.new - @in_process = {} - @unconsumed_output = Queue.new - end - - attr_reader :parent_task_queue - attr_reader :input_enumerable - attr_reader :options - attr_reader :block - - def each - each_with_input do |output, index, input, type| - yield output - end - end - - def each_with_index - each_with_input do |output, index, input| - yield output, index - end - end - - def each_with_input - exception = nil - each_with_exceptions do |output, index, input, type| - if type == :exception - if @options[:ordered] == false - exception ||= output - else - raise output - end - else - yield output, index, input - end - end - raise exception if exception - end - - def each_with_exceptions(&block) - if @options[:ordered] == false - each_with_exceptions_unordered(&block) - else - each_with_exceptions_ordered(&block) - end - end - - def wait - exception = nil - each_with_exceptions_unordered do |output, index, input, type| - exception ||= output if type == :exception - end - raise exception if exception - end - - # Enumerable methods - def restricted_copy(enumerable) - ParallelEnumerable.new(@parent_task_queue, enumerable, @options, &@block) - end - - alias :original_count :count - - def count(*args, &block) - if args.size == 0 && block.nil? - @input_enumerable.count - else - original_count(*args, &block) - end - end - - def first(n = nil) - if n - restricted_copy(@input_enumerable.first(n)).to_a - else - first(1)[0] - end - end - - def drop(n) - restricted_copy(@input_enumerable.drop(n)).to_a - end - - def flatten(levels = nil) - FlattenEnumerable.new(self, levels) - end - - def take(n) - restricted_copy(@input_enumerable.take(n)).to_a - end - - if Enumerable.method_defined?(:lazy) - class RestrictedLazy - def initialize(parallel_enumerable, actual_lazy) - @parallel_enumerable = parallel_enumerable - @actual_lazy = actual_lazy - end - - def drop(*args, &block) - input = @parallel_enumerable.input_enumerable.lazy.drop(*args, &block) - @parallel_enumerable.restricted_copy(input) - end - - def take(*args, &block) - input = @parallel_enumerable.input_enumerable.lazy.take(*args, &block) - @parallel_enumerable.restricted_copy(input) - end - - def method_missing(method, *args, &block) - @actual_lazy.send(:method, *args, &block) - end - end - - alias :original_lazy :lazy - - def lazy - RestrictedLazy.new(self, original_lazy) - end - end - - private - - def each_with_exceptions_unordered - if @each_running - raise "each() called on parallel enumerable twice simultaneously! Bad mojo" - end - - @each_running = true - begin - # Grab all the inputs, yielding any responses during enumeration - # in case the enumeration itself takes time - begin - @input_enumerable.each_with_index do |input, index| - @unconsumed_input.push([ input, index ]) - @parent_task_queue.push(method(:process_one)) - - stop_processing_input = false - until @unconsumed_output.empty? 
- output, index, input, type = @unconsumed_output.pop - yield output, index, input, type - if type == :exception && @options[:stop_on_exception] - stop_processing_input = true - break - end - end - - if stop_processing_input - break - end - end - rescue - # We still want to wait for the rest of the outputs to process - @unconsumed_output.push([$!, nil, nil, :exception]) - if @options[:stop_on_exception] - @unconsumed_input.clear - end - end - - until finished? - # yield thread to others (for 1.8.7) - if @unconsumed_output.empty? - sleep(0.01) - end - - yield @unconsumed_output.pop until @unconsumed_output.empty? - - # If no one is working on our tasks and we're allowed to - # work on them in the main thread, process an input to - # move things forward. - if @in_process.size == 0 && !(@options[:main_thread_processing] == false) - process_one - end - end - rescue - # If we exited early, perhaps due to any? finding a result, we want - # to make sure and throw away any extra results (gracefully) so that - # the next enumerator can start over. - unless finished? - stop - end - raise - ensure - @each_running = false - end - end - - def each_with_exceptions_ordered - next_to_yield = 0 - unconsumed = {} - each_with_exceptions_unordered do |output, index, input, type| - unconsumed[index] = [ output, input, type ] - while unconsumed[next_to_yield] - input_output = unconsumed.delete(next_to_yield) - yield input_output[0], next_to_yield, input_output[1], input_output[2] - next_to_yield += 1 - end - end - input_exception = unconsumed.delete(nil) - if input_exception - yield input_exception[0], next_to_yield, input_exception[1], input_exception[2] - end - end - - def stop - @unconsumed_input.clear - sleep(0.05) while @in_process.size > 0 - @unconsumed_output.clear - end - - # - # This is thread safe only if called from the main thread pulling on each(). - # The order of these checks is important, as well, to be thread safe. - # 1. If @unconsumed_input.empty? is true, then we will never have any more - # work legitimately picked up. - # 2. If @in_process == 0, then there is no work in process, and because of when unconsumed_input is empty, it will never go back up, because - # this is called after the input enumerator is finished. Note that switching #2 and #1 - # could cause a race, because in_process is incremented *before* consuming input. - # 3. If @unconsumed_output.empty? is true, then we are done with outputs. - # Thus, 1+2 means no more output will ever show up, and 3 means we've passed all - # existing outputs to the user. - # - def finished? - @unconsumed_input.empty? && @in_process.size == 0 && @unconsumed_output.empty? 
- end - - def process_one - @in_process[Thread.current] = true - begin - begin - input, index = @unconsumed_input.pop(true) - process_input(input, index) - rescue ThreadError - end - ensure - @in_process.delete(Thread.current) - end - end - - def process_input(input, index) - begin - output = @block.call(input) - @unconsumed_output.push([ output, index, input, :result ]) - rescue StandardError, ScriptError - if @options[:stop_on_exception] - @unconsumed_input.clear - end - @unconsumed_output.push([ $!, index, input, :exception ]) - end - - index - end - end - end - end -end diff --git a/spec/unit/chef_fs/parallelizer_spec.rb b/spec/unit/chef_fs/parallelizer_spec.rb deleted file mode 100644 index 519a628347..0000000000 --- a/spec/unit/chef_fs/parallelizer_spec.rb +++ /dev/null @@ -1,479 +0,0 @@ -require "spec_helper" -require "chef/chef_fs/parallelizer" - -# FIXME: these are disabled on MacOS due to timing issues in our anka build cluster -# these issues should be fixed and the tests should be re-eenabled. If we are getting -# omnibus test phases on mac tests which are reasonable and not ~3 hours long, then the -# condition to avoid this testing on macs can be deleted -describe Chef::ChefFS::Parallelizer, :not_supported_on_macos do - before :each do - @start_time = Time.now - end - - def elapsed_time - Time.now - @start_time - end - - after :each do - parallelizer.kill - end - - context "With a Parallelizer with 5 threads" do - let :parallelizer do - Chef::ChefFS::Parallelizer.new(5) - end - - def parallelize(inputs, options = {}, &block) - parallelizer.parallelize(inputs, { main_thread_processing: false }.merge(options), &block) - end - - it "parallel_do creates unordered output as soon as it is available" do - outputs = [] - parallelizer.parallel_do([0.5, 0.3, 0.1]) do |val| - sleep val - outputs << val - end - expect(elapsed_time).to be < 0.6 - expect(outputs).to eq([ 0.1, 0.3, 0.5 ]) - end - - context "With :ordered => false (unordered output)" do - it "An empty input produces an empty output" do - expect(parallelize([], ordered: false) do - sleep 10 - end.to_a).to eql([]) - expect(elapsed_time).to be < 0.1 - end - - it "10 sleep(0.2)s complete within 0.5 seconds" do - expect(parallelize(1.upto(10), ordered: false) do |i| - sleep 0.2 - "x" - end.to_a).to eq(%w{x x x x x x x x x x}) - expect(elapsed_time).to be < 0.5 - end - - it "The output comes as soon as it is available" do - enum = parallelize([0.5, 0.3, 0.1], ordered: false) do |val| - sleep val - val - end - expect(enum.map do |value| - expect(elapsed_time).to be < value + 0.1 - value - end).to eq([ 0.1, 0.3, 0.5 ]) - end - - it "An exception in input is passed through but does NOT stop processing" do - input = TestEnumerable.new(0.5, 0.3, 0.1) do - raise "hi" - end - enum = parallelize(input, ordered: false) { |x| sleep(x); x } - results = [] - expect { enum.each { |value| results << value } }.to raise_error "hi" - expect(results).to eq([ 0.1, 0.3, 0.5 ]) - expect(elapsed_time).to be < 0.6 - end - - it "Exceptions in output are raised after all processing is done" do - processed = 0 - enum = parallelize([1, 2, "x", 3], ordered: false) do |x| - if x == "x" - sleep 0.1 - raise "hi" - end - sleep 0.2 - processed += 1 - x - end - results = [] - expect { enum.each { |value| results << value } }.to raise_error "hi" - expect(results.sort).to eq([ 1, 2, 3 ]) - expect(elapsed_time).to be < 0.3 - expect(processed).to eq(3) - end - - it "Exceptions with :stop_on_exception are raised after all processing is done" do - processed = 0 - 
parallelized = parallelize([0.3, 0.3, "x", 0.3, 0.3, 0.3, 0.3, 0.3], ordered: false, stop_on_exception: true) do |x| - if x == "x" - sleep(0.1) - raise "hi" - end - sleep(x) - processed += 1 - x - end - expect { parallelized.to_a }.to raise_error "hi" - expect(processed).to eq(4) - end - end - - context "With :ordered => true (ordered output)" do - it "An empty input produces an empty output" do - expect(parallelize([]) do - sleep 10 - end.to_a).to eql([]) - expect(elapsed_time).to be < 0.1 - end - - it "10 sleep(0.2)s complete within 0.5 seconds" do - expect(parallelize(1.upto(10), ordered: true) do |i| - sleep 0.2 - "x" - end.to_a).to eq(%w{x x x x x x x x x x}) - expect(elapsed_time).to be < 0.5 - end - - it "Output comes in the order of the input" do - enum = parallelize([0.5, 0.3, 0.1]) do |val| - sleep val - val - end.enum_for(:each_with_index) - expect(enum.next).to eq([ 0.5, 0 ]) - expect(enum.next).to eq([ 0.3, 1 ]) - expect(enum.next).to eq([ 0.1, 2 ]) - expect(elapsed_time).to be < 0.6 - end - - it "Exceptions in input are raised in the correct sequence but do NOT stop processing" do - input = TestEnumerable.new(0.5, 0.3, 0.1) do - raise "hi" - end - results = [] - enum = parallelize(input) { |x| sleep(x); x } - expect { enum.each { |value| results << value } }.to raise_error "hi" - expect(elapsed_time).to be < 0.6 - expect(results).to eq([ 0.5, 0.3, 0.1 ]) - end - - it "Exceptions in output are raised in the correct sequence and running processes do NOT stop processing" do - processed = 0 - enum = parallelize([1, 2, "x", 3]) do |x| - if x == "x" - sleep(0.1) - raise "hi" - end - sleep(0.2) - processed += 1 - x - end - results = [] - expect { enum.each { |value| results << value } }.to raise_error "hi" - expect(results).to eq([ 1, 2 ]) - expect(elapsed_time).to be < 0.3 - expect(processed).to eq(3) - end - - it "Exceptions with :stop_on_exception are raised after all processing is done" do - processed = 0 - parallelized = parallelize([0.3, 0.3, "x", 0.3, 0.3, 0.3, 0.3, 0.3], ordered: false, stop_on_exception: true) do |x| - if x == "x" - sleep(0.1) - raise "hi" - end - sleep(x) - processed += 1 - x - end - expect { parallelized.to_a }.to raise_error "hi" - expect(processed).to eq(4) - end - end - - it "When the input is slow, output still proceeds" do - input = TestEnumerable.new do |&block| - block.call(1) - sleep 0.1 - block.call(2) - sleep 0.1 - block.call(3) - sleep 0.1 - end - enum = parallelize(input) { |x| x } - expect(enum.map do |value| - expect(elapsed_time).to be < (value + 1) * 0.1 - value - end).to eq([ 1, 2, 3 ]) - end - end - - context "With a Parallelizer with 1 thread" do - let :parallelizer do - Chef::ChefFS::Parallelizer.new(1) - end - - context "when the thread is occupied with a job" do - before :each do - parallelizer - started = false - @occupying_job_finished = occupying_job_finished = [ false ] - @thread = Thread.new do - parallelizer.parallelize([0], main_thread_processing: false) do |x| - started = true - sleep(0.3) - occupying_job_finished[0] = true - end.wait - end - sleep(0.01) until started - end - - after :each do - Thread.kill(@thread) - end - - it "parallelize with :main_thread_processing = true does not block" do - expect(parallelizer.parallelize([1]) do |x| - sleep(0.1) - x - end.to_a).to eq([ 1 ]) - expect(elapsed_time).to be < 0.2 - end - - it "parallelize with :main_thread_processing = false waits for the job to finish" do - expect(parallelizer.parallelize([1], main_thread_processing: false) do |x| - sleep(0.1) - x + 1 - end.to_a).to eq([ 2 
]) - expect(elapsed_time).to be > 0.3 - end - - it "resizing the Parallelizer to 0 waits for the job to stop" do - expect(elapsed_time).to be < 0.2 - parallelizer.resize(0) - expect(parallelizer.num_threads).to eq(0) - expect(elapsed_time).to be > 0.25 - expect(@occupying_job_finished).to eq([ true ]) - end - - it "stopping the Parallelizer waits for the job to finish" do - expect(elapsed_time).to be < 0.2 - parallelizer.stop - expect(parallelizer.num_threads).to eq(0) - expect(elapsed_time).to be > 0.25 - expect(@occupying_job_finished).to eq([ true ]) - end - - it "resizing the Parallelizer to 2 does not stop the job" do - expect(elapsed_time).to be < 0.2 - parallelizer.resize(2) - expect(parallelizer.num_threads).to eq(2) - expect(elapsed_time).to be < 0.2 - sleep(0.3) - expect(@occupying_job_finished).to eq([ true ]) - end - end - - context "enumerable methods should run efficiently" do - it ".count does not process anything" do - outputs_processed = 0 - input_mapper = TestEnumerable.new(1, 2, 3, 4, 5, 6) - enum = parallelizer.parallelize(input_mapper) do |x| - outputs_processed += 1 - sleep(0.05) # Just enough to yield and get other inputs in the queue - x - end - expect(enum.count).to eq(6) - expect(outputs_processed).to eq(0) - expect(input_mapper.num_processed).to eq(6) - end - - it ".count with arguments works normally" do - outputs_processed = 0 - input_mapper = TestEnumerable.new(1, 1, 1, 1, 2, 2, 2, 3, 3, 4) - enum = parallelizer.parallelize(input_mapper) do |x| - outputs_processed += 1 - x - end - expect(enum.count { |x| x > 1 }).to eq(6) - expect(enum.count(2)).to eq(3) - expect(outputs_processed).to eq(20) - expect(input_mapper.num_processed).to eq(20) - end - - it ".first does not enumerate anything other than the first result(s)" do - outputs_processed = 0 - input_mapper = TestEnumerable.new(1, 2, 3, 4, 5, 6) - enum = parallelizer.parallelize(input_mapper) do |x| - outputs_processed += 1 - sleep(0.05) # Just enough to yield and get other inputs in the queue - x - end - expect(enum.first).to eq(1) - expect(enum.first(2)).to eq([1, 2]) - expect(outputs_processed).to eq(3) - expect(input_mapper.num_processed).to eq(3) - end - - it ".take does not enumerate anything other than the first result(s)" do - outputs_processed = 0 - input_mapper = TestEnumerable.new(1, 2, 3, 4, 5, 6) - enum = parallelizer.parallelize(input_mapper) do |x| - outputs_processed += 1 - sleep(0.05) # Just enough to yield and get other inputs in the queue - x - end - expect(enum.take(2)).to eq([1, 2]) - expect(outputs_processed).to eq(2) - expect(input_mapper.num_processed).to eq(2) - end - - it ".drop does not process anything other than the last result(s)" do - outputs_processed = 0 - input_mapper = TestEnumerable.new(1, 2, 3, 4, 5, 6) - enum = parallelizer.parallelize(input_mapper) do |x| - outputs_processed += 1 - sleep(0.05) # Just enough to yield and get other inputs in the queue - x - end - expect(enum.drop(2)).to eq([3, 4, 5, 6]) - expect(outputs_processed).to eq(4) - expect(input_mapper.num_processed).to eq(6) - end - - if Enumerable.method_defined?(:lazy) - it ".lazy.take does not enumerate anything other than the first result(s)" do - outputs_processed = 0 - input_mapper = TestEnumerable.new(1, 2, 3, 4, 5, 6) - enum = parallelizer.parallelize(input_mapper) do |x| - outputs_processed += 1 - sleep(0.05) # Just enough to yield and get other inputs in the queue - x - end - expect(enum.lazy.take(2).to_a).to eq([1, 2]) - expect(outputs_processed).to eq(2) - expect(input_mapper.num_processed).to eq(2) - 
end - - it ".drop does not process anything other than the last result(s)" do - outputs_processed = 0 - input_mapper = TestEnumerable.new(1, 2, 3, 4, 5, 6) - enum = parallelizer.parallelize(input_mapper) do |x| - outputs_processed += 1 - sleep(0.05) # Just enough to yield and get other inputs in the queue - x - end - expect(enum.lazy.drop(2).to_a).to eq([3, 4, 5, 6]) - expect(outputs_processed).to eq(4) - expect(input_mapper.num_processed).to eq(6) - end - - it "lazy enumerable is actually lazy" do - outputs_processed = 0 - input_mapper = TestEnumerable.new(1, 2, 3, 4, 5, 6) - enum = parallelizer.parallelize(input_mapper) do |x| - outputs_processed += 1 - sleep(0.05) # Just enough to yield and get other inputs in the queue - x - end - enum.lazy.take(2) - enum.lazy.drop(2) - sleep(0.1) - expect(outputs_processed).to eq(0) - expect(input_mapper.num_processed).to eq(0) - end - end - end - - context "running enumerable multiple times should function correctly" do - it ".map twice on the same parallel enumerable returns the correct results and re-processes the input" do - outputs_processed = 0 - input_mapper = TestEnumerable.new(1, 2, 3) - enum = parallelizer.parallelize(input_mapper) do |x| - outputs_processed += 1 - x - end - expect(enum.map { |x| x }).to eq([1, 2, 3]) - expect(enum.map { |x| x }).to eq([1, 2, 3]) - expect(outputs_processed).to eq(6) - expect(input_mapper.num_processed).to eq(6) - end - - it ".first and then .map on the same parallel enumerable returns the correct results and re-processes the input" do - outputs_processed = 0 - input_mapper = TestEnumerable.new(1, 2, 3) - enum = parallelizer.parallelize(input_mapper) do |x| - outputs_processed += 1 - x - end - expect(enum.first).to eq(1) - expect(enum.map { |x| x }).to eq([1, 2, 3]) - expect(outputs_processed).to be >= 4 - expect(input_mapper.num_processed).to be >= 4 - end - - it "two simultaneous enumerations throws an exception" do - enum = parallelizer.parallelize([1, 2, 3]) { |x| x } - a = enum.enum_for(:each) - a.next - expect do - b = enum.enum_for(:each) - b.next - end.to raise_error(RuntimeError, "each() called on parallel enumerable twice simultaneously! 
Bad mojo") - end - end - end - - context "With a Parallelizer with 0 threads" do - let :parallelizer do - Chef::ChefFS::Parallelizer.new(0) - end - - context "And main_thread_processing on" do - it "succeeds in running" do - expect(parallelizer.parallelize([0.5]) { |x| x * 2 }.to_a).to eq([1]) - end - end - end - - context "With a Parallelizer with 10 threads" do - let :parallelizer do - Chef::ChefFS::Parallelizer.new(10) - end - - it "does not have contention issues with large numbers of inputs" do - expect(parallelizer.parallelize(1.upto(500)) { |x| x + 1 }.to_a).to eq(2.upto(501).to_a) - end - - it "does not have contention issues with large numbers of inputs with ordering off" do - expect(parallelizer.parallelize(1.upto(500), ordered: false) { |x| x + 1 }.to_a.sort).to eq(2.upto(501).to_a) - end - - it "does not have contention issues with large numbers of jobs and inputs with ordering off" do - parallelizers = 0.upto(99).map do - parallelizer.parallelize(1.upto(500)) { |x| x + 1 } - end - outputs = [] - threads = 0.upto(99).map do |i| - Thread.new { outputs[i] = parallelizers[i].to_a } - end - threads.each(&:join) - outputs.each { |output| expect(output.sort).to eq(2.upto(501).to_a) } - end - end - - class TestEnumerable - include Enumerable - - def initialize(*values, &block) - @values = values - @block = block - @num_processed = 0 - end - - attr_reader :num_processed - - def each - @values.each do |value| - @num_processed += 1 - yield(value) - end - if @block - @block.call do |value| - @num_processed += 1 - yield(value) - end - end - end - end -end |
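One design point worth restating from the new DefaultThreadPool comments: the executor uses a fixed thread count with no task queue and a `:caller_runs` fallback, so when every pool thread is busy the work runs on the calling thread instead of waiting. A small sketch of the consequence, assuming the defaults from this patch:

```ruby
require "chef-utils/parallel_map"

using ChefUtils::ParallelMap

# With max_queue: 0, synchronous: true and fallback_policy: :caller_runs,
# a task that cannot find a free pool thread is executed by the caller.
# That keeps recursion safe: the inner parallel_map below cannot deadlock
# waiting on threads that its own outer call is occupying, and the pool
# does not grow with the depth of recursion.
(1..2).parallel_map { |i| (1..2).parallel_map { |j| i * j } }
# => [[1, 2], [2, 4]]
```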