summaryrefslogtreecommitdiff
path: root/cpp/src/tests/failover_soak.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-11-19 20:22:38 +0000
committerAlan Conway <aconway@apache.org>2008-11-19 20:22:38 +0000
commit66442ebf857fb8a6981ee4b0c45eb2f74143c7d0 (patch)
tree6faae8c1d841df8b7d5f15a68654f705fbe84bcc /cpp/src/tests/failover_soak.cpp
parent715dcd61baf8d367eeb31c083bf6a9650e829c4c (diff)
downloadqpid-python-66442ebf857fb8a6981ee4b0c45eb2f74143c7d0.tar.gz
tests/failover_soak: run a cluster with clients, kill and restart cluster members, verify client fail-over.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@719055 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests/failover_soak.cpp')
-rw-r--r--cpp/src/tests/failover_soak.cpp651
1 files changed, 651 insertions, 0 deletions
diff --git a/cpp/src/tests/failover_soak.cpp b/cpp/src/tests/failover_soak.cpp
new file mode 100644
index 0000000000..6971bad8f9
--- /dev/null
+++ b/cpp/src/tests/failover_soak.cpp
@@ -0,0 +1,651 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <sys/wait.h>
+#include <sys/time.h>
+#include <string.h>
+
+#include <string>
+#include <iostream>
+#include <sstream>
+#include <vector>
+
+#include <ForkedBroker.h>
+
+
+
+
+using namespace std;
+
+
+typedef vector<ForkedBroker *> brokerVector;
+
+typedef enum
+{
+ NO_STATUS,
+ RUNNING,
+ COMPLETED
+}
+childStatus;
+
+
+
+struct child
+{
+ child ( string & name, pid_t pid )
+ : name(name), pid(pid), retval(-999), status(RUNNING)
+ {
+ gettimeofday ( & startTime, 0 );
+ }
+
+
+ void
+ done ( int _retval )
+ {
+ retval = _retval;
+ status = COMPLETED;
+ gettimeofday ( & stopTime, 0 );
+ }
+
+
+ string name;
+ pid_t pid;
+ int retval;
+ childStatus status;
+ struct timeval startTime,
+ stopTime;
+};
+
+
+
+
+struct children : public vector<child *>
+{
+ void
+ add ( string & name, pid_t pid )
+ {
+ push_back(new child ( name, pid ));
+ }
+
+
+ child *
+ get ( pid_t pid )
+ {
+ vector<child *>::iterator i;
+ for ( i = begin(); i != end(); ++ i )
+ if ( pid == (*i)->pid )
+ return *i;
+
+ return 0;
+ }
+
+
+ void
+ exited ( pid_t pid, int retval )
+ {
+ child * kid = get ( pid );
+ if(! kid)
+ {
+ if ( verbosity > 0 )
+ {
+ cerr << "children::exited warning: Can't find child with pid "
+ << pid
+ << endl;
+ }
+ return;
+ }
+
+ kid->done ( retval );
+ }
+
+
+ int
+ unfinished ( )
+ {
+ int count = 0;
+
+ vector<child *>::iterator i;
+ for ( i = begin(); i != end(); ++ i )
+ if ( COMPLETED != (*i)->status )
+ ++ count;
+
+ return count;
+ }
+
+
+ int
+ checkChildren ( )
+ {
+ vector<child *>::iterator i;
+ for ( i = begin(); i != end(); ++ i )
+ if ( (COMPLETED == (*i)->status) && (0 != (*i)->retval) )
+ return (*i)->retval;
+
+ return 0;
+ }
+
+
+ void
+ killEverybody ( )
+ {
+ vector<child *>::iterator i;
+ for ( i = begin(); i != end(); ++ i )
+ kill ( (*i)->pid, 9 );
+ }
+
+
+
+ void
+ print ( )
+ {
+ cout << "--- status of all children --------------\n";
+ vector<child *>::iterator i;
+ for ( i = begin(); i != end(); ++ i )
+ cout << "child: " << (*i)->name
+ << " status: " << (*i)->status
+ << endl;
+ cout << "\n\n\n\n";
+ }
+
+
+ /*
+ Only call this if you already know there is at least
+ one child still running. Supply a time in seconds.
+ If it has been at least that long since a shild stopped
+ running, we judge the system to have hung.
+ */
+ bool
+ hanging ( int hangTime )
+ {
+ struct timeval now,
+ duration;
+ gettimeofday ( &now, 0 );
+
+ vector<child *>::iterator i;
+ for ( i = begin(); i != end(); ++ i )
+ {
+ timersub ( & now, &((*i)->startTime), & duration );
+ if ( duration.tv_sec >= hangTime )
+ return true;
+ }
+
+ return false;
+ }
+
+
+ int verbosity;
+};
+
+
+
+children allMyChildren;
+
+
+
+
+void
+childExit ( int signalNumber )
+{
+ signalNumber ++; // Now maybe the compiler willleave me alone?
+ int childReturnCode;
+ pid_t pid = waitpid ( 0, & childReturnCode, WNOHANG);
+
+ if ( pid > 0 )
+ allMyChildren.exited ( pid, childReturnCode );
+}
+
+
+
+int
+mrand ( int maxDesiredVal ) {
+ double zeroToOne = (double) rand() / (double) RAND_MAX;
+ return (int) (zeroToOne * (double) maxDesiredVal);
+}
+
+
+
+int
+mrand ( int minDesiredVal, int maxDesiredVal ) {
+ int interval = maxDesiredVal - minDesiredVal;
+ return minDesiredVal + mrand ( interval );
+}
+
+
+
+void
+makeClusterName ( string & s, int & num ) {
+ num = mrand(1000);
+ stringstream ss;
+ ss << "soakTestCluster_" << num;
+ s = ss.str();
+}
+
+
+
+
+
+void
+printBrokers ( brokerVector & brokers )
+{
+ cout << "Broker List ------------ size: " << brokers.size() << "\n";
+ for ( brokerVector::iterator i = brokers.begin(); i != brokers.end(); ++ i) {
+ cout << "pid: "
+ << (*i)->getPID()
+ << " port: "
+ << (*i)->getPort()
+ << endl;
+ }
+ cout << "end Broker List ------------\n";
+}
+
+
+
+
+
+void
+startNewBroker ( brokerVector & brokers,
+ char const * srcRoot,
+ char const * moduleDir,
+ string const clusterName )
+{
+ stringstream path;
+ path << srcRoot << "/qpidd";
+
+ const char * const argv[] =
+ {
+ "qpidd",
+ "-p0",
+ "--module-dir",
+ moduleDir,
+ "--load-module=cluster.so",
+ "--cluster-name",
+ clusterName.c_str(),
+ "--auth=no",
+ "--no-data-dir",
+ "--mgmt-enable=no",
+ 0
+ };
+
+ size_t argc = sizeof(argv)/sizeof(argv[0]);
+ brokers.push_back ( new ForkedBroker ( argc, argv ) );
+}
+
+
+
+
+
+void
+killFrontBroker ( brokerVector & brokers, int verbosity )
+{
+ if ( verbosity > 0 )
+ cout << "killFrontBroker pid: " << brokers[0]->getPID() << " on port " << brokers[0]->getPort() << endl;
+ try { brokers[0]->kill(9); }
+ catch ( const exception& error ) {
+ if ( verbosity > 0 )
+ cout << "error killing broker: " << error.what() << endl;
+ }
+ delete brokers[0];
+ brokers.erase ( brokers.begin() );
+}
+
+
+
+
+
+void
+killAllBrokers ( brokerVector & brokers )
+{
+ for ( uint i = 0; i < brokers.size(); ++ i )
+ try { brokers[i]->kill(9); }
+ catch ( ... ) { }
+}
+
+
+
+
+
+pid_t
+runDeclareQueuesClient ( brokerVector brokers,
+ char const * host,
+ char const * path,
+ int verbosity
+ )
+{
+ string name("declareQueues");
+ int port = brokers[0]->getPort ( );
+
+ if ( verbosity > 0 )
+ cout << "startDeclareQueuesClient: host: "
+ << host
+ << " port: "
+ << port
+ << endl;
+ stringstream portSs;
+ portSs << port;
+
+ vector<const char*> argv;
+ argv.push_back ( "declareQueues" );
+ argv.push_back ( host );
+ argv.push_back ( portSs.str().c_str() );
+ argv.push_back ( 0 );
+ pid_t pid = fork();
+
+ if ( ! pid ) {
+ execv ( path, const_cast<char * const *>(&argv[0]) );
+ perror ( "error executing dq: " );
+ return 0;
+ }
+
+ allMyChildren.add ( name, pid );
+ return pid;
+}
+
+
+
+
+
+pid_t
+startReceivingClient ( brokerVector brokers,
+ char const * host,
+ char const * receiverPath,
+ char const * reportFrequency,
+ int verbosity
+ )
+{
+ string name("receiver");
+ int port = brokers[0]->getPort ( );
+
+ if ( verbosity > 0 )
+ cout << "startReceivingClient: port " << port << endl;
+ char portStr[100];
+ char verbosityStr[100];
+ sprintf(portStr, "%d", port);
+ sprintf(verbosityStr, "%d", verbosity);
+
+
+ vector<const char*> argv;
+ argv.push_back ( "resumingReceiver" );
+ argv.push_back ( host );
+ argv.push_back ( portStr );
+ argv.push_back ( reportFrequency );
+ argv.push_back ( verbosityStr );
+ argv.push_back ( 0 );
+
+ pid_t pid = fork();
+
+ if ( ! pid ) {
+ execv ( receiverPath, const_cast<char * const *>(&argv[0]) );
+ perror ( "error executing receiver: " );
+ return 0;
+ }
+
+ allMyChildren.add ( name, pid );
+ return pid;
+}
+
+
+
+
+
+pid_t
+startSendingClient ( brokerVector brokers,
+ char const * host,
+ char const * senderPath,
+ char const * nMessages,
+ char const * reportFrequency,
+ int verbosity
+ )
+{
+ string name("sender");
+ int port = brokers[0]->getPort ( );
+
+ if ( verbosity )
+ cout << "startSenderClient: port " << port << endl;
+ char portStr[100];
+ char verbosityStr[100];
+
+ sprintf ( portStr, "%d", port);
+ sprintf ( verbosityStr, "%d", verbosity);
+
+ vector<const char*> argv;
+ argv.push_back ( "replayingSender" );
+ argv.push_back ( host );
+ argv.push_back ( portStr );
+ argv.push_back ( nMessages );
+ argv.push_back ( reportFrequency );
+ argv.push_back ( verbosityStr );
+ argv.push_back ( 0 );
+
+ pid_t pid = fork();
+
+ if ( ! pid ) {
+ execv ( senderPath, const_cast<char * const *>(&argv[0]) );
+ perror ( "error executing sender: " );
+ return 0;
+ }
+
+ allMyChildren.add ( name, pid );
+ return pid;
+}
+
+
+
+#define HUNKY_DORY 0
+#define BAD_ARGS 1
+#define CANT_FORK_DQ 2
+#define CANT_FORK_RECEIVER 3
+#define DQ_FAILED 4
+#define ERROR_ON_CHILD 5
+#define HANGING 6
+
+
+int
+main ( int argc, char const ** argv )
+{
+ if ( argc < 9 ) {
+ cerr << "Usage: failoverSoak srcRoot moduleDir host senderPath receiverPath nMessages verbosity\n";
+ cerr << " ( argc was " << argc << " )\n";
+ return BAD_ARGS;
+ }
+
+ signal ( SIGCHLD, childExit );
+
+ char const * srcRoot = argv[1];
+ char const * moduleDir = argv[2];
+ char const * host = argv[3];
+ char const * declareQueuesPath = argv[4];
+ char const * senderPath = argv[5];
+ char const * receiverPath = argv[6];
+ char const * nMessages = argv[7];
+ char const * reportFrequency = argv[8];
+ int verbosity = atoi(argv[9]);
+
+ int maxBrokers = 50;
+
+ allMyChildren.verbosity = verbosity;
+
+ int clusterNum;
+ string clusterName;
+
+ srand ( getpid() );
+
+ makeClusterName ( clusterName, clusterNum );
+
+ brokerVector brokers;
+
+ if ( verbosity > 0 )
+ cout << "Starting initial cluster...\n";
+
+ int nBrokers = 3;
+ for ( int i = 0; i < nBrokers; ++ i ) {
+ startNewBroker ( brokers,
+ srcRoot,
+ moduleDir,
+ clusterName );
+ }
+
+
+ if ( verbosity > 0 )
+ printBrokers ( brokers );
+
+ // Run the declareQueues child.
+ int childStatus;
+ pid_t dqClientPid =
+ runDeclareQueuesClient ( brokers, host, declareQueuesPath, verbosity );
+ if ( -1 == dqClientPid ) {
+ cerr << "failoverSoak error: Couldn't fork declareQueues.\n";
+ return CANT_FORK_DQ;
+ }
+
+ // Don't continue until declareQueues is finished.
+ pid_t retval = waitpid ( dqClientPid, & childStatus, 0);
+ if ( retval != dqClientPid) {
+ cerr << "failoverSoak error: waitpid on declareQueues returned value " << retval << endl;
+ return DQ_FAILED;
+ }
+ allMyChildren.exited ( dqClientPid, childStatus );
+
+
+
+ // Start the receiving client.
+ pid_t receivingClientPid =
+ startReceivingClient ( brokers,
+ host,
+ receiverPath,
+ reportFrequency,
+ verbosity );
+ if ( -1 == receivingClientPid ) {
+ cerr << "failoverSoak error: Couldn't fork receiver.\n";
+ return CANT_FORK_RECEIVER;
+ }
+
+
+ // Start the sending client.
+ pid_t sendingClientPid =
+ startSendingClient ( brokers,
+ host,
+ senderPath,
+ nMessages,
+ reportFrequency,
+ verbosity );
+ if ( -1 == sendingClientPid ) {
+ cerr << "failoverSoak error: Couldn't fork sender.\n";
+ return CANT_FORK_RECEIVER;
+ }
+
+
+ int minSleep = 3,
+ maxSleep = 6;
+
+
+ for ( int totalBrokers = 3;
+ totalBrokers < maxBrokers;
+ ++ totalBrokers
+ )
+ {
+ if ( verbosity > 0 )
+ cout << totalBrokers << " brokers have been added to the cluster.\n\n\n";
+
+ // Sleep for a while. -------------------------
+ int sleepyTime = mrand ( minSleep, maxSleep );
+ if ( verbosity > 0 )
+ cout << "Sleeping for " << sleepyTime << " seconds.\n";
+ sleep ( sleepyTime );
+
+ // Kill the oldest broker. --------------------------
+ killFrontBroker ( brokers, verbosity );
+
+ // Sleep for a while. -------------------------
+ sleepyTime = mrand ( minSleep, maxSleep );
+ if ( verbosity > 0 )
+ cerr << "Sleeping for " << sleepyTime << " seconds.\n";
+ sleep ( sleepyTime );
+
+ // Start a new broker. --------------------------
+ if ( verbosity > 0 )
+ cout << "Starting new broker.\n\n";
+
+ startNewBroker ( brokers,
+ srcRoot,
+ moduleDir,
+ clusterName );
+
+ if ( verbosity > 0 )
+ printBrokers ( brokers );
+
+ // If all children have exited, quit.
+ int unfinished = allMyChildren.unfinished();
+ if ( ! unfinished ) {
+ killAllBrokers ( brokers );
+
+ if ( verbosity > 0 )
+ cout << "failoverSoak: all children have exited.\n";
+ int retval = allMyChildren.checkChildren();
+ if ( verbosity > 0 )
+ std::cerr << "failoverSoak: checkChildren: " << retval << endl;
+ return retval ? ERROR_ON_CHILD : HUNKY_DORY;
+ }
+
+ // Even if some are still running, if there's an error, quit.
+ if ( allMyChildren.checkChildren() )
+ {
+ if ( verbosity > 0 )
+ cout << "failoverSoak: error on child.\n";
+ allMyChildren.killEverybody();
+ killAllBrokers ( brokers );
+ return ERROR_ON_CHILD;
+ }
+
+ // If one is hanging, quit.
+ if ( allMyChildren.hanging ( 120 ) )
+ {
+ if ( verbosity > 0 )
+ cout << "failoverSoak: child hanging.\n";
+ allMyChildren.killEverybody();
+ killAllBrokers ( brokers );
+ return HANGING;
+ }
+
+ if ( verbosity > 0 ) {
+ std::cerr << "------- next kill-broker loop --------\n";
+ allMyChildren.print();
+ }
+ }
+
+ retval = allMyChildren.checkChildren();
+ if ( verbosity > 0 )
+ std::cerr << "failoverSoak: checkChildren: " << retval << endl;
+
+ if ( verbosity > 0 )
+ cout << "failoverSoak: maxBrokers reached.\n";
+
+ allMyChildren.killEverybody();
+ killAllBrokers ( brokers );
+
+ return retval ? ERROR_ON_CHILD : HUNKY_DORY;
+}
+
+
+