diff options
author | Alan Conway <aconway@apache.org> | 2012-02-17 14:54:46 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2012-02-17 14:54:46 +0000 |
commit | 0a8773c335509c2b9e9b96df360de190a266dcad (patch) | |
tree | 288469c17dacc37199b5f77498965fee7e778d95 /cpp/src/qpid/broker/Link.cpp | |
parent | d82ce6836f7f0e4f7d647b2dc603141f549869d3 (diff) | |
download | qpid-python-0a8773c335509c2b9e9b96df360de190a266dcad.tar.gz |
QPID-3603: Merge new HA foundations.
Merged from qpid-3603-7. This is basic support for the new HA approach.
For information & limitations see qpid/cpp/design_docs/new-ha-design.txt.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1245587 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/Link.cpp')
-rw-r--r-- | cpp/src/qpid/broker/Link.cpp | 192 |
1 files changed, 124 insertions, 68 deletions
diff --git a/cpp/src/qpid/broker/Link.cpp b/cpp/src/qpid/broker/Link.cpp index 0bc7d8f47b..4af1e6d6bd 100644 --- a/cpp/src/qpid/broker/Link.cpp +++ b/cpp/src/qpid/broker/Link.cpp @@ -23,6 +23,7 @@ #include "qpid/broker/LinkRegistry.h" #include "qpid/broker/Broker.h" #include "qpid/broker/Connection.h" +#include "qpid/sys/Timer.h" #include "qmf/org/apache/qpid/broker/EventBrokerLinkUp.h" #include "qmf/org/apache/qpid/broker/EventBrokerLinkDown.h" #include "boost/bind.hpp" @@ -31,29 +32,48 @@ #include "qpid/framing/reply_exceptions.h" #include "qpid/broker/AclModule.h" -using namespace qpid::broker; -using qpid::framing::Buffer; -using qpid::framing::FieldTable; -using qpid::framing::UnauthorizedAccessException; -using qpid::framing::connection::CLOSE_CODE_CONNECTION_FORCED; -using qpid::management::ManagementAgent; -using qpid::management::ManagementObject; -using qpid::management::Manageable; -using qpid::management::Args; -using qpid::sys::Mutex; +namespace qpid { +namespace broker { + +using framing::Buffer; +using framing::FieldTable; +using framing::UnauthorizedAccessException; +using framing::connection::CLOSE_CODE_CONNECTION_FORCED; +using management::ManagementAgent; +using management::ManagementObject; +using management::Manageable; +using management::Args; +using sys::Mutex; using std::stringstream; using std::string; -namespace _qmf = qmf::org::apache::qpid::broker; +namespace _qmf = ::qmf::org::apache::qpid::broker; + +struct LinkTimerTask : public sys::TimerTask { + LinkTimerTask(Link& l, sys::Timer& t) + : TimerTask(int64_t(l.getBroker()->getOptions().linkMaintenanceInterval* + sys::TIME_SEC), + "Link retry timer"), + link(l), timer(t) {} + + void fire() { + link.maintenanceVisit(); + setupNextFire(); + timer.add(this); + } + + Link& link; + sys::Timer& timer; +}; Link::Link(LinkRegistry* _links, MessageStore* _store, - string& _host, + const string& _host, uint16_t _port, - string& _transport, + const string& _transport, bool _durable, - string& _authMechanism, - string& _username, - string& _password, + const string& _authMechanism, + const string& _username, + const string& _password, Broker* _broker, Manageable* parent) : links(_links), store(_store), host(_host), port(_port), @@ -64,10 +84,11 @@ Link::Link(LinkRegistry* _links, visitCount(0), currentInterval(1), closing(false), - updateUrls(false), + reconnectNext(0), // Index of next address for reconnecting in url. channelCounter(1), connection(0), - agent(0) + agent(0), + timerTask(new LinkTimerTask(*this, broker->getTimer())) { if (parent != 0 && broker != 0) { @@ -78,7 +99,13 @@ Link::Link(LinkRegistry* _links, agent->addObject(mgmtObject, 0, durable); } } - setStateLH(STATE_WAITING); + if (links->isPassive()) { + setStateLH(STATE_PASSIVE); + } else { + setStateLH(STATE_WAITING); + startConnectionLH(); + } + broker->getTimer().add(timerTask); } Link::~Link () @@ -113,6 +140,7 @@ void Link::setStateLH (int newState) void Link::startConnectionLH () { + assert(state == STATE_WAITING); try { // Set the state before calling connect. It is possible that connect // will fail synchronously and call Link::closed before returning. @@ -120,15 +148,18 @@ void Link::startConnectionLH () broker->connect (host, boost::lexical_cast<std::string>(port), transport, boost::bind (&Link::closed, this, _1, _2)); QPID_LOG (debug, "Inter-broker link connecting to " << host << ":" << port); - } catch(std::exception& e) { + } catch(const std::exception& e) { + QPID_LOG(error, "Link connection to " << host << ":" << port << " failed: " + << e.what()); setStateLH(STATE_WAITING); if (!hideManagement()) mgmtObject->set_lastError (e.what()); } } -void Link::established () +void Link::established(Connection* c) { + if (state == STATE_PASSIVE) return; stringstream addr; addr << host << ":" << port; QPID_LOG (info, "Inter-broker link established to " << addr.str()); @@ -136,17 +167,40 @@ void Link::established () if (!hideManagement() && agent) agent->raiseEvent(_qmf::EventBrokerLinkUp(addr.str())); - { - Mutex::ScopedLock mutex(lock); - setStateLH(STATE_OPERATIONAL); - currentInterval = 1; - visitCount = 0; - if (closing) - destroy(); + Mutex::ScopedLock mutex(lock); + setStateLH(STATE_OPERATIONAL); + currentInterval = 1; + visitCount = 0; + connection = c; + if (closing) + destroy(); + else // Process any IO tasks bridges added before established. + connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); +} + + +void Link::setUrl(const Url& u) { + Mutex::ScopedLock mutex(lock); + url = u; + reconnectNext = 0; +} + +void Link::opened() { + Mutex::ScopedLock mutex(lock); + if (!connection) return; + // Get default URL from known-hosts if not already set + if (url.empty()) { + const std::vector<Url>& known = connection->getKnownHosts(); + // Flatten vector of URLs into a single URL listing all addresses. + url.clear(); + for(size_t i = 0; i < known.size(); ++i) + url.insert(url.end(), known[i].begin(), known[i].end()); + reconnectNext = 0; + QPID_LOG(debug, "Known hosts for peer of inter-broker link: " << url); } } -void Link::closed (int, std::string text) +void Link::closed(int, std::string text) { Mutex::ScopedLock mutex(lock); QPID_LOG (info, "Inter-broker link disconnected from " << host << ":" << port << " " << text); @@ -156,7 +210,7 @@ void Link::closed (int, std::string text) if (state == STATE_OPERATIONAL) { stringstream addr; addr << host << ":" << port; - QPID_LOG (warning, "Inter-broker link disconnected from " << addr.str()); + QPID_LOG(warning, "Inter-broker link disconnected from " << addr.str()); if (!hideManagement() && agent) agent->raiseEvent(_qmf::EventBrokerLinkDown(addr.str())); } @@ -167,7 +221,7 @@ void Link::closed (int, std::string text) } active.clear(); - if (state != STATE_FAILED) + if (state != STATE_FAILED && state != STATE_PASSIVE) { setStateLH(STATE_WAITING); if (!hideManagement()) @@ -178,6 +232,7 @@ void Link::closed (int, std::string text) destroy(); } +// Called in connection IO thread. void Link::destroy () { Bridges toDelete; @@ -187,7 +242,7 @@ void Link::destroy () QPID_LOG (info, "Inter-broker link to " << host << ":" << port << " removed by management"); if (connection) connection->close(CLOSE_CODE_CONNECTION_FORCED, "closed by management"); - + connection = 0; setStateLH(STATE_CLOSED); // Move the bridges to be deleted into a local vector so there is no @@ -201,6 +256,8 @@ void Link::destroy () for (Bridges::iterator i = created.begin(); i != created.end(); i++) toDelete.push_back(*i); created.clear(); + + timerTask->cancel(); } // Now delete all bridges on this link (don't hold the lock for this). for (Bridges::iterator i = toDelete.begin(); i != toDelete.end(); i++) @@ -213,10 +270,14 @@ void Link::add(Bridge::shared_ptr bridge) { Mutex::ScopedLock mutex(lock); created.push_back (bridge); + if (connection) + connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); + } void Link::cancel(Bridge::shared_ptr bridge) { + bool needIOProcessing = false; { Mutex::ScopedLock mutex(lock); @@ -234,10 +295,10 @@ void Link::cancel(Bridge::shared_ptr bridge) break; } } + needIOProcessing = !cancellations.empty(); } - if (!cancellations.empty()) { + if (needIOProcessing && connection) connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); - } } void Link::ioThreadProcessing() @@ -246,7 +307,6 @@ void Link::ioThreadProcessing() if (state != STATE_OPERATIONAL) return; - QPID_LOG(debug, "Link::ioThreadProcessing()"); // check for bridge session errors and recover if (!active.empty()) { @@ -279,23 +339,10 @@ void Link::ioThreadProcessing() } } -void Link::setConnection(Connection* c) -{ - Mutex::ScopedLock mutex(lock); - connection = c; - updateUrls = true; -} - void Link::maintenanceVisit () { Mutex::ScopedLock mutex(lock); - if (connection && updateUrls) { - urls.reset(connection->getKnownHosts()); - QPID_LOG(debug, "Known hosts for peer of inter-broker link: " << urls); - updateUrls = false; - } - if (state == STATE_WAITING) { visitCount++; @@ -303,7 +350,7 @@ void Link::maintenanceVisit () { visitCount = 0; //switch host and port to next in url list if possible - if (!tryFailover()) { + if (!tryFailoverLH()) { currentInterval *= 2; if (currentInterval > MAX_INTERVAL) currentInterval = MAX_INTERVAL; @@ -313,11 +360,10 @@ void Link::maintenanceVisit () } else if (state == STATE_OPERATIONAL && (!active.empty() || !created.empty() || !cancellations.empty()) && connection != 0) connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); -} + } -void Link::reconnect(const qpid::Address& a) +void Link::reconnectLH(const Address& a) { - Mutex::ScopedLock mutex(lock); host = a.host; port = a.port; transport = a.protocol; @@ -329,17 +375,18 @@ void Link::reconnect(const qpid::Address& a) } } -bool Link::tryFailover() -{ - Address next; - if (urls.next(next) && - (next.host != host || next.port != port || next.protocol != transport)) { +bool Link::tryFailoverLH() { + assert(state == STATE_WAITING); + if (reconnectNext >= url.size()) reconnectNext = 0; + if (url.empty()) return false; + Address next = url[reconnectNext++]; + if (next.host != host || next.port != port || next.protocol != transport) { links->changeAddress(Address(transport, host, port), next); - QPID_LOG(debug, "Link failing over to " << host << ":" << port); + QPID_LOG(debug, "Inter-broker link failing over to " << next.host << ":" << next.port); + reconnectLH(next); return true; - } else { - return false; } + return false; } // Management updates for a linke are inconsistent in a cluster, so they are @@ -423,18 +470,24 @@ ManagementObject* Link::GetManagementObject (void) const return (ManagementObject*) mgmtObject; } +void Link::close() { + Mutex::ScopedLock mutex(lock); + if (!closing) { + closing = true; + if (state != STATE_CONNECTING && connection) { + //connection can only be closed on the connections own IO processing thread + connection->requestIOProcessing(boost::bind(&Link::destroy, this)); + } + } +} + + Manageable::status_t Link::ManagementMethod (uint32_t op, Args& args, string& text) { switch (op) { case _qmf::Link::METHOD_CLOSE : - if (!closing) { - closing = true; - if (state != STATE_CONNECTING && connection) { - //connection can only be closed on the connections own IO processing thread - connection->requestIOProcessing(boost::bind(&Link::destroy, this)); - } - } + close(); return Manageable::STATUS_OK; case _qmf::Link::METHOD_BRIDGE : @@ -483,7 +536,10 @@ void Link::setPassive(bool passive) if (state == STATE_PASSIVE) { setStateLH(STATE_WAITING); } else { - QPID_LOG(warning, "Ignoring attempt to activate non-passive link"); + QPID_LOG(warning, "Ignoring attempt to activate non-passive link " + << host << ":" << port); } } } + +}} // namespace qpid::broker |