summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
author John Keiser <jkeiser@opscode.com> 2014-05-20 12:21:18 -0700
committer John Keiser <jkeiser@opscode.com> 2014-05-20 12:21:18 -0700
commit 8453ed5652da2498a5bb7e15e3468e3fbffa720f (patch)
tree a837b9046f6b485129b99c4a61c976ce99ed7665 /lib
parent 8a9b9bd5d186c300c4d1bd969b95b40a6f4d931c (diff)
parent d6f24954a16aa5e2e60050e1344265353a381428 (diff)
download chef-8453ed5652da2498a5bb7e15e3468e3fbffa720f.tar.gz
Merge parallelizer robustification and enhancements
Diffstat (limited to 'lib')
-rw-r--r-- lib/chef/chef_fs/command_line.rb 8
-rw-r--r-- lib/chef/chef_fs/file_system.rb 6
-rw-r--r-- lib/chef/chef_fs/parallelizer.rb 156
-rw-r--r-- lib/chef/chef_fs/parallelizer/flatten_enumerable.rb 35
-rw-r--r-- lib/chef/chef_fs/parallelizer/parallel_enumerable.rb 279
-rw-r--r-- lib/chef/knife/list.rb 17
-rw-r--r-- lib/chef/knife/show.rb 5
7 files changed, 398 insertions, 108 deletions
diff --git a/lib/chef/chef_fs/command_line.rb b/lib/chef/chef_fs/command_line.rb
index d0183a5a2a..967c59ecae 100644
--- a/lib/chef/chef_fs/command_line.rb
+++ b/lib/chef/chef_fs/command_line.rb
@@ -129,9 +129,9 @@ class Chef
end
def self.diff(pattern, old_root, new_root, recurse_depth, get_content)
- Chef::ChefFS::Parallelizer.parallelize(Chef::ChefFS::FileSystem.list_pairs(pattern, old_root, new_root), :flatten => true) do |old_entry, new_entry|
+ Chef::ChefFS::Parallelizer.parallelize(Chef::ChefFS::FileSystem.list_pairs(pattern, old_root, new_root)) do |old_entry, new_entry|
diff_entries(old_entry, new_entry, recurse_depth, get_content)
- end
+ end.flatten(1)
end
# Diff two known entries (could be files or dirs)
@@ -142,9 +142,9 @@ class Chef
if recurse_depth == 0
return [ [ :common_subdirectories, old_entry, new_entry ] ]
else
- return Chef::ChefFS::Parallelizer.parallelize(Chef::ChefFS::FileSystem.child_pairs(old_entry, new_entry), :flatten => true) do |old_child, new_child|
+ return Chef::ChefFS::Parallelizer.parallelize(Chef::ChefFS::FileSystem.child_pairs(old_entry, new_entry)) do |old_child, new_child|
Chef::ChefFS::CommandLine.diff_entries(old_child, new_child, recurse_depth ? recurse_depth - 1 : nil, get_content)
- end
+ end.flatten(1)
end
# If old is a directory and new is a file
diff --git a/lib/chef/chef_fs/file_system.rb b/lib/chef/chef_fs/file_system.rb
index f2478c4680..9fe726744e 100644
--- a/lib/chef/chef_fs/file_system.rb
+++ b/lib/chef/chef_fs/file_system.rb
@@ -72,8 +72,8 @@ class Chef
# Otherwise, go through all children and find any matches
elsif entry.dir?
- results = Parallelizer::parallelize(entry.children, :flatten => true) { |child| Chef::ChefFS::FileSystem.list(child, pattern) }
- results.each(&block)
+ results = Parallelizer::parallelize(entry.children) { |child| Chef::ChefFS::FileSystem.list(child, pattern) }
+ results.flatten(1).each(&block)
end
end
end
@@ -419,7 +419,7 @@ class Chef
end
def self.parallel_do(enum, options = {}, &block)
- Chef::ChefFS::Parallelizer.parallelize(enum, options, &block).to_a
+ Chef::ChefFS::Parallelizer.parallel_do(enum, options, &block)
end
end
end
diff --git a/lib/chef/chef_fs/parallelizer.rb b/lib/chef/chef_fs/parallelizer.rb
index 84f3d4d870..116a626869 100644
--- a/lib/chef/chef_fs/parallelizer.rb
+++ b/lib/chef/chef_fs/parallelizer.rb
@@ -1,127 +1,103 @@
+require 'thread'
+require 'chef/chef_fs/parallelizer/parallel_enumerable'
+
class Chef
module ChefFS
+ # Tries to balance several guarantees, in order of priority:
+ # - don't get deadlocked
+ # - provide results in desired order
+ # - provide results as soon as they are available
+ # - process input as soon as possible
class Parallelizer
@@parallelizer = nil
@@threads = 0
def self.threads=(value)
- if @@threads != value
- @@threads = value
- @@parallelizer = nil
- end
+ @@threads = value
+ @@parallelizer.resize(value) if @@parallelizer
end
- def self.parallelize(enumerator, options = {}, &block)
+ def self.parallelizer
@@parallelizer ||= Parallelizer.new(@@threads)
- @@parallelizer.parallelize(enumerator, options, &block)
end
- def initialize(threads)
- @tasks_mutex = Mutex.new
- @tasks = []
- @threads = []
- 1.upto(threads) do
- @threads << Thread.new { worker_loop }
- end
+ def self.parallelize(enumerable, options = {}, &block)
+ parallelizer.parallelize(enumerable, options, &block)
end
- def parallelize(enumerator, options = {}, &block)
- task = ParallelizedResults.new(enumerator, options, &block)
- @tasks_mutex.synchronize do
- @tasks << task
- end
- task
+ def self.parallel_do(enumerable, options = {}, &block)
+ parallelizer.parallel_do(enumerable, options, &block)
end
- class ParallelizedResults
- include Enumerable
+ def initialize(num_threads)
+ @tasks = Queue.new
+ @threads = []
+ @stop_thread = {}
+ resize(num_threads)
+ end
- def initialize(enumerator, options, &block)
- @inputs = enumerator.to_a
- @options = options
- @block = block
+ def num_threads
+ @threads.size
+ end
- @mutex = Mutex.new
- @outputs = []
- @status = []
- end
+ def parallelize(enumerable, options = {}, &block)
+ ParallelEnumerable.new(@tasks, enumerable, options, &block)
+ end
- def each
- next_index = 0
- while true
- # Report any results that already exist
- while @status.length > next_index && ([:finished, :exception].include?(@status[next_index]))
- if @status[next_index] == :finished
- if @options[:flatten]
- @outputs[next_index].each do |entry|
- yield entry
- end
- else
- yield @outputs[next_index]
- end
- else
- raise @outputs[next_index]
- end
- next_index = next_index + 1
- end
+ def parallel_do(enumerable, options = {}, &block)
+ ParallelEnumerable.new(@tasks, enumerable, options.merge(:ordered => false), &block).wait
+ end
- # Pick up a result and process it, if there is one. This ensures we
- # move forward even if there are *zero* worker threads available.
- if !process_input
- # Exit if we're done.
- if next_index >= @status.length
- break
- else
- # Ruby 1.8 threading sucks. Wait till we process more things.
- sleep(0.05)
- end
- end
+ def stop(wait = true, timeout = nil)
+ resize(0, wait, timeout)
+ end
+
+ def resize(to_threads, wait = true, timeout = nil)
+ if to_threads < num_threads
+ threads_to_stop = @threads[to_threads..num_threads-1]
+ @threads = @threads.slice(0, to_threads)
+ threads_to_stop.each do |thread|
+ @stop_thread[thread] = true
end
- end
- def process_input
- # Grab the next one to process
- index, input = @mutex.synchronize do
- index = @status.length
- if index >= @inputs.length
- return nil
+ if wait
+ start_time = Time.now
+ threads_to_stop.each do |thread|
+ thread_timeout = timeout ? timeout - (Time.now - start_time) : nil
+ thread.join(thread_timeout)
end
- input = @inputs[index]
- @status[index] = :started
- [ index, input ]
end
- begin
- @outputs[index] = @block.call(input)
- @status[index] = :finished
- rescue Exception
- @outputs[index] = $!
- @status[index] = :exception
+ else
+ num_threads.upto(to_threads - 1) do |i|
+ @threads[i] = Thread.new(&method(:worker_loop))
end
- index
end
end
+ def kill
+ @threads.each do |thread|
+ Thread.kill(thread)
+ @stop_thread.delete(thread)
+ end
+ @threads = []
+ end
+
private
def worker_loop
- while true
- begin
- task = @tasks[0]
- if task
- if !task.process_input
- @tasks_mutex.synchronize do
- @tasks.delete(task)
- end
- end
- else
- # Ruby 1.8 threading sucks. Wait a bit to see if another task comes in.
- sleep(0.05)
+ begin
+ while !@stop_thread[Thread.current]
+ begin
+ task = @tasks.pop
+ task.call
+ rescue
+ puts "ERROR #{$!}"
+ puts $!.backtrace
end
- rescue
- puts "ERROR #{$!}"
- puts $!.backtrace
end
+ ensure
+ @stop_thread.delete(Thread.current)
end
end
end
diff --git a/lib/chef/chef_fs/parallelizer/flatten_enumerable.rb b/lib/chef/chef_fs/parallelizer/flatten_enumerable.rb
new file mode 100644
index 0000000000..7321aa0664
--- /dev/null
+++ b/lib/chef/chef_fs/parallelizer/flatten_enumerable.rb
@@ -0,0 +1,35 @@
+class Chef
+ module ChefFS
+ class Parallelizer
+ class FlattenEnumerable
+ include Enumerable
+
+ def initialize(enum, levels = nil)
+ @enum = enum
+ @levels = levels
+ end
+
+ attr_reader :enum
+ attr_reader :levels
+
+ def each(&block)
+ enum.each do |value|
+ flatten(value, levels, &block)
+ end
+ end
+
+ private
+
+ def flatten(value, levels, &block)
+ if levels != 0 && value.respond_to?(:each) && !value.is_a?(String)
+ value.each do |child|
+ flatten(child, levels.nil? ? levels : levels-1, &block)
+ end
+ else
+ block.call(value)
+ end
+ end
+ end
+ end
+ end
+end
diff --git a/lib/chef/chef_fs/parallelizer/parallel_enumerable.rb b/lib/chef/chef_fs/parallelizer/parallel_enumerable.rb
new file mode 100644
index 0000000000..8e50f361db
--- /dev/null
+++ b/lib/chef/chef_fs/parallelizer/parallel_enumerable.rb
@@ -0,0 +1,279 @@
+require 'chef/chef_fs/parallelizer/flatten_enumerable'
+
+class Chef
+ module ChefFS
+ class Parallelizer
+ class ParallelEnumerable
+ include Enumerable
+
+ # options:
+ # :ordered [true|false] - whether the output should stay in the same order
+ # as the input (even though it may not actually be processed in that
+ # order). Default: true
+ # :stop_on_exception [true|false] - if true, when an exception occurs in either
+ # input or output, we wait for any outstanding processing to complete,
+ # but will not process any new inputs. Default: false
+ # :main_thread_processing [true|false] - whether the main thread pulling
+ # on each() is allowed to process inputs. Default: true
+ # NOTE: If you set this to false, parallelizer.kill will stop each()
+ # in its tracks, so you need to know for sure that won't happen.
+ def initialize(parent_task_queue, input_enumerable, options = {}, &block)
+ @parent_task_queue = parent_task_queue
+ @input_enumerable = input_enumerable
+ @options = options
+ @block = block
+
+ @unconsumed_input = Queue.new
+ @in_process = {}
+ @unconsumed_output = Queue.new
+ end
+
+ attr_reader :parent_task_queue
+ attr_reader :input_enumerable
+ attr_reader :options
+ attr_reader :block
+
+ def each
+ each_with_input do |output, index, input, type|
+ yield output
+ end
+ end
+
+ def each_with_index
+ each_with_input do |output, index, input|
+ yield output, index
+ end
+ end
+
+ def each_with_input
+ exception = nil
+ each_with_exceptions do |output, index, input, type|
+ if type == :exception
+ if @options[:ordered] == false
+ exception ||= output
+ else
+ raise output
+ end
+ else
+ yield output, index, input
+ end
+ end
+ raise exception if exception
+ end
+
+ def each_with_exceptions(&block)
+ if @options[:ordered] == false
+ each_with_exceptions_unordered(&block)
+ else
+ each_with_exceptions_ordered(&block)
+ end
+ end
+
+ def wait
+ exception = nil
+ each_with_exceptions_unordered do |output, index, input, type|
+ exception ||= output if type == :exception
+ end
+ raise exception if exception
+ end
+
+ # Enumerable methods
+ def restricted_copy(enumerable)
+ ParallelEnumerable.new(@parent_task_queue, enumerable, @options, &@block)
+ end
+
+ alias :original_count :count
+
+ def count(*args, &block)
+ if args.size == 0 && block.nil?
+ @input_enumerable.count
+ else
+ original_count(*args, &block)
+ end
+ end
+
+ def first(n=nil)
+ if n
+ restricted_copy(@input_enumerable.first(n)).to_a
+ else
+ first(1)[0]
+ end
+ end
+
+ def drop(n)
+ restricted_copy(@input_enumerable.drop(n)).to_a
+ end
+
+ def flatten(levels = nil)
+ FlattenEnumerable.new(self, levels)
+ end
+
+ def take(n)
+ restricted_copy(@input_enumerable.take(n)).to_a
+ end
+
+ if Enumerable.method_defined?(:lazy)
+ class RestrictedLazy
+ def initialize(parallel_enumerable, actual_lazy)
+ @parallel_enumerable = parallel_enumerable
+ @actual_lazy = actual_lazy
+ end
+
+ def drop(*args, &block)
+ input = @parallel_enumerable.input_enumerable.lazy.drop(*args, &block)
+ @parallel_enumerable.restricted_copy(input)
+ end
+
+ def take(*args, &block)
+ input = @parallel_enumerable.input_enumerable.lazy.take(*args, &block)
+ @parallel_enumerable.restricted_copy(input)
+ end
+
+ def method_missing(method, *args, &block)
+ @actual_lazy.send(:method, *args, &block)
+ end
+ end
+
+ alias :original_lazy :lazy
+
+ def lazy
+ RestrictedLazy.new(self, original_lazy)
+ end
+ end
+
+ private
+
+ def each_with_exceptions_unordered
+ if @each_running
+ raise "each() called on parallel enumerable twice simultaneously! Bad mojo"
+ end
+ @each_running = true
+ begin
+ # Grab all the inputs, yielding any responses during enumeration
+ # in case the enumeration itself takes time
+ begin
+ @input_enumerable.each_with_index do |input, index|
+ @unconsumed_input.push([ input, index ])
+ @parent_task_queue.push(method(:process_one))
+
+ stop_processing_input = false
+ while !@unconsumed_output.empty?
+ output, index, input, type = @unconsumed_output.pop
+ yield output, index, input, type
+ if type == :exception && @options[:stop_on_exception]
+ stop_processing_input = true
+ break
+ end
+ end
+
+ if stop_processing_input
+ break
+ end
+ end
+ rescue
+ # We still want to wait for the rest of the outputs to process
+ @unconsumed_output.push([$!, nil, nil, :exception])
+ if @options[:stop_on_exception]
+ @unconsumed_input.clear
+ end
+ end
+
+ while !finished?
+ # yield thread to others (for 1.8.7)
+ if @unconsumed_output.empty?
+ sleep(0.01)
+ end
+
+ while !@unconsumed_output.empty?
+ yield @unconsumed_output.pop
+ end
+
+ # If no one is working on our tasks and we're allowed to
+ # work on them in the main thread, process an input to
+ # move things forward.
+ if @in_process.size == 0 && !(@options[:main_thread_processing] == false)
+ process_one
+ end
+ end
+ ensure
+ # If someone called "first" or something that exits the enumerator
+ # early, we want to make sure and throw away any extra results
+ # (gracefully) so that the next enumerator can start over.
+ if !finished?
+ stop
+ end
+ @each_running = false
+ end
+ end
+
+ def each_with_exceptions_ordered
+ next_to_yield = 0
+ unconsumed = {}
+ each_with_exceptions_unordered do |output, index, input, type|
+ unconsumed[index] = [ output, input, type ]
+ while unconsumed[next_to_yield]
+ input_output = unconsumed.delete(next_to_yield)
+ yield input_output[0], next_to_yield, input_output[1], input_output[2]
+ next_to_yield += 1
+ end
+ end
+ input_exception = unconsumed.delete(nil)
+ if input_exception
+ yield input_exception[0], next_to_yield, input_exception[1], input_exception[2]
+ end
+ end
+
+ def stop
+ @unconsumed_input.clear
+ while @in_process.size > 0
+ sleep(0.05)
+ end
+ @unconsumed_output.clear
+ end
+
+ #
+ # This is thread safe only if called from the main thread pulling on each().
+ # The order of these checks is important, as well, to be thread safe.
+ # 1. If @unconsumed_input.empty? is true, then we will never have any more
+ # work legitimately picked up.
+ # 2. If @in_process == 0, then there is no work in process, and since @unconsumed_input is empty (see #1), it will never go back up, because
+ # this is called after the input enumerator is finished. Note that switching #2 and #1
+ # could cause a race, because in_process is incremented *before* consuming input.
+ # 3. If @unconsumed_output.empty? is true, then we are done with outputs.
+ # Thus, 1+2 means no more output will ever show up, and 3 means we've passed all
+ # existing outputs to the user.
+ #
+ def finished?
+ @unconsumed_input.empty? && @in_process.size == 0 && @unconsumed_output.empty?
+ end
+
+ def process_one
+ @in_process[Thread.current] = true
+ begin
+ begin
+ input, index = @unconsumed_input.pop(true)
+ process_input(input, index)
+ rescue ThreadError
+ end
+ ensure
+ @in_process.delete(Thread.current)
+ end
+ end
+
+ def process_input(input, index)
+ begin
+ output = @block.call(input)
+ @unconsumed_output.push([ output, index, input, :result ])
+ rescue
+ if @options[:stop_on_exception]
+ @unconsumed_input.clear
+ end
+ @unconsumed_output.push([ $!, index, input, :exception ])
+ end
+
+ index
+ end
+ end
+ end
+ end
+end
diff --git a/lib/chef/knife/list.rb b/lib/chef/knife/list.rb
index 4338e195bd..137d61f3a5 100644
--- a/lib/chef/knife/list.rb
+++ b/lib/chef/knife/list.rb
@@ -43,21 +43,23 @@ class Chef
def run
patterns = name_args.length == 0 ? [""] : name_args
- # Get the matches (recursively)
- all_results = parallelize(pattern_args_from(patterns), :flatten => true) do |pattern|
- pattern_results = Chef::ChefFS::FileSystem.list(config[:local] ? local_fs : chef_fs, pattern)
+ # Get the top-level matches
+ args = pattern_args_from(patterns)
+ all_results = parallelize(pattern_args_from(patterns)) do |pattern|
+ pattern_results = Chef::ChefFS::FileSystem.list(config[:local] ? local_fs : chef_fs, pattern).to_a
if pattern_results.first && !pattern_results.first.exists? && pattern.exact_path
ui.error "#{format_path(pattern_results.first)}: No such file or directory"
self.exit_code = 1
end
pattern_results
- end
+ end.flatten(1).to_a
# Process directories
if !config[:bare_directories]
- dir_results = parallelize(all_results.select { |result| result.dir? }, :flatten => true) do |result|
+ dir_results = parallelize(all_results.select { |result| result.dir? }) do |result|
add_dir_result(result)
- end.to_a
+ end.flatten(1)
+
else
dir_results = []
end
@@ -109,7 +111,7 @@ class Chef
result = [ [ result, children ] ]
if config[:recursive]
child_dirs = children.select { |child| child.dir? }
- result += parallelize(child_dirs, :flatten => true) { |child| add_dir_result(child) }.to_a
+ result += parallelize(child_dirs) { |child| add_dir_result(child) }.flatten(1).to_a
end
result
end
@@ -152,4 +154,3 @@ class Chef
end
end
end
-
diff --git a/lib/chef/knife/show.rb b/lib/chef/knife/show.rb
index acf1996e96..4684a6ac7e 100644
--- a/lib/chef/knife/show.rb
+++ b/lib/chef/knife/show.rb
@@ -20,7 +20,7 @@ class Chef
def run
# Get the matches (recursively)
error = false
- entry_values = parallelize(pattern_args, :flatten => true) do |pattern|
+ entry_values = parallelize(pattern_args) do |pattern|
parallelize(Chef::ChefFS::FileSystem.list(config[:local] ? local_fs : chef_fs, pattern)) do |entry|
if entry.dir?
ui.error "#{format_path(entry)}: is a directory" if pattern.exact_path
@@ -40,7 +40,7 @@ class Chef
end
end
end
- end
+ end.flatten(1)
entry_values.each do |entry, value|
if entry
output "#{format_path(entry)}:"
@@ -54,4 +54,3 @@ class Chef
end
end
end
-