summaryrefslogtreecommitdiff
path: root/ndb
diff options
context:
space:
mode:
authorunknown <joreland@mysql.com>2004-06-01 13:28:29 +0200
committerunknown <joreland@mysql.com>2004-06-01 13:28:29 +0200
commitab198e52501d78830acbcc3643f8ad09a40cc288 (patch)
treef38d4be428d92161aa51f9142698d7b4642ec0e9 /ndb
parent9ff4d240cf53a3946093cfebacf83890bcb02f81 (diff)
downloadmariadb-git-ab198e52501d78830acbcc3643f8ad09a40cc288.tar.gz
Update error handling of new scan
Still known bugs :-( ndb/include/kernel/signaldata/ScanTab.hpp: Add close flag ndb/include/ndbapi/NdbConnection.hpp: Moved mehtod outside ndb/include/ndbapi/NdbScanOperation.hpp: Removed err code from ndb/src/common/debugger/signaldata/ScanTab.cpp: Updated printer ndb/src/kernel/blocks/dblqh/DblqhMain.cpp: New error inserts for SCAN ndb/src/kernel/blocks/dbtc/Dbtc.hpp: Update handling of frag timeouts ndb/src/kernel/blocks/dbtc/DbtcMain.cpp: Update handling of frag timeouts ndb/src/kernel/blocks/dbtup/DbtupBuffer.cpp: Don't send empty TRANSID_AI's ndb/src/ndbapi/NdbConnectionScan.cpp: Update error handling of scan ndb/src/ndbapi/NdbScanOperation.cpp: Update error handling of scan ndb/src/ndbapi/Ndbif.cpp: Update error handling of scan
Diffstat (limited to 'ndb')
-rw-r--r--ndb/include/kernel/signaldata/ScanTab.hpp4
-rw-r--r--ndb/include/ndbapi/NdbConnection.hpp25
-rw-r--r--ndb/include/ndbapi/NdbScanOperation.hpp2
-rw-r--r--ndb/src/common/debugger/signaldata/ScanTab.cpp4
-rw-r--r--ndb/src/kernel/blocks/dblqh/DblqhMain.cpp16
-rw-r--r--ndb/src/kernel/blocks/dbtc/Dbtc.hpp20
-rw-r--r--ndb/src/kernel/blocks/dbtc/DbtcMain.cpp639
-rw-r--r--ndb/src/kernel/blocks/dbtup/DbtupBuffer.cpp3
-rw-r--r--ndb/src/ndbapi/NdbConnectionScan.cpp16
-rw-r--r--ndb/src/ndbapi/NdbScanOperation.cpp70
-rw-r--r--ndb/src/ndbapi/Ndbif.cpp26
11 files changed, 470 insertions, 355 deletions
diff --git a/ndb/include/kernel/signaldata/ScanTab.hpp b/ndb/include/kernel/signaldata/ScanTab.hpp
index 6cef4381c07..1c11bdee4ae 100644
--- a/ndb/include/kernel/signaldata/ScanTab.hpp
+++ b/ndb/include/kernel/signaldata/ScanTab.hpp
@@ -367,7 +367,7 @@ public:
/**
* Length of signal
*/
- STATIC_CONST( SignalLength = 4 );
+ STATIC_CONST( SignalLength = 5 );
private:
@@ -380,7 +380,7 @@ private:
UintR transId1; // DATA 1
UintR transId2; // DATA 2
UintR errorCode; // DATA 3
- // UintR sendScanNextReqWithClose; // DATA 4
+ UintR closeNeeded; // DATA 4
};
diff --git a/ndb/include/ndbapi/NdbConnection.hpp b/ndb/include/ndbapi/NdbConnection.hpp
index 65f6bd2995f..bf5a4f6f0e5 100644
--- a/ndb/include/ndbapi/NdbConnection.hpp
+++ b/ndb/include/ndbapi/NdbConnection.hpp
@@ -633,17 +633,7 @@ private:
#ifdef VM_TRACE
void printState();
#endif
-
- bool checkState_TransId(const Uint32 * transId) const {
- const Uint32 tTmp1 = transId[0];
- const Uint32 tTmp2 = transId[1];
- Uint64 tRecTransId = (Uint64)tTmp1 + ((Uint64)tTmp2 << 32);
- bool b = theStatus == Connected && theTransactionId == tRecTransId;
-#ifdef NDB_NO_DROPPED_SIGNAL
- if(!b) abort();
-#endif
- return b;
- }
+ bool checkState_TransId(const Uint32 * transId) const;
};
inline
@@ -678,6 +668,19 @@ NdbConnection::checkMagicNumber()
}
}
+inline
+bool
+NdbConnection::checkState_TransId(const Uint32 * transId) const {
+ const Uint32 tTmp1 = transId[0];
+ const Uint32 tTmp2 = transId[1];
+ Uint64 tRecTransId = (Uint64)tTmp1 + ((Uint64)tTmp2 << 32);
+ bool b = theStatus == Connected && theTransactionId == tRecTransId;
+#ifdef NDB_NO_DROPPED_SIGNAL
+ if(!b) abort();
+#endif
+ return b;
+}
+
/************************************************************************************************
void setTransactionId(Uint64 aTransactionId);
diff --git a/ndb/include/ndbapi/NdbScanOperation.hpp b/ndb/include/ndbapi/NdbScanOperation.hpp
index 8ff640dc6ec..a329505ef1b 100644
--- a/ndb/include/ndbapi/NdbScanOperation.hpp
+++ b/ndb/include/ndbapi/NdbScanOperation.hpp
@@ -146,7 +146,7 @@ protected:
int send_next_scan(Uint32 cnt, bool close);
void receiver_delivered(NdbReceiver*);
void receiver_completed(NdbReceiver*);
- void execCLOSE_SCAN_REP(Uint32 errCode);
+ void execCLOSE_SCAN_REP();
NdbOperation* takeOverScanOp(OperationType opType, NdbConnection*);
diff --git a/ndb/src/common/debugger/signaldata/ScanTab.cpp b/ndb/src/common/debugger/signaldata/ScanTab.cpp
index 776e9cf3bfc..b0383d6d6df 100644
--- a/ndb/src/common/debugger/signaldata/ScanTab.cpp
+++ b/ndb/src/common/debugger/signaldata/ScanTab.cpp
@@ -119,8 +119,8 @@ printSCANTABREF(FILE * output, const Uint32 * theData, Uint32 len, Uint16 receiv
sig->transId1, sig->transId2);
fprintf(output, " Errorcode: %u\n", sig->errorCode);
-
- // fprintf(output, " sendScanNextReqWithClose: %u\n", sig->sendScanNextReqWithClose);
+
+ fprintf(output, " closeNeeded: %u\n", sig->closeNeeded);
return false;
}
diff --git a/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp b/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
index 463a3d47354..2a744ea746a 100644
--- a/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
+++ b/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
@@ -6785,7 +6785,8 @@ void Dblqh::execSCAN_NEXTREQ(Signal* signal)
if (findTransaction(transid1, transid2, senderData) != ZOK){
jam();
- DEBUG("Received SCAN_NEXTREQ in LQH with close flag when closed");
+ DEBUG(senderData <<
+ " Received SCAN_NEXTREQ in LQH with close flag when closed");
ndbrequire(nextReq->closeFlag == ZTRUE);
return;
}
@@ -6825,6 +6826,10 @@ void Dblqh::execSCAN_NEXTREQ(Signal* signal)
return;
}//if
+ if(ERROR_INSERTED(5036)){
+ return;
+ }
+
scanptr.i = tcConnectptr.p->tcScanRec;
ndbrequire(scanptr.i != RNIL);
c_scanRecordPool.getPtr(scanptr);
@@ -6841,6 +6846,10 @@ void Dblqh::execSCAN_NEXTREQ(Signal* signal)
if(ERROR_INSERTED(5034)){
CLEAR_ERROR_INSERT_VALUE;
}
+ if(ERROR_INSERTED(5036)){
+ CLEAR_ERROR_INSERT_VALUE;
+ return;
+ }
closeScanRequestLab(signal);
return;
}//if
@@ -8517,6 +8526,11 @@ void Dblqh::sendKeyinfo20(Signal* signal,
* ------------------------------------------------------------------------ */
void Dblqh::sendScanFragConf(Signal* signal, Uint32 scanCompleted)
{
+ if(ERROR_INSERTED(5037)){
+ CLEAR_ERROR_INSERT_VALUE;
+ return;
+ }
+
scanptr.p->scanTcWaiting = ZFALSE;
ScanFragConf * conf = (ScanFragConf*)&signal->theData[0];
diff --git a/ndb/src/kernel/blocks/dbtc/Dbtc.hpp b/ndb/src/kernel/blocks/dbtc/Dbtc.hpp
index 61e7e42621c..501cec1f231 100644
--- a/ndb/src/kernel/blocks/dbtc/Dbtc.hpp
+++ b/ndb/src/kernel/blocks/dbtc/Dbtc.hpp
@@ -1194,6 +1194,9 @@ public:
// Scan is on ordered index
Uint8 rangeScan;
+
+ // Close is ordered
+ bool m_close_scan_req;
};
typedef Ptr<ScanRecord> ScanRecordPtr;
@@ -1414,15 +1417,15 @@ private:
Uint32 buddyPtr,
UintR transid1,
UintR transid2);
- void initScanrec(Signal* signal,
+ void initScanrec(ScanRecordPtr, const class ScanTabReq*,
const UintR scanParallel,
const UintR noOprecPerFrag);
void initScanfragrec(Signal* signal);
void releaseScanResources(ScanRecordPtr);
- void seizeScanrec(Signal* signal);
- void sendScanFragReq(Signal* signal);
- void sendScanTabConf(Signal* signal);
- void close_scan_req(Signal*, ScanRecordPtr);
+ ScanRecordPtr seizeScanrec(Signal* signal);
+ void sendScanFragReq(Signal* signal, ScanRecord*, ScanFragRec*);
+ void sendScanTabConf(Signal* signal, ScanRecord*);
+ void close_scan_req(Signal*, ScanRecordPtr, bool received_req);
void close_scan_req_send_conf(Signal*, ScanRecordPtr);
void checkGcp(Signal* signal);
@@ -1557,11 +1560,11 @@ private:
void systemErrorLab(Signal* signal);
void sendSignalErrorRefuseLab(Signal* signal);
void scanTabRefLab(Signal* signal, Uint32 errCode);
- void diFcountReqLab(Signal* signal);
+ void diFcountReqLab(Signal* signal, ScanRecordPtr);
void signalErrorRefuseLab(Signal* signal);
void abort080Lab(Signal* signal);
void packKeyData000Lab(Signal* signal, BlockReference TBRef);
- void abortScanLab(Signal* signal, Uint32 errCode);
+ void abortScanLab(Signal* signal, ScanRecordPtr, Uint32 errCode);
void sendAbortedAfterTimeout(Signal* signal, int Tcheck);
void abort010Lab(Signal* signal);
void abort015Lab(Signal* signal);
@@ -1589,7 +1592,7 @@ private:
void attrinfo020Lab(Signal* signal);
void scanReleaseResourcesLab(Signal* signal);
void scanCompletedLab(Signal* signal);
- void scanFragError(Signal* signal, Uint32 errorCode);
+ void scanError(Signal* signal, ScanRecordPtr, Uint32 errorCode);
void diverify010Lab(Signal* signal);
void intstartphase2x010Lab(Signal* signal);
void intstartphase3x010Lab(Signal* signal);
@@ -1699,7 +1702,6 @@ private:
ApiConnectRecordPtr timeOutptr;
ScanRecord *scanRecord;
- ScanRecordPtr scanptr;
UintR cscanrecFileSize;
UnsafeArrayPool<ScanFragRec> c_scan_frag_pool;
diff --git a/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp b/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp
index edb51ea3c89..04506bc62eb 100644
--- a/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp
+++ b/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp
@@ -76,6 +76,39 @@
#define INTERNAL_TRIGGER_TCKEYREQ_JBA 0
+#ifdef VM_TRACE
+NdbOut &
+operator<<(NdbOut& out, Dbtc::ConnectionState state){
+ out << (int)state;
+ return out;
+}
+NdbOut &
+operator<<(NdbOut& out, Dbtc::OperationState state){
+ out << (int)state;
+ return out;
+}
+NdbOut &
+operator<<(NdbOut& out, Dbtc::AbortState state){
+ out << (int)state;
+ return out;
+}
+NdbOut &
+operator<<(NdbOut& out, Dbtc::ReturnSignal state){
+ out << (int)state;
+ return out;
+}
+NdbOut &
+operator<<(NdbOut& out, Dbtc::ScanRecord::ScanState state){
+ out << (int)state;
+ return out;
+}
+NdbOut &
+operator<<(NdbOut& out, Dbtc::ScanFragRec::ScanFragState state){
+ out << (int)state;
+ return out;
+}
+#endif
+
void
Dbtc::updateBuddyTimer(ApiConnectRecordPtr apiPtr)
{
@@ -915,7 +948,7 @@ Dbtc::handleFailedApiNode(Signal* signal,
ScanRecordPtr scanPtr;
scanPtr.i = apiConnectptr.p->apiScanRec;
ptrCheckGuard(scanPtr, cscanrecFileSize, scanRecord);
- close_scan_req(signal, scanPtr);
+ close_scan_req(signal, scanPtr, true);
TloopCount += 64;
break;
@@ -1095,138 +1128,6 @@ void Dbtc::handleApiFailState(Signal* signal, UintR TapiConnectptr)
}//if
}//Dbtc::handleApiFailState()
-/**
- * Dbtc::handleScanStop
- * This function is called when an entire scan should be stopped
- * Check state of the scan and take appropriate action.
- * The parameter TapiFailedNode indicates if the scan is stopped
- * because an API node has failed or if it has been stopped because
- * the scan has timed out.
- *
- */
-void Dbtc::handleScanStop(Signal* signal, UintR TapiFailedNode)
-{
-#if JONAS_NOT_DONE
- arrGuard(TapiFailedNode, MAX_NODES);
-
- scanptr.i = apiConnectptr.p->apiScanRec;
- ptrCheckGuard(scanptr, cscanrecFileSize, scanRecord);
-
- // If api has failed we must release all resources
- bool apiNodeHasFailed = (TapiFailedNode != 0);
-
- DEBUG("handleScanStop: scanState = "<< scanptr.p->scanState);
-
- switch (scanptr.p->scanState) {
- case ScanRecord::WAIT_SCAN_TAB_INFO:
- case ScanRecord::WAIT_AI:
- jam();
- /**
- * The scan process is still in the definition phase.
- * We will release the resources and then release the connection
- * to the failed API.
- */
- releaseScanResources(scanptr);
- if (apiNodeHasFailed) {
- jam();
- releaseApiCon(signal, apiConnectptr.i);
- }//if
- break;
-
- case ScanRecord::WAIT_FRAGMENT_COUNT:
- jam();
- if (!apiNodeHasFailed) {
- jam();
- /**
- * Time-out waiting for a local signal can only happen
- * if we have a serious problem.
- */
- systemErrorLab(signal);
- }//if
- capiConnectClosing[TapiFailedNode]++;
- apiConnectptr.p->apiFailState = ZTRUE;
- scanptr.p->apiIsClosed = true;
- break;
-
- case ScanRecord::CLOSING_SCAN:
- jam();
- /**
- * With CLOSING_SCAN it is enough to set the
- * fail state such that the connection is released at the end of the
- * closing process. The close process is already ongoing.
- * Set apiIsClosed to true to indicate that resources should be released
- * at the end of the close process.
- **/
-
- if (apiNodeHasFailed) {
- jam();
- capiConnectClosing[TapiFailedNode]++;
- apiConnectptr.p->apiFailState = ZTRUE;
- scanptr.p->apiIsClosed = true;
- }//if
- if (apiConnectptr.p->apiFailState == ZTRUE) {
- jam();
- handleApiFailState(signal, apiConnectptr.i);
- return;
- }//if
- break;
-
- case ScanRecord::SCAN_NEXT_ORDERED:
- /**
- * In the SCAN_NEXT_ORDERED state we will wait for the next natural place
- * to receive some action from the API and instead of waiting for the
- * API here we will start the abort process.
-
- * After the abort process is completed we will release the connection.
- */
- if (apiNodeHasFailed) {
- jam();
- capiConnectClosing[TapiFailedNode]++;
- apiConnectptr.p->apiFailState = ZTRUE;
- }//if
- // Release resources and send a response to API
- scanptr.p->apiIsClosed = true;
- scanCompletedLab(signal);
- break;
-
- case ScanRecord::DELIVERED:
- case ScanRecord::QUEUED_DELIVERED:
- /**
- * A response has been sent to the api but it has not responded
- */
-
- if (apiNodeHasFailed) {
- jam();
- capiConnectClosing[TapiFailedNode]++;
- apiConnectptr.p->apiFailState = ZTRUE;
- scanptr.p->apiIsClosed = true;
- } else {
- jam();
- /*
- In this case we have received a time-out caused by the application
- waiting too long to continue the scan. We will check the application
- time-out instead of the deadlock detetection time-out. If the
- application time-out hasn't fired we will simply ignore the condition.
- */
- if ((ctcTimer - getApiConTimer(apiConnectptr.i)) <= c_appl_timeout_value) {
- jam();
- return;
- }//if
- // Dont' release, wait until api responds or fails
- scanptr.p->apiIsClosed = false;
- }
- scanCompletedLab(signal);
- break;
-
- default:
- jam();
- systemErrorLab(signal);
- break;
-
- }//switch
-#endif
-}//Dbtc::handleScanStop()
-
/****************************************************************************
* T C S E I Z E R E Q
* THE APPLICATION SENDS A REQUEST TO SEIZE A CONNECT RECORD TO CARRY OUT A
@@ -1409,7 +1310,7 @@ void Dbtc::printState(Signal* signal, int place)
<< " counter = " << apiConnectptr.p->counter
<< " lqhkeyconfrec = " << apiConnectptr.p->lqhkeyconfrec
<< " lqhkeyreqrec = " << apiConnectptr.p->lqhkeyreqrec << endl;
- ndbout << "abortState = " << (int)apiConnectptr.p->abortState
+ ndbout << "abortState = " << apiConnectptr.p->abortState
<< " apiScanRec = " << apiConnectptr.p->apiScanRec
<< " returncode = " << apiConnectptr.p->returncode << endl;
ndbout << "tckeyrec = " << apiConnectptr.p->tckeyrec
@@ -6155,11 +6056,14 @@ void Dbtc::timeOutFoundLab(Signal* signal, Uint32 TapiConPtr)
tcConnectptr.i = apiConnectptr.p->firstTcConnect;
sendAbortedAfterTimeout(signal, 0);
break;
- case CS_START_SCAN:
+ case CS_START_SCAN:{
jam();
- apiConnectptr.p->returncode = ZSCANTIME_OUT_ERROR;
- handleScanStop(signal, 0);
+ ScanRecordPtr scanPtr;
+ scanPtr.i = apiConnectptr.p->apiScanRec;
+ ptrCheckGuard(scanPtr, cscanrecFileSize, scanRecord);
+ scanError(signal, scanPtr, ZSCANTIME_OUT_ERROR);
break;
+ }
case CS_WAIT_ABORT_CONF:
jam();
tcConnectptr.i = apiConnectptr.p->currentTcConnect;
@@ -6529,14 +6433,15 @@ void Dbtc::execSCAN_HBREP(Signal* signal)
c_scan_frag_pool.getPtr(scanFragptr);
switch (scanFragptr.p->scanFragState){
case ScanFragRec::LQH_ACTIVE:
+ //case ScanFragRec::LQH_ACTIVE_CLOSE:
break;
-
default:
DEBUG("execSCAN_HBREP: scanFragState="<<scanFragptr.p->scanFragState);
systemErrorLab(signal);
break;
}
+ ScanRecordPtr scanptr;
scanptr.i = scanFragptr.p->scanRec;
ptrCheckGuard(scanptr, cscanrecFileSize, scanRecord);
@@ -6567,6 +6472,7 @@ void Dbtc::execSCAN_HBREP(Signal* signal)
updateBuddyTimer(apiConnectptr);
scanFragptr.p->startFragTimer(ctcTimer);
} else {
+ ndbassert(false);
DEBUG("SCAN_HBREP when scanFragTimer was turned off");
}
}//execSCAN_HBREP()
@@ -6575,34 +6481,56 @@ void Dbtc::execSCAN_HBREP(Signal* signal)
/* Timeout has occured on a fragment which means a scan has timed out. */
/* If this is true we have an error in LQH/ACC. */
/*--------------------------------------------------------------------------*/
+static int kalle = 0;
void Dbtc::timeOutFoundFragLab(Signal* signal, UintR TscanConPtr)
{
- scanFragptr.i = TscanConPtr;
- c_scan_frag_pool.getPtr(scanFragptr);
- DEBUG("timeOutFoundFragLab: scanFragState = "<<scanFragptr.p->scanFragState);
+ ScanFragRecPtr ptr;
+ c_scan_frag_pool.getPtr(ptr, TscanConPtr);
+ DEBUG(TscanConPtr << " timeOutFoundFragLab: scanFragState = "<< ptr.p->scanFragState);
/*-------------------------------------------------------------------------*/
// The scan fragment has expired its timeout. Check its state to decide
// what to do.
/*-------------------------------------------------------------------------*/
- switch (scanFragptr.p->scanFragState) {
-
+ switch (ptr.p->scanFragState) {
case ScanFragRec::WAIT_GET_PRIMCONF:
jam();
- // Crash the system if we do not return from DIGETPRIMREQ in time.
- systemErrorLab(signal);
+ ndbrequire(false);
break;
-
- case ScanFragRec::LQH_ACTIVE:
+ case ScanFragRec::LQH_ACTIVE:{
jam();
+
/**
* The LQH expired it's timeout, try to close it
*/
- scanFragError(signal, ZSCAN_FRAG_LQH_ERROR);
- DEBUG(" LQH_ACTIVE - closing the fragment scan in node "
- << refToNode(scanFragptr.p->lqhBlockref));
- break;
+ Uint32 nodeId = refToNode(ptr.p->lqhBlockref);
+ Uint32 connectCount = getNodeInfo(nodeId).m_connectCount;
+ ScanRecordPtr scanptr;
+ scanptr.i = ptr.p->scanRec;
+ ptrCheckGuard(scanptr, cscanrecFileSize, scanRecord);
+
+ if(connectCount != ptr.p->m_connectCount){
+ jam();
+ /**
+ * The node has died
+ */
+ ndbout_c("Node %d has died", nodeId);
+ ptr.p->scanFragState = ScanFragRec::COMPLETED;
+ ScanFragList run(c_scan_frag_pool, scanptr.p->m_running_scan_frags);
+ ScanFragList comp(c_scan_frag_pool, scanptr.p->m_completed_scan_frags);
+
+ run.remove(ptr);
+ comp.add(ptr);
+ ptr.p->stopFragTimer();
+ } else {
+ kalle++;
+ if(kalle > 5)
+ ndbassert(scanptr.p->scanState != ScanRecord::CLOSING_SCAN);
+ }
+ scanError(signal, scanptr, ZSCAN_FRAG_LQH_ERROR);
+ break;
+ }
case ScanFragRec::DELIVERED:
jam();
case ScanFragRec::IDLE:
@@ -6863,14 +6791,38 @@ void Dbtc::checkScanActiveInFailedLqh(Signal* signal,
Uint32 scanPtrI,
Uint32 failedNodeId){
+ ScanRecordPtr scanptr;
for (scanptr.i = scanPtrI; scanptr.i < cscanrecFileSize; scanptr.i++) {
jam();
ptrAss(scanptr, scanRecord);
if (scanptr.p->scanState != ScanRecord::IDLE){
- checkScanFragList(signal, failedNodeId,
- scanptr.p, scanptr.p->m_running_scan_frags);
- }
+ jam();
+ ScanFragRecPtr ptr;
+ ScanFragList run(c_scan_frag_pool, scanptr.p->m_running_scan_frags);
+ ScanFragList comp(c_scan_frag_pool, scanptr.p->m_completed_scan_frags);
+ bool found = false;
+ for(run.first(ptr); !ptr.isNull(); ){
+ jam();
+ ScanFragRecPtr curr = ptr;
+ run.next(ptr);
+ if (curr.p->scanFragState == ScanFragRec::LQH_ACTIVE &&
+ refToNode(curr.p->lqhBlockref) == failedNodeId){
+ jam();
+
+ run.remove(curr);
+ comp.add(curr);
+ curr.p->scanFragState = ScanFragRec::COMPLETED;
+ curr.p->stopFragTimer();
+ found = true;
+ }
+ }
+ if(found){
+ jam();
+ scanError(signal, scanptr, ZSCAN_LQH_ERROR);
+ }
+ }
+
// Send CONTINUEB to continue later
signal->theData[0] = TcContinueB::ZCHECK_SCAN_ACTIVE_FAILED_LQH;
signal->theData[1] = scanptr.i + 1; // Check next scanptr
@@ -6886,29 +6838,7 @@ Dbtc::checkScanFragList(Signal* signal,
ScanRecord * scanP,
ScanFragList::Head & head){
- ScanFragRecPtr ptr;
- ScanFragList list(c_scan_frag_pool, head);
-
- for(list.first(ptr); !ptr.isNull(); list.next(ptr)){
- if (refToNode(ptr.p->lqhBlockref) == failedNodeId){
- switch (ptr.p->scanFragState){
- case ScanFragRec::LQH_ACTIVE:
- jam();
- apiConnectptr.i = scanptr.p->scanApiRec;
- ptrCheckGuard(apiConnectptr, capiConnectFilesize,
- apiConnectRecord);
-
- DEBUG("checkScanActiveInFailedLqh: scanFragError");
- scanFragError(signal, ZSCAN_LQH_ERROR);
-
- break;
- default:
- /* empty */
- jam();
- break;
- }
- }
- }
+ DEBUG("checkScanActiveInFailedLqh: scanFragError");
}
void Dbtc::execTAKE_OVERTCCONF(Signal* signal)
@@ -8421,6 +8351,7 @@ void Dbtc::execSCAN_TABREQ(Signal* signal)
Uint32 noOprecPerFrag = ScanTabReq::getScanBatch(reqinfo);
Uint32 scanParallel = scanConcurrency;
Uint32 errCode;
+ ScanRecordPtr scanptr;
if(noOprecPerFrag == 0){
jam();
@@ -8445,12 +8376,13 @@ void Dbtc::execSCAN_TABREQ(Signal* signal)
return;
}//if
ptrAss(apiConnectptr, apiConnectRecord);
+ ApiConnectRecord * transP = apiConnectptr.p;
- if (apiConnectptr.p->apiConnectstate != CS_CONNECTED) {
+ if (transP->apiConnectstate != CS_CONNECTED) {
jam();
// could be left over from TCKEYREQ rollback
- if (apiConnectptr.p->apiConnectstate == CS_ABORTING &&
- apiConnectptr.p->abortState == AS_IDLE) {
+ if (transP->apiConnectstate == CS_ABORTING &&
+ transP->abortState == AS_IDLE) {
jam();
} else {
jam();
@@ -8515,15 +8447,26 @@ void Dbtc::execSCAN_TABREQ(Signal* signal)
}
seizeTcConnect(signal);
- seizeCacheRecord(signal);
- seizeScanrec(signal);
- initScanrec(signal, scanParallel, noOprecPerFrag);
tcConnectptr.p->apiConnect = apiConnectptr.i;
- initScanApirec(signal, buddyPtr, transid1, transid2);
+
+ seizeCacheRecord(signal);
+ scanptr = seizeScanrec(signal);
+
+ ndbrequire(transP->apiScanRec == RNIL);
+ ndbrequire(scanptr.p->scanApiRec == RNIL);
+
+ initScanrec(scanptr, scanTabReq, scanParallel, noOprecPerFrag);
+
+ //initScanApirec(signal, buddyPtr, transid1, transid2);
+ transP->apiScanRec = scanptr.i;
+ transP->returncode = 0;
+ transP->transid[0] = transid1;
+ transP->transid[1] = transid2;
+ transP->buddyPtr = buddyPtr;
// The scan is started
- apiConnectptr.p->apiConnectstate = CS_START_SCAN;
- apiConnectptr.p->currSavePointId = currSavePointId;
+ transP->apiConnectstate = CS_START_SCAN;
+ transP->currSavePointId = currSavePointId;
/**********************************************************
* We start the timer on scanRec to be able to discover a
@@ -8546,12 +8489,14 @@ void Dbtc::execSCAN_TABREQ(Signal* signal)
SCAN_TAB_error:
jam();
+ ndbrequire(false);
ScanTabRef * ref = (ScanTabRef*)&signal->theData[0];
- ref->apiConnectPtr = apiConnectptr.p->ndbapiConnect;
+ ref->apiConnectPtr = transP->ndbapiConnect;
ref->transId1 = transid1;
ref->transId2 = transid2;
ref->errorCode = errCode;
- sendSignal(apiConnectptr.p->ndbapiBlockref, GSN_SCAN_TABREF,
+ ref->closeNeeded = 0;
+ sendSignal(transP->ndbapiBlockref, GSN_SCAN_TABREF,
signal, ScanTabRef::SignalLength, JBB);
return;
@@ -8561,20 +8506,13 @@ void Dbtc::execSCAN_TABREQ(Signal* signal)
void Dbtc::initScanApirec(Signal* signal,
Uint32 buddyPtr, UintR transid1, UintR transid2)
{
- ApiConnectRecord * apiPtr = apiConnectptr.p;
- apiPtr->apiScanRec = scanptr.i;
- apiPtr->returncode = 0;
- apiPtr->transid[0] = transid1;
- apiPtr->transid[1] = transid2;
- apiPtr->buddyPtr = buddyPtr;
-
}//Dbtc::initScanApirec()
-void Dbtc::initScanrec(Signal* signal,
+void Dbtc::initScanrec(ScanRecordPtr scanptr,
+ const ScanTabReq * scanTabReq,
UintR scanParallel,
UintR noOprecPerFrag)
{
- const ScanTabReq * const scanTabReq = (ScanTabReq *)&signal->theData[0];
const UintR reqinfo = scanTabReq->requestInfo;
ndbrequire(scanParallel < 16);
@@ -8613,6 +8551,7 @@ void Dbtc::scanTabRefLab(Signal* signal, Uint32 errCode)
ref->transId1 = apiConnectptr.p->transid[0];
ref->transId2 = apiConnectptr.p->transid[1];
ref->errorCode = errCode;
+ ref->closeNeeded = 0;
sendSignal(apiConnectptr.p->ndbapiBlockref, GSN_SCAN_TABREF,
signal, ScanTabRef::SignalLength, JBB);
}//Dbtc::scanTabRefLab()
@@ -8623,6 +8562,7 @@ void Dbtc::scanTabRefLab(Signal* signal, Uint32 errCode)
/*---------------------------------------------------------------------------*/
void Dbtc::scanAttrinfoLab(Signal* signal, UintR Tlen)
{
+ ScanRecordPtr scanptr;
scanptr.i = apiConnectptr.p->apiScanRec;
ptrCheckGuard(scanptr, cscanrecFileSize, scanRecord);
tcConnectptr.i = scanptr.p->scanTcrec;
@@ -8652,7 +8592,7 @@ void Dbtc::scanAttrinfoLab(Signal* signal, UintR Tlen)
* THIS SCAN. WE ARE READY TO START THE ACTUAL
* EXECUTION OF THE SCAN QUERY
**************************************************/
- diFcountReqLab(signal);
+ diFcountReqLab(signal, scanptr);
return;
}//if
}//if
@@ -8660,21 +8600,21 @@ void Dbtc::scanAttrinfoLab(Signal* signal, UintR Tlen)
scanAttrinfo_attrbuf_error:
jam();
- abortScanLab(signal, ZGET_ATTRBUF_ERROR);
+ abortScanLab(signal, scanptr, ZGET_ATTRBUF_ERROR);
return;
scanAttrinfo_attrbuf2_error:
jam();
- abortScanLab(signal, ZGET_ATTRBUF_ERROR);
+ abortScanLab(signal, scanptr, ZGET_ATTRBUF_ERROR);
return;
scanAttrinfo_len_error:
jam();
- abortScanLab(signal, ZLENGTH_ERROR);
+ abortScanLab(signal, scanptr, ZLENGTH_ERROR);
return;
}//Dbtc::scanAttrinfoLab()
-void Dbtc::diFcountReqLab(Signal* signal)
+void Dbtc::diFcountReqLab(Signal* signal, ScanRecordPtr scanptr)
{
/**
* Check so that the table is not being dropped
@@ -8685,7 +8625,8 @@ void Dbtc::diFcountReqLab(Signal* signal)
if (tabPtr.p->checkTable(scanptr.p->scanSchemaVersion)){
;
} else {
- abortScanLab(signal, tabPtr.p->getErrorCode(scanptr.p->scanSchemaVersion));
+ abortScanLab(signal, scanptr,
+ tabPtr.p->getErrorCode(scanptr.p->scanSchemaVersion));
return;
}
@@ -8717,6 +8658,7 @@ void Dbtc::execDI_FCOUNTCONF(Signal* signal)
ptrCheckGuard(tcConnectptr, ctcConnectFilesize, tcConnectRecord);
apiConnectptr.i = tcConnectptr.p->apiConnect;
ptrCheckGuard(apiConnectptr, capiConnectFilesize, apiConnectRecord);
+ ScanRecordPtr scanptr;
scanptr.i = apiConnectptr.p->apiScanRec;
ptrCheckGuard(scanptr, cscanrecFileSize, scanRecord);
ndbrequire(scanptr.p->scanState == ScanRecord::WAIT_FRAGMENT_COUNT);
@@ -8728,7 +8670,7 @@ void Dbtc::execDI_FCOUNTCONF(Signal* signal)
}//if
if (tfragCount == 0) {
jam();
- abortScanLab(signal, ZNO_FRAGMENT_ERROR);
+ abortScanLab(signal, scanptr, ZNO_FRAGMENT_ERROR);
return;
}//if
@@ -8741,19 +8683,22 @@ void Dbtc::execDI_FCOUNTCONF(Signal* signal)
if (tabPtr.p->checkTable(scanptr.p->scanSchemaVersion)){
;
} else {
- abortScanLab(signal, tabPtr.p->getErrorCode(scanptr.p->scanSchemaVersion));
+ abortScanLab(signal, scanptr,
+ tabPtr.p->getErrorCode(scanptr.p->scanSchemaVersion));
return;
}
if(scanptr.p->scanParallel > tfragCount){
jam();
- abortScanLab(signal, ZTOO_HIGH_CONCURRENCY_ERROR);
+ abortScanLab(signal, scanptr, ZTOO_HIGH_CONCURRENCY_ERROR);
return;
}
scanptr.p->scanParallel = tfragCount;
scanptr.p->scanNoFrag = tfragCount;
scanptr.p->scanNextFragId = 0;
+ scanptr.p->scanState = ScanRecord::RUNNING;
+
setApiConTimer(apiConnectptr.i, 0, __LINE__);
updateBuddyTimer(apiConnectptr);
@@ -8768,6 +8713,7 @@ void Dbtc::execDI_FCOUNTCONF(Signal* signal)
scanptr.p->scanTableref, scanptr.p->scanNextFragId);
#endif
+ ptr.p->lqhBlockref = 0;
ptr.p->startFragTimer(ctcTimer);
ptr.p->scanFragId = scanptr.p->scanNextFragId++;
ptr.p->scanFragState = ScanFragRec::WAIT_GET_PRIMCONF;
@@ -8792,6 +8738,7 @@ void Dbtc::execDI_FCOUNTREF(Signal* signal)
const Uint32 errCode = signal->theData[1];
apiConnectptr.i = tcConnectptr.p->apiConnect;
ptrCheckGuard(apiConnectptr, capiConnectFilesize, apiConnectRecord);
+ ScanRecordPtr scanptr;
scanptr.i = apiConnectptr.p->apiScanRec;
ptrCheckGuard(scanptr, cscanrecFileSize, scanRecord);
ndbrequire(scanptr.p->scanState == ScanRecord::WAIT_FRAGMENT_COUNT);
@@ -8801,10 +8748,10 @@ void Dbtc::execDI_FCOUNTREF(Signal* signal)
handleApiFailState(signal, apiConnectptr.i);
return;
}//if
- abortScanLab(signal, errCode);
+ abortScanLab(signal, scanptr, errCode);
}//Dbtc::execDI_FCOUNTREF()
-void Dbtc::abortScanLab(Signal* signal, Uint32 errCode)
+void Dbtc::abortScanLab(Signal* signal, ScanRecordPtr scanptr, Uint32 errCode)
{
scanTabRefLab(signal, errCode);
releaseScanResources(scanptr);
@@ -8835,6 +8782,7 @@ void Dbtc::releaseScanResources(ScanRecordPtr scanPtr)
scanPtr.p->nextScan = cfirstfreeScanrec;
scanPtr.p->scanState = ScanRecord::IDLE;
scanPtr.p->scanTcrec = RNIL;
+ scanPtr.p->scanApiRec = RNIL;
cfirstfreeScanrec = scanPtr.i;
apiConnectptr.p->apiScanRec = RNIL;
@@ -8862,7 +8810,8 @@ void Dbtc::execDIGETPRIMCONF(Signal* signal)
ndbrequire(scanFragptr.p->scanFragState == ScanFragRec::WAIT_GET_PRIMCONF);
scanFragptr.p->stopFragTimer();
-
+
+ ScanRecordPtr scanptr;
scanptr.i = scanFragptr.p->scanRec;
ptrCheckGuard(scanptr, cscanrecFileSize, scanRecord);
@@ -8876,7 +8825,12 @@ void Dbtc::execDIGETPRIMCONF(Signal* signal)
Uint32 schemaVersion = scanptr.p->scanSchemaVersion;
if(tabPtr.p->checkTable(schemaVersion) == false){
jam();
- scanFragError(signal, tabPtr.p->getErrorCode(schemaVersion));
+ ScanFragList run(c_scan_frag_pool, scanptr.p->m_running_scan_frags);
+ ScanFragList comp(c_scan_frag_pool, scanptr.p->m_completed_scan_frags);
+
+ run.remove(scanFragptr);
+ comp.add(scanFragptr);
+ scanError(signal, scanptr, tabPtr.p->getErrorCode(schemaVersion));
return;
}
}
@@ -8908,7 +8862,7 @@ void Dbtc::execDIGETPRIMCONF(Signal* signal)
Uint32 ref = calcLqhBlockRef(tnodeid);
scanFragptr.p->lqhBlockref = ref;
scanFragptr.p->m_connectCount = getNodeInfo(tnodeid).m_connectCount;
- sendScanFragReq(signal);
+ sendScanFragReq(signal, scanptr.p, scanFragptr.p);
attrbufptr.i = cachePtr.p->firstAttrbuf;
while (attrbufptr.i != RNIL) {
jam();
@@ -8943,7 +8897,18 @@ void Dbtc::execDIGETPRIMREF(Signal* signal)
const Uint32 errCode = signal->theData[2];
c_scan_frag_pool.getPtr(scanFragptr);
ndbrequire(scanFragptr.p->scanFragState == ScanFragRec::WAIT_GET_PRIMCONF);
- scanFragError(signal, errCode);
+
+ ScanRecordPtr scanptr;
+ scanptr.i = scanFragptr.p->scanRec;
+ ptrCheckGuard(scanptr, cscanrecFileSize, scanRecord);
+
+ ScanFragList run(c_scan_frag_pool, scanptr.p->m_running_scan_frags);
+ ScanFragList comp(c_scan_frag_pool, scanptr.p->m_completed_scan_frags);
+
+ run.remove(scanFragptr);
+ comp.add(scanFragptr);
+
+ scanError(signal, scanptr, errCode);
}//Dbtc::execDIGETPRIMREF()
/**
@@ -8962,6 +8927,7 @@ void Dbtc::execSCAN_FRAGREF(Signal* signal)
scanFragptr.i = ref->senderData;
c_scan_frag_pool.getPtr(scanFragptr);
+ ScanRecordPtr scanptr;
scanptr.i = scanFragptr.p->scanRec;
ptrCheckGuard(scanptr, cscanrecFileSize, scanRecord);
@@ -8981,42 +8947,64 @@ void Dbtc::execSCAN_FRAGREF(Signal* signal)
* stop fragment timer and call scanFragError to start
* close of the other fragment scans
*/
- scanFragError(signal, errCode);
+ ndbrequire(scanFragptr.p->scanFragState == ScanFragRec::LQH_ACTIVE);
+ {
+ scanFragptr.p->scanFragState = ScanFragRec::COMPLETED;
+ ScanFragList run(c_scan_frag_pool, scanptr.p->m_running_scan_frags);
+ ScanFragList comp(c_scan_frag_pool, scanptr.p->m_completed_scan_frags);
+
+ run.remove(scanFragptr);
+ comp.add(scanFragptr);
+ scanFragptr.p->stopFragTimer();
+ }
+ scanError(signal, scanptr, errCode);
}//Dbtc::execSCAN_FRAGREF()
/**
- * Dbtc::scanFragError
+ * Dbtc::scanError
*
* Called when an error occurs during
- * a scan of a fragment.
- * NOTE that one scan may consist of several fragment scans.
- *
*/
-void Dbtc::scanFragError(Signal* signal, Uint32 errorCode)
+void Dbtc::scanError(Signal* signal, ScanRecordPtr scanptr, Uint32 errorCode)
{
jam();
- scanptr.i = scanFragptr.p->scanRec;
- ptrCheckGuard(scanptr, cscanrecFileSize, scanRecord);
- DEBUG("scanFragError, errorCode = "<< errorCode
- << ", scanState = " << scanptr.p->scanState);
+ ScanRecord* scanP = scanptr.p;
+
+ DEBUG("scanError, errorCode = "<< errorCode <<
+ ", scanState = " << scanptr.p->scanState);
- scanFragptr.p->stopFragTimer();
-#if JONAS_NOT_DONE
+ if(scanP->scanState == ScanRecord::CLOSING_SCAN){
+ jam();
+ close_scan_req_send_conf(signal, scanptr);
+ return;
+ }
- apiConnectptr.i = scanptr.p->scanApiRec;
- ptrCheckGuard(apiConnectptr, capiConnectFilesize, apiConnectRecord);
+ ndbrequire(scanP->scanState == ScanRecord::RUNNING);
- // If close of the scan is not already started
- if (scanptr.p->scanState != ScanRecord::CLOSING_SCAN) {
+ apiConnectptr.i = scanP->scanApiRec;
+ ptrCheckGuard(apiConnectptr, capiConnectFilesize, apiConnectRecord);
+ ndbrequire(apiConnectptr.p->apiScanRec == scanptr.i);
+
+ /**
+ * Close scan wo/ having received an order to do so
+ */
+ close_scan_req(signal, scanptr, false);
+
+ const bool apiFail = (apiConnectptr.p->apiFailState == ZTRUE);
+ if(apiFail){
jam();
- apiConnectptr.p->returncode = errorCode;
-
- scanCompletedLab(signal);
return;
- }//if
-#endif
-}//Dbtc::scanFragError()
-
+ }
+
+ ScanTabRef * ref = (ScanTabRef*)&signal->theData[0];
+ ref->apiConnectPtr = apiConnectptr.p->ndbapiConnect;
+ ref->transId1 = apiConnectptr.p->transid[0];
+ ref->transId2 = apiConnectptr.p->transid[1];
+ ref->errorCode = errorCode;
+ ref->closeNeeded = 1;
+ sendSignal(apiConnectptr.p->ndbapiBlockref, GSN_SCAN_TABREF,
+ signal, ScanTabRef::SignalLength, JBB);
+}//Dbtc::scanError()
/************************************************************
* execSCAN_FRAGCONF
@@ -9034,6 +9022,7 @@ void Dbtc::execSCAN_FRAGCONF(Signal* signal)
scanFragptr.i = conf->senderData;
c_scan_frag_pool.getPtr(scanFragptr);
+ ScanRecordPtr scanptr;
scanptr.i = scanFragptr.p->scanRec;
ptrCheckGuard(scanptr, cscanrecFileSize, scanRecord);
@@ -9051,32 +9040,34 @@ void Dbtc::execSCAN_FRAGCONF(Signal* signal)
ndbrequire(scanFragptr.p->scanFragState == ScanFragRec::LQH_ACTIVE);
const Uint32 status = conf->fragmentCompleted;
- scanFragptr.p->stopFragTimer();
-
+
+ DEBUG(apiConnectptr.i << " " << scanFragptr.i <<
+ " execSCAN_FRAGCONF() status: " << status
+ << " ops: " << noCompletedOps << " from: " << refToNode(signal->getSendersBlockRef()));
+
if(scanptr.p->scanState == ScanRecord::CLOSING_SCAN){
+ jam();
if(status == ZFALSE){
/**
- * Dont deliver to api, but instead close in LQH
- * Dont need to mess with queues
+ * We have started closing = we sent a close -> ignore this
*/
- ndbout_c("running -> running(close)");
-
- jam();
- ScanFragNextReq * nextReq = (ScanFragNextReq*)&signal->theData[0];
- nextReq->senderData = scanFragptr.i;
- nextReq->closeFlag = ZTRUE;
- nextReq->transId1 = apiConnectptr.p->transid[0];
- nextReq->transId2 = apiConnectptr.p->transid[1];
- sendSignal(scanFragptr.p->lqhBlockref, GSN_SCAN_NEXTREQ, signal,
- ScanFragNextReq::SignalLength, JBB);
+ DEBUG(apiConnectptr.i << " " << scanFragptr.i <<
+ " Received SCANFRAG_CONF wo/ close when in "
+ " CLOSING_SCAN:" << status << " " << noCompletedOps);
return;
} else {
jam();
+ DEBUG(apiConnectptr.i << " " << scanFragptr.i
+ << " Received SCANFRAG_CONF w/ close when in "
+ " CLOSING_SCAN:" << status << " " << noCompletedOps);
+
ScanFragList run(c_scan_frag_pool, scanptr.p->m_running_scan_frags);
ScanFragList comp(c_scan_frag_pool, scanptr.p->m_completed_scan_frags);
run.remove(scanFragptr);
comp.add(scanFragptr);
+ scanFragptr.p->stopFragTimer();
+ scanFragptr.p->scanFragState = ScanFragRec::COMPLETED;
}
close_scan_req_send_conf(signal, scanptr);
return;
@@ -9126,7 +9117,7 @@ void Dbtc::execSCAN_FRAGCONF(Signal* signal)
if(scanptr.p->m_queued_count > /** Min */ 0){
jam();
- sendScanTabConf(signal);
+ sendScanTabConf(signal, scanptr.p);
}
}//Dbtc::execSCAN_FRAGCONF()
@@ -9164,6 +9155,7 @@ void Dbtc::execSCAN_NEXTREQ(Signal* signal)
ref->transId1 = transid1;
ref->transId2 = transid2;
ref->errorCode = ZSTATE_ERROR;
+ ref->closeNeeded = 0;
sendSignal(signal->senderBlockRef(), GSN_SCAN_TABREF,
signal, ScanTabRef::SignalLength, JBB);
DEBUG("Wrong transid");
@@ -9188,6 +9180,7 @@ void Dbtc::execSCAN_NEXTREQ(Signal* signal)
}
DEBUG("scanTabRefLab: ZSTATE_ERROR");
DEBUG(" apiConnectstate="<<apiConnectptr.p->apiConnectstate);
+ ndbrequire(false); //B2 indication of strange things going on
scanTabRefLab(signal, ZSTATE_ERROR);
return;
}//if
@@ -9197,6 +9190,7 @@ void Dbtc::execSCAN_NEXTREQ(Signal* signal)
********************************************************/
// Stop the timer that is used to check for timeout in the API
setApiConTimer(apiConnectptr.i, 0, __LINE__);
+ ScanRecordPtr scanptr;
scanptr.i = apiConnectptr.p->apiScanRec;
ptrCheckGuard(scanptr, cscanrecFileSize, scanRecord);
ScanRecord* scanP = scanptr.p;
@@ -9209,10 +9203,21 @@ void Dbtc::execSCAN_NEXTREQ(Signal* signal)
* APPLICATION IS CLOSING THE SCAN.
**********************************************************************/
ndbrequire(len == 0);
- close_scan_req(signal, scanptr);
+ close_scan_req(signal, scanptr, true);
return;
}//if
+ if (scanptr.p->scanState == ScanRecord::CLOSING_SCAN){
+ jam();
+ /**
+ * The scan is closing (typically due to error)
+ * but the API hasn't understood it yet
+ *
+ * Wait for API close request
+ */
+ return;
+ }
+
// Copy op ptrs so I dont overwrite them when sending...
memcpy(signal->getDataPtrSend()+25, signal->getDataPtr()+4, 4 * len);
@@ -9243,26 +9248,25 @@ void Dbtc::execSCAN_NEXTREQ(Signal* signal)
}//Dbtc::execSCAN_NEXTREQ()
void
-Dbtc::close_scan_req(Signal* signal, ScanRecordPtr scanPtr){
+Dbtc::close_scan_req(Signal* signal, ScanRecordPtr scanPtr, bool req_received){
#ifdef VM_TRACE
ndbout_c("%d close_scan_req", apiConnectptr.i);
#endif
ScanRecord* scanP = scanPtr.p;
scanPtr.p->scanState = ScanRecord::CLOSING_SCAN;
+ scanPtr.p->m_close_scan_req = req_received;
/**
- * Queue : Action
- * ========== : =================
- * completed : -
- * running : -
- * delivered : close -> LQH
- * queued w/ : close -> LQH
- * queued wo/ : move to completed
+ * Queue : Action
+ * ============= : =================
+ * completed : -
+ * running : close -> LQH
+ * delivered w/ : close -> LQH
+ * delivered wo/ : move to completed
+ * queued w/ : close -> LQH
+ * queued wo/ : move to completed
*/
- /**
- * All delivered should to be closed
- */
ScanFragNextReq * nextReq = (ScanFragNextReq*)&signal->theData[0];
nextReq->closeFlag = ZTRUE;
nextReq->transId1 = apiConnectptr.p->transid[0];
@@ -9273,6 +9277,28 @@ Dbtc::close_scan_req(Signal* signal, ScanRecordPtr scanPtr){
ScanFragList running(c_scan_frag_pool, scanP->m_running_scan_frags);
ScanFragList completed(c_scan_frag_pool, scanP->m_completed_scan_frags);
ScanFragList delivered(c_scan_frag_pool, scanP->m_delivered_scan_frags);
+ ScanFragList queued(c_scan_frag_pool, scanP->m_queued_scan_frags);
+
+ // Close running
+ for(running.first(ptr); !ptr.isNull(); ){
+ ScanFragRecPtr curr = ptr; // Remove while iterating...
+ running.next(ptr);
+
+ if(curr.p->scanFragState == ScanFragRec::WAIT_GET_PRIMCONF){
+ jam();
+ continue;
+ }
+ ndbrequire(curr.p->scanFragState == ScanFragRec::LQH_ACTIVE);
+
+ curr.p->startFragTimer(ctcTimer);
+ curr.p->scanFragState = ScanFragRec::LQH_ACTIVE;
+ nextReq->senderData = curr.i;
+ sendSignal(curr.p->lqhBlockref, GSN_SCAN_NEXTREQ, signal,
+ ScanFragNextReq::SignalLength, JBB);
+ ndbout_c("%d running -> closing", curr.i);
+ }
+
+ // Close delivered
for(delivered.first(ptr); !ptr.isNull(); ){
jam();
ScanFragRecPtr curr = ptr; // Remove while iterating...
@@ -9290,20 +9316,19 @@ Dbtc::close_scan_req(Signal* signal, ScanRecordPtr scanPtr){
sendSignal(curr.p->lqhBlockref, GSN_SCAN_NEXTREQ, signal,
ScanFragNextReq::SignalLength, JBB);
- ndbout_c("delivered -> running");
+ ndbout_c("%d delivered -> closing (%d)", curr.i, curr.p->m_ops);
} else {
jam();
completed.add(curr);
curr.p->scanFragState = ScanFragRec::COMPLETED;
curr.p->stopFragTimer();
- ndbout_c("delivered -> completed");
+ ndbout_c("%d delivered -> completed", curr.i);
}
}//for
/**
* All queued with data should be closed
*/
- ScanFragList queued(c_scan_frag_pool, scanP->m_queued_scan_frags);
for(queued.first(ptr); !ptr.isNull(); ){
jam();
ndbrequire(ptr.p->scanFragState == ScanFragRec::QUEUED_FOR_DELIVERY);
@@ -9322,32 +9347,59 @@ Dbtc::close_scan_req(Signal* signal, ScanRecordPtr scanPtr){
sendSignal(curr.p->lqhBlockref, GSN_SCAN_NEXTREQ, signal,
ScanFragNextReq::SignalLength, JBB);
- ndbout_c("queued -> running");
+ ndbout_c("%d queued -> closing", curr.i);
} else {
jam();
completed.add(curr);
curr.p->scanFragState = ScanFragRec::COMPLETED;
curr.p->stopFragTimer();
- ndbout_c("queued -> completed");
+ ndbout_c("%d queued -> completed", curr.i);
}
}
}
- close_scan_req_send_conf(signal, scanptr);
+ close_scan_req_send_conf(signal, scanPtr);
}
void
Dbtc::close_scan_req_send_conf(Signal* signal, ScanRecordPtr scanPtr){
jam();
+
ndbrequire(scanPtr.p->m_queued_scan_frags.isEmpty());
ndbrequire(scanPtr.p->m_delivered_scan_frags.isEmpty());
+ //ndbrequire(scanPtr.p->m_running_scan_frags.isEmpty());
+
+#if 1
+ {
+ ScanFragList comp(c_scan_frag_pool, scanPtr.p->m_completed_scan_frags);
+ ScanFragRecPtr ptr;
+ for(comp.first(ptr); !ptr.isNull(); comp.next(ptr)){
+ ndbrequire(ptr.p->scanFragTimer == 0);
+ ndbrequire(ptr.p->scanFragState == ScanFragRec::COMPLETED);
+ }
+ }
+#endif
+
if(!scanPtr.p->m_running_scan_frags.isEmpty()){
jam();
+
+ ndbout_c("%d close_scan_req_send_conf: not ready", apiConnectptr.i);
return;
}
-
+
const bool apiFail = (apiConnectptr.p->apiFailState == ZTRUE);
+ if(!scanPtr.p->m_close_scan_req){
+ jam();
+ /**
+ * The API hasn't order closing yet
+ */
+ ndbout_c("%d close_scan_req_send_conf: api not ready", apiConnectptr.i);
+ return;
+ }
+
+ ndbout_c("%d close_scan_req_send_conf: ready", apiConnectptr.i);
+
if(!apiFail){
jam();
Uint32 ref = apiConnectptr.p->ndbapiBlockref;
@@ -9370,53 +9422,58 @@ Dbtc::close_scan_req_send_conf(Signal* signal, ScanRecordPtr scanPtr){
}
}
-void Dbtc::seizeScanrec(Signal* signal) {
+Dbtc::ScanRecordPtr
+Dbtc::seizeScanrec(Signal* signal) {
+ ScanRecordPtr scanptr;
scanptr.i = cfirstfreeScanrec;
ptrCheckGuard(scanptr, cscanrecFileSize, scanRecord);
cfirstfreeScanrec = scanptr.p->nextScan;
scanptr.p->nextScan = RNIL;
ndbrequire(scanptr.p->scanState == ScanRecord::IDLE);
+ return scanptr;
}//Dbtc::seizeScanrec()
-void Dbtc::sendScanFragReq(Signal* signal) {
+void Dbtc::sendScanFragReq(Signal* signal,
+ ScanRecord* scanP,
+ ScanFragRec* scanFragP){
Uint32 requestInfo = 0;
- ScanFragReq::setConcurrency(requestInfo, scanFragptr.p->scanFragConcurrency);
- ScanFragReq::setLockMode(requestInfo, scanptr.p->scanLockMode);
- ScanFragReq::setHoldLockFlag(requestInfo, scanptr.p->scanLockHold);
- if(scanptr.p->scanLockMode == 1){ // Not read -> keyinfo
+ ScanFragReq::setConcurrency(requestInfo, scanFragP->scanFragConcurrency);
+ ScanFragReq::setLockMode(requestInfo, scanP->scanLockMode);
+ ScanFragReq::setHoldLockFlag(requestInfo, scanP->scanLockHold);
+ if(scanP->scanLockMode == 1){ // Not read -> keyinfo
jam();
ScanFragReq::setKeyinfoFlag(requestInfo, 1);
}
- ScanFragReq::setReadCommittedFlag(requestInfo, scanptr.p->readCommitted);
- ScanFragReq::setRangeScanFlag(requestInfo, scanptr.p->rangeScan);
- ScanFragReq::setAttrLen(requestInfo, scanptr.p->scanAiLength);
+ ScanFragReq::setReadCommittedFlag(requestInfo, scanP->readCommitted);
+ ScanFragReq::setRangeScanFlag(requestInfo, scanP->rangeScan);
+ ScanFragReq::setAttrLen(requestInfo, scanP->scanAiLength);
ScanFragReq::setScanPrio(requestInfo, 1);
- apiConnectptr.i = scanptr.p->scanApiRec;
+ apiConnectptr.i = scanP->scanApiRec;
ptrCheckGuard(apiConnectptr, capiConnectFilesize, apiConnectRecord);
ScanFragReq * const req = (ScanFragReq *)&signal->theData[0];
req->senderData = scanFragptr.i;
req->resultRef = apiConnectptr.p->ndbapiBlockref;
req->requestInfo = requestInfo;
req->savePointId = apiConnectptr.p->currSavePointId;
- req->tableId = scanptr.p->scanTableref;
- req->fragmentNo = scanFragptr.p->scanFragId;
- req->schemaVersion = scanptr.p->scanSchemaVersion;
+ req->tableId = scanP->scanTableref;
+ req->fragmentNo = scanFragP->scanFragId;
+ req->schemaVersion = scanP->scanSchemaVersion;
req->transId1 = apiConnectptr.p->transid[0];
req->transId2 = apiConnectptr.p->transid[1];
for(int i = 0; i<16; i++){
- req->clientOpPtr[i] = scanFragptr.p->m_apiPtr;
+ req->clientOpPtr[i] = scanFragP->m_apiPtr;
}
- sendSignal(scanFragptr.p->lqhBlockref, GSN_SCAN_FRAGREQ, signal, 25, JBB);
+ sendSignal(scanFragP->lqhBlockref, GSN_SCAN_FRAGREQ, signal, 25, JBB);
updateBuddyTimer(apiConnectptr);
- scanFragptr.p->startFragTimer(ctcTimer);
+ scanFragP->startFragTimer(ctcTimer);
}//Dbtc::sendScanFragReq()
-void Dbtc::sendScanTabConf(Signal* signal) {
+void Dbtc::sendScanTabConf(Signal* signal, ScanRecord * scanP) {
jam();
Uint32* ops = signal->getDataPtrSend()+4;
- Uint32 op_count = scanptr.p->m_queued_count;
+ Uint32 op_count = scanP->m_queued_count;
if(4 + 3 * op_count > 25){
jam();
ops += 21;
@@ -9428,7 +9485,6 @@ void Dbtc::sendScanTabConf(Signal* signal) {
conf->transId1 = apiConnectptr.p->transid[0];
conf->transId2 = apiConnectptr.p->transid[1];
ScanFragRecPtr ptr;
- ScanRecord* scanP = scanptr.p;
ScanFragList queued(c_scan_frag_pool, scanP->m_queued_scan_frags);
ScanFragList completed(c_scan_frag_pool, scanP->m_completed_scan_frags);
ScanFragList delivered(c_scan_frag_pool, scanP->m_delivered_scan_frags);
@@ -9466,7 +9522,7 @@ void Dbtc::sendScanTabConf(Signal* signal) {
sendSignal(apiConnectptr.p->ndbapiBlockref, GSN_SCAN_TABCONF, signal,
ScanTabConf::SignalLength + 3 * op_count, JBB);
}
- scanptr.p->m_queued_count = 0;
+ scanP->m_queued_count = 0;
}//Dbtc::sendScanTabConf()
@@ -9715,12 +9771,14 @@ void Dbtc::initialiseRecordsLab(Signal* signal, UintR Tdata0,
/* ========================================================================= */
void Dbtc::initialiseScanrec(Signal* signal)
{
+ ScanRecordPtr scanptr;
ndbrequire(cscanrecFileSize > 0);
for (scanptr.i = 0; scanptr.i < cscanrecFileSize; scanptr.i++) {
jam();
ptrAss(scanptr, scanRecord);
new (scanptr.p) ScanRecord();
scanptr.p->scanState = ScanRecord::IDLE;
+ scanptr.p->scanApiRec = RNIL;
scanptr.p->nextScan = scanptr.i + 1;
}//for
scanptr.i = cscanrecFileSize - 1;
@@ -11496,7 +11554,8 @@ void Dbtc::readIndexTable(Signal* signal,
Uint32 transId1 = indexOp->tcIndxReq->transId1;
Uint32 transId2 = indexOp->tcIndxReq->transId2;
- const Uint8 opType = TcKeyReq::getOperationType(tcKeyRequestInfo);
+ const Operation_t opType =
+ (Operation_t)TcKeyReq::getOperationType(tcKeyRequestInfo);
// Find index table
if ((indexData = c_theIndexes.getPtr(indexOp->tcIndxReq->indexId)) == NULL) {
diff --git a/ndb/src/kernel/blocks/dbtup/DbtupBuffer.cpp b/ndb/src/kernel/blocks/dbtup/DbtupBuffer.cpp
index cd5057d8a62..f7d55d0acc9 100644
--- a/ndb/src/kernel/blocks/dbtup/DbtupBuffer.cpp
+++ b/ndb/src/kernel/blocks/dbtup/DbtupBuffer.cpp
@@ -54,6 +54,9 @@ void Dbtup::execSEND_PACKED(Signal* signal)
void Dbtup::bufferTRANSID_AI(Signal* signal, BlockReference aRef,
Uint32 Tlen)
{
+ if(Tlen == 3)
+ return;
+
Uint32 hostId = refToNode(aRef);
Uint32 Theader = ((refToBlock(aRef) << 16)+(Tlen-3));
diff --git a/ndb/src/ndbapi/NdbConnectionScan.cpp b/ndb/src/ndbapi/NdbConnectionScan.cpp
index ea45f2b5a00..d405dedc09f 100644
--- a/ndb/src/ndbapi/NdbConnectionScan.cpp
+++ b/ndb/src/ndbapi/NdbConnectionScan.cpp
@@ -61,7 +61,14 @@ NdbConnection::receiveSCAN_TABREF(NdbApiSignal* aSignal){
const ScanTabRef * ref = CAST_CONSTPTR(ScanTabRef, aSignal->getDataPtr());
if(checkState_TransId(&ref->transId1)){
- theScanningOp->execCLOSE_SCAN_REP(ref->errorCode);
+ theScanningOp->theError.code = ref->errorCode;
+ if(!ref->closeNeeded){
+ theScanningOp->execCLOSE_SCAN_REP();
+ return 0;
+ }
+ assert(theScanningOp->m_sent_receivers_count);
+ theScanningOp->m_sent_receivers_count--;
+ theScanningOp->m_conf_receivers_count++;
return 0;
}
return -1;
@@ -88,11 +95,10 @@ NdbConnection::receiveSCAN_TABCONF(NdbApiSignal* aSignal,
if(checkState_TransId(&conf->transId1)){
if (conf->requestInfo == ScanTabConf::EndOfData) {
- theScanningOp->execCLOSE_SCAN_REP(0);
+ theScanningOp->execCLOSE_SCAN_REP();
return 0;
}
- int noComp = -1;
for(Uint32 i = 0; i<len; i += 3){
Uint32 ptrI = * ops++;
Uint32 tcPtrI = * ops++;
@@ -108,15 +114,13 @@ NdbConnection::receiveSCAN_TABCONF(NdbApiSignal* aSignal,
/**
*
*/
- noComp++;
theScanningOp->receiver_delivered(tOp);
} else if(info == ScanTabConf::EndOfData){
- noComp++;
theScanningOp->receiver_completed(tOp);
}
}
}
- return noComp;
+ return 0;
}
return -1;
diff --git a/ndb/src/ndbapi/NdbScanOperation.cpp b/ndb/src/ndbapi/NdbScanOperation.cpp
index 7cbf35ab4fd..2f0bd82044c 100644
--- a/ndb/src/ndbapi/NdbScanOperation.cpp
+++ b/ndb/src/ndbapi/NdbScanOperation.cpp
@@ -124,7 +124,7 @@ NdbScanOperation::init(NdbTableImpl* tab, NdbConnection* myConnection)
theTotalBoundAI_Len = 0;
theBoundATTRINFO = NULL;
-
+
return 0;
}
@@ -135,6 +135,8 @@ NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
m_ordered = 0;
Uint32 fragCount = m_currentTable->m_fragmentCount;
+ ndbout_c("batch: %d parallell: %d fragCount: %d",
+ batch, parallell, fragCount);
if(batch + parallell == 0){ // Max speed
batch = 16;
@@ -153,6 +155,9 @@ NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
parallell = fragCount;
else if(parallell == 0)
parallell = fragCount;
+
+ ndbout_c("batch: %d parallell: %d fragCount: %d",
+ batch, parallell, fragCount);
assert(parallell > 0);
@@ -486,6 +491,11 @@ int NdbScanOperation::nextResult(bool fetchAllowed)
last = m_api_receivers_count;
do {
+ if(theError.code){
+ setErrorCode(theError.code);
+ return -1;
+ }
+
Uint32 cnt = m_conf_receivers_count;
Uint32 sent = m_sent_receivers_count;
@@ -502,12 +512,17 @@ int NdbScanOperation::nextResult(bool fetchAllowed)
*/
theNdb->theWaiter.m_node = nodeId;
theNdb->theWaiter.m_state = WAIT_SCAN;
+ ndbout_c("%d : api: %d conf: %d sent: %d",
+ __LINE__,
+ m_api_receivers_count,
+ m_conf_receivers_count,
+ m_sent_receivers_count);
int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
if (return_code == 0 && seq == tp->getNodeSequence(nodeId)) {
continue;
} else {
idx = last;
- retVal = -1; //return_code;
+ retVal = -2; //return_code;
}
} else if(retVal == 2){
/**
@@ -516,6 +531,11 @@ int NdbScanOperation::nextResult(bool fetchAllowed)
if(send_next_scan(0, true) == 0){ // Close scan
theNdb->theWaiter.m_node = nodeId;
theNdb->theWaiter.m_state = WAIT_SCAN;
+ ndbout_c("%d : api: %d conf: %d sent: %d",
+ __LINE__,
+ m_api_receivers_count,
+ m_conf_receivers_count,
+ m_sent_receivers_count);
int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
if (return_code == 0 && seq == tp->getNodeSequence(nodeId)) {
return 1;
@@ -633,6 +653,12 @@ NdbScanOperation::doSend(int ProcessorId)
void NdbScanOperation::closeScan()
{
+ ndbout_c("closeScan %d : api: %d conf: %d sent: %d",
+ __LINE__,
+ m_api_receivers_count,
+ m_conf_receivers_count,
+ m_sent_receivers_count);
+
do {
TransporterFacade* tp = TransporterFacade::instance();
Guard guard(tp->theMutexPtr);
@@ -651,6 +677,11 @@ void NdbScanOperation::closeScan()
while(m_sent_receivers_count){
theNdb->theWaiter.m_node = nodeId;
theNdb->theWaiter.m_state = WAIT_SCAN;
+ ndbout_c("%d : api: %d conf: %d sent: %d",
+ __LINE__,
+ m_api_receivers_count,
+ m_conf_receivers_count,
+ m_sent_receivers_count);
int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
switch(return_code){
case 0:
@@ -679,6 +710,11 @@ void NdbScanOperation::closeScan()
do {
theNdb->theWaiter.m_node = nodeId;
theNdb->theWaiter.m_state = WAIT_SCAN;
+ ndbout_c("%d : api: %d conf: %d sent: %d",
+ __LINE__,
+ m_api_receivers_count,
+ m_conf_receivers_count,
+ m_sent_receivers_count);
int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
switch(return_code){
case 0:
@@ -701,22 +737,7 @@ void NdbScanOperation::closeScan()
}
void
-NdbScanOperation::execCLOSE_SCAN_REP(Uint32 errCode){
- /**
- * We will receive no further signals from this scan
- */
- if(!errCode){
- /**
- * Normal termination
- */
- theNdbCon->theCommitStatus = NdbConnection::Committed;
- theNdbCon->theCompletionStatus = NdbConnection::CompletedSuccess;
- } else {
- /**
- * Something is fishy
- */
- abort();
- }
+NdbScanOperation::execCLOSE_SCAN_REP(){
m_api_receivers_count = 0;
m_conf_receivers_count = 0;
m_sent_receivers_count = 0;
@@ -1206,7 +1227,7 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){
Uint32 nodeId = theNdbCon->theDBnode;
if(seq == tp->getNodeSequence(nodeId) && !send_next_scan_ordered(u_idx)){
Uint32 tmp = m_sent_receivers_count;
- while(m_sent_receivers_count > 0){
+ while(m_sent_receivers_count > 0 && !theError.code){
theNdb->theWaiter.m_node = nodeId;
theNdb->theWaiter.m_state = WAIT_SCAN;
int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
@@ -1223,12 +1244,16 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){
memcpy(arr, m_conf_receivers, u_last * sizeof(char*));
if(DEBUG_NEXT_RESULT) ndbout_c("sent: %d recv: %d", tmp, u_last);
+ if(theError.code){
+ setErrorCode(theError.code);
+ return -1;
+ }
}
} else {
return 2;
}
}
-
+
if(DEBUG_NEXT_RESULT) ndbout_c("u=[%d %d] s=[%d %d]",
u_idx, u_last,
s_idx, s_last);
@@ -1279,9 +1304,12 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){
Guard guard(tp->theMutexPtr);
Uint32 seq = theNdbCon->theNodeSequence;
Uint32 nodeId = theNdbCon->theDBnode;
- if(seq == tp->getNodeSequence(nodeId) && send_next_scan(0, true) == 0){
+ if(seq == tp->getNodeSequence(nodeId) &&
+ send_next_scan(0, true) == 0 &&
+ theError.code == 0){
return 1;
}
+ setErrorCode(theError.code);
return -1;
}
diff --git a/ndb/src/ndbapi/Ndbif.cpp b/ndb/src/ndbapi/Ndbif.cpp
index f7d537dafa5..92723431860 100644
--- a/ndb/src/ndbapi/Ndbif.cpp
+++ b/ndb/src/ndbapi/Ndbif.cpp
@@ -705,23 +705,25 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
{
tFirstDataPtr = int2void(tFirstData);
if (tFirstDataPtr == 0) goto InvalidSignal;
+
+ tCon = void2con(tFirstDataPtr);
+
+ assert(tFirstDataPtr != 0 &&
+ void2con(tFirstDataPtr)->checkMagicNumber() == 0);
- if (tWaitState == WAIT_SCAN){
- tCon = void2con(tFirstDataPtr);
- if (tCon->checkMagicNumber() == 0){
- tReturnCode = tCon->receiveSCAN_TABREF(aSignal);
- if (tReturnCode != -1){
- theWaiter.m_state = NO_WAIT;
- }
- break;
+ if (tCon->checkMagicNumber() == 0){
+ tReturnCode = tCon->receiveSCAN_TABREF(aSignal);
+ if (tReturnCode != -1){
+ theWaiter.m_state = NO_WAIT;
}
+ break;
}
goto InvalidSignal;
- }
+ }
case GSN_SCAN_TABINFO:
- {
- goto InvalidSignal;
- }
+ {
+ goto InvalidSignal;
+ }
case GSN_KEYINFO20: {
tFirstDataPtr = int2void(tFirstData);
if (tFirstDataPtr == 0) goto InvalidSignal;