summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJim MacArthur <jim.macarthur@codethink.co.uk>2018-10-19 17:43:08 +0100
committerJim MacArthur <jim.macarthur@codethink.co.uk>2018-10-19 17:43:08 +0100
commit789251ac026b58909e67fdbaacc0f10c964bf9b7 (patch)
tree2449584ca58d0aaf39dda6667030b514c511b97e
parent7cf28a8d62c163774064951358cff7ea0215c069 (diff)
downloadbuildstream-789251ac026b58909e67fdbaacc0f10c964bf9b7.tar.gz
Add code necessary to do cas-to-cas import
-rw-r--r--buildstream/storage/_casbaseddirectory.py246
-rw-r--r--tests/storage/virtual_directory_import.py3
2 files changed, 235 insertions, 14 deletions
diff --git a/buildstream/storage/_casbaseddirectory.py b/buildstream/storage/_casbaseddirectory.py
index 640c90d07..85c98cf84 100644
--- a/buildstream/storage/_casbaseddirectory.py
+++ b/buildstream/storage/_casbaseddirectory.py
@@ -136,6 +136,41 @@ class CasBasedDirectory(Directory):
# We don't need to do anything more than that; files were already added ealier, and symlinks are
# part of the directory structure.
+ def _add_new_blank_directory(self, name) -> Directory:
+ bst_dir = CasBasedDirectory(self.context, parent=self, filename=name)
+ new_pb2_dirnode = self.pb2_directory.directories.add()
+ new_pb2_dirnode.name = name
+ # Calculate the hash for an empty directory
+ if name in self.index:
+ raise VirtualDirectoryError("Creating directory {} would overwrite an existing item in {}"
+ .format(name, str(self)))
+ new_pb2_directory = remote_execution_pb2.Directory()
+ self.cas_cache.add_object(digest=new_pb2_dirnode.digest, buffer=new_pb2_directory.SerializeToString())
+ self.index[name] = IndexEntry(new_pb2_dirnode, buildstream_object=bst_dir)
+ return bst_dir
+
+ def create_directory(self, name: str) -> Directory:
+ """Creates a directory if it does not already exist. This does not
+ cause an error if something exists; it will remove files and
+ symlinks to files which have the same name in this
+ directory. Symlinks to directories with the name 'name' are
+ unaltered; it's assumed that the target of that symlink will
+ be used.
+
+ """
+ existing_item = self._find_pb2_entry(name)
+ if isinstance(existing_item, remote_execution_pb2.FileNode):
+ # Directory imported over file with same name
+ self.remove_item(name)
+ elif isinstance(existing_item, remote_execution_pb2.SymlinkNode):
+ # Directory imported over symlink with same source name
+ if self.symlink_target_is_directory(existing_item):
+ return self._resolve_symlink_or_directory(name) # That's fine; any files in the source directory should end up at the target of the symlink.
+ else:
+ self.remove_item(name) # Symlinks to files get replaced
+ return self.descend(name, create=True) # Creates the directory if it doesn't already exist.
+
+
def _find_pb2_entry(self, name):
if name in self.index:
return self.index[name].pb_object
@@ -232,6 +267,7 @@ class CasBasedDirectory(Directory):
if isinstance(entry, CasBasedDirectory):
return entry.descend(subdirectory_spec[1:], create)
else:
+ # May be a symlink
error = "Cannot descend into {}, which is a '{}' in the directory {}"
raise VirtualDirectoryError(error.format(subdirectory_spec[0],
type(entry).__name__,
@@ -288,6 +324,29 @@ class CasBasedDirectory(Directory):
directory = directory.descend(c, create=True)
return directory
+ def _resolve_symlink(self, node):
+ """Same as _resolve_symlink_or_directory but takes a SymlinkNode.
+ """
+
+ # OK then, it's a symlink
+ symlink = node
+ absolute = symlink.target.startswith(CasBasedDirectory._pb2_absolute_path_prefix)
+ if absolute:
+ root = self.find_root()
+ else:
+ root = self
+ directory = root
+ components = symlink.target.split(CasBasedDirectory._pb2_path_sep)
+ for c in components:
+ if c == ".":
+ pass
+ elif c == "..":
+ directory = directory.parent
+ else:
+ directory = directory.descend(c, create=True)
+ return directory
+
+
def _resolve(self, name, absolute_symlinks_resolve=True):
""" Resolves any name to an object. If the name points to a symlink in this
directory, it returns the thing it points to, recursively. Returns a CasBasedDirectory, FileNode or None. Never creates a directory or otherwise alters the directory. """
@@ -427,6 +486,157 @@ class CasBasedDirectory(Directory):
result.files_written.append(relative_pathname)
return result
+
+ def _save(self, name):
+ """ Saves this directory into the content cache as a named ref. This function is not
+ currently in use, but may be useful later. """
+ self._recalculate_recursing_up()
+ self._recalculate_recursing_down()
+ (rel_refpath, refname) = os.path.split(name)
+ refdir = os.path.join(self.cas_directory, 'refs', 'heads', rel_refpath)
+ refname = os.path.join(refdir, refname)
+
+ if not os.path.exists(refdir):
+ os.makedirs(refdir)
+ with open(refname, "wb") as f:
+ f.write(self.ref.SerializeToString())
+
+ def find_updated_files(self, modified_directory, prefix=""):
+ """Find the list of written and overwritten files that would result
+ from importing 'modified_directory' into this one. This does
+ not change either directory. The reason this exists is for
+ direct imports of cas directories into other ones, which can
+ be done by simply replacing a hash, but we still need the file
+ lists.
+
+ """
+ result = FileListResult()
+ for entry in modified_directory.pb2_directory.directories:
+ existing_dir = self._find_pb2_entry(entry.name)
+ if existing_dir:
+ updates_files = existing_dir.find_updated_files(modified_directory.descend(entry.name),
+ os.path.join(prefix, entry.name))
+ result.combine(updated_files)
+ else:
+ for f in source_directory.descend(entry.name).list_relative_paths():
+ result.files_written.append(os.path.join(prefix, f))
+ # None of these can overwrite anything, since the original files don't exist
+ for entry in modified_directory.pb2_directory.files + modified_directory.pb2_directory.symlinks:
+ if self._find_pb2_entry(entry.name):
+ result.files_overwritten.apppend(os.path.join(prefix, entry.name))
+ result.file_written.apppend(os.path.join(prefix, entry.name))
+ return result
+
+ def files_in_subdir(sorted_files, dirname):
+ """Filters sorted_files and returns only the ones which have
+ 'dirname' as a prefix, with that prefix removed.
+
+ """
+ if not dirname.endswith(os.path.sep):
+ dirname += os.path.sep
+ return [f[len(dirname):] for f in sorted_files if f.startswith(dirname)]
+
+ def symlink_target_is_directory(self, symlink_node):
+ x = self._resolve_symlink(symlink_node)
+ return isinstance(x, CasBasedDirectory)
+
+ def _partial_import_cas_into_cas(self, source_directory, files, path_prefix="", file_list_required=True):
+ """ Import only the files and symlinks listed in 'files' from source_directory to this one.
+ Args:
+ source_directory (:class:`.CasBasedDirectory`): The directory to import from
+ files ([str]): List of pathnames to import.
+ path_prefix (str): Prefix used to add entries to the file list result.
+ file_list_required: Whether to update the file list while processing.
+ """
+ print("Beginning partial import of {} into {}".format(source_directory, self))
+ result = FileListResult()
+ processed_directories = set()
+ for f in files:
+ if f == ".": continue
+ fullname = os.path.join(path_prefix, f)
+ components = f.split(os.path.sep)
+ if len(components)>1:
+ # We are importing a thing which is in a subdirectory. We may have already seen this dirname
+ # for a previous file.
+ dirname = components[0]
+ if dirname not in processed_directories:
+ # Now strip off the first directory name and import files recursively.
+ subcomponents = CasBasedDirectory.files_in_subdir(files, dirname)
+ self.create_directory(dirname)
+ print("Creating destination in {}: {}".format(self, dirname))
+ dest_subdir = self._resolve_symlink_or_directory(dirname)
+ src_subdir = source_directory.descend(dirname)
+ import_result = dest_subdir._partial_import_cas_into_cas(src_subdir, subcomponents,
+ path_prefix=fullname, file_list_required=file_list_required)
+ result.combine(import_result)
+ processed_directories.add(dirname)
+ elif isinstance(source_directory.index[f].buildstream_object, CasBasedDirectory):
+ # The thing in the input file list is a directory on its own. In which case, replace any existing file, or symlink to file
+ # with the new, blank directory - if it's neither of those things, or doesn't exist, then just create the dir.
+ self.create_directory(f)
+ else:
+ # We're importing a file or symlink - replace anything with the same name.
+ self._check_replacement(f, path_prefix, result)
+ item = source_directory.index[f].pb_object
+ if isinstance(item, remote_execution_pb2.FileNode):
+ filenode = self.pb2_directory.files.add(digest=item.digest, name=f,
+ is_executable=item.is_executable)
+ self.index[f] = IndexEntry(filenode, modified=(fullname in result.overwritten))
+ else:
+ assert(isinstance(item, remote_execution_pb2.SymlinkNode))
+ symlinknode = self.pb2_directory.symlinks.add(name=f, target=item.target)
+ # A symlink node has no digest.
+ self.index[f] = IndexEntry(symlinknode, modified=(fullname in result.overwritten))
+ return result
+
+ def transfer_node_contents(destination, source):
+ """Transfers all fields from the source PB2 node into the
+ destination. Destination and source must be of the same type and must
+ be a FileNode, SymlinkNode or DirectoryNode.
+ """
+ assert(type(destination) == type(source))
+ destination.name = source.name
+ if isinstance(destination, remote_execution_pb2.FileNode):
+ destination.digest.hash = source.digest.hash
+ destination.digest.size_bytes = source.digest.size_bytes
+ destination.is_executable = source.is_executable
+ elif isinstance(destination, remote_execution_pb2.SymlinkNode):
+ destination.target = source.target
+ elif isinstance(destination, remote_execution_pb2.DirectoryNode):
+ destination.digest.hash = source.digest.hash
+ destination.digest.size_bytes = source.digest.size_bytes
+ else:
+ raise VirtualDirectoryError("Incompatible type '{}' used as destination for transfer_node_contents"
+ .format(destination.type))
+
+ def _add_directory_from_node(self, source_node, source_casdir, can_hardlink=False):
+ # Duplicate the given node and add it to our index with a CasBasedDirectory object.
+ # No existing entry with the source node's name can exist.
+ # source_casdir is only needed if can_hardlink is True.
+ assert(self._find_pb2_entry(source_node.name) is None)
+
+ if can_hardlink:
+ new_dir_node = self.pb2_directory.directories.add()
+ CasBasedDirectory.transfer_node_contents(new_dir_node, source_node)
+ self.index[source_node.name] = IndexEntry(source_node, buildstream_object=source_casdir, modified=True)
+ else:
+ new_dir_node = self.pb2_directory.directories.add()
+ CasBasedDirectory.transfer_node_contents(new_dir_node, source_node)
+ buildStreamDirectory = CasBasedDirectory(self.context, ref=source_node.digest,
+ parent=self, filename=source_node.name)
+ self.index[source_node.name] = IndexEntry(source_node, buildstream_object=buildStreamDirectory, modified=True)
+
+ def _import_cas_into_cas(self, source_directory, files=None):
+ """ A full import is significantly quicker than a partial import, because we can just
+ replace one directory with another's hash, without doing any recursion.
+ """
+ if files is None:
+ #return self._full_import_cas_into_cas(source_directory, can_hardlink=True)
+ files = source_directory.list_relative_paths()
+ print("Extracted all files from source directory '{}': {}".format(source_directory, files))
+ return self._partial_import_cas_into_cas(source_directory, files)
+
+
def import_files(self, external_pathspec, *, files=None,
report_written=True, update_utimes=False,
can_link=False):
@@ -448,28 +658,34 @@ class CasBasedDirectory(Directory):
can_link (bool): Ignored, since hard links do not have any meaning within CAS.
"""
- if isinstance(external_pathspec, FileBasedDirectory):
- source_directory = external_pathspec._get_underlying_directory()
- elif isinstance(external_pathspec, CasBasedDirectory):
- # TODO: This transfers from one CAS to another via the
- # filesystem, which is very inefficient. Alter this so it
- # transfers refs across directly.
+
+ duplicate_cas = None
+ if isinstance(external_pathspec, CasBasedDirectory):
+ result = self._import_cas_into_cas(external_pathspec, files=files)
+
+ # Duplicate the current directory and do an import that way.
+ duplicate_cas = CasBasedDirectory(self.context, ref=self.ref)
with tempfile.TemporaryDirectory(prefix="roundtrip") as tmpdir:
external_pathspec.export_files(tmpdir)
if files is None:
files = list_relative_paths(tmpdir)
- result = self._import_files_from_directory(tmpdir, files=files)
- return result
+ duplicate_cas._import_files_from_directory(tmpdir, files=files)
+ duplicate_cas._recalculate_recursing_down()
+ if duplicate_cas.parent:
+ duplicate_cas.parent._recalculate_recursing_up(self)
else:
- source_directory = external_pathspec
-
- if files is None:
- files = list_relative_paths(source_directory)
+ if isinstance(external_pathspec, FileBasedDirectory):
+ source_directory = external_pathspec.get_underlying_directory()
+ else:
+ source_directory = external_pathspec
+ if files is None:
+ files = list_relative_paths(external_pathspec)
+ result = self._import_files_from_directory(source_directory, files=files)
# TODO: No notice is taken of report_written, update_utimes or can_link.
# Current behaviour is to fully populate the report, which is inefficient,
# but still correct.
- result = self._import_files_from_directory(source_directory, files=files)
+
# We need to recalculate and store the hashes of all directories both
# up and down the tree; we have changed our directory by importing files
@@ -479,6 +695,10 @@ class CasBasedDirectory(Directory):
self._recalculate_recursing_down()
if self.parent:
self.parent._recalculate_recursing_up(self)
+ if duplicate_cas:
+ if duplicate_cas.ref.hash != self.ref.hash:
+ raise VirtualDirectoryError("Mismatch between file-imported result {} and cas-to-cas imported result {}.".format(duplicate_cas.ref.hash,self.ref.hash))
+
return result
def set_deterministic_mtime(self):
diff --git a/tests/storage/virtual_directory_import.py b/tests/storage/virtual_directory_import.py
index 1c78c1bb4..47b493506 100644
--- a/tests/storage/virtual_directory_import.py
+++ b/tests/storage/virtual_directory_import.py
@@ -150,9 +150,10 @@ def test_cas_import(cli, tmpdir, original, overlay):
generate_random_root(tmpdir)
d = create_new_casdir(original, fake_context, tmpdir)
d2 = create_new_casdir(overlay, fake_context, tmpdir)
+ print("Importing dir {} into {}".format(overlay, original))
d.import_files(d2)
d.export_files(os.path.join(tmpdir, "output"))
-
+
for item in root_filesets[overlay - 1]:
(path, typename, content) = item
realpath = resolve_symlinks(path, os.path.join(tmpdir, "output"))