diff options
author | Alan Conway <aconway@apache.org> | 2008-09-05 19:53:44 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-09-05 19:53:44 +0000 |
commit | 650dc8949488e88797a61908723619aca5fc8909 (patch) | |
tree | e09ce3d723a4255abe8355887677e855c225d2c0 /qpid/cpp | |
parent | 79ffcd4b5cf08b435881dd28d1a673f287d42532 (diff) | |
download | qpid-python-650dc8949488e88797a61908723619aca5fc8909.tar.gz |
Fixed cluster membership notification.
Cluster events with RefCountedBuffers for queueing.
PollableQueue clears bacth immediately.
Improved perfdist: clients hit multiple brokers in a cluster.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@692521 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
-rw-r--r-- | qpid/cpp/src/Makefile.am | 2 | ||||
-rw-r--r-- | qpid/cpp/src/cluster.mk | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/RefCountedBuffer.cpp | 45 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/RefCountedBuffer.h | 68 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Cluster.cpp | 51 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Cluster.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Cpg.cpp | 30 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Cpg.h | 10 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Event.cpp | 52 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Event.h | 65 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/PollableQueue.h | 10 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/benchmark | 52 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/perfdist | 38 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/start_cluster_hosts | 16 |
14 files changed, 344 insertions, 100 deletions
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index 25c27549c3..d52994a6e4 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -237,6 +237,8 @@ libqpidcommon_la_SOURCES = \ qpid/DataDir.cpp \ qpid/Options.cpp \ qpid/log/Options.cpp \ + qpid/RefCountedBuffer.h \ + qpid/RefCountedBuffer.cpp \ qpid/log/Selector.cpp \ qpid/log/Statement.cpp \ qpid/pointer_to_other.h diff --git a/qpid/cpp/src/cluster.mk b/qpid/cpp/src/cluster.mk index d5be6dfc57..cb07804550 100644 --- a/qpid/cpp/src/cluster.mk +++ b/qpid/cpp/src/cluster.mk @@ -25,7 +25,9 @@ libqpidcluster_la_SOURCES = \ qpid/cluster/WriteEstimate.cpp \ qpid/cluster/OutputInterceptor.h \ qpid/cluster/OutputInterceptor.cpp \ - qpid/cluster/ProxyInputHandler.h + qpid/cluster/ProxyInputHandler.h \ + qpid/cluster/Event.h \ + qpid/cluster/Event.cpp libqpidcluster_la_LIBADD= -lcpg libqpidbroker.la diff --git a/qpid/cpp/src/qpid/RefCountedBuffer.cpp b/qpid/cpp/src/qpid/RefCountedBuffer.cpp new file mode 100644 index 0000000000..3a52b94412 --- /dev/null +++ b/qpid/cpp/src/qpid/RefCountedBuffer.cpp @@ -0,0 +1,45 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "RefCountedBuffer.h" + +namespace qpid { + +RefCountedBuffer::RefCountedBuffer() : count(0) {} + +void RefCountedBuffer::destroy() const { + this->~RefCountedBuffer(); + ::delete[] reinterpret_cast<const char*>(this); +} + +char* RefCountedBuffer::addr() const { + return const_cast<char*>(reinterpret_cast<const char*>(this)+sizeof(RefCountedBuffer)); +} + +RefCountedBuffer::intrusive_ptr RefCountedBuffer::create(size_t n) { + char* store=::new char[n+sizeof(RefCountedBuffer)]; + new(store) RefCountedBuffer; + return reinterpret_cast<RefCountedBuffer*>(store); +} + +} // namespace qpid + + diff --git a/qpid/cpp/src/qpid/RefCountedBuffer.h b/qpid/cpp/src/qpid/RefCountedBuffer.h new file mode 100644 index 0000000000..af46cbb92a --- /dev/null +++ b/qpid/cpp/src/qpid/RefCountedBuffer.h @@ -0,0 +1,68 @@ +#ifndef QPID_REFCOUNTEDBUFFER_H +#define QPID_REFCOUNTEDBUFFER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <boost/utility.hpp> +#include <boost/detail/atomic_count.hpp> +#include <boost/intrusive_ptr.hpp> + +namespace qpid { + +/** + * Reference-counted byte buffer. + * No alignment guarantees. + */ +class RefCountedBuffer : boost::noncopyable { + mutable boost::detail::atomic_count count; + RefCountedBuffer(); + void destroy() const; + char* addr() const; + +public: + + typedef boost::intrusive_ptr<RefCountedBuffer> intrusive_ptr; + + /** Create a reference counted buffer of size n */ + static intrusive_ptr create(size_t n); + + /** Get a pointer to the start of the buffer. */ + char* get() { return addr(); } + const char* get() const { return addr(); } + char& operator[](size_t i) { return get()[i]; } + const char& operator[](size_t i) const { return get()[i]; } + + void addRef() const { ++count; } + void release() const { if (--count==0) destroy(); } + long refCount() { return count; } +}; + +} // namespace qpid + +// intrusive_ptr support. +namespace boost { +inline void intrusive_ptr_add_ref(const qpid::RefCountedBuffer* p) { p->addRef(); } +inline void intrusive_ptr_release(const qpid::RefCountedBuffer* p) { p->release(); } +} + + +#endif /*!QPID_REFCOUNTEDBUFFER_H*/ diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp index f93203acbf..4d54a837ca 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.cpp +++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp @@ -38,6 +38,7 @@ #include <algorithm> #include <iterator> #include <map> +#include <ostream> namespace qpid { namespace cluster { @@ -67,11 +68,8 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : ) { broker->addFinalizer(boost::bind(&Cluster::leave, this)); - QPID_LOG(trace, "Joining cluster: " << name << " as " << self); + QPID_LOG(notice, "Joining cluster: " << name.str() << " as " << self); cpg.join(name); - mcastFrame(AMQFrame(in_place<ClusterUrlNoticeBody>(ProtocolVersion(), url.str())), - ConnectionId(self,0)); - // Start dispatching from the poller. cpgDispatchHandle.startWatch(poller); } @@ -94,7 +92,7 @@ void Cluster::leave() { // Leave is called by from Broker destructor after the poller has // been shut down. No dispatches can occur. - QPID_LOG(debug, "Leaving cluster " << name.str()); + QPID_LOG(notice, "Leaving cluster " << name.str()); cpg.leave(name); // broker= is set to 0 when the final config-change is delivered. while(broker) { @@ -158,7 +156,7 @@ boost::intrusive_ptr<Connection> Cluster::getConnection(const ConnectionId& id) if (i == connections.end()) { // New shadow connection. assert(id.getMember() != self); std::ostringstream mgmtId; - mgmtId << name << ":" << id; + mgmtId << name.str() << ":" << id; ConnectionMap::value_type value(id, new Connection(*this, shadowOut, mgmtId.str(), id)); i = connections.insert(value).first; } @@ -205,22 +203,50 @@ void Cluster::deliver( } } +struct AddrList { + const cpg_address* addrs; + int count; + AddrList(const cpg_address* a, int n) : addrs(a), count(n) {} +}; + +ostream& operator<<(ostream& o, const AddrList& a) { + for (const cpg_address* p = a.addrs; p < a.addrs+a.count; ++p) { + const char* reasonString; + switch (p->reason) { + case CPG_REASON_JOIN: reasonString = " joined "; break; + case CPG_REASON_LEAVE: reasonString = " left ";break; + case CPG_REASON_NODEDOWN: reasonString = " node-down ";break; + case CPG_REASON_NODEUP: reasonString = " node-up ";break; + case CPG_REASON_PROCDOWN: reasonString = " process-down ";break; + default: reasonString = " "; + } + qpid::cluster::MemberId member(*p); + o << member << reasonString; + } + return o; +} + void Cluster::configChange( cpg_handle_t /*handle*/, cpg_name */*group*/, cpg_address *current, int nCurrent, cpg_address *left, int nLeft, - cpg_address */*joined*/, int /*nJoined*/) + cpg_address *joined, int nJoined) { - QPID_LOG(debug, "Cluster change: " - << std::make_pair(current, nCurrent) - << std::make_pair(left, nLeft)); + QPID_LOG(notice, "Cluster of " << nCurrent << ": " << AddrList(current, nCurrent) << ".\n Changes: " + << AddrList(joined, nJoined) << AddrList(left, nLeft)); + + if (nJoined) // Notfiy new members of my URL. + mcastFrame( + AMQFrame(in_place<ClusterUrlNoticeBody>(ProtocolVersion(), url.str())), + ConnectionId(self,0)); + Mutex::ScopedLock l(lock); for (int i = 0; i < nLeft; ++i) urls.erase(left[i]); // Add new members when their URL notice arraives. - if (std::find(left, left+nLeft, self) != left+nLeft) + if (find(left, left+nLeft, self) != left+nLeft) broker = 0; // We have left the group, this is the final config change. lock.notifyAll(); // Threads waiting for membership changes. } @@ -236,7 +262,8 @@ void Cluster::disconnect(sys::DispatchHandle& h) { broker->shutdown(); } -void Cluster::urlNotice(const MemberId& m, const std::string& url) { +void Cluster::urlNotice(const MemberId& m, const string& url) { + QPID_LOG(notice, "Cluster member " << m << " has URL " << url); urls.insert(UrlMap::value_type(m,Url(url))); } diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h index 4963400e10..630de97093 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.h +++ b/qpid/cpp/src/qpid/cluster/Cluster.h @@ -19,7 +19,6 @@ * */ -#include "qpid/cluster/types.h" #include "qpid/cluster/Cpg.h" #include "qpid/cluster/PollableQueue.h" #include "qpid/cluster/NoOpConnectionOutputHandler.h" diff --git a/qpid/cpp/src/qpid/cluster/Cpg.cpp b/qpid/cpp/src/qpid/cluster/Cpg.cpp index 754b4abd58..96a5b3da43 100644 --- a/qpid/cpp/src/qpid/cluster/Cpg.cpp +++ b/qpid/cpp/src/qpid/cluster/Cpg.cpp @@ -184,33 +184,3 @@ ostream& operator<<(ostream& o, const ConnectionId& c) { } }} // namespace qpid::cluster - - -// In proper namespace for ADL. - -std::ostream& operator<<(std::ostream& o, const ::cpg_address& a) { - const char* reasonString; - switch (a.reason) { - case CPG_REASON_JOIN: reasonString = " joined"; break; - case CPG_REASON_LEAVE: reasonString = " left";break; - case CPG_REASON_NODEDOWN: reasonString = " node-down";break; - case CPG_REASON_NODEUP: reasonString = " node-up";break; - case CPG_REASON_PROCDOWN: reasonString = " process-down";break; - default: reasonString = ""; - } - return o << qpid::cluster::MemberId(a.nodeid, a.pid) << reasonString; -} - -std::ostream& operator<<(std::ostream& o, const cpg_name& name) { - return o << std::string(name.value, name.length); -} - -namespace std { -ostream& operator<<(ostream& o, std::pair<cpg_address*,int> a) { - for (cpg_address* p = a.first; p < a.first+a.second; ++p) - o << *p << " "; - return o; -} -} - - diff --git a/qpid/cpp/src/qpid/cluster/Cpg.h b/qpid/cpp/src/qpid/cluster/Cpg.h index fdc451fbbc..5ffd42e12a 100644 --- a/qpid/cpp/src/qpid/cluster/Cpg.h +++ b/qpid/cpp/src/qpid/cluster/Cpg.h @@ -158,8 +158,6 @@ class Cpg : public sys::IOHandle { bool isShutdown; }; -std::ostream& operator <<(std::ostream& out, const MemberId& id); - inline bool operator==(const cpg_name& a, const cpg_name& b) { return a.length==b.length && strncmp(a.value, b.value, a.length) == 0; } @@ -167,12 +165,4 @@ inline bool operator!=(const cpg_name& a, const cpg_name& b) { return !(a == b); }} // namespace qpid::cluster -// In proper namespaces for ADL -std::ostream& operator <<(std::ostream& out, const cpg_name& name); -std::ostream& operator<<(std::ostream& o, const cpg_address& a); -namespace std { -std::ostream& operator <<(std::ostream& out, std::pair<cpg_address*,int> addresses); -} - - #endif /*!CPG_H*/ diff --git a/qpid/cpp/src/qpid/cluster/Event.cpp b/qpid/cpp/src/qpid/cluster/Event.cpp new file mode 100644 index 0000000000..ff558842e4 --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/Event.cpp @@ -0,0 +1,52 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "types.h" +#include "Event.h" +#include "Cpg.h" +#include "qpid/framing/Buffer.h" + +namespace qpid { +namespace cluster { +using framing::Buffer; + +const size_t Event::OVERHEAD = 1 /*type*/ + 8 /*64-bit pointr*/; + +Event::Event(EventType t, const ConnectionId c, const size_t s) + : type(t), connection(c), size(s), data(RefCountedBuffer::create(s)) {} + +Event::Event(const MemberId& m, const char* d, size_t s) + : connection(m, 0), size(s-OVERHEAD), data(RefCountedBuffer::create(size)) +{ + memcpy(data->get(), d, s); +} + +void Event::mcast(const Cpg::Name& name, Cpg& cpg) { + char header[OVERHEAD]; + Buffer b; + b.putOctet(type); + b.putLongLong(reinterpret_cast<uint64_t>(connection.getConnectionPtr())); + iovec iov[] = { { header, b.getPosition() }, { data.get(), size } }; + cpg.mcast(name, iov, sizeof(iov)/sizeof(*iov)); +} + + + +}} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/Event.h b/qpid/cpp/src/qpid/cluster/Event.h new file mode 100644 index 0000000000..3e4a19f7f3 --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/Event.h @@ -0,0 +1,65 @@ +#ifndef QPID_CLUSTER_EVENT_H +#define QPID_CLUSTER_EVENT_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "types.h" +#include "Cpg.h" +#include "qpid/RefCountedBuffer.h" + +namespace qpid { +namespace cluster { + +// TODO aconway 2008-09-03: more efficient solution for shared +// byte-stream data. +// + +/** + * Events are sent to/received from the cluster. + * Refcounted so they can be stored on queues. + */ +struct Event { + public: + /** Create an event with for mcasting, with size bytes of space. */ + Event(EventType t, const ConnectionId c, size_t size); + + /** Create an event from delivered data. */ + Event(const MemberId& m, const char* data, size_t size); + + void mcast(const Cpg::Name& name, Cpg& cpg); + + EventType getType() const { return type; } + ConnectionId getConnection() const { return connection; } + size_t getSize() const { return size; } + char* getData() { return data->get(); } + + private: + static const size_t OVERHEAD; + EventType type; + ConnectionId connection; + size_t size; + RefCountedBuffer::intrusive_ptr data; +}; + +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_EVENT_H*/ diff --git a/qpid/cpp/src/qpid/cluster/PollableQueue.h b/qpid/cpp/src/qpid/cluster/PollableQueue.h index 29891da344..74da2df750 100644 --- a/qpid/cpp/src/qpid/cluster/PollableQueue.h +++ b/qpid/cpp/src/qpid/cluster/PollableQueue.h @@ -89,9 +89,13 @@ template <class T> void PollableQueue<T>::dispatch(sys::DispatchHandle& h) { batch.clear(); batch.swap(queue); condition.clear(); - ScopedUnlock u(lock); - callback(batch.begin(), batch.end()); // Process outside the lock to allow concurrent push. - h.rewatch(); + { + // Process outside the lock to allow concurrent push. + ScopedUnlock u(lock); + callback(batch.begin(), batch.end()); + h.rewatch(); + } + batch.clear(); } }} // namespace qpid::cluster diff --git a/qpid/cpp/src/tests/benchmark b/qpid/cpp/src/tests/benchmark index 9e8023043b..5381fd69fa 100755 --- a/qpid/cpp/src/tests/benchmark +++ b/qpid/cpp/src/tests/benchmark @@ -21,33 +21,45 @@ # and latency against a single cluster member while they are replicating. # # Must be run in the qpid src/tests build directory. -# +# +usage() { +cat <<EOF +Usage: $0 [options] -- client hosts --- broker hosts +Read the script for options. +EOF +} # Defaults TESTDIR=${TESTDIR:-$PWD} # Absolute path to test exes on all hosts. -SCRIPTDIR=${SCRIPTDIR:-$PWD/`dirname $0`} # Absolute path to test exes on all hosts. -SAMPLES=10 # Runs of each test. -COUNT=${COUNT:-10000} # Count for pub/sub tests. -ECHO=${ECHO:-1000} # Count for echo test. -BROKER_FLAGS= +SCRIPTDIR=${SCRIPTDIR:-$PWD/`dirname $0`} # Absolute path to test scripts on all hosts. +SAMPLES=10 # Runs of each test. +COUNT=${COUNT:-10000} # Count for pub/sub tests. +SIZE=${SIZE:-600} # Size of messages +ECHO=${ECHO:-1000} # Count for echo test. -while getopts "t:b:p:s:c:n:e" opt ; do - case $opt in - t) TESTDIR=$OPTARG ;; - b) BROKER_FLAGS="$BROKER_FLAGS -b $OPTARG" ;; - p) BROKER_FLAGS="$BROKER_FLAGS -p $OPTARG" ;; - s) SAMPLES=$OPTARG ;; - c) CLIENTS="$CLIENTS $OPTARG" ;; - n) COUNT="$OPTARG" ;; - e) ECHO="$OPTARG" ;; +collect() { eval $COLLECT=\""\$$COLLECT $*"\"; } +COLLECT=ARGS +while test $# -gt 0; do + case $1 in + --testdir) TESTDIR=$2 ; shift 2 ;; + --samples) SAMPLES=$2 ; shift 2 ;; + --count) COUNT=$2 ; shift 2 ;; + --echos) ECHO==$2 ; shift 2 ;; + --size) SIZE==$2 ; shift 2 ;; + --) COLLECT=CLIENTARG; shift ;; + ---) COLLECT=BROKERARG; shift;; + *) collect $1; shift ;; esac done +CLIENTS=${CLIENTARG:-$CLIENTS} +BROKERS=${BROKERARG:-$BROKERS} test -z "$CLIENTS" && { echo "Must specify at least one client host."; exit 1; } -test -z "$BROKER_FLAGS" && { echo "Must specify a broker host."; exit 1; } +test -z "$BROKERS" && { echo "Must specify at least one broker host."; exit 1; } export TESTDIR # For perfdist CLIENTS=($CLIENTS) # Convert to array +BROKERS=($BROKERS) trap "rm -f $FILES" EXIT dosamples() { @@ -63,13 +75,13 @@ dosamples() { } HEADING="pub sub total Mb" -dosamples $SCRIPTDIR/perfdist $BROKER_FLAGS --count $COUNT --nsubs 2 --npubs 2 --qt 2 -s -- ${CLIENTS[*]} +dosamples $SCRIPTDIR/perfdist --count $COUNT --nsubs 2 --npubs 2 --qt 2 -s -- ${CLIENTS[*]} --- ${BROKERS[*]} HEADING="pub" -dosamples ssh -A ${CLIENTS[0]} $TESTDIR/publish --routing-key perftest0 -s $BROKER_FLAGS --count $COUNT +dosamples ssh -A ${CLIENTS[0]} $TESTDIR/publish --routing-key perftest0 --count $COUNT -s -b ${BROKERS[0]} HEADING="sub" -dosamples ssh -A ${CLIENTS[0]} $TESTDIR/consume --queue perftest0 -s $BROKER_FLAGS --count $COUNT +dosamples ssh -A ${CLIENTS[0]} $TESTDIR/consume --queue perftest0 -s --count $COUNT -b ${BROKERS[0]} HEADING="min max avg" -dosamples ssh -A ${CLIENTS[0]} $TESTDIR/echotest --count $ECHO -s +dosamples ssh -A ${CLIENTS[0]} $TESTDIR/echotest --count $ECHO -s -b ${BROKERS[0]} echo echo "Tab separated spreadsheet (also stored in benchmark.tab):" diff --git a/qpid/cpp/src/tests/perfdist b/qpid/cpp/src/tests/perfdist index 59b6396fe0..9ba92310a9 100755 --- a/qpid/cpp/src/tests/perfdist +++ b/qpid/cpp/src/tests/perfdist @@ -7,17 +7,16 @@ set -e usage() { cat <<EOF -usage: $0 <perftest-args> -- <client-hosts ...> +usage: $0 <perftest-args> -- <client-hosts ...> [ --- <broker hosts...> ] +Client & broker hosts can also be set in env vars CLIENTS and BROKERS. -Run perftest with clients running on the listed hosts. Clients are -assigned to hosts publishers first, then subscribers the host list is -used round-robin if there are more clients than hosts. perftest-args should -include a --host <brokerhost> flag (and --port if necessary). +Run perftest with clients running on the clients and brokers running +on the specified hosts. Clients are assigned to client hosts round +robin: publishers first, then subscribers. If there are multiple +brokers (for cluster tests) clients connect to them round robin. -Do not pass preftest action flags: --setup, --control, --publish, --subscribe. -The script will pass them to the appropriate client processes. - -Note all perftest args must come before --. +Broker hosts can be listed with -b in perftest-args or after --- +at the end of the arguments. Error: $* EOF @@ -36,19 +35,28 @@ while test $# -gt 0; do --npubs) collect $1 $2; NPUBS=$2; shift 2 ;; --nsubs) collect $1 $2; NSUBS=$2; shift 2 ;; -s|--summary) collect $1; QUIET=yes; shift 1 ;; - --) COLLECT=HOSTS; shift ;; + -b|--broker) BROKERS="$BROKERS $2"; shift 2;; + --) COLLECT=CLIENTARG; shift ;; + ---) COLLECT=BROKERARG; shift;; *) collect $1; shift ;; esac done -if [ -z "$HOSTS" ]; then usage "No hosts listed after --"; fi +CLIENTS=${CLIENTARG:-$CLIENTS} +if [ -z "$CLIENTS" ]; then usage "No client hosts listed after --"; fi +BROKERS=${BROKERARG:-$BROKERS} +if [ -z "$BROKERS" ]; then usage "No brokers specified"; fi + PERFTEST="$TESTDIR/perftest $ARGS" -HOSTS=($HOSTS) +CLIENTS=($CLIENTS) +BROKERS=($BROKERS) start() { - HOST=${HOSTS[i % ${#HOSTS[*]}]} - test -z "$QUIET" && echo "Client $i on $HOST $*" - ssh -fT $HOST "PATH=$ADDPATH:\$PATH" $PERFTEST "$@" + CLIENT=${CLIENTS[i % ${#CLIENTS[*]}]} + BROKER=${BROKERS[i % ${#BROKERS[*]}]} + ARGS="$* --broker $BROKER" + test -z "$QUIET" && echo "Client $i on $CLIENT: $ARGS" + ssh -fT $CLIENT $PERFTEST "$@" } $PERFTEST --setup diff --git a/qpid/cpp/src/tests/start_cluster_hosts b/qpid/cpp/src/tests/start_cluster_hosts index 37dda882ca..683798453b 100755 --- a/qpid/cpp/src/tests/start_cluster_hosts +++ b/qpid/cpp/src/tests/start_cluster_hosts @@ -16,14 +16,14 @@ QPIDD=${QPIDD:-$PWD/../qpidd} LIBQPIDCLUSTER=${LIBQPIDCLUSTER:-$PWD/../.libs/libqpidcluster.so} -CLUSTER=$USER # User name is default cluster name. +NAME=$USER # User name is default cluster name. RESTART=NO -while getopts "kp:c:q:r" ARG ; do +while getopts "kp:n:q:r" ARG ; do case $ARG in k) KILL=yes ;; p) PORT="$OPTARG" ;; - c) CLUSTER=$OPTARG ;; + n) NAME=$OPTARG ;; q) QPIDD=$OPTARG ;; l) LIBQPIDCLUSTER=$OPTARG ;; r) RESTART=yes ;; @@ -33,17 +33,17 @@ done shift `expr $OPTIND - 1` test -n "$PORT" && PORTOPT="-p $PORT" test "$KILL" = yes && KILL="$QPIDD -q $PORTOPT ;" -test -z "$*" && { echo Must specify at least one host; exit 1; } +CLUSTER=${*:-$CLUSTER} # Use args or env +test -z "$CLUSTER" && { echo Must specify at least one host; exit 1; } -OPTS="-d $PORTOPT --load-module $LIBQPIDCLUSTER --cluster-name=$CLUSTER --no-data-dir --auth=no --log-output=syslog" +OPTS="-d $PORTOPT --load-module $LIBQPIDCLUSTER --cluster-name=$NAME --no-data-dir --auth=no --log-output=syslog --log-enable=info+" num=0 -for h in $*; do +for h in $CLUSTER; do num=`expr $num + 1` # Give a unique log prefix to each node. cmd="$KILL $QPIDD $OPTS --log-prefix $num.$h" - echo == $h - out=`echo "$cmd" | ssh $h newgrp ais` || { echo $out ; exit 1; } + out=`echo "$cmd" | ssh $h newgrp ais` || { echo == $h error: $out ; exit 1; } if [ "$PORT" = 0 ] ; then p=$out; else p=$PORT; fi echo "$h $p" done |