From 54895c3ce0a66a4630289bfeb6ed4f86516e784e Mon Sep 17 00:00:00 2001 From: Kenneth Anthony Giusti Date: Tue, 18 Jan 2011 14:51:31 +0000 Subject: QPID-2997: remove oid disambiguation, re-order mgmt object status updates. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1060401 13f79535-47bb-0310-9956-ffa450edef68 --- .../cpp/include/qpid/management/ManagementObject.h | 1 - qpid/cpp/src/qpid/management/ManagementAgent.cpp | 403 ++++++++++++--------- qpid/cpp/src/qpid/management/ManagementAgent.h | 10 +- qpid/cpp/src/qpid/management/ManagementObject.cpp | 4 - qpid/cpp/src/tests/BrokerMgmtAgent.cpp | 146 +++++++- 5 files changed, 365 insertions(+), 199 deletions(-) diff --git a/qpid/cpp/include/qpid/management/ManagementObject.h b/qpid/cpp/include/qpid/management/ManagementObject.h index dec5a63ee9..747edda150 100644 --- a/qpid/cpp/include/qpid/management/ManagementObject.h +++ b/qpid/cpp/include/qpid/management/ManagementObject.h @@ -82,7 +82,6 @@ public: QPID_COMMON_EXTERN bool equalV1(const ObjectId &other) const; QPID_COMMON_EXTERN void setV2Key(const std::string& _key) { v2Key = _key; } QPID_COMMON_EXTERN void setV2Key(const ManagementObject& object); - QPID_COMMON_EXTERN void disambiguate(); QPID_COMMON_EXTERN void setAgentName(const std::string& _name) { agentName = _name; } QPID_COMMON_EXTERN const std::string& getAgentName() const { return agentName; } QPID_COMMON_EXTERN const std::string& getV2Key() const { return v2Key; } diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp index cb33887fc8..7459ac9416 100644 --- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp +++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp @@ -306,12 +306,7 @@ ObjectId ManagementAgent::addObject(ManagementObject* object, uint64_t persistId { sys::Mutex::ScopedLock lock(addLock); - ManagementObjectMap::iterator destIter = newManagementObjects.find(objId); - while (destIter != newManagementObjects.end()) { - objId.disambiguate(); - destIter = newManagementObjects.find(objId); - } - newManagementObjects[objId] = object; + newManagementObjects.push_back(object); } QPID_LOG(debug, "Management object (V1) added: " << objId.getV2Key()); return objId; @@ -337,12 +332,7 @@ ObjectId ManagementAgent::addObject(ManagementObject* object, object->setObjectId(objId); { sys::Mutex::ScopedLock lock(addLock); - ManagementObjectMap::iterator destIter = newManagementObjects.find(objId); - while (destIter != newManagementObjects.end()) { - objId.disambiguate(); - destIter = newManagementObjects.find(objId); - } - newManagementObjects[objId] = object; + newManagementObjects.push_back(object); } QPID_LOG(debug, "Management object added: " << objId.getV2Key()); return objId; @@ -621,22 +611,50 @@ void ManagementAgent::sendBufferLH(const string& data, } +/** Objects that have been added since the last periodic poll are temporarily + * saved in the newManagementObjects list. This allows objects to be + * added without needing to block on the userLock (addLock is used instead). + * These new objects need to be integrated into the object database + * (managementObjects) *before* they can be properly managed. This routine + * performs the integration. + * + * Note well: objects on the newManagementObjects list may have been + * marked as "deleted", and, possibly re-added. This would result in + * duplicate object ids. To avoid clashes, don't put deleted objects + * into the active object database. + */ void ManagementAgent::moveNewObjectsLH() { sys::Mutex::ScopedLock lock (addLock); - for (ManagementObjectMap::iterator iter = newManagementObjects.begin (); - iter != newManagementObjects.end (); - iter++) { - ObjectId oid = iter->first; - ManagementObjectMap::iterator destIter = managementObjects.find(oid); - while (destIter != managementObjects.end()) { - oid.disambiguate(); - destIter = managementObjects.find(oid); - } + while (!newManagementObjects.empty()) { + ManagementObject *object = newManagementObjects.back(); + newManagementObjects.pop_back(); - managementObjects[oid] = iter->second; + if (object->isDeleted()) { + DeletedObject::shared_ptr dptr(new DeletedObject(object, qmf1Support, qmf2Support)); + pendingDeletedObjs[dptr->getKey()].push_back(dptr); + delete object; + } else { // add to active object list, check for duplicates. + ObjectId oid = object->getObjectId(); + ManagementObjectMap::iterator destIter = managementObjects.find(oid); + if (destIter != managementObjects.end()) { + // duplicate found. It is OK if the old object has been marked + // deleted... + ManagementObject *oldObj = destIter->second; + if (oldObj->isDeleted()) { + DeletedObject::shared_ptr dptr(new DeletedObject(oldObj, qmf1Support, qmf2Support)); + pendingDeletedObjs[dptr->getKey()].push_back(dptr); + delete oldObj; + } else { + // Duplicate non-deleted objects? This is a user error - oids must be unique. + // for now, leak the old object (safer than deleting - may still be referenced) + // and complain loudly... + QPID_LOG(error, "Detected two management objects with the same identifier: " << oid); + } + } + managementObjects[oid] = object; + } } - newManagementObjects.clear(); } void ManagementAgent::periodicProcessing (void) @@ -670,7 +688,126 @@ void ManagementAgent::periodicProcessing (void) clientWasAdded = false; + // first send the pending deletes before sending updates. This prevents a + // "false delete" scenario: if an object was deleted then re-added during + // the last poll cycle, it will have a delete entry and an active entry. + // if we sent the active update first, _then_ the delete update, clients + // would incorrectly think the object was deleted. See QPID-2997 + // bool objectsDeleted = moveDeletedObjectsLH(); + if (!pendingDeletedObjs.empty()) { + // use a temporary copy of the pending deletes so dropping the lock when + // the buffer is sent is safe. + PendingDeletedObjsMap tmp(pendingDeletedObjs); + pendingDeletedObjs.clear(); + + for (PendingDeletedObjsMap::iterator mIter = tmp.begin(); mIter != tmp.end(); mIter++) { + std::string packageName; + std::string className; + Buffer msgBuffer(msgChars, BUFSIZE); + uint32_t v1Objs = 0; + uint32_t v2Objs = 0; + Variant::List list_; + + size_t pos = mIter->first.find(":"); + packageName = mIter->first.substr(0, pos); + className = mIter->first.substr(pos+1); + + for (DeletedObjectList::iterator lIter = mIter->second.begin(); + lIter != mIter->second.end(); lIter++) { + std::string oid = (*lIter)->objectId; + if (!(*lIter)->encodedV1Config.empty()) { + encodeHeader(msgBuffer, 'c'); + msgBuffer.putRawData((*lIter)->encodedV1Config); + QPID_LOG(trace, "Deleting V1 properties " << oid + << " len=" << (*lIter)->encodedV1Config.size()); + v1Objs++; + } + if (!(*lIter)->encodedV1Inst.empty()) { + encodeHeader(msgBuffer, 'i'); + msgBuffer.putRawData((*lIter)->encodedV1Inst); + QPID_LOG(trace, "Deleting V1 statistics " << oid + << " len=" << (*lIter)->encodedV1Inst.size()); + v1Objs++; + } + if (v1Objs && msgBuffer.available() < HEADROOM) { + v1Objs = 0; + contentSize = BUFSIZE - msgBuffer.available(); + stringstream key; + key << "console.obj.1.0." << packageName << "." << className; + msgBuffer.reset(); + sendBufferLH(msgBuffer, contentSize, mExchange, key.str()); // UNLOCKS USERLOCK + QPID_LOG(debug, "SEND V1 Multicast ContentInd V1 (delete) to=" + << key.str() << " len=" << contentSize); + } + + if (!(*lIter)->encodedV2.empty()) { + QPID_LOG(trace, "Deleting V2 " << "map=" << (*lIter)->encodedV2); + list_.push_back((*lIter)->encodedV2); + if (++v2Objs >= maxV2ReplyObjs) { + v2Objs = 0; + + string content; + ListCodec::encode(list_, content); + list_.clear(); + if (content.length()) { + stringstream key; + Variant::Map headers; + key << "agent.ind.data." << keyifyNameStr(packageName) + << "." << keyifyNameStr(className) + << "." << vendorNameKey + << "." << productNameKey; + if (!instanceNameKey.empty()) + key << "." << instanceNameKey; + + headers["method"] = "indication"; + headers["qmf.opcode"] = "_data_indication"; + headers["qmf.content"] = "_data"; + headers["qmf.agent"] = name_address; + + sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str()); // UNLOCKS USERLOCK + QPID_LOG(debug, "SEND Multicast ContentInd V2 (delete) to=" << key.str() << " len=" << content.length()); + } + } + } + } // end current list + + // send any remaining objects... + + if (v1Objs) { + contentSize = BUFSIZE - msgBuffer.available(); + stringstream key; + key << "console.obj.1.0." << packageName << "." << className; + msgBuffer.reset(); + sendBufferLH(msgBuffer, contentSize, mExchange, key.str()); // UNLOCKS USERLOCK + QPID_LOG(debug, "SEND V1 Multicast ContentInd V1 (delete) to=" << key.str() << " len=" << contentSize); + } + + if (!list_.empty()) { + string content; + ListCodec::encode(list_, content); + list_.clear(); + if (content.length()) { + stringstream key; + Variant::Map headers; + key << "agent.ind.data." << keyifyNameStr(packageName) + << "." << keyifyNameStr(className) + << "." << vendorNameKey + << "." << productNameKey; + if (!instanceNameKey.empty()) + key << "." << instanceNameKey; + + headers["method"] = "indication"; + headers["qmf.opcode"] = "_data_indication"; + headers["qmf.content"] = "_data"; + headers["qmf.agent"] = name_address; + + sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str()); // UNLOCKS USERLOCK + QPID_LOG(debug, "SEND Multicast ContentInd V2 (delete) to=" << key.str() << " len=" << content.length()); + } + } + } // end map + } // // Process the entire object map. Remember: we drop the userLock each time we call @@ -828,122 +965,6 @@ void ManagementAgent::periodicProcessing (void) } } // end processing updates for all objects - - // now send the pending deletes. Make a temporary copy of the pending deletes so dropping the - // lock when the buffer is sent is safe. - // - if (!pendingDeletedObjs.empty()) { - PendingDeletedObjsMap tmp(pendingDeletedObjs); - pendingDeletedObjs.clear(); - - for (PendingDeletedObjsMap::iterator mIter = tmp.begin(); mIter != tmp.end(); mIter++) { - std::string packageName; - std::string className; - Buffer msgBuffer(msgChars, BUFSIZE); - uint32_t v1Objs = 0; - uint32_t v2Objs = 0; - Variant::List list_; - - size_t pos = mIter->first.find(":"); - packageName = mIter->first.substr(0, pos); - className = mIter->first.substr(pos+1); - - for (DeletedObjectList::iterator lIter = mIter->second.begin(); - lIter != mIter->second.end(); lIter++) { - std::string oid = (*lIter)->objectId; - if (!(*lIter)->encodedV1Config.empty()) { - encodeHeader(msgBuffer, 'c'); - msgBuffer.putRawData((*lIter)->encodedV1Config); - QPID_LOG(trace, "Deleting V1 properties " << oid - << " len=" << (*lIter)->encodedV1Config.size()); - v1Objs++; - } - if (!(*lIter)->encodedV1Inst.empty()) { - encodeHeader(msgBuffer, 'i'); - msgBuffer.putRawData((*lIter)->encodedV1Inst); - QPID_LOG(trace, "Deleting V1 statistics " << oid - << " len=" << (*lIter)->encodedV1Inst.size()); - v1Objs++; - } - if (v1Objs && msgBuffer.available() < HEADROOM) { - v1Objs = 0; - contentSize = BUFSIZE - msgBuffer.available(); - stringstream key; - key << "console.obj.1.0." << packageName << "." << className; - msgBuffer.reset(); - sendBufferLH(msgBuffer, contentSize, mExchange, key.str()); // UNLOCKS USERLOCK - QPID_LOG(debug, "SEND V1 Multicast ContentInd V1 (delete) to=" - << key.str() << " len=" << contentSize); - } - - if (!(*lIter)->encodedV2.empty()) { - QPID_LOG(trace, "Deleting V2 " << "map=" << (*lIter)->encodedV2); - list_.push_back((*lIter)->encodedV2); - if (++v2Objs >= maxV2ReplyObjs) { - v2Objs = 0; - - string content; - ListCodec::encode(list_, content); - list_.clear(); - if (content.length()) { - stringstream key; - Variant::Map headers; - key << "agent.ind.data." << keyifyNameStr(packageName) - << "." << keyifyNameStr(className) - << "." << vendorNameKey - << "." << productNameKey; - if (!instanceNameKey.empty()) - key << "." << instanceNameKey; - - headers["method"] = "indication"; - headers["qmf.opcode"] = "_data_indication"; - headers["qmf.content"] = "_data"; - headers["qmf.agent"] = name_address; - - sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str()); // UNLOCKS USERLOCK - QPID_LOG(debug, "SEND Multicast ContentInd V2 (delete) to=" << key.str() << " len=" << content.length()); - } - } - } - } // end current list - - // send any remaining objects... - - if (v1Objs) { - contentSize = BUFSIZE - msgBuffer.available(); - stringstream key; - key << "console.obj.1.0." << packageName << "." << className; - msgBuffer.reset(); - sendBufferLH(msgBuffer, contentSize, mExchange, key.str()); // UNLOCKS USERLOCK - QPID_LOG(debug, "SEND V1 Multicast ContentInd V1 (delete) to=" << key.str() << " len=" << contentSize); - } - - if (!list_.empty()) { - string content; - ListCodec::encode(list_, content); - list_.clear(); - if (content.length()) { - stringstream key; - Variant::Map headers; - key << "agent.ind.data." << keyifyNameStr(packageName) - << "." << keyifyNameStr(className) - << "." << vendorNameKey - << "." << productNameKey; - if (!instanceNameKey.empty()) - key << "." << instanceNameKey; - - headers["method"] = "indication"; - headers["qmf.opcode"] = "_data_indication"; - headers["qmf.content"] = "_data"; - headers["qmf.agent"] = name_address; - - sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str()); // UNLOCKS USERLOCK - QPID_LOG(debug, "SEND Multicast ContentInd V2 (delete) to=" << key.str() << " len=" << content.length()); - } - } - } // end map - } - if (objectsDeleted) deleteOrphanedAgentsLH(); // heartbeat generation @@ -2619,13 +2640,24 @@ void ManagementAgent::importAgents(qpid::framing::Buffer& inBuf) { } namespace { -bool isDeleted(const ManagementObjectMap::value_type& value) { +bool isDeletedMap(const ManagementObjectMap::value_type& value) { return value.second->isDeleted(); } +bool isDeletedVector(const ManagementObjectVector::value_type& value) { + return value->isDeleted(); +} + string summarizeMap(const char* name, const ManagementObjectMap& map) { ostringstream o; - size_t deleted = std::count_if(map.begin(), map.end(), isDeleted); + size_t deleted = std::count_if(map.begin(), map.end(), isDeletedMap); + o << map.size() << " " << name << " (" << deleted << " deleted), "; + return o.str(); +} + +string summarizeVector(const char* name, const ManagementObjectVector& map) { + ostringstream o; + size_t deleted = std::count_if(map.begin(), map.end(), isDeletedVector); o << map.size() << " " << name << " (" << deleted << " deleted), "; return o.str(); } @@ -2639,6 +2671,15 @@ string dumpMap(const ManagementObjectMap& map) { return o.str(); } +string dumpVector(const ManagementObjectVector& map) { + ostringstream o; + for (ManagementObjectVector::const_iterator i = map.begin(); i != map.end(); ++i) { + o << endl << " " << (*i)->getObjectId().getV2Key() + << ((*i)->isDeleted() ? " (deleted)" : ""); + } + return o.str(); +} + } // namespace string ManagementAgent::summarizeAgents() { @@ -2658,14 +2699,14 @@ void ManagementAgent::debugSnapshot(const char* title) { QPID_LOG(debug, title << ": management snapshot: " << packages.size() << " packages, " << summarizeMap("objects", managementObjects) - << summarizeMap("new objects ", newManagementObjects) + << summarizeVector("new objects ", newManagementObjects) << pendingDeletedObjs.size() << " pending deletes" << summarizeAgents()); QPID_LOG_IF(trace, managementObjects.size(), title << ": objects" << dumpMap(managementObjects)); QPID_LOG_IF(trace, newManagementObjects.size(), - title << ": new objects" << dumpMap(newManagementObjects)); + title << ": new objects" << dumpVector(newManagementObjects)); } Variant::Map ManagementAgent::toMap(const FieldTable& from) @@ -2910,6 +2951,45 @@ void ManagementAgent::importDeletedObjects(const DeletedObjectList& inList) } +// construct a DeletedObject from a management object. +ManagementAgent::DeletedObject::DeletedObject(ManagementObject *src, bool v1, bool v2) + : packageName(src->getPackageName()), + className(src->getClassName()) +{ + bool send_stats = (src->hasInst() && (src->getInstChanged() || src->getForcePublish())); + + stringstream oid; + oid << src->getObjectId(); + objectId = oid.str(); + + if (v1) { + src->writeProperties(encodedV1Config); + if (send_stats) { + src->writeStatistics(encodedV1Inst); + } + } + + if (v2) { + Variant::Map map_; + Variant::Map values; + Variant::Map oid; + + src->getObjectId().mapEncode(oid); + map_["_object_id"] = oid; + map_["_schema_id"] = mapEncodeSchemaId(src->getPackageName(), + src->getClassName(), + "_data", + src->getMd5Sum()); + src->writeTimestamps(map_); + src->mapEncodeValues(values, true, send_stats); + map_["_values"] = values; + + encodedV2 = map_; + } +} + + + // construct a DeletedObject from an encoded representation. Used by // clustering to move deleted objects between clustered brokers. See // DeletedObject::encode() for the reverse. @@ -2966,42 +3046,9 @@ bool ManagementAgent::moveDeletedObjectsLH() { { ManagementObject* delObj = iter->second; assert(delObj->isDeleted()); - DeletedObject::shared_ptr dptr(new DeletedObject()); - std::string classkey(delObj->getPackageName() + std::string(":") + delObj->getClassName()); - bool send_stats = (delObj->hasInst() && (delObj->getInstChanged() || delObj->getForcePublish())); - - dptr->packageName = delObj->getPackageName(); - dptr->className = delObj->getClassName(); - stringstream oid; - oid << delObj->getObjectId(); - dptr->objectId = oid.str(); - - if (qmf1Support) { - delObj->writeProperties(dptr->encodedV1Config); - if (send_stats) { - delObj->writeStatistics(dptr->encodedV1Inst); - } - } - - if (qmf2Support) { - Variant::Map map_; - Variant::Map values; - Variant::Map oid; - - delObj->getObjectId().mapEncode(oid); - map_["_object_id"] = oid; - map_["_schema_id"] = mapEncodeSchemaId(delObj->getPackageName(), - delObj->getClassName(), - "_data", - delObj->getMd5Sum()); - delObj->writeTimestamps(map_); - delObj->mapEncodeValues(values, true, send_stats); - map_["_values"] = values; - - dptr->encodedV2 = map_; - } + DeletedObject::shared_ptr dptr(new DeletedObject(delObj, qmf1Support, qmf2Support)); - pendingDeletedObjs[classkey].push_back(dptr); + pendingDeletedObjs[dptr->getKey()].push_back(dptr); managementObjects.erase(iter->first); delete iter->second; } diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.h b/qpid/cpp/src/qpid/management/ManagementAgent.h index 87c39a67bd..2202e2fc98 100644 --- a/qpid/cpp/src/qpid/management/ManagementAgent.h +++ b/qpid/cpp/src/qpid/management/ManagementAgent.h @@ -159,13 +159,17 @@ public: class DeletedObject { public: typedef boost::shared_ptr shared_ptr; - DeletedObject() {}; + DeletedObject(ManagementObject *, bool v1, bool v2); DeletedObject( const std::string &encoded ); ~DeletedObject() {}; void encode( std::string& toBuffer ); + const std::string getKey() const { + // used to batch up objects of the same class type + return std::string(packageName + std::string(":") + className); + } private: - friend class ManagementAgent; + friend class ManagementAgent; std::string packageName; std::string className; @@ -280,7 +284,7 @@ private: // // Protected by addLock // - ManagementObjectMap newManagementObjects; + ManagementObjectVector newManagementObjects; framing::Uuid uuid; diff --git a/qpid/cpp/src/qpid/management/ManagementObject.cpp b/qpid/cpp/src/qpid/management/ManagementObject.cpp index cfdd58ed53..b4d469afbe 100644 --- a/qpid/cpp/src/qpid/management/ManagementObject.cpp +++ b/qpid/cpp/src/qpid/management/ManagementObject.cpp @@ -187,10 +187,6 @@ void ObjectId::setV2Key(const ManagementObject& object) v2Key = oname.str(); } -void ObjectId::disambiguate() -{ - v2Key = v2Key + "_"; -} // encode as V2-format map void ObjectId::mapEncode(types::Variant::Map& map) const diff --git a/qpid/cpp/src/tests/BrokerMgmtAgent.cpp b/qpid/cpp/src/tests/BrokerMgmtAgent.cpp index 80bd590d7d..d0c6668b72 100644 --- a/qpid/cpp/src/tests/BrokerMgmtAgent.cpp +++ b/qpid/cpp/src/tests/BrokerMgmtAgent.cpp @@ -24,9 +24,13 @@ #include "qpid/management/Buffer.h" #include "qpid/messaging/Message.h" #include "qpid/amqp_0_10/Codecs.h" +#include "qpid/log/Logger.h" +#include "qpid/log/Options.h" #include "qmf/org/apache/qpid/broker/mgmt/test/TestObject.h" +#include + using qpid::management::Mutex; using qpid::management::Manageable; @@ -53,9 +57,10 @@ namespace qpid { MessagingFixture *mFix; public: - AgentFixture( unsigned int pubInterval=10, bool qmfV2=false ) + AgentFixture( unsigned int pubInterval=10, + bool qmfV2=false, + qpid::broker::Broker::Options opts = qpid::broker::Broker::Options()) { - qpid::broker::Broker::Options opts = qpid::broker::Broker::Options(); opts.enableMgmt=true; opts.qmf2Support=qmfV2; opts.mgmtPubInterval=pubInterval; @@ -99,12 +104,15 @@ namespace qpid { class TestManageable : public qpid::management::Manageable { management::ManagementObject* mgmtObj; + const std::string key; public: - TestManageable(management::ManagementAgent *agent) { + TestManageable(management::ManagementAgent *agent, std::string _key) + : key(_key) + { _qmf::TestObject *tmp = new _qmf::TestObject(agent, this); // seed it with some default values... - tmp->set_string1("This is a test string!"); + tmp->set_string1(key); tmp->set_bool1(true); qpid::types::Variant::Map vMap; vMap["one"] = qpid::types::Variant(1); @@ -118,8 +126,8 @@ namespace qpid { management::ManagementObject* GetManagementObject() const { return mgmtObj; }; static void validateTestObjectProperties(_qmf::TestObject& to) { - // verify the default values are as expected - BOOST_CHECK(to.get_string1() == std::string("This is a test string!")); + // verify the default values are as expected. We don't check 'string1', + // as it is the object key, and is unique for each object (no default value). BOOST_CHECK(to.get_bool1() == true); BOOST_CHECK(to.get_map1().size() == 3); qpid::types::Variant::Map mappy = to.get_map1(); @@ -200,7 +208,7 @@ namespace qpid { agent = fix->getBrokerAgent(); // create a manageable test object - TestManageable *tm = new TestManageable(agent); + TestManageable *tm = new TestManageable(agent, std::string("obj1")); uint32_t objLen = tm->GetManagementObject()->writePropertiesSize(); Receiver r1 = fix->createV1DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#"); @@ -262,7 +270,7 @@ namespace qpid { management::ManagementAgent* agent; agent = fix->getBrokerAgent(); - TestManageable *tm = new TestManageable(agent); + TestManageable *tm = new TestManageable(agent, std::string("obj2")); Receiver r1 = fix->createV2DataIndRcvr(tm->GetManagementObject()->getPackageName(), "#"); @@ -326,7 +334,7 @@ namespace qpid { agent = fix->getBrokerAgent(); // create a manageable test object - TestManageable *tm = new TestManageable(agent); + TestManageable *tm = new TestManageable(agent, std::string("myObj")); uint32_t objLen = tm->GetManagementObject()->writePropertiesSize(); Receiver r1 = fix->createV1DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#"); @@ -390,7 +398,7 @@ namespace qpid { agent = fix->getBrokerAgent(); // create a manageable test object - TestManageable *tm = new TestManageable(agent); + TestManageable *tm = new TestManageable(agent, std::string("anObj")); uint32_t objLen = tm->GetManagementObject()->writePropertiesSize(); Receiver r1 = fix->createV1DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#"); @@ -465,7 +473,7 @@ namespace qpid { agent = fix->getBrokerAgent(); // create a manageable test object - TestManageable *tm = new TestManageable(agent); + TestManageable *tm = new TestManageable(agent, std::string("objectifyMe")); // add, then immediately delete and export the object... @@ -496,7 +504,13 @@ namespace qpid { uint32_t objLen; for (size_t i = 0; i < objCount; i++) { - TestManageable *tm = new TestManageable(agent); + std::stringstream key; + key << "testobj-" << std::setfill('x') << std::setw(4) << i; + // (no, seriously, I didn't just do that.) + // Note well: we have to keep the key string length EXACTLY THE SAME + // FOR ALL OBJECTS, so objLen will be the same. Otherwise the + // decodeV1ObjectUpdates() will fail (v1 lacks explict encoded length). + TestManageable *tm = new TestManageable(agent, key.str()); objLen = tm->GetManagementObject()->writePropertiesSize(); agent->addObject(tm->GetManagementObject(), i + 1); tmv.push_back(tm); @@ -590,7 +604,7 @@ namespace qpid { for (size_t i = 0; i < objCount; i++) { std::stringstream key; key << "testobj-" << i; - TestManageable *tm = new TestManageable(agent); + TestManageable *tm = new TestManageable(agent, key.str()); objLen = tm->GetManagementObject()->writePropertiesSize(); agent->addObject(tm->GetManagementObject(), key.str()); tmv.push_back(tm); @@ -665,6 +679,112 @@ namespace qpid { delete fix; } + // See QPID-2997 + QPID_AUTO_TEST_CASE(v2RapidRestoreObj) + { + AgentFixture* fix = new AgentFixture(3, true); + management::ManagementAgent* agent; + agent = fix->getBrokerAgent(); + + // two objects, same ObjID + TestManageable *tm1 = new TestManageable(agent, std::string("obj2")); + TestManageable *tm2 = new TestManageable(agent, std::string("obj2")); + + Receiver r1 = fix->createV2DataIndRcvr(tm1->GetManagementObject()->getPackageName(), "#"); + + // add, then immediately delete and re-add a copy of the object + agent->addObject(tm1->GetManagementObject(), "testobj-1"); + tm1->GetManagementObject()->resourceDestroy(); + agent->addObject(tm2->GetManagementObject(), "testobj-1"); + + // expect: a delete notification, then an update notification + TestObjectVector objs; + bool isDeleted = false; + bool isAdvertised = false; + size_t count = 0; + Message m1; + while (r1.fetch(m1, Duration::SECOND * 6)) { + + decodeV2ObjectUpdates(m1, objs); + BOOST_CHECK(objs.size() > 0); + + for (TestObjectVector::iterator oIter = objs.begin(); oIter != objs.end(); oIter++) { + count++; + TestManageable::validateTestObjectProperties(**oIter); + + qpid::types::Variant::Map mappy; + (*oIter)->writeTimestamps(mappy); + if (mappy["_delete_ts"].asUint64() != 0) { + isDeleted = true; + BOOST_CHECK(isAdvertised == false); // delete must be first + } else { + isAdvertised = true; + BOOST_CHECK(isDeleted == true); // delete must be first + } + } + } + + BOOST_CHECK(isDeleted); + BOOST_CHECK(isAdvertised); + BOOST_CHECK(count == 2); + + r1.close(); + delete fix; + delete tm1; + delete tm2; + } + + // See QPID-2997 + QPID_AUTO_TEST_CASE(v2DuplicateErrorObj) + { + AgentFixture* fix = new AgentFixture(3, true); + management::ManagementAgent* agent; + agent = fix->getBrokerAgent(); + + // turn off the expected error log message + qpid::log::Options logOpts; + logOpts.selectors.clear(); + logOpts.selectors.push_back("critical+"); + qpid::log::Logger::instance().configure(logOpts); + + // two objects, same ObjID + TestManageable *tm1 = new TestManageable(agent, std::string("obj2")); + TestManageable *tm2 = new TestManageable(agent, std::string("obj2")); + // Keep a pointer to the ManagementObject. This test simulates a user-caused error + // case (duplicate objects) where the broker has no choice but to leak a management + // object (safest assumption). To prevent valgrind from flagging this leak, we + // manually clean up the object at the end of the test. + management::ManagementObject *save = tm2->GetManagementObject(); + + Receiver r1 = fix->createV2DataIndRcvr(tm1->GetManagementObject()->getPackageName(), "#"); + + // add, then immediately delete and re-add a copy of the object + agent->addObject(tm1->GetManagementObject(), "testobj-1"); + agent->addObject(tm2->GetManagementObject(), "testobj-1"); + + TestObjectVector objs; + size_t count = 0; + Message m1; + while (r1.fetch(m1, Duration::SECOND * 6)) { + + decodeV2ObjectUpdates(m1, objs); + BOOST_CHECK(objs.size() > 0); + + for (TestObjectVector::iterator oIter = objs.begin(); oIter != objs.end(); oIter++) { + count++; + TestManageable::validateTestObjectProperties(**oIter); + } + } + + BOOST_CHECK(count == 1); // only one should be accepted. + + r1.close(); + delete fix; + delete tm1; + delete tm2; + delete save; + } + QPID_AUTO_TEST_SUITE_END() } } -- cgit v1.2.1