summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/management
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2010-12-01 21:31:36 +0000
committerAlan Conway <aconway@apache.org>2010-12-01 21:31:36 +0000
commitc7ffb7a9c913627a4104823e5384144b348d7623 (patch)
tree5465163d0c5797ba347cef532cab0a380ccbd218 /qpid/cpp/src/qpid/management
parent4101090c274118708f0183b436f3de68f1a32277 (diff)
downloadqpid-python-c7ffb7a9c913627a4104823e5384144b348d7623.tar.gz
Modified cluster_tests causes broker shut down with invalid-argument error.
Described in https://bugzilla.redhat.com/show_bug.cgi?id=655078. The management agent's deleted-object list was not being replicated to new members joining the cluster, so management generated fewer deleted object notifications on the newer member, causing it to fail with an invalid-argument error. The list is now being replicated correctly. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1041181 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qpid/management')
-rw-r--r--qpid/cpp/src/qpid/management/ManagementAgent.cpp315
-rw-r--r--qpid/cpp/src/qpid/management/ManagementAgent.h41
2 files changed, 347 insertions, 9 deletions
diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
index 11e65efd64..6295f56226 100644
--- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp
+++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
@@ -655,6 +655,11 @@ void ManagementAgent::periodicProcessing (void)
iter != managementObjects.end();
iter++) {
ManagementObject* object = iter->second;
+
+ if (object->isDeleted()) {
+ deleteList.push_back(pair<ObjectId, ManagementObject*>(iter->first, object));
+ }
+
object->setFlags(0);
if (clientWasAdded) {
object->setForcePublish(true);
@@ -663,6 +668,52 @@ void ManagementAgent::periodicProcessing (void)
clientWasAdded = false;
+ // Remove Deleted objects, and save for later publishing...
+ //
+ for (list<pair<ObjectId, ManagementObject*> >::reverse_iterator iter = deleteList.rbegin();
+ iter != deleteList.rend();
+ iter++) {
+
+ ManagementObject* delObj = iter->second;
+ DeletedObject::shared_ptr dptr(new DeletedObject());
+ std::string classkey(delObj->getPackageName() + std::string(":") + delObj->getClassName());
+ bool send_stats = (delObj->hasInst() && (delObj->getInstChanged() || delObj->getForcePublish()));
+
+ dptr->packageName = delObj->getPackageName();
+ dptr->className = delObj->getClassName();
+ delObj->getObjectId().encode(dptr->objectId);
+
+ if (qmf1Support) {
+ delObj->writeProperties(dptr->encodedV1Config);
+ if (send_stats) {
+ delObj->writeStatistics(dptr->encodedV1Inst);
+ }
+ }
+
+ if (qmf2Support) {
+ Variant::Map map_;
+ Variant::Map values;
+ Variant::Map oid;
+
+ delObj->getObjectId().mapEncode(oid);
+ map_["_object_id"] = oid;
+ map_["_schema_id"] = mapEncodeSchemaId(delObj->getPackageName(),
+ delObj->getClassName(),
+ "_data",
+ delObj->getMd5Sum());
+ delObj->writeTimestamps(map_);
+ delObj->mapEncodeValues(values, true, send_stats);
+ map_["_values"] = values;
+
+ dptr->encodedV2 = map_;
+ }
+
+ pendingDeletedObjs[classkey].push_back(dptr);
+
+ delete iter->second;
+ managementObjects.erase(iter->first);
+ }
+
//
// Process the entire object map. Remember: we drop the userLock each time we call
// sendBuffer(). This allows the managementObjects map to be altered during the
@@ -711,7 +762,13 @@ void ManagementAgent::periodicProcessing (void)
if (object->getConfigChanged() || object->getInstChanged())
object->setUpdateTime();
- send_props = (object->getConfigChanged() || object->getForcePublish() || object->isDeleted());
+ // skip any objects marked deleted since our first pass. Deal with them
+ // on the next periodic cycle...
+ if (object->isDeleted()) {
+ continue;
+ }
+
+ send_props = (object->getConfigChanged() || object->getForcePublish());
send_stats = (object->hasInst() && (object->getInstChanged() || object->getForcePublish()));
if (send_props && qmf1Support) {
@@ -749,8 +806,6 @@ void ManagementAgent::periodicProcessing (void)
if (send_props) pcount++;
if (send_stats) scount++;
- if (object->isDeleted())
- deleteList.push_back(pair<ObjectId, ManagementObject*>(iter->first, object));
object->setForcePublish(false);
if ((qmf1Support && (msgBuffer.available() < HEADROOM)) ||
@@ -796,12 +851,114 @@ void ManagementAgent::periodicProcessing (void)
}
} // end processing updates for all objects
- // Delete flagged objects
- for (list<pair<ObjectId, ManagementObject*> >::reverse_iterator iter = deleteList.rbegin();
- iter != deleteList.rend();
- iter++) {
- delete iter->second;
- managementObjects.erase(iter->first);
+
+ // now send the pending deletes. Make a temporary copy of the pending deletes so dropping the
+ // lock when the buffer is sent is safe.
+ //
+ if (!pendingDeletedObjs.empty()) {
+ PendingDeletedObjsMap tmp(pendingDeletedObjs);
+ pendingDeletedObjs.clear();
+
+ for (PendingDeletedObjsMap::iterator mIter = tmp.begin(); mIter != tmp.end(); mIter++) {
+ std::string packageName;
+ std::string className;
+ Buffer msgBuffer(msgChars, BUFSIZE);
+ uint32_t v1Objs = 0;
+ uint32_t v2Objs = 0;
+ Variant::List list_;
+
+ size_t pos = mIter->first.find(":");
+ packageName = mIter->first.substr(0, pos);
+ className = mIter->first.substr(pos+1);
+
+ for (DeletedObjectList::iterator lIter = mIter->second.begin();
+ lIter != mIter->second.end(); lIter++) {
+
+ if (!(*lIter)->encodedV1Config.empty()) {
+ encodeHeader(msgBuffer, 'c');
+ msgBuffer.putRawData((*lIter)->encodedV1Config);
+ v1Objs++;
+ }
+ if (!(*lIter)->encodedV1Inst.empty()) {
+ encodeHeader(msgBuffer, 'i');
+ msgBuffer.putRawData((*lIter)->encodedV1Inst);
+ v1Objs++;
+ }
+ if (v1Objs && msgBuffer.available() < HEADROOM) {
+ v1Objs = 0;
+ contentSize = BUFSIZE - msgBuffer.available();
+ stringstream key;
+ key << "console.obj.1.0." << packageName << "." << className;
+ msgBuffer.reset();
+ sendBufferLH(msgBuffer, contentSize, mExchange, key.str()); // UNLOCKS USERLOCK
+ QPID_LOG(trace, "SEND V1 Multicast ContentInd V1 (delete) to=" << key.str());
+ }
+
+ if (!(*lIter)->encodedV2.empty()) {
+ list_.push_back((*lIter)->encodedV2);
+ if (++v2Objs >= maxV2ReplyObjs) {
+ v2Objs = 0;
+
+ string content;
+ ListCodec::encode(list_, content);
+ list_.clear();
+ if (content.length()) {
+ stringstream key;
+ Variant::Map headers;
+ key << "agent.ind.data." << keyifyNameStr(packageName)
+ << "." << keyifyNameStr(className)
+ << "." << vendorNameKey
+ << "." << productNameKey;
+ if (!instanceNameKey.empty())
+ key << "." << instanceNameKey;
+
+ headers["method"] = "indication";
+ headers["qmf.opcode"] = "_data_indication";
+ headers["qmf.content"] = "_data";
+ headers["qmf.agent"] = name_address;
+
+ sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str()); // UNLOCKS USERLOCK
+ QPID_LOG(trace, "SEND Multicast ContentInd V2 (delete) to=" << key.str() << " len=" << content.length());
+ }
+ }
+ }
+ } // end current list
+
+ // send any remaining objects...
+
+ if (v1Objs) {
+ contentSize = BUFSIZE - msgBuffer.available();
+ stringstream key;
+ key << "console.obj.1.0." << packageName << "." << className;
+ msgBuffer.reset();
+ sendBufferLH(msgBuffer, contentSize, mExchange, key.str()); // UNLOCKS USERLOCK
+ QPID_LOG(trace, "SEND V1 Multicast ContentInd V1 (delete) to=" << key.str());
+ }
+
+ if (!list_.empty()) {
+ string content;
+ ListCodec::encode(list_, content);
+ list_.clear();
+ if (content.length()) {
+ stringstream key;
+ Variant::Map headers;
+ key << "agent.ind.data." << keyifyNameStr(packageName)
+ << "." << keyifyNameStr(className)
+ << "." << vendorNameKey
+ << "." << productNameKey;
+ if (!instanceNameKey.empty())
+ key << "." << instanceNameKey;
+
+ headers["method"] = "indication";
+ headers["qmf.opcode"] = "_data_indication";
+ headers["qmf.content"] = "_data";
+ headers["qmf.agent"] = name_address;
+
+ sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str()); // UNLOCKS USERLOCK
+ QPID_LOG(trace, "SEND Multicast ContentInd V2 (delete) to=" << key.str() << " len=" << content.length());
+ }
+ }
+ } // end map
}
if (!deleteList.empty()) {
@@ -2700,3 +2857,143 @@ Variant ManagementAgent::toVariant(const boost::shared_ptr<FieldValue>& in)
return out;
}
+
+// Build up a list of the current set of deleted objects that are pending their
+// next (last) publish-ment.
+void ManagementAgent::exportDeletedObjects(DeletedObjectList& outList)
+{
+ sys::Mutex::ScopedLock lock (userLock);
+ list<pair<ObjectId, ManagementObject*> > deleteList;
+
+ moveNewObjectsLH();
+
+ for (ManagementObjectMap::iterator iter = managementObjects.begin();
+ iter != managementObjects.end();
+ iter++) {
+ ManagementObject* object = iter->second;
+
+ if (object->isDeleted()) {
+ deleteList.push_back(pair<ObjectId, ManagementObject*>(iter->first, object));
+ }
+ }
+
+ // Remove Deleted objects, and save for later publishing...
+ //
+ for (list<pair<ObjectId, ManagementObject*> >::reverse_iterator iter = deleteList.rbegin();
+ iter != deleteList.rend();
+ iter++) {
+
+ ManagementObject* delObj = iter->second;
+ DeletedObject::shared_ptr dptr(new DeletedObject());
+ std::string classkey(delObj->getPackageName() + std::string(":") + delObj->getClassName());
+ bool send_stats = (delObj->hasInst() && (delObj->getInstChanged() || delObj->getForcePublish()));
+
+ dptr->packageName = delObj->getPackageName();
+ dptr->className = delObj->getClassName();
+ delObj->getObjectId().encode(dptr->objectId);
+
+ if (qmf1Support) {
+ delObj->writeProperties(dptr->encodedV1Config);
+ if (send_stats) {
+ delObj->writeStatistics(dptr->encodedV1Inst);
+ }
+ }
+
+ if (qmf2Support) {
+ Variant::Map map_;
+ Variant::Map values;
+ Variant::Map oid;
+
+ delObj->getObjectId().mapEncode(oid);
+ map_["_object_id"] = oid;
+ map_["_schema_id"] = mapEncodeSchemaId(delObj->getPackageName(),
+ delObj->getClassName(),
+ "_data",
+ delObj->getMd5Sum());
+ delObj->writeTimestamps(map_);
+ delObj->mapEncodeValues(values, true, send_stats);
+ map_["_values"] = values;
+
+ dptr->encodedV2 = map_;
+ }
+
+ pendingDeletedObjs[classkey].push_back(dptr);
+
+ delete iter->second;
+ managementObjects.erase(iter->first);
+ }
+
+ // now copy the pending deletes into the outList
+
+ for (PendingDeletedObjsMap::iterator mIter = pendingDeletedObjs.begin();
+ mIter != pendingDeletedObjs.end(); mIter++) {
+ for (DeletedObjectList::iterator lIter = mIter->second.begin();
+ lIter != mIter->second.end(); lIter++) {
+ outList.push_back(*lIter);
+ }
+ }
+}
+
+
+// Merge this list's deleted objects to the management Agent's list of deleted
+// objects waiting for next (last) publish-ment.
+void ManagementAgent::importDeletedObjects(const DeletedObjectList& inList)
+{
+ sys::Mutex::ScopedLock lock (userLock);
+
+ for (DeletedObjectList::const_iterator lIter = inList.begin(); lIter != inList.end(); lIter++) {
+
+ std::string classkey((*lIter)->packageName + std::string(":") + (*lIter)->className);
+ DeletedObjectList& dList = pendingDeletedObjs[classkey];
+
+ // not sure if this is necessary - merge by objectid....
+ bool found = false;
+ for (DeletedObjectList::iterator dIter = dList.begin(); dIter != dList.end(); dIter++) {
+ if ((*dIter)->objectId == (*lIter)->objectId) {
+ found = true;
+ break;
+ }
+ }
+ if (!found) {
+ dList.push_back(*lIter);
+ }
+ }
+}
+
+
+// construct a DeletedObject from an encoded representation. Used by
+// clustering to move deleted objects between clustered brokers. See
+// DeletedObject::encode() for the reverse.
+ManagementAgent::DeletedObject::DeletedObject(const std::string& encoded)
+{
+ qpid::types::Variant::Map map_;
+ MapCodec::decode(encoded, map_);
+
+ packageName = map_["_package_name"].getString();
+ className = map_["_class_name"].getString();
+ objectId = map_["_object_id"].getString();
+
+ encodedV1Config = map_["_v1_config"].getString();
+ encodedV1Inst = map_["_v1_inst"].getString();
+ encodedV2 = map_["_v2_data"].asMap();
+}
+
+
+// encode a DeletedObject to a string buffer. Used by
+// clustering to move deleted objects between clustered brokers. See
+// DeletedObject(const std::string&) for the reverse.
+void ManagementAgent::DeletedObject::encode(std::string& toBuffer)
+{
+ qpid::types::Variant::Map map_;
+
+
+ map_["_package_name"] = packageName;
+ map_["_class_name"] = className;
+ map_["_object_id"] = objectId;
+
+ map_["_v1_config"] = encodedV1Config;
+ map_["_v1_inst"] = encodedV1Inst;
+ map_["_v2_data"] = encodedV2;
+
+ MapCodec::encode(map_, toBuffer);
+}
diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.h b/qpid/cpp/src/qpid/management/ManagementAgent.h
index f4d3c8c299..9829094a0f 100644
--- a/qpid/cpp/src/qpid/management/ManagementAgent.h
+++ b/qpid/cpp/src/qpid/management/ManagementAgent.h
@@ -148,6 +148,40 @@ public:
static boost::shared_ptr<framing::FieldValue> toFieldValue(const types::Variant& in);
static types::Variant toVariant(const boost::shared_ptr<framing::FieldValue>& val);
+ // For Clustering: management objects that have been marked as
+ // "deleted", but are waiting for their last published object
+ // update are not visible to the cluster replication code. These
+ // interfaces allow clustering to gather up all the management
+ // objects that are deleted in order to allow all clustered
+ // brokers to publish the same set of deleted objects.
+
+ class DeletedObject {
+ public:
+ typedef boost::shared_ptr<DeletedObject> shared_ptr;
+ DeletedObject() {};
+ DeletedObject( const std::string &encoded );
+ ~DeletedObject() {};
+ void encode( std::string& toBuffer );
+
+ private:
+ friend class ManagementAgent;
+
+ std::string packageName;
+ std::string className;
+ std::string objectId;
+
+ std::string encodedV1Config; // qmfv1 properties
+ std::string encodedV1Inst; // qmfv1 statistics
+ qpid::types::Variant::Map encodedV2;
+ };
+
+ typedef std::vector<DeletedObject::shared_ptr> DeletedObjectList;
+
+ /** returns a snapshot of all currently deleted management objects. */
+ void exportDeletedObjects( DeletedObjectList& outList );
+
+ /** Import a list of deleted objects to send on next publish interval. */
+ void importDeletedObjects( const DeletedObjectList& inList );
private:
struct Periodic : public qpid::sys::TimerTask
@@ -293,6 +327,13 @@ private:
// message.
uint32_t maxV2ReplyObjs;
+ // list of objects that have been deleted, but have yet to be published
+ // one final time.
+ // Indexed by a string composed of the object's package and class name.
+ // Protected by userLock.
+ typedef std::map<std::string, DeletedObjectList> PendingDeletedObjsMap;
+ PendingDeletedObjsMap pendingDeletedObjs;
+
# define MA_BUFFER_SIZE 65536
char inputBuffer[MA_BUFFER_SIZE];
char outputBuffer[MA_BUFFER_SIZE];