diff options
author | John Keiser <jkeiser@opscode.com> | 2014-05-20 12:21:18 -0700 |
---|---|---|
committer | John Keiser <jkeiser@opscode.com> | 2014-05-20 12:21:18 -0700 |
commit | 8453ed5652da2498a5bb7e15e3468e3fbffa720f (patch) | |
tree | a837b9046f6b485129b99c4a61c976ce99ed7665 | |
parent | 8a9b9bd5d186c300c4d1bd969b95b40a6f4d931c (diff) | |
parent | d6f24954a16aa5e2e60050e1344265353a381428 (diff) | |
download | chef-8453ed5652da2498a5bb7e15e3468e3fbffa720f.tar.gz |
Merge parallelizer robustification and enhancements
-rw-r--r-- | lib/chef/chef_fs/command_line.rb | 8 | ||||
-rw-r--r-- | lib/chef/chef_fs/file_system.rb | 6 | ||||
-rw-r--r-- | lib/chef/chef_fs/parallelizer.rb | 156 | ||||
-rw-r--r-- | lib/chef/chef_fs/parallelizer/flatten_enumerable.rb | 35 | ||||
-rw-r--r-- | lib/chef/chef_fs/parallelizer/parallel_enumerable.rb | 279 | ||||
-rw-r--r-- | lib/chef/knife/list.rb | 17 | ||||
-rw-r--r-- | lib/chef/knife/show.rb | 5 | ||||
-rw-r--r-- | spec/integration/knife/chef_repo_path_spec.rb | 2 | ||||
-rw-r--r-- | spec/unit/chef_fs/parallelizer.rb | 482 |
9 files changed, 881 insertions, 109 deletions
diff --git a/lib/chef/chef_fs/command_line.rb b/lib/chef/chef_fs/command_line.rb index d0183a5a2a..967c59ecae 100644 --- a/lib/chef/chef_fs/command_line.rb +++ b/lib/chef/chef_fs/command_line.rb @@ -129,9 +129,9 @@ class Chef end def self.diff(pattern, old_root, new_root, recurse_depth, get_content) - Chef::ChefFS::Parallelizer.parallelize(Chef::ChefFS::FileSystem.list_pairs(pattern, old_root, new_root), :flatten => true) do |old_entry, new_entry| + Chef::ChefFS::Parallelizer.parallelize(Chef::ChefFS::FileSystem.list_pairs(pattern, old_root, new_root)) do |old_entry, new_entry| diff_entries(old_entry, new_entry, recurse_depth, get_content) - end + end.flatten(1) end # Diff two known entries (could be files or dirs) @@ -142,9 +142,9 @@ class Chef if recurse_depth == 0 return [ [ :common_subdirectories, old_entry, new_entry ] ] else - return Chef::ChefFS::Parallelizer.parallelize(Chef::ChefFS::FileSystem.child_pairs(old_entry, new_entry), :flatten => true) do |old_child, new_child| + return Chef::ChefFS::Parallelizer.parallelize(Chef::ChefFS::FileSystem.child_pairs(old_entry, new_entry)) do |old_child, new_child| Chef::ChefFS::CommandLine.diff_entries(old_child, new_child, recurse_depth ? recurse_depth - 1 : nil, get_content) - end + end.flatten(1) end # If old is a directory and new is a file diff --git a/lib/chef/chef_fs/file_system.rb b/lib/chef/chef_fs/file_system.rb index f2478c4680..9fe726744e 100644 --- a/lib/chef/chef_fs/file_system.rb +++ b/lib/chef/chef_fs/file_system.rb @@ -72,8 +72,8 @@ class Chef # Otherwise, go through all children and find any matches elsif entry.dir? - results = Parallelizer::parallelize(entry.children, :flatten => true) { |child| Chef::ChefFS::FileSystem.list(child, pattern) } - results.each(&block) + results = Parallelizer::parallelize(entry.children) { |child| Chef::ChefFS::FileSystem.list(child, pattern) } + results.flatten(1).each(&block) end end end @@ -419,7 +419,7 @@ class Chef end def self.parallel_do(enum, options = {}, &block) - Chef::ChefFS::Parallelizer.parallelize(enum, options, &block).to_a + Chef::ChefFS::Parallelizer.parallel_do(enum, options, &block) end end end diff --git a/lib/chef/chef_fs/parallelizer.rb b/lib/chef/chef_fs/parallelizer.rb index 84f3d4d870..116a626869 100644 --- a/lib/chef/chef_fs/parallelizer.rb +++ b/lib/chef/chef_fs/parallelizer.rb @@ -1,127 +1,103 @@ +require 'thread' +require 'chef/chef_fs/parallelizer/parallel_enumerable' + class Chef module ChefFS + # Tries to balance several guarantees, in order of priority: + # - don't get deadlocked + # - provide results in desired order + # - provide results as soon as they are available + # - process input as soon as possible class Parallelizer @@parallelizer = nil @@threads = 0 def self.threads=(value) - if @@threads != value - @@threads = value - @@parallelizer = nil - end + @@threads = value + @@parallelizer.resize(value) if @@parallelizer end - def self.parallelize(enumerator, options = {}, &block) + def self.parallelizer @@parallelizer ||= Parallelizer.new(@@threads) - @@parallelizer.parallelize(enumerator, options, &block) end - def initialize(threads) - @tasks_mutex = Mutex.new - @tasks = [] - @threads = [] - 1.upto(threads) do - @threads << Thread.new { worker_loop } - end + def self.parallelize(enumerable, options = {}, &block) + parallelizer.parallelize(enumerable, options, &block) end - def parallelize(enumerator, options = {}, &block) - task = ParallelizedResults.new(enumerator, options, &block) - @tasks_mutex.synchronize do - @tasks << task - end - task + def self.parallel_do(enumerable, options = {}, &block) + parallelizer.parallel_do(enumerable, options, &block) end - class ParallelizedResults - include Enumerable + def initialize(num_threads) + @tasks = Queue.new + @threads = [] + @stop_thread = {} + resize(num_threads) + end - def initialize(enumerator, options, &block) - @inputs = enumerator.to_a - @options = options - @block = block + def num_threads + @threads.size + end - @mutex = Mutex.new - @outputs = [] - @status = [] - end + def parallelize(enumerable, options = {}, &block) + ParallelEnumerable.new(@tasks, enumerable, options, &block) + end - def each - next_index = 0 - while true - # Report any results that already exist - while @status.length > next_index && ([:finished, :exception].include?(@status[next_index])) - if @status[next_index] == :finished - if @options[:flatten] - @outputs[next_index].each do |entry| - yield entry - end - else - yield @outputs[next_index] - end - else - raise @outputs[next_index] - end - next_index = next_index + 1 - end + def parallel_do(enumerable, options = {}, &block) + ParallelEnumerable.new(@tasks, enumerable, options.merge(:ordered => false), &block).wait + end - # Pick up a result and process it, if there is one. This ensures we - # move forward even if there are *zero* worker threads available. - if !process_input - # Exit if we're done. - if next_index >= @status.length - break - else - # Ruby 1.8 threading sucks. Wait till we process more things. - sleep(0.05) - end - end + def stop(wait = true, timeout = nil) + resize(0, wait, timeout) + end + + def resize(to_threads, wait = true, timeout = nil) + if to_threads < num_threads + threads_to_stop = @threads[to_threads..num_threads-1] + @threads = @threads.slice(0, to_threads) + threads_to_stop.each do |thread| + @stop_thread[thread] = true end - end - def process_input - # Grab the next one to process - index, input = @mutex.synchronize do - index = @status.length - if index >= @inputs.length - return nil + if wait + start_time = Time.now + threads_to_stop.each do |thread| + thread_timeout = timeout ? timeout - (Time.now - start_time) : nil + thread.join(thread_timeout) end - input = @inputs[index] - @status[index] = :started - [ index, input ] end - begin - @outputs[index] = @block.call(input) - @status[index] = :finished - rescue Exception - @outputs[index] = $! - @status[index] = :exception + else + num_threads.upto(to_threads - 1) do |i| + @threads[i] = Thread.new(&method(:worker_loop)) end - index end end + def kill + @threads.each do |thread| + Thread.kill(thread) + @stop_thread.delete(thread) + end + @threads = [] + end + private def worker_loop - while true - begin - task = @tasks[0] - if task - if !task.process_input - @tasks_mutex.synchronize do - @tasks.delete(task) - end - end - else - # Ruby 1.8 threading sucks. Wait a bit to see if another task comes in. - sleep(0.05) + begin + while !@stop_thread[Thread.current] + begin + task = @tasks.pop + task.call + rescue + puts "ERROR #{$!}" + puts $!.backtrace end - rescue - puts "ERROR #{$!}" - puts $!.backtrace end + ensure + @stop_thread.delete(Thread.current) end end end diff --git a/lib/chef/chef_fs/parallelizer/flatten_enumerable.rb b/lib/chef/chef_fs/parallelizer/flatten_enumerable.rb new file mode 100644 index 0000000000..7321aa0664 --- /dev/null +++ b/lib/chef/chef_fs/parallelizer/flatten_enumerable.rb @@ -0,0 +1,35 @@ +class Chef + module ChefFS + class Parallelizer + class FlattenEnumerable + include Enumerable + + def initialize(enum, levels = nil) + @enum = enum + @levels = levels + end + + attr_reader :enum + attr_reader :levels + + def each(&block) + enum.each do |value| + flatten(value, levels, &block) + end + end + + private + + def flatten(value, levels, &block) + if levels != 0 && value.respond_to?(:each) && !value.is_a?(String) + value.each do |child| + flatten(child, levels.nil? ? levels : levels-1, &block) + end + else + block.call(value) + end + end + end + end + end +end diff --git a/lib/chef/chef_fs/parallelizer/parallel_enumerable.rb b/lib/chef/chef_fs/parallelizer/parallel_enumerable.rb new file mode 100644 index 0000000000..8e50f361db --- /dev/null +++ b/lib/chef/chef_fs/parallelizer/parallel_enumerable.rb @@ -0,0 +1,279 @@ +require 'chef/chef_fs/parallelizer/flatten_enumerable' + +class Chef + module ChefFS + class Parallelizer + class ParallelEnumerable + include Enumerable + + # options: + # :ordered [true|false] - whether the output should stay in the same order + # as the input (even though it may not actually be processed in that + # order). Default: true + # :stop_on_exception [true|false] - if true, when an exception occurs in either + # input or output, we wait for any outstanding processing to complete, + # but will not process any new inputs. Default: false + # :main_thread_processing [true|false] - whether the main thread pulling + # on each() is allowed to process inputs. Default: true + # NOTE: If you set this to false, parallelizer.kill will stop each() + # in its tracks, so you need to know for sure that won't happen. + def initialize(parent_task_queue, input_enumerable, options = {}, &block) + @parent_task_queue = parent_task_queue + @input_enumerable = input_enumerable + @options = options + @block = block + + @unconsumed_input = Queue.new + @in_process = {} + @unconsumed_output = Queue.new + end + + attr_reader :parent_task_queue + attr_reader :input_enumerable + attr_reader :options + attr_reader :block + + def each + each_with_input do |output, index, input, type| + yield output + end + end + + def each_with_index + each_with_input do |output, index, input| + yield output, index + end + end + + def each_with_input + exception = nil + each_with_exceptions do |output, index, input, type| + if type == :exception + if @options[:ordered] == false + exception ||= output + else + raise output + end + else + yield output, index, input + end + end + raise exception if exception + end + + def each_with_exceptions(&block) + if @options[:ordered] == false + each_with_exceptions_unordered(&block) + else + each_with_exceptions_ordered(&block) + end + end + + def wait + exception = nil + each_with_exceptions_unordered do |output, index, input, type| + exception ||= output if type == :exception + end + raise exception if exception + end + + # Enumerable methods + def restricted_copy(enumerable) + ParallelEnumerable.new(@parent_task_queue, enumerable, @options, &@block) + end + + alias :original_count :count + + def count(*args, &block) + if args.size == 0 && block.nil? + @input_enumerable.count + else + original_count(*args, &block) + end + end + + def first(n=nil) + if n + restricted_copy(@input_enumerable.first(n)).to_a + else + first(1)[0] + end + end + + def drop(n) + restricted_copy(@input_enumerable.drop(n)).to_a + end + + def flatten(levels = nil) + FlattenEnumerable.new(self, levels) + end + + def take(n) + restricted_copy(@input_enumerable.take(n)).to_a + end + + if Enumerable.method_defined?(:lazy) + class RestrictedLazy + def initialize(parallel_enumerable, actual_lazy) + @parallel_enumerable = parallel_enumerable + @actual_lazy = actual_lazy + end + + def drop(*args, &block) + input = @parallel_enumerable.input_enumerable.lazy.drop(*args, &block) + @parallel_enumerable.restricted_copy(input) + end + + def take(*args, &block) + input = @parallel_enumerable.input_enumerable.lazy.take(*args, &block) + @parallel_enumerable.restricted_copy(input) + end + + def method_missing(method, *args, &block) + @actual_lazy.send(:method, *args, &block) + end + end + + alias :original_lazy :lazy + + def lazy + RestrictedLazy.new(self, original_lazy) + end + end + + private + + def each_with_exceptions_unordered + if @each_running + raise "each() called on parallel enumerable twice simultaneously! Bad mojo" + end + @each_running = true + begin + # Grab all the inputs, yielding any responses during enumeration + # in case the enumeration itself takes time + begin + @input_enumerable.each_with_index do |input, index| + @unconsumed_input.push([ input, index ]) + @parent_task_queue.push(method(:process_one)) + + stop_processing_input = false + while !@unconsumed_output.empty? + output, index, input, type = @unconsumed_output.pop + yield output, index, input, type + if type == :exception && @options[:stop_on_exception] + stop_processing_input = true + break + end + end + + if stop_processing_input + break + end + end + rescue + # We still want to wait for the rest of the outputs to process + @unconsumed_output.push([$!, nil, nil, :exception]) + if @options[:stop_on_exception] + @unconsumed_input.clear + end + end + + while !finished? + # yield thread to others (for 1.8.7) + if @unconsumed_output.empty? + sleep(0.01) + end + + while !@unconsumed_output.empty? + yield @unconsumed_output.pop + end + + # If no one is working on our tasks and we're allowed to + # work on them in the main thread, process an input to + # move things forward. + if @in_process.size == 0 && !(@options[:main_thread_processing] == false) + process_one + end + end + ensure + # If someone called "first" or something that exits the enumerator + # early, we want to make sure and throw away any extra results + # (gracefully) so that the next enumerator can start over. + if !finished? + stop + end + @each_running = false + end + end + + def each_with_exceptions_ordered + next_to_yield = 0 + unconsumed = {} + each_with_exceptions_unordered do |output, index, input, type| + unconsumed[index] = [ output, input, type ] + while unconsumed[next_to_yield] + input_output = unconsumed.delete(next_to_yield) + yield input_output[0], next_to_yield, input_output[1], input_output[2] + next_to_yield += 1 + end + end + input_exception = unconsumed.delete(nil) + if input_exception + yield input_exception[0], next_to_yield, input_exception[1], input_exception[2] + end + end + + def stop + @unconsumed_input.clear + while @in_process.size > 0 + sleep(0.05) + end + @unconsumed_output.clear + end + + # + # This is thread safe only if called from the main thread pulling on each(). + # The order of these checks is important, as well, to be thread safe. + # 1. If @unconsumed_input.empty? is true, then we will never have any more + # work legitimately picked up. + # 2. If @in_process == 0, then there is no work in process, and because ofwhen unconsumed_input is empty, it will never go back up, because + # this is called after the input enumerator is finished. Note that switching #2 and #1 + # could cause a race, because in_process is incremented *before* consuming input. + # 3. If @unconsumed_output.empty? is true, then we are done with outputs. + # Thus, 1+2 means no more output will ever show up, and 3 means we've passed all + # existing outputs to the user. + # + def finished? + @unconsumed_input.empty? && @in_process.size == 0 && @unconsumed_output.empty? + end + + def process_one + @in_process[Thread.current] = true + begin + begin + input, index = @unconsumed_input.pop(true) + process_input(input, index) + rescue ThreadError + end + ensure + @in_process.delete(Thread.current) + end + end + + def process_input(input, index) + begin + output = @block.call(input) + @unconsumed_output.push([ output, index, input, :result ]) + rescue + if @options[:stop_on_exception] + @unconsumed_input.clear + end + @unconsumed_output.push([ $!, index, input, :exception ]) + end + + index + end + end + end + end +end diff --git a/lib/chef/knife/list.rb b/lib/chef/knife/list.rb index 4338e195bd..137d61f3a5 100644 --- a/lib/chef/knife/list.rb +++ b/lib/chef/knife/list.rb @@ -43,21 +43,23 @@ class Chef def run patterns = name_args.length == 0 ? [""] : name_args - # Get the matches (recursively) - all_results = parallelize(pattern_args_from(patterns), :flatten => true) do |pattern| - pattern_results = Chef::ChefFS::FileSystem.list(config[:local] ? local_fs : chef_fs, pattern) + # Get the top-level matches + args = pattern_args_from(patterns) + all_results = parallelize(pattern_args_from(patterns)) do |pattern| + pattern_results = Chef::ChefFS::FileSystem.list(config[:local] ? local_fs : chef_fs, pattern).to_a if pattern_results.first && !pattern_results.first.exists? && pattern.exact_path ui.error "#{format_path(pattern_results.first)}: No such file or directory" self.exit_code = 1 end pattern_results - end + end.flatten(1).to_a # Process directories if !config[:bare_directories] - dir_results = parallelize(all_results.select { |result| result.dir? }, :flatten => true) do |result| + dir_results = parallelize(all_results.select { |result| result.dir? }) do |result| add_dir_result(result) - end.to_a + end.flatten(1) + else dir_results = [] end @@ -109,7 +111,7 @@ class Chef result = [ [ result, children ] ] if config[:recursive] child_dirs = children.select { |child| child.dir? } - result += parallelize(child_dirs, :flatten => true) { |child| add_dir_result(child) }.to_a + result += parallelize(child_dirs) { |child| add_dir_result(child) }.flatten(1).to_a end result end @@ -152,4 +154,3 @@ class Chef end end end - diff --git a/lib/chef/knife/show.rb b/lib/chef/knife/show.rb index acf1996e96..4684a6ac7e 100644 --- a/lib/chef/knife/show.rb +++ b/lib/chef/knife/show.rb @@ -20,7 +20,7 @@ class Chef def run # Get the matches (recursively) error = false - entry_values = parallelize(pattern_args, :flatten => true) do |pattern| + entry_values = parallelize(pattern_args) do |pattern| parallelize(Chef::ChefFS::FileSystem.list(config[:local] ? local_fs : chef_fs, pattern)) do |entry| if entry.dir? ui.error "#{format_path(entry)}: is a directory" if pattern.exact_path @@ -40,7 +40,7 @@ class Chef end end end - end + end.flatten(1) entry_values.each do |entry, value| if entry output "#{format_path(entry)}:" @@ -54,4 +54,3 @@ class Chef end end end - diff --git a/spec/integration/knife/chef_repo_path_spec.rb b/spec/integration/knife/chef_repo_path_spec.rb index 4ffb179a4b..87619d8a58 100644 --- a/spec/integration/knife/chef_repo_path_spec.rb +++ b/spec/integration/knife/chef_repo_path_spec.rb @@ -101,7 +101,7 @@ EOM /users/ /users/user3.json EOM - end + end context 'when cwd is at the top level' do cwd '.' diff --git a/spec/unit/chef_fs/parallelizer.rb b/spec/unit/chef_fs/parallelizer.rb new file mode 100644 index 0000000000..a871b60e98 --- /dev/null +++ b/spec/unit/chef_fs/parallelizer.rb @@ -0,0 +1,482 @@ +require 'spec_helper' +require 'chef/chef_fs/parallelizer' + +describe Chef::ChefFS::Parallelizer do + before :each do + @start_time = Time.now + end + + def elapsed_time + Time.now - @start_time + end + + after :each do + parallelizer.kill + end + + context 'With a Parallelizer with 5 threads' do + let :parallelizer do + Chef::ChefFS::Parallelizer.new(5) + end + + def parallelize(inputs, options = {}, &block) + parallelizer.parallelize(inputs, { :main_thread_processing => false }.merge(options), &block) + end + + it "parallel_do creates unordered output as soon as it is available" do + outputs = [] + parallelizer.parallel_do([0.5,0.3,0.1]) do |val| + sleep val + outputs << val + end + elapsed_time.should < 0.6 + outputs.should == [ 0.1, 0.3, 0.5 ] + end + + context "With :ordered => false (unordered output)" do + it "An empty input produces an empty output" do + parallelize([], :ordered => false) do + sleep 10 + end.to_a == [] + elapsed_time.should < 0.1 + end + + it "10 sleep(0.2)s complete within 0.5 seconds" do + parallelize(1.upto(10), :ordered => false) do |i| + sleep 0.2 + 'x' + end.to_a.should == %w(x x x x x x x x x x) + elapsed_time.should < 0.5 + end + + it "The output comes as soon as it is available" do + enum = parallelize([0.5,0.3,0.1], :ordered => false) do |val| + sleep val + val + end + enum.map do |value| + elapsed_time.should < value+0.1 + value + end.should == [ 0.1, 0.3, 0.5 ] + end + + it "An exception in input is passed through but does NOT stop processing" do + input = TestEnumerable.new(0.5,0.3,0.1) do + raise 'hi' + end + enum = parallelize(input, :ordered => false) { |x| sleep(x); x } + results = [] + expect { enum.each { |value| results << value } }.to raise_error 'hi' + results.should == [ 0.1, 0.3, 0.5 ] + elapsed_time.should < 0.6 + end + + it "Exceptions in output are raised after all processing is done" do + processed = 0 + enum = parallelize([1,2,'x',3], :ordered => false) do |x| + if x == 'x' + sleep 0.1 + raise 'hi' + end + sleep 0.2 + processed += 1 + x + end + results = [] + expect { enum.each { |value| results << value } }.to raise_error 'hi' + results.sort.should == [ 1, 2, 3 ] + elapsed_time.should < 0.3 + processed.should == 3 + end + + it "Exceptions with :stop_on_exception are raised after all processing is done" do + processed = 0 + parallelized = parallelize([0.3,0.3,'x',0.3,0.3,0.3,0.3,0.3], :ordered => false, :stop_on_exception => true) do |x| + if x == 'x' + sleep(0.1) + raise 'hi' + end + sleep(x) + processed += 1 + x + end + expect { parallelized.to_a }.to raise_error 'hi' + processed.should == 4 + end + end + + context "With :ordered => true (ordered output)" do + it "An empty input produces an empty output" do + parallelize([]) do + sleep 10 + end.to_a == [] + elapsed_time.should < 0.1 + end + + it "10 sleep(0.2)s complete within 0.5 seconds" do + parallelize(1.upto(10), :ordered => true) do |i| + sleep 0.2 + 'x' + end.to_a.should == %w(x x x x x x x x x x) + elapsed_time.should < 0.5 + end + + it "Output comes in the order of the input" do + enum = parallelize([0.5,0.3,0.1]) do |val| + sleep val + val + end.enum_for(:each_with_index) + enum.next.should == [ 0.5, 0 ] + enum.next.should == [ 0.3, 1 ] + enum.next.should == [ 0.1, 2 ] + elapsed_time.should < 0.6 + end + + it "Exceptions in input are raised in the correct sequence but do NOT stop processing" do + input = TestEnumerable.new(0.5,0.3,0.1) do + raise 'hi' + end + results = [] + enum = parallelize(input) { |x| sleep(x); x } + expect { enum.each { |value| results << value } }.to raise_error 'hi' + elapsed_time.should < 0.6 + results.should == [ 0.5, 0.3, 0.1 ] + end + + it "Exceptions in output are raised in the correct sequence and running processes do NOT stop processing" do + processed = 0 + enum = parallelize([1,2,'x',3]) do |x| + if x == 'x' + sleep(0.1) + raise 'hi' + end + sleep(0.2) + processed += 1 + x + end + results = [] + expect { enum.each { |value| results << value } }.to raise_error 'hi' + results.should == [ 1, 2 ] + elapsed_time.should < 0.3 + processed.should == 3 + end + + it "Exceptions with :stop_on_exception are raised after all processing is done" do + processed = 0 + parallelized = parallelize([0.3,0.3,'x',0.3,0.3,0.3,0.3,0.3], :ordered => false, :stop_on_exception => true) do |x| + if x == 'x' + sleep(0.1) + raise 'hi' + end + sleep(x) + processed += 1 + x + end + expect { parallelized.to_a }.to raise_error 'hi' + processed.should == 4 + end + end + + it "When the input is slow, output still proceeds" do + input = TestEnumerable.new do |&block| + block.call(1) + sleep 0.1 + block.call(2) + sleep 0.1 + block.call(3) + sleep 0.1 + end + enum = parallelize(input) { |x| x } + enum.map do |value| + elapsed_time.should < (value+1)*0.1 + value + end.should == [ 1, 2, 3 ] + end + end + + context "With a Parallelizer with 1 thread" do + let :parallelizer do + Chef::ChefFS::Parallelizer.new(1) + end + + context "when the thread is occupied with a job" do + before :each do + parallelizer + started = false + @occupying_job_finished = occupying_job_finished = [ false ] + @thread = Thread.new do + begin + parallelizer.parallelize([0], :main_thread_processing => false) do |x| + started = true + sleep(0.3) + occupying_job_finished[0] = true + end.wait + ensure + end + end + while !started + sleep(0.01) + end + end + + after :each do + if RUBY_VERSION.to_f > 1.8 + Thread.kill(@thread) + end + end + + it "parallelize with :main_thread_processing = true does not block" do + parallelizer.parallelize([1]) do |x| + sleep(0.1) + x + end.to_a.should == [ 1 ] + elapsed_time.should < 0.2 + end + + it "parallelize with :main_thread_processing = false waits for the job to finish" do + parallelizer.parallelize([1], :main_thread_processing => false) do |x| + sleep(0.1) + x+1 + end.to_a.should == [ 2 ] + elapsed_time.should > 0.3 + end + + it "resizing the Parallelizer to 0 waits for the job to stop" do + elapsed_time.should < 0.2 + parallelizer.resize(0) + parallelizer.num_threads.should == 0 + elapsed_time.should > 0.25 + @occupying_job_finished.should == [ true ] + end + + it "stopping the Parallelizer waits for the job to finish" do + elapsed_time.should < 0.2 + parallelizer.stop + parallelizer.num_threads.should == 0 + elapsed_time.should > 0.25 + @occupying_job_finished.should == [ true ] + end + + it "resizing the Parallelizer to 2 does not stop the job" do + elapsed_time.should < 0.2 + parallelizer.resize(2) + parallelizer.num_threads.should == 2 + elapsed_time.should < 0.2 + sleep(0.3) + @occupying_job_finished.should == [ true ] + end + end + + context "enumerable methods should run efficiently" do + it ".count does not process anything" do + outputs_processed = 0 + input_mapper = TestEnumerable.new(1,2,3,4,5,6) + enum = parallelizer.parallelize(input_mapper) do |x| + outputs_processed += 1 + sleep(0.05) # Just enough to yield and get other inputs in the queue + x + end + enum.count.should == 6 + outputs_processed.should == 0 + input_mapper.num_processed.should == 6 + end + + it ".count with arguments works normally" do + outputs_processed = 0 + input_mapper = TestEnumerable.new(1,1,1,1,2,2,2,3,3,4) + enum = parallelizer.parallelize(input_mapper) do |x| + outputs_processed += 1 + x + end + enum.count { |x| x > 1 }.should == 6 + enum.count(2).should == 3 + outputs_processed.should == 20 + input_mapper.num_processed.should == 20 + end + + it ".first does not enumerate anything other than the first result(s)" do + outputs_processed = 0 + input_mapper = TestEnumerable.new(1,2,3,4,5,6) + enum = parallelizer.parallelize(input_mapper) do |x| + outputs_processed += 1 + sleep(0.05) # Just enough to yield and get other inputs in the queue + x + end + enum.first.should == 1 + enum.first(2).should == [1,2] + outputs_processed.should == 3 + input_mapper.num_processed.should == 3 + end + + it ".take does not enumerate anything other than the first result(s)" do + outputs_processed = 0 + input_mapper = TestEnumerable.new(1,2,3,4,5,6) + enum = parallelizer.parallelize(input_mapper) do |x| + outputs_processed += 1 + sleep(0.05) # Just enough to yield and get other inputs in the queue + x + end + enum.take(2).should == [1,2] + outputs_processed.should == 2 + input_mapper.num_processed.should == 2 + end + + it ".drop does not process anything other than the last result(s)" do + outputs_processed = 0 + input_mapper = TestEnumerable.new(1,2,3,4,5,6) + enum = parallelizer.parallelize(input_mapper) do |x| + outputs_processed += 1 + sleep(0.05) # Just enough to yield and get other inputs in the queue + x + end + enum.drop(2).should == [3,4,5,6] + outputs_processed.should == 4 + input_mapper.num_processed.should == 6 + end + + if Enumerable.method_defined?(:lazy) + it ".lazy.take does not enumerate anything other than the first result(s)" do + outputs_processed = 0 + input_mapper = TestEnumerable.new(1,2,3,4,5,6) + enum = parallelizer.parallelize(input_mapper) do |x| + outputs_processed += 1 + sleep(0.05) # Just enough to yield and get other inputs in the queue + x + end + enum.lazy.take(2).to_a.should == [1,2] + outputs_processed.should == 2 + input_mapper.num_processed.should == 2 + end + + it ".drop does not process anything other than the last result(s)" do + outputs_processed = 0 + input_mapper = TestEnumerable.new(1,2,3,4,5,6) + enum = parallelizer.parallelize(input_mapper) do |x| + outputs_processed += 1 + sleep(0.05) # Just enough to yield and get other inputs in the queue + x + end + enum.lazy.drop(2).to_a.should == [3,4,5,6] + outputs_processed.should == 4 + input_mapper.num_processed.should == 6 + end + + it "lazy enumerable is actually lazy" do + outputs_processed = 0 + input_mapper = TestEnumerable.new(1,2,3,4,5,6) + enum = parallelizer.parallelize(input_mapper) do |x| + outputs_processed += 1 + sleep(0.05) # Just enough to yield and get other inputs in the queue + x + end + enum.lazy.take(2) + enum.lazy.drop(2) + sleep(0.1) + outputs_processed.should == 0 + input_mapper.num_processed.should == 0 + end + end + end + + context "running enumerable multiple times should function correctly" do + it ".map twice on the same parallel enumerable returns the correct results and re-processes the input" do + outputs_processed = 0 + input_mapper = TestEnumerable.new(1,2,3) + enum = parallelizer.parallelize(input_mapper) do |x| + outputs_processed += 1 + x + end + enum.map { |x| x }.should == [1,2,3] + enum.map { |x| x }.should == [1,2,3] + outputs_processed.should == 6 + input_mapper.num_processed.should == 6 + end + + it ".first and then .map on the same parallel enumerable returns the correct results and re-processes the input" do + outputs_processed = 0 + input_mapper = TestEnumerable.new(1,2,3) + enum = parallelizer.parallelize(input_mapper) do |x| + outputs_processed += 1 + x + end + enum.first.should == 1 + enum.map { |x| x }.should == [1,2,3] + outputs_processed.should >= 4 + input_mapper.num_processed.should >= 4 + end + + it "two simultaneous enumerations throws an exception" do + enum = parallelizer.parallelize([1,2,3]) { |x| x } + a = enum.enum_for(:each) + a.next + expect do + b = enum.enum_for(:each) + b.next + end.to raise_error + end + end + end + + context "With a Parallelizer with 0 threads" do + let :parallelizer do + Chef::ChefFS::Parallelizer.new(0) + end + + context "And main_thread_processing on" do + it "succeeds in running" do + parallelizer.parallelize([0.5]) { |x| x*2 }.to_a.should == [1] + end + end + end + + context "With a Parallelizer with 10 threads" do + let :parallelizer do + Chef::ChefFS::Parallelizer.new(10) + end + + it "does not have contention issues with large numbers of inputs" do + parallelizer.parallelize(1.upto(500)) { |x| x+1 }.to_a.should == 2.upto(501).to_a + end + + it "does not have contention issues with large numbers of inputs with ordering off" do + parallelizer.parallelize(1.upto(500), :ordered => false) { |x| x+1 }.to_a.sort.should == 2.upto(501).to_a + end + + it "does not have contention issues with large numbers of jobs and inputs with ordering off" do + parallelizers = 0.upto(99).map do + parallelizer.parallelize(1.upto(500)) { |x| x+1 } + end + outputs = [] + threads = 0.upto(99).map do |i| + Thread.new { outputs[i] = parallelizers[i].to_a } + end + threads.each { |thread| thread.join } + outputs.each { |output| output.sort.should == 2.upto(501).to_a } + end + end + + class TestEnumerable + include Enumerable + + def initialize(*values, &block) + @values = values + @block = block + @num_processed = 0 + end + + attr_reader :num_processed + + def each(&each_block) + @values.each do |value| + @num_processed += 1 + each_block.call(value) + end + if @block + @block.call do |value| + @num_processed += 1 + each_block.call(value) + end + end + end + end +end |