summaryrefslogtreecommitdiff
path: root/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp')
-rw-r--r--ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp390
1 files changed, 287 insertions, 103 deletions
diff --git a/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp b/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp
index 7b642f90a17..8a55777ac05 100644
--- a/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp
+++ b/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp
@@ -40,7 +40,11 @@ Dbtup::setUpQueryRoutines(Tablerec* const regTabPtr)
if ((AttributeDescriptor::getArrayType(attrDescriptor) == ZNON_ARRAY) ||
(AttributeDescriptor::getArrayType(attrDescriptor) == ZFIXED_ARRAY)) {
if (!AttributeDescriptor::getNullable(attrDescriptor)) {
- if (AttributeDescriptor::getSizeInWords(attrDescriptor) == 1) {
+ if (AttributeDescriptor::getSize(attrDescriptor) == 0){
+ ljam();
+ regTabPtr->readFunctionArray[i] = &Dbtup::readBitsNotNULL;
+ regTabPtr->updateFunctionArray[i] = &Dbtup::updateBitsNotNULL;
+ } else if (AttributeDescriptor::getSizeInWords(attrDescriptor) == 1){
ljam();
regTabPtr->readFunctionArray[i] = &Dbtup::readFixedSizeTHOneWordNotNULL;
regTabPtr->updateFunctionArray[i] = &Dbtup::updateFixedSizeTHOneWordNotNULL;
@@ -55,13 +59,18 @@ Dbtup::setUpQueryRoutines(Tablerec* const regTabPtr)
} else {
ndbrequire(false);
}//if
- // replace read function of char attribute
+ // replace functions for char attribute
if (AttributeOffset::getCharsetFlag(attrOffset)) {
ljam();
- regTabPtr->readFunctionArray[i] = &Dbtup::readCharNotNULL;
+ regTabPtr->readFunctionArray[i] = &Dbtup::readFixedSizeTHManyWordNotNULL;
+ regTabPtr->updateFunctionArray[i] = &Dbtup::updateFixedSizeTHManyWordNotNULL;
}
} else {
- if (AttributeDescriptor::getSizeInWords(attrDescriptor) == 1) {
+ if (AttributeDescriptor::getSize(attrDescriptor) == 0){
+ ljam();
+ regTabPtr->readFunctionArray[i] = &Dbtup::readBitsNULLable;
+ regTabPtr->updateFunctionArray[i] = &Dbtup::updateBitsNULLable;
+ } else if (AttributeDescriptor::getSizeInWords(attrDescriptor) == 1){
ljam();
regTabPtr->readFunctionArray[i] = &Dbtup::readFixedSizeTHOneWordNULLable;
regTabPtr->updateFunctionArray[i] = &Dbtup::updateFixedSizeTHManyWordNULLable;
@@ -78,10 +87,11 @@ Dbtup::setUpQueryRoutines(Tablerec* const regTabPtr)
regTabPtr->readFunctionArray[i] = &Dbtup::readFixedSizeTHZeroWordNULLable;
regTabPtr->updateFunctionArray[i] = &Dbtup::updateFixedSizeTHManyWordNULLable;
}//if
- // replace read function of char attribute
+ // replace functions for char attribute
if (AttributeOffset::getCharsetFlag(attrOffset)) {
ljam();
- regTabPtr->readFunctionArray[i] = &Dbtup::readCharNULLable;
+ regTabPtr->readFunctionArray[i] = &Dbtup::readFixedSizeTHManyWordNULLable;
+ regTabPtr->updateFunctionArray[i] = &Dbtup::updateFixedSizeTHManyWordNULLable;
}
}//if
} else if (AttributeDescriptor::getArrayType(attrDescriptor) == ZVAR_ARRAY) {
@@ -329,25 +339,68 @@ Dbtup::readFixedSizeTHManyWordNotNULL(Uint32* outBuffer,
Uint32 attrDes2)
{
Uint32 indexBuf = tOutBufIndex;
+ Uint32 charsetFlag = AttributeOffset::getCharsetFlag(attrDes2);
Uint32 readOffset = AttributeOffset::getOffset(attrDes2);
Uint32 attrNoOfWords = AttributeDescriptor::getSizeInWords(attrDescriptor);
- Uint32 newIndexBuf = indexBuf + attrNoOfWords;
Uint32 maxRead = tMaxRead;
ndbrequire((readOffset + attrNoOfWords - 1) < tCheckOffset);
- if (newIndexBuf <= maxRead) {
- ljam();
- ahOut->setDataSize(attrNoOfWords);
- MEMCOPY_NO_WORDS(&outBuffer[indexBuf],
- &tTupleHeader[readOffset],
- attrNoOfWords);
- tOutBufIndex = newIndexBuf;
- return true;
+ if (! charsetFlag || ! tXfrmFlag) {
+ Uint32 newIndexBuf = indexBuf + attrNoOfWords;
+ if (newIndexBuf <= maxRead) {
+ ljam();
+ ahOut->setDataSize(attrNoOfWords);
+ MEMCOPY_NO_WORDS(&outBuffer[indexBuf],
+ &tTupleHeader[readOffset],
+ attrNoOfWords);
+ tOutBufIndex = newIndexBuf;
+ return true;
+ } else {
+ ljam();
+ terrorCode = ZTRY_TO_READ_TOO_MUCH_ERROR;
+ }//if
} else {
ljam();
- terrorCode = ZTRY_TO_READ_TOO_MUCH_ERROR;
- return false;
- }//if
+ Tablerec* regTabPtr = tabptr.p;
+ Uint32 srcBytes = AttributeDescriptor::getSizeInBytes(attrDescriptor);
+ uchar* dstPtr = (uchar*)&outBuffer[indexBuf];
+ const uchar* srcPtr = (uchar*)&tTupleHeader[readOffset];
+ Uint32 i = AttributeOffset::getCharsetPos(attrDes2);
+ ndbrequire(i < regTabPtr->noOfCharsets);
+ CHARSET_INFO* cs = regTabPtr->charsetArray[i];
+ Uint32 typeId = AttributeDescriptor::getType(attrDescriptor);
+ Uint32 lb, len;
+ bool ok = NdbSqlUtil::get_var_length(typeId, srcPtr, srcBytes, lb, len);
+ if (ok) {
+ Uint32 xmul = cs->strxfrm_multiply;
+ if (xmul == 0)
+ xmul = 1;
+ // see comment in DbtcMain.cpp
+ Uint32 dstLen = xmul * (srcBytes - lb);
+ Uint32 maxIndexBuf = indexBuf + (dstLen >> 2);
+ if (maxIndexBuf <= maxRead) {
+ ljam();
+ int n = NdbSqlUtil::strnxfrm_bug7284(cs, dstPtr, dstLen, srcPtr + lb, len);
+ ndbrequire(n != -1);
+ while ((n & 3) != 0) {
+ dstPtr[n++] = 0;
+ }
+ Uint32 dstWords = (n >> 2);
+ ahOut->setDataSize(dstWords);
+ Uint32 newIndexBuf = indexBuf + dstWords;
+ ndbrequire(newIndexBuf <= maxRead);
+ tOutBufIndex = newIndexBuf;
+ return true;
+ } else {
+ ljam();
+ terrorCode = ZTRY_TO_READ_TOO_MUCH_ERROR;
+ }
+ } else {
+ ljam();
+ terrorCode = ZTUPLE_CORRUPTED_ERROR;
+ }
+ }
+ return false;
}//Dbtup::readFixedSizeTHManyWordNotNULL()
bool
@@ -394,7 +447,6 @@ Dbtup::readFixedSizeTHManyWordNULLable(Uint32* outBuffer,
Uint32 attrDescriptor,
Uint32 attrDes2)
{
-ljam();
if (!nullFlagCheck(attrDes2)) {
ljam();
return readFixedSizeTHManyWordNotNULL(outBuffer,
@@ -555,74 +607,6 @@ Dbtup::readDynSmallVarSize(Uint32* outBuffer,
return false;
}//Dbtup::readDynSmallVarSize()
-
-bool
-Dbtup::readCharNotNULL(Uint32* outBuffer,
- AttributeHeader* ahOut,
- Uint32 attrDescriptor,
- Uint32 attrDes2)
-{
- Uint32 indexBuf = tOutBufIndex;
- Uint32 readOffset = AttributeOffset::getOffset(attrDes2);
- Uint32 attrNoOfWords = AttributeDescriptor::getSizeInWords(attrDescriptor);
- Uint32 newIndexBuf = indexBuf + attrNoOfWords;
- Uint32 maxRead = tMaxRead;
-
- ndbrequire((readOffset + attrNoOfWords - 1) < tCheckOffset);
- if (newIndexBuf <= maxRead) {
- ljam();
- ahOut->setDataSize(attrNoOfWords);
- if (! tXfrmFlag) {
- MEMCOPY_NO_WORDS(&outBuffer[indexBuf],
- &tTupleHeader[readOffset],
- attrNoOfWords);
- } else {
- ljam();
- Tablerec* regTabPtr = tabptr.p;
- Uint32 i = AttributeOffset::getCharsetPos(attrDes2);
- ndbrequire(i < tabptr.p->noOfCharsets);
- // not const in MySQL
- CHARSET_INFO* cs = tabptr.p->charsetArray[i];
- // XXX should strip Uint32 null padding
- const unsigned nBytes = attrNoOfWords << 2;
- unsigned n =
- (*cs->coll->strnxfrm)(cs,
- (uchar*)&outBuffer[indexBuf],
- nBytes,
- (const uchar*)&tTupleHeader[readOffset],
- nBytes);
- // pad with ascii spaces
- while (n < nBytes)
- ((uchar*)&outBuffer[indexBuf])[n++] = 0x20;
- }
- tOutBufIndex = newIndexBuf;
- return true;
- } else {
- ljam();
- terrorCode = ZTRY_TO_READ_TOO_MUCH_ERROR;
- return false;
- }
-}
-
-bool
-Dbtup::readCharNULLable(Uint32* outBuffer,
- AttributeHeader* ahOut,
- Uint32 attrDescriptor,
- Uint32 attrDes2)
-{
- if (!nullFlagCheck(attrDes2)) {
- ljam();
- return readCharNotNULL(outBuffer,
- ahOut,
- attrDescriptor,
- attrDes2);
- } else {
- ljam();
- ahOut->setNULL();
- return true;
- }
-}
-
/* ---------------------------------------------------------------------- */
/* THIS ROUTINE IS USED TO UPDATE A NUMBER OF ATTRIBUTES. IT IS */
/* USED BY THE INSERT ROUTINE, THE UPDATE ROUTINE AND IT CAN BE */
@@ -701,22 +685,16 @@ Dbtup::checkUpdateOfPrimaryKey(Uint32* updateBuffer, Tablerec* const regTabPtr)
Uint32 attrDescriptor = tableDescriptor[attrDescriptorIndex].tabDescr;
Uint32 attributeOffset = tableDescriptor[attrDescriptorIndex + 1].tabDescr;
- Uint32 xfrmBuffer[1 + MAX_KEY_SIZE_IN_WORDS * 1]; // strxfrm_multiply == 1
+ Uint32 xfrmBuffer[1 + MAX_KEY_SIZE_IN_WORDS * MAX_XFRM_MULTIPLY];
Uint32 charsetFlag = AttributeOffset::getCharsetFlag(attributeOffset);
if (charsetFlag) {
- Uint32 csPos = AttributeOffset::getCharsetPos(attributeOffset);
- CHARSET_INFO* cs = regTabPtr->charsetArray[csPos];
- Uint32 sizeInBytes = AttributeDescriptor::getSizeInBytes(attrDescriptor);
- Uint32 sizeInWords = AttributeDescriptor::getSizeInWords(attrDescriptor);
- const uchar* srcPtr = (uchar*)&updateBuffer[1];
- uchar* dstPtr = (uchar*)&xfrmBuffer[1];
- Uint32 n =
- (*cs->coll->strnxfrm)(cs, dstPtr, sizeInBytes, srcPtr, sizeInBytes);
- // pad with blanks (unlikely) and zeroes to match NDB API behaviour
- while (n < sizeInBytes)
- dstPtr[n++] = 0x20;
- while (n < 4 * sizeInWords)
- dstPtr[n++] = 0;
+ Uint32 csIndex = AttributeOffset::getCharsetPos(attributeOffset);
+ CHARSET_INFO* cs = regTabPtr->charsetArray[csIndex];
+ Uint32 srcPos = 0;
+ Uint32 dstPos = 0;
+ xfrm_attr(attrDescriptor, cs, &updateBuffer[1], srcPos,
+ &xfrmBuffer[1], dstPos, MAX_KEY_SIZE_IN_WORDS * MAX_XFRM_MULTIPLY);
+ ahIn.setDataSize(dstPos);
xfrmBuffer[0] = ahIn.m_value;
updateBuffer = xfrmBuffer;
}
@@ -831,6 +809,7 @@ Dbtup::updateFixedSizeTHManyWordNotNULL(Uint32* inBuffer,
Uint32 indexBuf = tInBufIndex;
Uint32 inBufLen = tInBufLen;
Uint32 updateOffset = AttributeOffset::getOffset(attrDes2);
+ Uint32 charsetFlag = AttributeOffset::getCharsetFlag(attrDes2);
AttributeHeader ahIn(inBuffer[indexBuf]);
Uint32 nullIndicator = ahIn.isNULL();
Uint32 noOfWords = AttributeDescriptor::getSizeInWords(attrDescriptor);
@@ -840,6 +819,31 @@ Dbtup::updateFixedSizeTHManyWordNotNULL(Uint32* inBuffer,
if (newIndex <= inBufLen) {
if (!nullIndicator) {
ljam();
+ if (charsetFlag) {
+ ljam();
+ Tablerec* regTabPtr = tabptr.p;
+ Uint32 typeId = AttributeDescriptor::getType(attrDescriptor);
+ Uint32 bytes = AttributeDescriptor::getSizeInBytes(attrDescriptor);
+ Uint32 i = AttributeOffset::getCharsetPos(attrDes2);
+ ndbrequire(i < regTabPtr->noOfCharsets);
+ // not const in MySQL
+ CHARSET_INFO* cs = regTabPtr->charsetArray[i];
+ int not_used;
+ const char* ssrc = (const char*)&inBuffer[tInBufIndex + 1];
+ Uint32 lb, len;
+ if (! NdbSqlUtil::get_var_length(typeId, ssrc, bytes, lb, len)) {
+ ljam();
+ terrorCode = ZINVALID_CHAR_FORMAT;
+ return false;
+ }
+ // fast fix bug#7340
+ if (typeId != NDB_TYPE_TEXT &&
+ (*cs->cset->well_formed_len)(cs, ssrc + lb, ssrc + lb + len, ZNIL, &not_used) != len) {
+ ljam();
+ terrorCode = ZINVALID_CHAR_FORMAT;
+ return false;
+ }
+ }
tInBufIndex = newIndex;
MEMCOPY_NO_WORDS(&tTupleHeader[updateOffset],
&inBuffer[indexBuf + 1],
@@ -1011,18 +1015,198 @@ Dbtup::read_psuedo(Uint32 attrId, Uint32* outBuffer){
Signal * signal = (Signal*)&tmp;
switch(attrId){
case AttributeHeader::FRAGMENT:
- * outBuffer = operPtr.p->fragId;
+ * outBuffer = operPtr.p->fragId >> 1; // remove "hash" bit
+ return 1;
+ case AttributeHeader::FRAGMENT_MEMORY:
+ {
+ Uint64 tmp = 0;
+ tmp += fragptr.p->noOfPages;
+ {
+ /**
+ * Each fragment is split into 2...get #pages from other as well
+ */
+ Uint32 twin = fragptr.p->fragmentId ^ 1;
+ FragrecordPtr twinPtr;
+ getFragmentrec(twinPtr, twin, tabptr.p);
+ ndbrequire(twinPtr.p != 0);
+ tmp += twinPtr.p->noOfPages;
+ }
+ tmp *= 32768;
+ memcpy(outBuffer,&tmp,8);
+ }
+ return 2;
+ case AttributeHeader::ROW_SIZE:
+ * outBuffer = tabptr.p->tupheadsize << 2;
return 1;
case AttributeHeader::ROW_COUNT:
case AttributeHeader::COMMIT_COUNT:
signal->theData[0] = operPtr.p->userpointer;
signal->theData[1] = attrId;
-
+
EXECUTE_DIRECT(DBLQH, GSN_READ_PSUEDO_REQ, signal, 2);
outBuffer[0] = signal->theData[0];
outBuffer[1] = signal->theData[1];
return 2;
+ case AttributeHeader::RANGE_NO:
+ signal->theData[0] = operPtr.p->userpointer;
+ signal->theData[1] = attrId;
+
+ EXECUTE_DIRECT(DBLQH, GSN_READ_PSUEDO_REQ, signal, 2);
+ outBuffer[0] = signal->theData[0];
+ return 1;
default:
return 0;
}
}
+
+bool
+Dbtup::readBitsNotNULL(Uint32* outBuffer,
+ AttributeHeader* ahOut,
+ Uint32 attrDescriptor,
+ Uint32 attrDes2)
+{
+ Tablerec* const regTabPtr = tabptr.p;
+ Uint32 pos = AttributeOffset::getNullFlagPos(attrDes2);
+ Uint32 bitCount = AttributeDescriptor::getArraySize(attrDescriptor);
+ Uint32 indexBuf = tOutBufIndex;
+ Uint32 newIndexBuf = indexBuf + ((bitCount + 31) >> 5);
+ Uint32 maxRead = tMaxRead;
+
+ if (newIndexBuf <= maxRead) {
+ ljam();
+ ahOut->setDataSize((bitCount + 31) >> 5);
+ tOutBufIndex = newIndexBuf;
+
+ BitmaskImpl::getField(regTabPtr->tupNullWords,
+ tTupleHeader+regTabPtr->tupNullIndex,
+ pos,
+ bitCount,
+ outBuffer+indexBuf);
+
+ return true;
+ } else {
+ ljam();
+ terrorCode = ZTRY_TO_READ_TOO_MUCH_ERROR;
+ return false;
+ }//if
+}
+
+bool
+Dbtup::readBitsNULLable(Uint32* outBuffer,
+ AttributeHeader* ahOut,
+ Uint32 attrDescriptor,
+ Uint32 attrDes2)
+{
+ Tablerec* const regTabPtr = tabptr.p;
+ Uint32 pos = AttributeOffset::getNullFlagPos(attrDes2);
+ Uint32 bitCount = AttributeDescriptor::getArraySize(attrDescriptor);
+
+ Uint32 indexBuf = tOutBufIndex;
+ Uint32 newIndexBuf = indexBuf + ((bitCount + 31) >> 5);
+ Uint32 maxRead = tMaxRead;
+
+ if(BitmaskImpl::get(regTabPtr->tupNullWords,
+ tTupleHeader+regTabPtr->tupNullIndex,
+ pos))
+ {
+ ljam();
+ ahOut->setNULL();
+ return true;
+ }
+
+
+ if (newIndexBuf <= maxRead) {
+ ljam();
+ ahOut->setDataSize((bitCount + 31) >> 5);
+ tOutBufIndex = newIndexBuf;
+ BitmaskImpl::getField(regTabPtr->tupNullWords,
+ tTupleHeader+regTabPtr->tupNullIndex,
+ pos+1,
+ bitCount,
+ outBuffer+indexBuf);
+ return true;
+ } else {
+ ljam();
+ terrorCode = ZTRY_TO_READ_TOO_MUCH_ERROR;
+ return false;
+ }//if
+}
+
+bool
+Dbtup::updateBitsNotNULL(Uint32* inBuffer,
+ Uint32 attrDescriptor,
+ Uint32 attrDes2)
+{
+ Tablerec* const regTabPtr = tabptr.p;
+ Uint32 indexBuf = tInBufIndex;
+ Uint32 inBufLen = tInBufLen;
+ AttributeHeader ahIn(inBuffer[indexBuf]);
+ Uint32 nullIndicator = ahIn.isNULL();
+ Uint32 pos = AttributeOffset::getNullFlagPos(attrDes2);
+ Uint32 bitCount = AttributeDescriptor::getArraySize(attrDescriptor);
+ Uint32 newIndex = indexBuf + 1 + ((bitCount + 31) >> 5);
+
+ if (newIndex <= inBufLen) {
+ if (!nullIndicator) {
+ BitmaskImpl::setField(regTabPtr->tupNullWords,
+ tTupleHeader+regTabPtr->tupNullIndex,
+ pos,
+ bitCount,
+ inBuffer+indexBuf+1);
+ tInBufIndex = newIndex;
+ return true;
+ } else {
+ ljam();
+ terrorCode = ZNOT_NULL_ATTR;
+ return false;
+ }//if
+ } else {
+ ljam();
+ terrorCode = ZAI_INCONSISTENCY_ERROR;
+ return false;
+ }//if
+ return true;
+}
+
+bool
+Dbtup::updateBitsNULLable(Uint32* inBuffer,
+ Uint32 attrDescriptor,
+ Uint32 attrDes2)
+{
+ Tablerec* const regTabPtr = tabptr.p;
+ AttributeHeader ahIn(inBuffer[tInBufIndex]);
+ Uint32 indexBuf = tInBufIndex;
+ Uint32 nullIndicator = ahIn.isNULL();
+ Uint32 pos = AttributeOffset::getNullFlagPos(attrDes2);
+ Uint32 bitCount = AttributeDescriptor::getArraySize(attrDescriptor);
+
+ if (!nullIndicator) {
+ BitmaskImpl::clear(regTabPtr->tupNullWords,
+ tTupleHeader+regTabPtr->tupNullIndex,
+ pos);
+ BitmaskImpl::setField(regTabPtr->tupNullWords,
+ tTupleHeader+regTabPtr->tupNullIndex,
+ pos+1,
+ bitCount,
+ inBuffer+indexBuf+1);
+
+ Uint32 newIndex = indexBuf + 1 + ((bitCount + 31) >> 5);
+ tInBufIndex = newIndex;
+ return true;
+ } else {
+ Uint32 newIndex = tInBufIndex + 1;
+ if (newIndex <= tInBufLen) {
+ ljam();
+ BitmaskImpl::set(regTabPtr->tupNullWords,
+ tTupleHeader+regTabPtr->tupNullIndex,
+ pos);
+
+ tInBufIndex = newIndex;
+ return true;
+ } else {
+ ljam();
+ terrorCode = ZAI_INCONSISTENCY_ERROR;
+ return false;
+ }//if
+ }//if
+}