diff options
author | Alan Conway <aconway@apache.org> | 2009-07-14 14:49:33 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2009-07-14 14:49:33 +0000 |
commit | 141f7814e093845265b24c47509fb0a9047c1881 (patch) | |
tree | 7f4fda51f6bf4eec62f332d4c44bf353c08913a5 /cpp | |
parent | 8978cf64e20e9cc89aa973ea7cce2ed3c85ec568 (diff) | |
download | qpid-python-141f7814e093845265b24c47509fb0a9047c1881.tar.gz |
Minor cluster optimizations.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@793917 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ErrorCheck.cpp | 18 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ErrorCheck.h | 7 | ||||
-rw-r--r-- | cpp/src/qpid/sys/ActivityTimer.h | 106 | ||||
-rw-r--r-- | cpp/src/tests/latencytest.cpp | 57 |
5 files changed, 55 insertions, 135 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index fc8faf08ec..3d46797679 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -414,7 +414,7 @@ LATENCY_TRACK(sys::LatencyTracker<const AMQBody*> doOutputTracker("DoOutput");) deliverEventQueue.start(); } // Process each frame through the error checker. - if (settings.checkErrors) { + if (settings.checkErrors && error.isUnresolved()) { error.delivered(e); while (error.canProcess()) // There is a frame ready to process. processFrame(error.getNext(), l); diff --git a/cpp/src/qpid/cluster/ErrorCheck.cpp b/cpp/src/qpid/cluster/ErrorCheck.cpp index 2af820c8a0..d498b252f5 100644 --- a/cpp/src/qpid/cluster/ErrorCheck.cpp +++ b/cpp/src/qpid/cluster/ErrorCheck.cpp @@ -67,8 +67,8 @@ void ErrorCheck::error( } void ErrorCheck::delivered(const EventFrame& e) { - FrameQueue::iterator i = frames.insert(frames.end(), e); - review(i); + frames.push_back(e); + review(frames.end()-1); } // Review a frame in the queue with respect to the current error. @@ -84,7 +84,7 @@ ErrorCheck::FrameQueue::iterator ErrorCheck::review(const FrameQueue::iterator& if (errorCheck->getType() < type) { // my error is worse than his QPID_LOG(critical, cluster << " error " << frameSeq << " did not occur on " << i->getMemberId()); - throw Exception("Aborted by local failure that did not occur on all replicas"); + throw Exception("Aborted by failure that did not occur on all replicas"); } else { // his error is worse/same as mine. QPID_LOG(debug, cluster << " error " << frameSeq @@ -96,12 +96,12 @@ ErrorCheck::FrameQueue::iterator ErrorCheck::review(const FrameQueue::iterator& else { const ClusterConfigChangeBody* configChange = 0; if (i->frame.getBody()) - configChange = dynamic_cast<const ClusterConfigChangeBody*>(i->frame.getMethod()); + configChange = dynamic_cast<const ClusterConfigChangeBody*>( + i->frame.getMethod()); if (configChange) { MemberSet members(ClusterMap::decode(configChange->getCurrent())); QPID_LOG(debug, cluster << " apply config change to unresolved: " << members); - MemberSet intersect; set_intersection(members.begin(), members.end(), unresolved.begin(), unresolved.end(), @@ -130,12 +130,4 @@ EventFrame ErrorCheck::getNext() { return e; } -bool ErrorCheck::canProcess() const { - return type == ERROR_TYPE_NONE && !frames.empty(); -} - -bool ErrorCheck::isUnresolved() const { - return type != ERROR_TYPE_NONE; -} - }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/ErrorCheck.h b/cpp/src/qpid/cluster/ErrorCheck.h index 51312046b8..550cd36b75 100644 --- a/cpp/src/qpid/cluster/ErrorCheck.h +++ b/cpp/src/qpid/cluster/ErrorCheck.h @@ -62,11 +62,14 @@ class ErrorCheck /**@pre canProcess **/ EventFrame getNext(); - bool canProcess() const; + bool canProcess() const { return type == NONE && !frames.empty(); } + + bool isUnresolved() const { return type != NONE; } + - bool isUnresolved() const; private: + static const ErrorType NONE = framing::cluster::ERROR_TYPE_NONE; typedef std::deque<EventFrame> FrameQueue; FrameQueue::iterator review(const FrameQueue::iterator&); void checkResolved(); diff --git a/cpp/src/qpid/sys/ActivityTimer.h b/cpp/src/qpid/sys/ActivityTimer.h deleted file mode 100644 index d49e16bc4f..0000000000 --- a/cpp/src/qpid/sys/ActivityTimer.h +++ /dev/null @@ -1,106 +0,0 @@ -#ifndef QPID_SYS_ACTIVITYTIMER_H -#define QPID_SYS_ACTIVITYTIMER_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/sys/Time.h" -#include "qpid/sys/Thread.h" -#include <boost/current_function.hpp> -#include <stdio.h> - -namespace qpid { -namespace sys { - -/** - * Measures and reports time spent in a particular segment of code. - * This is real time so it includes time blocked/sleeping as well as time on CPU. - * - * Intended to be used via the QPID_ACTIVITY_TIMER macro for profiling - * during development & debugging - */ -class ActivityTimer -{ - public: - - struct Stat { // Must be a POD - uint64_t total, count; - void sample(uint64_t value) { total += value; ++count; } - uint64_t mean() { return count ? total/count : 0; } - void reset() { total = count = 0; } - }; - - struct Data { // Must be a POD - uint64_t start, entered; - Stat active; - - void reset() { - start = entered = 0; - active.reset(); - } - - void enter(uint64_t now) { - entered=now; - if (!start) start = Duration(now); - } - - void exit(uint64_t now) { - active.sample(now - entered); - } - }; - - ActivityTimer(Data& d, const char* fn, const char* file, int line, uint64_t reportInterval) : data(d) { - uint64_t now = Duration(qpid::sys::now()); - if (data.start) { - interval = now-data.start; - if (interval > reportInterval) - report(fn, file, line); - } - data.enter(now); - } - - ~ActivityTimer() { - data.exit(Duration(now())); - } - - private: - Data& data; - uint64_t interval; - - void report(const char* fn, const char* file, int line) { - long rate = (data.active.count*TIME_SEC)/interval; - double percent = (data.active.total*100.0)/interval; - printf("%s:%d: TIMER %ld/sec %f%% [%lu] %s\n", - file, line, rate, percent, Thread::current().id(), fn); - data.reset(); - } -}; - -}} // namespace qpid::sys - -/** Measures time between the point of declaration and the end of the innermost enclosing scope. - * Can only have one in a given scope. - */ -#define ACTIVITY_TIMER(REPORT_INTERVAL_SECS) \ - static __thread ::qpid::sys::ActivityTimer::Data qpid__ActivityTimerData__ = { 0, 0, { 0,0 }}; \ - ::qpid::sys::ActivityTimer qpid__ActivityTimerInstance__(qpid__ActivityTimerData__, BOOST_CURRENT_FUNCTION, __FILE__, __LINE__, 2*::qpid::sys::TIME_SEC) - -#endif /*!QPID_SYS_ACTIVITYTIMER_H*/ diff --git a/cpp/src/tests/latencytest.cpp b/cpp/src/tests/latencytest.cpp index 6ad84e1b82..a98aac4855 100644 --- a/cpp/src/tests/latencytest.cpp +++ b/cpp/src/tests/latencytest.cpp @@ -49,7 +49,7 @@ struct Args : public qpid::TestOptions { bool sync; uint reportFrequency; uint timeLimit; - uint queues; + uint concurrentConnections; uint prefetch; uint ack; bool cumulative; @@ -57,17 +57,18 @@ struct Args : public qpid::TestOptions { bool durable; string base; bool singleConnect; + uint queues; Args() : size(256), count(1000), rate(0), reportFrequency(1000), - timeLimit(0), queues(1), + timeLimit(0), concurrentConnections(1), prefetch(100), ack(0), - durable(false), base("latency-test"), singleConnect(false) + durable(false), base("latency-test"), singleConnect(false), queues(1) { addOptions() ("size", optValue(size, "N"), "message size") - ("queues", optValue(queues, "N"), "number of queues") + ("concurrentTests", optValue(concurrentConnections, "N"), "number of concurrent test setup") ("single-connection", optValue(singleConnect, "yes|no"), "Use one connection for multiple sessions.") ("count", optValue(count, "N"), "number of messages to send") ("rate", optValue(rate, "N"), "target message rate (causes count to be ignored)") @@ -81,7 +82,8 @@ struct Args : public qpid::TestOptions { ("durable", optValue(durable, "yes|no"), "use durable messages") ("csv", optValue(csv), "print stats in csv format (rate,min,max,avg)") ("cumulative", optValue(cumulative), "cumulative stats in csv format") - ("queue-base-name", optValue(base, "<name>"), "base name for queues"); + ("queue-base-name", optValue(base, "<name>"), "base name for queues") + ("queues", optValue(queues, "N"), "declare N queues & bindings to test routing"); } }; @@ -246,8 +248,8 @@ void Receiver::test() void Receiver::received(Message& msg) { ++count; - uint64_t sentAt = msg.getDeliveryProperties().getTimestamp(); uint64_t receivedAt = current_time(); + uint64_t sentAt = msg.getDeliveryProperties().getTimestamp(); stats.update(((double) (receivedAt - sentAt)) / TIME_MSEC); @@ -357,11 +359,11 @@ void Sender::sendByRate() ++missedRate; else sys::usleep(delay / TIME_USEC); - if (timeLimit != 0 && Duration(start, now()) > timeLimit) { - session.sync(); - receiver.stop(); - break; - } + if (timeLimit != 0 && Duration(start, now()) > timeLimit) { + session.sync(); + receiver.stop(); + break; + } } } @@ -414,8 +416,32 @@ int main(int argc, char** argv) opts.parse(argc, argv); if (opts.cumulative) opts.csv = true; - boost::ptr_vector<Test> tests(opts.queues); - for (uint i = 0; i < opts.queues; i++) { + + Connection localConnection; + AsyncSession session; + if (opts.queues > 1){ + opts.open(localConnection); + session = localConnection.newSession(); + std::cout << "More than one queue being used, creating..." << std::endl; + // use default binding + for (uint i=0;i<opts.queues;i++){ + + std::ostringstream out; + out << opts.base << "-" << (i+1); + session.queueDeclare(arg::queue=out.str(), arg::durable=opts.durable, arg::autoDelete=true); + uint msgCount = session.queueQuery(arg::queue=out.str()).get().getMessageCount(); + if (msgCount) { + std::cout << "Warning: found " << msgCount << " msgs on " << out.str() << ". Purging..." << std::endl; + session.queuePurge(arg::queue=out.str()); + } + + } + session.sync(); + std::cout << "Complete..." << std::endl; + } + + boost::ptr_vector<Test> tests(opts.concurrentConnections); + for (uint i = 0; i < opts.concurrentConnections; i++) { std::ostringstream out; out << opts.base << "-" << (i+1); tests.push_back(new Test(out.str())); @@ -437,6 +463,11 @@ int main(int argc, char** argv) } } + if (opts.queues > 1){ + session.close(); + localConnection.close(); + } + return 0; } catch(const std::exception& e) { std::cout << e.what() << std::endl; |