diff options
Diffstat (limited to 'storage/ndb/src/kernel/blocks/dbtup/DbtupLCP.cpp')
-rw-r--r-- | storage/ndb/src/kernel/blocks/dbtup/DbtupLCP.cpp | 596 |
1 files changed, 596 insertions, 0 deletions
diff --git a/storage/ndb/src/kernel/blocks/dbtup/DbtupLCP.cpp b/storage/ndb/src/kernel/blocks/dbtup/DbtupLCP.cpp new file mode 100644 index 00000000000..370ef4c4ba5 --- /dev/null +++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupLCP.cpp @@ -0,0 +1,596 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + + +#define DBTUP_C +#include "Dbtup.hpp" +#include <RefConvert.hpp> +#include <ndb_limits.h> +#include <pc.hpp> +#include <signaldata/FsConf.hpp> +#include <signaldata/FsRef.hpp> + +#define ljam() { jamLine(10000 + __LINE__); } +#define ljamEntry() { jamEntryLine(10000 + __LINE__); } + +/* ---------------------------------------------------------------- */ +/* ---------------------------------------------------------------- */ +/* -------------------- LOCAL CHECKPOINT MODULE ------------------- */ +/* ---------------------------------------------------------------- */ +/* ---------------------------------------------------------------- */ +void Dbtup::execTUP_PREPLCPREQ(Signal* signal) +{ + CheckpointInfoPtr ciPtr; + DiskBufferSegmentInfoPtr dbsiPtr; + FragrecordPtr regFragPtr; + LocalLogInfoPtr lliPtr; + TablerecPtr regTabPtr; + + ljamEntry(); + Uint32 userptr = signal->theData[0]; + BlockReference userblockref = signal->theData[1]; + regTabPtr.i = signal->theData[2]; + ptrCheckGuard(regTabPtr, cnoOfTablerec, tablerec); + Uint32 fragId = signal->theData[3]; + Uint32 checkpointNumber = signal->theData[4]; + cundoFileVersion = signal->theData[5]; + + getFragmentrec(regFragPtr, fragId, regTabPtr.p); + ndbrequire(regTabPtr.i != RNIL); + seizeCheckpointInfoRecord(ciPtr); + + lliPtr.i = (cundoFileVersion << 2) + (regTabPtr.i & 0x3); + ptrCheckGuard(lliPtr, cnoOfParallellUndoFiles, localLogInfo); + cnoOfDataPagesToDiskWithoutSynch = 0; + + ciPtr.p->lcpDataFileHandle = RNIL; + ciPtr.p->lcpCheckpointVersion = checkpointNumber; + ciPtr.p->lcpLocalLogInfoP = lliPtr.i; + ciPtr.p->lcpFragmentP = regFragPtr.i; /* SET THE FRAGMENT */ + ciPtr.p->lcpFragmentId = fragId; /* SAVE THE FRAGMENT IDENTITY */ + ciPtr.p->lcpTabPtr = regTabPtr.i; /* SET THE TABLE POINTER */ + ciPtr.p->lcpBlockref = userblockref; /* SET THE BLOCK REFERENCE */ + ciPtr.p->lcpUserptr = userptr; /* SET THE USERPOINTER */ + + /***************************************************************/ + /* OPEN THE UNDO FILE FOR WRITE */ + /* UPON FSOPENCONF */ + /***************************************************************/ + if (lliPtr.p->lliActiveLcp == 0) { /* IS THE UNDO LOG FILE OPEN? */ + PendingFileOpenInfoPtr undoPfoiPtr; + UndoPagePtr regUndoPagePtr; + + ljam(); + lliPtr.p->lliPrevRecordId = 0; + lliPtr.p->lliLogFilePage = 0; + lliPtr.p->lliUndoPagesToDiskWithoutSynch = 0; + lliPtr.p->lliUndoWord = ZUNDO_PAGE_HEADER_SIZE; + + seizeUndoBufferSegment(signal, regUndoPagePtr); + seizeDiskBufferSegmentRecord(dbsiPtr); + dbsiPtr.p->pdxBuffertype = UNDO_PAGES; + for (Uint32 i = 0; i < ZUB_SEGMENT_SIZE; i++) { + dbsiPtr.p->pdxDataPage[i] = regUndoPagePtr.i + i; + }//for + dbsiPtr.p->pdxFilePage = lliPtr.p->lliLogFilePage; + lliPtr.p->lliUndoPage = regUndoPagePtr.i; + lliPtr.p->lliUndoBufferSegmentP = dbsiPtr.i; + /* F LEVEL NOT USED */ + Uint32 fileType = 1; /* VERSION */ + fileType = (fileType << 8) | 2; /* .LOCLOG */ + fileType = (fileType << 8) | 6; /* D6 */ + fileType = (fileType << 8) | 0xff; /* DON'T USE P DIRECTORY LEVEL */ + Uint32 fileFlag = 0x301; /* CREATE, WRITE ONLY, TRUNCATE */ + + seizePendingFileOpenInfoRecord(undoPfoiPtr); + undoPfoiPtr.p->pfoOpenType = LCP_UNDO_FILE_WRITE; + undoPfoiPtr.p->pfoCheckpointInfoP = ciPtr.i; + + signal->theData[0] = cownref; + signal->theData[1] = undoPfoiPtr.i; + signal->theData[2] = lliPtr.i; + signal->theData[3] = 0xFFFFFFFF; + signal->theData[4] = cundoFileVersion; + signal->theData[5] = fileType; + signal->theData[6] = fileFlag; + sendSignal(NDBFS_REF, GSN_FSOPENREQ, signal, 7, JBA); + }//if + /***************************************************************/ + /* OPEN THE DATA FILE FOR WRITE */ + /* THE FILE HANDLE WILL BE SET IN THE CHECKPOINT_INFO_RECORD */ + /* UPON FSOPENCONF */ + /***************************************************************/ + /* OPEN THE DATA FILE IN THE FOLLOWING FORM */ + /* D5/DBTUP/T<TABID>/F<FRAGID>/S<CHECKPOINT_NUMBER>.DATA */ + + PendingFileOpenInfoPtr dataPfoiPtr; + + Uint32 fileType = 1; /* VERSION */ + fileType = (fileType << 8) | 0; /* .DATA */ + fileType = (fileType << 8) | 5; /* D5 */ + fileType = (fileType << 8) | 0xff; /* DON'T USE P DIRECTORY LEVEL */ + Uint32 fileFlag = 0x301; /* CREATE, WRITE ONLY, TRUNCATE */ + + seizePendingFileOpenInfoRecord(dataPfoiPtr); /* SEIZE A NEW FILE OPEN INFO */ + if (lliPtr.p->lliActiveLcp == 0) { + ljam(); + dataPfoiPtr.p->pfoOpenType = LCP_DATA_FILE_WRITE_WITH_UNDO; + } else { + ljam(); + dataPfoiPtr.p->pfoOpenType = LCP_DATA_FILE_WRITE; + }//if + dataPfoiPtr.p->pfoCheckpointInfoP = ciPtr.i; + + /* LET'S OPEN THE DATA FILE FOR WRITE */ + /* INCREASE NUMBER OF ACTIVE CHECKPOINTS */ + lliPtr.p->lliActiveLcp = 1; + signal->theData[0] = cownref; + signal->theData[1] = dataPfoiPtr.i; + signal->theData[2] = ciPtr.p->lcpTabPtr; + signal->theData[3] = ciPtr.p->lcpFragmentId; + signal->theData[4] = ciPtr.p->lcpCheckpointVersion; + signal->theData[5] = fileType; + signal->theData[6] = fileFlag; + sendSignal(NDBFS_REF, GSN_FSOPENREQ, signal, 7, JBA); + return; +}//Dbtup::execTUP_PREPLCPREQ() + +/* ---------------------------------------------------------------- */ +/* ------------------------ START CHECKPOINT --------------------- */ +/* ---------------------------------------------------------------- */ +void Dbtup::execTUP_LCPREQ(Signal* signal) +{ + CheckpointInfoPtr ciPtr; + DiskBufferSegmentInfoPtr dbsiPtr; + FragrecordPtr regFragPtr; + LocalLogInfoPtr lliPtr; + + ljamEntry(); +// Uint32 userptr = signal->theData[0]; +// BlockReference userblockref = signal->theData[1]; + ciPtr.i = signal->theData[2]; + + ptrCheckGuard(ciPtr, cnoOfLcpRec, checkpointInfo); + regFragPtr.i = ciPtr.p->lcpFragmentP; + ptrCheckGuard(regFragPtr, cnoOfFragrec, fragrecord); + +/* ---------------------------------------------------------------- */ +/* ASSIGNING A VALUE DIFFERENT FROM RNIL TO CHECKPOINT VERSION*/ +/* TRIGGERS THAT UNDO LOGGING WILL START FOR THIS FRAGMENT. */ +/* WE ASSIGN IT THE POINTER TO THE CHECKPOINT RECORD FOR */ +/* OPTIMISATION OF THE WRITING OF THE UNDO LOG. */ +/* ---------------------------------------------------------------- */ + regFragPtr.p->checkpointVersion = ciPtr.p->lcpLocalLogInfoP; /* MARK START OF UNDO LOGGING */ + + regFragPtr.p->maxPageWrittenInCheckpoint = getNoOfPages(regFragPtr.p); + regFragPtr.p->minPageNotWrittenInCheckpoint = 0; + ndbrequire(getNoOfPages(regFragPtr.p) > 0); + allocDataBufferSegment(signal, dbsiPtr); + + dbsiPtr.p->pdxNumDataPages = 0; + dbsiPtr.p->pdxFilePage = 1; + ciPtr.p->lcpDataBufferSegmentP = dbsiPtr.i; + dbsiPtr.p->pdxCheckpointInfoP = ciPtr.i; + ciPtr.p->lcpNoOfPages = getNoOfPages(regFragPtr.p); + ciPtr.p->lcpNoCopyPagesAlloc = regFragPtr.p->noCopyPagesAlloc; + ciPtr.p->lcpEmptyPrimPage = regFragPtr.p->emptyPrimPage; + ciPtr.p->lcpThFreeFirst = regFragPtr.p->thFreeFirst; + ciPtr.p->lcpThFreeCopyFirst = regFragPtr.p->thFreeCopyFirst; + lliPtr.i = ciPtr.p->lcpLocalLogInfoP; + ptrCheckGuard(lliPtr, cnoOfParallellUndoFiles, localLogInfo); +/* ---------------------------------------------------------------- */ +/* --- PERFORM A COPY OF THE TABLE DESCRIPTOR FOR THIS FRAGMENT --- */ +/* ---------------------------------------------------------------- */ + cprAddLogHeader(signal, + lliPtr.p, + ZTABLE_DESCRIPTOR, + ciPtr.p->lcpTabPtr, + ciPtr.p->lcpFragmentId); + +/* ---------------------------------------------------------------- */ +/* CONTINUE WITH SAVING ACTIVE OPERATIONS AFTER A REAL-TIME */ +/* BREAK. */ +/* ---------------------------------------------------------------- */ + ciPtr.p->lcpTmpOperPtr = regFragPtr.p->firstusedOprec; + lcpSaveCopyListLab(signal, ciPtr); + return; +}//Dbtup::execTUP_LCPREQ() + +void Dbtup::allocDataBufferSegment(Signal* signal, DiskBufferSegmentInfoPtr& dbsiPtr) +{ + UndoPagePtr regUndoPagePtr; + + seizeDiskBufferSegmentRecord(dbsiPtr); + dbsiPtr.p->pdxBuffertype = COMMON_AREA_PAGES; + ndbrequire(cfirstfreeUndoSeg != RNIL); + if (cnoFreeUndoSeg == ZMIN_PAGE_LIMIT_TUP_COMMITREQ) { + EXECUTE_DIRECT(DBLQH, GSN_TUP_COM_BLOCK, signal, 1); + ljamEntry(); + }//if + cnoFreeUndoSeg--; + ndbrequire(cnoFreeUndoSeg >= 0); + + regUndoPagePtr.i = cfirstfreeUndoSeg; + ptrCheckGuard(regUndoPagePtr, cnoOfUndoPage, undoPage); + cfirstfreeUndoSeg = regUndoPagePtr.p->undoPageWord[ZPAGE_NEXT_POS]; + regUndoPagePtr.p->undoPageWord[ZPAGE_NEXT_POS] = RNIL; + for (Uint32 i = 0; i < ZUB_SEGMENT_SIZE; i++) { + dbsiPtr.p->pdxDataPage[i] = regUndoPagePtr.i + i; + }//for +}//Dbtup::allocDataBufferSegment() + +/* ---------------------------------------------------------------- */ +/* --- PERFORM A COPY OF THE ACTIVE OPERATIONS FOR THIS FRAGMENT -- */ +/* ---------------------------------------------------------------- */ +void Dbtup::lcpSaveCopyListLab(Signal* signal, CheckpointInfoPtr ciPtr) +{ + FragrecordPtr regFragPtr; + LocalLogInfoPtr lliPtr; + OperationrecPtr regOpPtr; + + regFragPtr.i = ciPtr.p->lcpFragmentP; + ptrCheckGuard(regFragPtr, cnoOfFragrec, fragrecord); + lliPtr.i = ciPtr.p->lcpLocalLogInfoP; + ptrCheckGuard(lliPtr, cnoOfParallellUndoFiles, localLogInfo); + regOpPtr.i = ciPtr.p->lcpTmpOperPtr; + +/* -------------------------------------------------------------------------------- */ +/* TRAVERSE THE ENTIRE BLOCK OF OPERATIONS. CHECK IF THERE ARE EXISTING COPYS OF */ +/* TUPLES IN THE CHECKPOINTED FRAGMENT. SAVE THOSE IN A LIST IN THE FOLLOWING FORM: */ +/* */ +/* SOURCE PAGE */ +/* SOURCE INDEX */ +/* COPY PAGE */ +/* COPY INDEX */ +/* -------------------------------------------------------------------------------- */ + Uint32 loopCount = 0; + while ((regOpPtr.i != RNIL) && (loopCount < 50)) { + ljam(); + ptrCheckGuard(regOpPtr, cnoOfOprec, operationrec); + if (regOpPtr.p->realPageId != RNIL) { +/* ---------------------------------------------------------------- */ +// We ensure that we have actually allocated the tuple header and +// also found it. Otherwise we will fill the undo log with garbage. +/* ---------------------------------------------------------------- */ + if (regOpPtr.p->optype == ZUPDATE || + (regOpPtr.p->optype == ZINSERT && regOpPtr.p->deleteInsertFlag)) { + ljam(); + if (regOpPtr.p->realPageIdC != RNIL) { +/* ---------------------------------------------------------------- */ +// We ensure that we have actually allocated the tuple header copy. +// Otherwise we will fill the undo log with garbage. +/* ---------------------------------------------------------------- */ + cprAddLogHeader(signal, + lliPtr.p, + ZLCPR_ABORT_UPDATE, + ciPtr.p->lcpTabPtr, + ciPtr.p->lcpFragmentId); + cprAddAbortUpdate(signal, lliPtr.p, regOpPtr.p); + }//if + } else if (regOpPtr.p->optype == ZINSERT) { + ljam(); + cprAddUndoLogRecord(signal, + ZLCPR_ABORT_INSERT, + regOpPtr.p->fragPageId, + regOpPtr.p->pageIndex, + regOpPtr.p->tableRef, + regOpPtr.p->fragId, + regFragPtr.p->checkpointVersion); + } else { + ndbrequire(regOpPtr.p->optype == ZDELETE); + ljam(); + cprAddUndoLogRecord(signal, + ZINDICATE_NO_OP_ACTIVE, + regOpPtr.p->fragPageId, + regOpPtr.p->pageIndex, + regOpPtr.p->tableRef, + regOpPtr.p->fragId, + regFragPtr.p->checkpointVersion); + }//if + }//if + loopCount++;; + regOpPtr.i = regOpPtr.p->nextOprecInList; + }//while + if (regOpPtr.i == RNIL) { + ljam(); + + signal->theData[0] = ciPtr.p->lcpUserptr; + sendSignal(ciPtr.p->lcpBlockref, GSN_TUP_LCPSTARTED, signal, 1, JBA); + + signal->theData[0] = ZCONT_SAVE_DP; + signal->theData[1] = ciPtr.i; + sendSignal(cownref, GSN_CONTINUEB, signal, 2, JBB); + } else { + ljam(); + ciPtr.p->lcpTmpOperPtr = regOpPtr.i; + signal->theData[0] = ZCONT_START_SAVE_CL; + signal->theData[1] = ciPtr.i; + sendSignal(cownref, GSN_CONTINUEB, signal, 2, JBB); + }//if +}//Dbtup::lcpSaveCopyListLab() + +/* ---------------------------------------------------------------- */ +/* ------- PERFORM A COPY OF ONE DATAPAGE DURING CHECKPOINT ------- */ +/* ---------------------------------------------------------------- */ +/* THE RANGE OF DATA PAGES IS INCLUDED IN THE CHECKPOINT_INFO_PTR */ +/* LAST_PAGE_TO_BUFFER ELEMENT IS INCREASED UNTIL ALL PAGES ARE */ +/* COPIED TO THE DISK BUFFER. WHEN A DISK BUFFER SEGMENT IS FULL */ +/* IT WILL BE WRITTEN TO DISK (TYPICALLY EACH 8:TH PAGE) */ +/* ---------------------------------------------------------------- */ +void Dbtup::lcpSaveDataPageLab(Signal* signal, Uint32 ciIndex) +{ + CheckpointInfoPtr ciPtr; + DiskBufferSegmentInfoPtr dbsiPtr; + FragrecordPtr regFragPtr; + LocalLogInfoPtr lliPtr; + UndoPagePtr undoCopyPagePtr; + PagePtr pagePtr; + + ciPtr.i = ciIndex; + ptrCheckGuard(ciPtr, cnoOfLcpRec, checkpointInfo); + if (ERROR_INSERTED(4000)){ + if (ciPtr.p->lcpTabPtr == c_errorInsert4000TableId) { + // Delay writing of data pages during LCP + ndbout << "Delay writing of data pages during LCP" << endl; + signal->theData[0] = ZCONT_SAVE_DP; + signal->theData[1] = ciIndex; + sendSignalWithDelay(cownref, GSN_CONTINUEB, signal, 1000, 2); + return; + }//if + }//if + if (clblPageCounter == 0) { + ljam(); + signal->theData[0] = ZCONT_SAVE_DP; + signal->theData[1] = ciPtr.i; + sendSignalWithDelay(cownref, GSN_CONTINUEB, signal, 100, 2); + return; + } else { + ljam(); + clblPageCounter--; + }//if + + regFragPtr.i = ciPtr.p->lcpFragmentP; + ptrCheckGuard(regFragPtr, cnoOfFragrec, fragrecord); + dbsiPtr.i = ciPtr.p->lcpDataBufferSegmentP; + ptrCheckGuard(dbsiPtr, cnoOfConcurrentWriteOp, diskBufferSegmentInfo); + + pagePtr.i = getRealpid(regFragPtr.p, regFragPtr.p->minPageNotWrittenInCheckpoint); + ptrCheckGuard(pagePtr, cnoOfPage, page); + ndbrequire(dbsiPtr.p->pdxNumDataPages < 16); + undoCopyPagePtr.i = dbsiPtr.p->pdxDataPage[dbsiPtr.p->pdxNumDataPages]; + ptrCheckGuard(undoCopyPagePtr, cnoOfUndoPage, undoPage); + MEMCOPY_NO_WORDS(&undoCopyPagePtr.p->undoPageWord[0], + &pagePtr.p->pageWord[0], + ZWORDS_ON_PAGE); + regFragPtr.p->minPageNotWrittenInCheckpoint++; + dbsiPtr.p->pdxNumDataPages++; + if (regFragPtr.p->minPageNotWrittenInCheckpoint == regFragPtr.p->maxPageWrittenInCheckpoint) { + /* ---------------------------------------------------------- */ + /* ALL PAGES ARE COPIED, TIME TO FINISH THE CHECKPOINT */ + /* SAVE THE END POSITIONS OF THE LOG RECORDS SINCE ALL DATA */ + /* PAGES ARE NOW SAFE ON DISK AND NO MORE LOGGING WILL APPEAR */ + /* ---------------------------------------------------------- */ + ljam(); + lliPtr.i = ciPtr.p->lcpLocalLogInfoP; + ptrCheckGuard(lliPtr, cnoOfParallellUndoFiles, localLogInfo); + regFragPtr.p->checkpointVersion = RNIL; /* UNDO LOGGING IS SHUT OFF */ + lcpWriteListDataPageSegment(signal, dbsiPtr, ciPtr, false); + dbsiPtr.p->pdxOperation = CHECKPOINT_DATA_WRITE_LAST; + } else if (dbsiPtr.p->pdxNumDataPages == ZDB_SEGMENT_SIZE) { + ljam(); + lcpWriteListDataPageSegment(signal, dbsiPtr, ciPtr, false); + dbsiPtr.p->pdxOperation = CHECKPOINT_DATA_WRITE; + } else { + ljam(); + signal->theData[0] = ZCONT_SAVE_DP; + signal->theData[1] = ciPtr.i; + sendSignal(cownref, GSN_CONTINUEB, signal, 2, JBB); + }//if +}//Dbtup::lcpSaveDataPageLab() + +void Dbtup::lcpWriteListDataPageSegment(Signal* signal, + DiskBufferSegmentInfoPtr dbsiPtr, + CheckpointInfoPtr ciPtr, + bool flushFlag) +{ + Uint32 flags = 1; + cnoOfDataPagesToDiskWithoutSynch += dbsiPtr.p->pdxNumDataPages; + if ((cnoOfDataPagesToDiskWithoutSynch > MAX_PAGES_WITHOUT_SYNCH) || + (flushFlag)) { + ljam(); +/* ---------------------------------------------------------------- */ +// To avoid synching too big chunks at a time we synch after writing +// a certain number of data pages. (e.g. 2 MBytes). +/* ---------------------------------------------------------------- */ + cnoOfDataPagesToDiskWithoutSynch = 0; + flags |= 0x10; //Set synch flag unconditionally + }//if + signal->theData[0] = ciPtr.p->lcpDataFileHandle; + signal->theData[1] = cownref; + signal->theData[2] = dbsiPtr.i; + signal->theData[3] = flags; + signal->theData[4] = ZBASE_ADDR_UNDO_WORD; + signal->theData[5] = dbsiPtr.p->pdxNumDataPages; + signal->theData[6] = dbsiPtr.p->pdxDataPage[0]; + signal->theData[7] = dbsiPtr.p->pdxFilePage; + sendSignal(NDBFS_REF, GSN_FSWRITEREQ, signal, 8, JBA); + + dbsiPtr.p->pdxFilePage += dbsiPtr.p->pdxNumDataPages; + dbsiPtr.p->pdxNumDataPages = 0; +}//Dbtup::lcpWriteListDataPageSegment() + +void Dbtup::lcpFlushLogLab(Signal* signal, CheckpointInfoPtr ciPtr) +{ + DiskBufferSegmentInfoPtr oldDbsiPtr; + LocalLogInfoPtr lliPtr; + UndoPagePtr oldUndoPagePtr; + UndoPagePtr newUndoPagePtr; + + lliPtr.i = ciPtr.p->lcpLocalLogInfoP; + ptrCheckGuard(lliPtr, cnoOfParallellUndoFiles, localLogInfo); + oldDbsiPtr.i = lliPtr.p->lliUndoBufferSegmentP; + ptrCheckGuard(oldDbsiPtr, cnoOfConcurrentWriteOp, diskBufferSegmentInfo); + oldDbsiPtr.p->pdxNumDataPages++; + if (clblPageCounter > 0) { + ljam(); + clblPageCounter--; + }//if + oldUndoPagePtr.i = lliPtr.p->lliUndoPage; + ptrCheckGuard(oldUndoPagePtr, cnoOfUndoPage, undoPage); + lcpWriteUndoSegment(signal, lliPtr.p, true); + oldDbsiPtr.p->pdxOperation = CHECKPOINT_UNDO_WRITE_FLUSH; + oldDbsiPtr.p->pdxCheckpointInfoP = ciPtr.i; + +/* ---------------------------------------------------------------- */ +/* SINCE LAST PAGE SENT TO DISK WAS NOT FULL YET WE COPY IT */ +/* TO THE NEW LAST PAGE. */ +/* ---------------------------------------------------------------- */ + newUndoPagePtr.i = lliPtr.p->lliUndoPage; + ptrCheckGuard(newUndoPagePtr, cnoOfUndoPage, undoPage); + ndbrequire(lliPtr.p->lliUndoWord < ZWORDS_ON_PAGE); + MEMCOPY_NO_WORDS(&newUndoPagePtr.p->undoPageWord[0], + &oldUndoPagePtr.p->undoPageWord[0], + lliPtr.p->lliUndoWord); +}//Dbtup::lcpFlushLogLab() + +void Dbtup::lcpFlushRestartInfoLab(Signal* signal, Uint32 ciIndex) +{ + CheckpointInfoPtr ciPtr; + DiskBufferSegmentInfoPtr dbsiPtr; + LocalLogInfoPtr lliPtr; + UndoPagePtr undoCopyPagePtr; + + ciPtr.i = ciIndex; + ptrCheckGuard(ciPtr, cnoOfLcpRec, checkpointInfo); + + lliPtr.i = ciPtr.p->lcpLocalLogInfoP; + ptrCheckGuard(lliPtr, cnoOfParallellUndoFiles, localLogInfo); + dbsiPtr.i = ciPtr.p->lcpDataBufferSegmentP; + ptrCheckGuard(dbsiPtr, cnoOfConcurrentWriteOp, diskBufferSegmentInfo); + undoCopyPagePtr.i = dbsiPtr.p->pdxDataPage[0]; /* UNDO INFO STORED AT PAGE 0 */ + ptrCheckGuard(undoCopyPagePtr, cnoOfUndoPage, undoPage); + ndbrequire(ciPtr.p->lcpNoOfPages > 0); + undoCopyPagePtr.p->undoPageWord[ZSRI_NO_OF_FRAG_PAGES_POS] = ciPtr.p->lcpNoOfPages; + undoCopyPagePtr.p->undoPageWord[ZSRI_NO_COPY_PAGES_ALLOC] = ciPtr.p->lcpNoCopyPagesAlloc; + undoCopyPagePtr.p->undoPageWord[ZSRI_EMPTY_PRIM_PAGE] = ciPtr.p->lcpEmptyPrimPage; + undoCopyPagePtr.p->undoPageWord[ZSRI_TH_FREE_FIRST] = ciPtr.p->lcpThFreeFirst; + undoCopyPagePtr.p->undoPageWord[ZSRI_TH_FREE_COPY_FIRST] = ciPtr.p->lcpThFreeCopyFirst; + undoCopyPagePtr.p->undoPageWord[ZSRI_UNDO_LOG_END_REC_ID] = lliPtr.p->lliPrevRecordId; + undoCopyPagePtr.p->undoPageWord[ZSRI_UNDO_FILE_VER] = cundoFileVersion; + if (lliPtr.p->lliUndoWord == ZUNDO_PAGE_HEADER_SIZE) { + ljam(); + undoCopyPagePtr.p->undoPageWord[ZSRI_UNDO_LOG_END_PAGE_ID] = lliPtr.p->lliLogFilePage - 1; + } else { + ljam(); + undoCopyPagePtr.p->undoPageWord[ZSRI_UNDO_LOG_END_PAGE_ID] = lliPtr.p->lliLogFilePage; + }//if + dbsiPtr.p->pdxNumDataPages = 1; + dbsiPtr.p->pdxFilePage = 0; + if (clblPageCounter > 0) { + ljam(); + clblPageCounter--; + }//if + lcpWriteListDataPageSegment(signal, dbsiPtr, ciPtr, true); + dbsiPtr.p->pdxOperation = CHECKPOINT_DATA_WRITE_FLUSH; + return; +}//Dbtup::lcpFlushRestartInfoLab() + +void Dbtup::lcpCompletedLab(Signal* signal, Uint32 ciIndex) +{ + CheckpointInfoPtr ciPtr; + PendingFileOpenInfoPtr pfoiPtr; +/* ---------------------------------------------------------------------- */ +/* INSERT CODE TO CLOSE DATA FILE HERE. DO THIS BEFORE SEND CONF */ +/* ---------------------------------------------------------------------- */ + ciPtr.i = ciIndex; + ptrCheckGuard(ciPtr, cnoOfLcpRec, checkpointInfo); + + seizePendingFileOpenInfoRecord(pfoiPtr); + pfoiPtr.p->pfoOpenType = LCP_DATA_FILE_CLOSE; + pfoiPtr.p->pfoCheckpointInfoP = ciPtr.i; + + signal->theData[0] = ciPtr.p->lcpDataFileHandle; + signal->theData[1] = cownref; + signal->theData[2] = pfoiPtr.i; + signal->theData[3] = 0; + sendSignal(NDBFS_REF, GSN_FSCLOSEREQ, signal, 4, JBA); + return; +}//Dbtup::lcpCompletedLab() + +void Dbtup::lcpClosedDataFileLab(Signal* signal, CheckpointInfoPtr ciPtr) +{ + signal->theData[0] = ciPtr.p->lcpUserptr; + sendSignal(ciPtr.p->lcpBlockref, GSN_TUP_LCPCONF, signal, 1, JBB); + releaseCheckpointInfoRecord(ciPtr); + return; +}//Dbtup::lcpClosedDataFileLab() + +/* ---------------------------------------------------------------------- */ +/* LCP END IS THE LAST STEP IN THE LCP PROCESS IT WILL CLOSE THE LOGFILES */ +/* AND RELEASE THE ALLOCATED CHECKPOINT_INFO_RECORDS */ +/* ---------------------------------------------------------------------- */ +void Dbtup::execEND_LCPREQ(Signal* signal) +{ + DiskBufferSegmentInfoPtr dbsiPtr; + LocalLogInfoPtr lliPtr; + PendingFileOpenInfoPtr pfoiPtr; + + ljamEntry(); + clqhUserpointer = signal->theData[0]; + clqhBlockref = signal->theData[1]; + for (lliPtr.i = 0; lliPtr.i < 16; lliPtr.i++) { + ljam(); + ptrAss(lliPtr, localLogInfo); + if (lliPtr.p->lliActiveLcp > 0) { + ljam(); + dbsiPtr.i = lliPtr.p->lliUndoBufferSegmentP; + ptrCheckGuard(dbsiPtr, cnoOfConcurrentWriteOp, diskBufferSegmentInfo); + freeDiskBufferSegmentRecord(signal, dbsiPtr); + + seizePendingFileOpenInfoRecord(pfoiPtr); /* SEIZE A NEW FILE OPEN INFO */ + pfoiPtr.p->pfoOpenType = LCP_UNDO_FILE_CLOSE; + pfoiPtr.p->pfoCheckpointInfoP = lliPtr.i; + + signal->theData[0] = lliPtr.p->lliUndoFileHandle; + signal->theData[1] = cownref; + signal->theData[2] = pfoiPtr.i; + signal->theData[3] = 0; + sendSignal(NDBFS_REF, GSN_FSCLOSEREQ, signal, 4, JBA); + lliPtr.p->lliActiveLcp = 0; + }//if + }//for + return; +}//Dbtup::execEND_LCPREQ() + +void Dbtup::lcpEndconfLab(Signal* signal) +{ + LocalLogInfoPtr lliPtr; + for (lliPtr.i = 0; lliPtr.i < 16; lliPtr.i++) { + ljam(); + ptrAss(lliPtr, localLogInfo); + if (lliPtr.p->lliUndoFileHandle != RNIL) { + ljam(); +/* ---------------------------------------------------------------------- */ +/* WAIT UNTIL ALL LOG FILES HAVE BEEN CLOSED. */ +/* ---------------------------------------------------------------------- */ + return; + }//if + }//for + signal->theData[0] = clqhUserpointer; + sendSignal(clqhBlockref, GSN_END_LCPCONF, signal, 1, JBB); + return; +}//Dbtup::lcpEndconfLab() + |