#
#  Copyright (C) 2018 Bloomberg LP
#
#  This program is free software; you can redistribute it and/or
#  modify it under the terms of the GNU Lesser General Public
#  License as published by the Free Software Foundation; either
#  version 2 of the License, or (at your option) any later version.
#
#  This library is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
#  Lesser General Public License for more details.
#
#  You should have received a copy of the GNU Lesser General Public
#  License along with this library. If not, see <http://www.gnu.org/licenses/>.
#
#  Authors:
#        Jim MacArthur

"""
CasBasedDirectory
=================

Implementation of the Directory class which backs onto a Merkle-tree based content
addressable storage system.

See also: :ref:`sandboxing`.
"""

import os

from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2

from .directory import Directory, VirtualDirectoryError, _FileType
from ._filebaseddirectory import FileBasedDirectory
from ..utils import FileListResult, _magic_timestamp


class IndexEntry():
    """ Directory entry used in CasBasedDirectory.index """
    def __init__(self, name, entrytype, *, digest=None, target=None, is_executable=False,
                 buildstream_object=None, modified=False):
        # Name of this entry within its parent directory.
        self.name = name
        # One of the _FileType values (DIRECTORY, REGULAR_FILE or SYMLINK).
        self.type = entrytype
        # CAS digest of the file contents, or of the serialized Directory for
        # subdirectories that have not been instantiated yet.
        self.digest = digest
        # Symlink target (only set for SYMLINK entries).
        self.target = target
        self.is_executable = is_executable
        # Lazily created CasBasedDirectory for DIRECTORY entries.
        self.buildstream_object = buildstream_object
        # Whether this entry has been modified since the last mark_unmodified().
        self.modified = modified

    def get_directory(self, parent):
        """Return the CasBasedDirectory for this DIRECTORY entry, creating it on
        first use from the stored digest.

        Args:
            parent (CasBasedDirectory): The directory whose index holds this entry;
                it becomes the new object's parent.

        Once the object exists, the entry's digest is cleared: the object is then the
        source of truth and CasBasedDirectory._get_digest() asks it for its digest.
        """
        if self.buildstream_object is None:
            self.buildstream_object = CasBasedDirectory(parent.cas_cache, digest=self.digest,
                                                        parent=parent, filename=self.name)
            self.digest = None

        return self.buildstream_object


class ResolutionException(VirtualDirectoryError):
    """ Superclass of all exceptions that can be raised by
    CasBasedDirectory._resolve. Should not be used outside this module. """


class InfiniteSymlinkException(ResolutionException):
    """ Raised when an infinite symlink loop is found. """


class AbsoluteSymlinkException(ResolutionException):
    """Raised if we try to follow an absolute symlink (i.e. one whose
    target starts with the path separator) and we have disallowed
    following such symlinks.
    """


class UnexpectedFileException(ResolutionException):
    """Raised if we found a file where a directory or symlink was
    expected, for example when we try to resolve a symlink pointing to
    /a/b/c but /a/b is a file.
    """
    def __init__(self, message=""):
        """Allow constructor with no arguments, since this can be raised in
        places where there isn't sufficient information to write the
        message.
        """
        super().__init__(message)


# CasBasedDirectory intentionally doesn't call its superclass constructor,
# which is meant to be unimplemented.
# pylint: disable=super-init-not-called

