diff options
35 files changed, 847 insertions, 341 deletions
diff --git a/BUILD/SETUP.sh b/BUILD/SETUP.sh index aa6b412d73b..f8baf317e72 100644 --- a/BUILD/SETUP.sh +++ b/BUILD/SETUP.sh @@ -6,6 +6,7 @@ fi just_print= just_configure= +full_debug= while test $# -gt 0 do case "$1" in @@ -21,6 +22,7 @@ Any other options will be passed directly to configure. Note: this script is intended for internal use by MySQL developers. EOF + --with-debug=full ) full_debug="=full"; shift ;; * ) break ;; esac done @@ -48,7 +50,8 @@ fast_cflags="-O3 -fno-omit-frame-pointer" # this is one is for someone who thinks 1% speedup is worth not being # able to backtrace reckless_cflags="-O3 -fomit-frame-pointer " -debug_cflags="-DUNIV_MUST_NOT_INLINE -DEXTRA_DEBUG -DFORCE_INIT_OF_VARS -DSAFEMALLOC -DPEDANTIC_SAFEMALLOC -DSAFE_MUTEX -O1 -Wuninitialized" + +debug_cflags="-DUNIV_MUST_NOT_INLINE -DEXTRA_DEBUG -DFORCE_INIT_OF_VARS -DSAFEMALLOC -DPEDANTIC_SAFEMALLOC -DSAFE_MUTEX" base_cxxflags="-felide-constructors -fno-exceptions -fno-rtti" @@ -62,7 +65,11 @@ sparc_configs="" # and unset local_infile_configs local_infile_configs="--enable-local-infile" -debug_configs="--with-debug" +debug_configs="--with-debug$full_debug" +if [ -z "$full_debug" ] +then + debug_cflags="$debug_cflags -O1 -Wuninitialized" +fi if gmake --version > /dev/null 2>&1 then diff --git a/mysql-test/r/ndb_index_ordered.result b/mysql-test/r/ndb_index_ordered.result index 2a3050e5dea..e0c486ee2c5 100644 --- a/mysql-test/r/ndb_index_ordered.result +++ b/mysql-test/r/ndb_index_ordered.result @@ -37,6 +37,14 @@ a b c 1 2 3 2 3 5 3 4 6 +select tt1.* from t1 as tt1, t1 as tt2 use index(b) where tt1.b = tt2.b order by tt1.c; +a b c +6 7 2 +5 6 2 +1 2 3 +2 3 5 +3 4 6 +4 5 8 update t1 set c = 3 where b = 3; select * from t1 order by a; a b c diff --git a/mysql-test/t/ndb_index_ordered.test b/mysql-test/t/ndb_index_ordered.test index 2a94475df13..e1766f6e624 100644 --- a/mysql-test/t/ndb_index_ordered.test +++ b/mysql-test/t/ndb_index_ordered.test @@ -23,6 +23,9 @@ select * from t1 where b > 4 order by b; select * from t1 where b < 4 order by b; select * from t1 where b <= 4 order by b; +# Test of reset_bounds +select tt1.* from t1 as tt1, t1 as tt2 use index(b) where tt1.b = tt2.b order by tt1.c; + # # Here we should add some "explain select" to verify that the ordered index is # used for these queries. diff --git a/ndb/include/kernel/signaldata/AttrInfo.hpp b/ndb/include/kernel/signaldata/AttrInfo.hpp index 18bd9b22c40..c87470db8b0 100644 --- a/ndb/include/kernel/signaldata/AttrInfo.hpp +++ b/ndb/include/kernel/signaldata/AttrInfo.hpp @@ -35,7 +35,8 @@ class AttrInfo { */ friend class Dbtc; friend class Dblqh; - + friend class NdbScanOperation; + friend bool printATTRINFO(FILE *, const Uint32 *, Uint32, Uint16); public: diff --git a/ndb/include/kernel/signaldata/KeyInfo.hpp b/ndb/include/kernel/signaldata/KeyInfo.hpp index a4c698f89b2..686f3ae053d 100644 --- a/ndb/include/kernel/signaldata/KeyInfo.hpp +++ b/ndb/include/kernel/signaldata/KeyInfo.hpp @@ -26,6 +26,7 @@ class KeyInfo { friend class DbUtil; friend class NdbOperation; friend class NdbScanOperation; + friend class NdbIndexScanOperation; /** * Reciver(s) diff --git a/ndb/include/kernel/signaldata/ScanFrag.hpp b/ndb/include/kernel/signaldata/ScanFrag.hpp index d3a89b8dc25..41ea569c45d 100644 --- a/ndb/include/kernel/signaldata/ScanFrag.hpp +++ b/ndb/include/kernel/signaldata/ScanFrag.hpp @@ -34,14 +34,16 @@ class ScanFragReq { friend class Dblqh; public: STATIC_CONST( SignalLength = 12 ); - + + friend bool printSCAN_FRAGREQ(FILE *, const Uint32*, Uint32, Uint16); + public: Uint32 senderData; Uint32 resultRef; // Where to send the result Uint32 savePointId; Uint32 requestInfo; Uint32 tableId; - Uint32 fragmentNo; + Uint32 fragmentNoKeyLen; Uint32 schemaVersion; Uint32 transId1; Uint32 transId2; diff --git a/ndb/include/kernel/signaldata/ScanTab.hpp b/ndb/include/kernel/signaldata/ScanTab.hpp index 1acd7ae4736..fb5f18eae9e 100644 --- a/ndb/include/kernel/signaldata/ScanTab.hpp +++ b/ndb/include/kernel/signaldata/ScanTab.hpp @@ -55,7 +55,7 @@ private: * DATA VARIABLES */ UintR apiConnectPtr; // DATA 0 - UintR attrLen; // DATA 1 + UintR attrLenKeyLen; // DATA 1 UintR requestInfo; // DATA 2 UintR tableId; // DATA 3 UintR tableSchemaVersion; // DATA 4 @@ -74,6 +74,7 @@ private: static Uint8 getHoldLockFlag(const UintR & requestInfo); static Uint8 getReadCommittedFlag(const UintR & requestInfo); static Uint8 getRangeScanFlag(const UintR & requestInfo); + static Uint8 getKeyinfoFlag(const UintR & requestInfo); static Uint16 getScanBatch(const UintR & requestInfo); /** @@ -85,6 +86,7 @@ private: static void setHoldLockFlag(UintR & requestInfo, Uint32 flag); static void setReadCommittedFlag(UintR & requestInfo, Uint32 flag); static void setRangeScanFlag(UintR & requestInfo, Uint32 flag); + static void setKeyinfoFlag(UintR & requestInfo, Uint32 flag); static void setScanBatch(Uint32& requestInfo, Uint32 sz); }; @@ -95,12 +97,13 @@ private: l = Lock mode - 1 Bit 8 h = Hold lock mode - 1 Bit 10 c = Read Committed - 1 Bit 11 + k = Keyinfo - 1 Bit 12 x = Range Scan (TUX) - 1 Bit 15 b = Scan batch - 10 Bit 16-25 (max 1023) 1111111111222222222233 01234567890123456789012345678901 - ppppppppl hc xbbbbbbbbbb + ppppppppl hck xbbbbbbbbbb */ #define PARALLELL_SHIFT (0) @@ -112,6 +115,9 @@ private: #define HOLD_LOCK_SHIFT (10) #define HOLD_LOCK_MASK (1) +#define KEYINFO_SHIFT (12) +#define KEYINFO_MASK (1) + #define READ_COMMITTED_SHIFT (11) #define READ_COMMITTED_MASK (1) @@ -206,6 +212,20 @@ ScanTabReq::setScanBatch(Uint32 & requestInfo, Uint32 flag){ requestInfo |= (flag << SCAN_BATCH_SHIFT); } +inline +Uint8 +ScanTabReq::getKeyinfoFlag(const UintR & requestInfo){ + return (Uint8)((requestInfo >> KEYINFO_SHIFT) & KEYINFO_MASK); +} + +inline +void +ScanTabReq::setKeyinfoFlag(UintR & requestInfo, Uint32 flag){ + ASSERT_BOOL(flag, "ScanTabReq::setKeyinfoFlag"); + requestInfo |= (flag << KEYINFO_SHIFT); +} + + /** * * SENDER: Dbtc diff --git a/ndb/include/ndbapi/NdbConnection.hpp b/ndb/include/ndbapi/NdbConnection.hpp index ef4972f205b..d23a2e7cc0d 100644 --- a/ndb/include/ndbapi/NdbConnection.hpp +++ b/ndb/include/ndbapi/NdbConnection.hpp @@ -673,6 +673,9 @@ private: void printState(); #endif bool checkState_TransId(const Uint32 * transId) const; + + void remove_list(NdbOperation*& head, NdbOperation*); + void define_scan_op(NdbIndexScanOperation*); }; inline diff --git a/ndb/include/ndbapi/NdbIndexScanOperation.hpp b/ndb/include/ndbapi/NdbIndexScanOperation.hpp index 82aed04a9fc..a854cb58665 100644 --- a/ndb/include/ndbapi/NdbIndexScanOperation.hpp +++ b/ndb/include/ndbapi/NdbIndexScanOperation.hpp @@ -118,13 +118,20 @@ public: int setBound(Uint32 anAttrId, int type, const void* aValue, Uint32 len = 0); /** @} *********************************************************************/ + + /** + * Reset bounds and put operation in list that will be + * sent on next execute + */ + int reset_bounds(); + bool getSorted() const { return m_ordered; } private: NdbIndexScanOperation(Ndb* aNdb); virtual ~NdbIndexScanOperation(); int setBound(const NdbColumnImpl*, int type, const void* aValue, Uint32 len); - int saveBoundATTRINFO(); + int insertBOUNDS(Uint32 * data, Uint32 sz); virtual int equal_impl(const NdbColumnImpl*, const char*, Uint32); virtual NdbRecAttr* getValue_impl(const NdbColumnImpl*, char*); diff --git a/ndb/include/ndbapi/NdbOperation.hpp b/ndb/include/ndbapi/NdbOperation.hpp index a8bd8b9bfea..d35fea0e995 100644 --- a/ndb/include/ndbapi/NdbOperation.hpp +++ b/ndb/include/ndbapi/NdbOperation.hpp @@ -717,6 +717,8 @@ public: NotDefined ///< Internal for debugging }; + LockMode getLockMode() const { return theLockMode; } + protected: /****************************************************************************** * These are the methods used to create and delete the NdbOperation objects. @@ -749,7 +751,6 @@ protected: FinalGetValue, SubroutineExec, SubroutineEnd, - SetBound, WaitResponse, WaitCommitResponse, Finished, @@ -894,7 +895,7 @@ protected: // currently defined OperationType theOperationType; // Read Request, Update Req...... - Uint8 theLockMode; // Can be set to WRITE if read operation + LockMode theLockMode; // Can be set to WRITE if read operation OperationStatus theStatus; // The status of the operation. Uint32 theMagicNumber; // Magic number to verify that object // is correct @@ -921,9 +922,6 @@ protected: Uint16 m_keyInfoGSN; Uint16 m_attrInfoGSN; - // saveBoundATTRINFO() moves ATTRINFO here when setBound() is ready - NdbApiSignal* theBoundATTRINFO; - Uint32 theTotalBoundAI_Len; // Blobs in this operation NdbBlob* theBlobList; diff --git a/ndb/include/ndbapi/NdbResultSet.hpp b/ndb/include/ndbapi/NdbResultSet.hpp index 483e08179c0..478daf8aad2 100644 --- a/ndb/include/ndbapi/NdbResultSet.hpp +++ b/ndb/include/ndbapi/NdbResultSet.hpp @@ -138,7 +138,11 @@ public: */ int deleteTuple(); int deleteTuple(NdbConnection* takeOverTransaction); - + + /** + * Get underlying operation + */ + NdbOperation* getOperation(); private: NdbResultSet(NdbScanOperation*); @@ -149,4 +153,10 @@ private: NdbScanOperation* m_operation; }; +inline +NdbOperation* +NdbResultSet::getOperation(){ + return m_operation; +} + #endif diff --git a/ndb/include/ndbapi/NdbScanOperation.hpp b/ndb/include/ndbapi/NdbScanOperation.hpp index e8a4408469c..a1c66b380a7 100644 --- a/ndb/include/ndbapi/NdbScanOperation.hpp +++ b/ndb/include/ndbapi/NdbScanOperation.hpp @@ -87,12 +87,13 @@ protected: CursorType m_cursor_type; NdbScanOperation(Ndb* aNdb); - ~NdbScanOperation(); + virtual ~NdbScanOperation(); int nextResult(bool fetchAllowed = true); virtual void release(); void closeScan(); + int close_impl(class TransporterFacade*); // Overloaded methods from NdbCursorOperation int executeCursor(int ProcessorId); @@ -119,6 +120,7 @@ protected: int prepareSendScan(Uint32 TC_ConnectPtr, Uint64 TransactionId); int fix_receivers(Uint32 parallel); + void reset_receivers(Uint32 parallel, Uint32 ordered); Uint32* m_array; // containing all arrays below Uint32 m_allocated_receivers; NdbReceiver** m_receivers; // All receivers diff --git a/ndb/src/common/debugger/signaldata/Makefile.am b/ndb/src/common/debugger/signaldata/Makefile.am index 0a5806e1e00..c855c5f8a18 100644 --- a/ndb/src/common/debugger/signaldata/Makefile.am +++ b/ndb/src/common/debugger/signaldata/Makefile.am @@ -23,7 +23,8 @@ libsignaldataprint_la_SOURCES = \ FailRep.cpp DisconnectRep.cpp SignalDroppedRep.cpp \ SumaImpl.cpp NdbSttor.cpp CreateFragmentation.cpp \ UtilLock.cpp TuxMaint.cpp AccLock.cpp \ - LqhTrans.cpp ReadNodesConf.cpp CntrStart.cpp + LqhTrans.cpp ReadNodesConf.cpp CntrStart.cpp \ + ScanFrag.cpp include $(top_srcdir)/ndb/config/common.mk.am include $(top_srcdir)/ndb/config/type_ndbapi.mk.am diff --git a/ndb/src/common/debugger/signaldata/ScanFrag.cpp b/ndb/src/common/debugger/signaldata/ScanFrag.cpp new file mode 100644 index 00000000000..4d19a325637 --- /dev/null +++ b/ndb/src/common/debugger/signaldata/ScanFrag.cpp @@ -0,0 +1,42 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + + + +#include <BlockNumbers.h> +#include <signaldata/ScanTab.hpp> +#include <signaldata/ScanFrag.hpp> + +bool +printSCAN_FRAGREQ(FILE * output, const Uint32 * theData, + Uint32 len, Uint16 receiverBlockNo) { + const ScanFragReq * const sig = (ScanFragReq *)theData; + fprintf(output, " senderData: %x\n", sig->senderData); + fprintf(output, " resultRef: %x\n", sig->resultRef); + fprintf(output, " savePointId: %x\n", sig->savePointId); + fprintf(output, " requestInfo: %x\n", sig->requestInfo); + fprintf(output, " tableId: %x\n", sig->tableId); + fprintf(output, " fragmentNo: %x\n", sig->fragmentNoKeyLen & 0xFFFF); + fprintf(output, " keyLen: %x\n", sig->fragmentNoKeyLen >> 16); + fprintf(output, " schemaVersion: %x\n", sig->schemaVersion); + fprintf(output, " transId1: %x\n", sig->transId1); + fprintf(output, " transId2: %x\n", sig->transId2); + fprintf(output, " clientOpPtr: %x\n", sig->clientOpPtr); + fprintf(output, " batch_size_rows: %x\n", sig->batch_size_rows); + fprintf(output, " batch_size_bytes: %x\n", sig->batch_size_bytes); + return true; +} + diff --git a/ndb/src/common/debugger/signaldata/ScanTab.cpp b/ndb/src/common/debugger/signaldata/ScanTab.cpp index 3f2109d9477..72a4d9f94b9 100644 --- a/ndb/src/common/debugger/signaldata/ScanTab.cpp +++ b/ndb/src/common/debugger/signaldata/ScanTab.cpp @@ -30,15 +30,18 @@ printSCANTABREQ(FILE * output, const Uint32 * theData, Uint32 len, Uint16 receiv fprintf(output, " apiConnectPtr: H\'%.8x", sig->apiConnectPtr); fprintf(output, " requestInfo: H\'%.8x:\n", requestInfo); - fprintf(output, " Parallellism: %u, Batch: %u LockMode: %u, Holdlock: %u, RangeScan: %u\n", + fprintf(output, " Parallellism: %u, Batch: %u LockMode: %u, Keyinfo: %u Holdlock: %u, RangeScan: %u\n", sig->getParallelism(requestInfo), sig->getScanBatch(requestInfo), sig->getLockMode(requestInfo), sig->getHoldLockFlag(requestInfo), - sig->getRangeScanFlag(requestInfo)); + sig->getRangeScanFlag(requestInfo), + sig->getKeyinfoFlag(requestInfo)); - fprintf(output, " attrLen: %d, tableId: %d, tableSchemaVer: %d\n", - sig->attrLen, sig->tableId, sig->tableSchemaVersion); + Uint32 keyLen = (sig->attrLenKeyLen >> 16); + Uint32 attrLen = (sig->attrLenKeyLen & 0xFFFF); + fprintf(output, " attrLen: %d, keyLen: %d tableId: %d, tableSchemaVer: %d\n", + attrLen, keyLen, sig->tableId, sig->tableSchemaVersion); fprintf(output, " transId(1, 2): (H\'%.8x, H\'%.8x) storedProcId: H\'%.8x\n", sig->transId1, sig->transId2, sig->storedProcId); diff --git a/ndb/src/common/debugger/signaldata/SignalDataPrint.cpp b/ndb/src/common/debugger/signaldata/SignalDataPrint.cpp index 65351663789..640449a0579 100644 --- a/ndb/src/common/debugger/signaldata/SignalDataPrint.cpp +++ b/ndb/src/common/debugger/signaldata/SignalDataPrint.cpp @@ -53,6 +53,7 @@ #include <signaldata/UtilPrepare.hpp> #include <signaldata/UtilExecute.hpp> #include <signaldata/ScanTab.hpp> +#include <signaldata/ScanFrag.hpp> #include <signaldata/LqhFrag.hpp> #include <signaldata/LqhTransConf.hpp> #include <signaldata/DropTab.hpp> @@ -250,6 +251,7 @@ SignalDataPrintFunctions[] = { ,{ GSN_TUX_MAINT_REQ, printTUX_MAINT_REQ } ,{ GSN_ACC_LOCKREQ, printACC_LOCKREQ } ,{ GSN_LQH_TRANSCONF, printLQH_TRANSCONF } + ,{ GSN_SCAN_FRAGREQ, printSCAN_FRAGREQ } }; const unsigned short NO_OF_PRINT_FUNCTIONS = sizeof(SignalDataPrintFunctions)/sizeof(NameFunctionPair); diff --git a/ndb/src/kernel/blocks/backup/Backup.cpp b/ndb/src/kernel/blocks/backup/Backup.cpp index 08a8bf83e20..569b3f98faa 100644 --- a/ndb/src/kernel/blocks/backup/Backup.cpp +++ b/ndb/src/kernel/blocks/backup/Backup.cpp @@ -3360,7 +3360,7 @@ Backup::execBACKUP_FRAGMENT_REQ(Signal* signal) req->senderData = filePtr.i; req->resultRef = reference(); req->schemaVersion = table.schemaVersion; - req->fragmentNo = fragNo; + req->fragmentNoKeyLen = fragNo; req->requestInfo = 0; req->savePointId = 0; req->tableId = table.tableId; diff --git a/ndb/src/kernel/blocks/dblqh/Dblqh.hpp b/ndb/src/kernel/blocks/dblqh/Dblqh.hpp index a94af7b59c8..b4531af7cd6 100644 --- a/ndb/src/kernel/blocks/dblqh/Dblqh.hpp +++ b/ndb/src/kernel/blocks/dblqh/Dblqh.hpp @@ -2248,7 +2248,7 @@ private: void sendAttrinfoLoop(Signal* signal); void sendAttrinfoSignal(Signal* signal); void sendLqhAttrinfoSignal(Signal* signal); - void sendKeyinfoAcc(Signal* signal); + void sendKeyinfoAcc(Signal* signal, Uint32 pos); Uint32 initScanrec(const class ScanFragReq *); void initScanTc(Signal* signal, Uint32 transid1, diff --git a/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp b/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp index 8342870d69c..c5e2bd900e9 100644 --- a/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp +++ b/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp @@ -2803,8 +2803,10 @@ void Dblqh::execKEYINFO(Signal* signal) return; }//if TcConnectionrec * const regTcPtr = tcConnectptr.p; - if (regTcPtr->transactionState != - TcConnectionrec::WAIT_TUPKEYINFO) { + TcConnectionrec::TransactionState state = regTcPtr->transactionState; + if (state != TcConnectionrec::WAIT_TUPKEYINFO && + state != TcConnectionrec::WAIT_SCAN_AI) + { jam(); /*****************************************************************************/ /* TRANSACTION WAS ABORTED, THIS IS MOST LIKELY A SIGNAL BELONGING TO THE */ @@ -2822,15 +2824,19 @@ void Dblqh::execKEYINFO(Signal* signal) return; }//if jam(); + abort(); terrorCode = errorCode; abortErrorLab(signal); return; }//if - FragrecordPtr regFragptr; - regFragptr.i = regTcPtr->fragmentptr; - ptrCheckGuard(regFragptr, cfragrecFileSize, fragrecord); - fragptr = regFragptr; - endgettupkeyLab(signal); + if(state == TcConnectionrec::WAIT_TUPKEYINFO) + { + FragrecordPtr regFragptr; + regFragptr.i = regTcPtr->fragmentptr; + ptrCheckGuard(regFragptr, cfragrecFileSize, fragrecord); + fragptr = regFragptr; + endgettupkeyLab(signal); + } return; }//Dblqh::execKEYINFO() @@ -2838,9 +2844,9 @@ void Dblqh::execKEYINFO(Signal* signal) /* FILL IN KEY DATA INTO DATA BUFFERS. */ /* ------------------------------------------------------------------------- */ Uint32 Dblqh::handleLongTupKey(Signal* signal, - Uint32 keyLength, - Uint32 primKeyLength, - Uint32* dataPtr) + Uint32 keyLength, + Uint32 primKeyLength, + Uint32* dataPtr) { TcConnectionrec * const regTcPtr = tcConnectptr.p; Uint32 dataPos = 0; @@ -3686,7 +3692,7 @@ void Dblqh::prepareContinueAfterBlockedLab(Signal* signal) signal->theData[9] = sig3; signal->theData[10] = sig4; if (regTcPtr->primKeyLen > 4) { - sendKeyinfoAcc(signal); + sendKeyinfoAcc(signal, 11); }//if EXECUTE_DIRECT(refToBlock(regTcPtr->tcAccBlockref), GSN_ACCKEYREQ, signal, 7 + regTcPtr->primKeyLen); @@ -3708,9 +3714,8 @@ void Dblqh::prepareContinueAfterBlockedLab(Signal* signal) /* ======= SEND KEYINFO TO ACC ======= */ /* */ /* ========================================================================== */ -void Dblqh::sendKeyinfoAcc(Signal* signal) +void Dblqh::sendKeyinfoAcc(Signal* signal, Uint32 Ti) { - UintR Ti = 11; DatabufPtr regDatabufptr; regDatabufptr.i = tcConnectptr.p->firstTupkeybuf; @@ -7409,7 +7414,8 @@ void Dblqh::execSCAN_FRAGREQ(Signal* signal) jamEntry(); const Uint32 reqinfo = scanFragReq->requestInfo; - const Uint32 fragId = scanFragReq->fragmentNo; + const Uint32 fragId = (scanFragReq->fragmentNoKeyLen & 0xFFFF); + const Uint32 keyLen = (scanFragReq->fragmentNoKeyLen >> 16); tabptr.i = scanFragReq->tableId; const Uint32 max_rows = scanFragReq->batch_size_rows; const Uint32 scanLockMode = ScanFragReq::getLockMode(reqinfo); @@ -7473,6 +7479,8 @@ void Dblqh::execSCAN_FRAGREQ(Signal* signal) transid2, fragId, ZNIL); + tcConnectptr.p->save1 = 4; + tcConnectptr.p->primKeyLen = keyLen + 4; // hard coded in execKEYINFO errorCode = initScanrec(scanFragReq); if (errorCode != ZOK) { jam(); @@ -7672,34 +7680,18 @@ void Dblqh::accScanConfScanLab(Signal* signal) return; }//if scanptr.p->scanAccPtr = accScanConf->accPtr; - AttrbufPtr regAttrinbufptr; - regAttrinbufptr.i = tcConnectptr.p->firstAttrinbuf; - Uint32 boundAiLength = 0; + Uint32 boundAiLength = tcConnectptr.p->primKeyLen - 4; if (scanptr.p->rangeScan) { jam(); // bound info length is in first of the 5 header words - ptrCheckGuard(regAttrinbufptr, cattrinbufFileSize, attrbuf); - boundAiLength = regAttrinbufptr.p->attrbuf[0]; TuxBoundInfo* const req = (TuxBoundInfo*)signal->getDataPtrSend(); req->errorCode = RNIL; req->tuxScanPtrI = scanptr.p->scanAccPtr; req->boundAiLength = boundAiLength; - Uint32* out = (Uint32*)req + TuxBoundInfo::SignalLength; - Uint32 sz = 0; - while (sz < boundAiLength) { - jam(); - ptrCheckGuard(regAttrinbufptr, cattrinbufFileSize, attrbuf); - Uint32 dataLen = regAttrinbufptr.p->attrbuf[ZINBUF_DATA_LEN]; - MEMCOPY_NO_WORDS(&out[sz], - ®Attrinbufptr.p->attrbuf[0], - dataLen); - sz += dataLen; - regAttrinbufptr.i = regAttrinbufptr.p->attrbuf[ZINBUF_NEXT]; - ptrCheckGuard(regAttrinbufptr, cattrinbufFileSize, attrbuf); - } - ndbrequire(sz == boundAiLength); + if(boundAiLength > 0) + sendKeyinfoAcc(signal, TuxBoundInfo::SignalLength); EXECUTE_DIRECT(DBTUX, GSN_TUX_BOUND_INFO, - signal, TuxBoundInfo::SignalLength + boundAiLength); + signal, TuxBoundInfo::SignalLength + boundAiLength); jamEntry(); if (req->errorCode != 0) { jam(); @@ -7716,12 +7708,14 @@ void Dblqh::accScanConfScanLab(Signal* signal) signal->theData[1] = tcConnectptr.p->tableref; signal->theData[2] = scanptr.p->scanSchemaVersion; signal->theData[3] = ZSTORED_PROC_SCAN; - ndbrequire(boundAiLength <= scanptr.p->scanAiLength); - signal->theData[4] = scanptr.p->scanAiLength - boundAiLength; + + signal->theData[4] = scanptr.p->scanAiLength; sendSignal(tcConnectptr.p->tcTupBlockref, GSN_STORED_PROCREQ, signal, 5, JBB); signal->theData[0] = tcConnectptr.p->tupConnectrec; + AttrbufPtr regAttrinbufptr; + regAttrinbufptr.i = tcConnectptr.p->firstAttrinbuf; while (regAttrinbufptr.i != RNIL) { ptrCheckGuard(regAttrinbufptr, cattrinbufFileSize, attrbuf); jam(); diff --git a/ndb/src/kernel/blocks/dbtc/Dbtc.hpp b/ndb/src/kernel/blocks/dbtc/Dbtc.hpp index a5f07f8e9e1..09a7708783e 100644 --- a/ndb/src/kernel/blocks/dbtc/Dbtc.hpp +++ b/ndb/src/kernel/blocks/dbtc/Dbtc.hpp @@ -192,7 +192,8 @@ public: OS_WAIT_ATTR = 14, OS_WAIT_COMMIT_CONF = 15, OS_WAIT_ABORT_CONF = 16, - OS_WAIT_COMPLETE_CONF = 17 + OS_WAIT_COMPLETE_CONF = 17, + OS_WAIT_SCAN = 18 }; enum AbortState { @@ -1169,6 +1170,8 @@ public: // Length of expected attribute information Uint32 scanAiLength; + Uint32 scanKeyLen; + // Reference to ApiConnectRecord Uint32 scanApiRec; @@ -1194,18 +1197,7 @@ public: Uint16 first_batch_size; Uint32 batch_byte_size; - // Shall the locks be held until the application have read the - // records - Uint8 scanLockHold; - - // Shall the locks be read or write locks - Uint8 scanLockMode; - - // Skip locks by other transactions and read latest committed - Uint8 readCommitted; - - // Scan is on ordered index - Uint8 rangeScan; + Uint32 scanRequestInfo; // ScanFrag format // Close is ordered bool m_close_scan_req; @@ -1571,7 +1563,7 @@ private: void diFcountReqLab(Signal* signal, ScanRecordPtr); void signalErrorRefuseLab(Signal* signal); void abort080Lab(Signal* signal); - void packKeyData000Lab(Signal* signal, BlockReference TBRef); + void packKeyData000Lab(Signal* signal, BlockReference TBRef, Uint32 len); void abortScanLab(Signal* signal, ScanRecordPtr, Uint32 errCode); void sendAbortedAfterTimeout(Signal* signal, int Tcheck); void abort010Lab(Signal* signal); diff --git a/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp b/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp index d270c6acc61..890b6599d0a 100644 --- a/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp +++ b/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp @@ -1759,6 +1759,7 @@ void Dbtc::execKEYINFO(Signal* signal) switch (apiConnectptr.p->apiConnectstate) { case CS_RECEIVING: case CS_REC_COMMITTING: + case CS_START_SCAN: jam(); /*empty*/; break; @@ -1812,12 +1813,54 @@ void Dbtc::execKEYINFO(Signal* signal) jam(); tckeyreq020Lab(signal); return; + case OS_WAIT_SCAN: + break; default: jam(); terrorCode = ZSTATE_ERROR; abortErrorLab(signal); return; }//switch + + UintR TdataPos = 0; + UintR TkeyLen = regCachePtr->keylen; + UintR Tlen = regCachePtr->save1; + + do { + if (cfirstfreeDatabuf == RNIL) { + jam(); + abort(); + seizeDatabuferrorLab(signal); + return; + }//if + linkKeybuf(signal); + arrGuard(TdataPos, 19); + databufptr.p->data[0] = signal->theData[TdataPos + 3]; + databufptr.p->data[1] = signal->theData[TdataPos + 4]; + databufptr.p->data[2] = signal->theData[TdataPos + 5]; + databufptr.p->data[3] = signal->theData[TdataPos + 6]; + Tlen = Tlen + 4; + TdataPos = TdataPos + 4; + if (Tlen < TkeyLen) { + jam(); + if (TdataPos >= tmaxData) { + jam(); + /*----------------------------------------------------*/ + /** EXIT AND WAIT FOR SIGNAL KEYINFO OR KEYINFO9 **/ + /** WHEN EITHER OF THE SIGNALS IS RECEIVED A JUMP **/ + /** TO LABEL "KEYINFO_LABEL" IS DONE. THEN THE **/ + /** PROGRAM RETURNS TO LABEL TCKEYREQ020 **/ + /*----------------------------------------------------*/ + setApiConTimer(apiConnectptr.i, ctcTimer, __LINE__); + regCachePtr->save1 = Tlen; + return; + }//if + } else { + jam(); + return; + }//if + } while (1); + return; }//Dbtc::execKEYINFO() /*---------------------------------------------------------------------------*/ @@ -1826,45 +1869,45 @@ void Dbtc::execKEYINFO(Signal* signal) /* WE WILL ALWAYS PACK 4 WORDS AT A TIME. */ /*---------------------------------------------------------------------------*/ void Dbtc::packKeyData000Lab(Signal* signal, - BlockReference TBRef) + BlockReference TBRef, + Uint32 totalLen) { CacheRecord * const regCachePtr = cachePtr.p; UintR Tmp; - Uint16 tdataPos; jam(); - tdataPos = 0; - Tmp = regCachePtr->keylen; + Uint32 len = 0; databufptr.i = regCachePtr->firstKeybuf; + signal->theData[0] = tcConnectptr.i; + signal->theData[1] = apiConnectptr.p->transid[0]; + signal->theData[2] = apiConnectptr.p->transid[1]; + Uint32 * dst = signal->theData+3; + ptrCheckGuard(databufptr, cdatabufFilesize, databufRecord); + do { jam(); - if (tdataPos == 20) { - jam(); - /*---------------------------------------------------------------------*/ - /* 4 MORE WORDS WILL NOT FIT IN THE 24 DATA WORDS IN A SIGNAL */ - /*---------------------------------------------------------------------*/ - sendKeyinfo(signal, TBRef, 20); - tdataPos = 0; - }//if - Tmp = Tmp - 4; - ptrCheckGuard(databufptr, cdatabufFilesize, databufRecord); - cdata[tdataPos ] = databufptr.p->data[0]; - cdata[tdataPos + 1] = databufptr.p->data[1]; - cdata[tdataPos + 2] = databufptr.p->data[2]; - cdata[tdataPos + 3] = databufptr.p->data[3]; - tdataPos = tdataPos + 4; - if (Tmp <= 4) { + databufptr.i = databufptr.p->nextDatabuf; + dst[len + 0] = databufptr.p->data[0]; + dst[len + 1] = databufptr.p->data[1]; + dst[len + 2] = databufptr.p->data[2]; + dst[len + 3] = databufptr.p->data[3]; + len += 4; + if (totalLen <= 4) { jam(); /*---------------------------------------------------------------------*/ /* LAST PACK OF KEY DATA HAVE BEEN SENT */ /*---------------------------------------------------------------------*/ /* THERE WERE UNSENT INFORMATION, SEND IT. */ /*---------------------------------------------------------------------*/ - sendKeyinfo(signal, TBRef, tdataPos); - releaseKeys(); + sendSignal(TBRef, GSN_KEYINFO, signal, 3 + len, JBB); return; - }//if - databufptr.i = databufptr.p->nextDatabuf; + } else if(len == KeyInfo::DataLength){ + jam(); + len = 0; + sendSignal(TBRef, GSN_KEYINFO, signal, 3 + KeyInfo::DataLength, JBB); + } + totalLen -= 4; + ptrCheckGuard(databufptr, cdatabufFilesize, databufRecord); } while (1); }//Dbtc::packKeyData000Lab() @@ -3014,7 +3057,8 @@ void Dbtc::packLqhkeyreq(Signal* signal, UintR TfirstAttrbuf = regCachePtr->firstAttrbuf; sendlqhkeyreq(signal, TBRef); if (Tkeylen > 4) { - packKeyData000Lab(signal, TBRef); + packKeyData000Lab(signal, TBRef, Tkeylen - 4); + releaseKeys(); }//if packLqhkeyreq040Lab(signal, TfirstAttrbuf, @@ -6045,6 +6089,7 @@ void Dbtc::timeOutFoundLab(Signal* signal, Uint32 TapiConPtr) << " - place: " << c_apiConTimer_line[apiConnectptr.i]); switch (apiConnectptr.p->apiConnectstate) { case CS_STARTED: + ndbrequire(c_apiConTimer_line[apiConnectptr.i] != 3615); if(apiConnectptr.p->lqhkeyreqrec == apiConnectptr.p->lqhkeyconfrec){ jam(); /* @@ -8406,7 +8451,8 @@ void Dbtc::execSCAN_TABREQ(Signal* signal) { const ScanTabReq * const scanTabReq = (ScanTabReq *)&signal->theData[0]; const Uint32 reqinfo = scanTabReq->requestInfo; - const Uint32 aiLength = scanTabReq->attrLen; + const Uint32 aiLength = (scanTabReq->attrLenKeyLen & 0xFFFF); + const Uint32 keyLen = scanTabReq->attrLenKeyLen >> 16; const Uint32 schemaVersion = scanTabReq->tableSchemaVersion; const Uint32 transid1 = scanTabReq->transId1; const Uint32 transid2 = scanTabReq->transId2; @@ -8480,8 +8526,12 @@ void Dbtc::execSCAN_TABREQ(Signal* signal) seizeTcConnect(signal); tcConnectptr.p->apiConnect = apiConnectptr.i; + tcConnectptr.p->tcConnectstate = OS_WAIT_SCAN; + apiConnectptr.p->lastTcConnect = tcConnectptr.i; seizeCacheRecord(signal); + cachePtr.p->keylen = keyLen; + cachePtr.p->save1 = 0; scanptr = seizeScanrec(signal); ndbrequire(transP->apiScanRec == RNIL); @@ -8558,21 +8608,27 @@ void Dbtc::initScanrec(ScanRecordPtr scanptr, UintR scanParallel, UintR noOprecPerFrag) { - const UintR reqinfo = scanTabReq->requestInfo; - scanptr.p->scanTcrec = tcConnectptr.i; scanptr.p->scanApiRec = apiConnectptr.i; - scanptr.p->scanAiLength = scanTabReq->attrLen; + scanptr.p->scanAiLength = scanTabReq->attrLenKeyLen & 0xFFFF; + scanptr.p->scanKeyLen = scanTabReq->attrLenKeyLen >> 16; scanptr.p->scanTableref = tabptr.i; scanptr.p->scanSchemaVersion = scanTabReq->tableSchemaVersion; scanptr.p->scanParallel = scanParallel; scanptr.p->noOprecPerFrag = noOprecPerFrag; scanptr.p->first_batch_size= scanTabReq->first_batch_size; scanptr.p->batch_byte_size= scanTabReq->batch_byte_size; - scanptr.p->scanLockMode = ScanTabReq::getLockMode(reqinfo); - scanptr.p->scanLockHold = ScanTabReq::getHoldLockFlag(reqinfo); - scanptr.p->readCommitted = ScanTabReq::getReadCommittedFlag(reqinfo); - scanptr.p->rangeScan = ScanTabReq::getRangeScanFlag(reqinfo); + + Uint32 tmp = 0; + const UintR ri = scanTabReq->requestInfo; + ScanFragReq::setLockMode(tmp, ScanTabReq::getLockMode(ri)); + ScanFragReq::setHoldLockFlag(tmp, ScanTabReq::getHoldLockFlag(ri)); + ScanFragReq::setKeyinfoFlag(tmp, ScanTabReq::getKeyinfoFlag(ri)); + ScanFragReq::setReadCommittedFlag(tmp,ScanTabReq::getReadCommittedFlag(ri)); + ScanFragReq::setRangeScanFlag(tmp, ScanTabReq::getRangeScanFlag(ri)); + ScanFragReq::setAttrLen(tmp, scanTabReq->attrLenKeyLen & 0xFFFF); + + scanptr.p->scanRequestInfo = tmp; scanptr.p->scanStoredProcId = scanTabReq->storedProcId; scanptr.p->scanState = ScanRecord::RUNNING; scanptr.p->m_queued_count = 0; @@ -8589,7 +8645,7 @@ void Dbtc::initScanrec(ScanRecordPtr scanptr, ptr.p->m_apiPtr = cdata[i]; }//for - (* (scanptr.p->rangeScan ? + (* (ScanTabReq::getRangeScanFlag(ri) ? &c_counters.c_range_scan_count : &c_counters.c_scan_count))++; }//Dbtc::initScanrec() @@ -8807,6 +8863,7 @@ void Dbtc::releaseScanResources(ScanRecordPtr scanPtr) if (apiConnectptr.p->cachePtr != RNIL) { cachePtr.i = apiConnectptr.p->cachePtr; ptrCheckGuard(cachePtr, ccacheFilesize, cacheRecord); + releaseKeys(); releaseAttrinfo(); }//if tcConnectptr.i = scanPtr.p->scanTcrec; @@ -9448,17 +9505,8 @@ void Dbtc::sendScanFragReq(Signal* signal, ScanRecord* scanP, ScanFragRec* scanFragP) { - Uint32 requestInfo = 0; ScanFragReq * const req = (ScanFragReq *)&signal->theData[0]; - ScanFragReq::setLockMode(requestInfo, scanP->scanLockMode); - ScanFragReq::setHoldLockFlag(requestInfo, scanP->scanLockHold); - if(scanP->scanLockMode == 1){ // Not read -> keyinfo - jam(); - ScanFragReq::setKeyinfoFlag(requestInfo, 1); - } - ScanFragReq::setReadCommittedFlag(requestInfo, scanP->readCommitted); - ScanFragReq::setRangeScanFlag(requestInfo, scanP->rangeScan); - ScanFragReq::setAttrLen(requestInfo, scanP->scanAiLength); + Uint32 requestInfo = scanP->scanRequestInfo; ScanFragReq::setScanPrio(requestInfo, 1); apiConnectptr.i = scanP->scanApiRec; req->tableId = scanP->scanTableref; @@ -9466,7 +9514,7 @@ void Dbtc::sendScanFragReq(Signal* signal, ptrCheckGuard(apiConnectptr, capiConnectFilesize, apiConnectRecord); req->senderData = scanFragptr.i; req->requestInfo = requestInfo; - req->fragmentNo = scanFragP->scanFragId; + req->fragmentNoKeyLen = scanFragP->scanFragId | (scanP->scanKeyLen << 16); req->resultRef = apiConnectptr.p->ndbapiBlockref; req->savePointId = apiConnectptr.p->currSavePointId; req->transId1 = apiConnectptr.p->transid[0]; @@ -9476,6 +9524,11 @@ void Dbtc::sendScanFragReq(Signal* signal, req->batch_size_bytes= scanP->batch_byte_size; sendSignal(scanFragP->lqhBlockref, GSN_SCAN_FRAGREQ, signal, ScanFragReq::SignalLength, JBB); + if(scanP->scanKeyLen > 0) + { + tcConnectptr.i = scanFragptr.i; + packKeyData000Lab(signal, scanFragP->lqhBlockref, scanP->scanKeyLen); + } updateBuddyTimer(apiConnectptr); scanFragP->startFragTimer(ctcTimer); }//Dbtc::sendScanFragReq() @@ -10422,9 +10475,6 @@ Dbtc::execDUMP_STATE_ORD(Signal* signal) sp.p->scanSchemaVersion, sp.p->scanTableref, sp.p->scanStoredProcId); - infoEvent(" lhold=%d, lmode=%d", - sp.p->scanLockHold, - sp.p->scanLockMode); infoEvent(" apiRec=%d, next=%d", sp.p->scanApiRec, sp.p->nextScan); diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp index 5b161d3c4ce..7414691ab78 100644 --- a/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp +++ b/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp @@ -129,14 +129,14 @@ Dbtux::execTUX_BOUND_INFO(Signal* signal) // largest attrId seen plus one Uint32 maxAttrId = 0; // skip 5 words - if (req->boundAiLength < 5) { + unsigned offset = 0; + if (req->boundAiLength < offset) { jam(); scan.m_state = ScanOp::Invalid; sig->errorCode = TuxBoundInfo::InvalidAttrInfo; return; } const Uint32* const data = (Uint32*)sig + TuxBoundInfo::SignalLength; - unsigned offset = 5; // walk through entries while (offset + 2 <= req->boundAiLength) { jam(); diff --git a/ndb/src/kernel/blocks/suma/Suma.cpp b/ndb/src/kernel/blocks/suma/Suma.cpp index 052809cb084..8e651343ab7 100644 --- a/ndb/src/kernel/blocks/suma/Suma.cpp +++ b/ndb/src/kernel/blocks/suma/Suma.cpp @@ -1891,7 +1891,7 @@ SumaParticipant::SyncRecord::nextScan(Signal* signal){ ScanFragReq::setHoldLockFlag(req->requestInfo, 0); ScanFragReq::setKeyinfoFlag(req->requestInfo, 0); ScanFragReq::setAttrLen(req->requestInfo, attrLen); - req->fragmentNo = fd.m_fragDesc.m_fragmentNo; + req->fragmentNoKeyLen = fd.m_fragDesc.m_fragmentNo; req->schemaVersion = tabPtr.p->m_schemaVersion; req->transId1 = 0; req->transId2 = (SUMA << 20) + (suma.getOwnNodeId() << 8); diff --git a/ndb/src/ndbapi/NdbConnection.cpp b/ndb/src/ndbapi/NdbConnection.cpp index 8ab0d13c67f..5e5d982444c 100644 --- a/ndb/src/ndbapi/NdbConnection.cpp +++ b/ndb/src/ndbapi/NdbConnection.cpp @@ -1115,15 +1115,8 @@ NdbConnection::getNdbScanOperation(const NdbTableImpl * tab) if (tOp == NULL) goto getNdbOp_error1; - // Link scan operation into list of cursor operations - if (m_theLastScanOperation == NULL) - m_theFirstScanOperation = m_theLastScanOperation = tOp; - else { - m_theLastScanOperation->next(tOp); - m_theLastScanOperation = tOp; - } - tOp->next(NULL); if (tOp->init(tab, this) != -1) { + define_scan_op(tOp); return tOp; } else { theNdb->releaseScanOperation(tOp); @@ -1135,6 +1128,31 @@ getNdbOp_error1: return NULL; }//NdbConnection::getNdbScanOperation() +void +NdbConnection::remove_list(NdbOperation*& list, NdbOperation* op){ + NdbOperation* tmp= list; + if(tmp == op) + list = op->next(); + else { + while(tmp && tmp->next() != op) tmp = tmp->next(); + if(tmp) + tmp->next(op->next()); + } + op->next(NULL); +} + +void +NdbConnection::define_scan_op(NdbIndexScanOperation * tOp){ + // Link scan operation into list of cursor operations + if (m_theLastScanOperation == NULL) + m_theFirstScanOperation = m_theLastScanOperation = tOp; + else { + m_theLastScanOperation->next(tOp); + m_theLastScanOperation = tOp; + } + tOp->next(NULL); +} + NdbScanOperation* NdbConnection::getNdbScanOperation(const NdbDictionary::Table * table) { diff --git a/ndb/src/ndbapi/NdbOperation.cpp b/ndb/src/ndbapi/NdbOperation.cpp index 53a94d98a5a..b0b95d0ff43 100644 --- a/ndb/src/ndbapi/NdbOperation.cpp +++ b/ndb/src/ndbapi/NdbOperation.cpp @@ -78,7 +78,6 @@ NdbOperation::NdbOperation(Ndb* aNdb) : m_tcReqGSN(GSN_TCKEYREQ), m_keyInfoGSN(GSN_KEYINFO), m_attrInfoGSN(GSN_ATTRINFO), - theBoundATTRINFO(NULL), theBlobList(NULL) { theReceiver.init(NdbReceiver::NDB_OPERATION, this); @@ -167,7 +166,6 @@ NdbOperation::init(const NdbTableImpl* tab, NdbConnection* myConnection){ theScanInfo = 0; theTotalNrOfKeyWordInSignal = 8; theMagicNumber = 0xABCDEF01; - theBoundATTRINFO = NULL; theBlobList = NULL; tSignal = theNdb->getSignal(); @@ -263,14 +261,6 @@ NdbOperation::release() tSubroutine = tSubroutine->theNext; theNdb->releaseNdbSubroutine(tSaveSubroutine); } - tSignal = theBoundATTRINFO; - while (tSignal != NULL) - { - tSaveSignal = tSignal; - tSignal = tSignal->next(); - theNdb->releaseSignal(tSaveSignal); - } - theBoundATTRINFO = NULL; } tBlob = theBlobList; while (tBlob != NULL) diff --git a/ndb/src/ndbapi/NdbOperationDefine.cpp b/ndb/src/ndbapi/NdbOperationDefine.cpp index 1cbfedd21b1..4809ba0fe01 100644 --- a/ndb/src/ndbapi/NdbOperationDefine.cpp +++ b/ndb/src/ndbapi/NdbOperationDefine.cpp @@ -55,6 +55,7 @@ NdbOperation::insertTuple() theOperationType = InsertRequest; tNdbCon->theSimpleState = 0; theErrorLine = tErrorLine++; + theLockMode = LM_Exclusive; return 0; } else { setErrorCode(4200); @@ -74,6 +75,7 @@ NdbOperation::updateTuple() tNdbCon->theSimpleState = 0; theOperationType = UpdateRequest; theErrorLine = tErrorLine++; + theLockMode = LM_Exclusive; return 0; } else { setErrorCode(4200); @@ -93,6 +95,7 @@ NdbOperation::writeTuple() tNdbCon->theSimpleState = 0; theOperationType = WriteRequest; theErrorLine = tErrorLine++; + theLockMode = LM_Exclusive; return 0; } else { setErrorCode(4200); @@ -115,6 +118,8 @@ NdbOperation::readTuple(NdbOperation::LockMode lm) case LM_CommittedRead: return readTuple(); break; + default: + return -1; }; } /****************************************************************************** @@ -130,6 +135,7 @@ NdbOperation::readTuple() tNdbCon->theSimpleState = 0; theOperationType = ReadRequest; theErrorLine = tErrorLine++; + theLockMode = LM_Read; return 0; } else { setErrorCode(4200); @@ -150,6 +156,7 @@ NdbOperation::deleteTuple() tNdbCon->theSimpleState = 0; theOperationType = DeleteRequest; theErrorLine = tErrorLine++; + theLockMode = LM_Exclusive; return 0; } else { setErrorCode(4200); @@ -170,6 +177,7 @@ NdbOperation::readTupleExclusive() tNdbCon->theSimpleState = 0; theOperationType = ReadExclusive; theErrorLine = tErrorLine++; + theLockMode = LM_Exclusive; return 0; } else { setErrorCode(4200); @@ -189,6 +197,7 @@ NdbOperation::simpleRead() theOperationType = ReadRequest; theSimpleIndicator = 1; theErrorLine = tErrorLine++; + theLockMode = LM_CommittedRead; return 0; } else { setErrorCode(4200); @@ -218,6 +227,7 @@ NdbOperation::committedRead() theSimpleIndicator = 1; theDirtyIndicator = 1; theErrorLine = tErrorLine++; + theLockMode = LM_CommittedRead; return 0; } else { setErrorCode(4200); @@ -240,6 +250,7 @@ NdbOperation::dirtyUpdate() theSimpleIndicator = 1; theDirtyIndicator = 1; theErrorLine = tErrorLine++; + theLockMode = LM_CommittedRead; return 0; } else { setErrorCode(4200); @@ -262,6 +273,7 @@ NdbOperation::dirtyWrite() theSimpleIndicator = 1; theDirtyIndicator = 1; theErrorLine = tErrorLine++; + theLockMode = LM_CommittedRead; return 0; } else { setErrorCode(4200); @@ -282,7 +294,7 @@ NdbOperation::interpretedUpdateTuple() tNdbCon->theSimpleState = 0; theOperationType = UpdateRequest; theAI_LenInCurrAI = 25; - + theLockMode = LM_Exclusive; theErrorLine = tErrorLine++; initInterpreter(); return 0; @@ -307,7 +319,7 @@ NdbOperation::interpretedDeleteTuple() theErrorLine = tErrorLine++; theAI_LenInCurrAI = 25; - + theLockMode = LM_Exclusive; initInterpreter(); return 0; } else { @@ -334,10 +346,6 @@ NdbOperation::getValue_impl(const NdbColumnImpl* tAttrInfo, char* aValue) if ((tAttrInfo != NULL) && (!tAttrInfo->m_indexOnly) && (theStatus != Init)){ - if (theStatus == SetBound) { - ((NdbIndexScanOperation*)this)->saveBoundATTRINFO(); - theStatus = GetValue; - } if (theStatus != GetValue) { if (theInterpretIndicator == 1) { if (theStatus == FinalGetValue) { diff --git a/ndb/src/ndbapi/NdbOperationInt.cpp b/ndb/src/ndbapi/NdbOperationInt.cpp index f5d334fd79a..ee7b8132cd1 100644 --- a/ndb/src/ndbapi/NdbOperationInt.cpp +++ b/ndb/src/ndbapi/NdbOperationInt.cpp @@ -216,10 +216,6 @@ int NdbOperation::initial_interpreterCheck() { if ((theInterpretIndicator == 1)) { - if (theStatus == SetBound) { - ((NdbIndexScanOperation*)this)->saveBoundATTRINFO(); - theStatus = GetValue; - } if (theStatus == ExecInterpretedValue) { return 0; // Simply continue with interpretation } else if (theStatus == GetValue) { diff --git a/ndb/src/ndbapi/NdbScanOperation.cpp b/ndb/src/ndbapi/NdbScanOperation.cpp index 3ff2a32d418..8acc0695e4e 100644 --- a/ndb/src/ndbapi/NdbScanOperation.cpp +++ b/ndb/src/ndbapi/NdbScanOperation.cpp @@ -32,6 +32,7 @@ #include <signaldata/ScanTab.hpp> #include <signaldata/KeyInfo.hpp> +#include <signaldata/AttrInfo.hpp> #include <signaldata/TcKeyReq.hpp> NdbScanOperation::NdbScanOperation(Ndb* aNdb) : @@ -116,10 +117,8 @@ NdbScanOperation::init(const NdbTableImpl* tab, NdbConnection* myConnection) theStatus = GetValue; theOperationType = OpenScanRequest; - - theTotalBoundAI_Len = 0; - theBoundATTRINFO = NULL; - + theNdbCon->theMagicNumber = 0xFE11DF; + return 0; } @@ -145,6 +144,7 @@ NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm, } theNdbCon->theScanningOp = this; + theLockMode = lm; bool lockExcl, lockHoldMode, readCommitted; switch(lm){ @@ -168,7 +168,7 @@ NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm, return 0; } - m_keyInfo = lockExcl; + m_keyInfo = lockExcl ? 1 : 0; bool range = false; if (m_accessTable->m_indexType == NdbDictionary::Index::OrderedIndex || @@ -181,7 +181,7 @@ NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm, } assert (m_currentTable != m_accessTable); // Modify operation state - theStatus = SetBound; + theStatus = GetValue; theOperationType = OpenRangeScanRequest; range = true; } @@ -219,8 +219,17 @@ NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm, req->transId1 = (Uint32) transId; req->transId2 = (Uint32) (transId >> 32); - getFirstATTRINFOScan(); + NdbApiSignal* tSignal = + theFirstKEYINFO; + theFirstKEYINFO = (tSignal ? tSignal : tSignal = theNdb->getSignal()); + theLastKEYINFO = tSignal; + + tSignal->setSignal(GSN_KEYINFO); + theKEYINFOptr = ((KeyInfo*)tSignal->getDataPtrSend())->keyData; + theTotalNrOfKeyWordInSignal= 0; + + getFirstATTRINFOScan(); return getResultSet(); } @@ -256,18 +265,7 @@ NdbScanOperation::fix_receivers(Uint32 parallel){ m_allocated_receivers = parallel; } - for(Uint32 i = 0; i<parallel; i++){ - m_receivers[i]->m_list_index = i; - m_prepared_receivers[i] = m_receivers[i]->getId(); - m_sent_receivers[i] = m_receivers[i]; - m_conf_receivers[i] = 0; - m_api_receivers[i] = 0; - } - - m_api_receivers_count = 0; - m_current_api_receiver = 0; - m_sent_receivers_count = parallel; - m_conf_receivers_count = 0; + reset_receivers(parallel, 0); return 0; } @@ -355,6 +353,7 @@ NdbScanOperation::getFirstATTRINFOScan() * After setBound() are done, move the accumulated ATTRINFO signals to * a separate list. Then continue with normal scan. */ +#if 0 int NdbIndexScanOperation::saveBoundATTRINFO() { @@ -401,6 +400,7 @@ NdbIndexScanOperation::saveBoundATTRINFO() } return res; } +#endif #define WAITFOR_SCAN_TIMEOUT 120000 @@ -409,14 +409,22 @@ NdbScanOperation::executeCursor(int nodeId){ NdbConnection * tCon = theNdbCon; TransporterFacade* tp = TransporterFacade::instance(); Guard guard(tp->theMutexPtr); + + Uint32 magic = tCon->theMagicNumber; Uint32 seq = tCon->theNodeSequence; + if (tp->get_node_alive(nodeId) && (tp->getNodeSequence(nodeId) == seq)) { - - if(prepareSendScan(tCon->theTCConPtr, tCon->theTransactionId) == -1) - return -1; + /** + * Only call prepareSendScan first time (incase of restarts) + * - check with theMagicNumber + */ tCon->theMagicNumber = 0x37412619; + if(magic != 0x37412619 && + prepareSendScan(tCon->theTCConPtr, tCon->theTransactionId) == -1) + return -1; + if (doSendScan(nodeId) == -1) return -1; @@ -428,7 +436,6 @@ NdbScanOperation::executeCursor(int nodeId){ TRACE_DEBUG("The node is hard dead when attempting to start a scan"); setErrorCode(4029); tCon->theReleaseOnClose = true; - abort(); } else { TRACE_DEBUG("The node is stopping when attempting to start a scan"); setErrorCode(4030); @@ -635,7 +642,7 @@ NdbScanOperation::doSend(int ProcessorId) void NdbScanOperation::closeScan() { - if(m_transConnection) do { + if(m_transConnection){ if(DEBUG_NEXT_RESULT) ndbout_c("closeScan() theError.code = %d " "m_api_receivers_count = %d " @@ -648,55 +655,8 @@ void NdbScanOperation::closeScan() TransporterFacade* tp = TransporterFacade::instance(); Guard guard(tp->theMutexPtr); - - Uint32 seq = theNdbCon->theNodeSequence; - Uint32 nodeId = theNdbCon->theDBnode; - - if(seq != tp->getNodeSequence(nodeId)){ - theNdbCon->theReleaseOnClose = true; - break; - } - - while(theError.code == 0 && m_sent_receivers_count){ - theNdb->theWaiter.m_node = nodeId; - theNdb->theWaiter.m_state = WAIT_SCAN; - int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT); - switch(return_code){ - case 0: - break; - case -1: - setErrorCode(4008); - case -2: - m_api_receivers_count = 0; - m_conf_receivers_count = 0; - m_sent_receivers_count = 0; - theNdbCon->theReleaseOnClose = true; - } - } - - if(m_api_receivers_count+m_conf_receivers_count){ - // Send close scan - send_next_scan(0, true); // Close scan - } + close_impl(tp); - /** - * wait for close scan conf - */ - while(m_sent_receivers_count+m_api_receivers_count+m_conf_receivers_count){ - theNdb->theWaiter.m_node = nodeId; - theNdb->theWaiter.m_state = WAIT_SCAN; - int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT); - switch(return_code){ - case 0: - break; - case -1: - setErrorCode(4008); - case -2: - m_api_receivers_count = 0; - m_conf_receivers_count = 0; - m_sent_receivers_count = 0; - } - } } while(0); theNdbCon->theScanningOp = 0; @@ -750,11 +710,6 @@ int NdbScanOperation::prepareSendScan(Uint32 aTC_ConnectPtr, return -1; } - if (theStatus == SetBound) { - ((NdbIndexScanOperation*)this)->saveBoundATTRINFO(); - theStatus = GetValue; - } - theErrorLine = 0; // In preapareSendInterpreted we set the sizes (word 4-8) in the @@ -766,26 +721,7 @@ int NdbScanOperation::prepareSendScan(Uint32 aTC_ConnectPtr, ((NdbIndexScanOperation*)this)->fix_get_values(); } - const Uint32 transId1 = (Uint32) (aTransactionId & 0xFFFFFFFF); - const Uint32 transId2 = (Uint32) (aTransactionId >> 32); - - if (theOperationType == OpenRangeScanRequest) { - NdbApiSignal* tSignal = theBoundATTRINFO; - do{ - tSignal->setData(aTC_ConnectPtr, 1); - tSignal->setData(transId1, 2); - tSignal->setData(transId2, 3); - tSignal = tSignal->next(); - } while (tSignal != NULL); - } theCurrentATTRINFO->setLength(theAI_LenInCurrAI); - NdbApiSignal* tSignal = theFirstATTRINFO; - do{ - tSignal->setData(aTC_ConnectPtr, 1); - tSignal->setData(transId1, 2); - tSignal->setData(transId2, 3); - tSignal = tSignal->next(); - } while (tSignal != NULL); /** * Prepare all receivers @@ -808,20 +744,28 @@ int NdbScanOperation::prepareSendScan(Uint32 aTC_ConnectPtr, req->batch_byte_size= batch_byte_size; req->first_batch_size= first_batch_size; + /** + * Set keyinfo flag + * (Always keyinfo when using blobs) + */ + Uint32 reqInfo = req->requestInfo; + ScanTabReq::setKeyinfoFlag(reqInfo, keyInfo); + req->requestInfo = reqInfo; + for(Uint32 i = 0; i<theParallelism; i++){ m_receivers[i]->do_get_value(&theReceiver, batch_size, key_size); } return 0; } -/****************************************************************************** +/***************************************************************************** int doSend() Return Value: Return >0 : send was succesful, returns number of signals sent Return -1: In all other case. Parameters: aProcessorId: Receiving processor node Remark: Sends the ATTRINFO signal(s) -******************************************************************************/ +*****************************************************************************/ int NdbScanOperation::doSendScan(int aProcessorId) { @@ -841,13 +785,18 @@ NdbScanOperation::doSendScan(int aProcessorId) setErrorCode(4001); return -1; } + + Uint32 tupKeyLen = theTupKeyLen; + Uint32 len = theTotalNrOfKeyWordInSignal; + Uint32 aTC_ConnectPtr = theNdbCon->theTCConPtr; + Uint64 transId = theNdbCon->theTransactionId; + // Update the "attribute info length in words" in SCAN_TABREQ before // sending it. This could not be done in openScan because // we created the ATTRINFO signals after the SCAN_TABREQ signal. ScanTabReq * const req = CAST_PTR(ScanTabReq, tSignal->getDataPtrSend()); - req->attrLen = theTotalCurrAI_Len; - if (theOperationType == OpenRangeScanRequest) - req->attrLen += theTotalBoundAI_Len; + req->attrLenKeyLen = (tupKeyLen << 16) | theTotalCurrAI_Len; + TransporterFacade *tp = TransporterFacade::instance(); LinearSectionPtr ptr[3]; ptr[0].p = m_prepared_receivers; @@ -856,22 +805,41 @@ NdbScanOperation::doSendScan(int aProcessorId) setErrorCode(4002); return -1; } - if (theOperationType == OpenRangeScanRequest) { + + if (tupKeyLen > 0){ // must have at least one signal since it contains attrLen for bounds - assert(theBoundATTRINFO != NULL); - tSignal = theBoundATTRINFO; - while (tSignal != NULL) { + assert(theLastKEYINFO != NULL); + tSignal = theLastKEYINFO; + tSignal->setLength(KeyInfo::HeaderLength + theTotalNrOfKeyWordInSignal); + + assert(theFirstKEYINFO != NULL); + tSignal = theFirstKEYINFO; + + NdbApiSignal* last; + do { + KeyInfo * keyInfo = CAST_PTR(KeyInfo, tSignal->getDataPtrSend()); + keyInfo->connectPtr = aTC_ConnectPtr; + keyInfo->transId[0] = Uint32(transId); + keyInfo->transId[1] = Uint32(transId >> 32); + if (tp->sendSignal(tSignal,aProcessorId) == -1){ - setErrorCode(4002); - return -1; + setErrorCode(4002); + return -1; } + tSignalCount++; + last = tSignal; tSignal = tSignal->next(); - } + } while(last != theLastKEYINFO); } tSignal = theFirstATTRINFO; while (tSignal != NULL) { + AttrInfo * attrInfo = CAST_PTR(AttrInfo, tSignal->getDataPtrSend()); + attrInfo->connectPtr = aTC_ConnectPtr; + attrInfo->transId[0] = Uint32(transId); + attrInfo->transId[1] = Uint32(transId >> 32); + if (tp->sendSignal(tSignal,aProcessorId) == -1){ setErrorCode(4002); return -1; @@ -883,7 +851,7 @@ NdbScanOperation::doSendScan(int aProcessorId) return tSignalCount; }//NdbOperation::doSendScan() -/****************************************************************************** +/***************************************************************************** * NdbOperation* takeOverScanOp(NdbConnection* updateTrans); * * Parameters: The update transactions NdbConnection pointer. @@ -902,7 +870,7 @@ NdbScanOperation::doSendScan(int aProcessorId) * This means that the updating transactions can be placed * in separate threads and thus increasing the parallelism during * the scan process. - *****************************************************************************/ + ****************************************************************************/ int NdbScanOperation::getKeyFromKEYINFO20(Uint32* data, unsigned size) { @@ -1011,6 +979,7 @@ NdbScanOperation::takeOverScanOp(OperationType opType, NdbConnection* pTrans){ NdbBlob* NdbScanOperation::getBlobHandle(const char* anAttrName) { + m_keyInfo = 1; return NdbOperation::getBlobHandle(m_transConnection, m_currentTable->getColumn(anAttrName)); } @@ -1018,6 +987,7 @@ NdbScanOperation::getBlobHandle(const char* anAttrName) NdbBlob* NdbScanOperation::getBlobHandle(Uint32 anAttrId) { + m_keyInfo = 1; return NdbOperation::getBlobHandle(m_transConnection, m_currentTable->getColumn(anAttrId)); } @@ -1031,13 +1001,15 @@ NdbIndexScanOperation::~NdbIndexScanOperation(){ } int -NdbIndexScanOperation::setBound(const char* anAttrName, int type, const void* aValue, Uint32 len) +NdbIndexScanOperation::setBound(const char* anAttrName, int type, + const void* aValue, Uint32 len) { return setBound(m_accessTable->getColumn(anAttrName), type, aValue, len); } int -NdbIndexScanOperation::setBound(Uint32 anAttrId, int type, const void* aValue, Uint32 len) +NdbIndexScanOperation::setBound(Uint32 anAttrId, int type, + const void* aValue, Uint32 len) { return setBound(m_accessTable->getColumn(anAttrId), type, aValue, len); } @@ -1056,11 +1028,6 @@ NdbIndexScanOperation::getValue_impl(const NdbColumnImpl* attrInfo, return NdbScanOperation::getValue_impl(attrInfo, aValue); } - if (theStatus == SetBound) { - saveBoundATTRINFO(); - theStatus = GetValue; - } - int id = attrInfo->m_attrId; // In "real" table assert(m_accessTable->m_index); int sz = (int)m_accessTable->m_index->m_key_ids.size(); @@ -1101,12 +1068,13 @@ NdbIndexScanOperation::setBound(const NdbColumnImpl* tAttrInfo, int type, const void* aValue, Uint32 len) { if (theOperationType == OpenRangeScanRequest && - theStatus == SetBound && (0 <= type && type <= 4) && len <= 8000) { // insert bound type - insertATTRINFO(type); + Uint32 currLen = theTotalNrOfKeyWordInSignal; + Uint32 remaining = KeyInfo::DataLength - currLen; Uint32 sizeInBytes = tAttrInfo->m_attrSize * tAttrInfo->m_arraySize; + // normalize char bound CHARSET_INFO* cs = tAttrInfo->m_cs; Uint32 xfrmData[2000]; @@ -1130,19 +1098,34 @@ NdbIndexScanOperation::setBound(const NdbColumnImpl* tAttrInfo, Uint32 tIndexAttrId = tAttrInfo->m_attrId; Uint32 sizeInWords = (len + 3) / 4; AttributeHeader ah(tIndexAttrId, sizeInWords); - insertATTRINFO(ah.m_value); - if (len != 0) { - // insert attribute data - if ((UintPtr(aValue) & 0x3) == 0 && (len & 0x3) == 0) - insertATTRINFOloop((const Uint32*)aValue, sizeInWords); - else { - Uint32 tempData[2000]; - memcpy(tempData, aValue, len); + const Uint32 ahValue = ah.m_value; + + const bool aligned = (UintPtr(aValue) & 3) == 0; + const bool nobytes = (len & 0x3) == 0; + const Uint32 totalLen = 2 + sizeInWords; + Uint32 tupKeyLen = theTupKeyLen; + if(remaining > totalLen && aligned && nobytes){ + Uint32 * dst = theKEYINFOptr + currLen; + * dst ++ = type; + * dst ++ = ahValue; + memcpy(dst, aValue, 4 * sizeInWords); + theTotalNrOfKeyWordInSignal = currLen + totalLen; + } else { + if(!aligned || !nobytes){ + Uint32 tempData[2002]; + tempData[0] = type; + tempData[1] = ahValue; + memcpy(tempData+2, aValue, len); while ((len & 0x3) != 0) - ((char*)tempData)[len++] = 0; - insertATTRINFOloop(tempData, sizeInWords); + ((char*)&tempData[2])[len++] = 0; + insertBOUNDS(tempData, 2+sizeInWords); + } else { + Uint32 buf[2] = { type, ahValue }; + insertBOUNDS(buf, 2); + insertBOUNDS((Uint32*)aValue, sizeInWords); } } + theTupKeyLen = tupKeyLen + totalLen; /** * Do sorted stuff @@ -1165,6 +1148,46 @@ NdbIndexScanOperation::setBound(const NdbColumnImpl* tAttrInfo, } } +int +NdbIndexScanOperation::insertBOUNDS(Uint32 * data, Uint32 sz){ + Uint32 len; + Uint32 remaining = KeyInfo::DataLength - theTotalNrOfKeyWordInSignal; + Uint32 * dst = theKEYINFOptr + theTotalNrOfKeyWordInSignal; + do { + len = (sz < remaining ? sz : remaining); + memcpy(dst, data, 4 * len); + + if(sz >= remaining){ + NdbApiSignal* tCurr = theLastKEYINFO; + tCurr->setLength(KeyInfo::MaxSignalLength); + NdbApiSignal* tSignal = tCurr->next(); + if(tSignal) + ; + else if((tSignal = theNdb->getSignal()) != 0) + { + tCurr->next(tSignal); + tSignal->setSignal(GSN_KEYINFO); + } else { + goto error; + } + theLastKEYINFO = tSignal; + theKEYINFOptr = dst = ((KeyInfo*)tSignal->getDataPtrSend())->keyData; + remaining = KeyInfo::DataLength; + sz -= len; + data += len; + } else { + len = (KeyInfo::DataLength - remaining) + len; + break; + } + } while(sz >= 0); + theTotalNrOfKeyWordInSignal = len; + return 0; + +error: + setErrorCodeAbort(4228); // XXX wrong code + return -1; +} + NdbResultSet* NdbIndexScanOperation::readTuples(LockMode lm, Uint32 batch, @@ -1173,9 +1196,23 @@ NdbIndexScanOperation::readTuples(LockMode lm, NdbResultSet * rs = NdbScanOperation::readTuples(lm, batch, 0); if(rs && order_by){ m_ordered = 1; - m_sort_columns = m_accessTable->getNoOfColumns() - 1; // -1 for NDB$NODE + Uint32 cnt = m_accessTable->getNoOfColumns() - 1; + m_sort_columns = cnt; // -1 for NDB$NODE m_current_api_receiver = m_sent_receivers_count; m_api_receivers_count = m_sent_receivers_count; + + m_sort_columns = cnt; + for(Uint32 i = 0; i<cnt; i++){ + const NdbColumnImpl* key = m_accessTable->m_index->m_columns[i]; + const NdbColumnImpl* col = m_currentTable->getColumn(key->m_keyInfoPos); + NdbRecAttr* tmp = NdbScanOperation::getValue_impl(col, (char*)-1); + UintPtr newVal = UintPtr(tmp); + theTupleKeyDefined[i][0] = FAKE_PTR; + theTupleKeyDefined[i][1] = (newVal & 0xFFFFFFFF); +#if (SIZEOF_CHARP == 8) + theTupleKeyDefined[i][2] = (newVal >> 32); +#endif + } } return rs; } @@ -1396,10 +1433,7 @@ NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx){ } int -NdbScanOperation::restart(){ - TransporterFacade* tp = TransporterFacade::instance(); - Guard guard(tp->theMutexPtr); - +NdbScanOperation::close_impl(TransporterFacade* tp){ Uint32 seq = theNdbCon->theNodeSequence; Uint32 nodeId = theNdbCon->theDBnode; @@ -1407,8 +1441,8 @@ NdbScanOperation::restart(){ theNdbCon->theReleaseOnClose = true; return -1; } - - while(m_sent_receivers_count){ + + while(theError.code == 0 && m_sent_receivers_count){ theNdb->theWaiter.m_node = nodeId; theNdb->theWaiter.m_state = WAIT_SCAN; int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT); @@ -1421,14 +1455,17 @@ NdbScanOperation::restart(){ m_api_receivers_count = 0; m_conf_receivers_count = 0; m_sent_receivers_count = 0; + theNdbCon->theReleaseOnClose = true; return -1; } } if(m_api_receivers_count+m_conf_receivers_count){ // Send close scan - if(send_next_scan(0, true) == -1) // Close scan + if(send_next_scan(0, true) == -1){ // Close scan + theNdbCon->theReleaseOnClose = true; return -1; + } } /** @@ -1447,15 +1484,15 @@ NdbScanOperation::restart(){ m_api_receivers_count = 0; m_conf_receivers_count = 0; m_sent_receivers_count = 0; + theNdbCon->theReleaseOnClose = true; return -1; } } + return 0; +} - /** - * Reset receivers - */ - const Uint32 parallell = theParallelism; - +void +NdbScanOperation::reset_receivers(Uint32 parallell, Uint32 ordered){ for(Uint32 i = 0; i<parallell; i++){ m_receivers[i]->m_list_index = i; m_prepared_receivers[i] = m_receivers[i]->getId(); @@ -1470,13 +1507,64 @@ NdbScanOperation::restart(){ m_sent_receivers_count = parallell; m_conf_receivers_count = 0; - if(m_ordered){ + if(ordered){ m_current_api_receiver = parallell; m_api_receivers_count = parallell; } +} + +int +NdbScanOperation::restart() +{ + + TransporterFacade* tp = TransporterFacade::instance(); + Guard guard(tp->theMutexPtr); + Uint32 nodeId = theNdbCon->theDBnode; + + { + int res; + if((res= close_impl(tp))) + { + return res; + } + } + + /** + * Reset receivers + */ + reset_receivers(theParallelism, m_ordered); + theError.code = 0; if (doSendScan(nodeId) == -1) return -1; return 0; } + +int +NdbIndexScanOperation::reset_bounds(){ + int res; + + { + TransporterFacade* tp = TransporterFacade::instance(); + Guard guard(tp->theMutexPtr); + res= close_impl(tp); + } + + if(!res) + { + theError.code = 0; + reset_receivers(theParallelism, m_ordered); + + theLastKEYINFO = theFirstKEYINFO; + theKEYINFOptr = ((KeyInfo*)theFirstKEYINFO->getDataPtrSend())->keyData; + theTupKeyLen = 0; + theTotalNrOfKeyWordInSignal = 0; + m_transConnection + ->remove_list((NdbOperation*&)m_transConnection->m_firstExecutedScanOp, + this); + m_transConnection->define_scan_op(this); + return 0; + } + return res; +} diff --git a/ndb/src/ndbapi/Ndblist.cpp b/ndb/src/ndbapi/Ndblist.cpp index af98aa09280..b6739b66dce 100644 --- a/ndb/src/ndbapi/Ndblist.cpp +++ b/ndb/src/ndbapi/Ndblist.cpp @@ -649,8 +649,8 @@ Remark: Always release the first item in the free list void Ndb::freeScanOperation() { - NdbScanOperation* tOp = theScanOpIdleList; - theScanOpIdleList = (NdbIndexScanOperation *) theScanOpIdleList->next(); + NdbIndexScanOperation* tOp = theScanOpIdleList; + theScanOpIdleList = (NdbIndexScanOperation *)tOp->next(); delete tOp; } diff --git a/ndb/test/include/NDBT_ResultRow.hpp b/ndb/test/include/NDBT_ResultRow.hpp index aa54e892da3..6072d0ea510 100644 --- a/ndb/test/include/NDBT_ResultRow.hpp +++ b/ndb/test/include/NDBT_ResultRow.hpp @@ -24,8 +24,9 @@ public: NDBT_ResultRow(const NdbDictionary::Table &tab, char attrib_delimiter='\t'); ~NDBT_ResultRow(); NdbRecAttr * & attributeStore(int i); - const NdbRecAttr * attributeStore(const char* name); - + const NdbRecAttr * attributeStore(int i) const ; + const NdbRecAttr * attributeStore(const char* name) const ; + BaseString c_str(); NdbOut & header (NdbOut &) const; diff --git a/ndb/test/include/UtilTransactions.hpp b/ndb/test/include/UtilTransactions.hpp index 1298028d591..37cd99550a5 100644 --- a/ndb/test/include/UtilTransactions.hpp +++ b/ndb/test/include/UtilTransactions.hpp @@ -87,19 +87,30 @@ private: int verifyUniqueIndex(Ndb*, - const char* indexName, + const NdbDictionary::Index *, int parallelism = 0, bool transactional = false); - + int scanAndCompareUniqueIndex(Ndb* pNdb, - const char * indexName, + const NdbDictionary::Index *, int parallelism, bool transactional); int readRowFromTableAndIndex(Ndb* pNdb, NdbConnection* pTrans, - const char * indexName, + const NdbDictionary::Index *, NDBT_ResultRow& row ); + + int verifyOrderedIndex(Ndb*, + const NdbDictionary::Index *, + int parallelism = 0, + bool transactional = false); + + + int get_values(NdbOperation* op, NDBT_ResultRow& dst); + int equal(const NdbDictionary::Table*, NdbOperation*, const NDBT_ResultRow&); + int equal(const NdbDictionary::Index*, NdbOperation*, const NDBT_ResultRow&); + protected: int m_defaultClearMethod; const NdbDictionary::Table& tab; diff --git a/ndb/test/ndbapi/testBlobs.cpp b/ndb/test/ndbapi/testBlobs.cpp index e18f4a8bd1a..41bb82f3e06 100644 --- a/ndb/test/ndbapi/testBlobs.cpp +++ b/ndb/test/ndbapi/testBlobs.cpp @@ -1030,7 +1030,7 @@ readScan(int style, bool idx) } else { CHK((g_ops = g_con->getNdbIndexScanOperation(g_opt.m_x2name, g_opt.m_tname)) != 0); } - CHK((rs = g_ops->readTuples(NdbScanOperation::LM_Exclusive)) != 0); + CHK((rs = g_ops->readTuples(NdbScanOperation::LM_Read)) != 0); CHK(g_ops->getValue("PK1", (char*)&tup.m_pk1) != 0); if (g_opt.m_pk2len != 0) CHK(g_ops->getValue("PK2", tup.m_pk2) != 0); diff --git a/ndb/test/src/NDBT_ResultRow.cpp b/ndb/test/src/NDBT_ResultRow.cpp index 7c419444760..f82963901b1 100644 --- a/ndb/test/src/NDBT_ResultRow.cpp +++ b/ndb/test/src/NDBT_ResultRow.cpp @@ -58,10 +58,14 @@ NDBT_ResultRow::attributeStore(int i){ return data[i]; } +const NdbRecAttr* +NDBT_ResultRow::attributeStore(int i) const { + return data[i]; +} const NdbRecAttr * -NDBT_ResultRow::attributeStore(const char* name){ +NDBT_ResultRow::attributeStore(const char* name) const { for(int i = 0; i<cols; i++){ if (strcmp(names[i], name) == 0) return data[i]; diff --git a/ndb/test/src/UtilTransactions.cpp b/ndb/test/src/UtilTransactions.cpp index 506356dd140..52341c0e0e6 100644 --- a/ndb/test/src/UtilTransactions.cpp +++ b/ndb/test/src/UtilTransactions.cpp @@ -854,11 +854,12 @@ UtilTransactions::verifyIndex(Ndb* pNdb, ndbout << " Index " << indexName << " does not exist!" << endl; return NDBT_FAILED; } - + switch (pIndex->getType()){ case NdbDictionary::Index::UniqueHashIndex: + return verifyUniqueIndex(pNdb, pIndex, parallelism, transactional); case NdbDictionary::Index::OrderedIndex: - return verifyUniqueIndex(pNdb, indexName, parallelism, transactional); + return verifyOrderedIndex(pNdb, pIndex, parallelism, transactional); break; default: ndbout << "Unknown index type" << endl; @@ -870,7 +871,7 @@ UtilTransactions::verifyIndex(Ndb* pNdb, int UtilTransactions::verifyUniqueIndex(Ndb* pNdb, - const char* indexName, + const NdbDictionary::Index * pIndex, int parallelism, bool transactional){ @@ -882,7 +883,7 @@ UtilTransactions::verifyUniqueIndex(Ndb* pNdb, */ if (scanAndCompareUniqueIndex(pNdb, - indexName, + pIndex, parallelism, transactional) != NDBT_OK){ return NDBT_FAILED; @@ -896,7 +897,7 @@ UtilTransactions::verifyUniqueIndex(Ndb* pNdb, int UtilTransactions::scanAndCompareUniqueIndex(Ndb* pNdb, - const char * indexName, + const NdbDictionary::Index* pIndex, int parallelism, bool transactional){ @@ -996,7 +997,7 @@ UtilTransactions::scanAndCompareUniqueIndex(Ndb* pNdb, if (readRowFromTableAndIndex(pNdb, pTrans, - indexName, + pIndex, row) != NDBT_OK){ pNdb->closeTransaction(pTrans); return NDBT_FAILED; @@ -1027,15 +1028,9 @@ UtilTransactions::scanAndCompareUniqueIndex(Ndb* pNdb, int UtilTransactions::readRowFromTableAndIndex(Ndb* pNdb, NdbConnection* scanTrans, - const char * indexName, + const NdbDictionary::Index* pIndex, NDBT_ResultRow& row ){ - const NdbDictionary::Index* pIndex - = pNdb->getDictionary()->getIndex(indexName, tab.getName()); - if (pIndex == 0){ - ndbout << " Index " << indexName << " does not exist!" << endl; - return NDBT_FAILED; - } NdbDictionary::Index::Type indexType= pIndex->getType(); int retryAttempt = 0; @@ -1050,7 +1045,7 @@ UtilTransactions::readRowFromTableAndIndex(Ndb* pNdb, // Allocate place to store the result NDBT_ResultRow tabRow(tab); NDBT_ResultRow indexRow(tab); - + const char * indexName = pIndex->getName(); while (true){ if(retryAttempt) @@ -1281,3 +1276,239 @@ close_all: return return_code; } + +int +UtilTransactions::verifyOrderedIndex(Ndb* pNdb, + const NdbDictionary::Index* pIndex, + int parallelism, + bool transactional){ + + int retryAttempt = 0; + const int retryMax = 100; + int check; + NdbConnection *pTrans; + NdbScanOperation *pOp; + NdbIndexScanOperation * iop = 0; + NdbResultSet* cursor= 0; + + NDBT_ResultRow scanRow(tab); + NDBT_ResultRow pkRow(tab); + NDBT_ResultRow indexRow(tab); + const char * indexName = pIndex->getName(); + + int res; + parallelism = 1; + + while (true){ + + if (retryAttempt >= retryMax){ + g_info << "ERROR: has retried this operation " << retryAttempt + << " times, failing!" << endl; + return NDBT_FAILED; + } + + pTrans = pNdb->startTransaction(); + if (pTrans == NULL) { + const NdbError err = pNdb->getNdbError(); + + if (err.status == NdbError::TemporaryError){ + ERR(err); + NdbSleep_MilliSleep(50); + retryAttempt++; + continue; + } + ERR(err); + return NDBT_FAILED; + } + + pOp = pTrans->getNdbScanOperation(tab.getName()); + if (pOp == NULL) { + ERR(pTrans->getNdbError()); + pNdb->closeTransaction(pTrans); + return NDBT_FAILED; + } + + NdbResultSet* + rs = pOp->readTuples(NdbScanOperation::LM_Read, 0, parallelism); + + if( rs == 0 ) { + ERR(pTrans->getNdbError()); + pNdb->closeTransaction(pTrans); + return NDBT_FAILED; + } + + check = pOp->interpret_exit_ok(); + if( check == -1 ) { + ERR(pTrans->getNdbError()); + pNdb->closeTransaction(pTrans); + return NDBT_FAILED; + } + + if(get_values(pOp, scanRow)) + { + abort(); + } + + check = pTrans->execute(NoCommit); + if( check == -1 ) { + const NdbError err = pTrans->getNdbError(); + + if (err.status == NdbError::TemporaryError){ + ERR(err); + pNdb->closeTransaction(pTrans); + NdbSleep_MilliSleep(50); + retryAttempt++; + continue; + } + ERR(err); + pNdb->closeTransaction(pTrans); + return NDBT_FAILED; + } + + int eof; + int rows = 0; + while(check == 0 && (eof = rs->nextResult()) == 0){ + rows++; + + bool null_found= false; + for(int a = 0; a<(int)pIndex->getNoOfColumns(); a++){ + const NdbDictionary::Column * col = pIndex->getColumn(a); + if (scanRow.attributeStore(col->getName())->isNULL()) + { + null_found= true; + break; + } + } + + // Do pk lookup + NdbOperation * pk = pTrans->getNdbOperation(tab.getName()); + if(!pk || pk->readTuple()) + goto error; + if(equal(&tab, pk, scanRow) || get_values(pk, pkRow)) + goto error; + + if(!null_found) + { + if(!iop && (iop= pTrans->getNdbIndexScanOperation(indexName, + tab.getName()))) + { + cursor= iop->readTuples(NdbScanOperation::LM_CommittedRead, + parallelism); + iop->interpret_exit_ok(); + if(!cursor || get_values(iop, indexRow)) + goto error; + } + else if(!iop || iop->reset_bounds()) + { + goto error; + } + + if(equal(pIndex, iop, scanRow)) + goto error; + } + + check = pTrans->execute(NoCommit); + if(check) + goto error; + + if(scanRow.c_str() != pkRow.c_str()){ + g_err << "Error when comapring records" << endl; + g_err << " scanRow: \n" << scanRow.c_str().c_str() << endl; + g_err << " pkRow: \n" << pkRow.c_str().c_str() << endl; + pNdb->closeTransaction(pTrans); + return NDBT_FAILED; + } + + if(!null_found) + { + + if((res= cursor->nextResult()) != 0){ + g_err << "Failed to find row using index: " << res << endl; + ERR(pTrans->getNdbError()); + pNdb->closeTransaction(pTrans); + return NDBT_FAILED; + } + + if(scanRow.c_str() != indexRow.c_str()){ + g_err << "Error when comapring records" << endl; + g_err << " scanRow: \n" << scanRow.c_str().c_str() << endl; + g_err << " indexRow: \n" << indexRow.c_str().c_str() << endl; + pNdb->closeTransaction(pTrans); + return NDBT_FAILED; + } + + if(cursor->nextResult() == 0){ + g_err << "Found extra row!!" << endl; + g_err << " indexRow: \n" << indexRow.c_str().c_str() << endl; + pNdb->closeTransaction(pTrans); + return NDBT_FAILED; + } + } + } + + if (eof == -1 || check == -1) { + error: + const NdbError err = pTrans->getNdbError(); + + if (err.status == NdbError::TemporaryError){ + ERR(err); + iop = 0; + pNdb->closeTransaction(pTrans); + NdbSleep_MilliSleep(50); + retryAttempt++; + rows--; + continue; + } + ERR(err); + pNdb->closeTransaction(pTrans); + return NDBT_FAILED; + } + + pNdb->closeTransaction(pTrans); + + return NDBT_OK; + } + return NDBT_FAILED; +} + +int +UtilTransactions::get_values(NdbOperation* op, NDBT_ResultRow& dst) +{ + for (int a = 0; a < tab.getNoOfColumns(); a++){ + NdbRecAttr*& ref= dst.attributeStore(a); + if ((ref= op->getValue(a)) == 0) + { + return NDBT_FAILED; + } + } + return 0; +} + +int +UtilTransactions::equal(const NdbDictionary::Index* pIndex, + NdbOperation* op, const NDBT_ResultRow& src) +{ + for(Uint32 a = 0; a<pIndex->getNoOfColumns(); a++){ + const NdbDictionary::Column * col = pIndex->getColumn(a); + if(op->equal(col->getName(), + src.attributeStore(col->getName())->aRef()) != 0){ + return NDBT_FAILED; + } + } + return 0; +} + +int +UtilTransactions::equal(const NdbDictionary::Table* pTable, + NdbOperation* op, const NDBT_ResultRow& src) +{ + for(Uint32 a = 0; a<tab.getNoOfColumns(); a++){ + const NdbDictionary::Column* attr = tab.getColumn(a); + if (attr->getPrimaryKey() == true){ + if (op->equal(attr->getName(), src.attributeStore(a)->aRef()) != 0){ + return NDBT_FAILED; + } + } + } + return 0; +} diff --git a/sql/ha_ndbcluster.cc b/sql/ha_ndbcluster.cc index 8faa0b33756..6d643298a79 100644 --- a/sql/ha_ndbcluster.cc +++ b/sql/ha_ndbcluster.cc @@ -827,10 +827,7 @@ int ha_ndbcluster::get_ndb_lock_type(enum thr_lock_type type) if (type == TL_WRITE_ALLOW_WRITE) return NdbOperation::LM_Exclusive; else if (uses_blob_value(retrieve_all_fields)) - /* - TODO use a new scan mode to read + lock + keyinfo - */ - return NdbOperation::LM_Exclusive; + return NdbOperation::LM_Read; else return NdbOperation::LM_CommittedRead; } @@ -1342,6 +1339,7 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key, const key_range *end_key, bool sorted, byte* buf) { + bool restart; NdbConnection *trans= m_active_trans; NdbResultSet *cursor; NdbIndexScanOperation *op; @@ -1353,16 +1351,28 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key, DBUG_EXECUTE("enter", print_key(start_key, "start_key");); DBUG_EXECUTE("enter", print_key(end_key, "end_key");); - - NdbOperation::LockMode lm= - (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type); - if (!(op= trans->getNdbIndexScanOperation((NDBINDEX *) - m_index[active_index].index, - (const NDBTAB *) m_table)) || - !(cursor= op->readTuples(lm, 0, parallelism, sorted))) - ERR_RETURN(trans->getNdbError()); - m_active_cursor= cursor; - + if(m_active_cursor == 0) + { + restart= false; + NdbOperation::LockMode lm= + (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type); + if (!(op= trans->getNdbIndexScanOperation((NDBINDEX *) + m_index[active_index].index, + (const NDBTAB *) m_table)) || + !(cursor= op->readTuples(lm, 0, parallelism, sorted))) + ERR_RETURN(trans->getNdbError()); + m_active_cursor= cursor; + } else { + restart= true; + op= (NdbIndexScanOperation*)m_active_cursor->getOperation(); + + DBUG_ASSERT(op->getSorted() == sorted); + DBUG_ASSERT(op->getLockMode() == + (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type)); + if(op->reset_bounds()) + DBUG_RETURN(ndb_err(m_active_trans)); + } + if (start_key && set_bounds(op, start_key, (start_key->flag == HA_READ_KEY_EXACT) ? @@ -1384,10 +1394,19 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key, NdbIndexScanOperation::BoundGT)) DBUG_RETURN(1); } - DBUG_RETURN(define_read_attrs(buf, op)); + if(!restart) + { + DBUG_RETURN(define_read_attrs(buf, op)); + } + else + { + if (execute_no_commit(this,trans) != 0) + DBUG_RETURN(ndb_err(trans)); + + DBUG_RETURN(next_result(buf)); + } } - /* Start a filtered scan in NDB. @@ -2263,9 +2282,6 @@ int ha_ndbcluster::read_range_first_to_buf(const key_range *start_key, DBUG_ENTER("ha_ndbcluster::read_range_first_to_buf"); DBUG_PRINT("info", ("eq_range: %d, sorted: %d", eq_range, sorted)); - if (m_active_cursor) - close_scan(); - switch (get_index_type(active_index)){ case PRIMARY_KEY_ORDERED_INDEX: case PRIMARY_KEY_INDEX: @@ -2293,11 +2309,8 @@ int ha_ndbcluster::read_range_first_to_buf(const key_range *start_key, break; } - // Start the ordered index scan and fetch the first row - error= ordered_index_scan(start_key, end_key, sorted, - buf); - + error= ordered_index_scan(start_key, end_key, sorted, buf); DBUG_RETURN(error); } |