diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/Makefile.am | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 10 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Broker.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 13 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/broker/QueueCleaner.cpp | 52 | ||||
-rw-r--r-- | cpp/src/qpid/broker/QueueCleaner.h | 57 | ||||
-rw-r--r-- | cpp/src/tests/ClientSessionTest.cpp | 40 | ||||
-rw-r--r-- | cpp/src/tests/QueueTest.cpp | 39 |
9 files changed, 216 insertions, 1 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index 528236dd62..e37e511120 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -300,6 +300,7 @@ libqpidbroker_la_SOURCES = \ qpid/broker/BrokerSingleton.cpp \ qpid/broker/Exchange.cpp \ qpid/broker/Queue.cpp \ + qpid/broker/QueueCleaner.cpp \ qpid/broker/PersistableMessage.cpp \ qpid/broker/Bridge.cpp \ qpid/broker/Connection.cpp \ @@ -430,6 +431,7 @@ nobase_include_HEADERS = \ qpid/broker/SessionAdapter.h \ qpid/broker/Exchange.h \ qpid/broker/Queue.h \ + qpid/broker/QueueCleaner.h \ qpid/broker/BrokerSingleton.h \ qpid/broker/Bridge.h \ qpid/broker/Connection.h \ diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 410c5cdef0..d401436d38 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -87,6 +87,7 @@ Broker::Options::Options(const std::string& name) : stagingThreshold(5000000), enableMgmt(1), mgmtPubInterval(10), + queueCleanInterval(60*10),//10 minutes auth(AUTH_DEFAULT), realm("QPID"), replayFlushLimit(0), @@ -114,6 +115,8 @@ Broker::Options::Options(const std::string& name) : ("staging-threshold", optValue(stagingThreshold, "N"), "Stages messages over N bytes to disk") ("mgmt-enable,m", optValue(enableMgmt,"yes|no"), "Enable Management") ("mgmt-pub-interval", optValue(mgmtPubInterval, "SECONDS"), "Management Publish Interval") + ("queue-purge-interval", optValue(queueCleanInterval, "SECONDS"), + "Interval between attempts to purge any expired messages from queues") ("auth", optValue(auth, "yes|no"), "Enable authentication, if disabled all incoming connections will be trusted") ("realm", optValue(realm, "REALM"), "Use the given realm when performing authentication") ("default-queue-limit", optValue(queueLimit, "BYTES"), "Default maximum size for queues (in bytes)") @@ -142,7 +145,8 @@ Broker::Broker(const Broker::Options& conf) : conf.replayFlushLimit*1024, // convert kb to bytes. conf.replayHardLimit*1024), *this), - getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this)) + queueCleaner(queues, timer), + getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this)) { if(conf.enableMgmt){ QPID_LOG(info, "Management enabled"); @@ -244,6 +248,10 @@ Broker::Broker(const Broker::Options& conf) : i != plugins.end(); i++) (*i)->initialize(*this); + + if (conf.queueCleanInterval) { + queueCleaner.start(conf.queueCleanInterval * qpid::sys::TIME_SEC); + } } void Broker::declareStandardExchange(const std::string& name, const std::string& type) diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index c0c74a1493..97a4a36eca 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -31,6 +31,7 @@ #include "QueueRegistry.h" #include "LinkRegistry.h" #include "SessionManager.h" +#include "QueueCleaner.h" #include "Vhost.h" #include "System.h" #include "Timer.h" @@ -90,6 +91,7 @@ class Broker : public sys::Runnable, public Plugin::Target, uint64_t stagingThreshold; bool enableMgmt; uint16_t mgmtPubInterval; + uint16_t queueCleanInterval; bool auth; std::string realm; size_t replayFlushLimit; @@ -120,6 +122,7 @@ class Broker : public sys::Runnable, public Plugin::Target, qmf::org::apache::qpid::broker::Broker* mgmtObject; Vhost::shared_ptr vhostObject; System::shared_ptr systemObject; + QueueCleaner queueCleaner; void declareStandardExchange(const std::string& name, const std::string& type); diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 315da23965..ccd7c1fa3e 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -403,6 +403,19 @@ QueuedMessage Queue::get(){ return msg; } +void Queue::purgeExpired() +{ + Mutex::ScopedLock locker(messageLock); + for (Messages::iterator i = messages.begin(); i != messages.end(); ) { + if (i->payload->hasExpired()) { + dequeue(0, *i); + i = messages.erase(i); + } else { + ++i; + } + } +} + /** * purge - for purging all or some messages on a queue * depending on the purge_request diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index 9a7c90181c..fc628bbbc0 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -189,6 +189,7 @@ namespace qpid { void cancel(Consumer::shared_ptr c); uint32_t purge(const uint32_t purge_request = 0); //defaults to all messages + void purgeExpired(); //move qty # of messages to destination Queue destq uint32_t move(const Queue::shared_ptr destq, uint32_t qty); diff --git a/cpp/src/qpid/broker/QueueCleaner.cpp b/cpp/src/qpid/broker/QueueCleaner.cpp new file mode 100644 index 0000000000..0774dce2b7 --- /dev/null +++ b/cpp/src/qpid/broker/QueueCleaner.cpp @@ -0,0 +1,52 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "QueueCleaner.h" + +#include "Broker.h" +#include <boost/bind.hpp> + +namespace qpid { +namespace broker { + +QueueCleaner::QueueCleaner(QueueRegistry& q, Timer& t) : queues(q), timer(t) {} + +void QueueCleaner::start(qpid::sys::Duration p) +{ + task = boost::intrusive_ptr<TimerTask>(new Task(*this, p)); + timer.add(task); +} + +QueueCleaner::Task::Task(QueueCleaner& p, qpid::sys::Duration d) : TimerTask(d), parent(p) {} + +void QueueCleaner::Task::fire() +{ + parent.fired(); +} + +void QueueCleaner::fired() +{ + queues.eachQueue(boost::bind(&Queue::purgeExpired, _1)); + task->reset(); + timer.add(task); +} + + +}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/QueueCleaner.h b/cpp/src/qpid/broker/QueueCleaner.h new file mode 100644 index 0000000000..7903266f5f --- /dev/null +++ b/cpp/src/qpid/broker/QueueCleaner.h @@ -0,0 +1,57 @@ +#ifndef QPID_BROKER_QUEUECLEANER_H +#define QPID_BROKER_QUEUECLEANER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Timer.h" + +namespace qpid { +namespace broker { + +class QueueRegistry; +/** + * TimerTask to purge expired messages from queues + */ +class QueueCleaner +{ + public: + QueueCleaner(QueueRegistry& queues, Timer& timer); + void start(qpid::sys::Duration period); + private: + class Task : public TimerTask + { + public: + Task(QueueCleaner& parent, qpid::sys::Duration duration); + void fire(); + private: + QueueCleaner& parent; + }; + + boost::intrusive_ptr<TimerTask> task; + QueueRegistry& queues; + Timer& timer; + + void fired(); +}; +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_QUEUECLEANER_H*/ diff --git a/cpp/src/tests/ClientSessionTest.cpp b/cpp/src/tests/ClientSessionTest.cpp index 85497ace5d..440605a2e4 100644 --- a/cpp/src/tests/ClientSessionTest.cpp +++ b/cpp/src/tests/ClientSessionTest.cpp @@ -42,6 +42,7 @@ using namespace qpid; using qpid::sys::Monitor; using qpid::sys::Thread; using qpid::sys::TIME_SEC; +using qpid::broker::Broker; using std::string; using std::cout; using std::endl; @@ -94,6 +95,8 @@ struct SimpleListener : public MessageListener struct ClientSessionFixture : public ProxySessionFixture { + ClientSessionFixture(Broker::Options opts = Broker::Options()) : ProxySessionFixture(opts) {} + void declareSubscribe(const string& q="my-queue", const string& dest="my-dest") { @@ -282,6 +285,43 @@ QPID_AUTO_TEST_CASE(testOpenFailure) { BOOST_CHECK(!c.isOpen()); } +QPID_AUTO_TEST_CASE(testPeriodicExpiration) { + Broker::Options opts; + opts.queueCleanInterval = 1; + ClientSessionFixture fix(opts); + fix.session.queueDeclare(arg::queue="my-queue", arg::exclusive=true, arg::autoDelete=true); + + for (uint i = 0; i < 10; i++) { + Message m((boost::format("Message_%1%") % (i+1)).str(), "my-queue"); + if (i % 2) m.getDeliveryProperties().setTtl(500); + fix.session.messageTransfer(arg::content=m); + } + + BOOST_CHECK_EQUAL(fix.session.queueQuery(string("my-queue")).getMessageCount(), 10u); + sleep(2); + BOOST_CHECK_EQUAL(fix.session.queueQuery(string("my-queue")).getMessageCount(), 5u); +} + +QPID_AUTO_TEST_CASE(testExpirationOnPop) { + ClientSessionFixture fix; + fix.session.queueDeclare(arg::queue="my-queue", arg::exclusive=true, arg::autoDelete=true); + + for (uint i = 0; i < 10; i++) { + Message m((boost::format("Message_%1%") % (i+1)).str(), "my-queue"); + if (i % 2) m.getDeliveryProperties().setTtl(200); + fix.session.messageTransfer(arg::content=m); + } + + ::usleep(300* 1000); + + for (uint i = 0; i < 10; i++) { + if (i % 2) continue; + Message m; + BOOST_CHECK(fix.subs.get(m, "my-queue", TIME_SEC)); + BOOST_CHECK_EQUAL((boost::format("Message_%1%") % (i+1)).str(), m.getData()); + } +} + QPID_AUTO_TEST_SUITE_END() diff --git a/cpp/src/tests/QueueTest.cpp b/cpp/src/tests/QueueTest.cpp index ccc2fc2391..ef8aa69dd6 100644 --- a/cpp/src/tests/QueueTest.cpp +++ b/cpp/src/tests/QueueTest.cpp @@ -20,6 +20,7 @@ */ #include "unit_test.h" #include "qpid/Exception.h" +#include "qpid/broker/Broker.h" #include "qpid/broker/Queue.h" #include "qpid/broker/Deliverable.h" #include "qpid/broker/ExchangeRegistry.h" @@ -460,6 +461,44 @@ QPID_AUTO_TEST_CASE(testLVQSaftyCheck){ } +void addMessagesToQueue(uint count, Queue& queue, uint oddTtl = 200, uint evenTtl = 0) +{ + for (uint i = 0; i < count; i++) { + intrusive_ptr<Message> m = message("exchange", "key"); + if (i % 2) { + if (oddTtl) m->getProperties<DeliveryProperties>()->setTtl(oddTtl); + } else { + if (evenTtl) m->getProperties<DeliveryProperties>()->setTtl(evenTtl); + } + m->setTimestamp(); + queue.deliver(m); + } +} + +QPID_AUTO_TEST_CASE(testPurgeExpired) { + Queue queue("my-queue"); + addMessagesToQueue(10, queue); + BOOST_CHECK_EQUAL(queue.getMessageCount(), 10u); + ::usleep(300*1000); + queue.purgeExpired(); + BOOST_CHECK_EQUAL(queue.getMessageCount(), 5u); +} + +QPID_AUTO_TEST_CASE(testQueueCleaner) { + Timer timer; + QueueRegistry queues; + Queue::shared_ptr queue = queues.declare("my-queue").first; + addMessagesToQueue(10, *queue, 200, 400); + BOOST_CHECK_EQUAL(queue->getMessageCount(), 10u); + + QueueCleaner cleaner(queues, timer); + cleaner.start(100 * qpid::sys::TIME_MSEC); + ::usleep(300*1000); + BOOST_CHECK_EQUAL(queue->getMessageCount(), 5u); + ::usleep(300*1000); + BOOST_CHECK_EQUAL(queue->getMessageCount(), 0u); +} + QPID_AUTO_TEST_SUITE_END() |