summaryrefslogtreecommitdiff
path: root/lib/chef/chef_fs/parallelizer.rb
diff options
context:
space:
mode:
authorJohn Keiser <jkeiser@opscode.com>2014-05-19 07:38:01 -0700
committerJohn Keiser <jkeiser@opscode.com>2014-05-19 07:38:01 -0700
commit0760983e7aff5bafe5b2c8c43891fbb3d0fd3ed3 (patch)
tree9dfdefc98291a46edde980577a13aa275f055274 /lib/chef/chef_fs/parallelizer.rb
parentc28654606f6c2ef3211d0cc0cf8d02c29e9571b8 (diff)
downloadchef-0760983e7aff5bafe5b2c8c43891fbb3d0fd3ed3.tar.gz
Make parallel enumerable safe to restart
Diffstat (limited to 'lib/chef/chef_fs/parallelizer.rb')
-rw-r--r--lib/chef/chef_fs/parallelizer.rb39
1 files changed, 22 insertions, 17 deletions
diff --git a/lib/chef/chef_fs/parallelizer.rb b/lib/chef/chef_fs/parallelizer.rb
index 3847934bf0..8e49e155df 100644
--- a/lib/chef/chef_fs/parallelizer.rb
+++ b/lib/chef/chef_fs/parallelizer.rb
@@ -36,10 +36,26 @@ class Chef
resize(num_threads)
end
- def resize(num_threads, wait = true, timeout = nil)
- if num_threads < @threads.size
- threads_to_stop = @threads[num_threads..@threads.size-1]
- @threads = @threads[0..num_threads-1]
+ def num_threads
+ @threads.size
+ end
+
+ def parallelize(enumerable, options = {}, &block)
+ ParallelEnumerable.new(@tasks, enumerable, options, &block)
+ end
+
+ def parallel_do(enumerable, options = {}, &block)
+ ParallelEnumerable.new(@tasks, enumerable, options.merge(:ordered => false), &block).wait
+ end
+
+ def stop(wait = true, timeout = nil)
+ resize(0, wait, timeout)
+ end
+
+ def resize(to_threads, wait = true, timeout = nil)
+ if to_threads < num_threads
+ threads_to_stop = @threads[to_threads..num_threads-1]
+ @threads = @threads.slice(0, to_threads)
threads_to_stop.each do |thread|
@stop_thread[thread] = true
end
@@ -53,27 +69,16 @@ class Chef
end
else
- @threads.size.upto(num_threads - 1) do |i|
+ num_threads.upto(to_threads - 1) do |i|
@threads[i] = Thread.new(&method(:worker_loop))
end
end
end
- def parallelize(enumerable, options = {}, &block)
- ParallelEnumerable.new(@tasks, enumerable, options, &block)
- end
-
- def parallel_do(enumerable, options = {}, &block)
- ParallelEnumerable.new(@tasks, enumerable, options.merge(:ordered => false), &block).wait
- end
-
- def stop(wait = true, timeout = nil)
- resize(0, wait, timeout)
- end
-
def kill
@threads.each do |thread|
Thread.kill(thread)
+ @stop_thread.delete(thread)
end
@threads = []
end