diff options
author | John Keiser <jkeiser@opscode.com> | 2014-05-17 22:41:15 -0700 |
---|---|---|
committer | John Keiser <jkeiser@opscode.com> | 2014-05-17 22:41:15 -0700 |
commit | 31b553dcbcc9623eb75a8faa11c01e627851bd37 (patch) | |
tree | 55e5c699b0917d709cb033d055469114f9c8ef4e /lib/chef/chef_fs/parallelizer.rb | |
parent | ede39a9532b56f24a77efe34086d0e8e13082cc2 (diff) | |
download | chef-31b553dcbcc9623eb75a8faa11c01e627851bd37.tar.gz |
Add each_with_exceptions to allow all results to be known
Diffstat (limited to 'lib/chef/chef_fs/parallelizer.rb')
-rw-r--r-- | lib/chef/chef_fs/parallelizer.rb | 103 |
1 files changed, 47 insertions, 56 deletions
diff --git a/lib/chef/chef_fs/parallelizer.rb b/lib/chef/chef_fs/parallelizer.rb index 2c58323543..5b575bb207 100644 --- a/lib/chef/chef_fs/parallelizer.rb +++ b/lib/chef/chef_fs/parallelizer.rb @@ -78,7 +78,6 @@ class Chef # NOTE: If you set this to false, parallelizer.kill will stop each() # in its tracks, so you need to know for sure that won't happen. def initialize(parent_task_queue, enumerable, options, &block) - @parent_task_queue = parent_task_queue @enumerable = enumerable @options = options @@ -91,72 +90,71 @@ class Chef end def each - if @options[:ordered] == false - each_with_input_unordered do |input, output, index| - yield output - end - else - each_with_input_ordered do |input, output, index| - yield output - end + each_with_input do |output, index, input, type| + yield output end end def each_with_index - if @options[:ordered] == false - each_with_input_unordered do |input, output, index| - yield output, index - end - else - each_with_input_ordered do |input, output, index| - yield output, index + each_with_input do |output, index, input| + yield output, index + end + end + + def each_with_input + exception = nil + each_with_exceptions do |output, index, input, type| + if type == :exception + if @options[:ordered] == false + exception ||= output + else + raise output + end + else + yield output, index, input end end + raise exception if exception end - def each_with_input(&block) + def each_with_exceptions(&block) if @options[:ordered] == false - each_with_input_unordered(&block) + each_with_exceptions_unordered(&block) else - each_with_input_ordered(&block) + each_with_exceptions_ordered(&block) end end def wait - each_with_input_unordered do |type, input, output, index| + exception = nil + each_with_exceptions_unordered do |output, index, input, type| + exception ||= output if type == :exception end + raise exception if exception end - def each_with_exceptions - end + private - def each_with_input_unordered + def each_with_exceptions_unordered # Grab all the inputs, yielding any responses during enumeration # in case the enumeration itself takes time - exception = nil - begin @enumerable.each_with_index do |input, index| @unconsumed_input.push([ input, index ]) @parent_task_queue.push(method(:process_one)) no_more_inputs = false while !@unconsumed_output.empty? - type, input, output, index = @unconsumed_output.pop - if type == :exception - exception ||= output - if @options[:stop_on_exception] - @unconsumed_input.clear - no_more_inputs = true - end - else - yield input, output, index + output, index, input, type = @unconsumed_output.pop + yield output, index, input, type + if type == :exception && @options[:stop_on_exception] + no_more_inputs = true end end break if no_more_inputs end rescue # We still want to wait for the rest of the outputs to process - @unconsumed_output.push([:exception, nil, $!, nil]) + @unconsumed_output.push([$!, nil, nil, :exception]) if @options[:stop_on_exception] @unconsumed_input.clear end @@ -169,15 +167,7 @@ class Chef end while !@unconsumed_output.empty? - type, input, output, index = @unconsumed_output.pop - if type == :exception - exception ||= output - if @options[:stop_on_exception] - @unconsumed_input.clear - end - else - yield input, output, index - end + yield @unconsumed_output.pop end # If no one is working on our tasks and we're allowed to @@ -187,27 +177,25 @@ class Chef process_one end end - - if exception - raise exception - end end - def each_with_input_ordered + def each_with_exceptions_ordered next_to_yield = 0 unconsumed = {} - each_with_input_unordered do |input, output, index| - unconsumed[index] = [ input, output ] + each_with_exceptions_unordered do |output, index, input, type| + unconsumed[index] = [ output, input, type ] while unconsumed[next_to_yield] input_output = unconsumed.delete(next_to_yield) - yield input_output[0], input_output[1], next_to_yield + yield input_output[0], next_to_yield, input_output[1], input_output[2] next_to_yield += 1 end end + input_exception = unconsumed.delete(nil) + if input_exception + yield input_exception[0], next_to_yield, input_exception[1], input_exception[2] + end end - private - def process_one @in_process += 1 begin @@ -224,9 +212,12 @@ class Chef def process_input(input, index) begin output = @block.call(input) - @unconsumed_output.push([ :result, input, output, index ]) + @unconsumed_output.push([ output, index, input, :result ]) rescue - @unconsumed_output.push([ :exception, input, $!, index ]) + if @options[:stop_on_exception] + @unconsumed_input.clear + end + @unconsumed_output.push([ $!, index, input, :exception ]) end index |