summaryrefslogtreecommitdiff
path: root/fs/tests/__init__.py
diff options
context:
space:
mode:
Diffstat (limited to 'fs/tests/__init__.py')
-rw-r--r--fs/tests/__init__.py465
1 files changed, 263 insertions, 202 deletions
diff --git a/fs/tests/__init__.py b/fs/tests/__init__.py
index 73a6795..1988f64 100644
--- a/fs/tests/__init__.py
+++ b/fs/tests/__init__.py
@@ -11,7 +11,7 @@ from __future__ import with_statement
# be captured by nose and reported appropriately
import sys
import logging
-#logging.basicConfig(level=logging.ERROR, stream=sys.stdout)
+logging.basicConfig(level=logging.ERROR, stream=sys.stdout)
from fs.base import *
from fs.path import *
@@ -20,7 +20,8 @@ from fs.filelike import StringIO
import datetime
import unittest
-import os, os.path
+import os
+import os.path
import pickle
import random
import copy
@@ -34,6 +35,7 @@ except ImportError:
import six
from six import PY3, b
+
class FSTestCases(object):
"""Base suite of testcases for filesystem implementations.
@@ -80,7 +82,6 @@ class FSTestCases(object):
except NoMetaError:
self.assertFalse(self.fs.hasmeta(meta_name))
-
def test_root_dir(self):
self.assertTrue(self.fs.isdir(""))
self.assertTrue(self.fs.isdir("/"))
@@ -94,10 +95,10 @@ class FSTestCases(object):
except NoSysPathError:
pass
else:
- self.assertTrue(isinstance(syspath,unicode))
- syspath = self.fs.getsyspath("/",allow_none=True)
+ self.assertTrue(isinstance(syspath, unicode))
+ syspath = self.fs.getsyspath("/", allow_none=True)
if syspath is not None:
- self.assertTrue(isinstance(syspath,unicode))
+ self.assertTrue(isinstance(syspath, unicode))
def test_debug(self):
str(self.fs)
@@ -119,49 +120,54 @@ class FSTestCases(object):
assert False, "ResourceInvalidError was not raised"
def test_writefile(self):
- self.assertRaises(ResourceNotFoundError,self.fs.open,"test1.txt")
- f = self.fs.open("test1.txt","wb")
+ self.assertRaises(ResourceNotFoundError, self.fs.open, "test1.txt")
+ f = self.fs.open("test1.txt", "wb")
f.write(b("testing"))
f.close()
self.assertTrue(self.check("test1.txt"))
- f = self.fs.open("test1.txt","rb")
- self.assertEquals(f.read(),b("testing"))
+ f = self.fs.open("test1.txt", "rb")
+ self.assertEquals(f.read(), b("testing"))
f.close()
- f = self.fs.open("test1.txt","wb")
+ f = self.fs.open("test1.txt", "wb")
f.write(b("test file overwrite"))
f.close()
self.assertTrue(self.check("test1.txt"))
- f = self.fs.open("test1.txt","rb")
- self.assertEquals(f.read(),b("test file overwrite"))
+ f = self.fs.open("test1.txt", "rb")
+ self.assertEquals(f.read(), b("test file overwrite"))
f.close()
def test_setcontents(self):
# setcontents() should accept both a string...
- self.fs.setcontents("hello",b("world"))
- self.assertEquals(self.fs.getcontents("hello", "rb"),b("world"))
+ self.fs.setcontents("hello", b("world"))
+ self.assertEquals(self.fs.getcontents("hello", "rb"), b("world"))
# ...and a file-like object
- self.fs.setcontents("hello",StringIO(b("to you, good sir!")))
- self.assertEquals(self.fs.getcontents("hello", "rb"),b("to you, good sir!"))
+ self.fs.setcontents("hello", StringIO(b("to you, good sir!")))
+ self.assertEquals(self.fs.getcontents(
+ "hello", "rb"), b("to you, good sir!"))
# setcontents() should accept both a string...
- self.fs.setcontents("hello",b("world"), chunk_size=2)
- self.assertEquals(self.fs.getcontents("hello", "rb"),b("world"))
+ self.fs.setcontents("hello", b("world"), chunk_size=2)
+ self.assertEquals(self.fs.getcontents("hello", "rb"), b("world"))
# ...and a file-like object
- self.fs.setcontents("hello",StringIO(b("to you, good sir!")), chunk_size=2)
- self.assertEquals(self.fs.getcontents("hello", "rb"),b("to you, good sir!"))
-
+ self.fs.setcontents("hello", StringIO(
+ b("to you, good sir!")), chunk_size=2)
+ self.assertEquals(self.fs.getcontents(
+ "hello", "rb"), b("to you, good sir!"))
def test_setcontents_async(self):
# setcontents() should accept both a string...
self.fs.setcontents_async("hello", b("world")).wait()
self.assertEquals(self.fs.getcontents("hello", "rb"), b("world"))
# ...and a file-like object
- self.fs.setcontents_async("hello",StringIO(b("to you, good sir!"))).wait()
+ self.fs.setcontents_async("hello", StringIO(
+ b("to you, good sir!"))).wait()
self.assertEquals(self.fs.getcontents("hello"), b("to you, good sir!"))
self.fs.setcontents_async("hello", b("world"), chunk_size=2).wait()
self.assertEquals(self.fs.getcontents("hello", "rb"), b("world"))
# ...and a file-like object
- self.fs.setcontents_async("hello", StringIO(b("to you, good sir!")), chunk_size=2).wait()
- self.assertEquals(self.fs.getcontents("hello", "rb"), b("to you, good sir!"))
+ self.fs.setcontents_async("hello", StringIO(
+ b("to you, good sir!")), chunk_size=2).wait()
+ self.assertEquals(self.fs.getcontents(
+ "hello", "rb"), b("to you, good sir!"))
def test_isdir_isfile(self):
self.assertFalse(self.fs.exists("dir1"))
@@ -182,7 +188,7 @@ class FSTestCases(object):
def test_listdir(self):
def check_unicode(items):
for item in items:
- self.assertTrue(isinstance(item,unicode))
+ self.assertTrue(isinstance(item, unicode))
self.fs.setcontents(u"a", b(''))
self.fs.setcontents("b", b(''))
self.fs.setcontents("foo", b(''))
@@ -206,7 +212,7 @@ class FSTestCases(object):
check_unicode(d2)
# Create some deeper subdirectories, to make sure their
# contents are not inadvertantly included
- self.fs.makedir("p/1/2/3",recursive=True)
+ self.fs.makedir("p/1/2/3", recursive=True)
self.fs.setcontents("p/1/2/3/a", b(''))
self.fs.setcontents("p/1/2/3/b", b(''))
self.fs.setcontents("p/1/2/3/foo", b(''))
@@ -218,7 +224,7 @@ class FSTestCases(object):
contains_a = self.fs.listdir(wildcard="*a*")
self.assertEqual(sorted(dirs_only), [u"p", u"q"])
self.assertEqual(sorted(files_only), [u"a", u"b", u"bar", u"foo"])
- self.assertEqual(sorted(contains_a), [u"a",u"bar"])
+ self.assertEqual(sorted(contains_a), [u"a", u"bar"])
check_unicode(dirs_only)
check_unicode(files_only)
check_unicode(contains_a)
@@ -237,16 +243,17 @@ class FSTestCases(object):
self.assertEqual(sorted(d4), [u"p/1/2/3/a", u"p/1/2/3/b", u"p/1/2/3/bar", u"p/1/2/3/foo"])
check_unicode(d4)
# Test that appropriate errors are raised
- self.assertRaises(ResourceNotFoundError,self.fs.listdir,"zebra")
- self.assertRaises(ResourceInvalidError,self.fs.listdir,"foo")
+ self.assertRaises(ResourceNotFoundError, self.fs.listdir, "zebra")
+ self.assertRaises(ResourceInvalidError, self.fs.listdir, "foo")
def test_listdirinfo(self):
def check_unicode(items):
- for (nm,info) in items:
- self.assertTrue(isinstance(nm,unicode))
- def check_equal(items,target):
- names = [nm for (nm,info) in items]
- self.assertEqual(sorted(names),sorted(target))
+ for (nm, info) in items:
+ self.assertTrue(isinstance(nm, unicode))
+
+ def check_equal(items, target):
+ names = [nm for (nm, info) in items]
+ self.assertEqual(sorted(names), sorted(target))
self.fs.setcontents(u"a", b(''))
self.fs.setcontents("b", b(''))
self.fs.setcontents("foo", b(''))
@@ -271,7 +278,7 @@ class FSTestCases(object):
check_unicode(d2)
# Create some deeper subdirectories, to make sure their
# contents are not inadvertantly included
- self.fs.makedir("p/1/2/3",recursive=True)
+ self.fs.makedir("p/1/2/3", recursive=True)
self.fs.setcontents("p/1/2/3/a", b(''))
self.fs.setcontents("p/1/2/3/b", b(''))
self.fs.setcontents("p/1/2/3/foo", b(''))
@@ -283,7 +290,7 @@ class FSTestCases(object):
contains_a = self.fs.listdirinfo(wildcard="*a*")
check_equal(dirs_only, [u"p", u"q"])
check_equal(files_only, [u"a", u"b", u"bar", u"foo"])
- check_equal(contains_a, [u"a",u"bar"])
+ check_equal(contains_a, [u"a", u"bar"])
check_unicode(dirs_only)
check_unicode(files_only)
check_unicode(contains_a)
@@ -302,20 +309,20 @@ class FSTestCases(object):
check_equal(d4, [u"p/1/2/3/a", u"p/1/2/3/b", u"p/1/2/3/bar", u"p/1/2/3/foo"])
check_unicode(d4)
# Test that appropriate errors are raised
- self.assertRaises(ResourceNotFoundError,self.fs.listdirinfo,"zebra")
- self.assertRaises(ResourceInvalidError,self.fs.listdirinfo,"foo")
+ self.assertRaises(ResourceNotFoundError, self.fs.listdirinfo, "zebra")
+ self.assertRaises(ResourceInvalidError, self.fs.listdirinfo, "foo")
def test_walk(self):
self.fs.setcontents('a.txt', b('hello'))
self.fs.setcontents('b.txt', b('world'))
self.fs.makeopendir('foo').setcontents('c', b('123'))
- sorted_walk = sorted([(d,sorted(fs)) for (d,fs) in self.fs.walk()])
+ sorted_walk = sorted([(d, sorted(fs)) for (d, fs) in self.fs.walk()])
self.assertEquals(sorted_walk,
- [("/",["a.txt","b.txt"]),
- ("/foo",["c"])])
+ [("/", ["a.txt", "b.txt"]),
+ ("/foo", ["c"])])
# When searching breadth-first, shallow entries come first
found_a = False
- for _,files in self.fs.walk(search="breadth"):
+ for _, files in self.fs.walk(search="breadth"):
if "a.txt" in files:
found_a = True
if "c" in files:
@@ -323,12 +330,13 @@ class FSTestCases(object):
assert found_a, "breadth search order was wrong"
# When searching depth-first, deep entries come first
found_c = False
- for _,files in self.fs.walk(search="depth"):
+ for _, files in self.fs.walk(search="depth"):
if "c" in files:
found_c = True
if "a.txt" in files:
break
- assert found_c, "depth search order was wrong: " + str(list(self.fs.walk(search="depth")))
+ assert found_c, "depth search order was wrong: " + \
+ str(list(self.fs.walk(search="depth")))
def test_walk_wildcard(self):
self.fs.setcontents('a.txt', b('hello'))
@@ -338,7 +346,7 @@ class FSTestCases(object):
for dir_path, paths in self.fs.walk(wildcard='*.txt'):
for path in paths:
self.assert_(path.endswith('.txt'))
- for dir_path, paths in self.fs.walk(wildcard=lambda fn:fn.endswith('.txt')):
+ for dir_path, paths in self.fs.walk(wildcard=lambda fn: fn.endswith('.txt')):
for path in paths:
self.assert_(path.endswith('.txt'))
@@ -347,22 +355,28 @@ class FSTestCases(object):
self.fs.setcontents('b.txt', b('world'))
self.fs.makeopendir('foo').setcontents('c', b('123'))
self.fs.makeopendir('.svn').setcontents('ignored', b(''))
- for dir_path, paths in self.fs.walk(dir_wildcard=lambda fn:not fn.endswith('.svn')):
+ for dir_path, paths in self.fs.walk(dir_wildcard=lambda fn: not fn.endswith('.svn')):
for path in paths:
self.assert_('.svn' not in path)
def test_walkfiles(self):
self.fs.makeopendir('bar').setcontents('a.txt', b('123'))
self.fs.makeopendir('foo').setcontents('b', b('123'))
- self.assertEquals(sorted(self.fs.walkfiles()),["/bar/a.txt","/foo/b"])
- self.assertEquals(sorted(self.fs.walkfiles(dir_wildcard="*foo*")),["/foo/b"])
- self.assertEquals(sorted(self.fs.walkfiles(wildcard="*.txt")),["/bar/a.txt"])
+ self.assertEquals(sorted(
+ self.fs.walkfiles()), ["/bar/a.txt", "/foo/b"])
+ self.assertEquals(sorted(self.fs.walkfiles(
+ dir_wildcard="*foo*")), ["/foo/b"])
+ self.assertEquals(sorted(self.fs.walkfiles(
+ wildcard="*.txt")), ["/bar/a.txt"])
def test_walkdirs(self):
self.fs.makeopendir('bar').setcontents('a.txt', b('123'))
- self.fs.makeopendir('foo').makeopendir("baz").setcontents('b', b('123'))
- self.assertEquals(sorted(self.fs.walkdirs()),["/","/bar","/foo","/foo/baz"])
- self.assertEquals(sorted(self.fs.walkdirs(wildcard="*foo*")),["/","/foo","/foo/baz"])
+ self.fs.makeopendir('foo').makeopendir(
+ "baz").setcontents('b', b('123'))
+ self.assertEquals(sorted(self.fs.walkdirs()), [
+ "/", "/bar", "/foo", "/foo/baz"])
+ self.assertEquals(sorted(self.fs.walkdirs(
+ wildcard="*foo*")), ["/", "/foo", "/foo/baz"])
def test_unicode(self):
alpha = u"\N{GREEK SMALL LETTER ALPHA}"
@@ -371,32 +385,33 @@ class FSTestCases(object):
self.fs.setcontents(alpha+"/a", b(''))
self.fs.setcontents(alpha+"/"+beta, b(''))
self.assertTrue(self.check(alpha))
- self.assertEquals(sorted(self.fs.listdir(alpha)),["a",beta])
+ self.assertEquals(sorted(self.fs.listdir(alpha)), ["a", beta])
def test_makedir(self):
check = self.check
self.fs.makedir("a")
self.assertTrue(check("a"))
- self.assertRaises(ParentDirectoryMissingError,self.fs.makedir,"a/b/c")
+ self.assertRaises(
+ ParentDirectoryMissingError, self.fs.makedir, "a/b/c")
self.fs.makedir("a/b/c", recursive=True)
self.assert_(check("a/b/c"))
self.fs.makedir("foo/bar/baz", recursive=True)
self.assert_(check("foo/bar/baz"))
self.fs.makedir("a/b/child")
self.assert_(check("a/b/child"))
- self.assertRaises(DestinationExistsError,self.fs.makedir,"/a/b")
- self.fs.makedir("/a/b",allow_recreate=True)
+ self.assertRaises(DestinationExistsError, self.fs.makedir, "/a/b")
+ self.fs.makedir("/a/b", allow_recreate=True)
self.fs.setcontents("/a/file", b(''))
- self.assertRaises(ResourceInvalidError,self.fs.makedir,"a/file")
+ self.assertRaises(ResourceInvalidError, self.fs.makedir, "a/file")
def test_remove(self):
self.fs.setcontents("a.txt", b(''))
self.assertTrue(self.check("a.txt"))
self.fs.remove("a.txt")
self.assertFalse(self.check("a.txt"))
- self.assertRaises(ResourceNotFoundError,self.fs.remove,"a.txt")
+ self.assertRaises(ResourceNotFoundError, self.fs.remove, "a.txt")
self.fs.makedir("dir1")
- self.assertRaises(ResourceInvalidError,self.fs.remove,"dir1")
+ self.assertRaises(ResourceInvalidError, self.fs.remove, "dir1")
self.fs.setcontents("/dir1/a.txt", b(''))
self.assertTrue(self.check("dir1/a.txt"))
self.fs.remove("dir1/a.txt")
@@ -431,10 +446,11 @@ class FSTestCases(object):
self.assert_(check("foo/file.txt"))
# Ensure that force=True works as expected
self.fs.makedir("frollic/waggle", recursive=True)
- self.fs.setcontents("frollic/waddle.txt",b("waddlewaddlewaddle"))
- self.assertRaises(DirectoryNotEmptyError,self.fs.removedir,"frollic")
- self.assertRaises(ResourceInvalidError,self.fs.removedir,"frollic/waddle.txt")
- self.fs.removedir("frollic",force=True)
+ self.fs.setcontents("frollic/waddle.txt", b("waddlewaddlewaddle"))
+ self.assertRaises(DirectoryNotEmptyError, self.fs.removedir, "frollic")
+ self.assertRaises(
+ ResourceInvalidError, self.fs.removedir, "frollic/waddle.txt")
+ self.fs.removedir("frollic", force=True)
self.assert_(not check("frollic"))
# Test removing unicode dirs
kappa = u"\N{GREEK CAPITAL LETTER KAPPA}"
@@ -443,59 +459,64 @@ class FSTestCases(object):
self.fs.removedir(kappa)
self.assertRaises(ResourceNotFoundError, self.fs.removedir, kappa)
self.assert_(not self.fs.isdir(kappa))
- self.fs.makedir(pathjoin("test",kappa),recursive=True)
- self.assert_(check(pathjoin("test",kappa)))
- self.fs.removedir("test",force=True)
+ self.fs.makedir(pathjoin("test", kappa), recursive=True)
+ self.assert_(check(pathjoin("test", kappa)))
+ self.fs.removedir("test", force=True)
self.assert_(not check("test"))
def test_rename(self):
check = self.check
# test renaming a file in the same directory
- self.fs.setcontents("foo.txt",b("Hello, World!"))
+ self.fs.setcontents("foo.txt", b("Hello, World!"))
self.assert_(check("foo.txt"))
self.fs.rename("foo.txt", "bar.txt")
self.assert_(check("bar.txt"))
self.assert_(not check("foo.txt"))
# test renaming a directory in the same directory
self.fs.makedir("dir_a")
- self.fs.setcontents("dir_a/test.txt",b("testerific"))
+ self.fs.setcontents("dir_a/test.txt", b("testerific"))
self.assert_(check("dir_a"))
- self.fs.rename("dir_a","dir_b")
+ self.fs.rename("dir_a", "dir_b")
self.assert_(check("dir_b"))
self.assert_(check("dir_b/test.txt"))
self.assert_(not check("dir_a/test.txt"))
self.assert_(not check("dir_a"))
# test renaming a file into a different directory
self.fs.makedir("dir_a")
- self.fs.rename("dir_b/test.txt","dir_a/test.txt")
+ self.fs.rename("dir_b/test.txt", "dir_a/test.txt")
self.assert_(not check("dir_b/test.txt"))
self.assert_(check("dir_a/test.txt"))
# test renaming a file into a non-existent directory
- self.assertRaises(ParentDirectoryMissingError,self.fs.rename,"dir_a/test.txt","nonexistent/test.txt")
+ self.assertRaises(ParentDirectoryMissingError,
+ self.fs.rename, "dir_a/test.txt", "nonexistent/test.txt")
def test_info(self):
test_str = b("Hello, World!")
- self.fs.setcontents("info.txt",test_str)
+ self.fs.setcontents("info.txt", test_str)
info = self.fs.getinfo("info.txt")
self.assertEqual(info['size'], len(test_str))
self.fs.desc("info.txt")
- self.assertRaises(ResourceNotFoundError,self.fs.getinfo,"notafile")
- self.assertRaises(ResourceNotFoundError,self.fs.getinfo,"info.txt/inval")
+ self.assertRaises(ResourceNotFoundError, self.fs.getinfo, "notafile")
+ self.assertRaises(
+ ResourceNotFoundError, self.fs.getinfo, "info.txt/inval")
def test_getsize(self):
test_str = b("*")*23
- self.fs.setcontents("info.txt",test_str)
+ self.fs.setcontents("info.txt", test_str)
size = self.fs.getsize("info.txt")
self.assertEqual(size, len(test_str))
def test_movefile(self):
check = self.check
- contents = b("If the implementation is hard to explain, it's a bad idea.")
+ contents = b(
+ "If the implementation is hard to explain, it's a bad idea.")
+
def makefile(path):
- self.fs.setcontents(path,contents)
+ self.fs.setcontents(path, contents)
+
def checkcontents(path):
check_contents = self.fs.getcontents(path, "rb")
- self.assertEqual(check_contents,contents)
+ self.assertEqual(check_contents, contents)
return contents == check_contents
self.fs.makedir("foo/bar", recursive=True)
@@ -513,21 +534,23 @@ class FSTestCases(object):
self.assert_(checkcontents("/c.txt"))
makefile("foo/bar/a.txt")
- self.assertRaises(DestinationExistsError,self.fs.move,"foo/bar/a.txt","/c.txt")
+ self.assertRaises(
+ DestinationExistsError, self.fs.move, "foo/bar/a.txt", "/c.txt")
self.assert_(check("foo/bar/a.txt"))
self.assert_(check("/c.txt"))
- self.fs.move("foo/bar/a.txt","/c.txt",overwrite=True)
+ self.fs.move("foo/bar/a.txt", "/c.txt", overwrite=True)
self.assert_(not check("foo/bar/a.txt"))
self.assert_(check("/c.txt"))
-
def test_movedir(self):
check = self.check
- contents = b("If the implementation is hard to explain, it's a bad idea.")
+ contents = b(
+ "If the implementation is hard to explain, it's a bad idea.")
+
def makefile(path):
self.fs.setcontents(path, contents)
- self.assertRaises(ResourceNotFoundError,self.fs.movedir,"a","b")
+ self.assertRaises(ResourceNotFoundError, self.fs.movedir, "a", "b")
self.fs.makedir("a")
self.fs.makedir("b")
makefile("a/1.txt")
@@ -553,34 +576,37 @@ class FSTestCases(object):
self.assert_(not check("a"))
self.fs.makedir("a")
- self.assertRaises(DestinationExistsError,self.fs.movedir,"copy of a","a")
- self.fs.movedir("copy of a","a",overwrite=True)
+ self.assertRaises(
+ DestinationExistsError, self.fs.movedir, "copy of a", "a")
+ self.fs.movedir("copy of a", "a", overwrite=True)
self.assert_(not check("copy of a"))
self.assert_(check("a/1.txt"))
self.assert_(check("a/2.txt"))
self.assert_(check("a/3.txt"))
self.assert_(check("a/foo/bar/baz.txt"))
-
def test_cant_copy_from_os(self):
sys_executable = os.path.abspath(os.path.realpath(sys.executable))
- self.assertRaises(FSError,self.fs.copy,sys_executable,"py.exe")
+ self.assertRaises(FSError, self.fs.copy, sys_executable, "py.exe")
def test_copyfile(self):
check = self.check
- contents = b("If the implementation is hard to explain, it's a bad idea.")
- def makefile(path,contents=contents):
- self.fs.setcontents(path,contents)
- def checkcontents(path,contents=contents):
+ contents = b(
+ "If the implementation is hard to explain, it's a bad idea.")
+
+ def makefile(path, contents=contents):
+ self.fs.setcontents(path, contents)
+
+ def checkcontents(path, contents=contents):
check_contents = self.fs.getcontents(path, "rb")
- self.assertEqual(check_contents,contents)
+ self.assertEqual(check_contents, contents)
return contents == check_contents
self.fs.makedir("foo/bar", recursive=True)
makefile("foo/bar/a.txt")
self.assert_(check("foo/bar/a.txt"))
self.assert_(checkcontents("foo/bar/a.txt"))
- #import rpdb2; rpdb2.start_embedded_debugger('password');
+ # import rpdb2; rpdb2.start_embedded_debugger('password');
self.fs.copy("foo/bar/a.txt", "foo/b.txt")
self.assert_(check("foo/bar/a.txt"))
self.assert_(check("foo/b.txt"))
@@ -592,23 +618,26 @@ class FSTestCases(object):
self.assert_(check("/c.txt"))
self.assert_(checkcontents("/c.txt"))
- makefile("foo/bar/a.txt",b("different contents"))
- self.assert_(checkcontents("foo/bar/a.txt",b("different contents")))
- self.assertRaises(DestinationExistsError,self.fs.copy,"foo/bar/a.txt","/c.txt")
+ makefile("foo/bar/a.txt", b("different contents"))
+ self.assert_(checkcontents("foo/bar/a.txt", b("different contents")))
+ self.assertRaises(
+ DestinationExistsError, self.fs.copy, "foo/bar/a.txt", "/c.txt")
self.assert_(checkcontents("/c.txt"))
- self.fs.copy("foo/bar/a.txt","/c.txt",overwrite=True)
- self.assert_(checkcontents("foo/bar/a.txt",b("different contents")))
- self.assert_(checkcontents("/c.txt",b("different contents")))
-
+ self.fs.copy("foo/bar/a.txt", "/c.txt", overwrite=True)
+ self.assert_(checkcontents("foo/bar/a.txt", b("different contents")))
+ self.assert_(checkcontents("/c.txt", b("different contents")))
def test_copydir(self):
check = self.check
- contents = b("If the implementation is hard to explain, it's a bad idea.")
+ contents = b(
+ "If the implementation is hard to explain, it's a bad idea.")
+
def makefile(path):
- self.fs.setcontents(path,contents)
+ self.fs.setcontents(path, contents)
+
def checkcontents(path):
check_contents = self.fs.getcontents(path)
- self.assertEqual(check_contents,contents)
+ self.assertEqual(check_contents, contents)
return contents == check_contents
self.fs.makedir("a")
@@ -632,8 +661,8 @@ class FSTestCases(object):
self.assert_(check("a/foo/bar/baz.txt"))
checkcontents("a/1.txt")
- self.assertRaises(DestinationExistsError,self.fs.copydir,"a","b")
- self.fs.copydir("a","b",overwrite=True)
+ self.assertRaises(DestinationExistsError, self.fs.copydir, "a", "b")
+ self.fs.copydir("a", "b", overwrite=True)
self.assert_(check("b/1.txt"))
self.assert_(check("b/2.txt"))
self.assert_(check("b/3.txt"))
@@ -642,9 +671,11 @@ class FSTestCases(object):
def test_copydir_with_dotfile(self):
check = self.check
- contents = b("If the implementation is hard to explain, it's a bad idea.")
+ contents = b(
+ "If the implementation is hard to explain, it's a bad idea.")
+
def makefile(path):
- self.fs.setcontents(path,contents)
+ self.fs.setcontents(path, contents)
self.fs.makedir("a")
makefile("a/1.txt")
@@ -663,7 +694,7 @@ class FSTestCases(object):
def test_readwriteappendseek(self):
def checkcontents(path, check_contents):
read_contents = self.fs.getcontents(path, "rb")
- self.assertEqual(read_contents,check_contents)
+ self.assertEqual(read_contents, check_contents)
return read_contents == check_contents
test_strings = [b("Beautiful is better than ugly."),
b("Explicit is better than implicit."),
@@ -688,11 +719,11 @@ class FSTestCases(object):
self.assert_(checkcontents("b.txt", test_strings[0]))
f3 = self.fs.open("b.txt", "ab")
# On win32, tell() gives zero until you actually write to the file
- #self.assertEquals(f3.tell(),len(test_strings[0]))
+ # self.assertEquals(f3.tell(),len(test_strings[0]))
f3.write(test_strings[1])
- self.assertEquals(f3.tell(),len(test_strings[0])+len(test_strings[1]))
+ self.assertEquals(f3.tell(), len(test_strings[0])+len(test_strings[1]))
f3.write(test_strings[2])
- self.assertEquals(f3.tell(),len(all_strings))
+ self.assertEquals(f3.tell(), len(all_strings))
f3.close()
self.assert_(checkcontents("b.txt", all_strings))
f4 = self.fs.open("b.txt", "wb")
@@ -723,46 +754,45 @@ class FSTestCases(object):
def test_truncate(self):
def checkcontents(path, check_contents):
read_contents = self.fs.getcontents(path, "rb")
- self.assertEqual(read_contents,check_contents)
+ self.assertEqual(read_contents, check_contents)
return read_contents == check_contents
- self.fs.setcontents("hello",b("world"))
- checkcontents("hello",b("world"))
- self.fs.setcontents("hello",b("hi"))
- checkcontents("hello",b("hi"))
- self.fs.setcontents("hello",b("1234567890"))
- checkcontents("hello",b("1234567890"))
- with self.fs.open("hello","rb+") as f:
+ self.fs.setcontents("hello", b("world"))
+ checkcontents("hello", b("world"))
+ self.fs.setcontents("hello", b("hi"))
+ checkcontents("hello", b("hi"))
+ self.fs.setcontents("hello", b("1234567890"))
+ checkcontents("hello", b("1234567890"))
+ with self.fs.open("hello", "rb+") as f:
f.truncate(7)
- checkcontents("hello",b("1234567"))
- with self.fs.open("hello","rb+") as f:
+ checkcontents("hello", b("1234567"))
+ with self.fs.open("hello", "rb+") as f:
f.seek(5)
f.truncate()
- checkcontents("hello",b("12345"))
+ checkcontents("hello", b("12345"))
def test_truncate_to_larger_size(self):
- with self.fs.open("hello","wb") as f:
+ with self.fs.open("hello", "wb") as f:
f.truncate(30)
self.assertEquals(self.fs.getsize("hello"), 30)
# Some file systems (FTPFS) don't support both reading and writing
if self.fs.getmeta('file.read_and_write', True):
- with self.fs.open("hello","rb+") as f:
+ with self.fs.open("hello", "rb+") as f:
f.seek(25)
f.write(b("123456"))
- with self.fs.open("hello","rb") as f:
+ with self.fs.open("hello", "rb") as f:
f.seek(25)
- self.assertEquals(f.read(),b("123456"))
+ self.assertEquals(f.read(), b("123456"))
def test_write_past_end_of_file(self):
if self.fs.getmeta('file.read_and_write', True):
- with self.fs.open("write_at_end","wb") as f:
+ with self.fs.open("write_at_end", "wb") as f:
f.seek(25)
f.write(b("EOF"))
- with self.fs.open("write_at_end","rb") as f:
- self.assertEquals(f.read(),b("\x00")*25 + b("EOF"))
-
+ with self.fs.open("write_at_end", "rb") as f:
+ self.assertEquals(f.read(), b("\x00")*25 + b("EOF"))
def test_with_statement(self):
# This is a little tricky since 'with' is actually new syntax.
@@ -775,7 +805,7 @@ class FSTestCases(object):
code += "with self.fs.open('f.txt','wb-') as testfile:\n"
code += " testfile.write(contents)\n"
code += "self.assertEquals(self.fs.getcontents('f.txt', 'rb'),contents)"
- code = compile(code,"<string>",'exec')
+ code = compile(code, "<string>", 'exec')
eval(code)
# A 'with' statement raising an error
contents = "testing the with statement"
@@ -783,42 +813,43 @@ class FSTestCases(object):
code += "with self.fs.open('f.txt','wb-') as testfile:\n"
code += " testfile.write(contents)\n"
code += " raise ValueError\n"
- code = compile(code,"<string>",'exec')
- self.assertRaises(ValueError,eval,code,globals(),locals())
- self.assertEquals(self.fs.getcontents('f.txt', 'rb'),contents)
+ code = compile(code, "<string>", 'exec')
+ self.assertRaises(ValueError, eval, code, globals(), locals())
+ self.assertEquals(self.fs.getcontents('f.txt', 'rb'), contents)
def test_pickling(self):
if self.fs.getmeta('pickle_contents', True):
- self.fs.setcontents("test1",b("hello world"))
+ self.fs.setcontents("test1", b("hello world"))
fs2 = pickle.loads(pickle.dumps(self.fs))
self.assert_(fs2.isfile("test1"))
- fs3 = pickle.loads(pickle.dumps(self.fs,-1))
+ fs3 = pickle.loads(pickle.dumps(self.fs, -1))
self.assert_(fs3.isfile("test1"))
else:
# Just make sure it doesn't throw an exception
fs2 = pickle.loads(pickle.dumps(self.fs))
-
def test_big_file(self):
"""Test handling of a big file (1MB)"""
chunk_size = 1024 * 256
num_chunks = 4
+
def chunk_stream():
"""Generate predictable-but-randomy binary content."""
r = random.Random(0)
randint = r.randint
int2byte = six.int2byte
for _i in xrange(num_chunks):
- c = b("").join(int2byte(randint(0,255)) for _j in xrange(chunk_size//8))
+ c = b("").join(int2byte(randint(
+ 0, 255)) for _j in xrange(chunk_size//8))
yield c * 8
- f = self.fs.open("bigfile","wb")
+ f = self.fs.open("bigfile", "wb")
try:
for chunk in chunk_stream():
f.write(chunk)
finally:
f.close()
chunks = chunk_stream()
- f = self.fs.open("bigfile","rb")
+ f = self.fs.open("bigfile", "rb")
try:
try:
while True:
@@ -854,17 +885,19 @@ class FSTestCases(object):
self.assertRaises(RemoveRootError, self.fs.removedir, "/")
# May be disabled - see end of file
+
+
class ThreadingTestCases(object):
"""Testcases for thread-safety of FS implementations."""
# These are either too slow to be worth repeating,
# or cannot possibly break cross-thread.
- _dont_retest = ("test_pickling","test_multiple_overwrite",)
+ _dont_retest = ("test_pickling", "test_multiple_overwrite",)
__lock = threading.RLock()
def _yield(self):
- #time.sleep(0.001)
+ # time.sleep(0.001)
# Yields without a delay
time.sleep(0)
@@ -874,7 +907,7 @@ class ThreadingTestCases(object):
def _unlock(self):
self.__lock.release()
- def _makeThread(self,func,errors):
+ def _makeThread(self, func, errors):
def runThread():
try:
func()
@@ -884,74 +917,79 @@ class ThreadingTestCases(object):
thread.daemon = True
return thread
- def _runThreads(self,*funcs):
+ def _runThreads(self, *funcs):
check_interval = sys.getcheckinterval()
sys.setcheckinterval(1)
try:
errors = []
- threads = [self._makeThread(f,errors) for f in funcs]
+ threads = [self._makeThread(f, errors) for f in funcs]
for t in threads:
t.start()
for t in threads:
t.join()
- for (c,e,t) in errors:
- raise c,e,t
+ for (c, e, t) in errors:
+ raise e, None, t
finally:
sys.setcheckinterval(check_interval)
def test_setcontents_threaded(self):
- def setcontents(name,contents):
- f = self.fs.open(name,"wb")
+ def setcontents(name, contents):
+ f = self.fs.open(name, "wb")
self._yield()
try:
f.write(contents)
self._yield()
finally:
f.close()
+
def thread1():
c = b("thread1 was 'ere")
- setcontents("thread1.txt",c)
- self.assertEquals(self.fs.getcontents("thread1.txt", 'rb'),c)
+ setcontents("thread1.txt", c)
+ self.assertEquals(self.fs.getcontents("thread1.txt", 'rb'), c)
+
def thread2():
c = b("thread2 was 'ere")
- setcontents("thread2.txt",c)
- self.assertEquals(self.fs.getcontents("thread2.txt", 'rb'),c)
- self._runThreads(thread1,thread2)
+ setcontents("thread2.txt", c)
+ self.assertEquals(self.fs.getcontents("thread2.txt", 'rb'), c)
+ self._runThreads(thread1, thread2)
def test_setcontents_threaded_samefile(self):
- def setcontents(name,contents):
- f = self.fs.open(name,"wb")
+ def setcontents(name, contents):
+ f = self.fs.open(name, "wb")
self._yield()
try:
f.write(contents)
self._yield()
finally:
f.close()
+
def thread1():
c = b("thread1 was 'ere")
- setcontents("threads.txt",c)
+ setcontents("threads.txt", c)
self._yield()
- self.assertEquals(self.fs.listdir("/"),["threads.txt"])
+ self.assertEquals(self.fs.listdir("/"), ["threads.txt"])
+
def thread2():
c = b("thread2 was 'ere")
- setcontents("threads.txt",c)
+ setcontents("threads.txt", c)
self._yield()
- self.assertEquals(self.fs.listdir("/"),["threads.txt"])
+ self.assertEquals(self.fs.listdir("/"), ["threads.txt"])
+
def thread3():
c = b("thread3 was 'ere")
- setcontents("threads.txt",c)
+ setcontents("threads.txt", c)
self._yield()
- self.assertEquals(self.fs.listdir("/"),["threads.txt"])
+ self.assertEquals(self.fs.listdir("/"), ["threads.txt"])
try:
- self._runThreads(thread1,thread2,thread3)
+ self._runThreads(thread1, thread2, thread3)
except ResourceLockedError:
# that's ok, some implementations don't support concurrent writes
pass
def test_cases_in_separate_dirs(self):
- class TestCases_in_subdir(self.__class__,unittest.TestCase):
+ class TestCases_in_subdir(self.__class__, unittest.TestCase):
"""Run all testcases against a subdir of self.fs"""
- def __init__(this,subdir):
+ def __init__(this, subdir):
super(TestCases_in_subdir, this).__init__("test_listdir")
this.subdir = subdir
for meth in dir(this):
@@ -959,113 +997,136 @@ class ThreadingTestCases(object):
continue
if meth in self._dont_retest:
continue
- if not hasattr(FSTestCases,meth):
+ if not hasattr(FSTestCases, meth):
continue
if self.fs.exists(subdir):
- self.fs.removedir(subdir,force=True)
+ self.fs.removedir(subdir, force=True)
self.assertFalse(self.fs.isdir(subdir))
self.assertTrue(self.fs.isdir("/"))
self.fs.makedir(subdir)
self._yield()
- getattr(this,meth)()
+ getattr(this, meth)()
+
@property
def fs(this):
return self.fs.opendir(this.subdir)
- def check(this,p):
- return self.check(pathjoin(this.subdir,relpath(p)))
+
+ def check(this, p):
+ return self.check(pathjoin(this.subdir, relpath(p)))
+
def thread1():
TestCases_in_subdir("thread1")
+
def thread2():
TestCases_in_subdir("thread2")
+
def thread3():
TestCases_in_subdir("thread3")
- self._runThreads(thread1,thread2,thread3)
+ self._runThreads(thread1, thread2, thread3)
def test_makedir_winner(self):
errors = []
+
def makedir():
try:
self.fs.makedir("testdir")
except DestinationExistsError, e:
errors.append(e)
+
def makedir_noerror():
try:
- self.fs.makedir("testdir",allow_recreate=True)
+ self.fs.makedir("testdir", allow_recreate=True)
except DestinationExistsError, e:
errors.append(e)
+
def removedir():
try:
self.fs.removedir("testdir")
- except (ResourceNotFoundError,ResourceLockedError), e:
+ except (ResourceNotFoundError, ResourceLockedError), e:
errors.append(e)
# One thread should succeed, one should error
- self._runThreads(makedir,makedir)
- self.assertEquals(len(errors),1)
+ self._runThreads(makedir, makedir)
+ self.assertEquals(len(errors), 1)
self.fs.removedir("testdir")
# One thread should succeed, two should error
errors = []
- self._runThreads(makedir,makedir,makedir)
+ self._runThreads(makedir, makedir, makedir)
if len(errors) != 2:
raise AssertionError(errors)
self.fs.removedir("testdir")
# All threads should succeed
errors = []
- self._runThreads(makedir_noerror,makedir_noerror,makedir_noerror)
- self.assertEquals(len(errors),0)
+ self._runThreads(makedir_noerror, makedir_noerror, makedir_noerror)
+ self.assertEquals(len(errors), 0)
self.assertTrue(self.fs.isdir("testdir"))
self.fs.removedir("testdir")
# makedir() can beat removedir() and vice-versa
errors = []
- self._runThreads(makedir,removedir)
+ self._runThreads(makedir, removedir)
if self.fs.isdir("testdir"):
- self.assertEquals(len(errors),1)
- self.assertFalse(isinstance(errors[0],DestinationExistsError))
+ self.assertEquals(len(errors), 1)
+ self.assertFalse(isinstance(errors[0], DestinationExistsError))
self.fs.removedir("testdir")
else:
- self.assertEquals(len(errors),0)
+ self.assertEquals(len(errors), 0)
def test_concurrent_copydir(self):
self.fs.makedir("a")
self.fs.makedir("a/b")
- self.fs.setcontents("a/hello.txt",b("hello world"))
- self.fs.setcontents("a/guido.txt",b("is a space alien"))
- self.fs.setcontents("a/b/parrot.txt",b("pining for the fiords"))
+ self.fs.setcontents("a/hello.txt", b("hello world"))
+ self.fs.setcontents("a/guido.txt", b("is a space alien"))
+ self.fs.setcontents("a/b/parrot.txt", b("pining for the fiords"))
+
def copydir():
self._yield()
- self.fs.copydir("a","copy of a")
+ self.fs.copydir("a", "copy of a")
+
def copydir_overwrite():
self._yield()
- self.fs.copydir("a","copy of a",overwrite=True)
+ self.fs.copydir("a", "copy of a", overwrite=True)
# This should error out since we're not overwriting
- self.assertRaises(DestinationExistsError,self._runThreads,copydir,copydir)
+ self.assertRaises(
+ DestinationExistsError, self._runThreads, copydir, copydir)
+ self.assert_(self.fs.isdir('a'))
+ self.assert_(self.fs.isdir('a'))
+ copydir_overwrite()
+ self.assert_(self.fs.isdir('a'))
# This should run to completion and give a valid state, unless
# files get locked when written to.
try:
- self._runThreads(copydir_overwrite,copydir_overwrite)
+ self._runThreads(copydir_overwrite, copydir_overwrite)
except ResourceLockedError:
pass
self.assertTrue(self.fs.isdir("copy of a"))
self.assertTrue(self.fs.isdir("copy of a/b"))
- self.assertEqual(self.fs.getcontents("copy of a/b/parrot.txt", 'rb'),b("pining for the fiords"))
- self.assertEqual(self.fs.getcontents("copy of a/hello.txt", 'rb'),b("hello world"))
- self.assertEqual(self.fs.getcontents("copy of a/guido.txt", 'rb'),b("is a space alien"))
+ self.assertEqual(self.fs.getcontents(
+ "copy of a/b/parrot.txt", 'rb'), b("pining for the fiords"))
+ self.assertEqual(self.fs.getcontents(
+ "copy of a/hello.txt", 'rb'), b("hello world"))
+ self.assertEqual(self.fs.getcontents(
+ "copy of a/guido.txt", 'rb'), b("is a space alien"))
def test_multiple_overwrite(self):
- contents = [b("contents one"),b("contents the second"),b("number three")]
+ contents = [b("contents one"), b(
+ "contents the second"), b("number three")]
+
def thread1():
for i in xrange(30):
for c in contents:
- self.fs.setcontents("thread1.txt",c)
- self.assertEquals(self.fs.getsize("thread1.txt"),len(c))
- self.assertEquals(self.fs.getcontents("thread1.txt", 'rb'),c)
+ self.fs.setcontents("thread1.txt", c)
+ self.assertEquals(self.fs.getsize("thread1.txt"), len(c))
+ self.assertEquals(self.fs.getcontents(
+ "thread1.txt", 'rb'), c)
+
def thread2():
for i in xrange(30):
for c in contents:
- self.fs.setcontents("thread2.txt",c)
- self.assertEquals(self.fs.getsize("thread2.txt"),len(c))
- self.assertEquals(self.fs.getcontents("thread2.txt", 'rb'),c)
- self._runThreads(thread1,thread2)
+ self.fs.setcontents("thread2.txt", c)
+ self.assertEquals(self.fs.getsize("thread2.txt"), len(c))
+ self.assertEquals(self.fs.getcontents(
+ "thread2.txt", 'rb'), c)
+ self._runThreads(thread1, thread2)
# Uncomment to temporarily disable threading tests
-#class ThreadingTestCases(object):
+# class ThreadingTestCases(object):
# _dont_retest = ()