diff options
author | John Keiser <jkeiser@opscode.com> | 2014-05-18 18:44:03 -0700 |
---|---|---|
committer | John Keiser <jkeiser@opscode.com> | 2014-05-18 18:44:03 -0700 |
commit | a32736de04df3c40fb21ec57db00128cf33e1f7d (patch) | |
tree | 2a98fef314a9dc9135b823283ef9c7864a44faee | |
parent | 31b553dcbcc9623eb75a8faa11c01e627851bd37 (diff) | |
download | chef-a32736de04df3c40fb21ec57db00128cf33e1f7d.tar.gz |
Add ability to flatten parallel enumerable
-rw-r--r-- | lib/chef/chef_fs/command_line.rb | 8 | ||||
-rw-r--r-- | lib/chef/chef_fs/file_system.rb | 6 | ||||
-rw-r--r-- | lib/chef/chef_fs/parallelizer.rb | 167 | ||||
-rw-r--r-- | lib/chef/chef_fs/parallelizer/flatten_enumerable.rb | 35 | ||||
-rw-r--r-- | lib/chef/chef_fs/parallelizer/parallel_enumerable.rb | 189 | ||||
-rw-r--r-- | lib/chef/knife/list.rb | 17 | ||||
-rw-r--r-- | lib/chef/knife/show.rb | 5 | ||||
-rw-r--r-- | spec/integration/knife/chef_repo_path_spec.rb | 2 | ||||
-rw-r--r-- | spec/unit/chef_fs/parallelizer.rb | 24 |
9 files changed, 273 insertions, 180 deletions
diff --git a/lib/chef/chef_fs/command_line.rb b/lib/chef/chef_fs/command_line.rb index d0183a5a2a..967c59ecae 100644 --- a/lib/chef/chef_fs/command_line.rb +++ b/lib/chef/chef_fs/command_line.rb @@ -129,9 +129,9 @@ class Chef end def self.diff(pattern, old_root, new_root, recurse_depth, get_content) - Chef::ChefFS::Parallelizer.parallelize(Chef::ChefFS::FileSystem.list_pairs(pattern, old_root, new_root), :flatten => true) do |old_entry, new_entry| + Chef::ChefFS::Parallelizer.parallelize(Chef::ChefFS::FileSystem.list_pairs(pattern, old_root, new_root)) do |old_entry, new_entry| diff_entries(old_entry, new_entry, recurse_depth, get_content) - end + end.flatten(1) end # Diff two known entries (could be files or dirs) @@ -142,9 +142,9 @@ class Chef if recurse_depth == 0 return [ [ :common_subdirectories, old_entry, new_entry ] ] else - return Chef::ChefFS::Parallelizer.parallelize(Chef::ChefFS::FileSystem.child_pairs(old_entry, new_entry), :flatten => true) do |old_child, new_child| + return Chef::ChefFS::Parallelizer.parallelize(Chef::ChefFS::FileSystem.child_pairs(old_entry, new_entry)) do |old_child, new_child| Chef::ChefFS::CommandLine.diff_entries(old_child, new_child, recurse_depth ? recurse_depth - 1 : nil, get_content) - end + end.flatten(1) end # If old is a directory and new is a file diff --git a/lib/chef/chef_fs/file_system.rb b/lib/chef/chef_fs/file_system.rb index f2478c4680..9fe726744e 100644 --- a/lib/chef/chef_fs/file_system.rb +++ b/lib/chef/chef_fs/file_system.rb @@ -72,8 +72,8 @@ class Chef # Otherwise, go through all children and find any matches elsif entry.dir? - results = Parallelizer::parallelize(entry.children, :flatten => true) { |child| Chef::ChefFS::FileSystem.list(child, pattern) } - results.each(&block) + results = Parallelizer::parallelize(entry.children) { |child| Chef::ChefFS::FileSystem.list(child, pattern) } + results.flatten(1).each(&block) end end end @@ -419,7 +419,7 @@ class Chef end def self.parallel_do(enum, options = {}, &block) - Chef::ChefFS::Parallelizer.parallelize(enum, options, &block).to_a + Chef::ChefFS::Parallelizer.parallel_do(enum, options, &block) end end end diff --git a/lib/chef/chef_fs/parallelizer.rb b/lib/chef/chef_fs/parallelizer.rb index 5b575bb207..8172aaf6e8 100644 --- a/lib/chef/chef_fs/parallelizer.rb +++ b/lib/chef/chef_fs/parallelizer.rb @@ -1,7 +1,13 @@ require 'thread' +require 'chef/chef_fs/parallelizer/parallel_enumerable' class Chef module ChefFS + # Tries to balance several guarantees, in order of priority: + # - don't get deadlocked + # - provide results in desired order + # - provide results as soon as they are available + # - process input as soon as possible class Parallelizer @@parallelizer = nil @@threads = 0 @@ -62,167 +68,6 @@ class Chef end end end - - class ParallelEnumerable - include Enumerable - - # options: - # :ordered [true|false] - whether the output should stay in the same order - # as the input (even though it may not actually be processed in that - # order). Default: true - # :stop_on_exception [true|false] - if true, when an exception occurs in either - # input or output, we wait for any outstanding processing to complete, - # but will not process any new inputs. Default: false - # :main_thread_processing [true|false] - whether the main thread pulling - # on each() is allowed to process inputs. Default: true - # NOTE: If you set this to false, parallelizer.kill will stop each() - # in its tracks, so you need to know for sure that won't happen. - def initialize(parent_task_queue, enumerable, options, &block) - @parent_task_queue = parent_task_queue - @enumerable = enumerable - @options = options - @block = block - - @unconsumed_input = Queue.new - @in_process = 0 - - @unconsumed_output = Queue.new - end - - def each - each_with_input do |output, index, input, type| - yield output - end - end - - def each_with_index - each_with_input do |output, index, input| - yield output, index - end - end - - def each_with_input - exception = nil - each_with_exceptions do |output, index, input, type| - if type == :exception - if @options[:ordered] == false - exception ||= output - else - raise output - end - else - yield output, index, input - end - end - raise exception if exception - end - - def each_with_exceptions(&block) - if @options[:ordered] == false - each_with_exceptions_unordered(&block) - else - each_with_exceptions_ordered(&block) - end - end - - def wait - exception = nil - each_with_exceptions_unordered do |output, index, input, type| - exception ||= output if type == :exception - end - raise exception if exception - end - - private - - def each_with_exceptions_unordered - # Grab all the inputs, yielding any responses during enumeration - # in case the enumeration itself takes time - begin - @enumerable.each_with_index do |input, index| - @unconsumed_input.push([ input, index ]) - @parent_task_queue.push(method(:process_one)) - no_more_inputs = false - while !@unconsumed_output.empty? - output, index, input, type = @unconsumed_output.pop - yield output, index, input, type - if type == :exception && @options[:stop_on_exception] - no_more_inputs = true - end - end - break if no_more_inputs - end - rescue - # We still want to wait for the rest of the outputs to process - @unconsumed_output.push([$!, nil, nil, :exception]) - if @options[:stop_on_exception] - @unconsumed_input.clear - end - end - - while !@unconsumed_input.empty? || @in_process > 0 || !@unconsumed_output.empty? - # yield thread to others (for 1.8.7) - if @unconsumed_output.empty? - sleep(0.01) - end - - while !@unconsumed_output.empty? - yield @unconsumed_output.pop - end - - # If no one is working on our tasks and we're allowed to - # work on them in the main thread, process an input to - # move things forward. - if @in_process == 0 && !(@options[:main_thread_processing] == false) - process_one - end - end - end - - def each_with_exceptions_ordered - next_to_yield = 0 - unconsumed = {} - each_with_exceptions_unordered do |output, index, input, type| - unconsumed[index] = [ output, input, type ] - while unconsumed[next_to_yield] - input_output = unconsumed.delete(next_to_yield) - yield input_output[0], next_to_yield, input_output[1], input_output[2] - next_to_yield += 1 - end - end - input_exception = unconsumed.delete(nil) - if input_exception - yield input_exception[0], next_to_yield, input_exception[1], input_exception[2] - end - end - - def process_one - @in_process += 1 - begin - begin - input, index = @unconsumed_input.pop(true) - process_input(input, index) - rescue ThreadError - end - ensure - @in_process -= 1 - end - end - - def process_input(input, index) - begin - output = @block.call(input) - @unconsumed_output.push([ output, index, input, :result ]) - rescue - if @options[:stop_on_exception] - @unconsumed_input.clear - end - @unconsumed_output.push([ $!, index, input, :exception ]) - end - - index - end - end end end end diff --git a/lib/chef/chef_fs/parallelizer/flatten_enumerable.rb b/lib/chef/chef_fs/parallelizer/flatten_enumerable.rb new file mode 100644 index 0000000000..7321aa0664 --- /dev/null +++ b/lib/chef/chef_fs/parallelizer/flatten_enumerable.rb @@ -0,0 +1,35 @@ +class Chef + module ChefFS + class Parallelizer + class FlattenEnumerable + include Enumerable + + def initialize(enum, levels = nil) + @enum = enum + @levels = levels + end + + attr_reader :enum + attr_reader :levels + + def each(&block) + enum.each do |value| + flatten(value, levels, &block) + end + end + + private + + def flatten(value, levels, &block) + if levels != 0 && value.respond_to?(:each) && !value.is_a?(String) + value.each do |child| + flatten(child, levels.nil? ? levels : levels-1, &block) + end + else + block.call(value) + end + end + end + end + end +end diff --git a/lib/chef/chef_fs/parallelizer/parallel_enumerable.rb b/lib/chef/chef_fs/parallelizer/parallel_enumerable.rb new file mode 100644 index 0000000000..b293497889 --- /dev/null +++ b/lib/chef/chef_fs/parallelizer/parallel_enumerable.rb @@ -0,0 +1,189 @@ +require 'chef/chef_fs/parallelizer/flatten_enumerable' + +class Chef + module ChefFS + class Parallelizer + class ParallelEnumerable + include Enumerable + + # options: + # :ordered [true|false] - whether the output should stay in the same order + # as the input (even though it may not actually be processed in that + # order). Default: true + # :stop_on_exception [true|false] - if true, when an exception occurs in either + # input or output, we wait for any outstanding processing to complete, + # but will not process any new inputs. Default: false + # :main_thread_processing [true|false] - whether the main thread pulling + # on each() is allowed to process inputs. Default: true + # NOTE: If you set this to false, parallelizer.kill will stop each() + # in its tracks, so you need to know for sure that won't happen. + def initialize(parent_task_queue, enumerable, options = {}, &block) + @parent_task_queue = parent_task_queue + @enumerable = enumerable + @options = options + @block = block + + @unconsumed_input = Queue.new + @in_process = 0 + + @unconsumed_output = Queue.new + end + + def each + each_with_input do |output, index, input, type| + yield output + end + end + + def each_with_index + each_with_input do |output, index, input| + yield output, index + end + end + + def each_with_input + exception = nil + each_with_exceptions do |output, index, input, type| + if type == :exception + if @options[:ordered] == false + exception ||= output + else + raise output + end + else + yield output, index, input + end + end + raise exception if exception + end + + def each_with_exceptions(&block) + if @options[:ordered] == false + each_with_exceptions_unordered(&block) + else + each_with_exceptions_ordered(&block) + end + end + + def wait + exception = nil + each_with_exceptions_unordered do |output, index, input, type| + exception ||= output if type == :exception + end + raise exception if exception + end + + def flatten(levels = nil) + FlattenEnumerable.new(self, levels) + end + + private + + def each_with_exceptions_unordered + # Grab all the inputs, yielding any responses during enumeration + # in case the enumeration itself takes time + begin + @enumerable.each_with_index do |input, index| + @unconsumed_input.push([ input, index ]) + @parent_task_queue.push(method(:process_one)) + no_more_inputs = false + while !@unconsumed_output.empty? + output, index, input, type = @unconsumed_output.pop + yield output, index, input, type + if type == :exception && @options[:stop_on_exception] + no_more_inputs = true + end + end + break if no_more_inputs + end + rescue + # We still want to wait for the rest of the outputs to process + @unconsumed_output.push([$!, nil, nil, :exception]) + if @options[:stop_on_exception] + @unconsumed_input.clear + end + end + + while !finished? + # yield thread to others (for 1.8.7) + if @unconsumed_output.empty? + sleep(0.01) + end + + while !@unconsumed_output.empty? + yield @unconsumed_output.pop + end + + # If no one is working on our tasks and we're allowed to + # work on them in the main thread, process an input to + # move things forward. + if @in_process == 0 && !(@options[:main_thread_processing] == false) + process_one + end + end + end + + def each_with_exceptions_ordered + next_to_yield = 0 + unconsumed = {} + each_with_exceptions_unordered do |output, index, input, type| + unconsumed[index] = [ output, input, type ] + while unconsumed[next_to_yield] + input_output = unconsumed.delete(next_to_yield) + yield input_output[0], next_to_yield, input_output[1], input_output[2] + next_to_yield += 1 + end + end + input_exception = unconsumed.delete(nil) + if input_exception + yield input_exception[0], next_to_yield, input_exception[1], input_exception[2] + end + end + + # + # This is thread safe only if called from the main thread pulling on each(). + # The order of these checks is important, as well, to be thread safe. + # 1. If @unconsumed_input.empty? is true, then we will never have any more + # work legitimately picked up. + # 2. If @in_process == 0, then there is no work in process, and because ofwhen unconsumed_input is empty, it will never go back up, because + # this is called after the input enumerator is finished. Note that switching #2 and #1 + # could cause a race, because in_process is incremented *before* consuming input. + # 3. If @unconsumed_output.empty? is true, then we are done with outputs. + # Thus, 1+2 means no more output will ever show up, and 3 means we've passed all + # existing outputs to the user. + # + def finished? + @unconsumed_input.empty? && @in_process == 0 && @unconsumed_output.empty? + end + + def process_one + @in_process += 1 + begin + begin + input, index = @unconsumed_input.pop(true) + process_input(input, index) + rescue ThreadError + end + ensure + @in_process -= 1 + end + end + + def process_input(input, index) + begin + output = @block.call(input) + @unconsumed_output.push([ output, index, input, :result ]) + + rescue + if @options[:stop_on_exception] + @unconsumed_input.clear + end + @unconsumed_output.push([ $!, index, input, :exception ]) + end + + index + end + end + end + end +end diff --git a/lib/chef/knife/list.rb b/lib/chef/knife/list.rb index 4338e195bd..137d61f3a5 100644 --- a/lib/chef/knife/list.rb +++ b/lib/chef/knife/list.rb @@ -43,21 +43,23 @@ class Chef def run patterns = name_args.length == 0 ? [""] : name_args - # Get the matches (recursively) - all_results = parallelize(pattern_args_from(patterns), :flatten => true) do |pattern| - pattern_results = Chef::ChefFS::FileSystem.list(config[:local] ? local_fs : chef_fs, pattern) + # Get the top-level matches + args = pattern_args_from(patterns) + all_results = parallelize(pattern_args_from(patterns)) do |pattern| + pattern_results = Chef::ChefFS::FileSystem.list(config[:local] ? local_fs : chef_fs, pattern).to_a if pattern_results.first && !pattern_results.first.exists? && pattern.exact_path ui.error "#{format_path(pattern_results.first)}: No such file or directory" self.exit_code = 1 end pattern_results - end + end.flatten(1).to_a # Process directories if !config[:bare_directories] - dir_results = parallelize(all_results.select { |result| result.dir? }, :flatten => true) do |result| + dir_results = parallelize(all_results.select { |result| result.dir? }) do |result| add_dir_result(result) - end.to_a + end.flatten(1) + else dir_results = [] end @@ -109,7 +111,7 @@ class Chef result = [ [ result, children ] ] if config[:recursive] child_dirs = children.select { |child| child.dir? } - result += parallelize(child_dirs, :flatten => true) { |child| add_dir_result(child) }.to_a + result += parallelize(child_dirs) { |child| add_dir_result(child) }.flatten(1).to_a end result end @@ -152,4 +154,3 @@ class Chef end end end - diff --git a/lib/chef/knife/show.rb b/lib/chef/knife/show.rb index acf1996e96..4684a6ac7e 100644 --- a/lib/chef/knife/show.rb +++ b/lib/chef/knife/show.rb @@ -20,7 +20,7 @@ class Chef def run # Get the matches (recursively) error = false - entry_values = parallelize(pattern_args, :flatten => true) do |pattern| + entry_values = parallelize(pattern_args) do |pattern| parallelize(Chef::ChefFS::FileSystem.list(config[:local] ? local_fs : chef_fs, pattern)) do |entry| if entry.dir? ui.error "#{format_path(entry)}: is a directory" if pattern.exact_path @@ -40,7 +40,7 @@ class Chef end end end - end + end.flatten(1) entry_values.each do |entry, value| if entry output "#{format_path(entry)}:" @@ -54,4 +54,3 @@ class Chef end end end - diff --git a/spec/integration/knife/chef_repo_path_spec.rb b/spec/integration/knife/chef_repo_path_spec.rb index 4ffb179a4b..87619d8a58 100644 --- a/spec/integration/knife/chef_repo_path_spec.rb +++ b/spec/integration/knife/chef_repo_path_spec.rb @@ -101,7 +101,7 @@ EOM /users/ /users/user3.json EOM - end + end context 'when cwd is at the top level' do cwd '.' diff --git a/spec/unit/chef_fs/parallelizer.rb b/spec/unit/chef_fs/parallelizer.rb index 5821721703..5c9f38a8eb 100644 --- a/spec/unit/chef_fs/parallelizer.rb +++ b/spec/unit/chef_fs/parallelizer.rb @@ -192,6 +192,30 @@ describe Chef::ChefFS::Parallelizer do processed.should == 4 end end + + class SlowEnumerable + def initialize(*values) + @values = values + end + include Enumerable + def each + @values.each do |value| + yield value + sleep 0.1 + end + end + end + + it "When the input is slow, output still proceeds" do + enum = parallelize(SlowEnumerable.new(1,2,3)) { |x| x }.enum_for(:each) + enum.next.should == 1 + elapsed_time.should < 0.2 + enum.next.should == 2 + elapsed_time.should < 0.3 + enum.next.should == 3 + elapsed_time.should < 0.4 + expect { enum.next }.to raise_error StopIteration + end end context "With a Parallelizer with 1 thread" do |