diff options
-rw-r--r-- | fs/errors.py | 12 | ||||
-rw-r--r-- | fs/osfs/__init__.py | 14 | ||||
-rw-r--r-- | fs/path.py | 2 | ||||
-rw-r--r-- | fs/tests/test_fs.py | 13 |
4 files changed, 27 insertions, 14 deletions
diff --git a/fs/errors.py b/fs/errors.py index 04da17f..eea21d0 100644 --- a/fs/errors.py +++ b/fs/errors.py @@ -11,6 +11,7 @@ catch-all exception. __all__ = ['FSError', 'CreateFailedError', 'PathError', + 'InvalidCharsInPathError', 'OperationFailedError', 'UnsupportedError', 'RemoteConnectionError', @@ -32,7 +33,7 @@ __all__ = ['FSError', 'NoMMapError', 'BackReferenceError', 'convert_fs_errors', - 'convert_os_errors' + 'convert_os_errors', ] import sys @@ -42,10 +43,6 @@ from fs.path import * from fs.local_functools import wraps -class InvalidPathError(Exception): - pass - - class FSError(Exception): """Base exception class for the FS module.""" default_message = "Unspecified error" @@ -71,7 +68,6 @@ class FSError(Exception): return (self.__class__,(),self.__dict__.copy(),) - class CreateFailedError(FSError): """An exception thrown when a FS could not be created""" default_message = "Unable to create filesystem" @@ -87,6 +83,10 @@ class PathError(FSError): super(PathError,self).__init__(**kwds) +class InvalidCharsInPathError(PathError): + default_message = "Path contains invalid characters: %(path)s" + + class OperationFailedError(FSError): """Base exception class for errors associated with a specific operation.""" default_message = "Unable to %(opname)s: unspecified error [%(errno)s - %(details)s]" diff --git a/fs/osfs/__init__.py b/fs/osfs/__init__.py index 35e4652..a6f0a3a 100644 --- a/fs/osfs/__init__.py +++ b/fs/osfs/__init__.py @@ -22,8 +22,8 @@ import datetime import platform from fs.base import * -from fs.errors import * from fs.path import * +from fs.errors import * from fs import _thread_synchronize_default from fs.osfs.xattrs import OSFSXAttrMixin @@ -87,6 +87,12 @@ class OSFS(OSFSXAttrMixin, OSFSWatchMixin, FS): 'atomic.setcontents' : False, } + if sys.platform == 'win32': + _invalid_path_chars = '\\:*?"<>|' + else: + _invalid_path_chars = '\0' + _re_invalid_path_chars = re.compile('|'.join(re.escape(c) for c in _invalid_path_chars), re.UNICODE) + def __init__(self, root_path, thread_synchronize=_thread_synchronize_default, encoding=None, create=False, dir_mode=0700, use_long_paths=True): """ Creates an FS object that represents the OS Filesystem under a given root path @@ -147,12 +153,18 @@ class OSFS(OSFSXAttrMixin, OSFSWatchMixin, FS): return p return p.decode(self.encoding, 'replace') + def _validate_path(self, path): + """Raise an error if there are any invalid characters in the path""" + if self._re_invalid_path_chars.search(path): + raise InvalidCharsInPathError(path) + def getsyspath(self, path, allow_none=False): path = relpath(normpath(path)).replace("/", os.sep) path = os.path.join(self.root_path, path) if not path.startswith(self.root_path): raise PathError(path, msg="OSFS given path outside root: %(path)s") path = self._decode_path(path) + self._validate_path(path) return path def unsyspath(self, path): @@ -1,5 +1,3 @@ -from __future__ import unicode_literals - """ fs.path ======= diff --git a/fs/tests/test_fs.py b/fs/tests/test_fs.py index 65719c5..c816abd 100644 --- a/fs/tests/test_fs.py +++ b/fs/tests/test_fs.py @@ -5,6 +5,8 @@ """ from fs.tests import FSTestCases, ThreadingTestCases +from fs.path import * +from fs import errors import unittest @@ -13,8 +15,6 @@ import sys import shutil import tempfile -from fs.path import * - from fs import osfs class TestOSFS(unittest.TestCase,FSTestCases,ThreadingTestCases): @@ -30,10 +30,13 @@ class TestOSFS(unittest.TestCase,FSTestCases,ThreadingTestCases): def check(self, p): return os.path.exists(os.path.join(self.temp_dir, relpath(p))) + def test_invalid_chars(self): + self.assertRaises(errors.InvalidCharsInPathError, self.fs.open, 'invalid\0file', 'wb') + class TestSubFS(unittest.TestCase,FSTestCases,ThreadingTestCases): - def setUp(self): + def setUp(self): self.temp_dir = tempfile.mkdtemp(u"fstest") self.parent_fs = osfs.OSFS(self.temp_dir) self.parent_fs.makedir("foo/bar", recursive=True) @@ -93,7 +96,7 @@ class TestMountFS_stacked(unittest.TestCase,FSTestCases,ThreadingTestCases): self.mount_fs.mountdir("mem", self.mem_fs1) self.mount_fs.mountdir("mem/two", self.mem_fs2) self.fs = self.mount_fs.opendir("/mem/two") - + def tearDown(self): self.fs.close() @@ -115,4 +118,4 @@ class TestTempFS(unittest.TestCase,FSTestCases,ThreadingTestCases): def check(self, p): td = self.fs._temp_dir return os.path.exists(os.path.join(td, relpath(p))) - + |