diff options
Diffstat (limited to 'qpid')
-rw-r--r-- | qpid/cpp/src/qpid/broker/Exchange.cpp | 10 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Exchange.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Session.cpp | 11 |
3 files changed, 16 insertions, 7 deletions
diff --git a/qpid/cpp/src/qpid/broker/Exchange.cpp b/qpid/cpp/src/qpid/broker/Exchange.cpp index 848f6a69dd..f14fdb6643 100644 --- a/qpid/cpp/src/qpid/broker/Exchange.cpp +++ b/qpid/cpp/src/qpid/broker/Exchange.cpp @@ -444,12 +444,18 @@ void Exchange::incOtherUsers() Mutex::ScopedLock l(usersLock); otherUsers++; } -void Exchange::decOtherUsers() +void Exchange::decOtherUsers(bool isControllingLink=false) { Mutex::ScopedLock l(usersLock); assert(otherUsers); if (otherUsers) otherUsers--; - if (!inUse() && !hasBindings()) checkAutodelete(); + if (autodelete) { + if (isControllingLink) { + if (broker) broker->getExchanges().destroy(name); + } else if (!inUse() && !hasBindings()) { + checkAutoDelete(); + } + } } bool Exchange::inUse() const { diff --git a/qpid/cpp/src/qpid/broker/Exchange.h b/qpid/cpp/src/qpid/broker/Exchange.h index a0d0604be8..3308d54e1b 100644 --- a/qpid/cpp/src/qpid/broker/Exchange.h +++ b/qpid/cpp/src/qpid/broker/Exchange.h @@ -192,7 +192,7 @@ public: QPID_BROKER_EXTERN bool inUseAsAlternate(); QPID_BROKER_EXTERN void incOtherUsers(); - QPID_BROKER_EXTERN void decOtherUsers(); + QPID_BROKER_EXTERN void decOtherUsers(bool isControllingLink); QPID_BROKER_EXTERN bool inUse() const; virtual std::string getType() const = 0; diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.cpp b/qpid/cpp/src/qpid/broker/amqp/Session.cpp index fb3c220d02..f6b242c100 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Session.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Session.cpp @@ -185,19 +185,20 @@ class IncomingToQueue : public DecodingIncoming class IncomingToExchange : public DecodingIncoming { public: - IncomingToExchange(Broker& b, Session& p, boost::shared_ptr<qpid::broker::Exchange> e, pn_link_t* l, const std::string& source) - : DecodingIncoming(l, b, p, source, e->getName(), pn_link_name(l)), exchange(e), authorise(p.getAuthorise()) + IncomingToExchange(Broker& b, Session& p, boost::shared_ptr<qpid::broker::Exchange> e, pn_link_t* l, const std::string& source, bool icl) + : DecodingIncoming(l, b, p, source, e->getName(), pn_link_name(l)), exchange(e), authorise(p.getAuthorise()), isControllingLink(icl) { exchange->incOtherUsers(); } ~IncomingToExchange() { - exchange->decOtherUsers(); + exchange->decOtherUsers(isControllingLink); } void handle(qpid::broker::Message& m); private: boost::shared_ptr<qpid::broker::Exchange> exchange; Authorise& authorise; + bool isControllingLink; }; Session::Session(pn_session_t* s, Connection& c, qpid::sys::OutputControl& o) @@ -425,7 +426,7 @@ void Session::setupIncoming(pn_link_t* link, pn_terminus_t* target, const std::s boost::shared_ptr<Incoming> q(new IncomingToQueue(connection.getBroker(), *this, node.queue, link, source, node.created && node.properties.trackControllingLink())); incoming[link] = q; } else if (node.exchange) { - boost::shared_ptr<Incoming> e(new IncomingToExchange(connection.getBroker(), *this, node.exchange, link, source)); + boost::shared_ptr<Incoming> e(new IncomingToExchange(connection.getBroker(), *this, node.exchange, link, source, node.created && node.properties.trackControllingLink())); incoming[link] = e; } else if (node.relay) { boost::shared_ptr<Incoming> in(new IncomingToRelay(link, connection.getBroker(), *this, source, name, pn_link_name(link), node.relay)); @@ -717,6 +718,8 @@ void IncomingToQueue::handle(qpid::broker::Message& message) void IncomingToExchange::handle(qpid::broker::Message& message) { + if (exchange->isDestroyed()) + throw qpid::framing::ResourceDeletedException(QPID_MSG("Exchange " << exchange->getName() << " has been deleted.")); authorise.route(exchange, message); DeliverableMessage deliverable(message, 0); exchange->route(deliverable); |