diff options
author | Gordon Sim <gsim@apache.org> | 2007-07-03 09:04:00 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2007-07-03 09:04:00 +0000 |
commit | e7f82f3d5b39c77ee913ee4cd6b4bf1bdc8b13bc (patch) | |
tree | 287e8ce31be4594915c577045025b466d969d17b | |
parent | 953eb74e69055777252ec05e1c277ec32fed82d0 (diff) | |
download | qpid-python-e7f82f3d5b39c77ee913ee4cd6b4bf1bdc8b13bc.tar.gz |
Autodeletable shared queues are now deleted as soon as the consumer count drops to zero (i.e. there is no timeout).
This closes QPID-533.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@552751 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | cpp/src/Makefile.am | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/AutoDelete.cpp | 74 | ||||
-rw-r--r-- | cpp/src/qpid/broker/AutoDelete.h | 52 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 3 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Broker.h | 5 | ||||
-rw-r--r-- | cpp/src/qpid/broker/BrokerAdapter.cpp | 5 | ||||
-rw-r--r-- | cpp/src/qpid/broker/BrokerChannel.cpp | 7 | ||||
-rw-r--r-- | cpp/src/qpid/broker/BrokerQueue.cpp | 7 | ||||
-rw-r--r-- | cpp/src/qpid/broker/BrokerQueue.h | 5 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Connection.cpp | 1 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Connection.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/QueueRegistry.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/QueueRegistry.h | 15 |
13 files changed, 22 insertions, 158 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index 4b3a223e72..9f4440323e 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -180,7 +180,6 @@ libqpidcommon_la_SOURCES = \ libqpidbroker_la_LIBADD = libqpidcommon.la -ldaemon -lboost_filesystem libqpidbroker_la_SOURCES = \ qpid/broker/AccumulatedAck.cpp \ - qpid/broker/AutoDelete.cpp \ qpid/broker/Broker.cpp \ qpid/broker/BrokerAdapter.cpp \ qpid/broker/BrokerSingleton.cpp \ @@ -241,7 +240,6 @@ libqpidclient_la_SOURCES = \ nobase_include_HEADERS = \ $(platform_hdr) \ qpid/broker/AccumulatedAck.h \ - qpid/broker/AutoDelete.h \ qpid/broker/BrokerChannel.h \ qpid/broker/BrokerExchange.h \ qpid/broker/BrokerMessage.h \ diff --git a/cpp/src/qpid/broker/AutoDelete.cpp b/cpp/src/qpid/broker/AutoDelete.cpp deleted file mode 100644 index 6fa3bfbebf..0000000000 --- a/cpp/src/qpid/broker/AutoDelete.cpp +++ /dev/null @@ -1,74 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "AutoDelete.h" -#include "qpid/log/Statement.h" -#include "qpid/sys/Time.h" - -using namespace qpid::broker; -using namespace qpid::sys; - -AutoDelete::AutoDelete(QueueRegistry* const _registry, uint32_t _water_mark) - : registry(_registry), high_water_mark(_water_mark), water_mark(0) { } - -void AutoDelete::add(Queue::shared_ptr const queue){ - Mutex::ScopedLock l(lock); - queues.push(queue); -} - -Queue::shared_ptr const AutoDelete::pop(){ - Queue::shared_ptr next; - Mutex::ScopedLock l(lock); - if(!queues.empty()){ - next = queues.front(); - queues.pop(); - } - return next; -} - -void AutoDelete::clean(){ - if (water_mark++ < high_water_mark) - return; - water_mark =0; - cleanNow(); -} - - -void AutoDelete::cleanNow(){ - Queue::shared_ptr seen; - for(Queue::shared_ptr q = pop(); q; q = pop()){ - if(seen == q){ - add(q); - break; - }else if(q->canAutoDelete()){ - std::string name(q->getName()); - registry->destroy(name); - QPID_LOG(info, "Auto-deleted queue named " << name); - }else{ - add(q); - if(!seen) seen = q; - } - } - -} - - - - diff --git a/cpp/src/qpid/broker/AutoDelete.h b/cpp/src/qpid/broker/AutoDelete.h deleted file mode 100644 index c4d46ca505..0000000000 --- a/cpp/src/qpid/broker/AutoDelete.h +++ /dev/null @@ -1,52 +0,0 @@ -#ifndef _AutoDelete_ -#define _AutoDelete_ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <iostream> -#include <queue> -#include "qpid/sys/Monitor.h" -#include "BrokerQueue.h" -#include "QueueRegistry.h" -#include "qpid/sys/Thread.h" -#include "qpid/sys/Runnable.h" - -namespace qpid { - namespace broker{ - class AutoDelete { - qpid::sys::Mutex lock; - std::queue<Queue::shared_ptr> queues; - QueueRegistry* const registry; - uint32_t high_water_mark; - uint32_t water_mark; - - Queue::shared_ptr const pop(); - - public: - AutoDelete(QueueRegistry* const registry, uint32_t _water_mark); - void add(Queue::shared_ptr const); - void clean(); - void cleanNow(); - }; - } -} - - -#endif diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 7c96322f02..43a5567b3d 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -80,9 +80,7 @@ Broker::Broker(const Broker::Options& conf) : config(conf), store(createStore(conf)), queues(store.get()), - timeout(30000), stagingThreshold(0), - cleaner(&queues, 100), // clean every 100 auto delete declares. factory(*this), dtxManager(store.get()) { @@ -127,7 +125,6 @@ void Broker::run() { void Broker::shutdown() { if (acceptor) acceptor->shutdown(); -//cct cleaner.cleanNow(); // do we need to delete on close? } Broker::~Broker() { diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index 58ba8589b0..a27bce1751 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -28,7 +28,6 @@ #include "qpid/sys/Runnable.h" #include "qpid/sys/Acceptor.h" #include "MessageStore.h" -#include "AutoDelete.h" #include "ExchangeRegistry.h" #include "ConnectionToken.h" #include "DirectExchange.h" @@ -95,9 +94,7 @@ class Broker : public sys::Runnable, public PluginUser MessageStore& getStore() { return *store; } QueueRegistry& getQueues() { return queues; } ExchangeRegistry& getExchanges() { return exchanges; } - uint32_t getTimeout() { return timeout; } uint64_t getStagingThreshold() { return stagingThreshold; } - AutoDelete& getCleaner() { return cleaner; } DtxManager& getDtxManager() { return dtxManager; } private: @@ -108,9 +105,7 @@ class Broker : public sys::Runnable, public PluginUser const std::auto_ptr<MessageStore> store; QueueRegistry queues; ExchangeRegistry exchanges; - uint32_t timeout; uint64_t stagingThreshold; - AutoDelete cleaner; ConnectionFactory factory; DtxManager dtxManager; diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp index a0ab42592e..dc8cd6cce1 100644 --- a/cpp/src/qpid/broker/BrokerAdapter.cpp +++ b/cpp/src/qpid/broker/BrokerAdapter.cpp @@ -213,7 +213,7 @@ void BrokerAdapter::QueueHandlerImpl::declare(const MethodContext& context, uint std::pair<Queue::shared_ptr, bool> queue_created = broker.getQueues().declare( name, durable, - autoDelete ? connection.getTimeout() : 0, + autoDelete && !exclusive, exclusive ? &connection : 0); queue = queue_created.first; assert(queue); @@ -229,9 +229,6 @@ void BrokerAdapter::QueueHandlerImpl::declare(const MethodContext& context, uint //handle automatic cleanup: if (exclusive) { connection.exclusiveQueues.push_back(queue); - } else if(autoDelete){ - broker.getCleaner().add(queue); - broker.getCleaner().clean(); // check if cleaning is needed } } } diff --git a/cpp/src/qpid/broker/BrokerChannel.cpp b/cpp/src/qpid/broker/BrokerChannel.cpp index 26e590f87e..f6a50a7ef5 100644 --- a/cpp/src/qpid/broker/BrokerChannel.cpp +++ b/cpp/src/qpid/broker/BrokerChannel.cpp @@ -249,8 +249,13 @@ Channel::ConsumerImpl::~ConsumerImpl() { } void Channel::ConsumerImpl::cancel(){ - if(queue) + if(queue) { queue->cancel(this); + if (queue->canAutoDelete()) { + parent->connection.broker.getQueues().destroyIf(queue->getName(), + boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue)); + } + } } void Channel::ConsumerImpl::requestDispatch(){ diff --git a/cpp/src/qpid/broker/BrokerQueue.cpp b/cpp/src/qpid/broker/BrokerQueue.cpp index f11766e7ec..1473ab6288 100644 --- a/cpp/src/qpid/broker/BrokerQueue.cpp +++ b/cpp/src/qpid/broker/BrokerQueue.cpp @@ -36,7 +36,7 @@ using namespace qpid::sys; using namespace qpid::framing; using boost::format; -Queue::Queue(const string& _name, uint32_t _autodelete, +Queue::Queue(const string& _name, bool _autodelete, MessageStore* const _store, const ConnectionToken* const _owner) : @@ -50,7 +50,6 @@ Queue::Queue(const string& _name, uint32_t _autodelete, exclusive(0), persistenceId(0) { - if(autodelete) lastUsed = now(); } Queue::~Queue(){} @@ -134,7 +133,6 @@ void Queue::consume(Consumer* c, bool requestExclusive){ "Exclusive access denied.") %getName()); exclusive = c; } - if(autodelete && consumers.empty()) lastUsed = FAR_FUTURE; consumers.push_back(c); } @@ -143,7 +141,6 @@ void Queue::cancel(Consumer* c){ Consumers::iterator i = std::find(consumers.begin(), consumers.end(), c); if (i != consumers.end()) consumers.erase(i); - if(autodelete && consumers.empty()) lastUsed = now(); if(exclusive == c) exclusive = 0; } @@ -192,7 +189,7 @@ uint32_t Queue::getConsumerCount() const{ bool Queue::canAutoDelete() const{ Mutex::ScopedLock locker(lock); - return Duration(lastUsed, now()) > autodelete; + return autodelete && consumers.size() == 0; } void Queue::enqueue(TransactionContext* ctxt, Message::shared_ptr& msg) diff --git a/cpp/src/qpid/broker/BrokerQueue.h b/cpp/src/qpid/broker/BrokerQueue.h index ee472db97b..35ffe2bc13 100644 --- a/cpp/src/qpid/broker/BrokerQueue.h +++ b/cpp/src/qpid/broker/BrokerQueue.h @@ -60,7 +60,7 @@ namespace qpid { typedef std::queue<Message::shared_ptr> Messages; const string name; - const sys::Duration autodelete; + const bool autodelete; MessageStore* const store; const ConnectionToken* const owner; Consumers consumers; @@ -69,7 +69,6 @@ namespace qpid { bool dispatching; int next; mutable qpid::sys::Mutex lock; - sys::AbsTime lastUsed; Consumer* exclusive; mutable uint64_t persistenceId; framing::FieldTable settings; @@ -89,7 +88,7 @@ namespace qpid { typedef std::vector<shared_ptr> vector; - Queue(const string& name, uint32_t autodelete = 0, + Queue(const string& name, bool autodelete = false, MessageStore* const store = 0, const ConnectionToken* const owner = 0); ~Queue(); diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index 2bd835e753..ce9e4865db 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -41,7 +41,6 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_) : framemax(65536), heartbeat(0), client(0), - timeout(broker.getTimeout()), stagingThreshold(broker.getStagingThreshold()) {} diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h index fcfc1d3334..259a74f808 100644 --- a/cpp/src/qpid/broker/Connection.h +++ b/cpp/src/qpid/broker/Connection.h @@ -62,7 +62,6 @@ class Connection : public sys::ConnectionInputHandler, uint32_t getFrameMax() const { return framemax; } uint16_t getHeartbeat() const { return heartbeat; } - uint32_t getTimeout() const { return timeout; } uint64_t getStagingThreshold() const { return stagingThreshold; } void setFrameMax(uint32_t fm) { framemax = fm; } @@ -98,7 +97,6 @@ class Connection : public sys::ConnectionInputHandler, uint32_t framemax; uint16_t heartbeat; framing::AMQP_ClientProxy::Connection* client; - const uint32_t timeout; //timeout for auto-deleted queues (in ms) const uint64_t stagingThreshold; }; diff --git a/cpp/src/qpid/broker/QueueRegistry.cpp b/cpp/src/qpid/broker/QueueRegistry.cpp index dcaf7ec0f6..e309594aa9 100644 --- a/cpp/src/qpid/broker/QueueRegistry.cpp +++ b/cpp/src/qpid/broker/QueueRegistry.cpp @@ -31,7 +31,7 @@ QueueRegistry::~QueueRegistry(){} std::pair<Queue::shared_ptr, bool> QueueRegistry::declare(const string& declareName, bool durable, - uint32_t autoDelete, const ConnectionToken* owner) + bool autoDelete, const ConnectionToken* owner) { Mutex::ScopedLock locker(lock); string name = declareName.empty() ? generateName() : declareName; diff --git a/cpp/src/qpid/broker/QueueRegistry.h b/cpp/src/qpid/broker/QueueRegistry.h index bd53cba817..cec2a11e68 100644 --- a/cpp/src/qpid/broker/QueueRegistry.h +++ b/cpp/src/qpid/broker/QueueRegistry.h @@ -22,7 +22,7 @@ #define _QueueRegistry_ #include <map> -#include "qpid/sys/Monitor.h" +#include "qpid/sys/Mutex.h" #include "BrokerQueue.h" namespace qpid { @@ -36,7 +36,6 @@ namespace broker { * */ class QueueRegistry{ - public: QueueRegistry(MessageStore* const store = 0); ~QueueRegistry(); @@ -47,7 +46,7 @@ class QueueRegistry{ * @return The queue and a boolean flag which is true if the queue * was created by this declare call false if it already existed. */ - std::pair<Queue::shared_ptr, bool> declare(const string& name, bool durable = false, uint32_t autodelete = 0, + std::pair<Queue::shared_ptr, bool> declare(const string& name, bool durable = false, bool autodelete = false, const ConnectionToken* const owner = 0); /** @@ -63,6 +62,13 @@ class QueueRegistry{ * */ void destroy(const string& name); + template <class Test> void destroyIf(const string& name, Test test) + { + qpid::sys::Mutex::ScopedLock locker(lock); + if (test()) { + queues.erase(name); + } + } /** * Find the named queue. Return 0 if not found. @@ -79,8 +85,7 @@ class QueueRegistry{ */ MessageStore* const getStore() const; - - private: +private: typedef std::map<string, Queue::shared_ptr> QueueMap; QueueMap queues; qpid::sys::Mutex lock; |