From 7b8a91375dc0528e2c15686ab71c3647ac3e2f96 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Wed, 22 Jun 2016 20:41:55 +0000 Subject: QPID-7306: Fix race conditions during Queue destruction. Stack traces indicate a Queue was being destroyed concurrently while still in use by its ManagedObject. ManagedObject holds a plain pointer to the Manageable object (e.g. Queue) it belongs to. The Manageable calls ManagedObject::resourceDestroy() when it is deleted, but without any locking. Added a locked wrapper class ManageablePtr so destroy is atomic with respect to other calls via ManageablePtr, calls after pointer is reset to 0 in destroy() are skipped. Call resourceDestroy() in Queue::~Queue if it was not called already. This is probably redundant given given the fixes above but can't hurt. Queue::destroyed() was also being called without locking and could be called concurrrently, e.g. if auto-delete happens concurrently with delete via QMF or by a 0-10 client. Moved the destroyed() call into QueueRegistry::destroy(), using QueueRegistry lock to guarantee it is called exactly once. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1749782 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/managementgen/qmfgen/schema.py | 8 +++--- qpid/cpp/src/qpid/broker/Broker.cpp | 1 - qpid/cpp/src/qpid/broker/Queue.cpp | 5 ++-- qpid/cpp/src/qpid/broker/Queue.h | 4 ++- qpid/cpp/src/qpid/broker/QueueRegistry.cpp | 17 +++++++++++-- qpid/cpp/src/qpid/broker/SelfDestructQueue.cpp | 1 - qpid/cpp/src/qpid/management/Manageable.h | 6 +++-- qpid/cpp/src/qpid/management/ManagementObject.cpp | 30 ++++++++++++++++++++++- qpid/cpp/src/qpid/management/ManagementObject.h | 20 +++++++++++++-- 9 files changed, 76 insertions(+), 16 deletions(-) diff --git a/qpid/cpp/managementgen/qmfgen/schema.py b/qpid/cpp/managementgen/qmfgen/schema.py index 7bf161dc2b..19da8ef5fb 100755 --- a/qpid/cpp/managementgen/qmfgen/schema.py +++ b/qpid/cpp/managementgen/qmfgen/schema.py @@ -1371,10 +1371,10 @@ class SchemaClass: arg.dir.lower () + "_" +\ arg.name, "inBuf") + ";\n") - stream.write (" bool allow = coreObject->AuthorizeMethod(METHOD_" +\ + stream.write (" bool allow = manageable.AuthorizeMethod(METHOD_" +\ method.getName().upper() + ", ioArgs, userId);\n") stream.write (" if (allow)\n") - stream.write (" status = coreObject->ManagementMethod (METHOD_" +\ + stream.write (" status = manageable.ManagementMethod (METHOD_" +\ method.getName().upper() + ", ioArgs, text);\n") stream.write (" else\n") stream.write (" status = Manageable::STATUS_FORBIDDEN;\n") @@ -1413,10 +1413,10 @@ class SchemaClass: False, arg.default) - stream.write (" bool allow = coreObject->AuthorizeMethod(METHOD_" +\ + stream.write (" bool allow = manageable.AuthorizeMethod(METHOD_" +\ method.getName().upper() + ", ioArgs, userId);\n") stream.write (" if (allow)\n") - stream.write (" status = coreObject->ManagementMethod (METHOD_" +\ + stream.write (" status = manageable.ManagementMethod (METHOD_" +\ method.getName().upper() + ", ioArgs, text);\n") stream.write (" else\n") stream.write (" status = Manageable::STATUS_FORBIDDEN;\n") diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index 055eba4f13..77674692ab 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -1496,7 +1496,6 @@ void Broker::deleteQueue(const std::string& name, const std::string& userId, queue->isRedirectSource() ? peerQ : queue, false); queues.destroy(name, connectionId, userId); - queue->destroyed(); } else { throw framing::NotFoundException(QPID_MSG("Delete failed. No such queue: " << name)); } diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index fea5946247..625c6cceba 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -237,8 +237,10 @@ Queue::Queue(const string& _name, const QueueSettings& _settings, Queue::~Queue() { - if (mgmtObject != 0) + if (mgmtObject != 0) { mgmtObject->debugStats("destroying"); + mgmtObject->resourceDestroy(); + } } bool Queue::isLocal(const Message& msg) @@ -1346,7 +1348,6 @@ void Queue::tryAutoDelete(long expectedVersion) broker->getAcl()->recordDestroyQueue(name); QPID_LOG_CAT(debug, model, "Auto-delete queue deleted: " << name << " (" << deleted << ")"); - destroyed(); } else { //queue was accessed since the delayed auto-delete was scheduled, so try again QPID_LOG_CAT(debug, model, "Auto-delete interrupted for queue: " << name); diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index 150ad1ce12..cd4a511181 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -299,7 +299,6 @@ class Queue : public boost::enable_shared_from_this, */ QPID_BROKER_EXTERN void create(); - void destroyed(); QPID_BROKER_EXTERN void bound(const std::string& exchange, const std::string& key, const qpid::framing::FieldTable& args); @@ -534,6 +533,9 @@ class Queue : public boost::enable_shared_from_this, static bool reroute(boost::shared_ptr e, const Message& m); static bool isExpired(const std::string& queueName, const Message&, qpid::sys::AbsTime); + private: + void destroyed(); // Only called by QueueRegistry::destroy() + friend class QueueRegistry; friend class QueueFactory; friend class QueueRegistry; }; diff --git a/qpid/cpp/src/qpid/broker/QueueRegistry.cpp b/qpid/cpp/src/qpid/broker/QueueRegistry.cpp index 2101d51fc2..c3cd3488e8 100644 --- a/qpid/cpp/src/qpid/broker/QueueRegistry.cpp +++ b/qpid/cpp/src/qpid/broker/QueueRegistry.cpp @@ -101,6 +101,13 @@ void QueueRegistry::destroy( eraseLH(i, q, name, connectionId, userId); } } + // Destroy management object, store record etc. The Queue will not + // actually be deleted till all shared_ptr to it are gone. + // + // Outside the lock (avoid deadlock) but guaranteed to be called exactly once, + // since q will only be set on the first call to destroy above. + if (q) + q->destroyed(); } void QueueRegistry::eraseLH(QueueMap::iterator i, Queue::shared_ptr q, const string& name, const string& connectionId, const string& userId) @@ -129,11 +136,17 @@ bool QueueRegistry::destroyIfUntouched(const string& name, long version, q = i->second; if (q->version == version) { eraseLH(i, q, name, connectionId, userId); - return true; } } - return false; } + // Destroy management object, store record etc. The Queue will not + // actually be deleted till all shared_ptr to it are gone. + // + // Outside the lock (avoid deadlock) but guaranteed to be called exactly once, + // since q will only be set on the first call to destroy above. + if (q) + q->destroyed(); + return q; } Queue::shared_ptr QueueRegistry::find(const string& name){ diff --git a/qpid/cpp/src/qpid/broker/SelfDestructQueue.cpp b/qpid/cpp/src/qpid/broker/SelfDestructQueue.cpp index f8b26a62ad..55f644ed86 100644 --- a/qpid/cpp/src/qpid/broker/SelfDestructQueue.cpp +++ b/qpid/cpp/src/qpid/broker/SelfDestructQueue.cpp @@ -37,7 +37,6 @@ bool SelfDestructQueue::checkDepth(const QueueDepth& increment, const Message&) if (broker->getAcl()) broker->getAcl()->recordDestroyQueue(name); QPID_LOG_CAT(debug, model, "Queue " << name << " deleted itself due to reaching limit: " << current << " (policy is " << settings.maxDepth << ")"); - destroyed(); } current += increment; return true; diff --git a/qpid/cpp/src/qpid/management/Manageable.h b/qpid/cpp/src/qpid/management/Manageable.h index 70c9a5a188..5b263e195d 100644 --- a/qpid/cpp/src/qpid/management/Manageable.h +++ b/qpid/cpp/src/qpid/management/Manageable.h @@ -20,10 +20,12 @@ // under the License. // -#include "qpid/management/ManagementObject.h" +#include "qpid/CommonImportExport.h" #include "qpid/management/Args.h" +#include "qpid/management/ManagementObject.h" +#include "qpid/sys/IntegerTypes.h" + #include -#include "qpid/CommonImportExport.h" namespace qpid { namespace management { diff --git a/qpid/cpp/src/qpid/management/ManagementObject.cpp b/qpid/cpp/src/qpid/management/ManagementObject.cpp index 019963e832..f18f575ff8 100644 --- a/qpid/cpp/src/qpid/management/ManagementObject.cpp +++ b/qpid/cpp/src/qpid/management/ManagementObject.cpp @@ -23,6 +23,7 @@ #include "qpid/management/ManagementObject.h" #include "qpid/framing/FieldTable.h" #include "qpid/framing/Buffer.h" +#include "qpid/framing/reply_exceptions.h" #include "qpid/sys/Time.h" #include "qpid/sys/Thread.h" #include "qpid/log/Statement.h" @@ -245,11 +246,37 @@ ostream& operator<<(ostream& out, const ObjectId& i) }} +// Called with lock held +Manageable* ManagementObject::ManageablePtr::get() const { + if (ptr == 0) + throw framing::ResourceDeletedException("managed object deleted"); + return ptr; +} + +void ManagementObject::ManageablePtr::reset() { + Mutex::ScopedLock l(lock); + ptr = 0; +} + +uint32_t ManagementObject::ManageablePtr::ManagementMethod( + uint32_t methodId, Args& args, std::string& text) +{ + Mutex::ScopedLock l(lock); + return get()->ManagementMethod(methodId, args, text); +} + +bool ManagementObject::ManageablePtr:: AuthorizeMethod( + uint32_t methodId, Args& args, const std::string& userId) +{ + Mutex::ScopedLock l(lock); + return get()->AuthorizeMethod(methodId, args, userId); +} + ManagementObject::ManagementObject(Manageable* _core) : createTime(qpid::sys::Duration::FromEpoch()), destroyTime(0), updateTime(createTime), configChanged(true), instChanged(true), deleted(false), - coreObject(_core), flags(0), forcePublish(false) {} + manageable(_core), flags(0), forcePublish(false) {} void ManagementObject::setUpdateTime() { @@ -261,6 +288,7 @@ void ManagementObject::resourceDestroy() QPID_LOG(trace, "Management object marked deleted: " << getObjectId().getV2Key()); destroyTime = sys::Duration::FromEpoch(); deleted = true; + manageable.reset(); } int ManagementObject::maxThreads = 1; diff --git a/qpid/cpp/src/qpid/management/ManagementObject.h b/qpid/cpp/src/qpid/management/ManagementObject.h index 5719c2354d..a299f1ef4a 100644 --- a/qpid/cpp/src/qpid/management/ManagementObject.h +++ b/qpid/cpp/src/qpid/management/ManagementObject.h @@ -23,6 +23,7 @@ */ #include "qpid/CommonImportExport.h" +#include "qpid/management/Args.h" #include "qpid/management/Mutex.h" #include "qpid/types/Variant.h" #include @@ -33,9 +34,9 @@ namespace qpid { namespace management { -class Manageable; class ObjectId; class ManagementObject; +class Manageable; class AgentAttachment { @@ -135,6 +136,21 @@ public: class QPID_COMMON_CLASS_EXTERN ManagementObject : public ManagementItem { protected: + // Thread safe wrapper for Manageable* with atomic calls and destroy(). + class ManageablePtr { + Manageable* ptr; + mutable Mutex lock; + Manageable* get() const; + ManageablePtr(const ManageablePtr&); // not copyable + ManageablePtr& operator=(const ManageablePtr&); // not copyable + + public: + ManageablePtr(Manageable* m) : ptr(m) {} + + uint32_t ManagementMethod(uint32_t methodId, Args& args, std::string& text); + bool AuthorizeMethod(uint32_t methodId, Args& args, const std::string& userId); + void reset(); + }; uint64_t createTime; uint64_t destroyTime; @@ -143,7 +159,7 @@ protected: mutable bool configChanged; mutable bool instChanged; bool deleted; - Manageable* coreObject; + ManageablePtr manageable; mutable Mutex accessLock; uint32_t flags; -- cgit v1.2.1