summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/Makefile.am2
-rw-r--r--cpp/src/qpid/broker/Broker.cpp10
-rw-r--r--cpp/src/qpid/broker/Broker.h3
-rw-r--r--cpp/src/qpid/broker/Queue.cpp13
-rw-r--r--cpp/src/qpid/broker/Queue.h1
-rw-r--r--cpp/src/qpid/broker/QueueCleaner.cpp52
-rw-r--r--cpp/src/qpid/broker/QueueCleaner.h57
-rw-r--r--cpp/src/tests/ClientSessionTest.cpp40
-rw-r--r--cpp/src/tests/QueueTest.cpp39
9 files changed, 216 insertions, 1 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index 528236dd62..e37e511120 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -300,6 +300,7 @@ libqpidbroker_la_SOURCES = \
qpid/broker/BrokerSingleton.cpp \
qpid/broker/Exchange.cpp \
qpid/broker/Queue.cpp \
+ qpid/broker/QueueCleaner.cpp \
qpid/broker/PersistableMessage.cpp \
qpid/broker/Bridge.cpp \
qpid/broker/Connection.cpp \
@@ -430,6 +431,7 @@ nobase_include_HEADERS = \
qpid/broker/SessionAdapter.h \
qpid/broker/Exchange.h \
qpid/broker/Queue.h \
+ qpid/broker/QueueCleaner.h \
qpid/broker/BrokerSingleton.h \
qpid/broker/Bridge.h \
qpid/broker/Connection.h \
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index 410c5cdef0..d401436d38 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -87,6 +87,7 @@ Broker::Options::Options(const std::string& name) :
stagingThreshold(5000000),
enableMgmt(1),
mgmtPubInterval(10),
+ queueCleanInterval(60*10),//10 minutes
auth(AUTH_DEFAULT),
realm("QPID"),
replayFlushLimit(0),
@@ -114,6 +115,8 @@ Broker::Options::Options(const std::string& name) :
("staging-threshold", optValue(stagingThreshold, "N"), "Stages messages over N bytes to disk")
("mgmt-enable,m", optValue(enableMgmt,"yes|no"), "Enable Management")
("mgmt-pub-interval", optValue(mgmtPubInterval, "SECONDS"), "Management Publish Interval")
+ ("queue-purge-interval", optValue(queueCleanInterval, "SECONDS"),
+ "Interval between attempts to purge any expired messages from queues")
("auth", optValue(auth, "yes|no"), "Enable authentication, if disabled all incoming connections will be trusted")
("realm", optValue(realm, "REALM"), "Use the given realm when performing authentication")
("default-queue-limit", optValue(queueLimit, "BYTES"), "Default maximum size for queues (in bytes)")
@@ -142,7 +145,8 @@ Broker::Broker(const Broker::Options& conf) :
conf.replayFlushLimit*1024, // convert kb to bytes.
conf.replayHardLimit*1024),
*this),
- getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this))
+ queueCleaner(queues, timer),
+ getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this))
{
if(conf.enableMgmt){
QPID_LOG(info, "Management enabled");
@@ -244,6 +248,10 @@ Broker::Broker(const Broker::Options& conf) :
i != plugins.end();
i++)
(*i)->initialize(*this);
+
+ if (conf.queueCleanInterval) {
+ queueCleaner.start(conf.queueCleanInterval * qpid::sys::TIME_SEC);
+ }
}
void Broker::declareStandardExchange(const std::string& name, const std::string& type)
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h
index c0c74a1493..97a4a36eca 100644
--- a/cpp/src/qpid/broker/Broker.h
+++ b/cpp/src/qpid/broker/Broker.h
@@ -31,6 +31,7 @@
#include "QueueRegistry.h"
#include "LinkRegistry.h"
#include "SessionManager.h"
+#include "QueueCleaner.h"
#include "Vhost.h"
#include "System.h"
#include "Timer.h"
@@ -90,6 +91,7 @@ class Broker : public sys::Runnable, public Plugin::Target,
uint64_t stagingThreshold;
bool enableMgmt;
uint16_t mgmtPubInterval;
+ uint16_t queueCleanInterval;
bool auth;
std::string realm;
size_t replayFlushLimit;
@@ -120,6 +122,7 @@ class Broker : public sys::Runnable, public Plugin::Target,
qmf::org::apache::qpid::broker::Broker* mgmtObject;
Vhost::shared_ptr vhostObject;
System::shared_ptr systemObject;
+ QueueCleaner queueCleaner;
void declareStandardExchange(const std::string& name, const std::string& type);
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 315da23965..ccd7c1fa3e 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -403,6 +403,19 @@ QueuedMessage Queue::get(){
return msg;
}
+void Queue::purgeExpired()
+{
+ Mutex::ScopedLock locker(messageLock);
+ for (Messages::iterator i = messages.begin(); i != messages.end(); ) {
+ if (i->payload->hasExpired()) {
+ dequeue(0, *i);
+ i = messages.erase(i);
+ } else {
+ ++i;
+ }
+ }
+}
+
/**
* purge - for purging all or some messages on a queue
* depending on the purge_request
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h
index 9a7c90181c..fc628bbbc0 100644
--- a/cpp/src/qpid/broker/Queue.h
+++ b/cpp/src/qpid/broker/Queue.h
@@ -189,6 +189,7 @@ namespace qpid {
void cancel(Consumer::shared_ptr c);
uint32_t purge(const uint32_t purge_request = 0); //defaults to all messages
+ void purgeExpired();
//move qty # of messages to destination Queue destq
uint32_t move(const Queue::shared_ptr destq, uint32_t qty);
diff --git a/cpp/src/qpid/broker/QueueCleaner.cpp b/cpp/src/qpid/broker/QueueCleaner.cpp
new file mode 100644
index 0000000000..0774dce2b7
--- /dev/null
+++ b/cpp/src/qpid/broker/QueueCleaner.cpp
@@ -0,0 +1,52 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "QueueCleaner.h"
+
+#include "Broker.h"
+#include <boost/bind.hpp>
+
+namespace qpid {
+namespace broker {
+
+QueueCleaner::QueueCleaner(QueueRegistry& q, Timer& t) : queues(q), timer(t) {}
+
+void QueueCleaner::start(qpid::sys::Duration p)
+{
+ task = boost::intrusive_ptr<TimerTask>(new Task(*this, p));
+ timer.add(task);
+}
+
+QueueCleaner::Task::Task(QueueCleaner& p, qpid::sys::Duration d) : TimerTask(d), parent(p) {}
+
+void QueueCleaner::Task::fire()
+{
+ parent.fired();
+}
+
+void QueueCleaner::fired()
+{
+ queues.eachQueue(boost::bind(&Queue::purgeExpired, _1));
+ task->reset();
+ timer.add(task);
+}
+
+
+}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/QueueCleaner.h b/cpp/src/qpid/broker/QueueCleaner.h
new file mode 100644
index 0000000000..7903266f5f
--- /dev/null
+++ b/cpp/src/qpid/broker/QueueCleaner.h
@@ -0,0 +1,57 @@
+#ifndef QPID_BROKER_QUEUECLEANER_H
+#define QPID_BROKER_QUEUECLEANER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Timer.h"
+
+namespace qpid {
+namespace broker {
+
+class QueueRegistry;
+/**
+ * TimerTask to purge expired messages from queues
+ */
+class QueueCleaner
+{
+ public:
+ QueueCleaner(QueueRegistry& queues, Timer& timer);
+ void start(qpid::sys::Duration period);
+ private:
+ class Task : public TimerTask
+ {
+ public:
+ Task(QueueCleaner& parent, qpid::sys::Duration duration);
+ void fire();
+ private:
+ QueueCleaner& parent;
+ };
+
+ boost::intrusive_ptr<TimerTask> task;
+ QueueRegistry& queues;
+ Timer& timer;
+
+ void fired();
+};
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_QUEUECLEANER_H*/
diff --git a/cpp/src/tests/ClientSessionTest.cpp b/cpp/src/tests/ClientSessionTest.cpp
index 85497ace5d..440605a2e4 100644
--- a/cpp/src/tests/ClientSessionTest.cpp
+++ b/cpp/src/tests/ClientSessionTest.cpp
@@ -42,6 +42,7 @@ using namespace qpid;
using qpid::sys::Monitor;
using qpid::sys::Thread;
using qpid::sys::TIME_SEC;
+using qpid::broker::Broker;
using std::string;
using std::cout;
using std::endl;
@@ -94,6 +95,8 @@ struct SimpleListener : public MessageListener
struct ClientSessionFixture : public ProxySessionFixture
{
+ ClientSessionFixture(Broker::Options opts = Broker::Options()) : ProxySessionFixture(opts) {}
+
void declareSubscribe(const string& q="my-queue",
const string& dest="my-dest")
{
@@ -282,6 +285,43 @@ QPID_AUTO_TEST_CASE(testOpenFailure) {
BOOST_CHECK(!c.isOpen());
}
+QPID_AUTO_TEST_CASE(testPeriodicExpiration) {
+ Broker::Options opts;
+ opts.queueCleanInterval = 1;
+ ClientSessionFixture fix(opts);
+ fix.session.queueDeclare(arg::queue="my-queue", arg::exclusive=true, arg::autoDelete=true);
+
+ for (uint i = 0; i < 10; i++) {
+ Message m((boost::format("Message_%1%") % (i+1)).str(), "my-queue");
+ if (i % 2) m.getDeliveryProperties().setTtl(500);
+ fix.session.messageTransfer(arg::content=m);
+ }
+
+ BOOST_CHECK_EQUAL(fix.session.queueQuery(string("my-queue")).getMessageCount(), 10u);
+ sleep(2);
+ BOOST_CHECK_EQUAL(fix.session.queueQuery(string("my-queue")).getMessageCount(), 5u);
+}
+
+QPID_AUTO_TEST_CASE(testExpirationOnPop) {
+ ClientSessionFixture fix;
+ fix.session.queueDeclare(arg::queue="my-queue", arg::exclusive=true, arg::autoDelete=true);
+
+ for (uint i = 0; i < 10; i++) {
+ Message m((boost::format("Message_%1%") % (i+1)).str(), "my-queue");
+ if (i % 2) m.getDeliveryProperties().setTtl(200);
+ fix.session.messageTransfer(arg::content=m);
+ }
+
+ ::usleep(300* 1000);
+
+ for (uint i = 0; i < 10; i++) {
+ if (i % 2) continue;
+ Message m;
+ BOOST_CHECK(fix.subs.get(m, "my-queue", TIME_SEC));
+ BOOST_CHECK_EQUAL((boost::format("Message_%1%") % (i+1)).str(), m.getData());
+ }
+}
+
QPID_AUTO_TEST_SUITE_END()
diff --git a/cpp/src/tests/QueueTest.cpp b/cpp/src/tests/QueueTest.cpp
index ccc2fc2391..ef8aa69dd6 100644
--- a/cpp/src/tests/QueueTest.cpp
+++ b/cpp/src/tests/QueueTest.cpp
@@ -20,6 +20,7 @@
*/
#include "unit_test.h"
#include "qpid/Exception.h"
+#include "qpid/broker/Broker.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/Deliverable.h"
#include "qpid/broker/ExchangeRegistry.h"
@@ -460,6 +461,44 @@ QPID_AUTO_TEST_CASE(testLVQSaftyCheck){
}
+void addMessagesToQueue(uint count, Queue& queue, uint oddTtl = 200, uint evenTtl = 0)
+{
+ for (uint i = 0; i < count; i++) {
+ intrusive_ptr<Message> m = message("exchange", "key");
+ if (i % 2) {
+ if (oddTtl) m->getProperties<DeliveryProperties>()->setTtl(oddTtl);
+ } else {
+ if (evenTtl) m->getProperties<DeliveryProperties>()->setTtl(evenTtl);
+ }
+ m->setTimestamp();
+ queue.deliver(m);
+ }
+}
+
+QPID_AUTO_TEST_CASE(testPurgeExpired) {
+ Queue queue("my-queue");
+ addMessagesToQueue(10, queue);
+ BOOST_CHECK_EQUAL(queue.getMessageCount(), 10u);
+ ::usleep(300*1000);
+ queue.purgeExpired();
+ BOOST_CHECK_EQUAL(queue.getMessageCount(), 5u);
+}
+
+QPID_AUTO_TEST_CASE(testQueueCleaner) {
+ Timer timer;
+ QueueRegistry queues;
+ Queue::shared_ptr queue = queues.declare("my-queue").first;
+ addMessagesToQueue(10, *queue, 200, 400);
+ BOOST_CHECK_EQUAL(queue->getMessageCount(), 10u);
+
+ QueueCleaner cleaner(queues, timer);
+ cleaner.start(100 * qpid::sys::TIME_MSEC);
+ ::usleep(300*1000);
+ BOOST_CHECK_EQUAL(queue->getMessageCount(), 5u);
+ ::usleep(300*1000);
+ BOOST_CHECK_EQUAL(queue->getMessageCount(), 0u);
+}
+
QPID_AUTO_TEST_SUITE_END()