summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Claire McQuin <mcquin@users.noreply.github.com> 2014-05-15 14:01:26 -0700
committer Claire McQuin <mcquin@users.noreply.github.com> 2014-05-15 14:01:26 -0700
commit 831691edfb95a4f56d684819d698d81fdd517f2a (patch)
tree f66344ac295175f572b422f58c1bb4d9942a01dc
parent bea3c04f0bf8f89c637cc8073825188acb095bff (diff)
parent 8bad4e917ac4f0da4cc6b3585bad17e66d6993df (diff)
download chef-831691edfb95a4f56d684819d698d81fdd517f2a.tar.gz
Merge pull request #1434 from opscode/CHEF-4423
Cookbook synchronization speedup (CHEF-4423)
-rw-r--r-- CHANGELOG.md 1
-rw-r--r-- CONTRIBUTIONS.md 1
-rw-r--r-- DOC_CHANGES.md 4
-rw-r--r-- lib/chef/config.rb 6
-rw-r--r-- lib/chef/cookbook/synchronizer.rb 106
-rw-r--r-- lib/chef/cookbook_uploader.rb 29
-rw-r--r-- lib/chef/cookbook_version.rb 20
-rw-r--r-- lib/chef/formatters/error_inspectors/api_error_formatting.rb 19
-rw-r--r-- lib/chef/formatters/error_inspectors/cookbook_sync_error_inspector.rb 4
-rw-r--r-- lib/chef/mixin/create_path.rb 24
-rw-r--r-- lib/chef/util/threaded_job_queue.rb 61
-rw-r--r-- spec/unit/util/threaded_job_queue_spec.rb 51
12 files changed, 243 insertions, 83 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 870077b0ff..c3556f6e67 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -24,6 +24,7 @@
* `freebsd_package` resource now uses the brand new "pkgng" package manager when available. (CHEF-4637)
* chef-full template gets knife options to override install script url, add wget/curl cli options, and custom install commands (CHEF-4697)
* knife now bootstraps node with the latest current version of chef-client. (CHEF-4911)
+* Add a threaded download queue for synchronizing cookbooks in parallel. (CHEF-4423)
## Last Release: 11.12.0 RC1 (03/31/2014)
* SIGTERM will once-more kill a non-daemonized chef-client (CHEF-5172)
diff --git a/CONTRIBUTIONS.md b/CONTRIBUTIONS.md
index 4b401521d2..892af90134 100644
--- a/CONTRIBUTIONS.md
+++ b/CONTRIBUTIONS.md
@@ -16,3 +16,4 @@ Example Contribution:
* **hongbin**: Made bootstrap report authentication exceptions. (CHEF-5161)
* **liseki**: Made `freebsd_package` resource use the brand new "pkgng" package
manager when available.(CHEF-4637)
+* **benesch**: Implemented a threaded download queue for synchronizing cookbooks. (CHEF-4423)
diff --git a/DOC_CHANGES.md b/DOC_CHANGES.md
index 2987bf19b2..00d15f7b18 100644
--- a/DOC_CHANGES.md
+++ b/DOC_CHANGES.md
@@ -33,3 +33,7 @@ You can now modify the chef-full template with the following options in `knife b
* `--bootstrap-install-sh URL` fetches and executes an installation bash script from the provided URL.
* `--bootstrap-wget-options OPTIONS` and `--bootstrap-curl-options OPTIONS` allow arbitrary options to be added to wget and curl.
* `--bootstrap-install-command COMMAND` can be used to execute a custom chef-client installation command sequence. Take note that this cannot be used in conjunction with the above options.
+
+### Parallelize cookbook synchronization
+
+You can now synchronize your cookbooks faster by parallelizing the process. You can specify the number of helper threads in your config file with `cookbook_sync_threads NUM_THREADS`. The default is 10. Increasing `NUM_THREADS` can result in gateway errors from the Chef server (namely 503 and 504). If you experience these often, consider decreasing `NUM_THREADS` to fewer than the default.
diff --git a/lib/chef/config.rb b/lib/chef/config.rb
index f9a3289b30..35b07c24ea 100644
--- a/lib/chef/config.rb
+++ b/lib/chef/config.rb
@@ -552,6 +552,12 @@ class Chef
# immediately if 0.)
default :run_lock_timeout, nil
+ # Number of worker threads for syncing cookbooks in parallel. Increasing
+ # this number can result in gateway errors from the server (namely 503 and 504).
+ # If you are seeing this behavior while using the default setting, reducing
+ # the number of threads will help.
+ default :cookbook_sync_threads, 10
+
# If installed via an omnibus installer, this gives the path to the
# "embedded" directory which contains all of the software packaged with
# omnibus. This is used to locate the cacert.pem file on windows.
diff --git a/lib/chef/cookbook/synchronizer.rb b/lib/chef/cookbook/synchronizer.rb
index fc5d16617c..0e8f5e692e 100644
--- a/lib/chef/cookbook/synchronizer.rb
+++ b/lib/chef/cookbook/synchronizer.rb
@@ -1,4 +1,5 @@
require 'chef/client'
+require 'chef/util/threaded_job_queue'
require 'singleton'
class Chef
@@ -56,6 +57,8 @@ class Chef
# Synchronizes the locally cached copies of cookbooks with the files on the
# server.
class CookbookSynchronizer
+ CookbookFile = Struct.new(:cookbook, :segment, :manifest_record)
+
def initialize(cookbooks_by_name, events)
@eager_segments = Chef::CookbookVersion::COOKBOOK_SEGMENTS.dup
unless Chef::Config[:no_lazy_load]
@@ -87,6 +90,38 @@ class Chef
@cookbooks_by_name.key?(cookbook_name)
end
+ def files
+ @files ||= cookbooks.inject([]) do |memo, cookbook|
+ @eager_segments.each do |segment|
+ cookbook.manifest[segment].each do |manifest_record|
+ memo << CookbookFile.new(cookbook, segment, manifest_record)
+ end
+ end
+ memo
+ end
+ end
+
+ def files_by_cookbook
+ files.group_by { |file| file.cookbook }
+ end
+
+ def files_remaining_by_cookbook
+ @files_remaining_by_cookbook ||= begin
+ files_by_cookbook.inject({}) do |memo, (cookbook, files)|
+ memo[cookbook] = files.size
+ memo
+ end
+ end
+ end
+
+ def mark_file_synced(file)
+ files_remaining_by_cookbook[file.cookbook] -= 1
+
+ if files_remaining_by_cookbook[file.cookbook] == 0
+ @events.synchronized_cookbook(file.cookbook.name)
+ end
+ end
+
# Synchronizes all the cookbooks from the chef-server.
#)
# === Returns
@@ -97,14 +132,19 @@ class Chef
clear_obsoleted_cookbooks
- @events.cookbook_sync_start(cookbook_count)
+ queue = Chef::Util::ThreadedJobQueue.new
- # Synchronize each of the node's cookbooks, and add to the
- # valid_cache_entries hash.
- cookbooks.each do |cookbook|
- sync_cookbook(cookbook)
+ files.each do |file|
+ queue << lambda do |lock|
+ sync_file(file)
+ lock.synchronize { mark_file_synced(file) }
+ end
end
+ @events.cookbook_sync_start(cookbook_count)
+ queue.process(Chef::Config[:cookbook_sync_threads])
+ update_cookbook_filenames
+
rescue Exception => e
@events.cookbook_sync_failed(cookbooks, e)
raise
@@ -129,61 +169,43 @@ class Chef
@events.cookbook_clean_complete
end
- # Sync the eagerly loaded files contained by +cookbook+
- #
- # === Arguments
- # cookbook<Chef::Cookbook>:: The cookbook to update
- # valid_cache_entries<Hash>:: Out-param; Added to this hash are the files that
- # were referred to by this cookbook
- def sync_cookbook(cookbook)
- Chef::Log.debug("Synchronizing cookbook #{cookbook.name} #{cookbook.version}")
-
- # files and templates are lazily loaded, and will be done later.
-
- @eager_segments.each do |segment|
- segment_filenames = Array.new
- cookbook.manifest[segment].each do |manifest_record|
-
- cache_filename = sync_file_in_cookbook(cookbook, manifest_record)
- # make the segment filenames a full path.
- full_path_cache_filename = cache.load(cache_filename, false)
- segment_filenames << full_path_cache_filename
- end
+ def update_cookbook_filenames
+ files_by_cookbook.each do |cookbook, cookbook_files|
+ files_by_segment = cookbook_files.group_by { |file| file.segment }
+ @eager_segments.each do |segment|
+ segment_files = files_by_segment[segment]
+ next unless segment_files
- # replace segment filenames with a full-path one.
- if segment.to_sym == :recipes
- cookbook.recipe_filenames = segment_filenames
- elsif segment.to_sym == :attributes
- cookbook.attribute_filenames = segment_filenames
- else
- cookbook.segment_filenames(segment).replace(segment_filenames)
+ filenames = segment_files.map { |file| file.manifest_record['path'] }
+ cookbook.replace_segment_filenames(segment, filenames)
end
end
- @events.synchronized_cookbook(cookbook.name)
end
# Sync an individual file if needed. If there is an up to date copy
- # locally, nothing is done.
+ # locally, nothing is done. Updates +file+'s manifest with the full path to
+ # the cached file.
#
# === Arguments
- # file_manifest::: A Hash of the form {"path" => 'relative/path', "url" => "location to fetch the file"}
+ # file<CookbookFile>:: The cookbook, segment and manifest record of the file to sync
# === Returns
- # Path to the cached file as a String
- def sync_file_in_cookbook(cookbook, file_manifest)
- cache_filename = File.join("cookbooks", cookbook.name, file_manifest['path'])
+ # Full path to the cached file as a String
+ def sync_file(file)
+ cache_filename = File.join("cookbooks", file.cookbook.name, file.manifest_record['path'])
mark_cached_file_valid(cache_filename)
# If the checksums are different between on-disk (current) and on-server
# (remote, per manifest), do the update. This will also execute if there
# is no current checksum.
- if !cached_copy_up_to_date?(cache_filename, file_manifest['checksum'])
- download_file(file_manifest['url'], cache_filename)
- @events.updated_cookbook_file(cookbook.name, cache_filename)
+ if !cached_copy_up_to_date?(cache_filename, file.manifest_record['checksum'])
+ download_file(file.manifest_record['url'], cache_filename)
+ @events.updated_cookbook_file(file.cookbook.name, cache_filename)
else
Chef::Log.debug("Not storing #{cache_filename}, as the cache is up to date.")
end
- cache_filename
+ # Update the manifest with the full path to the cached file
+ file.manifest_record['path'] = cache.load(cache_filename, false)
end
def cached_copy_up_to_date?(local_path, expected_checksum)
diff --git a/lib/chef/cookbook_uploader.rb b/lib/chef/cookbook_uploader.rb
index 6524eed3e5..968673d87a 100644
--- a/lib/chef/cookbook_uploader.rb
+++ b/lib/chef/cookbook_uploader.rb
@@ -7,29 +7,12 @@ require 'chef/digester'
require 'chef/cookbook_version'
require 'chef/cookbook/syntax_check'
require 'chef/cookbook/file_system_file_vendor'
+require 'chef/util/threaded_job_queue'
require 'chef/sandbox'
-require 'thread'
class Chef
class CookbookUploader
- def self.work_queue
- @work_queue ||= Queue.new
- end
-
- def self.setup_worker_threads(concurrency=10)
- @worker_threads ||= begin
- work_queue
- (1..concurrency).map do
- Thread.new do
- loop do
- work_queue.pop.call
- end
- end
- end
- end
- end
-
attr_reader :cookbooks
attr_reader :path
attr_reader :opts
@@ -61,8 +44,6 @@ class Chef
end
def upload_cookbooks
- Thread.abort_on_exception = true
-
# Syntax Check
validate_cookbooks
# generate checksums of cookbook files and create a sandbox
@@ -77,7 +58,7 @@ class Chef
Chef::Log.info("Uploading files")
- self.class.setup_worker_threads(concurrency)
+ queue = Chef::Util::ThreadedJobQueue.new
checksums_to_upload = Set.new
@@ -86,15 +67,13 @@ class Chef
if info['needs_upload'] == true
checksums_to_upload << checksum
Chef::Log.info("Uploading #{checksum_files[checksum]} (checksum hex = #{checksum}) to #{info['url']}")
- self.class.work_queue << uploader_function_for(checksum_files[checksum], checksum, info['url'], checksums_to_upload)
+ queue << uploader_function_for(checksum_files[checksum], checksum, info['url'], checksums_to_upload)
else
Chef::Log.debug("#{checksum_files[checksum]} has not changed")
end
end
- until checksums_to_upload.empty?
- sleep 0.1
- end
+ queue.process(@concurrency)
sandbox_url = new_sandbox['uri']
Chef::Log.debug("Committing sandbox")
diff --git a/lib/chef/cookbook_version.rb b/lib/chef/cookbook_version.rb
index 27dc8ef9e5..cbae3a552d 100644
--- a/lib/chef/cookbook_version.rb
+++ b/lib/chef/cookbook_version.rb
@@ -169,14 +169,7 @@ class Chef
next unless @manifest.has_key?(segment)
filenames = @manifest[segment].map{|manifest_record| manifest_record['name']}
- if segment == :recipes
- self.recipe_filenames = filenames
- elsif segment == :attributes
- self.attribute_filenames = filenames
- else
- segment_filenames(segment).clear
- filenames.each { |filename| segment_filenames(segment) << filename }
- end
+ replace_segment_filenames(segment, filenames)
end
end
@@ -272,6 +265,17 @@ class Chef
end
end
+ def replace_segment_filenames(segment, filenames)
+ case segment.to_sym
+ when :recipes
+ self.recipe_filenames = filenames
+ when :attributes
+ self.attribute_filenames = filenames
+ else
+ segment_filenames(segment).replace(filenames)
+ end
+ end
+
# Query whether a template file +template_filename+ is available. File
# specificity for the given +node+ is obeyed in the lookup.
def has_template_for_node?(node, template_filename)
diff --git a/lib/chef/formatters/error_inspectors/api_error_formatting.rb b/lib/chef/formatters/error_inspectors/api_error_formatting.rb
index 1e4e258906..652d478b40 100644
--- a/lib/chef/formatters/error_inspectors/api_error_formatting.rb
+++ b/lib/chef/formatters/error_inspectors/api_error_formatting.rb
@@ -88,7 +88,7 @@ E
def format_rest_error
Array(Chef::JSONCompat.from_json(exception.response.body)["error"]).join('; ')
rescue Exception
- exception.response.body
+ safe_format_rest_error
end
def username
@@ -107,6 +107,23 @@ E
exception.response.body =~ /synchronize the clock/i
end
+ def safe_format_rest_error
+ # When we get a 504 from the server, the response body is sometimes unreadable.
+ #
+ # Stack trace:
+ #
+ # NoMethodError: undefined method `closed?' for nil:NilClass
+ # .../lib/ruby/1.9.1/net/http.rb:2789:in `stream_check'
+ # .../lib/ruby/1.9.1/net/http.rb:2709:in `read_body'
+ # .../lib/ruby/1.9.1/net/http.rb:2736:in `body'
+ # .../lib/chef/formatters/error_inspectors/api_error_formatting.rb:91:in `rescue in format_rest_error'
+ begin
+ exception.response.body
+ rescue Exception
+ "Cannot fetch the contents of the response."
+ end
+ end
+
end
end
end
diff --git a/lib/chef/formatters/error_inspectors/cookbook_sync_error_inspector.rb b/lib/chef/formatters/error_inspectors/cookbook_sync_error_inspector.rb
index 56a55a296b..0cb849a17f 100644
--- a/lib/chef/formatters/error_inspectors/cookbook_sync_error_inspector.rb
+++ b/lib/chef/formatters/error_inspectors/cookbook_sync_error_inspector.rb
@@ -65,7 +65,7 @@ class Chef
when Net::HTTPNotFound
when Net::HTTPInternalServerError
describe_500_error(error_description)
- when Net::HTTPBadGateway, Net::HTTPServiceUnavailable
+ when Net::HTTPBadGateway, Net::HTTPServiceUnavailable, Net::HTTPGatewayTimeOut
describe_503_error(error_description)
else
describe_http_error(error_description)
@@ -76,5 +76,3 @@ class Chef
end
end
end
-
-
diff --git a/lib/chef/mixin/create_path.rb b/lib/chef/mixin/create_path.rb
index 9d1248e907..547224dda9 100644
--- a/lib/chef/mixin/create_path.rb
+++ b/lib/chef/mixin/create_path.rb
@@ -44,14 +44,30 @@ class Chef
file_path.each_index do |i|
create_path = File.join(file_path[0, i + 1])
- unless File.directory?(create_path)
- Chef::Log.debug("Creating directory #{create_path}")
- Dir.mkdir(create_path)
- end
+ create_dir(create_path) unless File.directory?(create_path)
end
+
File.expand_path(File.join(file_path))
end
+ private
+
+ def create_dir(path)
+ begin
+ # When doing multithreaded downloads into the file cache, the following
+ # interleaving raises an error here:
+ #
+ # thread1                                  thread2
+ # File.directory?(create_path) <- false
+ #                                          File.directory?(create_path) <- false
+ # Dir.mkdir(create_path)
+ #                                          Dir.mkdir(create_path) <- raises Errno::EEXIST
+ Chef::Log.debug("Creating directory #{path}")
+ Dir.mkdir(path)
+ rescue Errno::EEXIST
+ end
+ end
+
end
end
end
diff --git a/lib/chef/util/threaded_job_queue.rb b/lib/chef/util/threaded_job_queue.rb
new file mode 100644
index 0000000000..824cd0a3c4
--- /dev/null
+++ b/lib/chef/util/threaded_job_queue.rb
@@ -0,0 +1,61 @@
+# Copyright:: Copyright (c) 2014 Opscode, Inc.
+# License:: Apache License, Version 2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+require 'thread'
+
+class Chef
+ class Util
+ # A simple threaded job queue
+ #
+ # Create a queue:
+ #
+ # queue = ThreadedJobQueue.new
+ #
+ # Add jobs:
+ #
+ # queue << lambda { |lock| foo.the_bar }
+ #
+ # A job is a callable that optionally takes a Mutex instance as its only
+ # parameter.
+ #
+ # Then start processing jobs with +n+ threads:
+ #
+ # queue.process(n)
+ #
+ class ThreadedJobQueue
+ def initialize
+ @queue = Queue.new
+ @lock = Mutex.new
+ end
+
+ def <<(job)
+ @queue << job
+ end
+
+ def process(concurrency = 10)
+ workers = (1..concurrency).map do
+ Thread.new do
+ loop do
+ fn = @queue.pop
+ fn.arity == 1 ? fn.call(@lock) : fn.call
+ end
+ end
+ end
+ workers.each { |worker| self << Thread.method(:exit) }
+ workers.each { |worker| worker.join }
+ end
+ end
+ end
+end
diff --git a/spec/unit/util/threaded_job_queue_spec.rb b/spec/unit/util/threaded_job_queue_spec.rb
new file mode 100644
index 0000000000..a199937639
--- /dev/null
+++ b/spec/unit/util/threaded_job_queue_spec.rb
@@ -0,0 +1,51 @@
+# Copyright:: Copyright (c) 2014 Opscode, Inc.
+# License:: Apache License, Version 2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+require 'spec_helper'
+
+class WorkerThreadError < StandardError
+end
+
+describe Chef::Util::ThreadedJobQueue do
+ let(:queue) { Chef::Util::ThreadedJobQueue.new }
+
+ it "should pass mutex to jobs with an arity of 1" do
+ job = double()
+ job.should_receive(:arity).at_least(:once).and_return(1)
+ job.should_receive(:call).exactly(5).times.with(an_instance_of(Mutex))
+
+ 5.times { queue << job }
+ queue.process
+ end
+
+ it "should pass nothing to jobs with an arity of 0" do
+ job = double()
+ job.should_receive(:arity).at_least(:once).and_return(0)
+ job.should_receive(:call).exactly(5).times.with(no_args)
+
+ 5.times { queue << job }
+ queue.process
+ end
+
+ it "should use specified number of threads" do
+ Thread.should_receive(:new).exactly(7).times.and_call_original
+ queue.process(7)
+ end
+
+ it "should propagate exceptions to the main thread" do
+ queue << lambda { raise WorkerThreadError }
+ lambda { queue.process }.should raise_error(WorkerThreadError)
+ end
+end