diff options
author | Rafael H. Schloming <rhs@apache.org> | 2008-11-13 02:45:18 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2008-11-13 02:45:18 +0000 |
commit | 92239a7a1362501ec492f176909eb3c092771cb9 (patch) | |
tree | d5392e5a3355dfd6929216e99a6141fda7276506 /qpid/ruby | |
parent | 726494e8b3d6eb93a500d2e06a707fea5ce3dfd3 (diff) | |
download | qpid-python-92239a7a1362501ec492f176909eb3c092771cb9.tar.gz |
merged 0-10 ruby client from QPID-1443 into existing ruby client
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@713616 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/ruby')
39 files changed, 6105 insertions, 388 deletions
diff --git a/qpid/ruby/Rakefile b/qpid/ruby/Rakefile new file mode 100644 index 0000000000..66e0fed226 --- /dev/null +++ b/qpid/ruby/Rakefile @@ -0,0 +1,75 @@ +# Rakefile for ruby-rpm -*- ruby -*- +require 'rake/clean' +require 'rake/testtask' +require 'rake/gempackagetask' +require 'pathname' + +PKG_NAME='ruby-qpid' +PKG_VERSION='0.10.2' +GEM_NAME='qpid' + +AMQP_SPEC_SRC=Pathname.new("../specs").realpath +AMQP_SPEC_PATH=["/usr/share/amqp", AMQP_SPEC_SRC].join(File::PATH_SEPARATOR) +AMQP_SPEC_FILES = FileList["amqp.0-10-qpid-errata.xml"] + +ENV["AMQP_SPEC_PATH"] = AMQP_SPEC_PATH unless ENV["AMQP_SPEC_PATH"] + +# +# Additional files for clean/clobber +# + +CLEAN.include [ "**/*~", "lib/*/spec_cache" ] + +Rake::TestTask.new(:test) do |t| + t.test_files = FileList['tests/*.rb'].exclude("tests/util.rb") + t.libs = [ 'lib' ] +end + +Rake::TestTask.new(:"test_0-8") do |t| + t.test_files = FileList["tests_0-8/*.rb"] + t.libs = [ 'lib' ] +end + +desc "Create cached versions of the AMQP specs" +task :spec_cache do |t| + AMQP_SPEC_FILES.each do |f| + pid = fork do + $: << "lib" + require 'qpid' + Qpid::Spec010::load(f) + puts "Cached #{f}" + end + Process.wait(pid) + end +end + +# +# Packaging +# + +PKG_FILES = FileList[ + "DISCLAIMER", "LICENSE.txt", "NOTICE.txt", + "Rakefile", "RELEASE_NOTES", + "lib/**/*.rb", "lib/*/spec_cache/*.rb*", "tests/**/*", "examples/**" +] + +DIST_FILES = FileList[ + "pkg/*.tgz", "pkg/*.gem" +] + +SPEC = Gem::Specification.new do |s| + s.name = GEM_NAME + s.version = PKG_VERSION + s.email = "qpid-dev@incubator.apache.org" + s.homepage = "http://cwiki.apache.org/qpid/" + s.summary = "Ruby client for Qpid" + s.files = PKG_FILES + s.required_ruby_version = '>= 1.8.1' + s.description = "Ruby client for Qpid" +end + +Rake::GemPackageTask.new(SPEC) do |pkg| + task pkg.package_dir => [ :spec_cache ] + pkg.need_tar = true + pkg.need_zip = true +end diff --git a/qpid/ruby/examples/hello-world.rb b/qpid/ruby/examples/hello-world.rb new file mode 100644 index 0000000000..9cddf93ae0 --- /dev/null +++ b/qpid/ruby/examples/hello-world.rb @@ -0,0 +1,53 @@ +#!/usr/bin/ruby +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +require "rubygems" +require "qpid" +require "socket" + +conn = Qpid::Connection.new(TCPSocket.new("localhost", 5672)) +conn.start(10) + +ssn = conn.session("test") + +# create a queue +ssn.queue_declare("test-queue") + +# publish a message +dp = ssn.delivery_properties(:routing_key => "test-queue") +mp = ssn.message_properties(:content_type => "text/plain") +msg = Qpid::Message.new(dp, mp, "Hello World!") +ssn.message_transfer(:message => msg) + +# subscribe to a queue +ssn.message_subscribe(:destination => "messages", :queue => "test-queue", + :accept_mode => ssn.message_accept_mode.none) +incoming = ssn.incoming("messages") + +# start incoming message flow +incoming.start() + +# grab a message from the queue +p incoming.get(10) + +# cancel the subscription and close the session and connection +ssn.message_cancel(:destination => "messages") +ssn.close() +conn.close() diff --git a/qpid/ruby/examples/qmf-libvirt.rb b/qpid/ruby/examples/qmf-libvirt.rb new file mode 100644 index 0000000000..128cfb95ee --- /dev/null +++ b/qpid/ruby/examples/qmf-libvirt.rb @@ -0,0 +1,81 @@ +#!/usr/bin/ruby +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +require "rubygems" +require "qpid" + +s = Qpid::Qmf::Session.new() +b = s.add_broker("amqp://localhost:5672") + +while true: + nodes = s.objects(:class => "node") + nodes.each do |node| + puts "node: #{node.hostname}" + for (key, val) in node.properties + puts " property: #{key}, #{val}" + end + + # Find any domains that on the current node. + domains = s.objects(:class => "domain", 'node' => node.object_id) + domains.each do |domain| + r = domain.getXMLDesc() + puts "status: #{r.status}" + if r.status == 0 + puts "xml description: #{r.description}" + puts "length: #{r.description.length}" + end + + puts " domain: #{domain.name}, state: #{domain.state}, id: #{domain.id}" + for (key, val) in domain.properties + puts " property: #{key}, #{val}" + end + end + + pools = s.objects(:class => "pool", 'node' => node.object_id) + pools.each do |pool| + puts " pool: #{pool.name}" + for (key, val) in pool.properties + puts " property: #{key}, #{val}" + end + + r = pool.getXMLDesc() + puts "status: #{r.status}" + puts "text: #{r.text}" + if r.status == 0 + puts "xml description: #{r.description}" + puts "length: #{r.description.length}" + end + + # Find volumes that are part of the pool. + volumes = s.objects(:class => "volume", 'pool' => pool.object_id) + volumes.each do |volume| + puts " volume: #{volume.name}" + for (key, val) in volume.properties + puts " property: #{key}, #{val}" + end + end + end + + end + + puts '----------------------------' + sleep(5) + +end diff --git a/qpid/ruby/qpid.rb b/qpid/ruby/lib/qpid.rb index 25cd26f362..1c719e9b1d 100644 --- a/qpid/ruby/qpid.rb +++ b/qpid/ruby/lib/qpid.rb @@ -6,9 +6,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -17,9 +17,25 @@ # under the License. # -require "qpid/client" +module Qpid + def self.logger + @logger ||= {} + @logger + end +end + +require "qpid/util" require "qpid/queue" +require "qpid/packer" +require "qpid/framer" require "qpid/codec" +require 'qpid/datatypes' +require 'qpid/spec010' +require 'qpid/delegates' +require 'qpid/invoker' +require "qpid/assembler" +require 'qpid/session' require "qpid/connection" -require "qpid/peer" require "qpid/spec" +require 'qpid/queue' +require 'qpid/qmf' diff --git a/qpid/ruby/lib/qpid/assembler.rb b/qpid/ruby/lib/qpid/assembler.rb new file mode 100644 index 0000000000..b768c3f195 --- /dev/null +++ b/qpid/ruby/lib/qpid/assembler.rb @@ -0,0 +1,148 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +module Qpid + + class << self + attr_accessor :asm_logger + end + + class Segment + + attr_reader :type, :payload, :track, :channel + attr_accessor :id, :offset + + def initialize(first, last, type, track, channel, payload) + @id = nil + @offset = nil + @first = first + @last = last + @type = type + @track = track + @channel = channel + @payload = payload + end + + def first_segment? ; @first ; end + + def last_segment? ; @last ; end + + def decode(spec) + segs = spec[:segment_type] + choice = segs.enum.choices[type] + return method("decode_#{choice.name}").call(spec) + end + + def decode_control(spec) + sc = StringCodec.new(spec, payload) + return sc.read_control() + end + + def decode_command(spec) + sc = StringCodec.new(spec, payload) + hdr, cmd = sc.read_command() + cmd.id = id + return hdr, cmd + end + + def decode_header(spec) + sc = StringCodec.new(spec, payload) + values = [] + until sc.encoded.empty? + values << sc.read_struct32() + end + return values + end + + def decode_body(spec) + payload + end + + def append(frame) + @payload += frame.payload + end + + def to_s + f = first_segment? ? 'F' : '.' + l = last_segment? ? 'L' : '.' + return "%s%s %s %s %s %s" % [f, l, @type, + @track, @channel, @payload.inspect] + end + + end + + class Assembler < Framer + + def logger; Qpid::asm_logger; end + + def initialize(sock, max_payload = Frame::MAX_PAYLOAD) + super(sock) + @max_payload = max_payload + @fragments = {} + end + + def read_segment + loop do + frame = read_frame + key = [frame.channel, frame.track] + seg = @fragments[key] + unless seg + seg = Segment.new(frame.first_segment?, + frame.last_segment?, + frame.type, frame.track, + frame.channel, "") + @fragments[key] = seg + end + + seg.append(frame) + + if frame.last_frame? + @fragments.delete(key) + logger.debug("RECV #{seg}") if logger + return seg + end + end + end + + def write_segment(segment) + remaining = segment.payload + + first = true + while first or remaining + payload = remaining[0, @max_payload] + remaining = remaining[@max_payload, remaining.size] + + flags = 0 + + flags |= FIRST_FRM if first + flags |= LAST_FRM unless remaining + flags |= FIRST_SEG if segment.first_segment? + flags |= LAST_SEG if segment.last_segment? + + frame = Frame.new(flags, segment.type, segment.track, + segment.channel, payload) + write_frame(frame) + + first = false + end + + logger.debug("SENT #{segment}") if logger + end + end +end diff --git a/qpid/ruby/qpid/client.rb b/qpid/ruby/lib/qpid/client.rb index f10f2e564b..ec3d100a9c 100644 --- a/qpid/ruby/qpid/client.rb +++ b/qpid/ruby/lib/qpid/client.rb @@ -21,7 +21,7 @@ require "thread" require "qpid/peer" require "qpid/queue" -module Qpid +module Qpid08 class Client def initialize(host, port, spec, vhost = "/") @@ -97,6 +97,7 @@ module Qpid end class ClientDelegate + include Delegate def initialize(client) diff --git a/qpid/ruby/lib/qpid/codec.rb b/qpid/ruby/lib/qpid/codec.rb new file mode 100644 index 0000000000..009b1eef53 --- /dev/null +++ b/qpid/ruby/lib/qpid/codec.rb @@ -0,0 +1,455 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +require 'qpid/packer.rb' +require 'iconv' + +module Qpid + + class Codec + + include Qpid::Packer + + def initialize(spec = "") + @spec = spec + end + + def write_void(v) + unless v.nil? + raise Exception.new("void not nil: #{v}") + end + end + + def read_void + return nil + end + + def write_bit(b) + unless b + raise Exception.new("bit is nil: #{b}") + end + end + + def read_bit + return true + end + + def read_uint8 + return unpack("C", 1) + end + + def write_uint8(n) + return pack("C", n) + end + + def read_int8 + return unpack("c", 1) + end + + def write_int8(n) + pack("c", n) + end + + def read_char + return unpack("c", 1) + end + + def write_char(c) + pack("c") + end + + def read_boolean + return read_uint8 != 0 + end + + def write_boolean(b) + n = 0 + n = 1 if b != 0 + write_uint8(n) + end + + def read_uint16 + return unpack("n", 2) + end + + def write_uint16(n) + pack("n", n) + end + + def read_int16 + # XXX: holy moly.. pack/unpack doesn't have signed network byte order. Crazy hackery. + val = unpack("n", 2) + val -= 2 ** 16 if val >= 2 ** 15 + return val + end + + def write_int16(n) + # XXX: Magically this one works even though it's not signed. + pack("n", n) + end + + def read_uint32 + return unpack("N", 4) + end + + def write_uint32(n) + pack("N", n) + end + + def read_int32 + # Again no pack/unpack for signed int + return unpack("N", 4) + end + + def write_int32(n) + # FIXME + pack("N", n) + end + + def read_float + return unpack("g", 4) + end + + def write_float(n) + pack("g", n) + end + + def read_sequence_no + return read_uint32.to_serial + end + + def write_sequence_no(n) + write_uint32(n.value) + end + + def encode_64bit(num, signed = false) + b = [] + + if num < 0 && signed + num += 2 ** 64 + end + + (0..7).each do |c| + d = 7 - c + b[c] = (num & (0xff << d * 8)) >> d * 8 + end + pack('C8', *b) + end + + + def decode_64bit(signed = false) + # Silly ruby pack/unpack does not implement 64 bit network byte order + # encode/decode. + a = unpack('C8', 8) + num = 0 + (0..7).each do |c| + d = 7 - c + num |= a[c] << 8 * d + end + + if signed && num >= 2 ** 63 + num -= 2 ** 64 + end + return num + end + + def read_uint64 + return decode_64bit + end + + def write_uint64(n) + encode_64bit(n) + end + + def read_int64 + return decode_64bit(signed = true) + end + + def write_int64(n) + encode_64bit(n, signed = true) + end + + def read_datetime + return read_uint64 + end + + def write_datetime(n) + write_uint64(n) + end + + def read_double + return unpack("G", 8) + end + + def write_double(n) + pack("G", n) + end + + def read_vbin8 + # XXX + return read(read_uint8) + end + + def write_vbin8(b) + # XXX + write_uint8(b.length) + write(b) + end + + def read_str8 + # FIXME: Check iconv.. I think this will throw if there are odd characters. + return Iconv.conv("ASCII", "UTF-8", read_vbin8) + end + + def write_str8(s) + write_vbin8(Iconv.conv("UTF-8", "ASCII", s)) + end + + def read_str16 + return Iconv.conv("ASCII", "UTF-8", read_vbin16) + end + + def write_str16(s) + write_vbin16(Iconv.conv("UTF-8", "ASCII", s)) + end + + def read_vbin16 + # XXX: Using read method? + return read(read_uint16) + end + + def write_vbin16(b) + write_uint16(b.length) + write(b) + end + + def read_sequence_set + # FIXME: Need datatypes + result = RangedSet.new + size = read_uint16 + nranges = size / 8 + nranges.times do |i| + lower = read_sequence_no + upper = read_sequence_no + result.add(lower, upper) + end + return result + end + + def write_sequence_set(ss) + size = 8 * ss.ranges.length + write_uint16(size) + ss.ranges.each do |range| + write_sequence_no(range.lower) + write_sequence_no(range.upper) + end + end + + def read_vbin32 + return read(read_uint32) + end + + def write_vbin32(b) + write_uint32(b.length) + write(b) + end + + def write_map(m) + sc = StringCodec.new(@spec) + unless m.nil? + sc.write_uint32(m.size) + m.each do |k, v| + unless type = @spec.encoding(v.class) + raise Exception.new("no encoding for: #{v.class}") + end + sc.write_str8(k) + sc.write_uint8(type.code) + type.encode(sc, v) + end + end + write_vbin32(sc.encoded) + end + + def read_map + sc = StringCodec.new(@spec, read_vbin32) + return nil unless sc.encoded + count = sc.read_uint32 + result = nil + if count + result = {} + until sc.encoded.empty? + k = sc.read_str8 + code = sc.read_uint8 + type = @spec.types[code] + v = type.decode(sc) + result[k] = v + end + end + return result + end + + def write_array(a) + sc = StringCodec.new(@spec) + unless a.nil? + if a.length > 0 + type = @spec.encoding(a[0].class) + else + type = @spec.encoding(nil.class) + end + sc.write_uint8(type.code) + sc.write_uint32(a.size) + a.each { |o| type.encode(sc, o) } + end + write_vbin32(sc.encoded) + end + + def read_array + sc = StringCodec.new(@spec, read_vbin32) + return nil if not sc.encoded + type = @spec.types[sc.read_uint8] + count = sc.read_uint32 + result = nil + if count + result = [] + count.times { |i| result << (type.decode(sc)) } + end + return result + end + + def write_list(l) + sc = StringCodec.new(@spec) + unless l.nil? + sc.write_uint32(l.length) + l.each do |o| + type = @spec.encoding(o.class) + sc.write_uint8(type.code) + type.encode(sc, o) + end + end + write_vbin32(sc.encoded) + end + + def read_list + sc = StringCodec.new(@spec, read_vbin32) + return nil if not sc.encoded + count = sc.read_uint32 + result = nil + if count + result = [] + count.times do |i| + type = @spec.types[sc.read_uint8] + result << type.decode(sc) + end + end + return result + end + + def read_struct32 + size = read_uint32 + code = read_uint16 + type = @spec.structs[code] + # XXX: BLEH! + fields = type.decode_fields(self) + return Qpid::struct(type, fields) + end + + def write_struct32(value) + type = value.type + sc = StringCodec.new(@spec) + sc.write_uint16(type.code) + type.encode_fields(sc, value) + write_vbin32(sc.encoded) + end + + def read_control + cntrl = @spec.controls[read_uint16] + return Qpid::struct(cntrl, cntrl.decode_fields(self)) + end + + def write_control(ctrl) + type = ctrl.type + write_uint16(type.code) + type.encode_fields(self, ctrl) + end + + def read_command + type = @spec.commands[read_uint16] + hdr = @spec[:header].decode(self) + cmd = Qpid::struct(type, type.decode_fields(self)) + return hdr, cmd + end + + def write_command(hdr, cmd) + type = cmd.type + write_uint16(type.code) + hdr.type.encode(self, hdr) + type.encode_fields(self, cmd) + end + + def read_size(width) + if width > 0 + return send(:"read_uint#{width * 8}") + end + end + + def write_size(width, n) + if width > 0 + send(:"write_uint#{width * 8}", n) + end + end + + def read_uuid + return unpack("A16", 16) + end + + def write_uuid(s) + pack("A16", s) + end + + def read_bin128 + return unpack("A16", 16) + end + + def write_bin128(b) + pack("A16", b) + end + + end + + class StringCodec < Codec + + def initialize(spec, encoded = "") + @spec = spec + @encoded = encoded + end + + attr_reader :encoded + + def write(s) + @encoded += s + end + + def read(n) + return "" if n.nil? + result = @encoded[0...n] + @encoded = @encoded[n...@encoded.size] || "" + return result + end + end +end diff --git a/qpid/ruby/qpid/codec.rb b/qpid/ruby/lib/qpid/codec08.rb index d1ecd2783f..148dee07bb 100644 --- a/qpid/ruby/qpid/codec.rb +++ b/qpid/ruby/lib/qpid/codec08.rb @@ -17,7 +17,7 @@ # under the License. # -module Codec +module Qpid08 # is there a better way to do this? class StringWriter @@ -119,7 +119,7 @@ module Codec def write(str) flushbits() @out.write(str) -# puts "OUT #{str.inspect()}" + # puts "OUT #{str.inspect()}" end def pack(fmt, *args) @@ -246,7 +246,7 @@ module Codec if result.nil? or result.empty? raise EOF.new() else -# puts " IN #{result.inspect()}" + # puts " IN #{result.inspect()}" return result end end diff --git a/qpid/ruby/lib/qpid/connection.rb b/qpid/ruby/lib/qpid/connection.rb new file mode 100644 index 0000000000..59d88196a3 --- /dev/null +++ b/qpid/ruby/lib/qpid/connection.rb @@ -0,0 +1,221 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +require 'monitor' + +module Qpid + + class ChannelBusy< Exception ; end + + class ChannelsBusy < Exception ; end + + class SessionBusy < Exception ; end + + class ConnectionFailed < Exception ; end + + class Timeout < Exception ; end + + class Connection < Assembler + + include MonitorMixin + + attr_reader :spec, :attached, :sessions, :thread + attr_accessor :opened, :failed, :close_code + + def initialize(sock, args={}) + super(sock) + + delegate = args[:delegate] || Qpid::Delegate::Client.method(:new) + spec = args[:spec] || nil + + @spec = Qpid::Spec010::load(spec) + @track = @spec["track"] + + @attached = {} + @sessions = {} + + @condition = new_cond + @opened = false + @failed = false + @close_code = [nil, "connection aborted"] + + @thread = nil + + @channel_max = 65535 + + @delegate = delegate.call(self, args) + end + + def attach(name, ch, delegate, force=false) + synchronize do + ssn = @attached[ch.id] + if ssn + raise ChannelBusy.new(ch, ssn) unless ssn.name == name + else + ssn = @sessions[name] + if ssn.nil? + ssn = Session.new(name, @spec, :delegate => delegate) + @sessions[name] = ssn + elsif ssn.channel + if force + @attached.delete(ssn.channel.id) + ssn.channel = nil + else + raise SessionBusy.new(ssn) + end + end + @attached[ch.id] = ssn + ssn.channel = ch + end + ch.session = ssn + return ssn + end + end + + def detach(name, ch) + synchronize do + @attached.delete(ch.id) + ssn = @sessions.delete(name) + if ssn + ssn.channel = nil + ssn.closed + return ssn + end + end + end + + def session(name, kwargs = {}) + timeout = kwargs[:timeout] + delegate = kwargs[:delegate] || Qpid::Session::Client.method(:new) + + # FIXME: Python has cryptic comment about 'ch 0 ?' + channel = (0..@channel_max).detect { |i| ! @attached.key?(i) } + raise ChannelsBusy unless channel + + synchronize do + ch = Channel.new(self, channel) + ssn = attach(name, ch, delegate) + ssn.channel.session_attach(name) + if ssn.wait_for(timeout) { ssn.channel } + return ssn + else + detach(name, ch) + raise Timeout + end + end + end + + def detach_all + synchronize do + attached.values.each do |ssn| + ssn.exceptions << @close_code unless @close_code[0] == 200 + detach(ssn.name, ssn.channel) + end + end + end + + def start(timeout=nil) + @delegate.start + @thread = Thread.new { run } + @thread[:name] = 'conn' + synchronize do + unless @condition.wait_for(timeout) { @opened || @failed } + raise Timeout + end + end + if @failed + raise ConnectionFailed.new(@close_code) + end + end + + def run + # XXX: we don't really have a good way to exit this loop without + # getting the other end to kill the socket + loop do + begin + seg = read_segment + rescue Qpid::Closed => e + detach_all + break + end + @delegate.received(seg) + end + end + + def close(timeout=nil) + return unless @opened + Channel.new(self, 0).connection_close(200) + synchronize do + unless @condition.wait_for(timeout) { ! @opened } + raise Timeout + end + end + @thread.join(timeout) + @thread = nil + end + + def signal + synchronize { @condition.signal } + end + + def to_s + # FIXME: We'd like to report something like HOST:PORT + return @sock.to_s + end + + class Channel < Invoker + + attr_reader :id, :connection + attr_accessor :session + + def initialize(connection, id) + @connection = connection + @id = id + @session = nil + end + + def resolve_method(name) + inst = @connection.spec[name] + if inst.is_a?(Qpid::Spec010::Control) + return invocation(:method, inst) + else + return invocation(:error, nil) + end + end + + def invoke(type, args) + ctl = type.create(*args) + sc = StringCodec.new(@connection.spec) + sc.write_control(ctl) + @connection.write_segment(Segment.new(true, true, type.segment_type, + type.track, self.id, sc.encoded)) + + log = Qpid::logger["qpid.io.ctl"] + log.debug("SENT %s", ctl) if log + end + + def to_s + return "#{@connection}[#{@id}]" + end + + end + + end + +end diff --git a/qpid/ruby/qpid/connection.rb b/qpid/ruby/lib/qpid/connection08.rb index f6ee9cf1e4..09a4888cc4 100644 --- a/qpid/ruby/qpid/connection.rb +++ b/qpid/ruby/lib/qpid/connection08.rb @@ -18,11 +18,9 @@ # require "socket" -require "qpid/codec" +require "qpid/codec08" -include Codec - -module Qpid +module Qpid08 class Connection @@ -48,7 +46,7 @@ module Qpid end def write(frame) -# puts "OUT #{frame.inspect()}" + # puts "OUT #{frame.inspect()}" @out.octet(@spec.constants[frame.payload.type].id) @out.short(frame.channel) frame.payload.encode(@out) @@ -64,7 +62,7 @@ module Qpid raise Exception.new("framing error: expected #{frame_end}, got #{oct}") end frame = Frame.new(channel, payload) -# puts " IN #{frame.inspect}" + # puts " IN #{frame.inspect}" return frame end diff --git a/qpid/ruby/lib/qpid/datatypes.rb b/qpid/ruby/lib/qpid/datatypes.rb new file mode 100644 index 0000000000..f49ed0fd5f --- /dev/null +++ b/qpid/ruby/lib/qpid/datatypes.rb @@ -0,0 +1,353 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +module Qpid + + def self.struct(type, *args) + # FIXME: This is fragile; the last arg could be a hash, + # without being hte keywords + kwargs = {} + kwargs = args.pop if args.any? && args[-1].is_a?(Hash) + + if args.size > type.fields.size + raise TypeError, + "%s() takes at most %d arguments (%d given)" % + [type.name, type.fields.size, args.size] + end + + attrs = type.fields.inject({}) do |attrs, field| + if args.any? + attrs[field.name] = args.shift + if kwargs.key?(field.name) + raise TypeError, + "%s() got multiple values for keyword argument '%s'" % + [type.name, field.name] + end + elsif kwargs.key?(field.name) + attrs[field.name] = kwargs.delete(field.name) + else + attrs[field.name] = field.default + end + attrs + end + + unless kwargs.empty? + unexpected = kwargs.keys[0] + raise TypeError, + "%s() got an unexpected keyword argument '%s'" % + [type.name, unexpected] + end + + attrs[:type] = type + attrs[:id] = nil + + name = "Qpid_" + type.name.to_s.capitalize + unless ::Struct.const_defined?(name) + vars = type.fields.collect { |f| f.name } << :type << :id + ::Struct.new(name, *vars) + end + st = ::Struct.const_get(name) + + result = st.new + attrs.each { |k, v| result[k] = v } + return result + end + + class Message + + attr_accessor :headers, :body, :id + + def initialize(*args) + @body = nil + @headers = nil + + @body = args.pop unless args.empty? + @headers = args unless args.empty? + + @id = nil + end + + def has(name) + return ! get(name).nil? + end + + def get(name) + if @headers + name = name.to_sym + @headers.find { |h| h.type.name == name } + end + end + + def set(header) + @headers ||= [] + if h = @headers.find { |h| h.type == header.type } + ind = @headers.index(h) + @headers[ind] = header + else + @headers << header + end + end + + def clear(name) + if @headers + name = name.to_sym + @headers.delete_if { |h| h.type.name == name } + end + end + + # FIXME: Not sure what to do here + # Ruby doesn't have a notion of a evaluable string representation + # def __repr__(self): + # args = [] + # if self.headers: + # args.extend(map(repr, self.headers)) + # if self.body: + # args.append(repr(self.body)) + # if self.id is not None: + # args.append("id=%s" % self.id) + # return "Message(%s)" % ", ".join(args) + # end + end + + class ::Object + + def to_serial + Qpid::Serial.new(self) + end + end + + class Serial + + include Comparable + + attr_accessor :value + + def initialize(value) + @value = value & 0xFFFFFFFF + end + + def hash + @value.hash + end + + def to_serial + self + end + + def eql?(other) + other = other.to_serial + value.eql?(other.value) + end + + def <=>(other) + return 1 if other.nil? + + other = other.to_serial + + delta = (value - other.value) & 0xFFFFFFFF + neg = delta & 0x80000000 + mag = delta & 0x7FFFFFFF + + return (neg>0) ? -mag : mag + end + + def +(other) + result = other.to_serial + result.value += value + return result + end + + def -(other) + result = other.to_serial + result.value = value - result.value + return result + end + + def succ + Serial.new(value + 1) + end + + # FIXME: Not sure what to do here + # Ruby doesn't have a notion of a evaluable string representation + # def __repr__(self): + # return "serial(%s)" % self.value + # end + + def to_s + value.to_s + end + + end + + # The Python class datatypes.Range is emulated by the standard + # Range class with a few additions + class ::Range + + alias :lower :begin + alias :upper :end + + def touches(r) + # XXX: are we doing more checks than we need? + return (r.include?(lower - 1) || + r.include?(upper + 1) || + include?(r.lower - 1) || + include?(r.upper + 1) || + r.include?(lower) || + r.include?(upper) || + include?(r.lower) || + include?(r.upper)) + end + + def span(r) + Range.new([lower, r.lower].min, [upper, r.upper].max) + end + + def intersect(r) + l = [lower, r.lower].max + u = [upper, r.upper].min + return l > u ? nil : Range.new(l, u) + end + + end + + class RangedSet + + include Enumerable + + attr_accessor :ranges + + def initialize(*args) + @ranges = [] + args.each { |n| add(n) } + end + + def each(&block) + ranges.each { |r| yield(r) } + end + + def include?(n) + if (n.is_a?(Range)) + super(n) + else + ranges.find { |r| r.include?(n) } + end + end + + def add_range(range) + ranges.delete_if do |r| + if range.touches(r) + range = range.span(r) + true + else + false + end + end + ranges << range + end + + def add(lower, upper = nil) + upper = lower if upper.nil? + add_range(Range.new(lower, upper)) + end + + def to_s + repr = ranges.sort { |a,b| b.lower <=> a.lower }. + map { |r| r.to_s }.join(",") + "<RangedSet: {#{repr}}" + end + end + + class Future + def initialize(initial=nil, exception=Exception) + @value = initial + @error = nil + @set = Util::Event.new + @exception = exception + end + + def error(error) + @error = error + @set.set + end + + def set(value) + @value = value + @set.set + end + + def get(timeout=nil) + @set.wait(timeout) + unless @error.nil? + raise @exception.new(@error) + end + @value + end + end + + class UUID + include Comparable + + attr_accessor :bytes + + def initialize(bytes) + @bytes = bytes + end + + def <=>(other) + if other.respond_to?(:bytes) + return bytes <=> other.bytes + else + raise NotImplementedError + end + end + + def to_s + UUID::format(bytes) + end + + # FIXME: Not sure what to do here + # Ruby doesn't have a notion of a evaluable string representation + # def __repr__(self): + # return "UUID(%r)" % str(self) + # end + + def self.random_uuid + bytes = (1..16).collect { |i| rand(256) } + + # From RFC4122, the version bits are set to 0100 + bytes[7] &= 0x0F + bytes[7] |= 0x40 + + # From RFC4122, the top two bits of byte 8 get set to 01 + bytes[8] &= 0x3F + bytes[8] |= 0x80 + return bytes.pack("C16") + end + + def self.uuid4 + UUID.new(random_uuid) + end + + def self.format(s) + # Python format !LHHHHL + # big-endian, ulong, ushort x 4, ulong + "%08x-%04x-%04x-%04x-%04x%08x" % bytes.unpack("NnnnnN") + end + end +end diff --git a/qpid/ruby/lib/qpid/delegates.rb b/qpid/ruby/lib/qpid/delegates.rb new file mode 100644 index 0000000000..21513fc677 --- /dev/null +++ b/qpid/ruby/lib/qpid/delegates.rb @@ -0,0 +1,204 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +require 'rbconfig' + +module Qpid + + class Delegate + + def initialize(connection, args={}) + @connection = connection + @spec = connection.spec + @delegate = args[:delegate] || Qpid::Delegate::Client.method(:new) + @control = @spec[:track].enum[:control].value + end + + def log ; Qpid::logger["qpid.io.ctl"]; end + + def received(seg) + ssn = @connection.attached[seg.channel] + unless ssn + ch = Qpid::Connection::Channel.new(@connection, seg.channel) + else + ch = ssn.channel + end + + if seg.track == @control + ctl = seg.decode(@spec) + log.debug("RECV %s", ctl) if log + attr = ctl.type.name + method(attr).call(ch, ctl) + elsif ssn.nil? + ch.session_detached + else + ssn.received(seg) + end + end + + def connection_close(ch, close) + @connection.close_code = [close.reply_code, close.reply_text] + ch.connection_close_ok + @connection.sock.close_write() + unless @connection.opened + @connection.failed = true + @connection.signal + end + end + + def connection_close_ok(ch, close_ok) + @connection.opened = false + @connection.signal + end + + def session_attach(ch, a) + begin + @connection.attach(a.name, ch, @delegate, a.force) + ch.session_attached(a.name) + rescue Qpid::ChannelBusy + ch.session_detached(a.name) + rescue Qpid::SessionBusy + ch.session_detached(a.name) + end + end + + def session_attached(ch, a) + ch.session.signal + end + + def session_detach(ch, d) + #send back the confirmation of detachment before removing the + #channel from the attached set; this avoids needing to hold the + #connection lock during the sending of this control and ensures + #that if the channel is immediately reused for a new session the + #attach request will follow the detached notification. + ch.session_detached(d.name) + ssn = @connection.detach(d.name, ch) + end + + def session_detached(ch, d) + @connection.detach(d.name, ch) + end + + def session_request_timeout(ch, rt) + ch.session_timeout(rt.timeout) + end + + def session_command_point(ch, cp) + ssn = ch.session + ssn.receiver.next_id = cp.command_id + ssn.receiver.next_offset = cp.command_offset + end + + def session_completed(ch, cmp) + ch.session.sender.has_completed(cmp.commands) + if cmp.timely_reply + ch.session_known_completed(cmp.commands) + end + ch.session.signal + end + + def session_known_completed(ch, kn_cmp) + ch.session.receiver.known_completed(kn_cmp.commands) + end + + def session_flush(ch, f) + rcv = ch.session.receiver + if f.expected + if rcv.next_id + exp = Qpid::RangedSet.new(rcv.next_id) + else + exp = nil + end + ch.session_expected(exp) + end + if f.confirmed + ch.session_confirmed(rcv.completed) + end + if f.completed + ch.session_completed(rcv.completed) + end + end + + class Server < Delegate + + def start + @connection.read_header() + @connection.write_header(@spec.major, @spec.minor) + ch = Qpid::Connection::Channel.new(@connection, 0) + ch.connection_start(:mechanisms => ["ANONYMOUS"]) + ch + end + + def connection_start_ok(ch, start_ok) + ch.connection_tune(:channel_max => 65535) + end + + def connection_tune_ok(ch, tune_ok) + nil + end + + def connection_open(ch, open) + @connection.opened = true + ch.connection_open_ok() + @connection.signal + end + end + + class Client < Delegate + + # FIXME: Python uses os.name for platform - we don't have an exact + # analog in Ruby + PROPERTIES = {"product" => "qpid python client", + "version" => "development", + "platform" => Config::CONFIG["build_os"]} + + + def initialize(connection, args) + super(connection) + + @username = args[:username] || "guest" + @password = args[:password] || "guest" + @mechanism= args[:mechanism] || "PLAIN" + end + + def start + @connection.write_header(@spec.major, @spec.minor) + @connection.read_header + end + + def connection_start(ch, start) + r = "\0%s\0%s" % [@username, @password] + ch.connection_start_ok(:client_properties => PROPERTIES, + :mechanism => @mechanism, + :response => r) + end + + def connection_tune(ch, tune) + ch.connection_tune_ok() + ch.connection_open() + end + + def connection_open_ok(ch, open_ok) + @connection.opened = true + @connection.signal + end + end + end +end diff --git a/qpid/ruby/qpid/fields.rb b/qpid/ruby/lib/qpid/fields.rb index 91484af850..cc87d07529 100644 --- a/qpid/ruby/qpid/fields.rb +++ b/qpid/ruby/lib/qpid/fields.rb @@ -6,9 +6,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -26,7 +26,7 @@ class Class if respond_to? :init init(*args) {|*a| yield(*a)} elsif args.any? - raise ArgumentException.new("extra arguments: #{args}") + raise ArgumentError, "extra arguments: #{args.inspect}" end end } diff --git a/qpid/ruby/lib/qpid/framer.rb b/qpid/ruby/lib/qpid/framer.rb new file mode 100644 index 0000000000..2a565a69a8 --- /dev/null +++ b/qpid/ruby/lib/qpid/framer.rb @@ -0,0 +1,195 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +require 'monitor' +require 'logger' + +module Qpid + + FIRST_SEG = 0x08 + LAST_SEG = 0x04 + FIRST_FRM = 0x02 + LAST_FRM = 0x01 + + class << self + attr_accessor :raw_logger, :frm_logger + end + + def self.packed_size(format) + # FIXME: This is a total copout to simulate Python's + # struct.calcsize + ([0]*256).pack(format).size + end + + class Frame + attr_reader :payload, :track, :flags, :type, :channel + + # HEADER = "!2BHxBH4x" + # Python Meaning Ruby + # ! big endian (implied by format char) + # 2B 2 uchar C2 + # H unsigned short n + # x pad byte x + # B uchar C + # H unsigned short n + # 4x pad byte x4 + HEADER = "C2nxCnx4" + HEADER_SIZE = Qpid::packed_size(HEADER) + MAX_PAYLOAD = 65535 - HEADER_SIZE + + def initialize(flags, type, track, channel, payload) + if payload.size > MAX_PAYLOAD + raise ArgumentError, "max payload size exceeded: #{payload.size}" + end + + @flags = flags + @type = type + @track = track + @channel = channel + @payload = payload + end + + def first_segment? ; FIRST_SEG & @flags > 0 ; end + + def last_segment? ; LAST_SEG & @flags > 0 ; end + + def first_frame? ; FIRST_FRM & @flags > 0 ; end + + def last_frame? ; LAST_FRM & @flags > 0 ; end + + def to_s + fs = first_segment? ? 'S' : '.' + ls = last_segment? ? 's' : '.' + ff = first_frame? ? 'F' : '.' + lf = last_frame? ? 'f' : '.' + + return "%s%s%s%s %s %s %s %s" % [fs, ls, ff, lf, + @type, + @track, + @channel, + @payload.inspect] + end + end + + class FramingError < Exception ; end + + class Closed < Exception ; end + + class Framer + include Packer + + # Python: "!4s4B" + HEADER = "a4C4" + HEADER_SIZE = 8 + + def raw + Qpid::raw_logger + end + + def frm + Qpid::frm_logger + end + + def initialize(sock) + @sock = sock + @sock.extend(MonitorMixin) + @buf = "" + end + + attr_reader :sock + + def aborted? ; false ; end + + def write(buf) + @buf += buf + end + + def flush + @sock.synchronize do + _write(@buf) + @buf = "" + frm.debug("FLUSHED") if frm + end + end + + def _write(buf) + while buf && buf.size > 0 + # FIXME: Catch errors + n = @sock.write(buf) + raw.debug("SENT #{buf[0, n].inspect}") if raw + buf[0,n] = "" + @sock.flush + end + end + + def read(n) + data = "" + while data.size < n + begin + s = @sock.read(n - data.size) + rescue IOError => e + raise e if data != "" + @sock.close unless @sock.closed? + raise Closed + end + # FIXME: Catch errors + if s.nil? or s.size == 0 + @sock.close unless @sock.closed? + raise Closed + end + data += s + raw.debug("RECV #{n}/#{data.size} #{s.inspect}") if raw + end + return data + end + + def read_header + unpack(Framer::HEADER, Framer::HEADER_SIZE) + end + + def write_header(major, minor) + @sock.synchronize do + pack(Framer::HEADER, "AMQP", 1, 1, major, minor) + flush() + end + end + + def write_frame(frame) + @sock.synchronize do + size = frame.payload.size + Frame::HEADER_SIZE + track = frame.track & 0x0F + pack(Frame::HEADER, frame.flags, frame.type, size, track, frame.channel) + write(frame.payload) + if frame.last_segment? and frame.last_frame? + flush() + frm.debug("SENT #{frame}") if frm + end + end + end + + def read_frame + flags, type, size, track, channel = unpack(Frame::HEADER, Frame::HEADER_SIZE) + raise FramingError if (flags & 0xF0 > 0) + payload = read(size - Frame::HEADER_SIZE) + frame = Frame.new(flags, type, track, channel, payload) + frm.debug("RECV #{frame}") if frm + return frame + end + end +end diff --git a/qpid/ruby/lib/qpid/invoker.rb b/qpid/ruby/lib/qpid/invoker.rb new file mode 100644 index 0000000000..39716ac6c2 --- /dev/null +++ b/qpid/ruby/lib/qpid/invoker.rb @@ -0,0 +1,65 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +class Qpid::Invoker + + # Requires that client defines a invoke method and overrides + # resolve_method + + # FIXME: Is it really worth defining methods in method_missing ? We + # could just dispatch there directly + + def invc_method(name, resolved) + define_singleton_method(name) { |*args| invoke(resolved, args) } + # FIXME: the Python code also attaches docs from resolved.pydoc + end + + def invc_value(name, resolved) + define_singleton_method(name) { | | resolved } + end + + def invc_error(name, resolved) + msg = "%s instance has no attribute '%s'" % [self.class.name, name] + if resolved + msg += "\n%s" % resolved + end + raise NameError, msg + end + + def resolve_method(name) + invocation(:error, nil) + end + + def method_missing(name, *args) + disp, resolved = resolve_method(name) + disp.call(name, resolved) + send(name, *args) + end + + def invocation(kind, name = nil) + [ method("invc_#{kind}"), name ] + end + + private + def define_singleton_method(name, &body) + singleton_class = class << self; self; end + singleton_class.send(:define_method, name, &body) + end + +end diff --git a/qpid/ruby/run-tests b/qpid/ruby/lib/qpid/packer.rb index 956f8d0d4d..ae1be37faf 100755..100644 --- a/qpid/ruby/run-tests +++ b/qpid/ruby/lib/qpid/packer.rb @@ -1,4 +1,3 @@ -#!/usr/bin/ruby # # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file @@ -7,9 +6,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -17,7 +16,18 @@ # specific language governing permissions and limitations # under the License. # -# -require "tests/channel" -require "tests/basic" +module Qpid + module Packer + def unpack(fmt, len) + raw = read(len) + values = raw.unpack(fmt) + values = values[0] if values.size == 1 + return values + end + + def pack(fmt, *args) + write(args.pack(fmt)) + end + end +end diff --git a/qpid/ruby/qpid/peer.rb b/qpid/ruby/lib/qpid/peer.rb index 320808fdc6..cdb962169b 100644 --- a/qpid/ruby/qpid/peer.rb +++ b/qpid/ruby/lib/qpid/peer.rb @@ -19,10 +19,12 @@ require "thread" require "qpid/queue" -require "qpid/connection" +require "qpid/connection08" require "qpid/fields" -module Qpid +module Qpid08 + + Queue = Qpid::Queue class Peer @@ -60,9 +62,9 @@ module Qpid @mutex.synchronize do @channels.each_value do |ch| ch.close() - @outgoing.close() - @work.close() end + @outgoing.close() + @work.close() end end @@ -95,22 +97,22 @@ module Qpid def writer() while true - @conn.write(@outgoing.pop()) + @conn.write(@outgoing.get()) end end def worker() while true - dispatch(@work.pop()) + dispatch(@work.get()) end end def dispatch(queue) - frame = queue.pop() + frame = queue.get() ch = channel(frame.channel) payload = frame.payload if payload.method.content? - content = Qpid::read_content(queue) + content = Qpid08::read_content(queue) else content = nil end @@ -161,9 +163,9 @@ module Qpid def method_missing(name, *args) method = @spec.find_method(name) - if method.nil? - raise NoMethodError.new("undefined method '#{name}' for #{self}:#{self.class}") - end + if method.nil? + raise NoMethodError.new("undefined method '#{name}' for #{self}:#{self.class}") + end if args.size == 1 and args[0].instance_of? Hash kwargs = args[0] @@ -205,7 +207,7 @@ module Qpid nowait = args[method.fields.index(f)] unless f.nil? unless nowait or method.responses.empty? - resp = @responses.pop().payload + resp = @responses.get().payload if resp.method.content? content = read_content(@responses) else @@ -230,8 +232,8 @@ module Qpid end - def Qpid.read_content(queue) - frame = queue.pop() + def Qpid08.read_content(queue) + frame = queue.get() header = frame.payload children = [] 1.upto(header.weight) { children << read_content(queue) } @@ -239,7 +241,7 @@ module Qpid read = 0 buf = "" while read < size - body = queue.pop() + body = queue.get() content = body.payload.content buf << content read += content.size diff --git a/qpid/ruby/lib/qpid/qmf.rb b/qpid/ruby/lib/qpid/qmf.rb new file mode 100644 index 0000000000..05827dfd67 --- /dev/null +++ b/qpid/ruby/lib/qpid/qmf.rb @@ -0,0 +1,1503 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Console API for Qpid Management Framework + +require 'socket' +require 'monitor' +require 'uri' +require 'time' + +module Qpid::Qmf + + # To access the asynchronous operations, a class must be derived from + # Console with overrides of any combination of the available methods. + class Console + + # Invoked when a connection is established to a broker + def broker_connected(broker); end + + # Invoked when the connection to a broker is lost + def broker_disconnected(broker); end + + # Invoked when a QMF package is discovered + def new_package(name); end + + # Invoked when a new class is discovered. Session.getSchema can be + # used to obtain details about the class + def new_class(kind, klass_key); end + + # Invoked when a QMF agent is discovered + def new_agent(agent); end + + # Invoked when a QMF agent disconects + def del_agent(agent); end + + # Invoked when an object is updated + def object_props(broker, record); end + + # Invoked when an object is updated + def objectStats(broker, record); end + + # Invoked when an event is raised + def event(broker, event); end + + def heartbeat(agent, timestamp); end + + def broker_info(broker); end + end + + class BrokerURL + def initialize(text) + uri = URI.parse(text) + + @host = uri.host + @port = uri.port ? uri.port : 5672 + @auth_name = uri.user ? uri.user : "guest" + @auth_pass = uri.password ? uri.password: "guest" + @auth_mech = "PLAIN" + + return uri + end + + def name + "#{@host}:#{@port}" + end + + def match(host, port) + # FIXME: Unlcear what the Python code is actually checking for + # here, especially since HOST can resolve to multiple IP's + @port == port && + (host == @host || ipaddr(host, port) == ipaddr(@host, @port)) + end + + private + def ipaddr(host, port) + s = Socket::getaddrinfo(host, port, + Socket::AF_INET, Socket::SOCK_STREAM) + s[0][2] + end + end + + # An instance of the Session class represents a console session running + # against one or more QMF brokers. A single instance of Session is + # needed to interact with the management framework as a console. + class Session + CONTEXT_SYNC = 1 + CONTEXT_STARTUP = 2 + CONTEXT_MULTIGET = 3 + + GET_WAIT_TIME = 60 + + include MonitorMixin + + attr_reader :binding_key_list, :select, :seq_mgr, :console, :packages + + # Initialize a session. If the console argument is provided, the + # more advanced asynchronous features are available. If console is + # defaulted, the session will operate in a simpler, synchronous + # manner. The rcvObjects, rcvEvents, and rcvHeartbeats arguments + # are meaningful only if 'console' is provided. They control + # whether object updates, events, and agent-heartbeats are + # subscribed to. If the console is not interested in receiving one + # or more of the above, setting the argument to False will reduce + # tha bandwidth used by the API. If manageConnections is set to + # True, the Session object will manage connections to the brokers. + # This means that if a broker is unreachable, it will retry until a + # connection can be established. If a connection is lost, the + # Session will attempt to reconnect. + # + # If manageConnections is set to False, the user is responsible for + # handing failures. In this case, an unreachable broker will cause + # addBroker to raise an exception. If userBindings is set to False + # (the default) and rcvObjects is True, the console will receive + # data for all object classes. If userBindings is set to True, the + # user must select which classes the console shall receive by + # invoking the bindPackage or bindClass methods. This allows the + # console to be configured to receive only information that is + # relavant to a particular application. If rcvObjects id False, + # userBindings has no meaning. + # + # Accept a hash of parameters, where keys can be :console, + # :rcv_objects, :rcv_events, :rcv_heartbeats, :manage_connections, + # and :user_bindings + def initialize(kwargs = {}) + super() + @console = kwargs[:console] || nil + @brokers = [] + @packages = {} + @seq_mgr = SequenceManager.new + @cv = new_cond + @sync_sequence_list = [] + @result = [] + @select = [] + @error = nil + @rcv_objects = kwargs[:rcv_objects] || true + @rcv_events = kwargs[:rcv_events] || true + @rcv_heartbeats = kwargs[:rcv_heartbeats] || true + @user_bindings = kwargs[:user_bindings] || false + unless @console + @rcv_objects = false + @rcv_events = false + @rcv_heartbeats = false + end + @binding_key_list = binding_keys + @manage_connections = kwargs[:manage_connections] || false + + if @user_bindings && ! @rcv_objects + raise ArgumentError, "user_bindings can't be set unless rcv_objects is set and a console is provided" + end + + #raise NotImplementedError, @manage_connections + end + + def to_s + "QMF Console Session Manager (brokers connected: #{@brokers.size})" + end + + # Connect to a Qpid broker. Returns an object of type Broker + def add_broker(target="amqp://localhost") + uri = URI.parse(target) + broker = Broker.new(self, uri.host, uri.port, "PLAIN", uri.user, uri.password) + unless broker.connected? || @manage_connections + raise broker.error + end + + @brokers << broker + objects(:broker => broker, :class => "agent") + return broker + end + + # Disconnect from a broker. The 'broker' argument is the object + # returned from the addBroker call + def del_broker(broker) + broker.shutdown + @brokers.delete(broker) + end + + # Get the list of known classes within a QMF package + def classes(package_name) + @brokers.each { |broker| broker.wait_for_stable } + if @packages.include?(package_name) + # FIXME What's the actual structure of @packages[package_name] + @packages[package_name].inject([]) do |list, cname, hash| + list << [ package_name, cname, hash] + end + end + end + + # Get the schema for a QMF class + def schema(klass_key) + @brokers.each { |broker| broker.wait_for_stable } + pname, cname, hash = klass_key + if @packages.include?(pname) + @packages[pname][ [cname, hash] ] + end + end + + def bind_package(package_name) + unless @user_bindings && @rcv_objects + raise "userBindings option not set for Session" + end + @brokers.each do |broker| + args = { :exchange => "qpid.management", + :queue => broker.topicName, + :binding_key => "console.obj.#{package_name}" } + broker.amqpSession.exchange_bind(args) + end + end + + def bind_class(klass_key) + unless @user_bindings && @rcv_objects + raise "userBindings option not set for Session" + end + pname, cname, hash = klass_key + @brokers.each do |broker| + args = { :exchange => "qpid.management", + :queue => broker.topicName, + :binding_key => "console.obj.#{pname}.#{cname}" } + broker.amqpSession.exchange_bind(args) + end + end + + # Get a list of currently known agents + def agents(broker=nil) + broker_list = [] + if broker.nil? + broker_list = @brokers.dup + else + broker_list << broker + end + broker_list.each { |b| b.wait_for_stable } + agent_list = [] + broker_list.each { |b| agent_list += b.agents } + return agent_list + end + + # Get a list of objects from QMF agents. + # All arguments are passed by name(keyword). + # + # The class for queried objects may be specified in one of the + # following ways: + # :schema => <schema> - supply a schema object returned from getSchema. + # :key => <key> - supply a klass_key from the list returned by getClasses. + # :class => <name> - supply a class name as a string. If the class name exists + # in multiple packages, a _package argument may also be supplied. + # :object_id = <id> - get the object referenced by the object-id + # + # If objects should be obtained from only one agent, use the following argument. + # Otherwise, the query will go to all agents. + # + # :agent = <agent> - supply an agent from the list returned by getAgents. + # If the get query is to be restricted to one broker (as opposed to + # all connected brokers), add the following argument: + # + # :broker = <broker> - supply a broker as returned by addBroker. + # + # If additional arguments are supplied, they are used as property + # selectors, as long as their keys are strings. For example, if + # the argument "name" => "test" is supplied, only objects whose + # "name" property is "test" will be returned in the result. + def objects(kwargs) + if kwargs.include?(:broker) + broker_list = [] + broker_list << kwargs[:broker] + else + broker_list = @brokers + end + broker_list.each { |broker| broker.wait_for_stable } + + agent_list = [] + if kwargs.include?(:agent) + agent = kwargs[:agent] + unless broker_list.include?(agent.broker) + raise ArgumentError, "Supplied agent is not accessible through the supplied broker" + end + agent_list << agent + else + broker_list.each { |broker| agent_list += broker.agents } + end + + cname = nil + if kwargs.include?(:schema) + # FIXME: What kind of object is kwargs[:schema] + pname, cname, hash = kwargs[:schema].getKey() + elsif kwargs.include?(:key) + pname, cname, hash = kwargs[:key] + elsif kwargs.include?(:class) + pname, cname, hash = [kwargs[:package], kwargs[:class], nil] + end + if cname.nil? && ! kwargs.include?(:object_id) + raise ArgumentError, + "No class supplied, use :schema, :key, :class, or :object_id' argument" + end + + map = {} + @select = [] + if kwargs.include?(:object_id) + map["_objectId"] = kwargs[:object_id].to_str + else + map["_class"] = cname + map["_package"] = pname if pname + map["_hash"] = hash if hash + kwargs.each do |k,v| + @select << [k, v] if k.is_a?(String) + end + end + + @result = [] + agent_list.each do |agent| + broker = agent.broker + send_codec = Qpid::StringCodec.new(broker.conn.spec) + seq = nil + synchronize do + seq = @seq_mgr.reserve(CONTEXT_MULTIGET) + @sync_sequence_list << seq + end + broker.set_header(send_codec, ?G, seq) + send_codec.write_map(map) + smsg = broker.message(send_codec.encoded, "agent.#{agent.bank}") + broker.emit(smsg) + end + + timeout = false + synchronize do + unless @cv.wait_for(GET_WAIT_TIME) { + @sync_sequence_list.empty? || @error } + @sync_sequence_list.each do |pending_seq| + @seq_mgr.release(pending_seq) + end + @sync_sequence_list = [] + timeout = true + end + end + + if @error + errorText = @error + @error = nil + raise errorText + end + + if @result.empty? && timeout + raise "No agent responded within timeout period" + end + @result + end + + def set_event_filter(kwargs); end + + def handle_broker_connect(broker); end + + def handle_broker_resp(broker, codec, seq) + broker.broker_id = codec.read_uuid + @console.broker_info(broker) if @console + + # Send a package request + # (effectively inc and dec outstanding by not doing anything) + send_codec = Qpid::StringCodec.new(broker.conn.spec) + seq = @seq_mgr.reserve(CONTEXT_STARTUP) + broker.set_header(send_codec, ?P, seq) + smsg = broker.message(send_codec.encoded) + broker.emit(smsg) + end + + def handle_package_ind(broker, codec, seq) + pname = codec.read_str8 + new_package = false + synchronize do + new_package = ! @packages.include?(pname) + @packages[pname] = {} if new_package + end + @console.new_package(pname) if @console + + # Send a class request + broker.inc_outstanding + send_codec = Qpid::StringCodec.new(broker.conn.spec) + seq = @seq_mgr.reserve(CONTEXT_STARTUP) + broker.set_header(send_codec, ?Q, seq) + send_codec.write_str8(pname) + smsg = broker.message(send_codec.encoded) + broker.emit(smsg) + end + + def handle_command_complete(broker, codec, seq) + code = codec.read_uint32 + text = codec.read_str8 + context = @seq_mgr.release(seq) + if context == CONTEXT_STARTUP + broker.dec_outstanding + elsif context == CONTEXT_SYNC && seq == broker.sync_sequence + broker.sync_done + elsif context == CONTEXT_MULTIGET && @sync_sequence_list.include?(seq) + synchronize do + @sync_sequence_list.delete(seq) + @cv.signal if @sync_sequence_list.empty? + end + end + end + + def handle_class_ind(broker, codec, seq) + kind = codec.read_uint8 + pname = codec.read_str8 + cname = codec.read_str8 + hash = codec.read_bin128 + unknown = false + + synchronize do + return unless @packages.include?(pname) + unknown = true unless @packages[pname].include?([cname, hash]) + end + + if unknown + # Send a schema request for the unknown class + broker.inc_outstanding + send_codec = Qpid::StringCodec.new(broker.conn.spec) + seq = @seq_mgr.reserve(CONTEXT_STARTUP) + broker.set_header(send_codec, ?S, seq) + send_codec.write_str8(pname) + send_codec.write_str8(cname) + send_codec.write_bin128(hash) + smsg = broker.message(send_codec.encoded) + broker.emit(smsg) + end + end + + def handle_method_resp(broker, codec, seq) + code = codec.read_uint32 + + text = codec.read_str16 + out_args = {} + method, synchronous = @seq_mgr.release(seq) + if code == 0 + method.arguments.each do |arg| + if arg.dir.index(?O) + out_args[arg.name] = decode_value(codec, arg.type) + end + end + end + result = MethodResult.new(code, text, out_args) + if synchronous: + broker.synchronize do + broker.sync_result = MethodResult.new(code, text, out_args) + broker.sync_done + end + else + @console.method_response(broker, seq, result) if @console + end + end + + def handle_heartbeat_ind(broker, codec, seq, msg) + if @console + broker_bank = 1 + agent_bank = 0 + dp = msg.get("delivery_properties") + if dp + key = dp["routing_key"] + key_elements = key.split(".") + if key_elements.length == 4 + broker_bank = key_elements[2].to_i + agent_bank = key_lements[2].to_i + end + end + agent = broker.agent(broker_bank, agent_bank) + timestamp = codec.read_uint64 + @console.heartbeat(agent, timestamp) + end + end + + def handle_event_ind(broker, codec, seq) + if @console + event = Event.new(self, broker, codec) + @console.event(broker, event) + end + end + + def handle_schema_resp(broker, codec, seq) + kind = codec.read_uint8 + pname = codec.read_str8 + cname = codec.read_str8 + hash = codec.read_bin128 + klass_key = [pname, cname, hash] + klass = SchemaClass.new(kind, klass_key, codec) + synchronize { @packages[pname][ [cname, hash] ] = klass } + + @seq_mgr.release(seq) + broker.dec_outstanding + @console.new_class(kind, klass_key) if @console + end + + def handle_content_ind(broker, codec, seq, prop=false, stat=false) + pname = codec.read_str8 + cname = codec.read_str8 + hash = codec.read_bin128 + klass_key = [pname, cname, hash] + + schema = nil + synchronize do + return unless @packages.include?(pname) + return unless @packages[pname].include?([cname, hash]) + schema = @packages[pname][ [cname, hash] ] + end + + object = Qpid::Qmf::Object.new(self, broker, schema, codec, prop, stat) + if pname == "org.apache.qpid.broker" && cname == "agent" + broker.update_agent(object) + end + + synchronize do + if @sync_sequence_list.include?(seq) + if object.timestamps()[2] == 0 && select_match(object) + @result << object + end + return + end + end + + @console.object_props(broker, object) if @console && prop + @console.object_stats(broker, object) if @console && stat + end + + def handle_broker_disconnect(broker); end + + def handle_error(error) + @error = error + synchronize do + @sync_sequence_list = [] + @cv.signal + end + end + + # Decode, from the codec, a value based on its typecode + def decode_value(codec, typecode) + case typecode + when 1: data = codec.read_uint8 # U8 + when 2: data = codec.read_uint16 # U16 + when 3: data = codec.read_uint32 # U32 + when 4: data = codec.read_uint64 # U64 + when 6: data = codec.read_str8 # SSTR + when 7: data = codec.read_str16 # LSTR + when 8: data = codec.read_int64 # ABSTIME + when 9: data = codec.read_uint64 # DELTATIME + when 10: data = ObjectId.new(codec) # REF + when 11: data = codec.read_uint8 != 0 # BOOL + when 12: data = codec.read_float # FLOAT + when 13: data = codec.read_double # DOUBLE + when 14: data = codec.read_uuid # UUID + when 15: data = codec.read_map # FTABLE + when 16: data = codec.read_int8 # S8 + when 17: data = codec.read_int16 # S16 + when 18: data = codec.read_int32 # S32 + when 19: data = codec.read_int64 # S64 + else + raise ArgumentError, "Invalid type code: #{typecode} - #{typecode.inspect}" + end + return data + end + + # Encode, into the codec, a value based on its typecode + def encode_value(codec, value, typecode) + # FIXME: Python does a lot of magic type conversions + # We just assume that value has the right type; this is safer + # than coercing explicitly, since Array::pack will complain + # loudly about various type errors + case typecode + when 1: codec.write_uint8(value) # U8 + when 2: codec.write_uint16(value) # U16 + when 3: codec.write_uint32(value) # U32 + when 4: codec.write_uint64(value) # U64 + when 6: codec.write_str8(value) # SSTR + when 7: codec.write_str16(value) # LSTR + when 8: codec.write_int64(value) # ABSTIME + when 9: codec.write_uint64(value) # DELTATIME + when 10: value.encode(codec) # REF + when 11: codec.write_uint8(value ? 1 : 0) # BOOL + when 12: codec.write_float(value) # FLOAT + when 13: codec.write_double(value) # DOUBLE + when 14: codec.write_uuid(value) # UUID + when 15: codec.write_map(value) # FTABLE + when 16: codec.write_int8(value) # S8 + when 17: codec.write_int16(value) # S16 + when 18: codec.write_int32(value) # S32 + when 19: codec.write_int64(value) # S64 + else + raise ValueError, "Invalid type code: %d" % typecode + end + end + + def display_value(value, typecode) + case typecode + when 1: return value.to_s + when 2: return value.to_s + when 3: return value.to_s + when 4: return value.to_s + when 6: return value.to_s + when 7: return value.to_s + when 8: return strftime("%c", gmtime(value / 1000000000)) + when 9: return value.to_s + when 10: return value.to_s + when 11: return value ? 'T' : 'F' + when 12: return value.to_s + when 13: return value.to_s + when 14: return Qpid::UUID::format(hash) + when 15: return value.to_s + when 16: return value.to_s + when 17: return value.to_s + when 18: return value.to_s + when 19: return value.to_s + else + raise ValueError, "Invalid type code: %d" % typecode + end + end + + private + + def binding_keys + key_list = [] + key_list << "schema.#" + if @rcv_objects && @rcv_events && @rcv_heartbeats && + ! @user_bindings + key_list << "console.#" + else + if @rcv_objects && ! @user_bindings + key_list << "console.obj.#" + else + key_list << "console.obj.org.apache.qpid.broker.agent" + end + key_list << "console.event.#" if @rcv_events + key_list << "console.heartbeat" if @rcv_heartbeats + end + return key_list + end + + # Check the object against select to check for a match + def select_match(object) + select.each do |key, value| + object.properties.each do |prop, propval| + return false if key == prop.name && value != propval + end + end + return true + end + + end + + class Package + attr_reader :name + + def initialize(name) + @name = name + end + end + + class ClassKey + attr_reader :package, :klass_name, :hash + + def initialize(package, klass_name, hash) + @package = package + @klass_name = klass_name + @hash = hash + end + end + + class SchemaClass + + CLASS_KIND_TABLE = 1 + CLASS_KIND_EVENT = 2 + + attr_reader :klass_key, :properties, :statistics, :methods, :arguments + + def initialize(kind, key, codec) + @kind = kind + @klass_key = key + @properties = [] + @statistics = [] + @methods = [] + @arguments = [] + + if @kind == CLASS_KIND_TABLE + prop_count = codec.read_uint16 + stat_count = codec.read_uint16 + method_count = codec.read_uint16 + prop_count.times { |idx| + @properties << SchemaProperty.new(codec) } + stat_count.times { |idx| + @statistics << SchemaStatistic.new(codec) } + method_count.times { |idx| + @methods<< SchemaMethod.new(codec) } + elsif @kind == CLASS_KIND_EVENT + arg_count = codec.read_uint16 + arg_count.times { |idx| + sa = SchemaArgument.new(codec, false) + @arguments << sa + } + end + end + + def to_s + pname, cname, hash = @klass_key + if @kind == CLASS_KIND_TABLE + kind_str = "Table" + elsif @kind == CLASS_KIND_EVENT + kind_str = "Event" + else + kind_str = "Unsupported" + end + result = "%s Class: %s:%s " % [kind_str, pname, cname] + result += Qpid::UUID::format(hash) + return result + end + end + + class SchemaProperty + + attr_reader :name, :type, :access, :index, :optional, + :unit, :min, :max, :maxlan, :desc + + def initialize(codec) + map = codec.read_map + @name = map["name"] + @type = map["type"] + @access = map["access"] + @index = map["index"] != 0 + @optional = map["optional"] != 0 + @unit = map["unit"] + @min = map["min"] + @max = map["max"] + @maxlan = map["maxlen"] + @desc = map["desc"] + end + + def to_s + @name + end + end + + class SchemaStatistic + + attr_reader :name, :type, :unit, :desc + + def initialize(codec) + map = codec.read_map + @name = map["name"] + @type = map["type"] + @unit = map["unit"] + @desc = map["desc"] + end + + def to_s + @name + end + end + + class SchemaMethod + + attr_reader :name, :desc, :arguments + + def initialize(codec) + map = codec.read_map + @name = map["name"] + arg_count = map["argCount"] + @desc = map["desc"] + @arguments = [] + arg_count.times { |idx| + @arguments << SchemaArgument.new(codec, true) + } + end + + def to_s + result = @name + "(" + first = true + result += @arguments.select { |arg| arg.dir.index(?I) }.join(", ") + result += ")" + return result + end + end + + class SchemaArgument + + attr_reader :name, :type, :dir, :unit, :min, :max, :maxlen + attr_reader :desc, :default + + def initialize(codec, method_arg) + map = codec.read_map + @name = map["name"] + @type = map["type"] + @dir = map["dir"].upcase if method_arg + @unit = map["unit"] + @min = map["min"] + @max = map["max"] + @maxlen = map["maxlen"] + @desc = map["desc"] + @default = map["default"] + end + end + + # Object that represents QMF object identifiers + class ObjectId + + include Comparable + + attr_reader :first, :second + + def initialize(codec, first=0, second=0) + if codec + @first = codec.read_uint64 + @second = codec.read_uint64 + else + @first = first + @second = second + end + end + + def <=>(other) + return 1 unless other.is_a?(ObjectId) + return -1 if first < other.first + return 1 if first > other.first + return second <=> other.second + end + + def to_s + "%d-%d-%d-%d-%d" % [flags, sequence, broker_bank, agent_bank, object] + end + + def index + [first, second] + end + + def flags + (first & 0xF000000000000000) >> 60 + end + + def sequence + (first & 0x0FFF000000000000) >> 48 + end + + def broker_bank + (first & 0x0000FFFFF0000000) >> 28 + end + + def agent_bank + first & 0x000000000FFFFFFF + end + + def object + second + end + + def durable? + sequence == 0 + end + + def encode(codec) + codec.write_uint64(first) + codec.write_uint64(second) + end + end + + class Object + + attr_reader :object_id, :schema, :properties, :statistics, + :current_time, :create_time, :delete_time, :broker + + def initialize(session, broker, schema, codec, prop, stat) + @session = session + @broker = broker + @schema = schema + @current_time = codec.read_uint64 + @create_time = codec.read_uint64 + @delete_time = codec.read_uint64 + @object_id = ObjectId.new(codec) + @properties = [] + @statistics = [] + if prop + missing = parse_presence_masks(codec, schema) + schema.properties.each do |property| + v = nil + unless missing.include?(property.name) + v = @session.decode_value(codec, property.type) + end + @properties << [property, v] + end + end + + if stat + schema.statistics.each do |statistic| + s = @session.decode_value(codec, statistic.type) + @statistics << [statistic, s] + end + end + end + + def klass_key + @schema.klass_key + end + + + def methods + @schema.methods + end + + # Return the current, creation, and deletion times for this object + def timestamps + return [@current_time, @create_time, @delete_time] + end + + # Return a string describing this object's primary key + def index + @properties.select { |property, value| + property.index + }.collect { |property,value| + value.to_s }.join(":") + end + + # Replace properties and/or statistics with a newly received update + def merge_update(newer) + unless object_id == newer.object_id + raise "Objects with different object-ids" + end + @properties = newer.getProperties unless newer.properties.empty? + @statistics = newer.getStatistics unless newer.statistics.empty? + end + + def to_s + index + end + + # This must be defined because ruby has this (deprecated) method built in. + def id + method_missing(:id) + end + + # Same here.. + def type + method_missing(:type) + end + + def name + method_missing(:name) + end + + def method_missing(name, *args) + name = name.to_s + + if method = @schema.methods.find { |method| name == method.name } + return invoke(method, name, args) + end + + @properties.each do |property, value| + return value if name == property.name + if name == "_#{property.name}_" && property.type == 10 + # Dereference references + deref = @session.objects(:object_id => value, :broker => @broker) + return nil unless deref.size == 1 + return deref[0] + end + end + @statistics.each do |statistic, value| + if name == statistic.name + return value + end + end + raise "Type Object has no attribute '#{name}'" + end + + private + + def send_method_request(method, name, args, synchronous = false) + @schema.methods.each do |schema_method| + if name == schema_method.name + send_codec = Qpid::StringCodec.new(@broker.conn.spec) + seq = @session.seq_mgr.reserve([schema_method, synchronous]) + @broker.set_header(send_codec, ?M, seq) + @object_id.encode(send_codec) + pname, cname, hash = @schema.klass_key + send_codec.write_str8(pname) + send_codec.write_str8(cname) + send_codec.write_bin128(hash) + send_codec.write_str8(name) + + formals = method.arguments.select { |arg| arg.dir.index(?I) } + count = method.arguments.select { |arg| arg.dir.index(?I) }.size + unless formals.size == args.size + raise "Incorrect number of arguments: expected #{formals.size}, got #{args.size}" + end + + formals.zip(args).each do |formal, actual| + @session.encode_value(send_codec, actual, formal.type) + end + + smsg = @broker.message(send_codec.encoded, + "agent.#{object_id.broker_bank}.#{object_id.agent_bank}") + + @broker.sync_start if synchronous + @broker.emit(smsg) + + return seq + end + end + end + + def invoke(method, name, args) + if send_method_request(method, name, args, synchronous = true) + unless @broker.wait_for_sync_done + @session.seq_mgr.release(seq) + raise "Timed out waiting for method to respond" + end + + if @broker.error + error_text = @broker.error + @broker.error = nil + raise error_text + end + + return @broker.sync_result + end + raise "Invalid Method (software defect) [#{name}]" + end + + def parse_presence_masks(codec, schema) + exclude_list = [] + bit = 0 + schema.properties.each do |property| + if property.optional + if bit == 0 + mask = codec.read_uint8 + bit = 1 + end + if (mask & bit) == 0 + exclude_list << property.name + end + bit *= 2 + bit = 0 if bit == 256 + end + end + return exclude_list + end + end + + class MethodResult + + attr_reader :status, :text + + def initialize(status, text, out_args) + @status = status + @text = text + @out_args = out_args + end + + def method_missing(name) + name = name.to_s() + if @out_args.include?(name) + return @out_args[name] + else + raise "Unknown method result arg #{name}" + end + end + + def to_s + "#{text} (#{status}) - #{out_args.inspect}" + end + end + + class Broker + + SYNC_TIME = 60 + + include MonitorMixin + + attr_accessor :error + + attr_reader :amqp_session_id, :amqp_session, :conn + + attr_accessor :broker_id, :sync_result + + def initialize(session, host, port, auth_mech, auth_user, auth_pass) + super() + + # For debugging.. + Thread.abort_on_exception = true + + @session = session + @host = host + @port = port + @auth_user = auth_user + @auth_pass = auth_pass + @agents = {} + @agents["1.0"] = Agent.new(self, "1.0", "BrokerAgent") + @topic_bound = false + @cv = new_cond + @sync_in_flight = false + @sync_request = 0 + @sync_result = nil + @reqs_outstanding = 1 + @error = nil + @broker_id = nil + @is_connected = false + @conn = nil + try_to_connect + end + + def connected? + @is_connected + end + + def agent(broker_bank, agent_bank) + bank_key = "%d.%d" % [broker_bank, agent_bank] + return @agents[bank_key] + end + + # Get the list of agents reachable via this broker + def agents + @agents.values + end + + def url + "#{@host}:#{@port}" + end + + def to_s + if connected? + "Broker connected at: #{url}" + else + "Disconnected Broker" + end + end + + def wait_for_sync_done + synchronize do + return @cv.wait_for(SYNC_TIME) { ! @sync_in_flight || @error } + end + end + + def wait_for_stable + synchronize do + return if @reqs_outstanding == 0 + @sync_in_flight = true + unless @cv.wait_for(SYNC_TIME) { @reqs_outstanding == 0 } + raise "Timed out waiting for broker to synchronize" + end + end + end + + # Compose the header of a management message + def set_header(codec, opcode, seq=0) + codec.write_uint8(?A) + codec.write_uint8(?M) + codec.write_uint8(?2) + codec.write_uint8(opcode) + codec.write_uint32(seq) + end + + def message(body, routing_key="broker") + dp = @amqp_session.delivery_properties + dp.routing_key = routing_key + mp = @amqp_session.message_properties + mp.content_type = "x-application/qmf" + mp.reply_to = amqp_session.reply_to("amq.direct", @reply_name) + return Qpid::Message.new(dp, mp, body) + end + + def emit(msg, dest="qpid.management") + @amqp_session.message_transfer(:destination => dest, + :message => msg) + end + + def inc_outstanding + synchronize { @reqs_outstanding += 1 } + end + + def dec_outstanding + synchronize do + @reqs_outstanding -= 1 + if @reqs_outstanding == 0 && ! @topic_bound + @topic_bound = true + @session.binding_key_list.each do |key| + args = { + :exchange => "qpid.management", + :queue => @topic_name, + :binding_key => key } + @amqp_session.exchange_bind(args) + end + end + if @reqs_outstanding == 0 && @sync_in_flight + sync_done + end + end + end + + def sync_start + synchronize { @sync_in_flight = true } + end + + def sync_done + synchronize do + @sync_in_flight = false + @cv.signal + end + end + + def update_agent(obj) + bank_key = "%d.%d" % [obj.brokerBank, obj.agentBank] + if obj.delete_time == 0 + unless @agents.include?(bank_key) + agent = Agent.new(self, bank_key, obj.label) + @agents[bank_key] = agent + @session.console.new_agent(agent) if @session.console + end + else + agent = @agents.delete(bank_key) + @session.console.del_agent(agent) if agent && @session.console + end + end + + def shutdown + if connected? + @amqp_session.incoming("rdest").stop + if @session.console + @amqp_session.incoming("tdest").stop + end + @amqp_session.close + @is_connected = false + else + raise "Broker already disconnected" + end + end + + private + + def try_to_connect + #begin + @amqp_session_id = "%s.%d" % [Socket.gethostname, Process::pid] + # FIXME: Need sth for Qpid::Util::connect + + @conn = Qpid::Connection.new(TCPSocket.new(@host, @port), + :username => @auth_user, + :password => @auth_pass) + @conn.start + @reply_name = "reply-%s" % amqp_session_id + @amqp_session = @conn.session(@amqp_session_id) + @amqp_session.auto_sync = true + + @amqp_session.queue_declare(:queue => @reply_name, + :exclusive => true, + :auto_delete => true) + + @amqp_session.exchange_bind(:exchange => "amq.direct", + :queue => @reply_name, + :binding_key => @reply_name) + @amqp_session.message_subscribe(:queue => @reply_name, + :destination => "rdest", + :accept_mode => @amqp_session.message_accept_mode.none, + :acquire_mode => @amqp_session.message_acquire_mode.pre_acquired) + q = @amqp_session.incoming("rdest") + q.exc_listen(& method(:exception_cb)) + q.listen(& method(:reply_cb)) + @amqp_session.message_set_flow_mode(:destination => "rdest", + :flow_mode => 1) + @amqp_session.message_flow(:destination => "rdest", + :unit => 0, + :value => 0xFFFFFFFF) + @amqp_session.message_flow(:destination => "rdest", + :unit => 1, + :value => 0xFFFFFFFF) + + @topic_name = "topic-#{@amqp_session_id}" + @amqp_session.queue_declare(:queue => @topic_name, + :exclusive => true, + :auto_delete => true) + @amqp_session.message_subscribe(:queue => @topic_name, + :destination => "tdest", + :accept_mode => @amqp_session.message_accept_mode.none, + :acquire_mode => @amqp_session.message_acquire_mode.pre_acquired) + @amqp_session.incoming("tdest").listen(& method(:reply_cb)) + @amqp_session.message_set_flow_mode(:destination => "tdest", + :flow_mode => 1) + @amqp_session.message_flow(:destination => "tdest", + :unit => 0, + :value => 0xFFFFFFFF) + @amqp_session.message_flow(:destination => "tdest", + :unit => 1, + :value => 0xFFFFFFFF) + + @is_connected = true + @session.handle_broker_connect(self) + + codec = Qpid::StringCodec.new(@conn.spec) + set_header(codec, ?B) + msg = message(codec.encoded) + emit(msg) + # FIXME: These exceptions are bogus here + #rescue socket.error => e + # @error = "Socket Error %s - %s" % [e[0], e[1]] + #rescue Closed => e + # @error = "Connect Failed %d - %s" % [e[0], e[1]] + #rescue ConnectionFailed => e + # @error = "Connect Failed %d - %s" % [e[0], e[1]] + #end + end + + # Check the header of a management message and extract the opcode and + # class + def check_header(codec) + begin + return [nil, nil] unless codec.read_uint8 == ?A + return [nil, nil] unless codec.read_uint8 == ?M + return [nil, nil] unless codec.read_uint8 == ?2 + opcode = codec.read_uint8 + seq = codec.read_uint32 + return [opcode, seq] + rescue + return [nil, nil] + end + end + + def reply_cb(msg) + codec = Qpid::StringCodec.new(@conn.spec, msg.body) + loop do + opcode, seq = check_header(codec) + return unless opcode + case opcode + when ?b: @session.handle_broker_resp(self, codec, seq) + when ?p: @session.handle_package_ind(self, codec, seq) + when ?z: @session.handle_command_complete(self, codec, seq) + when ?q: @session.handle_class_ind(self, codec, seq) + when ?m: @session.handle_method_resp(self, codec, seq) + when ?h: @session.handle_heartbeat_ind(self, codec, seq, msg) + when ?e: @session.handle_event_ind(self, codec, seq) + when ?s: @session.handle_schema_resp(self, codec, seq) + when ?c: @session.handle_content_ind(self, codec, seq, true, false) + when ?i: @session.handle_content_ind(self, codec, seq, false, true) + when ?g: @session.handle_content_ind(self, codec, seq, true, true) + else + raise "Unexpected opcode #{opcode.inspect}" + end + end + end + + def exception_cb(data) + @is_connected = false + @error = data + synchronize { @cv.signal if @sync_in_flight } + @session.handle_error(@error) + @session.handle_broker_disconnect(self) + end + end + + class Agent + attr_reader :broker, :bank + + def initialize(broker, bank, label) + @broker = broker + @bank = bank + @label = label + end + + def to_s + "Agent at bank %s (%s)" % [@bank, @label] + end + end + + class Event + + attr_reader :klass_key, :arguments, :timestamp, :name, :schema + + def initialize(session, broker, codec) + @session = session + @broker = broker + pname = codec.read_str8 + cname = codec.read_str8 + hash = codec.read_bin128 + @klass_key = [pname, cname, hash] + @timestamp = codec.read_int64 + @severity = codec.read_uint8 + @schema = nil + session.packages.keys.each do |pname| + k = [cname, hash] + if session.packages[pname].include?(k) + @schema = session.packages[pname][k] + @arguments = {} + @schema.arguments.each do |arg| + v = session.decode_value(codec, arg.type) + @arguments[arg.name] = v + end + end + end + end + + def to_s + return "<uninterpretable>" unless @schema + t = Time.at(self.timestamp / 1000000000) + out = t.strftime("%c") + out += " " + sev_name + " " + @klass_key[0] + ":" + klass_key[1] + out += " broker=" + @broker.url + @schema.arguments.each do |arg| + out += " " + arg.name + "=" + @session.display_value(@arguments[arg.name], arg.type) + end + return out + end + + def sev_name + case @severity + when 0 : return "EMER " + when 1 : return "ALERT" + when 2 : return "CRIT " + when 3 : return "ERROR" + when 4 : return "WARN " + when 5 : return "NOTIC" + when 6 : return "INFO " + when 7 : return "DEBUG" + else + return "INV-%d" % @severity + end + end + + end + + # Manage sequence numbers for asynchronous method calls + class SequenceManager + include MonitorMixin + + def initialize + super() + @sequence = 0 + @pending = {} + end + + # Reserve a unique sequence number + def reserve (data) + synchronize do + result = @sequence + @sequence += 1 + @pending[result] = data + return result + end + end + + # Release a reserved sequence number + def release (seq) + synchronize { @pending.delete(seq) } + end + end + + class DebugConsole < Console + + def broker_connected(broker) + puts "brokerConnected #{broker}" + end + + def broker_disconnected(broker) + puts "brokerDisconnected #{broker}" + end + + def new_package(name) + puts "newPackage #{name}" + end + + def new_class(kind, klass_key) + puts "newClass #{kind} #{klass_key}" + end + + def new_agent(agent) + puts "new_agent #{agent}" + end + + def del_agent(agent) + puts "delAgent #{agent}" + end + + def object_props(broker, record) + puts "objectProps #{record.klass_key}" + end + + def object_stats(broker, record) + puts "objectStats #{record.klass_key}" + end + + def event(broker, event) + puts "event #{event}" + end + + def heartbeat(agent, timestamp) + puts "heartbeat #{agent}" + end + + def broker_info(broker) + puts "brokerInfo #{broker}" + end + end +end diff --git a/qpid/ruby/lib/qpid/queue.rb b/qpid/ruby/lib/qpid/queue.rb new file mode 100644 index 0000000000..4150173b53 --- /dev/null +++ b/qpid/ruby/lib/qpid/queue.rb @@ -0,0 +1,101 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Augment the standard python multithreaded Queue implementation to add a +# close() method so that threads blocking on the content of a queue can be +# notified if the queue is no longer in use. + +require 'thread' + +# Python nominally uses a bounded queue, but the code never establishes +# a maximum size; we therefore use Ruby's unbounded queue +class Qpid::Queue < ::Queue + + DONE = Object.new + STOP = Object.new + + def initialize + super + @error = nil + @listener = nil + @exc_listener = nil + @exc_listener_lock = Monitor.new + @thread = nil + end + + def close(error = nil) + @error = error + put(DONE) + unless @thread.nil? + @thread.join() + @thread = nil + end + end + + def get(block = true, timeout = nil) + unless timeout.nil? + raise NotImplementedError + end + result = pop(! block) + if result == DONE + # this guarantees that any other waiting threads or any future + # calls to get will also result in a Qpid::Closed exception + put(DONE) + raise Qpid::Closed.new(@error) + else + return result + end + end + + alias :put :push + + def exc_listen(&block) + @exc_listener_lock.synchronize do + @exc_listener = block + end + end + + def listen(&block) + if ! block_given? && @thread + put(STOP) + @thread.join() + @thread = nil + end + + # FIXME: There is a potential race since we could be changing one + # non-nil listener to another + @listener = block + + if block_given? && @thread.nil? + @thread = Thread.new do + loop do + begin + o = get() + break if o == STOP + @listener.call(o) + rescue Qpid::Closed => e + @exc_listener.call(e) if @exc_listener + break + end + end + end + end + end + +end diff --git a/qpid/ruby/lib/qpid/session.rb b/qpid/ruby/lib/qpid/session.rb new file mode 100644 index 0000000000..43a664d285 --- /dev/null +++ b/qpid/ruby/lib/qpid/session.rb @@ -0,0 +1,458 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +require 'monitor' + +module Qpid + + class Session < Invoker + + def log; Qpid::logger["qpid.io.cmd"]; end + def msg; Qpid::logger["qpid.io.msg"]; end + + + class Exception < RuntimeError; end + class Closed < Qpid::Session::Exception; end + class Detached < Qpid::Session::Exception; end + + + INCOMPLETE = Object.new + + def self.client(*args) + return Qpid::Client(*args) + end + + def self.server(*args) + return Server(*args) + end + + attr_reader :name, :spec, :auto_sync, :timeout, :channel + attr_reader :results, :exceptions + attr_accessor :channel, :auto_sync, :send_id, :receiver, :sender + + # FIXME: Pass delegate through a block ? + def initialize(name, spec, kwargs = {}) + auto_sync = true + auto_sync = kwargs[:auto_sync] if kwargs.key?(:auto_sync) + timeout = kwargs[:timeout] || 10 + delegate = kwargs[:delegate] + + @name = name + @spec = spec + @auto_sync = auto_sync + @timeout = timeout + @invoke_lock = Monitor.new + @closing = false + @closed = false + + @cond_lock = Monitor.new + @condition = @cond_lock.new_cond + + @send_id = true + @receiver = Receiver.new(self) + @sender = Sender.new(self) + + @lock = Monitor.new + @incoming = {} + @results = {} + @exceptions = [] + + @assembly = nil + + @delegate = delegate.call(self) if delegate + + @ctl_seg = spec[:segment_type].enum[:control].value + @cmd_seg = spec[:segment_type].enum[:command].value + @hdr_seg = spec[:segment_type].enum[:header].value + @body_seg = spec[:segment_type].enum[:body].value + end + + def incoming(destination) + @lock.synchronize do + queue = @incoming[destination] + unless queue + queue = Incoming.new(self, destination) + @incoming[destination] = queue + end + return queue + end + end + + def error? + @exceptions.size > 0 + end + + def sync(timeout=nil) + if channel && Thread.current == channel.connection.thread + raise Qpid::Session::Exception, "deadlock detected" + end + unless @auto_sync + execution_sync(:sync => true) + end + last = @sender.next_id - 1 + @cond_lock.synchronize do + unless @condition.wait_for(timeout) { + @sender.completed.include?(last) || error? + } + raise Qpid::Timeout + end + end + if error? + raise Qpid::Session::Exception, exceptions + end + end + + def close(timeout=nil) + @invoke_lock.synchronize do + @closing = true + channel.session_detach(name) + end + @cond_lock.synchronize do + unless @condition.wait_for(timeout) { @closed } + raise Qpid::Timeout + end + end + end + + def closed + @lock.synchronize do + return if @closed + + @results.each { |id, f| f.error(exceptions) } + @results.clear + + @incoming.values.each { |q| q.close(exceptions) } + @closed = true + @cond_lock.synchronize { @condition.signal } + end + end + + def resolve_method(name) + o = @spec.children[name] + case o + when Qpid::Spec010::Command + return invocation(:method, o) + when Qpid::Spec010::Struct + return invocation(:method, o) + when Qpid::Spec010::Domain + return invocation(:value, o.enum) unless o.enum.nil? + end + + matches = @spec.children.select { |x| + x.name.to_s.include?(name.to_s) + }.collect { |x| x.name.to_s }.sort + if matches.size == 0 + msg = nil + elsif matches.size == 1 + msg = "Did you mean #{matches[0]} ? " + else + msg = "Did you mean one of #{matches.join(",")} ? " + end + return invocation(:error, msg) + end + + def invoke(type, args) + # XXX + unless type.respond_to?(:track) + return type.create(*args) + end + @invoke_lock.synchronize do + return do_invoke(type, args) + end + end + + def do_invoke(type, args) + raise Qpid::Session::Closed if @closing + raise Qpid::Session::Detached unless channel + + # Clumsy simulation of Python's keyword args + kwargs = {} + if args.size > 0 && args[-1].is_a?(Hash) + if args.size > type.fields.size + kwargs = args.pop + elsif type.fields[args.size - 1].type != @spec[:map] + kwargs = args.pop + end + end + + if type.payload + if args.size == type.fields.size + 1 + message = args.pop + else + message = kwargs.delete(:message) # XXX Really ? + end + else + message = nil + end + + hdr = Qpid::struct(@spec[:header]) + hdr.sync = @auto_sync || kwargs.delete(:sync) + + cmd = type.create(*args.push(kwargs)) + sc = Qpid::StringCodec.new(@spec) + sc.write_command(hdr, cmd) + + seg = Segment.new(true, (message.nil? || + (message.headers.nil? && message.body.nil?)), + type.segment_type, type.track, @channel.id, sc.encoded) + + unless type.result.nil? + result = Future.new(exception=Exception) + @results[@sender.next_id] = result + end + emit(seg) + + log.debug("SENT %s %s %s" % [seg.id, hdr, cmd]) if log + + unless message.nil? + unless message.headers.nil? + sc = Qpid::StringCodec.new(@spec) + message.headers.each { |st| sc.write_struct32(st) } + + seg = Segment.new(false, message.body.nil?, @hdr_seg, + type.track, @channel.id, sc.encoded) + emit(seg) + end + unless message.body.nil? + seg = Segment.new(false, true, @body_seg, type.track, + @channel.id, message.body) + emit(seg) + end + msg.debug("SENT %s" % message) if msg + end + + if !type.result.nil? + return @auto_sync ? result.get(@timeout) : result + elsif @auto_sync + sync(@timeout) + end + end + + def received(seg) + @receiver.received(seg) + if seg.first_segment? + raise Qpid::Session::Exception unless @assembly.nil? + @assembly = [] + end + @assembly << seg + if seg.last_segment? + dispatch(@assembly) + @assembly = nil + end + end + + def dispatch(assembly) + hdr = nil + cmd = nil + header = nil + body = nil + assembly.each do |seg| + d = seg.decode(@spec) + case seg.type + when @cmd_seg + hdr, cmd = d + when @hdr_seg + header = d + when @body_seg + body = d + else + raise Qpid::Session::Exception + end + end + log.debug("RECV %s %s %s" % [cmd.id, hdr, cmd]) if log + + if cmd.type.payload + result = @delegate.send(cmd.type.name, cmd, header, body) + else + result = @delegate.send(cmd.type.name, cmd) + end + + unless cmd.type.result.nil? + execution_result(cmd.id, result) + end + + if result != INCOMPLETE + assembly.each do |seg| + @receiver.has_completed(seg) + # XXX: don't forget to obey sync for manual completion as well + if hdr.sync + @channel.session_completed(@receiver.completed) + end + end + end + end + + # Python calls this 'send', but that has a special meaning + # in Ruby, so we call it 'emit' + def emit(seg) + @sender.emit(seg) + end + + def signal + @cond_lock.synchronize { @condition.signal } + end + + def wait_for(timeout = nil, &block) + @cond_lock.synchronize { @condition.wait_for(timeout, &block) } + end + + def to_s + "<Session: #{name}, #{channel}>" + end + + class Receiver + + attr_reader :completed + attr_accessor :next_id, :next_offset + + def initialize(session) + @session = session + @next_id = nil + @next_offset = nil + @completed = Qpid::RangedSet.new() + end + + def received(seg) + if @next_id.nil? || @next_offset.nil? + raise Exception, "todo" + end + seg.id = @next_id + seg.offset = @next_offset + if seg.last_segment? + @next_id += 1 + @next_offset = 0 + else + @next_offset += seg.payload.size + end + end + + def has_completed(seg) + if seg.id.nil? + raise ArgumentError, "cannot complete unidentified segment" + end + if seg.last_segment? + @completed.add(seg.id) + end + end + + def known_completed(commands) + completed = Qpid::RangedSet.new() + @completed.ranges.each do |c| + unless commands.ranges.find { |kc| + kc.contains(c.lower) && kc.contains(c.upper) + } + completed.add_range(c) + end + end + @completed = completed + end + end + + class Sender + + def initialize(session) + @session = session + @next_id = 0.to_serial + @next_offset = 0 + @segments = [] + @completed = RangedSet.new() + end + + attr_reader :next_id, :completed + + def emit(seg) + seg.id = @next_id + seg.offset = @next_offset + if seg.last_segment? + @next_id += 1 + @next_offset = 0 + else + @next_offset += seg.payload.size + end + @segments << seg + if @session.send_id + @session.send_id = false + @session.channel.session_command_point(seg.id, seg.offset) + end + @session.channel.connection.write_segment(seg) + end + + def has_completed(commands) + @segments = @segments.reject { |seg| commands.include?(seg.id) } + commands.ranges.each do |range| + @completed.add(range.lower, range.upper) + end + end + end + + class Incoming < Qpid::Queue + + def initialize(session, destination) + super() + @session = session + @destination = destination + end + + def start + @session.message_credit_unit.choices.each do |unit| + @session.message_flow(@destination, unit.value, 0xFFFFFFFF) + end + end + + def stop + @session.message_cancel(@destination) + listen # Kill the listener + end + end + + class Delegate + + def initialize(session) + @session = session + end + + #XXX: do something with incoming accepts + def message_accept(ma) nil; end + + def execution_result(er) + future = @session.results.delete(er.command_id) + future.set(er.value) + end + + def execution_exception(ex) + @session.exceptions.append(ex) + end + end + + class Client < Delegate + + def log ; Qpid::logger["qpid.io.msg"]; end + + def message_transfer(cmd, headers, body) + m = Qpid::Message.new(body) + m.headers = headers + m.id = cmd.id + messages = @session.incoming(cmd.destination) + messages.put(m) + log.debug("RECV %s" % m) if log + return INCOMPLETE + end + end + end +end diff --git a/qpid/ruby/lib/qpid/spec.rb b/qpid/ruby/lib/qpid/spec.rb new file mode 100644 index 0000000000..b3d70d019d --- /dev/null +++ b/qpid/ruby/lib/qpid/spec.rb @@ -0,0 +1,183 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +require "set" +require "rexml/document" +require "qpid/fields" +require "qpid/traverse" + +module Qpid + module Spec + + include REXML + + class Container < Array + + def initialize() + @cache = {} + end + + def [](key) + return @cache[key] if @cache.include?(key) + value = do_lookup(key) + @cache[key] = value + return value + end + + def do_lookup(key) + case key + when String + return find {|x| x.name == key.intern()} + when Symbol + return find {|x| x.name == key} + else + return slice(key) + end + end + + def +(other) + copy = clone() + copy.concat(other) + return copy + end + + end + + class Reference + + fields(:name) + + def init(&block) + @resolver = block + end + + def resolve(*args) + @resolver.call(*args) + end + + end + + class Loader + + def initialize() + @stack = [] + end + + def container() + return Container.new() + end + + def load(obj) + case obj + when String + elem = @stack[-1] + result = container() + elem.elements.each(obj) {|e| + @index = result.size + result << load(e) + } + @index = nil + return result + else + elem = obj + @stack << elem + begin + result = send(:"load_#{elem.name}") + ensure + @stack.pop() + end + return result + end + end + + def element + @stack[-1] + end + + def text + element.text + end + + def attr(name, type = :string, default = nil, path = nil) + if path.nil? + elem = element + else + elem = nil + element.elements.each(path) {|elem|} + if elem.nil? + return default + end + end + + value = elem.attributes[name] + value = value.strip() unless value.nil? + if value.nil? + default + else + send(:"parse_#{type}", value) + end + end + + def parse_int(value) + if value.nil? + return nil + else + value.to_i(0) + end + end + + TRUE = ["yes", "true", "1"].to_set + FALSE = ["no", "false", "0", nil].to_set + + def parse_bool(value) + if TRUE.include?(value) + true + elsif FALSE.include?(value) + false + else + raise Exception.new("parse error, expecting boolean: #{value}") + end + end + + def parse_string(value) + value.to_s + end + + def parse_symbol(value) + value.intern() unless value.nil? + end + + REPLACE = {" " => "_", "-" => "_"} + KEYWORDS = {"global" => "global_", "return" => "return_"} + + def parse_name(value) + return if value.nil? + + REPLACE.each do |k, v| + value = value.gsub(k, v) + end + + value = KEYWORDS[value] if KEYWORDS.has_key? value + return value.intern() + end + + end + + end +end diff --git a/qpid/ruby/lib/qpid/spec010.rb b/qpid/ruby/lib/qpid/spec010.rb new file mode 100644 index 0000000000..517c5a168a --- /dev/null +++ b/qpid/ruby/lib/qpid/spec010.rb @@ -0,0 +1,497 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +require "qpid/spec" +require 'pathname' +require 'fileutils' + +module Qpid::Spec010 + + include Qpid::Spec + + AMQP_SPEC_DEFAULT_DIR = "/usr/share/amqp" + + # XXX: workaround for ruby bug/missfeature + Reference = Reference + Loader = Loader + + class Spec + + ENCODINGS = { + String => "vbin16", + Fixnum => "int64", + Bignum => "int64", + Float => "float", + NilClass => "void", + Array => "list", + Hash => "map" + } + + fields(:major, :minor, :port, :children) + + def init() + @controls = {} + @commands = {} + @structs = {} + @types = {} + children.each {|c| + case c + when Control + @controls[c.code] = c + when Command + @commands[c.code] = c + when Struct + @structs[c.code] = c + when Type + @types[c.code] = c unless c.code.nil? + end + } + end + + attr_reader :controls, :commands, :structs, :types + + def [](key) + return @children[key] + end + + def encoding(klass) + if ENCODINGS.has_key?(klass) + return self[ENCODINGS[klass]] + end + for base in klass.__bases__ + result = encoding(base) + return result unless result.nil? + end + end + + def inspect; "spec"; end + end + + class Constant + + fields(:name, :value) + + attr :parent, true + + end + + class Type + + fields(:name, :code, :fixed, :variable) + + attr :parent, true + + def present?(value) + if @fixed == 0 + return value + else + return !value.nil? + end + end + + def encode(codec, value) + codec.send("write_#{name}", value) + end + + def decode(codec) + return codec.send("read_#{name}") + end + + def inspect; name; end + + end + + class Domain < Type + + fields(:name, :type, :enum) + + attr :parent, true + + def encode(codec, value) + @type.encode(codec, value) + end + + def decode(codec) + return @type.decode(codec) + end + + end + + class Enum + fields(:choices) + + def [](choice) + case choice + when String + choice = choice.to_sym + return choices.find { |c| c.name == choice } + when Symbol + return choices.find { |c| c.name == choice } + else + return choices.find { |c| c.value == choice } + end + end + + def method_missing(name, *args) + raise ArgumentError.new("wrong number of arguments") unless args.empty? + return self[name].value + end + + end + + class Choice + fields(:name, :value) + end + + class Composite + + fields(:name, :code, :size, :pack, :fields) + + attr :parent, true + + # Python calls this 'new', but that has special meaning in Ruby + def create(*args) + return Qpid::struct(self, *args) + end + + def decode(codec) + codec.read_size(@size) + codec.read_uint16() unless @code.nil? + return Qpid::struct(self, self.decode_fields(codec)) + end + + def decode_fields(codec) + flags = 0 + pack.times {|i| flags |= (codec.read_uint8() << 8*i)} + + result = {} + + fields.each_index {|i| + f = @fields[i] + if flags & (0x1 << i) != 0 + result[f.name] = f.type.decode(codec) + else + result[f.name] = nil + end + } + + return result + end + + def encode(codec, value) + sc = Qpid::StringCodec.new(@spec) + sc.write_uint16(@code) unless @code.nil? + encode_fields(sc, value) + codec.write_size(@size, sc.encoded.size) + codec.write(sc.encoded) + end + + def encode_fields(codec, values) + # FIXME: This could be written cleaner using select + # instead of flags + flags = 0 + fields.each_index do |i| + f = fields[i] + flags |= (0x1 << i) if f.type.present?(values[f.name]) + end + + pack.times { |i| codec.write_uint8((flags >> 8*i) & 0xFF) } + + fields.each_index do |i| + f = fields[i] + f.type.encode(codec, values[f.name]) if flags & (0x1 << i) != 0 + end + end + + def inspect; name; end + + end + + class Field + + fields(:name, :type, :exceptions) + + def default() + return nil + end + + end + + class Struct < Composite + + def present?(value) + return !value.nil? + end + + end + + class Action < Composite; end + + class Control < Action + + def segment_type + @parent[:segment_type].enum[:control].value + end + + def track + @parent[:track].enum[:control].value + end + + end + + class Command < Action + + attr_accessor :payload, :result + + def segment_type + @parent["segment_type"].enum["command"].value + end + + def track + @parent["track"].enum["command"].value + end + + end + + class Doc + fields(:type, :title, :text) + end + + class Loader010 < Loader + + def initialize() + super() + end + + def klass + cls = element + until cls.nil? + break if cls.name == "class" + cls = cls.parent + end + return cls + end + + def scope + if element.name == "struct" + return nil + else + return class_name + end + end + + def class_name + cls = klass + if cls.nil? + return nil + else + return parse_name(cls.attributes["name"].strip) + end + end + + def class_code + cls = klass + if cls.nil? + return 0 + else + return parse_int(cls.attributes["code"].strip) + end + end + + def parse_decl(value) + name = parse_name(value) + + s = scope + if s.nil? + return name + else + return :"#{s}_#{name}" + end + end + + def parse_code(value) + c = parse_int(value) + if c.nil? + return nil + else + return c | (class_code << 8) + end + end + + def parse_type(value) + name = parse_name(value.sub(".", "_")) + cls = class_name + return Reference.new {|spec| + candidates = [name] + candidates << :"#{cls}_#{name}" unless cls.nil? + for c in candidates + child = spec[c] + break unless child.nil? + end + if child.nil? + raise Exception.new("unresolved type: #{name}") + else + child + end +} + end + + def load_amqp() + children = nil + + for s in ["constant", "type", "domain", "struct", "control", + "command"] + ch = load(s) + if children.nil? + children = ch + else + children += ch + end + children += load("class/#{s}") + end + children += load("class/command/result/struct") + Spec.new(attr("major", :int), attr("minor", :int), attr("port", :int), + children) + end + + def load_constant() + Constant.new(attr("name", :decl), attr("value", :int)) + end + + def load_type() + Type.new(attr("name", :decl), attr("code", :code), + attr("fixed-width", :int), attr("variable-width", :int)) + end + + def load_domain() + Domain.new(attr("name", :decl), attr("type", :type), load("enum").first) + end + + def load_enum() + Enum.new(load("choice")) + end + + def load_choice() + Choice.new(attr("name", :name), attr("value", :int)) + end + + def load_field() + Field.new(attr("name", :name), attr("type", :type)) + end + + def load_struct() + Struct.new(attr("name", :decl), attr("code", :code), attr("size", :int), + attr("pack", :int), load("field")) + end + + def load_action(cls) + cls.new(attr("name", :decl), attr("code", :code), 0, 2, load("field")) + end + + def load_control() + load_action(Control) + end + + def load_command() + result = attr("type", :type, nil, "result") + result = attr("name", :type, nil, "result/struct") if result.nil? + segs = load("segments") + cmd = load_action(Command) + cmd.result = result + cmd.payload = !segs.empty? + return cmd + end + + def load_result() + true + end + + def load_segments() + true + end + + end + + def self.spec_cache(specfile) + File::join(File::dirname(__FILE__), "spec_cache", + File::basename(specfile, ".xml") + ".rb_marshal") + end + + # XXX: could be shared + def self.load(spec = nil) + return spec if spec.is_a?(Qpid::Spec010::Spec) + if spec.nil? + # FIXME: Need to add a packaging setup in here so we know where + # the installed spec is going to be. + specfile = nil + if ENV['AMQP_SPEC'] + specfile = ENV['AMQP_SPEC'] + else + topdir = File::dirname(File::dirname(File::expand_path(__FILE__))) + specfile = File::join(topdir, "../../specs", "amqp.0-10-qpid-errata.xml") + end + else + specfile = spec + end + + unless Pathname.new(specfile).absolute? + path = ENV["AMQP_SPEC_PATH"] || AMQP_SPEC_DEFAULT_DIR + + p = path.split(File::PATH_SEPARATOR).collect { |p| + Pathname.new(p).join(specfile) + }.find { |p| p.file? } + raise "Can not find file for spec #{spec}" unless p + specfile = p.to_s + end + + specfile_cache = spec_cache(specfile) + # FIXME: Check that cache is newer than specfile + if File::exist?(specfile_cache) + begin + spec = File::open(specfile_cache, "r") do |f| + Marshal::load(f) + end + return spec + rescue + # Ignore, will load from XML + end + end + + doc = File::open(specfile, "r") { |f| Document.new(f) } + spec = Loader010.new().load(doc.root) + spec.traverse! do |o| + if o.is_a?(Reference) + o.resolve(spec) + else + o + end + end + + spec.children.each { |c| c.parent = spec } + + begin + FileUtils::mkdir_p(File::dirname(specfile_cache)) + File::open(specfile_cache, "w") { |f| Marshal::dump(spec, f) } + rescue + # Ignore, we are fine without the cached spec + end + return spec + end + +end diff --git a/qpid/ruby/lib/qpid/spec08.rb b/qpid/ruby/lib/qpid/spec08.rb new file mode 100644 index 0000000000..902c05c297 --- /dev/null +++ b/qpid/ruby/lib/qpid/spec08.rb @@ -0,0 +1,190 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +require "qpid/spec" + +module Qpid08 + + module Spec + + include Qpid::Spec + + # XXX: workaround for ruby bug/missfeature + Reference = Reference + + class Root + fields(:major, :minor, :classes, :constants, :domains) + + def find_method(name) + classes.each do |c| + c.methods.each do |m| + if name == m.qname + return m + end + end + end + + return nil + end + end + + class Constant + fields(:name, :id, :type, :docs) + end + + class Domain + fields(:name, :type) + end + + class Class + fields(:name, :id, :handler, :fields, :methods, :docs) + end + + class Method + fields(:name, :id, :content?, :responses, :synchronous?, :fields, + :docs) + + def init() + @response = false + end + + attr :parent, true + + def response?; @response end + def response=(b); @response = b end + + def qname + :"#{parent.name}_#{name}" + end + end + + class Field + fields(:name, :id, :type, :docs) + + def default + case type + when :bit then false + when :octet, :short, :long, :longlong then 0 + when :shortstr, :longstr then "" + when :table then {} + end + end + end + + class Doc + fields(:type, :text) + end + + class Container08 < Container + def do_lookup(key) + case key + when Integer + return find {|x| x.id == key} + else + return super(key) + end + end + end + + class Loader08 < Loader + + def container() + return Container08.new() + end + + def load_amqp() + Root.new(attr("major", :int), attr("minor", :int), load("class"), + load("constant"), load("domain")) + end + + def load_class() + Class.new(attr("name", :name), attr("index", :int), attr("handler", :name), + load("field"), load("method"), load("doc")) + end + + def load_method() + Method.new(attr("name", :name), attr("index", :int), + attr("content", :bool), load("response"), + attr("synchronous", :bool), load("field"), load("docs")) + end + + def load_response() + name = attr("name", :name) + Reference.new {|spec, klass| + response = klass.methods[name] + if response.nil? + raise Exception.new("no such method: #{name}") + end + response + } + end + + def load_field() + type = attr("type", :name) + if type.nil? + domain = attr("domain", :name) + type = Reference.new {|spec, klass| + spec.domains[domain].type + } + end + Field.new(attr("name", :name), @index, type, load("docs")) + end + + def load_constant() + Constant.new(attr("name", :name), attr("value", :int), attr("class", :name), + load("doc")) + end + + def load_domain() + Domain.new(attr("name", :name), attr("type", :name)) + end + + def load_doc() + Doc.new(attr("type", :symbol), text) + end + + end + + def self.load(spec) + case spec + when String + spec = File.new(spec) + end + doc = Document.new(spec) + spec = Loader08.new().load(doc.root) + spec.classes.each do |klass| + klass.traverse! do |o| + case o + when Reference + o.resolve(spec, klass) + else + o + end + end + klass.methods.each do |m| + m.parent = klass + m.responses.each do |r| + r.response = true + end + end + end + return spec + end + end +end diff --git a/qpid/ruby/qpid/test.rb b/qpid/ruby/lib/qpid/test.rb index f8107143ab..2e643f4348 100644 --- a/qpid/ruby/qpid/test.rb +++ b/qpid/ruby/lib/qpid/test.rb @@ -1,5 +1,3 @@ - - # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information @@ -17,12 +15,11 @@ # specific language governing permissions and limitations # under the License. # -#/ -require "qpid/spec" +require "qpid/spec08" require "qpid/client" -module Qpid +module Qpid08 module Test diff --git a/qpid/ruby/qpid/traverse.rb b/qpid/ruby/lib/qpid/traverse.rb index 67358a7eb1..67358a7eb1 100644 --- a/qpid/ruby/qpid/traverse.rb +++ b/qpid/ruby/lib/qpid/traverse.rb diff --git a/qpid/ruby/lib/qpid/util.rb b/qpid/ruby/lib/qpid/util.rb new file mode 100644 index 0000000000..2dbc37da09 --- /dev/null +++ b/qpid/ruby/lib/qpid/util.rb @@ -0,0 +1,75 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +require 'thread' +require 'monitor' + +# Monkeypatch +class MonitorMixin::ConditionVariable + + # Wait until BLOCK returns TRUE or TIMEOUT seconds have passed + # Return TRUE if BLOCK returned TRUE within the TIMEOUT, FALSE + # otherswise + def wait_for(timeout=nil, &block) + start = Time.now + passed = 0 + until yield + if timeout.nil? + wait + elsif passed < timeout + wait(timeout) + else + return false + end + passed = Time.now - start + end + return true + end +end + +module Qpid::Util + + # Similar to Python's threading.Event + class Event + def initialize + @monitor = Monitor.new + @cond = @monitor.new_cond + @set = false + end + + def set + @monitor.synchronize do + @set = true + @cond.signal + end + end + + def clear + @monitor.synchronize { @set = false } + end + + def wait(timeout = nil) + @monitor.synchronize do + unless @set + @cond.wait_for(timeout) { @set } + end + end + end + end +end diff --git a/qpid/ruby/qpid/queue.rb b/qpid/ruby/qpid/queue.rb deleted file mode 100644 index 350310882f..0000000000 --- a/qpid/ruby/qpid/queue.rb +++ /dev/null @@ -1,52 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -require "thread" - -module Qpid - - class Closed < Exception; end - - class Queue < Queue - - @@END = Object.new() - - def close() - # sentinal to indicate the end of the queue - self << @@END - end - - def pop(*args) - result = super(*args) - if @@END.equal? result - # we put another sentinal on the end in case there are - # subsequent calls to pop by this or other threads - self << @@END - raise Closed.new() - else - return result - end - end - - alias shift pop - alias deq pop - - end - -end diff --git a/qpid/ruby/qpid/spec.rb b/qpid/ruby/qpid/spec.rb deleted file mode 100644 index 9a04f584d0..0000000000 --- a/qpid/ruby/qpid/spec.rb +++ /dev/null @@ -1,289 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -require "set" -require "rexml/document" -require "qpid/fields" -require "qpid/traverse" - -module Spec - - include REXML - - class Container < Array - - def initialize() - @cache = {} - end - - def [](key) - return @cache[key] if @cache.include?(key) - - case key - when String - value = find {|x| x.name == key.intern()} - when Symbol - value = find {|x| x.name == key} - when Integer - value = find {|x| x.id == key} - else - raise Exception.new("invalid key: #{key}") - end - - @cache[key] = value - return value - end - - end - - class Root - fields(:major, :minor, :classes, :constants, :domains) - - def find_method(name) - classes.each do |c| - c.methods.each do |m| - if name == m.qname - return m - end - end - end - - return nil - end - end - - class Constant - fields(:name, :id, :type, :docs) - end - - class Domain - fields(:name, :type) - end - - class Class - fields(:name, :id, :handler, :fields, :methods, :docs) - end - - class Method - fields(:name, :id, :content?, :responses, :synchronous?, :fields, - :docs) - - def init() - @response = false - end - - attr :parent, true - - def response?; @response end - def response=(b); @response = b end - - def qname - :"#{parent.name}_#{name}" - end - end - - class Field - fields(:name, :id, :type, :docs) - - def default - case type - when :bit then false - when :octet, :short, :long, :longlong then 0 - when :shortstr, :longstr then "" - when :table then {} - end - end - - end - - class Doc - fields(:type, :text) - end - - class Reference - - fields(:name) - - def init(&block) - @resolver = block - end - - def resolve(spec, klass) - @resolver.call(spec, klass) - end - - end - - class Loader - - def initialize() - @stack = [] - end - - def load(obj) - case obj - when String - elem = @stack[-1] - result = Container.new() - elem.elements.each(obj) {|e| - @index = result.size - result << load(e) - } - @index = nil - return result - else - elem = obj - @stack << elem - begin - result = send(:"load_#{elem.name}") - ensure - @stack.pop() - end - return result - end - end - - def element - @stack[-1] - end - - def text - element.text - end - - def attr(name, type = :string, default = nil) - value = element.attributes[name] - value = value.strip() unless value.nil? - value = nil unless value.nil? or value.any? - if value.nil? and not default.nil? then - default - else - send(:"parse_#{type}", value) - end - end - - def parse_int(value) - value.to_i - end - - TRUE = ["yes", "true", "1"].to_set - FALSE = ["no", "false", "0", nil].to_set - - def parse_bool(value) - if TRUE.include?(value) - true - elsif FALSE.include?(value) - false - else - raise Exception.new("parse error, expecting boolean: #{value}") - end - end - - def parse_string(value) - value.to_s - end - - def parse_symbol(value) - value.intern() unless value.nil? - end - - def parse_name(value) - value.gsub(/[\s-]/, '_').intern() unless value.nil? - end - - def load_amqp() - Root.new(attr("major", :int), attr("minor", :int), load("class"), - load("constant"), load("domain")) - end - - def load_class() - Class.new(attr("name", :name), attr("index", :int), attr("handler", :name), - load("field"), load("method"), load("doc")) - end - - def load_method() - Method.new(attr("name", :name), attr("index", :int), - attr("content", :bool), load("response"), - attr("synchronous", :bool), load("field"), load("docs")) - end - - def load_response() - name = attr("name", :name) - Reference.new {|spec, klass| - response = klass.methods[name] - if response.nil? - raise Exception.new("no such method: #{name}") - end - response - } - end - - def load_field() - type = attr("type", :name) - if type.nil? - domain = attr("domain", :name) - type = Reference.new {|spec, klass| - spec.domains[domain].type - } - end - Field.new(attr("name", :name), @index, type, load("docs")) - end - - def load_constant() - Constant.new(attr("name", :name), attr("value", :int), attr("class", :name), - load("doc")) - end - - def load_domain() - Domain.new(attr("name", :name), attr("type", :name)) - end - - def load_doc() - Doc.new(attr("type", :symbol), text) - end - - end - - def Spec.load(spec) - case spec - when String - spec = File.new(spec) - end - doc = Document.new(spec) - spec = Loader.new().load(doc.root) - spec.classes.each do |klass| - klass.traverse! do |o| - case o - when Reference - o.resolve(spec, klass) - else - o - end - end - klass.methods.each do |m| - m.parent = klass - m.responses.each do |r| - r.response = true - end - end - end - spec - end - -end diff --git a/qpid/ruby/tests/assembler.rb b/qpid/ruby/tests/assembler.rb new file mode 100644 index 0000000000..1181ece547 --- /dev/null +++ b/qpid/ruby/tests/assembler.rb @@ -0,0 +1,78 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +require "test/unit" +require "qpid" +require 'tests/util' + +require 'logger' + +class TestAssembler< Test::Unit::TestCase + + Segment = Qpid::Segment + Assembler = Qpid::Assembler + + def setup + # Qpid::asm_logger = Logger.new(STDOUT) + + @server = Util::ServerThread.new do |socket| + asm = Assembler.new(socket) + begin + header = asm.read_header + asm.write_header(header[-2], header[-1]) + loop do + seg = asm.read_segment + asm.write_segment(seg) + end + rescue Qpid::Closed + nil # Ignore + end + end + end + + def teardown + @server.finish + @server.join + end + + def test_assembler + asm = Assembler.new(@server.client, max_payload = 1) + asm.write_header(0, 10) + asm.write_segment(Segment.new(true, false, 1, 2, 3, "TEST")) + asm.write_segment(Segment.new(false, true, 1, 2, 3, "ING")) + + assert_equal( ["AMQP", 1, 1, 0, 10], asm.read_header) + + seg = asm.read_segment + assert_equal(true, seg.first_segment?) + assert_equal(false, seg.last_segment?) + assert_equal(1, seg.type) + assert_equal(2, seg.track) + assert_equal(3, seg.channel) + assert_equal("TEST", seg.payload) + + seg = asm.read_segment + assert_equal(false, seg.first_segment?) + assert_equal(true, seg.last_segment?) + assert_equal(1, seg.type) + assert_equal(2, seg.track) + assert_equal(3, seg.channel) + assert_equal("ING", seg.payload) + end +end diff --git a/qpid/ruby/tests/codec010.rb b/qpid/ruby/tests/codec010.rb new file mode 100644 index 0000000000..a9a5ca81e0 --- /dev/null +++ b/qpid/ruby/tests/codec010.rb @@ -0,0 +1,122 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +require "test/unit" +require "qpid" +require "tests/util" +require "socket" + +class CodecTest < Test::Unit::TestCase + + def setup + @spec = Qpid::Spec010::load + end + + def check(type, value) + t = @spec[type] + sc = Qpid::StringCodec.new(@spec) + t.encode(sc, value) + decoded = t.decode(sc) + assert_equal(value, decoded) + end + + + def testMapString + check("map", {"string" => "this is a test"}) + end + + def testMapInt + check("map", {"int" => 3}) + end + + def testMapLong + check("map", {"long" => 2**32}) + end + + def testMapNone + check("map", {"none" => None}) + end + + def testMapNested + check("map", {"map" => {"string" => "nested test"}}) + end + + def testMapList + check("map", {"list" => [1, "two", 3.0, -4]}) + end + + def testMapAll + check("map", {"string" => "this is a test", + "int" => 3, + "long" => 2**32, + "nil" => nil, + "map" => {"string" => "nested map"}, + "list" => [1, "two", 3.0, -4]}) + end + + def testMapEmpty + check("map", {}) + end + + def testMapNone + check("map", nil) + end + + def testList + check("list", [1, "two", 3.0, -4]) + end + + def testListEmpty + check("list", []) + end + + def testListNone + check("list", nil) + end + + def testArrayInt + check("array", [1, 2, 3, 4]) + end + + def testArrayString + check("array", ["one", "two", "three", "four"]) + end + + def testArrayEmpty + check("array", []) + end + + def testArrayNone + check("array", nil) + end + + def testInt64 + check("int64", 2 ** 40 * -1 + 43) + end + + def testUint64 + check("int64", 2 ** 42) + end + + def testReadNone + sc = Qpid::StringCodec.new(@spec) + # Python behaves this way + assert_equal("", sc.read(nil)) + end +end diff --git a/qpid/ruby/tests/connection.rb b/qpid/ruby/tests/connection.rb new file mode 100644 index 0000000000..ab892d8e53 --- /dev/null +++ b/qpid/ruby/tests/connection.rb @@ -0,0 +1,235 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +require 'test/unit' +require 'qpid' +require 'tests/util' + +class MockServer + + def initialize(queue) + @queue = queue + end + + def connection(conn, args={}) + return Qpid::Delegate::Server.new(conn, :delegate => method(:session)) + end + + def session(ssn, args={}) + ssn.auto_sync = false + return MockSession.new(ssn, @queue) + end +end + +class MockSession < Qpid::Session::Delegate + + def initialize(session, queue) + @session = session + @queue = queue + end + + def execution_sync(es) + nil + end + + def queue_query(qq) + return qq.type.result.create(qq.queue) + end + + def message_transfer(cmd, headers, body) + if cmd.destination == "echo" + m = Qpid::Message.new(body) + m.headers = headers + @session.message_transfer(cmd.destination, cmd.accept_mode, + cmd.acquire_mode, m) + elsif cmd.destination == "abort" + @session.channel.connection.sock.close() + else + @queue.put([cmd, headers, body]) + end + end +end + +class TestConnectionTest < Test::Unit::TestCase + + def setup + # Make sure errors in threads lead to a noisy death of the test + Thread.abort_on_exception = true + + @queue = Qpid::Queue.new + @running = true + ts = MockServer.new(@queue) + @server = Util::ServerThread.new do |socket| + conn = Qpid::Connection.new(socket, :delegate => ts.method(:connection)) + begin + conn.start(5) + rescue Qpid::Closed + # Ignore + end + end + + class << @server + def finish + @running.lock + client.close + @sockets.each { |sock| sock.close unless sock.closed? } + end + end + + @server[:name] = 'server' + Thread.current[:name] = 'test' + end + + def teardown + @server.finish + @server.join + end + + def connect + sock = @server.client + return Qpid::Connection.new(sock) + end + + def test_basic + c = connect + c.start(10) + + ssn1 = c.session("test1", :timeout => 10) + ssn2 = c.session("test2", :timeout => 10) + + assert_equal(c.sessions["test1"], ssn1) + assert_equal(c.sessions["test2"], ssn2) + assert_not_nil ssn1.channel + assert_not_nil ssn2.channel + assert(c.attached.values.include?(ssn1)) + assert(c.attached.values.include?(ssn2)) + + ssn1.close(5) + + assert_nil(ssn1.channel) + assert(! c.attached.values.include?(ssn1)) + assert(c.sessions.values.include?(ssn2)) + + ssn2.close(5) + + assert_nil(ssn2.channel) + assert(! c.attached.values.include?(ssn2)) + assert(! c.sessions.values.include?(ssn2)) + + ssn = c.session("session", :timeout => 10) + + assert_not_nil(ssn.channel) + assert(c.sessions.values.include?(ssn)) + + destinations = ["one", "two", "three"] + + destinations.each { |d| ssn.message_transfer(d) } + + destinations.each do |d| + cmd, header, body = @queue.get(10) + assert_equal(d, cmd.destination) + assert_nil(header) + assert_nil(body) + end + + msg = Qpid::Message.new("this is a test") + ssn.message_transfer("four", :message => msg) + cmd, header, body = @queue.get(10) + assert_equal("four", cmd.destination) + assert_nil(header) + assert_equal(msg.body, body) + + qq = ssn.queue_query("asdf") + assert_equal("asdf", qq.queue) + c.close(5) + end + + def test_close_get + c = connect + c.start(10) + ssn = c.session("test", :timeout => 10) + echos = ssn.incoming("echo") + + 10.times do |i| + ssn.message_transfer("echo", + :message => Qpid::Message.new("test#{i}")) + end + + ssn.auto_sync=false + ssn.message_transfer("abort") + + 10.times do |i| + m = echos.get(timeout=10) + assert_equal("test#{i}", m.body) + end + + begin + m = echos.get(timeout=10) + flunk("Expected Closed") + rescue Qpid::Closed + # Ignore + end + end + + def test_close_listen + c = connect + c.start(10) + ssn = c.session("test", :timeout => 10) + echos = ssn.incoming("echo") + + messages = [] + exceptions = [] + lock = Monitor.new + condition = lock.new_cond + + echos.exc_listen do |e| + exceptions << e + lock.synchronize { condition.signal } + end + echos.listen do |m| + messages << m + end + + 10.times do |i| + ssn.message_transfer("echo", + :message => Qpid::Message.new("test#{i}")) + end + ssn.auto_sync=false + ssn.message_transfer("abort") + + lock.synchronize { condition.wait(10) } + + 10.times do |i| + m = messages.shift + assert_equal("test#{i}", m.body) + end + + assert_equal(1, exceptions.size) + end + + def test_sync + c = connect + c.start(10) + s = c.session("test") + s.auto_sync = false + s.message_transfer("echo", + :message => Qpid::Message.new("test")) + s.sync(10) + end +end diff --git a/qpid/ruby/tests/datatypes.rb b/qpid/ruby/tests/datatypes.rb new file mode 100644 index 0000000000..65b1f9e3f5 --- /dev/null +++ b/qpid/ruby/tests/datatypes.rb @@ -0,0 +1,224 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +require 'test/unit' +require 'qpid' +require 'tests/util' + +class TestSerial < Test::Unit::TestCase + + def test_cmp + [0, 0x8FFFFFFF, 0xFFFFFFFF].each do |s| + s = s.to_serial + assert(s + 1 > s) + assert(s - 1 < s) + assert(s < s + 1) + assert(s > s - 1) + end + last = 0xFFFFFFFF.to_serial + zero = 0.to_serial + assert_equal(zero, last + 1) + + assert_equal(last, [last, zero].min) + assert_equal(zero, [last, zero].max) + end + + def test_incr + s = 0.to_serial + s += 1 + assert_equal(1.to_serial, s) + end + + def test_in + l = [1, 2, 3, 4].collect { |i| i.to_serial } + assert(l.include?(1.to_serial)) + assert(l.include?((0xFFFFFFFF + 2).to_serial)) + assert(l.include?(4)) + end + + def test_none + assert_not_equal(nil, 0.to_serial) + end + + def test_hash + zero = 0.to_serial + d = { zero => :zero } + # FIXME: this does not work, since Ruby looks up the key and does + # a 0.eql?(zero), which bypasses the Qpid::Serial::eql? + # assert_equal(:zero, d[0]) + end +end + +class TestRangedSet < Test::Unit::TestCase + + def assert_contains(rset, elts, nonelts = []) + assert_equal(elts, elts.select { |e| rset.include?(e) }) + assert_equal(nonelts, nonelts.select { |e| ! rset.include?(e) }) + end + + def assert_ranges(rs, *ranges) + assert_equal(ranges.size, rs.ranges.size) + assert( ranges.all? { |rng| rs.include?(rng) } ) + end + + def test_simple + rs = Qpid::RangedSet.new + + assert(rs.ranges.empty?) + + rs.add(1) + assert_contains(rs, [1], [0,2]) + assert_ranges(rs, 1..1) + + rs.add(2) + assert_contains(rs, [1,2], [0,3]) + assert_ranges(rs, 1..2) + + rs.add(0) + assert_contains(rs, [0,1,2], [-1, 3]) + assert_ranges(rs, 0..2) + + rs.add(37) + assert_contains(rs, [0,1,2,37], [-1, 3, 36, 38]) + assert_ranges(rs, 0..2, 37..37) + + rs.add(-1) + assert_ranges(rs, -1..2, 37..37) + + rs.add(-3) + assert_ranges(rs, -1..2, 37..37, -3..-3) + + rs.add(1, 20) + assert_contains(rs, [20], [21]) + assert_ranges(rs, -1..20, 37..37, -3..-3) + + rs.add(21,36) + assert_ranges(rs, -1..37, -3..-3) + + rs.add(-3, 5) + assert_ranges(rs, -3..37) + end + + def test_add_self + a = Qpid::RangedSet.new + a.add(0, 8) + assert_ranges(a, 0..8) + + a.add(0, 8) + assert_ranges(a, 0..8) + end +end + +class TestRange < Test::Unit::TestCase + + def test_intersect1 + a = Range.new(0, 10) + b = Range.new(9, 20) + i1 = a.intersect(b) + i2 = b.intersect(a) + assert_equal(9..10, i1) + assert_equal(9..10, i2) + end + + def test_intersect2 + a = Range.new(0, 10) + b = Range.new(11, 20) + assert_equal(nil, a.intersect(b)) + assert_equal(nil, b.intersect(a)) + end + + def test_intersect3 + a = Range.new(0, 10) + b = Range.new(3, 5) + i1 = a.intersect(b) + i2 = b.intersect(a) + assert_equal(3..5, i1) + assert_equal(3..5, i2) + end +end + +class TestUUIDTest < Test::Unit::TestCase + + def test_simple + # this test is kind of lame, but it does excercise the basic + # functionality of the class + u = Qpid::UUID::uuid4 + 1024.times { |i| assert_not_equal(u, Qpid::UUID::uuid4) } + assert_raise NotImplementedError do + u == 0 + end + end +end + +class TestMessage < Test::Unit::TestCase + + def setup + @@spec ||= Qpid::Spec010::load() + @mp = Qpid::struct(@@spec["message_properties"]) + @dp = Qpid::struct(@@spec["delivery_properties"]) + @fp = Qpid::struct(@@spec["fragment_properties"]) + end + + def test_has + m = Qpid::Message.new(@mp, @dp, @fp, "body") + assert m.has("message_properties") + assert m.has("delivery_properties") + assert m.has("fragment_properties") + end + + def test_get + m = Qpid::Message.new(@mp, @dp, @fp, "body") + assert_same(@mp, m.get("message_properties")) + assert_same(@dp, m.get("delivery_properties")) + assert_same(@fp, m.get("fragment_properties")) + end + + def test_set + m = Qpid::Message.new(@mp, @dp, "body") + assert_nil m.get("fragment_properties") + m.set(@fp) + assert_same(@fp, m.get("fragment_properties"), "4") + end + + def test_set_on_empty + m = Qpid::Message.new("body") + assert_nil m.get("delivery_properties") + m.set(@dp) + assert_same(@dp, m.get("delivery_properties"), "5") + end + + def test_set_replace + m = Qpid::Message.new(@mp, @dp, @fp, "body") + dp = Qpid::struct(@@spec["delivery_properties"]) + assert_same(@dp, m.get("delivery_properties"), "6") + m.set(dp) + assert_same(dp, m.get("delivery_properties"), "7") + end + + def test_clear + m = Qpid::Message.new(@mp, @dp, @fp, "body") + assert_same(@mp, m.get("message_properties"), "8") + assert_same(@dp, m.get("delivery_properties"), "9") + assert_same(@fp, m.get("fragment_properties"), "10") + m.clear("fragment_properties") + assert_nil m.get("fragment_properties") + assert_same(@mp, m.get("message_properties"), "11") + assert_same(@dp, m.get("delivery_properties"), "12") + end +end diff --git a/qpid/ruby/tests/framer.rb b/qpid/ruby/tests/framer.rb new file mode 100644 index 0000000000..1d56f2faf1 --- /dev/null +++ b/qpid/ruby/tests/framer.rb @@ -0,0 +1,99 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +require "test/unit" +require "qpid" +require 'tests/util' + +require 'logger' + +class TestFramer < Test::Unit::TestCase + + include Test + + def setup + #Qpid::raw_logger = Logger.new(STDOUT) + #Qpid::frm_logger = Logger.new(STDOUT) + + @server = Util::ServerThread.new do |socket| + conn = Qpid::Framer.new(socket) + begin + h = conn.read_header + conn.write_header(h[-2], h[-1]) + loop do + frame = conn.read_frame + conn.write_frame(frame) + conn.flush + end + rescue Qpid::Closed + nil # Ignore + end + end + end + + def teardown + @server.finish + @server.join + end + + Frame = Qpid::Frame + + def test_framer + c = Qpid::Framer.new(@server.client) + + c.write_header(0, 10) + assert_equal( ["AMQP", 1, 1, 0, 10], c.read_header()) + + c.write_frame(Frame.new(Qpid::FIRST_FRM, 1, 2, 3, "THIS")) + c.write_frame(Frame.new(0, 1, 2, 3, "IS")) + c.write_frame(Frame.new(0, 1, 2, 3, "A")) + c.write_frame(Frame.new(Qpid::LAST_FRM, 1, 2, 3, "TEST")) + c.flush() + + f = c.read_frame + assert(f.first_frame?) + assert(! f.last_frame?) + assert_equal(1, f.type) + assert_equal(2, f.track) + assert_equal(3, f.channel) + assert_equal("THIS", f.payload) + + f = c.read_frame + assert_equal(0, f.flags) + assert_equal(1, f.type) + assert_equal(2, f.track) + assert_equal(3, f.channel) + assert_equal("IS", f.payload) + + f = c.read_frame + assert_equal(0, f.flags) + assert_equal(1, f.type) + assert_equal(2, f.track) + assert_equal(3, f.channel) + assert_equal("A", f.payload) + + f = c.read_frame + assert(f.last_frame?) + assert(! f.first_frame?) + assert_equal(1, f.type) + assert_equal(2, f.track) + assert_equal(3, f.channel) + assert_equal("TEST", f.payload) + end +end diff --git a/qpid/ruby/tests/qmf.rb b/qpid/ruby/tests/qmf.rb new file mode 100644 index 0000000000..75250a7938 --- /dev/null +++ b/qpid/ruby/tests/qmf.rb @@ -0,0 +1,187 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +require "test/unit" +require "qpid" +require "tests/util" +require "socket" + +class QmfTest < Test::Unit::TestCase + + def setup() + # Make sure errors in threads lead to a noisy death of the test + Thread.abort_on_exception = true + + sock = TCPSocket.new('localhost', 5672) + + @conn = Qpid::Connection.new(sock) + @conn.start() + + @session = @conn.session("test-session") + + # It's a bit odd that we're using two connections but that's the way + # the python one works afaict. + @qmf = Qpid::Qmf::Session.new() + @qmf_broker = @qmf.add_broker("amqp://localhost:5672") + + brokers = @qmf.objects(:class => "broker") + assert_equal(1, brokers.length) + @broker = brokers[0] + end + + def teardown + unless @session.error? + @session.close(10) + end + @conn.close(10) + if @qmf + @qmf.del_broker(@qmf_broker) + end + end + + def test_broker_connectivity() + body = "Echo Message Body" + for seq in 1..10 + res = @broker.echo(seq, body) + assert_equal(0, res.status) + assert_equal("OK", res.text) + assert_equal(seq, res.sequence) + assert_equal(body, res.body) + end + end + + def test_move_queued_messages() + """ + Test ability to move messages from the head of one queue to another. + Need to test moveing all and N messages. + """ + + "Set up source queue" + @session.queue_declare(:queue => "src-queue", :exclusive => true, :auto_delete => true) + @session.exchange_bind(:queue => "src-queue", :exchange => "amq.direct", :binding_key => "routing_key") + + props = @session.delivery_properties(:routing_key => "routing_key") + for count in 1..20 + body = "Move Message %d" % count + src_msg = Qpid::Message.new(props, body) + @session.message_transfer(:destination => "amq.direct", :message => src_msg) + end + + "Set up destination queue" + @session.queue_declare(:queue => "dest-queue", :exclusive => true, :auto_delete => true) + @session.exchange_bind(:queue => "dest-queue", :exchange => "amq.direct") + + queues = @qmf.objects(:class => "queue") + + "Move 10 messages from src-queue to dest-queue" + result = @qmf.objects(:class => "broker")[0].queueMoveMessages("src-queue", "dest-queue", 10) + assert_equal(0, result.status) + + sq = @qmf.objects(:class => "queue", "name" => "src-queue")[0] + dq = @qmf.objects(:class => "queue", "name" => "dest-queue")[0] + + assert_equal(10, sq.msgDepth) + assert_equal(10, dq.msgDepth) + + "Move all remaining messages to destination" + result = @qmf.objects(:class => "broker")[0].queueMoveMessages("src-queue", "dest-queue", 0) + assert_equal(0, result.status) + + sq = @qmf.objects(:class => "queue", 'name' => "src-queue")[0] + dq = @qmf.objects(:class => "queue", 'name' => "dest-queue")[0] + + assert_equal(0, sq.msgDepth) + assert_equal(20, dq.msgDepth) + + "Use a bad source queue name" + result = @qmf.objects(:class => "broker")[0].queueMoveMessages("bad-src-queue", "dest-queue", 0) + assert_equal(4, result.status) + + "Use a bad destination queue name" + result = @qmf.objects(:class => "broker")[0].queueMoveMessages("src-queue", "bad-dest-queue", 0) + assert_equal(4, result.status) + + " Use a large qty (40) to move from dest-queue back to " + " src-queue- should move all " + result = @qmf.objects(:class => "broker")[0].queueMoveMessages("dest-queue", "src-queue", 40) + assert_equal(0, result.status) + + sq = @qmf.objects(:class => "queue", 'name' => "src-queue")[0] + dq = @qmf.objects(:class => "queue", 'name' => "dest-queue")[0] + + assert_equal(20, sq.msgDepth) + assert_equal(0, dq.msgDepth) + + "Consume the messages of the queue and check they are all there in order" + @session.message_subscribe(:queue => "src-queue", + :destination => "tag") + @session.message_flow(:destination => "tag", + :unit => @session.message_credit_unit.message, + :value => 0xFFFFFFFF) + @session.message_flow(:destination => "tag", + :unit => @session.message_credit_unit.byte, + :value => 0xFFFFFFFF) + queue = @session.incoming("tag") + for count in 1..20 + consumed_msg = queue.get(timeout=1) + body = "Move Message %d" % count + assert_equal(body, consumed_msg.body) + end + end + + # Test ability to purge messages from the head of a queue. Need to test + # moveing all, 1 (top message) and N messages. + def test_purge_queue + # Set up purge queue" + @session.queue_declare(:queue => "purge-queue", + :exclusive => true, + :auto_delete => true) + @session.exchange_bind(:queue => "purge-queue", + :exchange => "amq.direct", + :binding_key => "routing_key") + + props = @session.delivery_properties(:routing_key => "routing_key") + 20.times do |count| + body = "Purge Message %d" % count + msg = Qpid::Message.new(props, body) + @session.message_transfer(:destination => "amq.direct", + :message => msg) + end + + pq = @qmf.objects(:class => "queue", 'name' => "purge-queue")[0] + + "Purge top message from purge-queue" + result = pq.purge(1) + assert_equal(0, result.status) + pq = @qmf.objects(:class => "queue", 'name' => "purge-queue")[0] + assert_equal(19, pq.msgDepth) + + "Purge top 9 messages from purge-queue" + result = pq.purge(9) + assert_equal(0, result.status) + pq = @qmf.objects(:class => "queue", 'name' => "purge-queue")[0] + assert_equal(10, pq.msgDepth) + + "Purge all messages from purge-queue" + result = pq.purge(0) + assert_equal(0, result.status) + pq = @qmf.objects(:class => "queue", 'name' => "purge-queue")[0] + assert_equal(0, pq.msgDepth) + end +end diff --git a/qpid/ruby/tests/queue.rb b/qpid/ruby/tests/queue.rb new file mode 100644 index 0000000000..4ec0e07ffb --- /dev/null +++ b/qpid/ruby/tests/queue.rb @@ -0,0 +1,80 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +require 'test/unit' +require 'qpid' + +class TestQueue < Test::Unit::TestCase + + # The qpid queue class just provides sime simple extensions to + # python's standard queue data structure, so we don't need to test + # all the queue functionality. + + def setup + # Make sure errors in threads lead to a noisy death of the test + Thread.abort_on_exception = true + end + + def test_listen + values = [] + heard = Qpid::Util::Event.new + + listener = Proc.new do |x| + values << x + heard.set + end + + q = Qpid::Queue.new + q.listen(&listener) + + heard.clear + q.put(1) + heard.wait + assert_equal([1], values) + heard.clear + q.put(2) + heard.wait + assert_equal([1, 2], values) + + q.listen + q.put(3) + assert_equal(3, q.get) + + q.listen(&listener) + heard.clear + q.put(4) + heard.wait + assert_equal([1,2,4], values) + end + + def test_close + q = Qpid::Queue.new + (1..3).each { |i| q.put(i) } + q.close + assert_equal(1, q.get) + assert_equal(2, q.get) + assert_equal(3, q.get) + 10.times do |i| + assert_raises(Qpid::Closed) do + q.get + end + end + end + +end diff --git a/qpid/ruby/tests/spec010.rb b/qpid/ruby/tests/spec010.rb new file mode 100644 index 0000000000..6db1523455 --- /dev/null +++ b/qpid/ruby/tests/spec010.rb @@ -0,0 +1,80 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +require "test/unit" +require "qpid/test" +require "qpid/spec010" + +class SpecTest < Test::Unit::TestCase + + def setup() + @spec = Qpid::Spec010.load() + end + + def testSessionHeader() + hdr = @spec[:header] + sc = Qpid::StringCodec.new(@spec) + hdr.encode(sc, Qpid::struct(hdr, :sync=>true)) + assert sc.encoded == "\x01\x01" + + sc = Qpid::StringCodec.new(@spec) + hdr.encode(sc, Qpid::struct(hdr, :sync=>false)) + assert sc.encoded == "\x01\x00" + end + + def encdec(type, value) + sc = Qpid::StringCodec.new(@spec) + type.encode(sc, value) + decoded = type.decode(sc) + return decoded + end + + def testMessageProperties() + mp = @spec[:message_properties] + rt = @spec[:reply_to] + + props = Qpid::struct(mp, + :content_length=>3735928559, + :reply_to=>Qpid::struct(rt, + :exchange=>"the exchange name", + :routing_key=>"the routing key")) + dec = encdec(mp, props) + assert props.content_length == dec.content_length + assert props.reply_to.exchange == dec.reply_to.exchange + assert props.reply_to.routing_key == dec.reply_to.routing_key + end + + def testMessageSubscribe() + ms = @spec[:message_subscribe] + cmd = Qpid::struct(ms, :exclusive=>true, :destination=>"this is a test") + dec = encdec(@spec[:message_subscribe], cmd) + assert cmd.exclusive == dec.exclusive + assert cmd.destination == dec.destination + end + + def testXid() + xid = @spec[:xid] + sc = Qpid::StringCodec.new(@spec) + st = Qpid::struct(xid, :format=>0, :global_id=>"gid", :branch_id=>"bid") + xid.encode(sc, st) + assert sc.encoded == "\x00\x00\x00\x10\x06\x04\x07\x00\x00\x00\x00\x00\x03gid\x03bid" + assert xid.decode(sc) == st + end + +end diff --git a/qpid/ruby/tests/util.rb b/qpid/ruby/tests/util.rb new file mode 100644 index 0000000000..b22a6bab2f --- /dev/null +++ b/qpid/ruby/tests/util.rb @@ -0,0 +1,72 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +require 'thread' +require 'socket' + +module Util + + TOPDIR = File::dirname(File::dirname(File::expand_path(__FILE__))) + SPEC = File::join(TOPDIR, "specs", "amqp.0-10-qpid-errata.xml") + + PORT = 1234 + HOST = "0.0.0.0" + + def self.connect(host = HOST, port = PORT) + TCPSocket.new(host, port) + end + + class ServerThread < Thread + def initialize(&block) + @sockets = [] + @running = Mutex.new + started = Qpid::Util::Event.new + super(started, @running) do |started, running| + tcp_srv = TCPServer.new(HOST, PORT) + begin + started.set + while ! running.locked? and (session = tcp_srv.accept) + yield(session) + end + rescue Exception => e + # Exceptions in the server thread are hard to see + # Make sure they apear loudly on the console + $stderr.puts "#{ "*" * 20} Server exception #{ "*" * 20}" + $stderr.puts e.message + $stderr.puts e.backtrace + raise + ensure + tcp_srv.close + end + end + started.wait + end + + def finish + @running.lock + @sockets.each { |sock| sock.close unless sock.closed? } + end + + def client(host = HOST, port = PORT) + sock = Util::connect(host, port) + @sockets << sock + sock + end + end +end diff --git a/qpid/ruby/tests/basic.rb b/qpid/ruby/tests_0-8/basic.rb index 0018050fe2..10a43b1aab 100644 --- a/qpid/ruby/tests/basic.rb +++ b/qpid/ruby/tests_0-8/basic.rb @@ -23,13 +23,13 @@ require "qpid" class Basic < Test::Unit::TestCase - include Qpid::Test + include Qpid08::Test def publish(body, headers = {}) cli = connect() ch = cli.channel(1) ch.channel_open() - content = Qpid::Content.new(headers, body) + content = Qpid08::Content.new(headers, body) ch.basic_publish(:content => content) msg = ch.channel_close() assert msg.method.qname == :channel_close_ok @@ -42,7 +42,7 @@ class Basic < Test::Unit::TestCase ch.queue_declare(:queue => "test-queue") ch.queue_bind(:queue_name => "test-queue") ch.basic_consume(:queue => "test-queue", :consumer_tag => "ctag") - content = Qpid::Content.new(headers, body) + content = Qpid08::Content.new(headers, body) ch.basic_publish(:routing_key => "test-queue", :content => content) queue = cli.queue("ctag") msg = queue.pop() diff --git a/qpid/ruby/tests/channel.rb b/qpid/ruby/tests_0-8/channel.rb index 31c5f19d92..1eea8f18d9 100644 --- a/qpid/ruby/tests/channel.rb +++ b/qpid/ruby/tests_0-8/channel.rb @@ -23,7 +23,7 @@ require "qpid" class Channel < Test::Unit::TestCase - include Qpid::Test + include Qpid08::Test def test_channel_open_close() c = connect() |