summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Keiser <jkeiser@opscode.com>2013-05-03 10:23:37 -0700
committerJohn Keiser <jkeiser@opscode.com>2013-06-07 13:12:32 -0700
commitcf00c1587eadd3d0df5d73e132ce6084c0d35a71 (patch)
treeb5fc9bc513673415ec0a48a9e6c05f999368e5ae
parent4b33978d9aa40ea66db16b2f985f51f19bdacf8d (diff)
downloadchef-cf00c1587eadd3d0df5d73e132ce6084c0d35a71.tar.gz
Parallelize knife list and knife show
-rw-r--r--lib/chef/chef_fs/file_system.rb8
-rw-r--r--lib/chef/chef_fs/knife.rb3
-rw-r--r--lib/chef/chef_fs/parallelizer.rb110
-rw-r--r--lib/chef/knife/list.rb40
-rw-r--r--lib/chef/knife/show.rb21
5 files changed, 155 insertions, 27 deletions
diff --git a/lib/chef/chef_fs/file_system.rb b/lib/chef/chef_fs/file_system.rb
index 3c7fd3c155..523fe8b38e 100644
--- a/lib/chef/chef_fs/file_system.rb
+++ b/lib/chef/chef_fs/file_system.rb
@@ -20,6 +20,7 @@ require 'chef/chef_fs/path_utils'
require 'chef/chef_fs/file_system/default_environment_cannot_be_modified_error'
require 'chef/chef_fs/file_system/operation_failed_error'
require 'chef/chef_fs/file_system/operation_not_allowed_error'
+require 'chef/chef_fs/parallelizer'
class Chef
module ChefFS
@@ -47,8 +48,8 @@ class Chef
attr_reader :root
attr_reader :pattern
- def each
- list_from(root) { |entry| yield entry }
+ def each(&block)
+ list_from(root, &block)
end
def list_from(entry, &block)
@@ -71,7 +72,8 @@ class Chef
# Otherwise, go through all children and find any matches
elsif entry.dir?
- entry.children.each { |child| list_from(child, &block) }
+ results = Parallelizer::parallelize(entry.children, :flatten => true) { |child| Chef::ChefFS::FileSystem.list(child, pattern) }
+ results.each(&block)
end
end
end
diff --git a/lib/chef/chef_fs/knife.rb b/lib/chef/chef_fs/knife.rb
index 5aec4ba7c0..270b6768f5 100644
--- a/lib/chef/chef_fs/knife.rb
+++ b/lib/chef/chef_fs/knife.rb
@@ -205,6 +205,9 @@ class Chef
end
end
+ def parallelize(inputs, options = {}, &block)
+ Chef::ChefFS::Parallelizer.parallelize(inputs, options, &block)
+ end
end
end
end
diff --git a/lib/chef/chef_fs/parallelizer.rb b/lib/chef/chef_fs/parallelizer.rb
new file mode 100644
index 0000000000..25b03a153c
--- /dev/null
+++ b/lib/chef/chef_fs/parallelizer.rb
@@ -0,0 +1,110 @@
+class Chef
+ module ChefFS
+ class Parallelizer
+ def self.parallelize(enumerator, options = {}, &block)
+ @@parallelizer ||= Parallelizer.new(10)
+ @@parallelizer.parallelize(enumerator, options, &block)
+ end
+
+ def initialize(threads)
+ @tasks_mutex = Mutex.new
+ @tasks = []
+ @threads = []
+ 1.upto(threads) do
+ @threads << Thread.new { worker_loop }
+ end
+ end
+
+ def parallelize(enumerator, options = {}, &block)
+ task = ParallelizedResults.new(enumerator, options, &block)
+ @tasks_mutex.synchronize do
+ @tasks << task
+ end
+ task
+ end
+
+ class ParallelizedResults
+ include Enumerable
+
+ def initialize(enumerator, options, &block)
+ @inputs = enumerator.to_a
+ @options = options
+ @block = block
+
+ @mutex = Mutex.new
+ @outputs = []
+ @status = []
+ end
+
+ def each
+ next_index = 0
+ while true
+ # Report any results that already exist
+ while @status.length > next_index && @status[next_index] == :finished
+ if @options[:flatten]
+ @outputs[next_index].each do |entry|
+ yield entry
+ end
+ else
+ yield @outputs[next_index]
+ end
+ next_index = next_index + 1
+ end
+
+ # Pick up a result and process it, if there is one. This ensures we
+ # move forward even if there are *zero* worker threads available.
+ if !process_input
+ # Exit if we're done.
+ if next_index >= @status.length
+ break
+ else
+ # Ruby 1.8 threading sucks. Wait till we process more things.
+ sleep(0.05)
+ end
+ end
+ end
+ end
+
+ def process_input
+ # Grab the next one to process
+ index, input = @mutex.synchronize do
+ index = @status.length
+ if index >= @inputs.length
+ return nil
+ end
+ input = @inputs[index]
+ @status[index] = :started
+ [ index, input ]
+ end
+
+ @outputs[index] = @block.call(input, @options)
+ @status[index] = :finished
+ index
+ end
+ end
+
+ private
+
+ def worker_loop
+ while true
+ begin
+ task = @tasks[0]
+ if task
+ if !task.process_input
+ @tasks_mutex.synchronize do
+ @tasks.delete(task)
+ end
+ end
+ else
+ # Ruby 1.8 threading sucks. Wait a bit to see if another task comes in.
+ sleep(0.05)
+ end
+ rescue
+ puts "ERROR #{$!}"
+ puts $!.backtrace
+ end
+ end
+ end
+ end
+ end
+end
diff --git a/lib/chef/knife/list.rb b/lib/chef/knife/list.rb
index dca30aca46..296a5392ad 100644
--- a/lib/chef/knife/list.rb
+++ b/lib/chef/knife/list.rb
@@ -41,21 +41,28 @@ class Chef
patterns = name_args.length == 0 ? [""] : name_args
# Get the matches (recursively)
- results = []
- dir_results = []
- pattern_args_from(patterns).each do |pattern|
- Chef::ChefFS::FileSystem.list(config[:local] ? local_fs : chef_fs, pattern).each do |result|
- if result.dir? && !config[:bare_directories]
- dir_results += add_dir_result(result)
- elsif result.exists?
- results << result
- elsif pattern.exact_path
- ui.error "#{format_path(result)}: No such file or directory"
- self.exit_code = 1
- end
+ all_results = parallelize(pattern_args_from(patterns), :flatten => true) do |pattern|
+ pattern_results = Chef::ChefFS::FileSystem.list(config[:local] ? local_fs : chef_fs, pattern)
+ if pattern_results.first && !pattern_results.first.exists? && pattern.exact_path
+ ui.error "#{format_path(pattern_results.first)}: No such file or directory"
+ self.exit_code = 1
end
+ pattern_results
end
+ # Process directories
+ if !config[:bare_directories]
+ dir_results = parallelize(all_results.select { |result| result.dir? }, :flatten => true) do |result|
+ add_dir_result(result)
+ end.to_a
+ else
+ dir_results = []
+ end
+
+ # Process all other results
+ results = all_results.select { |result| result.exists? && (!result.dir? || config[:bare_directories]) }.to_a
+
+ # Flatten out directory results if necessary
if config[:flat]
dir_results.each do |result, children|
results += children
@@ -63,9 +70,11 @@ class Chef
dir_results = []
end
+ # Sort by path for happy output
results = results.sort_by { |result| result.path }
dir_results = dir_results.sort_by { |result| result[0].path }
+ # Print!
if results.length == 0 && dir_results.length == 1
results = dir_results[0][1]
dir_results = []
@@ -96,11 +105,8 @@ class Chef
result = [ [ result, children ] ]
if config[:recursive]
- children.each do |child|
- if child.dir?
- result += add_dir_result(child)
- end
- end
+ child_dirs = children.select { |child| child.dir? }
+ result += parallelize(child_dirs, :flatten => true) { |child| add_dir_result(child) }.to_a
end
result
end
diff --git a/lib/chef/knife/show.rb b/lib/chef/knife/show.rb
index f838ed466a..518d8c9ca5 100644
--- a/lib/chef/knife/show.rb
+++ b/lib/chef/knife/show.rb
@@ -17,26 +17,33 @@ class Chef
def run
# Get the matches (recursively)
error = false
- pattern_args.each do |pattern|
- Chef::ChefFS::FileSystem.list(config[:local] ? local_fs : chef_fs, pattern).each do |result|
- if result.dir?
- ui.error "#{format_path(result)}: is a directory" if pattern.exact_path
+ entry_values = parallelize(pattern_args, :flatten => true) do |pattern|
+ parallelize(Chef::ChefFS::FileSystem.list(config[:local] ? local_fs : chef_fs, pattern)) do |entry|
+ if entry.dir?
+ ui.error "#{format_path(entry)}: is a directory" if pattern.exact_path
error = true
+ nil
else
begin
- value = result.read
- output "#{format_path(result)}:"
- output(format_for_display(value))
+ [entry, entry.read]
rescue Chef::ChefFS::FileSystem::OperationNotAllowedError => e
ui.error "#{format_path(e.entry)}: #{e.reason}."
error = true
+ nil
rescue Chef::ChefFS::FileSystem::NotFoundError => e
ui.error "#{format_path(e.entry)}: No such file or directory"
error = true
+ nil
end
end
end
end
+ entry_values.each do |entry, value|
+ if entry
+ output "#{format_path(entry)}:"
+ output(format_for_display(value))
+ end
+ end
if error
exit 1
end