diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/Link.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/Link.cpp | 77 |
1 files changed, 52 insertions, 25 deletions
diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp index 2c053220a4..cd5b89e1ad 100644 --- a/qpid/cpp/src/qpid/broker/Link.cpp +++ b/qpid/cpp/src/qpid/broker/Link.cpp @@ -23,6 +23,7 @@ #include "qpid/broker/LinkRegistry.h" #include "qpid/broker/Broker.h" #include "qpid/broker/Connection.h" +#include "qpid/sys/Timer.h" #include "qmf/org/apache/qpid/broker/EventBrokerLinkUp.h" #include "qmf/org/apache/qpid/broker/EventBrokerLinkDown.h" #include "boost/bind.hpp" @@ -31,19 +32,35 @@ #include "qpid/framing/reply_exceptions.h" #include "qpid/broker/AclModule.h" -using namespace qpid::broker; -using qpid::framing::Buffer; -using qpid::framing::FieldTable; -using qpid::framing::UnauthorizedAccessException; -using qpid::framing::connection::CLOSE_CODE_CONNECTION_FORCED; -using qpid::management::ManagementAgent; -using qpid::management::ManagementObject; -using qpid::management::Manageable; -using qpid::management::Args; -using qpid::sys::Mutex; +namespace qpid { +namespace broker { + +using framing::Buffer; +using framing::FieldTable; +using framing::UnauthorizedAccessException; +using framing::connection::CLOSE_CODE_CONNECTION_FORCED; +using management::ManagementAgent; +using management::ManagementObject; +using management::Manageable; +using management::Args; +using sys::Mutex; using std::stringstream; using std::string; -namespace _qmf = qmf::org::apache::qpid::broker; +namespace _qmf = ::qmf::org::apache::qpid::broker; + +struct LinkTimerTask : public sys::TimerTask { + LinkTimerTask(Link& l, sys::Timer& t) + : TimerTask(/*FIXME*/100*sys::TIME_MSEC, "Link retry timer"), link(l), timer(t) {} + + void fire() { + link.maintenanceVisit(); // FIXME aconway 2012-01-31: + setupNextFire(); + timer.add(this); + } + + Link& link; + sys::Timer& timer; +}; Link::Link(LinkRegistry* _links, MessageStore* _store, @@ -67,7 +84,8 @@ Link::Link(LinkRegistry* _links, reconnectNext(0), // Index of next address for reconnecting in url. channelCounter(1), connection(0), - agent(0) + agent(0), + timerTask(new LinkTimerTask(*this, broker->getTimer())) { if (parent != 0 && broker != 0) { @@ -80,10 +98,12 @@ Link::Link(LinkRegistry* _links, } setStateLH(STATE_WAITING); startConnectionLH(); + broker->getTimer().add(timerTask); } Link::~Link () { + timerTask->cancel(); if (state == STATE_OPERATIONAL && connection != 0) connection->close(CLOSE_CODE_CONNECTION_FORCED, "closed by management"); @@ -121,7 +141,9 @@ void Link::startConnectionLH () broker->connect (host, boost::lexical_cast<std::string>(port), transport, boost::bind (&Link::closed, this, _1, _2)); QPID_LOG (debug, "Inter-broker link connecting to " << host << ":" << port); - } catch(std::exception& e) { + } catch(const std::exception& e) { + QPID_LOG(error, "Link connection to " << host << ":" << port << " failed: " + << e.what()); setStateLH(STATE_WAITING); if (!hideManagement()) mgmtObject->set_lastError (e.what()); @@ -156,14 +178,16 @@ void Link::setUrl(const Url& u) { void Link::opened() { Mutex::ScopedLock mutex(lock); assert(connection); - // Get default URL from known-hosts. - const std::vector<Url>& known = connection->getKnownHosts(); - // Flatten vector of URLs into a single URL listing all addresses. - url.clear(); - for(size_t i = 0; i < known.size(); ++i) - url.insert(url.end(), known[i].begin(), known[i].end()); - reconnectNext = 0; - QPID_LOG(debug, "Known hosts for peer of inter-broker link: " << url); + // Get default URL from known-hosts if not already set + if (url.empty()) { + const std::vector<Url>& known = connection->getKnownHosts(); + // Flatten vector of URLs into a single URL listing all addresses. + url.clear(); + for(size_t i = 0; i < known.size(); ++i) + url.insert(url.end(), known[i].begin(), known[i].end()); + reconnectNext = 0; + QPID_LOG(debug, "Known hosts for peer of inter-broker link: " << url); + } } void Link::closed(int, std::string text) @@ -176,7 +200,7 @@ void Link::closed(int, std::string text) if (state == STATE_OPERATIONAL) { stringstream addr; addr << host << ":" << port; - QPID_LOG (warning, "Inter-broker link disconnected from " << addr.str()); + QPID_LOG(warning, "Inter-broker link disconnected from " << addr.str()); if (!hideManagement() && agent) agent->raiseEvent(_qmf::EventBrokerLinkDown(addr.str())); } @@ -333,7 +357,7 @@ void Link::maintenanceVisit () connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); } -void Link::reconnect(const qpid::Address& a) +void Link::reconnect(const Address& a) { Mutex::ScopedLock mutex(lock); host = a.host; @@ -347,13 +371,14 @@ void Link::reconnect(const qpid::Address& a) } } -bool Link::tryFailoverLH() { // FIXME aconway 2012-01-30: lock held? +bool Link::tryFailoverLH() { if (reconnectNext >= url.size()) reconnectNext = 0; if (url.empty()) return false; Address next = url[reconnectNext++]; if (next.host != host || next.port != port || next.protocol != transport) { links->changeAddress(Address(transport, host, port), next); - QPID_LOG(debug, "Link failing over to " << host << ":" << port); + QPID_LOG(debug, "Inter-broker link failing over to " << next.host << ":" << next.port); + reconnect(next); return true; } return false; @@ -510,3 +535,5 @@ void Link::setPassive(bool passive) } } } + +}} // namespace qpid::broker |