summaryrefslogtreecommitdiff
path: root/fs/expose
diff options
context:
space:
mode:
authorwillmcgugan <willmcgugan@67cdc799-7952-0410-af00-57a81ceafa0f>2011-05-09 11:54:21 +0000
committerwillmcgugan <willmcgugan@67cdc799-7952-0410-af00-57a81ceafa0f>2011-05-09 11:54:21 +0000
commit5686efd7be3f91a6d693a8a356f7f59f22973821 (patch)
tree2fe66eaf89e01b185bed0a36d20203f196e31f51 /fs/expose
parent0001fb0a26b9ee03b60480c15ea4a28a677d35a9 (diff)
downloadpyfilesystem-git-5686efd7be3f91a6d693a8a356f7f59f22973821.tar.gz
Work in progress for a new filesystem server
Diffstat (limited to 'fs/expose')
-rw-r--r--fs/expose/serve/__init__.py1
-rw-r--r--fs/expose/serve/packetstream.py253
-rw-r--r--fs/expose/serve/server.py143
-rw-r--r--fs/expose/serve/threadpool.py99
4 files changed, 496 insertions, 0 deletions
diff --git a/fs/expose/serve/__init__.py b/fs/expose/serve/__init__.py
new file mode 100644
index 0000000..acfff8c
--- /dev/null
+++ b/fs/expose/serve/__init__.py
@@ -0,0 +1 @@
+# Work in progress \ No newline at end of file
diff --git a/fs/expose/serve/packetstream.py b/fs/expose/serve/packetstream.py
new file mode 100644
index 0000000..6f24fd9
--- /dev/null
+++ b/fs/expose/serve/packetstream.py
@@ -0,0 +1,253 @@
+try:
+ from json import dumps, loads
+except ImportError:
+ from simplejson import dumps, loads
+try:
+ from cStringIO import StringIO
+except ImportError:
+ from StringIO import StringIO
+
+
def encode(header='', payload=''):
    """Encode a header and payload as a netstring-like packet.

    Format is '<header size>,<payload size>:<header><payload>', where a
    size is written as an empty field (not '0') when that part is empty,
    matching FileEncoder.write and what Decoder expects.
    """
    def textsize(s):
        # Empty parts are encoded as an empty size field, not '0'.
        if s:
            return str(len(s))
        return ''
    # Fix: was '%i,%i:%s%s' -- %i raised TypeError on every call because
    # textsize() returns a string ('' or digits), never an int.  Use %s,
    # consistent with FileEncoder.write.
    return '%s,%s:%s%s' % (textsize(header), textsize(payload), header, payload)
+
+
class FileEncoder(object):
    """Writes netstring-like packets to a file-like object.

    Each packet has the form '<header size>,<payload size>:<header><payload>',
    with an empty size field for an empty part.
    """

    def __init__(self, f):
        self.f = f

    def write(self, header='', payload=''):
        """Write a single packet containing header and payload."""
        write = self.f.write
        header_size = str(len(header)) if header else ''
        payload_size = str(len(payload)) if payload else ''
        write(header_size + ',' + payload_size + ':')
        if header:
            write(header)
        if payload:
            write(payload)
+
+
class JSONFileEncoder(FileEncoder):
    """A FileEncoder that serializes the header as compact JSON.

    A header of None is written as an empty header field.
    """

    def write(self, header=None, payload=''):
        if header is None:
            encoded_header = ''
        else:
            encoded_header = dumps(header, separators=(',', ':'))
        super(JSONFileEncoder, self).write(encoded_header, payload)
+
+
class DecoderError(Exception):
    """Base class for errors raised while decoding a packet stream."""
    pass

class PreludeError(DecoderError):
    """Raised when a valid prelude line is not found at the stream start."""
    pass

