summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Keiser <jkeiser@opscode.com>2014-05-18 18:44:03 -0700
committerJohn Keiser <jkeiser@opscode.com>2014-05-18 18:44:03 -0700
commita32736de04df3c40fb21ec57db00128cf33e1f7d (patch)
tree2a98fef314a9dc9135b823283ef9c7864a44faee
parent31b553dcbcc9623eb75a8faa11c01e627851bd37 (diff)
downloadchef-a32736de04df3c40fb21ec57db00128cf33e1f7d.tar.gz
Add ability to flatten parallel enumerable
-rw-r--r--lib/chef/chef_fs/command_line.rb8
-rw-r--r--lib/chef/chef_fs/file_system.rb6
-rw-r--r--lib/chef/chef_fs/parallelizer.rb167
-rw-r--r--lib/chef/chef_fs/parallelizer/flatten_enumerable.rb35
-rw-r--r--lib/chef/chef_fs/parallelizer/parallel_enumerable.rb189
-rw-r--r--lib/chef/knife/list.rb17
-rw-r--r--lib/chef/knife/show.rb5
-rw-r--r--spec/integration/knife/chef_repo_path_spec.rb2
-rw-r--r--spec/unit/chef_fs/parallelizer.rb24
9 files changed, 273 insertions, 180 deletions
diff --git a/lib/chef/chef_fs/command_line.rb b/lib/chef/chef_fs/command_line.rb
index d0183a5a2a..967c59ecae 100644
--- a/lib/chef/chef_fs/command_line.rb
+++ b/lib/chef/chef_fs/command_line.rb
@@ -129,9 +129,9 @@ class Chef
end
def self.diff(pattern, old_root, new_root, recurse_depth, get_content)
- Chef::ChefFS::Parallelizer.parallelize(Chef::ChefFS::FileSystem.list_pairs(pattern, old_root, new_root), :flatten => true) do |old_entry, new_entry|
+ Chef::ChefFS::Parallelizer.parallelize(Chef::ChefFS::FileSystem.list_pairs(pattern, old_root, new_root)) do |old_entry, new_entry|
diff_entries(old_entry, new_entry, recurse_depth, get_content)
- end
+ end.flatten(1)
end
# Diff two known entries (could be files or dirs)
@@ -142,9 +142,9 @@ class Chef
if recurse_depth == 0
return [ [ :common_subdirectories, old_entry, new_entry ] ]
else
- return Chef::ChefFS::Parallelizer.parallelize(Chef::ChefFS::FileSystem.child_pairs(old_entry, new_entry), :flatten => true) do |old_child, new_child|
+ return Chef::ChefFS::Parallelizer.parallelize(Chef::ChefFS::FileSystem.child_pairs(old_entry, new_entry)) do |old_child, new_child|
Chef::ChefFS::CommandLine.diff_entries(old_child, new_child, recurse_depth ? recurse_depth - 1 : nil, get_content)
- end
+ end.flatten(1)
end
# If old is a directory and new is a file
diff --git a/lib/chef/chef_fs/file_system.rb b/lib/chef/chef_fs/file_system.rb
index f2478c4680..9fe726744e 100644
--- a/lib/chef/chef_fs/file_system.rb
+++ b/lib/chef/chef_fs/file_system.rb
@@ -72,8 +72,8 @@ class Chef
# Otherwise, go through all children and find any matches
elsif entry.dir?
- results = Parallelizer::parallelize(entry.children, :flatten => true) { |child| Chef::ChefFS::FileSystem.list(child, pattern) }
- results.each(&block)
+ results = Parallelizer::parallelize(entry.children) { |child| Chef::ChefFS::FileSystem.list(child, pattern) }
+ results.flatten(1).each(&block)
end
end
end
@@ -419,7 +419,7 @@ class Chef
end
def self.parallel_do(enum, options = {}, &block)
- Chef::ChefFS::Parallelizer.parallelize(enum, options, &block).to_a
+ Chef::ChefFS::Parallelizer.parallel_do(enum, options, &block)
end
end
end
diff --git a/lib/chef/chef_fs/parallelizer.rb b/lib/chef/chef_fs/parallelizer.rb
index 5b575bb207..8172aaf6e8 100644
--- a/lib/chef/chef_fs/parallelizer.rb
+++ b/lib/chef/chef_fs/parallelizer.rb
@@ -1,7 +1,13 @@
require 'thread'
+require 'chef/chef_fs/parallelizer/parallel_enumerable'
class Chef
module ChefFS
+ # Tries to balance several guarantees, in order of priority:
+ # - don't get deadlocked
+ # - provide results in desired order
+ # - provide results as soon as they are available
+ # - process input as soon as possible
class Parallelizer
@@parallelizer = nil
@@threads = 0
@@ -62,167 +68,6 @@ class Chef
end
end
end
-
- class ParallelEnumerable
- include Enumerable
-
- # options:
- # :ordered [true|false] - whether the output should stay in the same order
- # as the input (even though it may not actually be processed in that
- # order). Default: true
- # :stop_on_exception [true|false] - if true, when an exception occurs in either
- # input or output, we wait for any outstanding processing to complete,
- # but will not process any new inputs. Default: false
- # :main_thread_processing [true|false] - whether the main thread pulling
- # on each() is allowed to process inputs. Default: true
- # NOTE: If you set this to false, parallelizer.kill will stop each()
- # in its tracks, so you need to know for sure that won't happen.
- def initialize(parent_task_queue, enumerable, options, &block)
- @parent_task_queue = parent_task_queue
- @enumerable = enumerable
- @options = options
- @block = block
-
- @unconsumed_input = Queue.new
- @in_process = 0
-
- @unconsumed_output = Queue.new
- end
-
- def each
- each_with_input do |output, index, input, type|
- yield output
- end
- end
-
- def each_with_index
- each_with_input do |output, index, input|
- yield output, index
- end
- end
-
- def each_with_input
- exception = nil
- each_with_exceptions do |output, index, input, type|
- if type == :exception
- if @options[:ordered] == false
- exception ||= output
- else
- raise output
- end
- else
- yield output, index, input
- end
- end
- raise exception if exception
- end
-
- def each_with_exceptions(&block)
- if @options[:ordered] == false
- each_with_exceptions_unordered(&block)
- else
- each_with_exceptions_ordered(&block)
- end
- end
-
- def wait
- exception = nil
- each_with_exceptions_unordered do |output, index, input, type|
- exception ||= output if type == :exception
- end
- raise exception if exception
- end
-
- private
-
- def each_with_exceptions_unordered
- # Grab all the inputs, yielding any responses during enumeration
- # in case the enumeration itself takes time
- begin
- @enumerable.each_with_index do |input, index|
- @unconsumed_input.push([ input, index ])
- @parent_task_queue.push(method(:process_one))
- no_more_inputs = false
- while !@unconsumed_output.empty?
- output, index, input, type = @unconsumed_output.pop
- yield output, index, input, type
- if type == :exception && @options[:stop_on_exception]
- no_more_inputs = true
- end
- end
- break if no_more_inputs
- end
- rescue
- # We still want to wait for the rest of the outputs to process
- @unconsumed_output.push([$!, nil, nil, :exception])
- if @options[:stop_on_exception]
- @unconsumed_input.clear
- end
- end
-
- while !@unconsumed_input.empty? || @in_process > 0 || !@unconsumed_output.empty?
- # yield thread to others (for 1.8.7)
- if @unconsumed_output.empty?
- sleep(0.01)
- end
-
- while !@unconsumed_output.empty?
- yield @unconsumed_output.pop
- end
-
- # If no one is working on our tasks and we're allowed to
- # work on them in the main thread, process an input to
- # move things forward.
- if @in_process == 0 && !(@options[:main_thread_processing] == false)
- process_one
- end
- end
- end
-
- def each_with_exceptions_ordered
- next_to_yield = 0
- unconsumed = {}
- each_with_exceptions_unordered do |output, index, input, type|
- unconsumed[index] = [ output, input, type ]
- while unconsumed[next_to_yield]
- input_output = unconsumed.delete(next_to_yield)
- yield input_output[0], next_to_yield, input_output[1], input_output[2]
- next_to_yield += 1
- end
- end
- input_exception = unconsumed.delete(nil)
- if input_exception
- yield input_exception[0], next_to_yield, input_exception[1], input_exception[2]
- end
- end
-
- def process_one
- @in_process += 1
- begin
- begin
- input, index = @unconsumed_input.pop(true)
- process_input(input, index)
- rescue ThreadError
- end
- ensure
- @in_process -= 1
- end
- end
-
- def process_input(input, index)
- begin
- output = @block.call(input)
- @unconsumed_output.push([ output, index, input, :result ])
- rescue
- if @options[:stop_on_exception]
- @unconsumed_input.clear
- end
- @unconsumed_output.push([ $!, index, input, :exception ])
- end
-
- index
- end
- end
end
end
end
diff --git a/lib/chef/chef_fs/parallelizer/flatten_enumerable.rb b/lib/chef/chef_fs/parallelizer/flatten_enumerable.rb
new file mode 100644
index 0000000000..7321aa0664
--- /dev/null
+++ b/lib/chef/chef_fs/parallelizer/flatten_enumerable.rb
@@ -0,0 +1,35 @@
+class Chef
+ module ChefFS
+ class Parallelizer
+ class FlattenEnumerable
+ include Enumerable
+
+ def initialize(enum, levels = nil)
+ @enum = enum
+ @levels = levels
+ end
+
+ attr_reader :enum
+ attr_reader :levels
+
+ def each(&block)
+ enum.each do |value|
+ flatten(value, levels, &block)
+ end
+ end
+
+ private
+
+ def flatten(value, levels, &block)
+ if levels != 0 && value.respond_to?(:each) && !value.is_a?(String)
+ value.each do |child|
+ flatten(child, levels.nil? ? levels : levels-1, &block)
+ end
+ else
+ block.call(value)
+ end
+ end
+ end
+ end
+ end
+end
diff --git a/lib/chef/chef_fs/parallelizer/parallel_enumerable.rb b/lib/chef/chef_fs/parallelizer/parallel_enumerable.rb
new file mode 100644
index 0000000000..b293497889
--- /dev/null
+++ b/lib/chef/chef_fs/parallelizer/parallel_enumerable.rb
@@ -0,0 +1,189 @@
+require 'chef/chef_fs/parallelizer/flatten_enumerable'
+
+class Chef
+ module ChefFS
+ class Parallelizer
+ class ParallelEnumerable
+ include Enumerable
+
+ # options:
+ # :ordered [true|false] - whether the output should stay in the same order
+ # as the input (even though it may not actually be processed in that
+ # order). Default: true
+ # :stop_on_exception [true|false] - if true, when an exception occurs in either
+ # input or output, we wait for any outstanding processing to complete,
+ # but will not process any new inputs. Default: false
+ # :main_thread_processing [true|false] - whether the main thread pulling
+ # on each() is allowed to process inputs. Default: true
+ # NOTE: If you set this to false, parallelizer.kill will stop each()
+ # in its tracks, so you need to know for sure that won't happen.
+ def initialize(parent_task_queue, enumerable, options = {}, &block)
+ @parent_task_queue = parent_task_queue
+ @enumerable = enumerable
+ @options = options
+ @block = block
+
+ @unconsumed_input = Queue.new
+ @in_process = 0
+
+ @unconsumed_output = Queue.new
+ end
+
+ def each
+ each_with_input do |output, index, input, type|
+ yield output
+ end
+ end
+
+ def each_with_index
+ each_with_input do |output, index, input|
+ yield output, index
+ end
+ end
+
+ def each_with_input
+ exception = nil
+ each_with_exceptions do |output, index, input, type|
+ if type == :exception
+ if @options[:ordered] == false
+ exception ||= output
+ else
+ raise output
+ end
+ else
+ yield output, index, input
+ end
+ end
+ raise exception if exception
+ end
+
+ def each_with_exceptions(&block)
+ if @options[:ordered] == false
+ each_with_exceptions_unordered(&block)
+ else
+ each_with_exceptions_ordered(&block)
+ end
+ end
+
+ def wait
+ exception = nil
+ each_with_exceptions_unordered do |output, index, input, type|
+ exception ||= output if type == :exception
+ end
+ raise exception if exception
+ end
+
+ def flatten(levels = nil)
+ FlattenEnumerable.new(self, levels)
+ end
+
+ private
+
+ def each_with_exceptions_unordered
+ # Grab all the inputs, yielding any responses during enumeration
+ # in case the enumeration itself takes time
+ begin
+ @enumerable.each_with_index do |input, index|
+ @unconsumed_input.push([ input, index ])
+ @parent_task_queue.push(method(:process_one))
+ no_more_inputs = false
+ while !@unconsumed_output.empty?
+ output, index, input, type = @unconsumed_output.pop
+ yield output, index, input, type
+ if type == :exception && @options[:stop_on_exception]
+ no_more_inputs = true
+ end
+ end
+ break if no_more_inputs
+ end
+ rescue
+ # We still want to wait for the rest of the outputs to process
+ @unconsumed_output.push([$!, nil, nil, :exception])
+ if @options[:stop_on_exception]
+ @unconsumed_input.clear
+ end
+ end
+
+ while !finished?
+ # yield thread to others (for 1.8.7)
+ if @unconsumed_output.empty?
+ sleep(0.01)
+ end
+
+ while !@unconsumed_output.empty?
+ yield @unconsumed_output.pop
+ end
+
+ # If no one is working on our tasks and we're allowed to
+ # work on them in the main thread, process an input to
+ # move things forward.
+ if @in_process == 0 && !(@options[:main_thread_processing] == false)
+ process_one
+ end
+ end
+ end
+
+ def each_with_exceptions_ordered
+ next_to_yield = 0
+ unconsumed = {}
+ each_with_exceptions_unordered do |output, index, input, type|
+ unconsumed[index] = [ output, input, type ]
+ while unconsumed[next_to_yield]
+ input_output = unconsumed.delete(next_to_yield)
+ yield input_output[0], next_to_yield, input_output[1], input_output[2]
+ next_to_yield += 1
+ end
+ end
+ input_exception = unconsumed.delete(nil)
+ if input_exception
+ yield input_exception[0], next_to_yield, input_exception[1], input_exception[2]
+ end
+ end
+
+ #
+ # This is thread safe only if called from the main thread pulling on each().
+ # The order of these checks is important, as well, to be thread safe.
+ # 1. If @unconsumed_input.empty? is true, then we will never have any more
+ # work legitimately picked up.
+ # 2. If @in_process == 0, then there is no work in process, and because ofwhen unconsumed_input is empty, it will never go back up, because
+ # this is called after the input enumerator is finished. Note that switching #2 and #1
+ # could cause a race, because in_process is incremented *before* consuming input.
+ # 3. If @unconsumed_output.empty? is true, then we are done with outputs.
+ # Thus, 1+2 means no more output will ever show up, and 3 means we've passed all
+ # existing outputs to the user.
+ #
+ def finished?
+ @unconsumed_input.empty? && @in_process == 0 && @unconsumed_output.empty?
+ end
+
+ def process_one
+ @in_process += 1
+ begin
+ begin
+ input, index = @unconsumed_input.pop(true)
+ process_input(input, index)
+ rescue ThreadError
+ end
+ ensure
+ @in_process -= 1
+ end
+ end
+
+ def process_input(input, index)
+ begin
+ output = @block.call(input)
+ @unconsumed_output.push([ output, index, input, :result ])
+
+ rescue
+ if @options[:stop_on_exception]
+ @unconsumed_input.clear
+ end
+ @unconsumed_output.push([ $!, index, input, :exception ])
+ end
+
+ index
+ end
+ end
+ end
+ end
+end
diff --git a/lib/chef/knife/list.rb b/lib/chef/knife/list.rb
index 4338e195bd..137d61f3a5 100644
--- a/lib/chef/knife/list.rb
+++ b/lib/chef/knife/list.rb
@@ -43,21 +43,23 @@ class Chef
def run
patterns = name_args.length == 0 ? [""] : name_args
- # Get the matches (recursively)
- all_results = parallelize(pattern_args_from(patterns), :flatten => true) do |pattern|
- pattern_results = Chef::ChefFS::FileSystem.list(config[:local] ? local_fs : chef_fs, pattern)
+ # Get the top-level matches
+ args = pattern_args_from(patterns)
+ all_results = parallelize(pattern_args_from(patterns)) do |pattern|
+ pattern_results = Chef::ChefFS::FileSystem.list(config[:local] ? local_fs : chef_fs, pattern).to_a
if pattern_results.first && !pattern_results.first.exists? && pattern.exact_path
ui.error "#{format_path(pattern_results.first)}: No such file or directory"
self.exit_code = 1
end
pattern_results
- end
+ end.flatten(1).to_a
# Process directories
if !config[:bare_directories]
- dir_results = parallelize(all_results.select { |result| result.dir? }, :flatten => true) do |result|
+ dir_results = parallelize(all_results.select { |result| result.dir? }) do |result|
add_dir_result(result)
- end.to_a
+ end.flatten(1)
+
else
dir_results = []
end
@@ -109,7 +111,7 @@ class Chef
result = [ [ result, children ] ]
if config[:recursive]
child_dirs = children.select { |child| child.dir? }
- result += parallelize(child_dirs, :flatten => true) { |child| add_dir_result(child) }.to_a
+ result += parallelize(child_dirs) { |child| add_dir_result(child) }.flatten(1).to_a
end
result
end
@@ -152,4 +154,3 @@ class Chef
end
end
end
-
diff --git a/lib/chef/knife/show.rb b/lib/chef/knife/show.rb
index acf1996e96..4684a6ac7e 100644
--- a/lib/chef/knife/show.rb
+++ b/lib/chef/knife/show.rb
@@ -20,7 +20,7 @@ class Chef
def run
# Get the matches (recursively)
error = false
- entry_values = parallelize(pattern_args, :flatten => true) do |pattern|
+ entry_values = parallelize(pattern_args) do |pattern|
parallelize(Chef::ChefFS::FileSystem.list(config[:local] ? local_fs : chef_fs, pattern)) do |entry|
if entry.dir?
ui.error "#{format_path(entry)}: is a directory" if pattern.exact_path
@@ -40,7 +40,7 @@ class Chef
end
end
end
- end
+ end.flatten(1)
entry_values.each do |entry, value|
if entry
output "#{format_path(entry)}:"
@@ -54,4 +54,3 @@ class Chef
end
end
end
-
diff --git a/spec/integration/knife/chef_repo_path_spec.rb b/spec/integration/knife/chef_repo_path_spec.rb
index 4ffb179a4b..87619d8a58 100644
--- a/spec/integration/knife/chef_repo_path_spec.rb
+++ b/spec/integration/knife/chef_repo_path_spec.rb
@@ -101,7 +101,7 @@ EOM
/users/
/users/user3.json
EOM
- end
+ end
context 'when cwd is at the top level' do
cwd '.'
diff --git a/spec/unit/chef_fs/parallelizer.rb b/spec/unit/chef_fs/parallelizer.rb
index 5821721703..5c9f38a8eb 100644
--- a/spec/unit/chef_fs/parallelizer.rb
+++ b/spec/unit/chef_fs/parallelizer.rb
@@ -192,6 +192,30 @@ describe Chef::ChefFS::Parallelizer do
processed.should == 4
end
end
+
+ class SlowEnumerable
+ def initialize(*values)
+ @values = values
+ end
+ include Enumerable
+ def each
+ @values.each do |value|
+ yield value
+ sleep 0.1
+ end
+ end
+ end
+
+ it "When the input is slow, output still proceeds" do
+ enum = parallelize(SlowEnumerable.new(1,2,3)) { |x| x }.enum_for(:each)
+ enum.next.should == 1
+ elapsed_time.should < 0.2
+ enum.next.should == 2
+ elapsed_time.should < 0.3
+ enum.next.should == 3
+ elapsed_time.should < 0.4
+ expect { enum.next }.to raise_error StopIteration
+ end
end
context "With a Parallelizer with 1 thread" do