summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2009-07-14 14:49:33 +0000
committerAlan Conway <aconway@apache.org>2009-07-14 14:49:33 +0000
commit141f7814e093845265b24c47509fb0a9047c1881 (patch)
tree7f4fda51f6bf4eec62f332d4c44bf353c08913a5 /cpp
parent8978cf64e20e9cc89aa973ea7cce2ed3c85ec568 (diff)
downloadqpid-python-141f7814e093845265b24c47509fb0a9047c1881.tar.gz
Minor cluster optimizations.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@793917 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp2
-rw-r--r--cpp/src/qpid/cluster/ErrorCheck.cpp18
-rw-r--r--cpp/src/qpid/cluster/ErrorCheck.h7
-rw-r--r--cpp/src/qpid/sys/ActivityTimer.h106
-rw-r--r--cpp/src/tests/latencytest.cpp57
5 files changed, 55 insertions, 135 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index fc8faf08ec..3d46797679 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -414,7 +414,7 @@ LATENCY_TRACK(sys::LatencyTracker<const AMQBody*> doOutputTracker("DoOutput");)
deliverEventQueue.start();
}
// Process each frame through the error checker.
- if (settings.checkErrors) {
+ if (settings.checkErrors && error.isUnresolved()) {
error.delivered(e);
while (error.canProcess()) // There is a frame ready to process.
processFrame(error.getNext(), l);
diff --git a/cpp/src/qpid/cluster/ErrorCheck.cpp b/cpp/src/qpid/cluster/ErrorCheck.cpp
index 2af820c8a0..d498b252f5 100644
--- a/cpp/src/qpid/cluster/ErrorCheck.cpp
+++ b/cpp/src/qpid/cluster/ErrorCheck.cpp
@@ -67,8 +67,8 @@ void ErrorCheck::error(
}
void ErrorCheck::delivered(const EventFrame& e) {
- FrameQueue::iterator i = frames.insert(frames.end(), e);
- review(i);
+ frames.push_back(e);
+ review(frames.end()-1);
}
// Review a frame in the queue with respect to the current error.
@@ -84,7 +84,7 @@ ErrorCheck::FrameQueue::iterator ErrorCheck::review(const FrameQueue::iterator&
if (errorCheck->getType() < type) { // my error is worse than his
QPID_LOG(critical, cluster << " error " << frameSeq
<< " did not occur on " << i->getMemberId());
- throw Exception("Aborted by local failure that did not occur on all replicas");
+ throw Exception("Aborted by failure that did not occur on all replicas");
}
else { // his error is worse/same as mine.
QPID_LOG(debug, cluster << " error " << frameSeq
@@ -96,12 +96,12 @@ ErrorCheck::FrameQueue::iterator ErrorCheck::review(const FrameQueue::iterator&
else {
const ClusterConfigChangeBody* configChange = 0;
if (i->frame.getBody())
- configChange = dynamic_cast<const ClusterConfigChangeBody*>(i->frame.getMethod());
+ configChange = dynamic_cast<const ClusterConfigChangeBody*>(
+ i->frame.getMethod());
if (configChange) {
MemberSet members(ClusterMap::decode(configChange->getCurrent()));
QPID_LOG(debug, cluster << " apply config change to unresolved: "
<< members);
-
MemberSet intersect;
set_intersection(members.begin(), members.end(),
unresolved.begin(), unresolved.end(),
@@ -130,12 +130,4 @@ EventFrame ErrorCheck::getNext() {
return e;
}
-bool ErrorCheck::canProcess() const {
- return type == ERROR_TYPE_NONE && !frames.empty();
-}
-
-bool ErrorCheck::isUnresolved() const {
- return type != ERROR_TYPE_NONE;
-}
-
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/ErrorCheck.h b/cpp/src/qpid/cluster/ErrorCheck.h
index 51312046b8..550cd36b75 100644
--- a/cpp/src/qpid/cluster/ErrorCheck.h
+++ b/cpp/src/qpid/cluster/ErrorCheck.h
@@ -62,11 +62,14 @@ class ErrorCheck
/**@pre canProcess **/
EventFrame getNext();
- bool canProcess() const;
+ bool canProcess() const { return type == NONE && !frames.empty(); }
+
+ bool isUnresolved() const { return type != NONE; }
+
- bool isUnresolved() const;
private:
+ static const ErrorType NONE = framing::cluster::ERROR_TYPE_NONE;
typedef std::deque<EventFrame> FrameQueue;
FrameQueue::iterator review(const FrameQueue::iterator&);
void checkResolved();
diff --git a/cpp/src/qpid/sys/ActivityTimer.h b/cpp/src/qpid/sys/ActivityTimer.h
deleted file mode 100644
index d49e16bc4f..0000000000
--- a/cpp/src/qpid/sys/ActivityTimer.h
+++ /dev/null
@@ -1,106 +0,0 @@
-#ifndef QPID_SYS_ACTIVITYTIMER_H
-#define QPID_SYS_ACTIVITYTIMER_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/sys/Time.h"
-#include "qpid/sys/Thread.h"
-#include <boost/current_function.hpp>
-#include <stdio.h>
-
-namespace qpid {
-namespace sys {
-
-/**
- * Measures and reports time spent in a particular segment of code.
- * This is real time so it includes time blocked/sleeping as well as time on CPU.
- *
- * Intended to be used via the QPID_ACTIVITY_TIMER macro for profiling
- * during development & debugging
- */
-class ActivityTimer
-{
- public:
-
- struct Stat { // Must be a POD
- uint64_t total, count;
- void sample(uint64_t value) { total += value; ++count; }
- uint64_t mean() { return count ? total/count : 0; }
- void reset() { total = count = 0; }
- };
-
- struct Data { // Must be a POD
- uint64_t start, entered;
- Stat active;
-
- void reset() {
- start = entered = 0;
- active.reset();
- }
-
- void enter(uint64_t now) {
- entered=now;
- if (!start) start = Duration(now);
- }
-
- void exit(uint64_t now) {
- active.sample(now - entered);
- }
- };
-
- ActivityTimer(Data& d, const char* fn, const char* file, int line, uint64_t reportInterval) : data(d) {
- uint64_t now = Duration(qpid::sys::now());
- if (data.start) {
- interval = now-data.start;
- if (interval > reportInterval)
- report(fn, file, line);
- }
- data.enter(now);
- }
-
- ~ActivityTimer() {
- data.exit(Duration(now()));
- }
-
- private:
- Data& data;
- uint64_t interval;
-
- void report(const char* fn, const char* file, int line) {
- long rate = (data.active.count*TIME_SEC)/interval;
- double percent = (data.active.total*100.0)/interval;
- printf("%s:%d: TIMER %ld/sec %f%% [%lu] %s\n",
- file, line, rate, percent, Thread::current().id(), fn);
- data.reset();
- }
-};
-
-}} // namespace qpid::sys
-
-/** Measures time between the point of declaration and the end of the innermost enclosing scope.
- * Can only have one in a given scope.
- */
-#define ACTIVITY_TIMER(REPORT_INTERVAL_SECS) \
- static __thread ::qpid::sys::ActivityTimer::Data qpid__ActivityTimerData__ = { 0, 0, { 0,0 }}; \
- ::qpid::sys::ActivityTimer qpid__ActivityTimerInstance__(qpid__ActivityTimerData__, BOOST_CURRENT_FUNCTION, __FILE__, __LINE__, 2*::qpid::sys::TIME_SEC)
-
-#endif /*!QPID_SYS_ACTIVITYTIMER_H*/
diff --git a/cpp/src/tests/latencytest.cpp b/cpp/src/tests/latencytest.cpp
index 6ad84e1b82..a98aac4855 100644
--- a/cpp/src/tests/latencytest.cpp
+++ b/cpp/src/tests/latencytest.cpp
@@ -49,7 +49,7 @@ struct Args : public qpid::TestOptions {
bool sync;
uint reportFrequency;
uint timeLimit;
- uint queues;
+ uint concurrentConnections;
uint prefetch;
uint ack;
bool cumulative;
@@ -57,17 +57,18 @@ struct Args : public qpid::TestOptions {
bool durable;
string base;
bool singleConnect;
+ uint queues;
Args() : size(256), count(1000), rate(0), reportFrequency(1000),
- timeLimit(0), queues(1),
+ timeLimit(0), concurrentConnections(1),
prefetch(100), ack(0),
- durable(false), base("latency-test"), singleConnect(false)
+ durable(false), base("latency-test"), singleConnect(false), queues(1)
{
addOptions()
("size", optValue(size, "N"), "message size")
- ("queues", optValue(queues, "N"), "number of queues")
+ ("concurrentTests", optValue(concurrentConnections, "N"), "number of concurrent test setup")
("single-connection", optValue(singleConnect, "yes|no"), "Use one connection for multiple sessions.")
("count", optValue(count, "N"), "number of messages to send")
("rate", optValue(rate, "N"), "target message rate (causes count to be ignored)")
@@ -81,7 +82,8 @@ struct Args : public qpid::TestOptions {
("durable", optValue(durable, "yes|no"), "use durable messages")
("csv", optValue(csv), "print stats in csv format (rate,min,max,avg)")
("cumulative", optValue(cumulative), "cumulative stats in csv format")
- ("queue-base-name", optValue(base, "<name>"), "base name for queues");
+ ("queue-base-name", optValue(base, "<name>"), "base name for queues")
+ ("queues", optValue(queues, "N"), "declare N queues & bindings to test routing");
}
};
@@ -246,8 +248,8 @@ void Receiver::test()
void Receiver::received(Message& msg)
{
++count;
- uint64_t sentAt = msg.getDeliveryProperties().getTimestamp();
uint64_t receivedAt = current_time();
+ uint64_t sentAt = msg.getDeliveryProperties().getTimestamp();
stats.update(((double) (receivedAt - sentAt)) / TIME_MSEC);
@@ -357,11 +359,11 @@ void Sender::sendByRate()
++missedRate;
else
sys::usleep(delay / TIME_USEC);
- if (timeLimit != 0 && Duration(start, now()) > timeLimit) {
- session.sync();
- receiver.stop();
- break;
- }
+ if (timeLimit != 0 && Duration(start, now()) > timeLimit) {
+ session.sync();
+ receiver.stop();
+ break;
+ }
}
}
@@ -414,8 +416,32 @@ int main(int argc, char** argv)
opts.parse(argc, argv);
if (opts.cumulative)
opts.csv = true;
- boost::ptr_vector<Test> tests(opts.queues);
- for (uint i = 0; i < opts.queues; i++) {
+
+ Connection localConnection;
+ AsyncSession session;
+ if (opts.queues > 1){
+ opts.open(localConnection);
+ session = localConnection.newSession();
+ std::cout << "More than one queue being used, creating..." << std::endl;
+ // use default binding
+ for (uint i=0;i<opts.queues;i++){
+
+ std::ostringstream out;
+ out << opts.base << "-" << (i+1);
+ session.queueDeclare(arg::queue=out.str(), arg::durable=opts.durable, arg::autoDelete=true);
+ uint msgCount = session.queueQuery(arg::queue=out.str()).get().getMessageCount();
+ if (msgCount) {
+ std::cout << "Warning: found " << msgCount << " msgs on " << out.str() << ". Purging..." << std::endl;
+ session.queuePurge(arg::queue=out.str());
+ }
+
+ }
+ session.sync();
+ std::cout << "Complete..." << std::endl;
+ }
+
+ boost::ptr_vector<Test> tests(opts.concurrentConnections);
+ for (uint i = 0; i < opts.concurrentConnections; i++) {
std::ostringstream out;
out << opts.base << "-" << (i+1);
tests.push_back(new Test(out.str()));
@@ -437,6 +463,11 @@ int main(int argc, char** argv)
}
}
+ if (opts.queues > 1){
+ session.close();
+ localConnection.close();
+ }
+
return 0;
} catch(const std::exception& e) {
std::cout << e.what() << std::endl;