diff options
author | Andrew Scheller <github@loowis.durge.org> | 2016-07-04 15:14:39 +0100 |
---|---|---|
committer | Andrew Scheller <github@loowis.durge.org> | 2016-07-04 15:14:39 +0100 |
commit | 119fd3e8d8c0297e1103c7d5e128edbaf737a006 (patch) | |
tree | 0e85792028f63251b6f57c27c476d0280e6a293e /fs/expose | |
parent | 249342f38762b223b7846526e4637553987f4006 (diff) | |
download | pyfilesystem-git-119fd3e8d8c0297e1103c7d5e128edbaf737a006.tar.gz |
Minor whitespace tidyups
* convert all files to UNIX line-endings
* strip trailing whitespace
Diffstat (limited to 'fs/expose')
-rw-r--r-- | fs/expose/django_storage.py | 2 | ||||
-rw-r--r-- | fs/expose/fuse/fuse.py | 156 | ||||
-rw-r--r-- | fs/expose/fuse/fuse_ctypes.py | 162 | ||||
-rw-r--r-- | fs/expose/importhook.py | 2 | ||||
-rw-r--r-- | fs/expose/serve/packetstream.py | 120 | ||||
-rw-r--r-- | fs/expose/serve/server.py | 86 | ||||
-rw-r--r-- | fs/expose/serve/threadpool.py | 58 |
7 files changed, 293 insertions, 293 deletions
diff --git a/fs/expose/django_storage.py b/fs/expose/django_storage.py index 9a84b2a..636c3d5 100644 --- a/fs/expose/django_storage.py +++ b/fs/expose/django_storage.py @@ -28,7 +28,7 @@ class FSStorage(Storage): """ :param fs: an FS object :param base_url: The url to prepend to the path - + """ if fs is None: fs = settings.DEFAULT_FILE_STORAGE_FS diff --git a/fs/expose/fuse/fuse.py b/fs/expose/fuse/fuse.py index e15a5d5..ed1fbc4 100644 --- a/fs/expose/fuse/fuse.py +++ b/fs/expose/fuse/fuse.py @@ -113,7 +113,7 @@ elif _system == 'Linux': c_uid_t = c_uint setxattr_t = CFUNCTYPE(c_int, c_char_p, c_char_p, POINTER(c_byte), c_size_t, c_int) getxattr_t = CFUNCTYPE(c_int, c_char_p, c_char_p, POINTER(c_byte), c_size_t) - + if _machine == 'x86_64': c_stat._fields_ = [ ('st_dev', c_dev_t), @@ -294,12 +294,12 @@ class FUSE(object): """This class is the lower level interface and should not be subclassed under normal use. Its methods are called by fuse. Assumes API version 2.6 or later.""" - + def __init__(self, operations, mountpoint, raw_fi=False, **kwargs): """Setting raw_fi to True will cause FUSE to pass the fuse_file_info class as is to Operations, instead of just the fh field. This gives you access to direct_io, keep_cache, etc.""" - + self.operations = operations self.raw_fi = raw_fi args = ['fuse'] @@ -315,7 +315,7 @@ class FUSE(object): for key, val in kwargs.items())) args.append(mountpoint) argv = (c_char_p * len(args))(*args) - + fuse_ops = fuse_operations() for name, prototype in fuse_operations._fields_: if prototype != c_voidp and getattr(operations, name, None): @@ -326,7 +326,7 @@ class FUSE(object): del self.operations # Invoke the destructor if err: raise RuntimeError(err) - + def _wrapper_(self, func, *args, **kwargs): """Decorator for the methods that follow""" try: @@ -336,40 +336,40 @@ class FUSE(object): except: print_exc() return -EFAULT - + def getattr(self, path, buf): return self.fgetattr(path, buf, None) - + def readlink(self, path, buf, bufsize): ret = self.operations('readlink', path) data = create_string_buffer(ret[:bufsize - 1]) memmove(buf, data, len(data)) return 0 - + def mknod(self, path, mode, dev): return self.operations('mknod', path, mode, dev) - + def mkdir(self, path, mode): return self.operations('mkdir', path, mode) - + def unlink(self, path): return self.operations('unlink', path) - + def rmdir(self, path): return self.operations('rmdir', path) - + def symlink(self, source, target): return self.operations('symlink', target, source) - + def rename(self, old, new): return self.operations('rename', old, new) - + def link(self, source, target): return self.operations('link', target, source) - + def chmod(self, path, mode): return self.operations('chmod', path, mode) - + def chown(self, path, uid, gid): # Check if any of the arguments is a -1 that has overflowed if c_uid_t(uid + 1).value == 0: @@ -377,10 +377,10 @@ class FUSE(object): if c_gid_t(gid + 1).value == 0: gid = -1 return self.operations('chown', path, uid, gid) - + def truncate(self, path, length): return self.operations('truncate', path, length) - + def open(self, path, fip): fi = fip.contents if self.raw_fi: @@ -388,7 +388,7 @@ class FUSE(object): else: fi.fh = self.operations('open', path, fi.flags) return 0 - + def read(self, path, buf, size, offset, fip): fh = fip.contents if self.raw_fi else fip.contents.fh ret = self.operations('read', path, size, offset, fh) @@ -397,12 +397,12 @@ class FUSE(object): data = create_string_buffer(ret[:size], size) memmove(buf, data, size) return size - + def write(self, path, buf, size, offset, fip): data = string_at(buf, size) fh = fip.contents if self.raw_fi else fip.contents.fh return self.operations('write', path, data, offset, fh) - + def statfs(self, path, buf): stv = buf.contents attrs = self.operations('statfs', path) @@ -410,23 +410,23 @@ class FUSE(object): if hasattr(stv, key): setattr(stv, key, val) return 0 - + def flush(self, path, fip): fh = fip.contents if self.raw_fi else fip.contents.fh return self.operations('flush', path, fh) - + def release(self, path, fip): fh = fip.contents if self.raw_fi else fip.contents.fh return self.operations('release', path, fh) - + def fsync(self, path, datasync, fip): fh = fip.contents if self.raw_fi else fip.contents.fh return self.operations('fsync', path, datasync, fh) - + def setxattr(self, path, name, value, size, options, *args): data = string_at(value, size) return self.operations('setxattr', path, name, data, options, *args) - + def getxattr(self, path, name, value, size, *args): ret = self.operations('getxattr', path, name, *args) retsize = len(ret) @@ -436,7 +436,7 @@ class FUSE(object): return -ERANGE memmove(value, buf, retsize) return retsize - + def listxattr(self, path, namebuf, size): ret = self.operations('listxattr', path) buf = create_string_buffer('\x00'.join(ret)) if ret else '' @@ -446,15 +446,15 @@ class FUSE(object): return -ERANGE memmove(namebuf, buf, bufsize) return bufsize - + def removexattr(self, path, name): return self.operations('removexattr', path, name) - + def opendir(self, path, fip): # Ignore raw_fi fip.contents.fh = self.operations('opendir', path) return 0 - + def readdir(self, path, buf, filler, offset, fip): # Ignore raw_fi for item in self.operations('readdir', path, fip.contents.fh): @@ -470,24 +470,24 @@ class FUSE(object): if filler(buf, name, st, offset) != 0: break return 0 - + def releasedir(self, path, fip): # Ignore raw_fi return self.operations('releasedir', path, fip.contents.fh) - + def fsyncdir(self, path, datasync, fip): # Ignore raw_fi return self.operations('fsyncdir', path, datasync, fip.contents.fh) - + def init(self, conn): return self.operations('init', '/') - + def destroy(self, private_data): return self.operations('destroy', '/') - + def access(self, path, amode): return self.operations('access', path, amode) - + def create(self, path, mode, fip): fi = fip.contents if self.raw_fi: @@ -495,11 +495,11 @@ class FUSE(object): else: fi.fh = self.operations('create', path, mode) return 0 - + def ftruncate(self, path, length, fip): fh = fip.contents if self.raw_fi else fip.contents.fh return self.operations('truncate', path, length, fh) - + def fgetattr(self, path, buf, fip): memset(buf, 0, sizeof(c_stat)) st = buf.contents @@ -507,11 +507,11 @@ class FUSE(object): attrs = self.operations('getattr', path, fh) set_st_attrs(st, attrs) return 0 - + def lock(self, path, fip, cmd, lock): fh = fip.contents if self.raw_fi else fip.contents.fh return self.operations('lock', path, fh, cmd, lock) - + def utimens(self, path, buf): if buf: atime = time_of_timespec(buf.contents.actime) @@ -520,7 +520,7 @@ class FUSE(object): else: times = None return self.operations('utimens', path, times) - + def bmap(self, path, blocksize, idx): return self.operations('bmap', path, blocksize, idx) @@ -529,46 +529,46 @@ class Operations(object): """This class should be subclassed and passed as an argument to FUSE on initialization. All operations should raise a FuseOSError exception on error. - + When in doubt of what an operation should do, check the FUSE header file or the corresponding system call man page.""" - + def __call__(self, op, *args): if not hasattr(self, op): raise FuseOSError(EFAULT) return getattr(self, op)(*args) - + def access(self, path, amode): return 0 - + bmap = None - + def chmod(self, path, mode): raise FuseOSError(EROFS) - + def chown(self, path, uid, gid): raise FuseOSError(EROFS) - + def create(self, path, mode, fi=None): """When raw_fi is False (default case), fi is None and create should return a numerical file handle. When raw_fi is True the file handle should be set directly by create and return 0.""" raise FuseOSError(EROFS) - + def destroy(self, path): """Called on filesystem destruction. Path is always /""" pass - + def flush(self, path, fh): return 0 - + def fsync(self, path, datasync, fh): return 0 - + def fsyncdir(self, path, datasync, fh): return 0 - + def getattr(self, path, fh=None): """Returns a dictionary with keys identical to the stat C structure of stat(2). @@ -576,33 +576,33 @@ class Operations(object): NOTE: There is an incombatibility between Linux and Mac OS X concerning st_nlink of directories. Mac OS X counts all files inside the directory, while Linux counts only the subdirectories.""" - + if path != '/': raise FuseOSError(ENOENT) return dict(st_mode=(S_IFDIR | 0755), st_nlink=2) - + def getxattr(self, path, name, position=0): raise FuseOSError(ENOTSUP) - + def init(self, path): """Called on filesystem initialization. Path is always / Use it instead of __init__ if you start threads on initialization.""" pass - + def link(self, target, source): raise FuseOSError(EROFS) - + def listxattr(self, path): return [] - + lock = None - + def mkdir(self, path, mode): raise FuseOSError(EROFS) - + def mknod(self, path, mode, dev): raise FuseOSError(EROFS) - + def open(self, path, flags): """When raw_fi is False (default case), open should return a numerical file handle. @@ -610,60 +610,60 @@ class Operations(object): open(self, path, fi) and the file handle should be set directly.""" return 0 - + def opendir(self, path): """Returns a numerical file handle.""" return 0 - + def read(self, path, size, offset, fh): """Returns a string containing the data requested.""" raise FuseOSError(EIO) - + def readdir(self, path, fh): """Can return either a list of names, or a list of (name, attrs, offset) tuples. attrs is a dict as in getattr.""" return ['.', '..'] - + def readlink(self, path): raise FuseOSError(ENOENT) - + def release(self, path, fh): return 0 - + def releasedir(self, path, fh): return 0 - + def removexattr(self, path, name): raise FuseOSError(ENOTSUP) - + def rename(self, old, new): raise FuseOSError(EROFS) - + def rmdir(self, path): raise FuseOSError(EROFS) - + def setxattr(self, path, name, value, options, position=0): raise FuseOSError(ENOTSUP) - + def statfs(self, path): """Returns a dictionary with keys identical to the statvfs C structure of statvfs(3). On Mac OS X f_bsize and f_frsize must be a power of 2 (minimum 512).""" return {} - + def symlink(self, target, source): raise FuseOSError(EROFS) - + def truncate(self, path, length, fh=None): raise FuseOSError(EROFS) - + def unlink(self, path): raise FuseOSError(EROFS) - + def utimens(self, path, times=None): """Times is a (atime, mtime) tuple. If None use current time.""" return 0 - + def write(self, path, data, offset, fh): raise FuseOSError(EROFS) diff --git a/fs/expose/fuse/fuse_ctypes.py b/fs/expose/fuse/fuse_ctypes.py index 375a819..4e247f0 100644 --- a/fs/expose/fuse/fuse_ctypes.py +++ b/fs/expose/fuse/fuse_ctypes.py @@ -1,9 +1,9 @@ # Copyright (c) 2008 Giorgos Verigakis <verigak@gmail.com> -# +# # Permission to use, copy, modify, and distribute this software for any # purpose with or without fee is hereby granted, provided that the above # copyright notice and this permission notice appear in all copies. -# +# # THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES # WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF # MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR @@ -104,7 +104,7 @@ if _system in ('Darwin', 'Darwin-MacFuse', 'FreeBSD'): ('st_size', c_off_t), ('st_blocks', c_int64), ('st_blksize', c_int32)] - + elif _system == 'Linux': ENOTSUP = 95 c_dev_t = c_ulonglong @@ -117,7 +117,7 @@ elif _system == 'Linux': c_uid_t = c_uint setxattr_t = CFUNCTYPE(c_int, c_char_p, c_char_p, POINTER(c_byte), c_size_t, c_int) getxattr_t = CFUNCTYPE(c_int, c_char_p, c_char_p, POINTER(c_byte), c_size_t) - + _machine = machine() if _machine == 'x86_64': c_stat._fields_ = [ @@ -296,12 +296,12 @@ class FUSE(object): """This class is the lower level interface and should not be subclassed under normal use. Its methods are called by fuse. Assumes API version 2.6 or later.""" - + def __init__(self, operations, mountpoint, raw_fi=False, **kwargs): """Setting raw_fi to True will cause FUSE to pass the fuse_file_info class as is to Operations, instead of just the fh field. This gives you access to direct_io, keep_cache, etc.""" - + self.operations = operations self.raw_fi = raw_fi args = ['fuse'] @@ -317,7 +317,7 @@ class FUSE(object): for key, val in kwargs.items())) args.append(mountpoint) argv = (c_char_p * len(args))(*args) - + fuse_ops = fuse_operations() for name, prototype in fuse_operations._fields_: if prototype != c_voidp and getattr(operations, name, None): @@ -326,7 +326,7 @@ class FUSE(object): _libfuse.fuse_main_real(len(args), argv, pointer(fuse_ops), sizeof(fuse_ops), None) del self.operations # Invoke the destructor - + def _wrapper_(self, func, *args, **kwargs): """Decorator for the methods that follow""" try: @@ -336,46 +336,46 @@ class FUSE(object): except: print_exc() return -EFAULT - + def getattr(self, path, buf): return self.fgetattr(path, buf, None) - + def readlink(self, path, buf, bufsize): ret = self.operations('readlink', path) data = create_string_buffer(ret[:bufsize - 1]) memmove(buf, data, len(data)) return 0 - + def mknod(self, path, mode, dev): return self.operations('mknod', path, mode, dev) - + def mkdir(self, path, mode): return self.operations('mkdir', path, mode) - + def unlink(self, path): return self.operations('unlink', path) - + def rmdir(self, path): return self.operations('rmdir', path) - + def symlink(self, source, target): return self.operations('symlink', target, source) - + def rename(self, old, new): return self.operations('rename', old, new) - + def link(self, source, target): return self.operations('link', target, source) - + def chmod(self, path, mode): return self.operations('chmod', path, mode) - + def chown(self, path, uid, gid): return self.operations('chown', path, uid, gid) - + def truncate(self, path, length): return self.operations('truncate', path, length) - + def open(self, path, fip): fi = fip.contents if self.raw_fi: @@ -383,7 +383,7 @@ class FUSE(object): else: fi.fh = self.operations('open', path, fi.flags) return 0 - + def read(self, path, buf, size, offset, fip): fh = fip.contents if self.raw_fi else fip.contents.fh ret = self.operations('read', path, size, offset, fh) @@ -391,12 +391,12 @@ class FUSE(object): strbuf = create_string_buffer(ret) memmove(buf, strbuf, len(strbuf)) return len(ret) - + def write(self, path, buf, size, offset, fip): data = string_at(buf, size) fh = fip.contents if self.raw_fi else fip.contents.fh return self.operations('write', path, data, offset, fh) - + def statfs(self, path, buf): stv = buf.contents attrs = self.operations('statfs', path) @@ -404,23 +404,23 @@ class FUSE(object): if hasattr(stv, key): setattr(stv, key, val) return 0 - + def flush(self, path, fip): fh = fip.contents if self.raw_fi else fip.contents.fh return self.operations('flush', path, fh) - + def release(self, path, fip): fh = fip.contents if self.raw_fi else fip.contents.fh return self.operations('release', path, fh) - + def fsync(self, path, datasync, fip): fh = fip.contents if self.raw_fi else fip.contents.fh return self.operations('fsync', path, datasync, fh) - + def setxattr(self, path, name, value, size, options, *args): data = string_at(value, size) return self.operations('setxattr', path, name, data, options, *args) - + def getxattr(self, path, name, value, size, *args): ret = self.operations('getxattr', path, name, *args) retsize = len(ret) @@ -430,7 +430,7 @@ class FUSE(object): return -ERANGE memmove(value, buf, retsize) return retsize - + def listxattr(self, path, namebuf, size): ret = self.operations('listxattr', path) if ret: @@ -443,15 +443,15 @@ class FUSE(object): return -ERANGE memmove(namebuf, buf, bufsize) return bufsize - + def removexattr(self, path, name): return self.operations('removexattr', path, name) - + def opendir(self, path, fip): # Ignore raw_fi fip.contents.fh = self.operations('opendir', path) return 0 - + def readdir(self, path, buf, filler, offset, fip): # Ignore raw_fi for item in self.operations('readdir', path, fip.contents.fh): @@ -467,24 +467,24 @@ class FUSE(object): if filler(buf, name, st, offset) != 0: break return 0 - + def releasedir(self, path, fip): # Ignore raw_fi return self.operations('releasedir', path, fip.contents.fh) - + def fsyncdir(self, path, datasync, fip): # Ignore raw_fi return self.operations('fsyncdir', path, datasync, fip.contents.fh) - + def init(self, conn): return self.operations('init', '/') - + def destroy(self, private_data): return self.operations('destroy', '/') - + def access(self, path, amode): return self.operations('access', path, amode) - + def create(self, path, mode, fip): fi = fip.contents if self.raw_fi: @@ -492,11 +492,11 @@ class FUSE(object): else: fi.fh = self.operations('create', path, mode) return 0 - + def ftruncate(self, path, length, fip): fh = fip.contents if self.raw_fi else fip.contents.fh return self.operations('truncate', path, length, fh) - + def fgetattr(self, path, buf, fip): memset(buf, 0, sizeof(c_stat)) st = buf.contents @@ -504,11 +504,11 @@ class FUSE(object): attrs = self.operations('getattr', path, fh) set_st_attrs(st, attrs) return 0 - + def lock(self, path, fip, cmd, lock): fh = fip.contents if self.raw_fi else fip.contents.fh return self.operations('lock', path, fh, cmd, lock) - + def utimens(self, path, buf): if buf: atime = time_of_timespec(buf.contents.actime) @@ -517,7 +517,7 @@ class FUSE(object): else: times = None return self.operations('utimens', path, times) - + def bmap(self, path, blocksize, idx): return self.operations('bmap', path, blocksize, idx) @@ -526,46 +526,46 @@ class Operations(object): """This class should be subclassed and passed as an argument to FUSE on initialization. All operations should raise an OSError exception on error. - + When in doubt of what an operation should do, check the FUSE header file or the corresponding system call man page.""" - + def __call__(self, op, *args): if not hasattr(self, op): raise OSError(EFAULT, '') return getattr(self, op)(*args) - + def access(self, path, amode): return 0 - + bmap = None - + def chmod(self, path, mode): raise OSError(EROFS, '') - + def chown(self, path, uid, gid): raise OSError(EROFS, '') - + def create(self, path, mode, fi=None): """When raw_fi is False (default case), fi is None and create should return a numerical file handle. When raw_fi is True the file handle should be set directly by create and return 0.""" raise OSError(EROFS, '') - + def destroy(self, path): """Called on filesystem destruction. Path is always /""" pass - + def flush(self, path, fh): return 0 - + def fsync(self, path, datasync, fh): return 0 - + def fsyncdir(self, path, datasync, fh): return 0 - + def getattr(self, path, fh=None): """Returns a dictionary with keys identical to the stat C structure of stat(2). @@ -573,33 +573,33 @@ class Operations(object): NOTE: There is an incombatibility between Linux and Mac OS X concerning st_nlink of directories. Mac OS X counts all files inside the directory, while Linux counts only the subdirectories.""" - + if path != '/': raise OSError(ENOENT, '') return dict(st_mode=(S_IFDIR | 0755), st_nlink=2) - + def getxattr(self, path, name, position=0): raise OSError(ENOTSUP, '') - + def init(self, path): """Called on filesystem initialization. Path is always / Use it instead of __init__ if you start threads on initialization.""" pass - + def link(self, target, source): raise OSError(EROFS, '') - + def listxattr(self, path): return [] - + lock = None - + def mkdir(self, path, mode): raise OSError(EROFS, '') - + def mknod(self, path, mode, dev): raise OSError(EROFS, '') - + def open(self, path, flags): """When raw_fi is False (default case), open should return a numerical file handle. @@ -607,60 +607,60 @@ class Operations(object): open(self, path, fi) and the file handle should be set directly.""" return 0 - + def opendir(self, path): """Returns a numerical file handle.""" return 0 - + def read(self, path, size, offset, fh): """Returns a string containing the data requested.""" raise OSError(ENOENT, '') - + def readdir(self, path, fh): """Can return either a list of names, or a list of (name, attrs, offset) tuples. attrs is a dict as in getattr.""" return ['.', '..'] - + def readlink(self, path): raise OSError(ENOENT, '') - + def release(self, path, fh): return 0 - + def releasedir(self, path, fh): return 0 - + def removexattr(self, path, name): raise OSError(ENOTSUP, '') - + def rename(self, old, new): raise OSError(EROFS, '') - + def rmdir(self, path): raise OSError(EROFS, '') - + def setxattr(self, path, name, value, options, position=0): raise OSError(ENOTSUP, '') - + def statfs(self, path): """Returns a dictionary with keys identical to the statvfs C structure of statvfs(3). On Mac OS X f_bsize and f_frsize must be a power of 2 (minimum 512).""" return {} - + def symlink(self, target, source): raise OSError(EROFS, '') - + def truncate(self, path, length, fh=None): raise OSError(EROFS, '') - + def unlink(self, path): raise OSError(EROFS, '') - + def utimens(self, path, times=None): """Times is a (atime, mtime) tuple. If None use current time.""" return 0 - + def write(self, path, data, offset, fh): raise OSError(EROFS, '') diff --git a/fs/expose/importhook.py b/fs/expose/importhook.py index e3d2d8e..cc8837f 100644 --- a/fs/expose/importhook.py +++ b/fs/expose/importhook.py @@ -240,4 +240,4 @@ class FSImportHook(object): info = self._get_module_info(fullname) (path,type,ispkg) = info return path - + diff --git a/fs/expose/serve/packetstream.py b/fs/expose/serve/packetstream.py index 6f24fd9..0449937 100644 --- a/fs/expose/serve/packetstream.py +++ b/fs/expose/serve/packetstream.py @@ -12,21 +12,21 @@ def encode(header='', payload=''): def textsize(s): if s: return str(len(s)) - return '' + return '' return '%i,%i:%s%s' % (textsize(header), textsize(payload), header, payload) class FileEncoder(object): - + def __init__(self, f): self.f = f - + def write(self, header='', payload=''): fwrite = self.f.write def textsize(s): if s: return str(len(s)) - return '' + return '' fwrite('%s,%s:' % (textsize(header), textsize(payload))) if header: fwrite(header) @@ -35,11 +35,11 @@ class FileEncoder(object): class JSONFileEncoder(FileEncoder): - + def write(self, header=None, payload=''): if header is None: super(JSONFileEncoder, self).write('', payload) - else: + else: header_json = dumps(header, separators=(',', ':')) super(JSONFileEncoder, self).write(header_json, payload) @@ -51,12 +51,12 @@ class PreludeError(DecoderError): pass class Decoder(object): - + STAGE_PRELUDE, STAGE_SIZE, STAGE_HEADER, STAGE_PAYLOAD = range(4) MAX_PRELUDE = 255 - + def __init__(self, no_prelude=False, prelude_callback=None): - + self.prelude_callback = prelude_callback self.stream_broken = False self.expecting_bytes = None @@ -64,49 +64,49 @@ class Decoder(object): self._prelude = [] self._size = [] self._expecting_bytes = None - + self.header_size = None self.payload_size = None - + self._header_bytes = None self._payload_bytes = None - + self._header_data = [] self._payload_data = [] - + self.header = None self.payload = None - + if no_prelude: self.stage = self.STAGE_SIZE - - + + def feed(self, data): - + if self.stream_broken: raise DecoderError('Stream is broken') - + STAGE_PRELUDE, STAGE_SIZE, STAGE_HEADER, STAGE_PAYLOAD = range(4) - + size_append = self._size.append header_append = self._header_data.append payload_append = self._payload_data.append datafind = data.find - + def reset_packet(): self.expecting_bytes = None del self._header_data[:] del self._payload_data[:] self.header = None self.payload = None - + data_len = len(data) data_pos = 0 - expecting_bytes = self.expecting_bytes + expecting_bytes = self.expecting_bytes stage = self.stage - + if stage == STAGE_PRELUDE: - max_find = min(len(data), data_pos + self.MAX_PRELUDE) + max_find = min(len(data), data_pos + self.MAX_PRELUDE) cr_pos = datafind('\n', data_pos, max_find) if cr_pos == -1: self._prelude.append(data[data_pos:]) @@ -119,53 +119,53 @@ class Decoder(object): if sum(len(s) for s in self._prelude) > self.MAX_PRELUDE: self.stream_broken = True raise PreludeError('Prelude not found') - data_pos = cr_pos + 1 + data_pos = cr_pos + 1 prelude = ''.join(self._prelude) del self._prelude[:] reset_packet() if not self.on_prelude(prelude): self.broken = True return - stage = STAGE_SIZE - + stage = STAGE_SIZE + while data_pos < data_len: - + if stage == STAGE_HEADER: bytes_to_read = min(data_len - data_pos, expecting_bytes) header_append(data[data_pos:data_pos + bytes_to_read]) data_pos += bytes_to_read - expecting_bytes -= bytes_to_read - if not expecting_bytes: - self.header = ''.join(self._header_data) + expecting_bytes -= bytes_to_read + if not expecting_bytes: + self.header = ''.join(self._header_data) if not self.payload_size: yield self.header, '' reset_packet() expecting_bytes = None stage = STAGE_SIZE - else: + else: stage = STAGE_PAYLOAD expecting_bytes = self.payload_size - + elif stage == STAGE_PAYLOAD: - bytes_to_read = min(data_len - data_pos, expecting_bytes) + bytes_to_read = min(data_len - data_pos, expecting_bytes) payload_append(data[data_pos:data_pos + bytes_to_read]) data_pos += bytes_to_read - expecting_bytes -= bytes_to_read - if not expecting_bytes: + expecting_bytes -= bytes_to_read + if not expecting_bytes: self.payload = ''.join(self._payload_data) yield self.header, self.payload reset_packet() stage = STAGE_SIZE expecting_bytes = None - + elif stage == STAGE_SIZE: term_pos = datafind(':', data_pos) if term_pos == -1: - size_append(data[data_pos:]) + size_append(data[data_pos:]) break else: size_append(data[data_pos:term_pos]) - data_pos = term_pos + 1 + data_pos = term_pos + 1 size = ''.join(self._size) del self._size[:] @@ -173,30 +173,30 @@ class Decoder(object): header_size, payload_size = size.split(',', 1) else: header_size = size - payload_size = '' + payload_size = '' try: self.header_size = int(header_size or '0') self.payload_size = int(payload_size or '0') except ValueError: self.stream_broken = False raise DecoderError('Invalid size in packet (%s)' % size) - + if self.header_size: - expecting_bytes = self.header_size + expecting_bytes = self.header_size stage = STAGE_HEADER elif self.payload_size: - expecting_bytes = self.payload_size + expecting_bytes = self.payload_size stage = STAGE_PAYLOAD else: # A completely empty packet, permitted, if a little odd yield '', '' - reset_packet() + reset_packet() expecting_bytes = None - self.expecting_bytes = expecting_bytes + self.expecting_bytes = expecting_bytes self.stage = stage - - + + def on_prelude(self, prelude): if self.prelude_callback and not self.prelude_callback(self, prelude): return False @@ -206,7 +206,7 @@ class Decoder(object): class JSONDecoder(Decoder): - + def feed(self, data): for header, payload in Decoder.feed(self, data): if header: @@ -215,9 +215,9 @@ class JSONDecoder(Decoder): header = {} yield header, payload - + if __name__ == "__main__": - + f = StringIO() encoder = JSONFileEncoder(f) encoder.write(dict(a=1, b=2), 'Payload') @@ -225,29 +225,29 @@ if __name__ == "__main__": encoder.write(None, 'Payload') encoder.write(dict(a=1)) encoder.write() - + stream = 'prelude\n' + f.getvalue() - + #print stream - + # packets = ['Prelude string\n', # encode('header', 'payload'), # encode('header number 2', 'second payload'), # encode('', '')] -# +# # stream = ''.join(packets) - + decoder = JSONDecoder() - + stream = 'pyfs/0.1\n59,13:{"type":"rpc","method":"ping","client_ref":"-1221142848:1"}Hello, World!' - + fdata = StringIO(stream) - + while 1: data = fdata.read(3) if not data: break for header, payload in decoder.feed(data): print "Header:", repr(header) - print "Payload:", repr(payload) -
\ No newline at end of file + print "Payload:", repr(payload) + diff --git a/fs/expose/serve/server.py b/fs/expose/serve/server.py index 7ac8c1f..d870175 100644 --- a/fs/expose/serve/server.py +++ b/fs/expose/serve/server.py @@ -9,24 +9,24 @@ from packetstream import JSONDecoder, JSONFileEncoder class _SocketFile(object): def __init__(self, socket): self.socket = socket - + def read(self, size): try: return self.socket.recv(size) except socket.error: return '' - + def write(self, data): self.socket.sendall(data) def remote_call(method_name=None): - method = method_name + method = method_name def deco(f): if not hasattr(f, '_remote_call_names'): - f._remote_call_names = [] + f._remote_call_names = [] f._remote_call_names.append(method or f.__name__) - return f + return f return deco @@ -36,34 +36,34 @@ class RemoteResponse(Exception): self.payload = payload class ConnectionHandlerBase(threading.Thread): - + _methods = {} - + def __init__(self, server, connection_id, socket, address): super(ConnectionHandlerBase, self).__init__() self.server = server - self.connection_id = connection_id + self.connection_id = connection_id self.socket = socket self.transport = _SocketFile(socket) - self.address = address + self.address = address self.encoder = JSONFileEncoder(self.transport) - self.decoder = JSONDecoder(prelude_callback=self.on_stream_prelude) - + self.decoder = JSONDecoder(prelude_callback=self.on_stream_prelude) + self._lock = threading.RLock() self.socket_error = None - - if not self._methods: + + if not self._methods: for method_name in dir(self): method = getattr(self, method_name) if callable(method) and hasattr(method, '_remote_call_names'): for name in method._remote_call_names: - + self._methods[name] = method - + print self._methods - - self.fs = None - + + self.fs = None + def run(self): self.transport.write('pyfs/1.0\n') while True: @@ -74,27 +74,27 @@ class ConnectionHandlerBase(threading.Thread): self.socket_error = socket_error break print "data", repr(data) - if data: - for packet in self.decoder.feed(data): + if data: + for packet in self.decoder.feed(data): print repr(packet) self.on_packet(*packet) else: - break + break self.on_connection_close() - + def close(self): with self._lock: self.socket.close() - + def on_connection_close(self): self.socket.shutdown(socket.SHUT_RDWR) self.socket.close() self.server.on_connection_close(self.connection_id) - + def on_stream_prelude(self, packet_stream, prelude): print "prelude", prelude return True - + def on_packet(self, header, payload): print '-' * 30 print repr(header) @@ -111,7 +111,7 @@ class ConnectionHandlerBase(threading.Thread): remote['response'] = response self.encoder.write(remote, '') except RemoteResponse, response: - self.encoder.write(response.header, response.payload) + self.encoder.write(response.header, response.payload) class RemoteFSConnection(ConnectionHandlerBase): @@ -122,9 +122,9 @@ class RemoteFSConnection(ConnectionHandlerBase): self.resource = resource from fs.memoryfs import MemoryFS self.fs = MemoryFS() - + class Server(object): - + def __init__(self, addr='', port=3000, connection_factory=RemoteFSConnection): self.addr = addr self.port = port @@ -133,41 +133,41 @@ class Server(object): self.connection_id = 0 self.threads = {} self._lock = threading.RLock() - + def serve_forever(self): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - sock.bind((self.addr, self.port)) - + sock.bind((self.addr, self.port)) + sock.listen(5) - + try: while True: clientsocket, address = sock.accept() self.on_connect(clientsocket, address) except KeyboardInterrupt: pass - + try: self._close_graceful() except KeyboardInterrupt: self._close_harsh() - + def _close_graceful(self): """Tell all threads to exit and wait for them""" with self._lock: for connection in self.threads.itervalues(): - connection.close() + connection.close() for connection in self.threads.itervalues(): connection.join() self.threads.clear() - + def _close_harsh(self): with self._lock: for connection in self.threads.itervalues(): connection.close() self.threads.clear() - + def on_connect(self, clientsocket, address): print "Connection from", address with self._lock: @@ -175,17 +175,17 @@ class Server(object): thread = self.connection_factory(self, self.connection_id, clientsocket, - address) - self.threads[self.connection_id] = thread - thread.start() - + address) + self.threads[self.connection_id] = thread + thread.start() + def on_connection_close(self, connection_id): pass #with self._lock: # self.threads[connection_id].join() # del self.threads[connection_id] - + if __name__ == "__main__": server = Server() server.serve_forever() -
\ No newline at end of file + diff --git a/fs/expose/serve/threadpool.py b/fs/expose/serve/threadpool.py index f448a12..5c2ecd5 100644 --- a/fs/expose/serve/threadpool.py +++ b/fs/expose/serve/threadpool.py @@ -10,48 +10,48 @@ def make_job(job_callable, *args, **kwargs): class _PoolThread(threading.Thread): """ Internal thread class that runs jobs. """ - + def __init__(self, queue, name): super(_PoolThread, self).__init__() self.queue = queue self.name = name - + def __str__(self): return self.name - + def run(self): - + while True: try: - _priority, job = self.queue.get() - except queue.Empty: + _priority, job = self.queue.get() + except queue.Empty: break - - if job is None: + + if job is None: break - + if callable(job): try: - job() + job() except Exception, e: - print e + print e self.queue.task_done() - + class ThreadPool(object): - + def __init__(self, num_threads, size=None, name=''): - + self.num_threads = num_threads self.name = name self.queue = queue.PriorityQueue(size) self.job_no = 0 - + self.threads = [_PoolThread(self.queue, '%s #%i' % (name, i)) for i in xrange(num_threads)] - + for thread in self.threads: thread.start() - + def _make_priority_key(self, i): no = self.job_no self.job_no += 1 @@ -62,38 +62,38 @@ class ThreadPool(object): def job(): return job_callable(*args, **kwargs) self.queue.put( (self._make_priority_key(1), job), True ) - return self.job_no - + return self.job_no + def flush_quit(self): - """ Quit after all tasks on the queue have been processed. """ + """ Quit after all tasks on the queue have been processed. """ for thread in self.threads: - self.queue.put( (self._make_priority_key(1), None) ) + self.queue.put( (self._make_priority_key(1), None) ) for thread in self.threads: thread.join() - + def quit(self): """ Quit as soon as possible, potentially leaving tasks on the queue. """ for thread in self.threads: - self.queue.put( (self._make_priority_key(0), None) ) + self.queue.put( (self._make_priority_key(0), None) ) for thread in self.threads: thread.join() if __name__ == "__main__": import time - + def job(n): print "Starting #%i" % n time.sleep(1) print "Ending #%i" % n - + pool = ThreadPool(5, 'test thread') - + for n in range(20): pool.job(job, n) - + pool.flush_quit() - -
\ No newline at end of file + + |