diff options
Diffstat (limited to 'ruby/lib/qpid/assembler.rb')
-rw-r--r-- | ruby/lib/qpid/assembler.rb | 148 |
1 files changed, 148 insertions, 0 deletions
diff --git a/ruby/lib/qpid/assembler.rb b/ruby/lib/qpid/assembler.rb new file mode 100644 index 0000000000..b768c3f195 --- /dev/null +++ b/ruby/lib/qpid/assembler.rb @@ -0,0 +1,148 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +module Qpid + + class << self + attr_accessor :asm_logger + end + + class Segment + + attr_reader :type, :payload, :track, :channel + attr_accessor :id, :offset + + def initialize(first, last, type, track, channel, payload) + @id = nil + @offset = nil + @first = first + @last = last + @type = type + @track = track + @channel = channel + @payload = payload + end + + def first_segment? ; @first ; end + + def last_segment? ; @last ; end + + def decode(spec) + segs = spec[:segment_type] + choice = segs.enum.choices[type] + return method("decode_#{choice.name}").call(spec) + end + + def decode_control(spec) + sc = StringCodec.new(spec, payload) + return sc.read_control() + end + + def decode_command(spec) + sc = StringCodec.new(spec, payload) + hdr, cmd = sc.read_command() + cmd.id = id + return hdr, cmd + end + + def decode_header(spec) + sc = StringCodec.new(spec, payload) + values = [] + until sc.encoded.empty? + values << sc.read_struct32() + end + return values + end + + def decode_body(spec) + payload + end + + def append(frame) + @payload += frame.payload + end + + def to_s + f = first_segment? ? 'F' : '.' + l = last_segment? ? 'L' : '.' + return "%s%s %s %s %s %s" % [f, l, @type, + @track, @channel, @payload.inspect] + end + + end + + class Assembler < Framer + + def logger; Qpid::asm_logger; end + + def initialize(sock, max_payload = Frame::MAX_PAYLOAD) + super(sock) + @max_payload = max_payload + @fragments = {} + end + + def read_segment + loop do + frame = read_frame + key = [frame.channel, frame.track] + seg = @fragments[key] + unless seg + seg = Segment.new(frame.first_segment?, + frame.last_segment?, + frame.type, frame.track, + frame.channel, "") + @fragments[key] = seg + end + + seg.append(frame) + + if frame.last_frame? + @fragments.delete(key) + logger.debug("RECV #{seg}") if logger + return seg + end + end + end + + def write_segment(segment) + remaining = segment.payload + + first = true + while first or remaining + payload = remaining[0, @max_payload] + remaining = remaining[@max_payload, remaining.size] + + flags = 0 + + flags |= FIRST_FRM if first + flags |= LAST_FRM unless remaining + flags |= FIRST_SEG if segment.first_segment? + flags |= LAST_SEG if segment.last_segment? + + frame = Frame.new(flags, segment.type, segment.track, + segment.channel, payload) + write_frame(frame) + + first = false + end + + logger.debug("SENT #{segment}") if logger + end + end +end |