summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndrew Stitcher <astitcher@apache.org>2009-01-13 19:08:29 +0000
committerAndrew Stitcher <astitcher@apache.org>2009-01-13 19:08:29 +0000
commite4d0cd740117f0a65b90f3a7d89264e0c82c3e76 (patch)
tree80547638f7d8eea1d214752c274edb24f797cab6
parentedd6337731e417cc13f9d698bcda1d5911fcb782 (diff)
downloadqpid-python-e4d0cd740117f0a65b90f3a7d89264e0c82c3e76.tar.gz
Send heartbeat from broker to client
- Server sends possible heartbeat range and client replies with desired heartbeat as part of the tune-tuneOk exchange git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@734220 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/Connection.cpp46
-rw-r--r--qpid/cpp/src/qpid/broker/Connection.h7
-rw-r--r--qpid/cpp/src/qpid/broker/ConnectionHandler.cpp12
-rw-r--r--qpid/cpp/src/qpid/broker/ConnectionHandler.h3
-rw-r--r--qpid/cpp/src/qpid/broker/ConnectionState.h4
-rw-r--r--qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp4
-rw-r--r--qpid/cpp/src/qpid/client/ConnectionHandler.cpp17
-rw-r--r--qpid/cpp/src/qpid/client/ConnectionHandler.h1
-rw-r--r--qpid/cpp/src/tests/ConnectionOptions.h1
-rw-r--r--qpid/specs/amqp.0-10-qpid-errata.xml2
10 files changed, 88 insertions, 9 deletions
diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp
index eb54ddfd56..66ee6281c6 100644
--- a/qpid/cpp/src/qpid/broker/Connection.cpp
+++ b/qpid/cpp/src/qpid/broker/Connection.cpp
@@ -38,7 +38,6 @@
using namespace qpid::sys;
using namespace qpid::framing;
-using namespace qpid::sys;
using qpid::ptr_map_ptr;
using qpid::management::ManagementAgent;
using qpid::management::ManagementObject;
@@ -57,7 +56,8 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std
mgmtId(mgmtId_),
mgmtObject(0),
links(broker_.getLinks()),
- agent(0)
+ agent(0),
+ timer(broker_.getTimer())
{
Manageable* parent = broker.GetVhostObject();
@@ -92,6 +92,9 @@ Connection::~Connection()
}
if (isLink)
links.notifyClosed(mgmtId);
+
+ if (heartbeatTimer)
+ heartbeatTimer->cancel();
}
void Connection::received(framing::AMQFrame& frame) {
@@ -174,6 +177,8 @@ void Connection::setFederationLink(bool b)
void Connection::close(connection::CloseCode code, const string& text)
{
QPID_LOG_IF(error, code != connection::CLOSE_CODE_NORMAL, "Connection " << mgmtId << " closed by error: " << text << "(" << code << ")");
+ if (heartbeatTimer)
+ heartbeatTimer->cancel();
adapter.close(code, text);
//make sure we delete dangling pointers from outputTasks before deleting sessions
outputTasks.removeAll();
@@ -183,6 +188,8 @@ void Connection::close(connection::CloseCode code, const string& text)
// Send a close to the client but keep the channels. Used by cluster.
void Connection::sendClose() {
+ if (heartbeatTimer)
+ heartbeatTimer->cancel();
adapter.close(connection::CLOSE_CODE_NORMAL, "OK");
getOutput().close();
}
@@ -230,6 +237,10 @@ bool Connection::doOutput() {
return false;
}
+void Connection::sendHeartbeat() {
+ adapter.heartbeat();
+}
+
void Connection::closeChannel(uint16_t id) {
ChannelMap::iterator i = channels.find(id);
if (i != channels.end()) channels.erase(i);
@@ -272,5 +283,36 @@ void Connection::setSecureConnection(SecureConnection* s)
adapter.setSecureConnection(s);
}
+struct ConnectionHeartbeatTask : public TimerTask {
+ Timer& timer;
+ Connection& connection;
+ ConnectionHeartbeatTask(uint16_t hb, Timer& t, Connection& c) :
+ TimerTask(Duration(hb*TIME_SEC)),
+ timer(t),
+ connection(c)
+ {}
+
+ void fire() {
+ // This is the best we can currently do to avoid a destruction/fire race
+ if (!isCancelled()) {
+ // Setup next firing
+ reset();
+ timer.add(this);
+
+ // Send Heartbeat
+ connection.sendHeartbeat();
+ }
+ }
+};
+
+void Connection::setHeartbeatInterval(uint16_t heartbeat)
+{
+ setHeartbeat(heartbeat);
+ if (heartbeat > 0) {
+ heartbeatTimer = new ConnectionHeartbeatTask(heartbeat, timer, *this);
+ timer.add(heartbeatTimer);
+ }
+}
+
}}
diff --git a/qpid/cpp/src/qpid/broker/Connection.h b/qpid/cpp/src/qpid/broker/Connection.h
index acd9f94d9b..5cbff57788 100644
--- a/qpid/cpp/src/qpid/broker/Connection.h
+++ b/qpid/cpp/src/qpid/broker/Connection.h
@@ -99,6 +99,9 @@ class Connection : public sys::ConnectionInputHandler,
const std::string& getMgmtId() const { return mgmtId; }
management::ManagementAgent* getAgent() const { return agent; }
void setFederationLink(bool b);
+
+ void setHeartbeatInterval(uint16_t heartbeat);
+ void sendHeartbeat();
template <class F> void eachSessionHandler(F f) {
for (ChannelMap::iterator i = channels.begin(); i != channels.end(); ++i)
@@ -112,7 +115,7 @@ class Connection : public sys::ConnectionInputHandler,
typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
ChannelMap channels;
- framing::AMQP_ClientProxy::Connection* client;
+ //framing::AMQP_ClientProxy::Connection* client;
ConnectionHandler adapter;
bool isLink;
bool mgmtClosing;
@@ -121,6 +124,8 @@ class Connection : public sys::ConnectionInputHandler,
qmf::org::apache::qpid::broker::Connection* mgmtObject;
LinkRegistry& links;
management::ManagementAgent* agent;
+ Timer& timer;
+ boost::intrusive_ptr<TimerTask> heartbeatTimer;
};
}}
diff --git a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
index 6f99b60cd8..38e667dcba 100644
--- a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
+++ b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
@@ -52,6 +52,11 @@ void ConnectionHandler::close(connection::CloseCode code, const string& text)
handler->client.close(code, text);
}
+void ConnectionHandler::heartbeat()
+{
+ handler->client.heartbeat();
+}
+
void ConnectionHandler::handle(framing::AMQFrame& frame)
{
AMQMethodBody* method=frame.getBody()->getMethod();
@@ -157,7 +162,7 @@ void ConnectionHandler::Handler::tuneOk(uint16_t /*channelmax*/,
uint16_t framemax, uint16_t heartbeat)
{
connection.setFrameMax(framemax);
- connection.setHeartbeat(heartbeat);
+ connection.setHeartbeatInterval(heartbeat);
}
void ConnectionHandler::Handler::open(const string& /*virtualHost*/,
@@ -194,6 +199,11 @@ void ConnectionHandler::Handler::closeOk(){
connection.getOutput().close();
}
+void ConnectionHandler::Handler::heartbeat(){
+ // Do nothing - the purpose of heartbeats is just to make sure that there is some
+ // traffic on the connection within the heart beat interval, we check for the
+ // traffic and don't need to do anything in response to heartbeats
+}
void ConnectionHandler::Handler::start(const FieldTable& serverProperties,
const framing::Array& /*mechanisms*/,
diff --git a/qpid/cpp/src/qpid/broker/ConnectionHandler.h b/qpid/cpp/src/qpid/broker/ConnectionHandler.h
index 6fd252b120..b24c10e9e8 100644
--- a/qpid/cpp/src/qpid/broker/ConnectionHandler.h
+++ b/qpid/cpp/src/qpid/broker/ConnectionHandler.h
@@ -62,7 +62,7 @@ class ConnectionHandler : public framing::FrameHandler
const std::string& locale);
void secureOk(const std::string& response);
void tuneOk(uint16_t channelMax, uint16_t frameMax, uint16_t heartbeat);
- void heartbeat() {}
+ void heartbeat();
void open(const std::string& virtualHost,
const framing::Array& capabilities, bool insist);
void close(uint16_t replyCode, const std::string& replyText);
@@ -88,6 +88,7 @@ class ConnectionHandler : public framing::FrameHandler
public:
ConnectionHandler(Connection& connection, bool isClient);
void close(framing::connection::CloseCode code, const std::string& text);
+ void heartbeat();
void handle(framing::AMQFrame& frame);
void setSecureConnection(SecureConnection* secured);
};
diff --git a/qpid/cpp/src/qpid/broker/ConnectionState.h b/qpid/cpp/src/qpid/broker/ConnectionState.h
index fd69157dbd..53591dc40a 100644
--- a/qpid/cpp/src/qpid/broker/ConnectionState.h
+++ b/qpid/cpp/src/qpid/broker/ConnectionState.h
@@ -44,6 +44,7 @@ class ConnectionState : public ConnectionToken, public management::Manageable
outputTasks(out),
framemax(65535),
heartbeat(0),
+ heartbeatmax(120),
stagingThreshold(broker.getStagingThreshold()),
federationLink(true)
{}
@@ -54,10 +55,12 @@ class ConnectionState : public ConnectionToken, public management::Manageable
uint32_t getFrameMax() const { return framemax; }
uint16_t getHeartbeat() const { return heartbeat; }
+ uint16_t getHeartbeatMax() const { return heartbeatmax; }
uint64_t getStagingThreshold() const { return stagingThreshold; }
void setFrameMax(uint32_t fm) { framemax = fm; }
void setHeartbeat(uint16_t hb) { heartbeat = hb; }
+ void setHeartbeatMax(uint16_t hbm) { heartbeatmax = hbm; }
void setStagingThreshold(uint64_t st) { stagingThreshold = st; }
virtual void setUserId(const string& uid) { userId = uid; }
@@ -88,6 +91,7 @@ class ConnectionState : public ConnectionToken, public management::Manageable
framing::ProtocolVersion version;
uint32_t framemax;
uint16_t heartbeat;
+ uint16_t heartbeatmax;
uint64_t stagingThreshold;
string userId;
string url;
diff --git a/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp b/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp
index 9fce1fbbd5..57c761a41d 100644
--- a/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp
+++ b/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp
@@ -160,7 +160,7 @@ void NullAuthenticator::start(const string& mechanism, const string& response)
} else {
connection.setUserId("anonymous");
}
- client.tune(framing::CHANNEL_MAX, connection.getFrameMax(), 0, 0);
+ client.tune(framing::CHANNEL_MAX, connection.getFrameMax(), 0, connection.getHeartbeatMax());
}
@@ -341,7 +341,7 @@ void CyrusAuthenticator::processAuthenticationStep(int code, const char *challen
connection.setUserId(const_cast<char*>(static_cast<const char*>(uid)));
- client.tune(framing::CHANNEL_MAX, connection.getFrameMax(), 0, 0);
+ client.tune(framing::CHANNEL_MAX, connection.getFrameMax(), 0, connection.getHeartbeatMax());
} else if (SASL_CONTINUE == code) {
string challenge_str(challenge, challenge_len);
diff --git a/qpid/cpp/src/qpid/client/ConnectionHandler.cpp b/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
index 2a070ebcff..d5b3f2264b 100644
--- a/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
+++ b/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
@@ -133,6 +133,13 @@ void ConnectionHandler::close()
}
}
+void ConnectionHandler::heartbeat()
+{
+ // Do nothing - the purpose of heartbeats is just to make sure that there is some
+ // traffic on the connection within the heart beat interval, we check for the
+ // traffic and don't need to do anything in response to heartbeats
+}
+
void ConnectionHandler::checkState(STATES s, const std::string& msg)
{
if (getState() != s) {
@@ -195,13 +202,19 @@ void ConnectionHandler::secure(const std::string& challenge)
}
void ConnectionHandler::tune(uint16_t maxChannelsProposed, uint16_t maxFrameSizeProposed,
- uint16_t /*heartbeatMin*/, uint16_t /*heartbeatMax*/)
+ uint16_t heartbeatMin, uint16_t heartbeatMax)
{
checkState(NEGOTIATING, INVALID_STATE_TUNE);
maxChannels = std::min(maxChannels, maxChannelsProposed);
maxFrameSize = std::min(maxFrameSize, maxFrameSizeProposed);
- //TODO: implement heartbeats and check desired value is in valid range
+ // Clip the requested heartbeat to the maximum/minimum offered
+ uint16_t heartbeat = ConnectionSettings::heartbeat;
+ heartbeat = heartbeat < heartbeatMin ? heartbeatMin :
+ heartbeat > heartbeatMax ? heartbeatMax :
+ heartbeat;
proxy.tuneOk(maxChannels, maxFrameSize, heartbeat);
+ // TODO set connection timeout to be 2x heart beat interval
+ // TODO and set an alarm for it.
setState(OPENING);
proxy.open(virtualhost, capabilities, insist);
}
diff --git a/qpid/cpp/src/qpid/client/ConnectionHandler.h b/qpid/cpp/src/qpid/client/ConnectionHandler.h
index ec9278626f..58b540cfeb 100644
--- a/qpid/cpp/src/qpid/client/ConnectionHandler.h
+++ b/qpid/cpp/src/qpid/client/ConnectionHandler.h
@@ -86,6 +86,7 @@ class ConnectionHandler : private StateManager,
const framing::Array& knownHosts);
void close(uint16_t replyCode, const std::string& replyText);
void closeOk();
+ void heartbeat();
public:
using InputHandler::handle;
diff --git a/qpid/cpp/src/tests/ConnectionOptions.h b/qpid/cpp/src/tests/ConnectionOptions.h
index 30fe5ad9b1..cf86894235 100644
--- a/qpid/cpp/src/tests/ConnectionOptions.h
+++ b/qpid/cpp/src/tests/ConnectionOptions.h
@@ -44,6 +44,7 @@ struct ConnectionOptions : public qpid::Options,
("mechanism", optValue(mechanism, "MECH"), "SASL mechanism to use when authenticating.")
("locale", optValue(locale, "LOCALE"), "locale to use.")
("max-channels", optValue(maxChannels, "N"), "the maximum number of channels the client requires.")
+ ("heartbeat", optValue(heartbeat, "N"), "Desired heartbeat interval in seconds.")
("max-frame-size", optValue(maxFrameSize, "N"), "the maximum frame size to request.")
("bounds-multiplier", optValue(bounds, "N"),
"bound size of write queue (as a multiple of the max frame size).")
diff --git a/qpid/specs/amqp.0-10-qpid-errata.xml b/qpid/specs/amqp.0-10-qpid-errata.xml
index 1b15588a5e..365928ea4e 100644
--- a/qpid/specs/amqp.0-10-qpid-errata.xml
+++ b/qpid/specs/amqp.0-10-qpid-errata.xml
@@ -1912,6 +1912,8 @@
is idle. If a connection is idle for more than twice the negotiated heartbeat delay, the
peers MAY be considered disconnected.
</doc>
+ <implement role="client" handle="MAY" />
+ <implement role="server" handle="MAY" />
</control>
<!-- - Control: connection.close - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -->