diff options
author | Nikhil Benesch <nikhil.benesch@gmail.com> | 2014-03-09 22:58:59 -0400 |
---|---|---|
committer | Claire McQuin <claire@getchef.com> | 2014-05-13 15:21:16 -0700 |
commit | c9426f9a0fd8e7943fd51b032999bde080c07af7 (patch) | |
tree | aa4e53053adf300faa92430560454663428a2bb9 /lib/chef/cookbook/synchronizer.rb | |
parent | d465f8ca9937e871caa409988f5909b2b0f0f843 (diff) | |
download | chef-c9426f9a0fd8e7943fd51b032999bde080c07af7.tar.gz |
CHEF-4423: implement threaded cookbook synchronization
This commit parallelizes cookbook synchronization (downloads) with the
threaded worker queue model used during cookbook upload.
Diffstat (limited to 'lib/chef/cookbook/synchronizer.rb')
-rw-r--r-- | lib/chef/cookbook/synchronizer.rb | 106 |
1 files changed, 64 insertions, 42 deletions
diff --git a/lib/chef/cookbook/synchronizer.rb b/lib/chef/cookbook/synchronizer.rb index fc5d16617c..c855a4f708 100644 --- a/lib/chef/cookbook/synchronizer.rb +++ b/lib/chef/cookbook/synchronizer.rb @@ -1,4 +1,5 @@ require 'chef/client' +require 'chef/util/threaded_job_queue' require 'singleton' class Chef @@ -56,6 +57,8 @@ class Chef # Synchronizes the locally cached copies of cookbooks with the files on the # server. class CookbookSynchronizer + CookbookFile = Struct.new(:cookbook, :segment, :manifest_record) + def initialize(cookbooks_by_name, events) @eager_segments = Chef::CookbookVersion::COOKBOOK_SEGMENTS.dup unless Chef::Config[:no_lazy_load] @@ -87,6 +90,38 @@ class Chef @cookbooks_by_name.key?(cookbook_name) end + def files + @files ||= cookbooks.inject([]) do |memo, cookbook| + @eager_segments.each do |segment| + cookbook.manifest[segment].each do |manifest_record| + memo << CookbookFile.new(cookbook, segment, manifest_record) + end + end + memo + end + end + + def files_by_cookbook + files.group_by { |file| file.cookbook } + end + + def files_remaining_by_cookbook + @files_remaining_by_cookbook ||= begin + files_by_cookbook.inject({}) do |memo, (cookbook, files)| + memo[cookbook] = files.size + memo + end + end + end + + def mark_file_synced(file) + files_remaining_by_cookbook[file.cookbook] -= 1 + + if files_remaining_by_cookbook[file.cookbook] == 0 + @events.synchronized_cookbook(file.cookbook.name) + end + end + # Synchronizes all the cookbooks from the chef-server. #) # === Returns @@ -97,14 +132,19 @@ class Chef clear_obsoleted_cookbooks - @events.cookbook_sync_start(cookbook_count) + queue = Chef::Util::ThreadedJobQueue.new - # Synchronize each of the node's cookbooks, and add to the - # valid_cache_entries hash. - cookbooks.each do |cookbook| - sync_cookbook(cookbook) + files.each do |file| + queue << lambda do |lock| + sync_file(file) + lock.synchronize { mark_file_synced(file) } + end end + @events.cookbook_sync_start(cookbook_count) + queue.process(20) + update_cookbook_filenames + rescue Exception => e @events.cookbook_sync_failed(cookbooks, e) raise @@ -129,61 +169,43 @@ class Chef @events.cookbook_clean_complete end - # Sync the eagerly loaded files contained by +cookbook+ - # - # === Arguments - # cookbook<Chef::Cookbook>:: The cookbook to update - # valid_cache_entries<Hash>:: Out-param; Added to this hash are the files that - # were referred to by this cookbook - def sync_cookbook(cookbook) - Chef::Log.debug("Synchronizing cookbook #{cookbook.name} #{cookbook.version}") - - # files and templates are lazily loaded, and will be done later. - - @eager_segments.each do |segment| - segment_filenames = Array.new - cookbook.manifest[segment].each do |manifest_record| - - cache_filename = sync_file_in_cookbook(cookbook, manifest_record) - # make the segment filenames a full path. - full_path_cache_filename = cache.load(cache_filename, false) - segment_filenames << full_path_cache_filename - end + def update_cookbook_filenames + files_by_cookbook.each do |cookbook, cookbook_files| + files_by_segment = cookbook_files.group_by { |file| file.segment } + @eager_segments.each do |segment| + segment_files = files_by_segment[segment] + next unless segment_files - # replace segment filenames with a full-path one. - if segment.to_sym == :recipes - cookbook.recipe_filenames = segment_filenames - elsif segment.to_sym == :attributes - cookbook.attribute_filenames = segment_filenames - else - cookbook.segment_filenames(segment).replace(segment_filenames) + filenames = segment_files.map { |file| file.manifest_record['path'] } + cookbook.replace_segment_filenames(segment, filenames) end end - @events.synchronized_cookbook(cookbook.name) end # Sync an individual file if needed. If there is an up to date copy - # locally, nothing is done. + # locally, nothing is done. Updates +file+'s manifest with the full path to + # the cached file. # # === Arguments - # file_manifest::: A Hash of the form {"path" => 'relative/path', "url" => "location to fetch the file"} + # file<CookbookFile> # === Returns - # Path to the cached file as a String - def sync_file_in_cookbook(cookbook, file_manifest) - cache_filename = File.join("cookbooks", cookbook.name, file_manifest['path']) + # Full path to the cached file as a String + def sync_file(file) + cache_filename = File.join("cookbooks", file.cookbook.name, file.manifest_record['path']) mark_cached_file_valid(cache_filename) # If the checksums are different between on-disk (current) and on-server # (remote, per manifest), do the update. This will also execute if there # is no current checksum. - if !cached_copy_up_to_date?(cache_filename, file_manifest['checksum']) - download_file(file_manifest['url'], cache_filename) - @events.updated_cookbook_file(cookbook.name, cache_filename) + if !cached_copy_up_to_date?(cache_filename, file.manifest_record['checksum']) + download_file(file.manifest_record['url'], cache_filename) + @events.updated_cookbook_file(file.cookbook.name, cache_filename) else Chef::Log.debug("Not storing #{cache_filename}, as the cache is up to date.") end - cache_filename + # Update the manifest with the full path to the cached file + file.manifest_record['path'] = cache.load(cache_filename, false) end def cached_copy_up_to_date?(local_path, expected_checksum) |