diff options
author | Gordon Sim <gsim@apache.org> | 2014-05-23 16:10:27 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2014-05-23 16:10:27 +0000 |
commit | 0c839dc7a1ca94f91dfda29134c3feffe3d6d3d5 (patch) | |
tree | fbe6e22c3d6d29dac44645ece91b682d4e84c8a8 | |
parent | 8f8d83c8a6a078ec1595e2c58eed7ab00ce080c4 (diff) | |
download | qpid-python-0c839dc7a1ca94f91dfda29134c3feffe3d6d3d5.tar.gz |
QPID-5783: Share immutable state between copies of a message. Avoid using memory for annotations unless actually required.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1597121 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | cpp/src/qpid/broker/Message.cpp | 175 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Message.h | 97 | ||||
-rw-r--r-- | cpp/src/qpid/broker/PagedQueue.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/broker/RecoveryManagerImpl.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SemanticState.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionState.cpp | 3 | ||||
-rw-r--r-- | cpp/src/qpid/broker/amqp/Incoming.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/broker/amqp/Message.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h | 4 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementAgent.cpp | 6 | ||||
-rw-r--r-- | cpp/src/tests/MessageUtils.h | 1 | ||||
-rw-r--r-- | cpp/src/tests/QueueTest.cpp | 1 |
12 files changed, 205 insertions, 96 deletions
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp index 049b583335..11c8c13e12 100644 --- a/cpp/src/qpid/broker/Message.cpp +++ b/cpp/src/qpid/broker/Message.cpp @@ -46,14 +46,11 @@ using std::string; namespace qpid { namespace broker { -Message::Message() : deliveryCount(-1), alreadyAcquired(false), publisher(0), expiration(FAR_FUTURE), timestamp(0), - isManagementMessage(false), replicationId(0) +Message::Message() : deliveryCount(-1), alreadyAcquired(false), replicationId(0) {} -Message::Message(boost::intrusive_ptr<Encoding> e, boost::intrusive_ptr<PersistableMessage> p) - : encoding(e), persistentContext(p), deliveryCount(-1), alreadyAcquired(false), publisher(0), - expiration(FAR_FUTURE), timestamp(0), isManagementMessage(false), - replicationId(0) +Message::Message(boost::intrusive_ptr<SharedState> e, boost::intrusive_ptr<PersistableMessage> p) + : sharedState(e), persistentContext(p), deliveryCount(-1), alreadyAcquired(false), replicationId(0) { if (persistentContext) persistentContext->setIngressCompletion(e); } @@ -78,7 +75,7 @@ uint64_t Message::getMessageSize() const boost::intrusive_ptr<AsyncCompletion> Message::getIngressCompletion() const { - return encoding; + return sharedState; } namespace @@ -106,35 +103,29 @@ void Message::addTraceId(const std::string& id) { std::string trace = getEncoding().getAnnotationAsString(X_QPID_TRACE); if (trace.empty()) { - annotations[X_QPID_TRACE] = id; + addAnnotation(X_QPID_TRACE, id); } else if (trace.find(id) == std::string::npos) { trace += ","; trace += id; - annotations[X_QPID_TRACE] = trace; + addAnnotation(X_QPID_TRACE, trace); } - annotationsChanged(); } void Message::clearTrace() { - annotations[X_QPID_TRACE] = std::string(); - annotationsChanged(); + addAnnotation(X_QPID_TRACE, std::string()); } uint64_t Message::getTimestamp() const { - return encoding ? encoding->getTimestamp() : 0; + return sharedState ? sharedState->getTimestamp() : 0; } uint64_t Message::getTtl() const { uint64_t ttl; - if (encoding->getTtl(ttl) && expiration < FAR_FUTURE) { - sys::AbsTime current( - expiryPolicy ? expiryPolicy->getCurrentTime() : sys::AbsTime::now()); - sys::Duration ttl(current, getExpiration()); - // convert from ns to ms; set to 1 if expired - return (int64_t(ttl) >= 1000000 ? int64_t(ttl)/1000000 : 1); + if (getTtl(ttl, 1)/*set to 1 if expired*/) { + return ttl; } else { return 0; } @@ -142,35 +133,24 @@ uint64_t Message::getTtl() const bool Message::getTtl(uint64_t& ttl) const { - if (encoding->getTtl(ttl) && expiration < FAR_FUTURE) { - sys::Duration remaining(sys::AbsTime::now(), getExpiration()); - // convert from ns to ms; set to 0 if expired - ttl = (int64_t(remaining) >= 1000000 ? int64_t(remaining)/1000000 : 0); - return true; - } else { - return false; - } + return getTtl(ttl, 0); //set to 0 if expired } -void Message::computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& e) +bool Message::getTtl(uint64_t& ttl, uint64_t expiredValue) const { - //TODO: this is still quite 0-10 specific... - uint64_t ttl; - if (getEncoding().getTtl(ttl)) { - if (e) { - // Use higher resolution time for the internal expiry calculation. - // Prevent overflow as a signed int64_t - Duration duration(std::min(ttl * TIME_MSEC, - (uint64_t) std::numeric_limits<int64_t>::max())); - expiration = AbsTime(e->getCurrentTime(), duration); - setExpiryPolicy(e); - } + if (sharedState->getTtl(ttl) && sharedState->getExpiration() < FAR_FUTURE) { + sys::Duration remaining = sharedState->getTimeToExpiration(); + // convert from ns to ms + ttl = (int64_t(remaining) >= 1000000 ? int64_t(remaining)/1000000 : expiredValue); + return true; + } else { + return false; } } void Message::addAnnotation(const std::string& key, const qpid::types::Variant& value) { - annotations[key] = value; + annotations.get()[key] = value; annotationsChanged(); } @@ -178,19 +158,15 @@ void Message::annotationsChanged() { if (persistentContext) { uint64_t id = persistentContext->getPersistenceId(); - persistentContext = persistentContext->merge(annotations); - persistentContext->setIngressCompletion(encoding); + persistentContext = persistentContext->merge(getAnnotations()); + persistentContext->setIngressCompletion(sharedState); persistentContext->setPersistenceId(id); } } -void Message::setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) { - expiryPolicy = e; -} - bool Message::hasExpired() const { - return expiryPolicy && expiryPolicy->hasExpired(*this); + return sharedState->hasExpired(*this); } uint8_t Message::getPriority() const @@ -198,12 +174,12 @@ uint8_t Message::getPriority() const return getEncoding().getPriority(); } -bool Message::getIsManagementMessage() const { return isManagementMessage; } -void Message::setIsManagementMessage(bool b) { isManagementMessage = b; } +bool Message::getIsManagementMessage() const { return sharedState->getIsManagementMessage(); } -const Connection* Message::getPublisher() const { return publisher; } -void Message::setPublisher(const Connection& p) { publisher = &p; } -bool Message::isLocalTo(const OwnershipToken* token) const { return token && publisher && token->isLocal(publisher); } +const Connection* Message::getPublisher() const { return sharedState->getPublisher(); } +bool Message::isLocalTo(const OwnershipToken* token) const { + return token && sharedState->getPublisher() && token->isLocal(sharedState->getPublisher()); +} qpid::framing::SequenceNumber Message::getSequence() const @@ -223,16 +199,20 @@ void Message::setState(MessageState s) { state = s; } +namespace { +const qpid::types::Variant::Map EMPTY_MAP; +} const qpid::types::Variant::Map& Message::getAnnotations() const { - return annotations; + return annotations ? *annotations : EMPTY_MAP; } qpid::types::Variant Message::getAnnotation(const std::string& key) const { - qpid::types::Variant::Map::const_iterator i = annotations.find(key); - if (i != annotations.end()) return i->second; + const qpid::types::Variant::Map& a = getAnnotations(); + qpid::types::Variant::Map::const_iterator i = a.find(key); + if (i != a.end()) return i->second; //FIXME: modify Encoding interface to allow retrieval of //annotations of different types from the message data as received //off the wire @@ -241,30 +221,30 @@ qpid::types::Variant Message::getAnnotation(const std::string& key) const std::string Message::getUserId() const { - return encoding->getUserId(); + return sharedState->getUserId(); } -Message::Encoding& Message::getEncoding() +Message::SharedState& Message::getSharedState() { - return *encoding; + return *sharedState; } const Message::Encoding& Message::getEncoding() const { - return *encoding; + return *sharedState; } Message::operator bool() const { - return !!encoding; + return !!sharedState; } std::string Message::getContent() const { - return encoding->getContent(); + return sharedState->getContent(); } std::string Message::getPropertyAsString(const std::string& key) const { - return encoding->getPropertyAsString(key); + return sharedState->getPropertyAsString(key); } namespace { class PropertyRetriever : public MapHandler @@ -308,7 +288,7 @@ class PropertyRetriever : public MapHandler qpid::types::Variant Message::getProperty(const std::string& key) const { PropertyRetriever r(key); - encoding->processProperties(r); + sharedState->processProperties(r); return r.getResult(); } @@ -319,12 +299,79 @@ boost::intrusive_ptr<PersistableMessage> Message::getPersistentContext() const void Message::processProperties(MapHandler& handler) const { - encoding->processProperties(handler); + sharedState->processProperties(handler); } uint64_t Message::getReplicationId() const { return replicationId; } void Message::setReplicationId(framing::SequenceNumber id) { replicationId = id; } +sys::AbsTime Message::getExpiration() const +{ + return sharedState->getExpiration(); +} + +Message::SharedStateImpl::SharedStateImpl() : publisher(0), expiration(qpid::sys::FAR_FUTURE), isManagementMessage(false) {} + +const Connection* Message::SharedStateImpl::getPublisher() const +{ + return publisher; +} + +void Message::SharedStateImpl::setPublisher(const Connection* p) +{ + publisher = p; +} + +sys::AbsTime Message::SharedStateImpl::getExpiration() const +{ + return expiration; +} + +void Message::SharedStateImpl::setExpiration(sys::AbsTime e) +{ + expiration = e; +} + +sys::Duration Message::SharedStateImpl::getTimeToExpiration() const +{ + sys::AbsTime current(expiryPolicy ? expiryPolicy->getCurrentTime() : sys::AbsTime::now()); + return sys::Duration(current, expiration); +} + +void Message::SharedStateImpl::computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& e) +{ + //TODO: this is still quite 0-10 specific... + uint64_t ttl; + if (getTtl(ttl)) { + if (e) { + // Use higher resolution time for the internal expiry calculation. + // Prevent overflow as a signed int64_t + Duration duration(std::min(ttl * TIME_MSEC, + (uint64_t) std::numeric_limits<int64_t>::max())); + expiration = AbsTime(e->getCurrentTime(), duration); + expiryPolicy = e; + } + } +} + +bool Message::SharedStateImpl::hasExpired(const Message& m) const +{ + return expiryPolicy && expiryPolicy->hasExpired(m); +} + +void Message::SharedStateImpl::setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) +{ + expiryPolicy = e; +} + +bool Message::SharedStateImpl::getIsManagementMessage() const +{ + return isManagementMessage; +} +void Message::SharedStateImpl::setIsManagementMessage(bool b) +{ + isManagementMessage = b; +} }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h index ecd84901f9..f1829a17bc 100644 --- a/cpp/src/qpid/broker/Message.h +++ b/cpp/src/qpid/broker/Message.h @@ -35,6 +35,7 @@ #include <string> #include <vector> #include <boost/intrusive_ptr.hpp> +#include <boost/scoped_ptr.hpp> namespace qpid { namespace amqp { @@ -80,7 +81,46 @@ public: virtual uint64_t getTimestamp() const = 0; }; - QPID_BROKER_EXTERN Message(boost::intrusive_ptr<Encoding>, boost::intrusive_ptr<PersistableMessage>); + class SharedState : public Encoding + { + public: + virtual ~SharedState() {} + virtual const Connection* getPublisher() const = 0; + virtual void setPublisher(const Connection* p) = 0; + + virtual void setExpiration(sys::AbsTime e) = 0; + virtual sys::AbsTime getExpiration() const = 0; + virtual sys::Duration getTimeToExpiration() const = 0; + virtual void computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>&) = 0; + virtual bool hasExpired(const Message& m) const = 0; + virtual void setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) = 0; + + virtual bool getIsManagementMessage() const = 0; + virtual void setIsManagementMessage(bool b) = 0; + }; + + class SharedStateImpl : public SharedState + { + const Connection* publisher; + qpid::sys::AbsTime expiration; + boost::intrusive_ptr<ExpiryPolicy> expiryPolicy; + bool isManagementMessage; + public: + SharedStateImpl(); + virtual ~SharedStateImpl() {} + QPID_BROKER_EXTERN const Connection* getPublisher() const; + QPID_BROKER_EXTERN void setPublisher(const Connection* p); + QPID_BROKER_EXTERN void setExpiration(sys::AbsTime e); + QPID_BROKER_EXTERN sys::AbsTime getExpiration() const; + QPID_BROKER_EXTERN sys::Duration getTimeToExpiration() const; + QPID_BROKER_EXTERN void computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& e); + QPID_BROKER_EXTERN bool hasExpired(const Message& m) const; + QPID_BROKER_EXTERN void setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e); + QPID_BROKER_EXTERN bool getIsManagementMessage() const; + QPID_BROKER_EXTERN void setIsManagementMessage(bool b); + }; + + QPID_BROKER_EXTERN Message(boost::intrusive_ptr<SharedState>, boost::intrusive_ptr<PersistableMessage>); QPID_BROKER_EXTERN Message(); QPID_BROKER_EXTERN ~Message(); @@ -91,20 +131,14 @@ public: int getDeliveryCount() const { return deliveryCount; } void resetDeliveryCount() { deliveryCount = -1; alreadyAcquired = false; } - QPID_BROKER_EXTERN void setPublisher(const Connection& p); const Connection* getPublisher() const; bool isLocalTo(const OwnershipToken*) const; QPID_BROKER_EXTERN std::string getRoutingKey() const; QPID_BROKER_EXTERN bool isPersistent() const; - /** determine msg expiration time using the TTL value if present */ - QPID_BROKER_EXTERN void computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& e); - void setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e); - bool hasExpired() const; - sys::AbsTime getExpiration() const { return expiration; } - void setExpiration(sys::AbsTime exp) { expiration = exp; } + QPID_BROKER_EXTERN sys::AbsTime getExpiration() const; uint64_t getTtl() const; QPID_BROKER_EXTERN bool getTtl(uint64_t&) const; @@ -121,12 +155,11 @@ public: QPID_BROKER_EXTERN uint64_t getMessageSize() const; - QPID_BROKER_EXTERN Encoding& getEncoding(); QPID_BROKER_EXTERN const Encoding& getEncoding() const; QPID_BROKER_EXTERN operator bool() const; + QPID_BROKER_EXTERN SharedState& getSharedState(); bool getIsManagementMessage() const; - void setIsManagementMessage(bool b); QPID_BROKER_EXTERN qpid::framing::SequenceNumber getSequence() const; QPID_BROKER_EXTERN void setSequence(const qpid::framing::SequenceNumber&); @@ -146,21 +179,51 @@ public: QPID_BROKER_EXTERN void setReplicationId(framing::SequenceNumber id); private: - boost::intrusive_ptr<Encoding> encoding; + /** + * Template for optional members that are only constructed when + * if/when needed, to conserve memory. (Boost::optional doesn't + * help here). + */ + template <typename T> class Optional + { + boost::scoped_ptr<T> value; + public: + Optional() : value(0) {} + Optional(const Optional<T>& o) : value(o.value ? new T(*o.value) : 0) {} + T& get() + { + if (!value) value.reset(new T); + return *value; + } + const T& operator*() const + { + return *value; + } + Optional<T>& operator=(const Optional<T>& o) + { + if (o.value) { + if (!value) value.reset(new T(*o.value)); + } + return *this; + } + operator bool() const + { + return value; + } + }; + + + boost::intrusive_ptr<SharedState> sharedState; boost::intrusive_ptr<PersistableMessage> persistentContext; int deliveryCount; bool alreadyAcquired; - const Connection* publisher; - qpid::sys::AbsTime expiration; - boost::intrusive_ptr<ExpiryPolicy> expiryPolicy; - uint64_t timestamp; - qpid::types::Variant::Map annotations; - bool isManagementMessage; + Optional<qpid::types::Variant::Map> annotations; MessageState state; qpid::framing::SequenceNumber sequence; framing::SequenceNumber replicationId; void annotationsChanged(); + bool getTtl(uint64_t&, uint64_t expiredValue) const; }; }} diff --git a/cpp/src/qpid/broker/PagedQueue.cpp b/cpp/src/qpid/broker/PagedQueue.cpp index afb330489b..a63f6d2485 100644 --- a/cpp/src/qpid/broker/PagedQueue.cpp +++ b/cpp/src/qpid/broker/PagedQueue.cpp @@ -78,8 +78,8 @@ size_t decode(ProtocolRegistry& protocols, Message& msg, const char* data, size_ msg.getPersistentContext()->setPersistenceId(persistenceId); if (t) { sys::AbsTime expiration(EPOCH, t); - msg.setExpiryPolicy(expiryPolicy); - msg.setExpiration(expiration); + msg.getSharedState().setExpiryPolicy(expiryPolicy); + msg.getSharedState().setExpiration(expiration); } return encoded + metadata.getPosition(); } diff --git a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp index 5a11db81bb..846dc9eb2a 100644 --- a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp +++ b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp @@ -185,7 +185,7 @@ void RecoverableMessageImpl::setRedelivered() void RecoverableMessageImpl::computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& ep) { - msg.computeExpiration(ep); + msg.getSharedState().computeExpiration(ep); } Message RecoverableMessageImpl::getMessage() diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index 82fa3a8f19..4aba596b62 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -482,8 +482,6 @@ TxBuffer* SemanticState::getTxBuffer() } void SemanticState::route(Message& msg, Deliverable& strategy) { - msg.computeExpiration(getSession().getBroker().getExpiryPolicy()); - std::string exchangeName = qpid::broker::amqp_0_10::MessageTransfer::get(msg).getExchangeName(); if (!cacheExchange || cacheExchange->getName() != exchangeName || cacheExchange->isDestroyed()) cacheExchange = session.getBroker().getExchanges().get(exchangeName); diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index 0c343e5d90..2b849e945f 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -219,7 +219,8 @@ void SessionState::handleContent(AMQFrame& frame) DeliverableMessage deliverable(Message(msg, msg), semanticState.getTxBuffer()); if (broker.isTimestamping()) msg->setTimestamp(); - deliverable.getMessage().setPublisher(getConnection()); + msg->setPublisher(&(getConnection())); + msg->computeExpiration(getBroker().getExpiryPolicy()); IncompleteIngressMsgXfer xfer(this, msg); diff --git a/cpp/src/qpid/broker/amqp/Incoming.cpp b/cpp/src/qpid/broker/amqp/Incoming.cpp index a1556f5249..889f410443 100644 --- a/cpp/src/qpid/broker/amqp/Incoming.cpp +++ b/cpp/src/qpid/broker/amqp/Incoming.cpp @@ -134,11 +134,11 @@ void DecodingIncoming::readable(pn_delivery_t* delivery) received->scan(); pn_link_advance(link); + received->setPublisher(&session->getParent()); + received->computeExpiration(expiryPolicy); qpid::broker::Message message(received, received); - message.setPublisher(session->getParent()); userid.verify(message.getUserId()); - message.computeExpiration(expiryPolicy); handle(message); --window; received->begin(); diff --git a/cpp/src/qpid/broker/amqp/Message.h b/cpp/src/qpid/broker/amqp/Message.h index 161b1dc6f1..025e07c792 100644 --- a/cpp/src/qpid/broker/amqp/Message.h +++ b/cpp/src/qpid/broker/amqp/Message.h @@ -37,7 +37,7 @@ namespace amqp { /** * Represents an AMQP 1.0 format message */ -class Message : public qpid::broker::Message::Encoding, private qpid::amqp::MessageReader, public qpid::broker::PersistableMessage +class Message : public qpid::broker::Message::SharedStateImpl, private qpid::amqp::MessageReader, public qpid::broker::PersistableMessage { public: //Encoding interface: diff --git a/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h b/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h index b55f8577a4..513bbe1bfb 100644 --- a/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h +++ b/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h @@ -35,7 +35,7 @@ namespace amqp_0_10 { /** * */ -class MessageTransfer : public qpid::broker::Message::Encoding, public qpid::broker::PersistableMessage +class MessageTransfer : public qpid::broker::Message::SharedStateImpl, public qpid::broker::PersistableMessage { public: QPID_BROKER_EXTERN MessageTransfer(); @@ -116,7 +116,7 @@ class MessageTransfer : public qpid::broker::Message::Encoding, public qpid::bro static bool isImmediateDeliveryRequired(const qpid::broker::Message& message); static MessageTransfer& get(qpid::broker::Message& message) { - return *dynamic_cast<MessageTransfer*>(&message.getEncoding()); + return *dynamic_cast<MessageTransfer*>(&message.getSharedState()); } static const MessageTransfer& get(const qpid::broker::Message& message) { return *dynamic_cast<const MessageTransfer*>(&message.getEncoding()); diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp index d4a07f407c..772fe3c64e 100644 --- a/cpp/src/qpid/management/ManagementAgent.cpp +++ b/cpp/src/qpid/management/ManagementAgent.cpp @@ -565,8 +565,8 @@ void ManagementAgent::sendBuffer(Buffer& buf, dp->setRoutingKey(routingKey); transfer->getFrames().append(content); + transfer->setIsManagementMessage(true); Message msg(transfer, transfer); - msg.setIsManagementMessage(true); sendQueue->push(make_pair(exchange, msg)); buf.reset(); } @@ -632,9 +632,9 @@ void ManagementAgent::sendBuffer(const string& data, } transfer->getFrames().append(content); transfer->computeRequiredCredit(); + transfer->setIsManagementMessage(true); + transfer->computeExpiration(broker->getExpiryPolicy()); Message msg(transfer, transfer); - msg.setIsManagementMessage(true); - msg.computeExpiration(broker->getExpiryPolicy()); sendQueue->push(make_pair(exchange, msg)); } diff --git a/cpp/src/tests/MessageUtils.h b/cpp/src/tests/MessageUtils.h index a6a5aa65fd..2c205da20f 100644 --- a/cpp/src/tests/MessageUtils.h +++ b/cpp/src/tests/MessageUtils.h @@ -107,6 +107,7 @@ struct MessageUtils AMQFrame data((AMQContentBody(content))); msg->getFrames().append(data); } + if (ttl) msg->computeExpiration(new broker::ExpiryPolicy); return Message(msg, msg); } }; diff --git a/cpp/src/tests/QueueTest.cpp b/cpp/src/tests/QueueTest.cpp index c44483f8dd..99c6880b26 100644 --- a/cpp/src/tests/QueueTest.cpp +++ b/cpp/src/tests/QueueTest.cpp @@ -188,7 +188,6 @@ void addMessagesToQueue(uint count, Queue& queue, uint oddTtl = 200, uint evenTt { for (uint i = 0; i < count; i++) { Message m = MessageUtils::createMessage("exchange", "key", i % 2 ? oddTtl : evenTtl); - m.computeExpiration(new broker::ExpiryPolicy); queue.deliver(m); } } |