diff options
Diffstat (limited to 'qpid/ruby/lib/qpid/framer.rb')
-rw-r--r-- | qpid/ruby/lib/qpid/framer.rb | 212 |
1 files changed, 212 insertions, 0 deletions
diff --git a/qpid/ruby/lib/qpid/framer.rb b/qpid/ruby/lib/qpid/framer.rb new file mode 100644 index 0000000000..d057605383 --- /dev/null +++ b/qpid/ruby/lib/qpid/framer.rb @@ -0,0 +1,212 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +require 'monitor' +require 'logger' +require 'sasl' + +module Qpid + + FIRST_SEG = 0x08 + LAST_SEG = 0x04 + FIRST_FRM = 0x02 + LAST_FRM = 0x01 + + class << self + attr_accessor :raw_logger, :frm_logger + end + + def self.packed_size(format) + # FIXME: This is a total copout to simulate Python's + # struct.calcsize + ([0]*256).pack(format).size + end + + class Frame + attr_reader :payload, :track, :flags, :type, :channel + + # HEADER = "!2BHxBH4x" + # Python Meaning Ruby + # ! big endian (implied by format char) + # 2B 2 uchar C2 + # H unsigned short n + # x pad byte x + # B uchar C + # H unsigned short n + # 4x pad byte x4 + HEADER = "C2nxCnx4" + HEADER_SIZE = Qpid::packed_size(HEADER) + MAX_PAYLOAD = 65535 - HEADER_SIZE + + def initialize(flags, type, track, channel, payload) + if payload.size > MAX_PAYLOAD + raise ArgumentError, "max payload size exceeded: #{payload.size}" + end + + @flags = flags + @type = type + @track = track + @channel = channel + @payload = payload + end + + def first_segment? ; FIRST_SEG & @flags > 0 ; end + + def last_segment? ; LAST_SEG & @flags > 0 ; end + + def first_frame? ; FIRST_FRM & @flags > 0 ; end + + def last_frame? ; LAST_FRM & @flags > 0 ; end + + def to_s + fs = first_segment? ? 'S' : '.' + ls = last_segment? ? 's' : '.' + ff = first_frame? ? 'F' : '.' + lf = last_frame? ? 'f' : '.' + + return "%s%s%s%s %s %s %s %s" % [fs, ls, ff, lf, + @type, + @track, + @channel, + @payload.inspect] + end + end + + class FramingError < Exception ; end + + class Closed < Exception ; end + + class Framer + include Packer + + # Python: "!4s4B" + HEADER = "a4C4" + HEADER_SIZE = 8 + + def raw + Qpid::raw_logger + end + + def frm + Qpid::frm_logger + end + + def initialize(sock) + @sock = sock + @sock.extend(MonitorMixin) + @tx_buf = "" + @rx_buf = "" + @security_layer_tx = nil + @security_layer_rx = nil + @maxbufsize = 65535 + end + + attr_reader :sock + attr_accessor :security_layer_tx, :security_layer_rx + + def aborted? ; false ; end + + def write(buf) + @tx_buf += buf + end + + def flush + @sock.synchronize do + if @security_layer_tx + cipher_buf = Sasl.encode(@security_layer_tx, @tx_buf) + _write(cipher_buf) + else + _write(@tx_buf) + end + @tx_buf = "" + frm.debug("FLUSHED") if frm + end + rescue + @sock.close unless @sock.closed? + end + + def _write(buf) + while buf && buf.size > 0 + # FIXME: Catch errors + n = @sock.write(buf) + raw.debug("SENT #{buf[0, n].inspect}") if raw + buf[0,n] = "" + @sock.flush + end + end + + def read(n) + while @rx_buf.size < n + begin + s = @sock.recv(@maxbufsize) + if @security_layer_rx + s = Sasl.decode(@security_layer_rx, s) + end + rescue IOError => e + raise e if @rx_buf != "" + @sock.close unless @sock.closed? + raise Closed + end + # FIXME: Catch errors + if s.nil? or s.size == 0 + @sock.close unless @sock.closed? + raise Closed + end + @rx_buf += s + raw.debug("RECV #{n}/#{@rx_buf.size} #{s.inspect}") if raw + end + data = @rx_buf[0, n] + @rx_buf = @rx_buf[n, @rx_buf.size - n] + return data + end + + def read_header + unpack(Framer::HEADER, Framer::HEADER_SIZE) + end + + def write_header(major, minor) + @sock.synchronize do + pack(Framer::HEADER, "AMQP", 1, 1, major, minor) + flush() + end + end + + def write_frame(frame) + @sock.synchronize do + size = frame.payload.size + Frame::HEADER_SIZE + track = frame.track & 0x0F + pack(Frame::HEADER, frame.flags, frame.type, size, track, frame.channel) + write(frame.payload) + if frame.last_segment? and frame.last_frame? + flush() + frm.debug("SENT #{frame}") if frm + end + end + end + + def read_frame + flags, type, size, track, channel = unpack(Frame::HEADER, Frame::HEADER_SIZE) + raise FramingError if (flags & 0xF0 > 0) + payload = read(size - Frame::HEADER_SIZE) + frame = Frame.new(flags, type, track, channel, payload) + frm.debug("RECV #{frame}") if frm + return frame + end + end +end |