summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2010-09-01 18:11:26 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2010-09-01 18:11:26 +0000
commit0d8d94a588668bd651a02adec04c60fcd615c3ae (patch)
tree7be439b148c11a1201e2b17b7cc80bd080ab5073 /cpp
parent60a585f3bd0823aff820279ed87137797eb7c100 (diff)
downloadqpid-python-0d8d94a588668bd651a02adec04c60fcd615c3ae.tar.gz
QPID-2841: set the TTL in agent heartbeat messages.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@991630 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/qpid/agent/ManagementAgentImpl.cpp14
-rw-r--r--cpp/src/qpid/agent/ManagementAgentImpl.h3
-rw-r--r--cpp/src/qpid/management/ManagementAgent.cpp10
-rw-r--r--cpp/src/qpid/management/ManagementAgent.h3
4 files changed, 24 insertions, 6 deletions
diff --git a/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/cpp/src/qpid/agent/ManagementAgentImpl.cpp
index 15e17e13f3..25d24a542c 100644
--- a/cpp/src/qpid/agent/ManagementAgentImpl.cpp
+++ b/cpp/src/qpid/agent/ManagementAgentImpl.cpp
@@ -452,7 +452,12 @@ void ManagementAgentImpl::sendHeartbeat()
getHeartbeatContent(map);
MapCodec::encode(map, content);
- connThreadBody.sendBuffer(content, "", headers, topicExchange, addr_key.str());
+
+ // Set TTL (in msecs) on outgoing heartbeat indications based on the interval
+ // time to prevent stale heartbeats from getting to the consoles.
+
+ connThreadBody.sendBuffer(content, "", headers, topicExchange, addr_key.str(),
+ "amqp/map", interval * 2 * 1000);
QPID_LOG(trace, "SENT AgentHeartbeat name=" << name_address);
}
@@ -1213,7 +1218,8 @@ void ManagementAgentImpl::ConnectionThread::sendBuffer(const string& data,
const Variant::Map headers,
const string& exchange,
const string& routingKey,
- const string& contentType)
+ const string& contentType,
+ uint64_t ttl_msec)
{
Message msg;
Variant::Map::const_iterator i;
@@ -1223,6 +1229,10 @@ void ManagementAgentImpl::ConnectionThread::sendBuffer(const string& data,
if (!contentType.empty())
msg.getMessageProperties().setContentType(contentType);
+
+ if (ttl_msec)
+ msg.getDeliveryProperties().setTtl(ttl_msec);
+
for (i = headers.begin(); i != headers.end(); ++i) {
msg.getHeaders().setString(i->first, i->second.asString());
}
diff --git a/cpp/src/qpid/agent/ManagementAgentImpl.h b/cpp/src/qpid/agent/ManagementAgentImpl.h
index 477526c882..3bd3c6fb8e 100644
--- a/cpp/src/qpid/agent/ManagementAgentImpl.h
+++ b/cpp/src/qpid/agent/ManagementAgentImpl.h
@@ -224,7 +224,8 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen
const qpid::types::Variant::Map headers,
const std::string& exchange,
const std::string& routingKey,
- const std::string& contentType="amqp/map");
+ const std::string& contentType="amqp/map",
+ uint64_t ttl_msec=0);
void sendMessage(qpid::client::Message msg,
const std::string& exchange,
const std::string& routingKey);
diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp
index 847f2b53ce..09494f1d8e 100644
--- a/cpp/src/qpid/management/ManagementAgent.cpp
+++ b/cpp/src/qpid/management/ManagementAgent.cpp
@@ -556,7 +556,8 @@ void ManagementAgent::sendBufferLH(const string& data,
const Variant::Map& headers,
const string& content_type,
qpid::broker::Exchange::shared_ptr exchange,
- const string& routingKey)
+ const string& routingKey,
+ uint64_t ttl_msec)
{
Variant::Map::const_iterator i;
@@ -595,6 +596,8 @@ void ManagementAgent::sendBufferLH(const string& data,
DeliveryProperties* dp =
msg->getFrames().getHeaders()->get<DeliveryProperties>(true);
dp->setRoutingKey(routingKey);
+ if (ttl_msec)
+ dp->setTtl(ttl_msec);
msg->getFrames().append(content);
@@ -843,7 +846,10 @@ void ManagementAgent::periodicProcessing (void)
string content;
MapCodec::encode(map, content);
- sendBufferLH(content, "", headers, "amqp/map", v2Topic, addr_key.str());
+
+ // Set TTL (in msecs) on outgoing heartbeat indications based on the interval
+ // time to prevent stale heartbeats from getting to the consoles.
+ sendBufferLH(content, "", headers, "amqp/map", v2Topic, addr_key.str(), interval * 2 * 1000);
QPID_LOG(trace, "SENT AgentHeartbeat name=" << name_address);
}
diff --git a/cpp/src/qpid/management/ManagementAgent.h b/cpp/src/qpid/management/ManagementAgent.h
index d12b417820..f4d3c8c299 100644
--- a/cpp/src/qpid/management/ManagementAgent.h
+++ b/cpp/src/qpid/management/ManagementAgent.h
@@ -312,7 +312,8 @@ private:
const qpid::types::Variant::Map& headers,
const std::string& content_type,
qpid::broker::Exchange::shared_ptr exchange,
- const std::string& routingKey);
+ const std::string& routingKey,
+ uint64_t ttl_msec = 0);
void moveNewObjectsLH();
bool authorizeAgentMessageLH(qpid::broker::Message& msg);