diff options
author | Claire McQuin <mcquin@users.noreply.github.com> | 2014-05-15 14:01:26 -0700 |
---|---|---|
committer | Claire McQuin <mcquin@users.noreply.github.com> | 2014-05-15 14:01:26 -0700 |
commit | 831691edfb95a4f56d684819d698d81fdd517f2a (patch) | |
tree | f66344ac295175f572b422f58c1bb4d9942a01dc | |
parent | bea3c04f0bf8f89c637cc8073825188acb095bff (diff) | |
parent | 8bad4e917ac4f0da4cc6b3585bad17e66d6993df (diff) | |
download | chef-831691edfb95a4f56d684819d698d81fdd517f2a.tar.gz |
Merge pull request #1434 from opscode/CHEF-4423
Cookbook synchronization speedup (CHEF-4423)
-rw-r--r-- | CHANGELOG.md | 1 | ||||
-rw-r--r-- | CONTRIBUTIONS.md | 1 | ||||
-rw-r--r-- | DOC_CHANGES.md | 4 | ||||
-rw-r--r-- | lib/chef/config.rb | 6 | ||||
-rw-r--r-- | lib/chef/cookbook/synchronizer.rb | 106 | ||||
-rw-r--r-- | lib/chef/cookbook_uploader.rb | 29 | ||||
-rw-r--r-- | lib/chef/cookbook_version.rb | 20 | ||||
-rw-r--r-- | lib/chef/formatters/error_inspectors/api_error_formatting.rb | 19 | ||||
-rw-r--r-- | lib/chef/formatters/error_inspectors/cookbook_sync_error_inspector.rb | 4 | ||||
-rw-r--r-- | lib/chef/mixin/create_path.rb | 24 | ||||
-rw-r--r-- | lib/chef/util/threaded_job_queue.rb | 61 | ||||
-rw-r--r-- | spec/unit/util/threaded_job_queue_spec.rb | 51 |
12 files changed, 243 insertions, 83 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md index 870077b0ff..c3556f6e67 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,7 @@ * `freebsd_package` resource now uses the brand new "pkgng" package manager when available. (CHEF-4637) * chef-full template gets knife options to override install script url, add wget/curl cli options, and custom install commands (CHEF-4697) * knife now bootstraps node with the latest current version of chef-client. (CHEF-4911) +* Add a threaded download queue for synchronizing cookbooks in parallel. (CHEF-4423) ## Last Release: 11.12.0 RC1 (03/31/2014) * SIGTERM will once-more kill a non-daemonized chef-client (CHEF-5172) diff --git a/CONTRIBUTIONS.md b/CONTRIBUTIONS.md index 4b401521d2..892af90134 100644 --- a/CONTRIBUTIONS.md +++ b/CONTRIBUTIONS.md @@ -16,3 +16,4 @@ Example Contribution: * **hongbin**: Made bootstrap report authentication exceptions. (CHEF-5161) * **liseki**: Made `freebsd_package` resource use the brand new "pkgng" package manager when available.(CHEF-4637) +* **benesch**: Implemented a threaded download queue for synchronizing cookbooks. (CHEF-4423) diff --git a/DOC_CHANGES.md b/DOC_CHANGES.md index 2987bf19b2..00d15f7b18 100644 --- a/DOC_CHANGES.md +++ b/DOC_CHANGES.md @@ -33,3 +33,7 @@ You can now modify the chef-full template with the following options in `knife b * `--bootstrap-install-sh URL` fetches and executes an installation bash script from the provided URL. * `--bootstrap-wget-options OPTIONS` and `--bootstrap-curl-options OPTIONS` allow arbitrary options to be added to wget and curl. * `--bootstrap-install-command COMMAND` can be used to execute a custom chef-client installation command sequence. Take note that this cannot be used in conjunction with the above options. + +### Parallelize cookbook synchronization + +You can now synchronize your cookbooks faster by parallelizing the process. You can specify the number of helper threads in your config file with `cookbook_sync_threads NUM_THREADS`. The default is 10. Increasing `NUM_THREADS` can result in gateway errors from the chef server (namely 503 and 504). If you are experiencing these often, consider decreasing `NUM_THREADS` to fewer than default. diff --git a/lib/chef/config.rb b/lib/chef/config.rb index f9a3289b30..35b07c24ea 100644 --- a/lib/chef/config.rb +++ b/lib/chef/config.rb @@ -552,6 +552,12 @@ class Chef # immediately if 0.) default :run_lock_timeout, nil + # Number of worker threads for syncing cookbooks in parallel. Increasing + # this number can result in gateway errors from the server (namely 503 and 504). + # If you are seeing this behavior while using the default setting, reducing + # the number of threads will help. + default :cookbook_sync_threads, 10 + # If installed via an omnibus installer, this gives the path to the # "embedded" directory which contains all of the software packaged with # omnibus. This is used to locate the cacert.pem file on windows. diff --git a/lib/chef/cookbook/synchronizer.rb b/lib/chef/cookbook/synchronizer.rb index fc5d16617c..0e8f5e692e 100644 --- a/lib/chef/cookbook/synchronizer.rb +++ b/lib/chef/cookbook/synchronizer.rb @@ -1,4 +1,5 @@ require 'chef/client' +require 'chef/util/threaded_job_queue' require 'singleton' class Chef @@ -56,6 +57,8 @@ class Chef # Synchronizes the locally cached copies of cookbooks with the files on the # server. class CookbookSynchronizer + CookbookFile = Struct.new(:cookbook, :segment, :manifest_record) + def initialize(cookbooks_by_name, events) @eager_segments = Chef::CookbookVersion::COOKBOOK_SEGMENTS.dup unless Chef::Config[:no_lazy_load] @@ -87,6 +90,38 @@ class Chef @cookbooks_by_name.key?(cookbook_name) end + def files + @files ||= cookbooks.inject([]) do |memo, cookbook| + @eager_segments.each do |segment| + cookbook.manifest[segment].each do |manifest_record| + memo << CookbookFile.new(cookbook, segment, manifest_record) + end + end + memo + end + end + + def files_by_cookbook + files.group_by { |file| file.cookbook } + end + + def files_remaining_by_cookbook + @files_remaining_by_cookbook ||= begin + files_by_cookbook.inject({}) do |memo, (cookbook, files)| + memo[cookbook] = files.size + memo + end + end + end + + def mark_file_synced(file) + files_remaining_by_cookbook[file.cookbook] -= 1 + + if files_remaining_by_cookbook[file.cookbook] == 0 + @events.synchronized_cookbook(file.cookbook.name) + end + end + # Synchronizes all the cookbooks from the chef-server. #) # === Returns @@ -97,14 +132,19 @@ class Chef clear_obsoleted_cookbooks - @events.cookbook_sync_start(cookbook_count) + queue = Chef::Util::ThreadedJobQueue.new - # Synchronize each of the node's cookbooks, and add to the - # valid_cache_entries hash. - cookbooks.each do |cookbook| - sync_cookbook(cookbook) + files.each do |file| + queue << lambda do |lock| + sync_file(file) + lock.synchronize { mark_file_synced(file) } + end end + @events.cookbook_sync_start(cookbook_count) + queue.process(Chef::Config[:cookbook_sync_threads]) + update_cookbook_filenames + rescue Exception => e @events.cookbook_sync_failed(cookbooks, e) raise @@ -129,61 +169,43 @@ class Chef @events.cookbook_clean_complete end - # Sync the eagerly loaded files contained by +cookbook+ - # - # === Arguments - # cookbook<Chef::Cookbook>:: The cookbook to update - # valid_cache_entries<Hash>:: Out-param; Added to this hash are the files that - # were referred to by this cookbook - def sync_cookbook(cookbook) - Chef::Log.debug("Synchronizing cookbook #{cookbook.name} #{cookbook.version}") - - # files and templates are lazily loaded, and will be done later. - - @eager_segments.each do |segment| - segment_filenames = Array.new - cookbook.manifest[segment].each do |manifest_record| - - cache_filename = sync_file_in_cookbook(cookbook, manifest_record) - # make the segment filenames a full path. - full_path_cache_filename = cache.load(cache_filename, false) - segment_filenames << full_path_cache_filename - end + def update_cookbook_filenames + files_by_cookbook.each do |cookbook, cookbook_files| + files_by_segment = cookbook_files.group_by { |file| file.segment } + @eager_segments.each do |segment| + segment_files = files_by_segment[segment] + next unless segment_files - # replace segment filenames with a full-path one. - if segment.to_sym == :recipes - cookbook.recipe_filenames = segment_filenames - elsif segment.to_sym == :attributes - cookbook.attribute_filenames = segment_filenames - else - cookbook.segment_filenames(segment).replace(segment_filenames) + filenames = segment_files.map { |file| file.manifest_record['path'] } + cookbook.replace_segment_filenames(segment, filenames) end end - @events.synchronized_cookbook(cookbook.name) end # Sync an individual file if needed. If there is an up to date copy - # locally, nothing is done. + # locally, nothing is done. Updates +file+'s manifest with the full path to + # the cached file. # # === Arguments - # file_manifest::: A Hash of the form {"path" => 'relative/path', "url" => "location to fetch the file"} + # file<CookbookFile> # === Returns - # Path to the cached file as a String - def sync_file_in_cookbook(cookbook, file_manifest) - cache_filename = File.join("cookbooks", cookbook.name, file_manifest['path']) + # Full path to the cached file as a String + def sync_file(file) + cache_filename = File.join("cookbooks", file.cookbook.name, file.manifest_record['path']) mark_cached_file_valid(cache_filename) # If the checksums are different between on-disk (current) and on-server # (remote, per manifest), do the update. This will also execute if there # is no current checksum. - if !cached_copy_up_to_date?(cache_filename, file_manifest['checksum']) - download_file(file_manifest['url'], cache_filename) - @events.updated_cookbook_file(cookbook.name, cache_filename) + if !cached_copy_up_to_date?(cache_filename, file.manifest_record['checksum']) + download_file(file.manifest_record['url'], cache_filename) + @events.updated_cookbook_file(file.cookbook.name, cache_filename) else Chef::Log.debug("Not storing #{cache_filename}, as the cache is up to date.") end - cache_filename + # Update the manifest with the full path to the cached file + file.manifest_record['path'] = cache.load(cache_filename, false) end def cached_copy_up_to_date?(local_path, expected_checksum) diff --git a/lib/chef/cookbook_uploader.rb b/lib/chef/cookbook_uploader.rb index 6524eed3e5..968673d87a 100644 --- a/lib/chef/cookbook_uploader.rb +++ b/lib/chef/cookbook_uploader.rb @@ -7,29 +7,12 @@ require 'chef/digester' require 'chef/cookbook_version' require 'chef/cookbook/syntax_check' require 'chef/cookbook/file_system_file_vendor' +require 'chef/util/threaded_job_queue' require 'chef/sandbox' -require 'thread' class Chef class CookbookUploader - def self.work_queue - @work_queue ||= Queue.new - end - - def self.setup_worker_threads(concurrency=10) - @worker_threads ||= begin - work_queue - (1..concurrency).map do - Thread.new do - loop do - work_queue.pop.call - end - end - end - end - end - attr_reader :cookbooks attr_reader :path attr_reader :opts @@ -61,8 +44,6 @@ class Chef end def upload_cookbooks - Thread.abort_on_exception = true - # Syntax Check validate_cookbooks # generate checksums of cookbook files and create a sandbox @@ -77,7 +58,7 @@ class Chef Chef::Log.info("Uploading files") - self.class.setup_worker_threads(concurrency) + queue = Chef::Util::ThreadedJobQueue.new checksums_to_upload = Set.new @@ -86,15 +67,13 @@ class Chef if info['needs_upload'] == true checksums_to_upload << checksum Chef::Log.info("Uploading #{checksum_files[checksum]} (checksum hex = #{checksum}) to #{info['url']}") - self.class.work_queue << uploader_function_for(checksum_files[checksum], checksum, info['url'], checksums_to_upload) + queue << uploader_function_for(checksum_files[checksum], checksum, info['url'], checksums_to_upload) else Chef::Log.debug("#{checksum_files[checksum]} has not changed") end end - until checksums_to_upload.empty? - sleep 0.1 - end + queue.process(@concurrency) sandbox_url = new_sandbox['uri'] Chef::Log.debug("Committing sandbox") diff --git a/lib/chef/cookbook_version.rb b/lib/chef/cookbook_version.rb index 27dc8ef9e5..cbae3a552d 100644 --- a/lib/chef/cookbook_version.rb +++ b/lib/chef/cookbook_version.rb @@ -169,14 +169,7 @@ class Chef next unless @manifest.has_key?(segment) filenames = @manifest[segment].map{|manifest_record| manifest_record['name']} - if segment == :recipes - self.recipe_filenames = filenames - elsif segment == :attributes - self.attribute_filenames = filenames - else - segment_filenames(segment).clear - filenames.each { |filename| segment_filenames(segment) << filename } - end + replace_segment_filenames(segment, filenames) end end @@ -272,6 +265,17 @@ class Chef end end + def replace_segment_filenames(segment, filenames) + case segment.to_sym + when :recipes + self.recipe_filenames = filenames + when :attributes + self.attribute_filenames = filenames + else + segment_filenames(segment).replace(filenames) + end + end + # Query whether a template file +template_filename+ is available. File # specificity for the given +node+ is obeyed in the lookup. def has_template_for_node?(node, template_filename) diff --git a/lib/chef/formatters/error_inspectors/api_error_formatting.rb b/lib/chef/formatters/error_inspectors/api_error_formatting.rb index 1e4e258906..652d478b40 100644 --- a/lib/chef/formatters/error_inspectors/api_error_formatting.rb +++ b/lib/chef/formatters/error_inspectors/api_error_formatting.rb @@ -88,7 +88,7 @@ E def format_rest_error Array(Chef::JSONCompat.from_json(exception.response.body)["error"]).join('; ') rescue Exception - exception.response.body + safe_format_rest_error end def username @@ -107,6 +107,23 @@ E exception.response.body =~ /synchronize the clock/i end + def safe_format_rest_error + # When we get 504 from the server, sometimes the response body is non-readable. + # + # Stack trace: + # + # NoMethodError: undefined method `closed?' for nil:NilClass + # .../lib/ruby/1.9.1/net/http.rb:2789:in `stream_check' + # .../lib/ruby/1.9.1/net/http.rb:2709:in `read_body' + # .../lib/ruby/1.9.1/net/http.rb:2736:in `body' + # .../lib/chef/formatters/error_inspectors/api_error_formatting.rb:91:in `rescue in format_rest_error' + begin + exception.response.body + rescue Exception + "Cannot fetch the contents of the response." + end + end + end end end diff --git a/lib/chef/formatters/error_inspectors/cookbook_sync_error_inspector.rb b/lib/chef/formatters/error_inspectors/cookbook_sync_error_inspector.rb index 56a55a296b..0cb849a17f 100644 --- a/lib/chef/formatters/error_inspectors/cookbook_sync_error_inspector.rb +++ b/lib/chef/formatters/error_inspectors/cookbook_sync_error_inspector.rb @@ -65,7 +65,7 @@ class Chef when Net::HTTPNotFound when Net::HTTPInternalServerError describe_500_error(error_description) - when Net::HTTPBadGateway, Net::HTTPServiceUnavailable + when Net::HTTPBadGateway, Net::HTTPServiceUnavailable, Net::HTTPGatewayTimeOut describe_503_error(error_description) else describe_http_error(error_description) @@ -76,5 +76,3 @@ class Chef end end end - - diff --git a/lib/chef/mixin/create_path.rb b/lib/chef/mixin/create_path.rb index 9d1248e907..547224dda9 100644 --- a/lib/chef/mixin/create_path.rb +++ b/lib/chef/mixin/create_path.rb @@ -44,14 +44,30 @@ class Chef file_path.each_index do |i| create_path = File.join(file_path[0, i + 1]) - unless File.directory?(create_path) - Chef::Log.debug("Creating directory #{create_path}") - Dir.mkdir(create_path) - end + create_dir(create_path) unless File.directory?(create_path) end + File.expand_path(File.join(file_path)) end + private + + def create_dir(path) + begin + # When doing multithreaded downloads into the file cache, the following + # interleaving raises an error here: + # + # thread1 thread2 + # File.directory?(create_path) <- false + # File.directory?(create_path) <- false + # Dir.mkdir(create_path) + # Dir.mkdir(create_path) <- raises Errno::EEXIST + Chef::Log.debug("Creating directory #{path}") + Dir.mkdir(path) + rescue Errno::EEXIST + end + end + end end end diff --git a/lib/chef/util/threaded_job_queue.rb b/lib/chef/util/threaded_job_queue.rb new file mode 100644 index 0000000000..824cd0a3c4 --- /dev/null +++ b/lib/chef/util/threaded_job_queue.rb @@ -0,0 +1,61 @@ +# Copyright:: Copyright (c) 2014 Opscode, Inc. +# License:: Apache License, Version 2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +require 'thread' + +class Chef + class Util + # A simple threaded job queue + # + # Create a queue: + # + # queue = ThreadedJobQueue.new + # + # Add jobs: + # + # queue << lambda { |lock| foo.the_bar } + # + # A job is a callable that optionally takes a Mutex instance as its only + # parameter. + # + # Then start processing jobs with +n+ threads: + # + # queue.process(n) + # + class ThreadedJobQueue + def initialize + @queue = Queue.new + @lock = Mutex.new + end + + def <<(job) + @queue << job + end + + def process(concurrency = 10) + workers = (1..concurrency).map do + Thread.new do + loop do + fn = @queue.pop + fn.arity == 1 ? fn.call(@lock) : fn.call + end + end + end + workers.each { |worker| self << Thread.method(:exit) } + workers.each { |worker| worker.join } + end + end + end +end diff --git a/spec/unit/util/threaded_job_queue_spec.rb b/spec/unit/util/threaded_job_queue_spec.rb new file mode 100644 index 0000000000..a199937639 --- /dev/null +++ b/spec/unit/util/threaded_job_queue_spec.rb @@ -0,0 +1,51 @@ +# Copyright:: Copyright (c) 2014 Opscode, Inc. +# License:: Apache License, Version 2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +require 'spec_helper' + +class WorkerThreadError < StandardError +end + +describe Chef::Util::ThreadedJobQueue do + let(:queue) { Chef::Util::ThreadedJobQueue.new } + + it "should pass mutex to jobs with an arity of 1" do + job = double() + job.should_receive(:arity).at_least(:once).and_return(1) + job.should_receive(:call).exactly(5).times.with(an_instance_of(Mutex)) + + 5.times { queue << job } + queue.process + end + + it "should pass nothing to jobs with an arity of 0" do + job = double() + job.should_receive(:arity).at_least(:once).and_return(0) + job.should_receive(:call).exactly(5).times.with(no_args) + + 5.times { queue << job } + queue.process + end + + it "should use specified number of threads" do + Thread.should_receive(:new).exactly(7).times.and_call_original + queue.process(7) + end + + it "should propagate exceptions to the main thread" do + queue << lambda { raise WorkerThreadError } + lambda { queue.process }.should raise_error(WorkerThreadError) + end +end |