1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
|
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
from codec010 import StringCodec
from framer import *
from logging import getLogger
log = getLogger("qpid.io.seg")
class Segment:
def __init__(self, first, last, type, track, channel, payload):
self.id = None
self.offset = None
self.first = first
self.last = last
self.type = type
self.track = track
self.channel = channel
self.payload = payload
def decode(self, spec):
segs = spec["segment_type"]
choice = segs.choices[self.type]
return getattr(self, "decode_%s" % choice.name)(spec)
def decode_control(self, spec):
sc = StringCodec(spec, self.payload)
return sc.read_control()
def decode_command(self, spec):
sc = StringCodec(spec, self.payload)
hdr, cmd = sc.read_command()
cmd.id = self.id
return hdr, cmd
def decode_header(self, spec):
sc = StringCodec(spec, self.payload)
values = []
while len(sc.encoded) > 0:
values.append(sc.read_struct32())
return values
def decode_body(self, spec):
return self.payload
def __str__(self):
return "%s%s %s %s %s %r" % (int(self.first), int(self.last), self.type,
self.track, self.channel, self.payload)
def __repr__(self):
return str(self)
class Assembler(Framer):
def __init__(self, sock, max_payload = Frame.MAX_PAYLOAD):
Framer.__init__(self, sock)
self.max_payload = max_payload
self.fragments = {}
def read_segment(self):
while True:
frame = self.read_frame()
key = (frame.channel, frame.track)
seg = self.fragments.get(key)
if seg == None:
seg = Segment(frame.isFirstSegment(), frame.isLastSegment(),
frame.type, frame.track, frame.channel, "")
self.fragments[key] = seg
seg.payload += frame.payload
if frame.isLastFrame():
self.fragments.pop(key)
log.debug("RECV %s", seg)
return seg
def write_segment(self, segment):
remaining = segment.payload
first = True
while first or remaining:
payload = remaining[:self.max_payload]
remaining = remaining[self.max_payload:]
flags = 0
if first:
flags |= FIRST_FRM
first = False
if not remaining:
flags |= LAST_FRM
if segment.first:
flags |= FIRST_SEG
if segment.last:
flags |= LAST_SEG
frame = Frame(flags, segment.type, segment.track, segment.channel,
payload)
self.write_frame(frame)
log.debug("SENT %s", segment)
|