summaryrefslogtreecommitdiff
path: root/ndb/src
diff options
context:
space:
mode:
authorunknown <mronstrom@mysql.com>2004-08-17 17:55:47 +0200
committerunknown <mronstrom@mysql.com>2004-08-17 17:55:47 +0200
commitf03022b22a6850d74608be22e9748ed009545e61 (patch)
treeb4f59c85706b09d09400927cafbe760a1f14e5bc /ndb/src
parent6a908c4322a4d63f222e5206f7ab5c3e1a82cb0e (diff)
downloadmariadb-git-f03022b22a6850d74608be22e9748ed009545e61.tar.gz
Fix bugs + include check of batch_byte_size and
use of first_batch_size ndb/include/kernel/ndb_limits.h: New maximum size ndb/include/kernel/signaldata/ScanFrag.hpp: New error code ndb/include/kernel/signaldata/ScanTab.hpp: Need to go to Uint16 when batch size > 255 ndb/src/kernel/blocks/dblqh/Dblqh.hpp: Uint8 => Uint16 when batch_size > 255 New and changed methods for acc ptr's and checking end of scan batch (incl. check of batch_byte_size ndb/src/kernel/blocks/dblqh/DblqhMain.cpp: Uint8 => Uint16 when batch_size > 255 New and changed methods for acc ptr's and checking end of scan batch (incl. check of batch_byte_size
Diffstat (limited to 'ndb/src')
-rw-r--r--ndb/src/kernel/blocks/dblqh/Dblqh.hpp19
-rw-r--r--ndb/src/kernel/blocks/dblqh/DblqhMain.cpp104
2 files changed, 79 insertions, 44 deletions
diff --git a/ndb/src/kernel/blocks/dblqh/Dblqh.hpp b/ndb/src/kernel/blocks/dblqh/Dblqh.hpp
index f153083f79c..882b0ee945d 100644
--- a/ndb/src/kernel/blocks/dblqh/Dblqh.hpp
+++ b/ndb/src/kernel/blocks/dblqh/Dblqh.hpp
@@ -538,8 +538,10 @@ public:
UintR scanApiOpPtr;
UintR scanLocalref[2];
Uint32 scan_batch_len;
+ Uint32 batch_size;
Uint32 first_batch_size;
Uint32 batch_byte_size;
+
UintR copyPtr;
union {
Uint32 nextPool;
@@ -569,14 +571,15 @@ public:
ScanType scanType;
BlockReference scanApiBlockref;
NodeId scanNodeId;
+ Uint16 scanReleaseCounter;
+ Uint16 scanNumber;
+
Uint8 scanCompletedStatus;
Uint8 scanFlag;
Uint8 scanLockHold;
Uint8 scanLockMode;
Uint8 readCommitted;
Uint8 rangeScan;
- Uint8 scanNumber;
- Uint8 scanReleaseCounter;
Uint8 scanTcWaiting;
Uint8 scanKeyinfoFlag;
}; // Size 272 bytes
@@ -2225,9 +2228,10 @@ private:
void init_acc_ptr_list(ScanRecord*);
bool seize_acc_ptr_list(ScanRecord*, Uint32);
void release_acc_ptr_list(ScanRecord*);
- Uint32 get_acc_ptr_from_scan_record(ScanRecord*, Uint32);
+ Uint32 get_acc_ptr_from_scan_record(ScanRecord*, Uint32, bool);
void set_acc_ptr_in_scan_record(ScanRecord*, Uint32, Uint32);
- void get_acc_ptr(ScanRecord*, Uint32*, Uint32);
+ void i_get_acc_ptr(ScanRecord*, Uint32*&, Uint32);
+ bool check_scan_batch_completed(ScanRecord*);
void removeTable(Uint32 tableId);
void sendLCP_COMPLETE_REP(Signal* signal, Uint32 lcpId);
@@ -2926,4 +2930,11 @@ public:
DLHashTable<ScanRecord> c_scanTakeOverHash;
};
+inline
+bool
+Dblqh::check_scan_batch_completed(ScanRecord* scanP)
+{
+ return (scanP->scanCompletedOperations == scanP->scanConcurrentOperations) ||
+ (scanP->scan_batch_len >= scanP->batch_byte_size);
+}
#endif
diff --git a/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp b/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
index 2394695d769..87a6eebda6f 100644
--- a/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
+++ b/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
@@ -3581,7 +3581,9 @@ void Dblqh::prepareContinueAfterBlockedLab(Signal* signal)
takeOverErrorLab(signal);
return;
}//if
- Uint32 accOpPtr= get_acc_ptr_from_scan_record(scanptr.p, ttcScanOp);
+ Uint32 accOpPtr= get_acc_ptr_from_scan_record(scanptr.p,
+ ttcScanOp,
+ true);
if (accOpPtr == RNIL) {
jam();
releaseActiveFrag(signal);
@@ -7023,7 +7025,9 @@ void Dblqh::continueScanReleaseAfterBlockedLab(Signal* signal)
scanptr.p->scanState = ScanRecord::WAIT_RELEASE_LOCK;
signal->theData[0] = scanptr.p->scanAccPtr;
signal->theData[1]=
- get_acc_ptr_from_scan_record(scanptr.p, scanptr.p->scanReleaseCounter -1);
+ get_acc_ptr_from_scan_record(scanptr.p,
+ scanptr.p->scanReleaseCounter -1,
+ false);
signal->theData[2] = NextScanReq::ZSCAN_COMMIT;
if (! scanptr.p->rangeScan)
sendSignal(tcConnectptr.p->tcAccBlockref, GSN_NEXT_SCANREQ, signal, 3, JBB);
@@ -7181,8 +7185,7 @@ void Dblqh::scanLockReleasedLab(Signal* signal)
(scanptr.p->scanCompletedStatus == ZTRUE)) {
jam();
closeScanLab(signal);
- } else if ((scanptr.p->scanConcurrentOperations ==
- scanptr.p->scanCompletedOperations) &&
+ } else if (check_scan_batch_completed(scanptr.p) &&
scanptr.p->scanLockHold != ZTRUE) {
jam();
scanptr.p->scanState = ScanRecord::WAIT_SCAN_NEXTREQ;
@@ -7279,31 +7282,36 @@ Dblqh::init_acc_ptr_list(ScanRecord* scanP)
inline
void
-Dblqh::get_acc_ptr(ScanRecord* scanP, Uint32 *acc_ptr, Uint32 index)
+Dblqh::i_get_acc_ptr(ScanRecord* scanP, Uint32* &acc_ptr, Uint32 index)
{
if (index == 0) {
- jam();
- acc_ptr= &scanP->scan_acc_op_ptr[0];
+ acc_ptr= (Uint32*)&scanP->scan_acc_op_ptr[0];
} else {
Uint32 attr_buf_index, attr_buf_rec;
+
AttrbufPtr regAttrPtr;
jam();
- attr_buf_rec= (index + 30) / 32;
+ attr_buf_rec= (index + 31) / 32;
attr_buf_index= (index - 1) & 31;
regAttrPtr.i= scanP->scan_acc_op_ptr[attr_buf_rec];
ptrCheckGuard(regAttrPtr, cattrinbufFileSize, attrbuf);
- acc_ptr= &regAttrPtr.p->attrbuf[attr_buf_index];
+ acc_ptr= (Uint32*)&regAttrPtr.p->attrbuf[attr_buf_index];
}
}
Uint32
-Dblqh::get_acc_ptr_from_scan_record(ScanRecord* scanP, Uint32 index)
+Dblqh::get_acc_ptr_from_scan_record(ScanRecord* scanP,
+ Uint32 index,
+ bool crash_flag)
{
- Uint32 *acc_ptr;
+ Uint32* acc_ptr;
Uint32 attr_buf_rec, attr_buf_index;
- ndbrequire((index < MAX_PARALLEL_OP_PER_SCAN) &&
- index < scanP->scan_acc_index);
- get_acc_ptr(scanP, acc_ptr, index);
+ if (!((index < MAX_PARALLEL_OP_PER_SCAN) &&
+ index < scanP->scan_acc_index)) {
+ ndbrequire(crash_flag);
+ return RNIL;
+ }
+ i_get_acc_ptr(scanP, acc_ptr, index);
return *acc_ptr;
}
@@ -7315,7 +7323,7 @@ Dblqh::set_acc_ptr_in_scan_record(ScanRecord* scanP,
ndbrequire((index == 0 || scanP->scan_acc_index == index) &&
(index < MAX_PARALLEL_OP_PER_SCAN));
scanP->scan_acc_index= index + 1;
- get_acc_ptr(scanP, acc_ptr, index);
+ i_get_acc_ptr(scanP, acc_ptr, index);
*acc_ptr= acc;
}
@@ -7882,10 +7890,11 @@ void Dblqh::nextScanConfScanLab(Signal* signal)
GSN_ACC_CHECK_SCAN, signal, 2, JBB);
return;
}//if
-
+ jam();
set_acc_ptr_in_scan_record(scanptr.p,
scanptr.p->scanCompletedOperations,
nextScanConf->accOperationPtr);
+ jam();
scanptr.p->scanLocalref[0] = nextScanConf->localKey[0];
scanptr.p->scanLocalref[1] = nextScanConf->localKey[1];
scanptr.p->scanLocalFragid = nextScanConf->fragId;
@@ -7904,6 +7913,7 @@ void Dblqh::nextScanConfScanLab(Signal* signal)
return;
}//if
}//if
+ jam();
nextScanConfLoopLab(signal);
}//Dblqh::nextScanConfScanLab()
@@ -7926,7 +7936,7 @@ void Dblqh::nextScanConfLoopLab(Signal* signal)
closeScanLab(signal);
return;
}//if
-
+ jam();
Uint32 tableRef;
Uint32 tupFragPtr;
Uint32 reqinfo = (scanptr.p->scanLockHold == ZFALSE);
@@ -7960,6 +7970,7 @@ void Dblqh::nextScanConfLoopLab(Signal* signal)
}//if
}
{
+ jam();
TupKeyReq * const tupKeyReq = (TupKeyReq *)signal->getDataPtrSend();
tupKeyReq->connectPtr = tcConnectptr.p->tupConnectrec;
@@ -8062,26 +8073,27 @@ void Dblqh::scanTupkeyConfLab(Signal* signal)
ndbrequire(scanptr.p->scanCompletedOperations < MAX_PARALLEL_OP_PER_SCAN);
scanptr.p->scan_batch_len+= tdata4;
scanptr.p->scanCompletedOperations++;
- if ((scanptr.p->scanCompletedOperations ==
- scanptr.p->scanConcurrentOperations) &&
- (scanptr.p->scanLockHold == ZTRUE)) {
- jam();
- scanptr.p->scanState = ScanRecord::WAIT_SCAN_NEXTREQ;
- sendScanFragConf(signal, ZFALSE);
- return;
- } else if (scanptr.p->scanCompletedOperations ==
- scanptr.p->scanConcurrentOperations) {
- jam();
- scanptr.p->scanReleaseCounter = scanptr.p->scanCompletedOperations;
- scanReleaseLocksLab(signal);
- return;
- } else if (scanptr.p->scanLockHold == ZTRUE) {
- jam();
- scanptr.p->scanFlag = NextScanReq::ZSCAN_NEXT;
+ if (check_scan_batch_completed(scanptr.p)) {
+ if (scanptr.p->scanLockHold == ZTRUE) {
+ jam();
+ scanptr.p->scanState = ScanRecord::WAIT_SCAN_NEXTREQ;
+ sendScanFragConf(signal, ZFALSE);
+ return;
+ } else {
+ jam();
+ scanptr.p->scanReleaseCounter = scanptr.p->scanCompletedOperations;
+ scanReleaseLocksLab(signal);
+ return;
+ }
} else {
- jam();
- scanptr.p->scanFlag = NextScanReq::ZSCAN_NEXT_COMMIT;
- }//if
+ if (scanptr.p->scanLockHold == ZTRUE) {
+ jam();
+ scanptr.p->scanFlag = NextScanReq::ZSCAN_NEXT;
+ } else {
+ jam();
+ scanptr.p->scanFlag = NextScanReq::ZSCAN_NEXT_COMMIT;
+ }
+ }
scanNextLoopLab(signal);
}//Dblqh::scanTupkeyConfLab()
@@ -8123,11 +8135,13 @@ void Dblqh::continueScanAfterBlockedLab(Signal* signal)
jam();
scanptr.p->scanFlag = NextScanReq::ZSCAN_NEXT_COMMIT;
accOpPtr= get_acc_ptr_from_scan_record(scanptr.p,
- scanptr.p->scanCompletedOperations);
+ scanptr.p->scanCompletedOperations,
+ false);
} else if (scanptr.p->scanFlag == NextScanReq::ZSCAN_NEXT_COMMIT) {
jam();
accOpPtr= get_acc_ptr_from_scan_record(scanptr.p,
- scanptr.p->scanCompletedOperations);
+ scanptr.p->scanCompletedOperations,
+ false);
} else {
jam();
accOpPtr = RNIL; // The value is not used in ACC
@@ -8355,7 +8369,8 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq)
scanptr.p->scanSchemaVersion = scanFragReq->schemaVersion;
scanptr.p->scanCompletedOperations = 0;
scanptr.p->scan_batch_len= 0;
- scanptr.p->scanConcurrentOperations = scanConcurrentOperations;
+ scanptr.p->scanConcurrentOperations = scanFragReq->first_batch_size;
+ scanptr.p->batch_size= scanConcurrentOperations;
scanptr.p->batch_byte_size= scanFragReq->batch_byte_size;
scanptr.p->first_batch_size= scanFragReq->first_batch_size;
scanptr.p->scanErrorCounter = 0;
@@ -8371,6 +8386,14 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq)
scanptr.p->scanNumber = ~0;
scanptr.p->scanApiOpPtr = scanFragReq->clientOpPtr;
+ ndbout << "batch_size = " << scanptr.p->batch_size;
+ ndbout << " first_batch_size = " << scanptr.p->scanConcurrentOperations;
+ ndbout << endl;
+ if ((scanptr.p->scanConcurrentOperations == 0) ||
+ (scanptr.p->scanConcurrentOperations > scanptr.p->batch_size)) {
+ jam();
+ return ScanFragRef::ZWRONG_BATCH_SIZE;
+ }
if (!seize_acc_ptr_list(scanptr.p, scanConcurrentOperations)) {
jam();
return ScanFragRef::ZTOO_MANY_ACTIVE_SCAN_ERROR;
@@ -8693,6 +8716,7 @@ void Dblqh::sendScanFragConf(Signal* signal, Uint32 scanCompleted)
{
Uint32 completed_ops= scanptr.p->scanCompletedOperations;
Uint32 total_len= scanptr.p->scan_batch_len;
+ scanptr.p->scanConcurrentOperations= scanptr.p->batch_size;
scanptr.p->scanTcWaiting = ZFALSE;
if(ERROR_INSERTED(5037)){
@@ -9276,7 +9300,7 @@ void Dblqh::continueCopyAfterBlockedLab(Signal* signal)
scanptr.i = tcConnectptr.p->tcScanRec;
c_scanRecordPool.getPtr(scanptr);
tcConnectptr.p->errorCode = 0;
- Uint32 acc_op_ptr= get_acc_ptr_from_scan_record(scanptr.p, 0);
+ Uint32 acc_op_ptr= get_acc_ptr_from_scan_record(scanptr.p, 0, false);
signal->theData[0] = scanptr.p->scanAccPtr;
signal->theData[1] = acc_op_ptr;
signal->theData[2] = NextScanReq::ZSCAN_NEXT_COMMIT;