summaryrefslogtreecommitdiff
path: root/ndb/src/kernel
diff options
context:
space:
mode:
authorunknown <joreland@mysql.com>2004-11-18 21:28:38 +0100
committerunknown <joreland@mysql.com>2004-11-18 21:28:38 +0100
commit5bf0d35c4fc0bf9e5d22394bb2c0114c2be6ba10 (patch)
treea6375b059799a3409ab854205ee378c73bd7a626 /ndb/src/kernel
parent6aab88c9169960da307d39eb7b9977f215c63508 (diff)
parent041d153db21109f8b99252f0905439a2c775c81a (diff)
downloadmariadb-git-5bf0d35c4fc0bf9e5d22394bb2c0114c2be6ba10.tar.gz
Merge mysql.com:/home/jonas/src/mysql-4.1
into mysql.com:/home/jonas/src/wl2077 ndb/src/kernel/blocks/dblqh/Dblqh.hpp: Auto merged ndb/src/kernel/blocks/dblqh/DblqhMain.cpp: Auto merged ndb/src/ndbapi/NdbScanOperation.cpp: Auto merged
Diffstat (limited to 'ndb/src/kernel')
-rw-r--r--ndb/src/kernel/blocks/dblqh/Dblqh.hpp19
-rw-r--r--ndb/src/kernel/blocks/dblqh/DblqhMain.cpp54
-rw-r--r--ndb/src/kernel/blocks/dbtc/Dbtc.hpp11
-rw-r--r--ndb/src/kernel/blocks/dbtc/DbtcMain.cpp80
-rw-r--r--ndb/src/kernel/blocks/suma/Suma.cpp2
5 files changed, 105 insertions, 61 deletions
diff --git a/ndb/src/kernel/blocks/dblqh/Dblqh.hpp b/ndb/src/kernel/blocks/dblqh/Dblqh.hpp
index 739c3c741fb..6c8c2bb2dae 100644
--- a/ndb/src/kernel/blocks/dblqh/Dblqh.hpp
+++ b/ndb/src/kernel/blocks/dblqh/Dblqh.hpp
@@ -2925,4 +2925,23 @@ Dblqh::ScanRecord::check_scan_batch_completed() const
(max_bytes > 0 && (m_curr_batch_size_bytes >= max_bytes));
}
+inline
+void
+Dblqh::i_get_acc_ptr(ScanRecord* scanP, Uint32* &acc_ptr, Uint32 index)
+{
+ if (index == 0) {
+ acc_ptr= (Uint32*)&scanP->scan_acc_op_ptr[0];
+ } else {
+ Uint32 attr_buf_index, attr_buf_rec;
+
+ AttrbufPtr regAttrPtr;
+ jam();
+ attr_buf_rec= (index + 31) / 32;
+ attr_buf_index= (index - 1) & 31;
+ regAttrPtr.i= scanP->scan_acc_op_ptr[attr_buf_rec];
+ ptrCheckGuard(regAttrPtr, cattrinbufFileSize, attrbuf);
+ acc_ptr= (Uint32*)&regAttrPtr.p->attrbuf[attr_buf_index];
+ }
+}
+
#endif
diff --git a/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp b/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
index 5622706a96c..fc0f39a4f5f 100644
--- a/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
+++ b/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
@@ -7161,10 +7161,7 @@ void Dblqh::continueScanNextReqLab(Signal* signal)
// Update timer on tcConnectRecord
tcConnectptr.p->tcTimer = cLqhTimeOutCount;
-
init_acc_ptr_list(scanptr.p);
- scanptr.p->m_curr_batch_size_rows = 0;
- scanptr.p->m_curr_batch_size_bytes= 0;
scanptr.p->scanFlag = NextScanReq::ZSCAN_NEXT;
scanNextLoopLab(signal);
}//Dblqh::continueScanNextReqLab()
@@ -7363,22 +7360,32 @@ void Dblqh::scanLockReleasedLab(Signal* signal)
tcConnectptr.i = scanptr.p->scanTcrec;
ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec);
releaseActiveFrag(signal);
+
if (scanptr.p->scanReleaseCounter == scanptr.p->m_curr_batch_size_rows) {
if ((scanptr.p->scanErrorCounter > 0) ||
(scanptr.p->scanCompletedStatus == ZTRUE)) {
jam();
+ scanptr.p->m_curr_batch_size_rows = 0;
+ scanptr.p->m_curr_batch_size_bytes = 0;
closeScanLab(signal);
} else if (scanptr.p->check_scan_batch_completed() &&
scanptr.p->scanLockHold != ZTRUE) {
jam();
scanptr.p->scanState = ScanRecord::WAIT_SCAN_NEXTREQ;
sendScanFragConf(signal, ZFALSE);
+ } else if (scanptr.p->m_last_row && !scanptr.p->scanLockHold) {
+ jam();
+ closeScanLab(signal);
+ return;
} else {
jam();
/*
- We came here after releasing locks after receiving SCAN_NEXTREQ from TC. We only
- come here when scanHoldLock == ZTRUE
- */
+ * We came here after releasing locks after
+ * receiving SCAN_NEXTREQ from TC. We only come here
+ * when scanHoldLock == ZTRUE
+ */
+ scanptr.p->m_curr_batch_size_rows = 0;
+ scanptr.p->m_curr_batch_size_bytes = 0;
continueScanNextReqLab(signal);
}//if
} else if (scanptr.p->scanReleaseCounter < scanptr.p->m_curr_batch_size_rows) {
@@ -7465,25 +7472,6 @@ Dblqh::init_acc_ptr_list(ScanRecord* scanP)
scanP->scan_acc_index = 0;
}
-inline
-void
-Dblqh::i_get_acc_ptr(ScanRecord* scanP, Uint32* &acc_ptr, Uint32 index)
-{
- if (index == 0) {
- acc_ptr= (Uint32*)&scanP->scan_acc_op_ptr[0];
- } else {
- Uint32 attr_buf_index, attr_buf_rec;
-
- AttrbufPtr regAttrPtr;
- jam();
- attr_buf_rec= (index + 31) / 32;
- attr_buf_index= (index - 1) & 31;
- regAttrPtr.i= scanP->scan_acc_op_ptr[attr_buf_rec];
- ptrCheckGuard(regAttrPtr, cattrinbufFileSize, attrbuf);
- acc_ptr= (Uint32*)&regAttrPtr.p->attrbuf[attr_buf_index];
- }
-}
-
Uint32
Dblqh::get_acc_ptr_from_scan_record(ScanRecord* scanP,
Uint32 index,
@@ -8007,6 +7995,13 @@ void Dblqh::nextScanConfScanLab(Signal* signal)
/*************************************************************
* STOP THE SCAN PROCESS IF THIS HAS BEEN REQUESTED.
************************************************************ */
+ if (!scanptr.p->scanLockHold)
+ {
+ jam();
+ closeScanLab(signal);
+ return;
+ }
+
if (scanptr.p->scanCompletedStatus == ZTRUE) {
if ((scanptr.p->scanLockHold == ZTRUE) &&
(scanptr.p->m_curr_batch_size_rows > 0)) {
@@ -8507,8 +8502,6 @@ void Dblqh::tupScanCloseConfLab(Signal* signal)
ScanFragRef::SignalLength, JBB);
} else {
jam();
- scanptr.p->m_curr_batch_size_rows = 0;
- scanptr.p->m_curr_batch_size_bytes= 0;
sendScanFragConf(signal, ZSCAN_FRAG_CLOSED);
}//if
finishScanrec(signal);
@@ -8912,6 +8905,13 @@ void Dblqh::sendScanFragConf(Signal* signal, Uint32 scanCompleted)
conf->total_len= total_len;
sendSignal(tcConnectptr.p->clientBlockref, GSN_SCAN_FRAGCONF,
signal, ScanFragConf::SignalLength, JBB);
+
+ if(!scanptr.p->scanLockHold)
+ {
+ jam();
+ scanptr.p->m_curr_batch_size_rows = 0;
+ scanptr.p->m_curr_batch_size_bytes= 0;
+ }
}//Dblqh::sendScanFragConf()
/* ######################################################################### */
diff --git a/ndb/src/kernel/blocks/dbtc/Dbtc.hpp b/ndb/src/kernel/blocks/dbtc/Dbtc.hpp
index a209df24c44..fb90ccc8c90 100644
--- a/ndb/src/kernel/blocks/dbtc/Dbtc.hpp
+++ b/ndb/src/kernel/blocks/dbtc/Dbtc.hpp
@@ -1054,9 +1054,8 @@ public:
// Id of the ScanRecord this fragment scan belongs to
Uint32 scanRec;
- // The maximum number of operations that can be scanned before
- // returning to TC
- Uint16 scanFragConcurrency;
+ // The value of fragmentCompleted in the last received SCAN_FRAGCONF
+ Uint8 m_scan_frag_conf_status;
inline void startFragTimer(Uint32 timeVal){
scanFragTimer = timeVal;
@@ -1193,8 +1192,10 @@ public:
// Number of operation records per scanned fragment
// Number of operations in first batch
// Max number of bytes per batch
- Uint16 noOprecPerFrag;
- Uint16 first_batch_size;
+ union {
+ Uint16 first_batch_size_rows;
+ Uint16 batch_size_rows;
+ };
Uint32 batch_byte_size;
Uint32 scanRequestInfo; // ScanFrag format
diff --git a/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp b/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp
index d8b3ee10532..c7f467484bd 100644
--- a/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp
+++ b/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp
@@ -8646,9 +8646,9 @@ void Dbtc::initScanrec(ScanRecordPtr scanptr,
scanptr.p->scanTableref = tabptr.i;
scanptr.p->scanSchemaVersion = scanTabReq->tableSchemaVersion;
scanptr.p->scanParallel = scanParallel;
- scanptr.p->noOprecPerFrag = noOprecPerFrag;
- scanptr.p->first_batch_size= scanTabReq->first_batch_size;
- scanptr.p->batch_byte_size= scanTabReq->batch_byte_size;
+ scanptr.p->first_batch_size_rows = scanTabReq->first_batch_size;
+ scanptr.p->batch_byte_size = scanTabReq->batch_byte_size;
+ scanptr.p->batch_size_rows = noOprecPerFrag;
Uint32 tmp = 0;
const UintR ri = scanTabReq->requestInfo;
@@ -8672,7 +8672,6 @@ void Dbtc::initScanrec(ScanRecordPtr scanptr,
ndbrequire(list.seize(ptr));
ptr.p->scanRec = scanptr.i;
ptr.p->scanFragId = 0;
- ptr.p->scanFragConcurrency = noOprecPerFrag;
ptr.p->m_apiPtr = cdata[i];
}//for
@@ -9141,6 +9140,7 @@ void Dbtc::execSCAN_FRAGCONF(Signal* signal)
const ScanFragConf * const conf = (ScanFragConf*)&signal->theData[0];
const Uint32 noCompletedOps = conf->completedOps;
+ const Uint32 status = conf->fragmentCompleted;
scanFragptr.i = conf->senderData;
c_scan_frag_pool.getPtr(scanFragptr);
@@ -9163,11 +9163,9 @@ void Dbtc::execSCAN_FRAGCONF(Signal* signal)
ndbrequire(scanFragptr.p->scanFragState == ScanFragRec::LQH_ACTIVE);
- const Uint32 status = conf->fragmentCompleted;
-
if(scanptr.p->scanState == ScanRecord::CLOSING_SCAN){
jam();
- if(status == ZFALSE){
+ if(status == 0){
/**
* We have started closing = we sent a close -> ignore this
*/
@@ -9184,11 +9182,11 @@ void Dbtc::execSCAN_FRAGCONF(Signal* signal)
return;
}
- if(status == ZCLOSED && scanptr.p->scanNextFragId < scanptr.p->scanNoFrag){
+ if(noCompletedOps == 0 && status != 0 &&
+ scanptr.p->scanNextFragId < scanptr.p->scanNoFrag){
/**
* Start on next fragment
*/
- ndbrequire(noCompletedOps == 0);
scanFragptr.p->scanFragState = ScanFragRec::WAIT_GET_PRIMCONF;
scanFragptr.p->startFragTimer(ctcTimer);
@@ -9218,6 +9216,7 @@ void Dbtc::execSCAN_FRAGCONF(Signal* signal)
scanptr.p->m_queued_count++;
}
+ scanFragptr.p->m_scan_frag_conf_status = status;
scanFragptr.p->m_ops = noCompletedOps;
scanFragptr.p->m_totalLen = total_len;
scanFragptr.p->scanFragState = ScanFragRec::QUEUED_FOR_DELIVERY;
@@ -9330,11 +9329,12 @@ void Dbtc::execSCAN_NEXTREQ(Signal* signal)
// Copy op ptrs so I dont overwrite them when sending...
memcpy(signal->getDataPtrSend()+25, signal->getDataPtr()+4, 4 * len);
- ScanFragNextReq * nextReq = (ScanFragNextReq*)&signal->theData[0];
- nextReq->closeFlag = ZFALSE;
- nextReq->transId1 = apiConnectptr.p->transid[0];
- nextReq->transId2 = apiConnectptr.p->transid[1];
- nextReq->batch_size_bytes= scanP->batch_byte_size;
+ ScanFragNextReq tmp;
+ tmp.closeFlag = ZFALSE;
+ tmp.transId1 = apiConnectptr.p->transid[0];
+ tmp.transId2 = apiConnectptr.p->transid[1];
+ tmp.batch_size_rows = scanP->batch_size_rows;
+ tmp.batch_size_bytes = scanP->batch_byte_size;
ScanFragList running(c_scan_frag_pool, scanP->m_running_scan_frags);
ScanFragList delivered(c_scan_frag_pool, scanP->m_delivered_scan_frags);
@@ -9344,15 +9344,37 @@ void Dbtc::execSCAN_NEXTREQ(Signal* signal)
c_scan_frag_pool.getPtr(scanFragptr);
ndbrequire(scanFragptr.p->scanFragState == ScanFragRec::DELIVERED);
- scanFragptr.p->scanFragState = ScanFragRec::LQH_ACTIVE;
scanFragptr.p->startFragTimer(ctcTimer);
-
scanFragptr.p->m_ops = 0;
- nextReq->senderData = scanFragptr.i;
- nextReq->batch_size_rows= scanFragptr.p->scanFragConcurrency;
- sendSignal(scanFragptr.p->lqhBlockref, GSN_SCAN_NEXTREQ, signal,
- ScanFragNextReq::SignalLength, JBB);
+ if(scanFragptr.p->m_scan_frag_conf_status)
+ {
+ /**
+ * last scan was complete
+ */
+ jam();
+ ndbrequire(scanptr.p->scanNextFragId < scanptr.p->scanNoFrag);
+ scanFragptr.p->scanFragState = ScanFragRec::WAIT_GET_PRIMCONF;
+
+ tcConnectptr.i = scanptr.p->scanTcrec;
+ ptrCheckGuard(tcConnectptr, ctcConnectFilesize, tcConnectRecord);
+ scanFragptr.p->scanFragId = scanptr.p->scanNextFragId++;
+ signal->theData[0] = tcConnectptr.p->dihConnectptr;
+ signal->theData[1] = scanFragptr.i;
+ signal->theData[2] = scanptr.p->scanTableref;
+ signal->theData[3] = scanFragptr.p->scanFragId;
+ sendSignal(cdihblockref, GSN_DIGETPRIMREQ, signal, 4, JBB);
+ }
+ else
+ {
+ jam();
+ scanFragptr.p->scanFragState = ScanFragRec::LQH_ACTIVE;
+ ScanFragNextReq * req = (ScanFragNextReq*)signal->getDataPtrSend();
+ * req = tmp;
+ req->senderData = scanFragptr.i;
+ sendSignal(scanFragptr.p->lqhBlockref, GSN_SCAN_NEXTREQ, signal,
+ ScanFragNextReq::SignalLength, JBB);
+ }
delivered.remove(scanFragptr);
running.add(scanFragptr);
}//for
@@ -9551,7 +9573,7 @@ void Dbtc::sendScanFragReq(Signal* signal,
req->transId1 = apiConnectptr.p->transid[0];
req->transId2 = apiConnectptr.p->transid[1];
req->clientOpPtr = scanFragP->m_apiPtr;
- req->batch_size_rows= scanFragP->scanFragConcurrency;
+ req->batch_size_rows= scanP->batch_size_rows;
req->batch_size_bytes= scanP->batch_byte_size;
sendSignal(scanFragP->lqhBlockref, GSN_SCAN_FRAGREQ, signal,
ScanFragReq::SignalLength, JBB);
@@ -9573,6 +9595,8 @@ void Dbtc::sendScanTabConf(Signal* signal, ScanRecordPtr scanPtr) {
jam();
ops += 21;
}
+
+ Uint32 left = scanPtr.p->scanNoFrag - scanPtr.p->scanNextFragId;
ScanTabConf * conf = (ScanTabConf*)&signal->theData[0];
conf->apiConnectPtr = apiConnectptr.p->ndbapiConnect;
@@ -9588,24 +9612,25 @@ void Dbtc::sendScanTabConf(Signal* signal, ScanRecordPtr scanPtr) {
ScanFragRecPtr curr = ptr; // Remove while iterating...
queued.next(ptr);
+ bool done = curr.p->m_scan_frag_conf_status && --left;
+
* ops++ = curr.p->m_apiPtr;
- * ops++ = curr.i;
+ * ops++ = done ? RNIL : curr.i;
* ops++ = (curr.p->m_totalLen << 10) + curr.p->m_ops;
queued.remove(curr);
- if(curr.p->m_ops > 0){
+ if(!done){
delivered.add(curr);
curr.p->scanFragState = ScanFragRec::DELIVERED;
curr.p->stopFragTimer();
} else {
- (* --ops) = ScanTabConf::EndOfData; ops++;
c_scan_frag_pool.release(curr);
curr.p->scanFragState = ScanFragRec::COMPLETED;
curr.p->stopFragTimer();
}
}
}
-
+
if(scanPtr.p->m_delivered_scan_frags.isEmpty() &&
scanPtr.p->m_running_scan_frags.isEmpty()){
conf->requestInfo = op_count | ScanTabConf::EndOfData;
@@ -10424,9 +10449,8 @@ Dbtc::execDUMP_STATE_ORD(Signal* signal)
sfp.i,
sfp.p->scanFragState,
sfp.p->scanFragId);
- infoEvent(" nodeid=%d, concurr=%d, timer=%d",
+ infoEvent(" nodeid=%d, timer=%d",
refToNode(sfp.p->lqhBlockref),
- sfp.p->scanFragConcurrency,
sfp.p->scanFragTimer);
}
@@ -10504,7 +10528,7 @@ Dbtc::execDUMP_STATE_ORD(Signal* signal)
sp.p->scanAiLength,
sp.p->scanParallel,
sp.p->scanReceivedOperations,
- sp.p->noOprecPerFrag);
+ sp.p->batch_size_rows);
infoEvent(" schv=%d, tab=%d, sproc=%d",
sp.p->scanSchemaVersion,
sp.p->scanTableref,
diff --git a/ndb/src/kernel/blocks/suma/Suma.cpp b/ndb/src/kernel/blocks/suma/Suma.cpp
index d11d5f7176a..f6d9a0ac35a 100644
--- a/ndb/src/kernel/blocks/suma/Suma.cpp
+++ b/ndb/src/kernel/blocks/suma/Suma.cpp
@@ -1888,7 +1888,7 @@ SumaParticipant::SyncRecord::nextScan(Signal* signal){
req->requestInfo = 0;
req->savePointId = 0;
ScanFragReq::setLockMode(req->requestInfo, 0);
- ScanFragReq::setHoldLockFlag(req->requestInfo, 0);
+ ScanFragReq::setHoldLockFlag(req->requestInfo, 1);
ScanFragReq::setKeyinfoFlag(req->requestInfo, 0);
ScanFragReq::setAttrLen(req->requestInfo, attrLen);
req->fragmentNoKeyLen = fd.m_fragDesc.m_fragmentNo;