diff options
-rw-r--r-- | lib/chef/chef_fs/parallelizer.rb | 39 | ||||
-rw-r--r-- | lib/chef/chef_fs/parallelizer/parallel_enumerable.rb | 96 | ||||
-rw-r--r-- | spec/unit/chef_fs/parallelizer.rb | 57 |
3 files changed, 142 insertions, 50 deletions
diff --git a/lib/chef/chef_fs/parallelizer.rb b/lib/chef/chef_fs/parallelizer.rb index 3847934bf0..8e49e155df 100644 --- a/lib/chef/chef_fs/parallelizer.rb +++ b/lib/chef/chef_fs/parallelizer.rb @@ -36,10 +36,26 @@ class Chef resize(num_threads) end - def resize(num_threads, wait = true, timeout = nil) - if num_threads < @threads.size - threads_to_stop = @threads[num_threads..@threads.size-1] - @threads = @threads[0..num_threads-1] + def num_threads + @threads.size + end + + def parallelize(enumerable, options = {}, &block) + ParallelEnumerable.new(@tasks, enumerable, options, &block) + end + + def parallel_do(enumerable, options = {}, &block) + ParallelEnumerable.new(@tasks, enumerable, options.merge(:ordered => false), &block).wait + end + + def stop(wait = true, timeout = nil) + resize(0, wait, timeout) + end + + def resize(to_threads, wait = true, timeout = nil) + if to_threads < num_threads + threads_to_stop = @threads[to_threads..num_threads-1] + @threads = @threads.slice(0, to_threads) threads_to_stop.each do |thread| @stop_thread[thread] = true end @@ -53,27 +69,16 @@ class Chef end else - @threads.size.upto(num_threads - 1) do |i| + num_threads.upto(to_threads - 1) do |i| @threads[i] = Thread.new(&method(:worker_loop)) end end end - def parallelize(enumerable, options = {}, &block) - ParallelEnumerable.new(@tasks, enumerable, options, &block) - end - - def parallel_do(enumerable, options = {}, &block) - ParallelEnumerable.new(@tasks, enumerable, options.merge(:ordered => false), &block).wait - end - - def stop(wait = true, timeout = nil) - resize(0, wait, timeout) - end - def kill @threads.each do |thread| Thread.kill(thread) + @stop_thread.delete(thread) end @threads = [] end diff --git a/lib/chef/chef_fs/parallelizer/parallel_enumerable.rb b/lib/chef/chef_fs/parallelizer/parallel_enumerable.rb index b293497889..6bd4a14fc3 100644 --- a/lib/chef/chef_fs/parallelizer/parallel_enumerable.rb +++ b/lib/chef/chef_fs/parallelizer/parallel_enumerable.rb @@ -25,7 +25,6 @@ class Chef @unconsumed_input = Queue.new @in_process = 0 - @unconsumed_output = Queue.new end @@ -77,49 +76,72 @@ class Chef FlattenEnumerable.new(self, levels) end + # TODO efficient implementation for + # count, first, drop, take, skip: run the method on the input enumerable + # and use that as our each + private def each_with_exceptions_unordered - # Grab all the inputs, yielding any responses during enumeration - # in case the enumeration itself takes time + if @each_running + raise "each() called on parallel enumerable twice simultaneously! Bad mojo" + end + @each_running = true begin - @enumerable.each_with_index do |input, index| - @unconsumed_input.push([ input, index ]) - @parent_task_queue.push(method(:process_one)) - no_more_inputs = false - while !@unconsumed_output.empty? - output, index, input, type = @unconsumed_output.pop - yield output, index, input, type - if type == :exception && @options[:stop_on_exception] - no_more_inputs = true + # Grab all the inputs, yielding any responses during enumeration + # in case the enumeration itself takes time + begin + @enumerable.each_with_index do |input, index| + @unconsumed_input.push([ input, index ]) + @parent_task_queue.push(method(:process_one)) + + stop_processing_input = false + while !@unconsumed_output.empty? + output, index, input, type = @unconsumed_output.pop + yield output, index, input, type + if type == :exception && @options[:stop_on_exception] + stop_processing_input = true + break + end + end + + if stop_processing_input + break end end - break if no_more_inputs - end - rescue - # We still want to wait for the rest of the outputs to process - @unconsumed_output.push([$!, nil, nil, :exception]) - if @options[:stop_on_exception] - @unconsumed_input.clear + rescue + # We still want to wait for the rest of the outputs to process + @unconsumed_output.push([$!, nil, nil, :exception]) + if @options[:stop_on_exception] + @unconsumed_input.clear + end end - end - while !finished? - # yield thread to others (for 1.8.7) - if @unconsumed_output.empty? - sleep(0.01) - end + while !finished? + # yield thread to others (for 1.8.7) + if @unconsumed_output.empty? + sleep(0.01) + end - while !@unconsumed_output.empty? - yield @unconsumed_output.pop - end + while !@unconsumed_output.empty? + yield @unconsumed_output.pop + end - # If no one is working on our tasks and we're allowed to - # work on them in the main thread, process an input to - # move things forward. - if @in_process == 0 && !(@options[:main_thread_processing] == false) - process_one + # If no one is working on our tasks and we're allowed to + # work on them in the main thread, process an input to + # move things forward. + if @in_process == 0 && !(@options[:main_thread_processing] == false) + process_one + end + end + ensure + # If someone called "first" or something that exits the enumerator + # early, we want to make sure and throw away any extra results + # (gracefully) so that the next enumerator can start over. + if !finished? + stop end + @each_running = false end end @@ -140,6 +162,14 @@ class Chef end end + def stop + @unconsumed_input.clear + while @in_process > 0 + sleep(0.05) + end + @unconsumed_output.clear + end + # # This is thread safe only if called from the main thread pulling on each(). # The order of these checks is important, as well, to be thread safe. diff --git a/spec/unit/chef_fs/parallelizer.rb b/spec/unit/chef_fs/parallelizer.rb index c474a4774d..a91864fc2e 100644 --- a/spec/unit/chef_fs/parallelizer.rb +++ b/spec/unit/chef_fs/parallelizer.rb @@ -263,6 +263,7 @@ describe Chef::ChefFS::Parallelizer do it "resizing the Parallelizer to 0 waits for the job to stop" do elapsed_time.should < 0.2 parallelizer.resize(0) + parallelizer.num_threads.should == 0 elapsed_time.should > 0.25 @occupying_job_finished.should == [ true ] end @@ -270,6 +271,7 @@ describe Chef::ChefFS::Parallelizer do it "stopping the Parallelizer waits for the job to finish" do elapsed_time.should < 0.2 parallelizer.stop + parallelizer.num_threads.should == 0 elapsed_time.should > 0.25 @occupying_job_finished.should == [ true ] end @@ -277,11 +279,66 @@ describe Chef::ChefFS::Parallelizer do it "resizing the Parallelizer to 2 does not stop the job" do elapsed_time.should < 0.2 parallelizer.resize(2) + parallelizer.num_threads.should == 2 elapsed_time.should < 0.2 sleep(0.3) @occupying_job_finished.should == [ true ] end end + + class InputMapper + include Enumerable + + def initialize(*values) + @values = values + @num_processed = 0 + end + + attr_reader :num_processed + + def each + @values.each do |value| + @num_processed += 1 + yield value + end + end + end + + it ".map twice on the same parallel enumerable returns the correct results and re-processes the input", :focus do + outputs_processed = 0 + input_mapper = InputMapper.new(1,2,3) + enum = parallelizer.parallelize(input_mapper) do |x| + outputs_processed += 1 + x + end + enum.map { |x| x }.should == [1,2,3] + enum.map { |x| x }.should == [1,2,3] + outputs_processed.should == 6 + input_mapper.num_processed.should == 6 + end + + it ".first and then .map on the same parallel enumerable returns the correct results and re-processes the input", :focus do + outputs_processed = 0 + input_mapper = InputMapper.new(1,2,3) + enum = parallelizer.parallelize(input_mapper) do |x| + outputs_processed += 1 + x + end + enum.first.should == 1 + enum.map { |x| x }.should == [1,2,3] + outputs_processed.should >= 4 + input_mapper.num_processed.should >= 4 + end + + it "two simultaneous enumerations throws an exception", :focus do + enum = parallelizer.parallelize([1,2,3]) { |x| x } + a = enum.enum_for(:each) + a.next + expect do + b = enum.enum_for(:each) + b.next + end.to raise_error + end end context "With a Parallelizer with 0 threads" do |