diff options
author | Andrew Stitcher <astitcher@apache.org> | 2009-01-13 19:08:29 +0000 |
---|---|---|
committer | Andrew Stitcher <astitcher@apache.org> | 2009-01-13 19:08:29 +0000 |
commit | e4d0cd740117f0a65b90f3a7d89264e0c82c3e76 (patch) | |
tree | 80547638f7d8eea1d214752c274edb24f797cab6 | |
parent | edd6337731e417cc13f9d698bcda1d5911fcb782 (diff) | |
download | qpid-python-e4d0cd740117f0a65b90f3a7d89264e0c82c3e76.tar.gz |
Send heartbeat from broker to client
- Server sends possible heartbeat range and client replies with desired
heartbeat as part of the tune-tuneOk exchange
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@734220 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/Connection.cpp | 46 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Connection.h | 7 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/ConnectionHandler.cpp | 12 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/ConnectionHandler.h | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/ConnectionState.h | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/ConnectionHandler.cpp | 17 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/ConnectionHandler.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/tests/ConnectionOptions.h | 1 | ||||
-rw-r--r-- | qpid/specs/amqp.0-10-qpid-errata.xml | 2 |
10 files changed, 88 insertions, 9 deletions
diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp index eb54ddfd56..66ee6281c6 100644 --- a/qpid/cpp/src/qpid/broker/Connection.cpp +++ b/qpid/cpp/src/qpid/broker/Connection.cpp @@ -38,7 +38,6 @@ using namespace qpid::sys; using namespace qpid::framing; -using namespace qpid::sys; using qpid::ptr_map_ptr; using qpid::management::ManagementAgent; using qpid::management::ManagementObject; @@ -57,7 +56,8 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std mgmtId(mgmtId_), mgmtObject(0), links(broker_.getLinks()), - agent(0) + agent(0), + timer(broker_.getTimer()) { Manageable* parent = broker.GetVhostObject(); @@ -92,6 +92,9 @@ Connection::~Connection() } if (isLink) links.notifyClosed(mgmtId); + + if (heartbeatTimer) + heartbeatTimer->cancel(); } void Connection::received(framing::AMQFrame& frame) { @@ -174,6 +177,8 @@ void Connection::setFederationLink(bool b) void Connection::close(connection::CloseCode code, const string& text) { QPID_LOG_IF(error, code != connection::CLOSE_CODE_NORMAL, "Connection " << mgmtId << " closed by error: " << text << "(" << code << ")"); + if (heartbeatTimer) + heartbeatTimer->cancel(); adapter.close(code, text); //make sure we delete dangling pointers from outputTasks before deleting sessions outputTasks.removeAll(); @@ -183,6 +188,8 @@ void Connection::close(connection::CloseCode code, const string& text) // Send a close to the client but keep the channels. Used by cluster. void Connection::sendClose() { + if (heartbeatTimer) + heartbeatTimer->cancel(); adapter.close(connection::CLOSE_CODE_NORMAL, "OK"); getOutput().close(); } @@ -230,6 +237,10 @@ bool Connection::doOutput() { return false; } +void Connection::sendHeartbeat() { + adapter.heartbeat(); +} + void Connection::closeChannel(uint16_t id) { ChannelMap::iterator i = channels.find(id); if (i != channels.end()) channels.erase(i); @@ -272,5 +283,36 @@ void Connection::setSecureConnection(SecureConnection* s) adapter.setSecureConnection(s); } +struct ConnectionHeartbeatTask : public TimerTask { + Timer& timer; + Connection& connection; + ConnectionHeartbeatTask(uint16_t hb, Timer& t, Connection& c) : + TimerTask(Duration(hb*TIME_SEC)), + timer(t), + connection(c) + {} + + void fire() { + // This is the best we can currently do to avoid a destruction/fire race + if (!isCancelled()) { + // Setup next firing + reset(); + timer.add(this); + + // Send Heartbeat + connection.sendHeartbeat(); + } + } +}; + +void Connection::setHeartbeatInterval(uint16_t heartbeat) +{ + setHeartbeat(heartbeat); + if (heartbeat > 0) { + heartbeatTimer = new ConnectionHeartbeatTask(heartbeat, timer, *this); + timer.add(heartbeatTimer); + } +} + }} diff --git a/qpid/cpp/src/qpid/broker/Connection.h b/qpid/cpp/src/qpid/broker/Connection.h index acd9f94d9b..5cbff57788 100644 --- a/qpid/cpp/src/qpid/broker/Connection.h +++ b/qpid/cpp/src/qpid/broker/Connection.h @@ -99,6 +99,9 @@ class Connection : public sys::ConnectionInputHandler, const std::string& getMgmtId() const { return mgmtId; } management::ManagementAgent* getAgent() const { return agent; } void setFederationLink(bool b); + + void setHeartbeatInterval(uint16_t heartbeat); + void sendHeartbeat(); template <class F> void eachSessionHandler(F f) { for (ChannelMap::iterator i = channels.begin(); i != channels.end(); ++i) @@ -112,7 +115,7 @@ class Connection : public sys::ConnectionInputHandler, typedef std::vector<Queue::shared_ptr>::iterator queue_iterator; ChannelMap channels; - framing::AMQP_ClientProxy::Connection* client; + //framing::AMQP_ClientProxy::Connection* client; ConnectionHandler adapter; bool isLink; bool mgmtClosing; @@ -121,6 +124,8 @@ class Connection : public sys::ConnectionInputHandler, qmf::org::apache::qpid::broker::Connection* mgmtObject; LinkRegistry& links; management::ManagementAgent* agent; + Timer& timer; + boost::intrusive_ptr<TimerTask> heartbeatTimer; }; }} diff --git a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp index 6f99b60cd8..38e667dcba 100644 --- a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp +++ b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp @@ -52,6 +52,11 @@ void ConnectionHandler::close(connection::CloseCode code, const string& text) handler->client.close(code, text); } +void ConnectionHandler::heartbeat() +{ + handler->client.heartbeat(); +} + void ConnectionHandler::handle(framing::AMQFrame& frame) { AMQMethodBody* method=frame.getBody()->getMethod(); @@ -157,7 +162,7 @@ void ConnectionHandler::Handler::tuneOk(uint16_t /*channelmax*/, uint16_t framemax, uint16_t heartbeat) { connection.setFrameMax(framemax); - connection.setHeartbeat(heartbeat); + connection.setHeartbeatInterval(heartbeat); } void ConnectionHandler::Handler::open(const string& /*virtualHost*/, @@ -194,6 +199,11 @@ void ConnectionHandler::Handler::closeOk(){ connection.getOutput().close(); } +void ConnectionHandler::Handler::heartbeat(){ + // Do nothing - the purpose of heartbeats is just to make sure that there is some + // traffic on the connection within the heart beat interval, we check for the + // traffic and don't need to do anything in response to heartbeats +} void ConnectionHandler::Handler::start(const FieldTable& serverProperties, const framing::Array& /*mechanisms*/, diff --git a/qpid/cpp/src/qpid/broker/ConnectionHandler.h b/qpid/cpp/src/qpid/broker/ConnectionHandler.h index 6fd252b120..b24c10e9e8 100644 --- a/qpid/cpp/src/qpid/broker/ConnectionHandler.h +++ b/qpid/cpp/src/qpid/broker/ConnectionHandler.h @@ -62,7 +62,7 @@ class ConnectionHandler : public framing::FrameHandler const std::string& locale); void secureOk(const std::string& response); void tuneOk(uint16_t channelMax, uint16_t frameMax, uint16_t heartbeat); - void heartbeat() {} + void heartbeat(); void open(const std::string& virtualHost, const framing::Array& capabilities, bool insist); void close(uint16_t replyCode, const std::string& replyText); @@ -88,6 +88,7 @@ class ConnectionHandler : public framing::FrameHandler public: ConnectionHandler(Connection& connection, bool isClient); void close(framing::connection::CloseCode code, const std::string& text); + void heartbeat(); void handle(framing::AMQFrame& frame); void setSecureConnection(SecureConnection* secured); }; diff --git a/qpid/cpp/src/qpid/broker/ConnectionState.h b/qpid/cpp/src/qpid/broker/ConnectionState.h index fd69157dbd..53591dc40a 100644 --- a/qpid/cpp/src/qpid/broker/ConnectionState.h +++ b/qpid/cpp/src/qpid/broker/ConnectionState.h @@ -44,6 +44,7 @@ class ConnectionState : public ConnectionToken, public management::Manageable outputTasks(out), framemax(65535), heartbeat(0), + heartbeatmax(120), stagingThreshold(broker.getStagingThreshold()), federationLink(true) {} @@ -54,10 +55,12 @@ class ConnectionState : public ConnectionToken, public management::Manageable uint32_t getFrameMax() const { return framemax; } uint16_t getHeartbeat() const { return heartbeat; } + uint16_t getHeartbeatMax() const { return heartbeatmax; } uint64_t getStagingThreshold() const { return stagingThreshold; } void setFrameMax(uint32_t fm) { framemax = fm; } void setHeartbeat(uint16_t hb) { heartbeat = hb; } + void setHeartbeatMax(uint16_t hbm) { heartbeatmax = hbm; } void setStagingThreshold(uint64_t st) { stagingThreshold = st; } virtual void setUserId(const string& uid) { userId = uid; } @@ -88,6 +91,7 @@ class ConnectionState : public ConnectionToken, public management::Manageable framing::ProtocolVersion version; uint32_t framemax; uint16_t heartbeat; + uint16_t heartbeatmax; uint64_t stagingThreshold; string userId; string url; diff --git a/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp b/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp index 9fce1fbbd5..57c761a41d 100644 --- a/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp +++ b/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp @@ -160,7 +160,7 @@ void NullAuthenticator::start(const string& mechanism, const string& response) } else { connection.setUserId("anonymous"); } - client.tune(framing::CHANNEL_MAX, connection.getFrameMax(), 0, 0); + client.tune(framing::CHANNEL_MAX, connection.getFrameMax(), 0, connection.getHeartbeatMax()); } @@ -341,7 +341,7 @@ void CyrusAuthenticator::processAuthenticationStep(int code, const char *challen connection.setUserId(const_cast<char*>(static_cast<const char*>(uid))); - client.tune(framing::CHANNEL_MAX, connection.getFrameMax(), 0, 0); + client.tune(framing::CHANNEL_MAX, connection.getFrameMax(), 0, connection.getHeartbeatMax()); } else if (SASL_CONTINUE == code) { string challenge_str(challenge, challenge_len); diff --git a/qpid/cpp/src/qpid/client/ConnectionHandler.cpp b/qpid/cpp/src/qpid/client/ConnectionHandler.cpp index 2a070ebcff..d5b3f2264b 100644 --- a/qpid/cpp/src/qpid/client/ConnectionHandler.cpp +++ b/qpid/cpp/src/qpid/client/ConnectionHandler.cpp @@ -133,6 +133,13 @@ void ConnectionHandler::close() } } +void ConnectionHandler::heartbeat() +{ + // Do nothing - the purpose of heartbeats is just to make sure that there is some + // traffic on the connection within the heart beat interval, we check for the + // traffic and don't need to do anything in response to heartbeats +} + void ConnectionHandler::checkState(STATES s, const std::string& msg) { if (getState() != s) { @@ -195,13 +202,19 @@ void ConnectionHandler::secure(const std::string& challenge) } void ConnectionHandler::tune(uint16_t maxChannelsProposed, uint16_t maxFrameSizeProposed, - uint16_t /*heartbeatMin*/, uint16_t /*heartbeatMax*/) + uint16_t heartbeatMin, uint16_t heartbeatMax) { checkState(NEGOTIATING, INVALID_STATE_TUNE); maxChannels = std::min(maxChannels, maxChannelsProposed); maxFrameSize = std::min(maxFrameSize, maxFrameSizeProposed); - //TODO: implement heartbeats and check desired value is in valid range + // Clip the requested heartbeat to the maximum/minimum offered + uint16_t heartbeat = ConnectionSettings::heartbeat; + heartbeat = heartbeat < heartbeatMin ? heartbeatMin : + heartbeat > heartbeatMax ? heartbeatMax : + heartbeat; proxy.tuneOk(maxChannels, maxFrameSize, heartbeat); + // TODO set connection timeout to be 2x heart beat interval + // TODO and set an alarm for it. setState(OPENING); proxy.open(virtualhost, capabilities, insist); } diff --git a/qpid/cpp/src/qpid/client/ConnectionHandler.h b/qpid/cpp/src/qpid/client/ConnectionHandler.h index ec9278626f..58b540cfeb 100644 --- a/qpid/cpp/src/qpid/client/ConnectionHandler.h +++ b/qpid/cpp/src/qpid/client/ConnectionHandler.h @@ -86,6 +86,7 @@ class ConnectionHandler : private StateManager, const framing::Array& knownHosts); void close(uint16_t replyCode, const std::string& replyText); void closeOk(); + void heartbeat(); public: using InputHandler::handle; diff --git a/qpid/cpp/src/tests/ConnectionOptions.h b/qpid/cpp/src/tests/ConnectionOptions.h index 30fe5ad9b1..cf86894235 100644 --- a/qpid/cpp/src/tests/ConnectionOptions.h +++ b/qpid/cpp/src/tests/ConnectionOptions.h @@ -44,6 +44,7 @@ struct ConnectionOptions : public qpid::Options, ("mechanism", optValue(mechanism, "MECH"), "SASL mechanism to use when authenticating.") ("locale", optValue(locale, "LOCALE"), "locale to use.") ("max-channels", optValue(maxChannels, "N"), "the maximum number of channels the client requires.") + ("heartbeat", optValue(heartbeat, "N"), "Desired heartbeat interval in seconds.") ("max-frame-size", optValue(maxFrameSize, "N"), "the maximum frame size to request.") ("bounds-multiplier", optValue(bounds, "N"), "bound size of write queue (as a multiple of the max frame size).") diff --git a/qpid/specs/amqp.0-10-qpid-errata.xml b/qpid/specs/amqp.0-10-qpid-errata.xml index 1b15588a5e..365928ea4e 100644 --- a/qpid/specs/amqp.0-10-qpid-errata.xml +++ b/qpid/specs/amqp.0-10-qpid-errata.xml @@ -1912,6 +1912,8 @@ is idle. If a connection is idle for more than twice the negotiated heartbeat delay, the peers MAY be considered disconnected. </doc> + <implement role="client" handle="MAY" /> + <implement role="server" handle="MAY" /> </control> <!-- - Control: connection.close - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - --> |