diff options
author | John Keiser <jkeiser@opscode.com> | 2014-05-19 09:37:35 -0700 |
---|---|---|
committer | John Keiser <jkeiser@opscode.com> | 2014-05-19 09:37:35 -0700 |
commit | 9f0bb81b3c8f873456a8e18eff3bee24c573ac8c (patch) | |
tree | f9420d75542f9dee72020f512efe98d25fb6b3ef /lib/chef/chef_fs/parallelizer | |
parent | 0760983e7aff5bafe5b2c8c43891fbb3d0fd3ed3 (diff) | |
download | chef-9f0bb81b3c8f873456a8e18eff3bee24c573ac8c.tar.gz |
Prevent parallel first/take/drop from processing things it shouldn't
Diffstat (limited to 'lib/chef/chef_fs/parallelizer')
-rw-r--r-- | lib/chef/chef_fs/parallelizer/parallel_enumerable.rb | 71 |
1 files changed, 65 insertions, 6 deletions
diff --git a/lib/chef/chef_fs/parallelizer/parallel_enumerable.rb b/lib/chef/chef_fs/parallelizer/parallel_enumerable.rb index 6bd4a14fc3..46a01cbb4c 100644 --- a/lib/chef/chef_fs/parallelizer/parallel_enumerable.rb +++ b/lib/chef/chef_fs/parallelizer/parallel_enumerable.rb @@ -17,9 +17,9 @@ class Chef # on each() is allowed to process inputs. Default: true # NOTE: If you set this to false, parallelizer.kill will stop each() # in its tracks, so you need to know for sure that won't happen. - def initialize(parent_task_queue, enumerable, options = {}, &block) + def initialize(parent_task_queue, input_enumerable, options = {}, &block) @parent_task_queue = parent_task_queue - @enumerable = enumerable + @input_enumerable = input_enumerable @options = options @block = block @@ -28,6 +28,11 @@ class Chef @unconsumed_output = Queue.new end + attr_reader :parent_task_queue + attr_reader :input_enumerable + attr_reader :options + attr_reader :block + def each each_with_input do |output, index, input, type| yield output @@ -72,13 +77,67 @@ class Chef raise exception if exception end + # Enumerable methods + def restricted_copy(enumerable) + ParallelEnumerable.new(@parent_task_queue, enumerable, @options, &@block) + end + + alias :original_count :count + + def count(*args, &block) + if args.size == 0 && block.nil? + @input_enumerable.count + else + original_count(*args, &block) + end + end + + def first(n=nil) + if n + restricted_copy(@input_enumerable.first(n)).to_a + else + first(1)[0] + end + end + + def drop(n) + restricted_copy(@input_enumerable.drop(n)).to_a + end + def flatten(levels = nil) FlattenEnumerable.new(self, levels) end - # TODO efficient implementation for - # count, first, drop, take, skip: run the method on the input enumerable - # and use that as our each + def take(n) + restricted_copy(@input_enumerable.take(n)).to_a + end + + class RestrictedLazy + def initialize(parallel_enumerable, actual_lazy) + @parallel_enumerable = parallel_enumerable + @actual_lazy = actual_lazy + end + + def drop(*args, &block) + input = @parallel_enumerable.input_enumerable.lazy.drop(*args, &block) + @parallel_enumerable.restricted_copy(input) + end + + def take(*args, &block) + input = @parallel_enumerable.input_enumerable.lazy.take(*args, &block) + @parallel_enumerable.restricted_copy(input) + end + + def method_missing(method, *args, &block) + @actual_lazy.send(:method, *args, &block) + end + end + + alias :original_lazy :lazy + + def lazy + RestrictedLazy.new(self, original_lazy) + end private @@ -91,7 +150,7 @@ class Chef # Grab all the inputs, yielding any responses during enumeration # in case the enumeration itself takes time begin - @enumerable.each_with_index do |input, index| + @input_enumerable.each_with_index do |input, index| @unconsumed_input.push([ input, index ]) @parent_task_queue.push(method(:process_one)) |