summaryrefslogtreecommitdiff
path: root/qpid/ruby
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2007-03-07 17:13:02 +0000
committerRafael H. Schloming <rhs@apache.org>2007-03-07 17:13:02 +0000
commit838692e69a2ba3fbbd3f260629725602f2f4b966 (patch)
tree522613531c2a5b1d056eed8c4e7392ecb37240e7 /qpid/ruby
parent82075c0d635f29e1698b7adaff54e519aa501086 (diff)
downloadqpid-python-838692e69a2ba3fbbd3f260629725602f2f4b966.tar.gz
added test harness, tests, and a few missing pieces of implementation
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@515652 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/ruby')
-rw-r--r--qpid/ruby/qpid.rb (renamed from qpid/ruby/test.rb)19
-rw-r--r--qpid/ruby/qpid/client.rb30
-rw-r--r--qpid/ruby/qpid/codec.rb5
-rw-r--r--qpid/ruby/qpid/connection.rb119
-rw-r--r--qpid/ruby/qpid/peer.rb74
-rw-r--r--qpid/ruby/qpid/spec.rb40
-rw-r--r--qpid/ruby/qpid/test.rb17
-rwxr-xr-xqpid/ruby/run-tests4
-rw-r--r--qpid/ruby/tests/basic.rb68
-rw-r--r--qpid/ruby/tests/channel.rb48
10 files changed, 360 insertions, 64 deletions
diff --git a/qpid/ruby/test.rb b/qpid/ruby/qpid.rb
index f9de9a5a3a..19c3e305b7 100644
--- a/qpid/ruby/test.rb
+++ b/qpid/ruby/qpid.rb
@@ -17,19 +17,8 @@
# under the License.
#
-require "qpid/client"
+require "qpid/queue"
+require "qpid/codec"
+require "qpid/connection"
+require "qpid/peer"
require "qpid/spec"
-
-def die(msg)
- puts msg
- exit(1)
-end
-
-specfile = $*[0]
-die("usage: test.rb <spec file>") if specfile.nil?
-
-c = Qpid::Client.new("0.0.0.0", 5672, Spec.load($*[0]))
-c.start({"LOGIN" => "guest", "PASSWORD" => "guest"})
-ch = c.channel(1)
-p ch.channel_open()
-p ch.queue_declare()
diff --git a/qpid/ruby/qpid/client.rb b/qpid/ruby/qpid/client.rb
index 485dbf745d..f10f2e564b 100644
--- a/qpid/ruby/qpid/client.rb
+++ b/qpid/ruby/qpid/client.rb
@@ -24,11 +24,11 @@ require "qpid/queue"
module Qpid
class Client
- def initialize(host, port, spec, vhost = nil)
+ def initialize(host, port, spec, vhost = "/")
@host = host
@port = port
@spec = spec
- @vhost = if vhost.nil?; host else vhost end
+ @vhost = vhost
@mechanism = nil
@response = nil
@@ -38,6 +38,7 @@ module Qpid
@mutex = Mutex.new()
@closed = false
+ @code = nil
@started = ConditionVariable.new()
@conn = Connection.new(@host, @port, @spec)
@@ -47,6 +48,8 @@ module Qpid
attr_reader :mechanism, :response, :locale
def closed?; @closed end
+ def closed=(value); @closed = value end
+ def code; @code end
def wait()
@mutex.synchronize do
@@ -85,6 +88,12 @@ module Qpid
def channel(id)
return @peer.channel(id)
end
+
+ def close(msg = nil)
+ @closed = true
+ @code = msg
+ @peer.close()
+ end
end
class ClientDelegate
@@ -104,6 +113,23 @@ module Qpid
ch.connection_tune_ok(*msg.fields)
@client.signal_start()
end
+
+ def connection_close(ch, msg)
+ puts "CONNECTION CLOSED: #{msg.args.join(", ")}"
+ @client.close(msg)
+ end
+
+ def channel_close(ch, msg)
+ puts "CHANNEL[#{ch.id}] CLOSED: #{msg.args.join(", ")}"
+ ch.channel_close_ok()
+ ch.close()
+ end
+
+ def basic_deliver(ch, msg)
+ queue = @client.queue(msg.consumer_tag)
+ queue << msg
+ end
+
end
end
diff --git a/qpid/ruby/qpid/codec.rb b/qpid/ruby/qpid/codec.rb
index 2695228ba6..8d80e10aee 100644
--- a/qpid/ruby/qpid/codec.rb
+++ b/qpid/ruby/qpid/codec.rb
@@ -67,11 +67,10 @@ module Codec
end
def longlong(l)
- # is this the right byte order?
lower = l & 0xffffffff
upper = (l & ~0xffffffff) >> 32
- long(lower)
long(upper)
+ long(lower)
end
def shortstr(s)
@@ -116,6 +115,7 @@ module Codec
def write(str)
flushbits()
@out.write(str)
+# puts "OUT #{str.inspect()}"
end
def pack(fmt, *args)
@@ -238,6 +238,7 @@ module Codec
if result.nil? or result.empty?
raise EOF.new()
else
+# puts " IN #{result.inspect()}"
return result
end
end
diff --git a/qpid/ruby/qpid/connection.rb b/qpid/ruby/qpid/connection.rb
index c79e9d8319..f6ee9cf1e4 100644
--- a/qpid/ruby/qpid/connection.rb
+++ b/qpid/ruby/qpid/connection.rb
@@ -48,6 +48,7 @@ module Qpid
end
def write(frame)
+# puts "OUT #{frame.inspect()}"
@out.octet(@spec.constants[frame.payload.type].id)
@out.short(frame.channel)
frame.payload.encode(@out)
@@ -62,13 +63,15 @@ module Qpid
if oct != frame_end
raise Exception.new("framing error: expected #{frame_end}, got #{oct}")
end
- Frame.new(channel, payload)
+ frame = Frame.new(channel, payload)
+# puts " IN #{frame.inspect}"
+ return frame
end
private
def frame_end
- @spec.constants[:"frame end"].id
+ @spec.constants[:"frame_end"].id
end
end
@@ -113,9 +116,7 @@ module Qpid
attr_reader(:method, :args)
- def Method.type
- :"frame method"
- end
+ def Method.type; :frame_method end
def type; Method.type end
@@ -125,6 +126,7 @@ module Qpid
enc.short(@method.parent.id)
enc.short(@method.id)
@method.fields.zip(self.args).each {|f, a|
+ if a.nil?; a = f.default end
enc.encode(f.type, a)
}
enc.flush()
@@ -140,6 +142,113 @@ module Qpid
return Method.new(meth, args)
end
+ def inspect(); "#{method.qname}(#{args.join(", ")})" end
+
+ end
+
+ class Header < Payload
+
+ def Header.type; :frame_header end
+
+ def initialize(klass, weight, size, properties)
+ @klass = klass
+ @weight = weight
+ @size = size
+ @properties = properties
+ end
+
+ attr_reader :weight, :size, :properties
+
+ def type; Header.type end
+
+ def encode(encoder)
+ buf = StringWriter.new()
+ enc = Encoder.new(buf)
+ enc.short(@klass.id)
+ enc.short(@weight)
+ enc.longlong(@size)
+
+ # property flags
+ nprops = @klass.fields.size
+ flags = 0
+ 0.upto(nprops - 1) do |i|
+ f = @klass.fields[i]
+ flags <<= 1
+ flags |= 1 unless @properties[f.name].nil?
+ # the last bit indicates more flags
+ if i > 0 and (i % 15) == 0
+ flags <<= 1
+ if nprops > (i + 1)
+ flags |= 1
+ enc.short(flags)
+ flags = 0
+ end
+ end
+ end
+ flags <<= ((16 - (nprops % 15)) % 16)
+ enc.short(flags)
+
+ # properties
+ @klass.fields.each do |f|
+ v = @properties[f.name]
+ enc.encode(f.type, v) unless v.nil?
+ end
+ enc.flush()
+ encoder.longstr(buf.to_s)
+ end
+
+ def Header.decode(spec, decoder)
+ dec = Decoder.new(StringReader.new(decoder.longstr()))
+ klass = spec.classes[dec.short()]
+ weight = dec.short()
+ size = dec.longlong()
+
+ # property flags
+ bits = []
+ while true
+ flags = dec.short()
+ 15.downto(1) do |i|
+ if flags >> i & 0x1 != 0
+ bits << true
+ else
+ bits << false
+ end
+ end
+ break if flags & 0x1 == 0
+ end
+
+ # properties
+ properties = {}
+ bits.zip(klass.fields).each do |b, f|
+ properties[f.name] = dec.decode(f.type) if b
+ end
+ return Header.new(klass, weight, size, properties)
+ end
+
+ def inspect(); "#{@klass.name}(#{@properties.inspect()})" end
+
+ end
+
+ class Body < Payload
+
+ def Body.type; :frame_body end
+
+ def type; Body.type end
+
+ def initialize(content)
+ @content = content
+ end
+
+ attr_reader :content
+
+ def encode(enc)
+ enc.longstr(@content)
+ end
+
+ def Body.decode(spec, dec)
+ return Body.new(dec.longstr())
+ end
+
end
end
diff --git a/qpid/ruby/qpid/peer.rb b/qpid/ruby/qpid/peer.rb
index 9e77165d01..320808fdc6 100644
--- a/qpid/ruby/qpid/peer.rb
+++ b/qpid/ruby/qpid/peer.rb
@@ -39,19 +39,33 @@ module Qpid
@mutex.synchronize do
ch = @channels[id]
if ch.nil?
- ch = Channel.new(id, @outgoing, @conn.spec)
+ ch = Channel.new(id, self, @outgoing, @conn.spec)
@channels[id] = ch
end
return ch
end
end
+ def channel_delete(id)
+ @channels.delete(id)
+ end
+
def start()
spawn(:writer)
spawn(:reader)
spawn(:worker)
end
+ def close()
+ @mutex.synchronize do
+ @channels.each_value do |ch|
+ ch.close()
+ @outgoing.close()
+ @work.close()
+ end
+ end
+ end
+
private
def spawn(method, *args)
@@ -59,6 +73,8 @@ module Qpid
begin
send(method, *args)
# is this the standard way to catch any exception?
+ rescue Closed => e
+ puts "#{method} #{e}"
rescue Object => e
print e
e.backtrace.each do |line|
@@ -94,7 +110,7 @@ module Qpid
ch = channel(frame.channel)
payload = frame.payload
if payload.method.content?
- content = read_content(queue)
+ content = Qpid::read_content(queue)
else
content = nil
end
@@ -106,25 +122,27 @@ module Qpid
end
class Channel
- def initialize(id, outgoing, spec)
+ def initialize(id, peer, outgoing, spec)
@id = id
+ @peer = peer
@outgoing = outgoing
@spec = spec
@incoming = Queue.new()
@responses = Queue.new()
@queue = nil
@closed = false
- @reason = nil
end
+ attr_reader :id
+
def closed?; @closed end
- def close(reason)
+ def close()
return if closed?
+ @peer.channel_delete(@id)
@closed = true
- @reason = reason
@incoming.close()
- @responses .close()
+ @responses.close()
end
def dispatch(frame, work)
@@ -142,7 +160,7 @@ module Qpid
end
def method_missing(name, *args)
- method = @spec.ruby_method(name)
+ method = @spec.find_method(name)
if method.nil?
raise NoMethodError.new("undefined method '#{name}' for #{self}:#{self.class}")
end
@@ -150,7 +168,7 @@ module Qpid
if args.size == 1 and args[0].instance_of? Hash
kwargs = args[0]
invoke_args = method.fields.map do |f|
- kwargs[f.ruby_name]
+ kwargs[f.name]
end
content = kwargs[:content]
else
@@ -173,13 +191,13 @@ module Qpid
end
def invoke(method, args, content = nil)
- raise Closed(@reason) if closed?
+ raise Closed() if closed?
frame = Frame.new(@id, Method.new(method, args))
@outgoing << frame
if method.content?
content = Content.new() if content.nil?
- write_content(method.klass, content, @outgoing)
+ write_content(method.parent, content, @outgoing)
end
nowait = false
@@ -204,7 +222,7 @@ module Qpid
def write_content(klass, content, queue)
size = content.size
- header = Frame.new(@id, Header.new(klass, content.weight, size))
+ header = Frame.new(@id, Header.new(klass, content.weight, size, content.headers))
queue << header
content.children.each {|child| write_content(klass, child, queue)}
queue << Frame.new(@id, Body.new(content.body)) if size > 0
@@ -212,7 +230,7 @@ module Qpid
end
- def read_content(queue)
+ def Qpid.read_content(queue)
frame = queue.pop()
header = frame.payload
children = []
@@ -220,14 +238,30 @@ module Qpid
size = header.size
read = 0
buf = ""
- while read << size
- body = queue.get()
+ while read < size
+ body = queue.pop()
content = body.payload.content
buf << content
read += content.size
end
buf.freeze()
- return Content.new(buf, children, header.properties.clone())
+ return Content.new(header.properties.clone(), buf, children)
+ end
+
+ class Content
+ def initialize(headers = {}, body = "", children = [])
+ @headers = headers
+ @body = body
+ @children = children
+ end
+
+ attr_reader :headers, :body, :children
+
+ def size; body.size end
+ def weight; children.size end
+
+ def [](key); @headers[key] end
+ def []=(key, value); @headers[key] = value end
end
class Message
@@ -235,14 +269,18 @@ module Qpid
alias fields args
+ def method_missing(name)
+ return args[@method.fields[name].id]
+ end
+
def inspect()
- "#{method.ruby_name}(#{args.join(", ")})"
+ "#{method.qname}(#{args.join(", ")})"
end
end
module Delegate
def dispatch(ch, msg)
- send(msg.method.ruby_name, ch, msg)
+ send(msg.method.qname, ch, msg)
end
end
diff --git a/qpid/ruby/qpid/spec.rb b/qpid/ruby/qpid/spec.rb
index c43bce7c25..9a04f584d0 100644
--- a/qpid/ruby/qpid/spec.rb
+++ b/qpid/ruby/qpid/spec.rb
@@ -55,14 +55,16 @@ module Spec
class Root
fields(:major, :minor, :classes, :constants, :domains)
- def ruby_method(name)
+ def find_method(name)
classes.each do |c|
c.methods.each do |m|
- if name == m.ruby_name
+ if name == m.qname
return m
end
end
end
+
+ return nil
end
end
@@ -91,18 +93,14 @@ module Spec
def response?; @response end
def response=(b); @response = b end
- def ruby_name
- Spec.rubyize(:"#{parent.name}_#{name}")
+ def qname
+ :"#{parent.name}_#{name}"
end
end
class Field
fields(:name, :id, :type, :docs)
- def ruby_name
- Spec.rubyize(name)
- end
-
def default
case type
when :bit then false
@@ -205,24 +203,28 @@ module Spec
value.intern() unless value.nil?
end
+ def parse_name(value)
+ value.gsub(/[\s-]/, '_').intern() unless value.nil?
+ end
+
def load_amqp()
Root.new(attr("major", :int), attr("minor", :int), load("class"),
load("constant"), load("domain"))
end
def load_class()
- Class.new(attr("name", :symbol), attr("index", :int), attr("handler", :symbol),
+ Class.new(attr("name", :name), attr("index", :int), attr("handler", :name),
load("field"), load("method"), load("doc"))
end
def load_method()
- Method.new(attr("name", :symbol), attr("index", :int),
+ Method.new(attr("name", :name), attr("index", :int),
attr("content", :bool), load("response"),
attr("synchronous", :bool), load("field"), load("docs"))
end
def load_response()
- name = attr("name", :symbol)
+ name = attr("name", :name)
Reference.new {|spec, klass|
response = klass.methods[name]
if response.nil?
@@ -233,23 +235,23 @@ module Spec
end
def load_field()
- type = attr("type", :symbol)
+ type = attr("type", :name)
if type.nil?
- domain = attr("domain", :symbol)
+ domain = attr("domain", :name)
type = Reference.new {|spec, klass|
spec.domains[domain].type
}
end
- Field.new(attr("name", :symbol), @index, type, load("docs"))
+ Field.new(attr("name", :name), @index, type, load("docs"))
end
def load_constant()
- Constant.new(attr("name", :symbol), attr("value", :int), attr("class", :symbol),
+ Constant.new(attr("name", :name), attr("value", :int), attr("class", :name),
load("doc"))
end
def load_domain()
- Domain.new(attr("name", :symbol), attr("type", :symbol))
+ Domain.new(attr("name", :name), attr("type", :name))
end
def load_doc()
@@ -284,10 +286,4 @@ module Spec
spec
end
- private
-
- def Spec.rubyize(name)
- name.to_s.gsub(/[\s-]/, '_').intern()
- end
-
end
diff --git a/qpid/ruby/qpid/test.rb b/qpid/ruby/qpid/test.rb
new file mode 100644
index 0000000000..af57e8cf68
--- /dev/null
+++ b/qpid/ruby/qpid/test.rb
@@ -0,0 +1,17 @@
+require "qpid/spec"
+require "qpid/client"
+
+module Qpid
+
+ module Test
+
+ def connect()
+ spec = Spec.load("../specs/amqp.0-8.xml")
+ c = Client.new("0.0.0.0", 5672, spec)
+ c.start({"LOGIN" => "guest", "PASSWORD" => "guest"})
+ return c
+ end
+
+ end
+
+end
diff --git a/qpid/ruby/run-tests b/qpid/ruby/run-tests
new file mode 100755
index 0000000000..b4c51a75ed
--- /dev/null
+++ b/qpid/ruby/run-tests
@@ -0,0 +1,4 @@
+#!/usr/bin/ruby
+
+require "tests/channel"
+require "tests/basic"
diff --git a/qpid/ruby/tests/basic.rb b/qpid/ruby/tests/basic.rb
new file mode 100644
index 0000000000..e9bbcedddb
--- /dev/null
+++ b/qpid/ruby/tests/basic.rb
@@ -0,0 +1,68 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+require "test/unit"
+require "qpid/test"
+require "qpid"
+
+class Basic < Test::Unit::TestCase
+
+ include Qpid::Test
+
+ def publish(body, headers = {})
+ cli = connect()
+ ch = cli.channel(1)
+ ch.channel_open()
+ content = Qpid::Content.new(headers, body)
+ ch.basic_publish(:content => content)
+ msg = ch.channel_close()
+ assert msg.method.qname == :channel_close_ok
+ end
+
+ def consume(body, headers = {})
+ cli = connect()
+ ch = cli.channel(1)
+ ch.channel_open()
+ ch.queue_declare(:queue => "test-queue")
+ ch.queue_bind(:queue_name => "test-queue")
+ ch.basic_consume(:queue => "test-queue", :consumer_tag => "ctag")
+ content = Qpid::Content.new(headers, body)
+ ch.basic_publish(:routing_key => "test-queue", :content => content)
+ queue = cli.queue("ctag")
+ msg = queue.pop()
+ assert content.headers == msg.content.headers
+ assert content.body == msg.content.body
+ assert content.children == msg.content.children
+ msg = ch.channel_close()
+ assert msg.method.qname == :channel_close_ok
+ end
+
+ def test_publish(); publish("hello world") end
+
+ def test_publish_empty(); publish("") end
+
+ def test_publish_headers(); publish("hello world", :content_type => "text/plain") end
+
+ def test_consume(); consume("hello world") end
+
+ def test_consume_empty(); consume("") end
+
+ def test_consume_headers(); consume("hello_world", :content_type => "text/plain") end
+
+end
diff --git a/qpid/ruby/tests/channel.rb b/qpid/ruby/tests/channel.rb
new file mode 100644
index 0000000000..31c5f19d92
--- /dev/null
+++ b/qpid/ruby/tests/channel.rb
@@ -0,0 +1,48 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+require "test/unit"
+require "qpid/test"
+require "qpid"
+
+class Channel < Test::Unit::TestCase
+
+ include Qpid::Test
+
+ def test_channel_open_close()
+ c = connect()
+ ch = c.channel(1)
+ msg = ch.channel_open()
+ assert msg.method.qname == :channel_open_ok
+ msg = ch.channel_close()
+ assert msg.method.qname == :channel_close_ok
+ end
+
+ def test_channel_close()
+ c = connect()
+ ch = c.channel(1)
+ begin
+ ch.channel_close()
+ rescue Qpid::Closed => e
+ assert c.code.method.qname == :connection_close
+ assert c.code.reply_code == 504
+ end
+ end
+
+end