diff options
author | Ted Ross <tross@apache.org> | 2010-02-12 21:23:27 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2010-02-12 21:23:27 +0000 |
commit | 0b4bb5acdba9afef93a99864b39e1de438b5dc42 (patch) | |
tree | 5582b724a780595e6a33ac959a313c3e0e65048e /cpp/src | |
parent | 1827a69f01ea0a955161fd93edfa137d7b1723a4 (diff) | |
download | qpid-python-0b4bb5acdba9afef93a99864b39e1de438b5dc42.tar.gz |
Changes needed for QPID-2029 (Clustering and Management don't work well together)
This update changes the indexing of object IDs in the broker-resident management agent
from being based on the QMFv1 format (numeric) to the QMFv2 format (string name). This removes
the need for numeric objectIds to be synchronized across a set of clustered brokers.
Also included in this patch is a fix to a bug in binding creation. Previously, when a binding
was created that already existed, the management object for the proposed binding (duplicate of
the existing one) was created then destroyed. This is inefficient and causes problems when the
name-based indexes collide.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@909610 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/broker/DirectExchange.cpp | 1 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Exchange.cpp | 30 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Exchange.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/broker/FanOutExchange.cpp | 1 | ||||
-rw-r--r-- | cpp/src/qpid/broker/HeadersExchange.cpp | 1 | ||||
-rw-r--r-- | cpp/src/qpid/broker/TopicExchange.cpp | 1 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementAgent.cpp | 79 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementAgent.h | 11 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementObject.cpp | 20 | ||||
-rw-r--r-- | cpp/src/qpid/xml/XmlExchange.h | 2 |
10 files changed, 124 insertions, 25 deletions
diff --git a/cpp/src/qpid/broker/DirectExchange.cpp b/cpp/src/qpid/broker/DirectExchange.cpp index 094f59cdec..05179502e6 100644 --- a/cpp/src/qpid/broker/DirectExchange.cpp +++ b/cpp/src/qpid/broker/DirectExchange.cpp @@ -77,6 +77,7 @@ bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, con if (exclusiveBinding) bk.queues.clear(); if (bk.queues.add_unless(b, MatchQueue(queue))) { + b->startManagement(); propagate = bk.fedBinding.addOrigin(fedOrigin); if (mgmtExchange != 0) { mgmtExchange->inc_bindingCount(); diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp index 16eb75c88b..7bb70ed24a 100644 --- a/cpp/src/qpid/broker/Exchange.cpp +++ b/cpp/src/qpid/broker/Exchange.cpp @@ -306,9 +306,23 @@ void Exchange::propagateFedOp(const string& routingKey, const string& tags, cons (*iter)->propagateBinding(routingKey, tags, op, origin, extra_args); } -Exchange::Binding::Binding(const string& _key, Queue::shared_ptr _queue, Exchange* parent, - FieldTable _args, const string& origin) - : queue(_queue), key(_key), args(_args), mgmtBinding(0) +Exchange::Binding::Binding(const string& _key, Queue::shared_ptr _queue, Exchange* _parent, + FieldTable _args, const string& _origin) + : parent(_parent), queue(_queue), key(_key), args(_args), origin(_origin), mgmtBinding(0) +{ +} + +Exchange::Binding::~Binding () +{ + if (mgmtBinding != 0) { + ManagementObject* mo = queue->GetManagementObject(); + if (mo != 0) + static_cast<_qmf::Queue*>(mo)->dec_bindingCount(); + mgmtBinding->resourceDestroy (); + } +} + +void Exchange::Binding::startManagement() { if (parent != 0) { @@ -333,16 +347,6 @@ Exchange::Binding::Binding(const string& _key, Queue::shared_ptr _queue, Exchang } } -Exchange::Binding::~Binding () -{ - if (mgmtBinding != 0) { - ManagementObject* mo = queue->GetManagementObject(); - if (mo != 0) - static_cast<_qmf::Queue*>(mo)->dec_bindingCount(); - mgmtBinding->resourceDestroy (); - } -} - ManagementObject* Exchange::Binding::GetManagementObject () const { return (ManagementObject*) mgmtBinding; diff --git a/cpp/src/qpid/broker/Exchange.h b/cpp/src/qpid/broker/Exchange.h index dfe69e2c04..23d044ffd3 100644 --- a/cpp/src/qpid/broker/Exchange.h +++ b/cpp/src/qpid/broker/Exchange.h @@ -45,14 +45,17 @@ public: typedef boost::shared_ptr<Binding> shared_ptr; typedef std::vector<Binding::shared_ptr> vector; + Exchange* parent; Queue::shared_ptr queue; const std::string key; const framing::FieldTable args; + std::string origin; qmf::org::apache::qpid::broker::Binding* mgmtBinding; Binding(const std::string& key, Queue::shared_ptr queue, Exchange* parent = 0, framing::FieldTable args = framing::FieldTable(), const std::string& origin = std::string()); ~Binding(); + void startManagement(); management::ManagementObject* GetManagementObject() const; }; diff --git a/cpp/src/qpid/broker/FanOutExchange.cpp b/cpp/src/qpid/broker/FanOutExchange.cpp index 6d840b50df..ef410a9154 100644 --- a/cpp/src/qpid/broker/FanOutExchange.cpp +++ b/cpp/src/qpid/broker/FanOutExchange.cpp @@ -63,6 +63,7 @@ bool FanOutExchange::bind(Queue::shared_ptr queue, const string& /*key*/, const if (args == 0 || fedOp.empty() || fedOp == fedOpBind) { Binding::shared_ptr binding (new Binding ("", queue, this, FieldTable(), fedOrigin)); if (bindings.add_unless(binding, MatchQueue(queue))) { + binding->startManagement(); propagate = fedBinding.addOrigin(fedOrigin); if (mgmtExchange != 0) { mgmtExchange->inc_bindingCount(); diff --git a/cpp/src/qpid/broker/HeadersExchange.cpp b/cpp/src/qpid/broker/HeadersExchange.cpp index e4a76a0bcd..640036e741 100644 --- a/cpp/src/qpid/broker/HeadersExchange.cpp +++ b/cpp/src/qpid/broker/HeadersExchange.cpp @@ -114,6 +114,7 @@ bool HeadersExchange::bind(Queue::shared_ptr queue, const string& bindingKey, co Binding::shared_ptr binding (new Binding (bindingKey, queue, this, *args)); BoundKey bk(binding); if (bindings.add_unless(bk, MatchArgs(queue, args))) { + binding->startManagement(); propagate = bk.fedBinding.addOrigin(fedOrigin); if (mgmtExchange != 0) { mgmtExchange->inc_bindingCount(); diff --git a/cpp/src/qpid/broker/TopicExchange.cpp b/cpp/src/qpid/broker/TopicExchange.cpp index dd57549b5d..6e53ef5fd2 100644 --- a/cpp/src/qpid/broker/TopicExchange.cpp +++ b/cpp/src/qpid/broker/TopicExchange.cpp @@ -207,6 +207,7 @@ bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, cons return false; } else { Binding::shared_ptr binding (new Binding (routingPattern, queue, this, FieldTable(), fedOrigin)); + binding->startManagement(); BoundKey& bk = bindings[routingPattern]; bk.bindingVector.push_back(binding); propagate = bk.fedBinding.addOrigin(fedOrigin); diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp index e21edb4051..8dd680997f 100644 --- a/cpp/src/qpid/management/ManagementAgent.cpp +++ b/cpp/src/qpid/management/ManagementAgent.cpp @@ -85,6 +85,12 @@ ManagementAgent::~ManagementAgent () delete object; } managementObjects.clear(); + + while (!deletedManagementObjects.empty()) { + ManagementObject* object = deletedManagementObjects.back(); + delete object; + deletedManagementObjects.pop_back(); + } } } @@ -196,9 +202,20 @@ ObjectId ManagementAgent::addObject(ManagementObject* object, } ObjectId objId(0 /*flags*/ , sequence, brokerBank, 0, objectNum); - objId.setV2Key(object->getKey()); + objId.setV2Key(*object); object->setObjectId(objId); + ManagementObjectMap::iterator destIter = newManagementObjects.find(objId); + if (destIter != newManagementObjects.end()) { + if (destIter->second->isDeleted()) { + newDeletedManagementObjects.push_back(destIter->second); + newManagementObjects.erase(destIter); + } else { + QPID_LOG(error, "ObjectId collision in addObject. class=" << object->getClassName() << + " key=" << objId.getV2Key()); + return objId; + } + } newManagementObjects[objId] = object; if (publishNow) { @@ -344,9 +361,31 @@ void ManagementAgent::moveNewObjectsLH() Mutex::ScopedLock lock (addLock); for (ManagementObjectMap::iterator iter = newManagementObjects.begin (); iter != newManagementObjects.end (); - iter++) - managementObjects[iter->first] = iter->second; + iter++) { + bool skip = false; + ManagementObjectMap::iterator destIter = managementObjects.find(iter->first); + if (destIter != managementObjects.end()) { + // We have an objectId collision with an existing object. If the old object + // is deleted, move it to the deleted list. + if (destIter->second->isDeleted()) { + deletedManagementObjects.push_back(destIter->second); + managementObjects.erase(destIter); + } else { + QPID_LOG(error, "ObjectId collision in moveNewObjects. class=" << + iter->second->getClassName() << " key=" << iter->first.getV2Key()); + skip = true; + } + } + + if (!skip) + managementObjects[iter->first] = iter->second; + } newManagementObjects.clear(); + + while (!newDeletedManagementObjects.empty()) { + deletedManagementObjects.push_back(newDeletedManagementObjects.back()); + newDeletedManagementObjects.pop_back(); + } } void ManagementAgent::periodicProcessing (void) @@ -449,7 +488,23 @@ void ManagementAgent::periodicProcessing (void) managementObjects.erase(iter->first); } - if (!deleteList.empty()) { + // Publish the deletion of objects created by insert-collision + bool collisionDeletions = false; + for (ManagementObjectVector::iterator cdIter = deletedManagementObjects.begin(); + cdIter != deletedManagementObjects.end(); cdIter++) { + collisionDeletions = true; + Buffer msgBuffer(msgChars, BUFSIZE); + encodeHeader(msgBuffer, 'c'); + (*cdIter)->writeProperties(msgBuffer); + contentSize = BUFSIZE - msgBuffer.available (); + msgBuffer.reset (); + stringstream key; + key << "console.obj.1.0." << (*cdIter)->getPackageName() << "." << (*cdIter)->getClassName(); + sendBuffer (msgBuffer, contentSize, mExchange, key.str()); + QPID_LOG(trace, "SEND ContentInd for deleted object to=" << key.str()); + } + + if (!deleteList.empty() || collisionDeletions) { deleteList.clear(); deleteOrphanedAgentsLH(); } @@ -596,7 +651,7 @@ void ManagementAgent::handleMethodRequestLH (Buffer& inBuffer, string replyToKey } } - ManagementObjectMap::iterator iter = managementObjects.find(objId); + ManagementObjectMap::iterator iter = numericFind(objId); if (iter == managementObjects.end() || iter->second->isDeleted()) { outBuffer.putLong (Manageable::STATUS_UNKNOWN_OBJECT); outBuffer.putMediumString(Manageable::StatusText (Manageable::STATUS_UNKNOWN_OBJECT)); @@ -967,7 +1022,7 @@ void ManagementAgent::handleGetQueryLH (Buffer& inBuffer, string replyToKey, uin return; ObjectId selector(value->get<string>()); - ManagementObjectMap::iterator iter = managementObjects.find(selector); + ManagementObjectMap::iterator iter = numericFind(selector); if (iter != managementObjects.end()) { ManagementObject* object = iter->second; Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); @@ -1294,6 +1349,18 @@ size_t ManagementAgent::validateEventSchema(Buffer& inBuffer) return end - start; } +ManagementObjectMap::iterator ManagementAgent::numericFind(const ObjectId& oid) +{ + ManagementObjectMap::iterator iter = managementObjects.begin(); + for (; iter != managementObjects.end(); iter++) { + if (oid.equalV1(iter->first)) + break; + } + + return iter; +} + + void ManagementAgent::setAllocator(std::auto_ptr<IdAllocator> a) { Mutex::ScopedLock lock (addLock); diff --git a/cpp/src/qpid/management/ManagementAgent.h b/cpp/src/qpid/management/ManagementAgent.h index ea04a6cb72..3e00ebeb81 100644 --- a/cpp/src/qpid/management/ManagementAgent.h +++ b/cpp/src/qpid/management/ManagementAgent.h @@ -217,8 +217,18 @@ private: RemoteAgentMap remoteAgents; PackageMap packages; + + // + // Protected by userLock + // ManagementObjectMap managementObjects; + ManagementObjectVector deletedManagementObjects; + + // + // Protected by addLock + // ManagementObjectMap newManagementObjects; + ManagementObjectVector newDeletedManagementObjects; framing::Uuid uuid; sys::Mutex addLock; @@ -295,6 +305,7 @@ private: size_t validateSchema(framing::Buffer&, uint8_t kind); size_t validateTableSchema(framing::Buffer&); size_t validateEventSchema(framing::Buffer&); + ManagementObjectMap::iterator numericFind(const ObjectId& oid); void debugSnapshot(const char*); }; diff --git a/cpp/src/qpid/management/ManagementObject.cpp b/cpp/src/qpid/management/ManagementObject.cpp index 4ac6613419..4b87800174 100644 --- a/cpp/src/qpid/management/ManagementObject.cpp +++ b/cpp/src/qpid/management/ManagementObject.cpp @@ -109,16 +109,18 @@ void ObjectId::fromString(const std::string& text) bool ObjectId::operator==(const ObjectId &other) const { - uint64_t otherFirst = agent == 0 ? other.first : other.first & 0xffff000000000000LL; - - return first == otherFirst && second == other.second; + return v2Key == other.v2Key; } bool ObjectId::operator<(const ObjectId &other) const { - uint64_t otherFirst = agent == 0 ? other.first : other.first & 0xffff000000000000LL; + return v2Key < other.v2Key; +} - return (first < otherFirst) || ((first == otherFirst) && (second < other.second)); +bool ObjectId::equalV1(const ObjectId &other) const +{ + uint64_t otherFirst = agent == 0 ? other.first : other.first & 0xffff000000000000LL; + return first == otherFirst && second == other.second; } void ObjectId::encode(framing::Buffer& buffer) const @@ -136,6 +138,14 @@ void ObjectId::decode(framing::Buffer& buffer) second = buffer.getLongLong(); } +void ObjectId::setV2Key(const ManagementObject& object) +{ + std::stringstream oname; + oname << object.getPackageName() << "." << object.getClassName() << ":" << object.getKey(); + v2Key = oname.str(); +} + + namespace qpid { namespace management { diff --git a/cpp/src/qpid/xml/XmlExchange.h b/cpp/src/qpid/xml/XmlExchange.h index 802afddeab..4394ede5e7 100644 --- a/cpp/src/qpid/xml/XmlExchange.h +++ b/cpp/src/qpid/xml/XmlExchange.h @@ -53,7 +53,7 @@ class XmlExchange : public virtual Exchange { const ::qpid::framing::FieldTable& _arguments, Query query): Binding(key, queue, parent, _arguments), xquery(query), - parse_message_content(true) {} + parse_message_content(true) { startManagement(); } }; |