diff options
author | Alan Conway <aconway@apache.org> | 2010-12-01 21:31:36 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2010-12-01 21:31:36 +0000 |
commit | c7ffb7a9c913627a4104823e5384144b348d7623 (patch) | |
tree | 5465163d0c5797ba347cef532cab0a380ccbd218 /qpid/cpp/src/qpid/management | |
parent | 4101090c274118708f0183b436f3de68f1a32277 (diff) | |
download | qpid-python-c7ffb7a9c913627a4104823e5384144b348d7623.tar.gz |
Modified cluster_tests causes broker shut down with invalid-argument error.
Described in https://bugzilla.redhat.com/show_bug.cgi?id=655078. The
management agent's deleted-object list was not being replicated to new
members joining the cluster, so management generated fewer deleted
object notifications on the newer member, causing it to fail with an
invalid-argument error. The list is now being replicated correctly.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1041181 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qpid/management')
-rw-r--r-- | qpid/cpp/src/qpid/management/ManagementAgent.cpp | 315 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/management/ManagementAgent.h | 41 |
2 files changed, 347 insertions, 9 deletions
diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp index 11e65efd64..6295f56226 100644 --- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp +++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp @@ -655,6 +655,11 @@ void ManagementAgent::periodicProcessing (void) iter != managementObjects.end(); iter++) { ManagementObject* object = iter->second; + + if (object->isDeleted()) { + deleteList.push_back(pair<ObjectId, ManagementObject*>(iter->first, object)); + } + object->setFlags(0); if (clientWasAdded) { object->setForcePublish(true); @@ -663,6 +668,52 @@ void ManagementAgent::periodicProcessing (void) clientWasAdded = false; + // Remove Deleted objects, and save for later publishing... + // + for (list<pair<ObjectId, ManagementObject*> >::reverse_iterator iter = deleteList.rbegin(); + iter != deleteList.rend(); + iter++) { + + ManagementObject* delObj = iter->second; + DeletedObject::shared_ptr dptr(new DeletedObject()); + std::string classkey(delObj->getPackageName() + std::string(":") + delObj->getClassName()); + bool send_stats = (delObj->hasInst() && (delObj->getInstChanged() || delObj->getForcePublish())); + + dptr->packageName = delObj->getPackageName(); + dptr->className = delObj->getClassName(); + delObj->getObjectId().encode(dptr->objectId); + + if (qmf1Support) { + delObj->writeProperties(dptr->encodedV1Config); + if (send_stats) { + delObj->writeStatistics(dptr->encodedV1Inst); + } + } + + if (qmf2Support) { + Variant::Map map_; + Variant::Map values; + Variant::Map oid; + + delObj->getObjectId().mapEncode(oid); + map_["_object_id"] = oid; + map_["_schema_id"] = mapEncodeSchemaId(delObj->getPackageName(), + delObj->getClassName(), + "_data", + delObj->getMd5Sum()); + delObj->writeTimestamps(map_); + delObj->mapEncodeValues(values, true, send_stats); + map_["_values"] = values; + + dptr->encodedV2 = map_; + } + + pendingDeletedObjs[classkey].push_back(dptr); + + delete iter->second; + managementObjects.erase(iter->first); + } + // // Process the entire object map. Remember: we drop the userLock each time we call // sendBuffer(). This allows the managementObjects map to be altered during the @@ -711,7 +762,13 @@ void ManagementAgent::periodicProcessing (void) if (object->getConfigChanged() || object->getInstChanged()) object->setUpdateTime(); - send_props = (object->getConfigChanged() || object->getForcePublish() || object->isDeleted()); + // skip any objects marked deleted since our first pass. Deal with them + // on the next periodic cycle... + if (object->isDeleted()) { + continue; + } + + send_props = (object->getConfigChanged() || object->getForcePublish()); send_stats = (object->hasInst() && (object->getInstChanged() || object->getForcePublish())); if (send_props && qmf1Support) { @@ -749,8 +806,6 @@ void ManagementAgent::periodicProcessing (void) if (send_props) pcount++; if (send_stats) scount++; - if (object->isDeleted()) - deleteList.push_back(pair<ObjectId, ManagementObject*>(iter->first, object)); object->setForcePublish(false); if ((qmf1Support && (msgBuffer.available() < HEADROOM)) || @@ -796,12 +851,114 @@ void ManagementAgent::periodicProcessing (void) } } // end processing updates for all objects - // Delete flagged objects - for (list<pair<ObjectId, ManagementObject*> >::reverse_iterator iter = deleteList.rbegin(); - iter != deleteList.rend(); - iter++) { - delete iter->second; - managementObjects.erase(iter->first); + + // now send the pending deletes. Make a temporary copy of the pending deletes so dropping the + // lock when the buffer is sent is safe. + // + if (!pendingDeletedObjs.empty()) { + PendingDeletedObjsMap tmp(pendingDeletedObjs); + pendingDeletedObjs.clear(); + + for (PendingDeletedObjsMap::iterator mIter = tmp.begin(); mIter != tmp.end(); mIter++) { + std::string packageName; + std::string className; + Buffer msgBuffer(msgChars, BUFSIZE); + uint32_t v1Objs = 0; + uint32_t v2Objs = 0; + Variant::List list_; + + size_t pos = mIter->first.find(":"); + packageName = mIter->first.substr(0, pos); + className = mIter->first.substr(pos+1); + + for (DeletedObjectList::iterator lIter = mIter->second.begin(); + lIter != mIter->second.end(); lIter++) { + + if (!(*lIter)->encodedV1Config.empty()) { + encodeHeader(msgBuffer, 'c'); + msgBuffer.putRawData((*lIter)->encodedV1Config); + v1Objs++; + } + if (!(*lIter)->encodedV1Inst.empty()) { + encodeHeader(msgBuffer, 'i'); + msgBuffer.putRawData((*lIter)->encodedV1Inst); + v1Objs++; + } + if (v1Objs && msgBuffer.available() < HEADROOM) { + v1Objs = 0; + contentSize = BUFSIZE - msgBuffer.available(); + stringstream key; + key << "console.obj.1.0." << packageName << "." << className; + msgBuffer.reset(); + sendBufferLH(msgBuffer, contentSize, mExchange, key.str()); // UNLOCKS USERLOCK + QPID_LOG(trace, "SEND V1 Multicast ContentInd V1 (delete) to=" << key.str()); + } + + if (!(*lIter)->encodedV2.empty()) { + list_.push_back((*lIter)->encodedV2); + if (++v2Objs >= maxV2ReplyObjs) { + v2Objs = 0; + + string content; + ListCodec::encode(list_, content); + list_.clear(); + if (content.length()) { + stringstream key; + Variant::Map headers; + key << "agent.ind.data." << keyifyNameStr(packageName) + << "." << keyifyNameStr(className) + << "." << vendorNameKey + << "." << productNameKey; + if (!instanceNameKey.empty()) + key << "." << instanceNameKey; + + headers["method"] = "indication"; + headers["qmf.opcode"] = "_data_indication"; + headers["qmf.content"] = "_data"; + headers["qmf.agent"] = name_address; + + sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str()); // UNLOCKS USERLOCK + QPID_LOG(trace, "SEND Multicast ContentInd V2 (delete) to=" << key.str() << " len=" << content.length()); + } + } + } + } // end current list + + // send any remaining objects... + + if (v1Objs) { + contentSize = BUFSIZE - msgBuffer.available(); + stringstream key; + key << "console.obj.1.0." << packageName << "." << className; + msgBuffer.reset(); + sendBufferLH(msgBuffer, contentSize, mExchange, key.str()); // UNLOCKS USERLOCK + QPID_LOG(trace, "SEND V1 Multicast ContentInd V1 (delete) to=" << key.str()); + } + + if (!list_.empty()) { + string content; + ListCodec::encode(list_, content); + list_.clear(); + if (content.length()) { + stringstream key; + Variant::Map headers; + key << "agent.ind.data." << keyifyNameStr(packageName) + << "." << keyifyNameStr(className) + << "." << vendorNameKey + << "." << productNameKey; + if (!instanceNameKey.empty()) + key << "." << instanceNameKey; + + headers["method"] = "indication"; + headers["qmf.opcode"] = "_data_indication"; + headers["qmf.content"] = "_data"; + headers["qmf.agent"] = name_address; + + sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str()); // UNLOCKS USERLOCK + QPID_LOG(trace, "SEND Multicast ContentInd V2 (delete) to=" << key.str() << " len=" << content.length()); + } + } + } // end map } if (!deleteList.empty()) { @@ -2700,3 +2857,143 @@ Variant ManagementAgent::toVariant(const boost::shared_ptr<FieldValue>& in) return out; } + +// Build up a list of the current set of deleted objects that are pending their +// next (last) publish-ment. +void ManagementAgent::exportDeletedObjects(DeletedObjectList& outList) +{ + sys::Mutex::ScopedLock lock (userLock); + list<pair<ObjectId, ManagementObject*> > deleteList; + + moveNewObjectsLH(); + + for (ManagementObjectMap::iterator iter = managementObjects.begin(); + iter != managementObjects.end(); + iter++) { + ManagementObject* object = iter->second; + + if (object->isDeleted()) { + deleteList.push_back(pair<ObjectId, ManagementObject*>(iter->first, object)); + } + } + + // Remove Deleted objects, and save for later publishing... + // + for (list<pair<ObjectId, ManagementObject*> >::reverse_iterator iter = deleteList.rbegin(); + iter != deleteList.rend(); + iter++) { + + ManagementObject* delObj = iter->second; + DeletedObject::shared_ptr dptr(new DeletedObject()); + std::string classkey(delObj->getPackageName() + std::string(":") + delObj->getClassName()); + bool send_stats = (delObj->hasInst() && (delObj->getInstChanged() || delObj->getForcePublish())); + + dptr->packageName = delObj->getPackageName(); + dptr->className = delObj->getClassName(); + delObj->getObjectId().encode(dptr->objectId); + + if (qmf1Support) { + delObj->writeProperties(dptr->encodedV1Config); + if (send_stats) { + delObj->writeStatistics(dptr->encodedV1Inst); + } + } + + if (qmf2Support) { + Variant::Map map_; + Variant::Map values; + Variant::Map oid; + + delObj->getObjectId().mapEncode(oid); + map_["_object_id"] = oid; + map_["_schema_id"] = mapEncodeSchemaId(delObj->getPackageName(), + delObj->getClassName(), + "_data", + delObj->getMd5Sum()); + delObj->writeTimestamps(map_); + delObj->mapEncodeValues(values, true, send_stats); + map_["_values"] = values; + + dptr->encodedV2 = map_; + } + + pendingDeletedObjs[classkey].push_back(dptr); + + delete iter->second; + managementObjects.erase(iter->first); + } + + // now copy the pending deletes into the outList + + for (PendingDeletedObjsMap::iterator mIter = pendingDeletedObjs.begin(); + mIter != pendingDeletedObjs.end(); mIter++) { + for (DeletedObjectList::iterator lIter = mIter->second.begin(); + lIter != mIter->second.end(); lIter++) { + outList.push_back(*lIter); + } + } +} + + +// Merge this list's deleted objects to the management Agent's list of deleted +// objects waiting for next (last) publish-ment. +void ManagementAgent::importDeletedObjects(const DeletedObjectList& inList) +{ + sys::Mutex::ScopedLock lock (userLock); + + for (DeletedObjectList::const_iterator lIter = inList.begin(); lIter != inList.end(); lIter++) { + + std::string classkey((*lIter)->packageName + std::string(":") + (*lIter)->className); + DeletedObjectList& dList = pendingDeletedObjs[classkey]; + + // not sure if this is necessary - merge by objectid.... + bool found = false; + for (DeletedObjectList::iterator dIter = dList.begin(); dIter != dList.end(); dIter++) { + if ((*dIter)->objectId == (*lIter)->objectId) { + found = true; + break; + } + } + if (!found) { + dList.push_back(*lIter); + } + } +} + + +// construct a DeletedObject from an encoded representation. Used by +// clustering to move deleted objects between clustered brokers. See +// DeletedObject::encode() for the reverse. +ManagementAgent::DeletedObject::DeletedObject(const std::string& encoded) +{ + qpid::types::Variant::Map map_; + MapCodec::decode(encoded, map_); + + packageName = map_["_package_name"].getString(); + className = map_["_class_name"].getString(); + objectId = map_["_object_id"].getString(); + + encodedV1Config = map_["_v1_config"].getString(); + encodedV1Inst = map_["_v1_inst"].getString(); + encodedV2 = map_["_v2_data"].asMap(); +} + + +// encode a DeletedObject to a string buffer. Used by +// clustering to move deleted objects between clustered brokers. See +// DeletedObject(const std::string&) for the reverse. +void ManagementAgent::DeletedObject::encode(std::string& toBuffer) +{ + qpid::types::Variant::Map map_; + + + map_["_package_name"] = packageName; + map_["_class_name"] = className; + map_["_object_id"] = objectId; + + map_["_v1_config"] = encodedV1Config; + map_["_v1_inst"] = encodedV1Inst; + map_["_v2_data"] = encodedV2; + + MapCodec::encode(map_, toBuffer); +} diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.h b/qpid/cpp/src/qpid/management/ManagementAgent.h index f4d3c8c299..9829094a0f 100644 --- a/qpid/cpp/src/qpid/management/ManagementAgent.h +++ b/qpid/cpp/src/qpid/management/ManagementAgent.h @@ -148,6 +148,40 @@ public: static boost::shared_ptr<framing::FieldValue> toFieldValue(const types::Variant& in); static types::Variant toVariant(const boost::shared_ptr<framing::FieldValue>& val); + // For Clustering: management objects that have been marked as + // "deleted", but are waiting for their last published object + // update are not visible to the cluster replication code. These + // interfaces allow clustering to gather up all the management + // objects that are deleted in order to allow all clustered + // brokers to publish the same set of deleted objects. + + class DeletedObject { + public: + typedef boost::shared_ptr<DeletedObject> shared_ptr; + DeletedObject() {}; + DeletedObject( const std::string &encoded ); + ~DeletedObject() {}; + void encode( std::string& toBuffer ); + + private: + friend class ManagementAgent; + + std::string packageName; + std::string className; + std::string objectId; + + std::string encodedV1Config; // qmfv1 properties + std::string encodedV1Inst; // qmfv1 statistics + qpid::types::Variant::Map encodedV2; + }; + + typedef std::vector<DeletedObject::shared_ptr> DeletedObjectList; + + /** returns a snapshot of all currently deleted management objects. */ + void exportDeletedObjects( DeletedObjectList& outList ); + + /** Import a list of deleted objects to send on next publish interval. */ + void importDeletedObjects( const DeletedObjectList& inList ); private: struct Periodic : public qpid::sys::TimerTask @@ -293,6 +327,13 @@ private: // message. uint32_t maxV2ReplyObjs; + // list of objects that have been deleted, but have yet to be published + // one final time. + // Indexed by a string composed of the object's package and class name. + // Protected by userLock. + typedef std::map<std::string, DeletedObjectList> PendingDeletedObjsMap; + PendingDeletedObjsMap pendingDeletedObjs; + # define MA_BUFFER_SIZE 65536 char inputBuffer[MA_BUFFER_SIZE]; char outputBuffer[MA_BUFFER_SIZE]; |