diff options
Diffstat (limited to 'ndb/test/ndbapi/flexBench.cpp')
-rw-r--r-- | ndb/test/ndbapi/flexBench.cpp | 1153 |
1 files changed, 1153 insertions, 0 deletions
diff --git a/ndb/test/ndbapi/flexBench.cpp b/ndb/test/ndbapi/flexBench.cpp new file mode 100644 index 00000000000..809d11086bf --- /dev/null +++ b/ndb/test/ndbapi/flexBench.cpp @@ -0,0 +1,1153 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + + +/* *************************************************** +FLEXBENCH +Perform benchmark of insert, update and delete transactions + +Arguments: + -t Number of threads to start, default 1 + -o Number of operations per loop, default 500 + -l Number of loops to run, default 1, 0=infinite + -a Number of attributes, default 25 + -c Number of tables, default 1 + -s Size of each attribute, default 1 (Primary Key is always of size 1, + independent of this value) + -lkn Number of long primary keys, default 1 + -lks Size of each long primary key, default 1 + -simple Use simple read to read from database + -dirty Use dirty read to read from database + -write Use writeTuple in insert and update + -stdtables Use standard table names + -no_table_create Don't create tables in db + -sleep Sleep a number of seconds before running the test, this + can be used so that another flexBench have time to create tables + -temp Use tables without logging + -verify Verify inserts, updates and deletes +#ifdef CEBIT_STAT + -statserv host:port statistics server to report to + -statfreq ops report every ops operations (default 100) +#endif + Returns: + 0 - Test passed + 1 - Test failed + 2 - Invalid arguments + +* *************************************************** */ + +#include "NdbApi.hpp" + +#include <NdbMain.h> +#include <NdbOut.hpp> +#include <NdbSleep.h> +#include <NdbTick.h> +#include <NdbTimer.hpp> +#include <NdbThread.h> + +#include <NdbTest.hpp> + +#define MAXSTRLEN 16 +#define MAXATTR 64 +#define MAXTABLES 128 +#define MAXATTRSIZE 1000 +#define MAXNOLONGKEY 16 // Max number of long keys. +#define MAXLONGKEYTOTALSIZE 1023 // words = 4092 bytes + +extern "C" { static void* flexBenchThread(void*); } +static int readArguments(int argc, const char** argv); +static int createTables(Ndb*); +static void sleepBeforeStartingTest(int seconds); +static void input_error(); + +enum StartType { + stIdle, + stInsert, + stVerify, + stRead, + stUpdate, + stDelete, + stTryDelete, + stVerifyDelete, + stStop +}; + +struct ThreadData +{ + int threadNo; + NdbThread* threadLife; + int threadReady; + StartType threadStart; + int threadResult; +}; + +static int tNodeId = 0 ; +static char tableName[MAXTABLES][MAXSTRLEN+1]; +static char attrName[MAXATTR][MAXSTRLEN+1]; +static char** longKeyAttrName; + +// Program Parameters +static int tNoOfLoops = 1; +static int tAttributeSize = 1; +static unsigned int tNoOfThreads = 1; +static unsigned int tNoOfTables = 1; +static unsigned int tNoOfAttributes = 25; +static unsigned int tNoOfOperations = 500; +static unsigned int tSleepTime = 0; +static unsigned int tNoOfLongPK = 1; +static unsigned int tSizeOfLongPK = 1; + +//Program Flags +static int theSimpleFlag = 0; +static int theDirtyFlag = 0; +static int theWriteFlag = 0; +static int theStdTableNameFlag = 0; +static int theTableCreateFlag = 0; +static bool theTempTable = false; +static bool VerifyFlag = true; +static bool useLongKeys = false; + + +static ErrorData theErrorData; // Part of flexBench-program + +#define START_TIMER { NdbTimer timer; timer.doStart(); +#define STOP_TIMER timer.doStop(); +#define PRINT_TIMER(text, trans, opertrans) timer.printTransactionStatistics(text, trans, opertrans); }; + +#include <NdbTCP.h> + +#ifdef CEBIT_STAT +#include <NdbMutex.h> +static bool statEnable = false; +static char statHost[100]; +static int statFreq = 100; +static int statPort = 0; +static int statSock = -1; +static enum { statError = -1, statClosed, statOpen } statState; +static NdbMutex statMutex = NDB_MUTEX_INITIALIZER; +#endif + +//------------------------------------------------------------------- +// Statistical Reporting routines +//------------------------------------------------------------------- +#ifdef CEBIT_STAT +// Experimental client-side statistic for CeBIT + +static void +statReport(enum StartType st, int ops) +{ + if (!statEnable) + return; + if (NdbMutex_Lock(&statMutex) < 0) { + if (statState != statError) { + ndbout_c("stat: lock mutex failed: %s", strerror(errno)); + statState = statError; + } + return; + } + static int nodeid; + // open connection + if (statState != statOpen) { + char *p = getenv("NDB_NODEID"); // ndbnet sets NDB_NODEID + nodeid = p == 0 ? 0 : atoi(p); + if ((statSock = socket(PF_INET, SOCK_STREAM, 0)) < 0) { + if (statState != statError) { + ndbout_c("stat: create socket failed: %s", strerror(errno)); + statState = statError; + } + (void)NdbMutex_Unlock(&statMutex); + return; + } + struct sockaddr_in saddr; + memset(&saddr, 0, sizeof(saddr)); + saddr.sin_family = AF_INET; + saddr.sin_port = htons(statPort); + if (Ndb_getInAddr(&saddr.sin_addr, statHost) < 0) { + if (statState != statError) { + ndbout_c("stat: host %s not found", statHost); + statState = statError; + } + (void)close(statSock); + (void)NdbMutex_Unlock(&statMutex); + return; + } + if (connect(statSock, (struct sockaddr *)&saddr, sizeof(saddr)) < 0) { + if (statState != statError) { + ndbout_c("stat: connect failed: %s", strerror(errno)); + statState = statError; + } + (void)close(statSock); + (void)NdbMutex_Unlock(&statMutex); + return; + } + statState = statOpen; + ndbout_c("stat: connection to %s:%d opened", statHost, (int)statPort); + } + const char *text; + switch (st) { + case stInsert: + text = "insert"; + break; + case stVerify: + text = "verify"; + break; + case stRead: + text = "read"; + break; + case stUpdate: + text = "update"; + break; + case stDelete: + text = "delete"; + break; + case stVerifyDelete: + text = "verifydelete"; + break; + default: + text = "unknown"; + break; + } + char buf[100]; + sprintf(buf, "%d %s %d\n", nodeid, text, ops); + int len = strlen(buf); + // assume SIGPIPE already ignored + if (write(statSock, buf, len) != len) { + if (statState != statError) { + ndbout_c("stat: write failed: %s", strerror(errno)); + statState = statError; + } + (void)close(statSock); + (void)NdbMutex_Unlock(&statMutex); + return; + } + (void)NdbMutex_Unlock(&statMutex); +} +#endif // CEBIT_STAT + +static void +resetThreads(ThreadData* pt){ + for (unsigned int i = 0; i < tNoOfThreads; i++){ + pt[i].threadReady = 0; + pt[i].threadResult = 0; + pt[i].threadStart = stIdle; + } +} + +static int +checkThreadResults(ThreadData* pt){ + for (unsigned int i = 0; i < tNoOfThreads; i++){ + if(pt[i].threadResult != 0){ + ndbout_c("Thread%d reported fatal error %d", i, pt[i].threadResult); + return -1; + } + } + return 0; +} + +static +void +waitForThreads(ThreadData* pt) +{ + int cont = 1; + while (cont){ + NdbSleep_MilliSleep(100); + cont = 0; + for (unsigned int i = 0; i < tNoOfThreads; i++){ + if (pt[i].threadReady == 0) + cont = 1; + } + } +} + +static void +tellThreads(ThreadData* pt, StartType what) +{ + for (unsigned int i = 0; i < tNoOfThreads; i++) + pt[i].threadStart = what; +} + +NDB_COMMAND(flexBench, "flexBench", "flexBench", "flexbench", 65535) +{ + ThreadData* pThreadsData; + int tLoops = 0; + int returnValue = NDBT_OK; + + if (readArguments(argc, argv) != 0){ + input_error(); + return NDBT_ProgramExit(NDBT_WRONGARGS); + } + + if(useLongKeys){ + longKeyAttrName = (char **) malloc(sizeof(char*) * tNoOfLongPK); + for (Uint32 i = 0; i < tNoOfLongPK; i++) { + longKeyAttrName[i] = (char *) malloc(strlen("KEYATTR ") + 1); + memset(longKeyAttrName[i], 0, strlen("KEYATTR ") + 1); + sprintf(longKeyAttrName[i], "KEYATTR%i", i); + } + } + + pThreadsData = new ThreadData[tNoOfThreads]; + + ndbout << endl << "FLEXBENCH - Starting normal mode" << endl; + ndbout << "Perform benchmark of insert, update and delete transactions"<< endl; + ndbout << " " << tNoOfThreads << " thread(s) " << endl; + ndbout << " " << tNoOfLoops << " iterations " << endl; + ndbout << " " << tNoOfTables << " table(s) and " << 1 << " operation(s) per transaction " <<endl; + ndbout << " " << tNoOfAttributes << " attributes per table " << endl; + ndbout << " " << tNoOfOperations << " transaction(s) per thread and round " << endl; + ndbout << " " << tAttributeSize << " is the number of 32 bit words per attribute "<< endl; + ndbout << " " << "Table(s) without logging: " << (Uint32)theTempTable << endl; + + if(useLongKeys) + ndbout << " " << "Using long keys with " << tNoOfLongPK << " keys a' " << + tSizeOfLongPK * 4 << " bytes each." << endl; + + ndbout << " " << "Verification is " ; + if(VerifyFlag) { + ndbout << "enabled" << endl ; + }else{ + ndbout << "disabled" << endl ; + } + theErrorData.printSettings(ndbout); + + NdbThread_SetConcurrencyLevel(tNoOfThreads + 2); + + Ndb* pNdb; + pNdb = new Ndb( "TEST_DB" ); + pNdb->init(); + + tNodeId = pNdb->getNodeId(); + ndbout << " NdbAPI node with id = " << tNodeId << endl; + ndbout << endl; + + ndbout << "Waiting for ndb to become ready..." <<endl; + if (pNdb->waitUntilReady(2000) != 0){ + ndbout << "NDB is not ready" << endl; + ndbout << "Benchmark failed!" << endl; + returnValue = NDBT_FAILED; + } + + if(returnValue == NDBT_OK){ + if (createTables(pNdb) != 0){ + returnValue = NDBT_FAILED; + } + } + + if(returnValue == NDBT_OK){ + + sleepBeforeStartingTest(tSleepTime); + + /**************************************************************** + * Create threads. * + ****************************************************************/ + resetThreads(pThreadsData); + + for (unsigned int i = 0; i < tNoOfThreads; i++){ + pThreadsData[i].threadNo = i; + pThreadsData[i].threadLife = NdbThread_Create(flexBenchThread, + (void**)&pThreadsData[i], + 32768, + "flexBenchThread", + NDB_THREAD_PRIO_LOW); + } + + waitForThreads(pThreadsData); + + ndbout << endl << "All threads started" << endl << endl; + + /**************************************************************** + * Execute program. * + ****************************************************************/ + + for(;;){ + + int loopCount = tLoops + 1; + ndbout << endl << "Loop # " << loopCount << endl << endl; + + /**************************************************************** + * Perform inserts. * + ****************************************************************/ + // Reset and start timer + START_TIMER; + // Give insert-command to all threads + resetThreads(pThreadsData); + tellThreads(pThreadsData, stInsert); + waitForThreads(pThreadsData); + if (checkThreadResults(pThreadsData) != 0){ + ndbout << "Error: Threads failed in performing insert" << endl; + returnValue = NDBT_FAILED; + break; + } + // stop timer and print results. + STOP_TIMER; + PRINT_TIMER("insert", tNoOfOperations*tNoOfThreads, tNoOfTables); + /**************************************************************** + * Verify inserts. * + ****************************************************************/ + if (VerifyFlag) { + resetThreads(pThreadsData); + ndbout << "Verifying inserts...\t" ; + tellThreads(pThreadsData, stVerify); + waitForThreads(pThreadsData); + if (checkThreadResults(pThreadsData) != 0){ + ndbout << "Error: Threads failed while verifying inserts" << endl; + returnValue = NDBT_FAILED; + break; + }else{ + ndbout << "\t\tOK" << endl << endl ; + } + } + + /**************************************************************** + * Perform read. * + ****************************************************************/ + // Reset and start timer + START_TIMER; + // Give read-command to all threads + resetThreads(pThreadsData); + tellThreads(pThreadsData, stRead); + waitForThreads(pThreadsData); + if (checkThreadResults(pThreadsData) != 0){ + ndbout << "Error: Threads failed in performing read" << endl; + returnValue = NDBT_FAILED; + break; + } + // stop timer and print results. + STOP_TIMER; + PRINT_TIMER("read", tNoOfOperations*tNoOfThreads, tNoOfTables); + + /**************************************************************** + * Perform update. * + ****************************************************************/ + // Reset and start timer + START_TIMER; + // Give insert-command to all threads + resetThreads(pThreadsData); + tellThreads(pThreadsData, stUpdate); + waitForThreads(pThreadsData); + if (checkThreadResults(pThreadsData) != 0){ + ndbout << "Error: Threads failed in performing update" << endl; + returnValue = NDBT_FAILED; + break; + } + // stop timer and print results. + STOP_TIMER; + PRINT_TIMER("update", tNoOfOperations*tNoOfThreads, tNoOfTables); + + /**************************************************************** + * Verify updates. * + ****************************************************************/ + if (VerifyFlag) { + resetThreads(pThreadsData); + ndbout << "Verifying updates...\t" ; + tellThreads(pThreadsData, stVerify); + waitForThreads(pThreadsData); + if (checkThreadResults(pThreadsData) != 0){ + ndbout << "Error: Threads failed while verifying updates" << endl; + returnValue = NDBT_FAILED; + break; + }else{ + ndbout << "\t\tOK" << endl << endl ; + } + } + + /**************************************************************** + * Perform read. * + ****************************************************************/ + // Reset and start timer + START_TIMER; + // Give insert-command to all threads + resetThreads(pThreadsData); + tellThreads(pThreadsData, stRead); + waitForThreads(pThreadsData); + if (checkThreadResults(pThreadsData) != 0){ + ndbout << "Error: Threads failed in performing read" << endl; + returnValue = NDBT_FAILED; + break; + } + // stop timer and print results. + STOP_TIMER; + PRINT_TIMER("read", tNoOfOperations*tNoOfThreads, tNoOfTables); + + /**************************************************************** + * Perform delete. * + ****************************************************************/ + // Reset and start timer + START_TIMER; + // Give insert-command to all threads + resetThreads(pThreadsData); + tellThreads(pThreadsData, stDelete); + waitForThreads(pThreadsData); + if (checkThreadResults(pThreadsData) != 0){ + ndbout << "Error: Threads failed in performing delete" << endl; + returnValue = NDBT_FAILED; + break; + } + // stop timer and print results. + STOP_TIMER; + PRINT_TIMER("delete", tNoOfOperations*tNoOfThreads, tNoOfTables); + + /**************************************************************** + * Verify deletes. * + ****************************************************************/ + if (VerifyFlag) { + resetThreads(pThreadsData); + ndbout << "Verifying tuple deletion..." ; + tellThreads(pThreadsData, stVerifyDelete); + waitForThreads(pThreadsData); + if (checkThreadResults(pThreadsData) != 0){ + ndbout << "Error: Threads failed in verifying deletes" << endl; + returnValue = NDBT_FAILED; + break; + }else{ + ndbout << "\t\tOK" << endl << endl ; + } + } + + ndbout << "--------------------------------------------------" << endl; + + tLoops++; + + if ( 0 != tNoOfLoops && tNoOfLoops <= tLoops ) + break; + theErrorData.printErrorCounters(); + } + + resetThreads(pThreadsData); + tellThreads(pThreadsData, stStop); + waitForThreads(pThreadsData); + + void * tmp; + for(Uint32 i = 0; i<tNoOfThreads; i++){ + NdbThread_WaitFor(pThreadsData[i].threadLife, &tmp); + NdbThread_Destroy(&pThreadsData[i].threadLife); + } + } + + if (useLongKeys == true) { + // Only free these areas if they have been allocated + // Otherwise cores will happen + for (Uint32 i = 0; i < tNoOfLongPK; i++) + free(longKeyAttrName[i]); + free(longKeyAttrName); + } // if + + delete [] pThreadsData; + delete pNdb; + theErrorData.printErrorCounters(); + return NDBT_ProgramExit(returnValue); +} +//////////////////////////////////////// + + +unsigned long get_hash(unsigned long * hash_key, int len) +{ + unsigned long hash_value = 147; + unsigned h_key; + int i; + for (i = 0; i < len; i++) + { + h_key = hash_key[i]; + hash_value = (hash_value << 5) + hash_value + (h_key & 255); + hash_value = (hash_value << 5) + hash_value + ((h_key >> 8) & 255); + hash_value = (hash_value << 5) + hash_value + ((h_key >> 16) & 255); + hash_value = (hash_value << 5) + hash_value + ((h_key >> 24) & 255); + } + return hash_value; +} + +// End of warming up phase + + + +static void* flexBenchThread(void* pArg) +{ + ThreadData* pThreadData = (ThreadData*)pArg; + unsigned int threadNo, threadBase; + Ndb* pNdb = NULL ; + NdbConnection *pTrans = NULL ; + NdbOperation** pOps = NULL ; + StartType tType ; + StartType tSaveType ; + NdbRecAttr* tTmp = NULL ; + int* attrValue = NULL ; + int* attrRefValue = NULL ; + int check = 0 ; + int loopCountOps, loopCountTables, loopCountAttributes; + int tAttemptNo = 0; + int tRetryAttempts = 20; + int tResult = 0; + int tSpecialTrans = 0; + int nRefLocalOpOffset = 0 ; + int nReadBuffSize = + tNoOfTables * tNoOfAttributes * sizeof(int) * tAttributeSize ; + int nRefBuffSize = + tNoOfOperations * tNoOfAttributes * sizeof(int) * tAttributeSize ; + unsigned*** longKeyAttrValue; + + + threadNo = pThreadData->threadNo ; + + attrValue = (int*)malloc(nReadBuffSize) ; + attrRefValue = (int*)malloc(nRefBuffSize) ; + pOps = (NdbOperation**)malloc(tNoOfTables*sizeof(NdbOperation*)) ; + pNdb = new Ndb( "TEST_DB" ); + + if(!attrValue || !attrRefValue || !pOps || !pNdb){ + // Check allocations to make sure we got all the memory we asked for + ndbout << "One or more memory allocations failed when starting thread #"; + ndbout << threadNo << endl ; + ndbout << "Thread #" << threadNo << " will now exit" << endl ; + tResult = 13 ; + free(attrValue) ; + free(attrRefValue) ; + free(pOps) ; + delete pNdb ; + NdbThread_Exit(0) ; + } + + pNdb->init(); + pNdb->waitUntilReady(); + + // To make sure that two different threads doesn't operate on the same record + // Calculate an "unique" number to use as primary key + threadBase = (threadNo * 2000000) + (tNodeId * 260000000); + + if(useLongKeys){ + // Allocate and populate the longkey array. + longKeyAttrValue = (unsigned ***) malloc(sizeof(unsigned**) * tNoOfOperations ); + for (Uint32 n = 0; n < tNoOfOperations; n++) + longKeyAttrValue[n] = (unsigned **) malloc(sizeof(unsigned*) * tNoOfLongPK ); + for (Uint32 n = 0; n < tNoOfOperations; n++){ + for (Uint32 i = 0; i < tNoOfLongPK ; i++) { + longKeyAttrValue[n][i] = (unsigned *) malloc(sizeof(unsigned) * tSizeOfLongPK); + memset(longKeyAttrValue[n][i], 0, sizeof(unsigned) * tSizeOfLongPK); + for(Uint32 j = 0; j < tSizeOfLongPK; j++) { + // Repeat the unique value to fill up the long key. + longKeyAttrValue[n][i][j] = threadBase + n; + } + } + } + } + + int nRefOpOffset = 0 ; + //Assign reference attribute values to memory + for(Uint32 ops = 1 ; ops < tNoOfOperations ; ops++){ + // Calculate offset value before going into the next loop + nRefOpOffset = tAttributeSize*tNoOfAttributes*(ops-1) ; + for(Uint32 a = 0 ; a < tNoOfAttributes ; a++){ + *(int*)&attrRefValue[nRefOpOffset + tAttributeSize*a] = + (int)(threadBase + ops + a) ; + } + } + +#ifdef CEBIT_STAT + // ops not yet reported + int statOps = 0; +#endif + for (;;) { + pThreadData->threadResult = tResult; // Report error to main thread, + // normally tResult is set to 0 + pThreadData->threadReady = 1; + + while (pThreadData->threadStart == stIdle){ + NdbSleep_MilliSleep(100); + }//while + + // Check if signal to exit is received + if (pThreadData->threadStart == stStop){ + pThreadData->threadReady = 1; + // ndbout_c("Thread%d is stopping", threadNo); + // In order to stop this thread, the main thread has signaled + // stStop, break out of the for loop so that destructors + // and the proper exit functions are called + break; + }//if + + tType = pThreadData->threadStart; + tSaveType = tType; + pThreadData->threadStart = stIdle; + + // Start transaction, type of transaction + // is received in the array ThreadStart + loopCountOps = tNoOfOperations; + loopCountTables = tNoOfTables; + loopCountAttributes = tNoOfAttributes; + + for (int count = 1; count < loopCountOps && tResult == 0;){ + + pTrans = pNdb->startTransaction(); + if (pTrans == NULL) { + // This is a fatal error, abort program + ndbout << "Could not start transaction in thread" << threadNo; + ndbout << endl; + ndbout << pNdb->getNdbError() << endl; + tResult = 1; // Indicate fatal error + break; // Break out of for loop + } + + // Calculate the current operation offset in the reference array + nRefLocalOpOffset = tAttributeSize*tNoOfAttributes*(count - 1) ; + + for (int countTables = 0; + countTables < loopCountTables && tResult == 0; + countTables++) { + + pOps[countTables] = pTrans->getNdbOperation(tableName[countTables]); + if (pOps[countTables] == NULL) { + // This is a fatal error, abort program + ndbout << "getNdbOperation: " << pTrans->getNdbError(); + tResult = 2; // Indicate fatal error + break; + }//if + + switch (tType) { + case stInsert: // Insert case + if (theWriteFlag == 1 && theDirtyFlag == 1) + pOps[countTables]->dirtyWrite(); + else if (theWriteFlag == 1) + pOps[countTables]->writeTuple(); + else + pOps[countTables]->insertTuple(); + break; + case stRead: // Read Case + if (theSimpleFlag == 1) + pOps[countTables]->simpleRead(); + else if (theDirtyFlag == 1) + pOps[countTables]->dirtyRead(); + else + pOps[countTables]->readTuple(); + break; + case stUpdate: // Update Case + if (theWriteFlag == 1 && theDirtyFlag == 1) + pOps[countTables]->dirtyWrite(); + else if (theWriteFlag == 1) + pOps[countTables]->writeTuple(); + else if (theDirtyFlag == 1) + pOps[countTables]->dirtyUpdate(); + else + pOps[countTables]->updateTuple(); + break; + case stDelete: // Delete Case + pOps[countTables]->deleteTuple(); + break; + case stVerify: + pOps[countTables]->readTuple(); + break; + case stVerifyDelete: + pOps[countTables]->readTuple(); + break; + default: + assert(false); + }//switch + + + if(useLongKeys){ + // Loop the equal call so the complete key is send to the kernel. + for(Uint32 i = 0; i < tNoOfLongPK; i++) + pOps[countTables]->equal(longKeyAttrName[i], + (char *)longKeyAttrValue[count - 1][i], tSizeOfLongPK*4); + } + else + pOps[countTables]->equal((char*)attrName[0], + (char*)&attrRefValue[nRefLocalOpOffset]); + + if (tType == stInsert || tType == stUpdate){ + for (int ca = 1; ca < loopCountAttributes; ca++){ + pOps[countTables]->setValue((char*)attrName[ca], + (char*)&attrRefValue[nRefLocalOpOffset + tAttributeSize*ca]); + }//for + } else if (tType == stRead || stVerify == tType) { + int nTableOffset = tAttributeSize * + loopCountAttributes * + countTables ; + for (int ca = 1; ca < loopCountAttributes; ca++) { + tTmp = pOps[countTables]->getValue((char*)attrName[ca], + (char*)&attrValue[nTableOffset + tAttributeSize*ca]); + }//for + } else if (stVerifyDelete == tType) { + if(useLongKeys){ + int nTableOffset = tAttributeSize * + loopCountAttributes * + countTables ; + tTmp = pOps[countTables]->getValue(longKeyAttrName[0], + (char*)&attrValue[nTableOffset]); + } else { + int nTableOffset = tAttributeSize * + loopCountAttributes * + countTables ; + tTmp = pOps[countTables]->getValue((char*)attrName[0], + (char*)&attrValue[nTableOffset]); + } + }//if + }//for Tables loop + + if (tResult != 0) + break; + check = pTrans->execute(Commit); + + // Decide what kind of error this is + if ((tSpecialTrans == 1) && + (check == -1)) { + // -------------------------------------------------------------------- + // A special transaction have been executed, change to check = 0 in + // certain situations. + // -------------------------------------------------------------------- + switch (tType) { + case stInsert: // Insert case + if (630 == pTrans->getNdbError().code ) { + check = 0; + ndbout << "Insert with 4007 was successful" << endl; + }//if + break; + case stDelete: // Delete Case + if (626 == pTrans->getNdbError().code ) { + check = 0; + ndbout << "Delete with 4007 was successful" << endl; + }//if + break; + default: + assert(false); + }//switch + }//if + tSpecialTrans = 0; + if (check == -1) { + if ((stVerifyDelete == tType) && + (626 == pTrans->getNdbError().code)) { + // ---------------------------------------------- + // It's good news - the deleted tuple is gone, + // so reset "check" flag + // ---------------------------------------------- + check = 0 ; + } else { + int retCode = + theErrorData.handleErrorCommon(pTrans->getNdbError()); + if (retCode == 1) { + ndbout_c("execute: %d, %d, %s", count, tType, + pTrans->getNdbError().message ); + ndbout_c("Error code = %d", pTrans->getNdbError().code ); + tResult = 20; + } else if (retCode == 2) { + ndbout << "4115 should not happen in flexBench" << endl; + tResult = 20; + } else if (retCode == 3) { + // -------------------------------------------------------------------- + // We are not certain if the transaction was successful or not. + // We must reexecute but might very well find that the transaction + // actually was updated. Updates and Reads are no problem here. Inserts + // will not cause a problem if error code 630 arrives. Deletes will + // not cause a problem if 626 arrives. + // -------------------------------------------------------------------- + if ((tType == stInsert) || (tType == stDelete)) { + tSpecialTrans = 1; + }//if + }//if + }//if + }//if + // Check if retries should be made + if (check == -1 && tResult == 0) { + if (tAttemptNo < tRetryAttempts){ + tAttemptNo++; + } else { + // -------------------------------------------------------------------- + // Too many retries have been made, report error and break out of loop + // -------------------------------------------------------------------- + ndbout << "Thread" << threadNo; + ndbout << ": too many errors reported" << endl; + tResult = 10; + break; + }//if + }//if + + if (check == 0){ + // Go to the next record + count++; + tAttemptNo = 0; +#ifdef CEBIT_STAT + // report successful ops + if (statEnable) { + statOps += loopCountTables; + if (statOps >= statFreq) { + statReport(tType, statOps); + statOps = 0; + }//if + }//if +#endif + }//if + + if (stVerify == tType && 0 == check){ + int nTableOffset = 0 ; + for (int a = 1 ; a < loopCountAttributes ; a++){ + for (int tables = 0 ; tables < loopCountTables ; tables++){ + nTableOffset = tables*loopCountAttributes*tAttributeSize ; + if (*(int*)&attrValue[nTableOffset + tAttributeSize*a] != *(int*)&attrRefValue[nRefLocalOpOffset + tAttributeSize*a]){ + ndbout << "Error in verify:" << endl ; + ndbout << "attrValue[" << nTableOffset + tAttributeSize*a << "] = " << attrValue[a] << endl ; + ndbout << "attrRefValue[" << nRefLocalOpOffset + tAttributeSize*a << "]" << attrRefValue[nRefLocalOpOffset + tAttributeSize*a] << endl ; + tResult = 11 ; + break ; + }//if + }//for + }//for + }// if(stVerify ... ) + pNdb->closeTransaction(pTrans) ; + }// operations loop +#ifdef CEBIT_STAT + // report remaining successful ops + if (statEnable) { + if (statOps > 0) { + statReport(tType, statOps); + statOps = 0; + }//if + }//if +#endif + } + delete pNdb; + free(attrValue) ; + free(attrRefValue) ; + free(pOps) ; + + if (useLongKeys == true) { + // Only free these areas if they have been allocated + // Otherwise cores will occur + for (Uint32 n = 0; n < tNoOfOperations; n++){ + for (Uint32 i = 0; i < tNoOfLongPK; i++) { + free(longKeyAttrValue[n][i]); + } + free(longKeyAttrValue[n]); + } + free(longKeyAttrValue); + } // if + + NdbThread_Exit(0); + return NULL; // Just to keep compiler happy +} + + +static int readArguments(int argc, const char** argv) +{ + + int i = 1; + while (argc > 1){ + if (strcmp(argv[i], "-t") == 0){ + tNoOfThreads = atoi(argv[i+1]); + if ((tNoOfThreads < 1)) + return -1; + argc -= 1; + i++; + }else if (strcmp(argv[i], "-o") == 0){ + tNoOfOperations = atoi(argv[i+1]); + if (tNoOfOperations < 1) + return -1;; + argc -= 1; + i++; + }else if (strcmp(argv[i], "-a") == 0){ + tNoOfAttributes = atoi(argv[i+1]); + if ((tNoOfAttributes < 2) || (tNoOfAttributes > MAXATTR)) + return -1; + argc -= 1; + i++; + }else if (strcmp(argv[i], "-lkn") == 0){ + tNoOfLongPK = atoi(argv[i+1]); + useLongKeys = true; + if ((tNoOfLongPK < 1) || (tNoOfLongPK > MAXNOLONGKEY) || + (tNoOfLongPK * tSizeOfLongPK) > MAXLONGKEYTOTALSIZE){ + ndbout << "Argument -lkn is not in the proper range." << endl; + return -1; + } + argc -= 1; + i++; + }else if (strcmp(argv[i], "-lks") == 0){ + tSizeOfLongPK = atoi(argv[i+1]); + useLongKeys = true; + if ((tSizeOfLongPK < 1) || (tNoOfLongPK * tSizeOfLongPK) > MAXLONGKEYTOTALSIZE){ + ndbout << "Argument -lks is not in the proper range 1 to " << + MAXLONGKEYTOTALSIZE << endl; + return -1; + } + argc -= 1; + i++; + }else if (strcmp(argv[i], "-c") == 0){ + tNoOfTables = atoi(argv[i+1]); + if ((tNoOfTables < 1) || (tNoOfTables > MAXTABLES)) + return -1; + argc -= 1; + i++; + }else if (strcmp(argv[i], "-stdtables") == 0){ + theStdTableNameFlag = 1; + }else if (strcmp(argv[i], "-l") == 0){ + tNoOfLoops = atoi(argv[i+1]); + if ((tNoOfLoops < 0) || (tNoOfLoops > 100000)) + return -1; + argc -= 1; + i++; + }else if (strcmp(argv[i], "-s") == 0){ + tAttributeSize = atoi(argv[i+1]); + if ((tAttributeSize < 1) || (tAttributeSize > MAXATTRSIZE)) + return -1; + argc -= 1; + i++; + }else if (strcmp(argv[i], "-sleep") == 0){ + tSleepTime = atoi(argv[i+1]); + if ((tSleepTime < 1) || (tSleepTime > 3600)) + return -1; + argc -= 1; + i++; + }else if (strcmp(argv[i], "-simple") == 0){ + theSimpleFlag = 1; + }else if (strcmp(argv[i], "-write") == 0){ + theWriteFlag = 1; + }else if (strcmp(argv[i], "-dirty") == 0){ + theDirtyFlag = 1; + }else if (strcmp(argv[i], "-no_table_create") == 0){ + theTableCreateFlag = 1; + }else if (strcmp(argv[i], "-temp") == 0){ + theTempTable = true; + }else if (strcmp(argv[i], "-noverify") == 0){ + VerifyFlag = false ; + }else if (theErrorData.parseCmdLineArg(argv, i) == true){ + ; //empty, updated in errorArg(..) + }else if (strcmp(argv[i], "-verify") == 0){ + VerifyFlag = true ; +#ifdef CEBIT_STAT + }else if (strcmp(argv[i], "-statserv") == 0){ + if (! (argc > 2)) + return -1; + const char *p = argv[i+1]; + const char *q = strrchr(p, ':'); + if (q == 0) + return -1; + snprintf(statHost, sizeof(statHost), "%.*s", q-p, p); + statPort = atoi(q+1); + statEnable = true; + argc -= 1; + i++; + }else if (strcmp(argv[i], "-statfreq") == 0){ + if (! (argc > 2)) + return -1; + statFreq = atoi(argv[i+1]); + if (statFreq < 1) + return -1; + argc -= 1; + i++; +#endif + }else{ + return -1; + } + argc -= 1; + i++; + } + return 0; +} + +static void sleepBeforeStartingTest(int seconds){ + if (seconds > 0){ + ndbout << "Sleeping(" <<seconds << ")..."; + NdbSleep_SecSleep(seconds); + ndbout << " done!" << endl; + } +} + + +static int +createTables(Ndb* pMyNdb){ + for (Uint32 i = 0; i < tNoOfAttributes; i++){ + snprintf(attrName[i], MAXSTRLEN, "COL%d", i); + } + + // Note! Uses only uppercase letters in table name's + // so that we can look at the tables with SQL + for (Uint32 i = 0; i < tNoOfTables; i++){ + if (theStdTableNameFlag == 0){ + snprintf(tableName[i], MAXSTRLEN, "TAB%d_%d", i, + (int)(NdbTick_CurrentMillisecond() / 1000)); + } else { + snprintf(tableName[i], MAXSTRLEN, "TAB%d", i); + } + } + + for(unsigned i = 0; i < tNoOfTables; i++){ + ndbout << "Creating " << tableName[i] << "... "; + + NdbDictionary::Table tmpTable(tableName[i]); + + tmpTable.setStoredTable(!theTempTable); + + if(useLongKeys){ + for(Uint32 i = 0; i < tNoOfLongPK; i++) { + NdbDictionary::Column col(longKeyAttrName[i]); + col.setType(NdbDictionary::Column::Unsigned); + col.setLength(tSizeOfLongPK); + col.setPrimaryKey(true); + tmpTable.addColumn(col); + } + } else { + NdbDictionary::Column col(attrName[0]); + col.setType(NdbDictionary::Column::Unsigned); + col.setLength(1); + col.setPrimaryKey(true); + tmpTable.addColumn(col); + } + + + NdbDictionary::Column col; + col.setType(NdbDictionary::Column::Unsigned); + col.setLength(tAttributeSize); + for (unsigned j = 1; j < tNoOfAttributes; j++){ + col.setName(attrName[j]); + tmpTable.addColumn(col); + } + + if(pMyNdb->getDictionary()->createTable(tmpTable) == -1){ + return -1; + } + ndbout << "done" << endl; + } + + return 0; +} + + +static void input_error(){ + ndbout << endl << "Invalid argument!" << endl; + ndbout << endl << "Arguments:" << endl; + ndbout << " -t Number of threads to start, default 1" << endl; + ndbout << " -o Number of operations per loop, default 500" << endl; + ndbout << " -l Number of loops to run, default 1, 0=infinite" << endl; + ndbout << " -a Number of attributes, default 25" << endl; + ndbout << " -c Number of tables, default 1" << endl; + ndbout << " -s Size of each attribute, default 1 (Primary Key is always of size 1," << endl; + ndbout << " independent of this value)" << endl; + ndbout << " -lkn Number of long primary keys, default 1" << endl; + ndbout << " -lks Size of each long primary key, default 1" << endl; + + ndbout << " -simple Use simple read to read from database" << endl; + ndbout << " -dirty Use dirty read to read from database" << endl; + ndbout << " -write Use writeTuple in insert and update" << endl; + ndbout << " -stdtables Use standard table names" << endl; + ndbout << " -no_table_create Don't create tables in db" << endl; + ndbout << " -sleep Sleep a number of seconds before running the test, this" << endl; + ndbout << " can be used so that another flexBench have time to create tables" << endl; + ndbout << " -temp Use tables without logging" << endl; + ndbout << " -verify Verify inserts, updates and deletes" << endl ; + theErrorData.printCmdLineArgs(ndbout); + ndbout << endl <<"Returns:" << endl; + ndbout << "\t 0 - Test passed" << endl; + ndbout << "\t 1 - Test failed" << endl; + ndbout << "\t 2 - Invalid arguments" << endl << endl; +} + +// vim: set sw=2: |