summaryrefslogtreecommitdiff
path: root/ndb
diff options
context:
space:
mode:
authorunknown <pekka@sama.ndb.mysql.com>2008-01-27 17:26:27 +0100
committerunknown <pekka@sama.ndb.mysql.com>2008-01-27 17:26:27 +0100
commit507c8a1374c7c10900d2e84c3146fe9bb777cf81 (patch)
tree7dc1b4c1bc6373ad125907a671fa5d2933712eae /ndb
parent27a337e7c79ec947549003e8bb4cf1f63df791ee (diff)
parent66b1905fbd73ab1db542ac02c3d3dab5db72c157 (diff)
downloadmariadb-git-507c8a1374c7c10900d2e84c3146fe9bb777cf81.tar.gz
Merge sama.ndb.mysql.com:/export/space/pekka/ndb/version/my50-ndb
into sama.ndb.mysql.com:/export/space/pekka/ndb/version/my50-bug31477
Diffstat (limited to 'ndb')
-rw-r--r--ndb/src/common/util/NdbOut.cpp2
-rw-r--r--ndb/src/kernel/blocks/dbtup/Dbtup.hpp22
-rw-r--r--ndb/src/kernel/blocks/dbtup/DbtupIndex.cpp96
-rw-r--r--ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp6
-rw-r--r--ndb/test/ndbapi/testOIBasic.cpp2943
5 files changed, 1841 insertions, 1228 deletions
diff --git a/ndb/src/common/util/NdbOut.cpp b/ndb/src/common/util/NdbOut.cpp
index 7ca7c91e266..61de2be7572 100644
--- a/ndb/src/common/util/NdbOut.cpp
+++ b/ndb/src/common/util/NdbOut.cpp
@@ -29,7 +29,7 @@ static const char * fms[] = {
"%d", "0x%08x", // Int32
"%u", "0x%08x", // Uint32
"%lld", "0x%016llx", // Int64
- "%llu", "0x%016llx" // Uint64
+ "%llu", "0x%016llx", // Uint64
"%llu", "0x%016llx" // UintPtr
};
diff --git a/ndb/src/kernel/blocks/dbtup/Dbtup.hpp b/ndb/src/kernel/blocks/dbtup/Dbtup.hpp
index 6fe0eefcdb5..a80f4542fee 100644
--- a/ndb/src/kernel/blocks/dbtup/Dbtup.hpp
+++ b/ndb/src/kernel/blocks/dbtup/Dbtup.hpp
@@ -1085,7 +1085,7 @@ public:
/*
* TUX checks if tuple is visible to scan.
*/
- bool tuxQueryTh(Uint32 fragPtrI, Uint32 tupAddr, Uint32 tupVersion, Uint32 transId1, Uint32 transId2, Uint32 savePointId);
+ bool tuxQueryTh(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32 tupVersion, Uint32 transId1, Uint32 transId2, bool dirty, Uint32 savePointId);
private:
BLOCK_DEFINES(Dbtup);
@@ -1942,6 +1942,8 @@ private:
bool getPageThroughSavePoint(Operationrec* const regOperPtr,
Operationrec* const leaderOpPtr);
+ bool find_savepoint(OperationrecPtr& loopOpPtr, Uint32 savepointId);
+
Uint32 calculateChecksum(Page* const pagePtr, Uint32 tupHeadOffset, Uint32 tupHeadSize);
void setChecksum(Page* const pagePtr, Uint32 tupHeadOffset, Uint32 tupHeadSize);
@@ -2467,4 +2469,22 @@ bool Dbtup::isPageUndoLogged(Fragrecord* const regFragPtr,
return false;
}//Dbtup::isUndoLoggingNeeded()
+inline
+bool Dbtup::find_savepoint(OperationrecPtr& loopOpPtr, Uint32 savepointId)
+{
+ while (true) {
+ if (savepointId > loopOpPtr.p->savePointId) {
+ jam();
+ return true;
+ }
+ // note 5.0 has reversed next/prev pointers
+ loopOpPtr.i = loopOpPtr.p->nextActiveOp;
+ if (loopOpPtr.i == RNIL) {
+ break;
+ }
+ ptrCheckGuard(loopOpPtr, cnoOfOprec, operationrec);
+ }
+ return false;
+}
+
#endif
diff --git a/ndb/src/kernel/blocks/dbtup/DbtupIndex.cpp b/ndb/src/kernel/blocks/dbtup/DbtupIndex.cpp
index 964d8578217..c03ca35bc6a 100644
--- a/ndb/src/kernel/blocks/dbtup/DbtupIndex.cpp
+++ b/ndb/src/kernel/blocks/dbtup/DbtupIndex.cpp
@@ -246,8 +246,18 @@ Dbtup::accReadPk(Uint32 tableId, Uint32 fragId, Uint32 fragPageId, Uint32 pageIn
return ret;
}
+/*
+ * TUX index contains all tuple versions. A scan in TUX has scanned
+ * one of them and asks if it can be returned as scan result. This
+ * depends on trans id, dirty read flag, and savepoint within trans.
+ *
+ * Previously this faked a ZREAD operation and used getPage().
+ * In TUP getPage() is run after ACC locking, but TUX comes here
+ * before ACC access. Instead of modifying getPage() it is more
+ * clear to do the full check here.
+ */
bool
-Dbtup::tuxQueryTh(Uint32 fragPtrI, Uint32 tupAddr, Uint32 tupVersion, Uint32 transId1, Uint32 transId2, Uint32 savePointId)
+Dbtup::tuxQueryTh(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32 tupVersion, Uint32 transId1, Uint32 transId2, bool dirty, Uint32 savePointId)
{
ljamEntry();
FragrecordPtr fragPtr;
@@ -256,33 +266,73 @@ Dbtup::tuxQueryTh(Uint32 fragPtrI, Uint32 tupAddr, Uint32 tupVersion, Uint32 tra
TablerecPtr tablePtr;
tablePtr.i = fragPtr.p->fragTableId;
ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
- // get page
PagePtr pagePtr;
- Uint32 fragPageId = tupAddr >> MAX_TUPLES_BITS;
- Uint32 pageIndex = tupAddr & ((1 << MAX_TUPLES_BITS ) - 1);
- // use temp op rec
- Operationrec tempOp;
- tempOp.fragPageId = fragPageId;
- tempOp.pageIndex = pageIndex;
- tempOp.transid1 = transId1;
- tempOp.transid2 = transId2;
- tempOp.savePointId = savePointId;
- tempOp.optype = ZREAD;
- tempOp.dirtyOp = 1;
- if (getPage(pagePtr, &tempOp, fragPtr.p, tablePtr.p)) {
- /*
- * We use the normal getPage which will return the tuple to be used
- * for this transaction and savepoint id. If its tuple version
- * equals the requested then we have a visible tuple otherwise not.
- */
+ pagePtr.i = pageId;
+ ptrCheckGuard(pagePtr, cnoOfPage, page);
+
+ OperationrecPtr currOpPtr;
+ currOpPtr.i = pagePtr.p->pageWord[pageOffset];
+ if (currOpPtr.i == RNIL) {
+ ljam();
+ // tuple has no operation, any scan can see it
+ return true;
+ }
+ ptrCheckGuard(currOpPtr, cnoOfOprec, operationrec);
+
+ const bool sameTrans =
+ transId1 == currOpPtr.p->transid1 &&
+ transId2 == currOpPtr.p->transid2;
+
+ bool res = false;
+ OperationrecPtr loopOpPtr = currOpPtr;
+
+ if (!sameTrans) {
ljam();
- Uint32 read_tupVersion = pagePtr.p->pageWord[tempOp.pageOffset + 1];
- if (read_tupVersion == tupVersion) {
+ if (!dirty) {
ljam();
- return true;
+ if (currOpPtr.p->prevActiveOp == RNIL) {
+ ljam();
+ // last op - TUX makes ACC lock request in same timeslice
+ res = true;
+ }
+ }
+ else {
+ // loop to first op (returns false)
+ find_savepoint(loopOpPtr, 0);
+ const Uint32 op_type = loopOpPtr.p->optype;
+
+ if (op_type != ZINSERT) {
+ ljam();
+ // read committed version from the page
+ const Uint32 origVersion = pagePtr.p->pageWord[pageOffset + 1];
+ if (origVersion == tupVersion) {
+ ljam();
+ res = true;
+ }
+ }
}
}
- return false;
+ else {
+ ljam();
+ // for own trans, ignore dirty flag
+
+ if (find_savepoint(loopOpPtr, savePointId)) {
+ ljam();
+ const Uint32 op_type = loopOpPtr.p->optype;
+
+ if (op_type != ZDELETE) {
+ ljam();
+ // check if this op has produced the scanned version
+ Uint32 loopVersion = loopOpPtr.p->tupVersion;
+ if (loopVersion == tupVersion) {
+ ljam();
+ res = true;
+ }
+ }
+ }
+ }
+
+ return res;
}
// ordered index build
diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp
index 7eae1486d43..58d37310c91 100644
--- a/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp
+++ b/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp
@@ -984,7 +984,8 @@ Dbtux::scanVisible(ScanOpPtr scanPtr, TreeEnt ent)
const Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
Uint32 fragBit = ent.m_fragBit;
Uint32 tableFragPtrI = frag.m_tupTableFragPtrI[fragBit];
- Uint32 tupAddr = getTupAddr(frag, ent);
+ Uint32 pageId = ent.m_tupLoc.getPageId();
+ Uint32 pageOffset = ent.m_tupLoc.getPageOffset();
Uint32 tupVersion = ent.m_tupVersion;
// check for same tuple twice in row
if (scan.m_scanEnt.m_tupLoc == ent.m_tupLoc &&
@@ -994,8 +995,9 @@ Dbtux::scanVisible(ScanOpPtr scanPtr, TreeEnt ent)
}
Uint32 transId1 = scan.m_transId1;
Uint32 transId2 = scan.m_transId2;
+ bool dirty = scan.m_readCommitted;
Uint32 savePointId = scan.m_savePointId;
- bool ret = c_tup->tuxQueryTh(tableFragPtrI, tupAddr, tupVersion, transId1, transId2, savePointId);
+ bool ret = c_tup->tuxQueryTh(tableFragPtrI, pageId, pageOffset, tupVersion, transId1, transId2, dirty, savePointId);
jamEntry();
return ret;
}
diff --git a/ndb/test/ndbapi/testOIBasic.cpp b/ndb/test/ndbapi/testOIBasic.cpp
index 8f77f0bb062..a7a7661d53a 100644
--- a/ndb/test/ndbapi/testOIBasic.cpp
+++ b/ndb/test/ndbapi/testOIBasic.cpp
@@ -36,10 +36,11 @@
struct Opt {
// common options
- unsigned m_batch;
+ uint m_batch;
const char* m_bound;
const char* m_case;
bool m_collsp;
+ bool m_cont;
bool m_core;
const char* m_csname;
CHARSET_INFO* m_cs;
@@ -47,26 +48,29 @@ struct Opt {
bool m_dups;
NdbDictionary::Object::FragmentType m_fragtype;
const char* m_index;
- unsigned m_loop;
+ uint m_loop;
bool m_msglock;
bool m_nologging;
bool m_noverify;
- unsigned m_pctnull;
- unsigned m_rows;
- unsigned m_samples;
- unsigned m_scanbatch;
- unsigned m_scanpar;
- unsigned m_scanstop;
+ uint m_pctnull;
+ uint m_rows;
+ uint m_samples;
+ uint m_scanbatch;
+ uint m_scanpar;
+ uint m_scanstop;
int m_seed;
- unsigned m_subloop;
+ const char* m_skip;
+ uint m_sloop;
+ uint m_ssloop;
const char* m_table;
- unsigned m_threads;
+ uint m_threads;
int m_v; // int for lint
Opt() :
m_batch(32),
m_bound("01234"),
m_case(0),
m_collsp(false),
+ m_cont(false),
m_core(false),
m_csname("random"),
m_cs(0),
@@ -85,7 +89,9 @@ struct Opt {
m_scanpar(0),
m_scanstop(0),
m_seed(-1),
- m_subloop(4),
+ m_skip(0),
+ m_sloop(4),
+ m_ssloop(4),
m_table(0),
m_threads(4),
m_v(1) {
@@ -107,6 +113,7 @@ printhelp()
<< " -bound xyz use only these bound types 0-4 [" << d.m_bound << "]" << endl
<< " -case abc only given test cases (letters a-z)" << endl
<< " -collsp use strnncollsp instead of strnxfrm" << endl
+ << " -cont on error continue to next test case [" << d.m_cont << "]" << endl
<< " -core core dump on error [" << d.m_core << "]" << endl
<< " -csname S charset or collation [" << d.m_csname << "]" << endl
<< " -die nnn exit immediately on NDB error code nnn" << endl
@@ -122,7 +129,9 @@ printhelp()
<< " -scanbatch N scan batch 0=default [" << d.m_scanbatch << "]" << endl
<< " -scanpar N scan parallel 0=default [" << d.m_scanpar << "]" << endl
<< " -seed N srandom seed 0=loop number -1=random [" << d.m_seed << "]" << endl
- << " -subloop N subtest (and subsubtest) loop count [" << d.m_subloop << "]" << endl
+ << " -skip abc skip given test cases (letters a-z)" << endl
+ << " -sloop N level 2 (sub)loop count [" << d.m_sloop << "]" << endl
+ << " -ssloop N level 3 (sub)loop count [" << d.m_ssloop << "]" << endl
<< " -table xyz only given table numbers (digits 0-9)" << endl
<< " -threads N number of threads [" << d.m_threads << "]" << endl
<< " -vN verbosity [" << d.m_v << "]" << endl
@@ -142,17 +151,17 @@ static const char* hexstr = "0123456789abcdef";
// random ints
-static unsigned
-urandom(unsigned n)
+static uint
+urandom(uint n)
{
if (n == 0)
return 0;
- unsigned i = random() % n;
+ uint i = random() % n;
return i;
}
static int
-irandom(unsigned n)
+irandom(uint n)
{
if (n == 0)
return 0;
@@ -163,7 +172,7 @@ irandom(unsigned n)
}
static bool
-randompct(unsigned pct)
+randompct(uint pct)
{
if (pct == 0)
return false;
@@ -172,15 +181,15 @@ randompct(unsigned pct)
return urandom(100) < pct;
}
-static unsigned
-random_coprime(unsigned n)
+static uint
+random_coprime(uint n)
{
- unsigned prime[] = { 101, 211, 307, 401, 503, 601, 701, 809, 907 };
- unsigned count = sizeof(prime) / sizeof(prime[0]);
+ uint prime[] = { 101, 211, 307, 401, 503, 601, 701, 809, 907 };
+ uint count = sizeof(prime) / sizeof(prime[0]);
if (n == 0)
return 0;
while (1) {
- unsigned i = urandom(count);
+ uint i = urandom(count);
if (n % prime[i] != 0)
return prime[i];
}
@@ -189,16 +198,16 @@ random_coprime(unsigned n)
// random re-sequence of 0...(n-1)
struct Rsq {
- Rsq(unsigned n);
- unsigned next();
+ Rsq(uint n);
+ uint next();
private:
- unsigned m_n;
- unsigned m_i;
- unsigned m_start;
- unsigned m_prime;
+ uint m_n;
+ uint m_i;
+ uint m_start;
+ uint m_prime;
};
-Rsq::Rsq(unsigned n)
+Rsq::Rsq(uint n)
{
m_n = n;
m_i = 0;
@@ -206,7 +215,7 @@ Rsq::Rsq(unsigned n)
m_prime = random_coprime(n);
}
-unsigned
+uint
Rsq::next()
{
assert(m_n != 0);
@@ -216,30 +225,16 @@ Rsq::next()
// log and error macros
static NdbMutex *ndbout_mutex = NULL;
-
-static unsigned getthrno();
-
-static const char*
-getthrstr()
-{
- static char buf[20];
- unsigned n = getthrno();
- if (n == (unsigned)-1)
- strcpy(buf, "");
- else {
- unsigned m =
- g_opt.m_threads < 10 ? 1 :
- g_opt.m_threads < 100 ? 2 : 3;
- sprintf(buf, "[%0*u] ", m, n);
- }
- return buf;
-}
+static const char* getthrprefix();
#define LLN(n, s) \
do { \
if ((n) > g_opt.m_v) break; \
if (g_opt.m_msglock) NdbMutex_Lock(ndbout_mutex); \
- ndbout << getthrstr() << s << endl; \
+ ndbout << getthrprefix(); \
+ if ((n) > 2) \
+ ndbout << "line " << __LINE__ << ": "; \
+ ndbout << s << endl; \
if (g_opt.m_msglock) NdbMutex_Unlock(ndbout_mutex); \
} while(0)
@@ -250,6 +245,8 @@ getthrstr()
#define LL4(s) LLN(4, s)
#define LL5(s) LLN(5, s)
+#define HEX(x) hex << (x) << dec
+
// following check a condition and return -1 on failure
#undef CHK // simple check
@@ -281,53 +278,58 @@ getthrstr()
class Thr;
class Con;
class Tab;
+class ITab;
class Set;
class Tmr;
struct Par : public Opt {
- unsigned m_no;
+ uint m_no;
Con* m_con;
Con& con() const { assert(m_con != 0); return *m_con; }
const Tab* m_tab;
const Tab& tab() const { assert(m_tab != 0); return *m_tab; }
+ const ITab* m_itab;
+ const ITab& itab() const { assert(m_itab != 0); return *m_itab; }
Set* m_set;
Set& set() const { assert(m_set != 0); return *m_set; }
Tmr* m_tmr;
Tmr& tmr() const { assert(m_tmr != 0); return *m_tmr; }
char m_currcase[2];
- unsigned m_lno;
- unsigned m_slno;
- unsigned m_totrows;
+ uint m_lno;
+ uint m_slno;
+ uint m_totrows;
// value calculation
- unsigned m_range;
- unsigned m_pctrange;
- unsigned m_pctbrange;
+ uint m_range;
+ uint m_pctrange;
+ uint m_pctbrange;
int m_bdir;
bool m_noindexkeyupdate;
// choice of key
bool m_randomkey;
// do verify after read
bool m_verify;
- // deadlock possible
- bool m_deadlock;
- // abort percentabge
- unsigned m_abortpct;
+ // errors to catch (see Con)
+ bool m_catcherr;
+ // abort percentage
+ uint m_abortpct;
NdbOperation::LockMode m_lockmode;
// scan options
bool m_tupscan;
bool m_ordered;
bool m_descending;
- // timer location
+ // threads used by current test case
+ uint m_usedthreads;
Par(const Opt& opt) :
Opt(opt),
m_no(0),
m_con(0),
m_tab(0),
+ m_itab(0),
m_set(0),
m_tmr(0),
m_lno(0),
m_slno(0),
- m_totrows(m_threads * m_rows),
+ m_totrows(0),
m_range(m_rows),
m_pctrange(40),
m_pctbrange(80),
@@ -335,39 +337,40 @@ struct Par : public Opt {
m_noindexkeyupdate(false),
m_randomkey(false),
m_verify(false),
- m_deadlock(false),
+ m_catcherr(0),
m_abortpct(0),
m_lockmode(NdbOperation::LM_Read),
m_tupscan(false),
m_ordered(false),
- m_descending(false)
+ m_descending(false),
+ m_usedthreads(0)
{
m_currcase[0] = 0;
}
};
static bool
-usetable(Par par, unsigned i)
+usetable(Par par, uint i)
{
return par.m_table == 0 || strchr(par.m_table, '0' + i) != 0;
}
static bool
-useindex(Par par, unsigned i)
+useindex(Par par, uint i)
{
return par.m_index == 0 || strchr(par.m_index, '0' + i) != 0;
}
-static unsigned
-thrrow(Par par, unsigned j)
+static uint
+thrrow(Par par, uint j)
{
- return par.m_threads * j + par.m_no;
+ return par.m_usedthreads * j + par.m_no;
}
static bool
-isthrrow(Par par, unsigned i)
+isthrrow(Par par, uint i)
{
- return i % par.m_threads == par.m_no;
+ return i % par.m_usedthreads == par.m_no;
}
// timer
@@ -375,13 +378,13 @@ isthrrow(Par par, unsigned i)
struct Tmr {
void clr();
void on();
- void off(unsigned cnt = 0);
+ void off(uint cnt = 0);
const char* time();
const char* pct(const Tmr& t1);
const char* over(const Tmr& t1);
NDB_TICKS m_on;
- unsigned m_ms;
- unsigned m_cnt;
+ uint m_ms;
+ uint m_cnt;
char m_time[100];
char m_text[100];
Tmr() { clr(); }
@@ -401,7 +404,7 @@ Tmr::on()
}
void
-Tmr::off(unsigned cnt)
+Tmr::off(uint cnt)
{
NDB_TICKS off = NdbTick_CurrentMillisecond();
assert(m_on != 0 && off >= m_on);
@@ -446,53 +449,18 @@ Tmr::over(const Tmr& t1)
return m_text;
}
-// list of ints
-
-struct Lst {
- Lst();
- unsigned m_arr[1000];
- unsigned m_cnt;
- void push(unsigned i);
- unsigned cnt() const;
- void reset();
-};
-
-Lst::Lst() :
- m_cnt(0)
-{
-}
-
-void
-Lst::push(unsigned i)
-{
- assert(m_cnt < sizeof(m_arr)/sizeof(m_arr[0]));
- m_arr[m_cnt++] = i;
-}
-
-unsigned
-Lst::cnt() const
-{
- return m_cnt;
-}
-
-void
-Lst::reset()
-{
- m_cnt = 0;
-}
-
// character sets
-static const unsigned maxcsnumber = 512;
-static const unsigned maxcharcount = 32;
-static const unsigned maxcharsize = 4;
-static const unsigned maxxmulsize = 8;
+static const uint maxcsnumber = 512;
+static const uint maxcharcount = 32;
+static const uint maxcharsize = 4;
+static const uint maxxmulsize = 8;
// single mb char
struct Chr {
- unsigned char m_bytes[maxcharsize];
- unsigned char m_xbytes[maxxmulsize * maxcharsize];
- unsigned m_size;
+ uchar m_bytes[maxcharsize];
+ uchar m_xbytes[maxxmulsize * maxcharsize];
+ uint m_size;
Chr();
};
@@ -506,7 +474,7 @@ Chr::Chr()
// charset and random valid chars to use
struct Chs {
CHARSET_INFO* m_cs;
- unsigned m_xmul;
+ uint m_xmul;
Chr* m_chr;
Chs(CHARSET_INFO* cs);
~Chs();
@@ -523,22 +491,22 @@ Chs::Chs(CHARSET_INFO* cs) :
m_xmul = 1;
assert(m_xmul <= maxxmulsize);
m_chr = new Chr [maxcharcount];
- unsigned i = 0;
- unsigned miss1 = 0;
- unsigned miss2 = 0;
- unsigned miss3 = 0;
- unsigned miss4 = 0;
+ uint i = 0;
+ uint miss1 = 0;
+ uint miss2 = 0;
+ uint miss3 = 0;
+ uint miss4 = 0;
while (i < maxcharcount) {
- unsigned char* bytes = m_chr[i].m_bytes;
- unsigned char* xbytes = m_chr[i].m_xbytes;
- unsigned& size = m_chr[i].m_size;
+ uchar* bytes = m_chr[i].m_bytes;
+ uchar* xbytes = m_chr[i].m_xbytes;
+ uint& size = m_chr[i].m_size;
bool ok;
size = m_cs->mbminlen + urandom(m_cs->mbmaxlen - m_cs->mbminlen + 1);
assert(m_cs->mbminlen <= size && size <= m_cs->mbmaxlen);
// prefer longer chars
if (size == m_cs->mbminlen && m_cs->mbminlen < m_cs->mbmaxlen && urandom(5) != 0)
continue;
- for (unsigned j = 0; j < size; j++) {
+ for (uint j = 0; j < size; j++) {
bytes[j] = urandom(256);
}
int not_used;
@@ -550,13 +518,13 @@ Chs::Chs(CHARSET_INFO* cs) :
}
// check no proper prefix wellformed
ok = true;
- for (unsigned j = 1; j < size; j++) {
+ for (uint j = 1; j < size; j++) {
if ((*cs->cset->well_formed_len)(cs, sbytes, sbytes + j, 1, &not_used) == j) {
ok = false;
break;
}
}
- if (! ok) {
+ if (!ok) {
miss2++;
continue;
}
@@ -566,37 +534,37 @@ Chs::Chs(CHARSET_INFO* cs) :
int xlen = (*cs->coll->strnxfrm)(cs, xbytes, m_xmul * size, bytes, size);
// check we got something
ok = false;
- for (unsigned j = 0; j < xlen; j++) {
+ for (uint j = 0; j < xlen; j++) {
if (xbytes[j] != 0) {
ok = true;
break;
}
}
- if (! ok) {
+ if (!ok) {
miss3++;
continue;
}
// check for duplicate (before normalize)
ok = true;
- for (unsigned j = 0; j < i; j++) {
+ for (uint j = 0; j < i; j++) {
const Chr& chr = m_chr[j];
if (chr.m_size == size && memcmp(chr.m_bytes, bytes, size) == 0) {
ok = false;
break;
}
}
- if (! ok) {
+ if (!ok) {
miss4++;
continue;
}
i++;
}
bool disorder = true;
- unsigned bubbles = 0;
+ uint bubbles = 0;
while (disorder) {
disorder = false;
- for (unsigned i = 1; i < maxcharcount; i++) {
- unsigned len = sizeof(m_chr[i].m_xbytes);
+ for (uint i = 1; i < maxcharcount; i++) {
+ uint len = sizeof(m_chr[i].m_xbytes);
if (memcmp(m_chr[i-1].m_xbytes, m_chr[i].m_xbytes, len) > 0) {
Chr chr = m_chr[i];
m_chr[i] = m_chr[i-1];
@@ -627,7 +595,7 @@ static Chs* cslist[maxcsnumber];
static void
resetcslist()
{
- for (unsigned i = 0; i < maxcsnumber; i++) {
+ for (uint i = 0; i < maxcsnumber; i++) {
delete cslist[i];
cslist[i] = 0;
}
@@ -641,7 +609,7 @@ getcs(Par par)
cs = par.m_cs;
} else {
while (1) {
- unsigned n = urandom(maxcsnumber);
+ uint n = urandom(maxcsnumber);
cs = get_charset(n, MYF(0));
if (cs != 0) {
// prefer complex charsets
@@ -667,24 +635,24 @@ struct Col {
Longvarchar = NdbDictionary::Column::Longvarchar
};
const class Tab& m_tab;
- unsigned m_num;
+ uint m_num;
const char* m_name;
bool m_pk;
Type m_type;
- unsigned m_length;
- unsigned m_bytelength; // multiplied by char width
- unsigned m_attrsize; // base type size
- unsigned m_headsize; // length bytes
- unsigned m_bytesize; // full value size
+ uint m_length;
+ uint m_bytelength; // multiplied by char width
+ uint m_attrsize; // base type size
+ uint m_headsize; // length bytes
+ uint m_bytesize; // full value size
bool m_nullable;
const Chs* m_chs;
- Col(const class Tab& tab, unsigned num, const char* name, bool pk, Type type, unsigned length, bool nullable, const Chs* chs);
+ Col(const class Tab& tab, uint num, const char* name, bool pk, Type type, uint length, bool nullable, const Chs* chs);
~Col();
bool equal(const Col& col2) const;
void wellformed(const void* addr) const;
};
-Col::Col(const class Tab& tab, unsigned num, const char* name, bool pk, Type type, unsigned length, bool nullable, const Chs* chs) :
+Col::Col(const class Tab& tab, uint num, const char* name, bool pk, Type type, uint length, bool nullable, const Chs* chs) :
m_tab(tab),
m_num(num),
m_name(strcpy(new char [strlen(name) + 1], name)),
@@ -735,7 +703,7 @@ Col::wellformed(const void* addr) const
{
CHARSET_INFO* cs = m_chs->m_cs;
const char* src = (const char*)addr;
- unsigned len = m_bytelength;
+ uint len = m_bytelength;
int not_used;
assert((*cs->cset->well_formed_len)(cs, src, src + len, 0xffff, &not_used) == len);
}
@@ -743,9 +711,9 @@ Col::wellformed(const void* addr) const
case Col::Varchar:
{
CHARSET_INFO* cs = m_chs->m_cs;
- const unsigned char* src = (const unsigned char*)addr;
+ const uchar* src = (const uchar*)addr;
const char* ssrc = (const char*)src;
- unsigned len = src[0];
+ uint len = src[0];
int not_used;
assert(len <= m_bytelength);
assert((*cs->cset->well_formed_len)(cs, ssrc + 1, ssrc + 1 + len, 0xffff, &not_used) == len);
@@ -754,9 +722,9 @@ Col::wellformed(const void* addr) const
case Col::Longvarchar:
{
CHARSET_INFO* cs = m_chs->m_cs;
- const unsigned char* src = (const unsigned char*)addr;
+ const uchar* src = (const uchar*)addr;
const char* ssrc = (const char*)src;
- unsigned len = src[0] + (src[1] << 8);
+ uint len = src[0] + (src[1] << 8);
int not_used;
assert(len <= m_bytelength);
assert((*cs->cset->well_formed_len)(cs, ssrc + 2, ssrc + 2 + len, 0xffff, &not_used) == len);
@@ -774,7 +742,7 @@ operator<<(NdbOut& out, const Col& col)
out << "col[" << col.m_num << "] " << col.m_name;
switch (col.m_type) {
case Col::Unsigned:
- out << " unsigned";
+ out << " uint";
break;
case Col::Char:
{
@@ -808,13 +776,13 @@ operator<<(NdbOut& out, const Col& col)
struct ICol {
const class ITab& m_itab;
- unsigned m_num;
+ uint m_num;
const Col& m_col;
- ICol(const class ITab& itab, unsigned num, const Col& col);
+ ICol(const class ITab& itab, uint num, const Col& col);
~ICol();
};
-ICol::ICol(const class ITab& itab, unsigned num, const Col& col) :
+ICol::ICol(const class ITab& itab, uint num, const Col& col) :
m_itab(itab),
m_num(num),
m_col(col)
@@ -842,47 +810,47 @@ struct ITab {
const class Tab& m_tab;
const char* m_name;
Type m_type;
- unsigned m_icols;
+ uint m_icols;
const ICol** m_icol;
- unsigned m_colmask;
- ITab(const class Tab& tab, const char* name, Type type, unsigned icols);
+ uint m_keymask;
+ ITab(const class Tab& tab, const char* name, Type type, uint icols);
~ITab();
- void icoladd(unsigned k, const ICol* icolptr);
+ void icoladd(uint k, const ICol* icolptr);
};
-ITab::ITab(const class Tab& tab, const char* name, Type type, unsigned icols) :
+ITab::ITab(const class Tab& tab, const char* name, Type type, uint icols) :
m_tab(tab),
m_name(strcpy(new char [strlen(name) + 1], name)),
m_type(type),
m_icols(icols),
m_icol(new const ICol* [icols + 1]),
- m_colmask(0)
+ m_keymask(0)
{
- for (unsigned k = 0; k <= m_icols; k++)
+ for (uint k = 0; k <= m_icols; k++)
m_icol[k] = 0;
}
ITab::~ITab()
{
delete [] m_name;
- for (unsigned i = 0; i < m_icols; i++)
+ for (uint i = 0; i < m_icols; i++)
delete m_icol[i];
delete [] m_icol;
}
void
-ITab::icoladd(unsigned k, const ICol* icolptr)
+ITab::icoladd(uint k, const ICol* icolptr)
{
assert(k == icolptr->m_num && k < m_icols && m_icol[k] == 0);
m_icol[k] = icolptr;
- m_colmask |= (1 << icolptr->m_col.m_num);
+ m_keymask |= (1 << icolptr->m_col.m_num);
}
static NdbOut&
operator<<(NdbOut& out, const ITab& itab)
{
out << "itab " << itab.m_name << " icols=" << itab.m_icols;
- for (unsigned k = 0; k < itab.m_icols; k++) {
+ for (uint k = 0; k < itab.m_icols; k++) {
const ICol& icol = *itab.m_icol[k];
out << endl << icol;
}
@@ -893,56 +861,60 @@ operator<<(NdbOut& out, const ITab& itab)
struct Tab {
const char* m_name;
- unsigned m_cols;
+ uint m_cols;
const Col** m_col;
- unsigned m_itabs;
+ uint m_pkmask;
+ uint m_itabs;
const ITab** m_itab;
- unsigned m_orderedindexes;
- unsigned m_hashindexes;
+ uint m_orderedindexes;
+ uint m_hashindexes;
// pk must contain an Unsigned column
- unsigned m_keycol;
- void coladd(unsigned k, Col* colptr);
- void itabadd(unsigned j, ITab* itab);
- Tab(const char* name, unsigned cols, unsigned itabs, unsigned keycol);
+ uint m_keycol;
+ void coladd(uint k, Col* colptr);
+ void itabadd(uint j, ITab* itab);
+ Tab(const char* name, uint cols, uint itabs, uint keycol);
~Tab();
};
-Tab::Tab(const char* name, unsigned cols, unsigned itabs, unsigned keycol) :
+Tab::Tab(const char* name, uint cols, uint itabs, uint keycol) :
m_name(strcpy(new char [strlen(name) + 1], name)),
m_cols(cols),
m_col(new const Col* [cols + 1]),
+ m_pkmask(0),
m_itabs(itabs),
m_itab(new const ITab* [itabs + 1]),
m_orderedindexes(0),
m_hashindexes(0),
m_keycol(keycol)
{
- for (unsigned k = 0; k <= cols; k++)
+ for (uint k = 0; k <= cols; k++)
m_col[k] = 0;
- for (unsigned j = 0; j <= itabs; j++)
+ for (uint j = 0; j <= itabs; j++)
m_itab[j] = 0;
}
Tab::~Tab()
{
delete [] m_name;
- for (unsigned i = 0; i < m_cols; i++)
+ for (uint i = 0; i < m_cols; i++)
delete m_col[i];
delete [] m_col;
- for (unsigned i = 0; i < m_itabs; i++)
+ for (uint i = 0; i < m_itabs; i++)
delete m_itab[i];
delete [] m_itab;
}
void
-Tab::coladd(unsigned k, Col* colptr)
+Tab::coladd(uint k, Col* colptr)
{
assert(k == colptr->m_num && k < m_cols && m_col[k] == 0);
m_col[k] = colptr;
+ if (colptr->m_pk)
+ m_pkmask |= (1 << k);
}
void
-Tab::itabadd(unsigned j, ITab* itabptr)
+Tab::itabadd(uint j, ITab* itabptr)
{
assert(j < m_itabs && m_itab[j] == 0 && itabptr != 0);
m_itab[j] = itabptr;
@@ -956,11 +928,11 @@ static NdbOut&
operator<<(NdbOut& out, const Tab& tab)
{
out << "tab " << tab.m_name << " cols=" << tab.m_cols;
- for (unsigned k = 0; k < tab.m_cols; k++) {
+ for (uint k = 0; k < tab.m_cols; k++) {
const Col& col = *tab.m_col[k];
out << endl << col;
}
- for (unsigned i = 0; i < tab.m_itabs; i++) {
+ for (uint i = 0; i < tab.m_itabs; i++) {
if (tab.m_itab[i] == 0)
continue;
const ITab& itab = *tab.m_itab[i];
@@ -972,20 +944,20 @@ operator<<(NdbOut& out, const Tab& tab)
// make table structs
static const Tab** tablist = 0;
-static unsigned tabcount = 0;
+static uint tabcount = 0;
static void
verifytables()
{
- for (unsigned j = 0; j < tabcount; j++) {
+ for (uint j = 0; j < tabcount; j++) {
const Tab* t = tablist[j];
if (t == 0)
continue;
assert(t->m_cols != 0 && t->m_col != 0);
- for (unsigned k = 0; k < t->m_cols; k++) {
+ for (uint k = 0; k < t->m_cols; k++) {
const Col* c = t->m_col[k];
assert(c != 0 && c->m_num == k);
- assert(! (c->m_pk && c->m_nullable));
+ assert(!(c->m_pk && c->m_nullable));
}
assert(t->m_col[t->m_cols] == 0);
{
@@ -994,16 +966,16 @@ verifytables()
assert(c->m_pk && c->m_type == Col::Unsigned);
}
assert(t->m_itabs != 0 && t->m_itab != 0);
- for (unsigned i = 0; i < t->m_itabs; i++) {
+ for (uint i = 0; i < t->m_itabs; i++) {
const ITab* x = t->m_itab[i];
if (x == 0)
continue;
assert(x != 0 && x->m_icols != 0 && x->m_icol != 0);
- for (unsigned k = 0; k < x->m_icols; k++) {
+ for (uint k = 0; k < x->m_icols; k++) {
const ICol* c = x->m_icol[k];
assert(c != 0 && c->m_num == k && c->m_col.m_num < t->m_cols);
if (x->m_type == ITab::UniqueHashIndex) {
- assert(! c->m_col.m_nullable);
+ assert(!c->m_col.m_nullable);
}
}
}
@@ -1019,11 +991,11 @@ makebuiltintables(Par par)
tabcount = 3;
if (tablist == 0) {
tablist = new const Tab* [tabcount];
- for (unsigned j = 0; j < tabcount; j++) {
+ for (uint j = 0; j < tabcount; j++) {
tablist[j] = 0;
}
} else {
- for (unsigned j = 0; j < tabcount; j++) {
+ for (uint j = 0; j < tabcount; j++) {
delete tablist[j];
tablist[j] = 0;
}
@@ -1202,7 +1174,8 @@ static Ndb_cluster_connection* g_ncc = 0;
struct Con {
Ndb* m_ndb;
NdbDictionary::Dictionary* m_dic;
- NdbConnection* m_tx;
+ NdbTransaction* m_tx;
+ Uint64 m_txid;
NdbOperation* m_op;
NdbIndexOperation* m_indexop;
NdbScanOperation* m_scanop;
@@ -1210,10 +1183,16 @@ struct Con {
NdbScanFilter* m_scanfilter;
enum ScanMode { ScanNo = 0, Committed, Latest, Exclusive };
ScanMode m_scanmode;
- enum ErrType { ErrNone = 0, ErrDeadlock, ErrNospace, ErrOther };
+ enum ErrType {
+ ErrNone = 0,
+ ErrDeadlock = 1,
+ ErrNospace = 2,
+ ErrOther = 4
+ };
ErrType m_errtype;
+ char m_errname[100];
Con() :
- m_ndb(0), m_dic(0), m_tx(0), m_op(0), m_indexop(0),
+ m_ndb(0), m_dic(0), m_tx(0), m_txid(0), m_op(0), m_indexop(0),
m_scanop(0), m_indexscanop(0), m_scanfilter(0),
m_scanmode(ScanNo), m_errtype(ErrNone) {}
~Con() {
@@ -1237,18 +1216,20 @@ struct Con {
int setBound(int num, int type, const void* value);
int beginFilter(int group);
int endFilter();
- int setFilter(int num, int cond, const void* value, unsigned len);
- int execute(ExecType t);
- int execute(ExecType t, bool& deadlock, bool& nospace);
+ int setFilter(int num, int cond, const void* value, uint len);
+ int execute(ExecType et);
+ int execute(ExecType et, uint& err);
+ int readTuple(Par par);
int readTuples(Par par);
int readIndexTuples(Par par);
int executeScan();
int nextScanResult(bool fetchAllowed);
- int nextScanResult(bool fetchAllowed, bool& deadlock);
+ int nextScanResult(bool fetchAllowed, uint& err);
int updateScanTuple(Con& con2);
int deleteScanTuple(Con& con2);
void closeScan();
void closeTransaction();
+ const char* errname(uint err);
void printerror(NdbOut& out);
};
@@ -1259,7 +1240,7 @@ Con::connect()
m_ndb = new Ndb(g_ncc, "TEST_DB");
CHKCON(m_ndb->init() == 0, *this);
CHKCON(m_ndb->waitUntilReady(30) == 0, *this);
- m_tx = 0, m_op = 0;
+ m_tx = 0, m_txid = 0, m_op = 0;
return 0;
}
@@ -1274,7 +1255,7 @@ void
Con::disconnect()
{
delete m_ndb;
- m_ndb = 0, m_dic = 0, m_tx = 0, m_op = 0;
+ m_ndb = 0, m_dic = 0, m_tx = 0, m_txid = 0, m_op = 0;
}
int
@@ -1284,6 +1265,7 @@ Con::startTransaction()
if (m_tx != 0)
closeTransaction();
CHKCON((m_tx = m_ndb->startTransaction()) != 0, *this);
+ m_txid = m_tx->getTransactionId();
return 0;
}
@@ -1307,7 +1289,7 @@ int
Con::getNdbIndexOperation(const ITab& itab, const Tab& tab)
{
assert(m_tx != 0);
- unsigned tries = 0;
+ uint tries = 0;
while (1) {
if (getNdbIndexOperation1(itab, tab) == 0)
break;
@@ -1337,7 +1319,7 @@ int
Con::getNdbIndexScanOperation(const ITab& itab, const Tab& tab)
{
assert(m_tx != 0);
- unsigned tries = 0;
+ uint tries = 0;
while (1) {
if (getNdbIndexScanOperation1(itab, tab) == 0)
break;
@@ -1405,7 +1387,7 @@ Con::endFilter()
}
int
-Con::setFilter(int num, int cond, const void* value, unsigned len)
+Con::setFilter(int num, int cond, const void* value, uint len)
{
assert(m_tx != 0 && m_scanfilter != 0);
CHKCON(m_scanfilter->cmp((NdbScanFilter::BinaryCondition)cond, num, value, len) == 0, *this);
@@ -1413,34 +1395,46 @@ Con::setFilter(int num, int cond, const void* value, unsigned len)
}
int
-Con::execute(ExecType t)
+Con::execute(ExecType et)
{
assert(m_tx != 0);
- CHKCON(m_tx->execute(t) == 0, *this);
+ CHKCON(m_tx->execute(et) == 0, *this);
return 0;
}
int
-Con::execute(ExecType t, bool& deadlock, bool& nospace)
+Con::execute(ExecType et, uint& err)
{
- int ret = execute(t);
- if (ret != 0 && deadlock && m_errtype == ErrDeadlock) {
- LL3("caught deadlock");
- ret = 0;
- } else {
- deadlock = false;
- }
- if (ret != 0 && nospace && m_errtype == ErrNospace) {
- LL3("caught nospace");
- ret = 0;
- } else {
- nospace = false;
+ int ret = execute(et);
+ // err in: errors to catch, out: error caught
+ const uint errin = err;
+ err = 0;
+ if (ret == -1) {
+ if (m_errtype == ErrDeadlock && (errin & ErrDeadlock)) {
+ LL3("caught deadlock");
+ err = ErrDeadlock;
+ ret = 0;
+ }
+ if (m_errtype == ErrNospace && (errin & ErrNospace)) {
+ LL3("caught nospace");
+ err = ErrNospace;
+ ret = 0;
+ }
}
CHK(ret == 0);
return 0;
}
int
+Con::readTuple(Par par)
+{
+ assert(m_tx != 0 && m_op != 0);
+ NdbOperation::LockMode lm = par.m_lockmode;
+ CHKCON(m_op->readTuple(lm) == 0, *this);
+ return 0;
+}
+
+int
Con::readTuples(Par par)
{
assert(m_tx != 0 && m_scanop != 0);
@@ -1477,23 +1471,25 @@ Con::nextScanResult(bool fetchAllowed)
int ret;
assert(m_scanop != 0);
CHKCON((ret = m_scanop->nextResult(fetchAllowed)) != -1, *this);
- assert(ret == 0 || ret == 1 || (! fetchAllowed && ret == 2));
+ assert(ret == 0 || ret == 1 || (!fetchAllowed && ret == 2));
return ret;
}
int
-Con::nextScanResult(bool fetchAllowed, bool& deadlock)
+Con::nextScanResult(bool fetchAllowed, uint& err)
{
int ret = nextScanResult(fetchAllowed);
+ // err in: errors to catch, out: error caught
+ const uint errin = err;
+ err = 0;
if (ret == -1) {
- if (deadlock && m_errtype == ErrDeadlock) {
+ if (m_errtype == ErrDeadlock && (errin & ErrDeadlock)) {
LL3("caught deadlock");
+ err = ErrDeadlock;
ret = 0;
}
- } else {
- deadlock = false;
}
- CHK(ret == 0 || ret == 1 || (! fetchAllowed && ret == 2));
+ CHK(ret == 0 || ret == 1 || (!fetchAllowed && ret == 2));
return ret;
}
@@ -1502,6 +1498,7 @@ Con::updateScanTuple(Con& con2)
{
assert(con2.m_tx != 0);
CHKCON((con2.m_op = m_scanop->updateCurrentTuple(con2.m_tx)) != 0, *this);
+ con2.m_txid = m_txid; // in the kernel
return 0;
}
@@ -1510,6 +1507,7 @@ Con::deleteScanTuple(Con& con2)
{
assert(con2.m_tx != 0);
CHKCON(m_scanop->deleteCurrentTuple(con2.m_tx) == 0, *this);
+ con2.m_txid = m_txid; // in the kernel
return 0;
}
@@ -1527,15 +1525,26 @@ Con::closeTransaction()
{
assert(m_ndb != 0 && m_tx != 0);
m_ndb->closeTransaction(m_tx);
- m_tx = 0, m_op = 0;
+ m_tx = 0, m_txid = 0, m_op = 0;
m_scanop = 0, m_indexscanop = 0;
}
+const char*
+Con::errname(uint err)
+{
+ sprintf(m_errname, "0x%x", err);
+ if (err & ErrDeadlock)
+ strcat(m_errname, ",deadlock");
+ if (err & ErrNospace)
+ strcat(m_errname, ",nospace");
+ return m_errname;
+}
+
void
Con::printerror(NdbOut& out)
{
m_errtype = ErrOther;
- unsigned any = 0;
+ uint any = 0;
int code;
int die = 0;
if (m_ndb) {
@@ -1563,7 +1572,7 @@ Con::printerror(NdbOut& out)
}
}
}
- if (! any) {
+ if (!any) {
LL0("failed but no NDB error code");
}
if (die) {
@@ -1589,7 +1598,7 @@ invalidateindex(Par par)
{
Con& con = par.con();
const Tab& tab = par.tab();
- for (unsigned i = 0; i < tab.m_itabs; i++) {
+ for (uint i = 0; i < tab.m_itabs; i++) {
if (tab.m_itab[i] == 0)
continue;
const ITab& itab = *tab.m_itab[i];
@@ -1639,7 +1648,7 @@ createtable(Par par)
if (par.m_nologging) {
t.setLogging(false);
}
- for (unsigned k = 0; k < tab.m_cols; k++) {
+ for (uint k = 0; k < tab.m_cols; k++) {
const Col& col = *tab.m_col[k];
NdbDictionary::Column c(col.m_name);
c.setType((NdbDictionary::Column::Type)col.m_type);
@@ -1677,7 +1686,7 @@ static int
dropindex(Par par)
{
const Tab& tab = par.tab();
- for (unsigned i = 0; i < tab.m_itabs; i++) {
+ for (uint i = 0; i < tab.m_itabs; i++) {
if (tab.m_itab[i] == 0)
continue;
const ITab& itab = *tab.m_itab[i];
@@ -1699,7 +1708,7 @@ createindex(Par par, const ITab& itab)
if (par.m_nologging || itab.m_type == ITab::OrderedIndex) {
x.setLogging(false);
}
- for (unsigned k = 0; k < itab.m_icols; k++) {
+ for (uint k = 0; k < itab.m_icols; k++) {
const ICol& icol = *itab.m_icol[k];
const Col& col = icol.m_col;
x.addColumnName(col.m_name);
@@ -1714,7 +1723,7 @@ static int
createindex(Par par)
{
const Tab& tab = par.tab();
- for (unsigned i = 0; i < tab.m_itabs; i++) {
+ for (uint i = 0; i < tab.m_itabs; i++) {
if (tab.m_itab[i] == 0)
continue;
const ITab& itab = *tab.m_itab[i];
@@ -1731,27 +1740,29 @@ struct Val {
const Col& m_col;
union {
Uint32 m_uint32;
- unsigned char* m_char;
- unsigned char* m_varchar;
- unsigned char* m_longvarchar;
+ uchar* m_char;
+ uchar* m_varchar;
+ uchar* m_longvarchar;
};
+ bool m_null;
+ // construct
Val(const Col& col);
~Val();
void copy(const Val& val2);
void copy(const void* addr);
const void* dataaddr() const;
- bool m_null;
- int equal(Par par) const;
- int equal(Par par, const ICol& icol) const;
- int setval(Par par) const;
- void calc(Par par, unsigned i);
- void calckey(Par par, unsigned i);
- void calckeychars(Par par, unsigned i, unsigned& n, unsigned char* buf);
+ void calc(Par par, uint i);
+ void calckey(Par par, uint i);
+ void calckeychars(Par par, uint i, uint& n, uchar* buf);
void calcnokey(Par par);
- void calcnokeychars(Par par, unsigned& n, unsigned char* buf);
- int verify(Par par, const Val& val2) const;
+ void calcnokeychars(Par par, uint& n, uchar* buf);
+ // operations
+ int setval(Par par) const;
+ int setval(Par par, const ICol& icol) const;
+ // compare
int cmp(Par par, const Val& val2) const;
- int cmpchars(Par par, const unsigned char* buf1, unsigned len1, const unsigned char* buf2, unsigned len2) const;
+ int cmpchars(Par par, const uchar* buf1, uint len1, const uchar* buf2, uint len2) const;
+ int verify(Par par, const Val& val2) const;
private:
Val& operator=(const Val& val2);
};
@@ -1759,20 +1770,26 @@ private:
static NdbOut&
operator<<(NdbOut& out, const Val& val);
+// construct
+
Val::Val(const Col& col) :
m_col(col)
{
switch (col.m_type) {
case Col::Unsigned:
+ m_uint32 = 0x7e7e7e7e;
break;
case Col::Char:
- m_char = new unsigned char [col.m_bytelength];
+ m_char = new uchar [col.m_bytelength];
+ memset(m_char, 0x7e, col.m_bytelength);
break;
case Col::Varchar:
- m_varchar = new unsigned char [1 + col.m_bytelength];
+ m_varchar = new uchar [1 + col.m_bytelength];
+ memset(m_char, 0x7e, 1 + col.m_bytelength);
break;
case Col::Longvarchar:
- m_longvarchar = new unsigned char [2 + col.m_bytelength];
+ m_longvarchar = new uchar [2 + col.m_bytelength];
+ memset(m_char, 0x7e, 2 + col.m_bytelength);
break;
default:
assert(false);
@@ -1858,52 +1875,17 @@ Val::dataaddr() const
return 0;
}
-int
-Val::equal(Par par) const
-{
- Con& con = par.con();
- const Col& col = m_col;
- assert(col.m_pk && ! m_null);
- const char* addr = (const char*)dataaddr();
- LL5("equal [" << col << "] " << *this);
- CHK(con.equal(col.m_num, addr) == 0);
- return 0;
-}
-
-int
-Val::equal(Par par, const ICol& icol) const
-{
- Con& con = par.con();
- assert(! m_null);
- const char* addr = (const char*)dataaddr();
- LL5("equal [" << icol << "] " << *this);
- CHK(con.equal(icol.m_num, addr) == 0);
- return 0;
-}
-
-int
-Val::setval(Par par) const
-{
- Con& con = par.con();
- const Col& col = m_col;
- assert(! col.m_pk);
- const char* addr = ! m_null ? (const char*)dataaddr() : 0;
- LL5("setval [" << col << "] " << *this);
- CHK(con.setValue(col.m_num, addr) == 0);
- return 0;
-}
-
void
-Val::calc(Par par, unsigned i)
+Val::calc(Par par, uint i)
{
const Col& col = m_col;
col.m_pk ? calckey(par, i) : calcnokey(par);
- if (! m_null)
+ if (!m_null)
col.wellformed(dataaddr());
}
void
-Val::calckey(Par par, unsigned i)
+Val::calckey(Par par, uint i)
{
const Col& col = m_col;
m_null = false;
@@ -1915,7 +1897,7 @@ Val::calckey(Par par, unsigned i)
{
const Chs* chs = col.m_chs;
CHARSET_INFO* cs = chs->m_cs;
- unsigned n = 0;
+ uint n = 0;
calckeychars(par, i, n, m_char);
// extend by appropriate space
(*cs->cset->fill)(cs, (char*)&m_char[n], col.m_bytelength - n, 0x20);
@@ -1923,7 +1905,7 @@ Val::calckey(Par par, unsigned i)
break;
case Col::Varchar:
{
- unsigned n = 0;
+ uint n = 0;
calckeychars(par, i, n, m_varchar + 1);
// set length and pad with nulls
m_varchar[0] = n;
@@ -1932,7 +1914,7 @@ Val::calckey(Par par, unsigned i)
break;
case Col::Longvarchar:
{
- unsigned n = 0;
+ uint n = 0;
calckeychars(par, i, n, m_longvarchar + 2);
// set length and pad with nulls
m_longvarchar[0] = (n & 0xff);
@@ -1947,13 +1929,13 @@ Val::calckey(Par par, unsigned i)
}
void
-Val::calckeychars(Par par, unsigned i, unsigned& n, unsigned char* buf)
+Val::calckeychars(Par par, uint i, uint& n, uchar* buf)
{
const Col& col = m_col;
const Chs* chs = col.m_chs;
CHARSET_INFO* cs = chs->m_cs;
n = 0;
- unsigned len = 0;
+ uint len = 0;
while (len < col.m_length) {
if (i % (1 + n) == 0) {
break;
@@ -1980,7 +1962,7 @@ Val::calcnokey(Par par)
if (r < 0 && par.m_bdir > 0 || r > 0 && par.m_bdir < 0)
r = -r;
}
- unsigned v = par.m_range + r;
+ uint v = par.m_range + r;
switch (col.m_type) {
case Col::Unsigned:
m_uint32 = v;
@@ -1989,7 +1971,7 @@ Val::calcnokey(Par par)
{
const Chs* chs = col.m_chs;
CHARSET_INFO* cs = chs->m_cs;
- unsigned n = 0;
+ uint n = 0;
calcnokeychars(par, n, m_char);
// extend by appropriate space
(*cs->cset->fill)(cs, (char*)&m_char[n], col.m_bytelength - n, 0x20);
@@ -1997,7 +1979,7 @@ Val::calcnokey(Par par)
break;
case Col::Varchar:
{
- unsigned n = 0;
+ uint n = 0;
calcnokeychars(par, n, m_varchar + 1);
// set length and pad with nulls
m_varchar[0] = n;
@@ -2006,7 +1988,7 @@ Val::calcnokey(Par par)
break;
case Col::Longvarchar:
{
- unsigned n = 0;
+ uint n = 0;
calcnokeychars(par, n, m_longvarchar + 2);
// set length and pad with nulls
m_longvarchar[0] = (n & 0xff);
@@ -2021,24 +2003,24 @@ Val::calcnokey(Par par)
}
void
-Val::calcnokeychars(Par par, unsigned& n, unsigned char* buf)
+Val::calcnokeychars(Par par, uint& n, uchar* buf)
{
const Col& col = m_col;
const Chs* chs = col.m_chs;
CHARSET_INFO* cs = chs->m_cs;
n = 0;
- unsigned len = 0;
+ uint len = 0;
while (len < col.m_length) {
if (urandom(1 + col.m_bytelength) == 0) {
break;
}
- unsigned half = maxcharcount / 2;
+ uint half = maxcharcount / 2;
int r = irandom((par.m_pctrange * half) / 100);
if (par.m_bdir != 0 && urandom(10) != 0) {
if (r < 0 && par.m_bdir > 0 || r > 0 && par.m_bdir < 0)
r = -r;
}
- unsigned i = half + r;
+ uint i = half + r;
assert(i < maxcharcount);
const Chr& chr = chs->m_chr[i];
assert(n + chr.m_size <= col.m_bytelength);
@@ -2048,13 +2030,39 @@ Val::calcnokeychars(Par par, unsigned& n, unsigned char* buf)
}
}
+// operations
+
int
-Val::verify(Par par, const Val& val2) const
+Val::setval(Par par) const
{
- CHK(cmp(par, val2) == 0);
+ Con& con = par.con();
+ const Col& col = m_col;
+ if (col.m_pk) {
+ assert(!m_null);
+ const char* addr = (const char*)dataaddr();
+ LL5("setval pk [" << col << "] " << *this);
+ CHK(con.equal(col.m_num, addr) == 0);
+ } else {
+ const char* addr = !m_null ? (const char*)dataaddr() : 0;
+ LL5("setval non-pk [" << col << "] " << *this);
+ CHK(con.setValue(col.m_num, addr) == 0);
+ }
+ return 0;
+}
+
+int
+Val::setval(Par par, const ICol& icol) const
+{
+ Con& con = par.con();
+ assert(!m_null);
+ const char* addr = (const char*)dataaddr();
+ LL5("setval key [" << icol << "] " << *this);
+ CHK(con.equal(icol.m_num, addr) == 0);
return 0;
}
+// compare
+
int
Val::cmp(Par par, const Val& val2) const
{
@@ -2062,9 +2070,9 @@ Val::cmp(Par par, const Val& val2) const
const Col& col2 = val2.m_col;
assert(col.equal(col2));
if (m_null || val2.m_null) {
- if (! m_null)
+ if (!m_null)
return +1;
- if (! val2.m_null)
+ if (!val2.m_null)
return -1;
return 0;
}
@@ -2084,21 +2092,21 @@ Val::cmp(Par par, const Val& val2) const
break;
case Col::Char:
{
- unsigned len = col.m_bytelength;
+ uint len = col.m_bytelength;
return cmpchars(par, m_char, len, val2.m_char, len);
}
break;
case Col::Varchar:
{
- unsigned len1 = m_varchar[0];
- unsigned len2 = val2.m_varchar[0];
+ uint len1 = m_varchar[0];
+ uint len2 = val2.m_varchar[0];
return cmpchars(par, m_varchar + 1, len1, val2.m_varchar + 1, len2);
}
break;
case Col::Longvarchar:
{
- unsigned len1 = m_longvarchar[0] + (m_longvarchar[1] << 8);
- unsigned len2 = val2.m_longvarchar[0] + (val2.m_longvarchar[1] << 8);
+ uint len1 = m_longvarchar[0] + (m_longvarchar[1] << 8);
+ uint len2 = val2.m_longvarchar[0] + (val2.m_longvarchar[1] << 8);
return cmpchars(par, m_longvarchar + 2, len1, val2.m_longvarchar + 2, len2);
}
break;
@@ -2110,17 +2118,17 @@ Val::cmp(Par par, const Val& val2) const
}
int
-Val::cmpchars(Par par, const unsigned char* buf1, unsigned len1, const unsigned char* buf2, unsigned len2) const
+Val::cmpchars(Par par, const uchar* buf1, uint len1, const uchar* buf2, uint len2) const
{
const Col& col = m_col;
const Chs* chs = col.m_chs;
CHARSET_INFO* cs = chs->m_cs;
int k;
- if (! par.m_collsp) {
- unsigned char x1[maxxmulsize * 8000];
- unsigned char x2[maxxmulsize * 8000];
+ if (!par.m_collsp) {
+ uchar x1[maxxmulsize * 8000];
+ uchar x2[maxxmulsize * 8000];
// make strxfrm pad both to same length
- unsigned len = maxxmulsize * col.m_bytelength;
+ uint len = maxxmulsize * col.m_bytelength;
int n1 = NdbSqlUtil::strnxfrm_bug7284(cs, x1, chs->m_xmul * len, buf1, len1);
int n2 = NdbSqlUtil::strnxfrm_bug7284(cs, x2, chs->m_xmul * len, buf2, len2);
assert(n1 != -1 && n1 == n2);
@@ -2131,8 +2139,17 @@ Val::cmpchars(Par par, const unsigned char* buf1, unsigned len1, const unsigned
return k < 0 ? -1 : k > 0 ? +1 : 0;
}
+int
+Val::verify(Par par, const Val& val2) const
+{
+ CHK(cmp(par, val2) == 0);
+ return 0;
+}
+
+// print
+
static void
-printstring(NdbOut& out, const unsigned char* str, unsigned len, bool showlen)
+printstring(NdbOut& out, const uchar* str, uint len, bool showlen)
{
char buf[4 * 8000];
char *p = buf;
@@ -2141,12 +2158,12 @@ printstring(NdbOut& out, const unsigned char* str, unsigned len, bool showlen)
sprintf(p, "%u:", len);
p += strlen(p);
}
- for (unsigned i = 0; i < len; i++) {
- unsigned char c = str[i];
+ for (uint i = 0; i < len; i++) {
+ uchar c = str[i];
if (c == '\\') {
*p++ = '\\';
*p++ = c;
- } else if (0x20 <= c && c < 0x7e) {
+ } else if (0x20 <= c && c <= 0x7e) {
*p++ = c;
} else {
*p++ = '\\';
@@ -2173,19 +2190,19 @@ operator<<(NdbOut& out, const Val& val)
break;
case Col::Char:
{
- unsigned len = col.m_bytelength;
+ uint len = col.m_bytelength;
printstring(out, val.m_char, len, false);
}
break;
case Col::Varchar:
{
- unsigned len = val.m_varchar[0];
+ uint len = val.m_varchar[0];
printstring(out, val.m_varchar + 1, len, true);
}
break;
case Col::Longvarchar:
{
- unsigned len = val.m_longvarchar[0] + (val.m_longvarchar[1] << 8);
+ uint len = val.m_longvarchar[0] + (val.m_longvarchar[1] << 8);
printstring(out, val.m_longvarchar + 2, len, true);
}
break;
@@ -2202,16 +2219,36 @@ operator<<(NdbOut& out, const Val& val)
struct Row {
const Tab& m_tab;
Val** m_val;
- bool m_exist;
- enum Op { NoOp = 0, ReadOp = 1, InsOp = 2, UpdOp = 4, DelOp = 8, AnyOp = 15 };
- Op m_pending;
- Row* m_dbrow; // copy of db row before update
+ enum St {
+ StUndef = 0,
+ StDefine = 1,
+ StPrepare = 2,
+ StCommit = 3
+ };
+ enum Op {
+ OpNone = 0,
+ OpIns = 2,
+ OpUpd = 4,
+ OpDel = 8,
+ OpRead = 16,
+ OpReadEx = 32,
+ OpReadCom = 64,
+ OpDML = 2 | 4 | 8,
+ OpREAD = 16 | 32 | 64
+ };
+ St m_st;
+ Op m_op;
+ Uint64 m_txid;
+ Row* m_bi;
+ // construct
Row(const Tab& tab);
~Row();
- void copy(const Row& row2);
- void calc(Par par, unsigned i, unsigned mask = 0);
- const Row& dbrow() const;
- int verify(Par par, const Row& row2, bool pkonly) const;
+ void copy(const Row& row2, bool copy_bi);
+ void copyval(const Row& row2, uint colmask = ~0);
+ void calc(Par par, uint i, uint colmask = ~0);
+ // operations
+ int setval(Par par, uint colmask = ~0);
+ int setval(Par par, const ITab& itab);
int insrow(Par par);
int updrow(Par par);
int updrow(Par par, const ITab& itab);
@@ -2220,120 +2257,132 @@ struct Row {
int selrow(Par par);
int selrow(Par par, const ITab& itab);
int setrow(Par par);
+ // compare
int cmp(Par par, const Row& row2) const;
int cmp(Par par, const Row& row2, const ITab& itab) const;
+ int verify(Par par, const Row& row2, bool pkonly) const;
private:
Row& operator=(const Row& row2);
};
+static NdbOut&
+operator<<(NdbOut& out, const Row* rowp);
+
+static NdbOut&
+operator<<(NdbOut& out, const Row& row);
+
+// construct
+
Row::Row(const Tab& tab) :
m_tab(tab)
{
m_val = new Val* [tab.m_cols];
- for (unsigned k = 0; k < tab.m_cols; k++) {
+ for (uint k = 0; k < tab.m_cols; k++) {
const Col& col = *tab.m_col[k];
m_val[k] = new Val(col);
}
- m_exist = false;
- m_pending = NoOp;
- m_dbrow = 0;
+ m_st = StUndef;
+ m_op = OpNone;
+ m_txid = 0;
+ m_bi = 0;
}
Row::~Row()
{
const Tab& tab = m_tab;
- for (unsigned k = 0; k < tab.m_cols; k++) {
+ for (uint k = 0; k < tab.m_cols; k++) {
delete m_val[k];
}
delete [] m_val;
- delete m_dbrow;
+ delete m_bi;
}
void
-Row::copy(const Row& row2)
+Row::copy(const Row& row2, bool copy_bi)
+{
+ const Tab& tab = m_tab;
+ copyval(row2);
+ m_st = row2.m_st;
+ m_op = row2.m_op;
+ m_txid = row2.m_txid;
+ assert(m_bi == 0);
+ if (copy_bi && row2.m_bi != 0) {
+ m_bi = new Row(tab);
+ m_bi->copy(*row2.m_bi, copy_bi);
+ }
+}
+
+void
+Row::copyval(const Row& row2, uint colmask)
{
const Tab& tab = m_tab;
assert(&tab == &row2.m_tab);
- for (unsigned k = 0; k < tab.m_cols; k++) {
+ for (uint k = 0; k < tab.m_cols; k++) {
Val& val = *m_val[k];
const Val& val2 = *row2.m_val[k];
- val.copy(val2);
- }
- m_exist = row2.m_exist;
- m_pending = row2.m_pending;
- if (row2.m_dbrow == 0) {
- m_dbrow = 0;
- } else {
- assert(row2.m_dbrow->m_dbrow == 0);
- if (m_dbrow == 0)
- m_dbrow = new Row(tab);
- m_dbrow->copy(*row2.m_dbrow);
+ if ((1 << k) & colmask)
+ val.copy(val2);
}
}
void
-Row::calc(Par par, unsigned i, unsigned mask)
+Row::calc(Par par, uint i, uint colmask)
{
const Tab& tab = m_tab;
- for (unsigned k = 0; k < tab.m_cols; k++) {
- if (! (mask & (1 << k))) {
+ for (uint k = 0; k < tab.m_cols; k++) {
+ if ((1 << k) & colmask) {
Val& val = *m_val[k];
val.calc(par, i);
}
}
}
-const Row&
-Row::dbrow() const
-{
- if (m_dbrow == 0)
- return *this;
- assert(m_pending == Row::UpdOp || m_pending == Row::DelOp);
- return *m_dbrow;
-}
+// operations
int
-Row::verify(Par par, const Row& row2, bool pkonly) const
+Row::setval(Par par, uint colmask)
{
const Tab& tab = m_tab;
- const Row& row1 = *this;
- assert(&row1.m_tab == &row2.m_tab && row1.m_exist && row2.m_exist);
- for (unsigned k = 0; k < tab.m_cols; k++) {
- const Col& col = row1.m_val[k]->m_col;
- if (! pkonly || col.m_pk) {
- const Val& val1 = *row1.m_val[k];
- const Val& val2 = *row2.m_val[k];
- CHK(val1.verify(par, val2) == 0);
+ Rsq rsq(tab.m_cols);
+ for (uint k = 0; k < tab.m_cols; k++) {
+ uint k2 = rsq.next();
+ if ((1 << k2) & colmask) {
+ const Val& val = *m_val[k2];
+ CHK(val.setval(par) == 0);
}
}
return 0;
}
int
+Row::setval(Par par, const ITab& itab)
+{
+ Con& con = par.con();
+ Rsq rsq(itab.m_icols);
+ for (uint k = 0; k < itab.m_icols; k++) {
+ uint k2 = rsq.next();
+ const ICol& icol = *itab.m_icol[k2];
+ const Col& col = icol.m_col;
+ uint m = col.m_num;
+ const Val& val = *m_val[m];
+ CHK(val.setval(par, icol) == 0);
+ }
+ return 0;
+}
+
+int
Row::insrow(Par par)
{
Con& con = par.con();
const Tab& tab = m_tab;
- assert(! m_exist);
CHK(con.getNdbOperation(tab) == 0);
CHKCON(con.m_op->insertTuple() == 0, con);
- Rsq rsq1(tab.m_cols);
- for (unsigned k = 0; k < tab.m_cols; k++) {
- unsigned k2 = rsq1.next();
- const Val& val = *m_val[k2];
- const Col& col = val.m_col;
- if (col.m_pk)
- CHK(val.equal(par) == 0);
- }
- Rsq rsq2(tab.m_cols);
- for (unsigned k = 0; k < tab.m_cols; k++) {
- unsigned k2 = rsq2.next();
- const Val& val = *m_val[k2];
- const Col& col = val.m_col;
- if (! col.m_pk)
- CHK(val.setval(par) == 0);
- }
- m_pending = InsOp;
+ CHK(setval(par, tab.m_pkmask) == 0);
+ CHK(setval(par, ~tab.m_pkmask) == 0);
+ assert(m_st == StUndef);
+ m_st = StDefine;
+ m_op = OpIns;
+ m_txid = con.m_txid;
return 0;
}
@@ -2342,26 +2391,14 @@ Row::updrow(Par par)
{
Con& con = par.con();
const Tab& tab = m_tab;
- assert(m_exist);
CHK(con.getNdbOperation(tab) == 0);
CHKCON(con.m_op->updateTuple() == 0, con);
- Rsq rsq1(tab.m_cols);
- for (unsigned k = 0; k < tab.m_cols; k++) {
- unsigned k2 = rsq1.next();
- const Val& val = *m_val[k2];
- const Col& col = val.m_col;
- if (col.m_pk)
- CHK(val.equal(par) == 0);
- }
- Rsq rsq2(tab.m_cols);
- for (unsigned k = 0; k < tab.m_cols; k++) {
- unsigned k2 = rsq2.next();
- const Val& val = *m_val[k2];
- const Col& col = val.m_col;
- if (! col.m_pk)
- CHK(val.setval(par) == 0);
- }
- m_pending = UpdOp;
+ CHK(setval(par, tab.m_pkmask) == 0);
+ CHK(setval(par, ~tab.m_pkmask) == 0);
+ assert(m_st == StUndef);
+ m_st = StDefine;
+ m_op = OpUpd;
+ m_txid = con.m_txid;
return 0;
}
@@ -2371,27 +2408,14 @@ Row::updrow(Par par, const ITab& itab)
Con& con = par.con();
const Tab& tab = m_tab;
assert(itab.m_type == ITab::UniqueHashIndex && &itab.m_tab == &tab);
- assert(m_exist);
CHK(con.getNdbIndexOperation(itab, tab) == 0);
CHKCON(con.m_op->updateTuple() == 0, con);
- Rsq rsq1(itab.m_icols);
- for (unsigned k = 0; k < itab.m_icols; k++) {
- unsigned k2 = rsq1.next();
- const ICol& icol = *itab.m_icol[k2];
- const Col& col = icol.m_col;
- unsigned m = col.m_num;
- const Val& val = *m_val[m];
- CHK(val.equal(par, icol) == 0);
- }
- Rsq rsq2(tab.m_cols);
- for (unsigned k = 0; k < tab.m_cols; k++) {
- unsigned k2 = rsq2.next();
- const Val& val = *m_val[k2];
- const Col& col = val.m_col;
- if (! col.m_pk)
- CHK(val.setval(par) == 0);
- }
- m_pending = UpdOp;
+ CHK(setval(par, itab) == 0);
+ CHK(setval(par, ~tab.m_pkmask) == 0);
+ assert(m_st == StUndef);
+ m_st = StDefine;
+ m_op = OpUpd;
+ m_txid = con.m_txid;
return 0;
}
@@ -2400,18 +2424,13 @@ Row::delrow(Par par)
{
Con& con = par.con();
const Tab& tab = m_tab;
- assert(m_exist);
CHK(con.getNdbOperation(m_tab) == 0);
CHKCON(con.m_op->deleteTuple() == 0, con);
- Rsq rsq1(tab.m_cols);
- for (unsigned k = 0; k < tab.m_cols; k++) {
- unsigned k2 = rsq1.next();
- const Val& val = *m_val[k2];
- const Col& col = val.m_col;
- if (col.m_pk)
- CHK(val.equal(par) == 0);
- }
- m_pending = DelOp;
+ CHK(setval(par, tab.m_pkmask) == 0);
+ assert(m_st == StUndef);
+ m_st = StDefine;
+ m_op = OpDel;
+ m_txid = con.m_txid;
return 0;
}
@@ -2421,19 +2440,13 @@ Row::delrow(Par par, const ITab& itab)
Con& con = par.con();
const Tab& tab = m_tab;
assert(itab.m_type == ITab::UniqueHashIndex && &itab.m_tab == &tab);
- assert(m_exist);
CHK(con.getNdbIndexOperation(itab, tab) == 0);
CHKCON(con.m_op->deleteTuple() == 0, con);
- Rsq rsq1(itab.m_icols);
- for (unsigned k = 0; k < itab.m_icols; k++) {
- unsigned k2 = rsq1.next();
- const ICol& icol = *itab.m_icol[k2];
- const Col& col = icol.m_col;
- unsigned m = col.m_num;
- const Val& val = *m_val[m];
- CHK(val.equal(par, icol) == 0);
- }
- m_pending = DelOp;
+ CHK(setval(par, itab) == 0);
+ assert(m_st == StUndef);
+ m_st = StDefine;
+ m_op = OpDel;
+ m_txid = con.m_txid;
return 0;
}
@@ -2443,15 +2456,9 @@ Row::selrow(Par par)
Con& con = par.con();
const Tab& tab = m_tab;
CHK(con.getNdbOperation(m_tab) == 0);
- CHKCON(con.m_op->readTuple() == 0, con);
- Rsq rsq1(tab.m_cols);
- for (unsigned k = 0; k < tab.m_cols; k++) {
- unsigned k2 = rsq1.next();
- const Val& val = *m_val[k2];
- const Col& col = val.m_col;
- if (col.m_pk)
- CHK(val.equal(par) == 0);
- }
+ CHKCON(con.readTuple(par) == 0, con);
+ CHK(setval(par, tab.m_pkmask) == 0);
+ // TODO state
return 0;
}
@@ -2462,16 +2469,9 @@ Row::selrow(Par par, const ITab& itab)
const Tab& tab = m_tab;
assert(itab.m_type == ITab::UniqueHashIndex && &itab.m_tab == &tab);
CHK(con.getNdbIndexOperation(itab, tab) == 0);
- CHKCON(con.m_op->readTuple() == 0, con);
- Rsq rsq1(itab.m_icols);
- for (unsigned k = 0; k < itab.m_icols; k++) {
- unsigned k2 = rsq1.next();
- const ICol& icol = *itab.m_icol[k2];
- const Col& col = icol.m_col;
- unsigned m = col.m_num;
- const Val& val = *m_val[m];
- CHK(val.equal(par, icol) == 0);
- }
+ CHKCON(con.readTuple(par) == 0, con);
+ CHK(setval(par, itab) == 0);
+ // TODO state
return 0;
}
@@ -2480,25 +2480,23 @@ Row::setrow(Par par)
{
Con& con = par.con();
const Tab& tab = m_tab;
- Rsq rsq1(tab.m_cols);
- for (unsigned k = 0; k < tab.m_cols; k++) {
- unsigned k2 = rsq1.next();
- const Val& val = *m_val[k2];
- const Col& col = val.m_col;
- if (! col.m_pk)
- CHK(val.setval(par) == 0);
- }
- m_pending = UpdOp;
+ CHK(setval(par, ~tab.m_pkmask) == 0);
+ assert(m_st == StUndef);
+ m_st = StDefine;
+ m_op = OpUpd;
+ m_txid = con.m_txid;
return 0;
}
+// compare
+
int
Row::cmp(Par par, const Row& row2) const
{
const Tab& tab = m_tab;
assert(&tab == &row2.m_tab);
int c = 0;
- for (unsigned k = 0; k < tab.m_cols; k++) {
+ for (uint k = 0; k < tab.m_cols; k++) {
const Val& val = *m_val[k];
const Val& val2 = *row2.m_val[k];
if ((c = val.cmp(par, val2)) != 0)
@@ -2512,10 +2510,10 @@ Row::cmp(Par par, const Row& row2, const ITab& itab) const
{
const Tab& tab = m_tab;
int c = 0;
- for (unsigned i = 0; i < itab.m_icols; i++) {
+ for (uint i = 0; i < itab.m_icols; i++) {
const ICol& icol = *itab.m_icol[i];
const Col& col = icol.m_col;
- unsigned k = col.m_num;
+ uint k = col.m_num;
assert(k < tab.m_cols);
const Val& val = *m_val[k];
const Val& val2 = *row2.m_val[k];
@@ -2525,19 +2523,70 @@ Row::cmp(Par par, const Row& row2, const ITab& itab) const
return c;
}
+int
+Row::verify(Par par, const Row& row2, bool pkonly) const
+{
+ const Tab& tab = m_tab;
+ const Row& row1 = *this;
+ assert(&row1.m_tab == &row2.m_tab);
+ for (uint k = 0; k < tab.m_cols; k++) {
+ const Col& col = row1.m_val[k]->m_col;
+ if (!pkonly || col.m_pk) {
+ const Val& val1 = *row1.m_val[k];
+ const Val& val2 = *row2.m_val[k];
+ CHK(val1.verify(par, val2) == 0);
+ }
+ }
+ return 0;
+}
+
+// print
+
+static NdbOut&
+operator<<(NdbOut& out, const Row::St st)
+{
+ if (st == Row::StUndef)
+ out << "StUndef";
+ else if (st == Row::StDefine)
+ out << "StDefine";
+ else if (st == Row::StPrepare)
+ out << "StPrepare";
+ else if (st == Row::StCommit)
+ out << "StCommit";
+ else
+ out << "st=" << st;
+ return out;
+}
+
static NdbOut&
operator<<(NdbOut& out, const Row::Op op)
{
- if (op == Row::NoOp)
- out << "NoOp";
- else if (op == Row::InsOp)
- out << "InsOp";
- else if (op == Row::UpdOp)
- out << "UpdOp";
- else if (op == Row::DelOp)
- out << "DelOp";
+ if (op == Row::OpNone)
+ out << "OpNone";
+ else if (op == Row::OpIns)
+ out << "OpIns";
+ else if (op == Row::OpUpd)
+ out << "OpUpd";
+ else if (op == Row::OpDel)
+ out << "OpDel";
+ else if (op == Row::OpRead)
+ out << "OpRead";
+ else if (op == Row::OpReadEx)
+ out << "OpReadEx";
+ else if (op == Row::OpReadCom)
+ out << "OpReadCom";
else
- out << op;
+ out << "op=" << op;
+ return out;
+}
+
+static NdbOut&
+operator<<(NdbOut& out, const Row* rowp)
+{
+ if (rowp == 0)
+ out << "[null]";
+ else
+ out << *rowp;
return out;
}
@@ -2545,26 +2594,18 @@ static NdbOut&
operator<<(NdbOut& out, const Row& row)
{
const Tab& tab = row.m_tab;
- for (unsigned i = 0; i < tab.m_cols; i++) {
+ out << "[";
+ for (uint i = 0; i < tab.m_cols; i++) {
if (i > 0)
out << " ";
out << *row.m_val[i];
}
- out << " exist=" << row.m_exist;
- if (row.m_pending)
- out << " pending=" << row.m_pending;
- if (row.m_dbrow != 0)
- out << " [dbrow=" << *row.m_dbrow << "]";
- return out;
-}
-
-static NdbOut&
-operator<<(NdbOut& out, const Row* rowptr)
-{
- if (rowptr == 0)
- out << "null";
- else
- out << *rowptr;
+ out << " " << row.m_st;
+ out << " " << row.m_op;
+ out << " " << HEX(row.m_txid);
+ if (row.m_bi != 0)
+ out << " " << row.m_bi;
+ out << "]";
return out;
}
@@ -2572,45 +2613,38 @@ operator<<(NdbOut& out, const Row* rowptr)
struct Set {
const Tab& m_tab;
- unsigned m_rows;
+ uint m_rows;
Row** m_row;
- unsigned* m_rowkey; // maps row number (from 0) in scan to tuple key
+ uint* m_rowkey; // maps row number (from 0) in scan to tuple key
Row* m_keyrow;
NdbRecAttr** m_rec;
- Set(const Tab& tab, unsigned rows);
+ // construct
+ Set(const Tab& tab, uint rows);
~Set();
void reset();
- unsigned count() const;
- // old and new values
- bool exist(unsigned i) const;
- void dbsave(unsigned i);
- void calc(Par par, unsigned i, unsigned mask = 0);
- bool pending(unsigned i, unsigned mask) const;
- void notpending(unsigned i, ExecType et = Commit);
- void notpending(const Lst& lst, ExecType et = Commit);
- void dbdiscard(unsigned i);
- void dbdiscard(const Lst& lst);
- const Row& dbrow(unsigned i) const;
+ bool compat(Par par, uint i, const Row::Op op) const;
+ void push(uint i);
+ void copyval(uint i, uint colmask = ~0); // from bi
+ void calc(Par par, uint i, uint colmask = ~0);
+ uint count() const;
+ const Row* getrow(uint i, bool dirty = false) const;
+ // transaction
+ void post(Par par, ExecType et);
// operations
- int insrow(Par par, unsigned i);
- int updrow(Par par, unsigned i);
- int updrow(Par par, const ITab& itab, unsigned i);
- int delrow(Par par, unsigned i);
- int delrow(Par par, const ITab& itab, unsigned i);
+ int insrow(Par par, uint i);
+ int updrow(Par par, uint i);
+ int updrow(Par par, const ITab& itab, uint i);
+ int delrow(Par par, uint i);
+ int delrow(Par par, const ITab& itab, uint i);
int selrow(Par par, const Row& keyrow);
int selrow(Par par, const ITab& itab, const Row& keyrow);
- // set and get
- void setkey(Par par, const Row& keyrow);
- void setkey(Par par, const ITab& itab, const Row& keyrow);
- int setrow(Par par, unsigned i);
+ int setrow(Par par, uint i);
int getval(Par par);
- int getkey(Par par, unsigned* i);
- int putval(unsigned i, bool force, unsigned n = ~0);
- // sort rows in-place according to ordered index
+ int getkey(Par par, uint* i);
+ int putval(uint i, bool force, uint n = ~0);
+ // compare
void sort(Par par, const ITab& itab);
- void sort(Par par, const ITab& itab, unsigned lo, unsigned hi);
- // verify
- int verify(Par par, const Set& set2, bool pkonly) const;
+ int verify(Par par, const Set& set2, bool pkonly, bool dirty = false) const;
int verifyorder(Par par, const ITab& itab, bool descending) const;
// protect structure
NdbMutex* m_mutex;
@@ -2621,26 +2655,27 @@ struct Set {
NdbMutex_Unlock(m_mutex);
}
private:
+ void sort(Par par, const ITab& itab, uint lo, uint hi);
Set& operator=(const Set& set2);
};
-Set::Set(const Tab& tab, unsigned rows) :
+// construct
+
+Set::Set(const Tab& tab, uint rows) :
m_tab(tab)
{
m_rows = rows;
m_row = new Row* [m_rows];
- for (unsigned i = 0; i < m_rows; i++) {
- // allocate on need to save space
+ for (uint i = 0; i < m_rows; i++) {
m_row[i] = 0;
}
- m_rowkey = new unsigned [m_rows];
- for (unsigned n = 0; n < m_rows; n++) {
- // initialize to null
+ m_rowkey = new uint [m_rows];
+ for (uint n = 0; n < m_rows; n++) {
m_rowkey[n] = ~0;
}
m_keyrow = new Row(tab);
m_rec = new NdbRecAttr* [tab.m_cols];
- for (unsigned k = 0; k < tab.m_cols; k++) {
+ for (uint k = 0; k < tab.m_cols; k++) {
m_rec[k] = 0;
}
m_mutex = NdbMutex_Create();
@@ -2649,7 +2684,7 @@ Set::Set(const Tab& tab, unsigned rows) :
Set::~Set()
{
- for (unsigned i = 0; i < m_rows; i++) {
+ for (uint i = 0; i < m_rows; i++) {
delete m_row[i];
}
delete [] m_row;
@@ -2662,132 +2697,203 @@ Set::~Set()
void
Set::reset()
{
- for (unsigned i = 0; i < m_rows; i++) {
- if (m_row[i] != 0) {
- Row& row = *m_row[i];
- row.m_exist = false;
- }
- }
-}
-
-unsigned
-Set::count() const
-{
- unsigned count = 0;
- for (unsigned i = 0; i < m_rows; i++) {
- if (m_row[i] != 0) {
- Row& row = *m_row[i];
- if (row.m_exist)
- count++;
- }
+ for (uint i = 0; i < m_rows; i++) {
+ m_row[i] = 0;
}
- return count;
}
-// old and new values
-
+// this sucks
bool
-Set::exist(unsigned i) const
+Set::compat(Par par, uint i, const Row::Op op) const
{
- assert(i < m_rows);
- if (m_row[i] == 0) // not allocated => not exist
- return false;
- return m_row[i]->m_exist;
+ Con& con = par.con();
+ int ret = -1;
+ int place = 0;
+ do {
+ const Row* rowp = getrow(i);
+ if (rowp == 0) {
+ ret = op == Row::OpIns;
+ place = 1;
+ break;
+ }
+ const Row& row = *rowp;
+ if (!(op & Row::OpREAD)) {
+ if (row.m_st == Row::StDefine || row.m_st == Row::StPrepare) {
+ assert(row.m_op & Row::OpDML);
+ assert(row.m_txid != 0);
+ if (con.m_txid != row.m_txid) {
+ ret = false;
+ place = 2;
+ break;
+ }
+ if (row.m_op != Row::OpDel) {
+ ret = op == Row::OpUpd || op == Row::OpDel;
+ place = 3;
+ break;
+ }
+ ret = op == Row::OpIns;
+ place = 4;
+ break;
+ }
+ if (row.m_st == Row::StCommit) {
+ assert(row.m_op == Row::OpNone);
+ assert(row.m_txid == 0);
+ ret = op == Row::OpUpd || op == Row::OpDel;
+ place = 5;
+ break;
+ }
+ }
+ if (op & Row::OpREAD) {
+ bool dirty =
+ con.m_txid != row.m_txid &&
+ par.m_lockmode == NdbOperation::LM_CommittedRead;
+ const Row* rowp2 = getrow(i, dirty);
+ if (rowp2 == 0 || rowp2->m_op == Row::OpDel) {
+ ret = false;
+ place = 6;
+ break;
+ }
+ ret = true;
+ place = 7;
+ break;
+ }
+ } while (0);
+ LL4("compat ret=" << ret << " place=" << place);
+ assert(ret == false || ret == true);
+ return ret;
}
void
-Set::dbsave(unsigned i)
+Set::push(uint i)
{
const Tab& tab = m_tab;
- assert(i < m_rows && m_row[i] != 0);
+ assert(i < m_rows);
+ Row* bi = m_row[i];
+ m_row[i] = new Row(tab);
Row& row = *m_row[i];
- LL5("dbsave " << i << ": " << row);
- assert(row.m_exist && ! row.m_pending && row.m_dbrow == 0);
- // could swap pointers but making copy is safer
- Row* rowptr = new Row(tab);
- rowptr->copy(row);
- row.m_dbrow = rowptr;
+ row.m_bi = bi;
+ if (bi != 0)
+ row.copyval(*bi);
}
void
-Set::calc(Par par, unsigned i, unsigned mask)
+Set::copyval(uint i, uint colmask)
{
- const Tab& tab = m_tab;
- if (m_row[i] == 0)
- m_row[i] = new Row(tab);
+ assert(m_row[i] != 0);
Row& row = *m_row[i];
- row.calc(par, i, mask);
-}
-
-bool
-Set::pending(unsigned i, unsigned mask) const
-{
- assert(i < m_rows);
- if (m_row[i] == 0) // not allocated => not pending
- return Row::NoOp;
- return m_row[i]->m_pending & mask;
+ assert(row.m_bi != 0);
+ row.copyval(*row.m_bi, colmask);
}
void
-Set::notpending(unsigned i, ExecType et)
+Set::calc(Par par, uint i, uint colmask)
{
assert(m_row[i] != 0);
Row& row = *m_row[i];
- if (et == Commit) {
- if (row.m_pending == Row::InsOp)
- row.m_exist = true;
- if (row.m_pending == Row::DelOp)
- row.m_exist = false;
- } else {
- if (row.m_pending == Row::InsOp)
- row.m_exist = false;
- if (row.m_pending == Row::DelOp)
- row.m_exist = true;
- }
- row.m_pending = Row::NoOp;
+ row.calc(par, i, colmask);
}
-void
-Set::notpending(const Lst& lst, ExecType et)
+uint
+Set::count() const
{
- for (unsigned j = 0; j < lst.m_cnt; j++) {
- unsigned i = lst.m_arr[j];
- notpending(i, et);
+ uint count = 0;
+ for (uint i = 0; i < m_rows; i++) {
+ if (m_row[i] != 0)
+ count++;
}
+ return count;
}
-void
-Set::dbdiscard(unsigned i)
+const Row*
+Set::getrow(uint i, bool dirty) const
{
- assert(m_row[i] != 0);
- Row& row = *m_row[i];
- LL5("dbdiscard " << i << ": " << row);
- assert(row.m_dbrow != 0);
- delete row.m_dbrow;
- row.m_dbrow = 0;
+ assert(i < m_rows);
+ const Row* rowp = m_row[i];
+ if (dirty) {
+ while (rowp != 0) {
+ bool b1 = rowp->m_op == Row::OpNone;
+ bool b2 = rowp->m_st == Row::StCommit;
+ assert(b1 == b2);
+ if (b1) {
+ assert(rowp->m_bi == 0);
+ break;
+ }
+ rowp = rowp->m_bi;
+ }
+ }
+ return rowp;
}
-const Row&
-Set::dbrow(unsigned i) const
-{
- assert(m_row[i] != 0);
- Row& row = *m_row[i];
- return row.dbrow();
-}
+// transaction
void
-Set::dbdiscard(const Lst& lst)
+Set::post(Par par, ExecType et)
{
- for (unsigned j = 0; j < lst.m_cnt; j++) {
- unsigned i = lst.m_arr[j];
- dbdiscard(i);
+ LL4("post");
+ Con& con = par.con();
+ assert(con.m_txid != 0);
+ uint i;
+ for (i = 0; i < m_rows; i++) {
+ Row* rowp = m_row[i];
+ if (rowp == 0) {
+ LL5("skip " << i << " " << rowp);
+ continue;
+ }
+ if (rowp->m_st == Row::StCommit) {
+ assert(rowp->m_op == Row::OpNone);
+ assert(rowp->m_txid == 0);
+ assert(rowp->m_bi == 0);
+ LL5("skip committed " << i << " " << rowp);
+ continue;
+ }
+ assert(rowp->m_st == Row::StDefine || rowp->m_st == Row::StPrepare);
+ assert(rowp->m_txid != 0);
+ if (con.m_txid != rowp->m_txid) {
+ LL5("skip txid " << i << " " << HEX(con.m_txid) << " " << rowp);
+ continue;
+ }
+ // TODO read ops
+ assert(rowp->m_op & Row::OpDML);
+ LL4("post BEFORE " << rowp);
+ if (et == NoCommit) {
+ if (rowp->m_st == Row::StDefine) {
+ rowp->m_st = Row::StPrepare;
+ Row* bi = rowp->m_bi;
+ while (bi != 0 && bi->m_st == Row::StDefine) {
+ bi->m_st = Row::StPrepare;
+ bi = bi->m_bi;
+ }
+ }
+ } else if (et == Commit) {
+ if (rowp->m_op != Row::OpDel) {
+ rowp->m_st = Row::StCommit;
+ rowp->m_op = Row::OpNone;
+ rowp->m_txid = 0;
+ delete rowp->m_bi;
+ rowp->m_bi = 0;
+ } else {
+ delete rowp;
+ rowp = 0;
+ }
+ } else if (et == Rollback) {
+ while (rowp != 0 && rowp->m_st != Row::StCommit) {
+ Row* tmp = rowp;
+ rowp = rowp->m_bi;
+ tmp->m_bi = 0;
+ delete tmp;
+ }
+ } else {
+ assert(false);
+ }
+ m_row[i] = rowp;
+ LL4("post AFTER " << rowp);
}
}
// operations
int
-Set::insrow(Par par, unsigned i)
+Set::insrow(Par par, uint i)
{
assert(m_row[i] != 0);
Row& row = *m_row[i];
@@ -2796,7 +2902,7 @@ Set::insrow(Par par, unsigned i)
}
int
-Set::updrow(Par par, unsigned i)
+Set::updrow(Par par, uint i)
{
assert(m_row[i] != 0);
Row& row = *m_row[i];
@@ -2805,7 +2911,7 @@ Set::updrow(Par par, unsigned i)
}
int
-Set::updrow(Par par, const ITab& itab, unsigned i)
+Set::updrow(Par par, const ITab& itab, uint i)
{
assert(m_row[i] != 0);
Row& row = *m_row[i];
@@ -2814,7 +2920,7 @@ Set::updrow(Par par, const ITab& itab, unsigned i)
}
int
-Set::delrow(Par par, unsigned i)
+Set::delrow(Par par, uint i)
{
assert(m_row[i] != 0);
Row& row = *m_row[i];
@@ -2823,7 +2929,7 @@ Set::delrow(Par par, unsigned i)
}
int
-Set::delrow(Par par, const ITab& itab, unsigned i)
+Set::delrow(Par par, const ITab& itab, uint i)
{
assert(m_row[i] != 0);
Row& row = *m_row[i];
@@ -2836,8 +2942,8 @@ Set::selrow(Par par, const Row& keyrow)
{
Con& con = par.con();
const Tab& tab = par.tab();
- setkey(par, keyrow);
- LL5("selrow " << tab.m_name << ": keyrow: " << keyrow);
+ LL5("selrow " << tab.m_name << " keyrow " << keyrow);
+ m_keyrow->copyval(keyrow, tab.m_pkmask);
CHK(m_keyrow->selrow(par) == 0);
CHK(getval(par) == 0);
return 0;
@@ -2847,45 +2953,15 @@ int
Set::selrow(Par par, const ITab& itab, const Row& keyrow)
{
Con& con = par.con();
- setkey(par, itab, keyrow);
- LL5("selrow " << itab.m_name << ": keyrow: " << keyrow);
+ LL5("selrow " << itab.m_name << " keyrow " << keyrow);
+ m_keyrow->copyval(keyrow, itab.m_keymask);
CHK(m_keyrow->selrow(par, itab) == 0);
CHK(getval(par) == 0);
return 0;
}
-// set and get
-
-void
-Set::setkey(Par par, const Row& keyrow)
-{
- const Tab& tab = m_tab;
- for (unsigned k = 0; k < tab.m_cols; k++) {
- const Col& col = *tab.m_col[k];
- if (col.m_pk) {
- Val& val1 = *m_keyrow->m_val[k];
- const Val& val2 = *keyrow.dbrow().m_val[k];
- val1.copy(val2);
- }
- }
-}
-
-void
-Set::setkey(Par par, const ITab& itab, const Row& keyrow)
-{
- const Tab& tab = m_tab;
- for (unsigned k = 0; k < itab.m_icols; k++) {
- const ICol& icol = *itab.m_icol[k];
- const Col& col = icol.m_col;
- unsigned m = col.m_num;
- Val& val1 = *m_keyrow->m_val[m];
- const Val& val2 = *keyrow.dbrow().m_val[m];
- val1.copy(val2);
- }
-}
-
int
-Set::setrow(Par par, unsigned i)
+Set::setrow(Par par, uint i)
{
Con& con = par.con();
assert(m_row[i] != 0);
@@ -2899,18 +2975,18 @@ Set::getval(Par par)
Con& con = par.con();
const Tab& tab = m_tab;
Rsq rsq1(tab.m_cols);
- for (unsigned k = 0; k < tab.m_cols; k++) {
- unsigned k2 = rsq1.next();
+ for (uint k = 0; k < tab.m_cols; k++) {
+ uint k2 = rsq1.next();
CHK(con.getValue(k2, m_rec[k2]) == 0);
}
return 0;
}
int
-Set::getkey(Par par, unsigned* i)
+Set::getkey(Par par, uint* i)
{
const Tab& tab = m_tab;
- unsigned k = tab.m_keycol;
+ uint k = tab.m_keycol;
assert(m_rec[k] != 0);
const char* aRef = m_rec[k]->aRef();
Uint32 key = *(const Uint32*)aRef;
@@ -2921,14 +2997,18 @@ Set::getkey(Par par, unsigned* i)
}
int
-Set::putval(unsigned i, bool force, unsigned n)
+Set::putval(uint i, bool force, uint n)
{
const Tab& tab = m_tab;
- if (m_row[i] == 0)
- m_row[i] = new Row(tab);
+ LL4("putval key=" << i << " row=" << n << " old=" << m_row[i]);
+ if (m_row[i] != 0) {
+ assert(force);
+ delete m_row[i];
+ m_row[i] = 0;
+ }
+ m_row[i] = new Row(tab);
Row& row = *m_row[i];
- CHK(! row.m_exist || force);
- for (unsigned k = 0; k < tab.m_cols; k++) {
+ for (uint k = 0; k < tab.m_cols; k++) {
Val& val = *row.m_val[k];
NdbRecAttr* rec = m_rec[k];
assert(rec != 0);
@@ -2940,13 +3020,13 @@ Set::putval(unsigned i, bool force, unsigned n)
val.copy(aRef);
val.m_null = false;
}
- if (! row.m_exist)
- row.m_exist = true;
if (n != ~0)
m_rowkey[n] = i;
return 0;
}
+// compare
+
void
Set::sort(Par par, const ITab& itab)
{
@@ -2955,12 +3035,12 @@ Set::sort(Par par, const ITab& itab)
}
void
-Set::sort(Par par, const ITab& itab, unsigned lo, unsigned hi)
+Set::sort(Par par, const ITab& itab, uint lo, uint hi)
{
assert(lo < m_rows && hi < m_rows && lo <= hi);
Row* const p = m_row[lo];
- unsigned i = lo;
- unsigned j = hi;
+ uint i = lo;
+ uint j = hi;
while (i < j) {
while (i < j && m_row[j]->cmp(par, *p, itab) >= 0)
j--;
@@ -2982,22 +3062,48 @@ Set::sort(Par par, const ITab& itab, unsigned lo, unsigned hi)
sort(par, itab, i + 1, hi);
}
+/*
+ * set1 (self) is from dml and can contain un-committed operations.
+ * set2 is from read and contains no operations. "dirty" applies
+ * to set1: false = use latest row, true = use committed row.
+ */
int
-Set::verify(Par par, const Set& set2, bool pkonly) const
-{
- assert(&m_tab == &set2.m_tab && m_rows == set2.m_rows);
- LL4("verify set1 count=" << count() << " vs set2 count=" << set2.count());
- for (unsigned i = 0; i < m_rows; i++) {
+Set::verify(Par par, const Set& set2, bool pkonly, bool dirty) const
+{
+ const Set& set1 = *this;
+ assert(&set1.m_tab == &set2.m_tab && set1.m_rows == set2.m_rows);
+ LL3("verify dirty:" << dirty);
+ for (uint i = 0; i < set1.m_rows; i++) {
+ // the row versions we actually compare
+ const Row* row1p = set1.getrow(i, dirty);
+ const Row* row2p = set2.getrow(i);
bool ok = true;
- if (exist(i) != set2.exist(i)) {
- ok = false;
- } else if (exist(i)) {
- if (dbrow(i).verify(par, set2.dbrow(i), pkonly) != 0)
+ int place = 0;
+ if (row1p == 0) {
+ if (row2p != 0) {
+ ok = false;
+ place = 1;
+ }
+ } else {
+ Row::Op op1 = row1p->m_op;
+ if (op1 != Row::OpDel) {
+ if (row2p == 0) {
+ ok = false;
+ place = 2;
+ } else if (row1p->verify(par, *row2p, pkonly) == -1) {
+ ok = false;
+ place = 3;
+ }
+ } else if (row2p != 0) {
ok = false;
+ place = 4;
+ }
}
- if (! ok) {
- LL1("verify failed: key=" << i << " row1=" << m_row[i] << " row2=" << set2.m_row[i]);
- CHK(0 == 1);
+ if (!ok) {
+ LL1("verify " << i << " failed at " << place);
+ LL1("row1 " << row1p);
+ LL1("row2 " << row2p);
+ CHK(false);
}
}
return 0;
@@ -3007,18 +3113,17 @@ int
Set::verifyorder(Par par, const ITab& itab, bool descending) const
{
const Tab& tab = m_tab;
- for (unsigned n = 0; n < m_rows; n++) {
- unsigned i2 = m_rowkey[n];
+ for (uint n = 0; n < m_rows; n++) {
+ uint i2 = m_rowkey[n];
if (i2 == ~0)
break;
if (n == 0)
continue;
- unsigned i1 = m_rowkey[n - 1];
- assert(i1 < m_rows && i2 < m_rows);
+ uint i1 = m_rowkey[n - 1];
+ assert(m_row[i1] != 0 && m_row[i2] != 0);
const Row& row1 = *m_row[i1];
const Row& row2 = *m_row[i2];
- assert(row1.m_exist && row2.m_exist);
- if (! descending)
+ if (!descending)
CHK(row1.cmp(par, row2, itab) <= 0);
else
CHK(row1.cmp(par, row2, itab) >= 0);
@@ -3026,10 +3131,12 @@ Set::verifyorder(Par par, const ITab& itab, bool descending) const
return 0;
}
+// print
+
static NdbOut&
operator<<(NdbOut& out, const Set& set)
{
- for (unsigned i = 0; i < set.m_rows; i++) {
+ for (uint i = 0; i < set.m_rows; i++) {
const Row& row = *set.m_row[i];
if (i > 0)
out << endl;
@@ -3058,8 +3165,8 @@ int
BVal::setbnd(Par par) const
{
Con& con = par.con();
- assert(g_compare_null || ! m_null);
- const char* addr = ! m_null ? (const char*)dataaddr() : 0;
+ assert(g_compare_null || !m_null);
+ const char* addr = !m_null ? (const char*)dataaddr() : 0;
const ICol& icol = m_icol;
CHK(con.setBound(icol.m_num, m_type, addr) == 0);
return 0;
@@ -3068,7 +3175,7 @@ BVal::setbnd(Par par) const
int
BVal::setflt(Par par) const
{
- static unsigned index_bound_to_filter_bound[5] = {
+ static uint index_bound_to_filter_bound[5] = {
NdbScanFilter::COND_GE,
NdbScanFilter::COND_GT,
NdbScanFilter::COND_LE,
@@ -3076,12 +3183,12 @@ BVal::setflt(Par par) const
NdbScanFilter::COND_EQ
};
Con& con = par.con();
- assert(g_compare_null || ! m_null);
- const char* addr = ! m_null ? (const char*)dataaddr() : 0;
+ assert(g_compare_null || !m_null);
+ const char* addr = !m_null ? (const char*)dataaddr() : 0;
const ICol& icol = m_icol;
const Col& col = icol.m_col;
- unsigned length = col.m_bytesize;
- unsigned cond = index_bound_to_filter_bound[m_type];
+ uint length = col.m_bytesize;
+ uint cond = index_bound_to_filter_bound[m_type];
CHK(con.setFilter(col.m_num, cond, addr, length) == 0);
return 0;
}
@@ -3104,27 +3211,27 @@ operator<<(NdbOut& out, const BVal& bval)
struct BSet {
const Tab& m_tab;
const ITab& m_itab;
- unsigned m_alloc;
- unsigned m_bvals;
+ uint m_alloc;
+ uint m_bvals;
BVal** m_bval;
- BSet(const Tab& tab, const ITab& itab, unsigned rows);
+ BSet(const Tab& tab, const ITab& itab);
~BSet();
void reset();
void calc(Par par);
- void calcpk(Par par, unsigned i);
+ void calcpk(Par par, uint i);
int setbnd(Par par) const;
int setflt(Par par) const;
void filter(Par par, const Set& set, Set& set2) const;
};
-BSet::BSet(const Tab& tab, const ITab& itab, unsigned rows) :
+BSet::BSet(const Tab& tab, const ITab& itab) :
m_tab(tab),
m_itab(itab),
m_alloc(2 * itab.m_icols),
m_bvals(0)
{
m_bval = new BVal* [m_alloc];
- for (unsigned i = 0; i < m_alloc; i++) {
+ for (uint i = 0; i < m_alloc; i++) {
m_bval[i] = 0;
}
}
@@ -3138,7 +3245,7 @@ void
BSet::reset()
{
while (m_bvals > 0) {
- unsigned i = --m_bvals;
+ uint i = --m_bvals;
delete m_bval[i];
m_bval[i] = 0;
}
@@ -3150,10 +3257,10 @@ BSet::calc(Par par)
const ITab& itab = m_itab;
par.m_pctrange = par.m_pctbrange;
reset();
- for (unsigned k = 0; k < itab.m_icols; k++) {
+ for (uint k = 0; k < itab.m_icols; k++) {
const ICol& icol = *itab.m_icol[k];
const Col& col = icol.m_col;
- for (unsigned i = 0; i <= 1; i++) {
+ for (uint i = 0; i <= 1; i++) {
if (m_bvals == 0 && urandom(100) == 0)
return;
if (m_bvals != 0 && urandom(3) == 0)
@@ -3162,7 +3269,7 @@ BSet::calc(Par par)
BVal& bval = *new BVal(icol);
m_bval[m_bvals++] = &bval;
bval.m_null = false;
- unsigned sel;
+ uint sel;
do {
// equality bound only on i==0
sel = urandom(5 - i);
@@ -3175,7 +3282,7 @@ BSet::calc(Par par)
bval.m_type = 4;
if (k + 1 < itab.m_icols)
bval.m_type = 4;
- if (! g_compare_null)
+ if (!g_compare_null)
par.m_pctnull = 0;
if (bval.m_type == 0 || bval.m_type == 1)
par.m_bdir = -1;
@@ -3199,11 +3306,11 @@ BSet::calc(Par par)
}
void
-BSet::calcpk(Par par, unsigned i)
+BSet::calcpk(Par par, uint i)
{
const ITab& itab = m_itab;
reset();
- for (unsigned k = 0; k < itab.m_icols; k++) {
+ for (uint k = 0; k < itab.m_icols; k++) {
const ICol& icol = *itab.m_icol[k];
const Col& col = icol.m_col;
assert(col.m_pk);
@@ -3220,14 +3327,14 @@ BSet::setbnd(Par par) const
{
if (m_bvals != 0) {
Rsq rsq1(m_bvals);
- for (unsigned j = 0; j < m_bvals; j++) {
- unsigned j2 = rsq1.next();
+ for (uint j = 0; j < m_bvals; j++) {
+ uint j2 = rsq1.next();
const BVal& bval = *m_bval[j2];
CHK(bval.setbnd(par) == 0);
}
// duplicate
if (urandom(5) == 0) {
- unsigned j3 = urandom(m_bvals);
+ uint j3 = urandom(m_bvals);
const BVal& bval = *m_bval[j3];
CHK(bval.setbnd(par) == 0);
}
@@ -3243,14 +3350,14 @@ BSet::setflt(Par par) const
CHK(con.beginFilter(NdbScanFilter::AND) == 0);
if (m_bvals != 0) {
Rsq rsq1(m_bvals);
- for (unsigned j = 0; j < m_bvals; j++) {
- unsigned j2 = rsq1.next();
+ for (uint j = 0; j < m_bvals; j++) {
+ uint j2 = rsq1.next();
const BVal& bval = *m_bval[j2];
CHK(bval.setflt(par) == 0);
}
// duplicate
if (urandom(5) == 0) {
- unsigned j3 = urandom(m_bvals);
+ uint j3 = urandom(m_bvals);
const BVal& bval = *m_bval[j3];
CHK(bval.setflt(par) == 0);
}
@@ -3266,57 +3373,59 @@ BSet::filter(Par par, const Set& set, Set& set2) const
const ITab& itab = m_itab;
assert(&tab == &set2.m_tab && set.m_rows == set2.m_rows);
assert(set2.count() == 0);
- for (unsigned i = 0; i < set.m_rows; i++) {
- if (! set.exist(i))
- continue;
+ for (uint i = 0; i < set.m_rows; i++) {
set.lock();
- const Row& row = set.dbrow(i);
- set.unlock();
- if (! g_store_null_key) {
- bool ok1 = false;
- for (unsigned k = 0; k < itab.m_icols; k++) {
- const ICol& icol = *itab.m_icol[k];
+ do {
+ if (set.m_row[i] == 0) {
+ break;
+ }
+ const Row& row = *set.m_row[i];
+ if (!g_store_null_key) {
+ bool ok1 = false;
+ for (uint k = 0; k < itab.m_icols; k++) {
+ const ICol& icol = *itab.m_icol[k];
+ const Col& col = icol.m_col;
+ const Val& val = *row.m_val[col.m_num];
+ if (!val.m_null) {
+ ok1 = true;
+ break;
+ }
+ }
+ if (!ok1)
+ break;
+ }
+ bool ok2 = true;
+ for (uint j = 0; j < m_bvals; j++) {
+ const BVal& bval = *m_bval[j];
+ const ICol& icol = bval.m_icol;
const Col& col = icol.m_col;
const Val& val = *row.m_val[col.m_num];
- if (! val.m_null) {
- ok1 = true;
- break;
+ int ret = bval.cmp(par, val);
+ LL5("cmp: ret=" << ret << " " << bval << " vs " << val);
+ if (bval.m_type == 0)
+ ok2 = (ret <= 0);
+ else if (bval.m_type == 1)
+ ok2 = (ret < 0);
+ else if (bval.m_type == 2)
+ ok2 = (ret >= 0);
+ else if (bval.m_type == 3)
+ ok2 = (ret > 0);
+ else if (bval.m_type == 4)
+ ok2 = (ret == 0);
+ else {
+ assert(false);
}
+ if (!ok2)
+ break;
}
- if (! ok1)
- continue;
- }
- bool ok2 = true;
- for (unsigned j = 0; j < m_bvals; j++) {
- const BVal& bval = *m_bval[j];
- const ICol& icol = bval.m_icol;
- const Col& col = icol.m_col;
- const Val& val = *row.m_val[col.m_num];
- int ret = bval.cmp(par, val);
- LL5("cmp: ret=" << ret << " " << bval << " vs " << val);
- if (bval.m_type == 0)
- ok2 = (ret <= 0);
- else if (bval.m_type == 1)
- ok2 = (ret < 0);
- else if (bval.m_type == 2)
- ok2 = (ret >= 0);
- else if (bval.m_type == 3)
- ok2 = (ret > 0);
- else if (bval.m_type == 4)
- ok2 = (ret == 0);
- else {
- assert(false);
- }
- if (! ok2)
+ if (!ok2)
break;
- }
- if (! ok2)
- continue;
- if (set2.m_row[i] == 0)
+ assert(set2.m_row[i] == 0);
set2.m_row[i] = new Row(tab);
- Row& row2 = *set2.m_row[i];
- assert(! row2.m_exist);
- row2.copy(row);
+ Row& row2 = *set2.m_row[i];
+ row2.copy(row, true);
+ } while (0);
+ set.unlock();
}
}
@@ -3324,7 +3433,7 @@ static NdbOut&
operator<<(NdbOut& out, const BSet& bset)
{
out << "bounds=" << bset.m_bvals;
- for (unsigned j = 0; j < bset.m_bvals; j++) {
+ for (uint j = 0; j < bset.m_bvals; j++) {
const BVal& bval = *bset.m_bval[j];
out << " [bound " << j << ": " << bval << "]";
}
@@ -3341,59 +3450,40 @@ pkinsert(Par par)
Set& set = par.set();
LL3("pkinsert " << tab.m_name);
CHK(con.startTransaction() == 0);
- Lst lst;
- for (unsigned j = 0; j < par.m_rows; j++) {
- unsigned j2 = ! par.m_randomkey ? j : urandom(par.m_rows);
- unsigned i = thrrow(par, j2);
+ uint batch = 0;
+ for (uint j = 0; j < par.m_rows; j++) {
+ uint j2 = !par.m_randomkey ? j : urandom(par.m_rows);
+ uint i = thrrow(par, j2);
set.lock();
- if (set.exist(i) || set.pending(i, Row::AnyOp)) {
+ if (!set.compat(par, i, Row::OpIns)) {
+ LL3("pkinsert SKIP " << i << " " << set.getrow(i));
set.unlock();
- continue;
- }
- set.calc(par, i);
- CHK(set.insrow(par, i) == 0);
- set.unlock();
- LL4("pkinsert " << i << ": " << *set.m_row[i]);
- lst.push(i);
- if (lst.cnt() == par.m_batch) {
- bool deadlock = par.m_deadlock;
- bool nospace = true;
- ExecType et = randompct(par.m_abortpct) ? Rollback : Commit;
- CHK(con.execute(et, deadlock, nospace) == 0);
- con.closeTransaction();
- if (deadlock) {
- LL1("pkinsert: stop on deadlock [at 1]");
- return 0;
- }
- if (nospace) {
- LL1("pkinsert: cnt=" << j << " stop on nospace");
- return 0;
- }
+ } else {
+ set.push(i);
+ set.calc(par, i);
+ CHK(set.insrow(par, i) == 0);
+ set.unlock();
+ LL4("pkinsert key=" << i << " " << set.getrow(i));
+ batch++;
+ }
+ bool lastbatch = (batch != 0 && j + 1 == par.m_rows);
+ if (batch == par.m_batch || lastbatch) {
+ uint err = par.m_catcherr;
+ ExecType et = !randompct(par.m_abortpct) ? Commit : Rollback;
+ CHK(con.execute(et, err) == 0);
set.lock();
- set.notpending(lst, et);
+ set.post(par, !err ? et : Rollback);
set.unlock();
- lst.reset();
- CHK(con.startTransaction() == 0);
- }
- }
- if (lst.cnt() != 0) {
- bool deadlock = par.m_deadlock;
- bool nospace = true;
- ExecType et = randompct(par.m_abortpct) ? Rollback : Commit;
- CHK(con.execute(et, deadlock, nospace) == 0);
- con.closeTransaction();
- if (deadlock) {
- LL1("pkinsert: stop on deadlock [at 2]");
- return 0;
- }
- if (nospace) {
- LL1("pkinsert: end: stop on nospace");
- return 0;
+ if (err) {
+ LL1("pkinsert key=" << i << " stop on " << con.errname(err));
+ break;
+ }
+ batch = 0;
+ if (!lastbatch) {
+ con.closeTransaction();
+ CHK(con.startTransaction() == 0);
+ }
}
- set.lock();
- set.notpending(lst, et);
- set.unlock();
- return 0;
}
con.closeTransaction();
return 0;
@@ -3407,59 +3497,40 @@ pkupdate(Par par)
Set& set = par.set();
LL3("pkupdate " << tab.m_name);
CHK(con.startTransaction() == 0);
- Lst lst;
- bool deadlock = false;
- bool nospace = false;
- for (unsigned j = 0; j < par.m_rows; j++) {
- unsigned j2 = ! par.m_randomkey ? j : urandom(par.m_rows);
- unsigned i = thrrow(par, j2);
+ uint batch = 0;
+ for (uint j = 0; j < par.m_rows; j++) {
+ uint j2 = !par.m_randomkey ? j : urandom(par.m_rows);
+ uint i = thrrow(par, j2);
set.lock();
- if (! set.exist(i) || set.pending(i, Row::AnyOp)) {
- set.unlock();
- continue;
- }
- set.dbsave(i);
- set.calc(par, i);
- CHK(set.updrow(par, i) == 0);
- set.unlock();
- LL4("pkupdate " << i << ": " << *set.m_row[i]);
- lst.push(i);
- if (lst.cnt() == par.m_batch) {
- deadlock = par.m_deadlock;
- nospace = true;
- ExecType et = randompct(par.m_abortpct) ? Rollback : Commit;
- CHK(con.execute(et, deadlock, nospace) == 0);
- if (deadlock) {
- LL1("pkupdate: stop on deadlock [at 1]");
- break;
- }
- if (nospace) {
- LL1("pkupdate: cnt=" << j << " stop on nospace [at 1]");
- break;
- }
- con.closeTransaction();
- set.lock();
- set.notpending(lst, et);
- set.dbdiscard(lst);
+ if (!set.compat(par, i, Row::OpUpd)) {
+ LL3("pkupdate SKIP " << i << " " << set.getrow(i));
set.unlock();
- lst.reset();
- CHK(con.startTransaction() == 0);
- }
- }
- if (! deadlock && ! nospace && lst.cnt() != 0) {
- deadlock = par.m_deadlock;
- nospace = true;
- ExecType et = randompct(par.m_abortpct) ? Rollback : Commit;
- CHK(con.execute(et, deadlock, nospace) == 0);
- if (deadlock) {
- LL1("pkupdate: stop on deadlock [at 2]");
- } else if (nospace) {
- LL1("pkupdate: end: stop on nospace [at 2]");
} else {
+ set.push(i);
+ set.copyval(i, tab.m_pkmask);
+ set.calc(par, i, ~tab.m_pkmask);
+ CHK(set.updrow(par, i) == 0);
+ set.unlock();
+ LL4("pkupdate key=" << i << " " << set.getrow(i));
+ batch++;
+ }
+ bool lastbatch = (batch != 0 && j + 1 == par.m_rows);
+ if (batch == par.m_batch || lastbatch) {
+ uint err = par.m_catcherr;
+ ExecType et = !randompct(par.m_abortpct) ? Commit : Rollback;
+ CHK(con.execute(et, err) == 0);
set.lock();
- set.notpending(lst, et);
- set.dbdiscard(lst);
+ set.post(par, !err ? et : Rollback);
set.unlock();
+ if (err) {
+ LL1("pkupdate key=" << i << ": stop on " << con.errname(err));
+ break;
+ }
+ batch = 0;
+ if (!lastbatch) {
+ con.closeTransaction();
+ CHK(con.startTransaction() == 0);
+ }
}
}
con.closeTransaction();
@@ -3474,49 +3545,39 @@ pkdelete(Par par)
Set& set = par.set();
LL3("pkdelete " << tab.m_name);
CHK(con.startTransaction() == 0);
- Lst lst;
- bool deadlock = false;
- bool nospace = false;
- for (unsigned j = 0; j < par.m_rows; j++) {
- unsigned j2 = ! par.m_randomkey ? j : urandom(par.m_rows);
- unsigned i = thrrow(par, j2);
+ uint batch = 0;
+ for (uint j = 0; j < par.m_rows; j++) {
+ uint j2 = !par.m_randomkey ? j : urandom(par.m_rows);
+ uint i = thrrow(par, j2);
set.lock();
- if (! set.exist(i) || set.pending(i, Row::AnyOp)) {
+ if (!set.compat(par, i, Row::OpDel)) {
+ LL3("pkdelete SKIP " << i << " " << set.getrow(i));
set.unlock();
- continue;
- }
- CHK(set.delrow(par, i) == 0);
- set.unlock();
- LL4("pkdelete " << i << ": " << *set.m_row[i]);
- lst.push(i);
- if (lst.cnt() == par.m_batch) {
- deadlock = par.m_deadlock;
- nospace = true;
- ExecType et = randompct(par.m_abortpct) ? Rollback : Commit;
- CHK(con.execute(et, deadlock, nospace) == 0);
- if (deadlock) {
- LL1("pkdelete: stop on deadlock [at 1]");
- break;
- }
- con.closeTransaction();
- set.lock();
- set.notpending(lst, et);
- set.unlock();
- lst.reset();
- CHK(con.startTransaction() == 0);
- }
- }
- if (! deadlock && ! nospace && lst.cnt() != 0) {
- deadlock = par.m_deadlock;
- nospace = true;
- ExecType et = randompct(par.m_abortpct) ? Rollback : Commit;
- CHK(con.execute(et, deadlock, nospace) == 0);
- if (deadlock) {
- LL1("pkdelete: stop on deadlock [at 2]");
} else {
+ set.push(i);
+ set.copyval(i, tab.m_pkmask);
+ CHK(set.delrow(par, i) == 0);
+ set.unlock();
+ LL4("pkdelete key=" << i << " " << set.getrow(i));
+ batch++;
+ }
+ bool lastbatch = (batch != 0 && j + 1 == par.m_rows);
+ if (batch == par.m_batch || lastbatch) {
+ uint err = par.m_catcherr;
+ ExecType et = !randompct(par.m_abortpct) ? Commit : Rollback;
+ CHK(con.execute(et, err) == 0);
set.lock();
- set.notpending(lst, et);
+ set.post(par, !err ? et : Rollback);
set.unlock();
+ if (err) {
+ LL1("pkdelete key=" << i << " stop on " << con.errname(err));
+ break;
+ }
+ batch = 0;
+ if (!lastbatch) {
+ con.closeTransaction();
+ CHK(con.startTransaction() == 0);
+ }
}
}
con.closeTransaction();
@@ -3533,9 +3594,11 @@ pkread(Par par)
// expected
const Set& set1 = set;
Set set2(tab, set.m_rows);
- for (unsigned i = 0; i < set.m_rows; i++) {
+ for (uint i = 0; i < set.m_rows; i++) {
set.lock();
- if (! set.exist(i)) {
+ // TODO lock mode
+ if (!set.compat(par, i, Row::OpREAD)) {
+ LL3("pkread SKIP " << i << " " << set.getrow(i));
set.unlock();
continue;
}
@@ -3543,10 +3606,10 @@ pkread(Par par)
CHK(con.startTransaction() == 0);
CHK(set2.selrow(par, *set1.m_row[i]) == 0);
CHK(con.execute(Commit) == 0);
- unsigned i2 = (unsigned)-1;
+ uint i2 = (uint)-1;
CHK(set2.getkey(par, &i2) == 0 && i == i2);
CHK(set2.putval(i, false) == 0);
- LL4("row " << set2.count() << ": " << *set2.m_row[i]);
+ LL4("row " << set2.count() << " " << set2.getrow(i));
con.closeTransaction();
}
if (par.m_verify)
@@ -3555,7 +3618,7 @@ pkread(Par par)
}
static int
-pkreadfast(Par par, unsigned count)
+pkreadfast(Par par, uint count)
{
Con& con = par.con();
const Tab& tab = par.tab();
@@ -3563,9 +3626,9 @@ pkreadfast(Par par, unsigned count)
LL3("pkfast " << tab.m_name);
Row keyrow(tab);
// not batched on purpose
- for (unsigned j = 0; j < count; j++) {
- unsigned i = urandom(set.m_rows);
- assert(set.exist(i));
+ for (uint j = 0; j < count; j++) {
+ uint i = urandom(set.m_rows);
+ assert(set.compat(par, i, Row::OpREAD));
CHK(con.startTransaction() == 0);
// define key
keyrow.calc(par, i);
@@ -3585,53 +3648,46 @@ static int
hashindexupdate(Par par, const ITab& itab)
{
Con& con = par.con();
+ const Tab& tab = par.tab();
Set& set = par.set();
LL3("hashindexupdate " << itab.m_name);
CHK(con.startTransaction() == 0);
- Lst lst;
- bool deadlock = false;
- bool nospace = false;
- for (unsigned j = 0; j < par.m_rows; j++) {
- unsigned j2 = ! par.m_randomkey ? j : urandom(par.m_rows);
- unsigned i = thrrow(par, j2);
+ uint batch = 0;
+ for (uint j = 0; j < par.m_rows; j++) {
+ uint j2 = !par.m_randomkey ? j : urandom(par.m_rows);
+ uint i = thrrow(par, j2);
set.lock();
- if (! set.exist(i) || set.pending(i, Row::AnyOp)) {
+ if (!set.compat(par, i, Row::OpUpd)) {
+ LL3("hashindexupdate SKIP " << i << " " << set.getrow(i));
set.unlock();
- continue;
- }
- set.dbsave(i);
- // index key columns are not re-calculated
- set.calc(par, i, itab.m_colmask);
- CHK(set.updrow(par, itab, i) == 0);
- set.unlock();
- LL4("hashindexupdate " << i << ": " << *set.m_row[i]);
- lst.push(i);
- if (lst.cnt() == par.m_batch) {
- deadlock = par.m_deadlock;
- CHK(con.execute(Commit, deadlock, nospace) == 0);
- if (deadlock) {
- LL1("hashindexupdate: stop on deadlock [at 1]");
- break;
- }
- con.closeTransaction();
- set.lock();
- set.notpending(lst);
- set.dbdiscard(lst);
- set.unlock();
- lst.reset();
- CHK(con.startTransaction() == 0);
- }
- }
- if (! deadlock && lst.cnt() != 0) {
- deadlock = par.m_deadlock;
- CHK(con.execute(Commit, deadlock, nospace) == 0);
- if (deadlock) {
- LL1("hashindexupdate: stop on deadlock [at 2]");
} else {
+ // table pk and index key are not updated
+ set.push(i);
+ uint keymask = tab.m_pkmask | itab.m_keymask;
+ set.copyval(i, keymask);
+ set.calc(par, i, ~keymask);
+ CHK(set.updrow(par, itab, i) == 0);
+ set.unlock();
+ LL4("hashindexupdate " << i << " " << set.getrow(i));
+ batch++;
+ }
+ bool lastbatch = (batch != 0 && j + 1 == par.m_rows);
+ if (batch == par.m_batch || lastbatch) {
+ uint err = par.m_catcherr;
+ ExecType et = !randompct(par.m_abortpct) ? Commit : Rollback;
+ CHK(con.execute(et, err) == 0);
set.lock();
- set.notpending(lst);
- set.dbdiscard(lst);
+ set.post(par, !err ? et : Rollback);
set.unlock();
+ if (err) {
+ LL1("hashindexupdate " << i << " stop on " << con.errname(err));
+ break;
+ }
+ batch = 0;
+ if (!lastbatch) {
+ con.closeTransaction();
+ CHK(con.startTransaction() == 0);
+ }
}
}
con.closeTransaction();
@@ -3645,45 +3701,39 @@ hashindexdelete(Par par, const ITab& itab)
Set& set = par.set();
LL3("hashindexdelete " << itab.m_name);
CHK(con.startTransaction() == 0);
- Lst lst;
- bool deadlock = false;
- bool nospace = false;
- for (unsigned j = 0; j < par.m_rows; j++) {
- unsigned j2 = ! par.m_randomkey ? j : urandom(par.m_rows);
- unsigned i = thrrow(par, j2);
+ uint batch = 0;
+ for (uint j = 0; j < par.m_rows; j++) {
+ uint j2 = !par.m_randomkey ? j : urandom(par.m_rows);
+ uint i = thrrow(par, j2);
set.lock();
- if (! set.exist(i) || set.pending(i, Row::AnyOp)) {
- set.unlock();
- continue;
- }
- CHK(set.delrow(par, itab, i) == 0);
- set.unlock();
- LL4("hashindexdelete " << i << ": " << *set.m_row[i]);
- lst.push(i);
- if (lst.cnt() == par.m_batch) {
- deadlock = par.m_deadlock;
- CHK(con.execute(Commit, deadlock, nospace) == 0);
- if (deadlock) {
- LL1("hashindexdelete: stop on deadlock [at 1]");
- break;
- }
- con.closeTransaction();
- set.lock();
- set.notpending(lst);
+ if (!set.compat(par, i, Row::OpDel)) {
+ LL3("hashindexdelete SKIP " << i << " " << set.getrow(i));
set.unlock();
- lst.reset();
- CHK(con.startTransaction() == 0);
- }
- }
- if (! deadlock && lst.cnt() != 0) {
- deadlock = par.m_deadlock;
- CHK(con.execute(Commit, deadlock, nospace) == 0);
- if (deadlock) {
- LL1("hashindexdelete: stop on deadlock [at 2]");
} else {
+ set.push(i);
+ set.copyval(i, itab.m_keymask);
+ CHK(set.delrow(par, itab, i) == 0);
+ set.unlock();
+ LL4("hashindexdelete " << i << " " << set.getrow(i));
+ batch++;
+ }
+ bool lastbatch = (batch != 0 && j + 1 == par.m_rows);
+ if (batch == par.m_batch || lastbatch) {
+ uint err = par.m_catcherr;
+ ExecType et = !randompct(par.m_abortpct) ? Commit : Rollback;
+ CHK(con.execute(et, err) == 0);
set.lock();
- set.notpending(lst);
+ set.post(par, !err ? et : Rollback);
set.unlock();
+ if (err) {
+ LL1("hashindexdelete " << i << " stop on " << con.errname(err));
+ break;
+ }
+ batch = 0;
+ if (!lastbatch) {
+ con.closeTransaction();
+ CHK(con.startTransaction() == 0);
+ }
}
}
con.closeTransaction();
@@ -3700,9 +3750,11 @@ hashindexread(Par par, const ITab& itab)
// expected
const Set& set1 = set;
Set set2(tab, set.m_rows);
- for (unsigned i = 0; i < set.m_rows; i++) {
+ for (uint i = 0; i < set.m_rows; i++) {
set.lock();
- if (! set.exist(i)) {
+ // TODO lock mode
+ if (!set.compat(par, i, Row::OpREAD)) {
+ LL3("hashindexread SKIP " << i << " " << set.getrow(i));
set.unlock();
continue;
}
@@ -3710,10 +3762,10 @@ hashindexread(Par par, const ITab& itab)
CHK(con.startTransaction() == 0);
CHK(set2.selrow(par, itab, *set1.m_row[i]) == 0);
CHK(con.execute(Commit) == 0);
- unsigned i2 = (unsigned)-1;
+ uint i2 = (uint)-1;
CHK(set2.getkey(par, &i2) == 0 && i == i2);
CHK(set2.putval(i, false) == 0);
- LL4("row " << set2.count() << ": " << *set2.m_row[i]);
+ LL4("row " << set2.count() << " " << *set2.m_row[i]);
con.closeTransaction();
}
if (par.m_verify)
@@ -3731,40 +3783,39 @@ scanreadtable(Par par)
const Set& set = par.set();
// expected
const Set& set1 = set;
- LL3("scanread " << tab.m_name << " lockmode=" << par.m_lockmode << " tupscan=" << par.m_tupscan << " expect=" << set1.count() << " verify=" << par.m_verify);
+ LL3("scanreadtable " << tab.m_name << " lockmode=" << par.m_lockmode << " tupscan=" << par.m_tupscan << " expect=" << set1.count() << " verify=" << par.m_verify);
Set set2(tab, set.m_rows);
CHK(con.startTransaction() == 0);
CHK(con.getNdbScanOperation(tab) == 0);
CHK(con.readTuples(par) == 0);
set2.getval(par);
CHK(con.executeScan() == 0);
- unsigned n = 0;
- bool deadlock = false;
+ uint n = 0;
while (1) {
int ret;
- deadlock = par.m_deadlock;
- CHK((ret = con.nextScanResult(true, deadlock)) == 0 || ret == 1);
+ uint err = par.m_catcherr;
+ CHK((ret = con.nextScanResult(true, err)) == 0 || ret == 1);
if (ret == 1)
break;
- if (deadlock) {
- LL1("scanreadtable: stop on deadlock");
+ if (err) {
+ LL1("scanreadtable stop on " << con.errname(err));
break;
}
- unsigned i = (unsigned)-1;
+ uint i = (uint)-1;
CHK(set2.getkey(par, &i) == 0);
CHK(set2.putval(i, false, n) == 0);
- LL4("row " << n << ": " << *set2.m_row[i]);
+ LL4("row " << n << " " << *set2.m_row[i]);
n++;
}
con.closeTransaction();
if (par.m_verify)
CHK(set1.verify(par, set2, false) == 0);
- LL3("scanread " << tab.m_name << " done rows=" << n);
+ LL3("scanreadtable " << tab.m_name << " done rows=" << n);
return 0;
}
static int
-scanreadtablefast(Par par, unsigned countcheck)
+scanreadtablefast(Par par, uint countcheck)
{
Con& con = par.con();
const Tab& tab = par.tab();
@@ -3777,7 +3828,7 @@ scanreadtablefast(Par par, unsigned countcheck)
NdbRecAttr* rec;
CHK(con.getValue((Uint32)0, rec) == 0);
CHK(con.executeScan() == 0);
- unsigned count = 0;
+ uint count = 0;
while (1) {
int ret;
CHK((ret = con.nextScanResult(true)) == 0 || ret == 1);
@@ -3797,7 +3848,7 @@ calcscanbounds(Par par, const ITab& itab, BSet& bset, const Set& set, Set& set1)
while (true) {
bset.calc(par);
bset.filter(par, set, set1);
- unsigned n = set1.count();
+ uint n = set1.count();
// prefer proper subset
if (0 < n && n < set.m_rows)
break;
@@ -3819,7 +3870,7 @@ scanreadindex(Par par, const ITab& itab, BSet& bset, bool calc)
} else {
bset.filter(par, set, set1);
}
- LL3("scanread " << itab.m_name << " " << bset << " lockmode=" << par.m_lockmode << " expect=" << set1.count() << " ordered=" << par.m_ordered << " descending=" << par.m_descending << " verify=" << par.m_verify);
+ LL3("scanreadindex " << itab.m_name << " " << bset << " lockmode=" << par.m_lockmode << " expect=" << set1.count() << " ordered=" << par.m_ordered << " descending=" << par.m_descending << " verify=" << par.m_verify);
Set set2(tab, set.m_rows);
CHK(con.startTransaction() == 0);
CHK(con.getNdbIndexScanOperation(itab, tab) == 0);
@@ -3827,22 +3878,21 @@ scanreadindex(Par par, const ITab& itab, BSet& bset, bool calc)
CHK(bset.setbnd(par) == 0);
set2.getval(par);
CHK(con.executeScan() == 0);
- unsigned n = 0;
- bool deadlock = false;
+ uint n = 0;
while (1) {
int ret;
- deadlock = par.m_deadlock;
- CHK((ret = con.nextScanResult(true, deadlock)) == 0 || ret == 1);
+ uint err = par.m_catcherr;
+ CHK((ret = con.nextScanResult(true, err)) == 0 || ret == 1);
if (ret == 1)
break;
- if (deadlock) {
- LL1("scanreadindex: stop on deadlock");
+ if (err) {
+ LL1("scanreadindex stop on " << con.errname(err));
break;
}
- unsigned i = (unsigned)-1;
+ uint i = (uint)-1;
CHK(set2.getkey(par, &i) == 0);
CHK(set2.putval(i, par.m_dups, n) == 0);
- LL4("key " << i << " row " << n << ": " << *set2.m_row[i]);
+ LL4("key " << i << " row " << n << " " << *set2.m_row[i]);
n++;
}
con.closeTransaction();
@@ -3851,12 +3901,12 @@ scanreadindex(Par par, const ITab& itab, BSet& bset, bool calc)
if (par.m_ordered)
CHK(set2.verifyorder(par, itab, par.m_descending) == 0);
}
- LL3("scanread " << itab.m_name << " done rows=" << n);
+ LL3("scanreadindex " << itab.m_name << " done rows=" << n);
return 0;
}
static int
-scanreadindexfast(Par par, const ITab& itab, const BSet& bset, unsigned countcheck)
+scanreadindexfast(Par par, const ITab& itab, const BSet& bset, uint countcheck)
{
Con& con = par.con();
const Tab& tab = par.tab();
@@ -3871,7 +3921,7 @@ scanreadindexfast(Par par, const ITab& itab, const BSet& bset, unsigned countche
NdbRecAttr* rec;
CHK(con.getValue((Uint32)0, rec) == 0);
CHK(con.executeScan() == 0);
- unsigned count = 0;
+ uint count = 0;
while (1) {
int ret;
CHK((ret = con.nextScanResult(true)) == 0 || ret == 1);
@@ -3904,22 +3954,21 @@ scanreadfilter(Par par, const ITab& itab, BSet& bset, bool calc)
CHK(bset.setflt(par) == 0);
set2.getval(par);
CHK(con.executeScan() == 0);
- unsigned n = 0;
- bool deadlock = false;
+ uint n = 0;
while (1) {
int ret;
- deadlock = par.m_deadlock;
- CHK((ret = con.nextScanResult(true, deadlock)) == 0 || ret == 1);
+ uint err = par.m_catcherr;
+ CHK((ret = con.nextScanResult(true, err)) == 0 || ret == 1);
if (ret == 1)
break;
- if (deadlock) {
- LL1("scanfilter: stop on deadlock");
+ if (err) {
+ LL1("scanfilter stop on " << con.errname(err));
break;
}
- unsigned i = (unsigned)-1;
+ uint i = (uint)-1;
CHK(set2.getkey(par, &i) == 0);
CHK(set2.putval(i, par.m_dups, n) == 0);
- LL4("key " << i << " row " << n << ": " << *set2.m_row[i]);
+ LL4("key " << i << " row " << n << " " << *set2.m_row[i]);
n++;
}
con.closeTransaction();
@@ -3934,9 +3983,9 @@ static int
scanreadindex(Par par, const ITab& itab)
{
const Tab& tab = par.tab();
- for (unsigned i = 0; i < par.m_subloop; i++) {
+ for (uint i = 0; i < par.m_ssloop; i++) {
if (itab.m_type == ITab::OrderedIndex) {
- BSet bset(tab, itab, par.m_rows);
+ BSet bset(tab, itab);
CHK(scanreadfilter(par, itab, bset, true) == 0);
CHK(scanreadindex(par, itab, bset, true) == 0);
}
@@ -3948,7 +3997,7 @@ static int
scanreadindex(Par par)
{
const Tab& tab = par.tab();
- for (unsigned i = 0; i < tab.m_itabs; i++) {
+ for (uint i = 0; i < tab.m_itabs; i++) {
if (tab.m_itab[i] == 0)
continue;
const ITab& itab = *tab.m_itab[i];
@@ -3985,7 +4034,7 @@ timescanpkindex(Par par)
{
const Tab& tab = par.tab();
const ITab& itab = *tab.m_itab[0]; // 1st index is on PK
- BSet bset(tab, itab, par.m_rows);
+ BSet bset(tab, itab);
par.tmr().on();
CHK(scanreadindexfast(par, itab, bset, par.m_totrows) == 0);
par.tmr().off(par.set().m_rows);
@@ -3996,7 +4045,7 @@ static int
timepkreadtable(Par par)
{
par.tmr().on();
- unsigned count = par.m_samples;
+ uint count = par.m_samples;
if (count == 0)
count = par.m_totrows;
CHK(pkreadfast(par, count) == 0);
@@ -4009,13 +4058,13 @@ timepkreadindex(Par par)
{
const Tab& tab = par.tab();
const ITab& itab = *tab.m_itab[0]; // 1st index is on PK
- BSet bset(tab, itab, par.m_rows);
- unsigned count = par.m_samples;
+ BSet bset(tab, itab);
+ uint count = par.m_samples;
if (count == 0)
count = par.m_totrows;
par.tmr().on();
- for (unsigned j = 0; j < count; j++) {
- unsigned i = urandom(par.m_totrows);
+ for (uint j = 0; j < count; j++) {
+ uint i = urandom(par.m_totrows);
bset.calcpk(par, i);
CHK(scanreadindexfast(par, itab, bset, 1) == 0);
}
@@ -4031,7 +4080,7 @@ scanupdatetable(Par par)
Con& con = par.con();
const Tab& tab = par.tab();
Set& set = par.set();
- LL3("scan update " << tab.m_name);
+ LL3("scanupdatetable " << tab.m_name);
Set set2(tab, set.m_rows);
par.m_lockmode = NdbOperation::LM_Exclusive;
CHK(con.startTransaction() == 0);
@@ -4039,87 +4088,70 @@ scanupdatetable(Par par)
CHK(con.readTuples(par) == 0);
set2.getval(par);
CHK(con.executeScan() == 0);
- unsigned count = 0;
+ uint count = 0;
// updating trans
Con con2;
con2.connect(con);
CHK(con2.startTransaction() == 0);
- Lst lst;
- bool deadlock = false;
- bool nospace = false;
+ uint batch = 0;
while (1) {
int ret;
- deadlock = par.m_deadlock;
- CHK((ret = con.nextScanResult(true, deadlock)) == 0 || ret == 1);
- if (ret == 1)
+ uint32 err = par.m_catcherr;
+ CHK((ret = con.nextScanResult(true, err)) != -1);
+ if (ret != 0)
break;
- if (deadlock) {
- LL1("scanupdatetable: stop on deadlock [at 1]");
+ if (err) {
+ LL1("scanupdatetable [scan] stop on " << con.errname(err));
break;
}
if (par.m_scanstop != 0 && urandom(par.m_scanstop) == 0) {
con.closeScan();
break;
}
- do {
- unsigned i = (unsigned)-1;
+ while (1) {
+ uint i = (uint)-1;
CHK(set2.getkey(par, &i) == 0);
- const Row& row = *set.m_row[i];
set.lock();
- if (! set.exist(i) || set.pending(i, Row::AnyOp)) {
- LL4("scan update " << tab.m_name << ": skip: " << row);
+ if (!set.compat(par, i, Row::OpUpd)) {
+ LL3("scanupdatetable SKIP " << i << " " << set.getrow(i));
} else {
CHKTRY(set2.putval(i, false) == 0, set.unlock());
CHKTRY(con.updateScanTuple(con2) == 0, set.unlock());
Par par2 = par;
par2.m_con = &con2;
- set.dbsave(i);
- set.calc(par, i);
+ set.push(i);
+ set.calc(par, i, ~tab.m_pkmask);
CHKTRY(set.setrow(par2, i) == 0, set.unlock());
- LL4("scan update " << tab.m_name << ": " << row);
- lst.push(i);
+ LL4("scanupdatetable " << i << " " << set.getrow(i));
+ batch++;
}
set.unlock();
- if (lst.cnt() == par.m_batch) {
- deadlock = par.m_deadlock;
- CHK(con2.execute(Commit, deadlock, nospace) == 0);
- if (deadlock) {
- LL1("scanupdatetable: stop on deadlock [at 2]");
- goto out;
- }
- con2.closeTransaction();
+ CHK((ret = con.nextScanResult(false)) != -1);
+ bool lastbatch = (batch != 0 && ret != 0);
+ if (batch == par.m_batch || lastbatch) {
+ uint err = par.m_catcherr;
+ ExecType et = Commit;
+ CHK(con2.execute(et, err) == 0);
set.lock();
- set.notpending(lst);
- set.dbdiscard(lst);
+ set.post(par, !err ? et : Rollback);
set.unlock();
- count += lst.cnt();
- lst.reset();
- CHK(con2.startTransaction() == 0);
- }
- CHK((ret = con.nextScanResult(false)) == 0 || ret == 1 || ret == 2);
- if (ret == 2 && lst.cnt() != 0) {
- deadlock = par.m_deadlock;
- CHK(con2.execute(Commit, deadlock, nospace) == 0);
- if (deadlock) {
- LL1("scanupdatetable: stop on deadlock [at 3]");
+ if (err) {
+ LL1("scanupdatetable [update] stop on " << con2.errname(err));
goto out;
}
+ LL4("scanupdatetable committed batch");
+ count += batch;
+ batch = 0;
con2.closeTransaction();
- set.lock();
- set.notpending(lst);
- set.dbdiscard(lst);
- set.unlock();
- count += lst.cnt();
- lst.reset();
CHK(con2.startTransaction() == 0);
}
- } while (ret == 0);
- if (ret == 1)
- break;
+ if (ret != 0)
+ break;
+ }
}
out:
con2.closeTransaction();
- LL3("scan update " << tab.m_name << " rows updated=" << count);
+ LL3("scanupdatetable " << tab.m_name << " rows updated=" << count);
con.closeTransaction();
return 0;
}
@@ -4137,7 +4169,7 @@ scanupdateindex(Par par, const ITab& itab, BSet& bset, bool calc)
} else {
bset.filter(par, set, set1);
}
- LL3("scan update " << itab.m_name << " " << bset << " expect=" << set1.count() << " ordered=" << par.m_ordered << " descending=" << par.m_descending << " verify=" << par.m_verify);
+ LL3("scanupdateindex " << itab.m_name << " " << bset << " expect=" << set1.count() << " ordered=" << par.m_ordered << " descending=" << par.m_descending << " verify=" << par.m_verify);
Set set2(tab, set.m_rows);
par.m_lockmode = NdbOperation::LM_Exclusive;
CHK(con.startTransaction() == 0);
@@ -4146,83 +4178,67 @@ scanupdateindex(Par par, const ITab& itab, BSet& bset, bool calc)
CHK(bset.setbnd(par) == 0);
set2.getval(par);
CHK(con.executeScan() == 0);
- unsigned count = 0;
+ uint count = 0;
// updating trans
Con con2;
con2.connect(con);
CHK(con2.startTransaction() == 0);
- Lst lst;
- bool deadlock = false;
- bool nospace = false;
+ uint batch = 0;
while (1) {
int ret;
- deadlock = par.m_deadlock;
- CHK((ret = con.nextScanResult(true, deadlock)) == 0 || ret == 1);
- if (ret == 1)
+ uint err = par.m_catcherr;
+ CHK((ret = con.nextScanResult(true, err)) != -1);
+ if (ret != 0)
break;
- if (deadlock) {
- LL1("scanupdateindex: stop on deadlock [at 1]");
+ if (err) {
+ LL1("scanupdateindex [scan] stop on " << con.errname(err));
break;
}
if (par.m_scanstop != 0 && urandom(par.m_scanstop) == 0) {
con.closeScan();
break;
}
- do {
- unsigned i = (unsigned)-1;
+ while (1) {
+ uint i = (uint)-1;
CHK(set2.getkey(par, &i) == 0);
- const Row& row = *set.m_row[i];
set.lock();
- if (! set.exist(i) || set.pending(i, Row::AnyOp)) {
- LL4("scan update " << itab.m_name << ": skip: " << row);
+ if (!set.compat(par, i, Row::OpUpd)) {
+ LL4("scanupdateindex SKIP " << set.getrow(i));
} else {
CHKTRY(set2.putval(i, par.m_dups) == 0, set.unlock());
CHKTRY(con.updateScanTuple(con2) == 0, set.unlock());
Par par2 = par;
par2.m_con = &con2;
- set.dbsave(i);
- set.calc(par, i, ! par.m_noindexkeyupdate ? 0 : itab.m_colmask);
+ set.push(i);
+ uint colmask = !par.m_noindexkeyupdate ? ~0 : ~itab.m_keymask;
+ set.calc(par, i, colmask);
CHKTRY(set.setrow(par2, i) == 0, set.unlock());
- LL4("scan update " << itab.m_name << ": " << row);
- lst.push(i);
+ LL4("scanupdateindex " << i << " " << set.getrow(i));
+ batch++;
}
set.unlock();
- if (lst.cnt() == par.m_batch) {
- deadlock = par.m_deadlock;
- CHK(con2.execute(Commit, deadlock, nospace) == 0);
- if (deadlock) {
- LL1("scanupdateindex: stop on deadlock [at 2]");
- goto out;
- }
- con2.closeTransaction();
- LL4("scanupdateindex: committed batch [at 1]");
+ CHK((ret = con.nextScanResult(false)) != -1);
+ bool lastbatch = (batch != 0 && ret != 0);
+ if (batch == par.m_batch || lastbatch) {
+ uint err = par.m_catcherr;
+ ExecType et = Commit;
+ CHK(con2.execute(et, err) == 0);
set.lock();
- set.notpending(lst);
- set.dbdiscard(lst);
+ set.post(par, !err ? et : Rollback);
set.unlock();
- count += lst.cnt();
- lst.reset();
- CHK(con2.startTransaction() == 0);
- }
- CHK((ret = con.nextScanResult(false)) == 0 || ret == 1 || ret == 2);
- if (ret == 2 && lst.cnt() != 0) {
- deadlock = par.m_deadlock;
- CHK(con2.execute(Commit, deadlock, nospace) == 0);
- if (deadlock) {
- LL1("scanupdateindex: stop on deadlock [at 3]");
+ if (err) {
+ LL1("scanupdateindex [update] stop on " << con2.errname(err));
goto out;
}
+ LL4("scanupdateindex committed batch");
+ count += batch;
+ batch = 0;
con2.closeTransaction();
- LL4("scanupdateindex: committed batch [at 2]");
- set.lock();
- set.notpending(lst);
- set.dbdiscard(lst);
- set.unlock();
- count += lst.cnt();
- lst.reset();
CHK(con2.startTransaction() == 0);
}
- } while (ret == 0);
+ if (ret != 0)
+ break;
+ }
}
out:
con2.closeTransaction();
@@ -4231,7 +4247,7 @@ out:
if (par.m_ordered)
CHK(set2.verifyorder(par, itab, par.m_descending) == 0);
}
- LL3("scan update " << itab.m_name << " rows updated=" << count);
+ LL3("scanupdateindex " << itab.m_name << " rows updated=" << count);
con.closeTransaction();
return 0;
}
@@ -4240,9 +4256,9 @@ static int
scanupdateindex(Par par, const ITab& itab)
{
const Tab& tab = par.tab();
- for (unsigned i = 0; i < par.m_subloop; i++) {
+ for (uint i = 0; i < par.m_ssloop; i++) {
if (itab.m_type == ITab::OrderedIndex) {
- BSet bset(tab, itab, par.m_rows);
+ BSet bset(tab, itab);
CHK(scanupdateindex(par, itab, bset, true) == 0);
} else {
CHK(hashindexupdate(par, itab) == 0);
@@ -4255,7 +4271,7 @@ static int
scanupdateindex(Par par)
{
const Tab& tab = par.tab();
- for (unsigned i = 0; i < tab.m_itabs; i++) {
+ for (uint i = 0; i < tab.m_itabs; i++) {
if (tab.m_itab[i] == 0)
continue;
const ITab& itab = *tab.m_itab[i];
@@ -4294,14 +4310,14 @@ readverifyfull(Par par)
CHK(scanreadtable(par) == 0);
}
// each thread scans different indexes
- for (unsigned i = 0; i < tab.m_itabs; i++) {
- if (i % par.m_threads != par.m_no)
+ for (uint i = 0; i < tab.m_itabs; i++) {
+ if (i % par.m_usedthreads != par.m_no)
continue;
if (tab.m_itab[i] == 0)
continue;
const ITab& itab = *tab.m_itab[i];
if (itab.m_type == ITab::OrderedIndex) {
- BSet bset(tab, itab, par.m_rows);
+ BSet bset(tab, itab);
CHK(scanreadindex(par, itab, bset, false) == 0);
} else {
CHK(hashindexread(par, itab) == 0);
@@ -4317,7 +4333,7 @@ readverifyindex(Par par)
return 0;
par.m_verify = true;
par.m_lockmode = NdbOperation::LM_CommittedRead;
- unsigned sel = urandom(10);
+ uint sel = urandom(10);
if (sel < 9) {
par.m_ordered = true;
par.m_descending = (sel < 5);
@@ -4331,8 +4347,8 @@ pkops(Par par)
{
const Tab& tab = par.tab();
par.m_randomkey = true;
- for (unsigned i = 0; i < par.m_subloop; i++) {
- unsigned j = 0;
+ for (uint i = 0; i < par.m_ssloop; i++) {
+ uint j = 0;
while (j < tab.m_itabs) {
if (tab.m_itab[j] != 0) {
const ITab& itab = *tab.m_itab[j];
@@ -4341,7 +4357,7 @@ pkops(Par par)
}
j++;
}
- unsigned sel = urandom(10);
+ uint sel = urandom(10);
if (par.m_slno % 2 == 0) {
// favor insert
if (sel < 8) {
@@ -4389,8 +4405,8 @@ static int
pkupdatescanread(Par par)
{
par.m_dups = true;
- par.m_deadlock = true;
- unsigned sel = urandom(10);
+ par.m_catcherr |= Con::ErrDeadlock;
+ uint sel = urandom(10);
if (sel < 5) {
CHK(pkupdate(par) == 0);
} else if (sel < 6) {
@@ -4411,9 +4427,9 @@ static int
mixedoperations(Par par)
{
par.m_dups = true;
- par.m_deadlock = true;
+ par.m_catcherr |= Con::ErrDeadlock;
par.m_scanstop = par.m_totrows; // randomly close scans
- unsigned sel = urandom(10);
+ uint sel = urandom(10);
if (sel < 2) {
CHK(pkdelete(par) == 0);
} else if (sel < 4) {
@@ -4434,8 +4450,8 @@ static int
parallelorderedupdate(Par par)
{
const Tab& tab = par.tab();
- unsigned k = 0;
- for (unsigned i = 0; i < tab.m_itabs; i++) {
+ uint k = 0;
+ for (uint i = 0; i < tab.m_itabs; i++) {
if (tab.m_itab[i] == 0)
continue;
const ITab& itab = *tab.m_itab[i];
@@ -4447,10 +4463,11 @@ parallelorderedupdate(Par par)
par.m_noindexkeyupdate = true;
par.m_ordered = true;
par.m_descending = (par.m_slno != 0);
+ par.m_dups = false;
par.m_verify = true;
- BSet bset(tab, itab, par.m_rows); // empty bounds
+ BSet bset(tab, itab); // empty bounds
// prefer empty bounds
- unsigned sel = urandom(10);
+ uint sel = urandom(10);
CHK(scanupdateindex(par, itab, bset, sel < 2) == 0);
}
}
@@ -4469,6 +4486,418 @@ pkupdateindexbuild(Par par)
return 0;
}
+// savepoint tests (single thread for now)
+
+struct Spt {
+ enum Res { Committed, Latest, Deadlock };
+ bool m_same; // same transaction
+ NdbOperation::LockMode m_lm;
+ Res m_res;
+};
+
+static Spt sptlist[] = {
+ { 1, NdbOperation::LM_Read, Spt::Latest },
+ { 1, NdbOperation::LM_Exclusive, Spt::Latest },
+ { 1, NdbOperation::LM_CommittedRead, Spt::Latest },
+ { 0, NdbOperation::LM_Read, Spt::Deadlock },
+ { 0, NdbOperation::LM_Exclusive, Spt::Deadlock },
+ { 0, NdbOperation::LM_CommittedRead, Spt::Committed }
+};
+static uint sptcount = sizeof(sptlist)/sizeof(sptlist[0]);
+
+static int
+savepointreadpk(Par par, Spt spt)
+{
+ LL3("savepointreadpk");
+ Con& con = par.con();
+ const Tab& tab = par.tab();
+ Set& set = par.set();
+ const Set& set1 = set;
+ Set set2(tab, set.m_rows);
+ uint n = 0;
+ for (uint i = 0; i < set.m_rows; i++) {
+ set.lock();
+ if (!set.compat(par, i, Row::OpREAD)) {
+ LL4("savepointreadpk SKIP " << i << " " << set.getrow(i));
+ set.unlock();
+ continue;
+ }
+ set.unlock();
+ CHK(set2.selrow(par, *set1.m_row[i]) == 0);
+ uint err = par.m_catcherr | Con::ErrDeadlock;
+ ExecType et = NoCommit;
+ CHK(con.execute(et, err) == 0);
+ if (err) {
+ if (err & Con::ErrDeadlock) {
+ CHK(spt.m_res == Spt::Deadlock);
+ // all rows have same behaviour
+ CHK(n == 0);
+ }
+ LL1("savepointreadpk stop on " << con.errname(err));
+ break;
+ }
+ uint i2 = (uint)-1;
+ CHK(set2.getkey(par, &i2) == 0 && i == i2);
+ CHK(set2.putval(i, false) == 0);
+ LL4("row " << set2.count() << " " << set2.getrow(i));
+ n++;
+ }
+ bool dirty = (!spt.m_same && spt.m_lm == NdbOperation::LM_CommittedRead);
+ if (spt.m_res != Spt::Deadlock)
+ CHK(set1.verify(par, set2, false, dirty) == 0);
+ return 0;
+}
+
+static int
+savepointreadhashindex(Par par, Spt spt)
+{
+ if (spt.m_lm == NdbOperation::LM_CommittedRead && !spt.m_same) {
+ LL1("skip hash index dirty read");
+ return 0;
+ }
+ LL3("savepointreadhashindex");
+ Con& con = par.con();
+ const Tab& tab = par.tab();
+ const ITab& itab = par.itab();
+ Set& set = par.set();
+ const Set& set1 = set;
+ Set set2(tab, set.m_rows);
+ uint n = 0;
+ for (uint i = 0; i < set.m_rows; i++) {
+ set.lock();
+ if (!set.compat(par, i, Row::OpREAD)) {
+ LL3("savepointreadhashindex SKIP " << i << " " << set.getrow(i));
+ set.unlock();
+ continue;
+ }
+ set.unlock();
+ CHK(set2.selrow(par, itab, *set1.m_row[i]) == 0);
+ uint err = par.m_catcherr | Con::ErrDeadlock;
+ ExecType et = NoCommit;
+ CHK(con.execute(et, err) == 0);
+ if (err) {
+ if (err & Con::ErrDeadlock) {
+ CHK(spt.m_res == Spt::Deadlock);
+ // all rows have same behaviour
+ CHK(n == 0);
+ }
+ LL1("savepointreadhashindex stop on " << con.errname(err));
+ break;
+ }
+ uint i2 = (uint)-1;
+ CHK(set2.getkey(par, &i2) == 0 && i == i2);
+ CHK(set2.putval(i, false) == 0);
+ LL4("row " << set2.count() << " " << *set2.m_row[i]);
+ n++;
+ }
+ bool dirty = (!spt.m_same && spt.m_lm == NdbOperation::LM_CommittedRead);
+ if (spt.m_res != Spt::Deadlock)
+ CHK(set1.verify(par, set2, false, dirty) == 0);
+ return 0;
+}
+
+static int
+savepointscantable(Par par, Spt spt)
+{
+ LL3("savepointscantable");
+ Con& con = par.con();
+ const Tab& tab = par.tab();
+ const Set& set = par.set();
+ const Set& set1 = set; // not modifying current set
+ Set set2(tab, set.m_rows); // scan result
+ CHK(con.getNdbScanOperation(tab) == 0);
+ CHK(con.readTuples(par) == 0);
+ set2.getval(par); // getValue all columns
+ CHK(con.executeScan() == 0);
+ bool deadlock = false;
+ uint n = 0;
+ while (1) {
+ int ret;
+ uint err = par.m_catcherr | Con::ErrDeadlock;
+ CHK((ret = con.nextScanResult(true, err)) == 0 || ret == 1);
+ if (ret == 1)
+ break;
+ if (err) {
+ if (err & Con::ErrDeadlock) {
+ CHK(spt.m_res == Spt::Deadlock);
+ // all rows have same behaviour
+ CHK(n == 0);
+ deadlock = true;
+ }
+ LL1("savepointscantable stop on " << con.errname(err));
+ break;
+ }
+ CHK(spt.m_res != Spt::Deadlock);
+ uint i = (uint)-1;
+ CHK(set2.getkey(par, &i) == 0);
+ CHK(set2.putval(i, false, n) == 0);
+ LL4("row " << n << " key " << i << " " << set2.getrow(i));
+ n++;
+ }
+ if (set1.m_rows > 0) {
+ if (!deadlock)
+ CHK(spt.m_res != Spt::Deadlock);
+ else
+ CHK(spt.m_res == Spt::Deadlock);
+ }
+ LL2("savepointscantable " << n << " rows");
+ bool dirty = (!spt.m_same && spt.m_lm == NdbOperation::LM_CommittedRead);
+ if (spt.m_res != Spt::Deadlock)
+ CHK(set1.verify(par, set2, false, dirty) == 0);
+ return 0;
+}
+
+static int
+savepointscanindex(Par par, Spt spt)
+{
+ LL3("savepointscanindex");
+ Con& con = par.con();
+ const Tab& tab = par.tab();
+ const ITab& itab = par.itab();
+ const Set& set = par.set();
+ const Set& set1 = set;
+ Set set2(tab, set.m_rows);
+ CHK(con.getNdbIndexScanOperation(itab, tab) == 0);
+ CHK(con.readIndexTuples(par) == 0);
+ set2.getval(par);
+ CHK(con.executeScan() == 0);
+ bool deadlock = false;
+ uint n = 0;
+ while (1) {
+ int ret;
+ uint err = par.m_catcherr | Con::ErrDeadlock;
+ CHK((ret = con.nextScanResult(true, err)) == 0 || ret == 1);
+ if (ret == 1)
+ break;
+ if (err) {
+ if (err & Con::ErrDeadlock) {
+ CHK(spt.m_res == Spt::Deadlock);
+ // all rows have same behaviour
+ CHK(n == 0);
+ deadlock = true;
+ }
+ LL1("savepointscanindex stop on " << con.errname(err));
+ break;
+ }
+ CHK(spt.m_res != Spt::Deadlock);
+ uint i = (uint)-1;
+ CHK(set2.getkey(par, &i) == 0);
+ CHK(set2.putval(i, par.m_dups, n) == 0);
+ LL4("row " << n << " key " << i << " " << set2.getrow(i));
+ n++;
+ }
+ if (set1.m_rows > 0) {
+ if (!deadlock)
+ CHK(spt.m_res != Spt::Deadlock);
+ else
+ CHK(spt.m_res == Spt::Deadlock);
+ }
+ LL2("savepointscanindex " << n << " rows");
+ bool dirty = (!spt.m_same && spt.m_lm == NdbOperation::LM_CommittedRead);
+ if (spt.m_res != Spt::Deadlock)
+ CHK(set1.verify(par, set2, false, dirty) == 0);
+ return 0;
+}
+
+typedef int (*SptFun)(Par, Spt);
+
+static int
+savepointtest(Par par, Spt spt, SptFun fun)
+{
+ Con& con = par.con();
+ Par par2 = par;
+ Con con2;
+ if (!spt.m_same) {
+ con2.connect(con); // copy ndb reference
+ par2.m_con = &con2;
+ CHK(con2.startTransaction() == 0);
+ }
+ par2.m_lockmode = spt.m_lm;
+ CHK((*fun)(par2, spt) == 0);
+ if (!spt.m_same) {
+ con2.closeTransaction();
+ }
+ return 0;
+}
+
+static int
+savepointtest(Par par, const char* op)
+{
+ Con& con = par.con();
+ const Tab& tab = par.tab();
+ Set& set = par.set();
+ LL2("savepointtest op=\"" << op << "\"");
+ CHK(con.startTransaction() == 0);
+ const char* p = op;
+ char c;
+ while ((c = *p++) != 0) {
+ uint j;
+ for (j = 0; j < par.m_rows; j++) {
+ uint i = thrrow(par, j);
+ if (c == 'c') {
+ ExecType et = Commit;
+ CHK(con.execute(et) == 0);
+ set.lock();
+ set.post(par, et);
+ set.unlock();
+ CHK(con.startTransaction() == 0);
+ } else {
+ set.lock();
+ set.push(i);
+ if (c == 'i') {
+ set.calc(par, i);
+ CHK(set.insrow(par, i) == 0);
+ } else if (c == 'u') {
+ set.copyval(i, tab.m_pkmask);
+ set.calc(par, i, ~tab.m_pkmask);
+ CHK(set.updrow(par, i) == 0);
+ } else if (c == 'd') {
+ set.copyval(i, tab.m_pkmask);
+ CHK(set.delrow(par, i) == 0);
+ } else {
+ assert(false);
+ }
+ set.unlock();
+ }
+ }
+ }
+ {
+ ExecType et = NoCommit;
+ CHK(con.execute(et) == 0);
+ set.lock();
+ set.post(par, et);
+ set.unlock();
+ }
+ for (uint k = 0; k < sptcount; k++) {
+ Spt spt = sptlist[k];
+ LL2("spt lm=" << spt.m_lm << " same=" << spt.m_same);
+ CHK(savepointtest(par, spt, &savepointreadpk) == 0);
+ CHK(savepointtest(par, spt, &savepointscantable) == 0);
+ for (uint i = 0; i < tab.m_itabs; i++) {
+ if (tab.m_itab[i] == 0)
+ continue;
+ const ITab& itab = *tab.m_itab[i];
+ par.m_itab = &itab;
+ if (itab.m_type == ITab::OrderedIndex)
+ CHK(savepointtest(par, spt, &savepointscanindex) == 0);
+ else
+ CHK(savepointtest(par, spt, &savepointreadhashindex) == 0);
+ par.m_itab = 0;
+ }
+ }
+ {
+ ExecType et = Rollback;
+ CHK(con.execute(et) == 0);
+ set.lock();
+ set.post(par, et);
+ set.unlock();
+ }
+ con.closeTransaction();
+ return 0;
+}
+
+static int
+savepointtest(Par par)
+{
+ assert(par.m_usedthreads == 1);
+ const char* oplist[] = {
+ // each based on previous and "c" not last
+ "i",
+ "icu",
+ "uuuuu",
+ "d",
+ "dciuuuuud",
+ 0
+ };
+ int i;
+ for (i = 0; oplist[i] != 0; i++) {
+ CHK(savepointtest(par, oplist[i]) == 0);
+ }
+ return 0;
+}
+
+static int
+halloweentest(Par par, const ITab& itab)
+{
+ LL2("halloweentest " << itab.m_name);
+ Con& con = par.con();
+ const Tab& tab = par.tab();
+ Set& set = par.set();
+ CHK(con.startTransaction() == 0);
+ // insert 1 row
+ uint i = 0;
+ set.push(i);
+ set.calc(par, i);
+ CHK(set.insrow(par, i) == 0);
+ CHK(con.execute(NoCommit) == 0);
+ // scan via index until Set m_rows reached
+ uint scancount = 0;
+ bool stop = false;
+ while (!stop) {
+ par.m_lockmode = // makes no difference
+ scancount % 2 == 0 ? NdbOperation::LM_CommittedRead :
+ NdbOperation::LM_Read;
+ Set set1(tab, set.m_rows); // expected scan result
+ Set set2(tab, set.m_rows); // actual scan result
+ BSet bset(tab, itab);
+ calcscanbounds(par, itab, bset, set, set1);
+ CHK(con.getNdbIndexScanOperation(itab, tab) == 0);
+ CHK(con.readIndexTuples(par) == 0);
+ CHK(bset.setbnd(par) == 0);
+ set2.getval(par);
+ CHK(con.executeScan() == 0);
+ const uint savepoint = i;
+ LL3("scancount=" << scancount << " savepoint=" << savepoint);
+ uint n = 0;
+ while (1) {
+ int ret;
+ CHK((ret = con.nextScanResult(true)) == 0 || ret == 1);
+ if (ret == 1)
+ break;
+ uint k = (uint)-1;
+ CHK(set2.getkey(par, &k) == 0);
+ CHK(set2.putval(k, false, n) == 0);
+ LL3("row=" << n << " key=" << k);
+ CHK(k <= savepoint);
+ if (++i == set.m_rows) {
+ stop = true;
+ break;
+ }
+ set.push(i);
+ set.calc(par, i);
+ CHK(set.insrow(par, i) == 0);
+ CHK(con.execute(NoCommit) == 0);
+ n++;
+ }
+ con.closeScan();
+ LL3("scanrows=" << n);
+ if (!stop) {
+ CHK(set1.verify(par, set2, false) == 0);
+ }
+ scancount++;
+ }
+ CHK(con.execute(Commit) == 0);
+ set.post(par, Commit);
+ assert(set.count() == set.m_rows);
+ CHK(pkdelete(par) == 0);
+ return 0;
+}
+
+static int
+halloweentest(Par par)
+{
+ assert(par.m_usedthreads == 1);
+ const Tab& tab = par.tab();
+ for (uint i = 0; i < tab.m_itabs; i++) {
+ if (tab.m_itab[i] == 0)
+ continue;
+ const ITab& itab = *tab.m_itab[i];
+ if (itab.m_type == ITab::OrderedIndex)
+ CHK(halloweentest(par, itab) == 0);
+ }
+ return 0;
+}
+
// threads
typedef int (*TFunc)(Par par);
@@ -4477,7 +4906,7 @@ enum TMode { ST = 1, MT = 2 };
extern "C" { static void* runthread(void* arg); }
struct Thr {
- enum State { Wait, Start, Stop, Stopped, Exit };
+ enum State { Wait, Start, Stop, Exit };
State m_state;
Par m_par;
Uint64 m_id;
@@ -4487,12 +4916,12 @@ struct Thr {
TFunc m_func;
int m_ret;
void* m_status;
- Thr(Par par, unsigned n);
+ char m_tmp[20]; // used for debug msg prefix
+ Thr(Par par, uint n);
~Thr();
int run();
void start();
void stop();
- void stopped();
void exit();
//
void lock() {
@@ -4513,7 +4942,7 @@ struct Thr {
}
};
-Thr::Thr(Par par, unsigned n) :
+Thr::Thr(Par par, uint n) :
m_state(Wait),
m_par(par),
m_id(0),
@@ -4533,7 +4962,7 @@ Thr::Thr(Par par, unsigned n) :
m_cond = NdbCondition_Create();
assert(m_mutex != 0 && m_cond != 0);
// run
- const unsigned stacksize = 256 * 1024;
+ const uint stacksize = 256 * 1024;
const NDB_THREAD_PRIO prio = NDB_THREAD_PRIO_LOW;
m_thread = NdbThread_Create(runthread, (void**)this, stacksize, name, prio);
}
@@ -4589,11 +5018,16 @@ Thr::run()
LL4("start");
assert(m_state == Start);
m_ret = (*m_func)(m_par);
- m_state = Stopped;
+ m_state = Stop;
LL4("stop");
signal();
unlock();
- CHK(m_ret == 0);
+ if (m_ret == -1) {
+ if (m_par.m_cont)
+ LL1("continue running due to -cont");
+ else
+ return -1;
+ }
}
con.disconnect();
return 0;
@@ -4612,16 +5046,7 @@ void
Thr::stop()
{
lock();
- m_state = Stop;
- signal();
- unlock();
-}
-
-void
-Thr::stopped()
-{
- lock();
- while (m_state != Stopped)
+ while (m_state != Stop)
wait();
m_state = Wait;
unlock();
@@ -4640,27 +5065,44 @@ Thr::exit()
static Thr** g_thrlist = 0;
-static unsigned
-getthrno()
+static Thr*
+getthr()
{
if (g_thrlist != 0) {
Uint64 id = (Uint64)pthread_self();
- for (unsigned n = 0; n < g_opt.m_threads; n++) {
+ for (uint n = 0; n < g_opt.m_threads; n++) {
if (g_thrlist[n] != 0) {
- const Thr& thr = *g_thrlist[n];
+ Thr& thr = *g_thrlist[n];
if (thr.m_id == id)
- return thr.m_par.m_no;
+ return &thr;
}
}
}
- return (unsigned)-1;
+ return 0;
+}
+
+// for debug messages (par.m_no not available)
+static const char*
+getthrprefix()
+{
+ Thr* thrp = getthr();
+ if (thrp != 0) {
+ Thr& thr = *thrp;
+ uint n = thr.m_par.m_no;
+ uint m =
+ g_opt.m_threads < 10 ? 1 :
+ g_opt.m_threads < 100 ? 2 : 3;
+ sprintf(thr.m_tmp, "[%0*u] ", m, n);
+ return thr.m_tmp;
+ }
+ return "";
}
static int
-runstep(Par par, const char* fname, TFunc func, unsigned mode)
+runstep(Par par, const char* fname, TFunc func, uint mode)
{
LL2("step: " << fname);
- const int threads = (mode & ST ? 1 : par.m_threads);
+ const int threads = (mode & ST ? 1 : par.m_usedthreads);
int n;
for (n = 0; n < threads; n++) {
LL4("start " << n);
@@ -4673,11 +5115,11 @@ runstep(Par par, const char* fname, TFunc func, unsigned mode)
thr.m_func = func;
thr.start();
}
- unsigned errs = 0;
+ uint errs = 0;
for (n = threads - 1; n >= 0; n--) {
LL4("stop " << n);
Thr& thr = *g_thrlist[n];
- thr.stopped();
+ thr.stop();
if (thr.m_ret != 0)
errs++;
}
@@ -4689,7 +5131,7 @@ runstep(Par par, const char* fname, TFunc func, unsigned mode)
CHK(runstep(par, #func, func, mode) == 0)
#define SUBLOOP(par) \
- "subloop: " << par.m_lno << "/" << par.m_currcase << "/" << \
+ "sloop: " << par.m_lno << "/" << par.m_currcase << "/" << \
par.m_tab->m_name << "/" << par.m_slno
static int
@@ -4698,7 +5140,7 @@ tbuild(Par par)
RUNSTEP(par, droptable, ST);
RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT);
- for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
+ for (par.m_slno = 0; par.m_slno < par.m_sloop; par.m_slno++) {
LL1(SUBLOOP(par));
if (par.m_slno % 3 == 0) {
RUNSTEP(par, createindex, ST);
@@ -4718,7 +5160,7 @@ tbuild(Par par)
}
RUNSTEP(par, readverifyfull, MT);
// leave last one
- if (par.m_slno + 1 < par.m_subloop) {
+ if (par.m_slno + 1 < par.m_sloop) {
RUNSTEP(par, pkdelete, MT);
RUNSTEP(par, readverifyfull, MT);
RUNSTEP(par, dropindex, ST);
@@ -4737,7 +5179,7 @@ tindexscan(Par par)
RUNSTEP(par, invalidateindex, MT);
RUNSTEP(par, pkinsert, MT);
RUNSTEP(par, readverifyfull, MT);
- for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
+ for (par.m_slno = 0; par.m_slno < par.m_sloop; par.m_slno++) {
LL1(SUBLOOP(par));
RUNSTEP(par, readverifyindex, MT);
}
@@ -4753,7 +5195,7 @@ tpkops(Par par)
RUNSTEP(par, invalidatetable, MT);
RUNSTEP(par, createindex, ST);
RUNSTEP(par, invalidateindex, MT);
- for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
+ for (par.m_slno = 0; par.m_slno < par.m_sloop; par.m_slno++) {
LL1(SUBLOOP(par));
RUNSTEP(par, pkops, MT);
LL2("rows=" << par.set().count());
@@ -4772,7 +5214,7 @@ tpkopsread(Par par)
RUNSTEP(par, createindex, ST);
RUNSTEP(par, invalidateindex, MT);
RUNSTEP(par, readverifyfull, MT);
- for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
+ for (par.m_slno = 0; par.m_slno < par.m_sloop; par.m_slno++) {
LL1(SUBLOOP(par));
RUNSTEP(par, pkupdatescanread, MT);
RUNSTEP(par, readverifyfull, MT);
@@ -4792,7 +5234,7 @@ tmixedops(Par par)
RUNSTEP(par, createindex, ST);
RUNSTEP(par, invalidateindex, MT);
RUNSTEP(par, readverifyfull, MT);
- for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
+ for (par.m_slno = 0; par.m_slno < par.m_sloop; par.m_slno++) {
LL1(SUBLOOP(par));
RUNSTEP(par, mixedoperations, MT);
RUNSTEP(par, readverifyfull, MT);
@@ -4807,7 +5249,7 @@ tbusybuild(Par par)
RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT);
RUNSTEP(par, pkinsert, MT);
- for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
+ for (par.m_slno = 0; par.m_slno < par.m_sloop; par.m_slno++) {
LL1(SUBLOOP(par));
RUNSTEP(par, pkupdateindexbuild, MT);
RUNSTEP(par, invalidateindex, MT);
@@ -4828,7 +5270,7 @@ trollback(Par par)
RUNSTEP(par, createindex, ST);
RUNSTEP(par, invalidateindex, MT);
RUNSTEP(par, readverifyfull, MT);
- for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
+ for (par.m_slno = 0; par.m_slno < par.m_sloop; par.m_slno++) {
LL1(SUBLOOP(par));
RUNSTEP(par, mixedoperations, MT);
RUNSTEP(par, readverifyfull, MT);
@@ -4846,7 +5288,7 @@ tparupdate(Par par)
RUNSTEP(par, createindex, ST);
RUNSTEP(par, invalidateindex, MT);
RUNSTEP(par, readverifyfull, MT);
- for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
+ for (par.m_slno = 0; par.m_slno < par.m_sloop; par.m_slno++) {
LL1(SUBLOOP(par));
RUNSTEP(par, parallelorderedupdate, MT);
RUNSTEP(par, readverifyfull, MT);
@@ -4855,13 +5297,44 @@ tparupdate(Par par)
}
static int
+tsavepoint(Par par)
+{
+ RUNSTEP(par, droptable, ST);
+ RUNSTEP(par, createtable, ST);
+ RUNSTEP(par, invalidatetable, MT);
+ RUNSTEP(par, createindex, ST);
+ RUNSTEP(par, invalidateindex, MT);
+ for (par.m_slno = 0; par.m_slno < par.m_sloop; par.m_slno++) {
+ LL1(SUBLOOP(par));
+ RUNSTEP(par, savepointtest, MT);
+ RUNSTEP(par, readverifyfull, MT);
+ }
+ return 0;
+}
+
+static int
+thalloween(Par par)
+{
+ RUNSTEP(par, droptable, ST);
+ RUNSTEP(par, createtable, ST);
+ RUNSTEP(par, invalidatetable, MT);
+ RUNSTEP(par, createindex, ST);
+ RUNSTEP(par, invalidateindex, MT);
+ for (par.m_slno = 0; par.m_slno < par.m_sloop; par.m_slno++) {
+ LL1(SUBLOOP(par));
+ RUNSTEP(par, halloweentest, MT);
+ }
+ return 0;
+}
+
+static int
ttimebuild(Par par)
{
Tmr t1;
RUNSTEP(par, droptable, ST);
RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT);
- for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
+ for (par.m_slno = 0; par.m_slno < par.m_sloop; par.m_slno++) {
LL1(SUBLOOP(par));
RUNSTEP(par, pkinsert, MT);
t1.on();
@@ -4881,7 +5354,7 @@ ttimemaint(Par par)
RUNSTEP(par, droptable, ST);
RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT);
- for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
+ for (par.m_slno = 0; par.m_slno < par.m_sloop; par.m_slno++) {
LL1(SUBLOOP(par));
RUNSTEP(par, pkinsert, MT);
t1.on();
@@ -4911,7 +5384,7 @@ ttimescan(Par par)
RUNSTEP(par, droptable, ST);
RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT);
- for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
+ for (par.m_slno = 0; par.m_slno < par.m_sloop; par.m_slno++) {
LL1(SUBLOOP(par));
RUNSTEP(par, pkinsert, MT);
RUNSTEP(par, createindex, ST);
@@ -4938,7 +5411,7 @@ ttimepkread(Par par)
RUNSTEP(par, droptable, ST);
RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT);
- for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
+ for (par.m_slno = 0; par.m_slno < par.m_sloop; par.m_slno++) {
LL1(SUBLOOP(par));
RUNSTEP(par, pkinsert, MT);
RUNSTEP(par, createindex, ST);
@@ -4981,7 +5454,9 @@ tcaselist[] = {
TCase("e", tmixedops, "pk operations and scan operations"),
TCase("f", tbusybuild, "pk operations and index build"),
TCase("g", trollback, "operations with random rollbacks"),
- TCase("h", tparupdate, "parallel ordered update (bug20446)"),
+ TCase("h", tparupdate, "parallel ordered update bug#20446"),
+ TCase("i", tsavepoint, "savepoint test locking bug#31477"),
+ TCase("j", thalloween, "savepoint test halloween problem"),
TCase("t", ttimebuild, "time index build"),
TCase("u", ttimemaint, "time index maintenance"),
TCase("v", ttimescan, "time full scan table vs index on pk"),
@@ -4989,14 +5464,14 @@ tcaselist[] = {
TCase("z", tdrop, "drop test tables")
};
-static const unsigned
+static const uint
tcasecount = sizeof(tcaselist) / sizeof(tcaselist[0]);
static void
printcases()
{
ndbout << "test cases:" << endl;
- for (unsigned i = 0; i < tcasecount; i++) {
+ for (uint i = 0; i < tcasecount; i++) {
const TCase& tcase = tcaselist[i];
ndbout << " " << tcase.m_name << " - " << tcase.m_desc << endl;
}
@@ -5008,13 +5483,13 @@ printtables()
Par par(g_opt);
makebuiltintables(par);
ndbout << "tables and indexes (x=ordered z=hash x0=on pk):" << endl;
- for (unsigned j = 0; j < tabcount; j++) {
+ for (uint j = 0; j < tabcount; j++) {
if (tablist[j] == 0)
continue;
const Tab& tab = *tablist[j];
const char* tname = tab.m_name;
ndbout << " " << tname;
- for (unsigned i = 0; i < tab.m_itabs; i++) {
+ for (uint i = 0; i < tab.m_itabs; i++) {
if (tab.m_itab[i] == 0)
continue;
const ITab& itab = *tab.m_itab[i];
@@ -5023,7 +5498,7 @@ printtables()
iname += strlen(tname);
ndbout << " " << iname;
ndbout << "(";
- for (unsigned k = 0; k < itab.m_icols; k++) {
+ for (uint k = 0; k < itab.m_icols; k++) {
if (k != 0)
ndbout << ",";
const ICol& icol = *itab.m_icol[k];
@@ -5036,14 +5511,48 @@ printtables()
}
}
+static bool
+setcasepar(Par& par)
+{
+ Opt d;
+ const char* c = par.m_currcase;
+ switch (c[0]) {
+ case 'i':
+ {
+ if (par.m_usedthreads > 1) {
+ par.m_usedthreads = 1;
+ LL1("case " << c << " reduce threads to " << par.m_usedthreads);
+ }
+ const uint rows = 100;
+ if (par.m_rows > rows) {
+ par.m_rows = rows;
+ LL1("case " << c << " reduce rows to " << rows);
+ }
+ }
+ break;
+ case 'j':
+ {
+ if (par.m_usedthreads > 1) {
+ par.m_usedthreads = 1;
+ LL1("case " << c << " reduce threads to " << par.m_usedthreads);
+ }
+ }
+ break;
+ default:
+ break;
+ }
+ return true;
+}
+
static int
runtest(Par par)
{
+ int totret = 0;
if (par.m_seed == -1) {
// good enough for daily run
- unsigned short seed = (unsigned short)getpid();
+ ushort seed = (ushort)getpid();
LL0("random seed: " << seed);
- srandom((unsigned)seed);
+ srandom((uint)seed);
} else if (par.m_seed != 0) {
LL0("random seed: " << par.m_seed);
srandom(par.m_seed);
@@ -5061,9 +5570,10 @@ runtest(Par par)
Con con;
CHK(con.connect() == 0);
par.m_con = &con;
+ par.m_catcherr |= Con::ErrNospace;
// threads
g_thrlist = new Thr* [par.m_threads];
- unsigned n;
+ uint n;
for (n = 0; n < par.m_threads; n++) {
g_thrlist[n] = 0;
}
@@ -5078,23 +5588,38 @@ runtest(Par par)
LL1("random seed: " << par.m_lno);
srandom(par.m_lno);
}
- for (unsigned i = 0; i < tcasecount; i++) {
+ for (uint i = 0; i < tcasecount; i++) {
const TCase& tcase = tcaselist[i];
- if (par.m_case != 0 && strchr(par.m_case, tcase.m_name[0]) == 0)
+ if (par.m_case != 0 && strchr(par.m_case, tcase.m_name[0]) == 0 ||
+ par.m_skip != 0 && strchr(par.m_skip, tcase.m_name[0]) != 0) {
continue;
+ }
sprintf(par.m_currcase, "%c", tcase.m_name[0]);
+ par.m_usedthreads = par.m_threads;
+ if (!setcasepar(par)) {
+ LL1("case " << tcase.m_name << " cannot run with given options");
+ continue;
+ }
+ par.m_totrows = par.m_usedthreads * par.m_rows;
makebuiltintables(par);
LL1("case: " << par.m_lno << "/" << tcase.m_name << " - " << tcase.m_desc);
- for (unsigned j = 0; j < tabcount; j++) {
+ for (uint j = 0; j < tabcount; j++) {
if (tablist[j] == 0)
continue;
const Tab& tab = *tablist[j];
par.m_tab = &tab;
par.m_set = new Set(tab, par.m_totrows);
LL1("table: " << par.m_lno << "/" << tcase.m_name << "/" << tab.m_name);
- CHK(tcase.m_func(par) == 0);
+ int ret = tcase.m_func(par);
delete par.m_set;
par.m_set = 0;
+ if (ret == -1) {
+ if (!par.m_cont)
+ return -1;
+ totret = -1;
+ LL1("continue to next case due to -cont");
+ break;
+ }
}
}
}
@@ -5110,7 +5635,7 @@ runtest(Par par)
delete [] g_thrlist;
g_thrlist = 0;
con.disconnect();
- return 0;
+ return totret;
}
static const char* g_progname = "testOIBasic";
@@ -5119,7 +5644,7 @@ int
main(int argc, char** argv)
{
ndb_init();
- unsigned i;
+ uint i;
ndbout << g_progname;
for (i = 1; i < argc; i++)
ndbout << " " << argv[i];
@@ -5156,6 +5681,10 @@ main(int argc, char** argv)
g_opt.m_collsp = true;
continue;
}
+ if (strcmp(arg, "-cont") == 0) {
+ g_opt.m_cont = true;
+ continue;
+ }
if (strcmp(arg, "-core") == 0) {
g_opt.m_core = true;
continue;
@@ -5252,9 +5781,21 @@ main(int argc, char** argv)
continue;
}
}
- if (strcmp(arg, "-subloop") == 0) {
+ if (strcmp(arg, "-skip") == 0) {
+ if (++argv, --argc > 0) {
+ g_opt.m_skip = strdup(argv[0]);
+ continue;
+ }
+ }
+ if (strcmp(arg, "-sloop") == 0) {
+ if (++argv, --argc > 0) {
+ g_opt.m_sloop = atoi(argv[0]);
+ continue;
+ }
+ }
+ if (strcmp(arg, "-ssloop") == 0) {
if (++argv, --argc > 0) {
- g_opt.m_subloop = atoi(argv[0]);
+ g_opt.m_ssloop = atoi(argv[0]);
continue;
}
}