diff options
author | unknown <joreland@mysql.com> | 2004-11-18 20:54:35 +0100 |
---|---|---|
committer | unknown <joreland@mysql.com> | 2004-11-18 20:54:35 +0100 |
commit | 8c5a826e4d426a5dff86e0dda51be6317cd26db7 (patch) | |
tree | 244ef9b227521630aff85611b08fe9d3f995a9df /ndb/src/ndbapi/NdbScanOperation.cpp | |
parent | a16a0d7a8c18093395fbcfe2b8bb8071647cd222 (diff) | |
download | mariadb-git-8c5a826e4d426a5dff86e0dda51be6317cd26db7.tar.gz |
wl2077 moved inline to .hpp file
Diffstat (limited to 'ndb/src/ndbapi/NdbScanOperation.cpp')
-rw-r--r-- | ndb/src/ndbapi/NdbScanOperation.cpp | 135 |
1 files changed, 97 insertions, 38 deletions
diff --git a/ndb/src/ndbapi/NdbScanOperation.cpp b/ndb/src/ndbapi/NdbScanOperation.cpp index 373fec1a2b0..30b596d7098 100644 --- a/ndb/src/ndbapi/NdbScanOperation.cpp +++ b/ndb/src/ndbapi/NdbScanOperation.cpp @@ -35,6 +35,8 @@ #include <signaldata/AttrInfo.hpp> #include <signaldata/TcKeyReq.hpp> +#define DEBUG_NEXT_RESULT 0 + NdbScanOperation::NdbScanOperation(Ndb* aNdb) : NdbOperation(aNdb), m_resultSet(0), @@ -275,6 +277,9 @@ NdbScanOperation::fix_receivers(Uint32 parallel){ void NdbScanOperation::receiver_delivered(NdbReceiver* tRec){ if(theError.code == 0){ + if(DEBUG_NEXT_RESULT) + ndbout_c("receiver_delivered"); + Uint32 idx = tRec->m_list_index; Uint32 last = m_sent_receivers_count - 1; if(idx != last){ @@ -298,6 +303,9 @@ NdbScanOperation::receiver_delivered(NdbReceiver* tRec){ void NdbScanOperation::receiver_completed(NdbReceiver* tRec){ if(theError.code == 0){ + if(DEBUG_NEXT_RESULT) + ndbout_c("receiver_completed"); + Uint32 idx = tRec->m_list_index; Uint32 last = m_sent_receivers_count - 1; if(idx != last){ @@ -445,8 +453,6 @@ NdbScanOperation::executeCursor(int nodeId){ return -1; } -#define DEBUG_NEXT_RESULT 0 - int NdbScanOperation::nextResult(bool fetchAllowed) { if(m_ordered) @@ -579,7 +585,8 @@ int NdbScanOperation::nextResult(bool fetchAllowed) int NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag){ - if(cnt > 0 || stopScanFlag){ + if(cnt > 0) + { NdbApiSignal tSignal(theNdb->theMyRef); tSignal.setSignal(GSN_SCAN_NEXTREQ); @@ -595,33 +602,40 @@ NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag){ */ Uint32 last = m_sent_receivers_count; Uint32 * prep_array = (cnt > 21 ? m_prepared_receivers : theData + 4); + Uint32 sent = 0; for(Uint32 i = 0; i<cnt; i++){ NdbReceiver * tRec = m_api_receivers[i]; - m_sent_receivers[last+i] = tRec; - tRec->m_list_index = last+i; - prep_array[i] = tRec->m_tcPtrI; - tRec->prepareSend(); + if((prep_array[sent] = tRec->m_tcPtrI) != RNIL) + { + m_sent_receivers[last+sent] = tRec; + tRec->m_list_index = last+sent; + tRec->prepareSend(); + sent++; + } } memcpy(&m_api_receivers[0], &m_api_receivers[cnt], cnt * sizeof(char*)); - Uint32 nodeId = theNdbCon->theDBnode; - TransporterFacade * tp = TransporterFacade::instance(); - int ret; - if(cnt > 21){ - tSignal.setLength(4); - LinearSectionPtr ptr[3]; - ptr[0].p = prep_array; - ptr[0].sz = cnt; - ret = tp->sendSignal(&tSignal, nodeId, ptr, 1); - } else { - tSignal.setLength(4+cnt); - ret = tp->sendSignal(&tSignal, nodeId); + int ret = 0; + if(sent) + { + Uint32 nodeId = theNdbCon->theDBnode; + TransporterFacade * tp = TransporterFacade::instance(); + if(cnt > 21 && !stopScanFlag){ + tSignal.setLength(4); + LinearSectionPtr ptr[3]; + ptr[0].p = prep_array; + ptr[0].sz = sent; + ret = tp->sendSignal(&tSignal, nodeId, ptr, 1); + } else { + tSignal.setLength(4+(stopScanFlag ? 0 : sent)); + ret = tp->sendSignal(&tSignal, nodeId); + } } - - m_sent_receivers_count = last + cnt + stopScanFlag; + + m_sent_receivers_count = last + sent; m_api_receivers_count -= cnt; m_current_api_receiver = 0; - + return ret; } return 0; @@ -1412,10 +1426,22 @@ NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx){ if(idx == theParallelism) return 0; + NdbReceiver* tRec = m_api_receivers[idx]; NdbApiSignal tSignal(theNdb->theMyRef); tSignal.setSignal(GSN_SCAN_NEXTREQ); + Uint32 last = m_sent_receivers_count; Uint32* theData = tSignal.getDataPtrSend(); + Uint32* prep_array = theData + 4; + + m_current_api_receiver = idx + 1; + if((prep_array[0] = tRec->m_tcPtrI) == RNIL) + { + if(DEBUG_NEXT_RESULT) + ndbout_c("receiver completed, don't send"); + return 0; + } + theData[0] = theNdbCon->theTCConPtr; theData[1] = 0; Uint64 transId = theNdbCon->theTransactionId; @@ -1425,17 +1451,10 @@ NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx){ /** * Prepare ops */ - Uint32 last = m_sent_receivers_count; - Uint32 * prep_array = theData + 4; - - NdbReceiver * tRec = m_api_receivers[idx]; m_sent_receivers[last] = tRec; tRec->m_list_index = last; - prep_array[0] = tRec->m_tcPtrI; tRec->prepareSend(); - m_sent_receivers_count = last + 1; - m_current_api_receiver = idx + 1; Uint32 nodeId = theNdbCon->theDBnode; TransporterFacade * tp = TransporterFacade::instance(); @@ -1448,12 +1467,17 @@ NdbScanOperation::close_impl(TransporterFacade* tp){ Uint32 seq = theNdbCon->theNodeSequence; Uint32 nodeId = theNdbCon->theDBnode; - if(seq != tp->getNodeSequence(nodeId)){ + if(seq != tp->getNodeSequence(nodeId)) + { theNdbCon->theReleaseOnClose = true; return -1; } - while(theError.code == 0 && m_sent_receivers_count){ + /** + * Wait for outstanding + */ + while(theError.code == 0 && m_sent_receivers_count) + { theNdb->theWaiter.m_node = nodeId; theNdb->theWaiter.m_state = WAIT_SCAN; int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT); @@ -1471,18 +1495,52 @@ NdbScanOperation::close_impl(TransporterFacade* tp){ } } - if(m_api_receivers_count+m_conf_receivers_count){ - // Send close scan - if(send_next_scan(0, true) == -1){ // Close scan - theNdbCon->theReleaseOnClose = true; - return -1; - } + /** + * move all conf'ed into api + * so that send_next_scan can check if they needs to be closed + */ + Uint32 api = m_api_receivers_count; + Uint32 conf = m_conf_receivers_count; + + if(m_ordered) + { + /** + * Ordered scan, keep the m_api_receivers "to the right" + */ + memmove(m_api_receivers, m_api_receivers+m_current_api_receiver, + (theParallelism - m_current_api_receiver) * sizeof(char*)); + api = (theParallelism - m_current_api_receiver); + m_api_receivers_count = api; + } + + if(DEBUG_NEXT_RESULT) + ndbout_c("close_impl: [order api conf sent curr parr] %d %d %d %d %d %d", + m_ordered, api, conf, + m_sent_receivers_count, m_current_api_receiver, theParallelism); + + if(api+conf) + { + /** + * There's something to close + * setup m_api_receivers (for send_next_scan) + */ + memcpy(m_api_receivers+api, m_conf_receivers, conf * sizeof(char*)); + m_api_receivers_count = api + conf; + m_conf_receivers_count = 0; + } + + // Send close scan + if(send_next_scan(api+conf, true) == -1) + { + theNdbCon->theReleaseOnClose = true; + return -1; } /** * wait for close scan conf */ - while(m_sent_receivers_count+m_api_receivers_count+m_conf_receivers_count){ + while(m_sent_receivers_count+m_api_receivers_count+m_conf_receivers_count) + { theNdb->theWaiter.m_node = nodeId; theNdb->theWaiter.m_state = WAIT_SCAN; int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT); @@ -1499,6 +1557,7 @@ NdbScanOperation::close_impl(TransporterFacade* tp){ return -1; } } + return 0; } |