summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/Link.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/Link.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/Link.cpp77
1 files changed, 52 insertions, 25 deletions
diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp
index 2c053220a4..cd5b89e1ad 100644
--- a/qpid/cpp/src/qpid/broker/Link.cpp
+++ b/qpid/cpp/src/qpid/broker/Link.cpp
@@ -23,6 +23,7 @@
#include "qpid/broker/LinkRegistry.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/Connection.h"
+#include "qpid/sys/Timer.h"
#include "qmf/org/apache/qpid/broker/EventBrokerLinkUp.h"
#include "qmf/org/apache/qpid/broker/EventBrokerLinkDown.h"
#include "boost/bind.hpp"
@@ -31,19 +32,35 @@
#include "qpid/framing/reply_exceptions.h"
#include "qpid/broker/AclModule.h"
-using namespace qpid::broker;
-using qpid::framing::Buffer;
-using qpid::framing::FieldTable;
-using qpid::framing::UnauthorizedAccessException;
-using qpid::framing::connection::CLOSE_CODE_CONNECTION_FORCED;
-using qpid::management::ManagementAgent;
-using qpid::management::ManagementObject;
-using qpid::management::Manageable;
-using qpid::management::Args;
-using qpid::sys::Mutex;
+namespace qpid {
+namespace broker {
+
+using framing::Buffer;
+using framing::FieldTable;
+using framing::UnauthorizedAccessException;
+using framing::connection::CLOSE_CODE_CONNECTION_FORCED;
+using management::ManagementAgent;
+using management::ManagementObject;
+using management::Manageable;
+using management::Args;
+using sys::Mutex;
using std::stringstream;
using std::string;
-namespace _qmf = qmf::org::apache::qpid::broker;
+namespace _qmf = ::qmf::org::apache::qpid::broker;
+
+struct LinkTimerTask : public sys::TimerTask {
+ LinkTimerTask(Link& l, sys::Timer& t)
+ : TimerTask(/*FIXME*/100*sys::TIME_MSEC, "Link retry timer"), link(l), timer(t) {}
+
+ void fire() {
+ link.maintenanceVisit(); // FIXME aconway 2012-01-31:
+ setupNextFire();
+ timer.add(this);
+ }
+
+ Link& link;
+ sys::Timer& timer;
+};
Link::Link(LinkRegistry* _links,
MessageStore* _store,
@@ -67,7 +84,8 @@ Link::Link(LinkRegistry* _links,
reconnectNext(0), // Index of next address for reconnecting in url.
channelCounter(1),
connection(0),
- agent(0)
+ agent(0),
+ timerTask(new LinkTimerTask(*this, broker->getTimer()))
{
if (parent != 0 && broker != 0)
{
@@ -80,10 +98,12 @@ Link::Link(LinkRegistry* _links,
}
setStateLH(STATE_WAITING);
startConnectionLH();
+ broker->getTimer().add(timerTask);
}
Link::~Link ()
{
+ timerTask->cancel();
if (state == STATE_OPERATIONAL && connection != 0)
connection->close(CLOSE_CODE_CONNECTION_FORCED, "closed by management");
@@ -121,7 +141,9 @@ void Link::startConnectionLH ()
broker->connect (host, boost::lexical_cast<std::string>(port), transport,
boost::bind (&Link::closed, this, _1, _2));
QPID_LOG (debug, "Inter-broker link connecting to " << host << ":" << port);
- } catch(std::exception& e) {
+ } catch(const std::exception& e) {
+ QPID_LOG(error, "Link connection to " << host << ":" << port << " failed: "
+ << e.what());
setStateLH(STATE_WAITING);
if (!hideManagement())
mgmtObject->set_lastError (e.what());
@@ -156,14 +178,16 @@ void Link::setUrl(const Url& u) {
void Link::opened() {
Mutex::ScopedLock mutex(lock);
assert(connection);
- // Get default URL from known-hosts.
- const std::vector<Url>& known = connection->getKnownHosts();
- // Flatten vector of URLs into a single URL listing all addresses.
- url.clear();
- for(size_t i = 0; i < known.size(); ++i)
- url.insert(url.end(), known[i].begin(), known[i].end());
- reconnectNext = 0;
- QPID_LOG(debug, "Known hosts for peer of inter-broker link: " << url);
+ // Get default URL from known-hosts if not already set
+ if (url.empty()) {
+ const std::vector<Url>& known = connection->getKnownHosts();
+ // Flatten vector of URLs into a single URL listing all addresses.
+ url.clear();
+ for(size_t i = 0; i < known.size(); ++i)
+ url.insert(url.end(), known[i].begin(), known[i].end());
+ reconnectNext = 0;
+ QPID_LOG(debug, "Known hosts for peer of inter-broker link: " << url);
+ }
}
void Link::closed(int, std::string text)
@@ -176,7 +200,7 @@ void Link::closed(int, std::string text)
if (state == STATE_OPERATIONAL) {
stringstream addr;
addr << host << ":" << port;
- QPID_LOG (warning, "Inter-broker link disconnected from " << addr.str());
+ QPID_LOG(warning, "Inter-broker link disconnected from " << addr.str());
if (!hideManagement() && agent)
agent->raiseEvent(_qmf::EventBrokerLinkDown(addr.str()));
}
@@ -333,7 +357,7 @@ void Link::maintenanceVisit ()
connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
}
-void Link::reconnect(const qpid::Address& a)
+void Link::reconnect(const Address& a)
{
Mutex::ScopedLock mutex(lock);
host = a.host;
@@ -347,13 +371,14 @@ void Link::reconnect(const qpid::Address& a)
}
}
-bool Link::tryFailoverLH() { // FIXME aconway 2012-01-30: lock held?
+bool Link::tryFailoverLH() {
if (reconnectNext >= url.size()) reconnectNext = 0;
if (url.empty()) return false;
Address next = url[reconnectNext++];
if (next.host != host || next.port != port || next.protocol != transport) {
links->changeAddress(Address(transport, host, port), next);
- QPID_LOG(debug, "Link failing over to " << host << ":" << port);
+ QPID_LOG(debug, "Inter-broker link failing over to " << next.host << ":" << next.port);
+ reconnect(next);
return true;
}
return false;
@@ -510,3 +535,5 @@ void Link::setPassive(bool passive)
}
}
}
+
+}} // namespace qpid::broker