author    rfkelly0 <rfkelly0@67cdc799-7952-0410-af00-57a81ceafa0f>  2009-01-16 01:46:32 +0000
committer rfkelly0 <rfkelly0@67cdc799-7952-0410-af00-57a81ceafa0f>  2009-01-16 01:46:32 +0000
commit    d59c7059aae5ee34850cb5800c0b271ed01f98bf
tree      3bef569aca9aeb04499259efc14fa423eb47a4fe
parent    53830923df0d66c6aad4c7df0f99b45626e26ed4
download  pyfilesystem-d59c7059aae5ee34850cb5800c0b271ed01f98bf.tar.gz
make open(path,"w") immediately create an empty file in S3FS/RPCFS
git-svn-id: http://pyfilesystem.googlecode.com/svn/trunk@111 67cdc799-7952-0410-af00-57a81ceafa0f
-rw-r--r--  fs/rpcfs.py |  37
-rw-r--r--  fs/s3fs.py  | 131
2 files changed, 101 insertions(+), 67 deletions(-)
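In plain terms, the change: open(path,"w") on S3FS/RPCFS now creates an empty file immediately, rather than waiting for the first flush or close. A minimal usage sketch of the new behaviour (the bucket name is a hypothetical placeholder, and AWS credentials are assumed to be available in the environment):

    from fs.s3fs import S3FS

    fs = S3FS("example-bucket")     # hypothetical bucket name
    f = fs.open("hello.txt", "w")   # after this commit, the empty key
                                    # is written to S3 right away
    assert fs.isfile("hello.txt")   # visible before the first flush
    f.write("hello world")
    f.close()                       # contents pushed back to S3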
diff --git a/fs/rpcfs.py b/fs/rpcfs.py
index 4d2a676..ced3b7a 100644
--- a/fs/rpcfs.py
+++ b/fs/rpcfs.py
@@ -39,19 +39,6 @@ class ObjProxy:
def __getattr__(self,attr):
return getattr(self._obj,attr)
-class ReRaiseErrors:
- """XML-RPC proxy wrapper that tries to re-raise Faults as actual errors."""
- def __init__(self,proxy):
- self._proxy = proxy
- def __getattr__(self,attr):
- val = getattr(self._proxy,attr)
- if not callable(val):
- return val
- def func(*args,**kwds):
- try:
- val(*args,**kwds)
- except xmlrpclib.Fault, e:
- print "ERROR", e, e.faultCode, e.faultString
def re_raise_faults(func):
"""Decorator to re-raise XML-RPC faults as proper exceptions."""
@@ -108,6 +95,7 @@ class ReRaiseFaults:
val = getattr(self._obj,attr)
if callable(val):
val = re_raise_faults(val)
+ self.__dict__[attr] = val
return val
@@ -128,7 +116,7 @@ class RPCFS(FS):
The only required argument is the uri of the server to connect
to. This will be passed to the underlying XML-RPC server proxy
- object along with any other arguments if they are provided.
+ object along with the 'transport' argument if it is provided.
"""
self.uri = uri
if transport is not None:
@@ -144,7 +132,9 @@ class RPCFS(FS):
def open(self,path,mode):
# TODO: chunked transport of large files
- if "r" in mode or "a" in mode:
+ if "w" in mode:
+ self.proxy.set_contents(path,xmlrpclib.Binary(""))
+ if "r" in mode or "a" in mode or "+" in mode:
try:
data = self.proxy.get_contents(path).data
except IOError:
@@ -152,6 +142,7 @@ class RPCFS(FS):
raise ResourceNotFoundError("NO_FILE",path)
if not self.isdir(dirname(path)):
raise OperationFailedError("OPEN_FAILED", path,msg="Parent directory does not exist")
+ self.proxy.set_contents(path,xmlrpclib.Binary(""))
else:
data = ""
f = ObjProxy(StringIO(data))
@@ -159,11 +150,15 @@ class RPCFS(FS):
f.seek(0,0)
else:
f.seek(0,2)
+ oldflush = f.flush
oldclose = f.close
+ def newflush():
+ oldflush()
+ self.proxy.set_contents(path,xmlrpclib.Binary(f.getvalue()))
def newclose():
f.flush()
- self.proxy.set_contents(path,xmlrpclib.Binary(f.getvalue()))
oldclose()
+ f.flush = newflush
f.close = newclose
return f
@@ -217,7 +212,11 @@ class RPCFS(FS):
class RPCFSInterface(object):
- """Wrapper to expose an FS via a XML-RPC compatible interface."""
+ """Wrapper to expose an FS via a XML-RPC compatible interface.
+
+ The only real trick is using xmlrpclib.Binary objects to transport
+ the contents of files.
+ """
def __init__(self,fs):
self.fs = fs
@@ -281,12 +280,16 @@ class RPCFSInterface(object):
class RPCFSServer(SimpleXMLRPCServer):
"""Server to expose an FS object via XML-RPC.
+ This class takes as its first argument an FS instance, and as its second
+ argument a (hostname,port) tuple on which to listen for XML-RPC requests.
Example:
fs = OSFS('/var/srv/myfiles')
s = RPCFSServer(fs,("",8080))
s.serve_forever()
+ To cleanly shut down the server after calling serve_forever, set the
+ attribute "serve_more_requests" to False.
"""
def __init__(self,fs,addr,requestHandler=None,logRequests=None):
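The RPCFS.open() changes above work by monkey-patching flush() and close() on a file-like buffer, so that every flush pushes the buffer's contents to the server (the StringIO is wrapped in ObjProxy so its attributes can be replaced). A self-contained sketch of that wrapping pattern, with illustrative names not taken from the module:

    from StringIO import StringIO

    def upload_on_flush(f, upload):
        # Wrap a file-like object so each flush() (and hence close())
        # hands the current buffer contents to the upload callback.
        oldflush = f.flush
        oldclose = f.close
        def newflush():
            oldflush()
            upload(f.getvalue())   # push current contents
        def newclose():
            f.flush()              # runs newflush, uploading one last time
            oldclose()
        f.flush = newflush
        f.close = newclose
        return f

    # Example: collect every upload into a list.
    uploads = []
    f = upload_on_flush(StringIO(), uploads.append)
    f.write("hello")
    f.close()
    assert uploads == ["hello"]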
diff --git a/fs/s3fs.py b/fs/s3fs.py
index b42f176..79a0a0b 100644
--- a/fs/s3fs.py
+++ b/fs/s3fs.py
@@ -20,12 +20,7 @@ except ImportError:
from fs.base import *
from fs.helpers import *
-try:
- from collections import MutableMapping as DictMixin
-except ImportError:
- from UserDict import DictMixin
-
-
+
class S3FS(FS):
"""A filesystem stored in Amazon S3.
@@ -35,15 +30,16 @@ class S3FS(FS):
be stored.
Local temporary files are used when opening files from this filesystem,
- and any changes are only pushed back into S3 when the files are closed.
+ and any changes are only pushed back into S3 when the files are closed
+ or flushed.
"""
class meta:
PATH_MAX = None
NAME_MAX = None
- def __init__(self, bucket, prefix="", aws_access_key=None, aws_secret_key=None, separator="/", thread_syncronize=True):
- """Constructor for S3SF objects.
+ def __init__(self, bucket, prefix="", aws_access_key=None, aws_secret_key=None, separator="/", thread_syncronize=True,key_sync_timeout=1):
+ """Constructor for S3FS objects.
S3FS objects require the name of the S3 bucket in which to store
files, and can optionally be given a prefix under which the files
@@ -52,11 +48,19 @@ class S3FS(FS):
read from the two environment variables AWS_ACCESS_KEY_ID and
AWS_SECRET_ACCESS_KEY.
+ The keyword argument 'key_sync_timeout' specifies the maximum
+ time in seconds that the filesystem will spend trying to confirm
+ that a newly-uploaded S3 key is available for reading. For no
+ timeout set it to zero. To disable these checks entirely (and
+ thus reduce the filesystem's consistency guarantees to those of
+ S3's "eventual consistency" model) set it to None.
+
By default the path separator is "/", but this can be overridden
by specifying the keyword 'separator' in the constructor.
"""
- self._bucketName = bucket
+ self._bucket_name = bucket
self._separator = separator
+ self._key_sync_timeout = key_sync_timeout
self._s3conn = boto.s3.connection.S3Connection(aws_access_key,aws_secret_key)
self._s3bukt = self._s3conn.create_bucket(bucket)
# Normalise prefix to this form: path/to/files/
@@ -64,16 +68,17 @@ class S3FS(FS):
prefix = prefix[1:]
if not prefix.endswith(separator):
prefix = prefix + separator
- self._prefixStr = prefix
+ self._prefix = prefix
FS.__init__(self, thread_syncronize=thread_syncronize)
def __str__(self):
- return '<S3FS: %s:%s>' % (self._bucketName,self._prefixStr)
+ return '<S3FS: %s:%s>' % (self._bucket_name,self._prefix)
+
__repr__ = __str__
def _s3path(self,path):
"""Get the absolute path to a file stored in S3."""
- path = self._prefixStr + path
+ path = self._prefix + path
path = self._separator.join(self._pathbits(path))
return path
@@ -83,6 +88,41 @@ class S3FS(FS):
if bit and bit != ".":
yield bit
+ def _sync_key(self,k):
+ """Synchronise on contents of the given key.
+
+ Since S3 only offers "eventual consistency" of data, it is possible
+ to create a key but be unable to read it back straight away. This
+ method works around that limitation by polling the key until it reads
+ back the value expected by the given key.
+
+ Note that this could easily fail if the key is modified by another
+ program, meaning the content will never be as specified in the given
+ key. This is the reason for the timeout argument to the constructor.
+ """
+ timeout = self._key_sync_timeout
+ if timeout is None:
+ return k
+ k2 = self._s3bukt.get_key(k.name)
+ t = time.time()
+ while k2 is None or k2.etag != k.etag:
+ if timeout > 0:
+ if t + timeout < time.time():
+ break
+ time.sleep(0.1)
+ k2 = self._s3bukt.get_key(k.name)
+ return k2
+
+ def _sync_set_contents(self,key,contents):
+ """Synchronously set the contents of a key."""
+ if isinstance(key,basestring):
+ key = self._s3bukt.new_key(key)
+ if isinstance(contents,basestring):
+ key.set_contents_from_string(contents)
+ else:
+ key.set_contents_from_file(contents)
+ return self._sync_key(key)
+
def open(self,path,mode="r"):
"""Open the named file in the given mode.
@@ -91,19 +131,22 @@ class S3FS(FS):
file are only sent back to S3 when the file is flushed or closed.
"""
tf = TempFile()
- oldLM = None
s3path = self._s3path(path)
- if "r" in mode or "+" in mode or "a" in mode:
- # Get the file contents into the tempfile.
- # If it does not exist and has been opened for writing, create it.
+ # Truncate the file if requested
+ if "w" in mode:
+ k = self._sync_set_contents(s3path,"")
+ else:
k = self._s3bukt.get_key(s3path)
- if k is None:
- if "w" not in mode and "a" not in mode:
- raise ResourceNotFoundError("NO_FILE",path)
- if not self.isdir(dirname(path)):
- raise OperationFailedError("OPEN_FAILED", path,msg="Parent directory does not exist")
- else:
- oldLM = k.last_modified
+ if k is None:
+ # Create the file if it's missing
+ if "w" not in mode and "a" not in mode:
+ raise ResourceNotFoundError("NO_FILE",path)
+ if not self.isdir(dirname(path)):
+ raise OperationFailedError("OPEN_FAILED", path,msg="Parent directory does not exist")
+ k = self._sync_set_contents(s3path,"")
+ else:
+ # Get the file contents into the tempfile.
+ if "r" in mode or "+" in mode or "a" in mode:
k.get_contents_to_file(tf)
if "a" not in mode:
tf.seek(0)
@@ -111,22 +154,16 @@ class S3FS(FS):
if "w" in mode or "a" in mode or "+" in mode:
oldflush = tf.flush
oldclose = tf.close
- def upload():
- tf.seek(0)
- k = self._s3bukt.new_key(s3path)
- k.set_contents_from_file(tf)
- k = self._s3bukt.get_key(s3path)
- while k.last_modified == oldLM:
- time.sleep(0.1)
- k = self._s3bukt.get_key(s3path)
def newflush():
oldflush()
pos = tf.tell()
- upload()
+ tf.seek(0)
+ self._sync_set_contents(k,tf)
tf.seek(pos)
def newclose():
oldflush()
- upload()
+ tf.seek(0)
+ self._sync_set_contents(k,tf)
oldclose()
tf.close = newclose
tf.flush = newflush
@@ -137,7 +174,7 @@ class S3FS(FS):
s3path = self._s3path(path)
s3pathD = s3path + self._separator
# The root directory always exists
- if self._prefixStr.startswith(s3path):
+ if self._prefix.startswith(s3path):
return True
ks = self._s3bukt.list(prefix=s3path,delimiter=self._separator)
for k in ks:
@@ -153,7 +190,7 @@ class S3FS(FS):
"""Check whether a path exists and is a directory."""
s3path = self._s3path(path) + self._separator
# Root is always a directory
- if s3path == self._prefixStr:
+ if s3path == self._prefix:
return True
# Use a list request so that we return true if there are any files
# in that directory. This avoids requiring a special file for the
@@ -167,7 +204,7 @@ class S3FS(FS):
"""Check whether a path exists and is a regular file."""
s3path = self._s3path(path)
# Root is never a file
- if self._prefixStr.startswith(s3path):
+ if self._prefix.startswith(s3path):
return False
k = self._s3bukt.get_key(s3path)
if k is not None:
@@ -193,7 +230,8 @@ class S3FS(FS):
nm = nm.encode()
paths.append(nm)
if not isDir:
- if s3path != self._prefixStr:
+ if s3path != self._prefix:
+ print "NOT A DIR:", s3path
raise OperationFailedError("LISTDIR_FAILED",path)
return self._listdir_helper(path,paths,wildcard,full,absolute,hidden,dirs_only,files_only)
@@ -233,14 +271,14 @@ class S3FS(FS):
"""
s3path = self._s3path(path)
s3pathD = s3path + self._separator
- if s3pathD == self._prefixStr:
+ if s3pathD == self._prefix:
if allow_recreate:
return
raise OperationFailedError("MAKEDIR_FAILED", path, msg="Can not create a directory that already exists (try allow_recreate=True): %(path)s")
s3pathP = self._s3path(dirname(path[:-1])) + self._separator
# Check various preconditions using list of parent dir
ks = self._s3bukt.list(prefix=s3pathP,delimiter=self._separator)
- if s3pathP == self._prefixStr:
+ if s3pathP == self._prefix:
parentExists = True
else:
parentExists = False
@@ -263,12 +301,7 @@ class S3FS(FS):
raise OperationFailedError("MAKEDIR_FAILED",path, msg="Parent directory does not exist: %(path)s")
# Create an empty file representing the directory
# TODO: is there some standard scheme for representing empty dirs?
- k = self._s3bukt.new_key(s3pathD)
- k.set_contents_from_string("")
- k = self._s3bukt.get_key(s3pathD)
- while not k:
- time.sleep(0.1)
- k = self._s3bukt.get_key(s3pathD)
+ self._sync_set_contents(s3pathD,"")
def remove(self,path):
"""Remove the file at the given path."""
@@ -349,16 +382,14 @@ class S3FS(FS):
# OK, now we can copy the file.
s3path_src = self._s3path(src)
try:
- self._s3bukt.copy_key(s3path_dst,self._bucketName,s3path_src)
+ self._s3bukt.copy_key(s3path_dst,self._bucket_name,s3path_src)
except S3ResponseError, e:
if "404 Not Found" in str(e):
raise ResourceInvalid("WRONG_TYPE", src, msg="Source is not a file: %(path)s")
raise e
else:
k = self._s3bukt.get_key(s3path_dst)
- while not k:
- time.sleep(0.1)
- k = self._s3bukt.get_key(s3path_dst)
+ self._sync_key(k)
def move(self,src,dst,chunk_size=16384):
"""Move a file from one location to another."""