summaryrefslogtreecommitdiff
path: root/ndb/src/kernel/blocks
diff options
context:
space:
mode:
authorunknown <joreland@mysql.com>2005-04-14 13:43:07 +0200
committerunknown <joreland@mysql.com>2005-04-14 13:43:07 +0200
commit5a1af9ed8cf04343423da8a3189e3c3414ba1c3d (patch)
treee6122c99ce2984db7d84af3d8e2040c7533b1afd /ndb/src/kernel/blocks
parent06c001ecaa5abdee84ebe36d5db84e5062ead688 (diff)
downloadmariadb-git-5a1af9ed8cf04343423da8a3189e3c3414ba1c3d.tar.gz
BUG#9891 - ndb lcp
Crash if ACC_CONTOPREQ was sent while ACC_LCPCONF was in job buffer if ACC_LCPCONF would have arrived eariler (before TUP_LCPSTARTED) operations could lockup. But would be restarted on next LCP -- LQH 1) Better check for LCP started that will also return true if ACC or TUP already has completed 2) Remove incorrect if statement that prevented operations to be started if ACC has completed -- ACC Make sure all ACC_CONTOPCONF are sent before releasing lcp record i.e. use noOfLcpConf == 4 (2 ACC_LCPCONF + 2 ACC_CONTOPCONF) Check for == 4 also when sending ACC_CONTOPCONF ndb/src/kernel/blocks/dbacc/DbaccMain.cpp: Make sure all ACC_CONTOPCONF are sent before releasing lcp record i.e. use noOfLcpConf == 4 (2 ACC_LCPCONF + 2 ACC_CONTOPCONF) Check for == 4 also when sending ACC_CONTOPCONF ndb/src/kernel/blocks/dblqh/Dblqh.hpp: Remove LCP_STARTED state ndb/src/kernel/blocks/dblqh/DblqhMain.cpp: 1) Better check for LCP started that will also return true if ACC or TUP already has completed 2) Remove incorrect if statement that prevented operations to be started if ACC has completed
Diffstat (limited to 'ndb/src/kernel/blocks')
-rw-r--r--ndb/src/kernel/blocks/dbacc/DbaccMain.cpp20
-rw-r--r--ndb/src/kernel/blocks/dblqh/Dblqh.hpp3
-rw-r--r--ndb/src/kernel/blocks/dblqh/DblqhMain.cpp66
3 files changed, 44 insertions, 45 deletions
diff --git a/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp b/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp
index cdb9091da42..d566639489c 100644
--- a/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp
+++ b/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp
@@ -8486,7 +8486,7 @@ void Dbacc::checkSendLcpConfLab(Signal* signal)
break;
}//switch
lcpConnectptr.p->noOfLcpConf++;
- ndbrequire(lcpConnectptr.p->noOfLcpConf <= 2);
+ ndbrequire(lcpConnectptr.p->noOfLcpConf <= 4);
fragrecptr.p->fragState = ACTIVEFRAG;
rlpPageptr.i = fragrecptr.p->zeroPagePtr;
ptrCheckGuard(rlpPageptr, cpagesize, page8);
@@ -8504,7 +8504,7 @@ void Dbacc::checkSendLcpConfLab(Signal* signal)
}//for
signal->theData[0] = fragrecptr.p->lcpLqhPtr;
sendSignal(lcpConnectptr.p->lcpUserblockref, GSN_ACC_LCPCONF, signal, 1, JBB);
- if (lcpConnectptr.p->noOfLcpConf == 2) {
+ if (lcpConnectptr.p->noOfLcpConf == 4) {
jam();
releaseLcpConnectRec(signal);
rootfragrecptr.i = fragrecptr.p->myroot;
@@ -8535,6 +8535,13 @@ void Dbacc::execACC_CONTOPREQ(Signal* signal)
/* LOCAL FRAG ID */
tresult = 0;
ptrCheckGuard(lcpConnectptr, clcpConnectsize, lcpConnectrec);
+ if(ERROR_INSERTED(3002) && lcpConnectptr.p->noOfLcpConf < 2)
+ {
+ sendSignalWithDelay(cownBlockref, GSN_ACC_CONTOPREQ, signal, 300,
+ signal->getLength());
+ return;
+ }
+
ndbrequire(lcpConnectptr.p->lcpstate == LCP_ACTIVE);
rootfragrecptr.i = lcpConnectptr.p->rootrecptr;
ptrCheckGuard(rootfragrecptr, crootfragmentsize, rootfragmentrec);
@@ -8568,6 +8575,15 @@ void Dbacc::execACC_CONTOPREQ(Signal* signal)
}//while
signal->theData[0] = fragrecptr.p->lcpLqhPtr;
sendSignal(lcpConnectptr.p->lcpUserblockref, GSN_ACC_CONTOPCONF, signal, 1, JBA);
+
+ lcpConnectptr.p->noOfLcpConf++;
+ if (lcpConnectptr.p->noOfLcpConf == 4) {
+ jam();
+ releaseLcpConnectRec(signal);
+ rootfragrecptr.i = fragrecptr.p->myroot;
+ ptrCheckGuard(rootfragrecptr, crootfragmentsize, rootfragmentrec);
+ rootfragrecptr.p->rootState = ACTIVEROOT;
+ }//if
return; /* ALL QUEUED OPERATION ARE RESTARTED IF NEEDED. */
}//Dbacc::execACC_CONTOPREQ()
diff --git a/ndb/src/kernel/blocks/dblqh/Dblqh.hpp b/ndb/src/kernel/blocks/dblqh/Dblqh.hpp
index 0c63cb5fe17..19e055a3011 100644
--- a/ndb/src/kernel/blocks/dblqh/Dblqh.hpp
+++ b/ndb/src/kernel/blocks/dblqh/Dblqh.hpp
@@ -968,7 +968,6 @@ public:
enum LcpState {
LCP_IDLE = 0,
- LCP_STARTED = 1,
LCP_COMPLETED = 2,
LCP_WAIT_FRAGID = 3,
LCP_WAIT_TUP_PREPLCP = 4,
@@ -2266,7 +2265,7 @@ private:
void sendCopyActiveConf(Signal* signal,Uint32 tableId);
void checkLcpCompleted(Signal* signal);
void checkLcpHoldop(Signal* signal);
- void checkLcpStarted(Signal* signal);
+ bool checkLcpStarted(Signal* signal);
void checkLcpTupprep(Signal* signal);
void getNextFragForLcp(Signal* signal);
void initLcpLocAcc(Signal* signal, Uint32 fragId);
diff --git a/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp b/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
index c79f4dfc6c7..27f995750b6 100644
--- a/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
+++ b/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
@@ -10351,8 +10351,8 @@ void Dblqh::execTUP_LCPSTARTED(Signal* signal)
void Dblqh::lcpStartedLab(Signal* signal)
{
- checkLcpStarted(signal);
- if (lcpPtr.p->lcpState == LcpRecord::LCP_STARTED) {
+ if (checkLcpStarted(signal))
+ {
jam();
/* ----------------------------------------------------------------------
* THE LOCAL CHECKPOINT HAS BEEN STARTED. IT IS NOW TIME TO
@@ -10432,26 +10432,7 @@ void Dblqh::execLQH_RESTART_OP(Signal* signal)
lcpPtr.i = signal->theData[1];
ptrCheckGuard(lcpPtr, clcpFileSize, lcpRecord);
ndbrequire(fragptr.p->fragStatus == Fragrecord::BLOCKED);
- if (lcpPtr.p->lcpState == LcpRecord::LCP_STARTED) {
- jam();
- /***********************************************************************/
- /* THIS SIGNAL CAN ONLY BE RECEIVED WHEN FRAGMENT IS BLOCKED AND
- * THE LOCAL CHECKPOINT HAS BEEN STARTED. THE BLOCKING WILL BE
- * REMOVED AS SOON AS ALL OPERATIONS HAVE BEEN STARTED.
- ***********************************************************************/
- restartOperationsLab(signal);
- } else if (lcpPtr.p->lcpState == LcpRecord::LCP_BLOCKED_COMP) {
- jam();
- /*******************************************************************>
- * THE CHECKPOINT IS COMPLETED BUT HAS NOT YET STARTED UP
- * ALL OPERATIONS AGAIN.
- * WE PERFORM THIS START-UP BEFORE CONTINUING WITH THE NEXT
- * FRAGMENT OF THE LOCAL CHECKPOINT TO AVOID ANY STRANGE ERRORS.
- *******************************************************************> */
- restartOperationsLab(signal);
- } else {
- ndbrequire(false);
- }
+ restartOperationsLab(signal);
}//Dblqh::execLQH_RESTART_OP()
void Dblqh::restartOperationsLab(Signal* signal)
@@ -11000,7 +10981,8 @@ void Dblqh::checkLcpHoldop(Signal* signal)
*
* SUBROUTINE SHORT NAME = CLS
* ========================================================================== */
-void Dblqh::checkLcpStarted(Signal* signal)
+bool
+Dblqh::checkLcpStarted(Signal* signal)
{
LcpLocRecordPtr clsLcpLocptr;
@@ -11010,7 +10992,7 @@ void Dblqh::checkLcpStarted(Signal* signal)
do {
ptrCheckGuard(clsLcpLocptr, clcpLocrecFileSize, lcpLocRecord);
if (clsLcpLocptr.p->lcpLocstate == LcpLocRecord::ACC_WAIT_STARTED){
- return;
+ return false;
}//if
clsLcpLocptr.i = clsLcpLocptr.p->nextLcpLoc;
i++;
@@ -11021,12 +11003,13 @@ void Dblqh::checkLcpStarted(Signal* signal)
do {
ptrCheckGuard(clsLcpLocptr, clcpLocrecFileSize, lcpLocRecord);
if (clsLcpLocptr.p->lcpLocstate == LcpLocRecord::TUP_WAIT_STARTED){
- return;
+ return false;
}//if
clsLcpLocptr.i = clsLcpLocptr.p->nextLcpLoc;
i++;
} while (clsLcpLocptr.i != RNIL);
- lcpPtr.p->lcpState = LcpRecord::LCP_STARTED;
+
+ return true;
}//Dblqh::checkLcpStarted()
/* ==========================================================================
@@ -11187,20 +11170,12 @@ void Dblqh::sendAccContOp(Signal* signal)
do {
ptrCheckGuard(sacLcpLocptr, clcpLocrecFileSize, lcpLocRecord);
sacLcpLocptr.p->accContCounter = 0;
- if(sacLcpLocptr.p->lcpLocstate == LcpLocRecord::ACC_STARTED){
- /* ------------------------------------------------------------------- */
- /*SEND START OPERATIONS TO ACC AGAIN */
- /* ------------------------------------------------------------------- */
- signal->theData[0] = lcpPtr.p->lcpAccptr;
- signal->theData[1] = sacLcpLocptr.p->locFragid;
- sendSignal(fragptr.p->accBlockref, GSN_ACC_CONTOPREQ, signal, 2, JBA);
- count++;
- } else if(sacLcpLocptr.p->lcpLocstate == LcpLocRecord::ACC_COMPLETED){
- signal->theData[0] = sacLcpLocptr.i;
- sendSignal(reference(), GSN_ACC_CONTOPCONF, signal, 1, JBB);
- } else {
- ndbrequire(false);
- }
+ /* ------------------------------------------------------------------- */
+ /*SEND START OPERATIONS TO ACC AGAIN */
+ /* ------------------------------------------------------------------- */
+ signal->theData[0] = lcpPtr.p->lcpAccptr;
+ signal->theData[1] = sacLcpLocptr.p->locFragid;
+ sendSignal(fragptr.p->accBlockref, GSN_ACC_CONTOPREQ, signal, 2, JBA);
sacLcpLocptr.i = sacLcpLocptr.p->nextLcpLoc;
} while (sacLcpLocptr.i != RNIL);
@@ -11236,9 +11211,18 @@ void Dblqh::sendStartLcp(Signal* signal)
signal->theData[0] = stlLcpLocptr.i;
signal->theData[1] = cownref;
signal->theData[2] = stlLcpLocptr.p->tupRef;
- sendSignal(fragptr.p->tupBlockref, GSN_TUP_LCPREQ, signal, 3, JBA);
+ if(ERROR_INSERTED(5077))
+ sendSignalWithDelay(fragptr.p->tupBlockref, GSN_TUP_LCPREQ,
+ signal, 5000, 3);
+ else
+ sendSignal(fragptr.p->tupBlockref, GSN_TUP_LCPREQ, signal, 3, JBA);
stlLcpLocptr.i = stlLcpLocptr.p->nextLcpLoc;
} while (stlLcpLocptr.i != RNIL);
+
+ if(ERROR_INSERTED(5077))
+ {
+ ndbout_c("Delayed TUP_LCPREQ with 5 sec");
+ }
}//Dblqh::sendStartLcp()
/* ------------------------------------------------------------------------- */