summaryrefslogtreecommitdiff
path: root/ndb
diff options
context:
space:
mode:
Diffstat (limited to 'ndb')
-rw-r--r--ndb/src/kernel/blocks/dblqh/Dblqh.hpp19
-rw-r--r--ndb/src/kernel/blocks/dblqh/DblqhMain.cpp54
-rw-r--r--ndb/src/kernel/blocks/dbtc/Dbtc.hpp11
-rw-r--r--ndb/src/kernel/blocks/dbtc/DbtcMain.cpp80
-rw-r--r--ndb/src/kernel/blocks/suma/Suma.cpp2
-rw-r--r--ndb/src/ndbapi/NdbConnection.cpp6
-rw-r--r--ndb/src/ndbapi/NdbConnectionScan.cpp24
-rw-r--r--ndb/src/ndbapi/NdbScanOperation.cpp135
-rw-r--r--ndb/test/include/HugoTransactions.hpp16
-rw-r--r--ndb/test/include/UtilTransactions.hpp4
-rw-r--r--ndb/test/ndbapi/testScan.cpp9
-rw-r--r--ndb/test/src/HugoTransactions.cpp166
-rw-r--r--ndb/test/src/UtilTransactions.cpp23
-rw-r--r--ndb/test/tools/create_index.cpp2
-rw-r--r--ndb/test/tools/hugoScanRead.cpp50
-rw-r--r--ndb/tools/select_all.cpp7
16 files changed, 437 insertions, 171 deletions
diff --git a/ndb/src/kernel/blocks/dblqh/Dblqh.hpp b/ndb/src/kernel/blocks/dblqh/Dblqh.hpp
index d6987f3e478..983fda9b7be 100644
--- a/ndb/src/kernel/blocks/dblqh/Dblqh.hpp
+++ b/ndb/src/kernel/blocks/dblqh/Dblqh.hpp
@@ -2925,4 +2925,23 @@ Dblqh::ScanRecord::check_scan_batch_completed() const
(max_bytes > 0 && (m_curr_batch_size_bytes >= max_bytes));
}
+inline
+void
+Dblqh::i_get_acc_ptr(ScanRecord* scanP, Uint32* &acc_ptr, Uint32 index)
+{
+ if (index == 0) {
+ acc_ptr= (Uint32*)&scanP->scan_acc_op_ptr[0];
+ } else {
+ Uint32 attr_buf_index, attr_buf_rec;
+
+ AttrbufPtr regAttrPtr;
+ jam();
+ attr_buf_rec= (index + 31) / 32;
+ attr_buf_index= (index - 1) & 31;
+ regAttrPtr.i= scanP->scan_acc_op_ptr[attr_buf_rec];
+ ptrCheckGuard(regAttrPtr, cattrinbufFileSize, attrbuf);
+ acc_ptr= (Uint32*)&regAttrPtr.p->attrbuf[attr_buf_index];
+ }
+}
+
#endif
diff --git a/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp b/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
index cd15ad0c3b2..e9e81d55488 100644
--- a/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
+++ b/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
@@ -7058,10 +7058,7 @@ void Dblqh::continueScanNextReqLab(Signal* signal)
// Update timer on tcConnectRecord
tcConnectptr.p->tcTimer = cLqhTimeOutCount;
-
init_acc_ptr_list(scanptr.p);
- scanptr.p->m_curr_batch_size_rows = 0;
- scanptr.p->m_curr_batch_size_bytes= 0;
scanptr.p->scanFlag = NextScanReq::ZSCAN_NEXT;
scanNextLoopLab(signal);
}//Dblqh::continueScanNextReqLab()
@@ -7260,22 +7257,32 @@ void Dblqh::scanLockReleasedLab(Signal* signal)
tcConnectptr.i = scanptr.p->scanTcrec;
ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec);
releaseActiveFrag(signal);
+
if (scanptr.p->scanReleaseCounter == scanptr.p->m_curr_batch_size_rows) {
if ((scanptr.p->scanErrorCounter > 0) ||
(scanptr.p->scanCompletedStatus == ZTRUE)) {
jam();
+ scanptr.p->m_curr_batch_size_rows = 0;
+ scanptr.p->m_curr_batch_size_bytes = 0;
closeScanLab(signal);
} else if (scanptr.p->check_scan_batch_completed() &&
scanptr.p->scanLockHold != ZTRUE) {
jam();
scanptr.p->scanState = ScanRecord::WAIT_SCAN_NEXTREQ;
sendScanFragConf(signal, ZFALSE);
+ } else if (scanptr.p->m_last_row && !scanptr.p->scanLockHold) {
+ jam();
+ closeScanLab(signal);
+ return;
} else {
jam();
/*
- We came here after releasing locks after receiving SCAN_NEXTREQ from TC. We only
- come here when scanHoldLock == ZTRUE
- */
+ * We came here after releasing locks after
+ * receiving SCAN_NEXTREQ from TC. We only come here
+ * when scanHoldLock == ZTRUE
+ */
+ scanptr.p->m_curr_batch_size_rows = 0;
+ scanptr.p->m_curr_batch_size_bytes = 0;
continueScanNextReqLab(signal);
}//if
} else if (scanptr.p->scanReleaseCounter < scanptr.p->m_curr_batch_size_rows) {
@@ -7362,25 +7369,6 @@ Dblqh::init_acc_ptr_list(ScanRecord* scanP)
scanP->scan_acc_index = 0;
}
-inline
-void
-Dblqh::i_get_acc_ptr(ScanRecord* scanP, Uint32* &acc_ptr, Uint32 index)
-{
- if (index == 0) {
- acc_ptr= (Uint32*)&scanP->scan_acc_op_ptr[0];
- } else {
- Uint32 attr_buf_index, attr_buf_rec;
-
- AttrbufPtr regAttrPtr;
- jam();
- attr_buf_rec= (index + 31) / 32;
- attr_buf_index= (index - 1) & 31;
- regAttrPtr.i= scanP->scan_acc_op_ptr[attr_buf_rec];
- ptrCheckGuard(regAttrPtr, cattrinbufFileSize, attrbuf);
- acc_ptr= (Uint32*)&regAttrPtr.p->attrbuf[attr_buf_index];
- }
-}
-
Uint32
Dblqh::get_acc_ptr_from_scan_record(ScanRecord* scanP,
Uint32 index,
@@ -7904,6 +7892,13 @@ void Dblqh::nextScanConfScanLab(Signal* signal)
/*************************************************************
* STOP THE SCAN PROCESS IF THIS HAS BEEN REQUESTED.
************************************************************ */
+ if (!scanptr.p->scanLockHold)
+ {
+ jam();
+ closeScanLab(signal);
+ return;
+ }
+
if (scanptr.p->scanCompletedStatus == ZTRUE) {
if ((scanptr.p->scanLockHold == ZTRUE) &&
(scanptr.p->m_curr_batch_size_rows > 0)) {
@@ -8404,8 +8399,6 @@ void Dblqh::tupScanCloseConfLab(Signal* signal)
ScanFragRef::SignalLength, JBB);
} else {
jam();
- scanptr.p->m_curr_batch_size_rows = 0;
- scanptr.p->m_curr_batch_size_bytes= 0;
sendScanFragConf(signal, ZSCAN_FRAG_CLOSED);
}//if
finishScanrec(signal);
@@ -8809,6 +8802,13 @@ void Dblqh::sendScanFragConf(Signal* signal, Uint32 scanCompleted)
conf->total_len= total_len;
sendSignal(tcConnectptr.p->clientBlockref, GSN_SCAN_FRAGCONF,
signal, ScanFragConf::SignalLength, JBB);
+
+ if(!scanptr.p->scanLockHold)
+ {
+ jam();
+ scanptr.p->m_curr_batch_size_rows = 0;
+ scanptr.p->m_curr_batch_size_bytes= 0;
+ }
}//Dblqh::sendScanFragConf()
/* ######################################################################### */
diff --git a/ndb/src/kernel/blocks/dbtc/Dbtc.hpp b/ndb/src/kernel/blocks/dbtc/Dbtc.hpp
index a209df24c44..fb90ccc8c90 100644
--- a/ndb/src/kernel/blocks/dbtc/Dbtc.hpp
+++ b/ndb/src/kernel/blocks/dbtc/Dbtc.hpp
@@ -1054,9 +1054,8 @@ public:
// Id of the ScanRecord this fragment scan belongs to
Uint32 scanRec;
- // The maximum number of operations that can be scanned before
- // returning to TC
- Uint16 scanFragConcurrency;
+ // The value of fragmentCompleted in the last received SCAN_FRAGCONF
+ Uint8 m_scan_frag_conf_status;
inline void startFragTimer(Uint32 timeVal){
scanFragTimer = timeVal;
@@ -1193,8 +1192,10 @@ public:
// Number of operation records per scanned fragment
// Number of operations in first batch
// Max number of bytes per batch
- Uint16 noOprecPerFrag;
- Uint16 first_batch_size;
+ union {
+ Uint16 first_batch_size_rows;
+ Uint16 batch_size_rows;
+ };
Uint32 batch_byte_size;
Uint32 scanRequestInfo; // ScanFrag format
diff --git a/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp b/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp
index d8b3ee10532..c7f467484bd 100644
--- a/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp
+++ b/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp
@@ -8646,9 +8646,9 @@ void Dbtc::initScanrec(ScanRecordPtr scanptr,
scanptr.p->scanTableref = tabptr.i;
scanptr.p->scanSchemaVersion = scanTabReq->tableSchemaVersion;
scanptr.p->scanParallel = scanParallel;
- scanptr.p->noOprecPerFrag = noOprecPerFrag;
- scanptr.p->first_batch_size= scanTabReq->first_batch_size;
- scanptr.p->batch_byte_size= scanTabReq->batch_byte_size;
+ scanptr.p->first_batch_size_rows = scanTabReq->first_batch_size;
+ scanptr.p->batch_byte_size = scanTabReq->batch_byte_size;
+ scanptr.p->batch_size_rows = noOprecPerFrag;
Uint32 tmp = 0;
const UintR ri = scanTabReq->requestInfo;
@@ -8672,7 +8672,6 @@ void Dbtc::initScanrec(ScanRecordPtr scanptr,
ndbrequire(list.seize(ptr));
ptr.p->scanRec = scanptr.i;
ptr.p->scanFragId = 0;
- ptr.p->scanFragConcurrency = noOprecPerFrag;
ptr.p->m_apiPtr = cdata[i];
}//for
@@ -9141,6 +9140,7 @@ void Dbtc::execSCAN_FRAGCONF(Signal* signal)
const ScanFragConf * const conf = (ScanFragConf*)&signal->theData[0];
const Uint32 noCompletedOps = conf->completedOps;
+ const Uint32 status = conf->fragmentCompleted;
scanFragptr.i = conf->senderData;
c_scan_frag_pool.getPtr(scanFragptr);
@@ -9163,11 +9163,9 @@ void Dbtc::execSCAN_FRAGCONF(Signal* signal)
ndbrequire(scanFragptr.p->scanFragState == ScanFragRec::LQH_ACTIVE);
- const Uint32 status = conf->fragmentCompleted;
-
if(scanptr.p->scanState == ScanRecord::CLOSING_SCAN){
jam();
- if(status == ZFALSE){
+ if(status == 0){
/**
* We have started closing = we sent a close -> ignore this
*/
@@ -9184,11 +9182,11 @@ void Dbtc::execSCAN_FRAGCONF(Signal* signal)
return;
}
- if(status == ZCLOSED && scanptr.p->scanNextFragId < scanptr.p->scanNoFrag){
+ if(noCompletedOps == 0 && status != 0 &&
+ scanptr.p->scanNextFragId < scanptr.p->scanNoFrag){
/**
* Start on next fragment
*/
- ndbrequire(noCompletedOps == 0);
scanFragptr.p->scanFragState = ScanFragRec::WAIT_GET_PRIMCONF;
scanFragptr.p->startFragTimer(ctcTimer);
@@ -9218,6 +9216,7 @@ void Dbtc::execSCAN_FRAGCONF(Signal* signal)
scanptr.p->m_queued_count++;
}
+ scanFragptr.p->m_scan_frag_conf_status = status;
scanFragptr.p->m_ops = noCompletedOps;
scanFragptr.p->m_totalLen = total_len;
scanFragptr.p->scanFragState = ScanFragRec::QUEUED_FOR_DELIVERY;
@@ -9330,11 +9329,12 @@ void Dbtc::execSCAN_NEXTREQ(Signal* signal)
// Copy op ptrs so I dont overwrite them when sending...
memcpy(signal->getDataPtrSend()+25, signal->getDataPtr()+4, 4 * len);
- ScanFragNextReq * nextReq = (ScanFragNextReq*)&signal->theData[0];
- nextReq->closeFlag = ZFALSE;
- nextReq->transId1 = apiConnectptr.p->transid[0];
- nextReq->transId2 = apiConnectptr.p->transid[1];
- nextReq->batch_size_bytes= scanP->batch_byte_size;
+ ScanFragNextReq tmp;
+ tmp.closeFlag = ZFALSE;
+ tmp.transId1 = apiConnectptr.p->transid[0];
+ tmp.transId2 = apiConnectptr.p->transid[1];
+ tmp.batch_size_rows = scanP->batch_size_rows;
+ tmp.batch_size_bytes = scanP->batch_byte_size;
ScanFragList running(c_scan_frag_pool, scanP->m_running_scan_frags);
ScanFragList delivered(c_scan_frag_pool, scanP->m_delivered_scan_frags);
@@ -9344,15 +9344,37 @@ void Dbtc::execSCAN_NEXTREQ(Signal* signal)
c_scan_frag_pool.getPtr(scanFragptr);
ndbrequire(scanFragptr.p->scanFragState == ScanFragRec::DELIVERED);
- scanFragptr.p->scanFragState = ScanFragRec::LQH_ACTIVE;
scanFragptr.p->startFragTimer(ctcTimer);
-
scanFragptr.p->m_ops = 0;
- nextReq->senderData = scanFragptr.i;
- nextReq->batch_size_rows= scanFragptr.p->scanFragConcurrency;
- sendSignal(scanFragptr.p->lqhBlockref, GSN_SCAN_NEXTREQ, signal,
- ScanFragNextReq::SignalLength, JBB);
+ if(scanFragptr.p->m_scan_frag_conf_status)
+ {
+ /**
+ * last scan was complete
+ */
+ jam();
+ ndbrequire(scanptr.p->scanNextFragId < scanptr.p->scanNoFrag);
+ scanFragptr.p->scanFragState = ScanFragRec::WAIT_GET_PRIMCONF;
+
+ tcConnectptr.i = scanptr.p->scanTcrec;
+ ptrCheckGuard(tcConnectptr, ctcConnectFilesize, tcConnectRecord);
+ scanFragptr.p->scanFragId = scanptr.p->scanNextFragId++;
+ signal->theData[0] = tcConnectptr.p->dihConnectptr;
+ signal->theData[1] = scanFragptr.i;
+ signal->theData[2] = scanptr.p->scanTableref;
+ signal->theData[3] = scanFragptr.p->scanFragId;
+ sendSignal(cdihblockref, GSN_DIGETPRIMREQ, signal, 4, JBB);
+ }
+ else
+ {
+ jam();
+ scanFragptr.p->scanFragState = ScanFragRec::LQH_ACTIVE;
+ ScanFragNextReq * req = (ScanFragNextReq*)signal->getDataPtrSend();
+ * req = tmp;
+ req->senderData = scanFragptr.i;
+ sendSignal(scanFragptr.p->lqhBlockref, GSN_SCAN_NEXTREQ, signal,
+ ScanFragNextReq::SignalLength, JBB);
+ }
delivered.remove(scanFragptr);
running.add(scanFragptr);
}//for
@@ -9551,7 +9573,7 @@ void Dbtc::sendScanFragReq(Signal* signal,
req->transId1 = apiConnectptr.p->transid[0];
req->transId2 = apiConnectptr.p->transid[1];
req->clientOpPtr = scanFragP->m_apiPtr;
- req->batch_size_rows= scanFragP->scanFragConcurrency;
+ req->batch_size_rows= scanP->batch_size_rows;
req->batch_size_bytes= scanP->batch_byte_size;
sendSignal(scanFragP->lqhBlockref, GSN_SCAN_FRAGREQ, signal,
ScanFragReq::SignalLength, JBB);
@@ -9573,6 +9595,8 @@ void Dbtc::sendScanTabConf(Signal* signal, ScanRecordPtr scanPtr) {
jam();
ops += 21;
}
+
+ Uint32 left = scanPtr.p->scanNoFrag - scanPtr.p->scanNextFragId;
ScanTabConf * conf = (ScanTabConf*)&signal->theData[0];
conf->apiConnectPtr = apiConnectptr.p->ndbapiConnect;
@@ -9588,24 +9612,25 @@ void Dbtc::sendScanTabConf(Signal* signal, ScanRecordPtr scanPtr) {
ScanFragRecPtr curr = ptr; // Remove while iterating...
queued.next(ptr);
+ bool done = curr.p->m_scan_frag_conf_status && --left;
+
* ops++ = curr.p->m_apiPtr;
- * ops++ = curr.i;
+ * ops++ = done ? RNIL : curr.i;
* ops++ = (curr.p->m_totalLen << 10) + curr.p->m_ops;
queued.remove(curr);
- if(curr.p->m_ops > 0){
+ if(!done){
delivered.add(curr);
curr.p->scanFragState = ScanFragRec::DELIVERED;
curr.p->stopFragTimer();
} else {
- (* --ops) = ScanTabConf::EndOfData; ops++;
c_scan_frag_pool.release(curr);
curr.p->scanFragState = ScanFragRec::COMPLETED;
curr.p->stopFragTimer();
}
}
}
-
+
if(scanPtr.p->m_delivered_scan_frags.isEmpty() &&
scanPtr.p->m_running_scan_frags.isEmpty()){
conf->requestInfo = op_count | ScanTabConf::EndOfData;
@@ -10424,9 +10449,8 @@ Dbtc::execDUMP_STATE_ORD(Signal* signal)
sfp.i,
sfp.p->scanFragState,
sfp.p->scanFragId);
- infoEvent(" nodeid=%d, concurr=%d, timer=%d",
+ infoEvent(" nodeid=%d, timer=%d",
refToNode(sfp.p->lqhBlockref),
- sfp.p->scanFragConcurrency,
sfp.p->scanFragTimer);
}
@@ -10504,7 +10528,7 @@ Dbtc::execDUMP_STATE_ORD(Signal* signal)
sp.p->scanAiLength,
sp.p->scanParallel,
sp.p->scanReceivedOperations,
- sp.p->noOprecPerFrag);
+ sp.p->batch_size_rows);
infoEvent(" schv=%d, tab=%d, sproc=%d",
sp.p->scanSchemaVersion,
sp.p->scanTableref,
diff --git a/ndb/src/kernel/blocks/suma/Suma.cpp b/ndb/src/kernel/blocks/suma/Suma.cpp
index d11d5f7176a..f6d9a0ac35a 100644
--- a/ndb/src/kernel/blocks/suma/Suma.cpp
+++ b/ndb/src/kernel/blocks/suma/Suma.cpp
@@ -1888,7 +1888,7 @@ SumaParticipant::SyncRecord::nextScan(Signal* signal){
req->requestInfo = 0;
req->savePointId = 0;
ScanFragReq::setLockMode(req->requestInfo, 0);
- ScanFragReq::setHoldLockFlag(req->requestInfo, 0);
+ ScanFragReq::setHoldLockFlag(req->requestInfo, 1);
ScanFragReq::setKeyinfoFlag(req->requestInfo, 0);
ScanFragReq::setAttrLen(req->requestInfo, attrLen);
req->fragmentNoKeyLen = fd.m_fragDesc.m_fragmentNo;
diff --git a/ndb/src/ndbapi/NdbConnection.cpp b/ndb/src/ndbapi/NdbConnection.cpp
index c21a85fd24d..aecc7124d15 100644
--- a/ndb/src/ndbapi/NdbConnection.cpp
+++ b/ndb/src/ndbapi/NdbConnection.cpp
@@ -1577,9 +1577,6 @@ from other transactions.
/**
* There's always a TCKEYCONF when using IgnoreError
*/
-#ifdef VM_TRACE
- ndbout_c("Not completing transaction 2");
-#endif
return -1;
}
/**********************************************************************/
@@ -1831,9 +1828,6 @@ NdbConnection::OpCompleteFailure(Uint8 abortOption, bool setFailure)
/**
* There's always a TCKEYCONF when using IgnoreError
*/
-#ifdef VM_TRACE
- ndbout_c("Not completing transaction");
-#endif
return -1;
}
diff --git a/ndb/src/ndbapi/NdbConnectionScan.cpp b/ndb/src/ndbapi/NdbConnectionScan.cpp
index 3fe8993a42b..a7a8f1350b1 100644
--- a/ndb/src/ndbapi/NdbConnectionScan.cpp
+++ b/ndb/src/ndbapi/NdbConnectionScan.cpp
@@ -97,7 +97,7 @@ NdbConnection::receiveSCAN_TABCONF(NdbApiSignal* aSignal,
theScanningOp->execCLOSE_SCAN_REP();
return 0;
}
-
+
for(Uint32 i = 0; i<len; i += 3){
Uint32 opCount, totalLen;
Uint32 ptrI = * ops++;
@@ -109,24 +109,12 @@ NdbConnection::receiveSCAN_TABCONF(NdbApiSignal* aSignal,
void * tPtr = theNdb->int2void(ptrI);
assert(tPtr); // For now
NdbReceiver* tOp = theNdb->void2rec(tPtr);
- if (tOp && tOp->checkMagicNumber()){
- if(tOp->execSCANOPCONF(tcPtrI, totalLen, opCount)){
- /**
- *
- */
- theScanningOp->receiver_delivered(tOp);
- } else if(info == ScanTabConf::EndOfData){
+ if (tOp && tOp->checkMagicNumber())
+ {
+ if (tcPtrI == RNIL && opCount == 0)
theScanningOp->receiver_completed(tOp);
- }
- }
- }
- if (conf->requestInfo & ScanTabConf::EndOfData) {
- if(theScanningOp->m_ordered)
- theScanningOp->m_api_receivers_count = 0;
- if(theScanningOp->m_api_receivers_count +
- theScanningOp->m_conf_receivers_count +
- theScanningOp->m_sent_receivers_count){
- abort();
+ else if (tOp->execSCANOPCONF(tcPtrI, totalLen, opCount))
+ theScanningOp->receiver_delivered(tOp);
}
}
return 0;
diff --git a/ndb/src/ndbapi/NdbScanOperation.cpp b/ndb/src/ndbapi/NdbScanOperation.cpp
index 373fec1a2b0..30b596d7098 100644
--- a/ndb/src/ndbapi/NdbScanOperation.cpp
+++ b/ndb/src/ndbapi/NdbScanOperation.cpp
@@ -35,6 +35,8 @@
#include <signaldata/AttrInfo.hpp>
#include <signaldata/TcKeyReq.hpp>
+#define DEBUG_NEXT_RESULT 0
+
NdbScanOperation::NdbScanOperation(Ndb* aNdb) :
NdbOperation(aNdb),
m_resultSet(0),
@@ -275,6 +277,9 @@ NdbScanOperation::fix_receivers(Uint32 parallel){
void
NdbScanOperation::receiver_delivered(NdbReceiver* tRec){
if(theError.code == 0){
+ if(DEBUG_NEXT_RESULT)
+ ndbout_c("receiver_delivered");
+
Uint32 idx = tRec->m_list_index;
Uint32 last = m_sent_receivers_count - 1;
if(idx != last){
@@ -298,6 +303,9 @@ NdbScanOperation::receiver_delivered(NdbReceiver* tRec){
void
NdbScanOperation::receiver_completed(NdbReceiver* tRec){
if(theError.code == 0){
+ if(DEBUG_NEXT_RESULT)
+ ndbout_c("receiver_completed");
+
Uint32 idx = tRec->m_list_index;
Uint32 last = m_sent_receivers_count - 1;
if(idx != last){
@@ -445,8 +453,6 @@ NdbScanOperation::executeCursor(int nodeId){
return -1;
}
-#define DEBUG_NEXT_RESULT 0
-
int NdbScanOperation::nextResult(bool fetchAllowed)
{
if(m_ordered)
@@ -579,7 +585,8 @@ int NdbScanOperation::nextResult(bool fetchAllowed)
int
NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag){
- if(cnt > 0 || stopScanFlag){
+ if(cnt > 0)
+ {
NdbApiSignal tSignal(theNdb->theMyRef);
tSignal.setSignal(GSN_SCAN_NEXTREQ);
@@ -595,33 +602,40 @@ NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag){
*/
Uint32 last = m_sent_receivers_count;
Uint32 * prep_array = (cnt > 21 ? m_prepared_receivers : theData + 4);
+ Uint32 sent = 0;
for(Uint32 i = 0; i<cnt; i++){
NdbReceiver * tRec = m_api_receivers[i];
- m_sent_receivers[last+i] = tRec;
- tRec->m_list_index = last+i;
- prep_array[i] = tRec->m_tcPtrI;
- tRec->prepareSend();
+ if((prep_array[sent] = tRec->m_tcPtrI) != RNIL)
+ {
+ m_sent_receivers[last+sent] = tRec;
+ tRec->m_list_index = last+sent;
+ tRec->prepareSend();
+ sent++;
+ }
}
memcpy(&m_api_receivers[0], &m_api_receivers[cnt], cnt * sizeof(char*));
- Uint32 nodeId = theNdbCon->theDBnode;
- TransporterFacade * tp = TransporterFacade::instance();
- int ret;
- if(cnt > 21){
- tSignal.setLength(4);
- LinearSectionPtr ptr[3];
- ptr[0].p = prep_array;
- ptr[0].sz = cnt;
- ret = tp->sendSignal(&tSignal, nodeId, ptr, 1);
- } else {
- tSignal.setLength(4+cnt);
- ret = tp->sendSignal(&tSignal, nodeId);
+ int ret = 0;
+ if(sent)
+ {
+ Uint32 nodeId = theNdbCon->theDBnode;
+ TransporterFacade * tp = TransporterFacade::instance();
+ if(cnt > 21 && !stopScanFlag){
+ tSignal.setLength(4);
+ LinearSectionPtr ptr[3];
+ ptr[0].p = prep_array;
+ ptr[0].sz = sent;
+ ret = tp->sendSignal(&tSignal, nodeId, ptr, 1);
+ } else {
+ tSignal.setLength(4+(stopScanFlag ? 0 : sent));
+ ret = tp->sendSignal(&tSignal, nodeId);
+ }
}
-
- m_sent_receivers_count = last + cnt + stopScanFlag;
+
+ m_sent_receivers_count = last + sent;
m_api_receivers_count -= cnt;
m_current_api_receiver = 0;
-
+
return ret;
}
return 0;
@@ -1412,10 +1426,22 @@ NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx){
if(idx == theParallelism)
return 0;
+ NdbReceiver* tRec = m_api_receivers[idx];
NdbApiSignal tSignal(theNdb->theMyRef);
tSignal.setSignal(GSN_SCAN_NEXTREQ);
+ Uint32 last = m_sent_receivers_count;
Uint32* theData = tSignal.getDataPtrSend();
+ Uint32* prep_array = theData + 4;
+
+ m_current_api_receiver = idx + 1;
+ if((prep_array[0] = tRec->m_tcPtrI) == RNIL)
+ {
+ if(DEBUG_NEXT_RESULT)
+ ndbout_c("receiver completed, don't send");
+ return 0;
+ }
+
theData[0] = theNdbCon->theTCConPtr;
theData[1] = 0;
Uint64 transId = theNdbCon->theTransactionId;
@@ -1425,17 +1451,10 @@ NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx){
/**
* Prepare ops
*/
- Uint32 last = m_sent_receivers_count;
- Uint32 * prep_array = theData + 4;
-
- NdbReceiver * tRec = m_api_receivers[idx];
m_sent_receivers[last] = tRec;
tRec->m_list_index = last;
- prep_array[0] = tRec->m_tcPtrI;
tRec->prepareSend();
-
m_sent_receivers_count = last + 1;
- m_current_api_receiver = idx + 1;
Uint32 nodeId = theNdbCon->theDBnode;
TransporterFacade * tp = TransporterFacade::instance();
@@ -1448,12 +1467,17 @@ NdbScanOperation::close_impl(TransporterFacade* tp){
Uint32 seq = theNdbCon->theNodeSequence;
Uint32 nodeId = theNdbCon->theDBnode;
- if(seq != tp->getNodeSequence(nodeId)){
+ if(seq != tp->getNodeSequence(nodeId))
+ {
theNdbCon->theReleaseOnClose = true;
return -1;
}
- while(theError.code == 0 && m_sent_receivers_count){
+ /**
+ * Wait for outstanding
+ */
+ while(theError.code == 0 && m_sent_receivers_count)
+ {
theNdb->theWaiter.m_node = nodeId;
theNdb->theWaiter.m_state = WAIT_SCAN;
int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
@@ -1471,18 +1495,52 @@ NdbScanOperation::close_impl(TransporterFacade* tp){
}
}
- if(m_api_receivers_count+m_conf_receivers_count){
- // Send close scan
- if(send_next_scan(0, true) == -1){ // Close scan
- theNdbCon->theReleaseOnClose = true;
- return -1;
- }
+ /**
+ * move all conf'ed into api
+ * so that send_next_scan can check if they needs to be closed
+ */
+ Uint32 api = m_api_receivers_count;
+ Uint32 conf = m_conf_receivers_count;
+
+ if(m_ordered)
+ {
+ /**
+ * Ordered scan, keep the m_api_receivers "to the right"
+ */
+ memmove(m_api_receivers, m_api_receivers+m_current_api_receiver,
+ (theParallelism - m_current_api_receiver) * sizeof(char*));
+ api = (theParallelism - m_current_api_receiver);
+ m_api_receivers_count = api;
+ }
+
+ if(DEBUG_NEXT_RESULT)
+ ndbout_c("close_impl: [order api conf sent curr parr] %d %d %d %d %d %d",
+ m_ordered, api, conf,
+ m_sent_receivers_count, m_current_api_receiver, theParallelism);
+
+ if(api+conf)
+ {
+ /**
+ * There's something to close
+ * setup m_api_receivers (for send_next_scan)
+ */
+ memcpy(m_api_receivers+api, m_conf_receivers, conf * sizeof(char*));
+ m_api_receivers_count = api + conf;
+ m_conf_receivers_count = 0;
+ }
+
+ // Send close scan
+ if(send_next_scan(api+conf, true) == -1)
+ {
+ theNdbCon->theReleaseOnClose = true;
+ return -1;
}
/**
* wait for close scan conf
*/
- while(m_sent_receivers_count+m_api_receivers_count+m_conf_receivers_count){
+ while(m_sent_receivers_count+m_api_receivers_count+m_conf_receivers_count)
+ {
theNdb->theWaiter.m_node = nodeId;
theNdb->theWaiter.m_state = WAIT_SCAN;
int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
@@ -1499,6 +1557,7 @@ NdbScanOperation::close_impl(TransporterFacade* tp){
return -1;
}
}
+
return 0;
}
diff --git a/ndb/test/include/HugoTransactions.hpp b/ndb/test/include/HugoTransactions.hpp
index 19e4cb43336..b833f2ac629 100644
--- a/ndb/test/include/HugoTransactions.hpp
+++ b/ndb/test/include/HugoTransactions.hpp
@@ -36,15 +36,21 @@ public:
bool allowConstraintViolation = true,
int doSleep = 0,
bool oneTrans = false);
+
int scanReadRecords(Ndb*,
int records,
int abort = 0,
int parallelism = 0,
- bool committed = false);
- int scanReadCommittedRecords(Ndb*,
- int records,
- int abort = 0,
- int parallelism = 0);
+ NdbOperation::LockMode = NdbOperation::LM_Read);
+
+ int scanReadRecords(Ndb*,
+ const NdbDictionary::Index*,
+ int records,
+ int abort = 0,
+ int parallelism = 0,
+ NdbOperation::LockMode = NdbOperation::LM_Read,
+ bool sorted = false);
+
int pkReadRecords(Ndb*,
int records,
int batchsize = 1,
diff --git a/ndb/test/include/UtilTransactions.hpp b/ndb/test/include/UtilTransactions.hpp
index 37cd99550a5..23902f3b317 100644
--- a/ndb/test/include/UtilTransactions.hpp
+++ b/ndb/test/include/UtilTransactions.hpp
@@ -53,11 +53,11 @@ public:
int selectCount(Ndb*,
int parallelism = 0,
int* count_rows = NULL,
- ScanLock lock = SL_Read,
+ NdbOperation::LockMode lm = NdbOperation::LM_CommittedRead,
NdbConnection* pTrans = NULL);
int scanReadRecords(Ndb*,
int parallelism,
- bool exclusive,
+ NdbOperation::LockMode lm,
int records,
int noAttribs,
int* attrib_list,
diff --git a/ndb/test/ndbapi/testScan.cpp b/ndb/test/ndbapi/testScan.cpp
index 0cd30dfefde..51913e8fbf9 100644
--- a/ndb/test/ndbapi/testScan.cpp
+++ b/ndb/test/ndbapi/testScan.cpp
@@ -242,8 +242,9 @@ int runScanReadCommitted(NDBT_Context* ctx, NDBT_Step* step){
HugoTransactions hugoTrans(*ctx->getTab());
while (i<loops && !ctx->isTestStopped()) {
g_info << i << ": ";
- if (hugoTrans.scanReadCommittedRecords(GETNDB(step), records,
- abort, parallelism) != 0){
+ if (hugoTrans.scanReadRecords(GETNDB(step), records,
+ abort, parallelism,
+ NdbOperation::LM_CommittedRead) != 0){
return NDBT_FAILED;
}
i++;
@@ -639,7 +640,7 @@ int runCheckGetValue(NDBT_Context* ctx, NDBT_Step* step){
g_info << (unsigned)i << endl;
if(utilTrans.scanReadRecords(GETNDB(step),
parallelism,
- false,
+ NdbOperation::LM_Read,
records,
alist.attriblist[i]->numAttribs,
alist.attriblist[i]->attribs) != 0){
@@ -647,7 +648,7 @@ int runCheckGetValue(NDBT_Context* ctx, NDBT_Step* step){
}
if(utilTrans.scanReadRecords(GETNDB(step),
parallelism,
- true,
+ NdbOperation::LM_Read,
records,
alist.attriblist[i]->numAttribs,
alist.attriblist[i]->attribs) != 0){
diff --git a/ndb/test/src/HugoTransactions.cpp b/ndb/test/src/HugoTransactions.cpp
index 456bfffbb77..096f5406bbf 100644
--- a/ndb/test/src/HugoTransactions.cpp
+++ b/ndb/test/src/HugoTransactions.cpp
@@ -29,26 +29,175 @@ HugoTransactions::~HugoTransactions(){
deallocRows();
}
-
-int HugoTransactions::scanReadCommittedRecords(Ndb* pNdb,
+int
+HugoTransactions::scanReadRecords(Ndb* pNdb,
int records,
int abortPercent,
- int parallelism){
- return scanReadRecords(pNdb, records, abortPercent, parallelism, true);
+ int parallelism,
+ NdbOperation::LockMode lm)
+{
+
+ int retryAttempt = 0;
+ const int retryMax = 100;
+ int check, a;
+ NdbConnection *pTrans;
+ NdbScanOperation *pOp;
+
+ while (true){
+
+ if (retryAttempt >= retryMax){
+ g_err << "ERROR: has retried this operation " << retryAttempt
+ << " times, failing!" << endl;
+ return NDBT_FAILED;
+ }
+
+ pTrans = pNdb->startTransaction();
+ if (pTrans == NULL) {
+ const NdbError err = pNdb->getNdbError();
+
+ if (err.status == NdbError::TemporaryError){
+ ERR(err);
+ NdbSleep_MilliSleep(50);
+ retryAttempt++;
+ continue;
+ }
+ ERR(err);
+ return NDBT_FAILED;
+ }
+
+ pOp = pTrans->getNdbScanOperation(tab.getName());
+ if (pOp == NULL) {
+ ERR(pTrans->getNdbError());
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+
+ NdbResultSet * rs;
+ rs = pOp ->readTuples(lm);
+
+ if( rs == 0 ) {
+ ERR(pTrans->getNdbError());
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+
+ check = pOp->interpret_exit_ok();
+ if( check == -1 ) {
+ ERR(pTrans->getNdbError());
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+
+ for(a = 0; a<tab.getNoOfColumns(); a++){
+ if((row.attributeStore(a) =
+ pOp->getValue(tab.getColumn(a)->getName())) == 0) {
+ ERR(pTrans->getNdbError());
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+ }
+
+ check = pTrans->execute(NoCommit);
+ if( check == -1 ) {
+ const NdbError err = pTrans->getNdbError();
+ if (err.status == NdbError::TemporaryError){
+ ERR(err);
+ pNdb->closeTransaction(pTrans);
+ NdbSleep_MilliSleep(50);
+ retryAttempt++;
+ continue;
+ }
+ ERR(err);
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+
+ // Abort after 1-100 or 1-records rows
+ int ranVal = rand();
+ int abortCount = ranVal % (records == 0 ? 100 : records);
+ bool abortTrans = false;
+ if (abort > 0){
+ // Abort if abortCount is less then abortPercent
+ if (abortCount < abortPercent)
+ abortTrans = true;
+ }
+
+ int eof;
+ int rows = 0;
+ while((eof = rs->nextResult(true)) == 0){
+ rows++;
+ if (calc.verifyRowValues(&row) != 0){
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+
+ if (abortCount == rows && abortTrans == true){
+ ndbout << "Scan is aborted" << endl;
+ g_info << "Scan is aborted" << endl;
+ rs->close();
+ if( check == -1 ) {
+ ERR(pTrans->getNdbError());
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+
+ pNdb->closeTransaction(pTrans);
+ return NDBT_OK;
+ }
+ }
+ if (eof == -1) {
+ const NdbError err = pTrans->getNdbError();
+
+ if (err.status == NdbError::TemporaryError){
+ ERR_INFO(err);
+ pNdb->closeTransaction(pTrans);
+ NdbSleep_MilliSleep(50);
+ switch (err.code){
+ case 488:
+ case 245:
+ case 490:
+ // Too many active scans, no limit on number of retry attempts
+ break;
+ default:
+ retryAttempt++;
+ }
+ continue;
+ }
+ ERR(err);
+ pNdb->closeTransaction(pTrans);
+ return NDBT_FAILED;
+ }
+
+ pNdb->closeTransaction(pTrans);
+
+ g_info << rows << " rows have been read" << endl;
+ if (records != 0 && rows != records){
+ g_err << "Check expected number of records failed" << endl
+ << " expected=" << records <<", " << endl
+ << " read=" << rows << endl;
+ return NDBT_FAILED;
+ }
+
+ return NDBT_OK;
+ }
+ return NDBT_FAILED;
}
int
HugoTransactions::scanReadRecords(Ndb* pNdb,
+ const NdbDictionary::Index * pIdx,
int records,
int abortPercent,
int parallelism,
- bool committed){
+ NdbOperation::LockMode lm,
+ bool sorted)
+{
int retryAttempt = 0;
const int retryMax = 100;
int check, a;
NdbConnection *pTrans;
- NdbScanOperation *pOp;
+ NdbIndexScanOperation *pOp;
while (true){
@@ -72,7 +221,7 @@ HugoTransactions::scanReadRecords(Ndb* pNdb,
return NDBT_FAILED;
}
- pOp = pTrans->getNdbScanOperation(tab.getName());
+ pOp = pTrans->getNdbIndexScanOperation(pIdx->getName(), tab.getName());
if (pOp == NULL) {
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
@@ -80,8 +229,7 @@ HugoTransactions::scanReadRecords(Ndb* pNdb,
}
NdbResultSet * rs;
- rs = pOp ->readTuples(committed ? NdbScanOperation::LM_CommittedRead :
- NdbScanOperation::LM_Read);
+ rs = pOp ->readTuples(lm, 0, parallelism, sorted);
if( rs == 0 ) {
ERR(pTrans->getNdbError());
diff --git a/ndb/test/src/UtilTransactions.cpp b/ndb/test/src/UtilTransactions.cpp
index c0e6effd244..869f7fc76cb 100644
--- a/ndb/test/src/UtilTransactions.cpp
+++ b/ndb/test/src/UtilTransactions.cpp
@@ -619,7 +619,7 @@ UtilTransactions::addRowToInsert(Ndb* pNdb,
int
UtilTransactions::scanReadRecords(Ndb* pNdb,
int parallelism,
- bool exclusive,
+ NdbOperation::LockMode lm,
int records,
int noAttribs,
int *attrib_list,
@@ -669,10 +669,7 @@ UtilTransactions::scanReadRecords(Ndb* pNdb,
return NDBT_FAILED;
}
- NdbResultSet * rs = pOp->readTuples(exclusive ?
- NdbScanOperation::LM_Exclusive :
- NdbScanOperation::LM_Read,
- 0, parallelism);
+ NdbResultSet * rs = pOp->readTuples(lm, 0, parallelism);
if( rs == 0 ) {
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
@@ -761,7 +758,7 @@ int
UtilTransactions::selectCount(Ndb* pNdb,
int parallelism,
int* count_rows,
- ScanLock lock,
+ NdbOperation::LockMode lm,
NdbConnection* pTrans){
int retryAttempt = 0;
@@ -785,19 +782,7 @@ UtilTransactions::selectCount(Ndb* pNdb,
return NDBT_FAILED;
}
- NdbResultSet * rs;
- switch(lock){
- case SL_ReadHold:
- rs = pOp->readTuples(NdbScanOperation::LM_Read);
- break;
- case SL_Exclusive:
- rs = pOp->readTuples(NdbScanOperation::LM_Exclusive);
- break;
- case SL_Read:
- default:
- rs = pOp->readTuples(NdbScanOperation::LM_CommittedRead);
- }
-
+ NdbResultSet * rs = pOp->readTuples(lm);
if( rs == 0) {
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
diff --git a/ndb/test/tools/create_index.cpp b/ndb/test/tools/create_index.cpp
index 75a657522f6..6e4c5377f4a 100644
--- a/ndb/test/tools/create_index.cpp
+++ b/ndb/test/tools/create_index.cpp
@@ -30,7 +30,7 @@ main(int argc, const char** argv){
const char* _dbname = "TEST_DB";
int _help = 0;
- int _ordered, _pk;
+ int _ordered = 0, _pk = 1;
struct getargs args[] = {
{ "database", 'd', arg_string, &_dbname, "dbname",
diff --git a/ndb/test/tools/hugoScanRead.cpp b/ndb/test/tools/hugoScanRead.cpp
index cdfdcea4654..42180207a8a 100644
--- a/ndb/test/tools/hugoScanRead.cpp
+++ b/ndb/test/tools/hugoScanRead.cpp
@@ -35,13 +35,17 @@ int main(int argc, const char** argv){
int _parallelism = 1;
const char* _tabname = NULL;
int _help = 0;
-
+ int lock = NdbOperation::LM_Read;
+ int sorted = 0;
+
struct getargs args[] = {
{ "aborts", 'a', arg_integer, &_abort, "percent of transactions that are aborted", "abort%" },
{ "loops", 'l', arg_integer, &_loops, "number of times to run this program(0=infinite loop)", "loops" },
{ "parallelism", 'p', arg_integer, &_parallelism, "parallelism(1-240)", "para" },
{ "records", 'r', arg_integer, &_records, "Number of records", "recs" },
- { "usage", '?', arg_flag, &_help, "Print help", "" }
+ { "usage", '?', arg_flag, &_help, "Print help", "" },
+ { "lock", 'm', arg_integer, &lock, "lock mode", "" },
+ { "sorted", 's', arg_flag, &sorted, "sorted", "" }
};
int num_args = sizeof(args) / sizeof(args[0]);
int optind = 0;
@@ -73,16 +77,48 @@ int main(int argc, const char** argv){
ndbout << " Table " << _tabname << " does not exist!" << endl;
return NDBT_ProgramExit(NDBT_WRONGARGS);
}
+
+ const NdbDictionary::Index * pIdx = 0;
+ if(optind+1 < argc)
+ {
+ pIdx = MyNdb.getDictionary()->getIndex(argv[optind+1], _tabname);
+ if(!pIdx)
+ ndbout << " Index " << argv[optind+1] << " not found" << endl;
+ else
+ if(pIdx->getType() != NdbDictionary::Index::UniqueOrderedIndex &&
+ pIdx->getType() != NdbDictionary::Index::OrderedIndex)
+ {
+ ndbout << " Index " << argv[optind+1] << " is not scannable" << endl;
+ pIdx = 0;
+ }
+ }
HugoTransactions hugoTrans(*pTab);
int i = 0;
while (i<_loops || _loops==0) {
ndbout << i << ": ";
- if(hugoTrans.scanReadRecords(&MyNdb,
- 0,
- _abort,
- _parallelism) != 0){
- return NDBT_ProgramExit(NDBT_FAILED);
+ if(!pIdx)
+ {
+ if(hugoTrans.scanReadRecords(&MyNdb,
+ 0,
+ _abort,
+ _parallelism,
+ (NdbOperation::LockMode)lock) != 0)
+ {
+ return NDBT_ProgramExit(NDBT_FAILED);
+ }
+ }
+ else
+ {
+ if(hugoTrans.scanReadRecords(&MyNdb, pIdx,
+ 0,
+ _abort,
+ _parallelism,
+ (NdbOperation::LockMode)lock,
+ sorted) != 0)
+ {
+ return NDBT_ProgramExit(NDBT_FAILED);
+ }
}
i++;
}
diff --git a/ndb/tools/select_all.cpp b/ndb/tools/select_all.cpp
index 758c1e48c88..9c944c8c93e 100644
--- a/ndb/tools/select_all.cpp
+++ b/ndb/tools/select_all.cpp
@@ -133,13 +133,18 @@ int main(int argc, char** argv){
const NdbDictionary::Table* pTab = NDBT_Table::discoverTableFromDb(&MyNdb, _tabname);
const NdbDictionary::Index * pIdx = 0;
if(argc > 1){
- pIdx = MyNdb.getDictionary()->getIndex(argv[0], _tabname);
+ pIdx = MyNdb.getDictionary()->getIndex(argv[1], _tabname);
}
if(pTab == NULL){
ndbout << " Table " << _tabname << " does not exist!" << endl;
return NDBT_ProgramExit(NDBT_WRONGARGS);
}
+
+ if(argc > 1 && pIdx == 0)
+ {
+ ndbout << " Index " << argv[1] << " does not exists" << endl;
+ }
if(_order && pIdx == NULL){
ndbout << " Order flag given without an index" << endl;