diff options
author | rfkelly0 <rfkelly0@67cdc799-7952-0410-af00-57a81ceafa0f> | 2009-01-16 01:46:32 +0000 |
---|---|---|
committer | rfkelly0 <rfkelly0@67cdc799-7952-0410-af00-57a81ceafa0f> | 2009-01-16 01:46:32 +0000 |
commit | d59c7059aae5ee34850cb5800c0b271ed01f98bf (patch) | |
tree | 3bef569aca9aeb04499259efc14fa423eb47a4fe | |
parent | 53830923df0d66c6aad4c7df0f99b45626e26ed4 (diff) | |
download | pyfilesystem-d59c7059aae5ee34850cb5800c0b271ed01f98bf.tar.gz |
make open(path,"w") immediately create an empty file in S3FS/RPCFS
git-svn-id: http://pyfilesystem.googlecode.com/svn/trunk@111 67cdc799-7952-0410-af00-57a81ceafa0f
-rw-r--r-- | fs/rpcfs.py | 37 | ||||
-rw-r--r-- | fs/s3fs.py | 131 |
2 files changed, 101 insertions, 67 deletions
diff --git a/fs/rpcfs.py b/fs/rpcfs.py index 4d2a676..ced3b7a 100644 --- a/fs/rpcfs.py +++ b/fs/rpcfs.py @@ -39,19 +39,6 @@ class ObjProxy: def __getattr__(self,attr): return getattr(self._obj,attr) -class ReRaiseErrors: - """XML-RPC proxy wrapper that tries to re-raise Faults as actual errors.""" - def __init__(self,proxy): - self._proxy = proxy - def __getattr__(self,attr): - val = getattr(self._proxy,attr) - if not callable(val): - return val - def func(*args,**kwds): - try: - val(*args,**kwds) - except xmlrpclib.Fault, e: - print "ERROR", e, e.faultCode, e.faultString def re_raise_faults(func): """Decorator to re-raise XML-RPC faults as proper exceptions.""" @@ -108,6 +95,7 @@ class ReRaiseFaults: val = getattr(self._obj,attr) if callable(val): val = re_raise_faults(val) + self.__dict__[attr] = val return val @@ -128,7 +116,7 @@ class RPCFS(FS): The only required argument is the uri of the server to connect to. This will be passed to the underlying XML-RPC server proxy - object along with any other arguments if they are provided. + object along with the 'transport' argument if it is provided. """ self.uri = uri if transport is not None: @@ -144,7 +132,9 @@ class RPCFS(FS): def open(self,path,mode): # TODO: chunked transport of large files - if "r" in mode or "a" in mode: + if "w" in mode: + self.proxy.set_contents(path,xmlrpclib.Binary("")) + if "r" in mode or "a" in mode or "+" in mode: try: data = self.proxy.get_contents(path).data except IOError: @@ -152,6 +142,7 @@ class RPCFS(FS): raise ResourceNotFoundError("NO_FILE",path) if not self.isdir(dirname(path)): raise OperationFailedError("OPEN_FAILED", path,msg="Parent directory does not exist") + self.proxy.set_contents(path,xmlrpclib.Binary("")) else: data = "" f = ObjProxy(StringIO(data)) @@ -159,11 +150,15 @@ class RPCFS(FS): f.seek(0,0) else: f.seek(0,2) + oldflush = f.flush oldclose = f.close + def newflush(): + oldflush() + self.proxy.set_contents(path,xmlrpclib.Binary(f.getvalue())) def newclose(): f.flush() - self.proxy.set_contents(path,xmlrpclib.Binary(f.getvalue())) oldclose() + f.flush = newflush f.close = newclose return f @@ -217,7 +212,11 @@ class RPCFS(FS): class RPCFSInterface(object): - """Wrapper to expose an FS via a XML-RPC compatible interface.""" + """Wrapper to expose an FS via a XML-RPC compatible interface. + + The only real trick is using xmlrpclib.Binary objects to trasnport + the contents of files. + """ def __init__(self,fs): self.fs = fs @@ -281,12 +280,16 @@ class RPCFSInterface(object): class RPCFSServer(SimpleXMLRPCServer): """Server to expose an FS object via XML-RPC. + This class takes as its first argument an FS instance, and as its second + argument a (hostname,port) tuple on which to listen for XML-RPC requests. Example: fs = OSFS('/var/srv/myfiles') s = RPCFSServer(fs,("",8080)) s.serve_forever() + To cleanly shut down the server after calling serve_forever, set the + attribute "serve_more_requests" to False. """ def __init__(self,fs,addr,requestHandler=None,logRequests=None): @@ -20,12 +20,7 @@ except ImportError: from fs.base import * from fs.helpers import * -try: - from collections import MutableMapping as DictMixin -except ImportError: - from UserDict import DictMixin - - + class S3FS(FS): """A filesystem stored in Amazon S3. @@ -35,15 +30,16 @@ class S3FS(FS): be stored. Local temporary files are used when opening files from this filesystem, - and any changes are only pushed back into S3 when the files are closed. + and any changes are only pushed back into S3 when the files are closed + or flushed. """ class meta: PATH_MAX = None NAME_MAX = None - def __init__(self, bucket, prefix="", aws_access_key=None, aws_secret_key=None, separator="/", thread_syncronize=True): - """Constructor for S3SF objects. + def __init__(self, bucket, prefix="", aws_access_key=None, aws_secret_key=None, separator="/", thread_syncronize=True,key_sync_timeout=1): + """Constructor for S3FS objects. S3FS objects required the name of the S3 bucket in which to store files, and can optionally be given a prefix under which the files @@ -52,11 +48,19 @@ class S3FS(FS): read from the two environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY. + The keyword argument 'key_sync_timeout' specifies the maximum + time in seconds that the filesystem will spend trying to confirm + that a newly-uploaded S3 key is available for reading. For no + timeout set it to zero. To disable these checks entirely (and + thus reduce the filesystem's consistency guarantees to those of + S3's "eventual consistency" model) set it to None. + By default the path separator is "/", but this can be overridden by specifying the keyword 'separator' in the constructor. """ - self._bucketName = bucket + self._bucket_name = bucket self._separator = separator + self._key_sync_timeout = key_sync_timeout self._s3conn = boto.s3.connection.S3Connection(aws_access_key,aws_secret_key) self._s3bukt = self._s3conn.create_bucket(bucket) # Normalise prefix to this form: path/to/files/ @@ -64,16 +68,17 @@ class S3FS(FS): prefix = prefix[1:] if not prefix.endswith(separator): prefix = prefix + separator - self._prefixStr = prefix + self._prefix = prefix FS.__init__(self, thread_syncronize=thread_syncronize) def __str__(self): - return '<S3FS: %s:%s>' % (self._bucketName,self._prefixStr) + return '<S3FS: %s:%s>' % (self._bucket_name,self._prefix) + __repr__ = __str__ def _s3path(self,path): """Get the absolute path to a file stored in S3.""" - path = self._prefixStr + path + path = self._prefix + path path = self._separator.join(self._pathbits(path)) return path @@ -83,6 +88,41 @@ class S3FS(FS): if bit and bit != ".": yield bit + def _sync_key(self,k): + """Synchronise on contents of the given key. + + Since S3 only offers "eventual consistency" of data, it is possible + to create a key but be unable to read it back straight away. This + method works around that limitation by polling the key until it reads + back the expected by the given key. + + Note that this could easily fail if the key is modified by another + program, meaning the content will never be as specified in the given + key. This is the reason for the timeout argument to the construtcor. + """ + timeout = self._key_sync_timeout + if timeout is None: + return k + k2 = self._s3bukt.get_key(k.name) + t = time.time() + while k2 is None or k2.etag != k.etag: + if timeout > 0: + if t + timeout < time.time(): + break + time.sleep(0.1) + k2 = self._s3bukt.get_key(k.name) + return k2 + + def _sync_set_contents(self,key,contents): + """Synchronously set the contents of a key.""" + if isinstance(key,basestring): + key = self._s3bukt.new_key(key) + if isinstance(contents,basestring): + key.set_contents_from_string(contents) + else: + key.set_contents_from_file(contents) + return self._sync_key(key) + def open(self,path,mode="r"): """Open the named file in the given mode. @@ -91,19 +131,22 @@ class S3FS(FS): file are only sent back to S3 when the file is flushed or closed. """ tf = TempFile() - oldLM = None s3path = self._s3path(path) - if "r" in mode or "+" in mode or "a" in mode: - # Get the file contents into the tempfile. - # If it does not exist and has been opened for writing, create it. + # Truncate the file if requested + if "w" in mode: + k = self._sync_set_contents(s3path,"") + else: k = self._s3bukt.get_key(s3path) - if k is None: - if "w" not in mode and "a" not in mode: - raise ResourceNotFoundError("NO_FILE",path) - if not self.isdir(dirname(path)): - raise OperationFailedError("OPEN_FAILED", path,msg="Parent directory does not exist") - else: - oldLM = k.last_modified + if k is None: + # Create the file if it's missing + if "w" not in mode and "a" not in mode: + raise ResourceNotFoundError("NO_FILE",path) + if not self.isdir(dirname(path)): + raise OperationFailedError("OPEN_FAILED", path,msg="Parent directory does not exist") + k = self._sync_set_contents(s3path,"") + else: + # Get the file contents into the tempfile. + if "r" in mode or "+" in mode or "a" in mode: k.get_contents_to_file(tf) if "a" not in mode: tf.seek(0) @@ -111,22 +154,16 @@ class S3FS(FS): if "w" in mode or "a" in mode or "+" in mode: oldflush = tf.flush oldclose = tf.close - def upload(): - tf.seek(0) - k = self._s3bukt.new_key(s3path) - k.set_contents_from_file(tf) - k = self._s3bukt.get_key(s3path) - while k.last_modified == oldLM: - time.sleep(0.1) - k = self._s3bukt.get_key(s3path) def newflush(): oldflush() pos = tf.tell() - upload() + tf.seek(0) + self._sync_set_contents(k,tf) tf.seek(pos) def newclose(): oldflush() - upload() + tf.seek(0) + self._sync_set_contents(k,tf) oldclose() tf.close = newclose tf.flush = newflush @@ -137,7 +174,7 @@ class S3FS(FS): s3path = self._s3path(path) s3pathD = s3path + self._separator # The root directory always exists - if self._prefixStr.startswith(s3path): + if self._prefix.startswith(s3path): return True ks = self._s3bukt.list(prefix=s3path,delimiter=self._separator) for k in ks: @@ -153,7 +190,7 @@ class S3FS(FS): """Check whether a path exists and is a directory.""" s3path = self._s3path(path) + self._separator # Root is always a directory - if s3path == self._prefixStr: + if s3path == self._prefix: return True # Use a list request so that we return true if there are any files # in that directory. This avoids requiring a special file for the @@ -167,7 +204,7 @@ class S3FS(FS): """Check whether a path exists and is a regular file.""" s3path = self._s3path(path) # Root is never a file - if self._prefixStr.startswith(s3path): + if self._prefix.startswith(s3path): return False k = self._s3bukt.get_key(s3path) if k is not None: @@ -193,7 +230,8 @@ class S3FS(FS): nm = nm.encode() paths.append(nm) if not isDir: - if s3path != self._prefixStr: + if s3path != self._prefix: + print "NOT A DIR:", s3path raise OperationFailedError("LISTDIR_FAILED",path) return self._listdir_helper(path,paths,wildcard,full,absolute,hidden,dirs_only,files_only) @@ -233,14 +271,14 @@ class S3FS(FS): """ s3path = self._s3path(path) s3pathD = s3path + self._separator - if s3pathD == self._prefixStr: + if s3pathD == self._prefix: if allow_recreate: return raise OperationFailedError("MAKEDIR_FAILED", path, msg="Can not create a directory that already exists (try allow_recreate=True): %(path)s") s3pathP = self._s3path(dirname(path[:-1])) + self._separator # Check various preconditions using list of parent dir ks = self._s3bukt.list(prefix=s3pathP,delimiter=self._separator) - if s3pathP == self._prefixStr: + if s3pathP == self._prefix: parentExists = True else: parentExists = False @@ -263,12 +301,7 @@ class S3FS(FS): raise OperationFailedError("MAKEDIR_FAILED",path, msg="Parent directory does not exist: %(path)s") # Create an empty file representing the directory # TODO: is there some standard scheme for representing empty dirs? - k = self._s3bukt.new_key(s3pathD) - k.set_contents_from_string("") - k = self._s3bukt.get_key(s3pathD) - while not k: - time.sleep(0.1) - k = self._s3bukt.get_key(s3pathD) + self._sync_set_contents(s3pathD,"") def remove(self,path): """Remove the file at the given path.""" @@ -349,16 +382,14 @@ class S3FS(FS): # OK, now we can copy the file. s3path_src = self._s3path(src) try: - self._s3bukt.copy_key(s3path_dst,self._bucketName,s3path_src) + self._s3bukt.copy_key(s3path_dst,self._bucket_name,s3path_src) except S3ResponseError, e: if "404 Not Found" in str(e): raise ResourceInvalid("WRONG_TYPE", src, msg="Source is not a file: %(path)s") raise e else: k = self._s3bukt.get_key(s3path_dst) - while not k: - time.sleep(0.1) - k = self._s3bukt.get_key(s3path_dst) + self._sync_key(k) def move(self,src,dst,chunk_size=16384): """Move a file from one location to another.""" |