summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Connection.cpp
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2012-08-03 12:13:32 +0000
committerKim van der Riet <kpvdr@apache.org>2012-08-03 12:13:32 +0000
commitd43d1912b376322e27fdcda551a73f9ff5487972 (patch)
treece493e10baa95f44be8beb5778ce51783463196d /cpp/src/qpid/broker/Connection.cpp
parent04877fec0c6346edec67072d7f2d247740cf2af5 (diff)
downloadqpid-python-d43d1912b376322e27fdcda551a73f9ff5487972.tar.gz
QPID-3858: Updated branch - merged from trunk r.1368650
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1368910 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/Connection.cpp')
-rw-r--r--cpp/src/qpid/broker/Connection.cpp68
1 files changed, 61 insertions, 7 deletions
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp
index 5e339cec03..8d250a32e5 100644
--- a/cpp/src/qpid/broker/Connection.cpp
+++ b/cpp/src/qpid/broker/Connection.cpp
@@ -43,7 +43,7 @@
#include <iostream>
#include <assert.h>
-
+using std::string;
using namespace qpid::sys;
using namespace qpid::framing;
@@ -87,10 +87,14 @@ Connection::Connection(ConnectionOutputHandler* out_,
bool link_,
uint64_t objectId_,
bool shadow_,
- bool delayManagement) :
+ bool delayManagement,
+ bool authenticated_
+) :
ConnectionState(out_, broker_),
securitySettings(external),
- adapter(*this, link_, shadow_),
+ shadow(shadow_),
+ authenticated(authenticated_),
+ adapter(*this, link_),
link(link_),
mgmtClosing(false),
mgmtId(mgmtId_),
@@ -100,14 +104,12 @@ Connection::Connection(ConnectionOutputHandler* out_,
timer(broker_.getTimer()),
errorListener(0),
objectId(objectId_),
- shadow(shadow_),
outboundTracker(*this)
{
outboundTracker.wrap(out);
broker.getConnectionObservers().connection(*this);
// In a cluster, allow adding the management object to be delayed.
if (!delayManagement) addManagementObject();
- if (!isShadow()) broker.getConnectionCounter().inc_connectionCount();
}
void Connection::addManagementObject() {
@@ -141,6 +143,8 @@ Connection::~Connection()
// a cluster-unsafe context. Don't raise an event in that case.
if (!link && isClusterSafe())
agent->raiseEvent(_qmf::EventClientDisconnect(mgmtId, ConnectionState::getUserId()));
+ QPID_LOG_CAT(debug, model, "Delete connection. user:" << ConnectionState::getUserId()
+ << " rhost:" << mgmtId );
}
broker.getConnectionObservers().closed(*this);
@@ -148,8 +152,9 @@ Connection::~Connection()
heartbeatTimer->cancel();
if (timeoutTimer)
timeoutTimer->cancel();
-
- if (!isShadow()) broker.getConnectionCounter().dec_connectionCount();
+ if (linkHeartbeatTimer) {
+ linkHeartbeatTimer->cancel();
+ }
}
void Connection::received(framing::AMQFrame& frame) {
@@ -284,6 +289,10 @@ void Connection::raiseConnectEvent() {
mgmtObject->set_authIdentity(userId);
agent->raiseEvent(_qmf::EventClientConnect(mgmtId, userId));
}
+
+ QPID_LOG_CAT(debug, model, "Create connection. user:" << userId
+ << " rhost:" << mgmtId );
+
}
void Connection::setUserProxyAuth(bool b)
@@ -300,6 +309,9 @@ void Connection::close(connection::CloseCode code, const string& text)
heartbeatTimer->cancel();
if (timeoutTimer)
timeoutTimer->cancel();
+ if (linkHeartbeatTimer) {
+ linkHeartbeatTimer->cancel();
+ }
adapter.close(code, text);
//make sure we delete dangling pointers from outputTasks before deleting sessions
outputTasks.removeAll();
@@ -313,6 +325,9 @@ void Connection::sendClose() {
heartbeatTimer->cancel();
if (timeoutTimer)
timeoutTimer->cancel();
+ if (linkHeartbeatTimer) {
+ linkHeartbeatTimer->cancel();
+ }
adapter.close(connection::CLOSE_CODE_NORMAL, "OK");
getOutput().close();
}
@@ -326,6 +341,9 @@ void Connection::closed(){ // Physically closed, suspend open sessions.
heartbeatTimer->cancel();
if (timeoutTimer)
timeoutTimer->cancel();
+ if (linkHeartbeatTimer) {
+ linkHeartbeatTimer->cancel();
+ }
try {
while (!channels.empty())
ptr_map_ptr(channels.begin())->handleDetach();
@@ -435,6 +453,31 @@ struct ConnectionHeartbeatTask : public sys::TimerTask {
}
};
+class LinkHeartbeatTask : public qpid::sys::TimerTask {
+ sys::Timer& timer;
+ Connection& connection;
+ bool heartbeatSeen;
+
+ void fire() {
+ if (!heartbeatSeen) {
+ QPID_LOG(error, "Federation link connection " << connection.getMgmtId() << " missed 2 heartbeats - closing connection");
+ connection.abort();
+ } else {
+ heartbeatSeen = false;
+ // Setup next firing
+ setupNextFire();
+ timer.add(this);
+ }
+ }
+
+public:
+ LinkHeartbeatTask(sys::Timer& t, qpid::sys::Duration period, Connection& c) :
+ TimerTask(period, "LinkHeartbeatTask"), timer(t), connection(c), heartbeatSeen(false) {}
+
+ void heartbeatReceived() { heartbeatSeen = true; }
+};
+
+
void Connection::abort()
{
// Make sure that we don't try to send a heartbeat as we're
@@ -460,10 +503,21 @@ void Connection::setHeartbeatInterval(uint16_t heartbeat)
}
}
+void Connection::startLinkHeartbeatTimeoutTask() {
+ if (!linkHeartbeatTimer && heartbeat > 0) {
+ linkHeartbeatTimer = new LinkHeartbeatTask(timer, 2 * heartbeat * TIME_SEC, *this);
+ timer.add(linkHeartbeatTimer);
+ }
+}
+
void Connection::restartTimeout()
{
if (timeoutTimer)
timeoutTimer->touch();
+
+ if (linkHeartbeatTimer) {
+ static_cast<LinkHeartbeatTask*>(linkHeartbeatTimer.get())->heartbeatReceived();
+ }
}
bool Connection::isOpen() { return adapter.isOpen(); }