summaryrefslogtreecommitdiff
path: root/lib/chef/chef_fs/parallelizer.rb
diff options
context:
space:
mode:
authorJohn Keiser <jkeiser@opscode.com>2013-05-03 10:23:37 -0700
committerJohn Keiser <jkeiser@opscode.com>2013-06-07 13:12:32 -0700
commitcf00c1587eadd3d0df5d73e132ce6084c0d35a71 (patch)
treeb5fc9bc513673415ec0a48a9e6c05f999368e5ae /lib/chef/chef_fs/parallelizer.rb
parent4b33978d9aa40ea66db16b2f985f51f19bdacf8d (diff)
downloadchef-cf00c1587eadd3d0df5d73e132ce6084c0d35a71.tar.gz
Parallelize knife list and knife show
Diffstat (limited to 'lib/chef/chef_fs/parallelizer.rb')
-rw-r--r--lib/chef/chef_fs/parallelizer.rb110
1 files changed, 110 insertions, 0 deletions
diff --git a/lib/chef/chef_fs/parallelizer.rb b/lib/chef/chef_fs/parallelizer.rb
new file mode 100644
index 0000000000..25b03a153c
--- /dev/null
+++ b/lib/chef/chef_fs/parallelizer.rb
@@ -0,0 +1,110 @@
+class Chef
+ module ChefFS
+ class Parallelizer
+ def self.parallelize(enumerator, options = {}, &block)
+ @@parallelizer ||= Parallelizer.new(10)
+ @@parallelizer.parallelize(enumerator, options, &block)
+ end
+
+ def initialize(threads)
+ @tasks_mutex = Mutex.new
+ @tasks = []
+ @threads = []
+ 1.upto(threads) do
+ @threads << Thread.new { worker_loop }
+ end
+ end
+
+ def parallelize(enumerator, options = {}, &block)
+ task = ParallelizedResults.new(enumerator, options, &block)
+ @tasks_mutex.synchronize do
+ @tasks << task
+ end
+ task
+ end
+
+ class ParallelizedResults
+ include Enumerable
+
+ def initialize(enumerator, options, &block)
+ @inputs = enumerator.to_a
+ @options = options
+ @block = block
+
+ @mutex = Mutex.new
+ @outputs = []
+ @status = []
+ end
+
+ def each
+ next_index = 0
+ while true
+ # Report any results that already exist
+ while @status.length > next_index && @status[next_index] == :finished
+ if @options[:flatten]
+ @outputs[next_index].each do |entry|
+ yield entry
+ end
+ else
+ yield @outputs[next_index]
+ end
+ next_index = next_index + 1
+ end
+
+ # Pick up a result and process it, if there is one. This ensures we
+ # move forward even if there are *zero* worker threads available.
+ if !process_input
+ # Exit if we're done.
+ if next_index >= @status.length
+ break
+ else
+ # Ruby 1.8 threading sucks. Wait till we process more things.
+ sleep(0.05)
+ end
+ end
+ end
+ end
+
+ def process_input
+ # Grab the next one to process
+ index, input = @mutex.synchronize do
+ index = @status.length
+ if index >= @inputs.length
+ return nil
+ end
+ input = @inputs[index]
+ @status[index] = :started
+ [ index, input ]
+ end
+
+ @outputs[index] = @block.call(input, @options)
+ @status[index] = :finished
+ index
+ end
+ end
+
+ private
+
+ def worker_loop
+ while true
+ begin
+ task = @tasks[0]
+ if task
+ if !task.process_input
+ @tasks_mutex.synchronize do
+ @tasks.delete(task)
+ end
+ end
+ else
+ # Ruby 1.8 threading sucks. Wait a bit to see if another task comes in.
+ sleep(0.05)
+ end
+ rescue
+ puts "ERROR #{$!}"
+ puts $!.backtrace
+ end
+ end
+ end
+ end
+ end
+end