#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

require 'monitor'
require 'logger'
require 'sasl'

module Qpid

  # Bit masks tested against Frame#flags.
  FIRST_SEG = 0x08
  LAST_SEG  = 0x04
  FIRST_FRM = 0x02
  LAST_FRM  = 0x01

  class << self
    # Optional loggers. Framer logs raw bytes to raw_logger and frame-level
    # events to frm_logger, and skips logging when the logger is nil.
    attr_accessor :raw_logger, :frm_logger
  end

  # Returns the number of bytes produced by packing zeros with +format+.
  def self.packed_size(format)
    # FIXME: This is a total copout to simulate Python's
    # struct.calcsize
    ([0] * 256).pack(format).size
  end

  # A single frame: flag bits, type, track, channel and the payload bytes.
  class Frame
    attr_reader :payload, :track, :flags, :type, :channel

    # HEADER = "!2BHxBH4x"
    # Python  Meaning         Ruby
    # !       big endian      (implied by format char)
    # 2B      2 uchar         C2
    # H       unsigned short  n
    # x       pad byte        x
    # B       uchar           C
    # H       unsigned short  n
    # 4x      pad byte        x4
    HEADER = "C2nxCnx4"
    HEADER_SIZE = Qpid::packed_size(HEADER)
    # The size field is an unsigned short and counts the header as well.
    MAX_PAYLOAD = 65535 - HEADER_SIZE

    # Raises ArgumentError when +payload+ is larger than MAX_PAYLOAD.
    def initialize(flags, type, track, channel, payload)
      if payload.size > MAX_PAYLOAD
        raise ArgumentError, "max payload size exceeded: #{payload.size}"
      end
      @flags = flags
      @type = type
      @track = track
      @channel = channel
      @payload = payload
    end

    # Flag predicates: true when the corresponding bit is set in +flags+.
    def first_segment? ; FIRST_SEG & @flags > 0 ; end

    def last_segment? ; LAST_SEG & @flags > 0 ; end

    def first_frame? ; FIRST_FRM & @flags > 0 ; end

    def last_frame? ; LAST_FRM & @flags > 0 ; end

    # Renders the four flags as "SsFf" (a "." for each unset flag), followed
    # by type, track, channel and the inspected payload.
    def to_s
      fs = first_segment? ? 'S' : '.'
      ls = last_segment? ? 's' : '.'
      ff = first_frame? ? 'F' : '.'
      lf = last_frame? ? 'f' : '.'

      "%s%s%s%s %s %s %s %s" % [fs, ls, ff, lf, @type, @track, @channel,
                                @payload.inspect]
    end
  end

  # NOTE(review): both classes derive from Exception rather than
  # StandardError, so a bare `rescue` does not catch them. Left unchanged
  # because callers may rely on that.

  # Raised by Framer#read_frame when a received frame header is invalid.
  class FramingError < Exception ; end

  # Raised by Framer#read when the socket yields no more data.
  class Closed < Exception ; end

  # Buffers outgoing bytes and reads/writes protocol headers and frames on a
  # socket, optionally passing data through a SASL security layer.
  class Framer
    # Packer is not defined in this file. The pack/unpack calls below
    # presumably come from it and go through #write / #read -- TODO confirm.
    include Packer

    # Python: "!4s4B"
    HEADER = "a4C4"
    HEADER_SIZE = 8

    # Logger for raw bytes, or nil when raw logging is disabled.
    def raw
      Qpid::raw_logger
    end

    # Logger for frame-level events, or nil when frame logging is disabled.
    def frm
      Qpid::frm_logger
    end

    # Wraps +sock+. The socket object itself is extended with MonitorMixin so
    # that it can serve as the lock for writers.
    def initialize(sock)
      @sock = sock
      @sock.extend(MonitorMixin)
      @tx_buf = ""
      @rx_buf = ""
      @security_layer_tx = nil
      @security_layer_rx = nil
      @maxbufsize = 65535
    end

    attr_reader :sock
    attr_accessor :security_layer_tx, :security_layer_rx

    def aborted? ; false ; end

    # Appends +buf+ to the transmit buffer; nothing is sent until #flush.
    def write(buf)
      @tx_buf += buf
    end

    # Sends the transmit buffer (SASL-encoded when a tx security layer is
    # set) and then empties it.
    #
    # Best effort: any StandardError raised while sending is swallowed and
    # the socket is closed. The transmit buffer is not cleared in that case.
    def flush
      @sock.synchronize do
        if @security_layer_tx
          cipher_buf = Sasl.encode(@security_layer_tx, @tx_buf)
          _write(cipher_buf)
        else
          _write(@tx_buf)
        end
        @tx_buf = ""
        frm.debug("FLUSHED") if frm
      end
    rescue
      @sock.close unless @sock.closed?
    end

    # Writes all of +buf+ to the socket, looping over partial writes.
    # +buf+ is consumed in place and is empty when this returns.
    def _write(buf)
      while buf && buf.size > 0
        # FIXME: Catch errors
        n = @sock.write(buf)
        raw.debug("SENT #{buf[0, n].inspect}") if raw
        buf[0, n] = ""
        @sock.flush
      end
    end

    # Returns exactly +n+ bytes, receiving from the socket until the receive
    # buffer holds at least that many.
    #
    # Raises Closed (after closing the socket) when the socket yields nil or
    # an empty string, or when an IOError occurs while the receive buffer is
    # empty. An IOError that occurs with buffered data pending is re-raised.
    def read(n)
      while @rx_buf.size < n
        begin
          s = @sock.recv(@maxbufsize)
          if @security_layer_rx
            # NOTE(review): the decoder is invoked before the end-of-stream
            # check below -- confirm Sasl.decode tolerates nil/empty input.
            s = Sasl.decode(@security_layer_rx, s)
          end
        rescue IOError => e
          raise e unless @rx_buf.empty?
          @sock.close unless @sock.closed?
          raise Closed
        end
        # FIXME: Catch errors
        if s.nil? || s.size == 0
          @sock.close unless @sock.closed?
          raise Closed
        end
        @rx_buf += s
        raw.debug("RECV #{n}/#{@rx_buf.size} #{s.inspect}") if raw
      end
      data = @rx_buf[0, n]
      @rx_buf = @rx_buf[n, @rx_buf.size - n]
      data
    end

    # Reads and unpacks the 8-byte protocol header.
    def read_header
      unpack(Framer::HEADER, Framer::HEADER_SIZE)
    end

    # Packs the protocol header ("AMQP", 1, 1, major, minor) and flushes it.
    def write_header(major, minor)
      @sock.synchronize do
        pack(Framer::HEADER, "AMQP", 1, 1, major, minor)
        flush()
      end
    end

    # Buffers +frame+ (header followed by payload). The buffer is flushed
    # only when the frame is both the last frame and the last segment.
    def write_frame(frame)
      @sock.synchronize do
        # The size field counts the header as well as the payload.
        size = frame.payload.size + Frame::HEADER_SIZE
        track = frame.track & 0x0F
        pack(Frame::HEADER, frame.flags, frame.type, size, track, frame.channel)
        write(frame.payload)
        if frame.last_segment? && frame.last_frame?
          flush()
          frm.debug("SENT #{frame}") if frm
        end
      end
    end

    # Reads one frame header and its payload and returns a Frame.
    #
    # Raises FramingError when any of the upper four flag bits is set, or
    # when the size field is smaller than the frame header it must include.
    def read_frame
      flags, type, size, track, channel = unpack(Frame::HEADER, Frame::HEADER_SIZE)
      raise FramingError if (flags & 0xF0 > 0)
      # A size below the header size would make the payload length negative;
      # read() would then hand back nil and mis-slice the receive buffer.
      if size < Frame::HEADER_SIZE
        raise FramingError,
              "frame size #{size} is smaller than the #{Frame::HEADER_SIZE}-byte frame header"
      end
      payload = read(size - Frame::HEADER_SIZE)
      frame = Frame.new(flags, type, track, channel, payload)
      frm.debug("RECV #{frame}") if frm
      frame
    end
  end
end