diff options
Diffstat (limited to 'cpp/src/qpid/broker/Link.cpp')
-rw-r--r-- | cpp/src/qpid/broker/Link.cpp | 48 |
1 files changed, 44 insertions, 4 deletions
diff --git a/cpp/src/qpid/broker/Link.cpp b/cpp/src/qpid/broker/Link.cpp index 92417608b7..2bd15759ef 100644 --- a/cpp/src/qpid/broker/Link.cpp +++ b/cpp/src/qpid/broker/Link.cpp @@ -63,6 +63,7 @@ Link::Link(LinkRegistry* _links, visitCount(0), currentInterval(1), closing(false), + updateUrls(false), channelCounter(1), connection(0), agent(0) @@ -116,6 +117,7 @@ void Link::startConnectionLH () setStateLH(STATE_CONNECTING); broker->connect (host, port, transport, boost::bind (&Link::closed, this, _1, _2)); + QPID_LOG (debug, "Inter-broker link connecting to " << host << ":" << port); } catch(std::exception& e) { setStateLH(STATE_WAITING); if (mgmtObject != 0) @@ -143,6 +145,7 @@ void Link::established () void Link::closed (int, std::string text) { Mutex::ScopedLock mutex(lock); + QPID_LOG (info, "Inter-broker link disconnected from " << host << ":" << port << " " << text); connection = 0; @@ -251,28 +254,65 @@ void Link::setConnection(Connection* c) { Mutex::ScopedLock mutex(lock); connection = c; + updateUrls = true; } void Link::maintenanceVisit () { Mutex::ScopedLock mutex(lock); + if (connection && updateUrls) { + urls.reset(connection->getKnownHosts()); + QPID_LOG(debug, "Known hosts for peer of inter-broker link: " << urls); + updateUrls = false; + } + if (state == STATE_WAITING) { visitCount++; if (visitCount >= currentInterval) { visitCount = 0; - currentInterval *= 2; - if (currentInterval > MAX_INTERVAL) - currentInterval = MAX_INTERVAL; - startConnectionLH(); + //switch host and port to next in url list if possible + if (!tryFailover()) { + currentInterval *= 2; + if (currentInterval > MAX_INTERVAL) + currentInterval = MAX_INTERVAL; + startConnectionLH(); + } } } else if (state == STATE_OPERATIONAL && !created.empty() && connection != 0) connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); } +void Link::reconnect(const TcpAddress& a) +{ + Mutex::ScopedLock mutex(lock); + host = a.host; + port = a.port; + startConnectionLH(); + if (mgmtObject != 0) { + stringstream errorString; + errorString << "Failed over to " << a; + mgmtObject->set_lastError(errorString.str()); + } +} + +bool Link::tryFailover() +{ + //TODO: urls only work for TCP at present, update when that has changed + TcpAddress next; + if (transport == Broker::TCP_TRANSPORT && urls.next(next) && + (next.host != host || next.port != port)) { + links->changeAddress(TcpAddress(host, port), next); + QPID_LOG(debug, "Link failing over to " << host << ":" << port); + return true; + } else { + return false; + } +} + uint Link::nextChannel() { Mutex::ScopedLock mutex(lock); |