summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2011-01-18 14:51:31 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2011-01-18 14:51:31 +0000
commit54895c3ce0a66a4630289bfeb6ed4f86516e784e (patch)
tree8003b27438adf77ae7262e939227f858a81d78b1
parentf425c029118b8f2404fea12e08875f48d0e0b720 (diff)
downloadqpid-python-54895c3ce0a66a4630289bfeb6ed4f86516e784e.tar.gz
QPID-2997: remove oid disambiguation, re-order mgmt object status updates.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1060401 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/include/qpid/management/ManagementObject.h1
-rw-r--r--qpid/cpp/src/qpid/management/ManagementAgent.cpp403
-rw-r--r--qpid/cpp/src/qpid/management/ManagementAgent.h10
-rw-r--r--qpid/cpp/src/qpid/management/ManagementObject.cpp4
-rw-r--r--qpid/cpp/src/tests/BrokerMgmtAgent.cpp146
5 files changed, 365 insertions, 199 deletions
diff --git a/qpid/cpp/include/qpid/management/ManagementObject.h b/qpid/cpp/include/qpid/management/ManagementObject.h
index dec5a63ee9..747edda150 100644
--- a/qpid/cpp/include/qpid/management/ManagementObject.h
+++ b/qpid/cpp/include/qpid/management/ManagementObject.h
@@ -82,7 +82,6 @@ public:
QPID_COMMON_EXTERN bool equalV1(const ObjectId &other) const;
QPID_COMMON_EXTERN void setV2Key(const std::string& _key) { v2Key = _key; }
QPID_COMMON_EXTERN void setV2Key(const ManagementObject& object);
- QPID_COMMON_EXTERN void disambiguate();
QPID_COMMON_EXTERN void setAgentName(const std::string& _name) { agentName = _name; }
QPID_COMMON_EXTERN const std::string& getAgentName() const { return agentName; }
QPID_COMMON_EXTERN const std::string& getV2Key() const { return v2Key; }
diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
index cb33887fc8..7459ac9416 100644
--- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp
+++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
@@ -306,12 +306,7 @@ ObjectId ManagementAgent::addObject(ManagementObject* object, uint64_t persistId
{
sys::Mutex::ScopedLock lock(addLock);
- ManagementObjectMap::iterator destIter = newManagementObjects.find(objId);
- while (destIter != newManagementObjects.end()) {
- objId.disambiguate();
- destIter = newManagementObjects.find(objId);
- }
- newManagementObjects[objId] = object;
+ newManagementObjects.push_back(object);
}
QPID_LOG(debug, "Management object (V1) added: " << objId.getV2Key());
return objId;
@@ -337,12 +332,7 @@ ObjectId ManagementAgent::addObject(ManagementObject* object,
object->setObjectId(objId);
{
sys::Mutex::ScopedLock lock(addLock);
- ManagementObjectMap::iterator destIter = newManagementObjects.find(objId);
- while (destIter != newManagementObjects.end()) {
- objId.disambiguate();
- destIter = newManagementObjects.find(objId);
- }
- newManagementObjects[objId] = object;
+ newManagementObjects.push_back(object);
}
QPID_LOG(debug, "Management object added: " << objId.getV2Key());
return objId;
@@ -621,22 +611,50 @@ void ManagementAgent::sendBufferLH(const string& data,
}
+/** Objects that have been added since the last periodic poll are temporarily
+ * saved in the newManagementObjects list. This allows objects to be
+ * added without needing to block on the userLock (addLock is used instead).
+ * These new objects need to be integrated into the object database
+ * (managementObjects) *before* they can be properly managed. This routine
+ * performs the integration.
+ *
+ * Note well: objects on the newManagementObjects list may have been
+ * marked as "deleted", and, possibly re-added. This would result in
+ * duplicate object ids. To avoid clashes, don't put deleted objects
+ * into the active object database.
+ */
void ManagementAgent::moveNewObjectsLH()
{
sys::Mutex::ScopedLock lock (addLock);
- for (ManagementObjectMap::iterator iter = newManagementObjects.begin ();
- iter != newManagementObjects.end ();
- iter++) {
- ObjectId oid = iter->first;
- ManagementObjectMap::iterator destIter = managementObjects.find(oid);
- while (destIter != managementObjects.end()) {
- oid.disambiguate();
- destIter = managementObjects.find(oid);
- }
+ while (!newManagementObjects.empty()) {
+ ManagementObject *object = newManagementObjects.back();
+ newManagementObjects.pop_back();
- managementObjects[oid] = iter->second;
+ if (object->isDeleted()) {
+ DeletedObject::shared_ptr dptr(new DeletedObject(object, qmf1Support, qmf2Support));
+ pendingDeletedObjs[dptr->getKey()].push_back(dptr);
+ delete object;
+ } else { // add to active object list, check for duplicates.
+ ObjectId oid = object->getObjectId();
+ ManagementObjectMap::iterator destIter = managementObjects.find(oid);
+ if (destIter != managementObjects.end()) {
+ // duplicate found. It is OK if the old object has been marked
+ // deleted...
+ ManagementObject *oldObj = destIter->second;
+ if (oldObj->isDeleted()) {
+ DeletedObject::shared_ptr dptr(new DeletedObject(oldObj, qmf1Support, qmf2Support));
+ pendingDeletedObjs[dptr->getKey()].push_back(dptr);
+ delete oldObj;
+ } else {
+ // Duplicate non-deleted objects? This is a user error - oids must be unique.
+ // for now, leak the old object (safer than deleting - may still be referenced)
+ // and complain loudly...
+ QPID_LOG(error, "Detected two management objects with the same identifier: " << oid);
+ }
+ }
+ managementObjects[oid] = object;
+ }
}
- newManagementObjects.clear();
}
void ManagementAgent::periodicProcessing (void)
@@ -670,7 +688,126 @@ void ManagementAgent::periodicProcessing (void)
clientWasAdded = false;
+ // first send the pending deletes before sending updates. This prevents a
+ // "false delete" scenario: if an object was deleted then re-added during
+ // the last poll cycle, it will have a delete entry and an active entry.
+ // if we sent the active update first, _then_ the delete update, clients
+ // would incorrectly think the object was deleted. See QPID-2997
+ //
bool objectsDeleted = moveDeletedObjectsLH();
+ if (!pendingDeletedObjs.empty()) {
+ // use a temporary copy of the pending deletes so dropping the lock when
+ // the buffer is sent is safe.
+ PendingDeletedObjsMap tmp(pendingDeletedObjs);
+ pendingDeletedObjs.clear();
+
+ for (PendingDeletedObjsMap::iterator mIter = tmp.begin(); mIter != tmp.end(); mIter++) {
+ std::string packageName;
+ std::string className;
+ Buffer msgBuffer(msgChars, BUFSIZE);
+ uint32_t v1Objs = 0;
+ uint32_t v2Objs = 0;
+ Variant::List list_;
+
+ size_t pos = mIter->first.find(":");
+ packageName = mIter->first.substr(0, pos);
+ className = mIter->first.substr(pos+1);
+
+ for (DeletedObjectList::iterator lIter = mIter->second.begin();
+ lIter != mIter->second.end(); lIter++) {
+ std::string oid = (*lIter)->objectId;
+ if (!(*lIter)->encodedV1Config.empty()) {
+ encodeHeader(msgBuffer, 'c');
+ msgBuffer.putRawData((*lIter)->encodedV1Config);
+ QPID_LOG(trace, "Deleting V1 properties " << oid
+ << " len=" << (*lIter)->encodedV1Config.size());
+ v1Objs++;
+ }
+ if (!(*lIter)->encodedV1Inst.empty()) {
+ encodeHeader(msgBuffer, 'i');
+ msgBuffer.putRawData((*lIter)->encodedV1Inst);
+ QPID_LOG(trace, "Deleting V1 statistics " << oid
+ << " len=" << (*lIter)->encodedV1Inst.size());
+ v1Objs++;
+ }
+ if (v1Objs && msgBuffer.available() < HEADROOM) {
+ v1Objs = 0;
+ contentSize = BUFSIZE - msgBuffer.available();
+ stringstream key;
+ key << "console.obj.1.0." << packageName << "." << className;
+ msgBuffer.reset();
+ sendBufferLH(msgBuffer, contentSize, mExchange, key.str()); // UNLOCKS USERLOCK
+ QPID_LOG(debug, "SEND V1 Multicast ContentInd V1 (delete) to="
+ << key.str() << " len=" << contentSize);
+ }
+
+ if (!(*lIter)->encodedV2.empty()) {
+ QPID_LOG(trace, "Deleting V2 " << "map=" << (*lIter)->encodedV2);
+ list_.push_back((*lIter)->encodedV2);
+ if (++v2Objs >= maxV2ReplyObjs) {
+ v2Objs = 0;
+
+ string content;
+ ListCodec::encode(list_, content);
+ list_.clear();
+ if (content.length()) {
+ stringstream key;
+ Variant::Map headers;
+ key << "agent.ind.data." << keyifyNameStr(packageName)
+ << "." << keyifyNameStr(className)
+ << "." << vendorNameKey
+ << "." << productNameKey;
+ if (!instanceNameKey.empty())
+ key << "." << instanceNameKey;
+
+ headers["method"] = "indication";
+ headers["qmf.opcode"] = "_data_indication";
+ headers["qmf.content"] = "_data";
+ headers["qmf.agent"] = name_address;
+
+ sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str()); // UNLOCKS USERLOCK
+ QPID_LOG(debug, "SEND Multicast ContentInd V2 (delete) to=" << key.str() << " len=" << content.length());
+ }
+ }
+ }
+ } // end current list
+
+ // send any remaining objects...
+
+ if (v1Objs) {
+ contentSize = BUFSIZE - msgBuffer.available();
+ stringstream key;
+ key << "console.obj.1.0." << packageName << "." << className;
+ msgBuffer.reset();
+ sendBufferLH(msgBuffer, contentSize, mExchange, key.str()); // UNLOCKS USERLOCK
+ QPID_LOG(debug, "SEND V1 Multicast ContentInd V1 (delete) to=" << key.str() << " len=" << contentSize);
+ }
+
+ if (!list_.empty()) {
+ string content;
+ ListCodec::encode(list_, content);
+ list_.clear();
+ if (content.length()) {
+ stringstream key;
+ Variant::Map headers;
+ key << "agent.ind.data." << keyifyNameStr(packageName)
+ << "." << keyifyNameStr(className)
+ << "." << vendorNameKey
+ << "." << productNameKey;
+ if (!instanceNameKey.empty())
+ key << "." << instanceNameKey;
+
+ headers["method"] = "indication";
+ headers["qmf.opcode"] = "_data_indication";
+ headers["qmf.content"] = "_data";
+ headers["qmf.agent"] = name_address;
+
+ sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str()); // UNLOCKS USERLOCK
+ QPID_LOG(debug, "SEND Multicast ContentInd V2 (delete) to=" << key.str() << " len=" << content.length());
+ }
+ }
+ } // end map
+ }
//
// Process the entire object map. Remember: we drop the userLock each time we call
@@ -828,122 +965,6 @@ void ManagementAgent::periodicProcessing (void)
}
} // end processing updates for all objects
-
- // now send the pending deletes. Make a temporary copy of the pending deletes so dropping the
- // lock when the buffer is sent is safe.
- //
- if (!pendingDeletedObjs.empty()) {
- PendingDeletedObjsMap tmp(pendingDeletedObjs);
- pendingDeletedObjs.clear();
-
- for (PendingDeletedObjsMap::iterator mIter = tmp.begin(); mIter != tmp.end(); mIter++) {
- std::string packageName;
- std::string className;
- Buffer msgBuffer(msgChars, BUFSIZE);
- uint32_t v1Objs = 0;
- uint32_t v2Objs = 0;
- Variant::List list_;
-
- size_t pos = mIter->first.find(":");
- packageName = mIter->first.substr(0, pos);
- className = mIter->first.substr(pos+1);
-
- for (DeletedObjectList::iterator lIter = mIter->second.begin();
- lIter != mIter->second.end(); lIter++) {
- std::string oid = (*lIter)->objectId;
- if (!(*lIter)->encodedV1Config.empty()) {
- encodeHeader(msgBuffer, 'c');
- msgBuffer.putRawData((*lIter)->encodedV1Config);
- QPID_LOG(trace, "Deleting V1 properties " << oid
- << " len=" << (*lIter)->encodedV1Config.size());
- v1Objs++;
- }
- if (!(*lIter)->encodedV1Inst.empty()) {
- encodeHeader(msgBuffer, 'i');
- msgBuffer.putRawData((*lIter)->encodedV1Inst);
- QPID_LOG(trace, "Deleting V1 statistics " << oid
- << " len=" << (*lIter)->encodedV1Inst.size());
- v1Objs++;
- }
- if (v1Objs && msgBuffer.available() < HEADROOM) {
- v1Objs = 0;
- contentSize = BUFSIZE - msgBuffer.available();
- stringstream key;
- key << "console.obj.1.0." << packageName << "." << className;
- msgBuffer.reset();
- sendBufferLH(msgBuffer, contentSize, mExchange, key.str()); // UNLOCKS USERLOCK
- QPID_LOG(debug, "SEND V1 Multicast ContentInd V1 (delete) to="
- << key.str() << " len=" << contentSize);
- }
-
- if (!(*lIter)->encodedV2.empty()) {
- QPID_LOG(trace, "Deleting V2 " << "map=" << (*lIter)->encodedV2);
- list_.push_back((*lIter)->encodedV2);
- if (++v2Objs >= maxV2ReplyObjs) {
- v2Objs = 0;
-
- string content;
- ListCodec::encode(list_, content);
- list_.clear();
- if (content.length()) {
- stringstream key;
- Variant::Map headers;
- key << "agent.ind.data." << keyifyNameStr(packageName)
- << "." << keyifyNameStr(className)
- << "." << vendorNameKey
- << "." << productNameKey;
- if (!instanceNameKey.empty())
- key << "." << instanceNameKey;
-
- headers["method"] = "indication";
- headers["qmf.opcode"] = "_data_indication";
- headers["qmf.content"] = "_data";
- headers["qmf.agent"] = name_address;
-
- sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str()); // UNLOCKS USERLOCK
- QPID_LOG(debug, "SEND Multicast ContentInd V2 (delete) to=" << key.str() << " len=" << content.length());
- }
- }
- }
- } // end current list
-
- // send any remaining objects...
-
- if (v1Objs) {
- contentSize = BUFSIZE - msgBuffer.available();
- stringstream key;
- key << "console.obj.1.0." << packageName << "." << className;
- msgBuffer.reset();
- sendBufferLH(msgBuffer, contentSize, mExchange, key.str()); // UNLOCKS USERLOCK
- QPID_LOG(debug, "SEND V1 Multicast ContentInd V1 (delete) to=" << key.str() << " len=" << contentSize);
- }
-
- if (!list_.empty()) {
- string content;
- ListCodec::encode(list_, content);
- list_.clear();
- if (content.length()) {
- stringstream key;
- Variant::Map headers;
- key << "agent.ind.data." << keyifyNameStr(packageName)
- << "." << keyifyNameStr(className)
- << "." << vendorNameKey
- << "." << productNameKey;
- if (!instanceNameKey.empty())
- key << "." << instanceNameKey;
-
- headers["method"] = "indication";
- headers["qmf.opcode"] = "_data_indication";
- headers["qmf.content"] = "_data";
- headers["qmf.agent"] = name_address;
-
- sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str()); // UNLOCKS USERLOCK
- QPID_LOG(debug, "SEND Multicast ContentInd V2 (delete) to=" << key.str() << " len=" << content.length());
- }
- }
- } // end map
- }
-
if (objectsDeleted) deleteOrphanedAgentsLH();
// heartbeat generation
@@ -2619,13 +2640,24 @@ void ManagementAgent::importAgents(qpid::framing::Buffer& inBuf) {
}
namespace {
-bool isDeleted(const ManagementObjectMap::value_type& value) {
+bool isDeletedMap(const ManagementObjectMap::value_type& value) {
return value.second->isDeleted();
}
+bool isDeletedVector(const ManagementObjectVector::value_type& value) {
+ return value->isDeleted();
+}
+
string summarizeMap(const char* name, const ManagementObjectMap& map) {
ostringstream o;
- size_t deleted = std::count_if(map.begin(), map.end(), isDeleted);
+ size_t deleted = std::count_if(map.begin(), map.end(), isDeletedMap);
+ o << map.size() << " " << name << " (" << deleted << " deleted), ";
+ return o.str();
+}
+
+string summarizeVector(const char* name, const ManagementObjectVector& map) {
+ ostringstream o;
+ size_t deleted = std::count_if(map.begin(), map.end(), isDeletedVector);
o << map.size() << " " << name << " (" << deleted << " deleted), ";
return o.str();
}
@@ -2639,6 +2671,15 @@ string dumpMap(const ManagementObjectMap& map) {
return o.str();
}
+string dumpVector(const ManagementObjectVector& map) {
+ ostringstream o;
+ for (ManagementObjectVector::const_iterator i = map.begin(); i != map.end(); ++i) {
+ o << endl << " " << (*i)->getObjectId().getV2Key()
+ << ((*i)->isDeleted() ? " (deleted)" : "");
+ }
+ return o.str();
+}
+
} // namespace
string ManagementAgent::summarizeAgents() {
@@ -2658,14 +2699,14 @@ void ManagementAgent::debugSnapshot(const char* title) {
QPID_LOG(debug, title << ": management snapshot: "
<< packages.size() << " packages, "
<< summarizeMap("objects", managementObjects)
- << summarizeMap("new objects ", newManagementObjects)
+ << summarizeVector("new objects ", newManagementObjects)
<< pendingDeletedObjs.size() << " pending deletes"
<< summarizeAgents());
QPID_LOG_IF(trace, managementObjects.size(),
title << ": objects" << dumpMap(managementObjects));
QPID_LOG_IF(trace, newManagementObjects.size(),
- title << ": new objects" << dumpMap(newManagementObjects));
+ title << ": new objects" << dumpVector(newManagementObjects));
}
Variant::Map ManagementAgent::toMap(const FieldTable& from)
@@ -2910,6 +2951,45 @@ void ManagementAgent::importDeletedObjects(const DeletedObjectList& inList)
}
+// construct a DeletedObject from a management object.
+ManagementAgent::DeletedObject::DeletedObject(ManagementObject *src, bool v1, bool v2)
+ : packageName(src->getPackageName()),
+ className(src->getClassName())
+{
+ bool send_stats = (src->hasInst() && (src->getInstChanged() || src->getForcePublish()));
+
+ stringstream oid;
+ oid << src->getObjectId();
+ objectId = oid.str();
+
+ if (v1) {
+ src->writeProperties(encodedV1Config);
+ if (send_stats) {
+ src->writeStatistics(encodedV1Inst);
+ }
+ }
+
+ if (v2) {
+ Variant::Map map_;
+ Variant::Map values;
+ Variant::Map oid;
+
+ src->getObjectId().mapEncode(oid);
+ map_["_object_id"] = oid;
+ map_["_schema_id"] = mapEncodeSchemaId(src->getPackageName(),
+ src->getClassName(),
+ "_data",
+ src->getMd5Sum());
+ src->writeTimestamps(map_);
+ src->mapEncodeValues(values, true, send_stats);
+ map_["_values"] = values;
+
+ encodedV2 = map_;
+ }
+}
+
+
+
// construct a DeletedObject from an encoded representation. Used by
// clustering to move deleted objects between clustered brokers. See
// DeletedObject::encode() for the reverse.
@@ -2966,42 +3046,9 @@ bool ManagementAgent::moveDeletedObjectsLH() {
{
ManagementObject* delObj = iter->second;
assert(delObj->isDeleted());
- DeletedObject::shared_ptr dptr(new DeletedObject());
- std::string classkey(delObj->getPackageName() + std::string(":") + delObj->getClassName());
- bool send_stats = (delObj->hasInst() && (delObj->getInstChanged() || delObj->getForcePublish()));
-
- dptr->packageName = delObj->getPackageName();
- dptr->className = delObj->getClassName();
- stringstream oid;
- oid << delObj->getObjectId();
- dptr->objectId = oid.str();
-
- if (qmf1Support) {
- delObj->writeProperties(dptr->encodedV1Config);
- if (send_stats) {
- delObj->writeStatistics(dptr->encodedV1Inst);
- }
- }
-
- if (qmf2Support) {
- Variant::Map map_;
- Variant::Map values;
- Variant::Map oid;
-
- delObj->getObjectId().mapEncode(oid);
- map_["_object_id"] = oid;
- map_["_schema_id"] = mapEncodeSchemaId(delObj->getPackageName(),
- delObj->getClassName(),
- "_data",
- delObj->getMd5Sum());
- delObj->writeTimestamps(map_);
- delObj->mapEncodeValues(values, true, send_stats);
- map_["_values"] = values;
-
- dptr->encodedV2 = map_;
- }
+ DeletedObject::shared_ptr dptr(new DeletedObject(delObj, qmf1Support, qmf2Support));
- pendingDeletedObjs[classkey].push_back(dptr);
+ pendingDeletedObjs[dptr->getKey()].push_back(dptr);
managementObjects.erase(iter->first);
delete iter->second;
}
diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.h b/qpid/cpp/src/qpid/management/ManagementAgent.h
index 87c39a67bd..2202e2fc98 100644
--- a/qpid/cpp/src/qpid/management/ManagementAgent.h
+++ b/qpid/cpp/src/qpid/management/ManagementAgent.h
@@ -159,13 +159,17 @@ public:
class DeletedObject {
public:
typedef boost::shared_ptr<DeletedObject> shared_ptr;
- DeletedObject() {};
+ DeletedObject(ManagementObject *, bool v1, bool v2);
DeletedObject( const std::string &encoded );
~DeletedObject() {};
void encode( std::string& toBuffer );
+ const std::string getKey() const {
+ // used to batch up objects of the same class type
+ return std::string(packageName + std::string(":") + className);
+ }
private:
- friend class ManagementAgent;
+ friend class ManagementAgent;
std::string packageName;
std::string className;
@@ -280,7 +284,7 @@ private:
//
// Protected by addLock
//
- ManagementObjectMap newManagementObjects;
+ ManagementObjectVector newManagementObjects;
framing::Uuid uuid;
diff --git a/qpid/cpp/src/qpid/management/ManagementObject.cpp b/qpid/cpp/src/qpid/management/ManagementObject.cpp
index cfdd58ed53..b4d469afbe 100644
--- a/qpid/cpp/src/qpid/management/ManagementObject.cpp
+++ b/qpid/cpp/src/qpid/management/ManagementObject.cpp
@@ -187,10 +187,6 @@ void ObjectId::setV2Key(const ManagementObject& object)
v2Key = oname.str();
}
-void ObjectId::disambiguate()
-{
- v2Key = v2Key + "_";
-}
// encode as V2-format map
void ObjectId::mapEncode(types::Variant::Map& map) const
diff --git a/qpid/cpp/src/tests/BrokerMgmtAgent.cpp b/qpid/cpp/src/tests/BrokerMgmtAgent.cpp
index 80bd590d7d..d0c6668b72 100644
--- a/qpid/cpp/src/tests/BrokerMgmtAgent.cpp
+++ b/qpid/cpp/src/tests/BrokerMgmtAgent.cpp
@@ -24,9 +24,13 @@
#include "qpid/management/Buffer.h"
#include "qpid/messaging/Message.h"
#include "qpid/amqp_0_10/Codecs.h"
+#include "qpid/log/Logger.h"
+#include "qpid/log/Options.h"
#include "qmf/org/apache/qpid/broker/mgmt/test/TestObject.h"
+#include <iomanip>
+
using qpid::management::Mutex;
using qpid::management::Manageable;
@@ -53,9 +57,10 @@ namespace qpid {
MessagingFixture *mFix;
public:
- AgentFixture( unsigned int pubInterval=10, bool qmfV2=false )
+ AgentFixture( unsigned int pubInterval=10,
+ bool qmfV2=false,
+ qpid::broker::Broker::Options opts = qpid::broker::Broker::Options())
{
- qpid::broker::Broker::Options opts = qpid::broker::Broker::Options();
opts.enableMgmt=true;
opts.qmf2Support=qmfV2;
opts.mgmtPubInterval=pubInterval;
@@ -99,12 +104,15 @@ namespace qpid {
class TestManageable : public qpid::management::Manageable
{
management::ManagementObject* mgmtObj;
+ const std::string key;
public:
- TestManageable(management::ManagementAgent *agent) {
+ TestManageable(management::ManagementAgent *agent, std::string _key)
+ : key(_key)
+ {
_qmf::TestObject *tmp = new _qmf::TestObject(agent, this);
// seed it with some default values...
- tmp->set_string1("This is a test string!");
+ tmp->set_string1(key);
tmp->set_bool1(true);
qpid::types::Variant::Map vMap;
vMap["one"] = qpid::types::Variant(1);
@@ -118,8 +126,8 @@ namespace qpid {
management::ManagementObject* GetManagementObject() const { return mgmtObj; };
static void validateTestObjectProperties(_qmf::TestObject& to)
{
- // verify the default values are as expected
- BOOST_CHECK(to.get_string1() == std::string("This is a test string!"));
+ // verify the default values are as expected. We don't check 'string1',
+ // as it is the object key, and is unique for each object (no default value).
BOOST_CHECK(to.get_bool1() == true);
BOOST_CHECK(to.get_map1().size() == 3);
qpid::types::Variant::Map mappy = to.get_map1();
@@ -200,7 +208,7 @@ namespace qpid {
agent = fix->getBrokerAgent();
// create a manageable test object
- TestManageable *tm = new TestManageable(agent);
+ TestManageable *tm = new TestManageable(agent, std::string("obj1"));
uint32_t objLen = tm->GetManagementObject()->writePropertiesSize();
Receiver r1 = fix->createV1DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#");
@@ -262,7 +270,7 @@ namespace qpid {
management::ManagementAgent* agent;
agent = fix->getBrokerAgent();
- TestManageable *tm = new TestManageable(agent);
+ TestManageable *tm = new TestManageable(agent, std::string("obj2"));
Receiver r1 = fix->createV2DataIndRcvr(tm->GetManagementObject()->getPackageName(), "#");
@@ -326,7 +334,7 @@ namespace qpid {
agent = fix->getBrokerAgent();
// create a manageable test object
- TestManageable *tm = new TestManageable(agent);
+ TestManageable *tm = new TestManageable(agent, std::string("myObj"));
uint32_t objLen = tm->GetManagementObject()->writePropertiesSize();
Receiver r1 = fix->createV1DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#");
@@ -390,7 +398,7 @@ namespace qpid {
agent = fix->getBrokerAgent();
// create a manageable test object
- TestManageable *tm = new TestManageable(agent);
+ TestManageable *tm = new TestManageable(agent, std::string("anObj"));
uint32_t objLen = tm->GetManagementObject()->writePropertiesSize();
Receiver r1 = fix->createV1DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#");
@@ -465,7 +473,7 @@ namespace qpid {
agent = fix->getBrokerAgent();
// create a manageable test object
- TestManageable *tm = new TestManageable(agent);
+ TestManageable *tm = new TestManageable(agent, std::string("objectifyMe"));
// add, then immediately delete and export the object...
@@ -496,7 +504,13 @@ namespace qpid {
uint32_t objLen;
for (size_t i = 0; i < objCount; i++) {
- TestManageable *tm = new TestManageable(agent);
+ std::stringstream key;
+ key << "testobj-" << std::setfill('x') << std::setw(4) << i;
+ // (no, seriously, I didn't just do that.)
+ // Note well: we have to keep the key string length EXACTLY THE SAME
+ // FOR ALL OBJECTS, so objLen will be the same. Otherwise the
+ // decodeV1ObjectUpdates() will fail (v1 lacks explict encoded length).
+ TestManageable *tm = new TestManageable(agent, key.str());
objLen = tm->GetManagementObject()->writePropertiesSize();
agent->addObject(tm->GetManagementObject(), i + 1);
tmv.push_back(tm);
@@ -590,7 +604,7 @@ namespace qpid {
for (size_t i = 0; i < objCount; i++) {
std::stringstream key;
key << "testobj-" << i;
- TestManageable *tm = new TestManageable(agent);
+ TestManageable *tm = new TestManageable(agent, key.str());
objLen = tm->GetManagementObject()->writePropertiesSize();
agent->addObject(tm->GetManagementObject(), key.str());
tmv.push_back(tm);
@@ -665,6 +679,112 @@ namespace qpid {
delete fix;
}
+ // See QPID-2997
+ QPID_AUTO_TEST_CASE(v2RapidRestoreObj)
+ {
+ AgentFixture* fix = new AgentFixture(3, true);
+ management::ManagementAgent* agent;
+ agent = fix->getBrokerAgent();
+
+ // two objects, same ObjID
+ TestManageable *tm1 = new TestManageable(agent, std::string("obj2"));
+ TestManageable *tm2 = new TestManageable(agent, std::string("obj2"));
+
+ Receiver r1 = fix->createV2DataIndRcvr(tm1->GetManagementObject()->getPackageName(), "#");
+
+ // add, then immediately delete and re-add a copy of the object
+ agent->addObject(tm1->GetManagementObject(), "testobj-1");
+ tm1->GetManagementObject()->resourceDestroy();
+ agent->addObject(tm2->GetManagementObject(), "testobj-1");
+
+ // expect: a delete notification, then an update notification
+ TestObjectVector objs;
+ bool isDeleted = false;
+ bool isAdvertised = false;
+ size_t count = 0;
+ Message m1;
+ while (r1.fetch(m1, Duration::SECOND * 6)) {
+
+ decodeV2ObjectUpdates(m1, objs);
+ BOOST_CHECK(objs.size() > 0);
+
+ for (TestObjectVector::iterator oIter = objs.begin(); oIter != objs.end(); oIter++) {
+ count++;
+ TestManageable::validateTestObjectProperties(**oIter);
+
+ qpid::types::Variant::Map mappy;
+ (*oIter)->writeTimestamps(mappy);
+ if (mappy["_delete_ts"].asUint64() != 0) {
+ isDeleted = true;
+ BOOST_CHECK(isAdvertised == false); // delete must be first
+ } else {
+ isAdvertised = true;
+ BOOST_CHECK(isDeleted == true); // delete must be first
+ }
+ }
+ }
+
+ BOOST_CHECK(isDeleted);
+ BOOST_CHECK(isAdvertised);
+ BOOST_CHECK(count == 2);
+
+ r1.close();
+ delete fix;
+ delete tm1;
+ delete tm2;
+ }
+
+ // See QPID-2997
+ QPID_AUTO_TEST_CASE(v2DuplicateErrorObj)
+ {
+ AgentFixture* fix = new AgentFixture(3, true);
+ management::ManagementAgent* agent;
+ agent = fix->getBrokerAgent();
+
+ // turn off the expected error log message
+ qpid::log::Options logOpts;
+ logOpts.selectors.clear();
+ logOpts.selectors.push_back("critical+");
+ qpid::log::Logger::instance().configure(logOpts);
+
+ // two objects, same ObjID
+ TestManageable *tm1 = new TestManageable(agent, std::string("obj2"));
+ TestManageable *tm2 = new TestManageable(agent, std::string("obj2"));
+ // Keep a pointer to the ManagementObject. This test simulates a user-caused error
+ // case (duplicate objects) where the broker has no choice but to leak a management
+ // object (safest assumption). To prevent valgrind from flagging this leak, we
+ // manually clean up the object at the end of the test.
+ management::ManagementObject *save = tm2->GetManagementObject();
+
+ Receiver r1 = fix->createV2DataIndRcvr(tm1->GetManagementObject()->getPackageName(), "#");
+
+ // add, then immediately delete and re-add a copy of the object
+ agent->addObject(tm1->GetManagementObject(), "testobj-1");
+ agent->addObject(tm2->GetManagementObject(), "testobj-1");
+
+ TestObjectVector objs;
+ size_t count = 0;
+ Message m1;
+ while (r1.fetch(m1, Duration::SECOND * 6)) {
+
+ decodeV2ObjectUpdates(m1, objs);
+ BOOST_CHECK(objs.size() > 0);
+
+ for (TestObjectVector::iterator oIter = objs.begin(); oIter != objs.end(); oIter++) {
+ count++;
+ TestManageable::validateTestObjectProperties(**oIter);
+ }
+ }
+
+ BOOST_CHECK(count == 1); // only one should be accepted.
+
+ r1.close();
+ delete fix;
+ delete tm1;
+ delete tm2;
+ delete save;
+ }
+
QPID_AUTO_TEST_SUITE_END()
}
}