From facbf53c6b075a7e959cc3bd32d7183bd576b3a2 Mon Sep 17 00:00:00 2001 From: rfkelly0 Date: Tue, 5 May 2009 06:05:28 +0000 Subject: More tests, especially assertRaises() variants git-svn-id: http://pyfilesystem.googlecode.com/svn/branches/rfk-ideas@129 67cdc799-7952-0410-af00-57a81ceafa0f --- fs/tests.py | 758 ++++++++++++++++++++++++++++++++---------------------------- 1 file changed, 408 insertions(+), 350 deletions(-) diff --git a/fs/tests.py b/fs/tests.py index deb880c..022b7b1 100644 --- a/fs/tests.py +++ b/fs/tests.py @@ -1,253 +1,183 @@ #!/usr/bin/env python +""" -import unittest -import base as fs -from helpers import * -from helpers import _iteratepath -import shutil - -class TestHelpers(unittest.TestCase): - - def test_isabsolutepath(self): - tests = [ ('', False), - ('/', True), - ('/A/B', True), - ('/asdasd', True), - ('a/b/c', False), - ] - for path, result in tests: - self.assertEqual(fs.isabsolutepath(path), result) - - def test_normpath(self): - tests = [ ("\\a\\b\\c", "/a/b/c"), - ("", ""), - ("/a/b/c", "/a/b/c"), - ] - for path, result in tests: - self.assertEqual(fs.normpath(path), result) - - def test_pathjoin(self): - tests = [ ("", "a", "a"), - ("a", "a", "a/a"), - ("a/b", "../c", "a/c"), - ("a/b/../c", "d", "a/c/d"), - ("/a/b/c", "d", "/a/b/c/d"), - ("/a/b/c", "../../../d", "/d"), - ("a", "b", "c", "a/b/c"), - ("a/b/c", "../d", "c", "a/b/d/c"), - ("a/b/c", "../d", "/a", "/a"), - ("aaa", "bbb/ccc", "aaa/bbb/ccc"), - ("aaa", "bbb\ccc", "aaa/bbb/ccc"), - ("aaa", "bbb", "ccc", "/aaa", "eee", "/aaa/eee"), - ("a/b", "./d", "e", "a/b/d/e"), - ("/", "/", "/"), - ("/", "", "/"), - ] - for testpaths in tests: - paths = testpaths[:-1] - result = testpaths[-1] - self.assertEqual(fs.pathjoin(*paths), result) - - self.assertRaises(ValueError, fs.pathjoin, "../") - self.assertRaises(ValueError, fs.pathjoin, "./../") - self.assertRaises(ValueError, fs.pathjoin, "a/b", "../../..") - self.assertRaises(ValueError, fs.pathjoin, "a/b/../../../d") - - def test_makerelative(self): - tests = [ ("/a/b", "a/b"), - ("a/b", "a/b"), - ("/", "") ] - - for path, result in tests: - print path, result - self.assertEqual(fs.makerelative(path), result) - - def test_makeabsolute(self): - tests = [ ("/a/b", "/a/b"), - ("a/b", "/a/b"), - ("/", "/") ] - - for path, result in tests: - self.assertEqual(fs.makeabsolute(path), result) - - def test_iteratepath(self): - tests = [ ("a/b", ["a", "b"]), - ("", [] ), - ("aaa/bbb/ccc", ["aaa", "bbb", "ccc"]), - ("a/b/c/../d", ["a", "b", "d"]) ] - - for path, results in tests: - print repr(path), results - for path_component, expected in zip(_iteratepath(path), results): - self.assertEqual(path_component, expected) - - self.assertEqual(list(_iteratepath("a/b/c/d", 1)), ["a", "b/c/d"]) - self.assertEqual(list(_iteratepath("a/b/c/d", 2)), ["a", "b", "c/d"]) - - def test_pathsplit(self): - tests = [ ("a/b", ("a", "b")), - ("a/b/c", ("a/b", "c")), - ("a", ("", "a")), - ("", ("", "")), - ("/", ("", "")), - ("foo/bar", ("foo", "bar")), - ("foo/bar/baz", ("foo/bar", "baz")), - ] - for path, result in tests: - self.assertEqual(fs.pathsplit(path), result) - - -import objecttree - -class TestObjectTree(unittest.TestCase): - - def test_getset(self): - ot = objecttree.ObjectTree() - ot['foo'] = "bar" - self.assertEqual(ot['foo'], 'bar') - - ot = objecttree.ObjectTree() - ot['foo/bar'] = "baz" - self.assertEqual(ot['foo'], {'bar':'baz'}) - self.assertEqual(ot['foo/bar'], 'baz') - - del ot['foo/bar'] - self.assertEqual(ot['foo'], {}) + fs.tests: testcases for the fs module - ot = objecttree.ObjectTree() - ot['a/b/c'] = "A" - ot['a/b/d'] = "B" - ot['a/b/e'] = "C" - ot['a/b/f'] = "D" - self.assertEqual(sorted(ot['a/b'].values()), ['A', 'B', 'C', 'D']) - self.assert_(ot.get('a/b/x', -1) == -1) +""" - self.assert_('a/b/c' in ot) - self.assert_('a/b/x' not in ot) - self.assert_(ot.isobject('a/b/c')) - self.assert_(ot.isobject('a/b/d')) - self.assert_(not ot.isobject('a/b')) +import unittest - left, object, right = ot.partialget('a/b/e/f/g') - self.assertEqual(left, "a/b/e") - self.assertEqual(object, "C") - self.assertEqual(right, "f/g") +import shutil +import tempfile +import pickle +import base as fs +from helpers import * -import tempfile -import osfs -import os -class TestOSFS(unittest.TestCase): +#################### - def setUp(self): - self.temp_dir = tempfile.mkdtemp("fstest") - self.fs = osfs.OSFS(self.temp_dir) - print "Temp dir is", self.temp_dir - def tearDown(self): - shutil.rmtree(self.temp_dir) +class BaseFSTestCase(unittest.TestCase): + """Base suite of testcases for filesystem implementations. + Any FS subclass should be capable of passing all of these tests. + To apply the tests to your own FS implementation, simply subclass + BaseFSTestCase and have the setUp method set self.fs to an instance + of your FS implementation. + """ + def check(self, p): - return os.path.exists(os.path.join(self.temp_dir, makerelative(p))) + """Check that a file exists within self.fs""" + return self.fs.exists(p) + + def test_root_dir(self): + """Test that the root directory is in fact a directory.""" + self.assertTrue(self.fs.isdir("")) + self.assertTrue(self.fs.isdir("/")) def test_debug(self): + """Test that the FS can be printed for debugging""" str(self.fs) repr(self.fs) self.assert_(hasattr(self.fs, 'desc')) + def test_writefile(self): + """Test that basic file reading/writing works as expected.""" + self.assertRaises(fs.ResourceNotFoundError,self.fs.open,"test1.txt") + f = self.fs.open("test1.txt","w") + f.write("testing") + f.close() + f = self.fs.open("test1.txt","r") + self.assertEquals(f.read(),"testing") + + def test_isdir_isfile(self): + """Test that the existience-checking methods work correctly.""" + self.assertFalse(self.fs.exists("dir1")) + self.assertFalse(self.fs.isdir("dir1")) + self.assertFalse(self.fs.isfile("a.txt")) + self.fs.createfile("a.txt") + self.assertFalse(self.fs.isdir("dir1")) + self.assertTrue(self.fs.exists("a.txt")) + self.assertTrue(self.fs.isfile("a.txt")) + self.fs.makedir("dir1") + self.assertTrue(self.fs.isdir("dir1")) + self.assertTrue(self.fs.exists("dir1")) + self.assertTrue(self.fs.exists("a.txt")) + self.fs.remove("a.txt") + self.assertFalse(self.fs.exists("a.txt")) + + def test_listdir(self): + """Test the directory listing works, including keyword arguments.""" + self.fs.createfile("a") + self.fs.createfile("b") + self.fs.createfile("foo") + self.fs.createfile("bar") + # Test listing of the root directory + d1 = self.fs.listdir() + self.assertEqual(len(d1), 4) + self.assertEqual(sorted(d1), ["a", "b", "bar", "foo"]) + d1 = self.fs.listdir("") + self.assertEqual(len(d1), 4) + self.assertEqual(sorted(d1), ["a", "b", "bar", "foo"]) + d1 = self.fs.listdir("/") + self.assertEqual(len(d1), 4) + # Test listing absolute paths + d2 = self.fs.listdir(absolute=True) + self.assertEqual(len(d2), 4) + self.assertEqual(sorted(d2), ["/a", "/b", "/bar", "/foo"]) + # Create some deeper subdirectories, to make sure their + # contents are not inadvertantly included + self.fs.makedir("p/1/2/3",recursive=True) + self.fs.createfile("p/1/2/3/a") + self.fs.createfile("p/1/2/3/b") + self.fs.createfile("p/1/2/3/foo") + self.fs.createfile("p/1/2/3/bar") + self.fs.makedir("q") + # Test listing just files, just dirs, and wildcards + dirs_only = self.fs.listdir(dirs_only=True) + files_only = self.fs.listdir(files_only=True) + contains_a = self.fs.listdir(wildcard="*a*") + self.assertEqual(sorted(dirs_only), ["p", "q"]) + self.assertEqual(sorted(files_only), ["a", "b", "bar", "foo"]) + self.assertEqual(sorted(contains_a), ["a", "bar"]) + # Test listing a subdirectory + d3 = self.fs.listdir("p/1/2/3") + self.assertEqual(len(d3), 4) + self.assertEqual(sorted(d3), ["a", "b", "bar", "foo"]) + # Test listing a subdirectory with absoliute and full paths + d4 = self.fs.listdir("p/1/2/3", absolute=True) + self.assertEqual(len(d4), 4) + self.assertEqual(sorted(d4), ["/p/1/2/3/a", "/p/1/2/3/b", "/p/1/2/3/bar", "/p/1/2/3/foo"]) + d4 = self.fs.listdir("p/1/2/3", full=True) + self.assertEqual(len(d4), 4) + self.assertEqual(sorted(d4), ["p/1/2/3/a", "p/1/2/3/b", "p/1/2/3/bar", "p/1/2/3/foo"]) + # Test that appropriate errors are raised + self.assertRaises(fs.ResourceNotFoundError,self.fs.listdir,"zebra") + self.assertRaises(fs.ResourceInvalidError,self.fs.listdir,"foo") + def test_makedir(self): + """Test for proper functioning of makedir.""" check = self.check - self.fs.makedir("a") - self.assert_(check("a")) - self.assertRaises(fs.FSError, self.fs.makedir, "a/b/c") - + self.assertTrue(check("a")) + self.assertRaises(fs.ParentDirectoryMissingError,self.fs.makedir,"a/b/c") self.fs.makedir("a/b/c", recursive=True) self.assert_(check("a/b/c")) - self.fs.makedir("foo/bar/baz", recursive=True) self.assert_(check("foo/bar/baz")) - self.fs.makedir("a/b/child") self.assert_(check("a/b/child")) - - self.fs.desc("a") - self.fs.desc("a/b/child") + self.assertRaises(fs.DestinationExistsError,self.fs.makedir,"/a/b") + self.fs.makedir("/a/b",allow_recreate=True) + self.fs.createfile("/a/file") + self.assertRaises(fs.ResourceInvalidError,self.fs.makedir,"a/file") + + def test_remove(self): + """Test that removing files works correctly.""" + self.fs.createfile("a.txt") + self.assertTrue(self.check("a.txt")) + self.fs.remove("a.txt") + self.assertFalse(self.check("a.txt")) + self.assertRaises(fs.ResourceNotFoundError,self.fs.remove,"a.txt") + self.fs.makedir("dir1") + self.assertRaises(fs.ResourceInvalidError,self.fs.remove,"dir1") + self.fs.createfile("/dir1/a.txt") + self.assertTrue(self.check("dir1/a.txt")) + self.fs.remove("dir1/a.txt") + self.assertFalse(self.check("/dir1/a.txt")) def test_removedir(self): + """Test for proper functioning of removedir.""" check = self.check self.fs.makedir("a") self.assert_(check("a")) self.fs.removedir("a") self.assert_(not check("a")) self.fs.makedir("a/b/c/d", recursive=True) - self.assertRaises(fs.FSError, self.fs.removedir, "a/b") + self.assertRaises(fs.DirectoryNotEmptyError, self.fs.removedir, "a/b") self.fs.removedir("a/b/c/d") self.assert_(not check("a/b/c/d")) self.fs.removedir("a/b/c") self.assert_(not check("a/b/c")) self.fs.removedir("a/b") self.assert_(not check("a/b")) - + # Test recursive removal of empty parent dirs self.fs.makedir("foo/bar/baz", recursive=True) self.fs.removedir("foo/bar/baz", recursive=True) self.assert_(not check("foo/bar/baz")) self.assert_(not check("foo/bar")) self.assert_(not check("foo")) - + # Ensure that force=True works as expected self.fs.makedir("frollic/waggle", recursive=True) self.fs.createfile("frollic/waddle.txt","waddlewaddlewaddle") - self.assertRaises(fs.OperationFailedError,self.fs.removedir,"frollic") + self.assertRaises(fs.DirectoryNotEmptyError,self.fs.removedir,"frollic") + self.assertRaises(fs.ResourceInvalidError,self.fs.removedir,"frollic/waddle.txt") self.fs.removedir("frollic",force=True) self.assert_(not check("frollic")) - def test_listdir(self): - - def makefile(fname): - f = self.fs.open(fname, "wb") - f.write("*") - f.close() - - makefile("a") - makefile("b") - makefile("foo") - makefile("bar") - - d1 = self.fs.listdir() - self.assertEqual(len(d1), 4) - self.assertEqual(sorted(d1), ["a", "b", "bar", "foo"]) - - d2 = self.fs.listdir(absolute=True) - self.assertEqual(len(d2), 4) - self.assertEqual(sorted(d2), ["/a", "/b", "/bar", "/foo"]) - - self.fs.makedir("p/1/2/3", recursive=True) - makefile("p/1/2/3/a") - makefile("p/1/2/3/b") - makefile("p/1/2/3/foo") - makefile("p/1/2/3/bar") - - self.fs.makedir("q") - dirs_only = self.fs.listdir(dirs_only=True) - files_only = self.fs.listdir(files_only=True) - self.assertEqual(sorted(dirs_only), ["p", "q"]) - self.assertEqual(sorted(files_only), ["a", "b", "bar", "foo"]) - - d3 = self.fs.listdir("p/1/2/3") - self.assertEqual(len(d3), 4) - self.assertEqual(sorted(d3), ["a", "b", "bar", "foo"]) - - d4 = self.fs.listdir("p/1/2/3", absolute=True) - self.assertEqual(len(d4), 4) - self.assertEqual(sorted(d4), ["/p/1/2/3/a", "/p/1/2/3/b", "/p/1/2/3/bar", "/p/1/2/3/foo"]) - - d4 = self.fs.listdir("p/1/2/3", full=True) - self.assertEqual(len(d4), 4) - self.assertEqual(sorted(d4), ["p/1/2/3/a", "p/1/2/3/b", "p/1/2/3/bar", "p/1/2/3/foo"]) - - def test_rename(self): + """Test that the rename() method works correctly.""" check = self.check self.fs.open("foo.txt", 'wt').write("Hello, World!") self.assert_(check("foo.txt")) @@ -256,19 +186,17 @@ class TestOSFS(unittest.TestCase): self.assert_(not check("foo.txt")) def test_info(self): + """Test that getinfo() returns an appropriate structure.""" test_str = "Hello, World!" - f = self.fs.open("info.txt", 'wb') - f.write(test_str) - f.close() + self.fs.createfile("info.txt",test_str) info = self.fs.getinfo("info.txt") self.assertEqual(info['size'], len(test_str)) self.fs.desc("info.txt") def test_getsize(self): + """Test that file sizes are correctly reported.""" test_str = "*"*23 - f = self.fs.open("info.txt", 'wb') - f.write(test_str) - f.close() + self.fs.createfile("info.txt",test_str) size = self.fs.getsize("info.txt") self.assertEqual(size, len(test_str)) @@ -276,13 +204,9 @@ class TestOSFS(unittest.TestCase): check = self.check contents = "If the implementation is hard to explain, it's a bad idea." def makefile(path): - f = self.fs.open(path, "wb") - f.write(contents) - f.close() + self.fs.createfile(path,contents) def checkcontents(path): - f = self.fs.open(path, "rb") - check_contents = f.read() - f.close() + check_contents = self.fs.getcontents(path) self.assertEqual(check_contents,contents) return contents == check_contents @@ -296,7 +220,6 @@ class TestOSFS(unittest.TestCase): self.assert_(checkcontents("foo/b.txt")) self.fs.move("foo/b.txt", "c.txt") - fs.print_fs(self.fs) self.assert_(not check("foo/b.txt")) self.assert_(check("/c.txt")) self.assert_(checkcontents("/c.txt")) @@ -314,9 +237,7 @@ class TestOSFS(unittest.TestCase): check = self.check contents = "If the implementation is hard to explain, it's a bad idea." def makefile(path): - f = self.fs.open(path, "wb") - f.write(contents) - f.close() + self.fs.createfile(path,contents) self.fs.makedir("a") self.fs.makedir("b") @@ -355,13 +276,9 @@ class TestOSFS(unittest.TestCase): check = self.check contents = "If the implementation is hard to explain, it's a bad idea." def makefile(path,contents=contents): - f = self.fs.open(path, "wb") - f.write(contents) - f.close() + self.fs.createfile(path,contents) def checkcontents(path,contents=contents): - f = self.fs.open(path, "rb") - check_contents = f.read() - f.close() + check_contents = self.fs.getcontents(path) self.assertEqual(check_contents,contents) return contents == check_contents @@ -391,9 +308,11 @@ class TestOSFS(unittest.TestCase): check = self.check contents = "If the implementation is hard to explain, it's a bad idea." def makefile(path): - f = self.fs.open(path, "wb") - f.write(contents) - f.close() + self.fs.createfile(path,contents) + def checkcontents(path): + check_contents = self.fs.getcontents(path) + self.assertEqual(check_contents,contents) + return contents == check_contents self.fs.makedir("a") self.fs.makedir("b") @@ -408,11 +327,13 @@ class TestOSFS(unittest.TestCase): self.assert_(check("copy of a/2.txt")) self.assert_(check("copy of a/3.txt")) self.assert_(check("copy of a/foo/bar/baz.txt")) + checkcontents("copy of a/1.txt") self.assert_(check("a/1.txt")) self.assert_(check("a/2.txt")) self.assert_(check("a/3.txt")) self.assert_(check("a/foo/bar/baz.txt")) + checkcontents("a/1.txt") self.assertRaises(fs.DestinationExistsError,self.fs.copydir,"a","b") self.fs.copydir("a","b",overwrite=True) @@ -420,15 +341,14 @@ class TestOSFS(unittest.TestCase): self.assert_(check("b/2.txt")) self.assert_(check("b/3.txt")) self.assert_(check("b/foo/bar/baz.txt")) + checkcontents("b/1.txt") - def test_copydir_with_hidden(self): + def test_copydir_with_dotfile(self): check = self.check contents = "If the implementation is hard to explain, it's a bad idea." def makefile(path): - f = self.fs.open(path, "wb") - f.write(contents) - f.close() + self.fs.createfile(path,contents) self.fs.makedir("a") makefile("a/1.txt") @@ -446,13 +366,7 @@ class TestOSFS(unittest.TestCase): def test_readwriteappendseek(self): def checkcontents(path, check_contents): - f = None - try: - f = self.fs.open(path, "rb") - read_contents = f.read() - finally: - if f is not None: - f.close() + read_contents = self.fs.getcontents(path) self.assertEqual(read_contents,check_contents) return read_contents == check_contents test_strings = ["Beautiful is better than ugly.", @@ -506,41 +420,209 @@ class TestOSFS(unittest.TestCase): f7.close() self.assertEqual(self.fs.getcontents("a.txt"), all_strings) + def test_with_statement(self): + import sys + if sys.version_info[0] >= 2 and sys.version_info[1] >= 5: + # A successful 'with' statement + contents = "testing the with statement" + code = "from __future__ import with_statement\n" + code += "with self.fs.open('f.txt','w-') as testfile:\n" + code += " testfile.write(contents)\n" + code += "self.assertEquals(self.fs.getcontents('f.txt'),contents)" + code = compile(code,"",'exec') + eval(code) + # A 'with' statement raising an error + contents = "testing the with statement" + code = "from __future__ import with_statement\n" + code += "with self.fs.open('f.txt','w-') as testfile:\n" + code += " testfile.write(contents)\n" + code += " raise ValueError\n" + code = compile(code,"",'exec') + self.assertRaises(ValueError,eval,code,globals(),locals()) + self.assertEquals(self.fs.getcontents('f.txt'),contents) + def test_pickling(self): + self.fs.createfile("test1","hello world") + oldfs = self.fs + self.fs = pickle.loads(pickle.dumps(self.fs)) + self.assert_(self.fs.isfile("test1")) -class TestSubFS(TestOSFS): - - def setUp(self): - self.temp_dir = tempfile.mkdtemp("fstest") - self.parent_fs = osfs.OSFS(self.temp_dir) - self.parent_fs.makedir("foo/bar", recursive=True) - self.fs = self.parent_fs.opendir("foo/bar") - print "Temp dir is", self.temp_dir - - def tearDown(self): - shutil.rmtree(self.temp_dir) - def check(self, p): - p = os.path.join("foo/bar", makerelative(p)) - full_p = os.path.join(self.temp_dir, p) - return os.path.exists(full_p) +#################### -import memoryfs -class TestMemoryFS(TestOSFS): - def setUp(self): - self.fs = memoryfs.MemoryFS() +class TestHelpers(unittest.TestCase): + """Testcases for FS path helpers.""" + + def test_normpath(self): + tests = [ ("\\a\\b\\c", "/a/b/c"), + ("", ""), + ("/a/b/c", "/a/b/c"), + ("a/b/c", "a/b/c"), + ("a/b/../c/", "a/c"), + ] + for path, result in tests: + self.assertEqual(normpath(path), result) + + def test_pathjoin(self): + tests = [ ("", "a", "a"), + ("a", "a", "a/a"), + ("a/b", "../c", "a/c"), + ("a/b/../c", "d", "a/c/d"), + ("/a/b/c", "d", "/a/b/c/d"), + ("/a/b/c", "../../../d", "/d"), + ("a", "b", "c", "a/b/c"), + ("a/b/c", "../d", "c", "a/b/d/c"), + ("a/b/c", "../d", "/a", "/a"), + ("aaa", "bbb/ccc", "aaa/bbb/ccc"), + ("aaa", "bbb\ccc", "aaa/bbb/ccc"), + ("aaa", "bbb", "ccc", "/aaa", "eee", "/aaa/eee"), + ("a/b", "./d", "e", "a/b/d/e"), + ("/", "/", "/"), + ("/", "", "/"), + ] + for testpaths in tests: + paths = testpaths[:-1] + result = testpaths[-1] + self.assertEqual(fs.pathjoin(*paths), result) + + self.assertRaises(ValueError, fs.pathjoin, "../") + self.assertRaises(ValueError, fs.pathjoin, "./../") + self.assertRaises(ValueError, fs.pathjoin, "a/b", "../../..") + self.assertRaises(ValueError, fs.pathjoin, "a/b/../../../d") + + def test_makerelative(self): + tests = [ ("/a/b", "a/b"), + ("a/b", "a/b"), + ("/", "") ] + + for path, result in tests: + print path, result + self.assertEqual(fs.makerelative(path), result) + + def test_makeabsolute(self): + tests = [ ("/a/b", "/a/b"), + ("a/b", "/a/b"), + ("/", "/") ] + + for path, result in tests: + self.assertEqual(fs.makeabsolute(path), result) + + def test_iteratepath(self): + tests = [ ("a/b", ["a", "b"]), + ("", [] ), + ("aaa/bbb/ccc", ["aaa", "bbb", "ccc"]), + ("a/b/c/../d", ["a", "b", "d"]) ] + + for path, results in tests: + print repr(path), results + for path_component, expected in zip(iteratepath(path), results): + self.assertEqual(path_component, expected) + + self.assertEqual(list(iteratepath("a/b/c/d", 1)), ["a", "b/c/d"]) + self.assertEqual(list(iteratepath("a/b/c/d", 2)), ["a", "b", "c/d"]) + + def test_pathsplit(self): + tests = [ ("a/b", ("a", "b")), + ("a/b/c", ("a/b", "c")), + ("a", ("", "a")), + ("", ("", "")), + ("/", ("", "")), + ("foo/bar", ("foo", "bar")), + ("foo/bar/baz", ("foo/bar", "baz")), + ] + for path, result in tests: + self.assertEqual(fs.pathsplit(path), result) + + +#################### - def tearDown(self): - pass + +import objecttree + +class TestObjectTree(unittest.TestCase): + """Testcases for the ObjectTree class.""" + + def test_getset(self): + ot = objecttree.ObjectTree() + ot['foo'] = "bar" + self.assertEqual(ot['foo'], 'bar') + + ot = objecttree.ObjectTree() + ot['foo/bar'] = "baz" + self.assertEqual(ot['foo'], {'bar':'baz'}) + self.assertEqual(ot['foo/bar'], 'baz') + + del ot['foo/bar'] + self.assertEqual(ot['foo'], {}) + + ot = objecttree.ObjectTree() + ot['a/b/c'] = "A" + ot['a/b/d'] = "B" + ot['a/b/e'] = "C" + ot['a/b/f'] = "D" + self.assertEqual(sorted(ot['a/b'].values()), ['A', 'B', 'C', 'D']) + self.assert_(ot.get('a/b/x', -1) == -1) + + self.assert_('a/b/c' in ot) + self.assert_('a/b/x' not in ot) + self.assert_(ot.isobject('a/b/c')) + self.assert_(ot.isobject('a/b/d')) + self.assert_(not ot.isobject('a/b')) + + left, object, right = ot.partialget('a/b/e/f/g') + self.assertEqual(left, "a/b/e") + self.assertEqual(object, "C") + self.assertEqual(right, "f/g") + + +#################### + + +import osfs +import os + +class TestOSFS(BaseFSTestCase): + + def setUp(self): + self.temp_dir = tempfile.mkdtemp("fstest") + self.fs = osfs.OSFS(self.temp_dir) + + def tearDown(self): + shutil.rmtree(self.temp_dir) def check(self, p): - return self.fs.exists(p) + return os.path.exists(os.path.join(self.temp_dir, makerelative(p))) + + + +class TestSubFS(BaseFSTestCase): + + def setUp(self): + self.temp_dir = tempfile.mkdtemp("fstest") + self.parent_fs = osfs.OSFS(self.temp_dir) + self.parent_fs.makedir("foo/bar", recursive=True) + self.fs = self.parent_fs.opendir("foo/bar") + + def tearDown(self): + shutil.rmtree(self.temp_dir) + + def check(self, p): + p = os.path.join("foo/bar", makerelative(p)) + full_p = os.path.join(self.temp_dir, p) + return os.path.exists(full_p) + + +import memoryfs +class TestMemoryFS(BaseFSTestCase): + + def setUp(self): + self.fs = memoryfs.MemoryFS() import mountfs -class TestMountFS(TestOSFS): +class TestMountFS(BaseFSTestCase): def setUp(self): self.mount_fs = mountfs.MountFS() @@ -554,8 +636,9 @@ class TestMountFS(TestOSFS): def check(self, p): return self.mount_fs.exists(os.path.join("mounted/memfs", makerelative(p))) + import tempfs -class TestTempFS(TestOSFS): +class TestTempFS(BaseFSTestCase): def setUp(self): self.fs = tempfs.TempFS() @@ -569,6 +652,71 @@ class TestTempFS(TestOSFS): td = self.fs._temp_dir return os.path.exists(os.path.join(td, makerelative(p))) + +import s3fs +class TestS3FS(BaseFSTestCase): + + bucket = "test-s3fs.rfk.id.au" + + def setUp(self): + self.fs = s3fs.S3FS(self.bucket,"/unittest/files") + self._clear() + + def _clear(self): + for (path,files) in self.fs.walk(search="depth"): + for fn in files: + self.fs.remove(pathjoin(path,fn)) + if path and path != "/": + self.fs.removedir(path) + + def tearDown(self): + self._clear() + for k in self.fs._s3bukt.list(): + self.fs._s3bukt.delete_key(k) + self.fs._s3conn.delete_bucket(self.bucket) + + +import rpcfs +import socket +import threading +class TestRPCFS(TestOSFS): + + def setUp(self): + self.port = 8000 + self.server = None + while not self.server: + try: + self.server = rpcfs.RPCFSServer(tempfs.TempFS(),("localhost",self.port),logRequests=False) + except socket.error, e: + if e.args[1] == "Address already in use": + self.port += 1 + else: + raise e + self.server_thread = threading.Thread(target=self._run_server) + self.server_thread.start() + self.fs = rpcfs.RPCFS("http://localhost:" + str(self.port)) + + def _run_server(self): + """Run the server, swallowing shutdown-related execptions.""" + try: + self.server.serve_forever() + except: + pass + + def tearDown(self): + try: + # Shut the server down. We send one final request to + # bump the socket and make it recognise the shutdown. + self.server.serve_more_requests = False + self.server.server_close() + self.fs.exists("/") + except Exception: + pass + + +#################### + + import zipfs import random import zipfile @@ -640,6 +788,7 @@ class TestReadZipFS(unittest.TestCase): check_listing('foo', ['second.txt', 'bar']) check_listing('foo/bar', ['baz.txt']) + class TestWriteZipFS(unittest.TestCase): def setUp(self): @@ -708,99 +857,8 @@ class TestAppendZipFS(TestWriteZipFS): zip_fs.close() -import s3fs -class TestS3FS(TestOSFS): - - bucket = "test-s3fs.rfk.id.au" - - def setUp(self): - self.fs = s3fs.S3FS(self.bucket,"/unittest/files") - self._clear() - - def _clear(self): - for (path,files) in self.fs.walk(search="depth"): - for fn in files: - self.fs.remove(pathjoin(path,fn)) - if path and path != "/": - self.fs.removedir(path) - - def tearDown(self): - self._clear() - for k in self.fs._s3bukt.list(): - self.fs._s3bukt.delete_key(k) - self.fs._s3conn.delete_bucket(self.bucket) - - def check(self, p): - return self.fs.exists(p) - - def test_with_statement(self): - import sys - if sys.version_info[0] >= 2 and sys.version_info[1] >= 5: - # A successful 'with' statement - contents = "testing the with statement" - code = "from __future__ import with_statement\n" - code += "with self.fs.open('f.txt','w-') as testfile:\n" - code += " testfile.write(contents)\n" - code += "self.assertEquals(self.fs.getcontents('f.txt'),contents)" - code = compile(code,"",'exec') - eval(code) - # A 'with' statement raising an error - contents = "testing the with statement" - code = "from __future__ import with_statement\n" - code += "with self.fs.open('f.txt','w-') as testfile:\n" - code += " testfile.write(contents)\n" - code += " raise ValueError\n" - code = compile(code,"",'exec') - self.assertRaises(ValueError,eval,code,globals(),locals()) - self.assertEquals(self.fs.getcontents('f.txt'),contents) - - - -import rpcfs -import socket -import threading -import time -class TestRPCFS(TestOSFS): - - def setUp(self): - self.port = 8000 - self.server = None - while not self.server: - try: - self.server = rpcfs.RPCFSServer(tempfs.TempFS(),("localhost",self.port),logRequests=False) - except socket.error, e: - if e.args[1] == "Address already in use": - self.port += 1 - else: - raise e - self.server_thread = threading.Thread(target=self._run_server) - self.server_thread.start() - self.fs = rpcfs.RPCFS("http://localhost:" + str(self.port)) - - def _run_server(self): - """Run the server, swallowing shutdown-related execptions.""" - try: - self.server.serve_forever() - except: - pass - - def tearDown(self): - try: - # Shut the server down. We send one final request to - # bump the socket and make it recognise the shutdown. - self.server.serve_more_requests = False - self.server.server_close() - self.fs.exists("/") - except Exception: - pass - - def check(self, p): - return self.fs.exists(p) - if __name__ == "__main__": - #t = TestFS() - #t.setUp() - #t.tearDown() import nose nose.main() + -- cgit v1.2.1