diff options
author | Kim van der Riet <kpvdr@apache.org> | 2012-08-03 12:13:32 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2012-08-03 12:13:32 +0000 |
commit | d43d1912b376322e27fdcda551a73f9ff5487972 (patch) | |
tree | ce493e10baa95f44be8beb5778ce51783463196d /cpp/src/qpid/broker/Connection.cpp | |
parent | 04877fec0c6346edec67072d7f2d247740cf2af5 (diff) | |
download | qpid-python-d43d1912b376322e27fdcda551a73f9ff5487972.tar.gz |
QPID-3858: Updated branch - merged from trunk r.1368650
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1368910 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/Connection.cpp')
-rw-r--r-- | cpp/src/qpid/broker/Connection.cpp | 68 |
1 files changed, 61 insertions, 7 deletions
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index 5e339cec03..8d250a32e5 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -43,7 +43,7 @@ #include <iostream> #include <assert.h> - +using std::string; using namespace qpid::sys; using namespace qpid::framing; @@ -87,10 +87,14 @@ Connection::Connection(ConnectionOutputHandler* out_, bool link_, uint64_t objectId_, bool shadow_, - bool delayManagement) : + bool delayManagement, + bool authenticated_ +) : ConnectionState(out_, broker_), securitySettings(external), - adapter(*this, link_, shadow_), + shadow(shadow_), + authenticated(authenticated_), + adapter(*this, link_), link(link_), mgmtClosing(false), mgmtId(mgmtId_), @@ -100,14 +104,12 @@ Connection::Connection(ConnectionOutputHandler* out_, timer(broker_.getTimer()), errorListener(0), objectId(objectId_), - shadow(shadow_), outboundTracker(*this) { outboundTracker.wrap(out); broker.getConnectionObservers().connection(*this); // In a cluster, allow adding the management object to be delayed. if (!delayManagement) addManagementObject(); - if (!isShadow()) broker.getConnectionCounter().inc_connectionCount(); } void Connection::addManagementObject() { @@ -141,6 +143,8 @@ Connection::~Connection() // a cluster-unsafe context. Don't raise an event in that case. if (!link && isClusterSafe()) agent->raiseEvent(_qmf::EventClientDisconnect(mgmtId, ConnectionState::getUserId())); + QPID_LOG_CAT(debug, model, "Delete connection. user:" << ConnectionState::getUserId() + << " rhost:" << mgmtId ); } broker.getConnectionObservers().closed(*this); @@ -148,8 +152,9 @@ Connection::~Connection() heartbeatTimer->cancel(); if (timeoutTimer) timeoutTimer->cancel(); - - if (!isShadow()) broker.getConnectionCounter().dec_connectionCount(); + if (linkHeartbeatTimer) { + linkHeartbeatTimer->cancel(); + } } void Connection::received(framing::AMQFrame& frame) { @@ -284,6 +289,10 @@ void Connection::raiseConnectEvent() { mgmtObject->set_authIdentity(userId); agent->raiseEvent(_qmf::EventClientConnect(mgmtId, userId)); } + + QPID_LOG_CAT(debug, model, "Create connection. user:" << userId + << " rhost:" << mgmtId ); + } void Connection::setUserProxyAuth(bool b) @@ -300,6 +309,9 @@ void Connection::close(connection::CloseCode code, const string& text) heartbeatTimer->cancel(); if (timeoutTimer) timeoutTimer->cancel(); + if (linkHeartbeatTimer) { + linkHeartbeatTimer->cancel(); + } adapter.close(code, text); //make sure we delete dangling pointers from outputTasks before deleting sessions outputTasks.removeAll(); @@ -313,6 +325,9 @@ void Connection::sendClose() { heartbeatTimer->cancel(); if (timeoutTimer) timeoutTimer->cancel(); + if (linkHeartbeatTimer) { + linkHeartbeatTimer->cancel(); + } adapter.close(connection::CLOSE_CODE_NORMAL, "OK"); getOutput().close(); } @@ -326,6 +341,9 @@ void Connection::closed(){ // Physically closed, suspend open sessions. heartbeatTimer->cancel(); if (timeoutTimer) timeoutTimer->cancel(); + if (linkHeartbeatTimer) { + linkHeartbeatTimer->cancel(); + } try { while (!channels.empty()) ptr_map_ptr(channels.begin())->handleDetach(); @@ -435,6 +453,31 @@ struct ConnectionHeartbeatTask : public sys::TimerTask { } }; +class LinkHeartbeatTask : public qpid::sys::TimerTask { + sys::Timer& timer; + Connection& connection; + bool heartbeatSeen; + + void fire() { + if (!heartbeatSeen) { + QPID_LOG(error, "Federation link connection " << connection.getMgmtId() << " missed 2 heartbeats - closing connection"); + connection.abort(); + } else { + heartbeatSeen = false; + // Setup next firing + setupNextFire(); + timer.add(this); + } + } + +public: + LinkHeartbeatTask(sys::Timer& t, qpid::sys::Duration period, Connection& c) : + TimerTask(period, "LinkHeartbeatTask"), timer(t), connection(c), heartbeatSeen(false) {} + + void heartbeatReceived() { heartbeatSeen = true; } +}; + + void Connection::abort() { // Make sure that we don't try to send a heartbeat as we're @@ -460,10 +503,21 @@ void Connection::setHeartbeatInterval(uint16_t heartbeat) } } +void Connection::startLinkHeartbeatTimeoutTask() { + if (!linkHeartbeatTimer && heartbeat > 0) { + linkHeartbeatTimer = new LinkHeartbeatTask(timer, 2 * heartbeat * TIME_SEC, *this); + timer.add(linkHeartbeatTimer); + } +} + void Connection::restartTimeout() { if (timeoutTimer) timeoutTimer->touch(); + + if (linkHeartbeatTimer) { + static_cast<LinkHeartbeatTask*>(linkHeartbeatTimer.get())->heartbeatReceived(); + } } bool Connection::isOpen() { return adapter.isOpen(); } |