diff options
author | John Keiser <jkeiser@opscode.com> | 2014-05-17 22:03:31 -0700 |
---|---|---|
committer | John Keiser <jkeiser@opscode.com> | 2014-05-17 22:03:31 -0700 |
commit | ede39a9532b56f24a77efe34086d0e8e13082cc2 (patch) | |
tree | 99cd2a07fc5e5d0ad4d220792de2d33b50eecfd2 | |
parent | b198db2ddf1664cd32389f13ad6284b66d80e9cb (diff) | |
download | chef-ede39a9532b56f24a77efe34086d0e8e13082cc2.tar.gz |
Add :stop_on_exception to stop consuming input on exception
-rw-r--r-- | lib/chef/chef_fs/parallelizer.rb | 105 | ||||
-rw-r--r-- | spec/unit/chef_fs/parallelizer.rb | 27 |
2 files changed, 88 insertions, 44 deletions
diff --git a/lib/chef/chef_fs/parallelizer.rb b/lib/chef/chef_fs/parallelizer.rb index 25aaffaab0..2c58323543 100644 --- a/lib/chef/chef_fs/parallelizer.rb +++ b/lib/chef/chef_fs/parallelizer.rb @@ -70,52 +70,99 @@ class Chef # :ordered [true|false] - whether the output should stay in the same order # as the input (even though it may not actually be processed in that # order). Default: true + # :stop_on_exception [true|false] - if true, when an exception occurs in either + # input or output, we wait for any outstanding processing to complete, + # but will not process any new inputs. Default: false # :main_thread_processing [true|false] - whether the main thread pulling # on each() is allowed to process inputs. Default: true # NOTE: If you set this to false, parallelizer.kill will stop each() # in its tracks, so you need to know for sure that won't happen. def initialize(parent_task_queue, enumerable, options, &block) - @task_queue = Queue.new + @parent_task_queue = parent_task_queue @enumerable = enumerable @options = options @block = block - @unconsumed_output = Queue.new + + @unconsumed_input = Queue.new @in_process = 0 + + @unconsumed_output = Queue.new + end + + def each + if @options[:ordered] == false + each_with_input_unordered do |input, output, index| + yield output + end + else + each_with_input_ordered do |input, output, index| + yield output + end + end + end + + def each_with_index + if @options[:ordered] == false + each_with_input_unordered do |input, output, index| + yield output, index + end + else + each_with_input_ordered do |input, output, index| + yield output, index + end + end + end + + def each_with_input(&block) + if @options[:ordered] == false + each_with_input_unordered(&block) + else + each_with_input_ordered(&block) + end end def wait - each_with_input_unordered do |input, output, index| + each_with_input_unordered do |type, input, output, index| end end - def each_with_input_unordered - awaiting_output = 0 + def each_with_exceptions + end + def each_with_input_unordered # Grab all the inputs, yielding any responses during enumeration # in case the enumeration itself takes time + exception = nil + begin @enumerable.each_with_index do |input, index| - awaiting_output += 1 - @task_queue.push([ input, index ]) + @unconsumed_input.push([ input, index ]) @parent_task_queue.push(method(:process_one)) + no_more_inputs = false while !@unconsumed_output.empty? type, input, output, index = @unconsumed_output.pop if type == :exception exception ||= output + if @options[:stop_on_exception] + @unconsumed_input.clear + no_more_inputs = true + end else yield input, output, index end - awaiting_output -= 1 end + break if no_more_inputs end rescue # We still want to wait for the rest of the outputs to process - awaiting_output += 1 @unconsumed_output.push([:exception, nil, $!, nil]) + if @options[:stop_on_exception] + @unconsumed_input.clear + end end - while awaiting_output > 0 + while !@unconsumed_input.empty? || @in_process > 0 || !@unconsumed_output.empty? # yield thread to others (for 1.8.7) if @unconsumed_output.empty? sleep(0.01) @@ -125,10 +172,12 @@ class Chef type, input, output, index = @unconsumed_output.pop if type == :exception exception ||= output + if @options[:stop_on_exception] + @unconsumed_input.clear + end else yield input, output, index end - awaiting_output -= 1 end # If no one is working on our tasks and we're allowed to @@ -157,45 +206,13 @@ class Chef end end - def each_with_input(&block) - if @options[:ordered] == false - each_with_input_unordered(&block) - else - each_with_input_ordered(&block) - end - end - - def each_with_index - if @options[:ordered] == false - each_with_input_unordered do |input, output, index| - yield output, index - end - else - each_with_input_ordered do |input, output, index| - yield output, index - end - end - end - - def each - if @options[:ordered] == false - each_with_input_unordered do |input, output, index| - yield output - end - else - each_with_input_ordered do |input, output, index| - yield output - end - end - end - private def process_one @in_process += 1 begin begin - input, index = @task_queue.pop(true) + input, index = @unconsumed_input.pop(true) process_input(input, index) rescue ThreadError end diff --git a/spec/unit/chef_fs/parallelizer.rb b/spec/unit/chef_fs/parallelizer.rb index 971949251c..7823c307f9 100644 --- a/spec/unit/chef_fs/parallelizer.rb +++ b/spec/unit/chef_fs/parallelizer.rb @@ -101,6 +101,20 @@ describe Chef::ChefFS::Parallelizer do elapsed_time.should < 0.4 processed.should == 3 end + + it "Exceptions with :stop_on_exception are raised after all processing is done" do + processed = 0 + parallelized = parallelize([0.3,0.3,'x',0.3,0.3,0.3,0.3,0.3], :ordered => false, :stop_on_exception => true) do |x| + raise 'hi' if x == 'x' + sleep(x) + processed += 1 + x + end + expect { parallelized.to_a }.to raise_error 'hi' + processed.should <= 5 + processed.should >= 2 + end + end context "With :ordered => true (ordered output)" do @@ -154,6 +168,19 @@ describe Chef::ChefFS::Parallelizer do elapsed_time.should < 0.55 processed.should == 3 end + + it "Exceptions with :stop_on_exception are raised after all processing is done" do + processed = 0 + parallelized = parallelize([0.3,0.3,'x',0.3,0.3,0.3,0.3,0.3], :stop_on_exception => true) do |x| + raise 'hi' if x == 'x' + sleep(x) + processed += 1 + x + end + expect { parallelized.to_a }.to raise_error 'hi' + processed.should <= 5 + processed.should >= 2 + end end end |