diff options
author | John Keiser <jkeiser@opscode.com> | 2013-05-03 10:23:37 -0700 |
---|---|---|
committer | John Keiser <jkeiser@opscode.com> | 2013-06-07 13:12:32 -0700 |
commit | cf00c1587eadd3d0df5d73e132ce6084c0d35a71 (patch) | |
tree | b5fc9bc513673415ec0a48a9e6c05f999368e5ae | |
parent | 4b33978d9aa40ea66db16b2f985f51f19bdacf8d (diff) | |
download | chef-cf00c1587eadd3d0df5d73e132ce6084c0d35a71.tar.gz |
Parallelize knife list and knife show
-rw-r--r-- | lib/chef/chef_fs/file_system.rb | 8 | ||||
-rw-r--r-- | lib/chef/chef_fs/knife.rb | 3 | ||||
-rw-r--r-- | lib/chef/chef_fs/parallelizer.rb | 110 | ||||
-rw-r--r-- | lib/chef/knife/list.rb | 40 | ||||
-rw-r--r-- | lib/chef/knife/show.rb | 21 |
5 files changed, 155 insertions, 27 deletions
diff --git a/lib/chef/chef_fs/file_system.rb b/lib/chef/chef_fs/file_system.rb index 3c7fd3c155..523fe8b38e 100644 --- a/lib/chef/chef_fs/file_system.rb +++ b/lib/chef/chef_fs/file_system.rb @@ -20,6 +20,7 @@ require 'chef/chef_fs/path_utils' require 'chef/chef_fs/file_system/default_environment_cannot_be_modified_error' require 'chef/chef_fs/file_system/operation_failed_error' require 'chef/chef_fs/file_system/operation_not_allowed_error' +require 'chef/chef_fs/parallelizer' class Chef module ChefFS @@ -47,8 +48,8 @@ class Chef attr_reader :root attr_reader :pattern - def each - list_from(root) { |entry| yield entry } + def each(&block) + list_from(root, &block) end def list_from(entry, &block) @@ -71,7 +72,8 @@ class Chef # Otherwise, go through all children and find any matches elsif entry.dir? - entry.children.each { |child| list_from(child, &block) } + results = Parallelizer::parallelize(entry.children, :flatten => true) { |child| Chef::ChefFS::FileSystem.list(child, pattern) } + results.each(&block) end end end diff --git a/lib/chef/chef_fs/knife.rb b/lib/chef/chef_fs/knife.rb index 5aec4ba7c0..270b6768f5 100644 --- a/lib/chef/chef_fs/knife.rb +++ b/lib/chef/chef_fs/knife.rb @@ -205,6 +205,9 @@ class Chef end end + def parallelize(inputs, options = {}, &block) + Chef::ChefFS::Parallelizer.parallelize(inputs, options, &block) + end end end end diff --git a/lib/chef/chef_fs/parallelizer.rb b/lib/chef/chef_fs/parallelizer.rb new file mode 100644 index 0000000000..25b03a153c --- /dev/null +++ b/lib/chef/chef_fs/parallelizer.rb @@ -0,0 +1,110 @@ +class Chef + module ChefFS + class Parallelizer + def self.parallelize(enumerator, options = {}, &block) + @@parallelizer ||= Parallelizer.new(10) + @@parallelizer.parallelize(enumerator, options, &block) + end + + def initialize(threads) + @tasks_mutex = Mutex.new + @tasks = [] + @threads = [] + 1.upto(threads) do + @threads << Thread.new { worker_loop } + end + end + + def parallelize(enumerator, options = {}, &block) + task = ParallelizedResults.new(enumerator, options, &block) + @tasks_mutex.synchronize do + @tasks << task + end + task + end + + class ParallelizedResults + include Enumerable + + def initialize(enumerator, options, &block) + @inputs = enumerator.to_a + @options = options + @block = block + + @mutex = Mutex.new + @outputs = [] + @status = [] + end + + def each + next_index = 0 + while true + # Report any results that already exist + while @status.length > next_index && @status[next_index] == :finished + if @options[:flatten] + @outputs[next_index].each do |entry| + yield entry + end + else + yield @outputs[next_index] + end + next_index = next_index + 1 + end + + # Pick up a result and process it, if there is one. This ensures we + # move forward even if there are *zero* worker threads available. + if !process_input + # Exit if we're done. + if next_index >= @status.length + break + else + # Ruby 1.8 threading sucks. Wait till we process more things. + sleep(0.05) + end + end + end + end + + def process_input + # Grab the next one to process + index, input = @mutex.synchronize do + index = @status.length + if index >= @inputs.length + return nil + end + input = @inputs[index] + @status[index] = :started + [ index, input ] + end + + @outputs[index] = @block.call(input, @options) + @status[index] = :finished + index + end + end + + private + + def worker_loop + while true + begin + task = @tasks[0] + if task + if !task.process_input + @tasks_mutex.synchronize do + @tasks.delete(task) + end + end + else + # Ruby 1.8 threading sucks. Wait a bit to see if another task comes in. + sleep(0.05) + end + rescue + puts "ERROR #{$!}" + puts $!.backtrace + end + end + end + end + end +end diff --git a/lib/chef/knife/list.rb b/lib/chef/knife/list.rb index dca30aca46..296a5392ad 100644 --- a/lib/chef/knife/list.rb +++ b/lib/chef/knife/list.rb @@ -41,21 +41,28 @@ class Chef patterns = name_args.length == 0 ? [""] : name_args # Get the matches (recursively) - results = [] - dir_results = [] - pattern_args_from(patterns).each do |pattern| - Chef::ChefFS::FileSystem.list(config[:local] ? local_fs : chef_fs, pattern).each do |result| - if result.dir? && !config[:bare_directories] - dir_results += add_dir_result(result) - elsif result.exists? - results << result - elsif pattern.exact_path - ui.error "#{format_path(result)}: No such file or directory" - self.exit_code = 1 - end + all_results = parallelize(pattern_args_from(patterns), :flatten => true) do |pattern| + pattern_results = Chef::ChefFS::FileSystem.list(config[:local] ? local_fs : chef_fs, pattern) + if pattern_results.first && !pattern_results.first.exists? && pattern.exact_path + ui.error "#{format_path(pattern_results.first)}: No such file or directory" + self.exit_code = 1 end + pattern_results end + # Process directories + if !config[:bare_directories] + dir_results = parallelize(all_results.select { |result| result.dir? }, :flatten => true) do |result| + add_dir_result(result) + end.to_a + else + dir_results = [] + end + + # Process all other results + results = all_results.select { |result| result.exists? && (!result.dir? || config[:bare_directories]) }.to_a + + # Flatten out directory results if necessary if config[:flat] dir_results.each do |result, children| results += children @@ -63,9 +70,11 @@ class Chef dir_results = [] end + # Sort by path for happy output results = results.sort_by { |result| result.path } dir_results = dir_results.sort_by { |result| result[0].path } + # Print! if results.length == 0 && dir_results.length == 1 results = dir_results[0][1] dir_results = [] @@ -96,11 +105,8 @@ class Chef result = [ [ result, children ] ] if config[:recursive] - children.each do |child| - if child.dir? - result += add_dir_result(child) - end - end + child_dirs = children.select { |child| child.dir? } + result += parallelize(child_dirs, :flatten => true) { |child| add_dir_result(child) }.to_a end result end diff --git a/lib/chef/knife/show.rb b/lib/chef/knife/show.rb index f838ed466a..518d8c9ca5 100644 --- a/lib/chef/knife/show.rb +++ b/lib/chef/knife/show.rb @@ -17,26 +17,33 @@ class Chef def run # Get the matches (recursively) error = false - pattern_args.each do |pattern| - Chef::ChefFS::FileSystem.list(config[:local] ? local_fs : chef_fs, pattern).each do |result| - if result.dir? - ui.error "#{format_path(result)}: is a directory" if pattern.exact_path + entry_values = parallelize(pattern_args, :flatten => true) do |pattern| + parallelize(Chef::ChefFS::FileSystem.list(config[:local] ? local_fs : chef_fs, pattern)) do |entry| + if entry.dir? + ui.error "#{format_path(entry)}: is a directory" if pattern.exact_path error = true + nil else begin - value = result.read - output "#{format_path(result)}:" - output(format_for_display(value)) + [entry, entry.read] rescue Chef::ChefFS::FileSystem::OperationNotAllowedError => e ui.error "#{format_path(e.entry)}: #{e.reason}." error = true + nil rescue Chef::ChefFS::FileSystem::NotFoundError => e ui.error "#{format_path(e.entry)}: No such file or directory" error = true + nil end end end end + entry_values.each do |entry, value| + if entry + output "#{format_path(entry)}:" + output(format_for_display(value)) + end + end if error exit 1 end |