diff options
author | Pavel Moravec <pmoravec@apache.org> | 2014-05-28 07:16:57 +0000 |
---|---|---|
committer | Pavel Moravec <pmoravec@apache.org> | 2014-05-28 07:16:57 +0000 |
commit | 6a7554a2ceb02bcc8f3f81f40d2421672aab60b9 (patch) | |
tree | 2826acca3cfcdd6da65f9afae74d23db8a3877c8 | |
parent | 14a61e05779a40ae102e9ba1783d69bef968e527 (diff) | |
download | qpid-python-6a7554a2ceb02bcc8f3f81f40d2421672aab60b9.tar.gz |
QPID-5748: [C++ broker] Make Queue::purgeExpired more efficient; remove ExpiryPolicy
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1597931 13f79535-47bb-0310-9956-ffa450edef68
22 files changed, 39 insertions, 163 deletions
diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt index b363421da7..4dd73013fb 100644 --- a/cpp/src/CMakeLists.txt +++ b/cpp/src/CMakeLists.txt @@ -1134,7 +1134,6 @@ set (qpidbroker_SOURCES qpid/broker/Broker.cpp qpid/broker/Credit.cpp qpid/broker/Exchange.cpp - qpid/broker/ExpiryPolicy.cpp qpid/broker/Fairshare.cpp qpid/broker/MessageDeque.cpp qpid/broker/MessageMap.cpp diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index da45d4bef9..c3a45992c7 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -34,7 +34,6 @@ #include "qpid/broker/SecureConnectionFactory.h" #include "qpid/broker/TopicExchange.h" #include "qpid/broker/Link.h" -#include "qpid/broker/ExpiryPolicy.h" #include "qpid/broker/PersistableObject.h" #include "qpid/broker/QueueFlowLimit.h" #include "qpid/broker/QueueSettings.h" @@ -236,7 +235,6 @@ Broker::Broker(const Broker::Options& conf) : *this), queueCleaner(queues, poller, timer.get()), recoveryInProgress(false), - expiryPolicy(new ExpiryPolicy), getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this)) { if (!dataDir.isEnabled()) { diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index 5d1e241be9..b70adb1503 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -63,7 +63,6 @@ struct Url; namespace broker { class AclModule; -class ExpiryPolicy; class Message; struct QueueSettings; @@ -201,7 +200,6 @@ class Broker : public sys::Runnable, public Plugin::Target, const Message& msg); std::string federationTag; bool recoveryInProgress; - boost::intrusive_ptr<ExpiryPolicy> expiryPolicy; ConsumerFactories consumerFactories; ProtocolRegistry protocolRegistry; ObjectFactoryRegistry objectFactory; @@ -248,9 +246,6 @@ class Broker : public sys::Runnable, public Plugin::Target, ProtocolRegistry& getProtocolRegistry() { return protocolRegistry; } ObjectFactoryRegistry& getObjectFactoryRegistry() { return objectFactory; } - void setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) { expiryPolicy = e; } - boost::intrusive_ptr<ExpiryPolicy> getExpiryPolicy() { return expiryPolicy; } - SessionManager& getSessionManager() { return sessionManager; } const std::string& getFederationTag() const { return federationTag; } diff --git a/cpp/src/qpid/broker/ExpiryPolicy.cpp b/cpp/src/qpid/broker/ExpiryPolicy.cpp deleted file mode 100644 index 687eac7817..0000000000 --- a/cpp/src/qpid/broker/ExpiryPolicy.cpp +++ /dev/null @@ -1,38 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "qpid/broker/ExpiryPolicy.h" -#include "qpid/broker/Message.h" -#include "qpid/sys/Time.h" - -namespace qpid { -namespace broker { - -ExpiryPolicy::~ExpiryPolicy() {} - -bool ExpiryPolicy::hasExpired(const Message& m) { - return m.getExpiration() < sys::AbsTime::now(); -} - -sys::AbsTime ExpiryPolicy::getCurrentTime() { - return sys::AbsTime::now(); -} - -}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/ExpiryPolicy.h b/cpp/src/qpid/broker/ExpiryPolicy.h deleted file mode 100644 index 1fb41ccd29..0000000000 --- a/cpp/src/qpid/broker/ExpiryPolicy.h +++ /dev/null @@ -1,50 +0,0 @@ -#ifndef QPID_BROKER_EXPIRYPOLICY_H -#define QPID_BROKER_EXPIRYPOLICY_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/RefCounted.h" -#include "qpid/broker/BrokerImportExport.h" - -namespace qpid { - -namespace sys { -class AbsTime; -} - -namespace broker { - -class Message; - -/** - * Default expiry policy. - */ -class QPID_BROKER_CLASS_EXTERN ExpiryPolicy : public RefCounted -{ - public: - QPID_BROKER_EXTERN virtual ~ExpiryPolicy(); - QPID_BROKER_EXTERN virtual bool hasExpired(const Message&); - QPID_BROKER_EXTERN virtual qpid::sys::AbsTime getCurrentTime(); -}; -}} // namespace qpid::broker - -#endif /*!QPID_BROKER_EXPIRYPOLICY_H*/ diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp index 11c8c13e12..7e15ac1ad2 100644 --- a/cpp/src/qpid/broker/Message.cpp +++ b/cpp/src/qpid/broker/Message.cpp @@ -164,11 +164,6 @@ void Message::annotationsChanged() } } -bool Message::hasExpired() const -{ - return sharedState->hasExpired(*this); -} - uint8_t Message::getPriority() const { return getEncoding().getPriority(); @@ -335,36 +330,22 @@ void Message::SharedStateImpl::setExpiration(sys::AbsTime e) sys::Duration Message::SharedStateImpl::getTimeToExpiration() const { - sys::AbsTime current(expiryPolicy ? expiryPolicy->getCurrentTime() : sys::AbsTime::now()); - return sys::Duration(current, expiration); + return sys::Duration(sys::AbsTime::now(), expiration); } -void Message::SharedStateImpl::computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& e) +void Message::SharedStateImpl::computeExpiration() { //TODO: this is still quite 0-10 specific... uint64_t ttl; if (getTtl(ttl)) { - if (e) { - // Use higher resolution time for the internal expiry calculation. - // Prevent overflow as a signed int64_t - Duration duration(std::min(ttl * TIME_MSEC, - (uint64_t) std::numeric_limits<int64_t>::max())); - expiration = AbsTime(e->getCurrentTime(), duration); - expiryPolicy = e; - } + // Use higher resolution time for the internal expiry calculation. + // Prevent overflow as a signed int64_t + Duration duration(std::min(ttl * TIME_MSEC, + (uint64_t) std::numeric_limits<int64_t>::max())); + expiration = AbsTime(sys::AbsTime::now(), duration); } } -bool Message::SharedStateImpl::hasExpired(const Message& m) const -{ - return expiryPolicy && expiryPolicy->hasExpired(m); -} - -void Message::SharedStateImpl::setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) -{ - expiryPolicy = e; -} - bool Message::SharedStateImpl::getIsManagementMessage() const { return isManagementMessage; diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h index f1829a17bc..fbe341f113 100644 --- a/cpp/src/qpid/broker/Message.h +++ b/cpp/src/qpid/broker/Message.h @@ -23,7 +23,6 @@ */ #include "qpid/RefCounted.h" -#include "qpid/broker/ExpiryPolicy.h" #include "qpid/broker/PersistableMessage.h" //TODO: move the following out of framing or replace it #include "qpid/framing/SequenceNumber.h" @@ -91,9 +90,7 @@ public: virtual void setExpiration(sys::AbsTime e) = 0; virtual sys::AbsTime getExpiration() const = 0; virtual sys::Duration getTimeToExpiration() const = 0; - virtual void computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>&) = 0; - virtual bool hasExpired(const Message& m) const = 0; - virtual void setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) = 0; + virtual void computeExpiration() = 0; virtual bool getIsManagementMessage() const = 0; virtual void setIsManagementMessage(bool b) = 0; @@ -103,7 +100,6 @@ public: { const Connection* publisher; qpid::sys::AbsTime expiration; - boost::intrusive_ptr<ExpiryPolicy> expiryPolicy; bool isManagementMessage; public: SharedStateImpl(); @@ -113,9 +109,7 @@ public: QPID_BROKER_EXTERN void setExpiration(sys::AbsTime e); QPID_BROKER_EXTERN sys::AbsTime getExpiration() const; QPID_BROKER_EXTERN sys::Duration getTimeToExpiration() const; - QPID_BROKER_EXTERN void computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& e); - QPID_BROKER_EXTERN bool hasExpired(const Message& m) const; - QPID_BROKER_EXTERN void setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e); + QPID_BROKER_EXTERN void computeExpiration(); QPID_BROKER_EXTERN bool getIsManagementMessage() const; QPID_BROKER_EXTERN void setIsManagementMessage(bool b); }; @@ -137,7 +131,6 @@ public: QPID_BROKER_EXTERN std::string getRoutingKey() const; QPID_BROKER_EXTERN bool isPersistent() const; - bool hasExpired() const; QPID_BROKER_EXTERN sys::AbsTime getExpiration() const; uint64_t getTtl() const; QPID_BROKER_EXTERN bool getTtl(uint64_t&) const; diff --git a/cpp/src/qpid/broker/PagedQueue.cpp b/cpp/src/qpid/broker/PagedQueue.cpp index a63f6d2485..521bd542f7 100644 --- a/cpp/src/qpid/broker/PagedQueue.cpp +++ b/cpp/src/qpid/broker/PagedQueue.cpp @@ -62,8 +62,7 @@ size_t encode(const Message& msg, char* data, size_t size) return required; } -size_t decode(ProtocolRegistry& protocols, Message& msg, const char* data, size_t size, - boost::intrusive_ptr<ExpiryPolicy> expiryPolicy) +size_t decode(ProtocolRegistry& protocols, Message& msg, const char* data, size_t size) { qpid::framing::Buffer metadata(const_cast<char*>(data), size); uint32_t encoded = metadata.getLong(); @@ -78,7 +77,6 @@ size_t decode(ProtocolRegistry& protocols, Message& msg, const char* data, size_ msg.getPersistentContext()->setPersistenceId(persistenceId); if (t) { sys::AbsTime expiration(EPOCH, t); - msg.getSharedState().setExpiryPolicy(expiryPolicy); msg.getSharedState().setExpiration(expiration); } return encoded + metadata.getPosition(); @@ -86,10 +84,8 @@ size_t decode(ProtocolRegistry& protocols, Message& msg, const char* data, size_ } -PagedQueue::PagedQueue(const std::string& name_, const std::string& directory, uint m, uint factor, ProtocolRegistry& p, - boost::intrusive_ptr<ExpiryPolicy> e) - : name(name_), pageSize(file.getPageSize()*factor), maxLoaded(m), protocols(p), offset(0), loaded(0), version(0), - expiryPolicy(e) +PagedQueue::PagedQueue(const std::string& name_, const std::string& directory, uint m, uint factor, ProtocolRegistry& p) + : name(name_), pageSize(file.getPageSize()*factor), maxLoaded(m), protocols(p), offset(0), loaded(0), version(0) { if (directory.empty()) { throw qpid::Exception(QPID_MSG("Cannot create paged queue: No paged queue directory specified")); @@ -322,7 +318,7 @@ Message* PagedQueue::Page::find(qpid::framing::SequenceNumber position) //if it is the last in the page, decrement the hint count of the page } -void PagedQueue::Page::load(MemoryMappedFile& file, ProtocolRegistry& protocols, boost::intrusive_ptr<ExpiryPolicy> expiryPolicy) +void PagedQueue::Page::load(MemoryMappedFile& file, ProtocolRegistry& protocols) { QPID_LOG(debug, "Page[" << offset << "]::load" << " used=" << used << ", size=" << size); assert(region == 0); @@ -336,7 +332,7 @@ void PagedQueue::Page::load(MemoryMappedFile& file, ProtocolRegistry& protocols, //decode messages into Page::messages for (size_t i = 0; i < count; ++i) { Message message; - used += decode(protocols, message, region + used, size - used, expiryPolicy); + used += decode(protocols, message, region + used, size - used); if (!contents.contains(message.getSequence())) { message.setState(DELETED); QPID_LOG(debug, "Setting state to deleted for message loaded at " << message.getSequence()); @@ -389,7 +385,7 @@ void PagedQueue::load(Page& page) assert(i != used.rend()); unload(i->second); } - page.load(file, protocols, expiryPolicy); + page.load(file, protocols); ++loaded; QPID_LOG(debug, "PagedQueue[" << name << "] loaded page, " << loaded << " pages now loaded"); } diff --git a/cpp/src/qpid/broker/PagedQueue.h b/cpp/src/qpid/broker/PagedQueue.h index e4c98f4119..c8a9f13fc7 100644 --- a/cpp/src/qpid/broker/PagedQueue.h +++ b/cpp/src/qpid/broker/PagedQueue.h @@ -31,15 +31,13 @@ namespace qpid { namespace broker { -class ExpiryPolicy; class ProtocolRegistry; /** * */ class PagedQueue : public Messages { public: - PagedQueue(const std::string& name, const std::string& directory, uint maxLoaded, uint pageFactor, ProtocolRegistry& protocols, - boost::intrusive_ptr<ExpiryPolicy>); + PagedQueue(const std::string& name, const std::string& directory, uint maxLoaded, uint pageFactor, ProtocolRegistry& protocols); ~PagedQueue(); size_t size(); bool deleted(const QueueCursor&); @@ -62,7 +60,7 @@ class PagedQueue : public Messages { bool add(const Message&); Message* next(uint32_t version, QueueCursor&); Message* find(qpid::framing::SequenceNumber); - void load(qpid::sys::MemoryMappedFile&,ProtocolRegistry&, boost::intrusive_ptr<ExpiryPolicy>); + void load(qpid::sys::MemoryMappedFile&,ProtocolRegistry&); void unload(qpid::sys::MemoryMappedFile&); void clear(qpid::sys::MemoryMappedFile&); size_t available() const; @@ -88,7 +86,6 @@ class PagedQueue : public Messages { std::list<Page> free; uint loaded; uint32_t version; - boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;//needed on reload void addPages(size_t count); Page& newPage(qpid::framing::SequenceNumber); diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index fd99629492..78ee37607e 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -413,7 +413,7 @@ bool Queue::getNextMessage(Message& m, Consumer::shared_ptr& c) QueueCursor cursor = c->getCursor(); // Save current position. Message* msg = messages->next(*c); // Advances c. if (msg) { - if (msg->hasExpired()) { + if (msg->getExpiration() < sys::AbsTime::now()) { QPID_LOG(debug, "Message expired from queue '" << name << "'"); observeDequeue(*msg, locker, settings.autodelete ? &autodelete : 0); //ERROR: don't hold lock across call to store!! @@ -616,6 +616,13 @@ void Queue::cancel(Consumer::shared_ptr c, const std::string& connectionId, cons } } +namespace{ +bool hasExpired(const Message& m, AbsTime now) +{ + return m.getExpiration() < now; +} +} + /** *@param lapse: time since the last purgeExpired */ @@ -627,7 +634,8 @@ void Queue::purgeExpired(sys::Duration lapse) { dequeueSincePurge -= count; int seconds = int64_t(lapse)/qpid::sys::TIME_SEC; if (seconds == 0 || count / seconds < 1) { - uint32_t count = remove(0, boost::bind(&Message::hasExpired, _1), 0, CONSUMER, settings.autodelete); + sys::AbsTime time = sys::AbsTime::now(); + uint32_t count = remove(0, boost::bind(&hasExpired, _1, time), 0, CONSUMER, settings.autodelete); QPID_LOG(debug, "Purged " << count << " expired messages from " << getName()); // // Report the count of discarded-by-ttl messages diff --git a/cpp/src/qpid/broker/QueueFactory.cpp b/cpp/src/qpid/broker/QueueFactory.cpp index b4eccd646d..4ce1b796f7 100644 --- a/cpp/src/qpid/broker/QueueFactory.cpp +++ b/cpp/src/qpid/broker/QueueFactory.cpp @@ -82,7 +82,7 @@ boost::shared_ptr<Queue> QueueFactory::create(const std::string& name, const Que queue->messages = std::auto_ptr<Messages>(new PagedQueue(name, broker->getPagingDir().getPath(), settings.maxPages ? settings.maxPages : DEFAULT_MAX_PAGES, settings.pageFactor ? settings.pageFactor : DEFAULT_PAGE_FACTOR, - broker->getProtocolRegistry(), broker->getExpiryPolicy())); + broker->getProtocolRegistry())); } } else if (settings.lvqKey.empty()) {//LVQ already handled above queue->messages = std::auto_ptr<Messages>(new MessageDeque()); diff --git a/cpp/src/qpid/broker/RecoverableMessage.h b/cpp/src/qpid/broker/RecoverableMessage.h index 888dc364e9..3c82a69883 100644 --- a/cpp/src/qpid/broker/RecoverableMessage.h +++ b/cpp/src/qpid/broker/RecoverableMessage.h @@ -29,7 +29,6 @@ namespace qpid { namespace broker { -class ExpiryPolicy; class Message; /** * The interface through which messages are reloaded on recovery. @@ -40,7 +39,7 @@ public: typedef boost::shared_ptr<RecoverableMessage> shared_ptr; virtual void setPersistenceId(uint64_t id) = 0; virtual void setRedelivered() = 0; - virtual void computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& e) = 0; + virtual void computeExpiration() = 0; /** * Used by store to determine whether to load content on recovery * or let message load its own content as and when it requires it. diff --git a/cpp/src/qpid/broker/RecoverableMessageImpl.h b/cpp/src/qpid/broker/RecoverableMessageImpl.h index 6f0bd8e4a9..c257c2057b 100644 --- a/cpp/src/qpid/broker/RecoverableMessageImpl.h +++ b/cpp/src/qpid/broker/RecoverableMessageImpl.h @@ -37,7 +37,7 @@ public: ~RecoverableMessageImpl() {}; void setPersistenceId(uint64_t id); void setRedelivered(); - void computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& ep); + void computeExpiration(); bool loadContent(uint64_t available); void decodeContent(framing::Buffer& buffer); void recover(boost::shared_ptr<Queue> queue); diff --git a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp index 846dc9eb2a..dd9bfc57f6 100644 --- a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp +++ b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp @@ -183,9 +183,9 @@ void RecoverableMessageImpl::setRedelivered() msg.deliver();//increment delivery count (but at present that isn't recorded durably) } -void RecoverableMessageImpl::computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& ep) +void RecoverableMessageImpl::computeExpiration() { - msg.getSharedState().computeExpiration(ep); + msg.getSharedState().computeExpiration(); } Message RecoverableMessageImpl::getMessage() diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index 2b849e945f..8b57bd9e71 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -220,7 +220,7 @@ void SessionState::handleContent(AMQFrame& frame) if (broker.isTimestamping()) msg->setTimestamp(); msg->setPublisher(&(getConnection())); - msg->computeExpiration(getBroker().getExpiryPolicy()); + msg->computeExpiration(); IncompleteIngressMsgXfer xfer(this, msg); diff --git a/cpp/src/qpid/broker/amqp/Incoming.cpp b/cpp/src/qpid/broker/amqp/Incoming.cpp index 889f410443..96a20b5df6 100644 --- a/cpp/src/qpid/broker/amqp/Incoming.cpp +++ b/cpp/src/qpid/broker/amqp/Incoming.cpp @@ -107,7 +107,7 @@ namespace { } DecodingIncoming::DecodingIncoming(pn_link_t* link, Broker& broker, Session& parent, const std::string& source, const std::string& target, const std::string& name) - : Incoming(link, broker, parent, source, target, name), session(parent.shared_from_this()), expiryPolicy(broker.getExpiryPolicy()) {} + : Incoming(link, broker, parent, source, target, name), session(parent.shared_from_this()) {} DecodingIncoming::~DecodingIncoming() {} void DecodingIncoming::readable(pn_delivery_t* delivery) @@ -135,7 +135,7 @@ void DecodingIncoming::readable(pn_delivery_t* delivery) received->scan(); pn_link_advance(link); received->setPublisher(&session->getParent()); - received->computeExpiration(expiryPolicy); + received->computeExpiration(); qpid::broker::Message message(received, received); userid.verify(message.getUserId()); diff --git a/cpp/src/qpid/broker/amqp/Incoming.h b/cpp/src/qpid/broker/amqp/Incoming.h index 807b918a17..38b9b3a919 100644 --- a/cpp/src/qpid/broker/amqp/Incoming.h +++ b/cpp/src/qpid/broker/amqp/Incoming.h @@ -77,7 +77,6 @@ class DecodingIncoming : public Incoming virtual void handle(qpid::broker::Message&) = 0; private: boost::shared_ptr<Session> session; - boost::intrusive_ptr<ExpiryPolicy> expiryPolicy; boost::intrusive_ptr<Message> partial; }; diff --git a/cpp/src/qpid/legacystore/MessageStoreImpl.cpp b/cpp/src/qpid/legacystore/MessageStoreImpl.cpp index 7863940534..7a1fea95d5 100644 --- a/cpp/src/qpid/legacystore/MessageStoreImpl.cpp +++ b/cpp/src/qpid/legacystore/MessageStoreImpl.cpp @@ -980,7 +980,7 @@ void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/, // become optional depending on that information. msg->setRedelivered(); // Reset the TTL for the recovered message - msg->computeExpiration(broker->getExpiryPolicy()); + msg->computeExpiration(); u_int32_t contentOffset = headerSize + preambleLength; u_int64_t contentSize = readSize - contentOffset; diff --git a/cpp/src/qpid/linearstore/MessageStoreImpl.cpp b/cpp/src/qpid/linearstore/MessageStoreImpl.cpp index ff5b41b962..b7c6672c61 100644 --- a/cpp/src/qpid/linearstore/MessageStoreImpl.cpp +++ b/cpp/src/qpid/linearstore/MessageStoreImpl.cpp @@ -921,7 +921,7 @@ void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/, // become optional depending on that information. msg->setRedelivered(); // Reset the TTL for the recovered message - msg->computeExpiration(broker->getExpiryPolicy()); + msg->computeExpiration(); uint32_t contentOffset = headerSize + preambleLength; uint64_t contentSize = dbuffSize - contentOffset; diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp index 772fe3c64e..968caba760 100644 --- a/cpp/src/qpid/management/ManagementAgent.cpp +++ b/cpp/src/qpid/management/ManagementAgent.cpp @@ -633,7 +633,7 @@ void ManagementAgent::sendBuffer(const string& data, transfer->getFrames().append(content); transfer->computeRequiredCredit(); transfer->setIsManagementMessage(true); - transfer->computeExpiration(broker->getExpiryPolicy()); + transfer->computeExpiration(); Message msg(transfer, transfer); sendQueue->push(make_pair(exchange, msg)); diff --git a/cpp/src/tests/MessageUtils.h b/cpp/src/tests/MessageUtils.h index 2c205da20f..f05b0d8b20 100644 --- a/cpp/src/tests/MessageUtils.h +++ b/cpp/src/tests/MessageUtils.h @@ -107,7 +107,7 @@ struct MessageUtils AMQFrame data((AMQContentBody(content))); msg->getFrames().append(data); } - if (ttl) msg->computeExpiration(new broker::ExpiryPolicy); + if (ttl) msg->computeExpiration(); return Message(msg, msg); } }; diff --git a/cpp/src/tests/QueueTest.cpp b/cpp/src/tests/QueueTest.cpp index 99c6880b26..ee9d37e76d 100644 --- a/cpp/src/tests/QueueTest.cpp +++ b/cpp/src/tests/QueueTest.cpp @@ -30,7 +30,6 @@ #include "qpid/broker/ExchangeRegistry.h" #include "qpid/broker/QueueRegistry.h" #include "qpid/broker/NullMessageStore.h" -#include "qpid/broker/ExpiryPolicy.h" #include "qpid/framing/DeliveryProperties.h" #include "qpid/framing/FieldTable.h" #include "qpid/framing/MessageTransferBody.h" |