summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2012-05-01 13:57:31 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2012-05-01 13:57:31 +0000
commit17895350191c5bfdbefbd1d5919d300abab2db57 (patch)
treefe42f49d4daa98a91db1fb2af993a26fe140c95f
parente3b659e61a270ad25af48a59096db2506eec9447 (diff)
downloadqpid-python-17895350191c5bfdbefbd1d5919d300abab2db57.tar.gz
QPID-3963: cleanups from reviewboard input
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1332655 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/Link.cpp44
-rw-r--r--qpid/cpp/src/qpid/broker/Link.h3
2 files changed, 26 insertions, 21 deletions
diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp
index a8c4b2c2cb..653601e7e4 100644
--- a/qpid/cpp/src/qpid/broker/Link.cpp
+++ b/qpid/cpp/src/qpid/broker/Link.cpp
@@ -50,6 +50,13 @@ using std::stringstream;
using std::string;
namespace _qmf = ::qmf::org::apache::qpid::broker;
+
+namespace {
+ const std::string FAILOVER_EXCHANGE("amq.failover");
+ const std::string FAILOVER_HEADER_KEY("amq.failover");
+}
+
+
struct LinkTimerTask : public sys::TimerTask {
LinkTimerTask(Link& l, sys::Timer& t)
: TimerTask(int64_t(l.getBroker()->getOptions().linkMaintenanceInterval*
@@ -68,11 +75,6 @@ struct LinkTimerTask : public sys::TimerTask {
};
-namespace {
- const std::string FAILOVER_EXCHANGE("amq.failover");
- const std::string FAILOVER_HEADER_KEY("amq.failover");
- const framing::ChannelId FAILOVER_CHANNEL(framing::CHANNEL_HIGH_BIT | 1); // reserved for this link
-}
/** LinkExchange is used by the link to subscribe to the remote broker's amq.failover exchange.
*/
@@ -102,7 +104,7 @@ public:
urlVec = urlArrayToVector(addresses);
for(size_t i = 0; i < urlVec.size(); ++i)
urls.insert(urls.end(), urlVec[i].begin(), urlVec[i].end());
- QPID_LOG(debug, "Remote broker has provided these failover addresses= " << urls);
+ QPID_LOG(notice, "Remote broker has provided these failover addresses= " << urls);
link->setUrl(urls);
}
}
@@ -147,7 +149,8 @@ Link::Link(LinkRegistry* _links,
channelCounter(1),
connection(0),
agent(0),
- timerTask(new LinkTimerTask(*this, broker->getTimer()))
+ timerTask(new LinkTimerTask(*this, broker->getTimer())),
+ failoverChannel(0)
{
if (parent != 0 && broker != 0)
{
@@ -170,9 +173,9 @@ Link::Link(LinkRegistry* _links,
_name << "qpid.link." << transport << ":" << host << ":" << port;
std::pair<Exchange::shared_ptr, bool> rc = broker->getExchanges().declare(_name.str(),
exchangeTypeName);
- exchange = boost::static_pointer_cast<LinkExchange>(rc.first);
- assert(exchange);
- exchange->setLink(this);
+ failoverExchange = boost::static_pointer_cast<LinkExchange>(rc.first);
+ assert(failoverExchange);
+ failoverExchange->setLink(this);
}
Link::~Link ()
@@ -184,7 +187,7 @@ Link::~Link ()
if (mgmtObject != 0)
mgmtObject->resourceDestroy ();
- broker->getExchanges().destroy(exchange->getName());
+ broker->getExchanges().destroy(failoverExchange->getName());
}
void Link::setStateLH (int newState)
@@ -287,11 +290,12 @@ void Link::opened() {
// attempt to subscribe to failover exchange for updates from remote
//
- const std::string queueName = "qpid.link." + exchange->getName();
+ const std::string queueName = "qpid.link." + failoverExchange->getName();
+ failoverChannel = nextChannel();
- SessionHandler& sessionHandler = connection->getChannel(FAILOVER_CHANNEL);
+ SessionHandler& sessionHandler = connection->getChannel(failoverChannel);
sessionHandler.setDetachedCallback( boost::bind(&sessionDetached, this) );
- sessionHandler.attachAs(exchange->getName());
+ sessionHandler.attachAs(failoverExchange->getName());
framing::AMQP_ServerProxy remoteBroker(sessionHandler.out);
@@ -307,15 +311,15 @@ void Link::opened() {
"", // no key
FieldTable());
remoteBroker.getMessage().subscribe(queueName,
- exchange->getName(),
+ failoverExchange->getName(),
1, // implied-accept mode
0, // pre-acquire mode
false, // exclusive
"", // resume-id
0, // resume-ttl
FieldTable());
- remoteBroker.getMessage().flow(exchange->getName(), 0, 0xFFFFFFFF);
- remoteBroker.getMessage().flow(exchange->getName(), 1, 0xFFFFFFFF);
+ remoteBroker.getMessage().flow(failoverExchange->getName(), 0, 0xFFFFFFFF);
+ remoteBroker.getMessage().flow(failoverExchange->getName(), 1, 0xFFFFFFFF);
}
void Link::closed(int, std::string text)
@@ -665,11 +669,11 @@ void Link::closeConnection( const std::string& reason)
{
if (connection != 0) {
// cancel our subscription to the failover exchange
- SessionHandler& sessionHandler = connection->getChannel(FAILOVER_CHANNEL);
+ SessionHandler& sessionHandler = connection->getChannel(failoverChannel);
if (sessionHandler.getSession()) {
framing::AMQP_ServerProxy remoteBroker(sessionHandler.out);
- remoteBroker.getMessage().cancel(exchange->getName());
- remoteBroker.getSession().detach(exchange->getName());
+ remoteBroker.getMessage().cancel(failoverExchange->getName());
+ remoteBroker.getSession().detach(failoverExchange->getName());
}
connection->close(CLOSE_CODE_CONNECTION_FORCED, reason);
connection = 0;
diff --git a/qpid/cpp/src/qpid/broker/Link.h b/qpid/cpp/src/qpid/broker/Link.h
index 1f8b3a2f23..27e9f466f5 100644
--- a/qpid/cpp/src/qpid/broker/Link.h
+++ b/qpid/cpp/src/qpid/broker/Link.h
@@ -86,7 +86,8 @@ class Link : public PersistableConfig, public management::Manageable {
Connection* connection;
management::ManagementAgent* agent;
boost::intrusive_ptr<sys::TimerTask> timerTask;
- boost::shared_ptr<broker::LinkExchange> exchange;
+ boost::shared_ptr<broker::LinkExchange> failoverExchange; // subscribed to remote's amq.failover exchange
+ uint failoverChannel;
static const int STATE_WAITING = 1;
static const int STATE_CONNECTING = 2;