diff options
Diffstat (limited to 'ndb/src/ndbapi/NdbScanOperation.cpp')
-rw-r--r-- | ndb/src/ndbapi/NdbScanOperation.cpp | 513 |
1 files changed, 300 insertions, 213 deletions
diff --git a/ndb/src/ndbapi/NdbScanOperation.cpp b/ndb/src/ndbapi/NdbScanOperation.cpp index 0aa40f968bb..fd63ce96f25 100644 --- a/ndb/src/ndbapi/NdbScanOperation.cpp +++ b/ndb/src/ndbapi/NdbScanOperation.cpp @@ -32,6 +32,7 @@ #include <signaldata/ScanTab.hpp> #include <signaldata/KeyInfo.hpp> +#include <signaldata/AttrInfo.hpp> #include <signaldata/TcKeyReq.hpp> NdbScanOperation::NdbScanOperation(Ndb* aNdb) : @@ -47,11 +48,13 @@ NdbScanOperation::NdbScanOperation(Ndb* aNdb) : m_sent_receivers = 0; m_receivers = 0; m_array = new Uint32[1]; // skip if on delete in fix_receivers + theSCAN_TABREQ = 0; } NdbScanOperation::~NdbScanOperation() { for(Uint32 i = 0; i<m_allocated_receivers; i++){ + m_receivers[i]->release(); theNdb->releaseNdbScanRec(m_receivers[i]); } delete[] m_array; @@ -95,7 +98,7 @@ NdbScanOperation::setErrorCodeAbort(int aErrorCode){ * Remark: Initiates operation record after allocation. *****************************************************************************/ int -NdbScanOperation::init(NdbTableImpl* tab, NdbConnection* myConnection) +NdbScanOperation::init(const NdbTableImpl* tab, NdbConnection* myConnection) { m_transConnection = myConnection; //NdbConnection* aScanConnection = theNdb->startTransaction(myConnection); @@ -114,10 +117,8 @@ NdbScanOperation::init(NdbTableImpl* tab, NdbConnection* myConnection) theStatus = GetValue; theOperationType = OpenScanRequest; - - theTotalBoundAI_Len = 0; - theBoundATTRINFO = NULL; - + theNdbCon->theMagicNumber = 0xFE11DF; + return 0; } @@ -129,17 +130,9 @@ NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm, Uint32 fragCount = m_currentTable->m_fragmentCount; - if (batch + parallel == 0) { - batch = 16; - parallel= fragCount; - } else { - if (batch == 0 && parallel > 0) { // Backward - batch = (parallel >= 16 ? 16 : parallel); - parallel = (parallel + 15) / 16; - } - if (parallel > fragCount || parallel == 0) + if (parallel > fragCount || parallel == 0) { parallel = fragCount; - } + } // It is only possible to call openScan if // 1. this transcation don't already contain another scan operation @@ -151,6 +144,7 @@ NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm, } theNdbCon->theScanningOp = this; + theLockMode = lm; bool lockExcl, lockHoldMode, readCommitted; switch(lm){ @@ -174,7 +168,7 @@ NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm, return 0; } - m_keyInfo = lockExcl; + m_keyInfo = lockExcl ? 1 : 0; bool range = false; if (m_accessTable->m_indexType == NdbDictionary::Index::OrderedIndex || @@ -187,20 +181,19 @@ NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm, } assert (m_currentTable != m_accessTable); // Modify operation state - theStatus = SetBound; + theStatus = GetValue; theOperationType = OpenRangeScanRequest; range = true; } theParallelism = parallel; - theBatchSize = batch; if(fix_receivers(parallel) == -1){ setErrorCodeAbort(4000); return 0; } - theSCAN_TABREQ = theNdb->getSignal(); + theSCAN_TABREQ = (!theSCAN_TABREQ ? theNdb->getSignal() : theSCAN_TABREQ); if (theSCAN_TABREQ == NULL) { setErrorCodeAbort(4000); return 0; @@ -215,7 +208,7 @@ NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm, Uint32 reqInfo = 0; ScanTabReq::setParallelism(reqInfo, parallel); - ScanTabReq::setScanBatch(reqInfo, batch); + ScanTabReq::setScanBatch(reqInfo, 0); ScanTabReq::setLockMode(reqInfo, lockExcl); ScanTabReq::setHoldLockFlag(reqInfo, lockHoldMode); ScanTabReq::setReadCommittedFlag(reqInfo, readCommitted); @@ -226,8 +219,17 @@ NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm, req->transId1 = (Uint32) transId; req->transId2 = (Uint32) (transId >> 32); - getFirstATTRINFOScan(); + NdbApiSignal* tSignal = + theFirstKEYINFO; + + theFirstKEYINFO = (tSignal ? tSignal : tSignal = theNdb->getSignal()); + theLastKEYINFO = tSignal; + + tSignal->setSignal(GSN_KEYINFO); + theKEYINFOptr = ((KeyInfo*)tSignal->getDataPtrSend())->keyData; + theTotalNrOfKeyWordInSignal= 0; + getFirstATTRINFOScan(); return getResultSet(); } @@ -263,18 +265,7 @@ NdbScanOperation::fix_receivers(Uint32 parallel){ m_allocated_receivers = parallel; } - for(Uint32 i = 0; i<parallel; i++){ - m_receivers[i]->m_list_index = i; - m_prepared_receivers[i] = m_receivers[i]->getId(); - m_sent_receivers[i] = m_receivers[i]; - m_conf_receivers[i] = 0; - m_api_receivers[i] = 0; - } - - m_api_receivers_count = 0; - m_current_api_receiver = 0; - m_sent_receivers_count = parallel; - m_conf_receivers_count = 0; + reset_receivers(parallel, 0); return 0; } @@ -362,6 +353,7 @@ NdbScanOperation::getFirstATTRINFOScan() * After setBound() are done, move the accumulated ATTRINFO signals to * a separate list. Then continue with normal scan. */ +#if 0 int NdbIndexScanOperation::saveBoundATTRINFO() { @@ -395,8 +387,8 @@ NdbIndexScanOperation::saveBoundATTRINFO() Uint32 cnt = m_accessTable->getNoOfColumns() - 1; m_sort_columns = cnt - i; for(; i<cnt; i++){ - NdbColumnImpl* key = m_accessTable->m_index->m_columns[i]; - NdbColumnImpl* col = m_currentTable->getColumn(key->m_keyInfoPos); + const NdbColumnImpl* key = m_accessTable->m_index->m_columns[i]; + const NdbColumnImpl* col = m_currentTable->getColumn(key->m_keyInfoPos); NdbRecAttr* tmp = NdbScanOperation::getValue_impl(col, (char*)-1); UintPtr newVal = UintPtr(tmp); theTupleKeyDefined[i][0] = FAKE_PTR; @@ -408,6 +400,7 @@ NdbIndexScanOperation::saveBoundATTRINFO() } return res; } +#endif #define WAITFOR_SCAN_TIMEOUT 120000 @@ -416,14 +409,22 @@ NdbScanOperation::executeCursor(int nodeId){ NdbConnection * tCon = theNdbCon; TransporterFacade* tp = TransporterFacade::instance(); Guard guard(tp->theMutexPtr); + + Uint32 magic = tCon->theMagicNumber; Uint32 seq = tCon->theNodeSequence; + if (tp->get_node_alive(nodeId) && (tp->getNodeSequence(nodeId) == seq)) { - - if(prepareSendScan(tCon->theTCConPtr, tCon->theTransactionId) == -1) - return -1; + /** + * Only call prepareSendScan first time (incase of restarts) + * - check with theMagicNumber + */ tCon->theMagicNumber = 0x37412619; + if(magic != 0x37412619 && + prepareSendScan(tCon->theTCConPtr, tCon->theTransactionId) == -1) + return -1; + if (doSendScan(nodeId) == -1) return -1; @@ -435,7 +436,6 @@ NdbScanOperation::executeCursor(int nodeId){ TRACE_DEBUG("The node is hard dead when attempting to start a scan"); setErrorCode(4029); tCon->theReleaseOnClose = true; - abort(); } else { TRACE_DEBUG("The node is stopping when attempting to start a scan"); setErrorCode(4030); @@ -529,20 +529,9 @@ int NdbScanOperation::nextResult(bool fetchAllowed) /** * No completed & no sent -> EndOfData */ - if(send_next_scan(0, true) == 0){ // Close scan - theNdb->theWaiter.m_node = nodeId; - theNdb->theWaiter.m_state = WAIT_SCAN; - int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT); - if (return_code == 0 && seq == tp->getNodeSequence(nodeId)) { - theError.code = -1; // make sure user gets error if he tries again - if(DEBUG_NEXT_RESULT) ndbout_c("return 1"); - return 1; - } - retVal = -1; //return_code; - } else { - retVal = -3; - } - idx = last; + theError.code = -1; // make sure user gets error if he tries again + if(DEBUG_NEXT_RESULT) ndbout_c("return 1"); + return 1; } if(retVal == 0) @@ -577,6 +566,8 @@ int NdbScanOperation::nextResult(bool fetchAllowed) setErrorCode(4028); // Node fail break; case -3: // send_next_scan -> return fail (set error-code self) + if(theError.code == 0) + setErrorCode(4028); // seq changed = Node fail break; } @@ -653,7 +644,7 @@ NdbScanOperation::doSend(int ProcessorId) void NdbScanOperation::closeScan() { - if(m_transConnection) do { + if(m_transConnection){ if(DEBUG_NEXT_RESULT) ndbout_c("closeScan() theError.code = %d " "m_api_receivers_count = %d " @@ -666,55 +657,8 @@ void NdbScanOperation::closeScan() TransporterFacade* tp = TransporterFacade::instance(); Guard guard(tp->theMutexPtr); - - Uint32 seq = theNdbCon->theNodeSequence; - Uint32 nodeId = theNdbCon->theDBnode; - - if(seq != tp->getNodeSequence(nodeId)){ - theNdbCon->theReleaseOnClose = true; - break; - } - - while(theError.code == 0 && m_sent_receivers_count){ - theNdb->theWaiter.m_node = nodeId; - theNdb->theWaiter.m_state = WAIT_SCAN; - int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT); - switch(return_code){ - case 0: - break; - case -1: - setErrorCode(4008); - case -2: - m_api_receivers_count = 0; - m_conf_receivers_count = 0; - m_sent_receivers_count = 0; - theNdbCon->theReleaseOnClose = true; - } - } - - if(m_api_receivers_count+m_conf_receivers_count){ - // Send close scan - send_next_scan(0, true); // Close scan - } + close_impl(tp); - /** - * wait for close scan conf - */ - while(m_sent_receivers_count+m_api_receivers_count+m_conf_receivers_count){ - theNdb->theWaiter.m_node = nodeId; - theNdb->theWaiter.m_state = WAIT_SCAN; - int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT); - switch(return_code){ - case 0: - break; - case -1: - setErrorCode(4008); - case -2: - m_api_receivers_count = 0; - m_conf_receivers_count = 0; - m_sent_receivers_count = 0; - } - } } while(0); theNdbCon->theScanningOp = 0; @@ -739,6 +683,12 @@ void NdbScanOperation::release() for(Uint32 i = 0; i<m_allocated_receivers; i++){ m_receivers[i]->release(); } + if(theSCAN_TABREQ) + { + theNdb->releaseSignal(theSCAN_TABREQ); + theSCAN_TABREQ = 0; + } + NdbOperation::release(); } /*************************************************************************** @@ -762,11 +712,6 @@ int NdbScanOperation::prepareSendScan(Uint32 aTC_ConnectPtr, return -1; } - if (theStatus == SetBound) { - ((NdbIndexScanOperation*)this)->saveBoundATTRINFO(); - theStatus = GetValue; - } - theErrorLine = 0; // In preapareSendInterpreted we set the sizes (word 4-8) in the @@ -778,26 +723,7 @@ int NdbScanOperation::prepareSendScan(Uint32 aTC_ConnectPtr, ((NdbIndexScanOperation*)this)->fix_get_values(); } - const Uint32 transId1 = (Uint32) (aTransactionId & 0xFFFFFFFF); - const Uint32 transId2 = (Uint32) (aTransactionId >> 32); - - if (theOperationType == OpenRangeScanRequest) { - NdbApiSignal* tSignal = theBoundATTRINFO; - do{ - tSignal->setData(aTC_ConnectPtr, 1); - tSignal->setData(transId1, 2); - tSignal->setData(transId2, 3); - tSignal = tSignal->next(); - } while (tSignal != NULL); - } theCurrentATTRINFO->setLength(theAI_LenInCurrAI); - NdbApiSignal* tSignal = theFirstATTRINFO; - do{ - tSignal->setData(aTC_ConnectPtr, 1); - tSignal->setData(transId1, 2); - tSignal->setData(transId2, 3); - tSignal = tSignal->next(); - } while (tSignal != NULL); /** * Prepare all receivers @@ -805,20 +731,43 @@ int NdbScanOperation::prepareSendScan(Uint32 aTC_ConnectPtr, theReceiver.prepareSend(); bool keyInfo = m_keyInfo; Uint32 key_size = keyInfo ? m_currentTable->m_keyLenInWords : 0; + /** + * The number of records sent by each LQH is calculated and the kernel + * is informed of this number by updating the SCAN_TABREQ signal + */ + Uint32 batch_size, batch_byte_size, first_batch_size; + theReceiver.calculate_batch_size(key_size, + theParallelism, + batch_size, + batch_byte_size, + first_batch_size); + ScanTabReq * req = CAST_PTR(ScanTabReq, theSCAN_TABREQ->getDataPtrSend()); + ScanTabReq::setScanBatch(req->requestInfo, batch_size); + req->batch_byte_size= batch_byte_size; + req->first_batch_size= first_batch_size; + + /** + * Set keyinfo flag + * (Always keyinfo when using blobs) + */ + Uint32 reqInfo = req->requestInfo; + ScanTabReq::setKeyinfoFlag(reqInfo, keyInfo); + req->requestInfo = reqInfo; + for(Uint32 i = 0; i<theParallelism; i++){ - m_receivers[i]->do_get_value(&theReceiver, theBatchSize, key_size); + m_receivers[i]->do_get_value(&theReceiver, batch_size, key_size); } return 0; } -/****************************************************************************** +/***************************************************************************** int doSend() Return Value: Return >0 : send was succesful, returns number of signals sent Return -1: In all other case. Parameters: aProcessorId: Receiving processor node Remark: Sends the ATTRINFO signal(s) -******************************************************************************/ +*****************************************************************************/ int NdbScanOperation::doSendScan(int aProcessorId) { @@ -838,47 +787,61 @@ NdbScanOperation::doSendScan(int aProcessorId) setErrorCode(4001); return -1; } + + Uint32 tupKeyLen = theTupKeyLen; + Uint32 len = theTotalNrOfKeyWordInSignal; + Uint32 aTC_ConnectPtr = theNdbCon->theTCConPtr; + Uint64 transId = theNdbCon->theTransactionId; + // Update the "attribute info length in words" in SCAN_TABREQ before // sending it. This could not be done in openScan because // we created the ATTRINFO signals after the SCAN_TABREQ signal. ScanTabReq * const req = CAST_PTR(ScanTabReq, tSignal->getDataPtrSend()); - req->attrLen = theTotalCurrAI_Len; - if (theOperationType == OpenRangeScanRequest) - req->attrLen += theTotalBoundAI_Len; + req->attrLenKeyLen = (tupKeyLen << 16) | theTotalCurrAI_Len; + TransporterFacade *tp = TransporterFacade::instance(); - if(theParallelism > 16){ - LinearSectionPtr ptr[3]; - ptr[0].p = m_prepared_receivers; - ptr[0].sz = theParallelism; - if (tp->sendFragmentedSignal(tSignal, aProcessorId, ptr, 1) == -1) { - setErrorCode(4002); - return -1; - } - } else { - tSignal->setLength(9+theParallelism); - memcpy(tSignal->getDataPtrSend()+9, m_prepared_receivers, 4*theParallelism); - if (tp->sendSignal(tSignal, aProcessorId) == -1) { - setErrorCode(4002); - return -1; - } - } + LinearSectionPtr ptr[3]; + ptr[0].p = m_prepared_receivers; + ptr[0].sz = theParallelism; + if (tp->sendFragmentedSignal(tSignal, aProcessorId, ptr, 1) == -1) { + setErrorCode(4002); + return -1; + } - if (theOperationType == OpenRangeScanRequest) { + if (tupKeyLen > 0){ // must have at least one signal since it contains attrLen for bounds - assert(theBoundATTRINFO != NULL); - tSignal = theBoundATTRINFO; - while (tSignal != NULL) { + assert(theLastKEYINFO != NULL); + tSignal = theLastKEYINFO; + tSignal->setLength(KeyInfo::HeaderLength + theTotalNrOfKeyWordInSignal); + + assert(theFirstKEYINFO != NULL); + tSignal = theFirstKEYINFO; + + NdbApiSignal* last; + do { + KeyInfo * keyInfo = CAST_PTR(KeyInfo, tSignal->getDataPtrSend()); + keyInfo->connectPtr = aTC_ConnectPtr; + keyInfo->transId[0] = Uint32(transId); + keyInfo->transId[1] = Uint32(transId >> 32); + if (tp->sendSignal(tSignal,aProcessorId) == -1){ - setErrorCode(4002); - return -1; + setErrorCode(4002); + return -1; } + tSignalCount++; + last = tSignal; tSignal = tSignal->next(); - } + } while(last != theLastKEYINFO); } tSignal = theFirstATTRINFO; while (tSignal != NULL) { + AttrInfo * attrInfo = CAST_PTR(AttrInfo, tSignal->getDataPtrSend()); + attrInfo->connectPtr = aTC_ConnectPtr; + attrInfo->transId[0] = Uint32(transId); + attrInfo->transId[1] = Uint32(transId >> 32); + if (tp->sendSignal(tSignal,aProcessorId) == -1){ setErrorCode(4002); return -1; @@ -890,7 +853,7 @@ NdbScanOperation::doSendScan(int aProcessorId) return tSignalCount; }//NdbOperation::doSendScan() -/****************************************************************************** +/***************************************************************************** * NdbOperation* takeOverScanOp(NdbConnection* updateTrans); * * Parameters: The update transactions NdbConnection pointer. @@ -909,7 +872,7 @@ NdbScanOperation::doSendScan(int aProcessorId) * This means that the updating transactions can be placed * in separate threads and thus increasing the parallelism during * the scan process. - *****************************************************************************/ + ****************************************************************************/ int NdbScanOperation::getKeyFromKEYINFO20(Uint32* data, unsigned size) { @@ -947,6 +910,7 @@ NdbScanOperation::takeOverScanOp(OperationType opType, NdbConnection* pTrans){ if (newOp == NULL){ return NULL; } + pTrans->theSimpleState = 0; const Uint32 len = (tRecAttr->attrSize() * tRecAttr->arraySize() + 3)/4-1; @@ -959,8 +923,8 @@ NdbScanOperation::takeOverScanOp(OperationType opType, NdbConnection* pTrans){ } const Uint32 * src = (Uint32*)tRecAttr->aRef(); - const Uint32 tScanInfo = src[len] & 0xFFFF; - const Uint32 tTakeOverNode = src[len] >> 16; + const Uint32 tScanInfo = src[len] & 0x3FFFF; + const Uint32 tTakeOverNode = src[len] >> 20; { UintR scanInfo = 0; TcKeyReq::setTakeOverScanFlag(scanInfo, 1); @@ -1018,6 +982,7 @@ NdbScanOperation::takeOverScanOp(OperationType opType, NdbConnection* pTrans){ NdbBlob* NdbScanOperation::getBlobHandle(const char* anAttrName) { + m_keyInfo = 1; return NdbOperation::getBlobHandle(m_transConnection, m_currentTable->getColumn(anAttrName)); } @@ -1025,6 +990,7 @@ NdbScanOperation::getBlobHandle(const char* anAttrName) NdbBlob* NdbScanOperation::getBlobHandle(Uint32 anAttrId) { + m_keyInfo = 1; return NdbOperation::getBlobHandle(m_transConnection, m_currentTable->getColumn(anAttrId)); } @@ -1038,13 +1004,15 @@ NdbIndexScanOperation::~NdbIndexScanOperation(){ } int -NdbIndexScanOperation::setBound(const char* anAttrName, int type, const void* aValue, Uint32 len) +NdbIndexScanOperation::setBound(const char* anAttrName, int type, + const void* aValue, Uint32 len) { return setBound(m_accessTable->getColumn(anAttrName), type, aValue, len); } int -NdbIndexScanOperation::setBound(Uint32 anAttrId, int type, const void* aValue, Uint32 len) +NdbIndexScanOperation::setBound(Uint32 anAttrId, int type, + const void* aValue, Uint32 len) { return setBound(m_accessTable->getColumn(anAttrId), type, aValue, len); } @@ -1063,11 +1031,6 @@ NdbIndexScanOperation::getValue_impl(const NdbColumnImpl* attrInfo, return NdbScanOperation::getValue_impl(attrInfo, aValue); } - if (theStatus == SetBound) { - saveBoundATTRINFO(); - theStatus = GetValue; - } - int id = attrInfo->m_attrId; // In "real" table assert(m_accessTable->m_index); int sz = (int)m_accessTable->m_index->m_key_ids.size(); @@ -1108,35 +1071,64 @@ NdbIndexScanOperation::setBound(const NdbColumnImpl* tAttrInfo, int type, const void* aValue, Uint32 len) { if (theOperationType == OpenRangeScanRequest && - theStatus == SetBound && (0 <= type && type <= 4) && len <= 8000) { - // bound type - - insertATTRINFO(type); - // attribute header + // insert bound type + Uint32 currLen = theTotalNrOfKeyWordInSignal; + Uint32 remaining = KeyInfo::DataLength - currLen; Uint32 sizeInBytes = tAttrInfo->m_attrSize * tAttrInfo->m_arraySize; + + // normalize char bound + CHARSET_INFO* cs = tAttrInfo->m_cs; + Uint32 xfrmData[2000]; + if (cs != NULL && aValue != NULL) { + // current limitation: strxfrm does not increase length + assert(cs->strxfrm_multiply == 1); + unsigned n = + (*cs->coll->strnxfrm)(cs, + (uchar*)xfrmData, sizeof(xfrmData), + (const uchar*)aValue, sizeInBytes); + while (n < sizeInBytes) + ((uchar*)xfrmData)[n++] = 0x20; + aValue = (char*)xfrmData; + } if (len != sizeInBytes && (len != 0)) { setErrorCodeAbort(4209); return -1; } + // insert attribute header len = aValue != NULL ? sizeInBytes : 0; Uint32 tIndexAttrId = tAttrInfo->m_attrId; Uint32 sizeInWords = (len + 3) / 4; AttributeHeader ah(tIndexAttrId, sizeInWords); - insertATTRINFO(ah.m_value); - if (len != 0) { - // attribute data - if ((UintPtr(aValue) & 0x3) == 0 && (len & 0x3) == 0) - insertATTRINFOloop((const Uint32*)aValue, sizeInWords); - else { - Uint32 temp[2000]; - memcpy(temp, aValue, len); + const Uint32 ahValue = ah.m_value; + + const bool aligned = (UintPtr(aValue) & 3) == 0; + const bool nobytes = (len & 0x3) == 0; + const Uint32 totalLen = 2 + sizeInWords; + Uint32 tupKeyLen = theTupKeyLen; + if(remaining > totalLen && aligned && nobytes){ + Uint32 * dst = theKEYINFOptr + currLen; + * dst ++ = type; + * dst ++ = ahValue; + memcpy(dst, aValue, 4 * sizeInWords); + theTotalNrOfKeyWordInSignal = currLen + totalLen; + } else { + if(!aligned || !nobytes){ + Uint32 tempData[2002]; + tempData[0] = type; + tempData[1] = ahValue; + memcpy(tempData+2, aValue, len); while ((len & 0x3) != 0) - ((char*)temp)[len++] = 0; - insertATTRINFOloop(temp, sizeInWords); + ((char*)&tempData[2])[len++] = 0; + insertBOUNDS(tempData, 2+sizeInWords); + } else { + Uint32 buf[2] = { type, ahValue }; + insertBOUNDS(buf, 2); + insertBOUNDS((Uint32*)aValue, sizeInWords); } } + theTupKeyLen = tupKeyLen + totalLen; /** * Do sorted stuff @@ -1159,6 +1151,46 @@ NdbIndexScanOperation::setBound(const NdbColumnImpl* tAttrInfo, } } +int +NdbIndexScanOperation::insertBOUNDS(Uint32 * data, Uint32 sz){ + Uint32 len; + Uint32 remaining = KeyInfo::DataLength - theTotalNrOfKeyWordInSignal; + Uint32 * dst = theKEYINFOptr + theTotalNrOfKeyWordInSignal; + do { + len = (sz < remaining ? sz : remaining); + memcpy(dst, data, 4 * len); + + if(sz >= remaining){ + NdbApiSignal* tCurr = theLastKEYINFO; + tCurr->setLength(KeyInfo::MaxSignalLength); + NdbApiSignal* tSignal = tCurr->next(); + if(tSignal) + ; + else if((tSignal = theNdb->getSignal()) != 0) + { + tCurr->next(tSignal); + tSignal->setSignal(GSN_KEYINFO); + } else { + goto error; + } + theLastKEYINFO = tSignal; + theKEYINFOptr = dst = ((KeyInfo*)tSignal->getDataPtrSend())->keyData; + remaining = KeyInfo::DataLength; + sz -= len; + data += len; + } else { + len = (KeyInfo::DataLength - remaining) + len; + break; + } + } while(true); + theTotalNrOfKeyWordInSignal = len; + return 0; + +error: + setErrorCodeAbort(4228); // XXX wrong code + return -1; +} + NdbResultSet* NdbIndexScanOperation::readTuples(LockMode lm, Uint32 batch, @@ -1167,9 +1199,23 @@ NdbIndexScanOperation::readTuples(LockMode lm, NdbResultSet * rs = NdbScanOperation::readTuples(lm, batch, 0); if(rs && order_by){ m_ordered = 1; - m_sort_columns = m_accessTable->getNoOfColumns() - 1; // -1 for NDB$NODE + Uint32 cnt = m_accessTable->getNoOfColumns() - 1; + m_sort_columns = cnt; // -1 for NDB$NODE m_current_api_receiver = m_sent_receivers_count; m_api_receivers_count = m_sent_receivers_count; + + m_sort_columns = cnt; + for(Uint32 i = 0; i<cnt; i++){ + const NdbColumnImpl* key = m_accessTable->m_index->m_columns[i]; + const NdbColumnImpl* col = m_currentTable->getColumn(key->m_keyInfoPos); + NdbRecAttr* tmp = NdbScanOperation::getValue_impl(col, (char*)-1); + UintPtr newVal = UintPtr(tmp); + theTupleKeyDefined[i][0] = FAKE_PTR; + theTupleKeyDefined[i][1] = (newVal & 0xFFFFFFFF); +#if (SIZEOF_CHARP == 8) + theTupleKeyDefined[i][2] = (newVal >> 32); +#endif + } } return rs; } @@ -1183,8 +1229,8 @@ NdbIndexScanOperation::fix_get_values(){ Uint32 cnt = m_accessTable->getNoOfColumns() - 1; assert(cnt < NDB_MAX_NO_OF_ATTRIBUTES_IN_KEY); - NdbIndexImpl * idx = m_accessTable->m_index; - NdbTableImpl * tab = m_currentTable; + const NdbIndexImpl * idx = m_accessTable->m_index; + const NdbTableImpl * tab = m_currentTable; for(Uint32 i = 0; i<cnt; i++){ Uint32 val = theTupleKeyDefined[i][0]; switch(val){ @@ -1221,13 +1267,13 @@ NdbIndexScanOperation::compare(Uint32 skip, Uint32 cols, if((r1_null ^ (unsigned)r2->isNULL())){ return (r1_null ? -1 : 1); } - Uint32 type = NdbColumnImpl::getImpl(* r1->m_column).m_extType; + const NdbColumnImpl & col = NdbColumnImpl::getImpl(* r1->m_column); Uint32 size = (r1->theAttrSize * r1->theArraySize + 3) / 4; if(!r1_null){ - char r = NdbSqlUtil::cmp(type, d1, d2, size, size); + const NdbSqlUtil::Type& sqlType = NdbSqlUtil::getType(col.m_extType); + int r = (*sqlType.m_cmp)(col.m_cs, d1, d2, size, size); if(r){ assert(r != NdbSqlUtil::CmpUnknown); - assert(r != NdbSqlUtil::CmpError); return r; } } @@ -1348,19 +1394,9 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){ return 0; } - TransporterFacade* tp = TransporterFacade::instance(); - Guard guard(tp->theMutexPtr); - Uint32 seq = theNdbCon->theNodeSequence; - Uint32 nodeId = theNdbCon->theDBnode; - if(seq == tp->getNodeSequence(nodeId) && - send_next_scan(0, true) == 0 && - theError.code == 0){ - if(DEBUG_NEXT_RESULT) ndbout_c("return 1"); - return 1; - } - setErrorCode(theError.code); - if(DEBUG_NEXT_RESULT) ndbout_c("return -1"); - return -1; + theError.code = -1; + if(DEBUG_NEXT_RESULT) ndbout_c("return 1"); + return 1; } int @@ -1400,10 +1436,7 @@ NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx){ } int -NdbScanOperation::restart(){ - TransporterFacade* tp = TransporterFacade::instance(); - Guard guard(tp->theMutexPtr); - +NdbScanOperation::close_impl(TransporterFacade* tp){ Uint32 seq = theNdbCon->theNodeSequence; Uint32 nodeId = theNdbCon->theDBnode; @@ -1411,8 +1444,8 @@ NdbScanOperation::restart(){ theNdbCon->theReleaseOnClose = true; return -1; } - - while(m_sent_receivers_count){ + + while(theError.code == 0 && m_sent_receivers_count){ theNdb->theWaiter.m_node = nodeId; theNdb->theWaiter.m_state = WAIT_SCAN; int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT); @@ -1425,14 +1458,17 @@ NdbScanOperation::restart(){ m_api_receivers_count = 0; m_conf_receivers_count = 0; m_sent_receivers_count = 0; + theNdbCon->theReleaseOnClose = true; return -1; } } if(m_api_receivers_count+m_conf_receivers_count){ // Send close scan - if(send_next_scan(0, true) == -1) // Close scan + if(send_next_scan(0, true) == -1){ // Close scan + theNdbCon->theReleaseOnClose = true; return -1; + } } /** @@ -1451,15 +1487,15 @@ NdbScanOperation::restart(){ m_api_receivers_count = 0; m_conf_receivers_count = 0; m_sent_receivers_count = 0; + theNdbCon->theReleaseOnClose = true; return -1; } } + return 0; +} - /** - * Reset receivers - */ - const Uint32 parallell = theParallelism; - +void +NdbScanOperation::reset_receivers(Uint32 parallell, Uint32 ordered){ for(Uint32 i = 0; i<parallell; i++){ m_receivers[i]->m_list_index = i; m_prepared_receivers[i] = m_receivers[i]->getId(); @@ -1474,13 +1510,64 @@ NdbScanOperation::restart(){ m_sent_receivers_count = parallell; m_conf_receivers_count = 0; - if(m_ordered){ + if(ordered){ m_current_api_receiver = parallell; m_api_receivers_count = parallell; } +} + +int +NdbScanOperation::restart() +{ + TransporterFacade* tp = TransporterFacade::instance(); + Guard guard(tp->theMutexPtr); + Uint32 nodeId = theNdbCon->theDBnode; + + { + int res; + if((res= close_impl(tp))) + { + return res; + } + } + + /** + * Reset receivers + */ + reset_receivers(theParallelism, m_ordered); + + theError.code = 0; if (doSendScan(nodeId) == -1) return -1; return 0; } + +int +NdbIndexScanOperation::reset_bounds(){ + int res; + + { + TransporterFacade* tp = TransporterFacade::instance(); + Guard guard(tp->theMutexPtr); + res= close_impl(tp); + } + + if(!res) + { + theError.code = 0; + reset_receivers(theParallelism, m_ordered); + + theLastKEYINFO = theFirstKEYINFO; + theKEYINFOptr = ((KeyInfo*)theFirstKEYINFO->getDataPtrSend())->keyData; + theTupKeyLen = 0; + theTotalNrOfKeyWordInSignal = 0; + m_transConnection + ->remove_list((NdbOperation*&)m_transConnection->m_firstExecutedScanOp, + this); + m_transConnection->define_scan_op(this); + return 0; + } + return res; +} |