summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp21
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.h1
-rw-r--r--qpid/cpp/src/qpid/broker/DirectExchange.cpp16
-rw-r--r--qpid/cpp/src/qpid/broker/DirectExchange.h10
-rw-r--r--qpid/cpp/src/qpid/broker/Exchange.cpp83
-rw-r--r--qpid/cpp/src/qpid/broker/Exchange.h28
-rw-r--r--qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp14
-rw-r--r--qpid/cpp/src/qpid/broker/ExchangeRegistry.h5
-rw-r--r--qpid/cpp/src/qpid/broker/FanOutExchange.cpp13
-rw-r--r--qpid/cpp/src/qpid/broker/FanOutExchange.h10
-rw-r--r--qpid/cpp/src/qpid/broker/HeadersExchange.cpp10
-rw-r--r--qpid/cpp/src/qpid/broker/HeadersExchange.h13
-rw-r--r--qpid/cpp/src/qpid/broker/Link.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/SessionAdapter.cpp9
-rw-r--r--qpid/cpp/src/qpid/broker/TopicExchange.cpp11
-rw-r--r--qpid/cpp/src/qpid/broker/TopicExchange.h4
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/NodeProperties.cpp8
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/NodeProperties.h1
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Session.cpp14
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Topic.cpp2
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.cpp9
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.h2
-rw-r--r--qpid/cpp/src/qpid/ha/FailoverExchange.cpp5
-rw-r--r--qpid/cpp/src/qpid/ha/FailoverExchange.h1
-rw-r--r--qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp1
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.cpp1
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.h1
-rw-r--r--qpid/cpp/src/qpid/management/ManagementDirectExchange.cpp4
-rw-r--r--qpid/cpp/src/qpid/management/ManagementTopicExchange.cpp4
-rw-r--r--qpid/cpp/src/qpid/management/ManagementTopicExchange.h2
-rw-r--r--qpid/cpp/src/qpid/xml/XmlExchange.cpp15
-rw-r--r--qpid/cpp/src/qpid/xml/XmlExchange.h4
-rw-r--r--qpid/cpp/src/qpid/xml/XmlExchangePlugin.cpp5
-rw-r--r--qpid/cpp/src/tests/ExchangeTest.cpp20
-rw-r--r--qpid/cpp/src/tests/legacystore/SimpleTest.cpp6
-rw-r--r--qpid/cpp/src/tests/misc.py67
-rw-r--r--qpid/java/test-profiles/python_tests/Java010PythonExcludes2
-rw-r--r--qpid/tests/src/py/qpid_tests/broker_0_10/exchange.py107
38 files changed, 412 insertions, 119 deletions
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index da6c2d4a84..1e25af3b64 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -295,7 +295,7 @@ Broker::Broker(const Broker::Options& conf) :
framing::FieldTable args;
// Default exchnge is not replicated.
- exchanges.declare(empty, DirectExchange::typeName, false, noReplicateArgs());
+ exchanges.declare(empty, DirectExchange::typeName, false, false, noReplicateArgs());
RecoveredObjects objects;
if (store.get() != 0) {
@@ -313,7 +313,7 @@ Broker::Broker(const Broker::Options& conf) :
declareStandardExchange(amq_match, HeadersExchange::typeName);
if(conf.enableMgmt) {
- exchanges.declare(qpid_management, ManagementTopicExchange::typeName, false, noReplicateArgs());
+ exchanges.declare(qpid_management, ManagementTopicExchange::typeName, false, false, noReplicateArgs());
Exchange::shared_ptr mExchange = exchanges.get(qpid_management);
Exchange::shared_ptr dExchange = exchanges.get(amq_direct);
managementAgent->setExchange(mExchange, dExchange);
@@ -323,9 +323,9 @@ Broker::Broker(const Broker::Options& conf) :
std::string qmfDirect("qmf.default.direct");
std::pair<Exchange::shared_ptr, bool> topicPair(
- exchanges.declare(qmfTopic, ManagementTopicExchange::typeName, false, noReplicateArgs()));
+ exchanges.declare(qmfTopic, ManagementTopicExchange::typeName, false, false, noReplicateArgs()));
std::pair<Exchange::shared_ptr, bool> directPair(
- exchanges.declare(qmfDirect, ManagementDirectExchange::typeName, false, noReplicateArgs()));
+ exchanges.declare(qmfDirect, ManagementDirectExchange::typeName, false, false, noReplicateArgs()));
boost::dynamic_pointer_cast<ManagementDirectExchange>(directPair.first)->setManagmentAgent(managementAgent.get(), 2);
boost::dynamic_pointer_cast<ManagementTopicExchange>(topicPair.first)->setManagmentAgent(managementAgent.get(), 2);
@@ -386,7 +386,7 @@ void Broker::declareStandardExchange(const std::string& name, const std::string&
framing::FieldTable args;
// Standard exchanges are not replicated.
std::pair<Exchange::shared_ptr, bool> status =
- exchanges.declare(name, type, storeEnabled, noReplicateArgs());
+ exchanges.declare(name, type, storeEnabled, false, noReplicateArgs());
if (status.second && storeEnabled) {
store->create(*status.first, framing::FieldTable ());
}
@@ -759,12 +759,14 @@ void Broker::createObject(const std::string& type, const std::string& name,
}
} else if (type == TYPE_EXCHANGE || type == TYPE_TOPIC) {
bool durable(false);
+ bool autodelete(false);
std::string exchangeType("topic");
std::string alternateExchange;
Variant::Map extensions;
for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) {
// extract durable, auto-delete and alternate-exchange properties
if (i->first == DURABLE) durable = i->second;
+ else if (i->first == AUTO_DELETE) autodelete = i->second;
else if (i->first == EXCHANGE_TYPE) exchangeType = i->second.asString();
else if (i->first == ALTERNATE_EXCHANGE) alternateExchange = i->second.asString();
//treat everything else as extension properties
@@ -775,7 +777,7 @@ void Broker::createObject(const std::string& type, const std::string& name,
try {
std::pair<boost::shared_ptr<Exchange>, bool> result =
- createExchange(name, exchangeType, durable, alternateExchange, arguments, userId, connectionId);
+ createExchange(name, exchangeType, durable, autodelete, alternateExchange, arguments, userId, connectionId);
if (!result.second) {
throw ObjectAlreadyExists(name);
}
@@ -1362,6 +1364,7 @@ std::pair<Exchange::shared_ptr, bool> Broker::createExchange(
const std::string& name,
const std::string& type,
bool durable,
+ bool autodelete,
const std::string& alternateExchange,
const qpid::framing::FieldTable& arguments,
const std::string& userId,
@@ -1372,6 +1375,7 @@ std::pair<Exchange::shared_ptr, bool> Broker::createExchange(
params.insert(make_pair(acl::PROP_TYPE, type));
params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange));
params.insert(make_pair(acl::PROP_DURABLE, durable ? _TRUE : _FALSE));
+ params.insert(make_pair(acl::PROP_AUTODELETE, autodelete ? _TRUE : _FALSE));
if (!acl->authorise(userId,acl::ACT_CREATE,acl::OBJ_EXCHANGE,name,&params) )
throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange create request from " << userId));
}
@@ -1384,7 +1388,7 @@ std::pair<Exchange::shared_ptr, bool> Broker::createExchange(
std::pair<Exchange::shared_ptr, bool> result;
result = exchanges.declare(
- name, type, durable, arguments, alternate, connectionId, userId);
+ name, type, durable, autodelete, arguments, alternate, connectionId, userId);
if (result.second) {
if (durable) {
store->create(*result.first, arguments);
@@ -1394,7 +1398,8 @@ std::pair<Exchange::shared_ptr, bool> Broker::createExchange(
<< " rhost:" << connectionId
<< " type:" << type
<< " alternateExchange:" << alternateExchange
- << " durable:" << (durable ? "T" : "F"));
+ << " durable:" << (durable ? "T" : "F")
+ << " autodelete:" << (autodelete ? "T" : "F"));
}
return result;
}
diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h
index e658d6f03b..fe6dd6379b 100644
--- a/qpid/cpp/src/qpid/broker/Broker.h
+++ b/qpid/cpp/src/qpid/broker/Broker.h
@@ -329,6 +329,7 @@ class Broker : public sys::Runnable, public Plugin::Target,
const std::string& name,
const std::string& type,
bool durable,
+ bool autodelete,
const std::string& alternateExchange,
const qpid::framing::FieldTable& args,
const std::string& userId, const std::string& connectionId);
diff --git a/qpid/cpp/src/qpid/broker/DirectExchange.cpp b/qpid/cpp/src/qpid/broker/DirectExchange.cpp
index 8ab7b59ed1..65c8287d4f 100644
--- a/qpid/cpp/src/qpid/broker/DirectExchange.cpp
+++ b/qpid/cpp/src/qpid/broker/DirectExchange.cpp
@@ -45,9 +45,9 @@ DirectExchange::DirectExchange(const string& _name, Manageable* _parent, Broker*
mgmtExchange->set_type(typeName);
}
-DirectExchange::DirectExchange(const string& _name, bool _durable,
+DirectExchange::DirectExchange(const string& _name, bool _durable, bool autodelete,
const FieldTable& _args, Manageable* _parent, Broker* b) :
- Exchange(_name, _durable, _args, _parent, b)
+ Exchange(_name, _durable, autodelete, _args, _parent, b)
{
if (mgmtExchange != 0)
mgmtExchange->set_type(typeName);
@@ -131,6 +131,7 @@ bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, c
{
string fedOrigin(args ? args->getAsString(qpidFedOrigin) : "");
bool propagate = false;
+ bool empty = false;
QPID_LOG(debug, "Unbinding key [" << routingKey << "] from queue " << queue->getName()
<< " on exchange " << getName() << " origin=" << fedOrigin << ")" );
@@ -144,6 +145,7 @@ bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, c
}
if (bk.queues.empty()) {
bindings.erase(routingKey);
+ if (bindings.empty()) empty = true;
}
} else {
return false;
@@ -153,6 +155,7 @@ bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, c
// If I delete my local binding, propagate this unbind to any upstream brokers
if (propagate)
propagateFedOp(routingKey, string(), fedOpUnbind, string());
+ if (empty) checkAutodelete();
return true;
}
@@ -163,7 +166,8 @@ void DirectExchange::route(Deliverable& msg)
ConstBindingList b;
{
Mutex::ScopedLock l(lock);
- b = bindings[routingKey].queues.snapshot();
+ Bindings::iterator i = bindings.find(routingKey);
+ if (i != bindings.end()) b = i->second.queues.snapshot();
}
doRoute(msg, b);
}
@@ -202,3 +206,9 @@ DirectExchange::~DirectExchange() {
}
const std::string DirectExchange::typeName("direct");
+
+bool DirectExchange::hasBindings()
+{
+ Mutex::ScopedLock l(lock);
+ return !bindings.empty();
+}
diff --git a/qpid/cpp/src/qpid/broker/DirectExchange.h b/qpid/cpp/src/qpid/broker/DirectExchange.h
index cfefef54e8..cbf9aa5975 100644
--- a/qpid/cpp/src/qpid/broker/DirectExchange.h
+++ b/qpid/cpp/src/qpid/broker/DirectExchange.h
@@ -43,16 +43,16 @@ class DirectExchange : public virtual Exchange {
public:
QPID_BROKER_EXTERN static const std::string typeName;
-
+
QPID_BROKER_EXTERN DirectExchange(const std::string& name,
management::Manageable* parent = 0, Broker* broker = 0);
QPID_BROKER_EXTERN DirectExchange(const std::string& _name,
- bool _durable,
+ bool _durable, bool autodelete,
const qpid::framing::FieldTable& _args,
management::Manageable* parent = 0, Broker* broker = 0);
- virtual std::string getType() const { return typeName; }
-
+ virtual std::string getType() const { return typeName; }
+
QPID_BROKER_EXTERN virtual bool bind(boost::shared_ptr<Queue> queue,
const std::string& routingKey,
const qpid::framing::FieldTable* args);
@@ -65,6 +65,8 @@ public:
QPID_BROKER_EXTERN virtual ~DirectExchange();
virtual bool supportsDynamicBinding() { return true; }
+ protected:
+ bool hasBindings();
};
}}
diff --git a/qpid/cpp/src/qpid/broker/Exchange.cpp b/qpid/cpp/src/qpid/broker/Exchange.cpp
index efd83a3225..304ed7cec4 100644
--- a/qpid/cpp/src/qpid/broker/Exchange.cpp
+++ b/qpid/cpp/src/qpid/broker/Exchange.cpp
@@ -166,7 +166,7 @@ void Exchange::routeIVE(){
Exchange::Exchange (const string& _name, Manageable* parent, Broker* b) :
- name(_name), durable(false), alternateUsers(0), persistenceId(0), sequence(false),
+ name(_name), durable(false), autodelete(false), alternateUsers(0), otherUsers(0), persistenceId(0), sequence(false),
sequenceNo(0), ive(false), broker(b), destroyed(false)
{
if (parent != 0 && broker != 0)
@@ -176,7 +176,7 @@ Exchange::Exchange (const string& _name, Manageable* parent, Broker* b) :
{
mgmtExchange = _qmf::Exchange::shared_ptr(new _qmf::Exchange (agent, this, parent, _name));
mgmtExchange->set_durable(durable);
- mgmtExchange->set_autoDelete(false);
+ mgmtExchange->set_autoDelete(autodelete);
agent->addObject(mgmtExchange, 0, durable);
if (broker)
brokerMgmtObject = boost::dynamic_pointer_cast<qmf::org::apache::qpid::broker::Broker>(broker->GetManagementObject());
@@ -184,9 +184,9 @@ Exchange::Exchange (const string& _name, Manageable* parent, Broker* b) :
}
}
-Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args,
+Exchange::Exchange(const string& _name, bool _durable, bool _autodelete, const qpid::framing::FieldTable& _args,
Manageable* parent, Broker* b)
- : name(_name), durable(_durable), alternateUsers(0), persistenceId(0),
+ : name(_name), durable(_durable), autodelete(_autodelete), alternateUsers(0), otherUsers(0), persistenceId(0),
args(_args), sequence(false), sequenceNo(0), ive(false), broker(b), destroyed(false)
{
if (parent != 0 && broker != 0)
@@ -196,7 +196,7 @@ Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::Fiel
{
mgmtExchange = _qmf::Exchange::shared_ptr(new _qmf::Exchange (agent, this, parent, _name));
mgmtExchange->set_durable(durable);
- mgmtExchange->set_autoDelete(false);
+ mgmtExchange->set_autoDelete(autodelete);
mgmtExchange->set_arguments(ManagementAgent::toMap(args));
agent->addObject(mgmtExchange, 0, durable);
if (broker)
@@ -255,7 +255,7 @@ Exchange::shared_ptr Exchange::decode(ExchangeRegistry& exchanges, Buffer& buffe
buffer.getShortString(altName);
try {
- Exchange::shared_ptr exch = exchanges.declare(name, type, durable, args).first;
+ Exchange::shared_ptr exch = exchanges.declare(name, type, durable, false, args).first;
exch->sequenceNo = args.getAsInt64(qpidSequenceCounter);
exch->alternateName.assign(altName);
return exch;
@@ -415,5 +415,76 @@ void Exchange::setArgs(const framing::FieldTable& newArgs) {
if (mgmtExchange) mgmtExchange->set_arguments(ManagementAgent::toMap(args));
}
+void Exchange::checkAutodelete()
+{
+ if (autodelete && !inUse() && broker) {
+ broker->getExchanges().destroy(name);
+ }
+}
+void Exchange::incAlternateUsers()
+{
+ Mutex::ScopedLock l(usersLock);
+ alternateUsers++;
+}
+
+void Exchange::decAlternateUsers()
+{
+ Mutex::ScopedLock l(usersLock);
+ alternateUsers--;
+}
+
+bool Exchange::inUseAsAlternate()
+{
+ Mutex::ScopedLock l(usersLock);
+ return alternateUsers > 0;
+}
+
+void Exchange::incOtherUsers()
+{
+ Mutex::ScopedLock l(usersLock);
+ otherUsers++;
+}
+void Exchange::decOtherUsers()
+{
+ Mutex::ScopedLock l(usersLock);
+ assert(otherUsers);
+ if (otherUsers) otherUsers--;
+ if (!inUse() && !hasBindings()) checkAutodelete();
+}
+bool Exchange::inUse() const
+{
+ Mutex::ScopedLock l(usersLock);
+ return alternateUsers > 0 || otherUsers > 0;
+}
+void Exchange::setDeletionListener(const std::string& key, boost::function0<void> listener)
+{
+ Mutex::ScopedLock l(usersLock);
+ if (listener) deletionListeners[key] = listener;
+}
+void Exchange::unsetDeletionListener(const std::string& key)
+{
+ Mutex::ScopedLock l(usersLock);
+ deletionListeners.erase(key);
+}
+
+void Exchange::destroy()
+{
+ std::map<std::string, boost::function0<void> > copy;
+ {
+ Mutex::ScopedLock l(usersLock);
+ destroyed = true;
+ deletionListeners.swap(copy);
+ }
+ for (std::map<std::string, boost::function0<void> >::iterator i = copy.begin(); i != copy.end(); ++i) {
+ QPID_LOG(notice, "Exchange::destroy() notifying " << i->first);
+ if (i->second) i->second();
+ }
+}
+bool Exchange::isDestroyed() const
+{
+ Mutex::ScopedLock l(usersLock);
+ return destroyed;
+}
+
}}
diff --git a/qpid/cpp/src/qpid/broker/Exchange.h b/qpid/cpp/src/qpid/broker/Exchange.h
index 70ed393f64..7d3bbcf88e 100644
--- a/qpid/cpp/src/qpid/broker/Exchange.h
+++ b/qpid/cpp/src/qpid/broker/Exchange.h
@@ -34,6 +34,8 @@
#include "qmf/org/apache/qpid/broker/Exchange.h"
#include "qmf/org/apache/qpid/broker/Binding.h"
#include "qmf/org/apache/qpid/broker/Broker.h"
+#include <map>
+#include <boost/function.hpp>
namespace qpid {
namespace broker {
@@ -64,9 +66,13 @@ public:
private:
const std::string name;
const bool durable;
+ const bool autodelete;
std::string alternateName;
boost::shared_ptr<Exchange> alternate;
+ mutable qpid::sys::Mutex usersLock;
uint32_t alternateUsers;
+ uint32_t otherUsers;
+ std::map<std::string, boost::function0<void> > deletionListeners;
mutable uint64_t persistenceId;
protected:
@@ -89,7 +95,8 @@ protected:
typedef boost::shared_ptr< std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> > > BindingList;
void doRoute(Deliverable& msg, ConstBindingList b);
void routeIVE();
-
+ void checkAutodelete();
+ virtual bool hasBindings() = 0;
struct MatchQueue {
const boost::shared_ptr<Queue> queue;
@@ -167,7 +174,7 @@ public:
QPID_BROKER_EXTERN explicit Exchange(const std::string& name, management::Manageable* parent = 0,
Broker* broker = 0);
- QPID_BROKER_EXTERN Exchange(const std::string& _name, bool _durable, const qpid::framing::FieldTable& _args,
+ QPID_BROKER_EXTERN Exchange(const std::string& _name, bool _durable, bool autodelete, const qpid::framing::FieldTable& _args,
management::Manageable* parent = 0, Broker* broker = 0);
QPID_BROKER_INLINE_EXTERN virtual ~Exchange();
@@ -179,9 +186,13 @@ public:
QPID_BROKER_EXTERN Exchange::shared_ptr getAlternate() { return alternate; }
QPID_BROKER_EXTERN void setAlternate(Exchange::shared_ptr _alternate);
- void incAlternateUsers() { alternateUsers++; }
- void decAlternateUsers() { alternateUsers--; }
- bool inUseAsAlternate() { return alternateUsers > 0; }
+ QPID_BROKER_EXTERN void incAlternateUsers();
+ QPID_BROKER_EXTERN void decAlternateUsers();
+ QPID_BROKER_EXTERN bool inUseAsAlternate();
+
+ QPID_BROKER_EXTERN void incOtherUsers();
+ QPID_BROKER_EXTERN void decOtherUsers();
+ QPID_BROKER_EXTERN bool inUse() const;
virtual std::string getType() const = 0;
@@ -233,8 +244,11 @@ public:
bool routeWithAlternate(Deliverable& message);
- void destroy() { destroyed = true; }
- bool isDestroyed() const { return destroyed; }
+ QPID_BROKER_EXTERN void destroy();
+ QPID_BROKER_EXTERN bool isDestroyed() const;
+
+ QPID_BROKER_EXTERN void setDeletionListener(const std::string& key, boost::function0<void> listener);
+ QPID_BROKER_EXTERN void unsetDeletionListener(const std::string& key);
protected:
qpid::sys::Mutex bridgeLock;
diff --git a/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp b/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
index 9eeffadb90..34a31fe769 100644
--- a/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
+++ b/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
@@ -42,11 +42,11 @@ namespace _qmf = qmf::org::apache::qpid::broker;
pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, const string& type){
- return declare(name, type, false, FieldTable());
+ return declare(name, type, false, false, FieldTable());
}
pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(
- const string& name, const string& type, bool durable, const FieldTable& args,
+ const string& name, const string& type, bool durable, bool autodelete, const FieldTable& args,
Exchange::shared_ptr alternate, const string& connectionId, const string& userId)
{
Exchange::shared_ptr exchange;
@@ -56,13 +56,13 @@ pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(
ExchangeMap::iterator i = exchanges.find(name);
if (i == exchanges.end()) {
if (type == TopicExchange::typeName){
- exchange = Exchange::shared_ptr(new TopicExchange(name, durable, args, parent, broker));
+ exchange = Exchange::shared_ptr(new TopicExchange(name, durable, autodelete, args, parent, broker));
}else if(type == DirectExchange::typeName){
- exchange = Exchange::shared_ptr(new DirectExchange(name, durable, args, parent, broker));
+ exchange = Exchange::shared_ptr(new DirectExchange(name, durable, autodelete, args, parent, broker));
}else if(type == FanOutExchange::typeName){
- exchange = Exchange::shared_ptr(new FanOutExchange(name, durable, args, parent, broker));
+ exchange = Exchange::shared_ptr(new FanOutExchange(name, durable, autodelete, args, parent, broker));
}else if (type == HeadersExchange::typeName) {
- exchange = Exchange::shared_ptr(new HeadersExchange(name, durable, args, parent, broker));
+ exchange = Exchange::shared_ptr(new HeadersExchange(name, durable, autodelete, args, parent, broker));
}else if (type == ManagementDirectExchange::typeName) {
exchange = Exchange::shared_ptr(new ManagementDirectExchange(name, durable, args, parent, broker));
}else if (type == ManagementTopicExchange::typeName) {
@@ -74,7 +74,7 @@ pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(
if (i == factory.end()) {
throw UnknownExchangeTypeException();
} else {
- exchange = i->second(name, durable, args, parent, broker);
+ exchange = i->second(name, durable, autodelete, args, parent, broker);
}
}
exchanges[name] = exchange;
diff --git a/qpid/cpp/src/qpid/broker/ExchangeRegistry.h b/qpid/cpp/src/qpid/broker/ExchangeRegistry.h
index 8db2c34863..99caf30269 100644
--- a/qpid/cpp/src/qpid/broker/ExchangeRegistry.h
+++ b/qpid/cpp/src/qpid/broker/ExchangeRegistry.h
@@ -42,8 +42,8 @@ struct UnknownExchangeTypeException{};
class ExchangeRegistry{
public:
- typedef boost::function5<Exchange::shared_ptr, const std::string&,
- bool, const qpid::framing::FieldTable&, qpid::management::Manageable*, qpid::broker::Broker*> FactoryFunction;
+ typedef boost::function6<Exchange::shared_ptr, const std::string&,
+ bool, bool, const qpid::framing::FieldTable&, qpid::management::Manageable*, qpid::broker::Broker*> FactoryFunction;
ExchangeRegistry (Broker* b = 0) : parent(0), broker(b) {}
QPID_BROKER_EXTERN std::pair<Exchange::shared_ptr, bool> declare(
@@ -53,6 +53,7 @@ class ExchangeRegistry{
const std::string& name,
const std::string& type,
bool durable,
+ bool autodelete,
const qpid::framing::FieldTable& args = framing::FieldTable(),
Exchange::shared_ptr alternate = Exchange::shared_ptr(),
const std::string& connectionId = std::string(),
diff --git a/qpid/cpp/src/qpid/broker/FanOutExchange.cpp b/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
index 20ca06e048..8c1d3f4954 100644
--- a/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
+++ b/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
@@ -38,9 +38,9 @@ FanOutExchange::FanOutExchange(const std::string& _name, Manageable* _parent, Br
mgmtExchange->set_type (typeName);
}
-FanOutExchange::FanOutExchange(const std::string& _name, bool _durable,
+FanOutExchange::FanOutExchange(const std::string& _name, bool _durable, bool autodelete,
const FieldTable& _args, Manageable* _parent, Broker* b) :
- Exchange(_name, _durable, _args, _parent, b)
+ Exchange(_name, _durable, autodelete, _args, _parent, b)
{
if (mgmtExchange != 0)
mgmtExchange->set_type (typeName);
@@ -101,6 +101,7 @@ bool FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*key*/, cons
if (propagate)
propagateFedOp(string(), string(), fedOpUnbind, string());
+ if (bindings.empty()) checkAutodelete();
return true;
}
@@ -109,7 +110,7 @@ void FanOutExchange::route(Deliverable& msg)
PreRoute pr(msg, this);
doRoute(msg, bindings.snapshot());
}
-
+
bool FanOutExchange::isBound(Queue::shared_ptr queue, const string* const, const FieldTable* const)
{
BindingsArray::ConstPtr ptr = bindings.snapshot();
@@ -123,3 +124,9 @@ FanOutExchange::~FanOutExchange() {
}
const std::string FanOutExchange::typeName("fanout");
+
+bool FanOutExchange::hasBindings()
+{
+ BindingsArray::ConstPtr ptr = bindings.snapshot();
+ return ptr && !ptr->empty();
+}
diff --git a/qpid/cpp/src/qpid/broker/FanOutExchange.h b/qpid/cpp/src/qpid/broker/FanOutExchange.h
index c979fdca25..a92ff7ce97 100644
--- a/qpid/cpp/src/qpid/broker/FanOutExchange.h
+++ b/qpid/cpp/src/qpid/broker/FanOutExchange.h
@@ -38,16 +38,16 @@ class FanOutExchange : public virtual Exchange {
FedBinding fedBinding;
public:
static const std::string typeName;
-
+
QPID_BROKER_EXTERN FanOutExchange(const std::string& name,
management::Manageable* parent = 0, Broker* broker = 0);
QPID_BROKER_EXTERN FanOutExchange(const std::string& _name,
- bool _durable,
+ bool _durable, bool autodelete,
const qpid::framing::FieldTable& _args,
management::Manageable* parent = 0, Broker* broker = 0);
- virtual std::string getType() const { return typeName; }
-
+ virtual std::string getType() const { return typeName; }
+
QPID_BROKER_EXTERN virtual bool bind(Queue::shared_ptr queue,
const std::string& routingKey,
const qpid::framing::FieldTable* args);
@@ -62,6 +62,8 @@ class FanOutExchange : public virtual Exchange {
QPID_BROKER_EXTERN virtual ~FanOutExchange();
virtual bool supportsDynamicBinding() { return true; }
+ protected:
+ bool hasBindings();
};
}
diff --git a/qpid/cpp/src/qpid/broker/HeadersExchange.cpp b/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
index 4e86b09565..19c7f107f6 100644
--- a/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
+++ b/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
@@ -164,9 +164,9 @@ HeadersExchange::HeadersExchange(const string& _name, Manageable* _parent, Broke
mgmtExchange->set_type (typeName);
}
-HeadersExchange::HeadersExchange(const std::string& _name, bool _durable,
+HeadersExchange::HeadersExchange(const std::string& _name, bool _durable, bool autodelete,
const FieldTable& _args, Manageable* _parent, Broker* b) :
- Exchange(_name, _durable, _args, _parent, b)
+ Exchange(_name, _durable, autodelete, _args, _parent, b)
{
if (mgmtExchange != 0)
mgmtExchange->set_type (typeName);
@@ -288,6 +288,7 @@ bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& bindingKey,
if (propagate) {
propagateFedOp(bindingKey, string(), fedOpUnbind, string());
}
+ if (bindings.empty()) checkAutodelete();
return true;
}
@@ -404,3 +405,8 @@ bool HeadersExchange::FedUnbindModifier::operator()(BoundKey & bk)
return true;
}
+bool HeadersExchange::hasBindings()
+{
+ Bindings::ConstPtr ptr = bindings.snapshot();
+ return ptr && !ptr->empty();
+}
diff --git a/qpid/cpp/src/qpid/broker/HeadersExchange.h b/qpid/cpp/src/qpid/broker/HeadersExchange.h
index e51478d365..54c69491e9 100644
--- a/qpid/cpp/src/qpid/broker/HeadersExchange.h
+++ b/qpid/cpp/src/qpid/broker/HeadersExchange.h
@@ -45,12 +45,12 @@ class HeadersExchange : public virtual Exchange {
struct MatchArgs
{
- const Queue::shared_ptr queue;
+ const Queue::shared_ptr queue;
const qpid::framing::FieldTable* args;
MatchArgs(Queue::shared_ptr q, const qpid::framing::FieldTable* a);
bool operator()(const BoundKey & bk);
};
-
+
struct MatchKey
{
const Queue::shared_ptr queue;
@@ -77,6 +77,7 @@ class HeadersExchange : public virtual Exchange {
protected:
void getNonFedArgs(const framing::FieldTable* args,
framing::FieldTable& nonFedArgs);
+ bool hasBindings();
public:
QPID_BROKER_EXTERN static const std::string typeName;
@@ -84,12 +85,12 @@ class HeadersExchange : public virtual Exchange {
QPID_BROKER_EXTERN HeadersExchange(const std::string& name,
management::Manageable* parent = 0, Broker* broker = 0);
QPID_BROKER_EXTERN HeadersExchange(const std::string& _name,
- bool _durable,
+ bool _durable, bool autodelete,
const qpid::framing::FieldTable& _args,
management::Manageable* parent = 0, Broker* broker = 0);
-
- virtual std::string getType() const { return typeName; }
-
+
+ virtual std::string getType() const { return typeName; }
+
QPID_BROKER_EXTERN virtual bool bind(Queue::shared_ptr queue,
const std::string& routingKey,
const qpid::framing::FieldTable* args);
diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp
index cd7ba35dd8..fe1cac8aab 100644
--- a/qpid/cpp/src/qpid/broker/Link.cpp
+++ b/qpid/cpp/src/qpid/broker/Link.cpp
@@ -90,7 +90,7 @@ public:
bool bind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*) { return false; }
bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*) { return false; }
bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const) {return false;}
-
+ bool hasBindings() { return false; }
// Process messages sent from the remote's amq.failover exchange by extracting the failover URLs
// and saving them should the Link need to reconnect.
void route(broker::Deliverable& /*msg*/)
diff --git a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
index eff77db02f..9a8110d54f 100644
--- a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
@@ -69,9 +69,8 @@ static const std::string _FALSE("false");
void SessionAdapter::ExchangeHandlerImpl::declare(const string& exchange, const string& type,
const string& alternateExchange,
- bool passive, bool durable, bool /*autoDelete*/, const FieldTable& args){
+ bool passive, bool durable, bool autodelete, const FieldTable& args){
- //TODO: implement autoDelete
Exchange::shared_ptr alternate;
if (!alternateExchange.empty()) {
alternate = getBroker().getExchanges().get(alternateExchange);
@@ -83,6 +82,7 @@ void SessionAdapter::ExchangeHandlerImpl::declare(const string& exchange, const
params.insert(make_pair(acl::PROP_TYPE, type));
params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange));
params.insert(make_pair(acl::PROP_DURABLE, durable ? _TRUE : _FALSE));
+ params.insert(make_pair(acl::PROP_AUTODELETE, autodelete ? _TRUE : _FALSE));
if (!acl->authorise(getConnection().getUserId(),acl::ACT_ACCESS,acl::OBJ_EXCHANGE,exchange,&params) )
throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange access request from " << getConnection().getUserId()));
}
@@ -95,7 +95,7 @@ void SessionAdapter::ExchangeHandlerImpl::declare(const string& exchange, const
}
try{
std::pair<Exchange::shared_ptr, bool> response =
- getBroker().createExchange(exchange, type, durable, alternateExchange, args,
+ getBroker().createExchange(exchange, type, durable, autodelete, alternateExchange, args,
getConnection().getUserId(), getConnection().getMgmtId());
if (!response.second) {
//exchange already there, not created
@@ -106,7 +106,8 @@ void SessionAdapter::ExchangeHandlerImpl::declare(const string& exchange, const
<< " rhost:" << getConnection().getMgmtId()
<< " type:" << type
<< " alternateExchange:" << alternateExchange
- << " durable:" << (durable ? "T" : "F"));
+ << " durable:" << (durable ? "T" : "F")
+ << " autodelete:" << (autodelete ? "T" : "F"));
}
}catch(UnknownExchangeTypeException& /*e*/){
throw NotFoundException(QPID_MSG("Exchange type not implemented: " << type));
diff --git a/qpid/cpp/src/qpid/broker/TopicExchange.cpp b/qpid/cpp/src/qpid/broker/TopicExchange.cpp
index 6a081bf65f..558c900a4f 100644
--- a/qpid/cpp/src/qpid/broker/TopicExchange.cpp
+++ b/qpid/cpp/src/qpid/broker/TopicExchange.cpp
@@ -147,9 +147,9 @@ TopicExchange::TopicExchange(const string& _name, Manageable* _parent, Broker* b
mgmtExchange->set_type (typeName);
}
-TopicExchange::TopicExchange(const std::string& _name, bool _durable,
+TopicExchange::TopicExchange(const std::string& _name, bool _durable, bool autodelete,
const FieldTable& _args, Manageable* _parent, Broker* b) :
- Exchange(_name, _durable, _args, _parent, b),
+ Exchange(_name, _durable, autodelete, _args, _parent, b),
nBindings(0)
{
if (mgmtExchange != 0)
@@ -241,6 +241,7 @@ bool TopicExchange::unbind(Queue::shared_ptr queue, const string& constRoutingKe
deleteBinding(queue, routingKey, bk);
if (propagate)
propagateFedOp(routingKey, string(), fedOpUnbind, string());
+ if (nBindings == 0) checkAutodelete();
return true;
}
@@ -340,4 +341,10 @@ TopicExchange::~TopicExchange() {
const std::string TopicExchange::typeName("topic");
+bool TopicExchange::hasBindings()
+{
+ RWlock::ScopedRlock l(lock);
+ return nBindings > 0;
+}
+
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/TopicExchange.h b/qpid/cpp/src/qpid/broker/TopicExchange.h
index b8b67bdafa..d54f23a70d 100644
--- a/qpid/cpp/src/qpid/broker/TopicExchange.h
+++ b/qpid/cpp/src/qpid/broker/TopicExchange.h
@@ -88,7 +88,7 @@ public:
QPID_BROKER_EXTERN TopicExchange(const std::string& name,
management::Manageable* parent = 0, Broker* broker = 0);
QPID_BROKER_EXTERN TopicExchange(const std::string& _name,
- bool _durable,
+ bool _durable, bool autodelete,
const qpid::framing::FieldTable& _args,
management::Manageable* parent = 0, Broker* broker = 0);
@@ -111,6 +111,8 @@ public:
class TopicExchangeTester;
friend class TopicExchangeTester;
+ protected:
+ bool hasBindings();
};
diff --git a/qpid/cpp/src/qpid/broker/amqp/NodeProperties.cpp b/qpid/cpp/src/qpid/broker/amqp/NodeProperties.cpp
index 45536c2262..f1d29fe00e 100644
--- a/qpid/cpp/src/qpid/broker/amqp/NodeProperties.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/NodeProperties.cpp
@@ -181,6 +181,10 @@ void NodeProperties::write(pn_data_t* data, boost::shared_ptr<Exchange> node)
pn_data_put_symbol(data, convert(ALTERNATE_EXCHANGE));
pn_data_put_string(data, convert(node->getAlternate()->getName()));
}
+ if (autoDelete) {
+ pn_data_put_symbol(data, convert(AUTO_DELETE));
+ pn_data_put_bool(data, autoDelete);
+ }
for (qpid::types::Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) {
if ((i->first == QPID_MSG_SEQUENCE || i->first == QPID_IVE) && node->getArgs().isSet(i->first)) {
@@ -333,6 +337,10 @@ bool NodeProperties::isExclusive() const
{
return exclusive;
}
+bool NodeProperties::isAutodelete() const
+{
+ return autoDelete;
+}
std::string NodeProperties::getExchangeType() const
{
return exchangeType;
diff --git a/qpid/cpp/src/qpid/broker/amqp/NodeProperties.h b/qpid/cpp/src/qpid/broker/amqp/NodeProperties.h
index df96d5a023..4ac3aa8a0f 100644
--- a/qpid/cpp/src/qpid/broker/amqp/NodeProperties.h
+++ b/qpid/cpp/src/qpid/broker/amqp/NodeProperties.h
@@ -62,6 +62,7 @@ class NodeProperties : public qpid::amqp::MapReader
QueueSettings getQueueSettings();
bool isDurable() const;
bool isExclusive() const;
+ bool isAutodelete() const;
std::string getExchangeType() const;
std::string getAlternateExchange() const;
bool trackControllingLink() const;
diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.cpp b/qpid/cpp/src/qpid/broker/amqp/Session.cpp
index 16aadfc6c8..7170da0797 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Session.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Session.cpp
@@ -185,7 +185,14 @@ class IncomingToExchange : public DecodingIncoming
{
public:
IncomingToExchange(Broker& b, Session& p, boost::shared_ptr<qpid::broker::Exchange> e, pn_link_t* l, const std::string& source)
- : DecodingIncoming(l, b, p, source, e->getName(), pn_link_name(l)), exchange(e), authorise(p.getAuthorise()) {}
+ : DecodingIncoming(l, b, p, source, e->getName(), pn_link_name(l)), exchange(e), authorise(p.getAuthorise())
+ {
+ exchange->incOtherUsers();
+ }
+ ~IncomingToExchange()
+ {
+ exchange->decOtherUsers();
+ }
void handle(qpid::broker::Message& m);
private:
boost::shared_ptr<qpid::broker::Exchange> exchange;
@@ -243,8 +250,9 @@ Session::ResolvedNode Session::resolve(const std::string name, pn_terminus_t* te
}
qpid::framing::FieldTable args;
qpid::amqp_0_10::translate(node.properties.getProperties(), args);
- node.exchange = connection.getBroker().createExchange(name, node.properties.getExchangeType(), node.properties.isDurable(), node.properties.getAlternateExchange(),
- args, connection.getUserId(), connection.getId()).first;
+ node.exchange = connection.getBroker().createExchange(name, node.properties.getExchangeType(), node.properties.isDurable(), node.properties.isAutodelete(),
+ node.properties.getAlternateExchange(),
+ args, connection.getUserId(), connection.getId()).first;
} else {
if (node.exchange) {
QPID_LOG_CAT(warning, model, "Node name will be ambiguous, creation of queue named " << name << " requested when exchange of the same name already exists");
diff --git a/qpid/cpp/src/qpid/broker/amqp/Topic.cpp b/qpid/cpp/src/qpid/broker/amqp/Topic.cpp
index 9640988834..c04f62b3d1 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Topic.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Topic.cpp
@@ -111,6 +111,7 @@ boost::shared_ptr<Topic> TopicRegistry::createTopic(Broker& broker, const std::s
{
boost::shared_ptr<Topic> topic(new Topic(broker, name, properties));
add(topic);
+ topic->getExchange()->setDeletionListener(name, boost::bind(&TopicRegistry::remove, this, name));
return topic;
}
@@ -174,6 +175,7 @@ boost::shared_ptr<Topic> TopicRegistry::remove(const std::string& name)
if (i != topics.end()) {
result = i->second;
topics.erase(i);
+ result->getExchange()->unsetDeletionListener(name);
}
return result;
}
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
index eb1206437a..a59c874594 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -551,8 +551,10 @@ void BrokerReplicator::doEventExchangeDeclare(Variant::Map& values) {
QPID_LOG(warning, logPrefix << "Declare event, replacing existing exchange: "
<< name);
}
+ //Note: unlike qieth queues, autodeleted exchanges have no
+ //messages, so need no special handling for autodelete in ha
CreateExchangeResult result = createExchange(
- name, values[EXTYPE].asString(), values[DURABLE].asBool(), args,
+ name, values[EXTYPE].asString(), values[DURABLE].asBool(), values[AUTODEL].asBool(), args,
values[ALTEX].asString());
assert(result.second);
}
@@ -700,7 +702,7 @@ void BrokerReplicator::doResponseExchange(Variant::Map& values) {
deleteExchange(name);
}
CreateExchangeResult result = createExchange(
- name, values[TYPE].asString(), values[DURABLE].asBool(), args,
+ name, values[TYPE].asString(), values[DURABLE].asBool(), values[AUTODELETE].asBool(), args,
getAltExchange(values[ALTEXCHANGE]));
}
@@ -849,6 +851,7 @@ BrokerReplicator::CreateExchangeResult BrokerReplicator::createExchange(
const std::string& name,
const std::string& type,
bool durable,
+ bool autodelete,
const qpid::framing::FieldTable& args,
const std::string& alternateExchange)
{
@@ -857,6 +860,7 @@ BrokerReplicator::CreateExchangeResult BrokerReplicator::createExchange(
name,
type,
durable,
+ autodelete,
string(), // Set alternate exchange below
args,
userId,
@@ -872,6 +876,7 @@ BrokerReplicator::CreateExchangeResult BrokerReplicator::createExchange(
bool BrokerReplicator::bind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*) { return false; }
bool BrokerReplicator::unbind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*) { return false; }
bool BrokerReplicator::isBound(boost::shared_ptr<Queue>, const string* const, const framing::FieldTable* const) { return false; }
+bool BrokerReplicator::hasBindings() { return false; }
string BrokerReplicator::getType() const { return QPID_CONFIGURATION_REPLICATOR; }
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.h b/qpid/cpp/src/qpid/ha/BrokerReplicator.h
index 395f0706d9..07b992df6a 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.h
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.h
@@ -84,6 +84,7 @@ class BrokerReplicator : public broker::Exchange,
bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*);
void route(broker::Deliverable&);
bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const);
+ bool hasBindings();
void shutdown();
QueueReplicatorPtr findQueueReplicator(const std::string& qname);
@@ -132,6 +133,7 @@ class BrokerReplicator : public broker::Exchange,
const std::string& name,
const std::string& type,
bool durable,
+ bool autodelete,
const qpid::framing::FieldTable& args,
const std::string& alternateExchange);
diff --git a/qpid/cpp/src/qpid/ha/FailoverExchange.cpp b/qpid/cpp/src/qpid/ha/FailoverExchange.cpp
index 9c7b986bf8..f1b87c63c8 100644
--- a/qpid/cpp/src/qpid/ha/FailoverExchange.cpp
+++ b/qpid/cpp/src/qpid/ha/FailoverExchange.cpp
@@ -108,6 +108,11 @@ bool FailoverExchange::isBound(Queue::shared_ptr queue, const string* const,
return queues.find(queue) != queues.end();
}
+bool FailoverExchange::hasBindings() {
+ Lock l(lock);
+ return !queues.empty();
+}
+
void FailoverExchange::route(Deliverable&) {
QPID_LOG(warning, "Message received by exchange " << typeName << " ignoring");
}
diff --git a/qpid/cpp/src/qpid/ha/FailoverExchange.h b/qpid/cpp/src/qpid/ha/FailoverExchange.h
index 6ec1d0f152..5263bdfb03 100644
--- a/qpid/cpp/src/qpid/ha/FailoverExchange.h
+++ b/qpid/cpp/src/qpid/ha/FailoverExchange.h
@@ -54,6 +54,7 @@ class FailoverExchange : public broker::Exchange
bool bind(boost::shared_ptr<broker::Queue> queue, const std::string& routingKey, const framing::FieldTable* args);
bool unbind(boost::shared_ptr<broker::Queue> queue, const std::string& routingKey, const framing::FieldTable* args);
bool isBound(boost::shared_ptr<broker::Queue> queue, const std::string* const routingKey, const framing::FieldTable* const args);
+ bool hasBindings();
void route(broker::Deliverable& msg);
private:
diff --git a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
index 0f8ed0a0a7..416bb329a6 100644
--- a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
+++ b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
@@ -66,6 +66,7 @@ class PrimaryTxObserver::Exchange : public broker::Exchange {
bool bind(boost::shared_ptr<Queue>, const string&, const FieldTable*) { return false; }
bool unbind(boost::shared_ptr<Queue>, const string&, const FieldTable*) { return false; }
bool isBound(boost::shared_ptr<Queue>, const string* const, const FieldTable* const) { return false; }
+ bool hasBindings() { return false; }
string getType() const { return TYPE_NAME; }
private:
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
index 22af7284a8..8037559c3d 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -293,6 +293,7 @@ ReplicationId QueueReplicator::getMaxId() {
bool QueueReplicator::bind(boost::shared_ptr<Queue>, const std::string&, const FieldTable*) { return false; }
bool QueueReplicator::unbind(boost::shared_ptr<Queue>, const std::string&, const FieldTable*) { return false; }
bool QueueReplicator::isBound(boost::shared_ptr<Queue>, const std::string* const, const FieldTable* const) { return false; }
+bool QueueReplicator::hasBindings() { return false; }
std::string QueueReplicator::getType() const { return TYPE_NAME; }
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.h b/qpid/cpp/src/qpid/ha/QueueReplicator.h
index 01abc88843..cbb36757f6 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.h
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.h
@@ -88,6 +88,7 @@ class QueueReplicator : public broker::Exchange,
bool bind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*);
bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*);
bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const);
+ bool hasBindings();
protected:
typedef boost::function<void(const std::string&, sys::Mutex::ScopedLock&)> DispatchFn;
diff --git a/qpid/cpp/src/qpid/management/ManagementDirectExchange.cpp b/qpid/cpp/src/qpid/management/ManagementDirectExchange.cpp
index 1c1d6ef3db..8ede6940b0 100644
--- a/qpid/cpp/src/qpid/management/ManagementDirectExchange.cpp
+++ b/qpid/cpp/src/qpid/management/ManagementDirectExchange.cpp
@@ -36,8 +36,8 @@ ManagementDirectExchange::ManagementDirectExchange(const std::string& _name,
bool _durable,
const FieldTable& _args,
Manageable* _parent, Broker* b) :
- Exchange (_name, _durable, _args, _parent, b),
- DirectExchange(_name, _durable, _args, _parent, b),
+ Exchange (_name, _durable, false, _args, _parent, b),
+ DirectExchange(_name, _durable, false, _args, _parent, b),
managementAgent(0) {}
void ManagementDirectExchange::route(Deliverable& msg)
diff --git a/qpid/cpp/src/qpid/management/ManagementTopicExchange.cpp b/qpid/cpp/src/qpid/management/ManagementTopicExchange.cpp
index c8bfef3785..0241d5a404 100644
--- a/qpid/cpp/src/qpid/management/ManagementTopicExchange.cpp
+++ b/qpid/cpp/src/qpid/management/ManagementTopicExchange.cpp
@@ -35,8 +35,8 @@ ManagementTopicExchange::ManagementTopicExchange(const std::string& _name,
bool _durable,
const FieldTable& _args,
Manageable* _parent, Broker* b) :
- Exchange (_name, _durable, _args, _parent, b),
- TopicExchange(_name, _durable, _args, _parent, b),
+ Exchange (_name, _durable, false, _args, _parent, b),
+ TopicExchange(_name, _durable, false, _args, _parent, b),
managementAgent(0) {}
void ManagementTopicExchange::route(Deliverable& msg)
diff --git a/qpid/cpp/src/qpid/management/ManagementTopicExchange.h b/qpid/cpp/src/qpid/management/ManagementTopicExchange.h
index eff01a8552..f5192a0936 100644
--- a/qpid/cpp/src/qpid/management/ManagementTopicExchange.h
+++ b/qpid/cpp/src/qpid/management/ManagementTopicExchange.h
@@ -37,7 +37,7 @@ class ManagementTopicExchange : public virtual TopicExchange
static const std::string typeName;
ManagementTopicExchange(const std::string& name, Manageable* _parent = 0, Broker* broker = 0);
- ManagementTopicExchange(const std::string& _name, bool _durable,
+ ManagementTopicExchange(const std::string& _name, bool _durable,
const qpid::framing::FieldTable& _args,
Manageable* _parent = 0, Broker* broker = 0);
diff --git a/qpid/cpp/src/qpid/xml/XmlExchange.cpp b/qpid/cpp/src/qpid/xml/XmlExchange.cpp
index 3802ec5f7f..837693f53f 100644
--- a/qpid/cpp/src/qpid/xml/XmlExchange.cpp
+++ b/qpid/cpp/src/qpid/xml/XmlExchange.cpp
@@ -109,9 +109,9 @@ XmlExchange::XmlExchange(const std::string& _name, Manageable* _parent, Broker*
mgmtExchange->set_type (typeName);
}
-XmlExchange::XmlExchange(const std::string& _name, bool _durable,
+XmlExchange::XmlExchange(const std::string& _name, bool _durable, bool autodelete,
const FieldTable& _args, Manageable* _parent, Broker* b) :
- Exchange(_name, _durable, _args, _parent, b)
+ Exchange(_name, _durable, autodelete, _args, _parent, b)
{
if (mgmtExchange != 0)
mgmtExchange->set_type (typeName);
@@ -201,9 +201,11 @@ bool XmlExchange::unbindLH(Queue::shared_ptr queue, const std::string& bindingKe
if (mgmtExchange != 0) {
mgmtExchange->dec_bindingCount();
}
+ if (bindingsMap[bindingKey].empty()) bindingsMap.erase(bindingKey);
+ if (bindingsMap.empty()) checkAutodelete();
return true;
} else {
- return false;
+ return false;
}
}
@@ -443,6 +445,11 @@ bool XmlExchange::MatchQueueAndOrigin::operator()(XmlBinding::shared_ptr b)
const std::string XmlExchange::typeName("xml");
-
+
+bool XmlExchange::hasBindings()
+{
+ RWlock::ScopedRlock l(lock);
+ return !bindingsMap.empty();
+}
}
}
diff --git a/qpid/cpp/src/qpid/xml/XmlExchange.h b/qpid/cpp/src/qpid/xml/XmlExchange.h
index fd3f8d0278..f9e92d60f9 100644
--- a/qpid/cpp/src/qpid/xml/XmlExchange.h
+++ b/qpid/cpp/src/qpid/xml/XmlExchange.h
@@ -71,7 +71,7 @@ class XmlExchange : public virtual Exchange {
static const std::string typeName;
XmlExchange(const std::string& name, management::Manageable* parent = 0, Broker* broker = 0);
- XmlExchange(const std::string& _name, bool _durable,
+ XmlExchange(const std::string& _name, bool _durable, bool autodelete,
const qpid::framing::FieldTable& _args, management::Manageable* parent = 0, Broker* broker = 0);
virtual std::string getType() const { return typeName; }
@@ -107,6 +107,8 @@ class XmlExchange : public virtual Exchange {
bool operator()(XmlBinding::shared_ptr b);
};
+ protected:
+ bool hasBindings();
private:
bool unbindLH(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
};
diff --git a/qpid/cpp/src/qpid/xml/XmlExchangePlugin.cpp b/qpid/cpp/src/qpid/xml/XmlExchangePlugin.cpp
index 742b878e86..02f0110faf 100644
--- a/qpid/cpp/src/qpid/xml/XmlExchangePlugin.cpp
+++ b/qpid/cpp/src/qpid/xml/XmlExchangePlugin.cpp
@@ -34,11 +34,12 @@ using namespace std;
class Broker;
Exchange::shared_ptr create(const std::string& name, bool durable,
- const framing::FieldTable& args,
+ bool autodelete,
+ const framing::FieldTable& args,
management::Manageable* parent,
Broker* broker)
{
- Exchange::shared_ptr e(new XmlExchange(name, durable, args, parent, broker));
+ Exchange::shared_ptr e(new XmlExchange(name, durable, autodelete, args, parent, broker));
return e;
}
diff --git a/qpid/cpp/src/tests/ExchangeTest.cpp b/qpid/cpp/src/tests/ExchangeTest.cpp
index 8c2dbb21c8..df0684e832 100644
--- a/qpid/cpp/src/tests/ExchangeTest.cpp
+++ b/qpid/cpp/src/tests/ExchangeTest.cpp
@@ -158,12 +158,12 @@ QPID_AUTO_TEST_CASE(testIsBound)
QPID_AUTO_TEST_CASE(testDeleteGetAndRedeclare)
{
ExchangeRegistry exchanges;
- exchanges.declare("my-exchange", "direct", false, FieldTable());
+ exchanges.declare("my-exchange", "direct", false, false, FieldTable());
exchanges.destroy("my-exchange");
try {
exchanges.get("my-exchange");
} catch (const NotFoundException&) {}
- std::pair<Exchange::shared_ptr, bool> response = exchanges.declare("my-exchange", "direct", false, FieldTable());
+ std::pair<Exchange::shared_ptr, bool> response = exchanges.declare("my-exchange", "direct", false, false, FieldTable());
BOOST_CHECK_EQUAL(string("direct"), response.first->getType());
}
@@ -174,7 +174,7 @@ QPID_AUTO_TEST_CASE(testSequenceOptions)
char* buff = new char[10000];
framing::Buffer buffer(buff,10000);
{
- DirectExchange direct("direct1", false, args);
+ DirectExchange direct("direct1", false, false, args);
DeliverableMessage msg1(MessageUtils::createMessage("e", "abc"), 0);
DeliverableMessage msg2(MessageUtils::createMessage("e", "abc"), 0);
@@ -188,9 +188,9 @@ QPID_AUTO_TEST_CASE(testSequenceOptions)
BOOST_CHECK_EQUAL(2, msg2.getMessage().getAnnotation("qpid.msg_sequence").asInt64());
BOOST_CHECK_EQUAL(3, msg3.getMessage().getAnnotation("qpid.msg_sequence").asInt64());
- FanOutExchange fanout("fanout1", false, args);
- HeadersExchange header("headers1", false, args);
- TopicExchange topic ("topic1", false, args);
+ FanOutExchange fanout("fanout1", false, false, args);
+ HeadersExchange header("headers1", false, false, args);
+ TopicExchange topic ("topic1", false, false, args);
// check other exchanges, that they preroute
DeliverableMessage msg4(MessageUtils::createMessage("e", "abc"), 0);
@@ -226,10 +226,10 @@ QPID_AUTO_TEST_CASE(testIVEOption)
{
FieldTable args;
args.setInt("qpid.ive",1);
- DirectExchange direct("direct1", false, args);
- FanOutExchange fanout("fanout1", false, args);
- HeadersExchange header("headers1", false, args);
- TopicExchange topic ("topic1", false, args);
+ DirectExchange direct("direct1", false, false, args);
+ FanOutExchange fanout("fanout1", false, false, args);
+ HeadersExchange header("headers1", false, false, args);
+ TopicExchange topic ("topic1", false, false, args);
qpid::types::Variant::Map properties;
properties["routing-key"] = "abc";
diff --git a/qpid/cpp/src/tests/legacystore/SimpleTest.cpp b/qpid/cpp/src/tests/legacystore/SimpleTest.cpp
index c769bdeb75..d3f040817f 100644
--- a/qpid/cpp/src/tests/legacystore/SimpleTest.cpp
+++ b/qpid/cpp/src/tests/legacystore/SimpleTest.cpp
@@ -102,7 +102,7 @@ void bindAndUnbind(const string& exchangeName, const string& queueName,
{
MessageStoreImpl store(&br);
store.init(test_dir, 4, 1, true); // truncate store
- Exchange::shared_ptr exchange(new DirectExchange(exchangeName, true, args));
+ Exchange::shared_ptr exchange(new DirectExchange(exchangeName, true, false, args));
Queue::shared_ptr queue(new Queue(queueName, 0, &store, 0));
store.create(*exchange, qpid::framing::FieldTable());
store.create(*queue, qpid::framing::FieldTable());
@@ -376,7 +376,7 @@ QPID_AUTO_TEST_CASE(ExchangeCreateAndDestroy)
MessageStoreImpl store(&br);
store.init(test_dir, 4, 1, true); // truncate store
ExchangeRegistry registry;
- Exchange::shared_ptr exchange = registry.declare(name, type, true, args).first;
+ Exchange::shared_ptr exchange = registry.declare(name, type, true, false, args).first;
store.create(*exchange, qpid::framing::FieldTable());
id = exchange->getPersistenceId();
BOOST_REQUIRE(id);
@@ -446,7 +446,7 @@ QPID_AUTO_TEST_CASE(ExchangeImplicitUnbind)
{
MessageStoreImpl store(&br);
store.init(test_dir, 4, 1, true); // truncate store
- Exchange::shared_ptr exchange(new DirectExchange(exchangeName, true, args));
+ Exchange::shared_ptr exchange(new DirectExchange(exchangeName, true, false, args));
Queue::shared_ptr queue1(new Queue(queueName1, 0, &store, 0));
Queue::shared_ptr queue2(new Queue(queueName2, 0, &store, 0));
store.create(*exchange, qpid::framing::FieldTable());
diff --git a/qpid/cpp/src/tests/misc.py b/qpid/cpp/src/tests/misc.py
index 108120f321..257fb9e754 100644
--- a/qpid/cpp/src/tests/misc.py
+++ b/qpid/cpp/src/tests/misc.py
@@ -50,3 +50,70 @@ class MiscellaneousTests (VersionTest):
con.close()
other.close()
+class AutoDeleteExchangeTests(VersionTest):
+ def init_test(self, exchange_type="topic"):
+ rcv = self.ssn.receiver("my-topic; {create:always, node:{type:topic, properties:{'exchange-type':%s, 'auto-delete':True}}}" % exchange_type)
+ snd = self.ssn.sender("my-topic")
+ #send some messages
+ msgs = [Message(content=c) for c in ['a','b','c','d']]
+ for m in msgs: snd.send(m)
+
+ #verify receipt
+ for expected in msgs:
+ msg = rcv.fetch(0)
+ assert msg.content == expected.content
+ self.ssn.acknowledge(msg)
+ return (rcv, snd)
+
+ def on_rcv_detach_test(self, exchange_type="topic"):
+ rcv, snd = self.init_test(exchange_type)
+ rcv.close()
+ #verify exchange is still there
+ snd.send(Message(content="will be dropped"))
+ snd.close()
+ #now verify it is no longer there
+ try:
+ self.ssn.sender("my-topic")
+ assert False, "Attempt to send to deleted exchange should fail"
+ except MessagingError: None
+
+ def on_snd_detach_test(self, exchange_type="topic"):
+ rcv, snd = self.init_test(exchange_type)
+ snd.close()
+ #verify exchange is still there
+ snd = self.ssn.sender("my-topic")
+ snd.send(Message(content="will be dropped"))
+ snd.close()
+ rcv.close()
+ #now verify it is no longer there
+ try:
+ self.ssn.sender("my-topic")
+ assert False, "Attempt to send to deleted exchange should fail"
+ except MessagingError: None
+
+ def test_autodelete_fanout_exchange_on_rcv_detach(self):
+ self.on_rcv_detach_test("fanout")
+
+ def test_autodelete_fanout_exchange_on_snd_detach(self):
+ self.on_snd_detach_test("fanout")
+
+ def test_autodelete_direct_exchange_on_rcv_detach(self):
+ self.on_rcv_detach_test("direct")
+
+ def test_autodelete_direct_exchange_on_snd_detach(self):
+ self.on_snd_detach_test("direct")
+
+ def test_autodelete_topic_exchange_on_rcv_detach(self):
+ self.on_rcv_detach_test("topic")
+
+ def test_autodelete_topic_exchange_on_snd_detach(self):
+ self.on_snd_detach_test("topic")
+
+ def test_autodelete_headers_exchange_on_rcv_detach(self):
+ self.on_rcv_detach_test("headers")
+
+ def test_autodelete_headers_exchange_on_snd_detach(self):
+ self.on_snd_detach_test("headers")
+
+
+
diff --git a/qpid/java/test-profiles/python_tests/Java010PythonExcludes b/qpid/java/test-profiles/python_tests/Java010PythonExcludes
index 66e0398413..de870e5e27 100644
--- a/qpid/java/test-profiles/python_tests/Java010PythonExcludes
+++ b/qpid/java/test-profiles/python_tests/Java010PythonExcludes
@@ -47,6 +47,8 @@ qpid_tests.broker_0_10.priority.PriorityTests.test_ring_queue*
qpid_tests.broker_0_10.priority.PriorityTests.test_fairshare*
qpid_tests.broker_0_10.priority.PriorityTests.test_prioritised_delivery_with_alias
+#The broker does not support the autodelete property on exchanges
+qpid_tests.broker_0_10.exchange.AutodeleteTests.testAutodelete*
###### Behavioural differences between Java & CPP Broker ######
diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/exchange.py b/qpid/tests/src/py/qpid_tests/broker_0_10/exchange.py
index db52b36754..315991d585 100644
--- a/qpid/tests/src/py/qpid_tests/broker_0_10/exchange.py
+++ b/qpid/tests/src/py/qpid_tests/broker_0_10/exchange.py
@@ -122,49 +122,67 @@ class StandardExchangeVerifier:
Used as base class for classes that test standard exchanges."""
- def verifyDirectExchange(self, ex):
+ def verifyDirectExchange(self, ex, unbind=False):
"""Verify that ex behaves like a direct exchange."""
self.queue_declare(queue="q")
self.session.exchange_bind(queue="q", exchange=ex, binding_key="k")
- self.assertPublishConsume(exchange=ex, queue="q", routing_key="k")
try:
- self.assertPublishConsume(exchange=ex, queue="q", routing_key="kk")
- self.fail("Expected Empty exception")
- except Queue.Empty: None # Expected
-
- def verifyFanOutExchange(self, ex):
+ self.assertPublishConsume(exchange=ex, queue="q", routing_key="k")
+ try:
+ self.assertPublishConsume(exchange=ex, queue="q", routing_key="kk")
+ self.fail("Expected Empty exception")
+ except Queue.Empty: None # Expected
+ finally:
+ if unbind:
+ self.session.exchange_unbind(queue="q", exchange=ex, binding_key="k")
+
+ def verifyFanOutExchange(self, ex, unbind=False):
"""Verify that ex behaves like a fanout exchange."""
self.queue_declare(queue="q")
self.session.exchange_bind(queue="q", exchange=ex)
self.queue_declare(queue="p")
self.session.exchange_bind(queue="p", exchange=ex)
- for qname in ["q", "p"]: self.assertPublishGet(self.consume(qname), ex)
+ try:
+ for qname in ["q", "p"]: self.assertPublishGet(self.consume(qname), ex)
+ finally:
+ if unbind:
+ self.session.exchange_unbind(queue="q", exchange=ex, binding_key="")
+ self.session.exchange_unbind(queue="p", exchange=ex, binding_key="")
+
- def verifyTopicExchange(self, ex):
+ def verifyTopicExchange(self, ex, unbind=False):
"""Verify that ex behaves like a topic exchange"""
self.queue_declare(queue="a")
self.session.exchange_bind(queue="a", exchange=ex, binding_key="a.#.b.*")
- q = self.consume("a")
- self.assertPublishGet(q, ex, "a.b.x")
- self.assertPublishGet(q, ex, "a.x.b.x")
- self.assertPublishGet(q, ex, "a.x.x.b.x")
- # Shouldn't match
- self.session.message_transfer(destination=ex, message=self.createMessage("a.b"))
- self.session.message_transfer(destination=ex, message=self.createMessage("a.b.x.y"))
- self.session.message_transfer(destination=ex, message=self.createMessage("x.a.b.x"))
- self.session.message_transfer(destination=ex, message=self.createMessage("a.b"))
- self.assert_(q.empty())
-
- def verifyHeadersExchange(self, ex):
+ try:
+ q = self.consume("a")
+ self.assertPublishGet(q, ex, "a.b.x")
+ self.assertPublishGet(q, ex, "a.x.b.x")
+ self.assertPublishGet(q, ex, "a.x.x.b.x")
+ # Shouldn't match
+ self.session.message_transfer(destination=ex, message=self.createMessage("a.b"))
+ self.session.message_transfer(destination=ex, message=self.createMessage("a.b.x.y"))
+ self.session.message_transfer(destination=ex, message=self.createMessage("x.a.b.x"))
+ self.session.message_transfer(destination=ex, message=self.createMessage("a.b"))
+ self.assert_(q.empty())
+ finally:
+ if unbind:
+ self.session.exchange_unbind(queue="a", exchange=ex, binding_key="a.#.b.*")
+
+ def verifyHeadersExchange(self, ex, unbind=False):
"""Verify that ex is a headers exchange"""
self.queue_declare(queue="q")
self.session.exchange_bind(queue="q", exchange=ex, arguments={ "x-match":"all", "name":"fred" , "age":3} )
- q = self.consume("q")
- headers = {"name":"fred", "age":3}
- self.assertPublishGet(q, exchange=ex, properties=headers)
- self.session.message_transfer(destination=ex) # No headers, won't deliver
- self.assertEmpty(q);
-
+ try:
+ q = self.consume("q")
+ headers = {"name":"fred", "age":3}
+ self.assertPublishGet(q, exchange=ex, properties=headers)
+ self.session.message_transfer(destination=ex) # No headers, won't deliver
+ self.assertEmpty(q);
+ finally:
+ if unbind:
+ self.session.exchange_unbind(queue="q", exchange=ex, binding_key="")
+
class RecommendedTypesRuleTests(TestHelper, StandardExchangeVerifier):
"""
@@ -485,8 +503,39 @@ class MiscellaneousErrorsTests(TestHelper):
class ExchangeTests(TestHelper):
def testHeadersBindNoMatchArg(self):
self.session.queue_declare(queue="q", exclusive=True, auto_delete=True)
- try:
+ try:
self.session.exchange_bind(queue="q", exchange="amq.match", arguments={"name":"fred" , "age":3} )
self.fail("Expected failure for missing x-match arg.")
- except SessionException, e:
+ except SessionException, e:
self.assertEquals(541, e.args[0].error_code)
+
+class AutodeleteTests(TestHelper, StandardExchangeVerifier):
+ def checkNotExists(self, e):
+ try:
+ s = self.conn.session("verifier")
+ s.exchange_declare(exchange=e, passive=True)
+ s.exchange_delete(exchange=e)
+ self.fail("Expected failure for passive declare of %s" % e)
+ except SessionException, e:
+ self.assertEquals(404, e.args[0].error_code)
+
+
+ def testAutodeleteFanout(self):
+ self.session.exchange_declare(exchange="e", type="fanout", auto_delete=True)
+ self.verifyFanOutExchange("e", unbind=True)
+ self.checkNotExists("e");
+
+ def testAutodeleteDirect(self):
+ self.session.exchange_declare(exchange="e", type="direct", auto_delete=True)
+ self.verifyDirectExchange("e", unbind=True)
+ self.checkNotExists("e");
+
+ def testAutodeleteTopic(self):
+ self.session.exchange_declare(exchange="e", type="topic", auto_delete=True)
+ self.verifyTopicExchange("e", unbind=True)
+ self.checkNotExists("e");
+
+ def testAutodeleteHeaders(self):
+ self.session.exchange_declare(exchange="e", type="headers", auto_delete=True)
+ self.verifyHeadersExchange("e", unbind=True)
+ self.checkNotExists("e");