/* Copyright (c) 2003-2005, 2007 MySQL AB Use is subject to license terms This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA */ #include #include HugoAsynchTransactions::HugoAsynchTransactions(const NdbDictionary::Table& _t) : HugoTransactions(_t), transactionsCompleted(0), numTransactions(0), transactions(NULL) { } HugoAsynchTransactions::~HugoAsynchTransactions(){ deallocTransactions(); } void asynchCallback(int result, NdbConnection* pTrans, void* anObject) { HugoAsynchTransactions* pHugo = (HugoAsynchTransactions*) anObject; pHugo->transactionCompleted(); if (result == -1) { const NdbError err = pTrans->getNdbError(); switch(err.status) { case NdbError::Success: ERR(err); g_info << "ERROR: NdbError reports success when transcaction failed" << endl; break; case NdbError::TemporaryError: ERR(err); break; #if 0 case 626: // Tuple did not exist g_info << (unsigned int)pHugo->getTransactionsCompleted() << ": " << err.code << " " << err.message << endl; break; #endif case NdbError::UnknownResult: ERR(err); break; case NdbError::PermanentError: switch (err.classification) { case NdbError::ConstraintViolation: // Tuple already existed, OK in this application, // but should be reported g_info << (unsigned int)pHugo->getTransactionsCompleted() << ": " << err.code << " " << err.message << endl; break; default: ERR(err); break; } break; } } else {// if (result == -1) /* ndbout << (unsigned int)pHugo->getTransactionsCompleted() << " completed" << endl; */ } } int HugoAsynchTransactions::loadTableAsynch(Ndb* pNdb, int records, int batch, int trans, int operations){ int result = executeAsynchOperation(pNdb, records, batch, trans, operations, NO_INSERT); g_info << (unsigned int)transactionsCompleted * operations << "|- inserted..." << endl; return result; } void HugoAsynchTransactions::transactionCompleted() { transactionsCompleted++; } long HugoAsynchTransactions::getTransactionsCompleted() { return transactionsCompleted; } int HugoAsynchTransactions::pkDelRecordsAsynch(Ndb* pNdb, int records, int batch, int trans, int operations) { g_info << "|- Deleting records asynchronous..." << endl; int result = executeAsynchOperation(pNdb, records, batch, trans, operations, NO_DELETE); g_info << "|- " << (unsigned int)transactionsCompleted * operations << " deleted..." << endl; return result; } int HugoAsynchTransactions::pkReadRecordsAsynch(Ndb* pNdb, int records, int batch, int trans, int operations) { g_info << "|- Reading records asynchronous..." << endl; allocRows(trans*operations); int result = executeAsynchOperation(pNdb, records, batch, trans, operations, NO_READ); g_info << "|- " << (unsigned int)transactionsCompleted * operations << " read..." << endl; deallocRows(); return result; } int HugoAsynchTransactions::pkUpdateRecordsAsynch(Ndb* pNdb, int records, int batch, int trans, int operations) { g_info << "|- Updating records asynchronous..." << endl; int check = 0; int cTrans = 0; int cReadRecords = 0; int cReadIndex = 0; int cRecords = 0; int cIndex = 0; transactionsCompleted = 0; allocRows(trans*operations); allocTransactions(trans); int a, t, r; for (int i = 0; i < batch; i++) { // For each batch while (cRecords < records*batch) { cTrans = 0; cReadIndex = 0; for (t = 0; t < trans; t++) { // For each transaction transactions[t] = pNdb->startTransaction(); if (transactions[t] == NULL) { ERR(pNdb->getNdbError()); return NDBT_FAILED; } for (int k = 0; k < operations; k++) { // For each operation NdbOperation* pOp = transactions[t]->getNdbOperation(tab.getName()); if (pOp == NULL) { ERR(transactions[t]->getNdbError()); pNdb->closeTransaction(transactions[t]); return NDBT_FAILED; } // Read // Define primary keys check = pOp->readTupleExclusive(); if (equalForRow(pOp, cReadRecords) != 0) { ERR(transactions[t]->getNdbError()); pNdb->closeTransaction(transactions[t]); return NDBT_FAILED; } // Define attributes to read for (a = 0; a < tab.getNoOfColumns(); a++) { if ((rows[cReadIndex]->attributeStore(a) = pOp->getValue(tab.getColumn(a)->getName())) == 0) { ERR(transactions[t]->getNdbError()); pNdb->closeTransaction(transactions[t]); return NDBT_FAILED; } } cReadIndex++; cReadRecords++; } // For each operation // Let's prepare... transactions[t]->executeAsynchPrepare(NoCommit, &asynchCallback, this); cTrans++; if (cReadRecords >= records) { // No more transactions needed break; } } // For each transaction // Wait for all outstanding transactions pNdb->sendPollNdb(3000, 0, 0); // Verify the data! for (r = 0; r < trans*operations; r++) { if (calc.verifyRowValues(rows[r]) != 0) { g_info << "|- Verify failed..." << endl; // Close all transactions for (int t = 0; t < cTrans; t++) { pNdb->closeTransaction(transactions[t]); } return NDBT_FAILED; } } // Update cTrans = 0; cIndex = 0; for (t = 0; t < trans; t++) { // For each transaction for (int k = 0; k < operations; k++) { // For each operation NdbOperation* pOp = transactions[t]->getNdbOperation(tab.getName()); if (pOp == NULL) { ERR(transactions[t]->getNdbError()); pNdb->closeTransaction(transactions[t]); return NDBT_FAILED; } int updates = calc.getUpdatesValue(rows[cIndex]) + 1; check = pOp->updateTuple(); if (check == -1) { ERR(transactions[t]->getNdbError()); pNdb->closeTransaction(transactions[t]); return NDBT_FAILED; } // Set search condition for the record if (equalForRow(pOp, cReadRecords) != 0) { ERR(transactions[t]->getNdbError()); pNdb->closeTransaction(transactions[t]); return NDBT_FAILED; } // Update the record for (a = 0; a < tab.getNoOfColumns(); a++) { if (tab.getColumn(a)->getPrimaryKey() == false) { if (setValueForAttr(pOp, a, cRecords, updates) != 0) { ERR(transactions[t]->getNdbError()); pNdb->closeTransaction(transactions[t]); return NDBT_FAILED; } } } cIndex++; cRecords++; } // For each operation // Let's prepare... transactions[t]->executeAsynchPrepare(Commit, &asynchCallback, this); cTrans++; if (cRecords >= records) { // No more transactions needed break; } } // For each transaction // Wait for all outstanding transactions pNdb->sendPollNdb(3000, 0, 0); // Close all transactions for (t = 0; t < cTrans; t++) { pNdb->closeTransaction(transactions[t]); } } // while (cRecords < records*batch) } // For each batch deallocTransactions(); deallocRows(); g_info << "|- " << ((unsigned int)transactionsCompleted * operations)/2 << " updated..." << endl; return NDBT_OK; } void HugoAsynchTransactions::allocTransactions(int trans) { if (transactions != NULL) { deallocTransactions(); } numTransactions = trans; transactions = new NdbConnection*[numTransactions]; } void HugoAsynchTransactions::deallocTransactions() { if (transactions != NULL){ delete[] transactions; } transactions = NULL; } int HugoAsynchTransactions::executeAsynchOperation(Ndb* pNdb, int records, int batch, int trans, int operations, NDB_OPERATION theOperation, ExecType theType) { int check = 0; // int retryAttempt = 0; // Not used at the moment // int retryMax = 5; // Not used at the moment int cTrans = 0; int cRecords = 0; int cIndex = 0; int a,t,r; transactionsCompleted = 0; allocTransactions(trans); for (int i = 0; i < batch; i++) { // For each batch while (cRecords < records*batch) { cTrans = 0; cIndex = 0; for (t = 0; t < trans; t++) { // For each transaction transactions[t] = pNdb->startTransaction(); if (transactions[t] == NULL) { ERR(pNdb->getNdbError()); return NDBT_FAILED; } for (int k = 0; k < operations; k++) { // For each operation NdbOperation* pOp = transactions[t]->getNdbOperation(tab.getName()); if (pOp == NULL) { ERR(transactions[t]->getNdbError()); pNdb->closeTransaction(transactions[t]); return NDBT_FAILED; } switch (theOperation) { case NO_INSERT: // Insert check = pOp->insertTuple(); if (check == -1) { ERR(transactions[t]->getNdbError()); pNdb->closeTransaction(transactions[t]); return NDBT_FAILED; } // Set a calculated value for each attribute in this table for (a = 0; a < tab.getNoOfColumns(); a++) { if (setValueForAttr(pOp, a, cRecords, 0 ) != 0) { ERR(transactions[t]->getNdbError()); pNdb->closeTransaction(transactions[t]); return NDBT_FAILED; } } // For each attribute break; case NO_UPDATE: // This is a special case and is handled in the calling client... break; break; case NO_READ: // Define primary keys check = pOp->readTuple(); if (equalForRow(pOp, cRecords) != 0) { ERR(transactions[t]->getNdbError()); pNdb->closeTransaction(transactions[t]); return NDBT_FAILED; } // Define attributes to read for (a = 0; a < tab.getNoOfColumns(); a++) { if ((rows[cIndex]->attributeStore(a) = pOp->getValue(tab.getColumn(a)->getName())) == 0) { ERR(transactions[t]->getNdbError()); pNdb->closeTransaction(transactions[t]); return NDBT_FAILED; } } break; case NO_DELETE: // Delete check = pOp->deleteTuple(); if (check == -1) { ERR(transactions[t]->getNdbError()); pNdb->closeTransaction(transactions[t]); return NDBT_FAILED; } // Define primary keys if (equalForRow(pOp, cRecords) != 0) { ERR(transactions[t]->getNdbError()); pNdb->closeTransaction(transactions[t]); return NDBT_FAILED; } break; default: // Should not happen... pNdb->closeTransaction(transactions[t]); return NDBT_FAILED; } cIndex++; cRecords++; } // For each operation // Let's prepare... transactions[t]->executeAsynchPrepare(theType, &asynchCallback, this); cTrans++; if (cRecords >= records) { // No more transactions needed break; } } // For each transaction // Wait for all outstanding transactions pNdb->sendPollNdb(3000, 0, 0); // ugly... it's starts to resemble flexXXX ...:( switch (theOperation) { case NO_READ: // Verify the data! for (r = 0; r < trans*operations; r++) { if (calc.verifyRowValues(rows[r]) != 0) { g_info << "|- Verify failed..." << endl; // Close all transactions for (int t = 0; t < cTrans; t++) { pNdb->closeTransaction(transactions[t]); } return NDBT_FAILED; } } break; case NO_INSERT: case NO_UPDATE: case NO_DELETE: break; } // Close all transactions for (t = 0; t < cTrans; t++) { pNdb->closeTransaction(transactions[t]); } } // while (cRecords < records*batch) } // For each batch deallocTransactions(); return NDBT_OK; }