summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Bridge.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Bridge.cpp')
-rw-r--r--cpp/src/qpid/broker/Bridge.cpp24
1 files changed, 14 insertions, 10 deletions
diff --git a/cpp/src/qpid/broker/Bridge.cpp b/cpp/src/qpid/broker/Bridge.cpp
index 60d8263a76..63325a1e91 100644
--- a/cpp/src/qpid/broker/Bridge.cpp
+++ b/cpp/src/qpid/broker/Bridge.cpp
@@ -59,13 +59,16 @@ void Bridge::PushHandler::handle(framing::AMQFrame& frame)
Bridge::Bridge(const std::string& _name, Link* _link, framing::ChannelId _id,
CancellationListener l, const _qmf::ArgsLinkBridge& _args,
- InitializeCallback init, const string& qn, const string& ae) :
+ InitializeCallback init, const std::string& _queueName, const string& ae) :
link(_link), channel(_id), args(_args), mgmtObject(0),
- listener(l), name(_name), queueName(qn), altEx(ae), persistenceId(0),
- connState(0), conn(0), initialize(init), detached(false)
+ listener(l), name(_name),
+ queueName(_queueName.empty() ? "qpid.bridge_queue_" + name + "_" + link->getBroker()->getFederationTag()
+ : _queueName),
+ altEx(ae), persistenceId(0),
+ connState(0), conn(0), initialize(init), detached(false),
+ useExistingQueue(!_queueName.empty()),
+ sessionName("qpid.bridge_session_" + name + "_" + link->getBroker()->getFederationTag())
{
- if (queueName.empty())
- queueName = "qpid.bridge_queue_" + Uuid(true).str();
ManagementAgent* agent = link->getBroker()->getManagementAgent();
if (agent != 0) {
mgmtObject = new _qmf::Bridge
@@ -102,10 +105,10 @@ void Bridge::create(Connection& c)
session.reset(new framing::AMQP_ServerProxy::Session(*channelHandler));
peer.reset(new framing::AMQP_ServerProxy(*channelHandler));
- session->attach(queueName, false);
+ session->attach(sessionName, false);
session->commandPoint(0,0);
} else {
- sessionHandler.attachAs(queueName);
+ sessionHandler.attachAs(sessionName);
// Point the bridging commands at the remote peer broker
peer.reset(new framing::AMQP_ServerProxy(sessionHandler.out));
}
@@ -137,8 +140,9 @@ void Bridge::create(Connection& c)
}
bool durable = false;//should this be an arg, or would we use srcIsQueue for durable queues?
- bool autoDelete = !durable;//auto delete transient queues?
- peer->getQueue().declare(queueName, altEx, false, durable, true, autoDelete, queueSettings);
+ bool exclusive = !useExistingQueue; // only exclusive if the queue is owned by the bridge
+ bool autoDelete = exclusive && !durable;//auto delete transient queues?
+ peer->getQueue().declare(queueName, altEx, false, durable, exclusive, autoDelete, queueSettings);
if (!args.i_dynamic)
peer->getExchange().bind(queueName, args.i_src, args.i_key, FieldTable());
peer->getMessage().subscribe(queueName, args.i_dest, 1, 0, false, "", 0, FieldTable());
@@ -162,7 +166,7 @@ void Bridge::cancel(Connection&)
{
if (resetProxy()) {
peer->getMessage().cancel(args.i_dest);
- peer->getSession().detach(queueName);
+ peer->getSession().detach(sessionName);
}
QPID_LOG(debug, "Cancelled bridge " << name);
}