summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Keiser <jkeiser@opscode.com>2014-05-16 20:25:08 -0700
committerJohn Keiser <jkeiser@opscode.com>2014-05-17 20:03:32 -0700
commit09f81fcbee13c7939bac1f8db4e6cac5a85ce0d7 (patch)
tree5fed37416b46314b7280e0229fbb6c5e6ca04cec
parent62103f41818f897330b1ed424334d1eb88bcef41 (diff)
downloadchef-09f81fcbee13c7939bac1f8db4e6cac5a85ce0d7.tar.gz
Add methods to receive asynchronous output from parallelizer
- make parallelizer less eager to process inputs on its thread except when unblocking, to leave it free to send outputs - allow parallelizer to start making headway with long-running inputs before the inputs are done - use ruby internal Queues to manage locking instead of mutexes
-rw-r--r--lib/chef/chef_fs/parallelizer.rb220
-rw-r--r--spec/unit/chef_fs/parallelizer.rb227
2 files changed, 376 insertions, 71 deletions
diff --git a/lib/chef/chef_fs/parallelizer.rb b/lib/chef/chef_fs/parallelizer.rb
index 84f3d4d870..8996304675 100644
--- a/lib/chef/chef_fs/parallelizer.rb
+++ b/lib/chef/chef_fs/parallelizer.rb
@@ -1,3 +1,5 @@
+require 'thread'
+
class Chef
module ChefFS
class Parallelizer
@@ -11,117 +13,193 @@ class Chef
end
end
- def self.parallelize(enumerator, options = {}, &block)
+ def self.parallelizer
@@parallelizer ||= Parallelizer.new(@@threads)
- @@parallelizer.parallelize(enumerator, options, &block)
+ end
+
+ def self.parallelize(enumerator, options = {}, &block)
+ parallelizer.parallelize(enumerator, options, &block)
end
def initialize(threads)
- @tasks_mutex = Mutex.new
- @tasks = []
+ @tasks = Queue.new
@threads = []
- 1.upto(threads) do
- @threads << Thread.new { worker_loop }
+ 1.upto(threads) do |i|
+ @threads << Thread.new(&method(:worker_loop))
end
end
- def parallelize(enumerator, options = {}, &block)
- task = ParallelizedResults.new(enumerator, options, &block)
- @tasks_mutex.synchronize do
- @tasks << task
+ def parallelize(enumerable, options = {}, &block)
+ ParallelEnumerable.new(@tasks, enumerable, options, &block)
+ end
+
+ def stop
+ @threads.each do |thread|
+ Thread.kill(thread)
+ end
+ @threads = []
+ end
+
+ def worker_loop
+ while true
+ begin
+ task = @tasks.pop
+ task.call
+ rescue
+ puts "ERROR #{$!}"
+ puts $!.backtrace
+ end
end
- task
end
- class ParallelizedResults
+ class ParallelEnumerable
include Enumerable
- def initialize(enumerator, options, &block)
- @inputs = enumerator.to_a
+ # options:
+ # :ordered [true|false] - whether the output should stay in the same order
+ # as the input (even though it may not actually be processed in that
+ # order). Default: true
+ # :main_thread_processing [true|false] - whether the main thread pulling
+ # on each() is allowed to process inputs. Default: true
+ def initialize(parent_task_queue, enumerable, options, &block)
+ @task_queue = Queue.new
+ @parent_task_queue = parent_task_queue
+ @enumerable = enumerable
@options = options
@block = block
+ @unconsumed_output = Queue.new
+ @in_process = 0
+ end
- @mutex = Mutex.new
- @outputs = []
- @status = []
+ def wait
+ each_with_input_unordered do |input, output, index|
+ end
end
- def each
- next_index = 0
- while true
- # Report any results that already exist
- while @status.length > next_index && ([:finished, :exception].include?(@status[next_index]))
- if @status[next_index] == :finished
- if @options[:flatten]
- @outputs[next_index].each do |entry|
- yield entry
- end
+ def each_with_input_unordered
+ awaiting_output = 0
+
+ # Grab all the inputs, yielding any responses during enumeration
+ # in case the enumeration itself takes time
+ begin
+ @enumerable.each_with_index do |input, index|
+ awaiting_output += 1
+ @task_queue.push([ input, index ])
+ @parent_task_queue.push(method(:process_one))
+ while !@unconsumed_output.empty?
+ type, input, output, index = @unconsumed_output.pop
+ if type == :exception
+ exception ||= output
else
- yield @outputs[next_index]
+ yield input, output, index
end
- else
- raise @outputs[next_index]
+ awaiting_output -= 1
end
- next_index = next_index + 1
end
+ rescue
+ # We still want to wait for the rest of the outputs to process
+ awaiting_output += 1
+ @unconsumed_output.push([:exception, nil, $!, nil])
+ end
- # Pick up a result and process it, if there is one. This ensures we
- # move forward even if there are *zero* worker threads available.
- if !process_input
- # Exit if we're done.
- if next_index >= @status.length
- break
+ while awaiting_output > 0
+ # yield thread to others (for 1.8.7)
+ if @unconsumed_output.empty?
+ sleep(0.01)
+ end
+
+ while !@unconsumed_output.empty?
+ type, input, output, index = @unconsumed_output.pop
+ if type == :exception
+ exception ||= output
else
- # Ruby 1.8 threading sucks. Wait till we process more things.
- sleep(0.05)
+ yield input, output, index
end
+ awaiting_output -= 1
+ end
+
+ # If no one is working on our tasks and we're allowed to
+ # work on them in the main thread, process an input to
+ # move things forward.
+ if @in_process == 0 && !(@options[:main_thread_processing] == false)
+ process_one
end
end
+
+ if exception
+ raise exception
+ end
end
- def process_input
- # Grab the next one to process
- index, input = @mutex.synchronize do
- index = @status.length
- if index >= @inputs.length
- return nil
+ def each_with_input_ordered
+ next_to_yield = 0
+ unconsumed = {}
+ each_with_input_unordered do |input, output, index|
+ unconsumed[index] = [ input, output ]
+ while unconsumed[next_to_yield]
+ input_output = unconsumed.delete(next_to_yield)
+ yield input_output[0], input_output[1], next_to_yield
+ next_to_yield += 1
end
- input = @inputs[index]
- @status[index] = :started
- [ index, input ]
end
+ end
- begin
- @outputs[index] = @block.call(input)
- @status[index] = :finished
- rescue Exception
- @outputs[index] = $!
- @status[index] = :exception
+ def each_with_input(&block)
+ if @options[:ordered] == false
+ each_with_input_unordered(&block)
+ else
+ each_with_input_ordered(&block)
end
- index
end
- end
- private
+ def each_with_index
+ if @options[:ordered] == false
+ each_with_input_unordered do |input, output, index|
+ yield output, index
+ end
+ else
+ each_with_input_ordered do |input, output, index|
+ yield output, index
+ end
+ end
+ end
- def worker_loop
- while true
+ def each
+ if @options[:ordered] == false
+ each_with_input_unordered do |input, output, index|
+ yield output
+ end
+ else
+ each_with_input_ordered do |input, output, index|
+ yield output
+ end
+ end
+ end
+
+ private
+
+ def process_one
+ @in_process += 1
begin
- task = @tasks[0]
- if task
- if !task.process_input
- @tasks_mutex.synchronize do
- @tasks.delete(task)
- end
- end
- else
- # Ruby 1.8 threading sucks. Wait a bit to see if another task comes in.
- sleep(0.05)
+ begin
+ input, index = @task_queue.pop(true)
+ process_input(input, index)
+ rescue ThreadError
end
+ ensure
+ @in_process -= 1
+ end
+ end
+
+ def process_input(input, index)
+ begin
+ output = @block.call(input)
+ @unconsumed_output.push([ :result, input, output, index ])
rescue
- puts "ERROR #{$!}"
- puts $!.backtrace
+ @unconsumed_output.push([ :exception, input, $!, index ])
end
+
+ index
end
end
end
diff --git a/spec/unit/chef_fs/parallelizer.rb b/spec/unit/chef_fs/parallelizer.rb
new file mode 100644
index 0000000000..bf5d4c7eab
--- /dev/null
+++ b/spec/unit/chef_fs/parallelizer.rb
@@ -0,0 +1,227 @@
+require 'spec_helper'
+require 'chef/chef_fs/parallelizer'
+
+describe Chef::ChefFS::Parallelizer do
+ class EnumerableWithException
+ include Enumerable
+
+ def initialize(*results)
+ @results = results
+ end
+
+ def each
+ @results.each do |x|
+ yield x
+ end
+ raise 'hi'
+ end
+ end
+
+ before :each do
+ @start_time = Time.now
+ end
+
+ def elapsed_time
+ Time.now - @start_time
+ end
+
+ after :each do
+ parallelizer.stop
+ end
+
+ context 'With a Parallelizer with 5 threads' do
+ let :parallelizer do
+ Chef::ChefFS::Parallelizer.new(5)
+ end
+
+ def parallelize(inputs, options = {}, &block)
+ parallelizer.parallelize(inputs, { :main_thread_processing => false }.merge(options), &block)
+ end
+
+ context "With :ordered => false (unordered output)" do
+ it "An empty input produces an empty output" do
+ parallelize([], :ordered => false) do
+ sleep 10
+ end.to_a == []
+ elapsed_time.should < 1
+ end
+
+ it "10 sleep(0.5)s complete within 2 seconds" do
+ parallelize(1.upto(10), :ordered => false) do |i|
+ sleep 0.5
+ 'x'
+ end.to_a.should == %w(x x x x x x x x x x)
+ elapsed_time.should < 2
+ end
+
+ it "The output comes as soon as it is available" do
+ enum = parallelize([0.5,0.3,0.1], :ordered => false) do |val|
+ sleep val
+ val
+ end.enum_for(:each_with_index)
+ enum.next.should == [ 0.1, 2 ]
+ elapsed_time.should < 0.3
+ enum.next.should == [ 0.3, 1 ]
+ elapsed_time.should < 0.5
+ enum.next.should == [ 0.5, 0 ]
+ elapsed_time.should < 0.7
+ end
+
+ it "An exception in input is passed through but does NOT stop processing" do
+ enum = parallelize(EnumerableWithException.new(0.5,0.3,0.1), :ordered => false) { |x| sleep(x); x }.enum_for(:each)
+ enum.next.should == 0.1
+ elapsed_time.should > 0.1
+ enum.next.should == 0.3
+ enum.next.should == 0.5
+ expect { enum.next }.to raise_error 'hi'
+ elapsed_time.should < 0.7
+ end
+
+ it "Exceptions in output are raised after all processing is done" do
+ processed = 0
+ enum = parallelize([0.2,0.1,'x',0.3], :ordered => false) do |x|
+ sleep(x)
+ processed += 1
+ x
+ end.enum_for(:each)
+ enum.next.should == 0.1
+ enum.next.should == 0.2
+ elapsed_time.should > 0.19
+ enum.next.should == 0.3
+ expect { enum.next }.to raise_error
+ elapsed_time.should < 0.5
+ processed.should == 3
+ end
+ end
+
+ context "With :ordered => true (ordered output)" do
+ it "An empty input produces an empty output" do
+ parallelize([]) do
+ sleep 10
+ end.to_a == []
+ elapsed_time.should < 1
+ end
+
+ it "10 sleep(0.5)s complete within 2 seconds" do
+ parallelize(1.upto(10)) do
+ sleep 0.5
+ 'x'
+ end.to_a.should == %w(x x x x x x x x x x)
+ elapsed_time.should < 2
+ end
+
+ it "Output comes in the order of the input" do
+ enum = parallelize([0.5,0.3,0.1]) do |val|
+ sleep val
+ val
+ end.enum_for(:each_with_index)
+ enum.next.should == [ 0.5, 0 ]
+ enum.next.should == [ 0.3, 1 ]
+ enum.next.should == [ 0.1, 2 ]
+ elapsed_time.should < 0.7
+ end
+
+ it "Exceptions in input are raised in the correct sequence but do NOT stop processing" do
+ enum = parallelize(EnumerableWithException.new(0.5,0.3,0.1)) { |x| sleep(x); x }.enum_for(:each)
+ enum.next.should == 0.5
+ elapsed_time.should < 0.7
+ enum.next.should == 0.3
+ enum.next.should == 0.1
+ expect { enum.next }.to raise_error 'hi'
+ elapsed_time.should < 0.7
+ end
+
+ it "Exceptions in output are raised in the correct sequence but do NOT stop processing" do
+ processed = 0
+ enum = parallelize([0.2,0.1,'x',0.3]) do |x|
+ sleep(x)
+ processed += 1
+ x
+ end.enum_for(:each)
+ enum.next.should == 0.2
+ enum.next.should == 0.1
+ expect { enum.next }.to raise_error
+ elapsed_time.should > 0.25
+ elapsed_time.should < 0.55
+ processed.should == 3
+ end
+ end
+ end
+
+ context "With a Parallelizer with 1 thread" do
+ let :parallelizer do
+ Chef::ChefFS::Parallelizer.new(1)
+ end
+
+ context "when the thread is occupied with a job" do
+ before :each do
+ parallelizer
+ started = false
+ @thread = Thread.new do
+ parallelizer.parallelize([0], :main_thread_processing => false) { |x| started = true; sleep(0.3) }.wait
+ end
+ while !started
+ sleep(0.01)
+ end
+ end
+
+ after :each do
+ Thread.kill(@thread)
+ end
+
+ it "parallelize with :main_thread_processing = true does not block" do
+ parallelizer.parallelize([1]) do |x|
+ sleep(0.1)
+ x
+ end.to_a.should == [ 1 ]
+ elapsed_time.should < 0.2
+ end
+
+ it "parallelize with :main_thread_processing = false waits for the job to finish" do
+ parallelizer.parallelize([1], :main_thread_processing => false) do |x|
+ sleep(0.1)
+ x
+ end.to_a.should == [ 1 ]
+ elapsed_time.should > 0.3
+ end
+ end
+ end
+
+ context "With a Parallelizer with 0 threads" do
+ let :parallelizer do
+ Chef::ChefFS::Parallelizer.new(0)
+ end
+
+ context "And main_thread_processing on" do
+ it "succeeds in running" do
+ parallelizer.parallelize([0.5]) { |x| x*2 }.to_a.should == [1]
+ end
+ end
+ end
+
+ context "With a Parallelizer with 10 threads" do
+ let :parallelizer do
+ Chef::ChefFS::Parallelizer.new(10)
+ end
+
+ it "does not have contention issues with large numbers of inputs" do
+ parallelizer.parallelize(1.upto(500)) { |x| x+1 }.to_a.should == 2.upto(501).to_a
+ end
+
+ it "does not have contention issues with large numbers of inputs with ordering off" do
+ parallelizer.parallelize(1.upto(500), :ordered => false) { |x| x+1 }.to_a.sort.should == 2.upto(501).to_a
+ end
+
+ it "does not have contention issues with large numbers of jobs and inputs with ordering off" do
+ parallelizers = 0.upto(99).map do
+ parallelizer.parallelize(1.upto(500)) { |x| x+1 }
+ end
+ outputs = []
+ threads = 0.upto(99).map do |i|
+ Thread.new { outputs[i] = parallelizers[i].to_a }
+ end
+ threads.each { |thread| thread.join }
+ outputs.each { |output| output.sort.should == 2.upto(501).to_a }
+ end
+ end
+end