diff options
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/include/qpid/management/ManagementObject.h | 7 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DirectExchange.cpp | 1 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Exchange.cpp | 30 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Exchange.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/broker/FanOutExchange.cpp | 1 | ||||
-rw-r--r-- | cpp/src/qpid/broker/HeadersExchange.cpp | 1 | ||||
-rw-r--r-- | cpp/src/qpid/broker/TopicExchange.cpp | 1 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementAgent.cpp | 79 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementAgent.h | 11 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementObject.cpp | 20 | ||||
-rw-r--r-- | cpp/src/qpid/xml/XmlExchange.h | 2 |
11 files changed, 130 insertions, 26 deletions
diff --git a/cpp/include/qpid/management/ManagementObject.h b/cpp/include/qpid/management/ManagementObject.h index 6475ff5406..b1c70f64d6 100644 --- a/cpp/include/qpid/management/ManagementObject.h +++ b/cpp/include/qpid/management/ManagementObject.h @@ -27,12 +27,14 @@ #include <qpid/framing/Buffer.h> #include "qpid/CommonImportExport.h" #include <map> +#include <vector> namespace qpid { namespace management { class Manageable; class ObjectId; +class ManagementObject; class AgentAttachment { @@ -65,7 +67,9 @@ public: QPID_COMMON_EXTERN uint32_t encodedSize() const { return 16; }; QPID_COMMON_EXTERN void encode(framing::Buffer& buffer) const; QPID_COMMON_EXTERN void decode(framing::Buffer& buffer); - QPID_COMMON_EXTERN void setV2Key(const std::string& key) { v2Key = key; } + QPID_COMMON_EXTERN void setV2Key(const std::string& _key) { v2Key = _key; } + QPID_COMMON_EXTERN void setV2Key(const ManagementObject& object); + QPID_COMMON_EXTERN bool equalV1(const ObjectId &other) const; QPID_COMMON_EXTERN const std::string& getV2Key() const { return v2Key; } friend QPID_COMMON_EXTERN std::ostream& operator<<(std::ostream&, const ObjectId&); }; @@ -192,6 +196,7 @@ protected: }; typedef std::map<ObjectId, ManagementObject*> ManagementObjectMap; +typedef std::vector<ManagementObject*> ManagementObjectVector; }} diff --git a/cpp/src/qpid/broker/DirectExchange.cpp b/cpp/src/qpid/broker/DirectExchange.cpp index 094f59cdec..05179502e6 100644 --- a/cpp/src/qpid/broker/DirectExchange.cpp +++ b/cpp/src/qpid/broker/DirectExchange.cpp @@ -77,6 +77,7 @@ bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, con if (exclusiveBinding) bk.queues.clear(); if (bk.queues.add_unless(b, MatchQueue(queue))) { + b->startManagement(); propagate = bk.fedBinding.addOrigin(fedOrigin); if (mgmtExchange != 0) { mgmtExchange->inc_bindingCount(); diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp index 16eb75c88b..7bb70ed24a 100644 --- a/cpp/src/qpid/broker/Exchange.cpp +++ b/cpp/src/qpid/broker/Exchange.cpp @@ -306,9 +306,23 @@ void Exchange::propagateFedOp(const string& routingKey, const string& tags, cons (*iter)->propagateBinding(routingKey, tags, op, origin, extra_args); } -Exchange::Binding::Binding(const string& _key, Queue::shared_ptr _queue, Exchange* parent, - FieldTable _args, const string& origin) - : queue(_queue), key(_key), args(_args), mgmtBinding(0) +Exchange::Binding::Binding(const string& _key, Queue::shared_ptr _queue, Exchange* _parent, + FieldTable _args, const string& _origin) + : parent(_parent), queue(_queue), key(_key), args(_args), origin(_origin), mgmtBinding(0) +{ +} + +Exchange::Binding::~Binding () +{ + if (mgmtBinding != 0) { + ManagementObject* mo = queue->GetManagementObject(); + if (mo != 0) + static_cast<_qmf::Queue*>(mo)->dec_bindingCount(); + mgmtBinding->resourceDestroy (); + } +} + +void Exchange::Binding::startManagement() { if (parent != 0) { @@ -333,16 +347,6 @@ Exchange::Binding::Binding(const string& _key, Queue::shared_ptr _queue, Exchang } } -Exchange::Binding::~Binding () -{ - if (mgmtBinding != 0) { - ManagementObject* mo = queue->GetManagementObject(); - if (mo != 0) - static_cast<_qmf::Queue*>(mo)->dec_bindingCount(); - mgmtBinding->resourceDestroy (); - } -} - ManagementObject* Exchange::Binding::GetManagementObject () const { return (ManagementObject*) mgmtBinding; diff --git a/cpp/src/qpid/broker/Exchange.h b/cpp/src/qpid/broker/Exchange.h index dfe69e2c04..23d044ffd3 100644 --- a/cpp/src/qpid/broker/Exchange.h +++ b/cpp/src/qpid/broker/Exchange.h @@ -45,14 +45,17 @@ public: typedef boost::shared_ptr<Binding> shared_ptr; typedef std::vector<Binding::shared_ptr> vector; + Exchange* parent; Queue::shared_ptr queue; const std::string key; const framing::FieldTable args; + std::string origin; qmf::org::apache::qpid::broker::Binding* mgmtBinding; Binding(const std::string& key, Queue::shared_ptr queue, Exchange* parent = 0, framing::FieldTable args = framing::FieldTable(), const std::string& origin = std::string()); ~Binding(); + void startManagement(); management::ManagementObject* GetManagementObject() const; }; diff --git a/cpp/src/qpid/broker/FanOutExchange.cpp b/cpp/src/qpid/broker/FanOutExchange.cpp index 6d840b50df..ef410a9154 100644 --- a/cpp/src/qpid/broker/FanOutExchange.cpp +++ b/cpp/src/qpid/broker/FanOutExchange.cpp @@ -63,6 +63,7 @@ bool FanOutExchange::bind(Queue::shared_ptr queue, const string& /*key*/, const if (args == 0 || fedOp.empty() || fedOp == fedOpBind) { Binding::shared_ptr binding (new Binding ("", queue, this, FieldTable(), fedOrigin)); if (bindings.add_unless(binding, MatchQueue(queue))) { + binding->startManagement(); propagate = fedBinding.addOrigin(fedOrigin); if (mgmtExchange != 0) { mgmtExchange->inc_bindingCount(); diff --git a/cpp/src/qpid/broker/HeadersExchange.cpp b/cpp/src/qpid/broker/HeadersExchange.cpp index e4a76a0bcd..640036e741 100644 --- a/cpp/src/qpid/broker/HeadersExchange.cpp +++ b/cpp/src/qpid/broker/HeadersExchange.cpp @@ -114,6 +114,7 @@ bool HeadersExchange::bind(Queue::shared_ptr queue, const string& bindingKey, co Binding::shared_ptr binding (new Binding (bindingKey, queue, this, *args)); BoundKey bk(binding); if (bindings.add_unless(bk, MatchArgs(queue, args))) { + binding->startManagement(); propagate = bk.fedBinding.addOrigin(fedOrigin); if (mgmtExchange != 0) { mgmtExchange->inc_bindingCount(); diff --git a/cpp/src/qpid/broker/TopicExchange.cpp b/cpp/src/qpid/broker/TopicExchange.cpp index dd57549b5d..6e53ef5fd2 100644 --- a/cpp/src/qpid/broker/TopicExchange.cpp +++ b/cpp/src/qpid/broker/TopicExchange.cpp @@ -207,6 +207,7 @@ bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, cons return false; } else { Binding::shared_ptr binding (new Binding (routingPattern, queue, this, FieldTable(), fedOrigin)); + binding->startManagement(); BoundKey& bk = bindings[routingPattern]; bk.bindingVector.push_back(binding); propagate = bk.fedBinding.addOrigin(fedOrigin); diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp index e21edb4051..8dd680997f 100644 --- a/cpp/src/qpid/management/ManagementAgent.cpp +++ b/cpp/src/qpid/management/ManagementAgent.cpp @@ -85,6 +85,12 @@ ManagementAgent::~ManagementAgent () delete object; } managementObjects.clear(); + + while (!deletedManagementObjects.empty()) { + ManagementObject* object = deletedManagementObjects.back(); + delete object; + deletedManagementObjects.pop_back(); + } } } @@ -196,9 +202,20 @@ ObjectId ManagementAgent::addObject(ManagementObject* object, } ObjectId objId(0 /*flags*/ , sequence, brokerBank, 0, objectNum); - objId.setV2Key(object->getKey()); + objId.setV2Key(*object); object->setObjectId(objId); + ManagementObjectMap::iterator destIter = newManagementObjects.find(objId); + if (destIter != newManagementObjects.end()) { + if (destIter->second->isDeleted()) { + newDeletedManagementObjects.push_back(destIter->second); + newManagementObjects.erase(destIter); + } else { + QPID_LOG(error, "ObjectId collision in addObject. class=" << object->getClassName() << + " key=" << objId.getV2Key()); + return objId; + } + } newManagementObjects[objId] = object; if (publishNow) { @@ -344,9 +361,31 @@ void ManagementAgent::moveNewObjectsLH() Mutex::ScopedLock lock (addLock); for (ManagementObjectMap::iterator iter = newManagementObjects.begin (); iter != newManagementObjects.end (); - iter++) - managementObjects[iter->first] = iter->second; + iter++) { + bool skip = false; + ManagementObjectMap::iterator destIter = managementObjects.find(iter->first); + if (destIter != managementObjects.end()) { + // We have an objectId collision with an existing object. If the old object + // is deleted, move it to the deleted list. + if (destIter->second->isDeleted()) { + deletedManagementObjects.push_back(destIter->second); + managementObjects.erase(destIter); + } else { + QPID_LOG(error, "ObjectId collision in moveNewObjects. class=" << + iter->second->getClassName() << " key=" << iter->first.getV2Key()); + skip = true; + } + } + + if (!skip) + managementObjects[iter->first] = iter->second; + } newManagementObjects.clear(); + + while (!newDeletedManagementObjects.empty()) { + deletedManagementObjects.push_back(newDeletedManagementObjects.back()); + newDeletedManagementObjects.pop_back(); + } } void ManagementAgent::periodicProcessing (void) @@ -449,7 +488,23 @@ void ManagementAgent::periodicProcessing (void) managementObjects.erase(iter->first); } - if (!deleteList.empty()) { + // Publish the deletion of objects created by insert-collision + bool collisionDeletions = false; + for (ManagementObjectVector::iterator cdIter = deletedManagementObjects.begin(); + cdIter != deletedManagementObjects.end(); cdIter++) { + collisionDeletions = true; + Buffer msgBuffer(msgChars, BUFSIZE); + encodeHeader(msgBuffer, 'c'); + (*cdIter)->writeProperties(msgBuffer); + contentSize = BUFSIZE - msgBuffer.available (); + msgBuffer.reset (); + stringstream key; + key << "console.obj.1.0." << (*cdIter)->getPackageName() << "." << (*cdIter)->getClassName(); + sendBuffer (msgBuffer, contentSize, mExchange, key.str()); + QPID_LOG(trace, "SEND ContentInd for deleted object to=" << key.str()); + } + + if (!deleteList.empty() || collisionDeletions) { deleteList.clear(); deleteOrphanedAgentsLH(); } @@ -596,7 +651,7 @@ void ManagementAgent::handleMethodRequestLH (Buffer& inBuffer, string replyToKey } } - ManagementObjectMap::iterator iter = managementObjects.find(objId); + ManagementObjectMap::iterator iter = numericFind(objId); if (iter == managementObjects.end() || iter->second->isDeleted()) { outBuffer.putLong (Manageable::STATUS_UNKNOWN_OBJECT); outBuffer.putMediumString(Manageable::StatusText (Manageable::STATUS_UNKNOWN_OBJECT)); @@ -967,7 +1022,7 @@ void ManagementAgent::handleGetQueryLH (Buffer& inBuffer, string replyToKey, uin return; ObjectId selector(value->get<string>()); - ManagementObjectMap::iterator iter = managementObjects.find(selector); + ManagementObjectMap::iterator iter = numericFind(selector); if (iter != managementObjects.end()) { ManagementObject* object = iter->second; Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); @@ -1294,6 +1349,18 @@ size_t ManagementAgent::validateEventSchema(Buffer& inBuffer) return end - start; } +ManagementObjectMap::iterator ManagementAgent::numericFind(const ObjectId& oid) +{ + ManagementObjectMap::iterator iter = managementObjects.begin(); + for (; iter != managementObjects.end(); iter++) { + if (oid.equalV1(iter->first)) + break; + } + + return iter; +} + + void ManagementAgent::setAllocator(std::auto_ptr<IdAllocator> a) { Mutex::ScopedLock lock (addLock); diff --git a/cpp/src/qpid/management/ManagementAgent.h b/cpp/src/qpid/management/ManagementAgent.h index ea04a6cb72..3e00ebeb81 100644 --- a/cpp/src/qpid/management/ManagementAgent.h +++ b/cpp/src/qpid/management/ManagementAgent.h @@ -217,8 +217,18 @@ private: RemoteAgentMap remoteAgents; PackageMap packages; + + // + // Protected by userLock + // ManagementObjectMap managementObjects; + ManagementObjectVector deletedManagementObjects; + + // + // Protected by addLock + // ManagementObjectMap newManagementObjects; + ManagementObjectVector newDeletedManagementObjects; framing::Uuid uuid; sys::Mutex addLock; @@ -295,6 +305,7 @@ private: size_t validateSchema(framing::Buffer&, uint8_t kind); size_t validateTableSchema(framing::Buffer&); size_t validateEventSchema(framing::Buffer&); + ManagementObjectMap::iterator numericFind(const ObjectId& oid); void debugSnapshot(const char*); }; diff --git a/cpp/src/qpid/management/ManagementObject.cpp b/cpp/src/qpid/management/ManagementObject.cpp index 4ac6613419..4b87800174 100644 --- a/cpp/src/qpid/management/ManagementObject.cpp +++ b/cpp/src/qpid/management/ManagementObject.cpp @@ -109,16 +109,18 @@ void ObjectId::fromString(const std::string& text) bool ObjectId::operator==(const ObjectId &other) const { - uint64_t otherFirst = agent == 0 ? other.first : other.first & 0xffff000000000000LL; - - return first == otherFirst && second == other.second; + return v2Key == other.v2Key; } bool ObjectId::operator<(const ObjectId &other) const { - uint64_t otherFirst = agent == 0 ? other.first : other.first & 0xffff000000000000LL; + return v2Key < other.v2Key; +} - return (first < otherFirst) || ((first == otherFirst) && (second < other.second)); +bool ObjectId::equalV1(const ObjectId &other) const +{ + uint64_t otherFirst = agent == 0 ? other.first : other.first & 0xffff000000000000LL; + return first == otherFirst && second == other.second; } void ObjectId::encode(framing::Buffer& buffer) const @@ -136,6 +138,14 @@ void ObjectId::decode(framing::Buffer& buffer) second = buffer.getLongLong(); } +void ObjectId::setV2Key(const ManagementObject& object) +{ + std::stringstream oname; + oname << object.getPackageName() << "." << object.getClassName() << ":" << object.getKey(); + v2Key = oname.str(); +} + + namespace qpid { namespace management { diff --git a/cpp/src/qpid/xml/XmlExchange.h b/cpp/src/qpid/xml/XmlExchange.h index 802afddeab..4394ede5e7 100644 --- a/cpp/src/qpid/xml/XmlExchange.h +++ b/cpp/src/qpid/xml/XmlExchange.h @@ -53,7 +53,7 @@ class XmlExchange : public virtual Exchange { const ::qpid::framing::FieldTable& _arguments, Query query): Binding(key, queue, parent, _arguments), xquery(query), - parse_message_content(true) {} + parse_message_content(true) { startManagement(); } }; |