diff options
author | unknown <mronstrom@mysql.com> | 2004-08-17 17:55:47 +0200 |
---|---|---|
committer | unknown <mronstrom@mysql.com> | 2004-08-17 17:55:47 +0200 |
commit | f03022b22a6850d74608be22e9748ed009545e61 (patch) | |
tree | b4f59c85706b09d09400927cafbe760a1f14e5bc /ndb/src | |
parent | 6a908c4322a4d63f222e5206f7ab5c3e1a82cb0e (diff) | |
download | mariadb-git-f03022b22a6850d74608be22e9748ed009545e61.tar.gz |
Fix bugs + include check of batch_byte_size and
use of first_batch_size
ndb/include/kernel/ndb_limits.h:
New maximum size
ndb/include/kernel/signaldata/ScanFrag.hpp:
New error code
ndb/include/kernel/signaldata/ScanTab.hpp:
Need to go to Uint16 when batch size > 255
ndb/src/kernel/blocks/dblqh/Dblqh.hpp:
Uint8 => Uint16 when batch_size > 255
New and changed methods for acc ptr's and
checking end of scan batch (incl. check of batch_byte_size
ndb/src/kernel/blocks/dblqh/DblqhMain.cpp:
Uint8 => Uint16 when batch_size > 255
New and changed methods for acc ptr's and
checking end of scan batch (incl. check of batch_byte_size
Diffstat (limited to 'ndb/src')
-rw-r--r-- | ndb/src/kernel/blocks/dblqh/Dblqh.hpp | 19 | ||||
-rw-r--r-- | ndb/src/kernel/blocks/dblqh/DblqhMain.cpp | 104 |
2 files changed, 79 insertions, 44 deletions
diff --git a/ndb/src/kernel/blocks/dblqh/Dblqh.hpp b/ndb/src/kernel/blocks/dblqh/Dblqh.hpp index f153083f79c..882b0ee945d 100644 --- a/ndb/src/kernel/blocks/dblqh/Dblqh.hpp +++ b/ndb/src/kernel/blocks/dblqh/Dblqh.hpp @@ -538,8 +538,10 @@ public: UintR scanApiOpPtr; UintR scanLocalref[2]; Uint32 scan_batch_len; + Uint32 batch_size; Uint32 first_batch_size; Uint32 batch_byte_size; + UintR copyPtr; union { Uint32 nextPool; @@ -569,14 +571,15 @@ public: ScanType scanType; BlockReference scanApiBlockref; NodeId scanNodeId; + Uint16 scanReleaseCounter; + Uint16 scanNumber; + Uint8 scanCompletedStatus; Uint8 scanFlag; Uint8 scanLockHold; Uint8 scanLockMode; Uint8 readCommitted; Uint8 rangeScan; - Uint8 scanNumber; - Uint8 scanReleaseCounter; Uint8 scanTcWaiting; Uint8 scanKeyinfoFlag; }; // Size 272 bytes @@ -2225,9 +2228,10 @@ private: void init_acc_ptr_list(ScanRecord*); bool seize_acc_ptr_list(ScanRecord*, Uint32); void release_acc_ptr_list(ScanRecord*); - Uint32 get_acc_ptr_from_scan_record(ScanRecord*, Uint32); + Uint32 get_acc_ptr_from_scan_record(ScanRecord*, Uint32, bool); void set_acc_ptr_in_scan_record(ScanRecord*, Uint32, Uint32); - void get_acc_ptr(ScanRecord*, Uint32*, Uint32); + void i_get_acc_ptr(ScanRecord*, Uint32*&, Uint32); + bool check_scan_batch_completed(ScanRecord*); void removeTable(Uint32 tableId); void sendLCP_COMPLETE_REP(Signal* signal, Uint32 lcpId); @@ -2926,4 +2930,11 @@ public: DLHashTable<ScanRecord> c_scanTakeOverHash; }; +inline +bool +Dblqh::check_scan_batch_completed(ScanRecord* scanP) +{ + return (scanP->scanCompletedOperations == scanP->scanConcurrentOperations) || + (scanP->scan_batch_len >= scanP->batch_byte_size); +} #endif diff --git a/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp b/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp index 2394695d769..87a6eebda6f 100644 --- a/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp +++ b/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp @@ -3581,7 +3581,9 @@ void Dblqh::prepareContinueAfterBlockedLab(Signal* signal) takeOverErrorLab(signal); return; }//if - Uint32 accOpPtr= get_acc_ptr_from_scan_record(scanptr.p, ttcScanOp); + Uint32 accOpPtr= get_acc_ptr_from_scan_record(scanptr.p, + ttcScanOp, + true); if (accOpPtr == RNIL) { jam(); releaseActiveFrag(signal); @@ -7023,7 +7025,9 @@ void Dblqh::continueScanReleaseAfterBlockedLab(Signal* signal) scanptr.p->scanState = ScanRecord::WAIT_RELEASE_LOCK; signal->theData[0] = scanptr.p->scanAccPtr; signal->theData[1]= - get_acc_ptr_from_scan_record(scanptr.p, scanptr.p->scanReleaseCounter -1); + get_acc_ptr_from_scan_record(scanptr.p, + scanptr.p->scanReleaseCounter -1, + false); signal->theData[2] = NextScanReq::ZSCAN_COMMIT; if (! scanptr.p->rangeScan) sendSignal(tcConnectptr.p->tcAccBlockref, GSN_NEXT_SCANREQ, signal, 3, JBB); @@ -7181,8 +7185,7 @@ void Dblqh::scanLockReleasedLab(Signal* signal) (scanptr.p->scanCompletedStatus == ZTRUE)) { jam(); closeScanLab(signal); - } else if ((scanptr.p->scanConcurrentOperations == - scanptr.p->scanCompletedOperations) && + } else if (check_scan_batch_completed(scanptr.p) && scanptr.p->scanLockHold != ZTRUE) { jam(); scanptr.p->scanState = ScanRecord::WAIT_SCAN_NEXTREQ; @@ -7279,31 +7282,36 @@ Dblqh::init_acc_ptr_list(ScanRecord* scanP) inline void -Dblqh::get_acc_ptr(ScanRecord* scanP, Uint32 *acc_ptr, Uint32 index) +Dblqh::i_get_acc_ptr(ScanRecord* scanP, Uint32* &acc_ptr, Uint32 index) { if (index == 0) { - jam(); - acc_ptr= &scanP->scan_acc_op_ptr[0]; + acc_ptr= (Uint32*)&scanP->scan_acc_op_ptr[0]; } else { Uint32 attr_buf_index, attr_buf_rec; + AttrbufPtr regAttrPtr; jam(); - attr_buf_rec= (index + 30) / 32; + attr_buf_rec= (index + 31) / 32; attr_buf_index= (index - 1) & 31; regAttrPtr.i= scanP->scan_acc_op_ptr[attr_buf_rec]; ptrCheckGuard(regAttrPtr, cattrinbufFileSize, attrbuf); - acc_ptr= ®AttrPtr.p->attrbuf[attr_buf_index]; + acc_ptr= (Uint32*)®AttrPtr.p->attrbuf[attr_buf_index]; } } Uint32 -Dblqh::get_acc_ptr_from_scan_record(ScanRecord* scanP, Uint32 index) +Dblqh::get_acc_ptr_from_scan_record(ScanRecord* scanP, + Uint32 index, + bool crash_flag) { - Uint32 *acc_ptr; + Uint32* acc_ptr; Uint32 attr_buf_rec, attr_buf_index; - ndbrequire((index < MAX_PARALLEL_OP_PER_SCAN) && - index < scanP->scan_acc_index); - get_acc_ptr(scanP, acc_ptr, index); + if (!((index < MAX_PARALLEL_OP_PER_SCAN) && + index < scanP->scan_acc_index)) { + ndbrequire(crash_flag); + return RNIL; + } + i_get_acc_ptr(scanP, acc_ptr, index); return *acc_ptr; } @@ -7315,7 +7323,7 @@ Dblqh::set_acc_ptr_in_scan_record(ScanRecord* scanP, ndbrequire((index == 0 || scanP->scan_acc_index == index) && (index < MAX_PARALLEL_OP_PER_SCAN)); scanP->scan_acc_index= index + 1; - get_acc_ptr(scanP, acc_ptr, index); + i_get_acc_ptr(scanP, acc_ptr, index); *acc_ptr= acc; } @@ -7882,10 +7890,11 @@ void Dblqh::nextScanConfScanLab(Signal* signal) GSN_ACC_CHECK_SCAN, signal, 2, JBB); return; }//if - + jam(); set_acc_ptr_in_scan_record(scanptr.p, scanptr.p->scanCompletedOperations, nextScanConf->accOperationPtr); + jam(); scanptr.p->scanLocalref[0] = nextScanConf->localKey[0]; scanptr.p->scanLocalref[1] = nextScanConf->localKey[1]; scanptr.p->scanLocalFragid = nextScanConf->fragId; @@ -7904,6 +7913,7 @@ void Dblqh::nextScanConfScanLab(Signal* signal) return; }//if }//if + jam(); nextScanConfLoopLab(signal); }//Dblqh::nextScanConfScanLab() @@ -7926,7 +7936,7 @@ void Dblqh::nextScanConfLoopLab(Signal* signal) closeScanLab(signal); return; }//if - + jam(); Uint32 tableRef; Uint32 tupFragPtr; Uint32 reqinfo = (scanptr.p->scanLockHold == ZFALSE); @@ -7960,6 +7970,7 @@ void Dblqh::nextScanConfLoopLab(Signal* signal) }//if } { + jam(); TupKeyReq * const tupKeyReq = (TupKeyReq *)signal->getDataPtrSend(); tupKeyReq->connectPtr = tcConnectptr.p->tupConnectrec; @@ -8062,26 +8073,27 @@ void Dblqh::scanTupkeyConfLab(Signal* signal) ndbrequire(scanptr.p->scanCompletedOperations < MAX_PARALLEL_OP_PER_SCAN); scanptr.p->scan_batch_len+= tdata4; scanptr.p->scanCompletedOperations++; - if ((scanptr.p->scanCompletedOperations == - scanptr.p->scanConcurrentOperations) && - (scanptr.p->scanLockHold == ZTRUE)) { - jam(); - scanptr.p->scanState = ScanRecord::WAIT_SCAN_NEXTREQ; - sendScanFragConf(signal, ZFALSE); - return; - } else if (scanptr.p->scanCompletedOperations == - scanptr.p->scanConcurrentOperations) { - jam(); - scanptr.p->scanReleaseCounter = scanptr.p->scanCompletedOperations; - scanReleaseLocksLab(signal); - return; - } else if (scanptr.p->scanLockHold == ZTRUE) { - jam(); - scanptr.p->scanFlag = NextScanReq::ZSCAN_NEXT; + if (check_scan_batch_completed(scanptr.p)) { + if (scanptr.p->scanLockHold == ZTRUE) { + jam(); + scanptr.p->scanState = ScanRecord::WAIT_SCAN_NEXTREQ; + sendScanFragConf(signal, ZFALSE); + return; + } else { + jam(); + scanptr.p->scanReleaseCounter = scanptr.p->scanCompletedOperations; + scanReleaseLocksLab(signal); + return; + } } else { - jam(); - scanptr.p->scanFlag = NextScanReq::ZSCAN_NEXT_COMMIT; - }//if + if (scanptr.p->scanLockHold == ZTRUE) { + jam(); + scanptr.p->scanFlag = NextScanReq::ZSCAN_NEXT; + } else { + jam(); + scanptr.p->scanFlag = NextScanReq::ZSCAN_NEXT_COMMIT; + } + } scanNextLoopLab(signal); }//Dblqh::scanTupkeyConfLab() @@ -8123,11 +8135,13 @@ void Dblqh::continueScanAfterBlockedLab(Signal* signal) jam(); scanptr.p->scanFlag = NextScanReq::ZSCAN_NEXT_COMMIT; accOpPtr= get_acc_ptr_from_scan_record(scanptr.p, - scanptr.p->scanCompletedOperations); + scanptr.p->scanCompletedOperations, + false); } else if (scanptr.p->scanFlag == NextScanReq::ZSCAN_NEXT_COMMIT) { jam(); accOpPtr= get_acc_ptr_from_scan_record(scanptr.p, - scanptr.p->scanCompletedOperations); + scanptr.p->scanCompletedOperations, + false); } else { jam(); accOpPtr = RNIL; // The value is not used in ACC @@ -8355,7 +8369,8 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq) scanptr.p->scanSchemaVersion = scanFragReq->schemaVersion; scanptr.p->scanCompletedOperations = 0; scanptr.p->scan_batch_len= 0; - scanptr.p->scanConcurrentOperations = scanConcurrentOperations; + scanptr.p->scanConcurrentOperations = scanFragReq->first_batch_size; + scanptr.p->batch_size= scanConcurrentOperations; scanptr.p->batch_byte_size= scanFragReq->batch_byte_size; scanptr.p->first_batch_size= scanFragReq->first_batch_size; scanptr.p->scanErrorCounter = 0; @@ -8371,6 +8386,14 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq) scanptr.p->scanNumber = ~0; scanptr.p->scanApiOpPtr = scanFragReq->clientOpPtr; + ndbout << "batch_size = " << scanptr.p->batch_size; + ndbout << " first_batch_size = " << scanptr.p->scanConcurrentOperations; + ndbout << endl; + if ((scanptr.p->scanConcurrentOperations == 0) || + (scanptr.p->scanConcurrentOperations > scanptr.p->batch_size)) { + jam(); + return ScanFragRef::ZWRONG_BATCH_SIZE; + } if (!seize_acc_ptr_list(scanptr.p, scanConcurrentOperations)) { jam(); return ScanFragRef::ZTOO_MANY_ACTIVE_SCAN_ERROR; @@ -8693,6 +8716,7 @@ void Dblqh::sendScanFragConf(Signal* signal, Uint32 scanCompleted) { Uint32 completed_ops= scanptr.p->scanCompletedOperations; Uint32 total_len= scanptr.p->scan_batch_len; + scanptr.p->scanConcurrentOperations= scanptr.p->batch_size; scanptr.p->scanTcWaiting = ZFALSE; if(ERROR_INSERTED(5037)){ @@ -9276,7 +9300,7 @@ void Dblqh::continueCopyAfterBlockedLab(Signal* signal) scanptr.i = tcConnectptr.p->tcScanRec; c_scanRecordPool.getPtr(scanptr); tcConnectptr.p->errorCode = 0; - Uint32 acc_op_ptr= get_acc_ptr_from_scan_record(scanptr.p, 0); + Uint32 acc_op_ptr= get_acc_ptr_from_scan_record(scanptr.p, 0, false); signal->theData[0] = scanptr.p->scanAccPtr; signal->theData[1] = acc_op_ptr; signal->theData[2] = NextScanReq::ZSCAN_NEXT_COMMIT; |