summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2016-06-22 20:41:55 +0000
committerAlan Conway <aconway@apache.org>2016-06-22 20:41:55 +0000
commit7b8a91375dc0528e2c15686ab71c3647ac3e2f96 (patch)
tree37b11722254d3109e23ebe676a1137840120a107
parente1d9be1b3590c79a0b2a6b6ad4cd3e6a65877401 (diff)
downloadqpid-python-7b8a91375dc0528e2c15686ab71c3647ac3e2f96.tar.gz
QPID-7306: Fix race conditions during Queue destruction.
Stack traces indicate a Queue was being destroyed concurrently while still in use by its ManagedObject. ManagedObject holds a plain pointer to the Manageable object (e.g. Queue) it belongs to. The Manageable calls ManagedObject::resourceDestroy() when it is deleted, but without any locking. Added a locked wrapper class ManageablePtr so destroy is atomic with respect to other calls via ManageablePtr, calls after pointer is reset to 0 in destroy() are skipped. Call resourceDestroy() in Queue::~Queue if it was not called already. This is probably redundant given given the fixes above but can't hurt. Queue::destroyed() was also being called without locking and could be called concurrrently, e.g. if auto-delete happens concurrently with delete via QMF or by a 0-10 client. Moved the destroyed() call into QueueRegistry::destroy(), using QueueRegistry lock to guarantee it is called exactly once. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1749782 13f79535-47bb-0310-9956-ffa450edef68
-rwxr-xr-xqpid/cpp/managementgen/qmfgen/schema.py8
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp1
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp5
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.h4
-rw-r--r--qpid/cpp/src/qpid/broker/QueueRegistry.cpp17
-rw-r--r--qpid/cpp/src/qpid/broker/SelfDestructQueue.cpp1
-rw-r--r--qpid/cpp/src/qpid/management/Manageable.h6
-rw-r--r--qpid/cpp/src/qpid/management/ManagementObject.cpp30
-rw-r--r--qpid/cpp/src/qpid/management/ManagementObject.h20
9 files changed, 76 insertions, 16 deletions
diff --git a/qpid/cpp/managementgen/qmfgen/schema.py b/qpid/cpp/managementgen/qmfgen/schema.py
index 7bf161dc2b..19da8ef5fb 100755
--- a/qpid/cpp/managementgen/qmfgen/schema.py
+++ b/qpid/cpp/managementgen/qmfgen/schema.py
@@ -1371,10 +1371,10 @@ class SchemaClass:
arg.dir.lower () + "_" +\
arg.name, "inBuf") + ";\n")
- stream.write (" bool allow = coreObject->AuthorizeMethod(METHOD_" +\
+ stream.write (" bool allow = manageable.AuthorizeMethod(METHOD_" +\
method.getName().upper() + ", ioArgs, userId);\n")
stream.write (" if (allow)\n")
- stream.write (" status = coreObject->ManagementMethod (METHOD_" +\
+ stream.write (" status = manageable.ManagementMethod (METHOD_" +\
method.getName().upper() + ", ioArgs, text);\n")
stream.write (" else\n")
stream.write (" status = Manageable::STATUS_FORBIDDEN;\n")
@@ -1413,10 +1413,10 @@ class SchemaClass:
False,
arg.default)
- stream.write (" bool allow = coreObject->AuthorizeMethod(METHOD_" +\
+ stream.write (" bool allow = manageable.AuthorizeMethod(METHOD_" +\
method.getName().upper() + ", ioArgs, userId);\n")
stream.write (" if (allow)\n")
- stream.write (" status = coreObject->ManagementMethod (METHOD_" +\
+ stream.write (" status = manageable.ManagementMethod (METHOD_" +\
method.getName().upper() + ", ioArgs, text);\n")
stream.write (" else\n")
stream.write (" status = Manageable::STATUS_FORBIDDEN;\n")
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index 055eba4f13..77674692ab 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -1496,7 +1496,6 @@ void Broker::deleteQueue(const std::string& name, const std::string& userId,
queue->isRedirectSource() ? peerQ : queue,
false);
queues.destroy(name, connectionId, userId);
- queue->destroyed();
} else {
throw framing::NotFoundException(QPID_MSG("Delete failed. No such queue: " << name));
}
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index fea5946247..625c6cceba 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -237,8 +237,10 @@ Queue::Queue(const string& _name, const QueueSettings& _settings,
Queue::~Queue()
{
- if (mgmtObject != 0)
+ if (mgmtObject != 0) {
mgmtObject->debugStats("destroying");
+ mgmtObject->resourceDestroy();
+ }
}
bool Queue::isLocal(const Message& msg)
@@ -1346,7 +1348,6 @@ void Queue::tryAutoDelete(long expectedVersion)
broker->getAcl()->recordDestroyQueue(name);
QPID_LOG_CAT(debug, model, "Auto-delete queue deleted: " << name << " (" << deleted << ")");
- destroyed();
} else {
//queue was accessed since the delayed auto-delete was scheduled, so try again
QPID_LOG_CAT(debug, model, "Auto-delete interrupted for queue: " << name);
diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h
index 150ad1ce12..cd4a511181 100644
--- a/qpid/cpp/src/qpid/broker/Queue.h
+++ b/qpid/cpp/src/qpid/broker/Queue.h
@@ -299,7 +299,6 @@ class Queue : public boost::enable_shared_from_this<Queue>,
*/
QPID_BROKER_EXTERN void create();
- void destroyed();
QPID_BROKER_EXTERN void bound(const std::string& exchange,
const std::string& key,
const qpid::framing::FieldTable& args);
@@ -534,6 +533,9 @@ class Queue : public boost::enable_shared_from_this<Queue>,
static bool reroute(boost::shared_ptr<Exchange> e, const Message& m);
static bool isExpired(const std::string& queueName, const Message&, qpid::sys::AbsTime);
+ private:
+ void destroyed(); // Only called by QueueRegistry::destroy()
+ friend class QueueRegistry;
friend class QueueFactory;
friend class QueueRegistry;
};
diff --git a/qpid/cpp/src/qpid/broker/QueueRegistry.cpp b/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
index 2101d51fc2..c3cd3488e8 100644
--- a/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
+++ b/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
@@ -101,6 +101,13 @@ void QueueRegistry::destroy(
eraseLH(i, q, name, connectionId, userId);
}
}
+ // Destroy management object, store record etc. The Queue will not
+ // actually be deleted till all shared_ptr to it are gone.
+ //
+ // Outside the lock (avoid deadlock) but guaranteed to be called exactly once,
+ // since q will only be set on the first call to destroy above.
+ if (q)
+ q->destroyed();
}
void QueueRegistry::eraseLH(QueueMap::iterator i, Queue::shared_ptr q, const string& name, const string& connectionId, const string& userId)
@@ -129,11 +136,17 @@ bool QueueRegistry::destroyIfUntouched(const string& name, long version,
q = i->second;
if (q->version == version) {
eraseLH(i, q, name, connectionId, userId);
- return true;
}
}
- return false;
}
+ // Destroy management object, store record etc. The Queue will not
+ // actually be deleted till all shared_ptr to it are gone.
+ //
+ // Outside the lock (avoid deadlock) but guaranteed to be called exactly once,
+ // since q will only be set on the first call to destroy above.
+ if (q)
+ q->destroyed();
+ return q;
}
Queue::shared_ptr QueueRegistry::find(const string& name){
diff --git a/qpid/cpp/src/qpid/broker/SelfDestructQueue.cpp b/qpid/cpp/src/qpid/broker/SelfDestructQueue.cpp
index f8b26a62ad..55f644ed86 100644
--- a/qpid/cpp/src/qpid/broker/SelfDestructQueue.cpp
+++ b/qpid/cpp/src/qpid/broker/SelfDestructQueue.cpp
@@ -37,7 +37,6 @@ bool SelfDestructQueue::checkDepth(const QueueDepth& increment, const Message&)
if (broker->getAcl())
broker->getAcl()->recordDestroyQueue(name);
QPID_LOG_CAT(debug, model, "Queue " << name << " deleted itself due to reaching limit: " << current << " (policy is " << settings.maxDepth << ")");
- destroyed();
}
current += increment;
return true;
diff --git a/qpid/cpp/src/qpid/management/Manageable.h b/qpid/cpp/src/qpid/management/Manageable.h
index 70c9a5a188..5b263e195d 100644
--- a/qpid/cpp/src/qpid/management/Manageable.h
+++ b/qpid/cpp/src/qpid/management/Manageable.h
@@ -20,10 +20,12 @@
// under the License.
//
-#include "qpid/management/ManagementObject.h"
+#include "qpid/CommonImportExport.h"
#include "qpid/management/Args.h"
+#include "qpid/management/ManagementObject.h"
+#include "qpid/sys/IntegerTypes.h"
+
#include <string>
-#include "qpid/CommonImportExport.h"
namespace qpid {
namespace management {
diff --git a/qpid/cpp/src/qpid/management/ManagementObject.cpp b/qpid/cpp/src/qpid/management/ManagementObject.cpp
index 019963e832..f18f575ff8 100644
--- a/qpid/cpp/src/qpid/management/ManagementObject.cpp
+++ b/qpid/cpp/src/qpid/management/ManagementObject.cpp
@@ -23,6 +23,7 @@
#include "qpid/management/ManagementObject.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/framing/Buffer.h"
+#include "qpid/framing/reply_exceptions.h"
#include "qpid/sys/Time.h"
#include "qpid/sys/Thread.h"
#include "qpid/log/Statement.h"
@@ -245,11 +246,37 @@ ostream& operator<<(ostream& out, const ObjectId& i)
}}
+// Called with lock held
+Manageable* ManagementObject::ManageablePtr::get() const {
+ if (ptr == 0)
+ throw framing::ResourceDeletedException("managed object deleted");
+ return ptr;
+}
+
+void ManagementObject::ManageablePtr::reset() {
+ Mutex::ScopedLock l(lock);
+ ptr = 0;
+}
+
+uint32_t ManagementObject::ManageablePtr::ManagementMethod(
+ uint32_t methodId, Args& args, std::string& text)
+{
+ Mutex::ScopedLock l(lock);
+ return get()->ManagementMethod(methodId, args, text);
+}
+
+bool ManagementObject::ManageablePtr:: AuthorizeMethod(
+ uint32_t methodId, Args& args, const std::string& userId)
+{
+ Mutex::ScopedLock l(lock);
+ return get()->AuthorizeMethod(methodId, args, userId);
+}
+
ManagementObject::ManagementObject(Manageable* _core) :
createTime(qpid::sys::Duration::FromEpoch()),
destroyTime(0), updateTime(createTime), configChanged(true),
instChanged(true), deleted(false),
- coreObject(_core), flags(0), forcePublish(false) {}
+ manageable(_core), flags(0), forcePublish(false) {}
void ManagementObject::setUpdateTime()
{
@@ -261,6 +288,7 @@ void ManagementObject::resourceDestroy()
QPID_LOG(trace, "Management object marked deleted: " << getObjectId().getV2Key());
destroyTime = sys::Duration::FromEpoch();
deleted = true;
+ manageable.reset();
}
int ManagementObject::maxThreads = 1;
diff --git a/qpid/cpp/src/qpid/management/ManagementObject.h b/qpid/cpp/src/qpid/management/ManagementObject.h
index 5719c2354d..a299f1ef4a 100644
--- a/qpid/cpp/src/qpid/management/ManagementObject.h
+++ b/qpid/cpp/src/qpid/management/ManagementObject.h
@@ -23,6 +23,7 @@
*/
#include "qpid/CommonImportExport.h"
+#include "qpid/management/Args.h"
#include "qpid/management/Mutex.h"
#include "qpid/types/Variant.h"
#include <map>
@@ -33,9 +34,9 @@
namespace qpid {
namespace management {
-class Manageable;
class ObjectId;
class ManagementObject;
+class Manageable;
class AgentAttachment {
@@ -135,6 +136,21 @@ public:
class QPID_COMMON_CLASS_EXTERN ManagementObject : public ManagementItem
{
protected:
+ // Thread safe wrapper for Manageable* with atomic calls and destroy().
+ class ManageablePtr {
+ Manageable* ptr;
+ mutable Mutex lock;
+ Manageable* get() const;
+ ManageablePtr(const ManageablePtr&); // not copyable
+ ManageablePtr& operator=(const ManageablePtr&); // not copyable
+
+ public:
+ ManageablePtr(Manageable* m) : ptr(m) {}
+
+ uint32_t ManagementMethod(uint32_t methodId, Args& args, std::string& text);
+ bool AuthorizeMethod(uint32_t methodId, Args& args, const std::string& userId);
+ void reset();
+ };
uint64_t createTime;
uint64_t destroyTime;
@@ -143,7 +159,7 @@ protected:
mutable bool configChanged;
mutable bool instChanged;
bool deleted;
- Manageable* coreObject;
+ ManageablePtr manageable;
mutable Mutex accessLock;
uint32_t flags;