diff options
Diffstat (limited to 'ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp')
-rw-r--r-- | ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp | 390 |
1 files changed, 287 insertions, 103 deletions
diff --git a/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp b/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp index 7b642f90a17..8a55777ac05 100644 --- a/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp +++ b/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp @@ -40,7 +40,11 @@ Dbtup::setUpQueryRoutines(Tablerec* const regTabPtr) if ((AttributeDescriptor::getArrayType(attrDescriptor) == ZNON_ARRAY) || (AttributeDescriptor::getArrayType(attrDescriptor) == ZFIXED_ARRAY)) { if (!AttributeDescriptor::getNullable(attrDescriptor)) { - if (AttributeDescriptor::getSizeInWords(attrDescriptor) == 1) { + if (AttributeDescriptor::getSize(attrDescriptor) == 0){ + ljam(); + regTabPtr->readFunctionArray[i] = &Dbtup::readBitsNotNULL; + regTabPtr->updateFunctionArray[i] = &Dbtup::updateBitsNotNULL; + } else if (AttributeDescriptor::getSizeInWords(attrDescriptor) == 1){ ljam(); regTabPtr->readFunctionArray[i] = &Dbtup::readFixedSizeTHOneWordNotNULL; regTabPtr->updateFunctionArray[i] = &Dbtup::updateFixedSizeTHOneWordNotNULL; @@ -55,13 +59,18 @@ Dbtup::setUpQueryRoutines(Tablerec* const regTabPtr) } else { ndbrequire(false); }//if - // replace read function of char attribute + // replace functions for char attribute if (AttributeOffset::getCharsetFlag(attrOffset)) { ljam(); - regTabPtr->readFunctionArray[i] = &Dbtup::readCharNotNULL; + regTabPtr->readFunctionArray[i] = &Dbtup::readFixedSizeTHManyWordNotNULL; + regTabPtr->updateFunctionArray[i] = &Dbtup::updateFixedSizeTHManyWordNotNULL; } } else { - if (AttributeDescriptor::getSizeInWords(attrDescriptor) == 1) { + if (AttributeDescriptor::getSize(attrDescriptor) == 0){ + ljam(); + regTabPtr->readFunctionArray[i] = &Dbtup::readBitsNULLable; + regTabPtr->updateFunctionArray[i] = &Dbtup::updateBitsNULLable; + } else if (AttributeDescriptor::getSizeInWords(attrDescriptor) == 1){ ljam(); regTabPtr->readFunctionArray[i] = &Dbtup::readFixedSizeTHOneWordNULLable; regTabPtr->updateFunctionArray[i] = &Dbtup::updateFixedSizeTHManyWordNULLable; @@ -78,10 +87,11 @@ Dbtup::setUpQueryRoutines(Tablerec* const regTabPtr) regTabPtr->readFunctionArray[i] = &Dbtup::readFixedSizeTHZeroWordNULLable; regTabPtr->updateFunctionArray[i] = &Dbtup::updateFixedSizeTHManyWordNULLable; }//if - // replace read function of char attribute + // replace functions for char attribute if (AttributeOffset::getCharsetFlag(attrOffset)) { ljam(); - regTabPtr->readFunctionArray[i] = &Dbtup::readCharNULLable; + regTabPtr->readFunctionArray[i] = &Dbtup::readFixedSizeTHManyWordNULLable; + regTabPtr->updateFunctionArray[i] = &Dbtup::updateFixedSizeTHManyWordNULLable; } }//if } else if (AttributeDescriptor::getArrayType(attrDescriptor) == ZVAR_ARRAY) { @@ -329,25 +339,68 @@ Dbtup::readFixedSizeTHManyWordNotNULL(Uint32* outBuffer, Uint32 attrDes2) { Uint32 indexBuf = tOutBufIndex; + Uint32 charsetFlag = AttributeOffset::getCharsetFlag(attrDes2); Uint32 readOffset = AttributeOffset::getOffset(attrDes2); Uint32 attrNoOfWords = AttributeDescriptor::getSizeInWords(attrDescriptor); - Uint32 newIndexBuf = indexBuf + attrNoOfWords; Uint32 maxRead = tMaxRead; ndbrequire((readOffset + attrNoOfWords - 1) < tCheckOffset); - if (newIndexBuf <= maxRead) { - ljam(); - ahOut->setDataSize(attrNoOfWords); - MEMCOPY_NO_WORDS(&outBuffer[indexBuf], - &tTupleHeader[readOffset], - attrNoOfWords); - tOutBufIndex = newIndexBuf; - return true; + if (! charsetFlag || ! tXfrmFlag) { + Uint32 newIndexBuf = indexBuf + attrNoOfWords; + if (newIndexBuf <= maxRead) { + ljam(); + ahOut->setDataSize(attrNoOfWords); + MEMCOPY_NO_WORDS(&outBuffer[indexBuf], + &tTupleHeader[readOffset], + attrNoOfWords); + tOutBufIndex = newIndexBuf; + return true; + } else { + ljam(); + terrorCode = ZTRY_TO_READ_TOO_MUCH_ERROR; + }//if } else { ljam(); - terrorCode = ZTRY_TO_READ_TOO_MUCH_ERROR; - return false; - }//if + Tablerec* regTabPtr = tabptr.p; + Uint32 srcBytes = AttributeDescriptor::getSizeInBytes(attrDescriptor); + uchar* dstPtr = (uchar*)&outBuffer[indexBuf]; + const uchar* srcPtr = (uchar*)&tTupleHeader[readOffset]; + Uint32 i = AttributeOffset::getCharsetPos(attrDes2); + ndbrequire(i < regTabPtr->noOfCharsets); + CHARSET_INFO* cs = regTabPtr->charsetArray[i]; + Uint32 typeId = AttributeDescriptor::getType(attrDescriptor); + Uint32 lb, len; + bool ok = NdbSqlUtil::get_var_length(typeId, srcPtr, srcBytes, lb, len); + if (ok) { + Uint32 xmul = cs->strxfrm_multiply; + if (xmul == 0) + xmul = 1; + // see comment in DbtcMain.cpp + Uint32 dstLen = xmul * (srcBytes - lb); + Uint32 maxIndexBuf = indexBuf + (dstLen >> 2); + if (maxIndexBuf <= maxRead) { + ljam(); + int n = NdbSqlUtil::strnxfrm_bug7284(cs, dstPtr, dstLen, srcPtr + lb, len); + ndbrequire(n != -1); + while ((n & 3) != 0) { + dstPtr[n++] = 0; + } + Uint32 dstWords = (n >> 2); + ahOut->setDataSize(dstWords); + Uint32 newIndexBuf = indexBuf + dstWords; + ndbrequire(newIndexBuf <= maxRead); + tOutBufIndex = newIndexBuf; + return true; + } else { + ljam(); + terrorCode = ZTRY_TO_READ_TOO_MUCH_ERROR; + } + } else { + ljam(); + terrorCode = ZTUPLE_CORRUPTED_ERROR; + } + } + return false; }//Dbtup::readFixedSizeTHManyWordNotNULL() bool @@ -394,7 +447,6 @@ Dbtup::readFixedSizeTHManyWordNULLable(Uint32* outBuffer, Uint32 attrDescriptor, Uint32 attrDes2) { -ljam(); if (!nullFlagCheck(attrDes2)) { ljam(); return readFixedSizeTHManyWordNotNULL(outBuffer, @@ -555,74 +607,6 @@ Dbtup::readDynSmallVarSize(Uint32* outBuffer, return false; }//Dbtup::readDynSmallVarSize() - -bool -Dbtup::readCharNotNULL(Uint32* outBuffer, - AttributeHeader* ahOut, - Uint32 attrDescriptor, - Uint32 attrDes2) -{ - Uint32 indexBuf = tOutBufIndex; - Uint32 readOffset = AttributeOffset::getOffset(attrDes2); - Uint32 attrNoOfWords = AttributeDescriptor::getSizeInWords(attrDescriptor); - Uint32 newIndexBuf = indexBuf + attrNoOfWords; - Uint32 maxRead = tMaxRead; - - ndbrequire((readOffset + attrNoOfWords - 1) < tCheckOffset); - if (newIndexBuf <= maxRead) { - ljam(); - ahOut->setDataSize(attrNoOfWords); - if (! tXfrmFlag) { - MEMCOPY_NO_WORDS(&outBuffer[indexBuf], - &tTupleHeader[readOffset], - attrNoOfWords); - } else { - ljam(); - Tablerec* regTabPtr = tabptr.p; - Uint32 i = AttributeOffset::getCharsetPos(attrDes2); - ndbrequire(i < tabptr.p->noOfCharsets); - // not const in MySQL - CHARSET_INFO* cs = tabptr.p->charsetArray[i]; - // XXX should strip Uint32 null padding - const unsigned nBytes = attrNoOfWords << 2; - unsigned n = - (*cs->coll->strnxfrm)(cs, - (uchar*)&outBuffer[indexBuf], - nBytes, - (const uchar*)&tTupleHeader[readOffset], - nBytes); - // pad with ascii spaces - while (n < nBytes) - ((uchar*)&outBuffer[indexBuf])[n++] = 0x20; - } - tOutBufIndex = newIndexBuf; - return true; - } else { - ljam(); - terrorCode = ZTRY_TO_READ_TOO_MUCH_ERROR; - return false; - } -} - -bool -Dbtup::readCharNULLable(Uint32* outBuffer, - AttributeHeader* ahOut, - Uint32 attrDescriptor, - Uint32 attrDes2) -{ - if (!nullFlagCheck(attrDes2)) { - ljam(); - return readCharNotNULL(outBuffer, - ahOut, - attrDescriptor, - attrDes2); - } else { - ljam(); - ahOut->setNULL(); - return true; - } -} - /* ---------------------------------------------------------------------- */ /* THIS ROUTINE IS USED TO UPDATE A NUMBER OF ATTRIBUTES. IT IS */ /* USED BY THE INSERT ROUTINE, THE UPDATE ROUTINE AND IT CAN BE */ @@ -701,22 +685,16 @@ Dbtup::checkUpdateOfPrimaryKey(Uint32* updateBuffer, Tablerec* const regTabPtr) Uint32 attrDescriptor = tableDescriptor[attrDescriptorIndex].tabDescr; Uint32 attributeOffset = tableDescriptor[attrDescriptorIndex + 1].tabDescr; - Uint32 xfrmBuffer[1 + MAX_KEY_SIZE_IN_WORDS * 1]; // strxfrm_multiply == 1 + Uint32 xfrmBuffer[1 + MAX_KEY_SIZE_IN_WORDS * MAX_XFRM_MULTIPLY]; Uint32 charsetFlag = AttributeOffset::getCharsetFlag(attributeOffset); if (charsetFlag) { - Uint32 csPos = AttributeOffset::getCharsetPos(attributeOffset); - CHARSET_INFO* cs = regTabPtr->charsetArray[csPos]; - Uint32 sizeInBytes = AttributeDescriptor::getSizeInBytes(attrDescriptor); - Uint32 sizeInWords = AttributeDescriptor::getSizeInWords(attrDescriptor); - const uchar* srcPtr = (uchar*)&updateBuffer[1]; - uchar* dstPtr = (uchar*)&xfrmBuffer[1]; - Uint32 n = - (*cs->coll->strnxfrm)(cs, dstPtr, sizeInBytes, srcPtr, sizeInBytes); - // pad with blanks (unlikely) and zeroes to match NDB API behaviour - while (n < sizeInBytes) - dstPtr[n++] = 0x20; - while (n < 4 * sizeInWords) - dstPtr[n++] = 0; + Uint32 csIndex = AttributeOffset::getCharsetPos(attributeOffset); + CHARSET_INFO* cs = regTabPtr->charsetArray[csIndex]; + Uint32 srcPos = 0; + Uint32 dstPos = 0; + xfrm_attr(attrDescriptor, cs, &updateBuffer[1], srcPos, + &xfrmBuffer[1], dstPos, MAX_KEY_SIZE_IN_WORDS * MAX_XFRM_MULTIPLY); + ahIn.setDataSize(dstPos); xfrmBuffer[0] = ahIn.m_value; updateBuffer = xfrmBuffer; } @@ -831,6 +809,7 @@ Dbtup::updateFixedSizeTHManyWordNotNULL(Uint32* inBuffer, Uint32 indexBuf = tInBufIndex; Uint32 inBufLen = tInBufLen; Uint32 updateOffset = AttributeOffset::getOffset(attrDes2); + Uint32 charsetFlag = AttributeOffset::getCharsetFlag(attrDes2); AttributeHeader ahIn(inBuffer[indexBuf]); Uint32 nullIndicator = ahIn.isNULL(); Uint32 noOfWords = AttributeDescriptor::getSizeInWords(attrDescriptor); @@ -840,6 +819,31 @@ Dbtup::updateFixedSizeTHManyWordNotNULL(Uint32* inBuffer, if (newIndex <= inBufLen) { if (!nullIndicator) { ljam(); + if (charsetFlag) { + ljam(); + Tablerec* regTabPtr = tabptr.p; + Uint32 typeId = AttributeDescriptor::getType(attrDescriptor); + Uint32 bytes = AttributeDescriptor::getSizeInBytes(attrDescriptor); + Uint32 i = AttributeOffset::getCharsetPos(attrDes2); + ndbrequire(i < regTabPtr->noOfCharsets); + // not const in MySQL + CHARSET_INFO* cs = regTabPtr->charsetArray[i]; + int not_used; + const char* ssrc = (const char*)&inBuffer[tInBufIndex + 1]; + Uint32 lb, len; + if (! NdbSqlUtil::get_var_length(typeId, ssrc, bytes, lb, len)) { + ljam(); + terrorCode = ZINVALID_CHAR_FORMAT; + return false; + } + // fast fix bug#7340 + if (typeId != NDB_TYPE_TEXT && + (*cs->cset->well_formed_len)(cs, ssrc + lb, ssrc + lb + len, ZNIL, ¬_used) != len) { + ljam(); + terrorCode = ZINVALID_CHAR_FORMAT; + return false; + } + } tInBufIndex = newIndex; MEMCOPY_NO_WORDS(&tTupleHeader[updateOffset], &inBuffer[indexBuf + 1], @@ -1011,18 +1015,198 @@ Dbtup::read_psuedo(Uint32 attrId, Uint32* outBuffer){ Signal * signal = (Signal*)&tmp; switch(attrId){ case AttributeHeader::FRAGMENT: - * outBuffer = operPtr.p->fragId; + * outBuffer = operPtr.p->fragId >> 1; // remove "hash" bit + return 1; + case AttributeHeader::FRAGMENT_MEMORY: + { + Uint64 tmp = 0; + tmp += fragptr.p->noOfPages; + { + /** + * Each fragment is split into 2...get #pages from other as well + */ + Uint32 twin = fragptr.p->fragmentId ^ 1; + FragrecordPtr twinPtr; + getFragmentrec(twinPtr, twin, tabptr.p); + ndbrequire(twinPtr.p != 0); + tmp += twinPtr.p->noOfPages; + } + tmp *= 32768; + memcpy(outBuffer,&tmp,8); + } + return 2; + case AttributeHeader::ROW_SIZE: + * outBuffer = tabptr.p->tupheadsize << 2; return 1; case AttributeHeader::ROW_COUNT: case AttributeHeader::COMMIT_COUNT: signal->theData[0] = operPtr.p->userpointer; signal->theData[1] = attrId; - + EXECUTE_DIRECT(DBLQH, GSN_READ_PSUEDO_REQ, signal, 2); outBuffer[0] = signal->theData[0]; outBuffer[1] = signal->theData[1]; return 2; + case AttributeHeader::RANGE_NO: + signal->theData[0] = operPtr.p->userpointer; + signal->theData[1] = attrId; + + EXECUTE_DIRECT(DBLQH, GSN_READ_PSUEDO_REQ, signal, 2); + outBuffer[0] = signal->theData[0]; + return 1; default: return 0; } } + +bool +Dbtup::readBitsNotNULL(Uint32* outBuffer, + AttributeHeader* ahOut, + Uint32 attrDescriptor, + Uint32 attrDes2) +{ + Tablerec* const regTabPtr = tabptr.p; + Uint32 pos = AttributeOffset::getNullFlagPos(attrDes2); + Uint32 bitCount = AttributeDescriptor::getArraySize(attrDescriptor); + Uint32 indexBuf = tOutBufIndex; + Uint32 newIndexBuf = indexBuf + ((bitCount + 31) >> 5); + Uint32 maxRead = tMaxRead; + + if (newIndexBuf <= maxRead) { + ljam(); + ahOut->setDataSize((bitCount + 31) >> 5); + tOutBufIndex = newIndexBuf; + + BitmaskImpl::getField(regTabPtr->tupNullWords, + tTupleHeader+regTabPtr->tupNullIndex, + pos, + bitCount, + outBuffer+indexBuf); + + return true; + } else { + ljam(); + terrorCode = ZTRY_TO_READ_TOO_MUCH_ERROR; + return false; + }//if +} + +bool +Dbtup::readBitsNULLable(Uint32* outBuffer, + AttributeHeader* ahOut, + Uint32 attrDescriptor, + Uint32 attrDes2) +{ + Tablerec* const regTabPtr = tabptr.p; + Uint32 pos = AttributeOffset::getNullFlagPos(attrDes2); + Uint32 bitCount = AttributeDescriptor::getArraySize(attrDescriptor); + + Uint32 indexBuf = tOutBufIndex; + Uint32 newIndexBuf = indexBuf + ((bitCount + 31) >> 5); + Uint32 maxRead = tMaxRead; + + if(BitmaskImpl::get(regTabPtr->tupNullWords, + tTupleHeader+regTabPtr->tupNullIndex, + pos)) + { + ljam(); + ahOut->setNULL(); + return true; + } + + + if (newIndexBuf <= maxRead) { + ljam(); + ahOut->setDataSize((bitCount + 31) >> 5); + tOutBufIndex = newIndexBuf; + BitmaskImpl::getField(regTabPtr->tupNullWords, + tTupleHeader+regTabPtr->tupNullIndex, + pos+1, + bitCount, + outBuffer+indexBuf); + return true; + } else { + ljam(); + terrorCode = ZTRY_TO_READ_TOO_MUCH_ERROR; + return false; + }//if +} + +bool +Dbtup::updateBitsNotNULL(Uint32* inBuffer, + Uint32 attrDescriptor, + Uint32 attrDes2) +{ + Tablerec* const regTabPtr = tabptr.p; + Uint32 indexBuf = tInBufIndex; + Uint32 inBufLen = tInBufLen; + AttributeHeader ahIn(inBuffer[indexBuf]); + Uint32 nullIndicator = ahIn.isNULL(); + Uint32 pos = AttributeOffset::getNullFlagPos(attrDes2); + Uint32 bitCount = AttributeDescriptor::getArraySize(attrDescriptor); + Uint32 newIndex = indexBuf + 1 + ((bitCount + 31) >> 5); + + if (newIndex <= inBufLen) { + if (!nullIndicator) { + BitmaskImpl::setField(regTabPtr->tupNullWords, + tTupleHeader+regTabPtr->tupNullIndex, + pos, + bitCount, + inBuffer+indexBuf+1); + tInBufIndex = newIndex; + return true; + } else { + ljam(); + terrorCode = ZNOT_NULL_ATTR; + return false; + }//if + } else { + ljam(); + terrorCode = ZAI_INCONSISTENCY_ERROR; + return false; + }//if + return true; +} + +bool +Dbtup::updateBitsNULLable(Uint32* inBuffer, + Uint32 attrDescriptor, + Uint32 attrDes2) +{ + Tablerec* const regTabPtr = tabptr.p; + AttributeHeader ahIn(inBuffer[tInBufIndex]); + Uint32 indexBuf = tInBufIndex; + Uint32 nullIndicator = ahIn.isNULL(); + Uint32 pos = AttributeOffset::getNullFlagPos(attrDes2); + Uint32 bitCount = AttributeDescriptor::getArraySize(attrDescriptor); + + if (!nullIndicator) { + BitmaskImpl::clear(regTabPtr->tupNullWords, + tTupleHeader+regTabPtr->tupNullIndex, + pos); + BitmaskImpl::setField(regTabPtr->tupNullWords, + tTupleHeader+regTabPtr->tupNullIndex, + pos+1, + bitCount, + inBuffer+indexBuf+1); + + Uint32 newIndex = indexBuf + 1 + ((bitCount + 31) >> 5); + tInBufIndex = newIndex; + return true; + } else { + Uint32 newIndex = tInBufIndex + 1; + if (newIndex <= tInBufLen) { + ljam(); + BitmaskImpl::set(regTabPtr->tupNullWords, + tTupleHeader+regTabPtr->tupNullIndex, + pos); + + tInBufIndex = newIndex; + return true; + } else { + ljam(); + terrorCode = ZAI_INCONSISTENCY_ERROR; + return false; + }//if + }//if +} |