diff options
author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2010-09-01 18:11:26 +0000 |
---|---|---|
committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2010-09-01 18:11:26 +0000 |
commit | 0d8d94a588668bd651a02adec04c60fcd615c3ae (patch) | |
tree | 7be439b148c11a1201e2b17b7cc80bd080ab5073 /cpp | |
parent | 60a585f3bd0823aff820279ed87137797eb7c100 (diff) | |
download | qpid-python-0d8d94a588668bd651a02adec04c60fcd615c3ae.tar.gz |
QPID-2841: set the TTL in agent heartbeat messages.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@991630 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/qpid/agent/ManagementAgentImpl.cpp | 14 | ||||
-rw-r--r-- | cpp/src/qpid/agent/ManagementAgentImpl.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementAgent.cpp | 10 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementAgent.h | 3 |
4 files changed, 24 insertions, 6 deletions
diff --git a/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/cpp/src/qpid/agent/ManagementAgentImpl.cpp index 15e17e13f3..25d24a542c 100644 --- a/cpp/src/qpid/agent/ManagementAgentImpl.cpp +++ b/cpp/src/qpid/agent/ManagementAgentImpl.cpp @@ -452,7 +452,12 @@ void ManagementAgentImpl::sendHeartbeat() getHeartbeatContent(map); MapCodec::encode(map, content); - connThreadBody.sendBuffer(content, "", headers, topicExchange, addr_key.str()); + + // Set TTL (in msecs) on outgoing heartbeat indications based on the interval + // time to prevent stale heartbeats from getting to the consoles. + + connThreadBody.sendBuffer(content, "", headers, topicExchange, addr_key.str(), + "amqp/map", interval * 2 * 1000); QPID_LOG(trace, "SENT AgentHeartbeat name=" << name_address); } @@ -1213,7 +1218,8 @@ void ManagementAgentImpl::ConnectionThread::sendBuffer(const string& data, const Variant::Map headers, const string& exchange, const string& routingKey, - const string& contentType) + const string& contentType, + uint64_t ttl_msec) { Message msg; Variant::Map::const_iterator i; @@ -1223,6 +1229,10 @@ void ManagementAgentImpl::ConnectionThread::sendBuffer(const string& data, if (!contentType.empty()) msg.getMessageProperties().setContentType(contentType); + + if (ttl_msec) + msg.getDeliveryProperties().setTtl(ttl_msec); + for (i = headers.begin(); i != headers.end(); ++i) { msg.getHeaders().setString(i->first, i->second.asString()); } diff --git a/cpp/src/qpid/agent/ManagementAgentImpl.h b/cpp/src/qpid/agent/ManagementAgentImpl.h index 477526c882..3bd3c6fb8e 100644 --- a/cpp/src/qpid/agent/ManagementAgentImpl.h +++ b/cpp/src/qpid/agent/ManagementAgentImpl.h @@ -224,7 +224,8 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen const qpid::types::Variant::Map headers, const std::string& exchange, const std::string& routingKey, - const std::string& contentType="amqp/map"); + const std::string& contentType="amqp/map", + uint64_t ttl_msec=0); void sendMessage(qpid::client::Message msg, const std::string& exchange, const std::string& routingKey); diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp index 847f2b53ce..09494f1d8e 100644 --- a/cpp/src/qpid/management/ManagementAgent.cpp +++ b/cpp/src/qpid/management/ManagementAgent.cpp @@ -556,7 +556,8 @@ void ManagementAgent::sendBufferLH(const string& data, const Variant::Map& headers, const string& content_type, qpid::broker::Exchange::shared_ptr exchange, - const string& routingKey) + const string& routingKey, + uint64_t ttl_msec) { Variant::Map::const_iterator i; @@ -595,6 +596,8 @@ void ManagementAgent::sendBufferLH(const string& data, DeliveryProperties* dp = msg->getFrames().getHeaders()->get<DeliveryProperties>(true); dp->setRoutingKey(routingKey); + if (ttl_msec) + dp->setTtl(ttl_msec); msg->getFrames().append(content); @@ -843,7 +846,10 @@ void ManagementAgent::periodicProcessing (void) string content; MapCodec::encode(map, content); - sendBufferLH(content, "", headers, "amqp/map", v2Topic, addr_key.str()); + + // Set TTL (in msecs) on outgoing heartbeat indications based on the interval + // time to prevent stale heartbeats from getting to the consoles. + sendBufferLH(content, "", headers, "amqp/map", v2Topic, addr_key.str(), interval * 2 * 1000); QPID_LOG(trace, "SENT AgentHeartbeat name=" << name_address); } diff --git a/cpp/src/qpid/management/ManagementAgent.h b/cpp/src/qpid/management/ManagementAgent.h index d12b417820..f4d3c8c299 100644 --- a/cpp/src/qpid/management/ManagementAgent.h +++ b/cpp/src/qpid/management/ManagementAgent.h @@ -312,7 +312,8 @@ private: const qpid::types::Variant::Map& headers, const std::string& content_type, qpid::broker::Exchange::shared_ptr exchange, - const std::string& routingKey); + const std::string& routingKey, + uint64_t ttl_msec = 0); void moveNewObjectsLH(); bool authorizeAgentMessageLH(qpid::broker::Message& msg); |