diff options
-rw-r--r-- | lib/chef/chef_fs/parallelizer.rb | 19 | ||||
-rw-r--r-- | spec/unit/chef_fs/parallelizer.rb | 54 |
2 files changed, 49 insertions, 24 deletions
diff --git a/lib/chef/chef_fs/parallelizer.rb b/lib/chef/chef_fs/parallelizer.rb index 8996304675..25aaffaab0 100644 --- a/lib/chef/chef_fs/parallelizer.rb +++ b/lib/chef/chef_fs/parallelizer.rb @@ -9,6 +9,7 @@ class Chef def self.threads=(value) if @@threads != value @@threads = value + @@parallelizer.kill @@parallelizer = nil end end @@ -17,8 +18,12 @@ class Chef @@parallelizer ||= Parallelizer.new(@@threads) end - def self.parallelize(enumerator, options = {}, &block) - parallelizer.parallelize(enumerator, options, &block) + def self.parallelize(enumerable, options = {}, &block) + parallelizer.parallelize(enumerable, options, &block) + end + + def self.parallel_do(enumerable, options = {}, &block) + parallelizer.parallel_do(enumerable, options, &block) end def initialize(threads) @@ -33,13 +38,19 @@ class Chef ParallelEnumerable.new(@tasks, enumerable, options, &block) end - def stop + def parallel_do(enumerable, options = {}, &block) + ParallelEnumerable.new(@tasks, enumerable, options.merge(:ordered => false), &block).wait + end + + def kill @threads.each do |thread| Thread.kill(thread) end @threads = [] end + private + def worker_loop while true begin @@ -61,6 +72,8 @@ class Chef # order). Default: true # :main_thread_processing [true|false] - whether the main thread pulling # on each() is allowed to process inputs. Default: true + # NOTE: If you set this to false, parallelizer.kill will stop each() + # in its tracks, so you need to know for sure that won't happen. def initialize(parent_task_queue, enumerable, options, &block) @task_queue = Queue.new @parent_task_queue = parent_task_queue diff --git a/spec/unit/chef_fs/parallelizer.rb b/spec/unit/chef_fs/parallelizer.rb index bf5d4c7eab..971949251c 100644 --- a/spec/unit/chef_fs/parallelizer.rb +++ b/spec/unit/chef_fs/parallelizer.rb @@ -26,7 +26,7 @@ describe Chef::ChefFS::Parallelizer do end after :each do - parallelizer.stop + parallelizer.kill end context 'With a Parallelizer with 5 threads' do @@ -38,20 +38,30 @@ describe Chef::ChefFS::Parallelizer do parallelizer.parallelize(inputs, { :main_thread_processing => false }.merge(options), &block) end + it "parallel_do creates unordered output as soon as it is available" do + outputs = [] + parallelizer.parallel_do([0.5,0.3,0.1]) do |val| + sleep val + outputs << val + end + elapsed_time.should < 0.6 + outputs.should == [ 0.1, 0.3, 0.5 ] + end + context "With :ordered => false (unordered output)" do it "An empty input produces an empty output" do parallelize([], :ordered => false) do sleep 10 end.to_a == [] - elapsed_time.should < 1 + elapsed_time.should < 0.1 end - it "10 sleep(0.5)s complete within 2 seconds" do + it "10 sleep(0.2)s complete within 0.5 seconds" do parallelize(1.upto(10), :ordered => false) do |i| - sleep 0.5 + sleep 0.2 'x' end.to_a.should == %w(x x x x x x x x x x) - elapsed_time.should < 2 + elapsed_time.should < 0.5 end it "The output comes as soon as it is available" do @@ -60,21 +70,20 @@ describe Chef::ChefFS::Parallelizer do val end.enum_for(:each_with_index) enum.next.should == [ 0.1, 2 ] - elapsed_time.should < 0.3 + elapsed_time.should < 0.2 enum.next.should == [ 0.3, 1 ] - elapsed_time.should < 0.5 + elapsed_time.should < 0.4 enum.next.should == [ 0.5, 0 ] - elapsed_time.should < 0.7 + elapsed_time.should < 0.6 end it "An exception in input is passed through but does NOT stop processing" do enum = parallelize(EnumerableWithException.new(0.5,0.3,0.1), :ordered => false) { |x| sleep(x); x }.enum_for(:each) enum.next.should == 0.1 - elapsed_time.should > 0.1 enum.next.should == 0.3 enum.next.should == 0.5 expect { enum.next }.to raise_error 'hi' - elapsed_time.should < 0.7 + elapsed_time.should < 0.6 end it "Exceptions in output are raised after all processing is done" do @@ -86,10 +95,10 @@ describe Chef::ChefFS::Parallelizer do end.enum_for(:each) enum.next.should == 0.1 enum.next.should == 0.2 - elapsed_time.should > 0.19 + elapsed_time.should < 0.3 enum.next.should == 0.3 expect { enum.next }.to raise_error - elapsed_time.should < 0.5 + elapsed_time.should < 0.4 processed.should == 3 end end @@ -99,15 +108,15 @@ describe Chef::ChefFS::Parallelizer do parallelize([]) do sleep 10 end.to_a == [] - elapsed_time.should < 1 + elapsed_time.should < 0.1 end - it "10 sleep(0.5)s complete within 2 seconds" do - parallelize(1.upto(10)) do - sleep 0.5 + it "10 sleep(0.2)s complete within 0.5 seconds" do + parallelize(1.upto(10), :ordered => true) do |i| + sleep 0.2 'x' end.to_a.should == %w(x x x x x x x x x x) - elapsed_time.should < 2 + elapsed_time.should < 0.5 end it "Output comes in the order of the input" do @@ -118,7 +127,7 @@ describe Chef::ChefFS::Parallelizer do enum.next.should == [ 0.5, 0 ] enum.next.should == [ 0.3, 1 ] enum.next.should == [ 0.1, 2 ] - elapsed_time.should < 0.7 + elapsed_time.should < 0.6 end it "Exceptions in input are raised in the correct sequence but do NOT stop processing" do @@ -158,7 +167,10 @@ describe Chef::ChefFS::Parallelizer do parallelizer started = false @thread = Thread.new do - parallelizer.parallelize([0], :main_thread_processing => false) { |x| started = true; sleep(0.3) }.wait + parallelizer.parallelize([0], :main_thread_processing => false) do |x| + started = true + sleep(0.3) + end.wait end while !started sleep(0.01) @@ -180,8 +192,8 @@ describe Chef::ChefFS::Parallelizer do it "parallelize with :main_thread_processing = false waits for the job to finish" do parallelizer.parallelize([1], :main_thread_processing => false) do |x| sleep(0.1) - x - end.to_a.should == [ 1 ] + x+1 + end.to_a.should == [ 2 ] elapsed_time.should > 0.3 end end |