summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--BUILD/SETUP.sh11
-rw-r--r--mysql-test/r/ndb_index_ordered.result8
-rw-r--r--mysql-test/t/ndb_index_ordered.test3
-rw-r--r--ndb/include/kernel/signaldata/AttrInfo.hpp3
-rw-r--r--ndb/include/kernel/signaldata/KeyInfo.hpp1
-rw-r--r--ndb/include/kernel/signaldata/ScanFrag.hpp6
-rw-r--r--ndb/include/kernel/signaldata/ScanTab.hpp24
-rw-r--r--ndb/include/ndbapi/NdbConnection.hpp3
-rw-r--r--ndb/include/ndbapi/NdbIndexScanOperation.hpp9
-rw-r--r--ndb/include/ndbapi/NdbOperation.hpp8
-rw-r--r--ndb/include/ndbapi/NdbResultSet.hpp12
-rw-r--r--ndb/include/ndbapi/NdbScanOperation.hpp4
-rw-r--r--ndb/src/common/debugger/signaldata/Makefile.am3
-rw-r--r--ndb/src/common/debugger/signaldata/ScanFrag.cpp42
-rw-r--r--ndb/src/common/debugger/signaldata/ScanTab.cpp11
-rw-r--r--ndb/src/common/debugger/signaldata/SignalDataPrint.cpp2
-rw-r--r--ndb/src/kernel/blocks/backup/Backup.cpp2
-rw-r--r--ndb/src/kernel/blocks/dblqh/Dblqh.hpp2
-rw-r--r--ndb/src/kernel/blocks/dblqh/DblqhMain.cpp66
-rw-r--r--ndb/src/kernel/blocks/dbtc/Dbtc.hpp20
-rw-r--r--ndb/src/kernel/blocks/dbtc/DbtcMain.cpp146
-rw-r--r--ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp4
-rw-r--r--ndb/src/kernel/blocks/suma/Suma.cpp2
-rw-r--r--ndb/src/ndbapi/NdbConnection.cpp34
-rw-r--r--ndb/src/ndbapi/NdbOperation.cpp10
-rw-r--r--ndb/src/ndbapi/NdbOperationDefine.cpp20
-rw-r--r--ndb/src/ndbapi/NdbOperationInt.cpp4
-rw-r--r--ndb/src/ndbapi/NdbScanOperation.cpp374
-rw-r--r--ndb/src/ndbapi/Ndblist.cpp4
-rw-r--r--ndb/test/include/NDBT_ResultRow.hpp5
-rw-r--r--ndb/test/include/UtilTransactions.hpp19
-rw-r--r--ndb/test/ndbapi/testBlobs.cpp2
-rw-r--r--ndb/test/src/NDBT_ResultRow.cpp6
-rw-r--r--ndb/test/src/UtilTransactions.cpp259
-rw-r--r--sql/ha_ndbcluster.cc59
35 files changed, 847 insertions, 341 deletions
diff --git a/BUILD/SETUP.sh b/BUILD/SETUP.sh
index aa6b412d73b..f8baf317e72 100644
--- a/BUILD/SETUP.sh
+++ b/BUILD/SETUP.sh
@@ -6,6 +6,7 @@ fi
just_print=
just_configure=
+full_debug=
while test $# -gt 0
do
case "$1" in
@@ -21,6 +22,7 @@ Any other options will be passed directly to configure.
Note: this script is intended for internal use by MySQL developers.
EOF
+ --with-debug=full ) full_debug="=full"; shift ;;
* ) break ;;
esac
done
@@ -48,7 +50,8 @@ fast_cflags="-O3 -fno-omit-frame-pointer"
# this is one is for someone who thinks 1% speedup is worth not being
# able to backtrace
reckless_cflags="-O3 -fomit-frame-pointer "
-debug_cflags="-DUNIV_MUST_NOT_INLINE -DEXTRA_DEBUG -DFORCE_INIT_OF_VARS -DSAFEMALLOC -DPEDANTIC_SAFEMALLOC -DSAFE_MUTEX -O1 -Wuninitialized"
+
+debug_cflags="-DUNIV_MUST_NOT_INLINE -DEXTRA_DEBUG -DFORCE_INIT_OF_VARS -DSAFEMALLOC -DPEDANTIC_SAFEMALLOC -DSAFE_MUTEX"
base_cxxflags="-felide-constructors -fno-exceptions -fno-rtti"
@@ -62,7 +65,11 @@ sparc_configs=""
# and unset local_infile_configs
local_infile_configs="--enable-local-infile"
-debug_configs="--with-debug"
+debug_configs="--with-debug$full_debug"
+if [ -z "$full_debug" ]
+then
+ debug_cflags="$debug_cflags -O1 -Wuninitialized"
+fi
if gmake --version > /dev/null 2>&1
then
diff --git a/mysql-test/r/ndb_index_ordered.result b/mysql-test/r/ndb_index_ordered.result
index 2a3050e5dea..e0c486ee2c5 100644
--- a/mysql-test/r/ndb_index_ordered.result
+++ b/mysql-test/r/ndb_index_ordered.result
@@ -37,6 +37,14 @@ a b c
1 2 3
2 3 5
3 4 6
+select tt1.* from t1 as tt1, t1 as tt2 use index(b) where tt1.b = tt2.b order by tt1.c;
+a b c
+6 7 2
+5 6 2
+1 2 3
+2 3 5
+3 4 6
+4 5 8
update t1 set c = 3 where b = 3;
select * from t1 order by a;
a b c
diff --git a/mysql-test/t/ndb_index_ordered.test b/mysql-test/t/ndb_index_ordered.test
index 2a94475df13..e1766f6e624 100644
--- a/mysql-test/t/ndb_index_ordered.test
+++ b/mysql-test/t/ndb_index_ordered.test
@@ -23,6 +23,9 @@ select * from t1 where b > 4 order by b;
select * from t1 where b < 4 order by b;
select * from t1 where b <= 4 order by b;
+# Test of reset_bounds
+select tt1.* from t1 as tt1, t1 as tt2 use index(b) where tt1.b = tt2.b order by tt1.c;
+
#
# Here we should add some "explain select" to verify that the ordered index is
# used for these queries.
diff --git a/ndb/include/kernel/signaldata/AttrInfo.hpp b/ndb/include/kernel/signaldata/AttrInfo.hpp
index 18bd9b22c40..c87470db8b0 100644
--- a/ndb/include/kernel/signaldata/AttrInfo.hpp
+++ b/ndb/include/kernel/signaldata/AttrInfo.hpp
@@ -35,7 +35,8 @@ class AttrInfo {
*/
friend class Dbtc;
friend class Dblqh;
-
+ friend class NdbScanOperation;
+
friend bool printATTRINFO(FILE *, const Uint32 *, Uint32, Uint16);
public:
diff --git a/ndb/include/kernel/signaldata/KeyInfo.hpp b/ndb/include/kernel/signaldata/KeyInfo.hpp
index a4c698f89b2..686f3ae053d 100644
--- a/ndb/include/kernel/signaldata/KeyInfo.hpp
+++ b/ndb/include/kernel/signaldata/KeyInfo.hpp
@@ -26,6 +26,7 @@ class KeyInfo {
friend class DbUtil;
friend class NdbOperation;
friend class NdbScanOperation;
+ friend class NdbIndexScanOperation;
/**
* Reciver(s)
diff --git a/ndb/include/kernel/signaldata/ScanFrag.hpp b/ndb/include/kernel/signaldata/ScanFrag.hpp
index d3a89b8dc25..41ea569c45d 100644
--- a/ndb/include/kernel/signaldata/ScanFrag.hpp
+++ b/ndb/include/kernel/signaldata/ScanFrag.hpp
@@ -34,14 +34,16 @@ class ScanFragReq {
friend class Dblqh;
public:
STATIC_CONST( SignalLength = 12 );
-
+
+ friend bool printSCAN_FRAGREQ(FILE *, const Uint32*, Uint32, Uint16);
+
public:
Uint32 senderData;
Uint32 resultRef; // Where to send the result
Uint32 savePointId;
Uint32 requestInfo;
Uint32 tableId;
- Uint32 fragmentNo;
+ Uint32 fragmentNoKeyLen;
Uint32 schemaVersion;
Uint32 transId1;
Uint32 transId2;
diff --git a/ndb/include/kernel/signaldata/ScanTab.hpp b/ndb/include/kernel/signaldata/ScanTab.hpp
index 1acd7ae4736..fb5f18eae9e 100644
--- a/ndb/include/kernel/signaldata/ScanTab.hpp
+++ b/ndb/include/kernel/signaldata/ScanTab.hpp
@@ -55,7 +55,7 @@ private:
* DATA VARIABLES
*/
UintR apiConnectPtr; // DATA 0
- UintR attrLen; // DATA 1
+ UintR attrLenKeyLen; // DATA 1
UintR requestInfo; // DATA 2
UintR tableId; // DATA 3
UintR tableSchemaVersion; // DATA 4
@@ -74,6 +74,7 @@ private:
static Uint8 getHoldLockFlag(const UintR & requestInfo);
static Uint8 getReadCommittedFlag(const UintR & requestInfo);
static Uint8 getRangeScanFlag(const UintR & requestInfo);
+ static Uint8 getKeyinfoFlag(const UintR & requestInfo);
static Uint16 getScanBatch(const UintR & requestInfo);
/**
@@ -85,6 +86,7 @@ private:
static void setHoldLockFlag(UintR & requestInfo, Uint32 flag);
static void setReadCommittedFlag(UintR & requestInfo, Uint32 flag);
static void setRangeScanFlag(UintR & requestInfo, Uint32 flag);
+ static void setKeyinfoFlag(UintR & requestInfo, Uint32 flag);
static void setScanBatch(Uint32& requestInfo, Uint32 sz);
};
@@ -95,12 +97,13 @@ private:
l = Lock mode - 1 Bit 8
h = Hold lock mode - 1 Bit 10
c = Read Committed - 1 Bit 11
+ k = Keyinfo - 1 Bit 12
x = Range Scan (TUX) - 1 Bit 15
b = Scan batch - 10 Bit 16-25 (max 1023)
1111111111222222222233
01234567890123456789012345678901
- ppppppppl hc xbbbbbbbbbb
+ ppppppppl hck xbbbbbbbbbb
*/
#define PARALLELL_SHIFT (0)
@@ -112,6 +115,9 @@ private:
#define HOLD_LOCK_SHIFT (10)
#define HOLD_LOCK_MASK (1)
+#define KEYINFO_SHIFT (12)
+#define KEYINFO_MASK (1)
+
#define READ_COMMITTED_SHIFT (11)
#define READ_COMMITTED_MASK (1)
@@ -206,6 +212,20 @@ ScanTabReq::setScanBatch(Uint32 & requestInfo, Uint32 flag){
requestInfo |= (flag << SCAN_BATCH_SHIFT);
}
+inline
+Uint8
+ScanTabReq::getKeyinfoFlag(const UintR & requestInfo){
+ return (Uint8)((requestInfo >> KEYINFO_SHIFT) & KEYINFO_MASK);
+}
+
+inline
+void
+ScanTabReq::setKeyinfoFlag(UintR & requestInfo, Uint32 flag){
+ ASSERT_BOOL(flag, "ScanTabReq::setKeyinfoFlag");
+ requestInfo |= (flag << KEYINFO_SHIFT);
+}
+
+
/**
*
* SENDER: Dbtc
diff --git a/ndb/include/ndbapi/NdbConnection.hpp b/ndb/include/ndbapi/NdbConnection.hpp
index ef4972f205b..d23a2e7cc0d 100644
--- a/ndb/include/ndbapi/NdbConnection.hpp
+++ b/ndb/include/ndbapi/NdbConnection.hpp
@@ -673,6 +673,9 @@ private:
void printState();
#endif
bool checkState_TransId(const Uint32 * transId) const;
+
+ void remove_list(NdbOperation*& head, NdbOperation*);
+ void define_scan_op(NdbIndexScanOperation*);
};
inline
diff --git a/ndb/include/ndbapi/NdbIndexScanOperation.hpp b/ndb/include/ndbapi/NdbIndexScanOperation.hpp
index 82aed04a9fc..a854cb58665 100644
--- a/ndb/include/ndbapi/NdbIndexScanOperation.hpp
+++ b/ndb/include/ndbapi/NdbIndexScanOperation.hpp
@@ -118,13 +118,20 @@ public:
int setBound(Uint32 anAttrId, int type, const void* aValue, Uint32 len = 0);
/** @} *********************************************************************/
+
+ /**
+ * Reset bounds and put operation in list that will be
+ * sent on next execute
+ */
+ int reset_bounds();
+ bool getSorted() const { return m_ordered; }
private:
NdbIndexScanOperation(Ndb* aNdb);
virtual ~NdbIndexScanOperation();
int setBound(const NdbColumnImpl*, int type, const void* aValue, Uint32 len);
- int saveBoundATTRINFO();
+ int insertBOUNDS(Uint32 * data, Uint32 sz);
virtual int equal_impl(const NdbColumnImpl*, const char*, Uint32);
virtual NdbRecAttr* getValue_impl(const NdbColumnImpl*, char*);
diff --git a/ndb/include/ndbapi/NdbOperation.hpp b/ndb/include/ndbapi/NdbOperation.hpp
index a8bd8b9bfea..d35fea0e995 100644
--- a/ndb/include/ndbapi/NdbOperation.hpp
+++ b/ndb/include/ndbapi/NdbOperation.hpp
@@ -717,6 +717,8 @@ public:
NotDefined ///< Internal for debugging
};
+ LockMode getLockMode() const { return theLockMode; }
+
protected:
/******************************************************************************
* These are the methods used to create and delete the NdbOperation objects.
@@ -749,7 +751,6 @@ protected:
FinalGetValue,
SubroutineExec,
SubroutineEnd,
- SetBound,
WaitResponse,
WaitCommitResponse,
Finished,
@@ -894,7 +895,7 @@ protected:
// currently defined
OperationType theOperationType; // Read Request, Update Req......
- Uint8 theLockMode; // Can be set to WRITE if read operation
+ LockMode theLockMode; // Can be set to WRITE if read operation
OperationStatus theStatus; // The status of the operation.
Uint32 theMagicNumber; // Magic number to verify that object
// is correct
@@ -921,9 +922,6 @@ protected:
Uint16 m_keyInfoGSN;
Uint16 m_attrInfoGSN;
- // saveBoundATTRINFO() moves ATTRINFO here when setBound() is ready
- NdbApiSignal* theBoundATTRINFO;
- Uint32 theTotalBoundAI_Len;
// Blobs in this operation
NdbBlob* theBlobList;
diff --git a/ndb/include/ndbapi/NdbResultSet.hpp b/ndb/include/ndbapi/NdbResultSet.hpp
index 483e08179c0..478daf8aad2 100644
--- a/ndb/include/ndbapi/NdbResultSet.hpp
+++ b/ndb/include/ndbapi/NdbResultSet.hpp
@@ -138,7 +138,11 @@ public:
*/
int deleteTuple();
int deleteTuple(NdbConnection* takeOverTransaction);
-
+
+ /**
+ * Get underlying operation
+ */
+ NdbOperation* getOperation();
private:
NdbResultSet(NdbScanOperation*);
@@ -149,4 +153,10 @@ private:
NdbScanOperation* m_operation;
};
+inline
+NdbOperation*
+NdbResultSet::getOperation(){
+ return m_operation;
+}
+
#endif
diff --git a/ndb/include/ndbapi/NdbScanOperation.hpp b/ndb/include/ndbapi/NdbScanOperation.hpp
index e8a4408469c..a1c66b380a7 100644
--- a/ndb/include/ndbapi/NdbScanOperation.hpp
+++ b/ndb/include/ndbapi/NdbScanOperation.hpp
@@ -87,12 +87,13 @@ protected:
CursorType m_cursor_type;
NdbScanOperation(Ndb* aNdb);
- ~NdbScanOperation();
+ virtual ~NdbScanOperation();
int nextResult(bool fetchAllowed = true);
virtual void release();
void closeScan();
+ int close_impl(class TransporterFacade*);
// Overloaded methods from NdbCursorOperation
int executeCursor(int ProcessorId);
@@ -119,6 +120,7 @@ protected:
int prepareSendScan(Uint32 TC_ConnectPtr, Uint64 TransactionId);
int fix_receivers(Uint32 parallel);
+ void reset_receivers(Uint32 parallel, Uint32 ordered);
Uint32* m_array; // containing all arrays below
Uint32 m_allocated_receivers;
NdbReceiver** m_receivers; // All receivers
diff --git a/ndb/src/common/debugger/signaldata/Makefile.am b/ndb/src/common/debugger/signaldata/Makefile.am
index 0a5806e1e00..c855c5f8a18 100644
--- a/ndb/src/common/debugger/signaldata/Makefile.am
+++ b/ndb/src/common/debugger/signaldata/Makefile.am
@@ -23,7 +23,8 @@ libsignaldataprint_la_SOURCES = \
FailRep.cpp DisconnectRep.cpp SignalDroppedRep.cpp \
SumaImpl.cpp NdbSttor.cpp CreateFragmentation.cpp \
UtilLock.cpp TuxMaint.cpp AccLock.cpp \
- LqhTrans.cpp ReadNodesConf.cpp CntrStart.cpp
+ LqhTrans.cpp ReadNodesConf.cpp CntrStart.cpp \
+ ScanFrag.cpp
include $(top_srcdir)/ndb/config/common.mk.am
include $(top_srcdir)/ndb/config/type_ndbapi.mk.am
diff --git a/ndb/src/common/debugger/signaldata/ScanFrag.cpp b/ndb/src/common/debugger/signaldata/ScanFrag.cpp
new file mode 100644
index 00000000000..4d19a325637
--- /dev/null
+++ b/ndb/src/common/debugger/signaldata/ScanFrag.cpp
@@ -0,0 +1,42 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+
+
+#include <BlockNumbers.h>
+#include <signaldata/ScanTab.hpp>
+#include <signaldata/ScanFrag.hpp>
+
+bool
+printSCAN_FRAGREQ(FILE * output, const Uint32 * theData,
+ Uint32 len, Uint16 receiverBlockNo) {
+ const ScanFragReq * const sig = (ScanFragReq *)theData;
+ fprintf(output, " senderData: %x\n", sig->senderData);
+ fprintf(output, " resultRef: %x\n", sig->resultRef);
+ fprintf(output, " savePointId: %x\n", sig->savePointId);
+ fprintf(output, " requestInfo: %x\n", sig->requestInfo);
+ fprintf(output, " tableId: %x\n", sig->tableId);
+ fprintf(output, " fragmentNo: %x\n", sig->fragmentNoKeyLen & 0xFFFF);
+ fprintf(output, " keyLen: %x\n", sig->fragmentNoKeyLen >> 16);
+ fprintf(output, " schemaVersion: %x\n", sig->schemaVersion);
+ fprintf(output, " transId1: %x\n", sig->transId1);
+ fprintf(output, " transId2: %x\n", sig->transId2);
+ fprintf(output, " clientOpPtr: %x\n", sig->clientOpPtr);
+ fprintf(output, " batch_size_rows: %x\n", sig->batch_size_rows);
+ fprintf(output, " batch_size_bytes: %x\n", sig->batch_size_bytes);
+ return true;
+}
+
diff --git a/ndb/src/common/debugger/signaldata/ScanTab.cpp b/ndb/src/common/debugger/signaldata/ScanTab.cpp
index 3f2109d9477..72a4d9f94b9 100644
--- a/ndb/src/common/debugger/signaldata/ScanTab.cpp
+++ b/ndb/src/common/debugger/signaldata/ScanTab.cpp
@@ -30,15 +30,18 @@ printSCANTABREQ(FILE * output, const Uint32 * theData, Uint32 len, Uint16 receiv
fprintf(output, " apiConnectPtr: H\'%.8x",
sig->apiConnectPtr);
fprintf(output, " requestInfo: H\'%.8x:\n", requestInfo);
- fprintf(output, " Parallellism: %u, Batch: %u LockMode: %u, Holdlock: %u, RangeScan: %u\n",
+ fprintf(output, " Parallellism: %u, Batch: %u LockMode: %u, Keyinfo: %u Holdlock: %u, RangeScan: %u\n",
sig->getParallelism(requestInfo),
sig->getScanBatch(requestInfo),
sig->getLockMode(requestInfo),
sig->getHoldLockFlag(requestInfo),
- sig->getRangeScanFlag(requestInfo));
+ sig->getRangeScanFlag(requestInfo),
+ sig->getKeyinfoFlag(requestInfo));
- fprintf(output, " attrLen: %d, tableId: %d, tableSchemaVer: %d\n",
- sig->attrLen, sig->tableId, sig->tableSchemaVersion);
+ Uint32 keyLen = (sig->attrLenKeyLen >> 16);
+ Uint32 attrLen = (sig->attrLenKeyLen & 0xFFFF);
+ fprintf(output, " attrLen: %d, keyLen: %d tableId: %d, tableSchemaVer: %d\n",
+ attrLen, keyLen, sig->tableId, sig->tableSchemaVersion);
fprintf(output, " transId(1, 2): (H\'%.8x, H\'%.8x) storedProcId: H\'%.8x\n",
sig->transId1, sig->transId2, sig->storedProcId);
diff --git a/ndb/src/common/debugger/signaldata/SignalDataPrint.cpp b/ndb/src/common/debugger/signaldata/SignalDataPrint.cpp
index 65351663789..640449a0579 100644
--- a/ndb/src/common/debugger/signaldata/SignalDataPrint.cpp
+++ b/ndb/src/common/debugger/signaldata/SignalDataPrint.cpp
@@ -53,6 +53,7 @@
#include <signaldata/UtilPrepare.hpp>
#include <signaldata/UtilExecute.hpp>
#include <signaldata/ScanTab.hpp>
+#include <signaldata/ScanFrag.hpp>
#include <signaldata/LqhFrag.hpp>
#include <signaldata/LqhTransConf.hpp>
#include <signaldata/DropTab.hpp>
@@ -250,6 +251,7 @@ SignalDataPrintFunctions[] = {
,{ GSN_TUX_MAINT_REQ, printTUX_MAINT_REQ }
,{ GSN_ACC_LOCKREQ, printACC_LOCKREQ }
,{ GSN_LQH_TRANSCONF, printLQH_TRANSCONF }
+ ,{ GSN_SCAN_FRAGREQ, printSCAN_FRAGREQ }
};
const unsigned short NO_OF_PRINT_FUNCTIONS = sizeof(SignalDataPrintFunctions)/sizeof(NameFunctionPair);
diff --git a/ndb/src/kernel/blocks/backup/Backup.cpp b/ndb/src/kernel/blocks/backup/Backup.cpp
index 08a8bf83e20..569b3f98faa 100644
--- a/ndb/src/kernel/blocks/backup/Backup.cpp
+++ b/ndb/src/kernel/blocks/backup/Backup.cpp
@@ -3360,7 +3360,7 @@ Backup::execBACKUP_FRAGMENT_REQ(Signal* signal)
req->senderData = filePtr.i;
req->resultRef = reference();
req->schemaVersion = table.schemaVersion;
- req->fragmentNo = fragNo;
+ req->fragmentNoKeyLen = fragNo;
req->requestInfo = 0;
req->savePointId = 0;
req->tableId = table.tableId;
diff --git a/ndb/src/kernel/blocks/dblqh/Dblqh.hpp b/ndb/src/kernel/blocks/dblqh/Dblqh.hpp
index a94af7b59c8..b4531af7cd6 100644
--- a/ndb/src/kernel/blocks/dblqh/Dblqh.hpp
+++ b/ndb/src/kernel/blocks/dblqh/Dblqh.hpp
@@ -2248,7 +2248,7 @@ private:
void sendAttrinfoLoop(Signal* signal);
void sendAttrinfoSignal(Signal* signal);
void sendLqhAttrinfoSignal(Signal* signal);
- void sendKeyinfoAcc(Signal* signal);
+ void sendKeyinfoAcc(Signal* signal, Uint32 pos);
Uint32 initScanrec(const class ScanFragReq *);
void initScanTc(Signal* signal,
Uint32 transid1,
diff --git a/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp b/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
index 8342870d69c..c5e2bd900e9 100644
--- a/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
+++ b/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
@@ -2803,8 +2803,10 @@ void Dblqh::execKEYINFO(Signal* signal)
return;
}//if
TcConnectionrec * const regTcPtr = tcConnectptr.p;
- if (regTcPtr->transactionState !=
- TcConnectionrec::WAIT_TUPKEYINFO) {
+ TcConnectionrec::TransactionState state = regTcPtr->transactionState;
+ if (state != TcConnectionrec::WAIT_TUPKEYINFO &&
+ state != TcConnectionrec::WAIT_SCAN_AI)
+ {
jam();
/*****************************************************************************/
/* TRANSACTION WAS ABORTED, THIS IS MOST LIKELY A SIGNAL BELONGING TO THE */
@@ -2822,15 +2824,19 @@ void Dblqh::execKEYINFO(Signal* signal)
return;
}//if
jam();
+ abort();
terrorCode = errorCode;
abortErrorLab(signal);
return;
}//if
- FragrecordPtr regFragptr;
- regFragptr.i = regTcPtr->fragmentptr;
- ptrCheckGuard(regFragptr, cfragrecFileSize, fragrecord);
- fragptr = regFragptr;
- endgettupkeyLab(signal);
+ if(state == TcConnectionrec::WAIT_TUPKEYINFO)
+ {
+ FragrecordPtr regFragptr;
+ regFragptr.i = regTcPtr->fragmentptr;
+ ptrCheckGuard(regFragptr, cfragrecFileSize, fragrecord);
+ fragptr = regFragptr;
+ endgettupkeyLab(signal);
+ }
return;
}//Dblqh::execKEYINFO()
@@ -2838,9 +2844,9 @@ void Dblqh::execKEYINFO(Signal* signal)
/* FILL IN KEY DATA INTO DATA BUFFERS. */
/* ------------------------------------------------------------------------- */
Uint32 Dblqh::handleLongTupKey(Signal* signal,
- Uint32 keyLength,
- Uint32 primKeyLength,
- Uint32* dataPtr)
+ Uint32 keyLength,
+ Uint32 primKeyLength,
+ Uint32* dataPtr)
{
TcConnectionrec * const regTcPtr = tcConnectptr.p;
Uint32 dataPos = 0;
@@ -3686,7 +3692,7 @@ void Dblqh::prepareContinueAfterBlockedLab(Signal* signal)
signal->theData[9] = sig3;
signal->theData[10] = sig4;
if (regTcPtr->primKeyLen > 4) {
- sendKeyinfoAcc(signal);
+ sendKeyinfoAcc(signal, 11);
}//if
EXECUTE_DIRECT(refToBlock(regTcPtr->tcAccBlockref), GSN_ACCKEYREQ,
signal, 7 + regTcPtr->primKeyLen);
@@ -3708,9 +3714,8 @@ void Dblqh::prepareContinueAfterBlockedLab(Signal* signal)
/* ======= SEND KEYINFO TO ACC ======= */
/* */
/* ========================================================================== */
-void Dblqh::sendKeyinfoAcc(Signal* signal)
+void Dblqh::sendKeyinfoAcc(Signal* signal, Uint32 Ti)
{
- UintR Ti = 11;
DatabufPtr regDatabufptr;
regDatabufptr.i = tcConnectptr.p->firstTupkeybuf;
@@ -7409,7 +7414,8 @@ void Dblqh::execSCAN_FRAGREQ(Signal* signal)
jamEntry();
const Uint32 reqinfo = scanFragReq->requestInfo;
- const Uint32 fragId = scanFragReq->fragmentNo;
+ const Uint32 fragId = (scanFragReq->fragmentNoKeyLen & 0xFFFF);
+ const Uint32 keyLen = (scanFragReq->fragmentNoKeyLen >> 16);
tabptr.i = scanFragReq->tableId;
const Uint32 max_rows = scanFragReq->batch_size_rows;
const Uint32 scanLockMode = ScanFragReq::getLockMode(reqinfo);
@@ -7473,6 +7479,8 @@ void Dblqh::execSCAN_FRAGREQ(Signal* signal)
transid2,
fragId,
ZNIL);
+ tcConnectptr.p->save1 = 4;
+ tcConnectptr.p->primKeyLen = keyLen + 4; // hard coded in execKEYINFO
errorCode = initScanrec(scanFragReq);
if (errorCode != ZOK) {
jam();
@@ -7672,34 +7680,18 @@ void Dblqh::accScanConfScanLab(Signal* signal)
return;
}//if
scanptr.p->scanAccPtr = accScanConf->accPtr;
- AttrbufPtr regAttrinbufptr;
- regAttrinbufptr.i = tcConnectptr.p->firstAttrinbuf;
- Uint32 boundAiLength = 0;
+ Uint32 boundAiLength = tcConnectptr.p->primKeyLen - 4;
if (scanptr.p->rangeScan) {
jam();
// bound info length is in first of the 5 header words
- ptrCheckGuard(regAttrinbufptr, cattrinbufFileSize, attrbuf);
- boundAiLength = regAttrinbufptr.p->attrbuf[0];
TuxBoundInfo* const req = (TuxBoundInfo*)signal->getDataPtrSend();
req->errorCode = RNIL;
req->tuxScanPtrI = scanptr.p->scanAccPtr;
req->boundAiLength = boundAiLength;
- Uint32* out = (Uint32*)req + TuxBoundInfo::SignalLength;
- Uint32 sz = 0;
- while (sz < boundAiLength) {
- jam();
- ptrCheckGuard(regAttrinbufptr, cattrinbufFileSize, attrbuf);
- Uint32 dataLen = regAttrinbufptr.p->attrbuf[ZINBUF_DATA_LEN];
- MEMCOPY_NO_WORDS(&out[sz],
- &regAttrinbufptr.p->attrbuf[0],
- dataLen);
- sz += dataLen;
- regAttrinbufptr.i = regAttrinbufptr.p->attrbuf[ZINBUF_NEXT];
- ptrCheckGuard(regAttrinbufptr, cattrinbufFileSize, attrbuf);
- }
- ndbrequire(sz == boundAiLength);
+ if(boundAiLength > 0)
+ sendKeyinfoAcc(signal, TuxBoundInfo::SignalLength);
EXECUTE_DIRECT(DBTUX, GSN_TUX_BOUND_INFO,
- signal, TuxBoundInfo::SignalLength + boundAiLength);
+ signal, TuxBoundInfo::SignalLength + boundAiLength);
jamEntry();
if (req->errorCode != 0) {
jam();
@@ -7716,12 +7708,14 @@ void Dblqh::accScanConfScanLab(Signal* signal)
signal->theData[1] = tcConnectptr.p->tableref;
signal->theData[2] = scanptr.p->scanSchemaVersion;
signal->theData[3] = ZSTORED_PROC_SCAN;
- ndbrequire(boundAiLength <= scanptr.p->scanAiLength);
- signal->theData[4] = scanptr.p->scanAiLength - boundAiLength;
+
+ signal->theData[4] = scanptr.p->scanAiLength;
sendSignal(tcConnectptr.p->tcTupBlockref,
GSN_STORED_PROCREQ, signal, 5, JBB);
signal->theData[0] = tcConnectptr.p->tupConnectrec;
+ AttrbufPtr regAttrinbufptr;
+ regAttrinbufptr.i = tcConnectptr.p->firstAttrinbuf;
while (regAttrinbufptr.i != RNIL) {
ptrCheckGuard(regAttrinbufptr, cattrinbufFileSize, attrbuf);
jam();
diff --git a/ndb/src/kernel/blocks/dbtc/Dbtc.hpp b/ndb/src/kernel/blocks/dbtc/Dbtc.hpp
index a5f07f8e9e1..09a7708783e 100644
--- a/ndb/src/kernel/blocks/dbtc/Dbtc.hpp
+++ b/ndb/src/kernel/blocks/dbtc/Dbtc.hpp
@@ -192,7 +192,8 @@ public:
OS_WAIT_ATTR = 14,
OS_WAIT_COMMIT_CONF = 15,
OS_WAIT_ABORT_CONF = 16,
- OS_WAIT_COMPLETE_CONF = 17
+ OS_WAIT_COMPLETE_CONF = 17,
+ OS_WAIT_SCAN = 18
};
enum AbortState {
@@ -1169,6 +1170,8 @@ public:
// Length of expected attribute information
Uint32 scanAiLength;
+ Uint32 scanKeyLen;
+
// Reference to ApiConnectRecord
Uint32 scanApiRec;
@@ -1194,18 +1197,7 @@ public:
Uint16 first_batch_size;
Uint32 batch_byte_size;
- // Shall the locks be held until the application have read the
- // records
- Uint8 scanLockHold;
-
- // Shall the locks be read or write locks
- Uint8 scanLockMode;
-
- // Skip locks by other transactions and read latest committed
- Uint8 readCommitted;
-
- // Scan is on ordered index
- Uint8 rangeScan;
+ Uint32 scanRequestInfo; // ScanFrag format
// Close is ordered
bool m_close_scan_req;
@@ -1571,7 +1563,7 @@ private:
void diFcountReqLab(Signal* signal, ScanRecordPtr);
void signalErrorRefuseLab(Signal* signal);
void abort080Lab(Signal* signal);
- void packKeyData000Lab(Signal* signal, BlockReference TBRef);
+ void packKeyData000Lab(Signal* signal, BlockReference TBRef, Uint32 len);
void abortScanLab(Signal* signal, ScanRecordPtr, Uint32 errCode);
void sendAbortedAfterTimeout(Signal* signal, int Tcheck);
void abort010Lab(Signal* signal);
diff --git a/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp b/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp
index d270c6acc61..890b6599d0a 100644
--- a/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp
+++ b/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp
@@ -1759,6 +1759,7 @@ void Dbtc::execKEYINFO(Signal* signal)
switch (apiConnectptr.p->apiConnectstate) {
case CS_RECEIVING:
case CS_REC_COMMITTING:
+ case CS_START_SCAN:
jam();
/*empty*/;
break;
@@ -1812,12 +1813,54 @@ void Dbtc::execKEYINFO(Signal* signal)
jam();
tckeyreq020Lab(signal);
return;
+ case OS_WAIT_SCAN:
+ break;
default:
jam();
terrorCode = ZSTATE_ERROR;
abortErrorLab(signal);
return;
}//switch
+
+ UintR TdataPos = 0;
+ UintR TkeyLen = regCachePtr->keylen;
+ UintR Tlen = regCachePtr->save1;
+
+ do {
+ if (cfirstfreeDatabuf == RNIL) {
+ jam();
+ abort();
+ seizeDatabuferrorLab(signal);
+ return;
+ }//if
+ linkKeybuf(signal);
+ arrGuard(TdataPos, 19);
+ databufptr.p->data[0] = signal->theData[TdataPos + 3];
+ databufptr.p->data[1] = signal->theData[TdataPos + 4];
+ databufptr.p->data[2] = signal->theData[TdataPos + 5];
+ databufptr.p->data[3] = signal->theData[TdataPos + 6];
+ Tlen = Tlen + 4;
+ TdataPos = TdataPos + 4;
+ if (Tlen < TkeyLen) {
+ jam();
+ if (TdataPos >= tmaxData) {
+ jam();
+ /*----------------------------------------------------*/
+ /** EXIT AND WAIT FOR SIGNAL KEYINFO OR KEYINFO9 **/
+ /** WHEN EITHER OF THE SIGNALS IS RECEIVED A JUMP **/
+ /** TO LABEL "KEYINFO_LABEL" IS DONE. THEN THE **/
+ /** PROGRAM RETURNS TO LABEL TCKEYREQ020 **/
+ /*----------------------------------------------------*/
+ setApiConTimer(apiConnectptr.i, ctcTimer, __LINE__);
+ regCachePtr->save1 = Tlen;
+ return;
+ }//if
+ } else {
+ jam();
+ return;
+ }//if
+ } while (1);
+ return;
}//Dbtc::execKEYINFO()
/*---------------------------------------------------------------------------*/
@@ -1826,45 +1869,45 @@ void Dbtc::execKEYINFO(Signal* signal)
/* WE WILL ALWAYS PACK 4 WORDS AT A TIME. */
/*---------------------------------------------------------------------------*/
void Dbtc::packKeyData000Lab(Signal* signal,
- BlockReference TBRef)
+ BlockReference TBRef,
+ Uint32 totalLen)
{
CacheRecord * const regCachePtr = cachePtr.p;
UintR Tmp;
- Uint16 tdataPos;
jam();
- tdataPos = 0;
- Tmp = regCachePtr->keylen;
+ Uint32 len = 0;
databufptr.i = regCachePtr->firstKeybuf;
+ signal->theData[0] = tcConnectptr.i;
+ signal->theData[1] = apiConnectptr.p->transid[0];
+ signal->theData[2] = apiConnectptr.p->transid[1];
+ Uint32 * dst = signal->theData+3;
+ ptrCheckGuard(databufptr, cdatabufFilesize, databufRecord);
+
do {
jam();
- if (tdataPos == 20) {
- jam();
- /*---------------------------------------------------------------------*/
- /* 4 MORE WORDS WILL NOT FIT IN THE 24 DATA WORDS IN A SIGNAL */
- /*---------------------------------------------------------------------*/
- sendKeyinfo(signal, TBRef, 20);
- tdataPos = 0;
- }//if
- Tmp = Tmp - 4;
- ptrCheckGuard(databufptr, cdatabufFilesize, databufRecord);
- cdata[tdataPos ] = databufptr.p->data[0];
- cdata[tdataPos + 1] = databufptr.p->data[1];
- cdata[tdataPos + 2] = databufptr.p->data[2];
- cdata[tdataPos + 3] = databufptr.p->data[3];
- tdataPos = tdataPos + 4;
- if (Tmp <= 4) {
+ databufptr.i = databufptr.p->nextDatabuf;
+ dst[len + 0] = databufptr.p->data[0];
+ dst[len + 1] = databufptr.p->data[1];
+ dst[len + 2] = databufptr.p->data[2];
+ dst[len + 3] = databufptr.p->data[3];
+ len += 4;
+ if (totalLen <= 4) {
jam();
/*---------------------------------------------------------------------*/
/* LAST PACK OF KEY DATA HAVE BEEN SENT */
/*---------------------------------------------------------------------*/
/* THERE WERE UNSENT INFORMATION, SEND IT. */
/*---------------------------------------------------------------------*/
- sendKeyinfo(signal, TBRef, tdataPos);
- releaseKeys();
+ sendSignal(TBRef, GSN_KEYINFO, signal, 3 + len, JBB);
return;
- }//if
- databufptr.i = databufptr.p->nextDatabuf;
+ } else if(len == KeyInfo::DataLength){
+ jam();
+ len = 0;
+ sendSignal(TBRef, GSN_KEYINFO, signal, 3 + KeyInfo::DataLength, JBB);
+ }
+ totalLen -= 4;
+ ptrCheckGuard(databufptr, cdatabufFilesize, databufRecord);
} while (1);
}//Dbtc::packKeyData000Lab()
@@ -3014,7 +3057,8 @@ void Dbtc::packLqhkeyreq(Signal* signal,
UintR TfirstAttrbuf = regCachePtr->firstAttrbuf;
sendlqhkeyreq(signal, TBRef);
if (Tkeylen > 4) {
- packKeyData000Lab(signal, TBRef);
+ packKeyData000Lab(signal, TBRef, Tkeylen - 4);
+ releaseKeys();
}//if
packLqhkeyreq040Lab(signal,
TfirstAttrbuf,
@@ -6045,6 +6089,7 @@ void Dbtc::timeOutFoundLab(Signal* signal, Uint32 TapiConPtr)
<< " - place: " << c_apiConTimer_line[apiConnectptr.i]);
switch (apiConnectptr.p->apiConnectstate) {
case CS_STARTED:
+ ndbrequire(c_apiConTimer_line[apiConnectptr.i] != 3615);
if(apiConnectptr.p->lqhkeyreqrec == apiConnectptr.p->lqhkeyconfrec){
jam();
/*
@@ -8406,7 +8451,8 @@ void Dbtc::execSCAN_TABREQ(Signal* signal)
{
const ScanTabReq * const scanTabReq = (ScanTabReq *)&signal->theData[0];
const Uint32 reqinfo = scanTabReq->requestInfo;
- const Uint32 aiLength = scanTabReq->attrLen;
+ const Uint32 aiLength = (scanTabReq->attrLenKeyLen & 0xFFFF);
+ const Uint32 keyLen = scanTabReq->attrLenKeyLen >> 16;
const Uint32 schemaVersion = scanTabReq->tableSchemaVersion;
const Uint32 transid1 = scanTabReq->transId1;
const Uint32 transid2 = scanTabReq->transId2;
@@ -8480,8 +8526,12 @@ void Dbtc::execSCAN_TABREQ(Signal* signal)
seizeTcConnect(signal);
tcConnectptr.p->apiConnect = apiConnectptr.i;
+ tcConnectptr.p->tcConnectstate = OS_WAIT_SCAN;
+ apiConnectptr.p->lastTcConnect = tcConnectptr.i;
seizeCacheRecord(signal);
+ cachePtr.p->keylen = keyLen;
+ cachePtr.p->save1 = 0;
scanptr = seizeScanrec(signal);
ndbrequire(transP->apiScanRec == RNIL);
@@ -8558,21 +8608,27 @@ void Dbtc::initScanrec(ScanRecordPtr scanptr,
UintR scanParallel,
UintR noOprecPerFrag)
{
- const UintR reqinfo = scanTabReq->requestInfo;
-
scanptr.p->scanTcrec = tcConnectptr.i;
scanptr.p->scanApiRec = apiConnectptr.i;
- scanptr.p->scanAiLength = scanTabReq->attrLen;
+ scanptr.p->scanAiLength = scanTabReq->attrLenKeyLen & 0xFFFF;
+ scanptr.p->scanKeyLen = scanTabReq->attrLenKeyLen >> 16;
scanptr.p->scanTableref = tabptr.i;
scanptr.p->scanSchemaVersion = scanTabReq->tableSchemaVersion;
scanptr.p->scanParallel = scanParallel;
scanptr.p->noOprecPerFrag = noOprecPerFrag;
scanptr.p->first_batch_size= scanTabReq->first_batch_size;
scanptr.p->batch_byte_size= scanTabReq->batch_byte_size;
- scanptr.p->scanLockMode = ScanTabReq::getLockMode(reqinfo);
- scanptr.p->scanLockHold = ScanTabReq::getHoldLockFlag(reqinfo);
- scanptr.p->readCommitted = ScanTabReq::getReadCommittedFlag(reqinfo);
- scanptr.p->rangeScan = ScanTabReq::getRangeScanFlag(reqinfo);
+
+ Uint32 tmp = 0;
+ const UintR ri = scanTabReq->requestInfo;
+ ScanFragReq::setLockMode(tmp, ScanTabReq::getLockMode(ri));
+ ScanFragReq::setHoldLockFlag(tmp, ScanTabReq::getHoldLockFlag(ri));
+ ScanFragReq::setKeyinfoFlag(tmp, ScanTabReq::getKeyinfoFlag(ri));
+ ScanFragReq::setReadCommittedFlag(tmp,ScanTabReq::getReadCommittedFlag(ri));
+ ScanFragReq::setRangeScanFlag(tmp, ScanTabReq::getRangeScanFlag(ri));
+ ScanFragReq::setAttrLen(tmp, scanTabReq->attrLenKeyLen & 0xFFFF);
+
+ scanptr.p->scanRequestInfo = tmp;
scanptr.p->scanStoredProcId = scanTabReq->storedProcId;
scanptr.p->scanState = ScanRecord::RUNNING;
scanptr.p->m_queued_count = 0;
@@ -8589,7 +8645,7 @@ void Dbtc::initScanrec(ScanRecordPtr scanptr,
ptr.p->m_apiPtr = cdata[i];
}//for
- (* (scanptr.p->rangeScan ?
+ (* (ScanTabReq::getRangeScanFlag(ri) ?
&c_counters.c_range_scan_count :
&c_counters.c_scan_count))++;
}//Dbtc::initScanrec()
@@ -8807,6 +8863,7 @@ void Dbtc::releaseScanResources(ScanRecordPtr scanPtr)
if (apiConnectptr.p->cachePtr != RNIL) {
cachePtr.i = apiConnectptr.p->cachePtr;
ptrCheckGuard(cachePtr, ccacheFilesize, cacheRecord);
+ releaseKeys();
releaseAttrinfo();
}//if
tcConnectptr.i = scanPtr.p->scanTcrec;
@@ -9448,17 +9505,8 @@ void Dbtc::sendScanFragReq(Signal* signal,
ScanRecord* scanP,
ScanFragRec* scanFragP)
{
- Uint32 requestInfo = 0;
ScanFragReq * const req = (ScanFragReq *)&signal->theData[0];
- ScanFragReq::setLockMode(requestInfo, scanP->scanLockMode);
- ScanFragReq::setHoldLockFlag(requestInfo, scanP->scanLockHold);
- if(scanP->scanLockMode == 1){ // Not read -> keyinfo
- jam();
- ScanFragReq::setKeyinfoFlag(requestInfo, 1);
- }
- ScanFragReq::setReadCommittedFlag(requestInfo, scanP->readCommitted);
- ScanFragReq::setRangeScanFlag(requestInfo, scanP->rangeScan);
- ScanFragReq::setAttrLen(requestInfo, scanP->scanAiLength);
+ Uint32 requestInfo = scanP->scanRequestInfo;
ScanFragReq::setScanPrio(requestInfo, 1);
apiConnectptr.i = scanP->scanApiRec;
req->tableId = scanP->scanTableref;
@@ -9466,7 +9514,7 @@ void Dbtc::sendScanFragReq(Signal* signal,
ptrCheckGuard(apiConnectptr, capiConnectFilesize, apiConnectRecord);
req->senderData = scanFragptr.i;
req->requestInfo = requestInfo;
- req->fragmentNo = scanFragP->scanFragId;
+ req->fragmentNoKeyLen = scanFragP->scanFragId | (scanP->scanKeyLen << 16);
req->resultRef = apiConnectptr.p->ndbapiBlockref;
req->savePointId = apiConnectptr.p->currSavePointId;
req->transId1 = apiConnectptr.p->transid[0];
@@ -9476,6 +9524,11 @@ void Dbtc::sendScanFragReq(Signal* signal,
req->batch_size_bytes= scanP->batch_byte_size;
sendSignal(scanFragP->lqhBlockref, GSN_SCAN_FRAGREQ, signal,
ScanFragReq::SignalLength, JBB);
+ if(scanP->scanKeyLen > 0)
+ {
+ tcConnectptr.i = scanFragptr.i;
+ packKeyData000Lab(signal, scanFragP->lqhBlockref, scanP->scanKeyLen);
+ }
updateBuddyTimer(apiConnectptr);
scanFragP->startFragTimer(ctcTimer);
}//Dbtc::sendScanFragReq()
@@ -10422,9 +10475,6 @@ Dbtc::execDUMP_STATE_ORD(Signal* signal)
sp.p->scanSchemaVersion,
sp.p->scanTableref,
sp.p->scanStoredProcId);
- infoEvent(" lhold=%d, lmode=%d",
- sp.p->scanLockHold,
- sp.p->scanLockMode);
infoEvent(" apiRec=%d, next=%d",
sp.p->scanApiRec, sp.p->nextScan);
diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp
index 5b161d3c4ce..7414691ab78 100644
--- a/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp
+++ b/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp
@@ -129,14 +129,14 @@ Dbtux::execTUX_BOUND_INFO(Signal* signal)
// largest attrId seen plus one
Uint32 maxAttrId = 0;
// skip 5 words
- if (req->boundAiLength < 5) {
+ unsigned offset = 0;
+ if (req->boundAiLength < offset) {
jam();
scan.m_state = ScanOp::Invalid;
sig->errorCode = TuxBoundInfo::InvalidAttrInfo;
return;
}
const Uint32* const data = (Uint32*)sig + TuxBoundInfo::SignalLength;
- unsigned offset = 5;
// walk through entries
while (offset + 2 <= req->boundAiLength) {
jam();
diff --git a/ndb/src/kernel/blocks/suma/Suma.cpp b/ndb/src/kernel/blocks/suma/Suma.cpp
index 052809cb084..8e651343ab7 100644
--- a/ndb/src/kernel/blocks/suma/Suma.cpp
+++ b/ndb/src/kernel/blocks/suma/Suma.cpp
@@ -1891,7 +1891,7 @@ SumaParticipant::SyncRecord::nextScan(Signal* signal){
ScanFragReq::setHoldLockFlag(req->requestInfo, 0);
ScanFragReq::setKeyinfoFlag(req->requestInfo, 0);
ScanFragReq::setAttrLen(req->requestInfo, attrLen);
- req->fragmentNo = fd.m_fragDesc.m_fragmentNo;
+ req->fragmentNoKeyLen = fd.m_fragDesc.m_fragmentNo;
req->schemaVersion = tabPtr.p->m_schemaVersion;
req->transId1 = 0;
req->transId2 = (SUMA << 20) + (suma.getOwnNodeId() << 8);
diff --git a/ndb/src/ndbapi/NdbConnection.cpp b/ndb/src/ndbapi/NdbConnection.cpp
index 8ab0d13c67f..5e5d982444c 100644
--- a/ndb/src/ndbapi/NdbConnection.cpp
+++ b/ndb/src/ndbapi/NdbConnection.cpp
@@ -1115,15 +1115,8 @@ NdbConnection::getNdbScanOperation(const NdbTableImpl * tab)
if (tOp == NULL)
goto getNdbOp_error1;
- // Link scan operation into list of cursor operations
- if (m_theLastScanOperation == NULL)
- m_theFirstScanOperation = m_theLastScanOperation = tOp;
- else {
- m_theLastScanOperation->next(tOp);
- m_theLastScanOperation = tOp;
- }
- tOp->next(NULL);
if (tOp->init(tab, this) != -1) {
+ define_scan_op(tOp);
return tOp;
} else {
theNdb->releaseScanOperation(tOp);
@@ -1135,6 +1128,31 @@ getNdbOp_error1:
return NULL;
}//NdbConnection::getNdbScanOperation()
+void
+NdbConnection::remove_list(NdbOperation*& list, NdbOperation* op){
+ NdbOperation* tmp= list;
+ if(tmp == op)
+ list = op->next();
+ else {
+ while(tmp && tmp->next() != op) tmp = tmp->next();
+ if(tmp)
+ tmp->next(op->next());
+ }
+ op->next(NULL);
+}
+
+void
+NdbConnection::define_scan_op(NdbIndexScanOperation * tOp){
+ // Link scan operation into list of cursor operations
+ if (m_theLastScanOperation == NULL)
+ m_theFirstScanOperation = m_theLastScanOperation = tOp;
+ else {
+ m_theLastScanOperation->next(tOp);
+ m_theLastScanOperation = tOp;
+ }
+ tOp->next(NULL);
+}
+
NdbScanOperation*
NdbConnection::getNdbScanOperation(const NdbDictionary::Table * table)
{
diff --git a/ndb/src/ndbapi/NdbOperation.cpp b/ndb/src/ndbapi/NdbOperation.cpp
index 53a94d98a5a..b0b95d0ff43 100644
--- a/ndb/src/ndbapi/NdbOperation.cpp
+++ b/ndb/src/ndbapi/NdbOperation.cpp
@@ -78,7 +78,6 @@ NdbOperation::NdbOperation(Ndb* aNdb) :
m_tcReqGSN(GSN_TCKEYREQ),
m_keyInfoGSN(GSN_KEYINFO),
m_attrInfoGSN(GSN_ATTRINFO),
- theBoundATTRINFO(NULL),
theBlobList(NULL)
{
theReceiver.init(NdbReceiver::NDB_OPERATION, this);
@@ -167,7 +166,6 @@ NdbOperation::init(const NdbTableImpl* tab, NdbConnection* myConnection){
theScanInfo = 0;
theTotalNrOfKeyWordInSignal = 8;
theMagicNumber = 0xABCDEF01;
- theBoundATTRINFO = NULL;
theBlobList = NULL;
tSignal = theNdb->getSignal();
@@ -263,14 +261,6 @@ NdbOperation::release()
tSubroutine = tSubroutine->theNext;
theNdb->releaseNdbSubroutine(tSaveSubroutine);
}
- tSignal = theBoundATTRINFO;
- while (tSignal != NULL)
- {
- tSaveSignal = tSignal;
- tSignal = tSignal->next();
- theNdb->releaseSignal(tSaveSignal);
- }
- theBoundATTRINFO = NULL;
}
tBlob = theBlobList;
while (tBlob != NULL)
diff --git a/ndb/src/ndbapi/NdbOperationDefine.cpp b/ndb/src/ndbapi/NdbOperationDefine.cpp
index 1cbfedd21b1..4809ba0fe01 100644
--- a/ndb/src/ndbapi/NdbOperationDefine.cpp
+++ b/ndb/src/ndbapi/NdbOperationDefine.cpp
@@ -55,6 +55,7 @@ NdbOperation::insertTuple()
theOperationType = InsertRequest;
tNdbCon->theSimpleState = 0;
theErrorLine = tErrorLine++;
+ theLockMode = LM_Exclusive;
return 0;
} else {
setErrorCode(4200);
@@ -74,6 +75,7 @@ NdbOperation::updateTuple()
tNdbCon->theSimpleState = 0;
theOperationType = UpdateRequest;
theErrorLine = tErrorLine++;
+ theLockMode = LM_Exclusive;
return 0;
} else {
setErrorCode(4200);
@@ -93,6 +95,7 @@ NdbOperation::writeTuple()
tNdbCon->theSimpleState = 0;
theOperationType = WriteRequest;
theErrorLine = tErrorLine++;
+ theLockMode = LM_Exclusive;
return 0;
} else {
setErrorCode(4200);
@@ -115,6 +118,8 @@ NdbOperation::readTuple(NdbOperation::LockMode lm)
case LM_CommittedRead:
return readTuple();
break;
+ default:
+ return -1;
};
}
/******************************************************************************
@@ -130,6 +135,7 @@ NdbOperation::readTuple()
tNdbCon->theSimpleState = 0;
theOperationType = ReadRequest;
theErrorLine = tErrorLine++;
+ theLockMode = LM_Read;
return 0;
} else {
setErrorCode(4200);
@@ -150,6 +156,7 @@ NdbOperation::deleteTuple()
tNdbCon->theSimpleState = 0;
theOperationType = DeleteRequest;
theErrorLine = tErrorLine++;
+ theLockMode = LM_Exclusive;
return 0;
} else {
setErrorCode(4200);
@@ -170,6 +177,7 @@ NdbOperation::readTupleExclusive()
tNdbCon->theSimpleState = 0;
theOperationType = ReadExclusive;
theErrorLine = tErrorLine++;
+ theLockMode = LM_Exclusive;
return 0;
} else {
setErrorCode(4200);
@@ -189,6 +197,7 @@ NdbOperation::simpleRead()
theOperationType = ReadRequest;
theSimpleIndicator = 1;
theErrorLine = tErrorLine++;
+ theLockMode = LM_CommittedRead;
return 0;
} else {
setErrorCode(4200);
@@ -218,6 +227,7 @@ NdbOperation::committedRead()
theSimpleIndicator = 1;
theDirtyIndicator = 1;
theErrorLine = tErrorLine++;
+ theLockMode = LM_CommittedRead;
return 0;
} else {
setErrorCode(4200);
@@ -240,6 +250,7 @@ NdbOperation::dirtyUpdate()
theSimpleIndicator = 1;
theDirtyIndicator = 1;
theErrorLine = tErrorLine++;
+ theLockMode = LM_CommittedRead;
return 0;
} else {
setErrorCode(4200);
@@ -262,6 +273,7 @@ NdbOperation::dirtyWrite()
theSimpleIndicator = 1;
theDirtyIndicator = 1;
theErrorLine = tErrorLine++;
+ theLockMode = LM_CommittedRead;
return 0;
} else {
setErrorCode(4200);
@@ -282,7 +294,7 @@ NdbOperation::interpretedUpdateTuple()
tNdbCon->theSimpleState = 0;
theOperationType = UpdateRequest;
theAI_LenInCurrAI = 25;
-
+ theLockMode = LM_Exclusive;
theErrorLine = tErrorLine++;
initInterpreter();
return 0;
@@ -307,7 +319,7 @@ NdbOperation::interpretedDeleteTuple()
theErrorLine = tErrorLine++;
theAI_LenInCurrAI = 25;
-
+ theLockMode = LM_Exclusive;
initInterpreter();
return 0;
} else {
@@ -334,10 +346,6 @@ NdbOperation::getValue_impl(const NdbColumnImpl* tAttrInfo, char* aValue)
if ((tAttrInfo != NULL) &&
(!tAttrInfo->m_indexOnly) &&
(theStatus != Init)){
- if (theStatus == SetBound) {
- ((NdbIndexScanOperation*)this)->saveBoundATTRINFO();
- theStatus = GetValue;
- }
if (theStatus != GetValue) {
if (theInterpretIndicator == 1) {
if (theStatus == FinalGetValue) {
diff --git a/ndb/src/ndbapi/NdbOperationInt.cpp b/ndb/src/ndbapi/NdbOperationInt.cpp
index f5d334fd79a..ee7b8132cd1 100644
--- a/ndb/src/ndbapi/NdbOperationInt.cpp
+++ b/ndb/src/ndbapi/NdbOperationInt.cpp
@@ -216,10 +216,6 @@ int
NdbOperation::initial_interpreterCheck()
{
if ((theInterpretIndicator == 1)) {
- if (theStatus == SetBound) {
- ((NdbIndexScanOperation*)this)->saveBoundATTRINFO();
- theStatus = GetValue;
- }
if (theStatus == ExecInterpretedValue) {
return 0; // Simply continue with interpretation
} else if (theStatus == GetValue) {
diff --git a/ndb/src/ndbapi/NdbScanOperation.cpp b/ndb/src/ndbapi/NdbScanOperation.cpp
index 3ff2a32d418..8acc0695e4e 100644
--- a/ndb/src/ndbapi/NdbScanOperation.cpp
+++ b/ndb/src/ndbapi/NdbScanOperation.cpp
@@ -32,6 +32,7 @@
#include <signaldata/ScanTab.hpp>
#include <signaldata/KeyInfo.hpp>
+#include <signaldata/AttrInfo.hpp>
#include <signaldata/TcKeyReq.hpp>
NdbScanOperation::NdbScanOperation(Ndb* aNdb) :
@@ -116,10 +117,8 @@ NdbScanOperation::init(const NdbTableImpl* tab, NdbConnection* myConnection)
theStatus = GetValue;
theOperationType = OpenScanRequest;
-
- theTotalBoundAI_Len = 0;
- theBoundATTRINFO = NULL;
-
+ theNdbCon->theMagicNumber = 0xFE11DF;
+
return 0;
}
@@ -145,6 +144,7 @@ NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
}
theNdbCon->theScanningOp = this;
+ theLockMode = lm;
bool lockExcl, lockHoldMode, readCommitted;
switch(lm){
@@ -168,7 +168,7 @@ NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
return 0;
}
- m_keyInfo = lockExcl;
+ m_keyInfo = lockExcl ? 1 : 0;
bool range = false;
if (m_accessTable->m_indexType == NdbDictionary::Index::OrderedIndex ||
@@ -181,7 +181,7 @@ NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
}
assert (m_currentTable != m_accessTable);
// Modify operation state
- theStatus = SetBound;
+ theStatus = GetValue;
theOperationType = OpenRangeScanRequest;
range = true;
}
@@ -219,8 +219,17 @@ NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
req->transId1 = (Uint32) transId;
req->transId2 = (Uint32) (transId >> 32);
- getFirstATTRINFOScan();
+ NdbApiSignal* tSignal =
+ theFirstKEYINFO;
+ theFirstKEYINFO = (tSignal ? tSignal : tSignal = theNdb->getSignal());
+ theLastKEYINFO = tSignal;
+
+ tSignal->setSignal(GSN_KEYINFO);
+ theKEYINFOptr = ((KeyInfo*)tSignal->getDataPtrSend())->keyData;
+ theTotalNrOfKeyWordInSignal= 0;
+
+ getFirstATTRINFOScan();
return getResultSet();
}
@@ -256,18 +265,7 @@ NdbScanOperation::fix_receivers(Uint32 parallel){
m_allocated_receivers = parallel;
}
- for(Uint32 i = 0; i<parallel; i++){
- m_receivers[i]->m_list_index = i;
- m_prepared_receivers[i] = m_receivers[i]->getId();
- m_sent_receivers[i] = m_receivers[i];
- m_conf_receivers[i] = 0;
- m_api_receivers[i] = 0;
- }
-
- m_api_receivers_count = 0;
- m_current_api_receiver = 0;
- m_sent_receivers_count = parallel;
- m_conf_receivers_count = 0;
+ reset_receivers(parallel, 0);
return 0;
}
@@ -355,6 +353,7 @@ NdbScanOperation::getFirstATTRINFOScan()
* After setBound() are done, move the accumulated ATTRINFO signals to
* a separate list. Then continue with normal scan.
*/
+#if 0
int
NdbIndexScanOperation::saveBoundATTRINFO()
{
@@ -401,6 +400,7 @@ NdbIndexScanOperation::saveBoundATTRINFO()
}
return res;
}
+#endif
#define WAITFOR_SCAN_TIMEOUT 120000
@@ -409,14 +409,22 @@ NdbScanOperation::executeCursor(int nodeId){
NdbConnection * tCon = theNdbCon;
TransporterFacade* tp = TransporterFacade::instance();
Guard guard(tp->theMutexPtr);
+
+ Uint32 magic = tCon->theMagicNumber;
Uint32 seq = tCon->theNodeSequence;
+
if (tp->get_node_alive(nodeId) &&
(tp->getNodeSequence(nodeId) == seq)) {
-
- if(prepareSendScan(tCon->theTCConPtr, tCon->theTransactionId) == -1)
- return -1;
+ /**
+ * Only call prepareSendScan first time (incase of restarts)
+ * - check with theMagicNumber
+ */
tCon->theMagicNumber = 0x37412619;
+ if(magic != 0x37412619 &&
+ prepareSendScan(tCon->theTCConPtr, tCon->theTransactionId) == -1)
+ return -1;
+
if (doSendScan(nodeId) == -1)
return -1;
@@ -428,7 +436,6 @@ NdbScanOperation::executeCursor(int nodeId){
TRACE_DEBUG("The node is hard dead when attempting to start a scan");
setErrorCode(4029);
tCon->theReleaseOnClose = true;
- abort();
} else {
TRACE_DEBUG("The node is stopping when attempting to start a scan");
setErrorCode(4030);
@@ -635,7 +642,7 @@ NdbScanOperation::doSend(int ProcessorId)
void NdbScanOperation::closeScan()
{
- if(m_transConnection) do {
+ if(m_transConnection){
if(DEBUG_NEXT_RESULT)
ndbout_c("closeScan() theError.code = %d "
"m_api_receivers_count = %d "
@@ -648,55 +655,8 @@ void NdbScanOperation::closeScan()
TransporterFacade* tp = TransporterFacade::instance();
Guard guard(tp->theMutexPtr);
-
- Uint32 seq = theNdbCon->theNodeSequence;
- Uint32 nodeId = theNdbCon->theDBnode;
-
- if(seq != tp->getNodeSequence(nodeId)){
- theNdbCon->theReleaseOnClose = true;
- break;
- }
-
- while(theError.code == 0 && m_sent_receivers_count){
- theNdb->theWaiter.m_node = nodeId;
- theNdb->theWaiter.m_state = WAIT_SCAN;
- int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
- switch(return_code){
- case 0:
- break;
- case -1:
- setErrorCode(4008);
- case -2:
- m_api_receivers_count = 0;
- m_conf_receivers_count = 0;
- m_sent_receivers_count = 0;
- theNdbCon->theReleaseOnClose = true;
- }
- }
-
- if(m_api_receivers_count+m_conf_receivers_count){
- // Send close scan
- send_next_scan(0, true); // Close scan
- }
+ close_impl(tp);
- /**
- * wait for close scan conf
- */
- while(m_sent_receivers_count+m_api_receivers_count+m_conf_receivers_count){
- theNdb->theWaiter.m_node = nodeId;
- theNdb->theWaiter.m_state = WAIT_SCAN;
- int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
- switch(return_code){
- case 0:
- break;
- case -1:
- setErrorCode(4008);
- case -2:
- m_api_receivers_count = 0;
- m_conf_receivers_count = 0;
- m_sent_receivers_count = 0;
- }
- }
} while(0);
theNdbCon->theScanningOp = 0;
@@ -750,11 +710,6 @@ int NdbScanOperation::prepareSendScan(Uint32 aTC_ConnectPtr,
return -1;
}
- if (theStatus == SetBound) {
- ((NdbIndexScanOperation*)this)->saveBoundATTRINFO();
- theStatus = GetValue;
- }
-
theErrorLine = 0;
// In preapareSendInterpreted we set the sizes (word 4-8) in the
@@ -766,26 +721,7 @@ int NdbScanOperation::prepareSendScan(Uint32 aTC_ConnectPtr,
((NdbIndexScanOperation*)this)->fix_get_values();
}
- const Uint32 transId1 = (Uint32) (aTransactionId & 0xFFFFFFFF);
- const Uint32 transId2 = (Uint32) (aTransactionId >> 32);
-
- if (theOperationType == OpenRangeScanRequest) {
- NdbApiSignal* tSignal = theBoundATTRINFO;
- do{
- tSignal->setData(aTC_ConnectPtr, 1);
- tSignal->setData(transId1, 2);
- tSignal->setData(transId2, 3);
- tSignal = tSignal->next();
- } while (tSignal != NULL);
- }
theCurrentATTRINFO->setLength(theAI_LenInCurrAI);
- NdbApiSignal* tSignal = theFirstATTRINFO;
- do{
- tSignal->setData(aTC_ConnectPtr, 1);
- tSignal->setData(transId1, 2);
- tSignal->setData(transId2, 3);
- tSignal = tSignal->next();
- } while (tSignal != NULL);
/**
* Prepare all receivers
@@ -808,20 +744,28 @@ int NdbScanOperation::prepareSendScan(Uint32 aTC_ConnectPtr,
req->batch_byte_size= batch_byte_size;
req->first_batch_size= first_batch_size;
+ /**
+ * Set keyinfo flag
+ * (Always keyinfo when using blobs)
+ */
+ Uint32 reqInfo = req->requestInfo;
+ ScanTabReq::setKeyinfoFlag(reqInfo, keyInfo);
+ req->requestInfo = reqInfo;
+
for(Uint32 i = 0; i<theParallelism; i++){
m_receivers[i]->do_get_value(&theReceiver, batch_size, key_size);
}
return 0;
}
-/******************************************************************************
+/*****************************************************************************
int doSend()
Return Value: Return >0 : send was succesful, returns number of signals sent
Return -1: In all other case.
Parameters: aProcessorId: Receiving processor node
Remark: Sends the ATTRINFO signal(s)
-******************************************************************************/
+*****************************************************************************/
int
NdbScanOperation::doSendScan(int aProcessorId)
{
@@ -841,13 +785,18 @@ NdbScanOperation::doSendScan(int aProcessorId)
setErrorCode(4001);
return -1;
}
+
+ Uint32 tupKeyLen = theTupKeyLen;
+ Uint32 len = theTotalNrOfKeyWordInSignal;
+ Uint32 aTC_ConnectPtr = theNdbCon->theTCConPtr;
+ Uint64 transId = theNdbCon->theTransactionId;
+
// Update the "attribute info length in words" in SCAN_TABREQ before
// sending it. This could not be done in openScan because
// we created the ATTRINFO signals after the SCAN_TABREQ signal.
ScanTabReq * const req = CAST_PTR(ScanTabReq, tSignal->getDataPtrSend());
- req->attrLen = theTotalCurrAI_Len;
- if (theOperationType == OpenRangeScanRequest)
- req->attrLen += theTotalBoundAI_Len;
+ req->attrLenKeyLen = (tupKeyLen << 16) | theTotalCurrAI_Len;
+
TransporterFacade *tp = TransporterFacade::instance();
LinearSectionPtr ptr[3];
ptr[0].p = m_prepared_receivers;
@@ -856,22 +805,41 @@ NdbScanOperation::doSendScan(int aProcessorId)
setErrorCode(4002);
return -1;
}
- if (theOperationType == OpenRangeScanRequest) {
+
+ if (tupKeyLen > 0){
// must have at least one signal since it contains attrLen for bounds
- assert(theBoundATTRINFO != NULL);
- tSignal = theBoundATTRINFO;
- while (tSignal != NULL) {
+ assert(theLastKEYINFO != NULL);
+ tSignal = theLastKEYINFO;
+ tSignal->setLength(KeyInfo::HeaderLength + theTotalNrOfKeyWordInSignal);
+
+ assert(theFirstKEYINFO != NULL);
+ tSignal = theFirstKEYINFO;
+
+ NdbApiSignal* last;
+ do {
+ KeyInfo * keyInfo = CAST_PTR(KeyInfo, tSignal->getDataPtrSend());
+ keyInfo->connectPtr = aTC_ConnectPtr;
+ keyInfo->transId[0] = Uint32(transId);
+ keyInfo->transId[1] = Uint32(transId >> 32);
+
if (tp->sendSignal(tSignal,aProcessorId) == -1){
- setErrorCode(4002);
- return -1;
+ setErrorCode(4002);
+ return -1;
}
+
tSignalCount++;
+ last = tSignal;
tSignal = tSignal->next();
- }
+ } while(last != theLastKEYINFO);
}
tSignal = theFirstATTRINFO;
while (tSignal != NULL) {
+ AttrInfo * attrInfo = CAST_PTR(AttrInfo, tSignal->getDataPtrSend());
+ attrInfo->connectPtr = aTC_ConnectPtr;
+ attrInfo->transId[0] = Uint32(transId);
+ attrInfo->transId[1] = Uint32(transId >> 32);
+
if (tp->sendSignal(tSignal,aProcessorId) == -1){
setErrorCode(4002);
return -1;
@@ -883,7 +851,7 @@ NdbScanOperation::doSendScan(int aProcessorId)
return tSignalCount;
}//NdbOperation::doSendScan()
-/******************************************************************************
+/*****************************************************************************
* NdbOperation* takeOverScanOp(NdbConnection* updateTrans);
*
* Parameters: The update transactions NdbConnection pointer.
@@ -902,7 +870,7 @@ NdbScanOperation::doSendScan(int aProcessorId)
* This means that the updating transactions can be placed
* in separate threads and thus increasing the parallelism during
* the scan process.
- *****************************************************************************/
+ ****************************************************************************/
int
NdbScanOperation::getKeyFromKEYINFO20(Uint32* data, unsigned size)
{
@@ -1011,6 +979,7 @@ NdbScanOperation::takeOverScanOp(OperationType opType, NdbConnection* pTrans){
NdbBlob*
NdbScanOperation::getBlobHandle(const char* anAttrName)
{
+ m_keyInfo = 1;
return NdbOperation::getBlobHandle(m_transConnection,
m_currentTable->getColumn(anAttrName));
}
@@ -1018,6 +987,7 @@ NdbScanOperation::getBlobHandle(const char* anAttrName)
NdbBlob*
NdbScanOperation::getBlobHandle(Uint32 anAttrId)
{
+ m_keyInfo = 1;
return NdbOperation::getBlobHandle(m_transConnection,
m_currentTable->getColumn(anAttrId));
}
@@ -1031,13 +1001,15 @@ NdbIndexScanOperation::~NdbIndexScanOperation(){
}
int
-NdbIndexScanOperation::setBound(const char* anAttrName, int type, const void* aValue, Uint32 len)
+NdbIndexScanOperation::setBound(const char* anAttrName, int type,
+ const void* aValue, Uint32 len)
{
return setBound(m_accessTable->getColumn(anAttrName), type, aValue, len);
}
int
-NdbIndexScanOperation::setBound(Uint32 anAttrId, int type, const void* aValue, Uint32 len)
+NdbIndexScanOperation::setBound(Uint32 anAttrId, int type,
+ const void* aValue, Uint32 len)
{
return setBound(m_accessTable->getColumn(anAttrId), type, aValue, len);
}
@@ -1056,11 +1028,6 @@ NdbIndexScanOperation::getValue_impl(const NdbColumnImpl* attrInfo,
return NdbScanOperation::getValue_impl(attrInfo, aValue);
}
- if (theStatus == SetBound) {
- saveBoundATTRINFO();
- theStatus = GetValue;
- }
-
int id = attrInfo->m_attrId; // In "real" table
assert(m_accessTable->m_index);
int sz = (int)m_accessTable->m_index->m_key_ids.size();
@@ -1101,12 +1068,13 @@ NdbIndexScanOperation::setBound(const NdbColumnImpl* tAttrInfo,
int type, const void* aValue, Uint32 len)
{
if (theOperationType == OpenRangeScanRequest &&
- theStatus == SetBound &&
(0 <= type && type <= 4) &&
len <= 8000) {
// insert bound type
- insertATTRINFO(type);
+ Uint32 currLen = theTotalNrOfKeyWordInSignal;
+ Uint32 remaining = KeyInfo::DataLength - currLen;
Uint32 sizeInBytes = tAttrInfo->m_attrSize * tAttrInfo->m_arraySize;
+
// normalize char bound
CHARSET_INFO* cs = tAttrInfo->m_cs;
Uint32 xfrmData[2000];
@@ -1130,19 +1098,34 @@ NdbIndexScanOperation::setBound(const NdbColumnImpl* tAttrInfo,
Uint32 tIndexAttrId = tAttrInfo->m_attrId;
Uint32 sizeInWords = (len + 3) / 4;
AttributeHeader ah(tIndexAttrId, sizeInWords);
- insertATTRINFO(ah.m_value);
- if (len != 0) {
- // insert attribute data
- if ((UintPtr(aValue) & 0x3) == 0 && (len & 0x3) == 0)
- insertATTRINFOloop((const Uint32*)aValue, sizeInWords);
- else {
- Uint32 tempData[2000];
- memcpy(tempData, aValue, len);
+ const Uint32 ahValue = ah.m_value;
+
+ const bool aligned = (UintPtr(aValue) & 3) == 0;
+ const bool nobytes = (len & 0x3) == 0;
+ const Uint32 totalLen = 2 + sizeInWords;
+ Uint32 tupKeyLen = theTupKeyLen;
+ if(remaining > totalLen && aligned && nobytes){
+ Uint32 * dst = theKEYINFOptr + currLen;
+ * dst ++ = type;
+ * dst ++ = ahValue;
+ memcpy(dst, aValue, 4 * sizeInWords);
+ theTotalNrOfKeyWordInSignal = currLen + totalLen;
+ } else {
+ if(!aligned || !nobytes){
+ Uint32 tempData[2002];
+ tempData[0] = type;
+ tempData[1] = ahValue;
+ memcpy(tempData+2, aValue, len);
while ((len & 0x3) != 0)
- ((char*)tempData)[len++] = 0;
- insertATTRINFOloop(tempData, sizeInWords);
+ ((char*)&tempData[2])[len++] = 0;
+ insertBOUNDS(tempData, 2+sizeInWords);
+ } else {
+ Uint32 buf[2] = { type, ahValue };
+ insertBOUNDS(buf, 2);
+ insertBOUNDS((Uint32*)aValue, sizeInWords);
}
}
+ theTupKeyLen = tupKeyLen + totalLen;
/**
* Do sorted stuff
@@ -1165,6 +1148,46 @@ NdbIndexScanOperation::setBound(const NdbColumnImpl* tAttrInfo,
}
}
+int
+NdbIndexScanOperation::insertBOUNDS(Uint32 * data, Uint32 sz){
+ Uint32 len;
+ Uint32 remaining = KeyInfo::DataLength - theTotalNrOfKeyWordInSignal;
+ Uint32 * dst = theKEYINFOptr + theTotalNrOfKeyWordInSignal;
+ do {
+ len = (sz < remaining ? sz : remaining);
+ memcpy(dst, data, 4 * len);
+
+ if(sz >= remaining){
+ NdbApiSignal* tCurr = theLastKEYINFO;
+ tCurr->setLength(KeyInfo::MaxSignalLength);
+ NdbApiSignal* tSignal = tCurr->next();
+ if(tSignal)
+ ;
+ else if((tSignal = theNdb->getSignal()) != 0)
+ {
+ tCurr->next(tSignal);
+ tSignal->setSignal(GSN_KEYINFO);
+ } else {
+ goto error;
+ }
+ theLastKEYINFO = tSignal;
+ theKEYINFOptr = dst = ((KeyInfo*)tSignal->getDataPtrSend())->keyData;
+ remaining = KeyInfo::DataLength;
+ sz -= len;
+ data += len;
+ } else {
+ len = (KeyInfo::DataLength - remaining) + len;
+ break;
+ }
+ } while(sz >= 0);
+ theTotalNrOfKeyWordInSignal = len;
+ return 0;
+
+error:
+ setErrorCodeAbort(4228); // XXX wrong code
+ return -1;
+}
+
NdbResultSet*
NdbIndexScanOperation::readTuples(LockMode lm,
Uint32 batch,
@@ -1173,9 +1196,23 @@ NdbIndexScanOperation::readTuples(LockMode lm,
NdbResultSet * rs = NdbScanOperation::readTuples(lm, batch, 0);
if(rs && order_by){
m_ordered = 1;
- m_sort_columns = m_accessTable->getNoOfColumns() - 1; // -1 for NDB$NODE
+ Uint32 cnt = m_accessTable->getNoOfColumns() - 1;
+ m_sort_columns = cnt; // -1 for NDB$NODE
m_current_api_receiver = m_sent_receivers_count;
m_api_receivers_count = m_sent_receivers_count;
+
+ m_sort_columns = cnt;
+ for(Uint32 i = 0; i<cnt; i++){
+ const NdbColumnImpl* key = m_accessTable->m_index->m_columns[i];
+ const NdbColumnImpl* col = m_currentTable->getColumn(key->m_keyInfoPos);
+ NdbRecAttr* tmp = NdbScanOperation::getValue_impl(col, (char*)-1);
+ UintPtr newVal = UintPtr(tmp);
+ theTupleKeyDefined[i][0] = FAKE_PTR;
+ theTupleKeyDefined[i][1] = (newVal & 0xFFFFFFFF);
+#if (SIZEOF_CHARP == 8)
+ theTupleKeyDefined[i][2] = (newVal >> 32);
+#endif
+ }
}
return rs;
}
@@ -1396,10 +1433,7 @@ NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx){
}
int
-NdbScanOperation::restart(){
- TransporterFacade* tp = TransporterFacade::instance();
- Guard guard(tp->theMutexPtr);
-
+NdbScanOperation::close_impl(TransporterFacade* tp){
Uint32 seq = theNdbCon->theNodeSequence;
Uint32 nodeId = theNdbCon->theDBnode;
@@ -1407,8 +1441,8 @@ NdbScanOperation::restart(){
theNdbCon->theReleaseOnClose = true;
return -1;
}
-
- while(m_sent_receivers_count){
+
+ while(theError.code == 0 && m_sent_receivers_count){
theNdb->theWaiter.m_node = nodeId;
theNdb->theWaiter.m_state = WAIT_SCAN;
int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
@@ -1421,14 +1455,17 @@ NdbScanOperation::restart(){
m_api_receivers_count = 0;
m_conf_receivers_count = 0;
m_sent_receivers_count = 0;
+ theNdbCon->theReleaseOnClose = true;
return -1;
}
}
if(m_api_receivers_count+m_conf_receivers_count){
// Send close scan
- if(send_next_scan(0, true) == -1) // Close scan
+ if(send_next_scan(0, true) == -1){ // Close scan
+ theNdbCon->theReleaseOnClose = true;
return -1;
+ }
}
/**
@@ -1447,15 +1484,15 @@ NdbScanOperation::restart(){
m_api_receivers_count = 0;
m_conf_receivers_count = 0;
m_sent_receivers_count = 0;
+ theNdbCon->theReleaseOnClose = true;
return -1;
}
}
+ return 0;
+}
- /**
- * Reset receivers
- */
- const Uint32 parallell = theParallelism;
-
+void
+NdbScanOperation::reset_receivers(Uint32 parallell, Uint32 ordered){
for(Uint32 i = 0; i<parallell; i++){
m_receivers[i]->m_list_index = i;
m_prepared_receivers[i] = m_receivers[i]->getId();
@@ -1470,13 +1507,64 @@ NdbScanOperation::restart(){
m_sent_receivers_count = parallell;
m_conf_receivers_count = 0;
- if(m_ordered){
+ if(ordered){
m_current_api_receiver = parallell;
m_api_receivers_count = parallell;
}
+}
+
+int
+NdbScanOperation::restart()
+{
+
+ TransporterFacade* tp = TransporterFacade::instance();
+ Guard guard(tp->theMutexPtr);
+ Uint32 nodeId = theNdbCon->theDBnode;
+
+ {
+ int res;
+ if((res= close_impl(tp)))
+ {
+ return res;
+ }
+ }
+
+ /**
+ * Reset receivers
+ */
+ reset_receivers(theParallelism, m_ordered);
+ theError.code = 0;
if (doSendScan(nodeId) == -1)
return -1;
return 0;
}
+
+int
+NdbIndexScanOperation::reset_bounds(){
+ int res;
+
+ {
+ TransporterFacade* tp = TransporterFacade::instance();
+ Guard guard(tp->theMutexPtr);
+ res= close_impl(tp);
+ }
+
+ if(!res)
+ {
+ theError.code = 0;
+ reset_receivers(theParallelism, m_ordered);
+
+ theLastKEYINFO = theFirstKEYINFO;
+ theKEYINFOptr = ((KeyInfo*)theFirstKEYINFO->getDataPtrSend())->keyData;
+ theTupKeyLen = 0;
+ theTotalNrOfKeyWordInSignal = 0;
+ m_transConnection
+ ->remove_list((NdbOperation*&)m_transConnection->m_firstExecutedScanOp,
+ this);
+ m_transConnection->define_scan_op(this);
+ return 0;
+ }
+ return res;
+}
diff --git a/ndb/src/ndbapi/Ndblist.cpp b/ndb/src/ndbapi/Ndblist.cpp
index af98aa09280..b6739b66dce 100644
--- a/ndb/src/ndbapi/Ndblist.cpp
+++ b/ndb/src/ndbapi/Ndblist.cpp
@@ -649,8 +649,8 @@ Remark: Always release the first item in the free list
void
Ndb::freeScanOperation()
{
- NdbScanOperation* tOp = theScanOpIdleList;
- theScanOpIdleList = (NdbIndexScanOperation *) theScanOpIdleList->next();
+ NdbIndexScanOperation* tOp = theScanOpIdleList;
+ theScanOpIdleList = (NdbIndexScanOperation *)tOp->next();
delete tOp;
}
diff --git a/ndb/test/include/NDBT_ResultRow.hpp b/ndb/test/include/NDBT_ResultRow.hpp
index aa54e892da3..6072d0ea510 100644
--- a/ndb/test/include/NDBT_ResultRow.hpp
+++ b/ndb/test/include/NDBT_ResultRow.hpp
@@ -24,8 +24,9 @@ public:
NDBT_ResultRow(const NdbDictionary::Table &tab, char attrib_delimiter='\t');
~NDBT_ResultRow();
NdbRecAttr * & attributeStore(int i);
- const NdbRecAttr * attributeStore(const char* name);
-
+ const NdbRecAttr * attributeStore(int i) const ;
+ const NdbRecAttr * attributeStore(const char* name) const ;
+
BaseString c_str();
NdbOut & header (NdbOut &) const;
diff --git a/ndb/test/include/UtilTransactions.hpp b/ndb/test/include/UtilTransactions.hpp
index 1298028d591..37cd99550a5 100644
--- a/ndb/test/include/UtilTransactions.hpp
+++ b/ndb/test/include/UtilTransactions.hpp
@@ -87,19 +87,30 @@ private:
int verifyUniqueIndex(Ndb*,
- const char* indexName,
+ const NdbDictionary::Index *,
int parallelism = 0,
bool transactional = false);
-
+
int scanAndCompareUniqueIndex(Ndb* pNdb,
- const char * indexName,
+ const NdbDictionary::Index *,
int parallelism,
bool transactional);
int readRowFromTableAndIndex(Ndb* pNdb,
NdbConnection* pTrans,
- const char * indexName,
+ const NdbDictionary::Index *,
NDBT_ResultRow& row );
+
+ int verifyOrderedIndex(Ndb*,
+ const NdbDictionary::Index *,
+ int parallelism = 0,
+ bool transactional = false);
+
+
+ int get_values(NdbOperation* op, NDBT_ResultRow& dst);
+ int equal(const NdbDictionary::Table*, NdbOperation*, const NDBT_ResultRow&);
+ int equal(const NdbDictionary::Index*, NdbOperation*, const NDBT_ResultRow&);
+
protected:
int m_defaultClearMethod;
const NdbDictionary::Table& tab;
diff --git a/ndb/test/ndbapi/testBlobs.cpp b/ndb/test/ndbapi/testBlobs.cpp
index e18f4a8bd1a..41bb82f3e06 100644
--- a/ndb/test/ndbapi/testBlobs.cpp
+++ b/ndb/test/ndbapi/testBlobs.cpp
@@ -1030,7 +1030,7 @@ readScan(int style, bool idx)
} else {
CHK((g_ops = g_con->getNdbIndexScanOperation(g_opt.m_x2name, g_opt.m_tname)) != 0);
}
- CHK((rs = g_ops->readTuples(NdbScanOperation::LM_Exclusive)) != 0);
+ CHK((rs = g_ops->readTuples(NdbScanOperation::LM_Read)) != 0);
CHK(g_ops->getValue("PK1", (char*)&tup.m_pk1) != 0);
if (g_opt.m_pk2len != 0)
CHK(g_ops->getValue("PK2", tup.m_pk2) != 0);
diff --git a/ndb/test/src/NDBT_ResultRow.cpp b/ndb/test/src/NDBT_ResultRow.cpp
index 7c419444760..f82963901b1 100644
--- a/ndb/test/src/NDBT_ResultRow.cpp
+++ b/ndb/test/src/NDBT_ResultRow.cpp
@@ -58,10 +58,14 @@ NDBT_ResultRow::attributeStore(int i){
return data[i];
}
+const NdbRecAttr*
+NDBT_ResultRow::attributeStore(int i) const {
+ return data[i];
+}
const
NdbRecAttr *
-NDBT_ResultRow::attributeStore(const char* name){
+NDBT_ResultRow::attributeStore(const char* name) const {
for(int i = 0; i<cols; i++){
if (strcmp(names[i], name) == 0)
return data[i];
diff --git a/ndb/test/src/UtilTransactions.cpp b/ndb/test/src/UtilTransactions.cpp
index 506356dd140..52341c0e0e6 100644
--- a/ndb/test/src/UtilTransactions.cpp
+++ b/ndb/test/src/UtilTransactions.cpp
@@ -854,11 +854,12 @@ UtilTransactions::verifyIndex(Ndb* pNdb,
ndbout << " Index " << indexName << " does not exist!" << endl;
return NDBT_FAILED;
}
-
+
switch (pIndex->getType()){
case NdbDictionary::Index::UniqueHashIndex:
+ return verifyUniqueIndex(pNdb, pIndex, parallelism, transactional);
case NdbDictionary::Index::OrderedIndex:
- return verifyUniqueIndex(pNdb, indexName, parallelism, transactional);
+ return verifyOrderedIndex(pNdb, pIndex, parallelism, transactional);
break;
default:
ndbout << "Unknown index type" << endl;
@@ -870,7 +871,7 @@ UtilTransactions::verifyIndex(Ndb* pNdb,
int
UtilTransactions::verifyUniqueIndex(Ndb* pNdb,
- const char* indexName,
+ const NdbDictionary::Index * pIndex,
int parallelism,
bool transactional){
@@ -882,7 +883,7 @@ UtilTransactions::verifyUniqueIndex(Ndb* pNdb,
*/
if (scanAndCompareUniqueIndex(pNdb,
- indexName,
+ pIndex,
parallelism,
transactional) != NDBT_OK){
return NDBT_FAILED;
@@ -896,7 +897,7 @@ UtilTransactions::verifyUniqueIndex(Ndb* pNdb,
int
UtilTransactions::scanAndCompareUniqueIndex(Ndb* pNdb,
- const char * indexName,
+ const NdbDictionary::Index* pIndex,
int parallelism,
bool transactional){
@@ -996,7 +997,7 @@ UtilTransactions::scanAndCompareUniqueIndex(Ndb* pNdb,
if (readRowFromTableAndIndex(pNdb,
pTrans,
- indexName,
+ pIndex,
row) != NDBT_OK){
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
@@ -1027,15 +1028,9 @@ UtilTransactions::scanAndCompareUniqueIndex(Ndb* pNdb,
int
UtilTransactions::readRowFromTableAndIndex(Ndb* pNdb,
NdbConnection* scanTrans,
- const char * indexName,
+ const NdbDictionary::Index* pIndex,
NDBT_ResultRow& row ){
- const NdbDictionary::Index* pIndex
- = pNdb->getDictionary()->getIndex(indexName, tab.getName());
- if (pIndex == 0){
- ndbout << " Index " << indexName << " does not exist!" << endl;
- return NDBT_FAILED;
- }
NdbDictionary::Index::Type indexType= pIndex->getType();
int retryAttempt = 0;
@@ -1050,7 +1045,7 @@ UtilTransactions::readRowFromTableAndIndex(Ndb* pNdb,
// Allocate place to store the result
NDBT_ResultRow tabRow(tab);
NDBT_ResultRow indexRow(tab);
-
+ const char * indexName = pIndex->getName();
while (true){
if(retryAttempt)
@@ -1281,3 +1276,239 @@ close_all:
return return_code;
}
+
+int
+UtilTransactions::verifyOrderedIndex(Ndb* pNdb,
+ const NdbDictionary::Index* pIndex,
+ int parallelism,
+ bool transactional){
+
+ int retryAttempt = 0;
+ const int retryMax = 100;
+ int check;
+ NdbConnection *pTrans;
+ NdbScanOperation *pOp;
+ NdbIndexScanOperation * iop = 0;
+ NdbResultSet* cursor= 0;
+
+ NDBT_ResultRow scanRow(tab);
+ NDBT_ResultRow pkRow(tab);
+ NDBT_ResultRow indexRow(tab);
+ const char * indexName = pIndex->getName();
+
+ int res;
+ parallelism = 1;
+
+ while (true){
+
+ if (retryAttempt >= retryMax){
+ g_info << "ERROR: has retried this operation " << retryAttempt
+ << " times, failing!" << endl;
+ return NDBT_FAILED;
+ }
+
+ pTrans = pNdb->startTransaction();
+ if (pTrans == NULL) {
+ const NdbError err = pNdb->getNdbError();
+
+ if (err.status == NdbError::TemporaryError){
+ ERR(err);
+ NdbSleep_MilliSleep(50);
+ retryAttempt++;
+ continue;
+ }
+ ERR(err);
+ return NDBT_FAILED;
+ }
+
+ pOp = pTrans->getNdbScanOperation(tab.getName());
+ if (pOp == NULL) {
+ ERR(pTrans->getNdbError());
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+
+ NdbResultSet*
+ rs = pOp->readTuples(NdbScanOperation::LM_Read, 0, parallelism);
+
+ if( rs == 0 ) {
+ ERR(pTrans->getNdbError());
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+
+ check = pOp->interpret_exit_ok();
+ if( check == -1 ) {
+ ERR(pTrans->getNdbError());
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+
+ if(get_values(pOp, scanRow))
+ {
+ abort();
+ }
+
+ check = pTrans->execute(NoCommit);
+ if( check == -1 ) {
+ const NdbError err = pTrans->getNdbError();
+
+ if (err.status == NdbError::TemporaryError){
+ ERR(err);
+ pNdb->closeTransaction(pTrans);
+ NdbSleep_MilliSleep(50);
+ retryAttempt++;
+ continue;
+ }
+ ERR(err);
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+
+ int eof;
+ int rows = 0;
+ while(check == 0 && (eof = rs->nextResult()) == 0){
+ rows++;
+
+ bool null_found= false;
+ for(int a = 0; a<(int)pIndex->getNoOfColumns(); a++){
+ const NdbDictionary::Column * col = pIndex->getColumn(a);
+ if (scanRow.attributeStore(col->getName())->isNULL())
+ {
+ null_found= true;
+ break;
+ }
+ }
+
+ // Do pk lookup
+ NdbOperation * pk = pTrans->getNdbOperation(tab.getName());
+ if(!pk || pk->readTuple())
+ goto error;
+ if(equal(&tab, pk, scanRow) || get_values(pk, pkRow))
+ goto error;
+
+ if(!null_found)
+ {
+ if(!iop && (iop= pTrans->getNdbIndexScanOperation(indexName,
+ tab.getName())))
+ {
+ cursor= iop->readTuples(NdbScanOperation::LM_CommittedRead,
+ parallelism);
+ iop->interpret_exit_ok();
+ if(!cursor || get_values(iop, indexRow))
+ goto error;
+ }
+ else if(!iop || iop->reset_bounds())
+ {
+ goto error;
+ }
+
+ if(equal(pIndex, iop, scanRow))
+ goto error;
+ }
+
+ check = pTrans->execute(NoCommit);
+ if(check)
+ goto error;
+
+ if(scanRow.c_str() != pkRow.c_str()){
+ g_err << "Error when comapring records" << endl;
+ g_err << " scanRow: \n" << scanRow.c_str().c_str() << endl;
+ g_err << " pkRow: \n" << pkRow.c_str().c_str() << endl;
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+
+ if(!null_found)
+ {
+
+ if((res= cursor->nextResult()) != 0){
+ g_err << "Failed to find row using index: " << res << endl;
+ ERR(pTrans->getNdbError());
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+
+ if(scanRow.c_str() != indexRow.c_str()){
+ g_err << "Error when comapring records" << endl;
+ g_err << " scanRow: \n" << scanRow.c_str().c_str() << endl;
+ g_err << " indexRow: \n" << indexRow.c_str().c_str() << endl;
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+
+ if(cursor->nextResult() == 0){
+ g_err << "Found extra row!!" << endl;
+ g_err << " indexRow: \n" << indexRow.c_str().c_str() << endl;
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+ }
+ }
+
+ if (eof == -1 || check == -1) {
+ error:
+ const NdbError err = pTrans->getNdbError();
+
+ if (err.status == NdbError::TemporaryError){
+ ERR(err);
+ iop = 0;
+ pNdb->closeTransaction(pTrans);
+ NdbSleep_MilliSleep(50);
+ retryAttempt++;
+ rows--;
+ continue;
+ }
+ ERR(err);
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+
+ pNdb->closeTransaction(pTrans);
+
+ return NDBT_OK;
+ }
+ return NDBT_FAILED;
+}
+
+int
+UtilTransactions::get_values(NdbOperation* op, NDBT_ResultRow& dst)
+{
+ for (int a = 0; a < tab.getNoOfColumns(); a++){
+ NdbRecAttr*& ref= dst.attributeStore(a);
+ if ((ref= op->getValue(a)) == 0)
+ {
+ return NDBT_FAILED;
+ }
+ }
+ return 0;
+}
+
+int
+UtilTransactions::equal(const NdbDictionary::Index* pIndex,
+ NdbOperation* op, const NDBT_ResultRow& src)
+{
+ for(Uint32 a = 0; a<pIndex->getNoOfColumns(); a++){
+ const NdbDictionary::Column * col = pIndex->getColumn(a);
+ if(op->equal(col->getName(),
+ src.attributeStore(col->getName())->aRef()) != 0){
+ return NDBT_FAILED;
+ }
+ }
+ return 0;
+}
+
+int
+UtilTransactions::equal(const NdbDictionary::Table* pTable,
+ NdbOperation* op, const NDBT_ResultRow& src)
+{
+ for(Uint32 a = 0; a<tab.getNoOfColumns(); a++){
+ const NdbDictionary::Column* attr = tab.getColumn(a);
+ if (attr->getPrimaryKey() == true){
+ if (op->equal(attr->getName(), src.attributeStore(a)->aRef()) != 0){
+ return NDBT_FAILED;
+ }
+ }
+ }
+ return 0;
+}
diff --git a/sql/ha_ndbcluster.cc b/sql/ha_ndbcluster.cc
index 8faa0b33756..6d643298a79 100644
--- a/sql/ha_ndbcluster.cc
+++ b/sql/ha_ndbcluster.cc
@@ -827,10 +827,7 @@ int ha_ndbcluster::get_ndb_lock_type(enum thr_lock_type type)
if (type == TL_WRITE_ALLOW_WRITE)
return NdbOperation::LM_Exclusive;
else if (uses_blob_value(retrieve_all_fields))
- /*
- TODO use a new scan mode to read + lock + keyinfo
- */
- return NdbOperation::LM_Exclusive;
+ return NdbOperation::LM_Read;
else
return NdbOperation::LM_CommittedRead;
}
@@ -1342,6 +1339,7 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key,
const key_range *end_key,
bool sorted, byte* buf)
{
+ bool restart;
NdbConnection *trans= m_active_trans;
NdbResultSet *cursor;
NdbIndexScanOperation *op;
@@ -1353,16 +1351,28 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key,
DBUG_EXECUTE("enter", print_key(start_key, "start_key"););
DBUG_EXECUTE("enter", print_key(end_key, "end_key"););
-
- NdbOperation::LockMode lm=
- (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
- if (!(op= trans->getNdbIndexScanOperation((NDBINDEX *)
- m_index[active_index].index,
- (const NDBTAB *) m_table)) ||
- !(cursor= op->readTuples(lm, 0, parallelism, sorted)))
- ERR_RETURN(trans->getNdbError());
- m_active_cursor= cursor;
-
+ if(m_active_cursor == 0)
+ {
+ restart= false;
+ NdbOperation::LockMode lm=
+ (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
+ if (!(op= trans->getNdbIndexScanOperation((NDBINDEX *)
+ m_index[active_index].index,
+ (const NDBTAB *) m_table)) ||
+ !(cursor= op->readTuples(lm, 0, parallelism, sorted)))
+ ERR_RETURN(trans->getNdbError());
+ m_active_cursor= cursor;
+ } else {
+ restart= true;
+ op= (NdbIndexScanOperation*)m_active_cursor->getOperation();
+
+ DBUG_ASSERT(op->getSorted() == sorted);
+ DBUG_ASSERT(op->getLockMode() ==
+ (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type));
+ if(op->reset_bounds())
+ DBUG_RETURN(ndb_err(m_active_trans));
+ }
+
if (start_key &&
set_bounds(op, start_key,
(start_key->flag == HA_READ_KEY_EXACT) ?
@@ -1384,10 +1394,19 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key,
NdbIndexScanOperation::BoundGT))
DBUG_RETURN(1);
}
- DBUG_RETURN(define_read_attrs(buf, op));
+ if(!restart)
+ {
+ DBUG_RETURN(define_read_attrs(buf, op));
+ }
+ else
+ {
+ if (execute_no_commit(this,trans) != 0)
+ DBUG_RETURN(ndb_err(trans));
+
+ DBUG_RETURN(next_result(buf));
+ }
}
-
/*
Start a filtered scan in NDB.
@@ -2263,9 +2282,6 @@ int ha_ndbcluster::read_range_first_to_buf(const key_range *start_key,
DBUG_ENTER("ha_ndbcluster::read_range_first_to_buf");
DBUG_PRINT("info", ("eq_range: %d, sorted: %d", eq_range, sorted));
- if (m_active_cursor)
- close_scan();
-
switch (get_index_type(active_index)){
case PRIMARY_KEY_ORDERED_INDEX:
case PRIMARY_KEY_INDEX:
@@ -2293,11 +2309,8 @@ int ha_ndbcluster::read_range_first_to_buf(const key_range *start_key,
break;
}
-
// Start the ordered index scan and fetch the first row
- error= ordered_index_scan(start_key, end_key, sorted,
- buf);
-
+ error= ordered_index_scan(start_key, end_key, sorted, buf);
DBUG_RETURN(error);
}