From 3a60db0672b78a75c52f39f5cefeeb00d3eeba97 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Mon, 9 Feb 2009 22:25:26 +0000 Subject: Cluster support for message time-to-live. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@742774 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/broker/Message.cpp | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) (limited to 'cpp/src/qpid/broker/Message.cpp') diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp index e5a0c3e9e1..ce0477b08c 100644 --- a/cpp/src/qpid/broker/Message.cpp +++ b/cpp/src/qpid/broker/Message.cpp @@ -21,6 +21,7 @@ #include "Message.h" #include "ExchangeRegistry.h" +#include "ExpiryPolicy.h" #include "qpid/StringUtils.h" #include "qpid/framing/frame_functors.h" #include "qpid/framing/FieldTable.h" @@ -316,24 +317,29 @@ void Message::addTraceId(const std::string& id) } } -void Message::setTimestamp() +void Message::setTimestamp(const boost::intrusive_ptr& e) { DeliveryProperties* props = getProperties(); - //Spec states that timestamp should be set, evaluate the - //performance impact before re-enabling this: - //time_t now = ::time(0); - //props->setTimestamp(now); if (props->getTtl()) { - //set expiration (nb: ttl is in millisecs, time_t is in secs) + // AMQP requires setting the expiration property to be posix + // time_t in seconds. TTL is in milliseconds time_t now = ::time(0); props->setExpiration(now + (props->getTtl()/1000)); + // Use higher resolution time for the internal expiry calculation. expiration = AbsTime(AbsTime::now(), Duration(props->getTtl() * TIME_MSEC)); + setExpiryPolicy(e); } } -bool Message::hasExpired() const +void Message::setExpiryPolicy(const boost::intrusive_ptr& e) { + expiryPolicy = e; + if (expiryPolicy) + expiryPolicy->willExpire(*this); +} + +bool Message::hasExpired() { - return expiration < FAR_FUTURE && expiration < AbsTime::now(); + return expiryPolicy && expiryPolicy->hasExpired(*this); } boost::intrusive_ptr& Message::getReplacementMessage(const Queue* qfor) const -- cgit v1.2.1