summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Keiser <jkeiser@opscode.com>2014-05-20 12:21:18 -0700
committerJohn Keiser <jkeiser@opscode.com>2014-05-20 12:21:18 -0700
commit8453ed5652da2498a5bb7e15e3468e3fbffa720f (patch)
treea837b9046f6b485129b99c4a61c976ce99ed7665
parent8a9b9bd5d186c300c4d1bd969b95b40a6f4d931c (diff)
parentd6f24954a16aa5e2e60050e1344265353a381428 (diff)
downloadchef-8453ed5652da2498a5bb7e15e3468e3fbffa720f.tar.gz
Merge parallelizer robustification and enhancements
-rw-r--r--lib/chef/chef_fs/command_line.rb8
-rw-r--r--lib/chef/chef_fs/file_system.rb6
-rw-r--r--lib/chef/chef_fs/parallelizer.rb156
-rw-r--r--lib/chef/chef_fs/parallelizer/flatten_enumerable.rb35
-rw-r--r--lib/chef/chef_fs/parallelizer/parallel_enumerable.rb279
-rw-r--r--lib/chef/knife/list.rb17
-rw-r--r--lib/chef/knife/show.rb5
-rw-r--r--spec/integration/knife/chef_repo_path_spec.rb2
-rw-r--r--spec/unit/chef_fs/parallelizer.rb482
9 files changed, 881 insertions, 109 deletions
diff --git a/lib/chef/chef_fs/command_line.rb b/lib/chef/chef_fs/command_line.rb
index d0183a5a2a..967c59ecae 100644
--- a/lib/chef/chef_fs/command_line.rb
+++ b/lib/chef/chef_fs/command_line.rb
@@ -129,9 +129,9 @@ class Chef
end
def self.diff(pattern, old_root, new_root, recurse_depth, get_content)
- Chef::ChefFS::Parallelizer.parallelize(Chef::ChefFS::FileSystem.list_pairs(pattern, old_root, new_root), :flatten => true) do |old_entry, new_entry|
+ Chef::ChefFS::Parallelizer.parallelize(Chef::ChefFS::FileSystem.list_pairs(pattern, old_root, new_root)) do |old_entry, new_entry|
diff_entries(old_entry, new_entry, recurse_depth, get_content)
- end
+ end.flatten(1)
end
# Diff two known entries (could be files or dirs)
@@ -142,9 +142,9 @@ class Chef
if recurse_depth == 0
return [ [ :common_subdirectories, old_entry, new_entry ] ]
else
- return Chef::ChefFS::Parallelizer.parallelize(Chef::ChefFS::FileSystem.child_pairs(old_entry, new_entry), :flatten => true) do |old_child, new_child|
+ return Chef::ChefFS::Parallelizer.parallelize(Chef::ChefFS::FileSystem.child_pairs(old_entry, new_entry)) do |old_child, new_child|
Chef::ChefFS::CommandLine.diff_entries(old_child, new_child, recurse_depth ? recurse_depth - 1 : nil, get_content)
- end
+ end.flatten(1)
end
# If old is a directory and new is a file
diff --git a/lib/chef/chef_fs/file_system.rb b/lib/chef/chef_fs/file_system.rb
index f2478c4680..9fe726744e 100644
--- a/lib/chef/chef_fs/file_system.rb
+++ b/lib/chef/chef_fs/file_system.rb
@@ -72,8 +72,8 @@ class Chef
# Otherwise, go through all children and find any matches
elsif entry.dir?
- results = Parallelizer::parallelize(entry.children, :flatten => true) { |child| Chef::ChefFS::FileSystem.list(child, pattern) }
- results.each(&block)
+ results = Parallelizer::parallelize(entry.children) { |child| Chef::ChefFS::FileSystem.list(child, pattern) }
+ results.flatten(1).each(&block)
end
end
end
@@ -419,7 +419,7 @@ class Chef
end
def self.parallel_do(enum, options = {}, &block)
- Chef::ChefFS::Parallelizer.parallelize(enum, options, &block).to_a
+ Chef::ChefFS::Parallelizer.parallel_do(enum, options, &block)
end
end
end
diff --git a/lib/chef/chef_fs/parallelizer.rb b/lib/chef/chef_fs/parallelizer.rb
index 84f3d4d870..116a626869 100644
--- a/lib/chef/chef_fs/parallelizer.rb
+++ b/lib/chef/chef_fs/parallelizer.rb
@@ -1,127 +1,103 @@
+require 'thread'
+require 'chef/chef_fs/parallelizer/parallel_enumerable'
+
class Chef
module ChefFS
+ # Tries to balance several guarantees, in order of priority:
+ # - don't get deadlocked
+ # - provide results in desired order
+ # - provide results as soon as they are available
+ # - process input as soon as possible
class Parallelizer
@@parallelizer = nil
@@threads = 0
def self.threads=(value)
- if @@threads != value
- @@threads = value
- @@parallelizer = nil
- end
+ @@threads = value
+ @@parallelizer.resize(value) if @@parallelizer
end
- def self.parallelize(enumerator, options = {}, &block)
+ def self.parallelizer
@@parallelizer ||= Parallelizer.new(@@threads)
- @@parallelizer.parallelize(enumerator, options, &block)
end
- def initialize(threads)
- @tasks_mutex = Mutex.new
- @tasks = []
- @threads = []
- 1.upto(threads) do
- @threads << Thread.new { worker_loop }
- end
+ def self.parallelize(enumerable, options = {}, &block)
+ parallelizer.parallelize(enumerable, options, &block)
end
- def parallelize(enumerator, options = {}, &block)
- task = ParallelizedResults.new(enumerator, options, &block)
- @tasks_mutex.synchronize do
- @tasks << task
- end
- task
+ def self.parallel_do(enumerable, options = {}, &block)
+ parallelizer.parallel_do(enumerable, options, &block)
end
- class ParallelizedResults
- include Enumerable
+ def initialize(num_threads)
+ @tasks = Queue.new
+ @threads = []
+ @stop_thread = {}
+ resize(num_threads)
+ end
- def initialize(enumerator, options, &block)
- @inputs = enumerator.to_a
- @options = options
- @block = block
+ def num_threads
+ @threads.size
+ end
- @mutex = Mutex.new
- @outputs = []
- @status = []
- end
+ def parallelize(enumerable, options = {}, &block)
+ ParallelEnumerable.new(@tasks, enumerable, options, &block)
+ end
- def each
- next_index = 0
- while true
- # Report any results that already exist
- while @status.length > next_index && ([:finished, :exception].include?(@status[next_index]))
- if @status[next_index] == :finished
- if @options[:flatten]
- @outputs[next_index].each do |entry|
- yield entry
- end
- else
- yield @outputs[next_index]
- end
- else
- raise @outputs[next_index]
- end
- next_index = next_index + 1
- end
+ def parallel_do(enumerable, options = {}, &block)
+ ParallelEnumerable.new(@tasks, enumerable, options.merge(:ordered => false), &block).wait
+ end
- # Pick up a result and process it, if there is one. This ensures we
- # move forward even if there are *zero* worker threads available.
- if !process_input
- # Exit if we're done.
- if next_index >= @status.length
- break
- else
- # Ruby 1.8 threading sucks. Wait till we process more things.
- sleep(0.05)
- end
- end
+ def stop(wait = true, timeout = nil)
+ resize(0, wait, timeout)
+ end
+
+ def resize(to_threads, wait = true, timeout = nil)
+ if to_threads < num_threads
+ threads_to_stop = @threads[to_threads..num_threads-1]
+ @threads = @threads.slice(0, to_threads)
+ threads_to_stop.each do |thread|
+ @stop_thread[thread] = true
end
- end
- def process_input
- # Grab the next one to process
- index, input = @mutex.synchronize do
- index = @status.length
- if index >= @inputs.length
- return nil
+ if wait
+ start_time = Time.now
+ threads_to_stop.each do |thread|
+ thread_timeout = timeout ? timeout - (Time.now - start_time) : nil
+ thread.join(thread_timeout)
end
- input = @inputs[index]
- @status[index] = :started
- [ index, input ]
end
- begin
- @outputs[index] = @block.call(input)
- @status[index] = :finished
- rescue Exception
- @outputs[index] = $!
- @status[index] = :exception
+ else
+ num_threads.upto(to_threads - 1) do |i|
+ @threads[i] = Thread.new(&method(:worker_loop))
end
- index
end
end
+ def kill
+ @threads.each do |thread|
+ Thread.kill(thread)
+ @stop_thread.delete(thread)
+ end
+ @threads = []
+ end
+
private
def worker_loop
- while true
- begin
- task = @tasks[0]
- if task
- if !task.process_input
- @tasks_mutex.synchronize do
- @tasks.delete(task)
- end
- end
- else
- # Ruby 1.8 threading sucks. Wait a bit to see if another task comes in.
- sleep(0.05)
+ begin
+ while !@stop_thread[Thread.current]
+ begin
+ task = @tasks.pop
+ task.call
+ rescue
+ puts "ERROR #{$!}"
+ puts $!.backtrace
end
- rescue
- puts "ERROR #{$!}"
- puts $!.backtrace
end
+ ensure
+ @stop_thread.delete(Thread.current)
end
end
end
diff --git a/lib/chef/chef_fs/parallelizer/flatten_enumerable.rb b/lib/chef/chef_fs/parallelizer/flatten_enumerable.rb
new file mode 100644
index 0000000000..7321aa0664
--- /dev/null
+++ b/lib/chef/chef_fs/parallelizer/flatten_enumerable.rb
@@ -0,0 +1,35 @@
+class Chef
+ module ChefFS
+ class Parallelizer
+      # Enumerable wrapper that yields the values of +enum+ with nested
+      # collections expanded in place. Values are handed straight to the
+      # caller's block as they are reached; no intermediate array is built.
+      class FlattenEnumerable
+        include Enumerable
+
+        # enum   - the enumerable whose values are flattened when iterated.
+        # levels - how many levels of nesting to expand; nil (the default)
+        #          expands nested values with no depth limit.
+        def initialize(enum, levels = nil)
+          @enum = enum
+          @levels = levels
+        end
+
+        attr_reader :enum
+        attr_reader :levels
+
+        # Iterates +enum+ and yields every value after flattening it up to
+        # +levels+ levels deep.
+        def each(&block)
+          enum.each do |value|
+            flatten(value, levels, &block)
+          end
+        end
+
+        private
+
+        # Yields +value+ itself when no levels remain (levels == 0), when it
+        # does not respond to :each, or when it is a String. Otherwise recurses
+        # into each child with one fewer level (a nil level count stays nil).
+        def flatten(value, levels, &block)
+          if levels != 0 && value.respond_to?(:each) && !value.is_a?(String)
+            value.each do |child|
+              flatten(child, levels.nil? ? levels : levels-1, &block)
+            end
+          else
+            block.call(value)
+          end
+        end
+      end
+ end
+ end
+end
diff --git a/lib/chef/chef_fs/parallelizer/parallel_enumerable.rb b/lib/chef/chef_fs/parallelizer/parallel_enumerable.rb
new file mode 100644
index 0000000000..8e50f361db
--- /dev/null
+++ b/lib/chef/chef_fs/parallelizer/parallel_enumerable.rb
@@ -0,0 +1,279 @@
+require 'chef/chef_fs/parallelizer/flatten_enumerable'
+
+class Chef
+ module ChefFS
+ class Parallelizer
+ class ParallelEnumerable
+ include Enumerable
+
+ # options:
+ # :ordered [true|false] - whether the output should stay in the same order
+ # as the input (even though it may not actually be processed in that
+ # order). Default: true
+ # :stop_on_exception [true|false] - if true, when an exception occurs in either
+ # input or output, we wait for any outstanding processing to complete,
+ # but will not process any new inputs. Default: false
+ # :main_thread_processing [true|false] - whether the main thread pulling
+ # on each() is allowed to process inputs. Default: true
+ # NOTE: If you set this to false, parallelizer.kill will stop each()
+ # in its tracks, so you need to know for sure that won't happen.
+        # parent_task_queue - queue onto which one process_one task is pushed
+        #                     per input when the enumerable is iterated.
+        # input_enumerable  - the inputs to run through the block.
+        # options           - see the option list above.
+        # block             - called with each input; its return value is the output.
+        def initialize(parent_task_queue, input_enumerable, options = {}, &block)
+          @parent_task_queue = parent_task_queue
+          @input_enumerable = input_enumerable
+          @options = options
+          @block = block
+
+          # [input, index] pairs that no thread has picked up yet
+          @unconsumed_input = Queue.new
+          # keyed by the threads currently inside process_one
+          @in_process = {}
+          # [output_or_exception, index, input, type] tuples not yet yielded
+          @unconsumed_output = Queue.new
+        end
+
+ attr_reader :parent_task_queue
+ attr_reader :input_enumerable
+ attr_reader :options
+ attr_reader :block
+
+        # Yields each output. Exceptions are raised as described on
+        # each_with_input, which does the actual work.
+        def each
+          each_with_input do |output, index, input, type|
+            yield output
+          end
+        end
+
+        # Yields each output together with the index of the input that
+        # produced it.
+        def each_with_index
+          each_with_input do |output, index, input|
+            yield output, index
+          end
+        end
+
+        # Yields (output, index, input) for every successful result.
+        #
+        # Exceptions are never yielded. With :ordered => false the first
+        # exception seen is remembered and raised only after every result has
+        # been enumerated; otherwise the exception is raised as soon as it is
+        # reached in the output sequence.
+        def each_with_input
+          exception = nil
+          each_with_exceptions do |output, index, input, type|
+            if type == :exception
+              if @options[:ordered] == false
+                exception ||= output
+              else
+                raise output
+              end
+            else
+              yield output, index, input
+            end
+          end
+          raise exception if exception
+        end
+
+        # Yields (output, index, input, type) for every result, including
+        # exceptions (type == :exception, with the exception as the output).
+        # Results come in input order unless :ordered is explicitly false.
+        def each_with_exceptions(&block)
+          if @options[:ordered] == false
+            each_with_exceptions_unordered(&block)
+          else
+            each_with_exceptions_ordered(&block)
+          end
+        end
+
+        # Runs every input through the block, discarding the outputs, and
+        # returns once all of them have been enumerated. If any exception was
+        # reported, the first one seen is raised at the end.
+        def wait
+          exception = nil
+          each_with_exceptions_unordered do |output, index, input, type|
+            exception ||= output if type == :exception
+          end
+          raise exception if exception
+        end
+
+ # Enumerable methods
+        # Builds a new ParallelEnumerable over +enumerable+ that shares this
+        # one's task queue, options and block.
+        def restricted_copy(enumerable)
+          ParallelEnumerable.new(@parent_task_queue, enumerable, @options, &@block)
+        end
+
+ alias :original_count :count
+
+        # With no argument and no block, counts the inputs directly, so the
+        # processing block is never run. Any other form falls back to the
+        # regular Enumerable#count (aliased as original_count), which
+        # enumerates the outputs.
+        def count(*args, &block)
+          if args.size == 0 && block.nil?
+            @input_enumerable.count
+          else
+            original_count(*args, &block)
+          end
+        end
+
+        # With +n+, returns an array of the outputs for only the first n
+        # inputs; the remaining inputs are never handed to the block.
+        # Without +n+, returns the single first output.
+        def first(n=nil)
+          if n
+            restricted_copy(@input_enumerable.first(n)).to_a
+          else
+            first(1)[0]
+          end
+        end
+
+        # Returns an array of the outputs for all inputs after the first n.
+        # The dropped inputs are skipped before processing, so the block is
+        # never run for them.
+        def drop(n)
+          restricted_copy(@input_enumerable.drop(n)).to_a
+        end
+
+        # Returns a FlattenEnumerable wrapping this enumerable. Nothing is
+        # processed here; levels is passed through to the wrapper
+        # (nil is the default).
+        def flatten(levels = nil)
+          FlattenEnumerable.new(self, levels)
+        end
+
+        # Returns an array of the outputs for only the first n inputs; the
+        # remaining inputs are never handed to the block.
+        def take(n)
+          restricted_copy(@input_enumerable.take(n)).to_a
+        end
+
+ if Enumerable.method_defined?(:lazy)
+          # Stand-in for the lazy enumerator of a ParallelEnumerable.
+          #
+          # drop and take are applied to the *input* enumerable, so inputs
+          # that are skipped are never processed. Every other method is
+          # forwarded to the real lazy enumerator over the outputs.
+          class RestrictedLazy
+            # parallel_enumerable - the ParallelEnumerable being wrapped.
+            # actual_lazy         - the lazy enumerator that receives all
+            #                       methods not overridden here.
+            def initialize(parallel_enumerable, actual_lazy)
+              @parallel_enumerable = parallel_enumerable
+              @actual_lazy = actual_lazy
+            end
+
+            # Lazily drops from the input and returns a new ParallelEnumerable
+            # over what remains.
+            def drop(*args, &block)
+              input = @parallel_enumerable.input_enumerable.lazy.drop(*args, &block)
+              @parallel_enumerable.restricted_copy(input)
+            end
+
+            # Lazily takes from the input and returns a new ParallelEnumerable
+            # over the taken inputs.
+            def take(*args, &block)
+              input = @parallel_enumerable.input_enumerable.lazy.take(*args, &block)
+              @parallel_enumerable.restricted_copy(input)
+            end
+
+            # Forwards any other call to the real lazy enumerator. The method
+            # name must be passed as the variable +method+: the symbol
+            # :method would invoke Object#method instead of the requested call.
+            def method_missing(method, *args, &block)
+              @actual_lazy.send(method, *args, &block)
+            end
+
+            # Keeps respond_to? consistent with what method_missing forwards.
+            def respond_to_missing?(method, include_private = false)
+              @actual_lazy.respond_to?(method, include_private) || super
+            end
+          end
+
+ alias :original_lazy :lazy
+
+          # Returns a RestrictedLazy wrapping the regular lazy enumerator
+          # (aliased as original_lazy), so that lazy.take and lazy.drop
+          # restrict the inputs rather than the outputs.
+          def lazy
+            RestrictedLazy.new(self, original_lazy)
+          end
+ end
+
+ private
+
+        # Core driver: queues every input for processing and yields
+        # (output, index, input, type) tuples as they show up in
+        # @unconsumed_output, in completion order.
+        #
+        # Raises if another enumeration of this object is already running.
+        # An exception rescued while the inputs are being enumerated is
+        # reported as a tuple with nil index and input and type :exception,
+        # rather than raised from here.
+        def each_with_exceptions_unordered
+          if @each_running
+            raise "each() called on parallel enumerable twice simultaneously! Bad mojo"
+          end
+          @each_running = true
+          begin
+            # Grab all the inputs, yielding any responses during enumeration
+            # in case the enumeration itself takes time
+            begin
+              @input_enumerable.each_with_index do |input, index|
+                # One queued input plus one process_one task on the shared queue
+                @unconsumed_input.push([ input, index ])
+                @parent_task_queue.push(method(:process_one))
+
+                stop_processing_input = false
+                while !@unconsumed_output.empty?
+                  output, index, input, type = @unconsumed_output.pop
+                  yield output, index, input, type
+                  if type == :exception && @options[:stop_on_exception]
+                    stop_processing_input = true
+                    break
+                  end
+                end
+
+                if stop_processing_input
+                  break
+                end
+              end
+            rescue
+              # We still want to wait for the rest of the outputs to process
+              @unconsumed_output.push([$!, nil, nil, :exception])
+              if @options[:stop_on_exception]
+                @unconsumed_input.clear
+              end
+            end
+
+            # All inputs are queued (or enumeration stopped early); drain
+            # until nothing is queued, in process, or waiting to be yielded.
+            while !finished?
+              # yield thread to others (for 1.8.7)
+              if @unconsumed_output.empty?
+                sleep(0.01)
+              end
+
+              while !@unconsumed_output.empty?
+                yield @unconsumed_output.pop
+              end
+
+              # If no one is working on our tasks and we're allowed to
+              # work on them in the main thread, process an input to
+              # move things forward.
+              if @in_process.size == 0 && !(@options[:main_thread_processing] == false)
+                process_one
+              end
+            end
+          ensure
+            # If someone called "first" or something that exits the enumerator
+            # early, we want to make sure and throw away any extra results
+            # (gracefully) so that the next enumerator can start over.
+            if !finished?
+              stop
+            end
+            @each_running = false
+          end
+        end
+
+        # Same tuples as each_with_exceptions_unordered, but yielded in input
+        # order. Results that arrive early are held in a hash keyed by index
+        # until every lower index has been yielded.
+        def each_with_exceptions_ordered
+          next_to_yield = 0
+          unconsumed = {}
+          each_with_exceptions_unordered do |output, index, input, type|
+            unconsumed[index] = [ output, input, type ]
+            while unconsumed[next_to_yield]
+              input_output = unconsumed.delete(next_to_yield)
+              yield input_output[0], next_to_yield, input_output[1], input_output[2]
+              next_to_yield += 1
+            end
+          end
+          # An exception from enumerating the input is stored under a nil
+          # index; it is yielded last, using the next index in sequence.
+          input_exception = unconsumed.delete(nil)
+          if input_exception
+            yield input_exception[0], next_to_yield, input_exception[1], input_exception[2]
+          end
+        end
+
+        # Abandons the current enumeration: drops inputs nobody has picked up,
+        # polls every 0.05s until no thread is still inside process_one, then
+        # throws away whatever outputs were produced.
+        def stop
+          @unconsumed_input.clear
+          while @in_process.size > 0
+            sleep(0.05)
+          end
+          @unconsumed_output.clear
+        end
+
+        #
+        # This is thread safe only if called from the main thread pulling on each().
+        # The order of these checks is important, as well, to be thread safe.
+        # 1. If @unconsumed_input.empty? is true, then we will never have any more
+        #    work legitimately picked up.
+        # 2. If @in_process.size == 0, then there is no work in process, and because
+        #    of #1 (@unconsumed_input is empty) it will never go back up, because
+        #    this is called after the input enumerator is finished. Note that switching #2 and #1
+        #    could cause a race, because in_process is incremented *before* consuming input.
+        # 3. If @unconsumed_output.empty? is true, then we are done with outputs.
+        # Thus, 1+2 means no more output will ever show up, and 3 means we've passed all
+        # existing outputs to the user.
+        #
+        def finished?
+          @unconsumed_input.empty? && @in_process.size == 0 && @unconsumed_output.empty?
+        end
+
+        # Takes one [input, index] pair off @unconsumed_input, if there is one,
+        # and runs it through process_input. This is the task pushed onto the
+        # parent task queue, and it is also called directly by the thread
+        # driving the enumeration.
+        def process_one
+          # Mark this thread as working *before* popping; finished? relies on that order.
+          @in_process[Thread.current] = true
+          begin
+            begin
+              # Non-blocking pop: raises ThreadError when the queue is empty
+              input, index = @unconsumed_input.pop(true)
+              process_input(input, index)
+            rescue ThreadError
+            end
+          ensure
+            @in_process.delete(Thread.current)
+          end
+        end
+
+        # Calls the block on +input+ and pushes the outcome onto
+        # @unconsumed_output: [output, index, input, :result] on success, or
+        # [exception, index, input, :exception] if the block raised. With
+        # :stop_on_exception set, a failure also discards all inputs that have
+        # not been picked up yet. Returns +index+.
+        def process_input(input, index)
+          begin
+            output = @block.call(input)
+            @unconsumed_output.push([ output, index, input, :result ])
+          rescue
+            if @options[:stop_on_exception]
+              @unconsumed_input.clear
+            end
+            @unconsumed_output.push([ $!, index, input, :exception ])
+          end
+
+          index
+        end
+ end
+ end
+ end
+end
diff --git a/lib/chef/knife/list.rb b/lib/chef/knife/list.rb
index 4338e195bd..137d61f3a5 100644
--- a/lib/chef/knife/list.rb
+++ b/lib/chef/knife/list.rb
@@ -43,21 +43,23 @@ class Chef
def run
patterns = name_args.length == 0 ? [""] : name_args
- # Get the matches (recursively)
- all_results = parallelize(pattern_args_from(patterns), :flatten => true) do |pattern|
- pattern_results = Chef::ChefFS::FileSystem.list(config[:local] ? local_fs : chef_fs, pattern)
+ # Get the top-level matches
+ args = pattern_args_from(patterns)
+ all_results = parallelize(pattern_args_from(patterns)) do |pattern|
+ pattern_results = Chef::ChefFS::FileSystem.list(config[:local] ? local_fs : chef_fs, pattern).to_a
if pattern_results.first && !pattern_results.first.exists? && pattern.exact_path
ui.error "#{format_path(pattern_results.first)}: No such file or directory"
self.exit_code = 1
end
pattern_results
- end
+ end.flatten(1).to_a
# Process directories
if !config[:bare_directories]
- dir_results = parallelize(all_results.select { |result| result.dir? }, :flatten => true) do |result|
+ dir_results = parallelize(all_results.select { |result| result.dir? }) do |result|
add_dir_result(result)
- end.to_a
+ end.flatten(1)
+
else
dir_results = []
end
@@ -109,7 +111,7 @@ class Chef
result = [ [ result, children ] ]
if config[:recursive]
child_dirs = children.select { |child| child.dir? }
- result += parallelize(child_dirs, :flatten => true) { |child| add_dir_result(child) }.to_a
+ result += parallelize(child_dirs) { |child| add_dir_result(child) }.flatten(1).to_a
end
result
end
@@ -152,4 +154,3 @@ class Chef
end
end
end
-
diff --git a/lib/chef/knife/show.rb b/lib/chef/knife/show.rb
index acf1996e96..4684a6ac7e 100644
--- a/lib/chef/knife/show.rb
+++ b/lib/chef/knife/show.rb
@@ -20,7 +20,7 @@ class Chef
def run
# Get the matches (recursively)
error = false
- entry_values = parallelize(pattern_args, :flatten => true) do |pattern|
+ entry_values = parallelize(pattern_args) do |pattern|
parallelize(Chef::ChefFS::FileSystem.list(config[:local] ? local_fs : chef_fs, pattern)) do |entry|
if entry.dir?
ui.error "#{format_path(entry)}: is a directory" if pattern.exact_path
@@ -40,7 +40,7 @@ class Chef
end
end
end
- end
+ end.flatten(1)
entry_values.each do |entry, value|
if entry
output "#{format_path(entry)}:"
@@ -54,4 +54,3 @@ class Chef
end
end
end
-
diff --git a/spec/integration/knife/chef_repo_path_spec.rb b/spec/integration/knife/chef_repo_path_spec.rb
index 4ffb179a4b..87619d8a58 100644
--- a/spec/integration/knife/chef_repo_path_spec.rb
+++ b/spec/integration/knife/chef_repo_path_spec.rb
@@ -101,7 +101,7 @@ EOM
/users/
/users/user3.json
EOM
- end
+ end
context 'when cwd is at the top level' do
cwd '.'
diff --git a/spec/unit/chef_fs/parallelizer.rb b/spec/unit/chef_fs/parallelizer.rb
new file mode 100644
index 0000000000..a871b60e98
--- /dev/null
+++ b/spec/unit/chef_fs/parallelizer.rb
@@ -0,0 +1,482 @@
+require 'spec_helper'
+require 'chef/chef_fs/parallelizer'
+
+describe Chef::ChefFS::Parallelizer do
+ before :each do
+ @start_time = Time.now
+ end
+
+ def elapsed_time
+ Time.now - @start_time
+ end
+
+ after :each do
+ parallelizer.kill
+ end
+
+ context 'With a Parallelizer with 5 threads' do
+ let :parallelizer do
+ Chef::ChefFS::Parallelizer.new(5)
+ end
+
+ def parallelize(inputs, options = {}, &block)
+ parallelizer.parallelize(inputs, { :main_thread_processing => false }.merge(options), &block)
+ end
+
+ it "parallel_do creates unordered output as soon as it is available" do
+ outputs = []
+ parallelizer.parallel_do([0.5,0.3,0.1]) do |val|
+ sleep val
+ outputs << val
+ end
+ elapsed_time.should < 0.6
+ outputs.should == [ 0.1, 0.3, 0.5 ]
+ end
+
+ context "With :ordered => false (unordered output)" do
+      it "An empty input produces an empty output" do
+        # .should is required: a bare `== []` is evaluated and discarded, so
+        # the example would pass whatever the output was.
+        parallelize([], :ordered => false) do
+          sleep 10
+        end.to_a.should == []
+        elapsed_time.should < 0.1
+      end
+
+ it "10 sleep(0.2)s complete within 0.5 seconds" do
+ parallelize(1.upto(10), :ordered => false) do |i|
+ sleep 0.2
+ 'x'
+ end.to_a.should == %w(x x x x x x x x x x)
+ elapsed_time.should < 0.5
+ end
+
+ it "The output comes as soon as it is available" do
+ enum = parallelize([0.5,0.3,0.1], :ordered => false) do |val|
+ sleep val
+ val
+ end
+ enum.map do |value|
+ elapsed_time.should < value+0.1
+ value
+ end.should == [ 0.1, 0.3, 0.5 ]
+ end
+
+ it "An exception in input is passed through but does NOT stop processing" do
+ input = TestEnumerable.new(0.5,0.3,0.1) do
+ raise 'hi'
+ end
+ enum = parallelize(input, :ordered => false) { |x| sleep(x); x }
+ results = []
+ expect { enum.each { |value| results << value } }.to raise_error 'hi'
+ results.should == [ 0.1, 0.3, 0.5 ]
+ elapsed_time.should < 0.6
+ end
+
+ it "Exceptions in output are raised after all processing is done" do
+ processed = 0
+ enum = parallelize([1,2,'x',3], :ordered => false) do |x|
+ if x == 'x'
+ sleep 0.1
+ raise 'hi'
+ end
+ sleep 0.2
+ processed += 1
+ x
+ end
+ results = []
+ expect { enum.each { |value| results << value } }.to raise_error 'hi'
+ results.sort.should == [ 1, 2, 3 ]
+ elapsed_time.should < 0.3
+ processed.should == 3
+ end
+
+ it "Exceptions with :stop_on_exception are raised after all processing is done" do
+ processed = 0
+ parallelized = parallelize([0.3,0.3,'x',0.3,0.3,0.3,0.3,0.3], :ordered => false, :stop_on_exception => true) do |x|
+ if x == 'x'
+ sleep(0.1)
+ raise 'hi'
+ end
+ sleep(x)
+ processed += 1
+ x
+ end
+ expect { parallelized.to_a }.to raise_error 'hi'
+ processed.should == 4
+ end
+ end
+
+ context "With :ordered => true (ordered output)" do
+      it "An empty input produces an empty output" do
+        # .should is required: a bare `== []` is evaluated and discarded, so
+        # the example would pass whatever the output was.
+        parallelize([]) do
+          sleep 10
+        end.to_a.should == []
+        elapsed_time.should < 0.1
+      end
+
+ it "10 sleep(0.2)s complete within 0.5 seconds" do
+ parallelize(1.upto(10), :ordered => true) do |i|
+ sleep 0.2
+ 'x'
+ end.to_a.should == %w(x x x x x x x x x x)
+ elapsed_time.should < 0.5
+ end
+
+ it "Output comes in the order of the input" do
+ enum = parallelize([0.5,0.3,0.1]) do |val|
+ sleep val
+ val
+ end.enum_for(:each_with_index)
+ enum.next.should == [ 0.5, 0 ]
+ enum.next.should == [ 0.3, 1 ]
+ enum.next.should == [ 0.1, 2 ]
+ elapsed_time.should < 0.6
+ end
+
+ it "Exceptions in input are raised in the correct sequence but do NOT stop processing" do
+ input = TestEnumerable.new(0.5,0.3,0.1) do
+ raise 'hi'
+ end
+ results = []
+ enum = parallelize(input) { |x| sleep(x); x }
+ expect { enum.each { |value| results << value } }.to raise_error 'hi'
+ elapsed_time.should < 0.6
+ results.should == [ 0.5, 0.3, 0.1 ]
+ end
+
+ it "Exceptions in output are raised in the correct sequence and running processes do NOT stop processing" do
+ processed = 0
+ enum = parallelize([1,2,'x',3]) do |x|
+ if x == 'x'
+ sleep(0.1)
+ raise 'hi'
+ end
+ sleep(0.2)
+ processed += 1
+ x
+ end
+ results = []
+ expect { enum.each { |value| results << value } }.to raise_error 'hi'
+ results.should == [ 1, 2 ]
+ elapsed_time.should < 0.3
+ processed.should == 3
+ end
+
+ it "Exceptions with :stop_on_exception are raised after all processing is done" do
+ processed = 0
+ parallelized = parallelize([0.3,0.3,'x',0.3,0.3,0.3,0.3,0.3], :ordered => false, :stop_on_exception => true) do |x|
+ if x == 'x'
+ sleep(0.1)
+ raise 'hi'
+ end
+ sleep(x)
+ processed += 1
+ x
+ end
+ expect { parallelized.to_a }.to raise_error 'hi'
+ processed.should == 4
+ end
+ end
+
+ it "When the input is slow, output still proceeds" do
+ input = TestEnumerable.new do |&block|
+ block.call(1)
+ sleep 0.1
+ block.call(2)
+ sleep 0.1
+ block.call(3)
+ sleep 0.1
+ end
+ enum = parallelize(input) { |x| x }
+ enum.map do |value|
+ elapsed_time.should < (value+1)*0.1
+ value
+ end.should == [ 1, 2, 3 ]
+ end
+ end
+
+ context "With a Parallelizer with 1 thread" do
+ let :parallelizer do
+ Chef::ChefFS::Parallelizer.new(1)
+ end
+
+ context "when the thread is occupied with a job" do
+ before :each do
+ parallelizer
+ started = false
+ @occupying_job_finished = occupying_job_finished = [ false ]
+ @thread = Thread.new do
+ begin
+ parallelizer.parallelize([0], :main_thread_processing => false) do |x|
+ started = true
+ sleep(0.3)
+ occupying_job_finished[0] = true
+ end.wait
+ ensure
+ end
+ end
+ while !started
+ sleep(0.01)
+ end
+ end
+
+ after :each do
+ if RUBY_VERSION.to_f > 1.8
+ Thread.kill(@thread)
+ end
+ end
+
+ it "parallelize with :main_thread_processing = true does not block" do
+ parallelizer.parallelize([1]) do |x|
+ sleep(0.1)
+ x
+ end.to_a.should == [ 1 ]
+ elapsed_time.should < 0.2
+ end
+
+ it "parallelize with :main_thread_processing = false waits for the job to finish" do
+ parallelizer.parallelize([1], :main_thread_processing => false) do |x|
+ sleep(0.1)
+ x+1
+ end.to_a.should == [ 2 ]
+ elapsed_time.should > 0.3
+ end
+
+ it "resizing the Parallelizer to 0 waits for the job to stop" do
+ elapsed_time.should < 0.2
+ parallelizer.resize(0)
+ parallelizer.num_threads.should == 0
+ elapsed_time.should > 0.25
+ @occupying_job_finished.should == [ true ]
+ end
+
+ it "stopping the Parallelizer waits for the job to finish" do
+ elapsed_time.should < 0.2
+ parallelizer.stop
+ parallelizer.num_threads.should == 0
+ elapsed_time.should > 0.25
+ @occupying_job_finished.should == [ true ]
+ end
+
+ it "resizing the Parallelizer to 2 does not stop the job" do
+ elapsed_time.should < 0.2
+ parallelizer.resize(2)
+ parallelizer.num_threads.should == 2
+ elapsed_time.should < 0.2
+ sleep(0.3)
+ @occupying_job_finished.should == [ true ]
+ end
+ end
+
+ context "enumerable methods should run efficiently" do
+ it ".count does not process anything" do
+ outputs_processed = 0
+ input_mapper = TestEnumerable.new(1,2,3,4,5,6)
+ enum = parallelizer.parallelize(input_mapper) do |x|
+ outputs_processed += 1
+ sleep(0.05) # Just enough to yield and get other inputs in the queue
+ x
+ end
+ enum.count.should == 6
+ outputs_processed.should == 0
+ input_mapper.num_processed.should == 6
+ end
+
+ it ".count with arguments works normally" do
+ outputs_processed = 0
+ input_mapper = TestEnumerable.new(1,1,1,1,2,2,2,3,3,4)
+ enum = parallelizer.parallelize(input_mapper) do |x|
+ outputs_processed += 1
+ x
+ end
+ enum.count { |x| x > 1 }.should == 6
+ enum.count(2).should == 3
+ outputs_processed.should == 20
+ input_mapper.num_processed.should == 20
+ end
+
+ it ".first does not enumerate anything other than the first result(s)" do
+ outputs_processed = 0
+ input_mapper = TestEnumerable.new(1,2,3,4,5,6)
+ enum = parallelizer.parallelize(input_mapper) do |x|
+ outputs_processed += 1
+ sleep(0.05) # Just enough to yield and get other inputs in the queue
+ x
+ end
+ enum.first.should == 1
+ enum.first(2).should == [1,2]
+ outputs_processed.should == 3
+ input_mapper.num_processed.should == 3
+ end
+
+ it ".take does not enumerate anything other than the first result(s)" do
+ outputs_processed = 0
+ input_mapper = TestEnumerable.new(1,2,3,4,5,6)
+ enum = parallelizer.parallelize(input_mapper) do |x|
+ outputs_processed += 1
+ sleep(0.05) # Just enough to yield and get other inputs in the queue
+ x
+ end
+ enum.take(2).should == [1,2]
+ outputs_processed.should == 2
+ input_mapper.num_processed.should == 2
+ end
+
+ it ".drop does not process anything other than the last result(s)" do
+ outputs_processed = 0
+ input_mapper = TestEnumerable.new(1,2,3,4,5,6)
+ enum = parallelizer.parallelize(input_mapper) do |x|
+ outputs_processed += 1
+ sleep(0.05) # Just enough to yield and get other inputs in the queue
+ x
+ end
+ enum.drop(2).should == [3,4,5,6]
+ outputs_processed.should == 4
+ input_mapper.num_processed.should == 6
+ end
+
+ if Enumerable.method_defined?(:lazy)
+ it ".lazy.take does not enumerate anything other than the first result(s)" do
+ outputs_processed = 0
+ input_mapper = TestEnumerable.new(1,2,3,4,5,6)
+ enum = parallelizer.parallelize(input_mapper) do |x|
+ outputs_processed += 1
+ sleep(0.05) # Just enough to yield and get other inputs in the queue
+ x
+ end
+ enum.lazy.take(2).to_a.should == [1,2]
+ outputs_processed.should == 2
+ input_mapper.num_processed.should == 2
+ end
+
+ it ".drop does not process anything other than the last result(s)" do
+ outputs_processed = 0
+ input_mapper = TestEnumerable.new(1,2,3,4,5,6)
+ enum = parallelizer.parallelize(input_mapper) do |x|
+ outputs_processed += 1
+ sleep(0.05) # Just enough to yield and get other inputs in the queue
+ x
+ end
+ enum.lazy.drop(2).to_a.should == [3,4,5,6]
+ outputs_processed.should == 4
+ input_mapper.num_processed.should == 6
+ end
+
+ it "lazy enumerable is actually lazy" do
+ outputs_processed = 0
+ input_mapper = TestEnumerable.new(1,2,3,4,5,6)
+ enum = parallelizer.parallelize(input_mapper) do |x|
+ outputs_processed += 1
+ sleep(0.05) # Just enough to yield and get other inputs in the queue
+ x
+ end
+ enum.lazy.take(2)
+ enum.lazy.drop(2)
+ sleep(0.1)
+ outputs_processed.should == 0
+ input_mapper.num_processed.should == 0
+ end
+ end
+ end
+
+ context "running enumerable multiple times should function correctly" do
+ it ".map twice on the same parallel enumerable returns the correct results and re-processes the input" do
+ outputs_processed = 0
+ input_mapper = TestEnumerable.new(1,2,3)
+ enum = parallelizer.parallelize(input_mapper) do |x|
+ outputs_processed += 1
+ x
+ end
+ enum.map { |x| x }.should == [1,2,3]
+ enum.map { |x| x }.should == [1,2,3]
+ outputs_processed.should == 6
+ input_mapper.num_processed.should == 6
+ end
+
+ it ".first and then .map on the same parallel enumerable returns the correct results and re-processes the input" do
+ outputs_processed = 0
+ input_mapper = TestEnumerable.new(1,2,3)
+ enum = parallelizer.parallelize(input_mapper) do |x|
+ outputs_processed += 1
+ x
+ end
+ enum.first.should == 1
+ enum.map { |x| x }.should == [1,2,3]
+ outputs_processed.should >= 4
+ input_mapper.num_processed.should >= 4
+ end
+
+ it "two simultaneous enumerations throws an exception" do
+ enum = parallelizer.parallelize([1,2,3]) { |x| x }
+ a = enum.enum_for(:each)
+ a.next
+ expect do
+ b = enum.enum_for(:each)
+ b.next
+ end.to raise_error
+ end
+ end
+ end
+
+ context "With a Parallelizer with 0 threads" do
+ let :parallelizer do
+ Chef::ChefFS::Parallelizer.new(0)
+ end
+
+ context "And main_thread_processing on" do
+ it "succeeds in running" do
+ parallelizer.parallelize([0.5]) { |x| x*2 }.to_a.should == [1]
+ end
+ end
+ end
+
+ context "With a Parallelizer with 10 threads" do
+ let :parallelizer do
+ Chef::ChefFS::Parallelizer.new(10)
+ end
+
+ it "does not have contention issues with large numbers of inputs" do
+ parallelizer.parallelize(1.upto(500)) { |x| x+1 }.to_a.should == 2.upto(501).to_a
+ end
+
+ it "does not have contention issues with large numbers of inputs with ordering off" do
+ parallelizer.parallelize(1.upto(500), :ordered => false) { |x| x+1 }.to_a.sort.should == 2.upto(501).to_a
+ end
+
+ it "does not have contention issues with large numbers of jobs and inputs with ordering off" do
+ parallelizers = 0.upto(99).map do
+ parallelizer.parallelize(1.upto(500)) { |x| x+1 }
+ end
+ outputs = []
+ threads = 0.upto(99).map do |i|
+ Thread.new { outputs[i] = parallelizers[i].to_a }
+ end
+ threads.each { |thread| thread.join }
+ outputs.each { |output| output.sort.should == 2.upto(501).to_a }
+ end
+ end
+
+  # Test helper: an Enumerable that counts how many values it has handed out,
+  # so the specs can check how much of the input was actually enumerated.
+  class TestEnumerable
+    include Enumerable
+
+    # values - yielded first, in order.
+    # block  - optional; called after the values with a yielder block, so it
+    #          can emit further values, sleep between them, or raise.
+    def initialize(*values, &block)
+      @values = values
+      @block = block
+      @num_processed = 0
+    end
+
+    # Number of values yielded so far, accumulated across all calls to each.
+    attr_reader :num_processed
+
+    def each(&each_block)
+      @values.each do |value|
+        @num_processed += 1
+        each_block.call(value)
+      end
+      if @block
+        # Values emitted by the custom block are counted the same way
+        @block.call do |value|
+          @num_processed += 1
+          each_block.call(value)
+        end
+      end
+    end
+  end
+end