1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
|
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
module Qpid
class << self
  # Module-level logger slot. Nothing in this block assigns it, so it reads
  # as nil until a caller stores a logger object here.
  attr_reader :asm_logger
  attr_writer :asm_logger
end
# One segment on a (channel, track) pair: a typed payload plus flags saying
# whether it is the first and/or last segment of its group.
class Segment
  attr_reader :type, :payload, :track, :channel
  attr_accessor :id, :offset

  # first/last - flags reported by first_segment? / last_segment?
  # type       - segment type code; used by #decode as a lookup key
  # track      - track the segment belongs to
  # channel    - channel the segment belongs to
  # payload    - payload string; #append extends it
  #
  # id and offset start out nil and are assigned later through their writers.
  def initialize(first, last, type, track, channel, payload)
    @first = first
    @last = last
    @type = type
    @track = track
    @channel = channel
    @payload = payload
    @id = nil
    @offset = nil
  end

  # Returns the "first" flag given to the constructor.
  def first_segment?
    @first
  end

  # Returns the "last" flag given to the constructor.
  def last_segment?
    @last
  end

  # Looks the segment type up in spec[:segment_type].enum.choices and hands
  # off to the matching decode_<choice name> method, returning its result.
  def decode(spec)
    segment_choice = spec[:segment_type].enum.choices[type]
    decoder = method("decode_#{segment_choice.name}")
    decoder.call(spec)
  end

  # Decodes the payload as a control via StringCodec#read_control.
  def decode_control(spec)
    StringCodec.new(spec, payload).read_control()
  end

  # Decodes the payload as a command; returns [header, command] after
  # stamping the command with this segment's id.
  def decode_command(spec)
    codec = StringCodec.new(spec, payload)
    header, command = codec.read_command()
    command.id = id
    [header, command]
  end

  # Reads struct32 values until the codec's buffer is empty and returns them
  # as an array.
  def decode_header(spec)
    codec = StringCodec.new(spec, payload)
    structs = []
    structs << codec.read_struct32() until codec.encoded.empty?
    structs
  end

  # Body segments are not decoded: the payload is returned as is.
  def decode_body(spec)
    payload
  end

  # Extends this segment's payload with the frame's payload. A new string is
  # built each time, so the string passed to the constructor is not mutated.
  def append(frame)
    @payload = @payload + frame.payload
  end

  # Debug representation: first/last markers ('F'/'L', or '.' when unset)
  # followed by type, track, channel and the inspected payload.
  def to_s
    first_mark = first_segment? ? 'F' : '.'
    last_mark = last_segment? ? 'L' : '.'
    format("%s%s %s %s %s %s",
           first_mark, last_mark, @type, @track, @channel, @payload.inspect)
  end
end
# Builds on Framer to exchange whole segments instead of individual frames:
# incoming frames are merged into Segment objects, and outgoing segments are
# split into frames carrying at most @max_payload units of payload each.
class Assembler < Framer
  # Logger used for RECV/SENT tracing; nil disables tracing.
  def logger
    Qpid::asm_logger
  end

  # sock        - passed straight through to Framer#initialize
  # max_payload - largest payload slice put into one outgoing frame. Expected
  #               to be positive: with 0, write_segment could never consume a
  #               non-empty payload.
  def initialize(sock, max_payload = Frame::MAX_PAYLOAD)
    super(sock)
    @max_payload = max_payload
    # Partially received segments, keyed by [channel, track].
    @fragments = {}
  end

  # Reads frames until one carrying the last-frame flag arrives, then returns
  # the Segment accumulated for that frame's (channel, track) pair. Frames
  # for other pairs seen in the meantime stay buffered in @fragments.
  def read_segment
    loop do
      frame = read_frame
      key = [frame.channel, frame.track]
      seg = @fragments[key]
      unless seg
        # First frame seen for this key: start a segment with an empty
        # payload and take the segment-level attributes from the frame.
        seg = Segment.new(frame.first_segment?,
                          frame.last_segment?,
                          frame.type, frame.track,
                          frame.channel, "")
        @fragments[key] = seg
      end
      seg.append(frame)
      next unless frame.last_frame?

      @fragments.delete(key)
      logger.debug("RECV #{seg}") if logger
      return seg
    end
  end

  # Splits segment.payload into slices of at most @max_payload and writes
  # one frame per slice. At least one frame is always written, so an empty
  # payload yields a single empty frame flagged as both first and last.
  def write_segment(segment)
    remaining = segment.payload
    first = true
    while first || remaining
      payload = remaining[0, @max_payload]
      rest = remaining[@max_payload, remaining.size]
      # String#[] returns "" (which is truthy) rather than nil when the
      # start index equals the string length. Without normalising that to
      # nil, a payload whose size is an exact multiple of @max_payload would
      # miss LAST_FRM on its final data frame and be followed by a spurious
      # empty frame.
      remaining = (rest.nil? || rest.empty?) ? nil : rest

      flags = 0
      flags |= FIRST_FRM if first
      flags |= LAST_FRM unless remaining
      flags |= FIRST_SEG if segment.first_segment?
      flags |= LAST_SEG if segment.last_segment?
      frame = Frame.new(flags, segment.type, segment.track,
                        segment.channel, payload)
      write_frame(frame)
      first = false
    end
    logger.debug("SENT #{segment}") if logger
  end
end
end
|