summaryrefslogtreecommitdiff
path: root/ndb/test/src/HugoTransactions.cpp
diff options
context:
space:
mode:
authorunknown <magnus@neptunus.(none)>2004-04-14 10:53:21 +0200
committerunknown <magnus@neptunus.(none)>2004-04-14 10:53:21 +0200
commit6386c55cee50bad6a9979d1fab28e03bb8612ca7 (patch)
tree3fbbacf704304b69228474b9f03549ccd585a017 /ndb/test/src/HugoTransactions.cpp
parent0ba6cb48d84f1ff951d09871a96be6cdef3f2c3c (diff)
downloadmariadb-git-6386c55cee50bad6a9979d1fab28e03bb8612ca7.tar.gz
Initial revision of NDB Cluster files
BitKeeper/etc/logging_ok: Logging to logging@openlogging.org accepted
Diffstat (limited to 'ndb/test/src/HugoTransactions.cpp')
-rw-r--r--ndb/test/src/HugoTransactions.cpp2404
1 files changed, 2404 insertions, 0 deletions
diff --git a/ndb/test/src/HugoTransactions.cpp b/ndb/test/src/HugoTransactions.cpp
new file mode 100644
index 00000000000..b1c55fcc780
--- /dev/null
+++ b/ndb/test/src/HugoTransactions.cpp
@@ -0,0 +1,2404 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#include "HugoTransactions.hpp"
+#include <NdbSleep.h>
+
+
+HugoTransactions::HugoTransactions(const NdbDictionary::Table& _tab):
+ HugoOperations(_tab),
+ row(_tab){
+
+ m_defaultScanUpdateMethod = 3;
+}
+
+HugoTransactions::~HugoTransactions(){
+ deallocRows();
+}
+
+
+int HugoTransactions::scanReadCommittedRecords(Ndb* pNdb,
+ int records,
+ int abortPercent,
+ int parallelism){
+ return scanReadRecords(pNdb, records, abortPercent, parallelism, true);
+}
+
+int
+HugoTransactions::scanReadRecords(Ndb* pNdb,
+ int records,
+ int abortPercent,
+ int parallelism,
+ bool committed){
+
+ int retryAttempt = 0;
+ const int retryMax = 100;
+ int check;
+ NdbConnection *pTrans;
+ NdbOperation *pOp;
+
+ while (true){
+
+ if (retryAttempt >= retryMax){
+ g_err << "ERROR: has retried this operation " << retryAttempt
+ << " times, failing!" << endl;
+ return NDBT_FAILED;
+ }
+
+ pTrans = pNdb->startTransaction();
+ if (pTrans == NULL) {
+ const NdbError err = pNdb->getNdbError();
+
+ if (err.status == NdbError::TemporaryError){
+ ERR(err);
+ NdbSleep_MilliSleep(50);
+ retryAttempt++;
+ continue;
+ }
+ ERR(err);
+ return NDBT_FAILED;
+ }
+
+ pOp = pTrans->getNdbOperation(tab.getName());
+ if (pOp == NULL) {
+ ERR(pTrans->getNdbError());
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+
+ if (committed == true)
+ check = pOp->openScanReadCommitted(parallelism);
+ else
+ check = pOp->openScanRead(parallelism);
+
+ if( check == -1 ) {
+ ERR(pTrans->getNdbError());
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+
+ check = pOp->interpret_exit_ok();
+ if( check == -1 ) {
+ ERR(pTrans->getNdbError());
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+
+ for(int a = 0; a<tab.getNoOfColumns(); a++){
+ if((row.attributeStore(a) =
+ pOp->getValue(tab.getColumn(a)->getName())) == 0) {
+ ERR(pTrans->getNdbError());
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+ }
+
+ check = pTrans->executeScan();
+ if( check == -1 ) {
+ const NdbError err = pTrans->getNdbError();
+ if (err.status == NdbError::TemporaryError){
+ ERR(err);
+ pNdb->closeTransaction(pTrans);
+ NdbSleep_MilliSleep(50);
+ retryAttempt++;
+ continue;
+ }
+ ERR(err);
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+
+ // Abort after 1-100 or 1-records rows
+ int ranVal = rand();
+ int abortCount = ranVal % (records == 0 ? 100 : records);
+ bool abortTrans = false;
+ if (abort > 0){
+ // Abort if abortCount is less then abortPercent
+ if (abortCount < abortPercent)
+ abortTrans = true;
+ }
+
+ int eof;
+ int rows = 0;
+ eof = pTrans->nextScanResult();
+
+ while(eof == 0){
+ rows++;
+ if (calc.verifyRowValues(&row) != 0){
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+
+ if (abortCount == rows && abortTrans == true){
+ ndbout << "Scan is aborted" << endl;
+ g_info << "Scan is aborted" << endl;
+ check = pTrans->stopScan();
+ if( check == -1 ) {
+ ERR(pTrans->getNdbError());
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+
+ pNdb->closeTransaction(pTrans);
+ return NDBT_OK;
+ }
+
+ eof = pTrans->nextScanResult();
+ }
+ if (eof == -1) {
+ const NdbError err = pTrans->getNdbError();
+
+ if (err.status == NdbError::TemporaryError){
+ ERR_INFO(err);
+ pNdb->closeTransaction(pTrans);
+ NdbSleep_MilliSleep(50);
+ switch (err.code){
+ case 488:
+ case 245:
+ case 490:
+ // Too many active scans, no limit on number of retry attempts
+ break;
+ default:
+ retryAttempt++;
+ }
+ continue;
+ }
+ ERR(err);
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+
+ pNdb->closeTransaction(pTrans);
+
+ g_info << rows << " rows have been read" << endl;
+ if (records != 0 && rows != records){
+ g_err << "Check expected number of records failed" << endl
+ << " expected=" << records <<", " << endl
+ << " read=" << rows << endl;
+ return NDBT_FAILED;
+ }
+
+ return NDBT_OK;
+ }
+ return NDBT_FAILED;
+}
+
+
+#define RESTART_SCAN 99
+
+// Take over one record from pOrgOp and update it
+int
+HugoTransactions::takeOverAndUpdateRecord(Ndb* pNdb,
+ NdbOperation* pOrgOp){
+ int retryAttempt = 0;
+ const int retryMax = 10;
+ int check;
+ NdbConnection *pUpdTrans;
+ NdbOperation *pUpdOp;
+
+ while (true){
+
+ if (retryAttempt >= retryMax){
+ g_info << "ERROR: has retried this operation " << retryAttempt
+ << " times, failing!" << endl;
+ return NDBT_FAILED;
+ }
+
+ pUpdTrans = pNdb->startTransaction();
+ if (pUpdTrans == NULL) {
+ const NdbError err = pNdb->getNdbError();
+
+ if (err.status == NdbError::TemporaryError){
+ ERR(err);
+ NdbSleep_MilliSleep(50);
+ retryAttempt++;
+ continue;
+ }
+ ERR(err);
+ return NDBT_FAILED;
+ }
+
+ if ((pUpdOp = pOrgOp->takeOverForUpdate(pUpdTrans)) == NULL){
+ ERR(pNdb->getNdbError());
+ return NDBT_FAILED;
+ }
+
+ int updates = calc.getUpdatesValue(&row) + 1;
+ int id = calc.getIdValue(&row);
+
+ // Set a calculated value for each non-PK attribute in this table
+ for (int a = 0; a<tab.getNoOfColumns(); a++){
+ if (tab.getColumn(a)->getPrimaryKey() == false){
+ if(setValueForAttr(pUpdOp, a, id, updates ) != 0){
+ ERR(pUpdTrans->getNdbError());
+ pNdb->closeTransaction(pUpdTrans);
+ return NDBT_FAILED;
+ }
+ }
+ }
+ check = pUpdTrans->execute( Commit );
+ if(check == -1 ) {
+ const NdbError err = pUpdTrans->getNdbError();
+ pNdb->closeTransaction(pUpdTrans);
+
+ ERR(err);
+ if(err.code == 499 || err.code == 250){
+ return RESTART_SCAN;
+ }
+
+ switch(err.status){
+ case NdbError::Success:
+ g_info << "ERROR: NdbError reports success when transcaction failed"
+ << endl;
+ return NDBT_FAILED;
+ break;
+
+ case NdbError::TemporaryError:
+ NdbSleep_MilliSleep(50+50*retryAttempt);
+ retryAttempt++;
+ continue;
+ break;
+
+ case NdbError::UnknownResult:
+ return NDBT_FAILED;
+ break;
+
+ default:
+ case NdbError::PermanentError:
+ switch (err.code){
+ case 499:
+ case 250:
+ return NDBT_TEMPORARY;
+
+ default:
+ return NDBT_FAILED;
+ break;
+ }
+ break;
+ }
+ }
+ else{
+ pNdb->closeTransaction(pUpdTrans);
+ }
+
+ return NDBT_OK;
+ }
+ return NDBT_FAILED;
+}
+
+int
+HugoTransactions::scanUpdateRecords(Ndb* pNdb,
+ int records,
+ int abortPercent,
+ int parallelism){
+ if(m_defaultScanUpdateMethod == 1){
+ return scanUpdateRecords1(pNdb, records, abortPercent, parallelism);
+ } else if(m_defaultScanUpdateMethod == 2){
+ return scanUpdateRecords2(pNdb, records, abortPercent, parallelism);
+ } else {
+ return scanUpdateRecords3(pNdb, records, abortPercent, parallelism);
+ }
+}
+
+// Scan all records exclusive and update
+// them one by one
+int
+HugoTransactions::scanUpdateRecords1(Ndb* pNdb,
+ int records,
+ int abortPercent,
+ int parallelism){
+ int retryAttempt = 0;
+ const int retryMax = 100;
+ int check;
+ NdbConnection *pTrans;
+ NdbOperation *pOp;
+
+
+ while (true){
+
+ if (retryAttempt >= retryMax){
+ g_info << "ERROR: has retried this operation " << retryAttempt
+ << " times, failing!" << endl;
+ return NDBT_FAILED;
+ }
+
+ pTrans = pNdb->startTransaction();
+ if (pTrans == NULL) {
+ const NdbError err = pNdb->getNdbError();
+
+ if (err.status == NdbError::TemporaryError){
+ ERR(err);
+ NdbSleep_MilliSleep(50);
+ retryAttempt++;
+ continue;
+ }
+ ERR(err);
+ return NDBT_FAILED;
+ }
+
+ pOp = pTrans->getNdbOperation(tab.getName());
+ if (pOp == NULL) {
+ ERR(pTrans->getNdbError());
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+
+ check = pOp->openScanExclusive(parallelism);
+ if( check == -1 ) {
+ ERR(pTrans->getNdbError());
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+
+ check = pOp->interpret_exit_ok();
+ if( check == -1 ) {
+ ERR(pTrans->getNdbError());
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+
+ // Read all attributes from this table
+ for(int a=0; a<tab.getNoOfColumns(); a++){
+ if((row.attributeStore(a) = pOp->getValue(tab.getColumn(a)->getName())) == NULL){
+ ERR(pTrans->getNdbError());
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+ }
+
+ check = pTrans->executeScan();
+ if( check == -1 ) {
+ const NdbError err = pTrans->getNdbError();
+ if (err.status == NdbError::TemporaryError){
+ ERR(err);
+ pNdb->closeTransaction(pTrans);
+ NdbSleep_MilliSleep(50);
+ retryAttempt++;
+ continue;
+ }
+ ERR(err);
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+
+ // Abort after 1-100 or 1-records rows
+ int ranVal = rand();
+ int abortCount = ranVal % (records == 0 ? 100 : records);
+ bool abortTrans = false;
+ if (abort > 0){
+ // Abort if abortCount is less then abortPercent
+ if (abortCount < abortPercent)
+ abortTrans = true;
+ }
+
+
+ int eof;
+ int rows = 0;
+
+ eof = pTrans->nextScanResult();
+ while(eof == 0){
+ rows++;
+
+ if (abortCount == rows && abortTrans == true){
+ g_info << "Scan is aborted" << endl;
+ // This scan should be aborted
+ check = pTrans->stopScan();
+ if( check == -1 ) {
+ ERR(pTrans->getNdbError());
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+
+ pNdb->closeTransaction(pTrans);
+ return NDBT_OK;
+ }
+ int res = takeOverAndUpdateRecord(pNdb, pOp);
+ if(res == RESTART_SCAN){
+ eof = -2;
+ continue;
+ }
+ if (res != 0){
+ pNdb->closeTransaction(pTrans);
+ return res;
+ }
+
+ eof = pTrans->nextScanResult();
+ }
+ if (eof == -1) {
+ const NdbError err = pTrans->getNdbError();
+
+ if (err.status == NdbError::TemporaryError){
+ ERR(err);
+ NdbSleep_MilliSleep(50);
+ switch (err.code){
+ case 488:
+ case 245:
+ case 490:
+ // Too many active scans, no limit on number of retry attempts
+ break;
+ default:
+ retryAttempt++;
+ }
+ continue;
+ }
+ ERR(err);
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+
+ if(eof == -2){
+ pNdb->closeTransaction(pTrans);
+ NdbSleep_MilliSleep(50);
+ retryAttempt++;
+ continue;
+ }
+
+ pNdb->closeTransaction(pTrans);
+
+ g_info << rows << " rows have been updated" << endl;
+ return NDBT_OK;
+ }
+ return NDBT_FAILED;
+}
+
+
+// Scan all records exclusive and update
+// them batched by asking nextScanResult to
+// give us all cached records before fetching new
+// records from db
+int
+HugoTransactions::scanUpdateRecords2(Ndb* pNdb,
+ int records,
+ int abortPercent,
+ int parallelism){
+ int retryAttempt = 0;
+ const int retryMax = 100;
+ int check;
+ NdbConnection *pTrans;
+ NdbOperation *pOp;
+
+
+ while (true){
+
+ if (retryAttempt >= retryMax){
+ g_info << "ERROR: has retried this operation " << retryAttempt
+ << " times, failing!" << endl;
+ return NDBT_FAILED;
+ }
+
+ pTrans = pNdb->startTransaction();
+ if (pTrans == NULL) {
+ const NdbError err = pNdb->getNdbError();
+
+ if (err.status == NdbError::TemporaryError){
+ ERR(err);
+ NdbSleep_MilliSleep(50);
+ retryAttempt++;
+ continue;
+ }
+ ERR(err);
+ return NDBT_FAILED;
+ }
+
+ pOp = pTrans->getNdbOperation(tab.getName());
+ if (pOp == NULL) {
+ ERR(pTrans->getNdbError());
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+
+ check = pOp->openScanExclusive(parallelism);
+ if( check == -1 ) {
+ ERR(pTrans->getNdbError());
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+
+ check = pOp->interpret_exit_ok();
+ if( check == -1 ) {
+ ERR(pTrans->getNdbError());
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+
+ // Read all attributes from this table
+ for(int a=0; a<tab.getNoOfColumns(); a++){
+ if((row.attributeStore(a) = pOp->getValue(tab.getColumn(a)->getName())) == NULL){
+ ERR(pTrans->getNdbError());
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+ }
+
+ check = pTrans->executeScan();
+ if( check == -1 ) {
+ const NdbError err = pTrans->getNdbError();
+ if (err.status == NdbError::TemporaryError){
+ ERR(err);
+ pNdb->closeTransaction(pTrans);
+ NdbSleep_MilliSleep(50);
+ retryAttempt++;
+ continue;
+ }
+ ERR(err);
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+
+ // Abort after 1-100 or 1-records rows
+ int ranVal = rand();
+ int abortCount = ranVal % (records == 0 ? 100 : records);
+ bool abortTrans = false;
+ if (abort > 0){
+ // Abort if abortCount is less then abortPercent
+ if (abortCount < abortPercent)
+ abortTrans = true;
+ }
+
+ int eof;
+ int rows = 0;
+ NdbConnection* pUpTrans;
+
+ while((eof = pTrans->nextScanResult(true)) == 0){
+ pUpTrans = pNdb->startTransaction();
+ if (pUpTrans == NULL) {
+ const NdbError err = pNdb->getNdbError();
+
+ if (err.status == NdbError::TemporaryError){
+ ERR(err);
+ NdbSleep_MilliSleep(50);
+ retryAttempt++;
+ continue;
+ }
+ ERR(err);
+ return NDBT_FAILED;
+ }
+ do {
+ rows++;
+ if (addRowToUpdate(pNdb, pUpTrans, pOp) != 0){
+ pNdb->closeTransaction(pUpTrans);
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+ } while((eof = pTrans->nextScanResult(false)) == 0);
+
+ if (abortCount == rows && abortTrans == true){
+ g_info << "Scan is aborted" << endl;
+ // This scan should be aborted
+ check = pTrans->stopScan();
+ if( check == -1 ) {
+ ERR(pTrans->getNdbError());
+ pNdb->closeTransaction(pTrans);
+ pNdb->closeTransaction(pUpTrans);
+ return NDBT_FAILED;
+ }
+
+ pNdb->closeTransaction(pTrans);
+ pNdb->closeTransaction(pUpTrans);
+ return NDBT_OK;
+ }
+
+ check = pUpTrans->execute(Commit);
+ if( check == -1 ) {
+ const NdbError err = pUpTrans->getNdbError();
+ ERR(err);
+ pNdb->closeTransaction(pUpTrans);
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+ pNdb->closeTransaction(pUpTrans);
+ }
+ if (eof == -1) {
+ const NdbError err = pTrans->getNdbError();
+
+ if (err.status == NdbError::TemporaryError){
+ ERR(err);
+ pNdb->closeTransaction(pTrans);
+ NdbSleep_MilliSleep(50);
+ retryAttempt++;
+ continue;
+ }
+ ERR(err);
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+
+ pNdb->closeTransaction(pTrans);
+
+ g_info << rows << " rows have been updated" << endl;
+ return NDBT_OK;
+ }
+ return NDBT_FAILED;
+}
+
+int
+HugoTransactions::addRowToUpdate(Ndb* pNdb,
+ NdbConnection* pUpdTrans,
+ NdbOperation* pOrgOp){
+
+ int updates = calc.getUpdatesValue(&row) + 1;
+ int r = calc.getIdValue(&row);
+
+ NdbOperation* pUpdOp = pOrgOp->takeOverForUpdate(pUpdTrans);
+ if (pUpdOp == NULL){
+ ERR(pNdb->getNdbError());
+ return NDBT_FAILED;
+ }
+
+ for(int a = 0; a<tab.getNoOfColumns(); a++){
+ if (tab.getColumn(a)->getPrimaryKey() == false){
+ if(setValueForAttr(pUpdOp, a, r, updates ) != 0){
+ ERR(pUpdTrans->getNdbError());
+ pNdb->closeTransaction(pUpdTrans);
+ return NDBT_FAILED;
+ }
+ }
+ }
+ return NDBT_OK;
+}
+
+
+int
+HugoTransactions::scanUpdateRecords3(Ndb* pNdb,
+ int records,
+ int abortPercent,
+ int parallelism){
+ int retryAttempt = 0;
+ const int retryMax = 100;
+ int check;
+ NdbConnection *pTrans;
+ NdbScanOperation *pOp;
+
+
+ while (true){
+ restart:
+ if (retryAttempt++ >= retryMax){
+ g_info << "ERROR: has retried this operation " << retryAttempt
+ << " times, failing!" << endl;
+ return NDBT_FAILED;
+ }
+
+ pTrans = pNdb->startTransaction();
+ if (pTrans == NULL) {
+ const NdbError err = pNdb->getNdbError();
+ ERR(err);
+ if (err.status == NdbError::TemporaryError){
+ NdbSleep_MilliSleep(50);
+ continue;
+ }
+ return NDBT_FAILED;
+ }
+
+ pOp = pTrans->getNdbScanOperation(tab.getName());
+ if (pOp == NULL) {
+ ERR(pTrans->getNdbError());
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+
+ NdbResultSet *rs = pOp->readTuplesExclusive(parallelism);
+ if( rs == 0 ) {
+ ERR(pTrans->getNdbError());
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+
+ // Read all attributes from this table
+ for(int a=0; a<tab.getNoOfColumns(); a++){
+ if((row.attributeStore(a) = pOp->getValue(tab.getColumn(a)->getName())) == NULL){
+ ERR(pTrans->getNdbError());
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+ }
+
+ check = pTrans->execute(NoCommit);
+ if( check == -1 ) {
+ const NdbError err = pTrans->getNdbError();
+ ERR(err);
+ pNdb->closeTransaction(pTrans);
+ if (err.status == NdbError::TemporaryError){
+ NdbSleep_MilliSleep(50);
+ continue;
+ }
+ return NDBT_FAILED;
+ }
+
+ // Abort after 1-100 or 1-records rows
+ int ranVal = rand();
+ int abortCount = ranVal % (records == 0 ? 100 : records);
+ bool abortTrans = false;
+ if (abort > 0){
+ // Abort if abortCount is less then abortPercent
+ if (abortCount < abortPercent)
+ abortTrans = true;
+ }
+
+ int rows = 0;
+ while((check = rs->nextResult(true)) == 0){
+ do {
+ rows++;
+ NdbOperation* pUp = rs->updateTuple();
+ if(pUp == 0){
+ ERR(pTrans->getNdbError());
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+ const int updates = calc.getUpdatesValue(&row) + 1;
+ const int r = calc.getIdValue(&row);
+
+ for(int a = 0; a<tab.getNoOfColumns(); a++){
+ if (tab.getColumn(a)->getPrimaryKey() == false){
+ if(setValueForAttr(pUp, a, r, updates ) != 0){
+ ERR(pTrans->getNdbError());
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+ }
+ }
+
+ if (rows == abortCount && abortTrans == true){
+ g_info << "Scan is aborted" << endl;
+ // This scan should be aborted
+ pNdb->closeTransaction(pTrans);
+ return NDBT_OK;
+ }
+ } while((check = rs->nextResult(false)) == 0);
+
+ if(check != -1){
+ check = pTrans->execute(Commit);
+ pTrans->releaseCompletedOperations();
+ }
+
+ const NdbError err = pTrans->getNdbError();
+ if( check == -1 ) {
+ pNdb->closeTransaction(pTrans);
+ ERR(err);
+ if (err.status == NdbError::TemporaryError){
+ NdbSleep_MilliSleep(50);
+ goto restart;
+ }
+ return NDBT_FAILED;
+ }
+ }
+ pNdb->closeTransaction(pTrans);
+
+ g_info << rows << " rows have been updated" << endl;
+ return NDBT_OK;
+ }
+ return NDBT_FAILED;
+}
+
+int
+HugoTransactions::loadTable(Ndb* pNdb,
+ int records,
+ int batch,
+ bool allowConstraintViolation,
+ int doSleep){
+ int check;
+ int retryAttempt = 0;
+ int retryMax = 5;
+ NdbConnection *pTrans;
+ NdbOperation *pOp;
+
+ const int org = batch;
+ const int cols = tab.getNoOfColumns();
+ const int brow = tab.getRowSizeInBytes();
+ const int bytes = 12 + brow + 4 * cols;
+ batch = (batch * 256); // -> 512 -> 65536k per commit
+ batch = batch/bytes; //
+ batch = batch == 0 ? 1 : batch;
+
+ if(batch != org){
+ g_info << "batch = " << org << " rowsize = " << bytes
+ << " -> rows/commit = " << batch << endl;
+ }
+
+ g_info << "|- Inserting records..." << endl;
+ for (int c=0 ; c<records ; ){
+
+ if (retryAttempt >= retryMax){
+ g_info << "Record " << c << " could not be inserted, has retried "
+ << retryAttempt << " times " << endl;
+ // Reset retry counters and continue with next record
+ retryAttempt = 0;
+ c++;
+ }
+ if (doSleep > 0)
+ NdbSleep_MilliSleep(doSleep);
+
+ pTrans = pNdb->startTransaction();
+
+ if (pTrans == NULL) {
+ const NdbError err = pNdb->getNdbError();
+
+ if (err.status == NdbError::TemporaryError){
+ ERR(err);
+ NdbSleep_MilliSleep(50);
+ retryAttempt++;
+ continue;
+ }
+ ERR(err);
+ return NDBT_FAILED;
+ }
+
+ for(int b = 0; b < batch && c+b<records; b++){
+
+ pOp = pTrans->getNdbOperation(tab.getName());
+ if (pOp == NULL) {
+ ERR(pTrans->getNdbError());
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+
+ check = pOp->insertTuple();
+ if( check == -1 ) {
+ ERR(pTrans->getNdbError());
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+
+ // Set a calculated value for each attribute in this table
+ for (int a = 0; a<tab.getNoOfColumns(); a++){
+ if(setValueForAttr(pOp, a, c+b, 0 ) != 0){
+ ERR(pTrans->getNdbError());
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+ }
+ }
+
+ // Execute the transaction and insert the record
+ check = pTrans->execute( Commit );
+ if(check == -1 ) {
+ const NdbError err = pTrans->getNdbError();
+ pNdb->closeTransaction(pTrans);
+
+ switch(err.status){
+ case NdbError::Success:
+ ERR(err);
+ g_info << "ERROR: NdbError reports success when transcaction failed"
+ << endl;
+ return NDBT_FAILED;
+ break;
+
+ case NdbError::TemporaryError:
+ ERR(err);
+ NdbSleep_MilliSleep(50);
+ retryAttempt++;
+ continue;
+ break;
+
+ case NdbError::UnknownResult:
+ ERR(err);
+ return NDBT_FAILED;
+ break;
+
+ case NdbError::PermanentError:
+ if (allowConstraintViolation == true){
+ switch (err.classification){
+ case NdbError::ConstraintViolation:
+ // Tuple already existed, OK but should be reported
+ g_info << c << ": " << err.code << " " << err.message << endl;
+ c++;
+ continue;
+ break;
+ default:
+ break;
+ }
+ }
+ ERR(err);
+ return err.code;
+ break;
+ }
+ }
+ else{
+ pNdb->closeTransaction(pTrans);
+ }
+
+ // Step to next record
+ c = c+batch;
+ retryAttempt = 0;
+ }
+ return NDBT_OK;
+}
+
+int
+HugoTransactions::fillTable(Ndb* pNdb,
+ int batch){
+ int check;
+ int retryAttempt = 0;
+ int retryMax = 5;
+ NdbConnection *pTrans;
+ NdbOperation *pOp;
+
+ g_info << "|- Inserting records..." << endl;
+ for (int c=0 ; ; ){
+
+ if (retryAttempt >= retryMax){
+ g_info << "Record " << c << " could not be inserted, has retried "
+ << retryAttempt << " times " << endl;
+ // Reset retry counters and continue with next record
+ retryAttempt = 0;
+ c++;
+ }
+
+ pTrans = pNdb->startTransaction();
+ if (pTrans == NULL) {
+ const NdbError err = pNdb->getNdbError();
+
+ if (err.status == NdbError::TemporaryError){
+ ERR(err);
+ NdbSleep_MilliSleep(50);
+ retryAttempt++;
+ continue;
+ }
+ ERR(err);
+ return NDBT_FAILED;
+ }
+
+ for(int b = 0; b < batch; b++){
+
+ pOp = pTrans->getNdbOperation(tab.getName());
+ if (pOp == NULL) {
+ ERR(pTrans->getNdbError());
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+
+ check = pOp->insertTuple();
+ if( check == -1 ) {
+ ERR(pTrans->getNdbError());
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+
+ // Set a calculated value for each attribute in this table
+ for (int a = 0; a<tab.getNoOfColumns(); a++){
+ if(setValueForAttr(pOp, a, c+b, 0 ) != 0){
+ ERR(pTrans->getNdbError());
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+ }
+ }
+
+ // Execute the transaction and insert the record
+ check = pTrans->execute( Commit, CommitAsMuchAsPossible );
+ if(check == -1 ) {
+ const NdbError err = pTrans->getNdbError();
+ pNdb->closeTransaction(pTrans);
+
+ switch(err.status){
+ case NdbError::Success:
+ ERR(err);
+ g_info << "ERROR: NdbError reports success when transcaction failed"
+ << endl;
+ return NDBT_FAILED;
+ break;
+
+ case NdbError::TemporaryError:
+ ERR(err);
+ NdbSleep_MilliSleep(50);
+ retryAttempt++;
+ continue;
+ break;
+
+ case NdbError::UnknownResult:
+ ERR(err);
+ return NDBT_FAILED;
+ break;
+
+ case NdbError::PermanentError:
+ // if (allowConstraintViolation == true){
+ // switch (err.classification){
+ // case NdbError::ConstraintViolation:
+ // // Tuple already existed, OK but should be reported
+ // g_info << c << ": " << err.code << " " << err.message << endl;
+ // c++;
+ // continue;
+ // break;
+ // default:
+ // break;es
+ // }
+ // }
+
+ // Check if this is the "db full" error
+ if (err.classification==NdbError::InsufficientSpace){
+ ERR(err);
+ return NDBT_OK;
+ }
+
+ if (err.classification == NdbError::ConstraintViolation){
+ ERR(err);
+ break;
+ }
+ ERR(err);
+ return NDBT_FAILED;
+ break;
+ }
+ }
+ else{
+ pNdb->closeTransaction(pTrans);
+ }
+
+ // Step to next record
+ c = c+batch;
+ retryAttempt = 0;
+ }
+ return NDBT_OK;
+}
+
+int
+HugoTransactions::createEvent(Ndb* pNdb){
+
+ char eventName[1024];
+ sprintf(eventName,"%s_EVENT",tab.getName());
+
+ NdbDictionary::Dictionary *myDict = pNdb->getDictionary();
+
+ if (!myDict) {
+ printf("Event Creation failedDictionary not found");
+ return NDBT_FAILED;
+ }
+
+ NdbDictionary::Event myEvent(eventName);
+ myEvent.setTable(tab.getName());
+ myEvent.addTableEvent(NdbDictionary::Event::TE_ALL);
+ // myEvent.addTableEvent(NdbDictionary::Event::TE_INSERT);
+ // myEvent.addTableEvent(NdbDictionary::Event::TE_UPDATE);
+ // myEvent.addTableEvent(NdbDictionary::Event::TE_DELETE);
+
+ // const NdbDictionary::Table *_table = myDict->getTable(tab.getName());
+ for(int a = 0; a < tab.getNoOfColumns(); a++){
+ // myEvent.addEventColumn(_table->getColumn(a)->getName());
+ myEvent.addEventColumn(a);
+ }
+
+ int res = myDict->createEvent(myEvent); // Add event to database
+
+ if (res == 0)
+ myEvent.print();
+ else {
+ g_info << "Event creation failed\n";
+ g_info << "trying drop Event, maybe event exists\n";
+ res = myDict->dropEvent(eventName);
+ if (res) {
+ g_err << "failed to drop event\n";
+ return NDBT_FAILED;
+ }
+ // try again
+ res = myDict->createEvent(myEvent); // Add event to database
+ if (res) {
+ g_err << "failed to create event\n";
+ return NDBT_FAILED;
+ }
+ }
+
+ return NDBT_OK;
+}
+
+#include <NdbEventOperation.hpp>
+#include "TestNdbEventOperation.hpp"
+#include <NdbAutoPtr.hpp>
+
+struct receivedEvent {
+ Uint32 pk;
+ Uint32 count;
+ Uint32 event;
+};
+
+int XXXXX = 0;
+int
+HugoTransactions::eventOperation(Ndb* pNdb, void* pstats,
+ int records) {
+ int myXXXXX = XXXXX++;
+
+ const char function[] = "HugoTransactions::eventOperation: ";
+ struct receivedEvent* recInsertEvent;
+ NdbAutoObjArrayPtr<struct receivedEvent>
+ p00( recInsertEvent = new struct receivedEvent[3*records] );
+ struct receivedEvent* recUpdateEvent = &recInsertEvent[records];
+ struct receivedEvent* recDeleteEvent = &recInsertEvent[2*records];
+
+ EventOperationStats &stats = *(EventOperationStats*)pstats;
+
+ stats.n_inserts = 0;
+ stats.n_deletes = 0;
+ stats.n_updates = 0;
+ stats.n_consecutive = 0;
+ stats.n_duplicates = 0;
+ stats.n_inconsistent_gcis = 0;
+
+ for (int i = 0; i < records; i++) {
+ recInsertEvent[i].pk = 0xFFFFFFFF;
+ recInsertEvent[i].count = 0;
+ recInsertEvent[i].event = 0xFFFFFFFF;
+
+ recUpdateEvent[i].pk = 0xFFFFFFFF;
+ recUpdateEvent[i].count = 0;
+ recUpdateEvent[i].event = 0xFFFFFFFF;
+
+ recDeleteEvent[i].pk = 0xFFFFFFFF;
+ recDeleteEvent[i].count = 0;
+ recDeleteEvent[i].event = 0xFFFFFFFF;
+ }
+
+ NdbDictionary::Dictionary *myDict = pNdb->getDictionary();
+
+ if (!myDict) {
+ g_err << function << "Event Creation failedDictionary not found\n";
+ return NDBT_FAILED;
+ }
+
+ int r = 0;
+ NdbEventOperation *pOp;
+
+ char eventName[1024];
+ sprintf(eventName,"%s_EVENT",tab.getName());
+ int noEventColumnName = tab.getNoOfColumns();
+
+ g_info << function << "create EventOperation\n";
+ pOp = pNdb->createEventOperation(eventName, 100);
+ if ( pOp == NULL ) {
+ g_err << function << "Event operation creation failed\n";
+ return NDBT_FAILED;
+ }
+
+ g_info << function << "get values\n";
+ NdbRecAttr* recAttr[1024];
+ NdbRecAttr* recAttrPre[1024];
+
+ const NdbDictionary::Table *_table = myDict->getTable(tab.getName());
+
+ for (int a = 0; a < noEventColumnName; a++) {
+ recAttr[a] = pOp->getValue(_table->getColumn(a)->getName());
+ recAttrPre[a] = pOp->getPreValue(_table->getColumn(a)->getName());
+ }
+
+ // set up the callbacks
+ g_info << function << "execute\n";
+ if (pOp->execute()) { // This starts changes to "start flowing"
+ g_err << function << "operation execution failed\n";
+ return NDBT_FAILED;
+ }
+
+ g_info << function << "ok\n";
+
+ int count = 0;
+ Uint32 last_inconsitant_gci = 0xEFFFFFF0;
+
+ while (r < records){
+ //printf("now waiting for event...\n");
+ int res = pNdb->pollEvents(1000); // wait for event or 1000 ms
+
+ if (res > 0) {
+ //printf("got data! %d\n", r);
+ int overrun;
+ while (pOp->next(&overrun) > 0) {
+ r++;
+ r += overrun;
+ count++;
+
+ Uint32 gci = pOp->getGCI();
+ Uint32 pk = recAttr[0]->u_32_value();
+
+ if (!pOp->isConsistent()) {
+ if (last_inconsitant_gci != gci) {
+ last_inconsitant_gci = gci;
+ stats.n_inconsistent_gcis++;
+ }
+ g_warning << "A node failure has occured and events might be missing\n";
+ }
+ g_info << function << "GCI " << gci << ": " << count;
+ struct receivedEvent* recEvent;
+ switch (pOp->getEventType()) {
+ case NdbDictionary::Event::TE_INSERT:
+ stats.n_inserts++;
+ g_info << " INSERT: ";
+ recEvent = recInsertEvent;
+ break;
+ case NdbDictionary::Event::TE_DELETE:
+ stats.n_deletes++;
+ g_info << " DELETE: ";
+ recEvent = recDeleteEvent;
+ break;
+ case NdbDictionary::Event::TE_UPDATE:
+ stats.n_updates++;
+ g_info << " UPDATE: ";
+ recEvent = recUpdateEvent;
+ break;
+ }
+
+ if (pk < records) {
+ recEvent[pk].pk = pk;
+ recEvent[pk].count++;
+ }
+
+ g_info << "overrun " << overrun << " pk " << pk;
+ for (int i = 1; i < noEventColumnName; i++) {
+ if (recAttr[i]->isNULL() >= 0) { // we have a value
+ g_info << " post[" << i << "]=";
+ if (recAttr[i]->isNULL() == 0) // we have a non-null value
+ g_info << recAttr[i]->u_32_value();
+ else // we have a null value
+ g_info << "NULL";
+ }
+ if (recAttrPre[i]->isNULL() >= 0) { // we have a value
+ g_info << " pre[" << i << "]=";
+ if (recAttrPre[i]->isNULL() == 0) // we have a non-null value
+ g_info << recAttrPre[i]->u_32_value();
+ else // we have a null value
+ g_info << "NULL";
+ }
+ }
+ g_info << endl;
+ }
+ } else
+ ;//printf("timed out\n");
+ }
+
+ // sleep ((XXXXX-myXXXXX)*2);
+
+ g_info << myXXXXX << "dropping event operation" << endl;
+
+ int res = pNdb->dropEventOperation(pOp);
+ if (res != 0) {
+ g_err << "operation execution failed\n";
+ return NDBT_FAILED;
+ }
+
+ g_info << myXXXXX << " ok" << endl;
+
+ if (stats.n_inserts > 0) {
+ stats.n_consecutive++;
+ }
+ if (stats.n_deletes > 0) {
+ stats.n_consecutive++;
+ }
+ if (stats.n_updates > 0) {
+ stats.n_consecutive++;
+ }
+ for (Uint32 i = 0; i < records/3; i++) {
+ if (recInsertEvent[i].pk != i) {
+ stats.n_consecutive ++;
+ ndbout << "missing insert pk " << i << endl;
+ } else if (recInsertEvent[i].count > 1) {
+ ndbout << "duplicates insert pk " << i
+ << " count " << recInsertEvent[i].count << endl;
+ stats.n_duplicates += recInsertEvent[i].count-1;
+ }
+ if (recUpdateEvent[i].pk != i) {
+ stats.n_consecutive ++;
+ ndbout << "missing update pk " << i << endl;
+ } else if (recUpdateEvent[i].count > 1) {
+ ndbout << "duplicates update pk " << i
+ << " count " << recUpdateEvent[i].count << endl;
+ stats.n_duplicates += recUpdateEvent[i].count-1;
+ }
+ if (recDeleteEvent[i].pk != i) {
+ stats.n_consecutive ++;
+ ndbout << "missing delete pk " << i << endl;
+ } else if (recDeleteEvent[i].count > 1) {
+ ndbout << "duplicates delete pk " << i
+ << " count " << recDeleteEvent[i].count << endl;
+ stats.n_duplicates += recDeleteEvent[i].count-1;
+ }
+ }
+
+ return NDBT_OK;
+}
+
+int
+HugoTransactions::pkReadRecords(Ndb* pNdb,
+ int records,
+ int batchsize,
+ bool dirty){
+ int reads = 0;
+ int r = 0;
+ int retryAttempt = 0;
+ const int retryMax = 100;
+ int check;
+ NdbConnection *pTrans;
+ NdbOperation *pOp;
+
+ if (batchsize == 0) {
+ g_info << "ERROR: Argument batchsize == 0 in pkReadRecords(). Not allowed." << endl;
+ return NDBT_FAILED;
+ }
+
+ allocRows(batchsize);
+
+ while (r < records){
+ if (retryAttempt >= retryMax){
+ g_info << "ERROR: has retried this operation " << retryAttempt
+ << " times, failing!" << endl;
+ return NDBT_FAILED;
+ }
+
+ pTrans = pNdb->startTransaction();
+ if (pTrans == NULL) {
+ const NdbError err = pNdb->getNdbError();
+
+ if (err.status == NdbError::TemporaryError){
+ ERR(err);
+ NdbSleep_MilliSleep(50);
+ retryAttempt++;
+ continue;
+ }
+ ERR(err);
+ return NDBT_FAILED;
+ }
+
+ for(int b=0; (b<batchsize) && (r+b < records); b++){
+ pOp = pTrans->getNdbOperation(tab.getName());
+ if (pOp == NULL) {
+ ERR(pTrans->getNdbError());
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+
+ if (dirty == true){
+ check = pOp->dirtyRead();
+ } else {
+ check = pOp->readTuple();
+ }
+ if( check == -1 ) {
+ ERR(pTrans->getNdbError());
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+
+ // Define primary keys
+ for(int a = 0; a<tab.getNoOfColumns(); a++){
+ if (tab.getColumn(a)->getPrimaryKey() == true){
+ if(equalForAttr(pOp, a, r+b) != 0){
+ ERR(pTrans->getNdbError());
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+ }
+ }
+
+ // Define attributes to read
+ for(int a = 0; a<tab.getNoOfColumns(); a++){
+ if((rows[b]->attributeStore(a) =
+ pOp->getValue(tab.getColumn(a)->getName())) == 0) {
+ ERR(pTrans->getNdbError());
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+ }
+
+ }
+
+ check = pTrans->execute(Commit);
+ if( check == -1 ) {
+ const NdbError err = pTrans->getNdbError();
+
+ if (err.status == NdbError::TemporaryError){
+ ERR(err);
+ pNdb->closeTransaction(pTrans);
+ NdbSleep_MilliSleep(50);
+ retryAttempt++;
+ continue;
+ }
+ switch(err.code){
+ case 626: // Tuple did not exist
+ g_info << r << ": " << err.code << " " << err.message << endl;
+ r++;
+ break;
+
+ default:
+ ERR(err);
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+ } else{
+ for (int b=0; (b<batchsize) && (r+b<records); b++){
+ if (calc.verifyRowValues(rows[b]) != 0){
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+ reads++;
+ r++;
+ }
+ }
+
+ pNdb->closeTransaction(pTrans);
+
+ }
+ deallocRows();
+ g_info << reads << " records read" << endl;
+ return NDBT_OK;
+}
+
+
+
+int
+HugoTransactions::pkUpdateRecords(Ndb* pNdb,
+ int records,
+ int batch,
+ int doSleep){
+ int updated = 0;
+ int r = 0;
+ int retryAttempt = 0;
+ const int retryMax = 100;
+ int check;
+ NdbConnection *pTrans;
+ NdbOperation *pOp;
+
+ allocRows(batch);
+
+ g_info << "|- Updating records..." << endl;
+ while (r < records){
+
+ if (retryAttempt >= retryMax){
+ g_info << "ERROR: has retried this operation " << retryAttempt
+ << " times, failing!" << endl;
+ return NDBT_FAILED;
+ }
+
+ if (doSleep > 0)
+ NdbSleep_MilliSleep(doSleep);
+
+ pTrans = pNdb->startTransaction();
+ if (pTrans == NULL) {
+ const NdbError err = pNdb->getNdbError();
+
+ if (err.status == NdbError::TemporaryError){
+ ERR(err);
+ NdbSleep_MilliSleep(50);
+ retryAttempt++;
+ continue;
+ }
+ ERR(err);
+ return NDBT_FAILED;
+ }
+
+ for(int b = 0; b<batch && (r+b) < records; b++){
+ pOp = pTrans->getNdbOperation(tab.getName());
+ if (pOp == NULL) {
+ ERR(pTrans->getNdbError());
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+
+ check = pOp->readTupleExclusive();
+ if( check == -1 ) {
+ ERR(pTrans->getNdbError());
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+
+ // Define primary keys
+ for(int a = 0; a<tab.getNoOfColumns(); a++){
+ if (tab.getColumn(a)->getPrimaryKey() == true){
+ if(equalForAttr(pOp, a, r+b) != 0){
+ ERR(pTrans->getNdbError());
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+ }
+ }
+
+ // Define attributes to read
+ for(int a = 0; a<tab.getNoOfColumns(); a++){
+ if((rows[b]->attributeStore(a) =
+ pOp->getValue(tab.getColumn(a)->getName())) == 0) {
+ ERR(pTrans->getNdbError());
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+ }
+ }
+
+ check = pTrans->execute(NoCommit);
+ if( check == -1 ) {
+ const NdbError err = pTrans->getNdbError();
+
+ if (err.status == NdbError::TemporaryError){
+ ERR(err);
+ pNdb->closeTransaction(pTrans);
+ NdbSleep_MilliSleep(50);
+ retryAttempt++;
+ continue;
+ }
+ ERR(err);
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+
+ for(int b = 0; b<batch && (b+r)<records; b++){
+ if (calc.verifyRowValues(rows[b]) != 0){
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+
+ int updates = calc.getUpdatesValue(rows[b]) + 1;
+
+ NdbOperation* pUpdOp;
+ pUpdOp = pTrans->getNdbOperation(tab.getName());
+ if (pUpdOp == NULL) {
+ ERR(pTrans->getNdbError());
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+
+ check = pUpdOp->updateTuple();
+ if( check == -1 ) {
+ ERR(pTrans->getNdbError());
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+
+ for(int a = 0; a<tab.getNoOfColumns(); a++){
+ if (tab.getColumn(a)->getPrimaryKey() == true){
+ if(equalForAttr(pUpdOp, a, r+b) != 0){
+ ERR(pTrans->getNdbError());
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+ }
+ }
+
+ for(int a = 0; a<tab.getNoOfColumns(); a++){
+ if (tab.getColumn(a)->getPrimaryKey() == false){
+ if(setValueForAttr(pUpdOp, a, r+b, updates ) != 0){
+ ERR(pTrans->getNdbError());
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+ }
+ }
+ }
+
+ check = pTrans->execute(Commit);
+ if( check == -1 ) {
+ const NdbError err = pTrans->getNdbError();
+
+ if (err.status == NdbError::TemporaryError){
+ ERR(err);
+ pNdb->closeTransaction(pTrans);
+ NdbSleep_MilliSleep(50);
+ retryAttempt++;
+ continue;
+ }
+ ERR(err);
+ ndbout << "r = " << r << endl;
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+ else{
+ updated += batch;
+ }
+
+
+ pNdb->closeTransaction(pTrans);
+
+ r += batch; // Read next record
+ }
+
+ deallocRows();
+ g_info << "|- " << updated << " records updated" << endl;
+ return NDBT_OK;
+}
+
+int
+HugoTransactions::pkInterpretedUpdateRecords(Ndb* pNdb,
+ int records,
+ int batch){
+ int updated = 0;
+ int r = 0;
+ int retryAttempt = 0;
+ const int retryMax = 100;
+ int check;
+ NdbConnection *pTrans;
+
+ while (r < records){
+
+ if (retryAttempt >= retryMax){
+ g_info << "ERROR: has retried this operation " << retryAttempt
+ << " times, failing!" << endl;
+ return NDBT_FAILED;
+ }
+
+ pTrans = pNdb->startTransaction();
+ if (pTrans == NULL) {
+ const NdbError err = pNdb->getNdbError();
+
+ if (err.status == NdbError::TemporaryError){
+ ERR(err);
+ NdbSleep_MilliSleep(50);
+ retryAttempt++;
+ continue;
+ }
+ ERR(err);
+ return NDBT_FAILED;
+ }
+
+ NdbOperation* pOp = pTrans->getNdbOperation(tab.getName());
+ if (pOp == NULL) {
+ ERR(pTrans->getNdbError());
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+
+ check = pOp->readTupleExclusive();
+ if( check == -1 ) {
+ ERR(pTrans->getNdbError());
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+
+ // Define primary keys
+ for(int a = 0; a<tab.getNoOfColumns(); a++){
+ if (tab.getColumn(a)->getPrimaryKey() == true){
+ if(equalForAttr(pOp, a, r) != 0){
+ ERR(pTrans->getNdbError());
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+ }
+ }
+
+ // Read update value
+ for(int a = 0; a<tab.getNoOfColumns(); a++){
+ if (calc.isUpdateCol(a) == true){
+ if((row.attributeStore(a) =
+ pOp->getValue(tab.getColumn(a)->getName())) == 0) {
+ ERR(pTrans->getNdbError());
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+ }
+ }
+
+ check = pTrans->execute(NoCommit);
+ if( check == -1 ) {
+ const NdbError err = pTrans->getNdbError();
+
+ if (err.status == NdbError::TemporaryError){
+ ERR(err);
+ pNdb->closeTransaction(pTrans);
+ NdbSleep_MilliSleep(50);
+ retryAttempt++;
+ continue;
+ }
+ ERR(err);
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+
+ int updates = calc.getUpdatesValue(&row) + 1;
+
+ NdbOperation* pUpdOp;
+ pUpdOp = pTrans->getNdbOperation(tab.getName());
+ if (pUpdOp == NULL) {
+ ERR(pTrans->getNdbError());
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+
+ check = pUpdOp->interpretedUpdateTuple();
+ if( check == -1 ) {
+ ERR(pTrans->getNdbError());
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+
+ // PKs
+ for(int a = 0; a<tab.getNoOfColumns(); a++){
+ if (tab.getColumn(a)->getPrimaryKey() == true){
+ if(equalForAttr(pUpdOp, a, r) != 0){
+ ERR(pTrans->getNdbError());
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+ }
+ }
+
+ // Update col
+ for(int a = 0; a<tab.getNoOfColumns(); a++){
+ if ((tab.getColumn(a)->getPrimaryKey() == false) &&
+ (calc.isUpdateCol(a) == true)){
+
+ // TODO switch for 32/64 bit
+ const NdbDictionary::Column* attr = tab.getColumn(a);
+ Uint32 valToIncWith = 1;
+ check = pUpdOp->incValue(attr->getName(), valToIncWith);
+ if( check == -1 ) {
+ ERR(pTrans->getNdbError());
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+ }
+ }
+
+ // Remaining attributes
+ for(int a = 0; a<tab.getNoOfColumns(); a++){
+ if ((tab.getColumn(a)->getPrimaryKey() == false) &&
+ (calc.isUpdateCol(a) == false)){
+ if(setValueForAttr(pUpdOp, a, r, updates ) != 0){
+ ERR(pTrans->getNdbError());
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+ }
+ }
+
+
+
+ check = pTrans->execute(Commit);
+ if( check == -1 ) {
+ const NdbError err = pTrans->getNdbError();
+
+ if (err.status == NdbError::TemporaryError){
+ ERR(err);
+ pNdb->closeTransaction(pTrans);
+ NdbSleep_MilliSleep(50);
+ retryAttempt++;
+ continue;
+ }
+ ERR(err);
+ ndbout << "r = " << r << endl;
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+ else{
+ updated++;
+ }
+
+
+ pNdb->closeTransaction(pTrans);
+
+ r++; // Read next record
+
+ }
+
+ g_info << "|- " << updated << " records updated" << endl;
+ return NDBT_OK;
+}
+
+int
+HugoTransactions::pkDelRecords(Ndb* pNdb,
+ int records,
+ int batch,
+ bool allowConstraintViolation,
+ int doSleep){
+ // TODO Batch is not implemented
+ int deleted = 0;
+ int r = 0;
+ int retryAttempt = 0;
+ const int retryMax = 100;
+ int check;
+ NdbConnection *pTrans;
+ NdbOperation *pOp;
+
+ g_info << "|- Deleting records..." << endl;
+ while (r < records){
+
+ if (retryAttempt >= retryMax){
+ g_info << "ERROR: has retried this operation " << retryAttempt
+ << " times, failing!" << endl;
+ return NDBT_FAILED;
+ }
+
+ if (doSleep > 0)
+ NdbSleep_MilliSleep(doSleep);
+
+ pTrans = pNdb->startTransaction();
+ if (pTrans == NULL) {
+ const NdbError err = pNdb->getNdbError();
+
+ if (err.status == NdbError::TemporaryError){
+ ERR(err);
+ NdbSleep_MilliSleep(50);
+ retryAttempt++;
+ continue;
+ }
+ ERR(err);
+ return NDBT_FAILED;
+ }
+
+ pOp = pTrans->getNdbOperation(tab.getName());
+ if (pOp == NULL) {
+ ERR(pTrans->getNdbError());
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+
+ check = pOp->deleteTuple();
+ if( check == -1 ) {
+ ERR(pTrans->getNdbError());
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+
+ // Define primary keys
+ for(int a = 0; a<tab.getNoOfColumns(); a++){
+ if (tab.getColumn(a)->getPrimaryKey() == true){
+ if(equalForAttr(pOp, a, r) != 0){
+ ERR(pTrans->getNdbError());
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+ }
+ }
+ check = pTrans->execute(Commit);
+ if( check == -1) {
+ const NdbError err = pTrans->getNdbError();
+
+ switch(err.status){
+ case NdbError::TemporaryError:
+ ERR(err);
+ pNdb->closeTransaction(pTrans);
+ NdbSleep_MilliSleep(50);
+ retryAttempt++;
+ continue;
+ break;
+
+ case NdbError::PermanentError:
+ if (allowConstraintViolation == true){
+ switch (err.classification){
+ case NdbError::ConstraintViolation:
+ // Tuple did not exist, OK but should be reported
+ g_info << r << ": " << err.code << " " << err.message << endl;
+ continue;
+ break;
+ default:
+ break;
+ }
+ }
+ ERR(err);
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ break;
+
+ default:
+ ERR(err);
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+ }
+ else {
+ deleted++;
+ }
+ pNdb->closeTransaction(pTrans);
+
+ r++; // Read next record
+
+ }
+
+ g_info << "|- " << deleted << " records deleted" << endl;
+ return NDBT_OK;
+}
+
+
+int
+HugoTransactions::lockRecords(Ndb* pNdb,
+ int records,
+ int percentToLock,
+ int lockTime){
+ // Place a lock on percentToLock% of the records in the Db
+ // Keep the locks for lockTime ms, commit operation
+ // and lock som other records
+ int r = 0;
+ int retryAttempt = 0;
+ const int retryMax = 100;
+ int check;
+ NdbConnection *pTrans;
+ NdbOperation *pOp;
+
+ // Calculate how many records to lock in each batch
+ if (percentToLock <= 0)
+ percentToLock = 1;
+ double percentVal = (double)percentToLock / 100;
+ int lockBatch = (int)(records * percentVal);
+ if (lockBatch <= 0)
+ lockBatch = 1;
+
+ allocRows(lockBatch);
+
+ while (r < records){
+ g_info << "|- Locking " << lockBatch << " records..." << endl;
+
+ if (retryAttempt >= retryMax){
+ g_info << "ERROR: has retried this operation " << retryAttempt
+ << " times, failing!" << endl;
+ return NDBT_FAILED;
+ }
+
+ pTrans = pNdb->startTransaction();
+ if (pTrans == NULL) {
+ const NdbError err = pNdb->getNdbError();
+
+ if (err.status == NdbError::TemporaryError){
+ ERR(err);
+ NdbSleep_MilliSleep(50);
+ retryAttempt++;
+ continue;
+ }
+ ERR(err);
+ return NDBT_FAILED;
+ }
+
+ for(int b = 0; (b<lockBatch) && (r+b < records); b++){
+ pOp = pTrans->getNdbOperation(tab.getName());
+ if (pOp == NULL) {
+ ERR(pTrans->getNdbError());
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+
+ check = pOp->readTupleExclusive();
+ if( check == -1 ) {
+ ERR(pTrans->getNdbError());
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+
+ // Define primary keys
+ for(int a = 0; a<tab.getNoOfColumns(); a++){
+ if (tab.getColumn(a)->getPrimaryKey() == true){
+ if(equalForAttr(pOp, a, r+b) != 0){
+ ERR(pTrans->getNdbError());
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+ }
+ }
+
+ // Define attributes to read
+ for(int a = 0; a<tab.getNoOfColumns(); a++){
+ if((rows[b]->attributeStore(a) =
+ pOp->getValue(tab.getColumn(a)->getName())) == 0) {
+ ERR(pTrans->getNdbError());
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+ }
+ }
+ // NoCommit lockTime times with 100 millis interval
+ int sleepInterval = 50;
+ int lockCount = lockTime / sleepInterval;
+ int commitCount = 0;
+ do {
+ check = pTrans->execute(NoCommit);
+ if( check == -1) {
+ const NdbError err = pTrans->getNdbError();
+
+ if (err.status == NdbError::TemporaryError){
+ ERR(err);
+ pNdb->closeTransaction(pTrans);
+ NdbSleep_MilliSleep(50);
+ retryAttempt++;
+ continue;
+ }
+ ERR(err);
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+ for (int b=0; (b<lockBatch) && (r+b<records); b++){
+ if (calc.verifyRowValues(rows[b]) != 0){
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+ }
+ commitCount++;
+ NdbSleep_MilliSleep(sleepInterval);
+ } while (commitCount < lockCount);
+
+ // Really commit the trans, puuh!
+ check = pTrans->execute(Commit);
+ if( check == -1) {
+ const NdbError err = pTrans->getNdbError();
+
+ if (err.status == NdbError::TemporaryError){
+ ERR(err);
+ pNdb->closeTransaction(pTrans);
+ NdbSleep_MilliSleep(50);
+ retryAttempt++;
+ continue;
+ }
+ ERR(err);
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+ else{
+ for (int b=0; (b<lockBatch) && (r<records); b++){
+ if (calc.verifyRowValues(rows[b]) != 0){
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+ r++; // Read next record
+ }
+ }
+
+ pNdb->closeTransaction(pTrans);
+
+
+ }
+ deallocRows();
+ g_info << "|- Record locking completed" << endl;
+ return NDBT_OK;
+}
+
+int
+HugoTransactions::indexReadRecords(Ndb* pNdb,
+ const char * idxName,
+ int records,
+ int batchsize){
+ int reads = 0;
+ int r = 0;
+ int retryAttempt = 0;
+ const int retryMax = 100;
+ int check;
+ NdbConnection *pTrans;
+ NdbOperation *pOp;
+ NdbScanOperation *sOp;
+ NdbResultSet * rs;
+
+ const NdbDictionary::Index* pIndex
+ = pNdb->getDictionary()->getIndex(idxName, tab.getName());
+
+ const bool ordered = (pIndex->getType()==NdbDictionary::Index::OrderedIndex);
+
+ if (batchsize == 0) {
+ g_info << "ERROR: Argument batchsize == 0 in indexReadRecords(). "
+ << "Not allowed." << endl;
+ return NDBT_FAILED;
+ }
+
+ if (ordered) {
+ batchsize = 1;
+ }
+
+ allocRows(batchsize);
+
+ while (r < records){
+ if (retryAttempt >= retryMax){
+ g_info << "ERROR: has retried this operation " << retryAttempt
+ << " times, failing!" << endl;
+ return NDBT_FAILED;
+ }
+
+ pTrans = pNdb->startTransaction();
+ if (pTrans == NULL) {
+ const NdbError err = pNdb->getNdbError();
+
+ if (err.status == NdbError::TemporaryError){
+ ERR(err);
+ NdbSleep_MilliSleep(50);
+ retryAttempt++;
+ continue;
+ }
+ ERR(err);
+ return NDBT_FAILED;
+ }
+
+ for(int b=0; (b<batchsize) && (r+b < records); b++){
+ if(!ordered){
+ pOp = pTrans->getNdbIndexOperation(idxName, tab.getName());
+ if (pOp == NULL) {
+ ERR(pTrans->getNdbError());
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+ check = pOp->readTuple();
+ } else {
+ pOp = sOp = pTrans->getNdbScanOperation(idxName, tab.getName());
+ if (sOp == NULL) {
+ ERR(pTrans->getNdbError());
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+
+ check = 0;
+ rs = sOp->readTuples();
+ }
+
+ if( check == -1 ) {
+ ERR(pTrans->getNdbError());
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+
+ // Define primary keys
+ for(int a = 0; a<tab.getNoOfColumns(); a++){
+ if (tab.getColumn(a)->getPrimaryKey() == true){
+ if(equalForAttr(pOp, a, r+b) != 0){
+ ERR(pTrans->getNdbError());
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+ }
+ }
+
+ // Define attributes to read
+ for(int a = 0; a<tab.getNoOfColumns(); a++){
+ if((rows[b]->attributeStore(a) =
+ pOp->getValue(tab.getColumn(a)->getName())) == 0) {
+ ERR(pTrans->getNdbError());
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+ }
+ }
+
+ check = pTrans->execute(Commit);
+ check = (check == -1 ? -1 : !ordered ? check : rs->nextResult(true));
+ if( check == -1 ) {
+ const NdbError err = pTrans->getNdbError();
+
+ if (err.status == NdbError::TemporaryError){
+ ERR(err);
+ pNdb->closeTransaction(pTrans);
+ NdbSleep_MilliSleep(50);
+ retryAttempt++;
+ continue;
+ }
+ switch(err.code){
+ case 626: // Tuple did not exist
+ g_info << r << ": " << err.code << " " << err.message << endl;
+ r++;
+ break;
+
+ default:
+ ERR(err);
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+ } else{
+ for (int b=0; (b<batchsize) && (r+b<records); b++){
+ if (calc.verifyRowValues(rows[b]) != 0){
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+ reads++;
+ r++;
+ }
+ if(ordered && rs->nextResult(true) == 0){
+ ndbout << "Error when comparing records "
+ << " - index op next_result to many" << endl;
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+ }
+ pNdb->closeTransaction(pTrans);
+ }
+ deallocRows();
+ g_info << reads << " records read" << endl;
+ return NDBT_OK;
+}
+
+
+
+int
+HugoTransactions::indexUpdateRecords(Ndb* pNdb,
+ const char * idxName,
+ int records,
+ int batchsize){
+
+ int updated = 0;
+ int r = 0;
+ int retryAttempt = 0;
+ const int retryMax = 100;
+ int check;
+ NdbConnection *pTrans;
+ NdbOperation *pOp;
+ NdbScanOperation * sOp;
+ NdbResultSet * rs;
+
+ const NdbDictionary::Index* pIndex
+ = pNdb->getDictionary()->getIndex(idxName, tab.getName());
+
+ const bool ordered = (pIndex->getType()==NdbDictionary::Index::OrderedIndex);
+ if (ordered){
+ batchsize = 1;
+ }
+
+ allocRows(batchsize);
+
+ while (r < records){
+ if (retryAttempt >= retryMax){
+ g_info << "ERROR: has retried this operation " << retryAttempt
+ << " times, failing!" << endl;
+ return NDBT_FAILED;
+ }
+
+ pTrans = pNdb->startTransaction();
+ if (pTrans == NULL) {
+ const NdbError err = pNdb->getNdbError();
+
+ if (err.status == NdbError::TemporaryError){
+ ERR(err);
+ NdbSleep_MilliSleep(50);
+ retryAttempt++;
+ continue;
+ }
+ ERR(err);
+ return NDBT_FAILED;
+ }
+
+ for(int b = 0; b<batchsize && (b+r)<records; b++){
+ if(!ordered){
+ pOp = pTrans->getNdbIndexOperation(idxName, tab.getName());
+ if (pOp == NULL) {
+ ERR(pTrans->getNdbError());
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+
+ check = pOp->readTupleExclusive();
+ if( check == -1 ) {
+ ERR(pTrans->getNdbError());
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+ } else {
+ pOp = sOp = pTrans->getNdbScanOperation(idxName, tab.getName());
+ if (pOp == NULL) {
+ ERR(pTrans->getNdbError());
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+
+ check = 0;
+ rs = sOp->readTuplesExclusive();
+ }
+
+ // Define primary keys
+ for(int a = 0; a<tab.getNoOfColumns(); a++){
+ if (tab.getColumn(a)->getPrimaryKey() == true){
+ if(equalForAttr(pOp, a, r+b) != 0){
+ ERR(pTrans->getNdbError());
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+ }
+ }
+
+ // Define attributes to read
+ for(int a = 0; a<tab.getNoOfColumns(); a++){
+ if((rows[b]->attributeStore(a) =
+ pOp->getValue(tab.getColumn(a)->getName())) == 0) {
+ ERR(pTrans->getNdbError());
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+ }
+ }
+
+ check = pTrans->execute(NoCommit);
+ check = (check == -1 ? -1 : !ordered ? check : rs->nextResult(true));
+ if( check == -1 ) {
+ const NdbError err = pTrans->getNdbError();
+ ERR(err);
+ pNdb->closeTransaction(pTrans);
+
+ if (err.status == NdbError::TemporaryError){
+ NdbSleep_MilliSleep(50);
+ retryAttempt++;
+ continue;
+ }
+ return NDBT_FAILED;
+ }
+
+ if(ordered && check != 0){
+ g_err << "Row: " << r << " not found!!" << endl;
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+
+ for(int b = 0; b<batchsize && (b+r)<records; b++){
+ if (calc.verifyRowValues(rows[b]) != 0){
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+
+ int updates = calc.getUpdatesValue(rows[b]) + 1;
+
+ NdbOperation* pUpdOp;
+ if(!ordered){
+ pUpdOp = pTrans->getNdbIndexOperation(idxName, tab.getName());
+ check = (pUpdOp == 0 ? -1 : pUpdOp->updateTuple());
+ } else {
+ pUpdOp = rs->updateTuple();
+ }
+
+ if (pUpdOp == NULL) {
+ ERR(pTrans->getNdbError());
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+
+ if( check == -1 ) {
+ ERR(pTrans->getNdbError());
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+
+ if(!ordered){
+ for(int a = 0; a<tab.getNoOfColumns(); a++){
+ if (tab.getColumn(a)->getPrimaryKey() == true){
+ if(equalForAttr(pUpdOp, a, r+b) != 0){
+ ERR(pTrans->getNdbError());
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+ }
+ }
+ }
+
+ for(int a = 0; a<tab.getNoOfColumns(); a++){
+ if (tab.getColumn(a)->getPrimaryKey() == false){
+ if(setValueForAttr(pUpdOp, a, r+b, updates ) != 0){
+ ERR(pTrans->getNdbError());
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+ }
+ }
+ }
+
+ check = pTrans->execute(Commit);
+ if( check == -1 ) {
+ const NdbError err = pTrans->getNdbError();
+ ERR(err);
+ pNdb->closeTransaction(pTrans);
+
+ if (err.status == NdbError::TemporaryError){
+ NdbSleep_MilliSleep(50);
+ retryAttempt++;
+ continue;
+ }
+ ndbout << "r = " << r << endl;
+ return NDBT_FAILED;
+ } else {
+ updated += batchsize;
+ }
+
+ pNdb->closeTransaction(pTrans);
+
+ r+= batchsize; // Read next record
+ }
+
+ g_info << "|- " << updated << " records updated" << endl;
+ return NDBT_OK;
+}
+
+