summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Link.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Link.cpp')
-rw-r--r--cpp/src/qpid/broker/Link.cpp111
1 files changed, 59 insertions, 52 deletions
diff --git a/cpp/src/qpid/broker/Link.cpp b/cpp/src/qpid/broker/Link.cpp
index 467d422721..ab28c5b01f 100644
--- a/cpp/src/qpid/broker/Link.cpp
+++ b/cpp/src/qpid/broker/Link.cpp
@@ -136,7 +136,8 @@ Link::Link(const string& _name,
const string& _username,
const string& _password,
Broker* _broker,
- Manageable* parent)
+ Manageable* parent,
+ bool failover_)
: name(_name), links(_links),
configuredTransport(_transport), configuredHost(_host), configuredPort(_port),
host(_host), port(_port), transport(_transport),
@@ -152,6 +153,7 @@ Link::Link(const string& _name,
agent(0),
listener(l),
timerTask(new LinkTimerTask(*this, broker->getTimer())),
+ failover(failover_),
failoverChannel(0)
{
if (parent != 0 && broker != 0)
@@ -174,13 +176,15 @@ Link::Link(const string& _name,
}
broker->getTimer().add(timerTask);
- stringstream exchangeName;
- exchangeName << "qpid.link." << name;
- std::pair<Exchange::shared_ptr, bool> rc = broker->getExchanges().declare(exchangeName.str(),
- exchangeTypeName);
- failoverExchange = boost::static_pointer_cast<LinkExchange>(rc.first);
- assert(failoverExchange);
- failoverExchange->setLink(this);
+ if (failover) {
+ stringstream exchangeName;
+ exchangeName << "qpid.link." << name;
+ std::pair<Exchange::shared_ptr, bool> rc =
+ broker->getExchanges().declare(exchangeName.str(), exchangeTypeName);
+ failoverExchange = boost::static_pointer_cast<LinkExchange>(rc.first);
+ assert(failoverExchange);
+ failoverExchange->setLink(this);
+ }
}
Link::~Link ()
@@ -192,7 +196,8 @@ Link::~Link ()
if (mgmtObject != 0)
mgmtObject->resourceDestroy ();
- broker->getExchanges().destroy(failoverExchange->getName());
+ if (failover)
+ broker->getExchanges().destroy(failoverExchange->getName());
}
void Link::setStateLH (int newState)
@@ -273,9 +278,7 @@ class DetachedCallback : public SessionHandler::ErrorListener {
void connectionException(framing::connection::CloseCode, const std::string&) {}
void channelException(framing::session::DetachCode, const std::string&) {}
void executionException(framing::execution::ErrorCode, const std::string&) {}
- void detach() {
- QPID_LOG(notice, "detached from 'amq.failover' for link: " << name);
- }
+ void detach() {}
private:
const std::string name;
};
@@ -300,42 +303,44 @@ void Link::opened() {
QPID_LOG(debug, "Known hosts for peer of inter-broker link: " << url);
}
- //
- // attempt to subscribe to failover exchange for updates from remote
- //
-
- const std::string queueName = "qpid.link." + framing::Uuid(true).str();
- failoverChannel = nextChannel();
-
- SessionHandler& sessionHandler = connection->getChannel(failoverChannel);
- sessionHandler.setErrorListener(
- boost::shared_ptr<SessionHandler::ErrorListener>(new DetachedCallback(*this)));
- failoverSession = queueName;
- sessionHandler.attachAs(failoverSession);
-
- framing::AMQP_ServerProxy remoteBroker(sessionHandler.out);
-
- remoteBroker.getQueue().declare(queueName,
- "", // alt-exchange
- false, // passive
- false, // durable
- true, // exclusive
- true, // auto-delete
- FieldTable());
- remoteBroker.getExchange().bind(queueName,
- FAILOVER_EXCHANGE,
- "", // no key
- FieldTable());
- remoteBroker.getMessage().subscribe(queueName,
- failoverExchange->getName(),
- 1, // implied-accept mode
- 0, // pre-acquire mode
- false, // exclusive
- "", // resume-id
- 0, // resume-ttl
+ if (failover) {
+ //
+ // attempt to subscribe to failover exchange for updates from remote
+ //
+
+ const std::string queueName = "qpid.link." + framing::Uuid(true).str();
+ failoverChannel = nextChannel();
+
+ SessionHandler& sessionHandler = connection->getChannel(failoverChannel);
+ sessionHandler.setErrorListener(
+ boost::shared_ptr<SessionHandler::ErrorListener>(new DetachedCallback(*this)));
+ failoverSession = queueName;
+ sessionHandler.attachAs(failoverSession);
+
+ framing::AMQP_ServerProxy remoteBroker(sessionHandler.out);
+
+ remoteBroker.getQueue().declare(queueName,
+ "", // alt-exchange
+ false, // passive
+ false, // durable
+ true, // exclusive
+ true, // auto-delete
FieldTable());
- remoteBroker.getMessage().flow(failoverExchange->getName(), 0, 0xFFFFFFFF);
- remoteBroker.getMessage().flow(failoverExchange->getName(), 1, 0xFFFFFFFF);
+ remoteBroker.getExchange().bind(queueName,
+ FAILOVER_EXCHANGE,
+ "", // no key
+ FieldTable());
+ remoteBroker.getMessage().subscribe(queueName,
+ failoverExchange->getName(),
+ 1, // implied-accept mode
+ 0, // pre-acquire mode
+ false, // exclusive
+ "", // resume-id
+ 0, // resume-ttl
+ FieldTable());
+ remoteBroker.getMessage().flow(failoverExchange->getName(), 0, 0xFFFFFFFF);
+ remoteBroker.getMessage().flow(failoverExchange->getName(), 1, 0xFFFFFFFF);
+ }
}
void Link::closed(int, std::string text)
@@ -711,11 +716,13 @@ void Link::closeConnection( const std::string& reason)
{
if (connection != 0) {
// cancel our subscription to the failover exchange
- SessionHandler& sessionHandler = connection->getChannel(failoverChannel);
- if (sessionHandler.getSession()) {
- framing::AMQP_ServerProxy remoteBroker(sessionHandler.out);
- remoteBroker.getMessage().cancel(failoverExchange->getName());
- remoteBroker.getSession().detach(failoverSession);
+ if (failover) {
+ SessionHandler& sessionHandler = connection->getChannel(failoverChannel);
+ if (sessionHandler.getSession()) {
+ framing::AMQP_ServerProxy remoteBroker(sessionHandler.out);
+ remoteBroker.getMessage().cancel(failoverExchange->getName());
+ remoteBroker.getSession().detach(failoverSession);
+ }
}
connection->close(CLOSE_CODE_CONNECTION_FORCED, reason);
connection = 0;