summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lib/chef/chef_fs/parallelizer.rb19
-rw-r--r--spec/unit/chef_fs/parallelizer.rb54
2 files changed, 49 insertions, 24 deletions
diff --git a/lib/chef/chef_fs/parallelizer.rb b/lib/chef/chef_fs/parallelizer.rb
index 8996304675..25aaffaab0 100644
--- a/lib/chef/chef_fs/parallelizer.rb
+++ b/lib/chef/chef_fs/parallelizer.rb
@@ -9,6 +9,7 @@ class Chef
def self.threads=(value)
if @@threads != value
@@threads = value
+ @@parallelizer.kill
@@parallelizer = nil
end
end
@@ -17,8 +18,12 @@ class Chef
@@parallelizer ||= Parallelizer.new(@@threads)
end
- def self.parallelize(enumerator, options = {}, &block)
- parallelizer.parallelize(enumerator, options, &block)
+ def self.parallelize(enumerable, options = {}, &block)
+ parallelizer.parallelize(enumerable, options, &block)
+ end
+
+ def self.parallel_do(enumerable, options = {}, &block)
+ parallelizer.parallel_do(enumerable, options, &block)
end
def initialize(threads)
@@ -33,13 +38,19 @@ class Chef
ParallelEnumerable.new(@tasks, enumerable, options, &block)
end
- def stop
+ def parallel_do(enumerable, options = {}, &block)
+ ParallelEnumerable.new(@tasks, enumerable, options.merge(:ordered => false), &block).wait
+ end
+
+ def kill
@threads.each do |thread|
Thread.kill(thread)
end
@threads = []
end
+ private
+
def worker_loop
while true
begin
@@ -61,6 +72,8 @@ class Chef
# order). Default: true
# :main_thread_processing [true|false] - whether the main thread pulling
# on each() is allowed to process inputs. Default: true
+ # NOTE: If you set this to false, parallelizer.kill will stop each()
+ # in its tracks, so you need to know for sure that won't happen.
def initialize(parent_task_queue, enumerable, options, &block)
@task_queue = Queue.new
@parent_task_queue = parent_task_queue
diff --git a/spec/unit/chef_fs/parallelizer.rb b/spec/unit/chef_fs/parallelizer.rb
index bf5d4c7eab..971949251c 100644
--- a/spec/unit/chef_fs/parallelizer.rb
+++ b/spec/unit/chef_fs/parallelizer.rb
@@ -26,7 +26,7 @@ describe Chef::ChefFS::Parallelizer do
end
after :each do
- parallelizer.stop
+ parallelizer.kill
end
context 'With a Parallelizer with 5 threads' do
@@ -38,20 +38,30 @@ describe Chef::ChefFS::Parallelizer do
parallelizer.parallelize(inputs, { :main_thread_processing => false }.merge(options), &block)
end
+ it "parallel_do creates unordered output as soon as it is available" do
+ outputs = []
+ parallelizer.parallel_do([0.5,0.3,0.1]) do |val|
+ sleep val
+ outputs << val
+ end
+ elapsed_time.should < 0.6
+ outputs.should == [ 0.1, 0.3, 0.5 ]
+ end
+
context "With :ordered => false (unordered output)" do
it "An empty input produces an empty output" do
parallelize([], :ordered => false) do
sleep 10
end.to_a == []
- elapsed_time.should < 1
+ elapsed_time.should < 0.1
end
- it "10 sleep(0.5)s complete within 2 seconds" do
+ it "10 sleep(0.2)s complete within 0.5 seconds" do
parallelize(1.upto(10), :ordered => false) do |i|
- sleep 0.5
+ sleep 0.2
'x'
end.to_a.should == %w(x x x x x x x x x x)
- elapsed_time.should < 2
+ elapsed_time.should < 0.5
end
it "The output comes as soon as it is available" do
@@ -60,21 +70,20 @@ describe Chef::ChefFS::Parallelizer do
val
end.enum_for(:each_with_index)
enum.next.should == [ 0.1, 2 ]
- elapsed_time.should < 0.3
+ elapsed_time.should < 0.2
enum.next.should == [ 0.3, 1 ]
- elapsed_time.should < 0.5
+ elapsed_time.should < 0.4
enum.next.should == [ 0.5, 0 ]
- elapsed_time.should < 0.7
+ elapsed_time.should < 0.6
end
it "An exception in input is passed through but does NOT stop processing" do
enum = parallelize(EnumerableWithException.new(0.5,0.3,0.1), :ordered => false) { |x| sleep(x); x }.enum_for(:each)
enum.next.should == 0.1
- elapsed_time.should > 0.1
enum.next.should == 0.3
enum.next.should == 0.5
expect { enum.next }.to raise_error 'hi'
- elapsed_time.should < 0.7
+ elapsed_time.should < 0.6
end
it "Exceptions in output are raised after all processing is done" do
@@ -86,10 +95,10 @@ describe Chef::ChefFS::Parallelizer do
end.enum_for(:each)
enum.next.should == 0.1
enum.next.should == 0.2
- elapsed_time.should > 0.19
+ elapsed_time.should < 0.3
enum.next.should == 0.3
expect { enum.next }.to raise_error
- elapsed_time.should < 0.5
+ elapsed_time.should < 0.4
processed.should == 3
end
end
@@ -99,15 +108,15 @@ describe Chef::ChefFS::Parallelizer do
parallelize([]) do
sleep 10
end.to_a == []
- elapsed_time.should < 1
+ elapsed_time.should < 0.1
end
- it "10 sleep(0.5)s complete within 2 seconds" do
- parallelize(1.upto(10)) do
- sleep 0.5
+ it "10 sleep(0.2)s complete within 0.5 seconds" do
+ parallelize(1.upto(10), :ordered => true) do |i|
+ sleep 0.2
'x'
end.to_a.should == %w(x x x x x x x x x x)
- elapsed_time.should < 2
+ elapsed_time.should < 0.5
end
it "Output comes in the order of the input" do
@@ -118,7 +127,7 @@ describe Chef::ChefFS::Parallelizer do
enum.next.should == [ 0.5, 0 ]
enum.next.should == [ 0.3, 1 ]
enum.next.should == [ 0.1, 2 ]
- elapsed_time.should < 0.7
+ elapsed_time.should < 0.6
end
it "Exceptions in input are raised in the correct sequence but do NOT stop processing" do
@@ -158,7 +167,10 @@ describe Chef::ChefFS::Parallelizer do
parallelizer
started = false
@thread = Thread.new do
- parallelizer.parallelize([0], :main_thread_processing => false) { |x| started = true; sleep(0.3) }.wait
+ parallelizer.parallelize([0], :main_thread_processing => false) do |x|
+ started = true
+ sleep(0.3)
+ end.wait
end
while !started
sleep(0.01)
@@ -180,8 +192,8 @@ describe Chef::ChefFS::Parallelizer do
it "parallelize with :main_thread_processing = false waits for the job to finish" do
parallelizer.parallelize([1], :main_thread_processing => false) do |x|
sleep(0.1)
- x
- end.to_a.should == [ 1 ]
+ x+1
+ end.to_a.should == [ 2 ]
elapsed_time.should > 0.3
end
end