summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2012-06-18 22:36:51 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2012-06-18 22:36:51 +0000
commitc0a9cdb63a9e1b58bfed14dd6a3f7d8c6375e5af (patch)
tree8c11f27f49af76c55ff188a3a822302c801b62cf
parent5fbacc774744500e604d58d8904e1c3f8f09578a (diff)
downloadqpid-python-c0a9cdb63a9e1b58bfed14dd6a3f7d8c6375e5af.tar.gz
QPID-4063: allow configuration of source queue for exchange or dynamic routes
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1351518 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/Bridge.cpp24
-rw-r--r--qpid/cpp/src/qpid/broker/Bridge.h4
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp6
-rwxr-xr-xqpid/cpp/src/tests/federation.py165
4 files changed, 187 insertions, 12 deletions
diff --git a/qpid/cpp/src/qpid/broker/Bridge.cpp b/qpid/cpp/src/qpid/broker/Bridge.cpp
index 60d8263a76..63325a1e91 100644
--- a/qpid/cpp/src/qpid/broker/Bridge.cpp
+++ b/qpid/cpp/src/qpid/broker/Bridge.cpp
@@ -59,13 +59,16 @@ void Bridge::PushHandler::handle(framing::AMQFrame& frame)
Bridge::Bridge(const std::string& _name, Link* _link, framing::ChannelId _id,
CancellationListener l, const _qmf::ArgsLinkBridge& _args,
- InitializeCallback init, const string& qn, const string& ae) :
+ InitializeCallback init, const std::string& _queueName, const string& ae) :
link(_link), channel(_id), args(_args), mgmtObject(0),
- listener(l), name(_name), queueName(qn), altEx(ae), persistenceId(0),
- connState(0), conn(0), initialize(init), detached(false)
+ listener(l), name(_name),
+ queueName(_queueName.empty() ? "qpid.bridge_queue_" + name + "_" + link->getBroker()->getFederationTag()
+ : _queueName),
+ altEx(ae), persistenceId(0),
+ connState(0), conn(0), initialize(init), detached(false),
+ useExistingQueue(!_queueName.empty()),
+ sessionName("qpid.bridge_session_" + name + "_" + link->getBroker()->getFederationTag())
{
- if (queueName.empty())
- queueName = "qpid.bridge_queue_" + Uuid(true).str();
ManagementAgent* agent = link->getBroker()->getManagementAgent();
if (agent != 0) {
mgmtObject = new _qmf::Bridge
@@ -102,10 +105,10 @@ void Bridge::create(Connection& c)
session.reset(new framing::AMQP_ServerProxy::Session(*channelHandler));
peer.reset(new framing::AMQP_ServerProxy(*channelHandler));
- session->attach(queueName, false);
+ session->attach(sessionName, false);
session->commandPoint(0,0);
} else {
- sessionHandler.attachAs(queueName);
+ sessionHandler.attachAs(sessionName);
// Point the bridging commands at the remote peer broker
peer.reset(new framing::AMQP_ServerProxy(sessionHandler.out));
}
@@ -137,8 +140,9 @@ void Bridge::create(Connection& c)
}
bool durable = false;//should this be an arg, or would we use srcIsQueue for durable queues?
- bool autoDelete = !durable;//auto delete transient queues?
- peer->getQueue().declare(queueName, altEx, false, durable, true, autoDelete, queueSettings);
+ bool exclusive = !useExistingQueue; // only exclusive if the queue is owned by the bridge
+ bool autoDelete = exclusive && !durable;//auto delete transient queues?
+ peer->getQueue().declare(queueName, altEx, false, durable, exclusive, autoDelete, queueSettings);
if (!args.i_dynamic)
peer->getExchange().bind(queueName, args.i_src, args.i_key, FieldTable());
peer->getMessage().subscribe(queueName, args.i_dest, 1, 0, false, "", 0, FieldTable());
@@ -162,7 +166,7 @@ void Bridge::cancel(Connection&)
{
if (resetProxy()) {
peer->getMessage().cancel(args.i_dest);
- peer->getSession().detach(queueName);
+ peer->getSession().detach(sessionName);
}
QPID_LOG(debug, "Cancelled bridge " << name);
}
diff --git a/qpid/cpp/src/qpid/broker/Bridge.h b/qpid/cpp/src/qpid/broker/Bridge.h
index 8d6ed03faf..662590aa45 100644
--- a/qpid/cpp/src/qpid/broker/Bridge.h
+++ b/qpid/cpp/src/qpid/broker/Bridge.h
@@ -58,6 +58,7 @@ class Bridge : public PersistableConfig,
Bridge(const std::string& name, Link* link, framing::ChannelId id, CancellationListener l,
const qmf::org::apache::qpid::broker::ArgsLinkBridge& args,
+
InitializeCallback init, const std::string& queueName="",
const std::string& altExchange=""
);
@@ -146,6 +147,9 @@ class Bridge : public PersistableConfig,
void closed();
friend class Link; // to call create, cancel, closed()
boost::shared_ptr<ErrorListener> errorListener;
+
+ const bool useExistingQueue;
+ const std::string sessionName;
};
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index 77c578bd37..c202d9c4e8 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -797,6 +797,7 @@ void Broker::createObject(const std::string& type, const std::string& name,
std::string key;
std::string id;
std::string excludes;
+ std::string queueName;
bool durable = false;
bool srcIsQueue = false;
bool srcIsLocal = false;
@@ -816,6 +817,7 @@ void Broker::createObject(const std::string& type, const std::string& name,
else if (i->first == DYNAMIC) dynamic = bool(i->second);
else if (i->first == SYNC) sync = i->second.asUint16();
else if (i->first == DURABLE) durable = bool(i->second);
+ else if (i->first == QUEUE_NAME) queueName = i->second.asString();
else {
// TODO: strict checking here
}
@@ -828,7 +830,9 @@ void Broker::createObject(const std::string& type, const std::string& name,
}
std::pair<Bridge::shared_ptr, bool> rc =
links.declare(name, *link, durable, src, dest, key, srcIsQueue, srcIsLocal, id, excludes,
- dynamic, sync);
+ dynamic, sync,
+ 0,
+ queueName);
if (!rc.first) {
QPID_LOG (error, "Failed to create Bridge object, name=" << name << " link=" << linkName <<
diff --git a/qpid/cpp/src/tests/federation.py b/qpid/cpp/src/tests/federation.py
index d3dfaedaf9..fe037c720d 100755
--- a/qpid/cpp/src/tests/federation.py
+++ b/qpid/cpp/src/tests/federation.py
@@ -2328,6 +2328,7 @@ class FederationTests(TestBase010):
self.verify_cleanup()
+
def test_multilink_shared_queue(self):
""" Verify that two distinct links can be created between federated
brokers.
@@ -2441,4 +2442,166 @@ class FederationTests(TestBase010):
self.verify_cleanup()
-
+ def test_dynamic_direct_shared_queue(self):
+ """
+ Route Topology:
+
+ +<--- B1
+ B0 <---+<--- B2
+ +<--- B3
+ """
+ session = self.session
+
+ # create the federation
+
+ self.startQmf()
+ qmf = self.qmf
+
+ self._setup_brokers()
+
+ # create direct exchange on each broker, and retrieve the corresponding
+ # management object for that exchange
+
+ exchanges=[]
+ for _b in self._brokers:
+ _b.client_session.exchange_declare(exchange="fedX.direct", type="direct")
+ self.assertEqual(_b.client_session.exchange_query(name="fedX.direct").type,
+ "direct", "exchange_declare failed!")
+ # pull the exchange out of qmf...
+ retries = 0
+ my_exchange = None
+ while my_exchange is None:
+ objs = qmf.getObjects(_broker=_b.qmf_broker, _class="exchange")
+ for ooo in objs:
+ if ooo.name == "fedX.direct":
+ my_exchange = ooo
+ break
+ if my_exchange is None:
+ retries += 1
+ self.failIfEqual(retries, 10,
+ "QMF failed to find new exchange!")
+ sleep(1)
+ exchanges.append(my_exchange)
+
+ self.assertEqual(len(exchanges), len(self._brokers), "Exchange creation failed!")
+
+ # Create 2 links per each source broker (1,2,3) to the downstream
+ # broker 0:
+ for _b in range(1,4):
+ for _l in ["dynamic", "queue"]:
+ result = self._brokers[0].qmf_object.create( "link",
+ "Link-%d-%s" % (_b, _l),
+ {"host":self._brokers[_b].host,
+ "port":self._brokers[_b].port}, False)
+ self.assertEqual(result.status, 0)
+
+ # create queue on source brokers for use by the dynamic route
+ self._brokers[_b].client_session.queue_declare(queue="fedSrcQ", exclusive=False, auto_delete=True)
+
+ for _l in range(1,4):
+ # for each dynamic link, create a dynamic bridge for the "fedX.direct"
+ # exchanges, using the fedSrcQ on each upstream source broker
+ result = self._brokers[0].qmf_object.create("bridge",
+ "Bridge-%d-dynamic" % _l,
+ {"link":"Link-%d-dynamic" % _l,
+ "src":"fedX.direct",
+ "dest":"fedX.direct",
+ "dynamic":True,
+ "queue":"fedSrcQ"}, False)
+ self.assertEqual(result.status, 0)
+
+ # create a queue route that shares the queue used by the dynamic route
+ result = self._brokers[0].qmf_object.create("bridge",
+ "Bridge-%d-queue" % _l,
+ {"link":"Link-%d-queue" % _l,
+ "src":"fedSrcQ",
+ "dest":"fedX.direct",
+ "srcIsQueue":True}, False)
+ self.assertEqual(result.status, 0)
+
+
+ # wait for the inter-broker links to become operational
+ retries = 0
+ operational = False
+ while not operational:
+ operational = True
+ for _l in qmf.getObjects(_class="link"):
+ #print("Link=%s:%s %s" % (_l.host, _l.port, str(_l.state)))
+ if _l.state != "Operational":
+ operational = False
+ if not operational:
+ retries += 1
+ self.failIfEqual(retries, 10,
+ "inter-broker links failed to become operational.")
+ sleep(1)
+
+ # @todo - There is no way to determine when the bridge objects become
+ # active. Hopefully, this is long enough!
+ sleep(6)
+
+ # create a queue on B0, bound to "spudboy"
+ self._brokers[0].client_session.queue_declare(queue="DestQ", exclusive=True, auto_delete=True)
+ self._brokers[0].client_session.exchange_bind(queue="DestQ", exchange="fedX.direct", binding_key="spudboy")
+
+ # subscribe to messages arriving on B2's queue
+ self.subscribe(self._brokers[0].client_session, queue="DestQ", destination="f1")
+ queue = self._brokers[0].client_session.incoming("f1")
+
+ # wait until the binding key has propagated to each broker
+
+ binding_counts = [1, 1, 1, 1]
+ self.assertEqual(len(binding_counts), len(exchanges), "Update Test!")
+ for i in range(3,-1,-1):
+ retries = 0
+ exchanges[i].update()
+ while exchanges[i].bindingCount < binding_counts[i]:
+ retries += 1
+ self.failIfEqual(retries, 10,
+ "binding failed to propagate to broker %d"
+ % i)
+ sleep(3)
+ exchanges[i].update()
+
+ for _b in range(1,4):
+ # send 3 msgs from each source broker
+ for i in range(3):
+ dp = self._brokers[_b].client_session.delivery_properties(routing_key="spudboy")
+ self._brokers[_b].client_session.message_transfer(destination="fedX.direct", message=Message(dp, "Message_drp %d" % i))
+
+ # get exactly 9 (3 per broker) on B0
+ for i in range(9):
+ msg = queue.get(timeout=5)
+
+ try:
+ extra = queue.get(timeout=1)
+ self.fail("Got unexpected message in queue: " + extra.body)
+ except Empty: None
+
+ # verify that messages went across every link
+ for _l in qmf.getObjects(_broker=self._brokers[0].qmf_broker,
+ _class="link"):
+ for _c in qmf.getObjects(_broker=self._brokers[0].qmf_broker,
+ _objectId=_l.connectionRef):
+ self.assertNotEqual(_c.msgsToClient, 0, "Messages did not pass over link as expected.")
+
+ # cleanup
+
+ self._brokers[0].client_session.exchange_unbind(queue="DestQ", exchange="fedX.direct", binding_key="spudboy")
+ self._brokers[0].client_session.message_cancel(destination="f1")
+ self._brokers[0].client_session.queue_delete(queue="DestQ")
+
+ for _b in qmf.getObjects(_class="bridge"):
+ result = _b.close()
+ self.assertEqual(result.status, 0)
+
+ for _l in qmf.getObjects(_class="link"):
+ result = _l.close()
+ self.assertEqual(result.status, 0)
+
+ for _b in self._brokers:
+ _b.client_session.exchange_delete(exchange="fedX.direct")
+
+ self._teardown_brokers()
+
+ self.verify_cleanup()
+