diff options
author | John Keiser <jkeiser@opscode.com> | 2014-05-19 07:38:01 -0700 |
---|---|---|
committer | John Keiser <jkeiser@opscode.com> | 2014-05-19 07:38:01 -0700 |
commit | 0760983e7aff5bafe5b2c8c43891fbb3d0fd3ed3 (patch) | |
tree | 9dfdefc98291a46edde980577a13aa275f055274 /lib/chef/chef_fs/parallelizer.rb | |
parent | c28654606f6c2ef3211d0cc0cf8d02c29e9571b8 (diff) | |
download | chef-0760983e7aff5bafe5b2c8c43891fbb3d0fd3ed3.tar.gz |
Make parallel enumerable safe to restart
Diffstat (limited to 'lib/chef/chef_fs/parallelizer.rb')
-rw-r--r-- | lib/chef/chef_fs/parallelizer.rb | 39 |
1 files changed, 22 insertions, 17 deletions
diff --git a/lib/chef/chef_fs/parallelizer.rb b/lib/chef/chef_fs/parallelizer.rb index 3847934bf0..8e49e155df 100644 --- a/lib/chef/chef_fs/parallelizer.rb +++ b/lib/chef/chef_fs/parallelizer.rb @@ -36,10 +36,26 @@ class Chef resize(num_threads) end - def resize(num_threads, wait = true, timeout = nil) - if num_threads < @threads.size - threads_to_stop = @threads[num_threads..@threads.size-1] - @threads = @threads[0..num_threads-1] + def num_threads + @threads.size + end + + def parallelize(enumerable, options = {}, &block) + ParallelEnumerable.new(@tasks, enumerable, options, &block) + end + + def parallel_do(enumerable, options = {}, &block) + ParallelEnumerable.new(@tasks, enumerable, options.merge(:ordered => false), &block).wait + end + + def stop(wait = true, timeout = nil) + resize(0, wait, timeout) + end + + def resize(to_threads, wait = true, timeout = nil) + if to_threads < num_threads + threads_to_stop = @threads[to_threads..num_threads-1] + @threads = @threads.slice(0, to_threads) threads_to_stop.each do |thread| @stop_thread[thread] = true end @@ -53,27 +69,16 @@ class Chef end else - @threads.size.upto(num_threads - 1) do |i| + num_threads.upto(to_threads - 1) do |i| @threads[i] = Thread.new(&method(:worker_loop)) end end end - def parallelize(enumerable, options = {}, &block) - ParallelEnumerable.new(@tasks, enumerable, options, &block) - end - - def parallel_do(enumerable, options = {}, &block) - ParallelEnumerable.new(@tasks, enumerable, options.merge(:ordered => false), &block).wait - end - - def stop(wait = true, timeout = nil) - resize(0, wait, timeout) - end - def kill @threads.each do |thread| Thread.kill(thread) + @stop_thread.delete(thread) end @threads = [] end |