summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Keiser <jkeiser@opscode.com>2014-05-19 09:37:35 -0700
committerJohn Keiser <jkeiser@opscode.com>2014-05-19 09:37:35 -0700
commit9f0bb81b3c8f873456a8e18eff3bee24c573ac8c (patch)
treef9420d75542f9dee72020f512efe98d25fb6b3ef
parent0760983e7aff5bafe5b2c8c43891fbb3d0fd3ed3 (diff)
downloadchef-9f0bb81b3c8f873456a8e18eff3bee24c573ac8c.tar.gz
Prevent parallel first/take/drop from processing things it shouldn't
-rw-r--r--lib/chef/chef_fs/parallelizer/parallel_enumerable.rb71
-rw-r--r--spec/unit/chef_fs/parallelizer.rb158
2 files changed, 191 insertions, 38 deletions
diff --git a/lib/chef/chef_fs/parallelizer/parallel_enumerable.rb b/lib/chef/chef_fs/parallelizer/parallel_enumerable.rb
index 6bd4a14fc3..46a01cbb4c 100644
--- a/lib/chef/chef_fs/parallelizer/parallel_enumerable.rb
+++ b/lib/chef/chef_fs/parallelizer/parallel_enumerable.rb
@@ -17,9 +17,9 @@ class Chef
# on each() is allowed to process inputs. Default: true
# NOTE: If you set this to false, parallelizer.kill will stop each()
# in its tracks, so you need to know for sure that won't happen.
- def initialize(parent_task_queue, enumerable, options = {}, &block)
+ def initialize(parent_task_queue, input_enumerable, options = {}, &block)
@parent_task_queue = parent_task_queue
- @enumerable = enumerable
+ @input_enumerable = input_enumerable
@options = options
@block = block
@@ -28,6 +28,11 @@ class Chef
@unconsumed_output = Queue.new
end
+ attr_reader :parent_task_queue
+ attr_reader :input_enumerable
+ attr_reader :options
+ attr_reader :block
+
def each
each_with_input do |output, index, input, type|
yield output
@@ -72,13 +77,67 @@ class Chef
raise exception if exception
end
+ # Enumerable methods
+ def restricted_copy(enumerable)
+ ParallelEnumerable.new(@parent_task_queue, enumerable, @options, &@block)
+ end
+
+ alias :original_count :count
+
+ def count(*args, &block)
+ if args.size == 0 && block.nil?
+ @input_enumerable.count
+ else
+ original_count(*args, &block)
+ end
+ end
+
+ def first(n=nil)
+ if n
+ restricted_copy(@input_enumerable.first(n)).to_a
+ else
+ first(1)[0]
+ end
+ end
+
+ def drop(n)
+ restricted_copy(@input_enumerable.drop(n)).to_a
+ end
+
def flatten(levels = nil)
FlattenEnumerable.new(self, levels)
end
- # TODO efficient implementation for
- # count, first, drop, take, skip: run the method on the input enumerable
- # and use that as our each
+ def take(n)
+ restricted_copy(@input_enumerable.take(n)).to_a
+ end
+
+ class RestrictedLazy
+ def initialize(parallel_enumerable, actual_lazy)
+ @parallel_enumerable = parallel_enumerable
+ @actual_lazy = actual_lazy
+ end
+
+ def drop(*args, &block)
+ input = @parallel_enumerable.input_enumerable.lazy.drop(*args, &block)
+ @parallel_enumerable.restricted_copy(input)
+ end
+
+ def take(*args, &block)
+ input = @parallel_enumerable.input_enumerable.lazy.take(*args, &block)
+ @parallel_enumerable.restricted_copy(input)
+ end
+
+ def method_missing(method, *args, &block)
+ @actual_lazy.send(:method, *args, &block)
+ end
+ end
+
+ alias :original_lazy :lazy
+
+ def lazy
+ RestrictedLazy.new(self, original_lazy)
+ end
private
@@ -91,7 +150,7 @@ class Chef
# Grab all the inputs, yielding any responses during enumeration
# in case the enumeration itself takes time
begin
- @enumerable.each_with_index do |input, index|
+ @input_enumerable.each_with_index do |input, index|
@unconsumed_input.push([ input, index ])
@parent_task_queue.push(method(:process_one))
diff --git a/spec/unit/chef_fs/parallelizer.rb b/spec/unit/chef_fs/parallelizer.rb
index a91864fc2e..fa13d917a7 100644
--- a/spec/unit/chef_fs/parallelizer.rb
+++ b/spec/unit/chef_fs/parallelizer.rb
@@ -289,8 +289,9 @@ describe Chef::ChefFS::Parallelizer do
class InputMapper
include Enumerable
- def initialize(*values)
+ def initialize(*values, &block)
@values = values
+ @block = block
@num_processed = 0
end
@@ -301,43 +302,136 @@ describe Chef::ChefFS::Parallelizer do
@num_processed += 1
yield value
end
+ if @block
+ @block.call do |value|
+ @num_processed += 1
+ yield value
+ end
+ end
end
end
- it ".map twice on the same parallel enumerable returns the correct results and re-processes the input", :focus do
- outputs_processed = 0
- input_mapper = InputMapper.new(1,2,3)
- enum = parallelizer.parallelize(input_mapper) do |x|
- outputs_processed += 1
- x
- end
- enum.map { |x| x }.should == [1,2,3]
- enum.map { |x| x }.should == [1,2,3]
- outputs_processed.should == 6
- input_mapper.num_processed.should == 6
- end
+ context "enumerable methods should run efficiently" do
+ it ".count does not process anything" do
+ outputs_processed = 0
+ input_mapper = InputMapper.new(1,2,3,4,5,6)
+ enum = parallelizer.parallelize(input_mapper) do |x|
+ outputs_processed += 1
+ sleep(0.05) # Just enough to yield and get other inputs in the queue
+ x
+ end
+ enum.count.should == 6
+ outputs_processed.should == 0
+ input_mapper.num_processed.should == 6
+ end
+
+ it ".count with arguments works normally" do
+ outputs_processed = 0
+ input_mapper = InputMapper.new(1,1,1,1,2,2,2,3,3,4)
+ enum = parallelizer.parallelize(input_mapper) do |x|
+ outputs_processed += 1
+ x
+ end
+ enum.count { |x| x > 1 }.should == 6
+ enum.count(2).should == 3
+ outputs_processed.should == 20
+ input_mapper.num_processed.should == 20
+ end
- it ".first and then .map on the same parallel enumerable returns the correct results and re-processes the input", :focus do
- outputs_processed = 0
- input_mapper = InputMapper.new(1,2,3)
- enum = parallelizer.parallelize(input_mapper) do |x|
- outputs_processed += 1
- x
- end
- enum.first.should == 1
- enum.map { |x| x }.should == [1,2,3]
- outputs_processed.should >= 4
- input_mapper.num_processed.should >= 4
+ it ".first does not enumerate anything other than the first result(s)" do
+ outputs_processed = 0
+ input_mapper = InputMapper.new(1,2,3,4,5,6)
+ enum = parallelizer.parallelize(input_mapper) do |x|
+ outputs_processed += 1
+ sleep(0.05) # Just enough to yield and get other inputs in the queue
+ x
+ end
+ enum.first.should == 1
+ enum.first(2).should == [1,2]
+ outputs_processed.should == 3
+ input_mapper.num_processed.should == 3
+ end
+
+ it ".take does not enumerate anything other than the first result(s)" do
+ outputs_processed = 0
+ input_mapper = InputMapper.new(1,2,3,4,5,6)
+ enum = parallelizer.parallelize(input_mapper) do |x|
+ outputs_processed += 1
+ sleep(0.05) # Just enough to yield and get other inputs in the queue
+ x
+ end
+ enum.take(2).should == [1,2]
+ enum.lazy.take(2).to_a.should == [1,2]
+ outputs_processed.should == 4
+ input_mapper.num_processed.should == 4
+ end
+
+ it ".drop does not process anything other than the last result(s)" do
+ outputs_processed = 0
+ input_mapper = InputMapper.new(1,2,3,4,5,6)
+ enum = parallelizer.parallelize(input_mapper) do |x|
+ outputs_processed += 1
+ sleep(0.05) # Just enough to yield and get other inputs in the queue
+ x
+ end
+ enum.drop(2).should == [3,4,5,6]
+ enum.lazy.drop(2).to_a.should == [3,4,5,6]
+ outputs_processed.should == 8
+ input_mapper.num_processed.should == 12
+ end
+
+ it "lazy enumerable is actually lazy" do
+ outputs_processed = 0
+ input_mapper = InputMapper.new(1,2,3,4,5,6)
+ enum = parallelizer.parallelize(input_mapper) do |x|
+ outputs_processed += 1
+ sleep(0.05) # Just enough to yield and get other inputs in the queue
+ x
+ end
+ enum.lazy.take(2)
+ enum.lazy.drop(2)
+ sleep(0.1)
+ outputs_processed.should == 0
+ input_mapper.num_processed.should == 0
+ end
end
- it "two simultaneous enumerations throws an exception", :focus do
- enum = parallelizer.parallelize([1,2,3]) { |x| x }
- a = enum.enum_for(:each)
- a.next
- expect do
- b = enum.enum_for(:each)
- b.next
- end.to raise_error
+ context "running enumerable multiple times should function correctly" do
+ it ".map twice on the same parallel enumerable returns the correct results and re-processes the input" do
+ outputs_processed = 0
+ input_mapper = InputMapper.new(1,2,3)
+ enum = parallelizer.parallelize(input_mapper) do |x|
+ outputs_processed += 1
+ x
+ end
+ enum.map { |x| x }.should == [1,2,3]
+ enum.map { |x| x }.should == [1,2,3]
+ outputs_processed.should == 6
+ input_mapper.num_processed.should == 6
+ end
+
+ it ".first and then .map on the same parallel enumerable returns the correct results and re-processes the input" do
+ outputs_processed = 0
+ input_mapper = InputMapper.new(1,2,3)
+ enum = parallelizer.parallelize(input_mapper) do |x|
+ outputs_processed += 1
+ x
+ end
+ enum.first.should == 1
+ enum.map { |x| x }.should == [1,2,3]
+ outputs_processed.should >= 4
+ input_mapper.num_processed.should >= 4
+ end
+
+ it "two simultaneous enumerations throws an exception" do
+ enum = parallelizer.parallelize([1,2,3]) { |x| x }
+ a = enum.enum_for(:each)
+ a.next
+ expect do
+ b = enum.enum_for(:each)
+ b.next
+ end.to raise_error
+ end
end
end