summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/management
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2011-01-20 14:13:08 +0000
committerAlan Conway <aconway@apache.org>2011-01-20 14:13:08 +0000
commit888f2f8b812308c32594f8abcca7aefc956b5803 (patch)
tree67ac8a15014e7c6b31f2ea3b5b64e354d7eb5614 /qpid/cpp/src/qpid/management
parentedc8085edae3d1664ab82d736542f5643a7ea7ad (diff)
downloadqpid-python-888f2f8b812308c32594f8abcca7aefc956b5803.tar.gz
Bug 654872, QPID-3007: Batch management messages by count, not size.
QMF V1 management messages were being batched by accumulating up to a certain total size of data. Since management messages may have different sizes on brokers in a cluster, this was leading to inconsistencies. This patch batches V1 messages by count rather than by size, similar to V2 messages. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1061308 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qpid/management')
-rw-r--r--qpid/cpp/src/qpid/management/ManagementAgent.cpp28
-rw-r--r--qpid/cpp/src/qpid/management/ManagementAgent.h4
2 files changed, 19 insertions, 13 deletions
diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
index 7459ac9416..0fb23bdb7d 100644
--- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp
+++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
@@ -106,7 +106,8 @@ ManagementAgent::ManagementAgent (const bool qmfV1, const bool qmfV2) :
startTime(sys::now()),
suppressed(false), disallowAllV1Methods(false),
vendorNameKey(defaultVendorName), productNameKey(defaultProductName),
- qmf1Support(qmfV1), qmf2Support(qmfV2), maxV2ReplyObjs(100)
+ qmf1Support(qmfV1), qmf2Support(qmfV2), maxReplyObjs(100),
+ msgBuffer(MA_BUFFER_SIZE)
{
nextObjectId = 1;
brokerBank = 1;
@@ -663,7 +664,6 @@ void ManagementAgent::periodicProcessing (void)
#define HEADROOM 4096
debugSnapshot("Management agent periodic processing");
sys::Mutex::ScopedLock lock (userLock);
- char msgChars[BUFSIZE];
uint32_t contentSize;
string routingKey;
string sBuf;
@@ -704,7 +704,7 @@ void ManagementAgent::periodicProcessing (void)
for (PendingDeletedObjsMap::iterator mIter = tmp.begin(); mIter != tmp.end(); mIter++) {
std::string packageName;
std::string className;
- Buffer msgBuffer(msgChars, BUFSIZE);
+ msgBuffer.reset();
uint32_t v1Objs = 0;
uint32_t v2Objs = 0;
Variant::List list_;
@@ -715,6 +715,7 @@ void ManagementAgent::periodicProcessing (void)
for (DeletedObjectList::iterator lIter = mIter->second.begin();
lIter != mIter->second.end(); lIter++) {
+ msgBuffer.makeAvailable(HEADROOM); // Make sure there's buffer space.
std::string oid = (*lIter)->objectId;
if (!(*lIter)->encodedV1Config.empty()) {
encodeHeader(msgBuffer, 'c');
@@ -730,9 +731,9 @@ void ManagementAgent::periodicProcessing (void)
<< " len=" << (*lIter)->encodedV1Inst.size());
v1Objs++;
}
- if (v1Objs && msgBuffer.available() < HEADROOM) {
+ if (v1Objs >= maxReplyObjs) {
v1Objs = 0;
- contentSize = BUFSIZE - msgBuffer.available();
+ contentSize = msgBuffer.getSize();
stringstream key;
key << "console.obj.1.0." << packageName << "." << className;
msgBuffer.reset();
@@ -744,7 +745,7 @@ void ManagementAgent::periodicProcessing (void)
if (!(*lIter)->encodedV2.empty()) {
QPID_LOG(trace, "Deleting V2 " << "map=" << (*lIter)->encodedV2);
list_.push_back((*lIter)->encodedV2);
- if (++v2Objs >= maxV2ReplyObjs) {
+ if (++v2Objs >= maxReplyObjs) {
v2Objs = 0;
string content;
@@ -815,11 +816,11 @@ void ManagementAgent::periodicProcessing (void)
// sendBuffer() call, so always restart the search after a sendBuffer() call
//
while (1) {
- Buffer msgBuffer(msgChars, BUFSIZE);
+ msgBuffer.reset();
Variant::List list_;
uint32_t pcount;
uint32_t scount;
- uint32_t v2Objs;
+ uint32_t v1Objs, v2Objs;
ManagementObjectMap::iterator baseIter;
std::string packageName;
std::string className;
@@ -842,6 +843,7 @@ void ManagementAgent::periodicProcessing (void)
break; // done - all objects processed
pcount = scount = 0;
+ v1Objs = 0;
v2Objs = 0;
list_.clear();
msgBuffer.reset();
@@ -849,6 +851,7 @@ void ManagementAgent::periodicProcessing (void)
for (ManagementObjectMap::iterator iter = baseIter;
iter != managementObjects.end();
iter++) {
+ msgBuffer.makeAvailable(HEADROOM); // Make sure there's buffer space
ManagementObject* baseObject = baseIter->second;
ManagementObject* object = iter->second;
bool send_stats, send_props;
@@ -875,6 +878,7 @@ void ManagementAgent::periodicProcessing (void)
QPID_LOG(trace, "Changed V1 properties "
<< object->getObjectId().getV2Key()
<< " len=" << msgBuffer.getPosition()-pos);
+ ++v1Objs;
}
if (send_stats && qmf1Support) {
@@ -886,7 +890,7 @@ void ManagementAgent::periodicProcessing (void)
QPID_LOG(trace, "Changed V1 statistics "
<< object->getObjectId().getV2Key()
<< " len=" << msgBuffer.getPosition()-pos);
-
+ ++v1Objs;
}
if ((send_stats || send_props) && qmf2Support) {
@@ -916,8 +920,8 @@ void ManagementAgent::periodicProcessing (void)
object->setForcePublish(false);
- if ((qmf1Support && (msgBuffer.available() < HEADROOM)) ||
- (qmf2Support && (v2Objs >= maxV2ReplyObjs)))
+ if ((qmf1Support && (v1Objs >= maxReplyObjs)) ||
+ (qmf2Support && (v2Objs >= maxReplyObjs)))
break; // have enough objects, send an indication...
}
}
@@ -1967,7 +1971,7 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& replyTo
"_data",
object->getMd5Sum());
_subList.push_back(map_);
- if (++objCount >= maxV2ReplyObjs) {
+ if (++objCount >= maxReplyObjs) {
objCount = 0;
_list.push_back(_subList);
_subList.clear();
diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.h b/qpid/cpp/src/qpid/management/ManagementAgent.h
index 2202e2fc98..d434fe44da 100644
--- a/qpid/cpp/src/qpid/management/ManagementAgent.h
+++ b/qpid/cpp/src/qpid/management/ManagementAgent.h
@@ -35,6 +35,7 @@
#include "qpid/types/Variant.h"
#include <qpid/framing/AMQFrame.h>
#include <qpid/framing/FieldValue.h>
+#include <qpid/framing/ResizableBuffer.h>
#include <memory>
#include <string>
#include <map>
@@ -330,7 +331,7 @@ private:
// Maximum # of objects allowed in a single V2 response
// message.
- uint32_t maxV2ReplyObjs;
+ uint32_t maxReplyObjs;
// list of objects that have been deleted, but have yet to be published
// one final time.
@@ -343,6 +344,7 @@ private:
char inputBuffer[MA_BUFFER_SIZE];
char outputBuffer[MA_BUFFER_SIZE];
char eventBuffer[MA_BUFFER_SIZE];
+ framing::ResizableBuffer msgBuffer;
void writeData ();
void periodicProcessing (void);