summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2014-12-17 14:29:44 +0000
committerGordon Sim <gsim@apache.org>2014-12-17 14:29:44 +0000
commitfaec291315dc851eefa59b63f3c7107888b75f98 (patch)
tree4715e5a7718ef91759230c49e4d218707a9f2397
parent9f634ba1b0f2ee6ea2f2e0c3a7a3d892d3eeda7c (diff)
downloadqpid-python-faec291315dc851eefa59b63f3c7107888b75f98.tar.gz
QPID-6274: Delete subscription queue immediately on link close
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1646260 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp8
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.h4
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Incoming.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Incoming.h2
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp10
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Outgoing.h4
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Relay.cpp4
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Relay.h4
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Session.cpp12
9 files changed, 26 insertions, 24 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index 658fc26919..b1f7d0524b 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -1300,10 +1300,10 @@ struct AutoDeleteTask : qpid::sys::TimerTask
}
};
-void Queue::scheduleAutoDelete()
+void Queue::scheduleAutoDelete(bool immediate)
{
if (canAutoDelete()) {
- if (settings.autoDeleteDelay) {
+ if (!immediate && settings.autoDeleteDelay) {
AbsTime time(now(), Duration(settings.autoDeleteDelay * TIME_SEC));
autoDeleteTask = boost::intrusive_ptr<qpid::sys::TimerTask>(new AutoDeleteTask(shared_from_this(), time));
broker->getTimer().add(autoDeleteTask);
@@ -1343,7 +1343,7 @@ bool Queue::isExclusiveOwner(const OwnershipToken* const o) const
return o == owner;
}
-void Queue::releaseExclusiveOwnership()
+void Queue::releaseExclusiveOwnership(bool immediateExpiry)
{
bool unused;
{
@@ -1355,7 +1355,7 @@ void Queue::releaseExclusiveOwnership()
unused = !users.isUsed();
}
if (unused && settings.autodelete) {
- scheduleAutoDelete();
+ scheduleAutoDelete(immediateExpiry);
}
}
diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h
index 65a91b8729..efca9b9d40 100644
--- a/qpid/cpp/src/qpid/broker/Queue.h
+++ b/qpid/cpp/src/qpid/broker/Queue.h
@@ -379,7 +379,7 @@ class Queue : public boost::enable_shared_from_this<Queue>,
QPID_BROKER_EXTERN uint32_t getConsumerCount() const;
inline const std::string& getName() const { return name; }
QPID_BROKER_EXTERN bool isExclusiveOwner(const OwnershipToken* const o) const;
- QPID_BROKER_EXTERN void releaseExclusiveOwnership();
+ QPID_BROKER_EXTERN void releaseExclusiveOwnership(bool immediateExpiry=false);
QPID_BROKER_EXTERN bool setExclusiveOwner(const OwnershipToken* const o);
QPID_BROKER_EXTERN bool hasExclusiveConsumer() const;
QPID_BROKER_EXTERN bool hasExclusiveOwner() const;
@@ -389,7 +389,7 @@ class Queue : public boost::enable_shared_from_this<Queue>,
inline bool isAutoDelete() const { return settings.autodelete; }
inline bool isBrowseOnly() const { return settings.isBrowseOnly; }
QPID_BROKER_EXTERN bool canAutoDelete() const;
- QPID_BROKER_EXTERN void scheduleAutoDelete();
+ QPID_BROKER_EXTERN void scheduleAutoDelete(bool immediate=false);
QPID_BROKER_EXTERN bool isDeleted() const;
const QueueBindings& getBindings() const { return bindings; }
diff --git a/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp b/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp
index ce4c73dead..d4f73fc511 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp
@@ -57,7 +57,7 @@ uint32_t Incoming::getCredit()
return credit;//TODO: proper flow control
}
-void Incoming::detached()
+void Incoming::detached(bool /*closed*/)
{
}
diff --git a/qpid/cpp/src/qpid/broker/amqp/Incoming.h b/qpid/cpp/src/qpid/broker/amqp/Incoming.h
index 1a7064337d..ccf999a256 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Incoming.h
+++ b/qpid/cpp/src/qpid/broker/amqp/Incoming.h
@@ -42,7 +42,7 @@ class Incoming : public ManagedIncomingLink
virtual ~Incoming();
virtual bool doWork();//do anything that requires output
virtual bool haveWork();//called when handling input to see whether any output work is needed
- virtual void detached();
+ virtual void detached(bool closed);
virtual void readable(pn_delivery_t* delivery) = 0;
void verify(const std::string& userid, const std::string& defaultRealm);
void wakeup();
diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
index 54993d071e..d0b41c6c90 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
@@ -156,7 +156,7 @@ bool OutgoingFromQueue::canDeliver()
return deliveries[current].delivery == 0 && pn_link_credit(link);
}
-void OutgoingFromQueue::detached()
+void OutgoingFromQueue::detached(bool closed)
{
QPID_LOG(debug, "Detaching outgoing link " << getName() << " from " << queue->getName());
queue->cancel(shared_from_this());
@@ -164,12 +164,14 @@ void OutgoingFromQueue::detached()
for (size_t i = 0 ; i < deliveries.capacity(); ++i) {
if (deliveries[i].msg) queue->release(deliveries[i].cursor, true);
}
- if (exclusive) queue->releaseExclusiveOwnership();
- else if (isControllingUser) queue->releaseFromUse(true);
+ if (exclusive) {
+ queue->releaseExclusiveOwnership(closed);
+ } else if (isControllingUser) {
+ queue->releaseFromUse(true);
+ }
cancelled = true;
}
-
OutgoingFromQueue::~OutgoingFromQueue()
{
if (!cancelled && isControllingUser) queue->releaseFromUse(true);
diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.h b/qpid/cpp/src/qpid/broker/amqp/Outgoing.h
index 27d8205fc8..d3825d0894 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.h
+++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.h
@@ -70,7 +70,7 @@ class Outgoing : public ManagedOutgoingLink
/**
* Signals that this link has been detached
*/
- virtual void detached() = 0;
+ virtual void detached(bool closed) = 0;
/**
* Called when a delivery is writable
*/
@@ -98,7 +98,7 @@ class OutgoingFromQueue : public Outgoing, public qpid::broker::Consumer, public
void write(const char* data, size_t size);
void handle(pn_delivery_t* delivery);
bool canDeliver();
- void detached();
+ void detached(bool closed);
//Consumer interface:
bool deliver(const QueueCursor& cursor, const qpid::broker::Message& msg);
diff --git a/qpid/cpp/src/qpid/broker/amqp/Relay.cpp b/qpid/cpp/src/qpid/broker/amqp/Relay.cpp
index 83b3e64ee6..495fe800cb 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Relay.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Relay.cpp
@@ -163,7 +163,7 @@ void OutgoingFromRelay::handle(pn_delivery_t* delivery)
/**
* Signals that this link has been detached
*/
-void OutgoingFromRelay::detached()
+void OutgoingFromRelay::detached(bool /*closed*/)
{
relay->detached(this);
}
@@ -221,7 +221,7 @@ uint32_t IncomingToRelay::getCredit()
return relay->getCredit();
}
-void IncomingToRelay::detached()
+void IncomingToRelay::detached(bool /*closed*/)
{
relay->detached(this);
}
diff --git a/qpid/cpp/src/qpid/broker/amqp/Relay.h b/qpid/cpp/src/qpid/broker/amqp/Relay.h
index ef700690fd..32f317bfe1 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Relay.h
+++ b/qpid/cpp/src/qpid/broker/amqp/Relay.h
@@ -100,7 +100,7 @@ class OutgoingFromRelay : public Outgoing
const std::string& target, const std::string& name, boost::shared_ptr<Relay>);
bool doWork();
void handle(pn_delivery_t* delivery);
- void detached();
+ void detached(bool closed);
void init();
void setSubjectFilter(const std::string&);
void setSelectorFilter(const std::string&);
@@ -118,7 +118,7 @@ class IncomingToRelay : public Incoming
bool settle();
bool doWork();
bool haveWork();
- void detached();
+ void detached(bool closed);
void readable(pn_delivery_t* delivery);
uint32_t getCredit();
private:
diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.cpp b/qpid/cpp/src/qpid/broker/amqp/Session.cpp
index 32a923cac5..2e7d30118a 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Session.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Session.cpp
@@ -577,7 +577,7 @@ void Session::detach(pn_link_t* link)
if (pn_link_is_sender(link)) {
OutgoingLinks::iterator i = outgoing.find(link);
if (i != outgoing.end()) {
- i->second->detached();
+ i->second->detached(true/*TODO: checked whether actually closed; see PROTON-773*/);
boost::shared_ptr<Queue> q = OutgoingFromQueue::getExclusiveSubscriptionQueue(i->second.get());
if (q && !q->isAutoDelete() && !q->isDeleted()) {
connection.getBroker().deleteQueue(q->getName(), connection.getUserId(), connection.getMgmtId());
@@ -588,7 +588,7 @@ void Session::detach(pn_link_t* link)
} else {
IncomingLinks::iterator i = incoming.find(link);
if (i != incoming.end()) {
- i->second->detached();
+ i->second->detached(true/*TODO: checked whether actually closed; see PROTON-773*/);
incoming.erase(i);
QPID_LOG(debug, "Incoming link detached");
}
@@ -653,7 +653,7 @@ bool Session::dispatch()
pn_condition_set_name(error, e.symbol());
pn_condition_set_description(error, e.what());
pn_link_close(s->first);
- s->second->detached();
+ s->second->detached(true);
outgoing.erase(s++);
output = true;
}
@@ -678,7 +678,7 @@ bool Session::dispatch()
pn_condition_set_name(error, e.symbol());
pn_condition_set_description(error, e.what());
pn_link_close(i->first);
- i->second->detached();
+ i->second->detached(true);
incoming.erase(i++);
output = true;
}
@@ -690,10 +690,10 @@ bool Session::dispatch()
void Session::close()
{
for (OutgoingLinks::iterator i = outgoing.begin(); i != outgoing.end(); ++i) {
- i->second->detached();
+ i->second->detached(false);
}
for (IncomingLinks::iterator i = incoming.begin(); i != incoming.end(); ++i) {
- i->second->detached();
+ i->second->detached(false);
}
outgoing.clear();
incoming.clear();