summaryrefslogtreecommitdiff
path: root/lib/chef/chef_fs/parallelizer.rb
diff options
context:
space:
mode:
authorJohn Keiser <jkeiser@opscode.com>2014-05-17 22:41:15 -0700
committerJohn Keiser <jkeiser@opscode.com>2014-05-17 22:41:15 -0700
commit31b553dcbcc9623eb75a8faa11c01e627851bd37 (patch)
tree55e5c699b0917d709cb033d055469114f9c8ef4e /lib/chef/chef_fs/parallelizer.rb
parentede39a9532b56f24a77efe34086d0e8e13082cc2 (diff)
downloadchef-31b553dcbcc9623eb75a8faa11c01e627851bd37.tar.gz
Add each_with_exceptions to allow all results to be known
Diffstat (limited to 'lib/chef/chef_fs/parallelizer.rb')
-rw-r--r--lib/chef/chef_fs/parallelizer.rb103
1 files changed, 47 insertions, 56 deletions
diff --git a/lib/chef/chef_fs/parallelizer.rb b/lib/chef/chef_fs/parallelizer.rb
index 2c58323543..5b575bb207 100644
--- a/lib/chef/chef_fs/parallelizer.rb
+++ b/lib/chef/chef_fs/parallelizer.rb
@@ -78,7 +78,6 @@ class Chef
# NOTE: If you set this to false, parallelizer.kill will stop each()
# in its tracks, so you need to know for sure that won't happen.
def initialize(parent_task_queue, enumerable, options, &block)
-
@parent_task_queue = parent_task_queue
@enumerable = enumerable
@options = options
@@ -91,72 +90,71 @@ class Chef
end
def each
- if @options[:ordered] == false
- each_with_input_unordered do |input, output, index|
- yield output
- end
- else
- each_with_input_ordered do |input, output, index|
- yield output
- end
+ each_with_input do |output, index, input, type|
+ yield output
end
end
def each_with_index
- if @options[:ordered] == false
- each_with_input_unordered do |input, output, index|
- yield output, index
- end
- else
- each_with_input_ordered do |input, output, index|
- yield output, index
+ each_with_input do |output, index, input|
+ yield output, index
+ end
+ end
+
+ def each_with_input
+ exception = nil
+ each_with_exceptions do |output, index, input, type|
+ if type == :exception
+ if @options[:ordered] == false
+ exception ||= output
+ else
+ raise output
+ end
+ else
+ yield output, index, input
end
end
+ raise exception if exception
end
- def each_with_input(&block)
+ def each_with_exceptions(&block)
if @options[:ordered] == false
- each_with_input_unordered(&block)
+ each_with_exceptions_unordered(&block)
else
- each_with_input_ordered(&block)
+ each_with_exceptions_ordered(&block)
end
end
def wait
- each_with_input_unordered do |type, input, output, index|
+ exception = nil
+ each_with_exceptions_unordered do |output, index, input, type|
+ exception ||= output if type == :exception
end
+ raise exception if exception
end
- def each_with_exceptions
- end
+ private
- def each_with_input_unordered
+ def each_with_exceptions_unordered
# Grab all the inputs, yielding any responses during enumeration
# in case the enumeration itself takes time
- exception = nil
-
begin
@enumerable.each_with_index do |input, index|
@unconsumed_input.push([ input, index ])
@parent_task_queue.push(method(:process_one))
no_more_inputs = false
while !@unconsumed_output.empty?
- type, input, output, index = @unconsumed_output.pop
- if type == :exception
- exception ||= output
- if @options[:stop_on_exception]
- @unconsumed_input.clear
- no_more_inputs = true
- end
- else
- yield input, output, index
+ output, index, input, type = @unconsumed_output.pop
+ yield output, index, input, type
+ if type == :exception && @options[:stop_on_exception]
+ no_more_inputs = true
end
end
break if no_more_inputs
end
rescue
# We still want to wait for the rest of the outputs to process
- @unconsumed_output.push([:exception, nil, $!, nil])
+ @unconsumed_output.push([$!, nil, nil, :exception])
if @options[:stop_on_exception]
@unconsumed_input.clear
end
@@ -169,15 +167,7 @@ class Chef
end
while !@unconsumed_output.empty?
- type, input, output, index = @unconsumed_output.pop
- if type == :exception
- exception ||= output
- if @options[:stop_on_exception]
- @unconsumed_input.clear
- end
- else
- yield input, output, index
- end
+ yield @unconsumed_output.pop
end
# If no one is working on our tasks and we're allowed to
@@ -187,27 +177,25 @@ class Chef
process_one
end
end
-
- if exception
- raise exception
- end
end
- def each_with_input_ordered
+ def each_with_exceptions_ordered
next_to_yield = 0
unconsumed = {}
- each_with_input_unordered do |input, output, index|
- unconsumed[index] = [ input, output ]
+ each_with_exceptions_unordered do |output, index, input, type|
+ unconsumed[index] = [ output, input, type ]
while unconsumed[next_to_yield]
input_output = unconsumed.delete(next_to_yield)
- yield input_output[0], input_output[1], next_to_yield
+ yield input_output[0], next_to_yield, input_output[1], input_output[2]
next_to_yield += 1
end
end
+ input_exception = unconsumed.delete(nil)
+ if input_exception
+ yield input_exception[0], next_to_yield, input_exception[1], input_exception[2]
+ end
end
- private
-
def process_one
@in_process += 1
begin
@@ -224,9 +212,12 @@ class Chef
def process_input(input, index)
begin
output = @block.call(input)
- @unconsumed_output.push([ :result, input, output, index ])
+ @unconsumed_output.push([ output, index, input, :result ])
rescue
- @unconsumed_output.push([ :exception, input, $!, index ])
+ if @options[:stop_on_exception]
+ @unconsumed_input.clear
+ end
+ @unconsumed_output.push([ $!, index, input, :exception ])
end
index