diff options
author | Rajith Muditha Attapattu <rajith@apache.org> | 2011-05-27 15:44:23 +0000 |
---|---|---|
committer | Rajith Muditha Attapattu <rajith@apache.org> | 2011-05-27 15:44:23 +0000 |
commit | 66765100f4257159622cefe57bed50125a5ad017 (patch) | |
tree | a88ee23bb194eb91f0ebb2d9b23ff423e3ea8e37 /qpid/ruby/lib/qpid/connection08.rb | |
parent | 1aeaa7b16e5ce54f10c901d75c4d40f9f88b9db6 (diff) | |
parent | 88b98b2f4152ef59a671fad55a0d08338b6b78ca (diff) | |
download | qpid-python-rajith_jms_client.tar.gz |
Creating a branch for experimenting with some ideas for JMS client.rajith_jms_client
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rajith_jms_client@1128369 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/ruby/lib/qpid/connection08.rb')
-rw-r--r-- | qpid/ruby/lib/qpid/connection08.rb | 252 |
1 files changed, 252 insertions, 0 deletions
diff --git a/qpid/ruby/lib/qpid/connection08.rb b/qpid/ruby/lib/qpid/connection08.rb new file mode 100644 index 0000000000..09a4888cc4 --- /dev/null +++ b/qpid/ruby/lib/qpid/connection08.rb @@ -0,0 +1,252 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +require "socket" +require "qpid/codec08" + +module Qpid08 + + class Connection + + def initialize(host, port, spec) + @host = host + @port = port + @spec = spec + end + + attr_reader(:host, :port, :spec) + + def connect() + @sock = TCPSocket.open(@host, @port) + @out = Encoder.new(@sock) + @in = Decoder.new(@sock) + end + + def init() + @out.write("AMQP") + [1, 1, @spec.major, @spec.minor].each {|o| + @out.octet(o) + } + end + + def write(frame) + # puts "OUT #{frame.inspect()}" + @out.octet(@spec.constants[frame.payload.type].id) + @out.short(frame.channel) + frame.payload.encode(@out) + @out.octet(frame_end) + end + + def read() + type = @spec.constants[@in.octet()].name + channel = @in.short() + payload = Payload.decode(type, @spec, @in) + oct = @in.octet() + if oct != frame_end + raise Exception.new("framing error: expected #{frame_end}, got #{oct}") + end + frame = Frame.new(channel, payload) + # puts " IN #{frame.inspect}" + return frame + end + + private + + def frame_end + @spec.constants[:"frame_end"].id + end + + end + + class Frame + + def initialize(channel, payload) + @channel = channel + @payload = payload + end + + attr_reader(:channel, :payload) + + end + + class Payload + + TYPES = {} + + def Payload.singleton_method_added(name) + if name == :type + TYPES[type] = self + end + end + + def Payload.decode(type, spec, dec) + klass = TYPES[type] + klass.decode(spec, dec) + end + + end + + class Method < Payload + + def initialize(method, args) + if args.size != method.fields.size + raise ArgumentError.new("argument mismatch #{method} #{args}") + end + @method = method + @args = args + end + + attr_reader(:method, :args) + + def Method.type; :frame_method end + + def type; Method.type end + + def encode(encoder) + buf = StringWriter.new() + enc = Encoder.new(buf) + enc.short(@method.parent.id) + enc.short(@method.id) + @method.fields.zip(self.args).each {|f, a| + if a.nil?; a = f.default end + enc.encode(f.type, a) + } + enc.flush() + encoder.longstr(buf.to_s) + end + + def Method.decode(spec, decoder) + buf = decoder.longstr() + dec = Decoder.new(StringReader.new(buf)) + klass = spec.classes[dec.short()] + meth = klass.methods[dec.short()] + args = meth.fields.map {|f| dec.decode(f.type)} + return Method.new(meth, args) + end + + def inspect(); "#{method.qname}(#{args.join(", ")})" end + + end + + class Header < Payload + + def Header.type; :frame_header end + + def initialize(klass, weight, size, properties) + @klass = klass + @weight = weight + @size = size + @properties = properties + end + + attr_reader :weight, :size, :properties + + def type; Header.type end + + def encode(encoder) + buf = StringWriter.new() + enc = Encoder.new(buf) + enc.short(@klass.id) + enc.short(@weight) + enc.longlong(@size) + + # property flags + nprops = @klass.fields.size + flags = 0 + 0.upto(nprops - 1) do |i| + f = @klass.fields[i] + flags <<= 1 + flags |= 1 unless @properties[f.name].nil? + # the last bit indicates more flags + if i > 0 and (i % 15) == 0 + flags <<= 1 + if nprops > (i + 1) + flags |= 1 + enc.short(flags) + flags = 0 + end + end + end + flags <<= ((16 - (nprops % 15)) % 16) + enc.short(flags) + + # properties + @klass.fields.each do |f| + v = @properties[f.name] + enc.encode(f.type, v) unless v.nil? + end + enc.flush() + encoder.longstr(buf.to_s) + end + + def Header.decode(spec, decoder) + dec = Decoder.new(StringReader.new(decoder.longstr())) + klass = spec.classes[dec.short()] + weight = dec.short() + size = dec.longlong() + + # property flags + bits = [] + while true + flags = dec.short() + 15.downto(1) do |i| + if flags >> i & 0x1 != 0 + bits << true + else + bits << false + end + end + break if flags & 0x1 == 0 + end + + # properties + properties = {} + bits.zip(klass.fields).each do |b, f| + properties[f.name] = dec.decode(f.type) if b + end + return Header.new(klass, weight, size, properties) + end + + def inspect(); "#{@klass.name}(#{@properties.inspect()})" end + + end + + class Body < Payload + + def Body.type; :frame_body end + + def type; Body.type end + + def initialize(content) + @content = content + end + + attr_reader :content + + def encode(enc) + enc.longstr(@content) + end + + def Body.decode(spec, dec) + return Body.new(dec.longstr()) + end + + end + +end |