diff options
author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2012-05-01 13:57:31 +0000 |
---|---|---|
committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2012-05-01 13:57:31 +0000 |
commit | 17895350191c5bfdbefbd1d5919d300abab2db57 (patch) | |
tree | fe42f49d4daa98a91db1fb2af993a26fe140c95f | |
parent | e3b659e61a270ad25af48a59096db2506eec9447 (diff) | |
download | qpid-python-17895350191c5bfdbefbd1d5919d300abab2db57.tar.gz |
QPID-3963: cleanups from reviewboard input
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1332655 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/Link.cpp | 44 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Link.h | 3 |
2 files changed, 26 insertions, 21 deletions
diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp index a8c4b2c2cb..653601e7e4 100644 --- a/qpid/cpp/src/qpid/broker/Link.cpp +++ b/qpid/cpp/src/qpid/broker/Link.cpp @@ -50,6 +50,13 @@ using std::stringstream; using std::string; namespace _qmf = ::qmf::org::apache::qpid::broker; + +namespace { + const std::string FAILOVER_EXCHANGE("amq.failover"); + const std::string FAILOVER_HEADER_KEY("amq.failover"); +} + + struct LinkTimerTask : public sys::TimerTask { LinkTimerTask(Link& l, sys::Timer& t) : TimerTask(int64_t(l.getBroker()->getOptions().linkMaintenanceInterval* @@ -68,11 +75,6 @@ struct LinkTimerTask : public sys::TimerTask { }; -namespace { - const std::string FAILOVER_EXCHANGE("amq.failover"); - const std::string FAILOVER_HEADER_KEY("amq.failover"); - const framing::ChannelId FAILOVER_CHANNEL(framing::CHANNEL_HIGH_BIT | 1); // reserved for this link -} /** LinkExchange is used by the link to subscribe to the remote broker's amq.failover exchange. */ @@ -102,7 +104,7 @@ public: urlVec = urlArrayToVector(addresses); for(size_t i = 0; i < urlVec.size(); ++i) urls.insert(urls.end(), urlVec[i].begin(), urlVec[i].end()); - QPID_LOG(debug, "Remote broker has provided these failover addresses= " << urls); + QPID_LOG(notice, "Remote broker has provided these failover addresses= " << urls); link->setUrl(urls); } } @@ -147,7 +149,8 @@ Link::Link(LinkRegistry* _links, channelCounter(1), connection(0), agent(0), - timerTask(new LinkTimerTask(*this, broker->getTimer())) + timerTask(new LinkTimerTask(*this, broker->getTimer())), + failoverChannel(0) { if (parent != 0 && broker != 0) { @@ -170,9 +173,9 @@ Link::Link(LinkRegistry* _links, _name << "qpid.link." << transport << ":" << host << ":" << port; std::pair<Exchange::shared_ptr, bool> rc = broker->getExchanges().declare(_name.str(), exchangeTypeName); - exchange = boost::static_pointer_cast<LinkExchange>(rc.first); - assert(exchange); - exchange->setLink(this); + failoverExchange = boost::static_pointer_cast<LinkExchange>(rc.first); + assert(failoverExchange); + failoverExchange->setLink(this); } Link::~Link () @@ -184,7 +187,7 @@ Link::~Link () if (mgmtObject != 0) mgmtObject->resourceDestroy (); - broker->getExchanges().destroy(exchange->getName()); + broker->getExchanges().destroy(failoverExchange->getName()); } void Link::setStateLH (int newState) @@ -287,11 +290,12 @@ void Link::opened() { // attempt to subscribe to failover exchange for updates from remote // - const std::string queueName = "qpid.link." + exchange->getName(); + const std::string queueName = "qpid.link." + failoverExchange->getName(); + failoverChannel = nextChannel(); - SessionHandler& sessionHandler = connection->getChannel(FAILOVER_CHANNEL); + SessionHandler& sessionHandler = connection->getChannel(failoverChannel); sessionHandler.setDetachedCallback( boost::bind(&sessionDetached, this) ); - sessionHandler.attachAs(exchange->getName()); + sessionHandler.attachAs(failoverExchange->getName()); framing::AMQP_ServerProxy remoteBroker(sessionHandler.out); @@ -307,15 +311,15 @@ void Link::opened() { "", // no key FieldTable()); remoteBroker.getMessage().subscribe(queueName, - exchange->getName(), + failoverExchange->getName(), 1, // implied-accept mode 0, // pre-acquire mode false, // exclusive "", // resume-id 0, // resume-ttl FieldTable()); - remoteBroker.getMessage().flow(exchange->getName(), 0, 0xFFFFFFFF); - remoteBroker.getMessage().flow(exchange->getName(), 1, 0xFFFFFFFF); + remoteBroker.getMessage().flow(failoverExchange->getName(), 0, 0xFFFFFFFF); + remoteBroker.getMessage().flow(failoverExchange->getName(), 1, 0xFFFFFFFF); } void Link::closed(int, std::string text) @@ -665,11 +669,11 @@ void Link::closeConnection( const std::string& reason) { if (connection != 0) { // cancel our subscription to the failover exchange - SessionHandler& sessionHandler = connection->getChannel(FAILOVER_CHANNEL); + SessionHandler& sessionHandler = connection->getChannel(failoverChannel); if (sessionHandler.getSession()) { framing::AMQP_ServerProxy remoteBroker(sessionHandler.out); - remoteBroker.getMessage().cancel(exchange->getName()); - remoteBroker.getSession().detach(exchange->getName()); + remoteBroker.getMessage().cancel(failoverExchange->getName()); + remoteBroker.getSession().detach(failoverExchange->getName()); } connection->close(CLOSE_CODE_CONNECTION_FORCED, reason); connection = 0; diff --git a/qpid/cpp/src/qpid/broker/Link.h b/qpid/cpp/src/qpid/broker/Link.h index 1f8b3a2f23..27e9f466f5 100644 --- a/qpid/cpp/src/qpid/broker/Link.h +++ b/qpid/cpp/src/qpid/broker/Link.h @@ -86,7 +86,8 @@ class Link : public PersistableConfig, public management::Manageable { Connection* connection; management::ManagementAgent* agent; boost::intrusive_ptr<sys::TimerTask> timerTask; - boost::shared_ptr<broker::LinkExchange> exchange; + boost::shared_ptr<broker::LinkExchange> failoverExchange; // subscribed to remote's amq.failover exchange + uint failoverChannel; static const int STATE_WAITING = 1; static const int STATE_CONNECTING = 2; |