summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorrfkelly0 <rfkelly0@67cdc799-7952-0410-af00-57a81ceafa0f>2009-06-04 05:51:16 +0000
committerrfkelly0 <rfkelly0@67cdc799-7952-0410-af00-57a81ceafa0f>2009-06-04 05:51:16 +0000
commit9a23be35ebaac211da6a67e63a0fc46f142558b3 (patch)
treed3b6f4ec89f1ca9c365a6669ce9c3708b89d0f7b
parent00abdcdb43051f9628ef1e729dae82674141af8a (diff)
downloadpyfilesystem-9a23be35ebaac211da6a67e63a0fc46f142558b3.tar.gz
Rename 'helpers' to 'path'.
Split tests up into several smaller modules git-svn-id: http://pyfilesystem.googlecode.com/svn/branches/rfk-ideas@152 67cdc799-7952-0410-af00-57a81ceafa0f
-rw-r--r--fs/__init__.py4
-rw-r--r--fs/base.py12
-rw-r--r--fs/expose/sftp.py8
-rw-r--r--fs/memoryfs.py4
-rw-r--r--fs/mountfs.py4
-rw-r--r--fs/multifs.py5
-rw-r--r--fs/osfs.py6
-rw-r--r--fs/path.py (renamed from fs/helpers.py)125
-rw-r--r--fs/s3fs.py5
-rw-r--r--fs/sftpfs.py5
-rw-r--r--fs/tests.py997
-rw-r--r--fs/tests/__init__.py443
-rw-r--r--fs/tests/test_expose.py95
-rw-r--r--fs/tests/test_fs.py88
-rw-r--r--fs/tests/test_objecttree.py47
-rw-r--r--fs/tests/test_path.py98
-rw-r--r--fs/tests/test_s3fs.py41
-rw-r--r--fs/tests/test_xattr.py116
-rw-r--r--fs/tests/test_zipfs.py153
-rw-r--r--fs/wrappers/hidedotfiles.py4
-rw-r--r--fs/wrappers/xattr.py2
-rw-r--r--fs/zipfs.py3
22 files changed, 1167 insertions, 1098 deletions
diff --git a/fs/__init__.py b/fs/__init__.py
index 438c822..cd8a13b 100644
--- a/fs/__init__.py
+++ b/fs/__init__.py
@@ -6,9 +6,9 @@ A filesystem abstraction.
__version__ = "0.1.1dev"
__author__ = "Will McGugan (will@willmcgugan.com)"
-# 'base' imports * from 'helpers' and 'errors'
+# 'base' imports * from 'path' and 'errors'
from base import *
import errors
-import helpers
+import path
diff --git a/fs/base.py b/fs/base.py
index 5400cf9..8dfeb2d 100644
--- a/fs/base.py
+++ b/fs/base.py
@@ -20,8 +20,8 @@ except ImportError:
import dummy_threading as threading
import dummy_threading
-from helpers import *
-from errors import *
+from fs.path import *
+from fs.errors import *
def silence_fserrors(f, *args, **kwargs):
@@ -581,7 +581,7 @@ class FS(object):
self.makedir(dst, allow_recreate=True)
for dirname, filenames in self.walk(src, search="depth"):
- dst_dirname = makerelative(dirname[len(src):])
+ dst_dirname = relpath(dirname[len(src):])
dst_dirpath = pathjoin(dst, dst_dirname)
self.makedir(dst_dirpath, allow_recreate=True, recursive=True)
@@ -625,7 +625,7 @@ class FS(object):
self.makedir(dst, allow_recreate=True)
for dirname, filenames in self.walk(src):
- dst_dirname = makerelative(dirname[len(src):])
+ dst_dirname = relpath(dirname[len(src):])
dst_dirpath = pathjoin(dst, dst_dirname)
self.makedir(dst_dirpath, allow_recreate=True)
@@ -713,10 +713,10 @@ class SubFS(FS):
files_only)
if absolute:
listpath = normpath(path)
- paths = [makeabsolute(pathjoin(listpath, path)) for path in paths]
+ paths = [abspath(pathjoin(listpath, path)) for path in paths]
elif full:
listpath = normpath(path)
- paths = [makerelative(pathjoin(listpath, path)) for path in paths]
+ paths = [relpath(pathjoin(listpath, path)) for path in paths]
return paths
diff --git a/fs/expose/sftp.py b/fs/expose/sftp.py
index 9147f3a..0042ee8 100644
--- a/fs/expose/sftp.py
+++ b/fs/expose/sftp.py
@@ -31,8 +31,8 @@ from StringIO import StringIO
import paramiko
+from fs.path import *
from fs.errors import *
-from fs.helpers import *
try:
@@ -99,7 +99,7 @@ class SFTPServerInterface(paramiko.SFTPServerInterface):
def stat(self,path):
info = self.fs.getinfo(path)
stat = paramiko.SFTPAttributes()
- stat.filename = resourcename(path)
+ stat.filename = basename(path)
stat.st_size = info.get("size")
stat.st_atime = time.mktime(info.get("accessed_time").timetuple())
stat.st_mtime = time.mktime(info.get("modified_time").timetuple())
@@ -136,7 +136,7 @@ class SFTPServerInterface(paramiko.SFTPServerInterface):
return paramiko.SFTP_OK
def canonicalize(self,path):
- return makeabsolute(path)
+ return abspath(path)
def chattr(self,path,attr):
return paramiko.SFTP_OP_UNSUPPORTED
@@ -299,5 +299,5 @@ if __name__ == "__main__":
try:
server.serve_forever()
except (SystemExit,KeyboardInterrupt):
- server.shutdown()
+ server.server_close()
diff --git a/fs/memoryfs.py b/fs/memoryfs.py
index 54be786..2fc4a86 100644
--- a/fs/memoryfs.py
+++ b/fs/memoryfs.py
@@ -8,8 +8,8 @@ Obviously that makes this particular filesystem very fast...
"""
import datetime
-from helpers import iteratepath
-from base import *
+from fs.path import iteratepath
+from fs.base import *
try:
from cStringIO import StringIO
diff --git a/fs/mountfs.py b/fs/mountfs.py
index 742b4d7..777dfd3 100644
--- a/fs/mountfs.py
+++ b/fs/mountfs.py
@@ -128,9 +128,9 @@ class MountFS(FS):
files_only=files_only)
if full or absolute:
if full:
- path = makeabsolute(path)
+ path = abspath(path)
else:
- path = makerelative(path)
+ path = relpath(path)
paths = [pathjoin(path, p) for p in paths]
return paths
diff --git a/fs/multifs.py b/fs/multifs.py
index c295e61..837f667 100644
--- a/fs/multifs.py
+++ b/fs/multifs.py
@@ -1,7 +1,8 @@
#!/usr/in/env python
-from base import FS, FSError
-from helpers import *
+from fs.base import FS, FSError
+from fs.path import *
+
class MultiFS(FS):
diff --git a/fs/osfs.py b/fs/osfs.py
index 8fc1838..f99b8fc 100644
--- a/fs/osfs.py
+++ b/fs/osfs.py
@@ -2,8 +2,8 @@
import os
-from base import *
-from helpers import *
+from fs.base import *
+from fs.path import *
try:
import xattr
@@ -35,7 +35,7 @@ class OSFS(FS):
return "<OSFS: %s>" % self.root_path
def getsyspath(self, path, allow_none=False):
- sys_path = os.path.join(self.root_path, makerelative(normpath(path))).replace('/', os.sep)
+ sys_path = os.path.join(self.root_path, relpath(path)).replace('/', os.sep)
return sys_path
def open(self, path, mode="r", **kwargs):
diff --git a/fs/helpers.py b/fs/path.py
index f88b17e..b4659d5 100644
--- a/fs/helpers.py
+++ b/fs/path.py
@@ -1,21 +1,12 @@
"""
- fs.helpers: useful standalone functions for FS path manipulation.
+ fs.path: useful functions for FS path manipulation.
-"""
-
-from itertools import chain
+This is broadly similar to the standard 'os.path' module but works with
+paths in the canonical format expected by all FS objects (backslash-separated,
+optional leading slash).
-
-def iteratepath(path, numsplits=None):
- """Iterate over the individual components of a path."""
- path = makerelative(normpath(path))
- if not path:
- return []
- if numsplits == None:
- return path.split('/')
- else:
- return path.split('/', numsplits)
+"""
def normpath(path):
@@ -40,7 +31,6 @@ def normpath(path):
"""
if not path:
return path
-
components = []
for comp in path.replace('\\','/').split("/"):
if not comp or comp == ".":
@@ -53,33 +43,48 @@ def normpath(path):
raise ValueError(err)
else:
components.append(comp)
-
if path[0] in "\\/":
if not components:
components = [""]
components.insert(0,"")
-
return "/".join(components)
+def iteratepath(path, numsplits=None):
+ """Iterate over the individual components of a path."""
+ path = relpath(normpath(path))
+ if not path:
+ return []
+ if numsplits == None:
+ return path.split('/')
+ else:
+ return path.split('/', numsplits)
+
+
def abspath(path):
"""Convert the given path to a normalized, absolute path.
- path -- A FS path
+ Since FS objects have no concept of a 'current directory' this simply
+ adds a leading '/' character if the path doesn't already have one.
+
"""
path = normpath(path)
- if not path or path[0] != "/":
- path = "/" + path
+ if not path:
+ return "/"
+ if path[0] != "/":
+ return "/" + path
return path
def relpath(path):
"""Convert the given path to a normalized, relative path.
- path -- A FS path
+ This is the inverse of abspath(), stripping a leading '/' from the
+ path if it is present.
+
"""
path = normpath(path)
- if path and path[0] == "/":
+ while path and path[0] == "/":
path = path[1:]
return path
@@ -87,8 +92,6 @@ def relpath(path):
def pathjoin(*paths):
"""Joins any number of paths together, returning a new path string.
- paths -- An iterable of path strings
-
>>> pathjoin('foo', 'bar', 'baz')
'foo/bar/baz'
@@ -100,7 +103,6 @@ def pathjoin(*paths):
"""
absolute = False
-
relpaths = []
for p in paths:
if p:
@@ -114,12 +116,15 @@ def pathjoin(*paths):
path = "/" + path
return path
+# Allow pathjoin() to be used as fs.path.join()
+join = pathjoin
+
def pathsplit(path):
- """Splits a path on a path separator. Returns a tuple containing the path up
- to that last separator and the remaining path component.
+ """Splits a path into (head,tail) pair.
- path -- A FS path
+ This function splits a path into a pair (head,tail) where 'tail' is the
+ last pathname component and 'head' is all preceeding components.
>>> pathsplit("foo/bar")
('foo', 'bar')
@@ -133,11 +138,15 @@ def pathsplit(path):
return ('', split[0])
return tuple(split)
+# Allow pathsplit() to be used as fs.path.split()
+split = pathsplit
+
def dirname(path):
"""Returns the parent directory of a path.
- path -- A FS path
+ This is always equivalent to the 'head' component of the value returned
+ by pathsplit(path).
>>> dirname('foo/bar/baz')
'foo/bar'
@@ -146,64 +155,44 @@ def dirname(path):
return pathsplit(path)[0]
-def resourcename(path):
- """Returns the resource references by a path.
+def basename(path):
+ """Returns the basename of the resource referenced by a path.
- path -- A FS path
+ This is always equivalent to the 'head' component of the value returned
+ by pathsplit(path).
- >>> resourcename('foo/bar/baz')
+ >>> basename('foo/bar/baz')
'baz'
"""
return pathsplit(path)[1]
-def makerelative(path):
- """Makes a path relative by removing initial separator.
-
- path -- A FS path
-
- >>> makerelative("/foo/bar")
- 'foo/bar'
-
- """
- path = normpath(path)
- if path.startswith('/'):
- return path[1:]
- return path
-
-
-def makeabsolute(path):
- """Makes a path absolute by adding a separator at the start of the path.
-
- path -- A FS path
-
- >>> makeabsolute("foo/bar/baz")
- '/foo/bar/baz'
-
- """
- path = normpath(path)
- if not path.startswith('/'):
- return '/'+path
- return path
-
-
def issamedir(path1, path2):
"""Return true if two paths reference a resource in the same directory.
- path1 -- First path
- path2 -- Second path
-
>>> issamedir("foo/bar/baz.txt", "foo/bar/spam.txt")
True
>>> issamedir("foo/bar/baz/txt", "spam/eggs/spam.txt")
False
+
"""
return pathsplit(normpath(path1))[0] == pathsplit(normpath(path2))[0]
def isprefix(path1,path2):
- """Return true is path1 is a prefix of path2."""
+ """Return true is path1 is a prefix of path2.
+
+ >>> isprefix("foo/bar", "foo/bar/spam.txt")
+ True
+ >>> isprefix("foo/bar/", "foo/bar")
+ True
+ >>> isprefix("foo/barry", "foo/baz/bar")
+ False
+ >>> isprefix("foo/bar/baz/", "foo/baz/bar")
+ False
+
+ """
bits1 = path1.split("/")
bits2 = path2.split("/")
while bits1 and bits1[-1] == "":
@@ -214,6 +203,4 @@ def isprefix(path1,path2):
if bit1 != bit2:
return False
return True
-
-
diff --git a/fs/s3fs.py b/fs/s3fs.py
index 145a604..a1cc233 100644
--- a/fs/s3fs.py
+++ b/fs/s3fs.py
@@ -18,7 +18,6 @@ except ImportError:
from tempfile import NamedTemporaryFile as TempFile
from fs.base import *
-from fs.helpers import *
class S3FS(FS):
@@ -116,7 +115,7 @@ class S3FS(FS):
def _s3path(self,path):
"""Get the absolute path to a file stored in S3."""
- path = makerelative(path)
+ path = relpath(path)
path = self._separator.join(iteratepath(path))
s3path = self._prefix + path
if s3path[-1] == self._separator:
@@ -436,7 +435,7 @@ class S3FS(FS):
# Since S3 lists in lexicographic order, subsequent iterations
# of the loop will check for the existence of the new filename.
if k.name == s3path_dstD:
- nm = resourcename(src)
+ nm = basename(src)
dst = pathjoin(dirname(dst),nm)
s3path_dst = s3path_dstD + nm
dstOK = True
diff --git a/fs/sftpfs.py b/fs/sftpfs.py
index 72855af..6429791 100644
--- a/fs/sftpfs.py
+++ b/fs/sftpfs.py
@@ -10,7 +10,6 @@ import stat as statinfo
import paramiko
from fs.base import *
-from fs.helpers import *
if not hasattr(paramiko.SFTPFile,"__enter__"):
@@ -53,7 +52,7 @@ class SFTPFS(FS):
if not connection.is_authenticated():
connection.connect(**credentials)
self.client = paramiko.SFTPClient.from_transport(connection)
- self.root = makeabsolute(root)
+ self.root = abspath(root)
def __del__(self):
self.close()
@@ -84,7 +83,7 @@ class SFTPFS(FS):
self.client = None
def _normpath(self,path):
- npath = pathjoin(self.root,makerelative(path))
+ npath = pathjoin(self.root,relpath(path))
if not isprefix(self.root,npath):
raise PathError(path,msg="Path is outside root: %(path)s")
return npath
diff --git a/fs/tests.py b/fs/tests.py
deleted file mode 100644
index 2ceb42c..0000000
--- a/fs/tests.py
+++ /dev/null
@@ -1,997 +0,0 @@
-#!/usr/bin/env python
-"""
-
- fs.tests: testcases for the fs module
-
-"""
-
-import sys
-import logging
-logging.basicConfig(level=logging.ERROR,stream=sys.stdout)
-
-import unittest
-
-import shutil
-import tempfile
-import pickle
-import socket
-import threading
-
-import base as fs
-from helpers import *
-
-
-####################
-
-
-class FSTestCases:
- """Base suite of testcases for filesystem implementations.
-
- Any FS subclass should be capable of passing all of these tests.
- To apply the tests to your own FS implementation, simply subclass
- FSTestCase and have the setUp method set self.fs to an instance
- of your FS implementation.
-
- Note that you'll also need to subclass unittest.TestCase - this class
- is designed as a mixin so that it's not picked up by test tools such
- as nose.
- """
-
- def check(self, p):
- """Check that a file exists within self.fs"""
- return self.fs.exists(p)
-
- def test_root_dir(self):
- self.assertTrue(self.fs.isdir(""))
- self.assertTrue(self.fs.isdir("/"))
-
- def test_debug(self):
- str(self.fs)
- repr(self.fs)
- self.assert_(hasattr(self.fs, 'desc'))
-
- def test_writefile(self):
- self.assertRaises(fs.ResourceNotFoundError,self.fs.open,"test1.txt")
- f = self.fs.open("test1.txt","w")
- f.write("testing")
- f.close()
- f = self.fs.open("test1.txt","r")
- self.assertEquals(f.read(),"testing")
-
- def test_isdir_isfile(self):
- self.assertFalse(self.fs.exists("dir1"))
- self.assertFalse(self.fs.isdir("dir1"))
- self.assertFalse(self.fs.isfile("a.txt"))
- self.fs.createfile("a.txt")
- self.assertFalse(self.fs.isdir("dir1"))
- self.assertTrue(self.fs.exists("a.txt"))
- self.assertTrue(self.fs.isfile("a.txt"))
- self.fs.makedir("dir1")
- self.assertTrue(self.fs.isdir("dir1"))
- self.assertTrue(self.fs.exists("dir1"))
- self.assertTrue(self.fs.exists("a.txt"))
- self.fs.remove("a.txt")
- self.assertFalse(self.fs.exists("a.txt"))
-
- def test_listdir(self):
- self.fs.createfile("a")
- self.fs.createfile("b")
- self.fs.createfile("foo")
- self.fs.createfile("bar")
- # Test listing of the root directory
- d1 = self.fs.listdir()
- self.assertEqual(len(d1), 4)
- self.assertEqual(sorted(d1), ["a", "b", "bar", "foo"])
- d1 = self.fs.listdir("")
- self.assertEqual(len(d1), 4)
- self.assertEqual(sorted(d1), ["a", "b", "bar", "foo"])
- d1 = self.fs.listdir("/")
- self.assertEqual(len(d1), 4)
- # Test listing absolute paths
- d2 = self.fs.listdir(absolute=True)
- self.assertEqual(len(d2), 4)
- self.assertEqual(sorted(d2), ["/a", "/b", "/bar", "/foo"])
- # Create some deeper subdirectories, to make sure their
- # contents are not inadvertantly included
- self.fs.makedir("p/1/2/3",recursive=True)
- self.fs.createfile("p/1/2/3/a")
- self.fs.createfile("p/1/2/3/b")
- self.fs.createfile("p/1/2/3/foo")
- self.fs.createfile("p/1/2/3/bar")
- self.fs.makedir("q")
- # Test listing just files, just dirs, and wildcards
- dirs_only = self.fs.listdir(dirs_only=True)
- files_only = self.fs.listdir(files_only=True)
- contains_a = self.fs.listdir(wildcard="*a*")
- self.assertEqual(sorted(dirs_only), ["p", "q"])
- self.assertEqual(sorted(files_only), ["a", "b", "bar", "foo"])
- self.assertEqual(sorted(contains_a), ["a", "bar"])
- # Test listing a subdirectory
- d3 = self.fs.listdir("p/1/2/3")
- self.assertEqual(len(d3), 4)
- self.assertEqual(sorted(d3), ["a", "b", "bar", "foo"])
- # Test listing a subdirectory with absoliute and full paths
- d4 = self.fs.listdir("p/1/2/3", absolute=True)
- self.assertEqual(len(d4), 4)
- self.assertEqual(sorted(d4), ["/p/1/2/3/a", "/p/1/2/3/b", "/p/1/2/3/bar", "/p/1/2/3/foo"])
- d4 = self.fs.listdir("p/1/2/3", full=True)
- self.assertEqual(len(d4), 4)
- self.assertEqual(sorted(d4), ["p/1/2/3/a", "p/1/2/3/b", "p/1/2/3/bar", "p/1/2/3/foo"])
- # Test that appropriate errors are raised
- self.assertRaises(fs.ResourceNotFoundError,self.fs.listdir,"zebra")
- self.assertRaises(fs.ResourceInvalidError,self.fs.listdir,"foo")
-
- def test_makedir(self):
- check = self.check
- self.fs.makedir("a")
- self.assertTrue(check("a"))
- self.assertRaises(fs.ParentDirectoryMissingError,self.fs.makedir,"a/b/c")
- self.fs.makedir("a/b/c", recursive=True)
- self.assert_(check("a/b/c"))
- self.fs.makedir("foo/bar/baz", recursive=True)
- self.assert_(check("foo/bar/baz"))
- self.fs.makedir("a/b/child")
- self.assert_(check("a/b/child"))
- self.assertRaises(fs.DestinationExistsError,self.fs.makedir,"/a/b")
- self.fs.makedir("/a/b",allow_recreate=True)
- self.fs.createfile("/a/file")
- self.assertRaises(fs.ResourceInvalidError,self.fs.makedir,"a/file")
-
- def test_remove(self):
- self.fs.createfile("a.txt")
- self.assertTrue(self.check("a.txt"))
- self.fs.remove("a.txt")
- self.assertFalse(self.check("a.txt"))
- self.assertRaises(fs.ResourceNotFoundError,self.fs.remove,"a.txt")
- self.fs.makedir("dir1")
- self.assertRaises(fs.ResourceInvalidError,self.fs.remove,"dir1")
- self.fs.createfile("/dir1/a.txt")
- self.assertTrue(self.check("dir1/a.txt"))
- self.fs.remove("dir1/a.txt")
- self.assertFalse(self.check("/dir1/a.txt"))
-
- def test_removedir(self):
- check = self.check
- self.fs.makedir("a")
- self.assert_(check("a"))
- self.fs.removedir("a")
- self.assert_(not check("a"))
- self.fs.makedir("a/b/c/d", recursive=True)
- self.assertRaises(fs.DirectoryNotEmptyError, self.fs.removedir, "a/b")
- self.fs.removedir("a/b/c/d")
- self.assert_(not check("a/b/c/d"))
- self.fs.removedir("a/b/c")
- self.assert_(not check("a/b/c"))
- self.fs.removedir("a/b")
- self.assert_(not check("a/b"))
- # Test recursive removal of empty parent dirs
- self.fs.makedir("foo/bar/baz", recursive=True)
- self.fs.removedir("foo/bar/baz", recursive=True)
- self.assert_(not check("foo/bar/baz"))
- self.assert_(not check("foo/bar"))
- self.assert_(not check("foo"))
- # Ensure that force=True works as expected
- self.fs.makedir("frollic/waggle", recursive=True)
- self.fs.createfile("frollic/waddle.txt","waddlewaddlewaddle")
- self.assertRaises(fs.DirectoryNotEmptyError,self.fs.removedir,"frollic")
- self.assertRaises(fs.ResourceInvalidError,self.fs.removedir,"frollic/waddle.txt")
- self.fs.removedir("frollic",force=True)
- self.assert_(not check("frollic"))
-
- def test_rename(self):
- check = self.check
- self.fs.open("foo.txt", 'wt').write("Hello, World!")
- self.assert_(check("foo.txt"))
- self.fs.rename("foo.txt", "bar.txt")
- self.assert_(check("bar.txt"))
- self.assert_(not check("foo.txt"))
-
- def test_info(self):
- test_str = "Hello, World!"
- self.fs.createfile("info.txt",test_str)
- info = self.fs.getinfo("info.txt")
- self.assertEqual(info['size'], len(test_str))
- self.fs.desc("info.txt")
-
- def test_getsize(self):
- test_str = "*"*23
- self.fs.createfile("info.txt",test_str)
- size = self.fs.getsize("info.txt")
- self.assertEqual(size, len(test_str))
-
- def test_movefile(self):
- check = self.check
- contents = "If the implementation is hard to explain, it's a bad idea."
- def makefile(path):
- self.fs.createfile(path,contents)
- def checkcontents(path):
- check_contents = self.fs.getcontents(path)
- self.assertEqual(check_contents,contents)
- return contents == check_contents
-
- self.fs.makedir("foo/bar", recursive=True)
- makefile("foo/bar/a.txt")
- self.assert_(check("foo/bar/a.txt"))
- self.assert_(checkcontents("foo/bar/a.txt"))
- self.fs.move("foo/bar/a.txt", "foo/b.txt")
- self.assert_(not check("foo/bar/a.txt"))
- self.assert_(check("foo/b.txt"))
- self.assert_(checkcontents("foo/b.txt"))
-
- self.fs.move("foo/b.txt", "c.txt")
- self.assert_(not check("foo/b.txt"))
- self.assert_(check("/c.txt"))
- self.assert_(checkcontents("/c.txt"))
-
- makefile("foo/bar/a.txt")
- self.assertRaises(fs.DestinationExistsError,self.fs.move,"foo/bar/a.txt","/c.txt")
- self.assert_(check("foo/bar/a.txt"))
- self.assert_(check("/c.txt"))
- self.fs.move("foo/bar/a.txt","/c.txt",overwrite=True)
- self.assert_(not check("foo/bar/a.txt"))
- self.assert_(check("/c.txt"))
-
-
- def test_movedir(self):
- check = self.check
- contents = "If the implementation is hard to explain, it's a bad idea."
- def makefile(path):
- self.fs.createfile(path,contents)
-
- self.fs.makedir("a")
- self.fs.makedir("b")
- makefile("a/1.txt")
- makefile("a/2.txt")
- makefile("a/3.txt")
- self.fs.makedir("a/foo/bar", recursive=True)
- makefile("a/foo/bar/baz.txt")
-
- self.fs.movedir("a", "copy of a")
-
- self.assert_(check("copy of a/1.txt"))
- self.assert_(check("copy of a/2.txt"))
- self.assert_(check("copy of a/3.txt"))
- self.assert_(check("copy of a/foo/bar/baz.txt"))
-
- self.assert_(not check("a/1.txt"))
- self.assert_(not check("a/2.txt"))
- self.assert_(not check("a/3.txt"))
- self.assert_(not check("a/foo/bar/baz.txt"))
- self.assert_(not check("a/foo/bar"))
- self.assert_(not check("a/foo"))
- self.assert_(not check("a"))
-
- self.fs.makedir("a")
- print self.fs.listdir("a")
- self.assertRaises(fs.DestinationExistsError,self.fs.movedir,"copy of a","a")
- self.fs.movedir("copy of a","a",overwrite=True)
- self.assert_(not check("copy of a"))
- print self.fs.listdir("a")
- self.assert_(check("a/1.txt"))
- self.assert_(check("a/2.txt"))
- self.assert_(check("a/3.txt"))
- self.assert_(check("a/foo/bar/baz.txt"))
-
-
- def test_copyfile(self):
- check = self.check
- contents = "If the implementation is hard to explain, it's a bad idea."
- def makefile(path,contents=contents):
- self.fs.createfile(path,contents)
- def checkcontents(path,contents=contents):
- check_contents = self.fs.getcontents(path)
- self.assertEqual(check_contents,contents)
- return contents == check_contents
-
- self.fs.makedir("foo/bar", recursive=True)
- makefile("foo/bar/a.txt")
- self.assert_(check("foo/bar/a.txt"))
- self.assert_(checkcontents("foo/bar/a.txt"))
- self.fs.copy("foo/bar/a.txt", "foo/b.txt")
- self.assert_(check("foo/bar/a.txt"))
- self.assert_(check("foo/b.txt"))
- self.assert_(checkcontents("foo/b.txt"))
-
- self.fs.copy("foo/b.txt", "c.txt")
- self.assert_(check("foo/b.txt"))
- self.assert_(check("/c.txt"))
- self.assert_(checkcontents("/c.txt"))
-
- makefile("foo/bar/a.txt","different contents")
- self.assertRaises(fs.DestinationExistsError,self.fs.copy,"foo/bar/a.txt","/c.txt")
- self.assert_(checkcontents("/c.txt"))
- self.fs.copy("foo/bar/a.txt","/c.txt",overwrite=True)
- self.assert_(checkcontents("foo/bar/a.txt","different contents"))
- self.assert_(checkcontents("/c.txt","different contents"))
-
-
- def test_copydir(self):
- check = self.check
- contents = "If the implementation is hard to explain, it's a bad idea."
- def makefile(path):
- self.fs.createfile(path,contents)
- def checkcontents(path):
- check_contents = self.fs.getcontents(path)
- self.assertEqual(check_contents,contents)
- return contents == check_contents
-
- self.fs.makedir("a")
- self.fs.makedir("b")
- makefile("a/1.txt")
- makefile("a/2.txt")
- makefile("a/3.txt")
- self.fs.makedir("a/foo/bar", recursive=True)
- makefile("a/foo/bar/baz.txt")
-
- self.fs.copydir("a", "copy of a")
- self.assert_(check("copy of a/1.txt"))
- self.assert_(check("copy of a/2.txt"))
- self.assert_(check("copy of a/3.txt"))
- self.assert_(check("copy of a/foo/bar/baz.txt"))
- checkcontents("copy of a/1.txt")
-
- self.assert_(check("a/1.txt"))
- self.assert_(check("a/2.txt"))
- self.assert_(check("a/3.txt"))
- self.assert_(check("a/foo/bar/baz.txt"))
- checkcontents("a/1.txt")
-
- self.assertRaises(fs.DestinationExistsError,self.fs.copydir,"a","b")
- self.fs.copydir("a","b",overwrite=True)
- self.assert_(check("b/1.txt"))
- self.assert_(check("b/2.txt"))
- self.assert_(check("b/3.txt"))
- self.assert_(check("b/foo/bar/baz.txt"))
- checkcontents("b/1.txt")
-
- def test_copydir_with_dotfile(self):
- check = self.check
- contents = "If the implementation is hard to explain, it's a bad idea."
- def makefile(path):
- self.fs.createfile(path,contents)
-
- self.fs.makedir("a")
- makefile("a/1.txt")
- makefile("a/2.txt")
- makefile("a/.hidden.txt")
-
- self.fs.copydir("a", "copy of a")
- self.assert_(check("copy of a/1.txt"))
- self.assert_(check("copy of a/2.txt"))
- self.assert_(check("copy of a/.hidden.txt"))
-
- self.assert_(check("a/1.txt"))
- self.assert_(check("a/2.txt"))
- self.assert_(check("a/.hidden.txt"))
-
- def test_readwriteappendseek(self):
- def checkcontents(path, check_contents):
- read_contents = self.fs.getcontents(path)
- self.assertEqual(read_contents,check_contents)
- return read_contents == check_contents
- test_strings = ["Beautiful is better than ugly.",
- "Explicit is better than implicit.",
- "Simple is better than complex."]
- all_strings = "".join(test_strings)
-
- self.assertRaises(fs.ResourceNotFoundError, self.fs.open, "a.txt", "r")
- self.assert_(not self.fs.exists("a.txt"))
- f1 = self.fs.open("a.txt", "wb")
- pos = 0
- for s in test_strings:
- f1.write(s)
- pos += len(s)
- self.assertEqual(pos, f1.tell())
- f1.close()
- self.assert_(self.fs.exists("a.txt"))
- self.assert_(checkcontents("a.txt", all_strings))
-
- f2 = self.fs.open("b.txt", "wb")
- f2.write(test_strings[0])
- f2.close()
- self.assert_(checkcontents("b.txt", test_strings[0]))
- f3 = self.fs.open("b.txt", "ab")
- f3.write(test_strings[1])
- f3.write(test_strings[2])
- f3.close()
- self.assert_(checkcontents("b.txt", all_strings))
- f4 = self.fs.open("b.txt", "wb")
- f4.write(test_strings[2])
- f4.close()
- self.assert_(checkcontents("b.txt", test_strings[2]))
- f5 = self.fs.open("c.txt", "wb")
- for s in test_strings:
- f5.write(s+"\n")
- f5.close()
- f6 = self.fs.open("c.txt", "rb")
- for s, t in zip(f6, test_strings):
- self.assertEqual(s, t+"\n")
- f6.close()
- f7 = self.fs.open("c.txt", "rb")
- f7.seek(13)
- word = f7.read(6)
- self.assertEqual(word, "better")
- f7.seek(1, os.SEEK_CUR)
- word = f7.read(4)
- self.assertEqual(word, "than")
- f7.seek(-9, os.SEEK_END)
- word = f7.read(7)
- self.assertEqual(word, "complex")
- f7.close()
- self.assertEqual(self.fs.getcontents("a.txt"), all_strings)
-
- def test_with_statement(self):
- import sys
- if sys.version_info[0] >= 2 and sys.version_info[1] >= 5:
- # A successful 'with' statement
- contents = "testing the with statement"
- code = "from __future__ import with_statement\n"
- code += "with self.fs.open('f.txt','w-') as testfile:\n"
- code += " testfile.write(contents)\n"
- code += "self.assertEquals(self.fs.getcontents('f.txt'),contents)"
- code = compile(code,"<string>",'exec')
- eval(code)
- # A 'with' statement raising an error
- contents = "testing the with statement"
- code = "from __future__ import with_statement\n"
- code += "with self.fs.open('f.txt','w-') as testfile:\n"
- code += " testfile.write(contents)\n"
- code += " raise ValueError\n"
- code = compile(code,"<string>",'exec')
- self.assertRaises(ValueError,eval,code,globals(),locals())
- self.assertEquals(self.fs.getcontents('f.txt'),contents)
-
- def test_pickling(self):
- self.fs.createfile("test1","hello world")
- oldfs = self.fs
- self.fs = pickle.loads(pickle.dumps(self.fs))
- self.assert_(self.fs.isfile("test1"))
-
-
-
-class XAttrTestCases:
- """Testcases for filesystems providing extended attribute support."""
-
- def test_getsetdel(self):
- def do_getsetdel(p):
- self.assertEqual(self.fs.getxattr(p,"xattr1"),None)
- self.fs.setxattr(p,"xattr1","value1")
- self.assertEqual(self.fs.getxattr(p,"xattr1"),"value1")
- self.fs.delxattr(p,"xattr1")
- self.assertEqual(self.fs.getxattr(p,"xattr1"),None)
- self.fs.createfile("test.txt","hello")
- do_getsetdel("test.txt")
- self.assertRaises(fs.ResourceNotFoundError,self.fs.getxattr,"test2.txt","xattr1")
- self.fs.makedir("mystuff")
- self.fs.createfile("/mystuff/test.txt","")
- do_getsetdel("mystuff")
- do_getsetdel("mystuff/test.txt")
-
- def test_list_xattrs(self):
- def do_list(p):
- self.assertEquals(sorted(self.fs.xattrs(p)),[])
- self.fs.setxattr(p,"xattr1","value1")
- self.assertEquals(sorted(self.fs.xattrs(p)),["xattr1"])
- self.fs.setxattr(p,"attr2","value2")
- self.assertEquals(sorted(self.fs.xattrs(p)),["attr2","xattr1"])
- self.fs.delxattr(p,"xattr1")
- self.assertEquals(sorted(self.fs.xattrs(p)),["attr2"])
- self.fs.delxattr(p,"attr2")
- self.assertEquals(sorted(self.fs.xattrs(p)),[])
- self.fs.createfile("test.txt","hello")
- do_list("test.txt")
- self.fs.makedir("mystuff")
- self.fs.createfile("/mystuff/test.txt","")
- do_list("mystuff")
- do_list("mystuff/test.txt")
-
- def test_copy_xattrs(self):
- self.fs.createfile("a.txt","content")
- self.fs.setxattr("a.txt","myattr","myvalue")
- self.fs.setxattr("a.txt","testattr","testvalue")
- self.fs.makedir("stuff")
- self.fs.copy("a.txt","stuff/a.txt")
- self.assertTrue(self.fs.exists("stuff/a.txt"))
- self.assertEquals(self.fs.getxattr("stuff/a.txt","myattr"),"myvalue")
- self.assertEquals(self.fs.getxattr("stuff/a.txt","testattr"),"testvalue")
- self.assertEquals(self.fs.getxattr("a.txt","myattr"),"myvalue")
- self.assertEquals(self.fs.getxattr("a.txt","testattr"),"testvalue")
- self.fs.setxattr("stuff","dirattr","a directory")
- self.fs.copydir("stuff","stuff2")
- self.assertEquals(self.fs.getxattr("stuff2/a.txt","myattr"),"myvalue")
- self.assertEquals(self.fs.getxattr("stuff2/a.txt","testattr"),"testvalue")
- self.assertEquals(self.fs.getxattr("stuff2","dirattr"),"a directory")
- self.assertEquals(self.fs.getxattr("stuff","dirattr"),"a directory")
-
- def test_move_xattrs(self):
- self.fs.createfile("a.txt","content")
- self.fs.setxattr("a.txt","myattr","myvalue")
- self.fs.setxattr("a.txt","testattr","testvalue")
- self.fs.makedir("stuff")
- self.fs.move("a.txt","stuff/a.txt")
- self.assertTrue(self.fs.exists("stuff/a.txt"))
- self.assertEquals(self.fs.getxattr("stuff/a.txt","myattr"),"myvalue")
- self.assertEquals(self.fs.getxattr("stuff/a.txt","testattr"),"testvalue")
- self.fs.setxattr("stuff","dirattr","a directory")
- self.fs.movedir("stuff","stuff2")
- self.assertEquals(self.fs.getxattr("stuff2/a.txt","myattr"),"myvalue")
- self.assertEquals(self.fs.getxattr("stuff2/a.txt","testattr"),"testvalue")
- self.assertEquals(self.fs.getxattr("stuff2","dirattr"),"a directory")
-
-
-####################
-
-
-class TestHelpers(unittest.TestCase):
- """Testcases for FS path helpers."""
-
- def test_normpath(self):
- tests = [ ("\\a\\b\\c", "/a/b/c"),
- ("", ""),
- ("/a/b/c", "/a/b/c"),
- ("a/b/c", "a/b/c"),
- ("a/b/../c/", "a/c"),
- ("/","/"),
- ]
- for path, result in tests:
- self.assertEqual(normpath(path), result)
-
- def test_pathjoin(self):
- tests = [ ("", "a", "a"),
- ("a", "a", "a/a"),
- ("a/b", "../c", "a/c"),
- ("a/b/../c", "d", "a/c/d"),
- ("/a/b/c", "d", "/a/b/c/d"),
- ("/a/b/c", "../../../d", "/d"),
- ("a", "b", "c", "a/b/c"),
- ("a/b/c", "../d", "c", "a/b/d/c"),
- ("a/b/c", "../d", "/a", "/a"),
- ("aaa", "bbb/ccc", "aaa/bbb/ccc"),
- ("aaa", "bbb\ccc", "aaa/bbb/ccc"),
- ("aaa", "bbb", "ccc", "/aaa", "eee", "/aaa/eee"),
- ("a/b", "./d", "e", "a/b/d/e"),
- ("/", "/", "/"),
- ("/", "", "/"),
- ]
- for testpaths in tests:
- paths = testpaths[:-1]
- result = testpaths[-1]
- self.assertEqual(fs.pathjoin(*paths), result)
-
- self.assertRaises(ValueError, fs.pathjoin, "../")
- self.assertRaises(ValueError, fs.pathjoin, "./../")
- self.assertRaises(ValueError, fs.pathjoin, "a/b", "../../..")
- self.assertRaises(ValueError, fs.pathjoin, "a/b/../../../d")
-
- def test_makerelative(self):
- tests = [ ("/a/b", "a/b"),
- ("a/b", "a/b"),
- ("/", "") ]
-
- for path, result in tests:
- print path, result
- self.assertEqual(fs.makerelative(path), result)
-
- def test_makeabsolute(self):
- tests = [ ("/a/b", "/a/b"),
- ("a/b", "/a/b"),
- ("/", "/") ]
-
- for path, result in tests:
- self.assertEqual(fs.makeabsolute(path), result)
-
- def test_iteratepath(self):
- tests = [ ("a/b", ["a", "b"]),
- ("", [] ),
- ("aaa/bbb/ccc", ["aaa", "bbb", "ccc"]),
- ("a/b/c/../d", ["a", "b", "d"]) ]
-
- for path, results in tests:
- print repr(path), results
- for path_component, expected in zip(iteratepath(path), results):
- self.assertEqual(path_component, expected)
-
- self.assertEqual(list(iteratepath("a/b/c/d", 1)), ["a", "b/c/d"])
- self.assertEqual(list(iteratepath("a/b/c/d", 2)), ["a", "b", "c/d"])
-
- def test_pathsplit(self):
- tests = [ ("a/b", ("a", "b")),
- ("a/b/c", ("a/b", "c")),
- ("a", ("", "a")),
- ("", ("", "")),
- ("/", ("", "")),
- ("foo/bar", ("foo", "bar")),
- ("foo/bar/baz", ("foo/bar", "baz")),
- ]
- for path, result in tests:
- self.assertEqual(fs.pathsplit(path), result)
-
-
-####################
-
-
-import objecttree
-
-class TestObjectTree(unittest.TestCase):
- """Testcases for the ObjectTree class."""
-
- def test_getset(self):
- ot = objecttree.ObjectTree()
- ot['foo'] = "bar"
- self.assertEqual(ot['foo'], 'bar')
-
- ot = objecttree.ObjectTree()
- ot['foo/bar'] = "baz"
- self.assertEqual(ot['foo'], {'bar':'baz'})
- self.assertEqual(ot['foo/bar'], 'baz')
-
- del ot['foo/bar']
- self.assertEqual(ot['foo'], {})
-
- ot = objecttree.ObjectTree()
- ot['a/b/c'] = "A"
- ot['a/b/d'] = "B"
- ot['a/b/e'] = "C"
- ot['a/b/f'] = "D"
- self.assertEqual(sorted(ot['a/b'].values()), ['A', 'B', 'C', 'D'])
- self.assert_(ot.get('a/b/x', -1) == -1)
-
- self.assert_('a/b/c' in ot)
- self.assert_('a/b/x' not in ot)
- self.assert_(ot.isobject('a/b/c'))
- self.assert_(ot.isobject('a/b/d'))
- self.assert_(not ot.isobject('a/b'))
-
- left, object, right = ot.partialget('a/b/e/f/g')
- self.assertEqual(left, "a/b/e")
- self.assertEqual(object, "C")
- self.assertEqual(right, "f/g")
-
-
-####################
-
-
-import osfs
-import os
-
-class TestOSFS(unittest.TestCase,FSTestCases):
-
- def setUp(self):
- self.temp_dir = tempfile.mkdtemp("fstest")
- self.fs = osfs.OSFS(self.temp_dir)
-
- def tearDown(self):
- shutil.rmtree(self.temp_dir)
-
- def check(self, p):
- return os.path.exists(os.path.join(self.temp_dir, makerelative(p)))
-
-#try:
-# import xattr
-#except ImportError:
-# pass
-#else:
-# TestOSFS.__bases__ = TestOSFS.__bases__ + (XAttrTestCases,)
-
-
-
-class TestSubFS(unittest.TestCase,FSTestCases):
-
- def setUp(self):
- self.temp_dir = tempfile.mkdtemp("fstest")
- self.parent_fs = osfs.OSFS(self.temp_dir)
- self.parent_fs.makedir("foo/bar", recursive=True)
- self.fs = self.parent_fs.opendir("foo/bar")
-
- def tearDown(self):
- shutil.rmtree(self.temp_dir)
-
- def check(self, p):
- p = os.path.join("foo/bar", makerelative(p))
- full_p = os.path.join(self.temp_dir, p)
- return os.path.exists(full_p)
-
-
-import memoryfs
-class TestMemoryFS(unittest.TestCase,FSTestCases):
-
- def setUp(self):
- self.fs = memoryfs.MemoryFS()
-
-
-import mountfs
-class TestMountFS(unittest.TestCase,FSTestCases):
-
- def setUp(self):
- self.mount_fs = mountfs.MountFS()
- self.mem_fs = memoryfs.MemoryFS()
- self.mount_fs.mountdir("mounted/memfs", self.mem_fs)
- self.fs = self.mount_fs.opendir("mounted/memfs")
-
- def tearDown(self):
- pass
-
- def check(self, p):
- return self.mount_fs.exists(os.path.join("mounted/memfs", makerelative(p)))
-
-
-import tempfs
-class TestTempFS(unittest.TestCase,FSTestCases):
-
- def setUp(self):
- self.fs = tempfs.TempFS()
-
- def tearDown(self):
- td = self.fs._temp_dir
- self.fs.close()
- self.assert_(not os.path.exists(td))
-
- def check(self, p):
- td = self.fs._temp_dir
- return os.path.exists(os.path.join(td, makerelative(p)))
-
-
-import s3fs
-class TestS3FS(unittest.TestCase,FSTestCases):
-
- bucket = "test-s3fs.rfk.id.au"
-
- def setUp(self):
- import nose
- raise nose.SkipTest
- self.fs = s3fs.S3FS(self.bucket,"/unittest/files")
- self._clear()
-
- def _clear(self):
- for (path,files) in self.fs.walk(search="depth"):
- for fn in files:
- self.fs.remove(pathjoin(path,fn))
- if path and path != "/":
- self.fs.removedir(path)
-
- def tearDown(self):
- self._clear()
- for k in self.fs._s3bukt.list():
- self.fs._s3bukt.delete_key(k)
- self.fs._s3conn.delete_bucket(self.bucket)
-
-
-import rpcfs
-from fs.expose.xmlrpc import RPCFSServer
-class TestRPCFS(unittest.TestCase,FSTestCases):
-
- def setUp(self):
- self.port = 8000
- self.server = None
- while not self.server:
- try:
- self.server = RPCFSServer(tempfs.TempFS(),("localhost",self.port),logRequests=False)
- except socket.error, e:
- if e.args[1] == "Address already in use":
- self.port += 1
- else:
- raise e
- self.server_thread = threading.Thread(target=self._run_server)
- self.server_thread.start()
- self.fs = rpcfs.RPCFS("http://localhost:" + str(self.port))
-
- def _run_server(self):
- """Run the server, swallowing shutdown-related execptions."""
- try:
- self.server.serve_forever()
- except:
- pass
-
- def tearDown(self):
- try:
- # Shut the server down. We send one final request to
- # bump the socket and make it recognise the shutdown.
- self.server.serve_more_requests = False
- self.server.server_close()
- self.fs.exists("/")
- except Exception:
- pass
-
-
-import sftpfs
-from fs.expose.sftp import BaseSFTPServer
-class TestSFTPFS(unittest.TestCase,FSTestCases):
-
- def setUp(self):
- self.temp_fs = tempfs.TempFS()
- self.port = 8000
- self.server = None
- while not self.server:
- try:
- self.server = BaseSFTPServer(("localhost",self.port),self.temp_fs)
- except socket.error, e:
- if e.args[1] == "Address already in use":
- self.port += 1
- else:
- raise
- self.server_thread = threading.Thread(target=self._run_server)
- self.server_thread.start()
- self.fs = sftpfs.SFTPFS(("localhost",self.port))
-
- def _run_server(self):
- """Run the server, swallowing shutdown-related execptions."""
- try:
- self.server.serve_forever()
- except:
- pass
-
- def tearDown(self):
- try:
- self.server.shutdown()
- self.fs.exists("/")
- except Exception:
- pass
- self.temp_fs.close()
-
-
-####################
-
-from fs.wrappers.xattr import ensure_xattr
-class TestXAttr(unittest.TestCase,FSTestCases,XAttrTestCases):
-
- def setUp(self):
- self.fs = ensure_xattr(tempfs.TempFS())
-
- def tearDown(self):
- td = self.fs._temp_dir
- self.fs.close()
- self.assert_(not os.path.exists(td))
-
- def check(self, p):
- td = self.fs._temp_dir
- return os.path.exists(os.path.join(td, makerelative(p)))
-
-
-####################
-
-
-import zipfs
-import random
-import zipfile
-class TestReadZipFS(unittest.TestCase):
-
- def setUp(self):
- self.temp_filename = "".join(random.choice("abcdefghijklmnopqrstuvwxyz") for _ in range(6))+".zip"
- self.temp_filename = os.path.join(tempfile.gettempdir(), self.temp_filename)
-
- self.zf = zipfile.ZipFile(self.temp_filename, "w")
- zf = self.zf
- zf.writestr("a.txt", "Hello, World!")
- zf.writestr("b.txt", "b")
- zf.writestr("1.txt", "1")
- zf.writestr("foo/bar/baz.txt", "baz")
- zf.writestr("foo/second.txt", "hai")
- zf.close()
- self.fs = zipfs.ZipFS(self.temp_filename, "r")
-
- def tearDown(self):
- self.fs.close()
- os.remove(self.temp_filename)
-
- def check(self, p):
- try:
- self.zipfile.getinfo(p)
- return True
- except:
- return False
-
- def test_reads(self):
- def read_contents(path):
- f = self.fs.open(path)
- contents = f.read()
- return contents
- def check_contents(path, expected):
- self.assert_(read_contents(path)==expected)
- check_contents("a.txt", "Hello, World!")
- check_contents("1.txt", "1")
- check_contents("foo/bar/baz.txt", "baz")
-
- def test_getcontents(self):
- def read_contents(path):
- return self.fs.getcontents(path)
- def check_contents(path, expected):
- self.assert_(read_contents(path)==expected)
- check_contents("a.txt", "Hello, World!")
- check_contents("1.txt", "1")
- check_contents("foo/bar/baz.txt", "baz")
-
- def test_is(self):
- self.assert_(self.fs.isfile('a.txt'))
- self.assert_(self.fs.isfile('1.txt'))
- self.assert_(self.fs.isfile('foo/bar/baz.txt'))
- self.assert_(self.fs.isdir('foo'))
- self.assert_(self.fs.isdir('foo/bar'))
- self.assert_(self.fs.exists('a.txt'))
- self.assert_(self.fs.exists('1.txt'))
- self.assert_(self.fs.exists('foo/bar/baz.txt'))
- self.assert_(self.fs.exists('foo'))
- self.assert_(self.fs.exists('foo/bar'))
-
- def test_listdir(self):
-
- def check_listing(path, expected):
- dir_list = self.fs.listdir(path)
- self.assert_(sorted(dir_list) == sorted(expected))
- check_listing('/', ['a.txt', '1.txt', 'foo', 'b.txt'])
- check_listing('foo', ['second.txt', 'bar'])
- check_listing('foo/bar', ['baz.txt'])
-
-
-class TestWriteZipFS(unittest.TestCase):
-
- def setUp(self):
- self.temp_filename = "".join(random.choice("abcdefghijklmnopqrstuvwxyz") for _ in range(6))+".zip"
- self.temp_filename = os.path.join(tempfile.gettempdir(), self.temp_filename)
-
- zip_fs = zipfs.ZipFS(self.temp_filename, 'w')
-
- def makefile(filename, contents):
- if dirname(filename):
- zip_fs.makedir(dirname(filename), recursive=True, allow_recreate=True)
- f = zip_fs.open(filename, 'w')
- f.write(contents)
- f.close()
-
- makefile("a.txt", "Hello, World!")
- makefile("b.txt", "b")
- makefile("foo/bar/baz.txt", "baz")
- makefile("foo/second.txt", "hai")
-
- zip_fs.close()
-
- def tearDown(self):
- os.remove(self.temp_filename)
-
- def test_valid(self):
- zf = zipfile.ZipFile(self.temp_filename, "r")
- self.assert_(zf.testzip() is None)
- zf.close()
-
- def test_creation(self):
- zf = zipfile.ZipFile(self.temp_filename, "r")
- def check_contents(filename, contents):
- zcontents = zf.read(filename)
- self.assertEqual(contents, zcontents)
- check_contents("a.txt", "Hello, World!")
- check_contents("b.txt", "b")
- check_contents("foo/bar/baz.txt", "baz")
- check_contents("foo/second.txt", "hai")
-
-
-class TestAppendZipFS(TestWriteZipFS):
-
- def setUp(self):
- self.temp_filename = "".join(random.choice("abcdefghijklmnopqrstuvwxyz") for _ in range(6))+".zip"
- self.temp_filename = os.path.join(tempfile.gettempdir(), self.temp_filename)
-
- zip_fs = zipfs.ZipFS(self.temp_filename, 'w')
-
- def makefile(filename, contents):
- if dirname(filename):
- zip_fs.makedir(dirname(filename), recursive=True, allow_recreate=True)
- f = zip_fs.open(filename, 'w')
- f.write(contents)
- f.close()
-
- makefile("a.txt", "Hello, World!")
- makefile("b.txt", "b")
-
- zip_fs.close()
- zip_fs = zipfs.ZipFS(self.temp_filename, 'a')
-
- makefile("foo/bar/baz.txt", "baz")
- makefile("foo/second.txt", "hai")
-
- zip_fs.close()
-
-
-
-if __name__ == "__main__":
- import nose
- nose.main()
-
diff --git a/fs/tests/__init__.py b/fs/tests/__init__.py
new file mode 100644
index 0000000..3d2c309
--- /dev/null
+++ b/fs/tests/__init__.py
@@ -0,0 +1,443 @@
+#!/usr/bin/env python
+"""
+
+ fs.tests: testcases for the fs module
+
+"""
+
+# Send any output from the logging module to stdout, so it will
+# be captured by nose and reported appropriately
+import sys
+import logging
+logging.basicConfig(level=logging.ERROR,stream=sys.stdout)
+
+from fs.base import *
+
+import pickle
+
+
+
+class FSTestCases:
+ """Base suite of testcases for filesystem implementations.
+
+ Any FS subclass should be capable of passing all of these tests.
+ To apply the tests to your own FS implementation, simply use FSTestCase
+ as a mixin for your own unittest.TestCase subclass and have the setUp
+ method set self.fs to an instance of your FS implementation.
+
+ This class is designed as a mixin so that it's not detected by test
+ loading tools such as nose.
+ """
+
+ def check(self, p):
+ """Check that a file exists within self.fs"""
+ return self.fs.exists(p)
+
+ def test_root_dir(self):
+ self.assertTrue(self.fs.isdir(""))
+ self.assertTrue(self.fs.isdir("/"))
+
+ def test_debug(self):
+ str(self.fs)
+ repr(self.fs)
+ self.assert_(hasattr(self.fs, 'desc'))
+
+ def test_writefile(self):
+ self.assertRaises(ResourceNotFoundError,self.fs.open,"test1.txt")
+ f = self.fs.open("test1.txt","w")
+ f.write("testing")
+ f.close()
+ f = self.fs.open("test1.txt","r")
+ self.assertEquals(f.read(),"testing")
+
+ def test_isdir_isfile(self):
+ self.assertFalse(self.fs.exists("dir1"))
+ self.assertFalse(self.fs.isdir("dir1"))
+ self.assertFalse(self.fs.isfile("a.txt"))
+ self.fs.createfile("a.txt")
+ self.assertFalse(self.fs.isdir("dir1"))
+ self.assertTrue(self.fs.exists("a.txt"))
+ self.assertTrue(self.fs.isfile("a.txt"))
+ self.fs.makedir("dir1")
+ self.assertTrue(self.fs.isdir("dir1"))
+ self.assertTrue(self.fs.exists("dir1"))
+ self.assertTrue(self.fs.exists("a.txt"))
+ self.fs.remove("a.txt")
+ self.assertFalse(self.fs.exists("a.txt"))
+
+ def test_listdir(self):
+ self.fs.createfile("a")
+ self.fs.createfile("b")
+ self.fs.createfile("foo")
+ self.fs.createfile("bar")
+ # Test listing of the root directory
+ d1 = self.fs.listdir()
+ self.assertEqual(len(d1), 4)
+ self.assertEqual(sorted(d1), ["a", "b", "bar", "foo"])
+ d1 = self.fs.listdir("")
+ self.assertEqual(len(d1), 4)
+ self.assertEqual(sorted(d1), ["a", "b", "bar", "foo"])
+ d1 = self.fs.listdir("/")
+ self.assertEqual(len(d1), 4)
+ # Test listing absolute paths
+ d2 = self.fs.listdir(absolute=True)
+ self.assertEqual(len(d2), 4)
+ self.assertEqual(sorted(d2), ["/a", "/b", "/bar", "/foo"])
+ # Create some deeper subdirectories, to make sure their
+ # contents are not inadvertantly included
+ self.fs.makedir("p/1/2/3",recursive=True)
+ self.fs.createfile("p/1/2/3/a")
+ self.fs.createfile("p/1/2/3/b")
+ self.fs.createfile("p/1/2/3/foo")
+ self.fs.createfile("p/1/2/3/bar")
+ self.fs.makedir("q")
+ # Test listing just files, just dirs, and wildcards
+ dirs_only = self.fs.listdir(dirs_only=True)
+ files_only = self.fs.listdir(files_only=True)
+ contains_a = self.fs.listdir(wildcard="*a*")
+ self.assertEqual(sorted(dirs_only), ["p", "q"])
+ self.assertEqual(sorted(files_only), ["a", "b", "bar", "foo"])
+ self.assertEqual(sorted(contains_a), ["a", "bar"])
+ # Test listing a subdirectory
+ d3 = self.fs.listdir("p/1/2/3")
+ self.assertEqual(len(d3), 4)
+ self.assertEqual(sorted(d3), ["a", "b", "bar", "foo"])
+ # Test listing a subdirectory with absoliute and full paths
+ d4 = self.fs.listdir("p/1/2/3", absolute=True)
+ self.assertEqual(len(d4), 4)
+ self.assertEqual(sorted(d4), ["/p/1/2/3/a", "/p/1/2/3/b", "/p/1/2/3/bar", "/p/1/2/3/foo"])
+ d4 = self.fs.listdir("p/1/2/3", full=True)
+ self.assertEqual(len(d4), 4)
+ self.assertEqual(sorted(d4), ["p/1/2/3/a", "p/1/2/3/b", "p/1/2/3/bar", "p/1/2/3/foo"])
+ # Test that appropriate errors are raised
+ self.assertRaises(ResourceNotFoundError,self.fs.listdir,"zebra")
+ self.assertRaises(ResourceInvalidError,self.fs.listdir,"foo")
+
+ def test_makedir(self):
+ check = self.check
+ self.fs.makedir("a")
+ self.assertTrue(check("a"))
+ self.assertRaises(ParentDirectoryMissingError,self.fs.makedir,"a/b/c")
+ self.fs.makedir("a/b/c", recursive=True)
+ self.assert_(check("a/b/c"))
+ self.fs.makedir("foo/bar/baz", recursive=True)
+ self.assert_(check("foo/bar/baz"))
+ self.fs.makedir("a/b/child")
+ self.assert_(check("a/b/child"))
+ self.assertRaises(DestinationExistsError,self.fs.makedir,"/a/b")
+ self.fs.makedir("/a/b",allow_recreate=True)
+ self.fs.createfile("/a/file")
+ self.assertRaises(ResourceInvalidError,self.fs.makedir,"a/file")
+
+ def test_remove(self):
+ self.fs.createfile("a.txt")
+ self.assertTrue(self.check("a.txt"))
+ self.fs.remove("a.txt")
+ self.assertFalse(self.check("a.txt"))
+ self.assertRaises(ResourceNotFoundError,self.fs.remove,"a.txt")
+ self.fs.makedir("dir1")
+ self.assertRaises(ResourceInvalidError,self.fs.remove,"dir1")
+ self.fs.createfile("/dir1/a.txt")
+ self.assertTrue(self.check("dir1/a.txt"))
+ self.fs.remove("dir1/a.txt")
+ self.assertFalse(self.check("/dir1/a.txt"))
+
+ def test_removedir(self):
+ check = self.check
+ self.fs.makedir("a")
+ self.assert_(check("a"))
+ self.fs.removedir("a")
+ self.assert_(not check("a"))
+ self.fs.makedir("a/b/c/d", recursive=True)
+ self.assertRaises(DirectoryNotEmptyError, self.fs.removedir, "a/b")
+ self.fs.removedir("a/b/c/d")
+ self.assert_(not check("a/b/c/d"))
+ self.fs.removedir("a/b/c")
+ self.assert_(not check("a/b/c"))
+ self.fs.removedir("a/b")
+ self.assert_(not check("a/b"))
+ # Test recursive removal of empty parent dirs
+ self.fs.makedir("foo/bar/baz", recursive=True)
+ self.fs.removedir("foo/bar/baz", recursive=True)
+ self.assert_(not check("foo/bar/baz"))
+ self.assert_(not check("foo/bar"))
+ self.assert_(not check("foo"))
+ # Ensure that force=True works as expected
+ self.fs.makedir("frollic/waggle", recursive=True)
+ self.fs.createfile("frollic/waddle.txt","waddlewaddlewaddle")
+ self.assertRaises(DirectoryNotEmptyError,self.fs.removedir,"frollic")
+ self.assertRaises(ResourceInvalidError,self.fs.removedir,"frollic/waddle.txt")
+ self.fs.removedir("frollic",force=True)
+ self.assert_(not check("frollic"))
+
+ def test_rename(self):
+ check = self.check
+ self.fs.open("foo.txt", 'wt').write("Hello, World!")
+ self.assert_(check("foo.txt"))
+ self.fs.rename("foo.txt", "bar.txt")
+ self.assert_(check("bar.txt"))
+ self.assert_(not check("foo.txt"))
+
+ def test_info(self):
+ test_str = "Hello, World!"
+ self.fs.createfile("info.txt",test_str)
+ info = self.fs.getinfo("info.txt")
+ self.assertEqual(info['size'], len(test_str))
+ self.fs.desc("info.txt")
+
+ def test_getsize(self):
+ test_str = "*"*23
+ self.fs.createfile("info.txt",test_str)
+ size = self.fs.getsize("info.txt")
+ self.assertEqual(size, len(test_str))
+
+ def test_movefile(self):
+ check = self.check
+ contents = "If the implementation is hard to explain, it's a bad idea."
+ def makefile(path):
+ self.fs.createfile(path,contents)
+ def checkcontents(path):
+ check_contents = self.fs.getcontents(path)
+ self.assertEqual(check_contents,contents)
+ return contents == check_contents
+
+ self.fs.makedir("foo/bar", recursive=True)
+ makefile("foo/bar/a.txt")
+ self.assert_(check("foo/bar/a.txt"))
+ self.assert_(checkcontents("foo/bar/a.txt"))
+ self.fs.move("foo/bar/a.txt", "foo/b.txt")
+ self.assert_(not check("foo/bar/a.txt"))
+ self.assert_(check("foo/b.txt"))
+ self.assert_(checkcontents("foo/b.txt"))
+
+ self.fs.move("foo/b.txt", "c.txt")
+ self.assert_(not check("foo/b.txt"))
+ self.assert_(check("/c.txt"))
+ self.assert_(checkcontents("/c.txt"))
+
+ makefile("foo/bar/a.txt")
+ self.assertRaises(DestinationExistsError,self.fs.move,"foo/bar/a.txt","/c.txt")
+ self.assert_(check("foo/bar/a.txt"))
+ self.assert_(check("/c.txt"))
+ self.fs.move("foo/bar/a.txt","/c.txt",overwrite=True)
+ self.assert_(not check("foo/bar/a.txt"))
+ self.assert_(check("/c.txt"))
+
+
+ def test_movedir(self):
+ check = self.check
+ contents = "If the implementation is hard to explain, it's a bad idea."
+ def makefile(path):
+ self.fs.createfile(path,contents)
+
+ self.fs.makedir("a")
+ self.fs.makedir("b")
+ makefile("a/1.txt")
+ makefile("a/2.txt")
+ makefile("a/3.txt")
+ self.fs.makedir("a/foo/bar", recursive=True)
+ makefile("a/foo/bar/baz.txt")
+
+ self.fs.movedir("a", "copy of a")
+
+ self.assert_(check("copy of a/1.txt"))
+ self.assert_(check("copy of a/2.txt"))
+ self.assert_(check("copy of a/3.txt"))
+ self.assert_(check("copy of a/foo/bar/baz.txt"))
+
+ self.assert_(not check("a/1.txt"))
+ self.assert_(not check("a/2.txt"))
+ self.assert_(not check("a/3.txt"))
+ self.assert_(not check("a/foo/bar/baz.txt"))
+ self.assert_(not check("a/foo/bar"))
+ self.assert_(not check("a/foo"))
+ self.assert_(not check("a"))
+
+ self.fs.makedir("a")
+ print self.fs.listdir("a")
+ self.assertRaises(DestinationExistsError,self.fs.movedir,"copy of a","a")
+ self.fs.movedir("copy of a","a",overwrite=True)
+ self.assert_(not check("copy of a"))
+ print self.fs.listdir("a")
+ self.assert_(check("a/1.txt"))
+ self.assert_(check("a/2.txt"))
+ self.assert_(check("a/3.txt"))
+ self.assert_(check("a/foo/bar/baz.txt"))
+
+
+ def test_copyfile(self):
+ check = self.check
+ contents = "If the implementation is hard to explain, it's a bad idea."
+ def makefile(path,contents=contents):
+ self.fs.createfile(path,contents)
+ def checkcontents(path,contents=contents):
+ check_contents = self.fs.getcontents(path)
+ self.assertEqual(check_contents,contents)
+ return contents == check_contents
+
+ self.fs.makedir("foo/bar", recursive=True)
+ makefile("foo/bar/a.txt")
+ self.assert_(check("foo/bar/a.txt"))
+ self.assert_(checkcontents("foo/bar/a.txt"))
+ self.fs.copy("foo/bar/a.txt", "foo/b.txt")
+ self.assert_(check("foo/bar/a.txt"))
+ self.assert_(check("foo/b.txt"))
+ self.assert_(checkcontents("foo/b.txt"))
+
+ self.fs.copy("foo/b.txt", "c.txt")
+ self.assert_(check("foo/b.txt"))
+ self.assert_(check("/c.txt"))
+ self.assert_(checkcontents("/c.txt"))
+
+ makefile("foo/bar/a.txt","different contents")
+ self.assertRaises(DestinationExistsError,self.fs.copy,"foo/bar/a.txt","/c.txt")
+ self.assert_(checkcontents("/c.txt"))
+ self.fs.copy("foo/bar/a.txt","/c.txt",overwrite=True)
+ self.assert_(checkcontents("foo/bar/a.txt","different contents"))
+ self.assert_(checkcontents("/c.txt","different contents"))
+
+
+ def test_copydir(self):
+ check = self.check
+ contents = "If the implementation is hard to explain, it's a bad idea."
+ def makefile(path):
+ self.fs.createfile(path,contents)
+ def checkcontents(path):
+ check_contents = self.fs.getcontents(path)
+ self.assertEqual(check_contents,contents)
+ return contents == check_contents
+
+ self.fs.makedir("a")
+ self.fs.makedir("b")
+ makefile("a/1.txt")
+ makefile("a/2.txt")
+ makefile("a/3.txt")
+ self.fs.makedir("a/foo/bar", recursive=True)
+ makefile("a/foo/bar/baz.txt")
+
+ self.fs.copydir("a", "copy of a")
+ self.assert_(check("copy of a/1.txt"))
+ self.assert_(check("copy of a/2.txt"))
+ self.assert_(check("copy of a/3.txt"))
+ self.assert_(check("copy of a/foo/bar/baz.txt"))
+ checkcontents("copy of a/1.txt")
+
+ self.assert_(check("a/1.txt"))
+ self.assert_(check("a/2.txt"))
+ self.assert_(check("a/3.txt"))
+ self.assert_(check("a/foo/bar/baz.txt"))
+ checkcontents("a/1.txt")
+
+ self.assertRaises(DestinationExistsError,self.fs.copydir,"a","b")
+ self.fs.copydir("a","b",overwrite=True)
+ self.assert_(check("b/1.txt"))
+ self.assert_(check("b/2.txt"))
+ self.assert_(check("b/3.txt"))
+ self.assert_(check("b/foo/bar/baz.txt"))
+ checkcontents("b/1.txt")
+
+ def test_copydir_with_dotfile(self):
+ check = self.check
+ contents = "If the implementation is hard to explain, it's a bad idea."
+ def makefile(path):
+ self.fs.createfile(path,contents)
+
+ self.fs.makedir("a")
+ makefile("a/1.txt")
+ makefile("a/2.txt")
+ makefile("a/.hidden.txt")
+
+ self.fs.copydir("a", "copy of a")
+ self.assert_(check("copy of a/1.txt"))
+ self.assert_(check("copy of a/2.txt"))
+ self.assert_(check("copy of a/.hidden.txt"))
+
+ self.assert_(check("a/1.txt"))
+ self.assert_(check("a/2.txt"))
+ self.assert_(check("a/.hidden.txt"))
+
+ def test_readwriteappendseek(self):
+ def checkcontents(path, check_contents):
+ read_contents = self.fs.getcontents(path)
+ self.assertEqual(read_contents,check_contents)
+ return read_contents == check_contents
+ test_strings = ["Beautiful is better than ugly.",
+ "Explicit is better than implicit.",
+ "Simple is better than complex."]
+ all_strings = "".join(test_strings)
+
+ self.assertRaises(ResourceNotFoundError, self.fs.open, "a.txt", "r")
+ self.assert_(not self.fs.exists("a.txt"))
+ f1 = self.fs.open("a.txt", "wb")
+ pos = 0
+ for s in test_strings:
+ f1.write(s)
+ pos += len(s)
+ self.assertEqual(pos, f1.tell())
+ f1.close()
+ self.assert_(self.fs.exists("a.txt"))
+ self.assert_(checkcontents("a.txt", all_strings))
+
+ f2 = self.fs.open("b.txt", "wb")
+ f2.write(test_strings[0])
+ f2.close()
+ self.assert_(checkcontents("b.txt", test_strings[0]))
+ f3 = self.fs.open("b.txt", "ab")
+ f3.write(test_strings[1])
+ f3.write(test_strings[2])
+ f3.close()
+ self.assert_(checkcontents("b.txt", all_strings))
+ f4 = self.fs.open("b.txt", "wb")
+ f4.write(test_strings[2])
+ f4.close()
+ self.assert_(checkcontents("b.txt", test_strings[2]))
+ f5 = self.fs.open("c.txt", "wb")
+ for s in test_strings:
+ f5.write(s+"\n")
+ f5.close()
+ f6 = self.fs.open("c.txt", "rb")
+ for s, t in zip(f6, test_strings):
+ self.assertEqual(s, t+"\n")
+ f6.close()
+ f7 = self.fs.open("c.txt", "rb")
+ f7.seek(13)
+ word = f7.read(6)
+ self.assertEqual(word, "better")
+ f7.seek(1, os.SEEK_CUR)
+ word = f7.read(4)
+ self.assertEqual(word, "than")
+ f7.seek(-9, os.SEEK_END)
+ word = f7.read(7)
+ self.assertEqual(word, "complex")
+ f7.close()
+ self.assertEqual(self.fs.getcontents("a.txt"), all_strings)
+
+ def test_with_statement(self):
+ # This is a little tricky since 'with' is actually new syntax.
+ # We use eval() to make this method safe for old python versions.
+ import sys
+ if sys.version_info[0] >= 2 and sys.version_info[1] >= 5:
+ # A successful 'with' statement
+ contents = "testing the with statement"
+ code = "from __future__ import with_statement\n"
+ code += "with self.fs.open('f.txt','w-') as testfile:\n"
+ code += " testfile.write(contents)\n"
+ code += "self.assertEquals(self.fs.getcontents('f.txt'),contents)"
+ code = compile(code,"<string>",'exec')
+ eval(code)
+ # A 'with' statement raising an error
+ contents = "testing the with statement"
+ code = "from __future__ import with_statement\n"
+ code += "with self.fs.open('f.txt','w-') as testfile:\n"
+ code += " testfile.write(contents)\n"
+ code += " raise ValueError\n"
+ code = compile(code,"<string>",'exec')
+ self.assertRaises(ValueError,eval,code,globals(),locals())
+ self.assertEquals(self.fs.getcontents('f.txt'),contents)
+
+ def test_pickling(self):
+ self.fs.createfile("test1","hello world")
+ oldfs = self.fs
+ self.fs = pickle.loads(pickle.dumps(self.fs))
+ self.assert_(self.fs.isfile("test1"))
+
diff --git a/fs/tests/test_expose.py b/fs/tests/test_expose.py
new file mode 100644
index 0000000..88f5095
--- /dev/null
+++ b/fs/tests/test_expose.py
@@ -0,0 +1,95 @@
+#!/usr/bin/env python
+"""
+
+ fs.tests.test_expose: testcases for fs.expose and associated FS classes
+
+"""
+
+import unittest
+import socket
+import threading
+
+from fs.tests import FSTestCases
+from fs.tempfs import TempFS
+
+from fs import rpcfs
+from fs.expose.xmlrpc import RPCFSServer
+class TestRPCFS(unittest.TestCase,FSTestCases):
+
+ def makeServer(self,fs,addr):
+ return RPCFSServer(fs,addr,logRequests=False)
+
+ def startServer(self):
+ port = 8000
+ self.temp_fs = TempFS()
+ self.server = None
+ while not self.server:
+ try:
+ self.server = self.makeServer(self.temp_fs,("localhost",port))
+ except socket.error, e:
+ if e.args[1] == "Address already in use":
+ port += 1
+ else:
+ raise
+ self.server_addr = ("localhost",port)
+ self.serve_more_requests = True
+ self.server_thread = threading.Thread(target=self.runServer)
+ self.server_thread.start()
+
+ def runServer(self):
+ """Run the server, swallowing shutdown-related execptions."""
+ self.server.socket.settimeout(0.1)
+ try:
+ while self.serve_more_requests:
+ self.server.handle_request()
+ except Exception, e:
+ pass
+
+ def setUp(self):
+ self.startServer()
+ self.fs = rpcfs.RPCFS("http://%s:%d" % self.server_addr)
+
+ def tearDown(self):
+ self.serve_more_requests = False
+ try:
+ self.bump()
+ self.server.server_close()
+ except Exception:
+ pass
+ self.server_thread.join()
+ self.temp_fs.close()
+
+ def bump(self):
+ host, port = self.server_addr
+ for res in socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM):
+ af, socktype, proto, cn, sa = res
+ sock = None
+ try:
+ sock = socket.socket(af, socktype, proto)
+ sock.settimeout(1)
+ sock.connect(sa)
+ sock.send("\n")
+ except socket.error, e:
+ pass
+ finally:
+ if sock is not None:
+ sock.close()
+
+
+from fs import sftpfs
+from fs.expose.sftp import BaseSFTPServer
+class TestSFTPFS(TestRPCFS):
+
+ def makeServer(self,fs,addr):
+ return BaseSFTPServer(addr,fs)
+
+ def setUp(self):
+ self.startServer()
+ self.fs = sftpfs.SFTPFS(self.server_addr)
+
+ def bump(self):
+ # paramiko doesn't like being bumped, just wait for it to timeout.
+ # TODO: do this using a paramiko.Transport() connection
+ pass
+
+
diff --git a/fs/tests/test_fs.py b/fs/tests/test_fs.py
new file mode 100644
index 0000000..49c099f
--- /dev/null
+++ b/fs/tests/test_fs.py
@@ -0,0 +1,88 @@
+"""
+
+ fs.tests.test_fs: testcases for basic FS implementations
+
+"""
+
+from fs.tests import FSTestCases
+
+import unittest
+
+import os
+import shutil
+import tempfile
+
+from fs.path import *
+
+
+from fs import osfs
+class TestOSFS(unittest.TestCase,FSTestCases):
+
+ def setUp(self):
+ self.temp_dir = tempfile.mkdtemp("fstest")
+ self.fs = osfs.OSFS(self.temp_dir)
+
+ def tearDown(self):
+ shutil.rmtree(self.temp_dir)
+
+ def check(self, p):
+ return os.path.exists(os.path.join(self.temp_dir, relpath(p)))
+
+
+
+class TestSubFS(unittest.TestCase,FSTestCases):
+
+ def setUp(self):
+ self.temp_dir = tempfile.mkdtemp("fstest")
+ self.parent_fs = osfs.OSFS(self.temp_dir)
+ self.parent_fs.makedir("foo/bar", recursive=True)
+ self.fs = self.parent_fs.opendir("foo/bar")
+
+ def tearDown(self):
+ shutil.rmtree(self.temp_dir)
+
+ def check(self, p):
+ p = os.path.join("foo/bar", relpath(p))
+ full_p = os.path.join(self.temp_dir, p)
+ return os.path.exists(full_p)
+
+
+from fs import memoryfs
+class TestMemoryFS(unittest.TestCase,FSTestCases):
+
+ def setUp(self):
+ self.fs = memoryfs.MemoryFS()
+
+
+from fs import mountfs
+class TestMountFS(unittest.TestCase,FSTestCases):
+
+ def setUp(self):
+ self.mount_fs = mountfs.MountFS()
+ self.mem_fs = memoryfs.MemoryFS()
+ self.mount_fs.mountdir("mounted/memfs", self.mem_fs)
+ self.fs = self.mount_fs.opendir("mounted/memfs")
+
+ def tearDown(self):
+ pass
+
+ def check(self, p):
+ return self.mount_fs.exists(os.path.join("mounted/memfs", relpath(p)))
+
+
+from fs import tempfs
+class TestTempFS(unittest.TestCase,FSTestCases):
+
+ def setUp(self):
+ self.fs = tempfs.TempFS()
+
+ def tearDown(self):
+ td = self.fs._temp_dir
+ self.fs.close()
+ self.assert_(not os.path.exists(td))
+
+ def check(self, p):
+ td = self.fs._temp_dir
+ return os.path.exists(os.path.join(td, relpath(p)))
+
+
diff --git a/fs/tests/test_objecttree.py b/fs/tests/test_objecttree.py
new file mode 100644
index 0000000..b8d9a12
--- /dev/null
+++ b/fs/tests/test_objecttree.py
@@ -0,0 +1,47 @@
+"""
+
+ fs.tests.test_objectree: testcases for the fs objecttree module
+
+"""
+
+
+import unittest
+
+import fs.tests
+from fs import objecttree
+
+class TestObjectTree(unittest.TestCase):
+ """Testcases for the ObjectTree class."""
+
+ def test_getset(self):
+ ot = objecttree.ObjectTree()
+ ot['foo'] = "bar"
+ self.assertEqual(ot['foo'], 'bar')
+
+ ot = objecttree.ObjectTree()
+ ot['foo/bar'] = "baz"
+ self.assertEqual(ot['foo'], {'bar':'baz'})
+ self.assertEqual(ot['foo/bar'], 'baz')
+
+ del ot['foo/bar']
+ self.assertEqual(ot['foo'], {})
+
+ ot = objecttree.ObjectTree()
+ ot['a/b/c'] = "A"
+ ot['a/b/d'] = "B"
+ ot['a/b/e'] = "C"
+ ot['a/b/f'] = "D"
+ self.assertEqual(sorted(ot['a/b'].values()), ['A', 'B', 'C', 'D'])
+ self.assert_(ot.get('a/b/x', -1) == -1)
+
+ self.assert_('a/b/c' in ot)
+ self.assert_('a/b/x' not in ot)
+ self.assert_(ot.isobject('a/b/c'))
+ self.assert_(ot.isobject('a/b/d'))
+ self.assert_(not ot.isobject('a/b'))
+
+ left, object, right = ot.partialget('a/b/e/f/g')
+ self.assertEqual(left, "a/b/e")
+ self.assertEqual(object, "C")
+ self.assertEqual(right, "f/g")
+
diff --git a/fs/tests/test_path.py b/fs/tests/test_path.py
new file mode 100644
index 0000000..35c9185
--- /dev/null
+++ b/fs/tests/test_path.py
@@ -0,0 +1,98 @@
+"""
+
+ fs.tests.test_path: testcases for the fs path functions
+
+"""
+
+
+import unittest
+import fs.tests
+
+from fs.path import *
+
+class TestPathFunctions(unittest.TestCase):
+ """Testcases for FS path functions."""
+
+ def test_normpath(self):
+ tests = [ ("\\a\\b\\c", "/a/b/c"),
+ ("", ""),
+ ("/a/b/c", "/a/b/c"),
+ ("a/b/c", "a/b/c"),
+ ("a/b/../c/", "a/c"),
+ ("/","/"),
+ ]
+ for path, result in tests:
+ self.assertEqual(normpath(path), result)
+
+ def test_pathjoin(self):
+ tests = [ ("", "a", "a"),
+ ("a", "a", "a/a"),
+ ("a/b", "../c", "a/c"),
+ ("a/b/../c", "d", "a/c/d"),
+ ("/a/b/c", "d", "/a/b/c/d"),
+ ("/a/b/c", "../../../d", "/d"),
+ ("a", "b", "c", "a/b/c"),
+ ("a/b/c", "../d", "c", "a/b/d/c"),
+ ("a/b/c", "../d", "/a", "/a"),
+ ("aaa", "bbb/ccc", "aaa/bbb/ccc"),
+ ("aaa", "bbb\ccc", "aaa/bbb/ccc"),
+ ("aaa", "bbb", "ccc", "/aaa", "eee", "/aaa/eee"),
+ ("a/b", "./d", "e", "a/b/d/e"),
+ ("/", "/", "/"),
+ ("/", "", "/"),
+ ]
+ for testpaths in tests:
+ paths = testpaths[:-1]
+ result = testpaths[-1]
+ self.assertEqual(fs.pathjoin(*paths), result)
+
+ self.assertRaises(ValueError, fs.pathjoin, "../")
+ self.assertRaises(ValueError, fs.pathjoin, "./../")
+ self.assertRaises(ValueError, fs.pathjoin, "a/b", "../../..")
+ self.assertRaises(ValueError, fs.pathjoin, "a/b/../../../d")
+
+ def test_relpath(self):
+ tests = [ ("/a/b", "a/b"),
+ ("a/b", "a/b"),
+ ("/", "") ]
+
+ for path, result in tests:
+ print path, result
+ self.assertEqual(fs.relpath(path), result)
+
+ def test_abspath(self):
+ tests = [ ("/a/b", "/a/b"),
+ ("a/b", "/a/b"),
+ ("/", "/") ]
+
+ for path, result in tests:
+ self.assertEqual(fs.abspath(path), result)
+
+ def test_iteratepath(self):
+ tests = [ ("a/b", ["a", "b"]),
+ ("", [] ),
+ ("aaa/bbb/ccc", ["aaa", "bbb", "ccc"]),
+ ("a/b/c/../d", ["a", "b", "d"]) ]
+
+ for path, results in tests:
+ print repr(path), results
+ for path_component, expected in zip(iteratepath(path), results):
+ self.assertEqual(path_component, expected)
+
+ self.assertEqual(list(iteratepath("a/b/c/d", 1)), ["a", "b/c/d"])
+ self.assertEqual(list(iteratepath("a/b/c/d", 2)), ["a", "b", "c/d"])
+
+ def test_pathsplit(self):
+ tests = [ ("a/b", ("a", "b")),
+ ("a/b/c", ("a/b", "c")),
+ ("a", ("", "a")),
+ ("", ("", "")),
+ ("/", ("", "")),
+ ("foo/bar", ("foo", "bar")),
+ ("foo/bar/baz", ("foo/bar", "baz")),
+ ]
+ for path, result in tests:
+ self.assertEqual(fs.pathsplit(path), result)
+
+
+
diff --git a/fs/tests/test_s3fs.py b/fs/tests/test_s3fs.py
new file mode 100644
index 0000000..49ee51d
--- /dev/null
+++ b/fs/tests/test_s3fs.py
@@ -0,0 +1,41 @@
+#!/usr/bin/env python
+"""
+
+ fs.tests.test_s3fs: testcases for the S3FS module
+
+These tests are set up to be skipped by default, since they're very slow,
+require a valid AWS account, and cost money. You'll have to set the '__test__'
+attribute the True on te TestS3FS class to get them running.
+
+"""
+
+import unittest
+
+from fs.tests import FSTestCases
+from fs.path import *
+
+from fs import s3fs
+class TestS3FS(unittest.TestCase,FSTestCases):
+
+ # Disable the tests by default
+ __test__ = False
+
+ bucket = "test-s3fs.rfk.id.au"
+
+ def setUp(self):
+ self.fs = s3fs.S3FS(self.bucket,"/unittest/files")
+ self._clear()
+
+ def _clear(self):
+ for (path,files) in self.fs.walk(search="depth"):
+ for fn in files:
+ self.fs.remove(pathjoin(path,fn))
+ if path and path != "/":
+ self.fs.removedir(path)
+
+ def tearDown(self):
+ self._clear()
+ for k in self.fs._s3bukt.list():
+ self.fs._s3bukt.delete_key(k)
+ self.fs._s3conn.delete_bucket(self.bucket)
+
diff --git a/fs/tests/test_xattr.py b/fs/tests/test_xattr.py
new file mode 100644
index 0000000..ca6e4d8
--- /dev/null
+++ b/fs/tests/test_xattr.py
@@ -0,0 +1,116 @@
+"""
+
+ fs.tests.test_xattr: testcases for extended attribute support
+
+"""
+
+import unittest
+import os
+
+from fs.path import *
+from fs.errors import *
+from fs.tests import FSTestCases
+
+
+class XAttrTestCases:
+ """Testcases for filesystems providing extended attribute support.
+
+ This class should be used as a mixin to the unittest.TestCase class
+ for filesystems that provide extended attribute support.
+ """
+
+ def test_getsetdel(self):
+ def do_getsetdel(p):
+ self.assertEqual(self.fs.getxattr(p,"xattr1"),None)
+ self.fs.setxattr(p,"xattr1","value1")
+ self.assertEqual(self.fs.getxattr(p,"xattr1"),"value1")
+ self.fs.delxattr(p,"xattr1")
+ self.assertEqual(self.fs.getxattr(p,"xattr1"),None)
+ self.fs.createfile("test.txt","hello")
+ do_getsetdel("test.txt")
+ self.assertRaises(ResourceNotFoundError,self.fs.getxattr,"test2.txt","xattr1")
+ self.fs.makedir("mystuff")
+ self.fs.createfile("/mystuff/test.txt","")
+ do_getsetdel("mystuff")
+ do_getsetdel("mystuff/test.txt")
+
+ def test_list_xattrs(self):
+ def do_list(p):
+ self.assertEquals(sorted(self.fs.xattrs(p)),[])
+ self.fs.setxattr(p,"xattr1","value1")
+ self.assertEquals(sorted(self.fs.xattrs(p)),["xattr1"])
+ self.fs.setxattr(p,"attr2","value2")
+ self.assertEquals(sorted(self.fs.xattrs(p)),["attr2","xattr1"])
+ self.fs.delxattr(p,"xattr1")
+ self.assertEquals(sorted(self.fs.xattrs(p)),["attr2"])
+ self.fs.delxattr(p,"attr2")
+ self.assertEquals(sorted(self.fs.xattrs(p)),[])
+ self.fs.createfile("test.txt","hello")
+ do_list("test.txt")
+ self.fs.makedir("mystuff")
+ self.fs.createfile("/mystuff/test.txt","")
+ do_list("mystuff")
+ do_list("mystuff/test.txt")
+
+ def test_copy_xattrs(self):
+ self.fs.createfile("a.txt","content")
+ self.fs.setxattr("a.txt","myattr","myvalue")
+ self.fs.setxattr("a.txt","testattr","testvalue")
+ self.fs.makedir("stuff")
+ self.fs.copy("a.txt","stuff/a.txt")
+ self.assertTrue(self.fs.exists("stuff/a.txt"))
+ self.assertEquals(self.fs.getxattr("stuff/a.txt","myattr"),"myvalue")
+ self.assertEquals(self.fs.getxattr("stuff/a.txt","testattr"),"testvalue")
+ self.assertEquals(self.fs.getxattr("a.txt","myattr"),"myvalue")
+ self.assertEquals(self.fs.getxattr("a.txt","testattr"),"testvalue")
+ self.fs.setxattr("stuff","dirattr","a directory")
+ self.fs.copydir("stuff","stuff2")
+ self.assertEquals(self.fs.getxattr("stuff2/a.txt","myattr"),"myvalue")
+ self.assertEquals(self.fs.getxattr("stuff2/a.txt","testattr"),"testvalue")
+ self.assertEquals(self.fs.getxattr("stuff2","dirattr"),"a directory")
+ self.assertEquals(self.fs.getxattr("stuff","dirattr"),"a directory")
+
+ def test_move_xattrs(self):
+ self.fs.createfile("a.txt","content")
+ self.fs.setxattr("a.txt","myattr","myvalue")
+ self.fs.setxattr("a.txt","testattr","testvalue")
+ self.fs.makedir("stuff")
+ self.fs.move("a.txt","stuff/a.txt")
+ self.assertTrue(self.fs.exists("stuff/a.txt"))
+ self.assertEquals(self.fs.getxattr("stuff/a.txt","myattr"),"myvalue")
+ self.assertEquals(self.fs.getxattr("stuff/a.txt","testattr"),"testvalue")
+ self.fs.setxattr("stuff","dirattr","a directory")
+ self.fs.movedir("stuff","stuff2")
+ self.assertEquals(self.fs.getxattr("stuff2/a.txt","myattr"),"myvalue")
+ self.assertEquals(self.fs.getxattr("stuff2/a.txt","testattr"),"testvalue")
+ self.assertEquals(self.fs.getxattr("stuff2","dirattr"),"a directory")
+
+
+
+from fs.wrappers.xattr import ensure_xattr
+
+from fs import tempfs
+class TestXAttr_TempFS(unittest.TestCase,FSTestCases,XAttrTestCases):
+
+ def setUp(self):
+ self.fs = ensure_xattr(tempfs.TempFS())
+
+ def tearDown(self):
+ td = self.fs._temp_dir
+ self.fs.close()
+ self.assert_(not os.path.exists(td))
+
+ def check(self, p):
+ td = self.fs._temp_dir
+ return os.path.exists(os.path.join(td, relpath(p)))
+
+
+from fs import memoryfs
+class TestXAttr_MemoryFS(unittest.TestCase,FSTestCases,XAttrTestCases):
+
+ def setUp(self):
+ self.fs = ensure_xattr(memoryfs.MemoryFS())
+
+ def check(self, p):
+ return self.fs.exists(p)
+
diff --git a/fs/tests/test_zipfs.py b/fs/tests/test_zipfs.py
new file mode 100644
index 0000000..fec01a6
--- /dev/null
+++ b/fs/tests/test_zipfs.py
@@ -0,0 +1,153 @@
+"""
+
+ fs.tests.test_zipfs: testcases for the ZipFS class
+
+"""
+
+import unittest
+import os
+import random
+import zipfile
+import tempfile
+
+import fs.tests
+from fs.path import *
+
+
+from fs import zipfs
+class TestReadZipFS(unittest.TestCase):
+
+ def setUp(self):
+ self.temp_filename = "".join(random.choice("abcdefghijklmnopqrstuvwxyz") for _ in range(6))+".zip"
+ self.temp_filename = os.path.join(tempfile.gettempdir(), self.temp_filename)
+
+ self.zf = zipfile.ZipFile(self.temp_filename, "w")
+ zf = self.zf
+ zf.writestr("a.txt", "Hello, World!")
+ zf.writestr("b.txt", "b")
+ zf.writestr("1.txt", "1")
+ zf.writestr("foo/bar/baz.txt", "baz")
+ zf.writestr("foo/second.txt", "hai")
+ zf.close()
+ self.fs = zipfs.ZipFS(self.temp_filename, "r")
+
+ def tearDown(self):
+ self.fs.close()
+ os.remove(self.temp_filename)
+
+ def check(self, p):
+ try:
+ self.zipfile.getinfo(p)
+ return True
+ except:
+ return False
+
+ def test_reads(self):
+ def read_contents(path):
+ f = self.fs.open(path)
+ contents = f.read()
+ return contents
+ def check_contents(path, expected):
+ self.assert_(read_contents(path)==expected)
+ check_contents("a.txt", "Hello, World!")
+ check_contents("1.txt", "1")
+ check_contents("foo/bar/baz.txt", "baz")
+
+ def test_getcontents(self):
+ def read_contents(path):
+ return self.fs.getcontents(path)
+ def check_contents(path, expected):
+ self.assert_(read_contents(path)==expected)
+ check_contents("a.txt", "Hello, World!")
+ check_contents("1.txt", "1")
+ check_contents("foo/bar/baz.txt", "baz")
+
+ def test_is(self):
+ self.assert_(self.fs.isfile('a.txt'))
+ self.assert_(self.fs.isfile('1.txt'))
+ self.assert_(self.fs.isfile('foo/bar/baz.txt'))
+ self.assert_(self.fs.isdir('foo'))
+ self.assert_(self.fs.isdir('foo/bar'))
+ self.assert_(self.fs.exists('a.txt'))
+ self.assert_(self.fs.exists('1.txt'))
+ self.assert_(self.fs.exists('foo/bar/baz.txt'))
+ self.assert_(self.fs.exists('foo'))
+ self.assert_(self.fs.exists('foo/bar'))
+
+ def test_listdir(self):
+
+ def check_listing(path, expected):
+ dir_list = self.fs.listdir(path)
+ self.assert_(sorted(dir_list) == sorted(expected))
+ check_listing('/', ['a.txt', '1.txt', 'foo', 'b.txt'])
+ check_listing('foo', ['second.txt', 'bar'])
+ check_listing('foo/bar', ['baz.txt'])
+
+
+class TestWriteZipFS(unittest.TestCase):
+
+ def setUp(self):
+ self.temp_filename = "".join(random.choice("abcdefghijklmnopqrstuvwxyz") for _ in range(6))+".zip"
+ self.temp_filename = os.path.join(tempfile.gettempdir(), self.temp_filename)
+
+ zip_fs = zipfs.ZipFS(self.temp_filename, 'w')
+
+ def makefile(filename, contents):
+ if dirname(filename):
+ zip_fs.makedir(dirname(filename), recursive=True, allow_recreate=True)
+ f = zip_fs.open(filename, 'w')
+ f.write(contents)
+ f.close()
+
+ makefile("a.txt", "Hello, World!")
+ makefile("b.txt", "b")
+ makefile("foo/bar/baz.txt", "baz")
+ makefile("foo/second.txt", "hai")
+
+ zip_fs.close()
+
+ def tearDown(self):
+ os.remove(self.temp_filename)
+
+ def test_valid(self):
+ zf = zipfile.ZipFile(self.temp_filename, "r")
+ self.assert_(zf.testzip() is None)
+ zf.close()
+
+ def test_creation(self):
+ zf = zipfile.ZipFile(self.temp_filename, "r")
+ def check_contents(filename, contents):
+ zcontents = zf.read(filename)
+ self.assertEqual(contents, zcontents)
+ check_contents("a.txt", "Hello, World!")
+ check_contents("b.txt", "b")
+ check_contents("foo/bar/baz.txt", "baz")
+ check_contents("foo/second.txt", "hai")
+
+
+class TestAppendZipFS(TestWriteZipFS):
+
+ def setUp(self):
+ self.temp_filename = "".join(random.choice("abcdefghijklmnopqrstuvwxyz") for _ in range(6))+".zip"
+ self.temp_filename = os.path.join(tempfile.gettempdir(), self.temp_filename)
+
+ zip_fs = zipfs.ZipFS(self.temp_filename, 'w')
+
+ def makefile(filename, contents):
+ if dirname(filename):
+ zip_fs.makedir(dirname(filename), recursive=True, allow_recreate=True)
+ f = zip_fs.open(filename, 'w')
+ f.write(contents)
+ f.close()
+
+ makefile("a.txt", "Hello, World!")
+ makefile("b.txt", "b")
+
+ zip_fs.close()
+ zip_fs = zipfs.ZipFS(self.temp_filename, 'a')
+
+ makefile("foo/bar/baz.txt", "baz")
+ makefile("foo/second.txt", "hai")
+
+ zip_fs.close()
+
diff --git a/fs/wrappers/hidedotfiles.py b/fs/wrappers/hidedotfiles.py
index 6203410..527a2d7 100644
--- a/fs/wrappers/hidedotfiles.py
+++ b/fs/wrappers/hidedotfiles.py
@@ -4,7 +4,7 @@
"""
-from fs.helpers import *
+from fs.path import *
from fs.errors import *
from fs.wrappers import FSWrapper
@@ -19,7 +19,7 @@ class HideDotFiles(FSWrapper):
def is_hidden(self,path):
"""Check whether the given path should be hidden."""
- return path and resourcename(path)[0] == "."
+ return path and basename(path)[0] == "."
def _encode(self,path):
return path
diff --git a/fs/wrappers/xattr.py b/fs/wrappers/xattr.py
index a3bea2b..524477f 100644
--- a/fs/wrappers/xattr.py
+++ b/fs/wrappers/xattr.py
@@ -16,7 +16,7 @@ try:
except ImportError:
import pickle
-from fs.helpers import *
+from fs.path import *
from fs.errors import *
from fs.wrappers import FSWrapper
diff --git a/fs/zipfs.py b/fs/zipfs.py
index 504273a..3da4da4 100644
--- a/fs/zipfs.py
+++ b/fs/zipfs.py
@@ -1,7 +1,6 @@
#!/usr/bin/env python
-from base import *
-from helpers import *
+from fs.base import *
from zipfile import ZipFile, ZIP_DEFLATED, ZIP_STORED
from memoryfs import MemoryFS