diff options
author | Alan Conway <aconway@apache.org> | 2012-12-19 21:24:38 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2012-12-19 21:24:38 +0000 |
commit | ee9947727aea741b14e28006a2adfa23b6de3a48 (patch) | |
tree | ee1bf22f86481ac3a9e6aa9995173658a1f7dd35 /qpid/cpp/src | |
parent | cae4ddd7bce66f83f5ebc05e47c3163ce3986ee7 (diff) | |
download | qpid-python-ee9947727aea741b14e28006a2adfa23b6de3a48.tar.gz |
QPID-4514: Remove obsolete cluster code: cleanup tests
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1424139 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
26 files changed, 3 insertions, 4520 deletions
diff --git a/qpid/cpp/src/qpid/UrlArray.h b/qpid/cpp/src/qpid/UrlArray.h index ce9e42f248..f0065f0f0c 100644 --- a/qpid/cpp/src/qpid/UrlArray.h +++ b/qpid/cpp/src/qpid/UrlArray.h @@ -1,5 +1,5 @@ -#ifndef QPID_CLUSTER_URLARRAY_H -#define QPID_CLUSTER_URLARRAY_H +#ifndef QPID_URLARRAY_H +#define QPID_URLARRAY_H /* * @@ -33,4 +33,4 @@ QPID_COMMON_EXTERN std::vector<Url> urlArrayToVector(const framing::Array& array QPID_COMMON_EXTERN framing::Array vectorToUrlArray(const std::vector<Url>& urls); } // namespace qpid -#endif /* !QPID_CLUSTER_URLARRAY_H */ +#endif /* !QPID_URLARRAY_H */ diff --git a/qpid/cpp/src/tests/ClusterFailover.cpp b/qpid/cpp/src/tests/ClusterFailover.cpp deleted file mode 100644 index bf5c147f19..0000000000 --- a/qpid/cpp/src/tests/ClusterFailover.cpp +++ /dev/null @@ -1,115 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -/**@file Tests for partial failure in a cluster. - * Partial failure means some nodes experience a failure while others do not. - * In this case the failed nodes must shut down. - */ - -#include "test_tools.h" -#include "unit_test.h" -#include "ClusterFixture.h" -#include "qpid/client/FailoverManager.h" -#include <boost/assign.hpp> -#include <boost/algorithm/string.hpp> -#include <boost/bind.hpp> - -namespace qpid { -namespace tests { - -QPID_AUTO_TEST_SUITE(ClusterFailoverTestSuite) - -using namespace std; -using namespace qpid; -using namespace qpid::cluster; -using namespace qpid::framing; -using namespace qpid::client; -using namespace qpid::client::arg; -using namespace boost::assign; -using broker::Broker; -using boost::shared_ptr; - -// Timeout for tests that wait for messages -const sys::Duration TIMEOUT=sys::TIME_SEC/4; - -ClusterFixture::Args getArgs(bool durable=std::getenv("STORE_LIB")) -{ - ClusterFixture::Args args; - args += "--auth", "no", "--no-module-dir", - "--load-module", getLibPath("CLUSTER_LIB"); - if (durable) - args += "--load-module", getLibPath("STORE_LIB"), "TMP_DATA_DIR"; - else - args += "--no-data-dir"; - return args; -} - -// Test re-connecting with same session name after a failure. -QPID_AUTO_TEST_CASE(testReconnectSameSessionName) { - ClusterFixture cluster(2, getArgs(), -1); - // Specify a timeout to make sure it is ignored, session resume is - // not implemented so sessions belonging to dead brokers should - // not be kept. - Client c0(cluster[0], "foo", 5); - BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c0.connection, 2).size()); // wait for both. - c0.session.queueDeclare("q"); - c0.session.messageTransfer(arg::content=Message("sendme", "q")); - BOOST_CHECK_EQUAL(c0.subs.get("q").getData(), "sendme"); - cluster.killWithSilencer(0, c0.connection, 9); - Client c1(cluster[1], "foo", 5); - c1.session.queueQuery(); // Try to use the session. -} - -QPID_AUTO_TEST_CASE(testReconnectExclusiveQueue) { - // Regresion test. Session timeouts should be ignored - // by the broker as session resume is not implemented. - ClusterFixture cluster(2, getArgs(), -1); - Client c0(cluster[0], "foo", 5); - c0.session.queueDeclare("exq", arg::exclusive=true); - SubscriptionSettings settings; - settings.exclusive = true; - settings.autoAck = 0; - Subscription s0 = c0.subs.subscribe(c0.lq, "exq", settings, "exsub"); - c0.session.messageTransfer(arg::content=Message("sendme", "exq")); - BOOST_CHECK_EQUAL(c0.lq.get().getData(), "sendme"); - - // Regression: core dump on exit if unacked messages were left in - // a session with a timeout. - cluster.killWithSilencer(0, c0.connection); - - // Regression: session timeouts prevented re-connecting to - // exclusive queue. - Client c1(cluster[1]); - c1.session.queueDeclare("exq", arg::exclusive=true); - Subscription s1 = c1.subs.subscribe(c1.lq, "exq", settings, "exsub"); - s1.cancel(); - - // Regression: session timeouts prevented new member joining - // cluster with exclusive queues. - cluster.add(); - Client c2(cluster[2]); - c2.session.queueQuery(); -} - - -QPID_AUTO_TEST_SUITE_END() - -}} // namespace qpid::tests diff --git a/qpid/cpp/src/tests/ClusterFixture.cpp b/qpid/cpp/src/tests/ClusterFixture.cpp deleted file mode 100644 index 6b62cb6fc7..0000000000 --- a/qpid/cpp/src/tests/ClusterFixture.cpp +++ /dev/null @@ -1,160 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "test_tools.h" -#include "unit_test.h" -#include "ForkedBroker.h" -#include "BrokerFixture.h" - -#include "qpid/client/Connection.h" -#include "qpid/client/ConnectionAccess.h" -#include "qpid/client/Session.h" -#include "qpid/client/FailoverListener.h" -#include "qpid/cluster/Cluster.h" -#include "qpid/cluster/Cpg.h" -#include "qpid/cluster/UpdateClient.h" -#include "qpid/framing/AMQBody.h" -#include "qpid/framing/Uuid.h" -#include "qpid/framing/reply_exceptions.h" -#include "qpid/framing/enum.h" -#include "qpid/log/Logger.h" - -#include <boost/bind.hpp> -#include <boost/shared_ptr.hpp> -#include <boost/assign.hpp> - -#include <string> -#include <iostream> -#include <iterator> -#include <vector> -#include <set> -#include <algorithm> -#include <iterator> - - -using namespace std; -using namespace qpid; -using namespace qpid::cluster; -using namespace qpid::framing; -using namespace qpid::client; -using qpid::sys::TIME_SEC; -using qpid::broker::Broker; -using boost::shared_ptr; -using qpid::cluster::Cluster; -using boost::assign::list_of; - - -#include "ClusterFixture.h" - -namespace qpid { -namespace tests { - -ClusterFixture::ClusterFixture(size_t n, const Args& args_, int localIndex_) - : name(Uuid(true).str()), localIndex(localIndex_), userArgs(args_) -{ - add(n); -} - -ClusterFixture::ClusterFixture(size_t n, boost::function<void (Args&, size_t)> updateArgs_, int localIndex_) - : name(Uuid(true).str()), localIndex(localIndex_), updateArgs(updateArgs_) -{ - add(n); -} - -ClusterFixture::Args ClusterFixture::makeArgs(const std::string& prefix, size_t index) { - Args args = list_of<string>("qpidd ") - ("--cluster-name")(name) - ("--log-prefix")(prefix); - args.insert(args.end(), userArgs.begin(), userArgs.end()); - if (updateArgs) updateArgs(args, index); - return args; -} - -void ClusterFixture::add() { - if (size() != size_t(localIndex)) { // fork a broker process. - std::ostringstream os; os << "fork" << size(); - std::string prefix = os.str(); - forkedBrokers.push_back(shared_ptr<ForkedBroker>(new ForkedBroker(makeArgs(prefix, size())))); - push_back(forkedBrokers.back()->getPort()); - } - else { // Run in this process - addLocal(); - } -} - -namespace { -/** Parse broker & cluster options */ -Broker::Options parseOpts(size_t argc, const char* argv[]) { - Broker::Options opts; - Plugin::addOptions(opts); // Pick up cluster options. - opts.parse(argc, argv, "", true); // Allow-unknown for --load-module - return opts; -} -} - -void ClusterFixture::addLocal() { - assert(int(size()) == localIndex); - ostringstream os; os << "local" << localIndex; - string prefix = os.str(); - Args args(makeArgs(prefix, localIndex)); - vector<const char*> argv(args.size()); - transform(args.begin(), args.end(), argv.begin(), boost::bind(&string::c_str, _1)); - qpid::log::Logger::instance().setPrefix(prefix); - localBroker.reset(new BrokerFixture(parseOpts(argv.size(), &argv[0]))); - push_back(localBroker->getPort()); - forkedBrokers.push_back(shared_ptr<ForkedBroker>()); -} - -bool ClusterFixture::hasLocal() const { return localIndex >= 0 && size_t(localIndex) < size(); } - -/** Kill a forked broker with sig, or shutdown localBroker if n==0. */ -void ClusterFixture::kill(size_t n, int sig) { - if (n == size_t(localIndex)) - localBroker->broker->shutdown(); - else - forkedBrokers[n]->kill(sig); -} - -/** Kill a broker and suppress errors from closing connection c. */ -void ClusterFixture::killWithSilencer(size_t n, client::Connection& c, int sig) { - ScopedSuppressLogging sl; - try { c.close(); } catch(...) {} - kill(n,sig); -} - -/** - * Get the known broker ports from a Connection. - *@param n if specified wait for the cluster size to be n, up to a timeout. - */ -std::set<int> knownBrokerPorts(qpid::client::Connection& c, int n) { - FailoverListener fl(c, false); - std::vector<qpid::Url> urls = fl.getKnownBrokers(); - if (n >= 0 && unsigned(n) != urls.size()) { - // Retry up to 10 secs in .1 second intervals. - for (size_t retry=100; urls.size() != unsigned(n) && retry != 0; --retry) { - qpid::sys::usleep(1000*100); // 0.1 secs - urls = fl.getKnownBrokers(); - } - } - std::set<int> s; - for (std::vector<qpid::Url>::const_iterator i = urls.begin(); i != urls.end(); ++i) - s.insert((*i)[0].port); - return s; -} - -}} // namespace qpid::tests diff --git a/qpid/cpp/src/tests/ClusterFixture.h b/qpid/cpp/src/tests/ClusterFixture.h deleted file mode 100644 index f548ff9376..0000000000 --- a/qpid/cpp/src/tests/ClusterFixture.h +++ /dev/null @@ -1,115 +0,0 @@ -#ifndef CLUSTER_FIXTURE_H -#define CLUSTER_FIXTURE_H - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "test_tools.h" -#include "unit_test.h" -#include "ForkedBroker.h" -#include "BrokerFixture.h" - -#include "qpid/client/Connection.h" -#include "qpid/client/ConnectionAccess.h" -#include "qpid/client/Session.h" -#include "qpid/client/FailoverListener.h" -#include "qpid/cluster/Cluster.h" -#include "qpid/cluster/Cpg.h" -#include "qpid/cluster/UpdateClient.h" -#include "qpid/framing/AMQBody.h" -#include "qpid/framing/Uuid.h" -#include "qpid/framing/reply_exceptions.h" -#include "qpid/framing/enum.h" -#include "qpid/log/Logger.h" - -#include <boost/bind.hpp> -#include <boost/function.hpp> -#include <boost/shared_ptr.hpp> - -#include <string> -#include <iostream> -#include <iterator> -#include <vector> -#include <set> -#include <algorithm> -#include <iterator> - - -using namespace std; -using namespace qpid; -using namespace qpid::cluster; -using namespace qpid::framing; -using namespace qpid::client; -using qpid::sys::TIME_SEC; -using qpid::broker::Broker; -using boost::shared_ptr; -using qpid::cluster::Cluster; - -namespace qpid { -namespace tests { - -/** Cluster fixture is a vector of ports for the replicas. - * - * At most one replica (by default replica 0) is in the current - * process, all others are forked as children. - */ -class ClusterFixture : public vector<uint16_t> { - public: - typedef std::vector<std::string> Args; - - /** @param localIndex can be -1 meaning don't automatically start a local broker. - * A local broker can be started with addLocal(). - */ - ClusterFixture(size_t n, const Args& args, int localIndex=-1); - - /**@param updateArgs function is passed the index of the cluster member and can update the arguments. */ - ClusterFixture(size_t n, boost::function<void (Args&, size_t)> updateArgs, int localIndex=-1); - - void add(size_t n) { for (size_t i=0; i < n; ++i) add(); } - void add(); // Add a broker. - void setup(); - - bool hasLocal() const; - - /** Kill a forked broker with sig, or shutdown localBroker. */ - void kill(size_t n, int sig=SIGINT); - - /** Kill a broker and suppress errors from closing connection c. */ - void killWithSilencer(size_t n, client::Connection& c, int sig=SIGINT); - - private: - - void addLocal(); // Add a local broker. - Args makeArgs(const std::string& prefix, size_t index); - string name; - std::auto_ptr<BrokerFixture> localBroker; - int localIndex; - std::vector<shared_ptr<ForkedBroker> > forkedBrokers; - Args userArgs; - boost::function<void (Args&, size_t)> updateArgs; -}; - -/** - * Get the known broker ports from a Connection. - *@param n if specified wait for the cluster size to be n, up to a timeout. - */ -std::set<int> knownBrokerPorts(qpid::client::Connection& source, int n=-1); - -}} // namespace qpid::tests - -#endif /*!CLUSTER_FIXTURE_H*/ diff --git a/qpid/cpp/src/tests/Makefile.am b/qpid/cpp/src/tests/Makefile.am index 4184b5f38a..113005d2c2 100644 --- a/qpid/cpp/src/tests/Makefile.am +++ b/qpid/cpp/src/tests/Makefile.am @@ -376,7 +376,6 @@ EXTRA_DIST+= \ multiq_perftest \ topic_perftest \ run_failover_soak \ - federated_cluster_test_with_node_failure \ sasl_test_setup.sh \ run_msg_group_tests_soak \ qpidd-empty.conf diff --git a/qpid/cpp/src/tests/cluster_authentication_soak.cpp b/qpid/cpp/src/tests/cluster_authentication_soak.cpp deleted file mode 100644 index a3271701c3..0000000000 --- a/qpid/cpp/src/tests/cluster_authentication_soak.cpp +++ /dev/null @@ -1,310 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - - -#include <stdio.h> -#include <stdlib.h> -#include <unistd.h> -#include <signal.h> -#include <fcntl.h> - -#include <sys/wait.h> -#include <sys/types.h> -#include <sys/stat.h> -#include <sys/time.h> - -#include <string> -#include <iostream> -#include <sstream> -#include <vector> - -#include <boost/assign.hpp> - -#include "qpid/framing/Uuid.h" - -#include <ForkedBroker.h> -#include <qpid/client/Connection.h> - -#include <sasl/sasl.h> - -#ifdef HAVE_CONFIG_H -#include "config.h" -#endif - - - - -using namespace std; -using boost::assign::list_of; -using namespace qpid::framing; -using namespace qpid::client; - - -namespace qpid { -namespace tests { - -vector<pid_t> brokerPids; - -typedef vector<ForkedBroker *> brokerVector; - - - - - -int runSilent = 1; -int newbiePort = 0; - - -void -makeClusterName ( string & s ) { - stringstream ss; - ss << "authenticationSoakCluster_" << Uuid(true).str(); - s = ss.str(); -} - - - -void -startBroker ( brokerVector & brokers , int brokerNumber, string const & clusterName ) { - stringstream prefix, clusterArg; - prefix << "soak-" << brokerNumber; - clusterArg << "--cluster-name=" << clusterName; - - std::vector<std::string> argv; - - argv.push_back ("../qpidd"); - argv.push_back ("--no-module-dir"); - argv.push_back ("--load-module=../.libs/cluster.so"); - argv.push_back (clusterArg.str()); - argv.push_back ("--cluster-username=zig"); - argv.push_back ("--cluster-password=zig"); - argv.push_back ("--cluster-mechanism=PLAIN"); - argv.push_back ("--sasl-config=./sasl_config"); - argv.push_back ("--auth=yes"); - argv.push_back ("--mgmt-enable=yes"); - argv.push_back ("--log-prefix"); - argv.push_back (prefix.str()); - argv.push_back ("--log-to-file"); - argv.push_back (prefix.str()+".log"); - argv.push_back ("TMP_DATA_DIR"); - - ForkedBroker * newbie = new ForkedBroker (argv); - newbiePort = newbie->getPort(); - brokers.push_back ( newbie ); -} - - - - -bool -runPerftest ( bool hangTest ) { - stringstream portSs; - portSs << newbiePort; - string portStr = portSs.str(); - char const * path = "./qpid-perftest"; - - vector<char const *> argv; - argv.push_back ( "./qpid-perftest" ); - argv.push_back ( "-p" ); - argv.push_back ( portStr.c_str() ); - argv.push_back ( "--username" ); - argv.push_back ( "zig" ); - argv.push_back ( "--password" ); - argv.push_back ( "zig" ); - argv.push_back ( "--mechanism" ); - argv.push_back ( "DIGEST-MD5" ); - argv.push_back ( "--count" ); - argv.push_back ( "20000" ); - argv.push_back ( 0 ); - - pid_t pid = fork(); - - if ( ! pid ) { - int i=open("/dev/null",O_RDWR); - dup2 ( i, fileno(stdout) ); - dup2 ( i, fileno(stderr) ); - - execv ( path, const_cast<char * const *>(&argv[0]) ); - // The exec failed: we are still in parent process. - perror ( "error running qpid-perftest: " ); - return false; - } - else { - if ( hangTest ) { - if ( ! runSilent ) - cerr << "Pausing perftest " << pid << endl; - kill ( pid, 19 ); - } - - struct timeval startTime, - currentTime, - duration; - - gettimeofday ( & startTime, 0 ); - - while ( 1 ) { - sleep ( 2 ); - int status; - int returned_pid = waitpid ( pid, &status, WNOHANG ); - if ( returned_pid == pid ) { - int exit_status = WEXITSTATUS(status); - if ( exit_status ) { - cerr << "qpid-perftest failed. exit_status was: " << exit_status << endl; - return false; - } - else { - return true; // qpid-perftest succeeded. - } - } - else { // qpid-perftest has not yet completed. - gettimeofday ( & currentTime, 0 ); - timersub ( & currentTime, & startTime, & duration ); - if ( duration.tv_sec > 60 ) { - kill ( pid, 9 ); - cerr << "qpid-perftest pid " << pid << " hanging: killed.\n"; - return false; - } - } - } - - } -} - - - -bool -allBrokersAreAlive ( brokerVector & brokers ) { - for ( unsigned int i = 0; i < brokers.size(); ++ i ) - if ( ! brokers[i]->isRunning() ) - return false; - - return true; -} - - - - - -void -killAllBrokers ( brokerVector & brokers ) { - for ( unsigned int i = 0; i < brokers.size(); ++ i ) { - brokers[i]->kill ( 9 ); - } -} - - - - -void -killOneBroker ( brokerVector & brokers ) { - int doomedBroker = getpid() % brokers.size(); - cout << "Killing broker " << brokers[doomedBroker]->getPID() << endl; - brokers[doomedBroker]->kill ( 9 ); - sleep ( 2 ); -} - - - - -}} // namespace qpid::tests - -using namespace qpid::tests; - - - -/* - * Please note that this test has self-test capability. - * It is intended to detect - * 1. perftest hangs. - * 2. broker deaths - * Both of these condtions can be forced when running manually - * to ensure that the test really does detect them. - * See command-line arguments 3 and 4. - */ -int -main ( int argc, char ** argv ) -{ - // I need the SASL_PATH_TYPE_CONFIG feature, which did not appear until SASL 2.1.22 -#if (SASL_VERSION_FULL < ((2<<16)|(1<<8)|22)) - cout << "Skipping SASL test, SASL version too low." << endl; - return 0; -#endif - - int n_iterations = argc > 1 ? atoi(argv[1]) : 1; - runSilent = argc > 2 ? atoi(argv[2]) : 1; // default to silent - int killBroker = argc > 3 ? atoi(argv[3]) : 0; // Force the kill of one broker. - int hangTest = argc > 4 ? atoi(argv[4]) : 0; // Force the first perftest to hang. - int n_brokers = 3; - brokerVector brokers; - - srand ( getpid() ); - string clusterName; - makeClusterName ( clusterName ); - for ( int i = 0; i < n_brokers; ++ i ) { - startBroker ( brokers, i, clusterName ); - } - - sleep ( 3 ); - - /* Run all qpid-perftest iterations, and only then check for brokers - * still being up. If you just want a quick check for the failure - * mode in which a single iteration would kill all brokers except - * the client-connected one, just run it with the iterations arg - * set to 1. - */ - for ( int iteration = 0; iteration < n_iterations; ++ iteration ) { - if ( ! runPerftest ( hangTest ) ) { - if ( ! runSilent ) - cerr << "qpid-perftest " << iteration << " failed.\n"; - return 1; - } - if ( ! ( iteration % 10 ) ) { - if ( ! runSilent ) - cerr << "qpid-perftest " << iteration << " complete. -------------- \n"; - } - } - if ( ! runSilent ) - cerr << "\nqpid-perftest " << n_iterations << " iterations complete. -------------- \n\n"; - - /* If the command-line tells us to kill a broker, do - * it now. Use this option to prove that this test - * really can detect broker-deaths. - */ - if ( killBroker ) { - killOneBroker ( brokers ); - } - - if ( ! allBrokersAreAlive ( brokers ) ) { - if ( ! runSilent ) - cerr << "not all brokers are alive.\n"; - killAllBrokers ( brokers ); - return 2; - } - - killAllBrokers ( brokers ); - if ( ! runSilent ) - cout << "success.\n"; - - return 0; -} - - - diff --git a/qpid/cpp/src/tests/cluster_failover b/qpid/cpp/src/tests/cluster_failover deleted file mode 100755 index 43170c731a..0000000000 --- a/qpid/cpp/src/tests/cluster_failover +++ /dev/null @@ -1,19 +0,0 @@ -#!/bin/sh -# A simple manual failover test, sends a stream of numbered messages. -# You can kill the connected broker and verify that the clients reconnect -# and no messages are lost. - -URL=$1 -test -n "$URL" || { echo Usage: $0 URL ; exit 1; } -SEND=$(mktemp /tmp/send.XXXXXXXXXX) -RECV=$(mktemp /tmp/recv.XXXXXXXXXX) -echo $SEND $RECV - -seq 1000000 > $SEND - -qpid-send -a 'cluster_failover;{create:always}' -b $URL --connection-options "{reconnect:true}" --send-rate 10 --content-stdin < $SEND & - -while msg=$(qpid-receive -m1 -f -a 'cluster_failover;{create:always}' -b $URL --connection-options "{reconnect:true,heartbeat:1}"); do - echo -n $msg; date -done -wait diff --git a/qpid/cpp/src/tests/cluster_python_tests b/qpid/cpp/src/tests/cluster_python_tests deleted file mode 100755 index 25c7889246..0000000000 --- a/qpid/cpp/src/tests/cluster_python_tests +++ /dev/null @@ -1,28 +0,0 @@ -#!/bin/bash - -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# Skip if cluster services not running. -. cpg_check.sh -cpg_enabled || exit 0 - -FAILING=`dirname $0`/cluster_python_tests_failing.txt -source `dirname $0`/python_tests - diff --git a/qpid/cpp/src/tests/cluster_python_tests_failing.txt b/qpid/cpp/src/tests/cluster_python_tests_failing.txt deleted file mode 100644 index f8639d7b59..0000000000 --- a/qpid/cpp/src/tests/cluster_python_tests_failing.txt +++ /dev/null @@ -1,4 +0,0 @@ -qpid_tests.broker_0_10.management.ManagementTest.test_purge_queue -qpid_tests.broker_0_10.management.ManagementTest.test_connection_close -qpid_tests.broker_0_10.message.MessageTests.test_ttl -qpid_tests.broker_0_10.management.ManagementTest.test_broker_connectivity_oldAPI diff --git a/qpid/cpp/src/tests/cluster_read_credit b/qpid/cpp/src/tests/cluster_read_credit deleted file mode 100755 index 552ffee53b..0000000000 --- a/qpid/cpp/src/tests/cluster_read_credit +++ /dev/null @@ -1,29 +0,0 @@ -#!/bin/bash -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# Regression test for http://issues.apache.org/jira/browse/QPID-2086 - -srcdir=`dirname $0` -source cpg_check.sh -cpg_enabled || exit 0 - -$srcdir/start_cluster 1 --cluster-read-max=2 || exit 1 -trap $srcdir/stop_cluster EXIT -seq 1 10000 | ./sender --port `cat cluster.ports` --routing-key no-such-queue diff --git a/qpid/cpp/src/tests/cluster_test.cpp b/qpid/cpp/src/tests/cluster_test.cpp deleted file mode 100644 index f2ccd0ba84..0000000000 --- a/qpid/cpp/src/tests/cluster_test.cpp +++ /dev/null @@ -1,1231 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "test_tools.h" -#include "unit_test.h" -#include "ForkedBroker.h" -#include "BrokerFixture.h" -#include "ClusterFixture.h" - -#include "qpid/client/Connection.h" -#include "qpid/client/ConnectionSettings.h" -#include "qpid/client/ConnectionAccess.h" -#include "qpid/client/Session.h" -#include "qpid/client/FailoverListener.h" -#include "qpid/client/FailoverManager.h" -#include "qpid/client/QueueOptions.h" -#include "qpid/cluster/Cluster.h" -#include "qpid/cluster/Cpg.h" -#include "qpid/cluster/UpdateClient.h" -#include "qpid/framing/AMQBody.h" -#include "qpid/framing/Uuid.h" -#include "qpid/framing/reply_exceptions.h" -#include "qpid/framing/enum.h" -#include "qpid/framing/MessageTransferBody.h" -#include "qpid/log/Logger.h" -#include "qpid/sys/Monitor.h" -#include "qpid/sys/Thread.h" - -#include <boost/bind.hpp> -#include <boost/shared_ptr.hpp> -#include <boost/assign.hpp> - -#include <string> -#include <iostream> -#include <fstream> -#include <iterator> -#include <vector> -#include <set> -#include <algorithm> -#include <iterator> - - -using namespace std; -using namespace qpid; -using namespace qpid::cluster; -using namespace qpid::framing; -using namespace qpid::client; -using namespace boost::assign; -using broker::Broker; -using boost::shared_ptr; - -namespace qpid { -namespace tests { - -QPID_AUTO_TEST_SUITE(cluster_test) - -bool durableFlag = std::getenv("STORE_LIB") != 0; - -void prepareArgs(ClusterFixture::Args& args, const bool durableFlag = false) { - ostringstream clusterLib; - clusterLib << getLibPath("CLUSTER_LIB"); - args += "--auth", "no", "--no-module-dir", "--load-module", clusterLib.str(); - if (durableFlag) - args += "--load-module", getLibPath("STORE_LIB"), "TMP_DATA_DIR"; - else - args += "--no-data-dir"; -} - -ClusterFixture::Args prepareArgs(const bool durableFlag = false) { - ClusterFixture::Args args; - prepareArgs(args, durableFlag); - return args; -} - -// Timeout for tests that wait for messages -const sys::Duration TIMEOUT=2*sys::TIME_SEC; - - -ostream& operator<<(ostream& o, const cpg_name* n) { - return o << Cpg::str(*n); -} - -ostream& operator<<(ostream& o, const cpg_address& a) { - return o << "(" << a.nodeid <<","<<a.pid<<","<<a.reason<<")"; -} - -template <class T> -ostream& operator<<(ostream& o, const pair<T*, int>& array) { - o << "{ "; - ostream_iterator<cpg_address> i(o, " "); - copy(array.first, array.first+array.second, i); - o << "}"; - return o; -} - -template <class C> set<int> makeSet(const C& c) { - set<int> s; - copy(c.begin(), c.end(), inserter(s, s.begin())); - return s; -} - -class Sender { - public: - Sender(boost::shared_ptr<ConnectionImpl> ci, uint16_t ch) : connection(ci), channel(ch) {} - void send(const AMQBody& body, bool firstSeg, bool lastSeg, bool firstFrame, bool lastFrame) { - AMQFrame f(body); - f.setChannel(channel); - f.setFirstSegment(firstSeg); - f.setLastSegment(lastSeg); - f.setFirstFrame(firstFrame); - f.setLastFrame(lastFrame); - connection->expand(f.encodedSize(), false); - connection->handle(f); - } - - private: - boost::shared_ptr<ConnectionImpl> connection; - uint16_t channel; -}; - -int64_t getMsgSequence(const Message& m) { - return m.getMessageProperties().getApplicationHeaders().getAsInt64("qpid.msg_sequence"); -} - -Message ttlMessage(const string& data, const string& key, uint64_t ttl, bool durable = false) { - Message m(data, key); - m.getDeliveryProperties().setTtl(ttl); - if (durable) m.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT); - return m; -} - -Message makeMessage(const string& data, const string& key, bool durable = false) { - Message m(data, key); - if (durable) m.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT); - return m; -} - -vector<string> browse(Client& c, const string& q, int n) { - SubscriptionSettings browseSettings( - FlowControl::messageCredit(n), - ACCEPT_MODE_NONE, - ACQUIRE_MODE_NOT_ACQUIRED, - 0 // No auto-ack. - ); - LocalQueue lq; - c.subs.subscribe(lq, q, browseSettings); - c.session.messageFlush(q); - vector<string> result; - for (int i = 0; i < n; ++i) { - Message m; - if (!lq.get(m, TIMEOUT)) - break; - result.push_back(m.getData()); - } - c.subs.getSubscription(q).cancel(); - return result; -} - -ConnectionSettings aclSettings(int port, const std::string& id) { - ConnectionSettings settings; - settings.port = port; - settings.mechanism = "PLAIN"; - settings.username = id; - settings.password = id; - return settings; -} - -// An illegal frame body -struct PoisonPill : public AMQBody { - virtual uint8_t type() const { return 0xFF; } - virtual void encode(Buffer& ) const {} - virtual void decode(Buffer& , uint32_t=0) {} - virtual uint32_t encodedSize() const { return 0; } - - virtual void print(std::ostream&) const {}; - virtual void accept(AMQBodyConstVisitor&) const {}; - - virtual AMQMethodBody* getMethod() { return 0; } - virtual const AMQMethodBody* getMethod() const { return 0; } - - /** Match if same type and same class/method ID for methods */ - static bool match(const AMQBody& , const AMQBody& ) { return false; } - virtual boost::intrusive_ptr<AMQBody> clone() const { return new PoisonPill; } -}; - -QPID_AUTO_TEST_CASE(testBadClientData) { - // Ensure that bad data on a client connection closes the - // connection but does not stop the broker. - ClusterFixture::Args args; - prepareArgs(args, false); - args += "--log-enable=critical"; // Supress expected errors - ClusterFixture cluster(2, args, -1); - Client c0(cluster[0]); - Client c1(cluster[1]); - boost::shared_ptr<client::ConnectionImpl> ci = - client::ConnectionAccess::getImpl(c0.connection); - AMQFrame poison(boost::intrusive_ptr<AMQBody>(new PoisonPill)); - ci->expand(poison.encodedSize(), false); - ci->handle(poison); - { - ScopedSuppressLogging sl; - BOOST_CHECK_THROW(c0.session.queueQuery("q0"), Exception); - } - Client c00(cluster[0]); - BOOST_CHECK_EQUAL(c00.session.queueQuery("q00").getQueue(), ""); - BOOST_CHECK_EQUAL(c1.session.queueQuery("q1").getQueue(), ""); -} - -QPID_AUTO_TEST_CASE(testAcl) { - ofstream policyFile("cluster_test.acl"); - policyFile << "acl allow foo@QPID create queue name=foo" << endl - << "acl allow foo@QPID create queue name=foo2" << endl - << "acl deny foo@QPID create queue name=bar" << endl - << "acl allow all all" << endl; - policyFile.close(); - char cwd[1024]; - BOOST_CHECK(::getcwd(cwd, sizeof(cwd))); - ostringstream aclLib; - aclLib << getLibPath("ACL_LIB"); - ClusterFixture::Args args; - prepareArgs(args, durableFlag); - args += "--log-enable=critical"; // Supress expected errors - args += "--acl-file", string(cwd) + "/cluster_test.acl", - "--cluster-mechanism", "PLAIN", - "--cluster-username", "cluster", - "--cluster-password", "cluster", - "--load-module", aclLib.str(); - ClusterFixture cluster(2, args, -1); - - Client c0(aclSettings(cluster[0], "c0"), "c0"); - Client c1(aclSettings(cluster[1], "c1"), "c1"); - Client foo(aclSettings(cluster[1], "foo"), "foo"); - - foo.session.queueDeclare("foo", arg::durable=durableFlag); - BOOST_CHECK_EQUAL(c0.session.queueQuery("foo").getQueue(), "foo"); - - { - ScopedSuppressLogging sl; - BOOST_CHECK_THROW(foo.session.queueDeclare("bar", arg::durable=durableFlag), framing::UnauthorizedAccessException); - } - BOOST_CHECK(c0.session.queueQuery("bar").getQueue().empty()); - BOOST_CHECK(c1.session.queueQuery("bar").getQueue().empty()); - - cluster.add(); - Client c2(aclSettings(cluster[2], "c2"), "c2"); - { - ScopedSuppressLogging sl; - BOOST_CHECK_THROW(foo.session.queueDeclare("bar", arg::durable=durableFlag), framing::UnauthorizedAccessException); - } - BOOST_CHECK(c2.session.queueQuery("bar").getQueue().empty()); -} - -QPID_AUTO_TEST_CASE(testMessageTimeToLive) { - ClusterFixture::Args args; - prepareArgs(args, durableFlag); - ClusterFixture cluster(2, args, -1); - Client c0(cluster[0], "c0"); - Client c1(cluster[1], "c1"); - c0.session.queueDeclare("p", arg::durable=durableFlag); - c0.session.queueDeclare("q", arg::durable=durableFlag); - c0.session.messageTransfer(arg::content=ttlMessage("a", "q", 200, durableFlag)); - c0.session.messageTransfer(arg::content=makeMessage("b", "q", durableFlag)); - c0.session.messageTransfer(arg::content=ttlMessage("x", "p", 100000, durableFlag)); - c0.session.messageTransfer(arg::content=makeMessage("y", "p", durableFlag)); - cluster.add(); - Client c2(cluster[1], "c2"); - - BOOST_CHECK_EQUAL(browse(c0, "p", 1), list_of<string>("x")); - BOOST_CHECK_EQUAL(browse(c1, "p", 1), list_of<string>("x")); - BOOST_CHECK_EQUAL(browse(c2, "p", 1), list_of<string>("x")); - - sys::usleep(200*1000); - BOOST_CHECK_EQUAL(browse(c0, "q", 1), list_of<string>("b")); - BOOST_CHECK_EQUAL(browse(c1, "q", 1), list_of<string>("b")); - BOOST_CHECK_EQUAL(browse(c2, "q", 1), list_of<string>("b")); -} - -QPID_AUTO_TEST_CASE(testSequenceOptions) { - // Make sure the exchange qpid.msg_sequence property is properly replicated. - ClusterFixture::Args args; - prepareArgs(args, durableFlag); - ClusterFixture cluster(1, args, -1); - Client c0(cluster[0], "c0"); - FieldTable ftargs; - ftargs.setInt("qpid.msg_sequence", 1); - c0.session.queueDeclare(arg::queue="q", arg::durable=durableFlag); - c0.session.exchangeDeclare(arg::exchange="ex", arg::type="direct", arg::arguments=ftargs); - c0.session.exchangeBind(arg::exchange="ex", arg::queue="q", arg::bindingKey="k"); - c0.session.messageTransfer(arg::content=makeMessage("1", "k", durableFlag), arg::destination="ex"); - c0.session.messageTransfer(arg::content=makeMessage("2", "k", durableFlag), arg::destination="ex"); - BOOST_CHECK_EQUAL(1, getMsgSequence(c0.subs.get("q", TIMEOUT))); - BOOST_CHECK_EQUAL(2, getMsgSequence(c0.subs.get("q", TIMEOUT))); - - cluster.add(); - Client c1(cluster[1]); - c1.session.messageTransfer(arg::content=makeMessage("3", "k", durableFlag), arg::destination="ex"); - BOOST_CHECK_EQUAL(3, getMsgSequence(c1.subs.get("q", TIMEOUT))); -} - -QPID_AUTO_TEST_CASE(testTxTransaction) { - ClusterFixture::Args args; - prepareArgs(args, durableFlag); - ClusterFixture cluster(1, args, -1); - Client c0(cluster[0], "c0"); - c0.session.queueDeclare(arg::queue="q", arg::durable=durableFlag); - c0.session.messageTransfer(arg::content=makeMessage("A", "q", durableFlag)); - c0.session.messageTransfer(arg::content=makeMessage("B", "q", durableFlag)); - - // Start a transaction that will commit. - Session commitSession = c0.connection.newSession("commit"); - SubscriptionManager commitSubs(commitSession); - commitSession.txSelect(); - commitSession.messageTransfer(arg::content=makeMessage("a", "q", durableFlag)); - commitSession.messageTransfer(arg::content=makeMessage("b", "q", durableFlag)); - BOOST_CHECK_EQUAL(commitSubs.get("q", TIMEOUT).getData(), "A"); - - // Start a transaction that will roll back. - Session rollbackSession = c0.connection.newSession("rollback"); - SubscriptionManager rollbackSubs(rollbackSession); - rollbackSession.txSelect(); - rollbackSession.messageTransfer(arg::content=makeMessage("1", "q", durableFlag)); - Message rollbackMessage = rollbackSubs.get("q", TIMEOUT); - BOOST_CHECK_EQUAL(rollbackMessage.getData(), "B"); - - BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 0u); - // Add new member mid transaction. - cluster.add(); - Client c1(cluster[1], "c1"); - - // More transactional work - BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u); - rollbackSession.messageTransfer(arg::content=makeMessage("2", "q", durableFlag)); - commitSession.messageTransfer(arg::content=makeMessage("c", "q", durableFlag)); - rollbackSession.messageTransfer(arg::content=makeMessage("3", "q", durableFlag)); - - BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u); - - // Commit/roll back. - commitSession.txCommit(); - rollbackSession.txRollback(); - rollbackSession.messageRelease(rollbackMessage.getId()); - - // Verify queue status: just the comitted messages and dequeues should remain. - BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 4u); - BOOST_CHECK_EQUAL(c1.subs.get("q", TIMEOUT).getData(), "B"); - BOOST_CHECK_EQUAL(c1.subs.get("q", TIMEOUT).getData(), "a"); - BOOST_CHECK_EQUAL(c1.subs.get("q", TIMEOUT).getData(), "b"); - BOOST_CHECK_EQUAL(c1.subs.get("q", TIMEOUT).getData(), "c"); - - commitSession.close(); - rollbackSession.close(); -} - -QPID_AUTO_TEST_CASE(testUnacked) { - // Verify replication of unacknowledged messages. - ClusterFixture::Args args; - prepareArgs(args, durableFlag); - ClusterFixture cluster(1, args, -1); - Client c0(cluster[0], "c0"); - - Message m; - - // Create unacked message: acquired but not accepted. - SubscriptionSettings manualAccept(FlowControl::unlimited(), ACCEPT_MODE_EXPLICIT, ACQUIRE_MODE_PRE_ACQUIRED, 0); - c0.session.queueDeclare("q1", arg::durable=durableFlag); - c0.session.messageTransfer(arg::content=makeMessage("11","q1", durableFlag)); - LocalQueue q1; - c0.subs.subscribe(q1, "q1", manualAccept); - BOOST_CHECK_EQUAL(q1.get(TIMEOUT).getData(), "11"); // Acquired but not accepted - BOOST_CHECK_EQUAL(c0.session.queueQuery("q1").getMessageCount(), 0u); // Gone from queue - - // Create unacked message: not acquired, accepted or completeed. - SubscriptionSettings manualAcquire(FlowControl::unlimited(), ACCEPT_MODE_EXPLICIT, ACQUIRE_MODE_NOT_ACQUIRED, 0); - c0.session.queueDeclare("q2", arg::durable=durableFlag); - c0.session.messageTransfer(arg::content=makeMessage("21","q2", durableFlag)); - c0.session.messageTransfer(arg::content=makeMessage("22","q2", durableFlag)); - LocalQueue q2; - c0.subs.subscribe(q2, "q2", manualAcquire); - m = q2.get(TIMEOUT); // Not acquired or accepted, still on queue - BOOST_CHECK_EQUAL(m.getData(), "21"); - BOOST_CHECK_EQUAL(c0.session.queueQuery("q2").getMessageCount(), 2u); // Not removed - c0.subs.getSubscription("q2").acquire(m); // Acquire manually - BOOST_CHECK_EQUAL(c0.session.queueQuery("q2").getMessageCount(), 1u); // Removed - BOOST_CHECK_EQUAL(q2.get(TIMEOUT).getData(), "22"); // Not acquired or accepted, still on queue - BOOST_CHECK_EQUAL(c0.session.queueQuery("q2").getMessageCount(), 1u); // 1 not acquired. - - // Create empty credit record: acquire and accept but don't complete. - SubscriptionSettings manualComplete(FlowControl::messageWindow(1), ACCEPT_MODE_EXPLICIT, ACQUIRE_MODE_PRE_ACQUIRED, 1, MANUAL_COMPLETION); - c0.session.queueDeclare("q3", arg::durable=durableFlag); - c0.session.messageTransfer(arg::content=makeMessage("31", "q3", durableFlag)); - c0.session.messageTransfer(arg::content=makeMessage("32", "q3", durableFlag)); - LocalQueue q3; - c0.subs.subscribe(q3, "q3", manualComplete); - Message m31=q3.get(TIMEOUT); - BOOST_CHECK_EQUAL(m31.getData(), "31"); // Automatically acquired & accepted but not completed. - BOOST_CHECK_EQUAL(c0.session.queueQuery("q3").getMessageCount(), 1u); - - // Add new member while there are unacked messages. - cluster.add(); - Client c1(cluster[1], "c1"); - - // Check queue counts - BOOST_CHECK_EQUAL(c1.session.queueQuery("q1").getMessageCount(), 0u); - BOOST_CHECK_EQUAL(c1.session.queueQuery("q2").getMessageCount(), 1u); - BOOST_CHECK_EQUAL(c1.session.queueQuery("q3").getMessageCount(), 1u); - - // Complete the empty credit message, should unblock the message behind it. - BOOST_CHECK_THROW(q3.get(0), Exception); - c0.session.markCompleted(SequenceSet(m31.getId()), true); - BOOST_CHECK_EQUAL(q3.get(TIMEOUT).getData(), "32"); - BOOST_CHECK_EQUAL(c0.session.queueQuery("q3").getMessageCount(), 0u); - BOOST_CHECK_EQUAL(c1.session.queueQuery("q3").getMessageCount(), 0u); - - // Close the original session - unacked messages should be requeued. - c0.session.close(); - BOOST_CHECK_EQUAL(c1.session.queueQuery("q1").getMessageCount(), 1u); - BOOST_CHECK_EQUAL(c1.session.queueQuery("q2").getMessageCount(), 2u); - - BOOST_CHECK_EQUAL(c1.subs.get("q1", TIMEOUT).getData(), "11"); - BOOST_CHECK_EQUAL(c1.subs.get("q2", TIMEOUT).getData(), "21"); - BOOST_CHECK_EQUAL(c1.subs.get("q2", TIMEOUT).getData(), "22"); -} - -// FIXME aconway 2009-06-17: test for unimplemented feature, enable when implemented. -void testUpdateTxState() { - // Verify that we update transaction state correctly to new members. - ClusterFixture::Args args; - prepareArgs(args, durableFlag); - ClusterFixture cluster(1, args, -1); - Client c0(cluster[0], "c0"); - - // Do work in a transaction. - c0.session.txSelect(); - c0.session.queueDeclare("q", arg::durable=durableFlag); - c0.session.messageTransfer(arg::content=makeMessage("1","q", durableFlag)); - c0.session.messageTransfer(arg::content=makeMessage("2","q", durableFlag)); - Message m; - BOOST_CHECK(c0.subs.get(m, "q", TIMEOUT)); - BOOST_CHECK_EQUAL(m.getData(), "1"); - - // New member, TX not comitted, c1 should see nothing. - cluster.add(); - Client c1(cluster[1], "c1"); - BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 0u); - - // After commit c1 shoudl see results of tx. - c0.session.txCommit(); - BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 1u); - BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT)); - BOOST_CHECK_EQUAL(m.getData(), "2"); - - // Another transaction with both members active. - c0.session.messageTransfer(arg::content=makeMessage("3","q", durableFlag)); - BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 0u); - c0.session.txCommit(); - BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 1u); - BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT)); - BOOST_CHECK_EQUAL(m.getData(), "3"); -} - -QPID_AUTO_TEST_CASE(testUpdateMessageBuilder) { - // Verify that we update a partially recieved message to a new member. - ClusterFixture::Args args; - prepareArgs(args, durableFlag); - ClusterFixture cluster(1, args, -1); - Client c0(cluster[0], "c0"); - c0.session.queueDeclare("q", arg::durable=durableFlag); - Sender sender(ConnectionAccess::getImpl(c0.connection), c0.session.getChannel()); - - // Send first 2 frames of message. - MessageTransferBody transfer( - ProtocolVersion(), string(), // default exchange. - framing::message::ACCEPT_MODE_NONE, - framing::message::ACQUIRE_MODE_PRE_ACQUIRED); - sender.send(transfer, true, false, true, true); - AMQHeaderBody header; - header.get<DeliveryProperties>(true)->setRoutingKey("q"); - if (durableFlag) - header.get<DeliveryProperties>(true)->setDeliveryMode(DELIVERY_MODE_PERSISTENT); - else - header.get<DeliveryProperties>(true)->setDeliveryMode(DELIVERY_MODE_NON_PERSISTENT); - sender.send(header, false, false, true, true); - - // No reliable way to ensure the partial message has arrived - // before we start the new broker, so we sleep. - sys::usleep(2500); - cluster.add(); - - // Send final 2 frames of message. - sender.send(AMQContentBody("ab"), false, true, true, false); - sender.send(AMQContentBody("cd"), false, true, false, true); - - // Verify message is enqued correctly on second member. - Message m; - Client c1(cluster[1], "c1"); - BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT)); - BOOST_CHECK_EQUAL(m.getData(), "abcd"); - BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection, 2).size()); -} - -QPID_AUTO_TEST_CASE(testConnectionKnownHosts) { - ClusterFixture::Args args; - prepareArgs(args, durableFlag); - ClusterFixture cluster(1, args, -1); - Client c0(cluster[0], "c0"); - set<int> kb0 = knownBrokerPorts(c0.connection, 1); - BOOST_CHECK_EQUAL(kb0.size(), 1u); - BOOST_CHECK_EQUAL(kb0, makeSet(cluster)); - - cluster.add(); - Client c1(cluster[1], "c1"); - set<int> kb1 = knownBrokerPorts(c1.connection, 2); - kb0 = knownBrokerPorts(c0.connection, 2); - BOOST_CHECK_EQUAL(kb1.size(), 2u); - BOOST_CHECK_EQUAL(kb1, makeSet(cluster)); - BOOST_CHECK_EQUAL(kb1,kb0); - - cluster.add(); - Client c2(cluster[2], "c2"); - set<int> kb2 = knownBrokerPorts(c2.connection, 3); - kb1 = knownBrokerPorts(c1.connection, 3); - kb0 = knownBrokerPorts(c0.connection, 3); - BOOST_CHECK_EQUAL(kb2.size(), 3u); - BOOST_CHECK_EQUAL(kb2, makeSet(cluster)); - BOOST_CHECK_EQUAL(kb2,kb0); - BOOST_CHECK_EQUAL(kb2,kb1); - - cluster.killWithSilencer(1,c1.connection,9); - kb0 = knownBrokerPorts(c0.connection, 2); - kb2 = knownBrokerPorts(c2.connection, 2); - BOOST_CHECK_EQUAL(kb0.size(), 2u); - BOOST_CHECK_EQUAL(kb0, kb2); -} - -QPID_AUTO_TEST_CASE(testUpdateConsumers) { - ClusterFixture::Args args; - prepareArgs(args, durableFlag); - ClusterFixture cluster(1, args, -1); - - Client c0(cluster[0], "c0"); - c0.session.queueDeclare("p", arg::durable=durableFlag); - c0.session.queueDeclare("q", arg::durable=durableFlag); - c0.subs.subscribe(c0.lq, "q", FlowControl::zero()); - LocalQueue lp; - c0.subs.subscribe(lp, "p", FlowControl::messageCredit(1)); - c0.session.sync(); - - // Start new members - cluster.add(); // Local - Client c1(cluster[1], "c1"); - cluster.add(); - Client c2(cluster[2], "c2"); - - // Transfer messages - c0.session.messageTransfer(arg::content=makeMessage("aaa", "q", durableFlag)); - - c0.session.messageTransfer(arg::content=makeMessage("bbb", "p", durableFlag)); - c0.session.messageTransfer(arg::content=makeMessage("ccc", "p", durableFlag)); - - // Activate the subscription, ensure message removed on all queues. - c0.subs.setFlowControl("q", FlowControl::unlimited()); - Message m; - BOOST_CHECK(c0.lq.get(m, TIMEOUT)); - BOOST_CHECK_EQUAL(m.getData(), "aaa"); - BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 0u); - BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u); - BOOST_CHECK_EQUAL(c2.session.queueQuery("q").getMessageCount(), 0u); - - // Check second subscription's flow control: gets first message, not second. - BOOST_CHECK(lp.get(m, TIMEOUT)); - BOOST_CHECK_EQUAL(m.getData(), "bbb"); - BOOST_CHECK_EQUAL(c0.session.queueQuery("p").getMessageCount(), 1u); - BOOST_CHECK_EQUAL(c1.session.queueQuery("p").getMessageCount(), 1u); - BOOST_CHECK_EQUAL(c2.session.queueQuery("p").getMessageCount(), 1u); - - BOOST_CHECK(c0.subs.get(m, "p", TIMEOUT)); - BOOST_CHECK_EQUAL(m.getData(), "ccc"); - - // Kill the subscribing member, ensure further messages are not removed. - cluster.killWithSilencer(0,c0.connection,9); - BOOST_REQUIRE_EQUAL(knownBrokerPorts(c1.connection, 2).size(), 2u); - for (int i = 0; i < 10; ++i) { - c1.session.messageTransfer(arg::content=makeMessage("xxx", "q", durableFlag)); - BOOST_REQUIRE(c1.subs.get(m, "q", TIMEOUT)); - BOOST_REQUIRE_EQUAL(m.getData(), "xxx"); - } -} - -// Test that message data and delivery properties are updated properly. -QPID_AUTO_TEST_CASE(testUpdateMessages) { - ClusterFixture::Args args; - prepareArgs(args, durableFlag); - ClusterFixture cluster(1, args, -1); - Client c0(cluster[0], "c0"); - - // Create messages with different delivery properties - c0.session.queueDeclare("q", arg::durable=durableFlag); - c0.session.exchangeBind(arg::exchange="amq.fanout", arg::queue="q"); - c0.session.messageTransfer(arg::content=makeMessage("foo","q", durableFlag)); - c0.session.messageTransfer(arg::content=makeMessage("bar","q", durableFlag), - arg::destination="amq.fanout"); - - while (c0.session.queueQuery("q").getMessageCount() != 2) - sys::usleep(1000); // Wait for message to show up on broker 0. - - // Add a new broker, it will catch up. - cluster.add(); - - // Do some work post-add - c0.session.queueDeclare("p", arg::durable=durableFlag); - c0.session.messageTransfer(arg::content=makeMessage("pfoo","p", durableFlag)); - - // Do some work post-join - BOOST_REQUIRE_EQUAL(knownBrokerPorts(c0.connection, 2).size(), 2u); - c0.session.messageTransfer(arg::content=makeMessage("pbar","p", durableFlag)); - - // Verify new brokers have state. - Message m; - - Client c1(cluster[1], "c1"); - - BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT)); - BOOST_CHECK_EQUAL(m.getData(), "foo"); - BOOST_CHECK(m.getDeliveryProperties().hasExchange()); - BOOST_CHECK_EQUAL(m.getDeliveryProperties().getExchange(), ""); - BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT)); - BOOST_CHECK_EQUAL(m.getData(), "bar"); - BOOST_CHECK(m.getDeliveryProperties().hasExchange()); - BOOST_CHECK_EQUAL(m.getDeliveryProperties().getExchange(), "amq.fanout"); - BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u); - - // Add another broker, don't wait for join - should be stalled till ready. - cluster.add(); - Client c2(cluster[2], "c2"); - BOOST_CHECK(c2.subs.get(m, "p", TIMEOUT)); - BOOST_CHECK_EQUAL(m.getData(), "pfoo"); - BOOST_CHECK(c2.subs.get(m, "p", TIMEOUT)); - BOOST_CHECK_EQUAL(m.getData(), "pbar"); - BOOST_CHECK_EQUAL(c2.session.queueQuery("p").getMessageCount(), 0u); -} - -QPID_AUTO_TEST_CASE(testWiringReplication) { - ClusterFixture::Args args; - prepareArgs(args, durableFlag); - ClusterFixture cluster(3, args, -1); - Client c0(cluster[0]); - BOOST_CHECK(c0.session.queueQuery("q").getQueue().empty()); - BOOST_CHECK(c0.session.exchangeQuery("ex").getType().empty()); - c0.session.queueDeclare("q", arg::durable=durableFlag); - c0.session.exchangeDeclare("ex", arg::type="direct"); - c0.session.close(); - c0.connection.close(); - // Verify all brokers get wiring update. - for (size_t i = 0; i < cluster.size(); ++i) { - BOOST_MESSAGE("i == "<< i); - Client c(cluster[i]); - BOOST_CHECK_EQUAL("q", c.session.queueQuery("q").getQueue()); - BOOST_CHECK_EQUAL("direct", c.session.exchangeQuery("ex").getType()); - } -} - -QPID_AUTO_TEST_CASE(testMessageEnqueue) { - // Enqueue on one broker, dequeue on another. - ClusterFixture::Args args; - prepareArgs(args, durableFlag); - ClusterFixture cluster(2, args, -1); - Client c0(cluster[0]); - c0.session.queueDeclare("q", arg::durable=durableFlag); - c0.session.messageTransfer(arg::content=makeMessage("foo", "q", durableFlag)); - c0.session.messageTransfer(arg::content=makeMessage("bar", "q", durableFlag)); - c0.session.close(); - Client c1(cluster[1]); - Message msg; - BOOST_CHECK(c1.subs.get(msg, "q", TIMEOUT)); - BOOST_CHECK_EQUAL(string("foo"), msg.getData()); - BOOST_CHECK(c1.subs.get(msg, "q", TIMEOUT)); - BOOST_CHECK_EQUAL(string("bar"), msg.getData()); -} - -QPID_AUTO_TEST_CASE(testMessageDequeue) { - // Enqueue on one broker, dequeue on two others. - ClusterFixture::Args args; - prepareArgs(args, durableFlag); - ClusterFixture cluster(3, args, -1); - Client c0(cluster[0], "c0"); - c0.session.queueDeclare("q", arg::durable=durableFlag); - c0.session.messageTransfer(arg::content=makeMessage("foo", "q", durableFlag)); - c0.session.messageTransfer(arg::content=makeMessage("bar", "q", durableFlag)); - - Message msg; - - // Dequeue on 2 others, ensure correct order. - Client c1(cluster[1], "c1"); - BOOST_CHECK(c1.subs.get(msg, "q")); - BOOST_CHECK_EQUAL("foo", msg.getData()); - - Client c2(cluster[2], "c2"); - BOOST_CHECK(c1.subs.get(msg, "q")); - BOOST_CHECK_EQUAL("bar", msg.getData()); - - // Queue should be empty on all cluster members. - BOOST_CHECK_EQUAL(0u, c0.session.queueQuery("q").getMessageCount()); - BOOST_CHECK_EQUAL(0u, c1.session.queueQuery("q").getMessageCount()); - BOOST_CHECK_EQUAL(0u, c2.session.queueQuery("q").getMessageCount()); -} - -QPID_AUTO_TEST_CASE(testDequeueWaitingSubscription) { - ClusterFixture::Args args; - prepareArgs(args, durableFlag); - ClusterFixture cluster(3, args, -1); - Client c0(cluster[0]); - BOOST_REQUIRE_EQUAL(knownBrokerPorts(c0.connection, 3).size(), 3u); // Wait for brokers. - - // First start a subscription. - c0.session.queueDeclare("q", arg::durable=durableFlag); - c0.subs.subscribe(c0.lq, "q", FlowControl::messageCredit(2)); - - // Now send messages - Client c1(cluster[1]); - c1.session.messageTransfer(arg::content=makeMessage("foo", "q", durableFlag)); - c1.session.messageTransfer(arg::content=makeMessage("bar", "q", durableFlag)); - - // Check they arrived - Message m; - BOOST_CHECK(c0.lq.get(m, TIMEOUT)); - BOOST_CHECK_EQUAL("foo", m.getData()); - BOOST_CHECK(c0.lq.get(m, TIMEOUT)); - BOOST_CHECK_EQUAL("bar", m.getData()); - - // Queue should be empty on all cluster members. - Client c2(cluster[2]); - BOOST_CHECK_EQUAL(0u, c0.session.queueQuery("q").getMessageCount()); - BOOST_CHECK_EQUAL(0u, c1.session.queueQuery("q").getMessageCount()); - BOOST_CHECK_EQUAL(0u, c2.session.queueQuery("q").getMessageCount()); -} - -QPID_AUTO_TEST_CASE(queueDurabilityPropagationToNewbie) -{ - /* - Start with a single broker. - Set up two queues: one durable, and one not. - Add a new broker to the cluster. - Make sure it has one durable and one non-durable queue. - */ - ClusterFixture::Args args; - prepareArgs(args, durableFlag); - ClusterFixture cluster(1, args, -1); - Client c0(cluster[0]); - c0.session.queueDeclare("durable_queue", arg::durable=true); - c0.session.queueDeclare("non_durable_queue", arg::durable=false); - cluster.add(); - Client c1(cluster[1]); - QueueQueryResult durable_query = c1.session.queueQuery ( "durable_queue" ); - QueueQueryResult non_durable_query = c1.session.queueQuery ( "non_durable_queue" ); - BOOST_CHECK_EQUAL(durable_query.getQueue(), std::string("durable_queue")); - BOOST_CHECK_EQUAL(non_durable_query.getQueue(), std::string("non_durable_queue")); - - BOOST_CHECK_EQUAL ( durable_query.getDurable(), true ); - BOOST_CHECK_EQUAL ( non_durable_query.getDurable(), false ); -} - - -QPID_AUTO_TEST_CASE(testHeartbeatCancelledOnFailover) -{ - - struct Sender : FailoverManager::Command - { - std::string queue; - std::string content; - - Sender(const std::string& q, const std::string& c) : queue(q), content(c) {} - - void execute(AsyncSession& session, bool) - { - session.messageTransfer(arg::content=makeMessage(content, queue, durableFlag)); - } - }; - - struct Receiver : FailoverManager::Command, MessageListener, qpid::sys::Runnable - { - FailoverManager& mgr; - std::string queue; - std::string expectedContent; - qpid::client::Subscription subscription; - qpid::sys::Monitor lock; - bool ready, failed; - - Receiver(FailoverManager& m, const std::string& q, const std::string& c) : mgr(m), queue(q), expectedContent(c), ready(false), failed(false) {} - - void received(Message& message) - { - BOOST_CHECK_EQUAL(expectedContent, message.getData()); - subscription.cancel(); - } - - void execute(AsyncSession& session, bool) - { - session.queueDeclare(arg::queue=queue, arg::durable=durableFlag); - SubscriptionManager subs(session); - subscription = subs.subscribe(*this, queue); - session.sync(); - setReady(); - subs.run(); - //cleanup: - session.queueDelete(arg::queue=queue); - } - - void run() - { - try { - mgr.execute(*this); - } - catch (const std::exception& e) { - BOOST_MESSAGE("Exception in mgr.execute: " << e.what()); - failed = true; - } - } - - void waitForReady() - { - qpid::sys::Monitor::ScopedLock l(lock); - while (!ready) { - lock.wait(); - } - } - - void setReady() - { - qpid::sys::Monitor::ScopedLock l(lock); - ready = true; - lock.notify(); - } - }; - - ClusterFixture::Args args; - prepareArgs(args, durableFlag); - ClusterFixture cluster(2, args, -1); - ConnectionSettings settings; - settings.port = cluster[1]; - settings.heartbeat = 1; - FailoverManager fmgr(settings); - Sender sender("my-queue", "my-data"); - Receiver receiver(fmgr, "my-queue", "my-data"); - qpid::sys::Thread runner(receiver); - receiver.waitForReady(); - { - ScopedSuppressLogging allQuiet; // suppress connection closed messages - cluster.kill(1); - //sleep for 2 secs to allow the heartbeat task to fire on the now dead connection: - ::usleep(2*1000*1000); - } - fmgr.execute(sender); - runner.join(); - BOOST_CHECK(!receiver.failed); - fmgr.close(); -} - -QPID_AUTO_TEST_CASE(testPolicyUpdate) { - //tests that the policys internal state is accurate on newly - //joined nodes - ClusterFixture::Args args; - args += "--log-enable", "critical"; - prepareArgs(args, durableFlag); - ClusterFixture cluster(1, args, -1); - Client c1(cluster[0], "c1"); - { - ScopedSuppressLogging allQuiet; - QueueOptions options; - options.setSizePolicy(REJECT, 0, 2); - c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag); - c1.session.messageTransfer(arg::content=makeMessage("one", "q", durableFlag)); - cluster.add(); - Client c2(cluster[1], "c2"); - c2.session.messageTransfer(arg::content=makeMessage("two", "q", durableFlag)); - - BOOST_CHECK_THROW(c2.session.messageTransfer(arg::content=makeMessage("three", "q", durableFlag)), framing::ResourceLimitExceededException); - - Message received; - BOOST_CHECK(c1.subs.get(received, "q")); - BOOST_CHECK_EQUAL(received.getData(), std::string("one")); - BOOST_CHECK(c1.subs.get(received, "q")); - BOOST_CHECK_EQUAL(received.getData(), std::string("two")); - BOOST_CHECK(!c1.subs.get(received, "q")); - } -} - -QPID_AUTO_TEST_CASE(testExclusiveQueueUpdate) { - //tests that exclusive queues are accurately replicated on newly - //joined nodes - ClusterFixture::Args args; - args += "--log-enable", "critical"; - prepareArgs(args, durableFlag); - ClusterFixture cluster(1, args, -1); - Client c1(cluster[0], "c1"); - { - ScopedSuppressLogging allQuiet; - c1.session.queueDeclare("q", arg::exclusive=true, arg::autoDelete=true, arg::alternateExchange="amq.fanout"); - cluster.add(); - Client c2(cluster[1], "c2"); - QueueQueryResult result = c2.session.queueQuery("q"); - BOOST_CHECK_EQUAL(result.getQueue(), std::string("q")); - BOOST_CHECK(result.getExclusive()); - BOOST_CHECK(result.getAutoDelete()); - BOOST_CHECK(!result.getDurable()); - BOOST_CHECK_EQUAL(result.getAlternateExchange(), std::string("amq.fanout")); - BOOST_CHECK_THROW(c2.session.queueDeclare(arg::queue="q", arg::exclusive=true, arg::passive=true), framing::ResourceLockedException); - c1.session.close(); - c1.connection.close(); - c2.session = c2.connection.newSession(); - BOOST_CHECK_THROW(c2.session.queueDeclare(arg::queue="q", arg::passive=true), framing::NotFoundException); - } -} - -/** - * Subscribes to specified queue and acquires up to the specified - * number of message but does not accept or release them. These - * message are therefore 'locked' by the clients session. - */ -Subscription lockMessages(Client& client, const std::string& queue, int count) -{ - LocalQueue q; - SubscriptionSettings settings(FlowControl::messageCredit(count)); - settings.autoAck = 0; - Subscription sub = client.subs.subscribe(q, queue, settings); - client.session.messageFlush(sub.getName()); - return sub; -} - -/** - * check that the specified queue contains the expected set of - * messages (matched on content) for all nodes in the cluster - */ -void checkQueue(ClusterFixture& cluster, const std::string& queue, const std::vector<std::string>& messages) -{ - for (size_t i = 0; i < cluster.size(); i++) { - Client client(cluster[i], (boost::format("%1%_%2%") % "c" % (i+1)).str()); - BOOST_CHECK_EQUAL(browse(client, queue, messages.size()), messages); - client.close(); - } -} - -void send(Client& client, const std::string& queue, int count, int start=1, const std::string& base="m", - const std::string& lvqKey="") -{ - for (int i = 0; i < count; i++) { - Message message = makeMessage((boost::format("%1%_%2%") % base % (i+start)).str(), queue, durableFlag); - if (!lvqKey.empty()) message.getHeaders().setString(QueueOptions::strLVQMatchProperty, lvqKey); - client.session.messageTransfer(arg::content=message); - } -} - -QPID_AUTO_TEST_CASE(testRingQueueUpdate) { - //tests that ring queues are accurately replicated on newly - //joined nodes - ClusterFixture::Args args; - args += "--log-enable", "critical"; - prepareArgs(args, durableFlag); - ClusterFixture cluster(1, args, -1); - Client c1(cluster[0], "c1"); - { - ScopedSuppressLogging allQuiet; - QueueOptions options; - options.setSizePolicy(RING, 0, 5); - c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag); - send(c1, "q", 5); - lockMessages(c1, "q", 1); - //add new node - cluster.add(); - BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection, 2).size());//wait till joined - //send one more message - send(c1, "q", 1, 6); - //release locked message - c1.close(); - //check state of queue on both nodes - checkQueue(cluster, "q", list_of<string>("m_2")("m_3")("m_4")("m_5")("m_6")); - } -} - -QPID_AUTO_TEST_CASE(testRingQueueUpdate2) { - //tests that ring queues are accurately replicated on newly joined - //nodes; just like testRingQueueUpdate, but new node joins after - //the sixth message has been sent. - ClusterFixture::Args args; - args += "--log-enable", "critical"; - prepareArgs(args, durableFlag); - ClusterFixture cluster(1, args, -1); - Client c1(cluster[0], "c1"); - { - ScopedSuppressLogging allQuiet; - QueueOptions options; - options.setSizePolicy(RING, 0, 5); - c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag); - send(c1, "q", 5); - lockMessages(c1, "q", 1); - //send sixth message - send(c1, "q", 1, 6); - //add new node - cluster.add(); - BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection, 2).size());//wait till joined - //release locked message - c1.close(); - //check state of queue on both nodes - checkQueue(cluster, "q", list_of<string>("m_2")("m_3")("m_4")("m_5")("m_6")); - } -} - -QPID_AUTO_TEST_CASE(testLvqUpdate) { - //tests that lvqs are accurately replicated on newly joined nodes - ClusterFixture::Args args; - args += "--log-enable", "critical"; - prepareArgs(args, durableFlag); - ClusterFixture cluster(1, args, -1); - Client c1(cluster[0], "c1"); - { - ScopedSuppressLogging allQuiet; - QueueOptions options; - options.setOrdering(LVQ); - c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag); - - send(c1, "q", 5, 1, "a", "a"); - send(c1, "q", 2, 1, "b", "b"); - send(c1, "q", 1, 1, "c", "c"); - send(c1, "q", 1, 3, "b", "b"); - - //add new node - cluster.add(); - BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection, 2).size());//wait till joined - - //check state of queue on both nodes - checkQueue(cluster, "q", list_of<string>("a_5")("b_3")("c_1")); - } -} - - -QPID_AUTO_TEST_CASE(testBrowsedLvqUpdate) { - //tests that lvqs are accurately replicated on newly joined nodes - //if the lvq state has been affected by browsers - ClusterFixture::Args args; - args += "--log-enable", "critical"; - prepareArgs(args, durableFlag); - ClusterFixture cluster(1, args, -1); - Client c1(cluster[0], "c1"); - { - ScopedSuppressLogging allQuiet; - QueueOptions options; - options.setOrdering(LVQ); - c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag); - - send(c1, "q", 1, 1, "a", "a"); - send(c1, "q", 2, 1, "b", "b"); - send(c1, "q", 1, 1, "c", "c"); - checkQueue(cluster, "q", list_of<string>("a_1")("b_2")("c_1")); - send(c1, "q", 4, 2, "a", "a"); - send(c1, "q", 1, 3, "b", "b"); - - //add new node - cluster.add(); - BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection, 2).size());//wait till joined - - //check state of queue on both nodes - checkQueue(cluster, "q", list_of<string>("a_1")("b_2")("c_1")("a_5")("b_3")); - } -} - -QPID_AUTO_TEST_CASE(testRelease) { - //tests that releasing a messages that was unacked when one node - //joined works correctly - ClusterFixture::Args args; - args += "--log-enable", "critical"; - prepareArgs(args, durableFlag); - ClusterFixture cluster(1, args, -1); - Client c1(cluster[0], "c1"); - { - ScopedSuppressLogging allQuiet; - c1.session.queueDeclare("q", arg::durable=durableFlag); - for (int i = 0; i < 5; i++) { - c1.session.messageTransfer(arg::content=makeMessage((boost::format("%1%_%2%") % "m" % (i+1)).str(), "q", durableFlag)); - } - //receive but don't ack a message - LocalQueue lq; - SubscriptionSettings lqSettings(FlowControl::messageCredit(1)); - lqSettings.autoAck = 0; - Subscription lqSub = c1.subs.subscribe(lq, "q", lqSettings); - c1.session.messageFlush("q"); - Message received; - BOOST_CHECK(lq.get(received)); - BOOST_CHECK_EQUAL(received.getData(), std::string("m_1")); - - //add new node - cluster.add(); - - lqSub.release(lqSub.getUnaccepted()); - - //check state of queue on both nodes - vector<string> expected = list_of<string>("m_1")("m_2")("m_3")("m_4")("m_5"); - Client c3(cluster[0], "c3"); - BOOST_CHECK_EQUAL(browse(c3, "q", 5), expected); - Client c2(cluster[1], "c2"); - BOOST_CHECK_EQUAL(browse(c2, "q", 5), expected); - } -} - - -// Browse for 1 message with byte credit, return true if a message was -// received false if not. -bool browseByteCredit(Client& c, const string& q, int n, Message& m) { - SubscriptionSettings browseSettings( - FlowControl(1, n, false), // 1 message, n bytes credit, no window - ACCEPT_MODE_NONE, - ACQUIRE_MODE_NOT_ACQUIRED, - 0 // No auto-ack. - ); - LocalQueue lq; - Subscription s = c.subs.subscribe(lq, q, browseSettings); - c.session.messageFlush(arg::destination=q, arg::sync=true); - c.session.sync(); - c.subs.getSubscription(q).cancel(); - return lq.get(m, 0); // No timeout, flush should push message thru. -} - -// Ensure cluster update preserves exact message size, use byte credt as test. -QPID_AUTO_TEST_CASE(testExactByteCredit) { - ClusterFixture cluster(1, prepareArgs(), -1); - Client c0(cluster[0], "c0"); - c0.session.queueDeclare("q"); - c0.session.messageTransfer(arg::content=Message("MyMessage", "q")); - cluster.add(); - - int size=36; // Size of message on broker: headers+body - Client c1(cluster[1], "c1"); - Message m; - - // Ensure we get the message with exact credit. - BOOST_CHECK(browseByteCredit(c0, "q", size, m)); - BOOST_CHECK(browseByteCredit(c1, "q", size, m)); - // and not with one byte less. - BOOST_CHECK(!browseByteCredit(c0, "q", size-1, m)); - BOOST_CHECK(!browseByteCredit(c1, "q", size-1, m)); -} - -// Test that consumer positions are updated correctly. -// Regression test for https://bugzilla.redhat.com/show_bug.cgi?id=541927 -// -QPID_AUTO_TEST_CASE(testUpdateConsumerPosition) { - ClusterFixture::Args args; - prepareArgs(args, durableFlag); - ClusterFixture cluster(1, args, -1); - Client c0(cluster[0], "c0"); - - c0.session.queueDeclare("q", arg::durable=durableFlag); - SubscriptionSettings settings; - settings.autoAck = 0; - // Set the acquire mode to 'not-acquired' the consumer moves along the queue - // but does not acquire (remove) messages. - settings.acquireMode = ACQUIRE_MODE_NOT_ACQUIRED; - Subscription s = c0.subs.subscribe(c0.lq, "q", settings); - c0.session.messageTransfer(arg::content=makeMessage("1", "q", durableFlag)); - BOOST_CHECK_EQUAL("1", c0.lq.get(TIMEOUT).getData()); - - // Add another member, send/receive another message and acquire - // the messages. With the bug, this creates an inconsistency - // because the browse position was not updated to the new member. - cluster.add(); - c0.session.messageTransfer(arg::content=makeMessage("2", "q", durableFlag)); - BOOST_CHECK_EQUAL("2", c0.lq.get(TIMEOUT).getData()); - s.acquire(s.getUnacquired()); - s.accept(s.getUnaccepted()); - - // In the bug we now have 0 messages on cluster[0] and 1 message on cluster[1] - // Subscribing on cluster[1] provokes an error that shuts down cluster[0] - Client c1(cluster[1], "c1"); - Subscription s1 = c1.subs.subscribe(c1.lq, "q"); // Default auto-ack=1 - Message m; - BOOST_CHECK(!c1.lq.get(m, TIMEOUT/10)); - BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u); - BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 0u); -} - -QPID_AUTO_TEST_CASE(testFairsharePriorityDelivery) { - ClusterFixture::Args args; - prepareArgs(args, durableFlag); - ClusterFixture cluster(1, args, -1); - Client c0(cluster[0], "c0"); - - FieldTable arguments; - arguments.setInt("x-qpid-priorities", 10); - arguments.setInt("x-qpid-fairshare", 5); - c0.session.queueDeclare("q", arg::durable=durableFlag, arg::arguments=arguments); - - //send messages of different priorities - for (int i = 0; i < 20; i++) { - Message msg = makeMessage((boost::format("msg-%1%") % i).str(), "q", durableFlag); - msg.getDeliveryProperties().setPriority(i % 2 ? 9 : 5); - c0.session.messageTransfer(arg::content=msg); - } - - //pull off a couple of the messages (first four should be the top priority messages - for (int i = 0; i < 4; i++) { - BOOST_CHECK_EQUAL((boost::format("msg-%1%") % ((i*2)+1)).str(), c0.subs.get("q", TIMEOUT).getData()); - } - - // Add another member - cluster.add(); - Client c1(cluster[1], "c1"); - - //pull off some more messages - BOOST_CHECK_EQUAL((boost::format("msg-%1%") % 9).str(), c0.subs.get("q", TIMEOUT).getData()); - BOOST_CHECK_EQUAL((boost::format("msg-%1%") % 0).str(), c1.subs.get("q", TIMEOUT).getData()); - BOOST_CHECK_EQUAL((boost::format("msg-%1%") % 2).str(), c0.subs.get("q", TIMEOUT).getData()); - - //check queue has same content on both nodes - BOOST_CHECK_EQUAL(browse(c0, "q", 12), browse(c1, "q", 12)); -} - -QPID_AUTO_TEST_SUITE_END() -}} // namespace qpid::tests diff --git a/qpid/cpp/src/tests/cluster_test_logs.py b/qpid/cpp/src/tests/cluster_test_logs.py deleted file mode 100755 index 22f2470590..0000000000 --- a/qpid/cpp/src/tests/cluster_test_logs.py +++ /dev/null @@ -1,123 +0,0 @@ -#!/usr/bin/env python - -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# Functions for comparing broker log files, used by cluster_tests.py. - -import os, os.path, re, glob -from itertools import izip - -def split_log(log): - """Split a broker log at checkpoints where a member joins. - Return the set of checkpoints discovered.""" - checkpoint_re = re.compile("Member joined, frameSeq=([0-9]+), queue snapshot:") - outfile = None - checkpoints = [] - for l in open(log): - match = checkpoint_re.search(l) - if match: - checkpoint = match.groups()[0] - checkpoints.append(checkpoint) - if outfile: outfile.close() - outfile = open("%s.%s"%(log, checkpoint), 'w') - - if outfile: outfile.write(l) - if outfile: outfile.close() - return checkpoints - -def filter_log(log): - """Filter the contents of a log file to remove data that is expected - to differ between brokers in a cluster. Filtered log contents between - the same checkpoints should match across the cluster.""" - out = open("%s.filter"%(log), 'w') - # Lines to skip entirely, expected differences - skip = "|".join([ - 'local connection', # Only on local broker - 'UPDATER|UPDATEE', # Ignore update process - 'stall for update|unstall, ignore update|cancelled offer .* unstall', - 'caught up', - 'active for links|Passivating links|Activating links', - 'info Connecting: .*', # UpdateClient connection - 'info Connection.* connected to', # UpdateClient connection - 'warning Connection \\[[-0-9.: ]+\\] closed', # UpdateClient connection - 'warning Broker closed connection: 200, OK', - 'task late', - 'task overran', - 'warning CLOSING .* unsent data', - 'Inter-broker link ', # ignore link state changes - 'Updated link key from ', # ignore link state changes - 'Running in a cluster, marking store', - 'debug Sending keepalive signal to watchdog', # Watchdog timer thread - 'last broker standing joined by 1 replicas, updating queue policies.', - 'Connection .* timed out: closing', # heartbeat connection close - "org.apache.qpid.broker:bridge:", # ignore bridge index - "closed connection" - ]) - # Regex to match a UUID - uuid='\w\w\w\w\w\w\w\w-\w\w\w\w-\w\w\w\w-\w\w\w\w-\w\w\w\w\w\w\w\w\w\w\w\w' - # Substitutions to remove expected differences - subs = [ - (r'\d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d ', ''), # Remove timestamp - (r'cluster\([0-9.: ]*', 'cluster('), # Remove cluster node id - (r' local\)| shadow\)', ')'), # Remove local/shadow indication - (r'CATCHUP', 'READY'), # Treat catchup as equivalent to ready. - (r'OFFER', 'READY'), # Treat offer as equivalent to ready. - # System UUID expected to be different - (r'(org.apache.qpid.broker:system[:(])%s(\)?)'%(uuid), r'\1UUID\2'), - - # TODO aconway 2010-12-20: review if these should be expected: - (r' len=\d+', ' len=NN'), # buffer lengths - (r' map={.*_object_name:([^,}]*)[,}].*', r' \1'), # V2 map - just keep name - (r'\d+-\d+-\d+--\d+', 'X-X-X--X'), # V1 Object IDs - ] - # Substitutions to mask known issue: durable test shows inconsistent "changed stats for com.redhat.rhm.store:journal" messages. - skip += '|Changed V[12] statistics com.redhat.rhm.store:journal' - subs += [(r'to=console.obj.1.0.com.redhat.rhm.store.journal props=\d+ stats=\d+', - 'to=console.obj.1.0.com.redhat.rhm.store.journal props=NN stats=NN')] - - skip_re = re.compile(skip) - subs = [(re.compile(pattern), subst) for pattern, subst in subs] - for l in open(log): - if skip_re.search(l): continue - for pattern,subst in subs: l = re.sub(pattern,subst,l) - out.write(l) - out.close() - -def verify_logs(): - """Compare log files from cluster brokers, verify that they correspond correctly.""" - for l in glob.glob("*.log"): filter_log(l) - checkpoints = set() - for l in glob.glob("*.filter"): checkpoints = checkpoints.union(set(split_log(l))) - errors=[] - for c in checkpoints: - fragments = glob.glob("*.filter.%s"%(c)) - fragments.sort(reverse=True, key=os.path.getsize) - while len(fragments) >= 2: - a = fragments.pop(0) - b = fragments[0] - for ab in izip(open(a), open(b)): - if ab[0] != ab[1]: - errors.append("\n %s %s"%(a, b)) - break - if errors: - raise Exception("Files differ in %s"%(os.getcwd())+"".join(errors)) - -# Can be run as a script. -if __name__ == "__main__": - verify_logs() diff --git a/qpid/cpp/src/tests/cluster_test_scripts/README.txt b/qpid/cpp/src/tests/cluster_test_scripts/README.txt deleted file mode 100644 index e861a2f397..0000000000 --- a/qpid/cpp/src/tests/cluster_test_scripts/README.txt +++ /dev/null @@ -1,20 +0,0 @@ -Cluster test scripts. - -A set of scripts to start and stop cluster and test clients on -multiple hosts using ssh. - -Pre-requisites: You must be - - set up for password-free ssh access to the test hosts. - - a member of the ais group on all the test hosts. - -Configuration: - -Copy defaults.sh to config.sh and edit the values as necessary. - -Test scripts: - -Test scripts use the functions in functions.sh to start & monitor -cluster and clients. -A test script can collect other scripts. - - diff --git a/qpid/cpp/src/tests/cluster_test_scripts/cluster_check b/qpid/cpp/src/tests/cluster_test_scripts/cluster_check deleted file mode 100755 index 05fcc1bcd2..0000000000 --- a/qpid/cpp/src/tests/cluster_test_scripts/cluster_check +++ /dev/null @@ -1,37 +0,0 @@ -#!/bin/sh - -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# Check that all members of a cluster are running - -source config.sh - -HOSTS=(`cat $CLUSTER_HOME/hosts`) -PORTS=(`cat $CLUSTER_HOME/ports`) - -for ((i=0; i<${#HOSTS[*]}; ++i)); do - host=${HOSTS[$i]} - port=${PORTS[$i]} - ssh $host "$QPIDD -cp $port" > /dev/null || { - ret=1 - echo "ERROR: broker not running $host:$port" - } -done -exit $ret diff --git a/qpid/cpp/src/tests/cluster_test_scripts/cluster_start b/qpid/cpp/src/tests/cluster_test_scripts/cluster_start deleted file mode 100755 index 8911358f7e..0000000000 --- a/qpid/cpp/src/tests/cluster_test_scripts/cluster_start +++ /dev/null @@ -1,56 +0,0 @@ -#!/bin/sh - -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# Start a cluster -# -# Arguments: NAME HOST [host...] -# Start a cluster called NAME with N nodes running on the given HOSTs -# repeat the host name to run multiple brokers on one host. Use dynamic -# ports. -# -# Log files, data directories and hosts/ports files are all stored under -# $HOME/cluster_test/$NAME -# - -source config.sh - -CLUSTER_NAME=`date +"${USER}_%F_%T"` -HOSTS=($BROKER_HOSTS) -for ((i = 0; i < ${#HOSTS[*]}; ++i)) ; do - host=${HOSTS[$i]} - datadir=$CLUSTER_HOME/broker$i - log=$datadir/qpidd.log - ssh $host "rm -rf $datadir; mkdir -p $datadir" || { - echo "ERROR: can't make data dir $datadir"; exit 1 - } - port=`ssh $host "echo $QPIDD -dp0 --cluster-name=$CLUSTER_NAME \ - --data-dir=$datadir \ - --log-to-file=$log --log-prefix=broker$i \ - $QPIDD_OPTS | newgrp ais"` || { - error "ERROR: can't start broker $i on $host"; exit 1; - } - PORTS="$PORTS $port" -done - -echo "$BROKER_HOSTS" > $CLUSTER_HOME/hosts -echo "$PORTS" > $CLUSTER_HOME/ports - -`dirname $0`/cluster_check $NAME diff --git a/qpid/cpp/src/tests/cluster_test_scripts/cluster_stop b/qpid/cpp/src/tests/cluster_test_scripts/cluster_stop deleted file mode 100755 index 09aa8f3b21..0000000000 --- a/qpid/cpp/src/tests/cluster_test_scripts/cluster_stop +++ /dev/null @@ -1,38 +0,0 @@ -#!/bin/sh - -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# Stop the cluster. - -source config.sh - -HOSTS=(`cat $CLUSTER_HOME/hosts`) -PORTS=(`cat $CLUSTER_HOME/ports`) - -for ((i=0; i<${#HOSTS[*]}; ++i)); do - host=${HOSTS[$i]} - port=${PORTS[$i]} - ssh $host "$QPIDD -qp $port" > /dev/null || { - ret=1 - echo "ERROR: stopping broker at $host:$port" - } -done - -exit $ret diff --git a/qpid/cpp/src/tests/cluster_test_scripts/config_example.sh b/qpid/cpp/src/tests/cluster_test_scripts/config_example.sh deleted file mode 100755 index d47c9a9c77..0000000000 --- a/qpid/cpp/src/tests/cluster_test_scripts/config_example.sh +++ /dev/null @@ -1,44 +0,0 @@ -# Cluster configuration. - -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# All output stored under $HOME/$CLUSTER_HOME. -CLUSTER_HOME=$HOME/cluster_test - -# Hosts where brokers will be run. Repeat hostname to run multiple brokers on 1 host. -BROKER_HOSTS="mrg22 mrg23 mrg24 mrg25 mrg26" - -# Hosts where clients will be run. -CLIENT_HOSTS="$BROKER_HOSTS" - -# Paths to executables -QPIDD=qpidd -PERFTEST=perftest - -# Directory containing tests -TESTDIR=/usr/bin - -# Options for qpidd, must be sufficient to load the cluster plugin. -# Scripts will add --cluster-name, --daemon, --port and --log-to-file options here. -QPIDD_OPTS=" \ ---auth=no \ ---log-enable=notice+ \ ---log-enable=debug+:cluster \ -" diff --git a/qpid/cpp/src/tests/cluster_test_scripts/perftest b/qpid/cpp/src/tests/cluster_test_scripts/perftest deleted file mode 100755 index 984761eb5f..0000000000 --- a/qpid/cpp/src/tests/cluster_test_scripts/perftest +++ /dev/null @@ -1,54 +0,0 @@ -#!/bin/sh - -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# Run a distributed perftest against a cluster. -# Args: npubs nsubs [perftest-options] - -source config.sh - -NPUBS=${1:-4} ; shift -NSUBS=${1:-4} ; shift -OPTS="--npubs $NPUBS --nsubs $NSUBS $*" - -CLIENTS=($CLIENT_HOSTS) -BROKERS=(`cat $CLUSTER_HOME/hosts`) -PORTS=(`cat $CLUSTER_HOME/ports`) - -start() { - client=${CLIENTS[i % ${#CLIENTS[*]}]} - broker=${BROKERS[i % ${#BROKERS[*]}]} - port=${PORTS[i % ${#PORTS[*]}]} - ssh -n $client $PERFTEST $OPTS $* -b $broker -p $port & - PIDS="$PIDS $!" -} - -ssh ${CLIENTS[0]} $PERFTEST $OPTS --setup -b ${BROKERS[0]} -p${PORTS[0]} -for (( i=0 ; i < $NPUBS ; ++i)); do start --publish; done -for (( ; i < $NPUBS+$NSUBS ; ++i)); do start --subscribe; done -ssh ${CLIENTS[0]} $PERFTEST $OPTS --control -b ${BROKERS[0]} -p${PORTS[0]} - -for pid in $PIDS; do - wait $pid || echo "ERROR: client process $pid failed" -done - -`dirname $0`/cluster_check - - diff --git a/qpid/cpp/src/tests/cluster_tests.fail b/qpid/cpp/src/tests/cluster_tests.fail deleted file mode 100644 index b28b04f643..0000000000 --- a/qpid/cpp/src/tests/cluster_tests.fail +++ /dev/null @@ -1,3 +0,0 @@ - - - diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py deleted file mode 100755 index 3c96b252df..0000000000 --- a/qpid/cpp/src/tests/cluster_tests.py +++ /dev/null @@ -1,1834 +0,0 @@ -#!/usr/bin/env python - -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import os, signal, sys, time, imp, re, subprocess, glob, random, logging -import cluster_test_logs -from qpid import datatypes, messaging -from brokertest import * -from qpid.harness import Skipped -from qpid.messaging import Message, Empty, Disposition, REJECTED, util -from threading import Thread, Lock, Condition -from logging import getLogger -from itertools import chain -from tempfile import NamedTemporaryFile - -log = getLogger("qpid.cluster_tests") - -# Note: brokers that shut themselves down due to critical error during -# normal operation will still have an exit code of 0. Brokers that -# shut down because of an error found during initialize will exit with -# a non-0 code. Hence the apparently inconsistent use of EXPECT_EXIT_OK -# and EXPECT_EXIT_FAIL in some of the tests below. - -# TODO aconway 2010-03-11: resolve this - ideally any exit due to an error -# should give non-0 exit status. - -# Import scripts as modules -qpid_cluster=import_script(checkenv("QPID_CLUSTER_EXEC")) - -def readfile(filename): - """Returns te content of file named filename as a string""" - f = file(filename) - try: return f.read() - finally: f.close() - -class ShortTests(BrokerTest): - """Short cluster functionality tests.""" - - def test_message_replication(self): - """Test basic cluster message replication.""" - # Start a cluster, send some messages to member 0. - cluster = self.cluster(2) - s0 = cluster[0].connect().session() - s0.sender("q; {create:always}").send(Message("x")) - s0.sender("q; {create:always}").send(Message("y")) - s0.connection.close() - - # Verify messages available on member 1. - s1 = cluster[1].connect().session() - m = s1.receiver("q", capacity=1).fetch(timeout=1) - s1.acknowledge() - self.assertEqual("x", m.content) - s1.connection.close() - - # Start member 2 and verify messages available. - s2 = cluster.start().connect().session() - m = s2.receiver("q", capacity=1).fetch(timeout=1) - s2.acknowledge() - self.assertEqual("y", m.content) - s2.connection.close() - - def test_store_direct_update_match(self): - """Verify that brokers stores an identical message whether they receive it - direct from clients or during an update, no header or other differences""" - cluster = self.cluster(0, args=["--load-module", self.test_store_lib]) - cluster.start(args=["--test-store-dump", "direct.dump"]) - # Try messages with various headers - cluster[0].send_message("q", Message(durable=True, content="foobar", - subject="subject", - reply_to="reply_to", - properties={"n":10})) - # Try messages of different sizes - for size in range(0,10000,100): - cluster[0].send_message("q", Message(content="x"*size, durable=True)) - # Try sending via named exchange - c = cluster[0].connect_old() - s = c.session(str(qpid.datatypes.uuid4())) - s.exchange_bind(exchange="amq.direct", binding_key="foo", queue="q") - props = s.delivery_properties(routing_key="foo", delivery_mode=2) - s.message_transfer( - destination="amq.direct", - message=qpid.datatypes.Message(props, "content")) - - # Try message with TTL and differnet headers/properties - cluster[0].send_message("q", Message(durable=True, ttl=100000)) - cluster[0].send_message("q", Message(durable=True, properties={}, ttl=100000)) - cluster[0].send_message("q", Message(durable=True, properties={"x":10}, ttl=100000)) - - # Now update a new member and compare their dumps. - cluster.start(args=["--test-store-dump", "updatee.dump"]) - assert readfile("direct.dump") == readfile("updatee.dump") - - os.remove("direct.dump") - os.remove("updatee.dump") - - def test_sasl(self): - """Test SASL authentication and encryption in a cluster""" - sasl_config=os.path.join(self.rootdir, "sasl_config") - acl=os.path.join(os.getcwd(), "policy.acl") - aclf=file(acl,"w") - # Must allow cluster-user (zag) access to credentials exchange. - aclf.write(""" -acl allow zag@QPID publish exchange name=qpid.cluster-credentials -acl allow zig@QPID all all -acl deny all all -""") - aclf.close() - cluster = self.cluster(1, args=["--auth", "yes", - "--sasl-config", sasl_config, - "--load-module", os.getenv("ACL_LIB"), - "--acl-file", acl, - "--cluster-username=zag", - "--cluster-password=zag", - "--cluster-mechanism=PLAIN" - ]) - - # Valid user/password, ensure queue is created. - c = cluster[0].connect(username="zig", password="zig") - c.session().sender("ziggy;{create:always,node:{x-declare:{exclusive:true}}}") - c.close() - cluster.start() # Start second node. - - # Check queue is created on second node. - c = cluster[1].connect(username="zig", password="zig") - c.session().receiver("ziggy;{assert:always}") - c.close() - for b in cluster: b.ready() # Make sure all brokers still running. - - # Valid user, bad password - try: - cluster[0].connect(username="zig", password="foo").close() - self.fail("Expected exception") - except messaging.exceptions.ConnectionError: pass - for b in cluster: b.ready() # Make sure all brokers still running. - - # Bad user ID - try: - cluster[0].connect(username="foo", password="bar").close() - self.fail("Expected exception") - except messaging.exceptions.ConnectionError: pass - for b in cluster: b.ready() # Make sure all brokers still running. - - # Action disallowed by ACL - c = cluster[0].connect(username="zag", password="zag") - try: - s = c.session() - s.sender("zaggy;{create:always}") - s.close() - self.fail("Expected exception") - except messaging.exceptions.UnauthorizedAccess: pass - # make sure the queue was not created at the other node. - c = cluster[1].connect(username="zig", password="zig") - try: - s = c.session() - s.sender("zaggy;{assert:always}") - s.close() - self.fail("Expected exception") - except messaging.exceptions.NotFound: pass - - def test_sasl_join_good(self): - """Verify SASL authentication between brokers when joining a cluster.""" - sasl_config=os.path.join(self.rootdir, "sasl_config") - # Test with a valid username/password - cluster = self.cluster(1, args=["--auth", "yes", - "--sasl-config", sasl_config, - "--cluster-username=zig", - "--cluster-password=zig", - "--cluster-mechanism=PLAIN" - ]) - cluster.start() - c = cluster[1].connect(username="zag", password="zag", mechanism="PLAIN") - - def test_sasl_join_bad_password(self): - # Test with an invalid password - cluster = self.cluster(1, args=["--auth", "yes", - "--sasl-config", os.path.join(self.rootdir, "sasl_config"), - "--cluster-username=zig", - "--cluster-password=bad", - "--cluster-mechanism=PLAIN" - ]) - cluster.start(wait=False, expect=EXPECT_EXIT_FAIL) - assert cluster[1].log_contains("critical Unexpected error: connection-forced: Authentication failed") - - def test_sasl_join_wrong_user(self): - # Test with a valid user that is not the cluster user. - cluster = self.cluster(0, args=["--auth", "yes", - "--sasl-config", os.path.join(self.rootdir, "sasl_config")]) - cluster.start(args=["--cluster-username=zig", - "--cluster-password=zig", - "--cluster-mechanism=PLAIN" - ]) - - cluster.start(wait=False, expect=EXPECT_EXIT_FAIL, - args=["--cluster-username=zag", - "--cluster-password=zag", - "--cluster-mechanism=PLAIN" - ]) - assert cluster[1].log_contains("critical Unexpected error: unauthorized-access: unauthorized-access: Unauthorized user zag@QPID for qpid.cluster-credentials, should be zig") - - def test_user_id_update(self): - """Ensure that user-id of an open session is updated to new cluster members""" - sasl_config=os.path.join(self.rootdir, "sasl_config") - cluster = self.cluster(1, args=["--auth", "yes", "--sasl-config", sasl_config, - "--cluster-mechanism=ANONYMOUS"]) - c = cluster[0].connect(username="zig", password="zig") - s = c.session().sender("q;{create:always}") - s.send(Message("x", user_id="zig")) # Message sent before start new broker - cluster.start() - s.send(Message("y", user_id="zig")) # Messsage sent after start of new broker - # Verify brokers are healthy and messages are on the queue. - self.assertEqual("x", cluster[0].get_message("q").content) - self.assertEqual("y", cluster[1].get_message("q").content) - - def test_other_mech(self): - """Test using a mechanism other than PLAIN/ANONYMOUS for cluster update authentication. - Regression test for https://issues.apache.org/jira/browse/QPID-3849""" - sasl_config=os.path.join(self.rootdir, "sasl_config") - cluster = self.cluster(2, args=["--auth", "yes", "--sasl-config", sasl_config, - "--cluster-username=zig", - "--cluster-password=zig", - "--cluster-mechanism=DIGEST-MD5"]) - cluster[0].connect() - cluster.start() # Before the fix this broker falied to join the cluster. - cluster[2].connect() - - def test_link_events(self): - """Regression test for https://bugzilla.redhat.com/show_bug.cgi?id=611543""" - args = ["--mgmt-pub-interval", 1] # Publish management information every second. - broker1 = self.cluster(1, args)[0] - broker2 = self.cluster(1, args)[0] - qp = self.popen(["qpid-printevents", broker1.host_port()], EXPECT_RUNNING) - qr = self.popen(["qpid-route", "route", "add", - broker1.host_port(), broker2.host_port(), - "amq.fanout", "key" - ], EXPECT_EXIT_OK) - # Look for link event in printevents output. - retry(lambda: find_in_file("brokerLinkUp", qp.outfile("out"))) - broker1.ready() - broker2.ready() - qr.wait() - - def test_queue_cleaner(self): - """ Regression test to ensure that cleanup of expired messages works correctly """ - cluster = self.cluster(2, args=["--queue-purge-interval", 3]) - - s0 = cluster[0].connect().session() - sender = s0.sender("my-lvq; {create: always, node:{x-declare:{arguments:{'qpid.last_value_queue':1}}}}") - #send 10 messages that will all expire and be cleaned up - for i in range(1, 10): - msg = Message("message-%s" % i) - msg.properties["qpid.LVQ_key"] = "a" - msg.ttl = 0.1 - sender.send(msg) - #wait for queue cleaner to run - time.sleep(3) - - #test all is ok by sending and receiving a message - msg = Message("non-expiring") - msg.properties["qpid.LVQ_key"] = "b" - sender.send(msg) - s0.connection.close() - s1 = cluster[1].connect().session() - m = s1.receiver("my-lvq", capacity=1).fetch(timeout=1) - s1.acknowledge() - self.assertEqual("non-expiring", m.content) - s1.connection.close() - - for b in cluster: b.ready() # Make sure all brokers still running. - - - def test_amqfailover_visible(self): - """Verify that the amq.failover exchange can be seen by - QMF-based tools - regression test for BZ615300.""" - broker1 = self.cluster(1)[0] - broker2 = self.cluster(1)[0] - qs = subprocess.Popen(["qpid-stat", "-e", "-b", broker1.host_port()], stdout=subprocess.PIPE) - out = qs.communicate()[0] - assert out.find("amq.failover") > 0 - - def evaluate_address(self, session, address): - """Create a receiver just to evaluate an address for its side effects""" - r = session.receiver(address) - r.close() - - def test_expire_fanout(self): - """Regression test for QPID-2874: Clustered broker crashes in assertion in - cluster/ExpiryPolicy.cpp. - Caused by a fan-out message being updated as separate messages""" - cluster = self.cluster(1) - session0 = cluster[0].connect().session() - # Create 2 queues bound to fanout exchange. - self.evaluate_address(session0, "q1;{create:always,node:{x-bindings:[{exchange:'amq.fanout',queue:q1}]}}") - self.evaluate_address(session0, "q2;{create:always,node:{x-bindings:[{exchange:'amq.fanout',queue:q2}]}}") - queues = ["q1", "q2"] - # Send a fanout message with a long timeout - s = session0.sender("amq.fanout") - s.send(Message("foo", ttl=100), sync=False) - # Start a new member, check the messages - cluster.start() - session1 = cluster[1].connect().session() - for q in queues: self.assert_browse(session1, "q1", ["foo"]) - - def test_route_update(self): - """Regression test for https://issues.apache.org/jira/browse/QPID-2982 - Links and bridges associated with routes were not replicated on update. - This meant extra management objects and caused an exit if a management - client was attached. - """ - args=["--mgmt-pub-interval=1","--log-enable=trace+:management"] - - # First broker will be killed. - cluster0 = self.cluster(1, args=args) - cluster1 = self.cluster(1, args=args) - assert 0 == subprocess.call( - ["qpid-route", "route", "add", cluster0[0].host_port(), - cluster1[0].host_port(), "dummy-exchange", "dummy-key", "-d"]) - cluster0.start() - - # Wait for qpid-tool:list on cluster0[0] to generate expected output. - pattern = re.compile("org.apache.qpid.broker.*link") - qpid_tool = subprocess.Popen(["qpid-tool", cluster0[0].host_port()], - stdin=subprocess.PIPE, stdout=subprocess.PIPE) - class Scanner(Thread): - def __init__(self): self.found = False; Thread.__init__(self) - def run(self): - for l in qpid_tool.stdout: - if pattern.search(l): self.found = True; return - scanner = Scanner() - scanner.start() - start = time.time() - try: - # Wait up to 5 second timeout for scanner to find expected output - while not scanner.found and time.time() < start + 5: - qpid_tool.stdin.write("list\n") # Ask qpid-tool to list - for b in cluster0: b.ready() # Raise if any brokers are down - finally: - qpid_tool.stdin.write("quit\n") - qpid_tool.wait() - scanner.join() - assert scanner.found - # Regression test for https://issues.apache.org/jira/browse/QPID-3235 - # Inconsistent stats when changing elder. - - # Force a change of elder - cluster0.start() - for b in cluster0: b.ready() - cluster0[0].expect=EXPECT_EXIT_FAIL # About to die. - cluster0[0].kill() - time.sleep(2) # Allow a management interval to pass. - for b in cluster0[1:]: b.ready() - # Verify logs are consistent - cluster_test_logs.verify_logs() - - def test_redelivered(self): - """Verify that redelivered flag is set correctly on replayed messages""" - cluster = self.cluster(2, expect=EXPECT_EXIT_FAIL) - url = "amqp:tcp:%s,tcp:%s" % (cluster[0].host_port(), cluster[1].host_port()) - queue = "my-queue" - cluster[0].declare_queue(queue) - self.sender = self.popen( - ["qpid-send", - "--broker", url, - "--address", queue, - "--sequence=true", - "--send-eos=1", - "--messages=100000", - "--connection-options={%s}"%(Cluster.CONNECTION_OPTIONS) - ]) - self.receiver = self.popen( - ["qpid-receive", - "--broker", url, - "--address", queue, - "--ignore-duplicates", - "--check-redelivered", - "--connection-options={%s}"%(Cluster.CONNECTION_OPTIONS), - "--forever" - ]) - time.sleep(1)#give sender enough time to have some messages to replay - cluster[0].kill() - self.sender.wait() - self.receiver.wait() - cluster[1].kill() - - class BlockedSend(Thread): - """Send a message, send is expected to block. - Verify that it does block (for a given timeout), then allow - waiting till it unblocks when it is expected to do so.""" - def __init__(self, sender, msg): - self.sender, self.msg = sender, msg - self.blocked = True - self.condition = Condition() - self.timeout = 0.1 # Time to wait for expected results. - Thread.__init__(self) - def run(self): - try: - self.sender.send(self.msg, sync=True) - self.condition.acquire() - try: - self.blocked = False - self.condition.notify() - finally: self.condition.release() - except Exception,e: print "BlockedSend exception: %s"%e - def start(self): - Thread.start(self) - time.sleep(self.timeout) - assert self.blocked # Expected to block - def assert_blocked(self): assert self.blocked - def wait(self): # Now expecting to unblock - self.condition.acquire() - try: - while self.blocked: - self.condition.wait(self.timeout) - if self.blocked: raise Exception("Timed out waiting for send to unblock") - finally: self.condition.release() - self.join() - - def queue_flowlimit_test(self, brokers): - """Verify that the queue's flowlimit configuration and state are - correctly replicated. - The brokers argument allows this test to run on single broker, - cluster of 2 pre-startd brokers or cluster where second broker - starts after queue is in flow control. - """ - # configure a queue with a specific flow limit on first broker - ssn0 = brokers.first().connect().session() - s0 = ssn0.sender("flq; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.flow_stop_count':5, 'qpid.flow_resume_count':3}}}}") - brokers.first().startQmf() - q1 = [q for q in brokers.first().qmf_session.getObjects(_class="queue") if q.name == "flq"][0] - oid = q1.getObjectId() - self.assertEqual(q1.name, "flq") - self.assertEqual(q1.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L}) - assert not q1.flowStopped - self.assertEqual(q1.flowStoppedCount, 0) - - # fill the queue on one broker until flow control is active - for x in range(5): s0.send(Message(str(x))) - sender = ShortTests.BlockedSend(s0, Message(str(6))) - sender.start() # Tests that sender does block - # Verify the broker queue goes into a flowStopped state - deadline = time.time() + 1 - while not q1.flowStopped and time.time() < deadline: q1.update() - assert q1.flowStopped - self.assertEqual(q1.flowStoppedCount, 1) - sender.assert_blocked() # Still blocked - - # Now verify the both brokers in cluster have same configuration - brokers.second().startQmf() - qs = brokers.second().qmf_session.getObjects(_objectId=oid) - self.assertEqual(len(qs), 1) - q2 = qs[0] - self.assertEqual(q2.name, "flq") - self.assertEqual(q2.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L}) - assert q2.flowStopped - self.assertEqual(q2.flowStoppedCount, 1) - - # now drain the queue using a session to the other broker - ssn1 = brokers.second().connect().session() - r1 = ssn1.receiver("flq", capacity=6) - for x in range(4): - r1.fetch(timeout=0) - ssn1.acknowledge() - sender.wait() # Verify no longer blocked. - - # and re-verify state of queue on both brokers - q1.update() - assert not q1.flowStopped - q2.update() - assert not q2.flowStopped - - ssn0.connection.close() - ssn1.connection.close() - cluster_test_logs.verify_logs() - - def test_queue_flowlimit(self): - """Test flow limits on a standalone broker""" - broker = self.broker() - class Brokers: - def first(self): return broker - def second(self): return broker - self.queue_flowlimit_test(Brokers()) - - def test_queue_flowlimit_cluster(self): - cluster = self.cluster(2) - class Brokers: - def first(self): return cluster[0] - def second(self): return cluster[1] - self.queue_flowlimit_test(Brokers()) - - def test_queue_flowlimit_cluster_join(self): - cluster = self.cluster(1) - class Brokers: - def first(self): return cluster[0] - def second(self): - if len(cluster) == 1: cluster.start() - return cluster[1] - self.queue_flowlimit_test(Brokers()) - - def test_queue_flowlimit_replicate(self): - """ Verify that a queue which is in flow control BUT has drained BELOW - the flow control 'stop' threshold, is correctly replicated when a new - broker is added to the cluster. - """ - - class AsyncSender(Thread): - """Send a fixed number of msgs from a sender in a separate thread - so it may block without blocking the test. - """ - def __init__(self, broker, address, count=1, size=4): - Thread.__init__(self) - self.daemon = True - self.broker = broker - self.queue = address - self.count = count - self.size = size - self.done = False - - def run(self): - self.sender = subprocess.Popen(["qpid-send", - "--capacity=1", - "--content-size=%s" % self.size, - "--messages=%s" % self.count, - "--failover-updates", - "--connection-options={%s}"%(Cluster.CONNECTION_OPTIONS), - "--address=%s" % self.queue, - "--broker=%s" % self.broker.host_port()]) - self.sender.wait() - self.done = True - - cluster = self.cluster(2) - # create a queue with rather draconian flow control settings - ssn0 = cluster[0].connect().session() - s0 = ssn0.sender("flq; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.flow_stop_count':100, 'qpid.flow_resume_count':20}}}}") - - # fire off the sending thread to broker[0], and wait until the queue - # hits flow control on broker[1] - sender = AsyncSender(cluster[0], "flq", count=110); - sender.start(); - - cluster[1].startQmf() - q_obj = [q for q in cluster[1].qmf_session.getObjects(_class="queue") if q.name == "flq"][0] - deadline = time.time() + 10 - while not q_obj.flowStopped and time.time() < deadline: - q_obj.update() - assert q_obj.flowStopped - assert not sender.done - assert q_obj.msgDepth < 110 - - # Now drain enough messages on broker[1] to drop below the flow stop - # threshold, but not relieve flow control... - receiver = subprocess.Popen(["qpid-receive", - "--messages=15", - "--timeout=1", - "--print-content=no", - "--failover-updates", - "--connection-options={%s}"%(Cluster.CONNECTION_OPTIONS), - "--ack-frequency=1", - "--address=flq", - "--broker=%s" % cluster[1].host_port()]) - receiver.wait() - q_obj.update() - assert q_obj.flowStopped - assert not sender.done - current_depth = q_obj.msgDepth - - # add a new broker to the cluster, and verify that the queue is in flow - # control on that broker - cluster.start() - cluster[2].startQmf() - q_obj = [q for q in cluster[2].qmf_session.getObjects(_class="queue") if q.name == "flq"][0] - assert q_obj.flowStopped - assert q_obj.msgDepth == current_depth - - # now drain the queue on broker[2], and verify that the sender becomes - # unblocked - receiver = subprocess.Popen(["qpid-receive", - "--messages=95", - "--timeout=1", - "--print-content=no", - "--failover-updates", - "--connection-options={%s}"%(Cluster.CONNECTION_OPTIONS), - "--ack-frequency=1", - "--address=flq", - "--broker=%s" % cluster[2].host_port()]) - receiver.wait() - q_obj.update() - assert not q_obj.flowStopped - self.assertEqual(q_obj.msgDepth, 0) - - # verify that the sender has become unblocked - sender.join(timeout=5) - assert not sender.isAlive() - assert sender.done - - def test_blocked_queue_delete(self): - """Verify that producers which are blocked on a queue due to flow - control are unblocked when that queue is deleted. - """ - - cluster = self.cluster(2) - cluster[0].startQmf() - cluster[1].startQmf() - - # configure a queue with a specific flow limit on first broker - ssn0 = cluster[0].connect().session() - s0 = ssn0.sender("flq; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.flow_stop_count':5, 'qpid.flow_resume_count':3}}}}") - q1 = [q for q in cluster[0].qmf_session.getObjects(_class="queue") if q.name == "flq"][0] - oid = q1.getObjectId() - self.assertEqual(q1.name, "flq") - self.assertEqual(q1.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L}) - assert not q1.flowStopped - self.assertEqual(q1.flowStoppedCount, 0) - - # fill the queue on one broker until flow control is active - for x in range(5): s0.send(Message(str(x))) - sender = ShortTests.BlockedSend(s0, Message(str(6))) - sender.start() # Tests that sender does block - # Verify the broker queue goes into a flowStopped state - deadline = time.time() + 1 - while not q1.flowStopped and time.time() < deadline: q1.update() - assert q1.flowStopped - self.assertEqual(q1.flowStoppedCount, 1) - sender.assert_blocked() # Still blocked - - # Now verify the both brokers in cluster have same configuration - qs = cluster[1].qmf_session.getObjects(_objectId=oid) - self.assertEqual(len(qs), 1) - q2 = qs[0] - self.assertEqual(q2.name, "flq") - self.assertEqual(q2.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L}) - assert q2.flowStopped - self.assertEqual(q2.flowStoppedCount, 1) - - # now delete the blocked queue from other broker - ssn1 = cluster[1].connect().session() - self.evaluate_address(ssn1, "flq;{delete:always}") - sender.wait() # Verify no longer blocked. - - ssn0.connection.close() - ssn1.connection.close() - cluster_test_logs.verify_logs() - - - def test_alternate_exchange_update(self): - """Verify that alternate-exchange on exchanges and queues is propagated to new members of a cluster. """ - cluster = self.cluster(1) - s0 = cluster[0].connect().session() - # create alt queue bound to amq.fanout exchange, will be destination for alternate exchanges - self.evaluate_address(s0, "alt;{create:always,node:{x-bindings:[{exchange:'amq.fanout',queue:alt}]}}") - # create direct exchange ex with alternate-exchange amq.fanout and no queues bound - self.evaluate_address(s0, "ex;{create:always,node:{type:topic, x-declare:{type:'direct', alternate-exchange:'amq.fanout'}}}") - # create queue q with alternate-exchange amq.fanout - self.evaluate_address(s0, "q;{create:always,node:{type:queue, x-declare:{alternate-exchange:'amq.fanout'}}}") - - def verify(broker): - s = broker.connect().session() - # Verify unmatched message goes to ex's alternate. - s.sender("ex").send("foo") - self.assertEqual("foo", s.receiver("alt").fetch(timeout=0).content) - # Verify rejected message goes to q's alternate. - s.sender("q").send("bar") - msg = s.receiver("q").fetch(timeout=0) - self.assertEqual("bar", msg.content) - s.acknowledge(msg, Disposition(REJECTED)) # Reject the message - self.assertEqual("bar", s.receiver("alt").fetch(timeout=0).content) - - verify(cluster[0]) - cluster.start() - verify(cluster[1]) - - def test_binding_order(self): - """Regression test for binding order inconsistency in cluster""" - cluster = self.cluster(1) - c0 = cluster[0].connect() - s0 = c0.session() - # Declare multiple queues bound to same key on amq.topic - def declare(q,max=0): - if max: declare = 'x-declare:{arguments:{"qpid.max_count":%d, "qpid.flow_stop_count":0}}'%max - else: declare = 'x-declare:{}' - bind='x-bindings:[{queue:%s,key:key,exchange:"amq.topic"}]'%(q) - s0.sender("%s;{create:always,node:{%s,%s}}" % (q,declare,bind)) - declare('d',max=4) # Only one with a limit - for q in ['c', 'b','a']: declare(q) - # Add a cluster member, send enough messages to exceed the max count - cluster.start() - try: - s = s0.sender('amq.topic/key') - for m in xrange(1,6): s.send(Message(str(m))) - self.fail("Expected capacity exceeded exception") - except messaging.exceptions.TargetCapacityExceeded: pass - c1 = cluster[1].connect() - s1 = c1.session() - s0 = c0.session() # Old session s0 is broken by exception. - # Verify queue contents are consistent. - for q in ['a','b','c','d']: - self.assertEqual(self.browse(s0, q), self.browse(s1, q)) - # Verify queue contents are "best effort" - for q in ['a','b','c']: self.assert_browse(s1,q,[str(n) for n in xrange(1,6)]) - self.assert_browse(s1,'d',[str(n) for n in xrange(1,5)]) - - def test_deleted_exchange(self): - """QPID-3215: cached exchange reference can cause cluster inconsistencies - if exchange is deleted/recreated - Verify stand-alone case - """ - cluster = self.cluster() - # Verify we do not route message via an exchange that has been destroyed. - cluster.start() - s0 = cluster[0].connect().session() - self.evaluate_address(s0, "ex;{create:always,node:{type:topic}}") - self.evaluate_address(s0, "q;{create:always,node:{x-bindings:[{exchange:'ex',queue:q,key:foo}]}}") - send0 = s0.sender("ex/foo") - send0.send("foo") - self.assert_browse(s0, "q", ["foo"]) - self.evaluate_address(s0, "ex;{delete:always}") - try: - send0.send("bar") # Should fail, exchange is deleted. - self.fail("Expected not-found exception") - except qpid.messaging.NotFound: pass - self.assert_browse(cluster[0].connect().session(), "q", ["foo"]) - - def test_deleted_exchange_inconsistent(self): - """QPID-3215: cached exchange reference can cause cluster inconsistencies - if exchange is deleted/recreated - - Verify cluster inconsistency. - """ - cluster = self.cluster() - cluster.start() - s0 = cluster[0].connect().session() - self.evaluate_address(s0, "ex;{create:always,node:{type:topic}}") - self.evaluate_address(s0, "q;{create:always,node:{x-bindings:[{exchange:'ex',queue:q,key:foo}]}}") - send0 = s0.sender("ex/foo") - send0.send("foo") - self.assert_browse(s0, "q", ["foo"]) - - cluster.start() - s1 = cluster[1].connect().session() - self.evaluate_address(s0, "ex;{delete:always}") - try: - send0.send("bar") - self.fail("Expected not-found exception") - except qpid.messaging.NotFound: pass - - self.assert_browse(s1, "q", ["foo"]) - - - def test_ttl_consistent(self): - """Ensure we don't get inconsistent errors with message that have TTL very close together""" - messages = [ Message(str(i), ttl=i/1000.0) for i in xrange(0,1000)] - messages.append(Message("x")) - cluster = self.cluster(2) - sender = cluster[0].connect().session().sender("q;{create:always}") - - def fetch(b): - receiver = b.connect().session().receiver("q;{create:always}") - while receiver.fetch().content != "x": pass - - for m in messages: sender.send(m, sync=False) - for m in messages: sender.send(m, sync=False) - fetch(cluster[0]) - fetch(cluster[1]) - for m in messages: sender.send(m, sync=False) - cluster.start() - fetch(cluster[2]) - - - def _verify_federation(self, src_broker, src, dst_broker, dst, timeout=30): - """ Prove that traffic can pass between two federated brokers. - """ - tot_time = 0 - active = False - send_session = src_broker.connect().session() - sender = send_session.sender(src) - receive_session = dst_broker.connect().session() - receiver = receive_session.receiver(dst) - while not active and tot_time < timeout: - sender.send(Message("Hello from Source!")) - try: - receiver.fetch(timeout = 1) - receive_session.acknowledge() - # Get this far without Empty exception, and the link is good! - active = True - while True: - # Keep receiving msgs, as several may have accumulated - receiver.fetch(timeout = 1) - receive_session.acknowledge() - except Empty: - if not active: - tot_time += 1 - receiver.close() - receive_session.close() - sender.close() - send_session.close() - return active - - def test_federation_failover(self): - """ - Verify that federation operates across failures occuring in a cluster. - Specifically: - 1) Destination cluster learns of membership changes in the source - cluster - 2) Destination cluster replicates the current state of the source - cluster to newly-added members - """ - - # 2 node cluster source, 2 node cluster destination - src_cluster = self.cluster(2, expect=EXPECT_EXIT_FAIL) - src_cluster.ready(); - dst_cluster = self.cluster(2, expect=EXPECT_EXIT_FAIL) - dst_cluster.ready(); - - cmd = self.popen(["qpid-config", - "--broker", src_cluster[0].host_port(), - "add", "queue", "srcQ"], EXPECT_EXIT_OK) - cmd.wait() - - cmd = self.popen(["qpid-config", - "--broker", dst_cluster[0].host_port(), - "add", "exchange", "fanout", "destX"], EXPECT_EXIT_OK) - cmd.wait() - - cmd = self.popen(["qpid-config", - "--broker", dst_cluster[0].host_port(), - "add", "queue", "destQ"], EXPECT_EXIT_OK) - cmd.wait() - - cmd = self.popen(["qpid-config", - "--broker", dst_cluster[0].host_port(), - "bind", "destX", "destQ"], EXPECT_EXIT_OK) - cmd.wait() - - # federate the srcQ to the destination exchange - dst_cluster[0].startQmf() - dst_broker = dst_cluster[0].qmf_session.getObjects(_class="broker")[0] - result = dst_broker.connect(src_cluster[0].host(), src_cluster[0].port(), False, "PLAIN", - "guest", "guest", "tcp") - self.assertEqual(result.status, 0, result); - - link = dst_cluster[0].qmf_session.getObjects(_class="link")[0] - result = link.bridge(False, "srcQ", "destX", "", "", "", True, False, False, 10) - self.assertEqual(result.status, 0, result) - - # check that traffic passes - assert self._verify_federation(src_cluster[0], "srcQ", dst_cluster[0], "destQ") - - # add src[2] broker to source cluster - src_cluster.start(expect=EXPECT_EXIT_FAIL); - src_cluster.ready(); - assert self._verify_federation(src_cluster[2], "srcQ", dst_cluster[0], "destQ") - - # Kill src[0]. dst[0] should fail over to src[1] - src_cluster[0].kill() - for b in src_cluster[1:]: b.ready() - assert self._verify_federation(src_cluster[1], "srcQ", dst_cluster[0], "destQ") - - # Kill src[1], dst[0] should fail over to src[2] - src_cluster[1].kill() - for b in src_cluster[2:]: b.ready() - assert self._verify_federation(src_cluster[2], "srcQ", dst_cluster[0], "destQ") - - # Kill dest[0], force failover to dest[1] - dst_cluster[0].kill() - for b in dst_cluster[1:]: b.ready() - assert self._verify_federation(src_cluster[2], "srcQ", dst_cluster[1], "destQ") - - # Add dest[2] - # dest[1] syncs dest[2] to current remote state - dst_cluster.start(expect=EXPECT_EXIT_FAIL); - for b in dst_cluster[1:]: b.ready() - assert self._verify_federation(src_cluster[2], "srcQ", dst_cluster[1], "destQ") - - # Kill dest[1], force failover to dest[2] - dst_cluster[1].kill() - for b in dst_cluster[2:]: b.ready() - assert self._verify_federation(src_cluster[2], "srcQ", dst_cluster[2], "destQ") - - for i in range(2, len(src_cluster)): src_cluster[i].kill() - for i in range(2, len(dst_cluster)): dst_cluster[i].kill() - - - def test_federation_multilink_failover(self): - """ - Verify that multi-link federation operates across failures occuring in - a cluster. - """ - - # 1 node cluster source, 1 node cluster destination - src_cluster = self.cluster(1, expect=EXPECT_EXIT_FAIL) - src_cluster.ready(); - dst_cluster = self.cluster(1, expect=EXPECT_EXIT_FAIL) - dst_cluster.ready(); - - # federate a direct binding across two separate links - - # first, create a direct exchange bound to two queues using different - # bindings - cmd = self.popen(["qpid-config", - "--broker", src_cluster[0].host_port(), - "add", "exchange", "direct", "FedX"], - EXPECT_EXIT_OK) - cmd.wait() - - cmd = self.popen(["qpid-config", - "--broker", dst_cluster[0].host_port(), - "add", "exchange", "direct", "FedX"], - EXPECT_EXIT_OK) - cmd.wait() - - cmd = self.popen(["qpid-config", - "--broker", dst_cluster[0].host_port(), - "add", "queue", "destQ1"], - EXPECT_EXIT_OK) - cmd.wait() - - cmd = self.popen(["qpid-config", - "--broker", dst_cluster[0].host_port(), - "bind", "FedX", "destQ1", "one"], - EXPECT_EXIT_OK) - cmd.wait() - - cmd = self.popen(["qpid-config", - "--broker", dst_cluster[0].host_port(), - "add", "queue", "destQ2"], - EXPECT_EXIT_OK) - cmd.wait() - - cmd = self.popen(["qpid-config", - "--broker", dst_cluster[0].host_port(), - "bind", "FedX", "destQ2", "two"], - EXPECT_EXIT_OK) - cmd.wait() - - # Create two separate links between the dst and source brokers, bind - # each to different keys - dst_cluster[0].startQmf() - dst_broker = dst_cluster[0].qmf_session.getObjects(_class="broker")[0] - - for _l in [("link1", "bridge1", "one"), - ("link2", "bridge2", "two")]: - result = dst_broker.create("link", _l[0], - {"host":src_cluster[0].host(), - "port":src_cluster[0].port()}, - False) - self.assertEqual(result.status, 0, result); - result = dst_broker.create("bridge", _l[1], - {"link":_l[0], - "src":"FedX", - "dest":"FedX", - "key":_l[2]}, False) - self.assertEqual(result.status, 0); - - # check that traffic passes - assert self._verify_federation(src_cluster[0], "FedX/one", dst_cluster[0], "destQ1") - assert self._verify_federation(src_cluster[0], "FedX/two", dst_cluster[0], "destQ2") - - # add new member, verify traffic - src_cluster.start(expect=EXPECT_EXIT_FAIL); - src_cluster.ready(); - - dst_cluster.start(expect=EXPECT_EXIT_FAIL); - dst_cluster.ready(); - - assert self._verify_federation(src_cluster[0], "FedX/one", dst_cluster[0], "destQ1") - assert self._verify_federation(src_cluster[0], "FedX/two", dst_cluster[0], "destQ2") - - src_cluster[0].kill() - for b in src_cluster[1:]: b.ready() - - assert self._verify_federation(src_cluster[1], "FedX/one", dst_cluster[0], "destQ1") - assert self._verify_federation(src_cluster[1], "FedX/two", dst_cluster[0], "destQ2") - - dst_cluster[0].kill() - for b in dst_cluster[1:]: b.ready() - - assert self._verify_federation(src_cluster[1], "FedX/one", dst_cluster[1], "destQ1") - assert self._verify_federation(src_cluster[1], "FedX/two", dst_cluster[1], "destQ2") - - for i in range(1, len(src_cluster)): src_cluster[i].kill() - for i in range(1, len(dst_cluster)): dst_cluster[i].kill() - - - -# Some utility code for transaction tests -XA_RBROLLBACK = 1 -XA_RBTIMEOUT = 2 -XA_OK = 0 -dtx_branch_counter = 0 - -class DtxStatusException(Exception): - def __init__(self, expect, actual): - self.expect = expect - self.actual = actual - - def str(self): - return "DtxStatusException(expect=%s, actual=%s)"%(self.expect, self.actual) - -class DtxTestFixture: - """Bundle together some common requirements for dtx tests.""" - def __init__(self, test, broker, name, exclusive=False): - self.test = test - self.broker = broker - self.name = name - # Use old API. DTX is not supported in messaging API. - self.connection = broker.connect_old() - self.session = self.connection.session(name, 1) # 1 second timeout - self.queue = self.session.queue_declare(name, exclusive=exclusive) - self.session.dtx_select() - self.consumer = None - - def xid(self, id=None): - if id is None: id = self.name - return self.session.xid(format=0, global_id=id) - - def check_status(self, expect, actual): - if expect != actual: raise DtxStatusException(expect, actual) - - def start(self, id=None, resume=False): - self.check_status(XA_OK, self.session.dtx_start(xid=self.xid(id), resume=resume).status) - - def end(self, id=None, suspend=False): - self.check_status(XA_OK, self.session.dtx_end(xid=self.xid(id), suspend=suspend).status) - - def prepare(self, id=None): - self.check_status(XA_OK, self.session.dtx_prepare(xid=self.xid(id)).status) - - def commit(self, id=None, one_phase=True): - self.check_status( - XA_OK, self.session.dtx_commit(xid=self.xid(id), one_phase=one_phase).status) - - def rollback(self, id=None): - self.check_status(XA_OK, self.session.dtx_rollback(xid=self.xid(id)).status) - - def set_timeout(self, timeout, id=None): - self.session.dtx_set_timeout(xid=self.xid(id),timeout=timeout) - - def send(self, messages): - for m in messages: - dp=self.session.delivery_properties(routing_key=self.name) - mp=self.session.message_properties() - self.session.message_transfer(message=qpid.datatypes.Message(dp, mp, m)) - - def accept(self): - """Accept 1 message from queue""" - consumer_tag="%s-consumer"%(self.name) - self.session.message_subscribe(queue=self.name, destination=consumer_tag) - self.session.message_flow(unit = self.session.credit_unit.message, value = 1, destination = consumer_tag) - self.session.message_flow(unit = self.session.credit_unit.byte, value = 0xFFFFFFFFL, destination = consumer_tag) - msg = self.session.incoming(consumer_tag).get(timeout=1) - self.session.message_cancel(destination=consumer_tag) - self.session.message_accept(qpid.datatypes.RangedSet(msg.id)) - return msg - - - def verify(self, sessions, messages): - for s in sessions: - self.test.assert_browse(s, self.name, messages) - -class DtxTests(BrokerTest): - - def test_dtx_update(self): - """Verify that DTX transaction state is updated to a new broker. - Start a collection of transactions, then add a new cluster member, - then verify they commit/rollback correctly on the new broker.""" - - # Note: multiple test have been bundled into one to avoid the need to start/stop - # multiple brokers per test. - - cluster=self.cluster(1) - sessions = [cluster[0].connect().session()] # For verify - - # Transaction that will be open when new member joins, then committed. - t1 = DtxTestFixture(self, cluster[0], "t1") - t1.start() - t1.send(["1", "2"]) - t1.verify(sessions, []) # Not visible outside of transaction - - # Transaction that will be open when new member joins, then rolled back. - t2 = DtxTestFixture(self, cluster[0], "t2") - t2.start() - t2.send(["1", "2"]) - - # Transaction that will be prepared when new member joins, then committed. - t3 = DtxTestFixture(self, cluster[0], "t3") - t3.start() - t3.send(["1", "2"]) - t3.end() - t3.prepare() - t1.verify(sessions, []) # Not visible outside of transaction - - # Transaction that will be prepared when new member joins, then rolled back. - t4 = DtxTestFixture(self, cluster[0], "t4") - t4.start() - t4.send(["1", "2"]) - t4.end() - t4.prepare() - - # Transaction using an exclusive queue - t5 = DtxTestFixture(self, cluster[0], "t5", exclusive=True) - t5.start() - t5.send(["1", "2"]) - - # Accept messages in a transaction before/after join then commit - # Note: Message sent outside transaction, we're testing transactional acceptance. - t6 = DtxTestFixture(self, cluster[0], "t6") - t6.send(["a","b","c"]) - t6.start() - self.assertEqual(t6.accept().body, "a"); - t6.verify(sessions, ["b", "c"]) - - # Accept messages in a transaction before/after join then roll back - # Note: Message sent outside transaction, we're testing transactional acceptance. - t7 = DtxTestFixture(self, cluster[0], "t7") - t7.send(["a","b","c"]) - t7.start() - self.assertEqual(t7.accept().body, "a"); - t7.verify(sessions, ["b", "c"]) - - # Ended, suspended transactions across join. - t8 = DtxTestFixture(self, cluster[0], "t8") - t8.start(id="1") - t8.send(["x"]) - t8.end(id="1", suspend=True) - t8.start(id="2") - t8.send(["y"]) - t8.end(id="2") - t8.start() - t8.send("z") - - - # Start new cluster member - cluster.start() - sessions.append(cluster[1].connect().session()) - - # Commit t1 - t1.send(["3","4"]) - t1.verify(sessions, []) - t1.end() - t1.commit(one_phase=True) - t1.verify(sessions, ["1","2","3","4"]) - - # Rollback t2 - t2.send(["3","4"]) - t2.end() - t2.rollback() - t2.verify(sessions, []) - - # Commit t3 - t3.commit(one_phase=False) - t3.verify(sessions, ["1","2"]) - - # Rollback t4 - t4.rollback() - t4.verify(sessions, []) - - # Commit t5 - t5.send(["3","4"]) - t5.verify(sessions, []) - t5.end() - t5.commit(one_phase=True) - t5.verify(sessions, ["1","2","3","4"]) - - # Commit t6 - self.assertEqual(t6.accept().body, "b"); - t6.verify(sessions, ["c"]) - t6.end() - t6.commit(one_phase=True) - t6.session.close() # Make sure they're not requeued by the session. - t6.verify(sessions, ["c"]) - - # Rollback t7 - self.assertEqual(t7.accept().body, "b"); - t7.verify(sessions, ["c"]) - t7.end() - t7.rollback() - t7.verify(sessions, ["a", "b", "c"]) - - # Resume t8 - t8.end() - t8.commit(one_phase=True) - t8.start("1", resume=True) - t8.end("1") - t8.commit("1", one_phase=True) - t8.commit("2", one_phase=True) - t8.verify(sessions, ["z", "x","y"]) - - - def test_dtx_failover_rollback(self): - """Kill a broker during a transaction, verify we roll back correctly""" - cluster=self.cluster(1, expect=EXPECT_EXIT_FAIL) - cluster.start(expect=EXPECT_RUNNING) - - # Test unprepared at crash - t1 = DtxTestFixture(self, cluster[0], "t1") - t1.send(["a"]) # Not in transaction - t1.start() - t1.send(["b"]) # In transaction - - # Test prepared at crash - t2 = DtxTestFixture(self, cluster[0], "t2") - t2.send(["a"]) # Not in transaction - t2.start() - t2.send(["b"]) # In transaction - t2.end() - t2.prepare() - - # Crash the broker - cluster[0].kill() - - # Transactional changes should not appear - s = cluster[1].connect().session(); - self.assert_browse(s, "t1", ["a"]) - self.assert_browse(s, "t2", ["a"]) - - def test_dtx_timeout(self): - """Verify that dtx timeout works""" - cluster = self.cluster(1) - t1 = DtxTestFixture(self, cluster[0], "t1") - t1.start() - t1.set_timeout(1) - time.sleep(1.1) - try: - t1.end() - self.fail("Expected rollback timeout.") - except DtxStatusException, e: - self.assertEqual(e.actual, XA_RBTIMEOUT) - -class TxTests(BrokerTest): - - def test_tx_update(self): - """Verify that transaction state is updated to a new broker""" - - def make_message(session, body=None, key=None, id=None): - dp=session.delivery_properties(routing_key=key) - mp=session.message_properties(correlation_id=id) - return qpid.datatypes.Message(dp, mp, body) - - cluster=self.cluster(1) - # Use old API. TX is not supported in messaging API. - c = cluster[0].connect_old() - s = c.session("tx-session", 1) - s.queue_declare(queue="q") - # Start transaction - s.tx_select() - s.message_transfer(message=make_message(s, "1", "q")) - # Start new member mid-transaction - cluster.start() - # Do more work - s.message_transfer(message=make_message(s, "2", "q")) - # Commit the transaction and verify the results. - s.tx_commit() - for b in cluster: self.assert_browse(b.connect().session(), "q", ["1","2"]) - - -class LongTests(BrokerTest): - """Tests that can run for a long time if -DDURATION=<minutes> is set""" - def duration(self): - d = self.config.defines.get("DURATION") - if d: return float(d)*60 - else: return 3 # Default is to be quick - - def test_failover(self): - """Test fail-over during continuous send-receive with errors""" - - # Original cluster will all be killed so expect exit with failure - cluster = self.cluster(3, expect=EXPECT_EXIT_FAIL) - for b in cluster: b.ready() # Wait for brokers to be ready - for b in cluster: ErrorGenerator(b) - - # Start sender and receiver threads - cluster[0].declare_queue("test-queue") - sender = NumberedSender(cluster[0], max_depth=1000) - receiver = NumberedReceiver(cluster[0], sender=sender) - receiver.start() - sender.start() - # Wait for sender & receiver to get up and running - retry(lambda: receiver.received > 0) - - # Kill original brokers, start new ones for the duration. - endtime = time.time() + self.duration() - i = 0 - while time.time() < endtime: - sender.sender.assert_running() - receiver.receiver.assert_running() - cluster[i].kill() - i += 1 - b = cluster.start(expect=EXPECT_EXIT_FAIL) - for b in cluster[i:]: b.ready() - ErrorGenerator(b) - time.sleep(5) - sender.stop() - receiver.stop() - for i in range(i, len(cluster)): cluster[i].kill() - - def test_management(self, args=[]): - """ - Stress test: Run management clients and other clients concurrently - while killing and restarting brokers. - """ - - class ClientLoop(StoppableThread): - """Run a client executable in a loop.""" - def __init__(self, broker, cmd): - StoppableThread.__init__(self) - self.broker=broker - self.cmd = cmd # Client command. - self.lock = Lock() - self.process = None # Client process. - self.start() - - def run(self): - try: - while True: - self.lock.acquire() - try: - if self.stopped: break - self.process = self.broker.test.popen( - self.cmd, expect=EXPECT_UNKNOWN) - finally: - self.lock.release() - try: - exit = self.process.wait() - except OSError, e: - # Process may already have been killed by self.stop() - break - except Exception, e: - self.process.unexpected( - "client of %s: %s"%(self.broker.name, e)) - self.lock.acquire() - try: - if self.stopped: break - if exit != 0: - self.process.unexpected( - "client of %s exit code %s"%(self.broker.name, exit)) - finally: - self.lock.release() - except Exception, e: - self.error = RethrownException("Error in ClientLoop.run") - - def stop(self): - """Stop the running client and wait for it to exit""" - self.lock.acquire() - try: - if self.stopped: return - self.stopped = True - if self.process: - try: self.process.kill() # Kill the client. - except OSError: pass # The client might not be running. - finally: self.lock.release() - StoppableThread.stop(self) - - # body of test_management() - - args += ["--mgmt-pub-interval", 1] - args += ["--log-enable=trace+:management"] - # Use store if present. - if BrokerTest.store_lib: args +=["--load-module", BrokerTest.store_lib] - cluster = self.cluster(3, args, expect=EXPECT_EXIT_FAIL) # brokers will be killed - - clients = [] # Per-broker list of clients that only connect to one broker. - mclients = [] # Management clients that connect to every broker in the cluster. - - def start_clients(broker): - """Start ordinary clients for a broker.""" - cmds=[ - ["qpid-tool", "localhost:%s"%(broker.port())], - ["qpid-perftest", "--count=5000", "--durable=yes", - "--base-name", str(qpid.datatypes.uuid4()), "--port", broker.port()], - ["qpid-txtest", "--queue-base-name", "tx-%s"%str(qpid.datatypes.uuid4()), - "--port", broker.port()], - ["qpid-queue-stats", "-a", "localhost:%s" %(broker.port())] - ] - clients.append([ClientLoop(broker, cmd) for cmd in cmds]) - - def start_mclients(broker): - """Start management clients that make multiple connections.""" - cmd = ["qpid-cluster", "-C", "localhost:%s" %(broker.port())] - mclients.append(ClientLoop(broker, cmd)) - - endtime = time.time() + self.duration() - # For long duration, first run is a quarter of the duration. - runtime = min(5.0, self.duration() / 3.0) - alive = 0 # First live cluster member - for i in range(len(cluster)): start_clients(cluster[i]) - start_mclients(cluster[alive]) - - while time.time() < endtime: - time.sleep(runtime) - runtime = 5 # Remaining runs 5 seconds, frequent broker kills - for b in cluster[alive:]: b.ready() # Check if a broker crashed. - # Kill the first broker, expect the clients to fail. - b = cluster[alive] - b.ready() - b.kill() - # Stop the brokers clients and all the mclients. - for c in clients[alive] + mclients: - try: c.stop() - except: pass # Ignore expected errors due to broker shutdown. - clients[alive] = [] - mclients = [] - # Start another broker and clients - alive += 1 - cluster.start(expect=EXPECT_EXIT_FAIL) - cluster[-1].ready() # Wait till its ready - start_clients(cluster[-1]) - start_mclients(cluster[alive]) - for c in chain(mclients, *clients): - c.stop() - for b in cluster[alive:]: - b.ready() # Verify still alive - b.kill() - # Verify that logs are consistent - cluster_test_logs.verify_logs() - - def test_management_qmf2(self): - self.test_management(args=["--mgmt-qmf2=yes"]) - - def test_connect_consistent(self): - args=["--mgmt-pub-interval=1","--log-enable=trace+:management"] - cluster = self.cluster(2, args=args) - end = time.time() + self.duration() - while (time.time() < end): # Get a management interval - for i in xrange(1000): cluster[0].connect().close() - cluster_test_logs.verify_logs() - - def test_flowlimit_failover(self): - """Test fail-over during continuous send-receive with flow control - active. - """ - - # Original cluster will all be killed so expect exit with failure - cluster = self.cluster(3, expect=EXPECT_EXIT_FAIL) - for b in cluster: b.ready() # Wait for brokers to be ready - - # create a queue with rather draconian flow control settings - ssn0 = cluster[0].connect().session() - s0 = ssn0.sender("test-queue; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.flow_stop_count':2000, 'qpid.flow_resume_count':100}}}}") - - receiver = NumberedReceiver(cluster[0]) - receiver.start() - sender = NumberedSender(cluster[0]) - sender.start() - # Wait for senders & receiver to get up and running - retry(lambda: receiver.received > 10) - - # Kill original brokers, start new ones for the duration. - endtime = time.time() + self.duration(); - i = 0 - while time.time() < endtime: - sender.sender.assert_running() - receiver.receiver.assert_running() - for b in cluster[i:]: b.ready() # Check if any broker crashed. - cluster[i].kill() - i += 1 - b = cluster.start(expect=EXPECT_EXIT_FAIL) - time.sleep(5) - sender.stop() - receiver.stop() - for i in range(i, len(cluster)): cluster[i].kill() - - def test_ttl_failover(self): - """Test that messages with TTL don't cause problems in a cluster with failover""" - - class Client(StoppableThread): - - def __init__(self, broker): - StoppableThread.__init__(self) - self.connection = broker.connect(reconnect=True) - self.auto_fetch_reconnect_urls(self.connection) - self.session = self.connection.session() - - def auto_fetch_reconnect_urls(self, conn): - """Replacment for qpid.messaging.util version which is noisy""" - ssn = conn.session("auto-fetch-reconnect-urls") - rcv = ssn.receiver("amq.failover") - rcv.capacity = 10 - - def main(): - while True: - try: - msg = rcv.fetch() - qpid.messaging.util.set_reconnect_urls(conn, msg) - ssn.acknowledge(msg, sync=False) - except messaging.exceptions.LinkClosed: return - except messaging.exceptions.ConnectionError: return - - thread = Thread(name="auto-fetch-reconnect-urls", target=main) - thread.setDaemon(True) - thread.start() - - def stop(self): - StoppableThread.stop(self) - self.connection.detach() - - class Sender(Client): - def __init__(self, broker, address): - Client.__init__(self, broker) - self.sent = 0 # Number of messages _reliably_ sent. - self.sender = self.session.sender(address, capacity=1000) - - def send_counted(self, ttl): - self.sender.send(Message(str(self.sent), ttl=ttl)) - self.sent += 1 - - def run(self): - while not self.stopped: - choice = random.randint(0,4) - if choice == 0: self.send_counted(None) # No ttl - elif choice == 1: self.send_counted(100000) # Large ttl - else: # Small ttl, might expire - self.sender.send(Message("", ttl=random.random()/10)) - self.sender.send(Message("z"), sync=True) # Chaser. - - class Receiver(Client): - - def __init__(self, broker, address): - Client.__init__(self, broker) - self.received = 0 # Number of non-empty (reliable) messages received. - self.receiver = self.session.receiver(address, capacity=1000) - def run(self): - try: - while True: - m = self.receiver.fetch(1) - if m.content == "z": break - if m.content: # Ignore unreliable messages - # Ignore duplicates - if int(m.content) == self.received: self.received += 1 - except Exception,e: self.error = e - - # def test_ttl_failover - - # Original cluster will all be killed so expect exit with failure - # Set small purge interval. - cluster = self.cluster(3, expect=EXPECT_EXIT_FAIL, args=["--queue-purge-interval=1"]) - for b in cluster: b.ready() # Wait for brokers to be ready - - # Python client failover produces noisy WARN logs, disable temporarily - logger = logging.getLogger() - log_level = logger.getEffectiveLevel() - logger.setLevel(logging.ERROR) - sender = None - receiver = None - try: - # Start sender and receiver threads - receiver = Receiver(cluster[0], "q;{create:always}") - receiver.start() - sender = Sender(cluster[0], "q;{create:always}") - sender.start() - # Wait for sender & receiver to get up and running - retry(lambda: receiver.received > 0) - - # Kill brokers in a cycle. - endtime = time.time() + self.duration() - runtime = min(5.0, self.duration() / 4.0) - i = 0 - while time.time() < endtime: - for b in cluster[i:]: b.ready() # Check if any broker crashed. - cluster[i].kill() - i += 1 - b = cluster.start(expect=EXPECT_EXIT_FAIL) - b.ready() - time.sleep(runtime) - sender.stop() - receiver.stop() - for b in cluster[i:]: - b.ready() # Check it didn't crash - b.kill() - self.assertEqual(sender.sent, receiver.received) - cluster_test_logs.verify_logs() - - finally: - # Detach to avoid slow reconnect attempts during shut-down if test fails. - if sender: sender.connection.detach() - if receiver: receiver.connection.detach() - logger.setLevel(log_level) - - def test_msg_group_failover(self): - """Test fail-over during continuous send-receive of grouped messages. - """ - - class GroupedTrafficGenerator(Thread): - def __init__(self, url, queue, group_key): - Thread.__init__(self) - self.url = url - self.queue = queue - self.group_key = group_key - self.status = -1 - - def run(self): - # generate traffic for approx 10 seconds (2011msgs / 200 per-sec) - cmd = ["msg_group_test", - "--broker=%s" % self.url, - "--address=%s" % self.queue, - "--connection-options={%s}" % (Cluster.CONNECTION_OPTIONS), - "--group-key=%s" % self.group_key, - "--receivers=2", - "--senders=3", - "--messages=2011", - "--send-rate=200", - "--capacity=11", - "--ack-frequency=23", - "--allow-duplicates", - "--group-size=37", - "--randomize-group-size", - "--interleave=13"] - # "--trace"] - self.generator = Popen( cmd ); - self.status = self.generator.wait() - return self.status - - def results(self): - self.join(timeout=30) # 3x assumed duration - if self.isAlive(): return -1 - return self.status - - # Original cluster will all be killed so expect exit with failure - cluster = self.cluster(3, expect=EXPECT_EXIT_FAIL, args=["-t"]) - for b in cluster: b.ready() # Wait for brokers to be ready - - # create a queue with rather draconian flow control settings - ssn0 = cluster[0].connect().session() - q_args = "{'qpid.group_header_key':'group-id', 'qpid.shared_msg_group':1}" - s0 = ssn0.sender("test-group-q; {create:always, node:{type:queue, x-declare:{arguments:%s}}}" % q_args) - - # Kill original brokers, start new ones for the duration. - endtime = time.time() + self.duration(); - i = 0 - while time.time() < endtime: - traffic = GroupedTrafficGenerator( cluster[i].host_port(), - "test-group-q", "group-id" ) - traffic.start() - time.sleep(1) - - for x in range(2): - for b in cluster[i:]: b.ready() # Check if any broker crashed. - cluster[i].kill() - i += 1 - b = cluster.start(expect=EXPECT_EXIT_FAIL) - time.sleep(1) - - # wait for traffic to finish, verify success - self.assertEqual(0, traffic.results()) - - for i in range(i, len(cluster)): cluster[i].kill() - - -class StoreTests(BrokerTest): - """ - Cluster tests that can only be run if there is a store available. - """ - def args(self): - assert BrokerTest.store_lib - return ["--load-module", BrokerTest.store_lib] - - def test_store_loaded(self): - """Ensure we are indeed loading a working store""" - broker = self.broker(self.args(), name="recoverme", expect=EXPECT_EXIT_FAIL) - m = Message("x", durable=True) - broker.send_message("q", m) - broker.kill() - broker = self.broker(self.args(), name="recoverme") - self.assertEqual("x", broker.get_message("q").content) - - def test_kill_restart(self): - """Verify we can kill/resetart a broker with store in a cluster""" - cluster = self.cluster(1, self.args()) - cluster.start("restartme", expect=EXPECT_EXIT_FAIL).kill() - - # Send a message, retrieve from the restarted broker - cluster[0].send_message("q", "x") - m = cluster.start("restartme").get_message("q") - self.assertEqual("x", m.content) - - def stop_cluster(self,broker): - """Clean shut-down of a cluster""" - self.assertEqual(0, qpid_cluster.main( - ["-kf", broker.host_port()])) - - def test_persistent_restart(self): - """Verify persistent cluster shutdown/restart scenarios""" - cluster = self.cluster(0, args=self.args() + ["--cluster-size=3"]) - a = cluster.start("a", expect=EXPECT_EXIT_OK, wait=False) - b = cluster.start("b", expect=EXPECT_EXIT_OK, wait=False) - c = cluster.start("c", expect=EXPECT_EXIT_FAIL, wait=True) - a.send_message("q", Message("1", durable=True)) - # Kill & restart one member. - c.kill() - self.assertEqual(a.get_message("q").content, "1") - a.send_message("q", Message("2", durable=True)) - c = cluster.start("c", expect=EXPECT_EXIT_OK) - self.assertEqual(c.get_message("q").content, "2") - # Shut down the entire cluster cleanly and bring it back up - a.send_message("q", Message("3", durable=True)) - self.stop_cluster(a) - a = cluster.start("a", wait=False) - b = cluster.start("b", wait=False) - c = cluster.start("c", wait=True) - self.assertEqual(a.get_message("q").content, "3") - - def test_persistent_partial_failure(self): - # Kill 2 members, shut down the last cleanly then restart - # Ensure we use the clean database - cluster = self.cluster(0, args=self.args() + ["--cluster-size=3"]) - a = cluster.start("a", expect=EXPECT_EXIT_FAIL, wait=False) - b = cluster.start("b", expect=EXPECT_EXIT_FAIL, wait=False) - c = cluster.start("c", expect=EXPECT_EXIT_OK, wait=True) - a.send_message("q", Message("4", durable=True)) - a.kill() - b.kill() - self.assertEqual(c.get_message("q").content, "4") - c.send_message("q", Message("clean", durable=True)) - self.stop_cluster(c) - a = cluster.start("a", wait=False) - b = cluster.start("b", wait=False) - c = cluster.start("c", wait=True) - self.assertEqual(a.get_message("q").content, "clean") - - def test_wrong_cluster_id(self): - # Start a cluster1 broker, then try to restart in cluster2 - cluster1 = self.cluster(0, args=self.args()) - a = cluster1.start("a", expect=EXPECT_EXIT_OK) - a.terminate() - cluster2 = self.cluster(1, args=self.args()) - try: - a = cluster2.start("a", expect=EXPECT_EXIT_FAIL) - a.ready() - self.fail("Expected exception") - except: pass - - def test_wrong_shutdown_id(self): - # Start 2 members and shut down. - cluster = self.cluster(0, args=self.args()+["--cluster-size=2"]) - a = cluster.start("a", expect=EXPECT_EXIT_OK, wait=False) - b = cluster.start("b", expect=EXPECT_EXIT_OK, wait=False) - self.stop_cluster(a) - self.assertEqual(a.wait(), 0) - self.assertEqual(b.wait(), 0) - - # Restart with a different member and shut down. - a = cluster.start("a", expect=EXPECT_EXIT_OK, wait=False) - c = cluster.start("c", expect=EXPECT_EXIT_OK, wait=False) - self.stop_cluster(a) - self.assertEqual(a.wait(), 0) - self.assertEqual(c.wait(), 0) - # Mix members from both shutdown events, they should fail - # TODO aconway 2010-03-11: can't predict the exit status of these - # as it depends on the order of delivery of initial-status messages. - # See comment at top of this file. - a = cluster.start("a", expect=EXPECT_UNKNOWN, wait=False) - b = cluster.start("b", expect=EXPECT_UNKNOWN, wait=False) - self.assertRaises(Exception, lambda: a.ready()) - self.assertRaises(Exception, lambda: b.ready()) - - def test_solo_store_clean(self): - # A single node cluster should always leave a clean store. - cluster = self.cluster(0, self.args()) - a = cluster.start("a", expect=EXPECT_EXIT_FAIL) - a.send_message("q", Message("x", durable=True)) - a.kill() - a = cluster.start("a") - self.assertEqual(a.get_message("q").content, "x") - - def test_last_store_clean(self): - # Verify that only the last node in a cluster to shut down has - # a clean store. Start with cluster of 3, reduce to 1 then - # increase again to ensure that a node that was once alone but - # finally did not finish as the last node does not get a clean - # store. - cluster = self.cluster(0, self.args()) - a = cluster.start("a", expect=EXPECT_EXIT_FAIL) - self.assertEqual(a.store_state(), "clean") - b = cluster.start("b", expect=EXPECT_EXIT_FAIL) - c = cluster.start("c", expect=EXPECT_EXIT_FAIL) - self.assertEqual(b.store_state(), "dirty") - self.assertEqual(c.store_state(), "dirty") - retry(lambda: a.store_state() == "dirty") - - a.send_message("q", Message("x", durable=True)) - a.kill() - b.kill() # c is last man, will mark store clean - retry(lambda: c.store_state() == "clean") - a = cluster.start("a", expect=EXPECT_EXIT_FAIL) # c no longer last man - retry(lambda: c.store_state() == "dirty") - c.kill() # a is now last man - retry(lambda: a.store_state() == "clean") - a.kill() - self.assertEqual(a.store_state(), "clean") - self.assertEqual(b.store_state(), "dirty") - self.assertEqual(c.store_state(), "dirty") - - def test_restart_clean(self): - """Verify that we can re-start brokers one by one in a - persistent cluster after a clean oshutdown""" - cluster = self.cluster(0, self.args()) - a = cluster.start("a", expect=EXPECT_EXIT_OK) - b = cluster.start("b", expect=EXPECT_EXIT_OK) - c = cluster.start("c", expect=EXPECT_EXIT_OK) - a.send_message("q", Message("x", durable=True)) - self.stop_cluster(a) - a = cluster.start("a") - b = cluster.start("b") - c = cluster.start("c") - self.assertEqual(c.get_message("q").content, "x") - - def test_join_sub_size(self): - """Verify that after starting a cluster with cluster-size=N, - we can join new members even if size < N-1""" - cluster = self.cluster(0, self.args()+["--cluster-size=3"]) - a = cluster.start("a", wait=False, expect=EXPECT_EXIT_FAIL) - b = cluster.start("b", wait=False, expect=EXPECT_EXIT_FAIL) - c = cluster.start("c") - a.send_message("q", Message("x", durable=True)) - a.send_message("q", Message("y", durable=True)) - a.kill() - b.kill() - a = cluster.start("a") - self.assertEqual(c.get_message("q").content, "x") - b = cluster.start("b") - self.assertEqual(c.get_message("q").content, "y") diff --git a/qpid/cpp/src/tests/federated_cluster_test b/qpid/cpp/src/tests/federated_cluster_test deleted file mode 100755 index f42b7501b8..0000000000 --- a/qpid/cpp/src/tests/federated_cluster_test +++ /dev/null @@ -1,153 +0,0 @@ -#!/bin/bash - -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# Test reliability of the replication feature in the face of link -# failures: -srcdir=`dirname $0` -source ./test_env.sh - -trap stop_brokers EXIT - -fail() { - echo $1 - exit 1 -} - -stop_brokers() { - if [[ $BROKER_A ]] ; then - ../qpidd --no-module-dir -q --port $BROKER_A - unset BROKER_A - fi - if [[ $NODE_1 ]] ; then - ../qpidd --no-module-dir -q --port $NODE_1 - unset NODE_1 - fi - if [[ $NODE_2 ]] ; then - ../qpidd --no-module-dir -q --port $NODE_2 - unset NODE_2 - fi - if [ -f cluster.ports ]; then - rm cluster.ports - fi -} - -start_brokers() { - #start single node... - BROKER_A=`../qpidd --daemon --port 0 --no-data-dir --no-module-dir --auth no --log-enable info+` || fail "BROKER_A failed to start" - - #...and start cluster - $srcdir/start_cluster 2 || fail "Could not start cluster" - NODE_1=$(head -1 cluster.ports) - NODE_2=$(tail -1 cluster.ports) - test -n "$NODE_1" || fail "NODE_1 failed to start" - test -n "$NODE_2" || fail "NODE_2 failed to start" -} - -setup() { - #create exchange on both cluster and single broker - $PYTHON_COMMANDS/qpid-config -b "localhost:$BROKER_A" add exchange direct test-exchange - $PYTHON_COMMANDS/qpid-config -b "localhost:$NODE_1" add exchange direct test-exchange - - #create dynamic routes for test exchange - $PYTHON_COMMANDS/qpid-route dynamic add "localhost:$NODE_2" "localhost:$BROKER_A" test-exchange - $PYTHON_COMMANDS/qpid-route dynamic add "localhost:$BROKER_A" "localhost:$NODE_2" test-exchange - - #create test queue on cluster and bind it to the test exchange - $PYTHON_COMMANDS/qpid-config -b "localhost:$NODE_1" add queue test-queue - $PYTHON_COMMANDS/qpid-config -b "localhost:$NODE_1" bind test-exchange test-queue to-cluster - - #create test queue on single broker and bind it to the test exchange - $PYTHON_COMMANDS/qpid-config -b "localhost:$BROKER_A" add queue test-queue - $PYTHON_COMMANDS/qpid-config -b "localhost:$BROKER_A" bind test-exchange test-queue from-cluster -} - -run_test_pull_to_cluster_two_consumers() { - #start consumers on each of the two nodes of the cluster - ./receiver --port $NODE_1 --queue test-queue --credit-window 1 > fed1.out.tmp & - ./receiver --port $NODE_2 --queue test-queue --credit-window 1 > fed2.out.tmp & - - #send stream of messages to test exchange on single broker - for i in `seq 1 1000`; do echo Message $i >> fed.in.tmp; done - ./sender --port $BROKER_A --exchange test-exchange --routing-key to-cluster --send-eos 2 < fed.in.tmp - - #combine output of the two consumers, sort it and compare with the expected stream - wait - sort -g -k 2 fed1.out.tmp fed2.out.tmp > fed.out.tmp - diff fed.in.tmp fed.out.tmp || fail "federated link to cluster failed: expectations not met!" - - rm -f fed*.tmp #cleanup -} - -run_test_pull_to_cluster() { - #send stream of messages to test exchange on single broker - for i in `seq 1 1000`; do echo Message $i >> fed.in.tmp; done - ./sender --port $BROKER_A --exchange test-exchange --routing-key to-cluster --send-eos 1 < fed.in.tmp - - #consume from remaining node of the cluster - ./receiver --port $NODE_2 --queue test-queue > fed.out.tmp - - #verify all messages are received - diff fed.in.tmp fed.out.tmp || fail "federated link to cluster failed: expectations not met!" - - rm -f fed*.tmp #cleanup -} - -run_test_pull_from_cluster() { - #start consumer on single broker - ./receiver --port $BROKER_A --queue test-queue --credit-window 1 > fed.out.tmp & - - #send stream of messages to test exchange on cluster - for i in `seq 1 1000`; do echo Message $i >> fed.in.tmp; done - ./sender --port $NODE_2 --exchange test-exchange --routing-key from-cluster --send-eos 1 < fed.in.tmp - - #verify all messages are received - wait - diff fed.in.tmp fed.out.tmp || fail "federated link from cluster failed: expectations not met!" - - rm -f fed*.tmp #cleanup -} - - -if test -d ${PYTHON_DIR}; then - . cpg_check.sh - cpg_enabled || exit 0 - - rm -f fed*.tmp #cleanup any files left from previous run - start_brokers - echo "brokers started" - setup - echo "setup completed" - run_test_pull_to_cluster_two_consumers - echo "federated link to cluster verified" - run_test_pull_from_cluster - echo "federated link from cluster verified" - if [[ $TEST_NODE_FAILURE ]] ; then - #kill first cluster node and retest - kill -9 $(../qpidd --check --port $NODE_1) && unset NODE_1 - echo "killed first cluster node; waiting for links to re-establish themselves..." - sleep 5 - echo "retesting..." - run_test_pull_to_cluster - echo "federated link to cluster verified" - run_test_pull_from_cluster - echo "federated link from cluster verified" - fi -fi diff --git a/qpid/cpp/src/tests/federated_cluster_test_with_node_failure b/qpid/cpp/src/tests/federated_cluster_test_with_node_failure deleted file mode 100755 index e9ae4b5914..0000000000 --- a/qpid/cpp/src/tests/federated_cluster_test_with_node_failure +++ /dev/null @@ -1,23 +0,0 @@ -#!/bin/bash - -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -srcdir=`dirname $0` -TEST_NODE_FAILURE=1 $srcdir/federated_cluster_test diff --git a/qpid/cpp/src/tests/run_cluster_authentication_soak b/qpid/cpp/src/tests/run_cluster_authentication_soak deleted file mode 100755 index 24befa28ba..0000000000 --- a/qpid/cpp/src/tests/run_cluster_authentication_soak +++ /dev/null @@ -1,27 +0,0 @@ -#! /bin/bash -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - - -source ./test_env.sh -source sasl_test_setup.sh -source cpg_check.sh -cpg_enabled || exit 0 - -with_ais_group ./cluster_authentication_soak 500 - diff --git a/qpid/cpp/src/tests/run_cluster_authentication_test b/qpid/cpp/src/tests/run_cluster_authentication_test deleted file mode 100755 index 844807a857..0000000000 --- a/qpid/cpp/src/tests/run_cluster_authentication_test +++ /dev/null @@ -1,27 +0,0 @@ -#! /bin/bash -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - - -source ./test_env.sh -source sasl_test_setup.sh -source cpg_check.sh -cpg_enabled || exit 0 - -with_ais_group ./cluster_authentication_soak - diff --git a/qpid/cpp/src/tests/run_cluster_test b/qpid/cpp/src/tests/run_cluster_test deleted file mode 100755 index 11df3d63a3..0000000000 --- a/qpid/cpp/src/tests/run_cluster_test +++ /dev/null @@ -1,27 +0,0 @@ -#!/bin/bash - -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - - -# Run the tests -srcdir=`dirname $0` -source cpg_check.sh -cpg_enabled || exit 0 -with_ais_group $srcdir/run_test ./cluster_test diff --git a/qpid/cpp/src/tests/run_cluster_tests b/qpid/cpp/src/tests/run_cluster_tests deleted file mode 100755 index a5cea5ff6e..0000000000 --- a/qpid/cpp/src/tests/run_cluster_tests +++ /dev/null @@ -1,39 +0,0 @@ -#!/bin/bash - -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -source ./test_env.sh -source cpg_check.sh -cpg_enabled || exit 0 - - -test -x $QPID_PYTHON_TEST || { echo Skipping test, $QPID_PYTHON_TEST not found; exit 0; } - -# Delete old cluster test data -OUTDIR=${OUTDIR:-brokertest.tmp} -rm -rf $OUTDIR -mkdir -p $OUTDIR - -# Ignore tests requiring a store by default. -CLUSTER_TESTS_IGNORE=${CLUSTER_TESTS_IGNORE:--i cluster_tests.StoreTests.* -I $srcdir/cluster_tests.fail} -CLUSTER_TESTS=${CLUSTER_TESTS:-$*} - -with_ais_group $QPID_PYTHON_TEST -DOUTDIR=$OUTDIR -m cluster_tests $CLUSTER_TESTS_IGNORE $CLUSTER_TESTS || exit 1 -rm -rf $OUTDIR |