diff options
author | willmcgugan@gmail.com <willmcgugan@gmail.com@67cdc799-7952-0410-af00-57a81ceafa0f> | 2012-11-24 16:09:25 +0000 |
---|---|---|
committer | willmcgugan@gmail.com <willmcgugan@gmail.com@67cdc799-7952-0410-af00-57a81ceafa0f> | 2012-11-24 16:09:25 +0000 |
commit | 17c48538ae56ddda5869613c33edee524b834658 (patch) | |
tree | 8ae0c7ce76332323e6b73bb0358bdac51b3466df | |
parent | 220721359b22654303dc0039fcd4f6ea5d304307 (diff) | |
download | pyfilesystem-17c48538ae56ddda5869613c33edee524b834658.tar.gz |
Fixes for backslashes on Linux issue, see Issue #139
git-svn-id: http://pyfilesystem.googlecode.com/svn/trunk@829 67cdc799-7952-0410-af00-57a81ceafa0f
-rw-r--r-- | fs/commands/runner.py | 202 | ||||
-rw-r--r-- | fs/errors.py | 9 | ||||
-rw-r--r-- | fs/osfs/__init__.py | 71 | ||||
-rw-r--r-- | fs/path.py | 149 | ||||
-rw-r--r-- | fs/tests/test_path.py | 14 |
5 files changed, 234 insertions, 211 deletions
diff --git a/fs/commands/runner.py b/fs/commands/runner.py index 71a64b8..e32d036 100644 --- a/fs/commands/runner.py +++ b/fs/commands/runner.py @@ -16,15 +16,15 @@ if platform.system() == 'Windows': try: ## {{{ http://code.activestate.com/recipes/440694/ (r3) from ctypes import windll, create_string_buffer - + # stdin handle is -10 # stdout handle is -11 # stderr handle is -12 - + h = windll.kernel32.GetStdHandle(-12) csbi = create_string_buffer(22) res = windll.kernel32.GetConsoleScreenBufferInfo(h, csbi) - + if res: import struct (bufx, bufy, curx, cury, wattr, @@ -36,9 +36,9 @@ if platform.system() == 'Windows': return sizex, sizey except: return 80, 25 - + else: - + def getTerminalSize(): def ioctl_GWINSZ(fd): try: @@ -57,10 +57,10 @@ else: except: pass if cr: - return int(cr[1]), int(cr[0]) + return int(cr[1]), int(cr[0]) try: h, w = os.popen("stty size", "r").read().split() - return int(w), int(h) + return int(w), int(h) except: pass return 80, 25 @@ -71,11 +71,11 @@ def _unicode(text): return text class Command(object): - + usage = '' version = '' - - def __init__(self, usage='', version=''): + + def __init__(self, usage='', version=''): self.output_file = sys.stdout self.error_file = sys.stderr self.encoding = getattr(self.output_file, 'encoding', 'utf-8') or 'utf-8' @@ -87,24 +87,24 @@ class Command(object): else: self.terminal_width = 80 self.name = self.__class__.__name__.lower() - + def is_terminal(self): try: return self.output_file.isatty() except AttributeError: - return False - + return False + def wrap_dirname(self, dirname): if not self.terminal_colors: return dirname return '\x1b[1;34m%s\x1b[0m' % dirname - + def wrap_error(self, msg): if not self.terminal_colors: return msg return '\x1b[31m%s\x1b[0m' % msg - - def wrap_filename(self, fname): + + def wrap_filename(self, fname): fname = _unicode(fname) if not self.terminal_colors: return fname @@ -116,28 +116,28 @@ class Command(object): if isdotfile(fname): fname = '\x1b[33m%s\x1b[0m' % fname return fname - + def wrap_faded(self, text): text = _unicode(text) if not self.terminal_colors: return text return u'\x1b[2m%s\x1b[0m' % text - + def wrap_link(self, text): if not self.terminal_colors: return text return u'\x1b[1;33m%s\x1b[0m' % text - + def wrap_strong(self, text): if not self.terminal_colors: return text return u'\x1b[1m%s\x1b[0m' % text - + def wrap_table_header(self, name): if not self.terminal_colors: return name return '\x1b[1;32m%s\x1b[0m' % name - + def highlight_fsurls(self, text): if not self.terminal_colors: return text @@ -146,13 +146,13 @@ class Command(object): fs_url = matchobj.group(0) return self.wrap_link(fs_url) return re.sub(re_fs, repl, text) - - def open_fs(self, fs_url, writeable=False, create_dir=False): - fs, path = opener.parse(fs_url, writeable=writeable, create_dir=create_dir) - fs.cache_hint(True) + + def open_fs(self, fs_url, writeable=False, create_dir=False): + fs, path = opener.parse(fs_url, writeable=writeable, create_dir=create_dir) + fs.cache_hint(True) return fs, path - - def expand_wildcard(self, fs, path): + + def expand_wildcard(self, fs, path): if path is None: return [], [] pathname, resourcename = pathsplit(path) @@ -161,35 +161,35 @@ class Command(object): wildcard=resourcename, absolute=True, dirs_only=True) - + file_paths = fs.listdir(pathname, wildcard=resourcename, absolute=True, files_only=True) return dir_paths, file_paths - - else: - if fs.isdir(path): + + else: + if fs.isdir(path): #file_paths = fs.listdir(path, # absolute=True) return [path], [] return [], [path] - + def get_resources(self, fs_urls, dirs_only=False, files_only=False, single=False): - - fs_paths = [self.open_fs(fs_url) for fs_url in fs_urls] + + fs_paths = [self.open_fs(fs_url) for fs_url in fs_urls] resources = [] - - for fs, path in fs_paths: + + for fs, path in fs_paths: if path and iswildcard(path): if not files_only: dir_paths = fs.listdir(wildcard=path, dirs_only=True) for path in dir_paths: - resources.append([fs, path, True]) - if not dirs_only: + resources.append([fs, path, True]) + if not dirs_only: file_paths = fs.listdir(wildcard=path, files_only=True) for path in file_paths: - resources.append([fs, path, False]) + resources.append([fs, path, False]) else: path = path or '/' is_dir = fs.isdir(path) @@ -199,47 +199,47 @@ class Command(object): elif files_only and not is_dir: resources.append(resource) elif dirs_only and is_dir: - resources.append(resource) - + resources.append(resource) + if single: break - - return resources - + + return resources + def ask(self, msg): - return raw_input('%s: %s ' % (self.name, msg)) - - def text_encode(self, text): + return raw_input('%s: %s ' % (self.name, msg)) + + def text_encode(self, text): if not isinstance(text, unicode): - text = text.decode('ascii', 'replace') + text = text.decode('ascii', 'replace') text = text.encode(self.encoding, 'replace') - + return text - - def output(self, msgs, verbose=False): + + def output(self, msgs, verbose=False): if verbose and not self.options.verbose: - return + return if isinstance(msgs, basestring): - msgs = (msgs,) - for msg in msgs: + msgs = (msgs,) + for msg in msgs: self.output_file.write(self.text_encode(msg)) - + def output_table(self, table, col_process=None, verbose=False): - + if verbose and not self.verbose: return - + if col_process is None: col_process = {} - + max_row_widths = defaultdict(int) - + for row in table: for col_no, col in enumerate(row): max_row_widths[col_no] = max(max_row_widths[col_no], len(col)) - - lines = [] + + lines = [] for row in table: out_col = [] for col_no, col in enumerate(row): @@ -248,12 +248,12 @@ class Command(object): td = col_process[col_no](td) out_col.append(td) lines.append(self.text_encode('%s\n' % ' '.join(out_col).rstrip())) - self.output(''.join(lines)) - + self.output(''.join(lines)) + def error(self, *msgs): for msg in msgs: - self.error_file.write('%s: %s' % (self.name, self.text_encode(msg))) - + self.error_file.write('%s: %s' % (self.name, self.text_encode(msg))) + def get_optparse(self): optparse = OptionParser(usage=self.usage, version=self.version) optparse.add_option('--debug', dest='debug', action="store_true", default=False, @@ -265,29 +265,29 @@ class Command(object): optparse.add_option('--fs', dest='fs', action='append', type="string", help="import an FS opener e.g --fs foo.bar.MyOpener", metavar="OPENER") return optparse - + def list_openers(self): - + opener_table = [] - + for fs_opener in opener.openers.itervalues(): names = fs_opener.names - desc = getattr(fs_opener, 'desc', '') + desc = getattr(fs_opener, 'desc', '') opener_table.append((names, desc)) opener_table.sort(key = lambda r:r[0]) - + def wrap_line(text): - - lines = text.split('\n') + + lines = text.split('\n') for line in lines: words = [] line_len = 0 for word in line.split(): if word == '*': - word = ' *' + word = ' *' if line_len + len(word) > self.terminal_width: - self.output((self.highlight_fsurls(' '.join(words)), '\n')) + self.output((self.highlight_fsurls(' '.join(words)), '\n')) del words[:] line_len = 0 words.append(word) @@ -295,73 +295,73 @@ class Command(object): if words: self.output(self.highlight_fsurls(' '.join(words))) self.output('\n') - - for names, desc in opener_table: - self.output(('-' * self.terminal_width, '\n')) + + for names, desc in opener_table: + self.output(('-' * self.terminal_width, '\n')) proto = ', '.join([n+'://' for n in names]) - self.output((self.wrap_dirname('[%s]' % proto), '\n\n')) + self.output((self.wrap_dirname('[%s]' % proto), '\n\n')) if not desc.strip(): - desc = "No information available" + desc = "No information available" wrap_line(desc) self.output('\n') - - - def run(self): + + + def run(self): parser = self.get_optparse() options, args = parser.parse_args() self.options = options - + if options.listopeners: self.list_openers() return 0 - + ilocals = {} - if options.fs: + if options.fs: for import_opener in options.fs: - module_name, opener_class = import_opener.rsplit('.', 1) + module_name, opener_class = import_opener.rsplit('.', 1) try: - opener_module = __import__(module_name, globals(), ilocals, [opener_class], -1) + opener_module = __import__(module_name, globals(), ilocals, [opener_class], -1) except ImportError: self.error("Unable to import opener %s\n" % import_opener) return 0 - - new_opener = getattr(opener_module, opener_class) - - try: + + new_opener = getattr(opener_module, opener_class) + + try: if not issubclass(new_opener, Opener): self.error('%s is not an fs.opener.Opener\n' % import_opener) return 0 except TypeError: self.error('%s is not an opener class\n' % import_opener) return 0 - + if options.verbose: self.output('Imported opener %s\n' % import_opener) - + opener.add(new_opener) - + args = [unicode(arg, sys.getfilesystemencoding()) for arg in args] - self.verbose = options.verbose + self.verbose = options.verbose try: return self.do_run(options, args) or 0 except FSError, e: self.error(self.wrap_error(unicode(e)) + '\n') if options.debug: raise - return 1 + return 1 except KeyboardInterrupt: if self.is_terminal(): self.output("\n") - return 0 + return 0 except SystemExit: - return 0 - except Exception, e: + return 0 + except Exception, e: self.error(self.wrap_error('Error - %s\n' % unicode(e))) if options.debug: raise return 1 - - + + if __name__ == "__main__": command = Command() sys.exit(command.run())
\ No newline at end of file diff --git a/fs/errors.py b/fs/errors.py index c7256fa..04da17f 100644 --- a/fs/errors.py +++ b/fs/errors.py @@ -24,7 +24,7 @@ __all__ = ['FSError', 'NoMetaError', 'NoPathURLError', 'ResourceNotFoundError', - 'ResourceInvalidError', + 'ResourceInvalidError', 'DestinationExistsError', 'DirectoryNotEmptyError', 'ParentDirectoryMissingError', @@ -42,6 +42,10 @@ from fs.path import * from fs.local_functools import wraps +class InvalidPathError(Exception): + pass + + class FSError(Exception): """Base exception class for the FS module.""" default_message = "Unspecified error" @@ -81,7 +85,7 @@ class PathError(FSError): def __init__(self,path="",**kwds): self.path = path super(PathError,self).__init__(**kwds) - + class OperationFailedError(FSError): """Base exception class for errors associated with a specific operation.""" @@ -184,6 +188,7 @@ class ResourceLockedError(ResourceError): """Exception raised when a resource can't be used because it is locked.""" default_message = "Resource is locked: %(path)s" + class NoMMapError(ResourceError): """Exception raise when getmmap fails to create a mmap""" default_message = "Can't get mmap for %(path)s" diff --git a/fs/osfs/__init__.py b/fs/osfs/__init__.py index ed63bb1..35e4652 100644 --- a/fs/osfs/__init__.py +++ b/fs/osfs/__init__.py @@ -32,13 +32,15 @@ from fs.osfs.watch import OSFSWatchMixin @convert_os_errors def _os_stat(path): - """Replacement for os.stat that raises FSError subclasses.""" + """Replacement for os.stat that raises FSError subclasses.""" return os.stat(path) + @convert_os_errors def _os_mkdir(name, mode=0777): """Replacement for os.mkdir that raises FSError subclasses.""" - return os.mkdir(name,mode) + return os.mkdir(name, mode) + @convert_os_errors def _os_makedirs(name, mode=0777): @@ -64,7 +66,6 @@ def _os_makedirs(name, mode=0777): if tail == os.curdir: return os.mkdir(name, mode) - class OSFS(OSFSXAttrMixin, OSFSWatchMixin, FS): @@ -74,7 +75,7 @@ class OSFS(OSFSXAttrMixin, OSFSWatchMixin, FS): filesystem of the OS. Most of its methods simply defer to the matching methods in the os and os.path modules. """ - + _meta = { 'thread_safe' : True, 'network' : False, 'virtual' : False, @@ -90,7 +91,7 @@ class OSFS(OSFSXAttrMixin, OSFSWatchMixin, FS): """ Creates an FS object that represents the OS Filesystem under a given root path - :param root_path: The root OS path + :param root_path: The root OS path :param thread_synchronize: If True, this object will be thread-safe by use of a threading.Lock object :param encoding: The encoding method for path strings :param create: If True, then root_path will be created if it doesn't already exist @@ -114,7 +115,7 @@ class OSFS(OSFSXAttrMixin, OSFSWatchMixin, FS): if root_path.startswith("\\\\"): root_path = u"\\\\?\\UNC\\" + root_path[2:] else: - root_path = u"\\\\?" + root_path + root_path = u"\\\\?" + root_path # If it points at the root of a drive, it needs a trailing slash. if len(root_path) == 6 and not root_path.endswith("\\"): root_path = root_path + "\\" @@ -126,9 +127,9 @@ class OSFS(OSFSXAttrMixin, OSFSWatchMixin, FS): pass if not os.path.exists(root_path): - raise ResourceNotFoundError(root_path,msg="Root directory does not exist: %(path)s") + raise ResourceNotFoundError(root_path, msg="Root directory does not exist: %(path)s") if not os.path.isdir(root_path): - raise ResourceInvalidError(root_path,msg="Root path is not a directory: %(path)s") + raise ResourceInvalidError(root_path, msg="Root path is not a directory: %(path)s") self.root_path = root_path self.dir_mode = dir_mode @@ -137,20 +138,20 @@ class OSFS(OSFSXAttrMixin, OSFSWatchMixin, FS): def __repr__(self): return "<OSFS: %r>" % self.root_path - + def __unicode__(self): return u"<OSFS: %s>" % self.root_path def _decode_path(self, p): if isinstance(p, unicode): - return p - return p.decode(self.encoding, 'replace') + return p + return p.decode(self.encoding, 'replace') def getsyspath(self, path, allow_none=False): - path = relpath(normpath(path)).replace("/",os.sep) + path = relpath(normpath(path)).replace("/", os.sep) path = os.path.join(self.root_path, path) if not path.startswith(self.root_path): - raise PathError(path,msg="OSFS given path outside root: %(path)s") + raise PathError(path, msg="OSFS given path outside root: %(path)s") path = self._decode_path(path) return path @@ -159,11 +160,11 @@ class OSFS(OSFSXAttrMixin, OSFSWatchMixin, FS): This basically the reverse of getsyspath(). If the path does not refer to a location within this filesystem, ValueError is raised. - + :param path: a system path :returns: a path within this FS object :rtype: string - + """ path = os.path.normpath(os.path.abspath(path)) path = self._decode_path(path) @@ -173,11 +174,11 @@ class OSFS(OSFSXAttrMixin, OSFSWatchMixin, FS): if not prefix.endswith(os.path.sep): prefix += os.path.sep if not os.path.normcase(path).startswith(prefix): - raise ValueError("path not within this FS: %s (%s)" % (os.path.normcase(path),prefix)) + raise ValueError("path not within this FS: %s (%s)" % (os.path.normcase(path), prefix)) return normpath(path[len(self.root_path):]) def getmeta(self, meta_name, default=NoDefaultMeta): - + if meta_name == 'free_space': if platform.system() == 'Windows': try: @@ -204,11 +205,11 @@ class OSFS(OSFSXAttrMixin, OSFSWatchMixin, FS): else: stat = os.statvfs(self.root_path) return stat.f_blocks * stat.f_bsize - + return super(OSFS, self).getmeta(meta_name, default) @convert_os_errors - def open(self, path, mode="r", **kwargs): + def open(self, path, mode="r", **kwargs): mode = ''.join(c for c in mode if c in 'rwabt+') sys_path = self.getsyspath(path) try: @@ -221,25 +222,25 @@ class OSFS(OSFSXAttrMixin, OSFSWatchMixin, FS): raise @convert_os_errors - def setcontents(self, path, contents, chunk_size=64*1024): - return super(OSFS,self).setcontents(path, contents, chunk_size) + def setcontents(self, path, contents, chunk_size=64 * 1024): + return super(OSFS, self).setcontents(path, contents, chunk_size) @convert_os_errors - def exists(self, path): + def exists(self, path): return _exists(self.getsyspath(path)) @convert_os_errors - def isdir(self, path): + def isdir(self, path): return _isdir(self.getsyspath(path)) @convert_os_errors - def isfile(self, path): + def isfile(self, path): return _isfile(self.getsyspath(path)) @convert_os_errors def listdir(self, path="./", wildcard=None, full=False, absolute=False, dirs_only=False, files_only=False): - _decode_path = self._decode_path - paths = [_decode_path(p) for p in os.listdir(self.getsyspath(path))] + _decode_path = self._decode_path + paths = [_decode_path(p) for p in os.listdir(self.getsyspath(path))] return self._listdir_helper(path, paths, wildcard, full, absolute, dirs_only, files_only) @convert_os_errors @@ -252,16 +253,16 @@ class OSFS(OSFSXAttrMixin, OSFSWatchMixin, FS): _os_mkdir(sys_path, self.dir_mode) except DestinationExistsError: if self.isfile(path): - raise ResourceInvalidError(path,msg="Cannot create directory, there's already a file of that name: %(path)s") + raise ResourceInvalidError(path, msg="Cannot create directory, there's already a file of that name: %(path)s") if not allow_recreate: - raise DestinationExistsError(path,msg="Can not create a directory that already exists (try allow_recreate=True): %(path)s") + raise DestinationExistsError(path, msg="Can not create a directory that already exists (try allow_recreate=True): %(path)s") except ResourceNotFoundError: raise ParentDirectoryMissingError(path) @convert_os_errors def remove(self, path): sys_path = self.getsyspath(path) - try: + try: os.remove(sys_path) except OSError, e: if e.errno == errno.EACCES and sys.platform == "win32": @@ -275,7 +276,7 @@ class OSFS(OSFSXAttrMixin, OSFSWatchMixin, FS): raise @convert_os_errors - def removedir(self, path, recursive=False, force=False): + def removedir(self, path, recursive=False, force=False): sys_path = self.getsyspath(path) if force: for path2 in self.listdir(path, absolute=True, files_only=True): @@ -297,7 +298,7 @@ class OSFS(OSFSXAttrMixin, OSFSWatchMixin, FS): if recursive: try: if dirname(path) not in ('', '/'): - self.removedir(dirname(path),recursive=True) + self.removedir(dirname(path), recursive=True) except DirectoryNotEmptyError: pass @@ -320,9 +321,9 @@ class OSFS(OSFSXAttrMixin, OSFSWatchMixin, FS): if e.errno == errno.ENOENT: if not os.path.exists(os.path.dirname(path_dst)): raise ParentDirectoryMissingError(dst) - raise - - def _stat(self,path): + raise + + def _stat(self, path): """Stat the given path, normalising error codes.""" sys_path = self.getsyspath(path) try: @@ -350,5 +351,3 @@ class OSFS(OSFSXAttrMixin, OSFSWatchMixin, FS): @convert_os_errors def getsize(self, path): return self._stat(path).st_size - - @@ -1,3 +1,5 @@ +from __future__ import unicode_literals + """ fs.path ======= @@ -11,22 +13,23 @@ by forward slashes and with an optional leading slash). """ import re +import os + + +_requires_normalization = re.compile(r'/\.\.|\./|\.|//').search + -_requires_normalization = re.compile(r'/\.\.|\./|\.|//|\\').search def normpath(path): """Normalizes a path to be in the format expected by FS objects. This function remove any leading or trailing slashes, collapses - duplicate slashes, replaces backward with forward slashes, and generally - tries very hard to return a new path string the canonical FS format. + duplicate slashes, and generally tries very hard to return a new path + in the canonical FS format. If the path is invalid, ValueError will be raised. - + :param path: path to normalize :returns: a valid FS path - >>> normpath(r"foo\\bar\\baz") - 'foo/bar/baz' - >>> normpath("/foo//bar/frob/../baz") '/foo/bar/baz' @@ -40,15 +43,13 @@ def normpath(path): if path in ('', '/'): return path - path = path.replace('\\', '/') - # An early out if there is no need to normalize this path if not _requires_normalization(path): return path.rstrip('/') components = [] append = components.append - special = ('..', '.', '').__contains__ + special = ('..', '.', '').__contains__ try: for component in path.split('/'): if special(component): @@ -66,12 +67,27 @@ def normpath(path): return '/'.join(components) +if os.sep != '/': + def ospath(path): + """Replace path separators in an OS path if required""" + return path.replace(os.sep, '/') +else: + def ospath(path): + """Replace path separators in an OS path if required""" + return path + + +def normospath(path): + """Normalizes a path with os separators""" + return normpath(ospath) + + def iteratepath(path, numsplits=None): """Iterate over the individual components of a path. - + :param path: Path to iterate over :numsplits: Maximum number of splits - + """ path = relpath(normpath(path)) if not path: @@ -84,39 +100,40 @@ def iteratepath(path, numsplits=None): def recursepath(path, reverse=False): """Returns intermediate paths from the root to the given path - + :param reverse: reverses the order of the paths - + >>> recursepath('a/b/c') ['/', u'/a', u'/a/b', u'/a/b/c'] - - """ - + + """ + if path in ('', '/'): return [u'/'] - - path = abspath(normpath(path)) + '/' - + + path = abspath(normpath(path)) + '/' + paths = [u'/'] find = path.find - append = paths.append + append = paths.append pos = 1 - len_path = len(path) - - while pos < len_path: - pos = find('/', pos) + len_path = len(path) + + while pos < len_path: + pos = find('/', pos) append(path[:pos]) - pos += 1 - + pos += 1 + if reverse: return paths[::-1] - return paths + return paths def isabs(path): """Return True if path is an absolute path.""" return path.startswith('/') + def abspath(path): """Convert the given path to an absolute path. @@ -134,9 +151,9 @@ def relpath(path): This is the inverse of abspath(), stripping a leading '/' from the path if it is present. - + :param path: Path to adjust - + >>> relpath('/a/b') 'a/b' @@ -146,7 +163,7 @@ def relpath(path): def pathjoin(*paths): """Joins any number of paths together, returning a new path string. - + :param paths: Paths to join are given in positional arguments >>> pathjoin('foo', 'bar', 'baz') @@ -160,10 +177,10 @@ def pathjoin(*paths): """ absolute = False - relpaths = [] + relpaths = [] for p in paths: if p: - if p[0] in '\\/': + if p[0] == '/': del relpaths[:] absolute = True relpaths.append(p) @@ -173,24 +190,26 @@ def pathjoin(*paths): path = abspath(path) return path + def pathcombine(path1, path2): """Joins two paths together. - + This is faster than `pathjoin`, but only works when the second path is relative, - and there are no backreferences in either path. - + and there are no backreferences in either path. + >>> pathcombine("foo/bar", "baz") - 'foo/bar/baz' - - """ + 'foo/bar/baz' + + """ return "%s/%s" % (path1.rstrip('/'), path2.lstrip('/')) + def join(*paths): """Joins any number of paths together, returning a new path string. This is a simple alias for the ``pathjoin`` function, allowing it to be used as ``fs.path.join`` in direct correspondence with ``os.path.join``. - + :param paths: Paths to join are given in positional arguments """ return pathjoin(*paths) @@ -201,7 +220,7 @@ def pathsplit(path): This function splits a path into a pair (head, tail) where 'tail' is the last pathname component and 'head' is all preceding components. - + :param path: Path to split >>> pathsplit("foo/bar") @@ -209,7 +228,7 @@ def pathsplit(path): >>> pathsplit("foo/bar/baz") ('foo/bar', 'baz') - + >>> pathsplit("/foo/bar/baz") ('/foo/bar', 'baz') @@ -234,17 +253,17 @@ def split(path): def splitext(path): """Splits the extension from the path, and returns the path (up to the last '.' and the extension). - + :param path: A path to split - + >>> splitext('baz.txt') ('baz', 'txt') - + >>> splitext('foo/bar/baz.txt') ('foo/bar/baz', 'txt') - + """ - + parent_path, pathname = pathsplit(path) if '.' not in pathname: return path, '' @@ -256,18 +275,18 @@ def splitext(path): def isdotfile(path): """Detects if a path references a dot file, i.e. a resource who's name starts with a '.' - + :param path: Path to check - + >>> isdotfile('.baz') True - + >>> isdotfile('foo/bar/baz') True - + >>> isdotfile('foo/bar.baz'). False - + """ return basename(path).startswith('.') @@ -277,15 +296,15 @@ def dirname(path): This is always equivalent to the 'head' component of the value returned by pathsplit(path). - + :param path: A FS path >>> dirname('foo/bar/baz') 'foo/bar' - + >>> dirname('/foo/bar') '/foo' - + >>> dirname('/foo') '/' @@ -298,15 +317,15 @@ def basename(path): This is always equivalent to the 'tail' component of the value returned by pathsplit(path). - + :param path: A FS path >>> basename('foo/bar/baz') 'baz' - + >>> basename('foo/bar') 'bar' - + >>> basename('foo/bar/') '' @@ -316,7 +335,7 @@ def basename(path): def issamedir(path1, path2): """Return true if two paths reference a resource in the same directory. - + :param path1: An FS path :param path2: An FS path @@ -332,15 +351,15 @@ def issamedir(path1, path2): def isbase(path1, path2): p1 = forcedir(abspath(path1)) p2 = forcedir(abspath(path2)) - return p1 == p2 or p1.startswith(p2) + return p1 == p2 or p1.startswith(p2) def isprefix(path1, path2): """Return true is path1 is a prefix of path2. - + :param path1: An FS path :param path2: An FS path - + >>> isprefix("foo/bar", "foo/bar/spam.txt") True >>> isprefix("foo/bar/", "foo/bar") @@ -365,7 +384,7 @@ def isprefix(path1, path2): def forcedir(path): """Ensure the path ends with a trailing / - + :param path: An FS path >>> forcedir("foo/bar") @@ -602,12 +621,12 @@ class PathMap(object): _wild_chars = frozenset('*?[]!{}') def iswildcard(path): """Check if a path ends with a wildcard - + >>> is_wildcard('foo/bar/baz.*') True >>> is_wildcard('foo/bar') False - + """ assert path is not None base_chars = frozenset(basename(path)) diff --git a/fs/tests/test_path.py b/fs/tests/test_path.py index 639a4cc..40ec742 100644 --- a/fs/tests/test_path.py +++ b/fs/tests/test_path.py @@ -14,7 +14,7 @@ class TestPathFunctions(unittest.TestCase): """Testcases for FS path functions.""" def test_normpath(self): - tests = [ ("\\a\\b\\c", "/a/b/c"), + tests = [ ("\\a\\b\\c", "\\a\\b\\c"), (".", ""), ("./", ""), ("", ""), @@ -22,7 +22,7 @@ class TestPathFunctions(unittest.TestCase): ("a/b/c", "a/b/c"), ("a/b/../c/", "a/c"), ("/","/"), - (u"a/\N{GREEK SMALL LETTER BETA}\\c",u"a/\N{GREEK SMALL LETTER BETA}/c"), + (u"a/\N{GREEK SMALL LETTER BETA}/c",u"a/\N{GREEK SMALL LETTER BETA}/c"), ] for path, result in tests: self.assertEqual(normpath(path), result) @@ -38,7 +38,7 @@ class TestPathFunctions(unittest.TestCase): ("a/b/c", "../d", "c", "a/b/d/c"), ("a/b/c", "../d", "/a", "/a"), ("aaa", "bbb/ccc", "aaa/bbb/ccc"), - ("aaa", "bbb\ccc", "aaa/bbb/ccc"), + ("aaa", "bbb\\ccc", "aaa/bbb\\ccc"), ("aaa", "bbb", "ccc", "/aaa", "eee", "/aaa/eee"), ("a/b", "./d", "e", "a/b/d/e"), ("/", "/", "/"), @@ -104,7 +104,7 @@ class TestPathFunctions(unittest.TestCase): self.assertEquals(recursepath("/hello/world/",reverse=True),["/hello/world","/hello","/"]) self.assertEquals(recursepath("hello",reverse=True),["/hello","/"]) self.assertEquals(recursepath("",reverse=True),["/"]) - + def test_isdotfile(self): for path in ['.foo', '.svn', @@ -112,14 +112,14 @@ class TestPathFunctions(unittest.TestCase): 'foo/bar/.svn', '/foo/.bar']: self.assert_(isdotfile(path)) - + for path in ['asfoo', 'df.svn', 'foo/er.svn', 'foo/bar/test.txt', '/foo/bar']: self.assertFalse(isdotfile(path)) - + def test_dirname(self): tests = [('foo', ''), ('foo/bar', 'foo'), @@ -129,7 +129,7 @@ class TestPathFunctions(unittest.TestCase): ('/', '/')] for path, test_dirname in tests: self.assertEqual(dirname(path), test_dirname) - + def test_basename(self): tests = [('foo', 'foo'), ('foo/bar', 'bar'), |