summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/CMakeLists.txt1
-rw-r--r--cpp/src/qpid/broker/Broker.cpp2
-rw-r--r--cpp/src/qpid/broker/Broker.h5
-rw-r--r--cpp/src/qpid/broker/ExpiryPolicy.cpp38
-rw-r--r--cpp/src/qpid/broker/ExpiryPolicy.h50
-rw-r--r--cpp/src/qpid/broker/Message.cpp33
-rw-r--r--cpp/src/qpid/broker/Message.h11
-rw-r--r--cpp/src/qpid/broker/PagedQueue.cpp16
-rw-r--r--cpp/src/qpid/broker/PagedQueue.h7
-rw-r--r--cpp/src/qpid/broker/Queue.cpp12
-rw-r--r--cpp/src/qpid/broker/QueueFactory.cpp2
-rw-r--r--cpp/src/qpid/broker/RecoverableMessage.h3
-rw-r--r--cpp/src/qpid/broker/RecoverableMessageImpl.h2
-rw-r--r--cpp/src/qpid/broker/RecoveryManagerImpl.cpp4
-rw-r--r--cpp/src/qpid/broker/SessionState.cpp2
-rw-r--r--cpp/src/qpid/broker/amqp/Incoming.cpp4
-rw-r--r--cpp/src/qpid/broker/amqp/Incoming.h1
-rw-r--r--cpp/src/qpid/legacystore/MessageStoreImpl.cpp2
-rw-r--r--cpp/src/qpid/linearstore/MessageStoreImpl.cpp2
-rw-r--r--cpp/src/qpid/management/ManagementAgent.cpp2
-rw-r--r--cpp/src/tests/MessageUtils.h2
-rw-r--r--cpp/src/tests/QueueTest.cpp1
22 files changed, 39 insertions, 163 deletions
diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt
index b363421da7..4dd73013fb 100644
--- a/cpp/src/CMakeLists.txt
+++ b/cpp/src/CMakeLists.txt
@@ -1134,7 +1134,6 @@ set (qpidbroker_SOURCES
qpid/broker/Broker.cpp
qpid/broker/Credit.cpp
qpid/broker/Exchange.cpp
- qpid/broker/ExpiryPolicy.cpp
qpid/broker/Fairshare.cpp
qpid/broker/MessageDeque.cpp
qpid/broker/MessageMap.cpp
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index da45d4bef9..c3a45992c7 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -34,7 +34,6 @@
#include "qpid/broker/SecureConnectionFactory.h"
#include "qpid/broker/TopicExchange.h"
#include "qpid/broker/Link.h"
-#include "qpid/broker/ExpiryPolicy.h"
#include "qpid/broker/PersistableObject.h"
#include "qpid/broker/QueueFlowLimit.h"
#include "qpid/broker/QueueSettings.h"
@@ -236,7 +235,6 @@ Broker::Broker(const Broker::Options& conf) :
*this),
queueCleaner(queues, poller, timer.get()),
recoveryInProgress(false),
- expiryPolicy(new ExpiryPolicy),
getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this))
{
if (!dataDir.isEnabled()) {
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h
index 5d1e241be9..b70adb1503 100644
--- a/cpp/src/qpid/broker/Broker.h
+++ b/cpp/src/qpid/broker/Broker.h
@@ -63,7 +63,6 @@ struct Url;
namespace broker {
class AclModule;
-class ExpiryPolicy;
class Message;
struct QueueSettings;
@@ -201,7 +200,6 @@ class Broker : public sys::Runnable, public Plugin::Target,
const Message& msg);
std::string federationTag;
bool recoveryInProgress;
- boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
ConsumerFactories consumerFactories;
ProtocolRegistry protocolRegistry;
ObjectFactoryRegistry objectFactory;
@@ -248,9 +246,6 @@ class Broker : public sys::Runnable, public Plugin::Target,
ProtocolRegistry& getProtocolRegistry() { return protocolRegistry; }
ObjectFactoryRegistry& getObjectFactoryRegistry() { return objectFactory; }
- void setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) { expiryPolicy = e; }
- boost::intrusive_ptr<ExpiryPolicy> getExpiryPolicy() { return expiryPolicy; }
-
SessionManager& getSessionManager() { return sessionManager; }
const std::string& getFederationTag() const { return federationTag; }
diff --git a/cpp/src/qpid/broker/ExpiryPolicy.cpp b/cpp/src/qpid/broker/ExpiryPolicy.cpp
deleted file mode 100644
index 687eac7817..0000000000
--- a/cpp/src/qpid/broker/ExpiryPolicy.cpp
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "qpid/broker/ExpiryPolicy.h"
-#include "qpid/broker/Message.h"
-#include "qpid/sys/Time.h"
-
-namespace qpid {
-namespace broker {
-
-ExpiryPolicy::~ExpiryPolicy() {}
-
-bool ExpiryPolicy::hasExpired(const Message& m) {
- return m.getExpiration() < sys::AbsTime::now();
-}
-
-sys::AbsTime ExpiryPolicy::getCurrentTime() {
- return sys::AbsTime::now();
-}
-
-}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/ExpiryPolicy.h b/cpp/src/qpid/broker/ExpiryPolicy.h
deleted file mode 100644
index 1fb41ccd29..0000000000
--- a/cpp/src/qpid/broker/ExpiryPolicy.h
+++ /dev/null
@@ -1,50 +0,0 @@
-#ifndef QPID_BROKER_EXPIRYPOLICY_H
-#define QPID_BROKER_EXPIRYPOLICY_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/RefCounted.h"
-#include "qpid/broker/BrokerImportExport.h"
-
-namespace qpid {
-
-namespace sys {
-class AbsTime;
-}
-
-namespace broker {
-
-class Message;
-
-/**
- * Default expiry policy.
- */
-class QPID_BROKER_CLASS_EXTERN ExpiryPolicy : public RefCounted
-{
- public:
- QPID_BROKER_EXTERN virtual ~ExpiryPolicy();
- QPID_BROKER_EXTERN virtual bool hasExpired(const Message&);
- QPID_BROKER_EXTERN virtual qpid::sys::AbsTime getCurrentTime();
-};
-}} // namespace qpid::broker
-
-#endif /*!QPID_BROKER_EXPIRYPOLICY_H*/
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp
index 11c8c13e12..7e15ac1ad2 100644
--- a/cpp/src/qpid/broker/Message.cpp
+++ b/cpp/src/qpid/broker/Message.cpp
@@ -164,11 +164,6 @@ void Message::annotationsChanged()
}
}
-bool Message::hasExpired() const
-{
- return sharedState->hasExpired(*this);
-}
-
uint8_t Message::getPriority() const
{
return getEncoding().getPriority();
@@ -335,36 +330,22 @@ void Message::SharedStateImpl::setExpiration(sys::AbsTime e)
sys::Duration Message::SharedStateImpl::getTimeToExpiration() const
{
- sys::AbsTime current(expiryPolicy ? expiryPolicy->getCurrentTime() : sys::AbsTime::now());
- return sys::Duration(current, expiration);
+ return sys::Duration(sys::AbsTime::now(), expiration);
}
-void Message::SharedStateImpl::computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& e)
+void Message::SharedStateImpl::computeExpiration()
{
//TODO: this is still quite 0-10 specific...
uint64_t ttl;
if (getTtl(ttl)) {
- if (e) {
- // Use higher resolution time for the internal expiry calculation.
- // Prevent overflow as a signed int64_t
- Duration duration(std::min(ttl * TIME_MSEC,
- (uint64_t) std::numeric_limits<int64_t>::max()));
- expiration = AbsTime(e->getCurrentTime(), duration);
- expiryPolicy = e;
- }
+ // Use higher resolution time for the internal expiry calculation.
+ // Prevent overflow as a signed int64_t
+ Duration duration(std::min(ttl * TIME_MSEC,
+ (uint64_t) std::numeric_limits<int64_t>::max()));
+ expiration = AbsTime(sys::AbsTime::now(), duration);
}
}
-bool Message::SharedStateImpl::hasExpired(const Message& m) const
-{
- return expiryPolicy && expiryPolicy->hasExpired(m);
-}
-
-void Message::SharedStateImpl::setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e)
-{
- expiryPolicy = e;
-}
-
bool Message::SharedStateImpl::getIsManagementMessage() const
{
return isManagementMessage;
diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h
index f1829a17bc..fbe341f113 100644
--- a/cpp/src/qpid/broker/Message.h
+++ b/cpp/src/qpid/broker/Message.h
@@ -23,7 +23,6 @@
*/
#include "qpid/RefCounted.h"
-#include "qpid/broker/ExpiryPolicy.h"
#include "qpid/broker/PersistableMessage.h"
//TODO: move the following out of framing or replace it
#include "qpid/framing/SequenceNumber.h"
@@ -91,9 +90,7 @@ public:
virtual void setExpiration(sys::AbsTime e) = 0;
virtual sys::AbsTime getExpiration() const = 0;
virtual sys::Duration getTimeToExpiration() const = 0;
- virtual void computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>&) = 0;
- virtual bool hasExpired(const Message& m) const = 0;
- virtual void setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) = 0;
+ virtual void computeExpiration() = 0;
virtual bool getIsManagementMessage() const = 0;
virtual void setIsManagementMessage(bool b) = 0;
@@ -103,7 +100,6 @@ public:
{
const Connection* publisher;
qpid::sys::AbsTime expiration;
- boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
bool isManagementMessage;
public:
SharedStateImpl();
@@ -113,9 +109,7 @@ public:
QPID_BROKER_EXTERN void setExpiration(sys::AbsTime e);
QPID_BROKER_EXTERN sys::AbsTime getExpiration() const;
QPID_BROKER_EXTERN sys::Duration getTimeToExpiration() const;
- QPID_BROKER_EXTERN void computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& e);
- QPID_BROKER_EXTERN bool hasExpired(const Message& m) const;
- QPID_BROKER_EXTERN void setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e);
+ QPID_BROKER_EXTERN void computeExpiration();
QPID_BROKER_EXTERN bool getIsManagementMessage() const;
QPID_BROKER_EXTERN void setIsManagementMessage(bool b);
};
@@ -137,7 +131,6 @@ public:
QPID_BROKER_EXTERN std::string getRoutingKey() const;
QPID_BROKER_EXTERN bool isPersistent() const;
- bool hasExpired() const;
QPID_BROKER_EXTERN sys::AbsTime getExpiration() const;
uint64_t getTtl() const;
QPID_BROKER_EXTERN bool getTtl(uint64_t&) const;
diff --git a/cpp/src/qpid/broker/PagedQueue.cpp b/cpp/src/qpid/broker/PagedQueue.cpp
index a63f6d2485..521bd542f7 100644
--- a/cpp/src/qpid/broker/PagedQueue.cpp
+++ b/cpp/src/qpid/broker/PagedQueue.cpp
@@ -62,8 +62,7 @@ size_t encode(const Message& msg, char* data, size_t size)
return required;
}
-size_t decode(ProtocolRegistry& protocols, Message& msg, const char* data, size_t size,
- boost::intrusive_ptr<ExpiryPolicy> expiryPolicy)
+size_t decode(ProtocolRegistry& protocols, Message& msg, const char* data, size_t size)
{
qpid::framing::Buffer metadata(const_cast<char*>(data), size);
uint32_t encoded = metadata.getLong();
@@ -78,7 +77,6 @@ size_t decode(ProtocolRegistry& protocols, Message& msg, const char* data, size_
msg.getPersistentContext()->setPersistenceId(persistenceId);
if (t) {
sys::AbsTime expiration(EPOCH, t);
- msg.getSharedState().setExpiryPolicy(expiryPolicy);
msg.getSharedState().setExpiration(expiration);
}
return encoded + metadata.getPosition();
@@ -86,10 +84,8 @@ size_t decode(ProtocolRegistry& protocols, Message& msg, const char* data, size_
}
-PagedQueue::PagedQueue(const std::string& name_, const std::string& directory, uint m, uint factor, ProtocolRegistry& p,
- boost::intrusive_ptr<ExpiryPolicy> e)
- : name(name_), pageSize(file.getPageSize()*factor), maxLoaded(m), protocols(p), offset(0), loaded(0), version(0),
- expiryPolicy(e)
+PagedQueue::PagedQueue(const std::string& name_, const std::string& directory, uint m, uint factor, ProtocolRegistry& p)
+ : name(name_), pageSize(file.getPageSize()*factor), maxLoaded(m), protocols(p), offset(0), loaded(0), version(0)
{
if (directory.empty()) {
throw qpid::Exception(QPID_MSG("Cannot create paged queue: No paged queue directory specified"));
@@ -322,7 +318,7 @@ Message* PagedQueue::Page::find(qpid::framing::SequenceNumber position)
//if it is the last in the page, decrement the hint count of the page
}
-void PagedQueue::Page::load(MemoryMappedFile& file, ProtocolRegistry& protocols, boost::intrusive_ptr<ExpiryPolicy> expiryPolicy)
+void PagedQueue::Page::load(MemoryMappedFile& file, ProtocolRegistry& protocols)
{
QPID_LOG(debug, "Page[" << offset << "]::load" << " used=" << used << ", size=" << size);
assert(region == 0);
@@ -336,7 +332,7 @@ void PagedQueue::Page::load(MemoryMappedFile& file, ProtocolRegistry& protocols,
//decode messages into Page::messages
for (size_t i = 0; i < count; ++i) {
Message message;
- used += decode(protocols, message, region + used, size - used, expiryPolicy);
+ used += decode(protocols, message, region + used, size - used);
if (!contents.contains(message.getSequence())) {
message.setState(DELETED);
QPID_LOG(debug, "Setting state to deleted for message loaded at " << message.getSequence());
@@ -389,7 +385,7 @@ void PagedQueue::load(Page& page)
assert(i != used.rend());
unload(i->second);
}
- page.load(file, protocols, expiryPolicy);
+ page.load(file, protocols);
++loaded;
QPID_LOG(debug, "PagedQueue[" << name << "] loaded page, " << loaded << " pages now loaded");
}
diff --git a/cpp/src/qpid/broker/PagedQueue.h b/cpp/src/qpid/broker/PagedQueue.h
index e4c98f4119..c8a9f13fc7 100644
--- a/cpp/src/qpid/broker/PagedQueue.h
+++ b/cpp/src/qpid/broker/PagedQueue.h
@@ -31,15 +31,13 @@
namespace qpid {
namespace broker {
-class ExpiryPolicy;
class ProtocolRegistry;
/**
*
*/
class PagedQueue : public Messages {
public:
- PagedQueue(const std::string& name, const std::string& directory, uint maxLoaded, uint pageFactor, ProtocolRegistry& protocols,
- boost::intrusive_ptr<ExpiryPolicy>);
+ PagedQueue(const std::string& name, const std::string& directory, uint maxLoaded, uint pageFactor, ProtocolRegistry& protocols);
~PagedQueue();
size_t size();
bool deleted(const QueueCursor&);
@@ -62,7 +60,7 @@ class PagedQueue : public Messages {
bool add(const Message&);
Message* next(uint32_t version, QueueCursor&);
Message* find(qpid::framing::SequenceNumber);
- void load(qpid::sys::MemoryMappedFile&,ProtocolRegistry&, boost::intrusive_ptr<ExpiryPolicy>);
+ void load(qpid::sys::MemoryMappedFile&,ProtocolRegistry&);
void unload(qpid::sys::MemoryMappedFile&);
void clear(qpid::sys::MemoryMappedFile&);
size_t available() const;
@@ -88,7 +86,6 @@ class PagedQueue : public Messages {
std::list<Page> free;
uint loaded;
uint32_t version;
- boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;//needed on reload
void addPages(size_t count);
Page& newPage(qpid::framing::SequenceNumber);
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index fd99629492..78ee37607e 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -413,7 +413,7 @@ bool Queue::getNextMessage(Message& m, Consumer::shared_ptr& c)
QueueCursor cursor = c->getCursor(); // Save current position.
Message* msg = messages->next(*c); // Advances c.
if (msg) {
- if (msg->hasExpired()) {
+ if (msg->getExpiration() < sys::AbsTime::now()) {
QPID_LOG(debug, "Message expired from queue '" << name << "'");
observeDequeue(*msg, locker, settings.autodelete ? &autodelete : 0);
//ERROR: don't hold lock across call to store!!
@@ -616,6 +616,13 @@ void Queue::cancel(Consumer::shared_ptr c, const std::string& connectionId, cons
}
}
+namespace{
+bool hasExpired(const Message& m, AbsTime now)
+{
+ return m.getExpiration() < now;
+}
+}
+
/**
*@param lapse: time since the last purgeExpired
*/
@@ -627,7 +634,8 @@ void Queue::purgeExpired(sys::Duration lapse) {
dequeueSincePurge -= count;
int seconds = int64_t(lapse)/qpid::sys::TIME_SEC;
if (seconds == 0 || count / seconds < 1) {
- uint32_t count = remove(0, boost::bind(&Message::hasExpired, _1), 0, CONSUMER, settings.autodelete);
+ sys::AbsTime time = sys::AbsTime::now();
+ uint32_t count = remove(0, boost::bind(&hasExpired, _1, time), 0, CONSUMER, settings.autodelete);
QPID_LOG(debug, "Purged " << count << " expired messages from " << getName());
//
// Report the count of discarded-by-ttl messages
diff --git a/cpp/src/qpid/broker/QueueFactory.cpp b/cpp/src/qpid/broker/QueueFactory.cpp
index b4eccd646d..4ce1b796f7 100644
--- a/cpp/src/qpid/broker/QueueFactory.cpp
+++ b/cpp/src/qpid/broker/QueueFactory.cpp
@@ -82,7 +82,7 @@ boost::shared_ptr<Queue> QueueFactory::create(const std::string& name, const Que
queue->messages = std::auto_ptr<Messages>(new PagedQueue(name, broker->getPagingDir().getPath(),
settings.maxPages ? settings.maxPages : DEFAULT_MAX_PAGES,
settings.pageFactor ? settings.pageFactor : DEFAULT_PAGE_FACTOR,
- broker->getProtocolRegistry(), broker->getExpiryPolicy()));
+ broker->getProtocolRegistry()));
}
} else if (settings.lvqKey.empty()) {//LVQ already handled above
queue->messages = std::auto_ptr<Messages>(new MessageDeque());
diff --git a/cpp/src/qpid/broker/RecoverableMessage.h b/cpp/src/qpid/broker/RecoverableMessage.h
index 888dc364e9..3c82a69883 100644
--- a/cpp/src/qpid/broker/RecoverableMessage.h
+++ b/cpp/src/qpid/broker/RecoverableMessage.h
@@ -29,7 +29,6 @@
namespace qpid {
namespace broker {
-class ExpiryPolicy;
class Message;
/**
* The interface through which messages are reloaded on recovery.
@@ -40,7 +39,7 @@ public:
typedef boost::shared_ptr<RecoverableMessage> shared_ptr;
virtual void setPersistenceId(uint64_t id) = 0;
virtual void setRedelivered() = 0;
- virtual void computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& e) = 0;
+ virtual void computeExpiration() = 0;
/**
* Used by store to determine whether to load content on recovery
* or let message load its own content as and when it requires it.
diff --git a/cpp/src/qpid/broker/RecoverableMessageImpl.h b/cpp/src/qpid/broker/RecoverableMessageImpl.h
index 6f0bd8e4a9..c257c2057b 100644
--- a/cpp/src/qpid/broker/RecoverableMessageImpl.h
+++ b/cpp/src/qpid/broker/RecoverableMessageImpl.h
@@ -37,7 +37,7 @@ public:
~RecoverableMessageImpl() {};
void setPersistenceId(uint64_t id);
void setRedelivered();
- void computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& ep);
+ void computeExpiration();
bool loadContent(uint64_t available);
void decodeContent(framing::Buffer& buffer);
void recover(boost::shared_ptr<Queue> queue);
diff --git a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
index 846dc9eb2a..dd9bfc57f6 100644
--- a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
+++ b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
@@ -183,9 +183,9 @@ void RecoverableMessageImpl::setRedelivered()
msg.deliver();//increment delivery count (but at present that isn't recorded durably)
}
-void RecoverableMessageImpl::computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& ep)
+void RecoverableMessageImpl::computeExpiration()
{
- msg.getSharedState().computeExpiration(ep);
+ msg.getSharedState().computeExpiration();
}
Message RecoverableMessageImpl::getMessage()
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp
index 2b849e945f..8b57bd9e71 100644
--- a/cpp/src/qpid/broker/SessionState.cpp
+++ b/cpp/src/qpid/broker/SessionState.cpp
@@ -220,7 +220,7 @@ void SessionState::handleContent(AMQFrame& frame)
if (broker.isTimestamping())
msg->setTimestamp();
msg->setPublisher(&(getConnection()));
- msg->computeExpiration(getBroker().getExpiryPolicy());
+ msg->computeExpiration();
IncompleteIngressMsgXfer xfer(this, msg);
diff --git a/cpp/src/qpid/broker/amqp/Incoming.cpp b/cpp/src/qpid/broker/amqp/Incoming.cpp
index 889f410443..96a20b5df6 100644
--- a/cpp/src/qpid/broker/amqp/Incoming.cpp
+++ b/cpp/src/qpid/broker/amqp/Incoming.cpp
@@ -107,7 +107,7 @@ namespace {
}
DecodingIncoming::DecodingIncoming(pn_link_t* link, Broker& broker, Session& parent, const std::string& source, const std::string& target, const std::string& name)
- : Incoming(link, broker, parent, source, target, name), session(parent.shared_from_this()), expiryPolicy(broker.getExpiryPolicy()) {}
+ : Incoming(link, broker, parent, source, target, name), session(parent.shared_from_this()) {}
DecodingIncoming::~DecodingIncoming() {}
void DecodingIncoming::readable(pn_delivery_t* delivery)
@@ -135,7 +135,7 @@ void DecodingIncoming::readable(pn_delivery_t* delivery)
received->scan();
pn_link_advance(link);
received->setPublisher(&session->getParent());
- received->computeExpiration(expiryPolicy);
+ received->computeExpiration();
qpid::broker::Message message(received, received);
userid.verify(message.getUserId());
diff --git a/cpp/src/qpid/broker/amqp/Incoming.h b/cpp/src/qpid/broker/amqp/Incoming.h
index 807b918a17..38b9b3a919 100644
--- a/cpp/src/qpid/broker/amqp/Incoming.h
+++ b/cpp/src/qpid/broker/amqp/Incoming.h
@@ -77,7 +77,6 @@ class DecodingIncoming : public Incoming
virtual void handle(qpid::broker::Message&) = 0;
private:
boost::shared_ptr<Session> session;
- boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
boost::intrusive_ptr<Message> partial;
};
diff --git a/cpp/src/qpid/legacystore/MessageStoreImpl.cpp b/cpp/src/qpid/legacystore/MessageStoreImpl.cpp
index 7863940534..7a1fea95d5 100644
--- a/cpp/src/qpid/legacystore/MessageStoreImpl.cpp
+++ b/cpp/src/qpid/legacystore/MessageStoreImpl.cpp
@@ -980,7 +980,7 @@ void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/,
// become optional depending on that information.
msg->setRedelivered();
// Reset the TTL for the recovered message
- msg->computeExpiration(broker->getExpiryPolicy());
+ msg->computeExpiration();
u_int32_t contentOffset = headerSize + preambleLength;
u_int64_t contentSize = readSize - contentOffset;
diff --git a/cpp/src/qpid/linearstore/MessageStoreImpl.cpp b/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
index ff5b41b962..b7c6672c61 100644
--- a/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
+++ b/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
@@ -921,7 +921,7 @@ void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/,
// become optional depending on that information.
msg->setRedelivered();
// Reset the TTL for the recovered message
- msg->computeExpiration(broker->getExpiryPolicy());
+ msg->computeExpiration();
uint32_t contentOffset = headerSize + preambleLength;
uint64_t contentSize = dbuffSize - contentOffset;
diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp
index 772fe3c64e..968caba760 100644
--- a/cpp/src/qpid/management/ManagementAgent.cpp
+++ b/cpp/src/qpid/management/ManagementAgent.cpp
@@ -633,7 +633,7 @@ void ManagementAgent::sendBuffer(const string& data,
transfer->getFrames().append(content);
transfer->computeRequiredCredit();
transfer->setIsManagementMessage(true);
- transfer->computeExpiration(broker->getExpiryPolicy());
+ transfer->computeExpiration();
Message msg(transfer, transfer);
sendQueue->push(make_pair(exchange, msg));
diff --git a/cpp/src/tests/MessageUtils.h b/cpp/src/tests/MessageUtils.h
index 2c205da20f..f05b0d8b20 100644
--- a/cpp/src/tests/MessageUtils.h
+++ b/cpp/src/tests/MessageUtils.h
@@ -107,7 +107,7 @@ struct MessageUtils
AMQFrame data((AMQContentBody(content)));
msg->getFrames().append(data);
}
- if (ttl) msg->computeExpiration(new broker::ExpiryPolicy);
+ if (ttl) msg->computeExpiration();
return Message(msg, msg);
}
};
diff --git a/cpp/src/tests/QueueTest.cpp b/cpp/src/tests/QueueTest.cpp
index 99c6880b26..ee9d37e76d 100644
--- a/cpp/src/tests/QueueTest.cpp
+++ b/cpp/src/tests/QueueTest.cpp
@@ -30,7 +30,6 @@
#include "qpid/broker/ExchangeRegistry.h"
#include "qpid/broker/QueueRegistry.h"
#include "qpid/broker/NullMessageStore.h"
-#include "qpid/broker/ExpiryPolicy.h"
#include "qpid/framing/DeliveryProperties.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/framing/MessageTransferBody.h"