diff options
author | Nikhil Benesch <nikhil.benesch@gmail.com> | 2014-03-09 22:57:21 -0400 |
---|---|---|
committer | Claire McQuin <claire@getchef.com> | 2014-05-13 15:21:15 -0700 |
commit | 89d5d20dcd494c7d42a1f24f8a3ab7712735267e (patch) | |
tree | fbcaa2b854d0dc54405485ec9fa063d52e4ad060 | |
parent | bea3c04f0bf8f89c637cc8073825188acb095bff (diff) | |
download | chef-89d5d20dcd494c7d42a1f24f8a3ab7712735267e.tar.gz |
CHEF-4423: extract cookbook upload threaded queue into utility class
-rw-r--r-- | lib/chef/cookbook_uploader.rb | 29 | ||||
-rw-r--r-- | lib/chef/util/threaded_job_queue.rb | 61 | ||||
-rw-r--r-- | spec/unit/util/threaded_job_queue_spec.rb | 51 |
3 files changed, 116 insertions, 25 deletions
diff --git a/lib/chef/cookbook_uploader.rb b/lib/chef/cookbook_uploader.rb index 6524eed3e5..968673d87a 100644 --- a/lib/chef/cookbook_uploader.rb +++ b/lib/chef/cookbook_uploader.rb @@ -7,29 +7,12 @@ require 'chef/digester' require 'chef/cookbook_version' require 'chef/cookbook/syntax_check' require 'chef/cookbook/file_system_file_vendor' +require 'chef/util/threaded_job_queue' require 'chef/sandbox' -require 'thread' class Chef class CookbookUploader - def self.work_queue - @work_queue ||= Queue.new - end - - def self.setup_worker_threads(concurrency=10) - @worker_threads ||= begin - work_queue - (1..concurrency).map do - Thread.new do - loop do - work_queue.pop.call - end - end - end - end - end - attr_reader :cookbooks attr_reader :path attr_reader :opts @@ -61,8 +44,6 @@ class Chef end def upload_cookbooks - Thread.abort_on_exception = true - # Syntax Check validate_cookbooks # generate checksums of cookbook files and create a sandbox @@ -77,7 +58,7 @@ class Chef Chef::Log.info("Uploading files") - self.class.setup_worker_threads(concurrency) + queue = Chef::Util::ThreadedJobQueue.new checksums_to_upload = Set.new @@ -86,15 +67,13 @@ class Chef if info['needs_upload'] == true checksums_to_upload << checksum Chef::Log.info("Uploading #{checksum_files[checksum]} (checksum hex = #{checksum}) to #{info['url']}") - self.class.work_queue << uploader_function_for(checksum_files[checksum], checksum, info['url'], checksums_to_upload) + queue << uploader_function_for(checksum_files[checksum], checksum, info['url'], checksums_to_upload) else Chef::Log.debug("#{checksum_files[checksum]} has not changed") end end - until checksums_to_upload.empty? - sleep 0.1 - end + queue.process(@concurrency) sandbox_url = new_sandbox['uri'] Chef::Log.debug("Committing sandbox") diff --git a/lib/chef/util/threaded_job_queue.rb b/lib/chef/util/threaded_job_queue.rb new file mode 100644 index 0000000000..824cd0a3c4 --- /dev/null +++ b/lib/chef/util/threaded_job_queue.rb @@ -0,0 +1,61 @@ +# Copyright:: Copyright (c) 2014 Opscode, Inc. +# License:: Apache License, Version 2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +require 'thread' + +class Chef + class Util + # A simple threaded job queue + # + # Create a queue: + # + # queue = ThreadedJobQueue.new + # + # Add jobs: + # + # queue << lambda { |lock| foo.the_bar } + # + # A job is a callable that optionally takes a Mutex instance as its only + # parameter. + # + # Then start processing jobs with +n+ threads: + # + # queue.process(n) + # + class ThreadedJobQueue + def initialize + @queue = Queue.new + @lock = Mutex.new + end + + def <<(job) + @queue << job + end + + def process(concurrency = 10) + workers = (1..concurrency).map do + Thread.new do + loop do + fn = @queue.pop + fn.arity == 1 ? fn.call(@lock) : fn.call + end + end + end + workers.each { |worker| self << Thread.method(:exit) } + workers.each { |worker| worker.join } + end + end + end +end diff --git a/spec/unit/util/threaded_job_queue_spec.rb b/spec/unit/util/threaded_job_queue_spec.rb new file mode 100644 index 0000000000..a199937639 --- /dev/null +++ b/spec/unit/util/threaded_job_queue_spec.rb @@ -0,0 +1,51 @@ +# Copyright:: Copyright (c) 2014 Opscode, Inc. +# License:: Apache License, Version 2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +require 'spec_helper' + +class WorkerThreadError < StandardError +end + +describe Chef::Util::ThreadedJobQueue do + let(:queue) { Chef::Util::ThreadedJobQueue.new } + + it "should pass mutex to jobs with an arity of 1" do + job = double() + job.should_receive(:arity).at_least(:once).and_return(1) + job.should_receive(:call).exactly(5).times.with(an_instance_of(Mutex)) + + 5.times { queue << job } + queue.process + end + + it "should pass nothing to jobs with an arity of 0" do + job = double() + job.should_receive(:arity).at_least(:once).and_return(0) + job.should_receive(:call).exactly(5).times.with(no_args) + + 5.times { queue << job } + queue.process + end + + it "should use specified number of threads" do + Thread.should_receive(:new).exactly(7).times.and_call_original + queue.process(7) + end + + it "should propagate exceptions to the main thread" do + queue << lambda { raise WorkerThreadError } + lambda { queue.process }.should raise_error(WorkerThreadError) + end +end |