diff options
author | Gordon Sim <gsim@apache.org> | 2014-12-17 14:29:44 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2014-12-17 14:29:44 +0000 |
commit | faec291315dc851eefa59b63f3c7107888b75f98 (patch) | |
tree | 4715e5a7718ef91759230c49e4d218707a9f2397 | |
parent | 9f634ba1b0f2ee6ea2f2e0c3a7a3d892d3eeda7c (diff) | |
download | qpid-python-faec291315dc851eefa59b63f3c7107888b75f98.tar.gz |
QPID-6274: Delete subscription queue immediately on link close
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1646260 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 8 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.h | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Incoming.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Incoming.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp | 10 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Outgoing.h | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Relay.cpp | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Relay.h | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Session.cpp | 12 |
9 files changed, 26 insertions, 24 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 658fc26919..b1f7d0524b 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -1300,10 +1300,10 @@ struct AutoDeleteTask : qpid::sys::TimerTask } }; -void Queue::scheduleAutoDelete() +void Queue::scheduleAutoDelete(bool immediate) { if (canAutoDelete()) { - if (settings.autoDeleteDelay) { + if (!immediate && settings.autoDeleteDelay) { AbsTime time(now(), Duration(settings.autoDeleteDelay * TIME_SEC)); autoDeleteTask = boost::intrusive_ptr<qpid::sys::TimerTask>(new AutoDeleteTask(shared_from_this(), time)); broker->getTimer().add(autoDeleteTask); @@ -1343,7 +1343,7 @@ bool Queue::isExclusiveOwner(const OwnershipToken* const o) const return o == owner; } -void Queue::releaseExclusiveOwnership() +void Queue::releaseExclusiveOwnership(bool immediateExpiry) { bool unused; { @@ -1355,7 +1355,7 @@ void Queue::releaseExclusiveOwnership() unused = !users.isUsed(); } if (unused && settings.autodelete) { - scheduleAutoDelete(); + scheduleAutoDelete(immediateExpiry); } } diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index 65a91b8729..efca9b9d40 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -379,7 +379,7 @@ class Queue : public boost::enable_shared_from_this<Queue>, QPID_BROKER_EXTERN uint32_t getConsumerCount() const; inline const std::string& getName() const { return name; } QPID_BROKER_EXTERN bool isExclusiveOwner(const OwnershipToken* const o) const; - QPID_BROKER_EXTERN void releaseExclusiveOwnership(); + QPID_BROKER_EXTERN void releaseExclusiveOwnership(bool immediateExpiry=false); QPID_BROKER_EXTERN bool setExclusiveOwner(const OwnershipToken* const o); QPID_BROKER_EXTERN bool hasExclusiveConsumer() const; QPID_BROKER_EXTERN bool hasExclusiveOwner() const; @@ -389,7 +389,7 @@ class Queue : public boost::enable_shared_from_this<Queue>, inline bool isAutoDelete() const { return settings.autodelete; } inline bool isBrowseOnly() const { return settings.isBrowseOnly; } QPID_BROKER_EXTERN bool canAutoDelete() const; - QPID_BROKER_EXTERN void scheduleAutoDelete(); + QPID_BROKER_EXTERN void scheduleAutoDelete(bool immediate=false); QPID_BROKER_EXTERN bool isDeleted() const; const QueueBindings& getBindings() const { return bindings; } diff --git a/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp b/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp index ce4c73dead..d4f73fc511 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp @@ -57,7 +57,7 @@ uint32_t Incoming::getCredit() return credit;//TODO: proper flow control } -void Incoming::detached() +void Incoming::detached(bool /*closed*/) { } diff --git a/qpid/cpp/src/qpid/broker/amqp/Incoming.h b/qpid/cpp/src/qpid/broker/amqp/Incoming.h index 1a7064337d..ccf999a256 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Incoming.h +++ b/qpid/cpp/src/qpid/broker/amqp/Incoming.h @@ -42,7 +42,7 @@ class Incoming : public ManagedIncomingLink virtual ~Incoming(); virtual bool doWork();//do anything that requires output virtual bool haveWork();//called when handling input to see whether any output work is needed - virtual void detached(); + virtual void detached(bool closed); virtual void readable(pn_delivery_t* delivery) = 0; void verify(const std::string& userid, const std::string& defaultRealm); void wakeup(); diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp index 54993d071e..d0b41c6c90 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp @@ -156,7 +156,7 @@ bool OutgoingFromQueue::canDeliver() return deliveries[current].delivery == 0 && pn_link_credit(link); } -void OutgoingFromQueue::detached() +void OutgoingFromQueue::detached(bool closed) { QPID_LOG(debug, "Detaching outgoing link " << getName() << " from " << queue->getName()); queue->cancel(shared_from_this()); @@ -164,12 +164,14 @@ void OutgoingFromQueue::detached() for (size_t i = 0 ; i < deliveries.capacity(); ++i) { if (deliveries[i].msg) queue->release(deliveries[i].cursor, true); } - if (exclusive) queue->releaseExclusiveOwnership(); - else if (isControllingUser) queue->releaseFromUse(true); + if (exclusive) { + queue->releaseExclusiveOwnership(closed); + } else if (isControllingUser) { + queue->releaseFromUse(true); + } cancelled = true; } - OutgoingFromQueue::~OutgoingFromQueue() { if (!cancelled && isControllingUser) queue->releaseFromUse(true); diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.h b/qpid/cpp/src/qpid/broker/amqp/Outgoing.h index 27d8205fc8..d3825d0894 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.h +++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.h @@ -70,7 +70,7 @@ class Outgoing : public ManagedOutgoingLink /** * Signals that this link has been detached */ - virtual void detached() = 0; + virtual void detached(bool closed) = 0; /** * Called when a delivery is writable */ @@ -98,7 +98,7 @@ class OutgoingFromQueue : public Outgoing, public qpid::broker::Consumer, public void write(const char* data, size_t size); void handle(pn_delivery_t* delivery); bool canDeliver(); - void detached(); + void detached(bool closed); //Consumer interface: bool deliver(const QueueCursor& cursor, const qpid::broker::Message& msg); diff --git a/qpid/cpp/src/qpid/broker/amqp/Relay.cpp b/qpid/cpp/src/qpid/broker/amqp/Relay.cpp index 83b3e64ee6..495fe800cb 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Relay.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Relay.cpp @@ -163,7 +163,7 @@ void OutgoingFromRelay::handle(pn_delivery_t* delivery) /** * Signals that this link has been detached */ -void OutgoingFromRelay::detached() +void OutgoingFromRelay::detached(bool /*closed*/) { relay->detached(this); } @@ -221,7 +221,7 @@ uint32_t IncomingToRelay::getCredit() return relay->getCredit(); } -void IncomingToRelay::detached() +void IncomingToRelay::detached(bool /*closed*/) { relay->detached(this); } diff --git a/qpid/cpp/src/qpid/broker/amqp/Relay.h b/qpid/cpp/src/qpid/broker/amqp/Relay.h index ef700690fd..32f317bfe1 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Relay.h +++ b/qpid/cpp/src/qpid/broker/amqp/Relay.h @@ -100,7 +100,7 @@ class OutgoingFromRelay : public Outgoing const std::string& target, const std::string& name, boost::shared_ptr<Relay>); bool doWork(); void handle(pn_delivery_t* delivery); - void detached(); + void detached(bool closed); void init(); void setSubjectFilter(const std::string&); void setSelectorFilter(const std::string&); @@ -118,7 +118,7 @@ class IncomingToRelay : public Incoming bool settle(); bool doWork(); bool haveWork(); - void detached(); + void detached(bool closed); void readable(pn_delivery_t* delivery); uint32_t getCredit(); private: diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.cpp b/qpid/cpp/src/qpid/broker/amqp/Session.cpp index 32a923cac5..2e7d30118a 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Session.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Session.cpp @@ -577,7 +577,7 @@ void Session::detach(pn_link_t* link) if (pn_link_is_sender(link)) { OutgoingLinks::iterator i = outgoing.find(link); if (i != outgoing.end()) { - i->second->detached(); + i->second->detached(true/*TODO: checked whether actually closed; see PROTON-773*/); boost::shared_ptr<Queue> q = OutgoingFromQueue::getExclusiveSubscriptionQueue(i->second.get()); if (q && !q->isAutoDelete() && !q->isDeleted()) { connection.getBroker().deleteQueue(q->getName(), connection.getUserId(), connection.getMgmtId()); @@ -588,7 +588,7 @@ void Session::detach(pn_link_t* link) } else { IncomingLinks::iterator i = incoming.find(link); if (i != incoming.end()) { - i->second->detached(); + i->second->detached(true/*TODO: checked whether actually closed; see PROTON-773*/); incoming.erase(i); QPID_LOG(debug, "Incoming link detached"); } @@ -653,7 +653,7 @@ bool Session::dispatch() pn_condition_set_name(error, e.symbol()); pn_condition_set_description(error, e.what()); pn_link_close(s->first); - s->second->detached(); + s->second->detached(true); outgoing.erase(s++); output = true; } @@ -678,7 +678,7 @@ bool Session::dispatch() pn_condition_set_name(error, e.symbol()); pn_condition_set_description(error, e.what()); pn_link_close(i->first); - i->second->detached(); + i->second->detached(true); incoming.erase(i++); output = true; } @@ -690,10 +690,10 @@ bool Session::dispatch() void Session::close() { for (OutgoingLinks::iterator i = outgoing.begin(); i != outgoing.end(); ++i) { - i->second->detached(); + i->second->detached(false); } for (IncomingLinks::iterator i = incoming.begin(); i != incoming.end(); ++i) { - i->second->detached(); + i->second->detached(false); } outgoing.clear(); incoming.clear(); |