diff options
author | John Keiser <jkeiser@opscode.com> | 2014-05-16 20:25:08 -0700 |
---|---|---|
committer | John Keiser <jkeiser@opscode.com> | 2014-05-17 20:03:32 -0700 |
commit | 09f81fcbee13c7939bac1f8db4e6cac5a85ce0d7 (patch) | |
tree | 5fed37416b46314b7280e0229fbb6c5e6ca04cec | |
parent | 62103f41818f897330b1ed424334d1eb88bcef41 (diff) | |
download | chef-09f81fcbee13c7939bac1f8db4e6cac5a85ce0d7.tar.gz |
Add methods to receive asynchronous output from parallelizer
- make parallelizer less eager to process inputs on its thread except when unblocking, to leave it free to send outputs
- allow parallelizer to start making headway with long-running inputs before the inputs are done
- use ruby internal Queues to manage locking instead of mutexes
-rw-r--r-- | lib/chef/chef_fs/parallelizer.rb | 220 | ||||
-rw-r--r-- | spec/unit/chef_fs/parallelizer.rb | 227 |
2 files changed, 376 insertions, 71 deletions
diff --git a/lib/chef/chef_fs/parallelizer.rb b/lib/chef/chef_fs/parallelizer.rb index 84f3d4d870..8996304675 100644 --- a/lib/chef/chef_fs/parallelizer.rb +++ b/lib/chef/chef_fs/parallelizer.rb @@ -1,3 +1,5 @@ +require 'thread' + class Chef module ChefFS class Parallelizer @@ -11,117 +13,193 @@ class Chef end end - def self.parallelize(enumerator, options = {}, &block) + def self.parallelizer @@parallelizer ||= Parallelizer.new(@@threads) - @@parallelizer.parallelize(enumerator, options, &block) + end + + def self.parallelize(enumerator, options = {}, &block) + parallelizer.parallelize(enumerator, options, &block) end def initialize(threads) - @tasks_mutex = Mutex.new - @tasks = [] + @tasks = Queue.new @threads = [] - 1.upto(threads) do - @threads << Thread.new { worker_loop } + 1.upto(threads) do |i| + @threads << Thread.new(&method(:worker_loop)) end end - def parallelize(enumerator, options = {}, &block) - task = ParallelizedResults.new(enumerator, options, &block) - @tasks_mutex.synchronize do - @tasks << task + def parallelize(enumerable, options = {}, &block) + ParallelEnumerable.new(@tasks, enumerable, options, &block) + end + + def stop + @threads.each do |thread| + Thread.kill(thread) + end + @threads = [] + end + + def worker_loop + while true + begin + task = @tasks.pop + task.call + rescue + puts "ERROR #{$!}" + puts $!.backtrace + end end - task end - class ParallelizedResults + class ParallelEnumerable include Enumerable - def initialize(enumerator, options, &block) - @inputs = enumerator.to_a + # options: + # :ordered [true|false] - whether the output should stay in the same order + # as the input (even though it may not actually be processed in that + # order). Default: true + # :main_thread_processing [true|false] - whether the main thread pulling + # on each() is allowed to process inputs. Default: true + def initialize(parent_task_queue, enumerable, options, &block) + @task_queue = Queue.new + @parent_task_queue = parent_task_queue + @enumerable = enumerable @options = options @block = block + @unconsumed_output = Queue.new + @in_process = 0 + end - @mutex = Mutex.new - @outputs = [] - @status = [] + def wait + each_with_input_unordered do |input, output, index| + end end - def each - next_index = 0 - while true - # Report any results that already exist - while @status.length > next_index && ([:finished, :exception].include?(@status[next_index])) - if @status[next_index] == :finished - if @options[:flatten] - @outputs[next_index].each do |entry| - yield entry - end + def each_with_input_unordered + awaiting_output = 0 + + # Grab all the inputs, yielding any responses during enumeration + # in case the enumeration itself takes time + begin + @enumerable.each_with_index do |input, index| + awaiting_output += 1 + @task_queue.push([ input, index ]) + @parent_task_queue.push(method(:process_one)) + while !@unconsumed_output.empty? + type, input, output, index = @unconsumed_output.pop + if type == :exception + exception ||= output else - yield @outputs[next_index] + yield input, output, index end - else - raise @outputs[next_index] + awaiting_output -= 1 end - next_index = next_index + 1 end + rescue + # We still want to wait for the rest of the outputs to process + awaiting_output += 1 + @unconsumed_output.push([:exception, nil, $!, nil]) + end - # Pick up a result and process it, if there is one. This ensures we - # move forward even if there are *zero* worker threads available. - if !process_input - # Exit if we're done. - if next_index >= @status.length - break + while awaiting_output > 0 + # yield thread to others (for 1.8.7) + if @unconsumed_output.empty? + sleep(0.01) + end + + while !@unconsumed_output.empty? + type, input, output, index = @unconsumed_output.pop + if type == :exception + exception ||= output else - # Ruby 1.8 threading sucks. Wait till we process more things. - sleep(0.05) + yield input, output, index end + awaiting_output -= 1 + end + + # If no one is working on our tasks and we're allowed to + # work on them in the main thread, process an input to + # move things forward. + if @in_process == 0 && !(@options[:main_thread_processing] == false) + process_one end end + + if exception + raise exception + end end - def process_input - # Grab the next one to process - index, input = @mutex.synchronize do - index = @status.length - if index >= @inputs.length - return nil + def each_with_input_ordered + next_to_yield = 0 + unconsumed = {} + each_with_input_unordered do |input, output, index| + unconsumed[index] = [ input, output ] + while unconsumed[next_to_yield] + input_output = unconsumed.delete(next_to_yield) + yield input_output[0], input_output[1], next_to_yield + next_to_yield += 1 end - input = @inputs[index] - @status[index] = :started - [ index, input ] end + end - begin - @outputs[index] = @block.call(input) - @status[index] = :finished - rescue Exception - @outputs[index] = $! - @status[index] = :exception + def each_with_input(&block) + if @options[:ordered] == false + each_with_input_unordered(&block) + else + each_with_input_ordered(&block) end - index end - end - private + def each_with_index + if @options[:ordered] == false + each_with_input_unordered do |input, output, index| + yield output, index + end + else + each_with_input_ordered do |input, output, index| + yield output, index + end + end + end - def worker_loop - while true + def each + if @options[:ordered] == false + each_with_input_unordered do |input, output, index| + yield output + end + else + each_with_input_ordered do |input, output, index| + yield output + end + end + end + + private + + def process_one + @in_process += 1 begin - task = @tasks[0] - if task - if !task.process_input - @tasks_mutex.synchronize do - @tasks.delete(task) - end - end - else - # Ruby 1.8 threading sucks. Wait a bit to see if another task comes in. - sleep(0.05) + begin + input, index = @task_queue.pop(true) + process_input(input, index) + rescue ThreadError end + ensure + @in_process -= 1 + end + end + + def process_input(input, index) + begin + output = @block.call(input) + @unconsumed_output.push([ :result, input, output, index ]) rescue - puts "ERROR #{$!}" - puts $!.backtrace + @unconsumed_output.push([ :exception, input, $!, index ]) end + + index end end end diff --git a/spec/unit/chef_fs/parallelizer.rb b/spec/unit/chef_fs/parallelizer.rb new file mode 100644 index 0000000000..bf5d4c7eab --- /dev/null +++ b/spec/unit/chef_fs/parallelizer.rb @@ -0,0 +1,227 @@ +require 'spec_helper' +require 'chef/chef_fs/parallelizer' + +describe Chef::ChefFS::Parallelizer do + class EnumerableWithException + include Enumerable + + def initialize(*results) + @results = results + end + + def each + @results.each do |x| + yield x + end + raise 'hi' + end + end + + before :each do + @start_time = Time.now + end + + def elapsed_time + Time.now - @start_time + end + + after :each do + parallelizer.stop + end + + context 'With a Parallelizer with 5 threads' do + let :parallelizer do + Chef::ChefFS::Parallelizer.new(5) + end + + def parallelize(inputs, options = {}, &block) + parallelizer.parallelize(inputs, { :main_thread_processing => false }.merge(options), &block) + end + + context "With :ordered => false (unordered output)" do + it "An empty input produces an empty output" do + parallelize([], :ordered => false) do + sleep 10 + end.to_a == [] + elapsed_time.should < 1 + end + + it "10 sleep(0.5)s complete within 2 seconds" do + parallelize(1.upto(10), :ordered => false) do |i| + sleep 0.5 + 'x' + end.to_a.should == %w(x x x x x x x x x x) + elapsed_time.should < 2 + end + + it "The output comes as soon as it is available" do + enum = parallelize([0.5,0.3,0.1], :ordered => false) do |val| + sleep val + val + end.enum_for(:each_with_index) + enum.next.should == [ 0.1, 2 ] + elapsed_time.should < 0.3 + enum.next.should == [ 0.3, 1 ] + elapsed_time.should < 0.5 + enum.next.should == [ 0.5, 0 ] + elapsed_time.should < 0.7 + end + + it "An exception in input is passed through but does NOT stop processing" do + enum = parallelize(EnumerableWithException.new(0.5,0.3,0.1), :ordered => false) { |x| sleep(x); x }.enum_for(:each) + enum.next.should == 0.1 + elapsed_time.should > 0.1 + enum.next.should == 0.3 + enum.next.should == 0.5 + expect { enum.next }.to raise_error 'hi' + elapsed_time.should < 0.7 + end + + it "Exceptions in output are raised after all processing is done" do + processed = 0 + enum = parallelize([0.2,0.1,'x',0.3], :ordered => false) do |x| + sleep(x) + processed += 1 + x + end.enum_for(:each) + enum.next.should == 0.1 + enum.next.should == 0.2 + elapsed_time.should > 0.19 + enum.next.should == 0.3 + expect { enum.next }.to raise_error + elapsed_time.should < 0.5 + processed.should == 3 + end + end + + context "With :ordered => true (ordered output)" do + it "An empty input produces an empty output" do + parallelize([]) do + sleep 10 + end.to_a == [] + elapsed_time.should < 1 + end + + it "10 sleep(0.5)s complete within 2 seconds" do + parallelize(1.upto(10)) do + sleep 0.5 + 'x' + end.to_a.should == %w(x x x x x x x x x x) + elapsed_time.should < 2 + end + + it "Output comes in the order of the input" do + enum = parallelize([0.5,0.3,0.1]) do |val| + sleep val + val + end.enum_for(:each_with_index) + enum.next.should == [ 0.5, 0 ] + enum.next.should == [ 0.3, 1 ] + enum.next.should == [ 0.1, 2 ] + elapsed_time.should < 0.7 + end + + it "Exceptions in input are raised in the correct sequence but do NOT stop processing" do + enum = parallelize(EnumerableWithException.new(0.5,0.3,0.1)) { |x| sleep(x); x }.enum_for(:each) + enum.next.should == 0.5 + elapsed_time.should < 0.7 + enum.next.should == 0.3 + enum.next.should == 0.1 + expect { enum.next }.to raise_error 'hi' + elapsed_time.should < 0.7 + end + + it "Exceptions in output are raised in the correct sequence but do NOT stop processing" do + processed = 0 + enum = parallelize([0.2,0.1,'x',0.3]) do |x| + sleep(x) + processed += 1 + x + end.enum_for(:each) + enum.next.should == 0.2 + enum.next.should == 0.1 + expect { enum.next }.to raise_error + elapsed_time.should > 0.25 + elapsed_time.should < 0.55 + processed.should == 3 + end + end + end + + context "With a Parallelizer with 1 thread" do + let :parallelizer do + Chef::ChefFS::Parallelizer.new(1) + end + + context "when the thread is occupied with a job" do + before :each do + parallelizer + started = false + @thread = Thread.new do + parallelizer.parallelize([0], :main_thread_processing => false) { |x| started = true; sleep(0.3) }.wait + end + while !started + sleep(0.01) + end + end + + after :each do + Thread.kill(@thread) + end + + it "parallelize with :main_thread_processing = true does not block" do + parallelizer.parallelize([1]) do |x| + sleep(0.1) + x + end.to_a.should == [ 1 ] + elapsed_time.should < 0.2 + end + + it "parallelize with :main_thread_processing = false waits for the job to finish" do + parallelizer.parallelize([1], :main_thread_processing => false) do |x| + sleep(0.1) + x + end.to_a.should == [ 1 ] + elapsed_time.should > 0.3 + end + end + end + + context "With a Parallelizer with 0 threads" do + let :parallelizer do + Chef::ChefFS::Parallelizer.new(0) + end + + context "And main_thread_processing on" do + it "succeeds in running" do + parallelizer.parallelize([0.5]) { |x| x*2 }.to_a.should == [1] + end + end + end + + context "With a Parallelizer with 10 threads" do + let :parallelizer do + Chef::ChefFS::Parallelizer.new(10) + end + + it "does not have contention issues with large numbers of inputs" do + parallelizer.parallelize(1.upto(500)) { |x| x+1 }.to_a.should == 2.upto(501).to_a + end + + it "does not have contention issues with large numbers of inputs with ordering off" do + parallelizer.parallelize(1.upto(500), :ordered => false) { |x| x+1 }.to_a.sort.should == 2.upto(501).to_a + end + + it "does not have contention issues with large numbers of jobs and inputs with ordering off" do + parallelizers = 0.upto(99).map do + parallelizer.parallelize(1.upto(500)) { |x| x+1 } + end + outputs = [] + threads = 0.upto(99).map do |i| + Thread.new { outputs[i] = parallelizers[i].to_a } + end + threads.each { |thread| thread.join } + outputs.each { |output| output.sort.should == 2.upto(501).to_a } + end + end +end |