summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Keiser <jkeiser@opscode.com>2014-05-18 19:18:38 -0700
committerJohn Keiser <jkeiser@opscode.com>2014-05-18 19:18:38 -0700
commitc28654606f6c2ef3211d0cc0cf8d02c29e9571b8 (patch)
tree04d5cf7bff3fe8e2d61a57631c47f718b75bb3f3
parenta32736de04df3c40fb21ec57db00128cf33e1f7d (diff)
downloadchef-c28654606f6c2ef3211d0cc0cf8d02c29e9571b8.tar.gz
Make it possible to resize and gently stop the parallelizer
-rw-r--r--lib/chef/chef_fs/parallelizer.rb42
-rw-r--r--spec/unit/chef_fs/parallelizer.rb24
2 files changed, 57 insertions, 9 deletions
diff --git a/lib/chef/chef_fs/parallelizer.rb b/lib/chef/chef_fs/parallelizer.rb
index 8172aaf6e8..3847934bf0 100644
--- a/lib/chef/chef_fs/parallelizer.rb
+++ b/lib/chef/chef_fs/parallelizer.rb
@@ -13,11 +13,8 @@ class Chef
@@threads = 0
def self.threads=(value)
- if @@threads != value
- @@threads = value
- @@parallelizer.kill
- @@parallelizer = nil
- end
+ @@threads = value
+ @@parallelizer.resize(value) if @@parallelizer
end
def self.parallelizer
@@ -32,11 +29,33 @@ class Chef
parallelizer.parallel_do(enumerable, options, &block)
end
- def initialize(threads)
+ def initialize(num_threads)
@tasks = Queue.new
@threads = []
- 1.upto(threads) do |i|
- @threads << Thread.new(&method(:worker_loop))
+ @stop_thread = {}
+ resize(num_threads)
+ end
+
+ def resize(num_threads, wait = true, timeout = nil)
+ if num_threads < @threads.size
+ threads_to_stop = @threads[num_threads..@threads.size-1]
+ @threads = @threads[0..num_threads-1]
+ threads_to_stop.each do |thread|
+ @stop_thread[thread] = true
+ end
+
+ if wait
+ start_time = Time.now
+ threads_to_stop.each do |thread|
+ thread_timeout = timeout ? timeout - (Time.now - start_time) : nil
+ thread.join(thread_timeout)
+ end
+ end
+
+ else
+ @threads.size.upto(num_threads - 1) do |i|
+ @threads[i] = Thread.new(&method(:worker_loop))
+ end
end
end
@@ -48,6 +67,10 @@ class Chef
ParallelEnumerable.new(@tasks, enumerable, options.merge(:ordered => false), &block).wait
end
+ def stop(wait = true, timeout = nil)
+ resize(0, wait, timeout)
+ end
+
def kill
@threads.each do |thread|
Thread.kill(thread)
@@ -58,7 +81,7 @@ class Chef
private
def worker_loop
- while true
+ while !@stop_thread[Thread.current]
begin
task = @tasks.pop
task.call
@@ -67,6 +90,7 @@ class Chef
puts $!.backtrace
end
end
+ @stop_thread.delete(Thread.current)
end
end
end
diff --git a/spec/unit/chef_fs/parallelizer.rb b/spec/unit/chef_fs/parallelizer.rb
index 5c9f38a8eb..c474a4774d 100644
--- a/spec/unit/chef_fs/parallelizer.rb
+++ b/spec/unit/chef_fs/parallelizer.rb
@@ -227,10 +227,12 @@ describe Chef::ChefFS::Parallelizer do
before :each do
parallelizer
started = false
+ @occupying_job_finished = occupying_job_finished = [ false ]
@thread = Thread.new do
parallelizer.parallelize([0], :main_thread_processing => false) do |x|
started = true
sleep(0.3)
+ occupying_job_finished[0] = true
end.wait
end
while !started
@@ -257,6 +259,28 @@ describe Chef::ChefFS::Parallelizer do
end.to_a.should == [ 2 ]
elapsed_time.should > 0.3
end
+
+ it "resizing the Parallelizer to 0 waits for the job to stop" do
+ elapsed_time.should < 0.2
+ parallelizer.resize(0)
+ elapsed_time.should > 0.25
+ @occupying_job_finished.should == [ true ]
+ end
+
+ it "stopping the Parallelizer waits for the job to finish" do
+ elapsed_time.should < 0.2
+ parallelizer.stop
+ elapsed_time.should > 0.25
+ @occupying_job_finished.should == [ true ]
+ end
+
+ it "resizing the Parallelizer to 2 does not stop the job" do
+ elapsed_time.should < 0.2
+ parallelizer.resize(2)
+ elapsed_time.should < 0.2
+ sleep(0.3)
+ @occupying_job_finished.should == [ true ]
+ end
end
end