diff options
author | unknown <joreland@mysql.com> | 2004-12-06 14:52:31 +0100 |
---|---|---|
committer | unknown <joreland@mysql.com> | 2004-12-06 14:52:31 +0100 |
commit | bf0b3493d695ec32814017be1f2da059b65982c0 (patch) | |
tree | 3071885324d9cba2e6369bdea65d4d7f3c56ab03 /ndb | |
parent | eb05d78dcb8074a5dfceedf8c86681531fc3bfbe (diff) | |
parent | b1f4a482f4545999d6210aabdc2c0a80ee574374 (diff) | |
download | mariadb-git-bf0b3493d695ec32814017be1f2da059b65982c0.tar.gz |
Merge mysql.com:/home/jonas/src/mysql-4.1-fix
into mysql.com:/home/jonas/src/wl1744
BitKeeper/etc/logging_ok:
auto-union
configure.in:
Auto merged
ndb/include/Makefile.am:
Auto merged
ndb/src/common/mgmcommon/ConfigRetriever.cpp:
Auto merged
ndb/src/common/util/version.c:
Auto merged
ndb/src/kernel/blocks/dbdict/Dbdict.cpp:
Auto merged
ndb/src/kernel/blocks/dbdict/Dbdict.hpp:
Auto merged
ndb/src/kernel/blocks/dbdih/Dbdih.hpp:
Auto merged
ndb/src/kernel/blocks/dbdih/DbdihMain.cpp:
Auto merged
ndb/src/kernel/blocks/dbtc/DbtcMain.cpp:
Auto merged
ndb/src/mgmsrv/main.cpp:
Auto merged
ndb/src/ndbapi/NdbConnection.cpp:
Auto merged
sql/ha_ndbcluster.cc:
Auto merged
Diffstat (limited to 'ndb')
47 files changed, 1160 insertions, 793 deletions
diff --git a/ndb/docs/wl2077.txt b/ndb/docs/wl2077.txt new file mode 100644 index 00000000000..5a77c18aa2a --- /dev/null +++ b/ndb/docs/wl2077.txt @@ -0,0 +1,35 @@ + +100' * (select 1 from T1 (1M rows) where key = rand()); +1 host, 1 ndbd, api co-hosted +results in 1000 rows / sec + + wo/reset bounds w/ rb +4.1-read committed a) 4.9 b) 7.4 +4.1-read hold lock c) 4.7 d) 6.7 + +wl2077-read committed 6.4 (+30%) 10.8 (+45%) +wl2077-read hold lock 4.6 (-1%) 6.7 (+ 0%) + +-- Comparision e) +serial pk: 10.9' +batched (1000): 59' +serial uniq index: 8.4' +batched (1000): 33' +index range (1000): 186' + +---- + +load) testScanPerf -c 1 -d 1 T1 +a) testScanPerf -s 100000 -c 0 -d 0 -a 1 -l 0 -r 2 -q 0 T1 +b) testScanPerf -s 100000 -c 0 -d 0 -a 1 -l 0 -r 2 -q 1 T1 +c) testScanPerf -s 100000 -c 0 -d 0 -a 1 -l 1 -r 2 -q 0 T1 +d) testScanPerf -s 100000 -c 0 -d 0 -a 1 -l 1 -r 2 -q 1 T1 +e) testReadPerf -i 25 -c 0 -d 0 T1 + +--- music join 1db-co 2db-co + +4.1 13s 14s +4.1 wo/ blobs 1.7s 3.2s + +wl2077 12s 14s +wl2077 wo/ blobs 1.2s (-30%) 2.5s (-22%) diff --git a/ndb/include/Makefile.am b/ndb/include/Makefile.am index 56f71ae6d0b..b1e72dacffd 100644 --- a/ndb/include/Makefile.am +++ b/ndb/include/Makefile.am @@ -28,6 +28,7 @@ ndbapi/NdbIndexScanOperation.hpp \ ndbapi/ndberror.h mgmapiinclude_HEADERS = \ +mgmapi/LocalConfig.hpp \ mgmapi/mgmapi.h \ mgmapi/mgmapi_debug.h diff --git a/ndb/include/kernel/signaldata/TupFrag.hpp b/ndb/include/kernel/signaldata/TupFrag.hpp index c1e861c5dff..c132b19c50a 100644 --- a/ndb/include/kernel/signaldata/TupFrag.hpp +++ b/ndb/include/kernel/signaldata/TupFrag.hpp @@ -132,9 +132,10 @@ class TupAddAttrConf { friend class Dblqh; friend class Dbtup; public: - STATIC_CONST( SignalLength = 1 ); + STATIC_CONST( SignalLength = 2 ); private: Uint32 userPtr; + Uint32 lastAttr; // bool: got last attr and closed frag op }; class TupAddAttrRef { @@ -171,9 +172,10 @@ class TuxAddAttrConf { friend class Dblqh; friend class Dbtux; public: - STATIC_CONST( SignalLength = 1 ); + STATIC_CONST( SignalLength = 2 ); private: Uint32 userPtr; + Uint32 lastAttr; // bool: got last attr and closed frag op }; class TuxAddAttrRef { diff --git a/ndb/include/ndbapi/NdbScanOperation.hpp b/ndb/include/ndbapi/NdbScanOperation.hpp index 2e4d173ac75..5689b62526c 100644 --- a/ndb/include/ndbapi/NdbScanOperation.hpp +++ b/ndb/include/ndbapi/NdbScanOperation.hpp @@ -127,14 +127,23 @@ protected: NdbReceiver** m_receivers; // All receivers Uint32* m_prepared_receivers; // These are to be sent - + + /** + * owned by API/user thread + */ Uint32 m_current_api_receiver; Uint32 m_api_receivers_count; NdbReceiver** m_api_receivers; // These are currently used by api + /** + * owned by receiver thread + */ Uint32 m_conf_receivers_count; // NOTE needs mutex to access NdbReceiver** m_conf_receivers; // receive thread puts them here + /** + * owned by receiver thread + */ Uint32 m_sent_receivers_count; // NOTE needs mutex to access NdbReceiver** m_sent_receivers; // receive thread puts them here diff --git a/ndb/src/common/debugger/signaldata/ScanTab.cpp b/ndb/src/common/debugger/signaldata/ScanTab.cpp index 72a4d9f94b9..0755ee0a856 100644 --- a/ndb/src/common/debugger/signaldata/ScanTab.cpp +++ b/ndb/src/common/debugger/signaldata/ScanTab.cpp @@ -30,13 +30,14 @@ printSCANTABREQ(FILE * output, const Uint32 * theData, Uint32 len, Uint16 receiv fprintf(output, " apiConnectPtr: H\'%.8x", sig->apiConnectPtr); fprintf(output, " requestInfo: H\'%.8x:\n", requestInfo); - fprintf(output, " Parallellism: %u, Batch: %u LockMode: %u, Keyinfo: %u Holdlock: %u, RangeScan: %u\n", + fprintf(output, " Parallellism: %u, Batch: %u LockMode: %u, Keyinfo: %u Holdlock: %u, RangeScan: %u ReadCommitted: %u\n", sig->getParallelism(requestInfo), sig->getScanBatch(requestInfo), sig->getLockMode(requestInfo), + sig->getKeyinfoFlag(requestInfo), sig->getHoldLockFlag(requestInfo), sig->getRangeScanFlag(requestInfo), - sig->getKeyinfoFlag(requestInfo)); + sig->getReadCommittedFlag(requestInfo)); Uint32 keyLen = (sig->attrLenKeyLen >> 16); Uint32 attrLen = (sig->attrLenKeyLen & 0xFFFF); diff --git a/ndb/src/common/mgmcommon/ConfigRetriever.cpp b/ndb/src/common/mgmcommon/ConfigRetriever.cpp index d03d6fbdb6e..06cc7a9cce0 100644 --- a/ndb/src/common/mgmcommon/ConfigRetriever.cpp +++ b/ndb/src/common/mgmcommon/ConfigRetriever.cpp @@ -241,7 +241,8 @@ ConfigRetriever::verifyConfig(const struct ndb_mgm_configuration * conf, Uint32 char buf[255]; ndb_mgm_configuration_iterator * it; - it = ndb_mgm_create_configuration_iterator((struct ndb_mgm_configuration *)conf, CFG_SECTION_NODE); + it = ndb_mgm_create_configuration_iterator((struct ndb_mgm_configuration *)conf, + CFG_SECTION_NODE); if(it == 0){ BaseString::snprintf(buf, 255, "Unable to create config iterator"); @@ -287,8 +288,14 @@ ConfigRetriever::verifyConfig(const struct ndb_mgm_configuration * conf, Uint32 } if(_type != m_node_type){ - BaseString::snprintf(buf, 255, "Supplied node type(%d) and config node type(%d) " - " don't match", m_node_type, _type); + const char *type_s, *alias_s, *type_s2, *alias_s2; + alias_s= ndb_mgm_get_node_type_alias_string((enum ndb_mgm_node_type)m_node_type, + &type_s); + alias_s2= ndb_mgm_get_node_type_alias_string((enum ndb_mgm_node_type)_type, + &type_s2); + BaseString::snprintf(buf, 255, "This node type %s(%s) and config " + "node type %s(%s) don't match for nodeid %d", + alias_s, type_s, alias_s2, type_s2, nodeid); setError(CR_ERROR, buf); return false; } diff --git a/ndb/src/common/util/NdbSqlUtil.cpp b/ndb/src/common/util/NdbSqlUtil.cpp index 6e4e5919e43..5b2381df50a 100644 --- a/ndb/src/common/util/NdbSqlUtil.cpp +++ b/ndb/src/common/util/NdbSqlUtil.cpp @@ -582,7 +582,7 @@ NdbSqlUtil::usable_in_pk(Uint32 typeId, const void* info) cs->cset != 0 && cs->coll != 0 && cs->coll->strnxfrm != 0 && - cs->strxfrm_multiply == 1; // current limitation + cs->strxfrm_multiply <= 1; // current limitation } break; case Type::Varchar: @@ -618,7 +618,7 @@ NdbSqlUtil::usable_in_ordered_index(Uint32 typeId, const void* info) cs->coll != 0 && cs->coll->strnxfrm != 0 && cs->coll->strnncollsp != 0 && - cs->strxfrm_multiply == 1; // current limitation + cs->strxfrm_multiply <= 1; // current limitation } break; case Type::Varchar: @@ -633,7 +633,7 @@ NdbSqlUtil::usable_in_ordered_index(Uint32 typeId, const void* info) cs->coll != 0 && cs->coll->strnxfrm != 0 && cs->coll->strnncollsp != 0 && - cs->strxfrm_multiply == 1; // current limitation + cs->strxfrm_multiply <= 1; // current limitation } break; default: diff --git a/ndb/src/common/util/version.c b/ndb/src/common/util/version.c index dc549457d14..84ea2ea5226 100644 --- a/ndb/src/common/util/version.c +++ b/ndb/src/common/util/version.c @@ -71,7 +71,6 @@ struct NdbUpGradeCompatible { #ifndef TEST_VERSION struct NdbUpGradeCompatible ndbCompatibleTable_full[] = { { MAKE_VERSION(3,5,2), MAKE_VERSION(3,5,1), UG_Exact }, - { MAKE_VERSION(4,1,8), MAKE_VERSION(3,5,4), UG_Exact }, /* Aligned version with MySQL */ { 0, 0, UG_Null } }; diff --git a/ndb/src/kernel/blocks/ERROR_codes.txt b/ndb/src/kernel/blocks/ERROR_codes.txt index 70f11c33cd7..7ff03684cff 100644 --- a/ndb/src/kernel/blocks/ERROR_codes.txt +++ b/ndb/src/kernel/blocks/ERROR_codes.txt @@ -2,7 +2,7 @@ Next QMGR 1 Next NDBCNTR 1000 Next NDBFS 2000 Next DBACC 3001 -Next DBTUP 4007 +Next DBTUP 4013 Next DBLQH 5042 Next DBDICT 6006 Next DBDIH 7174 @@ -10,7 +10,7 @@ Next DBTC 8035 Next CMVMI 9000 Next BACKUP 10022 Next DBUTIL 11002 -Next DBTUX 12001 +Next DBTUX 12007 Next SUMA 13001 TESTING NODE FAILURE, ARBITRATION @@ -393,6 +393,12 @@ Failed Create Table: -------------------- 7173: Create table failed due to not sufficient number of fragment or replica records. +4007 12001: Fail create 1st fragment +4008 12002: Fail create 2nd fragment +4009 12003: Fail create 1st attribute in 1st fragment +4010 12004: Fail create last attribute in 1st fragment +4011 12005: Fail create 1st attribute in 2nd fragment +4012 12006: Fail create last attribute in 2nd fragment Drop Table/Index: ----------------- diff --git a/ndb/src/kernel/blocks/dbdict/Dbdict.cpp b/ndb/src/kernel/blocks/dbdict/Dbdict.cpp index 35ea878f94b..8dca303f7f8 100644 --- a/ndb/src/kernel/blocks/dbdict/Dbdict.cpp +++ b/ndb/src/kernel/blocks/dbdict/Dbdict.cpp @@ -239,7 +239,11 @@ Dbdict::packTableIntoPagesImpl(SimpleProperties::Writer & w, w.add(DictTabInfo::TableName, tablePtr.p->tableName); w.add(DictTabInfo::TableId, tablePtr.i); +#ifdef HAVE_TABLE_REORG w.add(DictTabInfo::SecondTableId, tablePtr.p->secondTable); +#else + w.add(DictTabInfo::SecondTableId, (Uint32)0); +#endif w.add(DictTabInfo::TableVersion, tablePtr.p->tableVersion); w.add(DictTabInfo::NoOfKeyAttr, tablePtr.p->noOfPrimkey); w.add(DictTabInfo::NoOfAttributes, tablePtr.p->noOfAttributes); @@ -1436,6 +1440,7 @@ Uint32 Dbdict::getFreeTableRecord(Uint32 primaryTableId) jam(); return RNIL; }//if +#ifdef HAVE_TABLE_REORG bool secondFound = false; for (tablePtr.i = firstTablePtr.i + 1; tablePtr.i < tabSize ; tablePtr.i++) { jam(); @@ -1455,6 +1460,7 @@ Uint32 Dbdict::getFreeTableRecord(Uint32 primaryTableId) firstTablePtr.p->tabState = TableRecord::NOT_DEFINED; return RNIL; }//if +#endif return firstTablePtr.i; }//Dbdict::getFreeTableRecord() @@ -4623,7 +4629,7 @@ void Dbdict::handleTabInfoInit(SimpleProperties::Reader & it, jam(); tablePtr.p->tabState = TableRecord::DEFINING; }//if - +#ifdef HAVE_TABLE_REORG /* ---------------------------------------------------------------- */ // Get id of second table id and check that table doesn't already exist // and set up links between first and second table. @@ -4637,7 +4643,7 @@ void Dbdict::handleTabInfoInit(SimpleProperties::Reader & it, secondTablePtr.p->tabState = TableRecord::REORG_TABLE_PREPARED; secondTablePtr.p->secondTable = tablePtr.i; tablePtr.p->secondTable = secondTablePtr.i; - +#endif /* ---------------------------------------------------------------- */ // Set table version /* ---------------------------------------------------------------- */ @@ -5535,10 +5541,12 @@ void Dbdict::releaseTableObject(Uint32 tableId, bool removeFromHash) nextAttrRecord = attrPtr.p->nextAttrInTable; c_attributeRecordPool.release(attrPtr); }//if +#ifdef HAVE_TABLE_REORG Uint32 secondTableId = tablePtr.p->secondTable; initialiseTableRecord(tablePtr); c_tableRecordPool.getPtr(tablePtr, secondTableId); initialiseTableRecord(tablePtr); +#endif return; }//releaseTableObject() diff --git a/ndb/src/kernel/blocks/dbdict/Dbdict.hpp b/ndb/src/kernel/blocks/dbdict/Dbdict.hpp index 5a61c0739a6..5fc4742e829 100644 --- a/ndb/src/kernel/blocks/dbdict/Dbdict.hpp +++ b/ndb/src/kernel/blocks/dbdict/Dbdict.hpp @@ -151,10 +151,10 @@ public: /* Temporary record used during add/drop table */ Uint32 myConnect; - +#ifdef HAVE_TABLE_REORG /* Second table used by this table (for table reorg) */ Uint32 secondTable; - +#endif /* Next record in Pool */ Uint32 nextPool; diff --git a/ndb/src/kernel/blocks/dbdih/Dbdih.hpp b/ndb/src/kernel/blocks/dbdih/Dbdih.hpp index 66acdf32059..ee67bf47d7b 100644 --- a/ndb/src/kernel/blocks/dbdih/Dbdih.hpp +++ b/ndb/src/kernel/blocks/dbdih/Dbdih.hpp @@ -147,7 +147,6 @@ public: Uint32 nfConnect; Uint32 table; Uint32 userpointer; - Uint32 nodeCount; BlockReference userblockref; }; typedef Ptr<ConnectRecord> ConnectRecordPtr; diff --git a/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp b/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp index 9fd69378359..a8ad4008a74 100644 --- a/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp +++ b/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp @@ -7080,24 +7080,22 @@ void Dbdih::execDIGETPRIMREQ(Signal* signal) ndbrequire(tabPtr.p->tabStatus == TabRecord::TS_ACTIVE); connectPtr.i = signal->theData[0]; - if(connectPtr.i != RNIL){ + if(connectPtr.i != RNIL) + { jam(); ptrCheckGuard(connectPtr, cconnectFileSize, connectRecord); - ndbrequire(connectPtr.p->connectState == ConnectRecord::INUSE); - getFragstore(tabPtr.p, fragId, fragPtr); - connectPtr.p->nodeCount = extractNodeInfo(fragPtr.p, connectPtr.p->nodes); signal->theData[0] = connectPtr.p->userpointer; - signal->theData[1] = passThrough; - signal->theData[2] = connectPtr.p->nodes[0]; - sendSignal(connectPtr.p->userblockref, GSN_DIGETPRIMCONF, signal, 3, JBB); - return; - }//if - //connectPtr.i == RNIL -> question without connect record + } + else + { + jam(); + signal->theData[0] = RNIL; + } + Uint32 nodes[MAX_REPLICAS]; getFragstore(tabPtr.p, fragId, fragPtr); Uint32 count = extractNodeInfo(fragPtr.p, nodes); - signal->theData[0] = RNIL; signal->theData[1] = passThrough; signal->theData[2] = nodes[0]; signal->theData[3] = nodes[1]; diff --git a/ndb/src/kernel/blocks/dblqh/Dblqh.hpp b/ndb/src/kernel/blocks/dblqh/Dblqh.hpp index d6987f3e478..0c63cb5fe17 100644 --- a/ndb/src/kernel/blocks/dblqh/Dblqh.hpp +++ b/ndb/src/kernel/blocks/dblqh/Dblqh.hpp @@ -550,6 +550,11 @@ public: UintR scanErrorCounter; UintR scanLocalFragid; UintR scanSchemaVersion; + + /** + * This is _always_ main table, even in range scan + * in which case scanTcrec->fragmentptr is different + */ Uint32 fragPtrI; UintR scanStoredProcId; ScanState scanState; @@ -2474,7 +2479,7 @@ private: void sendExecFragRefLab(Signal* signal); void fragrefLab(Signal* signal, BlockReference retRef, Uint32 retPtr, Uint32 errorCode); - void accFragRefLab(Signal* signal); + void abortAddFragOps(Signal* signal); void rwConcludedLab(Signal* signal); void sendsttorryLab(Signal* signal); void initialiseRecordsLab(Signal* signal, Uint32 data, Uint32, Uint32); @@ -2925,4 +2930,23 @@ Dblqh::ScanRecord::check_scan_batch_completed() const (max_bytes > 0 && (m_curr_batch_size_bytes >= max_bytes)); } +inline +void +Dblqh::i_get_acc_ptr(ScanRecord* scanP, Uint32* &acc_ptr, Uint32 index) +{ + if (index == 0) { + acc_ptr= (Uint32*)&scanP->scan_acc_op_ptr[0]; + } else { + Uint32 attr_buf_index, attr_buf_rec; + + AttrbufPtr regAttrPtr; + jam(); + attr_buf_rec= (index + 31) / 32; + attr_buf_index= (index - 1) & 31; + regAttrPtr.i= scanP->scan_acc_op_ptr[attr_buf_rec]; + ptrCheckGuard(regAttrPtr, cattrinbufFileSize, attrbuf); + acc_ptr= (Uint32*)®AttrPtr.p->attrbuf[attr_buf_index]; + } +} + #endif diff --git a/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp b/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp index af1131e5e55..8bbbc72a38d 100644 --- a/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp +++ b/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp @@ -912,6 +912,10 @@ void Dblqh::execREAD_CONFIG_REQ(Signal* signal) /* *********************************************************> */ /* LQHFRAGREQ: Create new fragments for a table. Sender DICT */ /* *********************************************************> */ + +// this unbelievable mess could be replaced by one signal to LQH +// and execute direct to local DICT to get everything at once + void Dblqh::execLQHFRAGREQ(Signal* signal) { jamEntry(); @@ -1049,6 +1053,11 @@ void Dblqh::execLQHFRAGREQ(Signal* signal) addfragptr.p->lh3DistrBits = tlhstar; addfragptr.p->tableType = tableType; addfragptr.p->primaryTableId = primaryTableId; + // + addfragptr.p->tup1Connectptr = RNIL; + addfragptr.p->tup2Connectptr = RNIL; + addfragptr.p->tux1Connectptr = RNIL; + addfragptr.p->tux2Connectptr = RNIL; if (DictTabInfo::isTable(tableType) || DictTabInfo::isHashIndex(tableType)) { @@ -1329,15 +1338,21 @@ void Dblqh::execTUP_ADD_ATTCONF(Signal* signal) { jamEntry(); addfragptr.i = signal->theData[0]; + // implies that operation was released on the other side + const bool lastAttr = signal->theData[1]; ptrCheckGuard(addfragptr, caddfragrecFileSize, addFragRecord); switch (addfragptr.p->addfragStatus) { case AddFragRecord::TUP_ATTR_WAIT1: jam(); + if (lastAttr) + addfragptr.p->tup1Connectptr = RNIL; addfragptr.p->addfragStatus = AddFragRecord::TUP_ATTR_WAIT2; sendAddAttrReq(signal); break; case AddFragRecord::TUP_ATTR_WAIT2: jam(); + if (lastAttr) + addfragptr.p->tup2Connectptr = RNIL; if (DictTabInfo::isOrderedIndex(addfragptr.p->tableType)) { addfragptr.p->addfragStatus = AddFragRecord::TUX_ATTR_WAIT1; sendAddAttrReq(signal); @@ -1347,11 +1362,15 @@ void Dblqh::execTUP_ADD_ATTCONF(Signal* signal) break; case AddFragRecord::TUX_ATTR_WAIT1: jam(); + if (lastAttr) + addfragptr.p->tux1Connectptr = RNIL; addfragptr.p->addfragStatus = AddFragRecord::TUX_ATTR_WAIT2; sendAddAttrReq(signal); break; case AddFragRecord::TUX_ATTR_WAIT2: jam(); + if (lastAttr) + addfragptr.p->tux2Connectptr = RNIL; goto done_with_attr; break; done_with_attr: @@ -1455,6 +1474,7 @@ Dblqh::sendAddAttrReq(Signal* signal) jam(); TupAddAttrConf* tupconf = (TupAddAttrConf*)signal->getDataPtrSend(); tupconf->userPtr = addfragptr.i; + tupconf->lastAttr = false; sendSignal(reference(), GSN_TUP_ADD_ATTCONF, signal, TupAddAttrConf::SignalLength, JBB); return; @@ -1485,6 +1505,7 @@ Dblqh::sendAddAttrReq(Signal* signal) jam(); TuxAddAttrConf* tuxconf = (TuxAddAttrConf*)signal->getDataPtrSend(); tuxconf->userPtr = addfragptr.i; + tuxconf->lastAttr = false; sendSignal(reference(), GSN_TUX_ADD_ATTRCONF, signal, TuxAddAttrConf::SignalLength, JBB); return; @@ -1549,6 +1570,40 @@ void Dblqh::fragrefLab(Signal* signal, return; }//Dblqh::fragrefLab() +/* + * Abort on-going ops. + */ +void Dblqh::abortAddFragOps(Signal* signal) +{ + fragptr.i = addfragptr.p->fragmentPtr; + ptrCheckGuard(fragptr, cfragrecFileSize, fragrecord); + signal->theData[0] = (Uint32)-1; + if (addfragptr.p->tup1Connectptr != RNIL) { + jam(); + signal->theData[1] = addfragptr.p->tup1Connectptr; + sendSignal(fragptr.p->tupBlockref, GSN_TUPFRAGREQ, signal, 2, JBB); + addfragptr.p->tup1Connectptr = RNIL; + } + if (addfragptr.p->tup2Connectptr != RNIL) { + jam(); + signal->theData[1] = addfragptr.p->tup2Connectptr; + sendSignal(fragptr.p->tupBlockref, GSN_TUPFRAGREQ, signal, 2, JBB); + addfragptr.p->tup2Connectptr = RNIL; + } + if (addfragptr.p->tux1Connectptr != RNIL) { + jam(); + signal->theData[1] = addfragptr.p->tux1Connectptr; + sendSignal(fragptr.p->tuxBlockref, GSN_TUXFRAGREQ, signal, 2, JBB); + addfragptr.p->tux1Connectptr = RNIL; + } + if (addfragptr.p->tux2Connectptr != RNIL) { + jam(); + signal->theData[1] = addfragptr.p->tux2Connectptr; + sendSignal(fragptr.p->tuxBlockref, GSN_TUXFRAGREQ, signal, 2, JBB); + addfragptr.p->tux2Connectptr = RNIL; + } +} + /* ************>> */ /* ACCFRAGREF > */ /* ************>> */ @@ -1582,6 +1637,27 @@ void Dblqh::execTUPFRAGREF(Signal* signal) fragptr.i = addfragptr.p->fragmentPtr; ptrCheckGuard(fragptr, cfragrecFileSize, fragrecord); addfragptr.p->addfragErrorCode = terrorCode; + + // no operation to release, just add some jams + switch (addfragptr.p->addfragStatus) { + case AddFragRecord::WAIT_TWO_TUP: + jam(); + break; + case AddFragRecord::WAIT_ONE_TUP: + jam(); + break; + case AddFragRecord::WAIT_TWO_TUX: + jam(); + break; + case AddFragRecord::WAIT_ONE_TUX: + jam(); + break; + default: + ndbrequire(false); + break; + } + abortAddFragOps(signal); + const Uint32 ref = addfragptr.p->dictBlockref; const Uint32 senderData = addfragptr.p->dictConnectptr; const Uint32 errorCode = addfragptr.p->addfragErrorCode; @@ -1605,11 +1681,38 @@ void Dblqh::execTUXFRAGREF(Signal* signal) void Dblqh::execTUP_ADD_ATTRREF(Signal* signal) { jamEntry(); - addfragptr.i = signal->theData[0]; ptrCheckGuard(addfragptr, caddfragrecFileSize, addFragRecord); terrorCode = signal->theData[1]; addfragptr.p->addfragErrorCode = terrorCode; + + // operation was released on the other side + switch (addfragptr.p->addfragStatus) { + case AddFragRecord::TUP_ATTR_WAIT1: + jam(); + ndbrequire(addfragptr.p->tup1Connectptr != RNIL); + addfragptr.p->tup1Connectptr = RNIL; + break; + case AddFragRecord::TUP_ATTR_WAIT2: + jam(); + ndbrequire(addfragptr.p->tup2Connectptr != RNIL); + addfragptr.p->tup2Connectptr = RNIL; + break; + case AddFragRecord::TUX_ATTR_WAIT1: + jam(); + ndbrequire(addfragptr.p->tux1Connectptr != RNIL); + addfragptr.p->tux1Connectptr = RNIL; + break; + case AddFragRecord::TUX_ATTR_WAIT2: + jam(); + ndbrequire(addfragptr.p->tux2Connectptr != RNIL); + addfragptr.p->tux2Connectptr = RNIL; + break; + default: + ndbrequire(false); + break; + } + abortAddFragOps(signal); const Uint32 Ref = addfragptr.p->dictBlockref; const Uint32 senderData = addfragptr.p->dictConnectptr; @@ -2981,6 +3084,7 @@ void Dblqh::execATTRINFO(Signal* signal) return; break; default: + ndbout_c("%d", regTcPtr->transactionState); ndbrequire(false); break; }//switch @@ -7058,10 +7162,7 @@ void Dblqh::continueScanNextReqLab(Signal* signal) // Update timer on tcConnectRecord tcConnectptr.p->tcTimer = cLqhTimeOutCount; - init_acc_ptr_list(scanptr.p); - scanptr.p->m_curr_batch_size_rows = 0; - scanptr.p->m_curr_batch_size_bytes= 0; scanptr.p->scanFlag = NextScanReq::ZSCAN_NEXT; scanNextLoopLab(signal); }//Dblqh::continueScanNextReqLab() @@ -7219,6 +7320,8 @@ void Dblqh::closeScanRequestLab(Signal* signal) scanptr.p->m_curr_batch_size_rows = 0; scanptr.p->m_curr_batch_size_bytes= 0; sendScanFragConf(signal, ZTRUE); + abort_scan(signal, scanptr.i, 0); + return; break; case TcConnectionrec::SCAN_TUPKEY: case TcConnectionrec::SCAN_FIRST_STOPPED: @@ -7260,22 +7363,32 @@ void Dblqh::scanLockReleasedLab(Signal* signal) tcConnectptr.i = scanptr.p->scanTcrec; ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec); releaseActiveFrag(signal); + if (scanptr.p->scanReleaseCounter == scanptr.p->m_curr_batch_size_rows) { if ((scanptr.p->scanErrorCounter > 0) || (scanptr.p->scanCompletedStatus == ZTRUE)) { jam(); + scanptr.p->m_curr_batch_size_rows = 0; + scanptr.p->m_curr_batch_size_bytes = 0; closeScanLab(signal); } else if (scanptr.p->check_scan_batch_completed() && scanptr.p->scanLockHold != ZTRUE) { jam(); scanptr.p->scanState = ScanRecord::WAIT_SCAN_NEXTREQ; sendScanFragConf(signal, ZFALSE); + } else if (scanptr.p->m_last_row && !scanptr.p->scanLockHold) { + jam(); + closeScanLab(signal); + return; } else { jam(); /* - We came here after releasing locks after receiving SCAN_NEXTREQ from TC. We only - come here when scanHoldLock == ZTRUE - */ + * We came here after releasing locks after + * receiving SCAN_NEXTREQ from TC. We only come here + * when scanHoldLock == ZTRUE + */ + scanptr.p->m_curr_batch_size_rows = 0; + scanptr.p->m_curr_batch_size_bytes = 0; continueScanNextReqLab(signal); }//if } else if (scanptr.p->scanReleaseCounter < scanptr.p->m_curr_batch_size_rows) { @@ -7362,25 +7475,6 @@ Dblqh::init_acc_ptr_list(ScanRecord* scanP) scanP->scan_acc_index = 0; } -inline -void -Dblqh::i_get_acc_ptr(ScanRecord* scanP, Uint32* &acc_ptr, Uint32 index) -{ - if (index == 0) { - acc_ptr= (Uint32*)&scanP->scan_acc_op_ptr[0]; - } else { - Uint32 attr_buf_index, attr_buf_rec; - - AttrbufPtr regAttrPtr; - jam(); - attr_buf_rec= (index + 31) / 32; - attr_buf_index= (index - 1) & 31; - regAttrPtr.i= scanP->scan_acc_op_ptr[attr_buf_rec]; - ptrCheckGuard(regAttrPtr, cattrinbufFileSize, attrbuf); - acc_ptr= (Uint32*)®AttrPtr.p->attrbuf[attr_buf_index]; - } -} - Uint32 Dblqh::get_acc_ptr_from_scan_record(ScanRecord* scanP, Uint32 index, @@ -7611,18 +7705,25 @@ void Dblqh::abort_scan(Signal* signal, Uint32 scan_ptr_i, Uint32 errcode){ jam(); scanptr.i = scan_ptr_i; c_scanRecordPool.getPtr(scanptr); + + fragptr.i = tcConnectptr.p->fragmentptr; + ptrCheckGuard(fragptr, cfragrecFileSize, fragrecord); finishScanrec(signal); releaseScanrec(signal); tcConnectptr.p->transactionState = TcConnectionrec::IDLE; tcConnectptr.p->abortState = TcConnectionrec::ABORT_ACTIVE; - - ScanFragRef * ref = (ScanFragRef*)&signal->theData[0]; - ref->senderData = tcConnectptr.p->clientConnectrec; - ref->transId1 = tcConnectptr.p->transid[0]; - ref->transId2 = tcConnectptr.p->transid[1]; - ref->errorCode = errcode; - sendSignal(tcConnectptr.p->clientBlockref, GSN_SCAN_FRAGREF, signal, - ScanFragRef::SignalLength, JBB); + + if(errcode) + { + jam(); + ScanFragRef * ref = (ScanFragRef*)&signal->theData[0]; + ref->senderData = tcConnectptr.p->clientConnectrec; + ref->transId1 = tcConnectptr.p->transid[0]; + ref->transId2 = tcConnectptr.p->transid[1]; + ref->errorCode = errcode; + sendSignal(tcConnectptr.p->clientBlockref, GSN_SCAN_FRAGREF, signal, + ScanFragRef::SignalLength, JBB); + } deleteTransidHash(signal); releaseOprec(signal); releaseTcrec(signal, tcConnectptr); @@ -7904,6 +8005,13 @@ void Dblqh::nextScanConfScanLab(Signal* signal) /************************************************************* * STOP THE SCAN PROCESS IF THIS HAS BEEN REQUESTED. ************************************************************ */ + if (!scanptr.p->scanLockHold) + { + jam(); + closeScanLab(signal); + return; + } + if (scanptr.p->scanCompletedStatus == ZTRUE) { if ((scanptr.p->scanLockHold == ZTRUE) && (scanptr.p->m_curr_batch_size_rows > 0)) { @@ -8404,8 +8512,6 @@ void Dblqh::tupScanCloseConfLab(Signal* signal) ScanFragRef::SignalLength, JBB); } else { jam(); - scanptr.p->m_curr_batch_size_rows = 0; - scanptr.p->m_curr_batch_size_bytes= 0; sendScanFragConf(signal, ZSCAN_FRAG_CLOSED); }//if finishScanrec(signal); @@ -8477,7 +8583,7 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq) tFragPtr.i = fragptr.p->tableFragptr; ptrCheckGuard(tFragPtr, cfragrecFileSize, fragrecord); scanptr.p->fragPtrI = fragptr.p->tableFragptr; - + /** * !idx uses 1 - (MAX_PARALLEL_SCANS_PER_FRAG - 1) = 1-11 * idx uses from MAX_PARALLEL_SCANS_PER_FRAG - MAX = 12-42) @@ -8486,10 +8592,10 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq) Uint32 stop = (idx ? MAX_PARALLEL_INDEX_SCANS_PER_FRAG : MAX_PARALLEL_SCANS_PER_FRAG - 1); stop += start; Uint32 free = tFragPtr.p->m_scanNumberMask.find(start); - + if(free == Fragrecord::ScanNumberMask::NotFound || free >= stop){ jam(); - + if(scanPrio == 0){ jam(); return ScanFragRef::ZTOO_MANY_ACTIVE_SCAN_ERROR; @@ -8500,16 +8606,15 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq) */ scanptr.p->scanState = ScanRecord::IN_QUEUE; LocalDLFifoList<ScanRecord> queue(c_scanRecordPool, - tFragPtr.p->m_queuedScans); + fragptr.p->m_queuedScans); queue.add(scanptr); return ZOK; } - scanptr.p->scanNumber = free; tFragPtr.p->m_scanNumberMask.clear(free);// Update mask - - LocalDLList<ScanRecord> active(c_scanRecordPool, tFragPtr.p->m_activeScans); + + LocalDLList<ScanRecord> active(c_scanRecordPool, fragptr.p->m_activeScans); active.add(scanptr); if(scanptr.p->scanKeyinfoFlag){ jam(); @@ -8569,12 +8674,8 @@ void Dblqh::finishScanrec(Signal* signal) { release_acc_ptr_list(scanptr.p); - FragrecordPtr tFragPtr; - tFragPtr.i = scanptr.p->fragPtrI; - ptrCheckGuard(tFragPtr, cfragrecFileSize, fragrecord); - LocalDLFifoList<ScanRecord> queue(c_scanRecordPool, - tFragPtr.p->m_queuedScans); + fragptr.p->m_queuedScans); if(scanptr.p->scanState == ScanRecord::IN_QUEUE){ jam(); @@ -8592,9 +8693,13 @@ void Dblqh::finishScanrec(Signal* signal) ndbrequire(tmp.p == scanptr.p); } - LocalDLList<ScanRecord> scans(c_scanRecordPool, tFragPtr.p->m_activeScans); + LocalDLList<ScanRecord> scans(c_scanRecordPool, fragptr.p->m_activeScans); scans.release(scanptr); + FragrecordPtr tFragPtr; + tFragPtr.i = scanptr.p->fragPtrI; + ptrCheckGuard(tFragPtr, cfragrecFileSize, fragrecord); + const Uint32 scanNumber = scanptr.p->scanNumber; ndbrequire(!tFragPtr.p->m_scanNumberMask.get(scanNumber)); ScanRecordPtr restart; @@ -8621,7 +8726,7 @@ void Dblqh::finishScanrec(Signal* signal) ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec); restart.p->scanNumber = scanNumber; restart.p->scanState = ScanRecord::WAIT_ACC_SCAN; - + queue.remove(restart); scans.add(restart); if(restart.p->scanKeyinfoFlag){ @@ -8809,6 +8914,13 @@ void Dblqh::sendScanFragConf(Signal* signal, Uint32 scanCompleted) conf->total_len= total_len; sendSignal(tcConnectptr.p->clientBlockref, GSN_SCAN_FRAGCONF, signal, ScanFragConf::SignalLength, JBB); + + if(!scanptr.p->scanLockHold) + { + jam(); + scanptr.p->m_curr_batch_size_rows = 0; + scanptr.p->m_curr_batch_size_bytes= 0; + } }//Dblqh::sendScanFragConf() /* ######################################################################### */ diff --git a/ndb/src/kernel/blocks/dbtc/Dbtc.hpp b/ndb/src/kernel/blocks/dbtc/Dbtc.hpp index a209df24c44..fb90ccc8c90 100644 --- a/ndb/src/kernel/blocks/dbtc/Dbtc.hpp +++ b/ndb/src/kernel/blocks/dbtc/Dbtc.hpp @@ -1054,9 +1054,8 @@ public: // Id of the ScanRecord this fragment scan belongs to Uint32 scanRec; - // The maximum number of operations that can be scanned before - // returning to TC - Uint16 scanFragConcurrency; + // The value of fragmentCompleted in the last received SCAN_FRAGCONF + Uint8 m_scan_frag_conf_status; inline void startFragTimer(Uint32 timeVal){ scanFragTimer = timeVal; @@ -1193,8 +1192,10 @@ public: // Number of operation records per scanned fragment // Number of operations in first batch // Max number of bytes per batch - Uint16 noOprecPerFrag; - Uint16 first_batch_size; + union { + Uint16 first_batch_size_rows; + Uint16 batch_size_rows; + }; Uint32 batch_byte_size; Uint32 scanRequestInfo; // ScanFrag format diff --git a/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp b/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp index e7aec9e670c..dd1252b76b9 100644 --- a/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp +++ b/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp @@ -8646,9 +8646,9 @@ void Dbtc::initScanrec(ScanRecordPtr scanptr, scanptr.p->scanTableref = tabptr.i; scanptr.p->scanSchemaVersion = scanTabReq->tableSchemaVersion; scanptr.p->scanParallel = scanParallel; - scanptr.p->noOprecPerFrag = noOprecPerFrag; - scanptr.p->first_batch_size= scanTabReq->first_batch_size; - scanptr.p->batch_byte_size= scanTabReq->batch_byte_size; + scanptr.p->first_batch_size_rows = scanTabReq->first_batch_size; + scanptr.p->batch_byte_size = scanTabReq->batch_byte_size; + scanptr.p->batch_size_rows = noOprecPerFrag; Uint32 tmp = 0; const UintR ri = scanTabReq->requestInfo; @@ -8672,7 +8672,6 @@ void Dbtc::initScanrec(ScanRecordPtr scanptr, ndbrequire(list.seize(ptr)); ptr.p->scanRec = scanptr.i; ptr.p->scanFragId = 0; - ptr.p->scanFragConcurrency = noOprecPerFrag; ptr.p->m_apiPtr = cdata[i]; }//for @@ -8945,6 +8944,25 @@ void Dbtc::execDIGETPRIMCONF(Signal* signal) scanptr.i = scanFragptr.p->scanRec; ptrCheckGuard(scanptr, cscanrecFileSize, scanRecord); + /** + * This must be false as select count(*) otherwise + * can "pass" committing on backup fragments and + * get incorrect row count + */ + if(false && ScanFragReq::getReadCommittedFlag(scanptr.p->scanRequestInfo)) + { + jam(); + Uint32 max = 3+signal->theData[6]; + Uint32 nodeid = getOwnNodeId(); + for(Uint32 i = 3; i<max; i++) + if(signal->theData[i] == nodeid) + { + jam(); + tnodeid = nodeid; + break; + } + } + { /** * Check table @@ -9141,6 +9159,7 @@ void Dbtc::execSCAN_FRAGCONF(Signal* signal) const ScanFragConf * const conf = (ScanFragConf*)&signal->theData[0]; const Uint32 noCompletedOps = conf->completedOps; + const Uint32 status = conf->fragmentCompleted; scanFragptr.i = conf->senderData; c_scan_frag_pool.getPtr(scanFragptr); @@ -9163,11 +9182,9 @@ void Dbtc::execSCAN_FRAGCONF(Signal* signal) ndbrequire(scanFragptr.p->scanFragState == ScanFragRec::LQH_ACTIVE); - const Uint32 status = conf->fragmentCompleted; - if(scanptr.p->scanState == ScanRecord::CLOSING_SCAN){ jam(); - if(status == ZFALSE){ + if(status == 0){ /** * We have started closing = we sent a close -> ignore this */ @@ -9184,11 +9201,11 @@ void Dbtc::execSCAN_FRAGCONF(Signal* signal) return; } - if(status == ZCLOSED && scanptr.p->scanNextFragId < scanptr.p->scanNoFrag){ + if(noCompletedOps == 0 && status != 0 && + scanptr.p->scanNextFragId < scanptr.p->scanNoFrag){ /** * Start on next fragment */ - ndbrequire(noCompletedOps == 0); scanFragptr.p->scanFragState = ScanFragRec::WAIT_GET_PRIMCONF; scanFragptr.p->startFragTimer(ctcTimer); @@ -9218,6 +9235,7 @@ void Dbtc::execSCAN_FRAGCONF(Signal* signal) scanptr.p->m_queued_count++; } + scanFragptr.p->m_scan_frag_conf_status = status; scanFragptr.p->m_ops = noCompletedOps; scanFragptr.p->m_totalLen = total_len; scanFragptr.p->scanFragState = ScanFragRec::QUEUED_FOR_DELIVERY; @@ -9311,7 +9329,6 @@ void Dbtc::execSCAN_NEXTREQ(Signal* signal) /********************************************************************* * APPLICATION IS CLOSING THE SCAN. **********************************************************************/ - ndbrequire(len == 0); close_scan_req(signal, scanptr, true); return; }//if @@ -9330,11 +9347,12 @@ void Dbtc::execSCAN_NEXTREQ(Signal* signal) // Copy op ptrs so I dont overwrite them when sending... memcpy(signal->getDataPtrSend()+25, signal->getDataPtr()+4, 4 * len); - ScanFragNextReq * nextReq = (ScanFragNextReq*)&signal->theData[0]; - nextReq->closeFlag = ZFALSE; - nextReq->transId1 = apiConnectptr.p->transid[0]; - nextReq->transId2 = apiConnectptr.p->transid[1]; - nextReq->batch_size_bytes= scanP->batch_byte_size; + ScanFragNextReq tmp; + tmp.closeFlag = ZFALSE; + tmp.transId1 = apiConnectptr.p->transid[0]; + tmp.transId2 = apiConnectptr.p->transid[1]; + tmp.batch_size_rows = scanP->batch_size_rows; + tmp.batch_size_bytes = scanP->batch_byte_size; ScanFragList running(c_scan_frag_pool, scanP->m_running_scan_frags); ScanFragList delivered(c_scan_frag_pool, scanP->m_delivered_scan_frags); @@ -9344,15 +9362,37 @@ void Dbtc::execSCAN_NEXTREQ(Signal* signal) c_scan_frag_pool.getPtr(scanFragptr); ndbrequire(scanFragptr.p->scanFragState == ScanFragRec::DELIVERED); - scanFragptr.p->scanFragState = ScanFragRec::LQH_ACTIVE; scanFragptr.p->startFragTimer(ctcTimer); - scanFragptr.p->m_ops = 0; - nextReq->senderData = scanFragptr.i; - nextReq->batch_size_rows= scanFragptr.p->scanFragConcurrency; - sendSignal(scanFragptr.p->lqhBlockref, GSN_SCAN_NEXTREQ, signal, - ScanFragNextReq::SignalLength, JBB); + if(scanFragptr.p->m_scan_frag_conf_status) + { + /** + * last scan was complete + */ + jam(); + ndbrequire(scanptr.p->scanNextFragId < scanptr.p->scanNoFrag); + scanFragptr.p->scanFragState = ScanFragRec::WAIT_GET_PRIMCONF; + + tcConnectptr.i = scanptr.p->scanTcrec; + ptrCheckGuard(tcConnectptr, ctcConnectFilesize, tcConnectRecord); + scanFragptr.p->scanFragId = scanptr.p->scanNextFragId++; + signal->theData[0] = tcConnectptr.p->dihConnectptr; + signal->theData[1] = scanFragptr.i; + signal->theData[2] = scanptr.p->scanTableref; + signal->theData[3] = scanFragptr.p->scanFragId; + sendSignal(cdihblockref, GSN_DIGETPRIMREQ, signal, 4, JBB); + } + else + { + jam(); + scanFragptr.p->scanFragState = ScanFragRec::LQH_ACTIVE; + ScanFragNextReq * req = (ScanFragNextReq*)signal->getDataPtrSend(); + * req = tmp; + req->senderData = scanFragptr.i; + sendSignal(scanFragptr.p->lqhBlockref, GSN_SCAN_NEXTREQ, signal, + ScanFragNextReq::SignalLength, JBB); + } delivered.remove(scanFragptr); running.add(scanFragptr); }//for @@ -9416,7 +9456,7 @@ Dbtc::close_scan_req(Signal* signal, ScanRecordPtr scanPtr, bool req_received){ ndbrequire(curr.p->scanFragState == ScanFragRec::DELIVERED); delivered.remove(curr); - if(curr.p->m_ops > 0){ + if(curr.p->m_ops > 0 && curr.p->m_scan_frag_conf_status == 0){ jam(); running.add(curr); curr.p->scanFragState = ScanFragRec::LQH_ACTIVE; @@ -9551,7 +9591,7 @@ void Dbtc::sendScanFragReq(Signal* signal, req->transId1 = apiConnectptr.p->transid[0]; req->transId2 = apiConnectptr.p->transid[1]; req->clientOpPtr = scanFragP->m_apiPtr; - req->batch_size_rows= scanFragP->scanFragConcurrency; + req->batch_size_rows= scanP->batch_size_rows; req->batch_size_bytes= scanP->batch_byte_size; sendSignal(scanFragP->lqhBlockref, GSN_SCAN_FRAGREQ, signal, ScanFragReq::SignalLength, JBB); @@ -9573,6 +9613,8 @@ void Dbtc::sendScanTabConf(Signal* signal, ScanRecordPtr scanPtr) { jam(); ops += 21; } + + Uint32 left = scanPtr.p->scanNoFrag - scanPtr.p->scanNextFragId; ScanTabConf * conf = (ScanTabConf*)&signal->theData[0]; conf->apiConnectPtr = apiConnectptr.p->ndbapiConnect; @@ -9588,24 +9630,25 @@ void Dbtc::sendScanTabConf(Signal* signal, ScanRecordPtr scanPtr) { ScanFragRecPtr curr = ptr; // Remove while iterating... queued.next(ptr); + bool done = curr.p->m_scan_frag_conf_status && --left; + * ops++ = curr.p->m_apiPtr; - * ops++ = curr.i; + * ops++ = done ? RNIL : curr.i; * ops++ = (curr.p->m_totalLen << 10) + curr.p->m_ops; queued.remove(curr); - if(curr.p->m_ops > 0){ + if(!done){ delivered.add(curr); curr.p->scanFragState = ScanFragRec::DELIVERED; curr.p->stopFragTimer(); } else { - (* --ops) = ScanTabConf::EndOfData; ops++; c_scan_frag_pool.release(curr); curr.p->scanFragState = ScanFragRec::COMPLETED; curr.p->stopFragTimer(); } } } - + if(scanPtr.p->m_delivered_scan_frags.isEmpty() && scanPtr.p->m_running_scan_frags.isEmpty()){ conf->requestInfo = op_count | ScanTabConf::EndOfData; @@ -10424,9 +10467,8 @@ Dbtc::execDUMP_STATE_ORD(Signal* signal) sfp.i, sfp.p->scanFragState, sfp.p->scanFragId); - infoEvent(" nodeid=%d, concurr=%d, timer=%d", + infoEvent(" nodeid=%d, timer=%d", refToNode(sfp.p->lqhBlockref), - sfp.p->scanFragConcurrency, sfp.p->scanFragTimer); } @@ -10504,7 +10546,7 @@ Dbtc::execDUMP_STATE_ORD(Signal* signal) sp.p->scanAiLength, sp.p->scanParallel, sp.p->scanReceivedOperations, - sp.p->noOprecPerFrag); + sp.p->batch_size_rows); infoEvent(" schv=%d, tab=%d, sproc=%d", sp.p->scanSchemaVersion, sp.p->scanTableref, diff --git a/ndb/src/kernel/blocks/dbtup/Dbtup.hpp b/ndb/src/kernel/blocks/dbtup/Dbtup.hpp index 55ad1d0910a..b48546576f9 100644 --- a/ndb/src/kernel/blocks/dbtup/Dbtup.hpp +++ b/ndb/src/kernel/blocks/dbtup/Dbtup.hpp @@ -504,6 +504,7 @@ struct Fragoperrec { Uint32 noOfNewAttrCount; Uint32 charsetIndex; BlockReference lqhBlockrefFrag; + bool inUse; }; typedef Ptr<Fragoperrec> FragoperrecPtr; @@ -1936,6 +1937,7 @@ private: void setUpKeyArray(Tablerec* const regTabPtr); bool addfragtotab(Tablerec* const regTabPtr, Uint32 fragId, Uint32 fragIndex); void deleteFragTab(Tablerec* const regTabPtr, Uint32 fragId); + void abortAddFragOp(Signal* signal); void releaseTabDescr(Tablerec* const regTabPtr); void getFragmentrec(FragrecordPtr& regFragPtr, Uint32 fragId, Tablerec* const regTabPtr); diff --git a/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp b/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp index efea312b865..914dba00674 100644 --- a/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp +++ b/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp @@ -39,11 +39,18 @@ /* ---------------------------------------------------------------- */ void Dbtup::execTUPFRAGREQ(Signal* signal) { + ljamEntry(); + + if (signal->theData[0] == (Uint32)-1) { + ljam(); + abortAddFragOp(signal); + return; + } + FragoperrecPtr fragOperPtr; FragrecordPtr regFragPtr; TablerecPtr regTabPtr; - ljamEntry(); Uint32 userptr = signal->theData[0]; Uint32 userblockref = signal->theData[1]; Uint32 reqinfo = signal->theData[2]; @@ -132,6 +139,15 @@ void Dbtup::execTUPFRAGREQ(Signal* signal) return; }//if + if (ERROR_INSERTED(4007) && regTabPtr.p->fragid[0] == fragId || + ERROR_INSERTED(4008) && regTabPtr.p->fragid[1] == fragId) { + ljam(); + terrorCode = 1; + fragrefuse4Lab(signal, fragOperPtr, regFragPtr, regTabPtr.p, fragId); + CLEAR_ERROR_INSERT_VALUE; + return; + } + if (regTabPtr.p->tableStatus == NOT_DEFINED) { ljam(); //------------------------------------------------------------------------------------- @@ -243,6 +259,7 @@ void Dbtup::seizeFragoperrec(FragoperrecPtr& fragOperPtr) ptrCheckGuard(fragOperPtr, cnoOfFragoprec, fragoperrec); cfirstfreeFragopr = fragOperPtr.p->nextFragoprec; fragOperPtr.p->nextFragoprec = RNIL; + fragOperPtr.p->inUse = true; }//Dbtup::seizeFragoperrec() /* **************************************************************** */ @@ -273,6 +290,7 @@ void Dbtup::execTUP_ADD_ATTRREQ(Signal* signal) ndbrequire(fragOperPtr.p->attributeCount > 0); fragOperPtr.p->attributeCount--; + const bool lastAttr = (fragOperPtr.p->attributeCount == 0); if ((regTabPtr.p->tableStatus == DEFINING) && (fragOperPtr.p->definingFragment)) { @@ -346,20 +364,30 @@ void Dbtup::execTUP_ADD_ATTRREQ(Signal* signal) addattrrefuseLab(signal, regFragPtr, fragOperPtr, regTabPtr.p, fragId); return; }//if - if ((fragOperPtr.p->attributeCount == 0) && - (fragOperPtr.p->freeNullBit != 0)) { + if (lastAttr && (fragOperPtr.p->freeNullBit != 0)) { ljam(); terrorCode = ZINCONSISTENT_NULL_ATTRIBUTE_COUNT; addattrrefuseLab(signal, regFragPtr, fragOperPtr, regTabPtr.p, fragId); return; }//if }//if + if (ERROR_INSERTED(4009) && regTabPtr.p->fragid[0] == fragId && attrId == 0 || + ERROR_INSERTED(4010) && regTabPtr.p->fragid[0] == fragId && lastAttr || + ERROR_INSERTED(4011) && regTabPtr.p->fragid[1] == fragId && attrId == 0 || + ERROR_INSERTED(4012) && regTabPtr.p->fragid[1] == fragId && lastAttr) { + ljam(); + terrorCode = 1; + addattrrefuseLab(signal, regFragPtr, fragOperPtr, regTabPtr.p, fragId); + CLEAR_ERROR_INSERT_VALUE; + return; + } /* **************************************************************** */ /* ************** TUP_ADD_ATTCONF ****************** */ /* **************************************************************** */ signal->theData[0] = fragOperPtr.p->lqhPtrFrag; - sendSignal(fragOperPtr.p->lqhBlockrefFrag, GSN_TUP_ADD_ATTCONF, signal, 1, JBB); - if (fragOperPtr.p->attributeCount > 0) { + signal->theData[1] = lastAttr; + sendSignal(fragOperPtr.p->lqhBlockrefFrag, GSN_TUP_ADD_ATTCONF, signal, 2, JBB); + if (! lastAttr) { ljam(); return; /* EXIT AND WAIT FOR MORE */ }//if @@ -491,11 +519,11 @@ void Dbtup::fragrefuseLab(Signal* signal, FragoperrecPtr fragOperPtr) void Dbtup::releaseFragoperrec(FragoperrecPtr fragOperPtr) { + fragOperPtr.p->inUse = false; fragOperPtr.p->nextFragoprec = cfirstfreeFragopr; cfirstfreeFragopr = fragOperPtr.i; }//Dbtup::releaseFragoperrec() - void Dbtup::deleteFragTab(Tablerec* const regTabPtr, Uint32 fragId) { for (Uint32 i = 0; i < (2 * MAX_FRAG_PER_NODE); i++) { @@ -510,6 +538,20 @@ void Dbtup::deleteFragTab(Tablerec* const regTabPtr, Uint32 fragId) ndbrequire(false); }//Dbtup::deleteFragTab() +/* + * LQH aborts on-going create table operation. The table is later + * dropped by DICT. + */ +void Dbtup::abortAddFragOp(Signal* signal) +{ + FragoperrecPtr fragOperPtr; + + fragOperPtr.i = signal->theData[1]; + ptrCheckGuard(fragOperPtr, cnoOfFragoprec, fragoperrec); + ndbrequire(fragOperPtr.p->inUse); + releaseFragoperrec(fragOperPtr); +} + void Dbtup::execDROP_TAB_REQ(Signal* signal) { diff --git a/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp b/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp index e6cc6f68842..cbb165c3eb1 100644 --- a/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp +++ b/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp @@ -706,7 +706,10 @@ Dbtup::checkUpdateOfPrimaryKey(Uint32* updateBuffer, Tablerec* const regTabPtr) tOutBufIndex = 0; tMaxRead = MAX_KEY_SIZE_IN_WORDS; + bool tmp = tXfrmFlag; + tXfrmFlag = false; ndbrequire((this->*f)(&keyReadBuffer[0], ahOut, attrDescriptor, attributeOffset)); + tXfrmFlag = tmp; ndbrequire(tOutBufIndex == ahOut->getDataSize()); if (ahIn.getDataSize() != ahOut->getDataSize()) { ljam(); diff --git a/ndb/src/kernel/blocks/dbtux/Dbtux.hpp b/ndb/src/kernel/blocks/dbtux/Dbtux.hpp index 8896324f793..8f49b7fa6d6 100644 --- a/ndb/src/kernel/blocks/dbtux/Dbtux.hpp +++ b/ndb/src/kernel/blocks/dbtux/Dbtux.hpp @@ -575,6 +575,7 @@ private: void execDROP_TAB_REQ(Signal* signal); bool allocDescEnt(IndexPtr indexPtr); void freeDescEnt(IndexPtr indexPtr); + void abortAddFragOp(Signal* signal); void dropIndex(Signal* signal, IndexPtr indexPtr, Uint32 senderRef, Uint32 senderData); /* @@ -684,6 +685,7 @@ private: friend class NdbOut& operator<<(NdbOut&, const ScanOp&); friend class NdbOut& operator<<(NdbOut&, const Index&); friend class NdbOut& operator<<(NdbOut&, const Frag&); + friend class NdbOut& operator<<(NdbOut&, const FragOp&); friend class NdbOut& operator<<(NdbOut&, const NodeHandle&); FILE* debugFile; NdbOut debugOut; diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp index c5c22264460..1e1b0d1d5b6 100644 --- a/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp +++ b/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp @@ -404,6 +404,19 @@ operator<<(NdbOut& out, const Dbtux::Frag& frag) } NdbOut& +operator<<(NdbOut& out, const Dbtux::FragOp& fragOp) +{ + out << "[FragOp " << hex << &fragOp; + out << " [userPtr " << dec << fragOp.m_userPtr << "]"; + out << " [indexId " << dec << fragOp.m_indexId << "]"; + out << " [fragId " << dec << fragOp.m_fragId << "]"; + out << " [fragNo " << dec << fragOp.m_fragNo << "]"; + out << " numAttrsRecvd " << dec << fragOp.m_numAttrsRecvd << "]"; + out << "]"; + return out; +} + +NdbOut& operator<<(NdbOut& out, const Dbtux::NodeHandle& node) { const Dbtux::Frag& frag = node.m_frag; diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp index ded02696a89..18aa914de05 100644 --- a/ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp +++ b/ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp @@ -24,13 +24,8 @@ Dbtux::Dbtux(const Configuration& conf) : #ifdef VM_TRACE debugFile(0), debugOut(*new NullOutputStream()), - // until ndb_mgm supports dump -#ifdef DBTUX_DEBUG_TREE - debugFlags(DebugTree), -#else debugFlags(0), #endif -#endif c_internalStartPhase(0), c_typeOfStart(NodeState::ST_ILLEGAL_TYPE), c_dataBuffer(0) @@ -86,7 +81,7 @@ Dbtux::execCONTINUEB(Signal* signal) jamEntry(); const Uint32* data = signal->getDataPtr(); switch (data[0]) { - case TuxContinueB::DropIndex: + case TuxContinueB::DropIndex: // currently unused { IndexPtr indexPtr; c_indexPool.getPtr(indexPtr, data[1]); @@ -174,7 +169,7 @@ Dbtux::execREAD_CONFIG_REQ(Signal* signal) ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_TUX_ATTRIBUTE, &nAttribute)); ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_TUX_SCAN_OP, &nScanOp)); - const Uint32 nDescPage = (nIndex + nAttribute + DescPageSize - 1) / DescPageSize; + const Uint32 nDescPage = (nIndex * DescHeadSize + nAttribute * DescAttrSize + DescPageSize - 1) / DescPageSize; const Uint32 nScanBoundWords = nScanOp * ScanBoundSegmentSize * 4; c_indexPool.setSize(nIndex); diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxMeta.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxMeta.cpp index 1577c5045e0..b7526593a08 100644 --- a/ndb/src/kernel/blocks/dbtux/DbtuxMeta.cpp +++ b/ndb/src/kernel/blocks/dbtux/DbtuxMeta.cpp @@ -29,6 +29,11 @@ void Dbtux::execTUXFRAGREQ(Signal* signal) { jamEntry(); + if (signal->theData[0] == (Uint32)-1) { + jam(); + abortAddFragOp(signal); + return; + } const TuxFragReq reqCopy = *(const TuxFragReq*)signal->getDataPtr(); const TuxFragReq* const req = &reqCopy; IndexPtr indexPtr; @@ -61,6 +66,11 @@ Dbtux::execTUXFRAGREQ(Signal* signal) fragOpPtr.p->m_fragId = req->fragId; fragOpPtr.p->m_fragNo = indexPtr.p->m_numFrags; fragOpPtr.p->m_numAttrsRecvd = 0; +#ifdef VM_TRACE + if (debugFlags & DebugMeta) { + debugOut << "Seize frag op " << fragOpPtr.i << " " << *fragOpPtr.p << endl; + } +#endif // check if index has place for more fragments ndbrequire(indexPtr.p->m_numFrags < MaxIndexFragments); // seize new fragment record @@ -129,6 +139,14 @@ Dbtux::execTUXFRAGREQ(Signal* signal) debugOut << "Add frag " << fragPtr.i << " " << *fragPtr.p << endl; } #endif + // error inserts + if (ERROR_INSERTED(12001) && fragOpPtr.p->m_fragNo == 0 || + ERROR_INSERTED(12002) && fragOpPtr.p->m_fragNo == 1) { + jam(); + errorCode = (TuxFragRef::ErrorCode)1; + CLEAR_ERROR_INSERT_VALUE; + break; + } // success TuxFragConf* const conf = (TuxFragConf*)signal->getDataPtrSend(); conf->userPtr = req->userPtr; @@ -145,10 +163,18 @@ Dbtux::execTUXFRAGREQ(Signal* signal) ref->errorCode = errorCode; sendSignal(req->userRef, GSN_TUXFRAGREF, signal, TuxFragRef::SignalLength, JBB); - if (fragOpPtr.i != RNIL) + if (fragOpPtr.i != RNIL) { +#ifdef VM_TRACE + if (debugFlags & DebugMeta) { + debugOut << "Release on frag error frag op " << fragOpPtr.i << " " << *fragOpPtr.p << endl; + } +#endif c_fragOpPool.release(fragOpPtr); - if (indexPtr.i != RNIL) - dropIndex(signal, indexPtr, 0, 0); + } + if (indexPtr.i != RNIL) { + jam(); + // let DICT drop the unfinished index + } } void @@ -203,7 +229,16 @@ Dbtux::execTUX_ADD_ATTRREQ(Signal* signal) } } #endif - if (indexPtr.p->m_numAttrs == fragOpPtr.p->m_numAttrsRecvd) { + const bool lastAttr = (indexPtr.p->m_numAttrs == fragOpPtr.p->m_numAttrsRecvd); + if (ERROR_INSERTED(12003) && fragOpPtr.p->m_fragNo == 0 && attrId == 0 || + ERROR_INSERTED(12004) && fragOpPtr.p->m_fragNo == 0 && lastAttr || + ERROR_INSERTED(12005) && fragOpPtr.p->m_fragNo == 1 && attrId == 0 || + ERROR_INSERTED(12006) && fragOpPtr.p->m_fragNo == 1 && lastAttr) { + errorCode = (TuxAddAttrRef::ErrorCode)1; + CLEAR_ERROR_INSERT_VALUE; + break; + } + if (lastAttr) { jam(); // initialize tree header TreeHead& tree = fragPtr.p->m_tree; @@ -246,11 +281,17 @@ Dbtux::execTUX_ADD_ATTRREQ(Signal* signal) } #endif // fragment is defined +#ifdef VM_TRACE + if (debugFlags & DebugMeta) { + debugOut << "Release frag op " << fragOpPtr.i << " " << *fragOpPtr.p << endl; + } +#endif c_fragOpPool.release(fragOpPtr); } // success TuxAddAttrConf* conf = (TuxAddAttrConf*)signal->getDataPtrSend(); conf->userPtr = fragOpPtr.p->m_userPtr; + conf->lastAttr = lastAttr; sendSignal(fragOpPtr.p->m_userRef, GSN_TUX_ADD_ATTRCONF, signal, TuxAddAttrConf::SignalLength, JBB); return; @@ -261,8 +302,32 @@ Dbtux::execTUX_ADD_ATTRREQ(Signal* signal) ref->errorCode = errorCode; sendSignal(fragOpPtr.p->m_userRef, GSN_TUX_ADD_ATTRREF, signal, TuxAddAttrRef::SignalLength, JBB); +#ifdef VM_TRACE + if (debugFlags & DebugMeta) { + debugOut << "Release on attr error frag op " << fragOpPtr.i << " " << *fragOpPtr.p << endl; + } +#endif c_fragOpPool.release(fragOpPtr); - dropIndex(signal, indexPtr, 0, 0); + // let DICT drop the unfinished index +} + +/* + * LQH aborts on-going create index operation. + */ +void +Dbtux::abortAddFragOp(Signal* signal) +{ + FragOpPtr fragOpPtr; + IndexPtr indexPtr; + c_fragOpPool.getPtr(fragOpPtr, signal->theData[1]); + c_indexPool.getPtr(indexPtr, fragOpPtr.p->m_indexId); +#ifdef VM_TRACE + if (debugFlags & DebugMeta) { + debugOut << "Release on abort frag op " << fragOpPtr.i << " " << *fragOpPtr.p << endl; + } +#endif + c_fragOpPool.release(fragOpPtr); + // let DICT drop the unfinished index } /* @@ -341,20 +406,13 @@ Dbtux::dropIndex(Signal* signal, IndexPtr indexPtr, Uint32 senderRef, Uint32 sen { jam(); indexPtr.p->m_state = Index::Dropping; - // drop one fragment at a time - if (indexPtr.p->m_numFrags > 0) { + // drop fragments + while (indexPtr.p->m_numFrags > 0) { jam(); - unsigned i = --indexPtr.p->m_numFrags; + Uint32 i = --indexPtr.p->m_numFrags; FragPtr fragPtr; c_fragPool.getPtr(fragPtr, indexPtr.p->m_fragPtrI[i]); c_fragPool.release(fragPtr); - // the real time break is not used for anything currently - signal->theData[0] = TuxContinueB::DropIndex; - signal->theData[1] = indexPtr.i; - signal->theData[2] = senderRef; - signal->theData[3] = senderData; - sendSignal(reference(), GSN_CONTINUEB, signal, 4, JBB); - return; } // drop attributes if (indexPtr.p->m_descPage != RNIL) { diff --git a/ndb/src/kernel/blocks/suma/Suma.cpp b/ndb/src/kernel/blocks/suma/Suma.cpp index d11d5f7176a..f6d9a0ac35a 100644 --- a/ndb/src/kernel/blocks/suma/Suma.cpp +++ b/ndb/src/kernel/blocks/suma/Suma.cpp @@ -1888,7 +1888,7 @@ SumaParticipant::SyncRecord::nextScan(Signal* signal){ req->requestInfo = 0; req->savePointId = 0; ScanFragReq::setLockMode(req->requestInfo, 0); - ScanFragReq::setHoldLockFlag(req->requestInfo, 0); + ScanFragReq::setHoldLockFlag(req->requestInfo, 1); ScanFragReq::setKeyinfoFlag(req->requestInfo, 0); ScanFragReq::setAttrLen(req->requestInfo, attrLen); req->fragmentNoKeyLen = fd.m_fragDesc.m_fragmentNo; diff --git a/ndb/src/mgmclient/CommandInterpreter.cpp b/ndb/src/mgmclient/CommandInterpreter.cpp index d940f6e165a..a36263395ec 100644 --- a/ndb/src/mgmclient/CommandInterpreter.cpp +++ b/ndb/src/mgmclient/CommandInterpreter.cpp @@ -193,7 +193,7 @@ extern "C" { { return (Ndb_mgmclient_handle) new Ndb_mgmclient(connect_string); } - int ndb_mgmclient_execute(Ndb_mgmclient_handle h, int argc, const char** argv) + int ndb_mgmclient_execute(Ndb_mgmclient_handle h, int argc, char** argv) { return ((Ndb_mgmclient*)h)->execute(argc, argv, 1); } @@ -226,7 +226,7 @@ extern "C" { #include <util/InputStream.hpp> #include <util/OutputStream.hpp> -int Ndb_mgmclient::execute(int argc, const char** argv, int _try_reconnect) +int Ndb_mgmclient::execute(int argc, char** argv, int _try_reconnect) { if (argc <= 0) return 0; diff --git a/ndb/src/mgmclient/ndb_mgmclient.h b/ndb/src/mgmclient/ndb_mgmclient.h index 265e6bc67ec..b62a33999a3 100644 --- a/ndb/src/mgmclient/ndb_mgmclient.h +++ b/ndb/src/mgmclient/ndb_mgmclient.h @@ -23,7 +23,7 @@ extern "C" { typedef void* Ndb_mgmclient_handle; Ndb_mgmclient_handle ndb_mgmclient_handle_create(const char *connect_string); -int ndb_mgmclient_execute(Ndb_mgmclient_handle, int argc, const char** argv); +int ndb_mgmclient_execute(Ndb_mgmclient_handle, int argc, char** argv); int ndb_mgmclient_handle_destroy(Ndb_mgmclient_handle); #ifdef __cplusplus diff --git a/ndb/src/mgmclient/ndb_mgmclient.hpp b/ndb/src/mgmclient/ndb_mgmclient.hpp index 933d1bab5ce..f6bcebc3896 100644 --- a/ndb/src/mgmclient/ndb_mgmclient.hpp +++ b/ndb/src/mgmclient/ndb_mgmclient.hpp @@ -24,7 +24,7 @@ public: Ndb_mgmclient(const char*); ~Ndb_mgmclient(); int execute(const char *_line, int _try_reconnect=-1); - int execute(int argc, const char** argv, int _try_reconnect=-1); + int execute(int argc, char** argv, int _try_reconnect=-1); int disconnect(); private: CommandInterpreter *m_cmd; diff --git a/ndb/src/mgmsrv/main.cpp b/ndb/src/mgmsrv/main.cpp index 7fb4ea6a26a..1c3b838ef5f 100644 --- a/ndb/src/mgmsrv/main.cpp +++ b/ndb/src/mgmsrv/main.cpp @@ -216,7 +216,7 @@ int main(int argc, char** argv) * Read configuration files * ****************************/ LocalConfig local_config; - if(!local_config.init(0,glob.local_config_filename)){ + if(!local_config.init(opt_connect_str,glob.local_config_filename)){ local_config.printError(); goto error_end; } diff --git a/ndb/src/ndbapi/NdbConnection.cpp b/ndb/src/ndbapi/NdbConnection.cpp index 7710a3354d3..bd642ef3fd7 100644 --- a/ndb/src/ndbapi/NdbConnection.cpp +++ b/ndb/src/ndbapi/NdbConnection.cpp @@ -1086,8 +1086,11 @@ NdbConnection::getNdbIndexScanOperation(const NdbIndexImpl* index, if (indexTable != 0){ NdbIndexScanOperation* tOp = getNdbScanOperation((NdbTableImpl *) indexTable); - tOp->m_currentTable = table; - if(tOp) tOp->m_cursor_type = NdbScanOperation::IndexCursor; + if(tOp) + { + tOp->m_currentTable = table; + tOp->m_cursor_type = NdbScanOperation::IndexCursor; + } return tOp; } else { setOperationErrorCodeAbort(theNdb->theError.code); @@ -1582,9 +1585,6 @@ from other transactions. /** * There's always a TCKEYCONF when using IgnoreError */ -#ifdef VM_TRACE - ndbout_c("Not completing transaction 2"); -#endif return -1; } /**********************************************************************/ @@ -1836,9 +1836,6 @@ NdbConnection::OpCompleteFailure(Uint8 abortOption, bool setFailure) /** * There's always a TCKEYCONF when using IgnoreError */ -#ifdef VM_TRACE - ndbout_c("Not completing transaction"); -#endif return -1; } diff --git a/ndb/src/ndbapi/NdbConnectionScan.cpp b/ndb/src/ndbapi/NdbConnectionScan.cpp index 3fe8993a42b..b0c546c512a 100644 --- a/ndb/src/ndbapi/NdbConnectionScan.cpp +++ b/ndb/src/ndbapi/NdbConnectionScan.cpp @@ -56,13 +56,19 @@ NdbConnection::receiveSCAN_TABREF(NdbApiSignal* aSignal){ const ScanTabRef * ref = CAST_CONSTPTR(ScanTabRef, aSignal->getDataPtr()); if(checkState_TransId(&ref->transId1)){ - theScanningOp->theError.code = ref->errorCode; + theScanningOp->setErrorCode(ref->errorCode); + theScanningOp->execCLOSE_SCAN_REP(); if(!ref->closeNeeded){ - theScanningOp->execCLOSE_SCAN_REP(); return 0; } - assert(theScanningOp->m_sent_receivers_count); + + /** + * Setup so that close_impl will actually perform a close + * and not "close scan"-optimze it away + */ theScanningOp->m_conf_receivers_count++; + theScanningOp->m_conf_receivers[0] = theScanningOp->m_receivers[0]; + theScanningOp->m_conf_receivers[0]->m_tcPtrI = ~0; return 0; } else { #ifdef NDB_NO_DROPPED_SIGNAL @@ -97,7 +103,7 @@ NdbConnection::receiveSCAN_TABCONF(NdbApiSignal* aSignal, theScanningOp->execCLOSE_SCAN_REP(); return 0; } - + for(Uint32 i = 0; i<len; i += 3){ Uint32 opCount, totalLen; Uint32 ptrI = * ops++; @@ -109,24 +115,12 @@ NdbConnection::receiveSCAN_TABCONF(NdbApiSignal* aSignal, void * tPtr = theNdb->int2void(ptrI); assert(tPtr); // For now NdbReceiver* tOp = theNdb->void2rec(tPtr); - if (tOp && tOp->checkMagicNumber()){ - if(tOp->execSCANOPCONF(tcPtrI, totalLen, opCount)){ - /** - * - */ - theScanningOp->receiver_delivered(tOp); - } else if(info == ScanTabConf::EndOfData){ + if (tOp && tOp->checkMagicNumber()) + { + if (tcPtrI == RNIL && opCount == 0) theScanningOp->receiver_completed(tOp); - } - } - } - if (conf->requestInfo & ScanTabConf::EndOfData) { - if(theScanningOp->m_ordered) - theScanningOp->m_api_receivers_count = 0; - if(theScanningOp->m_api_receivers_count + - theScanningOp->m_conf_receivers_count + - theScanningOp->m_sent_receivers_count){ - abort(); + else if (tOp->execSCANOPCONF(tcPtrI, totalLen, opCount)) + theScanningOp->receiver_delivered(tOp); } } return 0; diff --git a/ndb/src/ndbapi/NdbIndexOperation.cpp b/ndb/src/ndbapi/NdbIndexOperation.cpp index 3f174a61b64..23af646c4c7 100644 --- a/ndb/src/ndbapi/NdbIndexOperation.cpp +++ b/ndb/src/ndbapi/NdbIndexOperation.cpp @@ -272,7 +272,7 @@ int NdbIndexOperation::equal_impl(const NdbColumnImpl* tAttrInfo, CHARSET_INFO* cs = tAttrInfo->m_cs; if (cs != 0) { // current limitation: strxfrm does not increase length - assert(cs->strxfrm_multiply == 1); + assert(cs->strxfrm_multiply <= 1); unsigned n = (*cs->coll->strnxfrm)(cs, (uchar*)xfrmData, sizeof(xfrmData), diff --git a/ndb/src/ndbapi/NdbOperationSearch.cpp b/ndb/src/ndbapi/NdbOperationSearch.cpp index 69b4e803acd..28a70abcb9b 100644 --- a/ndb/src/ndbapi/NdbOperationSearch.cpp +++ b/ndb/src/ndbapi/NdbOperationSearch.cpp @@ -142,7 +142,7 @@ NdbOperation::equal_impl(const NdbColumnImpl* tAttrInfo, CHARSET_INFO* cs = tAttrInfo->m_cs; if (cs != 0) { // current limitation: strxfrm does not increase length - assert(cs->strxfrm_multiply == 1); + assert(cs->strxfrm_multiply <= 1); unsigned n = (*cs->coll->strnxfrm)(cs, (uchar*)xfrmData, sizeof(xfrmData), diff --git a/ndb/src/ndbapi/NdbScanOperation.cpp b/ndb/src/ndbapi/NdbScanOperation.cpp index 373fec1a2b0..6eb5167e385 100644 --- a/ndb/src/ndbapi/NdbScanOperation.cpp +++ b/ndb/src/ndbapi/NdbScanOperation.cpp @@ -35,6 +35,8 @@ #include <signaldata/AttrInfo.hpp> #include <signaldata/TcKeyReq.hpp> +#define DEBUG_NEXT_RESULT 0 + NdbScanOperation::NdbScanOperation(Ndb* aNdb) : NdbOperation(aNdb), m_resultSet(0), @@ -275,6 +277,9 @@ NdbScanOperation::fix_receivers(Uint32 parallel){ void NdbScanOperation::receiver_delivered(NdbReceiver* tRec){ if(theError.code == 0){ + if(DEBUG_NEXT_RESULT) + ndbout_c("receiver_delivered"); + Uint32 idx = tRec->m_list_index; Uint32 last = m_sent_receivers_count - 1; if(idx != last){ @@ -298,6 +303,9 @@ NdbScanOperation::receiver_delivered(NdbReceiver* tRec){ void NdbScanOperation::receiver_completed(NdbReceiver* tRec){ if(theError.code == 0){ + if(DEBUG_NEXT_RESULT) + ndbout_c("receiver_completed"); + Uint32 idx = tRec->m_list_index; Uint32 last = m_sent_receivers_count - 1; if(idx != last){ @@ -445,8 +453,6 @@ NdbScanOperation::executeCursor(int nodeId){ return -1; } -#define DEBUG_NEXT_RESULT 0 - int NdbScanOperation::nextResult(bool fetchAllowed) { if(m_ordered) @@ -486,6 +492,9 @@ int NdbScanOperation::nextResult(bool fetchAllowed) Uint32 nodeId = theNdbCon->theDBnode; TransporterFacade* tp = TransporterFacade::instance(); Guard guard(tp->theMutexPtr); + if(theError.code) + return -1; + Uint32 seq = theNdbCon->theNodeSequence; if(seq == tp->getNodeSequence(nodeId) && send_next_scan(idx, false) == 0){ @@ -579,7 +588,8 @@ int NdbScanOperation::nextResult(bool fetchAllowed) int NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag){ - if(cnt > 0 || stopScanFlag){ + if(cnt > 0) + { NdbApiSignal tSignal(theNdb->theMyRef); tSignal.setSignal(GSN_SCAN_NEXTREQ); @@ -595,33 +605,41 @@ NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag){ */ Uint32 last = m_sent_receivers_count; Uint32 * prep_array = (cnt > 21 ? m_prepared_receivers : theData + 4); + Uint32 sent = 0; for(Uint32 i = 0; i<cnt; i++){ NdbReceiver * tRec = m_api_receivers[i]; - m_sent_receivers[last+i] = tRec; - tRec->m_list_index = last+i; - prep_array[i] = tRec->m_tcPtrI; - tRec->prepareSend(); + if((prep_array[sent] = tRec->m_tcPtrI) != RNIL) + { + m_sent_receivers[last+sent] = tRec; + tRec->m_list_index = last+sent; + tRec->prepareSend(); + sent++; + } } - memcpy(&m_api_receivers[0], &m_api_receivers[cnt], cnt * sizeof(char*)); + memmove(m_api_receivers, m_api_receivers+cnt, + (theParallelism-cnt) * sizeof(char*)); - Uint32 nodeId = theNdbCon->theDBnode; - TransporterFacade * tp = TransporterFacade::instance(); - int ret; - if(cnt > 21){ - tSignal.setLength(4); - LinearSectionPtr ptr[3]; - ptr[0].p = prep_array; - ptr[0].sz = cnt; - ret = tp->sendSignal(&tSignal, nodeId, ptr, 1); - } else { - tSignal.setLength(4+cnt); - ret = tp->sendSignal(&tSignal, nodeId); + int ret = 0; + if(sent) + { + Uint32 nodeId = theNdbCon->theDBnode; + TransporterFacade * tp = TransporterFacade::instance(); + if(cnt > 21){ + tSignal.setLength(4); + LinearSectionPtr ptr[3]; + ptr[0].p = prep_array; + ptr[0].sz = sent; + ret = tp->sendSignal(&tSignal, nodeId, ptr, 1); + } else { + tSignal.setLength(4+sent); + ret = tp->sendSignal(&tSignal, nodeId); + } } - - m_sent_receivers_count = last + cnt + stopScanFlag; + + m_sent_receivers_count = last + sent; m_api_receivers_count -= cnt; m_current_api_receiver = 0; - + return ret; } return 0; @@ -670,7 +688,6 @@ void NdbScanOperation::closeScan() void NdbScanOperation::execCLOSE_SCAN_REP(){ - m_api_receivers_count = 0; m_conf_receivers_count = 0; m_sent_receivers_count = 0; } @@ -1091,7 +1108,7 @@ NdbIndexScanOperation::setBound(const NdbColumnImpl* tAttrInfo, Uint32 xfrmData[2000]; if (cs != NULL && aValue != NULL) { // current limitation: strxfrm does not increase length - assert(cs->strxfrm_multiply == 1); + assert(cs->strxfrm_multiply <= 1); unsigned n = (*cs->coll->strnxfrm)(cs, (uchar*)xfrmData, sizeof(xfrmData), @@ -1317,6 +1334,8 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){ if(DEBUG_NEXT_RESULT) ndbout_c("performing fetch..."); TransporterFacade* tp = TransporterFacade::instance(); Guard guard(tp->theMutexPtr); + if(theError.code) + return -1; Uint32 seq = theNdbCon->theNodeSequence; Uint32 nodeId = theNdbCon->theDBnode; if(seq == tp->getNodeSequence(nodeId) && !send_next_scan_ordered(s_idx)){ @@ -1330,6 +1349,13 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){ continue; } if(DEBUG_NEXT_RESULT) ndbout_c("return -1"); + setErrorCode(4028); + return -1; + } + + if(theError.code){ + setErrorCode(theError.code); + if(DEBUG_NEXT_RESULT) ndbout_c("return -1"); return -1; } @@ -1339,11 +1365,9 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){ memcpy(arr, m_conf_receivers, u_last * sizeof(char*)); if(DEBUG_NEXT_RESULT) ndbout_c("sent: %d recv: %d", tmp, u_last); - if(theError.code){ - setErrorCode(theError.code); - if(DEBUG_NEXT_RESULT) ndbout_c("return -1"); - return -1; - } + } else { + setErrorCode(4028); + return -1; } } else { if(DEBUG_NEXT_RESULT) ndbout_c("return 2"); @@ -1412,10 +1436,22 @@ NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx){ if(idx == theParallelism) return 0; + NdbReceiver* tRec = m_api_receivers[idx]; NdbApiSignal tSignal(theNdb->theMyRef); tSignal.setSignal(GSN_SCAN_NEXTREQ); + Uint32 last = m_sent_receivers_count; Uint32* theData = tSignal.getDataPtrSend(); + Uint32* prep_array = theData + 4; + + m_current_api_receiver = idx + 1; + if((prep_array[0] = tRec->m_tcPtrI) == RNIL) + { + if(DEBUG_NEXT_RESULT) + ndbout_c("receiver completed, don't send"); + return 0; + } + theData[0] = theNdbCon->theTCConPtr; theData[1] = 0; Uint64 transId = theNdbCon->theTransactionId; @@ -1425,17 +1461,10 @@ NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx){ /** * Prepare ops */ - Uint32 last = m_sent_receivers_count; - Uint32 * prep_array = theData + 4; - - NdbReceiver * tRec = m_api_receivers[idx]; m_sent_receivers[last] = tRec; tRec->m_list_index = last; - prep_array[0] = tRec->m_tcPtrI; tRec->prepareSend(); - m_sent_receivers_count = last + 1; - m_current_api_receiver = idx + 1; Uint32 nodeId = theNdbCon->theDBnode; TransporterFacade * tp = TransporterFacade::instance(); @@ -1448,12 +1477,17 @@ NdbScanOperation::close_impl(TransporterFacade* tp){ Uint32 seq = theNdbCon->theNodeSequence; Uint32 nodeId = theNdbCon->theDBnode; - if(seq != tp->getNodeSequence(nodeId)){ + if(seq != tp->getNodeSequence(nodeId)) + { theNdbCon->theReleaseOnClose = true; return -1; } - while(theError.code == 0 && m_sent_receivers_count){ + /** + * Wait for outstanding + */ + while(theError.code == 0 && m_sent_receivers_count) + { theNdb->theWaiter.m_node = nodeId; theNdb->theWaiter.m_state = WAIT_SCAN; int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT); @@ -1471,18 +1505,59 @@ NdbScanOperation::close_impl(TransporterFacade* tp){ } } - if(m_api_receivers_count+m_conf_receivers_count){ - // Send close scan - if(send_next_scan(0, true) == -1){ // Close scan - theNdbCon->theReleaseOnClose = true; - return -1; - } + if(theError.code) + { + m_api_receivers_count = 0; + m_current_api_receiver = m_ordered ? theParallelism : 0; + } + + + /** + * move all conf'ed into api + * so that send_next_scan can check if they needs to be closed + */ + Uint32 api = m_api_receivers_count; + Uint32 conf = m_conf_receivers_count; + + if(m_ordered) + { + /** + * Ordered scan, keep the m_api_receivers "to the right" + */ + memmove(m_api_receivers, m_api_receivers+m_current_api_receiver, + (theParallelism - m_current_api_receiver) * sizeof(char*)); + api = (theParallelism - m_current_api_receiver); + m_api_receivers_count = api; + } + + if(DEBUG_NEXT_RESULT) + ndbout_c("close_impl: [order api conf sent curr parr] %d %d %d %d %d %d", + m_ordered, api, conf, + m_sent_receivers_count, m_current_api_receiver, theParallelism); + + if(api+conf) + { + /** + * There's something to close + * setup m_api_receivers (for send_next_scan) + */ + memcpy(m_api_receivers+api, m_conf_receivers, conf * sizeof(char*)); + m_api_receivers_count = api + conf; + m_conf_receivers_count = 0; + } + + // Send close scan + if(send_next_scan(api+conf, true) == -1) + { + theNdbCon->theReleaseOnClose = true; + return -1; } /** * wait for close scan conf */ - while(m_sent_receivers_count+m_api_receivers_count+m_conf_receivers_count){ + while(m_sent_receivers_count+m_api_receivers_count+m_conf_receivers_count) + { theNdb->theWaiter.m_node = nodeId; theNdb->theWaiter.m_state = WAIT_SCAN; int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT); @@ -1499,6 +1574,7 @@ NdbScanOperation::close_impl(TransporterFacade* tp){ return -1; } } + return 0; } diff --git a/ndb/test/include/HugoTransactions.hpp b/ndb/test/include/HugoTransactions.hpp index 19e4cb43336..b833f2ac629 100644 --- a/ndb/test/include/HugoTransactions.hpp +++ b/ndb/test/include/HugoTransactions.hpp @@ -36,15 +36,21 @@ public: bool allowConstraintViolation = true, int doSleep = 0, bool oneTrans = false); + int scanReadRecords(Ndb*, int records, int abort = 0, int parallelism = 0, - bool committed = false); - int scanReadCommittedRecords(Ndb*, - int records, - int abort = 0, - int parallelism = 0); + NdbOperation::LockMode = NdbOperation::LM_Read); + + int scanReadRecords(Ndb*, + const NdbDictionary::Index*, + int records, + int abort = 0, + int parallelism = 0, + NdbOperation::LockMode = NdbOperation::LM_Read, + bool sorted = false); + int pkReadRecords(Ndb*, int records, int batchsize = 1, diff --git a/ndb/test/include/UtilTransactions.hpp b/ndb/test/include/UtilTransactions.hpp index 37cd99550a5..23902f3b317 100644 --- a/ndb/test/include/UtilTransactions.hpp +++ b/ndb/test/include/UtilTransactions.hpp @@ -53,11 +53,11 @@ public: int selectCount(Ndb*, int parallelism = 0, int* count_rows = NULL, - ScanLock lock = SL_Read, + NdbOperation::LockMode lm = NdbOperation::LM_CommittedRead, NdbConnection* pTrans = NULL); int scanReadRecords(Ndb*, int parallelism, - bool exclusive, + NdbOperation::LockMode lm, int records, int noAttribs, int* attrib_list, diff --git a/ndb/test/ndbapi/testDict.cpp b/ndb/test/ndbapi/testDict.cpp index 9552e321f00..712ab2e4d25 100644 --- a/ndb/test/ndbapi/testDict.cpp +++ b/ndb/test/ndbapi/testDict.cpp @@ -1479,6 +1479,69 @@ runTestDictionaryPerf(NDBT_Context* ctx, NDBT_Step* step){ return NDBT_OK; } +int runFailAddFragment(NDBT_Context* ctx, NDBT_Step* step){ + static int tuplst[] = { 4007, 4008, 4009, 4010, 4011, 4012 }; + static int tuxlst[] = { 12001, 12002, 12003, 12004, 12005, 12006 }; + static unsigned tupcnt = sizeof(tuplst)/sizeof(tuplst[0]); + static unsigned tuxcnt = sizeof(tuxlst)/sizeof(tuxlst[0]); + + NdbRestarter restarter; + int nodeId = restarter.getMasterNodeId(); + Ndb* pNdb = GETNDB(step); + NdbDictionary::Dictionary* pDic = pNdb->getDictionary(); + NdbDictionary::Table tab(*ctx->getTab()); + tab.setFragmentType(NdbDictionary::Object::FragAllLarge); + + // ordered index on first few columns + NdbDictionary::Index idx("X"); + idx.setTable(tab.getName()); + idx.setType(NdbDictionary::Index::OrderedIndex); + idx.setLogging(false); + for (int i_hate_broken_compilers = 0; + i_hate_broken_compilers < 3 && + i_hate_broken_compilers < tab.getNoOfColumns(); + i_hate_broken_compilers++) { + idx.addColumn(*tab.getColumn(i_hate_broken_compilers)); + } + + const int loops = ctx->getNumLoops(); + int result = NDBT_OK; + (void)pDic->dropTable(tab.getName()); + + for (int l = 0; l < loops; l++) { + for (unsigned i1 = 0; i1 < tupcnt; i1++) { + unsigned j = (l == 0 ? i1 : myRandom48(tupcnt)); + int errval = tuplst[j]; + g_info << "insert error node=" << nodeId << " value=" << errval << endl; + CHECK2(restarter.insertErrorInNode(nodeId, errval) == 0, + "failed to set error insert"); + CHECK2(pDic->createTable(tab) != 0, + "failed to fail after error insert " << errval); + CHECK2(pDic->createTable(tab) == 0, + pDic->getNdbError()); + CHECK2(pDic->dropTable(tab.getName()) == 0, + pDic->getNdbError()); + } + for (unsigned i2 = 0; i2 < tuxcnt; i2++) { + unsigned j = (l == 0 ? i2 : myRandom48(tuxcnt)); + int errval = tuxlst[j]; + g_info << "insert error node=" << nodeId << " value=" << errval << endl; + CHECK2(restarter.insertErrorInNode(nodeId, errval) == 0, + "failed to set error insert"); + CHECK2(pDic->createTable(tab) == 0, + pDic->getNdbError()); + CHECK2(pDic->createIndex(idx) != 0, + "failed to fail after error insert " << errval); + CHECK2(pDic->createIndex(idx) == 0, + pDic->getNdbError()); + CHECK2(pDic->dropTable(tab.getName()) == 0, + pDic->getNdbError()); + } + } +end: + return result; +} + NDBT_TESTSUITE(testDict); TESTCASE("CreateAndDrop", "Try to create and drop the table loop number of times\n"){ @@ -1574,6 +1637,10 @@ TESTCASE("DictionaryPerf", ""){ INITIALIZER(runTestDictionaryPerf); } +TESTCASE("FailAddFragment", + "Fail add fragment or attribute in TUP or TUX\n"){ + INITIALIZER(runFailAddFragment); +} NDBT_TESTSUITE_END(testDict); int main(int argc, const char** argv){ diff --git a/ndb/test/ndbapi/testReadPerf.cpp b/ndb/test/ndbapi/testReadPerf.cpp index 380a809ad00..3adcb5a2d9b 100644 --- a/ndb/test/ndbapi/testReadPerf.cpp +++ b/ndb/test/ndbapi/testReadPerf.cpp @@ -391,8 +391,15 @@ run_read(){ void print_result(){ + int tmp = 1; + tmp *= g_paramters[P_RANGE].value; + tmp *= g_paramters[P_LOOPS].value; + + int t, t2; for(int i = 0; i<P_OP_TYPES; i++){ - g_err.println("%s avg: %u us/row", g_ops[i], - (1000*g_times[i])/(g_paramters[P_RANGE].value*g_paramters[P_LOOPS].value)); + g_err << g_ops[i] << " avg: " + << (int)((1000*g_times[i])/tmp) + << " us/row (" + << (1000 * tmp)/g_times[i] << " rows / sec)" << endl; } } diff --git a/ndb/test/ndbapi/testScan.cpp b/ndb/test/ndbapi/testScan.cpp index 0cd30dfefde..22ec3fff327 100644 --- a/ndb/test/ndbapi/testScan.cpp +++ b/ndb/test/ndbapi/testScan.cpp @@ -90,11 +90,59 @@ int runLoadAllTables(NDBT_Context* ctx, NDBT_Step* step){ return NDBT_OK; } +char orderedPkIdxName[255]; + +int createOrderedPkIndex(NDBT_Context* ctx, NDBT_Step* step){ + + const NdbDictionary::Table* pTab = ctx->getTab(); + Ndb* pNdb = GETNDB(step); + + // Create index + BaseString::snprintf(orderedPkIdxName, sizeof(orderedPkIdxName), + "IDC_O_PK_%s", pTab->getName()); + NdbDictionary::Index pIdx(orderedPkIdxName); + pIdx.setTable(pTab->getName()); + pIdx.setType(NdbDictionary::Index::OrderedIndex); + pIdx.setLogging(false); + + for (int c = 0; c< pTab->getNoOfColumns(); c++){ + const NdbDictionary::Column * col = pTab->getColumn(c); + if(col->getPrimaryKey()){ + pIdx.addIndexColumn(col->getName()); + } + } + + if (pNdb->getDictionary()->createIndex(pIdx) != 0){ + ndbout << "FAILED! to create index" << endl; + const NdbError err = pNdb->getDictionary()->getNdbError(); + ERR(err); + return NDBT_FAILED; + } + + return NDBT_OK; +} + +int createOrderedPkIndex_Drop(NDBT_Context* ctx, NDBT_Step* step){ + const NdbDictionary::Table* pTab = ctx->getTab(); + Ndb* pNdb = GETNDB(step); + + // Drop index + if (pNdb->getDictionary()->dropIndex(orderedPkIdxName, + pTab->getName()) != 0){ + ndbout << "FAILED! to drop index" << endl; + ERR(pNdb->getDictionary()->getNdbError()); + return NDBT_FAILED; + } + + return NDBT_OK; +} + + int runScanReadRandomTable(NDBT_Context* ctx, NDBT_Step* step){ int loops = ctx->getNumLoops(); int records = ctx->getNumRecords(); int parallelism = ctx->getProperty("Parallelism", 240); - int abort = ctx->getProperty("AbortProb"); + int abort = ctx->getProperty("AbortProb", 5); int i = 0; while (i<loops) { @@ -218,7 +266,7 @@ int runScanRead(NDBT_Context* ctx, NDBT_Step* step){ int loops = ctx->getNumLoops(); int records = ctx->getNumRecords(); int parallelism = ctx->getProperty("Parallelism", 240); - int abort = ctx->getProperty("AbortProb"); + int abort = ctx->getProperty("AbortProb", 5); int i = 0; HugoTransactions hugoTrans(*ctx->getTab()); @@ -232,18 +280,66 @@ int runScanRead(NDBT_Context* ctx, NDBT_Step* step){ return NDBT_OK; } +int runRandScanRead(NDBT_Context* ctx, NDBT_Step* step){ + int loops = ctx->getNumLoops(); + int records = ctx->getNumRecords(); + int parallelism = ctx->getProperty("Parallelism", 240); + int abort = ctx->getProperty("AbortProb", 5); + + int i = 0; + HugoTransactions hugoTrans(*ctx->getTab()); + while (i<loops && !ctx->isTestStopped()) { + g_info << i << ": "; + NdbOperation::LockMode lm = (NdbOperation::LockMode)(rand() % 3); + if (hugoTrans.scanReadRecords(GETNDB(step), + records, abort, parallelism, + lm) != 0){ + return NDBT_FAILED; + } + i++; + } + return NDBT_OK; +} + +int runScanReadIndex(NDBT_Context* ctx, NDBT_Step* step){ + int loops = ctx->getNumLoops(); + int records = ctx->getNumRecords(); + int parallelism = ctx->getProperty("Parallelism", 240); + int abort = ctx->getProperty("AbortProb", 5); + const NdbDictionary::Index * pIdx = + GETNDB(step)->getDictionary()->getIndex(orderedPkIdxName, + ctx->getTab()->getName()); + + int i = 0; + HugoTransactions hugoTrans(*ctx->getTab()); + while (pIdx && i<loops && !ctx->isTestStopped()) { + g_info << i << ": "; + bool sort = (rand() % 100) > 50 ? true : false; + NdbOperation::LockMode lm = (NdbOperation::LockMode)(rand() % 3); + if (hugoTrans.scanReadRecords(GETNDB(step), pIdx, + records, abort, parallelism, + lm, + sort) != 0){ + return NDBT_FAILED; + } + i++; + } + return NDBT_OK; +} + int runScanReadCommitted(NDBT_Context* ctx, NDBT_Step* step){ int loops = ctx->getNumLoops(); int records = ctx->getNumRecords(); int parallelism = ctx->getProperty("Parallelism", 240); - int abort = ctx->getProperty("AbortProb"); + int abort = ctx->getProperty("AbortProb", 5); int i = 0; HugoTransactions hugoTrans(*ctx->getTab()); while (i<loops && !ctx->isTestStopped()) { g_info << i << ": "; - if (hugoTrans.scanReadCommittedRecords(GETNDB(step), records, - abort, parallelism) != 0){ + if (hugoTrans.scanReadRecords(GETNDB(step), records, + abort, parallelism, + NdbOperation::LM_CommittedRead) != 0){ return NDBT_FAILED; } i++; @@ -424,7 +520,7 @@ int runScanUpdate(NDBT_Context* ctx, NDBT_Step* step){ int loops = ctx->getNumLoops(); int records = ctx->getNumRecords(); int parallelism = ctx->getProperty("Parallelism", 1); - int abort = ctx->getProperty("AbortProb"); + int abort = ctx->getProperty("AbortProb", 5); int i = 0; HugoTransactions hugoTrans(*ctx->getTab()); while (i<loops) { @@ -464,7 +560,7 @@ int runScanUpdate2(NDBT_Context* ctx, NDBT_Step* step){ int loops = ctx->getNumLoops(); int records = ctx->getNumRecords(); int parallelism = ctx->getProperty("Parallelism", 240); - int abort = ctx->getProperty("AbortProb"); + int abort = ctx->getProperty("AbortProb", 5); int i = 0; HugoTransactions hugoTrans(*ctx->getTab()); while (i<loops) { @@ -639,7 +735,7 @@ int runCheckGetValue(NDBT_Context* ctx, NDBT_Step* step){ g_info << (unsigned)i << endl; if(utilTrans.scanReadRecords(GETNDB(step), parallelism, - false, + NdbOperation::LM_Read, records, alist.attriblist[i]->numAttribs, alist.attriblist[i]->attribs) != 0){ @@ -647,7 +743,7 @@ int runCheckGetValue(NDBT_Context* ctx, NDBT_Step* step){ } if(utilTrans.scanReadRecords(GETNDB(step), parallelism, - true, + NdbOperation::LM_Read, records, alist.attriblist[i]->numAttribs, alist.attriblist[i]->attribs) != 0){ @@ -1079,7 +1175,30 @@ TESTCASE("ScanRead488", "When this limit is exceeded the scan will be aborted with errorcode "\ "488."){ INITIALIZER(runLoadTable); - STEPS(runScanRead, 70); + STEPS(runRandScanRead, 70); + FINALIZER(runClearTable); +} +TESTCASE("ScanRead488O", + "Verify scan requirement: It's only possible to have 11 concurrent "\ + "scans per fragment running in Ndb kernel at the same time. "\ + "When this limit is exceeded the scan will be aborted with errorcode "\ + "488."){ + INITIALIZER(createOrderedPkIndex); + INITIALIZER(runLoadTable); + STEPS(runScanReadIndex, 70); + FINALIZER(createOrderedPkIndex_Drop); + FINALIZER(runClearTable); +} +TESTCASE("ScanRead488_Mixed", + "Verify scan requirement: It's only possible to have 11 concurrent "\ + "scans per fragment running in Ndb kernel at the same time. "\ + "When this limit is exceeded the scan will be aborted with errorcode "\ + "488."){ + INITIALIZER(createOrderedPkIndex); + INITIALIZER(runLoadTable); + STEPS(runRandScanRead, 50); + STEPS(runScanReadIndex, 50); + FINALIZER(createOrderedPkIndex_Drop); FINALIZER(runClearTable); } TESTCASE("ScanRead488Timeout", diff --git a/ndb/test/ndbapi/testScanPerf.cpp b/ndb/test/ndbapi/testScanPerf.cpp index 003fc67179f..45f0468bc70 100644 --- a/ndb/test/ndbapi/testScanPerf.cpp +++ b/ndb/test/ndbapi/testScanPerf.cpp @@ -39,8 +39,9 @@ struct Parameter { #define P_LOOPS 8 #define P_CREATE 9 #define P_LOAD 10 +#define P_RESET 11 -#define P_MAX 11 +#define P_MAX 12 static Parameter @@ -55,7 +56,8 @@ g_paramters[] = { { "size", 1000000, 1, ~0 }, { "iterations", 3, 1, ~0 }, { "create_drop", 1, 0, 1 }, - { "data", 1, 0, 1 } + { "data", 1, 0, 1 }, + { "q-reset bounds", 0, 1, 0 } }; static Ndb* g_ndb = 0; @@ -219,21 +221,30 @@ run_scan(){ NDB_TICKS start1, stop; int sum_time= 0; + int sample_rows = 0; + int tot_rows = 0; + NDB_TICKS sample_start = NdbTick_CurrentMillisecond(); + Uint32 tot = g_paramters[P_ROWS].value; + if(g_paramters[P_BOUND].value == 2 || g_paramters[P_FILT].value == 2) + iter *= g_paramters[P_ROWS].value; + + NdbScanOperation * pOp = 0; + NdbIndexScanOperation * pIOp = 0; + NdbConnection * pTrans = 0; + NdbResultSet * rs = 0; + int check = 0; + for(int i = 0; i<iter; i++){ start1 = NdbTick_CurrentMillisecond(); - NdbConnection * pTrans = g_ndb->startTransaction(); + pTrans = pTrans ? pTrans : g_ndb->startTransaction(); if(!pTrans){ g_err << "Failed to start transaction" << endl; err(g_ndb->getNdbError()); return -1; } - NdbScanOperation * pOp; - NdbIndexScanOperation * pIOp; - - NdbResultSet * rs; int par = g_paramters[P_PARRA].value; int bat = g_paramters[P_BATCH].value; NdbScanOperation::LockMode lm; @@ -256,9 +267,17 @@ run_scan(){ assert(pOp); rs = pOp->readTuples(lm, bat, par); } else { - pOp = pIOp = pTrans->getNdbIndexScanOperation(g_indexname, g_tablename); - bool ord = g_paramters[P_ACCESS].value == 2; - rs = pIOp->readTuples(lm, bat, par, ord); + if(g_paramters[P_RESET].value == 0 || pIOp == 0) + { + pOp= pIOp= pTrans->getNdbIndexScanOperation(g_indexname, g_tablename); + bool ord = g_paramters[P_ACCESS].value == 2; + rs = pIOp->readTuples(lm, bat, par, ord); + } + else + { + pIOp->reset_bounds(); + } + switch(g_paramters[P_BOUND].value){ case 0: // All break; @@ -268,20 +287,22 @@ run_scan(){ case 2: { // 1 row default: assert(g_table->getNoOfPrimaryKeys() == 1); // only impl. so far - abort(); -#if 0 int tot = g_paramters[P_ROWS].value; int row = rand() % tot; +#if 0 fix_eq_bound(pIOp, row); +#else + pIOp->setBound((Uint32)0, NdbIndexScanOperation::BoundEQ, &row); #endif break; } } + if(g_paramters[P_RESET].value == 2) + goto execute; } assert(pOp); assert(rs); - int check = 0; switch(g_paramters[P_FILT].value){ case 0: // All check = pOp->interpret_exit_ok(); @@ -310,10 +331,13 @@ run_scan(){ } assert(check == 0); + if(g_paramters[P_RESET].value == 1) + g_paramters[P_RESET].value = 2; + for(int i = 0; i<g_table->getNoOfColumns(); i++){ pOp->getValue(i); } - +execute: int rows = 0; check = pTrans->execute(NoCommit); assert(check == 0); @@ -334,19 +358,30 @@ run_scan(){ return -1; } assert(check == 1); - g_info << "Found " << rows << " rows" << endl; - - pTrans->close(); - + if(g_paramters[P_RESET].value == 0) + { + pTrans->close(); + pTrans = 0; + } stop = NdbTick_CurrentMillisecond(); + int time_passed= (int)(stop - start1); - g_err.println("Time: %d ms = %u rows/sec", time_passed, - (1000*tot)/time_passed); + sample_rows += rows; sum_time+= time_passed; + tot_rows+= rows; + + if(sample_rows >= tot) + { + int sample_time = (int)(stop - sample_start); + g_info << "Found " << sample_rows << " rows" << endl; + g_err.println("Time: %d ms = %u rows/sec", sample_time, + (1000*sample_rows)/sample_time); + sample_rows = 0; + sample_start = stop; + } } - sum_time= sum_time / iter; - g_err.println("Avg time: %d ms = %u rows/sec", sum_time, - (1000*tot)/sum_time); + g_err.println("Avg time: %d ms = %u rows/sec", sum_time/tot_rows, + (1000*tot_rows)/sum_time); return 0; } diff --git a/ndb/test/run-test/daily-basic-tests.txt b/ndb/test/run-test/daily-basic-tests.txt index 8d7e8a06c72..8a927b88194 100644 --- a/ndb/test/run-test/daily-basic-tests.txt +++ b/ndb/test/run-test/daily-basic-tests.txt @@ -222,6 +222,18 @@ max-time: 500 cmd: testScan args: -n ScanRead488 -l 10 T6 +max-time: 500 +cmd: testScan +args: -n ScanRead488O -l 10 T6 + +max-time: 1000 +cmd: testScan +args: -n ScanRead488_Mixed -l 10 T6 + +max-time: 500 +cmd: testScan +args: -n ScanRead488Timeout -l 10 T6 + max-time: 600 cmd: testScan args: -n ScanRead40 -l 100 T2 @@ -474,493 +486,13 @@ args: -n UpdateWithoutValues T6 #cmd: testInterpreter #args: T1 # -max-time: 1500 -cmd: testOperations -args: -n ReadRead - -max-time: 1500 -cmd: testOperations -args: -n ReadReadEx - -max-time: 1500 -cmd: testOperations -args: -n ReadInsert - -max-time: 1500 -cmd: testOperations -args: -n ReadUpdate - -max-time: 1500 -cmd: testOperations -args: -n ReadDelete - -max-time: 1500 -cmd: testOperations -args: -n FReadRead - -max-time: 1500 -cmd: testOperations -args: -n FReadReadEx - -max-time: 1500 -cmd: testOperations -args: -n FReadInsert - -max-time: 1500 -cmd: testOperations -args: -n FReadUpdate - -max-time: 1500 -cmd: testOperations -args: -n FReadDelete - -max-time: 1500 -cmd: testOperations -args: -n ReadExRead - -max-time: 1500 -cmd: testOperations -args: -n ReadExReadEx - -max-time: 1500 -cmd: testOperations -args: -n ReadExInsert - -max-time: 1500 -cmd: testOperations -args: -n ReadExUpdate - -max-time: 1500 -cmd: testOperations -args: -n ReadExDelete - -max-time: 1500 -cmd: testOperations -args: -n InsertRead - -max-time: 1500 -cmd: testOperations -args: -n InsertReadEx - -max-time: 1500 -cmd: testOperations -args: -n InsertInsert - -max-time: 1500 -cmd: testOperations -args: -n InsertUpdate - -max-time: 1500 -cmd: testOperations -args: -n InsertDelete - -max-time: 1500 -cmd: testOperations -args: -n UpdateRead - -max-time: 1500 -cmd: testOperations -args: -n UpdateReadEx - -max-time: 1500 -cmd: testOperations -args: -n UpdateInsert - -max-time: 1500 -cmd: testOperations -args: -n UpdateUpdate - -max-time: 1500 -cmd: testOperations -args: -n UpdateDelete - -max-time: 1500 -cmd: testOperations -args: -n DeleteRead - -max-time: 1500 -cmd: testOperations -args: -n DeleteReadEx - -max-time: 1500 -cmd: testOperations -args: -n DeleteInsert - -max-time: 1500 +max-time: 150000 cmd: testOperations -args: -n DeleteUpdate - -max-time: 1500 -cmd: testOperations -args: -n DeleteDelete - -max-time: 1500 -cmd: testOperations -args: -n ReadSimpleRead - -max-time: 1500 -cmd: testOperations -args: -n ReadDirtyRead - -max-time: 1500 -cmd: testOperations -args: -n FReadSimpleRead - -max-time: 1500 -cmd: testOperations -args: -n FReadDirtyRead - -max-time: 1500 -cmd: testOperations -args: -n ReadExSimpleRead - -max-time: 1500 -cmd: testOperations -args: -n ReadExDirtyRead - -max-time: 1500 -cmd: testOperations -args: -n InsertSimpleRead - -max-time: 1500 -cmd: testOperations -args: -n InsertDirtyRead - -max-time: 1500 -cmd: testOperations -args: -n UpdateSimpleRead - -max-time: 1500 -cmd: testOperations -args: -n UpdateDirtyRead - -max-time: 1500 -cmd: testOperations -args: -n DeleteSimpleRead - -max-time: 1500 -cmd: testOperations -args: -n DeleteDirtyRead - -max-time: 1500 -cmd: testTransactions -args: -n ReadRead - -max-time: 1500 -cmd: testTransactions -args: -n ReadReadEx - -max-time: 1500 -cmd: testTransactions -args: -n ReadInsert - -max-time: 1500 -cmd: testTransactions -args: -n ReadUpdate - -max-time: 1500 -cmd: testTransactions -args: -n ReadDelete - -max-time: 1500 -cmd: testTransactions -args: -n ReadExRead - -max-time: 1500 -cmd: testTransactions -args: -n ReadExReadEx - -max-time: 1500 -cmd: testTransactions -args: -n ReadExInsert - -max-time: 1500 -cmd: testTransactions -args: -n ReadExUpdate - -max-time: 1500 -cmd: testTransactions -args: -n ReadExDelete - -max-time: 1500 -cmd: testTransactions -args: -n InsertRead - -max-time: 1500 -cmd: testTransactions -args: -n InsertReadEx - -max-time: 1500 -cmd: testTransactions -args: -n InsertInsert - -max-time: 1500 -cmd: testTransactions -args: -n InsertUpdate - -max-time: 1500 -cmd: testTransactions -args: -n InsertDelete - -max-time: 1500 -cmd: testTransactions -args: -n UpdateRead - -max-time: 1500 -cmd: testTransactions -args: -n UpdateReadEx - -max-time: 1500 -cmd: testTransactions -args: -n UpdateInsert - -max-time: 1500 -cmd: testTransactions -args: -n UpdateUpdate - -max-time: 1500 -cmd: testTransactions -args: -n UpdateDelete - -max-time: 1500 -cmd: testTransactions -args: -n DeleteRead - -max-time: 1500 -cmd: testTransactions -args: -n DeleteReadEx - -max-time: 1500 -cmd: testTransactions -args: -n DeleteInsert - -max-time: 1500 -cmd: testTransactions -args: -n DeleteUpdate - -max-time: 1500 -cmd: testTransactions -args: -n DeleteDelete - -max-time: 1500 -cmd: testTransactions -args: -n ReadSimpleRead - -max-time: 1500 -cmd: testTransactions -args: -n ReadDirtyRead - -max-time: 1500 -cmd: testTransactions -args: -n ReadExSimpleRead - -max-time: 1500 -cmd: testTransactions -args: -n ReadExDirtyRead - -max-time: 1500 -cmd: testTransactions -args: -n InsertSimpleRead - -max-time: 1500 -cmd: testTransactions -args: -n InsertDirtyRead - -max-time: 1500 -cmd: testTransactions -args: -n UpdateSimpleRead - -max-time: 1500 -cmd: testTransactions -args: -n UpdateDirtyRead - -max-time: 1500 -cmd: testTransactions -args: -n DeleteSimpleRead - -max-time: 1500 -cmd: testTransactions -args: -n DeleteDirtyRead - -max-time: 1500 -cmd: testTransactions -args: -n ReadScan - -max-time: 1500 -cmd: testTransactions -args: -n ReadScanHl - -max-time: 1500 -cmd: testTransactions -args: -n ReadScanEx - -max-time: 1500 -cmd: testTransactions -args: -n ScanRead - -max-time: 1500 -cmd: testTransactions -args: -n ScanReadEx - -max-time: 1500 -cmd: testTransactions -args: -n ScanSimpleRead - -max-time: 1500 -cmd: testTransactions -args: -n ScanDirtyRead - -max-time: 1500 -cmd: testTransactions -args: -n ScanInsert - -max-time: 1500 -cmd: testTransactions -args: -n ScanUpdate - -max-time: 1500 -cmd: testTransactions -args: -n ScanDelete - -max-time: 1500 -cmd: testTransactions -args: -n ScanScan - -max-time: 1500 -cmd: testTransactions -args: -n ScanScanHl - -max-time: 1500 -cmd: testTransactions -args: -n ScanScanEx - -max-time: 1500 -cmd: testTransactions -args: -n ScanHlRead - -max-time: 1500 -cmd: testTransactions -args: -n ScanHlReadEx - -max-time: 1500 -cmd: testTransactions -args: -n ScanHlSimpleRead - -max-time: 1500 -cmd: testTransactions -args: -n ScanHlDirtyRead - -max-time: 1500 -cmd: testTransactions -args: -n ScanHlInsert - -max-time: 1500 -cmd: testTransactions -args: -n ScanHlUpdate - -max-time: 1500 -cmd: testTransactions -args: -n ScanHlDelete - -max-time: 1500 -cmd: testTransactions -args: -n ScanHlScan - -max-time: 1500 -cmd: testTransactions -args: -n ScanHlScanHl - -max-time: 1500 -cmd: testTransactions -args: -n ScanHlScanEx - -max-time: 1500 -cmd: testTransactions -args: -n ScanExRead - -max-time: 1500 -cmd: testTransactions -args: -n ScanExReadEx - -max-time: 1500 -cmd: testTransactions -args: -n ScanExSimpleRead - -max-time: 1500 -cmd: testTransactions -args: -n ScanExDirtyRead - -max-time: 1500 -cmd: testTransactions -args: -n ScanExInsert - -max-time: 1500 -cmd: testTransactions -args: -n ScanExUpdate - -max-time: 1500 -cmd: testTransactions -args: -n ScanExDelete - -max-time: 1500 -cmd: testTransactions -args: -n ScanExScan - -max-time: 1500 -cmd: testTransactions -args: -n ScanExScanHl - -max-time: 1500 -cmd: testTransactions -args: -n ScanExScanEx - -max-time: 1500 -cmd: testTransactions -args: -n ReadExScan - -max-time: 1500 -cmd: testTransactions -args: -n ReadExScanHl - -max-time: 1500 -cmd: testTransactions -args: -n ReadExScanEx - -max-time: 1500 -cmd: testTransactions -args: -n InsertScan - -max-time: 1500 -cmd: testTransactions -args: -n InsertScanHl - -max-time: 1500 -cmd: testTransactions -args: -n InsertScanEx - -max-time: 1500 -cmd: testTransactions -args: -n UpdateScan - -max-time: 1500 -cmd: testTransactions -args: -n UpdateScanHl - -max-time: 1500 -cmd: testTransactions -args: -n UpdateScanEx - -max-time: 1500 -cmd: testTransactions -args: -n DeleteScan - -max-time: 1500 -cmd: testTransactions -args: -n DeleteScanHl +args: -max-time: 1500 +max-time: 150000 cmd: testTransactions -args: -n DeleteScanEx +args: max-time: 1500 cmd: testRestartGci diff --git a/ndb/test/src/HugoTransactions.cpp b/ndb/test/src/HugoTransactions.cpp index 456bfffbb77..096f5406bbf 100644 --- a/ndb/test/src/HugoTransactions.cpp +++ b/ndb/test/src/HugoTransactions.cpp @@ -29,26 +29,175 @@ HugoTransactions::~HugoTransactions(){ deallocRows(); } - -int HugoTransactions::scanReadCommittedRecords(Ndb* pNdb, +int +HugoTransactions::scanReadRecords(Ndb* pNdb, int records, int abortPercent, - int parallelism){ - return scanReadRecords(pNdb, records, abortPercent, parallelism, true); + int parallelism, + NdbOperation::LockMode lm) +{ + + int retryAttempt = 0; + const int retryMax = 100; + int check, a; + NdbConnection *pTrans; + NdbScanOperation *pOp; + + while (true){ + + if (retryAttempt >= retryMax){ + g_err << "ERROR: has retried this operation " << retryAttempt + << " times, failing!" << endl; + return NDBT_FAILED; + } + + pTrans = pNdb->startTransaction(); + if (pTrans == NULL) { + const NdbError err = pNdb->getNdbError(); + + if (err.status == NdbError::TemporaryError){ + ERR(err); + NdbSleep_MilliSleep(50); + retryAttempt++; + continue; + } + ERR(err); + return NDBT_FAILED; + } + + pOp = pTrans->getNdbScanOperation(tab.getName()); + if (pOp == NULL) { + ERR(pTrans->getNdbError()); + pNdb->closeTransaction(pTrans); + return NDBT_FAILED; + } + + NdbResultSet * rs; + rs = pOp ->readTuples(lm); + + if( rs == 0 ) { + ERR(pTrans->getNdbError()); + pNdb->closeTransaction(pTrans); + return NDBT_FAILED; + } + + check = pOp->interpret_exit_ok(); + if( check == -1 ) { + ERR(pTrans->getNdbError()); + pNdb->closeTransaction(pTrans); + return NDBT_FAILED; + } + + for(a = 0; a<tab.getNoOfColumns(); a++){ + if((row.attributeStore(a) = + pOp->getValue(tab.getColumn(a)->getName())) == 0) { + ERR(pTrans->getNdbError()); + pNdb->closeTransaction(pTrans); + return NDBT_FAILED; + } + } + + check = pTrans->execute(NoCommit); + if( check == -1 ) { + const NdbError err = pTrans->getNdbError(); + if (err.status == NdbError::TemporaryError){ + ERR(err); + pNdb->closeTransaction(pTrans); + NdbSleep_MilliSleep(50); + retryAttempt++; + continue; + } + ERR(err); + pNdb->closeTransaction(pTrans); + return NDBT_FAILED; + } + + // Abort after 1-100 or 1-records rows + int ranVal = rand(); + int abortCount = ranVal % (records == 0 ? 100 : records); + bool abortTrans = false; + if (abort > 0){ + // Abort if abortCount is less then abortPercent + if (abortCount < abortPercent) + abortTrans = true; + } + + int eof; + int rows = 0; + while((eof = rs->nextResult(true)) == 0){ + rows++; + if (calc.verifyRowValues(&row) != 0){ + pNdb->closeTransaction(pTrans); + return NDBT_FAILED; + } + + if (abortCount == rows && abortTrans == true){ + ndbout << "Scan is aborted" << endl; + g_info << "Scan is aborted" << endl; + rs->close(); + if( check == -1 ) { + ERR(pTrans->getNdbError()); + pNdb->closeTransaction(pTrans); + return NDBT_FAILED; + } + + pNdb->closeTransaction(pTrans); + return NDBT_OK; + } + } + if (eof == -1) { + const NdbError err = pTrans->getNdbError(); + + if (err.status == NdbError::TemporaryError){ + ERR_INFO(err); + pNdb->closeTransaction(pTrans); + NdbSleep_MilliSleep(50); + switch (err.code){ + case 488: + case 245: + case 490: + // Too many active scans, no limit on number of retry attempts + break; + default: + retryAttempt++; + } + continue; + } + ERR(err); + pNdb->closeTransaction(pTrans); + return NDBT_FAILED; + } + + pNdb->closeTransaction(pTrans); + + g_info << rows << " rows have been read" << endl; + if (records != 0 && rows != records){ + g_err << "Check expected number of records failed" << endl + << " expected=" << records <<", " << endl + << " read=" << rows << endl; + return NDBT_FAILED; + } + + return NDBT_OK; + } + return NDBT_FAILED; } int HugoTransactions::scanReadRecords(Ndb* pNdb, + const NdbDictionary::Index * pIdx, int records, int abortPercent, int parallelism, - bool committed){ + NdbOperation::LockMode lm, + bool sorted) +{ int retryAttempt = 0; const int retryMax = 100; int check, a; NdbConnection *pTrans; - NdbScanOperation *pOp; + NdbIndexScanOperation *pOp; while (true){ @@ -72,7 +221,7 @@ HugoTransactions::scanReadRecords(Ndb* pNdb, return NDBT_FAILED; } - pOp = pTrans->getNdbScanOperation(tab.getName()); + pOp = pTrans->getNdbIndexScanOperation(pIdx->getName(), tab.getName()); if (pOp == NULL) { ERR(pTrans->getNdbError()); pNdb->closeTransaction(pTrans); @@ -80,8 +229,7 @@ HugoTransactions::scanReadRecords(Ndb* pNdb, } NdbResultSet * rs; - rs = pOp ->readTuples(committed ? NdbScanOperation::LM_CommittedRead : - NdbScanOperation::LM_Read); + rs = pOp ->readTuples(lm, 0, parallelism, sorted); if( rs == 0 ) { ERR(pTrans->getNdbError()); diff --git a/ndb/test/src/NDBT_Test.cpp b/ndb/test/src/NDBT_Test.cpp index 367223f8c98..bbbde008938 100644 --- a/ndb/test/src/NDBT_Test.cpp +++ b/ndb/test/src/NDBT_Test.cpp @@ -519,6 +519,7 @@ void NDBT_TestCaseImpl1::waitSteps(){ NdbThread_WaitFor(threads[i], &status); NdbThread_Destroy(&threads[i]); } + threads.clear(); } @@ -839,9 +840,9 @@ void NDBT_TestSuite::execute(Ndb* ndb, const NdbDictionary::Table* pTab, continue; } pTab2 = pDict->getTable(pTab->getName()); - } else { + } else if(!pTab2) { pTab2 = pTab; - } + } ctx = new NDBT_Context(); ctx->setTab(pTab2); diff --git a/ndb/test/src/UtilTransactions.cpp b/ndb/test/src/UtilTransactions.cpp index c0e6effd244..869f7fc76cb 100644 --- a/ndb/test/src/UtilTransactions.cpp +++ b/ndb/test/src/UtilTransactions.cpp @@ -619,7 +619,7 @@ UtilTransactions::addRowToInsert(Ndb* pNdb, int UtilTransactions::scanReadRecords(Ndb* pNdb, int parallelism, - bool exclusive, + NdbOperation::LockMode lm, int records, int noAttribs, int *attrib_list, @@ -669,10 +669,7 @@ UtilTransactions::scanReadRecords(Ndb* pNdb, return NDBT_FAILED; } - NdbResultSet * rs = pOp->readTuples(exclusive ? - NdbScanOperation::LM_Exclusive : - NdbScanOperation::LM_Read, - 0, parallelism); + NdbResultSet * rs = pOp->readTuples(lm, 0, parallelism); if( rs == 0 ) { ERR(pTrans->getNdbError()); pNdb->closeTransaction(pTrans); @@ -761,7 +758,7 @@ int UtilTransactions::selectCount(Ndb* pNdb, int parallelism, int* count_rows, - ScanLock lock, + NdbOperation::LockMode lm, NdbConnection* pTrans){ int retryAttempt = 0; @@ -785,19 +782,7 @@ UtilTransactions::selectCount(Ndb* pNdb, return NDBT_FAILED; } - NdbResultSet * rs; - switch(lock){ - case SL_ReadHold: - rs = pOp->readTuples(NdbScanOperation::LM_Read); - break; - case SL_Exclusive: - rs = pOp->readTuples(NdbScanOperation::LM_Exclusive); - break; - case SL_Read: - default: - rs = pOp->readTuples(NdbScanOperation::LM_CommittedRead); - } - + NdbResultSet * rs = pOp->readTuples(lm); if( rs == 0) { ERR(pTrans->getNdbError()); pNdb->closeTransaction(pTrans); diff --git a/ndb/test/tools/create_index.cpp b/ndb/test/tools/create_index.cpp index 75a657522f6..6e4c5377f4a 100644 --- a/ndb/test/tools/create_index.cpp +++ b/ndb/test/tools/create_index.cpp @@ -30,7 +30,7 @@ main(int argc, const char** argv){ const char* _dbname = "TEST_DB"; int _help = 0; - int _ordered, _pk; + int _ordered = 0, _pk = 1; struct getargs args[] = { { "database", 'd', arg_string, &_dbname, "dbname", diff --git a/ndb/test/tools/hugoScanRead.cpp b/ndb/test/tools/hugoScanRead.cpp index cdfdcea4654..42180207a8a 100644 --- a/ndb/test/tools/hugoScanRead.cpp +++ b/ndb/test/tools/hugoScanRead.cpp @@ -35,13 +35,17 @@ int main(int argc, const char** argv){ int _parallelism = 1; const char* _tabname = NULL; int _help = 0; - + int lock = NdbOperation::LM_Read; + int sorted = 0; + struct getargs args[] = { { "aborts", 'a', arg_integer, &_abort, "percent of transactions that are aborted", "abort%" }, { "loops", 'l', arg_integer, &_loops, "number of times to run this program(0=infinite loop)", "loops" }, { "parallelism", 'p', arg_integer, &_parallelism, "parallelism(1-240)", "para" }, { "records", 'r', arg_integer, &_records, "Number of records", "recs" }, - { "usage", '?', arg_flag, &_help, "Print help", "" } + { "usage", '?', arg_flag, &_help, "Print help", "" }, + { "lock", 'm', arg_integer, &lock, "lock mode", "" }, + { "sorted", 's', arg_flag, &sorted, "sorted", "" } }; int num_args = sizeof(args) / sizeof(args[0]); int optind = 0; @@ -73,16 +77,48 @@ int main(int argc, const char** argv){ ndbout << " Table " << _tabname << " does not exist!" << endl; return NDBT_ProgramExit(NDBT_WRONGARGS); } + + const NdbDictionary::Index * pIdx = 0; + if(optind+1 < argc) + { + pIdx = MyNdb.getDictionary()->getIndex(argv[optind+1], _tabname); + if(!pIdx) + ndbout << " Index " << argv[optind+1] << " not found" << endl; + else + if(pIdx->getType() != NdbDictionary::Index::UniqueOrderedIndex && + pIdx->getType() != NdbDictionary::Index::OrderedIndex) + { + ndbout << " Index " << argv[optind+1] << " is not scannable" << endl; + pIdx = 0; + } + } HugoTransactions hugoTrans(*pTab); int i = 0; while (i<_loops || _loops==0) { ndbout << i << ": "; - if(hugoTrans.scanReadRecords(&MyNdb, - 0, - _abort, - _parallelism) != 0){ - return NDBT_ProgramExit(NDBT_FAILED); + if(!pIdx) + { + if(hugoTrans.scanReadRecords(&MyNdb, + 0, + _abort, + _parallelism, + (NdbOperation::LockMode)lock) != 0) + { + return NDBT_ProgramExit(NDBT_FAILED); + } + } + else + { + if(hugoTrans.scanReadRecords(&MyNdb, pIdx, + 0, + _abort, + _parallelism, + (NdbOperation::LockMode)lock, + sorted) != 0) + { + return NDBT_ProgramExit(NDBT_FAILED); + } } i++; } diff --git a/ndb/tools/select_all.cpp b/ndb/tools/select_all.cpp index 758c1e48c88..a670a2ed49a 100644 --- a/ndb/tools/select_all.cpp +++ b/ndb/tools/select_all.cpp @@ -50,7 +50,7 @@ static struct my_option my_long_options[] = GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 }, { "parallelism", 'p', "parallelism", (gptr*) &_parallelism, (gptr*) &_parallelism, 0, - GET_INT, REQUIRED_ARG, 240, 0, 0, 0, 0, 0 }, + GET_INT, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 }, { "lock", 'l', "Read(0), Read-hold(1), Exclusive(2)", (gptr*) &_lock, (gptr*) &_lock, 0, GET_INT, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 }, @@ -133,13 +133,18 @@ int main(int argc, char** argv){ const NdbDictionary::Table* pTab = NDBT_Table::discoverTableFromDb(&MyNdb, _tabname); const NdbDictionary::Index * pIdx = 0; if(argc > 1){ - pIdx = MyNdb.getDictionary()->getIndex(argv[0], _tabname); + pIdx = MyNdb.getDictionary()->getIndex(argv[1], _tabname); } if(pTab == NULL){ ndbout << " Table " << _tabname << " does not exist!" << endl; return NDBT_ProgramExit(NDBT_WRONGARGS); } + + if(argc > 1 && pIdx == 0) + { + ndbout << " Index " << argv[1] << " does not exists" << endl; + } if(_order && pIdx == NULL){ ndbout << " Order flag given without an index" << endl; |