diff options
Diffstat (limited to 'storage/ndb/src/kernel/blocks/dbtup/DbtupUndoLog.cpp')
-rw-r--r-- | storage/ndb/src/kernel/blocks/dbtup/DbtupUndoLog.cpp | 284 |
1 files changed, 284 insertions, 0 deletions
diff --git a/storage/ndb/src/kernel/blocks/dbtup/DbtupUndoLog.cpp b/storage/ndb/src/kernel/blocks/dbtup/DbtupUndoLog.cpp new file mode 100644 index 00000000000..869f399583f --- /dev/null +++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupUndoLog.cpp @@ -0,0 +1,284 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + + +#define DBTUP_C +#include "Dbtup.hpp" +#include <RefConvert.hpp> +#include <ndb_limits.h> +#include <pc.hpp> + +#define ljam() { jamLine(12000 + __LINE__); } +#define ljamEntry() { jamEntryLine(12000 + __LINE__); } + +void Dbtup::cprAddData(Signal* signal, + Fragrecord* const regFragPtr, + Uint32 pageIndex, + Uint32 noOfWords, + Uint32 startOffset) +{ + UndoPagePtr undoPagePtr; + PagePtr pagePtr; + LocalLogInfoPtr regLliPtr; + + regLliPtr.i = regFragPtr->checkpointVersion; + ptrCheckGuard(regLliPtr, cnoOfParallellUndoFiles, localLogInfo); + + pagePtr.i = pageIndex; + ptrCheckGuard(pagePtr, cnoOfPage, page); + undoPagePtr.i = regLliPtr.p->lliUndoPage; + ptrCheckGuard(undoPagePtr, cnoOfUndoPage, undoPage); + + startOffset++; + noOfWords--; + if ((regLliPtr.p->lliUndoWord + noOfWords) < ZWORDS_ON_PAGE) { + ljam(); + MEMCOPY_NO_WORDS(&undoPagePtr.p->undoPageWord[regLliPtr.p->lliUndoWord], + &pagePtr.p->pageWord[startOffset], + noOfWords); + regLliPtr.p->lliUndoWord += noOfWords; + } else { + for (Uint32 i = 0; i < noOfWords; i++) { + ljam(); + Uint32 undoWord = pagePtr.p->pageWord[startOffset + i]; + cprAddUndoLogWord(signal, regLliPtr.p, undoWord); + }//for + }//if +}//Dbtup::cprAddData() + +void Dbtup::cprAddLogHeader(Signal* signal, + LocalLogInfo* const lliPtr, + Uint32 recordType, + Uint32 tableId, + Uint32 fragId) +{ + Uint32 prevRecId = lliPtr->lliPrevRecordId; + lliPtr->lliPrevRecordId = lliPtr->lliUndoWord + (lliPtr->lliLogFilePage << ZUNDO_RECORD_ID_PAGE_INDEX); + cprAddUndoLogWord(signal, lliPtr, recordType); + cprAddUndoLogWord(signal, lliPtr, prevRecId); + cprAddUndoLogWord(signal, lliPtr, tableId); + cprAddUndoLogWord(signal, lliPtr, fragId); +}//Dbtup::cprAddLogHeader() + +void Dbtup::cprAddGCIUpdate(Signal* signal, + Uint32 prevGCI, + Fragrecord* const regFragPtr) +{ + LocalLogInfoPtr regLliPtr; + regLliPtr.i = regFragPtr->checkpointVersion; + ptrCheckGuard(regLliPtr, cnoOfParallellUndoFiles, localLogInfo); + + cprAddUndoLogWord(signal, regLliPtr.p, prevGCI); +}//Dbtup::cprAddLogHeader() + +void Dbtup::cprAddUndoLogPageHeader(Signal* signal, + Page* const regPagePtr, + Fragrecord* const regFragPtr) +{ + UndoPagePtr regUndoPagePtr; + LocalLogInfoPtr regLliPtr; + + regLliPtr.i = regFragPtr->checkpointVersion; + ptrCheckGuard(regLliPtr, cnoOfParallellUndoFiles, localLogInfo); + + Uint32 prevRecId = regLliPtr.p->lliPrevRecordId; + Uint32 lliWord = regLliPtr.p->lliUndoWord; + regLliPtr.p->lliPrevRecordId = lliWord + + (regLliPtr.p->lliLogFilePage << ZUNDO_RECORD_ID_PAGE_INDEX); + if ((lliWord + 7) < ZWORDS_ON_PAGE) { + ljam(); + regUndoPagePtr.i = regLliPtr.p->lliUndoPage; + ptrCheckGuard(regUndoPagePtr, cnoOfUndoPage, undoPage); + + regUndoPagePtr.p->undoPageWord[lliWord] = ZLCPR_UNDO_LOG_PAGE_HEADER; + regUndoPagePtr.p->undoPageWord[lliWord + 1] = prevRecId; + regUndoPagePtr.p->undoPageWord[lliWord + 2] = regFragPtr->fragTableId; + regUndoPagePtr.p->undoPageWord[lliWord + 3] = regFragPtr->fragmentId; + regUndoPagePtr.p->undoPageWord[lliWord + 4] = regPagePtr->pageWord[ZPAGE_FRAG_PAGE_ID_POS]; + regUndoPagePtr.p->undoPageWord[lliWord + 5] = regPagePtr->pageWord[ZPAGE_STATE_POS]; + regUndoPagePtr.p->undoPageWord[lliWord + 6] = regPagePtr->pageWord[ZPAGE_NEXT_POS]; + regLliPtr.p->lliUndoWord = lliWord + 7; + } else { + ljam(); + cprAddUndoLogWord(signal, regLliPtr.p, ZLCPR_UNDO_LOG_PAGE_HEADER); + cprAddUndoLogWord(signal, regLliPtr.p, prevRecId); + cprAddUndoLogWord(signal, regLliPtr.p, regFragPtr->fragTableId); + cprAddUndoLogWord(signal, regLliPtr.p, regFragPtr->fragmentId); + cprAddUndoLogWord(signal, regLliPtr.p, regPagePtr->pageWord[ZPAGE_FRAG_PAGE_ID_POS]); + cprAddUndoLogWord(signal, regLliPtr.p, regPagePtr->pageWord[ZPAGE_STATE_POS]); + cprAddUndoLogWord(signal, regLliPtr.p, regPagePtr->pageWord[ZPAGE_NEXT_POS]); + }//if +}//Dbtup::cprAddUndoLogPageHeader() + +void Dbtup::cprAddUndoLogRecord(Signal* signal, + Uint32 recordType, + Uint32 pageId, + Uint32 pageIndex, + Uint32 tableId, + Uint32 fragId, + Uint32 localLogIndex) +{ + LocalLogInfoPtr regLliPtr; + UndoPagePtr regUndoPagePtr; + + regLliPtr.i = localLogIndex; + ptrCheckGuard(regLliPtr, cnoOfParallellUndoFiles, localLogInfo); + + Uint32 prevRecId = regLliPtr.p->lliPrevRecordId; + Uint32 lliWord = regLliPtr.p->lliUndoWord; + + regLliPtr.p->lliPrevRecordId = lliWord + + (regLliPtr.p->lliLogFilePage << ZUNDO_RECORD_ID_PAGE_INDEX); + if ((lliWord + 6) < ZWORDS_ON_PAGE) { + ljam(); + regUndoPagePtr.i = regLliPtr.p->lliUndoPage; + ptrCheckGuard(regUndoPagePtr, cnoOfUndoPage, undoPage); + regUndoPagePtr.p->undoPageWord[lliWord] = recordType; + regUndoPagePtr.p->undoPageWord[lliWord + 1] = prevRecId; + regUndoPagePtr.p->undoPageWord[lliWord + 2] = tableId; + regUndoPagePtr.p->undoPageWord[lliWord + 3] = fragId; + regUndoPagePtr.p->undoPageWord[lliWord + 4] = pageId; + regUndoPagePtr.p->undoPageWord[lliWord + 5] = pageIndex; + + regLliPtr.p->lliUndoWord = lliWord + 6; + } else { + ljam(); + cprAddUndoLogWord(signal, regLliPtr.p, recordType); + cprAddUndoLogWord(signal, regLliPtr.p, prevRecId); + cprAddUndoLogWord(signal, regLliPtr.p, tableId); + cprAddUndoLogWord(signal, regLliPtr.p, fragId); + cprAddUndoLogWord(signal, regLliPtr.p, pageId); + cprAddUndoLogWord(signal, regLliPtr.p, pageIndex); + }//if +}//Dbtup::cprAddUndoLogRecord() + +void Dbtup::cprAddAbortUpdate(Signal* signal, + LocalLogInfo* const lliPtr, + Operationrec* const regOperPtr) +{ + Uint32 lliWord = lliPtr->lliUndoWord; + if ((lliWord + 4) < ZWORDS_ON_PAGE) { + ljam(); + UndoPagePtr regUndoPagePtr; + regUndoPagePtr.i = lliPtr->lliUndoPage; + ptrCheckGuard(regUndoPagePtr, cnoOfUndoPage, undoPage); + + regUndoPagePtr.p->undoPageWord[lliWord] = regOperPtr->fragPageId; + regUndoPagePtr.p->undoPageWord[lliWord + 1] = regOperPtr->pageIndex; + regUndoPagePtr.p->undoPageWord[lliWord + 2] = regOperPtr->fragPageIdC; + regUndoPagePtr.p->undoPageWord[lliWord + 3] = regOperPtr->pageIndexC; + lliPtr->lliUndoWord = lliWord + 4; + } else { + ljam(); + cprAddUndoLogWord(signal, lliPtr, regOperPtr->fragPageId); + cprAddUndoLogWord(signal, lliPtr, regOperPtr->pageIndex); + cprAddUndoLogWord(signal, lliPtr, regOperPtr->fragPageIdC); + cprAddUndoLogWord(signal, lliPtr, regOperPtr->pageIndexC); + }//if +}//Dbtup::cprAddAbortUpdate() + +void Dbtup::cprAddUndoLogWord(Signal* signal, LocalLogInfo* const lliPtr, Uint32 undoWord) +{ + DiskBufferSegmentInfoPtr dbsiPtr; + UndoPagePtr regUndoPagePtr; + + ljam(); + regUndoPagePtr.i = lliPtr->lliUndoPage; + ptrCheckGuard(regUndoPagePtr, cnoOfUndoPage, undoPage); + ndbrequire(lliPtr->lliUndoWord < ZWORDS_ON_PAGE); + regUndoPagePtr.p->undoPageWord[lliPtr->lliUndoWord] = undoWord; + + lliPtr->lliUndoWord++; + if (lliPtr->lliUndoWord == ZWORDS_ON_PAGE) { + ljam(); + lliPtr->lliUndoWord = ZUNDO_PAGE_HEADER_SIZE; + lliPtr->lliUndoPage++; + if (clblPageCounter > 0) { + ljam(); + clblPageCounter--; + }//if + dbsiPtr.i = lliPtr->lliUndoBufferSegmentP; + ptrCheckGuard(dbsiPtr, cnoOfConcurrentWriteOp, diskBufferSegmentInfo); + dbsiPtr.p->pdxNumDataPages++; + ndbrequire(dbsiPtr.p->pdxNumDataPages < 16); + lliPtr->lliLogFilePage++; + if (dbsiPtr.p->pdxNumDataPages == ZUB_SEGMENT_SIZE) { + ljam(); + lcpWriteUndoSegment(signal, lliPtr, false); + }//if + }//if +}//Dbtup::cprAddUndoLogWord() + +void Dbtup::lcpWriteUndoSegment(Signal* signal, LocalLogInfo* const lliPtr, bool flushFlag) +{ + DiskBufferSegmentInfoPtr dbsiPtr; + + dbsiPtr.i = lliPtr->lliUndoBufferSegmentP; + ptrCheckGuard(dbsiPtr, cnoOfConcurrentWriteOp, diskBufferSegmentInfo); + Uint32 flags = 1; + lliPtr->lliUndoPagesToDiskWithoutSynch += dbsiPtr.p->pdxNumDataPages; + if ((lliPtr->lliUndoPagesToDiskWithoutSynch > MAX_PAGES_WITHOUT_SYNCH) || + (flushFlag)) { + ljam(); +/* ---------------------------------------------------------------- */ +// To avoid synching too big chunks at a time we synch after writing +// a certain number of data pages. (e.g. 2 MBytes). +/* ---------------------------------------------------------------- */ + lliPtr->lliUndoPagesToDiskWithoutSynch = 0; + flags |= 0x10; //Set synch flag unconditionally + }//if + dbsiPtr.p->pdxOperation = CHECKPOINT_UNDO_WRITE; + signal->theData[0] = lliPtr->lliUndoFileHandle; + signal->theData[1] = cownref; + signal->theData[2] = dbsiPtr.i; + signal->theData[3] = flags; + signal->theData[4] = ZBASE_ADDR_UNDO_WORD; + signal->theData[5] = dbsiPtr.p->pdxNumDataPages; + signal->theData[6] = dbsiPtr.p->pdxDataPage[0]; + signal->theData[7] = dbsiPtr.p->pdxFilePage; + sendSignal(NDBFS_REF, GSN_FSWRITEREQ, signal, 8, JBA); + + DiskBufferSegmentInfoPtr newDbsiPtr; + UndoPagePtr newUndoPagePtr; + + seizeUndoBufferSegment(signal, newUndoPagePtr); + seizeDiskBufferSegmentRecord(newDbsiPtr); + newDbsiPtr.p->pdxBuffertype = UNDO_PAGES; + for (Uint32 i = 0; i < ZUB_SEGMENT_SIZE; i++) { + newDbsiPtr.p->pdxDataPage[i] = newUndoPagePtr.i + i; + }//for + newDbsiPtr.p->pdxFilePage = lliPtr->lliLogFilePage; + lliPtr->lliUndoPage = newUndoPagePtr.i; + lliPtr->lliUndoBufferSegmentP = newDbsiPtr.i; +}//Dbtup::lcpWriteUndoSegment() + +void Dbtup::seizeUndoBufferSegment(Signal* signal, UndoPagePtr& regUndoPagePtr) +{ + if (cnoFreeUndoSeg == ZMIN_PAGE_LIMIT_TUP_COMMITREQ) { + EXECUTE_DIRECT(DBLQH, GSN_TUP_COM_BLOCK, signal, 1); + ljamEntry(); + }//if + cnoFreeUndoSeg--; + ndbrequire(cnoFreeUndoSeg >= 0); + ndbrequire(cfirstfreeUndoSeg != RNIL); + regUndoPagePtr.i = cfirstfreeUndoSeg; + ptrCheckGuard(regUndoPagePtr, cnoOfUndoPage, undoPage); + cfirstfreeUndoSeg = regUndoPagePtr.p->undoPageWord[ZPAGE_NEXT_POS]; + regUndoPagePtr.p->undoPageWord[ZPAGE_NEXT_POS] = RNIL; +}//Dbtup::seizeUndoBufferSegment() + + + |