summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndrew Stitcher <astitcher@apache.org>2009-06-08 14:35:01 +0000
committerAndrew Stitcher <astitcher@apache.org>2009-06-08 14:35:01 +0000
commit58ee0f096ad8d487984fa19ccbfe6b274e989ade (patch)
tree34a089afc4335ec76d398fc187bda5d3f4395553
parent322353742b90b8f133c6c8d0a654c714071bf77f (diff)
downloadqpid-python-58ee0f096ad8d487984fa19ccbfe6b274e989ade.tar.gz
- Added heartbeat generation to the client (actually echo back any
broker generated heartbeat) - Broker now disconnects client if it receives no traffic in 2 heartbeat intervals (which is now the same as the client behvaiour) git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@782651 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/Connection.cpp68
-rw-r--r--qpid/cpp/src/qpid/broker/Connection.h11
-rw-r--r--qpid/cpp/src/qpid/broker/ConnectionHandler.cpp17
-rw-r--r--qpid/cpp/src/qpid/client/ConnectionHandler.cpp20
4 files changed, 82 insertions, 34 deletions
diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp
index 22188054a6..c53d943e98 100644
--- a/qpid/cpp/src/qpid/broker/Connection.cpp
+++ b/qpid/cpp/src/qpid/broker/Connection.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -68,8 +68,8 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std
if (parent != 0)
{
agent = broker_.getManagementAgent();
-
-
+
+
// TODO set last bool true if system connection
if (agent != 0) {
mgmtObject = new _qmf::Connection(agent, this, parent, mgmtId, !isLink, false);
@@ -95,9 +95,11 @@ Connection::~Connection()
}
if (isLink)
links.notifyClosed(mgmtId);
-
+
if (heartbeatTimer)
heartbeatTimer->cancel();
+ if (timeoutTimer)
+ timeoutTimer->cancel();
}
void Connection::received(framing::AMQFrame& frame) {
@@ -181,7 +183,9 @@ void Connection::close(connection::CloseCode code, const string& text)
{
QPID_LOG_IF(error, code != connection::CLOSE_CODE_NORMAL, "Connection " << mgmtId << " closed by error: " << text << "(" << code << ")");
if (heartbeatTimer)
- heartbeatTimer->cancel();
+ heartbeatTimer->cancel();
+ if (timeoutTimer)
+ timeoutTimer->cancel();
adapter.close(code, text);
//make sure we delete dangling pointers from outputTasks before deleting sessions
outputTasks.removeAll();
@@ -192,7 +196,9 @@ void Connection::close(connection::CloseCode code, const string& text)
// Send a close to the client but keep the channels. Used by cluster.
void Connection::sendClose() {
if (heartbeatTimer)
- heartbeatTimer->cancel();
+ heartbeatTimer->cancel();
+ if (timeoutTimer)
+ timeoutTimer->cancel();
adapter.close(connection::CLOSE_CODE_NORMAL, "OK");
getOutput().close();
}
@@ -203,7 +209,7 @@ void Connection::idleIn(){}
void Connection::closed(){ // Physically closed, suspend open sessions.
try {
- while (!channels.empty())
+ while (!channels.empty())
ptr_map_ptr(channels.begin())->handleDetach();
while (!exclusiveQueues.empty()) {
Queue::shared_ptr q(exclusiveQueues.front());
@@ -221,7 +227,7 @@ void Connection::closed(){ // Physically closed, suspend open sessions.
bool Connection::hasOutput() { return outputTasks.hasOutput(); }
-bool Connection::doOutput() {
+bool Connection::doOutput() {
try{
{
ScopedLock<Mutex> l(ioCallbackLock);
@@ -292,33 +298,65 @@ void Connection::setSecureConnection(SecureConnection* s)
struct ConnectionHeartbeatTask : public TimerTask {
Timer& timer;
Connection& connection;
- ConnectionHeartbeatTask(uint16_t hb, Timer& t, Connection& c) :
+ ConnectionHeartbeatTask(uint16_t hb, Timer& t, Connection& c) :
TimerTask(Duration(hb*TIME_SEC)),
timer(t),
connection(c)
{}
-
+
void fire() {
// This is the best we can currently do to avoid a destruction/fire race
if (!isCancelled()) {
// Setup next firing
reset();
timer.add(this);
-
+
// Send Heartbeat
connection.sendHeartbeat();
}
}
};
+struct ConnectionTimeoutTask : public TimerTask {
+ Timer& timer;
+ Connection& connection;
+ ConnectionTimeoutTask(uint16_t hb, Timer& t, Connection& c) :
+ TimerTask(Duration(hb*2*TIME_SEC)),
+ timer(t),
+ connection(c)
+ {}
+
+ void fire() {
+ // This is the best we can currently do to avoid a destruction/fire race
+ if (!isCancelled()) {
+ // If we get here then we've not received any traffic in the timeout period
+ // Schedule closing the connection for the io thread
+ QPID_LOG(error, "Connection timed out: closing");
+ connection.abort();
+ }
+ }
+};
+
+void Connection::abort()
+{
+ out.abort();
+}
+
void Connection::setHeartbeatInterval(uint16_t heartbeat)
{
setHeartbeat(heartbeat);
if (heartbeat > 0) {
- heartbeatTimer = new ConnectionHeartbeatTask(heartbeat, timer, *this);
- timer.add(heartbeatTimer);
+ heartbeatTimer = new ConnectionHeartbeatTask(heartbeat, timer, *this);
+ timer.add(heartbeatTimer);
+ timeoutTimer = new ConnectionTimeoutTask(heartbeat, timer, *this);
+ timer.add(timeoutTimer);
}
}
-}}
+void Connection::restartTimeout()
+{
+ if (timeoutTimer)
+ timeoutTimer->reset();
+}
+}}
diff --git a/qpid/cpp/src/qpid/broker/Connection.h b/qpid/cpp/src/qpid/broker/Connection.h
index 770bf2184f..df2c36c92e 100644
--- a/qpid/cpp/src/qpid/broker/Connection.h
+++ b/qpid/cpp/src/qpid/broker/Connection.h
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -61,7 +61,7 @@ namespace broker {
class LinkRegistry;
class SecureConnection;
-class Connection : public sys::ConnectionInputHandler,
+class Connection : public sys::ConnectionInputHandler,
public ConnectionState,
public RefCounted
{
@@ -115,9 +115,11 @@ class Connection : public sys::ConnectionInputHandler,
/** Connection does not delete the listener. 0 resets. */
void setErrorListener(ErrorListener* l) { errorListener=l; }
ErrorListener* getErrorListener() { return errorListener; }
-
+
void setHeartbeatInterval(uint16_t heartbeat);
void sendHeartbeat();
+ void restartTimeout();
+ void abort();
template <class F> void eachSessionHandler(F f) {
for (ChannelMap::iterator i = channels.begin(); i != channels.end(); ++i)
@@ -143,6 +145,7 @@ class Connection : public sys::ConnectionInputHandler,
management::ManagementAgent* agent;
Timer& timer;
boost::intrusive_ptr<TimerTask> heartbeatTimer;
+ boost::intrusive_ptr<TimerTask> timeoutTimer;
ErrorListener* errorListener;
public:
diff --git a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
index 8b70836da0..d3e795ae06 100644
--- a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
+++ b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
@@ -8,9 +8,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -63,6 +63,9 @@ void ConnectionHandler::heartbeat()
void ConnectionHandler::handle(framing::AMQFrame& frame)
{
+ // Received frame on connection so delay timeout
+ handler->connection.restartTimeout();
+
AMQMethodBody* method=frame.getBody()->getMethod();
Connection::ErrorListener* errorListener = handler->connection.getErrorListener();
try{
@@ -186,7 +189,7 @@ void ConnectionHandler::Handler::open(const string& /*virtualHost*/,
{
std::vector<Url> urls = connection.broker.getKnownBrokers();
framing::Array array(0x95); // str16 array
- for (std::vector<Url>::iterator i = urls.begin(); i < urls.end(); ++i)
+ for (std::vector<Url>::iterator i = urls.begin(); i < urls.end(); ++i)
array.add(boost::shared_ptr<Str16Value>(new Str16Value(i->str())));
proxy.openOk(array);
@@ -197,7 +200,7 @@ void ConnectionHandler::Handler::open(const string& /*virtualHost*/,
}
}
-
+
void ConnectionHandler::Handler::close(uint16_t replyCode, const string& replyText)
{
if (replyCode != 200) {
@@ -209,11 +212,11 @@ void ConnectionHandler::Handler::close(uint16_t replyCode, const string& replyTe
proxy.closeOk();
connection.getOutput().close();
-}
-
+}
+
void ConnectionHandler::Handler::closeOk(){
connection.getOutput().close();
-}
+}
void ConnectionHandler::Handler::heartbeat(){
// Do nothing - the purpose of heartbeats is just to make sure that there is some
diff --git a/qpid/cpp/src/qpid/client/ConnectionHandler.cpp b/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
index 6efdb91e96..db113cdf80 100644
--- a/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
+++ b/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -70,8 +70,8 @@ CloseCode ConnectionHandler::convert(uint16_t replyCode)
}
}
-ConnectionHandler::ConnectionHandler(const ConnectionSettings& s, ProtocolVersion& v)
- : StateManager(NOT_STARTED), ConnectionSettings(s), outHandler(*this), proxy(outHandler),
+ConnectionHandler::ConnectionHandler(const ConnectionSettings& s, ProtocolVersion& v)
+ : StateManager(NOT_STARTED), ConnectionSettings(s), outHandler(*this), proxy(outHandler),
errorCode(CLOSE_CODE_NORMAL), version(v)
{
insist = true;
@@ -82,7 +82,7 @@ ConnectionHandler::ConnectionHandler(const ConnectionSettings& s, ProtocolVersio
FINISHED.insert(FAILED);
FINISHED.insert(CLOSED);
-
+
properties.setInt(SESSION_FLOW_CONTROL, SESSION_FLOW_CONTROL_VER);
properties.setString(CLIENT_PROCESS_NAME, sys::SystemInfo::getProcessName());
properties.setInt(CLIENT_PID, sys::SystemInfo::getProcessId());
@@ -125,7 +125,7 @@ void ConnectionHandler::incoming(AMQFrame& frame)
void ConnectionHandler::outgoing(AMQFrame& frame)
{
- if (getState() == OPEN)
+ if (getState() == OPEN)
out(frame);
else
throw TransportFailure(errorText.empty() ? "Connection is not open." : errorText);
@@ -160,6 +160,10 @@ void ConnectionHandler::heartbeat()
// Do nothing - the purpose of heartbeats is just to make sure that there is some
// traffic on the connection within the heart beat interval, we check for the
// traffic and don't need to do anything in response to heartbeats
+
+ // Although the above is still true we're now using a received heartbeat as a trigger
+ // to send out our own heartbeat
+ proxy.heartbeat();
}
void ConnectionHandler::checkState(STATES s, const std::string& msg)
@@ -223,13 +227,13 @@ void ConnectionHandler::secure(const std::string& challenge)
}
}
-void ConnectionHandler::tune(uint16_t maxChannelsProposed, uint16_t maxFrameSizeProposed,
+void ConnectionHandler::tune(uint16_t maxChannelsProposed, uint16_t maxFrameSizeProposed,
uint16_t heartbeatMin, uint16_t heartbeatMax)
{
checkState(NEGOTIATING, INVALID_STATE_TUNE);
maxChannels = std::min(maxChannels, maxChannelsProposed);
maxFrameSize = std::min(maxFrameSize, maxFrameSizeProposed);
- // Clip the requested heartbeat to the maximum/minimum offered
+ // Clip the requested heartbeat to the maximum/minimum offered
uint16_t heartbeat = ConnectionSettings::heartbeat;
heartbeat = heartbeat < heartbeatMin ? heartbeatMin :
heartbeat > heartbeatMax ? heartbeatMax :