/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ #include #include #include #include #include #include #include #include #include #include #include using namespace std; typedef vector brokerVector; typedef enum { NO_STATUS, RUNNING, COMPLETED } childStatus; struct child { child ( string & name, pid_t pid ) : name(name), pid(pid), retval(-999), status(RUNNING) { gettimeofday ( & startTime, 0 ); } void done ( int _retval ) { retval = _retval; status = COMPLETED; gettimeofday ( & stopTime, 0 ); } string name; pid_t pid; int retval; childStatus status; struct timeval startTime, stopTime; }; struct children : public vector { void add ( string & name, pid_t pid ) { push_back(new child ( name, pid )); } child * get ( pid_t pid ) { vector::iterator i; for ( i = begin(); i != end(); ++ i ) if ( pid == (*i)->pid ) return *i; return 0; } void exited ( pid_t pid, int retval ) { child * kid = get ( pid ); if(! kid) { if ( verbosity > 0 ) { cerr << "children::exited warning: Can't find child with pid " << pid << endl; } return; } kid->done ( retval ); } int unfinished ( ) { int count = 0; vector::iterator i; for ( i = begin(); i != end(); ++ i ) if ( COMPLETED != (*i)->status ) ++ count; return count; } int checkChildren ( ) { vector::iterator i; for ( i = begin(); i != end(); ++ i ) if ( (COMPLETED == (*i)->status) && (0 != (*i)->retval) ) return (*i)->retval; return 0; } void killEverybody ( ) { vector::iterator i; for ( i = begin(); i != end(); ++ i ) kill ( (*i)->pid, 9 ); } void print ( ) { cout << "--- status of all children --------------\n"; vector::iterator i; for ( i = begin(); i != end(); ++ i ) cout << "child: " << (*i)->name << " status: " << (*i)->status << endl; cout << "\n\n\n\n"; } /* Only call this if you already know there is at least one child still running. Supply a time in seconds. If it has been at least that long since a shild stopped running, we judge the system to have hung. */ bool hanging ( int hangTime ) { struct timeval now, duration; gettimeofday ( &now, 0 ); vector::iterator i; for ( i = begin(); i != end(); ++ i ) { timersub ( & now, &((*i)->startTime), & duration ); if ( duration.tv_sec >= hangTime ) return true; } return false; } int verbosity; }; children allMyChildren; void childExit ( int signalNumber ) { signalNumber ++; // Now maybe the compiler willleave me alone? int childReturnCode; pid_t pid = waitpid ( 0, & childReturnCode, WNOHANG); if ( pid > 0 ) allMyChildren.exited ( pid, childReturnCode ); } int mrand ( int maxDesiredVal ) { double zeroToOne = (double) rand() / (double) RAND_MAX; return (int) (zeroToOne * (double) maxDesiredVal); } int mrand ( int minDesiredVal, int maxDesiredVal ) { int interval = maxDesiredVal - minDesiredVal; return minDesiredVal + mrand ( interval ); } void makeClusterName ( string & s, int & num ) { num = mrand(1000); stringstream ss; ss << "soakTestCluster_" << num; s = ss.str(); } void printBrokers ( brokerVector & brokers ) { cout << "Broker List ------------ size: " << brokers.size() << "\n"; for ( brokerVector::iterator i = brokers.begin(); i != brokers.end(); ++ i) { cout << "pid: " << (*i)->getPID() << " port: " << (*i)->getPort() << endl; } cout << "end Broker List ------------\n"; } void startNewBroker ( brokerVector & brokers, char const * srcRoot, char const * moduleDir, string const clusterName ) { static int brokerId = 0; stringstream path, prefix, module; module << moduleDir << "/cluster.so"; path << srcRoot << "/qpidd"; prefix << "soak-" << brokerId++; const char * const argv[] = { "qpidd", "-p0", "--load-module=cluster.so", "--cluster-name", clusterName.c_str(), "--auth=no", "--no-data-dir", "--no-module-dir", "--mgmt-enable=no", "--log-prefix", prefix.str().c_str(), 0 }; size_t argc = sizeof(argv)/sizeof(argv[0]); brokers.push_back ( new ForkedBroker ( argc, argv ) ); } void killFrontBroker ( brokerVector & brokers, int verbosity ) { if ( verbosity > 0 ) cout << "killFrontBroker pid: " << brokers[0]->getPID() << " on port " << brokers[0]->getPort() << endl; try { brokers[0]->kill(9); } catch ( const exception& error ) { if ( verbosity > 0 ) cout << "error killing broker: " << error.what() << endl; } delete brokers[0]; brokers.erase ( brokers.begin() ); } void killAllBrokers ( brokerVector & brokers ) { for ( uint i = 0; i < brokers.size(); ++ i ) try { brokers[i]->kill(9); } catch ( ... ) { } } pid_t runDeclareQueuesClient ( brokerVector brokers, char const * host, char const * path, int verbosity ) { string name("declareQueues"); int port = brokers[0]->getPort ( ); if ( verbosity > 0 ) cout << "startDeclareQueuesClient: host: " << host << " port: " << port << endl; stringstream portSs; portSs << port; vector argv; argv.push_back ( "declareQueues" ); argv.push_back ( host ); argv.push_back ( portSs.str().c_str() ); argv.push_back ( 0 ); pid_t pid = fork(); if ( ! pid ) { execv ( path, const_cast(&argv[0]) ); perror ( "error executing dq: " ); return 0; } allMyChildren.add ( name, pid ); return pid; } pid_t startReceivingClient ( brokerVector brokers, char const * host, char const * receiverPath, char const * reportFrequency, int verbosity ) { string name("receiver"); int port = brokers[0]->getPort ( ); if ( verbosity > 0 ) cout << "startReceivingClient: port " << port << endl; char portStr[100]; char verbosityStr[100]; sprintf(portStr, "%d", port); sprintf(verbosityStr, "%d", verbosity); vector argv; argv.push_back ( "resumingReceiver" ); argv.push_back ( host ); argv.push_back ( portStr ); argv.push_back ( reportFrequency ); argv.push_back ( verbosityStr ); argv.push_back ( 0 ); pid_t pid = fork(); if ( ! pid ) { execv ( receiverPath, const_cast(&argv[0]) ); perror ( "error executing receiver: " ); return 0; } allMyChildren.add ( name, pid ); return pid; } pid_t startSendingClient ( brokerVector brokers, char const * host, char const * senderPath, char const * nMessages, char const * reportFrequency, int verbosity ) { string name("sender"); int port = brokers[0]->getPort ( ); if ( verbosity ) cout << "startSenderClient: port " << port << endl; char portStr[100]; char verbosityStr[100]; sprintf ( portStr, "%d", port); sprintf ( verbosityStr, "%d", verbosity); vector argv; argv.push_back ( "replayingSender" ); argv.push_back ( host ); argv.push_back ( portStr ); argv.push_back ( nMessages ); argv.push_back ( reportFrequency ); argv.push_back ( verbosityStr ); argv.push_back ( 0 ); pid_t pid = fork(); if ( ! pid ) { execv ( senderPath, const_cast(&argv[0]) ); perror ( "error executing sender: " ); return 0; } allMyChildren.add ( name, pid ); return pid; } #define HUNKY_DORY 0 #define BAD_ARGS 1 #define CANT_FORK_DQ 2 #define CANT_FORK_RECEIVER 3 #define DQ_FAILED 4 #define ERROR_ON_CHILD 5 #define HANGING 6 int main ( int argc, char const ** argv ) { if ( argc < 9 ) { cerr << "Usage: failoverSoak srcRoot moduleDir host senderPath receiverPath nMessages verbosity\n"; cerr << " ( argc was " << argc << " )\n"; return BAD_ARGS; } signal ( SIGCHLD, childExit ); char const * srcRoot = argv[1]; char const * moduleDir = argv[2]; char const * host = argv[3]; char const * declareQueuesPath = argv[4]; char const * senderPath = argv[5]; char const * receiverPath = argv[6]; char const * nMessages = argv[7]; char const * reportFrequency = argv[8]; int verbosity = atoi(argv[9]); int maxBrokers = 50; allMyChildren.verbosity = verbosity; int clusterNum; string clusterName; srand ( getpid() ); makeClusterName ( clusterName, clusterNum ); brokerVector brokers; if ( verbosity > 0 ) cout << "Starting initial cluster...\n"; int nBrokers = 3; for ( int i = 0; i < nBrokers; ++ i ) { startNewBroker ( brokers, srcRoot, moduleDir, clusterName ); } if ( verbosity > 0 ) printBrokers ( brokers ); // Run the declareQueues child. int childStatus; pid_t dqClientPid = runDeclareQueuesClient ( brokers, host, declareQueuesPath, verbosity ); if ( -1 == dqClientPid ) { cerr << "failoverSoak error: Couldn't fork declareQueues.\n"; return CANT_FORK_DQ; } // Don't continue until declareQueues is finished. pid_t retval = waitpid ( dqClientPid, & childStatus, 0); if ( retval != dqClientPid) { cerr << "failoverSoak error: waitpid on declareQueues returned value " << retval << endl; return DQ_FAILED; } allMyChildren.exited ( dqClientPid, childStatus ); // Start the receiving client. pid_t receivingClientPid = startReceivingClient ( brokers, host, receiverPath, reportFrequency, verbosity ); if ( -1 == receivingClientPid ) { cerr << "failoverSoak error: Couldn't fork receiver.\n"; return CANT_FORK_RECEIVER; } // Start the sending client. pid_t sendingClientPid = startSendingClient ( brokers, host, senderPath, nMessages, reportFrequency, verbosity ); if ( -1 == sendingClientPid ) { cerr << "failoverSoak error: Couldn't fork sender.\n"; return CANT_FORK_RECEIVER; } int minSleep = 3, maxSleep = 6; for ( int totalBrokers = 3; totalBrokers < maxBrokers; ++ totalBrokers ) { if ( verbosity > 0 ) cout << totalBrokers << " brokers have been added to the cluster.\n\n\n"; // Sleep for a while. ------------------------- int sleepyTime = mrand ( minSleep, maxSleep ); if ( verbosity > 0 ) cout << "Sleeping for " << sleepyTime << " seconds.\n"; sleep ( sleepyTime ); // Kill the oldest broker. -------------------------- killFrontBroker ( brokers, verbosity ); // Sleep for a while. ------------------------- sleepyTime = mrand ( minSleep, maxSleep ); if ( verbosity > 0 ) cerr << "Sleeping for " << sleepyTime << " seconds.\n"; sleep ( sleepyTime ); // Start a new broker. -------------------------- if ( verbosity > 0 ) cout << "Starting new broker.\n\n"; startNewBroker ( brokers, srcRoot, moduleDir, clusterName ); if ( verbosity > 0 ) printBrokers ( brokers ); // If all children have exited, quit. int unfinished = allMyChildren.unfinished(); if ( ! unfinished ) { killAllBrokers ( brokers ); if ( verbosity > 0 ) cout << "failoverSoak: all children have exited.\n"; int retval = allMyChildren.checkChildren(); if ( verbosity > 0 ) std::cerr << "failoverSoak: checkChildren: " << retval << endl; return retval ? ERROR_ON_CHILD : HUNKY_DORY; } // Even if some are still running, if there's an error, quit. if ( allMyChildren.checkChildren() ) { if ( verbosity > 0 ) cout << "failoverSoak: error on child.\n"; allMyChildren.killEverybody(); killAllBrokers ( brokers ); return ERROR_ON_CHILD; } // If one is hanging, quit. if ( allMyChildren.hanging ( 120 ) ) { if ( verbosity > 0 ) cout << "failoverSoak: child hanging.\n"; allMyChildren.killEverybody(); killAllBrokers ( brokers ); return HANGING; } if ( verbosity > 0 ) { std::cerr << "------- next kill-broker loop --------\n"; allMyChildren.print(); } } retval = allMyChildren.checkChildren(); if ( verbosity > 0 ) std::cerr << "failoverSoak: checkChildren: " << retval << endl; if ( verbosity > 0 ) cout << "failoverSoak: maxBrokers reached.\n"; allMyChildren.killEverybody(); killAllBrokers ( brokers ); return retval ? ERROR_ON_CHILD : HUNKY_DORY; }