class Decoder(object):
    """Incremental decoder for netstring-like packets.

    Wire format is '<header size>,<payload size>:<header><payload>',
    where an empty part is encoded as an empty size field (matching
    FileEncoder).  The stream may begin with a prelude line terminated
    by a newline, which is validated by on_prelude / prelude_callback.

    feed() is a generator yielding a (header, payload) tuple for each
    complete packet; partial packets are buffered between calls.
    """

    STAGE_PRELUDE, STAGE_SIZE, STAGE_HEADER, STAGE_PAYLOAD = range(4)
    # Maximum number of bytes permitted before the prelude terminator.
    MAX_PRELUDE = 255

    def __init__(self, no_prelude=False, prelude_callback=None):
        """Create a decoder.

        :param no_prelude: if True, expect packets immediately rather
            than a prelude line
        :param prelude_callback: optional callable(decoder, prelude)
            returning a bool; returning False breaks the stream
        """
        self.prelude_callback = prelude_callback
        # Once True, any further feed() raises DecoderError.
        self.stream_broken = False
        # Bytes still needed to complete the current header/payload.
        self.expecting_bytes = None
        self.stage = self.STAGE_PRELUDE
        self._prelude = []
        self._size = []

        self.header_size = None
        self.payload_size = None

        self._header_data = []
        self._payload_data = []

        self.header = None
        self.payload = None

        if no_prelude:
            self.stage = self.STAGE_SIZE

    def feed(self, data):
        """Feed raw data to the decoder, yielding (header, payload)
        tuples for each packet completed by this data.

        Raises DecoderError (or PreludeError) on malformed input and
        marks the stream broken so later calls fail fast.
        """
        if self.stream_broken:
            raise DecoderError('Stream is broken')

        STAGE_PRELUDE, STAGE_SIZE, STAGE_HEADER, STAGE_PAYLOAD = range(4)

        # Local bindings avoid repeated attribute lookups in the loop.
        size_append = self._size.append
        header_append = self._header_data.append
        payload_append = self._payload_data.append
        datafind = data.find

        def reset_packet():
            # Clear per-packet state, ready for the next packet.
            self.expecting_bytes = None
            del self._header_data[:]
            del self._payload_data[:]
            self.header = None
            self.payload = None

        data_len = len(data)
        data_pos = 0
        expecting_bytes = self.expecting_bytes
        stage = self.stage

        if stage == STAGE_PRELUDE:
            # Scan for the terminating newline, never looking at more
            # than MAX_PRELUDE bytes in total.
            max_find = min(len(data), data_pos + self.MAX_PRELUDE)
            cr_pos = datafind('\n', data_pos, max_find)
            if cr_pos == -1:
                self._prelude.append(data[data_pos:])
                data_pos = max_find
                if sum(len(s) for s in self._prelude) > self.MAX_PRELUDE:
                    self.stream_broken = True
                    raise PreludeError('Prelude not found')
            else:
                self._prelude.append(data[data_pos:cr_pos])
                if sum(len(s) for s in self._prelude) > self.MAX_PRELUDE:
                    self.stream_broken = True
                    raise PreludeError('Prelude not found')
                data_pos = cr_pos + 1
                prelude = ''.join(self._prelude)
                del self._prelude[:]
                reset_packet()
                if not self.on_prelude(prelude):
                    # Fix: was 'self.broken = True', which set an unused
                    # attribute; a rejected prelude must break the stream.
                    self.stream_broken = True
                    return
                stage = STAGE_SIZE

        while data_pos < data_len:

            if stage == STAGE_HEADER:
                # Accumulate header bytes until header_size is reached.
                bytes_to_read = min(data_len - data_pos, expecting_bytes)
                header_append(data[data_pos:data_pos + bytes_to_read])
                data_pos += bytes_to_read
                expecting_bytes -= bytes_to_read
                if not expecting_bytes:
                    self.header = ''.join(self._header_data)
                    if not self.payload_size:
                        # No payload follows; the packet is complete.
                        yield self.header, ''
                        reset_packet()
                        expecting_bytes = None
                        stage = STAGE_SIZE
                    else:
                        stage = STAGE_PAYLOAD
                        expecting_bytes = self.payload_size

            elif stage == STAGE_PAYLOAD:
                # Accumulate payload bytes until payload_size is reached.
                bytes_to_read = min(data_len - data_pos, expecting_bytes)
                payload_append(data[data_pos:data_pos + bytes_to_read])
                data_pos += bytes_to_read
                expecting_bytes -= bytes_to_read
                if not expecting_bytes:
                    self.payload = ''.join(self._payload_data)
                    # NOTE: for a payload-only packet the header is None
                    # (not ''), because STAGE_HEADER was never entered.
                    yield self.header, self.payload
                    reset_packet()
                    stage = STAGE_SIZE
                    expecting_bytes = None

            elif stage == STAGE_SIZE:
                # Collect characters up to ':' to form the size field.
                term_pos = datafind(':', data_pos)
                if term_pos == -1:
                    size_append(data[data_pos:])
                    break
                else:
                    size_append(data[data_pos:term_pos])
                    data_pos = term_pos + 1

                    size = ''.join(self._size)
                    del self._size[:]
                    if ',' in size:
                        header_size, payload_size = size.split(',', 1)
                    else:
                        header_size = size
                        payload_size = ''
                    try:
                        self.header_size = int(header_size or '0')
                        self.payload_size = int(payload_size or '0')
                    except ValueError:
                        # Fix: was 'self.stream_broken = False', which
                        # left the decoder accepting further data after
                        # a fatal framing error.
                        self.stream_broken = True
                        raise DecoderError('Invalid size in packet (%s)' % size)

                    if self.header_size:
                        expecting_bytes = self.header_size
                        stage = STAGE_HEADER
                    elif self.payload_size:
                        expecting_bytes = self.payload_size
                        stage = STAGE_PAYLOAD
                    else:
                        # A completely empty packet, permitted, if a little odd
                        yield '', ''
                        reset_packet()
                        expecting_bytes = None

        # Persist progress for the next feed() call.
        self.expecting_bytes = expecting_bytes
        self.stage = stage

    def on_prelude(self, prelude):
        """Accept or reject the prelude line; return a bool.

        Delegates to prelude_callback when one was supplied.
        """
        if self.prelude_callback and not self.prelude_callback(self, prelude):
            return False
        return True
