diff options
author | willmcgugan <willmcgugan@67cdc799-7952-0410-af00-57a81ceafa0f> | 2011-05-09 11:54:21 +0000 |
---|---|---|
committer | willmcgugan <willmcgugan@67cdc799-7952-0410-af00-57a81ceafa0f> | 2011-05-09 11:54:21 +0000 |
commit | 5686efd7be3f91a6d693a8a356f7f59f22973821 (patch) | |
tree | 2fe66eaf89e01b185bed0a36d20203f196e31f51 /fs/expose | |
parent | 0001fb0a26b9ee03b60480c15ea4a28a677d35a9 (diff) | |
download | pyfilesystem-git-5686efd7be3f91a6d693a8a356f7f59f22973821.tar.gz |
Work in progress for a new filesystem server
Diffstat (limited to 'fs/expose')
-rw-r--r-- | fs/expose/serve/__init__.py | 1 | ||||
-rw-r--r-- | fs/expose/serve/packetstream.py | 253 | ||||
-rw-r--r-- | fs/expose/serve/server.py | 143 | ||||
-rw-r--r-- | fs/expose/serve/threadpool.py | 99 |
4 files changed, 496 insertions, 0 deletions
diff --git a/fs/expose/serve/__init__.py b/fs/expose/serve/__init__.py new file mode 100644 index 0000000..acfff8c --- /dev/null +++ b/fs/expose/serve/__init__.py @@ -0,0 +1 @@ +# Work in progress
\ No newline at end of file diff --git a/fs/expose/serve/packetstream.py b/fs/expose/serve/packetstream.py new file mode 100644 index 0000000..6f24fd9 --- /dev/null +++ b/fs/expose/serve/packetstream.py @@ -0,0 +1,253 @@ +try: + from json import dumps, loads +except ImportError: + from simplejson import dumps, loads +try: + from cStringIO import StringIO +except ImportError: + from StringIO import StringIO + + +def encode(header='', payload=''): + def textsize(s): + if s: + return str(len(s)) + return '' + return '%i,%i:%s%s' % (textsize(header), textsize(payload), header, payload) + + +class FileEncoder(object): + + def __init__(self, f): + self.f = f + + def write(self, header='', payload=''): + fwrite = self.f.write + def textsize(s): + if s: + return str(len(s)) + return '' + fwrite('%s,%s:' % (textsize(header), textsize(payload))) + if header: + fwrite(header) + if payload: + fwrite(payload) + + +class JSONFileEncoder(FileEncoder): + + def write(self, header=None, payload=''): + if header is None: + super(JSONFileEncoder, self).write('', payload) + else: + header_json = dumps(header, separators=(',', ':')) + super(JSONFileEncoder, self).write(header_json, payload) + + +class DecoderError(Exception): + pass + +class PreludeError(DecoderError): + pass + +class Decoder(object): + + STAGE_PRELUDE, STAGE_SIZE, STAGE_HEADER, STAGE_PAYLOAD = range(4) + MAX_PRELUDE = 255 + + def __init__(self, no_prelude=False, prelude_callback=None): + + self.prelude_callback = prelude_callback + self.stream_broken = False + self.expecting_bytes = None + self.stage = self.STAGE_PRELUDE + self._prelude = [] + self._size = [] + self._expecting_bytes = None + + self.header_size = None + self.payload_size = None + + self._header_bytes = None + self._payload_bytes = None + + self._header_data = [] + self._payload_data = [] + + self.header = None + self.payload = None + + if no_prelude: + self.stage = self.STAGE_SIZE + + + def feed(self, data): + + if self.stream_broken: + raise DecoderError('Stream is broken') + + STAGE_PRELUDE, STAGE_SIZE, STAGE_HEADER, STAGE_PAYLOAD = range(4) + + size_append = self._size.append + header_append = self._header_data.append + payload_append = self._payload_data.append + datafind = data.find + + def reset_packet(): + self.expecting_bytes = None + del self._header_data[:] + del self._payload_data[:] + self.header = None + self.payload = None + + data_len = len(data) + data_pos = 0 + expecting_bytes = self.expecting_bytes + stage = self.stage + + if stage == STAGE_PRELUDE: + max_find = min(len(data), data_pos + self.MAX_PRELUDE) + cr_pos = datafind('\n', data_pos, max_find) + if cr_pos == -1: + self._prelude.append(data[data_pos:]) + data_pos = max_find + if sum(len(s) for s in self._prelude) > self.MAX_PRELUDE: + self.stream_broken = True + raise PreludeError('Prelude not found') + else: + self._prelude.append(data[data_pos:cr_pos]) + if sum(len(s) for s in self._prelude) > self.MAX_PRELUDE: + self.stream_broken = True + raise PreludeError('Prelude not found') + data_pos = cr_pos + 1 + prelude = ''.join(self._prelude) + del self._prelude[:] + reset_packet() + if not self.on_prelude(prelude): + self.broken = True + return + stage = STAGE_SIZE + + while data_pos < data_len: + + if stage == STAGE_HEADER: + bytes_to_read = min(data_len - data_pos, expecting_bytes) + header_append(data[data_pos:data_pos + bytes_to_read]) + data_pos += bytes_to_read + expecting_bytes -= bytes_to_read + if not expecting_bytes: + self.header = ''.join(self._header_data) + if not self.payload_size: + yield self.header, '' + reset_packet() + expecting_bytes = None + stage = STAGE_SIZE + else: + stage = STAGE_PAYLOAD + expecting_bytes = self.payload_size + + elif stage == STAGE_PAYLOAD: + bytes_to_read = min(data_len - data_pos, expecting_bytes) + payload_append(data[data_pos:data_pos + bytes_to_read]) + data_pos += bytes_to_read + expecting_bytes -= bytes_to_read + if not expecting_bytes: + self.payload = ''.join(self._payload_data) + yield self.header, self.payload + reset_packet() + stage = STAGE_SIZE + expecting_bytes = None + + elif stage == STAGE_SIZE: + term_pos = datafind(':', data_pos) + if term_pos == -1: + size_append(data[data_pos:]) + break + else: + size_append(data[data_pos:term_pos]) + data_pos = term_pos + 1 + + size = ''.join(self._size) + del self._size[:] + if ',' in size: + header_size, payload_size = size.split(',', 1) + else: + header_size = size + payload_size = '' + try: + self.header_size = int(header_size or '0') + self.payload_size = int(payload_size or '0') + except ValueError: + self.stream_broken = False + raise DecoderError('Invalid size in packet (%s)' % size) + + if self.header_size: + expecting_bytes = self.header_size + stage = STAGE_HEADER + elif self.payload_size: + expecting_bytes = self.payload_size + stage = STAGE_PAYLOAD + else: + # A completely empty packet, permitted, if a little odd + yield '', '' + reset_packet() + expecting_bytes = None + + self.expecting_bytes = expecting_bytes + self.stage = stage + + + def on_prelude(self, prelude): + if self.prelude_callback and not self.prelude_callback(self, prelude): + return False + #pass + #print "Prelude:", prelude + return True + + +class JSONDecoder(Decoder): + + def feed(self, data): + for header, payload in Decoder.feed(self, data): + if header: + header = loads(header) + else: + header = {} + yield header, payload + + +if __name__ == "__main__": + + f = StringIO() + encoder = JSONFileEncoder(f) + encoder.write(dict(a=1, b=2), 'Payload') + encoder.write(dict(foo="bar", nested=dict(apples="oranges"), alist=range(5)), 'Payload goes here') + encoder.write(None, 'Payload') + encoder.write(dict(a=1)) + encoder.write() + + stream = 'prelude\n' + f.getvalue() + + #print stream + +# packets = ['Prelude string\n', +# encode('header', 'payload'), +# encode('header number 2', 'second payload'), +# encode('', '')] +# +# stream = ''.join(packets) + + decoder = JSONDecoder() + + stream = 'pyfs/0.1\n59,13:{"type":"rpc","method":"ping","client_ref":"-1221142848:1"}Hello, World!' + + fdata = StringIO(stream) + + while 1: + data = fdata.read(3) + if not data: + break + for header, payload in decoder.feed(data): + print "Header:", repr(header) + print "Payload:", repr(payload) +
\ No newline at end of file diff --git a/fs/expose/serve/server.py b/fs/expose/serve/server.py new file mode 100644 index 0000000..c8082a1 --- /dev/null +++ b/fs/expose/serve/server.py @@ -0,0 +1,143 @@ +from __future__ import with_statement + +import socket +import threading +from packetstream import JSONDecoder, JSONFileEncoder + + + +class _SocketFile(object): + def __init__(self, socket): + self.socket = socket + + def read(self, size): + try: + return self.socket.recv(size) + except socket.error: + return '' + + def write(self, data): + self.socket.sendall(data) + +class ConnectionThread(threading.Thread): + + def __init__(self, server, connection_id, socket, address): + super(ConnectionThread, self).__init__() + self.server = server + self.connection_id = connection_id + self.socket = socket + self.transport = _SocketFile(socket) + self.address = address + self.encoder = JSONFileEncoder(self.transport) + self.decoder = JSONDecoder(prelude_callback=self.on_stream_prelude) + + self._lock = threading.RLock() + self.socket_error = None + + self.fs = None + + def run(self): + self.transport.write('pyfs/1.0\n') + while True: + try: + data = self.transport.read(4096) + except socket.error, socket_error: + print socket_error + self.socket_error = socket_error + break + print "data", repr(data) + if data: + for packet in self.decoder.feed(data): + print repr(packet) + self.on_packet(*packet) + else: + break + self.on_connection_close() + + def close(self): + with self._lock: + self.socket.close() + + def on_connection_close(self): + self.socket.shutdown(socket.SHUT_RDWR) + self.socket.close() + self.server.on_connection_close(self.connection_id) + + def on_stream_prelude(self, packet_stream, prelude): + print "prelude", prelude + return True + + def on_packet(self, header, payload): + print '-' * 30 + print repr(header) + print repr(payload) + + if header['method'] == 'ping': + self.encoder.write({'client_ref':header['client_ref']}, payload) + + +class Server(object): + + def __init__(self, addr='', port=3000): + self.addr = addr + self.port = port + self.socket = None + self.connection_id = 0 + self.threads = {} + self._lock = threading.RLock() + + def serve_forever(self): + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + sock.bind((self.addr, self.port)) + + sock.listen(5) + + try: + while True: + clientsocket, address = sock.accept() + self.on_connect(clientsocket, address) + except KeyboardInterrupt: + pass + + try: + self._close_graceful() + except KeyboardInterrupt: + self._close_harsh() + + def _close_graceful(self): + """Tell all threads to exit and wait for them""" + with self._lock: + for connection in self.threads.itervalues(): + connection.close() + for connection in self.threads.itervalues(): + connection.join() + self.threads.clear() + + def _close_harsh(self): + with self._lock: + for connection in self.threads.itervalues(): + connection.close() + self.threads.clear() + + def on_connect(self, clientsocket, address): + print "Connection from", address + with self._lock: + self.connection_id += 1 + thread = ConnectionThread(self, + self.connection_id, + clientsocket, + address) + self.threads[self.connection_id] = thread + thread.start() + + def on_connection_close(self, connection_id): + pass + #with self._lock: + # self.threads[connection_id].join() + # del self.threads[connection_id] + +if __name__ == "__main__": + server = Server() + server.serve_forever() +
\ No newline at end of file diff --git a/fs/expose/serve/threadpool.py b/fs/expose/serve/threadpool.py new file mode 100644 index 0000000..f448a12 --- /dev/null +++ b/fs/expose/serve/threadpool.py @@ -0,0 +1,99 @@ +import threading +import Queue as queue + +def make_job(job_callable, *args, **kwargs): + """ Returns a callable that calls the supplied callable with given arguements. """ + def job(): + return job_callable(*args, **kwargs) + return job + + +class _PoolThread(threading.Thread): + """ Internal thread class that runs jobs. """ + + def __init__(self, queue, name): + super(_PoolThread, self).__init__() + self.queue = queue + self.name = name + + def __str__(self): + return self.name + + def run(self): + + while True: + try: + _priority, job = self.queue.get() + except queue.Empty: + break + + if job is None: + break + + if callable(job): + try: + job() + except Exception, e: + print e + self.queue.task_done() + + +class ThreadPool(object): + + def __init__(self, num_threads, size=None, name=''): + + self.num_threads = num_threads + self.name = name + self.queue = queue.PriorityQueue(size) + self.job_no = 0 + + self.threads = [_PoolThread(self.queue, '%s #%i' % (name, i)) for i in xrange(num_threads)] + + for thread in self.threads: + thread.start() + + def _make_priority_key(self, i): + no = self.job_no + self.job_no += 1 + return (i, no) + + def job(self, job_callable, *args, **kwargs): + """ Post a job to the queue. """ + def job(): + return job_callable(*args, **kwargs) + self.queue.put( (self._make_priority_key(1), job), True ) + return self.job_no + + def flush_quit(self): + """ Quit after all tasks on the queue have been processed. """ + for thread in self.threads: + self.queue.put( (self._make_priority_key(1), None) ) + for thread in self.threads: + thread.join() + + def quit(self): + """ Quit as soon as possible, potentially leaving tasks on the queue. """ + for thread in self.threads: + self.queue.put( (self._make_priority_key(0), None) ) + for thread in self.threads: + thread.join() + + +if __name__ == "__main__": + import time + + + def job(n): + print "Starting #%i" % n + time.sleep(1) + print "Ending #%i" % n + + pool = ThreadPool(5, 'test thread') + + for n in range(20): + pool.job(job, n) + + pool.flush_quit() + + +
\ No newline at end of file |