summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2008-04-09 19:52:44 +0000
committerGordon Sim <gsim@apache.org>2008-04-09 19:52:44 +0000
commit6cf06312f5f9d686a0af76f7c1c08732a7ae27cb (patch)
treed415498924b234d73645a9ae74f486085fe63f15
parent363ed6d7e6a0986c49a9ae5d43954dfec08e7e8c (diff)
downloadqpid-python-6cf06312f5f9d686a0af76f7c1c08732a7ae27cb.tar.gz
Fixes and automated tests for federation function.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@646505 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/qpid/broker/Bridge.cpp4
-rw-r--r--cpp/src/qpid/broker/Connection.cpp11
-rw-r--r--cpp/src/qpid/broker/PreviewConnection.cpp14
-rw-r--r--cpp/src/tests/Makefile.am3
-rwxr-xr-xcpp/src/tests/federation.py191
-rwxr-xr-xcpp/src/tests/run_federation_tests24
-rw-r--r--python/commands/qpid-route22
7 files changed, 247 insertions, 22 deletions
diff --git a/cpp/src/qpid/broker/Bridge.cpp b/cpp/src/qpid/broker/Bridge.cpp
index 566b9cc197..32819380de 100644
--- a/cpp/src/qpid/broker/Bridge.cpp
+++ b/cpp/src/qpid/broker/Bridge.cpp
@@ -49,8 +49,6 @@ void Bridge::create()
framing::AMQP_ServerProxy::Session session(channel);
session.open(0);
- //peer.getSession().open(0);
-
if (args.i_src_is_local) {
//TODO: handle 'push' here... simplest way is to create frames and pass them to Connection::received()
} else {
@@ -62,7 +60,7 @@ void Bridge::create()
string queue = "bridge_queue_";
queue += Uuid(true).str();
peer.getQueue().declare(0, queue, "", false, false, true, true, FieldTable());
- peer.getQueue().bind(0, queue, args.i_dest, args.i_key, FieldTable());
+ peer.getQueue().bind(0, queue, args.i_src, args.i_key, FieldTable());
peer.getMessage().subscribe(0, queue, args.i_dest, false, 0, 0, false, FieldTable());
peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp
index 1e55087390..ef1100a2ec 100644
--- a/cpp/src/qpid/broker/Connection.cpp
+++ b/cpp/src/qpid/broker/Connection.cpp
@@ -70,7 +70,7 @@ class Connection::MgmtLink : public Connection::MgmtWrapper
Bridges cancelled;//holds list of bridges pending cancellation
Bridges active;//holds active bridges
uint channelCounter;
- sys::Mutex lock;
+ sys::Mutex linkLock;
void cancel(Bridge*);
@@ -88,7 +88,7 @@ public:
Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_) :
ConnectionState(out_, broker_),
adapter(*this),
- mgmtClosing(0),
+ mgmtClosing(false),
mgmtId(mgmtId_)
{
initMgmt();
@@ -164,6 +164,7 @@ bool Connection::doOutput()
try{
//process any pending mgmt commands:
if (mgmtWrapper.get()) mgmtWrapper->processPending();
+ if (mgmtClosing) close (403, "Closed by Management Request", 0, 0);
//then do other output as needed:
return outputTasks.doOutput();
@@ -203,8 +204,9 @@ Manageable::status_t Connection::ManagementMethod (uint32_t methodId,
switch (methodId)
{
case management::Client::METHOD_CLOSE :
- mgmtClosing = 1;
+ mgmtClosing = true;
if (mgmtWrapper.get()) mgmtWrapper->closing();
+ out->activateOutput();
status = Manageable::STATUS_OK;
break;
case management::Link::METHOD_BRIDGE :
@@ -253,6 +255,7 @@ void Connection::MgmtLink::closing()
void Connection::MgmtLink::processPending()
{
+ Mutex::ScopedLock l(linkLock);
//process any pending creates
if (!created.empty()) {
for (Bridges::iterator i = created.begin(); i != created.end(); ++i) {
@@ -271,6 +274,7 @@ void Connection::MgmtLink::processPending()
void Connection::MgmtLink::process(Connection& connection, const management::Args& args)
{
+ Mutex::ScopedLock l(linkLock);
created.push_back(new Bridge(channelCounter++, connection,
boost::bind(&MgmtLink::cancel, this, _1),
dynamic_cast<const management::ArgsLinkBridge&>(args)));
@@ -278,6 +282,7 @@ void Connection::MgmtLink::process(Connection& connection, const management::Arg
void Connection::MgmtLink::cancel(Bridge* b)
{
+ Mutex::ScopedLock l(linkLock);
//need to take this out the active map and add it to the cancelled map
for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
if (&(*i) == b) {
diff --git a/cpp/src/qpid/broker/PreviewConnection.cpp b/cpp/src/qpid/broker/PreviewConnection.cpp
index f5a629248c..8901aa9f9d 100644
--- a/cpp/src/qpid/broker/PreviewConnection.cpp
+++ b/cpp/src/qpid/broker/PreviewConnection.cpp
@@ -70,7 +70,7 @@ class PreviewConnection::MgmtLink : public PreviewConnection::MgmtWrapper
Bridges cancelled;//holds list of bridges pending cancellation
Bridges active;//holds active bridges
uint channelCounter;
- sys::Mutex lock;
+ sys::Mutex linkLock;
void cancel(Bridge*);
@@ -88,7 +88,7 @@ public:
PreviewConnection::PreviewConnection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_, bool isLink) :
ConnectionState(out_, broker_),
adapter(*this, isLink),
- mgmtClosing(0),
+ mgmtClosing(false),
mgmtId(mgmtId_)
{
Manageable* parent = broker.GetVhostObject ();
@@ -111,7 +111,7 @@ PreviewConnection::PreviewConnection(ConnectionOutputHandler* out_, Broker& brok
PreviewConnection::~PreviewConnection () {}
void PreviewConnection::received(framing::AMQFrame& frame){
- if (mgmtClosing)
+ if (mgmtClosing)
close (403, "Closed by Management Request", 0, 0);
if (frame.getChannel() == 0) {
@@ -159,6 +159,8 @@ bool PreviewConnection::doOutput()
try{
//process any pending mgmt commands:
if (mgmtWrapper.get()) mgmtWrapper->processPending();
+ if (mgmtClosing) close (403, "Closed by Management Request", 0, 0);
+
//then do other output as needed:
return outputTasks.doOutput();
@@ -198,8 +200,9 @@ Manageable::status_t PreviewConnection::ManagementMethod (uint32_t methodId,
switch (methodId)
{
case management::Client::METHOD_CLOSE :
- mgmtClosing = 1;
+ mgmtClosing = true;
if (mgmtWrapper.get()) mgmtWrapper->closing();
+ out->activateOutput();
status = Manageable::STATUS_OK;
break;
case management::Link::METHOD_BRIDGE :
@@ -248,6 +251,7 @@ void PreviewConnection::MgmtLink::closing()
void PreviewConnection::MgmtLink::processPending()
{
+ Mutex::ScopedLock l(linkLock);
//process any pending creates
if (!created.empty()) {
for (Bridges::iterator i = created.begin(); i != created.end(); ++i) {
@@ -266,6 +270,7 @@ void PreviewConnection::MgmtLink::processPending()
void PreviewConnection::MgmtLink::process(PreviewConnection& connection, const management::Args& args)
{
+ Mutex::ScopedLock l(linkLock);
created.push_back(new Bridge(channelCounter++, connection,
boost::bind(&MgmtLink::cancel, this, _1),
dynamic_cast<const management::ArgsLinkBridge&>(args)));
@@ -273,6 +278,7 @@ void PreviewConnection::MgmtLink::process(PreviewConnection& connection, const m
void PreviewConnection::MgmtLink::cancel(Bridge* b)
{
+ Mutex::ScopedLock l(linkLock);
//need to take this out the active map and add it to the cancelled map
for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
if (&(*i) == b) {
diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am
index 487fb08c9a..3f49d2c6af 100644
--- a/cpp/src/tests/Makefile.am
+++ b/cpp/src/tests/Makefile.am
@@ -118,7 +118,7 @@ check_PROGRAMS += $(testprogs) interop_runner
TESTS_ENVIRONMENT = VALGRIND=$(VALGRIND) srcdir=$(srcdir) QPID_DATA_DIR= $(srcdir)/run_test
system_tests = client_test quick_perftest quick_topictest
-TESTS += run-unit-tests start_broker $(system_tests) python_tests stop_broker
+TESTS += run-unit-tests start_broker $(system_tests) python_tests stop_broker run_federation_tests
EXTRA_DIST += \
run_test vg_check \
@@ -126,6 +126,7 @@ EXTRA_DIST += \
quick_topictest \
quick_perftest \
topictest \
+ run_federation_tests \
.valgrind.supp \
.valgrindrc \
MessageUtils.h \
diff --git a/cpp/src/tests/federation.py b/cpp/src/tests/federation.py
new file mode 100755
index 0000000000..28ebc4a24d
--- /dev/null
+++ b/cpp/src/tests/federation.py
@@ -0,0 +1,191 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import sys
+from qpid.testlib import TestBase, testrunner
+from qpid.management import managementChannel, managementClient
+from qpid.queue import Empty
+from qpid.content import Content
+
+
+def scan_args(name, default=None, args=sys.argv[1:]):
+ if (name in args):
+ pos = args.index(name)
+ return args[pos + 1]
+ elif default:
+ return default
+ else:
+ print "Please specify extra argument: %s" % name
+ sys.exit(2)
+
+def extract_args(name, args):
+ if (name in args):
+ pos = args.index(name)
+ del args[pos:pos+2]
+ else:
+ return None
+
+def remote_host():
+ return scan_args("--remote-host", "localhost")
+
+def remote_port():
+ return int(scan_args("--remote-port"))
+
+class Helper:
+ def __init__(self, parent):
+ self.parent = parent
+ self.channel = parent.client.channel(2)
+ self.mc = managementClient(self.channel.spec)
+ self.mch = self.mc.addChannel(self.channel)
+ self.mc.syncWaitForStable(self.mch)
+
+ def get_objects(self, type):
+ return self.mc.syncGetObjects(self.mch, type)
+
+ def get_object(self, type, position = 1, expected = None):
+ objects = self.get_objects(type)
+ if not expected: expected = position
+ self.assertEqual(len(objects), expected)
+ return objects[(position - 1)]
+
+
+ def call_method(self, object, method, args=None):
+ res = self.mc.syncCallMethod(self.mch, object.id, object.classKey, method, args)
+ self.assertEqual(res.status, 0)
+ self.assertEqual(res.statusText, "OK")
+ return res
+
+ def assertEqual(self, a, b):
+ self.parent.assertEqual(a, b)
+
+class FederationTests(TestBase):
+
+ def test_bridge_create_and_close(self):
+ mgmt = Helper(self)
+ broker = mgmt.get_object("broker")
+
+ for i in range(10):
+ mgmt.call_method(broker, "connect", {"host":remote_host(), "port":remote_port()})
+ link = mgmt.get_object("link")
+
+ mgmt.call_method(link, "bridge", {"src":"amq.direct", "dest":"amq.direct", "key":"my-key"})
+ bridge = mgmt.get_object("bridge")
+
+ mgmt.call_method(bridge, "close")
+ self.assertEqual(len(mgmt.get_objects("bridge")), 0)
+
+ mgmt.call_method(link, "close")
+ self.assertEqual(len(mgmt.get_objects("link")), 0)
+
+ def test_pull_from_exchange(self):
+ channel = self.channel
+
+ mgmt = Helper(self)
+ broker = mgmt.get_object("broker")
+
+ mgmt.call_method(broker, "connect", {"host":remote_host(), "port":remote_port()})
+ link = mgmt.get_object("link")
+
+ mgmt.call_method(link, "bridge", {"src":"amq.direct", "dest":"amq.fanout", "key":"my-key"})
+ bridge = mgmt.get_object("bridge")
+
+ #setup queue to receive messages from local broker
+ channel.queue_declare(queue="fed1", exclusive=True, auto_delete=True)
+ channel.queue_bind(queue="fed1", exchange="amq.fanout")
+ self.subscribe(queue="fed1", destination="f1")
+ queue = self.client.queue("f1")
+
+ #send messages to remote broker and confirm it is routed to local broker
+ r_conn = self.connect(host=remote_host(), port=remote_port())
+ r_channel = r_conn.channel(1)
+ r_channel.session_open()
+
+ for i in range(1, 11):
+ r_channel.message_transfer(destination="amq.direct", content=Content(properties={'routing_key' : "my-key"}, body="Message %d" % i))
+
+ for i in range(1, 11):
+ msg = queue.get(timeout=5)
+ self.assertEqual("Message %d" % i, msg.content.body)
+ try:
+ extra = queue.get(timeout=1)
+ self.fail("Got unexpected message in queue: " + extra.content.body)
+ except Empty: None
+
+
+ mgmt.call_method(bridge, "close")
+ self.assertEqual(len(mgmt.get_objects("bridge")), 0)
+
+ mgmt.call_method(link, "close")
+ self.assertEqual(len(mgmt.get_objects("link")), 0)
+
+ def test_pull_from_queue(self):
+ channel = self.channel
+
+ #setup queue on remote broker and add some messages
+ r_conn = self.connect(host=remote_host(), port=remote_port())
+ r_channel = r_conn.channel(1)
+ r_channel.session_open()
+ r_channel.queue_declare(queue="my-bridge-queue", exclusive=True, auto_delete=True)
+ for i in range(1, 6):
+ r_channel.message_transfer(content=Content(properties={'routing_key' : "my-bridge-queue"}, body="Message %d" % i))
+
+ #setup queue to receive messages from local broker
+ channel.queue_declare(queue="fed1", exclusive=True, auto_delete=True)
+ channel.queue_bind(queue="fed1", exchange="amq.fanout")
+ self.subscribe(queue="fed1", destination="f1")
+ queue = self.client.queue("f1")
+
+ mgmt = Helper(self)
+ broker = mgmt.get_object("broker")
+
+ mgmt.call_method(broker, "connect", {"host":remote_host(), "port":remote_port()})
+ link = mgmt.get_object("link")
+
+ mgmt.call_method(link, "bridge", {"src":"my-bridge-queue", "dest":"amq.fanout", "key":"", "src_is_queue":1})
+ bridge = mgmt.get_object("bridge")
+
+ #add some more messages (i.e. after bridge was created)
+ for i in range(6, 11):
+ r_channel.message_transfer(content=Content(properties={'routing_key' : "my-bridge-queue"}, body="Message %d" % i))
+
+ for i in range(1, 11):
+ msg = queue.get(timeout=5)
+ self.assertEqual("Message %d" % i, msg.content.body)
+ try:
+ extra = queue.get(timeout=1)
+ self.fail("Got unexpected message in queue: " + extra.content.body)
+ except Empty: None
+
+
+ mgmt.call_method(bridge, "close")
+ self.assertEqual(len(mgmt.get_objects("bridge")), 0)
+
+ mgmt.call_method(link, "close")
+ self.assertEqual(len(mgmt.get_objects("link")), 0)
+
+if __name__ == '__main__':
+ args = sys.argv[1:]
+ #need to remove the extra options from args as test runner doesn't recognise them
+ extract_args("--remote-port", args)
+ extract_args("--remote-host", args)
+ #add module(s) to run to testrunners args
+ args.append("federation")
+
+ if not testrunner.run(args): sys.exit(1)
diff --git a/cpp/src/tests/run_federation_tests b/cpp/src/tests/run_federation_tests
new file mode 100755
index 0000000000..63f0baa1ae
--- /dev/null
+++ b/cpp/src/tests/run_federation_tests
@@ -0,0 +1,24 @@
+#!/bin/sh
+# Run the federation tests.
+
+trap stop_brokers EXIT
+
+start_brokers() {
+ ../qpidd --daemon --port 0 --no-data-dir > qpidd.port
+ LOCAL_PORT=`cat qpidd.port`
+ ../qpidd --daemon --port 0 --no-data-dir > qpidd.port
+ REMOTE_PORT=`cat qpidd.port`
+}
+
+stop_brokers() {
+ ../qpidd -q --port $LOCAL_PORT
+ ../qpidd -q --port $REMOTE_PORT
+}
+
+if test -d ../../../python ; then
+ start_brokers
+ echo "Running federation tests using brokers on ports $LOCAL_PORT $REMOTE_PORT"
+ export PYTHONPATH=../../../python
+ ./federation.py -v -s ../../../specs/amqp.0-10-preview.xml -b localhost:$LOCAL_PORT --remote-port $REMOTE_PORT || { echo "FAIL federation tests"; exit 1; }
+fi
+
diff --git a/python/commands/qpid-route b/python/commands/qpid-route
index 3f29760ef5..c69ca6204f 100644
--- a/python/commands/qpid-route
+++ b/python/commands/qpid-route
@@ -86,7 +86,7 @@ class RouteManager:
self.mclient.syncWaitForStable (self.mch)
except socket.error, e:
print "Connect Error:", e
- exit (1)
+ sys.exit (1)
def getLink (self):
links = self.mclient.syncGetObjects (self.mch, "link")
@@ -117,20 +117,20 @@ class RouteManager:
if link == None:
print "Protocol Error - Missing link ID"
- exit (1)
+ sys.exit (1)
bridges = mc.syncGetObjects (self.mch, "bridge")
for bridge in bridges:
if bridge.linkRef == link.id and bridge.dest == exchange and bridge.key == routingKey:
if not _quiet:
print "Duplicate Route - ignoring: %s(%s)" % (exchange, routingKey)
- exit (1)
- exit (0)
+ sys.exit (1)
+ sys.exit (0)
if _verbose:
print "Creating inter-broker binding..."
bridgeArgs = {}
- bridgeArgs["src"] = "src"
+ bridgeArgs["src"] = exchange
bridgeArgs["dest"] = exchange
bridgeArgs["key"] = routingKey
bridgeArgs["src_is_queue"] = 0
@@ -147,8 +147,8 @@ class RouteManager:
if link == None:
if not _quiet:
print "No link found from %s to %s" % (self.src.name(), self.dest.name())
- exit (1)
- exit (0)
+ sys.exit (1)
+ sys.exit (0)
bridges = mc.syncGetObjects (self.mch, "bridge")
for bridge in bridges:
@@ -158,18 +158,18 @@ class RouteManager:
res = mc.syncCallMethod (self.mch, bridge.id, bridge.classKey, "close")
if res.status != 0:
print "Error closing bridge: %d - %s" % (res.status, res.statusText)
- exit (1)
+ sys.exit (1)
if len (bridges) == 1:
if _verbose:
print "Last bridge on link, closing link..."
res = mc.syncCallMethod (self.mch, link.id, link.classKey, "close")
if res.status != 0:
print "Error closing link: %d - %s" % (res.status, res.statusText)
- exit (1)
- exit (0)
+ sys.exit (1)
+ sys.exit (0)
if not _quiet:
print "Route not found"
- exit (1)
+ sys.exit (1)
def ListRoutes (self):
mc = self.mclient