diff options
author | John Keiser <jkeiser@opscode.com> | 2014-05-17 22:03:31 -0700 |
---|---|---|
committer | John Keiser <jkeiser@opscode.com> | 2014-05-17 22:03:31 -0700 |
commit | ede39a9532b56f24a77efe34086d0e8e13082cc2 (patch) | |
tree | 99cd2a07fc5e5d0ad4d220792de2d33b50eecfd2 /lib/chef/chef_fs/parallelizer.rb | |
parent | b198db2ddf1664cd32389f13ad6284b66d80e9cb (diff) | |
download | chef-ede39a9532b56f24a77efe34086d0e8e13082cc2.tar.gz |
Add :stop_on_exception to stop consuming input on exception
Diffstat (limited to 'lib/chef/chef_fs/parallelizer.rb')
-rw-r--r-- | lib/chef/chef_fs/parallelizer.rb | 105 |
1 files changed, 61 insertions, 44 deletions
diff --git a/lib/chef/chef_fs/parallelizer.rb b/lib/chef/chef_fs/parallelizer.rb index 25aaffaab0..2c58323543 100644 --- a/lib/chef/chef_fs/parallelizer.rb +++ b/lib/chef/chef_fs/parallelizer.rb @@ -70,52 +70,99 @@ class Chef # :ordered [true|false] - whether the output should stay in the same order # as the input (even though it may not actually be processed in that # order). Default: true + # :stop_on_exception [true|false] - if true, when an exception occurs in either + # input or output, we wait for any outstanding processing to complete, + # but will not process any new inputs. Default: false # :main_thread_processing [true|false] - whether the main thread pulling # on each() is allowed to process inputs. Default: true # NOTE: If you set this to false, parallelizer.kill will stop each() # in its tracks, so you need to know for sure that won't happen. def initialize(parent_task_queue, enumerable, options, &block) - @task_queue = Queue.new + @parent_task_queue = parent_task_queue @enumerable = enumerable @options = options @block = block - @unconsumed_output = Queue.new + + @unconsumed_input = Queue.new @in_process = 0 + + @unconsumed_output = Queue.new + end + + def each + if @options[:ordered] == false + each_with_input_unordered do |input, output, index| + yield output + end + else + each_with_input_ordered do |input, output, index| + yield output + end + end + end + + def each_with_index + if @options[:ordered] == false + each_with_input_unordered do |input, output, index| + yield output, index + end + else + each_with_input_ordered do |input, output, index| + yield output, index + end + end + end + + def each_with_input(&block) + if @options[:ordered] == false + each_with_input_unordered(&block) + else + each_with_input_ordered(&block) + end end def wait - each_with_input_unordered do |input, output, index| + each_with_input_unordered do |type, input, output, index| end end - def each_with_input_unordered - awaiting_output = 0 + def each_with_exceptions + end + def each_with_input_unordered # Grab all the inputs, yielding any responses during enumeration # in case the enumeration itself takes time + exception = nil + begin @enumerable.each_with_index do |input, index| - awaiting_output += 1 - @task_queue.push([ input, index ]) + @unconsumed_input.push([ input, index ]) @parent_task_queue.push(method(:process_one)) + no_more_inputs = false while !@unconsumed_output.empty? type, input, output, index = @unconsumed_output.pop if type == :exception exception ||= output + if @options[:stop_on_exception] + @unconsumed_input.clear + no_more_inputs = true + end else yield input, output, index end - awaiting_output -= 1 end + break if no_more_inputs end rescue # We still want to wait for the rest of the outputs to process - awaiting_output += 1 @unconsumed_output.push([:exception, nil, $!, nil]) + if @options[:stop_on_exception] + @unconsumed_input.clear + end end - while awaiting_output > 0 + while !@unconsumed_input.empty? || @in_process > 0 || !@unconsumed_output.empty? # yield thread to others (for 1.8.7) if @unconsumed_output.empty? sleep(0.01) @@ -125,10 +172,12 @@ class Chef type, input, output, index = @unconsumed_output.pop if type == :exception exception ||= output + if @options[:stop_on_exception] + @unconsumed_input.clear + end else yield input, output, index end - awaiting_output -= 1 end # If no one is working on our tasks and we're allowed to @@ -157,45 +206,13 @@ class Chef end end - def each_with_input(&block) - if @options[:ordered] == false - each_with_input_unordered(&block) - else - each_with_input_ordered(&block) - end - end - - def each_with_index - if @options[:ordered] == false - each_with_input_unordered do |input, output, index| - yield output, index - end - else - each_with_input_ordered do |input, output, index| - yield output, index - end - end - end - - def each - if @options[:ordered] == false - each_with_input_unordered do |input, output, index| - yield output - end - else - each_with_input_ordered do |input, output, index| - yield output - end - end - end - private def process_one @in_process += 1 begin begin - input, index = @task_queue.pop(true) + input, index = @unconsumed_input.pop(true) process_input(input, index) rescue ThreadError end |