summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2014-05-23 16:10:27 +0000
committerGordon Sim <gsim@apache.org>2014-05-23 16:10:27 +0000
commit0c839dc7a1ca94f91dfda29134c3feffe3d6d3d5 (patch)
treefbe6e22c3d6d29dac44645ece91b682d4e84c8a8
parent8f8d83c8a6a078ec1595e2c58eed7ab00ce080c4 (diff)
downloadqpid-python-0c839dc7a1ca94f91dfda29134c3feffe3d6d3d5.tar.gz
QPID-5783: Share immutable state between copies of a message. Avoid using memory for annotations unless actually required.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1597121 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/qpid/broker/Message.cpp175
-rw-r--r--cpp/src/qpid/broker/Message.h97
-rw-r--r--cpp/src/qpid/broker/PagedQueue.cpp4
-rw-r--r--cpp/src/qpid/broker/RecoveryManagerImpl.cpp2
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp2
-rw-r--r--cpp/src/qpid/broker/SessionState.cpp3
-rw-r--r--cpp/src/qpid/broker/amqp/Incoming.cpp4
-rw-r--r--cpp/src/qpid/broker/amqp/Message.h2
-rw-r--r--cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h4
-rw-r--r--cpp/src/qpid/management/ManagementAgent.cpp6
-rw-r--r--cpp/src/tests/MessageUtils.h1
-rw-r--r--cpp/src/tests/QueueTest.cpp1
12 files changed, 205 insertions, 96 deletions
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp
index 049b583335..11c8c13e12 100644
--- a/cpp/src/qpid/broker/Message.cpp
+++ b/cpp/src/qpid/broker/Message.cpp
@@ -46,14 +46,11 @@ using std::string;
namespace qpid {
namespace broker {
-Message::Message() : deliveryCount(-1), alreadyAcquired(false), publisher(0), expiration(FAR_FUTURE), timestamp(0),
- isManagementMessage(false), replicationId(0)
+Message::Message() : deliveryCount(-1), alreadyAcquired(false), replicationId(0)
{}
-Message::Message(boost::intrusive_ptr<Encoding> e, boost::intrusive_ptr<PersistableMessage> p)
- : encoding(e), persistentContext(p), deliveryCount(-1), alreadyAcquired(false), publisher(0),
- expiration(FAR_FUTURE), timestamp(0), isManagementMessage(false),
- replicationId(0)
+Message::Message(boost::intrusive_ptr<SharedState> e, boost::intrusive_ptr<PersistableMessage> p)
+ : sharedState(e), persistentContext(p), deliveryCount(-1), alreadyAcquired(false), replicationId(0)
{
if (persistentContext) persistentContext->setIngressCompletion(e);
}
@@ -78,7 +75,7 @@ uint64_t Message::getMessageSize() const
boost::intrusive_ptr<AsyncCompletion> Message::getIngressCompletion() const
{
- return encoding;
+ return sharedState;
}
namespace
@@ -106,35 +103,29 @@ void Message::addTraceId(const std::string& id)
{
std::string trace = getEncoding().getAnnotationAsString(X_QPID_TRACE);
if (trace.empty()) {
- annotations[X_QPID_TRACE] = id;
+ addAnnotation(X_QPID_TRACE, id);
} else if (trace.find(id) == std::string::npos) {
trace += ",";
trace += id;
- annotations[X_QPID_TRACE] = trace;
+ addAnnotation(X_QPID_TRACE, trace);
}
- annotationsChanged();
}
void Message::clearTrace()
{
- annotations[X_QPID_TRACE] = std::string();
- annotationsChanged();
+ addAnnotation(X_QPID_TRACE, std::string());
}
uint64_t Message::getTimestamp() const
{
- return encoding ? encoding->getTimestamp() : 0;
+ return sharedState ? sharedState->getTimestamp() : 0;
}
uint64_t Message::getTtl() const
{
uint64_t ttl;
- if (encoding->getTtl(ttl) && expiration < FAR_FUTURE) {
- sys::AbsTime current(
- expiryPolicy ? expiryPolicy->getCurrentTime() : sys::AbsTime::now());
- sys::Duration ttl(current, getExpiration());
- // convert from ns to ms; set to 1 if expired
- return (int64_t(ttl) >= 1000000 ? int64_t(ttl)/1000000 : 1);
+ if (getTtl(ttl, 1)/*set to 1 if expired*/) {
+ return ttl;
} else {
return 0;
}
@@ -142,35 +133,24 @@ uint64_t Message::getTtl() const
bool Message::getTtl(uint64_t& ttl) const
{
- if (encoding->getTtl(ttl) && expiration < FAR_FUTURE) {
- sys::Duration remaining(sys::AbsTime::now(), getExpiration());
- // convert from ns to ms; set to 0 if expired
- ttl = (int64_t(remaining) >= 1000000 ? int64_t(remaining)/1000000 : 0);
- return true;
- } else {
- return false;
- }
+ return getTtl(ttl, 0); //set to 0 if expired
}
-void Message::computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& e)
+bool Message::getTtl(uint64_t& ttl, uint64_t expiredValue) const
{
- //TODO: this is still quite 0-10 specific...
- uint64_t ttl;
- if (getEncoding().getTtl(ttl)) {
- if (e) {
- // Use higher resolution time for the internal expiry calculation.
- // Prevent overflow as a signed int64_t
- Duration duration(std::min(ttl * TIME_MSEC,
- (uint64_t) std::numeric_limits<int64_t>::max()));
- expiration = AbsTime(e->getCurrentTime(), duration);
- setExpiryPolicy(e);
- }
+ if (sharedState->getTtl(ttl) && sharedState->getExpiration() < FAR_FUTURE) {
+ sys::Duration remaining = sharedState->getTimeToExpiration();
+ // convert from ns to ms
+ ttl = (int64_t(remaining) >= 1000000 ? int64_t(remaining)/1000000 : expiredValue);
+ return true;
+ } else {
+ return false;
}
}
void Message::addAnnotation(const std::string& key, const qpid::types::Variant& value)
{
- annotations[key] = value;
+ annotations.get()[key] = value;
annotationsChanged();
}
@@ -178,19 +158,15 @@ void Message::annotationsChanged()
{
if (persistentContext) {
uint64_t id = persistentContext->getPersistenceId();
- persistentContext = persistentContext->merge(annotations);
- persistentContext->setIngressCompletion(encoding);
+ persistentContext = persistentContext->merge(getAnnotations());
+ persistentContext->setIngressCompletion(sharedState);
persistentContext->setPersistenceId(id);
}
}
-void Message::setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) {
- expiryPolicy = e;
-}
-
bool Message::hasExpired() const
{
- return expiryPolicy && expiryPolicy->hasExpired(*this);
+ return sharedState->hasExpired(*this);
}
uint8_t Message::getPriority() const
@@ -198,12 +174,12 @@ uint8_t Message::getPriority() const
return getEncoding().getPriority();
}
-bool Message::getIsManagementMessage() const { return isManagementMessage; }
-void Message::setIsManagementMessage(bool b) { isManagementMessage = b; }
+bool Message::getIsManagementMessage() const { return sharedState->getIsManagementMessage(); }
-const Connection* Message::getPublisher() const { return publisher; }
-void Message::setPublisher(const Connection& p) { publisher = &p; }
-bool Message::isLocalTo(const OwnershipToken* token) const { return token && publisher && token->isLocal(publisher); }
+const Connection* Message::getPublisher() const { return sharedState->getPublisher(); }
+bool Message::isLocalTo(const OwnershipToken* token) const {
+ return token && sharedState->getPublisher() && token->isLocal(sharedState->getPublisher());
+}
qpid::framing::SequenceNumber Message::getSequence() const
@@ -223,16 +199,20 @@ void Message::setState(MessageState s)
{
state = s;
}
+namespace {
+const qpid::types::Variant::Map EMPTY_MAP;
+}
const qpid::types::Variant::Map& Message::getAnnotations() const
{
- return annotations;
+ return annotations ? *annotations : EMPTY_MAP;
}
qpid::types::Variant Message::getAnnotation(const std::string& key) const
{
- qpid::types::Variant::Map::const_iterator i = annotations.find(key);
- if (i != annotations.end()) return i->second;
+ const qpid::types::Variant::Map& a = getAnnotations();
+ qpid::types::Variant::Map::const_iterator i = a.find(key);
+ if (i != a.end()) return i->second;
//FIXME: modify Encoding interface to allow retrieval of
//annotations of different types from the message data as received
//off the wire
@@ -241,30 +221,30 @@ qpid::types::Variant Message::getAnnotation(const std::string& key) const
std::string Message::getUserId() const
{
- return encoding->getUserId();
+ return sharedState->getUserId();
}
-Message::Encoding& Message::getEncoding()
+Message::SharedState& Message::getSharedState()
{
- return *encoding;
+ return *sharedState;
}
const Message::Encoding& Message::getEncoding() const
{
- return *encoding;
+ return *sharedState;
}
Message::operator bool() const
{
- return !!encoding;
+ return !!sharedState;
}
std::string Message::getContent() const
{
- return encoding->getContent();
+ return sharedState->getContent();
}
std::string Message::getPropertyAsString(const std::string& key) const
{
- return encoding->getPropertyAsString(key);
+ return sharedState->getPropertyAsString(key);
}
namespace {
class PropertyRetriever : public MapHandler
@@ -308,7 +288,7 @@ class PropertyRetriever : public MapHandler
qpid::types::Variant Message::getProperty(const std::string& key) const
{
PropertyRetriever r(key);
- encoding->processProperties(r);
+ sharedState->processProperties(r);
return r.getResult();
}
@@ -319,12 +299,79 @@ boost::intrusive_ptr<PersistableMessage> Message::getPersistentContext() const
void Message::processProperties(MapHandler& handler) const
{
- encoding->processProperties(handler);
+ sharedState->processProperties(handler);
}
uint64_t Message::getReplicationId() const { return replicationId; }
void Message::setReplicationId(framing::SequenceNumber id) { replicationId = id; }
+sys::AbsTime Message::getExpiration() const
+{
+ return sharedState->getExpiration();
+}
+
+Message::SharedStateImpl::SharedStateImpl() : publisher(0), expiration(qpid::sys::FAR_FUTURE), isManagementMessage(false) {}
+
+const Connection* Message::SharedStateImpl::getPublisher() const
+{
+ return publisher;
+}
+
+void Message::SharedStateImpl::setPublisher(const Connection* p)
+{
+ publisher = p;
+}
+
+sys::AbsTime Message::SharedStateImpl::getExpiration() const
+{
+ return expiration;
+}
+
+void Message::SharedStateImpl::setExpiration(sys::AbsTime e)
+{
+ expiration = e;
+}
+
+sys::Duration Message::SharedStateImpl::getTimeToExpiration() const
+{
+ sys::AbsTime current(expiryPolicy ? expiryPolicy->getCurrentTime() : sys::AbsTime::now());
+ return sys::Duration(current, expiration);
+}
+
+void Message::SharedStateImpl::computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& e)
+{
+ //TODO: this is still quite 0-10 specific...
+ uint64_t ttl;
+ if (getTtl(ttl)) {
+ if (e) {
+ // Use higher resolution time for the internal expiry calculation.
+ // Prevent overflow as a signed int64_t
+ Duration duration(std::min(ttl * TIME_MSEC,
+ (uint64_t) std::numeric_limits<int64_t>::max()));
+ expiration = AbsTime(e->getCurrentTime(), duration);
+ expiryPolicy = e;
+ }
+ }
+}
+
+bool Message::SharedStateImpl::hasExpired(const Message& m) const
+{
+ return expiryPolicy && expiryPolicy->hasExpired(m);
+}
+
+void Message::SharedStateImpl::setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e)
+{
+ expiryPolicy = e;
+}
+
+bool Message::SharedStateImpl::getIsManagementMessage() const
+{
+ return isManagementMessage;
+}
+void Message::SharedStateImpl::setIsManagementMessage(bool b)
+{
+ isManagementMessage = b;
+}
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h
index ecd84901f9..f1829a17bc 100644
--- a/cpp/src/qpid/broker/Message.h
+++ b/cpp/src/qpid/broker/Message.h
@@ -35,6 +35,7 @@
#include <string>
#include <vector>
#include <boost/intrusive_ptr.hpp>
+#include <boost/scoped_ptr.hpp>
namespace qpid {
namespace amqp {
@@ -80,7 +81,46 @@ public:
virtual uint64_t getTimestamp() const = 0;
};
- QPID_BROKER_EXTERN Message(boost::intrusive_ptr<Encoding>, boost::intrusive_ptr<PersistableMessage>);
+ class SharedState : public Encoding
+ {
+ public:
+ virtual ~SharedState() {}
+ virtual const Connection* getPublisher() const = 0;
+ virtual void setPublisher(const Connection* p) = 0;
+
+ virtual void setExpiration(sys::AbsTime e) = 0;
+ virtual sys::AbsTime getExpiration() const = 0;
+ virtual sys::Duration getTimeToExpiration() const = 0;
+ virtual void computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>&) = 0;
+ virtual bool hasExpired(const Message& m) const = 0;
+ virtual void setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) = 0;
+
+ virtual bool getIsManagementMessage() const = 0;
+ virtual void setIsManagementMessage(bool b) = 0;
+ };
+
+ class SharedStateImpl : public SharedState
+ {
+ const Connection* publisher;
+ qpid::sys::AbsTime expiration;
+ boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
+ bool isManagementMessage;
+ public:
+ SharedStateImpl();
+ virtual ~SharedStateImpl() {}
+ QPID_BROKER_EXTERN const Connection* getPublisher() const;
+ QPID_BROKER_EXTERN void setPublisher(const Connection* p);
+ QPID_BROKER_EXTERN void setExpiration(sys::AbsTime e);
+ QPID_BROKER_EXTERN sys::AbsTime getExpiration() const;
+ QPID_BROKER_EXTERN sys::Duration getTimeToExpiration() const;
+ QPID_BROKER_EXTERN void computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& e);
+ QPID_BROKER_EXTERN bool hasExpired(const Message& m) const;
+ QPID_BROKER_EXTERN void setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e);
+ QPID_BROKER_EXTERN bool getIsManagementMessage() const;
+ QPID_BROKER_EXTERN void setIsManagementMessage(bool b);
+ };
+
+ QPID_BROKER_EXTERN Message(boost::intrusive_ptr<SharedState>, boost::intrusive_ptr<PersistableMessage>);
QPID_BROKER_EXTERN Message();
QPID_BROKER_EXTERN ~Message();
@@ -91,20 +131,14 @@ public:
int getDeliveryCount() const { return deliveryCount; }
void resetDeliveryCount() { deliveryCount = -1; alreadyAcquired = false; }
- QPID_BROKER_EXTERN void setPublisher(const Connection& p);
const Connection* getPublisher() const;
bool isLocalTo(const OwnershipToken*) const;
QPID_BROKER_EXTERN std::string getRoutingKey() const;
QPID_BROKER_EXTERN bool isPersistent() const;
- /** determine msg expiration time using the TTL value if present */
- QPID_BROKER_EXTERN void computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& e);
- void setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e);
-
bool hasExpired() const;
- sys::AbsTime getExpiration() const { return expiration; }
- void setExpiration(sys::AbsTime exp) { expiration = exp; }
+ QPID_BROKER_EXTERN sys::AbsTime getExpiration() const;
uint64_t getTtl() const;
QPID_BROKER_EXTERN bool getTtl(uint64_t&) const;
@@ -121,12 +155,11 @@ public:
QPID_BROKER_EXTERN uint64_t getMessageSize() const;
- QPID_BROKER_EXTERN Encoding& getEncoding();
QPID_BROKER_EXTERN const Encoding& getEncoding() const;
QPID_BROKER_EXTERN operator bool() const;
+ QPID_BROKER_EXTERN SharedState& getSharedState();
bool getIsManagementMessage() const;
- void setIsManagementMessage(bool b);
QPID_BROKER_EXTERN qpid::framing::SequenceNumber getSequence() const;
QPID_BROKER_EXTERN void setSequence(const qpid::framing::SequenceNumber&);
@@ -146,21 +179,51 @@ public:
QPID_BROKER_EXTERN void setReplicationId(framing::SequenceNumber id);
private:
- boost::intrusive_ptr<Encoding> encoding;
+ /**
+ * Template for optional members that are only constructed when
+ * if/when needed, to conserve memory. (Boost::optional doesn't
+ * help here).
+ */
+ template <typename T> class Optional
+ {
+ boost::scoped_ptr<T> value;
+ public:
+ Optional() : value(0) {}
+ Optional(const Optional<T>& o) : value(o.value ? new T(*o.value) : 0) {}
+ T& get()
+ {
+ if (!value) value.reset(new T);
+ return *value;
+ }
+ const T& operator*() const
+ {
+ return *value;
+ }
+ Optional<T>& operator=(const Optional<T>& o)
+ {
+ if (o.value) {
+ if (!value) value.reset(new T(*o.value));
+ }
+ return *this;
+ }
+ operator bool() const
+ {
+ return value;
+ }
+ };
+
+
+ boost::intrusive_ptr<SharedState> sharedState;
boost::intrusive_ptr<PersistableMessage> persistentContext;
int deliveryCount;
bool alreadyAcquired;
- const Connection* publisher;
- qpid::sys::AbsTime expiration;
- boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
- uint64_t timestamp;
- qpid::types::Variant::Map annotations;
- bool isManagementMessage;
+ Optional<qpid::types::Variant::Map> annotations;
MessageState state;
qpid::framing::SequenceNumber sequence;
framing::SequenceNumber replicationId;
void annotationsChanged();
+ bool getTtl(uint64_t&, uint64_t expiredValue) const;
};
}}
diff --git a/cpp/src/qpid/broker/PagedQueue.cpp b/cpp/src/qpid/broker/PagedQueue.cpp
index afb330489b..a63f6d2485 100644
--- a/cpp/src/qpid/broker/PagedQueue.cpp
+++ b/cpp/src/qpid/broker/PagedQueue.cpp
@@ -78,8 +78,8 @@ size_t decode(ProtocolRegistry& protocols, Message& msg, const char* data, size_
msg.getPersistentContext()->setPersistenceId(persistenceId);
if (t) {
sys::AbsTime expiration(EPOCH, t);
- msg.setExpiryPolicy(expiryPolicy);
- msg.setExpiration(expiration);
+ msg.getSharedState().setExpiryPolicy(expiryPolicy);
+ msg.getSharedState().setExpiration(expiration);
}
return encoded + metadata.getPosition();
}
diff --git a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
index 5a11db81bb..846dc9eb2a 100644
--- a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
+++ b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
@@ -185,7 +185,7 @@ void RecoverableMessageImpl::setRedelivered()
void RecoverableMessageImpl::computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& ep)
{
- msg.computeExpiration(ep);
+ msg.getSharedState().computeExpiration(ep);
}
Message RecoverableMessageImpl::getMessage()
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index 82fa3a8f19..4aba596b62 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -482,8 +482,6 @@ TxBuffer* SemanticState::getTxBuffer()
}
void SemanticState::route(Message& msg, Deliverable& strategy) {
- msg.computeExpiration(getSession().getBroker().getExpiryPolicy());
-
std::string exchangeName = qpid::broker::amqp_0_10::MessageTransfer::get(msg).getExchangeName();
if (!cacheExchange || cacheExchange->getName() != exchangeName || cacheExchange->isDestroyed())
cacheExchange = session.getBroker().getExchanges().get(exchangeName);
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp
index 0c343e5d90..2b849e945f 100644
--- a/cpp/src/qpid/broker/SessionState.cpp
+++ b/cpp/src/qpid/broker/SessionState.cpp
@@ -219,7 +219,8 @@ void SessionState::handleContent(AMQFrame& frame)
DeliverableMessage deliverable(Message(msg, msg), semanticState.getTxBuffer());
if (broker.isTimestamping())
msg->setTimestamp();
- deliverable.getMessage().setPublisher(getConnection());
+ msg->setPublisher(&(getConnection()));
+ msg->computeExpiration(getBroker().getExpiryPolicy());
IncompleteIngressMsgXfer xfer(this, msg);
diff --git a/cpp/src/qpid/broker/amqp/Incoming.cpp b/cpp/src/qpid/broker/amqp/Incoming.cpp
index a1556f5249..889f410443 100644
--- a/cpp/src/qpid/broker/amqp/Incoming.cpp
+++ b/cpp/src/qpid/broker/amqp/Incoming.cpp
@@ -134,11 +134,11 @@ void DecodingIncoming::readable(pn_delivery_t* delivery)
received->scan();
pn_link_advance(link);
+ received->setPublisher(&session->getParent());
+ received->computeExpiration(expiryPolicy);
qpid::broker::Message message(received, received);
- message.setPublisher(session->getParent());
userid.verify(message.getUserId());
- message.computeExpiration(expiryPolicy);
handle(message);
--window;
received->begin();
diff --git a/cpp/src/qpid/broker/amqp/Message.h b/cpp/src/qpid/broker/amqp/Message.h
index 161b1dc6f1..025e07c792 100644
--- a/cpp/src/qpid/broker/amqp/Message.h
+++ b/cpp/src/qpid/broker/amqp/Message.h
@@ -37,7 +37,7 @@ namespace amqp {
/**
* Represents an AMQP 1.0 format message
*/
-class Message : public qpid::broker::Message::Encoding, private qpid::amqp::MessageReader, public qpid::broker::PersistableMessage
+class Message : public qpid::broker::Message::SharedStateImpl, private qpid::amqp::MessageReader, public qpid::broker::PersistableMessage
{
public:
//Encoding interface:
diff --git a/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h b/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h
index b55f8577a4..513bbe1bfb 100644
--- a/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h
+++ b/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h
@@ -35,7 +35,7 @@ namespace amqp_0_10 {
/**
*
*/
-class MessageTransfer : public qpid::broker::Message::Encoding, public qpid::broker::PersistableMessage
+class MessageTransfer : public qpid::broker::Message::SharedStateImpl, public qpid::broker::PersistableMessage
{
public:
QPID_BROKER_EXTERN MessageTransfer();
@@ -116,7 +116,7 @@ class MessageTransfer : public qpid::broker::Message::Encoding, public qpid::bro
static bool isImmediateDeliveryRequired(const qpid::broker::Message& message);
static MessageTransfer& get(qpid::broker::Message& message) {
- return *dynamic_cast<MessageTransfer*>(&message.getEncoding());
+ return *dynamic_cast<MessageTransfer*>(&message.getSharedState());
}
static const MessageTransfer& get(const qpid::broker::Message& message) {
return *dynamic_cast<const MessageTransfer*>(&message.getEncoding());
diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp
index d4a07f407c..772fe3c64e 100644
--- a/cpp/src/qpid/management/ManagementAgent.cpp
+++ b/cpp/src/qpid/management/ManagementAgent.cpp
@@ -565,8 +565,8 @@ void ManagementAgent::sendBuffer(Buffer& buf,
dp->setRoutingKey(routingKey);
transfer->getFrames().append(content);
+ transfer->setIsManagementMessage(true);
Message msg(transfer, transfer);
- msg.setIsManagementMessage(true);
sendQueue->push(make_pair(exchange, msg));
buf.reset();
}
@@ -632,9 +632,9 @@ void ManagementAgent::sendBuffer(const string& data,
}
transfer->getFrames().append(content);
transfer->computeRequiredCredit();
+ transfer->setIsManagementMessage(true);
+ transfer->computeExpiration(broker->getExpiryPolicy());
Message msg(transfer, transfer);
- msg.setIsManagementMessage(true);
- msg.computeExpiration(broker->getExpiryPolicy());
sendQueue->push(make_pair(exchange, msg));
}
diff --git a/cpp/src/tests/MessageUtils.h b/cpp/src/tests/MessageUtils.h
index a6a5aa65fd..2c205da20f 100644
--- a/cpp/src/tests/MessageUtils.h
+++ b/cpp/src/tests/MessageUtils.h
@@ -107,6 +107,7 @@ struct MessageUtils
AMQFrame data((AMQContentBody(content)));
msg->getFrames().append(data);
}
+ if (ttl) msg->computeExpiration(new broker::ExpiryPolicy);
return Message(msg, msg);
}
};
diff --git a/cpp/src/tests/QueueTest.cpp b/cpp/src/tests/QueueTest.cpp
index c44483f8dd..99c6880b26 100644
--- a/cpp/src/tests/QueueTest.cpp
+++ b/cpp/src/tests/QueueTest.cpp
@@ -188,7 +188,6 @@ void addMessagesToQueue(uint count, Queue& queue, uint oddTtl = 200, uint evenTt
{
for (uint i = 0; i < count; i++) {
Message m = MessageUtils::createMessage("exchange", "key", i % 2 ? oddTtl : evenTtl);
- m.computeExpiration(new broker::ExpiryPolicy);
queue.deliver(m);
}
}