diff options
-rw-r--r-- | lib/chef/chef_fs/parallelizer/parallel_enumerable.rb | 71 | ||||
-rw-r--r-- | spec/unit/chef_fs/parallelizer.rb | 158 |
2 files changed, 191 insertions, 38 deletions
diff --git a/lib/chef/chef_fs/parallelizer/parallel_enumerable.rb b/lib/chef/chef_fs/parallelizer/parallel_enumerable.rb index 6bd4a14fc3..46a01cbb4c 100644 --- a/lib/chef/chef_fs/parallelizer/parallel_enumerable.rb +++ b/lib/chef/chef_fs/parallelizer/parallel_enumerable.rb @@ -17,9 +17,9 @@ class Chef # on each() is allowed to process inputs. Default: true # NOTE: If you set this to false, parallelizer.kill will stop each() # in its tracks, so you need to know for sure that won't happen. - def initialize(parent_task_queue, enumerable, options = {}, &block) + def initialize(parent_task_queue, input_enumerable, options = {}, &block) @parent_task_queue = parent_task_queue - @enumerable = enumerable + @input_enumerable = input_enumerable @options = options @block = block @@ -28,6 +28,11 @@ class Chef @unconsumed_output = Queue.new end + attr_reader :parent_task_queue + attr_reader :input_enumerable + attr_reader :options + attr_reader :block + def each each_with_input do |output, index, input, type| yield output @@ -72,13 +77,67 @@ class Chef raise exception if exception end + # Enumerable methods + def restricted_copy(enumerable) + ParallelEnumerable.new(@parent_task_queue, enumerable, @options, &@block) + end + + alias :original_count :count + + def count(*args, &block) + if args.size == 0 && block.nil? + @input_enumerable.count + else + original_count(*args, &block) + end + end + + def first(n=nil) + if n + restricted_copy(@input_enumerable.first(n)).to_a + else + first(1)[0] + end + end + + def drop(n) + restricted_copy(@input_enumerable.drop(n)).to_a + end + def flatten(levels = nil) FlattenEnumerable.new(self, levels) end - # TODO efficient implementation for - # count, first, drop, take, skip: run the method on the input enumerable - # and use that as our each + def take(n) + restricted_copy(@input_enumerable.take(n)).to_a + end + + class RestrictedLazy + def initialize(parallel_enumerable, actual_lazy) + @parallel_enumerable = parallel_enumerable + @actual_lazy = actual_lazy + end + + def drop(*args, &block) + input = @parallel_enumerable.input_enumerable.lazy.drop(*args, &block) + @parallel_enumerable.restricted_copy(input) + end + + def take(*args, &block) + input = @parallel_enumerable.input_enumerable.lazy.take(*args, &block) + @parallel_enumerable.restricted_copy(input) + end + + def method_missing(method, *args, &block) + @actual_lazy.send(:method, *args, &block) + end + end + + alias :original_lazy :lazy + + def lazy + RestrictedLazy.new(self, original_lazy) + end private @@ -91,7 +150,7 @@ class Chef # Grab all the inputs, yielding any responses during enumeration # in case the enumeration itself takes time begin - @enumerable.each_with_index do |input, index| + @input_enumerable.each_with_index do |input, index| @unconsumed_input.push([ input, index ]) @parent_task_queue.push(method(:process_one)) diff --git a/spec/unit/chef_fs/parallelizer.rb b/spec/unit/chef_fs/parallelizer.rb index a91864fc2e..fa13d917a7 100644 --- a/spec/unit/chef_fs/parallelizer.rb +++ b/spec/unit/chef_fs/parallelizer.rb @@ -289,8 +289,9 @@ describe Chef::ChefFS::Parallelizer do class InputMapper include Enumerable - def initialize(*values) + def initialize(*values, &block) @values = values + @block = block @num_processed = 0 end @@ -301,43 +302,136 @@ describe Chef::ChefFS::Parallelizer do @num_processed += 1 yield value end + if @block + @block.call do |value| + @num_processed += 1 + yield value + end + end end end - it ".map twice on the same parallel enumerable returns the correct results and re-processes the input", :focus do - outputs_processed = 0 - input_mapper = InputMapper.new(1,2,3) - enum = parallelizer.parallelize(input_mapper) do |x| - outputs_processed += 1 - x - end - enum.map { |x| x }.should == [1,2,3] - enum.map { |x| x }.should == [1,2,3] - outputs_processed.should == 6 - input_mapper.num_processed.should == 6 - end + context "enumerable methods should run efficiently" do + it ".count does not process anything" do + outputs_processed = 0 + input_mapper = InputMapper.new(1,2,3,4,5,6) + enum = parallelizer.parallelize(input_mapper) do |x| + outputs_processed += 1 + sleep(0.05) # Just enough to yield and get other inputs in the queue + x + end + enum.count.should == 6 + outputs_processed.should == 0 + input_mapper.num_processed.should == 6 + end + + it ".count with arguments works normally" do + outputs_processed = 0 + input_mapper = InputMapper.new(1,1,1,1,2,2,2,3,3,4) + enum = parallelizer.parallelize(input_mapper) do |x| + outputs_processed += 1 + x + end + enum.count { |x| x > 1 }.should == 6 + enum.count(2).should == 3 + outputs_processed.should == 20 + input_mapper.num_processed.should == 20 + end - it ".first and then .map on the same parallel enumerable returns the correct results and re-processes the input", :focus do - outputs_processed = 0 - input_mapper = InputMapper.new(1,2,3) - enum = parallelizer.parallelize(input_mapper) do |x| - outputs_processed += 1 - x - end - enum.first.should == 1 - enum.map { |x| x }.should == [1,2,3] - outputs_processed.should >= 4 - input_mapper.num_processed.should >= 4 + it ".first does not enumerate anything other than the first result(s)" do + outputs_processed = 0 + input_mapper = InputMapper.new(1,2,3,4,5,6) + enum = parallelizer.parallelize(input_mapper) do |x| + outputs_processed += 1 + sleep(0.05) # Just enough to yield and get other inputs in the queue + x + end + enum.first.should == 1 + enum.first(2).should == [1,2] + outputs_processed.should == 3 + input_mapper.num_processed.should == 3 + end + + it ".take does not enumerate anything other than the first result(s)" do + outputs_processed = 0 + input_mapper = InputMapper.new(1,2,3,4,5,6) + enum = parallelizer.parallelize(input_mapper) do |x| + outputs_processed += 1 + sleep(0.05) # Just enough to yield and get other inputs in the queue + x + end + enum.take(2).should == [1,2] + enum.lazy.take(2).to_a.should == [1,2] + outputs_processed.should == 4 + input_mapper.num_processed.should == 4 + end + + it ".drop does not process anything other than the last result(s)" do + outputs_processed = 0 + input_mapper = InputMapper.new(1,2,3,4,5,6) + enum = parallelizer.parallelize(input_mapper) do |x| + outputs_processed += 1 + sleep(0.05) # Just enough to yield and get other inputs in the queue + x + end + enum.drop(2).should == [3,4,5,6] + enum.lazy.drop(2).to_a.should == [3,4,5,6] + outputs_processed.should == 8 + input_mapper.num_processed.should == 12 + end + + it "lazy enumerable is actually lazy" do + outputs_processed = 0 + input_mapper = InputMapper.new(1,2,3,4,5,6) + enum = parallelizer.parallelize(input_mapper) do |x| + outputs_processed += 1 + sleep(0.05) # Just enough to yield and get other inputs in the queue + x + end + enum.lazy.take(2) + enum.lazy.drop(2) + sleep(0.1) + outputs_processed.should == 0 + input_mapper.num_processed.should == 0 + end end - it "two simultaneous enumerations throws an exception", :focus do - enum = parallelizer.parallelize([1,2,3]) { |x| x } - a = enum.enum_for(:each) - a.next - expect do - b = enum.enum_for(:each) - b.next - end.to raise_error + context "running enumerable multiple times should function correctly" do + it ".map twice on the same parallel enumerable returns the correct results and re-processes the input" do + outputs_processed = 0 + input_mapper = InputMapper.new(1,2,3) + enum = parallelizer.parallelize(input_mapper) do |x| + outputs_processed += 1 + x + end + enum.map { |x| x }.should == [1,2,3] + enum.map { |x| x }.should == [1,2,3] + outputs_processed.should == 6 + input_mapper.num_processed.should == 6 + end + + it ".first and then .map on the same parallel enumerable returns the correct results and re-processes the input" do + outputs_processed = 0 + input_mapper = InputMapper.new(1,2,3) + enum = parallelizer.parallelize(input_mapper) do |x| + outputs_processed += 1 + x + end + enum.first.should == 1 + enum.map { |x| x }.should == [1,2,3] + outputs_processed.should >= 4 + input_mapper.num_processed.should >= 4 + end + + it "two simultaneous enumerations throws an exception" do + enum = parallelizer.parallelize([1,2,3]) { |x| x } + a = enum.enum_for(:each) + a.next + expect do + b = enum.enum_for(:each) + b.next + end.to raise_error + end end end |