summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lib/chef/chef_fs/parallelizer.rb39
-rw-r--r--lib/chef/chef_fs/parallelizer/parallel_enumerable.rb96
-rw-r--r--spec/unit/chef_fs/parallelizer.rb57
3 files changed, 142 insertions, 50 deletions
diff --git a/lib/chef/chef_fs/parallelizer.rb b/lib/chef/chef_fs/parallelizer.rb
index 3847934bf0..8e49e155df 100644
--- a/lib/chef/chef_fs/parallelizer.rb
+++ b/lib/chef/chef_fs/parallelizer.rb
@@ -36,10 +36,26 @@ class Chef
resize(num_threads)
end
- def resize(num_threads, wait = true, timeout = nil)
- if num_threads < @threads.size
- threads_to_stop = @threads[num_threads..@threads.size-1]
- @threads = @threads[0..num_threads-1]
+ def num_threads
+ @threads.size
+ end
+
+ def parallelize(enumerable, options = {}, &block)
+ ParallelEnumerable.new(@tasks, enumerable, options, &block)
+ end
+
+ def parallel_do(enumerable, options = {}, &block)
+ ParallelEnumerable.new(@tasks, enumerable, options.merge(:ordered => false), &block).wait
+ end
+
+ def stop(wait = true, timeout = nil)
+ resize(0, wait, timeout)
+ end
+
+ def resize(to_threads, wait = true, timeout = nil)
+ if to_threads < num_threads
+ threads_to_stop = @threads[to_threads..num_threads-1]
+ @threads = @threads.slice(0, to_threads)
threads_to_stop.each do |thread|
@stop_thread[thread] = true
end
@@ -53,27 +69,16 @@ class Chef
end
else
- @threads.size.upto(num_threads - 1) do |i|
+ num_threads.upto(to_threads - 1) do |i|
@threads[i] = Thread.new(&method(:worker_loop))
end
end
end
- def parallelize(enumerable, options = {}, &block)
- ParallelEnumerable.new(@tasks, enumerable, options, &block)
- end
-
- def parallel_do(enumerable, options = {}, &block)
- ParallelEnumerable.new(@tasks, enumerable, options.merge(:ordered => false), &block).wait
- end
-
- def stop(wait = true, timeout = nil)
- resize(0, wait, timeout)
- end
-
def kill
@threads.each do |thread|
Thread.kill(thread)
+ @stop_thread.delete(thread)
end
@threads = []
end
diff --git a/lib/chef/chef_fs/parallelizer/parallel_enumerable.rb b/lib/chef/chef_fs/parallelizer/parallel_enumerable.rb
index b293497889..6bd4a14fc3 100644
--- a/lib/chef/chef_fs/parallelizer/parallel_enumerable.rb
+++ b/lib/chef/chef_fs/parallelizer/parallel_enumerable.rb
@@ -25,7 +25,6 @@ class Chef
@unconsumed_input = Queue.new
@in_process = 0
-
@unconsumed_output = Queue.new
end
@@ -77,49 +76,72 @@ class Chef
FlattenEnumerable.new(self, levels)
end
+ # TODO efficient implementation for
+ # count, first, drop, take, skip: run the method on the input enumerable
+ # and use that as our each
+
private
def each_with_exceptions_unordered
- # Grab all the inputs, yielding any responses during enumeration
- # in case the enumeration itself takes time
+ if @each_running
+ raise "each() called on parallel enumerable twice simultaneously! Bad mojo"
+ end
+ @each_running = true
begin
- @enumerable.each_with_index do |input, index|
- @unconsumed_input.push([ input, index ])
- @parent_task_queue.push(method(:process_one))
- no_more_inputs = false
- while !@unconsumed_output.empty?
- output, index, input, type = @unconsumed_output.pop
- yield output, index, input, type
- if type == :exception && @options[:stop_on_exception]
- no_more_inputs = true
+ # Grab all the inputs, yielding any responses during enumeration
+ # in case the enumeration itself takes time
+ begin
+ @enumerable.each_with_index do |input, index|
+ @unconsumed_input.push([ input, index ])
+ @parent_task_queue.push(method(:process_one))
+
+ stop_processing_input = false
+ while !@unconsumed_output.empty?
+ output, index, input, type = @unconsumed_output.pop
+ yield output, index, input, type
+ if type == :exception && @options[:stop_on_exception]
+ stop_processing_input = true
+ break
+ end
+ end
+
+ if stop_processing_input
+ break
end
end
- break if no_more_inputs
- end
- rescue
- # We still want to wait for the rest of the outputs to process
- @unconsumed_output.push([$!, nil, nil, :exception])
- if @options[:stop_on_exception]
- @unconsumed_input.clear
+ rescue
+ # We still want to wait for the rest of the outputs to process
+ @unconsumed_output.push([$!, nil, nil, :exception])
+ if @options[:stop_on_exception]
+ @unconsumed_input.clear
+ end
end
- end
- while !finished?
- # yield thread to others (for 1.8.7)
- if @unconsumed_output.empty?
- sleep(0.01)
- end
+ while !finished?
+ # yield thread to others (for 1.8.7)
+ if @unconsumed_output.empty?
+ sleep(0.01)
+ end
- while !@unconsumed_output.empty?
- yield @unconsumed_output.pop
- end
+ while !@unconsumed_output.empty?
+ yield @unconsumed_output.pop
+ end
- # If no one is working on our tasks and we're allowed to
- # work on them in the main thread, process an input to
- # move things forward.
- if @in_process == 0 && !(@options[:main_thread_processing] == false)
- process_one
+ # If no one is working on our tasks and we're allowed to
+ # work on them in the main thread, process an input to
+ # move things forward.
+ if @in_process == 0 && !(@options[:main_thread_processing] == false)
+ process_one
+ end
+ end
+ ensure
+ # If someone called "first" or something that exits the enumerator
+ # early, we want to make sure and throw away any extra results
+ # (gracefully) so that the next enumerator can start over.
+ if !finished?
+ stop
end
+ @each_running = false
end
end
@@ -140,6 +162,14 @@ class Chef
end
end
+ def stop
+ @unconsumed_input.clear
+ while @in_process > 0
+ sleep(0.05)
+ end
+ @unconsumed_output.clear
+ end
+
#
# This is thread safe only if called from the main thread pulling on each().
# The order of these checks is important, as well, to be thread safe.
diff --git a/spec/unit/chef_fs/parallelizer.rb b/spec/unit/chef_fs/parallelizer.rb
index c474a4774d..a91864fc2e 100644
--- a/spec/unit/chef_fs/parallelizer.rb
+++ b/spec/unit/chef_fs/parallelizer.rb
@@ -263,6 +263,7 @@ describe Chef::ChefFS::Parallelizer do
it "resizing the Parallelizer to 0 waits for the job to stop" do
elapsed_time.should < 0.2
parallelizer.resize(0)
+ parallelizer.num_threads.should == 0
elapsed_time.should > 0.25
@occupying_job_finished.should == [ true ]
end
@@ -270,6 +271,7 @@ describe Chef::ChefFS::Parallelizer do
it "stopping the Parallelizer waits for the job to finish" do
elapsed_time.should < 0.2
parallelizer.stop
+ parallelizer.num_threads.should == 0
elapsed_time.should > 0.25
@occupying_job_finished.should == [ true ]
end
@@ -277,11 +279,66 @@ describe Chef::ChefFS::Parallelizer do
it "resizing the Parallelizer to 2 does not stop the job" do
elapsed_time.should < 0.2
parallelizer.resize(2)
+ parallelizer.num_threads.should == 2
elapsed_time.should < 0.2
sleep(0.3)
@occupying_job_finished.should == [ true ]
end
end
+
+ class InputMapper
+ include Enumerable
+
+ def initialize(*values)
+ @values = values
+ @num_processed = 0
+ end
+
+ attr_reader :num_processed
+
+ def each
+ @values.each do |value|
+ @num_processed += 1
+ yield value
+ end
+ end
+ end
+
+ it ".map twice on the same parallel enumerable returns the correct results and re-processes the input", :focus do
+ outputs_processed = 0
+ input_mapper = InputMapper.new(1,2,3)
+ enum = parallelizer.parallelize(input_mapper) do |x|
+ outputs_processed += 1
+ x
+ end
+ enum.map { |x| x }.should == [1,2,3]
+ enum.map { |x| x }.should == [1,2,3]
+ outputs_processed.should == 6
+ input_mapper.num_processed.should == 6
+ end
+
+ it ".first and then .map on the same parallel enumerable returns the correct results and re-processes the input", :focus do
+ outputs_processed = 0
+ input_mapper = InputMapper.new(1,2,3)
+ enum = parallelizer.parallelize(input_mapper) do |x|
+ outputs_processed += 1
+ x
+ end
+ enum.first.should == 1
+ enum.map { |x| x }.should == [1,2,3]
+ outputs_processed.should >= 4
+ input_mapper.num_processed.should >= 4
+ end
+
+ it "two simultaneous enumerations throws an exception", :focus do
+ enum = parallelizer.parallelize([1,2,3]) { |x| x }
+ a = enum.enum_for(:each)
+ a.next
+ expect do
+ b = enum.enum_for(:each)
+ b.next
+ end.to raise_error
+ end
end
context "With a Parallelizer with 0 threads" do