diff options
author | Alan Conway <aconway@apache.org> | 2011-01-20 14:13:08 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2011-01-20 14:13:08 +0000 |
commit | f89731ae3ec246a09a53f6eaa0c8506199321fd2 (patch) | |
tree | ff4519d88f3adf1027544fe5017c7466c8995e33 | |
parent | cddd34f0fa3d15d2963976670747e616f947df91 (diff) | |
download | qpid-python-f89731ae3ec246a09a53f6eaa0c8506199321fd2.tar.gz |
Bug 654872, QPID-3007: Batch management messages by count, not size.
QMF V1 management messages were being batched by accumulating up to a
certain total size of data. Since management messages may have
different sizes on brokers in a cluster, this was leading to
inconsistencies.
This patch batches V1 messages by count rather than by size, similar
to V2 messages.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1061308 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | cpp/include/qpid/framing/ResizableBuffer.h | 60 | ||||
-rw-r--r-- | cpp/src/Makefile.am | 1 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementAgent.cpp | 28 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementAgent.h | 4 | ||||
-rwxr-xr-x | cpp/src/tests/cluster_test_logs.py | 7 | ||||
-rwxr-xr-x | cpp/src/tests/cluster_tests.py | 13 |
6 files changed, 95 insertions, 18 deletions
diff --git a/cpp/include/qpid/framing/ResizableBuffer.h b/cpp/include/qpid/framing/ResizableBuffer.h new file mode 100644 index 0000000000..e6c9e7a113 --- /dev/null +++ b/cpp/include/qpid/framing/ResizableBuffer.h @@ -0,0 +1,60 @@ +#ifndef QPID_FRAMING_RESIZABLEBUFFER_H +#define QPID_FRAMING_RESIZABLEBUFFER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Buffer.h" +#include <vector> + +namespace qpid { +namespace framing { + +/** + * A buffer that maintains its own storage and can be resized, + * keeping any data already written to the buffer. + */ +class ResizableBuffer : public Buffer +{ + public: + ResizableBuffer(size_t initialSize) : store(initialSize) { + static_cast<Buffer&>(*this) = Buffer(&store[0], store.size()); + } + + void resize(size_t newSize) { + size_t oldPos = getPosition(); + store.resize(newSize); + static_cast<Buffer&>(*this) = Buffer(&store[0], store.size()); + setPosition(oldPos); + } + + /** Make sure at least n bytes are available */ + void makeAvailable(size_t n) { + if (n > available()) + resize(getSize() + n - available()); + } + + private: + std::vector<char> store; +}; +}} // namespace qpid::framing + +#endif /*!QPID_FRAMING_RESIZABLEBUFFER_H*/ diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index 9f97b94b8a..e93e0c960d 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -373,6 +373,7 @@ libqpidcommon_la_SOURCES += \ qpid/framing/BodyHandler.cpp \ qpid/framing/BodyHandler.h \ qpid/framing/Buffer.cpp \ + qpid/framing/ResizableBuffer.h \ qpid/framing/ChannelHandler.h \ qpid/framing/Endian.cpp \ qpid/framing/Endian.h \ diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp index 7459ac9416..0fb23bdb7d 100644 --- a/cpp/src/qpid/management/ManagementAgent.cpp +++ b/cpp/src/qpid/management/ManagementAgent.cpp @@ -106,7 +106,8 @@ ManagementAgent::ManagementAgent (const bool qmfV1, const bool qmfV2) : startTime(sys::now()), suppressed(false), disallowAllV1Methods(false), vendorNameKey(defaultVendorName), productNameKey(defaultProductName), - qmf1Support(qmfV1), qmf2Support(qmfV2), maxV2ReplyObjs(100) + qmf1Support(qmfV1), qmf2Support(qmfV2), maxReplyObjs(100), + msgBuffer(MA_BUFFER_SIZE) { nextObjectId = 1; brokerBank = 1; @@ -663,7 +664,6 @@ void ManagementAgent::periodicProcessing (void) #define HEADROOM 4096 debugSnapshot("Management agent periodic processing"); sys::Mutex::ScopedLock lock (userLock); - char msgChars[BUFSIZE]; uint32_t contentSize; string routingKey; string sBuf; @@ -704,7 +704,7 @@ void ManagementAgent::periodicProcessing (void) for (PendingDeletedObjsMap::iterator mIter = tmp.begin(); mIter != tmp.end(); mIter++) { std::string packageName; std::string className; - Buffer msgBuffer(msgChars, BUFSIZE); + msgBuffer.reset(); uint32_t v1Objs = 0; uint32_t v2Objs = 0; Variant::List list_; @@ -715,6 +715,7 @@ void ManagementAgent::periodicProcessing (void) for (DeletedObjectList::iterator lIter = mIter->second.begin(); lIter != mIter->second.end(); lIter++) { + msgBuffer.makeAvailable(HEADROOM); // Make sure there's buffer space. std::string oid = (*lIter)->objectId; if (!(*lIter)->encodedV1Config.empty()) { encodeHeader(msgBuffer, 'c'); @@ -730,9 +731,9 @@ void ManagementAgent::periodicProcessing (void) << " len=" << (*lIter)->encodedV1Inst.size()); v1Objs++; } - if (v1Objs && msgBuffer.available() < HEADROOM) { + if (v1Objs >= maxReplyObjs) { v1Objs = 0; - contentSize = BUFSIZE - msgBuffer.available(); + contentSize = msgBuffer.getSize(); stringstream key; key << "console.obj.1.0." << packageName << "." << className; msgBuffer.reset(); @@ -744,7 +745,7 @@ void ManagementAgent::periodicProcessing (void) if (!(*lIter)->encodedV2.empty()) { QPID_LOG(trace, "Deleting V2 " << "map=" << (*lIter)->encodedV2); list_.push_back((*lIter)->encodedV2); - if (++v2Objs >= maxV2ReplyObjs) { + if (++v2Objs >= maxReplyObjs) { v2Objs = 0; string content; @@ -815,11 +816,11 @@ void ManagementAgent::periodicProcessing (void) // sendBuffer() call, so always restart the search after a sendBuffer() call // while (1) { - Buffer msgBuffer(msgChars, BUFSIZE); + msgBuffer.reset(); Variant::List list_; uint32_t pcount; uint32_t scount; - uint32_t v2Objs; + uint32_t v1Objs, v2Objs; ManagementObjectMap::iterator baseIter; std::string packageName; std::string className; @@ -842,6 +843,7 @@ void ManagementAgent::periodicProcessing (void) break; // done - all objects processed pcount = scount = 0; + v1Objs = 0; v2Objs = 0; list_.clear(); msgBuffer.reset(); @@ -849,6 +851,7 @@ void ManagementAgent::periodicProcessing (void) for (ManagementObjectMap::iterator iter = baseIter; iter != managementObjects.end(); iter++) { + msgBuffer.makeAvailable(HEADROOM); // Make sure there's buffer space ManagementObject* baseObject = baseIter->second; ManagementObject* object = iter->second; bool send_stats, send_props; @@ -875,6 +878,7 @@ void ManagementAgent::periodicProcessing (void) QPID_LOG(trace, "Changed V1 properties " << object->getObjectId().getV2Key() << " len=" << msgBuffer.getPosition()-pos); + ++v1Objs; } if (send_stats && qmf1Support) { @@ -886,7 +890,7 @@ void ManagementAgent::periodicProcessing (void) QPID_LOG(trace, "Changed V1 statistics " << object->getObjectId().getV2Key() << " len=" << msgBuffer.getPosition()-pos); - + ++v1Objs; } if ((send_stats || send_props) && qmf2Support) { @@ -916,8 +920,8 @@ void ManagementAgent::periodicProcessing (void) object->setForcePublish(false); - if ((qmf1Support && (msgBuffer.available() < HEADROOM)) || - (qmf2Support && (v2Objs >= maxV2ReplyObjs))) + if ((qmf1Support && (v1Objs >= maxReplyObjs)) || + (qmf2Support && (v2Objs >= maxReplyObjs))) break; // have enough objects, send an indication... } } @@ -1967,7 +1971,7 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& replyTo "_data", object->getMd5Sum()); _subList.push_back(map_); - if (++objCount >= maxV2ReplyObjs) { + if (++objCount >= maxReplyObjs) { objCount = 0; _list.push_back(_subList); _subList.clear(); diff --git a/cpp/src/qpid/management/ManagementAgent.h b/cpp/src/qpid/management/ManagementAgent.h index 2202e2fc98..d434fe44da 100644 --- a/cpp/src/qpid/management/ManagementAgent.h +++ b/cpp/src/qpid/management/ManagementAgent.h @@ -35,6 +35,7 @@ #include "qpid/types/Variant.h" #include <qpid/framing/AMQFrame.h> #include <qpid/framing/FieldValue.h> +#include <qpid/framing/ResizableBuffer.h> #include <memory> #include <string> #include <map> @@ -330,7 +331,7 @@ private: // Maximum # of objects allowed in a single V2 response // message. - uint32_t maxV2ReplyObjs; + uint32_t maxReplyObjs; // list of objects that have been deleted, but have yet to be published // one final time. @@ -343,6 +344,7 @@ private: char inputBuffer[MA_BUFFER_SIZE]; char outputBuffer[MA_BUFFER_SIZE]; char eventBuffer[MA_BUFFER_SIZE]; + framing::ResizableBuffer msgBuffer; void writeData (); void periodicProcessing (void); diff --git a/cpp/src/tests/cluster_test_logs.py b/cpp/src/tests/cluster_test_logs.py index 0333822824..4cb921932e 100755 --- a/cpp/src/tests/cluster_test_logs.py +++ b/cpp/src/tests/cluster_test_logs.py @@ -59,7 +59,8 @@ def filter_log(log): 'task late', 'task overran', 'warning CLOSING .* unsent data', - 'Inter-broker link ' + 'Inter-broker link ', + 'Running in a cluster, marking store' ]) if re.compile(skip).search(l): continue @@ -85,7 +86,7 @@ def filter_log(log): out.write(l) out.close() -def verify_logs(logs): +def verify_logs(): """Compare log files from cluster brokers, verify that they correspond correctly.""" # FIXME aconway 2011-01-19: disable when called from unit tests # Causing sporadic failures, see https://issues.apache.org/jira/browse/QPID-3007 @@ -110,4 +111,4 @@ def verify_logs(logs): # Can be run as a script. if __name__ == "__main__": - verify_logs(glob.glob("*.log")) + verify_logs() diff --git a/cpp/src/tests/cluster_tests.py b/cpp/src/tests/cluster_tests.py index 6e515cdbf1..27010c17f7 100755 --- a/cpp/src/tests/cluster_tests.py +++ b/cpp/src/tests/cluster_tests.py @@ -302,7 +302,7 @@ acl allow all all scanner.join() assert scanner.found # Verify logs are consistent - cluster_test_logs.verify_logs(glob.glob("*.log")) + cluster_test_logs.verify_logs() class LongTests(BrokerTest): """Tests that can run for a long time if -DDURATION=<minutes> is set""" @@ -448,11 +448,20 @@ class LongTests(BrokerTest): c.stop() # Verify that logs are consistent - cluster_test_logs.verify_logs(glob.glob("*.log")) + cluster_test_logs.verify_logs() def test_management_qmf2(self): self.test_management(args=["--mgmt-qmf2=yes"]) + def test_connect_consistent(self): # FIXME aconway 2011-01-18: + args=["--mgmt-pub-interval=1","--log-enable=trace+:management"] + cluster = self.cluster(2, args=args) + end = time.time() + self.duration() + while (time.time() < end): # Get a management interval + for i in xrange(1000): cluster[0].connect().close() + cluster_test_logs.verify_logs() + + class StoreTests(BrokerTest): """ Cluster tests that can only be run if there is a store available. |