summaryrefslogtreecommitdiff
path: root/storage/ndb/test/src/HugoAsynchTransactions.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'storage/ndb/test/src/HugoAsynchTransactions.cpp')
-rw-r--r--storage/ndb/test/src/HugoAsynchTransactions.cpp498
1 files changed, 498 insertions, 0 deletions
diff --git a/storage/ndb/test/src/HugoAsynchTransactions.cpp b/storage/ndb/test/src/HugoAsynchTransactions.cpp
new file mode 100644
index 00000000000..5d2eb451c0b
--- /dev/null
+++ b/storage/ndb/test/src/HugoAsynchTransactions.cpp
@@ -0,0 +1,498 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#include <NdbSleep.h>
+#include <HugoAsynchTransactions.hpp>
+
+HugoAsynchTransactions::HugoAsynchTransactions(const NdbDictionary::Table& _t)
+ : HugoTransactions(_t),
+ transactionsCompleted(0),
+ numTransactions(0),
+ transactions(NULL)
+{
+}
+
+HugoAsynchTransactions::~HugoAsynchTransactions(){
+ deallocTransactions();
+}
+
+void asynchCallback(int result, NdbConnection* pTrans,
+ void* anObject) {
+ HugoAsynchTransactions* pHugo = (HugoAsynchTransactions*) anObject;
+
+ pHugo->transactionCompleted();
+
+ if (result == -1) {
+ const NdbError err = pTrans->getNdbError();
+ switch(err.status) {
+ case NdbError::Success:
+ ERR(err);
+ g_info << "ERROR: NdbError reports success when transcaction failed"
+ << endl;
+ break;
+
+ case NdbError::TemporaryError:
+ ERR(err);
+ break;
+
+#if 0
+ case 626: // Tuple did not exist
+ g_info << (unsigned int)pHugo->getTransactionsCompleted() << ": "
+ << err.code << " " << err.message << endl;
+ break;
+#endif
+
+ case NdbError::UnknownResult:
+ ERR(err);
+ break;
+
+ case NdbError::PermanentError:
+ switch (err.classification) {
+ case NdbError::ConstraintViolation:
+ // Tuple already existed, OK in this application,
+ // but should be reported
+ g_info << (unsigned int)pHugo->getTransactionsCompleted()
+ << ": " << err.code << " " << err.message << endl;
+ break;
+ default:
+ ERR(err);
+ break;
+ }
+ break;
+ }
+ } else {// if (result == -1)
+ /*
+ ndbout << (unsigned int)pHugo->getTransactionsCompleted() << " completed"
+ << endl;
+ */
+ }
+}
+
+int
+HugoAsynchTransactions::loadTableAsynch(Ndb* pNdb,
+ int records,
+ int batch,
+ int trans,
+ int operations){
+
+ int result = executeAsynchOperation(pNdb, records, batch, trans, operations,
+ NO_INSERT);
+ g_info << (unsigned int)transactionsCompleted * operations
+ << "|- inserted..." << endl;
+
+ return result;
+}
+
+void
+HugoAsynchTransactions::transactionCompleted() {
+ transactionsCompleted++;
+}
+
+long
+HugoAsynchTransactions::getTransactionsCompleted() {
+ return transactionsCompleted;
+}
+
+int
+HugoAsynchTransactions::pkDelRecordsAsynch(Ndb* pNdb,
+ int records,
+ int batch,
+ int trans,
+ int operations) {
+
+ g_info << "|- Deleting records asynchronous..." << endl;
+
+ int result = executeAsynchOperation(pNdb, records, batch, trans,
+ operations,
+ NO_DELETE);
+ g_info << "|- " << (unsigned int)transactionsCompleted * operations
+ << " deleted..." << endl;
+
+ return result;
+}
+
+int
+HugoAsynchTransactions::pkReadRecordsAsynch(Ndb* pNdb,
+ int records,
+ int batch,
+ int trans,
+ int operations) {
+
+ g_info << "|- Reading records asynchronous..." << endl;
+
+ allocRows(trans*operations);
+ int result = executeAsynchOperation(pNdb, records, batch, trans, operations,
+ NO_READ);
+
+ g_info << "|- " << (unsigned int)transactionsCompleted * operations
+ << " read..."
+ << endl;
+
+ deallocRows();
+
+ return result;
+}
+
+int
+HugoAsynchTransactions::pkUpdateRecordsAsynch(Ndb* pNdb,
+ int records,
+ int batch,
+ int trans,
+ int operations) {
+
+ g_info << "|- Updating records asynchronous..." << endl;
+
+ int check = 0;
+ int cTrans = 0;
+ int cReadRecords = 0;
+ int cReadIndex = 0;
+ int cRecords = 0;
+ int cIndex = 0;
+
+ transactionsCompleted = 0;
+
+ allocRows(trans*operations);
+ allocTransactions(trans);
+ int a, t, r;
+
+ for (int i = 0; i < batch; i++) { // For each batch
+ while (cRecords < records*batch) {
+ cTrans = 0;
+ cReadIndex = 0;
+ for (t = 0; t < trans; t++) { // For each transaction
+ transactions[t] = pNdb->startTransaction();
+ if (transactions[t] == NULL) {
+ ERR(pNdb->getNdbError());
+ return NDBT_FAILED;
+ }
+ for (int k = 0; k < operations; k++) { // For each operation
+ NdbOperation* pOp = transactions[t]->getNdbOperation(tab.getName());
+ if (pOp == NULL) {
+ ERR(transactions[t]->getNdbError());
+ pNdb->closeTransaction(transactions[t]);
+ return NDBT_FAILED;
+ }
+
+ // Read
+ // Define primary keys
+ check = pOp->readTupleExclusive();
+ for (a = 0; a < tab.getNoOfColumns(); a++) {
+ if (tab.getColumn(a)->getPrimaryKey() == true) {
+ if (equalForAttr(pOp, a, cReadRecords) != 0){
+ ERR(transactions[t]->getNdbError());
+ pNdb->closeTransaction(transactions[t]);
+ return NDBT_FAILED;
+ }
+ }
+ }
+ // Define attributes to read
+ for (a = 0; a < tab.getNoOfColumns(); a++) {
+ if ((rows[cReadIndex]->attributeStore(a) =
+ pOp->getValue(tab.getColumn(a)->getName())) == 0) {
+ ERR(transactions[t]->getNdbError());
+ pNdb->closeTransaction(transactions[t]);
+ return NDBT_FAILED;
+ }
+ }
+ cReadIndex++;
+ cReadRecords++;
+
+ } // For each operation
+
+ // Let's prepare...
+ transactions[t]->executeAsynchPrepare(NoCommit, &asynchCallback,
+ this);
+ cTrans++;
+
+ if (cReadRecords >= records) {
+ // No more transactions needed
+ break;
+ }
+ } // For each transaction
+
+ // Wait for all outstanding transactions
+ pNdb->sendPollNdb(3000, 0, 0);
+
+ // Verify the data!
+ for (r = 0; r < trans*operations; r++) {
+ if (calc.verifyRowValues(rows[r]) != 0) {
+ g_info << "|- Verify failed..." << endl;
+ // Close all transactions
+ for (int t = 0; t < cTrans; t++) {
+ pNdb->closeTransaction(transactions[t]);
+ }
+ return NDBT_FAILED;
+ }
+ }
+
+ // Update
+ cTrans = 0;
+ cIndex = 0;
+ for (t = 0; t < trans; t++) { // For each transaction
+ for (int k = 0; k < operations; k++) { // For each operation
+ NdbOperation* pOp = transactions[t]->getNdbOperation(tab.getName());
+ if (pOp == NULL) {
+ ERR(transactions[t]->getNdbError());
+ pNdb->closeTransaction(transactions[t]);
+ return NDBT_FAILED;
+ }
+
+ int updates = calc.getUpdatesValue(rows[cIndex]) + 1;
+
+ check = pOp->updateTuple();
+ if (check == -1) {
+ ERR(transactions[t]->getNdbError());
+ pNdb->closeTransaction(transactions[t]);
+ return NDBT_FAILED;
+ }
+
+ // Set search condition for the record
+ for (a = 0; a < tab.getNoOfColumns(); a++) {
+ if (tab.getColumn(a)->getPrimaryKey() == true) {
+ if (equalForAttr(pOp, a, cRecords) != 0) {
+ ERR(transactions[t]->getNdbError());
+ pNdb->closeTransaction(transactions[t]);
+ return NDBT_FAILED;
+ }
+ }
+ }
+
+ // Update the record
+ for (a = 0; a < tab.getNoOfColumns(); a++) {
+ if (tab.getColumn(a)->getPrimaryKey() == false) {
+ if (setValueForAttr(pOp, a, cRecords, updates) != 0) {
+ ERR(transactions[t]->getNdbError());
+ pNdb->closeTransaction(transactions[t]);
+ return NDBT_FAILED;
+ }
+ }
+ }
+ cIndex++;
+ cRecords++;
+
+ } // For each operation
+
+ // Let's prepare...
+ transactions[t]->executeAsynchPrepare(Commit, &asynchCallback,
+ this);
+ cTrans++;
+
+ if (cRecords >= records) {
+ // No more transactions needed
+ break;
+ }
+ } // For each transaction
+
+ // Wait for all outstanding transactions
+ pNdb->sendPollNdb(3000, 0, 0);
+
+ // Close all transactions
+ for (t = 0; t < cTrans; t++) {
+ pNdb->closeTransaction(transactions[t]);
+ }
+
+ } // while (cRecords < records*batch)
+
+ } // For each batch
+
+ deallocTransactions();
+ deallocRows();
+
+ g_info << "|- " << ((unsigned int)transactionsCompleted * operations)/2
+ << " updated..." << endl;
+ return NDBT_OK;
+}
+
+void
+HugoAsynchTransactions::allocTransactions(int trans) {
+ if (transactions != NULL) {
+ deallocTransactions();
+ }
+ numTransactions = trans;
+ transactions = new NdbConnection*[numTransactions];
+}
+
+void
+HugoAsynchTransactions::deallocTransactions() {
+ if (transactions != NULL){
+ delete[] transactions;
+ }
+ transactions = NULL;
+}
+
+int
+HugoAsynchTransactions::executeAsynchOperation(Ndb* pNdb,
+ int records,
+ int batch,
+ int trans,
+ int operations,
+ NDB_OPERATION theOperation,
+ ExecType theType) {
+
+ int check = 0;
+ // int retryAttempt = 0; // Not used at the moment
+ // int retryMax = 5; // Not used at the moment
+ int cTrans = 0;
+ int cRecords = 0;
+ int cIndex = 0;
+ int a,t,r;
+
+ transactionsCompleted = 0;
+ allocTransactions(trans);
+
+ for (int i = 0; i < batch; i++) { // For each batch
+ while (cRecords < records*batch) {
+ cTrans = 0;
+ cIndex = 0;
+ for (t = 0; t < trans; t++) { // For each transaction
+ transactions[t] = pNdb->startTransaction();
+ if (transactions[t] == NULL) {
+ ERR(pNdb->getNdbError());
+ return NDBT_FAILED;
+ }
+ for (int k = 0; k < operations; k++) { // For each operation
+ NdbOperation* pOp = transactions[t]->getNdbOperation(tab.getName());
+ if (pOp == NULL) {
+ ERR(transactions[t]->getNdbError());
+ pNdb->closeTransaction(transactions[t]);
+ return NDBT_FAILED;
+ }
+
+ switch (theOperation) {
+ case NO_INSERT:
+ // Insert
+ check = pOp->insertTuple();
+ if (check == -1) {
+ ERR(transactions[t]->getNdbError());
+ pNdb->closeTransaction(transactions[t]);
+ return NDBT_FAILED;
+ }
+
+ // Set a calculated value for each attribute in this table
+ for (a = 0; a < tab.getNoOfColumns(); a++) {
+ if (setValueForAttr(pOp, a, cRecords, 0 ) != 0) {
+ ERR(transactions[t]->getNdbError());
+ pNdb->closeTransaction(transactions[t]);
+ return NDBT_FAILED;
+ }
+ } // For each attribute
+ break;
+ case NO_UPDATE:
+ // This is a special case and is handled in the calling client...
+ break;
+ break;
+ case NO_READ:
+ // Define primary keys
+ check = pOp->readTuple();
+ for (a = 0; a < tab.getNoOfColumns(); a++) {
+ if (tab.getColumn(a)->getPrimaryKey() == true) {
+ if (equalForAttr(pOp, a, cRecords) != 0){
+ ERR(transactions[t]->getNdbError());
+ pNdb->closeTransaction(transactions[t]);
+ return NDBT_FAILED;
+ }
+ }
+ }
+ // Define attributes to read
+ for (a = 0; a < tab.getNoOfColumns(); a++) {
+ if ((rows[cIndex]->attributeStore(a) =
+ pOp->getValue(tab.getColumn(a)->getName())) == 0) {
+ ERR(transactions[t]->getNdbError());
+ pNdb->closeTransaction(transactions[t]);
+ return NDBT_FAILED;
+ }
+ }
+ break;
+ case NO_DELETE:
+ // Delete
+ check = pOp->deleteTuple();
+ if (check == -1) {
+ ERR(transactions[t]->getNdbError());
+ pNdb->closeTransaction(transactions[t]);
+ return NDBT_FAILED;
+ }
+
+ // Define primary keys
+ for (a = 0; a < tab.getNoOfColumns(); a++) {
+ if (tab.getColumn(a)->getPrimaryKey() == true){
+ if (equalForAttr(pOp, a, cRecords) != 0) {
+ ERR(transactions[t]->getNdbError());
+ pNdb->closeTransaction(transactions[t]);
+ return NDBT_FAILED;
+ }
+ }
+ }
+ break;
+ default:
+ // Should not happen...
+ pNdb->closeTransaction(transactions[t]);
+ return NDBT_FAILED;
+ }
+
+ cIndex++;
+ cRecords++;
+
+ } // For each operation
+
+ // Let's prepare...
+ transactions[t]->executeAsynchPrepare(theType, &asynchCallback,
+ this);
+ cTrans++;
+
+ if (cRecords >= records) {
+ // No more transactions needed
+ break;
+ }
+ } // For each transaction
+
+ // Wait for all outstanding transactions
+ pNdb->sendPollNdb(3000, 0, 0);
+
+ // ugly... it's starts to resemble flexXXX ...:(
+ switch (theOperation) {
+ case NO_READ:
+ // Verify the data!
+ for (r = 0; r < trans*operations; r++) {
+ if (calc.verifyRowValues(rows[r]) != 0) {
+ g_info << "|- Verify failed..." << endl;
+ // Close all transactions
+ for (int t = 0; t < cTrans; t++) {
+ pNdb->closeTransaction(transactions[t]);
+ }
+ return NDBT_FAILED;
+ }
+ }
+ break;
+ case NO_INSERT:
+ case NO_UPDATE:
+ case NO_DELETE:
+ break;
+ }
+
+ // Close all transactions
+ for (t = 0; t < cTrans; t++) {
+ pNdb->closeTransaction(transactions[t]);
+ }
+
+ } // while (cRecords < records*batch)
+
+ } // For each batch
+
+ deallocTransactions();
+
+ return NDBT_OK;
+
+}