summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorrfkelly0 <rfkelly0@67cdc799-7952-0410-af00-57a81ceafa0f>2009-05-05 06:05:28 +0000
committerrfkelly0 <rfkelly0@67cdc799-7952-0410-af00-57a81ceafa0f>2009-05-05 06:05:28 +0000
commitfacbf53c6b075a7e959cc3bd32d7183bd576b3a2 (patch)
tree1fc89d97f729ec7cae64ca03dbe01bc2d5029bd8
parent13753ac8c2eb75a9142b400fd009cce2b2dc628c (diff)
downloadpyfilesystem-facbf53c6b075a7e959cc3bd32d7183bd576b3a2.tar.gz
More tests, especially assertRaises() variants
git-svn-id: http://pyfilesystem.googlecode.com/svn/branches/rfk-ideas@129 67cdc799-7952-0410-af00-57a81ceafa0f
-rw-r--r--fs/tests.py732
1 files changed, 395 insertions, 337 deletions
diff --git a/fs/tests.py b/fs/tests.py
index deb880c..022b7b1 100644
--- a/fs/tests.py
+++ b/fs/tests.py
@@ -1,253 +1,183 @@
#!/usr/bin/env python
+"""
-import unittest
-import base as fs
-from helpers import *
-from helpers import _iteratepath
-import shutil
-
-class TestHelpers(unittest.TestCase):
-
- def test_isabsolutepath(self):
- tests = [ ('', False),
- ('/', True),
- ('/A/B', True),
- ('/asdasd', True),
- ('a/b/c', False),
- ]
- for path, result in tests:
- self.assertEqual(fs.isabsolutepath(path), result)
-
- def test_normpath(self):
- tests = [ ("\\a\\b\\c", "/a/b/c"),
- ("", ""),
- ("/a/b/c", "/a/b/c"),
- ]
- for path, result in tests:
- self.assertEqual(fs.normpath(path), result)
-
- def test_pathjoin(self):
- tests = [ ("", "a", "a"),
- ("a", "a", "a/a"),
- ("a/b", "../c", "a/c"),
- ("a/b/../c", "d", "a/c/d"),
- ("/a/b/c", "d", "/a/b/c/d"),
- ("/a/b/c", "../../../d", "/d"),
- ("a", "b", "c", "a/b/c"),
- ("a/b/c", "../d", "c", "a/b/d/c"),
- ("a/b/c", "../d", "/a", "/a"),
- ("aaa", "bbb/ccc", "aaa/bbb/ccc"),
- ("aaa", "bbb\ccc", "aaa/bbb/ccc"),
- ("aaa", "bbb", "ccc", "/aaa", "eee", "/aaa/eee"),
- ("a/b", "./d", "e", "a/b/d/e"),
- ("/", "/", "/"),
- ("/", "", "/"),
- ]
- for testpaths in tests:
- paths = testpaths[:-1]
- result = testpaths[-1]
- self.assertEqual(fs.pathjoin(*paths), result)
-
- self.assertRaises(ValueError, fs.pathjoin, "../")
- self.assertRaises(ValueError, fs.pathjoin, "./../")
- self.assertRaises(ValueError, fs.pathjoin, "a/b", "../../..")
- self.assertRaises(ValueError, fs.pathjoin, "a/b/../../../d")
-
- def test_makerelative(self):
- tests = [ ("/a/b", "a/b"),
- ("a/b", "a/b"),
- ("/", "") ]
-
- for path, result in tests:
- print path, result
- self.assertEqual(fs.makerelative(path), result)
-
- def test_makeabsolute(self):
- tests = [ ("/a/b", "/a/b"),
- ("a/b", "/a/b"),
- ("/", "/") ]
-
- for path, result in tests:
- self.assertEqual(fs.makeabsolute(path), result)
-
- def test_iteratepath(self):
- tests = [ ("a/b", ["a", "b"]),
- ("", [] ),
- ("aaa/bbb/ccc", ["aaa", "bbb", "ccc"]),
- ("a/b/c/../d", ["a", "b", "d"]) ]
-
- for path, results in tests:
- print repr(path), results
- for path_component, expected in zip(_iteratepath(path), results):
- self.assertEqual(path_component, expected)
-
- self.assertEqual(list(_iteratepath("a/b/c/d", 1)), ["a", "b/c/d"])
- self.assertEqual(list(_iteratepath("a/b/c/d", 2)), ["a", "b", "c/d"])
-
- def test_pathsplit(self):
- tests = [ ("a/b", ("a", "b")),
- ("a/b/c", ("a/b", "c")),
- ("a", ("", "a")),
- ("", ("", "")),
- ("/", ("", "")),
- ("foo/bar", ("foo", "bar")),
- ("foo/bar/baz", ("foo/bar", "baz")),
- ]
- for path, result in tests:
- self.assertEqual(fs.pathsplit(path), result)
-
+ fs.tests: testcases for the fs module
-import objecttree
-
-class TestObjectTree(unittest.TestCase):
+"""
- def test_getset(self):
- ot = objecttree.ObjectTree()
- ot['foo'] = "bar"
- self.assertEqual(ot['foo'], 'bar')
-
- ot = objecttree.ObjectTree()
- ot['foo/bar'] = "baz"
- self.assertEqual(ot['foo'], {'bar':'baz'})
- self.assertEqual(ot['foo/bar'], 'baz')
-
- del ot['foo/bar']
- self.assertEqual(ot['foo'], {})
-
- ot = objecttree.ObjectTree()
- ot['a/b/c'] = "A"
- ot['a/b/d'] = "B"
- ot['a/b/e'] = "C"
- ot['a/b/f'] = "D"
- self.assertEqual(sorted(ot['a/b'].values()), ['A', 'B', 'C', 'D'])
- self.assert_(ot.get('a/b/x', -1) == -1)
-
- self.assert_('a/b/c' in ot)
- self.assert_('a/b/x' not in ot)
- self.assert_(ot.isobject('a/b/c'))
- self.assert_(ot.isobject('a/b/d'))
- self.assert_(not ot.isobject('a/b'))
+import unittest
- left, object, right = ot.partialget('a/b/e/f/g')
- self.assertEqual(left, "a/b/e")
- self.assertEqual(object, "C")
- self.assertEqual(right, "f/g")
+import shutil
+import tempfile
+import pickle
+import base as fs
+from helpers import *
-import tempfile
-import osfs
-import os
-class TestOSFS(unittest.TestCase):
+####################
- def setUp(self):
- self.temp_dir = tempfile.mkdtemp("fstest")
- self.fs = osfs.OSFS(self.temp_dir)
- print "Temp dir is", self.temp_dir
- def tearDown(self):
- shutil.rmtree(self.temp_dir)
+class BaseFSTestCase(unittest.TestCase):
+ """Base suite of testcases for filesystem implementations.
+ Any FS subclass should be capable of passing all of these tests.
+ To apply the tests to your own FS implementation, simply subclass
+ BaseFSTestCase and have the setUp method set self.fs to an instance
+ of your FS implementation.
+ """
+
def check(self, p):
- return os.path.exists(os.path.join(self.temp_dir, makerelative(p)))
+ """Check that a file exists within self.fs"""
+ return self.fs.exists(p)
+
+ def test_root_dir(self):
+ """Test that the root directory is in fact a directory."""
+ self.assertTrue(self.fs.isdir(""))
+ self.assertTrue(self.fs.isdir("/"))
def test_debug(self):
+ """Test that the FS can be printed for debugging"""
str(self.fs)
repr(self.fs)
self.assert_(hasattr(self.fs, 'desc'))
+ def test_writefile(self):
+ """Test that basic file reading/writing works as expected."""
+ self.assertRaises(fs.ResourceNotFoundError,self.fs.open,"test1.txt")
+ f = self.fs.open("test1.txt","w")
+ f.write("testing")
+ f.close()
+ f = self.fs.open("test1.txt","r")
+ self.assertEquals(f.read(),"testing")
+
+ def test_isdir_isfile(self):
+ """Test that the existience-checking methods work correctly."""
+ self.assertFalse(self.fs.exists("dir1"))
+ self.assertFalse(self.fs.isdir("dir1"))
+ self.assertFalse(self.fs.isfile("a.txt"))
+ self.fs.createfile("a.txt")
+ self.assertFalse(self.fs.isdir("dir1"))
+ self.assertTrue(self.fs.exists("a.txt"))
+ self.assertTrue(self.fs.isfile("a.txt"))
+ self.fs.makedir("dir1")
+ self.assertTrue(self.fs.isdir("dir1"))
+ self.assertTrue(self.fs.exists("dir1"))
+ self.assertTrue(self.fs.exists("a.txt"))
+ self.fs.remove("a.txt")
+ self.assertFalse(self.fs.exists("a.txt"))
+
+ def test_listdir(self):
+ """Test the directory listing works, including keyword arguments."""
+ self.fs.createfile("a")
+ self.fs.createfile("b")
+ self.fs.createfile("foo")
+ self.fs.createfile("bar")
+ # Test listing of the root directory
+ d1 = self.fs.listdir()
+ self.assertEqual(len(d1), 4)
+ self.assertEqual(sorted(d1), ["a", "b", "bar", "foo"])
+ d1 = self.fs.listdir("")
+ self.assertEqual(len(d1), 4)
+ self.assertEqual(sorted(d1), ["a", "b", "bar", "foo"])
+ d1 = self.fs.listdir("/")
+ self.assertEqual(len(d1), 4)
+ # Test listing absolute paths
+ d2 = self.fs.listdir(absolute=True)
+ self.assertEqual(len(d2), 4)
+ self.assertEqual(sorted(d2), ["/a", "/b", "/bar", "/foo"])
+ # Create some deeper subdirectories, to make sure their
+ # contents are not inadvertantly included
+ self.fs.makedir("p/1/2/3",recursive=True)
+ self.fs.createfile("p/1/2/3/a")
+ self.fs.createfile("p/1/2/3/b")
+ self.fs.createfile("p/1/2/3/foo")
+ self.fs.createfile("p/1/2/3/bar")
+ self.fs.makedir("q")
+ # Test listing just files, just dirs, and wildcards
+ dirs_only = self.fs.listdir(dirs_only=True)
+ files_only = self.fs.listdir(files_only=True)
+ contains_a = self.fs.listdir(wildcard="*a*")
+ self.assertEqual(sorted(dirs_only), ["p", "q"])
+ self.assertEqual(sorted(files_only), ["a", "b", "bar", "foo"])
+ self.assertEqual(sorted(contains_a), ["a", "bar"])
+ # Test listing a subdirectory
+ d3 = self.fs.listdir("p/1/2/3")
+ self.assertEqual(len(d3), 4)
+ self.assertEqual(sorted(d3), ["a", "b", "bar", "foo"])
+ # Test listing a subdirectory with absoliute and full paths
+ d4 = self.fs.listdir("p/1/2/3", absolute=True)
+ self.assertEqual(len(d4), 4)
+ self.assertEqual(sorted(d4), ["/p/1/2/3/a", "/p/1/2/3/b", "/p/1/2/3/bar", "/p/1/2/3/foo"])
+ d4 = self.fs.listdir("p/1/2/3", full=True)
+ self.assertEqual(len(d4), 4)
+ self.assertEqual(sorted(d4), ["p/1/2/3/a", "p/1/2/3/b", "p/1/2/3/bar", "p/1/2/3/foo"])
+ # Test that appropriate errors are raised
+ self.assertRaises(fs.ResourceNotFoundError,self.fs.listdir,"zebra")
+ self.assertRaises(fs.ResourceInvalidError,self.fs.listdir,"foo")
+
def test_makedir(self):
+ """Test for proper functioning of makedir."""
check = self.check
-
self.fs.makedir("a")
- self.assert_(check("a"))
- self.assertRaises(fs.FSError, self.fs.makedir, "a/b/c")
-
+ self.assertTrue(check("a"))
+ self.assertRaises(fs.ParentDirectoryMissingError,self.fs.makedir,"a/b/c")
self.fs.makedir("a/b/c", recursive=True)
self.assert_(check("a/b/c"))
-
self.fs.makedir("foo/bar/baz", recursive=True)
self.assert_(check("foo/bar/baz"))
-
self.fs.makedir("a/b/child")
self.assert_(check("a/b/child"))
-
- self.fs.desc("a")
- self.fs.desc("a/b/child")
+ self.assertRaises(fs.DestinationExistsError,self.fs.makedir,"/a/b")
+ self.fs.makedir("/a/b",allow_recreate=True)
+ self.fs.createfile("/a/file")
+ self.assertRaises(fs.ResourceInvalidError,self.fs.makedir,"a/file")
+
+ def test_remove(self):
+ """Test that removing files works correctly."""
+ self.fs.createfile("a.txt")
+ self.assertTrue(self.check("a.txt"))
+ self.fs.remove("a.txt")
+ self.assertFalse(self.check("a.txt"))
+ self.assertRaises(fs.ResourceNotFoundError,self.fs.remove,"a.txt")
+ self.fs.makedir("dir1")
+ self.assertRaises(fs.ResourceInvalidError,self.fs.remove,"dir1")
+ self.fs.createfile("/dir1/a.txt")
+ self.assertTrue(self.check("dir1/a.txt"))
+ self.fs.remove("dir1/a.txt")
+ self.assertFalse(self.check("/dir1/a.txt"))
def test_removedir(self):
+ """Test for proper functioning of removedir."""
check = self.check
self.fs.makedir("a")
self.assert_(check("a"))
self.fs.removedir("a")
self.assert_(not check("a"))
self.fs.makedir("a/b/c/d", recursive=True)
- self.assertRaises(fs.FSError, self.fs.removedir, "a/b")
+ self.assertRaises(fs.DirectoryNotEmptyError, self.fs.removedir, "a/b")
self.fs.removedir("a/b/c/d")
self.assert_(not check("a/b/c/d"))
self.fs.removedir("a/b/c")
self.assert_(not check("a/b/c"))
self.fs.removedir("a/b")
self.assert_(not check("a/b"))
-
+ # Test recursive removal of empty parent dirs
self.fs.makedir("foo/bar/baz", recursive=True)
self.fs.removedir("foo/bar/baz", recursive=True)
self.assert_(not check("foo/bar/baz"))
self.assert_(not check("foo/bar"))
self.assert_(not check("foo"))
-
+ # Ensure that force=True works as expected
self.fs.makedir("frollic/waggle", recursive=True)
self.fs.createfile("frollic/waddle.txt","waddlewaddlewaddle")
- self.assertRaises(fs.OperationFailedError,self.fs.removedir,"frollic")
+ self.assertRaises(fs.DirectoryNotEmptyError,self.fs.removedir,"frollic")
+ self.assertRaises(fs.ResourceInvalidError,self.fs.removedir,"frollic/waddle.txt")
self.fs.removedir("frollic",force=True)
self.assert_(not check("frollic"))
- def test_listdir(self):
-
- def makefile(fname):
- f = self.fs.open(fname, "wb")
- f.write("*")
- f.close()
-
- makefile("a")
- makefile("b")
- makefile("foo")
- makefile("bar")
-
- d1 = self.fs.listdir()
- self.assertEqual(len(d1), 4)
- self.assertEqual(sorted(d1), ["a", "b", "bar", "foo"])
-
- d2 = self.fs.listdir(absolute=True)
- self.assertEqual(len(d2), 4)
- self.assertEqual(sorted(d2), ["/a", "/b", "/bar", "/foo"])
-
- self.fs.makedir("p/1/2/3", recursive=True)
- makefile("p/1/2/3/a")
- makefile("p/1/2/3/b")
- makefile("p/1/2/3/foo")
- makefile("p/1/2/3/bar")
-
- self.fs.makedir("q")
- dirs_only = self.fs.listdir(dirs_only=True)
- files_only = self.fs.listdir(files_only=True)
- self.assertEqual(sorted(dirs_only), ["p", "q"])
- self.assertEqual(sorted(files_only), ["a", "b", "bar", "foo"])
-
- d3 = self.fs.listdir("p/1/2/3")
- self.assertEqual(len(d3), 4)
- self.assertEqual(sorted(d3), ["a", "b", "bar", "foo"])
-
- d4 = self.fs.listdir("p/1/2/3", absolute=True)
- self.assertEqual(len(d4), 4)
- self.assertEqual(sorted(d4), ["/p/1/2/3/a", "/p/1/2/3/b", "/p/1/2/3/bar", "/p/1/2/3/foo"])
-
- d4 = self.fs.listdir("p/1/2/3", full=True)
- self.assertEqual(len(d4), 4)
- self.assertEqual(sorted(d4), ["p/1/2/3/a", "p/1/2/3/b", "p/1/2/3/bar", "p/1/2/3/foo"])
-
-
def test_rename(self):
+ """Test that the rename() method works correctly."""
check = self.check
self.fs.open("foo.txt", 'wt').write("Hello, World!")
self.assert_(check("foo.txt"))
@@ -256,19 +186,17 @@ class TestOSFS(unittest.TestCase):
self.assert_(not check("foo.txt"))
def test_info(self):
+ """Test that getinfo() returns an appropriate structure."""
test_str = "Hello, World!"
- f = self.fs.open("info.txt", 'wb')
- f.write(test_str)
- f.close()
+ self.fs.createfile("info.txt",test_str)
info = self.fs.getinfo("info.txt")
self.assertEqual(info['size'], len(test_str))
self.fs.desc("info.txt")
def test_getsize(self):
+ """Test that file sizes are correctly reported."""
test_str = "*"*23
- f = self.fs.open("info.txt", 'wb')
- f.write(test_str)
- f.close()
+ self.fs.createfile("info.txt",test_str)
size = self.fs.getsize("info.txt")
self.assertEqual(size, len(test_str))
@@ -276,13 +204,9 @@ class TestOSFS(unittest.TestCase):
check = self.check
contents = "If the implementation is hard to explain, it's a bad idea."
def makefile(path):
- f = self.fs.open(path, "wb")
- f.write(contents)
- f.close()
+ self.fs.createfile(path,contents)
def checkcontents(path):
- f = self.fs.open(path, "rb")
- check_contents = f.read()
- f.close()
+ check_contents = self.fs.getcontents(path)
self.assertEqual(check_contents,contents)
return contents == check_contents
@@ -296,7 +220,6 @@ class TestOSFS(unittest.TestCase):
self.assert_(checkcontents("foo/b.txt"))
self.fs.move("foo/b.txt", "c.txt")
- fs.print_fs(self.fs)
self.assert_(not check("foo/b.txt"))
self.assert_(check("/c.txt"))
self.assert_(checkcontents("/c.txt"))
@@ -314,9 +237,7 @@ class TestOSFS(unittest.TestCase):
check = self.check
contents = "If the implementation is hard to explain, it's a bad idea."
def makefile(path):
- f = self.fs.open(path, "wb")
- f.write(contents)
- f.close()
+ self.fs.createfile(path,contents)
self.fs.makedir("a")
self.fs.makedir("b")
@@ -355,13 +276,9 @@ class TestOSFS(unittest.TestCase):
check = self.check
contents = "If the implementation is hard to explain, it's a bad idea."
def makefile(path,contents=contents):
- f = self.fs.open(path, "wb")
- f.write(contents)
- f.close()
+ self.fs.createfile(path,contents)
def checkcontents(path,contents=contents):
- f = self.fs.open(path, "rb")
- check_contents = f.read()
- f.close()
+ check_contents = self.fs.getcontents(path)
self.assertEqual(check_contents,contents)
return contents == check_contents
@@ -391,9 +308,11 @@ class TestOSFS(unittest.TestCase):
check = self.check
contents = "If the implementation is hard to explain, it's a bad idea."
def makefile(path):
- f = self.fs.open(path, "wb")
- f.write(contents)
- f.close()
+ self.fs.createfile(path,contents)
+ def checkcontents(path):
+ check_contents = self.fs.getcontents(path)
+ self.assertEqual(check_contents,contents)
+ return contents == check_contents
self.fs.makedir("a")
self.fs.makedir("b")
@@ -408,11 +327,13 @@ class TestOSFS(unittest.TestCase):
self.assert_(check("copy of a/2.txt"))
self.assert_(check("copy of a/3.txt"))
self.assert_(check("copy of a/foo/bar/baz.txt"))
+ checkcontents("copy of a/1.txt")
self.assert_(check("a/1.txt"))
self.assert_(check("a/2.txt"))
self.assert_(check("a/3.txt"))
self.assert_(check("a/foo/bar/baz.txt"))
+ checkcontents("a/1.txt")
self.assertRaises(fs.DestinationExistsError,self.fs.copydir,"a","b")
self.fs.copydir("a","b",overwrite=True)
@@ -420,15 +341,14 @@ class TestOSFS(unittest.TestCase):
self.assert_(check("b/2.txt"))
self.assert_(check("b/3.txt"))
self.assert_(check("b/foo/bar/baz.txt"))
+ checkcontents("b/1.txt")
- def test_copydir_with_hidden(self):
+ def test_copydir_with_dotfile(self):
check = self.check
contents = "If the implementation is hard to explain, it's a bad idea."
def makefile(path):
- f = self.fs.open(path, "wb")
- f.write(contents)
- f.close()
+ self.fs.createfile(path,contents)
self.fs.makedir("a")
makefile("a/1.txt")
@@ -446,13 +366,7 @@ class TestOSFS(unittest.TestCase):
def test_readwriteappendseek(self):
def checkcontents(path, check_contents):
- f = None
- try:
- f = self.fs.open(path, "rb")
- read_contents = f.read()
- finally:
- if f is not None:
- f.close()
+ read_contents = self.fs.getcontents(path)
self.assertEqual(read_contents,check_contents)
return read_contents == check_contents
test_strings = ["Beautiful is better than ugly.",
@@ -506,16 +420,190 @@ class TestOSFS(unittest.TestCase):
f7.close()
self.assertEqual(self.fs.getcontents("a.txt"), all_strings)
+ def test_with_statement(self):
+ import sys
+ if sys.version_info[0] >= 2 and sys.version_info[1] >= 5:
+ # A successful 'with' statement
+ contents = "testing the with statement"
+ code = "from __future__ import with_statement\n"
+ code += "with self.fs.open('f.txt','w-') as testfile:\n"
+ code += " testfile.write(contents)\n"
+ code += "self.assertEquals(self.fs.getcontents('f.txt'),contents)"
+ code = compile(code,"<string>",'exec')
+ eval(code)
+ # A 'with' statement raising an error
+ contents = "testing the with statement"
+ code = "from __future__ import with_statement\n"
+ code += "with self.fs.open('f.txt','w-') as testfile:\n"
+ code += " testfile.write(contents)\n"
+ code += " raise ValueError\n"
+ code = compile(code,"<string>",'exec')
+ self.assertRaises(ValueError,eval,code,globals(),locals())
+ self.assertEquals(self.fs.getcontents('f.txt'),contents)
+
+ def test_pickling(self):
+ self.fs.createfile("test1","hello world")
+ oldfs = self.fs
+ self.fs = pickle.loads(pickle.dumps(self.fs))
+ self.assert_(self.fs.isfile("test1"))
+
-class TestSubFS(TestOSFS):
+####################
+
+
+class TestHelpers(unittest.TestCase):
+ """Testcases for FS path helpers."""
+
+ def test_normpath(self):
+ tests = [ ("\\a\\b\\c", "/a/b/c"),
+ ("", ""),
+ ("/a/b/c", "/a/b/c"),
+ ("a/b/c", "a/b/c"),
+ ("a/b/../c/", "a/c"),
+ ]
+ for path, result in tests:
+ self.assertEqual(normpath(path), result)
+
+ def test_pathjoin(self):
+ tests = [ ("", "a", "a"),
+ ("a", "a", "a/a"),
+ ("a/b", "../c", "a/c"),
+ ("a/b/../c", "d", "a/c/d"),
+ ("/a/b/c", "d", "/a/b/c/d"),
+ ("/a/b/c", "../../../d", "/d"),
+ ("a", "b", "c", "a/b/c"),
+ ("a/b/c", "../d", "c", "a/b/d/c"),
+ ("a/b/c", "../d", "/a", "/a"),
+ ("aaa", "bbb/ccc", "aaa/bbb/ccc"),
+ ("aaa", "bbb\ccc", "aaa/bbb/ccc"),
+ ("aaa", "bbb", "ccc", "/aaa", "eee", "/aaa/eee"),
+ ("a/b", "./d", "e", "a/b/d/e"),
+ ("/", "/", "/"),
+ ("/", "", "/"),
+ ]
+ for testpaths in tests:
+ paths = testpaths[:-1]
+ result = testpaths[-1]
+ self.assertEqual(fs.pathjoin(*paths), result)
+
+ self.assertRaises(ValueError, fs.pathjoin, "../")
+ self.assertRaises(ValueError, fs.pathjoin, "./../")
+ self.assertRaises(ValueError, fs.pathjoin, "a/b", "../../..")
+ self.assertRaises(ValueError, fs.pathjoin, "a/b/../../../d")
+
+ def test_makerelative(self):
+ tests = [ ("/a/b", "a/b"),
+ ("a/b", "a/b"),
+ ("/", "") ]
+
+ for path, result in tests:
+ print path, result
+ self.assertEqual(fs.makerelative(path), result)
+
+ def test_makeabsolute(self):
+ tests = [ ("/a/b", "/a/b"),
+ ("a/b", "/a/b"),
+ ("/", "/") ]
+
+ for path, result in tests:
+ self.assertEqual(fs.makeabsolute(path), result)
+
+ def test_iteratepath(self):
+ tests = [ ("a/b", ["a", "b"]),
+ ("", [] ),
+ ("aaa/bbb/ccc", ["aaa", "bbb", "ccc"]),
+ ("a/b/c/../d", ["a", "b", "d"]) ]
+
+ for path, results in tests:
+ print repr(path), results
+ for path_component, expected in zip(iteratepath(path), results):
+ self.assertEqual(path_component, expected)
+
+ self.assertEqual(list(iteratepath("a/b/c/d", 1)), ["a", "b/c/d"])
+ self.assertEqual(list(iteratepath("a/b/c/d", 2)), ["a", "b", "c/d"])
+
+ def test_pathsplit(self):
+ tests = [ ("a/b", ("a", "b")),
+ ("a/b/c", ("a/b", "c")),
+ ("a", ("", "a")),
+ ("", ("", "")),
+ ("/", ("", "")),
+ ("foo/bar", ("foo", "bar")),
+ ("foo/bar/baz", ("foo/bar", "baz")),
+ ]
+ for path, result in tests:
+ self.assertEqual(fs.pathsplit(path), result)
+
+
+####################
+
+
+import objecttree
+
+class TestObjectTree(unittest.TestCase):
+ """Testcases for the ObjectTree class."""
+
+ def test_getset(self):
+ ot = objecttree.ObjectTree()
+ ot['foo'] = "bar"
+ self.assertEqual(ot['foo'], 'bar')
+
+ ot = objecttree.ObjectTree()
+ ot['foo/bar'] = "baz"
+ self.assertEqual(ot['foo'], {'bar':'baz'})
+ self.assertEqual(ot['foo/bar'], 'baz')
+
+ del ot['foo/bar']
+ self.assertEqual(ot['foo'], {})
+
+ ot = objecttree.ObjectTree()
+ ot['a/b/c'] = "A"
+ ot['a/b/d'] = "B"
+ ot['a/b/e'] = "C"
+ ot['a/b/f'] = "D"
+ self.assertEqual(sorted(ot['a/b'].values()), ['A', 'B', 'C', 'D'])
+ self.assert_(ot.get('a/b/x', -1) == -1)
+
+ self.assert_('a/b/c' in ot)
+ self.assert_('a/b/x' not in ot)
+ self.assert_(ot.isobject('a/b/c'))
+ self.assert_(ot.isobject('a/b/d'))
+ self.assert_(not ot.isobject('a/b'))
+
+ left, object, right = ot.partialget('a/b/e/f/g')
+ self.assertEqual(left, "a/b/e")
+ self.assertEqual(object, "C")
+ self.assertEqual(right, "f/g")
+
+
+####################
+
+
+import osfs
+import os
+
+class TestOSFS(BaseFSTestCase):
+
+ def setUp(self):
+ self.temp_dir = tempfile.mkdtemp("fstest")
+ self.fs = osfs.OSFS(self.temp_dir)
+
+ def tearDown(self):
+ shutil.rmtree(self.temp_dir)
+
+ def check(self, p):
+ return os.path.exists(os.path.join(self.temp_dir, makerelative(p)))
+
+
+
+class TestSubFS(BaseFSTestCase):
def setUp(self):
self.temp_dir = tempfile.mkdtemp("fstest")
self.parent_fs = osfs.OSFS(self.temp_dir)
self.parent_fs.makedir("foo/bar", recursive=True)
self.fs = self.parent_fs.opendir("foo/bar")
- print "Temp dir is", self.temp_dir
def tearDown(self):
shutil.rmtree(self.temp_dir)
@@ -527,20 +615,14 @@ class TestSubFS(TestOSFS):
import memoryfs
-class TestMemoryFS(TestOSFS):
+class TestMemoryFS(BaseFSTestCase):
def setUp(self):
self.fs = memoryfs.MemoryFS()
- def tearDown(self):
- pass
-
- def check(self, p):
- return self.fs.exists(p)
-
import mountfs
-class TestMountFS(TestOSFS):
+class TestMountFS(BaseFSTestCase):
def setUp(self):
self.mount_fs = mountfs.MountFS()
@@ -554,8 +636,9 @@ class TestMountFS(TestOSFS):
def check(self, p):
return self.mount_fs.exists(os.path.join("mounted/memfs", makerelative(p)))
+
import tempfs
-class TestTempFS(TestOSFS):
+class TestTempFS(BaseFSTestCase):
def setUp(self):
self.fs = tempfs.TempFS()
@@ -569,6 +652,71 @@ class TestTempFS(TestOSFS):
td = self.fs._temp_dir
return os.path.exists(os.path.join(td, makerelative(p)))
+
+import s3fs
+class TestS3FS(BaseFSTestCase):
+
+ bucket = "test-s3fs.rfk.id.au"
+
+ def setUp(self):
+ self.fs = s3fs.S3FS(self.bucket,"/unittest/files")
+ self._clear()
+
+ def _clear(self):
+ for (path,files) in self.fs.walk(search="depth"):
+ for fn in files:
+ self.fs.remove(pathjoin(path,fn))
+ if path and path != "/":
+ self.fs.removedir(path)
+
+ def tearDown(self):
+ self._clear()
+ for k in self.fs._s3bukt.list():
+ self.fs._s3bukt.delete_key(k)
+ self.fs._s3conn.delete_bucket(self.bucket)
+
+
+import rpcfs
+import socket
+import threading
+class TestRPCFS(TestOSFS):
+
+ def setUp(self):
+ self.port = 8000
+ self.server = None
+ while not self.server:
+ try:
+ self.server = rpcfs.RPCFSServer(tempfs.TempFS(),("localhost",self.port),logRequests=False)
+ except socket.error, e:
+ if e.args[1] == "Address already in use":
+ self.port += 1
+ else:
+ raise e
+ self.server_thread = threading.Thread(target=self._run_server)
+ self.server_thread.start()
+ self.fs = rpcfs.RPCFS("http://localhost:" + str(self.port))
+
+ def _run_server(self):
+ """Run the server, swallowing shutdown-related execptions."""
+ try:
+ self.server.serve_forever()
+ except:
+ pass
+
+ def tearDown(self):
+ try:
+ # Shut the server down. We send one final request to
+ # bump the socket and make it recognise the shutdown.
+ self.server.serve_more_requests = False
+ self.server.server_close()
+ self.fs.exists("/")
+ except Exception:
+ pass
+
+
+####################
+
+
import zipfs
import random
import zipfile
@@ -640,6 +788,7 @@ class TestReadZipFS(unittest.TestCase):
check_listing('foo', ['second.txt', 'bar'])
check_listing('foo/bar', ['baz.txt'])
+
class TestWriteZipFS(unittest.TestCase):
def setUp(self):
@@ -708,99 +857,8 @@ class TestAppendZipFS(TestWriteZipFS):
zip_fs.close()
-import s3fs
-class TestS3FS(TestOSFS):
-
- bucket = "test-s3fs.rfk.id.au"
-
- def setUp(self):
- self.fs = s3fs.S3FS(self.bucket,"/unittest/files")
- self._clear()
-
- def _clear(self):
- for (path,files) in self.fs.walk(search="depth"):
- for fn in files:
- self.fs.remove(pathjoin(path,fn))
- if path and path != "/":
- self.fs.removedir(path)
-
- def tearDown(self):
- self._clear()
- for k in self.fs._s3bukt.list():
- self.fs._s3bukt.delete_key(k)
- self.fs._s3conn.delete_bucket(self.bucket)
-
- def check(self, p):
- return self.fs.exists(p)
-
- def test_with_statement(self):
- import sys
- if sys.version_info[0] >= 2 and sys.version_info[1] >= 5:
- # A successful 'with' statement
- contents = "testing the with statement"
- code = "from __future__ import with_statement\n"
- code += "with self.fs.open('f.txt','w-') as testfile:\n"
- code += " testfile.write(contents)\n"
- code += "self.assertEquals(self.fs.getcontents('f.txt'),contents)"
- code = compile(code,"<string>",'exec')
- eval(code)
- # A 'with' statement raising an error
- contents = "testing the with statement"
- code = "from __future__ import with_statement\n"
- code += "with self.fs.open('f.txt','w-') as testfile:\n"
- code += " testfile.write(contents)\n"
- code += " raise ValueError\n"
- code = compile(code,"<string>",'exec')
- self.assertRaises(ValueError,eval,code,globals(),locals())
- self.assertEquals(self.fs.getcontents('f.txt'),contents)
-
-
-
-import rpcfs
-import socket
-import threading
-import time
-class TestRPCFS(TestOSFS):
-
- def setUp(self):
- self.port = 8000
- self.server = None
- while not self.server:
- try:
- self.server = rpcfs.RPCFSServer(tempfs.TempFS(),("localhost",self.port),logRequests=False)
- except socket.error, e:
- if e.args[1] == "Address already in use":
- self.port += 1
- else:
- raise e
- self.server_thread = threading.Thread(target=self._run_server)
- self.server_thread.start()
- self.fs = rpcfs.RPCFS("http://localhost:" + str(self.port))
-
- def _run_server(self):
- """Run the server, swallowing shutdown-related execptions."""
- try:
- self.server.serve_forever()
- except:
- pass
-
- def tearDown(self):
- try:
- # Shut the server down. We send one final request to
- # bump the socket and make it recognise the shutdown.
- self.server.serve_more_requests = False
- self.server.server_close()
- self.fs.exists("/")
- except Exception:
- pass
-
- def check(self, p):
- return self.fs.exists(p)
-
if __name__ == "__main__":
- #t = TestFS()
- #t.setUp()
- #t.tearDown()
import nose
nose.main()
+