summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorJohn Keiser <jkeiser@opscode.com>2014-05-19 09:37:35 -0700
committerJohn Keiser <jkeiser@opscode.com>2014-05-19 09:37:35 -0700
commit9f0bb81b3c8f873456a8e18eff3bee24c573ac8c (patch)
treef9420d75542f9dee72020f512efe98d25fb6b3ef /lib
parent0760983e7aff5bafe5b2c8c43891fbb3d0fd3ed3 (diff)
downloadchef-9f0bb81b3c8f873456a8e18eff3bee24c573ac8c.tar.gz
Prevent parallel first/take/drop from processing things it shouldn't
Diffstat (limited to 'lib')
-rw-r--r--lib/chef/chef_fs/parallelizer/parallel_enumerable.rb71
1 files changed, 65 insertions, 6 deletions
diff --git a/lib/chef/chef_fs/parallelizer/parallel_enumerable.rb b/lib/chef/chef_fs/parallelizer/parallel_enumerable.rb
index 6bd4a14fc3..46a01cbb4c 100644
--- a/lib/chef/chef_fs/parallelizer/parallel_enumerable.rb
+++ b/lib/chef/chef_fs/parallelizer/parallel_enumerable.rb
@@ -17,9 +17,9 @@ class Chef
# on each() is allowed to process inputs. Default: true
# NOTE: If you set this to false, parallelizer.kill will stop each()
# in its tracks, so you need to know for sure that won't happen.
- def initialize(parent_task_queue, enumerable, options = {}, &block)
+ def initialize(parent_task_queue, input_enumerable, options = {}, &block)
@parent_task_queue = parent_task_queue
- @enumerable = enumerable
+ @input_enumerable = input_enumerable
@options = options
@block = block
@@ -28,6 +28,11 @@ class Chef
@unconsumed_output = Queue.new
end
+ attr_reader :parent_task_queue
+ attr_reader :input_enumerable
+ attr_reader :options
+ attr_reader :block
+
def each
each_with_input do |output, index, input, type|
yield output
@@ -72,13 +77,67 @@ class Chef
raise exception if exception
end
+ # Enumerable methods
+ def restricted_copy(enumerable)
+ ParallelEnumerable.new(@parent_task_queue, enumerable, @options, &@block)
+ end
+
+ alias :original_count :count
+
+ def count(*args, &block)
+ if args.size == 0 && block.nil?
+ @input_enumerable.count
+ else
+ original_count(*args, &block)
+ end
+ end
+
+ def first(n=nil)
+ if n
+ restricted_copy(@input_enumerable.first(n)).to_a
+ else
+ first(1)[0]
+ end
+ end
+
+ def drop(n)
+ restricted_copy(@input_enumerable.drop(n)).to_a
+ end
+
def flatten(levels = nil)
FlattenEnumerable.new(self, levels)
end
- # TODO efficient implementation for
- # count, first, drop, take, skip: run the method on the input enumerable
- # and use that as our each
+ def take(n)
+ restricted_copy(@input_enumerable.take(n)).to_a
+ end
+
+ class RestrictedLazy
+ def initialize(parallel_enumerable, actual_lazy)
+ @parallel_enumerable = parallel_enumerable
+ @actual_lazy = actual_lazy
+ end
+
+ def drop(*args, &block)
+ input = @parallel_enumerable.input_enumerable.lazy.drop(*args, &block)
+ @parallel_enumerable.restricted_copy(input)
+ end
+
+ def take(*args, &block)
+ input = @parallel_enumerable.input_enumerable.lazy.take(*args, &block)
+ @parallel_enumerable.restricted_copy(input)
+ end
+
+ def method_missing(method, *args, &block)
+ @actual_lazy.send(:method, *args, &block)
+ end
+ end
+
+ alias :original_lazy :lazy
+
+ def lazy
+ RestrictedLazy.new(self, original_lazy)
+ end
private
@@ -91,7 +150,7 @@ class Chef
# Grab all the inputs, yielding any responses during enumeration
# in case the enumeration itself takes time
begin
- @enumerable.each_with_index do |input, index|
+ @input_enumerable.each_with_index do |input, index|
@unconsumed_input.push([ input, index ])
@parent_task_queue.push(method(:process_one))