diff options
Diffstat (limited to 'Tools/Scripts/webkitpy/common/system/filesystem_mock.py')
-rw-r--r-- | Tools/Scripts/webkitpy/common/system/filesystem_mock.py | 408 |
1 files changed, 408 insertions, 0 deletions
diff --git a/Tools/Scripts/webkitpy/common/system/filesystem_mock.py b/Tools/Scripts/webkitpy/common/system/filesystem_mock.py new file mode 100644 index 000000000..e91d6682a --- /dev/null +++ b/Tools/Scripts/webkitpy/common/system/filesystem_mock.py @@ -0,0 +1,408 @@ +# Copyright (C) 2009 Google Inc. All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import StringIO +import errno +import hashlib +import os +import re + +from webkitpy.common.system import path +from webkitpy.common.system import ospath + + +class MockFileSystem(object): + def __init__(self, files=None, dirs=None, cwd='/'): + """Initializes a "mock" filesystem that can be used to completely + stub out a filesystem. + + Args: + files: a dict of filenames -> file contents. A file contents + value of None is used to indicate that the file should + not exist. + """ + self.files = files or {} + self.written_files = {} + self._sep = '/' + self.current_tmpno = 0 + self.cwd = cwd + self.dirs = set(dirs or []) + self.dirs.add(cwd) + for f in self.files: + d = self.dirname(f) + while not d in self.dirs: + self.dirs.add(d) + d = self.dirname(d) + + def _get_sep(self): + return self._sep + + sep = property(_get_sep, doc="pathname separator") + + def clear_written_files(self): + # This function can be used to track what is written between steps in a test. + self.written_files = {} + + def _raise_not_found(self, path): + raise IOError(errno.ENOENT, path, os.strerror(errno.ENOENT)) + + def _split(self, path): + return path.rsplit(self.sep, 1) + + def abspath(self, path): + if os.path.isabs(path): + return self.normpath(path) + return self.abspath(self.join(self.cwd, path)) + + def basename(self, path): + return self._split(path)[1] + + def expanduser(self, path): + if path[0] != "~": + return path + parts = path.split(self.sep, 1) + home_directory = self.sep + "Users" + self.sep + "mock" + if len(parts) == 1: + return home_directory + return home_directory + self.sep + parts[1] + + def path_to_module(self, module_name): + return "/mock-checkout/Tools/Scripts/" + module_name.replace('.', '/') + ".py" + + def chdir(self, path): + path = self.normpath(path) + if not self.isdir(path): + raise OSError(errno.ENOENT, path, os.strerror(errno.ENOENT)) + self.cwd = path + + def copyfile(self, source, destination): + if not self.exists(source): + self._raise_not_found(source) + if self.isdir(source): + raise IOError(errno.EISDIR, source, os.strerror(errno.ISDIR)) + if self.isdir(destination): + raise IOError(errno.EISDIR, destination, os.strerror(errno.ISDIR)) + + self.files[destination] = self.files[source] + self.written_files[destination] = self.files[source] + + def dirname(self, path): + return self._split(path)[0] + + def exists(self, path): + return self.isfile(path) or self.isdir(path) + + def files_under(self, path, dirs_to_skip=[], file_filter=None): + def filter_all(fs, dirpath, basename): + return True + + file_filter = file_filter or filter_all + files = [] + if self.isfile(path): + if file_filter(self, self.dirname(path), self.basename(path)): + files.append(path) + return files + + if self.basename(path) in dirs_to_skip: + return [] + + if not path.endswith(self.sep): + path += self.sep + + dir_substrings = [self.sep + d + self.sep for d in dirs_to_skip] + for filename in self.files: + if not filename.startswith(path): + continue + + suffix = filename[len(path) - 1:] + if any(dir_substring in suffix for dir_substring in dir_substrings): + continue + + dirpath, basename = self._split(filename) + if file_filter(self, dirpath, basename): + files.append(filename) + + return files + + def getcwd(self): + return self.cwd + + def glob(self, glob_string): + # FIXME: This handles '*', but not '?', '[', or ']'. + glob_string = re.escape(glob_string) + glob_string = glob_string.replace('\\*', '[^\\/]*') + '$' + glob_string = glob_string.replace('\\/', '/') + path_filter = lambda path: re.match(glob_string, path) + + # We could use fnmatch.fnmatch, but that might not do the right thing on windows. + existing_files = [path for path, contents in self.files.items() if contents is not None] + return filter(path_filter, existing_files) + filter(path_filter, self.dirs) + + def isabs(self, path): + return path.startswith(self.sep) + + def isfile(self, path): + return path in self.files and self.files[path] is not None + + def isdir(self, path): + if path in self.files: + return False + path = self.normpath(path) + if path in self.dirs: + return True + + # We need to use a copy of the keys here in order to avoid switching + # to a different thread and potentially modifying the dict in + # mid-iteration. + files = self.files.keys()[:] + result = any(f.startswith(path) for f in files) + if result: + self.dirs.add(path) + return result + + def join(self, *comps): + # FIXME: might want tests for this and/or a better comment about how + # it works. + return re.sub(re.escape(os.path.sep), self.sep, os.path.join(*comps)) + + def listdir(self, path): + if not self.isdir(path): + raise OSError("%s is not a directory" % path) + + if not path.endswith(self.sep): + path += self.sep + + dirs = [] + files = [] + for f in self.files: + if self.exists(f) and f.startswith(path): + remaining = f[len(path):] + if self.sep in remaining: + dir = remaining[:remaining.index(self.sep)] + if not dir in dirs: + dirs.append(dir) + else: + files.append(remaining) + return dirs + files + + def mtime(self, path): + if self.exists(path): + return 0 + self._raise_not_found(path) + + def _mktemp(self, suffix='', prefix='tmp', dir=None, **kwargs): + if dir is None: + dir = self.sep + '__im_tmp' + curno = self.current_tmpno + self.current_tmpno += 1 + return self.join(dir, "%s_%u_%s" % (prefix, curno, suffix)) + + def mkdtemp(self, **kwargs): + class TemporaryDirectory(object): + def __init__(self, fs, **kwargs): + self._kwargs = kwargs + self._filesystem = fs + self._directory_path = fs._mktemp(**kwargs) + fs.maybe_make_directory(self._directory_path) + + def __str__(self): + return self._directory_path + + def __enter__(self): + return self._directory_path + + def __exit__(self, type, value, traceback): + # Only self-delete if necessary. + + # FIXME: Should we delete non-empty directories? + if self._filesystem.exists(self._directory_path): + self._filesystem.rmtree(self._directory_path) + + return TemporaryDirectory(fs=self, **kwargs) + + def maybe_make_directory(self, *path): + norm_path = self.normpath(self.join(*path)) + while norm_path and not self.isdir(norm_path): + self.dirs.add(norm_path) + norm_path = self.dirname(norm_path) + + def move(self, source, destination): + if self.files[source] is None: + self._raise_not_found(source) + self.files[destination] = self.files[source] + self.written_files[destination] = self.files[destination] + self.files[source] = None + self.written_files[source] = None + + def normpath(self, path): + # Like join(), relies on os.path functionality but normalizes the + # path separator to the mock one. + return re.sub(re.escape(os.path.sep), self.sep, os.path.normpath(path)) + + def open_binary_tempfile(self, suffix=''): + path = self._mktemp(suffix) + return (WritableBinaryFileObject(self, path), path) + + def open_binary_file_for_reading(self, path): + if self.files[path] is None: + self._raise_not_found(path) + return ReadableBinaryFileObject(self, path, self.files[path]) + + def read_binary_file(self, path): + # Intentionally raises KeyError if we don't recognize the path. + if self.files[path] is None: + self._raise_not_found(path) + return self.files[path] + + def write_binary_file(self, path, contents): + # FIXME: should this assert if dirname(path) doesn't exist? + self.files[path] = contents + self.written_files[path] = contents + + def open_text_file_for_reading(self, path): + if self.files[path] is None: + self._raise_not_found(path) + return ReadableTextFileObject(self, path, self.files[path]) + + def open_text_file_for_writing(self, path): + return WritableTextFileObject(self, path) + + def read_text_file(self, path): + return self.read_binary_file(path).decode('utf-8') + + def write_text_file(self, path, contents): + return self.write_binary_file(path, contents.encode('utf-8')) + + def sha1(self, path): + contents = self.read_binary_file(path) + return hashlib.sha1(contents).hexdigest() + + def relpath(self, path, start='.'): + return ospath.relpath(path, start, self.abspath, self.sep) + + def remove(self, path): + if self.files[path] is None: + self._raise_not_found(path) + self.files[path] = None + self.written_files[path] = None + + def rmtree(self, path): + path = self.normpath(path) + + for f in self.files: + if f.startswith(path): + self.files[f] = None + + self.dirs = set(filter(lambda d: not d.startswith(path), self.dirs)) + + def split(self, path): + idx = path.rfind(self.sep) + if idx == -1: + return ('', path) + return (path[:idx], path[(idx + 1):]) + + def splitext(self, path): + idx = path.rfind('.') + if idx == -1: + idx = 0 + return (path[0:idx], path[idx:]) + + +class WritableBinaryFileObject(object): + def __init__(self, fs, path): + self.fs = fs + self.path = path + self.closed = False + self.fs.files[path] = "" + + def __enter__(self): + return self + + def __exit__(self, type, value, traceback): + self.close() + + def close(self): + self.closed = True + + def write(self, str): + self.fs.files[self.path] += str + self.fs.written_files[self.path] = self.fs.files[self.path] + + +class WritableTextFileObject(WritableBinaryFileObject): + def write(self, str): + WritableBinaryFileObject.write(self, str.encode('utf-8')) + + +class ReadableBinaryFileObject(object): + def __init__(self, fs, path, data): + self.fs = fs + self.path = path + self.closed = False + self.data = data + self.offset = 0 + + def __enter__(self): + return self + + def __exit__(self, type, value, traceback): + self.close() + + def close(self): + self.closed = True + + def read(self, bytes=None): + if not bytes: + return self.data[self.offset:] + start = self.offset + self.offset += bytes + return self.data[start:self.offset] + + +class ReadableTextFileObject(ReadableBinaryFileObject): + def __init__(self, fs, path, data): + super(ReadableTextFileObject, self).__init__(fs, path, StringIO.StringIO(data)) + + def close(self): + self.data.close() + super(ReadableTextFileObject, self).close() + + def read(self, bytes=-1): + return self.data.read(bytes) + + def readline(self, length=None): + return self.data.readline(length) + + def __iter__(self): + return self.data.__iter__() + + def next(self): + return self.data.next() + + def seek(self, offset, whence=os.SEEK_SET): + self.data.seek(offset, whence) |