diff options
author | Jim MacArthur <jim.macarthur@codethink.co.uk> | 2018-10-19 17:43:08 +0100 |
---|---|---|
committer | Jim MacArthur <jim.macarthur@codethink.co.uk> | 2018-10-19 17:43:08 +0100 |
commit | 789251ac026b58909e67fdbaacc0f10c964bf9b7 (patch) | |
tree | 2449584ca58d0aaf39dda6667030b514c511b97e | |
parent | 7cf28a8d62c163774064951358cff7ea0215c069 (diff) | |
download | buildstream-789251ac026b58909e67fdbaacc0f10c964bf9b7.tar.gz |
Add code necessary to do cas-to-cas import
-rw-r--r-- | buildstream/storage/_casbaseddirectory.py | 246 | ||||
-rw-r--r-- | tests/storage/virtual_directory_import.py | 3 |
2 files changed, 235 insertions, 14 deletions
diff --git a/buildstream/storage/_casbaseddirectory.py b/buildstream/storage/_casbaseddirectory.py index 640c90d07..85c98cf84 100644 --- a/buildstream/storage/_casbaseddirectory.py +++ b/buildstream/storage/_casbaseddirectory.py @@ -136,6 +136,41 @@ class CasBasedDirectory(Directory): # We don't need to do anything more than that; files were already added ealier, and symlinks are # part of the directory structure. + def _add_new_blank_directory(self, name) -> Directory: + bst_dir = CasBasedDirectory(self.context, parent=self, filename=name) + new_pb2_dirnode = self.pb2_directory.directories.add() + new_pb2_dirnode.name = name + # Calculate the hash for an empty directory + if name in self.index: + raise VirtualDirectoryError("Creating directory {} would overwrite an existing item in {}" + .format(name, str(self))) + new_pb2_directory = remote_execution_pb2.Directory() + self.cas_cache.add_object(digest=new_pb2_dirnode.digest, buffer=new_pb2_directory.SerializeToString()) + self.index[name] = IndexEntry(new_pb2_dirnode, buildstream_object=bst_dir) + return bst_dir + + def create_directory(self, name: str) -> Directory: + """Creates a directory if it does not already exist. This does not + cause an error if something exists; it will remove files and + symlinks to files which have the same name in this + directory. Symlinks to directories with the name 'name' are + unaltered; it's assumed that the target of that symlink will + be used. + + """ + existing_item = self._find_pb2_entry(name) + if isinstance(existing_item, remote_execution_pb2.FileNode): + # Directory imported over file with same name + self.remove_item(name) + elif isinstance(existing_item, remote_execution_pb2.SymlinkNode): + # Directory imported over symlink with same source name + if self.symlink_target_is_directory(existing_item): + return self._resolve_symlink_or_directory(name) # That's fine; any files in the source directory should end up at the target of the symlink. + else: + self.remove_item(name) # Symlinks to files get replaced + return self.descend(name, create=True) # Creates the directory if it doesn't already exist. + + def _find_pb2_entry(self, name): if name in self.index: return self.index[name].pb_object @@ -232,6 +267,7 @@ class CasBasedDirectory(Directory): if isinstance(entry, CasBasedDirectory): return entry.descend(subdirectory_spec[1:], create) else: + # May be a symlink error = "Cannot descend into {}, which is a '{}' in the directory {}" raise VirtualDirectoryError(error.format(subdirectory_spec[0], type(entry).__name__, @@ -288,6 +324,29 @@ class CasBasedDirectory(Directory): directory = directory.descend(c, create=True) return directory + def _resolve_symlink(self, node): + """Same as _resolve_symlink_or_directory but takes a SymlinkNode. + """ + + # OK then, it's a symlink + symlink = node + absolute = symlink.target.startswith(CasBasedDirectory._pb2_absolute_path_prefix) + if absolute: + root = self.find_root() + else: + root = self + directory = root + components = symlink.target.split(CasBasedDirectory._pb2_path_sep) + for c in components: + if c == ".": + pass + elif c == "..": + directory = directory.parent + else: + directory = directory.descend(c, create=True) + return directory + + def _resolve(self, name, absolute_symlinks_resolve=True): """ Resolves any name to an object. If the name points to a symlink in this directory, it returns the thing it points to, recursively. Returns a CasBasedDirectory, FileNode or None. Never creates a directory or otherwise alters the directory. """ @@ -427,6 +486,157 @@ class CasBasedDirectory(Directory): result.files_written.append(relative_pathname) return result + + def _save(self, name): + """ Saves this directory into the content cache as a named ref. This function is not + currently in use, but may be useful later. """ + self._recalculate_recursing_up() + self._recalculate_recursing_down() + (rel_refpath, refname) = os.path.split(name) + refdir = os.path.join(self.cas_directory, 'refs', 'heads', rel_refpath) + refname = os.path.join(refdir, refname) + + if not os.path.exists(refdir): + os.makedirs(refdir) + with open(refname, "wb") as f: + f.write(self.ref.SerializeToString()) + + def find_updated_files(self, modified_directory, prefix=""): + """Find the list of written and overwritten files that would result + from importing 'modified_directory' into this one. This does + not change either directory. The reason this exists is for + direct imports of cas directories into other ones, which can + be done by simply replacing a hash, but we still need the file + lists. + + """ + result = FileListResult() + for entry in modified_directory.pb2_directory.directories: + existing_dir = self._find_pb2_entry(entry.name) + if existing_dir: + updates_files = existing_dir.find_updated_files(modified_directory.descend(entry.name), + os.path.join(prefix, entry.name)) + result.combine(updated_files) + else: + for f in source_directory.descend(entry.name).list_relative_paths(): + result.files_written.append(os.path.join(prefix, f)) + # None of these can overwrite anything, since the original files don't exist + for entry in modified_directory.pb2_directory.files + modified_directory.pb2_directory.symlinks: + if self._find_pb2_entry(entry.name): + result.files_overwritten.apppend(os.path.join(prefix, entry.name)) + result.file_written.apppend(os.path.join(prefix, entry.name)) + return result + + def files_in_subdir(sorted_files, dirname): + """Filters sorted_files and returns only the ones which have + 'dirname' as a prefix, with that prefix removed. + + """ + if not dirname.endswith(os.path.sep): + dirname += os.path.sep + return [f[len(dirname):] for f in sorted_files if f.startswith(dirname)] + + def symlink_target_is_directory(self, symlink_node): + x = self._resolve_symlink(symlink_node) + return isinstance(x, CasBasedDirectory) + + def _partial_import_cas_into_cas(self, source_directory, files, path_prefix="", file_list_required=True): + """ Import only the files and symlinks listed in 'files' from source_directory to this one. + Args: + source_directory (:class:`.CasBasedDirectory`): The directory to import from + files ([str]): List of pathnames to import. + path_prefix (str): Prefix used to add entries to the file list result. + file_list_required: Whether to update the file list while processing. + """ + print("Beginning partial import of {} into {}".format(source_directory, self)) + result = FileListResult() + processed_directories = set() + for f in files: + if f == ".": continue + fullname = os.path.join(path_prefix, f) + components = f.split(os.path.sep) + if len(components)>1: + # We are importing a thing which is in a subdirectory. We may have already seen this dirname + # for a previous file. + dirname = components[0] + if dirname not in processed_directories: + # Now strip off the first directory name and import files recursively. + subcomponents = CasBasedDirectory.files_in_subdir(files, dirname) + self.create_directory(dirname) + print("Creating destination in {}: {}".format(self, dirname)) + dest_subdir = self._resolve_symlink_or_directory(dirname) + src_subdir = source_directory.descend(dirname) + import_result = dest_subdir._partial_import_cas_into_cas(src_subdir, subcomponents, + path_prefix=fullname, file_list_required=file_list_required) + result.combine(import_result) + processed_directories.add(dirname) + elif isinstance(source_directory.index[f].buildstream_object, CasBasedDirectory): + # The thing in the input file list is a directory on its own. In which case, replace any existing file, or symlink to file + # with the new, blank directory - if it's neither of those things, or doesn't exist, then just create the dir. + self.create_directory(f) + else: + # We're importing a file or symlink - replace anything with the same name. + self._check_replacement(f, path_prefix, result) + item = source_directory.index[f].pb_object + if isinstance(item, remote_execution_pb2.FileNode): + filenode = self.pb2_directory.files.add(digest=item.digest, name=f, + is_executable=item.is_executable) + self.index[f] = IndexEntry(filenode, modified=(fullname in result.overwritten)) + else: + assert(isinstance(item, remote_execution_pb2.SymlinkNode)) + symlinknode = self.pb2_directory.symlinks.add(name=f, target=item.target) + # A symlink node has no digest. + self.index[f] = IndexEntry(symlinknode, modified=(fullname in result.overwritten)) + return result + + def transfer_node_contents(destination, source): + """Transfers all fields from the source PB2 node into the + destination. Destination and source must be of the same type and must + be a FileNode, SymlinkNode or DirectoryNode. + """ + assert(type(destination) == type(source)) + destination.name = source.name + if isinstance(destination, remote_execution_pb2.FileNode): + destination.digest.hash = source.digest.hash + destination.digest.size_bytes = source.digest.size_bytes + destination.is_executable = source.is_executable + elif isinstance(destination, remote_execution_pb2.SymlinkNode): + destination.target = source.target + elif isinstance(destination, remote_execution_pb2.DirectoryNode): + destination.digest.hash = source.digest.hash + destination.digest.size_bytes = source.digest.size_bytes + else: + raise VirtualDirectoryError("Incompatible type '{}' used as destination for transfer_node_contents" + .format(destination.type)) + + def _add_directory_from_node(self, source_node, source_casdir, can_hardlink=False): + # Duplicate the given node and add it to our index with a CasBasedDirectory object. + # No existing entry with the source node's name can exist. + # source_casdir is only needed if can_hardlink is True. + assert(self._find_pb2_entry(source_node.name) is None) + + if can_hardlink: + new_dir_node = self.pb2_directory.directories.add() + CasBasedDirectory.transfer_node_contents(new_dir_node, source_node) + self.index[source_node.name] = IndexEntry(source_node, buildstream_object=source_casdir, modified=True) + else: + new_dir_node = self.pb2_directory.directories.add() + CasBasedDirectory.transfer_node_contents(new_dir_node, source_node) + buildStreamDirectory = CasBasedDirectory(self.context, ref=source_node.digest, + parent=self, filename=source_node.name) + self.index[source_node.name] = IndexEntry(source_node, buildstream_object=buildStreamDirectory, modified=True) + + def _import_cas_into_cas(self, source_directory, files=None): + """ A full import is significantly quicker than a partial import, because we can just + replace one directory with another's hash, without doing any recursion. + """ + if files is None: + #return self._full_import_cas_into_cas(source_directory, can_hardlink=True) + files = source_directory.list_relative_paths() + print("Extracted all files from source directory '{}': {}".format(source_directory, files)) + return self._partial_import_cas_into_cas(source_directory, files) + + def import_files(self, external_pathspec, *, files=None, report_written=True, update_utimes=False, can_link=False): @@ -448,28 +658,34 @@ class CasBasedDirectory(Directory): can_link (bool): Ignored, since hard links do not have any meaning within CAS. """ - if isinstance(external_pathspec, FileBasedDirectory): - source_directory = external_pathspec._get_underlying_directory() - elif isinstance(external_pathspec, CasBasedDirectory): - # TODO: This transfers from one CAS to another via the - # filesystem, which is very inefficient. Alter this so it - # transfers refs across directly. + + duplicate_cas = None + if isinstance(external_pathspec, CasBasedDirectory): + result = self._import_cas_into_cas(external_pathspec, files=files) + + # Duplicate the current directory and do an import that way. + duplicate_cas = CasBasedDirectory(self.context, ref=self.ref) with tempfile.TemporaryDirectory(prefix="roundtrip") as tmpdir: external_pathspec.export_files(tmpdir) if files is None: files = list_relative_paths(tmpdir) - result = self._import_files_from_directory(tmpdir, files=files) - return result + duplicate_cas._import_files_from_directory(tmpdir, files=files) + duplicate_cas._recalculate_recursing_down() + if duplicate_cas.parent: + duplicate_cas.parent._recalculate_recursing_up(self) else: - source_directory = external_pathspec - - if files is None: - files = list_relative_paths(source_directory) + if isinstance(external_pathspec, FileBasedDirectory): + source_directory = external_pathspec.get_underlying_directory() + else: + source_directory = external_pathspec + if files is None: + files = list_relative_paths(external_pathspec) + result = self._import_files_from_directory(source_directory, files=files) # TODO: No notice is taken of report_written, update_utimes or can_link. # Current behaviour is to fully populate the report, which is inefficient, # but still correct. - result = self._import_files_from_directory(source_directory, files=files) + # We need to recalculate and store the hashes of all directories both # up and down the tree; we have changed our directory by importing files @@ -479,6 +695,10 @@ class CasBasedDirectory(Directory): self._recalculate_recursing_down() if self.parent: self.parent._recalculate_recursing_up(self) + if duplicate_cas: + if duplicate_cas.ref.hash != self.ref.hash: + raise VirtualDirectoryError("Mismatch between file-imported result {} and cas-to-cas imported result {}.".format(duplicate_cas.ref.hash,self.ref.hash)) + return result def set_deterministic_mtime(self): diff --git a/tests/storage/virtual_directory_import.py b/tests/storage/virtual_directory_import.py index 1c78c1bb4..47b493506 100644 --- a/tests/storage/virtual_directory_import.py +++ b/tests/storage/virtual_directory_import.py @@ -150,9 +150,10 @@ def test_cas_import(cli, tmpdir, original, overlay): generate_random_root(tmpdir) d = create_new_casdir(original, fake_context, tmpdir) d2 = create_new_casdir(overlay, fake_context, tmpdir) + print("Importing dir {} into {}".format(overlay, original)) d.import_files(d2) d.export_files(os.path.join(tmpdir, "output")) - + for item in root_filesets[overlay - 1]: (path, typename, content) = item realpath = resolve_symlinks(path, os.path.join(tmpdir, "output")) |