diff options
Diffstat (limited to 'storage/ndb/src/ndbapi/NdbScanOperation.cpp')
-rw-r--r-- | storage/ndb/src/ndbapi/NdbScanOperation.cpp | 1669 |
1 files changed, 1669 insertions, 0 deletions
diff --git a/storage/ndb/src/ndbapi/NdbScanOperation.cpp b/storage/ndb/src/ndbapi/NdbScanOperation.cpp new file mode 100644 index 00000000000..9cd78ec721b --- /dev/null +++ b/storage/ndb/src/ndbapi/NdbScanOperation.cpp @@ -0,0 +1,1669 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#include <ndb_global.h> +#include <Ndb.hpp> +#include <NdbScanOperation.hpp> +#include <NdbIndexScanOperation.hpp> +#include <NdbTransaction.hpp> +#include "NdbApiSignal.hpp" +#include <NdbOut.hpp> +#include "NdbDictionaryImpl.hpp" +#include <NdbBlob.hpp> + +#include <NdbRecAttr.hpp> +#include <NdbReceiver.hpp> + +#include <stdlib.h> +#include <NdbSqlUtil.hpp> + +#include <signaldata/ScanTab.hpp> +#include <signaldata/KeyInfo.hpp> +#include <signaldata/AttrInfo.hpp> +#include <signaldata/TcKeyReq.hpp> + +#define DEBUG_NEXT_RESULT 0 + +NdbScanOperation::NdbScanOperation(Ndb* aNdb) : + NdbOperation(aNdb), + m_transConnection(NULL) +{ + theParallelism = 0; + m_allocated_receivers = 0; + m_prepared_receivers = 0; + m_api_receivers = 0; + m_conf_receivers = 0; + m_sent_receivers = 0; + m_receivers = 0; + m_array = new Uint32[1]; // skip if on delete in fix_receivers + theSCAN_TABREQ = 0; +} + +NdbScanOperation::~NdbScanOperation() +{ + for(Uint32 i = 0; i<m_allocated_receivers; i++){ + m_receivers[i]->release(); + theNdb->releaseNdbScanRec(m_receivers[i]); + } + delete[] m_array; +} + +void +NdbScanOperation::setErrorCode(int aErrorCode){ + NdbTransaction* tmp = theNdbCon; + theNdbCon = m_transConnection; + NdbOperation::setErrorCode(aErrorCode); + theNdbCon = tmp; +} + +void +NdbScanOperation::setErrorCodeAbort(int aErrorCode){ + NdbTransaction* tmp = theNdbCon; + theNdbCon = m_transConnection; + NdbOperation::setErrorCodeAbort(aErrorCode); + theNdbCon = tmp; +} + + +/***************************************************************************** + * int init(); + * + * Return Value: Return 0 : init was successful. + * Return -1: In all other case. + * Remark: Initiates operation record after allocation. + *****************************************************************************/ +int +NdbScanOperation::init(const NdbTableImpl* tab, NdbTransaction* myConnection) +{ + m_transConnection = myConnection; + //NdbTransaction* aScanConnection = theNdb->startTransaction(myConnection); + NdbTransaction* aScanConnection = theNdb->hupp(myConnection); + if (!aScanConnection){ + setErrorCodeAbort(theNdb->getNdbError().code); + return -1; + } + + // NOTE! The hupped trans becomes the owner of the operation + if(NdbOperation::init(tab, aScanConnection) != 0){ + return -1; + } + + initInterpreter(); + + theStatus = GetValue; + theOperationType = OpenScanRequest; + theNdbCon->theMagicNumber = 0xFE11DF; + theNoOfTupKeyLeft = tab->m_noOfDistributionKeys; + m_read_range_no = 0; + return 0; +} + +int +NdbScanOperation::readTuples(NdbScanOperation::LockMode lm, + Uint32 batch, + Uint32 parallel) +{ + m_ordered = m_descending = false; + Uint32 fragCount = m_currentTable->m_fragmentCount; + + if (parallel > fragCount || parallel == 0) { + parallel = fragCount; + } + + // It is only possible to call openScan if + // 1. this transcation don't already contain another scan operation + // 2. this transaction don't already contain other operations + // 3. theScanOp contains a NdbScanOperation + if (theNdbCon->theScanningOp != NULL){ + setErrorCode(4605); + return -1; + } + + theNdbCon->theScanningOp = this; + theLockMode = lm; + + bool lockExcl, lockHoldMode, readCommitted; + switch(lm){ + case NdbScanOperation::LM_Read: + lockExcl = false; + lockHoldMode = true; + readCommitted = false; + break; + case NdbScanOperation::LM_Exclusive: + lockExcl = true; + lockHoldMode = true; + readCommitted = false; + break; + case NdbScanOperation::LM_CommittedRead: + lockExcl = false; + lockHoldMode = false; + readCommitted = true; + break; + default: + setErrorCode(4003); + return -1; + } + + m_keyInfo = lockExcl ? 1 : 0; + + bool range = false; + if (m_accessTable->m_indexType == NdbDictionary::Index::OrderedIndex) + { + if (m_currentTable == m_accessTable){ + // Old way of scanning indexes, should not be allowed + m_currentTable = theNdb->theDictionary-> + getTable(m_currentTable->m_primaryTable.c_str()); + assert(m_currentTable != NULL); + } + assert (m_currentTable != m_accessTable); + // Modify operation state + theStatus = GetValue; + theOperationType = OpenRangeScanRequest; + range = true; + } + + theParallelism = parallel; + + if(fix_receivers(parallel) == -1){ + setErrorCodeAbort(4000); + return -1; + } + + theSCAN_TABREQ = (!theSCAN_TABREQ ? theNdb->getSignal() : theSCAN_TABREQ); + if (theSCAN_TABREQ == NULL) { + setErrorCodeAbort(4000); + return -1; + }//if + + theSCAN_TABREQ->setSignal(GSN_SCAN_TABREQ); + ScanTabReq * req = CAST_PTR(ScanTabReq, theSCAN_TABREQ->getDataPtrSend()); + req->apiConnectPtr = theNdbCon->theTCConPtr; + req->tableId = m_accessTable->m_tableId; + req->tableSchemaVersion = m_accessTable->m_version; + req->storedProcId = 0xFFFF; + req->buddyConPtr = theNdbCon->theBuddyConPtr; + + Uint32 reqInfo = 0; + ScanTabReq::setParallelism(reqInfo, parallel); + ScanTabReq::setScanBatch(reqInfo, 0); + ScanTabReq::setLockMode(reqInfo, lockExcl); + ScanTabReq::setHoldLockFlag(reqInfo, lockHoldMode); + ScanTabReq::setReadCommittedFlag(reqInfo, readCommitted); + ScanTabReq::setRangeScanFlag(reqInfo, range); + req->requestInfo = reqInfo; + + Uint64 transId = theNdbCon->getTransactionId(); + req->transId1 = (Uint32) transId; + req->transId2 = (Uint32) (transId >> 32); + + NdbApiSignal* tSignal = theSCAN_TABREQ->next(); + if(!tSignal) + { + theSCAN_TABREQ->next(tSignal = theNdb->getSignal()); + } + theLastKEYINFO = tSignal; + + tSignal->setSignal(GSN_KEYINFO); + theKEYINFOptr = ((KeyInfo*)tSignal->getDataPtrSend())->keyData; + theTotalNrOfKeyWordInSignal= 0; + + getFirstATTRINFOScan(); + return 0; +} + +int +NdbScanOperation::fix_receivers(Uint32 parallel){ + assert(parallel > 0); + if(parallel > m_allocated_receivers){ + const Uint32 sz = parallel * (4*sizeof(char*)+sizeof(Uint32)); + + Uint64 * tmp = new Uint64[(sz+7)/8]; + // Save old receivers + memcpy(tmp, m_receivers, m_allocated_receivers*sizeof(char*)); + delete[] m_array; + m_array = (Uint32*)tmp; + + m_receivers = (NdbReceiver**)tmp; + m_api_receivers = m_receivers + parallel; + m_conf_receivers = m_api_receivers + parallel; + m_sent_receivers = m_conf_receivers + parallel; + m_prepared_receivers = (Uint32*)(m_sent_receivers + parallel); + + // Only get/init "new" receivers + NdbReceiver* tScanRec; + for (Uint32 i = m_allocated_receivers; i < parallel; i ++) { + tScanRec = theNdb->getNdbScanRec(); + if (tScanRec == NULL) { + setErrorCodeAbort(4000); + return -1; + }//if + m_receivers[i] = tScanRec; + tScanRec->init(NdbReceiver::NDB_SCANRECEIVER, this); + } + m_allocated_receivers = parallel; + } + + reset_receivers(parallel, 0); + return 0; +} + +/** + * Move receiver from send array to conf:ed array + */ +void +NdbScanOperation::receiver_delivered(NdbReceiver* tRec){ + if(theError.code == 0){ + if(DEBUG_NEXT_RESULT) + ndbout_c("receiver_delivered"); + + Uint32 idx = tRec->m_list_index; + Uint32 last = m_sent_receivers_count - 1; + if(idx != last){ + NdbReceiver * move = m_sent_receivers[last]; + m_sent_receivers[idx] = move; + move->m_list_index = idx; + } + m_sent_receivers_count = last; + + last = m_conf_receivers_count; + m_conf_receivers[last] = tRec; + m_conf_receivers_count = last + 1; + tRec->m_list_index = last; + tRec->m_current_row = 0; + } +} + +/** + * Remove receiver as it's completed + */ +void +NdbScanOperation::receiver_completed(NdbReceiver* tRec){ + if(theError.code == 0){ + if(DEBUG_NEXT_RESULT) + ndbout_c("receiver_completed"); + + Uint32 idx = tRec->m_list_index; + Uint32 last = m_sent_receivers_count - 1; + if(idx != last){ + NdbReceiver * move = m_sent_receivers[last]; + m_sent_receivers[idx] = move; + move->m_list_index = idx; + } + m_sent_receivers_count = last; + } +} + +/***************************************************************************** + * int getFirstATTRINFOScan( U_int32 aData ) + * + * Return Value: Return 0: Successful + * Return -1: All other cases + * Parameters: None: Only allocate the first signal. + * Remark: When a scan is defined we need to use this method instead + * of insertATTRINFO for the first signal. + * This is because we need not to mess up the code in + * insertATTRINFO with if statements since we are not + * interested in the TCKEYREQ signal. + *****************************************************************************/ +int +NdbScanOperation::getFirstATTRINFOScan() +{ + NdbApiSignal* tSignal; + + tSignal = theNdb->getSignal(); + if (tSignal == NULL){ + setErrorCodeAbort(4000); + return -1; + } + tSignal->setSignal(m_attrInfoGSN); + theAI_LenInCurrAI = 8; + theATTRINFOptr = &tSignal->getDataPtrSend()[8]; + theFirstATTRINFO = tSignal; + theCurrentATTRINFO = tSignal; + theCurrentATTRINFO->next(NULL); + + return 0; +} + +/** + * Constats for theTupleKeyDefined[][0] + */ +#define SETBOUND_EQ 1 +#define FAKE_PTR 2 +#define API_PTR 3 + +#define WAITFOR_SCAN_TIMEOUT 120000 + +int +NdbScanOperation::executeCursor(int nodeId){ + NdbTransaction * tCon = theNdbCon; + TransporterFacade* tp = TransporterFacade::instance(); + Guard guard(tp->theMutexPtr); + + Uint32 magic = tCon->theMagicNumber; + Uint32 seq = tCon->theNodeSequence; + + if (tp->get_node_alive(nodeId) && + (tp->getNodeSequence(nodeId) == seq)) { + + /** + * Only call prepareSendScan first time (incase of restarts) + * - check with theMagicNumber + */ + tCon->theMagicNumber = 0x37412619; + if(magic != 0x37412619 && + prepareSendScan(tCon->theTCConPtr, tCon->theTransactionId) == -1) + return -1; + + + if (doSendScan(nodeId) == -1) + return -1; + + return 0; + } else { + if (!(tp->get_node_stopping(nodeId) && + (tp->getNodeSequence(nodeId) == seq))){ + TRACE_DEBUG("The node is hard dead when attempting to start a scan"); + setErrorCode(4029); + tCon->theReleaseOnClose = true; + } else { + TRACE_DEBUG("The node is stopping when attempting to start a scan"); + setErrorCode(4030); + }//if + tCon->theCommitStatus = NdbTransaction::Aborted; + }//if + return -1; +} + + +int NdbScanOperation::nextResult(bool fetchAllowed, bool forceSend) +{ + int res; + if ((res = nextResultImpl(fetchAllowed, forceSend)) == 0) { + // handle blobs + NdbBlob* tBlob = theBlobList; + while (tBlob != 0) { + if (tBlob->atNextResult() == -1) + return -1; + tBlob = tBlob->theNext; + } + /* + * Flush blob part ops on behalf of user because + * - nextResult is analogous to execute(NoCommit) + * - user is likely to want blob value before next execute + */ + if (m_transConnection->executePendingBlobOps() == -1) + return -1; + return 0; + } + return res; +} + +int NdbScanOperation::nextResultImpl(bool fetchAllowed, bool forceSend) +{ + if(m_ordered) + return ((NdbIndexScanOperation*)this)->next_result_ordered(fetchAllowed, + forceSend); + + /** + * Check current receiver + */ + int retVal = 2; + Uint32 idx = m_current_api_receiver; + Uint32 last = m_api_receivers_count; + m_curr_row = 0; + + if(DEBUG_NEXT_RESULT) + ndbout_c("nextResult(%d) idx=%d last=%d", fetchAllowed, idx, last); + + /** + * Check next buckets + */ + for(; idx < last; idx++){ + NdbReceiver* tRec = m_api_receivers[idx]; + if(tRec->nextResult()){ + m_curr_row = tRec->copyout(theReceiver); + retVal = 0; + break; + } + } + + /** + * We have advanced atleast one bucket + */ + if(!fetchAllowed || !retVal){ + m_current_api_receiver = idx; + if(DEBUG_NEXT_RESULT) ndbout_c("return %d", retVal); + return retVal; + } + + Uint32 nodeId = theNdbCon->theDBnode; + TransporterFacade* tp = TransporterFacade::instance(); + Guard guard(tp->theMutexPtr); + if(theError.code) + return -1; + + Uint32 seq = theNdbCon->theNodeSequence; + if(seq == tp->getNodeSequence(nodeId) && send_next_scan(idx, false, + forceSend) == 0){ + + idx = m_current_api_receiver; + last = m_api_receivers_count; + + do { + if(theError.code){ + setErrorCode(theError.code); + if(DEBUG_NEXT_RESULT) ndbout_c("return -1"); + return -1; + } + + Uint32 cnt = m_conf_receivers_count; + Uint32 sent = m_sent_receivers_count; + + if(DEBUG_NEXT_RESULT) + ndbout_c("idx=%d last=%d cnt=%d sent=%d", idx, last, cnt, sent); + + if(cnt > 0){ + /** + * Just move completed receivers + */ + memcpy(m_api_receivers+last, m_conf_receivers, cnt * sizeof(char*)); + last += cnt; + m_conf_receivers_count = 0; + } else if(retVal == 2 && sent > 0){ + /** + * No completed... + */ + theNdb->theImpl->theWaiter.m_node = nodeId; + theNdb->theImpl->theWaiter.m_state = WAIT_SCAN; + int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT); + if (return_code == 0 && seq == tp->getNodeSequence(nodeId)) { + continue; + } else { + idx = last; + retVal = -2; //return_code; + } + } else if(retVal == 2){ + /** + * No completed & no sent -> EndOfData + */ + theError.code = -1; // make sure user gets error if he tries again + if(DEBUG_NEXT_RESULT) ndbout_c("return 1"); + return 1; + } + + if(retVal == 0) + break; + + for(; idx < last; idx++){ + NdbReceiver* tRec = m_api_receivers[idx]; + if(tRec->nextResult()){ + m_curr_row = tRec->copyout(theReceiver); + retVal = 0; + break; + } + } + } while(retVal == 2); + } else { + retVal = -3; + } + + m_api_receivers_count = last; + m_current_api_receiver = idx; + + switch(retVal){ + case 0: + case 1: + case 2: + if(DEBUG_NEXT_RESULT) ndbout_c("return %d", retVal); + return retVal; + case -1: + setErrorCode(4008); // Timeout + break; + case -2: + setErrorCode(4028); // Node fail + break; + case -3: // send_next_scan -> return fail (set error-code self) + if(theError.code == 0) + setErrorCode(4028); // seq changed = Node fail + break; + } + + theNdbCon->theTransactionIsStarted = false; + theNdbCon->theReleaseOnClose = true; + if(DEBUG_NEXT_RESULT) ndbout_c("return -1", retVal); + return -1; +} + +int +NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag, + bool forceSend){ + if(cnt > 0){ + NdbApiSignal tSignal(theNdb->theMyRef); + tSignal.setSignal(GSN_SCAN_NEXTREQ); + + Uint32* theData = tSignal.getDataPtrSend(); + theData[0] = theNdbCon->theTCConPtr; + theData[1] = stopScanFlag == true ? 1 : 0; + Uint64 transId = theNdbCon->theTransactionId; + theData[2] = transId; + theData[3] = (Uint32) (transId >> 32); + + /** + * Prepare ops + */ + Uint32 last = m_sent_receivers_count; + Uint32 * prep_array = (cnt > 21 ? m_prepared_receivers : theData + 4); + Uint32 sent = 0; + for(Uint32 i = 0; i<cnt; i++){ + NdbReceiver * tRec = m_api_receivers[i]; + if((prep_array[sent] = tRec->m_tcPtrI) != RNIL) + { + m_sent_receivers[last+sent] = tRec; + tRec->m_list_index = last+sent; + tRec->prepareSend(); + sent++; + } + } + memmove(m_api_receivers, m_api_receivers+cnt, + (theParallelism-cnt) * sizeof(char*)); + + int ret = 0; + if(sent) + { + Uint32 nodeId = theNdbCon->theDBnode; + TransporterFacade * tp = TransporterFacade::instance(); + if(cnt > 21){ + tSignal.setLength(4); + LinearSectionPtr ptr[3]; + ptr[0].p = prep_array; + ptr[0].sz = sent; + ret = tp->sendSignal(&tSignal, nodeId, ptr, 1); + } else { + tSignal.setLength(4+sent); + ret = tp->sendSignal(&tSignal, nodeId); + } + } + + if (!ret) checkForceSend(forceSend); + + m_sent_receivers_count = last + sent; + m_api_receivers_count -= cnt; + m_current_api_receiver = 0; + + return ret; + } + return 0; +} + +void NdbScanOperation::checkForceSend(bool forceSend) +{ + if (forceSend) { + TransporterFacade::instance()->forceSend(theNdb->theNdbBlockNumber); + } else { + TransporterFacade::instance()->checkForceSend(theNdb->theNdbBlockNumber); + }//if +} + +int +NdbScanOperation::prepareSend(Uint32 TC_ConnectPtr, Uint64 TransactionId) +{ + printf("NdbScanOperation::prepareSend\n"); + abort(); + return 0; +} + +int +NdbScanOperation::doSend(int ProcessorId) +{ + printf("NdbScanOperation::doSend\n"); + return 0; +} + +void NdbScanOperation::close(bool forceSend) +{ + if(m_transConnection){ + if(DEBUG_NEXT_RESULT) + ndbout_c("close() theError.code = %d " + "m_api_receivers_count = %d " + "m_conf_receivers_count = %d " + "m_sent_receivers_count = %d", + theError.code, + m_api_receivers_count, + m_conf_receivers_count, + m_sent_receivers_count); + + TransporterFacade* tp = TransporterFacade::instance(); + Guard guard(tp->theMutexPtr); + close_impl(tp, forceSend); + + } while(0); + + theNdbCon->theScanningOp = 0; + theNdb->closeTransaction(theNdbCon); + + theNdbCon = 0; + m_transConnection = NULL; +} + +void +NdbScanOperation::execCLOSE_SCAN_REP(){ + m_conf_receivers_count = 0; + m_sent_receivers_count = 0; +} + +void NdbScanOperation::release() +{ + if(theNdbCon != 0 || m_transConnection != 0){ + close(); + } + for(Uint32 i = 0; i<m_allocated_receivers; i++){ + m_receivers[i]->release(); + } + + NdbOperation::release(); + + if(theSCAN_TABREQ) + { + theNdb->releaseSignal(theSCAN_TABREQ); + theSCAN_TABREQ = 0; + } +} + +/*************************************************************************** +int prepareSendScan(Uint32 aTC_ConnectPtr, + Uint64 aTransactionId) + +Return Value: Return 0 : preparation of send was succesful. + Return -1: In all other case. +Parameters: aTC_ConnectPtr: the Connect pointer to TC. + aTransactionId: the Transaction identity of the transaction. +Remark: Puts the the final data into ATTRINFO signal(s) after this + we know the how many signal to send and their sizes +***************************************************************************/ +int NdbScanOperation::prepareSendScan(Uint32 aTC_ConnectPtr, + Uint64 aTransactionId){ + + if (theInterpretIndicator != 1 || + (theOperationType != OpenScanRequest && + theOperationType != OpenRangeScanRequest)) { + setErrorCodeAbort(4005); + return -1; + } + + theErrorLine = 0; + + // In preapareSendInterpreted we set the sizes (word 4-8) in the + // first ATTRINFO signal. + if (prepareSendInterpreted() == -1) + return -1; + + if(m_ordered){ + ((NdbIndexScanOperation*)this)->fix_get_values(); + } + + theCurrentATTRINFO->setLength(theAI_LenInCurrAI); + + /** + * Prepare all receivers + */ + theReceiver.prepareSend(); + bool keyInfo = m_keyInfo; + Uint32 key_size = keyInfo ? m_currentTable->m_keyLenInWords : 0; + /** + * The number of records sent by each LQH is calculated and the kernel + * is informed of this number by updating the SCAN_TABREQ signal + */ + Uint32 batch_size, batch_byte_size, first_batch_size; + theReceiver.calculate_batch_size(key_size, + theParallelism, + batch_size, + batch_byte_size, + first_batch_size); + ScanTabReq * req = CAST_PTR(ScanTabReq, theSCAN_TABREQ->getDataPtrSend()); + ScanTabReq::setScanBatch(req->requestInfo, batch_size); + req->batch_byte_size= batch_byte_size; + req->first_batch_size= first_batch_size; + + /** + * Set keyinfo flag + * (Always keyinfo when using blobs) + */ + Uint32 reqInfo = req->requestInfo; + ScanTabReq::setKeyinfoFlag(reqInfo, keyInfo); + req->requestInfo = reqInfo; + + for(Uint32 i = 0; i<theParallelism; i++){ + m_receivers[i]->do_get_value(&theReceiver, batch_size, + key_size, + m_read_range_no); + } + return 0; +} + +/***************************************************************************** +int doSend() + +Return Value: Return >0 : send was succesful, returns number of signals sent + Return -1: In all other case. +Parameters: aProcessorId: Receiving processor node +Remark: Sends the ATTRINFO signal(s) +*****************************************************************************/ +int +NdbScanOperation::doSendScan(int aProcessorId) +{ + Uint32 tSignalCount = 0; + NdbApiSignal* tSignal; + + if (theInterpretIndicator != 1 || + (theOperationType != OpenScanRequest && + theOperationType != OpenRangeScanRequest)) { + setErrorCodeAbort(4005); + return -1; + } + + assert(theSCAN_TABREQ != NULL); + tSignal = theSCAN_TABREQ; + + Uint32 tupKeyLen = theTupKeyLen; + Uint32 len = theTotalNrOfKeyWordInSignal; + Uint32 aTC_ConnectPtr = theNdbCon->theTCConPtr; + Uint64 transId = theNdbCon->theTransactionId; + + // Update the "attribute info length in words" in SCAN_TABREQ before + // sending it. This could not be done in openScan because + // we created the ATTRINFO signals after the SCAN_TABREQ signal. + ScanTabReq * const req = CAST_PTR(ScanTabReq, tSignal->getDataPtrSend()); + req->attrLenKeyLen = (tupKeyLen << 16) | theTotalCurrAI_Len; + Uint32 tmp = req->requestInfo; + ScanTabReq::setDistributionKeyFlag(tmp, theDistrKeyIndicator_); + req->distributionKey = theDistributionKey; + req->requestInfo = tmp; + tSignal->setLength(ScanTabReq::StaticLength + theDistrKeyIndicator_); + + TransporterFacade *tp = TransporterFacade::instance(); + LinearSectionPtr ptr[3]; + ptr[0].p = m_prepared_receivers; + ptr[0].sz = theParallelism; + if (tp->sendSignal(tSignal, aProcessorId, ptr, 1) == -1) { + setErrorCode(4002); + return -1; + } + + if (tupKeyLen > 0){ + // must have at least one signal since it contains attrLen for bounds + assert(theLastKEYINFO != NULL); + tSignal = theLastKEYINFO; + tSignal->setLength(KeyInfo::HeaderLength + theTotalNrOfKeyWordInSignal); + + assert(theSCAN_TABREQ->next() != NULL); + tSignal = theSCAN_TABREQ->next(); + + NdbApiSignal* last; + do { + KeyInfo * keyInfo = CAST_PTR(KeyInfo, tSignal->getDataPtrSend()); + keyInfo->connectPtr = aTC_ConnectPtr; + keyInfo->transId[0] = Uint32(transId); + keyInfo->transId[1] = Uint32(transId >> 32); + + if (tp->sendSignal(tSignal,aProcessorId) == -1){ + setErrorCode(4002); + return -1; + } + + tSignalCount++; + last = tSignal; + tSignal = tSignal->next(); + } while(last != theLastKEYINFO); + } + + tSignal = theFirstATTRINFO; + while (tSignal != NULL) { + AttrInfo * attrInfo = CAST_PTR(AttrInfo, tSignal->getDataPtrSend()); + attrInfo->connectPtr = aTC_ConnectPtr; + attrInfo->transId[0] = Uint32(transId); + attrInfo->transId[1] = Uint32(transId >> 32); + + if (tp->sendSignal(tSignal,aProcessorId) == -1){ + setErrorCode(4002); + return -1; + } + tSignalCount++; + tSignal = tSignal->next(); + } + theStatus = WaitResponse; + + m_curr_row = 0; + m_sent_receivers_count = theParallelism; + if(m_ordered) + { + m_current_api_receiver = theParallelism; + m_api_receivers_count = theParallelism; + } + + return tSignalCount; +}//NdbOperation::doSendScan() + +/***************************************************************************** + * NdbOperation* takeOverScanOp(NdbTransaction* updateTrans); + * + * Parameters: The update transactions NdbTransaction pointer. + * Return Value: A reference to the transferred operation object + * or NULL if no success. + * Remark: Take over the scanning transactions NdbOperation + * object for a tuple to an update transaction, + * which is the last operation read in nextScanResult() + * (theNdbCon->thePreviousScanRec) + * + * FUTURE IMPLEMENTATION: (This note was moved from header file.) + * In the future, it will even be possible to transfer + * to a NdbTransaction on another Ndb-object. + * In this case the receiving NdbTransaction-object must call + * a method receiveOpFromScan to actually receive the information. + * This means that the updating transactions can be placed + * in separate threads and thus increasing the parallelism during + * the scan process. + ****************************************************************************/ +int +NdbScanOperation::getKeyFromKEYINFO20(Uint32* data, unsigned size) +{ + NdbRecAttr * tRecAttr = m_curr_row; + if(tRecAttr) + { + const Uint32 * src = (Uint32*)tRecAttr->aRef(); + memcpy(data, src, 4*size); + return 0; + } + return -1; +} + +NdbOperation* +NdbScanOperation::takeOverScanOp(OperationType opType, NdbTransaction* pTrans) +{ + + NdbRecAttr * tRecAttr = m_curr_row; + if(tRecAttr) + { + NdbOperation * newOp = pTrans->getNdbOperation(m_currentTable); + if (newOp == NULL){ + return NULL; + } + pTrans->theSimpleState = 0; + + const Uint32 len = (tRecAttr->attrSize() * tRecAttr->arraySize() + 3)/4-1; + + newOp->theTupKeyLen = len; + newOp->theOperationType = opType; + if (opType == DeleteRequest) { + newOp->theStatus = GetValue; + } else { + newOp->theStatus = SetValue; + } + + const Uint32 * src = (Uint32*)tRecAttr->aRef(); + const Uint32 tScanInfo = src[len] & 0x3FFFF; + const Uint32 tTakeOverFragment = src[len] >> 20; + { + UintR scanInfo = 0; + TcKeyReq::setTakeOverScanFlag(scanInfo, 1); + TcKeyReq::setTakeOverScanFragment(scanInfo, tTakeOverFragment); + TcKeyReq::setTakeOverScanInfo(scanInfo, tScanInfo); + newOp->theScanInfo = scanInfo; + newOp->theDistrKeyIndicator_ = 1; + newOp->theDistributionKey = tTakeOverFragment; + } + + // Copy the first 8 words of key info from KEYINF20 into TCKEYREQ + TcKeyReq * tcKeyReq = CAST_PTR(TcKeyReq,newOp->theTCREQ->getDataPtrSend()); + Uint32 i = 0; + for (i = 0; i < TcKeyReq::MaxKeyInfo && i < len; i++) { + tcKeyReq->keyInfo[i] = * src++; + } + + if(i < len){ + NdbApiSignal* tSignal = theNdb->getSignal(); + newOp->theTCREQ->next(tSignal); + + Uint32 left = len - i; + while(tSignal && left > KeyInfo::DataLength){ + tSignal->setSignal(GSN_KEYINFO); + KeyInfo * keyInfo = CAST_PTR(KeyInfo, tSignal->getDataPtrSend()); + memcpy(keyInfo->keyData, src, 4 * KeyInfo::DataLength); + src += KeyInfo::DataLength; + left -= KeyInfo::DataLength; + + tSignal->next(theNdb->getSignal()); + tSignal = tSignal->next(); + } + + if(tSignal && left > 0){ + tSignal->setSignal(GSN_KEYINFO); + KeyInfo * keyInfo = CAST_PTR(KeyInfo, tSignal->getDataPtrSend()); + memcpy(keyInfo->keyData, src, 4 * left); + } + } + // create blob handles automatically + if (opType == DeleteRequest && m_currentTable->m_noOfBlobs != 0) { + for (unsigned i = 0; i < m_currentTable->m_columns.size(); i++) { + NdbColumnImpl* c = m_currentTable->m_columns[i]; + assert(c != 0); + if (c->getBlobType()) { + if (newOp->getBlobHandle(pTrans, c) == NULL) + return NULL; + } + } + } + + return newOp; + } + return 0; +} + +NdbBlob* +NdbScanOperation::getBlobHandle(const char* anAttrName) +{ + m_keyInfo = 1; + return NdbOperation::getBlobHandle(m_transConnection, + m_currentTable->getColumn(anAttrName)); +} + +NdbBlob* +NdbScanOperation::getBlobHandle(Uint32 anAttrId) +{ + m_keyInfo = 1; + return NdbOperation::getBlobHandle(m_transConnection, + m_currentTable->getColumn(anAttrId)); +} + +NdbIndexScanOperation::NdbIndexScanOperation(Ndb* aNdb) + : NdbScanOperation(aNdb) +{ +} + +NdbIndexScanOperation::~NdbIndexScanOperation(){ +} + +int +NdbIndexScanOperation::setBound(const char* anAttrName, int type, + const void* aValue, Uint32 len) +{ + return setBound(m_accessTable->getColumn(anAttrName), type, aValue, len); +} + +int +NdbIndexScanOperation::setBound(Uint32 anAttrId, int type, + const void* aValue, Uint32 len) +{ + return setBound(m_accessTable->getColumn(anAttrId), type, aValue, len); +} + +int +NdbIndexScanOperation::equal_impl(const NdbColumnImpl* anAttrObject, + const char* aValue, + Uint32 len){ + return setBound(anAttrObject, BoundEQ, aValue, len); +} + +NdbRecAttr* +NdbIndexScanOperation::getValue_impl(const NdbColumnImpl* attrInfo, + char* aValue){ + if(!m_ordered){ + return NdbScanOperation::getValue_impl(attrInfo, aValue); + } + + int id = attrInfo->m_attrId; // In "real" table + assert(m_accessTable->m_index); + int sz = (int)m_accessTable->m_index->m_key_ids.size(); + if(id >= sz || (id = m_accessTable->m_index->m_key_ids[id]) == -1){ + return NdbScanOperation::getValue_impl(attrInfo, aValue); + } + + assert(id < NDB_MAX_NO_OF_ATTRIBUTES_IN_KEY); + Uint32 marker = theTupleKeyDefined[id][0]; + + if(marker == SETBOUND_EQ){ + return NdbScanOperation::getValue_impl(attrInfo, aValue); + } else if(marker == API_PTR){ + return NdbScanOperation::getValue_impl(attrInfo, aValue); + } + + assert(marker == FAKE_PTR); + + UintPtr oldVal; + oldVal = theTupleKeyDefined[id][1]; +#if (SIZEOF_CHARP == 8) + oldVal = oldVal | (((UintPtr)theTupleKeyDefined[id][2]) << 32); +#endif + theTupleKeyDefined[id][0] = API_PTR; + + NdbRecAttr* tmp = (NdbRecAttr*)oldVal; + tmp->setup(attrInfo, aValue); + + return tmp; +} + +#include <AttributeHeader.hpp> +/* + * Define bound on index column in range scan. + */ +int +NdbIndexScanOperation::setBound(const NdbColumnImpl* tAttrInfo, + int type, const void* aValue, Uint32 len) +{ + if (theOperationType == OpenRangeScanRequest && + (0 <= type && type <= 4) && + len <= 8000) { + // insert bound type + Uint32 currLen = theTotalNrOfKeyWordInSignal; + Uint32 remaining = KeyInfo::DataLength - currLen; + Uint32 sizeInBytes = tAttrInfo->m_attrSize * tAttrInfo->m_arraySize; + bool tDistrKey = tAttrInfo->m_distributionKey; + + len = aValue != NULL ? sizeInBytes : 0; + if (len != sizeInBytes && (len != 0)) { + setErrorCodeAbort(4209); + return -1; + } + + // insert attribute header + Uint32 tIndexAttrId = tAttrInfo->m_attrId; + Uint32 sizeInWords = (len + 3) / 4; + AttributeHeader ah(tIndexAttrId, sizeInWords); + const Uint32 ahValue = ah.m_value; + + const Uint32 align = (UintPtr(aValue) & 7); + const bool aligned = (tDistrKey && type == BoundEQ) ? + (align == 0) : (align & 3) == 0; + + const bool nobytes = (len & 0x3) == 0; + const Uint32 totalLen = 2 + sizeInWords; + Uint32 tupKeyLen = theTupKeyLen; + if(remaining > totalLen && aligned && nobytes){ + Uint32 * dst = theKEYINFOptr + currLen; + * dst ++ = type; + * dst ++ = ahValue; + memcpy(dst, aValue, 4 * sizeInWords); + theTotalNrOfKeyWordInSignal = currLen + totalLen; + } else { + if(!aligned || !nobytes){ + Uint32 tempData[2000]; + tempData[0] = type; + tempData[1] = ahValue; + tempData[2 + (len >> 2)] = 0; + memcpy(tempData+2, aValue, len); + + insertBOUNDS(tempData, 2+sizeInWords); + } else { + Uint32 buf[2] = { type, ahValue }; + insertBOUNDS(buf, 2); + insertBOUNDS((Uint32*)aValue, sizeInWords); + } + } + theTupKeyLen = tupKeyLen + totalLen; + + /** + * Do sorted stuff + */ + + /** + * The primary keys for an ordered index is defined in the beginning + * so it's safe to use [tIndexAttrId] + * (instead of looping as is NdbOperation::equal_impl) + */ + if(type == BoundEQ && tDistrKey) + { + theNoOfTupKeyLeft--; + return handle_distribution_key((Uint64*)aValue, sizeInWords); + } + return 0; + } else { + setErrorCodeAbort(4228); // XXX wrong code + return -1; + } +} + +int +NdbIndexScanOperation::insertBOUNDS(Uint32 * data, Uint32 sz){ + Uint32 len; + Uint32 remaining = KeyInfo::DataLength - theTotalNrOfKeyWordInSignal; + Uint32 * dst = theKEYINFOptr + theTotalNrOfKeyWordInSignal; + do { + len = (sz < remaining ? sz : remaining); + memcpy(dst, data, 4 * len); + + if(sz >= remaining){ + NdbApiSignal* tCurr = theLastKEYINFO; + tCurr->setLength(KeyInfo::MaxSignalLength); + NdbApiSignal* tSignal = tCurr->next(); + if(tSignal) + ; + else if((tSignal = theNdb->getSignal()) != 0) + { + tCurr->next(tSignal); + tSignal->setSignal(GSN_KEYINFO); + } else { + goto error; + } + theLastKEYINFO = tSignal; + theKEYINFOptr = dst = ((KeyInfo*)tSignal->getDataPtrSend())->keyData; + remaining = KeyInfo::DataLength; + sz -= len; + data += len; + } else { + len = (KeyInfo::DataLength - remaining) + len; + break; + } + } while(true); + theTotalNrOfKeyWordInSignal = len; + return 0; + +error: + setErrorCodeAbort(4228); // XXX wrong code + return -1; +} + +int +NdbIndexScanOperation::readTuples(LockMode lm, + Uint32 batch, + Uint32 parallel, + bool order_by, + bool order_desc, + bool read_range_no){ + int res = NdbScanOperation::readTuples(lm, batch, 0); + if(!res && read_range_no) + { + m_read_range_no = 1; + Uint32 word = 0; + AttributeHeader::init(&word, AttributeHeader::RANGE_NO, 0); + if(insertATTRINFO(word) == -1) + res = -1; + } + if(!res && order_by){ + m_ordered = true; + if (order_desc) { + m_descending = true; + ScanTabReq * req = CAST_PTR(ScanTabReq, theSCAN_TABREQ->getDataPtrSend()); + ScanTabReq::setDescendingFlag(req->requestInfo, true); + } + Uint32 cnt = m_accessTable->getNoOfColumns() - 1; + m_sort_columns = cnt; // -1 for NDB$NODE + m_current_api_receiver = m_sent_receivers_count; + m_api_receivers_count = m_sent_receivers_count; + + m_sort_columns = cnt; + for(Uint32 i = 0; i<cnt; i++){ + const NdbColumnImpl* key = m_accessTable->m_index->m_columns[i]; + const NdbColumnImpl* col = m_currentTable->getColumn(key->m_keyInfoPos); + NdbRecAttr* tmp = NdbScanOperation::getValue_impl(col, (char*)-1); + UintPtr newVal = UintPtr(tmp); + theTupleKeyDefined[i][0] = FAKE_PTR; + theTupleKeyDefined[i][1] = (newVal & 0xFFFFFFFF); +#if (SIZEOF_CHARP == 8) + theTupleKeyDefined[i][2] = (newVal >> 32); +#endif + } + } + m_this_bound_start = 0; + m_first_bound_word = theKEYINFOptr; + + return res; +} + +void +NdbIndexScanOperation::fix_get_values(){ + /** + * Loop through all getValues and set buffer pointer to "API" pointer + */ + NdbRecAttr * curr = theReceiver.theFirstRecAttr; + Uint32 cnt = m_accessTable->getNoOfColumns() - 1; + assert(cnt < NDB_MAX_NO_OF_ATTRIBUTES_IN_KEY); + + const NdbIndexImpl * idx = m_accessTable->m_index; + const NdbTableImpl * tab = m_currentTable; + for(Uint32 i = 0; i<cnt; i++){ + Uint32 val = theTupleKeyDefined[i][0]; + switch(val){ + case FAKE_PTR: + curr->setup(curr->m_column, 0); + case API_PTR: + curr = curr->next(); + break; + case SETBOUND_EQ: + break; +#ifdef VM_TRACE + default: + abort(); +#endif + } + } +} + +int +NdbIndexScanOperation::compare(Uint32 skip, Uint32 cols, + const NdbReceiver* t1, + const NdbReceiver* t2){ + + NdbRecAttr * r1 = t1->m_rows[t1->m_current_row]; + NdbRecAttr * r2 = t2->m_rows[t2->m_current_row]; + + r1 = (skip ? r1->next() : r1); + r2 = (skip ? r2->next() : r2); + const int jdir = 1 - 2 * (int)m_descending; + assert(jdir == 1 || jdir == -1); + while(cols > 0){ + Uint32 * d1 = (Uint32*)r1->aRef(); + Uint32 * d2 = (Uint32*)r2->aRef(); + unsigned r1_null = r1->isNULL(); + if((r1_null ^ (unsigned)r2->isNULL())){ + return (r1_null ? -1 : 1) * jdir; + } + const NdbColumnImpl & col = NdbColumnImpl::getImpl(* r1->m_column); + Uint32 len = r1->theAttrSize * r1->theArraySize; + if(!r1_null){ + const NdbSqlUtil::Type& sqlType = NdbSqlUtil::getType(col.m_type); + int r = (*sqlType.m_cmp)(col.m_cs, d1, len, d2, len, true); + if(r){ + assert(r != NdbSqlUtil::CmpUnknown); + return r * jdir; + } + } + cols--; + r1 = r1->next(); + r2 = r2->next(); + } + return 0; +} + +int +NdbIndexScanOperation::next_result_ordered(bool fetchAllowed, + bool forceSend){ + + m_curr_row = 0; + Uint32 u_idx = 0, u_last = 0; + Uint32 s_idx = m_current_api_receiver; // first sorted + Uint32 s_last = theParallelism; // last sorted + + NdbReceiver** arr = m_api_receivers; + NdbReceiver* tRec = arr[s_idx]; + + if(DEBUG_NEXT_RESULT) ndbout_c("nextOrderedResult(%d) nextResult: %d", + fetchAllowed, + (s_idx < s_last ? tRec->nextResult() : 0)); + + if(DEBUG_NEXT_RESULT) ndbout_c("u=[%d %d] s=[%d %d]", + u_idx, u_last, + s_idx, s_last); + + bool fetchNeeded = (s_idx == s_last) || !tRec->nextResult(); + + if(fetchNeeded){ + if(fetchAllowed){ + if(DEBUG_NEXT_RESULT) ndbout_c("performing fetch..."); + TransporterFacade* tp = TransporterFacade::instance(); + Guard guard(tp->theMutexPtr); + if(theError.code) + return -1; + Uint32 seq = theNdbCon->theNodeSequence; + Uint32 nodeId = theNdbCon->theDBnode; + if(seq == tp->getNodeSequence(nodeId) && + !send_next_scan_ordered(s_idx, forceSend)){ + Uint32 tmp = m_sent_receivers_count; + s_idx = m_current_api_receiver; + while(m_sent_receivers_count > 0 && !theError.code){ + theNdb->theImpl->theWaiter.m_node = nodeId; + theNdb->theImpl->theWaiter.m_state = WAIT_SCAN; + int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT); + if (return_code == 0 && seq == tp->getNodeSequence(nodeId)) { + continue; + } + if(DEBUG_NEXT_RESULT) ndbout_c("return -1"); + setErrorCode(4028); + return -1; + } + + if(theError.code){ + setErrorCode(theError.code); + if(DEBUG_NEXT_RESULT) ndbout_c("return -1"); + return -1; + } + + u_idx = 0; + u_last = m_conf_receivers_count; + m_conf_receivers_count = 0; + memcpy(arr, m_conf_receivers, u_last * sizeof(char*)); + + if(DEBUG_NEXT_RESULT) ndbout_c("sent: %d recv: %d", tmp, u_last); + } else { + setErrorCode(4028); + return -1; + } + } else { + if(DEBUG_NEXT_RESULT) ndbout_c("return 2"); + return 2; + } + } else { + u_idx = s_idx; + u_last = s_idx + 1; + s_idx++; + } + + if(DEBUG_NEXT_RESULT) ndbout_c("u=[%d %d] s=[%d %d]", + u_idx, u_last, + s_idx, s_last); + + + Uint32 cols = m_sort_columns + m_read_range_no; + Uint32 skip = m_keyInfo; + while(u_idx < u_last){ + u_last--; + tRec = arr[u_last]; + + // Do binary search instead to find place + Uint32 place = s_idx; + for(; place < s_last; place++){ + if(compare(skip, cols, tRec, arr[place]) <= 0){ + break; + } + } + + if(place != s_idx){ + if(DEBUG_NEXT_RESULT) + ndbout_c("memmove(%d, %d, %d)", s_idx-1, s_idx, (place - s_idx)); + memmove(arr+s_idx-1, arr+s_idx, sizeof(char*)*(place - s_idx)); + } + + if(DEBUG_NEXT_RESULT) ndbout_c("putting %d @ %d", u_last, place - 1); + m_api_receivers[place-1] = tRec; + s_idx--; + } + + if(DEBUG_NEXT_RESULT) ndbout_c("u=[%d %d] s=[%d %d]", + u_idx, u_last, + s_idx, s_last); + + m_current_api_receiver = s_idx; + + if(DEBUG_NEXT_RESULT) + for(Uint32 i = s_idx; i<s_last; i++) + ndbout_c("%p", arr[i]); + + tRec = m_api_receivers[s_idx]; + if(s_idx < s_last && tRec->nextResult()){ + m_curr_row = tRec->copyout(theReceiver); + if(DEBUG_NEXT_RESULT) ndbout_c("return 0"); + return 0; + } + + theError.code = -1; + if(DEBUG_NEXT_RESULT) ndbout_c("return 1"); + return 1; +} + +int +NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx, bool forceSend){ + if(idx == theParallelism) + return 0; + + NdbReceiver* tRec = m_api_receivers[idx]; + NdbApiSignal tSignal(theNdb->theMyRef); + tSignal.setSignal(GSN_SCAN_NEXTREQ); + + Uint32 last = m_sent_receivers_count; + Uint32* theData = tSignal.getDataPtrSend(); + Uint32* prep_array = theData + 4; + + m_current_api_receiver = idx + 1; + if((prep_array[0] = tRec->m_tcPtrI) == RNIL) + { + if(DEBUG_NEXT_RESULT) + ndbout_c("receiver completed, don't send"); + return 0; + } + + theData[0] = theNdbCon->theTCConPtr; + theData[1] = 0; + Uint64 transId = theNdbCon->theTransactionId; + theData[2] = transId; + theData[3] = (Uint32) (transId >> 32); + + /** + * Prepare ops + */ + m_sent_receivers[last] = tRec; + tRec->m_list_index = last; + tRec->prepareSend(); + m_sent_receivers_count = last + 1; + + Uint32 nodeId = theNdbCon->theDBnode; + TransporterFacade * tp = TransporterFacade::instance(); + tSignal.setLength(4+1); + int ret= tp->sendSignal(&tSignal, nodeId); + if (!ret) checkForceSend(forceSend); + return ret; +} + +int +NdbScanOperation::close_impl(TransporterFacade* tp, bool forceSend){ + Uint32 seq = theNdbCon->theNodeSequence; + Uint32 nodeId = theNdbCon->theDBnode; + + if(seq != tp->getNodeSequence(nodeId)) + { + theNdbCon->theReleaseOnClose = true; + return -1; + } + + /** + * Wait for outstanding + */ + while(theError.code == 0 && m_sent_receivers_count) + { + theNdb->theImpl->theWaiter.m_node = nodeId; + theNdb->theImpl->theWaiter.m_state = WAIT_SCAN; + int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT); + switch(return_code){ + case 0: + break; + case -1: + setErrorCode(4008); + case -2: + m_api_receivers_count = 0; + m_conf_receivers_count = 0; + m_sent_receivers_count = 0; + theNdbCon->theReleaseOnClose = true; + return -1; + } + } + + if(theError.code) + { + m_api_receivers_count = 0; + m_current_api_receiver = m_ordered ? theParallelism : 0; + } + + + /** + * move all conf'ed into api + * so that send_next_scan can check if they needs to be closed + */ + Uint32 api = m_api_receivers_count; + Uint32 conf = m_conf_receivers_count; + + if(m_ordered) + { + /** + * Ordered scan, keep the m_api_receivers "to the right" + */ + memmove(m_api_receivers, m_api_receivers+m_current_api_receiver, + (theParallelism - m_current_api_receiver) * sizeof(char*)); + api = (theParallelism - m_current_api_receiver); + m_api_receivers_count = api; + } + + if(DEBUG_NEXT_RESULT) + ndbout_c("close_impl: [order api conf sent curr parr] %d %d %d %d %d %d", + m_ordered, api, conf, + m_sent_receivers_count, m_current_api_receiver, theParallelism); + + if(api+conf) + { + /** + * There's something to close + * setup m_api_receivers (for send_next_scan) + */ + memcpy(m_api_receivers+api, m_conf_receivers, conf * sizeof(char*)); + m_api_receivers_count = api + conf; + m_conf_receivers_count = 0; + } + + // Send close scan + if(send_next_scan(api+conf, true, forceSend) == -1) + { + theNdbCon->theReleaseOnClose = true; + return -1; + } + + /** + * wait for close scan conf + */ + while(m_sent_receivers_count+m_api_receivers_count+m_conf_receivers_count) + { + theNdb->theImpl->theWaiter.m_node = nodeId; + theNdb->theImpl->theWaiter.m_state = WAIT_SCAN; + int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT); + switch(return_code){ + case 0: + break; + case -1: + setErrorCode(4008); + case -2: + m_api_receivers_count = 0; + m_conf_receivers_count = 0; + m_sent_receivers_count = 0; + theNdbCon->theReleaseOnClose = true; + return -1; + } + } + + return 0; +} + +void +NdbScanOperation::reset_receivers(Uint32 parallell, Uint32 ordered){ + for(Uint32 i = 0; i<parallell; i++){ + m_receivers[i]->m_list_index = i; + m_prepared_receivers[i] = m_receivers[i]->getId(); + m_sent_receivers[i] = m_receivers[i]; + m_conf_receivers[i] = 0; + m_api_receivers[i] = 0; + m_receivers[i]->prepareSend(); + } + + m_api_receivers_count = 0; + m_current_api_receiver = 0; + m_sent_receivers_count = 0; + m_conf_receivers_count = 0; +} + +int +NdbScanOperation::restart(bool forceSend) +{ + + TransporterFacade* tp = TransporterFacade::instance(); + Guard guard(tp->theMutexPtr); + Uint32 nodeId = theNdbCon->theDBnode; + + { + int res; + if((res= close_impl(tp, forceSend))) + { + return res; + } + } + + /** + * Reset receivers + */ + reset_receivers(theParallelism, m_ordered); + + theError.code = 0; + if (doSendScan(nodeId) == -1) + return -1; + + return 0; +} + +int +NdbIndexScanOperation::reset_bounds(bool forceSend){ + int res; + + { + TransporterFacade* tp = TransporterFacade::instance(); + Guard guard(tp->theMutexPtr); + res= close_impl(tp, forceSend); + } + + if(!res) + { + theError.code = 0; + reset_receivers(theParallelism, m_ordered); + + theLastKEYINFO = theSCAN_TABREQ->next(); + theKEYINFOptr = ((KeyInfo*)theLastKEYINFO->getDataPtrSend())->keyData; + theTupKeyLen = 0; + theTotalNrOfKeyWordInSignal = 0; + theNoOfTupKeyLeft = m_accessTable->m_noOfDistributionKeys; + theDistrKeyIndicator_ = 0; + m_this_bound_start = 0; + m_first_bound_word = theKEYINFOptr; + m_transConnection + ->remove_list((NdbOperation*&)m_transConnection->m_firstExecutedScanOp, + this); + m_transConnection->define_scan_op(this); + return 0; + } + return res; +} + +int +NdbIndexScanOperation::end_of_bound(Uint32 no) +{ + if(no < (1 << 13)) // Only 12-bits no of ranges + { + Uint32 bound_head = * m_first_bound_word; + bound_head |= (theTupKeyLen - m_this_bound_start) << 16 | (no << 4); + * m_first_bound_word = bound_head; + + m_first_bound_word = theKEYINFOptr + theTotalNrOfKeyWordInSignal;; + m_this_bound_start = theTupKeyLen; + return 0; + } + return -1; +} + +int +NdbIndexScanOperation::get_range_no() +{ + NdbRecAttr* tRecAttr = m_curr_row; + if(m_read_range_no && tRecAttr) + { + if(m_keyInfo) + tRecAttr = tRecAttr->next(); + Uint32 ret = *(Uint32*)tRecAttr->aRef(); + return ret; + } + return -1; +} |