diff options
Diffstat (limited to 'cpp/src/qpid/broker/Link.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/Link.cpp | 111 |
1 files changed, 59 insertions, 52 deletions
diff --git a/cpp/src/qpid/broker/Link.cpp b/cpp/src/qpid/broker/Link.cpp index 467d422721..ab28c5b01f 100644 --- a/cpp/src/qpid/broker/Link.cpp +++ b/cpp/src/qpid/broker/Link.cpp @@ -136,7 +136,8 @@ Link::Link(const string& _name, const string& _username, const string& _password, Broker* _broker, - Manageable* parent) + Manageable* parent, + bool failover_) : name(_name), links(_links), configuredTransport(_transport), configuredHost(_host), configuredPort(_port), host(_host), port(_port), transport(_transport), @@ -152,6 +153,7 @@ Link::Link(const string& _name, agent(0), listener(l), timerTask(new LinkTimerTask(*this, broker->getTimer())), + failover(failover_), failoverChannel(0) { if (parent != 0 && broker != 0) @@ -174,13 +176,15 @@ Link::Link(const string& _name, } broker->getTimer().add(timerTask); - stringstream exchangeName; - exchangeName << "qpid.link." << name; - std::pair<Exchange::shared_ptr, bool> rc = broker->getExchanges().declare(exchangeName.str(), - exchangeTypeName); - failoverExchange = boost::static_pointer_cast<LinkExchange>(rc.first); - assert(failoverExchange); - failoverExchange->setLink(this); + if (failover) { + stringstream exchangeName; + exchangeName << "qpid.link." << name; + std::pair<Exchange::shared_ptr, bool> rc = + broker->getExchanges().declare(exchangeName.str(), exchangeTypeName); + failoverExchange = boost::static_pointer_cast<LinkExchange>(rc.first); + assert(failoverExchange); + failoverExchange->setLink(this); + } } Link::~Link () @@ -192,7 +196,8 @@ Link::~Link () if (mgmtObject != 0) mgmtObject->resourceDestroy (); - broker->getExchanges().destroy(failoverExchange->getName()); + if (failover) + broker->getExchanges().destroy(failoverExchange->getName()); } void Link::setStateLH (int newState) @@ -273,9 +278,7 @@ class DetachedCallback : public SessionHandler::ErrorListener { void connectionException(framing::connection::CloseCode, const std::string&) {} void channelException(framing::session::DetachCode, const std::string&) {} void executionException(framing::execution::ErrorCode, const std::string&) {} - void detach() { - QPID_LOG(notice, "detached from 'amq.failover' for link: " << name); - } + void detach() {} private: const std::string name; }; @@ -300,42 +303,44 @@ void Link::opened() { QPID_LOG(debug, "Known hosts for peer of inter-broker link: " << url); } - // - // attempt to subscribe to failover exchange for updates from remote - // - - const std::string queueName = "qpid.link." + framing::Uuid(true).str(); - failoverChannel = nextChannel(); - - SessionHandler& sessionHandler = connection->getChannel(failoverChannel); - sessionHandler.setErrorListener( - boost::shared_ptr<SessionHandler::ErrorListener>(new DetachedCallback(*this))); - failoverSession = queueName; - sessionHandler.attachAs(failoverSession); - - framing::AMQP_ServerProxy remoteBroker(sessionHandler.out); - - remoteBroker.getQueue().declare(queueName, - "", // alt-exchange - false, // passive - false, // durable - true, // exclusive - true, // auto-delete - FieldTable()); - remoteBroker.getExchange().bind(queueName, - FAILOVER_EXCHANGE, - "", // no key - FieldTable()); - remoteBroker.getMessage().subscribe(queueName, - failoverExchange->getName(), - 1, // implied-accept mode - 0, // pre-acquire mode - false, // exclusive - "", // resume-id - 0, // resume-ttl + if (failover) { + // + // attempt to subscribe to failover exchange for updates from remote + // + + const std::string queueName = "qpid.link." + framing::Uuid(true).str(); + failoverChannel = nextChannel(); + + SessionHandler& sessionHandler = connection->getChannel(failoverChannel); + sessionHandler.setErrorListener( + boost::shared_ptr<SessionHandler::ErrorListener>(new DetachedCallback(*this))); + failoverSession = queueName; + sessionHandler.attachAs(failoverSession); + + framing::AMQP_ServerProxy remoteBroker(sessionHandler.out); + + remoteBroker.getQueue().declare(queueName, + "", // alt-exchange + false, // passive + false, // durable + true, // exclusive + true, // auto-delete FieldTable()); - remoteBroker.getMessage().flow(failoverExchange->getName(), 0, 0xFFFFFFFF); - remoteBroker.getMessage().flow(failoverExchange->getName(), 1, 0xFFFFFFFF); + remoteBroker.getExchange().bind(queueName, + FAILOVER_EXCHANGE, + "", // no key + FieldTable()); + remoteBroker.getMessage().subscribe(queueName, + failoverExchange->getName(), + 1, // implied-accept mode + 0, // pre-acquire mode + false, // exclusive + "", // resume-id + 0, // resume-ttl + FieldTable()); + remoteBroker.getMessage().flow(failoverExchange->getName(), 0, 0xFFFFFFFF); + remoteBroker.getMessage().flow(failoverExchange->getName(), 1, 0xFFFFFFFF); + } } void Link::closed(int, std::string text) @@ -711,11 +716,13 @@ void Link::closeConnection( const std::string& reason) { if (connection != 0) { // cancel our subscription to the failover exchange - SessionHandler& sessionHandler = connection->getChannel(failoverChannel); - if (sessionHandler.getSession()) { - framing::AMQP_ServerProxy remoteBroker(sessionHandler.out); - remoteBroker.getMessage().cancel(failoverExchange->getName()); - remoteBroker.getSession().detach(failoverSession); + if (failover) { + SessionHandler& sessionHandler = connection->getChannel(failoverChannel); + if (sessionHandler.getSession()) { + framing::AMQP_ServerProxy remoteBroker(sessionHandler.out); + remoteBroker.getMessage().cancel(failoverExchange->getName()); + remoteBroker.getSession().detach(failoverSession); + } } connection->close(CLOSE_CODE_CONNECTION_FORCED, reason); connection = 0; |
