summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2011-01-20 14:13:08 +0000
committerAlan Conway <aconway@apache.org>2011-01-20 14:13:08 +0000
commitf89731ae3ec246a09a53f6eaa0c8506199321fd2 (patch)
treeff4519d88f3adf1027544fe5017c7466c8995e33
parentcddd34f0fa3d15d2963976670747e616f947df91 (diff)
downloadqpid-python-f89731ae3ec246a09a53f6eaa0c8506199321fd2.tar.gz
Bug 654872, QPID-3007: Batch management messages by count, not size.
QMF V1 management messages were being batched by accumulating up to a certain total size of data. Since management messages may have different sizes on brokers in a cluster, this was leading to inconsistencies. This patch batches V1 messages by count rather than by size, similar to V2 messages. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1061308 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/include/qpid/framing/ResizableBuffer.h60
-rw-r--r--cpp/src/Makefile.am1
-rw-r--r--cpp/src/qpid/management/ManagementAgent.cpp28
-rw-r--r--cpp/src/qpid/management/ManagementAgent.h4
-rwxr-xr-xcpp/src/tests/cluster_test_logs.py7
-rwxr-xr-xcpp/src/tests/cluster_tests.py13
6 files changed, 95 insertions, 18 deletions
diff --git a/cpp/include/qpid/framing/ResizableBuffer.h b/cpp/include/qpid/framing/ResizableBuffer.h
new file mode 100644
index 0000000000..e6c9e7a113
--- /dev/null
+++ b/cpp/include/qpid/framing/ResizableBuffer.h
@@ -0,0 +1,60 @@
+#ifndef QPID_FRAMING_RESIZABLEBUFFER_H
+#define QPID_FRAMING_RESIZABLEBUFFER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Buffer.h"
+#include <vector>
+
+namespace qpid {
+namespace framing {
+
+/**
+ * A buffer that maintains its own storage and can be resized,
+ * keeping any data already written to the buffer.
+ */
+class ResizableBuffer : public Buffer
+{
+ public:
+ ResizableBuffer(size_t initialSize) : store(initialSize) {
+ static_cast<Buffer&>(*this) = Buffer(&store[0], store.size());
+ }
+
+ void resize(size_t newSize) {
+ size_t oldPos = getPosition();
+ store.resize(newSize);
+ static_cast<Buffer&>(*this) = Buffer(&store[0], store.size());
+ setPosition(oldPos);
+ }
+
+ /** Make sure at least n bytes are available */
+ void makeAvailable(size_t n) {
+ if (n > available())
+ resize(getSize() + n - available());
+ }
+
+ private:
+ std::vector<char> store;
+};
+}} // namespace qpid::framing
+
+#endif /*!QPID_FRAMING_RESIZABLEBUFFER_H*/
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index 9f97b94b8a..e93e0c960d 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -373,6 +373,7 @@ libqpidcommon_la_SOURCES += \
qpid/framing/BodyHandler.cpp \
qpid/framing/BodyHandler.h \
qpid/framing/Buffer.cpp \
+ qpid/framing/ResizableBuffer.h \
qpid/framing/ChannelHandler.h \
qpid/framing/Endian.cpp \
qpid/framing/Endian.h \
diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp
index 7459ac9416..0fb23bdb7d 100644
--- a/cpp/src/qpid/management/ManagementAgent.cpp
+++ b/cpp/src/qpid/management/ManagementAgent.cpp
@@ -106,7 +106,8 @@ ManagementAgent::ManagementAgent (const bool qmfV1, const bool qmfV2) :
startTime(sys::now()),
suppressed(false), disallowAllV1Methods(false),
vendorNameKey(defaultVendorName), productNameKey(defaultProductName),
- qmf1Support(qmfV1), qmf2Support(qmfV2), maxV2ReplyObjs(100)
+ qmf1Support(qmfV1), qmf2Support(qmfV2), maxReplyObjs(100),
+ msgBuffer(MA_BUFFER_SIZE)
{
nextObjectId = 1;
brokerBank = 1;
@@ -663,7 +664,6 @@ void ManagementAgent::periodicProcessing (void)
#define HEADROOM 4096
debugSnapshot("Management agent periodic processing");
sys::Mutex::ScopedLock lock (userLock);
- char msgChars[BUFSIZE];
uint32_t contentSize;
string routingKey;
string sBuf;
@@ -704,7 +704,7 @@ void ManagementAgent::periodicProcessing (void)
for (PendingDeletedObjsMap::iterator mIter = tmp.begin(); mIter != tmp.end(); mIter++) {
std::string packageName;
std::string className;
- Buffer msgBuffer(msgChars, BUFSIZE);
+ msgBuffer.reset();
uint32_t v1Objs = 0;
uint32_t v2Objs = 0;
Variant::List list_;
@@ -715,6 +715,7 @@ void ManagementAgent::periodicProcessing (void)
for (DeletedObjectList::iterator lIter = mIter->second.begin();
lIter != mIter->second.end(); lIter++) {
+ msgBuffer.makeAvailable(HEADROOM); // Make sure there's buffer space.
std::string oid = (*lIter)->objectId;
if (!(*lIter)->encodedV1Config.empty()) {
encodeHeader(msgBuffer, 'c');
@@ -730,9 +731,9 @@ void ManagementAgent::periodicProcessing (void)
<< " len=" << (*lIter)->encodedV1Inst.size());
v1Objs++;
}
- if (v1Objs && msgBuffer.available() < HEADROOM) {
+ if (v1Objs >= maxReplyObjs) {
v1Objs = 0;
- contentSize = BUFSIZE - msgBuffer.available();
+ contentSize = msgBuffer.getSize();
stringstream key;
key << "console.obj.1.0." << packageName << "." << className;
msgBuffer.reset();
@@ -744,7 +745,7 @@ void ManagementAgent::periodicProcessing (void)
if (!(*lIter)->encodedV2.empty()) {
QPID_LOG(trace, "Deleting V2 " << "map=" << (*lIter)->encodedV2);
list_.push_back((*lIter)->encodedV2);
- if (++v2Objs >= maxV2ReplyObjs) {
+ if (++v2Objs >= maxReplyObjs) {
v2Objs = 0;
string content;
@@ -815,11 +816,11 @@ void ManagementAgent::periodicProcessing (void)
// sendBuffer() call, so always restart the search after a sendBuffer() call
//
while (1) {
- Buffer msgBuffer(msgChars, BUFSIZE);
+ msgBuffer.reset();
Variant::List list_;
uint32_t pcount;
uint32_t scount;
- uint32_t v2Objs;
+ uint32_t v1Objs, v2Objs;
ManagementObjectMap::iterator baseIter;
std::string packageName;
std::string className;
@@ -842,6 +843,7 @@ void ManagementAgent::periodicProcessing (void)
break; // done - all objects processed
pcount = scount = 0;
+ v1Objs = 0;
v2Objs = 0;
list_.clear();
msgBuffer.reset();
@@ -849,6 +851,7 @@ void ManagementAgent::periodicProcessing (void)
for (ManagementObjectMap::iterator iter = baseIter;
iter != managementObjects.end();
iter++) {
+ msgBuffer.makeAvailable(HEADROOM); // Make sure there's buffer space
ManagementObject* baseObject = baseIter->second;
ManagementObject* object = iter->second;
bool send_stats, send_props;
@@ -875,6 +878,7 @@ void ManagementAgent::periodicProcessing (void)
QPID_LOG(trace, "Changed V1 properties "
<< object->getObjectId().getV2Key()
<< " len=" << msgBuffer.getPosition()-pos);
+ ++v1Objs;
}
if (send_stats && qmf1Support) {
@@ -886,7 +890,7 @@ void ManagementAgent::periodicProcessing (void)
QPID_LOG(trace, "Changed V1 statistics "
<< object->getObjectId().getV2Key()
<< " len=" << msgBuffer.getPosition()-pos);
-
+ ++v1Objs;
}
if ((send_stats || send_props) && qmf2Support) {
@@ -916,8 +920,8 @@ void ManagementAgent::periodicProcessing (void)
object->setForcePublish(false);
- if ((qmf1Support && (msgBuffer.available() < HEADROOM)) ||
- (qmf2Support && (v2Objs >= maxV2ReplyObjs)))
+ if ((qmf1Support && (v1Objs >= maxReplyObjs)) ||
+ (qmf2Support && (v2Objs >= maxReplyObjs)))
break; // have enough objects, send an indication...
}
}
@@ -1967,7 +1971,7 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& replyTo
"_data",
object->getMd5Sum());
_subList.push_back(map_);
- if (++objCount >= maxV2ReplyObjs) {
+ if (++objCount >= maxReplyObjs) {
objCount = 0;
_list.push_back(_subList);
_subList.clear();
diff --git a/cpp/src/qpid/management/ManagementAgent.h b/cpp/src/qpid/management/ManagementAgent.h
index 2202e2fc98..d434fe44da 100644
--- a/cpp/src/qpid/management/ManagementAgent.h
+++ b/cpp/src/qpid/management/ManagementAgent.h
@@ -35,6 +35,7 @@
#include "qpid/types/Variant.h"
#include <qpid/framing/AMQFrame.h>
#include <qpid/framing/FieldValue.h>
+#include <qpid/framing/ResizableBuffer.h>
#include <memory>
#include <string>
#include <map>
@@ -330,7 +331,7 @@ private:
// Maximum # of objects allowed in a single V2 response
// message.
- uint32_t maxV2ReplyObjs;
+ uint32_t maxReplyObjs;
// list of objects that have been deleted, but have yet to be published
// one final time.
@@ -343,6 +344,7 @@ private:
char inputBuffer[MA_BUFFER_SIZE];
char outputBuffer[MA_BUFFER_SIZE];
char eventBuffer[MA_BUFFER_SIZE];
+ framing::ResizableBuffer msgBuffer;
void writeData ();
void periodicProcessing (void);
diff --git a/cpp/src/tests/cluster_test_logs.py b/cpp/src/tests/cluster_test_logs.py
index 0333822824..4cb921932e 100755
--- a/cpp/src/tests/cluster_test_logs.py
+++ b/cpp/src/tests/cluster_test_logs.py
@@ -59,7 +59,8 @@ def filter_log(log):
'task late',
'task overran',
'warning CLOSING .* unsent data',
- 'Inter-broker link '
+ 'Inter-broker link ',
+ 'Running in a cluster, marking store'
])
if re.compile(skip).search(l): continue
@@ -85,7 +86,7 @@ def filter_log(log):
out.write(l)
out.close()
-def verify_logs(logs):
+def verify_logs():
"""Compare log files from cluster brokers, verify that they correspond correctly."""
# FIXME aconway 2011-01-19: disable when called from unit tests
# Causing sporadic failures, see https://issues.apache.org/jira/browse/QPID-3007
@@ -110,4 +111,4 @@ def verify_logs(logs):
# Can be run as a script.
if __name__ == "__main__":
- verify_logs(glob.glob("*.log"))
+ verify_logs()
diff --git a/cpp/src/tests/cluster_tests.py b/cpp/src/tests/cluster_tests.py
index 6e515cdbf1..27010c17f7 100755
--- a/cpp/src/tests/cluster_tests.py
+++ b/cpp/src/tests/cluster_tests.py
@@ -302,7 +302,7 @@ acl allow all all
scanner.join()
assert scanner.found
# Verify logs are consistent
- cluster_test_logs.verify_logs(glob.glob("*.log"))
+ cluster_test_logs.verify_logs()
class LongTests(BrokerTest):
"""Tests that can run for a long time if -DDURATION=<minutes> is set"""
@@ -448,11 +448,20 @@ class LongTests(BrokerTest):
c.stop()
# Verify that logs are consistent
- cluster_test_logs.verify_logs(glob.glob("*.log"))
+ cluster_test_logs.verify_logs()
def test_management_qmf2(self):
self.test_management(args=["--mgmt-qmf2=yes"])
+ def test_connect_consistent(self): # FIXME aconway 2011-01-18:
+ args=["--mgmt-pub-interval=1","--log-enable=trace+:management"]
+ cluster = self.cluster(2, args=args)
+ end = time.time() + self.duration()
+ while (time.time() < end): # Get a management interval
+ for i in xrange(1000): cluster[0].connect().close()
+ cluster_test_logs.verify_logs()
+
+
class StoreTests(BrokerTest):
"""
Cluster tests that can only be run if there is a store available.