diff options
author | John Keiser <jkeiser@opscode.com> | 2013-05-03 10:23:37 -0700 |
---|---|---|
committer | John Keiser <jkeiser@opscode.com> | 2013-06-07 13:12:32 -0700 |
commit | cf00c1587eadd3d0df5d73e132ce6084c0d35a71 (patch) | |
tree | b5fc9bc513673415ec0a48a9e6c05f999368e5ae /lib/chef/chef_fs/parallelizer.rb | |
parent | 4b33978d9aa40ea66db16b2f985f51f19bdacf8d (diff) | |
download | chef-cf00c1587eadd3d0df5d73e132ce6084c0d35a71.tar.gz |
Parallelize knife list and knife show
Diffstat (limited to 'lib/chef/chef_fs/parallelizer.rb')
-rw-r--r-- | lib/chef/chef_fs/parallelizer.rb | 110 |
1 files changed, 110 insertions, 0 deletions
diff --git a/lib/chef/chef_fs/parallelizer.rb b/lib/chef/chef_fs/parallelizer.rb new file mode 100644 index 0000000000..25b03a153c --- /dev/null +++ b/lib/chef/chef_fs/parallelizer.rb @@ -0,0 +1,110 @@ +class Chef + module ChefFS + class Parallelizer + def self.parallelize(enumerator, options = {}, &block) + @@parallelizer ||= Parallelizer.new(10) + @@parallelizer.parallelize(enumerator, options, &block) + end + + def initialize(threads) + @tasks_mutex = Mutex.new + @tasks = [] + @threads = [] + 1.upto(threads) do + @threads << Thread.new { worker_loop } + end + end + + def parallelize(enumerator, options = {}, &block) + task = ParallelizedResults.new(enumerator, options, &block) + @tasks_mutex.synchronize do + @tasks << task + end + task + end + + class ParallelizedResults + include Enumerable + + def initialize(enumerator, options, &block) + @inputs = enumerator.to_a + @options = options + @block = block + + @mutex = Mutex.new + @outputs = [] + @status = [] + end + + def each + next_index = 0 + while true + # Report any results that already exist + while @status.length > next_index && @status[next_index] == :finished + if @options[:flatten] + @outputs[next_index].each do |entry| + yield entry + end + else + yield @outputs[next_index] + end + next_index = next_index + 1 + end + + # Pick up a result and process it, if there is one. This ensures we + # move forward even if there are *zero* worker threads available. + if !process_input + # Exit if we're done. + if next_index >= @status.length + break + else + # Ruby 1.8 threading sucks. Wait till we process more things. + sleep(0.05) + end + end + end + end + + def process_input + # Grab the next one to process + index, input = @mutex.synchronize do + index = @status.length + if index >= @inputs.length + return nil + end + input = @inputs[index] + @status[index] = :started + [ index, input ] + end + + @outputs[index] = @block.call(input, @options) + @status[index] = :finished + index + end + end + + private + + def worker_loop + while true + begin + task = @tasks[0] + if task + if !task.process_input + @tasks_mutex.synchronize do + @tasks.delete(task) + end + end + else + # Ruby 1.8 threading sucks. Wait a bit to see if another task comes in. + sleep(0.05) + end + rescue + puts "ERROR #{$!}" + puts $!.backtrace + end + end + end + end + end +end |