From 9a23be35ebaac211da6a67e63a0fc46f142558b3 Mon Sep 17 00:00:00 2001 From: rfkelly0 Date: Thu, 4 Jun 2009 05:51:16 +0000 Subject: Rename 'helpers' to 'path'. Split tests up into several smaller modules git-svn-id: http://pyfilesystem.googlecode.com/svn/branches/rfk-ideas@152 67cdc799-7952-0410-af00-57a81ceafa0f --- fs/__init__.py | 4 +- fs/base.py | 12 +- fs/expose/sftp.py | 8 +- fs/helpers.py | 219 ---------- fs/memoryfs.py | 4 +- fs/mountfs.py | 4 +- fs/multifs.py | 5 +- fs/osfs.py | 6 +- fs/path.py | 206 +++++++++ fs/s3fs.py | 5 +- fs/sftpfs.py | 5 +- fs/tests.py | 997 -------------------------------------------- fs/tests/__init__.py | 443 ++++++++++++++++++++ fs/tests/test_expose.py | 95 +++++ fs/tests/test_fs.py | 88 ++++ fs/tests/test_objecttree.py | 47 +++ fs/tests/test_path.py | 98 +++++ fs/tests/test_s3fs.py | 41 ++ fs/tests/test_xattr.py | 116 ++++++ fs/tests/test_zipfs.py | 153 +++++++ fs/wrappers/hidedotfiles.py | 4 +- fs/wrappers/xattr.py | 2 +- fs/zipfs.py | 3 +- 23 files changed, 1317 insertions(+), 1248 deletions(-) delete mode 100644 fs/helpers.py create mode 100644 fs/path.py delete mode 100644 fs/tests.py create mode 100644 fs/tests/__init__.py create mode 100644 fs/tests/test_expose.py create mode 100644 fs/tests/test_fs.py create mode 100644 fs/tests/test_objecttree.py create mode 100644 fs/tests/test_path.py create mode 100644 fs/tests/test_s3fs.py create mode 100644 fs/tests/test_xattr.py create mode 100644 fs/tests/test_zipfs.py diff --git a/fs/__init__.py b/fs/__init__.py index 438c822..cd8a13b 100644 --- a/fs/__init__.py +++ b/fs/__init__.py @@ -6,9 +6,9 @@ A filesystem abstraction. __version__ = "0.1.1dev" __author__ = "Will McGugan (will@willmcgugan.com)" -# 'base' imports * from 'helpers' and 'errors' +# 'base' imports * from 'path' and 'errors' from base import * import errors -import helpers +import path diff --git a/fs/base.py b/fs/base.py index 5400cf9..8dfeb2d 100644 --- a/fs/base.py +++ b/fs/base.py @@ -20,8 +20,8 @@ except ImportError: import dummy_threading as threading import dummy_threading -from helpers import * -from errors import * +from fs.path import * +from fs.errors import * def silence_fserrors(f, *args, **kwargs): @@ -581,7 +581,7 @@ class FS(object): self.makedir(dst, allow_recreate=True) for dirname, filenames in self.walk(src, search="depth"): - dst_dirname = makerelative(dirname[len(src):]) + dst_dirname = relpath(dirname[len(src):]) dst_dirpath = pathjoin(dst, dst_dirname) self.makedir(dst_dirpath, allow_recreate=True, recursive=True) @@ -625,7 +625,7 @@ class FS(object): self.makedir(dst, allow_recreate=True) for dirname, filenames in self.walk(src): - dst_dirname = makerelative(dirname[len(src):]) + dst_dirname = relpath(dirname[len(src):]) dst_dirpath = pathjoin(dst, dst_dirname) self.makedir(dst_dirpath, allow_recreate=True) @@ -713,10 +713,10 @@ class SubFS(FS): files_only) if absolute: listpath = normpath(path) - paths = [makeabsolute(pathjoin(listpath, path)) for path in paths] + paths = [abspath(pathjoin(listpath, path)) for path in paths] elif full: listpath = normpath(path) - paths = [makerelative(pathjoin(listpath, path)) for path in paths] + paths = [relpath(pathjoin(listpath, path)) for path in paths] return paths diff --git a/fs/expose/sftp.py b/fs/expose/sftp.py index 9147f3a..0042ee8 100644 --- a/fs/expose/sftp.py +++ b/fs/expose/sftp.py @@ -31,8 +31,8 @@ from StringIO import StringIO import paramiko +from fs.path import * from fs.errors import * -from fs.helpers import * try: @@ -99,7 +99,7 @@ class SFTPServerInterface(paramiko.SFTPServerInterface): def stat(self,path): info = self.fs.getinfo(path) stat = paramiko.SFTPAttributes() - stat.filename = resourcename(path) + stat.filename = basename(path) stat.st_size = info.get("size") stat.st_atime = time.mktime(info.get("accessed_time").timetuple()) stat.st_mtime = time.mktime(info.get("modified_time").timetuple()) @@ -136,7 +136,7 @@ class SFTPServerInterface(paramiko.SFTPServerInterface): return paramiko.SFTP_OK def canonicalize(self,path): - return makeabsolute(path) + return abspath(path) def chattr(self,path,attr): return paramiko.SFTP_OP_UNSUPPORTED @@ -299,5 +299,5 @@ if __name__ == "__main__": try: server.serve_forever() except (SystemExit,KeyboardInterrupt): - server.shutdown() + server.server_close() diff --git a/fs/helpers.py b/fs/helpers.py deleted file mode 100644 index f88b17e..0000000 --- a/fs/helpers.py +++ /dev/null @@ -1,219 +0,0 @@ -""" - - fs.helpers: useful standalone functions for FS path manipulation. - -""" - -from itertools import chain - - -def iteratepath(path, numsplits=None): - """Iterate over the individual components of a path.""" - path = makerelative(normpath(path)) - if not path: - return [] - if numsplits == None: - return path.split('/') - else: - return path.split('/', numsplits) - - -def normpath(path): - """Normalizes a path to be in the format expected by FS objects. - - This function remove any leading or trailing slashes, collapses - duplicate slashes, replaces forward with backward slashes, and generally - tries very hard to return a new path string the canonical FS format. - If the path is invalid, ValueError will be raised. - - >>> normpath(r"foo\\bar\\baz") - 'foo/bar/baz' - - >>> normpath("/foo//bar/frob/../baz") - '/foo/bar/baz' - - >>> normpath("foo/../../bar") - Traceback (most recent call last) - ... - ValueError: too many backrefs in path 'foo/../../bar' - - """ - if not path: - return path - - components = [] - for comp in path.replace('\\','/').split("/"): - if not comp or comp == ".": - pass - elif comp == "..": - try: - components.pop() - except IndexError: - err = "too many backrefs in path '%s'" % (path,) - raise ValueError(err) - else: - components.append(comp) - - if path[0] in "\\/": - if not components: - components = [""] - components.insert(0,"") - - return "/".join(components) - - -def abspath(path): - """Convert the given path to a normalized, absolute path. - - path -- A FS path - """ - path = normpath(path) - if not path or path[0] != "/": - path = "/" + path - return path - - -def relpath(path): - """Convert the given path to a normalized, relative path. - - path -- A FS path - """ - path = normpath(path) - if path and path[0] == "/": - path = path[1:] - return path - - -def pathjoin(*paths): - """Joins any number of paths together, returning a new path string. - - paths -- An iterable of path strings - - >>> pathjoin('foo', 'bar', 'baz') - 'foo/bar/baz' - - >>> pathjoin('foo/bar', '../baz') - 'foo/baz' - - >>> pathjoin('foo/bar', '/baz') - '/baz' - - """ - absolute = False - - relpaths = [] - for p in paths: - if p: - if p[0] in '\\/': - del relpaths[:] - absolute = True - relpaths.append(p) - - path = normpath("/".join(relpaths)) - if absolute and not path.startswith("/"): - path = "/" + path - return path - - -def pathsplit(path): - """Splits a path on a path separator. Returns a tuple containing the path up - to that last separator and the remaining path component. - - path -- A FS path - - >>> pathsplit("foo/bar") - ('foo', 'bar') - - >>> pathsplit("foo/bar/baz") - ('foo/bar', 'baz') - - """ - split = normpath(path).rsplit('/', 1) - if len(split) == 1: - return ('', split[0]) - return tuple(split) - - -def dirname(path): - """Returns the parent directory of a path. - - path -- A FS path - - >>> dirname('foo/bar/baz') - 'foo/bar' - - """ - return pathsplit(path)[0] - - -def resourcename(path): - """Returns the resource references by a path. - - path -- A FS path - - >>> resourcename('foo/bar/baz') - 'baz' - - """ - return pathsplit(path)[1] - - -def makerelative(path): - """Makes a path relative by removing initial separator. - - path -- A FS path - - >>> makerelative("/foo/bar") - 'foo/bar' - - """ - path = normpath(path) - if path.startswith('/'): - return path[1:] - return path - - -def makeabsolute(path): - """Makes a path absolute by adding a separator at the start of the path. - - path -- A FS path - - >>> makeabsolute("foo/bar/baz") - '/foo/bar/baz' - - """ - path = normpath(path) - if not path.startswith('/'): - return '/'+path - return path - - -def issamedir(path1, path2): - """Return true if two paths reference a resource in the same directory. - - path1 -- First path - path2 -- Second path - - >>> issamedir("foo/bar/baz.txt", "foo/bar/spam.txt") - True - >>> issamedir("foo/bar/baz/txt", "spam/eggs/spam.txt") - False - """ - return pathsplit(normpath(path1))[0] == pathsplit(normpath(path2))[0] - - -def isprefix(path1,path2): - """Return true is path1 is a prefix of path2.""" - bits1 = path1.split("/") - bits2 = path2.split("/") - while bits1 and bits1[-1] == "": - bits1.pop() - if len(bits1) > len(bits2): - return False - for (bit1,bit2) in zip(bits1,bits2): - if bit1 != bit2: - return False - return True - - - diff --git a/fs/memoryfs.py b/fs/memoryfs.py index 54be786..2fc4a86 100644 --- a/fs/memoryfs.py +++ b/fs/memoryfs.py @@ -8,8 +8,8 @@ Obviously that makes this particular filesystem very fast... """ import datetime -from helpers import iteratepath -from base import * +from fs.path import iteratepath +from fs.base import * try: from cStringIO import StringIO diff --git a/fs/mountfs.py b/fs/mountfs.py index 742b4d7..777dfd3 100644 --- a/fs/mountfs.py +++ b/fs/mountfs.py @@ -128,9 +128,9 @@ class MountFS(FS): files_only=files_only) if full or absolute: if full: - path = makeabsolute(path) + path = abspath(path) else: - path = makerelative(path) + path = relpath(path) paths = [pathjoin(path, p) for p in paths] return paths diff --git a/fs/multifs.py b/fs/multifs.py index c295e61..837f667 100644 --- a/fs/multifs.py +++ b/fs/multifs.py @@ -1,7 +1,8 @@ #!/usr/in/env python -from base import FS, FSError -from helpers import * +from fs.base import FS, FSError +from fs.path import * + class MultiFS(FS): diff --git a/fs/osfs.py b/fs/osfs.py index 8fc1838..f99b8fc 100644 --- a/fs/osfs.py +++ b/fs/osfs.py @@ -2,8 +2,8 @@ import os -from base import * -from helpers import * +from fs.base import * +from fs.path import * try: import xattr @@ -35,7 +35,7 @@ class OSFS(FS): return "" % self.root_path def getsyspath(self, path, allow_none=False): - sys_path = os.path.join(self.root_path, makerelative(normpath(path))).replace('/', os.sep) + sys_path = os.path.join(self.root_path, relpath(path)).replace('/', os.sep) return sys_path def open(self, path, mode="r", **kwargs): diff --git a/fs/path.py b/fs/path.py new file mode 100644 index 0000000..b4659d5 --- /dev/null +++ b/fs/path.py @@ -0,0 +1,206 @@ +""" + + fs.path: useful functions for FS path manipulation. + +This is broadly similar to the standard 'os.path' module but works with +paths in the canonical format expected by all FS objects (backslash-separated, +optional leading slash). + +""" + + +def normpath(path): + """Normalizes a path to be in the format expected by FS objects. + + This function remove any leading or trailing slashes, collapses + duplicate slashes, replaces forward with backward slashes, and generally + tries very hard to return a new path string the canonical FS format. + If the path is invalid, ValueError will be raised. + + >>> normpath(r"foo\\bar\\baz") + 'foo/bar/baz' + + >>> normpath("/foo//bar/frob/../baz") + '/foo/bar/baz' + + >>> normpath("foo/../../bar") + Traceback (most recent call last) + ... + ValueError: too many backrefs in path 'foo/../../bar' + + """ + if not path: + return path + components = [] + for comp in path.replace('\\','/').split("/"): + if not comp or comp == ".": + pass + elif comp == "..": + try: + components.pop() + except IndexError: + err = "too many backrefs in path '%s'" % (path,) + raise ValueError(err) + else: + components.append(comp) + if path[0] in "\\/": + if not components: + components = [""] + components.insert(0,"") + return "/".join(components) + + +def iteratepath(path, numsplits=None): + """Iterate over the individual components of a path.""" + path = relpath(normpath(path)) + if not path: + return [] + if numsplits == None: + return path.split('/') + else: + return path.split('/', numsplits) + + +def abspath(path): + """Convert the given path to a normalized, absolute path. + + Since FS objects have no concept of a 'current directory' this simply + adds a leading '/' character if the path doesn't already have one. + + """ + path = normpath(path) + if not path: + return "/" + if path[0] != "/": + return "/" + path + return path + + +def relpath(path): + """Convert the given path to a normalized, relative path. + + This is the inverse of abspath(), stripping a leading '/' from the + path if it is present. + + """ + path = normpath(path) + while path and path[0] == "/": + path = path[1:] + return path + + +def pathjoin(*paths): + """Joins any number of paths together, returning a new path string. + + >>> pathjoin('foo', 'bar', 'baz') + 'foo/bar/baz' + + >>> pathjoin('foo/bar', '../baz') + 'foo/baz' + + >>> pathjoin('foo/bar', '/baz') + '/baz' + + """ + absolute = False + relpaths = [] + for p in paths: + if p: + if p[0] in '\\/': + del relpaths[:] + absolute = True + relpaths.append(p) + + path = normpath("/".join(relpaths)) + if absolute and not path.startswith("/"): + path = "/" + path + return path + +# Allow pathjoin() to be used as fs.path.join() +join = pathjoin + + +def pathsplit(path): + """Splits a path into (head,tail) pair. + + This function splits a path into a pair (head,tail) where 'tail' is the + last pathname component and 'head' is all preceeding components. + + >>> pathsplit("foo/bar") + ('foo', 'bar') + + >>> pathsplit("foo/bar/baz") + ('foo/bar', 'baz') + + """ + split = normpath(path).rsplit('/', 1) + if len(split) == 1: + return ('', split[0]) + return tuple(split) + +# Allow pathsplit() to be used as fs.path.split() +split = pathsplit + + +def dirname(path): + """Returns the parent directory of a path. + + This is always equivalent to the 'head' component of the value returned + by pathsplit(path). + + >>> dirname('foo/bar/baz') + 'foo/bar' + + """ + return pathsplit(path)[0] + + +def basename(path): + """Returns the basename of the resource referenced by a path. + + This is always equivalent to the 'head' component of the value returned + by pathsplit(path). + + >>> basename('foo/bar/baz') + 'baz' + + """ + return pathsplit(path)[1] + + +def issamedir(path1, path2): + """Return true if two paths reference a resource in the same directory. + + >>> issamedir("foo/bar/baz.txt", "foo/bar/spam.txt") + True + >>> issamedir("foo/bar/baz/txt", "spam/eggs/spam.txt") + False + + """ + return pathsplit(normpath(path1))[0] == pathsplit(normpath(path2))[0] + + +def isprefix(path1,path2): + """Return true is path1 is a prefix of path2. + + >>> isprefix("foo/bar", "foo/bar/spam.txt") + True + >>> isprefix("foo/bar/", "foo/bar") + True + >>> isprefix("foo/barry", "foo/baz/bar") + False + >>> isprefix("foo/bar/baz/", "foo/baz/bar") + False + + """ + bits1 = path1.split("/") + bits2 = path2.split("/") + while bits1 and bits1[-1] == "": + bits1.pop() + if len(bits1) > len(bits2): + return False + for (bit1,bit2) in zip(bits1,bits2): + if bit1 != bit2: + return False + return True + diff --git a/fs/s3fs.py b/fs/s3fs.py index 145a604..a1cc233 100644 --- a/fs/s3fs.py +++ b/fs/s3fs.py @@ -18,7 +18,6 @@ except ImportError: from tempfile import NamedTemporaryFile as TempFile from fs.base import * -from fs.helpers import * class S3FS(FS): @@ -116,7 +115,7 @@ class S3FS(FS): def _s3path(self,path): """Get the absolute path to a file stored in S3.""" - path = makerelative(path) + path = relpath(path) path = self._separator.join(iteratepath(path)) s3path = self._prefix + path if s3path[-1] == self._separator: @@ -436,7 +435,7 @@ class S3FS(FS): # Since S3 lists in lexicographic order, subsequent iterations # of the loop will check for the existence of the new filename. if k.name == s3path_dstD: - nm = resourcename(src) + nm = basename(src) dst = pathjoin(dirname(dst),nm) s3path_dst = s3path_dstD + nm dstOK = True diff --git a/fs/sftpfs.py b/fs/sftpfs.py index 72855af..6429791 100644 --- a/fs/sftpfs.py +++ b/fs/sftpfs.py @@ -10,7 +10,6 @@ import stat as statinfo import paramiko from fs.base import * -from fs.helpers import * if not hasattr(paramiko.SFTPFile,"__enter__"): @@ -53,7 +52,7 @@ class SFTPFS(FS): if not connection.is_authenticated(): connection.connect(**credentials) self.client = paramiko.SFTPClient.from_transport(connection) - self.root = makeabsolute(root) + self.root = abspath(root) def __del__(self): self.close() @@ -84,7 +83,7 @@ class SFTPFS(FS): self.client = None def _normpath(self,path): - npath = pathjoin(self.root,makerelative(path)) + npath = pathjoin(self.root,relpath(path)) if not isprefix(self.root,npath): raise PathError(path,msg="Path is outside root: %(path)s") return npath diff --git a/fs/tests.py b/fs/tests.py deleted file mode 100644 index 2ceb42c..0000000 --- a/fs/tests.py +++ /dev/null @@ -1,997 +0,0 @@ -#!/usr/bin/env python -""" - - fs.tests: testcases for the fs module - -""" - -import sys -import logging -logging.basicConfig(level=logging.ERROR,stream=sys.stdout) - -import unittest - -import shutil -import tempfile -import pickle -import socket -import threading - -import base as fs -from helpers import * - - -#################### - - -class FSTestCases: - """Base suite of testcases for filesystem implementations. - - Any FS subclass should be capable of passing all of these tests. - To apply the tests to your own FS implementation, simply subclass - FSTestCase and have the setUp method set self.fs to an instance - of your FS implementation. - - Note that you'll also need to subclass unittest.TestCase - this class - is designed as a mixin so that it's not picked up by test tools such - as nose. - """ - - def check(self, p): - """Check that a file exists within self.fs""" - return self.fs.exists(p) - - def test_root_dir(self): - self.assertTrue(self.fs.isdir("")) - self.assertTrue(self.fs.isdir("/")) - - def test_debug(self): - str(self.fs) - repr(self.fs) - self.assert_(hasattr(self.fs, 'desc')) - - def test_writefile(self): - self.assertRaises(fs.ResourceNotFoundError,self.fs.open,"test1.txt") - f = self.fs.open("test1.txt","w") - f.write("testing") - f.close() - f = self.fs.open("test1.txt","r") - self.assertEquals(f.read(),"testing") - - def test_isdir_isfile(self): - self.assertFalse(self.fs.exists("dir1")) - self.assertFalse(self.fs.isdir("dir1")) - self.assertFalse(self.fs.isfile("a.txt")) - self.fs.createfile("a.txt") - self.assertFalse(self.fs.isdir("dir1")) - self.assertTrue(self.fs.exists("a.txt")) - self.assertTrue(self.fs.isfile("a.txt")) - self.fs.makedir("dir1") - self.assertTrue(self.fs.isdir("dir1")) - self.assertTrue(self.fs.exists("dir1")) - self.assertTrue(self.fs.exists("a.txt")) - self.fs.remove("a.txt") - self.assertFalse(self.fs.exists("a.txt")) - - def test_listdir(self): - self.fs.createfile("a") - self.fs.createfile("b") - self.fs.createfile("foo") - self.fs.createfile("bar") - # Test listing of the root directory - d1 = self.fs.listdir() - self.assertEqual(len(d1), 4) - self.assertEqual(sorted(d1), ["a", "b", "bar", "foo"]) - d1 = self.fs.listdir("") - self.assertEqual(len(d1), 4) - self.assertEqual(sorted(d1), ["a", "b", "bar", "foo"]) - d1 = self.fs.listdir("/") - self.assertEqual(len(d1), 4) - # Test listing absolute paths - d2 = self.fs.listdir(absolute=True) - self.assertEqual(len(d2), 4) - self.assertEqual(sorted(d2), ["/a", "/b", "/bar", "/foo"]) - # Create some deeper subdirectories, to make sure their - # contents are not inadvertantly included - self.fs.makedir("p/1/2/3",recursive=True) - self.fs.createfile("p/1/2/3/a") - self.fs.createfile("p/1/2/3/b") - self.fs.createfile("p/1/2/3/foo") - self.fs.createfile("p/1/2/3/bar") - self.fs.makedir("q") - # Test listing just files, just dirs, and wildcards - dirs_only = self.fs.listdir(dirs_only=True) - files_only = self.fs.listdir(files_only=True) - contains_a = self.fs.listdir(wildcard="*a*") - self.assertEqual(sorted(dirs_only), ["p", "q"]) - self.assertEqual(sorted(files_only), ["a", "b", "bar", "foo"]) - self.assertEqual(sorted(contains_a), ["a", "bar"]) - # Test listing a subdirectory - d3 = self.fs.listdir("p/1/2/3") - self.assertEqual(len(d3), 4) - self.assertEqual(sorted(d3), ["a", "b", "bar", "foo"]) - # Test listing a subdirectory with absoliute and full paths - d4 = self.fs.listdir("p/1/2/3", absolute=True) - self.assertEqual(len(d4), 4) - self.assertEqual(sorted(d4), ["/p/1/2/3/a", "/p/1/2/3/b", "/p/1/2/3/bar", "/p/1/2/3/foo"]) - d4 = self.fs.listdir("p/1/2/3", full=True) - self.assertEqual(len(d4), 4) - self.assertEqual(sorted(d4), ["p/1/2/3/a", "p/1/2/3/b", "p/1/2/3/bar", "p/1/2/3/foo"]) - # Test that appropriate errors are raised - self.assertRaises(fs.ResourceNotFoundError,self.fs.listdir,"zebra") - self.assertRaises(fs.ResourceInvalidError,self.fs.listdir,"foo") - - def test_makedir(self): - check = self.check - self.fs.makedir("a") - self.assertTrue(check("a")) - self.assertRaises(fs.ParentDirectoryMissingError,self.fs.makedir,"a/b/c") - self.fs.makedir("a/b/c", recursive=True) - self.assert_(check("a/b/c")) - self.fs.makedir("foo/bar/baz", recursive=True) - self.assert_(check("foo/bar/baz")) - self.fs.makedir("a/b/child") - self.assert_(check("a/b/child")) - self.assertRaises(fs.DestinationExistsError,self.fs.makedir,"/a/b") - self.fs.makedir("/a/b",allow_recreate=True) - self.fs.createfile("/a/file") - self.assertRaises(fs.ResourceInvalidError,self.fs.makedir,"a/file") - - def test_remove(self): - self.fs.createfile("a.txt") - self.assertTrue(self.check("a.txt")) - self.fs.remove("a.txt") - self.assertFalse(self.check("a.txt")) - self.assertRaises(fs.ResourceNotFoundError,self.fs.remove,"a.txt") - self.fs.makedir("dir1") - self.assertRaises(fs.ResourceInvalidError,self.fs.remove,"dir1") - self.fs.createfile("/dir1/a.txt") - self.assertTrue(self.check("dir1/a.txt")) - self.fs.remove("dir1/a.txt") - self.assertFalse(self.check("/dir1/a.txt")) - - def test_removedir(self): - check = self.check - self.fs.makedir("a") - self.assert_(check("a")) - self.fs.removedir("a") - self.assert_(not check("a")) - self.fs.makedir("a/b/c/d", recursive=True) - self.assertRaises(fs.DirectoryNotEmptyError, self.fs.removedir, "a/b") - self.fs.removedir("a/b/c/d") - self.assert_(not check("a/b/c/d")) - self.fs.removedir("a/b/c") - self.assert_(not check("a/b/c")) - self.fs.removedir("a/b") - self.assert_(not check("a/b")) - # Test recursive removal of empty parent dirs - self.fs.makedir("foo/bar/baz", recursive=True) - self.fs.removedir("foo/bar/baz", recursive=True) - self.assert_(not check("foo/bar/baz")) - self.assert_(not check("foo/bar")) - self.assert_(not check("foo")) - # Ensure that force=True works as expected - self.fs.makedir("frollic/waggle", recursive=True) - self.fs.createfile("frollic/waddle.txt","waddlewaddlewaddle") - self.assertRaises(fs.DirectoryNotEmptyError,self.fs.removedir,"frollic") - self.assertRaises(fs.ResourceInvalidError,self.fs.removedir,"frollic/waddle.txt") - self.fs.removedir("frollic",force=True) - self.assert_(not check("frollic")) - - def test_rename(self): - check = self.check - self.fs.open("foo.txt", 'wt').write("Hello, World!") - self.assert_(check("foo.txt")) - self.fs.rename("foo.txt", "bar.txt") - self.assert_(check("bar.txt")) - self.assert_(not check("foo.txt")) - - def test_info(self): - test_str = "Hello, World!" - self.fs.createfile("info.txt",test_str) - info = self.fs.getinfo("info.txt") - self.assertEqual(info['size'], len(test_str)) - self.fs.desc("info.txt") - - def test_getsize(self): - test_str = "*"*23 - self.fs.createfile("info.txt",test_str) - size = self.fs.getsize("info.txt") - self.assertEqual(size, len(test_str)) - - def test_movefile(self): - check = self.check - contents = "If the implementation is hard to explain, it's a bad idea." - def makefile(path): - self.fs.createfile(path,contents) - def checkcontents(path): - check_contents = self.fs.getcontents(path) - self.assertEqual(check_contents,contents) - return contents == check_contents - - self.fs.makedir("foo/bar", recursive=True) - makefile("foo/bar/a.txt") - self.assert_(check("foo/bar/a.txt")) - self.assert_(checkcontents("foo/bar/a.txt")) - self.fs.move("foo/bar/a.txt", "foo/b.txt") - self.assert_(not check("foo/bar/a.txt")) - self.assert_(check("foo/b.txt")) - self.assert_(checkcontents("foo/b.txt")) - - self.fs.move("foo/b.txt", "c.txt") - self.assert_(not check("foo/b.txt")) - self.assert_(check("/c.txt")) - self.assert_(checkcontents("/c.txt")) - - makefile("foo/bar/a.txt") - self.assertRaises(fs.DestinationExistsError,self.fs.move,"foo/bar/a.txt","/c.txt") - self.assert_(check("foo/bar/a.txt")) - self.assert_(check("/c.txt")) - self.fs.move("foo/bar/a.txt","/c.txt",overwrite=True) - self.assert_(not check("foo/bar/a.txt")) - self.assert_(check("/c.txt")) - - - def test_movedir(self): - check = self.check - contents = "If the implementation is hard to explain, it's a bad idea." - def makefile(path): - self.fs.createfile(path,contents) - - self.fs.makedir("a") - self.fs.makedir("b") - makefile("a/1.txt") - makefile("a/2.txt") - makefile("a/3.txt") - self.fs.makedir("a/foo/bar", recursive=True) - makefile("a/foo/bar/baz.txt") - - self.fs.movedir("a", "copy of a") - - self.assert_(check("copy of a/1.txt")) - self.assert_(check("copy of a/2.txt")) - self.assert_(check("copy of a/3.txt")) - self.assert_(check("copy of a/foo/bar/baz.txt")) - - self.assert_(not check("a/1.txt")) - self.assert_(not check("a/2.txt")) - self.assert_(not check("a/3.txt")) - self.assert_(not check("a/foo/bar/baz.txt")) - self.assert_(not check("a/foo/bar")) - self.assert_(not check("a/foo")) - self.assert_(not check("a")) - - self.fs.makedir("a") - print self.fs.listdir("a") - self.assertRaises(fs.DestinationExistsError,self.fs.movedir,"copy of a","a") - self.fs.movedir("copy of a","a",overwrite=True) - self.assert_(not check("copy of a")) - print self.fs.listdir("a") - self.assert_(check("a/1.txt")) - self.assert_(check("a/2.txt")) - self.assert_(check("a/3.txt")) - self.assert_(check("a/foo/bar/baz.txt")) - - - def test_copyfile(self): - check = self.check - contents = "If the implementation is hard to explain, it's a bad idea." - def makefile(path,contents=contents): - self.fs.createfile(path,contents) - def checkcontents(path,contents=contents): - check_contents = self.fs.getcontents(path) - self.assertEqual(check_contents,contents) - return contents == check_contents - - self.fs.makedir("foo/bar", recursive=True) - makefile("foo/bar/a.txt") - self.assert_(check("foo/bar/a.txt")) - self.assert_(checkcontents("foo/bar/a.txt")) - self.fs.copy("foo/bar/a.txt", "foo/b.txt") - self.assert_(check("foo/bar/a.txt")) - self.assert_(check("foo/b.txt")) - self.assert_(checkcontents("foo/b.txt")) - - self.fs.copy("foo/b.txt", "c.txt") - self.assert_(check("foo/b.txt")) - self.assert_(check("/c.txt")) - self.assert_(checkcontents("/c.txt")) - - makefile("foo/bar/a.txt","different contents") - self.assertRaises(fs.DestinationExistsError,self.fs.copy,"foo/bar/a.txt","/c.txt") - self.assert_(checkcontents("/c.txt")) - self.fs.copy("foo/bar/a.txt","/c.txt",overwrite=True) - self.assert_(checkcontents("foo/bar/a.txt","different contents")) - self.assert_(checkcontents("/c.txt","different contents")) - - - def test_copydir(self): - check = self.check - contents = "If the implementation is hard to explain, it's a bad idea." - def makefile(path): - self.fs.createfile(path,contents) - def checkcontents(path): - check_contents = self.fs.getcontents(path) - self.assertEqual(check_contents,contents) - return contents == check_contents - - self.fs.makedir("a") - self.fs.makedir("b") - makefile("a/1.txt") - makefile("a/2.txt") - makefile("a/3.txt") - self.fs.makedir("a/foo/bar", recursive=True) - makefile("a/foo/bar/baz.txt") - - self.fs.copydir("a", "copy of a") - self.assert_(check("copy of a/1.txt")) - self.assert_(check("copy of a/2.txt")) - self.assert_(check("copy of a/3.txt")) - self.assert_(check("copy of a/foo/bar/baz.txt")) - checkcontents("copy of a/1.txt") - - self.assert_(check("a/1.txt")) - self.assert_(check("a/2.txt")) - self.assert_(check("a/3.txt")) - self.assert_(check("a/foo/bar/baz.txt")) - checkcontents("a/1.txt") - - self.assertRaises(fs.DestinationExistsError,self.fs.copydir,"a","b") - self.fs.copydir("a","b",overwrite=True) - self.assert_(check("b/1.txt")) - self.assert_(check("b/2.txt")) - self.assert_(check("b/3.txt")) - self.assert_(check("b/foo/bar/baz.txt")) - checkcontents("b/1.txt") - - def test_copydir_with_dotfile(self): - check = self.check - contents = "If the implementation is hard to explain, it's a bad idea." - def makefile(path): - self.fs.createfile(path,contents) - - self.fs.makedir("a") - makefile("a/1.txt") - makefile("a/2.txt") - makefile("a/.hidden.txt") - - self.fs.copydir("a", "copy of a") - self.assert_(check("copy of a/1.txt")) - self.assert_(check("copy of a/2.txt")) - self.assert_(check("copy of a/.hidden.txt")) - - self.assert_(check("a/1.txt")) - self.assert_(check("a/2.txt")) - self.assert_(check("a/.hidden.txt")) - - def test_readwriteappendseek(self): - def checkcontents(path, check_contents): - read_contents = self.fs.getcontents(path) - self.assertEqual(read_contents,check_contents) - return read_contents == check_contents - test_strings = ["Beautiful is better than ugly.", - "Explicit is better than implicit.", - "Simple is better than complex."] - all_strings = "".join(test_strings) - - self.assertRaises(fs.ResourceNotFoundError, self.fs.open, "a.txt", "r") - self.assert_(not self.fs.exists("a.txt")) - f1 = self.fs.open("a.txt", "wb") - pos = 0 - for s in test_strings: - f1.write(s) - pos += len(s) - self.assertEqual(pos, f1.tell()) - f1.close() - self.assert_(self.fs.exists("a.txt")) - self.assert_(checkcontents("a.txt", all_strings)) - - f2 = self.fs.open("b.txt", "wb") - f2.write(test_strings[0]) - f2.close() - self.assert_(checkcontents("b.txt", test_strings[0])) - f3 = self.fs.open("b.txt", "ab") - f3.write(test_strings[1]) - f3.write(test_strings[2]) - f3.close() - self.assert_(checkcontents("b.txt", all_strings)) - f4 = self.fs.open("b.txt", "wb") - f4.write(test_strings[2]) - f4.close() - self.assert_(checkcontents("b.txt", test_strings[2])) - f5 = self.fs.open("c.txt", "wb") - for s in test_strings: - f5.write(s+"\n") - f5.close() - f6 = self.fs.open("c.txt", "rb") - for s, t in zip(f6, test_strings): - self.assertEqual(s, t+"\n") - f6.close() - f7 = self.fs.open("c.txt", "rb") - f7.seek(13) - word = f7.read(6) - self.assertEqual(word, "better") - f7.seek(1, os.SEEK_CUR) - word = f7.read(4) - self.assertEqual(word, "than") - f7.seek(-9, os.SEEK_END) - word = f7.read(7) - self.assertEqual(word, "complex") - f7.close() - self.assertEqual(self.fs.getcontents("a.txt"), all_strings) - - def test_with_statement(self): - import sys - if sys.version_info[0] >= 2 and sys.version_info[1] >= 5: - # A successful 'with' statement - contents = "testing the with statement" - code = "from __future__ import with_statement\n" - code += "with self.fs.open('f.txt','w-') as testfile:\n" - code += " testfile.write(contents)\n" - code += "self.assertEquals(self.fs.getcontents('f.txt'),contents)" - code = compile(code,"",'exec') - eval(code) - # A 'with' statement raising an error - contents = "testing the with statement" - code = "from __future__ import with_statement\n" - code += "with self.fs.open('f.txt','w-') as testfile:\n" - code += " testfile.write(contents)\n" - code += " raise ValueError\n" - code = compile(code,"",'exec') - self.assertRaises(ValueError,eval,code,globals(),locals()) - self.assertEquals(self.fs.getcontents('f.txt'),contents) - - def test_pickling(self): - self.fs.createfile("test1","hello world") - oldfs = self.fs - self.fs = pickle.loads(pickle.dumps(self.fs)) - self.assert_(self.fs.isfile("test1")) - - - -class XAttrTestCases: - """Testcases for filesystems providing extended attribute support.""" - - def test_getsetdel(self): - def do_getsetdel(p): - self.assertEqual(self.fs.getxattr(p,"xattr1"),None) - self.fs.setxattr(p,"xattr1","value1") - self.assertEqual(self.fs.getxattr(p,"xattr1"),"value1") - self.fs.delxattr(p,"xattr1") - self.assertEqual(self.fs.getxattr(p,"xattr1"),None) - self.fs.createfile("test.txt","hello") - do_getsetdel("test.txt") - self.assertRaises(fs.ResourceNotFoundError,self.fs.getxattr,"test2.txt","xattr1") - self.fs.makedir("mystuff") - self.fs.createfile("/mystuff/test.txt","") - do_getsetdel("mystuff") - do_getsetdel("mystuff/test.txt") - - def test_list_xattrs(self): - def do_list(p): - self.assertEquals(sorted(self.fs.xattrs(p)),[]) - self.fs.setxattr(p,"xattr1","value1") - self.assertEquals(sorted(self.fs.xattrs(p)),["xattr1"]) - self.fs.setxattr(p,"attr2","value2") - self.assertEquals(sorted(self.fs.xattrs(p)),["attr2","xattr1"]) - self.fs.delxattr(p,"xattr1") - self.assertEquals(sorted(self.fs.xattrs(p)),["attr2"]) - self.fs.delxattr(p,"attr2") - self.assertEquals(sorted(self.fs.xattrs(p)),[]) - self.fs.createfile("test.txt","hello") - do_list("test.txt") - self.fs.makedir("mystuff") - self.fs.createfile("/mystuff/test.txt","") - do_list("mystuff") - do_list("mystuff/test.txt") - - def test_copy_xattrs(self): - self.fs.createfile("a.txt","content") - self.fs.setxattr("a.txt","myattr","myvalue") - self.fs.setxattr("a.txt","testattr","testvalue") - self.fs.makedir("stuff") - self.fs.copy("a.txt","stuff/a.txt") - self.assertTrue(self.fs.exists("stuff/a.txt")) - self.assertEquals(self.fs.getxattr("stuff/a.txt","myattr"),"myvalue") - self.assertEquals(self.fs.getxattr("stuff/a.txt","testattr"),"testvalue") - self.assertEquals(self.fs.getxattr("a.txt","myattr"),"myvalue") - self.assertEquals(self.fs.getxattr("a.txt","testattr"),"testvalue") - self.fs.setxattr("stuff","dirattr","a directory") - self.fs.copydir("stuff","stuff2") - self.assertEquals(self.fs.getxattr("stuff2/a.txt","myattr"),"myvalue") - self.assertEquals(self.fs.getxattr("stuff2/a.txt","testattr"),"testvalue") - self.assertEquals(self.fs.getxattr("stuff2","dirattr"),"a directory") - self.assertEquals(self.fs.getxattr("stuff","dirattr"),"a directory") - - def test_move_xattrs(self): - self.fs.createfile("a.txt","content") - self.fs.setxattr("a.txt","myattr","myvalue") - self.fs.setxattr("a.txt","testattr","testvalue") - self.fs.makedir("stuff") - self.fs.move("a.txt","stuff/a.txt") - self.assertTrue(self.fs.exists("stuff/a.txt")) - self.assertEquals(self.fs.getxattr("stuff/a.txt","myattr"),"myvalue") - self.assertEquals(self.fs.getxattr("stuff/a.txt","testattr"),"testvalue") - self.fs.setxattr("stuff","dirattr","a directory") - self.fs.movedir("stuff","stuff2") - self.assertEquals(self.fs.getxattr("stuff2/a.txt","myattr"),"myvalue") - self.assertEquals(self.fs.getxattr("stuff2/a.txt","testattr"),"testvalue") - self.assertEquals(self.fs.getxattr("stuff2","dirattr"),"a directory") - - -#################### - - -class TestHelpers(unittest.TestCase): - """Testcases for FS path helpers.""" - - def test_normpath(self): - tests = [ ("\\a\\b\\c", "/a/b/c"), - ("", ""), - ("/a/b/c", "/a/b/c"), - ("a/b/c", "a/b/c"), - ("a/b/../c/", "a/c"), - ("/","/"), - ] - for path, result in tests: - self.assertEqual(normpath(path), result) - - def test_pathjoin(self): - tests = [ ("", "a", "a"), - ("a", "a", "a/a"), - ("a/b", "../c", "a/c"), - ("a/b/../c", "d", "a/c/d"), - ("/a/b/c", "d", "/a/b/c/d"), - ("/a/b/c", "../../../d", "/d"), - ("a", "b", "c", "a/b/c"), - ("a/b/c", "../d", "c", "a/b/d/c"), - ("a/b/c", "../d", "/a", "/a"), - ("aaa", "bbb/ccc", "aaa/bbb/ccc"), - ("aaa", "bbb\ccc", "aaa/bbb/ccc"), - ("aaa", "bbb", "ccc", "/aaa", "eee", "/aaa/eee"), - ("a/b", "./d", "e", "a/b/d/e"), - ("/", "/", "/"), - ("/", "", "/"), - ] - for testpaths in tests: - paths = testpaths[:-1] - result = testpaths[-1] - self.assertEqual(fs.pathjoin(*paths), result) - - self.assertRaises(ValueError, fs.pathjoin, "../") - self.assertRaises(ValueError, fs.pathjoin, "./../") - self.assertRaises(ValueError, fs.pathjoin, "a/b", "../../..") - self.assertRaises(ValueError, fs.pathjoin, "a/b/../../../d") - - def test_makerelative(self): - tests = [ ("/a/b", "a/b"), - ("a/b", "a/b"), - ("/", "") ] - - for path, result in tests: - print path, result - self.assertEqual(fs.makerelative(path), result) - - def test_makeabsolute(self): - tests = [ ("/a/b", "/a/b"), - ("a/b", "/a/b"), - ("/", "/") ] - - for path, result in tests: - self.assertEqual(fs.makeabsolute(path), result) - - def test_iteratepath(self): - tests = [ ("a/b", ["a", "b"]), - ("", [] ), - ("aaa/bbb/ccc", ["aaa", "bbb", "ccc"]), - ("a/b/c/../d", ["a", "b", "d"]) ] - - for path, results in tests: - print repr(path), results - for path_component, expected in zip(iteratepath(path), results): - self.assertEqual(path_component, expected) - - self.assertEqual(list(iteratepath("a/b/c/d", 1)), ["a", "b/c/d"]) - self.assertEqual(list(iteratepath("a/b/c/d", 2)), ["a", "b", "c/d"]) - - def test_pathsplit(self): - tests = [ ("a/b", ("a", "b")), - ("a/b/c", ("a/b", "c")), - ("a", ("", "a")), - ("", ("", "")), - ("/", ("", "")), - ("foo/bar", ("foo", "bar")), - ("foo/bar/baz", ("foo/bar", "baz")), - ] - for path, result in tests: - self.assertEqual(fs.pathsplit(path), result) - - -#################### - - -import objecttree - -class TestObjectTree(unittest.TestCase): - """Testcases for the ObjectTree class.""" - - def test_getset(self): - ot = objecttree.ObjectTree() - ot['foo'] = "bar" - self.assertEqual(ot['foo'], 'bar') - - ot = objecttree.ObjectTree() - ot['foo/bar'] = "baz" - self.assertEqual(ot['foo'], {'bar':'baz'}) - self.assertEqual(ot['foo/bar'], 'baz') - - del ot['foo/bar'] - self.assertEqual(ot['foo'], {}) - - ot = objecttree.ObjectTree() - ot['a/b/c'] = "A" - ot['a/b/d'] = "B" - ot['a/b/e'] = "C" - ot['a/b/f'] = "D" - self.assertEqual(sorted(ot['a/b'].values()), ['A', 'B', 'C', 'D']) - self.assert_(ot.get('a/b/x', -1) == -1) - - self.assert_('a/b/c' in ot) - self.assert_('a/b/x' not in ot) - self.assert_(ot.isobject('a/b/c')) - self.assert_(ot.isobject('a/b/d')) - self.assert_(not ot.isobject('a/b')) - - left, object, right = ot.partialget('a/b/e/f/g') - self.assertEqual(left, "a/b/e") - self.assertEqual(object, "C") - self.assertEqual(right, "f/g") - - -#################### - - -import osfs -import os - -class TestOSFS(unittest.TestCase,FSTestCases): - - def setUp(self): - self.temp_dir = tempfile.mkdtemp("fstest") - self.fs = osfs.OSFS(self.temp_dir) - - def tearDown(self): - shutil.rmtree(self.temp_dir) - - def check(self, p): - return os.path.exists(os.path.join(self.temp_dir, makerelative(p))) - -#try: -# import xattr -#except ImportError: -# pass -#else: -# TestOSFS.__bases__ = TestOSFS.__bases__ + (XAttrTestCases,) - - - -class TestSubFS(unittest.TestCase,FSTestCases): - - def setUp(self): - self.temp_dir = tempfile.mkdtemp("fstest") - self.parent_fs = osfs.OSFS(self.temp_dir) - self.parent_fs.makedir("foo/bar", recursive=True) - self.fs = self.parent_fs.opendir("foo/bar") - - def tearDown(self): - shutil.rmtree(self.temp_dir) - - def check(self, p): - p = os.path.join("foo/bar", makerelative(p)) - full_p = os.path.join(self.temp_dir, p) - return os.path.exists(full_p) - - -import memoryfs -class TestMemoryFS(unittest.TestCase,FSTestCases): - - def setUp(self): - self.fs = memoryfs.MemoryFS() - - -import mountfs -class TestMountFS(unittest.TestCase,FSTestCases): - - def setUp(self): - self.mount_fs = mountfs.MountFS() - self.mem_fs = memoryfs.MemoryFS() - self.mount_fs.mountdir("mounted/memfs", self.mem_fs) - self.fs = self.mount_fs.opendir("mounted/memfs") - - def tearDown(self): - pass - - def check(self, p): - return self.mount_fs.exists(os.path.join("mounted/memfs", makerelative(p))) - - -import tempfs -class TestTempFS(unittest.TestCase,FSTestCases): - - def setUp(self): - self.fs = tempfs.TempFS() - - def tearDown(self): - td = self.fs._temp_dir - self.fs.close() - self.assert_(not os.path.exists(td)) - - def check(self, p): - td = self.fs._temp_dir - return os.path.exists(os.path.join(td, makerelative(p))) - - -import s3fs -class TestS3FS(unittest.TestCase,FSTestCases): - - bucket = "test-s3fs.rfk.id.au" - - def setUp(self): - import nose - raise nose.SkipTest - self.fs = s3fs.S3FS(self.bucket,"/unittest/files") - self._clear() - - def _clear(self): - for (path,files) in self.fs.walk(search="depth"): - for fn in files: - self.fs.remove(pathjoin(path,fn)) - if path and path != "/": - self.fs.removedir(path) - - def tearDown(self): - self._clear() - for k in self.fs._s3bukt.list(): - self.fs._s3bukt.delete_key(k) - self.fs._s3conn.delete_bucket(self.bucket) - - -import rpcfs -from fs.expose.xmlrpc import RPCFSServer -class TestRPCFS(unittest.TestCase,FSTestCases): - - def setUp(self): - self.port = 8000 - self.server = None - while not self.server: - try: - self.server = RPCFSServer(tempfs.TempFS(),("localhost",self.port),logRequests=False) - except socket.error, e: - if e.args[1] == "Address already in use": - self.port += 1 - else: - raise e - self.server_thread = threading.Thread(target=self._run_server) - self.server_thread.start() - self.fs = rpcfs.RPCFS("http://localhost:" + str(self.port)) - - def _run_server(self): - """Run the server, swallowing shutdown-related execptions.""" - try: - self.server.serve_forever() - except: - pass - - def tearDown(self): - try: - # Shut the server down. We send one final request to - # bump the socket and make it recognise the shutdown. - self.server.serve_more_requests = False - self.server.server_close() - self.fs.exists("/") - except Exception: - pass - - -import sftpfs -from fs.expose.sftp import BaseSFTPServer -class TestSFTPFS(unittest.TestCase,FSTestCases): - - def setUp(self): - self.temp_fs = tempfs.TempFS() - self.port = 8000 - self.server = None - while not self.server: - try: - self.server = BaseSFTPServer(("localhost",self.port),self.temp_fs) - except socket.error, e: - if e.args[1] == "Address already in use": - self.port += 1 - else: - raise - self.server_thread = threading.Thread(target=self._run_server) - self.server_thread.start() - self.fs = sftpfs.SFTPFS(("localhost",self.port)) - - def _run_server(self): - """Run the server, swallowing shutdown-related execptions.""" - try: - self.server.serve_forever() - except: - pass - - def tearDown(self): - try: - self.server.shutdown() - self.fs.exists("/") - except Exception: - pass - self.temp_fs.close() - - -#################### - -from fs.wrappers.xattr import ensure_xattr -class TestXAttr(unittest.TestCase,FSTestCases,XAttrTestCases): - - def setUp(self): - self.fs = ensure_xattr(tempfs.TempFS()) - - def tearDown(self): - td = self.fs._temp_dir - self.fs.close() - self.assert_(not os.path.exists(td)) - - def check(self, p): - td = self.fs._temp_dir - return os.path.exists(os.path.join(td, makerelative(p))) - - -#################### - - -import zipfs -import random -import zipfile -class TestReadZipFS(unittest.TestCase): - - def setUp(self): - self.temp_filename = "".join(random.choice("abcdefghijklmnopqrstuvwxyz") for _ in range(6))+".zip" - self.temp_filename = os.path.join(tempfile.gettempdir(), self.temp_filename) - - self.zf = zipfile.ZipFile(self.temp_filename, "w") - zf = self.zf - zf.writestr("a.txt", "Hello, World!") - zf.writestr("b.txt", "b") - zf.writestr("1.txt", "1") - zf.writestr("foo/bar/baz.txt", "baz") - zf.writestr("foo/second.txt", "hai") - zf.close() - self.fs = zipfs.ZipFS(self.temp_filename, "r") - - def tearDown(self): - self.fs.close() - os.remove(self.temp_filename) - - def check(self, p): - try: - self.zipfile.getinfo(p) - return True - except: - return False - - def test_reads(self): - def read_contents(path): - f = self.fs.open(path) - contents = f.read() - return contents - def check_contents(path, expected): - self.assert_(read_contents(path)==expected) - check_contents("a.txt", "Hello, World!") - check_contents("1.txt", "1") - check_contents("foo/bar/baz.txt", "baz") - - def test_getcontents(self): - def read_contents(path): - return self.fs.getcontents(path) - def check_contents(path, expected): - self.assert_(read_contents(path)==expected) - check_contents("a.txt", "Hello, World!") - check_contents("1.txt", "1") - check_contents("foo/bar/baz.txt", "baz") - - def test_is(self): - self.assert_(self.fs.isfile('a.txt')) - self.assert_(self.fs.isfile('1.txt')) - self.assert_(self.fs.isfile('foo/bar/baz.txt')) - self.assert_(self.fs.isdir('foo')) - self.assert_(self.fs.isdir('foo/bar')) - self.assert_(self.fs.exists('a.txt')) - self.assert_(self.fs.exists('1.txt')) - self.assert_(self.fs.exists('foo/bar/baz.txt')) - self.assert_(self.fs.exists('foo')) - self.assert_(self.fs.exists('foo/bar')) - - def test_listdir(self): - - def check_listing(path, expected): - dir_list = self.fs.listdir(path) - self.assert_(sorted(dir_list) == sorted(expected)) - check_listing('/', ['a.txt', '1.txt', 'foo', 'b.txt']) - check_listing('foo', ['second.txt', 'bar']) - check_listing('foo/bar', ['baz.txt']) - - -class TestWriteZipFS(unittest.TestCase): - - def setUp(self): - self.temp_filename = "".join(random.choice("abcdefghijklmnopqrstuvwxyz") for _ in range(6))+".zip" - self.temp_filename = os.path.join(tempfile.gettempdir(), self.temp_filename) - - zip_fs = zipfs.ZipFS(self.temp_filename, 'w') - - def makefile(filename, contents): - if dirname(filename): - zip_fs.makedir(dirname(filename), recursive=True, allow_recreate=True) - f = zip_fs.open(filename, 'w') - f.write(contents) - f.close() - - makefile("a.txt", "Hello, World!") - makefile("b.txt", "b") - makefile("foo/bar/baz.txt", "baz") - makefile("foo/second.txt", "hai") - - zip_fs.close() - - def tearDown(self): - os.remove(self.temp_filename) - - def test_valid(self): - zf = zipfile.ZipFile(self.temp_filename, "r") - self.assert_(zf.testzip() is None) - zf.close() - - def test_creation(self): - zf = zipfile.ZipFile(self.temp_filename, "r") - def check_contents(filename, contents): - zcontents = zf.read(filename) - self.assertEqual(contents, zcontents) - check_contents("a.txt", "Hello, World!") - check_contents("b.txt", "b") - check_contents("foo/bar/baz.txt", "baz") - check_contents("foo/second.txt", "hai") - - -class TestAppendZipFS(TestWriteZipFS): - - def setUp(self): - self.temp_filename = "".join(random.choice("abcdefghijklmnopqrstuvwxyz") for _ in range(6))+".zip" - self.temp_filename = os.path.join(tempfile.gettempdir(), self.temp_filename) - - zip_fs = zipfs.ZipFS(self.temp_filename, 'w') - - def makefile(filename, contents): - if dirname(filename): - zip_fs.makedir(dirname(filename), recursive=True, allow_recreate=True) - f = zip_fs.open(filename, 'w') - f.write(contents) - f.close() - - makefile("a.txt", "Hello, World!") - makefile("b.txt", "b") - - zip_fs.close() - zip_fs = zipfs.ZipFS(self.temp_filename, 'a') - - makefile("foo/bar/baz.txt", "baz") - makefile("foo/second.txt", "hai") - - zip_fs.close() - - - -if __name__ == "__main__": - import nose - nose.main() - diff --git a/fs/tests/__init__.py b/fs/tests/__init__.py new file mode 100644 index 0000000..3d2c309 --- /dev/null +++ b/fs/tests/__init__.py @@ -0,0 +1,443 @@ +#!/usr/bin/env python +""" + + fs.tests: testcases for the fs module + +""" + +# Send any output from the logging module to stdout, so it will +# be captured by nose and reported appropriately +import sys +import logging +logging.basicConfig(level=logging.ERROR,stream=sys.stdout) + +from fs.base import * + +import pickle + + + +class FSTestCases: + """Base suite of testcases for filesystem implementations. + + Any FS subclass should be capable of passing all of these tests. + To apply the tests to your own FS implementation, simply use FSTestCase + as a mixin for your own unittest.TestCase subclass and have the setUp + method set self.fs to an instance of your FS implementation. + + This class is designed as a mixin so that it's not detected by test + loading tools such as nose. + """ + + def check(self, p): + """Check that a file exists within self.fs""" + return self.fs.exists(p) + + def test_root_dir(self): + self.assertTrue(self.fs.isdir("")) + self.assertTrue(self.fs.isdir("/")) + + def test_debug(self): + str(self.fs) + repr(self.fs) + self.assert_(hasattr(self.fs, 'desc')) + + def test_writefile(self): + self.assertRaises(ResourceNotFoundError,self.fs.open,"test1.txt") + f = self.fs.open("test1.txt","w") + f.write("testing") + f.close() + f = self.fs.open("test1.txt","r") + self.assertEquals(f.read(),"testing") + + def test_isdir_isfile(self): + self.assertFalse(self.fs.exists("dir1")) + self.assertFalse(self.fs.isdir("dir1")) + self.assertFalse(self.fs.isfile("a.txt")) + self.fs.createfile("a.txt") + self.assertFalse(self.fs.isdir("dir1")) + self.assertTrue(self.fs.exists("a.txt")) + self.assertTrue(self.fs.isfile("a.txt")) + self.fs.makedir("dir1") + self.assertTrue(self.fs.isdir("dir1")) + self.assertTrue(self.fs.exists("dir1")) + self.assertTrue(self.fs.exists("a.txt")) + self.fs.remove("a.txt") + self.assertFalse(self.fs.exists("a.txt")) + + def test_listdir(self): + self.fs.createfile("a") + self.fs.createfile("b") + self.fs.createfile("foo") + self.fs.createfile("bar") + # Test listing of the root directory + d1 = self.fs.listdir() + self.assertEqual(len(d1), 4) + self.assertEqual(sorted(d1), ["a", "b", "bar", "foo"]) + d1 = self.fs.listdir("") + self.assertEqual(len(d1), 4) + self.assertEqual(sorted(d1), ["a", "b", "bar", "foo"]) + d1 = self.fs.listdir("/") + self.assertEqual(len(d1), 4) + # Test listing absolute paths + d2 = self.fs.listdir(absolute=True) + self.assertEqual(len(d2), 4) + self.assertEqual(sorted(d2), ["/a", "/b", "/bar", "/foo"]) + # Create some deeper subdirectories, to make sure their + # contents are not inadvertantly included + self.fs.makedir("p/1/2/3",recursive=True) + self.fs.createfile("p/1/2/3/a") + self.fs.createfile("p/1/2/3/b") + self.fs.createfile("p/1/2/3/foo") + self.fs.createfile("p/1/2/3/bar") + self.fs.makedir("q") + # Test listing just files, just dirs, and wildcards + dirs_only = self.fs.listdir(dirs_only=True) + files_only = self.fs.listdir(files_only=True) + contains_a = self.fs.listdir(wildcard="*a*") + self.assertEqual(sorted(dirs_only), ["p", "q"]) + self.assertEqual(sorted(files_only), ["a", "b", "bar", "foo"]) + self.assertEqual(sorted(contains_a), ["a", "bar"]) + # Test listing a subdirectory + d3 = self.fs.listdir("p/1/2/3") + self.assertEqual(len(d3), 4) + self.assertEqual(sorted(d3), ["a", "b", "bar", "foo"]) + # Test listing a subdirectory with absoliute and full paths + d4 = self.fs.listdir("p/1/2/3", absolute=True) + self.assertEqual(len(d4), 4) + self.assertEqual(sorted(d4), ["/p/1/2/3/a", "/p/1/2/3/b", "/p/1/2/3/bar", "/p/1/2/3/foo"]) + d4 = self.fs.listdir("p/1/2/3", full=True) + self.assertEqual(len(d4), 4) + self.assertEqual(sorted(d4), ["p/1/2/3/a", "p/1/2/3/b", "p/1/2/3/bar", "p/1/2/3/foo"]) + # Test that appropriate errors are raised + self.assertRaises(ResourceNotFoundError,self.fs.listdir,"zebra") + self.assertRaises(ResourceInvalidError,self.fs.listdir,"foo") + + def test_makedir(self): + check = self.check + self.fs.makedir("a") + self.assertTrue(check("a")) + self.assertRaises(ParentDirectoryMissingError,self.fs.makedir,"a/b/c") + self.fs.makedir("a/b/c", recursive=True) + self.assert_(check("a/b/c")) + self.fs.makedir("foo/bar/baz", recursive=True) + self.assert_(check("foo/bar/baz")) + self.fs.makedir("a/b/child") + self.assert_(check("a/b/child")) + self.assertRaises(DestinationExistsError,self.fs.makedir,"/a/b") + self.fs.makedir("/a/b",allow_recreate=True) + self.fs.createfile("/a/file") + self.assertRaises(ResourceInvalidError,self.fs.makedir,"a/file") + + def test_remove(self): + self.fs.createfile("a.txt") + self.assertTrue(self.check("a.txt")) + self.fs.remove("a.txt") + self.assertFalse(self.check("a.txt")) + self.assertRaises(ResourceNotFoundError,self.fs.remove,"a.txt") + self.fs.makedir("dir1") + self.assertRaises(ResourceInvalidError,self.fs.remove,"dir1") + self.fs.createfile("/dir1/a.txt") + self.assertTrue(self.check("dir1/a.txt")) + self.fs.remove("dir1/a.txt") + self.assertFalse(self.check("/dir1/a.txt")) + + def test_removedir(self): + check = self.check + self.fs.makedir("a") + self.assert_(check("a")) + self.fs.removedir("a") + self.assert_(not check("a")) + self.fs.makedir("a/b/c/d", recursive=True) + self.assertRaises(DirectoryNotEmptyError, self.fs.removedir, "a/b") + self.fs.removedir("a/b/c/d") + self.assert_(not check("a/b/c/d")) + self.fs.removedir("a/b/c") + self.assert_(not check("a/b/c")) + self.fs.removedir("a/b") + self.assert_(not check("a/b")) + # Test recursive removal of empty parent dirs + self.fs.makedir("foo/bar/baz", recursive=True) + self.fs.removedir("foo/bar/baz", recursive=True) + self.assert_(not check("foo/bar/baz")) + self.assert_(not check("foo/bar")) + self.assert_(not check("foo")) + # Ensure that force=True works as expected + self.fs.makedir("frollic/waggle", recursive=True) + self.fs.createfile("frollic/waddle.txt","waddlewaddlewaddle") + self.assertRaises(DirectoryNotEmptyError,self.fs.removedir,"frollic") + self.assertRaises(ResourceInvalidError,self.fs.removedir,"frollic/waddle.txt") + self.fs.removedir("frollic",force=True) + self.assert_(not check("frollic")) + + def test_rename(self): + check = self.check + self.fs.open("foo.txt", 'wt').write("Hello, World!") + self.assert_(check("foo.txt")) + self.fs.rename("foo.txt", "bar.txt") + self.assert_(check("bar.txt")) + self.assert_(not check("foo.txt")) + + def test_info(self): + test_str = "Hello, World!" + self.fs.createfile("info.txt",test_str) + info = self.fs.getinfo("info.txt") + self.assertEqual(info['size'], len(test_str)) + self.fs.desc("info.txt") + + def test_getsize(self): + test_str = "*"*23 + self.fs.createfile("info.txt",test_str) + size = self.fs.getsize("info.txt") + self.assertEqual(size, len(test_str)) + + def test_movefile(self): + check = self.check + contents = "If the implementation is hard to explain, it's a bad idea." + def makefile(path): + self.fs.createfile(path,contents) + def checkcontents(path): + check_contents = self.fs.getcontents(path) + self.assertEqual(check_contents,contents) + return contents == check_contents + + self.fs.makedir("foo/bar", recursive=True) + makefile("foo/bar/a.txt") + self.assert_(check("foo/bar/a.txt")) + self.assert_(checkcontents("foo/bar/a.txt")) + self.fs.move("foo/bar/a.txt", "foo/b.txt") + self.assert_(not check("foo/bar/a.txt")) + self.assert_(check("foo/b.txt")) + self.assert_(checkcontents("foo/b.txt")) + + self.fs.move("foo/b.txt", "c.txt") + self.assert_(not check("foo/b.txt")) + self.assert_(check("/c.txt")) + self.assert_(checkcontents("/c.txt")) + + makefile("foo/bar/a.txt") + self.assertRaises(DestinationExistsError,self.fs.move,"foo/bar/a.txt","/c.txt") + self.assert_(check("foo/bar/a.txt")) + self.assert_(check("/c.txt")) + self.fs.move("foo/bar/a.txt","/c.txt",overwrite=True) + self.assert_(not check("foo/bar/a.txt")) + self.assert_(check("/c.txt")) + + + def test_movedir(self): + check = self.check + contents = "If the implementation is hard to explain, it's a bad idea." + def makefile(path): + self.fs.createfile(path,contents) + + self.fs.makedir("a") + self.fs.makedir("b") + makefile("a/1.txt") + makefile("a/2.txt") + makefile("a/3.txt") + self.fs.makedir("a/foo/bar", recursive=True) + makefile("a/foo/bar/baz.txt") + + self.fs.movedir("a", "copy of a") + + self.assert_(check("copy of a/1.txt")) + self.assert_(check("copy of a/2.txt")) + self.assert_(check("copy of a/3.txt")) + self.assert_(check("copy of a/foo/bar/baz.txt")) + + self.assert_(not check("a/1.txt")) + self.assert_(not check("a/2.txt")) + self.assert_(not check("a/3.txt")) + self.assert_(not check("a/foo/bar/baz.txt")) + self.assert_(not check("a/foo/bar")) + self.assert_(not check("a/foo")) + self.assert_(not check("a")) + + self.fs.makedir("a") + print self.fs.listdir("a") + self.assertRaises(DestinationExistsError,self.fs.movedir,"copy of a","a") + self.fs.movedir("copy of a","a",overwrite=True) + self.assert_(not check("copy of a")) + print self.fs.listdir("a") + self.assert_(check("a/1.txt")) + self.assert_(check("a/2.txt")) + self.assert_(check("a/3.txt")) + self.assert_(check("a/foo/bar/baz.txt")) + + + def test_copyfile(self): + check = self.check + contents = "If the implementation is hard to explain, it's a bad idea." + def makefile(path,contents=contents): + self.fs.createfile(path,contents) + def checkcontents(path,contents=contents): + check_contents = self.fs.getcontents(path) + self.assertEqual(check_contents,contents) + return contents == check_contents + + self.fs.makedir("foo/bar", recursive=True) + makefile("foo/bar/a.txt") + self.assert_(check("foo/bar/a.txt")) + self.assert_(checkcontents("foo/bar/a.txt")) + self.fs.copy("foo/bar/a.txt", "foo/b.txt") + self.assert_(check("foo/bar/a.txt")) + self.assert_(check("foo/b.txt")) + self.assert_(checkcontents("foo/b.txt")) + + self.fs.copy("foo/b.txt", "c.txt") + self.assert_(check("foo/b.txt")) + self.assert_(check("/c.txt")) + self.assert_(checkcontents("/c.txt")) + + makefile("foo/bar/a.txt","different contents") + self.assertRaises(DestinationExistsError,self.fs.copy,"foo/bar/a.txt","/c.txt") + self.assert_(checkcontents("/c.txt")) + self.fs.copy("foo/bar/a.txt","/c.txt",overwrite=True) + self.assert_(checkcontents("foo/bar/a.txt","different contents")) + self.assert_(checkcontents("/c.txt","different contents")) + + + def test_copydir(self): + check = self.check + contents = "If the implementation is hard to explain, it's a bad idea." + def makefile(path): + self.fs.createfile(path,contents) + def checkcontents(path): + check_contents = self.fs.getcontents(path) + self.assertEqual(check_contents,contents) + return contents == check_contents + + self.fs.makedir("a") + self.fs.makedir("b") + makefile("a/1.txt") + makefile("a/2.txt") + makefile("a/3.txt") + self.fs.makedir("a/foo/bar", recursive=True) + makefile("a/foo/bar/baz.txt") + + self.fs.copydir("a", "copy of a") + self.assert_(check("copy of a/1.txt")) + self.assert_(check("copy of a/2.txt")) + self.assert_(check("copy of a/3.txt")) + self.assert_(check("copy of a/foo/bar/baz.txt")) + checkcontents("copy of a/1.txt") + + self.assert_(check("a/1.txt")) + self.assert_(check("a/2.txt")) + self.assert_(check("a/3.txt")) + self.assert_(check("a/foo/bar/baz.txt")) + checkcontents("a/1.txt") + + self.assertRaises(DestinationExistsError,self.fs.copydir,"a","b") + self.fs.copydir("a","b",overwrite=True) + self.assert_(check("b/1.txt")) + self.assert_(check("b/2.txt")) + self.assert_(check("b/3.txt")) + self.assert_(check("b/foo/bar/baz.txt")) + checkcontents("b/1.txt") + + def test_copydir_with_dotfile(self): + check = self.check + contents = "If the implementation is hard to explain, it's a bad idea." + def makefile(path): + self.fs.createfile(path,contents) + + self.fs.makedir("a") + makefile("a/1.txt") + makefile("a/2.txt") + makefile("a/.hidden.txt") + + self.fs.copydir("a", "copy of a") + self.assert_(check("copy of a/1.txt")) + self.assert_(check("copy of a/2.txt")) + self.assert_(check("copy of a/.hidden.txt")) + + self.assert_(check("a/1.txt")) + self.assert_(check("a/2.txt")) + self.assert_(check("a/.hidden.txt")) + + def test_readwriteappendseek(self): + def checkcontents(path, check_contents): + read_contents = self.fs.getcontents(path) + self.assertEqual(read_contents,check_contents) + return read_contents == check_contents + test_strings = ["Beautiful is better than ugly.", + "Explicit is better than implicit.", + "Simple is better than complex."] + all_strings = "".join(test_strings) + + self.assertRaises(ResourceNotFoundError, self.fs.open, "a.txt", "r") + self.assert_(not self.fs.exists("a.txt")) + f1 = self.fs.open("a.txt", "wb") + pos = 0 + for s in test_strings: + f1.write(s) + pos += len(s) + self.assertEqual(pos, f1.tell()) + f1.close() + self.assert_(self.fs.exists("a.txt")) + self.assert_(checkcontents("a.txt", all_strings)) + + f2 = self.fs.open("b.txt", "wb") + f2.write(test_strings[0]) + f2.close() + self.assert_(checkcontents("b.txt", test_strings[0])) + f3 = self.fs.open("b.txt", "ab") + f3.write(test_strings[1]) + f3.write(test_strings[2]) + f3.close() + self.assert_(checkcontents("b.txt", all_strings)) + f4 = self.fs.open("b.txt", "wb") + f4.write(test_strings[2]) + f4.close() + self.assert_(checkcontents("b.txt", test_strings[2])) + f5 = self.fs.open("c.txt", "wb") + for s in test_strings: + f5.write(s+"\n") + f5.close() + f6 = self.fs.open("c.txt", "rb") + for s, t in zip(f6, test_strings): + self.assertEqual(s, t+"\n") + f6.close() + f7 = self.fs.open("c.txt", "rb") + f7.seek(13) + word = f7.read(6) + self.assertEqual(word, "better") + f7.seek(1, os.SEEK_CUR) + word = f7.read(4) + self.assertEqual(word, "than") + f7.seek(-9, os.SEEK_END) + word = f7.read(7) + self.assertEqual(word, "complex") + f7.close() + self.assertEqual(self.fs.getcontents("a.txt"), all_strings) + + def test_with_statement(self): + # This is a little tricky since 'with' is actually new syntax. + # We use eval() to make this method safe for old python versions. + import sys + if sys.version_info[0] >= 2 and sys.version_info[1] >= 5: + # A successful 'with' statement + contents = "testing the with statement" + code = "from __future__ import with_statement\n" + code += "with self.fs.open('f.txt','w-') as testfile:\n" + code += " testfile.write(contents)\n" + code += "self.assertEquals(self.fs.getcontents('f.txt'),contents)" + code = compile(code,"",'exec') + eval(code) + # A 'with' statement raising an error + contents = "testing the with statement" + code = "from __future__ import with_statement\n" + code += "with self.fs.open('f.txt','w-') as testfile:\n" + code += " testfile.write(contents)\n" + code += " raise ValueError\n" + code = compile(code,"",'exec') + self.assertRaises(ValueError,eval,code,globals(),locals()) + self.assertEquals(self.fs.getcontents('f.txt'),contents) + + def test_pickling(self): + self.fs.createfile("test1","hello world") + oldfs = self.fs + self.fs = pickle.loads(pickle.dumps(self.fs)) + self.assert_(self.fs.isfile("test1")) + diff --git a/fs/tests/test_expose.py b/fs/tests/test_expose.py new file mode 100644 index 0000000..88f5095 --- /dev/null +++ b/fs/tests/test_expose.py @@ -0,0 +1,95 @@ +#!/usr/bin/env python +""" + + fs.tests.test_expose: testcases for fs.expose and associated FS classes + +""" + +import unittest +import socket +import threading + +from fs.tests import FSTestCases +from fs.tempfs import TempFS + +from fs import rpcfs +from fs.expose.xmlrpc import RPCFSServer +class TestRPCFS(unittest.TestCase,FSTestCases): + + def makeServer(self,fs,addr): + return RPCFSServer(fs,addr,logRequests=False) + + def startServer(self): + port = 8000 + self.temp_fs = TempFS() + self.server = None + while not self.server: + try: + self.server = self.makeServer(self.temp_fs,("localhost",port)) + except socket.error, e: + if e.args[1] == "Address already in use": + port += 1 + else: + raise + self.server_addr = ("localhost",port) + self.serve_more_requests = True + self.server_thread = threading.Thread(target=self.runServer) + self.server_thread.start() + + def runServer(self): + """Run the server, swallowing shutdown-related execptions.""" + self.server.socket.settimeout(0.1) + try: + while self.serve_more_requests: + self.server.handle_request() + except Exception, e: + pass + + def setUp(self): + self.startServer() + self.fs = rpcfs.RPCFS("http://%s:%d" % self.server_addr) + + def tearDown(self): + self.serve_more_requests = False + try: + self.bump() + self.server.server_close() + except Exception: + pass + self.server_thread.join() + self.temp_fs.close() + + def bump(self): + host, port = self.server_addr + for res in socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM): + af, socktype, proto, cn, sa = res + sock = None + try: + sock = socket.socket(af, socktype, proto) + sock.settimeout(1) + sock.connect(sa) + sock.send("\n") + except socket.error, e: + pass + finally: + if sock is not None: + sock.close() + + +from fs import sftpfs +from fs.expose.sftp import BaseSFTPServer +class TestSFTPFS(TestRPCFS): + + def makeServer(self,fs,addr): + return BaseSFTPServer(addr,fs) + + def setUp(self): + self.startServer() + self.fs = sftpfs.SFTPFS(self.server_addr) + + def bump(self): + # paramiko doesn't like being bumped, just wait for it to timeout. + # TODO: do this using a paramiko.Transport() connection + pass + + diff --git a/fs/tests/test_fs.py b/fs/tests/test_fs.py new file mode 100644 index 0000000..49c099f --- /dev/null +++ b/fs/tests/test_fs.py @@ -0,0 +1,88 @@ +""" + + fs.tests.test_fs: testcases for basic FS implementations + +""" + +from fs.tests import FSTestCases + +import unittest + +import os +import shutil +import tempfile + +from fs.path import * + + +from fs import osfs +class TestOSFS(unittest.TestCase,FSTestCases): + + def setUp(self): + self.temp_dir = tempfile.mkdtemp("fstest") + self.fs = osfs.OSFS(self.temp_dir) + + def tearDown(self): + shutil.rmtree(self.temp_dir) + + def check(self, p): + return os.path.exists(os.path.join(self.temp_dir, relpath(p))) + + + +class TestSubFS(unittest.TestCase,FSTestCases): + + def setUp(self): + self.temp_dir = tempfile.mkdtemp("fstest") + self.parent_fs = osfs.OSFS(self.temp_dir) + self.parent_fs.makedir("foo/bar", recursive=True) + self.fs = self.parent_fs.opendir("foo/bar") + + def tearDown(self): + shutil.rmtree(self.temp_dir) + + def check(self, p): + p = os.path.join("foo/bar", relpath(p)) + full_p = os.path.join(self.temp_dir, p) + return os.path.exists(full_p) + + +from fs import memoryfs +class TestMemoryFS(unittest.TestCase,FSTestCases): + + def setUp(self): + self.fs = memoryfs.MemoryFS() + + +from fs import mountfs +class TestMountFS(unittest.TestCase,FSTestCases): + + def setUp(self): + self.mount_fs = mountfs.MountFS() + self.mem_fs = memoryfs.MemoryFS() + self.mount_fs.mountdir("mounted/memfs", self.mem_fs) + self.fs = self.mount_fs.opendir("mounted/memfs") + + def tearDown(self): + pass + + def check(self, p): + return self.mount_fs.exists(os.path.join("mounted/memfs", relpath(p))) + + +from fs import tempfs +class TestTempFS(unittest.TestCase,FSTestCases): + + def setUp(self): + self.fs = tempfs.TempFS() + + def tearDown(self): + td = self.fs._temp_dir + self.fs.close() + self.assert_(not os.path.exists(td)) + + def check(self, p): + td = self.fs._temp_dir + return os.path.exists(os.path.join(td, relpath(p))) + + diff --git a/fs/tests/test_objecttree.py b/fs/tests/test_objecttree.py new file mode 100644 index 0000000..b8d9a12 --- /dev/null +++ b/fs/tests/test_objecttree.py @@ -0,0 +1,47 @@ +""" + + fs.tests.test_objectree: testcases for the fs objecttree module + +""" + + +import unittest + +import fs.tests +from fs import objecttree + +class TestObjectTree(unittest.TestCase): + """Testcases for the ObjectTree class.""" + + def test_getset(self): + ot = objecttree.ObjectTree() + ot['foo'] = "bar" + self.assertEqual(ot['foo'], 'bar') + + ot = objecttree.ObjectTree() + ot['foo/bar'] = "baz" + self.assertEqual(ot['foo'], {'bar':'baz'}) + self.assertEqual(ot['foo/bar'], 'baz') + + del ot['foo/bar'] + self.assertEqual(ot['foo'], {}) + + ot = objecttree.ObjectTree() + ot['a/b/c'] = "A" + ot['a/b/d'] = "B" + ot['a/b/e'] = "C" + ot['a/b/f'] = "D" + self.assertEqual(sorted(ot['a/b'].values()), ['A', 'B', 'C', 'D']) + self.assert_(ot.get('a/b/x', -1) == -1) + + self.assert_('a/b/c' in ot) + self.assert_('a/b/x' not in ot) + self.assert_(ot.isobject('a/b/c')) + self.assert_(ot.isobject('a/b/d')) + self.assert_(not ot.isobject('a/b')) + + left, object, right = ot.partialget('a/b/e/f/g') + self.assertEqual(left, "a/b/e") + self.assertEqual(object, "C") + self.assertEqual(right, "f/g") + diff --git a/fs/tests/test_path.py b/fs/tests/test_path.py new file mode 100644 index 0000000..35c9185 --- /dev/null +++ b/fs/tests/test_path.py @@ -0,0 +1,98 @@ +""" + + fs.tests.test_path: testcases for the fs path functions + +""" + + +import unittest +import fs.tests + +from fs.path import * + +class TestPathFunctions(unittest.TestCase): + """Testcases for FS path functions.""" + + def test_normpath(self): + tests = [ ("\\a\\b\\c", "/a/b/c"), + ("", ""), + ("/a/b/c", "/a/b/c"), + ("a/b/c", "a/b/c"), + ("a/b/../c/", "a/c"), + ("/","/"), + ] + for path, result in tests: + self.assertEqual(normpath(path), result) + + def test_pathjoin(self): + tests = [ ("", "a", "a"), + ("a", "a", "a/a"), + ("a/b", "../c", "a/c"), + ("a/b/../c", "d", "a/c/d"), + ("/a/b/c", "d", "/a/b/c/d"), + ("/a/b/c", "../../../d", "/d"), + ("a", "b", "c", "a/b/c"), + ("a/b/c", "../d", "c", "a/b/d/c"), + ("a/b/c", "../d", "/a", "/a"), + ("aaa", "bbb/ccc", "aaa/bbb/ccc"), + ("aaa", "bbb\ccc", "aaa/bbb/ccc"), + ("aaa", "bbb", "ccc", "/aaa", "eee", "/aaa/eee"), + ("a/b", "./d", "e", "a/b/d/e"), + ("/", "/", "/"), + ("/", "", "/"), + ] + for testpaths in tests: + paths = testpaths[:-1] + result = testpaths[-1] + self.assertEqual(fs.pathjoin(*paths), result) + + self.assertRaises(ValueError, fs.pathjoin, "../") + self.assertRaises(ValueError, fs.pathjoin, "./../") + self.assertRaises(ValueError, fs.pathjoin, "a/b", "../../..") + self.assertRaises(ValueError, fs.pathjoin, "a/b/../../../d") + + def test_relpath(self): + tests = [ ("/a/b", "a/b"), + ("a/b", "a/b"), + ("/", "") ] + + for path, result in tests: + print path, result + self.assertEqual(fs.relpath(path), result) + + def test_abspath(self): + tests = [ ("/a/b", "/a/b"), + ("a/b", "/a/b"), + ("/", "/") ] + + for path, result in tests: + self.assertEqual(fs.abspath(path), result) + + def test_iteratepath(self): + tests = [ ("a/b", ["a", "b"]), + ("", [] ), + ("aaa/bbb/ccc", ["aaa", "bbb", "ccc"]), + ("a/b/c/../d", ["a", "b", "d"]) ] + + for path, results in tests: + print repr(path), results + for path_component, expected in zip(iteratepath(path), results): + self.assertEqual(path_component, expected) + + self.assertEqual(list(iteratepath("a/b/c/d", 1)), ["a", "b/c/d"]) + self.assertEqual(list(iteratepath("a/b/c/d", 2)), ["a", "b", "c/d"]) + + def test_pathsplit(self): + tests = [ ("a/b", ("a", "b")), + ("a/b/c", ("a/b", "c")), + ("a", ("", "a")), + ("", ("", "")), + ("/", ("", "")), + ("foo/bar", ("foo", "bar")), + ("foo/bar/baz", ("foo/bar", "baz")), + ] + for path, result in tests: + self.assertEqual(fs.pathsplit(path), result) + + + diff --git a/fs/tests/test_s3fs.py b/fs/tests/test_s3fs.py new file mode 100644 index 0000000..49ee51d --- /dev/null +++ b/fs/tests/test_s3fs.py @@ -0,0 +1,41 @@ +#!/usr/bin/env python +""" + + fs.tests.test_s3fs: testcases for the S3FS module + +These tests are set up to be skipped by default, since they're very slow, +require a valid AWS account, and cost money. You'll have to set the '__test__' +attribute the True on te TestS3FS class to get them running. + +""" + +import unittest + +from fs.tests import FSTestCases +from fs.path import * + +from fs import s3fs +class TestS3FS(unittest.TestCase,FSTestCases): + + # Disable the tests by default + __test__ = False + + bucket = "test-s3fs.rfk.id.au" + + def setUp(self): + self.fs = s3fs.S3FS(self.bucket,"/unittest/files") + self._clear() + + def _clear(self): + for (path,files) in self.fs.walk(search="depth"): + for fn in files: + self.fs.remove(pathjoin(path,fn)) + if path and path != "/": + self.fs.removedir(path) + + def tearDown(self): + self._clear() + for k in self.fs._s3bukt.list(): + self.fs._s3bukt.delete_key(k) + self.fs._s3conn.delete_bucket(self.bucket) + diff --git a/fs/tests/test_xattr.py b/fs/tests/test_xattr.py new file mode 100644 index 0000000..ca6e4d8 --- /dev/null +++ b/fs/tests/test_xattr.py @@ -0,0 +1,116 @@ +""" + + fs.tests.test_xattr: testcases for extended attribute support + +""" + +import unittest +import os + +from fs.path import * +from fs.errors import * +from fs.tests import FSTestCases + + +class XAttrTestCases: + """Testcases for filesystems providing extended attribute support. + + This class should be used as a mixin to the unittest.TestCase class + for filesystems that provide extended attribute support. + """ + + def test_getsetdel(self): + def do_getsetdel(p): + self.assertEqual(self.fs.getxattr(p,"xattr1"),None) + self.fs.setxattr(p,"xattr1","value1") + self.assertEqual(self.fs.getxattr(p,"xattr1"),"value1") + self.fs.delxattr(p,"xattr1") + self.assertEqual(self.fs.getxattr(p,"xattr1"),None) + self.fs.createfile("test.txt","hello") + do_getsetdel("test.txt") + self.assertRaises(ResourceNotFoundError,self.fs.getxattr,"test2.txt","xattr1") + self.fs.makedir("mystuff") + self.fs.createfile("/mystuff/test.txt","") + do_getsetdel("mystuff") + do_getsetdel("mystuff/test.txt") + + def test_list_xattrs(self): + def do_list(p): + self.assertEquals(sorted(self.fs.xattrs(p)),[]) + self.fs.setxattr(p,"xattr1","value1") + self.assertEquals(sorted(self.fs.xattrs(p)),["xattr1"]) + self.fs.setxattr(p,"attr2","value2") + self.assertEquals(sorted(self.fs.xattrs(p)),["attr2","xattr1"]) + self.fs.delxattr(p,"xattr1") + self.assertEquals(sorted(self.fs.xattrs(p)),["attr2"]) + self.fs.delxattr(p,"attr2") + self.assertEquals(sorted(self.fs.xattrs(p)),[]) + self.fs.createfile("test.txt","hello") + do_list("test.txt") + self.fs.makedir("mystuff") + self.fs.createfile("/mystuff/test.txt","") + do_list("mystuff") + do_list("mystuff/test.txt") + + def test_copy_xattrs(self): + self.fs.createfile("a.txt","content") + self.fs.setxattr("a.txt","myattr","myvalue") + self.fs.setxattr("a.txt","testattr","testvalue") + self.fs.makedir("stuff") + self.fs.copy("a.txt","stuff/a.txt") + self.assertTrue(self.fs.exists("stuff/a.txt")) + self.assertEquals(self.fs.getxattr("stuff/a.txt","myattr"),"myvalue") + self.assertEquals(self.fs.getxattr("stuff/a.txt","testattr"),"testvalue") + self.assertEquals(self.fs.getxattr("a.txt","myattr"),"myvalue") + self.assertEquals(self.fs.getxattr("a.txt","testattr"),"testvalue") + self.fs.setxattr("stuff","dirattr","a directory") + self.fs.copydir("stuff","stuff2") + self.assertEquals(self.fs.getxattr("stuff2/a.txt","myattr"),"myvalue") + self.assertEquals(self.fs.getxattr("stuff2/a.txt","testattr"),"testvalue") + self.assertEquals(self.fs.getxattr("stuff2","dirattr"),"a directory") + self.assertEquals(self.fs.getxattr("stuff","dirattr"),"a directory") + + def test_move_xattrs(self): + self.fs.createfile("a.txt","content") + self.fs.setxattr("a.txt","myattr","myvalue") + self.fs.setxattr("a.txt","testattr","testvalue") + self.fs.makedir("stuff") + self.fs.move("a.txt","stuff/a.txt") + self.assertTrue(self.fs.exists("stuff/a.txt")) + self.assertEquals(self.fs.getxattr("stuff/a.txt","myattr"),"myvalue") + self.assertEquals(self.fs.getxattr("stuff/a.txt","testattr"),"testvalue") + self.fs.setxattr("stuff","dirattr","a directory") + self.fs.movedir("stuff","stuff2") + self.assertEquals(self.fs.getxattr("stuff2/a.txt","myattr"),"myvalue") + self.assertEquals(self.fs.getxattr("stuff2/a.txt","testattr"),"testvalue") + self.assertEquals(self.fs.getxattr("stuff2","dirattr"),"a directory") + + + +from fs.wrappers.xattr import ensure_xattr + +from fs import tempfs +class TestXAttr_TempFS(unittest.TestCase,FSTestCases,XAttrTestCases): + + def setUp(self): + self.fs = ensure_xattr(tempfs.TempFS()) + + def tearDown(self): + td = self.fs._temp_dir + self.fs.close() + self.assert_(not os.path.exists(td)) + + def check(self, p): + td = self.fs._temp_dir + return os.path.exists(os.path.join(td, relpath(p))) + + +from fs import memoryfs +class TestXAttr_MemoryFS(unittest.TestCase,FSTestCases,XAttrTestCases): + + def setUp(self): + self.fs = ensure_xattr(memoryfs.MemoryFS()) + + def check(self, p): + return self.fs.exists(p) + diff --git a/fs/tests/test_zipfs.py b/fs/tests/test_zipfs.py new file mode 100644 index 0000000..fec01a6 --- /dev/null +++ b/fs/tests/test_zipfs.py @@ -0,0 +1,153 @@ +""" + + fs.tests.test_zipfs: testcases for the ZipFS class + +""" + +import unittest +import os +import random +import zipfile +import tempfile + +import fs.tests +from fs.path import * + + +from fs import zipfs +class TestReadZipFS(unittest.TestCase): + + def setUp(self): + self.temp_filename = "".join(random.choice("abcdefghijklmnopqrstuvwxyz") for _ in range(6))+".zip" + self.temp_filename = os.path.join(tempfile.gettempdir(), self.temp_filename) + + self.zf = zipfile.ZipFile(self.temp_filename, "w") + zf = self.zf + zf.writestr("a.txt", "Hello, World!") + zf.writestr("b.txt", "b") + zf.writestr("1.txt", "1") + zf.writestr("foo/bar/baz.txt", "baz") + zf.writestr("foo/second.txt", "hai") + zf.close() + self.fs = zipfs.ZipFS(self.temp_filename, "r") + + def tearDown(self): + self.fs.close() + os.remove(self.temp_filename) + + def check(self, p): + try: + self.zipfile.getinfo(p) + return True + except: + return False + + def test_reads(self): + def read_contents(path): + f = self.fs.open(path) + contents = f.read() + return contents + def check_contents(path, expected): + self.assert_(read_contents(path)==expected) + check_contents("a.txt", "Hello, World!") + check_contents("1.txt", "1") + check_contents("foo/bar/baz.txt", "baz") + + def test_getcontents(self): + def read_contents(path): + return self.fs.getcontents(path) + def check_contents(path, expected): + self.assert_(read_contents(path)==expected) + check_contents("a.txt", "Hello, World!") + check_contents("1.txt", "1") + check_contents("foo/bar/baz.txt", "baz") + + def test_is(self): + self.assert_(self.fs.isfile('a.txt')) + self.assert_(self.fs.isfile('1.txt')) + self.assert_(self.fs.isfile('foo/bar/baz.txt')) + self.assert_(self.fs.isdir('foo')) + self.assert_(self.fs.isdir('foo/bar')) + self.assert_(self.fs.exists('a.txt')) + self.assert_(self.fs.exists('1.txt')) + self.assert_(self.fs.exists('foo/bar/baz.txt')) + self.assert_(self.fs.exists('foo')) + self.assert_(self.fs.exists('foo/bar')) + + def test_listdir(self): + + def check_listing(path, expected): + dir_list = self.fs.listdir(path) + self.assert_(sorted(dir_list) == sorted(expected)) + check_listing('/', ['a.txt', '1.txt', 'foo', 'b.txt']) + check_listing('foo', ['second.txt', 'bar']) + check_listing('foo/bar', ['baz.txt']) + + +class TestWriteZipFS(unittest.TestCase): + + def setUp(self): + self.temp_filename = "".join(random.choice("abcdefghijklmnopqrstuvwxyz") for _ in range(6))+".zip" + self.temp_filename = os.path.join(tempfile.gettempdir(), self.temp_filename) + + zip_fs = zipfs.ZipFS(self.temp_filename, 'w') + + def makefile(filename, contents): + if dirname(filename): + zip_fs.makedir(dirname(filename), recursive=True, allow_recreate=True) + f = zip_fs.open(filename, 'w') + f.write(contents) + f.close() + + makefile("a.txt", "Hello, World!") + makefile("b.txt", "b") + makefile("foo/bar/baz.txt", "baz") + makefile("foo/second.txt", "hai") + + zip_fs.close() + + def tearDown(self): + os.remove(self.temp_filename) + + def test_valid(self): + zf = zipfile.ZipFile(self.temp_filename, "r") + self.assert_(zf.testzip() is None) + zf.close() + + def test_creation(self): + zf = zipfile.ZipFile(self.temp_filename, "r") + def check_contents(filename, contents): + zcontents = zf.read(filename) + self.assertEqual(contents, zcontents) + check_contents("a.txt", "Hello, World!") + check_contents("b.txt", "b") + check_contents("foo/bar/baz.txt", "baz") + check_contents("foo/second.txt", "hai") + + +class TestAppendZipFS(TestWriteZipFS): + + def setUp(self): + self.temp_filename = "".join(random.choice("abcdefghijklmnopqrstuvwxyz") for _ in range(6))+".zip" + self.temp_filename = os.path.join(tempfile.gettempdir(), self.temp_filename) + + zip_fs = zipfs.ZipFS(self.temp_filename, 'w') + + def makefile(filename, contents): + if dirname(filename): + zip_fs.makedir(dirname(filename), recursive=True, allow_recreate=True) + f = zip_fs.open(filename, 'w') + f.write(contents) + f.close() + + makefile("a.txt", "Hello, World!") + makefile("b.txt", "b") + + zip_fs.close() + zip_fs = zipfs.ZipFS(self.temp_filename, 'a') + + makefile("foo/bar/baz.txt", "baz") + makefile("foo/second.txt", "hai") + + zip_fs.close() + diff --git a/fs/wrappers/hidedotfiles.py b/fs/wrappers/hidedotfiles.py index 6203410..527a2d7 100644 --- a/fs/wrappers/hidedotfiles.py +++ b/fs/wrappers/hidedotfiles.py @@ -4,7 +4,7 @@ """ -from fs.helpers import * +from fs.path import * from fs.errors import * from fs.wrappers import FSWrapper @@ -19,7 +19,7 @@ class HideDotFiles(FSWrapper): def is_hidden(self,path): """Check whether the given path should be hidden.""" - return path and resourcename(path)[0] == "." + return path and basename(path)[0] == "." def _encode(self,path): return path diff --git a/fs/wrappers/xattr.py b/fs/wrappers/xattr.py index a3bea2b..524477f 100644 --- a/fs/wrappers/xattr.py +++ b/fs/wrappers/xattr.py @@ -16,7 +16,7 @@ try: except ImportError: import pickle -from fs.helpers import * +from fs.path import * from fs.errors import * from fs.wrappers import FSWrapper diff --git a/fs/zipfs.py b/fs/zipfs.py index 504273a..3da4da4 100644 --- a/fs/zipfs.py +++ b/fs/zipfs.py @@ -1,7 +1,6 @@ #!/usr/bin/env python -from base import * -from helpers import * +from fs.base import * from zipfile import ZipFile, ZIP_DEFLATED, ZIP_STORED from memoryfs import MemoryFS -- cgit v1.2.1