summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-07-03 09:04:00 +0000
committerGordon Sim <gsim@apache.org>2007-07-03 09:04:00 +0000
commite7f82f3d5b39c77ee913ee4cd6b4bf1bdc8b13bc (patch)
tree287e8ce31be4594915c577045025b466d969d17b
parent953eb74e69055777252ec05e1c277ec32fed82d0 (diff)
downloadqpid-python-e7f82f3d5b39c77ee913ee4cd6b4bf1bdc8b13bc.tar.gz
Autodeletable shared queues are now deleted as soon as the consumer count drops to zero (i.e. there is no timeout).
This closes QPID-533. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@552751 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/Makefile.am2
-rw-r--r--cpp/src/qpid/broker/AutoDelete.cpp74
-rw-r--r--cpp/src/qpid/broker/AutoDelete.h52
-rw-r--r--cpp/src/qpid/broker/Broker.cpp3
-rw-r--r--cpp/src/qpid/broker/Broker.h5
-rw-r--r--cpp/src/qpid/broker/BrokerAdapter.cpp5
-rw-r--r--cpp/src/qpid/broker/BrokerChannel.cpp7
-rw-r--r--cpp/src/qpid/broker/BrokerQueue.cpp7
-rw-r--r--cpp/src/qpid/broker/BrokerQueue.h5
-rw-r--r--cpp/src/qpid/broker/Connection.cpp1
-rw-r--r--cpp/src/qpid/broker/Connection.h2
-rw-r--r--cpp/src/qpid/broker/QueueRegistry.cpp2
-rw-r--r--cpp/src/qpid/broker/QueueRegistry.h15
13 files changed, 22 insertions, 158 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index 4b3a223e72..9f4440323e 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -180,7 +180,6 @@ libqpidcommon_la_SOURCES = \
libqpidbroker_la_LIBADD = libqpidcommon.la -ldaemon -lboost_filesystem
libqpidbroker_la_SOURCES = \
qpid/broker/AccumulatedAck.cpp \
- qpid/broker/AutoDelete.cpp \
qpid/broker/Broker.cpp \
qpid/broker/BrokerAdapter.cpp \
qpid/broker/BrokerSingleton.cpp \
@@ -241,7 +240,6 @@ libqpidclient_la_SOURCES = \
nobase_include_HEADERS = \
$(platform_hdr) \
qpid/broker/AccumulatedAck.h \
- qpid/broker/AutoDelete.h \
qpid/broker/BrokerChannel.h \
qpid/broker/BrokerExchange.h \
qpid/broker/BrokerMessage.h \
diff --git a/cpp/src/qpid/broker/AutoDelete.cpp b/cpp/src/qpid/broker/AutoDelete.cpp
deleted file mode 100644
index 6fa3bfbebf..0000000000
--- a/cpp/src/qpid/broker/AutoDelete.cpp
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "AutoDelete.h"
-#include "qpid/log/Statement.h"
-#include "qpid/sys/Time.h"
-
-using namespace qpid::broker;
-using namespace qpid::sys;
-
-AutoDelete::AutoDelete(QueueRegistry* const _registry, uint32_t _water_mark)
- : registry(_registry), high_water_mark(_water_mark), water_mark(0) { }
-
-void AutoDelete::add(Queue::shared_ptr const queue){
- Mutex::ScopedLock l(lock);
- queues.push(queue);
-}
-
-Queue::shared_ptr const AutoDelete::pop(){
- Queue::shared_ptr next;
- Mutex::ScopedLock l(lock);
- if(!queues.empty()){
- next = queues.front();
- queues.pop();
- }
- return next;
-}
-
-void AutoDelete::clean(){
- if (water_mark++ < high_water_mark)
- return;
- water_mark =0;
- cleanNow();
-}
-
-
-void AutoDelete::cleanNow(){
- Queue::shared_ptr seen;
- for(Queue::shared_ptr q = pop(); q; q = pop()){
- if(seen == q){
- add(q);
- break;
- }else if(q->canAutoDelete()){
- std::string name(q->getName());
- registry->destroy(name);
- QPID_LOG(info, "Auto-deleted queue named " << name);
- }else{
- add(q);
- if(!seen) seen = q;
- }
- }
-
-}
-
-
-
-
diff --git a/cpp/src/qpid/broker/AutoDelete.h b/cpp/src/qpid/broker/AutoDelete.h
deleted file mode 100644
index c4d46ca505..0000000000
--- a/cpp/src/qpid/broker/AutoDelete.h
+++ /dev/null
@@ -1,52 +0,0 @@
-#ifndef _AutoDelete_
-#define _AutoDelete_
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <iostream>
-#include <queue>
-#include "qpid/sys/Monitor.h"
-#include "BrokerQueue.h"
-#include "QueueRegistry.h"
-#include "qpid/sys/Thread.h"
-#include "qpid/sys/Runnable.h"
-
-namespace qpid {
- namespace broker{
- class AutoDelete {
- qpid::sys::Mutex lock;
- std::queue<Queue::shared_ptr> queues;
- QueueRegistry* const registry;
- uint32_t high_water_mark;
- uint32_t water_mark;
-
- Queue::shared_ptr const pop();
-
- public:
- AutoDelete(QueueRegistry* const registry, uint32_t _water_mark);
- void add(Queue::shared_ptr const);
- void clean();
- void cleanNow();
- };
- }
-}
-
-
-#endif
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index 7c96322f02..43a5567b3d 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -80,9 +80,7 @@ Broker::Broker(const Broker::Options& conf) :
config(conf),
store(createStore(conf)),
queues(store.get()),
- timeout(30000),
stagingThreshold(0),
- cleaner(&queues, 100), // clean every 100 auto delete declares.
factory(*this),
dtxManager(store.get())
{
@@ -127,7 +125,6 @@ void Broker::run() {
void Broker::shutdown() {
if (acceptor)
acceptor->shutdown();
-//cct cleaner.cleanNow(); // do we need to delete on close?
}
Broker::~Broker() {
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h
index 58ba8589b0..a27bce1751 100644
--- a/cpp/src/qpid/broker/Broker.h
+++ b/cpp/src/qpid/broker/Broker.h
@@ -28,7 +28,6 @@
#include "qpid/sys/Runnable.h"
#include "qpid/sys/Acceptor.h"
#include "MessageStore.h"
-#include "AutoDelete.h"
#include "ExchangeRegistry.h"
#include "ConnectionToken.h"
#include "DirectExchange.h"
@@ -95,9 +94,7 @@ class Broker : public sys::Runnable, public PluginUser
MessageStore& getStore() { return *store; }
QueueRegistry& getQueues() { return queues; }
ExchangeRegistry& getExchanges() { return exchanges; }
- uint32_t getTimeout() { return timeout; }
uint64_t getStagingThreshold() { return stagingThreshold; }
- AutoDelete& getCleaner() { return cleaner; }
DtxManager& getDtxManager() { return dtxManager; }
private:
@@ -108,9 +105,7 @@ class Broker : public sys::Runnable, public PluginUser
const std::auto_ptr<MessageStore> store;
QueueRegistry queues;
ExchangeRegistry exchanges;
- uint32_t timeout;
uint64_t stagingThreshold;
- AutoDelete cleaner;
ConnectionFactory factory;
DtxManager dtxManager;
diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp
index a0ab42592e..dc8cd6cce1 100644
--- a/cpp/src/qpid/broker/BrokerAdapter.cpp
+++ b/cpp/src/qpid/broker/BrokerAdapter.cpp
@@ -213,7 +213,7 @@ void BrokerAdapter::QueueHandlerImpl::declare(const MethodContext& context, uint
std::pair<Queue::shared_ptr, bool> queue_created =
broker.getQueues().declare(
name, durable,
- autoDelete ? connection.getTimeout() : 0,
+ autoDelete && !exclusive,
exclusive ? &connection : 0);
queue = queue_created.first;
assert(queue);
@@ -229,9 +229,6 @@ void BrokerAdapter::QueueHandlerImpl::declare(const MethodContext& context, uint
//handle automatic cleanup:
if (exclusive) {
connection.exclusiveQueues.push_back(queue);
- } else if(autoDelete){
- broker.getCleaner().add(queue);
- broker.getCleaner().clean(); // check if cleaning is needed
}
}
}
diff --git a/cpp/src/qpid/broker/BrokerChannel.cpp b/cpp/src/qpid/broker/BrokerChannel.cpp
index 26e590f87e..f6a50a7ef5 100644
--- a/cpp/src/qpid/broker/BrokerChannel.cpp
+++ b/cpp/src/qpid/broker/BrokerChannel.cpp
@@ -249,8 +249,13 @@ Channel::ConsumerImpl::~ConsumerImpl() {
}
void Channel::ConsumerImpl::cancel(){
- if(queue)
+ if(queue) {
queue->cancel(this);
+ if (queue->canAutoDelete()) {
+ parent->connection.broker.getQueues().destroyIf(queue->getName(),
+ boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue));
+ }
+ }
}
void Channel::ConsumerImpl::requestDispatch(){
diff --git a/cpp/src/qpid/broker/BrokerQueue.cpp b/cpp/src/qpid/broker/BrokerQueue.cpp
index f11766e7ec..1473ab6288 100644
--- a/cpp/src/qpid/broker/BrokerQueue.cpp
+++ b/cpp/src/qpid/broker/BrokerQueue.cpp
@@ -36,7 +36,7 @@ using namespace qpid::sys;
using namespace qpid::framing;
using boost::format;
-Queue::Queue(const string& _name, uint32_t _autodelete,
+Queue::Queue(const string& _name, bool _autodelete,
MessageStore* const _store,
const ConnectionToken* const _owner) :
@@ -50,7 +50,6 @@ Queue::Queue(const string& _name, uint32_t _autodelete,
exclusive(0),
persistenceId(0)
{
- if(autodelete) lastUsed = now();
}
Queue::~Queue(){}
@@ -134,7 +133,6 @@ void Queue::consume(Consumer* c, bool requestExclusive){
"Exclusive access denied.") %getName());
exclusive = c;
}
- if(autodelete && consumers.empty()) lastUsed = FAR_FUTURE;
consumers.push_back(c);
}
@@ -143,7 +141,6 @@ void Queue::cancel(Consumer* c){
Consumers::iterator i = std::find(consumers.begin(), consumers.end(), c);
if (i != consumers.end())
consumers.erase(i);
- if(autodelete && consumers.empty()) lastUsed = now();
if(exclusive == c) exclusive = 0;
}
@@ -192,7 +189,7 @@ uint32_t Queue::getConsumerCount() const{
bool Queue::canAutoDelete() const{
Mutex::ScopedLock locker(lock);
- return Duration(lastUsed, now()) > autodelete;
+ return autodelete && consumers.size() == 0;
}
void Queue::enqueue(TransactionContext* ctxt, Message::shared_ptr& msg)
diff --git a/cpp/src/qpid/broker/BrokerQueue.h b/cpp/src/qpid/broker/BrokerQueue.h
index ee472db97b..35ffe2bc13 100644
--- a/cpp/src/qpid/broker/BrokerQueue.h
+++ b/cpp/src/qpid/broker/BrokerQueue.h
@@ -60,7 +60,7 @@ namespace qpid {
typedef std::queue<Message::shared_ptr> Messages;
const string name;
- const sys::Duration autodelete;
+ const bool autodelete;
MessageStore* const store;
const ConnectionToken* const owner;
Consumers consumers;
@@ -69,7 +69,6 @@ namespace qpid {
bool dispatching;
int next;
mutable qpid::sys::Mutex lock;
- sys::AbsTime lastUsed;
Consumer* exclusive;
mutable uint64_t persistenceId;
framing::FieldTable settings;
@@ -89,7 +88,7 @@ namespace qpid {
typedef std::vector<shared_ptr> vector;
- Queue(const string& name, uint32_t autodelete = 0,
+ Queue(const string& name, bool autodelete = false,
MessageStore* const store = 0,
const ConnectionToken* const owner = 0);
~Queue();
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp
index 2bd835e753..ce9e4865db 100644
--- a/cpp/src/qpid/broker/Connection.cpp
+++ b/cpp/src/qpid/broker/Connection.cpp
@@ -41,7 +41,6 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_) :
framemax(65536),
heartbeat(0),
client(0),
- timeout(broker.getTimeout()),
stagingThreshold(broker.getStagingThreshold())
{}
diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h
index fcfc1d3334..259a74f808 100644
--- a/cpp/src/qpid/broker/Connection.h
+++ b/cpp/src/qpid/broker/Connection.h
@@ -62,7 +62,6 @@ class Connection : public sys::ConnectionInputHandler,
uint32_t getFrameMax() const { return framemax; }
uint16_t getHeartbeat() const { return heartbeat; }
- uint32_t getTimeout() const { return timeout; }
uint64_t getStagingThreshold() const { return stagingThreshold; }
void setFrameMax(uint32_t fm) { framemax = fm; }
@@ -98,7 +97,6 @@ class Connection : public sys::ConnectionInputHandler,
uint32_t framemax;
uint16_t heartbeat;
framing::AMQP_ClientProxy::Connection* client;
- const uint32_t timeout; //timeout for auto-deleted queues (in ms)
const uint64_t stagingThreshold;
};
diff --git a/cpp/src/qpid/broker/QueueRegistry.cpp b/cpp/src/qpid/broker/QueueRegistry.cpp
index dcaf7ec0f6..e309594aa9 100644
--- a/cpp/src/qpid/broker/QueueRegistry.cpp
+++ b/cpp/src/qpid/broker/QueueRegistry.cpp
@@ -31,7 +31,7 @@ QueueRegistry::~QueueRegistry(){}
std::pair<Queue::shared_ptr, bool>
QueueRegistry::declare(const string& declareName, bool durable,
- uint32_t autoDelete, const ConnectionToken* owner)
+ bool autoDelete, const ConnectionToken* owner)
{
Mutex::ScopedLock locker(lock);
string name = declareName.empty() ? generateName() : declareName;
diff --git a/cpp/src/qpid/broker/QueueRegistry.h b/cpp/src/qpid/broker/QueueRegistry.h
index bd53cba817..cec2a11e68 100644
--- a/cpp/src/qpid/broker/QueueRegistry.h
+++ b/cpp/src/qpid/broker/QueueRegistry.h
@@ -22,7 +22,7 @@
#define _QueueRegistry_
#include <map>
-#include "qpid/sys/Monitor.h"
+#include "qpid/sys/Mutex.h"
#include "BrokerQueue.h"
namespace qpid {
@@ -36,7 +36,6 @@ namespace broker {
*
*/
class QueueRegistry{
-
public:
QueueRegistry(MessageStore* const store = 0);
~QueueRegistry();
@@ -47,7 +46,7 @@ class QueueRegistry{
* @return The queue and a boolean flag which is true if the queue
* was created by this declare call false if it already existed.
*/
- std::pair<Queue::shared_ptr, bool> declare(const string& name, bool durable = false, uint32_t autodelete = 0,
+ std::pair<Queue::shared_ptr, bool> declare(const string& name, bool durable = false, bool autodelete = false,
const ConnectionToken* const owner = 0);
/**
@@ -63,6 +62,13 @@ class QueueRegistry{
*
*/
void destroy(const string& name);
+ template <class Test> void destroyIf(const string& name, Test test)
+ {
+ qpid::sys::Mutex::ScopedLock locker(lock);
+ if (test()) {
+ queues.erase(name);
+ }
+ }
/**
* Find the named queue. Return 0 if not found.
@@ -79,8 +85,7 @@ class QueueRegistry{
*/
MessageStore* const getStore() const;
-
- private:
+private:
typedef std::map<string, Queue::shared_ptr> QueueMap;
QueueMap queues;
qpid::sys::Mutex lock;