summaryrefslogtreecommitdiff
path: root/qpid/ruby/lib/qpid/connection08.rb
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2011-05-27 15:44:23 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2011-05-27 15:44:23 +0000
commit66765100f4257159622cefe57bed50125a5ad017 (patch)
treea88ee23bb194eb91f0ebb2d9b23ff423e3ea8e37 /qpid/ruby/lib/qpid/connection08.rb
parent1aeaa7b16e5ce54f10c901d75c4d40f9f88b9db6 (diff)
parent88b98b2f4152ef59a671fad55a0d08338b6b78ca (diff)
downloadqpid-python-66765100f4257159622cefe57bed50125a5ad017.tar.gz
Creating a branch for experimenting with some ideas for JMS client.rajith_jms_client
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rajith_jms_client@1128369 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/ruby/lib/qpid/connection08.rb')
-rw-r--r--qpid/ruby/lib/qpid/connection08.rb252
1 files changed, 252 insertions, 0 deletions
diff --git a/qpid/ruby/lib/qpid/connection08.rb b/qpid/ruby/lib/qpid/connection08.rb
new file mode 100644
index 0000000000..09a4888cc4
--- /dev/null
+++ b/qpid/ruby/lib/qpid/connection08.rb
@@ -0,0 +1,252 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+require "socket"
+require "qpid/codec08"
+
+module Qpid08
+
+ class Connection
+
+ def initialize(host, port, spec)
+ @host = host
+ @port = port
+ @spec = spec
+ end
+
+ attr_reader(:host, :port, :spec)
+
+ def connect()
+ @sock = TCPSocket.open(@host, @port)
+ @out = Encoder.new(@sock)
+ @in = Decoder.new(@sock)
+ end
+
+ def init()
+ @out.write("AMQP")
+ [1, 1, @spec.major, @spec.minor].each {|o|
+ @out.octet(o)
+ }
+ end
+
+ def write(frame)
+ # puts "OUT #{frame.inspect()}"
+ @out.octet(@spec.constants[frame.payload.type].id)
+ @out.short(frame.channel)
+ frame.payload.encode(@out)
+ @out.octet(frame_end)
+ end
+
+ def read()
+ type = @spec.constants[@in.octet()].name
+ channel = @in.short()
+ payload = Payload.decode(type, @spec, @in)
+ oct = @in.octet()
+ if oct != frame_end
+ raise Exception.new("framing error: expected #{frame_end}, got #{oct}")
+ end
+ frame = Frame.new(channel, payload)
+ # puts " IN #{frame.inspect}"
+ return frame
+ end
+
+ private
+
+ def frame_end
+ @spec.constants[:"frame_end"].id
+ end
+
+ end
+
+ class Frame
+
+ def initialize(channel, payload)
+ @channel = channel
+ @payload = payload
+ end
+
+ attr_reader(:channel, :payload)
+
+ end
+
+ class Payload
+
+ TYPES = {}
+
+ def Payload.singleton_method_added(name)
+ if name == :type
+ TYPES[type] = self
+ end
+ end
+
+ def Payload.decode(type, spec, dec)
+ klass = TYPES[type]
+ klass.decode(spec, dec)
+ end
+
+ end
+
+ class Method < Payload
+
+ def initialize(method, args)
+ if args.size != method.fields.size
+ raise ArgumentError.new("argument mismatch #{method} #{args}")
+ end
+ @method = method
+ @args = args
+ end
+
+ attr_reader(:method, :args)
+
+ def Method.type; :frame_method end
+
+ def type; Method.type end
+
+ def encode(encoder)
+ buf = StringWriter.new()
+ enc = Encoder.new(buf)
+ enc.short(@method.parent.id)
+ enc.short(@method.id)
+ @method.fields.zip(self.args).each {|f, a|
+ if a.nil?; a = f.default end
+ enc.encode(f.type, a)
+ }
+ enc.flush()
+ encoder.longstr(buf.to_s)
+ end
+
+ def Method.decode(spec, decoder)
+ buf = decoder.longstr()
+ dec = Decoder.new(StringReader.new(buf))
+ klass = spec.classes[dec.short()]
+ meth = klass.methods[dec.short()]
+ args = meth.fields.map {|f| dec.decode(f.type)}
+ return Method.new(meth, args)
+ end
+
+ def inspect(); "#{method.qname}(#{args.join(", ")})" end
+
+ end
+
+ class Header < Payload
+
+ def Header.type; :frame_header end
+
+ def initialize(klass, weight, size, properties)
+ @klass = klass
+ @weight = weight
+ @size = size
+ @properties = properties
+ end
+
+ attr_reader :weight, :size, :properties
+
+ def type; Header.type end
+
+ def encode(encoder)
+ buf = StringWriter.new()
+ enc = Encoder.new(buf)
+ enc.short(@klass.id)
+ enc.short(@weight)
+ enc.longlong(@size)
+
+ # property flags
+ nprops = @klass.fields.size
+ flags = 0
+ 0.upto(nprops - 1) do |i|
+ f = @klass.fields[i]
+ flags <<= 1
+ flags |= 1 unless @properties[f.name].nil?
+ # the last bit indicates more flags
+ if i > 0 and (i % 15) == 0
+ flags <<= 1
+ if nprops > (i + 1)
+ flags |= 1
+ enc.short(flags)
+ flags = 0
+ end
+ end
+ end
+ flags <<= ((16 - (nprops % 15)) % 16)
+ enc.short(flags)
+
+ # properties
+ @klass.fields.each do |f|
+ v = @properties[f.name]
+ enc.encode(f.type, v) unless v.nil?
+ end
+ enc.flush()
+ encoder.longstr(buf.to_s)
+ end
+
+ def Header.decode(spec, decoder)
+ dec = Decoder.new(StringReader.new(decoder.longstr()))
+ klass = spec.classes[dec.short()]
+ weight = dec.short()
+ size = dec.longlong()
+
+ # property flags
+ bits = []
+ while true
+ flags = dec.short()
+ 15.downto(1) do |i|
+ if flags >> i & 0x1 != 0
+ bits << true
+ else
+ bits << false
+ end
+ end
+ break if flags & 0x1 == 0
+ end
+
+ # properties
+ properties = {}
+ bits.zip(klass.fields).each do |b, f|
+ properties[f.name] = dec.decode(f.type) if b
+ end
+ return Header.new(klass, weight, size, properties)
+ end
+
+ def inspect(); "#{@klass.name}(#{@properties.inspect()})" end
+
+ end
+
+ class Body < Payload
+
+ def Body.type; :frame_body end
+
+ def type; Body.type end
+
+ def initialize(content)
+ @content = content
+ end
+
+ attr_reader :content
+
+ def encode(enc)
+ enc.longstr(@content)
+ end
+
+ def Body.decode(spec, dec)
+ return Body.new(dec.longstr())
+ end
+
+ end
+
+end