summaryrefslogtreecommitdiff
path: root/storage/ndb/test/src/HugoTransactions.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'storage/ndb/test/src/HugoTransactions.cpp')
-rw-r--r--storage/ndb/test/src/HugoTransactions.cpp229
1 files changed, 130 insertions, 99 deletions
diff --git a/storage/ndb/test/src/HugoTransactions.cpp b/storage/ndb/test/src/HugoTransactions.cpp
index 3a1600815e0..0e5f7cd8115 100644
--- a/storage/ndb/test/src/HugoTransactions.cpp
+++ b/storage/ndb/test/src/HugoTransactions.cpp
@@ -341,50 +341,14 @@ HugoTransactions::scanReadRecords(Ndb* pNdb,
int
HugoTransactions::scanUpdateRecords(Ndb* pNdb,
- int records,
- int abortPercent,
- int parallelism){
- if(m_defaultScanUpdateMethod == 1){
- return scanUpdateRecords1(pNdb, records, abortPercent, parallelism);
- } else if(m_defaultScanUpdateMethod == 2){
- return scanUpdateRecords2(pNdb, records, abortPercent, parallelism);
- } else {
- return scanUpdateRecords3(pNdb, records, abortPercent, parallelism);
- }
-}
-
-// Scan all records exclusive and update
-// them one by one
-int
-HugoTransactions::scanUpdateRecords1(Ndb* pNdb,
- int records,
- int abortPercent,
- int parallelism){
- return scanUpdateRecords3(pNdb, records, abortPercent, 1);
-}
-
-// Scan all records exclusive and update
-// them batched by asking nextScanResult to
-// give us all cached records before fetching new
-// records from db
-int
-HugoTransactions::scanUpdateRecords2(Ndb* pNdb,
- int records,
- int abortPercent,
- int parallelism){
- return scanUpdateRecords3(pNdb, records, abortPercent, parallelism);
-}
-
-int
-HugoTransactions::scanUpdateRecords3(Ndb* pNdb,
- int records,
- int abortPercent,
- int parallelism){
- int retryAttempt = 0;
+ NdbScanOperation::ScanFlag flags,
+ int records,
+ int abortPercent,
+ int parallelism){
+ int retryAttempt = 0;
int check, a;
NdbScanOperation *pOp;
-
while (true){
restart:
if (retryAttempt++ >= m_retryMax){
@@ -411,8 +375,9 @@ restart:
return NDBT_FAILED;
}
- if( pOp->readTuplesExclusive(parallelism) ) {
- ERR(pTrans->getNdbError());
+ if( pOp->readTuples(NdbOperation::LM_Exclusive, flags,
+ parallelism))
+ {
closeTransaction(pNdb);
return NDBT_FAILED;
}
@@ -429,15 +394,18 @@ restart:
check = pTrans->execute(NoCommit, AbortOnError);
if( check == -1 ) {
const NdbError err = pTrans->getNdbError();
- ERR(err);
- closeTransaction(pNdb);
if (err.status == NdbError::TemporaryError){
+ ERR(err);
+ closeTransaction(pNdb);
NdbSleep_MilliSleep(50);
+ retryAttempt++;
continue;
}
+ ERR(err);
+ closeTransaction(pNdb);
return NDBT_FAILED;
}
-
+
// Abort after 1-100 or 1-records rows
int ranVal = rand();
int abortCount = ranVal % (records == 0 ? 100 : records);
@@ -448,75 +416,114 @@ restart:
abortTrans = true;
}
+ int eof;
int rows = 0;
- while((check = pOp->nextResult(true)) == 0){
- do {
- rows++;
- NdbOperation* pUp = pOp->updateCurrentTuple();
- if(pUp == 0){
+ while((eof = pOp->nextResult(true)) == 0){
+ rows++;
+ if (calc.verifyRowValues(&row) != 0){
+ closeTransaction(pNdb);
+ return NDBT_FAILED;
+ }
+
+ if (abortCount == rows && abortTrans == true){
+ ndbout << "Scan is aborted" << endl;
+ g_info << "Scan is aborted" << endl;
+ pOp->close();
+ if( check == -1 ) {
ERR(pTrans->getNdbError());
closeTransaction(pNdb);
return NDBT_FAILED;
}
- const int updates = calc.getUpdatesValue(&row) + 1;
- const int r = calc.getIdValue(&row);
- for(a = 0; a<tab.getNoOfColumns(); a++){
- if (tab.getColumn(a)->getPrimaryKey() == false){
- if(setValueForAttr(pUp, a, r, updates ) != 0){
- ERR(pTrans->getNdbError());
- closeTransaction(pNdb);
- return NDBT_FAILED;
- }
- }
- }
-
- if (rows == abortCount && abortTrans == true){
- g_info << "Scan is aborted" << endl;
- // This scan should be aborted
- closeTransaction(pNdb);
- return NDBT_OK;
- }
- } while((check = pOp->nextResult(false)) == 0);
-
- if(check != -1){
- check = pTrans->execute(Commit, AbortOnError);
- if(check != -1)
- m_latest_gci = pTrans->getGCI();
- pTrans->restart();
- }
-
- const NdbError err = pTrans->getNdbError();
- if( check == -1 ) {
+
closeTransaction(pNdb);
- ERR(err);
- if (err.status == NdbError::TemporaryError){
- NdbSleep_MilliSleep(50);
- goto restart;
- }
- return NDBT_FAILED;
+ return NDBT_OK;
}
}
-
- const NdbError err = pTrans->getNdbError();
- if( check == -1 ) {
- closeTransaction(pNdb);
- ERR(err);
+ if (eof == -1) {
+ const NdbError err = pTrans->getNdbError();
+
if (err.status == NdbError::TemporaryError){
+ ERR_INFO(err);
+ closeTransaction(pNdb);
NdbSleep_MilliSleep(50);
- goto restart;
+ switch (err.code){
+ case 488:
+ case 245:
+ case 490:
+ // Too many active scans, no limit on number of retry attempts
+ break;
+ default:
+ retryAttempt++;
+ }
+ continue;
}
+ ERR(err);
+ closeTransaction(pNdb);
return NDBT_FAILED;
}
-
+
closeTransaction(pNdb);
+
+ g_info << rows << " rows have been read" << endl;
+ if (records != 0 && rows != records){
+ g_err << "Check expected number of records failed" << endl
+ << " expected=" << records <<", " << endl
+ << " read=" << rows << endl;
+ return NDBT_FAILED;
+ }
- g_info << rows << " rows have been updated" << endl;
return NDBT_OK;
}
return NDBT_FAILED;
}
int
+HugoTransactions::scanUpdateRecords(Ndb* pNdb,
+ int records,
+ int abortPercent,
+ int parallelism){
+
+ return scanUpdateRecords(pNdb,
+ (NdbScanOperation::ScanFlag)0,
+ records, abortPercent, parallelism);
+}
+
+// Scan all records exclusive and update
+// them one by one
+int
+HugoTransactions::scanUpdateRecords1(Ndb* pNdb,
+ int records,
+ int abortPercent,
+ int parallelism){
+ return scanUpdateRecords(pNdb,
+ (NdbScanOperation::ScanFlag)0,
+ records, abortPercent, 1);
+}
+
+// Scan all records exclusive and update
+// them batched by asking nextScanResult to
+// give us all cached records before fetching new
+// records from db
+int
+HugoTransactions::scanUpdateRecords2(Ndb* pNdb,
+ int records,
+ int abortPercent,
+ int parallelism){
+ return scanUpdateRecords(pNdb, (NdbScanOperation::ScanFlag)0,
+ records, abortPercent, parallelism);
+}
+
+int
+HugoTransactions::scanUpdateRecords3(Ndb* pNdb,
+ int records,
+ int abortPercent,
+ int parallelism)
+{
+ return scanUpdateRecords(pNdb, (NdbScanOperation::ScanFlag)0,
+ records, abortPercent, parallelism);
+}
+
+int
HugoTransactions::loadTable(Ndb* pNdb,
int records,
int batch,
@@ -524,7 +531,22 @@ HugoTransactions::loadTable(Ndb* pNdb,
int doSleep,
bool oneTrans,
int value,
- bool abort){
+ bool abort)
+{
+ return loadTableStartFrom(pNdb, 0, records, batch, allowConstraintViolation,
+ doSleep, oneTrans, value, abort);
+}
+
+int
+HugoTransactions::loadTableStartFrom(Ndb* pNdb,
+ int startFrom,
+ int records,
+ int batch,
+ bool allowConstraintViolation,
+ int doSleep,
+ bool oneTrans,
+ int value,
+ bool abort){
int check;
int retryAttempt = 0;
int retryMax = 5;
@@ -543,8 +565,9 @@ HugoTransactions::loadTable(Ndb* pNdb,
<< " -> rows/commit = " << batch << endl;
}
+ Uint32 orgbatch = batch;
g_info << "|- Inserting records..." << endl;
- for (int c=0 ; c<records ; ){
+ for (int c=0 ; c<records; ){
bool closeTrans = true;
if(c + batch > records)
@@ -578,7 +601,7 @@ HugoTransactions::loadTable(Ndb* pNdb,
}
}
- if(pkInsertRecord(pNdb, c, batch, value) != NDBT_OK)
+ if(pkInsertRecord(pNdb, c + startFrom, batch, value) != NDBT_OK)
{
ERR(pTrans->getNdbError());
closeTransaction(pNdb);
@@ -625,6 +648,7 @@ HugoTransactions::loadTable(Ndb* pNdb,
ERR(err);
NdbSleep_MilliSleep(50);
retryAttempt++;
+ batch = 1;
continue;
break;
@@ -670,7 +694,14 @@ HugoTransactions::loadTable(Ndb* pNdb,
int
HugoTransactions::fillTable(Ndb* pNdb,
- int batch){
+ int batch){
+ return fillTableStartFrom(pNdb, 0, batch);
+}
+
+int
+HugoTransactions::fillTableStartFrom(Ndb* pNdb,
+ int startFrom,
+ int batch){
int check;
int retryAttempt = 0;
int retryMax = 5;
@@ -688,7 +719,7 @@ HugoTransactions::fillTable(Ndb* pNdb,
<< " -> rows/commit = " << batch << endl;
}
- for (int c=0 ; ; ){
+ for (int c=startFrom ; ; ){
if (retryAttempt >= retryMax){
g_info << "Record " << c << " could not be inserted, has retried "