summaryrefslogtreecommitdiff
path: root/ndb/src/ndbapi/NdbOperationSearch.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'ndb/src/ndbapi/NdbOperationSearch.cpp')
-rw-r--r--ndb/src/ndbapi/NdbOperationSearch.cpp277
1 files changed, 169 insertions, 108 deletions
diff --git a/ndb/src/ndbapi/NdbOperationSearch.cpp b/ndb/src/ndbapi/NdbOperationSearch.cpp
index 69b4e803acd..4f2d26f4501 100644
--- a/ndb/src/ndbapi/NdbOperationSearch.cpp
+++ b/ndb/src/ndbapi/NdbOperationSearch.cpp
@@ -38,7 +38,9 @@ Adjust: 971022 UABMNST First version.
#include <AttributeHeader.hpp>
#include <signaldata/TcKeyReq.hpp>
+#include <signaldata/KeyInfo.hpp>
#include "NdbDictionaryImpl.hpp"
+#include <md5_hash.hpp>
/******************************************************************************
CondIdType equal(const char* anAttrName, char* aValue, Uint32 aVarKeylen);
@@ -60,8 +62,8 @@ NdbOperation::equal_impl(const NdbColumnImpl* tAttrInfo,
Uint32 tData;
Uint32 tKeyInfoPosition;
const char* aValue = aValuePassed;
- Uint32 xfrmData[1024];
- Uint32 tempData[1024];
+ Uint64 xfrmData[512];
+ Uint64 tempData[512];
if ((theStatus == OperationDefined) &&
(aValue != NULL) &&
@@ -76,6 +78,8 @@ NdbOperation::equal_impl(const NdbColumnImpl* tAttrInfo,
*****************************************************************************/
tAttrId = tAttrInfo->m_attrId;
tKeyInfoPosition = tAttrInfo->m_keyInfoPos;
+ bool tDistrKey = tAttrInfo->m_distributionKey;
+
Uint32 i = 0;
if (tAttrInfo->m_pk) {
Uint32 tKeyDefined = theTupleKeyDefined[0][2];
@@ -119,30 +123,30 @@ NdbOperation::equal_impl(const NdbColumnImpl* tAttrInfo,
Uint32 sizeInBytes = tAttrInfo->m_attrSize * tAttrInfo->m_arraySize;
{
- /***************************************************************************
- * Check if the pointer of the value passed is aligned on a 4 byte
- * boundary. If so only assign the pointer to the internal variable
- * aValue. If it is not aligned then we start by copying the value to
- * tempData and use this as aValue instead.
- *****************************************************************************/
+ /************************************************************************
+ * Check if the pointer of the value passed is aligned on a 4 byte
+ * boundary. If so only assign the pointer to the internal variable
+ * aValue. If it is not aligned then we start by copying the value to
+ * tempData and use this as aValue instead.
+ ***********************************************************************/
const int attributeSize = sizeInBytes;
const int slack = sizeInBytes & 3;
-
- if ((((UintPtr)aValue & 3) != 0) || (slack != 0)){
+ const int align = UintPtr(aValue) & 7;
+
+ if (((align & 3) != 0) || (slack != 0) || (tDistrKey && (align != 0)))
+ {
+ ((Uint32*)tempData)[attributeSize >> 2] = 0;
memcpy(&tempData[0], aValue, attributeSize);
aValue = (char*)&tempData[0];
- if(slack != 0) {
- char * tmp = (char*)&tempData[0];
- memset(&tmp[attributeSize], 0, (4 - slack));
- }//if
}//if
}
const char* aValueToWrite = aValue;
-
+
CHARSET_INFO* cs = tAttrInfo->m_cs;
if (cs != 0) {
// current limitation: strxfrm does not increase length
assert(cs->strxfrm_multiply == 1);
+ ((Uint32*)xfrmData)[sizeInBytes >> 2] = 0;
unsigned n =
(*cs->coll->strnxfrm)(cs,
(uchar*)xfrmData, sizeof(xfrmData),
@@ -152,9 +156,7 @@ NdbOperation::equal_impl(const NdbColumnImpl* tAttrInfo,
aValue = (char*)xfrmData;
}
- Uint32 bitsInLastWord = 8 * (sizeInBytes & 3) ;
Uint32 totalSizeInWords = (sizeInBytes + 3)/4; // Inc. bits in last word
- Uint32 sizeInWords = sizeInBytes / 4; // Exc. bits in last word
if (true){ //tArraySize != 0) {
Uint32 tTupKeyLen = theTupKeyLen;
@@ -190,84 +192,49 @@ NdbOperation::equal_impl(const NdbColumnImpl* tAttrInfo,
}//if
#endif
- int tDistrKey = tAttrInfo->m_distributionKey;
- int tDistrGroup = tAttrInfo->m_distributionGroup;
OperationType tOpType = theOperationType;
- if ((tDistrKey != 1) && (tDistrGroup != 1)) {
- ;
- } else if (tDistrKey == 1) {
- theDistrKeySize += totalSizeInWords;
- theDistrKeyIndicator = 1;
- } else {
- Uint32 TsizeInBytes = sizeInBytes;
- Uint32 TbyteOrderFix = 0;
- char* TcharByteOrderFix = (char*)&TbyteOrderFix;
- if (tAttrInfo->m_distributionGroupBits == 8) {
- char tFirstChar = aValue[TsizeInBytes - 2];
- char tSecondChar = aValue[TsizeInBytes - 2];
- TcharByteOrderFix[0] = tFirstChar;
- TcharByteOrderFix[1] = tSecondChar;
- TcharByteOrderFix[2] = 0x30;
- TcharByteOrderFix[3] = 0x30;
- theDistrGroupType = 0;
- } else {
- TbyteOrderFix = ((aValue[TsizeInBytes - 2] - 0x30) * 10)
- + (aValue[TsizeInBytes - 1] - 0x30);
- theDistrGroupType = 1;
- }//if
- theDistributionGroup = TbyteOrderFix;
- theDistrGroupIndicator = 1;
- }//if
- /******************************************************************************
+ /**************************************************************************
* If the operation is an insert request and the attribute is stored then
* we also set the value in the stored part through putting the
* information in the ATTRINFO signals.
- *****************************************************************************/
+ *************************************************************************/
if ((tOpType == InsertRequest) ||
(tOpType == WriteRequest)) {
- if (!tAttrInfo->m_indexOnly){
- // invalid data can crash kernel
- if (cs != NULL &&
- (*cs->cset->well_formed_len)(cs,
- aValueToWrite,
- aValueToWrite + sizeInBytes,
- sizeInBytes) != sizeInBytes)
- goto equal_error4;
- Uint32 ahValue;
- const Uint32 sz = totalSizeInWords;
- AttributeHeader::init(&ahValue, tAttrId, sz);
- insertATTRINFO( ahValue );
- insertATTRINFOloop((Uint32*)aValueToWrite, sizeInWords);
- if (bitsInLastWord != 0) {
- tData = *(Uint32*)(aValueToWrite + (sizeInWords << 2));
- tData = convertEndian(tData);
- tData = tData & ((1 << bitsInLastWord) - 1);
- tData = convertEndian(tData);
- insertATTRINFO( tData );
- }//if
- }//if
+ // invalid data can crash kernel
+ if (cs != NULL &&
+ (*cs->cset->well_formed_len)(cs,
+ aValueToWrite,
+ aValueToWrite + sizeInBytes,
+ sizeInBytes) != sizeInBytes)
+ goto equal_error4;
+ Uint32 ahValue;
+ const Uint32 sz = totalSizeInWords;
+ AttributeHeader::init(&ahValue, tAttrId, sz);
+ insertATTRINFO( ahValue );
+ insertATTRINFOloop((Uint32*)aValueToWrite, sz);
}//if
- /***************************************************************************
+ /**************************************************************************
* Store the Key information in the TCKEYREQ and KEYINFO signals.
- **************************************************************************/
- if (insertKEYINFO(aValue, tKeyInfoPosition,
- totalSizeInWords, bitsInLastWord) != -1) {
- /*************************************************************************
+ *************************************************************************/
+ if (insertKEYINFO(aValue, tKeyInfoPosition, totalSizeInWords) != -1) {
+ /************************************************************************
* Add one to number of tuple key attributes defined.
* If all have been defined then set the operation state to indicate
* that tuple key is defined.
* Thereby no more search conditions are allowed in this version.
- ************************************************************************/
- Uint32 tNoKeysDef = theNoOfTupKeyDefined;
+ ***********************************************************************/
+ Uint32 tNoKeysDef = theNoOfTupKeyLeft - 1;
Uint32 tErrorLine = theErrorLine;
- int tNoTableKeys = m_currentTable->m_noOfKeys;
unsigned char tInterpretInd = theInterpretIndicator;
- tNoKeysDef++;
- theNoOfTupKeyDefined = tNoKeysDef;
+ theNoOfTupKeyLeft = tNoKeysDef;
tErrorLine++;
theErrorLine = tErrorLine;
- if (int(tNoKeysDef) == tNoTableKeys) {
+
+ if(tDistrKey)
+ handle_distribution_key((Uint64*)aValue, totalSizeInWords);
+
+ if (tNoKeysDef == 0) {
if (tOpType == UpdateRequest) {
if (tInterpretInd == 1) {
theStatus = GetValue;
@@ -297,13 +264,14 @@ NdbOperation::equal_impl(const NdbColumnImpl* tAttrInfo,
setErrorCodeAbort(4005);
return -1;
}//if
+ return 0;
}//if
- return 0;
} else {
return -1;
}//if
+ return 0;
}
-
+
if (aValue == NULL) {
// NULL value in primary key
setErrorCodeAbort(4505);
@@ -321,6 +289,8 @@ NdbOperation::equal_impl(const NdbColumnImpl* tAttrInfo,
setErrorCodeAbort(4225);
return -1;
}//if
+
+ ndbout_c("theStatus: %d", theStatus);
// If we come here, set a general errorcode
// and exit
@@ -387,8 +357,7 @@ NdbOperation::setTupleId()
int
NdbOperation::insertKEYINFO(const char* aValue,
register Uint32 aStartPosition,
- register Uint32 anAttrSizeInWords,
- register Uint32 anAttrBitsInLastWord)
+ register Uint32 anAttrSizeInWords)
{
NdbApiSignal* tSignal;
NdbApiSignal* tCurrentKEYINFO;
@@ -408,7 +377,7 @@ NdbOperation::insertKEYINFO(const char* aValue,
*****************************************************************************/
tEndPos = aStartPosition + anAttrSizeInWords - 1;
- if ((tEndPos < 9) && (anAttrBitsInLastWord == 0)) {
+ if ((tEndPos < 9)) {
register Uint32 tkeyData = *(Uint32*)aValue;
//TcKeyReq* tcKeyReq = CAST_PTR(TcKeyReq, tTCREQ->getDataPtrSend());
register Uint32* tDataPtr = (Uint32*)aValue;
@@ -449,10 +418,11 @@ NdbOperation::insertKEYINFO(const char* aValue,
setErrorCodeAbort(4001);
return -1;
}
- if (theFirstKEYINFO != NULL)
+ if (theTCREQ->next() != NULL)
theLastKEYINFO->next(tSignal);
else
- theFirstKEYINFO = tSignal;
+ theTCREQ->next(tSignal);
+
theLastKEYINFO = tSignal;
theLastKEYINFO->next(NULL);
theTotalNrOfKeyWordInSignal += 20;
@@ -465,7 +435,7 @@ NdbOperation::insertKEYINFO(const char* aValue,
* this is the first word in a KEYINFO signal. *
*****************************************************************************/
tPosition = aStartPosition;
- tCurrentKEYINFO = theFirstKEYINFO;
+ tCurrentKEYINFO = theTCREQ->next();
/*****************************************************************************
* Start by filling up Key information in the 8 words allocated in the *
@@ -518,39 +488,20 @@ NdbOperation::insertKEYINFO(const char* aValue,
} while (1);
LastWordLabel:
-
-/*****************************************************************************
- * There could be a last word that only contains partial data. This word*
- * will contain zeroes in the rest of the bits since the index expects *
- * a certain number of words and do not care for parts of words. *
- *****************************************************************************/
- if (anAttrBitsInLastWord != 0) {
- tData = *(Uint32*)(aValue + (anAttrSizeInWords - 1) * 4);
- tData = convertEndian(tData);
- tData = tData & ((1 << anAttrBitsInLastWord) - 1);
- tData = convertEndian(tData);
- if (tPosition > 8) {
- tCurrentKEYINFO->setData(tData, signalCounter);
- signalCounter++;
- } else {
- theTCREQ->setData(tData, (12 + tPosition));
- }//if
- }//if
-
return 0;
}
int
NdbOperation::getKeyFromTCREQ(Uint32* data, unsigned size)
{
- assert(m_accessTable != 0 && m_accessTable->m_sizeOfKeysInWords != 0);
- assert(m_accessTable->m_sizeOfKeysInWords == size);
+ assert(m_accessTable != 0 && m_accessTable->m_keyLenInWords != 0);
+ assert(m_accessTable->m_keyLenInWords == size);
unsigned pos = 0;
while (pos < 8 && pos < size) {
data[pos] = theKEYINFOptr[pos];
pos++;
}
- NdbApiSignal* tSignal = theFirstKEYINFO;
+ NdbApiSignal* tSignal = theTCREQ->next();
unsigned n = 0;
while (pos < size) {
if (n == 20) {
@@ -561,3 +512,113 @@ NdbOperation::getKeyFromTCREQ(Uint32* data, unsigned size)
}
return 0;
}
+
+int
+NdbOperation::handle_distribution_key(const Uint64* value, Uint32 len)
+{
+ if(theDistrKeyIndicator_ == 1 ||
+ (theNoOfTupKeyLeft > 0 && m_accessTable->m_noOfDistributionKeys > 1))
+ {
+ return 0;
+ }
+
+ if(m_accessTable->m_noOfDistributionKeys == 1)
+ {
+ setPartitionHash(value, len);
+ }
+ else
+ {
+ /**
+ * Copy distribution key to linear memory
+ */
+ NdbColumnImpl* const * cols = m_accessTable->m_columns.getBase();
+ Uint32 len = 0;
+ Uint64 tmp[1000];
+
+ Uint32 chunk = 8;
+ Uint32* dst = (Uint32*)tmp;
+ NdbApiSignal* tSignal = theTCREQ;
+ Uint32* src = ((TcKeyReq*)tSignal->getDataPtrSend())->keyInfo;
+ if(tSignal->readSignalNumber() == GSN_SCAN_TABREQ)
+ {
+ tSignal = tSignal->next();
+ src = ((KeyInfo*)tSignal->getDataPtrSend())->keyData;
+ chunk = KeyInfo::DataLength;
+ }
+
+ for(unsigned i = m_accessTable->m_columns.size(); i>0; cols++, i--)
+ {
+ if (!(* cols)->getPrimaryKey())
+ continue;
+
+ NdbColumnImpl* tAttrInfo = * cols;
+ Uint32 sizeInBytes = tAttrInfo->m_attrSize * tAttrInfo->m_arraySize;
+ Uint32 currLen = (sizeInBytes + 3) >> 2;
+ if (tAttrInfo->getDistributionKey())
+ {
+ while (currLen >= chunk)
+ {
+ memcpy(dst, src, 4*chunk);
+ dst += chunk;
+ tSignal = tSignal->next();
+ src = ((KeyInfo*)tSignal->getDataPtrSend())->keyData;
+ currLen -= chunk;
+ chunk = KeyInfo::DataLength;
+ }
+
+ memcpy(dst, src, 4*currLen);
+ dst += currLen;
+ src += currLen;
+ chunk -= currLen;
+ }
+ else
+ {
+ while (currLen >= chunk)
+ {
+ tSignal = tSignal->next();
+ src = ((KeyInfo*)tSignal->getDataPtrSend())->keyData;
+ currLen -= chunk;
+ chunk = KeyInfo::DataLength;
+ }
+
+ src += currLen;
+ chunk -= currLen;
+ }
+ }
+ setPartitionHash(tmp, (Uint32*)tmp - dst);
+ }
+ return 0;
+}
+
+void
+NdbOperation::setPartitionHash(Uint32 value)
+{
+ union {
+ Uint32 tmp32;
+ Uint64 tmp64;
+ };
+
+ tmp32 = value;
+ setPartitionHash(&tmp64, 1);
+}
+
+void
+NdbOperation::setPartitionHash(const Uint64* value, Uint32 len)
+{
+ Uint32 buf[4];
+ md5_hash(buf, value, len);
+ setPartitionId(buf[1]);
+}
+
+void
+NdbOperation::setPartitionId(Uint32 value)
+{
+ theDistributionKey = value;
+ theDistrKeyIndicator_ = 1;
+}
+
+Uint32
+NdbOperation::getPartitionId() const
+{
+ return theDistributionKey;
+}