summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorJohn Keiser <jkeiser@opscode.com>2014-05-19 15:05:25 -0700
committerJohn Keiser <jkeiser@opscode.com>2014-05-19 15:05:25 -0700
commitd6f24954a16aa5e2e60050e1344265353a381428 (patch)
tree7f65ec13eb62982f64ce4477976c084370ea7a16 /lib
parent15d4ff065806478d1c20bf78ce7c2a4c2fb74022 (diff)
downloadchef-d6f24954a16aa5e2e60050e1344265353a381428.tar.gz
Fix tests on 1.8.7
Diffstat (limited to 'lib')
-rw-r--r--lib/chef/chef_fs/parallelizer.rb19
-rw-r--r--lib/chef/chef_fs/parallelizer/parallel_enumerable.rb13
2 files changed, 17 insertions, 15 deletions
diff --git a/lib/chef/chef_fs/parallelizer.rb b/lib/chef/chef_fs/parallelizer.rb
index 8e49e155df..116a626869 100644
--- a/lib/chef/chef_fs/parallelizer.rb
+++ b/lib/chef/chef_fs/parallelizer.rb
@@ -86,16 +86,19 @@ class Chef
private
def worker_loop
- while !@stop_thread[Thread.current]
- begin
- task = @tasks.pop
- task.call
- rescue
- puts "ERROR #{$!}"
- puts $!.backtrace
+ begin
+ while !@stop_thread[Thread.current]
+ begin
+ task = @tasks.pop
+ task.call
+ rescue
+ puts "ERROR #{$!}"
+ puts $!.backtrace
+ end
end
+ ensure
+ @stop_thread.delete(Thread.current)
end
- @stop_thread.delete(Thread.current)
end
end
end
diff --git a/lib/chef/chef_fs/parallelizer/parallel_enumerable.rb b/lib/chef/chef_fs/parallelizer/parallel_enumerable.rb
index 7354bc5c82..8e50f361db 100644
--- a/lib/chef/chef_fs/parallelizer/parallel_enumerable.rb
+++ b/lib/chef/chef_fs/parallelizer/parallel_enumerable.rb
@@ -24,7 +24,7 @@ class Chef
@block = block
@unconsumed_input = Queue.new
- @in_process = 0
+ @in_process = {}
@unconsumed_output = Queue.new
end
@@ -191,7 +191,7 @@ class Chef
# If no one is working on our tasks and we're allowed to
# work on them in the main thread, process an input to
# move things forward.
- if @in_process == 0 && !(@options[:main_thread_processing] == false)
+ if @in_process.size == 0 && !(@options[:main_thread_processing] == false)
process_one
end
end
@@ -225,7 +225,7 @@ class Chef
def stop
@unconsumed_input.clear
- while @in_process > 0
+ while @in_process.size > 0
sleep(0.05)
end
@unconsumed_output.clear
@@ -244,11 +244,11 @@ class Chef
# existing outputs to the user.
#
def finished?
- @unconsumed_input.empty? && @in_process == 0 && @unconsumed_output.empty?
+ @unconsumed_input.empty? && @in_process.size == 0 && @unconsumed_output.empty?
end
def process_one
- @in_process += 1
+ @in_process[Thread.current] = true
begin
begin
input, index = @unconsumed_input.pop(true)
@@ -256,7 +256,7 @@ class Chef
rescue ThreadError
end
ensure
- @in_process -= 1
+ @in_process.delete(Thread.current)
end
end
@@ -264,7 +264,6 @@ class Chef
begin
output = @block.call(input)
@unconsumed_output.push([ output, index, input, :result ])
-
rescue
if @options[:stop_on_exception]
@unconsumed_input.clear