# # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # module Qpid class << self attr_accessor :asm_logger end class Segment attr_reader :type, :payload, :track, :channel attr_accessor :id, :offset def initialize(first, last, type, track, channel, payload) @id = nil @offset = nil @first = first @last = last @type = type @track = track @channel = channel @payload = payload end def first_segment? ; @first ; end def last_segment? ; @last ; end def decode(spec) segs = spec[:segment_type] choice = segs.enum.choices[type] return method("decode_#{choice.name}").call(spec) end def decode_control(spec) sc = StringCodec.new(spec, payload) return sc.read_control() end def decode_command(spec) sc = StringCodec.new(spec, payload) hdr, cmd = sc.read_command() cmd.id = id return hdr, cmd end def decode_header(spec) sc = StringCodec.new(spec, payload) values = [] until sc.encoded.empty? values << sc.read_struct32() end return values end def decode_body(spec) payload end def append(frame) @payload += frame.payload end def to_s f = first_segment? ? 'F' : '.' l = last_segment? ? 'L' : '.' return "%s%s %s %s %s %s" % [f, l, @type, @track, @channel, @payload.inspect] end end class Assembler < Framer def logger; Qpid::asm_logger; end def initialize(sock, max_payload = Frame::MAX_PAYLOAD) super(sock) @max_payload = max_payload @fragments = {} end def read_segment loop do frame = read_frame key = [frame.channel, frame.track] seg = @fragments[key] unless seg seg = Segment.new(frame.first_segment?, frame.last_segment?, frame.type, frame.track, frame.channel, "") @fragments[key] = seg end seg.append(frame) if frame.last_frame? @fragments.delete(key) logger.debug("RECV #{seg}") if logger return seg end end end def write_segment(segment) remaining = segment.payload first = true while first or remaining payload = remaining[0, @max_payload] remaining = remaining[@max_payload, remaining.size] flags = 0 flags |= FIRST_FRM if first flags |= LAST_FRM unless remaining flags |= FIRST_SEG if segment.first_segment? flags |= LAST_SEG if segment.last_segment? frame = Frame.new(flags, segment.type, segment.track, segment.channel, payload) write_frame(frame) first = false end logger.debug("SENT #{segment}") if logger end end end