diff options
author | Rafael H. Schloming <rhs@apache.org> | 2007-03-07 17:13:02 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2007-03-07 17:13:02 +0000 |
commit | 838692e69a2ba3fbbd3f260629725602f2f4b966 (patch) | |
tree | 522613531c2a5b1d056eed8c4e7392ecb37240e7 /qpid/ruby | |
parent | 82075c0d635f29e1698b7adaff54e519aa501086 (diff) | |
download | qpid-python-838692e69a2ba3fbbd3f260629725602f2f4b966.tar.gz |
added test harness, tests, and a few missing pieces of implementation
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@515652 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/ruby')
-rw-r--r-- | qpid/ruby/qpid.rb (renamed from qpid/ruby/test.rb) | 19 | ||||
-rw-r--r-- | qpid/ruby/qpid/client.rb | 30 | ||||
-rw-r--r-- | qpid/ruby/qpid/codec.rb | 5 | ||||
-rw-r--r-- | qpid/ruby/qpid/connection.rb | 119 | ||||
-rw-r--r-- | qpid/ruby/qpid/peer.rb | 74 | ||||
-rw-r--r-- | qpid/ruby/qpid/spec.rb | 40 | ||||
-rw-r--r-- | qpid/ruby/qpid/test.rb | 17 | ||||
-rwxr-xr-x | qpid/ruby/run-tests | 4 | ||||
-rw-r--r-- | qpid/ruby/tests/basic.rb | 68 | ||||
-rw-r--r-- | qpid/ruby/tests/channel.rb | 48 |
10 files changed, 360 insertions, 64 deletions
diff --git a/qpid/ruby/test.rb b/qpid/ruby/qpid.rb index f9de9a5a3a..19c3e305b7 100644 --- a/qpid/ruby/test.rb +++ b/qpid/ruby/qpid.rb @@ -17,19 +17,8 @@ # under the License. # -require "qpid/client" +require "qpid/queue" +require "qpid/codec" +require "qpid/connection" +require "qpid/peer" require "qpid/spec" - -def die(msg) - puts msg - exit(1) -end - -specfile = $*[0] -die("usage: test.rb <spec file>") if specfile.nil? - -c = Qpid::Client.new("0.0.0.0", 5672, Spec.load($*[0])) -c.start({"LOGIN" => "guest", "PASSWORD" => "guest"}) -ch = c.channel(1) -p ch.channel_open() -p ch.queue_declare() diff --git a/qpid/ruby/qpid/client.rb b/qpid/ruby/qpid/client.rb index 485dbf745d..f10f2e564b 100644 --- a/qpid/ruby/qpid/client.rb +++ b/qpid/ruby/qpid/client.rb @@ -24,11 +24,11 @@ require "qpid/queue" module Qpid class Client - def initialize(host, port, spec, vhost = nil) + def initialize(host, port, spec, vhost = "/") @host = host @port = port @spec = spec - @vhost = if vhost.nil?; host else vhost end + @vhost = vhost @mechanism = nil @response = nil @@ -38,6 +38,7 @@ module Qpid @mutex = Mutex.new() @closed = false + @code = nil @started = ConditionVariable.new() @conn = Connection.new(@host, @port, @spec) @@ -47,6 +48,8 @@ module Qpid attr_reader :mechanism, :response, :locale def closed?; @closed end + def closed=(value); @closed = value end + def code; @code end def wait() @mutex.synchronize do @@ -85,6 +88,12 @@ module Qpid def channel(id) return @peer.channel(id) end + + def close(msg = nil) + @closed = true + @code = msg + @peer.close() + end end class ClientDelegate @@ -104,6 +113,23 @@ module Qpid ch.connection_tune_ok(*msg.fields) @client.signal_start() end + + def connection_close(ch, msg) + puts "CONNECTION CLOSED: #{msg.args.join(", ")}" + @client.close(msg) + end + + def channel_close(ch, msg) + puts "CHANNEL[#{ch.id}] CLOSED: #{msg.args.join(", ")}" + ch.channel_close_ok() + ch.close() + end + + def basic_deliver(ch, msg) + queue = @client.queue(msg.consumer_tag) + queue << msg + end + end end diff --git a/qpid/ruby/qpid/codec.rb b/qpid/ruby/qpid/codec.rb index 2695228ba6..8d80e10aee 100644 --- a/qpid/ruby/qpid/codec.rb +++ b/qpid/ruby/qpid/codec.rb @@ -67,11 +67,10 @@ module Codec end def longlong(l) - # is this the right byte order? lower = l & 0xffffffff upper = (l & ~0xffffffff) >> 32 - long(lower) long(upper) + long(lower) end def shortstr(s) @@ -116,6 +115,7 @@ module Codec def write(str) flushbits() @out.write(str) +# puts "OUT #{str.inspect()}" end def pack(fmt, *args) @@ -238,6 +238,7 @@ module Codec if result.nil? or result.empty? raise EOF.new() else +# puts " IN #{result.inspect()}" return result end end diff --git a/qpid/ruby/qpid/connection.rb b/qpid/ruby/qpid/connection.rb index c79e9d8319..f6ee9cf1e4 100644 --- a/qpid/ruby/qpid/connection.rb +++ b/qpid/ruby/qpid/connection.rb @@ -48,6 +48,7 @@ module Qpid end def write(frame) +# puts "OUT #{frame.inspect()}" @out.octet(@spec.constants[frame.payload.type].id) @out.short(frame.channel) frame.payload.encode(@out) @@ -62,13 +63,15 @@ module Qpid if oct != frame_end raise Exception.new("framing error: expected #{frame_end}, got #{oct}") end - Frame.new(channel, payload) + frame = Frame.new(channel, payload) +# puts " IN #{frame.inspect}" + return frame end private def frame_end - @spec.constants[:"frame end"].id + @spec.constants[:"frame_end"].id end end @@ -113,9 +116,7 @@ module Qpid attr_reader(:method, :args) - def Method.type - :"frame method" - end + def Method.type; :frame_method end def type; Method.type end @@ -125,6 +126,7 @@ module Qpid enc.short(@method.parent.id) enc.short(@method.id) @method.fields.zip(self.args).each {|f, a| + if a.nil?; a = f.default end enc.encode(f.type, a) } enc.flush() @@ -140,6 +142,113 @@ module Qpid return Method.new(meth, args) end + def inspect(); "#{method.qname}(#{args.join(", ")})" end + + end + + class Header < Payload + + def Header.type; :frame_header end + + def initialize(klass, weight, size, properties) + @klass = klass + @weight = weight + @size = size + @properties = properties + end + + attr_reader :weight, :size, :properties + + def type; Header.type end + + def encode(encoder) + buf = StringWriter.new() + enc = Encoder.new(buf) + enc.short(@klass.id) + enc.short(@weight) + enc.longlong(@size) + + # property flags + nprops = @klass.fields.size + flags = 0 + 0.upto(nprops - 1) do |i| + f = @klass.fields[i] + flags <<= 1 + flags |= 1 unless @properties[f.name].nil? + # the last bit indicates more flags + if i > 0 and (i % 15) == 0 + flags <<= 1 + if nprops > (i + 1) + flags |= 1 + enc.short(flags) + flags = 0 + end + end + end + flags <<= ((16 - (nprops % 15)) % 16) + enc.short(flags) + + # properties + @klass.fields.each do |f| + v = @properties[f.name] + enc.encode(f.type, v) unless v.nil? + end + enc.flush() + encoder.longstr(buf.to_s) + end + + def Header.decode(spec, decoder) + dec = Decoder.new(StringReader.new(decoder.longstr())) + klass = spec.classes[dec.short()] + weight = dec.short() + size = dec.longlong() + + # property flags + bits = [] + while true + flags = dec.short() + 15.downto(1) do |i| + if flags >> i & 0x1 != 0 + bits << true + else + bits << false + end + end + break if flags & 0x1 == 0 + end + + # properties + properties = {} + bits.zip(klass.fields).each do |b, f| + properties[f.name] = dec.decode(f.type) if b + end + return Header.new(klass, weight, size, properties) + end + + def inspect(); "#{@klass.name}(#{@properties.inspect()})" end + + end + + class Body < Payload + + def Body.type; :frame_body end + + def type; Body.type end + + def initialize(content) + @content = content + end + + attr_reader :content + + def encode(enc) + enc.longstr(@content) + end + + def Body.decode(spec, dec) + return Body.new(dec.longstr()) + end + end end diff --git a/qpid/ruby/qpid/peer.rb b/qpid/ruby/qpid/peer.rb index 9e77165d01..320808fdc6 100644 --- a/qpid/ruby/qpid/peer.rb +++ b/qpid/ruby/qpid/peer.rb @@ -39,19 +39,33 @@ module Qpid @mutex.synchronize do ch = @channels[id] if ch.nil? - ch = Channel.new(id, @outgoing, @conn.spec) + ch = Channel.new(id, self, @outgoing, @conn.spec) @channels[id] = ch end return ch end end + def channel_delete(id) + @channels.delete(id) + end + def start() spawn(:writer) spawn(:reader) spawn(:worker) end + def close() + @mutex.synchronize do + @channels.each_value do |ch| + ch.close() + @outgoing.close() + @work.close() + end + end + end + private def spawn(method, *args) @@ -59,6 +73,8 @@ module Qpid begin send(method, *args) # is this the standard way to catch any exception? + rescue Closed => e + puts "#{method} #{e}" rescue Object => e print e e.backtrace.each do |line| @@ -94,7 +110,7 @@ module Qpid ch = channel(frame.channel) payload = frame.payload if payload.method.content? - content = read_content(queue) + content = Qpid::read_content(queue) else content = nil end @@ -106,25 +122,27 @@ module Qpid end class Channel - def initialize(id, outgoing, spec) + def initialize(id, peer, outgoing, spec) @id = id + @peer = peer @outgoing = outgoing @spec = spec @incoming = Queue.new() @responses = Queue.new() @queue = nil @closed = false - @reason = nil end + attr_reader :id + def closed?; @closed end - def close(reason) + def close() return if closed? + @peer.channel_delete(@id) @closed = true - @reason = reason @incoming.close() - @responses .close() + @responses.close() end def dispatch(frame, work) @@ -142,7 +160,7 @@ module Qpid end def method_missing(name, *args) - method = @spec.ruby_method(name) + method = @spec.find_method(name) if method.nil? raise NoMethodError.new("undefined method '#{name}' for #{self}:#{self.class}") end @@ -150,7 +168,7 @@ module Qpid if args.size == 1 and args[0].instance_of? Hash kwargs = args[0] invoke_args = method.fields.map do |f| - kwargs[f.ruby_name] + kwargs[f.name] end content = kwargs[:content] else @@ -173,13 +191,13 @@ module Qpid end def invoke(method, args, content = nil) - raise Closed(@reason) if closed? + raise Closed() if closed? frame = Frame.new(@id, Method.new(method, args)) @outgoing << frame if method.content? content = Content.new() if content.nil? - write_content(method.klass, content, @outgoing) + write_content(method.parent, content, @outgoing) end nowait = false @@ -204,7 +222,7 @@ module Qpid def write_content(klass, content, queue) size = content.size - header = Frame.new(@id, Header.new(klass, content.weight, size)) + header = Frame.new(@id, Header.new(klass, content.weight, size, content.headers)) queue << header content.children.each {|child| write_content(klass, child, queue)} queue << Frame.new(@id, Body.new(content.body)) if size > 0 @@ -212,7 +230,7 @@ module Qpid end - def read_content(queue) + def Qpid.read_content(queue) frame = queue.pop() header = frame.payload children = [] @@ -220,14 +238,30 @@ module Qpid size = header.size read = 0 buf = "" - while read << size - body = queue.get() + while read < size + body = queue.pop() content = body.payload.content buf << content read += content.size end buf.freeze() - return Content.new(buf, children, header.properties.clone()) + return Content.new(header.properties.clone(), buf, children) + end + + class Content + def initialize(headers = {}, body = "", children = []) + @headers = headers + @body = body + @children = children + end + + attr_reader :headers, :body, :children + + def size; body.size end + def weight; children.size end + + def [](key); @headers[key] end + def []=(key, value); @headers[key] = value end end class Message @@ -235,14 +269,18 @@ module Qpid alias fields args + def method_missing(name) + return args[@method.fields[name].id] + end + def inspect() - "#{method.ruby_name}(#{args.join(", ")})" + "#{method.qname}(#{args.join(", ")})" end end module Delegate def dispatch(ch, msg) - send(msg.method.ruby_name, ch, msg) + send(msg.method.qname, ch, msg) end end diff --git a/qpid/ruby/qpid/spec.rb b/qpid/ruby/qpid/spec.rb index c43bce7c25..9a04f584d0 100644 --- a/qpid/ruby/qpid/spec.rb +++ b/qpid/ruby/qpid/spec.rb @@ -55,14 +55,16 @@ module Spec class Root fields(:major, :minor, :classes, :constants, :domains) - def ruby_method(name) + def find_method(name) classes.each do |c| c.methods.each do |m| - if name == m.ruby_name + if name == m.qname return m end end end + + return nil end end @@ -91,18 +93,14 @@ module Spec def response?; @response end def response=(b); @response = b end - def ruby_name - Spec.rubyize(:"#{parent.name}_#{name}") + def qname + :"#{parent.name}_#{name}" end end class Field fields(:name, :id, :type, :docs) - def ruby_name - Spec.rubyize(name) - end - def default case type when :bit then false @@ -205,24 +203,28 @@ module Spec value.intern() unless value.nil? end + def parse_name(value) + value.gsub(/[\s-]/, '_').intern() unless value.nil? + end + def load_amqp() Root.new(attr("major", :int), attr("minor", :int), load("class"), load("constant"), load("domain")) end def load_class() - Class.new(attr("name", :symbol), attr("index", :int), attr("handler", :symbol), + Class.new(attr("name", :name), attr("index", :int), attr("handler", :name), load("field"), load("method"), load("doc")) end def load_method() - Method.new(attr("name", :symbol), attr("index", :int), + Method.new(attr("name", :name), attr("index", :int), attr("content", :bool), load("response"), attr("synchronous", :bool), load("field"), load("docs")) end def load_response() - name = attr("name", :symbol) + name = attr("name", :name) Reference.new {|spec, klass| response = klass.methods[name] if response.nil? @@ -233,23 +235,23 @@ module Spec end def load_field() - type = attr("type", :symbol) + type = attr("type", :name) if type.nil? - domain = attr("domain", :symbol) + domain = attr("domain", :name) type = Reference.new {|spec, klass| spec.domains[domain].type } end - Field.new(attr("name", :symbol), @index, type, load("docs")) + Field.new(attr("name", :name), @index, type, load("docs")) end def load_constant() - Constant.new(attr("name", :symbol), attr("value", :int), attr("class", :symbol), + Constant.new(attr("name", :name), attr("value", :int), attr("class", :name), load("doc")) end def load_domain() - Domain.new(attr("name", :symbol), attr("type", :symbol)) + Domain.new(attr("name", :name), attr("type", :name)) end def load_doc() @@ -284,10 +286,4 @@ module Spec spec end - private - - def Spec.rubyize(name) - name.to_s.gsub(/[\s-]/, '_').intern() - end - end diff --git a/qpid/ruby/qpid/test.rb b/qpid/ruby/qpid/test.rb new file mode 100644 index 0000000000..af57e8cf68 --- /dev/null +++ b/qpid/ruby/qpid/test.rb @@ -0,0 +1,17 @@ +require "qpid/spec" +require "qpid/client" + +module Qpid + + module Test + + def connect() + spec = Spec.load("../specs/amqp.0-8.xml") + c = Client.new("0.0.0.0", 5672, spec) + c.start({"LOGIN" => "guest", "PASSWORD" => "guest"}) + return c + end + + end + +end diff --git a/qpid/ruby/run-tests b/qpid/ruby/run-tests new file mode 100755 index 0000000000..b4c51a75ed --- /dev/null +++ b/qpid/ruby/run-tests @@ -0,0 +1,4 @@ +#!/usr/bin/ruby + +require "tests/channel" +require "tests/basic" diff --git a/qpid/ruby/tests/basic.rb b/qpid/ruby/tests/basic.rb new file mode 100644 index 0000000000..e9bbcedddb --- /dev/null +++ b/qpid/ruby/tests/basic.rb @@ -0,0 +1,68 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +require "test/unit" +require "qpid/test" +require "qpid" + +class Basic < Test::Unit::TestCase + + include Qpid::Test + + def publish(body, headers = {}) + cli = connect() + ch = cli.channel(1) + ch.channel_open() + content = Qpid::Content.new(headers, body) + ch.basic_publish(:content => content) + msg = ch.channel_close() + assert msg.method.qname == :channel_close_ok + end + + def consume(body, headers = {}) + cli = connect() + ch = cli.channel(1) + ch.channel_open() + ch.queue_declare(:queue => "test-queue") + ch.queue_bind(:queue_name => "test-queue") + ch.basic_consume(:queue => "test-queue", :consumer_tag => "ctag") + content = Qpid::Content.new(headers, body) + ch.basic_publish(:routing_key => "test-queue", :content => content) + queue = cli.queue("ctag") + msg = queue.pop() + assert content.headers == msg.content.headers + assert content.body == msg.content.body + assert content.children == msg.content.children + msg = ch.channel_close() + assert msg.method.qname == :channel_close_ok + end + + def test_publish(); publish("hello world") end + + def test_publish_empty(); publish("") end + + def test_publish_headers(); publish("hello world", :content_type => "text/plain") end + + def test_consume(); consume("hello world") end + + def test_consume_empty(); consume("") end + + def test_consume_headers(); consume("hello_world", :content_type => "text/plain") end + +end diff --git a/qpid/ruby/tests/channel.rb b/qpid/ruby/tests/channel.rb new file mode 100644 index 0000000000..31c5f19d92 --- /dev/null +++ b/qpid/ruby/tests/channel.rb @@ -0,0 +1,48 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +require "test/unit" +require "qpid/test" +require "qpid" + +class Channel < Test::Unit::TestCase + + include Qpid::Test + + def test_channel_open_close() + c = connect() + ch = c.channel(1) + msg = ch.channel_open() + assert msg.method.qname == :channel_open_ok + msg = ch.channel_close() + assert msg.method.qname == :channel_close_ok + end + + def test_channel_close() + c = connect() + ch = c.channel(1) + begin + ch.channel_close() + rescue Qpid::Closed => e + assert c.code.method.qname == :connection_close + assert c.code.reply_code == 504 + end + end + +end |