summaryrefslogtreecommitdiff
path: root/qpid/ruby/lib/qpid/framer.rb
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/ruby/lib/qpid/framer.rb')
-rw-r--r--qpid/ruby/lib/qpid/framer.rb212
1 files changed, 212 insertions, 0 deletions
diff --git a/qpid/ruby/lib/qpid/framer.rb b/qpid/ruby/lib/qpid/framer.rb
new file mode 100644
index 0000000000..d057605383
--- /dev/null
+++ b/qpid/ruby/lib/qpid/framer.rb
@@ -0,0 +1,212 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+require 'monitor'
+require 'logger'
+require 'sasl'
+
+module Qpid
+
+ FIRST_SEG = 0x08
+ LAST_SEG = 0x04
+ FIRST_FRM = 0x02
+ LAST_FRM = 0x01
+
+ class << self
+ attr_accessor :raw_logger, :frm_logger
+ end
+
+ def self.packed_size(format)
+ # FIXME: This is a total copout to simulate Python's
+ # struct.calcsize
+ ([0]*256).pack(format).size
+ end
+
+ class Frame
+ attr_reader :payload, :track, :flags, :type, :channel
+
+ # HEADER = "!2BHxBH4x"
+ # Python Meaning Ruby
+ # ! big endian (implied by format char)
+ # 2B 2 uchar C2
+ # H unsigned short n
+ # x pad byte x
+ # B uchar C
+ # H unsigned short n
+ # 4x pad byte x4
+ HEADER = "C2nxCnx4"
+ HEADER_SIZE = Qpid::packed_size(HEADER)
+ MAX_PAYLOAD = 65535 - HEADER_SIZE
+
+ def initialize(flags, type, track, channel, payload)
+ if payload.size > MAX_PAYLOAD
+ raise ArgumentError, "max payload size exceeded: #{payload.size}"
+ end
+
+ @flags = flags
+ @type = type
+ @track = track
+ @channel = channel
+ @payload = payload
+ end
+
+ def first_segment? ; FIRST_SEG & @flags > 0 ; end
+
+ def last_segment? ; LAST_SEG & @flags > 0 ; end
+
+ def first_frame? ; FIRST_FRM & @flags > 0 ; end
+
+ def last_frame? ; LAST_FRM & @flags > 0 ; end
+
+ def to_s
+ fs = first_segment? ? 'S' : '.'
+ ls = last_segment? ? 's' : '.'
+ ff = first_frame? ? 'F' : '.'
+ lf = last_frame? ? 'f' : '.'
+
+ return "%s%s%s%s %s %s %s %s" % [fs, ls, ff, lf,
+ @type,
+ @track,
+ @channel,
+ @payload.inspect]
+ end
+ end
+
+ class FramingError < Exception ; end
+
+ class Closed < Exception ; end
+
+ class Framer
+ include Packer
+
+ # Python: "!4s4B"
+ HEADER = "a4C4"
+ HEADER_SIZE = 8
+
+ def raw
+ Qpid::raw_logger
+ end
+
+ def frm
+ Qpid::frm_logger
+ end
+
+ def initialize(sock)
+ @sock = sock
+ @sock.extend(MonitorMixin)
+ @tx_buf = ""
+ @rx_buf = ""
+ @security_layer_tx = nil
+ @security_layer_rx = nil
+ @maxbufsize = 65535
+ end
+
+ attr_reader :sock
+ attr_accessor :security_layer_tx, :security_layer_rx
+
+ def aborted? ; false ; end
+
+ def write(buf)
+ @tx_buf += buf
+ end
+
+ def flush
+ @sock.synchronize do
+ if @security_layer_tx
+ cipher_buf = Sasl.encode(@security_layer_tx, @tx_buf)
+ _write(cipher_buf)
+ else
+ _write(@tx_buf)
+ end
+ @tx_buf = ""
+ frm.debug("FLUSHED") if frm
+ end
+ rescue
+ @sock.close unless @sock.closed?
+ end
+
+ def _write(buf)
+ while buf && buf.size > 0
+ # FIXME: Catch errors
+ n = @sock.write(buf)
+ raw.debug("SENT #{buf[0, n].inspect}") if raw
+ buf[0,n] = ""
+ @sock.flush
+ end
+ end
+
+ def read(n)
+ while @rx_buf.size < n
+ begin
+ s = @sock.recv(@maxbufsize)
+ if @security_layer_rx
+ s = Sasl.decode(@security_layer_rx, s)
+ end
+ rescue IOError => e
+ raise e if @rx_buf != ""
+ @sock.close unless @sock.closed?
+ raise Closed
+ end
+ # FIXME: Catch errors
+ if s.nil? or s.size == 0
+ @sock.close unless @sock.closed?
+ raise Closed
+ end
+ @rx_buf += s
+ raw.debug("RECV #{n}/#{@rx_buf.size} #{s.inspect}") if raw
+ end
+ data = @rx_buf[0, n]
+ @rx_buf = @rx_buf[n, @rx_buf.size - n]
+ return data
+ end
+
+ def read_header
+ unpack(Framer::HEADER, Framer::HEADER_SIZE)
+ end
+
+ def write_header(major, minor)
+ @sock.synchronize do
+ pack(Framer::HEADER, "AMQP", 1, 1, major, minor)
+ flush()
+ end
+ end
+
+ def write_frame(frame)
+ @sock.synchronize do
+ size = frame.payload.size + Frame::HEADER_SIZE
+ track = frame.track & 0x0F
+ pack(Frame::HEADER, frame.flags, frame.type, size, track, frame.channel)
+ write(frame.payload)
+ if frame.last_segment? and frame.last_frame?
+ flush()
+ frm.debug("SENT #{frame}") if frm
+ end
+ end
+ end
+
+ def read_frame
+ flags, type, size, track, channel = unpack(Frame::HEADER, Frame::HEADER_SIZE)
+ raise FramingError if (flags & 0xF0 > 0)
+ payload = read(size - Frame::HEADER_SIZE)
+ frame = Frame.new(flags, type, track, channel, payload)
+ frm.debug("RECV #{frame}") if frm
+ return frame
+ end
+ end
+end