diff options
author | Alan Conway <aconway@apache.org> | 2011-01-20 14:13:08 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2011-01-20 14:13:08 +0000 |
commit | 888f2f8b812308c32594f8abcca7aefc956b5803 (patch) | |
tree | 67ac8a15014e7c6b31f2ea3b5b64e354d7eb5614 /qpid/cpp/src/qpid/management | |
parent | edc8085edae3d1664ab82d736542f5643a7ea7ad (diff) | |
download | qpid-python-888f2f8b812308c32594f8abcca7aefc956b5803.tar.gz |
Bug 654872, QPID-3007: Batch management messages by count, not size.
QMF V1 management messages were being batched by accumulating up to a
certain total size of data. Since management messages may have
different sizes on brokers in a cluster, this was leading to
inconsistencies.
This patch batches V1 messages by count rather than by size, similar
to V2 messages.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1061308 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qpid/management')
-rw-r--r-- | qpid/cpp/src/qpid/management/ManagementAgent.cpp | 28 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/management/ManagementAgent.h | 4 |
2 files changed, 19 insertions, 13 deletions
diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp index 7459ac9416..0fb23bdb7d 100644 --- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp +++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp @@ -106,7 +106,8 @@ ManagementAgent::ManagementAgent (const bool qmfV1, const bool qmfV2) : startTime(sys::now()), suppressed(false), disallowAllV1Methods(false), vendorNameKey(defaultVendorName), productNameKey(defaultProductName), - qmf1Support(qmfV1), qmf2Support(qmfV2), maxV2ReplyObjs(100) + qmf1Support(qmfV1), qmf2Support(qmfV2), maxReplyObjs(100), + msgBuffer(MA_BUFFER_SIZE) { nextObjectId = 1; brokerBank = 1; @@ -663,7 +664,6 @@ void ManagementAgent::periodicProcessing (void) #define HEADROOM 4096 debugSnapshot("Management agent periodic processing"); sys::Mutex::ScopedLock lock (userLock); - char msgChars[BUFSIZE]; uint32_t contentSize; string routingKey; string sBuf; @@ -704,7 +704,7 @@ void ManagementAgent::periodicProcessing (void) for (PendingDeletedObjsMap::iterator mIter = tmp.begin(); mIter != tmp.end(); mIter++) { std::string packageName; std::string className; - Buffer msgBuffer(msgChars, BUFSIZE); + msgBuffer.reset(); uint32_t v1Objs = 0; uint32_t v2Objs = 0; Variant::List list_; @@ -715,6 +715,7 @@ void ManagementAgent::periodicProcessing (void) for (DeletedObjectList::iterator lIter = mIter->second.begin(); lIter != mIter->second.end(); lIter++) { + msgBuffer.makeAvailable(HEADROOM); // Make sure there's buffer space. std::string oid = (*lIter)->objectId; if (!(*lIter)->encodedV1Config.empty()) { encodeHeader(msgBuffer, 'c'); @@ -730,9 +731,9 @@ void ManagementAgent::periodicProcessing (void) << " len=" << (*lIter)->encodedV1Inst.size()); v1Objs++; } - if (v1Objs && msgBuffer.available() < HEADROOM) { + if (v1Objs >= maxReplyObjs) { v1Objs = 0; - contentSize = BUFSIZE - msgBuffer.available(); + contentSize = msgBuffer.getSize(); stringstream key; key << "console.obj.1.0." << packageName << "." << className; msgBuffer.reset(); @@ -744,7 +745,7 @@ void ManagementAgent::periodicProcessing (void) if (!(*lIter)->encodedV2.empty()) { QPID_LOG(trace, "Deleting V2 " << "map=" << (*lIter)->encodedV2); list_.push_back((*lIter)->encodedV2); - if (++v2Objs >= maxV2ReplyObjs) { + if (++v2Objs >= maxReplyObjs) { v2Objs = 0; string content; @@ -815,11 +816,11 @@ void ManagementAgent::periodicProcessing (void) // sendBuffer() call, so always restart the search after a sendBuffer() call // while (1) { - Buffer msgBuffer(msgChars, BUFSIZE); + msgBuffer.reset(); Variant::List list_; uint32_t pcount; uint32_t scount; - uint32_t v2Objs; + uint32_t v1Objs, v2Objs; ManagementObjectMap::iterator baseIter; std::string packageName; std::string className; @@ -842,6 +843,7 @@ void ManagementAgent::periodicProcessing (void) break; // done - all objects processed pcount = scount = 0; + v1Objs = 0; v2Objs = 0; list_.clear(); msgBuffer.reset(); @@ -849,6 +851,7 @@ void ManagementAgent::periodicProcessing (void) for (ManagementObjectMap::iterator iter = baseIter; iter != managementObjects.end(); iter++) { + msgBuffer.makeAvailable(HEADROOM); // Make sure there's buffer space ManagementObject* baseObject = baseIter->second; ManagementObject* object = iter->second; bool send_stats, send_props; @@ -875,6 +878,7 @@ void ManagementAgent::periodicProcessing (void) QPID_LOG(trace, "Changed V1 properties " << object->getObjectId().getV2Key() << " len=" << msgBuffer.getPosition()-pos); + ++v1Objs; } if (send_stats && qmf1Support) { @@ -886,7 +890,7 @@ void ManagementAgent::periodicProcessing (void) QPID_LOG(trace, "Changed V1 statistics " << object->getObjectId().getV2Key() << " len=" << msgBuffer.getPosition()-pos); - + ++v1Objs; } if ((send_stats || send_props) && qmf2Support) { @@ -916,8 +920,8 @@ void ManagementAgent::periodicProcessing (void) object->setForcePublish(false); - if ((qmf1Support && (msgBuffer.available() < HEADROOM)) || - (qmf2Support && (v2Objs >= maxV2ReplyObjs))) + if ((qmf1Support && (v1Objs >= maxReplyObjs)) || + (qmf2Support && (v2Objs >= maxReplyObjs))) break; // have enough objects, send an indication... } } @@ -1967,7 +1971,7 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& replyTo "_data", object->getMd5Sum()); _subList.push_back(map_); - if (++objCount >= maxV2ReplyObjs) { + if (++objCount >= maxReplyObjs) { objCount = 0; _list.push_back(_subList); _subList.clear(); diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.h b/qpid/cpp/src/qpid/management/ManagementAgent.h index 2202e2fc98..d434fe44da 100644 --- a/qpid/cpp/src/qpid/management/ManagementAgent.h +++ b/qpid/cpp/src/qpid/management/ManagementAgent.h @@ -35,6 +35,7 @@ #include "qpid/types/Variant.h" #include <qpid/framing/AMQFrame.h> #include <qpid/framing/FieldValue.h> +#include <qpid/framing/ResizableBuffer.h> #include <memory> #include <string> #include <map> @@ -330,7 +331,7 @@ private: // Maximum # of objects allowed in a single V2 response // message. - uint32_t maxV2ReplyObjs; + uint32_t maxReplyObjs; // list of objects that have been deleted, but have yet to be published // one final time. @@ -343,6 +344,7 @@ private: char inputBuffer[MA_BUFFER_SIZE]; char outputBuffer[MA_BUFFER_SIZE]; char eventBuffer[MA_BUFFER_SIZE]; + framing::ResizableBuffer msgBuffer; void writeData (); void periodicProcessing (void); |