diff options
author | unknown <joreland@mysql.com> | 2004-06-01 13:28:29 +0200 |
---|---|---|
committer | unknown <joreland@mysql.com> | 2004-06-01 13:28:29 +0200 |
commit | ab198e52501d78830acbcc3643f8ad09a40cc288 (patch) | |
tree | f38d4be428d92161aa51f9142698d7b4642ec0e9 /ndb | |
parent | 9ff4d240cf53a3946093cfebacf83890bcb02f81 (diff) | |
download | mariadb-git-ab198e52501d78830acbcc3643f8ad09a40cc288.tar.gz |
Update error handling of new scan
Still known bugs :-(
ndb/include/kernel/signaldata/ScanTab.hpp:
Add close flag
ndb/include/ndbapi/NdbConnection.hpp:
Moved mehtod outside
ndb/include/ndbapi/NdbScanOperation.hpp:
Removed err code from
ndb/src/common/debugger/signaldata/ScanTab.cpp:
Updated printer
ndb/src/kernel/blocks/dblqh/DblqhMain.cpp:
New error inserts for SCAN
ndb/src/kernel/blocks/dbtc/Dbtc.hpp:
Update handling of frag timeouts
ndb/src/kernel/blocks/dbtc/DbtcMain.cpp:
Update handling of frag timeouts
ndb/src/kernel/blocks/dbtup/DbtupBuffer.cpp:
Don't send empty TRANSID_AI's
ndb/src/ndbapi/NdbConnectionScan.cpp:
Update error handling of scan
ndb/src/ndbapi/NdbScanOperation.cpp:
Update error handling of scan
ndb/src/ndbapi/Ndbif.cpp:
Update error handling of scan
Diffstat (limited to 'ndb')
-rw-r--r-- | ndb/include/kernel/signaldata/ScanTab.hpp | 4 | ||||
-rw-r--r-- | ndb/include/ndbapi/NdbConnection.hpp | 25 | ||||
-rw-r--r-- | ndb/include/ndbapi/NdbScanOperation.hpp | 2 | ||||
-rw-r--r-- | ndb/src/common/debugger/signaldata/ScanTab.cpp | 4 | ||||
-rw-r--r-- | ndb/src/kernel/blocks/dblqh/DblqhMain.cpp | 16 | ||||
-rw-r--r-- | ndb/src/kernel/blocks/dbtc/Dbtc.hpp | 20 | ||||
-rw-r--r-- | ndb/src/kernel/blocks/dbtc/DbtcMain.cpp | 639 | ||||
-rw-r--r-- | ndb/src/kernel/blocks/dbtup/DbtupBuffer.cpp | 3 | ||||
-rw-r--r-- | ndb/src/ndbapi/NdbConnectionScan.cpp | 16 | ||||
-rw-r--r-- | ndb/src/ndbapi/NdbScanOperation.cpp | 70 | ||||
-rw-r--r-- | ndb/src/ndbapi/Ndbif.cpp | 26 |
11 files changed, 470 insertions, 355 deletions
diff --git a/ndb/include/kernel/signaldata/ScanTab.hpp b/ndb/include/kernel/signaldata/ScanTab.hpp index 6cef4381c07..1c11bdee4ae 100644 --- a/ndb/include/kernel/signaldata/ScanTab.hpp +++ b/ndb/include/kernel/signaldata/ScanTab.hpp @@ -367,7 +367,7 @@ public: /** * Length of signal */ - STATIC_CONST( SignalLength = 4 ); + STATIC_CONST( SignalLength = 5 ); private: @@ -380,7 +380,7 @@ private: UintR transId1; // DATA 1 UintR transId2; // DATA 2 UintR errorCode; // DATA 3 - // UintR sendScanNextReqWithClose; // DATA 4 + UintR closeNeeded; // DATA 4 }; diff --git a/ndb/include/ndbapi/NdbConnection.hpp b/ndb/include/ndbapi/NdbConnection.hpp index 65f6bd2995f..bf5a4f6f0e5 100644 --- a/ndb/include/ndbapi/NdbConnection.hpp +++ b/ndb/include/ndbapi/NdbConnection.hpp @@ -633,17 +633,7 @@ private: #ifdef VM_TRACE void printState(); #endif - - bool checkState_TransId(const Uint32 * transId) const { - const Uint32 tTmp1 = transId[0]; - const Uint32 tTmp2 = transId[1]; - Uint64 tRecTransId = (Uint64)tTmp1 + ((Uint64)tTmp2 << 32); - bool b = theStatus == Connected && theTransactionId == tRecTransId; -#ifdef NDB_NO_DROPPED_SIGNAL - if(!b) abort(); -#endif - return b; - } + bool checkState_TransId(const Uint32 * transId) const; }; inline @@ -678,6 +668,19 @@ NdbConnection::checkMagicNumber() } } +inline +bool +NdbConnection::checkState_TransId(const Uint32 * transId) const { + const Uint32 tTmp1 = transId[0]; + const Uint32 tTmp2 = transId[1]; + Uint64 tRecTransId = (Uint64)tTmp1 + ((Uint64)tTmp2 << 32); + bool b = theStatus == Connected && theTransactionId == tRecTransId; +#ifdef NDB_NO_DROPPED_SIGNAL + if(!b) abort(); +#endif + return b; +} + /************************************************************************************************ void setTransactionId(Uint64 aTransactionId); diff --git a/ndb/include/ndbapi/NdbScanOperation.hpp b/ndb/include/ndbapi/NdbScanOperation.hpp index 8ff640dc6ec..a329505ef1b 100644 --- a/ndb/include/ndbapi/NdbScanOperation.hpp +++ b/ndb/include/ndbapi/NdbScanOperation.hpp @@ -146,7 +146,7 @@ protected: int send_next_scan(Uint32 cnt, bool close); void receiver_delivered(NdbReceiver*); void receiver_completed(NdbReceiver*); - void execCLOSE_SCAN_REP(Uint32 errCode); + void execCLOSE_SCAN_REP(); NdbOperation* takeOverScanOp(OperationType opType, NdbConnection*); diff --git a/ndb/src/common/debugger/signaldata/ScanTab.cpp b/ndb/src/common/debugger/signaldata/ScanTab.cpp index 776e9cf3bfc..b0383d6d6df 100644 --- a/ndb/src/common/debugger/signaldata/ScanTab.cpp +++ b/ndb/src/common/debugger/signaldata/ScanTab.cpp @@ -119,8 +119,8 @@ printSCANTABREF(FILE * output, const Uint32 * theData, Uint32 len, Uint16 receiv sig->transId1, sig->transId2); fprintf(output, " Errorcode: %u\n", sig->errorCode); - - // fprintf(output, " sendScanNextReqWithClose: %u\n", sig->sendScanNextReqWithClose); + + fprintf(output, " closeNeeded: %u\n", sig->closeNeeded); return false; } diff --git a/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp b/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp index 463a3d47354..2a744ea746a 100644 --- a/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp +++ b/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp @@ -6785,7 +6785,8 @@ void Dblqh::execSCAN_NEXTREQ(Signal* signal) if (findTransaction(transid1, transid2, senderData) != ZOK){ jam(); - DEBUG("Received SCAN_NEXTREQ in LQH with close flag when closed"); + DEBUG(senderData << + " Received SCAN_NEXTREQ in LQH with close flag when closed"); ndbrequire(nextReq->closeFlag == ZTRUE); return; } @@ -6825,6 +6826,10 @@ void Dblqh::execSCAN_NEXTREQ(Signal* signal) return; }//if + if(ERROR_INSERTED(5036)){ + return; + } + scanptr.i = tcConnectptr.p->tcScanRec; ndbrequire(scanptr.i != RNIL); c_scanRecordPool.getPtr(scanptr); @@ -6841,6 +6846,10 @@ void Dblqh::execSCAN_NEXTREQ(Signal* signal) if(ERROR_INSERTED(5034)){ CLEAR_ERROR_INSERT_VALUE; } + if(ERROR_INSERTED(5036)){ + CLEAR_ERROR_INSERT_VALUE; + return; + } closeScanRequestLab(signal); return; }//if @@ -8517,6 +8526,11 @@ void Dblqh::sendKeyinfo20(Signal* signal, * ------------------------------------------------------------------------ */ void Dblqh::sendScanFragConf(Signal* signal, Uint32 scanCompleted) { + if(ERROR_INSERTED(5037)){ + CLEAR_ERROR_INSERT_VALUE; + return; + } + scanptr.p->scanTcWaiting = ZFALSE; ScanFragConf * conf = (ScanFragConf*)&signal->theData[0]; diff --git a/ndb/src/kernel/blocks/dbtc/Dbtc.hpp b/ndb/src/kernel/blocks/dbtc/Dbtc.hpp index 61e7e42621c..501cec1f231 100644 --- a/ndb/src/kernel/blocks/dbtc/Dbtc.hpp +++ b/ndb/src/kernel/blocks/dbtc/Dbtc.hpp @@ -1194,6 +1194,9 @@ public: // Scan is on ordered index Uint8 rangeScan; + + // Close is ordered + bool m_close_scan_req; }; typedef Ptr<ScanRecord> ScanRecordPtr; @@ -1414,15 +1417,15 @@ private: Uint32 buddyPtr, UintR transid1, UintR transid2); - void initScanrec(Signal* signal, + void initScanrec(ScanRecordPtr, const class ScanTabReq*, const UintR scanParallel, const UintR noOprecPerFrag); void initScanfragrec(Signal* signal); void releaseScanResources(ScanRecordPtr); - void seizeScanrec(Signal* signal); - void sendScanFragReq(Signal* signal); - void sendScanTabConf(Signal* signal); - void close_scan_req(Signal*, ScanRecordPtr); + ScanRecordPtr seizeScanrec(Signal* signal); + void sendScanFragReq(Signal* signal, ScanRecord*, ScanFragRec*); + void sendScanTabConf(Signal* signal, ScanRecord*); + void close_scan_req(Signal*, ScanRecordPtr, bool received_req); void close_scan_req_send_conf(Signal*, ScanRecordPtr); void checkGcp(Signal* signal); @@ -1557,11 +1560,11 @@ private: void systemErrorLab(Signal* signal); void sendSignalErrorRefuseLab(Signal* signal); void scanTabRefLab(Signal* signal, Uint32 errCode); - void diFcountReqLab(Signal* signal); + void diFcountReqLab(Signal* signal, ScanRecordPtr); void signalErrorRefuseLab(Signal* signal); void abort080Lab(Signal* signal); void packKeyData000Lab(Signal* signal, BlockReference TBRef); - void abortScanLab(Signal* signal, Uint32 errCode); + void abortScanLab(Signal* signal, ScanRecordPtr, Uint32 errCode); void sendAbortedAfterTimeout(Signal* signal, int Tcheck); void abort010Lab(Signal* signal); void abort015Lab(Signal* signal); @@ -1589,7 +1592,7 @@ private: void attrinfo020Lab(Signal* signal); void scanReleaseResourcesLab(Signal* signal); void scanCompletedLab(Signal* signal); - void scanFragError(Signal* signal, Uint32 errorCode); + void scanError(Signal* signal, ScanRecordPtr, Uint32 errorCode); void diverify010Lab(Signal* signal); void intstartphase2x010Lab(Signal* signal); void intstartphase3x010Lab(Signal* signal); @@ -1699,7 +1702,6 @@ private: ApiConnectRecordPtr timeOutptr; ScanRecord *scanRecord; - ScanRecordPtr scanptr; UintR cscanrecFileSize; UnsafeArrayPool<ScanFragRec> c_scan_frag_pool; diff --git a/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp b/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp index edb51ea3c89..04506bc62eb 100644 --- a/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp +++ b/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp @@ -76,6 +76,39 @@ #define INTERNAL_TRIGGER_TCKEYREQ_JBA 0 +#ifdef VM_TRACE +NdbOut & +operator<<(NdbOut& out, Dbtc::ConnectionState state){ + out << (int)state; + return out; +} +NdbOut & +operator<<(NdbOut& out, Dbtc::OperationState state){ + out << (int)state; + return out; +} +NdbOut & +operator<<(NdbOut& out, Dbtc::AbortState state){ + out << (int)state; + return out; +} +NdbOut & +operator<<(NdbOut& out, Dbtc::ReturnSignal state){ + out << (int)state; + return out; +} +NdbOut & +operator<<(NdbOut& out, Dbtc::ScanRecord::ScanState state){ + out << (int)state; + return out; +} +NdbOut & +operator<<(NdbOut& out, Dbtc::ScanFragRec::ScanFragState state){ + out << (int)state; + return out; +} +#endif + void Dbtc::updateBuddyTimer(ApiConnectRecordPtr apiPtr) { @@ -915,7 +948,7 @@ Dbtc::handleFailedApiNode(Signal* signal, ScanRecordPtr scanPtr; scanPtr.i = apiConnectptr.p->apiScanRec; ptrCheckGuard(scanPtr, cscanrecFileSize, scanRecord); - close_scan_req(signal, scanPtr); + close_scan_req(signal, scanPtr, true); TloopCount += 64; break; @@ -1095,138 +1128,6 @@ void Dbtc::handleApiFailState(Signal* signal, UintR TapiConnectptr) }//if }//Dbtc::handleApiFailState() -/** - * Dbtc::handleScanStop - * This function is called when an entire scan should be stopped - * Check state of the scan and take appropriate action. - * The parameter TapiFailedNode indicates if the scan is stopped - * because an API node has failed or if it has been stopped because - * the scan has timed out. - * - */ -void Dbtc::handleScanStop(Signal* signal, UintR TapiFailedNode) -{ -#if JONAS_NOT_DONE - arrGuard(TapiFailedNode, MAX_NODES); - - scanptr.i = apiConnectptr.p->apiScanRec; - ptrCheckGuard(scanptr, cscanrecFileSize, scanRecord); - - // If api has failed we must release all resources - bool apiNodeHasFailed = (TapiFailedNode != 0); - - DEBUG("handleScanStop: scanState = "<< scanptr.p->scanState); - - switch (scanptr.p->scanState) { - case ScanRecord::WAIT_SCAN_TAB_INFO: - case ScanRecord::WAIT_AI: - jam(); - /** - * The scan process is still in the definition phase. - * We will release the resources and then release the connection - * to the failed API. - */ - releaseScanResources(scanptr); - if (apiNodeHasFailed) { - jam(); - releaseApiCon(signal, apiConnectptr.i); - }//if - break; - - case ScanRecord::WAIT_FRAGMENT_COUNT: - jam(); - if (!apiNodeHasFailed) { - jam(); - /** - * Time-out waiting for a local signal can only happen - * if we have a serious problem. - */ - systemErrorLab(signal); - }//if - capiConnectClosing[TapiFailedNode]++; - apiConnectptr.p->apiFailState = ZTRUE; - scanptr.p->apiIsClosed = true; - break; - - case ScanRecord::CLOSING_SCAN: - jam(); - /** - * With CLOSING_SCAN it is enough to set the - * fail state such that the connection is released at the end of the - * closing process. The close process is already ongoing. - * Set apiIsClosed to true to indicate that resources should be released - * at the end of the close process. - **/ - - if (apiNodeHasFailed) { - jam(); - capiConnectClosing[TapiFailedNode]++; - apiConnectptr.p->apiFailState = ZTRUE; - scanptr.p->apiIsClosed = true; - }//if - if (apiConnectptr.p->apiFailState == ZTRUE) { - jam(); - handleApiFailState(signal, apiConnectptr.i); - return; - }//if - break; - - case ScanRecord::SCAN_NEXT_ORDERED: - /** - * In the SCAN_NEXT_ORDERED state we will wait for the next natural place - * to receive some action from the API and instead of waiting for the - * API here we will start the abort process. - - * After the abort process is completed we will release the connection. - */ - if (apiNodeHasFailed) { - jam(); - capiConnectClosing[TapiFailedNode]++; - apiConnectptr.p->apiFailState = ZTRUE; - }//if - // Release resources and send a response to API - scanptr.p->apiIsClosed = true; - scanCompletedLab(signal); - break; - - case ScanRecord::DELIVERED: - case ScanRecord::QUEUED_DELIVERED: - /** - * A response has been sent to the api but it has not responded - */ - - if (apiNodeHasFailed) { - jam(); - capiConnectClosing[TapiFailedNode]++; - apiConnectptr.p->apiFailState = ZTRUE; - scanptr.p->apiIsClosed = true; - } else { - jam(); - /* - In this case we have received a time-out caused by the application - waiting too long to continue the scan. We will check the application - time-out instead of the deadlock detetection time-out. If the - application time-out hasn't fired we will simply ignore the condition. - */ - if ((ctcTimer - getApiConTimer(apiConnectptr.i)) <= c_appl_timeout_value) { - jam(); - return; - }//if - // Dont' release, wait until api responds or fails - scanptr.p->apiIsClosed = false; - } - scanCompletedLab(signal); - break; - - default: - jam(); - systemErrorLab(signal); - break; - - }//switch -#endif -}//Dbtc::handleScanStop() - /**************************************************************************** * T C S E I Z E R E Q * THE APPLICATION SENDS A REQUEST TO SEIZE A CONNECT RECORD TO CARRY OUT A @@ -1409,7 +1310,7 @@ void Dbtc::printState(Signal* signal, int place) << " counter = " << apiConnectptr.p->counter << " lqhkeyconfrec = " << apiConnectptr.p->lqhkeyconfrec << " lqhkeyreqrec = " << apiConnectptr.p->lqhkeyreqrec << endl; - ndbout << "abortState = " << (int)apiConnectptr.p->abortState + ndbout << "abortState = " << apiConnectptr.p->abortState << " apiScanRec = " << apiConnectptr.p->apiScanRec << " returncode = " << apiConnectptr.p->returncode << endl; ndbout << "tckeyrec = " << apiConnectptr.p->tckeyrec @@ -6155,11 +6056,14 @@ void Dbtc::timeOutFoundLab(Signal* signal, Uint32 TapiConPtr) tcConnectptr.i = apiConnectptr.p->firstTcConnect; sendAbortedAfterTimeout(signal, 0); break; - case CS_START_SCAN: + case CS_START_SCAN:{ jam(); - apiConnectptr.p->returncode = ZSCANTIME_OUT_ERROR; - handleScanStop(signal, 0); + ScanRecordPtr scanPtr; + scanPtr.i = apiConnectptr.p->apiScanRec; + ptrCheckGuard(scanPtr, cscanrecFileSize, scanRecord); + scanError(signal, scanPtr, ZSCANTIME_OUT_ERROR); break; + } case CS_WAIT_ABORT_CONF: jam(); tcConnectptr.i = apiConnectptr.p->currentTcConnect; @@ -6529,14 +6433,15 @@ void Dbtc::execSCAN_HBREP(Signal* signal) c_scan_frag_pool.getPtr(scanFragptr); switch (scanFragptr.p->scanFragState){ case ScanFragRec::LQH_ACTIVE: + //case ScanFragRec::LQH_ACTIVE_CLOSE: break; - default: DEBUG("execSCAN_HBREP: scanFragState="<<scanFragptr.p->scanFragState); systemErrorLab(signal); break; } + ScanRecordPtr scanptr; scanptr.i = scanFragptr.p->scanRec; ptrCheckGuard(scanptr, cscanrecFileSize, scanRecord); @@ -6567,6 +6472,7 @@ void Dbtc::execSCAN_HBREP(Signal* signal) updateBuddyTimer(apiConnectptr); scanFragptr.p->startFragTimer(ctcTimer); } else { + ndbassert(false); DEBUG("SCAN_HBREP when scanFragTimer was turned off"); } }//execSCAN_HBREP() @@ -6575,34 +6481,56 @@ void Dbtc::execSCAN_HBREP(Signal* signal) /* Timeout has occured on a fragment which means a scan has timed out. */ /* If this is true we have an error in LQH/ACC. */ /*--------------------------------------------------------------------------*/ +static int kalle = 0; void Dbtc::timeOutFoundFragLab(Signal* signal, UintR TscanConPtr) { - scanFragptr.i = TscanConPtr; - c_scan_frag_pool.getPtr(scanFragptr); - DEBUG("timeOutFoundFragLab: scanFragState = "<<scanFragptr.p->scanFragState); + ScanFragRecPtr ptr; + c_scan_frag_pool.getPtr(ptr, TscanConPtr); + DEBUG(TscanConPtr << " timeOutFoundFragLab: scanFragState = "<< ptr.p->scanFragState); /*-------------------------------------------------------------------------*/ // The scan fragment has expired its timeout. Check its state to decide // what to do. /*-------------------------------------------------------------------------*/ - switch (scanFragptr.p->scanFragState) { - + switch (ptr.p->scanFragState) { case ScanFragRec::WAIT_GET_PRIMCONF: jam(); - // Crash the system if we do not return from DIGETPRIMREQ in time. - systemErrorLab(signal); + ndbrequire(false); break; - - case ScanFragRec::LQH_ACTIVE: + case ScanFragRec::LQH_ACTIVE:{ jam(); + /** * The LQH expired it's timeout, try to close it */ - scanFragError(signal, ZSCAN_FRAG_LQH_ERROR); - DEBUG(" LQH_ACTIVE - closing the fragment scan in node " - << refToNode(scanFragptr.p->lqhBlockref)); - break; + Uint32 nodeId = refToNode(ptr.p->lqhBlockref); + Uint32 connectCount = getNodeInfo(nodeId).m_connectCount; + ScanRecordPtr scanptr; + scanptr.i = ptr.p->scanRec; + ptrCheckGuard(scanptr, cscanrecFileSize, scanRecord); + + if(connectCount != ptr.p->m_connectCount){ + jam(); + /** + * The node has died + */ + ndbout_c("Node %d has died", nodeId); + ptr.p->scanFragState = ScanFragRec::COMPLETED; + ScanFragList run(c_scan_frag_pool, scanptr.p->m_running_scan_frags); + ScanFragList comp(c_scan_frag_pool, scanptr.p->m_completed_scan_frags); + + run.remove(ptr); + comp.add(ptr); + ptr.p->stopFragTimer(); + } else { + kalle++; + if(kalle > 5) + ndbassert(scanptr.p->scanState != ScanRecord::CLOSING_SCAN); + } + scanError(signal, scanptr, ZSCAN_FRAG_LQH_ERROR); + break; + } case ScanFragRec::DELIVERED: jam(); case ScanFragRec::IDLE: @@ -6863,14 +6791,38 @@ void Dbtc::checkScanActiveInFailedLqh(Signal* signal, Uint32 scanPtrI, Uint32 failedNodeId){ + ScanRecordPtr scanptr; for (scanptr.i = scanPtrI; scanptr.i < cscanrecFileSize; scanptr.i++) { jam(); ptrAss(scanptr, scanRecord); if (scanptr.p->scanState != ScanRecord::IDLE){ - checkScanFragList(signal, failedNodeId, - scanptr.p, scanptr.p->m_running_scan_frags); - } + jam(); + ScanFragRecPtr ptr; + ScanFragList run(c_scan_frag_pool, scanptr.p->m_running_scan_frags); + ScanFragList comp(c_scan_frag_pool, scanptr.p->m_completed_scan_frags); + bool found = false; + for(run.first(ptr); !ptr.isNull(); ){ + jam(); + ScanFragRecPtr curr = ptr; + run.next(ptr); + if (curr.p->scanFragState == ScanFragRec::LQH_ACTIVE && + refToNode(curr.p->lqhBlockref) == failedNodeId){ + jam(); + + run.remove(curr); + comp.add(curr); + curr.p->scanFragState = ScanFragRec::COMPLETED; + curr.p->stopFragTimer(); + found = true; + } + } + if(found){ + jam(); + scanError(signal, scanptr, ZSCAN_LQH_ERROR); + } + } + // Send CONTINUEB to continue later signal->theData[0] = TcContinueB::ZCHECK_SCAN_ACTIVE_FAILED_LQH; signal->theData[1] = scanptr.i + 1; // Check next scanptr @@ -6886,29 +6838,7 @@ Dbtc::checkScanFragList(Signal* signal, ScanRecord * scanP, ScanFragList::Head & head){ - ScanFragRecPtr ptr; - ScanFragList list(c_scan_frag_pool, head); - - for(list.first(ptr); !ptr.isNull(); list.next(ptr)){ - if (refToNode(ptr.p->lqhBlockref) == failedNodeId){ - switch (ptr.p->scanFragState){ - case ScanFragRec::LQH_ACTIVE: - jam(); - apiConnectptr.i = scanptr.p->scanApiRec; - ptrCheckGuard(apiConnectptr, capiConnectFilesize, - apiConnectRecord); - - DEBUG("checkScanActiveInFailedLqh: scanFragError"); - scanFragError(signal, ZSCAN_LQH_ERROR); - - break; - default: - /* empty */ - jam(); - break; - } - } - } + DEBUG("checkScanActiveInFailedLqh: scanFragError"); } void Dbtc::execTAKE_OVERTCCONF(Signal* signal) @@ -8421,6 +8351,7 @@ void Dbtc::execSCAN_TABREQ(Signal* signal) Uint32 noOprecPerFrag = ScanTabReq::getScanBatch(reqinfo); Uint32 scanParallel = scanConcurrency; Uint32 errCode; + ScanRecordPtr scanptr; if(noOprecPerFrag == 0){ jam(); @@ -8445,12 +8376,13 @@ void Dbtc::execSCAN_TABREQ(Signal* signal) return; }//if ptrAss(apiConnectptr, apiConnectRecord); + ApiConnectRecord * transP = apiConnectptr.p; - if (apiConnectptr.p->apiConnectstate != CS_CONNECTED) { + if (transP->apiConnectstate != CS_CONNECTED) { jam(); // could be left over from TCKEYREQ rollback - if (apiConnectptr.p->apiConnectstate == CS_ABORTING && - apiConnectptr.p->abortState == AS_IDLE) { + if (transP->apiConnectstate == CS_ABORTING && + transP->abortState == AS_IDLE) { jam(); } else { jam(); @@ -8515,15 +8447,26 @@ void Dbtc::execSCAN_TABREQ(Signal* signal) } seizeTcConnect(signal); - seizeCacheRecord(signal); - seizeScanrec(signal); - initScanrec(signal, scanParallel, noOprecPerFrag); tcConnectptr.p->apiConnect = apiConnectptr.i; - initScanApirec(signal, buddyPtr, transid1, transid2); + + seizeCacheRecord(signal); + scanptr = seizeScanrec(signal); + + ndbrequire(transP->apiScanRec == RNIL); + ndbrequire(scanptr.p->scanApiRec == RNIL); + + initScanrec(scanptr, scanTabReq, scanParallel, noOprecPerFrag); + + //initScanApirec(signal, buddyPtr, transid1, transid2); + transP->apiScanRec = scanptr.i; + transP->returncode = 0; + transP->transid[0] = transid1; + transP->transid[1] = transid2; + transP->buddyPtr = buddyPtr; // The scan is started - apiConnectptr.p->apiConnectstate = CS_START_SCAN; - apiConnectptr.p->currSavePointId = currSavePointId; + transP->apiConnectstate = CS_START_SCAN; + transP->currSavePointId = currSavePointId; /********************************************************** * We start the timer on scanRec to be able to discover a @@ -8546,12 +8489,14 @@ void Dbtc::execSCAN_TABREQ(Signal* signal) SCAN_TAB_error: jam(); + ndbrequire(false); ScanTabRef * ref = (ScanTabRef*)&signal->theData[0]; - ref->apiConnectPtr = apiConnectptr.p->ndbapiConnect; + ref->apiConnectPtr = transP->ndbapiConnect; ref->transId1 = transid1; ref->transId2 = transid2; ref->errorCode = errCode; - sendSignal(apiConnectptr.p->ndbapiBlockref, GSN_SCAN_TABREF, + ref->closeNeeded = 0; + sendSignal(transP->ndbapiBlockref, GSN_SCAN_TABREF, signal, ScanTabRef::SignalLength, JBB); return; @@ -8561,20 +8506,13 @@ void Dbtc::execSCAN_TABREQ(Signal* signal) void Dbtc::initScanApirec(Signal* signal, Uint32 buddyPtr, UintR transid1, UintR transid2) { - ApiConnectRecord * apiPtr = apiConnectptr.p; - apiPtr->apiScanRec = scanptr.i; - apiPtr->returncode = 0; - apiPtr->transid[0] = transid1; - apiPtr->transid[1] = transid2; - apiPtr->buddyPtr = buddyPtr; - }//Dbtc::initScanApirec() -void Dbtc::initScanrec(Signal* signal, +void Dbtc::initScanrec(ScanRecordPtr scanptr, + const ScanTabReq * scanTabReq, UintR scanParallel, UintR noOprecPerFrag) { - const ScanTabReq * const scanTabReq = (ScanTabReq *)&signal->theData[0]; const UintR reqinfo = scanTabReq->requestInfo; ndbrequire(scanParallel < 16); @@ -8613,6 +8551,7 @@ void Dbtc::scanTabRefLab(Signal* signal, Uint32 errCode) ref->transId1 = apiConnectptr.p->transid[0]; ref->transId2 = apiConnectptr.p->transid[1]; ref->errorCode = errCode; + ref->closeNeeded = 0; sendSignal(apiConnectptr.p->ndbapiBlockref, GSN_SCAN_TABREF, signal, ScanTabRef::SignalLength, JBB); }//Dbtc::scanTabRefLab() @@ -8623,6 +8562,7 @@ void Dbtc::scanTabRefLab(Signal* signal, Uint32 errCode) /*---------------------------------------------------------------------------*/ void Dbtc::scanAttrinfoLab(Signal* signal, UintR Tlen) { + ScanRecordPtr scanptr; scanptr.i = apiConnectptr.p->apiScanRec; ptrCheckGuard(scanptr, cscanrecFileSize, scanRecord); tcConnectptr.i = scanptr.p->scanTcrec; @@ -8652,7 +8592,7 @@ void Dbtc::scanAttrinfoLab(Signal* signal, UintR Tlen) * THIS SCAN. WE ARE READY TO START THE ACTUAL * EXECUTION OF THE SCAN QUERY **************************************************/ - diFcountReqLab(signal); + diFcountReqLab(signal, scanptr); return; }//if }//if @@ -8660,21 +8600,21 @@ void Dbtc::scanAttrinfoLab(Signal* signal, UintR Tlen) scanAttrinfo_attrbuf_error: jam(); - abortScanLab(signal, ZGET_ATTRBUF_ERROR); + abortScanLab(signal, scanptr, ZGET_ATTRBUF_ERROR); return; scanAttrinfo_attrbuf2_error: jam(); - abortScanLab(signal, ZGET_ATTRBUF_ERROR); + abortScanLab(signal, scanptr, ZGET_ATTRBUF_ERROR); return; scanAttrinfo_len_error: jam(); - abortScanLab(signal, ZLENGTH_ERROR); + abortScanLab(signal, scanptr, ZLENGTH_ERROR); return; }//Dbtc::scanAttrinfoLab() -void Dbtc::diFcountReqLab(Signal* signal) +void Dbtc::diFcountReqLab(Signal* signal, ScanRecordPtr scanptr) { /** * Check so that the table is not being dropped @@ -8685,7 +8625,8 @@ void Dbtc::diFcountReqLab(Signal* signal) if (tabPtr.p->checkTable(scanptr.p->scanSchemaVersion)){ ; } else { - abortScanLab(signal, tabPtr.p->getErrorCode(scanptr.p->scanSchemaVersion)); + abortScanLab(signal, scanptr, + tabPtr.p->getErrorCode(scanptr.p->scanSchemaVersion)); return; } @@ -8717,6 +8658,7 @@ void Dbtc::execDI_FCOUNTCONF(Signal* signal) ptrCheckGuard(tcConnectptr, ctcConnectFilesize, tcConnectRecord); apiConnectptr.i = tcConnectptr.p->apiConnect; ptrCheckGuard(apiConnectptr, capiConnectFilesize, apiConnectRecord); + ScanRecordPtr scanptr; scanptr.i = apiConnectptr.p->apiScanRec; ptrCheckGuard(scanptr, cscanrecFileSize, scanRecord); ndbrequire(scanptr.p->scanState == ScanRecord::WAIT_FRAGMENT_COUNT); @@ -8728,7 +8670,7 @@ void Dbtc::execDI_FCOUNTCONF(Signal* signal) }//if if (tfragCount == 0) { jam(); - abortScanLab(signal, ZNO_FRAGMENT_ERROR); + abortScanLab(signal, scanptr, ZNO_FRAGMENT_ERROR); return; }//if @@ -8741,19 +8683,22 @@ void Dbtc::execDI_FCOUNTCONF(Signal* signal) if (tabPtr.p->checkTable(scanptr.p->scanSchemaVersion)){ ; } else { - abortScanLab(signal, tabPtr.p->getErrorCode(scanptr.p->scanSchemaVersion)); + abortScanLab(signal, scanptr, + tabPtr.p->getErrorCode(scanptr.p->scanSchemaVersion)); return; } if(scanptr.p->scanParallel > tfragCount){ jam(); - abortScanLab(signal, ZTOO_HIGH_CONCURRENCY_ERROR); + abortScanLab(signal, scanptr, ZTOO_HIGH_CONCURRENCY_ERROR); return; } scanptr.p->scanParallel = tfragCount; scanptr.p->scanNoFrag = tfragCount; scanptr.p->scanNextFragId = 0; + scanptr.p->scanState = ScanRecord::RUNNING; + setApiConTimer(apiConnectptr.i, 0, __LINE__); updateBuddyTimer(apiConnectptr); @@ -8768,6 +8713,7 @@ void Dbtc::execDI_FCOUNTCONF(Signal* signal) scanptr.p->scanTableref, scanptr.p->scanNextFragId); #endif + ptr.p->lqhBlockref = 0; ptr.p->startFragTimer(ctcTimer); ptr.p->scanFragId = scanptr.p->scanNextFragId++; ptr.p->scanFragState = ScanFragRec::WAIT_GET_PRIMCONF; @@ -8792,6 +8738,7 @@ void Dbtc::execDI_FCOUNTREF(Signal* signal) const Uint32 errCode = signal->theData[1]; apiConnectptr.i = tcConnectptr.p->apiConnect; ptrCheckGuard(apiConnectptr, capiConnectFilesize, apiConnectRecord); + ScanRecordPtr scanptr; scanptr.i = apiConnectptr.p->apiScanRec; ptrCheckGuard(scanptr, cscanrecFileSize, scanRecord); ndbrequire(scanptr.p->scanState == ScanRecord::WAIT_FRAGMENT_COUNT); @@ -8801,10 +8748,10 @@ void Dbtc::execDI_FCOUNTREF(Signal* signal) handleApiFailState(signal, apiConnectptr.i); return; }//if - abortScanLab(signal, errCode); + abortScanLab(signal, scanptr, errCode); }//Dbtc::execDI_FCOUNTREF() -void Dbtc::abortScanLab(Signal* signal, Uint32 errCode) +void Dbtc::abortScanLab(Signal* signal, ScanRecordPtr scanptr, Uint32 errCode) { scanTabRefLab(signal, errCode); releaseScanResources(scanptr); @@ -8835,6 +8782,7 @@ void Dbtc::releaseScanResources(ScanRecordPtr scanPtr) scanPtr.p->nextScan = cfirstfreeScanrec; scanPtr.p->scanState = ScanRecord::IDLE; scanPtr.p->scanTcrec = RNIL; + scanPtr.p->scanApiRec = RNIL; cfirstfreeScanrec = scanPtr.i; apiConnectptr.p->apiScanRec = RNIL; @@ -8862,7 +8810,8 @@ void Dbtc::execDIGETPRIMCONF(Signal* signal) ndbrequire(scanFragptr.p->scanFragState == ScanFragRec::WAIT_GET_PRIMCONF); scanFragptr.p->stopFragTimer(); - + + ScanRecordPtr scanptr; scanptr.i = scanFragptr.p->scanRec; ptrCheckGuard(scanptr, cscanrecFileSize, scanRecord); @@ -8876,7 +8825,12 @@ void Dbtc::execDIGETPRIMCONF(Signal* signal) Uint32 schemaVersion = scanptr.p->scanSchemaVersion; if(tabPtr.p->checkTable(schemaVersion) == false){ jam(); - scanFragError(signal, tabPtr.p->getErrorCode(schemaVersion)); + ScanFragList run(c_scan_frag_pool, scanptr.p->m_running_scan_frags); + ScanFragList comp(c_scan_frag_pool, scanptr.p->m_completed_scan_frags); + + run.remove(scanFragptr); + comp.add(scanFragptr); + scanError(signal, scanptr, tabPtr.p->getErrorCode(schemaVersion)); return; } } @@ -8908,7 +8862,7 @@ void Dbtc::execDIGETPRIMCONF(Signal* signal) Uint32 ref = calcLqhBlockRef(tnodeid); scanFragptr.p->lqhBlockref = ref; scanFragptr.p->m_connectCount = getNodeInfo(tnodeid).m_connectCount; - sendScanFragReq(signal); + sendScanFragReq(signal, scanptr.p, scanFragptr.p); attrbufptr.i = cachePtr.p->firstAttrbuf; while (attrbufptr.i != RNIL) { jam(); @@ -8943,7 +8897,18 @@ void Dbtc::execDIGETPRIMREF(Signal* signal) const Uint32 errCode = signal->theData[2]; c_scan_frag_pool.getPtr(scanFragptr); ndbrequire(scanFragptr.p->scanFragState == ScanFragRec::WAIT_GET_PRIMCONF); - scanFragError(signal, errCode); + + ScanRecordPtr scanptr; + scanptr.i = scanFragptr.p->scanRec; + ptrCheckGuard(scanptr, cscanrecFileSize, scanRecord); + + ScanFragList run(c_scan_frag_pool, scanptr.p->m_running_scan_frags); + ScanFragList comp(c_scan_frag_pool, scanptr.p->m_completed_scan_frags); + + run.remove(scanFragptr); + comp.add(scanFragptr); + + scanError(signal, scanptr, errCode); }//Dbtc::execDIGETPRIMREF() /** @@ -8962,6 +8927,7 @@ void Dbtc::execSCAN_FRAGREF(Signal* signal) scanFragptr.i = ref->senderData; c_scan_frag_pool.getPtr(scanFragptr); + ScanRecordPtr scanptr; scanptr.i = scanFragptr.p->scanRec; ptrCheckGuard(scanptr, cscanrecFileSize, scanRecord); @@ -8981,42 +8947,64 @@ void Dbtc::execSCAN_FRAGREF(Signal* signal) * stop fragment timer and call scanFragError to start * close of the other fragment scans */ - scanFragError(signal, errCode); + ndbrequire(scanFragptr.p->scanFragState == ScanFragRec::LQH_ACTIVE); + { + scanFragptr.p->scanFragState = ScanFragRec::COMPLETED; + ScanFragList run(c_scan_frag_pool, scanptr.p->m_running_scan_frags); + ScanFragList comp(c_scan_frag_pool, scanptr.p->m_completed_scan_frags); + + run.remove(scanFragptr); + comp.add(scanFragptr); + scanFragptr.p->stopFragTimer(); + } + scanError(signal, scanptr, errCode); }//Dbtc::execSCAN_FRAGREF() /** - * Dbtc::scanFragError + * Dbtc::scanError * * Called when an error occurs during - * a scan of a fragment. - * NOTE that one scan may consist of several fragment scans. - * */ -void Dbtc::scanFragError(Signal* signal, Uint32 errorCode) +void Dbtc::scanError(Signal* signal, ScanRecordPtr scanptr, Uint32 errorCode) { jam(); - scanptr.i = scanFragptr.p->scanRec; - ptrCheckGuard(scanptr, cscanrecFileSize, scanRecord); - DEBUG("scanFragError, errorCode = "<< errorCode - << ", scanState = " << scanptr.p->scanState); + ScanRecord* scanP = scanptr.p; + + DEBUG("scanError, errorCode = "<< errorCode << + ", scanState = " << scanptr.p->scanState); - scanFragptr.p->stopFragTimer(); -#if JONAS_NOT_DONE + if(scanP->scanState == ScanRecord::CLOSING_SCAN){ + jam(); + close_scan_req_send_conf(signal, scanptr); + return; + } - apiConnectptr.i = scanptr.p->scanApiRec; - ptrCheckGuard(apiConnectptr, capiConnectFilesize, apiConnectRecord); + ndbrequire(scanP->scanState == ScanRecord::RUNNING); - // If close of the scan is not already started - if (scanptr.p->scanState != ScanRecord::CLOSING_SCAN) { + apiConnectptr.i = scanP->scanApiRec; + ptrCheckGuard(apiConnectptr, capiConnectFilesize, apiConnectRecord); + ndbrequire(apiConnectptr.p->apiScanRec == scanptr.i); + + /** + * Close scan wo/ having received an order to do so + */ + close_scan_req(signal, scanptr, false); + + const bool apiFail = (apiConnectptr.p->apiFailState == ZTRUE); + if(apiFail){ jam(); - apiConnectptr.p->returncode = errorCode; - - scanCompletedLab(signal); return; - }//if -#endif -}//Dbtc::scanFragError() - + } + + ScanTabRef * ref = (ScanTabRef*)&signal->theData[0]; + ref->apiConnectPtr = apiConnectptr.p->ndbapiConnect; + ref->transId1 = apiConnectptr.p->transid[0]; + ref->transId2 = apiConnectptr.p->transid[1]; + ref->errorCode = errorCode; + ref->closeNeeded = 1; + sendSignal(apiConnectptr.p->ndbapiBlockref, GSN_SCAN_TABREF, + signal, ScanTabRef::SignalLength, JBB); +}//Dbtc::scanError() /************************************************************ * execSCAN_FRAGCONF @@ -9034,6 +9022,7 @@ void Dbtc::execSCAN_FRAGCONF(Signal* signal) scanFragptr.i = conf->senderData; c_scan_frag_pool.getPtr(scanFragptr); + ScanRecordPtr scanptr; scanptr.i = scanFragptr.p->scanRec; ptrCheckGuard(scanptr, cscanrecFileSize, scanRecord); @@ -9051,32 +9040,34 @@ void Dbtc::execSCAN_FRAGCONF(Signal* signal) ndbrequire(scanFragptr.p->scanFragState == ScanFragRec::LQH_ACTIVE); const Uint32 status = conf->fragmentCompleted; - scanFragptr.p->stopFragTimer(); - + + DEBUG(apiConnectptr.i << " " << scanFragptr.i << + " execSCAN_FRAGCONF() status: " << status + << " ops: " << noCompletedOps << " from: " << refToNode(signal->getSendersBlockRef())); + if(scanptr.p->scanState == ScanRecord::CLOSING_SCAN){ + jam(); if(status == ZFALSE){ /** - * Dont deliver to api, but instead close in LQH - * Dont need to mess with queues + * We have started closing = we sent a close -> ignore this */ - ndbout_c("running -> running(close)"); - - jam(); - ScanFragNextReq * nextReq = (ScanFragNextReq*)&signal->theData[0]; - nextReq->senderData = scanFragptr.i; - nextReq->closeFlag = ZTRUE; - nextReq->transId1 = apiConnectptr.p->transid[0]; - nextReq->transId2 = apiConnectptr.p->transid[1]; - sendSignal(scanFragptr.p->lqhBlockref, GSN_SCAN_NEXTREQ, signal, - ScanFragNextReq::SignalLength, JBB); + DEBUG(apiConnectptr.i << " " << scanFragptr.i << + " Received SCANFRAG_CONF wo/ close when in " + " CLOSING_SCAN:" << status << " " << noCompletedOps); return; } else { jam(); + DEBUG(apiConnectptr.i << " " << scanFragptr.i + << " Received SCANFRAG_CONF w/ close when in " + " CLOSING_SCAN:" << status << " " << noCompletedOps); + ScanFragList run(c_scan_frag_pool, scanptr.p->m_running_scan_frags); ScanFragList comp(c_scan_frag_pool, scanptr.p->m_completed_scan_frags); run.remove(scanFragptr); comp.add(scanFragptr); + scanFragptr.p->stopFragTimer(); + scanFragptr.p->scanFragState = ScanFragRec::COMPLETED; } close_scan_req_send_conf(signal, scanptr); return; @@ -9126,7 +9117,7 @@ void Dbtc::execSCAN_FRAGCONF(Signal* signal) if(scanptr.p->m_queued_count > /** Min */ 0){ jam(); - sendScanTabConf(signal); + sendScanTabConf(signal, scanptr.p); } }//Dbtc::execSCAN_FRAGCONF() @@ -9164,6 +9155,7 @@ void Dbtc::execSCAN_NEXTREQ(Signal* signal) ref->transId1 = transid1; ref->transId2 = transid2; ref->errorCode = ZSTATE_ERROR; + ref->closeNeeded = 0; sendSignal(signal->senderBlockRef(), GSN_SCAN_TABREF, signal, ScanTabRef::SignalLength, JBB); DEBUG("Wrong transid"); @@ -9188,6 +9180,7 @@ void Dbtc::execSCAN_NEXTREQ(Signal* signal) } DEBUG("scanTabRefLab: ZSTATE_ERROR"); DEBUG(" apiConnectstate="<<apiConnectptr.p->apiConnectstate); + ndbrequire(false); //B2 indication of strange things going on scanTabRefLab(signal, ZSTATE_ERROR); return; }//if @@ -9197,6 +9190,7 @@ void Dbtc::execSCAN_NEXTREQ(Signal* signal) ********************************************************/ // Stop the timer that is used to check for timeout in the API setApiConTimer(apiConnectptr.i, 0, __LINE__); + ScanRecordPtr scanptr; scanptr.i = apiConnectptr.p->apiScanRec; ptrCheckGuard(scanptr, cscanrecFileSize, scanRecord); ScanRecord* scanP = scanptr.p; @@ -9209,10 +9203,21 @@ void Dbtc::execSCAN_NEXTREQ(Signal* signal) * APPLICATION IS CLOSING THE SCAN. **********************************************************************/ ndbrequire(len == 0); - close_scan_req(signal, scanptr); + close_scan_req(signal, scanptr, true); return; }//if + if (scanptr.p->scanState == ScanRecord::CLOSING_SCAN){ + jam(); + /** + * The scan is closing (typically due to error) + * but the API hasn't understood it yet + * + * Wait for API close request + */ + return; + } + // Copy op ptrs so I dont overwrite them when sending... memcpy(signal->getDataPtrSend()+25, signal->getDataPtr()+4, 4 * len); @@ -9243,26 +9248,25 @@ void Dbtc::execSCAN_NEXTREQ(Signal* signal) }//Dbtc::execSCAN_NEXTREQ() void -Dbtc::close_scan_req(Signal* signal, ScanRecordPtr scanPtr){ +Dbtc::close_scan_req(Signal* signal, ScanRecordPtr scanPtr, bool req_received){ #ifdef VM_TRACE ndbout_c("%d close_scan_req", apiConnectptr.i); #endif ScanRecord* scanP = scanPtr.p; scanPtr.p->scanState = ScanRecord::CLOSING_SCAN; + scanPtr.p->m_close_scan_req = req_received; /** - * Queue : Action - * ========== : ================= - * completed : - - * running : - - * delivered : close -> LQH - * queued w/ : close -> LQH - * queued wo/ : move to completed + * Queue : Action + * ============= : ================= + * completed : - + * running : close -> LQH + * delivered w/ : close -> LQH + * delivered wo/ : move to completed + * queued w/ : close -> LQH + * queued wo/ : move to completed */ - /** - * All delivered should to be closed - */ ScanFragNextReq * nextReq = (ScanFragNextReq*)&signal->theData[0]; nextReq->closeFlag = ZTRUE; nextReq->transId1 = apiConnectptr.p->transid[0]; @@ -9273,6 +9277,28 @@ Dbtc::close_scan_req(Signal* signal, ScanRecordPtr scanPtr){ ScanFragList running(c_scan_frag_pool, scanP->m_running_scan_frags); ScanFragList completed(c_scan_frag_pool, scanP->m_completed_scan_frags); ScanFragList delivered(c_scan_frag_pool, scanP->m_delivered_scan_frags); + ScanFragList queued(c_scan_frag_pool, scanP->m_queued_scan_frags); + + // Close running + for(running.first(ptr); !ptr.isNull(); ){ + ScanFragRecPtr curr = ptr; // Remove while iterating... + running.next(ptr); + + if(curr.p->scanFragState == ScanFragRec::WAIT_GET_PRIMCONF){ + jam(); + continue; + } + ndbrequire(curr.p->scanFragState == ScanFragRec::LQH_ACTIVE); + + curr.p->startFragTimer(ctcTimer); + curr.p->scanFragState = ScanFragRec::LQH_ACTIVE; + nextReq->senderData = curr.i; + sendSignal(curr.p->lqhBlockref, GSN_SCAN_NEXTREQ, signal, + ScanFragNextReq::SignalLength, JBB); + ndbout_c("%d running -> closing", curr.i); + } + + // Close delivered for(delivered.first(ptr); !ptr.isNull(); ){ jam(); ScanFragRecPtr curr = ptr; // Remove while iterating... @@ -9290,20 +9316,19 @@ Dbtc::close_scan_req(Signal* signal, ScanRecordPtr scanPtr){ sendSignal(curr.p->lqhBlockref, GSN_SCAN_NEXTREQ, signal, ScanFragNextReq::SignalLength, JBB); - ndbout_c("delivered -> running"); + ndbout_c("%d delivered -> closing (%d)", curr.i, curr.p->m_ops); } else { jam(); completed.add(curr); curr.p->scanFragState = ScanFragRec::COMPLETED; curr.p->stopFragTimer(); - ndbout_c("delivered -> completed"); + ndbout_c("%d delivered -> completed", curr.i); } }//for /** * All queued with data should be closed */ - ScanFragList queued(c_scan_frag_pool, scanP->m_queued_scan_frags); for(queued.first(ptr); !ptr.isNull(); ){ jam(); ndbrequire(ptr.p->scanFragState == ScanFragRec::QUEUED_FOR_DELIVERY); @@ -9322,32 +9347,59 @@ Dbtc::close_scan_req(Signal* signal, ScanRecordPtr scanPtr){ sendSignal(curr.p->lqhBlockref, GSN_SCAN_NEXTREQ, signal, ScanFragNextReq::SignalLength, JBB); - ndbout_c("queued -> running"); + ndbout_c("%d queued -> closing", curr.i); } else { jam(); completed.add(curr); curr.p->scanFragState = ScanFragRec::COMPLETED; curr.p->stopFragTimer(); - ndbout_c("queued -> completed"); + ndbout_c("%d queued -> completed", curr.i); } } } - close_scan_req_send_conf(signal, scanptr); + close_scan_req_send_conf(signal, scanPtr); } void Dbtc::close_scan_req_send_conf(Signal* signal, ScanRecordPtr scanPtr){ jam(); + ndbrequire(scanPtr.p->m_queued_scan_frags.isEmpty()); ndbrequire(scanPtr.p->m_delivered_scan_frags.isEmpty()); + //ndbrequire(scanPtr.p->m_running_scan_frags.isEmpty()); + +#if 1 + { + ScanFragList comp(c_scan_frag_pool, scanPtr.p->m_completed_scan_frags); + ScanFragRecPtr ptr; + for(comp.first(ptr); !ptr.isNull(); comp.next(ptr)){ + ndbrequire(ptr.p->scanFragTimer == 0); + ndbrequire(ptr.p->scanFragState == ScanFragRec::COMPLETED); + } + } +#endif + if(!scanPtr.p->m_running_scan_frags.isEmpty()){ jam(); + + ndbout_c("%d close_scan_req_send_conf: not ready", apiConnectptr.i); return; } - + const bool apiFail = (apiConnectptr.p->apiFailState == ZTRUE); + if(!scanPtr.p->m_close_scan_req){ + jam(); + /** + * The API hasn't order closing yet + */ + ndbout_c("%d close_scan_req_send_conf: api not ready", apiConnectptr.i); + return; + } + + ndbout_c("%d close_scan_req_send_conf: ready", apiConnectptr.i); + if(!apiFail){ jam(); Uint32 ref = apiConnectptr.p->ndbapiBlockref; @@ -9370,53 +9422,58 @@ Dbtc::close_scan_req_send_conf(Signal* signal, ScanRecordPtr scanPtr){ } } -void Dbtc::seizeScanrec(Signal* signal) { +Dbtc::ScanRecordPtr +Dbtc::seizeScanrec(Signal* signal) { + ScanRecordPtr scanptr; scanptr.i = cfirstfreeScanrec; ptrCheckGuard(scanptr, cscanrecFileSize, scanRecord); cfirstfreeScanrec = scanptr.p->nextScan; scanptr.p->nextScan = RNIL; ndbrequire(scanptr.p->scanState == ScanRecord::IDLE); + return scanptr; }//Dbtc::seizeScanrec() -void Dbtc::sendScanFragReq(Signal* signal) { +void Dbtc::sendScanFragReq(Signal* signal, + ScanRecord* scanP, + ScanFragRec* scanFragP){ Uint32 requestInfo = 0; - ScanFragReq::setConcurrency(requestInfo, scanFragptr.p->scanFragConcurrency); - ScanFragReq::setLockMode(requestInfo, scanptr.p->scanLockMode); - ScanFragReq::setHoldLockFlag(requestInfo, scanptr.p->scanLockHold); - if(scanptr.p->scanLockMode == 1){ // Not read -> keyinfo + ScanFragReq::setConcurrency(requestInfo, scanFragP->scanFragConcurrency); + ScanFragReq::setLockMode(requestInfo, scanP->scanLockMode); + ScanFragReq::setHoldLockFlag(requestInfo, scanP->scanLockHold); + if(scanP->scanLockMode == 1){ // Not read -> keyinfo jam(); ScanFragReq::setKeyinfoFlag(requestInfo, 1); } - ScanFragReq::setReadCommittedFlag(requestInfo, scanptr.p->readCommitted); - ScanFragReq::setRangeScanFlag(requestInfo, scanptr.p->rangeScan); - ScanFragReq::setAttrLen(requestInfo, scanptr.p->scanAiLength); + ScanFragReq::setReadCommittedFlag(requestInfo, scanP->readCommitted); + ScanFragReq::setRangeScanFlag(requestInfo, scanP->rangeScan); + ScanFragReq::setAttrLen(requestInfo, scanP->scanAiLength); ScanFragReq::setScanPrio(requestInfo, 1); - apiConnectptr.i = scanptr.p->scanApiRec; + apiConnectptr.i = scanP->scanApiRec; ptrCheckGuard(apiConnectptr, capiConnectFilesize, apiConnectRecord); ScanFragReq * const req = (ScanFragReq *)&signal->theData[0]; req->senderData = scanFragptr.i; req->resultRef = apiConnectptr.p->ndbapiBlockref; req->requestInfo = requestInfo; req->savePointId = apiConnectptr.p->currSavePointId; - req->tableId = scanptr.p->scanTableref; - req->fragmentNo = scanFragptr.p->scanFragId; - req->schemaVersion = scanptr.p->scanSchemaVersion; + req->tableId = scanP->scanTableref; + req->fragmentNo = scanFragP->scanFragId; + req->schemaVersion = scanP->scanSchemaVersion; req->transId1 = apiConnectptr.p->transid[0]; req->transId2 = apiConnectptr.p->transid[1]; for(int i = 0; i<16; i++){ - req->clientOpPtr[i] = scanFragptr.p->m_apiPtr; + req->clientOpPtr[i] = scanFragP->m_apiPtr; } - sendSignal(scanFragptr.p->lqhBlockref, GSN_SCAN_FRAGREQ, signal, 25, JBB); + sendSignal(scanFragP->lqhBlockref, GSN_SCAN_FRAGREQ, signal, 25, JBB); updateBuddyTimer(apiConnectptr); - scanFragptr.p->startFragTimer(ctcTimer); + scanFragP->startFragTimer(ctcTimer); }//Dbtc::sendScanFragReq() -void Dbtc::sendScanTabConf(Signal* signal) { +void Dbtc::sendScanTabConf(Signal* signal, ScanRecord * scanP) { jam(); Uint32* ops = signal->getDataPtrSend()+4; - Uint32 op_count = scanptr.p->m_queued_count; + Uint32 op_count = scanP->m_queued_count; if(4 + 3 * op_count > 25){ jam(); ops += 21; @@ -9428,7 +9485,6 @@ void Dbtc::sendScanTabConf(Signal* signal) { conf->transId1 = apiConnectptr.p->transid[0]; conf->transId2 = apiConnectptr.p->transid[1]; ScanFragRecPtr ptr; - ScanRecord* scanP = scanptr.p; ScanFragList queued(c_scan_frag_pool, scanP->m_queued_scan_frags); ScanFragList completed(c_scan_frag_pool, scanP->m_completed_scan_frags); ScanFragList delivered(c_scan_frag_pool, scanP->m_delivered_scan_frags); @@ -9466,7 +9522,7 @@ void Dbtc::sendScanTabConf(Signal* signal) { sendSignal(apiConnectptr.p->ndbapiBlockref, GSN_SCAN_TABCONF, signal, ScanTabConf::SignalLength + 3 * op_count, JBB); } - scanptr.p->m_queued_count = 0; + scanP->m_queued_count = 0; }//Dbtc::sendScanTabConf() @@ -9715,12 +9771,14 @@ void Dbtc::initialiseRecordsLab(Signal* signal, UintR Tdata0, /* ========================================================================= */ void Dbtc::initialiseScanrec(Signal* signal) { + ScanRecordPtr scanptr; ndbrequire(cscanrecFileSize > 0); for (scanptr.i = 0; scanptr.i < cscanrecFileSize; scanptr.i++) { jam(); ptrAss(scanptr, scanRecord); new (scanptr.p) ScanRecord(); scanptr.p->scanState = ScanRecord::IDLE; + scanptr.p->scanApiRec = RNIL; scanptr.p->nextScan = scanptr.i + 1; }//for scanptr.i = cscanrecFileSize - 1; @@ -11496,7 +11554,8 @@ void Dbtc::readIndexTable(Signal* signal, Uint32 transId1 = indexOp->tcIndxReq->transId1; Uint32 transId2 = indexOp->tcIndxReq->transId2; - const Uint8 opType = TcKeyReq::getOperationType(tcKeyRequestInfo); + const Operation_t opType = + (Operation_t)TcKeyReq::getOperationType(tcKeyRequestInfo); // Find index table if ((indexData = c_theIndexes.getPtr(indexOp->tcIndxReq->indexId)) == NULL) { diff --git a/ndb/src/kernel/blocks/dbtup/DbtupBuffer.cpp b/ndb/src/kernel/blocks/dbtup/DbtupBuffer.cpp index cd5057d8a62..f7d55d0acc9 100644 --- a/ndb/src/kernel/blocks/dbtup/DbtupBuffer.cpp +++ b/ndb/src/kernel/blocks/dbtup/DbtupBuffer.cpp @@ -54,6 +54,9 @@ void Dbtup::execSEND_PACKED(Signal* signal) void Dbtup::bufferTRANSID_AI(Signal* signal, BlockReference aRef, Uint32 Tlen) { + if(Tlen == 3) + return; + Uint32 hostId = refToNode(aRef); Uint32 Theader = ((refToBlock(aRef) << 16)+(Tlen-3)); diff --git a/ndb/src/ndbapi/NdbConnectionScan.cpp b/ndb/src/ndbapi/NdbConnectionScan.cpp index ea45f2b5a00..d405dedc09f 100644 --- a/ndb/src/ndbapi/NdbConnectionScan.cpp +++ b/ndb/src/ndbapi/NdbConnectionScan.cpp @@ -61,7 +61,14 @@ NdbConnection::receiveSCAN_TABREF(NdbApiSignal* aSignal){ const ScanTabRef * ref = CAST_CONSTPTR(ScanTabRef, aSignal->getDataPtr()); if(checkState_TransId(&ref->transId1)){ - theScanningOp->execCLOSE_SCAN_REP(ref->errorCode); + theScanningOp->theError.code = ref->errorCode; + if(!ref->closeNeeded){ + theScanningOp->execCLOSE_SCAN_REP(); + return 0; + } + assert(theScanningOp->m_sent_receivers_count); + theScanningOp->m_sent_receivers_count--; + theScanningOp->m_conf_receivers_count++; return 0; } return -1; @@ -88,11 +95,10 @@ NdbConnection::receiveSCAN_TABCONF(NdbApiSignal* aSignal, if(checkState_TransId(&conf->transId1)){ if (conf->requestInfo == ScanTabConf::EndOfData) { - theScanningOp->execCLOSE_SCAN_REP(0); + theScanningOp->execCLOSE_SCAN_REP(); return 0; } - int noComp = -1; for(Uint32 i = 0; i<len; i += 3){ Uint32 ptrI = * ops++; Uint32 tcPtrI = * ops++; @@ -108,15 +114,13 @@ NdbConnection::receiveSCAN_TABCONF(NdbApiSignal* aSignal, /** * */ - noComp++; theScanningOp->receiver_delivered(tOp); } else if(info == ScanTabConf::EndOfData){ - noComp++; theScanningOp->receiver_completed(tOp); } } } - return noComp; + return 0; } return -1; diff --git a/ndb/src/ndbapi/NdbScanOperation.cpp b/ndb/src/ndbapi/NdbScanOperation.cpp index 7cbf35ab4fd..2f0bd82044c 100644 --- a/ndb/src/ndbapi/NdbScanOperation.cpp +++ b/ndb/src/ndbapi/NdbScanOperation.cpp @@ -124,7 +124,7 @@ NdbScanOperation::init(NdbTableImpl* tab, NdbConnection* myConnection) theTotalBoundAI_Len = 0; theBoundATTRINFO = NULL; - + return 0; } @@ -135,6 +135,8 @@ NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm, m_ordered = 0; Uint32 fragCount = m_currentTable->m_fragmentCount; + ndbout_c("batch: %d parallell: %d fragCount: %d", + batch, parallell, fragCount); if(batch + parallell == 0){ // Max speed batch = 16; @@ -153,6 +155,9 @@ NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm, parallell = fragCount; else if(parallell == 0) parallell = fragCount; + + ndbout_c("batch: %d parallell: %d fragCount: %d", + batch, parallell, fragCount); assert(parallell > 0); @@ -486,6 +491,11 @@ int NdbScanOperation::nextResult(bool fetchAllowed) last = m_api_receivers_count; do { + if(theError.code){ + setErrorCode(theError.code); + return -1; + } + Uint32 cnt = m_conf_receivers_count; Uint32 sent = m_sent_receivers_count; @@ -502,12 +512,17 @@ int NdbScanOperation::nextResult(bool fetchAllowed) */ theNdb->theWaiter.m_node = nodeId; theNdb->theWaiter.m_state = WAIT_SCAN; + ndbout_c("%d : api: %d conf: %d sent: %d", + __LINE__, + m_api_receivers_count, + m_conf_receivers_count, + m_sent_receivers_count); int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT); if (return_code == 0 && seq == tp->getNodeSequence(nodeId)) { continue; } else { idx = last; - retVal = -1; //return_code; + retVal = -2; //return_code; } } else if(retVal == 2){ /** @@ -516,6 +531,11 @@ int NdbScanOperation::nextResult(bool fetchAllowed) if(send_next_scan(0, true) == 0){ // Close scan theNdb->theWaiter.m_node = nodeId; theNdb->theWaiter.m_state = WAIT_SCAN; + ndbout_c("%d : api: %d conf: %d sent: %d", + __LINE__, + m_api_receivers_count, + m_conf_receivers_count, + m_sent_receivers_count); int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT); if (return_code == 0 && seq == tp->getNodeSequence(nodeId)) { return 1; @@ -633,6 +653,12 @@ NdbScanOperation::doSend(int ProcessorId) void NdbScanOperation::closeScan() { + ndbout_c("closeScan %d : api: %d conf: %d sent: %d", + __LINE__, + m_api_receivers_count, + m_conf_receivers_count, + m_sent_receivers_count); + do { TransporterFacade* tp = TransporterFacade::instance(); Guard guard(tp->theMutexPtr); @@ -651,6 +677,11 @@ void NdbScanOperation::closeScan() while(m_sent_receivers_count){ theNdb->theWaiter.m_node = nodeId; theNdb->theWaiter.m_state = WAIT_SCAN; + ndbout_c("%d : api: %d conf: %d sent: %d", + __LINE__, + m_api_receivers_count, + m_conf_receivers_count, + m_sent_receivers_count); int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT); switch(return_code){ case 0: @@ -679,6 +710,11 @@ void NdbScanOperation::closeScan() do { theNdb->theWaiter.m_node = nodeId; theNdb->theWaiter.m_state = WAIT_SCAN; + ndbout_c("%d : api: %d conf: %d sent: %d", + __LINE__, + m_api_receivers_count, + m_conf_receivers_count, + m_sent_receivers_count); int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT); switch(return_code){ case 0: @@ -701,22 +737,7 @@ void NdbScanOperation::closeScan() } void -NdbScanOperation::execCLOSE_SCAN_REP(Uint32 errCode){ - /** - * We will receive no further signals from this scan - */ - if(!errCode){ - /** - * Normal termination - */ - theNdbCon->theCommitStatus = NdbConnection::Committed; - theNdbCon->theCompletionStatus = NdbConnection::CompletedSuccess; - } else { - /** - * Something is fishy - */ - abort(); - } +NdbScanOperation::execCLOSE_SCAN_REP(){ m_api_receivers_count = 0; m_conf_receivers_count = 0; m_sent_receivers_count = 0; @@ -1206,7 +1227,7 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){ Uint32 nodeId = theNdbCon->theDBnode; if(seq == tp->getNodeSequence(nodeId) && !send_next_scan_ordered(u_idx)){ Uint32 tmp = m_sent_receivers_count; - while(m_sent_receivers_count > 0){ + while(m_sent_receivers_count > 0 && !theError.code){ theNdb->theWaiter.m_node = nodeId; theNdb->theWaiter.m_state = WAIT_SCAN; int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT); @@ -1223,12 +1244,16 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){ memcpy(arr, m_conf_receivers, u_last * sizeof(char*)); if(DEBUG_NEXT_RESULT) ndbout_c("sent: %d recv: %d", tmp, u_last); + if(theError.code){ + setErrorCode(theError.code); + return -1; + } } } else { return 2; } } - + if(DEBUG_NEXT_RESULT) ndbout_c("u=[%d %d] s=[%d %d]", u_idx, u_last, s_idx, s_last); @@ -1279,9 +1304,12 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){ Guard guard(tp->theMutexPtr); Uint32 seq = theNdbCon->theNodeSequence; Uint32 nodeId = theNdbCon->theDBnode; - if(seq == tp->getNodeSequence(nodeId) && send_next_scan(0, true) == 0){ + if(seq == tp->getNodeSequence(nodeId) && + send_next_scan(0, true) == 0 && + theError.code == 0){ return 1; } + setErrorCode(theError.code); return -1; } diff --git a/ndb/src/ndbapi/Ndbif.cpp b/ndb/src/ndbapi/Ndbif.cpp index f7d537dafa5..92723431860 100644 --- a/ndb/src/ndbapi/Ndbif.cpp +++ b/ndb/src/ndbapi/Ndbif.cpp @@ -705,23 +705,25 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3]) { tFirstDataPtr = int2void(tFirstData); if (tFirstDataPtr == 0) goto InvalidSignal; + + tCon = void2con(tFirstDataPtr); + + assert(tFirstDataPtr != 0 && + void2con(tFirstDataPtr)->checkMagicNumber() == 0); - if (tWaitState == WAIT_SCAN){ - tCon = void2con(tFirstDataPtr); - if (tCon->checkMagicNumber() == 0){ - tReturnCode = tCon->receiveSCAN_TABREF(aSignal); - if (tReturnCode != -1){ - theWaiter.m_state = NO_WAIT; - } - break; + if (tCon->checkMagicNumber() == 0){ + tReturnCode = tCon->receiveSCAN_TABREF(aSignal); + if (tReturnCode != -1){ + theWaiter.m_state = NO_WAIT; } + break; } goto InvalidSignal; - } + } case GSN_SCAN_TABINFO: - { - goto InvalidSignal; - } + { + goto InvalidSignal; + } case GSN_KEYINFO20: { tFirstDataPtr = int2void(tFirstData); if (tFirstDataPtr == 0) goto InvalidSignal; |