summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Link.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Link.cpp')
-rw-r--r--cpp/src/qpid/broker/Link.cpp48
1 files changed, 44 insertions, 4 deletions
diff --git a/cpp/src/qpid/broker/Link.cpp b/cpp/src/qpid/broker/Link.cpp
index 92417608b7..2bd15759ef 100644
--- a/cpp/src/qpid/broker/Link.cpp
+++ b/cpp/src/qpid/broker/Link.cpp
@@ -63,6 +63,7 @@ Link::Link(LinkRegistry* _links,
visitCount(0),
currentInterval(1),
closing(false),
+ updateUrls(false),
channelCounter(1),
connection(0),
agent(0)
@@ -116,6 +117,7 @@ void Link::startConnectionLH ()
setStateLH(STATE_CONNECTING);
broker->connect (host, port, transport,
boost::bind (&Link::closed, this, _1, _2));
+ QPID_LOG (debug, "Inter-broker link connecting to " << host << ":" << port);
} catch(std::exception& e) {
setStateLH(STATE_WAITING);
if (mgmtObject != 0)
@@ -143,6 +145,7 @@ void Link::established ()
void Link::closed (int, std::string text)
{
Mutex::ScopedLock mutex(lock);
+ QPID_LOG (info, "Inter-broker link disconnected from " << host << ":" << port << " " << text);
connection = 0;
@@ -251,28 +254,65 @@ void Link::setConnection(Connection* c)
{
Mutex::ScopedLock mutex(lock);
connection = c;
+ updateUrls = true;
}
void Link::maintenanceVisit ()
{
Mutex::ScopedLock mutex(lock);
+ if (connection && updateUrls) {
+ urls.reset(connection->getKnownHosts());
+ QPID_LOG(debug, "Known hosts for peer of inter-broker link: " << urls);
+ updateUrls = false;
+ }
+
if (state == STATE_WAITING)
{
visitCount++;
if (visitCount >= currentInterval)
{
visitCount = 0;
- currentInterval *= 2;
- if (currentInterval > MAX_INTERVAL)
- currentInterval = MAX_INTERVAL;
- startConnectionLH();
+ //switch host and port to next in url list if possible
+ if (!tryFailover()) {
+ currentInterval *= 2;
+ if (currentInterval > MAX_INTERVAL)
+ currentInterval = MAX_INTERVAL;
+ startConnectionLH();
+ }
}
}
else if (state == STATE_OPERATIONAL && !created.empty() && connection != 0)
connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
}
+void Link::reconnect(const TcpAddress& a)
+{
+ Mutex::ScopedLock mutex(lock);
+ host = a.host;
+ port = a.port;
+ startConnectionLH();
+ if (mgmtObject != 0) {
+ stringstream errorString;
+ errorString << "Failed over to " << a;
+ mgmtObject->set_lastError(errorString.str());
+ }
+}
+
+bool Link::tryFailover()
+{
+ //TODO: urls only work for TCP at present, update when that has changed
+ TcpAddress next;
+ if (transport == Broker::TCP_TRANSPORT && urls.next(next) &&
+ (next.host != host || next.port != port)) {
+ links->changeAddress(TcpAddress(host, port), next);
+ QPID_LOG(debug, "Link failing over to " << host << ":" << port);
+ return true;
+ } else {
+ return false;
+ }
+}
+
uint Link::nextChannel()
{
Mutex::ScopedLock mutex(lock);