summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2011-09-23 18:10:26 +0000
committerAlan Conway <aconway@apache.org>2011-09-23 18:10:26 +0000
commite3feec93b16698fefcdaabc238c8f07f15ff67ef (patch)
tree2b141c4accc02bab2de417152720b337e7cf32ea
parentf062bb3306834b22143d9adada4310ed5989928b (diff)
downloadqpid-python-e3feec93b16698fefcdaabc238c8f07f15ff67ef.tar.gz
QPID-2920: Make consume-lock timeout configurable
- Configurable cluster-lock timeout. - Tweaked qpid-cluster-benchmark default settings. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2920-active@1174930 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/cluster.mk2
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp2
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp11
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/Core.cpp5
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/Core.h10
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp5
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/QueueContext.h2
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp6
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/QueueHandler.h4
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/Settings.cpp31
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/Settings.h43
-rwxr-xr-xqpid/cpp/src/tests/qpid-cluster-benchmark48
-rwxr-xr-xqpid/cpp/src/tests/qpid-cpp-benchmark8
-rw-r--r--qpid/cpp/src/tests/qpid-receive.cpp13
14 files changed, 124 insertions, 66 deletions
diff --git a/qpid/cpp/src/cluster.mk b/qpid/cpp/src/cluster.mk
index 3e22ab696c..0945d58f07 100644
--- a/qpid/cpp/src/cluster.mk
+++ b/qpid/cpp/src/cluster.mk
@@ -132,6 +132,8 @@ cluster2_la_SOURCES = \
qpid/cluster/exp/QueueHandler.h \
qpid/cluster/exp/QueueReplica.cpp \
qpid/cluster/exp/QueueReplica.h \
+ qpid/cluster/exp/Settings.cpp \
+ qpid/cluster/exp/Settings.h \
qpid/cluster/exp/WiringHandler.cpp \
qpid/cluster/exp/WiringHandler.h
diff --git a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp
index 22b7f8f97c..21d497cfb5 100644
--- a/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp
@@ -130,7 +130,7 @@ void BrokerContext::create(broker::Queue& q) {
if (tssNoReplicate) return;
assert(!QueueContext::get(q));
boost::intrusive_ptr<QueueContext> context(
- new QueueContext(q, core.getMulticaster()));
+ new QueueContext(q, core.getSettings().getConsumeLock(), core.getMulticaster()));
std::string data(q.encodedSize(), '\0');
framing::Buffer buf(&data[0], data.size());
q.encode(buf);
diff --git a/qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp b/qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp
index 28b7dcec2e..cc8e064627 100644
--- a/qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp
@@ -31,15 +31,16 @@ using broker::Broker;
*/
struct Cluster2Plugin : public Plugin {
struct Opts : public Options {
- Core::Settings& settings;
- Opts(Core::Settings& s) : Options("Cluster Options"), settings(s) {
+ Settings& settings;
+ Opts(Settings& s) : Options("Cluster Options"), settings(s) {
addOptions()
- ("cluster2-name", optValue(settings.name, "NAME"), "Name of cluster to join");
- // TODO aconway 2010-10-19: copy across other options from ClusterPlugin.h
+ ("cluster2-name", optValue(settings.name, "NAME"), "Name of cluster to join")
+ ("consume-lock", optValue(settings.consumeLockMicros, "uS"), "Maximum time a broker can hold the consume lock on a shared queue, in microseconds.");
+ // TODO aconway 2010-10-19: copy across other options from ClusterPlugin.h
}
};
- Core::Settings settings;
+ Settings settings;
Opts options;
Core* core; // Core deletes itself on shutdown.
diff --git a/qpid/cpp/src/qpid/cluster/exp/Core.cpp b/qpid/cpp/src/qpid/cluster/exp/Core.cpp
index 5241b9e414..1b24b7fcf6 100644
--- a/qpid/cpp/src/qpid/cluster/exp/Core.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/Core.cpp
@@ -39,10 +39,11 @@ namespace cluster {
Core::Core(const Settings& s, broker::Broker& b) :
broker(b),
eventHandler(new EventHandler(*this)),
- multicaster(eventHandler->getCpg(), b.getPoller(), boost::bind(&Core::fatal, this))
+ multicaster(eventHandler->getCpg(), b.getPoller(), boost::bind(&Core::fatal, this)),
+ settings(s)
{
boost::intrusive_ptr<QueueHandler> queueHandler(
- new QueueHandler(*eventHandler, multicaster));
+ new QueueHandler(*eventHandler, multicaster, settings));
eventHandler->add(queueHandler);
eventHandler->add(boost::intrusive_ptr<HandlerBase>(
new WiringHandler(*eventHandler, queueHandler)));
diff --git a/qpid/cpp/src/qpid/cluster/exp/Core.h b/qpid/cpp/src/qpid/cluster/exp/Core.h
index d0dc8e57a8..5f5237d679 100644
--- a/qpid/cpp/src/qpid/cluster/exp/Core.h
+++ b/qpid/cpp/src/qpid/cluster/exp/Core.h
@@ -26,9 +26,11 @@
#include <memory>
#include "LockedMap.h"
#include "Multicaster.h"
+#include "Settings.h"
#include "qpid/cluster/types.h"
#include "qpid/cluster/Cpg.h"
#include "qpid/broker/QueuedMessage.h"
+#include "qpid/sys/Time.h"
// TODO aconway 2010-10-19: experimental cluster code.
@@ -56,11 +58,6 @@ class BrokerContext;
class Core
{
public:
- /** Configuration settings */
- struct Settings {
- std::string name;
- };
-
typedef LockedMap<RoutingId, boost::intrusive_ptr<broker::Message> > RoutingMap;
/** Constructed during Plugin::earlyInitialize() */
@@ -84,12 +81,15 @@ class Core
* Used to pass messages being routed from BrokerContext to MessageHandler
*/
RoutingMap& getRoutingMap() { return routingMap; }
+
+ const Settings& getSettings() const { return settings; }
private:
broker::Broker& broker;
std::auto_ptr<EventHandler> eventHandler; // Handles CPG events.
BrokerContext* brokerHandler; // Handles broker events.
RoutingMap routingMap;
Multicaster multicaster;
+ Settings settings;
};
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
index fa36b9225d..f71b0d1865 100644
--- a/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
@@ -35,11 +35,10 @@
namespace qpid {
namespace cluster {
-// FIXME aconway 2011-09-16: configurable timeout.
-QueueContext::QueueContext(broker::Queue& q, Multicaster& m)
+QueueContext::QueueContext(broker::Queue& q, sys::Duration consumeLock, Multicaster& m)
: timer(boost::bind(&QueueContext::timeout, this),
q.getBroker()->getTimer(),
- 100*sys::TIME_MSEC),
+ consumeLock),
queue(q), mcast(m), consumers(0)
{
q.setClusterContext(boost::intrusive_ptr<QueueContext>(this));
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueContext.h b/qpid/cpp/src/qpid/cluster/exp/QueueContext.h
index 8ed54596a8..cb1499f83c 100644
--- a/qpid/cpp/src/qpid/cluster/exp/QueueContext.h
+++ b/qpid/cpp/src/qpid/cluster/exp/QueueContext.h
@@ -51,7 +51,7 @@ class Multicaster;
*/
class QueueContext : public RefCounted {
public:
- QueueContext(broker::Queue& q, Multicaster& m);
+ QueueContext(broker::Queue& q, sys::Duration consumeLock, Multicaster& m);
~QueueContext();
/** Replica state has changed, called in deliver thread.
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp
index 37079a17a1..33f805ab82 100644
--- a/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp
@@ -33,8 +33,8 @@ namespace qpid {
namespace cluster {
// FIXME aconway 2011-05-11: make Multicaster+EventHandler available as Group, clean this up?
-QueueHandler::QueueHandler(EventHandler& eh, Multicaster& m)
- : HandlerBase(eh), multicaster(m) {}
+QueueHandler::QueueHandler(EventHandler& eh, Multicaster& m, const Settings& s)
+ : HandlerBase(eh), multicaster(m), consumeLock(s.getConsumeLock()) {}
bool QueueHandler::invoke(const framing::AMQBody& body) {
return framing::invoke(*this, body).wasHandled();
@@ -64,7 +64,7 @@ void QueueHandler::add(boost::shared_ptr<broker::Queue> q) {
// Local queues already have a context, remote queues need one.
if (!QueueContext::get(*q))
- new QueueContext(*q, multicaster); // Context attaches itself to the Queue
+ new QueueContext(*q, consumeLock, multicaster); // Context attaches itself to the Queue
// FIXME aconway 2011-09-15: thread safety: called from wiring handler..
queues[q->getName()] = boost::intrusive_ptr<QueueReplica>(
new QueueReplica(q, self()));
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h b/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h
index 6494efb1b3..920c5de0b2 100644
--- a/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h
+++ b/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h
@@ -24,6 +24,7 @@
#include "HandlerBase.h"
#include "LockedMap.h"
+#include "Settings.h"
#include "qpid/framing/AMQP_AllOperations.h"
#include "boost/shared_ptr.hpp"
#include "boost/intrusive_ptr.hpp"
@@ -53,7 +54,7 @@ class QueueHandler : public framing::AMQP_AllOperations::ClusterQueueHandler,
public HandlerBase
{
public:
- QueueHandler(EventHandler&, Multicaster&);
+ QueueHandler(EventHandler&, Multicaster&, const Settings&);
bool invoke(const framing::AMQBody& body);
@@ -76,6 +77,7 @@ class QueueHandler : public framing::AMQP_AllOperations::ClusterQueueHandler,
QueueMap queues;
Multicaster& multicaster;
+ sys::Duration consumeLock;
};
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/exp/Settings.cpp b/qpid/cpp/src/qpid/cluster/exp/Settings.cpp
new file mode 100644
index 0000000000..59c8c4274c
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/exp/Settings.cpp
@@ -0,0 +1,31 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Settings.h"
+
+namespace qpid {
+namespace cluster {
+
+Settings::Settings() : // Default settings
+ consumeLockMicros(100000)
+{}
+
+}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/exp/Settings.h b/qpid/cpp/src/qpid/cluster/exp/Settings.h
new file mode 100644
index 0000000000..9d3f5990ac
--- /dev/null
+++ b/qpid/cpp/src/qpid/cluster/exp/Settings.h
@@ -0,0 +1,43 @@
+#ifndef QPID_CLUSTER_EXP_SETTINGS_H
+#define QPID_CLUSTER_EXP_SETTINGS_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <string>
+#include "qpid/sys/Time.h"
+
+// TODO aconway 2010-10-19: experimental cluster code.
+
+namespace qpid {
+namespace cluster {
+
+/** Configuration settings */
+struct Settings {
+ Settings();
+ std::string name;
+ uint32_t consumeLockMicros;
+ sys::Duration getConsumeLock() const { return consumeLockMicros * sys::TIME_USEC; }
+};
+
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_EXP_SETTINGS_H*/
diff --git a/qpid/cpp/src/tests/qpid-cluster-benchmark b/qpid/cpp/src/tests/qpid-cluster-benchmark
index 9f64de31ce..484fe00912 100755
--- a/qpid/cpp/src/tests/qpid-cluster-benchmark
+++ b/qpid/cpp/src/tests/qpid-cluster-benchmark
@@ -20,39 +20,33 @@
# Benchmark script for comparing cluster performance.
-# Default values
-PORT="5672"
-MESSAGES=10000
-FLOW=100 # Flow control limit on queue depth for latency.
-REPEAT=10
-QUEUES=4
-CLIENTS=3
-
-while getopts "p:m:f:r:t:b:q:c" opt; do
+# Default options
+MESSAGES="-m 10000"
+FLOW="--flow-control 100" # Flow control limit on queue depth for latency.
+REPEAT="--repeat 10"
+QUEUES="-q 4"
+SENDERS="-s 3"
+RECEIVERS="-r 3"
+BROKERS= # Local broker
+CLIENT_HOSTS= # No ssh, all clients are local
+
+while getopts "m:f:n:b:q:s:r:c:" opt; do
case $opt in
- p) PORT=$OPTARG;;
- m) MESSAGES=$OPTARG;;
- f) FLOW=$OPTARG;;
- r) REPEAT=$OPTARG;;
- s) SCALE=$OPTARG;;
- b) BROKERS=$OPTARG;;
- q) QUEUES=$OPTARG;;
- c) CLIENTS=$OPTARG;;
+ m) MESSAGES="-m $OPTARG";;
+ f) FLOW="--flow-control $OPTARG";;
+ n) REPEAT="--repeat $OPTARG";;
+ b) BROKERS="-b $OPTARG";;
+ q) QUEUES="-q $OPTARG";;
+ s) SENDERS="-s $OPTARG";;
+ r) RECEIVERS="-r $OPTARG";;
+ c) CLIENT_HOSTS="-c $OPTARG";;
*) echo "Unknown option"; exit 1;;
esac
done
-BROKERS=${BROKERS:-$(echo $HOSTS | sed "s/\>/:$PORT/g;s/ /,/g")} # Broker URL list
-BROKER=`echo $BROKERS | awk -F, '{print $1}'` # First broker
-
run_test() { echo $*; shift; "$@"; echo; echo; echo; }
-# Multiple pubs/subs connect via multiple brokers (active-active)
-run_test "multi-host-thruput" qpid-cpp-benchmark --repeat $REPEAT -b $BROKERS --no-timestamp --summarize -q$QUEUES -s$CLIENTS -r$CLIENTS -m $MESSAGES
-
-# Multiple pubs/subs connect via single broker (active-passive)
-run_test "single-host-thruput" qpid-cpp-benchmark --repeat $REPEAT -b $BROKER --no-timestamp --summarize -q$QUEUES -s$CLIENTS -r$CLIENTS -m $MESSAGES
+run_test "Throughput:" qpid-cpp-benchmark $REPEAT $BROKERS --summarize $QUEUES $SENDERS $RECEIVERS $MESSAGES $CLIENT_HOSTS
-# Latency
-run_test "latency" qpid-cpp-benchmark --repeat $REPEAT -b $BROKER --connection-options '{tcp-nodelay:true}' -m $MESSAGES --flow-control $FLOW
+run_test "Latency:" qpid-cpp-benchmark $REPEAT $BROKERS --connection-options "{tcp-nodelay:true}" $MESSAGES $FLOW $CLIENT_HOSTS
diff --git a/qpid/cpp/src/tests/qpid-cpp-benchmark b/qpid/cpp/src/tests/qpid-cpp-benchmark
index 6da0c11944..bd83403f7e 100755
--- a/qpid/cpp/src/tests/qpid-cpp-benchmark
+++ b/qpid/cpp/src/tests/qpid-cpp-benchmark
@@ -115,9 +115,7 @@ def start_receive(queue, index, opts, ready_queue, broker, host):
if opts.connection_options:
command += ["--connection-options",opts.connection_options]
if host: command = ssh_command(host, command)
- # FIXME aconway 2011-09-15:
- # return clients.add(Popen(command, stdout=PIPE, stderr=PIPE))
- return clients.add(Popen(command, stdout=PIPE))
+ return clients.add(Popen(command, stdout=PIPE, stderr=PIPE))
def start_send(queue, opts, broker, host):
address="%s;{%s}"%(queue,",".join(opts.send_option + ["create:always"]))
@@ -130,9 +128,7 @@ def start_send(queue, opts, broker, host):
"--report-total",
"--report-header=no",
"--timestamp=%s"%(opts.timestamp and "yes" or "no"),
- # FIXME aconway 2011-09-15:
- # "--sequence=no",
- "--sequence=yes",
+ "--sequence=no",
"--flow-control", str(opts.flow_control),
"--durable", str(opts.durable)
]
diff --git a/qpid/cpp/src/tests/qpid-receive.cpp b/qpid/cpp/src/tests/qpid-receive.cpp
index 689662c2ba..b26311abef 100644
--- a/qpid/cpp/src/tests/qpid-receive.cpp
+++ b/qpid/cpp/src/tests/qpid-receive.cpp
@@ -190,24 +190,13 @@ int main(int argc, char ** argv)
session.createSender(opts.readyAddress).send(msg);
// For receive rate calculation
- qpid::sys::AbsTime start; // Will be set on first itertion.
- bool started=false;
+ qpid::sys::AbsTime start(qpid::sys::now());
int64_t interval = 0;
if (opts.receiveRate) interval = qpid::sys::TIME_SEC/opts.receiveRate;
std::map<std::string,Sender> replyTo;
while (!done && receiver.fetch(msg, timeout)) {
- // FIXME aconway 2011-09-19:
-// std::ostringstream os;
-// os << "qpid-receive(" << getpid() << ") seq=" << msg.getProperties()[SN] << endl; // FIXME aconway 2011-09-19:
-// cerr << os.str() << flush;
- if (!started) {
- // Start the time on receipt of the first message to avoid counting
- // idle time at process startup.
- start = qpid::sys::AbsTime::now();
- started = true;
- }
reporter.message(msg);
if (!opts.ignoreDuplicates || !sequenceTracker.isDuplicate(msg)) {
if (msg.getContent() == EOS) {