summaryrefslogtreecommitdiff
path: root/storage/ndb/src/kernel/blocks/dbtup/DbtupLCP.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'storage/ndb/src/kernel/blocks/dbtup/DbtupLCP.cpp')
-rw-r--r--storage/ndb/src/kernel/blocks/dbtup/DbtupLCP.cpp596
1 files changed, 596 insertions, 0 deletions
diff --git a/storage/ndb/src/kernel/blocks/dbtup/DbtupLCP.cpp b/storage/ndb/src/kernel/blocks/dbtup/DbtupLCP.cpp
new file mode 100644
index 00000000000..370ef4c4ba5
--- /dev/null
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupLCP.cpp
@@ -0,0 +1,596 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+
+#define DBTUP_C
+#include "Dbtup.hpp"
+#include <RefConvert.hpp>
+#include <ndb_limits.h>
+#include <pc.hpp>
+#include <signaldata/FsConf.hpp>
+#include <signaldata/FsRef.hpp>
+
+#define ljam() { jamLine(10000 + __LINE__); }
+#define ljamEntry() { jamEntryLine(10000 + __LINE__); }
+
+/* ---------------------------------------------------------------- */
+/* ---------------------------------------------------------------- */
+/* -------------------- LOCAL CHECKPOINT MODULE ------------------- */
+/* ---------------------------------------------------------------- */
+/* ---------------------------------------------------------------- */
+void Dbtup::execTUP_PREPLCPREQ(Signal* signal)
+{
+ CheckpointInfoPtr ciPtr;
+ DiskBufferSegmentInfoPtr dbsiPtr;
+ FragrecordPtr regFragPtr;
+ LocalLogInfoPtr lliPtr;
+ TablerecPtr regTabPtr;
+
+ ljamEntry();
+ Uint32 userptr = signal->theData[0];
+ BlockReference userblockref = signal->theData[1];
+ regTabPtr.i = signal->theData[2];
+ ptrCheckGuard(regTabPtr, cnoOfTablerec, tablerec);
+ Uint32 fragId = signal->theData[3];
+ Uint32 checkpointNumber = signal->theData[4];
+ cundoFileVersion = signal->theData[5];
+
+ getFragmentrec(regFragPtr, fragId, regTabPtr.p);
+ ndbrequire(regTabPtr.i != RNIL);
+ seizeCheckpointInfoRecord(ciPtr);
+
+ lliPtr.i = (cundoFileVersion << 2) + (regTabPtr.i & 0x3);
+ ptrCheckGuard(lliPtr, cnoOfParallellUndoFiles, localLogInfo);
+ cnoOfDataPagesToDiskWithoutSynch = 0;
+
+ ciPtr.p->lcpDataFileHandle = RNIL;
+ ciPtr.p->lcpCheckpointVersion = checkpointNumber;
+ ciPtr.p->lcpLocalLogInfoP = lliPtr.i;
+ ciPtr.p->lcpFragmentP = regFragPtr.i; /* SET THE FRAGMENT */
+ ciPtr.p->lcpFragmentId = fragId; /* SAVE THE FRAGMENT IDENTITY */
+ ciPtr.p->lcpTabPtr = regTabPtr.i; /* SET THE TABLE POINTER */
+ ciPtr.p->lcpBlockref = userblockref; /* SET THE BLOCK REFERENCE */
+ ciPtr.p->lcpUserptr = userptr; /* SET THE USERPOINTER */
+
+ /***************************************************************/
+ /* OPEN THE UNDO FILE FOR WRITE */
+ /* UPON FSOPENCONF */
+ /***************************************************************/
+ if (lliPtr.p->lliActiveLcp == 0) { /* IS THE UNDO LOG FILE OPEN? */
+ PendingFileOpenInfoPtr undoPfoiPtr;
+ UndoPagePtr regUndoPagePtr;
+
+ ljam();
+ lliPtr.p->lliPrevRecordId = 0;
+ lliPtr.p->lliLogFilePage = 0;
+ lliPtr.p->lliUndoPagesToDiskWithoutSynch = 0;
+ lliPtr.p->lliUndoWord = ZUNDO_PAGE_HEADER_SIZE;
+
+ seizeUndoBufferSegment(signal, regUndoPagePtr);
+ seizeDiskBufferSegmentRecord(dbsiPtr);
+ dbsiPtr.p->pdxBuffertype = UNDO_PAGES;
+ for (Uint32 i = 0; i < ZUB_SEGMENT_SIZE; i++) {
+ dbsiPtr.p->pdxDataPage[i] = regUndoPagePtr.i + i;
+ }//for
+ dbsiPtr.p->pdxFilePage = lliPtr.p->lliLogFilePage;
+ lliPtr.p->lliUndoPage = regUndoPagePtr.i;
+ lliPtr.p->lliUndoBufferSegmentP = dbsiPtr.i;
+ /* F LEVEL NOT USED */
+ Uint32 fileType = 1; /* VERSION */
+ fileType = (fileType << 8) | 2; /* .LOCLOG */
+ fileType = (fileType << 8) | 6; /* D6 */
+ fileType = (fileType << 8) | 0xff; /* DON'T USE P DIRECTORY LEVEL */
+ Uint32 fileFlag = 0x301; /* CREATE, WRITE ONLY, TRUNCATE */
+
+ seizePendingFileOpenInfoRecord(undoPfoiPtr);
+ undoPfoiPtr.p->pfoOpenType = LCP_UNDO_FILE_WRITE;
+ undoPfoiPtr.p->pfoCheckpointInfoP = ciPtr.i;
+
+ signal->theData[0] = cownref;
+ signal->theData[1] = undoPfoiPtr.i;
+ signal->theData[2] = lliPtr.i;
+ signal->theData[3] = 0xFFFFFFFF;
+ signal->theData[4] = cundoFileVersion;
+ signal->theData[5] = fileType;
+ signal->theData[6] = fileFlag;
+ sendSignal(NDBFS_REF, GSN_FSOPENREQ, signal, 7, JBA);
+ }//if
+ /***************************************************************/
+ /* OPEN THE DATA FILE FOR WRITE */
+ /* THE FILE HANDLE WILL BE SET IN THE CHECKPOINT_INFO_RECORD */
+ /* UPON FSOPENCONF */
+ /***************************************************************/
+ /* OPEN THE DATA FILE IN THE FOLLOWING FORM */
+ /* D5/DBTUP/T<TABID>/F<FRAGID>/S<CHECKPOINT_NUMBER>.DATA */
+
+ PendingFileOpenInfoPtr dataPfoiPtr;
+
+ Uint32 fileType = 1; /* VERSION */
+ fileType = (fileType << 8) | 0; /* .DATA */
+ fileType = (fileType << 8) | 5; /* D5 */
+ fileType = (fileType << 8) | 0xff; /* DON'T USE P DIRECTORY LEVEL */
+ Uint32 fileFlag = 0x301; /* CREATE, WRITE ONLY, TRUNCATE */
+
+ seizePendingFileOpenInfoRecord(dataPfoiPtr); /* SEIZE A NEW FILE OPEN INFO */
+ if (lliPtr.p->lliActiveLcp == 0) {
+ ljam();
+ dataPfoiPtr.p->pfoOpenType = LCP_DATA_FILE_WRITE_WITH_UNDO;
+ } else {
+ ljam();
+ dataPfoiPtr.p->pfoOpenType = LCP_DATA_FILE_WRITE;
+ }//if
+ dataPfoiPtr.p->pfoCheckpointInfoP = ciPtr.i;
+
+ /* LET'S OPEN THE DATA FILE FOR WRITE */
+ /* INCREASE NUMBER OF ACTIVE CHECKPOINTS */
+ lliPtr.p->lliActiveLcp = 1;
+ signal->theData[0] = cownref;
+ signal->theData[1] = dataPfoiPtr.i;
+ signal->theData[2] = ciPtr.p->lcpTabPtr;
+ signal->theData[3] = ciPtr.p->lcpFragmentId;
+ signal->theData[4] = ciPtr.p->lcpCheckpointVersion;
+ signal->theData[5] = fileType;
+ signal->theData[6] = fileFlag;
+ sendSignal(NDBFS_REF, GSN_FSOPENREQ, signal, 7, JBA);
+ return;
+}//Dbtup::execTUP_PREPLCPREQ()
+
+/* ---------------------------------------------------------------- */
+/* ------------------------ START CHECKPOINT --------------------- */
+/* ---------------------------------------------------------------- */
+void Dbtup::execTUP_LCPREQ(Signal* signal)
+{
+ CheckpointInfoPtr ciPtr;
+ DiskBufferSegmentInfoPtr dbsiPtr;
+ FragrecordPtr regFragPtr;
+ LocalLogInfoPtr lliPtr;
+
+ ljamEntry();
+// Uint32 userptr = signal->theData[0];
+// BlockReference userblockref = signal->theData[1];
+ ciPtr.i = signal->theData[2];
+
+ ptrCheckGuard(ciPtr, cnoOfLcpRec, checkpointInfo);
+ regFragPtr.i = ciPtr.p->lcpFragmentP;
+ ptrCheckGuard(regFragPtr, cnoOfFragrec, fragrecord);
+
+/* ---------------------------------------------------------------- */
+/* ASSIGNING A VALUE DIFFERENT FROM RNIL TO CHECKPOINT VERSION*/
+/* TRIGGERS THAT UNDO LOGGING WILL START FOR THIS FRAGMENT. */
+/* WE ASSIGN IT THE POINTER TO THE CHECKPOINT RECORD FOR */
+/* OPTIMISATION OF THE WRITING OF THE UNDO LOG. */
+/* ---------------------------------------------------------------- */
+ regFragPtr.p->checkpointVersion = ciPtr.p->lcpLocalLogInfoP; /* MARK START OF UNDO LOGGING */
+
+ regFragPtr.p->maxPageWrittenInCheckpoint = getNoOfPages(regFragPtr.p);
+ regFragPtr.p->minPageNotWrittenInCheckpoint = 0;
+ ndbrequire(getNoOfPages(regFragPtr.p) > 0);
+ allocDataBufferSegment(signal, dbsiPtr);
+
+ dbsiPtr.p->pdxNumDataPages = 0;
+ dbsiPtr.p->pdxFilePage = 1;
+ ciPtr.p->lcpDataBufferSegmentP = dbsiPtr.i;
+ dbsiPtr.p->pdxCheckpointInfoP = ciPtr.i;
+ ciPtr.p->lcpNoOfPages = getNoOfPages(regFragPtr.p);
+ ciPtr.p->lcpNoCopyPagesAlloc = regFragPtr.p->noCopyPagesAlloc;
+ ciPtr.p->lcpEmptyPrimPage = regFragPtr.p->emptyPrimPage;
+ ciPtr.p->lcpThFreeFirst = regFragPtr.p->thFreeFirst;
+ ciPtr.p->lcpThFreeCopyFirst = regFragPtr.p->thFreeCopyFirst;
+ lliPtr.i = ciPtr.p->lcpLocalLogInfoP;
+ ptrCheckGuard(lliPtr, cnoOfParallellUndoFiles, localLogInfo);
+/* ---------------------------------------------------------------- */
+/* --- PERFORM A COPY OF THE TABLE DESCRIPTOR FOR THIS FRAGMENT --- */
+/* ---------------------------------------------------------------- */
+ cprAddLogHeader(signal,
+ lliPtr.p,
+ ZTABLE_DESCRIPTOR,
+ ciPtr.p->lcpTabPtr,
+ ciPtr.p->lcpFragmentId);
+
+/* ---------------------------------------------------------------- */
+/* CONTINUE WITH SAVING ACTIVE OPERATIONS AFTER A REAL-TIME */
+/* BREAK. */
+/* ---------------------------------------------------------------- */
+ ciPtr.p->lcpTmpOperPtr = regFragPtr.p->firstusedOprec;
+ lcpSaveCopyListLab(signal, ciPtr);
+ return;
+}//Dbtup::execTUP_LCPREQ()
+
+void Dbtup::allocDataBufferSegment(Signal* signal, DiskBufferSegmentInfoPtr& dbsiPtr)
+{
+ UndoPagePtr regUndoPagePtr;
+
+ seizeDiskBufferSegmentRecord(dbsiPtr);
+ dbsiPtr.p->pdxBuffertype = COMMON_AREA_PAGES;
+ ndbrequire(cfirstfreeUndoSeg != RNIL);
+ if (cnoFreeUndoSeg == ZMIN_PAGE_LIMIT_TUP_COMMITREQ) {
+ EXECUTE_DIRECT(DBLQH, GSN_TUP_COM_BLOCK, signal, 1);
+ ljamEntry();
+ }//if
+ cnoFreeUndoSeg--;
+ ndbrequire(cnoFreeUndoSeg >= 0);
+
+ regUndoPagePtr.i = cfirstfreeUndoSeg;
+ ptrCheckGuard(regUndoPagePtr, cnoOfUndoPage, undoPage);
+ cfirstfreeUndoSeg = regUndoPagePtr.p->undoPageWord[ZPAGE_NEXT_POS];
+ regUndoPagePtr.p->undoPageWord[ZPAGE_NEXT_POS] = RNIL;
+ for (Uint32 i = 0; i < ZUB_SEGMENT_SIZE; i++) {
+ dbsiPtr.p->pdxDataPage[i] = regUndoPagePtr.i + i;
+ }//for
+}//Dbtup::allocDataBufferSegment()
+
+/* ---------------------------------------------------------------- */
+/* --- PERFORM A COPY OF THE ACTIVE OPERATIONS FOR THIS FRAGMENT -- */
+/* ---------------------------------------------------------------- */
+void Dbtup::lcpSaveCopyListLab(Signal* signal, CheckpointInfoPtr ciPtr)
+{
+ FragrecordPtr regFragPtr;
+ LocalLogInfoPtr lliPtr;
+ OperationrecPtr regOpPtr;
+
+ regFragPtr.i = ciPtr.p->lcpFragmentP;
+ ptrCheckGuard(regFragPtr, cnoOfFragrec, fragrecord);
+ lliPtr.i = ciPtr.p->lcpLocalLogInfoP;
+ ptrCheckGuard(lliPtr, cnoOfParallellUndoFiles, localLogInfo);
+ regOpPtr.i = ciPtr.p->lcpTmpOperPtr;
+
+/* -------------------------------------------------------------------------------- */
+/* TRAVERSE THE ENTIRE BLOCK OF OPERATIONS. CHECK IF THERE ARE EXISTING COPYS OF */
+/* TUPLES IN THE CHECKPOINTED FRAGMENT. SAVE THOSE IN A LIST IN THE FOLLOWING FORM: */
+/* */
+/* SOURCE PAGE */
+/* SOURCE INDEX */
+/* COPY PAGE */
+/* COPY INDEX */
+/* -------------------------------------------------------------------------------- */
+ Uint32 loopCount = 0;
+ while ((regOpPtr.i != RNIL) && (loopCount < 50)) {
+ ljam();
+ ptrCheckGuard(regOpPtr, cnoOfOprec, operationrec);
+ if (regOpPtr.p->realPageId != RNIL) {
+/* ---------------------------------------------------------------- */
+// We ensure that we have actually allocated the tuple header and
+// also found it. Otherwise we will fill the undo log with garbage.
+/* ---------------------------------------------------------------- */
+ if (regOpPtr.p->optype == ZUPDATE ||
+ (regOpPtr.p->optype == ZINSERT && regOpPtr.p->deleteInsertFlag)) {
+ ljam();
+ if (regOpPtr.p->realPageIdC != RNIL) {
+/* ---------------------------------------------------------------- */
+// We ensure that we have actually allocated the tuple header copy.
+// Otherwise we will fill the undo log with garbage.
+/* ---------------------------------------------------------------- */
+ cprAddLogHeader(signal,
+ lliPtr.p,
+ ZLCPR_ABORT_UPDATE,
+ ciPtr.p->lcpTabPtr,
+ ciPtr.p->lcpFragmentId);
+ cprAddAbortUpdate(signal, lliPtr.p, regOpPtr.p);
+ }//if
+ } else if (regOpPtr.p->optype == ZINSERT) {
+ ljam();
+ cprAddUndoLogRecord(signal,
+ ZLCPR_ABORT_INSERT,
+ regOpPtr.p->fragPageId,
+ regOpPtr.p->pageIndex,
+ regOpPtr.p->tableRef,
+ regOpPtr.p->fragId,
+ regFragPtr.p->checkpointVersion);
+ } else {
+ ndbrequire(regOpPtr.p->optype == ZDELETE);
+ ljam();
+ cprAddUndoLogRecord(signal,
+ ZINDICATE_NO_OP_ACTIVE,
+ regOpPtr.p->fragPageId,
+ regOpPtr.p->pageIndex,
+ regOpPtr.p->tableRef,
+ regOpPtr.p->fragId,
+ regFragPtr.p->checkpointVersion);
+ }//if
+ }//if
+ loopCount++;;
+ regOpPtr.i = regOpPtr.p->nextOprecInList;
+ }//while
+ if (regOpPtr.i == RNIL) {
+ ljam();
+
+ signal->theData[0] = ciPtr.p->lcpUserptr;
+ sendSignal(ciPtr.p->lcpBlockref, GSN_TUP_LCPSTARTED, signal, 1, JBA);
+
+ signal->theData[0] = ZCONT_SAVE_DP;
+ signal->theData[1] = ciPtr.i;
+ sendSignal(cownref, GSN_CONTINUEB, signal, 2, JBB);
+ } else {
+ ljam();
+ ciPtr.p->lcpTmpOperPtr = regOpPtr.i;
+ signal->theData[0] = ZCONT_START_SAVE_CL;
+ signal->theData[1] = ciPtr.i;
+ sendSignal(cownref, GSN_CONTINUEB, signal, 2, JBB);
+ }//if
+}//Dbtup::lcpSaveCopyListLab()
+
+/* ---------------------------------------------------------------- */
+/* ------- PERFORM A COPY OF ONE DATAPAGE DURING CHECKPOINT ------- */
+/* ---------------------------------------------------------------- */
+/* THE RANGE OF DATA PAGES IS INCLUDED IN THE CHECKPOINT_INFO_PTR */
+/* LAST_PAGE_TO_BUFFER ELEMENT IS INCREASED UNTIL ALL PAGES ARE */
+/* COPIED TO THE DISK BUFFER. WHEN A DISK BUFFER SEGMENT IS FULL */
+/* IT WILL BE WRITTEN TO DISK (TYPICALLY EACH 8:TH PAGE) */
+/* ---------------------------------------------------------------- */
+void Dbtup::lcpSaveDataPageLab(Signal* signal, Uint32 ciIndex)
+{
+ CheckpointInfoPtr ciPtr;
+ DiskBufferSegmentInfoPtr dbsiPtr;
+ FragrecordPtr regFragPtr;
+ LocalLogInfoPtr lliPtr;
+ UndoPagePtr undoCopyPagePtr;
+ PagePtr pagePtr;
+
+ ciPtr.i = ciIndex;
+ ptrCheckGuard(ciPtr, cnoOfLcpRec, checkpointInfo);
+ if (ERROR_INSERTED(4000)){
+ if (ciPtr.p->lcpTabPtr == c_errorInsert4000TableId) {
+ // Delay writing of data pages during LCP
+ ndbout << "Delay writing of data pages during LCP" << endl;
+ signal->theData[0] = ZCONT_SAVE_DP;
+ signal->theData[1] = ciIndex;
+ sendSignalWithDelay(cownref, GSN_CONTINUEB, signal, 1000, 2);
+ return;
+ }//if
+ }//if
+ if (clblPageCounter == 0) {
+ ljam();
+ signal->theData[0] = ZCONT_SAVE_DP;
+ signal->theData[1] = ciPtr.i;
+ sendSignalWithDelay(cownref, GSN_CONTINUEB, signal, 100, 2);
+ return;
+ } else {
+ ljam();
+ clblPageCounter--;
+ }//if
+
+ regFragPtr.i = ciPtr.p->lcpFragmentP;
+ ptrCheckGuard(regFragPtr, cnoOfFragrec, fragrecord);
+ dbsiPtr.i = ciPtr.p->lcpDataBufferSegmentP;
+ ptrCheckGuard(dbsiPtr, cnoOfConcurrentWriteOp, diskBufferSegmentInfo);
+
+ pagePtr.i = getRealpid(regFragPtr.p, regFragPtr.p->minPageNotWrittenInCheckpoint);
+ ptrCheckGuard(pagePtr, cnoOfPage, page);
+ ndbrequire(dbsiPtr.p->pdxNumDataPages < 16);
+ undoCopyPagePtr.i = dbsiPtr.p->pdxDataPage[dbsiPtr.p->pdxNumDataPages];
+ ptrCheckGuard(undoCopyPagePtr, cnoOfUndoPage, undoPage);
+ MEMCOPY_NO_WORDS(&undoCopyPagePtr.p->undoPageWord[0],
+ &pagePtr.p->pageWord[0],
+ ZWORDS_ON_PAGE);
+ regFragPtr.p->minPageNotWrittenInCheckpoint++;
+ dbsiPtr.p->pdxNumDataPages++;
+ if (regFragPtr.p->minPageNotWrittenInCheckpoint == regFragPtr.p->maxPageWrittenInCheckpoint) {
+ /* ---------------------------------------------------------- */
+ /* ALL PAGES ARE COPIED, TIME TO FINISH THE CHECKPOINT */
+ /* SAVE THE END POSITIONS OF THE LOG RECORDS SINCE ALL DATA */
+ /* PAGES ARE NOW SAFE ON DISK AND NO MORE LOGGING WILL APPEAR */
+ /* ---------------------------------------------------------- */
+ ljam();
+ lliPtr.i = ciPtr.p->lcpLocalLogInfoP;
+ ptrCheckGuard(lliPtr, cnoOfParallellUndoFiles, localLogInfo);
+ regFragPtr.p->checkpointVersion = RNIL; /* UNDO LOGGING IS SHUT OFF */
+ lcpWriteListDataPageSegment(signal, dbsiPtr, ciPtr, false);
+ dbsiPtr.p->pdxOperation = CHECKPOINT_DATA_WRITE_LAST;
+ } else if (dbsiPtr.p->pdxNumDataPages == ZDB_SEGMENT_SIZE) {
+ ljam();
+ lcpWriteListDataPageSegment(signal, dbsiPtr, ciPtr, false);
+ dbsiPtr.p->pdxOperation = CHECKPOINT_DATA_WRITE;
+ } else {
+ ljam();
+ signal->theData[0] = ZCONT_SAVE_DP;
+ signal->theData[1] = ciPtr.i;
+ sendSignal(cownref, GSN_CONTINUEB, signal, 2, JBB);
+ }//if
+}//Dbtup::lcpSaveDataPageLab()
+
+void Dbtup::lcpWriteListDataPageSegment(Signal* signal,
+ DiskBufferSegmentInfoPtr dbsiPtr,
+ CheckpointInfoPtr ciPtr,
+ bool flushFlag)
+{
+ Uint32 flags = 1;
+ cnoOfDataPagesToDiskWithoutSynch += dbsiPtr.p->pdxNumDataPages;
+ if ((cnoOfDataPagesToDiskWithoutSynch > MAX_PAGES_WITHOUT_SYNCH) ||
+ (flushFlag)) {
+ ljam();
+/* ---------------------------------------------------------------- */
+// To avoid synching too big chunks at a time we synch after writing
+// a certain number of data pages. (e.g. 2 MBytes).
+/* ---------------------------------------------------------------- */
+ cnoOfDataPagesToDiskWithoutSynch = 0;
+ flags |= 0x10; //Set synch flag unconditionally
+ }//if
+ signal->theData[0] = ciPtr.p->lcpDataFileHandle;
+ signal->theData[1] = cownref;
+ signal->theData[2] = dbsiPtr.i;
+ signal->theData[3] = flags;
+ signal->theData[4] = ZBASE_ADDR_UNDO_WORD;
+ signal->theData[5] = dbsiPtr.p->pdxNumDataPages;
+ signal->theData[6] = dbsiPtr.p->pdxDataPage[0];
+ signal->theData[7] = dbsiPtr.p->pdxFilePage;
+ sendSignal(NDBFS_REF, GSN_FSWRITEREQ, signal, 8, JBA);
+
+ dbsiPtr.p->pdxFilePage += dbsiPtr.p->pdxNumDataPages;
+ dbsiPtr.p->pdxNumDataPages = 0;
+}//Dbtup::lcpWriteListDataPageSegment()
+
+void Dbtup::lcpFlushLogLab(Signal* signal, CheckpointInfoPtr ciPtr)
+{
+ DiskBufferSegmentInfoPtr oldDbsiPtr;
+ LocalLogInfoPtr lliPtr;
+ UndoPagePtr oldUndoPagePtr;
+ UndoPagePtr newUndoPagePtr;
+
+ lliPtr.i = ciPtr.p->lcpLocalLogInfoP;
+ ptrCheckGuard(lliPtr, cnoOfParallellUndoFiles, localLogInfo);
+ oldDbsiPtr.i = lliPtr.p->lliUndoBufferSegmentP;
+ ptrCheckGuard(oldDbsiPtr, cnoOfConcurrentWriteOp, diskBufferSegmentInfo);
+ oldDbsiPtr.p->pdxNumDataPages++;
+ if (clblPageCounter > 0) {
+ ljam();
+ clblPageCounter--;
+ }//if
+ oldUndoPagePtr.i = lliPtr.p->lliUndoPage;
+ ptrCheckGuard(oldUndoPagePtr, cnoOfUndoPage, undoPage);
+ lcpWriteUndoSegment(signal, lliPtr.p, true);
+ oldDbsiPtr.p->pdxOperation = CHECKPOINT_UNDO_WRITE_FLUSH;
+ oldDbsiPtr.p->pdxCheckpointInfoP = ciPtr.i;
+
+/* ---------------------------------------------------------------- */
+/* SINCE LAST PAGE SENT TO DISK WAS NOT FULL YET WE COPY IT */
+/* TO THE NEW LAST PAGE. */
+/* ---------------------------------------------------------------- */
+ newUndoPagePtr.i = lliPtr.p->lliUndoPage;
+ ptrCheckGuard(newUndoPagePtr, cnoOfUndoPage, undoPage);
+ ndbrequire(lliPtr.p->lliUndoWord < ZWORDS_ON_PAGE);
+ MEMCOPY_NO_WORDS(&newUndoPagePtr.p->undoPageWord[0],
+ &oldUndoPagePtr.p->undoPageWord[0],
+ lliPtr.p->lliUndoWord);
+}//Dbtup::lcpFlushLogLab()
+
+void Dbtup::lcpFlushRestartInfoLab(Signal* signal, Uint32 ciIndex)
+{
+ CheckpointInfoPtr ciPtr;
+ DiskBufferSegmentInfoPtr dbsiPtr;
+ LocalLogInfoPtr lliPtr;
+ UndoPagePtr undoCopyPagePtr;
+
+ ciPtr.i = ciIndex;
+ ptrCheckGuard(ciPtr, cnoOfLcpRec, checkpointInfo);
+
+ lliPtr.i = ciPtr.p->lcpLocalLogInfoP;
+ ptrCheckGuard(lliPtr, cnoOfParallellUndoFiles, localLogInfo);
+ dbsiPtr.i = ciPtr.p->lcpDataBufferSegmentP;
+ ptrCheckGuard(dbsiPtr, cnoOfConcurrentWriteOp, diskBufferSegmentInfo);
+ undoCopyPagePtr.i = dbsiPtr.p->pdxDataPage[0]; /* UNDO INFO STORED AT PAGE 0 */
+ ptrCheckGuard(undoCopyPagePtr, cnoOfUndoPage, undoPage);
+ ndbrequire(ciPtr.p->lcpNoOfPages > 0);
+ undoCopyPagePtr.p->undoPageWord[ZSRI_NO_OF_FRAG_PAGES_POS] = ciPtr.p->lcpNoOfPages;
+ undoCopyPagePtr.p->undoPageWord[ZSRI_NO_COPY_PAGES_ALLOC] = ciPtr.p->lcpNoCopyPagesAlloc;
+ undoCopyPagePtr.p->undoPageWord[ZSRI_EMPTY_PRIM_PAGE] = ciPtr.p->lcpEmptyPrimPage;
+ undoCopyPagePtr.p->undoPageWord[ZSRI_TH_FREE_FIRST] = ciPtr.p->lcpThFreeFirst;
+ undoCopyPagePtr.p->undoPageWord[ZSRI_TH_FREE_COPY_FIRST] = ciPtr.p->lcpThFreeCopyFirst;
+ undoCopyPagePtr.p->undoPageWord[ZSRI_UNDO_LOG_END_REC_ID] = lliPtr.p->lliPrevRecordId;
+ undoCopyPagePtr.p->undoPageWord[ZSRI_UNDO_FILE_VER] = cundoFileVersion;
+ if (lliPtr.p->lliUndoWord == ZUNDO_PAGE_HEADER_SIZE) {
+ ljam();
+ undoCopyPagePtr.p->undoPageWord[ZSRI_UNDO_LOG_END_PAGE_ID] = lliPtr.p->lliLogFilePage - 1;
+ } else {
+ ljam();
+ undoCopyPagePtr.p->undoPageWord[ZSRI_UNDO_LOG_END_PAGE_ID] = lliPtr.p->lliLogFilePage;
+ }//if
+ dbsiPtr.p->pdxNumDataPages = 1;
+ dbsiPtr.p->pdxFilePage = 0;
+ if (clblPageCounter > 0) {
+ ljam();
+ clblPageCounter--;
+ }//if
+ lcpWriteListDataPageSegment(signal, dbsiPtr, ciPtr, true);
+ dbsiPtr.p->pdxOperation = CHECKPOINT_DATA_WRITE_FLUSH;
+ return;
+}//Dbtup::lcpFlushRestartInfoLab()
+
+void Dbtup::lcpCompletedLab(Signal* signal, Uint32 ciIndex)
+{
+ CheckpointInfoPtr ciPtr;
+ PendingFileOpenInfoPtr pfoiPtr;
+/* ---------------------------------------------------------------------- */
+/* INSERT CODE TO CLOSE DATA FILE HERE. DO THIS BEFORE SEND CONF */
+/* ---------------------------------------------------------------------- */
+ ciPtr.i = ciIndex;
+ ptrCheckGuard(ciPtr, cnoOfLcpRec, checkpointInfo);
+
+ seizePendingFileOpenInfoRecord(pfoiPtr);
+ pfoiPtr.p->pfoOpenType = LCP_DATA_FILE_CLOSE;
+ pfoiPtr.p->pfoCheckpointInfoP = ciPtr.i;
+
+ signal->theData[0] = ciPtr.p->lcpDataFileHandle;
+ signal->theData[1] = cownref;
+ signal->theData[2] = pfoiPtr.i;
+ signal->theData[3] = 0;
+ sendSignal(NDBFS_REF, GSN_FSCLOSEREQ, signal, 4, JBA);
+ return;
+}//Dbtup::lcpCompletedLab()
+
+void Dbtup::lcpClosedDataFileLab(Signal* signal, CheckpointInfoPtr ciPtr)
+{
+ signal->theData[0] = ciPtr.p->lcpUserptr;
+ sendSignal(ciPtr.p->lcpBlockref, GSN_TUP_LCPCONF, signal, 1, JBB);
+ releaseCheckpointInfoRecord(ciPtr);
+ return;
+}//Dbtup::lcpClosedDataFileLab()
+
+/* ---------------------------------------------------------------------- */
+/* LCP END IS THE LAST STEP IN THE LCP PROCESS IT WILL CLOSE THE LOGFILES */
+/* AND RELEASE THE ALLOCATED CHECKPOINT_INFO_RECORDS */
+/* ---------------------------------------------------------------------- */
+void Dbtup::execEND_LCPREQ(Signal* signal)
+{
+ DiskBufferSegmentInfoPtr dbsiPtr;
+ LocalLogInfoPtr lliPtr;
+ PendingFileOpenInfoPtr pfoiPtr;
+
+ ljamEntry();
+ clqhUserpointer = signal->theData[0];
+ clqhBlockref = signal->theData[1];
+ for (lliPtr.i = 0; lliPtr.i < 16; lliPtr.i++) {
+ ljam();
+ ptrAss(lliPtr, localLogInfo);
+ if (lliPtr.p->lliActiveLcp > 0) {
+ ljam();
+ dbsiPtr.i = lliPtr.p->lliUndoBufferSegmentP;
+ ptrCheckGuard(dbsiPtr, cnoOfConcurrentWriteOp, diskBufferSegmentInfo);
+ freeDiskBufferSegmentRecord(signal, dbsiPtr);
+
+ seizePendingFileOpenInfoRecord(pfoiPtr); /* SEIZE A NEW FILE OPEN INFO */
+ pfoiPtr.p->pfoOpenType = LCP_UNDO_FILE_CLOSE;
+ pfoiPtr.p->pfoCheckpointInfoP = lliPtr.i;
+
+ signal->theData[0] = lliPtr.p->lliUndoFileHandle;
+ signal->theData[1] = cownref;
+ signal->theData[2] = pfoiPtr.i;
+ signal->theData[3] = 0;
+ sendSignal(NDBFS_REF, GSN_FSCLOSEREQ, signal, 4, JBA);
+ lliPtr.p->lliActiveLcp = 0;
+ }//if
+ }//for
+ return;
+}//Dbtup::execEND_LCPREQ()
+
+void Dbtup::lcpEndconfLab(Signal* signal)
+{
+ LocalLogInfoPtr lliPtr;
+ for (lliPtr.i = 0; lliPtr.i < 16; lliPtr.i++) {
+ ljam();
+ ptrAss(lliPtr, localLogInfo);
+ if (lliPtr.p->lliUndoFileHandle != RNIL) {
+ ljam();
+/* ---------------------------------------------------------------------- */
+/* WAIT UNTIL ALL LOG FILES HAVE BEEN CLOSED. */
+/* ---------------------------------------------------------------------- */
+ return;
+ }//if
+ }//for
+ signal->theData[0] = clqhUserpointer;
+ sendSignal(clqhBlockref, GSN_END_LCPCONF, signal, 1, JBB);
+ return;
+}//Dbtup::lcpEndconfLab()
+