+
+
class JSONDecoder(Decoder):
    """A Decoder whose packet headers are JSON objects.

    Empty headers decode to an empty dict; payloads pass through
    unchanged.
    """

    def feed(self, data):
        for raw_header, payload in Decoder.feed(self, data):
            yield (loads(raw_header) if raw_header else {}), payload
+
+
if __name__ == "__main__":

    # Demo / smoke test: encode a handful of packets into an in-memory
    # buffer, exercising headers, payloads, None headers and an empty
    # packet.
    f = StringIO()
    encoder = JSONFileEncoder(f)
    encoder.write(dict(a=1, b=2), 'Payload')
    encoder.write(dict(foo="bar", nested=dict(apples="oranges"), alist=range(5)), 'Payload goes here')
    encoder.write(None, 'Payload')
    encoder.write(dict(a=1))
    encoder.write()

    # A full stream is a prelude line followed by packets.
    stream = 'prelude\n' + f.getvalue()

    #print stream

#    packets = ['Prelude string\n',
#               encode('header', 'payload'),
#               encode('header number 2', 'second payload'),
#               encode('', '')]
#
#    stream = ''.join(packets)

    decoder = JSONDecoder()

    # Hand-built stream: prelude + one packet with a JSON header and a
    # text payload, as the server would produce.
    stream = 'pyfs/0.1\n59,13:{"type":"rpc","method":"ping","client_ref":"-1221142848:1"}Hello, World!'

    fdata = StringIO(stream)

    # Feed the decoder in tiny (3-byte) chunks to exercise incremental
    # parsing across packet boundaries.
    while 1:
        data = fdata.read(3)
        if not data:
            break
        for header, payload in decoder.feed(data):
            print "Header:", repr(header)
            print "Payload:", repr(payload)
