summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorunknown <joreland@mysql.com>2004-07-06 11:34:26 +0200
committerunknown <joreland@mysql.com>2004-07-06 11:34:26 +0200
commitca6e7a0ca965fdbe9d0e629fb38d256852c7c81b (patch)
tree12d899e269fa6e101daac38c5d88520e378dda57
parentd0a9676b4a423e5a039e23fa0c6615e3dcfdf9b9 (diff)
parent6fcaa7d5ac94ba47980efec596e991b3223c3fbb (diff)
downloadmariadb-git-ca6e7a0ca965fdbe9d0e629fb38d256852c7c81b.tar.gz
Merge mysql.com:/home/jonas/src/mysql-4.1-ndb
into mysql.com:/home/jonas/src/wl1873 ndb/include/ndbapi/NdbScanOperation.hpp: Auto merged ndb/src/ndbapi/NdbResultSet.cpp: Auto merged ndb/src/ndbapi/NdbScanOperation.cpp: Auto merged ndb/test/ndbapi/testScan.cpp: Auto merged
-rw-r--r--ndb/include/ndbapi/NdbResultSet.hpp5
-rw-r--r--ndb/include/ndbapi/NdbScanOperation.hpp2
-rw-r--r--ndb/src/ndbapi/NdbResultSet.cpp5
-rw-r--r--ndb/src/ndbapi/NdbScanOperation.cpp90
-rw-r--r--ndb/test/ndbapi/testScan.cpp93
5 files changed, 195 insertions, 0 deletions
diff --git a/ndb/include/ndbapi/NdbResultSet.hpp b/ndb/include/ndbapi/NdbResultSet.hpp
index 7cf18a6685d..483e08179c0 100644
--- a/ndb/include/ndbapi/NdbResultSet.hpp
+++ b/ndb/include/ndbapi/NdbResultSet.hpp
@@ -97,6 +97,11 @@ public:
void close();
/**
+ * Restart
+ */
+ int restart();
+
+ /**
* Transfer scan operation to an updating transaction. Use this function
* when a scan has found a record that you want to update.
* 1. Start a new transaction.
diff --git a/ndb/include/ndbapi/NdbScanOperation.hpp b/ndb/include/ndbapi/NdbScanOperation.hpp
index 6ebf5a083f8..c7ae029e742 100644
--- a/ndb/include/ndbapi/NdbScanOperation.hpp
+++ b/ndb/include/ndbapi/NdbScanOperation.hpp
@@ -157,6 +157,8 @@ protected:
NdbOperation* takeOverScanOp(OperationType opType, NdbConnection*);
Uint32 m_ordered;
+
+ int restart();
};
inline
diff --git a/ndb/src/ndbapi/NdbResultSet.cpp b/ndb/src/ndbapi/NdbResultSet.cpp
index 2c5d4a43c4c..b286c9fd7c9 100644
--- a/ndb/src/ndbapi/NdbResultSet.cpp
+++ b/ndb/src/ndbapi/NdbResultSet.cpp
@@ -89,3 +89,8 @@ NdbResultSet::deleteTuple(NdbConnection * takeOverTrans){
return -1;
return 0;
}
+
+int
+NdbResultSet::restart(){
+ return m_operation->restart();
+}
diff --git a/ndb/src/ndbapi/NdbScanOperation.cpp b/ndb/src/ndbapi/NdbScanOperation.cpp
index fcb3e137a47..7dcad95bf5b 100644
--- a/ndb/src/ndbapi/NdbScanOperation.cpp
+++ b/ndb/src/ndbapi/NdbScanOperation.cpp
@@ -470,6 +470,11 @@ int NdbScanOperation::nextResult(bool fetchAllowed)
if(DEBUG_NEXT_RESULT)
ndbout_c("nextResult(%d) idx=%d last=%d", fetchAllowed, idx, last);
+ if(DEBUG_NEXT_RESULT)
+ ndbout_c("nextResult(%d) idx=%d last=%d",
+ fetchAllowed,
+ idx, last);
+
/**
* Check next buckets
*/
@@ -1395,3 +1400,88 @@ NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx){
tSignal.setLength(4+1);
return tp->sendSignal(&tSignal, nodeId);
}
+
+int
+NdbScanOperation::restart(){
+ TransporterFacade* tp = TransporterFacade::instance();
+ Guard guard(tp->theMutexPtr);
+
+ Uint32 seq = theNdbCon->theNodeSequence;
+ Uint32 nodeId = theNdbCon->theDBnode;
+
+ if(seq != tp->getNodeSequence(nodeId)){
+ theNdbCon->theReleaseOnClose = true;
+ return -1;
+ }
+
+ while(m_sent_receivers_count){
+ theNdb->theWaiter.m_node = nodeId;
+ theNdb->theWaiter.m_state = WAIT_SCAN;
+ int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
+ switch(return_code){
+ case 0:
+ break;
+ case -1:
+ setErrorCode(4008);
+ case -2:
+ m_api_receivers_count = 0;
+ m_conf_receivers_count = 0;
+ m_sent_receivers_count = 0;
+ return -1;
+ }
+ }
+
+ if(m_api_receivers_count+m_conf_receivers_count){
+ // Send close scan
+ if(send_next_scan(0, true) == -1) // Close scan
+ return -1;
+ }
+
+ /**
+ * wait for close scan conf
+ */
+ while(m_sent_receivers_count+m_api_receivers_count+m_conf_receivers_count){
+ theNdb->theWaiter.m_node = nodeId;
+ theNdb->theWaiter.m_state = WAIT_SCAN;
+ int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
+ switch(return_code){
+ case 0:
+ break;
+ case -1:
+ setErrorCode(4008);
+ case -2:
+ m_api_receivers_count = 0;
+ m_conf_receivers_count = 0;
+ m_sent_receivers_count = 0;
+ return -1;
+ }
+ }
+
+ /**
+ * Reset receivers
+ */
+ const Uint32 parallell = theParallelism;
+
+ for(Uint32 i = 0; i<parallell; i++){
+ m_receivers[i]->m_list_index = i;
+ m_prepared_receivers[i] = m_receivers[i]->getId();
+ m_sent_receivers[i] = m_receivers[i];
+ m_conf_receivers[i] = 0;
+ m_api_receivers[i] = 0;
+ m_receivers[i]->prepareSend();
+ }
+
+ m_api_receivers_count = 0;
+ m_current_api_receiver = 0;
+ m_sent_receivers_count = parallell;
+ m_conf_receivers_count = 0;
+
+ if(m_ordered){
+ m_current_api_receiver = parallell;
+ }
+
+ if (doSendScan(nodeId) == -1)
+ return -1;
+
+ return 0;
+}
diff --git a/ndb/test/ndbapi/testScan.cpp b/ndb/test/ndbapi/testScan.cpp
index 0a4fa96dd2d..de60d68f213 100644
--- a/ndb/test/ndbapi/testScan.cpp
+++ b/ndb/test/ndbapi/testScan.cpp
@@ -881,6 +881,93 @@ int runCheckInactivityBeforeClose(NDBT_Context* ctx, NDBT_Step* step){
}
+int runScanRestart(NDBT_Context* ctx, NDBT_Step* step){
+ int loops = ctx->getNumLoops();
+ int records = ctx->getNumRecords();
+ Ndb * pNdb = GETNDB(step);
+ const NdbDictionary::Table* pTab = ctx->getTab();
+
+ HugoCalculator calc(* pTab);
+ NDBT_ResultRow tmpRow(* pTab);
+
+ int i = 0;
+ while (i<loops && !ctx->isTestStopped()) {
+ g_info << i++ << ": ";
+ const int record = (rand() % records);
+ g_info << " row=" << record;
+
+ NdbConnection* pCon = pNdb->startTransaction();
+ NdbScanOperation* pOp = pCon->getNdbScanOperation(pTab->getName());
+ if (pOp == NULL) {
+ ERR(pCon->getNdbError());
+ return NDBT_FAILED;
+ }
+
+ NdbResultSet* rs = pOp->readTuples();
+ if( rs == 0 ) {
+ ERR(pCon->getNdbError());
+ return NDBT_FAILED;
+ }
+
+ int check = pOp->interpret_exit_ok();
+ if( check == -1 ) {
+ ERR(pCon->getNdbError());
+ return NDBT_FAILED;
+ }
+
+ // Define attributes to read
+ for(int a = 0; a<pTab->getNoOfColumns(); a++){
+ if((tmpRow.attributeStore(a) =
+ pOp->getValue(pTab->getColumn(a)->getName())) == 0) {
+ ERR(pCon->getNdbError());
+ return NDBT_FAILED;
+ }
+ }
+
+ check = pCon->execute(NoCommit);
+ if( check == -1 ) {
+ ERR(pCon->getNdbError());
+ return NDBT_FAILED;
+ }
+
+ int res;
+ int row = 0;
+ while(row < record && (res = rs->nextResult()) == 0) {
+ if(calc.verifyRowValues(&tmpRow) != 0){
+ abort();
+ return NDBT_FAILED;
+ }
+ row++;
+ }
+ if(row != record){
+ ERR(pCon->getNdbError());
+ abort();
+ return NDBT_FAILED;
+ }
+ g_info << " restarting" << endl;
+ if((res = rs->restart()) != 0){
+ ERR(pCon->getNdbError());
+ abort();
+ return NDBT_FAILED;
+ }
+
+ row = 0;
+ while((res = rs->nextResult()) == 0) {
+ if(calc.verifyRowValues(&tmpRow) != 0){
+ abort();
+ return NDBT_FAILED;
+ }
+ row++;
+ }
+ if(res != 1 || row != records){
+ ERR(pCon->getNdbError());
+ abort();
+ return NDBT_FAILED;
+ }
+ pCon->close();
+ }
+ return NDBT_OK;
+}
NDBT_TESTSUITE(testScan);
@@ -1304,6 +1391,12 @@ TESTCASE("ScanReadWhileNodeIsDown",
STEP(runStopAndStartNode);
FINALIZER(runClearTable);
}
+TESTCASE("ScanRestart",
+ "Verify restart functionallity"){
+ INITIALIZER(runLoadTable);
+ STEP(runScanRestart);
+ FINALIZER(runClearTable);
+}
NDBT_TESTSUITE_END(testScan);
int main(int argc, const char** argv){