/* Copyright (c) 2003, 2005 MySQL AB This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA */ #include #include #include #include #include #include #include #include #include "userInterface.h" #include "dbGenerator.h" static int numProcesses; static int numSeconds; static int numWarmSeconds; static int parallellism; static int millisSendPoll; static int minEventSendPoll; static int forceSendPoll; static ThreadData *data; static Ndb_cluster_connection *g_cluster_connection= 0; static void usage(const char *prog) { const char *progname; /*--------------------------------------------*/ /* Get the name of the program (without path) */ /*--------------------------------------------*/ progname = strrchr(prog, '/'); if (progname == 0) progname = prog; else ++progname; ndbout_c( "Usage: %s [-proc ] [-warm ] [-time ] [ -p ] " "[-t ] [ -e ] [ -f ] \n" " -proc Specifies that is the number of\n" " threads. The default is 1.\n" " -time Specifies that the test will run for sec.\n" " The default is 10 sec\n" " -warm Specifies the warm-up/cooldown period of " "sec.\n" " The default is 10 sec\n" " -p The no of parallell transactions started by " "one thread\n" " -e Minimum no of events before wake up in call to " "sendPoll\n" " Default is 1\n" " -f force parameter to sendPoll\n" " Default is 0\n", progname); } static int parse_args(int argc, const char **argv) { int i; numProcesses = 1; numSeconds = 10; numWarmSeconds = 10; parallellism = 1; millisSendPoll = 10000; minEventSendPoll = 1; forceSendPoll = 0; i = 1; while (i < argc){ if (strcmp("-proc",argv[i]) == 0) { if (i + 1 >= argc) { return 1; } if (sscanf(argv[i+1], "%d", &numProcesses) == -1 || numProcesses <= 0 || numProcesses > 127) { ndbout_c("-proc flag requires a positive integer argument [1..127]"); return 1; } i += 2; } else if (strcmp("-p", argv[i]) == 0){ if(i + 1 >= argc){ usage(argv[0]); return 1; } if (sscanf(argv[i+1], "%d", ¶llellism) == -1 || parallellism <= 0){ ndbout_c("-p flag requires a positive integer argument"); return 1; } i += 2; } else if (strcmp("-time",argv[i]) == 0) { if (i + 1 >= argc) { return 1; } if (sscanf(argv[i+1], "%d", &numSeconds) == -1 || numSeconds < 0) { ndbout_c("-time flag requires a positive integer argument"); return 1; } i += 2; } else if (strcmp("-warm",argv[i]) == 0) { if (i + 1 >= argc) { return 1; } if (sscanf(argv[i+1], "%d", &numWarmSeconds) == -1 || numWarmSeconds < 0) { ndbout_c("-warm flag requires a positive integer argument"); return 1; } i += 2; } else if (strcmp("-e",argv[i]) == 0) { if (i + 1 >= argc) { return 1; } if (sscanf(argv[i+1], "%d", &minEventSendPoll) == -1 || minEventSendPoll < 0) { ndbout_c("-e flag requires a positive integer argument"); return 1; } i += 2; } else if (strcmp("-f",argv[i]) == 0) { if (i + 1 >= argc) { usage(argv[0]); return 1; } if (sscanf(argv[i+1], "%d", &forceSendPoll) == -1 || forceSendPoll < 0) { ndbout_c("-f flag requires a positive integer argument"); return 1; } i += 2; } else { return 1; } } if(minEventSendPoll > parallellism){ ndbout_c("minEventSendPoll(%d) > parallellism(%d)", minEventSendPoll, parallellism); ndbout_c("not very good..."); ndbout_c("very bad..."); ndbout_c("exiting..."); return 1; } return 0; } static void print_transaction(const char *header, unsigned long totalCount, TransactionDefinition *trans, unsigned int printBranch, unsigned int printRollback) { double f; ndbout_c(" %s: %d (%.2f%%) " "Latency(ms) avg: %d min: %d max: %d std: %d n: %d", header, trans->count, (double)trans->count / (double)totalCount * 100.0, (int)trans->latency.getMean(), (int)trans->latency.getMin(), (int)trans->latency.getMax(), (int)trans->latency.getStddev(), (int)trans->latency.getCount() ); if( printBranch ){ if( trans->count == 0 ) f = 0.0; else f = (double)trans->branchExecuted / (double)trans->count * 100.0; ndbout_c(" Branches Executed: %d (%.2f%%)", trans->branchExecuted, f); } if( printRollback ){ if( trans->count == 0 ) f = 0.0; else f = (double)trans->rollbackExecuted / (double)trans->count * 100.0; ndbout_c(" Rollback Executed: %d (%.2f%%)",trans->rollbackExecuted,f); } } void print_stats(const char *title, unsigned int length, unsigned int transactionFlag, GeneratorStatistics *gen, int numProc, int parallellism) { int i; char buf[10]; char name[MAXHOSTNAMELEN]; name[0] = 0; NdbHost_GetHostName(name); ndbout_c("\n------ %s ------",title); ndbout_c("Length : %d %s", length, transactionFlag ? "Transactions" : "sec"); ndbout_c("Processor : %s", name); ndbout_c("Number of Proc: %d",numProc); ndbout_c("Parallellism : %d", parallellism); ndbout_c("\n"); if( gen->totalTransactions == 0 ) { ndbout_c(" No Transactions for this test"); } else { for(i = 0; i < 5; i++) { sprintf(buf, "T%d",i+1); print_transaction(buf, gen->totalTransactions, &gen->transactions[i], i >= 2, i >= 3 ); } ndbout_c("\n"); ndbout_c(" Overall Statistics:"); ndbout_c(" Transactions: %d", gen->totalTransactions); ndbout_c(" Outer : %.0f TPS",gen->outerTps); ndbout_c("\n"); } } static void * threadRoutine(void *arg) { int i; ThreadData *data = (ThreadData *)arg; Ndb * pNDB; pNDB = asyncDbConnect(parallellism); /* NdbSleep_MilliSleep(rand() % 10); */ for(i = 0; ipThread = pThread; } else { perror("Failed to create thread"); rc = NDBT_FAILED; } } showTime(); /*--------------------------------*/ /* Wait for all processes to exit */ /*--------------------------------*/ for(i = 0; i < numProcesses; i++) { NdbThread_WaitFor(data[i*parallellism].pThread, &tmp); NdbThread_Destroy(&data[i*parallellism].pThread); } ndbout_c("All threads have finished"); /*-------------------------------------------*/ /* Clear all structures for total statistics */ /*-------------------------------------------*/ stats.totalTransactions = 0; stats.outerTps = 0.0; for(i = 0; i < NUM_TRANSACTION_TYPES; i++ ) { stats.transactions[i].count = 0; stats.transactions[i].branchExecuted = 0; stats.transactions[i].rollbackExecuted = 0; stats.transactions[i].latency.reset(); } /*--------------------------------*/ /* Add the values for all Threads */ /*--------------------------------*/ for(i = 0; i < numProcesses; i++) { for(k = 0; ktotalTransactions; stats.outerTps += p->outerTps; for(j = 0; j < NUM_TRANSACTION_TYPES; j++ ) { stats.transactions[j].count += p->transactions[j].count; stats.transactions[j].branchExecuted += p->transactions[j].branchExecuted; stats.transactions[j].rollbackExecuted += p->transactions[j].rollbackExecuted; stats.transactions[j].latency += p->transactions[j].latency; } } } print_stats("Test Results", numSeconds, 0, &stats, numProcesses, parallellism); free(data); NDBT_ProgramExit(rc); } /*************************************************************** * I N C L U D E D F I L E S * ***************************************************************/ #include #include #include #include #include "ndb_schema.hpp" #include "ndb_error.hpp" #include "userInterface.h" #include #include #include #include #include /*************************************************************** * L O C A L C O N S T A N T S * ***************************************************************/ /*************************************************************** * L O C A L D A T A S T R U C T U R E S * ***************************************************************/ /*************************************************************** * L O C A L F U N C T I O N S * ***************************************************************/ #ifndef NDB_WIN32 #include #endif Ndb* asyncDbConnect(int parallellism){ Ndb * pNDB = new Ndb(g_cluster_connection, "TEST_DB"); pNDB->init(parallellism + 1); while(pNDB->waitUntilReady() != 0){ } return pNDB; } void asyncDbDisconnect(Ndb* pNDB) { delete pNDB; } double userGetTime(void) { static bool initialized = false; static NDB_TICKS initSecs = 0; static Uint32 initMicros = 0; double timeValue = 0; if ( !initialized ) { initialized = true; NdbTick_CurrentMicrosecond(&initSecs, &initMicros); timeValue = 0.0; } else { NDB_TICKS secs = 0; Uint32 micros = 0; NdbTick_CurrentMicrosecond(&secs, µs); double s = (double)secs - (double)initSecs; double us = (double)micros - (double)initMicros; timeValue = s + (us / 1000000.0); } return timeValue; } void showTime() { char buf[128]; struct tm* tm_now; time_t now; now = ::time((time_t*)NULL); tm_now = ::gmtime(&now); ::snprintf(buf, 128, "%d-%.2d-%.2d %.2d:%.2d:%.2d", tm_now->tm_year + 1900, tm_now->tm_mon, tm_now->tm_mday, tm_now->tm_hour, tm_now->tm_min, tm_now->tm_sec); ndbout_c("Time: %s", buf); }