diff options
author | unknown <joreland@mysql.com> | 2004-11-01 23:23:26 +0100 |
---|---|---|
committer | unknown <joreland@mysql.com> | 2004-11-01 23:23:26 +0100 |
commit | 1e7ff000943c7e98fcad90132a7640aecd6e78e2 (patch) | |
tree | 28a81d0899b3211dcbb88ea2aa831dd737c1f986 | |
parent | 747e452820293b884b758b2ce61f0709a75d836f (diff) | |
download | mariadb-git-1e7ff000943c7e98fcad90132a7640aecd6e78e2.tar.gz |
wl1504 - scan using distribution key and EQ_BOUND
ndb/include/kernel/signaldata/ScanTab.hpp:
Add distribution key to ScanTab to enable scanning of specific fragment
ndb/include/ndbapi/NdbOperation.hpp:
New methods for controlling distribution/partitioning
ndb/include/ndbapi/NdbScanOperation.hpp:
New methods for controlling distribution/partitioning
ndb/src/kernel/blocks/dbdih/DbdihMain.cpp:
Don't mask away kvalue from hash while computing fragId
ndb/src/kernel/blocks/dbtc/DbtcMain.cpp:
Add distribution key to ScanTab to enable scanning of specific fragment
ndb/src/ndbapi/NdbBlob.cpp:
removed m_sizeOfKeysInWords which was the same a m_keyLenInWords
ndb/src/ndbapi/NdbConnection.cpp:
removed explicit cast
ndb/src/ndbapi/NdbDictionaryImpl.cpp:
removed m_sizeOfKeysInWords which was the same a m_keyLenInWords
ndb/src/ndbapi/NdbDictionaryImpl.hpp:
removed m_sizeOfKeysInWords which was the same a m_keyLenInWords
ndb/src/ndbapi/NdbIndexOperation.cpp:
removed theFirstKEYINFO and replaced that with theTCREQ->next
ndb/src/ndbapi/NdbOperation.cpp:
removed theFirstKEYINFO and replaced that with theTCREQ->next
ndb/src/ndbapi/NdbOperationExec.cpp:
removed theFirstKEYINFO and replaced that with theTCREQ->next
ndb/src/ndbapi/NdbOperationSearch.cpp:
removed theFirstKEYINFO and replaced that with theTCREQ->next
Enable partition scan
ndb/src/ndbapi/NdbScanOperation.cpp:
removed theFirstKEYINFO and replaced that with theTCREQ->next
Enable partition scan
-rw-r--r-- | ndb/include/kernel/signaldata/ScanTab.hpp | 24 | ||||
-rw-r--r-- | ndb/include/ndbapi/NdbOperation.hpp | 46 | ||||
-rw-r--r-- | ndb/include/ndbapi/NdbScanOperation.hpp | 1 | ||||
-rw-r--r-- | ndb/src/kernel/blocks/dbdih/DbdihMain.cpp | 3 | ||||
-rw-r--r-- | ndb/src/kernel/blocks/dbtc/DbtcMain.cpp | 81 | ||||
-rw-r--r-- | ndb/src/ndbapi/NdbBlob.cpp | 20 | ||||
-rw-r--r-- | ndb/src/ndbapi/NdbConnection.cpp | 3 | ||||
-rw-r--r-- | ndb/src/ndbapi/NdbDictionaryImpl.cpp | 12 | ||||
-rw-r--r-- | ndb/src/ndbapi/NdbDictionaryImpl.hpp | 10 | ||||
-rw-r--r-- | ndb/src/ndbapi/NdbIndexOperation.cpp | 7 | ||||
-rw-r--r-- | ndb/src/ndbapi/NdbOperation.cpp | 31 | ||||
-rw-r--r-- | ndb/src/ndbapi/NdbOperationExec.cpp | 6 | ||||
-rw-r--r-- | ndb/src/ndbapi/NdbOperationSearch.cpp | 188 | ||||
-rw-r--r-- | ndb/src/ndbapi/NdbScanOperation.cpp | 126 |
14 files changed, 347 insertions, 211 deletions
diff --git a/ndb/include/kernel/signaldata/ScanTab.hpp b/ndb/include/kernel/signaldata/ScanTab.hpp index 2029b16197e..ca70b8c62a9 100644 --- a/ndb/include/kernel/signaldata/ScanTab.hpp +++ b/ndb/include/kernel/signaldata/ScanTab.hpp @@ -65,7 +65,12 @@ private: UintR buddyConPtr; // DATA 8 UintR batch_byte_size; // DATA 9 UintR first_batch_size; // DATA 10 - + + /** + * Optional + */ + Uint32 distributionKey; + /** * Get:ers for requestInfo */ @@ -76,6 +81,7 @@ private: static Uint8 getRangeScanFlag(const UintR & requestInfo); static Uint8 getKeyinfoFlag(const UintR & requestInfo); static Uint16 getScanBatch(const UintR & requestInfo); + static Uint8 getDistributionKeyFlag(const UintR & requestInfo); /** * Set:ers for requestInfo @@ -88,6 +94,7 @@ private: static void setRangeScanFlag(UintR & requestInfo, Uint32 flag); static void setKeyinfoFlag(UintR & requestInfo, Uint32 flag); static void setScanBatch(Uint32& requestInfo, Uint32 sz); + static void setDistributionKeyFlag(Uint32& requestInfo, Uint32 flag); }; /** @@ -100,6 +107,7 @@ private: k = Keyinfo - 1 Bit 12 x = Range Scan (TUX) - 1 Bit 15 b = Scan batch - 10 Bit 16-25 (max 1023) + d = Distribution key flag 1111111111222222222233 01234567890123456789012345678901 @@ -127,6 +135,8 @@ private: #define SCAN_BATCH_SHIFT (16) #define SCAN_BATCH_MASK (1023) +#define SCAN_DISTR_KEY_SHIFT (26) + inline Uint8 ScanTabReq::getParallelism(const UintR & requestInfo){ @@ -225,6 +235,18 @@ ScanTabReq::setKeyinfoFlag(UintR & requestInfo, Uint32 flag){ requestInfo |= (flag << KEYINFO_SHIFT); } +inline +Uint8 +ScanTabReq::getDistributionKeyFlag(const UintR & requestInfo){ + return (Uint8)((requestInfo >> SCAN_DISTR_KEY_SHIFT) & 1); +} + +inline +void +ScanTabReq::setDistributionKeyFlag(UintR & requestInfo, Uint32 flag){ + ASSERT_BOOL(flag, "ScanTabReq::setKeyinfoFlag"); + requestInfo |= (flag << SCAN_DISTR_KEY_SHIFT); +} /** * diff --git a/ndb/include/ndbapi/NdbOperation.hpp b/ndb/include/ndbapi/NdbOperation.hpp index ef450ec18c2..23ca32cab37 100644 --- a/ndb/include/ndbapi/NdbOperation.hpp +++ b/ndb/include/ndbapi/NdbOperation.hpp @@ -720,11 +720,14 @@ public: LockMode getLockMode() const { return theLockMode; } /** - * Set/get distribution key + * Set/get distribution/partition key */ - void setDistributionKey(Uint32 key); - Uint32 getDistributionKey() const; - + void setPartitionId(Uint32 id); + void setPartitionHash(Uint32 key); + void setPartitionHash(const Uint64 *, Uint32 len); + Uint32 getPartitionId() const; +protected: + int handle_distribution_key(const Uint64 *, Uint32 len); protected: /****************************************************************************** * These are the methods used to create and delete the NdbOperation objects. @@ -855,14 +858,18 @@ protected: Ndb* theNdb; // Point back to the Ndb object. NdbConnection* theNdbCon; // Point back to the connection object. NdbOperation* theNext; // Next pointer to operation. - NdbApiSignal* theTCREQ; // The TC[KEY/INDX]REQ signal object + + union { + NdbApiSignal* theTCREQ; // The TC[KEY/INDX]REQ signal object + NdbApiSignal* theSCAN_TABREQ; + }; + NdbApiSignal* theFirstATTRINFO; // The first ATTRINFO signal object NdbApiSignal* theCurrentATTRINFO; // The current ATTRINFO signal object Uint32 theTotalCurrAI_Len; // The total number of attribute info // words currently defined Uint32 theAI_LenInCurrAI; // The number of words defined in the // current ATTRINFO signal - NdbApiSignal* theFirstKEYINFO; // The first KEYINFO signal object NdbApiSignal* theLastKEYINFO; // The first KEYINFO signal object class NdbLabel* theFirstLabel; @@ -879,8 +886,8 @@ protected: Uint32* theKEYINFOptr; // Pointer to where to write KEYINFO Uint32* theATTRINFOptr; // Pointer to where to write ATTRINFO - const class NdbTableImpl* m_currentTable; // The current table - const class NdbTableImpl* m_accessTable; + const class NdbTableImpl* m_currentTable; // The current table + const class NdbTableImpl* m_accessTable; // Index table (== current for pk) // Set to TRUE when a tuple key attribute has been defined. Uint32 theTupleKeyDefined[NDB_MAX_NO_OF_ATTRIBUTES_IN_KEY][3]; @@ -888,13 +895,14 @@ protected: Uint32 theTotalNrOfKeyWordInSignal; // The total number of // keyword in signal. - Uint32 theTupKeyLen; // Length of the tuple key in words - Uint32 theNoOfTupKeyDefined; // The number of tuple key attributes - // currently defined - OperationType theOperationType; // Read Request, Update Req...... - + Uint32 theTupKeyLen; // Length of the tuple key in words + // left until done + Uint8 theNoOfTupKeyLeft; // The number of tuple key attributes + OperationType theOperationType; // Read Request, Update Req...... + LockMode theLockMode; // Can be set to WRITE if read operation OperationStatus theStatus; // The status of the operation. + Uint32 theMagicNumber; // Magic number to verify that object // is correct Uint32 theScanInfo; // Scan info bits (take over flag etc) @@ -906,12 +914,12 @@ protected: Uint32 theFinalUpdateSize; // Size of final updates for interpretation Uint32 theFinalReadSize; // Size of final reads for interpretation - Uint8 theStartIndicator; // Indicator of whether start operation - Uint8 theCommitIndicator; // Indicator of whether commit operation - Uint8 theSimpleIndicator; // Indicator of whether simple operation - Uint8 theDirtyIndicator; // Indicator of whether dirty operation - Uint8 theInterpretIndicator; // Indicator of whether interpreted operation - Uint8 theDistrKeyIndicator; // Indicates whether distr. key is used + Uint8 theStartIndicator; // Indicator of whether start operation + Uint8 theCommitIndicator; // Indicator of whether commit operation + Uint8 theSimpleIndicator; // Indicator of whether simple operation + Uint8 theDirtyIndicator; // Indicator of whether dirty operation + Uint8 theInterpretIndicator; // Indicator of whether interpreted operation + Int8 theDistrKeyIndicator_; // Indicates whether distr. key is used Uint16 m_tcReqGSN; Uint16 m_keyInfoGSN; diff --git a/ndb/include/ndbapi/NdbScanOperation.hpp b/ndb/include/ndbapi/NdbScanOperation.hpp index 2e4d173ac75..81d0839ad8a 100644 --- a/ndb/include/ndbapi/NdbScanOperation.hpp +++ b/ndb/include/ndbapi/NdbScanOperation.hpp @@ -114,7 +114,6 @@ protected: // Scan related variables Uint32 theParallelism; Uint32 m_keyInfo; - NdbApiSignal* theSCAN_TABREQ; int getFirstATTRINFOScan(); int doSendScan(int ProcessorId); diff --git a/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp b/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp index 76aa745c3e0..83ab897d0c3 100644 --- a/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp +++ b/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp @@ -6852,8 +6852,7 @@ void Dbdih::execDIGETNODESREQ(Signal* signal) TabRecord* regTabDesc = tabRecord; jamEntry(); ptrCheckGuard(tabPtr, ttabFileSize, regTabDesc); - hashValue = hashValue >> tabPtr.p->kvalue; - Uint32 fragId = tabPtr.p->mask & hashValue; + Uint32 fragId = hashValue & tabPtr.p->mask; ndbrequire(tabPtr.p->tabStatus == TabRecord::TS_ACTIVE); if (fragId < tabPtr.p->hashpointer) { jam(); diff --git a/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp b/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp index 3e09f91b2c4..b9c553bb805 100644 --- a/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp +++ b/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp @@ -8459,7 +8459,7 @@ void Dbtc::systemErrorLab(Signal* signal) void Dbtc::execSCAN_TABREQ(Signal* signal) { const ScanTabReq * const scanTabReq = (ScanTabReq *)&signal->theData[0]; - const Uint32 reqinfo = scanTabReq->requestInfo; + const Uint32 ri = scanTabReq->requestInfo; const Uint32 aiLength = (scanTabReq->attrLenKeyLen & 0xFFFF); const Uint32 keyLen = scanTabReq->attrLenKeyLen >> 16; const Uint32 schemaVersion = scanTabReq->tableSchemaVersion; @@ -8469,8 +8469,8 @@ void Dbtc::execSCAN_TABREQ(Signal* signal) const Uint32 buddyPtr = (tmpXX == 0xFFFFFFFF ? RNIL : tmpXX); Uint32 currSavePointId = 0; - Uint32 scanConcurrency = scanTabReq->getParallelism(reqinfo); - Uint32 noOprecPerFrag = ScanTabReq::getScanBatch(reqinfo); + Uint32 scanConcurrency = scanTabReq->getParallelism(ri); + Uint32 noOprecPerFrag = ScanTabReq::getScanBatch(ri); Uint32 scanParallel = scanConcurrency; Uint32 errCode; ScanRecordPtr scanptr; @@ -8545,6 +8545,8 @@ void Dbtc::execSCAN_TABREQ(Signal* signal) seizeCacheRecord(signal); cachePtr.p->keylen = keyLen; cachePtr.p->save1 = 0; + cachePtr.p->distributionKey = scanTabReq->distributionKey; + cachePtr.p->distributionKeyIndicator= ScanTabReq::getDistributionKeyFlag(ri); scanptr = seizeScanrec(signal); ndbrequire(transP->apiScanRec == RNIL); @@ -8621,6 +8623,7 @@ void Dbtc::initScanrec(ScanRecordPtr scanptr, UintR scanParallel, UintR noOprecPerFrag) { + const UintR ri = scanTabReq->requestInfo; scanptr.p->scanTcrec = tcConnectptr.i; scanptr.p->scanApiRec = apiConnectptr.i; scanptr.p->scanAiLength = scanTabReq->attrLenKeyLen & 0xFFFF; @@ -8633,7 +8636,6 @@ void Dbtc::initScanrec(ScanRecordPtr scanptr, scanptr.p->batch_byte_size= scanTabReq->batch_byte_size; Uint32 tmp = 0; - const UintR ri = scanTabReq->requestInfo; ScanFragReq::setLockMode(tmp, ScanTabReq::getLockMode(ri)); ScanFragReq::setHoldLockFlag(tmp, ScanTabReq::getHoldLockFlag(ri)); ScanFragReq::setKeyinfoFlag(tmp, ScanTabReq::getKeyinfoFlag(ri)); @@ -8749,14 +8751,42 @@ void Dbtc::diFcountReqLab(Signal* signal, ScanRecordPtr scanptr) return; } + scanptr.p->scanNextFragId = 0; scanptr.p->scanState = ScanRecord::WAIT_FRAGMENT_COUNT; - /************************************************* - * THE FIRST STEP TO RECEIVE IS SUCCESSFULLY COMPLETED. - * WE MUST FIRST GET THE NUMBER OF FRAGMENTS IN THE TABLE. - ***************************************************/ - signal->theData[0] = tcConnectptr.p->dihConnectptr; - signal->theData[1] = scanptr.p->scanTableref; - sendSignal(cdihblockref, GSN_DI_FCOUNTREQ, signal, 2, JBB); + + if(!cachePtr.p->distributionKeyIndicator) + { + jam(); + /************************************************* + * THE FIRST STEP TO RECEIVE IS SUCCESSFULLY COMPLETED. + * WE MUST FIRST GET THE NUMBER OF FRAGMENTS IN THE TABLE. + ***************************************************/ + signal->theData[0] = tcConnectptr.p->dihConnectptr; + signal->theData[1] = scanptr.p->scanTableref; + sendSignal(cdihblockref, GSN_DI_FCOUNTREQ, signal, 2, JBB); + } + else + { + signal->theData[0] = tcConnectptr.p->dihConnectptr; + signal->theData[1] = tabPtr.i; + signal->theData[2] = cachePtr.p->distributionKey; + EXECUTE_DIRECT(DBDIH, GSN_DIGETNODESREQ, signal, 3); + UintR TerrorIndicator = signal->theData[0]; + jamEntry(); + if (TerrorIndicator != 0) { + signal->theData[0] = tcConnectptr.i; + //signal->theData[1] Contains error + execDI_FCOUNTREF(signal); + return; + } + + UintR Tdata1 = signal->theData[1]; + scanptr.p->scanNextFragId = Tdata1; + + signal->theData[0] = tcConnectptr.i; + signal->theData[1] = 1; // Frag count + execDI_FCOUNTCONF(signal); + } return; }//Dbtc::diFcountReqLab() @@ -8773,7 +8803,7 @@ void Dbtc::execDI_FCOUNTCONF(Signal* signal) { jamEntry(); tcConnectptr.i = signal->theData[0]; - const UintR tfragCount = signal->theData[1]; + Uint32 tfragCount = signal->theData[1]; ptrCheckGuard(tcConnectptr, ctcConnectFilesize, tcConnectRecord); apiConnectptr.i = tcConnectptr.p->apiConnect; ptrCheckGuard(apiConnectptr, capiConnectFilesize, apiConnectRecord); @@ -8807,24 +8837,17 @@ void Dbtc::execDI_FCOUNTCONF(Signal* signal) return; } - if(scanptr.p->scanParallel > tfragCount){ - jam(); - abortScanLab(signal, scanptr, ZTOO_HIGH_CONCURRENCY_ERROR); - return; - } - scanptr.p->scanParallel = tfragCount; scanptr.p->scanNoFrag = tfragCount; - scanptr.p->scanNextFragId = 0; scanptr.p->scanState = ScanRecord::RUNNING; setApiConTimer(apiConnectptr.i, 0, __LINE__); updateBuddyTimer(apiConnectptr); ScanFragRecPtr ptr; - ScanFragList list(c_scan_frag_pool, - scanptr.p->m_running_scan_frags); - for (list.first(ptr); !ptr.isNull(); list.next(ptr)){ + ScanFragList list(c_scan_frag_pool, scanptr.p->m_running_scan_frags); + for (list.first(ptr); !ptr.isNull() && tfragCount; + list.next(ptr), tfragCount--){ jam(); ptr.p->lqhBlockref = 0; @@ -8839,6 +8862,20 @@ void Dbtc::execDI_FCOUNTCONF(Signal* signal) signal->theData[3] = ptr.p->scanFragId; sendSignal(cdihblockref, GSN_DIGETPRIMREQ, signal, 4, JBB); }//for + + ScanFragList queued(c_scan_frag_pool, scanptr.p->m_queued_scan_frags); + for (; !ptr.isNull();) + { + ptr.p->m_ops = 0; + ptr.p->m_totalLen = 0; + ptr.p->scanFragState = ScanFragRec::QUEUED_FOR_DELIVERY; + ptr.p->stopFragTimer(); + + ScanFragRecPtr tmp = ptr; + list.next(ptr); + list.remove(tmp); + queued.add(tmp); + } }//Dbtc::execDI_FCOUNTCONF() /****************************************************** diff --git a/ndb/src/ndbapi/NdbBlob.cpp b/ndb/src/ndbapi/NdbBlob.cpp index feab95d8ca5..615c167991f 100644 --- a/ndb/src/ndbapi/NdbBlob.cpp +++ b/ndb/src/ndbapi/NdbBlob.cpp @@ -107,8 +107,8 @@ NdbBlob::getBlobTable(NdbTableImpl& bt, const NdbTableImpl* t, const NdbColumnIm } { NdbDictionary::Column bc("PK"); bc.setType(NdbDictionary::Column::Unsigned); - assert(t->m_sizeOfKeysInWords != 0); - bc.setLength(t->m_sizeOfKeysInWords); + assert(t->m_keyLenInWords != 0); + bc.setLength(t->m_keyLenInWords); bc.setPrimaryKey(true); bt.addColumn(bc); } @@ -325,7 +325,7 @@ int NdbBlob::setTableKeyValue(NdbOperation* anOp) { const Uint32* data = (const Uint32*)theKeyBuf.data; - DBG("setTableKeyValue key=" << ndb_blob_debug(data, theTable->m_sizeOfKeysInWords)); + DBG("setTableKeyValue key=" << ndb_blob_debug(data, theTable->m_keyLenInWords)); const unsigned columns = theTable->m_columns.size(); unsigned pos = 0; for (unsigned i = 0; i < columns; i++) { @@ -348,7 +348,7 @@ int NdbBlob::setAccessKeyValue(NdbOperation* anOp) { const Uint32* data = (const Uint32*)theAccessKeyBuf.data; - DBG("setAccessKeyValue key=" << ndb_blob_debug(data, theAccessTable->m_sizeOfKeysInWords)); + DBG("setAccessKeyValue key=" << ndb_blob_debug(data, theAccessTable->m_keyLenInWords)); const unsigned columns = theAccessTable->m_columns.size(); unsigned pos = 0; for (unsigned i = 0; i < columns; i++) { @@ -371,7 +371,7 @@ int NdbBlob::setPartKeyValue(NdbOperation* anOp, Uint32 part) { Uint32* data = (Uint32*)theKeyBuf.data; - unsigned size = theTable->m_sizeOfKeysInWords; + unsigned size = theTable->m_keyLenInWords; DBG("setPartKeyValue dist=" << getDistKey(part) << " part=" << part << " key=" << ndb_blob_debug(data, size)); if (anOp->equal((Uint32)0, getDistKey(part)) == -1 || anOp->equal((Uint32)1, part) == -1 || @@ -1043,8 +1043,8 @@ NdbBlob::atPrepare(NdbConnection* aCon, NdbOperation* anOp, const NdbColumnImpl* theBlobTable = &NdbTableImpl::getImpl(*bt); } // buffers - theKeyBuf.alloc(theTable->m_sizeOfKeysInWords << 2); - theAccessKeyBuf.alloc(theAccessTable->m_sizeOfKeysInWords << 2); + theKeyBuf.alloc(theTable->m_keyLenInWords << 2); + theAccessKeyBuf.alloc(theAccessTable->m_keyLenInWords << 2); theHeadInlineBuf.alloc(sizeof(Head) + theInlineSize); thePartBuf.alloc(thePartSize); theHead = (Head*)theHeadInlineBuf.data; @@ -1055,7 +1055,7 @@ NdbBlob::atPrepare(NdbConnection* aCon, NdbOperation* anOp, const NdbColumnImpl* if (isTableOp()) { // get table key Uint32* data = (Uint32*)theKeyBuf.data; - unsigned size = theTable->m_sizeOfKeysInWords; + unsigned size = theTable->m_keyLenInWords; if (theNdbOp->getKeyFromTCREQ(data, size) == -1) { setErrorCode(ErrUsage); return -1; @@ -1064,7 +1064,7 @@ NdbBlob::atPrepare(NdbConnection* aCon, NdbOperation* anOp, const NdbColumnImpl* if (isIndexOp()) { // get index key Uint32* data = (Uint32*)theAccessKeyBuf.data; - unsigned size = theAccessTable->m_sizeOfKeysInWords; + unsigned size = theAccessTable->m_keyLenInWords; if (theNdbOp->getKeyFromTCREQ(data, size) == -1) { setErrorCode(ErrUsage); return -1; @@ -1326,7 +1326,7 @@ NdbBlob::atNextResult() thePos = 0; // get primary key { Uint32* data = (Uint32*)theKeyBuf.data; - unsigned size = theTable->m_sizeOfKeysInWords; + unsigned size = theTable->m_keyLenInWords; if (((NdbScanOperation*)theNdbOp)->getKeyFromKEYINFO20(data, size) == -1) { setErrorCode(ErrUsage); return -1; diff --git a/ndb/src/ndbapi/NdbConnection.cpp b/ndb/src/ndbapi/NdbConnection.cpp index 1457792cf28..607afc8ddf4 100644 --- a/ndb/src/ndbapi/NdbConnection.cpp +++ b/ndb/src/ndbapi/NdbConnection.cpp @@ -1073,8 +1073,7 @@ NdbConnection::getNdbIndexScanOperation(const NdbIndexImpl* index, if (theCommitStatus == Started){ const NdbTableImpl * indexTable = index->getIndexTable(); if (indexTable != 0){ - NdbIndexScanOperation* tOp = - getNdbScanOperation((NdbTableImpl *) indexTable); + NdbIndexScanOperation* tOp = getNdbScanOperation(indexTable); tOp->m_currentTable = table; if(tOp) tOp->m_cursor_type = NdbScanOperation::IndexCursor; return tOp; diff --git a/ndb/src/ndbapi/NdbDictionaryImpl.cpp b/ndb/src/ndbapi/NdbDictionaryImpl.cpp index f3c15ea3f84..79ae484aff8 100644 --- a/ndb/src/ndbapi/NdbDictionaryImpl.cpp +++ b/ndb/src/ndbapi/NdbDictionaryImpl.cpp @@ -265,8 +265,9 @@ NdbTableImpl::init(){ m_indexType = NdbDictionary::Index::Undefined; m_noOfKeys = 0; + m_noOfDistributionKeys = 0; m_fragmentCount = 0; - m_sizeOfKeysInWords = 0; + m_keyLenInWords = 0; m_noOfBlobs = 0; } @@ -345,8 +346,9 @@ NdbTableImpl::assign(const NdbTableImpl& org) delete m_index; m_index = org.m_index; + m_noOfDistributionKeys = org.m_noOfDistributionKeys; m_noOfKeys = org.m_noOfKeys; - m_sizeOfKeysInWords = org.m_sizeOfKeysInWords; + m_keyLenInWords = org.m_keyLenInWords; m_noOfBlobs = org.m_noOfBlobs; m_version = org.m_version; @@ -1213,6 +1215,7 @@ NdbDictInterface::parseTableInfo(NdbTableImpl ** ret, Uint32 keyInfoPos = 0; Uint32 keyCount = 0; Uint32 blobCount = 0; + Uint32 distKeys = 0; for(Uint32 i = 0; i < tableDesc.NoOfAttributes; i++) { DictTabInfo::Attribute attrDesc; attrDesc.init(); @@ -1276,6 +1279,9 @@ NdbDictInterface::parseTableInfo(NdbTableImpl ** ret, col->m_keyInfoPos = keyInfoPos + 1; keyInfoPos += ((col->m_attrSize * col->m_arraySize + 3) / 4); keyCount++; + + if(attrDesc.AttributeDKey) + distKeys++; } else { col->m_keyInfoPos = 0; } @@ -1294,8 +1300,8 @@ NdbDictInterface::parseTableInfo(NdbTableImpl ** ret, impl->m_noOfKeys = keyCount; impl->m_keyLenInWords = keyInfoPos; - impl->m_sizeOfKeysInWords = keyInfoPos; impl->m_noOfBlobs = blobCount; + impl->m_noOfDistributionKeys = distKeys; * ret = impl; return 0; } diff --git a/ndb/src/ndbapi/NdbDictionaryImpl.hpp b/ndb/src/ndbapi/NdbDictionaryImpl.hpp index 90e12ddeccd..955ac3ed3c7 100644 --- a/ndb/src/ndbapi/NdbDictionaryImpl.hpp +++ b/ndb/src/ndbapi/NdbDictionaryImpl.hpp @@ -124,8 +124,8 @@ public: int m_kvalue; int m_minLoadFactor; int m_maxLoadFactor; - int m_keyLenInWords; - int m_fragmentCount; + Uint16 m_keyLenInWords; + Uint16 m_fragmentCount; NdbDictionaryImpl * m_dictionary; NdbIndexImpl * m_index; @@ -143,9 +143,9 @@ public: /** * Aggregates */ - Uint32 m_noOfKeys; - unsigned short m_sizeOfKeysInWords; - unsigned short m_noOfBlobs; + Uint8 m_noOfKeys; + Uint8 m_noOfDistributionKeys; + Uint8 m_noOfBlobs; /** * Equality/assign diff --git a/ndb/src/ndbapi/NdbIndexOperation.cpp b/ndb/src/ndbapi/NdbIndexOperation.cpp index 159726164c2..d3fbd015178 100644 --- a/ndb/src/ndbapi/NdbIndexOperation.cpp +++ b/ndb/src/ndbapi/NdbIndexOperation.cpp @@ -70,9 +70,6 @@ NdbIndexOperation::indxInit(const NdbIndexImpl * anIndex, } m_theIndex = anIndex; m_accessTable = anIndex->m_table; - TcKeyReq * tcKeyReq = CAST_PTR(TcKeyReq, theTCREQ->getDataPtrSend()); - theKEYINFOptr = &tcKeyReq->keyInfo[0]; - theATTRINFOptr = &tcKeyReq->attrInfo[0]; return 0; } @@ -269,7 +266,7 @@ NdbIndexOperation::prepareSend(Uint32 aTC_ConnectPtr, Uint64 aTransactionId) tcKeyReq->setKeyLength(tReqInfo, tIndexLen); tcKeyReq->setAbortOption(tReqInfo, abortOption); - Uint8 tDistrKeyIndicator = theDistrKeyIndicator; + Uint8 tDistrKeyIndicator = theDistrKeyIndicator_; Uint8 tScanIndicator = theScanInfo & 1; tcKeyReq->setDistributionKeyFlag(tReqInfo, tDistrKeyIndicator); @@ -328,7 +325,7 @@ NdbIndexOperation::prepareSend(Uint32 aTC_ConnectPtr, Uint64 aTransactionId) /** * Set transid and TC connect ptr in the INDXKEYINFO signals */ - NdbApiSignal* tSignal = theFirstKEYINFO; + NdbApiSignal* tSignal = theTCREQ->next(); Uint32 remainingKey = tIndexLen - TcKeyReq::MaxKeyInfo; do { diff --git a/ndb/src/ndbapi/NdbOperation.cpp b/ndb/src/ndbapi/NdbOperation.cpp index 05c3cf65c69..4f39286a624 100644 --- a/ndb/src/ndbapi/NdbOperation.cpp +++ b/ndb/src/ndbapi/NdbOperation.cpp @@ -49,7 +49,6 @@ NdbOperation::NdbOperation(Ndb* aNdb) : theCurrentATTRINFO(NULL), theTotalCurrAI_Len(0), theAI_LenInCurrAI(0), - theFirstKEYINFO(NULL), theLastKEYINFO(NULL), theFirstLabel(NULL), @@ -68,7 +67,7 @@ NdbOperation::NdbOperation(Ndb* aNdb) : //theSchemaVersion(0), theTotalNrOfKeyWordInSignal(8), theTupKeyLen(0), - theNoOfTupKeyDefined(0), + theNoOfTupKeyLeft(0), theOperationType(NotDefined), theStatus(Init), theMagicNumber(0xFE11D0), @@ -142,12 +141,11 @@ NdbOperation::init(const NdbTableImpl* tab, NdbConnection* myConnection){ theFirstATTRINFO = NULL; theCurrentATTRINFO = NULL; - theFirstKEYINFO = NULL; theLastKEYINFO = NULL; - theTupKeyLen = 0; - theNoOfTupKeyDefined = 0; + theTupKeyLen = 0; + theNoOfTupKeyLeft = tab->getNoOfPrimaryKeys(); theTotalCurrAI_Len = 0; theAI_LenInCurrAI = 0; @@ -156,7 +154,7 @@ NdbOperation::init(const NdbTableImpl* tab, NdbConnection* myConnection){ theSimpleIndicator = 0; theDirtyIndicator = 0; theInterpretIndicator = 0; - theDistrKeyIndicator = 0; + theDistrKeyIndicator_ = 0; theScanInfo = 0; theTotalNrOfKeyWordInSignal = 8; theMagicNumber = 0xABCDEF01; @@ -202,11 +200,16 @@ NdbOperation::release() NdbBlob* tBlob; NdbBlob* tSaveBlob; - if (theTCREQ != NULL) + tSignal = theTCREQ; + while (tSignal != NULL) { - theNdb->releaseSignal(theTCREQ); - } + tSaveSignal = tSignal; + tSignal = tSignal->next(); + theNdb->releaseSignal(tSaveSignal); + } theTCREQ = NULL; + theLastKEYINFO = NULL; + tSignal = theFirstATTRINFO; while (tSignal != NULL) { @@ -216,15 +219,7 @@ NdbOperation::release() } theFirstATTRINFO = NULL; theCurrentATTRINFO = NULL; - tSignal = theFirstKEYINFO; - while (tSignal != NULL) - { - tSaveSignal = tSignal; - tSignal = tSignal->next(); - theNdb->releaseSignal(tSaveSignal); - } - theFirstKEYINFO = NULL; - theLastKEYINFO = NULL; + if (theInterpretIndicator == 1) { tBranch = theFirstBranch; diff --git a/ndb/src/ndbapi/NdbOperationExec.cpp b/ndb/src/ndbapi/NdbOperationExec.cpp index ebd83dc3ef9..7fa6627afa8 100644 --- a/ndb/src/ndbapi/NdbOperationExec.cpp +++ b/ndb/src/ndbapi/NdbOperationExec.cpp @@ -65,7 +65,7 @@ NdbOperation::doSend(int aNodeId, Uint32 lastFlag) if (tReturnCode == -1) { return -1; } - NdbApiSignal *tSignal = theFirstKEYINFO; + NdbApiSignal *tSignal = theTCREQ->next(); while (tSignal != NULL) { NdbApiSignal* tnextSignal = tSignal->next(); tReturnCode = tp->sendSignal(tSignal, aNodeId); @@ -201,7 +201,7 @@ NdbOperation::prepareSend(Uint32 aTC_ConnectPtr, Uint64 aTransId) abortOption = tSimpleIndicator ? IgnoreError : abortOption; tcKeyReq->setAbortOption(tReqInfo, abortOption); - Uint8 tDistrKeyIndicator = theDistrKeyIndicator; + Uint8 tDistrKeyIndicator = theDistrKeyIndicator_; Uint8 tScanIndicator = theScanInfo & 1; tcKeyReq->setDistributionKeyFlag(tReqInfo, tDistrKeyIndicator); @@ -260,7 +260,7 @@ NdbOperation::prepareSend(Uint32 aTC_ConnectPtr, Uint64 aTransId) /** * Set transid, TC connect ptr and length in the KEYINFO signals */ - NdbApiSignal* tSignal = theFirstKEYINFO; + NdbApiSignal* tSignal = theTCREQ->next(); Uint32 remainingKey = tTupKeyLen - TcKeyReq::MaxKeyInfo; do { Uint32* tSigDataPtr = tSignal->getDataPtrSend(); diff --git a/ndb/src/ndbapi/NdbOperationSearch.cpp b/ndb/src/ndbapi/NdbOperationSearch.cpp index f172753bfda..adf67a649f2 100644 --- a/ndb/src/ndbapi/NdbOperationSearch.cpp +++ b/ndb/src/ndbapi/NdbOperationSearch.cpp @@ -38,7 +38,9 @@ Adjust: 971022 UABMNST First version. #include <AttributeHeader.hpp> #include <signaldata/TcKeyReq.hpp> +#include <signaldata/KeyInfo.hpp> #include "NdbDictionaryImpl.hpp" +#include <md5_hash.hpp> /****************************************************************************** CondIdType equal(const char* anAttrName, char* aValue, Uint32 aVarKeylen); @@ -60,8 +62,8 @@ NdbOperation::equal_impl(const NdbColumnImpl* tAttrInfo, Uint32 tData; Uint32 tKeyInfoPosition; const char* aValue = aValuePassed; - Uint32 xfrmData[1024]; - Uint32 tempData[1024]; + Uint64 xfrmData[512]; + Uint64 tempData[512]; if ((theStatus == OperationDefined) && (aValue != NULL) && @@ -76,6 +78,8 @@ NdbOperation::equal_impl(const NdbColumnImpl* tAttrInfo, *****************************************************************************/ tAttrId = tAttrInfo->m_attrId; tKeyInfoPosition = tAttrInfo->m_keyInfoPos; + bool tDistrKey = tAttrInfo->m_distributionKey; + Uint32 i = 0; if (tAttrInfo->m_pk) { Uint32 tKeyDefined = theTupleKeyDefined[0][2]; @@ -119,23 +123,25 @@ NdbOperation::equal_impl(const NdbColumnImpl* tAttrInfo, Uint32 sizeInBytes = tAttrInfo->m_attrSize * tAttrInfo->m_arraySize; { - /*************************************************************************** - * Check if the pointer of the value passed is aligned on a 4 byte - * boundary. If so only assign the pointer to the internal variable - * aValue. If it is not aligned then we start by copying the value to - * tempData and use this as aValue instead. - *****************************************************************************/ + /************************************************************************ + * Check if the pointer of the value passed is aligned on a 4 byte + * boundary. If so only assign the pointer to the internal variable + * aValue. If it is not aligned then we start by copying the value to + * tempData and use this as aValue instead. + ***********************************************************************/ const int attributeSize = sizeInBytes; const int slack = sizeInBytes & 3; - - if ((((UintPtr)aValue & 3) != 0) || (slack != 0)){ - tempData[attributeSize >> 2] = 0; + const int align = UintPtr(aValue) & 7; + + if (((align & 3) != 0) || (slack != 0) || (tDistrKey && (align != 0))) + { + ((Uint32*)tempData)[attributeSize >> 2] = 0; memcpy(&tempData[0], aValue, attributeSize); aValue = (char*)&tempData[0]; }//if } const char* aValueToWrite = aValue; - + CHARSET_INFO* cs = tAttrInfo->m_cs; if (cs != 0) { // current limitation: strxfrm does not increase length @@ -185,18 +191,12 @@ NdbOperation::equal_impl(const NdbColumnImpl* tAttrInfo, }//if #endif - int tDistrKey = tAttrInfo->m_distributionKey; OperationType tOpType = theOperationType; - if ((tDistrKey != 1)) { - ; - } else { - //set_distribution_key(aValue, totalSizeInWords); - } - /****************************************************************************** + /************************************************************************** * If the operation is an insert request and the attribute is stored then * we also set the value in the stored part through putting the * information in the ATTRINFO signals. - *****************************************************************************/ + *************************************************************************/ if ((tOpType == InsertRequest) || (tOpType == WriteRequest)) { // invalid data can crash kernel @@ -213,25 +213,23 @@ NdbOperation::equal_impl(const NdbColumnImpl* tAttrInfo, insertATTRINFOloop((Uint32*)aValueToWrite, sz); }//if - /*************************************************************************** + /************************************************************************** * Store the Key information in the TCKEYREQ and KEYINFO signals. - **************************************************************************/ + *************************************************************************/ if (insertKEYINFO(aValue, tKeyInfoPosition, totalSizeInWords) != -1) { - /************************************************************************* + /************************************************************************ * Add one to number of tuple key attributes defined. * If all have been defined then set the operation state to indicate * that tuple key is defined. * Thereby no more search conditions are allowed in this version. - ************************************************************************/ - Uint32 tNoKeysDef = theNoOfTupKeyDefined; + ***********************************************************************/ + Uint32 tNoKeysDef = theNoOfTupKeyLeft - 1; Uint32 tErrorLine = theErrorLine; - int tNoTableKeys = m_accessTable->m_noOfKeys; unsigned char tInterpretInd = theInterpretIndicator; - tNoKeysDef++; - theNoOfTupKeyDefined = tNoKeysDef; + theNoOfTupKeyLeft = tNoKeysDef; tErrorLine++; theErrorLine = tErrorLine; - if (int(tNoKeysDef) == tNoTableKeys) { + if (tNoKeysDef == 0) { if (tOpType == UpdateRequest) { if (tInterpretInd == 1) { theStatus = GetValue; @@ -262,12 +260,19 @@ NdbOperation::equal_impl(const NdbColumnImpl* tAttrInfo, return -1; }//if }//if - return 0; + if (!tDistrKey) + { + return 0; + } + else + { + return handle_distribution_key((Uint64*)aValue, totalSizeInWords); + } } else { return -1; }//if } - + if (aValue == NULL) { // NULL value in primary key setErrorCodeAbort(4505); @@ -412,10 +417,11 @@ NdbOperation::insertKEYINFO(const char* aValue, setErrorCodeAbort(4001); return -1; } - if (theFirstKEYINFO != NULL) + if (theTCREQ->next() != NULL) theLastKEYINFO->next(tSignal); else - theFirstKEYINFO = tSignal; + theTCREQ->next(tSignal); + theLastKEYINFO = tSignal; theLastKEYINFO->next(NULL); theTotalNrOfKeyWordInSignal += 20; @@ -428,7 +434,7 @@ NdbOperation::insertKEYINFO(const char* aValue, * this is the first word in a KEYINFO signal. * *****************************************************************************/ tPosition = aStartPosition; - tCurrentKEYINFO = theFirstKEYINFO; + tCurrentKEYINFO = theTCREQ->next(); /***************************************************************************** * Start by filling up Key information in the 8 words allocated in the * @@ -487,14 +493,14 @@ LastWordLabel: int NdbOperation::getKeyFromTCREQ(Uint32* data, unsigned size) { - assert(m_accessTable != 0 && m_accessTable->m_sizeOfKeysInWords != 0); - assert(m_accessTable->m_sizeOfKeysInWords == size); + assert(m_accessTable != 0 && m_accessTable->m_keyLenInWords != 0); + assert(m_accessTable->m_keyLenInWords == size); unsigned pos = 0; while (pos < 8 && pos < size) { data[pos] = theKEYINFOptr[pos]; pos++; } - NdbApiSignal* tSignal = theFirstKEYINFO; + NdbApiSignal* tSignal = theTCREQ->next(); unsigned n = 0; while (pos < size) { if (n == 20) { @@ -505,3 +511,111 @@ NdbOperation::getKeyFromTCREQ(Uint32* data, unsigned size) } return 0; } + +int +NdbOperation::handle_distribution_key(const Uint64* value, Uint32 len) +{ + if(theNoOfTupKeyLeft > 0) + { + return 0; + } + + if(m_accessTable->m_noOfDistributionKeys == 1) + { + setPartitionHash(value, len); + } + else + { + /** + * Copy distribution key to linear memory + */ + NdbColumnImpl* const * cols = m_accessTable->m_columns.getBase(); + Uint32 len = 0; + Uint64 tmp[1000]; + + Uint32 chunk = 8; + Uint32* dst = (Uint32*)tmp; + NdbApiSignal* tSignal = theTCREQ; + Uint32* src = ((TcKeyReq*)tSignal->getDataPtrSend())->keyInfo; + if(tSignal->readSignalNumber() == GSN_SCAN_TABREQ) + { + tSignal = tSignal->next(); + src = ((KeyInfo*)tSignal->getDataPtrSend())->keyData; + } + + for(unsigned i = m_accessTable->m_columns.size(); i>0; cols++, i--) + { + if (!(* cols)->getPrimaryKey()) + continue; + + NdbColumnImpl* tAttrInfo = * cols; + Uint32 sizeInBytes = tAttrInfo->m_attrSize * tAttrInfo->m_arraySize; + Uint32 currLen = (sizeInBytes + 3) >> 2; + if (tAttrInfo->getDistributionKey()) + { + while (currLen >= chunk) + { + memcpy(dst, src, 4*chunk); + dst += chunk; + tSignal = tSignal->next(); + src = ((KeyInfo*)tSignal->getDataPtrSend())->keyData; + currLen -= chunk; + chunk = KeyInfo::DataLength; + } + + memcpy(dst, src, 4*currLen); + dst += currLen; + src += currLen; + chunk -= currLen; + } + else + { + while (currLen >= chunk) + { + tSignal = tSignal->next(); + src = ((KeyInfo*)tSignal->getDataPtrSend())->keyData; + currLen -= chunk; + chunk = KeyInfo::DataLength; + } + + src += currLen; + chunk -= currLen; + } + } + setPartitionHash(tmp, (Uint32*)tmp - dst); + } + return 0; +} + +void +NdbOperation::setPartitionHash(Uint32 value) +{ + union { + Uint32 tmp32; + Uint64 tmp64; + }; + + tmp32 = value; + setPartitionHash(&tmp64, 1); +} + +void +NdbOperation::setPartitionHash(const Uint64* value, Uint32 len) +{ + Uint32 buf[4]; + md5_hash(buf, value, len); + setPartitionId(buf[1]); +} + +void +NdbOperation::setPartitionId(Uint32 value) +{ + theDistributionKey = value; + theDistrKeyIndicator_ = 1; +} + +Uint32 +NdbOperation::getPartitionId() const +{ + return theDistributionKey; +} diff --git a/ndb/src/ndbapi/NdbScanOperation.cpp b/ndb/src/ndbapi/NdbScanOperation.cpp index fd63ce96f25..3296aadae7c 100644 --- a/ndb/src/ndbapi/NdbScanOperation.cpp +++ b/ndb/src/ndbapi/NdbScanOperation.cpp @@ -118,7 +118,7 @@ NdbScanOperation::init(const NdbTableImpl* tab, NdbConnection* myConnection) theStatus = GetValue; theOperationType = OpenScanRequest; theNdbCon->theMagicNumber = 0xFE11DF; - + theNoOfTupKeyLeft = tab->m_noOfDistributionKeys; return 0; } @@ -199,6 +199,7 @@ NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm, return 0; }//if + theSCAN_TABREQ->setSignal(GSN_SCAN_TABREQ); ScanTabReq * req = CAST_PTR(ScanTabReq, theSCAN_TABREQ->getDataPtrSend()); req->apiConnectPtr = theNdbCon->theTCConPtr; req->tableId = m_accessTable->m_tableId; @@ -219,16 +220,17 @@ NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm, req->transId1 = (Uint32) transId; req->transId2 = (Uint32) (transId >> 32); - NdbApiSignal* tSignal = - theFirstKEYINFO; - - theFirstKEYINFO = (tSignal ? tSignal : tSignal = theNdb->getSignal()); + NdbApiSignal* tSignal = theSCAN_TABREQ->next(); + if(!tSignal) + { + theSCAN_TABREQ->next(tSignal = theNdb->getSignal()); + } theLastKEYINFO = tSignal; tSignal->setSignal(GSN_KEYINFO); theKEYINFOptr = ((KeyInfo*)tSignal->getDataPtrSend())->keyData; theTotalNrOfKeyWordInSignal= 0; - + getFirstATTRINFOScan(); return getResultSet(); } @@ -348,60 +350,6 @@ NdbScanOperation::getFirstATTRINFOScan() #define FAKE_PTR 2 #define API_PTR 3 - -/* - * After setBound() are done, move the accumulated ATTRINFO signals to - * a separate list. Then continue with normal scan. - */ -#if 0 -int -NdbIndexScanOperation::saveBoundATTRINFO() -{ - theCurrentATTRINFO->setLength(theAI_LenInCurrAI); - theBoundATTRINFO = theFirstATTRINFO; - theTotalBoundAI_Len = theTotalCurrAI_Len; - theTotalCurrAI_Len = 5; - theBoundATTRINFO->setData(theTotalBoundAI_Len, 4); - theBoundATTRINFO->setData(0, 5); - theBoundATTRINFO->setData(0, 6); - theBoundATTRINFO->setData(0, 7); - theBoundATTRINFO->setData(0, 8); - theStatus = GetValue; - - int res = getFirstATTRINFOScan(); - - /** - * Define each key with getValue (if ordered) - * unless the one's with EqBound - */ - if(!res && m_ordered){ - - /** - * If setBound EQ - */ - Uint32 i = 0; - while(theTupleKeyDefined[i][0] == SETBOUND_EQ) - i++; - - - Uint32 cnt = m_accessTable->getNoOfColumns() - 1; - m_sort_columns = cnt - i; - for(; i<cnt; i++){ - const NdbColumnImpl* key = m_accessTable->m_index->m_columns[i]; - const NdbColumnImpl* col = m_currentTable->getColumn(key->m_keyInfoPos); - NdbRecAttr* tmp = NdbScanOperation::getValue_impl(col, (char*)-1); - UintPtr newVal = UintPtr(tmp); - theTupleKeyDefined[i][0] = FAKE_PTR; - theTupleKeyDefined[i][1] = (newVal & 0xFFFFFFFF); -#if (SIZEOF_CHARP == 8) - theTupleKeyDefined[i][2] = (newVal >> 32); -#endif - } - } - return res; -} -#endif - #define WAITFOR_SCAN_TIMEOUT 120000 int @@ -683,12 +631,14 @@ void NdbScanOperation::release() for(Uint32 i = 0; i<m_allocated_receivers; i++){ m_receivers[i]->release(); } + + NdbOperation::release(); + if(theSCAN_TABREQ) { theNdb->releaseSignal(theSCAN_TABREQ); theSCAN_TABREQ = 0; } - NdbOperation::release(); } /*************************************************************************** @@ -783,10 +733,6 @@ NdbScanOperation::doSendScan(int aProcessorId) assert(theSCAN_TABREQ != NULL); tSignal = theSCAN_TABREQ; - if (tSignal->setSignal(GSN_SCAN_TABREQ) == -1) { - setErrorCode(4001); - return -1; - } Uint32 tupKeyLen = theTupKeyLen; Uint32 len = theTotalNrOfKeyWordInSignal; @@ -798,6 +744,10 @@ NdbScanOperation::doSendScan(int aProcessorId) // we created the ATTRINFO signals after the SCAN_TABREQ signal. ScanTabReq * const req = CAST_PTR(ScanTabReq, tSignal->getDataPtrSend()); req->attrLenKeyLen = (tupKeyLen << 16) | theTotalCurrAI_Len; + Uint32 tmp = req->requestInfo; + ScanTabReq::setDistributionKeyFlag(tmp, theDistrKeyIndicator_); + req->distributionKey = theDistributionKey; + tSignal->setLength(ScanTabReq::StaticLength + theDistrKeyIndicator_); TransporterFacade *tp = TransporterFacade::instance(); LinearSectionPtr ptr[3]; @@ -814,8 +764,8 @@ NdbScanOperation::doSendScan(int aProcessorId) tSignal = theLastKEYINFO; tSignal->setLength(KeyInfo::HeaderLength + theTotalNrOfKeyWordInSignal); - assert(theFirstKEYINFO != NULL); - tSignal = theFirstKEYINFO; + assert(theSCAN_TABREQ->next() != NULL); + tSignal = theSCAN_TABREQ->next(); NdbApiSignal* last; do { @@ -942,7 +892,7 @@ NdbScanOperation::takeOverScanOp(OperationType opType, NdbConnection* pTrans){ if(i < len){ NdbApiSignal* tSignal = theNdb->getSignal(); - newOp->theFirstKEYINFO = tSignal; + newOp->theTCREQ->next(tSignal); Uint32 left = len - i; while(tSignal && left > KeyInfo::DataLength){ @@ -1077,37 +1027,46 @@ NdbIndexScanOperation::setBound(const NdbColumnImpl* tAttrInfo, Uint32 currLen = theTotalNrOfKeyWordInSignal; Uint32 remaining = KeyInfo::DataLength - currLen; Uint32 sizeInBytes = tAttrInfo->m_attrSize * tAttrInfo->m_arraySize; - + bool tDistrKey = tAttrInfo->m_distributionKey; + // normalize char bound CHARSET_INFO* cs = tAttrInfo->m_cs; - Uint32 xfrmData[2000]; + Uint64 xfrmData[1001]; if (cs != NULL && aValue != NULL) { // current limitation: strxfrm does not increase length assert(cs->strxfrm_multiply == 1); unsigned n = - (*cs->coll->strnxfrm)(cs, + (*cs->coll->strnxfrm)(cs, (uchar*)xfrmData, sizeof(xfrmData), (const uchar*)aValue, sizeInBytes); + + ((Uint32*)xfrmData)[sizeInBytes >> 2] = 0; + while (n < sizeInBytes) ((uchar*)xfrmData)[n++] = 0x20; + + if(sizeInBytes & 3) + sizeInBytes += (4 - sizeInBytes & 3); + aValue = (char*)xfrmData; } + len = aValue != NULL ? sizeInBytes : 0; if (len != sizeInBytes && (len != 0)) { setErrorCodeAbort(4209); return -1; } // insert attribute header - len = aValue != NULL ? sizeInBytes : 0; Uint32 tIndexAttrId = tAttrInfo->m_attrId; Uint32 sizeInWords = (len + 3) / 4; AttributeHeader ah(tIndexAttrId, sizeInWords); const Uint32 ahValue = ah.m_value; - const bool aligned = (UintPtr(aValue) & 3) == 0; + const Uint32 align = (UintPtr(aValue) & 7); + const bool aligned = (type == BoundEQ) ? (align & 3) == 0 : (align == 0); const bool nobytes = (len & 0x3) == 0; const Uint32 totalLen = 2 + sizeInWords; Uint32 tupKeyLen = theTupKeyLen; - if(remaining > totalLen && aligned && nobytes){ + if(remaining > totalLen && aligned && nobytes){ Uint32 * dst = theKEYINFOptr + currLen; * dst ++ = type; * dst ++ = ahValue; @@ -1115,12 +1074,11 @@ NdbIndexScanOperation::setBound(const NdbColumnImpl* tAttrInfo, theTotalNrOfKeyWordInSignal = currLen + totalLen; } else { if(!aligned || !nobytes){ - Uint32 tempData[2002]; + Uint32 *tempData = (Uint32*)xfrmData; tempData[0] = type; tempData[1] = ahValue; + tempData[sizeInBytes >> 2] = 0; memcpy(tempData+2, aValue, len); - while ((len & 0x3) != 0) - ((char*)&tempData[2])[len++] = 0; insertBOUNDS(tempData, 2+sizeInWords); } else { Uint32 buf[2] = { type, ahValue }; @@ -1139,11 +1097,11 @@ NdbIndexScanOperation::setBound(const NdbColumnImpl* tAttrInfo, * so it's safe to use [tIndexAttrId] * (instead of looping as is NdbOperation::equal_impl) */ - if(type == BoundEQ && !theTupleKeyDefined[tIndexAttrId][0]){ - theNoOfTupKeyDefined++; - theTupleKeyDefined[tIndexAttrId][0] = SETBOUND_EQ; + if(type == BoundEQ && tDistrKey) + { + theNoOfTupKeyLeft--; + return handle_distribution_key((Uint64*)aValue, sizeInWords); } - return 0; } else { setErrorCodeAbort(4228); // XXX wrong code @@ -1559,10 +1517,12 @@ NdbIndexScanOperation::reset_bounds(){ theError.code = 0; reset_receivers(theParallelism, m_ordered); - theLastKEYINFO = theFirstKEYINFO; - theKEYINFOptr = ((KeyInfo*)theFirstKEYINFO->getDataPtrSend())->keyData; + theLastKEYINFO = theSCAN_TABREQ->next(); + theKEYINFOptr = ((KeyInfo*)theLastKEYINFO->getDataPtrSend())->keyData; theTupKeyLen = 0; theTotalNrOfKeyWordInSignal = 0; + theNoOfTupKeyLeft = m_accessTable->m_noOfDistributionKeys; + theDistrKeyIndicator_ = 0; m_transConnection ->remove_list((NdbOperation*&)m_transConnection->m_firstExecutedScanOp, this); |