summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Link.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Link.cpp')
-rw-r--r--cpp/src/qpid/broker/Link.cpp192
1 files changed, 124 insertions, 68 deletions
diff --git a/cpp/src/qpid/broker/Link.cpp b/cpp/src/qpid/broker/Link.cpp
index 0bc7d8f47b..4af1e6d6bd 100644
--- a/cpp/src/qpid/broker/Link.cpp
+++ b/cpp/src/qpid/broker/Link.cpp
@@ -23,6 +23,7 @@
#include "qpid/broker/LinkRegistry.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/Connection.h"
+#include "qpid/sys/Timer.h"
#include "qmf/org/apache/qpid/broker/EventBrokerLinkUp.h"
#include "qmf/org/apache/qpid/broker/EventBrokerLinkDown.h"
#include "boost/bind.hpp"
@@ -31,29 +32,48 @@
#include "qpid/framing/reply_exceptions.h"
#include "qpid/broker/AclModule.h"
-using namespace qpid::broker;
-using qpid::framing::Buffer;
-using qpid::framing::FieldTable;
-using qpid::framing::UnauthorizedAccessException;
-using qpid::framing::connection::CLOSE_CODE_CONNECTION_FORCED;
-using qpid::management::ManagementAgent;
-using qpid::management::ManagementObject;
-using qpid::management::Manageable;
-using qpid::management::Args;
-using qpid::sys::Mutex;
+namespace qpid {
+namespace broker {
+
+using framing::Buffer;
+using framing::FieldTable;
+using framing::UnauthorizedAccessException;
+using framing::connection::CLOSE_CODE_CONNECTION_FORCED;
+using management::ManagementAgent;
+using management::ManagementObject;
+using management::Manageable;
+using management::Args;
+using sys::Mutex;
using std::stringstream;
using std::string;
-namespace _qmf = qmf::org::apache::qpid::broker;
+namespace _qmf = ::qmf::org::apache::qpid::broker;
+
+struct LinkTimerTask : public sys::TimerTask {
+ LinkTimerTask(Link& l, sys::Timer& t)
+ : TimerTask(int64_t(l.getBroker()->getOptions().linkMaintenanceInterval*
+ sys::TIME_SEC),
+ "Link retry timer"),
+ link(l), timer(t) {}
+
+ void fire() {
+ link.maintenanceVisit();
+ setupNextFire();
+ timer.add(this);
+ }
+
+ Link& link;
+ sys::Timer& timer;
+};
Link::Link(LinkRegistry* _links,
MessageStore* _store,
- string& _host,
+ const string& _host,
uint16_t _port,
- string& _transport,
+ const string& _transport,
bool _durable,
- string& _authMechanism,
- string& _username,
- string& _password,
+ const string& _authMechanism,
+ const string& _username,
+ const string& _password,
Broker* _broker,
Manageable* parent)
: links(_links), store(_store), host(_host), port(_port),
@@ -64,10 +84,11 @@ Link::Link(LinkRegistry* _links,
visitCount(0),
currentInterval(1),
closing(false),
- updateUrls(false),
+ reconnectNext(0), // Index of next address for reconnecting in url.
channelCounter(1),
connection(0),
- agent(0)
+ agent(0),
+ timerTask(new LinkTimerTask(*this, broker->getTimer()))
{
if (parent != 0 && broker != 0)
{
@@ -78,7 +99,13 @@ Link::Link(LinkRegistry* _links,
agent->addObject(mgmtObject, 0, durable);
}
}
- setStateLH(STATE_WAITING);
+ if (links->isPassive()) {
+ setStateLH(STATE_PASSIVE);
+ } else {
+ setStateLH(STATE_WAITING);
+ startConnectionLH();
+ }
+ broker->getTimer().add(timerTask);
}
Link::~Link ()
@@ -113,6 +140,7 @@ void Link::setStateLH (int newState)
void Link::startConnectionLH ()
{
+ assert(state == STATE_WAITING);
try {
// Set the state before calling connect. It is possible that connect
// will fail synchronously and call Link::closed before returning.
@@ -120,15 +148,18 @@ void Link::startConnectionLH ()
broker->connect (host, boost::lexical_cast<std::string>(port), transport,
boost::bind (&Link::closed, this, _1, _2));
QPID_LOG (debug, "Inter-broker link connecting to " << host << ":" << port);
- } catch(std::exception& e) {
+ } catch(const std::exception& e) {
+ QPID_LOG(error, "Link connection to " << host << ":" << port << " failed: "
+ << e.what());
setStateLH(STATE_WAITING);
if (!hideManagement())
mgmtObject->set_lastError (e.what());
}
}
-void Link::established ()
+void Link::established(Connection* c)
{
+ if (state == STATE_PASSIVE) return;
stringstream addr;
addr << host << ":" << port;
QPID_LOG (info, "Inter-broker link established to " << addr.str());
@@ -136,17 +167,40 @@ void Link::established ()
if (!hideManagement() && agent)
agent->raiseEvent(_qmf::EventBrokerLinkUp(addr.str()));
- {
- Mutex::ScopedLock mutex(lock);
- setStateLH(STATE_OPERATIONAL);
- currentInterval = 1;
- visitCount = 0;
- if (closing)
- destroy();
+ Mutex::ScopedLock mutex(lock);
+ setStateLH(STATE_OPERATIONAL);
+ currentInterval = 1;
+ visitCount = 0;
+ connection = c;
+ if (closing)
+ destroy();
+ else // Process any IO tasks bridges added before established.
+ connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
+}
+
+
+void Link::setUrl(const Url& u) {
+ Mutex::ScopedLock mutex(lock);
+ url = u;
+ reconnectNext = 0;
+}
+
+void Link::opened() {
+ Mutex::ScopedLock mutex(lock);
+ if (!connection) return;
+ // Get default URL from known-hosts if not already set
+ if (url.empty()) {
+ const std::vector<Url>& known = connection->getKnownHosts();
+ // Flatten vector of URLs into a single URL listing all addresses.
+ url.clear();
+ for(size_t i = 0; i < known.size(); ++i)
+ url.insert(url.end(), known[i].begin(), known[i].end());
+ reconnectNext = 0;
+ QPID_LOG(debug, "Known hosts for peer of inter-broker link: " << url);
}
}
-void Link::closed (int, std::string text)
+void Link::closed(int, std::string text)
{
Mutex::ScopedLock mutex(lock);
QPID_LOG (info, "Inter-broker link disconnected from " << host << ":" << port << " " << text);
@@ -156,7 +210,7 @@ void Link::closed (int, std::string text)
if (state == STATE_OPERATIONAL) {
stringstream addr;
addr << host << ":" << port;
- QPID_LOG (warning, "Inter-broker link disconnected from " << addr.str());
+ QPID_LOG(warning, "Inter-broker link disconnected from " << addr.str());
if (!hideManagement() && agent)
agent->raiseEvent(_qmf::EventBrokerLinkDown(addr.str()));
}
@@ -167,7 +221,7 @@ void Link::closed (int, std::string text)
}
active.clear();
- if (state != STATE_FAILED)
+ if (state != STATE_FAILED && state != STATE_PASSIVE)
{
setStateLH(STATE_WAITING);
if (!hideManagement())
@@ -178,6 +232,7 @@ void Link::closed (int, std::string text)
destroy();
}
+// Called in connection IO thread.
void Link::destroy ()
{
Bridges toDelete;
@@ -187,7 +242,7 @@ void Link::destroy ()
QPID_LOG (info, "Inter-broker link to " << host << ":" << port << " removed by management");
if (connection)
connection->close(CLOSE_CODE_CONNECTION_FORCED, "closed by management");
-
+ connection = 0;
setStateLH(STATE_CLOSED);
// Move the bridges to be deleted into a local vector so there is no
@@ -201,6 +256,8 @@ void Link::destroy ()
for (Bridges::iterator i = created.begin(); i != created.end(); i++)
toDelete.push_back(*i);
created.clear();
+
+ timerTask->cancel();
}
// Now delete all bridges on this link (don't hold the lock for this).
for (Bridges::iterator i = toDelete.begin(); i != toDelete.end(); i++)
@@ -213,10 +270,14 @@ void Link::add(Bridge::shared_ptr bridge)
{
Mutex::ScopedLock mutex(lock);
created.push_back (bridge);
+ if (connection)
+ connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
+
}
void Link::cancel(Bridge::shared_ptr bridge)
{
+ bool needIOProcessing = false;
{
Mutex::ScopedLock mutex(lock);
@@ -234,10 +295,10 @@ void Link::cancel(Bridge::shared_ptr bridge)
break;
}
}
+ needIOProcessing = !cancellations.empty();
}
- if (!cancellations.empty()) {
+ if (needIOProcessing && connection)
connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
- }
}
void Link::ioThreadProcessing()
@@ -246,7 +307,6 @@ void Link::ioThreadProcessing()
if (state != STATE_OPERATIONAL)
return;
- QPID_LOG(debug, "Link::ioThreadProcessing()");
// check for bridge session errors and recover
if (!active.empty()) {
@@ -279,23 +339,10 @@ void Link::ioThreadProcessing()
}
}
-void Link::setConnection(Connection* c)
-{
- Mutex::ScopedLock mutex(lock);
- connection = c;
- updateUrls = true;
-}
-
void Link::maintenanceVisit ()
{
Mutex::ScopedLock mutex(lock);
- if (connection && updateUrls) {
- urls.reset(connection->getKnownHosts());
- QPID_LOG(debug, "Known hosts for peer of inter-broker link: " << urls);
- updateUrls = false;
- }
-
if (state == STATE_WAITING)
{
visitCount++;
@@ -303,7 +350,7 @@ void Link::maintenanceVisit ()
{
visitCount = 0;
//switch host and port to next in url list if possible
- if (!tryFailover()) {
+ if (!tryFailoverLH()) {
currentInterval *= 2;
if (currentInterval > MAX_INTERVAL)
currentInterval = MAX_INTERVAL;
@@ -313,11 +360,10 @@ void Link::maintenanceVisit ()
}
else if (state == STATE_OPERATIONAL && (!active.empty() || !created.empty() || !cancellations.empty()) && connection != 0)
connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
-}
+ }
-void Link::reconnect(const qpid::Address& a)
+void Link::reconnectLH(const Address& a)
{
- Mutex::ScopedLock mutex(lock);
host = a.host;
port = a.port;
transport = a.protocol;
@@ -329,17 +375,18 @@ void Link::reconnect(const qpid::Address& a)
}
}
-bool Link::tryFailover()
-{
- Address next;
- if (urls.next(next) &&
- (next.host != host || next.port != port || next.protocol != transport)) {
+bool Link::tryFailoverLH() {
+ assert(state == STATE_WAITING);
+ if (reconnectNext >= url.size()) reconnectNext = 0;
+ if (url.empty()) return false;
+ Address next = url[reconnectNext++];
+ if (next.host != host || next.port != port || next.protocol != transport) {
links->changeAddress(Address(transport, host, port), next);
- QPID_LOG(debug, "Link failing over to " << host << ":" << port);
+ QPID_LOG(debug, "Inter-broker link failing over to " << next.host << ":" << next.port);
+ reconnectLH(next);
return true;
- } else {
- return false;
}
+ return false;
}
// Management updates for a linke are inconsistent in a cluster, so they are
@@ -423,18 +470,24 @@ ManagementObject* Link::GetManagementObject (void) const
return (ManagementObject*) mgmtObject;
}
+void Link::close() {
+ Mutex::ScopedLock mutex(lock);
+ if (!closing) {
+ closing = true;
+ if (state != STATE_CONNECTING && connection) {
+ //connection can only be closed on the connections own IO processing thread
+ connection->requestIOProcessing(boost::bind(&Link::destroy, this));
+ }
+ }
+}
+
+
Manageable::status_t Link::ManagementMethod (uint32_t op, Args& args, string& text)
{
switch (op)
{
case _qmf::Link::METHOD_CLOSE :
- if (!closing) {
- closing = true;
- if (state != STATE_CONNECTING && connection) {
- //connection can only be closed on the connections own IO processing thread
- connection->requestIOProcessing(boost::bind(&Link::destroy, this));
- }
- }
+ close();
return Manageable::STATUS_OK;
case _qmf::Link::METHOD_BRIDGE :
@@ -483,7 +536,10 @@ void Link::setPassive(bool passive)
if (state == STATE_PASSIVE) {
setStateLH(STATE_WAITING);
} else {
- QPID_LOG(warning, "Ignoring attempt to activate non-passive link");
+ QPID_LOG(warning, "Ignoring attempt to activate non-passive link "
+ << host << ":" << port);
}
}
}
+
+}} // namespace qpid::broker