class CasBasedDirectory(Directory):
    """
    CAS-based directories can have two names; one is a 'common name' which has no effect
    on functionality, and the 'filename'. If a CasBasedDirectory has a parent, then 'filename'
    must be the name of an entry in the parent directory's index which points to this object.
    This is used to inform a parent directory that it must update the given hash for this
    object when this object changes.

    Typically a top-level CasBasedDirectory will have a common_name and no filename, and
    subdirectories will have a filename and no common_name. common_name can be used to
    identify CasBasedDirectory objects in a log file, since they have no unique position in
    a file system.
    """

    # Two constants which define the separators used by the remote execution API.
    _pb2_path_sep = "/"
    _pb2_absolute_path_prefix = "/"

    def __init__(self, cas_cache, *, digest=None, parent=None, common_name="untitled", filename=None):
        self.filename = filename
        self.common_name = common_name
        self.cas_cache = cas_cache
        # Cached digest of this directory; None means it must be recomputed.
        self.__digest = digest
        # Maps entry name -> IndexEntry.
        self.index = {}
        self.parent = parent
        if digest:
            self._populate_index(digest)

    def _populate_index(self, digest):
        """Fill self.index from the serialized Directory proto stored under 'digest'."""
        pb2_directory = remote_execution_pb2.Directory()
        with open(self.cas_cache.objpath(digest), 'rb') as f:
            pb2_directory.ParseFromString(f.read())

        for entry in pb2_directory.directories:
            self.index[entry.name] = IndexEntry(entry.name, _FileType.DIRECTORY,
                                                digest=entry.digest)
        for entry in pb2_directory.files:
            self.index[entry.name] = IndexEntry(entry.name, _FileType.REGULAR_FILE,
                                                digest=entry.digest,
                                                is_executable=entry.is_executable)
        for entry in pb2_directory.symlinks:
            self.index[entry.name] = IndexEntry(entry.name, _FileType.SYMLINK,
                                                target=entry.target)

    def _find_self_in_parent(self):
        """Return the name under which this object is stored in its parent's
        index, or None if it is not found. Requires a parent.
        """
        assert self.parent is not None
        for (name, entry) in self.parent.index.items():
            # Identity, not equality: we are looking for the entry pointing at *this* object.
            if entry.buildstream_object is self:
                return name
        return None

    def _add_directory(self, name):
        """Create an empty subdirectory 'name' (which must not exist) and return it."""
        assert name not in self.index

        newdir = CasBasedDirectory(self.cas_cache, parent=self, filename=name)

        self.index[name] = IndexEntry(name, _FileType.DIRECTORY, buildstream_object=newdir)

        self.__invalidate_digest()

        return newdir

    def _add_file(self, basename, filename, modified=False):
        """Add the on-disk file basename/filename to the CAS and to this index.

        The entry is marked modified if 'modified' is set or if it replaces an
        existing entry of the same name.
        """
        entry = IndexEntry(filename, _FileType.REGULAR_FILE,
                           modified=modified or filename in self.index)
        path = os.path.join(basename, filename)
        entry.digest = self.cas_cache.add_object(path=path)
        entry.is_executable = os.access(path, os.X_OK)
        self.index[filename] = entry

        self.__invalidate_digest()

    def _copy_link_from_filesystem(self, basename, filename):
        """Add a symlink entry copied from the on-disk symlink basename/filename."""
        self._add_new_link_direct(filename, os.readlink(os.path.join(basename, filename)))

    def _add_new_link_direct(self, name, target):
        """Add (or replace) a symlink entry 'name' pointing at 'target'."""
        self.index[name] = IndexEntry(name, _FileType.SYMLINK, target=target, modified=name in self.index)

        self.__invalidate_digest()

    def delete_entry(self, name):
        """Remove 'name' from this directory if present; no-op otherwise."""
        if name in self.index:
            del self.index[name]
            self.__invalidate_digest()

    def descend(self, subdirectory_spec, create=False):
        """Descend one or more levels of directory hierarchy and return a new
        Directory object for that directory.

        Arguments:
        * subdirectory_spec (list of strings): A list of strings which are all directory
          names. The list is not modified.
        * create (boolean): If this is true, the directories will be created if
          they don't already exist.

        Note: At the moment, creating a directory by descending does
        not update this object in the CAS cache. However, performing
        an import_files() into a subdirectory of any depth obtained by
        descending from this object *will* cause this directory to be
        updated and stored.

        Raises:
            VirtualDirectoryError: if a component is not a directory, or does not
            exist and 'create' is false.
        """

        # It's very common to send a directory name instead of a list and this causes
        # bizarre errors, so check for it here
        if not isinstance(subdirectory_spec, list):
            subdirectory_spec = [subdirectory_spec]

        # Because of the way split works, it's common to get a list which begins with
        # an empty string. Skip these, slicing rather than popping so the caller's
        # list is never mutated.
        start = 0
        while start < len(subdirectory_spec) and subdirectory_spec[start] == "":
            start += 1
        subdirectory_spec = subdirectory_spec[start:]

        # Descending into [] returns the same directory.
        if not subdirectory_spec:
            return self

        name = subdirectory_spec[0]
        if name in self.index:
            entry = self.index[name]
            if entry.type == _FileType.DIRECTORY:
                subdir = entry.get_directory(self)
                return subdir.descend(subdirectory_spec[1:], create)

            error = "Cannot descend into {}, which is a '{}' in the directory {}"
            raise VirtualDirectoryError(error.format(name, entry.type, self))

        if create:
            newdir = self._add_directory(name)
            return newdir.descend(subdirectory_spec[1:], create)

        error = "'{}' not found in {}"
        raise VirtualDirectoryError(error.format(name, str(self)))

    def _check_replacement(self, name, path_prefix, fileListResult):
        """ Checks whether 'name' exists, and if so, whether we can overwrite it.

        If nothing exists under 'name', the import can go ahead. If a file, a
        symlink or an empty directory exists, it is deleted and its relative path
        is appended to fileListResult.overwritten. If a non-empty directory
        exists, it is kept and its relative path is appended to
        fileListResult.ignored.

        Returns 'True' if the import should go ahead.
        """
        existing_entry = self.index.get(name)
        relative_pathname = os.path.join(path_prefix, name)

        if existing_entry is None:
            return True
        elif existing_entry.type == _FileType.DIRECTORY:
            # If 'name' maps to a DirectoryNode, then there must be an entry in index
            # pointing to another Directory.
            subdir = existing_entry.get_directory(self)
            if subdir.is_empty():
                self.delete_entry(name)
                fileListResult.overwritten.append(relative_pathname)
                return True
            else:
                # We can't overwrite a non-empty directory, so we just ignore it.
                fileListResult.ignored.append(relative_pathname)
                return False
        else:
            self.delete_entry(name)
            fileListResult.overwritten.append(relative_pathname)
            return True

    def _import_files_from_directory(self, source_directory, filter_callback, *, path_prefix="", result):
        """ Import files from a traditional directory.

        Entries are processed in name order; subdirectories are recursed into
        before the filter is applied, so a filtered-out directory can still
        contribute accepted children. A newly created subdirectory that ends up
        empty after filtering is removed again.
        """

        for direntry in sorted(os.scandir(source_directory), key=lambda e: e.name):
            # The destination filename, relative to the root where the import started
            relative_pathname = os.path.join(path_prefix, direntry.name)

            is_dir = direntry.is_dir(follow_symlinks=False)

            if is_dir:
                src_subdir = os.path.join(source_directory, direntry.name)

                try:
                    create_subdir = direntry.name not in self.index
                    dest_subdir = self.descend(direntry.name, create=create_subdir)
                except VirtualDirectoryError:
                    filetype = self.index[direntry.name].type
                    raise VirtualDirectoryError('Destination is a {}, not a directory: /{}'
                                                .format(filetype, relative_pathname))

                dest_subdir._import_files_from_directory(src_subdir, filter_callback,
                                                         path_prefix=relative_pathname, result=result)

            if filter_callback and not filter_callback(relative_pathname):
                if is_dir and create_subdir and dest_subdir.is_empty():
                    # Complete subdirectory has been filtered out, remove it
                    self.delete_entry(direntry.name)

                # Entry filtered out, move to next
                continue

            if direntry.is_file(follow_symlinks=False):
                if self._check_replacement(direntry.name, path_prefix, result):
                    self._add_file(source_directory, direntry.name,
                                   modified=relative_pathname in result.overwritten)
                    result.files_written.append(relative_pathname)
            elif direntry.is_symlink():
                if self._check_replacement(direntry.name, path_prefix, result):
                    self._copy_link_from_filesystem(source_directory, direntry.name)
                    result.files_written.append(relative_pathname)

    def _partial_import_cas_into_cas(self, source_directory, filter_callback, *, path_prefix="", result):
        """ Import files from a CAS-based directory.

        Mirrors _import_files_from_directory, but copies index entries (digests
        and symlink targets) directly instead of reading files from disk.
        """

        for name, entry in sorted(source_directory.index.items()):
            # The destination filename, relative to the root where the import started
            relative_pathname = os.path.join(path_prefix, name)

            is_dir = entry.type == _FileType.DIRECTORY

            if is_dir:
                src_subdir = source_directory.descend(name)

                try:
                    create_subdir = name not in self.index
                    dest_subdir = self.descend(name, create=create_subdir)
                except VirtualDirectoryError:
                    filetype = self.index[name].type
                    raise VirtualDirectoryError('Destination is a {}, not a directory: /{}'
                                                .format(filetype, relative_pathname))

                dest_subdir._partial_import_cas_into_cas(src_subdir, filter_callback,
                                                         path_prefix=relative_pathname, result=result)

            if filter_callback and not filter_callback(relative_pathname):
                if is_dir and create_subdir and dest_subdir.is_empty():
                    # Complete subdirectory has been filtered out, remove it
                    self.delete_entry(name)

                # Entry filtered out, move to next
                continue

            if not is_dir:
                if self._check_replacement(name, path_prefix, result):
                    if entry.type == _FileType.REGULAR_FILE:
                        self.index[name] = IndexEntry(name, _FileType.REGULAR_FILE,
                                                      digest=entry.digest,
                                                      is_executable=entry.is_executable,
                                                      modified=True)
                        self.__invalidate_digest()
                    else:
                        assert entry.type == _FileType.SYMLINK
                        self._add_new_link_direct(name=name, target=entry.target)
                    result.files_written.append(relative_pathname)

    def import_files(self, external_pathspec, *,
                     filter_callback=None,
                     report_written=True, update_mtime=False,
                     can_link=False):
        """ See superclass Directory for arguments """

        result = FileListResult()

        if isinstance(external_pathspec, FileBasedDirectory):
            source_directory = external_pathspec._get_underlying_directory()
            self._import_files_from_directory(source_directory, filter_callback, result=result)
        elif isinstance(external_pathspec, str):
            source_directory = external_pathspec
            self._import_files_from_directory(source_directory, filter_callback, result=result)
        else:
            assert isinstance(external_pathspec, CasBasedDirectory)
            self._partial_import_cas_into_cas(external_pathspec, filter_callback, result=result)

        # TODO: No notice is taken of report_written, update_mtime or can_link.
        # Current behaviour is to fully populate the report, which is inefficient,
        # but still correct.
        return result

    def set_deterministic_mtime(self):
        """ Sets a static modification time for all regular files in this directory.
        Since we don't store any modification time, we don't need to do anything.
        """

    def set_deterministic_user(self):
        """ Sets all files in this directory to the current user's euid/egid.
        We also don't store user data, so this can be ignored.
        """

    def export_files(self, to_directory, *, can_link=False, can_destroy=False):
        """Copies everything from this into to_directory, which must be the name
        of a traditional filesystem directory.

        Arguments:

        to_directory (string): a path outside this directory object
        where the contents will be copied to.

        can_link (bool): Whether we can create hard links in to_directory
        instead of copying.

        can_destroy (bool): Whether we can destroy elements in this
        directory to export them (e.g. by renaming them as the
        target). Currently ignored by this implementation.

        """

        self.cas_cache.checkout(to_directory, self._get_digest(), can_link=can_link)

    def export_to_tar(self, tarfile, destination_dir, mtime=_magic_timestamp):
        raise NotImplementedError()

    def mark_changed(self):
        """ It should not be possible to externally modify a CAS-based
        directory at the moment."""
        raise NotImplementedError()

    def is_empty(self):
        """ Return true if this directory has no files, subdirectories or links in it.
        """
        return len(self.index) == 0

    def _mark_directory_unmodified(self):
        # Marks all entries in this directory and all child directories as unmodified.
        for i in self.index.values():
            i.modified = False
            if i.type == _FileType.DIRECTORY and i.buildstream_object:
                i.buildstream_object._mark_directory_unmodified()

    def _mark_entry_unmodified(self, name):
        # Marks an entry as unmodified. If the entry is a directory, it will
        # recursively mark all its tree as unmodified.
        self.index[name].modified = False
        if self.index[name].buildstream_object:
            self.index[name].buildstream_object._mark_directory_unmodified()

    def mark_unmodified(self):
        """ Marks all files in this directory (recursively) as unmodified.
        If we have a parent, we mark our own entry as unmodified in that parent's
        index.
        """
        if self.parent:
            self.parent._mark_entry_unmodified(self._find_self_in_parent())
        else:
            self._mark_directory_unmodified()

    def _lightweight_resolve_to_index(self, path):
        """A lightweight function for transforming paths into IndexEntry
        objects. This does not follow symlinks.

        path: The string to resolve. This should be a series of path
        components separated by the protocol buffer path separator
        _pb2_path_sep.

        Returns: the IndexEntry found, or None if any of the path components were not present.

        """
        directory = self
        path_components = path.split(CasBasedDirectory._pb2_path_sep)
        for component in path_components[:-1]:
            entry = directory.index.get(component)
            if entry is None or entry.type != _FileType.DIRECTORY:
                return None
            # The owning directory (not self) must be the parent: get_directory()
            # caches the object it creates, and a wrong parent would break digest
            # invalidation for intermediate directories.
            directory = entry.get_directory(directory)
        return directory.index.get(path_components[-1], None)

    def list_modified_paths(self):
        """Provide a list of relative paths which have been modified since the
        last call to mark_unmodified.

        Yields:
            (str): each modified relative path, in list_relative_paths() order
        """
        for p in self.list_relative_paths():
            i = self._lightweight_resolve_to_index(p)
            if i and i.modified:
                yield p

    def list_relative_paths(self, relpath=""):
        """Provide a list of all relative paths.

        Files and symlinks of each directory are yielded (sorted by name) before
        its subdirectories, which are then walked recursively in name order.
        Each subdirectory's own path is yielded before its contents.

        Yields:
            (str): each relative path
        """
        file_names = sorted(name for name, entry in self.index.items()
                            if entry.type != _FileType.DIRECTORY)
        directory_names = sorted(name for name, entry in self.index.items()
                                 if entry.type == _FileType.DIRECTORY)

        if relpath != "":
            yield relpath

        for name in file_names:
            yield os.path.join(relpath, name)

        for name in directory_names:
            subdir = self.index[name].get_directory(self)
            yield from subdir.list_relative_paths(relpath=os.path.join(relpath, name))

    def get_size(self):
        """Return the total size in bytes of this directory's serialized proto,
        its regular files and (recursively) its subdirectories.
        """
        digest = self._get_digest()
        total = digest.size_bytes
        for i in self.index.values():
            if i.type == _FileType.DIRECTORY:
                subdir = i.get_directory(self)
                total += subdir.get_size()
            elif i.type == _FileType.REGULAR_FILE:
                src_name = self.cas_cache.objpath(i.digest)
                filesize = os.stat(src_name).st_size
                total += filesize
            # Symlink nodes are encoded as part of the directory serialization.
        return total

    def _get_identifier(self):
        """Return a '/'-separated path built from the ancestors' filenames (or
        common_name where no filename is set), for use in log messages.
        """
        path = ""
        if self.parent:
            path = self.parent._get_identifier()
        if self.filename:
            path += "/" + self.filename
        else:
            path += "/" + self.common_name
        return path

    def __str__(self):
        return "[CAS:{}]".format(self._get_identifier())

    def _get_underlying_directory(self):
        """ There is no underlying directory for a CAS-backed directory, so
        throw an exception. """
        raise VirtualDirectoryError("_get_underlying_directory was called on a CAS-backed directory," +
                                    " which has no underlying directory.")

    # _get_digest():
    #
    # Return the Digest for this directory.
    #
    # The digest is cached; when it has been invalidated, a Directory proto is
    # rebuilt from the index (entries in name order), stored in the CAS, and
    # its digest cached.
    #
    # Returns:
    #   (Digest): The Digest protobuf object for the Directory protobuf
    #
    def _get_digest(self):
        if not self.__digest:
            # Create updated Directory proto
            pb2_directory = remote_execution_pb2.Directory()

            for name, entry in sorted(self.index.items()):
                if entry.type == _FileType.DIRECTORY:
                    dirnode = pb2_directory.directories.add()
                    dirnode.name = name

                    # Update digests for subdirectories in DirectoryNodes.
                    # No need to call entry.get_directory().
                    # If it hasn't been instantiated, digest must be up-to-date.
                    subdir = entry.buildstream_object
                    if subdir:
                        dirnode.digest.CopyFrom(subdir._get_digest())
                    else:
                        dirnode.digest.CopyFrom(entry.digest)
                elif entry.type == _FileType.REGULAR_FILE:
                    filenode = pb2_directory.files.add()
                    filenode.name = name
                    filenode.digest.CopyFrom(entry.digest)
                    filenode.is_executable = entry.is_executable
                elif entry.type == _FileType.SYMLINK:
                    symlinknode = pb2_directory.symlinks.add()
                    symlinknode.name = name
                    symlinknode.target = entry.target

            self.__digest = self.cas_cache.add_object(buffer=pb2_directory.SerializeToString())

        return self.__digest

    def _objpath(self, path):
        """Return the CAS object path of the file at 'path' (a list of components)."""
        subdir = self.descend(path[:-1])
        entry = subdir.index[path[-1]]
        return self.cas_cache.objpath(entry.digest)

    def _exists(self, path):
        """Return True if 'path' (a list of components) names an existing entry."""
        try:
            subdir = self.descend(path[:-1])
            return path[-1] in subdir.index
        except VirtualDirectoryError:
            return False

    def __invalidate_digest(self):
        # Drop the cached digest and propagate the invalidation up to the root.
        if self.__digest:
            self.__digest = None
            if self.parent:
                self.parent.__invalidate_digest()