diff options
-rw-r--r-- | storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp | 287 | ||||
-rw-r--r-- | storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp | 37 | ||||
-rw-r--r-- | storage/ndb/src/kernel/blocks/pgman.cpp | 53 | ||||
-rw-r--r-- | storage/ndb/src/kernel/blocks/pgman.hpp | 20 |
4 files changed, 282 insertions, 115 deletions
diff --git a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp index 2b452e9529b..d031f9a00bf 100644 --- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp +++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp @@ -1394,6 +1394,7 @@ void Dblqh::execTUP_ADD_ATTCONF(Signal* signal) if (! DictTabInfo::isOrderedIndex(addfragptr.p->tableType)) { fragptr.p->m_copy_started_state = Fragrecord::AC_IGNORED; + //fragptr.p->m_copy_started_state = Fragrecord::AC_NR_COPY; fragptr.p->fragStatus = Fragrecord::ACTIVE_CREATION; } else @@ -2470,6 +2471,8 @@ void Dblqh::execTUPKEYCONF(Signal* signal) jamEntry(); tcConnectptr.i = tcIndex; ptrCheckGuard(tcConnectptr, ttcConnectrecFileSize, regTcConnectionrec); + TcConnectionrec * regTcPtr = tcConnectptr.p; + Uint32 activeCreat = regTcPtr->activeCreat; FragrecordPtr regFragptr; regFragptr.i = tcConnectptr.p->fragmentptr; @@ -2497,6 +2500,32 @@ void Dblqh::execTUPKEYCONF(Signal* signal) // Abort was not ready to start until this signal came back. Now we are ready // to start the abort. /* ------------------------------------------------------------------------- */ + if (unlikely(activeCreat == Fragrecord::AC_NR_COPY)) + { + jam(); + ndbrequire(regTcPtr->m_nr_delete.m_cnt); + regTcPtr->m_nr_delete.m_cnt--; + if (regTcPtr->m_nr_delete.m_cnt) + { + jam(); + /** + * Let operation wait for pending NR operations + * even for before writing log...(as it's simpler) + */ + +#ifdef VM_TRACE + /** + * Only disk table can have pending ops... + */ + TablerecPtr tablePtr; + tablePtr.i = regTcPtr->tableref; + ptrCheckGuard(tablePtr, ctabrecFileSize, tablerec); + ndbrequire(tablePtr.p->m_disk_table); +#endif + return; + } + } + abortCommonLab(signal); break; case TcConnectionrec::WAIT_ACC_ABORT: @@ -2523,13 +2552,23 @@ void Dblqh::execTUPKEYREF(Signal* signal) tcConnectptr.i = tupKeyRef->userRef; terrorCode = tupKeyRef->errorCode; ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec); + TcConnectionrec* regTcPtr = tcConnectptr.p; + Uint32 activeCreat = regTcPtr->activeCreat; FragrecordPtr regFragptr; - regFragptr.i = tcConnectptr.p->fragmentptr; + regFragptr.i = regTcPtr->fragmentptr; c_fragment_pool.getPtr(regFragptr); fragptr = regFragptr; + + if (unlikely(activeCreat == Fragrecord::AC_NR_COPY)) + { + jam(); + ndbrequire(regTcPtr->m_nr_delete.m_cnt); + regTcPtr->m_nr_delete.m_cnt--; + ndbassert(regTcPtr->transactionState == TcConnectionrec::WAIT_TUP || + regTcPtr->transactionState ==TcConnectionrec::WAIT_TUP_TO_ABORT); + } - TcConnectionrec* regTcPtr = tcConnectptr.p; switch (tcConnectptr.p->transactionState) { case TcConnectionrec::WAIT_TUP: jam(); @@ -3767,7 +3806,7 @@ void Dblqh::prepareContinueAfterBlockedLab(Signal* signal) EXECUTE_DIRECT(DBTUP, GSN_TUP_ABORTREQ, signal, 1); jamEntry(); - execACC_ABORTCONF(signal); + packLqhkeyreqLab(signal); } } @@ -3890,16 +3929,17 @@ Dblqh::handle_nr_copy(Signal* signal, Ptr<TcConnectionrec> regTcPtr) if (TRACENR_FLAG) TRACENR(" performing DELETE key: " << dst[0] << endl); - regTcPtr.p->tupkeyData[0] = regTcPtr.p->m_row_id.ref(); - if (g_key_descriptor_pool.getPtr(tableId)->hasCharAttr) - { - regTcPtr.p->hashValue = calculateHash(tableId, dst); - } - else + + nr_copy_delete_row(signal, regTcPtr, ®TcPtr.p->m_row_id, len); + ndbassert(regTcPtr.p->m_nr_delete.m_cnt); + regTcPtr.p->m_nr_delete.m_cnt--; // No real op is run + if (regTcPtr.p->m_nr_delete.m_cnt) { - regTcPtr.p->hashValue = md5_hash((Uint64*)dst, len); + jam(); + return; } - goto run; + packLqhkeyreqLab(signal); + return; } else if (len == 0 && op == ZDELETE) { @@ -3993,9 +4033,7 @@ update_gci_ignore: signal->theData[0] = regTcPtr.p->tupConnectrec; EXECUTE_DIRECT(DBTUP, GSN_TUP_ABORTREQ, signal, 1); - regTcPtr.p->transactionState = TcConnectionrec::WAIT_ACC_ABORT; - signal->theData[0] = regTcPtr.i; - execACC_ABORTCONF(signal); + packLqhkeyreqLab(signal); } int @@ -4149,7 +4187,6 @@ Dblqh::get_nr_op_info(Nr_op_info* op, Uint32 page_id) op->m_gci = tcPtr.p->gci; op->m_tup_frag_ptr_i = fragPtr.p->tupFragptr; - ndbrequire(tcPtr.p->transactionState == TcConnectionrec::WAIT_TUP_COMMIT); ndbrequire(tcPtr.p->activeCreat == Fragrecord::AC_NR_COPY); ndbrequire(tcPtr.p->m_nr_delete.m_cnt); @@ -4194,16 +4231,36 @@ Dblqh::nr_delete_complete(Signal* signal, Nr_op_info* op) tcPtr.i = op->m_ptr_i; ptrCheckGuard(tcPtr, ctcConnectrecFileSize, tcConnectionrec); - ndbrequire(tcPtr.p->transactionState == TcConnectionrec::WAIT_TUP_COMMIT); ndbrequire(tcPtr.p->activeCreat == Fragrecord::AC_NR_COPY); ndbrequire(tcPtr.p->m_nr_delete.m_cnt); tcPtr.p->m_nr_delete.m_cnt--; if (tcPtr.p->m_nr_delete.m_cnt == 0) { + jam(); tcConnectptr = tcPtr; c_fragment_pool.getPtr(fragptr, tcPtr.p->fragmentptr); - packLqhkeyreqLab(signal); + + if (tcPtr.p->abortState != TcConnectionrec::ABORT_IDLE) + { + jam(); + tcPtr.p->activeCreat = Fragrecord::AC_NORMAL; + abortCommonLab(signal); + } + else if (tcPtr.p->operation == ZDELETE && + LqhKeyReq::getNrCopyFlag(tcPtr.p->reqinfo)) + { + /** + * This is run directly in handle_nr_copy + */ + jam(); + packLqhkeyreqLab(signal); + } + else + { + jam(); + rwConcludedLab(signal); + } return; } @@ -4319,7 +4376,6 @@ void Dblqh::execACCKEYCONF(Signal* signal) return; }//if - // reset the activeCreat since that is only valid in cases where the record was not present. /* ------------------------------------------------------------------------ * IT IS NOW TIME TO CONTACT THE TUPLE MANAGER. THE TUPLE MANAGER NEEDS THE * INFORMATION ON WHICH TABLE AND FRAGMENT, THE LOCAL KEY AND IT NEEDS TO @@ -4536,6 +4592,7 @@ Dblqh::acckeyconf_load_diskpage(Signal* signal, TcConnectionrecPtr regTcPtr, } else { + regTcPtr.p->transactionState = TcConnectionrec::WAIT_TUP; TupKeyRef * ref = (TupKeyRef *)signal->getDataPtr(); ref->userRef= regTcPtr.i; ref->errorCode= ~0; @@ -4571,6 +4628,7 @@ Dblqh::acckeyconf_load_diskpage_callback(Signal* signal, } else { + regTcPtr->transactionState = TcConnectionrec::WAIT_TUP; TupKeyRef * ref = (TupKeyRef *)signal->getDataPtr(); ref->userRef= callbackData; ref->errorCode= disk_page; @@ -4592,9 +4650,11 @@ Dblqh::acckeyconf_load_diskpage_callback(Signal* signal, * -------------------------------------------------------------------------- */ void Dblqh::tupkeyConfLab(Signal* signal) { -/* ---- GET OPERATION TYPE AND CHECK WHAT KIND OF OPERATION IS REQUESTED ---- */ +/* ---- GET OPERATION TYPE AND CHECK WHAT KIND OF OPERATION IS REQUESTED --- */ const TupKeyConf * const tupKeyConf = (TupKeyConf *)&signal->theData[0]; TcConnectionrec * const regTcPtr = tcConnectptr.p; + Uint32 activeCreat = regTcPtr->activeCreat; + if (regTcPtr->simpleRead) { jam(); /* ---------------------------------------------------------------------- @@ -4616,6 +4676,34 @@ void Dblqh::tupkeyConfLab(Signal* signal) }//if regTcPtr->totSendlenAi = tupKeyConf->writeLength; ndbrequire(regTcPtr->totSendlenAi == regTcPtr->currTupAiLen); + + if (unlikely(activeCreat == Fragrecord::AC_NR_COPY)) + { + jam(); + ndbrequire(regTcPtr->m_nr_delete.m_cnt); + regTcPtr->m_nr_delete.m_cnt--; + if (regTcPtr->m_nr_delete.m_cnt) + { + jam(); + /** + * Let operation wait for pending NR operations + * even for before writing log...(as it's simpler) + */ + +#ifdef VM_TRACE + /** + * Only disk table can have pending ops... + */ + TablerecPtr tablePtr; + tablePtr.i = regTcPtr->tableref; + ptrCheckGuard(tablePtr, ctabrecFileSize, tablerec); + ndbrequire(tablePtr.p->m_disk_table); +#endif + + return; + } + } + rwConcludedLab(signal); return; }//Dblqh::tupkeyConfLab() @@ -6325,27 +6413,19 @@ Dblqh::tupcommit_conf(Signal* signal, /*SEND ANY COMMIT OR COMPLETE MESSAGES TO OTHER NODES. THEY WILL MERELY SEND */ /*THOSE SIGNALS INTERNALLY. */ /* ------------------------------------------------------------------------- */ - if (tcPtrP->abortState == TcConnectionrec::ABORT_IDLE) { + if (tcPtrP->abortState == TcConnectionrec::ABORT_IDLE) + { jam(); - if (activeCreat == Fragrecord::AC_NR_COPY && - tcPtrP->m_nr_delete.m_cnt > 1) + if (activeCreat == Fragrecord::AC_NR_COPY) { jam(); - /** - * Nr delete waiting for disk delete to complete... - */ -#ifdef VM_TRACE - TablerecPtr tablePtr; - tablePtr.i = tcPtrP->tableref; - ptrCheckGuard(tablePtr, ctabrecFileSize, tablerec); - ndbrequire(tablePtr.p->m_disk_table); -#endif - tcPtrP->m_nr_delete.m_cnt--; - tcPtrP->transactionState = TcConnectionrec::WAIT_TUP_COMMIT; - return; + ndbrequire(LqhKeyReq::getNrCopyFlag(tcPtrP->reqinfo)); + ndbrequire(tcPtrP->m_nr_delete.m_cnt == 0); } packLqhkeyreqLab(signal); - } else { + } + else + { ndbrequire(tcPtrP->abortState != TcConnectionrec::NEW_FROM_TC); jam(); sendLqhTransconf(signal, LqhTransConf::Committed); @@ -6549,7 +6629,7 @@ void Dblqh::execABORT(Signal* signal) }//if TcConnectionrec * const regTcPtr = tcConnectptr.p; - + Uint32 activeCreat = regTcPtr->activeCreat; if (ERROR_INSERTED(5100)) { SET_ERROR_INSERT_VALUE(5101); @@ -6574,10 +6654,10 @@ void Dblqh::execABORT(Signal* signal) sendSignal(TLqhRef, GSN_ABORT, signal, 4, JBB); }//if regTcPtr->abortState = TcConnectionrec::ABORT_FROM_TC; - regTcPtr->activeCreat = Fragrecord::AC_NORMAL; const Uint32 commitAckMarker = regTcPtr->commitAckMarker; - if(commitAckMarker != RNIL){ + if(commitAckMarker != RNIL) + { jam(); #ifdef MARKER_TRACE { @@ -6627,6 +6707,7 @@ void Dblqh::execABORTREQ(Signal* signal) return; }//if TcConnectionrec * const regTcPtr = tcConnectptr.p; + Uint32 activeCreat = regTcPtr->activeCreat; if (regTcPtr->transactionState != TcConnectionrec::PREPARED) { warningReport(signal, 10); return; @@ -6634,7 +6715,7 @@ void Dblqh::execABORTREQ(Signal* signal) regTcPtr->reqBlockref = reqBlockref; regTcPtr->reqRef = reqPtr; regTcPtr->abortState = TcConnectionrec::REQ_FROM_TC; - regTcPtr->activeCreat = Fragrecord::AC_NORMAL; + abortCommonLab(signal); return; }//Dblqh::execABORTREQ() @@ -6704,42 +6785,26 @@ void Dblqh::execACCKEYREF(Signal* signal) } - if (tcPtr->activeCreat == Fragrecord::AC_NR_COPY) - { - jam(); - Uint32 op = tcPtr->operation; - switch(errCode){ - case ZNO_TUPLE_FOUND: - ndbrequire(op == ZDELETE); - break; - break; - default: - ndbrequire(false); - } - tcPtr->activeCreat = Fragrecord::AC_IGNORED; - } - else - { - ndbrequire(!LqhKeyReq::getNrCopyFlag(tcPtr->reqinfo)); - - /** - * Only primary replica can get ZTUPLE_ALREADY_EXIST || ZNO_TUPLE_FOUND - * - * Unless it's a simple or dirty read - * - * NOT TRUE! - * 1) op1 - primary insert ok - * 2) op1 - backup insert fail (log full or what ever) - * 3) op1 - delete ok @ primary - * 4) op1 - delete fail @ backup - * - * -> ZNO_TUPLE_FOUND is possible - */ - ndbrequire - (tcPtr->seqNoReplica == 0 || - errCode != ZTUPLE_ALREADY_EXIST || - (tcPtr->operation == ZREAD && (tcPtr->dirtyOp || tcPtr->opSimple))); - } + ndbrequire(tcPtr->activeCreat == Fragrecord::AC_NORMAL); + ndbrequire(!LqhKeyReq::getNrCopyFlag(tcPtr->reqinfo)); + + /** + * Only primary replica can get ZTUPLE_ALREADY_EXIST || ZNO_TUPLE_FOUND + * + * Unless it's a simple or dirty read + * + * NOT TRUE! + * 1) op1 - primary insert ok + * 2) op1 - backup insert fail (log full or what ever) + * 3) op1 - delete ok @ primary + * 4) op1 - delete fail @ backup + * + * -> ZNO_TUPLE_FOUND is possible + */ + ndbrequire + (tcPtr->seqNoReplica == 0 || + errCode != ZTUPLE_ALREADY_EXIST || + (tcPtr->operation == ZREAD && (tcPtr->dirtyOp || tcPtr->opSimple))); tcPtr->abortState = TcConnectionrec::ABORT_FROM_LQH; abortCommonLab(signal); @@ -6753,7 +6818,6 @@ void Dblqh::localAbortStateHandlerLab(Signal* signal) jam(); return; }//if - regTcPtr->activeCreat = Fragrecord::AC_NORMAL; regTcPtr->abortState = TcConnectionrec::ABORT_FROM_LQH; regTcPtr->errorCode = terrorCode; abortStateHandlerLab(signal); @@ -6929,11 +6993,6 @@ void Dblqh::abortErrorLab(Signal* signal) regTcPtr->abortState = TcConnectionrec::ABORT_FROM_LQH; regTcPtr->errorCode = terrorCode; }//if - /* ----------------------------------------------------------------------- - * ACTIVE CREATION IS RESET FOR ALL ERRORS WHICH SHOULD BE HANDLED - * WITH NORMAL ABORT HANDLING. - * ----------------------------------------------------------------------- */ - regTcPtr->activeCreat = Fragrecord::AC_NORMAL; abortCommonLab(signal); return; }//Dblqh::abortErrorLab() @@ -6942,8 +7001,9 @@ void Dblqh::abortCommonLab(Signal* signal) { TcConnectionrec * const regTcPtr = tcConnectptr.p; const Uint32 commitAckMarker = regTcPtr->commitAckMarker; - if(regTcPtr->activeCreat != Fragrecord::AC_IGNORED && - commitAckMarker != RNIL){ + const Uint32 activeCreat = regTcPtr->activeCreat; + if (commitAckMarker != RNIL) + { /** * There is no NR ongoing and we have a marker */ @@ -6958,6 +7018,29 @@ void Dblqh::abortCommonLab(Signal* signal) m_commitAckMarkerHash.release(commitAckMarker); regTcPtr->commitAckMarker = RNIL; } + + if (unlikely(activeCreat == Fragrecord::AC_NR_COPY)) + { + jam(); + if (regTcPtr->m_nr_delete.m_cnt) + { + jam(); + /** + * Let operation wait for pending NR operations + */ + +#ifdef VM_TRACE + /** + * Only disk table can have pending ops... + */ + TablerecPtr tablePtr; + tablePtr.i = regTcPtr->tableref; + ptrCheckGuard(tablePtr, ctabrecFileSize, tablerec); + ndbrequire(tablePtr.p->m_disk_table); +#endif + return; + } + } fragptr.i = regTcPtr->fragmentptr; if (fragptr.i != RNIL) { @@ -7034,25 +7117,6 @@ void Dblqh::execACC_ABORTCONF(Signal* signal) ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec); TcConnectionrec * const regTcPtr = tcConnectptr.p; ndbrequire(regTcPtr->transactionState == TcConnectionrec::WAIT_ACC_ABORT); - if (regTcPtr->activeCreat == Fragrecord::AC_IGNORED) { - /* ---------------------------------------------------------------------- - * A NORMAL EVENT DURING CREATION OF A FRAGMENT. WE NOW NEED TO CONTINUE - * WITH NORMAL COMMIT PROCESSING. - * --------------------------------------------------------------------- */ - if (regTcPtr->currTupAiLen == regTcPtr->totReclenAi) { - jam(); - regTcPtr->abortState = TcConnectionrec::ABORT_IDLE; - fragptr.i = regTcPtr->fragmentptr; - c_fragment_pool.getPtr(fragptr); - rwConcludedLab(signal); - return; - } else { - ndbrequire(regTcPtr->currTupAiLen < regTcPtr->totReclenAi); - jam(); - regTcPtr->transactionState = TcConnectionrec::WAIT_AI_AFTER_ABORT; - return; - }//if - }//if continueAbortLab(signal); return; }//Dblqh::execACC_ABORTCONF() @@ -9450,7 +9514,7 @@ void Dblqh::initScanTc(const ScanFragReq* req, tcConnectptr.p->m_offset_current_keybuf = 0; tcConnectptr.p->m_scan_curr_range_no = 0; tcConnectptr.p->m_dealloc = 0; - + tcConnectptr.p->activeCreat = Fragrecord::AC_NORMAL; TablerecPtr tTablePtr; tTablePtr.i = tabptr.p->primaryTableId; ptrCheckGuard(tTablePtr, ctabrecFileSize, tablerec); @@ -9929,16 +9993,21 @@ void Dblqh::continueFirstCopyAfterBlockedLab(Signal* signal) */ fragptr.p->m_copy_started_state = Fragrecord::AC_NR_COPY; - if (0) + scanptr.i = tcConnectptr.p->tcScanRec; + c_scanRecordPool.getPtr(scanptr); + + if (false && fragptr.p->tabRef > 4) { - ndbout_c("STOPPING COPY (%d -> %d %d %d)", - scanptr.p->scanBlockref, + ndbout_c("STOPPING COPY X = [ %d %d %d %d ]", + refToBlock(scanptr.p->scanBlockref), scanptr.p->scanAccPtr, RNIL, NextScanReq::ZSCAN_NEXT); + + /** + * RESTART: > DUMP 7020 332 X + */ return; } - scanptr.i = tcConnectptr.p->tcScanRec; - c_scanRecordPool.getPtr(scanptr); signal->theData[0] = scanptr.p->scanAccPtr; signal->theData[1] = RNIL; signal->theData[2] = NextScanReq::ZSCAN_NEXT; @@ -18351,6 +18420,7 @@ Dblqh::execDUMP_STATE_ORD(Signal* signal) << " tcBlockref = " << hex << tcRec.p->tcBlockref << " reqBlockref = " << hex << tcRec.p->reqBlockref << " primKeyLen = " << tcRec.p->primKeyLen + << " nrcopyflag = " << LqhKeyReq::getNrCopyFlag(tcRec.p->reqinfo) << endl; ndbout << " nextReplica = " << tcRec.p->nextReplica << " tcBlockref = " << hex << tcRec.p->tcBlockref @@ -18421,6 +18491,7 @@ Dblqh::execDUMP_STATE_ORD(Signal* signal) << endl; ndbout << " tupkeyData2 = " << tcRec.p->tupkeyData[2] << " tupkeyData3 = " << tcRec.p->tupkeyData[3] + << " m_nr_delete.m_cnt = " << tcRec.p->m_nr_delete.m_cnt << endl; switch (tcRec.p->transactionState) { diff --git a/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp b/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp index b4e79729399..5f970072a19 100644 --- a/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp +++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp @@ -483,6 +483,14 @@ Dbtup::load_diskpage(Signal* signal, req.m_callback.m_callbackData= opRec; req.m_callback.m_callbackFunction= safe_cast(&Dbtup::disk_page_load_callback); + +#ifdef ERROR_INSERTED + if (ERROR_INSERTED(4022)) + { + flags |= Page_cache_client::DELAY_REQ; + req.m_delay_until_time = NdbTick_CurrentMillisecond()+(Uint64)3000; + } +#endif if((res= m_pgman.get_page(signal, req, flags)) > 0) { @@ -3119,6 +3127,35 @@ Dbtup::nr_delete(Signal* signal, Uint32 senderData, preq.m_callback.m_callbackFunction = safe_cast(&Dbtup::nr_delete_page_callback); int flags = Page_cache_client::COMMIT_REQ; + +#ifdef ERROR_INSERT + if (ERROR_INSERTED(4023) || ERROR_INSERTED(4024)) + { + int rnd = rand() % 100; + int slp = 0; + if (ERROR_INSERTED(4024)) + { + slp = 3000; + } + else if (rnd > 90) + { + slp = 3000; + } + else if (rnd > 70) + { + slp = 100; + } + + ndbout_c("rnd: %d slp: %d", rnd, slp); + + if (slp) + { + flags |= Page_cache_client::DELAY_REQ; + preq.m_delay_until_time = NdbTick_CurrentMillisecond()+(Uint64)slp; + } + } +#endif + res = m_pgman.get_page(signal, preq, flags); if (res == 0) { diff --git a/storage/ndb/src/kernel/blocks/pgman.cpp b/storage/ndb/src/kernel/blocks/pgman.cpp index be661433ef6..75a63f9d76e 100644 --- a/storage/ndb/src/kernel/blocks/pgman.cpp +++ b/storage/ndb/src/kernel/blocks/pgman.cpp @@ -944,12 +944,16 @@ Pgman::process_callback(Signal* signal) int max_count = 1; Page_sublist& pl_callback = *m_page_sublist[Page_entry::SL_CALLBACK]; - while (! pl_callback.isEmpty() && --max_count >= 0) + Ptr<Page_entry> ptr; + pl_callback.first(ptr); + + while (! ptr.isNull() && --max_count >= 0) { jam(); - Ptr<Page_entry> ptr; - pl_callback.first(ptr); - if (! process_callback(signal, ptr)) + Ptr<Page_entry> curr = ptr; + pl_callback.next(ptr); + + if (! process_callback(signal, curr)) { jam(); break; @@ -987,6 +991,18 @@ Pgman::process_callback(Signal* signal, Ptr<Page_entry> ptr) #ifdef VM_TRACE debugOut << "PGMAN: " << req_ptr << " : process_callback" << endl; #endif + +#ifdef ERROR_INSERT + if (req_ptr.p->m_flags & Page_request::DELAY_REQ) + { + Uint64 now = NdbTick_CurrentMillisecond(); + if (now < req_ptr.p->m_delay_until_time) + { + break; + } + } +#endif + b = globalData.getBlock(req_ptr.p->m_block); callback = req_ptr.p->m_callback; @@ -1314,6 +1330,24 @@ Pgman::fsreadconf(Signal* signal, Ptr<Page_entry> ptr) state |= Page_entry::MAPPED; set_page_state(ptr, state); + { + /** + * Update lsn record on page + * as it can be modified/flushed wo/ update_lsn has been called + * (e.g. prealloc) and it then would get lsn 0, which is bad + * when running undo and following SR + */ + Ptr<GlobalPage> pagePtr; + m_global_page_pool.getPtr(pagePtr, ptr.p->m_real_page_i); + File_formats::Datafile::Data_page* page = + (File_formats::Datafile::Data_page*)pagePtr.p; + + Uint64 lsn = 0; + lsn += page->m_page_header.m_page_lsn_hi; lsn <<= 32; + lsn += page->m_page_header.m_page_lsn_lo; + ptr.p->m_lsn = lsn; + } + ndbrequire(m_stats.m_current_io_waits > 0); m_stats.m_current_io_waits--; @@ -1576,6 +1610,12 @@ Pgman::get_page(Signal* signal, Ptr<Page_entry> ptr, Page_request page_req) bool only_request = ptr.p->m_requests.isEmpty(); + if (req_flags & Page_request::DELAY_REQ) + { + jam(); + only_request = false; + } + if (only_request && state & Page_entry::MAPPED) { @@ -1623,7 +1663,10 @@ Pgman::get_page(Signal* signal, Ptr<Page_entry> ptr, Page_request page_req) req_ptr.p->m_block = page_req.m_block; req_ptr.p->m_flags = page_req.m_flags; req_ptr.p->m_callback = page_req.m_callback; - +#ifdef ERROR_INSERT + req_ptr.p->m_delay_until_time = page_req.m_delay_until_time; +#endif + state |= Page_entry::REQUEST; if (only_request && req_flags & Page_request::EMPTY_PAGE) { diff --git a/storage/ndb/src/kernel/blocks/pgman.hpp b/storage/ndb/src/kernel/blocks/pgman.hpp index 5a6a5f319bd..ae7736025ab 100644 --- a/storage/ndb/src/kernel/blocks/pgman.hpp +++ b/storage/ndb/src/kernel/blocks/pgman.hpp @@ -256,12 +256,18 @@ private: ,DIRTY_REQ = 0x0200 // make page dirty wo/ update_lsn ,UNLOCK_PAGE = 0x0400 ,CORR_REQ = 0x0800 // correlated request (no LIRS update) +#ifdef ERROR_INSERT + ,DELAY_REQ = 0x1000 // Force request to be delayed +#endif }; - + Uint16 m_block; Uint16 m_flags; SimulatedBlock::Callback m_callback; +#ifdef ERROR_INSERT + Uint64 m_delay_until_time; +#endif Uint32 nextList; Uint32 m_magic; }; @@ -508,6 +514,10 @@ public: struct Request { Local_key m_page; SimulatedBlock::Callback m_callback; + +#ifdef ERROR_INSERT + Uint64 m_delay_until_time; +#endif }; Ptr<GlobalPage> m_ptr; // TODO remove @@ -520,6 +530,9 @@ public: ,DIRTY_REQ = Pgman::Page_request::DIRTY_REQ ,UNLOCK_PAGE = Pgman::Page_request::UNLOCK_PAGE ,CORR_REQ = Pgman::Page_request::CORR_REQ +#ifdef ERROR_INSERT + ,DELAY_REQ = Pgman::Page_request::DELAY_REQ +#endif }; /** @@ -588,7 +601,10 @@ Page_cache_client::get_page(Signal* signal, Request& req, Uint32 flags) page_req.m_block = m_block; page_req.m_flags = flags; page_req.m_callback = req.m_callback; - +#ifdef ERROR_INSERT + page_req.m_delay_until_time = req.m_delay_until_time; +#endif + int i = m_pgman->get_page(signal, entry_ptr, page_req); if (i > 0) { |