summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNikhil Benesch <nikhil.benesch@gmail.com>2014-03-09 22:57:21 -0400
committerClaire McQuin <claire@getchef.com>2014-05-13 15:21:15 -0700
commit89d5d20dcd494c7d42a1f24f8a3ab7712735267e (patch)
treefbcaa2b854d0dc54405485ec9fa063d52e4ad060
parentbea3c04f0bf8f89c637cc8073825188acb095bff (diff)
downloadchef-89d5d20dcd494c7d42a1f24f8a3ab7712735267e.tar.gz
CHEF-4423: extract cookbook upload threaded queue into utility class
-rw-r--r--lib/chef/cookbook_uploader.rb29
-rw-r--r--lib/chef/util/threaded_job_queue.rb61
-rw-r--r--spec/unit/util/threaded_job_queue_spec.rb51
3 files changed, 116 insertions, 25 deletions
diff --git a/lib/chef/cookbook_uploader.rb b/lib/chef/cookbook_uploader.rb
index 6524eed3e5..968673d87a 100644
--- a/lib/chef/cookbook_uploader.rb
+++ b/lib/chef/cookbook_uploader.rb
@@ -7,29 +7,12 @@ require 'chef/digester'
require 'chef/cookbook_version'
require 'chef/cookbook/syntax_check'
require 'chef/cookbook/file_system_file_vendor'
+require 'chef/util/threaded_job_queue'
require 'chef/sandbox'
-require 'thread'
class Chef
class CookbookUploader
- def self.work_queue
- @work_queue ||= Queue.new
- end
-
- def self.setup_worker_threads(concurrency=10)
- @worker_threads ||= begin
- work_queue
- (1..concurrency).map do
- Thread.new do
- loop do
- work_queue.pop.call
- end
- end
- end
- end
- end
-
attr_reader :cookbooks
attr_reader :path
attr_reader :opts
@@ -61,8 +44,6 @@ class Chef
end
def upload_cookbooks
- Thread.abort_on_exception = true
-
# Syntax Check
validate_cookbooks
# generate checksums of cookbook files and create a sandbox
@@ -77,7 +58,7 @@ class Chef
Chef::Log.info("Uploading files")
- self.class.setup_worker_threads(concurrency)
+ queue = Chef::Util::ThreadedJobQueue.new
checksums_to_upload = Set.new
@@ -86,15 +67,13 @@ class Chef
if info['needs_upload'] == true
checksums_to_upload << checksum
Chef::Log.info("Uploading #{checksum_files[checksum]} (checksum hex = #{checksum}) to #{info['url']}")
- self.class.work_queue << uploader_function_for(checksum_files[checksum], checksum, info['url'], checksums_to_upload)
+ queue << uploader_function_for(checksum_files[checksum], checksum, info['url'], checksums_to_upload)
else
Chef::Log.debug("#{checksum_files[checksum]} has not changed")
end
end
- until checksums_to_upload.empty?
- sleep 0.1
- end
+ queue.process(@concurrency)
sandbox_url = new_sandbox['uri']
Chef::Log.debug("Committing sandbox")
diff --git a/lib/chef/util/threaded_job_queue.rb b/lib/chef/util/threaded_job_queue.rb
new file mode 100644
index 0000000000..824cd0a3c4
--- /dev/null
+++ b/lib/chef/util/threaded_job_queue.rb
@@ -0,0 +1,61 @@
+# Copyright:: Copyright (c) 2014 Opscode, Inc.
+# License:: Apache License, Version 2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+require 'thread'
+
+class Chef
+ class Util
+ # A simple threaded job queue
+ #
+ # Create a queue:
+ #
+ # queue = ThreadedJobQueue.new
+ #
+ # Add jobs:
+ #
+ # queue << lambda { |lock| foo.the_bar }
+ #
+ # A job is a callable that optionally takes a Mutex instance as its only
+ # parameter.
+ #
+ # Then start processing jobs with +n+ threads:
+ #
+ # queue.process(n)
+ #
+ class ThreadedJobQueue
+ def initialize
+ @queue = Queue.new
+ @lock = Mutex.new
+ end
+
+ def <<(job)
+ @queue << job
+ end
+
+ def process(concurrency = 10)
+ workers = (1..concurrency).map do
+ Thread.new do
+ loop do
+ fn = @queue.pop
+ fn.arity == 1 ? fn.call(@lock) : fn.call
+ end
+ end
+ end
+ workers.each { |worker| self << Thread.method(:exit) }
+ workers.each { |worker| worker.join }
+ end
+ end
+ end
+end
diff --git a/spec/unit/util/threaded_job_queue_spec.rb b/spec/unit/util/threaded_job_queue_spec.rb
new file mode 100644
index 0000000000..a199937639
--- /dev/null
+++ b/spec/unit/util/threaded_job_queue_spec.rb
@@ -0,0 +1,51 @@
+# Copyright:: Copyright (c) 2014 Opscode, Inc.
+# License:: Apache License, Version 2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+require 'spec_helper'
+
+class WorkerThreadError < StandardError
+end
+
+describe Chef::Util::ThreadedJobQueue do
+ let(:queue) { Chef::Util::ThreadedJobQueue.new }
+
+ it "should pass mutex to jobs with an arity of 1" do
+ job = double()
+ job.should_receive(:arity).at_least(:once).and_return(1)
+ job.should_receive(:call).exactly(5).times.with(an_instance_of(Mutex))
+
+ 5.times { queue << job }
+ queue.process
+ end
+
+ it "should pass nothing to jobs with an arity of 0" do
+ job = double()
+ job.should_receive(:arity).at_least(:once).and_return(0)
+ job.should_receive(:call).exactly(5).times.with(no_args)
+
+ 5.times { queue << job }
+ queue.process
+ end
+
+ it "should use specified number of threads" do
+ Thread.should_receive(:new).exactly(7).times.and_call_original
+ queue.process(7)
+ end
+
+ it "should propagate exceptions to the main thread" do
+ queue << lambda { raise WorkerThreadError }
+ lambda { queue.process }.should raise_error(WorkerThreadError)
+ end
+end