path: root/storage/ndb/src
author    joerg@trift2. <>  2008-02-22 16:14:27 +0100
committer joerg@trift2. <>  2008-02-22 16:14:27 +0100
commit    25ab6afc033657b82ceeb6bc581ef67febdcdd79 (patch)
tree      c34960a8d8c55cea599ef88af042bbf2ecd8baca /storage/ndb/src
parent    a54f3995ec4cd45e261fb000657872d49afbb5ee (diff)
parent    39c20516b75735df9c559b99abf5efeb898936ba (diff)
download  mariadb-git-25ab6afc033657b82ceeb6bc581ef67febdcdd79.tar.gz
Merge trift2.:/MySQL/M51/mysql-5.1
into trift2.:/MySQL/M51/push-5.1
Diffstat (limited to 'storage/ndb/src')
-rw-r--r--  storage/ndb/src/common/debugger/SignalLoggerManager.cpp |   2
-rw-r--r--  storage/ndb/src/common/debugger/signaldata/ScanTab.cpp |   2
-rw-r--r--  storage/ndb/src/common/transporter/TCP_Transporter.cpp |  22
-rw-r--r--  storage/ndb/src/common/util/Bitmask.cpp | 420
-rw-r--r--  storage/ndb/src/common/util/NdbOut.cpp |   2
-rw-r--r--  storage/ndb/src/kernel/blocks/ERROR_codes.txt |  20
-rw-r--r--  storage/ndb/src/kernel/blocks/backup/Backup.cpp |  73
-rw-r--r--  storage/ndb/src/kernel/blocks/backup/Backup.hpp |   4
-rw-r--r--  storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp |   1
-rw-r--r--  storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp |   7
-rw-r--r--  storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp |   2
-rw-r--r--  storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp |   6
-rw-r--r--  storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp |   6
-rw-r--r--  storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp |   5
-rw-r--r--  storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp |  48
-rw-r--r--  storage/ndb/src/kernel/blocks/dbtup/DbtupAbort.cpp |  35
-rw-r--r--  storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp | 247
-rw-r--r--  storage/ndb/src/kernel/blocks/dbtup/DbtupDiskAlloc.cpp |  39
-rw-r--r--  storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp |  83
-rw-r--r--  storage/ndb/src/kernel/blocks/dbtup/DbtupIndex.cpp | 113
-rw-r--r--  storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp | 133
-rw-r--r--  storage/ndb/src/kernel/blocks/dbtup/DbtupStoredProcDef.cpp |  16
-rw-r--r--  storage/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp |  33
-rw-r--r--  storage/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp |   6
-rw-r--r--  storage/ndb/src/kernel/blocks/lgman.cpp |  20
-rw-r--r--  storage/ndb/src/kernel/blocks/suma/Suma.cpp |  16
-rw-r--r--  storage/ndb/src/kernel/blocks/suma/SumaInit.cpp |   2
-rw-r--r--  storage/ndb/src/kernel/blocks/tsman.cpp |  16
-rw-r--r--  storage/ndb/src/kernel/vm/DLHashTable.hpp |  22
-rw-r--r--  storage/ndb/src/kernel/vm/DLHashTable2.hpp |  22
-rw-r--r--  storage/ndb/src/kernel/vm/NdbdSuperPool.cpp |   2
-rw-r--r--  storage/ndb/src/kernel/vm/Pool.cpp |   3
-rw-r--r--  storage/ndb/src/kernel/vm/ndbd_malloc_impl.cpp | 398
-rw-r--r--  storage/ndb/src/kernel/vm/ndbd_malloc_impl.hpp |  34
-rw-r--r--  storage/ndb/src/kernel/vm/pc.hpp |   6
-rw-r--r--  storage/ndb/src/mgmapi/ndb_logevent.cpp |   2
-rw-r--r--  storage/ndb/src/mgmclient/CommandInterpreter.cpp |  17
-rw-r--r--  storage/ndb/src/mgmsrv/ConfigInfo.cpp |  12
-rw-r--r--  storage/ndb/src/ndbapi/Ndb.cpp | 141
-rw-r--r--  storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp |  50
-rw-r--r--  storage/ndb/src/ndbapi/NdbScanOperation.cpp |  42
-rw-r--r--  storage/ndb/src/ndbapi/ndberror.c |   4
42 files changed, 1266 insertions, 868 deletions
diff --git a/storage/ndb/src/common/debugger/SignalLoggerManager.cpp b/storage/ndb/src/common/debugger/SignalLoggerManager.cpp
index 471bea64f64..48cacb6bc1a 100644
--- a/storage/ndb/src/common/debugger/SignalLoggerManager.cpp
+++ b/storage/ndb/src/common/debugger/SignalLoggerManager.cpp
@@ -129,7 +129,7 @@ SignalLoggerManager::log(LogMode logMode, const char * params)
const int count = getParameter(blocks, "BLOCK=", params);
int cnt = 0;
- if((count == 1 && blocks[0] == "ALL") ||
+ if((count == 1 && !strcmp(blocks[0], "ALL")) ||
count == 0){
for (int number = 0; number < NO_OF_BLOCKS; ++number){
diff --git a/storage/ndb/src/common/debugger/signaldata/ScanTab.cpp b/storage/ndb/src/common/debugger/signaldata/ScanTab.cpp
index 9668c9d0605..4f6b69ebfba 100644
--- a/storage/ndb/src/common/debugger/signaldata/ScanTab.cpp
+++ b/storage/ndb/src/common/debugger/signaldata/ScanTab.cpp
@@ -70,7 +70,7 @@ printSCANTABCONF(FILE * output, const Uint32 * theData, Uint32 len, Uint16 recei
sig->transId1, sig->transId2);
fprintf(output, " requestInfo: Eod: %d OpCount: %d\n",
- (requestInfo & ScanTabConf::EndOfData == ScanTabConf::EndOfData),
+ (requestInfo & ScanTabConf::EndOfData) == ScanTabConf::EndOfData,
(requestInfo & (~ScanTabConf::EndOfData)));
size_t op_count= requestInfo & (~ScanTabConf::EndOfData);
if(op_count){
diff --git a/storage/ndb/src/common/transporter/TCP_Transporter.cpp b/storage/ndb/src/common/transporter/TCP_Transporter.cpp
index 298e43710b0..8b386483bff 100644
--- a/storage/ndb/src/common/transporter/TCP_Transporter.cpp
+++ b/storage/ndb/src/common/transporter/TCP_Transporter.cpp
@@ -317,22 +317,32 @@ TCP_Transporter::doSend() {
// Empty the SendBuffers
- const char * const sendPtr = m_sendBuffer.sendPtr;
- const Uint32 sizeToSend = m_sendBuffer.sendDataSize;
- if (sizeToSend > 0){
+ bool sent_any = true;
+ while (m_sendBuffer.dataSize > 0)
+ {
+ const char * const sendPtr = m_sendBuffer.sendPtr;
+ const Uint32 sizeToSend = m_sendBuffer.sendDataSize;
const int nBytesSent = send(theSocket, sendPtr, sizeToSend, 0);
- if (nBytesSent > 0) {
+ if (nBytesSent > 0)
+ {
+ sent_any = true;
m_sendBuffer.bytesSent(nBytesSent);
sendCount ++;
sendSize += nBytesSent;
- if(sendCount == reportFreq){
+ if(sendCount == reportFreq)
+ {
reportSendLen(get_callback_obj(), remoteNodeId, sendCount, sendSize);
sendCount = 0;
sendSize = 0;
}
- } else {
+ }
+ else
+ {
+ if (nBytesSent < 0 && InetErrno == EAGAIN && sent_any)
+ break;
+
// Send failed
#if defined DEBUG_TRANSPORTER
g_eventLogger.error("Send Failure(disconnect==%d) to node = %d nBytesSent = %d "
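The change above turns a single send() attempt into a loop that keeps draining the send buffer, breaking out early on EAGAIN so that a partial send is simply resumed on the next call. A rough standalone sketch of that pattern (POSIX sockets, illustrative names only; the real transporter also maintains report counters, uses the portable InetErrno macro, and disconnects on hard errors):

// Sketch only: send as much of buf as the socket will accept right now.
// Returns bytes sent, or -1 on a hard error.
#include <cerrno>
#include <cstddef>
#include <sys/socket.h>
#include <sys/types.h>

static ssize_t drain_send(int sock, const char* buf, size_t len)
{
  size_t total = 0;
  while (total < len)
  {
    ssize_t n = send(sock, buf + total, len - total, 0);
    if (n > 0)
    {
      total += (size_t)n;     // bookkeeping: bytesSent()/sendSize in the real code
      continue;
    }
    if (n < 0 && (errno == EAGAIN || errno == EWOULDBLOCK))
      break;                  // socket buffer full: retry on the next doSend()
    return -1;                // hard failure: caller disconnects
  }
  return (ssize_t)total;
}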
diff --git a/storage/ndb/src/common/util/Bitmask.cpp b/storage/ndb/src/common/util/Bitmask.cpp
index edfe2363039..22919fe585a 100644
--- a/storage/ndb/src/common/util/Bitmask.cpp
+++ b/storage/ndb/src/common/util/Bitmask.cpp
@@ -20,28 +20,59 @@ void
BitmaskImpl::getFieldImpl(const Uint32 src[],
unsigned shiftL, unsigned len, Uint32 dst[])
{
+ /* Copy whole words of src to dst, shifting src left
+ * by shiftL. Undefined bits of the last written dst word
+ * should be zeroed.
+ */
assert(shiftL < 32);
unsigned shiftR = 32 - shiftL;
unsigned undefined = shiftL ? ~0 : 0;
+ /* Merge first word with previously set bits if there's a shift */
* dst = shiftL ? * dst : 0;
-
- while(len >= 32)
- {
- * dst++ |= (* src) << shiftL;
- * dst = ((* src++) >> shiftR) & undefined;
- len -= 32;
- }
-
- if(len < shiftR)
+
+ /* Treat the zero-shift case separately to avoid
+ * trampling or reading past the end of src
+ */
+ if (shiftL == 0)
{
- * dst |= ((* src) & ((1 << len) - 1)) << shiftL;
+ while(len >= 32)
+ {
+ * dst++ = * src++;
+ len -=32;
+ }
+
+ if (len != 0)
+ {
+ /* Last word has some bits set */
+ Uint32 mask= ((1 << len) -1); // 0000111
+ * dst = (* src) & mask;
+ }
}
- else
+ else // shiftL !=0, need to build each word from two words shifted
{
- * dst++ |= ((* src) << shiftL);
- * dst = ((* src) >> shiftR) & ((1 << (len - shiftR)) - 1) & undefined;
+ while(len >= 32)
+ {
+ * dst++ |= (* src) << shiftL;
+ * dst = ((* src++) >> shiftR) & undefined;
+ len -= 32;
+ }
+
+ /* Have space for shiftR more bits in the current dst word
+ * is that enough?
+ */
+ if(len <= shiftR)
+ {
+ /* Fit the remaining bits in the current dst word */
+ * dst |= ((* src) & ((1 << len) - 1)) << shiftL;
+ }
+ else
+ {
+ /* Need to write to two dst words */
+ * dst++ |= ((* src) << shiftL);
+ * dst = ((* src) >> shiftR) & ((1 << (len - shiftR)) - 1) & undefined;
+ }
}
}
@@ -64,370 +95,23 @@ BitmaskImpl::setFieldImpl(Uint32 dst[],
len -= 32;
}
+ /* Copy last bits */
Uint32 mask = ((1 << len) -1);
* dst = (* dst & ~mask);
- if(len < shiftR)
+ if(len <= shiftR)
{
+ /* Remaining bits fit in current word */
* dst |= ((* src++) >> shiftL) & mask;
}
else
{
+ /* Remaining bits update 2 words */
* dst |= ((* src++) >> shiftL);
* dst |= ((* src) & ((1 << (len - shiftR)) - 1)) << shiftR ;
}
}
-#ifdef __TEST_BITMASK__
-
-static
-void print(const Uint32 src[], Uint32 len, Uint32 pos = 0)
-{
- printf("b'");
- for(unsigned i = 0; i<len; i++)
- {
- if(BitmaskImpl::get((pos + len + 31) >> 5, src, i+pos))
- printf("1");
- else
- printf("0");
- if((i & 31) == 31)
- printf(" ");
- }
-}
-
-
-#define DEBUG 0
-#include <Vector.hpp>
-static void do_test(int bitmask_size);
-
-int
-main(int argc, char** argv)
-{
- int loops = argc > 1 ? atoi(argv[1]) : 1000;
- int max_size = argc > 2 ? atoi(argv[2]) : 1000;
-
-
- for(int i = 0; i<loops; i++)
- do_test(1 + (rand() % max_size));
-}
-
-struct Alloc
-{
- Uint32 pos;
- Uint32 size;
- Vector<Uint32> data;
-};
-
-static void require(bool b)
-{
- if(!b) abort();
-}
-
-static
-bool cmp(const Uint32 b1[], const Uint32 b2[], Uint32 len)
-{
- Uint32 sz32 = (len + 31) >> 5;
- for(int i = 0; i<len; i++)
- {
- if(BitmaskImpl::get(sz32, b1, i) ^ BitmaskImpl::get(sz32, b2, i))
- return false;
- }
- return true;
-}
-
-
-static int val_pos = 0;
-static int val[] = { 384, 241, 32,
- 1,1,1,1, 0,0,0,0, 1,1,1,1, 0,0,0,0,
- 241 };
-
-static int lrand()
-{
-#if 0
- return val[val_pos++];
-#else
- return rand();
-#endif
-}
-
-static
-void rand(Uint32 dst[], Uint32 len)
-{
- for(int i = 0; i<len; i++)
- BitmaskImpl::set((len + 31) >> 5, dst, i, (lrand() % 1000) > 500);
-}
-
-static
-void simple(int pos, int size)
-{
- ndbout_c("simple pos: %d size: %d", pos, size);
- Vector<Uint32> _mask;
- Vector<Uint32> _src;
- Vector<Uint32> _dst;
- Uint32 sz32 = (size + pos + 32) >> 5;
- const Uint32 sz = 4 * sz32;
-
- Uint32 zero = 0;
- _mask.fill(sz32+1, zero);
- _src.fill(sz32+1, zero);
- _dst.fill(sz32+1, zero);
-
- Uint32 * src = _src.getBase();
- Uint32 * dst = _dst.getBase();
- Uint32 * mask = _mask.getBase();
-
- memset(src, 0x0, sz);
- memset(dst, 0x0, sz);
- memset(mask, 0xFF, sz);
- rand(src, size);
- BitmaskImpl::setField(sz32, mask, pos, size, src);
- BitmaskImpl::getField(sz32, mask, pos, size, dst);
- printf("src: "); print(src, size+31); printf("\n");
- printf("msk: "); print(mask, (sz32 << 5) + 31); printf("\n");
- printf("dst: "); print(dst, size+31); printf("\n");
- require(cmp(src, dst, size+31));
-};
-
-static
-void simple2(int size, int loops)
-{
- ndbout_c("simple2 %d - ", size);
- Vector<Uint32> _mask;
- Vector<Uint32> _src;
- Vector<Uint32> _dst;
-
- Uint32 sz32 = (size + 32) >> 5;
- Uint32 sz = sz32 << 2;
-
- Uint32 zero = 0;
- _mask.fill(sz32+1, zero);
- _src.fill(sz32+1, zero);
- _dst.fill(sz32+1, zero);
-
- Uint32 * src = _src.getBase();
- Uint32 * dst = _dst.getBase();
- Uint32 * mask = _mask.getBase();
-
- Vector<Uint32> save;
- for(int i = 0; i<loops; i++)
- {
- memset(mask, 0xFF, sz);
- memset(dst, 0xFF, sz);
- int len;
- int pos = 0;
- while(pos+1 < size)
- {
- memset(src, 0xFF, sz);
- while(!(len = rand() % (size - pos)));
- BitmaskImpl::setField(sz32, mask, pos, len, src);
- if(memcmp(dst, mask, sz))
- {
- ndbout_c("pos: %d len: %d", pos, len);
- print(mask, size);
- abort();
- }
- printf("[ %d %d ]", pos, len);
- save.push_back(pos);
- save.push_back(len);
- pos += len;
- }
-
- for(int j = 0; j<save.size(); )
- {
- pos = save[j++];
- len = save[j++];
- memset(src, 0xFF, sz);
- BitmaskImpl::getField(sz32, mask, pos, len, src);
- if(memcmp(dst, src, sz))
- {
- ndbout_c("pos: %d len: %d", pos, len);
- printf("src: "); print(src, size); printf("\n");
- printf("dst: "); print(dst, size); printf("\n");
- printf("msk: "); print(mask, size); printf("\n");
- abort();
- }
- }
- ndbout_c("");
- }
-}
-
-static void
-do_test(int bitmask_size)
-{
-#if 1
- simple(rand() % 33, (rand() % 63)+1);
-//#else
- Vector<Alloc> alloc_list;
- bitmask_size = (bitmask_size + 31) & ~31;
- Uint32 sz32 = (bitmask_size >> 5);
- Vector<Uint32> alloc_mask;
- Vector<Uint32> test_mask;
-
- ndbout_c("Testing bitmask of size %d", bitmask_size);
- Uint32 zero = 0;
- alloc_mask.fill(sz32, zero);
- test_mask.fill(sz32, zero);
-
- for(int i = 0; i<5000; i++)
- {
- Vector<Uint32> tmp;
- tmp.fill(sz32, zero);
-
- int pos = lrand() % (bitmask_size - 1);
- int free = 0;
- if(BitmaskImpl::get(sz32, alloc_mask.getBase(), pos))
- {
- // Bit was allocated
- // 1) Look up allocation
- // 2) Check data
- // 3) free it
- size_t j;
- int min, max;
- for(j = 0; j<alloc_list.size(); j++)
- {
- min = alloc_list[j].pos;
- max = min + alloc_list[j].size;
- if(pos >= min && pos < max)
- {
- break;
- }
- }
- require(pos >= min && pos < max);
- BitmaskImpl::getField(sz32, test_mask.getBase(), min, max-min,
- tmp.getBase());
- if(DEBUG)
- {
- printf("freeing [ %d %d ]", min, max);
- printf("- mask: ");
- print(tmp.getBase(), max - min);
-
- printf(" save: ");
- size_t k;
- Alloc& a = alloc_list[j];
- for(k = 0; k<a.data.size(); k++)
- printf("%.8x ", a.data[k]);
- printf("\n");
- }
- int bytes = (max - min + 7) >> 3;
- if(!cmp(tmp.getBase(), alloc_list[j].data.getBase(), max - min))
- {
- abort();
- }
- while(min < max)
- BitmaskImpl::clear(sz32, alloc_mask.getBase(), min++);
- alloc_list.erase(j);
- }
- else
- {
- Vector<Uint32> tmp;
- tmp.fill(sz32, zero);
-
- // Bit was free
- // 1) Check how much space is avaiable
- // 2) Create new allocation of lrandom size
- // 3) Fill data with lrandom data
- // 4) Update alloc mask
- while(pos+free < bitmask_size &&
- !BitmaskImpl::get(sz32, alloc_mask.getBase(), pos+free))
- free++;
-
- Uint32 sz =
- (free <= 64 && ((lrand() % 100) > 80)) ? free : (lrand() % free);
- sz = sz ? sz : 1;
- sz = pos + sz == bitmask_size ? sz - 1 : sz;
- Alloc a;
- a.pos = pos;
- a.size = sz;
- a.data.fill(((sz+31)>> 5)-1, zero);
- if(DEBUG)
- printf("pos %d -> alloc [ %d %d ]", pos, pos, pos+sz);
- for(size_t j = 0; j<sz; j++)
- {
- BitmaskImpl::set(sz32, alloc_mask.getBase(), pos+j);
- if((lrand() % 1000) > 500)
- BitmaskImpl::set((sz + 31) >> 5, a.data.getBase(), j);
- }
- if(DEBUG)
- {
- printf("- mask: ");
- print(a.data.getBase(), sz);
- printf("\n");
- }
- BitmaskImpl::setField(sz32, test_mask.getBase(), pos, sz,
- a.data.getBase());
- alloc_list.push_back(a);
- }
- }
-
- for(Uint32 i = 0; i<1000; i++)
- {
- Uint32 sz32 = 10+rand() % 100;
- Uint32 zero = 0;
- Vector<Uint32> map;
- map.fill(sz32, zero);
-
- Uint32 sz = 32 * sz32;
- Uint32 start = (rand() % sz);
- Uint32 stop = start + ((rand() % (sz - start)) & 0xFFFFFFFF);
-
- Vector<Uint32> check;
- check.fill(sz32, zero);
-
- for(Uint32 j = 0; j<sz; j++)
- {
- bool expect = (j >= start && j<stop);
- if(expect)
- BitmaskImpl::set(sz32, check.getBase(), j);
- }
-
- BitmaskImpl::set(sz32, map.getBase(), start, stop);
- if (!BitmaskImpl::equal(sz32, map.getBase(), check.getBase()))
- {
- ndbout_c(" FAIL sz: %d [ %d %d ]", sz, start, stop);
- printf("check: ");
- for(Uint32 j = 0; j<sz32; j++)
- printf("%.8x ", check[j]);
- printf("\n");
-
- printf("map : ");
- for(Uint32 j = 0; j<sz32; j++)
- printf("%.8x ", map[j]);
- printf("\n");
- abort();
- }
-
- map.clear();
- check.clear();
-
- Uint32 one = ~(Uint32)0;
- map.fill(sz32, one);
- check.fill(sz32, one);
-
- for(Uint32 j = 0; j<sz; j++)
- {
- bool expect = (j >= start && j<stop);
- if(expect)
- BitmaskImpl::clear(sz32, check.getBase(), j);
- }
-
- BitmaskImpl::clear(sz32, map.getBase(), start, stop);
- if (!BitmaskImpl::equal(sz32, map.getBase(), check.getBase()))
- {
- ndbout_c(" FAIL sz: %d [ %d %d ]", sz, start, stop);
- printf("check: ");
- for(Uint32 j = 0; j<sz32; j++)
- printf("%.8x ", check[j]);
- printf("\n");
-
- printf("map : ");
- for(Uint32 j = 0; j<sz32; j++)
- printf("%.8x ", map[j]);
- printf("\n");
- abort();
- }
- }
-#endif
-}
-
-template class Vector<Alloc>;
-template class Vector<Uint32>;
-#endif
+/* Bitmask testcase code moved from here to
+ * storage/ndb/test/ndbapi/testBitfield.cpp
+ * to get coverage from automated testing
+ */
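The unit-test code that used to live here cross-checked the optimised word-at-a-time field copy against bit-level get/set. A minimal standalone check in the same spirit (plain C++, no NDB headers; the helper below is a naive bit-at-a-time reference for illustration, not the getFieldImpl contract, and all names are assumptions):

#include <cassert>
#include <cstdint>
#include <cstdio>

// Naive reference: copy `len` bits starting at absolute bit `pos` of src[]
// into the low bits of dst[], zero-filling each freshly started dst word.
static void getBitsRef(const uint32_t src[], unsigned pos, unsigned len,
                       uint32_t dst[])
{
  for (unsigned i = 0; i < len; i++)
  {
    unsigned s = pos + i;
    uint32_t bit = (src[s >> 5] >> (s & 31)) & 1;
    if ((i & 31) == 0)
      dst[i >> 5] = 0;
    dst[i >> 5] |= bit << (i & 31);
  }
}

int main()
{
  uint32_t src[2] = { 0xDEADBEEFu, 0x12345678u };
  uint32_t dst[2] = { 0, 0 };
  getBitsRef(src, 4, 40, dst);            // 40 bits starting at bit 4
  printf("0x%08x 0x%08x\n", dst[1], dst[0]);
  assert(((dst[0] >> 0) & 1) == ((src[0] >> 4) & 1));
  return 0;
}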
diff --git a/storage/ndb/src/common/util/NdbOut.cpp b/storage/ndb/src/common/util/NdbOut.cpp
index 7ca7c91e266..61de2be7572 100644
--- a/storage/ndb/src/common/util/NdbOut.cpp
+++ b/storage/ndb/src/common/util/NdbOut.cpp
@@ -29,7 +29,7 @@ static const char * fms[] = {
"%d", "0x%08x", // Int32
"%u", "0x%08x", // Uint32
"%lld", "0x%016llx", // Int64
- "%llu", "0x%016llx" // Uint64
+ "%llu", "0x%016llx", // Uint64
"%llu", "0x%016llx" // UintPtr
};
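The one-character fix above cures silent concatenation of adjacent string literals: without the comma after the Uint64 hex format, that literal and the following UintPtr format fuse into a single array element, so the array ends up one entry short and every later format shifts. A small illustration (hypothetical arrays, not the NdbOut table itself):

#include <cstdio>

static const char * broken[] = {
  "%llu", "0x%016llx"   // Uint64   <-- missing comma
  "%llu", "0x%016llx"   // UintPtr
};

static const char * fixed[] = {
  "%llu", "0x%016llx",  // Uint64
  "%llu", "0x%016llx"   // UintPtr
};

int main()
{
  // broken has 3 entries, fixed has 4
  printf("broken: %zu entries, fixed: %zu entries\n",
         sizeof(broken) / sizeof(broken[0]),
         sizeof(fixed) / sizeof(fixed[0]));
  printf("broken[1] = \"%s\"\n", broken[1]);  // "0x%016llx%llu"
  return 0;
}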
diff --git a/storage/ndb/src/kernel/blocks/ERROR_codes.txt b/storage/ndb/src/kernel/blocks/ERROR_codes.txt
index 72791cb0ebc..150400b9deb 100644
--- a/storage/ndb/src/kernel/blocks/ERROR_codes.txt
+++ b/storage/ndb/src/kernel/blocks/ERROR_codes.txt
@@ -3,15 +3,18 @@ Next NDBCNTR 1002
Next NDBFS 2000
Next DBACC 3002
Next DBTUP 4029
-Next DBLQH 5047
+Next DBLQH 5050
Next DBDICT 6008
Next DBDIH 7195
-Next DBTC 8054
+Next DBTC 8058
Next CMVMI 9000
Next BACKUP 10038
Next DBUTIL 11002
Next DBTUX 12008
Next SUMA 13034
+Next SUMA 13036
+Next LGMAN 15001
+Next TSMAN 16001
TESTING NODE FAILURE, ARBITRATION
---------------------------------
@@ -260,6 +263,9 @@ Delay execution of ABORTCONF signal 2 seconds to generate time-out.
8053: Crash in timeOutFoundLab, state CS_WAIT_COMMIT_CONF
+5048: Crash in execCOMMIT
+5049: SET_ERROR_INSERT_VALUE(5048)
+
ERROR CODES FOR TESTING TIME-OUT HANDLING IN DBTC
-------------------------------------------------
@@ -316,6 +322,8 @@ ABORT OF TCKEYREQ
8038 : Simulate API disconnect just after SCAN_TAB_REQ
+8057 : Send only 1 COMMIT per timeslice
+
8052 : Simulate failure of TransactionBufferMemory allocation for OI lookup
8051 : Simulate failure of allocation for saveINDXKEYINFO
@@ -547,3 +555,11 @@ NDBCNTR:
1000: Crash insertion on SystemError::CopyFragRef
1001: Delay sending NODE_FAILREP (to own node), until error is cleared
+
+LGMAN:
+-----
+15000: Fail to create log file
+
+TSMAN:
+-----
+16000: Fail to create data file
diff --git a/storage/ndb/src/kernel/blocks/backup/Backup.cpp b/storage/ndb/src/kernel/blocks/backup/Backup.cpp
index 45501bf50d5..5cc486ff83b 100644
--- a/storage/ndb/src/kernel/blocks/backup/Backup.cpp
+++ b/storage/ndb/src/kernel/blocks/backup/Backup.cpp
@@ -326,16 +326,18 @@ Backup::execCONTINUEB(Signal* signal)
ptr.p->files.getPtr(filePtr, ptr.p->ctlFilePtr);
FsBuffer & buf = filePtr.p->operation.dataBuffer;
- if(buf.getFreeSize() + buf.getMinRead() < buf.getUsableSize()) {
+ if(buf.getFreeSize() < buf.getMaxWrite()) {
jam();
TablePtr tabPtr LINT_SET_PTR;
c_tablePool.getPtr(tabPtr, Tdata2);
- DEBUG_OUT("Backup - Buffer full - " << buf.getFreeSize()
- << " + " << buf.getMinRead()
- << " < " << buf.getUsableSize()
- << " - tableId = " << tabPtr.p->tableId);
-
+ DEBUG_OUT("Backup - Buffer full - "
+ << buf.getFreeSize()
+ << " < " << buf.getMaxWrite()
+ << " (sz: " << buf.getUsableSize()
+ << " getMinRead: " << buf.getMinRead()
+ << ") - tableId = " << tabPtr.p->tableId);
+
signal->theData[0] = BackupContinueB::BUFFER_FULL_META;
signal->theData[1] = Tdata1;
signal->theData[2] = Tdata2;
@@ -967,6 +969,7 @@ Backup::checkNodeFail(Signal* signal,
ref->backupPtr = ptr.i;
ref->backupId = ptr.p->backupId;
ref->errorCode = AbortBackupOrd::BackupFailureDueToNodeFail;
+ ref->nodeId = getOwnNodeId();
gsn= GSN_STOP_BACKUP_REF;
len= StopBackupRef::SignalLength;
pos= &ref->nodeId - signal->getDataPtr();
@@ -2081,6 +2084,15 @@ Backup::sendDropTrig(Signal* signal, BackupRecordPtr ptr)
/**
* Insert footers
*/
+ // if the backup has failed, there is no need to insert footers
+ if(ptr.p->checkError())
+ {
+ jam();
+ closeFiles(signal, ptr);
+ ptr.p->errorCode = 0;
+ return;
+ }
+
{
BackupFilePtr filePtr LINT_SET_PTR;
ptr.p->files.getPtr(filePtr, ptr.p->logFilePtr);
@@ -4187,6 +4199,37 @@ Backup::checkFile(Signal* signal, BackupFilePtr filePtr)
#if 0
ndbout << "Ptr to data = " << hex << tmp << endl;
#endif
+ BackupRecordPtr ptr LINT_SET_PTR;
+ c_backupPool.getPtr(ptr, filePtr.p->backupPtr);
+
+ if (ERROR_INSERTED(10036))
+ {
+ jam();
+ filePtr.p->m_flags &= ~(Uint32)BackupFile::BF_FILE_THREAD;
+ filePtr.p->errorCode = 2810;
+ ptr.p->setErrorCode(2810);
+
+ if(ptr.p->m_gsn == GSN_STOP_BACKUP_REQ)
+ {
+ jam();
+ closeFile(signal, ptr, filePtr);
+ }
+ return;
+ }
+
+ if(filePtr.p->errorCode != 0)
+ {
+ jam();
+ ptr.p->setErrorCode(filePtr.p->errorCode);
+
+ if(ptr.p->m_gsn == GSN_STOP_BACKUP_REQ)
+ {
+ jam();
+ closeFile(signal, ptr, filePtr);
+ }
+ return;
+ }
+
if (!ready_to_write(ready, sz, eof, filePtr.p))
{
jam();
@@ -4218,8 +4261,6 @@ Backup::checkFile(Signal* signal, BackupFilePtr filePtr)
ndbrequire(flags & BackupFile::BF_OPEN);
ndbrequire(flags & BackupFile::BF_FILE_THREAD);
- BackupRecordPtr ptr LINT_SET_PTR;
- c_backupPool.getPtr(ptr, filePtr.p->backupPtr);
closeFile(signal, ptr, filePtr);
}
@@ -4582,6 +4623,22 @@ Backup::closeFilesDone(Signal* signal, BackupRecordPtr ptr)
jam();
+ // error while inserting the footer or closing the file
+ if(ptr.p->checkError())
+ {
+ StopBackupRef * ref = (StopBackupRef*)signal->getDataPtr();
+ ref->backupPtr = ptr.i;
+ ref->backupId = ptr.p->backupId;
+ ref->errorCode = ptr.p->errorCode;
+ ref->nodeId = getOwnNodeId();
+ sendSignal(ptr.p->masterRef, GSN_STOP_BACKUP_REF, signal,
+ StopBackupConf::SignalLength, JBB);
+
+ ptr.p->m_gsn = GSN_STOP_BACKUP_REF;
+ ptr.p->slaveState.setState(CLEANING);
+ return;
+ }
+
StopBackupConf* conf = (StopBackupConf*)signal->getDataPtrSend();
conf->backupId = ptr.p->backupId;
conf->backupPtr = ptr.i;
diff --git a/storage/ndb/src/kernel/blocks/backup/Backup.hpp b/storage/ndb/src/kernel/blocks/backup/Backup.hpp
index 3fd9b2967fd..ad6e3b8fadc 100644
--- a/storage/ndb/src/kernel/blocks/backup/Backup.hpp
+++ b/storage/ndb/src/kernel/blocks/backup/Backup.hpp
@@ -557,8 +557,8 @@ public:
NDB_TICKS m_reset_disk_speed_time;
static const int DISK_SPEED_CHECK_DELAY = 100;
- STATIC_CONST(NO_OF_PAGES_META_FILE =
- (MAX_WORDS_META_FILE + BACKUP_WORDS_PER_PAGE - 1) /
+ STATIC_CONST(NO_OF_PAGES_META_FILE =
+ (2*MAX_WORDS_META_FILE + BACKUP_WORDS_PER_PAGE - 1) /
BACKUP_WORDS_PER_PAGE);
/**
diff --git a/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp b/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp
index 63d22bd0a37..1805d6ef4f8 100644
--- a/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp
+++ b/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp
@@ -3210,7 +3210,6 @@ Dbacc::readTablePk(Uint32 localkey1, Uint32 eh, Ptr<Operationrec> opPtr)
{
dump_lock_queue(opPtr);
ndbrequire(opPtr.p->nextParallelQue == RNIL);
- ndbrequire(opPtr.p->nextSerialQue == RNIL);
ndbrequire(opPtr.p->m_op_bits & Operationrec::OP_ELEMENT_DISAPPEARED);
ndbrequire(opPtr.p->m_op_bits & Operationrec::OP_COMMIT_DELETE_CHECK);
ndbrequire((opPtr.p->m_op_bits & Operationrec::OP_STATE_MASK) == Operationrec::OP_STATE_RUNNING);
diff --git a/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp b/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp
index 7ced078144a..a61a5bc035c 100644
--- a/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp
+++ b/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp
@@ -4086,9 +4086,7 @@ Dbdict::execALTER_TABLE_REQ(Signal* signal)
bool ok = false;
switch(tabState){
case TableRecord::NOT_DEFINED:
- case TableRecord::REORG_TABLE_PREPARED:
case TableRecord::DEFINING:
- case TableRecord::CHECKED:
jam();
alterTableRef(signal, req, AlterTableRef::NoSuchTable);
return;
@@ -4339,9 +4337,7 @@ Dbdict::execALTER_TAB_REQ(Signal * signal)
bool ok = false;
switch(tabState){
case TableRecord::NOT_DEFINED:
- case TableRecord::REORG_TABLE_PREPARED:
case TableRecord::DEFINING:
- case TableRecord::CHECKED:
jam();
alterTabRef(signal, req, AlterTableRef::NoSuchTable);
return;
@@ -6690,9 +6686,7 @@ Dbdict::execDROP_TABLE_REQ(Signal* signal){
bool ok = false;
switch(tabState){
case TableRecord::NOT_DEFINED:
- case TableRecord::REORG_TABLE_PREPARED:
case TableRecord::DEFINING:
- case TableRecord::CHECKED:
jam();
dropTableRef(signal, req, DropTableRef::NoSuchTable);
return;
@@ -7718,7 +7712,6 @@ Dbdict::execLIST_TABLES_REQ(Signal* signal)
if(DictTabInfo::isTable(type)){
switch (tablePtr.p->tabState) {
case TableRecord::DEFINING:
- case TableRecord::CHECKED:
conf->setTableState(pos, DictTabInfo::StateBuilding);
break;
case TableRecord::PREPARE_DROPPING:
diff --git a/storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp b/storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp
index 3fff330d699..1189b23c14d 100644
--- a/storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp
+++ b/storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp
@@ -320,9 +320,7 @@ public:
enum TabState {
NOT_DEFINED = 0,
- REORG_TABLE_PREPARED = 1,
DEFINING = 2,
- CHECKED = 3,
DEFINED = 4,
PREPARE_DROPPING = 5,
DROPPING = 6,
diff --git a/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp b/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp
index bbacb300089..e2b1058242d 100644
--- a/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp
+++ b/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp
@@ -8464,6 +8464,12 @@ void Dbdih::execDIHNDBTAMPER(Signal* signal)
} else if (tuserpointer < 15000) {
jam();
tuserblockref = DBDICT_REF;
+ } else if (tuserpointer < 16000) {
+ jam();
+ tuserblockref = LGMAN_REF;
+ } else if (tuserpointer < 17000) {
+ jam();
+ tuserblockref = TSMAN_REF;
} else if (tuserpointer < 30000) {
/*--------------------------------------------------------------------*/
// Ignore errors in the 20000-range.
diff --git a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
index 83d38595c1f..bc8adf6fd32 100644
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
@@ -5959,6 +5959,12 @@ void Dblqh::execCOMMIT(Signal* signal)
TcConnectionrec * const regTcPtr = tcConnectptr.p;
TRACE_OP(regTcPtr, "COMMIT");
+
+ CRASH_INSERTION(5048);
+ if (ERROR_INSERTED(5049))
+ {
+ SET_ERROR_INSERT_VALUE(5048);
+ }
commitReqLab(signal, gci);
return;
diff --git a/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp b/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp
index e584883e3b6..043df5d5038 100644
--- a/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp
+++ b/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp
@@ -4495,7 +4495,7 @@ void Dbtc::commit020Lab(Signal* signal)
if (localTcConnectptr.i != RNIL) {
Tcount = Tcount + 1;
- if (Tcount < 16) {
+ if (Tcount < 16 && !ERROR_INSERTED(8057)) {
ptrCheckGuard(localTcConnectptr,
TtcConnectFilesize, localTcConnectRecord);
jam();
@@ -4514,6 +4514,9 @@ void Dbtc::commit020Lab(Signal* signal)
}//if
} else {
jam();
+ if (ERROR_INSERTED(8057))
+ CLEAR_ERROR_INSERT_VALUE;
+
regApiPtr->apiConnectstate = CS_COMMIT_SENT;
return;
}//if
diff --git a/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp b/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp
index 45d124b8d7d..decb47e9758 100644
--- a/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp
+++ b/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp
@@ -251,6 +251,7 @@ inline const Uint32* ALIGN_WORD(const void* ptr)
#define ZUNSUPPORTED_BRANCH 892
#define ZSTORED_SEIZE_ATTRINBUFREC_ERROR 873 // Part of Scan
+#define ZSTORED_TOO_MUCH_ATTRINFO_ERROR 874
#define ZREAD_ONLY_CONSTRAINT_VIOLATION 893
#define ZVAR_SIZED_NOT_SUPPORTED 894
@@ -1330,6 +1331,11 @@ typedef Ptr<HostBuffer> HostBufferPtr;
struct Tuple_header
{
union {
+ /**
+ * List of prepared operations for this tuple.
+ * Points to the most recent/last operation; i.e., to walk the list one
+ * must follow regOperPtr->prevActiveOp links.
+ */
Uint32 m_operation_ptr_i; // OperationPtrI
Uint32 m_base_record_ref; // For disk tuple, ref to MM tuple
};
@@ -1558,7 +1564,7 @@ public:
/*
* TUX checks if tuple is visible to scan.
*/
- bool tuxQueryTh(Uint32 fragPtrI, Uint32 tupAddr, Uint32 tupVersion, Uint32 transId1, Uint32 transId2, Uint32 savePointId);
+ bool tuxQueryTh(Uint32 fragPtrI, Uint32 pageId, Uint32 pageIndex, Uint32 tupVersion, Uint32 transId1, Uint32 transId2, bool dirty, Uint32 savepointId);
int load_diskpage(Signal*, Uint32 opRec, Uint32 fragPtrI,
Uint32 local_key, Uint32 flags);
@@ -2202,17 +2208,20 @@ private:
void
checkImmediateTriggersAfterInsert(KeyReqStruct *req_struct,
Operationrec* regOperPtr,
- Tablerec* tablePtr);
+ Tablerec* tablePtr,
+ bool disk);
void
checkImmediateTriggersAfterUpdate(KeyReqStruct *req_struct,
Operationrec* regOperPtr,
- Tablerec* tablePtr);
+ Tablerec* tablePtr,
+ bool disk);
void
checkImmediateTriggersAfterDelete(KeyReqStruct *req_struct,
Operationrec* regOperPtr,
- Tablerec* tablePtr);
+ Tablerec* tablePtr,
+ bool disk);
#if 0
void checkDeferredTriggers(Signal* signal,
@@ -2226,7 +2235,8 @@ private:
void fireImmediateTriggers(KeyReqStruct *req_struct,
DLList<TupTriggerData>& triggerList,
- Operationrec* regOperPtr);
+ Operationrec* regOperPtr,
+ bool disk);
void fireDeferredTriggers(KeyReqStruct *req_struct,
DLList<TupTriggerData>& triggerList,
@@ -2239,12 +2249,13 @@ private:
void executeTriggers(KeyReqStruct *req_struct,
DLList<TupTriggerData>& triggerList,
- Operationrec* regOperPtr);
+ Operationrec* regOperPtr,
+ bool disk);
void executeTrigger(KeyReqStruct *req_struct,
TupTriggerData* trigPtr,
Operationrec* regOperPtr,
- bool disk = true);
+ bool disk);
bool readTriggerInfo(TupTriggerData* trigPtr,
Operationrec* regOperPtr,
@@ -2421,6 +2432,7 @@ private:
void setNullBits(Uint32*, Tablerec* regTabPtr);
bool checkNullAttributes(KeyReqStruct * const, Tablerec* const);
+ bool find_savepoint(OperationrecPtr& loopOpPtr, Uint32 savepointId);
bool setup_read(KeyReqStruct* req_struct,
Operationrec* regOperPtr,
Fragrecord* regFragPtr,
@@ -2533,7 +2545,8 @@ private:
Operationrec* regOperPtr,
Uint32 lenAttrInfo);
void storedSeizeAttrinbufrecErrorLab(Signal* signal,
- Operationrec* regOperPtr);
+ Operationrec* regOperPtr,
+ Uint32 errorCode);
bool storedProcedureAttrInfo(Signal* signal,
Operationrec* regOperPtr,
const Uint32* data,
@@ -2874,7 +2887,7 @@ private:
void verify_page_lists(Disk_alloc_info&) {}
#endif
- void fix_commit_order(OperationrecPtr);
+ void findFirstOp(OperationrecPtr&);
void commit_operation(Signal*, Uint32, Tuple_header*, PagePtr,
Operationrec*, Fragrecord*, Tablerec*);
@@ -3036,4 +3049,21 @@ Dbtup::get_dd_ptr(PagePtr* pagePtr,
NdbOut&
operator<<(NdbOut&, const Dbtup::Tablerec&);
+inline
+bool Dbtup::find_savepoint(OperationrecPtr& loopOpPtr, Uint32 savepointId)
+{
+ while (true) {
+ if (savepointId > loopOpPtr.p->savepointId) {
+ jam();
+ return true;
+ }
+ loopOpPtr.i = loopOpPtr.p->prevActiveOp;
+ if (loopOpPtr.i == RNIL) {
+ break;
+ }
+ c_operation_pool.getPtr(loopOpPtr);
+ }
+ return false;
+}
+
#endif
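The new find_savepoint() helper above walks the per-tuple operation list from the newest operation backwards via prevActiveOp until it meets an operation prepared before the requested savepoint. A standalone sketch of that walk (ordinary pointers instead of the NDB operation pool and RNIL; all names are illustrative):

#include <cstdint>
#include <cstdio>

struct Op {
  uint32_t savepointId;    // savepoint in which this operation was prepared
  Op*      prevActiveOp;   // next-older operation on the same tuple, or nullptr
};

// Returns the newest operation visible at `savepointId`, or nullptr if every
// listed operation is too new (the caller then falls back to the committed row).
static Op* findSavepoint(Op* newest, uint32_t savepointId)
{
  for (Op* op = newest; op != nullptr; op = op->prevActiveOp)
  {
    if (savepointId > op->savepointId)
      return op;             // first operation older than the savepoint
  }
  return nullptr;
}

int main()
{
  Op first  = { 1, nullptr };
  Op second = { 3, &first };
  Op third  = { 5, &second };  // the tuple header would point at `third`

  Op* hit = findSavepoint(&third, 4);
  printf("visible savepointId = %u\n", hit ? hit->savepointId : 0u);  // prints 3
  return 0;
}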
diff --git a/storage/ndb/src/kernel/blocks/dbtup/DbtupAbort.cpp b/storage/ndb/src/kernel/blocks/dbtup/DbtupAbort.cpp
index 59adfbfde89..93a160a4df3 100644
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupAbort.cpp
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupAbort.cpp
@@ -385,3 +385,38 @@ void Dbtup::send_TUPKEYREF(Signal* signal,
TupKeyRef::SignalLength, JBB);
}
+/**
+ * Unlink one operation from the m_operation_ptr_i list in the tuple.
+ */
+void Dbtup::removeActiveOpList(Operationrec* const regOperPtr,
+ Tuple_header *tuple_ptr)
+{
+ OperationrecPtr raoOperPtr;
+
+ if(!regOperPtr->m_copy_tuple_location.isNull())
+ {
+ jam();
+ c_undo_buffer.free_copy_tuple(&regOperPtr->m_copy_tuple_location);
+ }
+
+ if (regOperPtr->op_struct.in_active_list) {
+ regOperPtr->op_struct.in_active_list= false;
+ if (regOperPtr->nextActiveOp != RNIL) {
+ jam();
+ raoOperPtr.i= regOperPtr->nextActiveOp;
+ c_operation_pool.getPtr(raoOperPtr);
+ raoOperPtr.p->prevActiveOp= regOperPtr->prevActiveOp;
+ } else {
+ jam();
+ tuple_ptr->m_operation_ptr_i = regOperPtr->prevActiveOp;
+ }
+ if (regOperPtr->prevActiveOp != RNIL) {
+ jam();
+ raoOperPtr.i= regOperPtr->prevActiveOp;
+ c_operation_pool.getPtr(raoOperPtr);
+ raoOperPtr.p->nextActiveOp= regOperPtr->nextActiveOp;
+ }
+ regOperPtr->prevActiveOp= RNIL;
+ regOperPtr->nextActiveOp= RNIL;
+ }
+}
diff --git a/storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp b/storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp
index 812f071e037..f56e772c8b9 100644
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp
@@ -97,39 +97,6 @@ void Dbtup::execTUP_WRITELOG_REQ(Signal* signal)
} while (true);
}
-void Dbtup::removeActiveOpList(Operationrec* const regOperPtr,
- Tuple_header *tuple_ptr)
-{
- OperationrecPtr raoOperPtr;
-
- /**
- * Release copy tuple
- */
- if(!regOperPtr->m_copy_tuple_location.isNull())
- c_undo_buffer.free_copy_tuple(&regOperPtr->m_copy_tuple_location);
-
- if (regOperPtr->op_struct.in_active_list) {
- regOperPtr->op_struct.in_active_list= false;
- if (regOperPtr->nextActiveOp != RNIL) {
- jam();
- raoOperPtr.i= regOperPtr->nextActiveOp;
- c_operation_pool.getPtr(raoOperPtr);
- raoOperPtr.p->prevActiveOp= regOperPtr->prevActiveOp;
- } else {
- jam();
- tuple_ptr->m_operation_ptr_i = regOperPtr->prevActiveOp;
- }
- if (regOperPtr->prevActiveOp != RNIL) {
- jam();
- raoOperPtr.i= regOperPtr->prevActiveOp;
- c_operation_pool.getPtr(raoOperPtr);
- raoOperPtr.p->nextActiveOp= regOperPtr->nextActiveOp;
- }
- regOperPtr->prevActiveOp= RNIL;
- regOperPtr->nextActiveOp= RNIL;
- }
-}
-
/* ---------------------------------------------------------------- */
/* INITIALIZATION OF ONE CONNECTION RECORD TO PREPARE FOR NEXT OP. */
/* ---------------------------------------------------------------- */
@@ -142,6 +109,7 @@ void Dbtup::initOpConnection(Operationrec* regOperPtr)
regOperPtr->op_struct.m_disk_preallocated= 0;
regOperPtr->op_struct.m_load_diskpage_on_commit= 0;
regOperPtr->op_struct.m_wait_log_buffer= 0;
+ regOperPtr->op_struct.in_active_list = false;
regOperPtr->m_undo_buffer_space= 0;
}
@@ -170,6 +138,7 @@ Dbtup::dealloc_tuple(Signal* signal,
Uint32 extra_bits = Tuple_header::FREED;
if (bits & Tuple_header::DISK_PART)
{
+ jam();
Local_key disk;
memcpy(&disk, ptr->get_disk_ref_ptr(regTabPtr), sizeof(disk));
PagePtr tmpptr;
@@ -182,6 +151,7 @@ Dbtup::dealloc_tuple(Signal* signal,
if (! (bits & (Tuple_header::LCP_SKIP | Tuple_header::ALLOC)) &&
lcpScan_ptr_i != RNIL)
{
+ jam();
ScanOpPtr scanOp;
c_scanOpPool.getPtr(scanOp, lcpScan_ptr_i);
Local_key rowid = regOperPtr->m_tuple_location;
@@ -189,6 +159,7 @@ Dbtup::dealloc_tuple(Signal* signal,
rowid.m_page_no = page->frag_page_id;
if (rowid > scanpos)
{
+ jam();
extra_bits = Tuple_header::LCP_KEEP; // Note REMOVE FREE
ptr->m_operation_ptr_i = lcp_keep_list;
regFragPtr->m_lcp_keep_list = rowid.ref();
@@ -229,11 +200,13 @@ Dbtup::commit_operation(Signal* signal,
Uint32 mm_vars= regTabPtr->m_attributes[MM].m_no_of_varsize;
if(mm_vars == 0)
{
+ jam();
memcpy(tuple_ptr, copy, 4*fixsize);
disk_ptr= (Tuple_header*)(((Uint32*)copy)+fixsize);
}
else
{
+ jam();
/**
* Var_part_ref is only stored in *allocated* tuple
* so memcpy from copy, will over write it...
@@ -258,6 +231,7 @@ Dbtup::commit_operation(Signal* signal,
if(copy_bits & Tuple_header::MM_SHRINK)
{
+ jam();
vpagePtrP->shrink_entry(tmp.m_page_idx, (sz + 3) >> 2);
update_free_page_list(regFragPtr, vpagePtr);
}
@@ -268,6 +242,7 @@ Dbtup::commit_operation(Signal* signal,
if (regTabPtr->m_no_of_disk_attributes &&
(copy_bits & Tuple_header::DISK_INLINE))
{
+ jam();
Local_key key;
memcpy(&key, copy->get_disk_ref_ptr(regTabPtr), sizeof(Local_key));
Uint32 logfile_group_id= regFragPtr->m_logfile_group_id;
@@ -278,22 +253,26 @@ Dbtup::commit_operation(Signal* signal,
Uint32 sz, *dst;
if(copy_bits & Tuple_header::DISK_ALLOC)
{
+ jam();
disk_page_alloc(signal, regTabPtr, regFragPtr, &key, diskPagePtr, gci);
}
if(regTabPtr->m_attributes[DD].m_no_of_varsize == 0)
{
+ jam();
sz= regTabPtr->m_offsets[DD].m_fix_header_size;
dst= ((Fix_page*)diskPagePtr.p)->get_ptr(key.m_page_idx, sz);
}
else
{
+ jam();
dst= ((Var_page*)diskPagePtr.p)->get_ptr(key.m_page_idx);
sz= ((Var_page*)diskPagePtr.p)->get_entry_len(key.m_page_idx);
}
if(! (copy_bits & Tuple_header::DISK_ALLOC))
{
+ jam();
disk_page_undo_update(diskPagePtr.p,
&key, dst, sz, gci, logfile_group_id);
}
@@ -307,6 +286,7 @@ Dbtup::commit_operation(Signal* signal,
if(lcpScan_ptr_i != RNIL && (bits & Tuple_header::ALLOC))
{
+ jam();
ScanOpPtr scanOp;
c_scanOpPool.getPtr(scanOp, lcpScan_ptr_i);
Local_key rowid = regOperPtr->m_tuple_location;
@@ -314,6 +294,7 @@ Dbtup::commit_operation(Signal* signal,
rowid.m_page_no = pagePtr.p->frag_page_id;
if(rowid > scanpos)
{
+ jam();
copy_bits |= Tuple_header::LCP_SKIP;
}
}
@@ -372,7 +353,10 @@ Dbtup::disk_page_commit_callback(Signal* signal,
execTUP_COMMITREQ(signal);
if(signal->theData[0] == 0)
+ {
+ jam();
c_lqh->tupcommit_conf_callback(signal, regOperPtr.p->userpointer);
+ }
}
void
@@ -407,35 +391,21 @@ Dbtup::disk_page_log_buffer_callback(Signal* signal,
c_lqh->tupcommit_conf_callback(signal, regOperPtr.p->userpointer);
}
+/**
+ * Move to the first operation performed on this tuple
+ */
void
-Dbtup::fix_commit_order(OperationrecPtr opPtr)
+Dbtup::findFirstOp(OperationrecPtr & firstPtr)
{
- ndbassert(!opPtr.p->is_first_operation());
- OperationrecPtr firstPtr = opPtr;
+ jam();
+ printf("Detect out-of-order commit(%u) -> ", firstPtr.i);
+ ndbassert(!firstPtr.p->is_first_operation());
while(firstPtr.p->prevActiveOp != RNIL)
{
firstPtr.i = firstPtr.p->prevActiveOp;
c_operation_pool.getPtr(firstPtr);
}
-
- ndbout_c("fix_commit_order (swapping %d and %d)",
- opPtr.i, firstPtr.i);
-
- /**
- * Swap data between first and curr
- */
- Uint32 prev= opPtr.p->prevActiveOp;
- Uint32 next= opPtr.p->nextActiveOp;
- Uint32 seco= firstPtr.p->nextActiveOp;
-
- Operationrec tmp = *opPtr.p;
- * opPtr.p = * firstPtr.p;
- * firstPtr.p = tmp;
-
- c_operation_pool.getPtr(seco)->prevActiveOp = opPtr.i;
- c_operation_pool.getPtr(prev)->nextActiveOp = firstPtr.i;
- if(next != RNIL)
- c_operation_pool.getPtr(next)->prevActiveOp = firstPtr.i;
+ ndbout_c("%u", firstPtr.i);
}
/* ----------------------------------------------------------------- */
@@ -448,22 +418,17 @@ void Dbtup::execTUP_COMMITREQ(Signal* signal)
TablerecPtr regTabPtr;
KeyReqStruct req_struct;
TransState trans_state;
- Uint32 no_of_fragrec, no_of_tablerec, hash_value, gci;
+ Uint32 no_of_fragrec, no_of_tablerec;
TupCommitReq * const tupCommitReq= (TupCommitReq *)signal->getDataPtr();
regOperPtr.i= tupCommitReq->opPtr;
+ Uint32 hash_value= tupCommitReq->hashValue;
+ Uint32 gci = tupCommitReq->gci;
+
jamEntry();
c_operation_pool.getPtr(regOperPtr);
- if(!regOperPtr.p->is_first_operation())
- {
- /**
- * Out of order commit XXX check effect on triggers
- */
- fix_commit_order(regOperPtr);
- }
- ndbassert(regOperPtr.p->is_first_operation());
regFragPtr.i= regOperPtr.p->fragmentPtr;
trans_state= get_trans_state(regOperPtr.p);
@@ -486,8 +451,10 @@ void Dbtup::execTUP_COMMITREQ(Signal* signal)
#ifdef VM_TRACE
if (tupCommitReq->diskpage == RNIL)
{
- m_pgman.m_ptr.setNull();
- req_struct.m_disk_page_ptr.setNull();
+ m_pgman.m_ptr.i = RNIL;
+ m_pgman.m_ptr.p = 0;
+ req_struct.m_disk_page_ptr.i = RNIL;
+ req_struct.m_disk_page_ptr.p = 0;
}
#endif
@@ -496,19 +463,63 @@ void Dbtup::execTUP_COMMITREQ(Signal* signal)
PagePtr page;
Tuple_header* tuple_ptr= (Tuple_header*)
get_ptr(&page, &regOperPtr.p->m_tuple_location, regTabPtr.p);
+
+ /**
+ * NOTE: This has to be run before potential time-slice when
+ * waiting for disk, as otherwise the "other-ops" in a multi-op
+ * commit might run while we're waiting for disk
+ *
+ */
+ if (!regTabPtr.p->tuxCustomTriggers.isEmpty())
+ {
+ if(get_tuple_state(regOperPtr.p) == TUPLE_PREPARED)
+ {
+ jam();
+
+ OperationrecPtr loopPtr = regOperPtr;
+ if (unlikely(!regOperPtr.p->is_first_operation()))
+ {
+ findFirstOp(loopPtr);
+ }
+
+ /**
+ * Execute all tux triggers at first commit
+ * since previous tuple is otherwise removed...
+ */
+ jam();
+ goto first;
+ while(loopPtr.i != RNIL)
+ {
+ c_operation_pool.getPtr(loopPtr);
+ first:
+ executeTuxCommitTriggers(signal,
+ loopPtr.p,
+ regFragPtr.p,
+ regTabPtr.p);
+ set_tuple_state(loopPtr.p, TUPLE_TO_BE_COMMITTED);
+ loopPtr.i = loopPtr.p->nextActiveOp;
+ }
+ }
+ }
bool get_page = false;
if(regOperPtr.p->op_struct.m_load_diskpage_on_commit)
{
+ jam();
Page_cache_client::Request req;
- ndbassert(regOperPtr.p->is_first_operation() &&
- regOperPtr.p->is_last_operation());
+
+ /**
+ * Only last op on tuple needs "real" commit,
+ * hence only this one should have m_load_diskpage_on_commit
+ */
+ ndbassert(tuple_ptr->m_operation_ptr_i == regOperPtr.i);
/**
* Check for page
*/
if(!regOperPtr.p->m_copy_tuple_location.isNull())
{
+ jam();
Tuple_header* tmp= (Tuple_header*)
c_undo_buffer.get_ptr(&regOperPtr.p->m_copy_tuple_location);
@@ -518,23 +529,26 @@ void Dbtup::execTUP_COMMITREQ(Signal* signal)
if (unlikely(regOperPtr.p->op_struct.op_type == ZDELETE &&
tmp->m_header_bits & Tuple_header::DISK_ALLOC))
{
- jam();
+ jam();
/**
* Insert+Delete
*/
- regOperPtr.p->op_struct.m_load_diskpage_on_commit = 0;
- regOperPtr.p->op_struct.m_wait_log_buffer = 0;
- disk_page_abort_prealloc(signal, regFragPtr.p,
+ regOperPtr.p->op_struct.m_load_diskpage_on_commit = 0;
+ regOperPtr.p->op_struct.m_wait_log_buffer = 0;
+ disk_page_abort_prealloc(signal, regFragPtr.p,
&req.m_page, req.m_page.m_page_idx);
-
- c_lgman->free_log_space(regFragPtr.p->m_logfile_group_id,
+
+ c_lgman->free_log_space(regFragPtr.p->m_logfile_group_id,
regOperPtr.p->m_undo_buffer_space);
- if (0) ndbout_c("insert+delete");
goto skip_disk;
+ if (0) ndbout_c("insert+delete");
+ jamEntry();
+ goto skip_disk;
}
}
else
{
+ jam();
// initial delete
ndbassert(regOperPtr.p->op_struct.op_type == ZDELETE);
memcpy(&req.m_page,
@@ -558,11 +572,14 @@ void Dbtup::execTUP_COMMITREQ(Signal* signal)
/**
* Timeslice
*/
+ jam();
signal->theData[0] = 1;
return;
case -1:
ndbrequire("NOT YET IMPLEMENTED" == 0);
break;
+ default:
+ jam();
}
get_page = true;
@@ -579,8 +596,12 @@ void Dbtup::execTUP_COMMITREQ(Signal* signal)
if(regOperPtr.p->op_struct.m_wait_log_buffer)
{
- ndbassert(regOperPtr.p->is_first_operation() &&
- regOperPtr.p->is_last_operation());
+ jam();
+ /**
+ * Only last op on tuple needs "real" commit,
+ * hence only this one should have m_wait_log_buffer
+ */
+ ndbassert(tuple_ptr->m_operation_ptr_i == regOperPtr.i);
Callback cb;
cb.m_callbackData= regOperPtr.i;
@@ -590,51 +611,39 @@ void Dbtup::execTUP_COMMITREQ(Signal* signal)
Logfile_client lgman(this, c_lgman, regFragPtr.p->m_logfile_group_id);
int res= lgman.get_log_buffer(signal, sz, &cb);
+ jamEntry();
switch(res){
case 0:
+ jam();
signal->theData[0] = 1;
return;
case -1:
ndbrequire("NOT YET IMPLEMENTED" == 0);
break;
+ default:
+ jam();
}
}
- if(!tuple_ptr)
- {
- tuple_ptr = (Tuple_header*)
- get_ptr(&page, &regOperPtr.p->m_tuple_location,regTabPtr.p);
- }
+ assert(tuple_ptr);
skip_disk:
req_struct.m_tuple_ptr = tuple_ptr;
- if(get_tuple_state(regOperPtr.p) == TUPLE_PREPARED)
- {
- /**
- * Execute all tux triggers at first commit
- * since previous tuple is otherwise removed...
- * btw...is this a "good" solution??
- *
- * why can't we instead remove "own version" (when approriate ofcourse)
- */
- if (!regTabPtr.p->tuxCustomTriggers.isEmpty()) {
- jam();
- OperationrecPtr loopPtr= regOperPtr;
- while(loopPtr.i != RNIL)
- {
- c_operation_pool.getPtr(loopPtr);
- executeTuxCommitTriggers(signal,
- loopPtr.p,
- regFragPtr.p,
- regTabPtr.p);
- set_tuple_state(loopPtr.p, TUPLE_TO_BE_COMMITTED);
- loopPtr.i = loopPtr.p->nextActiveOp;
- }
- }
- }
-
- if(regOperPtr.p->is_last_operation())
+ Uint32 nextOp = regOperPtr.p->nextActiveOp;
+ Uint32 prevOp = regOperPtr.p->prevActiveOp;
+ /**
+ * The trigger code (which is shared between detached/immediate)
+ * checks the op-list to decide where to read before-values from;
+ * detached triggers should always read the original tuple value
+ * from before the transaction started, not any intermediate update.
+ *
+ * Resetting the op-list links here has that effect.
+ */
+ regOperPtr.p->nextActiveOp = RNIL;
+ regOperPtr.p->prevActiveOp = RNIL;
+ if(tuple_ptr->m_operation_ptr_i == regOperPtr.i)
{
+ jam();
/**
* Perform "real" commit
*/
@@ -643,24 +652,38 @@ skip_disk:
checkDetachedTriggers(&req_struct, regOperPtr.p, regTabPtr.p,
disk != RNIL);
+ tuple_ptr->m_operation_ptr_i = RNIL;
+
if(regOperPtr.p->op_struct.op_type != ZDELETE)
{
+ jam();
commit_operation(signal, gci, tuple_ptr, page,
regOperPtr.p, regFragPtr.p, regTabPtr.p);
- removeActiveOpList(regOperPtr.p, tuple_ptr);
}
else
{
- removeActiveOpList(regOperPtr.p, tuple_ptr);
+ jam();
if (get_page)
ndbassert(tuple_ptr->m_header_bits & Tuple_header::DISK_PART);
dealloc_tuple(signal, gci, page.p, tuple_ptr,
regOperPtr.p, regFragPtr.p, regTabPtr.p);
}
}
- else
+
+ if (nextOp != RNIL)
{
- removeActiveOpList(regOperPtr.p, tuple_ptr);
+ c_operation_pool.getPtr(nextOp)->prevActiveOp = prevOp;
+ }
+
+ if (prevOp != RNIL)
+ {
+ c_operation_pool.getPtr(prevOp)->nextActiveOp = nextOp;
+ }
+
+ if(!regOperPtr.p->m_copy_tuple_location.isNull())
+ {
+ jam();
+ c_undo_buffer.free_copy_tuple(&regOperPtr.p->m_copy_tuple_location);
}
initOpConnection(regOperPtr.p);
diff --git a/storage/ndb/src/kernel/blocks/dbtup/DbtupDiskAlloc.cpp b/storage/ndb/src/kernel/blocks/dbtup/DbtupDiskAlloc.cpp
index 8420e7f2bde..3a3c1155657 100644
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupDiskAlloc.cpp
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupDiskAlloc.cpp
@@ -282,6 +282,7 @@ Dbtup::update_extent_pos(Disk_alloc_info& alloc,
void
Dbtup::restart_setup_page(Disk_alloc_info& alloc, PagePtr pagePtr)
{
+ jam();
/**
* Link to extent, clear uncommitted_used_space
*/
@@ -302,6 +303,7 @@ Dbtup::restart_setup_page(Disk_alloc_info& alloc, PagePtr pagePtr)
ddassert(real_free >= estimated);
if (real_free != estimated)
{
+ jam();
extentPtr.p->m_free_space += (real_free - estimated);
update_extent_pos(alloc, extentPtr);
}
@@ -374,6 +376,7 @@ Dbtup::disk_page_prealloc(Signal* signal,
key->m_file_no= tmp.p->m_file_no;
if (DBG_DISK)
ndbout << " found dirty page " << *key << endl;
+ jam();
return 0; // Page in memory
}
}
@@ -395,6 +398,7 @@ Dbtup::disk_page_prealloc(Signal* signal,
* key = req.p->m_key;
if (DBG_DISK)
ndbout << " found transit page " << *key << endl;
+ jam();
return 0;
}
}
@@ -404,6 +408,7 @@ Dbtup::disk_page_prealloc(Signal* signal,
*/
if (!c_page_request_pool.seize(req))
{
+ jam();
err= 1;
//XXX set error code
ndbout_c("no free request");
@@ -468,6 +473,7 @@ Dbtup::disk_page_prealloc(Signal* signal,
err = c_lgman->alloc_log_space(logfile_group_id,
sizeof(Disk_undo::AllocExtent)>>2);
+ jamEntry();
if(unlikely(err))
{
return -err;
@@ -568,6 +574,7 @@ Dbtup::disk_page_prealloc(Signal* signal,
Uint32 newPageBits= alloc.calc_page_free_bits(new_size);
if (newPageBits != (Uint32)pageBits)
{
+ jam();
ddassert(ext.p->m_free_page_count[pageBits] > 0);
ext.p->m_free_page_count[pageBits]--;
ext.p->m_free_page_count[newPageBits]++;
@@ -595,6 +602,7 @@ Dbtup::disk_page_prealloc(Signal* signal,
int flags= Page_cache_client::ALLOC_REQ;
if (pageBits == 0)
{
+ jam();
//XXX empty page -> fast to map
flags |= Page_cache_client::EMPTY_PAGE;
preq.m_callback.m_callbackFunction =
@@ -606,11 +614,13 @@ Dbtup::disk_page_prealloc(Signal* signal,
switch(res)
{
case 0:
+ jam();
break;
case -1:
ndbassert(false);
break;
default:
+ jam();
execute(signal, preq.m_callback, res); // run callback
}
@@ -622,6 +632,7 @@ Dbtup::disk_page_prealloc_dirty_page(Disk_alloc_info & alloc,
PagePtr pagePtr,
Uint32 old_idx, Uint32 sz)
{
+ jam();
ddassert(pagePtr.p->list_index == old_idx);
Uint32 free= pagePtr.p->free_space;
@@ -637,6 +648,7 @@ Dbtup::disk_page_prealloc_dirty_page(Disk_alloc_info & alloc,
if (old_idx != new_idx)
{
+ jam();
LocalDLList<Page> old_list(*pool, alloc.m_dirty_pages[old_idx]);
LocalDLList<Page> new_list(*pool, alloc.m_dirty_pages[new_idx]);
old_list.remove(pagePtr);
@@ -660,6 +672,7 @@ Dbtup::disk_page_prealloc_transit_page(Disk_alloc_info& alloc,
Ptr<Page_request> req,
Uint32 old_idx, Uint32 sz)
{
+ jam();
ddassert(req.p->m_list_index == old_idx);
Uint32 free= req.p->m_estimated_free_space;
@@ -674,6 +687,7 @@ Dbtup::disk_page_prealloc_transit_page(Disk_alloc_info& alloc,
if (old_idx != new_idx)
{
+ jam();
Page_request_list::Head *lists = alloc.m_page_requests;
Local_page_request_list old_list(c_page_request_pool, lists[old_idx]);
Local_page_request_list new_list(c_page_request_pool, lists[new_idx]);
@@ -698,6 +712,7 @@ void
Dbtup::disk_page_prealloc_callback(Signal* signal,
Uint32 page_request, Uint32 page_id)
{
+ jamEntry();
//ndbout_c("disk_alloc_page_callback id: %d", page_id);
Ptr<Page_request> req;
@@ -727,6 +742,7 @@ Dbtup::disk_page_prealloc_initial_callback(Signal*signal,
Uint32 page_request,
Uint32 page_id)
{
+ jamEntry();
//ndbout_c("disk_alloc_page_callback_initial id: %d", page_id);
/**
* 1) lookup page request
@@ -818,6 +834,7 @@ Dbtup::disk_page_prealloc_callback_common(Signal* signal,
if (old_idx != new_idx || free != real_free)
{
+ jam();
Ptr<Extent_info> extentPtr;
c_extent_pool.getPtr(extentPtr, ext);
@@ -825,6 +842,7 @@ Dbtup::disk_page_prealloc_callback_common(Signal* signal,
if (old_idx != new_idx)
{
+ jam();
ddassert(extentPtr.p->m_free_page_count[old_idx]);
extentPtr.p->m_free_page_count[old_idx]--;
extentPtr.p->m_free_page_count[new_idx]++;
@@ -843,9 +861,11 @@ Dbtup::disk_page_prealloc_callback_common(Signal* signal,
void
Dbtup::disk_page_set_dirty(PagePtr pagePtr)
{
+ jam();
Uint32 idx = pagePtr.p->list_index;
if ((idx & 0x8000) == 0)
{
+ jam();
/**
* Already in dirty list
*/
@@ -874,7 +894,6 @@ Dbtup::disk_page_set_dirty(PagePtr pagePtr)
Uint32 used = pagePtr.p->uncommitted_used_space;
if (unlikely(pagePtr.p->m_restart_seq != globalData.m_restart_seq))
{
- jam();
restart_setup_page(alloc, pagePtr);
idx = alloc.calc_page_free_bits(free);
used = 0;
@@ -918,6 +937,7 @@ Dbtup::disk_page_unmap_callback(Uint32 when,
type != File_formats::PT_Tup_varsize_page) ||
f_undo_done == false))
{
+ jam();
return ;
}
@@ -1014,6 +1034,7 @@ Dbtup::disk_page_unmap_callback(Uint32 when,
<< endl;
}
tsman.update_page_free_bits(&key, alloc.calc_page_free_bits(real_free));
+ jamEntry();
}
}
@@ -1022,6 +1043,7 @@ Dbtup::disk_page_alloc(Signal* signal,
Tablerec* tabPtrP, Fragrecord* fragPtrP,
Local_key* key, PagePtr pagePtr, Uint32 gci)
{
+ jam();
Uint32 logfile_group_id= fragPtrP->m_logfile_group_id;
Disk_alloc_info& alloc= fragPtrP->m_disk_alloc_info;
@@ -1050,6 +1072,7 @@ Dbtup::disk_page_free(Signal *signal,
Tablerec *tabPtrP, Fragrecord * fragPtrP,
Local_key* key, PagePtr pagePtr, Uint32 gci)
{
+ jam();
if (DBG_DISK)
ndbout << " disk_page_free " << *key << endl;
@@ -1100,6 +1123,7 @@ Dbtup::disk_page_free(Signal *signal,
if (old_idx != new_idx)
{
+ jam();
ddassert(extentPtr.p->m_free_page_count[old_idx]);
extentPtr.p->m_free_page_count[old_idx]--;
extentPtr.p->m_free_page_count[new_idx]++;
@@ -1126,6 +1150,7 @@ void
Dbtup::disk_page_abort_prealloc(Signal *signal, Fragrecord* fragPtrP,
Local_key* key, Uint32 sz)
{
+ jam();
Page_cache_client::Request req;
req.m_callback.m_callbackData= sz;
req.m_callback.m_callbackFunction =
@@ -1139,9 +1164,13 @@ Dbtup::disk_page_abort_prealloc(Signal *signal, Fragrecord* fragPtrP,
switch(res)
{
case 0:
+ jam();
+ break;
case -1:
+ ndbrequire(false);
break;
default:
+ jam();
Ptr<GlobalPage> gpage;
m_global_page_pool.getPtr(gpage, (Uint32)res);
PagePtr pagePtr;
@@ -1157,7 +1186,7 @@ Dbtup::disk_page_abort_prealloc_callback(Signal* signal,
Uint32 sz, Uint32 page_id)
{
//ndbout_c("disk_alloc_page_callback id: %d", page_id);
-
+ jamEntry();
Ptr<GlobalPage> gpage;
m_global_page_pool.getPtr(gpage, page_id);
@@ -1200,12 +1229,14 @@ Dbtup::disk_page_abort_prealloc_callback_1(Signal* signal,
c_extent_pool.getPtr(extentPtr, ext);
if (old_idx != new_idx)
{
+ jam();
ddassert(extentPtr.p->m_free_page_count[old_idx]);
extentPtr.p->m_free_page_count[old_idx]--;
extentPtr.p->m_free_page_count[new_idx]++;
if (old_idx == page_idx)
{
+ jam();
ArrayPool<Page> *pool= (ArrayPool<Page>*)&m_global_page_pool;
LocalDLList<Page> old_list(*pool, alloc.m_dirty_pages[old_idx]);
LocalDLList<Page> new_list(*pool, alloc.m_dirty_pages[new_idx]);
@@ -1215,6 +1246,7 @@ Dbtup::disk_page_abort_prealloc_callback_1(Signal* signal,
}
else
{
+ jam();
pagePtr.p->list_index = new_idx | 0x8000;
}
}
@@ -1272,6 +1304,7 @@ Uint64
Dbtup::disk_page_undo_alloc(Page* page, const Local_key* key,
Uint32 sz, Uint32 gci, Uint32 logfile_group_id)
{
+ jam();
Logfile_client lgman(this, c_lgman, logfile_group_id);
Disk_undo::Alloc alloc;
@@ -1293,6 +1326,7 @@ Dbtup::disk_page_undo_update(Page* page, const Local_key* key,
const Uint32* src, Uint32 sz,
Uint32 gci, Uint32 logfile_group_id)
{
+ jam();
Logfile_client lgman(this, c_lgman, logfile_group_id);
Disk_undo::Update update;
@@ -1323,6 +1357,7 @@ Dbtup::disk_page_undo_free(Page* page, const Local_key* key,
const Uint32* src, Uint32 sz,
Uint32 gci, Uint32 logfile_group_id)
{
+ jam();
Logfile_client lgman(this, c_lgman, logfile_group_id);
Disk_undo::Free free;
diff --git a/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp b/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp
index a642d704eb9..8c096681b58 100644
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp
@@ -75,9 +75,17 @@ void Dbtup::copyAttrinfo(Operationrec * regOperPtr,
jam();
ndbrequire(copyAttrBufPtr.i < RnoOfAttrBufrec);
ptrAss(copyAttrBufPtr, attrbufrec);
- RbufLen= copyAttrBufPtr.p->attrbuf[ZBUF_DATA_LEN];
- Rnext= copyAttrBufPtr.p->attrbuf[ZBUF_NEXT];
- Rfirst= cfirstfreeAttrbufrec;
+ RbufLen = copyAttrBufPtr.p->attrbuf[ZBUF_DATA_LEN];
+ Rnext = copyAttrBufPtr.p->attrbuf[ZBUF_NEXT];
+ Rfirst = cfirstfreeAttrbufrec;
+ /*
+ * ATTRINFO comes from 2 mutually exclusive places:
+ * 1) TUPKEYREQ (also interpreted part)
+ * 2) STORED_PROCREQ before scan start
+ * Assert here that both have a check for overflow.
+ * The "<" instead of "<=" is intentional.
+ */
+ ndbrequire(RinBufIndex + RbufLen < ZATTR_BUFFER_SIZE);
MEMCOPY_NO_WORDS(&inBuffer[RinBufIndex],
&copyAttrBufPtr.p->attrbuf[0],
RbufLen);
@@ -221,7 +229,7 @@ Dbtup::calculateChecksum(Tuple_header* tuple_ptr,
// includes tupVersion
//printf("%p - ", tuple_ptr);
- for (i= 0; i < rec_size-2; i++) {
+ for (i= 0; i < rec_size-Tuple_header::HeaderSize; i++) {
checksum ^= tuple_header[i];
//printf("%.8x ", tuple_header[i]);
}
@@ -358,21 +366,7 @@ Dbtup::setup_read(KeyReqStruct *req_struct,
dirty= false;
}
- OperationrecPtr prevOpPtr = currOpPtr;
- bool found= false;
- while(true)
- {
- if (savepointId > currOpPtr.p->savepointId) {
- found= true;
- break;
- }
- if (currOpPtr.p->is_first_operation()){
- break;
- }
- prevOpPtr= currOpPtr;
- currOpPtr.i = currOpPtr.p->prevActiveOp;
- c_operation_pool.getPtr(currOpPtr);
- }
+ bool found= find_savepoint(currOpPtr, savepointId);
Uint32 currOp= currOpPtr.p->op_struct.op_type;
@@ -763,7 +757,8 @@ void Dbtup::execTUPKEYREQ(Signal* signal)
}
checkImmediateTriggersAfterInsert(&req_struct,
regOperPtr,
- regTabPtr);
+ regTabPtr,
+ disk_page != RNIL);
set_change_mask_state(regOperPtr, SET_ALL_MASK);
sendTUPKEYCONF(signal, &req_struct, regOperPtr);
return;
@@ -796,7 +791,8 @@ void Dbtup::execTUPKEYREQ(Signal* signal)
}
checkImmediateTriggersAfterUpdate(&req_struct,
regOperPtr,
- regTabPtr);
+ regTabPtr,
+ disk_page != RNIL);
// XXX use terrorCode for now since all methods are void
if (terrorCode != 0)
{
@@ -827,7 +823,8 @@ void Dbtup::execTUPKEYREQ(Signal* signal)
*/
checkImmediateTriggersAfterDelete(&req_struct,
regOperPtr,
- regTabPtr);
+ regTabPtr,
+ disk_page != RNIL);
set_change_mask_state(regOperPtr, DELETE_CHANGES);
sendTUPKEYCONF(signal, &req_struct, regOperPtr);
return;
@@ -1506,32 +1503,22 @@ int Dbtup::handleDeleteReq(Signal* signal,
else
{
regOperPtr->tupVersion= req_struct->m_tuple_ptr->get_tuple_version();
- if(regTabPtr->m_no_of_disk_attributes)
+ }
+
+ if(disk && regOperPtr->m_undo_buffer_space == 0)
+ {
+ regOperPtr->op_struct.m_wait_log_buffer = 1;
+ regOperPtr->op_struct.m_load_diskpage_on_commit = 1;
+ Uint32 sz= regOperPtr->m_undo_buffer_space=
+ (sizeof(Dbtup::Disk_undo::Free) >> 2) +
+ regTabPtr->m_offsets[DD].m_fix_header_size - 1;
+
+ terrorCode= c_lgman->alloc_log_space(regFragPtr->m_logfile_group_id,
+ sz);
+ if(unlikely(terrorCode))
{
- Uint32 sz;
- if(regTabPtr->m_attributes[DD].m_no_of_varsize)
- {
- /**
- * Need to have page in memory to read size
- * to alloc undo space
- */
- abort();
- }
- else
- sz= (sizeof(Dbtup::Disk_undo::Free) >> 2) +
- regTabPtr->m_offsets[DD].m_fix_header_size - 1;
-
- regOperPtr->m_undo_buffer_space= sz;
-
- int res;
- if((res= c_lgman->alloc_log_space(regFragPtr->m_logfile_group_id,
- sz)))
- {
- terrorCode= res;
- regOperPtr->m_undo_buffer_space= 0;
- goto error;
- }
-
+ regOperPtr->m_undo_buffer_space= 0;
+ goto error;
}
}
if (req_struct->attrinfo_len == 0)
@@ -1540,7 +1527,9 @@ int Dbtup::handleDeleteReq(Signal* signal,
}
if (regTabPtr->need_expand(disk))
+ {
prepare_read(req_struct, regTabPtr, disk);
+ }
{
Uint32 RlogSize;
diff --git a/storage/ndb/src/kernel/blocks/dbtup/DbtupIndex.cpp b/storage/ndb/src/kernel/blocks/dbtup/DbtupIndex.cpp
index 0427f1c7612..0a4477db0d0 100644
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupIndex.cpp
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupIndex.cpp
@@ -15,6 +15,7 @@
#define DBTUP_C
#define DBTUP_INDEX_CPP
+#include <Dblqh.hpp>
#include "Dbtup.hpp"
#include <RefConvert.hpp>
#include <ndb_limits.h>
@@ -319,13 +320,25 @@ Dbtup::accReadPk(Uint32 tableId, Uint32 fragId, Uint32 fragPageId, Uint32 pageIn
return ret;
}
+/*
+ * TUX index contains all tuple versions. A scan in TUX has scanned
+ * one of them and asks if it can be returned as scan result. This
+ * depends on trans id, dirty read flag, and savepoint within trans.
+ *
+ * Previously this faked a ZREAD operation and used getPage().
+ * In TUP getPage() is run after ACC locking, but TUX comes here
+ * before ACC access. Instead of modifying getPage() it is more
+ * clear to do the full check here.
+ */
bool
Dbtup::tuxQueryTh(Uint32 fragPtrI,
- Uint32 tupAddr,
+ Uint32 pageId,
+ Uint32 pageIndex,
Uint32 tupVersion,
Uint32 transId1,
Uint32 transId2,
- Uint32 savePointId)
+ bool dirty,
+ Uint32 savepointId)
{
jamEntry();
FragrecordPtr fragPtr;
@@ -334,35 +347,83 @@ Dbtup::tuxQueryTh(Uint32 fragPtrI,
TablerecPtr tablePtr;
tablePtr.i= fragPtr.p->fragTableId;
ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
- // get page
- Uint32 fragPageId= tupAddr >> MAX_TUPLES_BITS;
- Uint32 pageIndex= tupAddr & ((1 << MAX_TUPLES_BITS ) - 1);
- // use temp op rec
- Operationrec tempOp;
+ PagePtr pagePtr;
+ pagePtr.i = pageId;
+ c_page_pool.getPtr(pagePtr);
+
KeyReqStruct req_struct;
- tempOp.m_tuple_location.m_page_no= getRealpid(fragPtr.p, fragPageId);
- tempOp.m_tuple_location.m_page_idx= pageIndex;
- tempOp.savepointId= savePointId;
- tempOp.op_struct.op_type= ZREAD;
- req_struct.frag_page_id= fragPageId;
- req_struct.trans_id1= transId1;
- req_struct.trans_id2= transId2;
- req_struct.dirty_op= 1;
-
- setup_fixed_part(&req_struct, &tempOp, tablePtr.p);
- if (setup_read(&req_struct, &tempOp, fragPtr.p, tablePtr.p, false)) {
- /*
- * We use the normal getPage which will return the tuple to be used
- * for this transaction and savepoint id. If its tuple version
- * equals the requested then we have a visible tuple otherwise not.
- */
+
+ {
+ Operationrec tmpOp;
+ tmpOp.m_tuple_location.m_page_no = pageId;
+ tmpOp.m_tuple_location.m_page_idx = pageIndex;
+ setup_fixed_part(&req_struct, &tmpOp, tablePtr.p);
+ }
+
+ Tuple_header* tuple_ptr = req_struct.m_tuple_ptr;
+
+ OperationrecPtr currOpPtr;
+ currOpPtr.i = tuple_ptr->m_operation_ptr_i;
+ if (currOpPtr.i == RNIL) {
+ jam();
+ // tuple has no operation, any scan can see it
+ return true;
+ }
+ c_operation_pool.getPtr(currOpPtr);
+
+ const bool sameTrans =
+ c_lqh->is_same_trans(currOpPtr.p->userpointer, transId1, transId2);
+
+ bool res = false;
+ OperationrecPtr loopOpPtr = currOpPtr;
+
+ if (!sameTrans) {
jam();
- if (req_struct.m_tuple_ptr->get_tuple_version() == tupVersion) {
+ if (!dirty) {
jam();
- return true;
+ if (currOpPtr.p->nextActiveOp == RNIL) {
+ jam();
+ // last op - TUX makes ACC lock request in same timeslice
+ res = true;
+ }
+ }
+ else {
+ // loop to first op (returns false)
+ find_savepoint(loopOpPtr, 0);
+ const Uint32 op_type = loopOpPtr.p->op_struct.op_type;
+
+ if (op_type != ZINSERT) {
+ jam();
+ // read committed version
+ const Uint32 origVersion = tuple_ptr->get_tuple_version();
+ if (origVersion == tupVersion) {
+ jam();
+ res = true;
+ }
+ }
}
}
- return false;
+ else {
+ jam();
+ // for own trans, ignore dirty flag
+
+ if (find_savepoint(loopOpPtr, savepointId)) {
+ jam();
+ const Uint32 op_type = loopOpPtr.p->op_struct.op_type;
+
+ if (op_type != ZDELETE) {
+ jam();
+ // check if this op has produced the scanned version
+ Uint32 loopVersion = loopOpPtr.p->tupVersion;
+ if (loopVersion == tupVersion) {
+ jam();
+ res = true;
+ }
+ }
+ }
+ }
+
+ return res;
}
// ordered index build
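The rewritten tuxQueryTh() above decides tuple visibility directly instead of faking a ZREAD through getPage(). The sketch below condenses that decision into one standalone function; the boolean flags stand in for the checks the real code performs on the Operationrec chain (is_same_trans, find_savepoint, tuple version comparisons), so treat it as an illustration of the rules, not the actual block code:

#include <cassert>

struct VisibilityInput {
  bool hasOperation;            // tuple_ptr->m_operation_ptr_i != RNIL
  bool sameTrans;               // the scanning transaction owns the operation
  bool dirty;                   // read-committed (dirty) scan
  bool lastOp;                  // operation is last in the active list
  bool committedIsInsert;       // first op in the chain is an insert
  bool committedVersionMatches; // committed version == scanned version
  bool savepointOpFound;        // an op visible at the scan's savepoint exists
  bool savepointOpIsDelete;
  bool savepointVersionMatches;
};

static bool scanVisible(const VisibilityInput& in)
{
  if (!in.hasOperation)
    return true;                        // no pending operation on the tuple
  if (!in.sameTrans) {
    if (!in.dirty)
      return in.lastOp;                 // locking scan: ACC lock comes next
    // dirty (read committed) scan: only the committed row version counts
    return !in.committedIsInsert && in.committedVersionMatches;
  }
  // own transaction: the savepoint decides, the dirty flag is ignored
  return in.savepointOpFound && !in.savepointOpIsDelete &&
         in.savepointVersionMatches;
}

int main()
{
  VisibilityInput in = {};
  assert(scanVisible(in));              // no operation: always visible
  in.hasOperation = true;
  in.dirty = true;
  in.committedIsInsert = true;
  assert(!scanVisible(in));             // uncommitted insert is not visible
  return 0;
}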
diff --git a/storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp b/storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp
index 5e9306909b4..4c5fb7b645f 100644
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp
@@ -54,7 +54,14 @@ Dbtup::execACC_SCANREQ(Signal* signal)
// flags
Uint32 bits = 0;
- if (!AccScanReq::getLcpScanFlag(req->requestInfo))
+
+ if (AccScanReq::getLcpScanFlag(req->requestInfo))
+ {
+ jam();
+ bits |= ScanOp::SCAN_LCP;
+ c_scanOpPool.getPtr(scanPtr, c_lcp_scan_op);
+ }
+ else
{
// seize from pool and link to per-fragment list
LocalDLList<ScanOp> list(c_scanOpPool, frag.m_scanList);
@@ -62,37 +69,26 @@ Dbtup::execACC_SCANREQ(Signal* signal)
jam();
break;
}
-
- if (!AccScanReq::getNoDiskScanFlag(req->requestInfo)
- && tablePtr.p->m_no_of_disk_attributes)
- {
- bits |= ScanOp::SCAN_DD;
- }
-
- bool mm = (bits & ScanOp::SCAN_DD);
- if (tablePtr.p->m_attributes[mm].m_no_of_varsize > 0) {
- bits |= ScanOp::SCAN_VS;
-
- // disk pages have fixed page format
- ndbrequire(! (bits & ScanOp::SCAN_DD));
- }
- if (! AccScanReq::getReadCommittedFlag(req->requestInfo)) {
- if (AccScanReq::getLockMode(req->requestInfo) == 0)
- bits |= ScanOp::SCAN_LOCK_SH;
- else
- bits |= ScanOp::SCAN_LOCK_EX;
- }
- } else {
- jam();
- // LCP scan and disk
+ }
+
+ if (!AccScanReq::getNoDiskScanFlag(req->requestInfo)
+ && tablePtr.p->m_no_of_disk_attributes)
+ {
+ bits |= ScanOp::SCAN_DD;
+ }
+
+ bool mm = (bits & ScanOp::SCAN_DD);
+ if (tablePtr.p->m_attributes[mm].m_no_of_varsize > 0) {
+ bits |= ScanOp::SCAN_VS;
- ndbrequire(frag.m_lcp_scan_op == c_lcp_scan_op);
- c_scanOpPool.getPtr(scanPtr, frag.m_lcp_scan_op);
- ndbrequire(scanPtr.p->m_fragPtrI == fragPtr.i);
- bits |= ScanOp::SCAN_LCP;
- if (tablePtr.p->m_attributes[MM].m_no_of_varsize > 0) {
- bits |= ScanOp::SCAN_VS;
- }
+ // disk pages have fixed page format
+ ndbrequire(! (bits & ScanOp::SCAN_DD));
+ }
+ if (! AccScanReq::getReadCommittedFlag(req->requestInfo)) {
+ if (AccScanReq::getLockMode(req->requestInfo) == 0)
+ bits |= ScanOp::SCAN_LOCK_SH;
+ else
+ bits |= ScanOp::SCAN_LOCK_EX;
}
if (AccScanReq::getNRScanFlag(req->requestInfo))
@@ -112,6 +108,13 @@ Dbtup::execACC_SCANREQ(Signal* signal)
jam();
scanPtr.p->m_endPage = RNIL;
}
+
+ if (AccScanReq::getLcpScanFlag(req->requestInfo))
+ {
+ jam();
+ ndbrequire((bits & ScanOp::SCAN_DD) == 0);
+ ndbrequire((bits & ScanOp::SCAN_LOCK) == 0);
+ }
// set up scan op
new (scanPtr.p) ScanOp();
@@ -618,6 +621,24 @@ Dbtup::scanNext(Signal* signal, ScanOpPtr scanPtr)
if (lcp && lcp_list != RNIL)
goto found_lcp_keep;
+
+ switch(pos.m_get){
+ case ScanPos::Get_next_tuple:
+ case ScanPos::Get_next_tuple_fs:
+ jam();
+ key.m_page_idx += size;
+ // fall through
+ case ScanPos::Get_tuple:
+ case ScanPos::Get_tuple_fs:
+ jam();
+ /**
+ * We need to refetch page after timeslice
+ */
+ pos.m_get = ScanPos::Get_page;
+ break;
+ default:
+ break;
+ }
while (true) {
switch (pos.m_get) {
@@ -1141,16 +1162,17 @@ Dbtup::releaseScanOp(ScanOpPtr& scanPtr)
fragPtr.i = scanPtr.p->m_fragPtrI;
ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
- if(! (scanPtr.p->m_bits & ScanOp::SCAN_LCP))
+ if(scanPtr.p->m_bits & ScanOp::SCAN_LCP)
{
- LocalDLList<ScanOp> list(c_scanOpPool, fragPtr.p->m_scanList);
- list.release(scanPtr);
+ jam();
+ fragPtr.p->m_lcp_scan_op = RNIL;
+ scanPtr.p->m_fragPtrI = RNIL;
}
else
{
- ndbrequire(fragPtr.p->m_lcp_scan_op == scanPtr.i);
- fragPtr.p->m_lcp_scan_op = RNIL;
- scanPtr.p->m_fragPtrI = RNIL;
+ jam();
+ LocalDLList<ScanOp> list(c_scanOpPool, fragPtr.p->m_scanList);
+ list.release(scanPtr);
}
}
@@ -1163,21 +1185,24 @@ Dbtup::execLCP_FRAG_ORD(Signal* signal)
tablePtr.i = req->tableId;
ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
- jam();
- FragrecordPtr fragPtr;
- Uint32 fragId = req->fragmentId;
- fragPtr.i = RNIL;
- getFragmentrec(fragPtr, fragId, tablePtr.p);
- ndbrequire(fragPtr.i != RNIL);
- Fragrecord& frag = *fragPtr.p;
-
- ndbrequire(frag.m_lcp_scan_op == RNIL && c_lcp_scan_op != RNIL);
- frag.m_lcp_scan_op = c_lcp_scan_op;
- ScanOpPtr scanPtr;
- c_scanOpPool.getPtr(scanPtr, frag.m_lcp_scan_op);
- ndbrequire(scanPtr.p->m_fragPtrI == RNIL);
- scanPtr.p->m_fragPtrI = fragPtr.i;
-
- scanFirst(signal, scanPtr);
- scanPtr.p->m_state = ScanOp::First;
+ if (tablePtr.p->m_no_of_disk_attributes)
+ {
+ jam();
+ FragrecordPtr fragPtr;
+ Uint32 fragId = req->fragmentId;
+ fragPtr.i = RNIL;
+ getFragmentrec(fragPtr, fragId, tablePtr.p);
+ ndbrequire(fragPtr.i != RNIL);
+ Fragrecord& frag = *fragPtr.p;
+
+ ndbrequire(frag.m_lcp_scan_op == RNIL && c_lcp_scan_op != RNIL);
+ frag.m_lcp_scan_op = c_lcp_scan_op;
+ ScanOpPtr scanPtr;
+ c_scanOpPool.getPtr(scanPtr, frag.m_lcp_scan_op);
+ ndbrequire(scanPtr.p->m_fragPtrI == RNIL);
+ scanPtr.p->m_fragPtrI = fragPtr.i;
+
+ scanFirst(signal, scanPtr);
+ scanPtr.p->m_state = ScanOp::First;
+ }
}
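The scanNext() hunk above rewinds the scan position to Get_page when a scan resumes after a timeslice, because any cached page pointer may have become stale in the meantime. A small standalone sketch of that rewind, with assumed enum and struct names mirroring ScanPos:

#include <cassert>

enum Get {
  Get_page, Get_tuple, Get_tuple_fs, Get_next_tuple, Get_next_tuple_fs
};

struct ScanPos {
  Get m_get;
  unsigned m_page_idx;
};

// After a timeslice the cached page pointer may be stale, so any state
// that points at a tuple is rewound to Get_page before the scan loop runs.
static void rewind_after_timeslice(ScanPos& pos, unsigned tuple_size)
{
  switch (pos.m_get) {
  case Get_next_tuple:
  case Get_next_tuple_fs:
    pos.m_page_idx += tuple_size;   // step past the tuple already returned
    // fall through
  case Get_tuple:
  case Get_tuple_fs:
    pos.m_get = Get_page;           // force the loop to refetch the page
    break;
  default:
    break;                          // Get_page etc. already refetch
  }
}

int main()
{
  ScanPos pos = { Get_next_tuple, 10 };
  rewind_after_timeslice(pos, 4);
  assert(pos.m_get == Get_page && pos.m_page_idx == 14);
  return 0;
}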
diff --git a/storage/ndb/src/kernel/blocks/dbtup/DbtupStoredProcDef.cpp b/storage/ndb/src/kernel/blocks/dbtup/DbtupStoredProcDef.cpp
index 12d5f8aba38..a1e350853ce 100644
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupStoredProcDef.cpp
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupStoredProcDef.cpp
@@ -106,6 +106,11 @@ void Dbtup::scanProcedure(Signal* signal,
regOperPtr->attrinbufLen = lenAttrInfo;
regOperPtr->currentAttrinbufLen = 0;
regOperPtr->storedProcPtr = storedPtr.i;
+ if (lenAttrInfo >= ZATTR_BUFFER_SIZE) { // yes ">="
+ jam();
+ // send REF and change state to ignore the ATTRINFO to come
+ storedSeizeAttrinbufrecErrorLab(signal, regOperPtr, ZSTORED_TOO_MUCH_ATTRINFO_ERROR);
+ }
}//Dbtup::scanProcedure()
void Dbtup::copyProcedure(Signal* signal,
@@ -146,7 +151,7 @@ bool Dbtup::storedProcedureAttrInfo(Signal* signal,
Uint32 RnoFree = cnoFreeAttrbufrec;
if (ERROR_INSERTED(4004) && !copyProcedure) {
CLEAR_ERROR_INSERT_VALUE;
- storedSeizeAttrinbufrecErrorLab(signal, regOperPtr);
+ storedSeizeAttrinbufrecErrorLab(signal, regOperPtr, ZSTORED_SEIZE_ATTRINBUFREC_ERROR);
return false;
}//if
regOperPtr->currentAttrinbufLen += length;
@@ -162,7 +167,7 @@ bool Dbtup::storedProcedureAttrInfo(Signal* signal,
regAttrPtr.p->attrbuf[ZBUF_NEXT] = RNIL;
} else {
jam();
- storedSeizeAttrinbufrecErrorLab(signal, regOperPtr);
+ storedSeizeAttrinbufrecErrorLab(signal, regOperPtr, ZSTORED_SEIZE_ATTRINBUFREC_ERROR);
return false;
}//if
if (regOperPtr->firstAttrinbufrec == RNIL) {
@@ -190,7 +195,7 @@ bool Dbtup::storedProcedureAttrInfo(Signal* signal,
}//if
if (ERROR_INSERTED(4005) && !copyProcedure) {
CLEAR_ERROR_INSERT_VALUE;
- storedSeizeAttrinbufrecErrorLab(signal, regOperPtr);
+ storedSeizeAttrinbufrecErrorLab(signal, regOperPtr, ZSTORED_SEIZE_ATTRINBUFREC_ERROR);
return false;
}//if
@@ -212,7 +217,8 @@ bool Dbtup::storedProcedureAttrInfo(Signal* signal,
}//Dbtup::storedProcedureAttrInfo()
void Dbtup::storedSeizeAttrinbufrecErrorLab(Signal* signal,
- Operationrec* regOperPtr)
+ Operationrec* regOperPtr,
+ Uint32 errorCode)
{
StoredProcPtr storedPtr;
c_storedProcPool.getPtr(storedPtr, regOperPtr->storedProcPtr);
@@ -224,7 +230,7 @@ void Dbtup::storedSeizeAttrinbufrecErrorLab(Signal* signal,
regOperPtr->m_any_value = 0;
set_trans_state(regOperPtr, TRANS_ERROR_WAIT_STORED_PROCREQ);
signal->theData[0] = regOperPtr->userpointer;
- signal->theData[1] = ZSTORED_SEIZE_ATTRINBUFREC_ERROR;
+ signal->theData[1] = errorCode;
signal->theData[2] = regOperPtr->storedProcPtr;
sendSignal(DBLQH_REF, GSN_STORED_PROCREF, signal, 3, JBB);
}//Dbtup::storedSeizeAttrinbufrecErrorLab()
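storedSeizeAttrinbufrecErrorLab() now takes the error code as a parameter, which lets the new "too much ATTRINFO" check in scanProcedure() and the old out-of-buffers path share one REF routine. A simplified sketch of that error path follows; error code 874 is the one this merge adds to ndberror.c, while the 873 value and the ZATTR_BUFFER_SIZE limit used here are placeholders, not the real constants:

#include <cstdio>

static const unsigned ZATTR_BUFFER_SIZE = 16384;               // assumed limit
static const unsigned ZSTORED_TOO_MUCH_ATTRINFO_ERROR = 874;
static const unsigned ZSTORED_SEIZE_ATTRINBUFREC_ERROR = 873;  // assumed value

static void send_stored_proc_ref(unsigned userPtr, unsigned errorCode)
{
  std::printf("STORED_PROCREF user=%u error=%u\n", userPtr, errorCode);
}

static bool define_scan_procedure(unsigned userPtr, unsigned lenAttrInfo,
                                  unsigned freeAttrBufs)
{
  if (lenAttrInfo >= ZATTR_BUFFER_SIZE) {        // note ">=", as in the fix
    send_stored_proc_ref(userPtr, ZSTORED_TOO_MUCH_ATTRINFO_ERROR);
    return false;
  }
  if (freeAttrBufs == 0) {
    send_stored_proc_ref(userPtr, ZSTORED_SEIZE_ATTRINBUFREC_ERROR);
    return false;
  }
  return true;
}

int main()
{
  define_scan_procedure(1, ZATTR_BUFFER_SIZE, 10);  // too much ATTRINFO
  define_scan_procedure(2, 100, 0);                 // no free attrinfo buffers
  return 0;
}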
diff --git a/storage/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp b/storage/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp
index 09d71a19add..0ae6d0f4ac6 100644
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp
@@ -369,7 +369,8 @@ Dbtup::dropTrigger(Tablerec* table, const DropTrigReq* req, BlockNumber sender)
void
Dbtup::checkImmediateTriggersAfterInsert(KeyReqStruct *req_struct,
Operationrec *regOperPtr,
- Tablerec *regTablePtr)
+ Tablerec *regTablePtr,
+ bool disk)
{
if(refToBlock(req_struct->TC_ref) != DBTC) {
return;
@@ -380,14 +381,16 @@ Dbtup::checkImmediateTriggersAfterInsert(KeyReqStruct *req_struct,
jam();
fireImmediateTriggers(req_struct,
regTablePtr->afterInsertTriggers,
- regOperPtr);
+ regOperPtr,
+ disk);
}
}
void
Dbtup::checkImmediateTriggersAfterUpdate(KeyReqStruct *req_struct,
Operationrec* regOperPtr,
- Tablerec* regTablePtr)
+ Tablerec* regTablePtr,
+ bool disk)
{
if(refToBlock(req_struct->TC_ref) != DBTC) {
return;
@@ -398,21 +401,24 @@ Dbtup::checkImmediateTriggersAfterUpdate(KeyReqStruct *req_struct,
jam();
fireImmediateTriggers(req_struct,
regTablePtr->afterUpdateTriggers,
- regOperPtr);
+ regOperPtr,
+ disk);
}
if ((regOperPtr->op_struct.primary_replica) &&
(!(regTablePtr->constraintUpdateTriggers.isEmpty()))) {
jam();
fireImmediateTriggers(req_struct,
regTablePtr->constraintUpdateTriggers,
- regOperPtr);
+ regOperPtr,
+ disk);
}
}
void
Dbtup::checkImmediateTriggersAfterDelete(KeyReqStruct *req_struct,
Operationrec* regOperPtr,
- Tablerec* regTablePtr)
+ Tablerec* regTablePtr,
+ bool disk)
{
if(refToBlock(req_struct->TC_ref) != DBTC) {
return;
@@ -423,7 +429,8 @@ Dbtup::checkImmediateTriggersAfterDelete(KeyReqStruct *req_struct,
jam();
executeTriggers(req_struct,
regTablePtr->afterDeleteTriggers,
- regOperPtr);
+ regOperPtr,
+ disk);
}
}
@@ -547,7 +554,8 @@ end:
void
Dbtup::fireImmediateTriggers(KeyReqStruct *req_struct,
DLList<TupTriggerData>& triggerList,
- Operationrec* const regOperPtr)
+ Operationrec* const regOperPtr,
+ bool disk)
{
TriggerPtr trigPtr;
triggerList.first(trigPtr);
@@ -558,7 +566,8 @@ Dbtup::fireImmediateTriggers(KeyReqStruct *req_struct,
jam();
executeTrigger(req_struct,
trigPtr.p,
- regOperPtr);
+ regOperPtr,
+ disk);
}//if
triggerList.next(trigPtr);
}//while
@@ -621,7 +630,8 @@ Dbtup::fireDetachedTriggers(KeyReqStruct *req_struct,
void Dbtup::executeTriggers(KeyReqStruct *req_struct,
DLList<TupTriggerData>& triggerList,
- Operationrec* regOperPtr)
+ Operationrec* regOperPtr,
+ bool disk)
{
TriggerPtr trigPtr;
triggerList.first(trigPtr);
@@ -629,7 +639,8 @@ void Dbtup::executeTriggers(KeyReqStruct *req_struct,
jam();
executeTrigger(req_struct,
trigPtr.p,
- regOperPtr);
+ regOperPtr,
+ disk);
triggerList.next(trigPtr);
}
diff --git a/storage/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp b/storage/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp
index e6ee2374056..8cef61038a6 100644
--- a/storage/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp
+++ b/storage/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp
@@ -987,7 +987,8 @@ Dbtux::scanVisible(ScanOpPtr scanPtr, TreeEnt ent)
const ScanOp& scan = *scanPtr.p;
const Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
Uint32 tableFragPtrI = frag.m_tupTableFragPtrI;
- Uint32 tupAddr = getTupAddr(frag, ent);
+ Uint32 pageId = ent.m_tupLoc.getPageId();
+ Uint32 pageOffset = ent.m_tupLoc.getPageOffset();
Uint32 tupVersion = ent.m_tupVersion;
// check for same tuple twice in row
if (scan.m_scanEnt.m_tupLoc == ent.m_tupLoc)
@@ -997,8 +998,9 @@ Dbtux::scanVisible(ScanOpPtr scanPtr, TreeEnt ent)
}
Uint32 transId1 = scan.m_transId1;
Uint32 transId2 = scan.m_transId2;
+ bool dirty = scan.m_readCommitted;
Uint32 savePointId = scan.m_savePointId;
- bool ret = c_tup->tuxQueryTh(tableFragPtrI, tupAddr, tupVersion, transId1, transId2, savePointId);
+ bool ret = c_tup->tuxQueryTh(tableFragPtrI, pageId, pageOffset, tupVersion, transId1, transId2, dirty, savePointId);
jamEntry();
return ret;
}
diff --git a/storage/ndb/src/kernel/blocks/lgman.cpp b/storage/ndb/src/kernel/blocks/lgman.cpp
index 0481f7b399b..53cb1e113e1 100644
--- a/storage/ndb/src/kernel/blocks/lgman.cpp
+++ b/storage/ndb/src/kernel/blocks/lgman.cpp
@@ -547,6 +547,22 @@ Lgman::execCREATE_FILE_REQ(Signal* signal)
break;
}
+ if(ERROR_INSERTED(15000) ||
+ (sizeof(void*) == 4 && req->file_size_hi & 0xFFFFFFFF))
+ {
+ jam();
+ if(signal->getNoOfSections())
+ releaseSections(signal);
+
+ CreateFileImplRef* ref= (CreateFileImplRef*)signal->getDataPtr();
+ ref->senderData = senderData;
+ ref->senderRef = reference();
+ ref->errorCode = CreateFileImplRef::FileSizeTooLarge;
+ sendSignal(senderRef, GSN_CREATE_FILE_REF, signal,
+ CreateFileImplRef::SignalLength, JBB);
+ return;
+ }
+
new (file_ptr.p) Undofile(req, ptr.i);
Local_undofile_list tmp(m_file_pool, ptr.p->m_meta_files);
@@ -902,7 +918,7 @@ Lgman::alloc_logbuffer_memory(Ptr<Logfile_group> ptr, Uint32 bytes)
{
Uint32 ptrI;
Uint32 cnt = pages > 64 ? 64 : pages;
- m_ctx.m_mm.alloc(&ptrI, &cnt, 1);
+ m_ctx.m_mm.alloc_pages(RG_DISK_OPERATIONS, &ptrI, &cnt, 1);
if (cnt)
{
Buffer_idx range;
@@ -1021,7 +1037,7 @@ Lgman::free_logbuffer_memory(Ptr<Logfile_group> ptr)
ndbrequire(map.next(it));
tmp[1] = *it.data;
- m_ctx.m_mm.release(range.m_ptr_i, range.m_idx);
+ m_ctx.m_mm.release_pages(RG_DISK_OPERATIONS, range.m_ptr_i, range.m_idx);
map.next(it);
}
map.release();
diff --git a/storage/ndb/src/kernel/blocks/suma/Suma.cpp b/storage/ndb/src/kernel/blocks/suma/Suma.cpp
index 113b63a19d3..5f0510cf43a 100644
--- a/storage/ndb/src/kernel/blocks/suma/Suma.cpp
+++ b/storage/ndb/src/kernel/blocks/suma/Suma.cpp
@@ -4908,6 +4908,21 @@ Suma::release_gci(Signal* signal, Uint32 buck, Uint32 gci)
if(gci >= head.m_max_gci)
{
jam();
+ if (ERROR_INSERTED(13034))
+ {
+ jam();
+ SET_ERROR_INSERT_VALUE(13035);
+ return;
+ }
+ if (ERROR_INSERTED(13035))
+ {
+ CLEAR_ERROR_INSERT_VALUE;
+ NodeReceiverGroup rg(CMVMI, c_nodes_in_nodegroup_mask);
+ rg.m_nodes.clear(getOwnNodeId());
+ signal->theData[0] = 9999;
+ sendSignal(rg, GSN_NDB_TAMPER, signal, 1, JBA);
+ return;
+ }
head.m_page_pos = 0;
head.m_max_gci = gci;
head.m_last_gci = 0;
@@ -4979,7 +4994,6 @@ Suma::start_resend(Signal* signal, Uint32 buck)
if(min > max)
{
- ndbrequire(pos.m_page_pos <= 2);
ndbrequire(pos.m_page_id == bucket->m_buffer_tail);
m_active_buckets.set(buck);
m_gcp_complete_rep_count ++;
diff --git a/storage/ndb/src/kernel/blocks/suma/SumaInit.cpp b/storage/ndb/src/kernel/blocks/suma/SumaInit.cpp
index bf5c07b5b97..0248833978c 100644
--- a/storage/ndb/src/kernel/blocks/suma/SumaInit.cpp
+++ b/storage/ndb/src/kernel/blocks/suma/SumaInit.cpp
@@ -27,6 +27,8 @@ Suma::Suma(Block_context& ctx) :
Restart(*this),
c_gcp_list(c_gcp_pool)
{
+ BLOCK_CONSTRUCTOR(Suma);
+
// Add received signals
addRecSignal(GSN_READ_CONFIG_REQ, &Suma::execREAD_CONFIG_REQ);
addRecSignal(GSN_STTOR, &Suma::execSTTOR);
diff --git a/storage/ndb/src/kernel/blocks/tsman.cpp b/storage/ndb/src/kernel/blocks/tsman.cpp
index 3a7003d56c8..8e68e118f98 100644
--- a/storage/ndb/src/kernel/blocks/tsman.cpp
+++ b/storage/ndb/src/kernel/blocks/tsman.cpp
@@ -537,6 +537,22 @@ Tsman::execCREATE_FILE_REQ(Signal* signal){
break;
}
+ if(ERROR_INSERTED(16000) ||
+ (sizeof(void*) == 4 && req->file_size_hi & 0xFFFFFFFF))
+ {
+ jam();
+ if(signal->getNoOfSections())
+ releaseSections(signal);
+
+ CreateFileImplRef* ref= (CreateFileImplRef*)signal->getDataPtr();
+ ref->senderData = senderData;
+ ref->senderRef = reference();
+ ref->errorCode = CreateFileImplRef::FileSizeTooLarge;
+ sendSignal(senderRef, GSN_CREATE_FILE_REF, signal,
+ CreateFileImplRef::SignalLength, JBB);
+ return;
+ }
+
new (file_ptr.p) Datafile(req);
Local_datafile_list tmp(m_file_pool, ptr.p->m_meta_files);
tmp.add(file_ptr);
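Lgman::execCREATE_FILE_REQ and Tsman::execCREATE_FILE_REQ gain the same guard: on a 32-bit host (or with error insert 15000/16000) a file whose high size word is non-zero is refused with CreateFileImplRef::FileSizeTooLarge, matching new error 1515 in ndberror.c. The real check uses sizeof(void*) and req->file_size_hi, so a tiny sketch with the pointer size passed in, just so both branches can be exercised:

#include <cassert>

// Returns true when a file of the given size can be handled on a host
// with the given pointer width (in bytes).
static bool file_size_ok(unsigned file_size_hi, unsigned pointer_bytes)
{
  // On a 32-bit host any non-zero high word means the file would exceed
  // the 4G limit that error 1515 now reports.
  if (pointer_bytes == 4 && file_size_hi != 0)
    return false;
  return true;
}

int main()
{
  assert(!file_size_ok(1, 4));   // >= 4G is rejected on a 32-bit host
  assert(file_size_ok(1, 8));    // fine on a 64-bit host
  assert(file_size_ok(0, 4));    // below 4G is always fine
  return 0;
}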
diff --git a/storage/ndb/src/kernel/vm/DLHashTable.hpp b/storage/ndb/src/kernel/vm/DLHashTable.hpp
index df56dcb8846..b9d5f7c597f 100644
--- a/storage/ndb/src/kernel/vm/DLHashTable.hpp
+++ b/storage/ndb/src/kernel/vm/DLHashTable.hpp
@@ -46,8 +46,8 @@ public:
/**
* Seize element from pool - return i
*
- * Note must be either added using <b>add</b> or released
- * using <b>release</b>
+ * Note: the element *must* either be added using <b>add</b> (even before
+ * hash.release) or be released back to the pool
*/
bool seize(Ptr<T> &);
@@ -360,7 +360,14 @@ DLHashTableImpl<P, T, U>::remove(Ptr<T> & ptr)
else
{
const Uint32 hv = ptr.p->hashValue() & mask;
- hashValues[hv] = next;
+ if (hashValues[hv] == ptr.i)
+ {
+ hashValues[hv] = next;
+ }
+ else
+ {
+ // Will add assert in 5.1
+ }
}
if(next != RNIL)
@@ -386,7 +393,14 @@ DLHashTableImpl<P, T, U>::release(Ptr<T> & ptr)
else
{
const Uint32 hv = ptr.p->hashValue() & mask;
- hashValues[hv] = next;
+ if (hashValues[hv] == ptr.i)
+ {
+ hashValues[hv] = next;
+ }
+ else
+ {
+ // Will add assert in 5.1
+ }
}
if(next != RNIL)
diff --git a/storage/ndb/src/kernel/vm/DLHashTable2.hpp b/storage/ndb/src/kernel/vm/DLHashTable2.hpp
index 23ced757d8b..05340317adb 100644
--- a/storage/ndb/src/kernel/vm/DLHashTable2.hpp
+++ b/storage/ndb/src/kernel/vm/DLHashTable2.hpp
@@ -42,8 +42,8 @@ public:
/**
* Seize element from pool - return i
*
- * Note must be either added using <b>add</b> or released
- * using <b>release</b>
+ * Note: the element *must* either be added using <b>add</b> (even before
+ * hash.release) or be released back to the pool
*/
bool seize(Ptr<T> &);
@@ -374,7 +374,14 @@ DLHashTable2<T, U>::remove(Ptr<T> & ptr){
prevP->nextHash = next;
} else {
const Uint32 hv = ptr.p->hashValue() & mask;
- hashValues[hv] = next;
+ if (hashValues[hv] == ptr.i)
+ {
+ hashValues[hv] = next;
+ }
+ else
+ {
+ // Will add assert in 5.1
+ }
}
if(next != RNIL){
@@ -395,7 +402,14 @@ DLHashTable2<T, U>::release(Ptr<T> & ptr){
prevP->nextHash = next;
} else {
const Uint32 hv = ptr.p->hashValue() & mask;
- hashValues[hv] = next;
+ if (hashValues[hv] == ptr.i)
+ {
+ hashValues[hv] = next;
+ }
+ else
+ {
+ // Will add assert in 5.1
+ }
}
if(next != RNIL){
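The remove() and release() hunks in DLHashTable.hpp and DLHashTable2.hpp make the bucket-head update defensive: the head is only overwritten when it actually points at the element being unlinked, and the inconsistent case is left for an assert in 5.1. A standalone sketch of that unlink step over an array-backed chain, with illustrative types instead of the real pool classes:

#include <cassert>

struct Elem { unsigned nextHash; unsigned prevHash; unsigned hashValue; };
static const unsigned RNIL = 0xffffffff;

static void hash_unlink(Elem* pool, unsigned* hashValues, unsigned mask,
                        unsigned i)
{
  Elem& e = pool[i];
  const unsigned prev = e.prevHash;
  const unsigned next = e.nextHash;
  if (prev != RNIL) {
    pool[prev].nextHash = next;
  } else {
    const unsigned hv = e.hashValue & mask;
    if (hashValues[hv] == i)
      hashValues[hv] = next;      // normal case: element was the chain head
    // else: the chain head is inconsistent - 5.1 adds an assert here
  }
  if (next != RNIL)
    pool[next].prevHash = prev;
}

int main()
{
  Elem pool[2] = { {1, RNIL, 5}, {RNIL, 0, 5} };   // chain: 0 -> 1 in bucket 1
  unsigned heads[4] = { RNIL, 0, RNIL, RNIL };
  hash_unlink(pool, heads, 3, 0);
  assert(heads[1] == 1);                           // head moved to element 1
  return 0;
}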
diff --git a/storage/ndb/src/kernel/vm/NdbdSuperPool.cpp b/storage/ndb/src/kernel/vm/NdbdSuperPool.cpp
index 14545e23820..717c354a180 100644
--- a/storage/ndb/src/kernel/vm/NdbdSuperPool.cpp
+++ b/storage/ndb/src/kernel/vm/NdbdSuperPool.cpp
@@ -48,7 +48,7 @@ NdbdSuperPool::NdbdSuperPool(class Ndbd_mem_manager & mm,
{
m_memRoot = m_mm.get_memroot();
- m_shift = Ndbd_mem_manager::log2((1 << (BMW_2LOG + 2)) / pageSize) - 1;
+ m_shift = Ndbd_mem_manager::ndb_log2((1 << (BMW_2LOG + 2)) / pageSize) - 1;
m_add = (1 << m_shift) - 1;
}
diff --git a/storage/ndb/src/kernel/vm/Pool.cpp b/storage/ndb/src/kernel/vm/Pool.cpp
index 29c4aec4ee3..f252a601ac2 100644
--- a/storage/ndb/src/kernel/vm/Pool.cpp
+++ b/storage/ndb/src/kernel/vm/Pool.cpp
@@ -20,7 +20,8 @@
void*
Pool_context::alloc_page(Uint32 type_id, Uint32 *i)
{
- return m_block->m_ctx.m_mm.alloc_page(type_id, i);
+ return m_block->m_ctx.m_mm.alloc_page(type_id, i,
+ Ndbd_mem_manager::NDB_ZONE_LO);
}
void
diff --git a/storage/ndb/src/kernel/vm/ndbd_malloc_impl.cpp b/storage/ndb/src/kernel/vm/ndbd_malloc_impl.cpp
index 70637a362d0..e2100e66baa 100644
--- a/storage/ndb/src/kernel/vm/ndbd_malloc_impl.cpp
+++ b/storage/ndb/src/kernel/vm/ndbd_malloc_impl.cpp
@@ -25,19 +25,22 @@ extern EventLogger g_eventLogger;
extern EventLogger g_eventLogger;
#endif
-#ifdef NDBD_MALLOC_METHOD
-#if NDBD_MALLOC_METHOD == sbrk
-static const char * f_method = "sbrk";
+static int f_method_idx = 0;
+#ifdef NDBD_MALLOC_METHOD_SBRK
+static const char * f_method = "SMsm";
#else
-static const char * f_method = "malloc";
-#endif
-#elif SIZEOF_CHARP == 8
-static const char * f_method = "sbrk";
-#else
-static const char * f_method = "malloc";
+static const char * f_method = "MSms";
#endif
#define MAX_CHUNKS 10
+#define ZONE_LO 0
+#define ZONE_HI 1
+
+/**
+ * POOL_RECORD_BITS == 13 => 32 - 13 = 19 bits for page
+ */
+#define ZONE_LO_BOUND (1u << 19)
+
struct InitChunk
{
Uint32 m_cnt;
@@ -54,28 +57,42 @@ do_malloc(Uint32 pages, InitChunk* chunk)
pages += 1;
void * ptr = 0;
Uint32 sz = pages;
- if (strcmp(f_method, "sbrk") == 0)
+
+retry:
+ char method = f_method[f_method_idx];
+ switch(method){
+ case 0:
+ return false;
+ case 'S':
+ case 's':
{
ptr = 0;
while (ptr == 0)
{
ptr = sbrk(sizeof(Alloc_page) * sz);
+
if (ptr == (void*)-1)
{
+ if (method == 'S')
+ {
+ f_method_idx++;
+ goto retry;
+ }
+
ptr = 0;
sz = 1 + (9 * sz) / 10;
if (pages >= 32 && sz < 32)
{
sz = pages;
- f_method = "malloc";
- g_eventLogger.info("sbrk(%lld) failed, trying malloc",
- (Uint64)(sizeof(Alloc_page) * sz));
- break;
+ f_method_idx++;
+ goto retry;
}
}
}
+ break;
}
- if (strcmp(f_method, "malloc") == 0)
+ case 'M':
+ case 'm':
{
ptr = 0;
while (ptr == 0)
@@ -83,15 +100,26 @@ do_malloc(Uint32 pages, InitChunk* chunk)
ptr = malloc(sizeof(Alloc_page) * sz);
if (ptr == 0)
{
+ if (method == 'M')
+ {
+ f_method_idx++;
+ goto retry;
+ }
+
sz = 1 + (9 * sz) / 10;
if (pages >= 32 && sz < 32)
{
- return false;
+ f_method_idx++;
+ goto retry;
}
}
}
+ break;
}
-
+ default:
+ return false;
+ }
+
chunk->m_cnt = sz;
chunk->m_ptr = (Alloc_page*)ptr;
const UintPtr align = sizeof(Alloc_page) - 1;
@@ -125,7 +153,7 @@ do_malloc(Uint32 pages, InitChunk* chunk)
}
Uint32
-Ndbd_mem_manager::log2(Uint32 input)
+Ndbd_mem_manager::ndb_log2(Uint32 input)
{
input = input | (input >> 8);
input = input | (input >> 4);
@@ -151,6 +179,12 @@ Ndbd_mem_manager::Ndbd_mem_manager()
}
}
+/**
+ * m_min = reserved
+ * m_curr = current
+ * m_max = max alloc, 0 = no limit
+ */
+
void
Ndbd_mem_manager::set_resource_limit(const Resource_limit& rl)
{
@@ -176,6 +210,40 @@ Ndbd_mem_manager::get_resource_limit(Uint32 id, Resource_limit& rl) const
return false;
}
+static
+inline
+void
+check_resource_limits(Resource_limit* rl)
+{
+#ifdef VM_TRACE
+ Uint32 curr = 0;
+ Uint32 res_alloc = 0;
+ Uint32 shared_alloc = 0;
+ Uint32 sumres = 0;
+ for (Uint32 i = 1; i<XX_RL_COUNT; i++)
+ {
+ curr += rl[i].m_curr;
+ sumres += rl[i].m_min;
+ assert(rl[i].m_max == 0 || rl[i].m_curr <= rl[i].m_max);
+ if (rl[i].m_curr > rl[i].m_min)
+ {
+ shared_alloc += rl[i].m_curr - rl[i].m_min;
+ res_alloc += rl[i].m_min;
+ }
+ else
+ {
+ res_alloc += rl[i].m_curr;
+ }
+ }
+ assert(curr == rl[0].m_curr);
+ assert(res_alloc + shared_alloc == curr);
+ assert(res_alloc <= sumres);
+ assert(sumres == res_alloc + rl[0].m_min);
+ assert(rl[0].m_curr <= rl[0].m_max);
+#endif
+}
+
+
bool
Ndbd_mem_manager::init(bool alloc_less_memory)
{
@@ -292,6 +360,8 @@ Ndbd_mem_manager::init(bool alloc_less_memory)
grow(chunks[i].m_start, chunks[i].m_cnt);
}
+ check_resource_limits(m_resource_limit);
+
return true;
}
@@ -321,35 +391,68 @@ Ndbd_mem_manager::grow(Uint32 start, Uint32 cnt)
cnt--; // last page is always marked as empty
}
- if (!m_used_bitmap_pages.get(start_bmp))
- {
- if (start != (start_bmp << BPP_2LOG))
- {
- ndbout_c("ndbd_malloc_impl.cpp:%d:grow(%d, %d) %d!=%d"
- " - Unable to use due to bitmap pages missaligned!!",
- __LINE__, start, cnt, start, (start_bmp << BPP_2LOG));
- g_eventLogger.error("ndbd_malloc_impl.cpp:%d:grow(%d, %d)"
- " - Unable to use due to bitmap pages missaligned!!",
- __LINE__, start, cnt);
- return;
- }
+ for (Uint32 i = 0; i<m_used_bitmap_pages.size(); i++)
+ if (m_used_bitmap_pages[i] == start_bmp)
+ goto found;
+ if (start != (start_bmp << BPP_2LOG))
+ {
+
+ ndbout_c("ndbd_malloc_impl.cpp:%d:grow(%d, %d) %d!=%d not using %uMb"
+ " - Unable to use due to bitmap pages missaligned!!",
+ __LINE__, start, cnt, start, (start_bmp << BPP_2LOG),
+ (cnt >> (20 - 15)));
+ g_eventLogger.error("ndbd_malloc_impl.cpp:%d:grow(%d, %d) not using %uMb"
+ " - Unable to use due to bitmap pages missaligned!!",
+ __LINE__, start, cnt,
+ (cnt >> (20 - 15)));
+
+ dump();
+ return;
+ }
+
#ifdef UNIT_TEST
- ndbout_c("creating bitmap page %d", start_bmp);
+ ndbout_c("creating bitmap page %d", start_bmp);
#endif
-
+
+ {
Alloc_page* bmp = m_base_page + start;
memset(bmp, 0, sizeof(Alloc_page));
- m_used_bitmap_pages.set(start_bmp);
cnt--;
start++;
}
-
+ m_used_bitmap_pages.push_back(start_bmp);
+
+found:
if (cnt)
{
m_resource_limit[0].m_curr += cnt;
m_resource_limit[0].m_max += cnt;
- release(start, cnt);
+ if (start >= ZONE_LO_BOUND)
+ {
+ Uint64 mbytes = ((Uint64(cnt) * 32) + 1023) / 1024;
+ ndbout_c("Adding %uMb to ZONE_HI (%u,%u)", (Uint32)mbytes, start, cnt);
+ release(start, cnt);
+ }
+ else if (start + cnt <= ZONE_LO_BOUND)
+ {
+ Uint64 mbytes = ((Uint64(cnt)*32) + 1023) / 1024;
+ ndbout_c("Adding %uMb to ZONE_LO (%u,%u)", (Uint32)mbytes, start, cnt);
+ release(start, cnt);
+ }
+ else
+ {
+ Uint32 cnt0 = ZONE_LO_BOUND - start;
+ Uint32 cnt1 = start + cnt - ZONE_LO_BOUND;
+ Uint64 mbytes0 = ((Uint64(cnt0)*32) + 1023) / 1024;
+ Uint64 mbytes1 = ((Uint64(cnt1)*32) + 1023) / 1024;
+ ndbout_c("Adding %uMb to ZONE_LO (split %u,%u)", (Uint32)mbytes0,
+ start, cnt0);
+ ndbout_c("Adding %uMb to ZONE_HI (split %u,%u)", (Uint32)mbytes1,
+ ZONE_LO_BOUND, cnt1);
+ release(start, cnt0);
+ release(ZONE_LO_BOUND, cnt1);
+ }
}
}
@@ -362,64 +465,82 @@ Ndbd_mem_manager::release(Uint32 start, Uint32 cnt)
set(start, start+cnt-1);
- release_impl(start, cnt);
+ Uint32 zone = start < ZONE_LO_BOUND ? 0 : 1;
+ release_impl(zone, start, cnt);
}
void
-Ndbd_mem_manager::release_impl(Uint32 start, Uint32 cnt)
+Ndbd_mem_manager::release_impl(Uint32 zone, Uint32 start, Uint32 cnt)
{
assert(start);
Uint32 test = check(start-1, start+cnt);
- if (test & 1)
+ if (start != ZONE_LO_BOUND && test & 1)
{
Free_page_data *fd = get_free_page_data(m_base_page + start - 1,
start - 1);
Uint32 sz = fd->m_size;
Uint32 left = start - sz;
- remove_free_list(left, fd->m_list);
+ remove_free_list(zone, left, fd->m_list);
cnt += sz;
start = left;
}
Uint32 right = start + cnt;
- if (test & 2)
+ if (right != ZONE_LO_BOUND && test & 2)
{
Free_page_data *fd = get_free_page_data(m_base_page+right, right);
Uint32 sz = fd->m_size;
- remove_free_list(right, fd->m_list);
+ remove_free_list(zone, right, fd->m_list);
cnt += sz;
}
- insert_free_list(start, cnt);
+ insert_free_list(zone, start, cnt);
}
void
-Ndbd_mem_manager::alloc(Uint32* ret, Uint32 *pages, Uint32 min)
+Ndbd_mem_manager::alloc(AllocZone zone,
+ Uint32* ret, Uint32 *pages, Uint32 min)
+{
+ if (zone == NDB_ZONE_ANY)
+ {
+ Uint32 save = * pages;
+ alloc_impl(ZONE_HI, ret, pages, min);
+ if (*pages)
+ return;
+ * pages = save;
+ }
+
+ alloc_impl(ZONE_LO, ret, pages, min);
+}
+
+void
+Ndbd_mem_manager::alloc_impl(Uint32 zone,
+ Uint32* ret, Uint32 *pages, Uint32 min)
{
Int32 i;
Uint32 start;
Uint32 cnt = * pages;
- Uint32 list = log2(cnt - 1);
+ Uint32 list = ndb_log2(cnt - 1);
assert(cnt);
assert(list <= 16);
for (i = list; i < 16; i++)
{
- if ((start = m_buddy_lists[i]))
+ if ((start = m_buddy_lists[zone][i]))
{
/* ---------------------------------------------------------------- */
/* PROPER AMOUNT OF PAGES WERE FOUND. NOW SPLIT THE FOUND */
/* AREA AND RETURN THE PART NOT NEEDED. */
/* ---------------------------------------------------------------- */
- Uint32 sz = remove_free_list(start, i);
+ Uint32 sz = remove_free_list(zone, start, i);
Uint32 extra = sz - cnt;
assert(sz >= cnt);
if (extra)
{
- insert_free_list(start + cnt, extra);
+ insert_free_list(zone, start + cnt, extra);
clear_and_set(start, start+cnt-1);
}
else
@@ -427,8 +548,7 @@ Ndbd_mem_manager::alloc(Uint32* ret, Uint32 *pages, Uint32 min)
clear(start, start+cnt-1);
}
* ret = start;
- m_resource_limit[0].m_curr += cnt;
- assert(m_resource_limit[0].m_curr <= m_resource_limit[0].m_max);
+ assert(m_resource_limit[0].m_curr + cnt <= m_resource_limit[0].m_max);
return;
}
}
@@ -438,17 +558,17 @@ Ndbd_mem_manager::alloc(Uint32* ret, Uint32 *pages, Uint32 min)
* search in other lists...
*/
- Int32 min_list = log2(min - 1);
+ Int32 min_list = ndb_log2(min - 1);
assert((Int32)list >= min_list);
for (i = list - 1; i >= min_list; i--)
{
- if ((start = m_buddy_lists[i]))
+ if ((start = m_buddy_lists[zone][i]))
{
- Uint32 sz = remove_free_list(start, i);
+ Uint32 sz = remove_free_list(zone, start, i);
Uint32 extra = sz - cnt;
if (sz > cnt)
{
- insert_free_list(start + cnt, extra);
+ insert_free_list(zone, start + cnt, extra);
sz -= extra;
clear_and_set(start, start+sz-1);
}
@@ -459,8 +579,7 @@ Ndbd_mem_manager::alloc(Uint32* ret, Uint32 *pages, Uint32 min)
* ret = start;
* pages = sz;
- m_resource_limit[0].m_curr += sz;
- assert(m_resource_limit[0].m_curr <= m_resource_limit[0].m_max);
+ assert(m_resource_limit[0].m_curr + sz <= m_resource_limit[0].m_max);
return;
}
}
@@ -468,12 +587,12 @@ Ndbd_mem_manager::alloc(Uint32* ret, Uint32 *pages, Uint32 min)
}
void
-Ndbd_mem_manager::insert_free_list(Uint32 start, Uint32 size)
+Ndbd_mem_manager::insert_free_list(Uint32 zone, Uint32 start, Uint32 size)
{
- Uint32 list = log2(size) - 1;
+ Uint32 list = ndb_log2(size) - 1;
Uint32 last = start + size - 1;
- Uint32 head = m_buddy_lists[list];
+ Uint32 head = m_buddy_lists[zone][list];
Free_page_data* fd_first = get_free_page_data(m_base_page+start,
start);
fd_first->m_list = list;
@@ -495,11 +614,11 @@ Ndbd_mem_manager::insert_free_list(Uint32 start, Uint32 size)
fd->m_prev = start;
}
- m_buddy_lists[list] = start;
+ m_buddy_lists[zone][list] = start;
}
Uint32
-Ndbd_mem_manager::remove_free_list(Uint32 start, Uint32 list)
+Ndbd_mem_manager::remove_free_list(Uint32 zone, Uint32 start, Uint32 list)
{
Free_page_data* fd = get_free_page_data(m_base_page+start, start);
Uint32 size = fd->m_size;
@@ -509,7 +628,7 @@ Ndbd_mem_manager::remove_free_list(Uint32 start, Uint32 list)
if (prev)
{
- assert(m_buddy_lists[list] != start);
+ assert(m_buddy_lists[zone][list] != start);
fd = get_free_page_data(m_base_page+prev, prev);
assert(fd->m_next == start);
assert(fd->m_list == list);
@@ -517,8 +636,8 @@ Ndbd_mem_manager::remove_free_list(Uint32 start, Uint32 list)
}
else
{
- assert(m_buddy_lists[list] == start);
- m_buddy_lists[list] = next;
+ assert(m_buddy_lists[zone][list] == start);
+ m_buddy_lists[zone][list] = next;
}
if (next)
@@ -535,42 +654,62 @@ Ndbd_mem_manager::remove_free_list(Uint32 start, Uint32 list)
void
Ndbd_mem_manager::dump() const
{
- for(Uint32 i = 0; i<16; i++)
+ for (Uint32 zone = 0; zone < 2; zone ++)
{
- printf(" list: %d - ", i);
- Uint32 head = m_buddy_lists[i];
- while(head)
+ for (Uint32 i = 0; i<16; i++)
{
- Free_page_data* fd = get_free_page_data(m_base_page+head, head);
- printf("[ i: %d prev %d next %d list %d size %d ] ",
- head, fd->m_prev, fd->m_next, fd->m_list, fd->m_size);
- head = fd->m_next;
+ printf(" list: %d - ", i);
+ Uint32 head = m_buddy_lists[zone][i];
+ while(head)
+ {
+ Free_page_data* fd = get_free_page_data(m_base_page+head, head);
+ printf("[ i: %d prev %d next %d list %d size %d ] ",
+ head, fd->m_prev, fd->m_next, fd->m_list, fd->m_size);
+ head = fd->m_next;
+ }
+ printf("EOL\n");
+ }
+
+ for (Uint32 i = 0; i<XX_RL_COUNT; i++)
+ {
+ printf("ri: %d min: %d curr: %d max: %d\n",
+ i,
+ m_resource_limit[i].m_min,
+ m_resource_limit[i].m_curr,
+ m_resource_limit[i].m_max);
}
- printf("EOL\n");
}
}
void*
-Ndbd_mem_manager::alloc_page(Uint32 type, Uint32* i)
+Ndbd_mem_manager::alloc_page(Uint32 type, Uint32* i, AllocZone zone)
{
Uint32 idx = type & RG_MASK;
assert(idx && idx < XX_RL_COUNT);
Resource_limit tot = m_resource_limit[0];
Resource_limit rl = m_resource_limit[idx];
- Uint32 add = (rl.m_curr < rl.m_min) ? 0 : 1; // Over min ?
+ Uint32 cnt = 1;
+ Uint32 res0 = (rl.m_curr < rl.m_min) ? 1 : 0;
Uint32 limit = (rl.m_max == 0 || rl.m_curr < rl.m_max) ? 0 : 1; // Over limit
Uint32 free = (tot.m_min + tot.m_curr < tot.m_max) ? 1 : 0; // Has free
- if (likely(add == 0 || (limit == 0 && free == 1)))
+ assert(tot.m_min >= res0);
+
+ if (likely(res0 == 1 || (limit == 0 && free == 1)))
{
- Uint32 cnt = 1;
- alloc(i, &cnt, 1);
- assert(cnt);
- m_resource_limit[0].m_curr = tot.m_curr + add;
- m_resource_limit[idx].m_curr = rl.m_curr + 1;
- return m_base_page + *i;
+ alloc(zone, i, &cnt, 1);
+ if (likely(cnt))
+ {
+ m_resource_limit[0].m_curr = tot.m_curr + cnt;
+ m_resource_limit[0].m_min = tot.m_min - res0;
+ m_resource_limit[idx].m_curr = rl.m_curr + cnt;
+
+ check_resource_limits(m_resource_limit);
+ return m_base_page + *i;
+ }
}
+
return 0;
}
@@ -582,10 +721,102 @@ Ndbd_mem_manager::release_page(Uint32 type, Uint32 i)
Resource_limit tot = m_resource_limit[0];
Resource_limit rl = m_resource_limit[idx];
- Uint32 sub = (rl.m_curr < rl.m_min) ? 0 : 1; // Over min ?
+ Uint32 sub = (rl.m_curr <= rl.m_min) ? 1 : 0; // Over min ?
release(i, 1);
- m_resource_limit[0].m_curr = tot.m_curr - sub;
+ m_resource_limit[0].m_curr = tot.m_curr - 1;
+ m_resource_limit[0].m_min = tot.m_min + sub;
m_resource_limit[idx].m_curr = rl.m_curr - 1;
+
+ check_resource_limits(m_resource_limit);
+}
+
+void
+Ndbd_mem_manager::alloc_pages(Uint32 type, Uint32* i, Uint32 *cnt, Uint32 min)
+{
+ Uint32 idx = type & RG_MASK;
+ assert(idx && idx < XX_RL_COUNT);
+ Resource_limit tot = m_resource_limit[0];
+ Resource_limit rl = m_resource_limit[idx];
+
+ Uint32 req = *cnt;
+
+ Uint32 max = rl.m_max - rl.m_curr;
+ Uint32 res0 = rl.m_min - rl.m_curr;
+ Uint32 free_shared = tot.m_max - (tot.m_min + tot.m_curr);
+
+ Uint32 res1;
+ if (rl.m_curr + req <= rl.m_min)
+ {
+ // all is reserved...
+ res0 = req;
+ res1 = 0;
+ }
+ else
+ {
+ req = rl.m_max ? max : req;
+ res0 = (rl.m_curr > rl.m_min) ? 0 : res0;
+ res1 = req - res0;
+
+ if (unlikely(res1 > free_shared))
+ {
+ res1 = free_shared;
+ req = res0 + res1;
+ }
+ }
+
+ // req = pages to alloc
+ // res0 = portion that is reserved
+ // res1 = part over the reservation (taken from shared memory)
+ assert (res0 + res1 == req);
+ assert (tot.m_min >= res0);
+
+ if (likely(req))
+ {
+ // Multi-page (high order) allocations can always use any zone
+ alloc(NDB_ZONE_ANY, i, &req, 1);
+ * cnt = req;
+ if (unlikely(req < res0)) // Got less than what was reserved :-(
+ {
+ res0 = req;
+ }
+ assert(tot.m_min >= res0);
+ assert(tot.m_curr + req <= tot.m_max);
+
+ m_resource_limit[0].m_curr = tot.m_curr + req;
+ m_resource_limit[0].m_min = tot.m_min - res0;
+ m_resource_limit[idx].m_curr = rl.m_curr + req;
+ check_resource_limits(m_resource_limit);
+ return ;
+ }
+ * cnt = req;
+ return;
+}
+
+void
+Ndbd_mem_manager::release_pages(Uint32 type, Uint32 i, Uint32 cnt)
+{
+ Uint32 idx = type & RG_MASK;
+ assert(idx && idx < XX_RL_COUNT);
+ Resource_limit tot = m_resource_limit[0];
+ Resource_limit rl = m_resource_limit[idx];
+
+ release(i, cnt);
+
+ Uint32 currnew = rl.m_curr - cnt;
+ if (rl.m_curr > rl.m_min)
+ {
+ if (currnew < rl.m_min)
+ {
+ m_resource_limit[0].m_min = tot.m_min + (rl.m_min - currnew);
+ }
+ }
+ else
+ {
+ m_resource_limit[0].m_min = tot.m_min + cnt;
+ }
+ m_resource_limit[0].m_curr = tot.m_curr - cnt;
+ m_resource_limit[idx].m_curr = currnew;
+ check_resource_limits(m_resource_limit);
}
#ifdef UNIT_TEST
@@ -781,3 +1012,4 @@ main(int argc, char** argv)
template class Vector<Chunk>;
#endif
+template class Vector<Uint32>;
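grow() now feeds each added chunk into one of two buddy zones: pages with an id below ZONE_LO_BOUND (1 << 19, i.e. addressable with 19 bits) go to ZONE_LO, the rest to ZONE_HI, and a chunk that straddles the boundary is split. A standalone sketch of just that splitting decision, with the real release() replaced by a print, assuming 32 KB pages as the (cnt >> (20 - 15)) conversions in grow() imply:

#include <cstdio>

static const unsigned ZONE_LO_BOUND = 1u << 19;  // page ids that fit in 19 bits

static void release_to_zone(unsigned start, unsigned cnt, const char* zone)
{
  // 32 KB pages, so cnt pages == cnt * 32 KB, rounded up to whole MB
  unsigned mbytes = (unsigned)(((unsigned long long)cnt * 32 + 1023) / 1024);
  std::printf("Adding %uMb to %s (%u,%u)\n", mbytes, zone, start, cnt);
}

static void add_range(unsigned start, unsigned cnt)
{
  if (start >= ZONE_LO_BOUND) {
    release_to_zone(start, cnt, "ZONE_HI");
  } else if (start + cnt <= ZONE_LO_BOUND) {
    release_to_zone(start, cnt, "ZONE_LO");
  } else {
    // the range straddles the boundary: split it so low page ids stay
    // available for pools limited to 19-bit page numbers
    unsigned cnt0 = ZONE_LO_BOUND - start;
    unsigned cnt1 = start + cnt - ZONE_LO_BOUND;
    release_to_zone(start, cnt0, "ZONE_LO");
    release_to_zone(ZONE_LO_BOUND, cnt1, "ZONE_HI");
  }
}

int main()
{
  add_range(ZONE_LO_BOUND - 100, 300);  // prints a ZONE_LO and a ZONE_HI part
  return 0;
}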
diff --git a/storage/ndb/src/kernel/vm/ndbd_malloc_impl.hpp b/storage/ndb/src/kernel/vm/ndbd_malloc_impl.hpp
index d87927d9c45..78e41f1cabd 100644
--- a/storage/ndb/src/kernel/vm/ndbd_malloc_impl.hpp
+++ b/storage/ndb/src/kernel/vm/ndbd_malloc_impl.hpp
@@ -20,6 +20,7 @@
#include <Bitmask.hpp>
#include <assert.h>
#include "Pool.hpp"
+#include <Vector.hpp>
/**
* 13 -> 8192 words -> 32768 bytes
@@ -59,46 +60,53 @@ public:
bool init(bool allow_alloc_less_than_requested = true);
void* get_memroot() const { return (void*)m_base_page;}
- void alloc(Uint32* ret, Uint32 *pages, Uint32 min_requested);
- void release(Uint32 start, Uint32 cnt);
-
void dump() const ;
- void* alloc_page(Uint32 type, Uint32* i);
+ enum AllocZone
+ {
+ NDB_ZONE_LO = 0, // Only allocate with page_id < (1 << 13)
+ NDB_ZONE_ANY = 1 // Allocate with any page_id
+ };
+
+ void* alloc_page(Uint32 type, Uint32* i, enum AllocZone);
void release_page(Uint32 type, Uint32 i);
- void* alloc_pages(Uint32 type, Uint32* i, Uint32 *cnt, Uint32 min = 1);
- void release_pages(Uint32 type, Uint32 i, void*p, Uint32 cnt);
+ void alloc_pages(Uint32 type, Uint32* i, Uint32 *cnt, Uint32 min = 1);
+ void release_pages(Uint32 type, Uint32 i, Uint32 cnt);
/**
* Compute 2log of size
* @note size = 0 -> 0
* @note size > 65536 -> 16
*/
- static Uint32 log2(Uint32 size);
+ static Uint32 ndb_log2(Uint32 size);
private:
void grow(Uint32 start, Uint32 cnt);
-#define XX_RL_COUNT 3
+#define XX_RL_COUNT 4
/**
* Return pointer to free page data on page
*/
static Free_page_data* get_free_page_data(Alloc_page*, Uint32 idx);
- Bitmask<1> m_used_bitmap_pages;
+ Vector<Uint32> m_used_bitmap_pages;
- Uint32 m_buddy_lists[16];
+ Uint32 m_buddy_lists[2][16];
Resource_limit m_resource_limit[XX_RL_COUNT]; // RG_COUNT in record_types.hpp
Alloc_page * m_base_page;
- void release_impl(Uint32 start, Uint32 cnt);
- void insert_free_list(Uint32 start, Uint32 cnt);
- Uint32 remove_free_list(Uint32 start, Uint32 list);
+ void release_impl(Uint32 zone, Uint32 start, Uint32 cnt);
+ void insert_free_list(Uint32 zone, Uint32 start, Uint32 cnt);
+ Uint32 remove_free_list(Uint32 zone, Uint32 start, Uint32 list);
void set(Uint32 first, Uint32 last);
void clear(Uint32 first, Uint32 last);
void clear_and_set(Uint32 first, Uint32 last);
Uint32 check(Uint32 first, Uint32 last);
+
+ void alloc(AllocZone, Uint32* ret, Uint32 *pages, Uint32 min_requested);
+ void alloc_impl(Uint32 zone, Uint32* ret, Uint32 *pages, Uint32 min);
+ void release(Uint32 start, Uint32 cnt);
};
inline
diff --git a/storage/ndb/src/kernel/vm/pc.hpp b/storage/ndb/src/kernel/vm/pc.hpp
index 194d5534646..cf03f676ae1 100644
--- a/storage/ndb/src/kernel/vm/pc.hpp
+++ b/storage/ndb/src/kernel/vm/pc.hpp
@@ -49,7 +49,7 @@
theEmulatedJamBlockNumber = number(); \
Uint32 tEmulatedJamIndex = theEmulatedJamIndex; \
*(Uint32*)(theEmulatedJam + tEmulatedJamIndex) = \
- ((theEmulatedJamBlockNumber << 20) | line); \
+ ((theEmulatedJamBlockNumber << 20) | (line)); \
theEmulatedJamIndex = (tEmulatedJamIndex + 4) & JAM_MASK; }
#else
@@ -72,7 +72,7 @@
theEmulatedJamBlockNumber = number(); \
Uint32 tEmulatedJamIndex = theEmulatedJamIndex; \
*(Uint32*)((UintPtr)theEmulatedJam + (Uint32)tEmulatedJamIndex) = \
- ((theEmulatedJamBlockNumber << 20) | line); \
+ ((theEmulatedJamBlockNumber << 20) | (line)); \
theEmulatedJamIndex = (tEmulatedJamIndex + 4) & JAM_MASK; }
#endif
@@ -231,6 +231,6 @@
#define MEMCOPY_PAGE(to, from, page_size_in_bytes) \
memcpy((void*)(to), (void*)(from), (size_t)(page_size_in_bytes));
#define MEMCOPY_NO_WORDS(to, from, no_of_words) \
- memcpy((to), (void*)(from), (size_t)(no_of_words << 2));
+ memcpy((to), (void*)(from), (size_t)((no_of_words) << 2));
#endif
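The pc.hpp hunks only add parentheses around the line and no_of_words macro arguments, but that matters whenever an argument is an expression whose operators bind more loosely than | or <<. A minimal illustration with hypothetical jam-like macros, not the real EMULATED_JAM machinery:

#include <cassert>

static const unsigned blockNo = 5;

#define JAM_NO_PARENS(line)  ((blockNo << 20) | line)
#define JAM_PARENS(line)     ((blockNo << 20) | (line))

int main()
{
  bool flag = true;
  // Without the inner parentheses the ?: swallows the whole expression:
  // ((blockNo << 20) | flag) ? 10u : 20u  ==  10
  assert(JAM_NO_PARENS(flag ? 10u : 20u) == 10u);
  // With them, the chosen value is or-ed into the jam word as intended:
  // (blockNo << 20) | 10u
  assert(JAM_PARENS(flag ? 10u : 20u) == ((blockNo << 20) | 10u));
  return 0;
}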
diff --git a/storage/ndb/src/mgmapi/ndb_logevent.cpp b/storage/ndb/src/mgmapi/ndb_logevent.cpp
index ed72db297ab..fbf026fd79d 100644
--- a/storage/ndb/src/mgmapi/ndb_logevent.cpp
+++ b/storage/ndb/src/mgmapi/ndb_logevent.cpp
@@ -256,7 +256,7 @@ struct Ndb_logevent_body_row ndb_logevent_body[]= {
ROW( ReceiveBytesStatistic, "mean_received_bytes", 2, mean_received_bytes),
ROW( MemoryUsage, "gth", 1, gth),
- ROW( MemoryUsage, "page_size_kb", 2, page_size_kb),
+ ROW( MemoryUsage, "page_size_bytes", 2, page_size_bytes),
ROW( MemoryUsage, "pages_used", 3, pages_used),
ROW( MemoryUsage, "pages_total", 4, pages_total),
ROW( MemoryUsage, "block", 5, block),
diff --git a/storage/ndb/src/mgmclient/CommandInterpreter.cpp b/storage/ndb/src/mgmclient/CommandInterpreter.cpp
index 9e8910c9649..7057525efc7 100644
--- a/storage/ndb/src/mgmclient/CommandInterpreter.cpp
+++ b/storage/ndb/src/mgmclient/CommandInterpreter.cpp
@@ -859,10 +859,14 @@ event_thread_run(void* p)
{
do_event_thread= 1;
do {
- if (ndb_logevent_get_next(log_handle, &log_event, 2000) <= 0)
- continue;
- Guard g(printmutex);
- printLogEvent(&log_event);
+ int res= ndb_logevent_get_next(log_handle, &log_event, 2000);
+ if (res > 0)
+ {
+ Guard g(printmutex);
+ printLogEvent(&log_event);
+ }
+ else if (res < 0)
+ break;
} while(do_event_thread);
ndb_mgm_destroy_logevent_handle(&log_handle);
}
@@ -2663,8 +2667,9 @@ CommandInterpreter::executeStartBackup(char* parameters, bool interactive)
{
int count = 0;
int retry = 0;
+ int res;
do {
- if (ndb_logevent_get_next(log_handle, &log_event, 60000) > 0)
+ if ((res= ndb_logevent_get_next(log_handle, &log_event, 60000)) > 0)
{
int print = 0;
switch (log_event.type) {
@@ -2694,7 +2699,7 @@ CommandInterpreter::executeStartBackup(char* parameters, bool interactive)
{
retry++;
}
- } while(count < 2 && retry < 3);
+ } while(res >= 0 && count < 2 && retry < 3);
if (retry >= 3)
ndbout << "get backup event failed for " << retry << " times" << endl;
diff --git a/storage/ndb/src/mgmsrv/ConfigInfo.cpp b/storage/ndb/src/mgmsrv/ConfigInfo.cpp
index 9cbb7d93ceb..6e560ff2701 100644
--- a/storage/ndb/src/mgmsrv/ConfigInfo.cpp
+++ b/storage/ndb/src/mgmsrv/ConfigInfo.cpp
@@ -397,7 +397,7 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
ConfigInfo::CI_INT,
MANDATORY,
"1",
- STR_VALUE(MAX_NODES) },
+ STR_VALUE(MAX_DATA_NODE_ID) },
{
CFG_NODE_ID,
@@ -409,7 +409,7 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
ConfigInfo::CI_INT,
MANDATORY,
"1",
- STR_VALUE(MAX_NODES) },
+ STR_VALUE(MAX_DATA_NODE_ID) },
{
KEY_INTERNAL,
@@ -1404,7 +1404,7 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
ConfigInfo::CI_INT,
MANDATORY,
"1",
- STR_VALUE(MAX_NODES) },
+ STR_VALUE(MAX_NODES_ID) },
{
CFG_NODE_ID,
@@ -1416,7 +1416,7 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
ConfigInfo::CI_INT,
MANDATORY,
"1",
- STR_VALUE(MAX_NODES) },
+ STR_VALUE(MAX_NODES_ID) },
{
KEY_INTERNAL,
@@ -1547,7 +1547,7 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
ConfigInfo::CI_INT,
MANDATORY,
"1",
- STR_VALUE(MAX_NODES) },
+ STR_VALUE(MAX_NODES_ID) },
{
CFG_NODE_ID,
@@ -1559,7 +1559,7 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
ConfigInfo::CI_INT,
MANDATORY,
"1",
- STR_VALUE(MAX_NODES) },
+ STR_VALUE(MAX_NODES_ID) },
{
CFG_LOG_DESTINATION,
diff --git a/storage/ndb/src/ndbapi/Ndb.cpp b/storage/ndb/src/ndbapi/Ndb.cpp
index 15647861eef..e6a1c2cfcfd 100644
--- a/storage/ndb/src/ndbapi/Ndb.cpp
+++ b/storage/ndb/src/ndbapi/Ndb.cpp
@@ -942,6 +942,7 @@ Parameters: aTableName (IN) : The table name.
step (IN) : Specifies the step between the
autoincrement values.
start (IN) : Start value for first value
+Returns: 0 if successful, -1 if an error was encountered
Remark: Returns a new autoincrement value to the application.
The autoincrement values can be increased by steps
(default 1) and a number of values can be prefetched
@@ -1072,9 +1073,18 @@ Ndb::getTupleIdFromNdb(const NdbTableImpl* table,
DBUG_RETURN(0);
}
+/****************************************************************************
+int readAutoIncrementValue( const char* aTableName,
+ Uint64 & autoValue);
+
+Parameters: aTableName (IN) : The table name.
+ autoValue (OUT) : The current autoincrement value
+Returns: 0 if successful, -1 if an error was encountered
+Remark: Returns the current autoincrement value to the application.
+****************************************************************************/
int
Ndb::readAutoIncrementValue(const char* aTableName,
- Uint64 & tupleId)
+ Uint64 & autoValue)
{
DBUG_ENTER("Ndb::readAutoIncrementValue");
ASSERT_NOT_MYSQLD;
@@ -1088,15 +1098,15 @@ Ndb::readAutoIncrementValue(const char* aTableName,
}
const NdbTableImpl* table = info->m_table_impl;
TupleIdRange & range = info->m_tuple_id_range;
- if (readTupleIdFromNdb(table, range, tupleId) == -1)
+ if (readTupleIdFromNdb(table, range, autoValue) == -1)
DBUG_RETURN(-1);
- DBUG_PRINT("info", ("value %lu", (ulong)tupleId));
+ DBUG_PRINT("info", ("value %lu", (ulong)autoValue));
DBUG_RETURN(0);
}
int
Ndb::readAutoIncrementValue(const NdbDictionary::Table * aTable,
- Uint64 & tupleId)
+ Uint64 & autoValue)
{
DBUG_ENTER("Ndb::readAutoIncrementValue");
ASSERT_NOT_MYSQLD;
@@ -1111,23 +1121,23 @@ Ndb::readAutoIncrementValue(const NdbDictionary::Table * aTable,
DBUG_RETURN(-1);
}
TupleIdRange & range = info->m_tuple_id_range;
- if (readTupleIdFromNdb(table, range, tupleId) == -1)
+ if (readTupleIdFromNdb(table, range, autoValue) == -1)
DBUG_RETURN(-1);
- DBUG_PRINT("info", ("value %lu", (ulong)tupleId));
+ DBUG_PRINT("info", ("value %lu", (ulong)autoValue));
DBUG_RETURN(0);
}
int
Ndb::readAutoIncrementValue(const NdbDictionary::Table * aTable,
- TupleIdRange & range, Uint64 & tupleId)
+ TupleIdRange & range, Uint64 & autoValue)
{
DBUG_ENTER("Ndb::readAutoIncrementValue");
assert(aTable != 0);
const NdbTableImpl* table = & NdbTableImpl::getImpl(*aTable);
- if (readTupleIdFromNdb(table, range, tupleId) == -1)
+ if (readTupleIdFromNdb(table, range, autoValue) == -1)
DBUG_RETURN(-1);
- DBUG_PRINT("info", ("value %lu", (ulong)tupleId));
+ DBUG_PRINT("info", ("value %lu", (ulong)autoValue));
DBUG_RETURN(0);
}
@@ -1155,9 +1165,20 @@ Ndb::readTupleIdFromNdb(const NdbTableImpl* table,
DBUG_RETURN(0);
}
+/****************************************************************************
+int setAutoIncrementValue( const char* aTableName,
+ Uint64 autoValue,
+ bool modify);
+
+Parameters: aTableName (IN) : The table name.
+ autoValue (IN) : The new autoincrement value
+ modify (IN) : Modify existing value (not initialization)
+Returns: 0 if successful, -1 if an error was encountered
+Remark: Sets a new autoincrement value for the application.
+****************************************************************************/
int
Ndb::setAutoIncrementValue(const char* aTableName,
- Uint64 tupleId, bool increase)
+ Uint64 autoValue, bool modify)
{
DBUG_ENTER("Ndb::setAutoIncrementValue");
ASSERT_NOT_MYSQLD;
@@ -1171,14 +1192,14 @@ Ndb::setAutoIncrementValue(const char* aTableName,
}
const NdbTableImpl* table = info->m_table_impl;
TupleIdRange & range = info->m_tuple_id_range;
- if (setTupleIdInNdb(table, range, tupleId, increase) == -1)
+ if (setTupleIdInNdb(table, range, autoValue, modify) == -1)
DBUG_RETURN(-1);
DBUG_RETURN(0);
}
int
Ndb::setAutoIncrementValue(const NdbDictionary::Table * aTable,
- Uint64 tupleId, bool increase)
+ Uint64 autoValue, bool modify)
{
DBUG_ENTER("Ndb::setAutoIncrementValue");
ASSERT_NOT_MYSQLD;
@@ -1193,52 +1214,55 @@ Ndb::setAutoIncrementValue(const NdbDictionary::Table * aTable,
DBUG_RETURN(-1);
}
TupleIdRange & range = info->m_tuple_id_range;
- if (setTupleIdInNdb(table, range, tupleId, increase) == -1)
+ if (setTupleIdInNdb(table, range, autoValue, modify) == -1)
DBUG_RETURN(-1);
DBUG_RETURN(0);
}
int
Ndb::setAutoIncrementValue(const NdbDictionary::Table * aTable,
- TupleIdRange & range, Uint64 tupleId,
- bool increase)
+ TupleIdRange & range, Uint64 autoValue,
+ bool modify)
{
DBUG_ENTER("Ndb::setAutoIncrementValue");
assert(aTable != 0);
const NdbTableImpl* table = & NdbTableImpl::getImpl(*aTable);
- if (setTupleIdInNdb(table, range, tupleId, increase) == -1)
+ if (setTupleIdInNdb(table, range, autoValue, modify) == -1)
DBUG_RETURN(-1);
DBUG_RETURN(0);
}
int
Ndb::setTupleIdInNdb(const NdbTableImpl* table,
- TupleIdRange & range, Uint64 tupleId, bool increase)
+ TupleIdRange & range, Uint64 tupleId, bool modify)
{
DBUG_ENTER("Ndb::setTupleIdInNdb");
- if (increase)
+ if (modify)
{
- if (range.m_first_tuple_id != range.m_last_tuple_id)
+ if (checkTupleIdInNdb(range, tupleId))
{
- assert(range.m_first_tuple_id < range.m_last_tuple_id);
- if (tupleId <= range.m_first_tuple_id + 1)
- DBUG_RETURN(0);
- if (tupleId <= range.m_last_tuple_id)
+ if (range.m_first_tuple_id != range.m_last_tuple_id)
{
- range.m_first_tuple_id = tupleId - 1;
- DBUG_PRINT("info",
- ("Setting next auto increment cached value to %lu",
- (ulong)tupleId));
- DBUG_RETURN(0);
+ assert(range.m_first_tuple_id < range.m_last_tuple_id);
+ if (tupleId <= range.m_first_tuple_id + 1)
+ DBUG_RETURN(0);
+ if (tupleId <= range.m_last_tuple_id)
+ {
+ range.m_first_tuple_id = tupleId - 1;
+ DBUG_PRINT("info",
+ ("Setting next auto increment cached value to %lu",
+ (ulong)tupleId));
+ DBUG_RETURN(0);
+ }
}
+ /*
+ * if tupleId <= NEXTID, do nothing. otherwise update NEXTID to
+ * tupleId and set cached range to first = last = tupleId - 1.
+ */
+ if (opTupleIdOnNdb(table, range, tupleId, 2) == -1)
+ DBUG_RETURN(-1);
}
- /*
- * if tupleId <= NEXTID, do nothing. otherwise update NEXTID to
- * tupleId and set cached range to first = last = tupleId - 1.
- */
- if (opTupleIdOnNdb(table, range, tupleId, 2) == -1)
- DBUG_RETURN(-1);
}
else
{
@@ -1277,6 +1301,39 @@ int Ndb::initAutoIncrement()
return 0;
}
+bool
+Ndb::checkUpdateAutoIncrementValue(TupleIdRange & range, Uint64 autoValue)
+{
+ return(checkTupleIdInNdb(range, autoValue) != 0);
+}
+
+int
+Ndb::checkTupleIdInNdb(TupleIdRange & range, Uint64 tupleId)
+{
+ DBUG_ENTER("Ndb::checkTupleIdIndNdb");
+ if ((range.m_first_tuple_id != ~(Uint64)0) &&
+ (range.m_first_tuple_id > tupleId))
+ {
+ /*
+ * If we have ever cached a value in this object and this cached
+ * value is larger than the value we're trying to set then we
+ * need not check with the real value in the SYSTAB_0 table.
+ */
+ DBUG_RETURN(0);
+ }
+ if (range.m_highest_seen > tupleId)
+ {
+ /*
+ * Although we've never cached any higher value we have read
+ * a higher value and again it isn't necessary to change the
+ * auto increment value.
+ */
+ DBUG_RETURN(0);
+ }
+ DBUG_RETURN(1);
+}
+
+
int
Ndb::opTupleIdOnNdb(const NdbTableImpl* table,
TupleIdRange & range, Uint64 & opValue, Uint32 op)
@@ -1342,15 +1399,15 @@ Ndb::opTupleIdOnNdb(const NdbTableImpl* table,
tOperation->write_attr("NEXTID", 1);
tOperation->interpret_exit_ok();
tOperation->def_label(0);
- tOperation->interpret_exit_nok(9999);
-
+ tOperation->interpret_exit_ok();
+ tRecAttrResult = tOperation->getValue("NEXTID");
if (tConnection->execute( NdbTransaction::Commit ) == -1)
{
- if (tConnection->theError.code != 9999)
- goto error_handler;
+ goto error_handler;
}
else
{
+ range.m_highest_seen = tRecAttrResult->u_64_value();
DBUG_PRINT("info",
("Setting next auto increment value (db) to %lu",
(ulong) opValue));
@@ -1363,7 +1420,7 @@ Ndb::opTupleIdOnNdb(const NdbTableImpl* table,
tRecAttrResult = tOperation->getValue("NEXTID");
if (tConnection->execute( NdbTransaction::Commit ) == -1 )
goto error_handler;
- opValue = tRecAttrResult->u_64_value(); // out
+ range.m_highest_seen = opValue = tRecAttrResult->u_64_value(); // out
break;
default:
goto error_handler;
@@ -1819,11 +1876,7 @@ Ndb::printState(const char* fmt, ...)
NdbMutex_Lock(ndb_print_state_mutex);
bool dups = false;
unsigned i;
- ndbout << buf << " ndb=" << hex << this << dec;
-#ifndef NDB_WIN32
- ndbout << " thread=" << (int)pthread_self();
-#endif
- ndbout << endl;
+ ndbout << buf << " ndb=" << hex << (void*)this << endl;
for (unsigned n = 0; n < MAX_NDB_NODES; n++) {
NdbTransaction* con = theConnectionArray[n];
if (con != 0) {
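setAutoIncrementValue(..., modify) now runs a purely local check before touching the NEXTID tuple in SYSTAB_0, and opTupleIdOnNdb() records the highest NEXTID it has read in range.m_highest_seen. A simplified sketch of that check, mirroring checkTupleIdInNdb() with standalone types:

#include <cassert>

typedef unsigned long long Uint64;

struct TupleIdRange {
  Uint64 m_first_tuple_id;   // ~0 means "never cached"
  Uint64 m_highest_seen;
};

// Returns true when the distributed NEXTID update may still be needed.
static bool may_need_update(const TupleIdRange& r, Uint64 tupleId)
{
  if (r.m_first_tuple_id != ~(Uint64)0 && r.m_first_tuple_id > tupleId)
    return false;            // the cached range is already past the new value
  if (r.m_highest_seen > tupleId)
    return false;            // a higher NEXTID has already been observed
  return true;
}

int main()
{
  TupleIdRange r = { ~(Uint64)0, 100 };
  assert(!may_need_update(r, 50));   // highest seen is already larger
  assert(may_need_update(r, 200));   // NEXTID might have to be bumped
  return 0;
}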
diff --git a/storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp b/storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp
index ab6d90ad59e..0d226a97621 100644
--- a/storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp
+++ b/storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp
@@ -43,6 +43,7 @@
#include <NdbEnv.h>
#include <NdbMem.h>
#include <util/version.h>
+#include <NdbSleep.h>
#define DEBUG_PRINT 0
#define INCOMPATIBLE_VERSION -2
@@ -1777,7 +1778,23 @@ NdbDictInterface::dictSignal(NdbApiSignal* sig,
{
DBUG_ENTER("NdbDictInterface::dictSignal");
DBUG_PRINT("enter", ("useMasterNodeId: %d", node_specification));
- for(Uint32 i = 0; i<RETRIES; i++){
+
+ int sleep = 50;
+ int mod = 5;
+
+ for(Uint32 i = 0; i<RETRIES; i++)
+ {
+ if (i > 0)
+ NdbSleep_MilliSleep(sleep + 10 * (rand() % mod));
+ if (i == RETRIES / 2)
+ {
+ mod = 10;
+ }
+ if (i == 3*RETRIES/4)
+ {
+ sleep = 100;
+ }
+
m_buffer.clear();
// Protected area
@@ -2340,6 +2357,22 @@ NdbDictionaryImpl::createTable(NdbTableImpl &t)
{
DBUG_ENTER("NdbDictionaryImpl::createTable");
+
+ bool autoIncrement = false;
+ Uint64 initialValue = 0;
+ for (Uint32 i = 0; i < t.m_columns.size(); i++) {
+ const NdbColumnImpl* c = t.m_columns[i];
+ assert(c != NULL);
+ if (c->m_autoIncrement) {
+ if (autoIncrement) {
+ m_error.code = 4335;
+ DBUG_RETURN(-1);
+ }
+ autoIncrement = true;
+ initialValue = c->m_autoIncrementInitialValue;
+ }
+ }
+
// if the new name has not been set, use the copied name
if (t.m_newExternalName.empty())
{
@@ -2377,21 +2410,6 @@ NdbDictionaryImpl::createTable(NdbTableImpl &t)
// auto-increment - use "t" because initial value is not in DICT
{
- bool autoIncrement = false;
- Uint64 initialValue = 0;
- for (Uint32 i = 0; i < t.m_columns.size(); i++) {
- const NdbColumnImpl* c = t.m_columns[i];
- assert(c != NULL);
- if (c->m_autoIncrement) {
- if (autoIncrement) {
- m_error.code = 4335;
- delete t2;
- DBUG_RETURN(-1);
- }
- autoIncrement = true;
- initialValue = c->m_autoIncrementInitialValue;
- }
- }
if (autoIncrement) {
// XXX unlikely race condition - t.m_id may no longer be same table
// the tuple id range is not used on input
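NdbDictInterface::dictSignal() above now sleeps a short, randomized interval before each retry, widening the jitter window halfway through the retry budget and raising the base sleep in the last quarter. A sketch of that pacing as a standalone function; the retry count used in main() is a placeholder for the RETRIES constant, not its real value:

#include <cstdio>
#include <cstdlib>

// Returns the pause (in ms) to take before retry number i out of `retries`.
static int retry_delay_ms(unsigned i, unsigned retries)
{
  int sleep_ms = 50;
  int mod = 5;
  if (i >= retries / 2) mod = 10;             // widen the jitter window
  if (i >= 3 * retries / 4) sleep_ms = 100;   // and raise the base sleep
  return sleep_ms + 10 * (std::rand() % mod);
}

int main()
{
  const unsigned retries = 100;   // placeholder for RETRIES
  for (unsigned i = 1; i < retries; i += 25)
    std::printf("retry %u: wait %d ms\n", i, retry_delay_ms(i, retries));
  return 0;
}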
diff --git a/storage/ndb/src/ndbapi/NdbScanOperation.cpp b/storage/ndb/src/ndbapi/NdbScanOperation.cpp
index afbec070ac8..96a3ce4332e 100644
--- a/storage/ndb/src/ndbapi/NdbScanOperation.cpp
+++ b/storage/ndb/src/ndbapi/NdbScanOperation.cpp
@@ -1340,29 +1340,41 @@ NdbIndexScanOperation::readTuples(LockMode lm,
if(insertATTRINFO(word) == -1)
res = -1;
}
- if(!res && order_by){
- m_ordered = true;
+ if (!res)
+ {
+ /**
+ * Note that it is valid to have order_desc true and order_by false.
+ *
+ * This means that there will be no merge sort among partitions, but
+ * each partition will still be returned in descending sort order.
+ *
+ * This is useful eg. if it is known that the scan spans only one
+ * partition.
+ */
if (order_desc) {
m_descending = true;
ScanTabReq * req = CAST_PTR(ScanTabReq, theSCAN_TABREQ->getDataPtrSend());
ScanTabReq::setDescendingFlag(req->requestInfo, true);
}
- Uint32 cnt = m_accessTable->getNoOfColumns() - 1;
- m_sort_columns = cnt; // -1 for NDB$NODE
- m_current_api_receiver = m_sent_receivers_count;
- m_api_receivers_count = m_sent_receivers_count;
+ if (order_by) {
+ m_ordered = true;
+ Uint32 cnt = m_accessTable->getNoOfColumns() - 1;
+ m_sort_columns = cnt; // -1 for NDB$NODE
+ m_current_api_receiver = m_sent_receivers_count;
+ m_api_receivers_count = m_sent_receivers_count;
- m_sort_columns = cnt;
- for(Uint32 i = 0; i<cnt; i++){
- const NdbColumnImpl* key = m_accessTable->m_index->m_columns[i];
- const NdbColumnImpl* col = m_currentTable->getColumn(key->m_keyInfoPos);
- NdbRecAttr* tmp = NdbScanOperation::getValue_impl(col, (char*)-1);
- UintPtr newVal = UintPtr(tmp);
- theTupleKeyDefined[i][0] = FAKE_PTR;
- theTupleKeyDefined[i][1] = (newVal & 0xFFFFFFFF);
+ m_sort_columns = cnt;
+ for(Uint32 i = 0; i<cnt; i++){
+ const NdbColumnImpl* key = m_accessTable->m_index->m_columns[i];
+ const NdbColumnImpl* col = m_currentTable->getColumn(key->m_keyInfoPos);
+ NdbRecAttr* tmp = NdbScanOperation::getValue_impl(col, (char*)-1);
+ UintPtr newVal = UintPtr(tmp);
+ theTupleKeyDefined[i][0] = FAKE_PTR;
+ theTupleKeyDefined[i][1] = (newVal & 0xFFFFFFFF);
#if (SIZEOF_CHARP == 8)
- theTupleKeyDefined[i][2] = (newVal >> 32);
+ theTupleKeyDefined[i][2] = (newVal >> 32);
#endif
+ }
}
}
m_this_bound_start = 0;
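The readTuples() hunk above decouples the two ordering flags: a scan may now be descending per fragment without also being merge-sorted across fragments (order_desc without order_by). A small sketch of the resulting flag handling with a plain struct instead of the real NdbIndexScanOperation state:

#include <cassert>

struct ScanOrderState {
  bool m_descending;        // each fragment delivers rows in descending order
  bool m_ordered;           // results are merge-sorted across fragments
  unsigned m_sort_columns;  // columns used for the merge sort
};

static void apply_ordering(ScanOrderState& s, bool order_by, bool order_desc,
                           unsigned index_columns)
{
  s.m_descending = order_desc;   // valid on its own, e.g. for a scan known
                                 // to hit a single fragment
  s.m_ordered = order_by;
  // Per-column sort bookkeeping is only needed when merge-sorting.
  s.m_sort_columns = order_by ? index_columns : 0;
}

int main()
{
  ScanOrderState s;
  apply_ordering(s, false, true, 4);
  assert(s.m_descending && !s.m_ordered && s.m_sort_columns == 0);
  return 0;
}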
diff --git a/storage/ndb/src/ndbapi/ndberror.c b/storage/ndb/src/ndbapi/ndberror.c
index e7f946118f2..363e25fc519 100644
--- a/storage/ndb/src/ndbapi/ndberror.c
+++ b/storage/ndb/src/ndbapi/ndberror.c
@@ -314,6 +314,7 @@ ErrorBundle ErrorCodes[] = {
{ 242, DMEC, AE, "Zero concurrency in scan"},
{ 244, DMEC, AE, "Too high concurrency in scan"},
{ 269, DMEC, AE, "No condition and attributes to read in scan"},
+ { 874, DMEC, AE, "Too much attrinfo (e.g. scan filter) for scan in tuple manager" },
{ 4600, DMEC, AE, "Transaction is already started"},
{ 4601, DMEC, AE, "Transaction is not started"},
{ 4602, DMEC, AE, "You must call getNdbOperation before executeScan" },
@@ -426,6 +427,7 @@ ErrorBundle ErrorCodes[] = {
{ 1512, DMEC, SE, "File read error" },
{ 1513, DMEC, IE, "Filegroup not online" },
{ 1514, DMEC, SE, "Currently there is a limit of one logfile group" },
+ { 1515, DMEC, SE, "Currently there is a 4G limit of one undo/data-file in 32-bit host" },
{ 773, DMEC, SE, "Out of string memory, please modify StringMemory config parameter" },
{ 775, DMEC, SE, "Create file is not supported when Diskless=1" },
@@ -625,6 +627,8 @@ ErrorBundle ErrorCodes[] = {
{ 4274, DMEC, IE, "Corrupted main table PK in blob operation" },
{ 4275, DMEC, AE, "The blob method is incompatible with operation type or lock mode" },
{ 4294, DMEC, AE, "Scan filter is too large, discarded" },
+ { 2810, DMEC, TR, "No space left on the device" },
+ { 2815, DMEC, TR, "Error in reading files, please check file system" },
{ NO_CONTACT_WITH_PROCESS, DMEC, AE,
"No contact with the process (dead ?)."},