summaryrefslogtreecommitdiff
path: root/lib/chef/chef_fs/parallelizer
diff options
context:
space:
mode:
authorJohn Keiser <jkeiser@opscode.com>2014-05-19 07:38:01 -0700
committerJohn Keiser <jkeiser@opscode.com>2014-05-19 07:38:01 -0700
commit0760983e7aff5bafe5b2c8c43891fbb3d0fd3ed3 (patch)
tree9dfdefc98291a46edde980577a13aa275f055274 /lib/chef/chef_fs/parallelizer
parentc28654606f6c2ef3211d0cc0cf8d02c29e9571b8 (diff)
downloadchef-0760983e7aff5bafe5b2c8c43891fbb3d0fd3ed3.tar.gz
Make parallel enumerable safe to restart
Diffstat (limited to 'lib/chef/chef_fs/parallelizer')
-rw-r--r--lib/chef/chef_fs/parallelizer/parallel_enumerable.rb96
1 files changed, 63 insertions, 33 deletions
diff --git a/lib/chef/chef_fs/parallelizer/parallel_enumerable.rb b/lib/chef/chef_fs/parallelizer/parallel_enumerable.rb
index b293497889..6bd4a14fc3 100644
--- a/lib/chef/chef_fs/parallelizer/parallel_enumerable.rb
+++ b/lib/chef/chef_fs/parallelizer/parallel_enumerable.rb
@@ -25,7 +25,6 @@ class Chef
@unconsumed_input = Queue.new
@in_process = 0
-
@unconsumed_output = Queue.new
end
@@ -77,49 +76,72 @@ class Chef
FlattenEnumerable.new(self, levels)
end
+ # TODO efficient implementation for
+ # count, first, drop, take, skip: run the method on the input enumerable
+ # and use that as our each
+
private
def each_with_exceptions_unordered
- # Grab all the inputs, yielding any responses during enumeration
- # in case the enumeration itself takes time
+ if @each_running
+ raise "each() called on parallel enumerable twice simultaneously! Bad mojo"
+ end
+ @each_running = true
begin
- @enumerable.each_with_index do |input, index|
- @unconsumed_input.push([ input, index ])
- @parent_task_queue.push(method(:process_one))
- no_more_inputs = false
- while !@unconsumed_output.empty?
- output, index, input, type = @unconsumed_output.pop
- yield output, index, input, type
- if type == :exception && @options[:stop_on_exception]
- no_more_inputs = true
+ # Grab all the inputs, yielding any responses during enumeration
+ # in case the enumeration itself takes time
+ begin
+ @enumerable.each_with_index do |input, index|
+ @unconsumed_input.push([ input, index ])
+ @parent_task_queue.push(method(:process_one))
+
+ stop_processing_input = false
+ while !@unconsumed_output.empty?
+ output, index, input, type = @unconsumed_output.pop
+ yield output, index, input, type
+ if type == :exception && @options[:stop_on_exception]
+ stop_processing_input = true
+ break
+ end
+ end
+
+ if stop_processing_input
+ break
end
end
- break if no_more_inputs
- end
- rescue
- # We still want to wait for the rest of the outputs to process
- @unconsumed_output.push([$!, nil, nil, :exception])
- if @options[:stop_on_exception]
- @unconsumed_input.clear
+ rescue
+ # We still want to wait for the rest of the outputs to process
+ @unconsumed_output.push([$!, nil, nil, :exception])
+ if @options[:stop_on_exception]
+ @unconsumed_input.clear
+ end
end
- end
- while !finished?
- # yield thread to others (for 1.8.7)
- if @unconsumed_output.empty?
- sleep(0.01)
- end
+ while !finished?
+ # yield thread to others (for 1.8.7)
+ if @unconsumed_output.empty?
+ sleep(0.01)
+ end
- while !@unconsumed_output.empty?
- yield @unconsumed_output.pop
- end
+ while !@unconsumed_output.empty?
+ yield @unconsumed_output.pop
+ end
- # If no one is working on our tasks and we're allowed to
- # work on them in the main thread, process an input to
- # move things forward.
- if @in_process == 0 && !(@options[:main_thread_processing] == false)
- process_one
+ # If no one is working on our tasks and we're allowed to
+ # work on them in the main thread, process an input to
+ # move things forward.
+ if @in_process == 0 && !(@options[:main_thread_processing] == false)
+ process_one
+ end
+ end
+ ensure
+ # If someone called "first" or something that exits the enumerator
+ # early, we want to make sure and throw away any extra results
+ # (gracefully) so that the next enumerator can start over.
+ if !finished?
+ stop
end
+ @each_running = false
end
end
@@ -140,6 +162,14 @@ class Chef
end
end
+ def stop
+ @unconsumed_input.clear
+ while @in_process > 0
+ sleep(0.05)
+ end
+ @unconsumed_output.clear
+ end
+
#
# This is thread safe only if called from the main thread pulling on each().
# The order of these checks is important, as well, to be thread safe.