1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
|
try:
from json import dumps, loads
except ImportError:
from simplejson import dumps, loads
try:
from cStringIO import StringIO
except ImportError:
from StringIO import StringIO
def encode(header='', payload=''):
def textsize(s):
if s:
return str(len(s))
return ''
return '%i,%i:%s%s' % (textsize(header), textsize(payload), header, payload)
class FileEncoder(object):
def __init__(self, f):
self.f = f
def write(self, header='', payload=''):
fwrite = self.f.write
def textsize(s):
if s:
return str(len(s))
return ''
fwrite('%s,%s:' % (textsize(header), textsize(payload)))
if header:
fwrite(header)
if payload:
fwrite(payload)
class JSONFileEncoder(FileEncoder):
def write(self, header=None, payload=''):
if header is None:
super(JSONFileEncoder, self).write('', payload)
else:
header_json = dumps(header, separators=(',', ':'))
super(JSONFileEncoder, self).write(header_json, payload)
class DecoderError(Exception):
pass
class PreludeError(DecoderError):
pass
class Decoder(object):
STAGE_PRELUDE, STAGE_SIZE, STAGE_HEADER, STAGE_PAYLOAD = range(4)
MAX_PRELUDE = 255
def __init__(self, no_prelude=False, prelude_callback=None):
self.prelude_callback = prelude_callback
self.stream_broken = False
self.expecting_bytes = None
self.stage = self.STAGE_PRELUDE
self._prelude = []
self._size = []
self._expecting_bytes = None
self.header_size = None
self.payload_size = None
self._header_bytes = None
self._payload_bytes = None
self._header_data = []
self._payload_data = []
self.header = None
self.payload = None
if no_prelude:
self.stage = self.STAGE_SIZE
def feed(self, data):
if self.stream_broken:
raise DecoderError('Stream is broken')
STAGE_PRELUDE, STAGE_SIZE, STAGE_HEADER, STAGE_PAYLOAD = range(4)
size_append = self._size.append
header_append = self._header_data.append
payload_append = self._payload_data.append
datafind = data.find
def reset_packet():
self.expecting_bytes = None
del self._header_data[:]
del self._payload_data[:]
self.header = None
self.payload = None
data_len = len(data)
data_pos = 0
expecting_bytes = self.expecting_bytes
stage = self.stage
if stage == STAGE_PRELUDE:
max_find = min(len(data), data_pos + self.MAX_PRELUDE)
cr_pos = datafind('\n', data_pos, max_find)
if cr_pos == -1:
self._prelude.append(data[data_pos:])
data_pos = max_find
if sum(len(s) for s in self._prelude) > self.MAX_PRELUDE:
self.stream_broken = True
raise PreludeError('Prelude not found')
else:
self._prelude.append(data[data_pos:cr_pos])
if sum(len(s) for s in self._prelude) > self.MAX_PRELUDE:
self.stream_broken = True
raise PreludeError('Prelude not found')
data_pos = cr_pos + 1
prelude = ''.join(self._prelude)
del self._prelude[:]
reset_packet()
if not self.on_prelude(prelude):
self.broken = True
return
stage = STAGE_SIZE
while data_pos < data_len:
if stage == STAGE_HEADER:
bytes_to_read = min(data_len - data_pos, expecting_bytes)
header_append(data[data_pos:data_pos + bytes_to_read])
data_pos += bytes_to_read
expecting_bytes -= bytes_to_read
if not expecting_bytes:
self.header = ''.join(self._header_data)
if not self.payload_size:
yield self.header, ''
reset_packet()
expecting_bytes = None
stage = STAGE_SIZE
else:
stage = STAGE_PAYLOAD
expecting_bytes = self.payload_size
elif stage == STAGE_PAYLOAD:
bytes_to_read = min(data_len - data_pos, expecting_bytes)
payload_append(data[data_pos:data_pos + bytes_to_read])
data_pos += bytes_to_read
expecting_bytes -= bytes_to_read
if not expecting_bytes:
self.payload = ''.join(self._payload_data)
yield self.header, self.payload
reset_packet()
stage = STAGE_SIZE
expecting_bytes = None
elif stage == STAGE_SIZE:
term_pos = datafind(':', data_pos)
if term_pos == -1:
size_append(data[data_pos:])
break
else:
size_append(data[data_pos:term_pos])
data_pos = term_pos + 1
size = ''.join(self._size)
del self._size[:]
if ',' in size:
header_size, payload_size = size.split(',', 1)
else:
header_size = size
payload_size = ''
try:
self.header_size = int(header_size or '0')
self.payload_size = int(payload_size or '0')
except ValueError:
self.stream_broken = False
raise DecoderError('Invalid size in packet (%s)' % size)
if self.header_size:
expecting_bytes = self.header_size
stage = STAGE_HEADER
elif self.payload_size:
expecting_bytes = self.payload_size
stage = STAGE_PAYLOAD
else:
# A completely empty packet, permitted, if a little odd
yield '', ''
reset_packet()
expecting_bytes = None
self.expecting_bytes = expecting_bytes
self.stage = stage
def on_prelude(self, prelude):
if self.prelude_callback and not self.prelude_callback(self, prelude):
return False
#pass
#print "Prelude:", prelude
return True
class JSONDecoder(Decoder):
def feed(self, data):
for header, payload in Decoder.feed(self, data):
if header:
header = loads(header)
else:
header = {}
yield header, payload
if __name__ == "__main__":
f = StringIO()
encoder = JSONFileEncoder(f)
encoder.write(dict(a=1, b=2), 'Payload')
encoder.write(dict(foo="bar", nested=dict(apples="oranges"), alist=range(5)), 'Payload goes here')
encoder.write(None, 'Payload')
encoder.write(dict(a=1))
encoder.write()
stream = 'prelude\n' + f.getvalue()
#print stream
# packets = ['Prelude string\n',
# encode('header', 'payload'),
# encode('header number 2', 'second payload'),
# encode('', '')]
#
# stream = ''.join(packets)
decoder = JSONDecoder()
stream = 'pyfs/0.1\n59,13:{"type":"rpc","method":"ping","client_ref":"-1221142848:1"}Hello, World!'
fdata = StringIO(stream)
while 1:
data = fdata.read(3)
if not data:
break
for header, payload in decoder.feed(data):
print "Header:", repr(header)
print "Payload:", repr(payload)
|