summaryrefslogtreecommitdiff
path: root/storage/ndb/src/kernel/blocks
diff options
context:
space:
mode:
authorunknown <jonas@perch.ndb.mysql.com>2006-02-02 12:23:23 +0100
committerunknown <jonas@perch.ndb.mysql.com>2006-02-02 12:23:23 +0100
commit5bc7f5f94c6fb1403ecf4acc6e010676a3e1d44b (patch)
tree1f340f1c386c4fd8466caed37e9db8443613036e /storage/ndb/src/kernel/blocks
parent3e217c8161bd5876c4b21dceca1263da1fadbd86 (diff)
downloadmariadb-git-5bc7f5f94c6fb1403ecf4acc6e010676a3e1d44b.tar.gz
ndb dd
Fix SR bug that extent pages was scanned before undo was run Fix bug wrt page flushing/tsman and tup's dirty page list storage/ndb/include/kernel/signaldata/PgmanContinueB.hpp: Fix dd SR + dd free space bugs storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp: remove some unused code storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp: Add LCP_PREPARE to pgman Change order between TSMAN/LGMAN START_RECREQ storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp: Fix dd SR + dd free space bugs storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp: Fix dd SR + dd free space bugs storage/ndb/src/kernel/blocks/dbtup/DbtupDiskAlloc.cpp: Fix dd SR + dd free space bugs storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp: Fix dd SR + dd free space bugs storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp: Fix dd SR + dd free space bugs storage/ndb/src/kernel/blocks/diskpage.hpp: Fix dd SR + dd free space bugs storage/ndb/src/kernel/blocks/lgman.cpp: Fix dd SR + dd free space bugs storage/ndb/src/kernel/blocks/pgman.cpp: Fix dd SR + dd free space bugs storage/ndb/src/kernel/blocks/pgman.hpp: Fix dd SR + dd free space bugs storage/ndb/src/kernel/blocks/tsman.cpp: Fix dd SR + dd free space bugs storage/ndb/src/kernel/blocks/tsman.hpp: Fix dd SR + dd free space bugs storage/ndb/src/kernel/vm/DLFifoList.hpp: Fix dd SR + dd free space bugs storage/ndb/test/tools/hugoLoad.cpp: Fix dd SR + dd free space bugs storage/ndb/tools/delete_all.cpp: Fix dd SR + dd free space bugs
Diffstat (limited to 'storage/ndb/src/kernel/blocks')
-rw-r--r--storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp10
-rw-r--r--storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp192
-rw-r--r--storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp15
-rw-r--r--storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp6
-rw-r--r--storage/ndb/src/kernel/blocks/dbtup/DbtupDiskAlloc.cpp831
-rw-r--r--storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp7
-rw-r--r--storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp9
-rw-r--r--storage/ndb/src/kernel/blocks/diskpage.hpp2
-rw-r--r--storage/ndb/src/kernel/blocks/lgman.cpp3
-rw-r--r--storage/ndb/src/kernel/blocks/pgman.cpp282
-rw-r--r--storage/ndb/src/kernel/blocks/pgman.hpp44
-rw-r--r--storage/ndb/src/kernel/blocks/tsman.cpp259
-rw-r--r--storage/ndb/src/kernel/blocks/tsman.hpp49
13 files changed, 856 insertions, 853 deletions
diff --git a/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp b/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp
index bb3941d099c..1dc3f14ce36 100644
--- a/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp
+++ b/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp
@@ -1009,8 +1009,6 @@ public:
LCP_SR_STARTED = 10,
LCP_SR_COMPLETED = 11
};
- LcpLocRecord m_acc;
- LcpLocRecord m_tup;
LcpState lcpState;
bool firstFragmentFlag;
@@ -1028,6 +1026,7 @@ public:
bool reportEmpty;
NdbNodeBitmask m_EMPTY_LCP_REQ;
+ Uint32 m_error;
Uint32 m_outstanding;
}; // Size 76 bytes
typedef Ptr<LcpRecord> LcpRecordPtr;
@@ -2248,10 +2247,6 @@ private:
bool checkLcpStarted(Signal* signal);
void checkLcpTupprep(Signal* signal);
void getNextFragForLcp(Signal* signal);
- void initLcpLocAcc(Signal* signal, Uint32 fragId);
- void initLcpLocTup(Signal* signal, Uint32 fragId);
- void releaseLocalLcps(Signal* signal);
- void seizeLcpLoc(Signal* signal);
void sendAccContOp(Signal* signal);
void sendStartLcp(Signal* signal);
void setLogTail(Signal* signal, Uint32 keepGci);
@@ -2283,7 +2278,6 @@ private:
void checkNewMbyte(Signal* signal);
void checkReadExecSr(Signal* signal);
void checkScanTcCompleted(Signal* signal);
- void checkSrCompleted(Signal* signal);
void closeFile(Signal* signal, LogFileRecordPtr logFilePtr, Uint32 place);
void completedLogPage(Signal* signal, Uint32 clpType, Uint32 place);
void deleteFragrec(Uint32 fragId);
@@ -2302,7 +2296,6 @@ private:
void initialiseFragrec(Signal* signal);
void initialiseGcprec(Signal* signal);
void initialiseLcpRec(Signal* signal);
- void initialiseLcpLocrec(Signal* signal);
void initialiseLfo(Signal* signal);
void initialiseLogFile(Signal* signal);
void initialiseLogPage(Signal* signal);
@@ -2346,7 +2339,6 @@ private:
void releaseActiveCopy(Signal* signal);
void releaseAddfragrec(Signal* signal);
void releaseFragrec();
- void releaseLcpLoc(Signal* signal);
void releaseOprec(Signal* signal);
void releasePageRef(Signal* signal);
void releaseMmPages(Signal* signal);
diff --git a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
index 6ae79dd73e7..a006cc5d785 100644
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
@@ -10923,9 +10923,38 @@ void Dblqh::execLCP_PREPARE_REF(Signal* signal)
tabptr.i = ref->tableId;
ptrCheckGuard(tabptr, ctabrecFileSize, tablerec);
- lcpPtr.p->lcpState = LcpRecord::LCP_COMPLETED;
- lcpPtr.p->m_acc.lcpLocstate = LcpLocRecord::ACC_COMPLETED;
- contChkpNextFragLab(signal);
+ ndbrequire(lcpPtr.p->m_outstanding);
+ lcpPtr.p->m_outstanding--;
+
+ /**
+ * Only BACKUP is allowed to ref LCP_PREPARE
+ */
+ ndbrequire(refToBlock(signal->getSendersBlockRef()) == BACKUP);
+ lcpPtr.p->m_error = ref->errorCode;
+
+ if (lcpPtr.p->m_outstanding == 0)
+ {
+ jam();
+
+ if(lcpPtr.p->firstFragmentFlag)
+ {
+ jam();
+ LcpFragOrd *ord= (LcpFragOrd*)signal->getDataPtrSend();
+ lcpPtr.p->firstFragmentFlag= false;
+ *ord = lcpPtr.p->currentFragment.lcpFragOrd;
+ EXECUTE_DIRECT(PGMAN, GSN_LCP_FRAG_ORD, signal, signal->length());
+ jamEntry();
+
+ /**
+ * First fragment mean that last LCP is complete :-)
+ */
+ EXECUTE_DIRECT(TSMAN, GSN_END_LCP_REQ, signal, signal->length());
+ jamEntry();
+ }
+
+ lcpPtr.p->lcpState = LcpRecord::LCP_COMPLETED;
+ contChkpNextFragLab(signal);
+ }
}
/* --------------------------------------------------------------------------
@@ -10945,72 +10974,86 @@ void Dblqh::execLCP_PREPARE_CONF(Signal* signal)
fragptr.i = lcpPtr.p->currentFragment.fragPtrI;
c_fragment_pool.getPtr(fragptr);
-
- ndbrequire(conf->tableId == fragptr.p->tabRef);
- ndbrequire(conf->fragmentId == fragptr.p->fragId);
- lcpPtr.p->lcpState = LcpRecord::LCP_WAIT_HOLDOPS;
- lcpPtr.p->m_acc.lcpLocstate = LcpLocRecord::WAIT_LCPHOLDOP;
-
- lcpPtr.p->lcpState = LcpRecord::LCP_START_CHKP;
- lcpPtr.p->m_acc.lcpLocstate = LcpLocRecord::HOLDOP_READY;
+ if (refToBlock(signal->getSendersBlockRef()) != PGMAN)
+ {
+ ndbrequire(conf->tableId == fragptr.p->tabRef);
+ ndbrequire(conf->fragmentId == fragptr.p->fragId);
+ }
- /* ----------------------------------------------------------------------
- * UPDATE THE MAX_GCI_IN_LCP AND MAX_GCI_COMPLETED_IN_LCP NOW BEFORE
- * ACTIVATING THE FRAGMENT AGAIN.
- * --------------------------------------------------------------------- */
- ndbrequire(lcpPtr.p->currentFragment.lcpFragOrd.lcpNo < MAX_LCP_STORED);
- fragptr.p->maxGciInLcp = fragptr.p->newestGci;
- fragptr.p->maxGciCompletedInLcp = cnewestCompletedGci;
-
+ ndbrequire(lcpPtr.p->m_outstanding);
+ lcpPtr.p->m_outstanding--;
+ if (lcpPtr.p->m_outstanding == 0)
{
- LcpFragOrd *ord= (LcpFragOrd*)signal->getDataPtrSend();
- *ord = lcpPtr.p->currentFragment.lcpFragOrd;
- EXECUTE_DIRECT(LGMAN, GSN_LCP_FRAG_ORD, signal, signal->length());
- jamEntry();
-
- *ord = lcpPtr.p->currentFragment.lcpFragOrd;
- EXECUTE_DIRECT(DBTUP, GSN_LCP_FRAG_ORD, signal, signal->length());
- jamEntry();
+ jam();
if(lcpPtr.p->firstFragmentFlag)
{
jam();
+ LcpFragOrd *ord= (LcpFragOrd*)signal->getDataPtrSend();
lcpPtr.p->firstFragmentFlag= false;
*ord = lcpPtr.p->currentFragment.lcpFragOrd;
EXECUTE_DIRECT(PGMAN, GSN_LCP_FRAG_ORD, signal, signal->length());
jamEntry();
-
+
/**
* First fragment mean that last LCP is complete :-)
*/
EXECUTE_DIRECT(TSMAN, GSN_END_LCP_REQ, signal, signal->length());
jamEntry();
}
- }
+
+ if (lcpPtr.p->m_error)
+ {
+ jam();
- BackupFragmentReq* req= (BackupFragmentReq*)signal->getDataPtr();
- req->tableId = lcpPtr.p->currentFragment.lcpFragOrd.tableId;
- req->fragmentNo = 0;
- req->backupPtr = m_backup_ptr;
- req->backupId = lcpPtr.p->currentFragment.lcpFragOrd.lcpId;
- req->count = 0;
+ lcpPtr.p->lcpState = LcpRecord::LCP_COMPLETED;
+ contChkpNextFragLab(signal);
+ return;
+ }
+ lcpPtr.p->lcpState = LcpRecord::LCP_WAIT_HOLDOPS;
+ lcpPtr.p->lcpState = LcpRecord::LCP_START_CHKP;
+
+ /* ----------------------------------------------------------------------
+ * UPDATE THE MAX_GCI_IN_LCP AND MAX_GCI_COMPLETED_IN_LCP NOW BEFORE
+ * ACTIVATING THE FRAGMENT AGAIN.
+ * --------------------------------------------------------------------- */
+ ndbrequire(lcpPtr.p->currentFragment.lcpFragOrd.lcpNo < MAX_LCP_STORED);
+ fragptr.p->maxGciInLcp = fragptr.p->newestGci;
+ fragptr.p->maxGciCompletedInLcp = cnewestCompletedGci;
+
+ {
+ LcpFragOrd *ord= (LcpFragOrd*)signal->getDataPtrSend();
+ *ord = lcpPtr.p->currentFragment.lcpFragOrd;
+ EXECUTE_DIRECT(LGMAN, GSN_LCP_FRAG_ORD, signal, signal->length());
+ jamEntry();
+
+ *ord = lcpPtr.p->currentFragment.lcpFragOrd;
+ EXECUTE_DIRECT(DBTUP, GSN_LCP_FRAG_ORD, signal, signal->length());
+ jamEntry();
+ }
+
+ BackupFragmentReq* req= (BackupFragmentReq*)signal->getDataPtr();
+ req->tableId = lcpPtr.p->currentFragment.lcpFragOrd.tableId;
+ req->fragmentNo = 0;
+ req->backupPtr = m_backup_ptr;
+ req->backupId = lcpPtr.p->currentFragment.lcpFragOrd.lcpId;
+ req->count = 0;
+
#ifdef NDB_DEBUG_FULL
- if(ERROR_INSERTED(5904))
- {
+ if(ERROR_INSERTED(5904))
+ {
g_trace_lcp.sendSignal(BACKUP_REF, GSN_BACKUP_FRAGMENT_REQ, signal,
BackupFragmentReq::SignalLength, JBB);
- }
- else
+ }
+ else
#endif
- {
- sendSignal(BACKUP_REF, GSN_BACKUP_FRAGMENT_REQ, signal,
- BackupFragmentReq::SignalLength, JBB);
+ {
+ sendSignal(BACKUP_REF, GSN_BACKUP_FRAGMENT_REQ, signal,
+ BackupFragmentReq::SignalLength, JBB);
+ }
}
-
- lcpPtr.p->m_acc.lcpLocstate = LcpLocRecord::ACC_STARTED;
- lcpPtr.p->m_tup.lcpLocstate = LcpLocRecord::TUP_COMPLETED;
}
void Dblqh::execBACKUP_FRAGMENT_REF(Signal* signal)
@@ -11025,8 +11068,7 @@ void Dblqh::execBACKUP_FRAGMENT_CONF(Signal* signal)
lcpPtr.i = 0;
ptrCheckGuard(lcpPtr, clcpFileSize, lcpRecord);
- ndbrequire(lcpPtr.p->m_acc.lcpLocstate == LcpLocRecord::ACC_STARTED);
- lcpPtr.p->m_acc.lcpLocstate = LcpLocRecord::ACC_COMPLETED;
+ ndbrequire(lcpPtr.p->lcpState == LcpRecord::LCP_START_CHKP);
lcpPtr.p->lcpState = LcpRecord::LCP_COMPLETED;
/* ------------------------------------------------------------------------
@@ -11142,7 +11184,10 @@ void Dblqh::sendLCP_FRAGIDREQ(Signal* signal)
contChkpNextFragLab(signal);
return;
}
-
+
+ lcpPtr.p->m_error = 0;
+ lcpPtr.p->m_outstanding = 1;
+
ndbrequire(tabPtr.p->tableStatus == Tablerec::TABLE_DEFINED);
lcpPtr.p->lcpState = LcpRecord::LCP_WAIT_FRAGID;
@@ -11157,6 +11202,13 @@ void Dblqh::sendLCP_FRAGIDREQ(Signal* signal)
req->backupId = lcpPtr.p->currentFragment.lcpFragOrd.lcpId;
sendSignal(BACKUP_REF, GSN_LCP_PREPARE_REQ, signal,
LcpPrepareReq::SignalLength, JBB);
+
+ if (lcpPtr.p->firstFragmentFlag)
+ {
+ lcpPtr.p->m_outstanding++;
+ sendSignal(PGMAN_REF, GSN_LCP_PREPARE_REQ, signal,
+ LcpPrepareReq::SignalLength, JBB);
+ }
}//Dblqh::sendLCP_FRAGIDREQ()
void Dblqh::sendEMPTY_LCP_CONF(Signal* signal, bool idle)
@@ -13600,7 +13652,7 @@ void Dblqh::execRESTORE_LCP_CONF(Signal* signal)
lcpPtr.p->m_outstanding = 1;
signal->theData[0] = c_lcpId;
- sendSignal(TSMAN_REF, GSN_START_RECREQ, signal, 1, JBB);
+ sendSignal(LGMAN_REF, GSN_START_RECREQ, signal, 1, JBB);
return;
}
}
@@ -13654,7 +13706,7 @@ void Dblqh::execSTART_RECREQ(Signal* signal)
lcpPtr.p->m_outstanding = 1;
signal->theData[0] = c_lcpId;
- sendSignal(TSMAN_REF, GSN_START_RECREQ, signal, 1, JBB);
+ sendSignal(LGMAN_REF, GSN_START_RECREQ, signal, 1, JBB);
}//if
}//Dblqh::execSTART_RECREQ()
@@ -13681,17 +13733,18 @@ void Dblqh::execSTART_RECCONF(Signal* signal)
switch(refToBlock(sender)){
case TSMAN:
jam();
+ break;
+ case LGMAN:
+ jam();
lcpPtr.p->m_outstanding++;
signal->theData[0] = c_lcpId;
- sendSignal(LGMAN_REF, GSN_START_RECREQ, signal, 1, JBB);
+ sendSignal(TSMAN_REF, GSN_START_RECREQ, signal, 1, JBB);
return;
- case LGMAN:
- jam();
break;
default:
ndbrequire(false);
}
-
+
jam();
csrExecUndoLogState = EULS_COMPLETED;
c_lcp_complete_fragments.first(fragptr);
@@ -15781,29 +15834,6 @@ void Dblqh::checkScanTcCompleted(Signal* signal)
}//if
}//Dblqh::checkScanTcCompleted()
-/* ==========================================================================
- * === CHECK IF ALL PARTS OF A SYSTEM RESTART ON A FRAGMENT ARE COMPLETED ===
- *
- * SUBROUTINE SHORT NAME = CSC
- * ========================================================================= */
-void Dblqh::checkSrCompleted(Signal* signal)
-{
- terrorCode = ZOK;
- ptrGuard(lcpPtr);
- if(lcpPtr.p->m_acc.lcpLocstate != LcpLocRecord::SR_ACC_COMPLETED)
- {
- ndbrequire(lcpPtr.p->m_acc.lcpLocstate == LcpLocRecord::SR_ACC_STARTED);
- return;
- }
-
- if(lcpPtr.p->m_tup.lcpLocstate != LcpLocRecord::SR_TUP_COMPLETED)
- {
- ndbrequire(lcpPtr.p->m_tup.lcpLocstate == LcpLocRecord::SR_TUP_STARTED);
- return;
- }
- lcpPtr.p->lcpState = LcpRecord::LCP_SR_COMPLETED;
-}//Dblqh::checkSrCompleted()
-
/* ------------------------------------------------------------------------- */
/* ------ CLOSE A FILE DURING EXECUTION OF FRAGMENT LOG ------- */
/* */
@@ -18116,12 +18146,10 @@ Dblqh::execDUMP_STATE_ORD(Signal* signal)
// Print information about the current local checkpoint
TlcpPtr.i = 0;
ptrAss(TlcpPtr, lcpRecord);
- infoEvent(" lcpState=%d", TlcpPtr.p->lcpState);
- infoEvent(" lcpAccptr=%d lastFragmentFlag=%d",
- TlcpPtr.p->m_acc.lcpRef,
- TlcpPtr.p->lastFragmentFlag);
+ infoEvent(" lcpState=%d lastFragmentFlag=%d",
+ TlcpPtr.p->lcpState, TlcpPtr.p->lastFragmentFlag);
infoEvent("currentFragment.fragPtrI=%d",
- TlcpPtr.p->currentFragment.fragPtrI);
+ TlcpPtr.p->currentFragment.fragPtrI);
infoEvent("currentFragment.lcpFragOrd.tableId=%d",
TlcpPtr.p->currentFragment.lcpFragOrd.tableId);
infoEvent(" lcpQueued=%d reportEmpty=%d",
diff --git a/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp b/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp
index df929dafbd1..bb179ac52fb 100644
--- a/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp
+++ b/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp
@@ -2600,7 +2600,7 @@ private:
void disk_page_prealloc_callback_common(Signal*,
Ptr<Page_request>,
Ptr<Fragrecord>,
- Ptr<GlobalPage>);
+ Ptr<Page>);
void disk_page_alloc(Signal*,
Tablerec*, Fragrecord*, Local_key*, PagePtr, Uint32);
@@ -2631,18 +2631,22 @@ private:
void undo_createtable_callback(Signal* signal, Uint32 opPtrI, Uint32 unused);
+ void disk_page_set_dirty(Ptr<Page>);
+ void restart_setup_page(Ptr<Page>);
+ void update_extent_pos(Disk_alloc_info&, Ptr<Extent_info>);
+
/**
* Disk restart code
*/
public:
int disk_page_load_hook(Uint32 page_id);
-
- void disk_page_unmap_callback(Uint32 page_id);
+
+ void disk_page_unmap_callback(Uint32 page_id, Uint32 dirty_count);
int disk_restart_alloc_extent(Uint32 tableId, Uint32 fragId,
const Local_key* key, Uint32 pages);
void disk_restart_page_bits(Uint32 tableId, Uint32 fragId,
- const Local_key*, Uint32 old_bits, Uint32 bits);
+ const Local_key*, Uint32 bits);
void disk_restart_undo(Signal* signal, Uint64 lsn,
Uint32 type, const Uint32 * ptr, Uint32 len);
@@ -2654,6 +2658,7 @@ public:
Ptr<Tablerec> m_table_ptr;
Ptr<Fragrecord> m_fragment_ptr;
Ptr<Page> m_page_ptr;
+ Ptr<Extent_info> m_extent_ptr;
Local_key m_key;
};
@@ -2664,7 +2669,7 @@ private:
void disk_restart_undo_alloc(Apply_undo*);
void disk_restart_undo_update(Apply_undo*);
void disk_restart_undo_free(Apply_undo*);
- void disk_restart_undo_page_bits(Apply_undo*);
+ void disk_restart_undo_page_bits(Signal*, Apply_undo*);
#ifdef VM_TRACE
void verify_page_lists(Disk_alloc_info&);
diff --git a/storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp b/storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp
index 242ad0c0a6f..92c7ce90464 100644
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp
@@ -327,6 +327,8 @@ Dbtup::disk_page_commit_callback(Signal* signal,
regOperPtr.p->m_commit_disk_callback_page= page_id;
m_global_page_pool.getPtr(m_pgman.m_ptr, page_id);
+ disk_page_set_dirty(*(Ptr<Page>*)&m_pgman.m_ptr);
+
execTUP_COMMITREQ(signal);
if(signal->theData[0] == 0)
c_lqh->tupcommit_conf_callback(signal, regOperPtr.p->userpointer);
@@ -477,8 +479,7 @@ void Dbtup::execTUP_COMMITREQ(Signal* signal)
* the page hot. XXX move to TUP which knows better.
*/
int flags= regOperPtr.p->op_struct.op_type |
- Page_cache_client::COMMIT_REQ | Page_cache_client::STRICT_ORDER |
- Page_cache_client::CORR_REQ;
+ Page_cache_client::COMMIT_REQ | Page_cache_client::CORR_REQ;
int res= m_pgman.get_page(signal, req, flags);
switch(res){
case 0:
@@ -491,6 +492,7 @@ void Dbtup::execTUP_COMMITREQ(Signal* signal)
ndbrequire("NOT YET IMPLEMENTED" == 0);
break;
}
+ disk_page_set_dirty(*(Ptr<Page>*)&m_pgman.m_ptr);
regOperPtr.p->m_commit_disk_callback_page= res;
regOperPtr.p->op_struct.m_load_diskpage_on_commit= 0;
}
diff --git a/storage/ndb/src/kernel/blocks/dbtup/DbtupDiskAlloc.cpp b/storage/ndb/src/kernel/blocks/dbtup/DbtupDiskAlloc.cpp
index 51c4275d198..611105cc39c 100644
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupDiskAlloc.cpp
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupDiskAlloc.cpp
@@ -17,6 +17,8 @@
#define DBTUP_C
#include "Dbtup.hpp"
+static bool f_undo_done = true;
+
static
NdbOut&
operator<<(NdbOut& out, const Ptr<Dbtup::Page> & ptr)
@@ -154,8 +156,6 @@ Dbtup::Disk_alloc_info::Disk_alloc_info(const Tablerec* tabPtrP,
{
abort();
}
-
-
}
Uint32
@@ -227,6 +227,48 @@ Dbtup::Disk_alloc_info::calc_extent_pos(const Extent_info* extP) const
return pos;
}
+void
+Dbtup::update_extent_pos(Disk_alloc_info& alloc,
+ Ptr<Extent_info> extentPtr)
+{
+ Uint32 old = extentPtr.p->m_free_matrix_pos;
+ if (old != RNIL)
+ {
+ Uint32 pos = alloc.calc_extent_pos(extentPtr.p);
+ if (old != pos)
+ {
+ jam();
+ Extent_list old_list(c_extent_pool, alloc.m_free_extents[old]);
+ Extent_list new_list(c_extent_pool, alloc.m_free_extents[pos]);
+ old_list.remove(extentPtr);
+ new_list.add(extentPtr);
+ extentPtr.p->m_free_matrix_pos= pos;
+ }
+ }
+ else
+ {
+ ddassert(alloc.m_curr_extent_info_ptr_i == extentPtr.i);
+ }
+}
+
+void
+Dbtup::restart_setup_page(Ptr<Page> pagePtr)
+{
+ /**
+ * Link to extent, clear uncommitted_used_space
+ */
+ pagePtr.p->uncommitted_used_space = 0;
+ pagePtr.p->m_restart_seq = globalData.m_restart_seq;
+
+ Extent_info key;
+ key.m_key.m_file_no = pagePtr.p->m_file_no;
+ key.m_key.m_page_idx = pagePtr.p->m_extent_no;
+ Ptr<Extent_info> extentPtr;
+ ndbrequire(c_extent_hash.find(extentPtr, key));
+ pagePtr.p->m_extent_info_ptr = extentPtr.i;
+}
+
+
/**
* - Page free bits -
* 0 = 00 - free - 100% free
@@ -236,6 +278,8 @@ Dbtup::Disk_alloc_info::calc_extent_pos(const Extent_info* extP) const
*
*/
+#define DBG_DISK 0
+
int
Dbtup::disk_page_prealloc(Signal* signal,
Ptr<Fragrecord> fragPtr,
@@ -252,6 +296,9 @@ Dbtup::disk_page_prealloc(Signal* signal,
fragPtrP->fragmentId,
fragPtrP->m_tablespace_id);
+ if (DBG_DISK)
+ ndbout << "disk_page_prealloc";
+
/**
* 1) search current dirty pages
*/
@@ -266,6 +313,8 @@ Dbtup::disk_page_prealloc(Signal* signal,
disk_page_prealloc_dirty_page(alloc, *(PagePtr*)&page, i, sz);
key->m_page_no= ((Page*)page.p)->m_page_no;
key->m_file_no= ((Page*)page.p)->m_file_no;
+ if (DBG_DISK)
+ ndbout << " found dirty page " << *key << endl;
return 0; // Page in memory
}
}
@@ -285,7 +334,8 @@ Dbtup::disk_page_prealloc(Signal* signal,
disk_page_prealloc_transit_page(alloc, req, i, sz);
* key = req.p->m_key;
- //ndbout_c("found transit page");
+ if (DBG_DISK)
+ ndbout << " found transit page " << *key << endl;
return 0;
}
}
@@ -346,26 +396,11 @@ Dbtup::disk_page_prealloc(Signal* signal,
if ((pos= alloc.find_extent(sz)) != RNIL)
{
jam();
- Uint32 cnt = 0;
LocalDLList<Extent_info> list(c_extent_pool, alloc.m_free_extents[pos]);
list.first(ext);
- while((pageBits= tsman.alloc_page_from_extent(&ext.p->m_key, bits)) < 0)
- if(!list.next(ext) || ++cnt == 10)
- break;
-
- if (cnt == 10 || ext.isNull())
- {
- pos = RNIL;
- }
- else
- {
- list.remove(ext);
- alloc.m_curr_extent_info_ptr_i= ext.i;
- ext.p->m_free_matrix_pos= RNIL;
- }
+ list.remove(ext);
}
-
- if (pos == RNIL)
+ else
{
jam();
/**
@@ -386,7 +421,7 @@ Dbtup::disk_page_prealloc(Signal* signal,
c_page_request_pool.release(req);
return err;
}
-
+
int pages= err;
ndbout << "allocated " << pages << " pages: " << ext.p->m_key << endl;
ext.p->m_first_page_no = ext.p->m_key.m_page_no;
@@ -398,19 +433,12 @@ Dbtup::disk_page_prealloc(Signal* signal,
LocalSLList<Extent_info, Extent_list_t>
list1(c_extent_pool, alloc.m_extent_list);
list1.add(ext);
-
- alloc.m_curr_extent_info_ptr_i= ext.i;
- ext.p->m_free_matrix_pos= RNIL;
- pageBits= tsman.alloc_page_from_extent(&ext.p->m_key, bits);
-#ifdef VM_TRACE
- ddassert(pageBits >= 0);
-#else
- if (unlikely(pageBits < 0))
- {
- return -AllocExtentReq::NoExtentAvailable;
- }
-#endif
- }
+ }
+
+ alloc.m_curr_extent_info_ptr_i= ext.i;
+ ext.p->m_free_matrix_pos= RNIL;
+ pageBits= tsman.alloc_page_from_extent(&ext.p->m_key, bits);
+ ddassert(pageBits >= 0);
}
/**
@@ -418,6 +446,9 @@ Dbtup::disk_page_prealloc(Signal* signal,
*/
*key= req.p->m_key= ext.p->m_key;
+ if (DBG_DISK)
+ ndbout << " allocated page " << *key << endl;
+
/**
* We don't know exact free space of page
* but we know what page free bits it has.
@@ -460,7 +491,7 @@ Dbtup::disk_page_prealloc(Signal* signal,
if (pageBits == 0)
{
//XXX empty page -> fast to map
- flags |= Page_cache_client::EMPTY_PAGE | Page_cache_client::NO_HOOK;
+ flags |= Page_cache_client::EMPTY_PAGE;
preq.m_callback.m_callbackFunction =
safe_cast(&Dbtup::disk_page_prealloc_initial_callback);
}
@@ -514,21 +545,7 @@ Dbtup::disk_page_prealloc_dirty_page(Disk_alloc_info & alloc,
pagePtr.p->uncommitted_used_space = used;
ddassert(extentPtr.p->m_free_space >= sz);
extentPtr.p->m_free_space -= sz;
- Uint32 old_pos= extentPtr.p->m_free_matrix_pos;
- if (old_pos != RNIL) // Current extent
- {
- jam();
- Uint32 new_pos= alloc.calc_extent_pos(extentPtr.p);
- if (old_pos != new_pos)
- {
- jam();
- Extent_list old_list(c_extent_pool, alloc.m_free_extents[old_pos]);
- Extent_list new_list(c_extent_pool, alloc.m_free_extents[new_pos]);
- old_list.remove(extentPtr);
- new_list.add(extentPtr);
- extentPtr.p->m_free_matrix_pos= new_pos;
- }
- }
+ update_extent_pos(alloc, extentPtr);
}
@@ -567,27 +584,13 @@ Dbtup::disk_page_prealloc_transit_page(Disk_alloc_info& alloc,
req.p->m_estimated_free_space = free - sz;
ddassert(extentPtr.p->m_free_space >= sz);
extentPtr.p->m_free_space -= sz;
- Uint32 old_pos= extentPtr.p->m_free_matrix_pos;
- if (old_pos != RNIL) // Current extent
- {
- jam();
- Uint32 new_pos= alloc.calc_extent_pos(extentPtr.p);
- if (old_pos != new_pos)
- {
- jam();
- Extent_list old_list(c_extent_pool, alloc.m_free_extents[old_pos]);
- Extent_list new_list(c_extent_pool, alloc.m_free_extents[new_pos]);
- old_list.remove(extentPtr);
- new_list.add(extentPtr);
- extentPtr.p->m_free_matrix_pos= new_pos;
- }
- }
+ update_extent_pos(alloc, extentPtr);
}
void
Dbtup::disk_page_prealloc_callback(Signal* signal,
- Uint32 page_request, Uint32 page_id)
+ Uint32 page_request, Uint32 page_id)
{
//ndbout_c("disk_alloc_page_callback id: %d", page_id);
@@ -600,8 +603,15 @@ Dbtup::disk_page_prealloc_callback(Signal* signal,
Ptr<Fragrecord> fragPtr;
fragPtr.i= req.p->m_frag_ptr_i;
ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
-
- disk_page_prealloc_callback_common(signal, req, fragPtr, gpage);
+
+ Ptr<Page> pagePtr = *(Ptr<Page>*)&gpage;
+
+ if (unlikely(pagePtr.p->m_restart_seq != globalData.m_restart_seq))
+ {
+ restart_setup_page(pagePtr);
+ }
+
+ disk_page_prealloc_callback_common(signal, req, fragPtr, pagePtr);
}
void
@@ -622,6 +632,7 @@ Dbtup::disk_page_prealloc_initial_callback(Signal*signal,
Ptr<GlobalPage> gpage;
m_global_page_pool.getPtr(gpage, page_id);
+ Ptr<Page> pagePtr = *(Ptr<Page>*)&gpage;
Ptr<Fragrecord> fragPtr;
fragPtr.i= req.p->m_frag_ptr_i;
@@ -634,37 +645,34 @@ Dbtup::disk_page_prealloc_initial_callback(Signal*signal,
Ptr<Extent_info> extentPtr;
c_extent_pool.getPtr(extentPtr, req.p->m_extent_info_ptr);
- Page* page= (Page*)gpage.p;
- page->m_page_no= req.p->m_key.m_page_no;
- page->m_file_no= req.p->m_key.m_file_no;
- page->m_table_id= fragPtr.p->fragTableId;
- page->m_fragment_id = fragPtr.p->fragmentId;
- page->m_extent_no = extentPtr.p->m_key.m_page_idx; // logical extent no
- page->m_extent_info_ptr= req.p->m_extent_info_ptr;
- page->m_restart_seq = globalData.m_restart_seq;
- page->list_index = 0x8000;
- page->uncommitted_used_space = 0;
- page->nextList = page->prevList = RNIL;
+ pagePtr.p->m_page_no= req.p->m_key.m_page_no;
+ pagePtr.p->m_file_no= req.p->m_key.m_file_no;
+ pagePtr.p->m_table_id= fragPtr.p->fragTableId;
+ pagePtr.p->m_fragment_id = fragPtr.p->fragmentId;
+ pagePtr.p->m_extent_no = extentPtr.p->m_key.m_page_idx; // logical extent no
+ pagePtr.p->m_extent_info_ptr= req.p->m_extent_info_ptr;
+ pagePtr.p->m_restart_seq = globalData.m_restart_seq;
+ pagePtr.p->list_index = 0x8000;
+ pagePtr.p->uncommitted_used_space = 0;
+ pagePtr.p->nextList = pagePtr.p->prevList = RNIL;
if (tabPtr.p->m_attributes[DD].m_no_of_varsize == 0)
{
- convertThPage((Fix_page*)gpage.p, tabPtr.p, DD);
+ convertThPage((Fix_page*)pagePtr.p, tabPtr.p, DD);
}
else
{
abort();
}
- disk_page_prealloc_callback_common(signal, req, fragPtr, gpage);
+ disk_page_prealloc_callback_common(signal, req, fragPtr, pagePtr);
}
void
Dbtup::disk_page_prealloc_callback_common(Signal* signal,
Ptr<Page_request> req,
Ptr<Fragrecord> fragPtr,
- Ptr<GlobalPage> pagePtr)
+ Ptr<Page> pagePtr)
{
- Page* page= (Page*)pagePtr.p;
-
/**
* 1) remove page request from Disk_alloc_info.m_page_requests
* 2) Add page to Disk_alloc_info.m_dirty_pages
@@ -672,31 +680,31 @@ Dbtup::disk_page_prealloc_callback_common(Signal* signal,
* 4) inform pgman about current users
*/
Disk_alloc_info& alloc= fragPtr.p->m_disk_alloc_info;
- ddassert((page->list_index & 0x8000) == 0x8000);
- ddassert(page->m_extent_info_ptr == req.p->m_extent_info_ptr);
- ddassert(page->m_page_no == req.p->m_key.m_page_no);
- ddassert(page->m_file_no == req.p->m_key.m_file_no);
+ ddassert((pagePtr.p->list_index & 0x8000) == 0x8000);
+ ddassert(pagePtr.p->m_extent_info_ptr == req.p->m_extent_info_ptr);
+ ddassert(pagePtr.p->m_page_no == req.p->m_key.m_page_no);
+ ddassert(pagePtr.p->m_file_no == req.p->m_key.m_file_no);
Uint32 old_idx = req.p->m_list_index;
Uint32 free= req.p->m_estimated_free_space;
Uint32 ext = req.p->m_extent_info_ptr;
Uint32 used= req.p->m_uncommitted_used_space;
- Uint32 real_free = page->free_space;
- Uint32 real_used = used + page->uncommitted_used_space;
+ Uint32 real_free = pagePtr.p->free_space;
+ Uint32 real_used = used + pagePtr.p->uncommitted_used_space;
ddassert(real_free >= free);
ddassert(real_free >= real_used);
ddassert(alloc.calc_page_free_bits(free) == old_idx);
Uint32 new_idx= alloc.calc_page_free_bits(real_free - real_used);
-
+
/**
* Add to dirty pages
*/
ArrayPool<Page> *cheat_pool= (ArrayPool<Page>*)&m_global_page_pool;
LocalDLList<Page> list(* cheat_pool, alloc.m_dirty_pages[new_idx]);
list.add(*(Ptr<Page>*)&pagePtr);
- page->uncommitted_used_space = real_used;
- page->list_index = new_idx;
+ pagePtr.p->uncommitted_used_space = real_used;
+ pagePtr.p->list_index = new_idx;
if (old_idx != new_idx || free != real_free)
{
@@ -712,21 +720,11 @@ Dbtup::disk_page_prealloc_callback_common(Signal* signal,
extentPtr.p->m_free_page_count[new_idx]++;
}
- Uint32 old_pos= extentPtr.p->m_free_matrix_pos;
- if (old_pos != RNIL) // Current extent
- {
- jam();
- Uint32 new_pos= alloc.calc_extent_pos(extentPtr.p);
- if (old_pos != new_pos)
- {
- jam();
- Extent_list old_list(c_extent_pool, alloc.m_free_extents[old_pos]);
- Extent_list new_list(c_extent_pool, alloc.m_free_extents[new_pos]);
- old_list.remove(extentPtr);
- new_list.add(extentPtr);
- extentPtr.p->m_free_matrix_pos= new_pos;
- }
- }
+ update_extent_pos(alloc, extentPtr);
+ }
+ else
+ {
+ ndbout << endl;
}
{
@@ -736,103 +734,114 @@ Dbtup::disk_page_prealloc_callback_common(Signal* signal,
}
}
-int
-Dbtup::disk_page_load_hook(Uint32 page_id)
+void
+Dbtup::disk_page_set_dirty(Ptr<Page> pagePtr)
{
- Ptr<GlobalPage> gpage;
- m_global_page_pool.getPtr(gpage, page_id);
+ Uint32 idx = pagePtr.p->list_index;
+ if ((idx & 0x8000) == 0)
+ {
+ /**
+ * Already in dirty list
+ */
+ return ;
+ }
- PagePtr pagePtr= *(PagePtr*)&gpage;
- Uint32 type = pagePtr.p->m_page_header.m_page_type;
- if (unlikely(type != File_formats::PT_Tup_fixsize_page &&
- type != File_formats::PT_Tup_varsize_page))
+ if (unlikely(pagePtr.p->m_restart_seq != globalData.m_restart_seq))
{
- ndbassert(false);
- return 0;
+ restart_setup_page(pagePtr);
}
- pagePtr.p->list_index |= 0x8000;
- pagePtr.p->nextList = pagePtr.p->prevList = RNIL;
-
Local_key key;
key.m_page_no = pagePtr.p->m_page_no;
key.m_file_no = pagePtr.p->m_file_no;
- if (unlikely(pagePtr.p->m_restart_seq != globalData.m_restart_seq))
- {
- pagePtr.p->m_restart_seq = globalData.m_restart_seq;
- pagePtr.p->uncommitted_used_space = 0;
-
- Ptr<Tablerec> tabPtr;
- tabPtr.i= pagePtr.p->m_table_id;
- ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);
-
- Ptr<Fragrecord> fragPtr;
- getFragmentrec(fragPtr, pagePtr.p->m_fragment_id, tabPtr.p);
-
- Disk_alloc_info& alloc= fragPtr.p->m_disk_alloc_info;
- Uint32 idx= alloc.calc_page_free_bits(pagePtr.p->free_space);
-
- pagePtr.p->list_index = idx | 0x8000;
-
- Extent_info key;
- key.m_key.m_file_no = pagePtr.p->m_file_no;
- key.m_key.m_page_idx = pagePtr.p->m_extent_no;
- Ptr<Extent_info> extentPtr;
- ndbrequire(c_extent_hash.find(extentPtr, key));
- pagePtr.p->m_extent_info_ptr = extentPtr.i;
- return 1;
- }
+ if (DBG_DISK)
+ ndbout << " disk_page_set_dirty " << key << endl;
+
+ Uint32 tableId = pagePtr.p->m_table_id;
+ Uint32 fragId = pagePtr.p->m_fragment_id;
+
+ Uint32 free = pagePtr.p->free_space;
+ Uint32 used = pagePtr.p->uncommitted_used_space;
+
+ Ptr<Tablerec> tabPtr;
+ tabPtr.i= pagePtr.p->m_table_id;
+ ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);
+
+ Ptr<Fragrecord> fragPtr;
+ getFragmentrec(fragPtr, pagePtr.p->m_fragment_id, tabPtr.p);
+
+ Disk_alloc_info& alloc= fragPtr.p->m_disk_alloc_info;
+ Tablespace_client tsman(0, c_tsman,
+ fragPtr.p->fragTableId,
+ fragPtr.p->fragmentId,
+ fragPtr.p->m_tablespace_id);
- return 0;
+ ddassert(free >= used);
+ idx= alloc.calc_page_free_bits(free - used);
+
+ pagePtr.p->list_index = idx;
+ ArrayPool<Page> *pool= (ArrayPool<Page>*)&m_global_page_pool;
+ LocalDLList<Page> list(*pool, alloc.m_dirty_pages[idx]);
+ list.add(pagePtr);
+
+ // Make sure no one will allocate it...
+ tsman.unmap_page(&key, MAX_FREE_LIST - 1);
}
void
-Dbtup::disk_page_unmap_callback(Uint32 page_id)
+Dbtup::disk_page_unmap_callback(Uint32 page_id, Uint32 dirty_count)
{
Ptr<GlobalPage> gpage;
m_global_page_pool.getPtr(gpage, page_id);
PagePtr pagePtr= *(PagePtr*)&gpage;
-
+
Uint32 type = pagePtr.p->m_page_header.m_page_type;
- if (unlikely(type != File_formats::PT_Tup_fixsize_page &&
- type != File_formats::PT_Tup_varsize_page))
+ if (unlikely((type != File_formats::PT_Tup_fixsize_page &&
+ type != File_formats::PT_Tup_varsize_page) ||
+ f_undo_done == false))
{
return ;
}
-
- Uint32 i = pagePtr.p->list_index;
-
+
Local_key key;
key.m_page_no = pagePtr.p->m_page_no;
key.m_file_no = pagePtr.p->m_file_no;
+ Uint32 idx = pagePtr.p->list_index;
- if ((i & 0x8000) == 0)
+ ndbassert((idx & 0x8000) == 0);
+
+ if (DBG_DISK)
+ ndbout << "disk_page_unmap_callback " << key << endl;
+
+ Ptr<Tablerec> tabPtr;
+ tabPtr.i= pagePtr.p->m_table_id;
+ ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);
+
+ Ptr<Fragrecord> fragPtr;
+ getFragmentrec(fragPtr, pagePtr.p->m_fragment_id, tabPtr.p);
+
+ Disk_alloc_info& alloc= fragPtr.p->m_disk_alloc_info;
+
+ if (dirty_count == 0)
{
- Ptr<Tablerec> tabPtr;
- tabPtr.i= pagePtr.p->m_table_id;
- ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);
+ Uint32 free = pagePtr.p->free_space;
+ Uint32 used = pagePtr.p->uncommitted_used_space;
+ ddassert(free >= used);
+ ddassert(alloc.calc_page_free_bits(free - used) == idx);
- Ptr<Fragrecord> fragPtr;
- getFragmentrec(fragPtr, pagePtr.p->m_fragment_id, tabPtr.p);
+ Tablespace_client tsman(0, c_tsman,
+ fragPtr.p->fragTableId,
+ fragPtr.p->fragmentId,
+ fragPtr.p->m_tablespace_id);
- Disk_alloc_info& alloc= fragPtr.p->m_disk_alloc_info;
-
- ArrayPool<Page> *pool= (ArrayPool<Page>*)&m_global_page_pool;
- LocalDLList<Page> old(*pool, alloc.m_dirty_pages[i]);
- old.remove(pagePtr);
-
- if (pagePtr.p->uncommitted_used_space == 0)
- {
- Tablespace_client tsman(0, c_tsman,
- fragPtr.p->fragTableId,
- fragPtr.p->fragmentId,
- fragPtr.p->m_tablespace_id);
-
- tsman.unmap_page(&key);
- }
+ tsman.unmap_page(&key, idx);
+ pagePtr.p->list_index = idx | 0x8000;
}
- pagePtr.p->list_index = i | 0x8000;
+
+ ArrayPool<Page> *pool= (ArrayPool<Page>*)&m_global_page_pool;
+ LocalDLList<Page> list(*pool, alloc.m_dirty_pages[idx]);
+ list.remove(pagePtr);
}
void
@@ -873,7 +882,7 @@ Dbtup::disk_page_alloc(Signal* signal,
fragPtrP->fragTableId,
fragPtrP->fragmentId,
fragPtrP->m_tablespace_id);
-
+
tsman.update_page_free_bits(key, new_bits, lsn);
}
}
@@ -883,6 +892,9 @@ Dbtup::disk_page_free(Signal *signal,
Tablerec *tabPtrP, Fragrecord * fragPtrP,
Local_key* key, PagePtr pagePtr, Uint32 gci)
{
+ if (DBG_DISK)
+ ndbout << " disk_page_free " << *key << endl;
+
Uint32 page_idx= key->m_page_idx;
Uint32 logfile_group_id= fragPtrP->m_logfile_group_id;
Disk_alloc_info& alloc= fragPtrP->m_disk_alloc_info;
@@ -895,6 +907,7 @@ Dbtup::disk_page_free(Signal *signal,
{
sz = 1;
const Uint32 *src= ((Fix_page*)pagePtr.p)->get_ptr(page_idx, 0);
+ ndbassert(* (src + 1) != Tup_fixsize_page::FREE_RECORD);
lsn= disk_page_undo_free(pagePtr.p, key,
src, tabPtrP->m_offsets[DD].m_fix_header_size,
gci, logfile_group_id);
@@ -921,59 +934,40 @@ Dbtup::disk_page_free(Signal *signal,
fragPtrP->fragTableId,
fragPtrP->fragmentId,
fragPtrP->m_tablespace_id);
-
+
tsman.update_page_free_bits(key, new_bits, lsn);
}
Uint32 ext = pagePtr.p->m_extent_info_ptr;
Uint32 used = pagePtr.p->uncommitted_used_space;
+ Uint32 old_idx = pagePtr.p->list_index;
ddassert(old_free >= used);
ddassert(new_free >= used);
ddassert(new_free >= old_free);
- page_idx = pagePtr.p->list_index;
- Uint32 old_idx = page_idx & 0x7FFF;
+ ddassert((old_idx & 0x8000) == 0);
+
Uint32 new_idx = alloc.calc_page_free_bits(new_free - used);
ddassert(alloc.calc_page_free_bits(old_free - used) == old_idx);
-
+
Ptr<Extent_info> extentPtr;
c_extent_pool.getPtr(extentPtr, ext);
-
+
if (old_idx != new_idx)
{
ddassert(extentPtr.p->m_free_page_count[old_idx]);
extentPtr.p->m_free_page_count[old_idx]--;
extentPtr.p->m_free_page_count[new_idx]++;
- if (old_idx == page_idx)
- {
- ArrayPool<Page> *pool= (ArrayPool<Page>*)&m_global_page_pool;
- LocalDLList<Page> old_list(*pool, alloc.m_dirty_pages[old_idx]);
- LocalDLList<Page> new_list(*pool, alloc.m_dirty_pages[new_idx]);
- old_list.remove(pagePtr);
- new_list.add(pagePtr);
- pagePtr.p->list_index = new_idx;
- }
- else
- {
- pagePtr.p->list_index = new_idx | 0x8000;
- }
+ ArrayPool<Page> *pool= (ArrayPool<Page>*)&m_global_page_pool;
+ LocalDLList<Page> new_list(*pool, alloc.m_dirty_pages[new_idx]);
+ LocalDLList<Page> old_list(*pool, alloc.m_dirty_pages[old_idx]);
+ old_list.remove(pagePtr);
+ new_list.add(pagePtr);
+ pagePtr.p->list_index = new_idx;
}
extentPtr.p->m_free_space += sz;
- Uint32 old_pos = extentPtr.p->m_free_matrix_pos;
- if (old_pos != RNIL)
- {
- Uint32 pos= alloc.calc_extent_pos(extentPtr.p);
-
- if (pos != old_pos)
- {
- Extent_list old_list(c_extent_pool, alloc.m_free_extents[old_pos]);
- Extent_list new_list(c_extent_pool, alloc.m_free_extents[pos]);
- old_list.remove(extentPtr);
- new_list.add(extentPtr);
- extentPtr.p->m_free_matrix_pos= pos;
- }
- }
+ update_extent_pos(alloc, extentPtr);
}
void
@@ -1065,22 +1059,9 @@ Dbtup::disk_page_abort_prealloc_callback_1(Signal* signal,
}
pagePtr.p->uncommitted_used_space = used - sz;
- extentPtr.p->m_free_space += sz;
- Uint32 old_pos = extentPtr.p->m_free_matrix_pos;
- if (old_pos != RNIL)
- {
- Uint32 pos= alloc.calc_extent_pos(extentPtr.p);
-
- if (pos != old_pos)
- {
- Extent_list old_list(c_extent_pool, alloc.m_free_extents[old_pos]);
- Extent_list new_list(c_extent_pool, alloc.m_free_extents[pos]);
- old_list.remove(extentPtr);
- new_list.add(extentPtr);
- extentPtr.p->m_free_matrix_pos= pos;
- }
- }
+ extentPtr.p->m_free_space += sz;
+ update_extent_pos(alloc, extentPtr);
}
Uint64
@@ -1160,109 +1141,17 @@ Dbtup::disk_page_undo_free(Page* page, const Local_key* key,
return lsn;
}
-int
-Dbtup::disk_restart_alloc_extent(Uint32 tableId, Uint32 fragId,
- const Local_key* key, Uint32 pages)
-{
- TablerecPtr tabPtr;
- FragrecordPtr fragPtr;
- tabPtr.i = tableId;
- ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);
- if (tabPtr.p->tableStatus == DEFINED)
- {
- getFragmentrec(fragPtr, fragId, tabPtr.p);
- if (!fragPtr.isNull())
- {
- Disk_alloc_info& alloc= fragPtr.p->m_disk_alloc_info;
-
- Ptr<Extent_info> ext;
- ndbrequire(c_extent_pool.seize(ext));
-
- ext.p->m_key = *key;
- ndbout << "allocated " << pages << " pages: " << ext.p->m_key << endl;
- ext.p->m_first_page_no = ext.p->m_key.m_page_no;
- bzero(ext.p->m_free_page_count, sizeof(ext.p->m_free_page_count));
- ext.p->m_free_space= alloc.m_page_free_bits_map[0] * pages;
- ext.p->m_free_page_count[0]= pages; // All pages are "free"-est
-
- if (alloc.m_curr_extent_info_ptr_i != RNIL)
- {
- Ptr<Extent_info> old;
- c_extent_pool.getPtr(old, alloc.m_curr_extent_info_ptr_i);
- ndbassert(old.p->m_free_matrix_pos == RNIL);
- Uint32 pos= alloc.calc_extent_pos(old.p);
- Extent_list new_list(c_extent_pool, alloc.m_free_extents[pos]);
- new_list.add(old);
- old.p->m_free_matrix_pos= pos;
- }
-
- alloc.m_curr_extent_info_ptr_i = ext.i;
- ext.p->m_free_matrix_pos = RNIL;
- c_extent_hash.add(ext);
-
- LocalSLList<Extent_info, Extent_list_t>
- list1(c_extent_pool, alloc.m_extent_list);
- list1.add(ext);
- return 0;
- }
- }
-
- return -1;
-}
-
-void
-Dbtup::disk_restart_page_bits(Uint32 tableId, Uint32 fragId,
- const Local_key*, Uint32 old_bits, Uint32 bits)
-{
- TablerecPtr tabPtr;
- FragrecordPtr fragPtr;
- tabPtr.i = tableId;
- ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);
- getFragmentrec(fragPtr, fragId, tabPtr.p);
- Disk_alloc_info& alloc= fragPtr.p->m_disk_alloc_info;
-
- Ptr<Extent_info> ext;
- c_extent_pool.getPtr(ext, alloc.m_curr_extent_info_ptr_i);
-
- Uint32 size= alloc.calc_page_free_space(bits);
- Uint32 old_size= alloc.calc_page_free_space(old_bits);
-
- if (bits != old_bits)
- {
- ndbassert(ext.p->m_free_page_count[old_bits] > 0);
- ndbassert(ext.p->m_free_space >= old_size);
-
- ext.p->m_free_page_count[bits]++;
- ext.p->m_free_page_count[old_bits]--;
-
- ext.p->m_free_space += size;
- ext.p->m_free_space -= old_size;
-
- Uint32 old_pos = ext.p->m_free_matrix_pos;
- if (old_pos != RNIL)
- {
- Uint32 pos= alloc.calc_extent_pos(ext.p);
-
- if (pos != old_pos)
- {
- Extent_list old_list(c_extent_pool, alloc.m_free_extents[old_pos]);
- Extent_list new_list(c_extent_pool, alloc.m_free_extents[pos]);
- old_list.remove(ext);
- new_list.add(ext);
- ext.p->m_free_matrix_pos= pos;
- }
- }
- }
-}
-
#include <signaldata/LgmanContinueB.hpp>
static Dbtup::Apply_undo f_undo;
+#define DBG_UNDO 0
+
void
Dbtup::disk_restart_undo(Signal* signal, Uint64 lsn,
Uint32 type, const Uint32 * ptr, Uint32 len)
{
+ f_undo_done = false;
f_undo.m_lsn= lsn;
f_undo.m_ptr= ptr;
f_undo.m_len= len;
@@ -1285,7 +1174,7 @@ Dbtup::disk_restart_undo(Signal* signal, Uint64 lsn,
Disk_undo::Alloc* rec= (Disk_undo::Alloc*)ptr;
preq.m_page.m_page_no = rec->m_page_no;
preq.m_page.m_file_no = rec->m_file_no_page_idx >> 16;
- preq.m_page.m_page_idx = rec->m_file_no_page_idx & 0xFFFF;
+ preq.m_page.m_page_idx = rec->m_file_no_page_idx & 0xFFFF;
break;
}
case File_formats::Undofile::UNDO_TUP_UPDATE:
@@ -1319,6 +1208,9 @@ Dbtup::disk_restart_undo(Signal* signal, Uint64 lsn,
disk_restart_undo_next(signal);
return;
}
+ case File_formats::Undofile::UNDO_END:
+ f_undo_done = true;
+ return;
default:
ndbrequire(false);
}
@@ -1327,7 +1219,7 @@ Dbtup::disk_restart_undo(Signal* signal, Uint64 lsn,
preq.m_callback.m_callbackFunction =
safe_cast(&Dbtup::disk_restart_undo_callback);
- int flags = Page_cache_client::NO_HOOK;
+ int flags = 0;
int res= m_pgman.get_page(signal, preq, flags);
switch(res)
{
@@ -1372,150 +1264,93 @@ Dbtup::disk_restart_undo_callback(Signal* signal,
Uint32 page_id)
{
jamEntry();
- Ptr<GlobalPage> page;
- m_global_page_pool.getPtr(page, page_id);
-
- Page* pageP = (Page*)page.p;
+ Ptr<GlobalPage> gpage;
+ m_global_page_pool.getPtr(gpage, page_id);
+ Ptr<Page> pagePtr = *(Ptr<Page>*)&gpage;
+ Apply_undo* undo = &f_undo;
+
bool update = false;
- if (! (pageP->list_index & 0x8000) ||
- pageP->nextList != RNIL ||
- pageP->prevList != RNIL)
+ if (! (pagePtr.p->list_index & 0x8000) ||
+ pagePtr.p->nextList != RNIL ||
+ pagePtr.p->prevList != RNIL)
{
update = true;
- pageP->list_index |= 0x8000;
- pageP->nextList = pageP->prevList = RNIL;
+ pagePtr.p->list_index |= 0x8000;
+ pagePtr.p->nextList = pagePtr.p->prevList = RNIL;
}
-
- Ptr<Tablerec> tabPtr;
- tabPtr.i= pageP->m_table_id;
- ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);
- if (tabPtr.p->tableStatus != DEFINED)
+ Uint32 tableId= pagePtr.p->m_table_id;
+ Uint32 fragId = pagePtr.p->m_fragment_id;
+
+ if (tableId >= cnoOfTablerec)
{
disk_restart_undo_next(signal);
return;
}
-
- Ptr<Fragrecord> fragPtr;
- getFragmentrec(fragPtr, pageP->m_fragment_id, tabPtr.p);
- if(fragPtr.isNull())
+ undo->m_table_ptr.i = tableId;
+ ptrCheckGuard(undo->m_table_ptr, cnoOfTablerec, tablerec);
+
+ if (undo->m_table_ptr.p->tableStatus != DEFINED)
+ {
+ disk_restart_undo_next(signal);
+ return;
+ }
+
+ getFragmentrec(undo->m_fragment_ptr, fragId, undo->m_table_ptr.p);
+ if(undo->m_fragment_ptr.isNull())
{
disk_restart_undo_next(signal);
return;
}
-
- Local_key key;
- key.m_page_no = pageP->m_page_no;
- key.m_file_no = pageP->m_file_no;
- if (pageP->m_restart_seq != globalData.m_restart_seq)
+ if (undo->m_fragment_ptr.p->m_undo_complete)
{
- {
- Extent_info key;
- key.m_key.m_file_no = pageP->m_file_no;
- key.m_key.m_page_idx = pageP->m_extent_no;
- Ptr<Extent_info> extentPtr;
- if (c_extent_hash.find(extentPtr, key))
- {
- pageP->m_extent_info_ptr = extentPtr.i;
- }
- else
- {
- /**
- * Extent was not allocated at start of LCP
- * (or was freed during)
- * I.e page does not need to be undoed as it's
- * really free
- */
- disk_restart_undo_next(signal);
- return;
- }
- }
-
- update= true;
- pageP->m_restart_seq = globalData.m_restart_seq;
- pageP->uncommitted_used_space = 0;
-
- Disk_alloc_info& alloc= fragPtr.p->m_disk_alloc_info;
- Uint32 idx= alloc.calc_page_free_bits(pageP->free_space);
-
- pageP->list_index = idx | 0x8000;
-
+ disk_restart_undo_next(signal);
+ return;
}
+ Local_key key;
+ key.m_page_no = pagePtr.p->m_page_no;
+ key.m_file_no = pagePtr.p->m_file_no;
+
Uint64 lsn = 0;
- lsn += pageP->m_page_header.m_page_lsn_hi; lsn <<= 32;
- lsn += pageP->m_page_header.m_page_lsn_lo;
+ lsn += pagePtr.p->m_page_header.m_page_lsn_hi; lsn <<= 32;
+ lsn += pagePtr.p->m_page_header.m_page_lsn_lo;
- if (f_undo.m_lsn <= lsn)
+ if (undo->m_lsn <= lsn)
{
- Uint32 tableId= pageP->m_table_id;
- Uint32 fragId = pageP->m_fragment_id;
+ undo->m_page_ptr = pagePtr;
- f_undo.m_table_ptr.i= tableId;
- if (tableId < cnoOfTablerec)
- {
- ptrCheckGuard(f_undo.m_table_ptr, cnoOfTablerec, tablerec);
-
- if (f_undo.m_table_ptr.p->tableStatus == DEFINED)
- {
- getFragmentrec(f_undo.m_fragment_ptr, fragId, f_undo.m_table_ptr.p);
- if (!f_undo.m_fragment_ptr.isNull())
- {
- if (!f_undo.m_fragment_ptr.p->m_undo_complete)
- {
- f_undo.m_page_ptr.i = page_id;
- f_undo.m_page_ptr.p = pageP;
-
- update = true;
- ndbout_c("applying %lld", f_undo.m_lsn);
- /**
- * Apply undo record
- */
- switch(f_undo.m_type){
- case File_formats::Undofile::UNDO_TUP_ALLOC:
- disk_restart_undo_alloc(&f_undo);
- break;
- case File_formats::Undofile::UNDO_TUP_UPDATE:
- disk_restart_undo_update(&f_undo);
- break;
- case File_formats::Undofile::UNDO_TUP_FREE:
- disk_restart_undo_free(&f_undo);
- break;
- default:
- ndbrequire(false);
- }
-
- disk_restart_undo_page_bits(&f_undo);
-
- lsn = f_undo.m_lsn - 1; // make sure undo isn't run again...
- }
- else
- {
- ndbout_c("lsn %lld frag undo complete", f_undo.m_lsn);
- }
- }
- else
- {
- ndbout_c("lsn %lld table not defined", f_undo.m_lsn);
- }
- }
- else
- {
- ndbout_c("lsn %lld no such table", f_undo.m_lsn);
- }
- }
- else
- {
- ndbout_c("f_undo.m_lsn %lld > lsn %lld -> skip",
- f_undo.m_lsn, lsn);
+ update = true;
+ if (DBG_UNDO)
+ ndbout_c("applying %lld", undo->m_lsn);
+ /**
+ * Apply undo record
+ */
+ switch(undo->m_type){
+ case File_formats::Undofile::UNDO_TUP_ALLOC:
+ disk_restart_undo_alloc(undo);
+ break;
+ case File_formats::Undofile::UNDO_TUP_UPDATE:
+ disk_restart_undo_update(undo);
+ break;
+ case File_formats::Undofile::UNDO_TUP_FREE:
+ disk_restart_undo_free(undo);
+ break;
+ default:
+ ndbrequire(false);
}
+
+ if (DBG_UNDO)
+ ndbout << "disk_restart_undo: " << undo->m_type << " "
+ << undo->m_key << endl;
- if (update)
- {
- m_pgman.update_lsn(f_undo.m_key, lsn);
- }
+ disk_restart_undo_page_bits(signal, undo);
+
+ lsn = undo->m_lsn - 1; // make sure undo isn't run again...
+
+ m_pgman.update_lsn(undo->m_key, lsn);
}
disk_restart_undo_next(signal);
@@ -1578,7 +1413,7 @@ Dbtup::disk_restart_undo_free(Apply_undo* undo)
}
void
-Dbtup::disk_restart_undo_page_bits(Apply_undo* undo)
+Dbtup::disk_restart_undo_page_bits(Signal* signal, Apply_undo* undo)
{
Fragrecord* fragPtrP = undo->m_fragment_ptr.p;
Disk_alloc_info& alloc= fragPtrP->m_disk_alloc_info;
@@ -1587,46 +1422,86 @@ Dbtup::disk_restart_undo_page_bits(Apply_undo* undo)
* Set alloc.m_curr_extent_info_ptr_i to
* current this extent (and move old extend into free matrix)
*/
- Ptr<Extent_info> extentPtr;
- c_extent_pool.getPtr(extentPtr, undo->m_page_ptr.p->m_extent_info_ptr);
+ Page* pageP = undo->m_page_ptr.p;
+ Uint32 free = pageP->free_space;
+ Uint32 new_bits = alloc.calc_page_free_bits(free);
+ pageP->list_index = 0x8000 | new_bits;
- Uint32 currExtI = alloc.m_curr_extent_info_ptr_i;
- if (extentPtr.i != currExtI && currExtI != RNIL)
- {
- Ptr<Extent_info> currExtPtr;
- c_extent_pool.getPtr(currExtPtr, currExtI);
- ndbrequire(currExtPtr.p->m_free_matrix_pos == RNIL);
-
- Uint32 pos= alloc.calc_extent_pos(currExtPtr.p);
- Extent_list new_list(c_extent_pool, alloc.m_free_extents[pos]);
- new_list.add(currExtPtr);
- currExtPtr.p->m_free_matrix_pos= pos;
- //ndbout_c("moving extent from %d to %d", old_pos, new_pos);
- }
+ Tablespace_client tsman(signal, c_tsman,
+ fragPtrP->fragTableId,
+ fragPtrP->fragmentId,
+ fragPtrP->m_tablespace_id);
- if (extentPtr.i != currExtI)
- {
- Uint32 old_pos = extentPtr.p->m_free_matrix_pos;
- Extent_list old_list(c_extent_pool, alloc.m_free_extents[old_pos]);
- old_list.remove(extentPtr);
- alloc.m_curr_extent_info_ptr_i = extentPtr.i;
- extentPtr.p->m_free_matrix_pos = RNIL;
- }
- else
+ tsman.restart_undo_page_free_bits(&undo->m_key, new_bits, undo->m_lsn);
+}
+
+int
+Dbtup::disk_restart_alloc_extent(Uint32 tableId, Uint32 fragId,
+ const Local_key* key, Uint32 pages)
+{
+ TablerecPtr tabPtr;
+ FragrecordPtr fragPtr;
+ tabPtr.i = tableId;
+ ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);
+ if (tabPtr.p->tableStatus == DEFINED)
{
- ndbrequire(extentPtr.p->m_free_matrix_pos == RNIL);
+ getFragmentrec(fragPtr, fragId, tabPtr.p);
+ if (!fragPtr.isNull())
+ {
+ Disk_alloc_info& alloc= fragPtr.p->m_disk_alloc_info;
+
+ Ptr<Extent_info> ext;
+ ndbrequire(c_extent_pool.seize(ext));
+
+ ndbout << "allocated " << pages << " pages: " << *key << endl;
+
+ ext.p->m_key = *key;
+ ext.p->m_first_page_no = ext.p->m_key.m_page_no;
+ ext.p->m_free_space= 0;
+ bzero(ext.p->m_free_page_count, sizeof(ext.p->m_free_page_count));
+
+ if (alloc.m_curr_extent_info_ptr_i != RNIL)
+ {
+ Ptr<Extent_info> old;
+ c_extent_pool.getPtr(old, alloc.m_curr_extent_info_ptr_i);
+ ndbassert(old.p->m_free_matrix_pos == RNIL);
+ Uint32 pos= alloc.calc_extent_pos(old.p);
+ Extent_list new_list(c_extent_pool, alloc.m_free_extents[pos]);
+ new_list.add(old);
+ old.p->m_free_matrix_pos= pos;
+ }
+
+ alloc.m_curr_extent_info_ptr_i = ext.i;
+ ext.p->m_free_matrix_pos = RNIL;
+ c_extent_hash.add(ext);
+
+ LocalSLList<Extent_info, Extent_list_t>
+ list1(c_extent_pool, alloc.m_extent_list);
+ list1.add(ext);
+ return 0;
+ }
}
- /**
- * Compute and update free bits for this page
- */
- Uint32 free = undo->m_page_ptr.p->free_space;
- Uint32 bits = alloc.calc_page_free_bits(free);
-
- Tablespace_client tsman(0, c_tsman,
- fragPtrP->fragTableId,
- fragPtrP->fragmentId,
- fragPtrP->m_tablespace_id);
+ return -1;
+}
- tsman.restart_undo_page_free_bits(&undo->m_key, bits, undo->m_lsn);
+void
+Dbtup::disk_restart_page_bits(Uint32 tableId, Uint32 fragId,
+ const Local_key*, Uint32 bits)
+{
+ TablerecPtr tabPtr;
+ FragrecordPtr fragPtr;
+ tabPtr.i = tableId;
+ ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);
+ getFragmentrec(fragPtr, fragId, tabPtr.p);
+ Disk_alloc_info& alloc= fragPtr.p->m_disk_alloc_info;
+
+ Ptr<Extent_info> ext;
+ c_extent_pool.getPtr(ext, alloc.m_curr_extent_info_ptr_i);
+
+ Uint32 size= alloc.calc_page_free_space(bits);
+
+ ext.p->m_free_space += size;
+ ext.p->m_free_page_count[bits]++;
+ ndbassert(ext.p->m_free_matrix_pos == RNIL);
}
diff --git a/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp b/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp
index bc186e53492..0644d620eab 100644
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp
@@ -484,8 +484,6 @@ Dbtup::load_diskpage(Signal* signal,
req.m_callback.m_callbackFunction=
safe_cast(&Dbtup::disk_page_load_callback);
- // Make sure we maintain order
- flags |= Page_cache_client::STRICT_ORDER;
if((res= m_pgman.get_page(signal, req, flags)) > 0)
{
//ndbout_c("in cache");
@@ -563,8 +561,6 @@ Dbtup::load_diskpage_scan(Signal* signal,
req.m_callback.m_callbackFunction=
safe_cast(&Dbtup::disk_page_load_scan_callback);
- // Make sure we maintain order
- flags |= Page_cache_client::STRICT_ORDER;
if((res= m_pgman.get_page(signal, req, flags)) > 0)
{
// ndbout_c("in cache");
@@ -3111,8 +3107,7 @@ Dbtup::nr_delete(Signal* signal, Uint32 senderData,
preq.m_callback.m_callbackData = senderData;
preq.m_callback.m_callbackFunction =
safe_cast(&Dbtup::nr_delete_page_callback);
- int flags = Page_cache_client::COMMIT_REQ |
- Page_cache_client::STRICT_ORDER;
+ int flags = Page_cache_client::COMMIT_REQ;
res = m_pgman.get_page(signal, preq, flags);
if (res == 0)
{
diff --git a/storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp b/storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp
index 177d5c7cc08..3fc6184f63e 100644
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp
@@ -694,10 +694,11 @@ Dbtup::scanNext(Signal* signal, ScanOpPtr scanPtr)
frag.fragTableId,
frag.fragmentId,
frag.m_tablespace_id);
- unsigned bits = ~(unsigned)0;
- int ret = tsman.get_page_free_bits(&key, &bits);
+ unsigned uncommitted, committed;
+ uncommitted = committed = ~(unsigned)0;
+ int ret = tsman.get_page_free_bits(&key, &uncommitted, &committed);
ndbrequire(ret == 0);
- if (bits == 0) {
+ if (committed == 0) {
// skip empty page
jam();
pos.m_get = ScanPos::Get_next_page_dd;
@@ -710,7 +711,7 @@ Dbtup::scanNext(Signal* signal, ScanOpPtr scanPtr)
preq.m_callback.m_callbackData = scanPtr.i;
preq.m_callback.m_callbackFunction =
safe_cast(&Dbtup::disk_page_tup_scan_callback);
- int flags = Page_cache_client::STRICT_ORDER;
+ int flags = 0;
int res = m_pgman.get_page(signal, preq, flags);
if (res == 0) {
jam();
diff --git a/storage/ndb/src/kernel/blocks/diskpage.hpp b/storage/ndb/src/kernel/blocks/diskpage.hpp
index 108352f8fb7..044597be1cf 100644
--- a/storage/ndb/src/kernel/blocks/diskpage.hpp
+++ b/storage/ndb/src/kernel/blocks/diskpage.hpp
@@ -222,7 +222,7 @@ File_formats::Datafile::Extent_header::check_free(Uint32 extent_size) const
for(; words; words--)
sum |= m_page_bitmask[words-1];
- if(sum & 0x7777)
+ if(sum & 0x3333)
return false;
return true;
diff --git a/storage/ndb/src/kernel/blocks/lgman.cpp b/storage/ndb/src/kernel/blocks/lgman.cpp
index fb01584a52e..1e908e8f8e0 100644
--- a/storage/ndb/src/kernel/blocks/lgman.cpp
+++ b/storage/ndb/src/kernel/blocks/lgman.cpp
@@ -2887,6 +2887,9 @@ Lgman::stop_run_undo_log(Signal* signal)
void
Lgman::execEND_LCP_CONF(Signal* signal)
{
+ Dbtup* tup= (Dbtup*)globalData.getBlock(DBTUP);
+ tup->disk_restart_undo(signal, 0, File_formats::Undofile::UNDO_END, 0, 0);
+
/**
* pgman has completed flushing all pages
*
diff --git a/storage/ndb/src/kernel/blocks/pgman.cpp b/storage/ndb/src/kernel/blocks/pgman.cpp
index 71cf283e75b..3723cf2e74c 100644
--- a/storage/ndb/src/kernel/blocks/pgman.cpp
+++ b/storage/ndb/src/kernel/blocks/pgman.cpp
@@ -70,9 +70,10 @@ Pgman::Pgman(const Configuration & conf) :
addRecSignal(GSN_FSWRITEREF, &Pgman::execFSWRITEREF, true);
addRecSignal(GSN_FSWRITECONF, &Pgman::execFSWRITECONF);
+ addRecSignal(GSN_LCP_PREPARE_REQ, &Pgman::execLCP_PREPARE_REQ);
addRecSignal(GSN_LCP_FRAG_ORD, &Pgman::execLCP_FRAG_ORD);
addRecSignal(GSN_END_LCP_REQ, &Pgman::execEND_LCP_REQ);
-
+
// loop status
m_stats_loop_on = false;
m_busy_loop_on = false;
@@ -84,8 +85,6 @@ Pgman::Pgman(const Configuration & conf) :
m_last_lcp_complete = 0;
m_lcp_curr_bucket = ~(Uint32)0;
m_lcp_outstanding = 0;
- m_lcp_copy_page = RNIL;
- m_lcp_copy_page_free = false;
// clean-up variables
m_cleanup_ptr.i = RNIL;
@@ -175,10 +174,6 @@ Pgman::execSTTOR(Signal* signal)
break;
case 3:
{
- Ptr<GlobalPage> page_ptr;
- ndbrequire(m_global_page_pool.seize(page_ptr));
- m_lcp_copy_page = page_ptr.i;
- m_lcp_copy_page_free = true;
// start forever loops
do_stats_loop(signal);
do_cleanup_loop(signal);
@@ -229,6 +224,23 @@ Pgman::execCONTINUEB(Signal* signal)
jam();
do_lcp_loop(signal);
break;
+ case PgmanContinueB::LCP_PREPARE:
+ {
+ jam();
+ Ptr<Page_entry> ptr;
+ Page_sublist& pl = *m_page_sublist[Page_entry::SL_LOCKED];
+ pl.getPtr(ptr, data1);
+ if (pl.next(ptr))
+ {
+ process_lcp_prepare(signal, ptr);
+ }
+ else
+ {
+ signal->theData[0] = 0;
+ sendSignal(DBLQH_REF, GSN_LCP_PREPARE_CONF, signal, 1, JBB);
+ }
+ return;
+ }
default:
ndbrequire(false);
break;
@@ -242,8 +254,10 @@ Pgman::Page_entry::Page_entry(Uint32 file_no, Uint32 page_no) :
m_file_no(file_no),
m_page_no(page_no),
m_real_page_i(RNIL),
+ m_copy_page_i(RNIL),
m_lsn(0),
m_last_lcp(0),
+ m_dirty_count(0),
m_busy_count(0),
m_requests()
{
@@ -252,7 +266,7 @@ Pgman::Page_entry::Page_entry(Uint32 file_no, Uint32 page_no) :
// page lists
Uint32
-Pgman::get_sublist_no(Uint16 state)
+Pgman::get_sublist_no(Page_state state)
{
if (state == 0)
{
@@ -290,14 +304,14 @@ Pgman::get_sublist_no(Uint16 state)
}
void
-Pgman::set_page_state(Ptr<Page_entry> ptr, Uint16 new_state)
+Pgman::set_page_state(Ptr<Page_entry> ptr, Page_state new_state)
{
#ifdef VM_TRACE
debugOut << "PGMAN: >set_page_state: state=" << hex << new_state << endl;
debugOut << "PGMAN: " << ptr << ": before" << endl;
#endif
- Uint16 old_state = ptr.p->m_state;
+ Page_state old_state = ptr.p->m_state;
if (old_state != new_state)
{
Uint32 old_list_no = get_sublist_no(old_state);
@@ -424,7 +438,7 @@ Pgman::release_page_entry(Ptr<Page_entry>& ptr)
debugOut << "PGMAN: release_page_entry" << endl;
debugOut << "PGMAN: " << ptr << endl;
#endif
- Uint16 state = ptr.p->m_state;
+ Page_state state = ptr.p->m_state;
ndbrequire(! (state & Page_entry::REQUEST));
ndbrequire(ptr.p->m_requests.isEmpty());
@@ -459,7 +473,7 @@ Pgman::lirs_stack_prune()
while (pl_stack.first(ptr)) // first is stack bottom
{
- Uint16 state = ptr.p->m_state;
+ Page_state state = ptr.p->m_state;
if (state & Page_entry::HOT)
{
jam();
@@ -514,7 +528,7 @@ Pgman::lirs_stack_pop()
Ptr<Page_entry> ptr;
bool ok = pl_stack.first(ptr);
ndbrequire(ok);
- Uint16 state = ptr.p->m_state;
+ Page_state state = ptr.p->m_state;
#ifdef VM_TRACE
debugOut << "PGMAN: " << ptr << ": pop from stack" << endl;
@@ -557,7 +571,7 @@ Pgman::lirs_reference(Ptr<Page_entry> ptr)
Page_stack& pl_stack = m_page_stack;
Page_queue& pl_queue = m_page_queue;
- Uint16 state = ptr.p->m_state;
+ Page_state state = ptr.p->m_state;
ndbrequire(! (state & Page_entry::LOCKED));
// even non-LIRS cache pages are counted on l.h.s.
@@ -811,7 +825,7 @@ Pgman::process_bind(Signal* signal, Ptr<Page_entry> ptr)
// XXX busy loop
return false;
}
- Uint16 clean_state = clean_ptr.p->m_state;
+ Page_state clean_state = clean_ptr.p->m_state;
// under unusual circumstances it could still be paging in
if (! (clean_state & Page_entry::MAPPED) ||
clean_state & Page_entry::DIRTY ||
@@ -830,6 +844,7 @@ Pgman::process_bind(Signal* signal, Ptr<Page_entry> ptr)
debugOut << "PGMAN: " << clean_ptr << " : evict" << endl;
#endif
+ ndbassert(clean_ptr.p->m_dirty_count == 0);
ndbrequire(clean_state & Page_entry::ONQUEUE);
ndbrequire(clean_state & Page_entry::BOUND);
ndbrequire(clean_state & Page_entry::MAPPED);
@@ -840,7 +855,6 @@ Pgman::process_bind(Signal* signal, Ptr<Page_entry> ptr)
gptr.i = clean_ptr.p->m_real_page_i;
- c_tup->disk_page_unmap_callback(clean_ptr.p->m_real_page_i);
clean_ptr.p->m_real_page_i = RNIL;
clean_state &= ~ Page_entry::BOUND;
clean_state &= ~ Page_entry::MAPPED;
@@ -853,7 +867,7 @@ Pgman::process_bind(Signal* signal, Ptr<Page_entry> ptr)
m_global_page_pool.getPtr(gptr);
}
- Uint16 state = ptr.p->m_state;
+ Page_state state = ptr.p->m_state;
ptr.p->m_real_page_i = gptr.i;
state |= Page_entry::BOUND;
@@ -952,7 +966,7 @@ Pgman::process_callback(Signal* signal, Ptr<Page_entry> ptr)
debugOut << "PGMAN: " << ptr << " : process_callback" << endl;
#endif
int max_count = 1;
- Uint16 state = ptr.p->m_state;
+ Page_state state = ptr.p->m_state;
while (! ptr.p->m_requests.isEmpty() && --max_count >= 0)
{
@@ -981,19 +995,19 @@ Pgman::process_callback(Signal* signal, Ptr<Page_entry> ptr)
{
jam();
state |= Page_entry::DIRTY;
+ ndbassert(ptr.p->m_dirty_count);
+ ptr.p->m_dirty_count --;
}
}
ndbrequire(state & Page_entry::BOUND);
ndbrequire(state & Page_entry::MAPPED);
-
+
// callback may re-enter PGMAN and change page state
set_page_state(ptr, state);
b->execute(signal, callback, ptr.p->m_real_page_i);
state = ptr.p->m_state;
-
- state &= ~ Page_entry::NO_HOOK;
}
-
+
if (ptr.p->m_requests.isEmpty())
{
jam();
@@ -1040,7 +1054,7 @@ Pgman::process_cleanup(Signal* signal)
Ptr<Page_entry> ptr = m_cleanup_ptr;
while (max_loop_count != 0 && max_count != 0)
{
- Uint16 state = ptr.p->m_state;
+ Page_state state = ptr.p->m_state;
ndbrequire(! (state & Page_entry::LOCKED));
if (state & Page_entry::BUSY)
{
@@ -1057,6 +1071,8 @@ Pgman::process_cleanup(Signal* signal)
#ifdef VM_TRACE
debugOut << "PGMAN: " << ptr << " : process_cleanup" << endl;
#endif
+ c_tup->disk_page_unmap_callback(ptr.p->m_real_page_i,
+ ptr.p->m_dirty_count);
pageout(signal, ptr);
max_count--;
}
@@ -1090,6 +1106,91 @@ Pgman::move_cleanup_ptr(Ptr<Page_entry> ptr)
// LCP
void
+Pgman::execLCP_PREPARE_REQ(Signal* signal)
+{
+ jamEntry();
+
+ /**
+ * Reserve pages for all LOCKED pages...
+ */
+ Ptr<Page_entry> ptr;
+ Page_sublist& pl = *m_page_sublist[Page_entry::SL_LOCKED];
+ if (pl.first(ptr))
+ {
+ process_lcp_prepare(signal, ptr);
+ }
+ else
+ {
+ signal->theData[0] = 0;
+ sendSignal(DBLQH_REF, GSN_LCP_PREPARE_CONF, signal, 1, JBB);
+ }
+}
+
+void
+Pgman::process_lcp_prepare(Signal* signal, Ptr<Page_entry> ptr)
+{
+ ndbrequire(ptr.p->m_copy_page_i == RNIL);
+
+ Ptr<GlobalPage> copy;
+ ndbrequire(m_global_page_pool.seize(copy));
+ ptr.p->m_copy_page_i = copy.i;
+
+ DBG_LCP("assigning copy page to " << ptr << endl);
+
+ signal->theData[0] = PgmanContinueB::LCP_PREPARE;
+ signal->theData[1] = ptr.i;
+ sendSignal(PGMAN_REF, GSN_CONTINUEB, signal, 2, JBB);
+}
+
+int
+Pgman::create_copy_page(Ptr<Page_entry> ptr, Uint32 req_flags)
+{
+ DBG_LCP(<< ptr << " create_copy_page ");
+
+ if (! (req_flags & DIRTY_FLAGS) && ! (ptr.p->m_state & Page_entry::COPY))
+ {
+ DBG_LCP(" return original" << endl);
+ return ptr.p->m_real_page_i;
+ }
+ if (! (ptr.p->m_state & Page_entry::COPY))
+ {
+ ptr.p->m_state |= Page_entry::COPY;
+
+ Ptr<GlobalPage> src;
+ Ptr<GlobalPage> copy;
+ m_global_page_pool.getPtr(src, ptr.p->m_real_page_i);
+ m_global_page_pool.getPtr(copy, ptr.p->m_copy_page_i);
+ memcpy(copy.p, src.p, sizeof(GlobalPage));
+ DBG_LCP("making copy... ");
+ }
+ DBG_LCP("return " << ptr.p->m_copy_page_i);
+ return ptr.p->m_copy_page_i;
+}
+
+void
+Pgman::restore_copy_page(Ptr<Page_entry> ptr)
+{
+ DBG_LCP(ptr << " restore_copy_page");
+
+ Uint32 copyPtrI = ptr.p->m_copy_page_i;
+ if (ptr.p->m_state & Page_entry::COPY)
+ {
+ DBG_LCP(" copy back");
+ Ptr<GlobalPage> src;
+ Ptr<GlobalPage> copy;
+ m_global_page_pool.getPtr(src, ptr.p->m_real_page_i);
+ m_global_page_pool.getPtr(copy, copyPtrI);
+ memcpy(src.p, copy.p, sizeof(GlobalPage));
+ }
+
+ m_global_page_pool.release(copyPtrI);
+ DBG_LCP(endl);
+
+ ptr.p->m_state &= ~Page_entry::COPY;
+ ptr.p->m_copy_page_i = RNIL;
+}
+
+void
Pgman::execLCP_FRAG_ORD(Signal* signal)
{
LcpFragOrd* ord = (LcpFragOrd*)signal->getDataPtr();
@@ -1098,16 +1199,15 @@ Pgman::execLCP_FRAG_ORD(Signal* signal)
DBG_LCP("execLCP_FRAG_ORD" << endl);
ndbrequire(!m_lcp_outstanding);
- ndbrequire(m_lcp_copy_page_free);
m_lcp_curr_bucket = 0;
-
+
#ifdef VM_TRACE
debugOut
<< "PGMAN: execLCP_FRAG_ORD"
<< " this=" << m_last_lcp << " last_complete=" << m_last_lcp_complete
<< " bucket=" << m_lcp_curr_bucket << endl;
#endif
-
+
do_lcp_loop(signal, true);
}
@@ -1168,7 +1268,7 @@ Pgman::process_lcp(Signal* signal)
(loop ++ < 32 || iter.bucket == m_lcp_curr_bucket))
{
Ptr<Page_entry>& ptr = iter.curr;
- Uint16 state = ptr.p->m_state;
+ Page_state state = ptr.p->m_state;
DBG_LCP("LCP "
<< " m_lcp_outstanding: " << m_lcp_outstanding
@@ -1190,28 +1290,7 @@ Pgman::process_lcp(Signal* signal)
DBG_LCP(" BUSY" << endl);
break; // wait for it
}
- if (state & Page_entry::LOCKED)
- {
- /**
- * Special handling of LOCKED pages...only write 1 at a time...
- * using copy page (m_lcp_copy_page)
- */
- if (!m_lcp_copy_page_free)
- {
- DBG_LCP(" !m_lcp_copy_page_free" << endl);
- break;
- }
- m_lcp_copy_page_free = false;
- Ptr<GlobalPage> src, copy;
- m_global_page_pool.getPtr(copy, m_lcp_copy_page);
- m_global_page_pool.getPtr(src, ptr.p->m_real_page_i);
- memcpy(copy.p, src.p, sizeof(GlobalPage));
- ptr.p->m_real_page_i = copy.i;
- ptr.p->m_copy_real_page_i = src.i;
- ptr.p->m_state |= Page_entry::LCP;
- pageout(signal, ptr);
- }
- else if (state & Page_entry::PAGEOUT)
+ if (state & Page_entry::PAGEOUT)
{
DBG_LCP(" PAGEOUT -> state |= LCP" << endl);
set_page_state(ptr, state | Page_entry::LCP);
@@ -1220,18 +1299,25 @@ Pgman::process_lcp(Signal* signal)
{
DBG_LCP(" pageout()" << endl);
ptr.p->m_state |= Page_entry::LCP;
+ c_tup->disk_page_unmap_callback(ptr.p->m_real_page_i,
+ ptr.p->m_dirty_count);
pageout(signal, ptr);
}
ptr.p->m_last_lcp = m_last_lcp;
m_lcp_outstanding++;
}
+ else if (ptr.p->m_copy_page_i != RNIL)
+ {
+ DBG_LCP(" NOT DIRTY" << endl);
+ restore_copy_page(ptr);
+ }
else
{
DBG_LCP(" NOT DIRTY" << endl);
}
pl_hash.next(iter);
}
-
+
m_lcp_curr_bucket = (iter.curr.i != RNIL ? iter.bucket : ~(Uint32)0);
}
@@ -1278,17 +1364,10 @@ Pgman::fsreadconf(Signal* signal, Ptr<Page_entry> ptr)
debugOut << "PGMAN: " << ptr << endl;
#endif
ndbrequire(ptr.p->m_state & Page_entry::PAGEIN);
- Uint16 state = ptr.p->m_state;
+ Page_state state = ptr.p->m_state;
- if (!(state & Page_entry::NO_HOOK) &&
- c_tup->disk_page_load_hook(ptr.p->m_real_page_i))
- {
- state |= Page_entry::DIRTY;
- }
-
state &= ~ Page_entry::PAGEIN;
state &= ~ Page_entry::EMPTY;
- state &= ~ Page_entry::NO_HOOK;
state |= Page_entry::MAPPED;
set_page_state(ptr, state);
@@ -1307,16 +1386,14 @@ Pgman::pageout(Signal* signal, Ptr<Page_entry> ptr)
debugOut << "PGMAN: " << ptr << endl;
#endif
- Uint16 state = ptr.p->m_state;
+ Page_state state = ptr.p->m_state;
ndbrequire(state & Page_entry::BOUND);
ndbrequire(state & Page_entry::MAPPED);
ndbrequire(! (state & Page_entry::BUSY));
ndbrequire(! (state & Page_entry::PAGEOUT));
- state &= ~ Page_entry::NO_HOOK;
state |= Page_entry::PAGEOUT;
- c_tup->disk_page_unmap_callback(ptr.p->m_real_page_i);
-
+
// update lsn on page prior to write
Ptr<GlobalPage> pagePtr;
m_global_page_pool.getPtr(pagePtr, ptr.p->m_real_page_i);
@@ -1355,7 +1432,7 @@ Pgman::logsync_callback(Signal* signal, Uint32 ptrI, Uint32 res)
#endif
// it is OK to be "busy" at this point (the commit is queued)
- Uint16 state = ptr.p->m_state;
+ Page_state state = ptr.p->m_state;
ndbrequire(state & Page_entry::PAGEOUT);
ndbrequire(state & Page_entry::LOGSYNC);
state &= ~ Page_entry::LOGSYNC;
@@ -1373,7 +1450,7 @@ Pgman::fswriteconf(Signal* signal, Ptr<Page_entry> ptr)
debugOut << "PGMAN: " << ptr << endl;
#endif
- Uint16 state = ptr.p->m_state;
+ Page_state state = ptr.p->m_state;
ndbrequire(state & Page_entry::PAGEOUT);
state &= ~ Page_entry::PAGEOUT;
@@ -1383,22 +1460,20 @@ Pgman::fswriteconf(Signal* signal, Ptr<Page_entry> ptr)
ndbrequire(m_stats.m_current_io_waits > 0);
m_stats.m_current_io_waits--;
- if (state & Page_entry::LOCKED)
+ if (ptr.p->m_copy_page_i != RNIL)
{
jam();
- ndbrequire(!m_lcp_copy_page_free);
- m_lcp_copy_page_free = true;
- ptr.p->m_real_page_i = ptr.p->m_copy_real_page_i;
- ptr.p->m_copy_real_page_i = RNIL;
+ restore_copy_page(ptr);
+ state &= ~ Page_entry::COPY;
}
-
+
if (state & Page_entry::LCP)
{
ndbrequire(m_lcp_outstanding);
m_lcp_outstanding--;
+ state &= ~ Page_entry::LCP;
}
- state &= ~ Page_entry::LCP;
-
+
set_page_state(ptr, state);
do_busy_loop(signal, true);
}
@@ -1511,7 +1586,7 @@ Pgman::get_page(Signal* signal, Ptr<Page_entry> ptr, Page_request page_req)
//ndbrequire(ptr.p->m_requests.isEmpty());
}
- Uint16 state = ptr.p->m_state;
+ Page_state state = ptr.p->m_state;
bool is_new = (state == 0);
bool busy_count = false;
@@ -1545,8 +1620,21 @@ Pgman::get_page(Signal* signal, Ptr<Page_entry> ptr, Page_request page_req)
state = ptr.p->m_state;
}
+ const Page_state LOCKED = Page_entry::LOCKED | Page_entry::MAPPED;
+ if ((state & LOCKED) == LOCKED &&
+ ! (req_flags & Page_request::UNLOCK_PAGE))
+ {
+ ptr.p->m_state |= (req_flags & DIRTY_FLAGS ? Page_entry::DIRTY : 0);
+ if (m_lcp_loop_on && ptr.p->m_copy_page_i != RNIL)
+ {
+ return create_copy_page(ptr, req_flags);
+ }
+
+ return ptr.p->m_real_page_i;
+ }
+
bool only_request = ptr.p->m_requests.isEmpty();
-
+
if (only_request &&
state & Page_entry::MAPPED)
{
@@ -1565,14 +1653,6 @@ Pgman::get_page(Signal* signal, Ptr<Page_entry> ptr, Page_request page_req)
ndbrequire(ptr.p->m_real_page_i != RNIL);
return ptr.p->m_real_page_i;
}
-
- if (state & Page_entry::LOCKED &&
- ! (req_flags & Page_request::UNLOCK_PAGE))
- {
- ndbrequire(ptr.p->m_copy_real_page_i != m_lcp_copy_page);
- ndbrequire(ptr.p->m_copy_real_page_i != RNIL);
- return ptr.p->m_copy_real_page_i;
- }
}
if (! (req_flags & Page_request::LOCK_PAGE))
@@ -1585,9 +1665,12 @@ Pgman::get_page(Signal* signal, Ptr<Page_entry> ptr, Page_request page_req)
{
LocalDLFifoList<Page_request>
req_list(m_page_request_pool, ptr.p->m_requests);
- req_list.seize(req_ptr);
+ if (! (req_flags & Page_request::ALLOC_REQ))
+ req_list.seize(req_ptr);
+ else
+ req_list.seizeFront(req_ptr);
}
-
+
if (req_ptr.i == RNIL)
{
if (is_new)
@@ -1607,19 +1690,15 @@ Pgman::get_page(Signal* signal, Ptr<Page_entry> ptr, Page_request page_req)
state |= Page_entry::EMPTY;
}
- if (req_flags & Page_request::NO_HOOK)
- {
- state |= Page_entry::NO_HOOK;
- }
-
if (req_flags & Page_request::UNLOCK_PAGE)
{
state &= ~ Page_entry::LOCKED;
}
ptr.p->m_busy_count += busy_count;
+ ptr.p->m_dirty_count += !!(req_flags & DIRTY_FLAGS);
set_page_state(ptr, state);
-
+
do_busy_loop(signal, true);
#ifdef VM_TRACE
@@ -1638,7 +1717,7 @@ Pgman::update_lsn(Ptr<Page_entry> ptr, Uint32 block, Uint64 lsn)
debugOut << "PGMAN: " << ptr << endl;
#endif
- Uint16 state = ptr.p->m_state;
+ Page_state state = ptr.p->m_state;
ptr.p->m_lsn = lsn;
if (state & Page_entry::BUSY)
@@ -1745,7 +1824,7 @@ Pgman::drop_page(Ptr<Page_entry> ptr)
Page_stack& pl_stack = m_page_stack;
Page_queue& pl_queue = m_page_queue;
- Uint16 state = ptr.p->m_state;
+ Page_state state = ptr.p->m_state;
if (! (state & (Page_entry::PAGEIN | Page_entry::PAGEOUT)))
{
ndbrequire(state & Page_entry::BOUND);
@@ -1768,7 +1847,6 @@ Pgman::drop_page(Ptr<Page_entry> ptr)
if (ptr.p->m_real_page_i != RNIL)
{
jam();
- c_tup->disk_page_unmap_callback(ptr.p->m_real_page_i);
release_cache_page(ptr.p->m_real_page_i);
ptr.p->m_real_page_i = RNIL;
}
@@ -1790,7 +1868,7 @@ void
Pgman::verify_page_entry(Ptr<Page_entry> ptr)
{
Uint32 ptrI = ptr.i;
- Uint16 state = ptr.p->m_state;
+ Page_state state = ptr.p->m_state;
bool has_req = state & Page_entry::REQUEST;
bool has_req2 = ! ptr.p->m_requests.isEmpty();
@@ -1879,7 +1957,7 @@ Pgman::verify_page_lists()
{
verify_page_entry(iter.curr);
- Uint16 state = iter.curr.p->m_state;
+ Page_state state = iter.curr.p->m_state;
if (state & Page_entry::ONSTACK)
stack_count++;
if (state & Page_entry::ONQUEUE)
@@ -1906,7 +1984,7 @@ Pgman::verify_page_lists()
{
ndbrequire(i1 != ptr.i);
i1 = ptr.i;
- Uint16 state = ptr.p->m_state;
+ Page_state state = ptr.p->m_state;
ndbrequire(state & Page_entry::ONSTACK || dump_page_lists());
if (! pl_stack.hasPrev(ptr))
ndbrequire(state & Page_entry::HOT || dump_page_lists());
@@ -1920,7 +1998,7 @@ Pgman::verify_page_lists()
{
ndbrequire(i2 != ptr.i);
i2 = ptr.i;
- Uint16 state = ptr.p->m_state;
+ Page_state state = ptr.p->m_state;
ndbrequire(state & Page_entry::ONQUEUE || dump_page_lists());
ndbrequire(state & Page_entry::BOUND || dump_page_lists());
cold_bound_count++;
@@ -2082,8 +2160,6 @@ operator<<(NdbOut& out, Ptr<Pgman::Page_request> ptr)
out << " flags=" << hex << pr.m_flags;
out << "," << dec << (pr.m_flags & Pgman::Page_request::OP_MASK);
{
- if (pr.m_flags & Pgman::Page_request::STRICT_ORDER)
- out << ",strict_order";
if (pr.m_flags & Pgman::Page_request::LOCK_PAGE)
out << ",lock_page";
if (pr.m_flags & Pgman::Page_request::EMPTY_PAGE)
@@ -2094,8 +2170,6 @@ operator<<(NdbOut& out, Ptr<Pgman::Page_request> ptr)
out << ",commit_req";
if (pr.m_flags & Pgman::Page_request::DIRTY_REQ)
out << ",dirty_req";
- if (pr.m_flags & Pgman::Page_request::NO_HOOK)
- out << ",no_hook";
if (pr.m_flags & Pgman::Page_request::CORR_REQ)
out << ",corr_req";
}
@@ -2132,8 +2206,6 @@ operator<<(NdbOut& out, Ptr<Pgman::Page_entry> ptr)
out << ",pageout";
if (pe.m_state & Pgman::Page_entry::LOGSYNC)
out << ",logsync";
- if (pe.m_state & Pgman::Page_entry::NO_HOOK)
- out << ",no_hook";
if (pe.m_state & Pgman::Page_entry::LCP)
out << ",lcp";
if (pe.m_state & Pgman::Page_entry::HOT)
@@ -2237,10 +2309,12 @@ Pgman::execDUMP_STATE_ORD(Signal* signal)
if (pl_hash.find(ptr, key))
{
ndbout << "pageout " << ptr << endl;
+ c_tup->disk_page_unmap_callback(ptr.p->m_real_page_i,
+ ptr.p->m_dirty_count);
pageout(signal, ptr);
}
}
-
+
if (signal->theData[0] == 11003)
{
diff --git a/storage/ndb/src/kernel/blocks/pgman.hpp b/storage/ndb/src/kernel/blocks/pgman.hpp
index b616e169381..5c27fdd31e8 100644
--- a/storage/ndb/src/kernel/blocks/pgman.hpp
+++ b/storage/ndb/src/kernel/blocks/pgman.hpp
@@ -249,15 +249,13 @@ private:
struct Page_request {
enum Flags {
OP_MASK = 0x000F // 4 bits for TUP operation
- ,STRICT_ORDER = 0x0010 // maintain request order
,LOCK_PAGE = 0x0020 // lock page in memory
,EMPTY_PAGE = 0x0040 // empty (new) page
,ALLOC_REQ = 0x0080 // part of alloc
,COMMIT_REQ = 0x0100 // part of commit
,DIRTY_REQ = 0x0200 // make page dirty wo/ update_lsn
- ,NO_HOOK = 0x0400 // dont run load hook
- ,UNLOCK_PAGE = 0x0800
- ,CORR_REQ = 0x1000 // correlated request (no LIRS update)
+ ,UNLOCK_PAGE = 0x0400
+ ,CORR_REQ = 0x0800 // correlated request (no LIRS update)
};
Uint16 m_block;
@@ -286,6 +284,8 @@ private:
Uint32 prevList;
};
+ typedef Uint16 Page_state;
+
struct Page_entry : Page_entry_stack_ptr,
Page_entry_queue_ptr,
Page_entry_sublist_ptr {
@@ -305,7 +305,7 @@ private:
,PAGEIN = 0x0100 // paging in
,PAGEOUT = 0x0200 // paging out
,LOGSYNC = 0x0400 // undo WAL as part of pageout
- ,NO_HOOK = 0x0800 // don't run load hook
+ ,COPY = 0x0800 // Copy page for LCP
,LCP = 0x1000 // page is LCP flushed
,HOT = 0x2000 // page is hot
,ONSTACK = 0x4000 // page is on LIRS stack
@@ -324,26 +324,26 @@ private:
,SUBLIST_COUNT = 8
};
- Uint16 m_state; // flags (0 for new entry)
-
- Uint16 m_file_no; // disk page address set at seize
+ Uint16 m_file_no; // disk page address set at seize
+ Page_state m_state; // flags (0 for new entry)
+
Uint32 m_page_no;
Uint32 m_real_page_i;
- Uint32 m_copy_real_page_i; // used for flushing LOCKED pages
-
Uint64 m_lsn;
- Uint32 m_last_lcp;
+ Uint32 m_last_lcp;
+ Uint32 m_dirty_count;
+ Uint32 m_copy_page_i;
union {
Uint32 m_busy_count; // non-zero means BUSY
Uint32 nextPool;
};
DLFifoList<Page_request>::Head m_requests;
-
+
Uint32 nextHash;
Uint32 prevHash;
-
+
Uint32 hashValue() const { return m_file_no << 16 | m_page_no; }
bool equal(const Page_entry& obj) const {
return
@@ -374,8 +374,6 @@ private:
Uint32 m_last_lcp_complete;
Uint32 m_lcp_curr_bucket;
Uint32 m_lcp_outstanding; // remaining i/o waits
- Uint32 m_lcp_copy_page;
- bool m_lcp_copy_page_free;
EndLcpReq m_end_lcp_req;
// clean-up variables
@@ -421,9 +419,10 @@ protected:
void execREAD_CONFIG_REQ(Signal* signal);
void execCONTINUEB(Signal* signal);
+ void execLCP_PREPARE_REQ(Signal* signal);
void execLCP_FRAG_ORD(Signal*);
void execEND_LCP_REQ(Signal*);
-
+
void execFSREADCONF(Signal*);
void execFSREADREF(Signal*);
void execFSWRITECONF(Signal*);
@@ -432,8 +431,8 @@ protected:
void execDUMP_STATE_ORD(Signal* signal);
private:
- static Uint32 get_sublist_no(Uint16 state);
- void set_page_state(Ptr<Page_entry> ptr, Uint16 new_state);
+ static Uint32 get_sublist_no(Page_state state);
+ void set_page_state(Ptr<Page_entry> ptr, Page_state new_state);
bool seize_cache_page(Ptr<GlobalPage>& gptr);
void release_cache_page(Uint32 i);
@@ -463,7 +462,10 @@ private:
void move_cleanup_ptr(Ptr<Page_entry> ptr);
bool process_lcp(Signal*);
-
+ void process_lcp_prepare(Signal* signal, Ptr<Page_entry> ptr);
+ int create_copy_page(Ptr<Page_entry>, Uint32 req_flags);
+ void restore_copy_page(Ptr<Page_entry>);
+
void pagein(Signal*, Ptr<Page_entry>);
void fsreadreq(Signal*, Ptr<Page_entry>);
void fsreadconf(Signal*, Ptr<Page_entry>);
@@ -510,13 +512,11 @@ public:
Ptr<GlobalPage> m_ptr; // TODO remove
enum RequestFlags {
- STRICT_ORDER = Pgman::Page_request::STRICT_ORDER
- ,LOCK_PAGE = Pgman::Page_request::LOCK_PAGE
+ LOCK_PAGE = Pgman::Page_request::LOCK_PAGE
,EMPTY_PAGE = Pgman::Page_request::EMPTY_PAGE
,ALLOC_REQ = Pgman::Page_request::ALLOC_REQ
,COMMIT_REQ = Pgman::Page_request::COMMIT_REQ
,DIRTY_REQ = Pgman::Page_request::DIRTY_REQ
- ,NO_HOOK = Pgman::Page_request::NO_HOOK
,UNLOCK_PAGE = Pgman::Page_request::UNLOCK_PAGE
,CORR_REQ = Pgman::Page_request::CORR_REQ
};
diff --git a/storage/ndb/src/kernel/blocks/tsman.cpp b/storage/ndb/src/kernel/blocks/tsman.cpp
index 25bf998a625..3b1a09fbf12 100644
--- a/storage/ndb/src/kernel/blocks/tsman.cpp
+++ b/storage/ndb/src/kernel/blocks/tsman.cpp
@@ -32,6 +32,12 @@
#define JONAS 0
+#define COMMITTED_MASK ((1 << 0) | (1 << 1))
+#define UNCOMMITTED_MASK ((1 << 2) | (1 << 3))
+#define UNCOMMITTED_SHIFT 2
+
+#define DBG_UNDO 0
+
Tsman::Tsman(const Configuration & conf, class Pgman* pg, class Lgman* lg) :
SimulatedBlock(TSMAN, conf),
m_file_hash(m_file_pool),
@@ -41,6 +47,10 @@ Tsman::Tsman(const Configuration & conf, class Pgman* pg, class Lgman* lg) :
m_lgman(lg)
{
BLOCK_CONSTRUCTOR(Tsman);
+
+ Uint32 SZ = File_formats::Datafile::EXTENT_HEADER_BITMASK_BITS_PER_PAGE;
+ ndbrequire((COMMITTED_MASK & UNCOMMITTED_MASK) == 0);
+ ndbrequire((COMMITTED_MASK | UNCOMMITTED_MASK) == ((1 << SZ) - 1));
// Add received signals
addRecSignal(GSN_STTOR, &Tsman::execSTTOR);
@@ -548,7 +558,7 @@ Tsman::release_extent_pages(Signal* signal, Ptr<Datafile> ptr)
safe_cast(&Tsman::release_extent_pages_callback);
int page_id;
- int flags = Page_cache_client::UNLOCK_PAGE | Page_cache_client::NO_HOOK;
+ int flags = Page_cache_client::UNLOCK_PAGE;
if((page_id = m_page_cache_client.get_page(signal, preq, flags)) > 0)
{
execute(signal, preq.m_callback, page_id);
@@ -1039,7 +1049,7 @@ Tsman::load_extent_pages(Signal* signal, Ptr<Datafile> ptr)
safe_cast(&Tsman::load_extent_page_callback);
int page_id;
- int flags = Page_cache_client::LOCK_PAGE | Page_cache_client::NO_HOOK;
+ int flags = Page_cache_client::LOCK_PAGE;
if((page_id = m_page_cache_client.get_page(signal, preq, flags)) > 0)
{
load_extent_page_callback(signal, ptr.i, (Uint32)page_id);
@@ -1202,7 +1212,7 @@ Tsman::scan_extent_headers(Signal* signal, Ptr<Datafile> ptr)
Uint32 extent_no = extents - j - 1;
File_formats::Datafile::Extent_header* header=
page->get_header(extent_no, size);
- if(header->m_table == RNIL)
+ if (header->m_table == RNIL)
{
header->m_next_free_extent = firstFree;
firstFree = page_no * per_page + extent_no;
@@ -1221,9 +1231,9 @@ Tsman::scan_extent_headers(Signal* signal, Ptr<Datafile> ptr)
ptr.p->m_online.m_used_extent_cnt++;
for(Uint32 i = 0; i<size; i++, key.m_page_no++)
{
- Uint32 bits= header->get_free_bits(i) & ~(1 << (SZ - 1));
- header->update_free_bits(i, bits);
- tup->disk_restart_page_bits(tableId, fragmentId, &key, 0, bits);
+ Uint32 bits= header->get_free_bits(i) & COMMITTED_MASK;
+ header->update_free_bits(i, bits | (bits << UNCOMMITTED_SHIFT));
+ tup->disk_restart_page_bits(tableId, fragmentId, &key, bits);
}
}
else
@@ -1550,7 +1560,9 @@ Tsman::execFREE_EXTENT_REQ(Signal* signal)
int
Tsman::update_page_free_bits(Signal* signal,
- Local_key *key, unsigned bit, Uint64 lsn)
+ Local_key *key,
+ unsigned committed_bits,
+ Uint64 lsn)
{
jamEntry();
@@ -1600,20 +1612,12 @@ Tsman::update_page_free_bits(Signal* signal,
/**
* Toggle word
*/
- bit |= header->get_free_bits(page_no_in_extent) & (1 << (SZ - 1));
- header->update_free_bits(page_no_in_extent, bit);
+ ndbassert((committed_bits & ~(COMMITTED_MASK)) == 0);
+ Uint32 src = header->get_free_bits(page_no_in_extent) & UNCOMMITTED_MASK;
+ header->update_free_bits(page_no_in_extent, src | committed_bits);
-#ifdef VM_TRACE
- if(! (bit & ((1 << (SZ - 1)) - 1)) && header->check_free(eh_words))
- {
- ndbout_c("Extent is now free!!");
- }
-#endif
-
- /**
- * Update lsn
- */
m_page_cache_client.update_lsn(preq.m_page, lsn);
+
return 0;
}
@@ -1621,7 +1625,9 @@ Tsman::update_page_free_bits(Signal* signal,
}
int
-Tsman::get_page_free_bits(Signal* signal, Local_key *key, unsigned* bits)
+Tsman::get_page_free_bits(Signal* signal, Local_key *key,
+ unsigned* uncommitted,
+ unsigned* committed)
{
jamEntry();
@@ -1664,7 +1670,9 @@ Tsman::get_page_free_bits(Signal* signal, Local_key *key, unsigned* bits)
ndbrequire(header->m_table != RNIL);
Uint32 page_no_in_extent = (key->m_page_no - data_off) % size;
- *bits = header->get_free_bits(page_no_in_extent);
+ Uint32 bits = header->get_free_bits(page_no_in_extent);
+ *uncommitted = (bits & UNCOMMITTED_MASK) >> UNCOMMITTED_SHIFT;
+ *committed = (bits & COMMITTED_MASK);
return 0;
}
@@ -1672,7 +1680,7 @@ Tsman::get_page_free_bits(Signal* signal, Local_key *key, unsigned* bits)
}
int
-Tsman::unmap_page(Signal* signal, Local_key *key)
+Tsman::unmap_page(Signal* signal, Local_key *key, Uint32 uncommitted_bits)
{
jamEntry();
@@ -1704,7 +1712,7 @@ Tsman::unmap_page(Signal* signal, Local_key *key)
/**
* Handling of unmapped extent header pages is not implemented
*/
- int flags = Page_cache_client::DIRTY_REQ;
+ int flags = 0;
int real_page_id;
if ((real_page_id = m_page_cache_client.get_page(signal, preq, flags)) > 0)
{
@@ -1722,13 +1730,106 @@ Tsman::unmap_page(Signal* signal, Local_key *key)
/**
* Toggle word
*/
- Uint32 old = header->get_free_bits(page_no_in_extent);
- unsigned bit =
- (header->get_free_bits(page_no_in_extent) & ((1 << (SZ - 1)) - 1));
- header->update_free_bits(page_no_in_extent, bit);
- if (JONAS)
- ndbout_c("toggle page: (%d, %d, %d) from %x to %x",
- key->m_page_no, extent, page_no_in_extent, old, bit);
+ ndbassert(((uncommitted_bits << UNCOMMITTED_SHIFT) & ~UNCOMMITTED_MASK) == 0);
+ Uint32 src = header->get_free_bits(page_no_in_extent) & COMMITTED_MASK;
+ header->update_free_bits(page_no_in_extent,
+ src | (uncommitted_bits << UNCOMMITTED_SHIFT));
+ }
+
+ return AllocExtentReq::UnmappedExtentPageIsNotImplemented;
+}
+
+int
+Tsman::restart_undo_page_free_bits(Signal* signal,
+ Uint32 tableId,
+ Uint32 fragId,
+ Local_key *key,
+ unsigned bits,
+ Uint64 undo_lsn)
+{
+ jamEntry();
+
+ /**
+ * 1) Compute which extent_no key belongs to
+ * 2) Find out which page extent_no belongs to
+ * 3) Undo log m_page_bitmask
+ * 4) Update m_page_bitmask
+ */
+ Ptr<Datafile> file_ptr;
+ Datafile file_key;
+ file_key.m_file_no = key->m_file_no;
+ ndbrequire(m_file_hash.find(file_ptr, file_key));
+
+ Uint32 size = file_ptr.p->m_extent_size;
+ Uint32 data_off = file_ptr.p->m_online.m_offset_data_pages;
+ Uint32 eh_words = File_formats::Datafile::extent_header_words(size);
+ Uint32 per_page = File_formats::Datafile::EXTENT_PAGE_WORDS/eh_words;
+ Uint32 SZ= File_formats::Datafile::EXTENT_HEADER_BITMASK_BITS_PER_PAGE;
+
+ Uint32 extent = (key->m_page_no - data_off) / size + per_page;
+ Uint32 page_no = extent / per_page;
+ Uint32 extent_no = extent % per_page;
+
+ Page_cache_client::Request preq;
+ preq.m_page.m_page_no = page_no;
+ preq.m_page.m_file_no = key->m_file_no;
+
+ /**
+ * Handling of unmapped extent header pages is not implemented
+ */
+ int flags = 0;
+ int real_page_id;
+ if ((real_page_id = m_page_cache_client.get_page(signal, preq, flags)) > 0)
+ {
+ GlobalPage* ptr_p = m_page_cache_client.m_ptr.p;
+
+ File_formats::Datafile::Extent_page* page =
+ (File_formats::Datafile::Extent_page*)ptr_p;
+ File_formats::Datafile::Extent_header* header =
+ page->get_header(extent_no, size);
+
+ if (header->m_table == RNIL)
+ {
+ if (DBG_UNDO)
+ ndbout_c("tsman: apply undo - skip table == RNIL");
+ return 0;
+ }
+
+ ndbrequire(header->m_table == tableId);
+ ndbrequire(header->m_fragment_id == fragId);
+
+ Uint32 page_no_in_extent = (key->m_page_no - data_off) % size;
+ Uint32 src = header->get_free_bits(page_no_in_extent);
+
+ Uint64 lsn = 0;
+ lsn += page->m_page_header.m_page_lsn_hi; lsn <<= 32;
+ lsn += page->m_page_header.m_page_lsn_lo;
+
+ if (undo_lsn <= lsn)
+ {
+ /**
+ * Toggle word
+ */
+ if (DBG_UNDO)
+ ndbout_c("tsman: apply %lld(%lld) %x -> %x",
+ undo_lsn, lsn, src, (bits | (bits << UNCOMMITTED_SHIFT)));
+
+ lsn = undo_lsn;
+ page->m_page_header.m_page_lsn_hi = lsn >> 32;
+ page->m_page_header.m_page_lsn_lo = lsn & 0xFFFFFFFF;
+ ndbassert((bits & ~(COMMITTED_MASK)) == 0);
+ header->update_free_bits(page_no_in_extent,
+ bits | (bits << UNCOMMITTED_SHIFT));
+
+ m_page_cache_client.update_lsn(preq.m_page, lsn);
+ }
+ else
+ {
+ if (DBG_UNDO)
+ ndbout_c("tsman: apply %lld(%lld) %x -> %x",
+ undo_lsn, lsn, src, (bits | (bits << UNCOMMITTED_SHIFT)));
+ }
+
return 0;
}
@@ -1773,7 +1874,7 @@ Tsman::execALLOC_PAGE_REQ(Signal* signal)
/**
* Handling of unmapped extent header pages is not implemented
*/
- int flags = Page_cache_client::DIRTY_REQ;
+ int flags = 0;
int real_page_id;
Uint32 page_no;
Uint32 src_bits;
@@ -1799,6 +1900,8 @@ Tsman::execALLOC_PAGE_REQ(Signal* signal)
* 3 = 11 - full - less than pct_free% free, pct_free=10%
*/
+ Uint32 reqbits = req.bits << UNCOMMITTED_SHIFT;
+
/**
* Search
*/
@@ -1806,7 +1909,7 @@ Tsman::execALLOC_PAGE_REQ(Signal* signal)
for(page_no= page_no_in_extent; page_no<size; page_no++)
{
src_bits= (* src >> shift) & ((1 << SZ) - 1);
- if(src_bits <= req.bits)
+ if((src_bits & UNCOMMITTED_MASK) <= reqbits)
{
goto found;
}
@@ -1820,7 +1923,7 @@ Tsman::execALLOC_PAGE_REQ(Signal* signal)
for(page_no= 0; page_no<page_no_in_extent; page_no++)
{
src_bits= (* src >> shift) & ((1 << SZ) - 1);
- if(src_bits <= req.bits)
+ if((src_bits & UNCOMMITTED_MASK) <= reqbits)
{
goto found;
}
@@ -1844,99 +1947,13 @@ Tsman::execALLOC_PAGE_REQ(Signal* signal)
return;
found:
- if (JONAS)
- ndbout_c("alloc page: (%d, %d, %d)",
- data_off + extent * size + page_no, per_page + extent, page_no);
- src_bits |= (1 << (SZ - 1)); // high unlogged, allocated bit
- header->update_free_bits(page_no, src_bits);
- rep->bits= src_bits & ((1 << (SZ - 1)) - 1);
+ header->update_free_bits(page_no, src_bits | UNCOMMITTED_MASK);
+ rep->bits= (src_bits & UNCOMMITTED_MASK) >> UNCOMMITTED_SHIFT;
rep->key.m_page_no= data_off + extent * size + page_no;
rep->reply.errorCode= 0;
return;
}
-int
-Tsman::restart_undo_page_free_bits(Signal* signal,
- Local_key *key, unsigned bit,
- Uint64 undo_lsn)
-{
- jamEntry();
-
- /**
- * 1) Compute which extent_no key belongs to
- * 2) Find out which page extent_no belongs to
- * 3) Undo log m_page_bitmask
- * 4) Update m_page_bitmask
- */
- Ptr<Datafile> file_ptr;
- Datafile file_key;
- file_key.m_file_no = key->m_file_no;
- ndbrequire(m_file_hash.find(file_ptr, file_key));
-
- Uint32 size = file_ptr.p->m_extent_size;
- Uint32 data_off = file_ptr.p->m_online.m_offset_data_pages;
- Uint32 eh_words = File_formats::Datafile::extent_header_words(size);
- Uint32 per_page = File_formats::Datafile::EXTENT_PAGE_WORDS/eh_words;
- Uint32 SZ= File_formats::Datafile::EXTENT_HEADER_BITMASK_BITS_PER_PAGE;
-
- Uint32 extent = (key->m_page_no - data_off) / size + per_page;
- Uint32 page_no = extent / per_page;
- Uint32 extent_no = extent % per_page;
-
- Page_cache_client::Request preq;
- preq.m_page.m_page_no = page_no;
- preq.m_page.m_file_no = key->m_file_no;
-
- /**
- * Handling of unmapped extent header pages is not implemented
- */
- int flags = Page_cache_client::COMMIT_REQ;
- int real_page_id;
- if ((real_page_id = m_page_cache_client.get_page(signal, preq, flags)) > 0)
- {
- GlobalPage* ptr_p = m_page_cache_client.m_ptr.p;
-
- File_formats::Datafile::Extent_page* page =
- (File_formats::Datafile::Extent_page*)ptr_p;
-
- Uint64 lsn = 0;
- lsn += page->m_page_header.m_page_lsn_hi; lsn <<= 32;
- lsn += page->m_page_header.m_page_lsn_lo;
-
- if (undo_lsn <= lsn)
- {
- File_formats::Datafile::Extent_header* header =
- page->get_header(extent_no, size);
-
- Uint32 tableId = header->m_table;
- Uint32 fragmentId = header->m_fragment_id;
- ndbrequire(tableId != RNIL);
- ndbrequire(fragmentId != RNIL);
-
- Uint32 page_no_in_extent = (key->m_page_no - data_off) % size;
-
- Uint32 old_bits = header->get_free_bits(page_no_in_extent);
- if (old_bits != bit)
- {
- ndbout << "tsman toggle " << *key << " from " << old_bits << " to "
- << bit << endl;
- Dbtup* tup= (Dbtup*)globalData.getBlock(DBTUP);
- header->update_free_bits(page_no_in_extent, bit);
- tup->disk_restart_page_bits(tableId, fragmentId, key, old_bits, bit);
- }
- lsn--; // prevent UNDO from being run again...
- }
- else
- {
- ndbout_c("tsman skipping undo %lld %lld", undo_lsn, lsn);
- }
-
- m_page_cache_client.update_lsn(preq.m_page, lsn);
- return 0;
- }
- return AllocExtentReq::UnmappedExtentPageIsNotImplemented;
-}
-
void
Tsman::execEND_LCP_REQ(Signal* signal)
{
diff --git a/storage/ndb/src/kernel/blocks/tsman.hpp b/storage/ndb/src/kernel/blocks/tsman.hpp
index 194178163e8..4471f5effa2 100644
--- a/storage/ndb/src/kernel/blocks/tsman.hpp
+++ b/storage/ndb/src/kernel/blocks/tsman.hpp
@@ -195,11 +195,13 @@ private:
void load_extent_page_callback(Signal*, Uint32, Uint32);
void create_file_ref(Signal*, Ptr<Tablespace>, Ptr<Datafile>,
Uint32,Uint32,Uint32);
- int update_page_free_bits(Signal*, Local_key*, unsigned bits, Uint64 lsn);
- int get_page_free_bits(Signal*, Local_key*, unsigned* bits);
- int unmap_page(Signal*, Local_key*);
- int restart_undo_page_free_bits(Signal*, Local_key*, unsigned, Uint64);
-
+ int update_page_free_bits(Signal*, Local_key*, unsigned committed_bits,
+ Uint64 lsn);
+ int get_page_free_bits(Signal*, Local_key*, unsigned*, unsigned*);
+ int unmap_page(Signal*, Local_key*, unsigned uncommitted_bits);
+ int restart_undo_page_free_bits(Signal*, Uint32, Uint32, Local_key*,
+ unsigned committed_bits, Uint64 lsn);
+
int alloc_extent(Signal* signal, Uint32 tablespace, Local_key* key);
int alloc_page_from_extent(Signal*, Uint32, Local_key*, Uint32 bits);
@@ -266,22 +268,23 @@ public:
* Update page free bits
*/
int update_page_free_bits(Local_key*, unsigned bits, Uint64 lsn);
-
+
/**
* Get page free bits
*/
- int get_page_free_bits(Local_key*, unsigned* bits);
-
+ int get_page_free_bits(Local_key*,
+ unsigned* uncommitted, unsigned* committed);
+
/**
* Update unlogged page free bit
*/
- int unmap_page(Local_key*);
-
+ int unmap_page(Local_key*, Uint32 bits);
+
/**
* Undo handling of page bits
*/
int restart_undo_page_free_bits(Local_key*, unsigned bits, Uint64 lsn);
-
+
/**
* Get tablespace info
*
@@ -353,32 +356,40 @@ Tablespace_client::free_extent(Local_key* key)
inline
int
Tablespace_client::update_page_free_bits(Local_key *key,
- unsigned bits, Uint64 lsn)
+ unsigned committed_bits,
+ Uint64 lsn)
{
- return m_tsman->update_page_free_bits(m_signal, key, bits, lsn);
+ return m_tsman->update_page_free_bits(m_signal, key, committed_bits, lsn);
}
inline
int
-Tablespace_client::get_page_free_bits(Local_key *key, unsigned* bits)
+Tablespace_client::get_page_free_bits(Local_key *key,
+ unsigned* uncommited,
+ unsigned* commited)
{
- return m_tsman->get_page_free_bits(m_signal, key, bits);
+ return m_tsman->get_page_free_bits(m_signal, key, uncommited, commited);
}
inline
int
-Tablespace_client::unmap_page(Local_key *key)
+Tablespace_client::unmap_page(Local_key *key, unsigned uncommitted_bits)
{
- return m_tsman->unmap_page(m_signal, key);
+ return m_tsman->unmap_page(m_signal, key, uncommitted_bits);
}
inline
int
Tablespace_client::restart_undo_page_free_bits(Local_key* key,
- unsigned bits, Uint64 lsn)
+ unsigned committed_bits,
+ Uint64 lsn)
{
return m_tsman->restart_undo_page_free_bits(m_signal,
- key, bits, lsn);
+ m_table_id,
+ m_fragment_id,
+ key,
+ committed_bits,
+ lsn);
}