+ \ No newline at end of file
diff --git a/fs/expose/serve/server.py b/fs/expose/serve/server.py
new file mode 100644
index 0000000..c8082a1
--- /dev/null
+++ b/fs/expose/serve/server.py
@@ -0,0 +1,143 @@
+from __future__ import with_statement
+
+import socket
+import threading
+from packetstream import JSONDecoder, JSONFileEncoder
+
+
+
+class _SocketFile(object):
+ def __init__(self, socket):
+ self.socket = socket
+
+ def read(self, size):
+ try:
+ return self.socket.recv(size)
+ except socket.error:
+ return ''
+
+ def write(self, data):
+ self.socket.sendall(data)
+
class ConnectionThread(threading.Thread):
    """Services one client connection: reads the socket, decodes
    packets and dispatches each to on_packet."""

    def __init__(self, server, connection_id, socket, address):
        super(ConnectionThread, self).__init__()
        self.server = server
        # Unique id assigned by the Server; used to deregister on close.
        self.connection_id = connection_id
        self.socket = socket
        # File-like wrapper so the encoder/decoder can use read/write.
        self.transport = _SocketFile(socket)
        self.address = address
        self.encoder = JSONFileEncoder(self.transport)
        self.decoder = JSONDecoder(prelude_callback=self.on_stream_prelude)

        self._lock = threading.RLock()
        # Last socket.error seen while reading, if any.
        self.socket_error = None

        # Filesystem associated with this connection; never assigned in
        # this work-in-progress version.
        self.fs = None

    def run(self):
        """Thread main loop: send the protocol prelude, then read and
        decode packets until the peer disconnects or a socket error
        occurs."""
        self.transport.write('pyfs/1.0\n')
        while True:
            try:
                data = self.transport.read(4096)
            except socket.error, socket_error:
                print socket_error
                self.socket_error = socket_error
                break
            print "data", repr(data)
            if data:
                for packet in self.decoder.feed(data):
                    print repr(packet)
                    self.on_packet(*packet)
            else:
                # Empty read means the peer closed the connection.
                break
        self.on_connection_close()

    def close(self):
        # May be called from another thread (Server shutdown).
        with self._lock:
            self.socket.close()

    def on_connection_close(self):
        # Shut down both directions, close, then tell the server to
        # forget this connection id.
        self.socket.shutdown(socket.SHUT_RDWR)
        self.socket.close()
        self.server.on_connection_close(self.connection_id)

    def on_stream_prelude(self, packet_stream, prelude):
        """Validate the client's prelude line; returning True accepts
        the stream (no real validation yet)."""
        print "prelude", prelude
        return True

    def on_packet(self, header, payload):
        """Dispatch one decoded packet.  Currently only answers 'ping'
        by echoing the payload back with the caller's client_ref."""
        print '-' * 30
        print repr(header)
        print repr(payload)

        if header['method'] == 'ping':
            self.encoder.write({'client_ref':header['client_ref']}, payload)
+
+
class Server(object):
    """A TCP server that accepts client connections and hands each one
    to its own ConnectionThread."""

    def __init__(self, addr='', port=3000):
        self.addr = addr
        self.port = port
        self.socket = None
        # Monotonically increasing id handed to each new connection.
        self.connection_id = 0
        # Maps connection id -> ConnectionThread.
        self.threads = {}
        self._lock = threading.RLock()

    def serve_forever(self):
        """Bind, listen and accept connections until KeyboardInterrupt,
        then attempt a graceful shutdown (harsh on a second interrupt)."""
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind((self.addr, self.port))

        sock.listen(5)

        try:
            while True:
                clientsocket, address = sock.accept()
                self.on_connect(clientsocket, address)
        except KeyboardInterrupt:
            pass

        try:
            self._close_graceful()
        except KeyboardInterrupt:
            # A second Ctrl-C abandons the orderly shutdown.
            self._close_harsh()

    def _close_graceful(self):
        """Tell all threads to exit and wait for them"""
        with self._lock:
            for connection in self.threads.itervalues():
                connection.close()
            for connection in self.threads.itervalues():
                connection.join()
            self.threads.clear()

    def _close_harsh(self):
        # Close the sockets without waiting for the threads to finish.
        with self._lock:
            for connection in self.threads.itervalues():
                connection.close()
            self.threads.clear()

    def on_connect(self, clientsocket, address):
        # Register and start a thread to service the new client.
        print "Connection from", address
        with self._lock:
            self.connection_id += 1
            thread = ConnectionThread(self,
                                      self.connection_id,
                                      clientsocket,
                                      address)
            self.threads[self.connection_id] = thread
            thread.start()

    def on_connection_close(self, connection_id):
        # Called from the connection's own thread when it ends; the
        # join/cleanup below is disabled (a thread cannot join itself).
        pass
        #with self._lock:
        #    self.threads[connection_id].join()
        #    del self.threads[connection_id]
