summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Keiser <jkeiser@opscode.com>2014-05-17 22:03:31 -0700
committerJohn Keiser <jkeiser@opscode.com>2014-05-17 22:03:31 -0700
commitede39a9532b56f24a77efe34086d0e8e13082cc2 (patch)
tree99cd2a07fc5e5d0ad4d220792de2d33b50eecfd2
parentb198db2ddf1664cd32389f13ad6284b66d80e9cb (diff)
downloadchef-ede39a9532b56f24a77efe34086d0e8e13082cc2.tar.gz
Add :stop_on_exception to stop consuming input on exception
-rw-r--r--lib/chef/chef_fs/parallelizer.rb105
-rw-r--r--spec/unit/chef_fs/parallelizer.rb27
2 files changed, 88 insertions, 44 deletions
diff --git a/lib/chef/chef_fs/parallelizer.rb b/lib/chef/chef_fs/parallelizer.rb
index 25aaffaab0..2c58323543 100644
--- a/lib/chef/chef_fs/parallelizer.rb
+++ b/lib/chef/chef_fs/parallelizer.rb
@@ -70,52 +70,99 @@ class Chef
# :ordered [true|false] - whether the output should stay in the same order
# as the input (even though it may not actually be processed in that
# order). Default: true
+ # :stop_on_exception [true|false] - if true, when an exception occurs in either
+ # input or output, we wait for any outstanding processing to complete,
+ # but will not process any new inputs. Default: false
# :main_thread_processing [true|false] - whether the main thread pulling
# on each() is allowed to process inputs. Default: true
# NOTE: If you set this to false, parallelizer.kill will stop each()
# in its tracks, so you need to know for sure that won't happen.
def initialize(parent_task_queue, enumerable, options, &block)
- @task_queue = Queue.new
+
@parent_task_queue = parent_task_queue
@enumerable = enumerable
@options = options
@block = block
- @unconsumed_output = Queue.new
+
+ @unconsumed_input = Queue.new
@in_process = 0
+
+ @unconsumed_output = Queue.new
+ end
+
+ def each
+ if @options[:ordered] == false
+ each_with_input_unordered do |input, output, index|
+ yield output
+ end
+ else
+ each_with_input_ordered do |input, output, index|
+ yield output
+ end
+ end
+ end
+
+ def each_with_index
+ if @options[:ordered] == false
+ each_with_input_unordered do |input, output, index|
+ yield output, index
+ end
+ else
+ each_with_input_ordered do |input, output, index|
+ yield output, index
+ end
+ end
+ end
+
+ def each_with_input(&block)
+ if @options[:ordered] == false
+ each_with_input_unordered(&block)
+ else
+ each_with_input_ordered(&block)
+ end
end
def wait
- each_with_input_unordered do |input, output, index|
+ each_with_input_unordered do |type, input, output, index|
end
end
- def each_with_input_unordered
- awaiting_output = 0
+ def each_with_exceptions
+ end
+ def each_with_input_unordered
# Grab all the inputs, yielding any responses during enumeration
# in case the enumeration itself takes time
+ exception = nil
+
begin
@enumerable.each_with_index do |input, index|
- awaiting_output += 1
- @task_queue.push([ input, index ])
+ @unconsumed_input.push([ input, index ])
@parent_task_queue.push(method(:process_one))
+ no_more_inputs = false
while !@unconsumed_output.empty?
type, input, output, index = @unconsumed_output.pop
if type == :exception
exception ||= output
+ if @options[:stop_on_exception]
+ @unconsumed_input.clear
+ no_more_inputs = true
+ end
else
yield input, output, index
end
- awaiting_output -= 1
end
+ break if no_more_inputs
end
rescue
# We still want to wait for the rest of the outputs to process
- awaiting_output += 1
@unconsumed_output.push([:exception, nil, $!, nil])
+ if @options[:stop_on_exception]
+ @unconsumed_input.clear
+ end
end
- while awaiting_output > 0
+ while !@unconsumed_input.empty? || @in_process > 0 || !@unconsumed_output.empty?
# yield thread to others (for 1.8.7)
if @unconsumed_output.empty?
sleep(0.01)
@@ -125,10 +172,12 @@ class Chef
type, input, output, index = @unconsumed_output.pop
if type == :exception
exception ||= output
+ if @options[:stop_on_exception]
+ @unconsumed_input.clear
+ end
else
yield input, output, index
end
- awaiting_output -= 1
end
# If no one is working on our tasks and we're allowed to
@@ -157,45 +206,13 @@ class Chef
end
end
- def each_with_input(&block)
- if @options[:ordered] == false
- each_with_input_unordered(&block)
- else
- each_with_input_ordered(&block)
- end
- end
-
- def each_with_index
- if @options[:ordered] == false
- each_with_input_unordered do |input, output, index|
- yield output, index
- end
- else
- each_with_input_ordered do |input, output, index|
- yield output, index
- end
- end
- end
-
- def each
- if @options[:ordered] == false
- each_with_input_unordered do |input, output, index|
- yield output
- end
- else
- each_with_input_ordered do |input, output, index|
- yield output
- end
- end
- end
-
private
def process_one
@in_process += 1
begin
begin
- input, index = @task_queue.pop(true)
+ input, index = @unconsumed_input.pop(true)
process_input(input, index)
rescue ThreadError
end
diff --git a/spec/unit/chef_fs/parallelizer.rb b/spec/unit/chef_fs/parallelizer.rb
index 971949251c..7823c307f9 100644
--- a/spec/unit/chef_fs/parallelizer.rb
+++ b/spec/unit/chef_fs/parallelizer.rb
@@ -101,6 +101,20 @@ describe Chef::ChefFS::Parallelizer do
elapsed_time.should < 0.4
processed.should == 3
end
+
+ it "Exceptions with :stop_on_exception are raised after all processing is done" do
+ processed = 0
+ parallelized = parallelize([0.3,0.3,'x',0.3,0.3,0.3,0.3,0.3], :ordered => false, :stop_on_exception => true) do |x|
+ raise 'hi' if x == 'x'
+ sleep(x)
+ processed += 1
+ x
+ end
+ expect { parallelized.to_a }.to raise_error 'hi'
+ processed.should <= 5
+ processed.should >= 2
+ end
+
end
context "With :ordered => true (ordered output)" do
@@ -154,6 +168,19 @@ describe Chef::ChefFS::Parallelizer do
elapsed_time.should < 0.55
processed.should == 3
end
+
+ it "Exceptions with :stop_on_exception are raised after all processing is done" do
+ processed = 0
+ parallelized = parallelize([0.3,0.3,'x',0.3,0.3,0.3,0.3,0.3], :stop_on_exception => true) do |x|
+ raise 'hi' if x == 'x'
+ sleep(x)
+ processed += 1
+ x
+ end
+ expect { parallelized.to_a }.to raise_error 'hi'
+ processed.should <= 5
+ processed.should >= 2
+ end
end
end