diff options
author | Alan Conway <aconway@apache.org> | 2011-09-23 18:10:26 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2011-09-23 18:10:26 +0000 |
commit | e3feec93b16698fefcdaabc238c8f07f15ff67ef (patch) | |
tree | 2b141c4accc02bab2de417152720b337e7cf32ea | |
parent | f062bb3306834b22143d9adada4310ed5989928b (diff) | |
download | qpid-python-e3feec93b16698fefcdaabc238c8f07f15ff67ef.tar.gz |
QPID-2920: Make consume-lock timeout configurable
- Configurable cluster-lock timeout.
- Tweaked qpid-cluster-benchmark default settings.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2920-active@1174930 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/cluster.mk | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp | 11 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/Core.cpp | 5 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/Core.h | 10 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp | 5 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/QueueContext.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp | 6 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/QueueHandler.h | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/Settings.cpp | 31 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/Settings.h | 43 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/qpid-cluster-benchmark | 48 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/qpid-cpp-benchmark | 8 | ||||
-rw-r--r-- | qpid/cpp/src/tests/qpid-receive.cpp | 13 |
14 files changed, 124 insertions, 66 deletions
diff --git a/qpid/cpp/src/cluster.mk b/qpid/cpp/src/cluster.mk index 3e22ab696c..0945d58f07 100644 --- a/qpid/cpp/src/cluster.mk +++ b/qpid/cpp/src/cluster.mk @@ -132,6 +132,8 @@ cluster2_la_SOURCES = \ qpid/cluster/exp/QueueHandler.h \ qpid/cluster/exp/QueueReplica.cpp \ qpid/cluster/exp/QueueReplica.h \ + qpid/cluster/exp/Settings.cpp \ + qpid/cluster/exp/Settings.h \ qpid/cluster/exp/WiringHandler.cpp \ qpid/cluster/exp/WiringHandler.h diff --git a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp index 22b7f8f97c..21d497cfb5 100644 --- a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp @@ -130,7 +130,7 @@ void BrokerContext::create(broker::Queue& q) { if (tssNoReplicate) return; assert(!QueueContext::get(q)); boost::intrusive_ptr<QueueContext> context( - new QueueContext(q, core.getMulticaster())); + new QueueContext(q, core.getSettings().getConsumeLock(), core.getMulticaster())); std::string data(q.encodedSize(), '\0'); framing::Buffer buf(&data[0], data.size()); q.encode(buf); diff --git a/qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp b/qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp index 28b7dcec2e..cc8e064627 100644 --- a/qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp @@ -31,15 +31,16 @@ using broker::Broker; */ struct Cluster2Plugin : public Plugin { struct Opts : public Options { - Core::Settings& settings; - Opts(Core::Settings& s) : Options("Cluster Options"), settings(s) { + Settings& settings; + Opts(Settings& s) : Options("Cluster Options"), settings(s) { addOptions() - ("cluster2-name", optValue(settings.name, "NAME"), "Name of cluster to join"); - // TODO aconway 2010-10-19: copy across other options from ClusterPlugin.h + ("cluster2-name", optValue(settings.name, "NAME"), "Name of cluster to join") + ("consume-lock", optValue(settings.consumeLockMicros, "uS"), "Maximum time a broker can hold the consume lock on a shared queue, in microseconds."); + // TODO aconway 2010-10-19: copy across other options from ClusterPlugin.h } }; - Core::Settings settings; + Settings settings; Opts options; Core* core; // Core deletes itself on shutdown. diff --git a/qpid/cpp/src/qpid/cluster/exp/Core.cpp b/qpid/cpp/src/qpid/cluster/exp/Core.cpp index 5241b9e414..1b24b7fcf6 100644 --- a/qpid/cpp/src/qpid/cluster/exp/Core.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/Core.cpp @@ -39,10 +39,11 @@ namespace cluster { Core::Core(const Settings& s, broker::Broker& b) : broker(b), eventHandler(new EventHandler(*this)), - multicaster(eventHandler->getCpg(), b.getPoller(), boost::bind(&Core::fatal, this)) + multicaster(eventHandler->getCpg(), b.getPoller(), boost::bind(&Core::fatal, this)), + settings(s) { boost::intrusive_ptr<QueueHandler> queueHandler( - new QueueHandler(*eventHandler, multicaster)); + new QueueHandler(*eventHandler, multicaster, settings)); eventHandler->add(queueHandler); eventHandler->add(boost::intrusive_ptr<HandlerBase>( new WiringHandler(*eventHandler, queueHandler))); diff --git a/qpid/cpp/src/qpid/cluster/exp/Core.h b/qpid/cpp/src/qpid/cluster/exp/Core.h index d0dc8e57a8..5f5237d679 100644 --- a/qpid/cpp/src/qpid/cluster/exp/Core.h +++ b/qpid/cpp/src/qpid/cluster/exp/Core.h @@ -26,9 +26,11 @@ #include <memory> #include "LockedMap.h" #include "Multicaster.h" +#include "Settings.h" #include "qpid/cluster/types.h" #include "qpid/cluster/Cpg.h" #include "qpid/broker/QueuedMessage.h" +#include "qpid/sys/Time.h" // TODO aconway 2010-10-19: experimental cluster code. @@ -56,11 +58,6 @@ class BrokerContext; class Core { public: - /** Configuration settings */ - struct Settings { - std::string name; - }; - typedef LockedMap<RoutingId, boost::intrusive_ptr<broker::Message> > RoutingMap; /** Constructed during Plugin::earlyInitialize() */ @@ -84,12 +81,15 @@ class Core * Used to pass messages being routed from BrokerContext to MessageHandler */ RoutingMap& getRoutingMap() { return routingMap; } + + const Settings& getSettings() const { return settings; } private: broker::Broker& broker; std::auto_ptr<EventHandler> eventHandler; // Handles CPG events. BrokerContext* brokerHandler; // Handles broker events. RoutingMap routingMap; Multicaster multicaster; + Settings settings; }; }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp index fa36b9225d..f71b0d1865 100644 --- a/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp @@ -35,11 +35,10 @@ namespace qpid { namespace cluster { -// FIXME aconway 2011-09-16: configurable timeout. -QueueContext::QueueContext(broker::Queue& q, Multicaster& m) +QueueContext::QueueContext(broker::Queue& q, sys::Duration consumeLock, Multicaster& m) : timer(boost::bind(&QueueContext::timeout, this), q.getBroker()->getTimer(), - 100*sys::TIME_MSEC), + consumeLock), queue(q), mcast(m), consumers(0) { q.setClusterContext(boost::intrusive_ptr<QueueContext>(this)); diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueContext.h b/qpid/cpp/src/qpid/cluster/exp/QueueContext.h index 8ed54596a8..cb1499f83c 100644 --- a/qpid/cpp/src/qpid/cluster/exp/QueueContext.h +++ b/qpid/cpp/src/qpid/cluster/exp/QueueContext.h @@ -51,7 +51,7 @@ class Multicaster; */ class QueueContext : public RefCounted { public: - QueueContext(broker::Queue& q, Multicaster& m); + QueueContext(broker::Queue& q, sys::Duration consumeLock, Multicaster& m); ~QueueContext(); /** Replica state has changed, called in deliver thread. diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp index 37079a17a1..33f805ab82 100644 --- a/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp @@ -33,8 +33,8 @@ namespace qpid { namespace cluster { // FIXME aconway 2011-05-11: make Multicaster+EventHandler available as Group, clean this up? -QueueHandler::QueueHandler(EventHandler& eh, Multicaster& m) - : HandlerBase(eh), multicaster(m) {} +QueueHandler::QueueHandler(EventHandler& eh, Multicaster& m, const Settings& s) + : HandlerBase(eh), multicaster(m), consumeLock(s.getConsumeLock()) {} bool QueueHandler::invoke(const framing::AMQBody& body) { return framing::invoke(*this, body).wasHandled(); @@ -64,7 +64,7 @@ void QueueHandler::add(boost::shared_ptr<broker::Queue> q) { // Local queues already have a context, remote queues need one. if (!QueueContext::get(*q)) - new QueueContext(*q, multicaster); // Context attaches itself to the Queue + new QueueContext(*q, consumeLock, multicaster); // Context attaches itself to the Queue // FIXME aconway 2011-09-15: thread safety: called from wiring handler.. queues[q->getName()] = boost::intrusive_ptr<QueueReplica>( new QueueReplica(q, self())); diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h b/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h index 6494efb1b3..920c5de0b2 100644 --- a/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h +++ b/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h @@ -24,6 +24,7 @@ #include "HandlerBase.h" #include "LockedMap.h" +#include "Settings.h" #include "qpid/framing/AMQP_AllOperations.h" #include "boost/shared_ptr.hpp" #include "boost/intrusive_ptr.hpp" @@ -53,7 +54,7 @@ class QueueHandler : public framing::AMQP_AllOperations::ClusterQueueHandler, public HandlerBase { public: - QueueHandler(EventHandler&, Multicaster&); + QueueHandler(EventHandler&, Multicaster&, const Settings&); bool invoke(const framing::AMQBody& body); @@ -76,6 +77,7 @@ class QueueHandler : public framing::AMQP_AllOperations::ClusterQueueHandler, QueueMap queues; Multicaster& multicaster; + sys::Duration consumeLock; }; }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/exp/Settings.cpp b/qpid/cpp/src/qpid/cluster/exp/Settings.cpp new file mode 100644 index 0000000000..59c8c4274c --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/exp/Settings.cpp @@ -0,0 +1,31 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Settings.h" + +namespace qpid { +namespace cluster { + +Settings::Settings() : // Default settings + consumeLockMicros(100000) +{} + +}} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/exp/Settings.h b/qpid/cpp/src/qpid/cluster/exp/Settings.h new file mode 100644 index 0000000000..9d3f5990ac --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/exp/Settings.h @@ -0,0 +1,43 @@ +#ifndef QPID_CLUSTER_EXP_SETTINGS_H +#define QPID_CLUSTER_EXP_SETTINGS_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <string> +#include "qpid/sys/Time.h" + +// TODO aconway 2010-10-19: experimental cluster code. + +namespace qpid { +namespace cluster { + +/** Configuration settings */ +struct Settings { + Settings(); + std::string name; + uint32_t consumeLockMicros; + sys::Duration getConsumeLock() const { return consumeLockMicros * sys::TIME_USEC; } +}; + +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_EXP_SETTINGS_H*/ diff --git a/qpid/cpp/src/tests/qpid-cluster-benchmark b/qpid/cpp/src/tests/qpid-cluster-benchmark index 9f64de31ce..484fe00912 100755 --- a/qpid/cpp/src/tests/qpid-cluster-benchmark +++ b/qpid/cpp/src/tests/qpid-cluster-benchmark @@ -20,39 +20,33 @@ # Benchmark script for comparing cluster performance. -# Default values -PORT="5672" -MESSAGES=10000 -FLOW=100 # Flow control limit on queue depth for latency. -REPEAT=10 -QUEUES=4 -CLIENTS=3 - -while getopts "p:m:f:r:t:b:q:c" opt; do +# Default options +MESSAGES="-m 10000" +FLOW="--flow-control 100" # Flow control limit on queue depth for latency. +REPEAT="--repeat 10" +QUEUES="-q 4" +SENDERS="-s 3" +RECEIVERS="-r 3" +BROKERS= # Local broker +CLIENT_HOSTS= # No ssh, all clients are local + +while getopts "m:f:n:b:q:s:r:c:" opt; do case $opt in - p) PORT=$OPTARG;; - m) MESSAGES=$OPTARG;; - f) FLOW=$OPTARG;; - r) REPEAT=$OPTARG;; - s) SCALE=$OPTARG;; - b) BROKERS=$OPTARG;; - q) QUEUES=$OPTARG;; - c) CLIENTS=$OPTARG;; + m) MESSAGES="-m $OPTARG";; + f) FLOW="--flow-control $OPTARG";; + n) REPEAT="--repeat $OPTARG";; + b) BROKERS="-b $OPTARG";; + q) QUEUES="-q $OPTARG";; + s) SENDERS="-s $OPTARG";; + r) RECEIVERS="-r $OPTARG";; + c) CLIENT_HOSTS="-c $OPTARG";; *) echo "Unknown option"; exit 1;; esac done -BROKERS=${BROKERS:-$(echo $HOSTS | sed "s/\>/:$PORT/g;s/ /,/g")} # Broker URL list -BROKER=`echo $BROKERS | awk -F, '{print $1}'` # First broker - run_test() { echo $*; shift; "$@"; echo; echo; echo; } -# Multiple pubs/subs connect via multiple brokers (active-active) -run_test "multi-host-thruput" qpid-cpp-benchmark --repeat $REPEAT -b $BROKERS --no-timestamp --summarize -q$QUEUES -s$CLIENTS -r$CLIENTS -m $MESSAGES - -# Multiple pubs/subs connect via single broker (active-passive) -run_test "single-host-thruput" qpid-cpp-benchmark --repeat $REPEAT -b $BROKER --no-timestamp --summarize -q$QUEUES -s$CLIENTS -r$CLIENTS -m $MESSAGES +run_test "Throughput:" qpid-cpp-benchmark $REPEAT $BROKERS --summarize $QUEUES $SENDERS $RECEIVERS $MESSAGES $CLIENT_HOSTS -# Latency -run_test "latency" qpid-cpp-benchmark --repeat $REPEAT -b $BROKER --connection-options '{tcp-nodelay:true}' -m $MESSAGES --flow-control $FLOW +run_test "Latency:" qpid-cpp-benchmark $REPEAT $BROKERS --connection-options "{tcp-nodelay:true}" $MESSAGES $FLOW $CLIENT_HOSTS diff --git a/qpid/cpp/src/tests/qpid-cpp-benchmark b/qpid/cpp/src/tests/qpid-cpp-benchmark index 6da0c11944..bd83403f7e 100755 --- a/qpid/cpp/src/tests/qpid-cpp-benchmark +++ b/qpid/cpp/src/tests/qpid-cpp-benchmark @@ -115,9 +115,7 @@ def start_receive(queue, index, opts, ready_queue, broker, host): if opts.connection_options: command += ["--connection-options",opts.connection_options] if host: command = ssh_command(host, command) - # FIXME aconway 2011-09-15: - # return clients.add(Popen(command, stdout=PIPE, stderr=PIPE)) - return clients.add(Popen(command, stdout=PIPE)) + return clients.add(Popen(command, stdout=PIPE, stderr=PIPE)) def start_send(queue, opts, broker, host): address="%s;{%s}"%(queue,",".join(opts.send_option + ["create:always"])) @@ -130,9 +128,7 @@ def start_send(queue, opts, broker, host): "--report-total", "--report-header=no", "--timestamp=%s"%(opts.timestamp and "yes" or "no"), - # FIXME aconway 2011-09-15: - # "--sequence=no", - "--sequence=yes", + "--sequence=no", "--flow-control", str(opts.flow_control), "--durable", str(opts.durable) ] diff --git a/qpid/cpp/src/tests/qpid-receive.cpp b/qpid/cpp/src/tests/qpid-receive.cpp index 689662c2ba..b26311abef 100644 --- a/qpid/cpp/src/tests/qpid-receive.cpp +++ b/qpid/cpp/src/tests/qpid-receive.cpp @@ -190,24 +190,13 @@ int main(int argc, char ** argv) session.createSender(opts.readyAddress).send(msg); // For receive rate calculation - qpid::sys::AbsTime start; // Will be set on first itertion. - bool started=false; + qpid::sys::AbsTime start(qpid::sys::now()); int64_t interval = 0; if (opts.receiveRate) interval = qpid::sys::TIME_SEC/opts.receiveRate; std::map<std::string,Sender> replyTo; while (!done && receiver.fetch(msg, timeout)) { - // FIXME aconway 2011-09-19: -// std::ostringstream os; -// os << "qpid-receive(" << getpid() << ") seq=" << msg.getProperties()[SN] << endl; // FIXME aconway 2011-09-19: -// cerr << os.str() << flush; - if (!started) { - // Start the time on receipt of the first message to avoid counting - // idle time at process startup. - start = qpid::sys::AbsTime::now(); - started = true; - } reporter.message(msg); if (!opts.ignoreDuplicates || !sequenceTracker.isDuplicate(msg)) { if (msg.getContent() == EOS) { |