+
if __name__ == "__main__":
    # Run a standalone server on the default port (3000).
    server = Server()
    server.serve_forever()
+ \ No newline at end of file
diff --git a/fs/expose/serve/threadpool.py b/fs/expose/serve/threadpool.py
new file mode 100644
index 0000000..f448a12
--- /dev/null
+++ b/fs/expose/serve/threadpool.py
@@ -0,0 +1,99 @@
+import threading
+import Queue as queue
+
def make_job(job_callable, *args, **kwargs):
    """Bind the given arguments to job_callable, returning a
    zero-argument callable suitable for posting to a ThreadPool."""
    return lambda: job_callable(*args, **kwargs)
+
+
class _PoolThread(threading.Thread):
    """ Internal thread class that runs jobs. """

    def __init__(self, queue, name):
        super(_PoolThread, self).__init__()
        self.queue = queue
        # Display name, e.g. 'poolname #3'.
        self.name = name

    def __str__(self):
        return self.name

    def run(self):
        # Pull (priority, job) pairs until a None sentinel arrives.
        while True:
            try:
                _priority, job = self.queue.get()
            except queue.Empty:
                # NOTE(review): a blocking get() never raises Empty, so
                # this branch looks unreachable -- confirm intent.
                break

            if job is None:
                # Sentinel posted by ThreadPool.quit/flush_quit.
                break

            if callable(job):
                try:
                    job()
                except Exception, e:
                    # A failing job must not kill the worker thread;
                    # report it and carry on.
                    print e
            self.queue.task_done()
+
+
class ThreadPool(object):
    """A pool of worker threads consuming jobs from a priority queue."""

    def __init__(self, num_threads, size=None, name=''):
        # size is the maximum queue length (None = unbounded);
        # name prefixes each worker thread's display name.
        self.num_threads = num_threads
        self.name = name
        self.queue = queue.PriorityQueue(size)
        # Monotonic counter used to keep FIFO order within a priority.
        self.job_no = 0

        self.threads = [_PoolThread(self.queue, '%s #%i' % (name, i)) for i in xrange(num_threads)]

        for thread in self.threads:
            thread.start()

    def _make_priority_key(self, i):
        # Build a (priority, sequence) tuple for the PriorityQueue;
        # lower priority values are served first, sequence keeps FIFO
        # order within a priority.
        no = self.job_no
        self.job_no += 1
        return (i, no)

    def job(self, job_callable, *args, **kwargs):
        """ Post a job to the queue. """
        def job():
            return job_callable(*args, **kwargs)
        self.queue.put( (self._make_priority_key(1), job), True )
        # NOTE(review): returns the post-increment counter, which is one
        # past the sequence number used in this job's key -- confirm.
        return self.job_no

    def flush_quit(self):
        """ Quit after all tasks on the queue have been processed. """
        # Sentinels share the normal job priority (1), so they sort
        # after all previously queued jobs.
        for thread in self.threads:
            self.queue.put( (self._make_priority_key(1), None) )
        for thread in self.threads:
            thread.join()

    def quit(self):
        """ Quit as soon as possible, potentially leaving tasks on the queue. """
        # Priority-0 sentinels jump ahead of pending jobs (priority 1).
        for thread in self.threads:
            self.queue.put( (self._make_priority_key(0), None) )
        for thread in self.threads:
            thread.join()
+
+
+if __name__ == "__main__":
+ import time
+
+
+ def job(n):
+ print "Starting #%i" % n
+ time.sleep(1)
+ print "Ending #%i" % n
+
+ pool = ThreadPool(5, 'test thread')
+
+ for n in range(20):
+ pool.job(job, n)
+
+ pool.flush_quit()
+
+
+ \ No newline at end of file