diff options
author | Alan Conway <aconway@apache.org> | 2008-11-19 20:22:38 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-11-19 20:22:38 +0000 |
commit | 66442ebf857fb8a6981ee4b0c45eb2f74143c7d0 (patch) | |
tree | 6faae8c1d841df8b7d5f15a68654f705fbe84bcc /cpp | |
parent | 715dcd61baf8d367eeb31c083bf6a9650e829c4c (diff) | |
download | qpid-python-66442ebf857fb8a6981ee4b0c45eb2f74143c7d0.tar.gz |
tests/failover_soak: run a cluster with clients, kill and restart cluster members, verify client fail-over.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@719055 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/tests/ForkedBroker.h | 20 | ||||
-rw-r--r-- | cpp/src/tests/Makefile.am | 20 | ||||
-rw-r--r-- | cpp/src/tests/declare_queues.cpp | 69 | ||||
-rw-r--r-- | cpp/src/tests/failover_soak.cpp | 651 | ||||
-rw-r--r-- | cpp/src/tests/replaying_sender.cpp | 131 | ||||
-rw-r--r-- | cpp/src/tests/resuming_receiver.cpp | 163 | ||||
-rwxr-xr-x | cpp/src/tests/run_failover_soak | 13 |
7 files changed, 1058 insertions, 9 deletions
diff --git a/cpp/src/tests/ForkedBroker.h b/cpp/src/tests/ForkedBroker.h index c0d72a6235..98235b9690 100644 --- a/cpp/src/tests/ForkedBroker.h +++ b/cpp/src/tests/ForkedBroker.h @@ -59,17 +59,23 @@ class ForkedBroker { } void kill(int sig=SIGINT) { - using qpid::ErrnoException; if (pid == 0) return; - if (::kill(pid, sig) < 0) throw ErrnoException("kill failed"); + int savePid = pid; + pid = 0; // Always reset pid, even in case of an exception below. + ::close(pipeFds[1]); + + using qpid::ErrnoException; + if (::kill(savePid, sig) < 0) + throw ErrnoException("kill failed"); int status; - if (::waitpid(pid, &status, 0) < 0) throw ErrnoException("wait for forked process failed"); - if (WEXITSTATUS(status) != 0) - throw qpid::Exception(QPID_MSG("forked broker exited with: " << WEXITSTATUS(status))); - pid = 0; + if (::waitpid(savePid, &status, 0) < 0) + throw ErrnoException("wait for forked process failed"); + if (WEXITSTATUS(status) != 0) + throw qpid::Exception(QPID_MSG("Forked broker exited with: " << WEXITSTATUS(status))); } uint16_t getPort() { return port; } + pid_t getPID() { return pid; } private: @@ -77,7 +83,6 @@ class ForkedBroker { using qpid::ErrnoException; pid = 0; port = 0; - int pipeFds[2]; if(::pipe(pipeFds) < 0) throw ErrnoException("Can't create pipe"); pid = ::fork(); if (pid < 0) throw ErrnoException("Fork failed"); @@ -104,6 +109,7 @@ class ForkedBroker { } } + int pipeFds[2]; pid_t pid; int port; }; diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am index 18044d73a1..78aadf0da0 100644 --- a/cpp/src/tests/Makefile.am +++ b/cpp/src/tests/Makefile.am @@ -134,6 +134,22 @@ check_PROGRAMS+=header_test header_test_SOURCES=header_test.cpp TestOptions.h ConnectionOptions.h header_test_LDADD=$(lib_client) +check_PROGRAMS+=failover_soak +failover_soak_SOURCES=failover_soak.cpp ForkedBroker.h +failover_soak_LDADD=$(lib_client) + +check_PROGRAMS+=declare_queues +declare_queues_SOURCES=declare_queues.cpp +declare_queues_LDADD=$(lib_client) + +check_PROGRAMS+=replaying_sender +replaying_sender_SOURCES=replaying_sender.cpp +replaying_sender_LDADD=$(lib_client) + +check_PROGRAMS+=resuming_receiver +resuming_receiver_SOURCES=resuming_receiver.cpp +resuming_receiver_LDADD=$(lib_client) + check_PROGRAMS+=txshift txshift_SOURCES=txshift.cpp TestOptions.h ConnectionOptions.h txshift_LDADD=$(lib_client) @@ -153,7 +169,7 @@ sender_LDADD=$(lib_client) TESTS_ENVIRONMENT = VALGRIND=$(VALGRIND) srcdir=$(srcdir) QPID_DATA_DIR= BOOST_TEST_SHOW_PROGRESS=yes $(srcdir)/run_test system_tests = client_test quick_perftest quick_topictest run_header_test quick_txtest -TESTS += start_broker $(system_tests) python_tests stop_broker run_federation_tests run_acl_tests +TESTS += start_broker $(system_tests) python_tests stop_broker run_federation_tests run_acl_tests EXTRA_DIST += \ run_test vg_check \ @@ -201,7 +217,7 @@ CLEANFILES+=valgrind.out *.log *.vglog* dummy_test $(unit_wrappers) # Longer running stability tests, not run by default check: target. # Not run under valgrind, too slow -LONG_TESTS=fanout_perftest shared_perftest multiq_perftest topic_perftest +LONG_TESTS=fanout_perftest shared_perftest multiq_perftest topic_perftest run_failover_soak EXTRA_DIST+=$(LONG_TESTS) run_perftest check-long: $(MAKE) check TESTS="start_broker $(LONG_TESTS) stop_broker" VALGRIND= diff --git a/cpp/src/tests/declare_queues.cpp b/cpp/src/tests/declare_queues.cpp new file mode 100644 index 0000000000..7f61bde12a --- /dev/null +++ b/cpp/src/tests/declare_queues.cpp @@ -0,0 +1,69 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <qpid/client/FailoverManager.h> +#include <qpid/client/Session.h> +#include <qpid/Exception.h> + +#include <cstdlib> +#include <iostream> + +using namespace qpid::client; + +using namespace std; + +int main(int argc, char ** argv) +{ + ConnectionSettings settings; + if ( argc != 3 ) + { + cerr << "Usage: declare_queues host port\n"; + return 1; + } + + settings.host = argv[1]; + settings.port = atoi(argv[2]); + + FailoverManager connection(settings); + try { + bool complete = false; + while (!complete) { + Session session = connection.connect().newSession(); + try { + session.queueDeclare(arg::queue="message_queue"); + complete = true; + } catch (const qpid::TransportFailure&) {} + } + connection.close(); + return 0; + } catch (const exception& error) { + cerr << "declare_queues failed:" << error.what() << endl; + cerr << " host: " << settings.host + << " port: " << settings.port << endl; + return 1; + } + +} + + + + + diff --git a/cpp/src/tests/failover_soak.cpp b/cpp/src/tests/failover_soak.cpp new file mode 100644 index 0000000000..6971bad8f9 --- /dev/null +++ b/cpp/src/tests/failover_soak.cpp @@ -0,0 +1,651 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +#include <stdio.h> +#include <stdlib.h> +#include <unistd.h> +#include <sys/wait.h> +#include <sys/time.h> +#include <string.h> + +#include <string> +#include <iostream> +#include <sstream> +#include <vector> + +#include <ForkedBroker.h> + + + + +using namespace std; + + +typedef vector<ForkedBroker *> brokerVector; + +typedef enum +{ + NO_STATUS, + RUNNING, + COMPLETED +} +childStatus; + + + +struct child +{ + child ( string & name, pid_t pid ) + : name(name), pid(pid), retval(-999), status(RUNNING) + { + gettimeofday ( & startTime, 0 ); + } + + + void + done ( int _retval ) + { + retval = _retval; + status = COMPLETED; + gettimeofday ( & stopTime, 0 ); + } + + + string name; + pid_t pid; + int retval; + childStatus status; + struct timeval startTime, + stopTime; +}; + + + + +struct children : public vector<child *> +{ + void + add ( string & name, pid_t pid ) + { + push_back(new child ( name, pid )); + } + + + child * + get ( pid_t pid ) + { + vector<child *>::iterator i; + for ( i = begin(); i != end(); ++ i ) + if ( pid == (*i)->pid ) + return *i; + + return 0; + } + + + void + exited ( pid_t pid, int retval ) + { + child * kid = get ( pid ); + if(! kid) + { + if ( verbosity > 0 ) + { + cerr << "children::exited warning: Can't find child with pid " + << pid + << endl; + } + return; + } + + kid->done ( retval ); + } + + + int + unfinished ( ) + { + int count = 0; + + vector<child *>::iterator i; + for ( i = begin(); i != end(); ++ i ) + if ( COMPLETED != (*i)->status ) + ++ count; + + return count; + } + + + int + checkChildren ( ) + { + vector<child *>::iterator i; + for ( i = begin(); i != end(); ++ i ) + if ( (COMPLETED == (*i)->status) && (0 != (*i)->retval) ) + return (*i)->retval; + + return 0; + } + + + void + killEverybody ( ) + { + vector<child *>::iterator i; + for ( i = begin(); i != end(); ++ i ) + kill ( (*i)->pid, 9 ); + } + + + + void + print ( ) + { + cout << "--- status of all children --------------\n"; + vector<child *>::iterator i; + for ( i = begin(); i != end(); ++ i ) + cout << "child: " << (*i)->name + << " status: " << (*i)->status + << endl; + cout << "\n\n\n\n"; + } + + + /* + Only call this if you already know there is at least + one child still running. Supply a time in seconds. + If it has been at least that long since a shild stopped + running, we judge the system to have hung. + */ + bool + hanging ( int hangTime ) + { + struct timeval now, + duration; + gettimeofday ( &now, 0 ); + + vector<child *>::iterator i; + for ( i = begin(); i != end(); ++ i ) + { + timersub ( & now, &((*i)->startTime), & duration ); + if ( duration.tv_sec >= hangTime ) + return true; + } + + return false; + } + + + int verbosity; +}; + + + +children allMyChildren; + + + + +void +childExit ( int signalNumber ) +{ + signalNumber ++; // Now maybe the compiler willleave me alone? + int childReturnCode; + pid_t pid = waitpid ( 0, & childReturnCode, WNOHANG); + + if ( pid > 0 ) + allMyChildren.exited ( pid, childReturnCode ); +} + + + +int +mrand ( int maxDesiredVal ) { + double zeroToOne = (double) rand() / (double) RAND_MAX; + return (int) (zeroToOne * (double) maxDesiredVal); +} + + + +int +mrand ( int minDesiredVal, int maxDesiredVal ) { + int interval = maxDesiredVal - minDesiredVal; + return minDesiredVal + mrand ( interval ); +} + + + +void +makeClusterName ( string & s, int & num ) { + num = mrand(1000); + stringstream ss; + ss << "soakTestCluster_" << num; + s = ss.str(); +} + + + + + +void +printBrokers ( brokerVector & brokers ) +{ + cout << "Broker List ------------ size: " << brokers.size() << "\n"; + for ( brokerVector::iterator i = brokers.begin(); i != brokers.end(); ++ i) { + cout << "pid: " + << (*i)->getPID() + << " port: " + << (*i)->getPort() + << endl; + } + cout << "end Broker List ------------\n"; +} + + + + + +void +startNewBroker ( brokerVector & brokers, + char const * srcRoot, + char const * moduleDir, + string const clusterName ) +{ + stringstream path; + path << srcRoot << "/qpidd"; + + const char * const argv[] = + { + "qpidd", + "-p0", + "--module-dir", + moduleDir, + "--load-module=cluster.so", + "--cluster-name", + clusterName.c_str(), + "--auth=no", + "--no-data-dir", + "--mgmt-enable=no", + 0 + }; + + size_t argc = sizeof(argv)/sizeof(argv[0]); + brokers.push_back ( new ForkedBroker ( argc, argv ) ); +} + + + + + +void +killFrontBroker ( brokerVector & brokers, int verbosity ) +{ + if ( verbosity > 0 ) + cout << "killFrontBroker pid: " << brokers[0]->getPID() << " on port " << brokers[0]->getPort() << endl; + try { brokers[0]->kill(9); } + catch ( const exception& error ) { + if ( verbosity > 0 ) + cout << "error killing broker: " << error.what() << endl; + } + delete brokers[0]; + brokers.erase ( brokers.begin() ); +} + + + + + +void +killAllBrokers ( brokerVector & brokers ) +{ + for ( uint i = 0; i < brokers.size(); ++ i ) + try { brokers[i]->kill(9); } + catch ( ... ) { } +} + + + + + +pid_t +runDeclareQueuesClient ( brokerVector brokers, + char const * host, + char const * path, + int verbosity + ) +{ + string name("declareQueues"); + int port = brokers[0]->getPort ( ); + + if ( verbosity > 0 ) + cout << "startDeclareQueuesClient: host: " + << host + << " port: " + << port + << endl; + stringstream portSs; + portSs << port; + + vector<const char*> argv; + argv.push_back ( "declareQueues" ); + argv.push_back ( host ); + argv.push_back ( portSs.str().c_str() ); + argv.push_back ( 0 ); + pid_t pid = fork(); + + if ( ! pid ) { + execv ( path, const_cast<char * const *>(&argv[0]) ); + perror ( "error executing dq: " ); + return 0; + } + + allMyChildren.add ( name, pid ); + return pid; +} + + + + + +pid_t +startReceivingClient ( brokerVector brokers, + char const * host, + char const * receiverPath, + char const * reportFrequency, + int verbosity + ) +{ + string name("receiver"); + int port = brokers[0]->getPort ( ); + + if ( verbosity > 0 ) + cout << "startReceivingClient: port " << port << endl; + char portStr[100]; + char verbosityStr[100]; + sprintf(portStr, "%d", port); + sprintf(verbosityStr, "%d", verbosity); + + + vector<const char*> argv; + argv.push_back ( "resumingReceiver" ); + argv.push_back ( host ); + argv.push_back ( portStr ); + argv.push_back ( reportFrequency ); + argv.push_back ( verbosityStr ); + argv.push_back ( 0 ); + + pid_t pid = fork(); + + if ( ! pid ) { + execv ( receiverPath, const_cast<char * const *>(&argv[0]) ); + perror ( "error executing receiver: " ); + return 0; + } + + allMyChildren.add ( name, pid ); + return pid; +} + + + + + +pid_t +startSendingClient ( brokerVector brokers, + char const * host, + char const * senderPath, + char const * nMessages, + char const * reportFrequency, + int verbosity + ) +{ + string name("sender"); + int port = brokers[0]->getPort ( ); + + if ( verbosity ) + cout << "startSenderClient: port " << port << endl; + char portStr[100]; + char verbosityStr[100]; + + sprintf ( portStr, "%d", port); + sprintf ( verbosityStr, "%d", verbosity); + + vector<const char*> argv; + argv.push_back ( "replayingSender" ); + argv.push_back ( host ); + argv.push_back ( portStr ); + argv.push_back ( nMessages ); + argv.push_back ( reportFrequency ); + argv.push_back ( verbosityStr ); + argv.push_back ( 0 ); + + pid_t pid = fork(); + + if ( ! pid ) { + execv ( senderPath, const_cast<char * const *>(&argv[0]) ); + perror ( "error executing sender: " ); + return 0; + } + + allMyChildren.add ( name, pid ); + return pid; +} + + + +#define HUNKY_DORY 0 +#define BAD_ARGS 1 +#define CANT_FORK_DQ 2 +#define CANT_FORK_RECEIVER 3 +#define DQ_FAILED 4 +#define ERROR_ON_CHILD 5 +#define HANGING 6 + + +int +main ( int argc, char const ** argv ) +{ + if ( argc < 9 ) { + cerr << "Usage: failoverSoak srcRoot moduleDir host senderPath receiverPath nMessages verbosity\n"; + cerr << " ( argc was " << argc << " )\n"; + return BAD_ARGS; + } + + signal ( SIGCHLD, childExit ); + + char const * srcRoot = argv[1]; + char const * moduleDir = argv[2]; + char const * host = argv[3]; + char const * declareQueuesPath = argv[4]; + char const * senderPath = argv[5]; + char const * receiverPath = argv[6]; + char const * nMessages = argv[7]; + char const * reportFrequency = argv[8]; + int verbosity = atoi(argv[9]); + + int maxBrokers = 50; + + allMyChildren.verbosity = verbosity; + + int clusterNum; + string clusterName; + + srand ( getpid() ); + + makeClusterName ( clusterName, clusterNum ); + + brokerVector brokers; + + if ( verbosity > 0 ) + cout << "Starting initial cluster...\n"; + + int nBrokers = 3; + for ( int i = 0; i < nBrokers; ++ i ) { + startNewBroker ( brokers, + srcRoot, + moduleDir, + clusterName ); + } + + + if ( verbosity > 0 ) + printBrokers ( brokers ); + + // Run the declareQueues child. + int childStatus; + pid_t dqClientPid = + runDeclareQueuesClient ( brokers, host, declareQueuesPath, verbosity ); + if ( -1 == dqClientPid ) { + cerr << "failoverSoak error: Couldn't fork declareQueues.\n"; + return CANT_FORK_DQ; + } + + // Don't continue until declareQueues is finished. + pid_t retval = waitpid ( dqClientPid, & childStatus, 0); + if ( retval != dqClientPid) { + cerr << "failoverSoak error: waitpid on declareQueues returned value " << retval << endl; + return DQ_FAILED; + } + allMyChildren.exited ( dqClientPid, childStatus ); + + + + // Start the receiving client. + pid_t receivingClientPid = + startReceivingClient ( brokers, + host, + receiverPath, + reportFrequency, + verbosity ); + if ( -1 == receivingClientPid ) { + cerr << "failoverSoak error: Couldn't fork receiver.\n"; + return CANT_FORK_RECEIVER; + } + + + // Start the sending client. + pid_t sendingClientPid = + startSendingClient ( brokers, + host, + senderPath, + nMessages, + reportFrequency, + verbosity ); + if ( -1 == sendingClientPid ) { + cerr << "failoverSoak error: Couldn't fork sender.\n"; + return CANT_FORK_RECEIVER; + } + + + int minSleep = 3, + maxSleep = 6; + + + for ( int totalBrokers = 3; + totalBrokers < maxBrokers; + ++ totalBrokers + ) + { + if ( verbosity > 0 ) + cout << totalBrokers << " brokers have been added to the cluster.\n\n\n"; + + // Sleep for a while. ------------------------- + int sleepyTime = mrand ( minSleep, maxSleep ); + if ( verbosity > 0 ) + cout << "Sleeping for " << sleepyTime << " seconds.\n"; + sleep ( sleepyTime ); + + // Kill the oldest broker. -------------------------- + killFrontBroker ( brokers, verbosity ); + + // Sleep for a while. ------------------------- + sleepyTime = mrand ( minSleep, maxSleep ); + if ( verbosity > 0 ) + cerr << "Sleeping for " << sleepyTime << " seconds.\n"; + sleep ( sleepyTime ); + + // Start a new broker. -------------------------- + if ( verbosity > 0 ) + cout << "Starting new broker.\n\n"; + + startNewBroker ( brokers, + srcRoot, + moduleDir, + clusterName ); + + if ( verbosity > 0 ) + printBrokers ( brokers ); + + // If all children have exited, quit. + int unfinished = allMyChildren.unfinished(); + if ( ! unfinished ) { + killAllBrokers ( brokers ); + + if ( verbosity > 0 ) + cout << "failoverSoak: all children have exited.\n"; + int retval = allMyChildren.checkChildren(); + if ( verbosity > 0 ) + std::cerr << "failoverSoak: checkChildren: " << retval << endl; + return retval ? ERROR_ON_CHILD : HUNKY_DORY; + } + + // Even if some are still running, if there's an error, quit. + if ( allMyChildren.checkChildren() ) + { + if ( verbosity > 0 ) + cout << "failoverSoak: error on child.\n"; + allMyChildren.killEverybody(); + killAllBrokers ( brokers ); + return ERROR_ON_CHILD; + } + + // If one is hanging, quit. + if ( allMyChildren.hanging ( 120 ) ) + { + if ( verbosity > 0 ) + cout << "failoverSoak: child hanging.\n"; + allMyChildren.killEverybody(); + killAllBrokers ( brokers ); + return HANGING; + } + + if ( verbosity > 0 ) { + std::cerr << "------- next kill-broker loop --------\n"; + allMyChildren.print(); + } + } + + retval = allMyChildren.checkChildren(); + if ( verbosity > 0 ) + std::cerr << "failoverSoak: checkChildren: " << retval << endl; + + if ( verbosity > 0 ) + cout << "failoverSoak: maxBrokers reached.\n"; + + allMyChildren.killEverybody(); + killAllBrokers ( brokers ); + + return retval ? ERROR_ON_CHILD : HUNKY_DORY; +} + + + diff --git a/cpp/src/tests/replaying_sender.cpp b/cpp/src/tests/replaying_sender.cpp new file mode 100644 index 0000000000..7e148e277f --- /dev/null +++ b/cpp/src/tests/replaying_sender.cpp @@ -0,0 +1,131 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <qpid/client/FailoverManager.h> +#include <qpid/client/Session.h> +#include <qpid/client/AsyncSession.h> +#include <qpid/client/Message.h> +#include <qpid/client/MessageReplayTracker.h> +#include <qpid/Exception.h> + +#include <iostream> +#include <sstream> + +using namespace qpid; +using namespace qpid::client; +using namespace qpid::framing; + +using namespace std; + +class Sender : public FailoverManager::Command +{ + public: + Sender(const std::string& queue, uint count, uint reportFreq); + void execute(AsyncSession& session, bool isRetry); + uint getSent(); + + int verbosity; + + private: + MessageReplayTracker sender; + const uint count; + uint sent; + const uint reportFrequency; + Message message; + +}; + +Sender::Sender(const std::string& queue, uint count_, uint reportFreq ) : sender(10), count(count_), sent(0), reportFrequency(reportFreq) +{ + message.getDeliveryProperties().setRoutingKey(queue); +} + +void Sender::execute(AsyncSession& session, bool isRetry) +{ + if (isRetry) sender.replay(session); + else sender.init(session); + while (sent < count) { + stringstream message_data; + message_data << ++sent; + message.setData(message_data.str()); + message.getHeaders().setInt("sn", sent); + sender.send(message); + if (count > reportFrequency && !(sent % reportFrequency)) { + if ( verbosity > 0 ) + std::cout << "sent " << sent << " of " << count << std::endl; + } + } + message.setData("That's all, folks!"); + sender.send(message); + + if ( verbosity > 0 ) + std::cout << "SENDER COMPLETED\n"; +} + +uint Sender::getSent() +{ + return sent; +} + +int main(int argc, char ** argv) +{ + ConnectionSettings settings; + + if ( argc != 6 ) + { + std::cerr << "Usage: replaying_sender host port n_messages report_frequency verbosity\n"; + return 1; + } + + settings.host = argv[1]; + settings.port = atoi(argv[2]); + int n_messages = atoi(argv[3]); + int reportFrequency = atoi(argv[4]); + int verbosity = atoi(argv[5]); + + FailoverManager connection(settings); + Sender sender("message_queue", n_messages, reportFrequency ); + sender.verbosity = verbosity; + try { + connection.execute ( sender ); + if ( verbosity > 0 ) + { + std::cout << "Sender finished. Sent " + << sender.getSent() + << " messages." + << endl; + } + connection.close(); + return 0; + } + catch(const std::exception& error) + { + cerr << "Sender (host: " + << settings.host + << " port: " + << settings.port + << " ) " + << " Failed: " + << error.what() + << std::endl; + } + return 1; +} diff --git a/cpp/src/tests/resuming_receiver.cpp b/cpp/src/tests/resuming_receiver.cpp new file mode 100644 index 0000000000..f49a115e1e --- /dev/null +++ b/cpp/src/tests/resuming_receiver.cpp @@ -0,0 +1,163 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <qpid/client/FailoverManager.h> +#include <qpid/client/Session.h> +#include <qpid/client/Message.h> +#include <qpid/client/SubscriptionManager.h> + +#include <iostream> +#include <fstream> + + +using namespace qpid; +using namespace qpid::client; +using namespace qpid::framing; + +using namespace std; + + +class Listener : public MessageListener, + public FailoverManager::Command, + public FailoverManager::ReconnectionStrategy +{ + public: + Listener ( int report_frequency = 1000, int verbosity = 0 ); + void received(Message& message); + void execute(AsyncSession& session, bool isRetry); + void check(); + void editUrlList(std::vector<Url>& urls); + private: + Subscription subscription; + uint count; + uint received_twice; + uint lastSn; + bool gaps; + uint reportFrequency; + int verbosity; +}; + + +Listener::Listener(int freq, int verbosity) + : count(0), + received_twice(0), + lastSn(0), + gaps(false), + reportFrequency(freq), + verbosity(verbosity) +{} + + +void Listener::received(Message & message) +{ + if (message.getData() == "That's all, folks!") + { + if(verbosity > 0 ) + { + std::cout << "Shutting down listener for " + << message.getDestination() << std::endl; + + std::cout << "Listener received " + << count + << " messages (" + << received_twice + << " received_twice)" + << endl; + } + subscription.cancel(); + if ( verbosity > 0 ) + std::cout << "LISTENER COMPLETED\n"; + } else { + uint sn = message.getHeaders().getAsInt("sn"); + if (lastSn < sn) { + if (sn - lastSn > 1) { + std::cerr << "Error: gap in sequence between " << lastSn << " and " << sn << std::endl; + gaps = true; + } + lastSn = sn; + ++count; + if ( ! ( count % reportFrequency ) ) { + if ( verbosity > 0 ) + std::cout << "Listener has received " + << count + << " messages.\n"; + } + } else { + ++received_twice; + } + } +} + +void Listener::check() +{ + if (gaps) throw Exception("Detected gaps in sequence; messages appear to have been lost."); +} + +void Listener::execute(AsyncSession& session, bool isRetry) +{ + if (isRetry) { + // std::cout << "Resuming from " << count << std::endl; + } + SubscriptionManager subs(session); + subscription = subs.subscribe(*this, "message_queue"); + subs.run(); +} + +void Listener::editUrlList(std::vector<Url>& urls) +{ + /** + * A more realistic algorithm would be to search through the list + * for prefered hosts and ensure they come first in the list. + */ + if (urls.size() > 1) std::rotate(urls.begin(), urls.begin() + 1, urls.end()); +} + +int main(int argc, char ** argv) +{ + ConnectionSettings settings; + + if ( argc != 5 ) + { + std::cerr << "Usage: resuming_receiver host port report_frequency verbosity\n"; + return 1; + } + + settings.host = argv[1]; + settings.port = atoi(argv[2]); + int reportFrequency = atoi(argv[3]); + int verbosity = atoi(argv[4]); + + Listener listener(reportFrequency, verbosity); + FailoverManager connection(settings, &listener); + + try { + connection.execute(listener); + connection.close(); + listener.check(); + return 0; + } catch(const std::exception& error) { + std::cerr << "Receiver failed: " << error.what() << std::endl; + } + return 1; +} + + + diff --git a/cpp/src/tests/run_failover_soak b/cpp/src/tests/run_failover_soak new file mode 100755 index 0000000000..d332cd42b9 --- /dev/null +++ b/cpp/src/tests/run_failover_soak @@ -0,0 +1,13 @@ +#!/bin/sh + +host=127.0.0.1 + +src_root=.. +module_dir=$src_root/.libs +n_messages=300000 +report_frequency=10000 +verbosity=1 + + +exec `dirname $0`/failover_soak $src_root $module_dir $host ./declare_queues ./replaying_sender ./resuming_receiver $n_messages $report_frequency $verbosity + |