summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2009-12-27 13:10:19 +0000
committerRafael H. Schloming <rhs@apache.org>2009-12-27 13:10:19 +0000
commitba8f354ad9821fd433bddbb88f59fa318964fa3a (patch)
treea004039f15ea31a163f6baad5f8c3f5a7d1701dd
parent3c49f0c692eb87f5f6eea4f5f8a168acd8efe5ad (diff)
downloadqpid-python-ba8f354ad9821fd433bddbb88f59fa318964fa3a.tar.gz
synced ruby with trunk
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid.rnr@894062 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--ruby/Makefile47
-rw-r--r--ruby/README.txt22
-rw-r--r--ruby/RELEASE_NOTES24
-rw-r--r--ruby/Rakefile114
-rw-r--r--ruby/examples/hello-world.rb61
-rw-r--r--ruby/examples/qmf-libvirt.rb80
-rw-r--r--ruby/ext/sasl/extconf.rb10
-rw-r--r--ruby/ext/sasl/sasl.c472
-rw-r--r--ruby/lib/qpid.rb41
-rw-r--r--ruby/lib/qpid/assembler.rb148
-rw-r--r--ruby/lib/qpid/client.rb (renamed from ruby/qpid/client.rb)3
-rw-r--r--ruby/lib/qpid/codec.rb457
-rw-r--r--ruby/lib/qpid/codec08.rb (renamed from ruby/qpid/codec.rb)6
-rw-r--r--ruby/lib/qpid/config.rb (renamed from ruby/qpid/queue.rb)39
-rw-r--r--ruby/lib/qpid/connection.rb222
-rw-r--r--ruby/lib/qpid/connection08.rb (renamed from ruby/qpid/connection.rb)10
-rw-r--r--ruby/lib/qpid/datatypes.rb353
-rw-r--r--ruby/lib/qpid/delegates.rb237
-rw-r--r--ruby/lib/qpid/fields.rb (renamed from ruby/qpid/fields.rb)6
-rw-r--r--ruby/lib/qpid/framer.rb212
-rw-r--r--ruby/lib/qpid/invoker.rb65
-rw-r--r--ruby/lib/qpid/packer.rb (renamed from ruby/qpid.rb)24
-rw-r--r--ruby/lib/qpid/peer.rb (renamed from ruby/qpid/peer.rb)32
-rw-r--r--ruby/lib/qpid/qmf.rb1957
-rw-r--r--ruby/lib/qpid/queue.rb101
-rw-r--r--ruby/lib/qpid/session.rb458
-rw-r--r--ruby/lib/qpid/spec.rb183
-rw-r--r--ruby/lib/qpid/spec010.rb485
-rw-r--r--ruby/lib/qpid/spec08.rb190
-rw-r--r--ruby/lib/qpid/test.rb (renamed from ruby/qpid/test.rb)7
-rw-r--r--ruby/lib/qpid/traverse.rb (renamed from ruby/qpid/traverse.rb)0
-rw-r--r--ruby/lib/qpid/util.rb75
-rw-r--r--ruby/qpid/spec.rb289
-rwxr-xr-xruby/run-tests4
-rw-r--r--ruby/tests/assembler.rb78
-rw-r--r--ruby/tests/codec010.rb122
-rw-r--r--ruby/tests/connection.rb246
-rw-r--r--ruby/tests/datatypes.rb224
-rw-r--r--ruby/tests/framer.rb99
-rw-r--r--ruby/tests/qmf.rb248
-rw-r--r--ruby/tests/queue.rb80
-rw-r--r--ruby/tests/spec010.rb80
-rw-r--r--ruby/tests/util.rb72
-rw-r--r--ruby/tests_0-8/basic.rb6
-rw-r--r--ruby/tests_0-8/channel.rb2
45 files changed, 7310 insertions, 381 deletions
diff --git a/ruby/Makefile b/ruby/Makefile
new file mode 100644
index 0000000000..9cac3207c0
--- /dev/null
+++ b/ruby/Makefile
@@ -0,0 +1,47 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+SASL_DIR = ext/sasl
+SASL_MODULE = $(SASL_DIR)/sasl.so
+RUBY_LIB = lib
+SPEC_CACHE_SCRIPT = sc.rb
+
+.PHONY: spec_cache all clean distclean
+
+all : build
+
+$(SASL_MODULE) : $(SASL_DIR)/sasl.c
+ cd $(SASL_DIR); ruby extconf.rb
+ $(MAKE) -C $(SASL_DIR)
+
+spec_cache :
+ echo "require 'qpid'" > $(SPEC_CACHE_SCRIPT)
+ echo "Qpid::Spec010::load()" >> $(SPEC_CACHE_SCRIPT)
+ ruby -I $(RUBY_LIB) -I $(SASL_DIR) $(SPEC_CACHE_SCRIPT)
+ rm $(SPEC_CACHE_SCRIPT)
+
+build: $(SASL_MODULE) spec_cache
+
+clean:
+ cd $(SASL_DIR); make clean
+
+distclean:
+ cd $(SASL_DIR); make distclean
+ rm -rf $(RUBY_LIB)/qpid/spec_cache
+
diff --git a/ruby/README.txt b/ruby/README.txt
new file mode 100644
index 0000000000..3c483f996b
--- /dev/null
+++ b/ruby/README.txt
@@ -0,0 +1,22 @@
+= INSTALLATION =
+
+Extract the release archive into a directory of your choice and set
+your RUBYLIB environment variable accordingly:
+
+ tar -xzf qpid-ruby-<version>.tar.gz -C <install-prefix>
+ export RUBYLIB=<install-prefix>/qpid-<version>/ruby/lib
+
+= GETTING STARTED =
+
+The ruby client includes a simple hello-world example that publishes
+and consumes a message:
+
+ cp <install-prefix>/qpid-<version>/ruby/examples/hello-world.rb .
+ ./hello-world.rb
+
+= RUNNING THE TESTS =
+
+The "tests" directory contains a collection of unit tests for the ruby
+client. These can be run with the Rakefile provided:
+
+ rake test
diff --git a/ruby/RELEASE_NOTES b/ruby/RELEASE_NOTES
index 5ea0bd8eec..29760bd83a 100644
--- a/ruby/RELEASE_NOTES
+++ b/ruby/RELEASE_NOTES
@@ -1,19 +1,15 @@
-Apache Incubator Qpid Ruby M2 Release Notes
----------------------------------------------
+Apache Qpid Ruby M4 Release Notes
+---------------------------------
-The Qpid M2 release contains support the for AMQP 0-8 specification.
-You can access the 0-8 specification using the following link.
-http://www.amqp.org/tikiwiki/tiki-index.php?page=Download
+The Qpid M4 release of the ruby client contains support the for AMQP
+0-10 & 0-8 specifications. See:
-For full details of Qpid capabilities, as they currently stand, see our
-detailed project documentation at:
+http://jira.amqp.org/confluence/display/AMQP/Download
-http://cwiki.apache.org/confluence/pages/viewpage.action?pageId=28284
+For full details of Qpid capabilities, as they currently stand, see
+our project page at:
-Please take time to go through the README file provided with the distro.
+http://cwiki.apache.org/confluence/display/qpid/Index
-
-Known Issues/Outstanding Work
------------------------------
-
-Bug QPID-467 Complete Interop Testing
+The README file provided contains some details on installing and using
+the ruby client that is included with this distribution.
diff --git a/ruby/Rakefile b/ruby/Rakefile
new file mode 100644
index 0000000000..94f6389822
--- /dev/null
+++ b/ruby/Rakefile
@@ -0,0 +1,114 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Rakefile for ruby-rpm -*- ruby -*-
+require 'rake/clean'
+require 'rake/testtask'
+require 'rake/gempackagetask'
+require 'pathname'
+
+PKG_NAME='ruby-qpid'
+PKG_VERSION='0.10.2'
+GEM_NAME='qpid'
+
+EXT_CONF="ext/sasl/extconf.rb"
+MAKEFILE="ext/sasl/Makefile"
+SASL_MODULE="ext/sasl/sasl.so"
+SASL_SRC=SASL_MODULE.gsub(/.so$/, ".c")
+
+#
+# Additional files for clean/clobber
+#
+
+CLEAN.include [ "**/*~", "lib/*/spec_cache", SASL_MODULE, "ext/**/*.o" ]
+
+CLOBBER.include [ "config.save", "ext/**/mkmf.log",
+ MAKEFILE ]
+
+file MAKEFILE => EXT_CONF do |t|
+ Dir::chdir(File::dirname(EXT_CONF)) do
+ unless sh "ruby #{File::basename(EXT_CONF)}"
+ $stderr.puts "Failed to run extconf"
+ break
+ end
+ end
+end
+
+file SASL_MODULE => [ MAKEFILE, SASL_SRC ] do |t|
+ Dir::chdir(File::dirname(EXT_CONF)) do
+ unless sh "make"
+ $stderr.puts "make failed"
+ break
+ end
+ end
+end
+desc "Build the native library and AMQP spec cache"
+task :build => :spec_cache
+
+Rake::TestTask.new(:test) do |t|
+ t.test_files = FileList['tests/*.rb'].exclude("tests/util.rb")
+ t.libs = [ 'lib', 'ext/sasl' ]
+end
+
+Rake::TestTask.new(:"test_0-8") do |t|
+ t.test_files = FileList["tests_0-8/*.rb"]
+ t.libs = [ 'lib', 'ext/sasl' ]
+end
+
+desc "Create cached versions of the AMQP specs"
+task :spec_cache => SASL_MODULE do |t|
+ pid = fork do
+ $:.insert(0, "lib", "ext/sasl")
+ require 'qpid'
+ Qpid::Spec010::load()
+ end
+ Process.wait(pid)
+end
+
+#
+# Packaging
+#
+
+PKG_FILES = FileList[
+ "DISCLAIMER", "LICENSE.txt", "NOTICE.txt",
+ "Rakefile", "RELEASE_NOTES",
+ "lib/**/*.rb", "lib/*/spec_cache/*.rb*", "tests/**/*", "examples/**", "ext/**/*.[ch]",
+ "ext/**/MANIFEST", "ext/**/extconf.rb"
+]
+
+DIST_FILES = FileList[
+ "pkg/*.tgz", "pkg/*.gem"
+]
+
+SPEC = Gem::Specification.new do |s|
+ s.name = GEM_NAME
+ s.version = PKG_VERSION
+ s.email = "qpid-dev@incubator.apache.org"
+ s.homepage = "http://cwiki.apache.org/qpid/"
+ s.summary = "Ruby client for Qpid"
+ s.files = PKG_FILES
+ s.required_ruby_version = '>= 1.8.1'
+ s.description = "Ruby client for Qpid"
+end
+
+Rake::GemPackageTask.new(SPEC) do |pkg|
+ task pkg.package_dir => [ :spec_cache ]
+ pkg.need_tar = true
+ pkg.need_zip = true
+end
diff --git a/ruby/examples/hello-world.rb b/ruby/examples/hello-world.rb
new file mode 100644
index 0000000000..e8ef673316
--- /dev/null
+++ b/ruby/examples/hello-world.rb
@@ -0,0 +1,61 @@
+#!/usr/bin/ruby
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+require "qpid"
+require "socket"
+
+broker = if ARGV.length > 0 then ARGV[0] else "localhost" end
+port = if ARGV.length > 1 then ARGV[1].to_i else 5672 end
+if ARGV.length > 2 then
+ puts "usage: hello-world.rb [ <broker> [ <port> ] ]"
+ exit 1
+end
+
+conn = Qpid::Connection.new(TCPSocket.new(broker, port))
+conn.start(10)
+
+ssn = conn.session("test")
+
+# create a queue
+ssn.queue_declare("test-queue")
+
+ssn.exchange_declare("test-exchange", :type => "direct")
+
+# Publish a message
+dp = ssn.delivery_properties(:routing_key => "test-queue")
+mp = ssn.message_properties(:content_type => "text/plain")
+msg = Qpid::Message.new(dp, mp, "Hello World!")
+ssn.message_transfer(:message => msg)
+
+# subscribe to a queue
+ssn.message_subscribe(:destination => "messages", :queue => "test-queue",
+ :accept_mode => ssn.message_accept_mode.none)
+incoming = ssn.incoming("messages")
+
+# start incoming message flow
+incoming.start()
+
+# grab a message from the queue
+p incoming.get(10)
+
+# cancel the subscription and close the session and connection
+ssn.message_cancel(:destination => "messages")
+ssn.close()
+conn.close()
diff --git a/ruby/examples/qmf-libvirt.rb b/ruby/examples/qmf-libvirt.rb
new file mode 100644
index 0000000000..492f4fe8d6
--- /dev/null
+++ b/ruby/examples/qmf-libvirt.rb
@@ -0,0 +1,80 @@
+#!/usr/bin/ruby
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+require "qpid"
+
+s = Qpid::Qmf::Session.new()
+b = s.add_broker("amqp://localhost:5672")
+
+while true:
+ nodes = s.objects(:class => "node")
+ nodes.each do |node|
+ puts "node: #{node.hostname}"
+ for (key, val) in node.properties
+ puts " property: #{key}, #{val}"
+ end
+
+ # Find any domains that on the current node.
+ domains = s.objects(:class => "domain", 'node' => node.object_id)
+ domains.each do |domain|
+ r = domain.getXMLDesc()
+ puts "status: #{r.status}"
+ if r.status == 0
+ puts "xml description: #{r.description}"
+ puts "length: #{r.description.length}"
+ end
+
+ puts " domain: #{domain.name}, state: #{domain.state}, id: #{domain.id}"
+ for (key, val) in domain.properties
+ puts " property: #{key}, #{val}"
+ end
+ end
+
+ pools = s.objects(:class => "pool", 'node' => node.object_id)
+ pools.each do |pool|
+ puts " pool: #{pool.name}"
+ for (key, val) in pool.properties
+ puts " property: #{key}, #{val}"
+ end
+
+ r = pool.getXMLDesc()
+ puts "status: #{r.status}"
+ puts "text: #{r.text}"
+ if r.status == 0
+ puts "xml description: #{r.description}"
+ puts "length: #{r.description.length}"
+ end
+
+ # Find volumes that are part of the pool.
+ volumes = s.objects(:class => "volume", 'pool' => pool.object_id)
+ volumes.each do |volume|
+ puts " volume: #{volume.name}"
+ for (key, val) in volume.properties
+ puts " property: #{key}, #{val}"
+ end
+ end
+ end
+
+ end
+
+ puts '----------------------------'
+ sleep(5)
+
+end
diff --git a/ruby/ext/sasl/extconf.rb b/ruby/ext/sasl/extconf.rb
new file mode 100644
index 0000000000..5c9a24c35c
--- /dev/null
+++ b/ruby/ext/sasl/extconf.rb
@@ -0,0 +1,10 @@
+require 'mkmf'
+
+extension_name = 'sasl'
+have_library("c", "main")
+
+unless have_library("sasl2")
+ raise "Package cyrus-sasl-devel not found"
+end
+
+create_makefile(extension_name)
diff --git a/ruby/ext/sasl/sasl.c b/ruby/ext/sasl/sasl.c
new file mode 100644
index 0000000000..2d4e40d30e
--- /dev/null
+++ b/ruby/ext/sasl/sasl.c
@@ -0,0 +1,472 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+
+#include <stdio.h>
+#include <unistd.h>
+#include <malloc.h>
+#include <string.h>
+#include <sasl/sasl.h>
+#include <ruby.h>
+
+static VALUE mSasl;
+
+#define INPUT_SIZE 512
+#define MECH_SIZE 32
+
+typedef void* sasl_context_t;
+
+#define QSASL_OK 0
+#define QSASL_CONTINUE 1
+#define QSASL_FAILED 2
+
+typedef struct {
+ char magic[8];
+ sasl_conn_t* conn;
+ sasl_callback_t callbacks[8];
+ char* userName;
+ char* password;
+ char* operUserName;
+ unsigned int minSsf;
+ unsigned int maxSsf;
+ char mechanism[MECH_SIZE];
+ char input[INPUT_SIZE];
+} context_t;
+
+//
+// Resolve forward references
+//
+static VALUE qsasl_free(int, VALUE*, VALUE);
+
+//
+// Validate an input string to ensure that it is either NULL or of reasonable size.
+//
+static int qsasl_valid(char* str)
+{
+ int idx;
+
+ if (str == 0)
+ return 1;
+
+ for (idx = 0; idx < INPUT_SIZE; idx++) {
+ if (str[idx] == '\0')
+ return 1;
+ }
+
+ return 0;
+}
+
+//
+// SASL callback for identity and authentication identity.
+//
+static int qsasl_cb_user(void* _context, int id, const char **result, unsigned *len)
+{
+ context_t* context = (context_t*) _context;
+
+ if (context->userName)
+ *result = context->userName;
+
+ return SASL_OK;
+}
+
+//
+// SASL callback for passwords.
+//
+static int qsasl_cb_password(sasl_conn_t* conn, void* _context, int id, sasl_secret_t **psecret)
+{
+ context_t* context = (context_t*) _context;
+ sasl_secret_t* secret;
+ size_t length;
+
+ if (context->password)
+ length = strlen(context->password);
+ else
+ length = 0;
+
+ secret = (sasl_secret_t*) malloc(sizeof(sasl_secret_t) + length);
+ secret->len = length;
+ if (length)
+ memcpy(secret->data, context->password, length);
+ *psecret = secret;
+
+ return SASL_OK;
+}
+
+//
+// Interactively prompt the user for authentication data.
+//
+static void qsasl_prompt(sasl_context_t _context, sasl_interact_t* interact)
+{
+ context_t* context = (context_t*) _context;
+ char *pass;
+ char *input;
+ char passwdPrompt[100];
+
+ if (interact->id == SASL_CB_PASS) {
+ strncpy(passwdPrompt, interact->prompt, 95);
+ strcat(passwdPrompt, ": ");
+ pass = getpass(passwdPrompt);
+ strncpy(context->input, pass, INPUT_SIZE - 1);
+ context->input[INPUT_SIZE - 1] = '\0';
+ } else {
+ printf(interact->prompt);
+ if (interact->defresult) {
+ printf(" (%s)", interact->defresult);
+ }
+ printf(": ");
+ input = fgets(context->input, INPUT_SIZE, stdin);
+ if (input != context->input) {
+ rb_raise(rb_eRuntimeError, "Unexpected EOF on interactive prompt");
+ }
+ }
+
+ interact->result = context->input;
+ interact->len = strlen(context->input);
+}
+
+//
+// Initialize the SASL client library.
+//
+static VALUE qsasl_client_init()
+{
+ int result;
+
+ result = sasl_client_init(0);
+ if (result != SASL_OK)
+ rb_raise(rb_eRuntimeError,
+ "sasl_client_init failed: %d - %s",
+ result, sasl_errstring(result, -0, 0));
+ return Qnil;
+}
+
+//
+// Allocate a new SASL client context.
+//
+static VALUE qsasl_client_new(int argc, VALUE *argv, VALUE obj)
+{
+ char* mechanism = 0;
+ char* serviceName = 0;
+ char* hostName = 0;
+ char* userName = 0;
+ char* password = 0;
+ unsigned int minSsf = 0;
+ unsigned int maxSsf = 65535;
+
+ int result;
+ int i = 0;
+ context_t *context;
+ sasl_security_properties_t secprops;
+
+ if (argc != 7)
+ rb_raise(rb_eRuntimeError, "Wrong number of arguments");
+
+ if (!NIL_P(argv[0]))
+ mechanism = StringValuePtr(argv[0]);
+ if (!NIL_P(argv[1]))
+ serviceName = StringValuePtr(argv[1]);
+ if (!NIL_P(argv[2]))
+ hostName = StringValuePtr(argv[2]);
+ if (!NIL_P(argv[3]))
+ userName = StringValuePtr(argv[3]);
+ if (!NIL_P(argv[4]))
+ password = StringValuePtr(argv[4]);
+ minSsf = FIX2INT(argv[5]);
+ maxSsf = FIX2INT(argv[6]);
+
+ if (!qsasl_valid(mechanism) || !qsasl_valid(serviceName) ||
+ !qsasl_valid(hostName) || !qsasl_valid(userName) ||
+ !qsasl_valid(password)) {
+ rb_raise(rb_eRuntimeError, "Invalid string argument");
+ }
+
+ context = (context_t*) malloc(sizeof(context_t));
+ memset(context, 0, sizeof(context_t));
+ strcpy(context->magic, "QSASL01");
+
+ context->minSsf = minSsf;
+ context->maxSsf = maxSsf;
+ if (mechanism != 0) {
+ strncpy(context->mechanism, mechanism, MECH_SIZE - 1);
+ context->mechanism[MECH_SIZE - 1] = '\0';
+ }
+
+ context->callbacks[i].id = SASL_CB_GETREALM;
+ context->callbacks[i].proc = 0;
+ context->callbacks[i++].context = 0;
+
+ if (userName != 0 && userName[0] != '\0') {
+ context->userName = (char*) malloc(strlen(userName) + 1);
+ strcpy(context->userName, userName);
+
+ context->callbacks[i].id = SASL_CB_USER;
+ context->callbacks[i].proc = qsasl_cb_user;
+ context->callbacks[i++].context = context;
+
+ context->callbacks[i].id = SASL_CB_AUTHNAME;
+ context->callbacks[i].proc = qsasl_cb_user;
+ context->callbacks[i++].context = context;
+ }
+
+ context->callbacks[i].id = SASL_CB_PASS;
+ if (password != 0 && password[0] != '\0') {
+ context->password = (char*) malloc(strlen(password) + 1);
+ strcpy(context->password, password);
+
+ context->callbacks[i].proc = qsasl_cb_password;
+ } else
+ context->callbacks[i].proc = 0;
+ context->callbacks[i++].context = context;
+
+ context->callbacks[i].id = SASL_CB_LIST_END;
+ context->callbacks[i].proc = 0;
+ context->callbacks[i++].context = 0;
+
+ result = sasl_client_new(serviceName, hostName, 0, 0,
+ context->callbacks, 0, &context->conn);
+
+ if (result != SASL_OK) {
+ context->conn = 0;
+ qsasl_free(1, (VALUE*) &context, Qnil);
+ rb_raise(rb_eRuntimeError, "sasl_client_new failed: %d - %s",
+ result, sasl_errstring(result, 0, 0));
+ }
+
+ secprops.min_ssf = minSsf;
+ secprops.max_ssf = maxSsf;
+ secprops.maxbufsize = 65535;
+ secprops.property_names = 0;
+ secprops.property_values = 0;
+ secprops.security_flags = 0;//TODO: provide means for application to configure these
+
+ result = sasl_setprop(context->conn, SASL_SEC_PROPS, &secprops);
+ if (result != SASL_OK) {
+ qsasl_free(1, (VALUE*) &context, Qnil);
+ rb_raise(rb_eRuntimeError, "sasl_setprop failed: %d - %s",
+ result, sasl_errdetail(context->conn));
+ }
+
+ return (VALUE) context;
+}
+
+//
+// Free a SASL client context.
+//
+static VALUE qsasl_free(int argc, VALUE *argv, VALUE obj)
+{
+ context_t* context;
+
+ if (argc == 1)
+ context = (context_t*) argv[0];
+ else
+ rb_raise(rb_eRuntimeError, "Wrong Number of Arguments");
+
+ if (context->conn)
+ sasl_dispose(&context->conn);
+ if (context->userName)
+ free(context->userName);
+ if (context->password)
+ free(context->password);
+ if (context->operUserName)
+ free(context->operUserName);
+ free(context);
+
+ return Qnil;
+}
+
+//
+// Start the SASL exchange from the client's point of view.
+//
+static VALUE qsasl_client_start(int argc, VALUE *argv, VALUE obj)
+{
+ context_t* context;
+ char* mechList;
+ char* mechToUse;
+ int result;
+ int propResult;
+ const char* response;
+ unsigned int len;
+ sasl_interact_t* interact = 0;
+ const char* chosen;
+ const char* operName;
+
+ if (argc == 2) {
+ context = (context_t*) argv[0];
+ mechList = StringValuePtr(argv[1]);
+ } else
+ rb_raise(rb_eRuntimeError, "Wrong Number of Arguments");
+
+ if (strlen(context->mechanism) == 0)
+ mechToUse = mechList;
+ else
+ mechToUse = context->mechanism;
+
+ do {
+ result = sasl_client_start(context->conn, mechToUse, &interact,
+ &response, &len, &chosen);
+ if (result == SASL_INTERACT) {
+ qsasl_prompt(context, interact);
+ }
+ } while (result == SASL_INTERACT);
+
+ if (result != SASL_OK && result != SASL_CONTINUE)
+ rb_raise(rb_eRuntimeError, "sasl_client_start failed: %d - %s",
+ result, sasl_errdetail(context->conn));
+
+ if (result == SASL_OK) {
+ propResult = sasl_getprop(context->conn, SASL_USERNAME, (const void**) &operName);
+ if (propResult == SASL_OK) {
+ context->operUserName = (char*) malloc(strlen(operName) + 1);
+ strcpy(context->operUserName, operName);
+ }
+ }
+
+ return rb_ary_new3(3, INT2NUM(result), rb_str_new(response, len), rb_str_new2(chosen));
+}
+
+//
+// Take a step in the SASL exchange (only needed for multi-challenge mechanisms).
+//
+static VALUE qsasl_client_step(int argc, VALUE *argv, VALUE obj)
+{
+ context_t* context;
+ VALUE challenge;
+ int result;
+ int propResult;
+ const char* response;
+ const char* operName;
+ unsigned int len;
+ sasl_interact_t* interact = 0;
+
+ if (argc == 2) {
+ context = (context_t*) argv[0];
+ challenge = argv[1];
+ }
+ else
+ rb_raise(rb_eRuntimeError, "Wrong Number of Arguments");
+
+ do {
+ result = sasl_client_step(context->conn,
+ RSTRING(challenge)->ptr, RSTRING(challenge)->len,
+ &interact, &response, &len);
+ if (result == SASL_INTERACT) {
+ qsasl_prompt(context, interact);
+ }
+ } while (result == SASL_INTERACT);
+
+ if (result != SASL_OK && result != SASL_CONTINUE)
+ return QSASL_FAILED;
+
+ if (result == SASL_OK) {
+ propResult = sasl_getprop(context->conn, SASL_USERNAME, (const void**) &operName);
+ if (propResult == SASL_OK) {
+ context->operUserName = (char*) malloc(strlen(operName) + 1);
+ strcpy(context->operUserName, operName);
+ }
+ }
+
+ return rb_ary_new3(2, INT2NUM(result), rb_str_new(response, len));
+}
+
+static VALUE qsasl_user_id(int argc, VALUE *argv, VALUE obj)
+{
+ context_t* context;
+
+ if (argc == 1) {
+ context = (context_t*) argv[0];
+ } else {
+ rb_raise(rb_eRuntimeError, "Wrong Number of Arguments");
+ }
+
+ if (context->operUserName)
+ return rb_str_new2(context->operUserName);
+
+ return Qnil;
+}
+
+//
+// Encode transport data for the security layer.
+//
+static VALUE qsasl_encode(int argc, VALUE *argv, VALUE obj)
+{
+ context_t* context;
+ VALUE clearText;
+ const char* outBuffer;
+ unsigned int outSize;
+ int result;
+
+ if (argc == 2) {
+ context = (context_t*) argv[0];
+ clearText = argv[1];
+ }
+ else
+ rb_raise(rb_eRuntimeError, "Wrong Number of Arguments");
+
+ result = sasl_encode(context->conn,
+ RSTRING(clearText)->ptr, RSTRING(clearText)->len,
+ &outBuffer, &outSize);
+ if (result != SASL_OK)
+ rb_raise(rb_eRuntimeError, "sasl_encode failed: %d - %s",
+ result, sasl_errdetail(context->conn));
+
+ return rb_str_new(outBuffer, outSize);
+}
+
+//
+// Decode transport data for the security layer.
+//
+static VALUE qsasl_decode(int argc, VALUE *argv, VALUE obj)
+{
+ context_t* context;
+ VALUE cipherText;
+ const char* outBuffer;
+ unsigned int outSize;
+ int result;
+
+ if (argc == 2) {
+ context = (context_t*) argv[0];
+ cipherText = argv[1];
+ }
+ else
+ rb_raise(rb_eRuntimeError, "Wrong Number of Arguments");
+
+ result = sasl_decode(context->conn,
+ RSTRING(cipherText)->ptr, RSTRING(cipherText)->len,
+ &outBuffer, &outSize);
+ if (result != SASL_OK)
+ rb_raise(rb_eRuntimeError, "sasl_decode failed: %d - %s",
+ result, sasl_errdetail(context->conn));
+
+ return rb_str_new(outBuffer, outSize);
+}
+
+//
+// Initialize the Sasl module.
+//
+void Init_sasl()
+{
+ mSasl = rb_define_module("Sasl");
+
+ rb_define_module_function(mSasl, "client_init", qsasl_client_init, -1);
+ rb_define_module_function(mSasl, "client_new", qsasl_client_new, -1);
+ rb_define_module_function(mSasl, "free", qsasl_free, -1);
+ rb_define_module_function(mSasl, "client_start", qsasl_client_start, -1);
+ rb_define_module_function(mSasl, "client_step", qsasl_client_step, -1);
+ rb_define_module_function(mSasl, "user_id", qsasl_user_id, -1);
+ rb_define_module_function(mSasl, "encode", qsasl_encode, -1);
+ rb_define_module_function(mSasl, "decode", qsasl_decode, -1);
+}
diff --git a/ruby/lib/qpid.rb b/ruby/lib/qpid.rb
new file mode 100644
index 0000000000..1c719e9b1d
--- /dev/null
+++ b/ruby/lib/qpid.rb
@@ -0,0 +1,41 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+module Qpid
+ def self.logger
+ @logger ||= {}
+ @logger
+ end
+end
+
+require "qpid/util"
+require "qpid/queue"
+require "qpid/packer"
+require "qpid/framer"
+require "qpid/codec"
+require 'qpid/datatypes'
+require 'qpid/spec010'
+require 'qpid/delegates'
+require 'qpid/invoker'
+require "qpid/assembler"
+require 'qpid/session'
+require "qpid/connection"
+require "qpid/spec"
+require 'qpid/queue'
+require 'qpid/qmf'
diff --git a/ruby/lib/qpid/assembler.rb b/ruby/lib/qpid/assembler.rb
new file mode 100644
index 0000000000..b768c3f195
--- /dev/null
+++ b/ruby/lib/qpid/assembler.rb
@@ -0,0 +1,148 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+module Qpid
+
+ class << self
+ attr_accessor :asm_logger
+ end
+
+ class Segment
+
+ attr_reader :type, :payload, :track, :channel
+ attr_accessor :id, :offset
+
+ def initialize(first, last, type, track, channel, payload)
+ @id = nil
+ @offset = nil
+ @first = first
+ @last = last
+ @type = type
+ @track = track
+ @channel = channel
+ @payload = payload
+ end
+
+ def first_segment? ; @first ; end
+
+ def last_segment? ; @last ; end
+
+ def decode(spec)
+ segs = spec[:segment_type]
+ choice = segs.enum.choices[type]
+ return method("decode_#{choice.name}").call(spec)
+ end
+
+ def decode_control(spec)
+ sc = StringCodec.new(spec, payload)
+ return sc.read_control()
+ end
+
+ def decode_command(spec)
+ sc = StringCodec.new(spec, payload)
+ hdr, cmd = sc.read_command()
+ cmd.id = id
+ return hdr, cmd
+ end
+
+ def decode_header(spec)
+ sc = StringCodec.new(spec, payload)
+ values = []
+ until sc.encoded.empty?
+ values << sc.read_struct32()
+ end
+ return values
+ end
+
+ def decode_body(spec)
+ payload
+ end
+
+ def append(frame)
+ @payload += frame.payload
+ end
+
+ def to_s
+ f = first_segment? ? 'F' : '.'
+ l = last_segment? ? 'L' : '.'
+ return "%s%s %s %s %s %s" % [f, l, @type,
+ @track, @channel, @payload.inspect]
+ end
+
+ end
+
+ class Assembler < Framer
+
+ def logger; Qpid::asm_logger; end
+
+ def initialize(sock, max_payload = Frame::MAX_PAYLOAD)
+ super(sock)
+ @max_payload = max_payload
+ @fragments = {}
+ end
+
+ def read_segment
+ loop do
+ frame = read_frame
+ key = [frame.channel, frame.track]
+ seg = @fragments[key]
+ unless seg
+ seg = Segment.new(frame.first_segment?,
+ frame.last_segment?,
+ frame.type, frame.track,
+ frame.channel, "")
+ @fragments[key] = seg
+ end
+
+ seg.append(frame)
+
+ if frame.last_frame?
+ @fragments.delete(key)
+ logger.debug("RECV #{seg}") if logger
+ return seg
+ end
+ end
+ end
+
+ def write_segment(segment)
+ remaining = segment.payload
+
+ first = true
+ while first or remaining
+ payload = remaining[0, @max_payload]
+ remaining = remaining[@max_payload, remaining.size]
+
+ flags = 0
+
+ flags |= FIRST_FRM if first
+ flags |= LAST_FRM unless remaining
+ flags |= FIRST_SEG if segment.first_segment?
+ flags |= LAST_SEG if segment.last_segment?
+
+ frame = Frame.new(flags, segment.type, segment.track,
+ segment.channel, payload)
+ write_frame(frame)
+
+ first = false
+ end
+
+ logger.debug("SENT #{segment}") if logger
+ end
+ end
+end
diff --git a/ruby/qpid/client.rb b/ruby/lib/qpid/client.rb
index f10f2e564b..ec3d100a9c 100644
--- a/ruby/qpid/client.rb
+++ b/ruby/lib/qpid/client.rb
@@ -21,7 +21,7 @@ require "thread"
require "qpid/peer"
require "qpid/queue"
-module Qpid
+module Qpid08
class Client
def initialize(host, port, spec, vhost = "/")
@@ -97,6 +97,7 @@ module Qpid
end
class ClientDelegate
+
include Delegate
def initialize(client)
diff --git a/ruby/lib/qpid/codec.rb b/ruby/lib/qpid/codec.rb
new file mode 100644
index 0000000000..a3b5d101c4
--- /dev/null
+++ b/ruby/lib/qpid/codec.rb
@@ -0,0 +1,457 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+require 'qpid/packer.rb'
+require 'iconv'
+
+module Qpid
+
+ class Codec
+
+ include Qpid::Packer
+
+ attr_reader :spec
+
+ def initialize(spec = "")
+ @spec = spec
+ end
+
+ def write_void(v)
+ unless v.nil?
+ raise Exception.new("void not nil: #{v}")
+ end
+ end
+
+ def read_void
+ return nil
+ end
+
+ def write_bit(b)
+ unless b
+ raise Exception.new("bit is nil: #{b}")
+ end
+ end
+
+ def read_bit
+ return true
+ end
+
+ def read_uint8
+ return unpack("C", 1)
+ end
+
+ def write_uint8(n)
+ return pack("C", n)
+ end
+
+ def read_int8
+ return unpack("c", 1)
+ end
+
+ def write_int8(n)
+ pack("c", n)
+ end
+
+ def read_char
+ return unpack("c", 1)
+ end
+
+ def write_char(c)
+ pack("c")
+ end
+
+ def read_boolean
+ return read_uint8 != 0
+ end
+
+ def write_boolean(b)
+ n = 0
+ n = 1 if b != 0
+ write_uint8(n)
+ end
+
+ def read_uint16
+ return unpack("n", 2)
+ end
+
+ def write_uint16(n)
+ pack("n", n)
+ end
+
+ def read_int16
+ # XXX: holy moly.. pack/unpack doesn't have signed network byte order. Crazy hackery.
+ val = unpack("n", 2)
+ val -= 2 ** 16 if val >= 2 ** 15
+ return val
+ end
+
+ def write_int16(n)
+ # XXX: Magically this one works even though it's not signed.
+ pack("n", n)
+ end
+
+ def read_uint32
+ return unpack("N", 4)
+ end
+
+ def write_uint32(n)
+ pack("N", n)
+ end
+
+ def read_int32
+ # Again no pack/unpack for signed int
+ return unpack("N", 4)
+ end
+
+ def write_int32(n)
+ # FIXME
+ pack("N", n)
+ end
+
+ def read_float
+ return unpack("g", 4)
+ end
+
+ def write_float(n)
+ pack("g", n)
+ end
+
+ def read_sequence_no
+ return read_uint32.to_serial
+ end
+
+ def write_sequence_no(n)
+ write_uint32(n.value)
+ end
+
+ def encode_64bit(num, signed = false)
+ b = []
+
+ if num < 0 && signed
+ num += 2 ** 64
+ end
+
+ (0..7).each do |c|
+ d = 7 - c
+ b[c] = (num & (0xff << d * 8)) >> d * 8
+ end
+ pack('C8', *b)
+ end
+
+
+ def decode_64bit(signed = false)
+ # Silly ruby pack/unpack does not implement 64 bit network byte order
+ # encode/decode.
+ a = unpack('C8', 8)
+ num = 0
+ (0..7).each do |c|
+ d = 7 - c
+ num |= a[c] << 8 * d
+ end
+
+ if signed && num >= 2 ** 63
+ num -= 2 ** 64
+ end
+ return num
+ end
+
+ def read_uint64
+ return decode_64bit
+ end
+
+ def write_uint64(n)
+ encode_64bit(n)
+ end
+
+ def read_int64
+ return decode_64bit(signed = true)
+ end
+
+ def write_int64(n)
+ encode_64bit(n, signed = true)
+ end
+
+ def read_datetime
+ return read_uint64
+ end
+
+ def write_datetime(n)
+ write_uint64(n)
+ end
+
+ def read_double
+ return unpack("G", 8)
+ end
+
+ def write_double(n)
+ pack("G", n)
+ end
+
+ def read_vbin8
+ # XXX
+ return read(read_uint8)
+ end
+
+ def write_vbin8(b)
+ # XXX
+ write_uint8(b.length)
+ write(b)
+ end
+
+ def read_str8
+ # FIXME: Check iconv.. I think this will throw if there are odd characters.
+ return Iconv.conv("ASCII", "UTF-8", read_vbin8)
+ end
+
+ def write_str8(s)
+ write_vbin8(Iconv.conv("UTF-8", "ASCII", s))
+ end
+
+ def read_str16
+ return Iconv.conv("ASCII", "UTF-8", read_vbin16)
+ end
+
+ def write_str16(s)
+ write_vbin16(Iconv.conv("UTF-8", "ASCII", s))
+ end
+
+ def read_vbin16
+ # XXX: Using read method?
+ return read(read_uint16)
+ end
+
+ def write_vbin16(b)
+ write_uint16(b.length)
+ write(b)
+ end
+
+ def read_sequence_set
+ # FIXME: Need datatypes
+ result = RangedSet.new
+ size = read_uint16
+ nranges = size / 8
+ nranges.times do |i|
+ lower = read_sequence_no
+ upper = read_sequence_no
+ result.add(lower, upper)
+ end
+ return result
+ end
+
+ def write_sequence_set(ss)
+ size = 8 * ss.ranges.length
+ write_uint16(size)
+ ss.ranges.each do |range|
+ write_sequence_no(range.lower)
+ write_sequence_no(range.upper)
+ end
+ end
+
+ def read_vbin32
+ return read(read_uint32)
+ end
+
+ def write_vbin32(b)
+ write_uint32(b.length)
+ write(b)
+ end
+
+ def write_map(m)
+ sc = StringCodec.new(@spec)
+ unless m.nil?
+ sc.write_uint32(m.size)
+ m.each do |k, v|
+ unless type = @spec.encoding(v.class)
+ raise Exception.new("no encoding for: #{v.class}")
+ end
+ sc.write_str8(k)
+ sc.write_uint8(type.code)
+ type.encode(sc, v)
+ end
+ end
+ write_vbin32(sc.encoded)
+ end
+
+ def read_map
+ sc = StringCodec.new(@spec, read_vbin32)
+ return nil unless sc.encoded
+ count = sc.read_uint32
+ result = nil
+ if count
+ result = {}
+ until sc.encoded.empty?
+ k = sc.read_str8
+ code = sc.read_uint8
+ type = @spec.types[code]
+ v = type.decode(sc)
+ result[k] = v
+ end
+ end
+ return result
+ end
+
+ def write_array(a)
+ sc = StringCodec.new(@spec)
+ unless a.nil?
+ if a.length > 0
+ type = @spec.encoding(a[0].class)
+ else
+ type = @spec.encoding(nil.class)
+ end
+ sc.write_uint8(type.code)
+ sc.write_uint32(a.size)
+ a.each { |o| type.encode(sc, o) }
+ end
+ write_vbin32(sc.encoded)
+ end
+
+ def read_array
+ sc = StringCodec.new(@spec, read_vbin32)
+ return nil if not sc.encoded
+ type = @spec.types[sc.read_uint8]
+ count = sc.read_uint32
+ result = nil
+ if count
+ result = []
+ count.times { |i| result << (type.decode(sc)) }
+ end
+ return result
+ end
+
+ def write_list(l)
+ sc = StringCodec.new(@spec)
+ unless l.nil?
+ sc.write_uint32(l.length)
+ l.each do |o|
+ type = @spec.encoding(o.class)
+ sc.write_uint8(type.code)
+ type.encode(sc, o)
+ end
+ end
+ write_vbin32(sc.encoded)
+ end
+
+ def read_list
+ sc = StringCodec.new(@spec, read_vbin32)
+ return nil if not sc.encoded
+ count = sc.read_uint32
+ result = nil
+ if count
+ result = []
+ count.times do |i|
+ type = @spec.types[sc.read_uint8]
+ result << type.decode(sc)
+ end
+ end
+ return result
+ end
+
+ def read_struct32
+ size = read_uint32
+ code = read_uint16
+ type = @spec.structs[code]
+ # XXX: BLEH!
+ fields = type.decode_fields(self)
+ return Qpid::struct(type, fields)
+ end
+
+ def write_struct32(value)
+ type = value.st_type
+ sc = StringCodec.new(@spec)
+ sc.write_uint16(type.code)
+ type.encode_fields(sc, value)
+ write_vbin32(sc.encoded)
+ end
+
+ def read_control
+ cntrl = @spec.controls[read_uint16]
+ return Qpid::struct(cntrl, cntrl.decode_fields(self))
+ end
+
+ def write_control(ctrl)
+ type = ctrl.st_type
+ write_uint16(type.code)
+ type.encode_fields(self, ctrl)
+ end
+
+ def read_command
+ type = @spec.commands[read_uint16]
+ hdr = @spec[:header].decode(self)
+ cmd = Qpid::struct(type, type.decode_fields(self))
+ return hdr, cmd
+ end
+
+ def write_command(hdr, cmd)
+ type = cmd.st_type
+ write_uint16(type.code)
+ hdr.st_type.encode(self, hdr)
+ type.encode_fields(self, cmd)
+ end
+
+ def read_size(width)
+ if width > 0
+ return send(:"read_uint#{width * 8}")
+ end
+ end
+
+ def write_size(width, n)
+ if width > 0
+ send(:"write_uint#{width * 8}", n)
+ end
+ end
+
+ def read_uuid
+ return unpack("a16", 16)
+ end
+
+ def write_uuid(s)
+ pack("a16", s)
+ end
+
+ def read_bin128
+ return unpack("a16", 16)
+ end
+
+ def write_bin128(b)
+ pack("a16", b)
+ end
+
+ end
+
+ class StringCodec < Codec
+
+ def initialize(spec, encoded = "")
+ @spec = spec
+ @encoded = encoded
+ end
+
+ attr_reader :encoded
+
+ def write(s)
+ @encoded += s
+ end
+
+ def read(n)
+ return "" if n.nil?
+ result = @encoded[0...n]
+ @encoded = @encoded[n...@encoded.size] || ""
+ return result
+ end
+ end
+end
diff --git a/ruby/qpid/codec.rb b/ruby/lib/qpid/codec08.rb
index d1ecd2783f..148dee07bb 100644
--- a/ruby/qpid/codec.rb
+++ b/ruby/lib/qpid/codec08.rb
@@ -17,7 +17,7 @@
# under the License.
#
-module Codec
+module Qpid08
# is there a better way to do this?
class StringWriter
@@ -119,7 +119,7 @@ module Codec
def write(str)
flushbits()
@out.write(str)
-# puts "OUT #{str.inspect()}"
+ # puts "OUT #{str.inspect()}"
end
def pack(fmt, *args)
@@ -246,7 +246,7 @@ module Codec
if result.nil? or result.empty?
raise EOF.new()
else
-# puts " IN #{result.inspect()}"
+ # puts " IN #{result.inspect()}"
return result
end
end
diff --git a/ruby/qpid/queue.rb b/ruby/lib/qpid/config.rb
index 350310882f..1a0942d5d5 100644
--- a/ruby/qpid/queue.rb
+++ b/ruby/lib/qpid/config.rb
@@ -6,9 +6,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -17,36 +17,17 @@
# under the License.
#
-require "thread"
-
module Qpid
-
- class Closed < Exception; end
-
- class Queue < Queue
-
- @@END = Object.new()
-
- def close()
- # sentinal to indicate the end of the queue
- self << @@END
- end
-
- def pop(*args)
- result = super(*args)
- if @@END.equal? result
- # we put another sentinal on the end in case there are
- # subsequent calls to pop by this or other threads
- self << @@END
- raise Closed.new()
- else
- return result
+ module Config
+
+ def self.amqp_spec
+ dirs = [File::expand_path(File::join(File::dirname(__FILE__), "../../../specs")),
+ "/usr/share/amqp"]
+ dirs.each do |d|
+ spec = File::join(d, "amqp.0-10-qpid-errata.xml")
+ return spec if File::exists? spec
end
end
- alias shift pop
- alias deq pop
-
end
-
end
diff --git a/ruby/lib/qpid/connection.rb b/ruby/lib/qpid/connection.rb
new file mode 100644
index 0000000000..d2efbfb263
--- /dev/null
+++ b/ruby/lib/qpid/connection.rb
@@ -0,0 +1,222 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+require 'monitor'
+
+module Qpid
+
+ class ChannelBusy< Exception ; end
+
+ class ChannelsBusy < Exception ; end
+
+ class SessionBusy < Exception ; end
+
+ class ConnectionFailed < Exception ; end
+
+ class Timeout < Exception ; end
+
+ class Connection < Assembler
+
+ include MonitorMixin
+
+ attr_reader :spec, :attached, :sessions, :thread
+ attr_accessor :opened, :failed, :close_code, :user_id
+
+ def initialize(sock, args={})
+ super(sock)
+
+ delegate = args[:delegate] || Qpid::Delegate::Client.method(:new)
+ spec = args[:spec] || nil
+
+ @spec = Qpid::Spec010::load(spec)
+ @track = @spec["track"]
+
+ @attached = {}
+ @sessions = {}
+
+ @condition = new_cond
+ @opened = false
+ @failed = false
+ @close_code = [nil, "connection aborted"]
+
+ @thread = nil
+
+ @channel_max = 65535
+ @user_id = nil
+
+ @delegate = delegate.call(self, args)
+ end
+
+ def attach(name, ch, delegate, force=false)
+ synchronize do
+ ssn = @attached[ch.id]
+ if ssn
+ raise ChannelBusy.new(ch, ssn) unless ssn.name == name
+ else
+ ssn = @sessions[name]
+ if ssn.nil?
+ ssn = Session.new(name, @spec, :delegate => delegate)
+ @sessions[name] = ssn
+ elsif ssn.channel
+ if force
+ @attached.delete(ssn.channel.id)
+ ssn.channel = nil
+ else
+ raise SessionBusy.new(ssn)
+ end
+ end
+ @attached[ch.id] = ssn
+ ssn.channel = ch
+ end
+ ch.session = ssn
+ return ssn
+ end
+ end
+
+ def detach(name, ch)
+ synchronize do
+ @attached.delete(ch.id)
+ ssn = @sessions.delete(name)
+ if ssn
+ ssn.channel = nil
+ ssn.closed
+ return ssn
+ end
+ end
+ end
+
+ def session(name, kwargs = {})
+ timeout = kwargs[:timeout]
+ delegate = kwargs[:delegate] || Qpid::Session::Client.method(:new)
+
+ # FIXME: Python has cryptic comment about 'ch 0 ?'
+ channel = (0..@channel_max).detect { |i| ! @attached.key?(i) }
+ raise ChannelsBusy unless channel
+
+ synchronize do
+ ch = Channel.new(self, channel)
+ ssn = attach(name, ch, delegate)
+ ssn.channel.session_attach(name)
+ if ssn.wait_for(timeout) { ssn.channel }
+ return ssn
+ else
+ detach(name, ch)
+ raise Timeout
+ end
+ end
+ end
+
+ def detach_all
+ synchronize do
+ attached.values.each do |ssn|
+ ssn.exceptions << @close_code unless @close_code[0] == 200
+ detach(ssn.name, ssn.channel)
+ end
+ end
+ end
+
+ def start(timeout=nil)
+ @delegate.start
+ @thread = Thread.new { run }
+ @thread[:name] = 'conn'
+ synchronize do
+ unless @condition.wait_for(timeout) { @opened || @failed }
+ raise Timeout
+ end
+ end
+ if @failed
+ raise ConnectionFailed.new(@close_code)
+ end
+ end
+
+ def run
+ # XXX: we don't really have a good way to exit this loop without
+ # getting the other end to kill the socket
+ loop do
+ begin
+ seg = read_segment
+ rescue Qpid::Closed => e
+ detach_all
+ break
+ end
+ @delegate.received(seg)
+ end
+ end
+
+ def close(timeout=nil)
+ return unless @opened
+ Channel.new(self, 0).connection_close(200)
+ synchronize do
+ unless @condition.wait_for(timeout) { ! @opened }
+ raise Timeout
+ end
+ end
+ @thread.join(timeout)
+ @thread = nil
+ end
+
+ def signal
+ synchronize { @condition.signal }
+ end
+
+ def to_s
+ # FIXME: We'd like to report something like HOST:PORT
+ return @sock.to_s
+ end
+
+ class Channel < Invoker
+
+ attr_reader :id, :connection
+ attr_accessor :session
+
+ def initialize(connection, id)
+ @connection = connection
+ @id = id
+ @session = nil
+ end
+
+ def resolve_method(name)
+ inst = @connection.spec[name]
+ if inst.is_a?(Qpid::Spec010::Control)
+ return invocation(:method, inst)
+ else
+ return invocation(:error, nil)
+ end
+ end
+
+ def invoke(type, args)
+ ctl = type.create(*args)
+ sc = StringCodec.new(@connection.spec)
+ sc.write_control(ctl)
+ @connection.write_segment(Segment.new(true, true, type.segment_type,
+ type.track, self.id, sc.encoded))
+
+ log = Qpid::logger["qpid.io.ctl"]
+ log.debug("SENT %s", ctl) if log
+ end
+
+ def to_s
+ return "#{@connection}[#{@id}]"
+ end
+
+ end
+
+ end
+
+end
diff --git a/ruby/qpid/connection.rb b/ruby/lib/qpid/connection08.rb
index f6ee9cf1e4..09a4888cc4 100644
--- a/ruby/qpid/connection.rb
+++ b/ruby/lib/qpid/connection08.rb
@@ -18,11 +18,9 @@
#
require "socket"
-require "qpid/codec"
+require "qpid/codec08"
-include Codec
-
-module Qpid
+module Qpid08
class Connection
@@ -48,7 +46,7 @@ module Qpid
end
def write(frame)
-# puts "OUT #{frame.inspect()}"
+ # puts "OUT #{frame.inspect()}"
@out.octet(@spec.constants[frame.payload.type].id)
@out.short(frame.channel)
frame.payload.encode(@out)
@@ -64,7 +62,7 @@ module Qpid
raise Exception.new("framing error: expected #{frame_end}, got #{oct}")
end
frame = Frame.new(channel, payload)
-# puts " IN #{frame.inspect}"
+ # puts " IN #{frame.inspect}"
return frame
end
diff --git a/ruby/lib/qpid/datatypes.rb b/ruby/lib/qpid/datatypes.rb
new file mode 100644
index 0000000000..418388c73a
--- /dev/null
+++ b/ruby/lib/qpid/datatypes.rb
@@ -0,0 +1,353 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+module Qpid
+
+ def self.struct(type, *args)
+ # FIXME: This is fragile; the last arg could be a hash,
+ # without being hte keywords
+ kwargs = {}
+ kwargs = args.pop if args.any? && args[-1].is_a?(Hash)
+
+ if args.size > type.fields.size
+ raise TypeError,
+ "%s() takes at most %d arguments (%d given)" %
+ [type.name, type.fields.size, args.size]
+ end
+
+ attrs = type.fields.inject({}) do |attrs, field|
+ if args.any?
+ attrs[field.name] = args.shift
+ if kwargs.key?(field.name)
+ raise TypeError,
+ "%s() got multiple values for keyword argument '%s'" %
+ [type.name, field.name]
+ end
+ elsif kwargs.key?(field.name)
+ attrs[field.name] = kwargs.delete(field.name)
+ else
+ attrs[field.name] = field.default
+ end
+ attrs
+ end
+
+ unless kwargs.empty?
+ unexpected = kwargs.keys[0]
+ raise TypeError,
+ "%s() got an unexpected keyword argument '%s'" %
+ [type.name, unexpected]
+ end
+
+ attrs[:st_type] = type
+ attrs[:id] = nil
+
+ name = "Qpid_" + type.name.to_s.capitalize
+ unless ::Struct.const_defined?(name)
+ vars = type.fields.collect { |f| f.name } << :st_type << :id
+ ::Struct.new(name, *vars)
+ end
+ st = ::Struct.const_get(name)
+
+ result = st.new
+ attrs.each { |k, v| result[k] = v }
+ return result
+ end
+
+ class Message
+
+ attr_accessor :headers, :body, :id
+
+ def initialize(*args)
+ @body = nil
+ @headers = nil
+
+ @body = args.pop unless args.empty?
+ @headers = args unless args.empty?
+
+ @id = nil
+ end
+
+ def has(name)
+ return ! get(name).nil?
+ end
+
+ def get(name)
+ if @headers
+ name = name.to_sym
+ @headers.find { |h| h.st_type.name == name }
+ end
+ end
+
+ def set(header)
+ @headers ||= []
+ if h = @headers.find { |h| h.st_type == header.st_type }
+ ind = @headers.index(h)
+ @headers[ind] = header
+ else
+ @headers << header
+ end
+ end
+
+ def clear(name)
+ if @headers
+ name = name.to_sym
+ @headers.delete_if { |h| h.st_type.name == name }
+ end
+ end
+
+ # FIXME: Not sure what to do here
+ # Ruby doesn't have a notion of a evaluable string representation
+ # def __repr__(self):
+ # args = []
+ # if self.headers:
+ # args.extend(map(repr, self.headers))
+ # if self.body:
+ # args.append(repr(self.body))
+ # if self.id is not None:
+ # args.append("id=%s" % self.id)
+ # return "Message(%s)" % ", ".join(args)
+ # end
+ end
+
+ class ::Object
+
+ def to_serial
+ Qpid::Serial.new(self)
+ end
+ end
+
+ class Serial
+
+ include Comparable
+
+ attr_accessor :value
+
+ def initialize(value)
+ @value = value & 0xFFFFFFFF
+ end
+
+ def hash
+ @value.hash
+ end
+
+ def to_serial
+ self
+ end
+
+ def eql?(other)
+ other = other.to_serial
+ value.eql?(other.value)
+ end
+
+ def <=>(other)
+ return 1 if other.nil?
+
+ other = other.to_serial
+
+ delta = (value - other.value) & 0xFFFFFFFF
+ neg = delta & 0x80000000
+ mag = delta & 0x7FFFFFFF
+
+ return (neg>0) ? -mag : mag
+ end
+
+ def +(other)
+ result = other.to_serial
+ result.value += value
+ return result
+ end
+
+ def -(other)
+ result = other.to_serial
+ result.value = value - result.value
+ return result
+ end
+
+ def succ
+ Serial.new(value + 1)
+ end
+
+ # FIXME: Not sure what to do here
+ # Ruby doesn't have a notion of a evaluable string representation
+ # def __repr__(self):
+ # return "serial(%s)" % self.value
+ # end
+
+ def to_s
+ value.to_s
+ end
+
+ end
+
+ # The Python class datatypes.Range is emulated by the standard
+ # Range class with a few additions
+ class ::Range
+
+ alias :lower :begin
+ alias :upper :end
+
+ def touches(r)
+ # XXX: are we doing more checks than we need?
+ return (r.include?(lower - 1) ||
+ r.include?(upper + 1) ||
+ include?(r.lower - 1) ||
+ include?(r.upper + 1) ||
+ r.include?(lower) ||
+ r.include?(upper) ||
+ include?(r.lower) ||
+ include?(r.upper))
+ end
+
+ def span(r)
+ Range.new([lower, r.lower].min, [upper, r.upper].max)
+ end
+
+ def intersect(r)
+ l = [lower, r.lower].max
+ u = [upper, r.upper].min
+ return l > u ? nil : Range.new(l, u)
+ end
+
+ end
+
+ class RangedSet
+
+ include Enumerable
+
+ attr_accessor :ranges
+
+ def initialize(*args)
+ @ranges = []
+ args.each { |n| add(n) }
+ end
+
+ def each(&block)
+ ranges.each { |r| yield(r) }
+ end
+
+ def include?(n)
+ if (n.is_a?(Range))
+ super(n)
+ else
+ ranges.find { |r| r.include?(n) }
+ end
+ end
+
+ def add_range(range)
+ ranges.delete_if do |r|
+ if range.touches(r)
+ range = range.span(r)
+ true
+ else
+ false
+ end
+ end
+ ranges << range
+ end
+
+ def add(lower, upper = nil)
+ upper = lower if upper.nil?
+ add_range(Range.new(lower, upper))
+ end
+
+ def to_s
+ repr = ranges.sort { |a,b| b.lower <=> a.lower }.
+ map { |r| r.to_s }.join(",")
+ "<RangedSet: {#{repr}}"
+ end
+ end
+
+ class Future
+ def initialize(initial=nil, exception=Exception)
+ @value = initial
+ @error = nil
+ @set = Util::Event.new
+ @exception = exception
+ end
+
+ def error(error)
+ @error = error
+ @set.set
+ end
+
+ def set(value)
+ @value = value
+ @set.set
+ end
+
+ def get(timeout=nil)
+ @set.wait(timeout)
+ unless @error.nil?
+ raise @exception.new(@error)
+ end
+ @value
+ end
+ end
+
+ class UUID
+ include Comparable
+
+ attr_accessor :bytes
+
+ def initialize(bytes)
+ @bytes = bytes
+ end
+
+ def <=>(other)
+ if other.respond_to?(:bytes)
+ return bytes <=> other.bytes
+ else
+ raise NotImplementedError
+ end
+ end
+
+ def to_s
+ UUID::format(bytes)
+ end
+
+ # FIXME: Not sure what to do here
+ # Ruby doesn't have a notion of a evaluable string representation
+ # def __repr__(self):
+ # return "UUID(%r)" % str(self)
+ # end
+
+ def self.random_uuid
+ bytes = (1..16).collect { |i| rand(256) }
+
+ # From RFC4122, the version bits are set to 0100
+ bytes[7] &= 0x0F
+ bytes[7] |= 0x40
+
+ # From RFC4122, the top two bits of byte 8 get set to 01
+ bytes[8] &= 0x3F
+ bytes[8] |= 0x80
+ return bytes.pack("C16")
+ end
+
+ def self.uuid4
+ UUID.new(random_uuid)
+ end
+
+ def self.format(s)
+ # Python format !LHHHHL
+ # big-endian, ulong, ushort x 4, ulong
+ "%08x-%04x-%04x-%04x-%04x%08x" % s.unpack("NnnnnN")
+ end
+ end
+end
diff --git a/ruby/lib/qpid/delegates.rb b/ruby/lib/qpid/delegates.rb
new file mode 100644
index 0000000000..f779047e05
--- /dev/null
+++ b/ruby/lib/qpid/delegates.rb
@@ -0,0 +1,237 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+require 'rbconfig'
+require 'sasl'
+
+module Qpid
+
+ class Delegate
+
+ def initialize(connection, args={})
+ @connection = connection
+ @spec = connection.spec
+ @delegate = args[:delegate] || Qpid::Delegate::Client.method(:new)
+ @control = @spec[:track].enum[:control].value
+ end
+
+ def log ; Qpid::logger["qpid.io.ctl"]; end
+
+ def received(seg)
+ ssn = @connection.attached[seg.channel]
+ unless ssn
+ ch = Qpid::Connection::Channel.new(@connection, seg.channel)
+ else
+ ch = ssn.channel
+ end
+
+ if seg.track == @control
+ ctl = seg.decode(@spec)
+ log.debug("RECV %s", ctl) if log
+ attr = ctl.st_type.name
+ method(attr).call(ch, ctl)
+ elsif ssn.nil?
+ ch.session_detached
+ else
+ ssn.received(seg)
+ end
+ end
+
+ def connection_close(ch, close)
+ @connection.close_code = [close.reply_code, close.reply_text]
+ ch.connection_close_ok
+ @connection.sock.close_write()
+ unless @connection.opened
+ @connection.failed = true
+ @connection.signal
+ end
+ end
+
+ def connection_close_ok(ch, close_ok)
+ @connection.opened = false
+ @connection.signal
+ end
+
+ def session_attach(ch, a)
+ begin
+ @connection.attach(a.name, ch, @delegate, a.force)
+ ch.session_attached(a.name)
+ rescue Qpid::ChannelBusy
+ ch.session_detached(a.name)
+ rescue Qpid::SessionBusy
+ ch.session_detached(a.name)
+ end
+ end
+
+ def session_attached(ch, a)
+ ch.session.signal
+ end
+
+ def session_detach(ch, d)
+ #send back the confirmation of detachment before removing the
+ #channel from the attached set; this avoids needing to hold the
+ #connection lock during the sending of this control and ensures
+ #that if the channel is immediately reused for a new session the
+ #attach request will follow the detached notification.
+ ch.session_detached(d.name)
+ ssn = @connection.detach(d.name, ch)
+ end
+
+ def session_detached(ch, d)
+ @connection.detach(d.name, ch)
+ end
+
+ def session_request_timeout(ch, rt)
+ ch.session_timeout(rt.timeout)
+ end
+
+ def session_command_point(ch, cp)
+ ssn = ch.session
+ ssn.receiver.next_id = cp.command_id
+ ssn.receiver.next_offset = cp.command_offset
+ end
+
+ def session_completed(ch, cmp)
+ ch.session.sender.has_completed(cmp.commands)
+ if cmp.timely_reply
+ ch.session_known_completed(cmp.commands)
+ end
+ ch.session.signal
+ end
+
+ def session_known_completed(ch, kn_cmp)
+ ch.session.receiver.known_completed(kn_cmp.commands)
+ end
+
+ def session_flush(ch, f)
+ rcv = ch.session.receiver
+ if f.expected
+ if rcv.next_id
+ exp = Qpid::RangedSet.new(rcv.next_id)
+ else
+ exp = nil
+ end
+ ch.session_expected(exp)
+ end
+ if f.confirmed
+ ch.session_confirmed(rcv.completed)
+ end
+ if f.completed
+ ch.session_completed(rcv.completed)
+ end
+ end
+
+ class Server < Delegate
+
+ def start
+ @connection.read_header()
+ @connection.write_header(@spec.major, @spec.minor)
+ ch = Qpid::Connection::Channel.new(@connection, 0)
+ ch.connection_start(:mechanisms => ["ANONYMOUS"])
+ ch
+ end
+
+ def connection_start_ok(ch, start_ok)
+ ch.connection_tune(:channel_max => 65535)
+ end
+
+ def connection_tune_ok(ch, tune_ok)
+ nil
+ end
+
+ def connection_open(ch, open)
+ @connection.opened = true
+ ch.connection_open_ok()
+ @connection.signal
+ end
+ end
+
+ class Client < Delegate
+
+ # FIXME: Python uses os.name for platform - we don't have an exact
+ # analog in Ruby
+ PROPERTIES = {"product" => "qpid python client",
+ "version" => "development",
+ "platform" => Config::CONFIG["build_os"],
+ "qpid.client_process" => File.basename($0),
+ "qpid.client_pid" => Process.pid,
+ "qpid.client_ppid" => Process.ppid}
+
+
+ def initialize(connection, args)
+ super(connection)
+
+ result = Sasl::client_init
+
+ @mechanism= args[:mechanism]
+ @username = args[:username]
+ @password = args[:password]
+ @service = args[:service] || "qpidd"
+ @min_ssf = args[:min_ssf] || 0
+ @max_ssf = args[:max_ssf] || 65535
+
+ @saslConn = Sasl.client_new(@mechanism, @service, args[:host],
+ @username, @password, @min_ssf, @max_ssf)
+ end
+
+ def start
+ @connection.write_header(@spec.major, @spec.minor)
+ @connection.read_header
+ end
+
+ def connection_start(ch, start)
+ mech_list = ""
+ start.mechanisms.each do |m|
+ mech_list += m + " "
+ end
+ begin
+ resp = Sasl.client_start(@saslConn, mech_list)
+ @connection.user_id = Sasl.user_id(@saslConn)
+ ch.connection_start_ok(:client_properties => PROPERTIES,
+ :mechanism => resp[2],
+ :response => resp[1])
+ rescue exception
+ ch.connection_close(:message => $!.message)
+ @connection.failed = true
+ @connection.signal
+ end
+ end
+
+ def connection_secure(ch, secure)
+ resp = Sasl.client_step(@saslConn, secure.challenge)
+ @connection.user_id = Sasl.user_id(@saslConn)
+ ch.connection_secure_ok(:response => resp[1])
+ end
+
+ def connection_tune(ch, tune)
+ ch.connection_tune_ok(:channel_max => tune.channel_max,
+ :max_frame_size => tune.max_frame_size,
+ :heartbeat => 0)
+ ch.connection_open()
+ @connection.security_layer_tx = @saslConn
+ end
+
+ def connection_open_ok(ch, open_ok)
+ @connection.security_layer_rx = @saslConn
+ @connection.opened = true
+ @connection.signal
+ end
+ end
+ end
+end
diff --git a/ruby/qpid/fields.rb b/ruby/lib/qpid/fields.rb
index 91484af850..cc87d07529 100644
--- a/ruby/qpid/fields.rb
+++ b/ruby/lib/qpid/fields.rb
@@ -6,9 +6,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -26,7 +26,7 @@ class Class
if respond_to? :init
init(*args) {|*a| yield(*a)}
elsif args.any?
- raise ArgumentException.new("extra arguments: #{args}")
+ raise ArgumentError, "extra arguments: #{args.inspect}"
end
end
}
diff --git a/ruby/lib/qpid/framer.rb b/ruby/lib/qpid/framer.rb
new file mode 100644
index 0000000000..d057605383
--- /dev/null
+++ b/ruby/lib/qpid/framer.rb
@@ -0,0 +1,212 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+require 'monitor'
+require 'logger'
+require 'sasl'
+
+module Qpid
+
+ FIRST_SEG = 0x08
+ LAST_SEG = 0x04
+ FIRST_FRM = 0x02
+ LAST_FRM = 0x01
+
+ class << self
+ attr_accessor :raw_logger, :frm_logger
+ end
+
+ def self.packed_size(format)
+ # FIXME: This is a total copout to simulate Python's
+ # struct.calcsize
+ ([0]*256).pack(format).size
+ end
+
+ class Frame
+ attr_reader :payload, :track, :flags, :type, :channel
+
+ # HEADER = "!2BHxBH4x"
+ # Python Meaning Ruby
+ # ! big endian (implied by format char)
+ # 2B 2 uchar C2
+ # H unsigned short n
+ # x pad byte x
+ # B uchar C
+ # H unsigned short n
+ # 4x pad byte x4
+ HEADER = "C2nxCnx4"
+ HEADER_SIZE = Qpid::packed_size(HEADER)
+ MAX_PAYLOAD = 65535 - HEADER_SIZE
+
+ def initialize(flags, type, track, channel, payload)
+ if payload.size > MAX_PAYLOAD
+ raise ArgumentError, "max payload size exceeded: #{payload.size}"
+ end
+
+ @flags = flags
+ @type = type
+ @track = track
+ @channel = channel
+ @payload = payload
+ end
+
+ def first_segment? ; FIRST_SEG & @flags > 0 ; end
+
+ def last_segment? ; LAST_SEG & @flags > 0 ; end
+
+ def first_frame? ; FIRST_FRM & @flags > 0 ; end
+
+ def last_frame? ; LAST_FRM & @flags > 0 ; end
+
+ def to_s
+ fs = first_segment? ? 'S' : '.'
+ ls = last_segment? ? 's' : '.'
+ ff = first_frame? ? 'F' : '.'
+ lf = last_frame? ? 'f' : '.'
+
+ return "%s%s%s%s %s %s %s %s" % [fs, ls, ff, lf,
+ @type,
+ @track,
+ @channel,
+ @payload.inspect]
+ end
+ end
+
+ class FramingError < Exception ; end
+
+ class Closed < Exception ; end
+
+ class Framer
+ include Packer
+
+ # Python: "!4s4B"
+ HEADER = "a4C4"
+ HEADER_SIZE = 8
+
+ def raw
+ Qpid::raw_logger
+ end
+
+ def frm
+ Qpid::frm_logger
+ end
+
+ def initialize(sock)
+ @sock = sock
+ @sock.extend(MonitorMixin)
+ @tx_buf = ""
+ @rx_buf = ""
+ @security_layer_tx = nil
+ @security_layer_rx = nil
+ @maxbufsize = 65535
+ end
+
+ attr_reader :sock
+ attr_accessor :security_layer_tx, :security_layer_rx
+
+ def aborted? ; false ; end
+
+ def write(buf)
+ @tx_buf += buf
+ end
+
+ def flush
+ @sock.synchronize do
+ if @security_layer_tx
+ cipher_buf = Sasl.encode(@security_layer_tx, @tx_buf)
+ _write(cipher_buf)
+ else
+ _write(@tx_buf)
+ end
+ @tx_buf = ""
+ frm.debug("FLUSHED") if frm
+ end
+ rescue
+ @sock.close unless @sock.closed?
+ end
+
+ def _write(buf)
+ while buf && buf.size > 0
+ # FIXME: Catch errors
+ n = @sock.write(buf)
+ raw.debug("SENT #{buf[0, n].inspect}") if raw
+ buf[0,n] = ""
+ @sock.flush
+ end
+ end
+
+ def read(n)
+ while @rx_buf.size < n
+ begin
+ s = @sock.recv(@maxbufsize)
+ if @security_layer_rx
+ s = Sasl.decode(@security_layer_rx, s)
+ end
+ rescue IOError => e
+ raise e if @rx_buf != ""
+ @sock.close unless @sock.closed?
+ raise Closed
+ end
+ # FIXME: Catch errors
+ if s.nil? or s.size == 0
+ @sock.close unless @sock.closed?
+ raise Closed
+ end
+ @rx_buf += s
+ raw.debug("RECV #{n}/#{@rx_buf.size} #{s.inspect}") if raw
+ end
+ data = @rx_buf[0, n]
+ @rx_buf = @rx_buf[n, @rx_buf.size - n]
+ return data
+ end
+
+ def read_header
+ unpack(Framer::HEADER, Framer::HEADER_SIZE)
+ end
+
+ def write_header(major, minor)
+ @sock.synchronize do
+ pack(Framer::HEADER, "AMQP", 1, 1, major, minor)
+ flush()
+ end
+ end
+
+ def write_frame(frame)
+ @sock.synchronize do
+ size = frame.payload.size + Frame::HEADER_SIZE
+ track = frame.track & 0x0F
+ pack(Frame::HEADER, frame.flags, frame.type, size, track, frame.channel)
+ write(frame.payload)
+ if frame.last_segment? and frame.last_frame?
+ flush()
+ frm.debug("SENT #{frame}") if frm
+ end
+ end
+ end
+
+ def read_frame
+ flags, type, size, track, channel = unpack(Frame::HEADER, Frame::HEADER_SIZE)
+ raise FramingError if (flags & 0xF0 > 0)
+ payload = read(size - Frame::HEADER_SIZE)
+ frame = Frame.new(flags, type, track, channel, payload)
+ frm.debug("RECV #{frame}") if frm
+ return frame
+ end
+ end
+end
diff --git a/ruby/lib/qpid/invoker.rb b/ruby/lib/qpid/invoker.rb
new file mode 100644
index 0000000000..39716ac6c2
--- /dev/null
+++ b/ruby/lib/qpid/invoker.rb
@@ -0,0 +1,65 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+class Qpid::Invoker
+
+ # Requires that client defines a invoke method and overrides
+ # resolve_method
+
+ # FIXME: Is it really worth defining methods in method_missing ? We
+ # could just dispatch there directly
+
+ def invc_method(name, resolved)
+ define_singleton_method(name) { |*args| invoke(resolved, args) }
+ # FIXME: the Python code also attaches docs from resolved.pydoc
+ end
+
+ def invc_value(name, resolved)
+ define_singleton_method(name) { | | resolved }
+ end
+
+ def invc_error(name, resolved)
+ msg = "%s instance has no attribute '%s'" % [self.class.name, name]
+ if resolved
+ msg += "\n%s" % resolved
+ end
+ raise NameError, msg
+ end
+
+ def resolve_method(name)
+ invocation(:error, nil)
+ end
+
+ def method_missing(name, *args)
+ disp, resolved = resolve_method(name)
+ disp.call(name, resolved)
+ send(name, *args)
+ end
+
+ def invocation(kind, name = nil)
+ [ method("invc_#{kind}"), name ]
+ end
+
+ private
+ def define_singleton_method(name, &body)
+ singleton_class = class << self; self; end
+ singleton_class.send(:define_method, name, &body)
+ end
+
+end
diff --git a/ruby/qpid.rb b/ruby/lib/qpid/packer.rb
index 25cd26f362..ae1be37faf 100644
--- a/ruby/qpid.rb
+++ b/ruby/lib/qpid/packer.rb
@@ -6,9 +6,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -17,9 +17,17 @@
# under the License.
#
-require "qpid/client"
-require "qpid/queue"
-require "qpid/codec"
-require "qpid/connection"
-require "qpid/peer"
-require "qpid/spec"
+module Qpid
+ module Packer
+ def unpack(fmt, len)
+ raw = read(len)
+ values = raw.unpack(fmt)
+ values = values[0] if values.size == 1
+ return values
+ end
+
+ def pack(fmt, *args)
+ write(args.pack(fmt))
+ end
+ end
+end
diff --git a/ruby/qpid/peer.rb b/ruby/lib/qpid/peer.rb
index 320808fdc6..cdb962169b 100644
--- a/ruby/qpid/peer.rb
+++ b/ruby/lib/qpid/peer.rb
@@ -19,10 +19,12 @@
require "thread"
require "qpid/queue"
-require "qpid/connection"
+require "qpid/connection08"
require "qpid/fields"
-module Qpid
+module Qpid08
+
+ Queue = Qpid::Queue
class Peer
@@ -60,9 +62,9 @@ module Qpid
@mutex.synchronize do
@channels.each_value do |ch|
ch.close()
- @outgoing.close()
- @work.close()
end
+ @outgoing.close()
+ @work.close()
end
end
@@ -95,22 +97,22 @@ module Qpid
def writer()
while true
- @conn.write(@outgoing.pop())
+ @conn.write(@outgoing.get())
end
end
def worker()
while true
- dispatch(@work.pop())
+ dispatch(@work.get())
end
end
def dispatch(queue)
- frame = queue.pop()
+ frame = queue.get()
ch = channel(frame.channel)
payload = frame.payload
if payload.method.content?
- content = Qpid::read_content(queue)
+ content = Qpid08::read_content(queue)
else
content = nil
end
@@ -161,9 +163,9 @@ module Qpid
def method_missing(name, *args)
method = @spec.find_method(name)
- if method.nil?
- raise NoMethodError.new("undefined method '#{name}' for #{self}:#{self.class}")
- end
+ if method.nil?
+ raise NoMethodError.new("undefined method '#{name}' for #{self}:#{self.class}")
+ end
if args.size == 1 and args[0].instance_of? Hash
kwargs = args[0]
@@ -205,7 +207,7 @@ module Qpid
nowait = args[method.fields.index(f)] unless f.nil?
unless nowait or method.responses.empty?
- resp = @responses.pop().payload
+ resp = @responses.get().payload
if resp.method.content?
content = read_content(@responses)
else
@@ -230,8 +232,8 @@ module Qpid
end
- def Qpid.read_content(queue)
- frame = queue.pop()
+ def Qpid08.read_content(queue)
+ frame = queue.get()
header = frame.payload
children = []
1.upto(header.weight) { children << read_content(queue) }
@@ -239,7 +241,7 @@ module Qpid
read = 0
buf = ""
while read < size
- body = queue.pop()
+ body = queue.get()
content = body.payload.content
buf << content
read += content.size
diff --git a/ruby/lib/qpid/qmf.rb b/ruby/lib/qpid/qmf.rb
new file mode 100644
index 0000000000..4711d355cd
--- /dev/null
+++ b/ruby/lib/qpid/qmf.rb
@@ -0,0 +1,1957 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Console API for Qpid Management Framework
+
+require 'socket'
+require 'monitor'
+require 'thread'
+require 'uri'
+require 'time'
+
+module Qpid::Qmf
+
+ # To access the asynchronous operations, a class must be derived from
+ # Console with overrides of any combination of the available methods.
+ class Console
+
+ # Invoked when a connection is established to a broker
+ def broker_connected(broker); end
+
+ # Invoked when the connection to a broker is lost
+ def broker_disconnected(broker); end
+
+ # Invoked when a QMF package is discovered
+ def new_package(name); end
+
+ # Invoked when a new class is discovered. Session.getSchema can be
+ # used to obtain details about the class
+ def new_class(kind, klass_key); end
+
+ # Invoked when a QMF agent is discovered
+ def new_agent(agent); end
+
+ # Invoked when a QMF agent disconects
+ def del_agent(agent); end
+
+ # Invoked when an object is updated
+ def object_props(broker, record); end
+
+ # Invoked when an object is updated
+ def object_stats(broker, record); end
+
+ # Invoked when an event is raised
+ def event(broker, event); end
+
+ # Invoked when an agent heartbeat is received.
+ def heartbeat(agent, timestamp); end
+
+ # Invoked when the connection sequence reaches the point where broker information is available.
+ def broker_info(broker); end
+
+ # Invoked when a method response from an asynchronous method call is received.
+ def method_response(broker, seq, response); end
+ end
+
+ class BrokerURL
+
+ attr_reader :host, :port, :auth_name, :auth_pass
+
+ def initialize(text)
+ uri = URI.parse(text)
+
+ @host = uri.host
+ @port = uri.port ? uri.port : 5672
+ @auth_name = uri.user
+ @auth_pass = uri.password
+
+ return uri
+ end
+
+ def name
+ "#{@host}:#{@port}"
+ end
+
+ def match(host, port)
+ # FIXME: Unlcear what the Python code is actually checking for
+ # here, especially since HOST can resolve to multiple IP's
+ @port == port &&
+ (host == @host || ipaddr(host, port) == ipaddr(@host, @port))
+ end
+
+ private
+ def ipaddr(host, port)
+ s = Socket::getaddrinfo(host, port,
+ Socket::AF_INET, Socket::SOCK_STREAM)
+ s[0][2]
+ end
+ end
+
+ # An instance of the Session class represents a console session running
+ # against one or more QMF brokers. A single instance of Session is
+ # needed to interact with the management framework as a console.
+ class Session
+ CONTEXT_SYNC = 1
+ CONTEXT_STARTUP = 2
+ CONTEXT_MULTIGET = 3
+
+ DEFAULT_GET_WAIT_TIME = 60
+
+ include MonitorMixin
+
+ attr_reader :binding_key_list, :select, :seq_mgr, :console, :packages
+
+ # Initialize a session. If the console argument is provided, the
+ # more advanced asynchronous features are available. If console is
+ # defaulted, the session will operate in a simpler, synchronous
+ # manner. The rcvObjects, rcvEvents, and rcvHeartbeats arguments
+ # are meaningful only if 'console' is provided. They control
+ # whether object updates, events, and agent-heartbeats are
+ # subscribed to. If the console is not interested in receiving one
+ # or more of the above, setting the argument to False will reduce
+ # tha bandwidth used by the API. If manageConnections is set to
+ # True, the Session object will manage connections to the brokers.
+ # This means that if a broker is unreachable, it will retry until a
+ # connection can be established. If a connection is lost, the
+ # Session will attempt to reconnect.
+ #
+ # If manageConnections is set to False, the user is responsible for
+ # handing failures. In this case, an unreachable broker will cause
+ # addBroker to raise an exception. If userBindings is set to False
+ # (the default) and rcvObjects is True, the console will receive
+ # data for all object classes. If userBindings is set to True, the
+ # user must select which classes the console shall receive by
+ # invoking the bindPackage or bindClass methods. This allows the
+ # console to be configured to receive only information that is
+ # relavant to a particular application. If rcvObjects id False,
+ # userBindings has no meaning.
+ #
+ # Accept a hash of parameters, where keys can be :console,
+ # :rcv_objects, :rcv_events, :rcv_heartbeats, :manage_connections,
+ # and :user_bindings
+ def initialize(kwargs = {})
+ super()
+ @console = kwargs[:console] || nil
+ @brokers = []
+ @packages = {}
+ @seq_mgr = SequenceManager.new
+ @cv = new_cond
+ @sync_sequence_list = []
+ @result = []
+ @select = []
+ @error = nil
+ @rcv_objects = kwargs[:rcv_objects] == nil ? true : kwargs[:rcv_objects]
+ @rcv_events = kwargs[:rcv_events] == nil ? true : kwargs[:rcv_events]
+ @rcv_heartbeats = kwargs[:rcv_heartbeats] == nil ? true : kwargs[:rcv_heartbeats]
+ @user_bindings = kwargs[:user_bindings] == nil ? false : kwargs[:user_bindings]
+ unless @console
+ @rcv_objects = false
+ @rcv_events = false
+ @rcv_heartbeats = false
+ end
+ @binding_key_list = binding_keys
+ @manage_connections = kwargs[:manage_connections] || false
+
+ if @user_bindings && ! @rcv_objects
+ raise ArgumentError, "user_bindings can't be set unless rcv_objects is set and a console is provided"
+ end
+
+ end
+
+ def to_s
+ "QMF Console Session Manager (brokers: #{@brokers.size})"
+ end
+
+ def managedConnections?
+ return @manage_connections
+ end
+
+ # Connect to a Qpid broker. Returns an object of type Broker
+ #
+ # To supply a username for authentication, use the URL syntax:
+ #
+ # amqp://username@hostname:port
+ #
+ # If the broker needs a password for the client, an interactive prompt will be
+ # provided to the user.
+ #
+ # To supply a username and a password, use
+ #
+ # amqp://username:password@hostname:port
+ #
+ # The following keyword arguments may be used to control authentication:
+ #
+ # :mechanism - SASL mechanism (i.e. "PLAIN", "GSSAPI", "ANONYMOUS", etc.
+ # - defaults to unspecified (the system chooses for you)
+ # :service - SASL service name (i.e. the kerberos principal of the broker)
+ # - defaults to "qpidd"
+ # :min_ssf - Minimum Security Strength Factor for SASL security layers
+ # - defaults to 0
+ # :max_ssf - Maximum Security Strength Factor for SASL security layers
+ # - defaults to 65535
+ #
+ def add_broker(target = "amqp://localhost", kwargs = {})
+ url = BrokerURL.new(target)
+ broker = Broker.new(self, url.host, url.port, url.auth_name, url.auth_pass, kwargs)
+ unless broker.connected? || @manage_connections
+ raise broker.error
+ end
+
+ @brokers << broker
+ objects(:broker => broker, :class => "agent") unless @manage_connections
+ return broker
+ end
+
+ # Disconnect from a broker. The 'broker' argument is the object
+ # returned from the addBroker call
+ def del_broker(broker)
+ broker.shutdown
+ @brokers.delete(broker)
+ end
+
+ # Get the list of known classes within a QMF package
+ def classes(package_name)
+ list = []
+ @brokers.each { |broker| broker.wait_for_stable }
+ if @packages.include?(package_name)
+ # FIXME What's the actual structure of @packages[package_name]
+ @packages[package_name].each do |key, schema_class|
+ list << schema_class.klass_key
+ end
+ end
+ return list
+ end
+
+ # Get the schema for a QMF class
+ def schema(klass_key)
+ @brokers.each { |broker| broker.wait_for_stable }
+ if @packages.include?(klass_key.package)
+ @packages[klass_key.package][ [klass_key.klass_name, klass_key.hash] ]
+ end
+ end
+
+ def bind_package(package_name)
+ unless @user_bindings && @rcv_objects
+ raise "userBindings option not set for Session"
+ end
+ @brokers.each do |broker|
+ args = { :exchange => "qpid.management",
+ :queue => broker.topic_name,
+ :binding_key => "console.obj.*.*.#{package_name}.#" }
+ broker.amqp_session.exchange_bind(args)
+ end
+ end
+
+ def bind_class(package_name, class_name)
+ unless @user_bindings && @rcv_objects
+ raise "userBindings option not set for Session"
+ end
+ @brokers.each do |broker|
+ args = { :exchange => "qpid.management",
+ :queue => broker.topic_name,
+ :binding_key=> "console.obj.*.*.#{package_name}.#{class_name}.#" }
+ broker.amqp_session.exchange_bind(args)
+ end
+ end
+
+ def bind_class_key(klass_key)
+ unless @user_bindings && @rcv_objects
+ raise "userBindings option not set for Session"
+ end
+ pname, cname, hash = klass_key.to_a()
+ @brokers.each do |broker|
+ args = { :exchange => "qpid.management",
+ :queue => broker.topic_name,
+ :binding_key => "console.obj.*.*.#{pname}.#{cname}.#" }
+ broker.amqp_session.exchange_bind(args)
+ end
+ end
+
+ # Get a list of currently known agents
+ def agents(broker=nil)
+ broker_list = []
+ if broker.nil?
+ broker_list = @brokers.dup
+ else
+ broker_list << broker
+ end
+ broker_list.each { |b| b.wait_for_stable }
+ agent_list = []
+ broker_list.each { |b| agent_list += b.agents }
+ return agent_list
+ end
+
+ # Get a list of objects from QMF agents.
+ # All arguments are passed by name(keyword).
+ #
+ # The class for queried objects may be specified in one of the
+ # following ways:
+ # :schema => <schema> - supply a schema object returned from getSchema.
+ # :key => <key> - supply a klass_key from the list returned by getClasses.
+ # :class => <name> - supply a class name as a string. If the class name exists
+ # in multiple packages, a _package argument may also be supplied.
+ # :object_id = <id> - get the object referenced by the object-id
+ #
+ # If objects should be obtained from only one agent, use the following argument.
+ # Otherwise, the query will go to all agents.
+ #
+ # :agent = <agent> - supply an agent from the list returned by getAgents.
+ #
+ # If the get query is to be restricted to one broker (as opposed to
+ # all connected brokers), add the following argument:
+ #
+ # :broker = <broker> - supply a broker as returned by addBroker.
+ #
+ # The default timeout for this synchronous operation is 60 seconds. To change the timeout,
+ # use the following argument:
+ #
+ # :timeout = <time in seconds>
+ #
+ # If additional arguments are supplied, they are used as property
+ # selectors, as long as their keys are strings. For example, if
+ # the argument "name" => "test" is supplied, only objects whose
+ # "name" property is "test" will be returned in the result.
+ def objects(kwargs)
+ if kwargs.include?(:broker)
+ broker_list = []
+ broker_list << kwargs[:broker]
+ else
+ broker_list = @brokers
+ end
+ broker_list.each { |broker|
+ broker.wait_for_stable
+ if kwargs[:package] != "org.apache.qpid.broker" or kwargs[:class] != "agent"
+ objects(:agent => broker.agent(1,0), :package => "org.apache.qpid.broker", :class => "agent") if broker.connected?
+ end
+ }
+
+ agent_list = []
+ if kwargs.include?(:agent)
+ agent = kwargs[:agent]
+ unless broker_list.include?(agent.broker)
+ raise ArgumentError, "Supplied agent is not accessible through the supplied broker"
+ end
+ agent_list << agent if agent.broker.connected?
+ else
+ if kwargs.include?(:object_id)
+ oid = kwargs[:object_id]
+ broker_list.each { |broker|
+ broker.agents.each { |agent|
+ if oid.broker_bank == agent.broker_bank && oid.agent_bank == agent.agent_bank
+ agent_list << agent if agent.broker.connected?
+ end
+ }
+ }
+ else
+ broker_list.each { |broker|
+ agent_list += broker.agents if broker.connected?
+ }
+ end
+ end
+
+ cname = nil
+ if kwargs.include?(:schema)
+ # FIXME: What kind of object is kwargs[:schema]
+ pname, cname, hash = kwargs[:schema].getKey().to_a
+ elsif kwargs.include?(:key)
+ pname, cname, hash = kwargs[:key].to_a
+ elsif kwargs.include?(:class)
+ pname, cname, hash = [kwargs[:package], kwargs[:class], nil]
+ end
+ if cname.nil? && ! kwargs.include?(:object_id)
+ raise ArgumentError,
+ "No class supplied, use :schema, :key, :class, or :object_id' argument"
+ end
+
+ map = {}
+ @select = []
+ if kwargs.include?(:object_id)
+ map["_objectid"] = kwargs[:object_id].to_s
+ else
+ map["_class"] = cname
+ map["_package"] = pname if pname
+ map["_hash"] = hash if hash
+ kwargs.each do |k,v|
+ @select << [k, v] if k.is_a?(String)
+ end
+ end
+
+ @result = []
+ agent_list.each do |agent|
+ broker = agent.broker
+ send_codec = Qpid::StringCodec.new(broker.conn.spec)
+ seq = nil
+ synchronize do
+ seq = @seq_mgr.reserve(CONTEXT_MULTIGET)
+ @sync_sequence_list << seq
+ end
+ broker.set_header(send_codec, ?G, seq)
+ send_codec.write_map(map)
+ bank_key = "%d.%d" % [broker.broker_bank, agent.agent_bank]
+ smsg = broker.message(send_codec.encoded, "agent.#{bank_key}")
+ broker.emit(smsg)
+ end
+
+ timeout = false
+ if kwargs.include?(:timeout)
+ wait_time = kwargs[:timeout]
+ else
+ wait_time = DEFAULT_GET_WAIT_TIME
+ end
+ synchronize do
+ unless @cv.wait_for(wait_time) { @sync_sequence_list.empty? || @error }
+ @sync_sequence_list.each do |pending_seq|
+ @seq_mgr.release(pending_seq)
+ end
+ @sync_sequence_list = []
+ timeout = true
+ end
+ end
+
+ if @error
+ errorText = @error
+ @error = nil
+ raise errorText
+ end
+
+ if @result.empty? && timeout
+ raise "No agent responded within timeout period"
+ end
+ @result
+ end
+
+ # Return one and only one object or nil.
+ def object(kwargs)
+ objs = objects(kwargs)
+ return objs.length == 1 ? objs[0] : nil
+ end
+
+ # Return the first of potentially many objects.
+ def first_object(kwargs)
+ objs = objects(kwargs)
+ return objs.length > 0 ? objs[0] : nil
+ end
+
+ def set_event_filter(kwargs); end
+
+ def handle_broker_connect(broker); end
+
+ def handle_broker_resp(broker, codec, seq)
+ broker.broker_id = codec.read_uuid
+ @console.broker_info(broker) if @console
+
+ # Send a package request
+ # (effectively inc and dec outstanding by not doing anything)
+ send_codec = Qpid::StringCodec.new(broker.conn.spec)
+ seq = @seq_mgr.reserve(CONTEXT_STARTUP)
+ broker.set_header(send_codec, ?P, seq)
+ smsg = broker.message(send_codec.encoded)
+ broker.emit(smsg)
+ end
+
+ def handle_package_ind(broker, codec, seq)
+ pname = codec.read_str8
+ new_package = false
+ synchronize do
+ new_package = ! @packages.include?(pname)
+ @packages[pname] = {} if new_package
+ end
+ @console.new_package(pname) if @console
+
+ # Send a class request
+ broker.inc_outstanding
+ send_codec = Qpid::StringCodec.new(broker.conn.spec)
+ seq = @seq_mgr.reserve(CONTEXT_STARTUP)
+ broker.set_header(send_codec, ?Q, seq)
+ send_codec.write_str8(pname)
+ smsg = broker.message(send_codec.encoded)
+ broker.emit(smsg)
+ end
+
+ def handle_command_complete(broker, codec, seq)
+ code = codec.read_uint32
+ text = codec.read_str8
+ context = @seq_mgr.release(seq)
+ if context == CONTEXT_STARTUP
+ broker.dec_outstanding
+ elsif context == CONTEXT_SYNC && seq == broker.sync_sequence
+ broker.sync_done
+ elsif context == CONTEXT_MULTIGET && @sync_sequence_list.include?(seq)
+ synchronize do
+ @sync_sequence_list.delete(seq)
+ @cv.signal if @sync_sequence_list.empty?
+ end
+ end
+ end
+
+ def handle_class_ind(broker, codec, seq)
+ kind = codec.read_uint8
+ classKey = ClassKey.new(codec)
+ unknown = false
+
+ synchronize do
+ return unless @packages.include?(classKey.package)
+ unknown = true unless @packages[classKey.package].include?([classKey.klass_name, classKey.hash])
+ end
+
+
+ if unknown
+ # Send a schema request for the unknown class
+ broker.inc_outstanding
+ send_codec = Qpid::StringCodec.new(broker.conn.spec)
+ seq = @seq_mgr.reserve(CONTEXT_STARTUP)
+ broker.set_header(send_codec, ?S, seq)
+ classKey.encode(send_codec)
+ smsg = broker.message(send_codec.encoded)
+ broker.emit(smsg)
+ end
+ end
+
+ def handle_method_resp(broker, codec, seq)
+ code = codec.read_uint32
+ text = codec.read_str16
+ out_args = {}
+ pair = @seq_mgr.release(seq)
+ return unless pair
+ method, synchronous = pair
+ if code == 0
+ method.arguments.each do |arg|
+ if arg.dir.index(?O)
+ out_args[arg.name] = decode_value(codec, arg.type)
+ end
+ end
+ end
+ result = MethodResult.new(code, text, out_args)
+ if synchronous:
+ broker.synchronize do
+ broker.sync_result = MethodResult.new(code, text, out_args)
+ broker.sync_done
+ end
+ else
+ @console.method_response(broker, seq, result) if @console
+ end
+ end
+
+ def handle_heartbeat_ind(broker, codec, seq, msg)
+ if @console
+ broker_bank = 1
+ agent_bank = 0
+ dp = msg.get("delivery_properties")
+ if dp
+ key = dp["routing_key"]
+ key_elements = key.split(".")
+ if key_elements.length == 4
+ broker_bank = key_elements[2].to_i
+ agent_bank = key_elements[3].to_i
+ end
+ end
+ agent = broker.agent(broker_bank, agent_bank)
+ timestamp = codec.read_uint64
+ @console.heartbeat(agent, timestamp) if agent
+ end
+ end
+
+ def handle_event_ind(broker, codec, seq)
+ if @console
+ event = Event.new(self, broker, codec)
+ @console.event(broker, event)
+ end
+ end
+
+ def handle_schema_resp(broker, codec, seq)
+ kind = codec.read_uint8
+ classKey = ClassKey.new(codec)
+ klass = SchemaClass.new(self, kind, classKey, codec)
+ synchronize { @packages[classKey.package][ [classKey.klass_name, classKey.hash] ] = klass }
+
+ @seq_mgr.release(seq)
+ broker.dec_outstanding
+ @console.new_class(kind, classKey) if @console
+ end
+
+ def handle_content_ind(broker, codec, seq, prop=false, stat=false)
+ klass_key = ClassKey.new(codec)
+ pname, cname, hash = klass_key.to_a() ;
+
+ schema = nil
+ synchronize do
+ return unless @packages.include?(klass_key.package)
+ return unless @packages[klass_key.package].include?([klass_key.klass_name, klass_key.hash])
+ schema = @packages[klass_key.package][ [klass_key.klass_name, klass_key.hash] ]
+ end
+
+
+ object = Qpid::Qmf::Object.new(self, broker, schema, codec, prop, stat)
+ if pname == "org.apache.qpid.broker" && cname == "agent" && prop
+ broker.update_agent(object)
+ end
+
+ synchronize do
+ if @sync_sequence_list.include?(seq)
+ if object.timestamps()[2] == 0 && select_match(object)
+ @result << object
+ end
+ return
+ end
+ end
+
+ @console.object_props(broker, object) if @console && @rcv_objects && prop
+ @console.object_stats(broker, object) if @console && @rcv_objects && stat
+ end
+
+ def handle_broker_disconnect(broker); end
+
+ def handle_error(error)
+ synchronize do
+ @error = error if @sync_sequence_list.length > 0
+ @sync_sequence_list = []
+ @cv.signal
+ end
+ end
+
+ # Decode, from the codec, a value based on its typecode
+ def decode_value(codec, typecode)
+ case typecode
+ when 1: data = codec.read_uint8 # U8
+ when 2: data = codec.read_uint16 # U16
+ when 3: data = codec.read_uint32 # U32
+ when 4: data = codec.read_uint64 # U64
+ when 6: data = codec.read_str8 # SSTR
+ when 7: data = codec.read_str16 # LSTR
+ when 8: data = codec.read_int64 # ABSTIME
+ when 9: data = codec.read_uint64 # DELTATIME
+ when 10: data = ObjectId.new(codec) # REF
+ when 11: data = codec.read_uint8 != 0 # BOOL
+ when 12: data = codec.read_float # FLOAT
+ when 13: data = codec.read_double # DOUBLE
+ when 14: data = codec.read_uuid # UUID
+ when 15: data = codec.read_map # FTABLE
+ when 16: data = codec.read_int8 # S8
+ when 17: data = codec.read_int16 # S16
+ when 18: data = codec.read_int32 # S32
+ when 19: data = codec.read_int64 # S64
+ when 20: # Object
+ inner_type_code = codec.read_uint8()
+ if (inner_type_code == 20)
+ classKey = ClassKey.new(codec)
+ innerSchema = schema(classKey)
+ data = Object.new(self, @broker, innerSchema, codec, true, true, false) if innerSchema
+ else
+ data = decode_value(codec, inner_type_code)
+ end
+ when 21:
+ data = []
+ rec_codec = Qpid::StringCodec.new(codec.spec, codec.read_vbin32())
+ count = rec_codec.read_uint32()
+ while count > 0 do
+ type = rec_codec.read_uint8()
+ data << (decode_value(rec_codec,type))
+ count -= 1
+ end
+ when 22:
+ data = []
+ rec_codec = Qpid::StringCodec.new(codec.spec, codec.read_vbin32())
+ count = rec_codec.read_uint32()
+ type = rec_codec.read_uint8()
+ while count > 0 do
+ data << (decode_value(rec_codec,type))
+ count -= 1
+ end
+ else
+ raise ArgumentError, "Invalid type code: #{typecode} - #{typecode.inspect}"
+ end
+ return data
+ end
+
+ ENCODINGS = {
+ String => 6,
+ Fixnum => 18,
+ Bignum => 19,
+ Float => 12,
+ Array => 21,
+ Hash => 15
+ }
+
+ def encoding(object)
+ klass = object.class
+ if ENCODINGS.has_key?(klass)
+ return ENCODINGS[klass]
+ end
+ for base in klass.__bases__
+ result = encoding(base)
+ return result unless result.nil?
+ end
+ end
+
+ # Encode, into the codec, a value based on its typecode
+ def encode_value(codec, value, typecode)
+ # FIXME: Python does a lot of magic type conversions
+ # We just assume that value has the right type; this is safer
+ # than coercing explicitly, since Array::pack will complain
+ # loudly about various type errors
+ case typecode
+ when 1: codec.write_uint8(value) # U8
+ when 2: codec.write_uint16(value) # U16
+ when 3: codec.write_uint32(value) # U32
+ when 4: codec.write_uint64(value) # U64
+ when 6: codec.write_str8(value) # SSTR
+ when 7: codec.write_str16(value) # LSTR
+ when 8: codec.write_int64(value) # ABSTIME
+ when 9: codec.write_uint64(value) # DELTATIME
+ when 10: value.encode(codec) # REF
+ when 11: codec.write_uint8(value ? 1 : 0) # BOOL
+ when 12: codec.write_float(value) # FLOAT
+ when 13: codec.write_double(value) # DOUBLE
+ when 14: codec.write_uuid(value) # UUID
+ when 15: codec.write_map(value) # FTABLE
+ when 16: codec.write_int8(value) # S8
+ when 17: codec.write_int16(value) # S16
+ when 18: codec.write_int32(value) # S32
+ when 19: codec.write_int64(value) # S64
+ when 20: value.encode(codec)
+ when 21: # List
+ send_codec = Qpid::StringCodec.new(codec.spec)
+ encode_value(send_codec, value.size, 3)
+ value.each do v
+ ltype = encoding(v)
+ encode_value(send_codec,ltype,1)
+ encode_value(send_codec,v,ltype)
+ end
+ codec.write_vbin32(send_codec.encoded)
+ when 22: # Array
+ send_codec = Qpid::StringCodec.new(codec.spec)
+ encode_value(send_codec, value.size, 3)
+ if value.size > 0
+ ltype = encoding(value[0])
+ encode_value(send_codec,ltype,1)
+ value.each do v
+ encode_value(send_codec,v,ltype)
+ end
+ end
+ codec.write_vbin32(send_codec.encoded)
+ else
+ raise ValueError, "Invalid type code: %d" % typecode
+ end
+ end
+
+ def display_value(value, typecode)
+ case typecode
+ when 1: return value.to_s
+ when 2: return value.to_s
+ when 3: return value.to_s
+ when 4: return value.to_s
+ when 6: return value.to_s
+ when 7: return value.to_s
+ when 8: return strftime("%c", gmtime(value / 1000000000))
+ when 9: return value.to_s
+ when 10: return value.to_s
+ when 11: return value ? 'T' : 'F'
+ when 12: return value.to_s
+ when 13: return value.to_s
+ when 14: return Qpid::UUID::format(value)
+ when 15: return value.to_s
+ when 16: return value.to_s
+ when 17: return value.to_s
+ when 18: return value.to_s
+ when 19: return value.to_s
+ when 20: return value.to_s
+ when 21: return value.to_s
+ when 22: return value.to_s
+ else
+ raise ValueError, "Invalid type code: %d" % typecode
+ end
+ end
+
+ private
+
+ def binding_keys
+ key_list = []
+ key_list << "schema.#"
+ if @rcv_objects && @rcv_events && @rcv_heartbeats &&
+ ! @user_bindings
+ key_list << "console.#"
+ else
+ if @rcv_objects && ! @user_bindings
+ key_list << "console.obj.#"
+ else
+ key_list << "console.obj.*.*.org.apache.qpid.broker.agent"
+ end
+ key_list << "console.event.#" if @rcv_events
+ key_list << "console.heartbeat.#" if @rcv_heartbeats
+ end
+ return key_list
+ end
+
+ # Check the object against select to check for a match
+ def select_match(object)
+ select.each do |key, value|
+ object.properties.each do |prop, propval|
+ return false if key == prop.name && value != propval
+ end
+ end
+ return true
+ end
+
+ end
+
+ class Package
+ attr_reader :name
+
+ def initialize(name)
+ @name = name
+ end
+ end
+
+ # A ClassKey uniquely identifies a class from the schema.
+ class ClassKey
+ attr_reader :package, :klass_name, :hash
+
+ def initialize(package="", klass_name="", hash=0)
+ if (package.kind_of?(Qpid::Codec))
+ @package = package.read_str8()
+ @klass_name = package.read_str8()
+ @hash = package.read_bin128()
+ else
+ @package = package
+ @klass_name = klass_name
+ @hash = hash
+ end
+ end
+
+ def encode(codec)
+ codec.write_str8(@package)
+ codec.write_str8(@klass_name)
+ codec.write_bin128(@hash)
+ end
+
+ def to_a()
+ return [@package, @klass_name, @hash]
+ end
+
+ def hash_string()
+ "%08x-%08x-%08x-%08x" % hash.unpack("NNNN")
+ end
+
+ def to_s()
+ return "#{@package}:#{@klass_name}(#{hash_string()})"
+ end
+ end
+
+ class SchemaClass
+
+ CLASS_KIND_TABLE = 1
+ CLASS_KIND_EVENT = 2
+
+ attr_reader :klass_key, :arguments, :super_klass_key
+
+ def initialize(session, kind, key, codec)
+ @session = session
+ @kind = kind
+ @klass_key = key
+ @super_klass_key = nil
+ @properties = []
+ @statistics = []
+ @methods = []
+ @arguments = []
+
+ has_supertype = 0 #codec.read_uint8
+ if @kind == CLASS_KIND_TABLE
+ prop_count = codec.read_uint16
+ stat_count = codec.read_uint16
+ method_count = codec.read_uint16
+ if has_supertype == 1
+ @super_klass_key = ClassKey.new(codec)
+ end
+ prop_count.times { |idx|
+ @properties << SchemaProperty.new(codec) }
+ stat_count.times { |idx|
+ @statistics << SchemaStatistic.new(codec) }
+ method_count.times { |idx|
+ @methods<< SchemaMethod.new(codec) }
+ elsif @kind == CLASS_KIND_EVENT
+ arg_count = codec.read_uint16
+ arg_count.times { |idx|
+ sa = SchemaArgument.new(codec, false)
+ @arguments << sa
+ }
+ end
+ end
+
+ def is_table?
+ @kind == CLASS_KIND_TABLE
+ end
+
+ def is_event?
+ @kind == CLASS_KIND_EVENT
+ end
+
+ def properties(include_inherited = true)
+ returnValue = @properties
+ if !@super_klass_key.nil? && include_inherited
+ returnValue = @properties + @session.schema(@super_klass_key).properties
+ end
+ return returnValue
+ end
+
+ def statistics(include_inherited = true)
+ returnValue = @statistics
+ if !@super_klass_key.nil? && include_inherited
+ returnValue = @statistics + @session.schema(@super_klass_key).statistics
+ end
+ return returnValue
+ end
+
+ def methods(include_inherited = true)
+ returnValue = @methods
+ if !@super_klass_key.nil? && include_inherited
+ returnValue = @methods + @session.schema(@super_klass_key).methods
+ end
+ return returnValue
+ end
+
+ def to_s
+ if @kind == CLASS_KIND_TABLE
+ kind_str = "Table"
+ elsif @kind == CLASS_KIND_EVENT
+ kind_str = "Event"
+ else
+ kind_str = "Unsupported"
+ end
+ "#{kind_str} Class: #{klass_key.to_s}"
+ end
+ end
+
+ class SchemaProperty
+
+ attr_reader :name, :type, :access, :index, :optional,
+ :unit, :min, :max, :maxlen, :desc, :refClass, :refPackage
+
+ def initialize(codec)
+ map = codec.read_map
+ @name = map["name"]
+ @type = map["type"]
+ @access = map["access"]
+ @index = map["index"] != 0
+ @optional = map["optional"] != 0
+ @unit = map["unit"]
+ @min = map["min"]
+ @max = map["max"]
+ @maxlen = map["maxlen"]
+ @desc = map["desc"]
+ @refClass = map["refClass"]
+ @refPackage = map["refPackage"]
+ end
+
+ def to_s
+ @name
+ end
+ end
+
+ class SchemaStatistic
+
+ attr_reader :name, :type, :unit, :desc, :refClass, :refPackage
+
+ def initialize(codec)
+ map = codec.read_map
+ @name = map["name"]
+ @type = map["type"]
+ @unit = map["unit"]
+ @desc = map["desc"]
+ @refClass = map["refClass"]
+ @refPackage = map["refPackage"]
+ end
+
+ def to_s
+ @name
+ end
+ end
+
+ class SchemaMethod
+
+ attr_reader :name, :desc, :arguments
+
+ def initialize(codec)
+ map = codec.read_map
+ @name = map["name"]
+ arg_count = map["argCount"]
+ @desc = map["desc"]
+ @arguments = []
+ arg_count.times { |idx|
+ @arguments << SchemaArgument.new(codec, true)
+ }
+ end
+
+ def to_s
+ result = @name + "("
+ first = true
+ result += @arguments.select { |arg| arg.dir.index(?I) }.join(", ")
+ result += ")"
+ return result
+ end
+ end
+
+ class SchemaArgument
+
+ attr_reader :name, :type, :dir, :unit, :min, :max, :maxlen
+ attr_reader :desc, :default, :refClass, :refPackage
+
+ def initialize(codec, method_arg)
+ map = codec.read_map
+ @name = map["name"]
+ @type = map["type"]
+ @dir = map["dir"].upcase if method_arg
+ @unit = map["unit"]
+ @min = map["min"]
+ @max = map["max"]
+ @maxlen = map["maxlen"]
+ @desc = map["desc"]
+ @default = map["default"]
+ @refClass = map["refClass"]
+ @refPackage = map["refPackage"]
+ end
+ end
+
+ # Object that represents QMF object identifiers
+ class ObjectId
+
+ include Comparable
+
+ attr_reader :first, :second
+
+ def initialize(codec, first=0, second=0)
+ if codec
+ @first = codec.read_uint64
+ @second = codec.read_uint64
+ else
+ @first = first
+ @second = second
+ end
+ end
+
+ def <=>(other)
+ return 1 unless other.is_a?(ObjectId)
+ return -1 if first < other.first
+ return 1 if first > other.first
+ return second <=> other.second
+ end
+
+ def to_s
+ "%d-%d-%d-%d-%d" % [flags, sequence, broker_bank, agent_bank, object]
+ end
+
+ def index
+ [first, second]
+ end
+
+ def flags
+ (first & 0xF000000000000000) >> 60
+ end
+
+ def sequence
+ (first & 0x0FFF000000000000) >> 48
+ end
+
+ def broker_bank
+ (first & 0x0000FFFFF0000000) >> 28
+ end
+
+ def agent_bank
+ first & 0x000000000FFFFFFF
+ end
+
+ def object
+ second
+ end
+
+ def durable?
+ sequence == 0
+ end
+
+ def encode(codec)
+ codec.write_uint64(first)
+ codec.write_uint64(second)
+ end
+ end
+
+ class Object
+
+ DEFAULT_METHOD_WAIT_TIME = 60
+
+ attr_reader :object_id, :schema, :properties, :statistics,
+ :current_time, :create_time, :delete_time, :broker
+
+ def initialize(session, broker, schema, codec, prop, stat, managed=true)
+ @session = session
+ @broker = broker
+ @schema = schema
+ if managed
+ @current_time = codec.read_uint64
+ @create_time = codec.read_uint64
+ @delete_time = codec.read_uint64
+ @object_id = ObjectId.new(codec)
+ end
+ @properties = []
+ @statistics = []
+ if prop
+ missing = parse_presence_masks(codec, schema)
+ schema.properties.each do |property|
+ v = nil
+ unless missing.include?(property.name)
+ v = @session.decode_value(codec, property.type)
+ end
+ @properties << [property, v]
+ end
+ end
+
+ if stat
+ schema.statistics.each do |statistic|
+ s = @session.decode_value(codec, statistic.type)
+ @statistics << [statistic, s]
+ end
+ end
+ end
+
+ def klass_key
+ @schema.klass_key
+ end
+
+
+ def methods
+ @schema.methods
+ end
+
+ # Return the current, creation, and deletion times for this object
+ def timestamps
+ return [@current_time, @create_time, @delete_time]
+ end
+
+ # Return a string describing this object's primary key
+ def index
+ @properties.select { |property, value|
+ property.index
+ }.collect { |property,value|
+ @session.display_value(value, property.type) }.join(":")
+ end
+
+ # Replace properties and/or statistics with a newly received update
+ def merge_update(newer)
+ unless object_id == newer.object_id
+ raise "Objects with different object-ids"
+ end
+ @properties = newer.properties unless newer.properties.empty?
+ @statistics = newer.statistics unless newer.statistics.empty?
+ end
+
+ def update
+ obj = @session.object(:object_id => @object_id, :broker => @broker)
+ if obj
+ merge_update(obj)
+ else
+ raise "Underlying object no longer exists."
+ end
+ end
+
+ def to_s
+ @schema.klass_key.to_s
+ end
+
+ # This must be defined because ruby has this (deprecated) method built in.
+ def id
+ method_missing(:id)
+ end
+
+ # Same here..
+ def type
+ method_missing(:type)
+ end
+
+ def name
+ method_missing(:name)
+ end
+
+ def method_missing(name, *args)
+ name = name.to_s
+
+ if method = @schema.methods.find { |method| name == method.name }
+ return invoke(method, name, args)
+ end
+
+ @properties.each do |property, value|
+ return value if name == property.name
+ if name == "_#{property.name}_" && property.type == 10
+ # Dereference references
+ deref = @session.objects(:object_id => value, :broker => @broker)
+ return nil unless deref.size == 1
+ return deref[0]
+ end
+ end
+ @statistics.each do |statistic, value|
+ if name == statistic.name
+ return value
+ end
+ end
+ raise "Type Object has no attribute '#{name}'"
+ end
+
+ def encode(codec)
+ codec.write_uint8(20)
+ @schema.klass_key.encode(codec)
+
+ # emit presence masks for optional properties
+ mask = 0
+ bit = 0
+ schema.properties.each do |property|
+ if prop.optional
+ bit = 1 if bit == 0
+ mask |= bit if value
+ bit = bit << 1
+ if bit == 256
+ bit = 0
+ codec.write_uint8(mask)
+ mask = 0
+ end
+ codec.write_uint8(mask) if bit != 0
+ end
+ end
+
+ # encode properties
+ @properties.each do |property, value|
+ @session.encode_value(codec, value, prop.type) if value
+ end
+
+ # encode statistics
+ @statistics.each do |statistic, value|
+ @session.encode_value(codec, value, stat.type)
+ end
+ end
+
+ private
+
+ def send_method_request(method, name, args, synchronous = false, time_wait = nil)
+ @schema.methods.each do |schema_method|
+ if name == schema_method.name
+ send_codec = Qpid::StringCodec.new(@broker.conn.spec)
+ seq = @session.seq_mgr.reserve([schema_method, synchronous])
+ @broker.set_header(send_codec, ?M, seq)
+ @object_id.encode(send_codec)
+ @schema.klass_key.encode(send_codec)
+ send_codec.write_str8(name)
+
+ formals = method.arguments.select { |arg| arg.dir.index(?I) }
+ count = method.arguments.select { |arg| arg.dir.index(?I) }.size
+ unless formals.size == args.size
+ raise "Incorrect number of arguments: expected #{formals.size}, got #{args.size}"
+ end
+
+ formals.zip(args).each do |formal, actual|
+ @session.encode_value(send_codec, actual, formal.type)
+ end
+
+ ttl = time_wait ? time_wait * 1000 : nil
+ smsg = @broker.message(send_codec.encoded,
+ "agent.#{object_id.broker_bank}.#{object_id.agent_bank}", ttl=ttl)
+ @broker.sync_start if synchronous
+ @broker.emit(smsg)
+
+ return seq
+ end
+ end
+ end
+
+ def invoke(method, name, args)
+ kwargs = args[args.size - 1]
+ sync = true
+ timeout = DEFAULT_METHOD_WAIT_TIME
+
+ if kwargs.class == Hash
+ if kwargs.include?(:timeout)
+ timeout = kwargs[:timeout]
+ end
+
+ if kwargs.include?(:async)
+ sync = !kwargs[:async]
+ end
+ args.pop
+ end
+
+ seq = send_method_request(method, name, args, synchronous = sync)
+ if seq
+ return seq unless sync
+ unless @broker.wait_for_sync_done(timeout)
+ @session.seq_mgr.release(seq)
+ raise "Timed out waiting for method to respond"
+ end
+
+ if @broker.error
+ error_text = @broker.error
+ @broker.error = nil
+ raise error_text
+ end
+
+ return @broker.sync_result
+ end
+ raise "Invalid Method (software defect) [#{name}]"
+ end
+
+ def parse_presence_masks(codec, schema)
+ exclude_list = []
+ bit = 0
+ schema.properties.each do |property|
+ if property.optional
+ if bit == 0
+ mask = codec.read_uint8
+ bit = 1
+ end
+ if (mask & bit) == 0
+ exclude_list << property.name
+ end
+ bit *= 2
+ bit = 0 if bit == 256
+ end
+ end
+ return exclude_list
+ end
+ end
+
+ class MethodResult
+
+ attr_reader :status, :text, :out_args
+
+ def initialize(status, text, out_args)
+ @status = status
+ @text = text
+ @out_args = out_args
+ end
+
+ def method_missing(name)
+ name = name.to_s()
+ if @out_args.include?(name)
+ return @out_args[name]
+ else
+ raise "Unknown method result arg #{name}"
+ end
+ end
+
+ def to_s
+ argsString = ""
+ padding = ""
+ out_args.each do |key,value|
+ argsString += padding
+ padding = " " if padding == ""
+ argsString += key.to_s
+ argsString += " => "
+ argsString += value.to_s()
+ end
+ "MethodResult(Msg: '#{text}' Status: #{status} Return: [#{argsString}])"
+ end
+ end
+
+ class ManagedConnection
+
+ DELAY_MIN = 1
+ DELAY_MAX = 128
+ DELAY_FACTOR = 2
+ include MonitorMixin
+
+ def initialize(broker)
+ super()
+ @broker = broker
+ @cv = new_cond
+ @is_cancelled = false
+ end
+
+ # Main body of the running thread.
+ def start
+ @thread = Thread.new {
+ delay = DELAY_MIN
+ while true
+ begin
+ @broker.try_to_connect
+ synchronize do
+ while !@is_cancelled and @broker.connected?
+ @cv.wait
+ Thread.exit if @is_cancelled
+ delay = DELAY_MIN
+ end
+ end
+
+ rescue
+ delay *= DELAY_FACTOR if delay < DELAY_MAX
+ end
+
+ synchronize do
+ @cv.wait(delay)
+ Thread.exit if @is_cancelled
+ end
+ end
+ }
+ end
+
+ # Tell this thread to stop running and return.
+ def stop
+ synchronize do
+ @is_cancelled = true
+ @cv.signal
+ end
+ end
+
+ # Notify the thread that the connection was lost.
+ def disconnected
+ synchronize do
+ @cv.signal
+ end
+ end
+
+ def join
+ @thread.join
+ end
+ end
+
+ class Broker
+
+ SYNC_TIME = 60
+ @@next_seq = 1
+
+ include MonitorMixin
+
+ attr_accessor :error
+
+ attr_reader :amqp_session_id, :amqp_session, :conn, :broker_bank, :topic_name
+
+ attr_accessor :broker_id, :sync_result
+
+ def initialize(session, host, port, auth_name, auth_pass, kwargs)
+ super()
+
+ # For debugging..
+ Thread.abort_on_exception = true
+
+ @session = session
+ @host = host
+ @port = port
+ @auth_name = auth_name
+ @auth_pass = auth_pass
+ @user_id = nil
+ @auth_mechanism = kwargs[:mechanism]
+ @auth_service = kwargs[:service]
+ @broker_bank = 1
+ @topic_bound = false
+ @cv = new_cond
+ @error = nil
+ @broker_id = nil
+ @is_connected = false
+ @amqp_session_id = "%s.%d.%d" % [Socket.gethostname, Process::pid, @@next_seq]
+ @@next_seq += 1
+ @conn = nil
+ if @session.managedConnections?
+ @thread = ManagedConnection.new(self)
+ @thread.start
+ else
+ @thread = nil
+ try_to_connect
+ end
+ end
+
+ def connected?
+ @is_connected
+ end
+
+ def agent(broker_bank, agent_bank)
+ bank_key = "%d.%d" % [broker_bank, agent_bank]
+ return @agents[bank_key]
+ end
+
+ # Get the list of agents reachable via this broker
+ def agents
+ @agents.values
+ end
+
+ def url
+ "#{@host}:#{@port}"
+ end
+
+ def to_s
+ if connected?
+ "Broker connected at: #{url}"
+ else
+ "Disconnected Broker"
+ end
+ end
+
+ def wait_for_sync_done(timeout=nil)
+ wait_time = timeout ? timeout : SYNC_TIME
+ synchronize do
+ return @cv.wait_for(wait_time) { ! @sync_in_flight || @error }
+ end
+ end
+
+ def wait_for_stable
+ synchronize do
+ return unless connected?
+ return if @reqs_outstanding == 0
+ @sync_in_flight = true
+ unless @cv.wait_for(SYNC_TIME) { @reqs_outstanding == 0 }
+ raise "Timed out waiting for broker to synchronize"
+ end
+ end
+ end
+
+ # Compose the header of a management message
+ def set_header(codec, opcode, seq=0)
+ codec.write_uint8(?A)
+ codec.write_uint8(?M)
+ codec.write_uint8(?2)
+ codec.write_uint8(opcode)
+ codec.write_uint32(seq)
+ end
+
+ def message(body, routing_key="broker", ttl=nil)
+ dp = @amqp_session.delivery_properties
+ dp.routing_key = routing_key
+ dp.ttl = ttl if ttl
+ mp = @amqp_session.message_properties
+ mp.content_type = "x-application/qmf"
+ mp.reply_to = amqp_session.reply_to("amq.direct", @reply_name)
+ #mp.user_id = @user_id if @user_id
+ return Qpid::Message.new(dp, mp, body)
+ end
+
+ def emit(msg, dest="qpid.management")
+ @amqp_session.message_transfer(:destination => dest,
+ :message => msg)
+ end
+
+ def inc_outstanding
+ synchronize { @reqs_outstanding += 1 }
+ end
+
+ def dec_outstanding
+ synchronize do
+ @reqs_outstanding -= 1
+ if @reqs_outstanding == 0 && ! @topic_bound
+ @topic_bound = true
+ @session.binding_key_list.each do |key|
+ args = {
+ :exchange => "qpid.management",
+ :queue => @topic_name,
+ :binding_key => key }
+ @amqp_session.exchange_bind(args)
+ end
+ end
+ if @reqs_outstanding == 0 && @sync_in_flight
+ sync_done
+ end
+ end
+ end
+
+ def sync_start
+ synchronize { @sync_in_flight = true }
+ end
+
+ def sync_done
+ synchronize do
+ @sync_in_flight = false
+ @cv.signal
+ end
+ end
+
+ def update_agent(obj)
+ bank_key = "%d.%d" % [obj.brokerBank, obj.agentBank]
+ if obj.delete_time == 0
+ unless @agents.include?(bank_key)
+ agent = Agent.new(self, obj.agentBank, obj.label)
+ @agents[bank_key] = agent
+ @session.console.new_agent(agent) if @session.console
+ end
+ else
+ agent = @agents.delete(bank_key)
+ @session.console.del_agent(agent) if agent && @session.console
+ end
+ end
+
+ def shutdown
+ if @thread
+ @thread.stop
+ @thread.join
+ end
+ if connected?
+ @amqp_session.incoming("rdest").stop
+ if @session.console
+ @amqp_session.incoming("tdest").stop
+ end
+ @amqp_session.close
+ @is_connected = false
+ end
+ end
+
+ def try_to_connect
+ @agents = {}
+ @agents["1.0"] = Agent.new(self, 0, "BrokerAgent")
+ @topic_bound = false
+ @sync_in_flight = false
+ @sync_request = 0
+ @sync_result = nil
+ @reqs_outstanding = 1
+
+ # FIXME: Need sth for Qpid::Util::connect
+
+ @conn = Qpid::Connection.new(TCPSocket.new(@host, @port),
+ :mechanism => @auth_mechanism,
+ :username => @auth_name,
+ :password => @auth_pass,
+ :host => @host,
+ :service => @auth_service)
+ @conn.start
+ @user_id = @conn.user_id
+ @reply_name = "reply-%s" % amqp_session_id
+ @amqp_session = @conn.session(@amqp_session_id)
+ @amqp_session.auto_sync = true
+
+ @amqp_session.queue_declare(:queue => @reply_name,
+ :exclusive => true,
+ :auto_delete => true)
+
+ @amqp_session.exchange_bind(:exchange => "amq.direct",
+ :queue => @reply_name,
+ :binding_key => @reply_name)
+ @amqp_session.message_subscribe(:queue => @reply_name,
+ :destination => "rdest",
+ :accept_mode => @amqp_session.message_accept_mode.none,
+ :acquire_mode => @amqp_session.message_acquire_mode.pre_acquired)
+ q = @amqp_session.incoming("rdest")
+ q.exc_listen(& method(:exception_cb))
+ q.listen(& method(:reply_cb))
+ @amqp_session.message_set_flow_mode(:destination => "rdest",
+ :flow_mode => 1)
+ @amqp_session.message_flow(:destination => "rdest",
+ :unit => 0,
+ :value => 0xFFFFFFFF)
+ @amqp_session.message_flow(:destination => "rdest",
+ :unit => 1,
+ :value => 0xFFFFFFFF)
+
+ @topic_name = "topic-#{@amqp_session_id}"
+ @amqp_session.queue_declare(:queue => @topic_name,
+ :exclusive => true,
+ :auto_delete => true)
+ @amqp_session.message_subscribe(:queue => @topic_name,
+ :destination => "tdest",
+ :accept_mode => @amqp_session.message_accept_mode.none,
+ :acquire_mode => @amqp_session.message_acquire_mode.pre_acquired)
+ @amqp_session.incoming("tdest").listen(& method(:reply_cb))
+ @amqp_session.message_set_flow_mode(:destination => "tdest",
+ :flow_mode => 1)
+ @amqp_session.message_flow(:destination => "tdest",
+ :unit => 0,
+ :value => 0xFFFFFFFF)
+ @amqp_session.message_flow(:destination => "tdest",
+ :unit => 1,
+ :value => 0xFFFFFFFF)
+
+ @is_connected = true
+ @session.handle_broker_connect(self)
+
+ codec = Qpid::StringCodec.new(@conn.spec)
+ set_header(codec, ?B)
+ msg = message(codec.encoded)
+ emit(msg)
+ end
+
+ private
+
+ # Check the header of a management message and extract the opcode and
+ # class
+ def check_header(codec)
+ begin
+ return [nil, nil] unless codec.read_uint8 == ?A
+ return [nil, nil] unless codec.read_uint8 == ?M
+ return [nil, nil] unless codec.read_uint8 == ?2
+ opcode = codec.read_uint8
+ seq = codec.read_uint32
+ return [opcode, seq]
+ rescue
+ return [nil, nil]
+ end
+ end
+
+ def reply_cb(msg)
+ codec = Qpid::StringCodec.new(@conn.spec, msg.body)
+ loop do
+ opcode, seq = check_header(codec)
+ return unless opcode
+ case opcode
+ when ?b: @session.handle_broker_resp(self, codec, seq)
+ when ?p: @session.handle_package_ind(self, codec, seq)
+ when ?z: @session.handle_command_complete(self, codec, seq)
+ when ?q: @session.handle_class_ind(self, codec, seq)
+ when ?m: @session.handle_method_resp(self, codec, seq)
+ when ?h: @session.handle_heartbeat_ind(self, codec, seq, msg)
+ when ?e: @session.handle_event_ind(self, codec, seq)
+ when ?s: @session.handle_schema_resp(self, codec, seq)
+ when ?c: @session.handle_content_ind(self, codec, seq, true, false)
+ when ?i: @session.handle_content_ind(self, codec, seq, false, true)
+ when ?g: @session.handle_content_ind(self, codec, seq, true, true)
+ else
+ raise "Unexpected opcode #{opcode.inspect}"
+ end
+ end
+ end
+
+ def exception_cb(data)
+ @is_connected = false
+ @error = data
+ synchronize { @cv.signal if @sync_in_flight }
+ @session.handle_error(@error)
+ @session.handle_broker_disconnect(self)
+ @thread.disconnected if @thread
+ end
+ end
+
+ class Agent
+ attr_reader :broker, :agent_bank, :label
+
+ def initialize(broker, agent_bank, label)
+ @broker = broker
+ @agent_bank = agent_bank
+ @label = label
+ end
+
+ def broker_bank
+ @broker.broker_bank
+ end
+
+ def to_s
+ "Agent at bank %d.%d (%s)" % [@broker.broker_bank, @agent_bank, @label]
+ end
+ end
+
+ class Event
+
+ attr_reader :klass_key, :arguments, :timestamp, :name, :schema
+
+ def initialize(session, broker, codec)
+ @session = session
+ @broker = broker
+ @klass_key = ClassKey.new(codec)
+ @timestamp = codec.read_int64
+ @severity = codec.read_uint8
+ @schema = nil
+
+ pname, cname, hash = @klass_key.to_a()
+ session.packages.keys.each do |pname|
+ k = [cname, hash]
+ if session.packages[pname].include?(k)
+ @schema = session.packages[pname][k]
+ @arguments = {}
+ @schema.arguments.each do |arg|
+ v = session.decode_value(codec, arg.type)
+ @arguments[arg.name] = v
+ end
+ end
+ end
+ end
+
+ def to_s
+ return "<uninterpretable>" unless @schema
+ t = Time.at(self.timestamp / 1000000000)
+ out = t.strftime("%c")
+ out += " " + sev_name + " " + @klass_key.package + ":" + @klass_key.klass_name
+ out += " broker=" + @broker.url
+ @schema.arguments.each do |arg|
+ out += " " + arg.name + "=" + @session.display_value(@arguments[arg.name], arg.type)
+ end
+ return out
+ end
+
+ def sev_name
+ case @severity
+ when 0 : return "EMER "
+ when 1 : return "ALERT"
+ when 2 : return "CRIT "
+ when 3 : return "ERROR"
+ when 4 : return "WARN "
+ when 5 : return "NOTIC"
+ when 6 : return "INFO "
+ when 7 : return "DEBUG"
+ else
+ return "INV-%d" % @severity
+ end
+ end
+
+ end
+
+ # Manage sequence numbers for asynchronous method calls
+ class SequenceManager
+ include MonitorMixin
+
+ def initialize
+ super()
+ @sequence = 0
+ @pending = {}
+ end
+
+ # Reserve a unique sequence number
+ def reserve (data)
+ synchronize do
+ result = @sequence
+ @sequence += 1
+ @pending[result] = data
+ return result
+ end
+ end
+
+ # Release a reserved sequence number
+ def release (seq)
+ synchronize { @pending.delete(seq) }
+ end
+ end
+
+ class DebugConsole < Console
+
+ def broker_connected(broker)
+ puts "brokerConnected #{broker}"
+ end
+
+ def broker_disconnected(broker)
+ puts "brokerDisconnected #{broker}"
+ end
+
+ def new_package(name)
+ puts "newPackage #{name}"
+ end
+
+ def new_class(kind, klass_key)
+ puts "newClass #{kind} #{klass_key}"
+ end
+
+ def new_agent(agent)
+ puts "new_agent #{agent}"
+ end
+
+ def del_agent(agent)
+ puts "delAgent #{agent}"
+ end
+
+ def object_props(broker, record)
+ puts "objectProps #{record}"
+ end
+
+ def object_stats(broker, record)
+ puts "objectStats #{record}"
+ end
+
+ def event(broker, event)
+ puts "event #{event}"
+ end
+
+ def heartbeat(agent, timestamp)
+ puts "heartbeat #{agent}"
+ end
+
+ def broker_info(broker)
+ puts "brokerInfo #{broker}"
+ end
+ end
+
+ module XML
+ TYPES = {
+ 1 => "uint8",
+ 2 => "uint16",
+ 3 => "uint32",
+ 4 => "uint64",
+ 5 => "bool",
+ 6 => "short-stirng",
+ 7 => "long-string",
+ 8 => "abs-time",
+ 9 => "delta-time",
+ 10 => "reference",
+ 11 => "boolean",
+ 12 => "float",
+ 13 => "double",
+ 14 => "uuid",
+ 15 => "field-table",
+ 16 => "int8",
+ 17 => "int16",
+ 18 => "int32",
+ 19 => "int64",
+ 20 => "object",
+ 21 => "list",
+ 22 => "array"
+ }
+
+ ACCESS_MODES = {
+ 1 => "RC",
+ 2 => "RW",
+ 3 => "RO"
+ }
+
+ def common_attributes(item)
+ attr_string = ""
+ attr_string << " desc='#{item.desc}'" if item.desc
+ attr_string << " desc='#{item.desc}'" if item.desc
+ attr_string << " refPackage='#{item.refPackage}'" if item.refPackage
+ attr_string << " refClass='#{item.refClass}'" if item.refClass
+ attr_string << " unit='#{item.unit}'" if item.unit
+ attr_string << " min='#{item.min}'" if item.min
+ attr_string << " max='#{item.max}'" if item.max
+ attr_string << " maxlen='#{item.maxlen}'" if item.maxlen
+ return attr_string
+ end
+
+ module_function :common_attributes
+
+ def schema_xml(session, *packages)
+ schema = "<schemas>\n"
+ packages.each do |package|
+ schema << "\t<schema package='#{package}'>\n"
+ session.classes(package).each do |klass_key|
+ klass = session.schema(klass_key)
+ if klass.is_table?
+ if klass.super_klass_key
+ schema << "\t\t<class name='#{klass.klass_key.klass_name}' hash='#{klass.klass_key.hash_string}' extends='#{klass.super_klass_key.to_s}'>\n"
+ else
+ schema << "\t\t<class name='#{klass.klass_key.klass_name}' hash='#{klass.klass_key.hash_string}'>\n"
+ end
+ klass.properties(false).each do |property|
+ schema << "\t\t\t<property name='#{property.name}' type='#{TYPES[property.type]}' access='#{ACCESS_MODES[property.access]}' optional='#{property.optional ? "True" : "False"}'#{common_attributes(property)}/>\n"
+ end
+ klass.methods(false).each do |method|
+ schema << "\t\t\t<method name='#{method.name}'>\n"
+ method.arguments.each do |arg|
+ schema << "\t\t\t\t<arg name='#{arg.name}' dir='#{arg.dir}' type='#{TYPES[arg.type]}'#{common_attributes(arg)}/>\n"
+ end
+ schema << "\t\t\t</method>\n"
+ end
+ schema << "\t\t</class>\n"
+ else
+ schema << "\t\t<event name='#{klass.klass_key.klass_name}' hash='#{klass.klass_key.hash_string}'>\n"
+ klass.arguments.each do |arg|
+ schema << "\t\t\t<arg name='#{arg.name}'type='#{TYPES[arg.type]}'#{common_attributes(arg)}/>\n"
+ end
+ schema << "\t\t</event>\n"
+ end
+ end
+ schema << "\t</package>\n"
+ end
+ schema << "</schema>"
+ end
+
+ module_function :schema_xml
+ end
+
+end
diff --git a/ruby/lib/qpid/queue.rb b/ruby/lib/qpid/queue.rb
new file mode 100644
index 0000000000..4150173b53
--- /dev/null
+++ b/ruby/lib/qpid/queue.rb
@@ -0,0 +1,101 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Augment the standard python multithreaded Queue implementation to add a
+# close() method so that threads blocking on the content of a queue can be
+# notified if the queue is no longer in use.
+
+require 'thread'
+
+# Python nominally uses a bounded queue, but the code never establishes
+# a maximum size; we therefore use Ruby's unbounded queue
+class Qpid::Queue < ::Queue
+
+ DONE = Object.new
+ STOP = Object.new
+
+ def initialize
+ super
+ @error = nil
+ @listener = nil
+ @exc_listener = nil
+ @exc_listener_lock = Monitor.new
+ @thread = nil
+ end
+
+ def close(error = nil)
+ @error = error
+ put(DONE)
+ unless @thread.nil?
+ @thread.join()
+ @thread = nil
+ end
+ end
+
+ def get(block = true, timeout = nil)
+ unless timeout.nil?
+ raise NotImplementedError
+ end
+ result = pop(! block)
+ if result == DONE
+ # this guarantees that any other waiting threads or any future
+ # calls to get will also result in a Qpid::Closed exception
+ put(DONE)
+ raise Qpid::Closed.new(@error)
+ else
+ return result
+ end
+ end
+
+ alias :put :push
+
+ def exc_listen(&block)
+ @exc_listener_lock.synchronize do
+ @exc_listener = block
+ end
+ end
+
+ def listen(&block)
+ if ! block_given? && @thread
+ put(STOP)
+ @thread.join()
+ @thread = nil
+ end
+
+ # FIXME: There is a potential race since we could be changing one
+ # non-nil listener to another
+ @listener = block
+
+ if block_given? && @thread.nil?
+ @thread = Thread.new do
+ loop do
+ begin
+ o = get()
+ break if o == STOP
+ @listener.call(o)
+ rescue Qpid::Closed => e
+ @exc_listener.call(e) if @exc_listener
+ break
+ end
+ end
+ end
+ end
+ end
+
+end
diff --git a/ruby/lib/qpid/session.rb b/ruby/lib/qpid/session.rb
new file mode 100644
index 0000000000..d693b722c2
--- /dev/null
+++ b/ruby/lib/qpid/session.rb
@@ -0,0 +1,458 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+require 'monitor'
+
+module Qpid
+
+ class Session < Invoker
+
+ def log; Qpid::logger["qpid.io.cmd"]; end
+ def msg; Qpid::logger["qpid.io.msg"]; end
+
+
+ class Exception < RuntimeError; end
+ class Closed < Qpid::Session::Exception; end
+ class Detached < Qpid::Session::Exception; end
+
+
+ INCOMPLETE = Object.new
+
+ def self.client(*args)
+ return Qpid::Client(*args)
+ end
+
+ def self.server(*args)
+ return Server(*args)
+ end
+
+ attr_reader :name, :spec, :auto_sync, :timeout, :channel
+ attr_reader :results, :exceptions
+ attr_accessor :channel, :auto_sync, :send_id, :receiver, :sender
+
+ # FIXME: Pass delegate through a block ?
+ def initialize(name, spec, kwargs = {})
+ auto_sync = true
+ auto_sync = kwargs[:auto_sync] if kwargs.key?(:auto_sync)
+ timeout = kwargs[:timeout] || 10
+ delegate = kwargs[:delegate]
+
+ @name = name
+ @spec = spec
+ @auto_sync = auto_sync
+ @timeout = timeout
+ @invoke_lock = Monitor.new
+ @closing = false
+ @closed = false
+
+ @cond_lock = Monitor.new
+ @condition = @cond_lock.new_cond
+
+ @send_id = true
+ @receiver = Receiver.new(self)
+ @sender = Sender.new(self)
+
+ @lock = Monitor.new
+ @incoming = {}
+ @results = {}
+ @exceptions = []
+
+ @assembly = nil
+
+ @delegate = delegate.call(self) if delegate
+
+ @ctl_seg = spec[:segment_type].enum[:control].value
+ @cmd_seg = spec[:segment_type].enum[:command].value
+ @hdr_seg = spec[:segment_type].enum[:header].value
+ @body_seg = spec[:segment_type].enum[:body].value
+ end
+
+ def incoming(destination)
+ @lock.synchronize do
+ queue = @incoming[destination]
+ unless queue
+ queue = Incoming.new(self, destination)
+ @incoming[destination] = queue
+ end
+ return queue
+ end
+ end
+
+ def error?
+ @exceptions.size > 0
+ end
+
+ def sync(timeout=nil)
+ if channel && Thread.current == channel.connection.thread
+ raise Qpid::Session::Exception, "deadlock detected"
+ end
+ unless @auto_sync
+ execution_sync(:sync => true)
+ end
+ last = @sender.next_id - 1
+ @cond_lock.synchronize do
+ unless @condition.wait_for(timeout) {
+ @sender.completed.include?(last) || error?
+ }
+ raise Qpid::Timeout
+ end
+ end
+ if error?
+ raise Qpid::Session::Exception, @exceptions
+ end
+ end
+
+ def close(timeout=nil)
+ @invoke_lock.synchronize do
+ @closing = true
+ channel.session_detach(name)
+ end
+ @cond_lock.synchronize do
+ unless @condition.wait_for(timeout) { @closed }
+ raise Qpid::Timeout
+ end
+ end
+ end
+
+ def closed
+ @lock.synchronize do
+ return if @closed
+
+ @results.each { |id, f| f.error(exceptions) }
+ @results.clear
+
+ @incoming.values.each { |q| q.close(exceptions) }
+ @closed = true
+ @cond_lock.synchronize { @condition.signal }
+ end
+ end
+
+ def resolve_method(name)
+ o = @spec.children[name]
+ case o
+ when Qpid::Spec010::Command
+ return invocation(:method, o)
+ when Qpid::Spec010::Struct
+ return invocation(:method, o)
+ when Qpid::Spec010::Domain
+ return invocation(:value, o.enum) unless o.enum.nil?
+ end
+
+ matches = @spec.children.select { |x|
+ x.name.to_s.include?(name.to_s)
+ }.collect { |x| x.name.to_s }.sort
+ if matches.size == 0
+ msg = nil
+ elsif matches.size == 1
+ msg = "Did you mean #{matches[0]} ? "
+ else
+ msg = "Did you mean one of #{matches.join(",")} ? "
+ end
+ return invocation(:error, msg)
+ end
+
+ def invoke(type, args)
+ # XXX
+ unless type.respond_to?(:track)
+ return type.create(*args)
+ end
+ @invoke_lock.synchronize do
+ return do_invoke(type, args)
+ end
+ end
+
+ def do_invoke(type, args)
+ raise Qpid::Session::Closed if @closing
+ raise Qpid::Session::Detached unless channel
+
+ # Clumsy simulation of Python's keyword args
+ kwargs = {}
+ if args.size > 0 && args[-1].is_a?(Hash)
+ if args.size > type.fields.size
+ kwargs = args.pop
+ elsif type.fields[args.size - 1].type != @spec[:map]
+ kwargs = args.pop
+ end
+ end
+
+ if type.payload
+ if args.size == type.fields.size + 1
+ message = args.pop
+ else
+ message = kwargs.delete(:message) # XXX Really ?
+ end
+ else
+ message = nil
+ end
+
+ hdr = Qpid::struct(@spec[:header])
+ hdr.sync = @auto_sync || kwargs.delete(:sync)
+
+ cmd = type.create(*args.push(kwargs))
+ sc = Qpid::StringCodec.new(@spec)
+ sc.write_command(hdr, cmd)
+
+ seg = Segment.new(true, (message.nil? ||
+ (message.headers.nil? && message.body.nil?)),
+ type.segment_type, type.track, @channel.id, sc.encoded)
+
+ unless type.result.nil?
+ result = Future.new(exception=Exception)
+ @results[@sender.next_id] = result
+ end
+ emit(seg)
+
+ log.debug("SENT %s %s %s" % [seg.id, hdr, cmd]) if log
+
+ unless message.nil?
+ unless message.headers.nil?
+ sc = Qpid::StringCodec.new(@spec)
+ message.headers.each { |st| sc.write_struct32(st) }
+
+ seg = Segment.new(false, message.body.nil?, @hdr_seg,
+ type.track, @channel.id, sc.encoded)
+ emit(seg)
+ end
+ unless message.body.nil?
+ seg = Segment.new(false, true, @body_seg, type.track,
+ @channel.id, message.body)
+ emit(seg)
+ end
+ msg.debug("SENT %s" % message) if msg
+ end
+
+ if !type.result.nil?
+ return @auto_sync ? result.get(@timeout) : result
+ elsif @auto_sync
+ sync(@timeout)
+ end
+ end
+
+ def received(seg)
+ @receiver.received(seg)
+ if seg.first_segment?
+ raise Qpid::Session::Exception unless @assembly.nil?
+ @assembly = []
+ end
+ @assembly << seg
+ if seg.last_segment?
+ dispatch(@assembly)
+ @assembly = nil
+ end
+ end
+
+ def dispatch(assembly)
+ hdr = nil
+ cmd = nil
+ header = nil
+ body = nil
+ assembly.each do |seg|
+ d = seg.decode(@spec)
+ case seg.type
+ when @cmd_seg
+ hdr, cmd = d
+ when @hdr_seg
+ header = d
+ when @body_seg
+ body = d
+ else
+ raise Qpid::Session::Exception
+ end
+ end
+ log.debug("RECV %s %s %s" % [cmd.id, hdr, cmd]) if log
+
+ if cmd.st_type.payload
+ result = @delegate.send(cmd.st_type.name, cmd, header, body)
+ else
+ result = @delegate.send(cmd.st_type.name, cmd)
+ end
+
+ unless cmd.st_type.result.nil?
+ execution_result(cmd.id, result)
+ end
+
+ if result != INCOMPLETE
+ assembly.each do |seg|
+ @receiver.has_completed(seg)
+ # XXX: don't forget to obey sync for manual completion as well
+ if hdr.sync
+ @channel.session_completed(@receiver.completed)
+ end
+ end
+ end
+ end
+
+ # Python calls this 'send', but that has a special meaning
+ # in Ruby, so we call it 'emit'
+ def emit(seg)
+ @sender.emit(seg)
+ end
+
+ def signal
+ @cond_lock.synchronize { @condition.signal }
+ end
+
+ def wait_for(timeout = nil, &block)
+ @cond_lock.synchronize { @condition.wait_for(timeout, &block) }
+ end
+
+ def to_s
+ "<Session: #{name}, #{channel}>"
+ end
+
+ class Receiver
+
+ attr_reader :completed
+ attr_accessor :next_id, :next_offset
+
+ def initialize(session)
+ @session = session
+ @next_id = nil
+ @next_offset = nil
+ @completed = Qpid::RangedSet.new()
+ end
+
+ def received(seg)
+ if @next_id.nil? || @next_offset.nil?
+ raise Exception, "todo"
+ end
+ seg.id = @next_id
+ seg.offset = @next_offset
+ if seg.last_segment?
+ @next_id += 1
+ @next_offset = 0
+ else
+ @next_offset += seg.payload.size
+ end
+ end
+
+ def has_completed(seg)
+ if seg.id.nil?
+ raise ArgumentError, "cannot complete unidentified segment"
+ end
+ if seg.last_segment?
+ @completed.add(seg.id)
+ end
+ end
+
+ def known_completed(commands)
+ completed = Qpid::RangedSet.new()
+ @completed.ranges.each do |c|
+ unless commands.ranges.find { |kc|
+ kc.contains(c.lower) && kc.contains(c.upper)
+ }
+ completed.add_range(c)
+ end
+ end
+ @completed = completed
+ end
+ end
+
+ class Sender
+
+ def initialize(session)
+ @session = session
+ @next_id = 0.to_serial
+ @next_offset = 0
+ @segments = []
+ @completed = RangedSet.new()
+ end
+
+ attr_reader :next_id, :completed
+
+ def emit(seg)
+ seg.id = @next_id
+ seg.offset = @next_offset
+ if seg.last_segment?
+ @next_id += 1
+ @next_offset = 0
+ else
+ @next_offset += seg.payload.size
+ end
+ @segments << seg
+ if @session.send_id
+ @session.send_id = false
+ @session.channel.session_command_point(seg.id, seg.offset)
+ end
+ @session.channel.connection.write_segment(seg)
+ end
+
+ def has_completed(commands)
+ @segments = @segments.reject { |seg| commands.include?(seg.id) }
+ commands.ranges.each do |range|
+ @completed.add(range.lower, range.upper)
+ end
+ end
+ end
+
+ class Incoming < Qpid::Queue
+
+ def initialize(session, destination)
+ super()
+ @session = session
+ @destination = destination
+ end
+
+ def start
+ @session.message_credit_unit.choices.each do |unit|
+ @session.message_flow(@destination, unit.value, 0xFFFFFFFF)
+ end
+ end
+
+ def stop
+ @session.message_cancel(@destination)
+ listen # Kill the listener
+ end
+ end
+
+ class Delegate
+
+ def initialize(session)
+ @session = session
+ end
+
+ #XXX: do something with incoming accepts
+ def message_accept(ma) nil; end
+
+ def execution_result(er)
+ future = @session.results.delete(er.command_id)
+ future.set(er.value)
+ end
+
+ def execution_exception(ex)
+ @session.exceptions << ex
+ end
+ end
+
+ class Client < Delegate
+
+ def log ; Qpid::logger["qpid.io.msg"]; end
+
+ def message_transfer(cmd, headers, body)
+ m = Qpid::Message.new(body)
+ m.headers = headers
+ m.id = cmd.id
+ messages = @session.incoming(cmd.destination)
+ messages.put(m)
+ log.debug("RECV %s" % m) if log
+ return INCOMPLETE
+ end
+ end
+ end
+end
diff --git a/ruby/lib/qpid/spec.rb b/ruby/lib/qpid/spec.rb
new file mode 100644
index 0000000000..b3d70d019d
--- /dev/null
+++ b/ruby/lib/qpid/spec.rb
@@ -0,0 +1,183 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+require "set"
+require "rexml/document"
+require "qpid/fields"
+require "qpid/traverse"
+
+module Qpid
+ module Spec
+
+ include REXML
+
+ class Container < Array
+
+ def initialize()
+ @cache = {}
+ end
+
+ def [](key)
+ return @cache[key] if @cache.include?(key)
+ value = do_lookup(key)
+ @cache[key] = value
+ return value
+ end
+
+ def do_lookup(key)
+ case key
+ when String
+ return find {|x| x.name == key.intern()}
+ when Symbol
+ return find {|x| x.name == key}
+ else
+ return slice(key)
+ end
+ end
+
+ def +(other)
+ copy = clone()
+ copy.concat(other)
+ return copy
+ end
+
+ end
+
+ class Reference
+
+ fields(:name)
+
+ def init(&block)
+ @resolver = block
+ end
+
+ def resolve(*args)
+ @resolver.call(*args)
+ end
+
+ end
+
+ class Loader
+
+ def initialize()
+ @stack = []
+ end
+
+ def container()
+ return Container.new()
+ end
+
+ def load(obj)
+ case obj
+ when String
+ elem = @stack[-1]
+ result = container()
+ elem.elements.each(obj) {|e|
+ @index = result.size
+ result << load(e)
+ }
+ @index = nil
+ return result
+ else
+ elem = obj
+ @stack << elem
+ begin
+ result = send(:"load_#{elem.name}")
+ ensure
+ @stack.pop()
+ end
+ return result
+ end
+ end
+
+ def element
+ @stack[-1]
+ end
+
+ def text
+ element.text
+ end
+
+ def attr(name, type = :string, default = nil, path = nil)
+ if path.nil?
+ elem = element
+ else
+ elem = nil
+ element.elements.each(path) {|elem|}
+ if elem.nil?
+ return default
+ end
+ end
+
+ value = elem.attributes[name]
+ value = value.strip() unless value.nil?
+ if value.nil?
+ default
+ else
+ send(:"parse_#{type}", value)
+ end
+ end
+
+ def parse_int(value)
+ if value.nil?
+ return nil
+ else
+ value.to_i(0)
+ end
+ end
+
+ TRUE = ["yes", "true", "1"].to_set
+ FALSE = ["no", "false", "0", nil].to_set
+
+ def parse_bool(value)
+ if TRUE.include?(value)
+ true
+ elsif FALSE.include?(value)
+ false
+ else
+ raise Exception.new("parse error, expecting boolean: #{value}")
+ end
+ end
+
+ def parse_string(value)
+ value.to_s
+ end
+
+ def parse_symbol(value)
+ value.intern() unless value.nil?
+ end
+
+ REPLACE = {" " => "_", "-" => "_"}
+ KEYWORDS = {"global" => "global_", "return" => "return_"}
+
+ def parse_name(value)
+ return if value.nil?
+
+ REPLACE.each do |k, v|
+ value = value.gsub(k, v)
+ end
+
+ value = KEYWORDS[value] if KEYWORDS.has_key? value
+ return value.intern()
+ end
+
+ end
+
+ end
+end
diff --git a/ruby/lib/qpid/spec010.rb b/ruby/lib/qpid/spec010.rb
new file mode 100644
index 0000000000..3e54115087
--- /dev/null
+++ b/ruby/lib/qpid/spec010.rb
@@ -0,0 +1,485 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+require "qpid/spec"
+require 'pathname'
+require 'fileutils'
+
+module Qpid::Spec010
+
+ include Qpid::Spec
+
+ # XXX: workaround for ruby bug/missfeature
+ Reference = Reference
+ Loader = Loader
+
+ class Spec
+
+ ENCODINGS = {
+ String => "str16",
+ Fixnum => "int64",
+ Bignum => "int64",
+ Float => "float",
+ NilClass => "void",
+ Array => "list",
+ Hash => "map"
+ }
+
+ fields(:major, :minor, :port, :children)
+
+ def init()
+ @controls = {}
+ @commands = {}
+ @structs = {}
+ @types = {}
+ children.each {|c|
+ case c
+ when Control
+ @controls[c.code] = c
+ when Command
+ @commands[c.code] = c
+ when Struct
+ @structs[c.code] = c
+ when Type
+ @types[c.code] = c unless c.code.nil?
+ end
+ }
+ end
+
+ attr_reader :controls, :commands, :structs, :types
+
+ def [](key)
+ return @children[key]
+ end
+
+ def encoding(klass)
+ if ENCODINGS.has_key?(klass)
+ return self[ENCODINGS[klass]]
+ end
+ for base in klass.__bases__
+ result = encoding(base)
+ return result unless result.nil?
+ end
+ end
+
+ def inspect; "spec"; end
+ end
+
+ class Constant
+
+ fields(:name, :value)
+
+ attr :parent, true
+
+ end
+
+ class Type
+
+ fields(:name, :code, :fixed, :variable)
+
+ attr :parent, true
+
+ def present?(value)
+ if @fixed == 0
+ return value
+ else
+ return !value.nil?
+ end
+ end
+
+ def encode(codec, value)
+ codec.send("write_#{name}", value)
+ end
+
+ def decode(codec)
+ return codec.send("read_#{name}")
+ end
+
+ def inspect; name; end
+
+ end
+
+ class Domain < Type
+
+ fields(:name, :type, :enum)
+
+ attr :parent, true
+
+ def encode(codec, value)
+ @type.encode(codec, value)
+ end
+
+ def decode(codec)
+ return @type.decode(codec)
+ end
+
+ end
+
+ class Enum
+ fields(:choices)
+
+ def [](choice)
+ case choice
+ when String
+ choice = choice.to_sym
+ return choices.find { |c| c.name == choice }
+ when Symbol
+ return choices.find { |c| c.name == choice }
+ else
+ return choices.find { |c| c.value == choice }
+ end
+ end
+
+ def method_missing(name, *args)
+ raise ArgumentError.new("wrong number of arguments") unless args.empty?
+ return self[name].value
+ end
+
+ end
+
+ class Choice
+ fields(:name, :value)
+ end
+
+ class Composite
+
+ fields(:name, :code, :size, :pack, :fields)
+
+ attr :parent, true
+
+ # Python calls this 'new', but that has special meaning in Ruby
+ def create(*args)
+ return Qpid::struct(self, *args)
+ end
+
+ def decode(codec)
+ codec.read_size(@size)
+ codec.read_uint16() unless @code.nil?
+ return Qpid::struct(self, self.decode_fields(codec))
+ end
+
+ def decode_fields(codec)
+ flags = 0
+ pack.times {|i| flags |= (codec.read_uint8() << 8*i)}
+
+ result = {}
+
+ fields.each_index {|i|
+ f = @fields[i]
+ if flags & (0x1 << i) != 0
+ result[f.name] = f.type.decode(codec)
+ else
+ result[f.name] = nil
+ end
+ }
+
+ return result
+ end
+
+ def encode(codec, value)
+ sc = Qpid::StringCodec.new(@spec)
+ sc.write_uint16(@code) unless @code.nil?
+ encode_fields(sc, value)
+ codec.write_size(@size, sc.encoded.size)
+ codec.write(sc.encoded)
+ end
+
+ def encode_fields(codec, values)
+ # FIXME: This could be written cleaner using select
+ # instead of flags
+ flags = 0
+ fields.each_index do |i|
+ f = fields[i]
+ flags |= (0x1 << i) if f.type.present?(values[f.name])
+ end
+
+ pack.times { |i| codec.write_uint8((flags >> 8*i) & 0xFF) }
+
+ fields.each_index do |i|
+ f = fields[i]
+ f.type.encode(codec, values[f.name]) if flags & (0x1 << i) != 0
+ end
+ end
+
+ def inspect; name; end
+
+ end
+
+ class Field
+
+ fields(:name, :type, :exceptions)
+
+ def default()
+ return nil
+ end
+
+ end
+
+ class Struct < Composite
+
+ def present?(value)
+ return !value.nil?
+ end
+
+ end
+
+ class Action < Composite; end
+
+ class Control < Action
+
+ def segment_type
+ @parent[:segment_type].enum[:control].value
+ end
+
+ def track
+ @parent[:track].enum[:control].value
+ end
+
+ end
+
+ class Command < Action
+
+ attr_accessor :payload, :result
+
+ def segment_type
+ @parent["segment_type"].enum["command"].value
+ end
+
+ def track
+ @parent["track"].enum["command"].value
+ end
+
+ end
+
+ class Doc
+ fields(:type, :title, :text)
+ end
+
+ class Loader010 < Loader
+
+ def initialize()
+ super()
+ end
+
+ def klass
+ cls = element
+ until cls.nil?
+ break if cls.name == "class"
+ cls = cls.parent
+ end
+ return cls
+ end
+
+ def scope
+ if element.name == "struct"
+ return nil
+ else
+ return class_name
+ end
+ end
+
+ def class_name
+ cls = klass
+ if cls.nil?
+ return nil
+ else
+ return parse_name(cls.attributes["name"].strip)
+ end
+ end
+
+ def class_code
+ cls = klass
+ if cls.nil?
+ return 0
+ else
+ return parse_int(cls.attributes["code"].strip)
+ end
+ end
+
+ def parse_decl(value)
+ name = parse_name(value)
+
+ s = scope
+ if s.nil?
+ return name
+ else
+ return :"#{s}_#{name}"
+ end
+ end
+
+ def parse_code(value)
+ c = parse_int(value)
+ if c.nil?
+ return nil
+ else
+ return c | (class_code << 8)
+ end
+ end
+
+ def parse_type(value)
+ name = parse_name(value.sub(".", "_"))
+ cls = class_name
+ return Reference.new {|spec|
+ candidates = [name]
+ candidates << :"#{cls}_#{name}" unless cls.nil?
+ for c in candidates
+ child = spec[c]
+ break unless child.nil?
+ end
+ if child.nil?
+ raise Exception.new("unresolved type: #{name}")
+ else
+ child
+ end
+}
+ end
+
+ def load_amqp()
+ children = nil
+
+ for s in ["constant", "type", "domain", "struct", "control",
+ "command"]
+ ch = load(s)
+ if children.nil?
+ children = ch
+ else
+ children += ch
+ end
+ children += load("class/#{s}")
+ end
+ children += load("class/command/result/struct")
+ Spec.new(attr("major", :int), attr("minor", :int), attr("port", :int),
+ children)
+ end
+
+ def load_constant()
+ Constant.new(attr("name", :decl), attr("value", :int))
+ end
+
+ def load_type()
+ Type.new(attr("name", :decl), attr("code", :code),
+ attr("fixed-width", :int), attr("variable-width", :int))
+ end
+
+ def load_domain()
+ Domain.new(attr("name", :decl), attr("type", :type), load("enum").first)
+ end
+
+ def load_enum()
+ Enum.new(load("choice"))
+ end
+
+ def load_choice()
+ Choice.new(attr("name", :name), attr("value", :int))
+ end
+
+ def load_field()
+ Field.new(attr("name", :name), attr("type", :type))
+ end
+
+ def load_struct()
+ Struct.new(attr("name", :decl), attr("code", :code), attr("size", :int),
+ attr("pack", :int), load("field"))
+ end
+
+ def load_action(cls)
+ cls.new(attr("name", :decl), attr("code", :code), 0, 2, load("field"))
+ end
+
+ def load_control()
+ load_action(Control)
+ end
+
+ def load_command()
+ result = attr("type", :type, nil, "result")
+ result = attr("name", :type, nil, "result/struct") if result.nil?
+ segs = load("segments")
+ cmd = load_action(Command)
+ cmd.result = result
+ cmd.payload = !segs.empty?
+ return cmd
+ end
+
+ def load_result()
+ true
+ end
+
+ def load_segments()
+ true
+ end
+
+ end
+
+ def self.spec_cache(specfile)
+ File::join(File::dirname(__FILE__), "spec_cache",
+ File::basename(specfile, ".xml") + ".rb_marshal")
+ end
+
+ # XXX: could be shared
+ def self.load(spec = nil)
+ return spec if spec.is_a?(Qpid::Spec010::Spec)
+ if spec.nil?
+ # FIXME: Need to add a packaging setup in here so we know where
+ # the installed spec is going to be.
+ specfile = nil
+ if ENV['AMQP_SPEC']
+ specfile = ENV['AMQP_SPEC']
+ else
+ require "qpid/config"
+ specfile = Qpid::Config.amqp_spec
+ end
+ else
+ specfile = spec
+ end
+
+ specfile_cache = spec_cache(specfile)
+ # FIXME: Check that cache is newer than specfile
+ if File::exist?(specfile_cache)
+ begin
+ spec = File::open(specfile_cache, "r") do |f|
+ Marshal::load(f)
+ end
+ return spec
+ rescue
+ # Ignore, will load from XML
+ end
+ end
+
+ doc = File::open(specfile, "r") { |f| Document.new(f) }
+ spec = Loader010.new().load(doc.root)
+ spec.traverse! do |o|
+ if o.is_a?(Reference)
+ o.resolve(spec)
+ else
+ o
+ end
+ end
+
+ spec.children.each { |c| c.parent = spec }
+
+ begin
+ FileUtils::mkdir_p(File::dirname(specfile_cache))
+ File::open(specfile_cache, "w") { |f| Marshal::dump(spec, f) }
+ rescue
+ # Ignore, we are fine without the cached spec
+ end
+ return spec
+ end
+
+end
diff --git a/ruby/lib/qpid/spec08.rb b/ruby/lib/qpid/spec08.rb
new file mode 100644
index 0000000000..902c05c297
--- /dev/null
+++ b/ruby/lib/qpid/spec08.rb
@@ -0,0 +1,190 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+require "qpid/spec"
+
+module Qpid08
+
+ module Spec
+
+ include Qpid::Spec
+
+ # XXX: workaround for ruby bug/missfeature
+ Reference = Reference
+
+ class Root
+ fields(:major, :minor, :classes, :constants, :domains)
+
+ def find_method(name)
+ classes.each do |c|
+ c.methods.each do |m|
+ if name == m.qname
+ return m
+ end
+ end
+ end
+
+ return nil
+ end
+ end
+
+ class Constant
+ fields(:name, :id, :type, :docs)
+ end
+
+ class Domain
+ fields(:name, :type)
+ end
+
+ class Class
+ fields(:name, :id, :handler, :fields, :methods, :docs)
+ end
+
+ class Method
+ fields(:name, :id, :content?, :responses, :synchronous?, :fields,
+ :docs)
+
+ def init()
+ @response = false
+ end
+
+ attr :parent, true
+
+ def response?; @response end
+ def response=(b); @response = b end
+
+ def qname
+ :"#{parent.name}_#{name}"
+ end
+ end
+
+ class Field
+ fields(:name, :id, :type, :docs)
+
+ def default
+ case type
+ when :bit then false
+ when :octet, :short, :long, :longlong then 0
+ when :shortstr, :longstr then ""
+ when :table then {}
+ end
+ end
+ end
+
+ class Doc
+ fields(:type, :text)
+ end
+
+ class Container08 < Container
+ def do_lookup(key)
+ case key
+ when Integer
+ return find {|x| x.id == key}
+ else
+ return super(key)
+ end
+ end
+ end
+
+ class Loader08 < Loader
+
+ def container()
+ return Container08.new()
+ end
+
+ def load_amqp()
+ Root.new(attr("major", :int), attr("minor", :int), load("class"),
+ load("constant"), load("domain"))
+ end
+
+ def load_class()
+ Class.new(attr("name", :name), attr("index", :int), attr("handler", :name),
+ load("field"), load("method"), load("doc"))
+ end
+
+ def load_method()
+ Method.new(attr("name", :name), attr("index", :int),
+ attr("content", :bool), load("response"),
+ attr("synchronous", :bool), load("field"), load("docs"))
+ end
+
+ def load_response()
+ name = attr("name", :name)
+ Reference.new {|spec, klass|
+ response = klass.methods[name]
+ if response.nil?
+ raise Exception.new("no such method: #{name}")
+ end
+ response
+ }
+ end
+
+ def load_field()
+ type = attr("type", :name)
+ if type.nil?
+ domain = attr("domain", :name)
+ type = Reference.new {|spec, klass|
+ spec.domains[domain].type
+ }
+ end
+ Field.new(attr("name", :name), @index, type, load("docs"))
+ end
+
+ def load_constant()
+ Constant.new(attr("name", :name), attr("value", :int), attr("class", :name),
+ load("doc"))
+ end
+
+ def load_domain()
+ Domain.new(attr("name", :name), attr("type", :name))
+ end
+
+ def load_doc()
+ Doc.new(attr("type", :symbol), text)
+ end
+
+ end
+
+ def self.load(spec)
+ case spec
+ when String
+ spec = File.new(spec)
+ end
+ doc = Document.new(spec)
+ spec = Loader08.new().load(doc.root)
+ spec.classes.each do |klass|
+ klass.traverse! do |o|
+ case o
+ when Reference
+ o.resolve(spec, klass)
+ else
+ o
+ end
+ end
+ klass.methods.each do |m|
+ m.parent = klass
+ m.responses.each do |r|
+ r.response = true
+ end
+ end
+ end
+ return spec
+ end
+ end
+end
diff --git a/ruby/qpid/test.rb b/ruby/lib/qpid/test.rb
index f8107143ab..2e643f4348 100644
--- a/ruby/qpid/test.rb
+++ b/ruby/lib/qpid/test.rb
@@ -1,5 +1,3 @@
-
-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -17,12 +15,11 @@
# specific language governing permissions and limitations
# under the License.
#
-#/
-require "qpid/spec"
+require "qpid/spec08"
require "qpid/client"
-module Qpid
+module Qpid08
module Test
diff --git a/ruby/qpid/traverse.rb b/ruby/lib/qpid/traverse.rb
index 67358a7eb1..67358a7eb1 100644
--- a/ruby/qpid/traverse.rb
+++ b/ruby/lib/qpid/traverse.rb
diff --git a/ruby/lib/qpid/util.rb b/ruby/lib/qpid/util.rb
new file mode 100644
index 0000000000..2dbc37da09
--- /dev/null
+++ b/ruby/lib/qpid/util.rb
@@ -0,0 +1,75 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+require 'thread'
+require 'monitor'
+
+# Monkeypatch
+class MonitorMixin::ConditionVariable
+
+ # Wait until BLOCK returns TRUE or TIMEOUT seconds have passed
+ # Return TRUE if BLOCK returned TRUE within the TIMEOUT, FALSE
+ # otherswise
+ def wait_for(timeout=nil, &block)
+ start = Time.now
+ passed = 0
+ until yield
+ if timeout.nil?
+ wait
+ elsif passed < timeout
+ wait(timeout)
+ else
+ return false
+ end
+ passed = Time.now - start
+ end
+ return true
+ end
+end
+
+module Qpid::Util
+
+ # Similar to Python's threading.Event
+ class Event
+ def initialize
+ @monitor = Monitor.new
+ @cond = @monitor.new_cond
+ @set = false
+ end
+
+ def set
+ @monitor.synchronize do
+ @set = true
+ @cond.signal
+ end
+ end
+
+ def clear
+ @monitor.synchronize { @set = false }
+ end
+
+ def wait(timeout = nil)
+ @monitor.synchronize do
+ unless @set
+ @cond.wait_for(timeout) { @set }
+ end
+ end
+ end
+ end
+end
diff --git a/ruby/qpid/spec.rb b/ruby/qpid/spec.rb
deleted file mode 100644
index 9a04f584d0..0000000000
--- a/ruby/qpid/spec.rb
+++ /dev/null
@@ -1,289 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-require "set"
-require "rexml/document"
-require "qpid/fields"
-require "qpid/traverse"
-
-module Spec
-
- include REXML
-
- class Container < Array
-
- def initialize()
- @cache = {}
- end
-
- def [](key)
- return @cache[key] if @cache.include?(key)
-
- case key
- when String
- value = find {|x| x.name == key.intern()}
- when Symbol
- value = find {|x| x.name == key}
- when Integer
- value = find {|x| x.id == key}
- else
- raise Exception.new("invalid key: #{key}")
- end
-
- @cache[key] = value
- return value
- end
-
- end
-
- class Root
- fields(:major, :minor, :classes, :constants, :domains)
-
- def find_method(name)
- classes.each do |c|
- c.methods.each do |m|
- if name == m.qname
- return m
- end
- end
- end
-
- return nil
- end
- end
-
- class Constant
- fields(:name, :id, :type, :docs)
- end
-
- class Domain
- fields(:name, :type)
- end
-
- class Class
- fields(:name, :id, :handler, :fields, :methods, :docs)
- end
-
- class Method
- fields(:name, :id, :content?, :responses, :synchronous?, :fields,
- :docs)
-
- def init()
- @response = false
- end
-
- attr :parent, true
-
- def response?; @response end
- def response=(b); @response = b end
-
- def qname
- :"#{parent.name}_#{name}"
- end
- end
-
- class Field
- fields(:name, :id, :type, :docs)
-
- def default
- case type
- when :bit then false
- when :octet, :short, :long, :longlong then 0
- when :shortstr, :longstr then ""
- when :table then {}
- end
- end
-
- end
-
- class Doc
- fields(:type, :text)
- end
-
- class Reference
-
- fields(:name)
-
- def init(&block)
- @resolver = block
- end
-
- def resolve(spec, klass)
- @resolver.call(spec, klass)
- end
-
- end
-
- class Loader
-
- def initialize()
- @stack = []
- end
-
- def load(obj)
- case obj
- when String
- elem = @stack[-1]
- result = Container.new()
- elem.elements.each(obj) {|e|
- @index = result.size
- result << load(e)
- }
- @index = nil
- return result
- else
- elem = obj
- @stack << elem
- begin
- result = send(:"load_#{elem.name}")
- ensure
- @stack.pop()
- end
- return result
- end
- end
-
- def element
- @stack[-1]
- end
-
- def text
- element.text
- end
-
- def attr(name, type = :string, default = nil)
- value = element.attributes[name]
- value = value.strip() unless value.nil?
- value = nil unless value.nil? or value.any?
- if value.nil? and not default.nil? then
- default
- else
- send(:"parse_#{type}", value)
- end
- end
-
- def parse_int(value)
- value.to_i
- end
-
- TRUE = ["yes", "true", "1"].to_set
- FALSE = ["no", "false", "0", nil].to_set
-
- def parse_bool(value)
- if TRUE.include?(value)
- true
- elsif FALSE.include?(value)
- false
- else
- raise Exception.new("parse error, expecting boolean: #{value}")
- end
- end
-
- def parse_string(value)
- value.to_s
- end
-
- def parse_symbol(value)
- value.intern() unless value.nil?
- end
-
- def parse_name(value)
- value.gsub(/[\s-]/, '_').intern() unless value.nil?
- end
-
- def load_amqp()
- Root.new(attr("major", :int), attr("minor", :int), load("class"),
- load("constant"), load("domain"))
- end
-
- def load_class()
- Class.new(attr("name", :name), attr("index", :int), attr("handler", :name),
- load("field"), load("method"), load("doc"))
- end
-
- def load_method()
- Method.new(attr("name", :name), attr("index", :int),
- attr("content", :bool), load("response"),
- attr("synchronous", :bool), load("field"), load("docs"))
- end
-
- def load_response()
- name = attr("name", :name)
- Reference.new {|spec, klass|
- response = klass.methods[name]
- if response.nil?
- raise Exception.new("no such method: #{name}")
- end
- response
- }
- end
-
- def load_field()
- type = attr("type", :name)
- if type.nil?
- domain = attr("domain", :name)
- type = Reference.new {|spec, klass|
- spec.domains[domain].type
- }
- end
- Field.new(attr("name", :name), @index, type, load("docs"))
- end
-
- def load_constant()
- Constant.new(attr("name", :name), attr("value", :int), attr("class", :name),
- load("doc"))
- end
-
- def load_domain()
- Domain.new(attr("name", :name), attr("type", :name))
- end
-
- def load_doc()
- Doc.new(attr("type", :symbol), text)
- end
-
- end
-
- def Spec.load(spec)
- case spec
- when String
- spec = File.new(spec)
- end
- doc = Document.new(spec)
- spec = Loader.new().load(doc.root)
- spec.classes.each do |klass|
- klass.traverse! do |o|
- case o
- when Reference
- o.resolve(spec, klass)
- else
- o
- end
- end
- klass.methods.each do |m|
- m.parent = klass
- m.responses.each do |r|
- r.response = true
- end
- end
- end
- spec
- end
-
-end
diff --git a/ruby/run-tests b/ruby/run-tests
deleted file mode 100755
index b4c51a75ed..0000000000
--- a/ruby/run-tests
+++ /dev/null
@@ -1,4 +0,0 @@
-#!/usr/bin/ruby
-
-require "tests/channel"
-require "tests/basic"
diff --git a/ruby/tests/assembler.rb b/ruby/tests/assembler.rb
new file mode 100644
index 0000000000..1181ece547
--- /dev/null
+++ b/ruby/tests/assembler.rb
@@ -0,0 +1,78 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+require "test/unit"
+require "qpid"
+require 'tests/util'
+
+require 'logger'
+
+class TestAssembler< Test::Unit::TestCase
+
+ Segment = Qpid::Segment
+ Assembler = Qpid::Assembler
+
+ def setup
+ # Qpid::asm_logger = Logger.new(STDOUT)
+
+ @server = Util::ServerThread.new do |socket|
+ asm = Assembler.new(socket)
+ begin
+ header = asm.read_header
+ asm.write_header(header[-2], header[-1])
+ loop do
+ seg = asm.read_segment
+ asm.write_segment(seg)
+ end
+ rescue Qpid::Closed
+ nil # Ignore
+ end
+ end
+ end
+
+ def teardown
+ @server.finish
+ @server.join
+ end
+
+ def test_assembler
+ asm = Assembler.new(@server.client, max_payload = 1)
+ asm.write_header(0, 10)
+ asm.write_segment(Segment.new(true, false, 1, 2, 3, "TEST"))
+ asm.write_segment(Segment.new(false, true, 1, 2, 3, "ING"))
+
+ assert_equal( ["AMQP", 1, 1, 0, 10], asm.read_header)
+
+ seg = asm.read_segment
+ assert_equal(true, seg.first_segment?)
+ assert_equal(false, seg.last_segment?)
+ assert_equal(1, seg.type)
+ assert_equal(2, seg.track)
+ assert_equal(3, seg.channel)
+ assert_equal("TEST", seg.payload)
+
+ seg = asm.read_segment
+ assert_equal(false, seg.first_segment?)
+ assert_equal(true, seg.last_segment?)
+ assert_equal(1, seg.type)
+ assert_equal(2, seg.track)
+ assert_equal(3, seg.channel)
+ assert_equal("ING", seg.payload)
+ end
+end
diff --git a/ruby/tests/codec010.rb b/ruby/tests/codec010.rb
new file mode 100644
index 0000000000..a9a5ca81e0
--- /dev/null
+++ b/ruby/tests/codec010.rb
@@ -0,0 +1,122 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+require "test/unit"
+require "qpid"
+require "tests/util"
+require "socket"
+
+class CodecTest < Test::Unit::TestCase
+
+ def setup
+ @spec = Qpid::Spec010::load
+ end
+
+ def check(type, value)
+ t = @spec[type]
+ sc = Qpid::StringCodec.new(@spec)
+ t.encode(sc, value)
+ decoded = t.decode(sc)
+ assert_equal(value, decoded)
+ end
+
+
+ def testMapString
+ check("map", {"string" => "this is a test"})
+ end
+
+ def testMapInt
+ check("map", {"int" => 3})
+ end
+
+ def testMapLong
+ check("map", {"long" => 2**32})
+ end
+
+ def testMapNone
+ check("map", {"none" => None})
+ end
+
+ def testMapNested
+ check("map", {"map" => {"string" => "nested test"}})
+ end
+
+ def testMapList
+ check("map", {"list" => [1, "two", 3.0, -4]})
+ end
+
+ def testMapAll
+ check("map", {"string" => "this is a test",
+ "int" => 3,
+ "long" => 2**32,
+ "nil" => nil,
+ "map" => {"string" => "nested map"},
+ "list" => [1, "two", 3.0, -4]})
+ end
+
+ def testMapEmpty
+ check("map", {})
+ end
+
+ def testMapNone
+ check("map", nil)
+ end
+
+ def testList
+ check("list", [1, "two", 3.0, -4])
+ end
+
+ def testListEmpty
+ check("list", [])
+ end
+
+ def testListNone
+ check("list", nil)
+ end
+
+ def testArrayInt
+ check("array", [1, 2, 3, 4])
+ end
+
+ def testArrayString
+ check("array", ["one", "two", "three", "four"])
+ end
+
+ def testArrayEmpty
+ check("array", [])
+ end
+
+ def testArrayNone
+ check("array", nil)
+ end
+
+ def testInt64
+ check("int64", 2 ** 40 * -1 + 43)
+ end
+
+ def testUint64
+ check("int64", 2 ** 42)
+ end
+
+ def testReadNone
+ sc = Qpid::StringCodec.new(@spec)
+ # Python behaves this way
+ assert_equal("", sc.read(nil))
+ end
+end
diff --git a/ruby/tests/connection.rb b/ruby/tests/connection.rb
new file mode 100644
index 0000000000..c2a851ec0a
--- /dev/null
+++ b/ruby/tests/connection.rb
@@ -0,0 +1,246 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+require 'test/unit'
+require 'qpid'
+require 'tests/util'
+
+class MockServer
+
+ def initialize(queue)
+ @queue = queue
+ end
+
+ def connection(conn, args={})
+ return Qpid::Delegate::Server.new(conn, :delegate => method(:session))
+ end
+
+ def session(ssn, args={})
+ ssn.auto_sync = false
+ return MockSession.new(ssn, @queue)
+ end
+end
+
+class MockSession < Qpid::Session::Delegate
+
+ def initialize(session, queue)
+ @session = session
+ @queue = queue
+ end
+
+ def execution_sync(es)
+ nil
+ end
+
+ def queue_query(qq)
+ return qq.st_type.result.create(qq.queue)
+ end
+
+ def message_transfer(cmd, headers, body)
+ if cmd.destination == "echo"
+ m = Qpid::Message.new(body)
+ m.headers = headers
+ @session.message_transfer(cmd.destination, cmd.accept_mode,
+ cmd.acquire_mode, m)
+ elsif cmd.destination == "abort"
+ @session.channel.connection.sock.close()
+ else
+ @queue.put([cmd, headers, body])
+ end
+ end
+
+ def exchange_declare(ed)
+ # do nothing
+ end
+end
+
+class TestConnectionTest < Test::Unit::TestCase
+
+ def setup
+ # Make sure errors in threads lead to a noisy death of the test
+ Thread.abort_on_exception = true
+
+ @queue = Qpid::Queue.new
+ @running = true
+ ts = MockServer.new(@queue)
+ @server = Util::ServerThread.new do |socket|
+ conn = Qpid::Connection.new(socket, :delegate => ts.method(:connection))
+ begin
+ conn.start(5)
+ rescue Qpid::Closed
+ # Ignore
+ end
+ end
+
+ class << @server
+ def finish
+ @running.lock
+ client.close
+ @sockets.each { |sock| sock.close unless sock.closed? }
+ end
+ end
+
+ @server[:name] = 'server'
+ Thread.current[:name] = 'test'
+ end
+
+ def teardown
+ @server.finish
+ @server.join
+ end
+
+ def connect
+ sock = @server.client
+ return Qpid::Connection.new(sock)
+ end
+
+ def test_basic
+ c = connect
+ c.start(10)
+
+ ssn1 = c.session("test1", :timeout => 10)
+ ssn2 = c.session("test2", :timeout => 10)
+
+ assert_equal(c.sessions["test1"], ssn1)
+ assert_equal(c.sessions["test2"], ssn2)
+ assert_not_nil ssn1.channel
+ assert_not_nil ssn2.channel
+ assert(c.attached.values.include?(ssn1))
+ assert(c.attached.values.include?(ssn2))
+
+ ssn1.close(5)
+
+ assert_nil(ssn1.channel)
+ assert(! c.attached.values.include?(ssn1))
+ assert(c.sessions.values.include?(ssn2))
+
+ ssn2.close(5)
+
+ assert_nil(ssn2.channel)
+ assert(! c.attached.values.include?(ssn2))
+ assert(! c.sessions.values.include?(ssn2))
+
+ ssn = c.session("session", :timeout => 10)
+
+ assert_not_nil(ssn.channel)
+ assert(c.sessions.values.include?(ssn))
+
+ destinations = ["one", "two", "three"]
+
+ destinations.each { |d| ssn.message_transfer(d) }
+
+ destinations.each do |d|
+ cmd, header, body = @queue.get(10)
+ assert_equal(d, cmd.destination)
+ assert_nil(header)
+ assert_nil(body)
+ end
+
+ msg = Qpid::Message.new("this is a test")
+ ssn.message_transfer("four", :message => msg)
+ cmd, header, body = @queue.get(10)
+ assert_equal("four", cmd.destination)
+ assert_nil(header)
+ assert_equal(msg.body, body)
+
+ qq = ssn.queue_query("asdf")
+ assert_equal("asdf", qq.queue)
+ c.close(5)
+ end
+
+ def test_close_get
+ c = connect
+ c.start(10)
+ ssn = c.session("test", :timeout => 10)
+ echos = ssn.incoming("echo")
+
+ 10.times do |i|
+ ssn.message_transfer("echo",
+ :message => Qpid::Message.new("test#{i}"))
+ end
+
+ ssn.auto_sync=false
+ ssn.message_transfer("abort")
+
+ 10.times do |i|
+ m = echos.get(timeout=10)
+ assert_equal("test#{i}", m.body)
+ end
+
+ begin
+ m = echos.get(timeout=10)
+ flunk("Expected Closed")
+ rescue Qpid::Closed
+ # Ignore
+ end
+ end
+
+ def test_close_listen
+ c = connect
+ c.start(10)
+ ssn = c.session("test", :timeout => 10)
+ echos = ssn.incoming("echo")
+
+ messages = []
+ exceptions = []
+ lock = Monitor.new
+ condition = lock.new_cond
+
+ echos.exc_listen do |e|
+ exceptions << e
+ lock.synchronize { condition.signal }
+ end
+ echos.listen do |m|
+ messages << m
+ end
+
+ 10.times do |i|
+ ssn.message_transfer("echo",
+ :message => Qpid::Message.new("test#{i}"))
+ end
+ ssn.auto_sync=false
+ ssn.message_transfer("abort")
+
+ lock.synchronize { condition.wait(10) }
+
+ 10.times do |i|
+ m = messages.shift
+ assert_equal("test#{i}", m.body)
+ end
+
+ assert_equal(1, exceptions.size)
+ end
+
+ def test_sync
+ c = connect
+ c.start(10)
+ s = c.session("test")
+ s.auto_sync = false
+ s.message_transfer("echo",
+ :message => Qpid::Message.new("test"))
+ s.sync(10)
+ end
+
+ def test_exchange_declare
+ c = connect
+ c.start(10)
+ s = c.session("test")
+ s.exchange_declare("test-exchange")
+ end
+end
diff --git a/ruby/tests/datatypes.rb b/ruby/tests/datatypes.rb
new file mode 100644
index 0000000000..65b1f9e3f5
--- /dev/null
+++ b/ruby/tests/datatypes.rb
@@ -0,0 +1,224 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+require 'test/unit'
+require 'qpid'
+require 'tests/util'
+
+class TestSerial < Test::Unit::TestCase
+
+ def test_cmp
+ [0, 0x8FFFFFFF, 0xFFFFFFFF].each do |s|
+ s = s.to_serial
+ assert(s + 1 > s)
+ assert(s - 1 < s)
+ assert(s < s + 1)
+ assert(s > s - 1)
+ end
+ last = 0xFFFFFFFF.to_serial
+ zero = 0.to_serial
+ assert_equal(zero, last + 1)
+
+ assert_equal(last, [last, zero].min)
+ assert_equal(zero, [last, zero].max)
+ end
+
+ def test_incr
+ s = 0.to_serial
+ s += 1
+ assert_equal(1.to_serial, s)
+ end
+
+ def test_in
+ l = [1, 2, 3, 4].collect { |i| i.to_serial }
+ assert(l.include?(1.to_serial))
+ assert(l.include?((0xFFFFFFFF + 2).to_serial))
+ assert(l.include?(4))
+ end
+
+ def test_none
+ assert_not_equal(nil, 0.to_serial)
+ end
+
+ def test_hash
+ zero = 0.to_serial
+ d = { zero => :zero }
+ # FIXME: this does not work, since Ruby looks up the key and does
+ # a 0.eql?(zero), which bypasses the Qpid::Serial::eql?
+ # assert_equal(:zero, d[0])
+ end
+end
+
+class TestRangedSet < Test::Unit::TestCase
+
+ def assert_contains(rset, elts, nonelts = [])
+ assert_equal(elts, elts.select { |e| rset.include?(e) })
+ assert_equal(nonelts, nonelts.select { |e| ! rset.include?(e) })
+ end
+
+ def assert_ranges(rs, *ranges)
+ assert_equal(ranges.size, rs.ranges.size)
+ assert( ranges.all? { |rng| rs.include?(rng) } )
+ end
+
+ def test_simple
+ rs = Qpid::RangedSet.new
+
+ assert(rs.ranges.empty?)
+
+ rs.add(1)
+ assert_contains(rs, [1], [0,2])
+ assert_ranges(rs, 1..1)
+
+ rs.add(2)
+ assert_contains(rs, [1,2], [0,3])
+ assert_ranges(rs, 1..2)
+
+ rs.add(0)
+ assert_contains(rs, [0,1,2], [-1, 3])
+ assert_ranges(rs, 0..2)
+
+ rs.add(37)
+ assert_contains(rs, [0,1,2,37], [-1, 3, 36, 38])
+ assert_ranges(rs, 0..2, 37..37)
+
+ rs.add(-1)
+ assert_ranges(rs, -1..2, 37..37)
+
+ rs.add(-3)
+ assert_ranges(rs, -1..2, 37..37, -3..-3)
+
+ rs.add(1, 20)
+ assert_contains(rs, [20], [21])
+ assert_ranges(rs, -1..20, 37..37, -3..-3)
+
+ rs.add(21,36)
+ assert_ranges(rs, -1..37, -3..-3)
+
+ rs.add(-3, 5)
+ assert_ranges(rs, -3..37)
+ end
+
+ def test_add_self
+ a = Qpid::RangedSet.new
+ a.add(0, 8)
+ assert_ranges(a, 0..8)
+
+ a.add(0, 8)
+ assert_ranges(a, 0..8)
+ end
+end
+
+class TestRange < Test::Unit::TestCase
+
+ def test_intersect1
+ a = Range.new(0, 10)
+ b = Range.new(9, 20)
+ i1 = a.intersect(b)
+ i2 = b.intersect(a)
+ assert_equal(9..10, i1)
+ assert_equal(9..10, i2)
+ end
+
+ def test_intersect2
+ a = Range.new(0, 10)
+ b = Range.new(11, 20)
+ assert_equal(nil, a.intersect(b))
+ assert_equal(nil, b.intersect(a))
+ end
+
+ def test_intersect3
+ a = Range.new(0, 10)
+ b = Range.new(3, 5)
+ i1 = a.intersect(b)
+ i2 = b.intersect(a)
+ assert_equal(3..5, i1)
+ assert_equal(3..5, i2)
+ end
+end
+
+class TestUUIDTest < Test::Unit::TestCase
+
+ def test_simple
+ # this test is kind of lame, but it does excercise the basic
+ # functionality of the class
+ u = Qpid::UUID::uuid4
+ 1024.times { |i| assert_not_equal(u, Qpid::UUID::uuid4) }
+ assert_raise NotImplementedError do
+ u == 0
+ end
+ end
+end
+
+class TestMessage < Test::Unit::TestCase
+
+ def setup
+ @@spec ||= Qpid::Spec010::load()
+ @mp = Qpid::struct(@@spec["message_properties"])
+ @dp = Qpid::struct(@@spec["delivery_properties"])
+ @fp = Qpid::struct(@@spec["fragment_properties"])
+ end
+
+ def test_has
+ m = Qpid::Message.new(@mp, @dp, @fp, "body")
+ assert m.has("message_properties")
+ assert m.has("delivery_properties")
+ assert m.has("fragment_properties")
+ end
+
+ def test_get
+ m = Qpid::Message.new(@mp, @dp, @fp, "body")
+ assert_same(@mp, m.get("message_properties"))
+ assert_same(@dp, m.get("delivery_properties"))
+ assert_same(@fp, m.get("fragment_properties"))
+ end
+
+ def test_set
+ m = Qpid::Message.new(@mp, @dp, "body")
+ assert_nil m.get("fragment_properties")
+ m.set(@fp)
+ assert_same(@fp, m.get("fragment_properties"), "4")
+ end
+
+ def test_set_on_empty
+ m = Qpid::Message.new("body")
+ assert_nil m.get("delivery_properties")
+ m.set(@dp)
+ assert_same(@dp, m.get("delivery_properties"), "5")
+ end
+
+ def test_set_replace
+ m = Qpid::Message.new(@mp, @dp, @fp, "body")
+ dp = Qpid::struct(@@spec["delivery_properties"])
+ assert_same(@dp, m.get("delivery_properties"), "6")
+ m.set(dp)
+ assert_same(dp, m.get("delivery_properties"), "7")
+ end
+
+ def test_clear
+ m = Qpid::Message.new(@mp, @dp, @fp, "body")
+ assert_same(@mp, m.get("message_properties"), "8")
+ assert_same(@dp, m.get("delivery_properties"), "9")
+ assert_same(@fp, m.get("fragment_properties"), "10")
+ m.clear("fragment_properties")
+ assert_nil m.get("fragment_properties")
+ assert_same(@mp, m.get("message_properties"), "11")
+ assert_same(@dp, m.get("delivery_properties"), "12")
+ end
+end
diff --git a/ruby/tests/framer.rb b/ruby/tests/framer.rb
new file mode 100644
index 0000000000..1d56f2faf1
--- /dev/null
+++ b/ruby/tests/framer.rb
@@ -0,0 +1,99 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+require "test/unit"
+require "qpid"
+require 'tests/util'
+
+require 'logger'
+
+class TestFramer < Test::Unit::TestCase
+
+ include Test
+
+ def setup
+ #Qpid::raw_logger = Logger.new(STDOUT)
+ #Qpid::frm_logger = Logger.new(STDOUT)
+
+ @server = Util::ServerThread.new do |socket|
+ conn = Qpid::Framer.new(socket)
+ begin
+ h = conn.read_header
+ conn.write_header(h[-2], h[-1])
+ loop do
+ frame = conn.read_frame
+ conn.write_frame(frame)
+ conn.flush
+ end
+ rescue Qpid::Closed
+ nil # Ignore
+ end
+ end
+ end
+
+ def teardown
+ @server.finish
+ @server.join
+ end
+
+ Frame = Qpid::Frame
+
+ def test_framer
+ c = Qpid::Framer.new(@server.client)
+
+ c.write_header(0, 10)
+ assert_equal( ["AMQP", 1, 1, 0, 10], c.read_header())
+
+ c.write_frame(Frame.new(Qpid::FIRST_FRM, 1, 2, 3, "THIS"))
+ c.write_frame(Frame.new(0, 1, 2, 3, "IS"))
+ c.write_frame(Frame.new(0, 1, 2, 3, "A"))
+ c.write_frame(Frame.new(Qpid::LAST_FRM, 1, 2, 3, "TEST"))
+ c.flush()
+
+ f = c.read_frame
+ assert(f.first_frame?)
+ assert(! f.last_frame?)
+ assert_equal(1, f.type)
+ assert_equal(2, f.track)
+ assert_equal(3, f.channel)
+ assert_equal("THIS", f.payload)
+
+ f = c.read_frame
+ assert_equal(0, f.flags)
+ assert_equal(1, f.type)
+ assert_equal(2, f.track)
+ assert_equal(3, f.channel)
+ assert_equal("IS", f.payload)
+
+ f = c.read_frame
+ assert_equal(0, f.flags)
+ assert_equal(1, f.type)
+ assert_equal(2, f.track)
+ assert_equal(3, f.channel)
+ assert_equal("A", f.payload)
+
+ f = c.read_frame
+ assert(f.last_frame?)
+ assert(! f.first_frame?)
+ assert_equal(1, f.type)
+ assert_equal(2, f.track)
+ assert_equal(3, f.channel)
+ assert_equal("TEST", f.payload)
+ end
+end
diff --git a/ruby/tests/qmf.rb b/ruby/tests/qmf.rb
new file mode 100644
index 0000000000..274e38416e
--- /dev/null
+++ b/ruby/tests/qmf.rb
@@ -0,0 +1,248 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+require "test/unit"
+require "qpid"
+require "tests/util"
+require "socket"
+require "monitor.rb"
+
+class QmfTest < Test::Unit::TestCase
+
+ class Handler < Qpid::Qmf::Console
+ include MonitorMixin
+
+ def initialize
+ super()
+ @xmt_list = {}
+ @rcv_list = {}
+ end
+
+ def method_response(broker, seq, response)
+ synchronize do
+ @rcv_list[seq] = response
+ end
+ end
+
+ def request(broker, count)
+ @count = count
+ for idx in 0...count
+ synchronize do
+ seq = broker.echo(idx, "Echo Message", :async => true)
+ @xmt_list[seq] = idx
+ end
+ end
+ end
+
+ def check
+ return "fail (attempted send=%d, actual sent=%d)" % [@count, @xmt_list.size] unless @count == @xmt_list.size
+ lost = 0
+ mismatched = 0
+ @xmt_list.each do |seq, value|
+ if @rcv_list.include?(seq)
+ result = @rcv_list.delete(seq)
+ mismatch += 1 unless result.sequence == value
+ else
+ lost += 1
+ end
+ end
+ spurious = @rcv_list.size
+ if lost == 0 and mismatched == 0 and spurious == 0
+ return "pass"
+ else
+ return "fail (lost=%d, mismatch=%d, spurious=%d)" % [lost, mismatched, spurious]
+ end
+ end
+ end
+
+ def setup()
+ # Make sure errors in threads lead to a noisy death of the test
+ Thread.abort_on_exception = true
+
+ @host = ENV.fetch("QMF_TEST_HOST", 'localhost')
+ @port = ENV.fetch("QMF_TEST_PORT", 5672)
+
+ sock = TCPSocket.new(@host, @port)
+
+ @conn = Qpid::Connection.new(sock)
+ @conn.start()
+
+ @session = @conn.session("test-session")
+ end
+
+ def teardown
+ unless @session.error?
+ @session.close(10)
+ end
+ @conn.close(10)
+ if @qmf
+ @qmf.del_broker(@qmf_broker)
+ end
+ end
+
+ def start_qmf(kwargs = {})
+ @qmf = Qpid::Qmf::Session.new(kwargs)
+ @qmf_broker = @qmf.add_broker("amqp://%s:%d" % [@host, @port])
+
+ brokers = @qmf.objects(:class => "broker")
+ assert_equal(1, brokers.length)
+ @broker = brokers[0]
+ end
+
+ def test_methods_sync()
+ start_qmf
+ body = "Echo Message Body"
+ for seq in 1..10
+ res = @broker.echo(seq, body, :timeout => 10)
+ assert_equal(0, res.status)
+ assert_equal("OK", res.text)
+ assert_equal(seq, res.sequence)
+ assert_equal(body, res.body)
+ end
+ end
+
+ def test_methods_async()
+ handler = Handler.new
+ start_qmf(:console => handler)
+ handler.request(@broker, 20)
+ sleep(1)
+ assert_equal("pass", handler.check)
+ end
+
+ def test_move_queued_messages()
+ """
+ Test ability to move messages from the head of one queue to another.
+ Need to test moveing all and N messages.
+ """
+
+ "Set up source queue"
+ start_qmf
+ @session.queue_declare(:queue => "src-queue", :exclusive => true, :auto_delete => true)
+ @session.exchange_bind(:queue => "src-queue", :exchange => "amq.direct", :binding_key => "routing_key")
+
+ props = @session.delivery_properties(:routing_key => "routing_key")
+ for count in 1..20
+ body = "Move Message %d" % count
+ src_msg = Qpid::Message.new(props, body)
+ @session.message_transfer(:destination => "amq.direct", :message => src_msg)
+ end
+
+ "Set up destination queue"
+ @session.queue_declare(:queue => "dest-queue", :exclusive => true, :auto_delete => true)
+ @session.exchange_bind(:queue => "dest-queue", :exchange => "amq.direct")
+
+ queues = @qmf.objects(:class => "queue")
+
+ "Move 10 messages from src-queue to dest-queue"
+ result = @qmf.objects(:class => "broker")[0].queueMoveMessages("src-queue", "dest-queue", 10)
+ assert_equal(0, result.status)
+
+ sq = @qmf.objects(:class => "queue", "name" => "src-queue")[0]
+ dq = @qmf.objects(:class => "queue", "name" => "dest-queue")[0]
+
+ assert_equal(10, sq.msgDepth)
+ assert_equal(10, dq.msgDepth)
+
+ "Move all remaining messages to destination"
+ result = @qmf.objects(:class => "broker")[0].queueMoveMessages("src-queue", "dest-queue", 0)
+ assert_equal(0, result.status)
+
+ sq = @qmf.objects(:class => "queue", 'name' => "src-queue")[0]
+ dq = @qmf.objects(:class => "queue", 'name' => "dest-queue")[0]
+
+ assert_equal(0, sq.msgDepth)
+ assert_equal(20, dq.msgDepth)
+
+ "Use a bad source queue name"
+ result = @qmf.objects(:class => "broker")[0].queueMoveMessages("bad-src-queue", "dest-queue", 0)
+ assert_equal(4, result.status)
+
+ "Use a bad destination queue name"
+ result = @qmf.objects(:class => "broker")[0].queueMoveMessages("src-queue", "bad-dest-queue", 0)
+ assert_equal(4, result.status)
+
+ " Use a large qty (40) to move from dest-queue back to "
+ " src-queue- should move all "
+ result = @qmf.objects(:class => "broker")[0].queueMoveMessages("dest-queue", "src-queue", 40)
+ assert_equal(0, result.status)
+
+ sq = @qmf.objects(:class => "queue", 'name' => "src-queue")[0]
+ dq = @qmf.objects(:class => "queue", 'name' => "dest-queue")[0]
+
+ assert_equal(20, sq.msgDepth)
+ assert_equal(0, dq.msgDepth)
+
+ "Consume the messages of the queue and check they are all there in order"
+ @session.message_subscribe(:queue => "src-queue",
+ :destination => "tag")
+ @session.message_flow(:destination => "tag",
+ :unit => @session.message_credit_unit.message,
+ :value => 0xFFFFFFFF)
+ @session.message_flow(:destination => "tag",
+ :unit => @session.message_credit_unit.byte,
+ :value => 0xFFFFFFFF)
+ queue = @session.incoming("tag")
+ for count in 1..20
+ consumed_msg = queue.get(timeout=1)
+ body = "Move Message %d" % count
+ assert_equal(body, consumed_msg.body)
+ end
+ end
+
+ # Test ability to purge messages from the head of a queue. Need to test
+ # moveing all, 1 (top message) and N messages.
+ def test_purge_queue
+ start_qmf
+ # Set up purge queue"
+ @session.queue_declare(:queue => "purge-queue",
+ :exclusive => true,
+ :auto_delete => true)
+ @session.exchange_bind(:queue => "purge-queue",
+ :exchange => "amq.direct",
+ :binding_key => "routing_key")
+
+ props = @session.delivery_properties(:routing_key => "routing_key")
+ 20.times do |count|
+ body = "Purge Message %d" % count
+ msg = Qpid::Message.new(props, body)
+ @session.message_transfer(:destination => "amq.direct",
+ :message => msg)
+ end
+
+ pq = @qmf.objects(:class => "queue", 'name' => "purge-queue")[0]
+
+ "Purge top message from purge-queue"
+ result = pq.purge(1)
+ assert_equal(0, result.status)
+ pq = @qmf.objects(:class => "queue", 'name' => "purge-queue")[0]
+ assert_equal(19, pq.msgDepth)
+
+ "Purge top 9 messages from purge-queue"
+ result = pq.purge(9)
+ assert_equal(0, result.status)
+ pq = @qmf.objects(:class => "queue", 'name' => "purge-queue")[0]
+ assert_equal(10, pq.msgDepth)
+
+ "Purge all messages from purge-queue"
+ result = pq.purge(0)
+ assert_equal(0, result.status)
+ pq = @qmf.objects(:class => "queue", 'name' => "purge-queue")[0]
+ assert_equal(0, pq.msgDepth)
+ end
+end
diff --git a/ruby/tests/queue.rb b/ruby/tests/queue.rb
new file mode 100644
index 0000000000..4ec0e07ffb
--- /dev/null
+++ b/ruby/tests/queue.rb
@@ -0,0 +1,80 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+require 'test/unit'
+require 'qpid'
+
+class TestQueue < Test::Unit::TestCase
+
+ # The qpid queue class just provides sime simple extensions to
+ # python's standard queue data structure, so we don't need to test
+ # all the queue functionality.
+
+ def setup
+ # Make sure errors in threads lead to a noisy death of the test
+ Thread.abort_on_exception = true
+ end
+
+ def test_listen
+ values = []
+ heard = Qpid::Util::Event.new
+
+ listener = Proc.new do |x|
+ values << x
+ heard.set
+ end
+
+ q = Qpid::Queue.new
+ q.listen(&listener)
+
+ heard.clear
+ q.put(1)
+ heard.wait
+ assert_equal([1], values)
+ heard.clear
+ q.put(2)
+ heard.wait
+ assert_equal([1, 2], values)
+
+ q.listen
+ q.put(3)
+ assert_equal(3, q.get)
+
+ q.listen(&listener)
+ heard.clear
+ q.put(4)
+ heard.wait
+ assert_equal([1,2,4], values)
+ end
+
+ def test_close
+ q = Qpid::Queue.new
+ (1..3).each { |i| q.put(i) }
+ q.close
+ assert_equal(1, q.get)
+ assert_equal(2, q.get)
+ assert_equal(3, q.get)
+ 10.times do |i|
+ assert_raises(Qpid::Closed) do
+ q.get
+ end
+ end
+ end
+
+end
diff --git a/ruby/tests/spec010.rb b/ruby/tests/spec010.rb
new file mode 100644
index 0000000000..6db1523455
--- /dev/null
+++ b/ruby/tests/spec010.rb
@@ -0,0 +1,80 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+require "test/unit"
+require "qpid/test"
+require "qpid/spec010"
+
+class SpecTest < Test::Unit::TestCase
+
+ def setup()
+ @spec = Qpid::Spec010.load()
+ end
+
+ def testSessionHeader()
+ hdr = @spec[:header]
+ sc = Qpid::StringCodec.new(@spec)
+ hdr.encode(sc, Qpid::struct(hdr, :sync=>true))
+ assert sc.encoded == "\x01\x01"
+
+ sc = Qpid::StringCodec.new(@spec)
+ hdr.encode(sc, Qpid::struct(hdr, :sync=>false))
+ assert sc.encoded == "\x01\x00"
+ end
+
+ def encdec(type, value)
+ sc = Qpid::StringCodec.new(@spec)
+ type.encode(sc, value)
+ decoded = type.decode(sc)
+ return decoded
+ end
+
+ def testMessageProperties()
+ mp = @spec[:message_properties]
+ rt = @spec[:reply_to]
+
+ props = Qpid::struct(mp,
+ :content_length=>3735928559,
+ :reply_to=>Qpid::struct(rt,
+ :exchange=>"the exchange name",
+ :routing_key=>"the routing key"))
+ dec = encdec(mp, props)
+ assert props.content_length == dec.content_length
+ assert props.reply_to.exchange == dec.reply_to.exchange
+ assert props.reply_to.routing_key == dec.reply_to.routing_key
+ end
+
+ def testMessageSubscribe()
+ ms = @spec[:message_subscribe]
+ cmd = Qpid::struct(ms, :exclusive=>true, :destination=>"this is a test")
+ dec = encdec(@spec[:message_subscribe], cmd)
+ assert cmd.exclusive == dec.exclusive
+ assert cmd.destination == dec.destination
+ end
+
+ def testXid()
+ xid = @spec[:xid]
+ sc = Qpid::StringCodec.new(@spec)
+ st = Qpid::struct(xid, :format=>0, :global_id=>"gid", :branch_id=>"bid")
+ xid.encode(sc, st)
+ assert sc.encoded == "\x00\x00\x00\x10\x06\x04\x07\x00\x00\x00\x00\x00\x03gid\x03bid"
+ assert xid.decode(sc) == st
+ end
+
+end
diff --git a/ruby/tests/util.rb b/ruby/tests/util.rb
new file mode 100644
index 0000000000..b22a6bab2f
--- /dev/null
+++ b/ruby/tests/util.rb
@@ -0,0 +1,72 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+require 'thread'
+require 'socket'
+
+module Util
+
+ TOPDIR = File::dirname(File::dirname(File::expand_path(__FILE__)))
+ SPEC = File::join(TOPDIR, "specs", "amqp.0-10-qpid-errata.xml")
+
+ PORT = 1234
+ HOST = "0.0.0.0"
+
+ def self.connect(host = HOST, port = PORT)
+ TCPSocket.new(host, port)
+ end
+
+ class ServerThread < Thread
+ def initialize(&block)
+ @sockets = []
+ @running = Mutex.new
+ started = Qpid::Util::Event.new
+ super(started, @running) do |started, running|
+ tcp_srv = TCPServer.new(HOST, PORT)
+ begin
+ started.set
+ while ! running.locked? and (session = tcp_srv.accept)
+ yield(session)
+ end
+ rescue Exception => e
+ # Exceptions in the server thread are hard to see
+ # Make sure they apear loudly on the console
+ $stderr.puts "#{ "*" * 20} Server exception #{ "*" * 20}"
+ $stderr.puts e.message
+ $stderr.puts e.backtrace
+ raise
+ ensure
+ tcp_srv.close
+ end
+ end
+ started.wait
+ end
+
+ def finish
+ @running.lock
+ @sockets.each { |sock| sock.close unless sock.closed? }
+ end
+
+ def client(host = HOST, port = PORT)
+ sock = Util::connect(host, port)
+ @sockets << sock
+ sock
+ end
+ end
+end
diff --git a/ruby/tests_0-8/basic.rb b/ruby/tests_0-8/basic.rb
index 0018050fe2..10a43b1aab 100644
--- a/ruby/tests_0-8/basic.rb
+++ b/ruby/tests_0-8/basic.rb
@@ -23,13 +23,13 @@ require "qpid"
class Basic < Test::Unit::TestCase
- include Qpid::Test
+ include Qpid08::Test
def publish(body, headers = {})
cli = connect()
ch = cli.channel(1)
ch.channel_open()
- content = Qpid::Content.new(headers, body)
+ content = Qpid08::Content.new(headers, body)
ch.basic_publish(:content => content)
msg = ch.channel_close()
assert msg.method.qname == :channel_close_ok
@@ -42,7 +42,7 @@ class Basic < Test::Unit::TestCase
ch.queue_declare(:queue => "test-queue")
ch.queue_bind(:queue_name => "test-queue")
ch.basic_consume(:queue => "test-queue", :consumer_tag => "ctag")
- content = Qpid::Content.new(headers, body)
+ content = Qpid08::Content.new(headers, body)
ch.basic_publish(:routing_key => "test-queue", :content => content)
queue = cli.queue("ctag")
msg = queue.pop()
diff --git a/ruby/tests_0-8/channel.rb b/ruby/tests_0-8/channel.rb
index 31c5f19d92..1eea8f18d9 100644
--- a/ruby/tests_0-8/channel.rb
+++ b/ruby/tests_0-8/channel.rb
@@ -23,7 +23,7 @@ require "qpid"
class Channel < Test::Unit::TestCase
- include Qpid::Test
+ include Qpid08::Test
def test_channel_open_close()
c = connect()