diff options
author | Alan Conway <aconway@apache.org> | 2008-11-19 20:22:38 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-11-19 20:22:38 +0000 |
commit | 66442ebf857fb8a6981ee4b0c45eb2f74143c7d0 (patch) | |
tree | 6faae8c1d841df8b7d5f15a68654f705fbe84bcc /cpp/src/tests/failover_soak.cpp | |
parent | 715dcd61baf8d367eeb31c083bf6a9650e829c4c (diff) | |
download | qpid-python-66442ebf857fb8a6981ee4b0c45eb2f74143c7d0.tar.gz |
tests/failover_soak: run a cluster with clients, kill and restart cluster members, verify client fail-over.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@719055 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests/failover_soak.cpp')
-rw-r--r-- | cpp/src/tests/failover_soak.cpp | 651 |
1 files changed, 651 insertions, 0 deletions
diff --git a/cpp/src/tests/failover_soak.cpp b/cpp/src/tests/failover_soak.cpp new file mode 100644 index 0000000000..6971bad8f9 --- /dev/null +++ b/cpp/src/tests/failover_soak.cpp @@ -0,0 +1,651 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +#include <stdio.h> +#include <stdlib.h> +#include <unistd.h> +#include <sys/wait.h> +#include <sys/time.h> +#include <string.h> + +#include <string> +#include <iostream> +#include <sstream> +#include <vector> + +#include <ForkedBroker.h> + + + + +using namespace std; + + +typedef vector<ForkedBroker *> brokerVector; + +typedef enum +{ + NO_STATUS, + RUNNING, + COMPLETED +} +childStatus; + + + +struct child +{ + child ( string & name, pid_t pid ) + : name(name), pid(pid), retval(-999), status(RUNNING) + { + gettimeofday ( & startTime, 0 ); + } + + + void + done ( int _retval ) + { + retval = _retval; + status = COMPLETED; + gettimeofday ( & stopTime, 0 ); + } + + + string name; + pid_t pid; + int retval; + childStatus status; + struct timeval startTime, + stopTime; +}; + + + + +struct children : public vector<child *> +{ + void + add ( string & name, pid_t pid ) + { + push_back(new child ( name, pid )); + } + + + child * + get ( pid_t pid ) + { + vector<child *>::iterator i; + for ( i = begin(); i != end(); ++ i ) + if ( pid == (*i)->pid ) + return *i; + + return 0; + } + + + void + exited ( pid_t pid, int retval ) + { + child * kid = get ( pid ); + if(! kid) + { + if ( verbosity > 0 ) + { + cerr << "children::exited warning: Can't find child with pid " + << pid + << endl; + } + return; + } + + kid->done ( retval ); + } + + + int + unfinished ( ) + { + int count = 0; + + vector<child *>::iterator i; + for ( i = begin(); i != end(); ++ i ) + if ( COMPLETED != (*i)->status ) + ++ count; + + return count; + } + + + int + checkChildren ( ) + { + vector<child *>::iterator i; + for ( i = begin(); i != end(); ++ i ) + if ( (COMPLETED == (*i)->status) && (0 != (*i)->retval) ) + return (*i)->retval; + + return 0; + } + + + void + killEverybody ( ) + { + vector<child *>::iterator i; + for ( i = begin(); i != end(); ++ i ) + kill ( (*i)->pid, 9 ); + } + + + + void + print ( ) + { + cout << "--- status of all children --------------\n"; + vector<child *>::iterator i; + for ( i = begin(); i != end(); ++ i ) + cout << "child: " << (*i)->name + << " status: " << (*i)->status + << endl; + cout << "\n\n\n\n"; + } + + + /* + Only call this if you already know there is at least + one child still running. Supply a time in seconds. + If it has been at least that long since a shild stopped + running, we judge the system to have hung. + */ + bool + hanging ( int hangTime ) + { + struct timeval now, + duration; + gettimeofday ( &now, 0 ); + + vector<child *>::iterator i; + for ( i = begin(); i != end(); ++ i ) + { + timersub ( & now, &((*i)->startTime), & duration ); + if ( duration.tv_sec >= hangTime ) + return true; + } + + return false; + } + + + int verbosity; +}; + + + +children allMyChildren; + + + + +void +childExit ( int signalNumber ) +{ + signalNumber ++; // Now maybe the compiler willleave me alone? + int childReturnCode; + pid_t pid = waitpid ( 0, & childReturnCode, WNOHANG); + + if ( pid > 0 ) + allMyChildren.exited ( pid, childReturnCode ); +} + + + +int +mrand ( int maxDesiredVal ) { + double zeroToOne = (double) rand() / (double) RAND_MAX; + return (int) (zeroToOne * (double) maxDesiredVal); +} + + + +int +mrand ( int minDesiredVal, int maxDesiredVal ) { + int interval = maxDesiredVal - minDesiredVal; + return minDesiredVal + mrand ( interval ); +} + + + +void +makeClusterName ( string & s, int & num ) { + num = mrand(1000); + stringstream ss; + ss << "soakTestCluster_" << num; + s = ss.str(); +} + + + + + +void +printBrokers ( brokerVector & brokers ) +{ + cout << "Broker List ------------ size: " << brokers.size() << "\n"; + for ( brokerVector::iterator i = brokers.begin(); i != brokers.end(); ++ i) { + cout << "pid: " + << (*i)->getPID() + << " port: " + << (*i)->getPort() + << endl; + } + cout << "end Broker List ------------\n"; +} + + + + + +void +startNewBroker ( brokerVector & brokers, + char const * srcRoot, + char const * moduleDir, + string const clusterName ) +{ + stringstream path; + path << srcRoot << "/qpidd"; + + const char * const argv[] = + { + "qpidd", + "-p0", + "--module-dir", + moduleDir, + "--load-module=cluster.so", + "--cluster-name", + clusterName.c_str(), + "--auth=no", + "--no-data-dir", + "--mgmt-enable=no", + 0 + }; + + size_t argc = sizeof(argv)/sizeof(argv[0]); + brokers.push_back ( new ForkedBroker ( argc, argv ) ); +} + + + + + +void +killFrontBroker ( brokerVector & brokers, int verbosity ) +{ + if ( verbosity > 0 ) + cout << "killFrontBroker pid: " << brokers[0]->getPID() << " on port " << brokers[0]->getPort() << endl; + try { brokers[0]->kill(9); } + catch ( const exception& error ) { + if ( verbosity > 0 ) + cout << "error killing broker: " << error.what() << endl; + } + delete brokers[0]; + brokers.erase ( brokers.begin() ); +} + + + + + +void +killAllBrokers ( brokerVector & brokers ) +{ + for ( uint i = 0; i < brokers.size(); ++ i ) + try { brokers[i]->kill(9); } + catch ( ... ) { } +} + + + + + +pid_t +runDeclareQueuesClient ( brokerVector brokers, + char const * host, + char const * path, + int verbosity + ) +{ + string name("declareQueues"); + int port = brokers[0]->getPort ( ); + + if ( verbosity > 0 ) + cout << "startDeclareQueuesClient: host: " + << host + << " port: " + << port + << endl; + stringstream portSs; + portSs << port; + + vector<const char*> argv; + argv.push_back ( "declareQueues" ); + argv.push_back ( host ); + argv.push_back ( portSs.str().c_str() ); + argv.push_back ( 0 ); + pid_t pid = fork(); + + if ( ! pid ) { + execv ( path, const_cast<char * const *>(&argv[0]) ); + perror ( "error executing dq: " ); + return 0; + } + + allMyChildren.add ( name, pid ); + return pid; +} + + + + + +pid_t +startReceivingClient ( brokerVector brokers, + char const * host, + char const * receiverPath, + char const * reportFrequency, + int verbosity + ) +{ + string name("receiver"); + int port = brokers[0]->getPort ( ); + + if ( verbosity > 0 ) + cout << "startReceivingClient: port " << port << endl; + char portStr[100]; + char verbosityStr[100]; + sprintf(portStr, "%d", port); + sprintf(verbosityStr, "%d", verbosity); + + + vector<const char*> argv; + argv.push_back ( "resumingReceiver" ); + argv.push_back ( host ); + argv.push_back ( portStr ); + argv.push_back ( reportFrequency ); + argv.push_back ( verbosityStr ); + argv.push_back ( 0 ); + + pid_t pid = fork(); + + if ( ! pid ) { + execv ( receiverPath, const_cast<char * const *>(&argv[0]) ); + perror ( "error executing receiver: " ); + return 0; + } + + allMyChildren.add ( name, pid ); + return pid; +} + + + + + +pid_t +startSendingClient ( brokerVector brokers, + char const * host, + char const * senderPath, + char const * nMessages, + char const * reportFrequency, + int verbosity + ) +{ + string name("sender"); + int port = brokers[0]->getPort ( ); + + if ( verbosity ) + cout << "startSenderClient: port " << port << endl; + char portStr[100]; + char verbosityStr[100]; + + sprintf ( portStr, "%d", port); + sprintf ( verbosityStr, "%d", verbosity); + + vector<const char*> argv; + argv.push_back ( "replayingSender" ); + argv.push_back ( host ); + argv.push_back ( portStr ); + argv.push_back ( nMessages ); + argv.push_back ( reportFrequency ); + argv.push_back ( verbosityStr ); + argv.push_back ( 0 ); + + pid_t pid = fork(); + + if ( ! pid ) { + execv ( senderPath, const_cast<char * const *>(&argv[0]) ); + perror ( "error executing sender: " ); + return 0; + } + + allMyChildren.add ( name, pid ); + return pid; +} + + + +#define HUNKY_DORY 0 +#define BAD_ARGS 1 +#define CANT_FORK_DQ 2 +#define CANT_FORK_RECEIVER 3 +#define DQ_FAILED 4 +#define ERROR_ON_CHILD 5 +#define HANGING 6 + + +int +main ( int argc, char const ** argv ) +{ + if ( argc < 9 ) { + cerr << "Usage: failoverSoak srcRoot moduleDir host senderPath receiverPath nMessages verbosity\n"; + cerr << " ( argc was " << argc << " )\n"; + return BAD_ARGS; + } + + signal ( SIGCHLD, childExit ); + + char const * srcRoot = argv[1]; + char const * moduleDir = argv[2]; + char const * host = argv[3]; + char const * declareQueuesPath = argv[4]; + char const * senderPath = argv[5]; + char const * receiverPath = argv[6]; + char const * nMessages = argv[7]; + char const * reportFrequency = argv[8]; + int verbosity = atoi(argv[9]); + + int maxBrokers = 50; + + allMyChildren.verbosity = verbosity; + + int clusterNum; + string clusterName; + + srand ( getpid() ); + + makeClusterName ( clusterName, clusterNum ); + + brokerVector brokers; + + if ( verbosity > 0 ) + cout << "Starting initial cluster...\n"; + + int nBrokers = 3; + for ( int i = 0; i < nBrokers; ++ i ) { + startNewBroker ( brokers, + srcRoot, + moduleDir, + clusterName ); + } + + + if ( verbosity > 0 ) + printBrokers ( brokers ); + + // Run the declareQueues child. + int childStatus; + pid_t dqClientPid = + runDeclareQueuesClient ( brokers, host, declareQueuesPath, verbosity ); + if ( -1 == dqClientPid ) { + cerr << "failoverSoak error: Couldn't fork declareQueues.\n"; + return CANT_FORK_DQ; + } + + // Don't continue until declareQueues is finished. + pid_t retval = waitpid ( dqClientPid, & childStatus, 0); + if ( retval != dqClientPid) { + cerr << "failoverSoak error: waitpid on declareQueues returned value " << retval << endl; + return DQ_FAILED; + } + allMyChildren.exited ( dqClientPid, childStatus ); + + + + // Start the receiving client. + pid_t receivingClientPid = + startReceivingClient ( brokers, + host, + receiverPath, + reportFrequency, + verbosity ); + if ( -1 == receivingClientPid ) { + cerr << "failoverSoak error: Couldn't fork receiver.\n"; + return CANT_FORK_RECEIVER; + } + + + // Start the sending client. + pid_t sendingClientPid = + startSendingClient ( brokers, + host, + senderPath, + nMessages, + reportFrequency, + verbosity ); + if ( -1 == sendingClientPid ) { + cerr << "failoverSoak error: Couldn't fork sender.\n"; + return CANT_FORK_RECEIVER; + } + + + int minSleep = 3, + maxSleep = 6; + + + for ( int totalBrokers = 3; + totalBrokers < maxBrokers; + ++ totalBrokers + ) + { + if ( verbosity > 0 ) + cout << totalBrokers << " brokers have been added to the cluster.\n\n\n"; + + // Sleep for a while. ------------------------- + int sleepyTime = mrand ( minSleep, maxSleep ); + if ( verbosity > 0 ) + cout << "Sleeping for " << sleepyTime << " seconds.\n"; + sleep ( sleepyTime ); + + // Kill the oldest broker. -------------------------- + killFrontBroker ( brokers, verbosity ); + + // Sleep for a while. ------------------------- + sleepyTime = mrand ( minSleep, maxSleep ); + if ( verbosity > 0 ) + cerr << "Sleeping for " << sleepyTime << " seconds.\n"; + sleep ( sleepyTime ); + + // Start a new broker. -------------------------- + if ( verbosity > 0 ) + cout << "Starting new broker.\n\n"; + + startNewBroker ( brokers, + srcRoot, + moduleDir, + clusterName ); + + if ( verbosity > 0 ) + printBrokers ( brokers ); + + // If all children have exited, quit. + int unfinished = allMyChildren.unfinished(); + if ( ! unfinished ) { + killAllBrokers ( brokers ); + + if ( verbosity > 0 ) + cout << "failoverSoak: all children have exited.\n"; + int retval = allMyChildren.checkChildren(); + if ( verbosity > 0 ) + std::cerr << "failoverSoak: checkChildren: " << retval << endl; + return retval ? ERROR_ON_CHILD : HUNKY_DORY; + } + + // Even if some are still running, if there's an error, quit. + if ( allMyChildren.checkChildren() ) + { + if ( verbosity > 0 ) + cout << "failoverSoak: error on child.\n"; + allMyChildren.killEverybody(); + killAllBrokers ( brokers ); + return ERROR_ON_CHILD; + } + + // If one is hanging, quit. + if ( allMyChildren.hanging ( 120 ) ) + { + if ( verbosity > 0 ) + cout << "failoverSoak: child hanging.\n"; + allMyChildren.killEverybody(); + killAllBrokers ( brokers ); + return HANGING; + } + + if ( verbosity > 0 ) { + std::cerr << "------- next kill-broker loop --------\n"; + allMyChildren.print(); + } + } + + retval = allMyChildren.checkChildren(); + if ( verbosity > 0 ) + std::cerr << "failoverSoak: checkChildren: " << retval << endl; + + if ( verbosity > 0 ) + cout << "failoverSoak: maxBrokers reached.\n"; + + allMyChildren.killEverybody(); + killAllBrokers ( brokers ); + + return retval ? ERROR_ON_CHILD : HUNKY_DORY; +} + + + |