diff options
38 files changed, 412 insertions, 119 deletions
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index da6c2d4a84..1e25af3b64 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -295,7 +295,7 @@ Broker::Broker(const Broker::Options& conf) : framing::FieldTable args; // Default exchnge is not replicated. - exchanges.declare(empty, DirectExchange::typeName, false, noReplicateArgs()); + exchanges.declare(empty, DirectExchange::typeName, false, false, noReplicateArgs()); RecoveredObjects objects; if (store.get() != 0) { @@ -313,7 +313,7 @@ Broker::Broker(const Broker::Options& conf) : declareStandardExchange(amq_match, HeadersExchange::typeName); if(conf.enableMgmt) { - exchanges.declare(qpid_management, ManagementTopicExchange::typeName, false, noReplicateArgs()); + exchanges.declare(qpid_management, ManagementTopicExchange::typeName, false, false, noReplicateArgs()); Exchange::shared_ptr mExchange = exchanges.get(qpid_management); Exchange::shared_ptr dExchange = exchanges.get(amq_direct); managementAgent->setExchange(mExchange, dExchange); @@ -323,9 +323,9 @@ Broker::Broker(const Broker::Options& conf) : std::string qmfDirect("qmf.default.direct"); std::pair<Exchange::shared_ptr, bool> topicPair( - exchanges.declare(qmfTopic, ManagementTopicExchange::typeName, false, noReplicateArgs())); + exchanges.declare(qmfTopic, ManagementTopicExchange::typeName, false, false, noReplicateArgs())); std::pair<Exchange::shared_ptr, bool> directPair( - exchanges.declare(qmfDirect, ManagementDirectExchange::typeName, false, noReplicateArgs())); + exchanges.declare(qmfDirect, ManagementDirectExchange::typeName, false, false, noReplicateArgs())); boost::dynamic_pointer_cast<ManagementDirectExchange>(directPair.first)->setManagmentAgent(managementAgent.get(), 2); boost::dynamic_pointer_cast<ManagementTopicExchange>(topicPair.first)->setManagmentAgent(managementAgent.get(), 2); @@ -386,7 +386,7 @@ void Broker::declareStandardExchange(const std::string& name, const std::string& framing::FieldTable args; // Standard exchanges are not replicated. std::pair<Exchange::shared_ptr, bool> status = - exchanges.declare(name, type, storeEnabled, noReplicateArgs()); + exchanges.declare(name, type, storeEnabled, false, noReplicateArgs()); if (status.second && storeEnabled) { store->create(*status.first, framing::FieldTable ()); } @@ -759,12 +759,14 @@ void Broker::createObject(const std::string& type, const std::string& name, } } else if (type == TYPE_EXCHANGE || type == TYPE_TOPIC) { bool durable(false); + bool autodelete(false); std::string exchangeType("topic"); std::string alternateExchange; Variant::Map extensions; for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) { // extract durable, auto-delete and alternate-exchange properties if (i->first == DURABLE) durable = i->second; + else if (i->first == AUTO_DELETE) autodelete = i->second; else if (i->first == EXCHANGE_TYPE) exchangeType = i->second.asString(); else if (i->first == ALTERNATE_EXCHANGE) alternateExchange = i->second.asString(); //treat everything else as extension properties @@ -775,7 +777,7 @@ void Broker::createObject(const std::string& type, const std::string& name, try { std::pair<boost::shared_ptr<Exchange>, bool> result = - createExchange(name, exchangeType, durable, alternateExchange, arguments, userId, connectionId); + createExchange(name, exchangeType, durable, autodelete, alternateExchange, arguments, userId, connectionId); if (!result.second) { throw ObjectAlreadyExists(name); } @@ -1362,6 +1364,7 @@ std::pair<Exchange::shared_ptr, bool> Broker::createExchange( const std::string& name, const std::string& type, bool durable, + bool autodelete, const std::string& alternateExchange, const qpid::framing::FieldTable& arguments, const std::string& userId, @@ -1372,6 +1375,7 @@ std::pair<Exchange::shared_ptr, bool> Broker::createExchange( params.insert(make_pair(acl::PROP_TYPE, type)); params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange)); params.insert(make_pair(acl::PROP_DURABLE, durable ? _TRUE : _FALSE)); + params.insert(make_pair(acl::PROP_AUTODELETE, autodelete ? _TRUE : _FALSE)); if (!acl->authorise(userId,acl::ACT_CREATE,acl::OBJ_EXCHANGE,name,¶ms) ) throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange create request from " << userId)); } @@ -1384,7 +1388,7 @@ std::pair<Exchange::shared_ptr, bool> Broker::createExchange( std::pair<Exchange::shared_ptr, bool> result; result = exchanges.declare( - name, type, durable, arguments, alternate, connectionId, userId); + name, type, durable, autodelete, arguments, alternate, connectionId, userId); if (result.second) { if (durable) { store->create(*result.first, arguments); @@ -1394,7 +1398,8 @@ std::pair<Exchange::shared_ptr, bool> Broker::createExchange( << " rhost:" << connectionId << " type:" << type << " alternateExchange:" << alternateExchange - << " durable:" << (durable ? "T" : "F")); + << " durable:" << (durable ? "T" : "F") + << " autodelete:" << (autodelete ? "T" : "F")); } return result; } diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h index e658d6f03b..fe6dd6379b 100644 --- a/qpid/cpp/src/qpid/broker/Broker.h +++ b/qpid/cpp/src/qpid/broker/Broker.h @@ -329,6 +329,7 @@ class Broker : public sys::Runnable, public Plugin::Target, const std::string& name, const std::string& type, bool durable, + bool autodelete, const std::string& alternateExchange, const qpid::framing::FieldTable& args, const std::string& userId, const std::string& connectionId); diff --git a/qpid/cpp/src/qpid/broker/DirectExchange.cpp b/qpid/cpp/src/qpid/broker/DirectExchange.cpp index 8ab7b59ed1..65c8287d4f 100644 --- a/qpid/cpp/src/qpid/broker/DirectExchange.cpp +++ b/qpid/cpp/src/qpid/broker/DirectExchange.cpp @@ -45,9 +45,9 @@ DirectExchange::DirectExchange(const string& _name, Manageable* _parent, Broker* mgmtExchange->set_type(typeName); } -DirectExchange::DirectExchange(const string& _name, bool _durable, +DirectExchange::DirectExchange(const string& _name, bool _durable, bool autodelete, const FieldTable& _args, Manageable* _parent, Broker* b) : - Exchange(_name, _durable, _args, _parent, b) + Exchange(_name, _durable, autodelete, _args, _parent, b) { if (mgmtExchange != 0) mgmtExchange->set_type(typeName); @@ -131,6 +131,7 @@ bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, c { string fedOrigin(args ? args->getAsString(qpidFedOrigin) : ""); bool propagate = false; + bool empty = false; QPID_LOG(debug, "Unbinding key [" << routingKey << "] from queue " << queue->getName() << " on exchange " << getName() << " origin=" << fedOrigin << ")" ); @@ -144,6 +145,7 @@ bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, c } if (bk.queues.empty()) { bindings.erase(routingKey); + if (bindings.empty()) empty = true; } } else { return false; @@ -153,6 +155,7 @@ bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, c // If I delete my local binding, propagate this unbind to any upstream brokers if (propagate) propagateFedOp(routingKey, string(), fedOpUnbind, string()); + if (empty) checkAutodelete(); return true; } @@ -163,7 +166,8 @@ void DirectExchange::route(Deliverable& msg) ConstBindingList b; { Mutex::ScopedLock l(lock); - b = bindings[routingKey].queues.snapshot(); + Bindings::iterator i = bindings.find(routingKey); + if (i != bindings.end()) b = i->second.queues.snapshot(); } doRoute(msg, b); } @@ -202,3 +206,9 @@ DirectExchange::~DirectExchange() { } const std::string DirectExchange::typeName("direct"); + +bool DirectExchange::hasBindings() +{ + Mutex::ScopedLock l(lock); + return !bindings.empty(); +} diff --git a/qpid/cpp/src/qpid/broker/DirectExchange.h b/qpid/cpp/src/qpid/broker/DirectExchange.h index cfefef54e8..cbf9aa5975 100644 --- a/qpid/cpp/src/qpid/broker/DirectExchange.h +++ b/qpid/cpp/src/qpid/broker/DirectExchange.h @@ -43,16 +43,16 @@ class DirectExchange : public virtual Exchange { public: QPID_BROKER_EXTERN static const std::string typeName; - + QPID_BROKER_EXTERN DirectExchange(const std::string& name, management::Manageable* parent = 0, Broker* broker = 0); QPID_BROKER_EXTERN DirectExchange(const std::string& _name, - bool _durable, + bool _durable, bool autodelete, const qpid::framing::FieldTable& _args, management::Manageable* parent = 0, Broker* broker = 0); - virtual std::string getType() const { return typeName; } - + virtual std::string getType() const { return typeName; } + QPID_BROKER_EXTERN virtual bool bind(boost::shared_ptr<Queue> queue, const std::string& routingKey, const qpid::framing::FieldTable* args); @@ -65,6 +65,8 @@ public: QPID_BROKER_EXTERN virtual ~DirectExchange(); virtual bool supportsDynamicBinding() { return true; } + protected: + bool hasBindings(); }; }} diff --git a/qpid/cpp/src/qpid/broker/Exchange.cpp b/qpid/cpp/src/qpid/broker/Exchange.cpp index efd83a3225..304ed7cec4 100644 --- a/qpid/cpp/src/qpid/broker/Exchange.cpp +++ b/qpid/cpp/src/qpid/broker/Exchange.cpp @@ -166,7 +166,7 @@ void Exchange::routeIVE(){ Exchange::Exchange (const string& _name, Manageable* parent, Broker* b) : - name(_name), durable(false), alternateUsers(0), persistenceId(0), sequence(false), + name(_name), durable(false), autodelete(false), alternateUsers(0), otherUsers(0), persistenceId(0), sequence(false), sequenceNo(0), ive(false), broker(b), destroyed(false) { if (parent != 0 && broker != 0) @@ -176,7 +176,7 @@ Exchange::Exchange (const string& _name, Manageable* parent, Broker* b) : { mgmtExchange = _qmf::Exchange::shared_ptr(new _qmf::Exchange (agent, this, parent, _name)); mgmtExchange->set_durable(durable); - mgmtExchange->set_autoDelete(false); + mgmtExchange->set_autoDelete(autodelete); agent->addObject(mgmtExchange, 0, durable); if (broker) brokerMgmtObject = boost::dynamic_pointer_cast<qmf::org::apache::qpid::broker::Broker>(broker->GetManagementObject()); @@ -184,9 +184,9 @@ Exchange::Exchange (const string& _name, Manageable* parent, Broker* b) : } } -Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args, +Exchange::Exchange(const string& _name, bool _durable, bool _autodelete, const qpid::framing::FieldTable& _args, Manageable* parent, Broker* b) - : name(_name), durable(_durable), alternateUsers(0), persistenceId(0), + : name(_name), durable(_durable), autodelete(_autodelete), alternateUsers(0), otherUsers(0), persistenceId(0), args(_args), sequence(false), sequenceNo(0), ive(false), broker(b), destroyed(false) { if (parent != 0 && broker != 0) @@ -196,7 +196,7 @@ Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::Fiel { mgmtExchange = _qmf::Exchange::shared_ptr(new _qmf::Exchange (agent, this, parent, _name)); mgmtExchange->set_durable(durable); - mgmtExchange->set_autoDelete(false); + mgmtExchange->set_autoDelete(autodelete); mgmtExchange->set_arguments(ManagementAgent::toMap(args)); agent->addObject(mgmtExchange, 0, durable); if (broker) @@ -255,7 +255,7 @@ Exchange::shared_ptr Exchange::decode(ExchangeRegistry& exchanges, Buffer& buffe buffer.getShortString(altName); try { - Exchange::shared_ptr exch = exchanges.declare(name, type, durable, args).first; + Exchange::shared_ptr exch = exchanges.declare(name, type, durable, false, args).first; exch->sequenceNo = args.getAsInt64(qpidSequenceCounter); exch->alternateName.assign(altName); return exch; @@ -415,5 +415,76 @@ void Exchange::setArgs(const framing::FieldTable& newArgs) { if (mgmtExchange) mgmtExchange->set_arguments(ManagementAgent::toMap(args)); } +void Exchange::checkAutodelete() +{ + if (autodelete && !inUse() && broker) { + broker->getExchanges().destroy(name); + } +} +void Exchange::incAlternateUsers() +{ + Mutex::ScopedLock l(usersLock); + alternateUsers++; +} + +void Exchange::decAlternateUsers() +{ + Mutex::ScopedLock l(usersLock); + alternateUsers--; +} + +bool Exchange::inUseAsAlternate() +{ + Mutex::ScopedLock l(usersLock); + return alternateUsers > 0; +} + +void Exchange::incOtherUsers() +{ + Mutex::ScopedLock l(usersLock); + otherUsers++; +} +void Exchange::decOtherUsers() +{ + Mutex::ScopedLock l(usersLock); + assert(otherUsers); + if (otherUsers) otherUsers--; + if (!inUse() && !hasBindings()) checkAutodelete(); +} +bool Exchange::inUse() const +{ + Mutex::ScopedLock l(usersLock); + return alternateUsers > 0 || otherUsers > 0; +} +void Exchange::setDeletionListener(const std::string& key, boost::function0<void> listener) +{ + Mutex::ScopedLock l(usersLock); + if (listener) deletionListeners[key] = listener; +} +void Exchange::unsetDeletionListener(const std::string& key) +{ + Mutex::ScopedLock l(usersLock); + deletionListeners.erase(key); +} + +void Exchange::destroy() +{ + std::map<std::string, boost::function0<void> > copy; + { + Mutex::ScopedLock l(usersLock); + destroyed = true; + deletionListeners.swap(copy); + } + for (std::map<std::string, boost::function0<void> >::iterator i = copy.begin(); i != copy.end(); ++i) { + QPID_LOG(notice, "Exchange::destroy() notifying " << i->first); + if (i->second) i->second(); + } +} +bool Exchange::isDestroyed() const +{ + Mutex::ScopedLock l(usersLock); + return destroyed; +} + }} diff --git a/qpid/cpp/src/qpid/broker/Exchange.h b/qpid/cpp/src/qpid/broker/Exchange.h index 70ed393f64..7d3bbcf88e 100644 --- a/qpid/cpp/src/qpid/broker/Exchange.h +++ b/qpid/cpp/src/qpid/broker/Exchange.h @@ -34,6 +34,8 @@ #include "qmf/org/apache/qpid/broker/Exchange.h" #include "qmf/org/apache/qpid/broker/Binding.h" #include "qmf/org/apache/qpid/broker/Broker.h" +#include <map> +#include <boost/function.hpp> namespace qpid { namespace broker { @@ -64,9 +66,13 @@ public: private: const std::string name; const bool durable; + const bool autodelete; std::string alternateName; boost::shared_ptr<Exchange> alternate; + mutable qpid::sys::Mutex usersLock; uint32_t alternateUsers; + uint32_t otherUsers; + std::map<std::string, boost::function0<void> > deletionListeners; mutable uint64_t persistenceId; protected: @@ -89,7 +95,8 @@ protected: typedef boost::shared_ptr< std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> > > BindingList; void doRoute(Deliverable& msg, ConstBindingList b); void routeIVE(); - + void checkAutodelete(); + virtual bool hasBindings() = 0; struct MatchQueue { const boost::shared_ptr<Queue> queue; @@ -167,7 +174,7 @@ public: QPID_BROKER_EXTERN explicit Exchange(const std::string& name, management::Manageable* parent = 0, Broker* broker = 0); - QPID_BROKER_EXTERN Exchange(const std::string& _name, bool _durable, const qpid::framing::FieldTable& _args, + QPID_BROKER_EXTERN Exchange(const std::string& _name, bool _durable, bool autodelete, const qpid::framing::FieldTable& _args, management::Manageable* parent = 0, Broker* broker = 0); QPID_BROKER_INLINE_EXTERN virtual ~Exchange(); @@ -179,9 +186,13 @@ public: QPID_BROKER_EXTERN Exchange::shared_ptr getAlternate() { return alternate; } QPID_BROKER_EXTERN void setAlternate(Exchange::shared_ptr _alternate); - void incAlternateUsers() { alternateUsers++; } - void decAlternateUsers() { alternateUsers--; } - bool inUseAsAlternate() { return alternateUsers > 0; } + QPID_BROKER_EXTERN void incAlternateUsers(); + QPID_BROKER_EXTERN void decAlternateUsers(); + QPID_BROKER_EXTERN bool inUseAsAlternate(); + + QPID_BROKER_EXTERN void incOtherUsers(); + QPID_BROKER_EXTERN void decOtherUsers(); + QPID_BROKER_EXTERN bool inUse() const; virtual std::string getType() const = 0; @@ -233,8 +244,11 @@ public: bool routeWithAlternate(Deliverable& message); - void destroy() { destroyed = true; } - bool isDestroyed() const { return destroyed; } + QPID_BROKER_EXTERN void destroy(); + QPID_BROKER_EXTERN bool isDestroyed() const; + + QPID_BROKER_EXTERN void setDeletionListener(const std::string& key, boost::function0<void> listener); + QPID_BROKER_EXTERN void unsetDeletionListener(const std::string& key); protected: qpid::sys::Mutex bridgeLock; diff --git a/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp b/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp index 9eeffadb90..34a31fe769 100644 --- a/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp +++ b/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp @@ -42,11 +42,11 @@ namespace _qmf = qmf::org::apache::qpid::broker; pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, const string& type){ - return declare(name, type, false, FieldTable()); + return declare(name, type, false, false, FieldTable()); } pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare( - const string& name, const string& type, bool durable, const FieldTable& args, + const string& name, const string& type, bool durable, bool autodelete, const FieldTable& args, Exchange::shared_ptr alternate, const string& connectionId, const string& userId) { Exchange::shared_ptr exchange; @@ -56,13 +56,13 @@ pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare( ExchangeMap::iterator i = exchanges.find(name); if (i == exchanges.end()) { if (type == TopicExchange::typeName){ - exchange = Exchange::shared_ptr(new TopicExchange(name, durable, args, parent, broker)); + exchange = Exchange::shared_ptr(new TopicExchange(name, durable, autodelete, args, parent, broker)); }else if(type == DirectExchange::typeName){ - exchange = Exchange::shared_ptr(new DirectExchange(name, durable, args, parent, broker)); + exchange = Exchange::shared_ptr(new DirectExchange(name, durable, autodelete, args, parent, broker)); }else if(type == FanOutExchange::typeName){ - exchange = Exchange::shared_ptr(new FanOutExchange(name, durable, args, parent, broker)); + exchange = Exchange::shared_ptr(new FanOutExchange(name, durable, autodelete, args, parent, broker)); }else if (type == HeadersExchange::typeName) { - exchange = Exchange::shared_ptr(new HeadersExchange(name, durable, args, parent, broker)); + exchange = Exchange::shared_ptr(new HeadersExchange(name, durable, autodelete, args, parent, broker)); }else if (type == ManagementDirectExchange::typeName) { exchange = Exchange::shared_ptr(new ManagementDirectExchange(name, durable, args, parent, broker)); }else if (type == ManagementTopicExchange::typeName) { @@ -74,7 +74,7 @@ pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare( if (i == factory.end()) { throw UnknownExchangeTypeException(); } else { - exchange = i->second(name, durable, args, parent, broker); + exchange = i->second(name, durable, autodelete, args, parent, broker); } } exchanges[name] = exchange; diff --git a/qpid/cpp/src/qpid/broker/ExchangeRegistry.h b/qpid/cpp/src/qpid/broker/ExchangeRegistry.h index 8db2c34863..99caf30269 100644 --- a/qpid/cpp/src/qpid/broker/ExchangeRegistry.h +++ b/qpid/cpp/src/qpid/broker/ExchangeRegistry.h @@ -42,8 +42,8 @@ struct UnknownExchangeTypeException{}; class ExchangeRegistry{ public: - typedef boost::function5<Exchange::shared_ptr, const std::string&, - bool, const qpid::framing::FieldTable&, qpid::management::Manageable*, qpid::broker::Broker*> FactoryFunction; + typedef boost::function6<Exchange::shared_ptr, const std::string&, + bool, bool, const qpid::framing::FieldTable&, qpid::management::Manageable*, qpid::broker::Broker*> FactoryFunction; ExchangeRegistry (Broker* b = 0) : parent(0), broker(b) {} QPID_BROKER_EXTERN std::pair<Exchange::shared_ptr, bool> declare( @@ -53,6 +53,7 @@ class ExchangeRegistry{ const std::string& name, const std::string& type, bool durable, + bool autodelete, const qpid::framing::FieldTable& args = framing::FieldTable(), Exchange::shared_ptr alternate = Exchange::shared_ptr(), const std::string& connectionId = std::string(), diff --git a/qpid/cpp/src/qpid/broker/FanOutExchange.cpp b/qpid/cpp/src/qpid/broker/FanOutExchange.cpp index 20ca06e048..8c1d3f4954 100644 --- a/qpid/cpp/src/qpid/broker/FanOutExchange.cpp +++ b/qpid/cpp/src/qpid/broker/FanOutExchange.cpp @@ -38,9 +38,9 @@ FanOutExchange::FanOutExchange(const std::string& _name, Manageable* _parent, Br mgmtExchange->set_type (typeName); } -FanOutExchange::FanOutExchange(const std::string& _name, bool _durable, +FanOutExchange::FanOutExchange(const std::string& _name, bool _durable, bool autodelete, const FieldTable& _args, Manageable* _parent, Broker* b) : - Exchange(_name, _durable, _args, _parent, b) + Exchange(_name, _durable, autodelete, _args, _parent, b) { if (mgmtExchange != 0) mgmtExchange->set_type (typeName); @@ -101,6 +101,7 @@ bool FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*key*/, cons if (propagate) propagateFedOp(string(), string(), fedOpUnbind, string()); + if (bindings.empty()) checkAutodelete(); return true; } @@ -109,7 +110,7 @@ void FanOutExchange::route(Deliverable& msg) PreRoute pr(msg, this); doRoute(msg, bindings.snapshot()); } - + bool FanOutExchange::isBound(Queue::shared_ptr queue, const string* const, const FieldTable* const) { BindingsArray::ConstPtr ptr = bindings.snapshot(); @@ -123,3 +124,9 @@ FanOutExchange::~FanOutExchange() { } const std::string FanOutExchange::typeName("fanout"); + +bool FanOutExchange::hasBindings() +{ + BindingsArray::ConstPtr ptr = bindings.snapshot(); + return ptr && !ptr->empty(); +} diff --git a/qpid/cpp/src/qpid/broker/FanOutExchange.h b/qpid/cpp/src/qpid/broker/FanOutExchange.h index c979fdca25..a92ff7ce97 100644 --- a/qpid/cpp/src/qpid/broker/FanOutExchange.h +++ b/qpid/cpp/src/qpid/broker/FanOutExchange.h @@ -38,16 +38,16 @@ class FanOutExchange : public virtual Exchange { FedBinding fedBinding; public: static const std::string typeName; - + QPID_BROKER_EXTERN FanOutExchange(const std::string& name, management::Manageable* parent = 0, Broker* broker = 0); QPID_BROKER_EXTERN FanOutExchange(const std::string& _name, - bool _durable, + bool _durable, bool autodelete, const qpid::framing::FieldTable& _args, management::Manageable* parent = 0, Broker* broker = 0); - virtual std::string getType() const { return typeName; } - + virtual std::string getType() const { return typeName; } + QPID_BROKER_EXTERN virtual bool bind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args); @@ -62,6 +62,8 @@ class FanOutExchange : public virtual Exchange { QPID_BROKER_EXTERN virtual ~FanOutExchange(); virtual bool supportsDynamicBinding() { return true; } + protected: + bool hasBindings(); }; } diff --git a/qpid/cpp/src/qpid/broker/HeadersExchange.cpp b/qpid/cpp/src/qpid/broker/HeadersExchange.cpp index 4e86b09565..19c7f107f6 100644 --- a/qpid/cpp/src/qpid/broker/HeadersExchange.cpp +++ b/qpid/cpp/src/qpid/broker/HeadersExchange.cpp @@ -164,9 +164,9 @@ HeadersExchange::HeadersExchange(const string& _name, Manageable* _parent, Broke mgmtExchange->set_type (typeName); } -HeadersExchange::HeadersExchange(const std::string& _name, bool _durable, +HeadersExchange::HeadersExchange(const std::string& _name, bool _durable, bool autodelete, const FieldTable& _args, Manageable* _parent, Broker* b) : - Exchange(_name, _durable, _args, _parent, b) + Exchange(_name, _durable, autodelete, _args, _parent, b) { if (mgmtExchange != 0) mgmtExchange->set_type (typeName); @@ -288,6 +288,7 @@ bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& bindingKey, if (propagate) { propagateFedOp(bindingKey, string(), fedOpUnbind, string()); } + if (bindings.empty()) checkAutodelete(); return true; } @@ -404,3 +405,8 @@ bool HeadersExchange::FedUnbindModifier::operator()(BoundKey & bk) return true; } +bool HeadersExchange::hasBindings() +{ + Bindings::ConstPtr ptr = bindings.snapshot(); + return ptr && !ptr->empty(); +} diff --git a/qpid/cpp/src/qpid/broker/HeadersExchange.h b/qpid/cpp/src/qpid/broker/HeadersExchange.h index e51478d365..54c69491e9 100644 --- a/qpid/cpp/src/qpid/broker/HeadersExchange.h +++ b/qpid/cpp/src/qpid/broker/HeadersExchange.h @@ -45,12 +45,12 @@ class HeadersExchange : public virtual Exchange { struct MatchArgs { - const Queue::shared_ptr queue; + const Queue::shared_ptr queue; const qpid::framing::FieldTable* args; MatchArgs(Queue::shared_ptr q, const qpid::framing::FieldTable* a); bool operator()(const BoundKey & bk); }; - + struct MatchKey { const Queue::shared_ptr queue; @@ -77,6 +77,7 @@ class HeadersExchange : public virtual Exchange { protected: void getNonFedArgs(const framing::FieldTable* args, framing::FieldTable& nonFedArgs); + bool hasBindings(); public: QPID_BROKER_EXTERN static const std::string typeName; @@ -84,12 +85,12 @@ class HeadersExchange : public virtual Exchange { QPID_BROKER_EXTERN HeadersExchange(const std::string& name, management::Manageable* parent = 0, Broker* broker = 0); QPID_BROKER_EXTERN HeadersExchange(const std::string& _name, - bool _durable, + bool _durable, bool autodelete, const qpid::framing::FieldTable& _args, management::Manageable* parent = 0, Broker* broker = 0); - - virtual std::string getType() const { return typeName; } - + + virtual std::string getType() const { return typeName; } + QPID_BROKER_EXTERN virtual bool bind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args); diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp index cd7ba35dd8..fe1cac8aab 100644 --- a/qpid/cpp/src/qpid/broker/Link.cpp +++ b/qpid/cpp/src/qpid/broker/Link.cpp @@ -90,7 +90,7 @@ public: bool bind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*) { return false; } bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*) { return false; } bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const) {return false;} - + bool hasBindings() { return false; } // Process messages sent from the remote's amq.failover exchange by extracting the failover URLs // and saving them should the Link need to reconnect. void route(broker::Deliverable& /*msg*/) diff --git a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp index eff77db02f..9a8110d54f 100644 --- a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp @@ -69,9 +69,8 @@ static const std::string _FALSE("false"); void SessionAdapter::ExchangeHandlerImpl::declare(const string& exchange, const string& type, const string& alternateExchange, - bool passive, bool durable, bool /*autoDelete*/, const FieldTable& args){ + bool passive, bool durable, bool autodelete, const FieldTable& args){ - //TODO: implement autoDelete Exchange::shared_ptr alternate; if (!alternateExchange.empty()) { alternate = getBroker().getExchanges().get(alternateExchange); @@ -83,6 +82,7 @@ void SessionAdapter::ExchangeHandlerImpl::declare(const string& exchange, const params.insert(make_pair(acl::PROP_TYPE, type)); params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange)); params.insert(make_pair(acl::PROP_DURABLE, durable ? _TRUE : _FALSE)); + params.insert(make_pair(acl::PROP_AUTODELETE, autodelete ? _TRUE : _FALSE)); if (!acl->authorise(getConnection().getUserId(),acl::ACT_ACCESS,acl::OBJ_EXCHANGE,exchange,¶ms) ) throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange access request from " << getConnection().getUserId())); } @@ -95,7 +95,7 @@ void SessionAdapter::ExchangeHandlerImpl::declare(const string& exchange, const } try{ std::pair<Exchange::shared_ptr, bool> response = - getBroker().createExchange(exchange, type, durable, alternateExchange, args, + getBroker().createExchange(exchange, type, durable, autodelete, alternateExchange, args, getConnection().getUserId(), getConnection().getMgmtId()); if (!response.second) { //exchange already there, not created @@ -106,7 +106,8 @@ void SessionAdapter::ExchangeHandlerImpl::declare(const string& exchange, const << " rhost:" << getConnection().getMgmtId() << " type:" << type << " alternateExchange:" << alternateExchange - << " durable:" << (durable ? "T" : "F")); + << " durable:" << (durable ? "T" : "F") + << " autodelete:" << (autodelete ? "T" : "F")); } }catch(UnknownExchangeTypeException& /*e*/){ throw NotFoundException(QPID_MSG("Exchange type not implemented: " << type)); diff --git a/qpid/cpp/src/qpid/broker/TopicExchange.cpp b/qpid/cpp/src/qpid/broker/TopicExchange.cpp index 6a081bf65f..558c900a4f 100644 --- a/qpid/cpp/src/qpid/broker/TopicExchange.cpp +++ b/qpid/cpp/src/qpid/broker/TopicExchange.cpp @@ -147,9 +147,9 @@ TopicExchange::TopicExchange(const string& _name, Manageable* _parent, Broker* b mgmtExchange->set_type (typeName); } -TopicExchange::TopicExchange(const std::string& _name, bool _durable, +TopicExchange::TopicExchange(const std::string& _name, bool _durable, bool autodelete, const FieldTable& _args, Manageable* _parent, Broker* b) : - Exchange(_name, _durable, _args, _parent, b), + Exchange(_name, _durable, autodelete, _args, _parent, b), nBindings(0) { if (mgmtExchange != 0) @@ -241,6 +241,7 @@ bool TopicExchange::unbind(Queue::shared_ptr queue, const string& constRoutingKe deleteBinding(queue, routingKey, bk); if (propagate) propagateFedOp(routingKey, string(), fedOpUnbind, string()); + if (nBindings == 0) checkAutodelete(); return true; } @@ -340,4 +341,10 @@ TopicExchange::~TopicExchange() { const std::string TopicExchange::typeName("topic"); +bool TopicExchange::hasBindings() +{ + RWlock::ScopedRlock l(lock); + return nBindings > 0; +} + }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/TopicExchange.h b/qpid/cpp/src/qpid/broker/TopicExchange.h index b8b67bdafa..d54f23a70d 100644 --- a/qpid/cpp/src/qpid/broker/TopicExchange.h +++ b/qpid/cpp/src/qpid/broker/TopicExchange.h @@ -88,7 +88,7 @@ public: QPID_BROKER_EXTERN TopicExchange(const std::string& name, management::Manageable* parent = 0, Broker* broker = 0); QPID_BROKER_EXTERN TopicExchange(const std::string& _name, - bool _durable, + bool _durable, bool autodelete, const qpid::framing::FieldTable& _args, management::Manageable* parent = 0, Broker* broker = 0); @@ -111,6 +111,8 @@ public: class TopicExchangeTester; friend class TopicExchangeTester; + protected: + bool hasBindings(); }; diff --git a/qpid/cpp/src/qpid/broker/amqp/NodeProperties.cpp b/qpid/cpp/src/qpid/broker/amqp/NodeProperties.cpp index 45536c2262..f1d29fe00e 100644 --- a/qpid/cpp/src/qpid/broker/amqp/NodeProperties.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/NodeProperties.cpp @@ -181,6 +181,10 @@ void NodeProperties::write(pn_data_t* data, boost::shared_ptr<Exchange> node) pn_data_put_symbol(data, convert(ALTERNATE_EXCHANGE)); pn_data_put_string(data, convert(node->getAlternate()->getName())); } + if (autoDelete) { + pn_data_put_symbol(data, convert(AUTO_DELETE)); + pn_data_put_bool(data, autoDelete); + } for (qpid::types::Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) { if ((i->first == QPID_MSG_SEQUENCE || i->first == QPID_IVE) && node->getArgs().isSet(i->first)) { @@ -333,6 +337,10 @@ bool NodeProperties::isExclusive() const { return exclusive; } +bool NodeProperties::isAutodelete() const +{ + return autoDelete; +} std::string NodeProperties::getExchangeType() const { return exchangeType; diff --git a/qpid/cpp/src/qpid/broker/amqp/NodeProperties.h b/qpid/cpp/src/qpid/broker/amqp/NodeProperties.h index df96d5a023..4ac3aa8a0f 100644 --- a/qpid/cpp/src/qpid/broker/amqp/NodeProperties.h +++ b/qpid/cpp/src/qpid/broker/amqp/NodeProperties.h @@ -62,6 +62,7 @@ class NodeProperties : public qpid::amqp::MapReader QueueSettings getQueueSettings(); bool isDurable() const; bool isExclusive() const; + bool isAutodelete() const; std::string getExchangeType() const; std::string getAlternateExchange() const; bool trackControllingLink() const; diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.cpp b/qpid/cpp/src/qpid/broker/amqp/Session.cpp index 16aadfc6c8..7170da0797 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Session.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Session.cpp @@ -185,7 +185,14 @@ class IncomingToExchange : public DecodingIncoming { public: IncomingToExchange(Broker& b, Session& p, boost::shared_ptr<qpid::broker::Exchange> e, pn_link_t* l, const std::string& source) - : DecodingIncoming(l, b, p, source, e->getName(), pn_link_name(l)), exchange(e), authorise(p.getAuthorise()) {} + : DecodingIncoming(l, b, p, source, e->getName(), pn_link_name(l)), exchange(e), authorise(p.getAuthorise()) + { + exchange->incOtherUsers(); + } + ~IncomingToExchange() + { + exchange->decOtherUsers(); + } void handle(qpid::broker::Message& m); private: boost::shared_ptr<qpid::broker::Exchange> exchange; @@ -243,8 +250,9 @@ Session::ResolvedNode Session::resolve(const std::string name, pn_terminus_t* te } qpid::framing::FieldTable args; qpid::amqp_0_10::translate(node.properties.getProperties(), args); - node.exchange = connection.getBroker().createExchange(name, node.properties.getExchangeType(), node.properties.isDurable(), node.properties.getAlternateExchange(), - args, connection.getUserId(), connection.getId()).first; + node.exchange = connection.getBroker().createExchange(name, node.properties.getExchangeType(), node.properties.isDurable(), node.properties.isAutodelete(), + node.properties.getAlternateExchange(), + args, connection.getUserId(), connection.getId()).first; } else { if (node.exchange) { QPID_LOG_CAT(warning, model, "Node name will be ambiguous, creation of queue named " << name << " requested when exchange of the same name already exists"); diff --git a/qpid/cpp/src/qpid/broker/amqp/Topic.cpp b/qpid/cpp/src/qpid/broker/amqp/Topic.cpp index 9640988834..c04f62b3d1 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Topic.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Topic.cpp @@ -111,6 +111,7 @@ boost::shared_ptr<Topic> TopicRegistry::createTopic(Broker& broker, const std::s { boost::shared_ptr<Topic> topic(new Topic(broker, name, properties)); add(topic); + topic->getExchange()->setDeletionListener(name, boost::bind(&TopicRegistry::remove, this, name)); return topic; } @@ -174,6 +175,7 @@ boost::shared_ptr<Topic> TopicRegistry::remove(const std::string& name) if (i != topics.end()) { result = i->second; topics.erase(i); + result->getExchange()->unsetDeletionListener(name); } return result; } diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp index eb1206437a..a59c874594 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -551,8 +551,10 @@ void BrokerReplicator::doEventExchangeDeclare(Variant::Map& values) { QPID_LOG(warning, logPrefix << "Declare event, replacing existing exchange: " << name); } + //Note: unlike qieth queues, autodeleted exchanges have no + //messages, so need no special handling for autodelete in ha CreateExchangeResult result = createExchange( - name, values[EXTYPE].asString(), values[DURABLE].asBool(), args, + name, values[EXTYPE].asString(), values[DURABLE].asBool(), values[AUTODEL].asBool(), args, values[ALTEX].asString()); assert(result.second); } @@ -700,7 +702,7 @@ void BrokerReplicator::doResponseExchange(Variant::Map& values) { deleteExchange(name); } CreateExchangeResult result = createExchange( - name, values[TYPE].asString(), values[DURABLE].asBool(), args, + name, values[TYPE].asString(), values[DURABLE].asBool(), values[AUTODELETE].asBool(), args, getAltExchange(values[ALTEXCHANGE])); } @@ -849,6 +851,7 @@ BrokerReplicator::CreateExchangeResult BrokerReplicator::createExchange( const std::string& name, const std::string& type, bool durable, + bool autodelete, const qpid::framing::FieldTable& args, const std::string& alternateExchange) { @@ -857,6 +860,7 @@ BrokerReplicator::CreateExchangeResult BrokerReplicator::createExchange( name, type, durable, + autodelete, string(), // Set alternate exchange below args, userId, @@ -872,6 +876,7 @@ BrokerReplicator::CreateExchangeResult BrokerReplicator::createExchange( bool BrokerReplicator::bind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*) { return false; } bool BrokerReplicator::unbind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*) { return false; } bool BrokerReplicator::isBound(boost::shared_ptr<Queue>, const string* const, const framing::FieldTable* const) { return false; } +bool BrokerReplicator::hasBindings() { return false; } string BrokerReplicator::getType() const { return QPID_CONFIGURATION_REPLICATOR; } diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.h b/qpid/cpp/src/qpid/ha/BrokerReplicator.h index 395f0706d9..07b992df6a 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.h +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.h @@ -84,6 +84,7 @@ class BrokerReplicator : public broker::Exchange, bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*); void route(broker::Deliverable&); bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const); + bool hasBindings(); void shutdown(); QueueReplicatorPtr findQueueReplicator(const std::string& qname); @@ -132,6 +133,7 @@ class BrokerReplicator : public broker::Exchange, const std::string& name, const std::string& type, bool durable, + bool autodelete, const qpid::framing::FieldTable& args, const std::string& alternateExchange); diff --git a/qpid/cpp/src/qpid/ha/FailoverExchange.cpp b/qpid/cpp/src/qpid/ha/FailoverExchange.cpp index 9c7b986bf8..f1b87c63c8 100644 --- a/qpid/cpp/src/qpid/ha/FailoverExchange.cpp +++ b/qpid/cpp/src/qpid/ha/FailoverExchange.cpp @@ -108,6 +108,11 @@ bool FailoverExchange::isBound(Queue::shared_ptr queue, const string* const, return queues.find(queue) != queues.end(); } +bool FailoverExchange::hasBindings() { + Lock l(lock); + return !queues.empty(); +} + void FailoverExchange::route(Deliverable&) { QPID_LOG(warning, "Message received by exchange " << typeName << " ignoring"); } diff --git a/qpid/cpp/src/qpid/ha/FailoverExchange.h b/qpid/cpp/src/qpid/ha/FailoverExchange.h index 6ec1d0f152..5263bdfb03 100644 --- a/qpid/cpp/src/qpid/ha/FailoverExchange.h +++ b/qpid/cpp/src/qpid/ha/FailoverExchange.h @@ -54,6 +54,7 @@ class FailoverExchange : public broker::Exchange bool bind(boost::shared_ptr<broker::Queue> queue, const std::string& routingKey, const framing::FieldTable* args); bool unbind(boost::shared_ptr<broker::Queue> queue, const std::string& routingKey, const framing::FieldTable* args); bool isBound(boost::shared_ptr<broker::Queue> queue, const std::string* const routingKey, const framing::FieldTable* const args); + bool hasBindings(); void route(broker::Deliverable& msg); private: diff --git a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp index 0f8ed0a0a7..416bb329a6 100644 --- a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp +++ b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp @@ -66,6 +66,7 @@ class PrimaryTxObserver::Exchange : public broker::Exchange { bool bind(boost::shared_ptr<Queue>, const string&, const FieldTable*) { return false; } bool unbind(boost::shared_ptr<Queue>, const string&, const FieldTable*) { return false; } bool isBound(boost::shared_ptr<Queue>, const string* const, const FieldTable* const) { return false; } + bool hasBindings() { return false; } string getType() const { return TYPE_NAME; } private: diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp index 22af7284a8..8037559c3d 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp @@ -293,6 +293,7 @@ ReplicationId QueueReplicator::getMaxId() { bool QueueReplicator::bind(boost::shared_ptr<Queue>, const std::string&, const FieldTable*) { return false; } bool QueueReplicator::unbind(boost::shared_ptr<Queue>, const std::string&, const FieldTable*) { return false; } bool QueueReplicator::isBound(boost::shared_ptr<Queue>, const std::string* const, const FieldTable* const) { return false; } +bool QueueReplicator::hasBindings() { return false; } std::string QueueReplicator::getType() const { return TYPE_NAME; } diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.h b/qpid/cpp/src/qpid/ha/QueueReplicator.h index 01abc88843..cbb36757f6 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.h +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.h @@ -88,6 +88,7 @@ class QueueReplicator : public broker::Exchange, bool bind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*); bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*); bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const); + bool hasBindings(); protected: typedef boost::function<void(const std::string&, sys::Mutex::ScopedLock&)> DispatchFn; diff --git a/qpid/cpp/src/qpid/management/ManagementDirectExchange.cpp b/qpid/cpp/src/qpid/management/ManagementDirectExchange.cpp index 1c1d6ef3db..8ede6940b0 100644 --- a/qpid/cpp/src/qpid/management/ManagementDirectExchange.cpp +++ b/qpid/cpp/src/qpid/management/ManagementDirectExchange.cpp @@ -36,8 +36,8 @@ ManagementDirectExchange::ManagementDirectExchange(const std::string& _name, bool _durable, const FieldTable& _args, Manageable* _parent, Broker* b) : - Exchange (_name, _durable, _args, _parent, b), - DirectExchange(_name, _durable, _args, _parent, b), + Exchange (_name, _durable, false, _args, _parent, b), + DirectExchange(_name, _durable, false, _args, _parent, b), managementAgent(0) {} void ManagementDirectExchange::route(Deliverable& msg) diff --git a/qpid/cpp/src/qpid/management/ManagementTopicExchange.cpp b/qpid/cpp/src/qpid/management/ManagementTopicExchange.cpp index c8bfef3785..0241d5a404 100644 --- a/qpid/cpp/src/qpid/management/ManagementTopicExchange.cpp +++ b/qpid/cpp/src/qpid/management/ManagementTopicExchange.cpp @@ -35,8 +35,8 @@ ManagementTopicExchange::ManagementTopicExchange(const std::string& _name, bool _durable, const FieldTable& _args, Manageable* _parent, Broker* b) : - Exchange (_name, _durable, _args, _parent, b), - TopicExchange(_name, _durable, _args, _parent, b), + Exchange (_name, _durable, false, _args, _parent, b), + TopicExchange(_name, _durable, false, _args, _parent, b), managementAgent(0) {} void ManagementTopicExchange::route(Deliverable& msg) diff --git a/qpid/cpp/src/qpid/management/ManagementTopicExchange.h b/qpid/cpp/src/qpid/management/ManagementTopicExchange.h index eff01a8552..f5192a0936 100644 --- a/qpid/cpp/src/qpid/management/ManagementTopicExchange.h +++ b/qpid/cpp/src/qpid/management/ManagementTopicExchange.h @@ -37,7 +37,7 @@ class ManagementTopicExchange : public virtual TopicExchange static const std::string typeName; ManagementTopicExchange(const std::string& name, Manageable* _parent = 0, Broker* broker = 0); - ManagementTopicExchange(const std::string& _name, bool _durable, + ManagementTopicExchange(const std::string& _name, bool _durable, const qpid::framing::FieldTable& _args, Manageable* _parent = 0, Broker* broker = 0); diff --git a/qpid/cpp/src/qpid/xml/XmlExchange.cpp b/qpid/cpp/src/qpid/xml/XmlExchange.cpp index 3802ec5f7f..837693f53f 100644 --- a/qpid/cpp/src/qpid/xml/XmlExchange.cpp +++ b/qpid/cpp/src/qpid/xml/XmlExchange.cpp @@ -109,9 +109,9 @@ XmlExchange::XmlExchange(const std::string& _name, Manageable* _parent, Broker* mgmtExchange->set_type (typeName); } -XmlExchange::XmlExchange(const std::string& _name, bool _durable, +XmlExchange::XmlExchange(const std::string& _name, bool _durable, bool autodelete, const FieldTable& _args, Manageable* _parent, Broker* b) : - Exchange(_name, _durable, _args, _parent, b) + Exchange(_name, _durable, autodelete, _args, _parent, b) { if (mgmtExchange != 0) mgmtExchange->set_type (typeName); @@ -201,9 +201,11 @@ bool XmlExchange::unbindLH(Queue::shared_ptr queue, const std::string& bindingKe if (mgmtExchange != 0) { mgmtExchange->dec_bindingCount(); } + if (bindingsMap[bindingKey].empty()) bindingsMap.erase(bindingKey); + if (bindingsMap.empty()) checkAutodelete(); return true; } else { - return false; + return false; } } @@ -443,6 +445,11 @@ bool XmlExchange::MatchQueueAndOrigin::operator()(XmlBinding::shared_ptr b) const std::string XmlExchange::typeName("xml"); - + +bool XmlExchange::hasBindings() +{ + RWlock::ScopedRlock l(lock); + return !bindingsMap.empty(); +} } } diff --git a/qpid/cpp/src/qpid/xml/XmlExchange.h b/qpid/cpp/src/qpid/xml/XmlExchange.h index fd3f8d0278..f9e92d60f9 100644 --- a/qpid/cpp/src/qpid/xml/XmlExchange.h +++ b/qpid/cpp/src/qpid/xml/XmlExchange.h @@ -71,7 +71,7 @@ class XmlExchange : public virtual Exchange { static const std::string typeName; XmlExchange(const std::string& name, management::Manageable* parent = 0, Broker* broker = 0); - XmlExchange(const std::string& _name, bool _durable, + XmlExchange(const std::string& _name, bool _durable, bool autodelete, const qpid::framing::FieldTable& _args, management::Manageable* parent = 0, Broker* broker = 0); virtual std::string getType() const { return typeName; } @@ -107,6 +107,8 @@ class XmlExchange : public virtual Exchange { bool operator()(XmlBinding::shared_ptr b); }; + protected: + bool hasBindings(); private: bool unbindLH(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args); }; diff --git a/qpid/cpp/src/qpid/xml/XmlExchangePlugin.cpp b/qpid/cpp/src/qpid/xml/XmlExchangePlugin.cpp index 742b878e86..02f0110faf 100644 --- a/qpid/cpp/src/qpid/xml/XmlExchangePlugin.cpp +++ b/qpid/cpp/src/qpid/xml/XmlExchangePlugin.cpp @@ -34,11 +34,12 @@ using namespace std; class Broker; Exchange::shared_ptr create(const std::string& name, bool durable, - const framing::FieldTable& args, + bool autodelete, + const framing::FieldTable& args, management::Manageable* parent, Broker* broker) { - Exchange::shared_ptr e(new XmlExchange(name, durable, args, parent, broker)); + Exchange::shared_ptr e(new XmlExchange(name, durable, autodelete, args, parent, broker)); return e; } diff --git a/qpid/cpp/src/tests/ExchangeTest.cpp b/qpid/cpp/src/tests/ExchangeTest.cpp index 8c2dbb21c8..df0684e832 100644 --- a/qpid/cpp/src/tests/ExchangeTest.cpp +++ b/qpid/cpp/src/tests/ExchangeTest.cpp @@ -158,12 +158,12 @@ QPID_AUTO_TEST_CASE(testIsBound) QPID_AUTO_TEST_CASE(testDeleteGetAndRedeclare) { ExchangeRegistry exchanges; - exchanges.declare("my-exchange", "direct", false, FieldTable()); + exchanges.declare("my-exchange", "direct", false, false, FieldTable()); exchanges.destroy("my-exchange"); try { exchanges.get("my-exchange"); } catch (const NotFoundException&) {} - std::pair<Exchange::shared_ptr, bool> response = exchanges.declare("my-exchange", "direct", false, FieldTable()); + std::pair<Exchange::shared_ptr, bool> response = exchanges.declare("my-exchange", "direct", false, false, FieldTable()); BOOST_CHECK_EQUAL(string("direct"), response.first->getType()); } @@ -174,7 +174,7 @@ QPID_AUTO_TEST_CASE(testSequenceOptions) char* buff = new char[10000]; framing::Buffer buffer(buff,10000); { - DirectExchange direct("direct1", false, args); + DirectExchange direct("direct1", false, false, args); DeliverableMessage msg1(MessageUtils::createMessage("e", "abc"), 0); DeliverableMessage msg2(MessageUtils::createMessage("e", "abc"), 0); @@ -188,9 +188,9 @@ QPID_AUTO_TEST_CASE(testSequenceOptions) BOOST_CHECK_EQUAL(2, msg2.getMessage().getAnnotation("qpid.msg_sequence").asInt64()); BOOST_CHECK_EQUAL(3, msg3.getMessage().getAnnotation("qpid.msg_sequence").asInt64()); - FanOutExchange fanout("fanout1", false, args); - HeadersExchange header("headers1", false, args); - TopicExchange topic ("topic1", false, args); + FanOutExchange fanout("fanout1", false, false, args); + HeadersExchange header("headers1", false, false, args); + TopicExchange topic ("topic1", false, false, args); // check other exchanges, that they preroute DeliverableMessage msg4(MessageUtils::createMessage("e", "abc"), 0); @@ -226,10 +226,10 @@ QPID_AUTO_TEST_CASE(testIVEOption) { FieldTable args; args.setInt("qpid.ive",1); - DirectExchange direct("direct1", false, args); - FanOutExchange fanout("fanout1", false, args); - HeadersExchange header("headers1", false, args); - TopicExchange topic ("topic1", false, args); + DirectExchange direct("direct1", false, false, args); + FanOutExchange fanout("fanout1", false, false, args); + HeadersExchange header("headers1", false, false, args); + TopicExchange topic ("topic1", false, false, args); qpid::types::Variant::Map properties; properties["routing-key"] = "abc"; diff --git a/qpid/cpp/src/tests/legacystore/SimpleTest.cpp b/qpid/cpp/src/tests/legacystore/SimpleTest.cpp index c769bdeb75..d3f040817f 100644 --- a/qpid/cpp/src/tests/legacystore/SimpleTest.cpp +++ b/qpid/cpp/src/tests/legacystore/SimpleTest.cpp @@ -102,7 +102,7 @@ void bindAndUnbind(const string& exchangeName, const string& queueName, { MessageStoreImpl store(&br); store.init(test_dir, 4, 1, true); // truncate store - Exchange::shared_ptr exchange(new DirectExchange(exchangeName, true, args)); + Exchange::shared_ptr exchange(new DirectExchange(exchangeName, true, false, args)); Queue::shared_ptr queue(new Queue(queueName, 0, &store, 0)); store.create(*exchange, qpid::framing::FieldTable()); store.create(*queue, qpid::framing::FieldTable()); @@ -376,7 +376,7 @@ QPID_AUTO_TEST_CASE(ExchangeCreateAndDestroy) MessageStoreImpl store(&br); store.init(test_dir, 4, 1, true); // truncate store ExchangeRegistry registry; - Exchange::shared_ptr exchange = registry.declare(name, type, true, args).first; + Exchange::shared_ptr exchange = registry.declare(name, type, true, false, args).first; store.create(*exchange, qpid::framing::FieldTable()); id = exchange->getPersistenceId(); BOOST_REQUIRE(id); @@ -446,7 +446,7 @@ QPID_AUTO_TEST_CASE(ExchangeImplicitUnbind) { MessageStoreImpl store(&br); store.init(test_dir, 4, 1, true); // truncate store - Exchange::shared_ptr exchange(new DirectExchange(exchangeName, true, args)); + Exchange::shared_ptr exchange(new DirectExchange(exchangeName, true, false, args)); Queue::shared_ptr queue1(new Queue(queueName1, 0, &store, 0)); Queue::shared_ptr queue2(new Queue(queueName2, 0, &store, 0)); store.create(*exchange, qpid::framing::FieldTable()); diff --git a/qpid/cpp/src/tests/misc.py b/qpid/cpp/src/tests/misc.py index 108120f321..257fb9e754 100644 --- a/qpid/cpp/src/tests/misc.py +++ b/qpid/cpp/src/tests/misc.py @@ -50,3 +50,70 @@ class MiscellaneousTests (VersionTest): con.close() other.close() +class AutoDeleteExchangeTests(VersionTest): + def init_test(self, exchange_type="topic"): + rcv = self.ssn.receiver("my-topic; {create:always, node:{type:topic, properties:{'exchange-type':%s, 'auto-delete':True}}}" % exchange_type) + snd = self.ssn.sender("my-topic") + #send some messages + msgs = [Message(content=c) for c in ['a','b','c','d']] + for m in msgs: snd.send(m) + + #verify receipt + for expected in msgs: + msg = rcv.fetch(0) + assert msg.content == expected.content + self.ssn.acknowledge(msg) + return (rcv, snd) + + def on_rcv_detach_test(self, exchange_type="topic"): + rcv, snd = self.init_test(exchange_type) + rcv.close() + #verify exchange is still there + snd.send(Message(content="will be dropped")) + snd.close() + #now verify it is no longer there + try: + self.ssn.sender("my-topic") + assert False, "Attempt to send to deleted exchange should fail" + except MessagingError: None + + def on_snd_detach_test(self, exchange_type="topic"): + rcv, snd = self.init_test(exchange_type) + snd.close() + #verify exchange is still there + snd = self.ssn.sender("my-topic") + snd.send(Message(content="will be dropped")) + snd.close() + rcv.close() + #now verify it is no longer there + try: + self.ssn.sender("my-topic") + assert False, "Attempt to send to deleted exchange should fail" + except MessagingError: None + + def test_autodelete_fanout_exchange_on_rcv_detach(self): + self.on_rcv_detach_test("fanout") + + def test_autodelete_fanout_exchange_on_snd_detach(self): + self.on_snd_detach_test("fanout") + + def test_autodelete_direct_exchange_on_rcv_detach(self): + self.on_rcv_detach_test("direct") + + def test_autodelete_direct_exchange_on_snd_detach(self): + self.on_snd_detach_test("direct") + + def test_autodelete_topic_exchange_on_rcv_detach(self): + self.on_rcv_detach_test("topic") + + def test_autodelete_topic_exchange_on_snd_detach(self): + self.on_snd_detach_test("topic") + + def test_autodelete_headers_exchange_on_rcv_detach(self): + self.on_rcv_detach_test("headers") + + def test_autodelete_headers_exchange_on_snd_detach(self): + self.on_snd_detach_test("headers") + + + diff --git a/qpid/java/test-profiles/python_tests/Java010PythonExcludes b/qpid/java/test-profiles/python_tests/Java010PythonExcludes index 66e0398413..de870e5e27 100644 --- a/qpid/java/test-profiles/python_tests/Java010PythonExcludes +++ b/qpid/java/test-profiles/python_tests/Java010PythonExcludes @@ -47,6 +47,8 @@ qpid_tests.broker_0_10.priority.PriorityTests.test_ring_queue* qpid_tests.broker_0_10.priority.PriorityTests.test_fairshare* qpid_tests.broker_0_10.priority.PriorityTests.test_prioritised_delivery_with_alias +#The broker does not support the autodelete property on exchanges +qpid_tests.broker_0_10.exchange.AutodeleteTests.testAutodelete* ###### Behavioural differences between Java & CPP Broker ###### diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/exchange.py b/qpid/tests/src/py/qpid_tests/broker_0_10/exchange.py index db52b36754..315991d585 100644 --- a/qpid/tests/src/py/qpid_tests/broker_0_10/exchange.py +++ b/qpid/tests/src/py/qpid_tests/broker_0_10/exchange.py @@ -122,49 +122,67 @@ class StandardExchangeVerifier: Used as base class for classes that test standard exchanges.""" - def verifyDirectExchange(self, ex): + def verifyDirectExchange(self, ex, unbind=False): """Verify that ex behaves like a direct exchange.""" self.queue_declare(queue="q") self.session.exchange_bind(queue="q", exchange=ex, binding_key="k") - self.assertPublishConsume(exchange=ex, queue="q", routing_key="k") try: - self.assertPublishConsume(exchange=ex, queue="q", routing_key="kk") - self.fail("Expected Empty exception") - except Queue.Empty: None # Expected - - def verifyFanOutExchange(self, ex): + self.assertPublishConsume(exchange=ex, queue="q", routing_key="k") + try: + self.assertPublishConsume(exchange=ex, queue="q", routing_key="kk") + self.fail("Expected Empty exception") + except Queue.Empty: None # Expected + finally: + if unbind: + self.session.exchange_unbind(queue="q", exchange=ex, binding_key="k") + + def verifyFanOutExchange(self, ex, unbind=False): """Verify that ex behaves like a fanout exchange.""" self.queue_declare(queue="q") self.session.exchange_bind(queue="q", exchange=ex) self.queue_declare(queue="p") self.session.exchange_bind(queue="p", exchange=ex) - for qname in ["q", "p"]: self.assertPublishGet(self.consume(qname), ex) + try: + for qname in ["q", "p"]: self.assertPublishGet(self.consume(qname), ex) + finally: + if unbind: + self.session.exchange_unbind(queue="q", exchange=ex, binding_key="") + self.session.exchange_unbind(queue="p", exchange=ex, binding_key="") + - def verifyTopicExchange(self, ex): + def verifyTopicExchange(self, ex, unbind=False): """Verify that ex behaves like a topic exchange""" self.queue_declare(queue="a") self.session.exchange_bind(queue="a", exchange=ex, binding_key="a.#.b.*") - q = self.consume("a") - self.assertPublishGet(q, ex, "a.b.x") - self.assertPublishGet(q, ex, "a.x.b.x") - self.assertPublishGet(q, ex, "a.x.x.b.x") - # Shouldn't match - self.session.message_transfer(destination=ex, message=self.createMessage("a.b")) - self.session.message_transfer(destination=ex, message=self.createMessage("a.b.x.y")) - self.session.message_transfer(destination=ex, message=self.createMessage("x.a.b.x")) - self.session.message_transfer(destination=ex, message=self.createMessage("a.b")) - self.assert_(q.empty()) - - def verifyHeadersExchange(self, ex): + try: + q = self.consume("a") + self.assertPublishGet(q, ex, "a.b.x") + self.assertPublishGet(q, ex, "a.x.b.x") + self.assertPublishGet(q, ex, "a.x.x.b.x") + # Shouldn't match + self.session.message_transfer(destination=ex, message=self.createMessage("a.b")) + self.session.message_transfer(destination=ex, message=self.createMessage("a.b.x.y")) + self.session.message_transfer(destination=ex, message=self.createMessage("x.a.b.x")) + self.session.message_transfer(destination=ex, message=self.createMessage("a.b")) + self.assert_(q.empty()) + finally: + if unbind: + self.session.exchange_unbind(queue="a", exchange=ex, binding_key="a.#.b.*") + + def verifyHeadersExchange(self, ex, unbind=False): """Verify that ex is a headers exchange""" self.queue_declare(queue="q") self.session.exchange_bind(queue="q", exchange=ex, arguments={ "x-match":"all", "name":"fred" , "age":3} ) - q = self.consume("q") - headers = {"name":"fred", "age":3} - self.assertPublishGet(q, exchange=ex, properties=headers) - self.session.message_transfer(destination=ex) # No headers, won't deliver - self.assertEmpty(q); - + try: + q = self.consume("q") + headers = {"name":"fred", "age":3} + self.assertPublishGet(q, exchange=ex, properties=headers) + self.session.message_transfer(destination=ex) # No headers, won't deliver + self.assertEmpty(q); + finally: + if unbind: + self.session.exchange_unbind(queue="q", exchange=ex, binding_key="") + class RecommendedTypesRuleTests(TestHelper, StandardExchangeVerifier): """ @@ -485,8 +503,39 @@ class MiscellaneousErrorsTests(TestHelper): class ExchangeTests(TestHelper): def testHeadersBindNoMatchArg(self): self.session.queue_declare(queue="q", exclusive=True, auto_delete=True) - try: + try: self.session.exchange_bind(queue="q", exchange="amq.match", arguments={"name":"fred" , "age":3} ) self.fail("Expected failure for missing x-match arg.") - except SessionException, e: + except SessionException, e: self.assertEquals(541, e.args[0].error_code) + +class AutodeleteTests(TestHelper, StandardExchangeVerifier): + def checkNotExists(self, e): + try: + s = self.conn.session("verifier") + s.exchange_declare(exchange=e, passive=True) + s.exchange_delete(exchange=e) + self.fail("Expected failure for passive declare of %s" % e) + except SessionException, e: + self.assertEquals(404, e.args[0].error_code) + + + def testAutodeleteFanout(self): + self.session.exchange_declare(exchange="e", type="fanout", auto_delete=True) + self.verifyFanOutExchange("e", unbind=True) + self.checkNotExists("e"); + + def testAutodeleteDirect(self): + self.session.exchange_declare(exchange="e", type="direct", auto_delete=True) + self.verifyDirectExchange("e", unbind=True) + self.checkNotExists("e"); + + def testAutodeleteTopic(self): + self.session.exchange_declare(exchange="e", type="topic", auto_delete=True) + self.verifyTopicExchange("e", unbind=True) + self.checkNotExists("e"); + + def testAutodeleteHeaders(self): + self.session.exchange_declare(exchange="e", type="headers", auto_delete=True) + self.verifyHeadersExchange("e", unbind=True) + self.checkNotExists("e"); |