diff options
Diffstat (limited to 'lib/chef/chef_fs/parallelizer.rb')
-rw-r--r-- | lib/chef/chef_fs/parallelizer.rb | 156 |
1 files changed, 66 insertions, 90 deletions
diff --git a/lib/chef/chef_fs/parallelizer.rb b/lib/chef/chef_fs/parallelizer.rb index 84f3d4d870..116a626869 100644 --- a/lib/chef/chef_fs/parallelizer.rb +++ b/lib/chef/chef_fs/parallelizer.rb @@ -1,127 +1,103 @@ +require 'thread' +require 'chef/chef_fs/parallelizer/parallel_enumerable' + class Chef module ChefFS + # Tries to balance several guarantees, in order of priority: + # - don't get deadlocked + # - provide results in desired order + # - provide results as soon as they are available + # - process input as soon as possible class Parallelizer @@parallelizer = nil @@threads = 0 def self.threads=(value) - if @@threads != value - @@threads = value - @@parallelizer = nil - end + @@threads = value + @@parallelizer.resize(value) if @@parallelizer end - def self.parallelize(enumerator, options = {}, &block) + def self.parallelizer @@parallelizer ||= Parallelizer.new(@@threads) - @@parallelizer.parallelize(enumerator, options, &block) end - def initialize(threads) - @tasks_mutex = Mutex.new - @tasks = [] - @threads = [] - 1.upto(threads) do - @threads << Thread.new { worker_loop } - end + def self.parallelize(enumerable, options = {}, &block) + parallelizer.parallelize(enumerable, options, &block) end - def parallelize(enumerator, options = {}, &block) - task = ParallelizedResults.new(enumerator, options, &block) - @tasks_mutex.synchronize do - @tasks << task - end - task + def self.parallel_do(enumerable, options = {}, &block) + parallelizer.parallel_do(enumerable, options, &block) end - class ParallelizedResults - include Enumerable + def initialize(num_threads) + @tasks = Queue.new + @threads = [] + @stop_thread = {} + resize(num_threads) + end - def initialize(enumerator, options, &block) - @inputs = enumerator.to_a - @options = options - @block = block + def num_threads + @threads.size + end - @mutex = Mutex.new - @outputs = [] - @status = [] - end + def parallelize(enumerable, options = {}, &block) + ParallelEnumerable.new(@tasks, enumerable, options, &block) + end - def each - next_index = 0 - while true - # Report any results that already exist - while @status.length > next_index && ([:finished, :exception].include?(@status[next_index])) - if @status[next_index] == :finished - if @options[:flatten] - @outputs[next_index].each do |entry| - yield entry - end - else - yield @outputs[next_index] - end - else - raise @outputs[next_index] - end - next_index = next_index + 1 - end + def parallel_do(enumerable, options = {}, &block) + ParallelEnumerable.new(@tasks, enumerable, options.merge(:ordered => false), &block).wait + end - # Pick up a result and process it, if there is one. This ensures we - # move forward even if there are *zero* worker threads available. - if !process_input - # Exit if we're done. - if next_index >= @status.length - break - else - # Ruby 1.8 threading sucks. Wait till we process more things. - sleep(0.05) - end - end + def stop(wait = true, timeout = nil) + resize(0, wait, timeout) + end + + def resize(to_threads, wait = true, timeout = nil) + if to_threads < num_threads + threads_to_stop = @threads[to_threads..num_threads-1] + @threads = @threads.slice(0, to_threads) + threads_to_stop.each do |thread| + @stop_thread[thread] = true end - end - def process_input - # Grab the next one to process - index, input = @mutex.synchronize do - index = @status.length - if index >= @inputs.length - return nil + if wait + start_time = Time.now + threads_to_stop.each do |thread| + thread_timeout = timeout ? timeout - (Time.now - start_time) : nil + thread.join(thread_timeout) end - input = @inputs[index] - @status[index] = :started - [ index, input ] end - begin - @outputs[index] = @block.call(input) - @status[index] = :finished - rescue Exception - @outputs[index] = $! - @status[index] = :exception + else + num_threads.upto(to_threads - 1) do |i| + @threads[i] = Thread.new(&method(:worker_loop)) end - index end end + def kill + @threads.each do |thread| + Thread.kill(thread) + @stop_thread.delete(thread) + end + @threads = [] + end + private def worker_loop - while true - begin - task = @tasks[0] - if task - if !task.process_input - @tasks_mutex.synchronize do - @tasks.delete(task) - end - end - else - # Ruby 1.8 threading sucks. Wait a bit to see if another task comes in. - sleep(0.05) + begin + while !@stop_thread[Thread.current] + begin + task = @tasks.pop + task.call + rescue + puts "ERROR #{$!}" + puts $!.backtrace end - rescue - puts "ERROR #{$!}" - puts $!.backtrace end + ensure + @stop_thread.delete(Thread.current) end end end |