diff options
author | John Keiser <jkeiser@opscode.com> | 2014-05-18 19:18:38 -0700 |
---|---|---|
committer | John Keiser <jkeiser@opscode.com> | 2014-05-18 19:18:38 -0700 |
commit | c28654606f6c2ef3211d0cc0cf8d02c29e9571b8 (patch) | |
tree | 04d5cf7bff3fe8e2d61a57631c47f718b75bb3f3 | |
parent | a32736de04df3c40fb21ec57db00128cf33e1f7d (diff) | |
download | chef-c28654606f6c2ef3211d0cc0cf8d02c29e9571b8.tar.gz |
Make it possible to resize and gently stop the parallelizer
-rw-r--r-- | lib/chef/chef_fs/parallelizer.rb | 42 | ||||
-rw-r--r-- | spec/unit/chef_fs/parallelizer.rb | 24 |
2 files changed, 57 insertions, 9 deletions
diff --git a/lib/chef/chef_fs/parallelizer.rb b/lib/chef/chef_fs/parallelizer.rb index 8172aaf6e8..3847934bf0 100644 --- a/lib/chef/chef_fs/parallelizer.rb +++ b/lib/chef/chef_fs/parallelizer.rb @@ -13,11 +13,8 @@ class Chef @@threads = 0 def self.threads=(value) - if @@threads != value - @@threads = value - @@parallelizer.kill - @@parallelizer = nil - end + @@threads = value + @@parallelizer.resize(value) if @@parallelizer end def self.parallelizer @@ -32,11 +29,33 @@ class Chef parallelizer.parallel_do(enumerable, options, &block) end - def initialize(threads) + def initialize(num_threads) @tasks = Queue.new @threads = [] - 1.upto(threads) do |i| - @threads << Thread.new(&method(:worker_loop)) + @stop_thread = {} + resize(num_threads) + end + + def resize(num_threads, wait = true, timeout = nil) + if num_threads < @threads.size + threads_to_stop = @threads[num_threads..@threads.size-1] + @threads = @threads[0..num_threads-1] + threads_to_stop.each do |thread| + @stop_thread[thread] = true + end + + if wait + start_time = Time.now + threads_to_stop.each do |thread| + thread_timeout = timeout ? timeout - (Time.now - start_time) : nil + thread.join(thread_timeout) + end + end + + else + @threads.size.upto(num_threads - 1) do |i| + @threads[i] = Thread.new(&method(:worker_loop)) + end end end @@ -48,6 +67,10 @@ class Chef ParallelEnumerable.new(@tasks, enumerable, options.merge(:ordered => false), &block).wait end + def stop(wait = true, timeout = nil) + resize(0, wait, timeout) + end + def kill @threads.each do |thread| Thread.kill(thread) @@ -58,7 +81,7 @@ class Chef private def worker_loop - while true + while !@stop_thread[Thread.current] begin task = @tasks.pop task.call @@ -67,6 +90,7 @@ class Chef puts $!.backtrace end end + @stop_thread.delete(Thread.current) end end end diff --git a/spec/unit/chef_fs/parallelizer.rb b/spec/unit/chef_fs/parallelizer.rb index 5c9f38a8eb..c474a4774d 100644 --- a/spec/unit/chef_fs/parallelizer.rb +++ b/spec/unit/chef_fs/parallelizer.rb @@ -227,10 +227,12 @@ describe Chef::ChefFS::Parallelizer do before :each do parallelizer started = false + @occupying_job_finished = occupying_job_finished = [ false ] @thread = Thread.new do parallelizer.parallelize([0], :main_thread_processing => false) do |x| started = true sleep(0.3) + occupying_job_finished[0] = true end.wait end while !started @@ -257,6 +259,28 @@ describe Chef::ChefFS::Parallelizer do end.to_a.should == [ 2 ] elapsed_time.should > 0.3 end + + it "resizing the Parallelizer to 0 waits for the job to stop" do + elapsed_time.should < 0.2 + parallelizer.resize(0) + elapsed_time.should > 0.25 + @occupying_job_finished.should == [ true ] + end + + it "stopping the Parallelizer waits for the job to finish" do + elapsed_time.should < 0.2 + parallelizer.stop + elapsed_time.should > 0.25 + @occupying_job_finished.should == [ true ] + end + + it "resizing the Parallelizer to 2 does not stop the job" do + elapsed_time.should < 0.2 + parallelizer.resize(2) + elapsed_time.should < 0.2 + sleep(0.3) + @occupying_job_finished.should == [ true ] + end end end |