diff options
Diffstat (limited to 'fs/expose/serve/packetstream.py')
-rw-r--r-- | fs/expose/serve/packetstream.py | 120 |
1 files changed, 60 insertions, 60 deletions
diff --git a/fs/expose/serve/packetstream.py b/fs/expose/serve/packetstream.py index 6f24fd9..0449937 100644 --- a/fs/expose/serve/packetstream.py +++ b/fs/expose/serve/packetstream.py @@ -12,21 +12,21 @@ def encode(header='', payload=''): def textsize(s): if s: return str(len(s)) - return '' + return '' return '%i,%i:%s%s' % (textsize(header), textsize(payload), header, payload) class FileEncoder(object): - + def __init__(self, f): self.f = f - + def write(self, header='', payload=''): fwrite = self.f.write def textsize(s): if s: return str(len(s)) - return '' + return '' fwrite('%s,%s:' % (textsize(header), textsize(payload))) if header: fwrite(header) @@ -35,11 +35,11 @@ class FileEncoder(object): class JSONFileEncoder(FileEncoder): - + def write(self, header=None, payload=''): if header is None: super(JSONFileEncoder, self).write('', payload) - else: + else: header_json = dumps(header, separators=(',', ':')) super(JSONFileEncoder, self).write(header_json, payload) @@ -51,12 +51,12 @@ class PreludeError(DecoderError): pass class Decoder(object): - + STAGE_PRELUDE, STAGE_SIZE, STAGE_HEADER, STAGE_PAYLOAD = range(4) MAX_PRELUDE = 255 - + def __init__(self, no_prelude=False, prelude_callback=None): - + self.prelude_callback = prelude_callback self.stream_broken = False self.expecting_bytes = None @@ -64,49 +64,49 @@ class Decoder(object): self._prelude = [] self._size = [] self._expecting_bytes = None - + self.header_size = None self.payload_size = None - + self._header_bytes = None self._payload_bytes = None - + self._header_data = [] self._payload_data = [] - + self.header = None self.payload = None - + if no_prelude: self.stage = self.STAGE_SIZE - - + + def feed(self, data): - + if self.stream_broken: raise DecoderError('Stream is broken') - + STAGE_PRELUDE, STAGE_SIZE, STAGE_HEADER, STAGE_PAYLOAD = range(4) - + size_append = self._size.append header_append = self._header_data.append payload_append = self._payload_data.append datafind = data.find - + def reset_packet(): self.expecting_bytes = None del self._header_data[:] del self._payload_data[:] self.header = None self.payload = None - + data_len = len(data) data_pos = 0 - expecting_bytes = self.expecting_bytes + expecting_bytes = self.expecting_bytes stage = self.stage - + if stage == STAGE_PRELUDE: - max_find = min(len(data), data_pos + self.MAX_PRELUDE) + max_find = min(len(data), data_pos + self.MAX_PRELUDE) cr_pos = datafind('\n', data_pos, max_find) if cr_pos == -1: self._prelude.append(data[data_pos:]) @@ -119,53 +119,53 @@ class Decoder(object): if sum(len(s) for s in self._prelude) > self.MAX_PRELUDE: self.stream_broken = True raise PreludeError('Prelude not found') - data_pos = cr_pos + 1 + data_pos = cr_pos + 1 prelude = ''.join(self._prelude) del self._prelude[:] reset_packet() if not self.on_prelude(prelude): self.broken = True return - stage = STAGE_SIZE - + stage = STAGE_SIZE + while data_pos < data_len: - + if stage == STAGE_HEADER: bytes_to_read = min(data_len - data_pos, expecting_bytes) header_append(data[data_pos:data_pos + bytes_to_read]) data_pos += bytes_to_read - expecting_bytes -= bytes_to_read - if not expecting_bytes: - self.header = ''.join(self._header_data) + expecting_bytes -= bytes_to_read + if not expecting_bytes: + self.header = ''.join(self._header_data) if not self.payload_size: yield self.header, '' reset_packet() expecting_bytes = None stage = STAGE_SIZE - else: + else: stage = STAGE_PAYLOAD expecting_bytes = self.payload_size - + elif stage == STAGE_PAYLOAD: - bytes_to_read = min(data_len - data_pos, expecting_bytes) + bytes_to_read = min(data_len - data_pos, expecting_bytes) payload_append(data[data_pos:data_pos + bytes_to_read]) data_pos += bytes_to_read - expecting_bytes -= bytes_to_read - if not expecting_bytes: + expecting_bytes -= bytes_to_read + if not expecting_bytes: self.payload = ''.join(self._payload_data) yield self.header, self.payload reset_packet() stage = STAGE_SIZE expecting_bytes = None - + elif stage == STAGE_SIZE: term_pos = datafind(':', data_pos) if term_pos == -1: - size_append(data[data_pos:]) + size_append(data[data_pos:]) break else: size_append(data[data_pos:term_pos]) - data_pos = term_pos + 1 + data_pos = term_pos + 1 size = ''.join(self._size) del self._size[:] @@ -173,30 +173,30 @@ class Decoder(object): header_size, payload_size = size.split(',', 1) else: header_size = size - payload_size = '' + payload_size = '' try: self.header_size = int(header_size or '0') self.payload_size = int(payload_size or '0') except ValueError: self.stream_broken = False raise DecoderError('Invalid size in packet (%s)' % size) - + if self.header_size: - expecting_bytes = self.header_size + expecting_bytes = self.header_size stage = STAGE_HEADER elif self.payload_size: - expecting_bytes = self.payload_size + expecting_bytes = self.payload_size stage = STAGE_PAYLOAD else: # A completely empty packet, permitted, if a little odd yield '', '' - reset_packet() + reset_packet() expecting_bytes = None - self.expecting_bytes = expecting_bytes + self.expecting_bytes = expecting_bytes self.stage = stage - - + + def on_prelude(self, prelude): if self.prelude_callback and not self.prelude_callback(self, prelude): return False @@ -206,7 +206,7 @@ class Decoder(object): class JSONDecoder(Decoder): - + def feed(self, data): for header, payload in Decoder.feed(self, data): if header: @@ -215,9 +215,9 @@ class JSONDecoder(Decoder): header = {} yield header, payload - + if __name__ == "__main__": - + f = StringIO() encoder = JSONFileEncoder(f) encoder.write(dict(a=1, b=2), 'Payload') @@ -225,29 +225,29 @@ if __name__ == "__main__": encoder.write(None, 'Payload') encoder.write(dict(a=1)) encoder.write() - + stream = 'prelude\n' + f.getvalue() - + #print stream - + # packets = ['Prelude string\n', # encode('header', 'payload'), # encode('header number 2', 'second payload'), # encode('', '')] -# +# # stream = ''.join(packets) - + decoder = JSONDecoder() - + stream = 'pyfs/0.1\n59,13:{"type":"rpc","method":"ping","client_ref":"-1221142848:1"}Hello, World!' - + fdata = StringIO(stream) - + while 1: data = fdata.read(3) if not data: break for header, payload in decoder.feed(data): print "Header:", repr(header) - print "Payload:", repr(payload) -
\ No newline at end of file + print "Payload:", repr(payload) + |