summary refs log tree commit diff
path: root/lib/chef/chef_fs/parallelizer.rb
diff options
context:
space:
mode:
Diffstat (limited to 'lib/chef/chef_fs/parallelizer.rb')
-rw-r--r-- lib/chef/chef_fs/parallelizer.rb 156
1 files changed, 66 insertions, 90 deletions
diff --git a/lib/chef/chef_fs/parallelizer.rb b/lib/chef/chef_fs/parallelizer.rb
index 84f3d4d870..116a626869 100644
--- a/lib/chef/chef_fs/parallelizer.rb
+++ b/lib/chef/chef_fs/parallelizer.rb
@@ -1,127 +1,103 @@
+require 'thread'
+require 'chef/chef_fs/parallelizer/parallel_enumerable'
+
class Chef
module ChefFS
+ # Tries to balance several guarantees, in order of priority:
+ # - don't get deadlocked
+ # - provide results in desired order
+ # - provide results as soon as they are available
+ # - process input as soon as possible
class Parallelizer
@@parallelizer = nil
@@threads = 0
def self.threads=(value)
- if @@threads != value
- @@threads = value
- @@parallelizer = nil
- end
+ @@threads = value
+ @@parallelizer.resize(value) if @@parallelizer
end
- def self.parallelize(enumerator, options = {}, &block)
+ def self.parallelizer
@@parallelizer ||= Parallelizer.new(@@threads)
- @@parallelizer.parallelize(enumerator, options, &block)
end
- def initialize(threads)
- @tasks_mutex = Mutex.new
- @tasks = []
- @threads = []
- 1.upto(threads) do
- @threads << Thread.new { worker_loop }
- end
+ def self.parallelize(enumerable, options = {}, &block)
+ parallelizer.parallelize(enumerable, options, &block)
end
- def parallelize(enumerator, options = {}, &block)
- task = ParallelizedResults.new(enumerator, options, &block)
- @tasks_mutex.synchronize do
- @tasks << task
- end
- task
+ def self.parallel_do(enumerable, options = {}, &block)
+ parallelizer.parallel_do(enumerable, options, &block)
end
- class ParallelizedResults
- include Enumerable
+ def initialize(num_threads)
+ @tasks = Queue.new
+ @threads = []
+ @stop_thread = {}
+ resize(num_threads)
+ end
- def initialize(enumerator, options, &block)
- @inputs = enumerator.to_a
- @options = options
- @block = block
+ def num_threads
+ @threads.size
+ end
- @mutex = Mutex.new
- @outputs = []
- @status = []
- end
+ def parallelize(enumerable, options = {}, &block)
+ ParallelEnumerable.new(@tasks, enumerable, options, &block)
+ end
- def each
- next_index = 0
- while true
- # Report any results that already exist
- while @status.length > next_index && ([:finished, :exception].include?(@status[next_index]))
- if @status[next_index] == :finished
- if @options[:flatten]
- @outputs[next_index].each do |entry|
- yield entry
- end
- else
- yield @outputs[next_index]
- end
- else
- raise @outputs[next_index]
- end
- next_index = next_index + 1
- end
+ def parallel_do(enumerable, options = {}, &block)
+ ParallelEnumerable.new(@tasks, enumerable, options.merge(:ordered => false), &block).wait
+ end
- # Pick up a result and process it, if there is one. This ensures we
- # move forward even if there are *zero* worker threads available.
- if !process_input
- # Exit if we're done.
- if next_index >= @status.length
- break
- else
- # Ruby 1.8 threading sucks. Wait till we process more things.
- sleep(0.05)
- end
- end
+ def stop(wait = true, timeout = nil)
+ resize(0, wait, timeout)
+ end
+
+ def resize(to_threads, wait = true, timeout = nil)
+ if to_threads < num_threads
+ threads_to_stop = @threads[to_threads..num_threads-1]
+ @threads = @threads.slice(0, to_threads)
+ threads_to_stop.each do |thread|
+ @stop_thread[thread] = true
end
- end
- def process_input
- # Grab the next one to process
- index, input = @mutex.synchronize do
- index = @status.length
- if index >= @inputs.length
- return nil
+ if wait
+ start_time = Time.now
+ threads_to_stop.each do |thread|
+ thread_timeout = timeout ? timeout - (Time.now - start_time) : nil
+ thread.join(thread_timeout)
end
- input = @inputs[index]
- @status[index] = :started
- [ index, input ]
end
- begin
- @outputs[index] = @block.call(input)
- @status[index] = :finished
- rescue Exception
- @outputs[index] = $!
- @status[index] = :exception
+ else
+ num_threads.upto(to_threads - 1) do |i|
+ @threads[i] = Thread.new(&method(:worker_loop))
end
- index
end
end
+ def kill
+ @threads.each do |thread|
+ Thread.kill(thread)
+ @stop_thread.delete(thread)
+ end
+ @threads = []
+ end
+
private
def worker_loop
- while true
- begin
- task = @tasks[0]
- if task
- if !task.process_input
- @tasks_mutex.synchronize do
- @tasks.delete(task)
- end
- end
- else
- # Ruby 1.8 threading sucks. Wait a bit to see if another task comes in.
- sleep(0.05)
+ begin
+ while !@stop_thread[Thread.current]
+ begin
+ task = @tasks.pop
+ task.call
+ rescue
+ puts "ERROR #{$!}"
+ puts $!.backtrace
end
- rescue
- puts "ERROR #{$!}"
- puts $!.backtrace
end
+ ensure
+ @stop_thread.delete(Thread.current)
end
end
end