summaryrefslogtreecommitdiff
path: root/storage
diff options
context:
space:
mode:
authorunknown <jonas@perch.ndb.mysql.com>2006-05-19 09:38:59 +0200
committerunknown <jonas@perch.ndb.mysql.com>2006-05-19 09:38:59 +0200
commit8bfc2d9c1b861224f8be8b707a1f9d5e52fda7e4 (patch)
tree54e7b10ff4dbb231c74e14887c3585597e00195f /storage
parentbe0ab479b822a3b9d065dc659ef59d5450f2270b (diff)
downloadmariadb-git-8bfc2d9c1b861224f8be8b707a1f9d5e52fda7e4.tar.gz
ndb -
bug#19928 and bug#19929 fix to critical bugs in tup scan that affected lcp,backup and opt. nr storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp: 1) dont let dirty read scan find uncommitted inserts 2) force opt. nr scan to wait for locked rows 3) when finding LCP keep record, use accOpPtr -1, so that it will not be committed towards ACC
Diffstat (limited to 'storage')
-rw-r--r--storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp40
1 files changed, 24 insertions, 16 deletions
diff --git a/storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp b/storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp
index 43875911f7f..0d2ffe0927f 100644
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp
@@ -582,12 +582,15 @@ Dbtup::scanNext(Signal* signal, ScanOpPtr scanPtr)
Fragrecord& frag = *fragPtr.p;
// tuple found
Tuple_header* th = 0;
+ Uint32 thbits = 0;
Uint32 loop_count = 0;
Uint32 scanGCI = scanPtr.p->m_scanGCI;
Uint32 foundGCI;
- bool mm = (bits & ScanOp::SCAN_DD);
- bool lcp = (bits & ScanOp::SCAN_LCP);
+ const bool mm = (bits & ScanOp::SCAN_DD);
+ const bool lcp = (bits & ScanOp::SCAN_LCP);
+ const bool dirty = (bits & ScanOp::SCAN_LOCK) == 0;
+
Uint32 lcp_list = fragPtr.p->m_lcp_keep_list;
Uint32 size = table.m_offsets[mm].m_fix_header_size +
(bits & ScanOp::SCAN_VS ? Tuple_header::HeaderSize + 1: 0);
@@ -750,22 +753,22 @@ Dbtup::scanNext(Signal* signal, ScanOpPtr scanPtr)
{
pos.m_get = ScanPos::Get_next_tuple_fs;
th = (Tuple_header*)&page->m_data[key.m_page_idx];
+ thbits = th->m_header_bits;
+
if (likely(! (bits & ScanOp::SCAN_NR)))
{
- if (! (th->m_header_bits & Tuple_header::FREE)) {
- goto found_tuple;
- }
- else
+ jam();
+ if (! (thbits & Tuple_header::FREE))
{
- jam();
- // skip free tuple
- }
+ if (! ((thbits & Tuple_header::ALLOC) && dirty))
+ goto found_tuple;
+ }
}
else
{
if ((foundGCI = *th->get_mm_gci(tablePtr.p)) > scanGCI)
{
- if (! (th->m_header_bits & Tuple_header::FREE))
+ if (! (thbits & Tuple_header::FREE))
{
jam();
goto found_tuple;
@@ -775,9 +778,11 @@ Dbtup::scanNext(Signal* signal, ScanOpPtr scanPtr)
goto found_deleted_rowid;
}
}
- else
+ else if (thbits != Fix_page::FREE_RECORD &&
+ th->m_operation_ptr_i != RNIL)
{
jam();
+ goto found_tuple; // Locked tuple...
// skip free tuple
}
}
@@ -793,8 +798,7 @@ Dbtup::scanNext(Signal* signal, ScanOpPtr scanPtr)
jam();
{
// caller has already set pos.m_get to next tuple
- if (! (bits & ScanOp::SCAN_LCP &&
- th->m_header_bits & Tuple_header::LCP_SKIP)) {
+ if (! (bits & ScanOp::SCAN_LCP && thbits & Tuple_header::LCP_SKIP)) {
Local_key& key_mm = pos.m_key_mm;
if (! (bits & ScanOp::SCAN_DD)) {
key_mm = pos.m_key;
@@ -810,7 +814,11 @@ Dbtup::scanNext(Signal* signal, ScanOpPtr scanPtr)
} else {
jam();
// clear it so that it will show up in next LCP
- th->m_header_bits &= ~(Uint32)Tuple_header::LCP_SKIP;
+ th->m_header_bits = thbits & ~(Uint32)Tuple_header::LCP_SKIP;
+ if (tablePtr.p->m_bits & Tablerec::TR_Checksum) {
+ jam();
+ setChecksum(th, tablePtr.p);
+ }
}
}
break;
@@ -833,7 +841,7 @@ Dbtup::scanNext(Signal* signal, ScanOpPtr scanPtr)
th = (Tuple_header*)(mmpage->m_data + key_mm.m_page_idx);
if ((foundGCI = *th->get_mm_gci(tablePtr.p)) > scanGCI)
{
- if (! (th->m_header_bits & Tuple_header::FREE))
+ if (! (thbits & Tuple_header::FREE))
break;
}
}
@@ -893,7 +901,7 @@ found_lcp_keep:
NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
conf->scanPtr = scan.m_userPtr;
- conf->accOperationPtr = RNIL + 1;
+ conf->accOperationPtr = (Uint32)-1;
conf->fragId = frag.fragmentId;
conf->localKey[0] = lcp_list;
conf->localKey[1] = 0;