summaryrefslogtreecommitdiff
path: root/storage/ndb/src/ndbapi/NdbBlob.cpp
diff options
context:
space:
mode:
authorunknown <pekka@mysql.com>2006-03-09 13:27:58 +0100
committerunknown <pekka@mysql.com>2006-03-09 13:27:58 +0100
commit7f86cede6ac1e48292ada5d0f41f438890f20691 (patch)
tree7dd270f8b31c795fe173afa823f4a31bfe130407 /storage/ndb/src/ndbapi/NdbBlob.cpp
parente0f44ddf82d16ef8a475318299639c8ee1cea2d6 (diff)
downloadmariadb-git-7f86cede6ac1e48292ada5d0f41f438890f20691.tar.gz
ndb - fix bug#18075 varsize PK + blobs
storage/ndb/include/ndbapi/NdbBlob.hpp: fast fix bug#18075 storage/ndb/include/ndbapi/NdbOperation.hpp: fast fix bug#18075 storage/ndb/include/ndbapi/NdbScanOperation.hpp: fast fix bug#18075 storage/ndb/src/ndbapi/NdbBlob.cpp: fast fix bug#18075 storage/ndb/src/ndbapi/NdbBlobImpl.hpp: fast fix bug#18075 storage/ndb/src/ndbapi/NdbOperationSearch.cpp: fast fix bug#18075 storage/ndb/src/ndbapi/NdbRecAttr.cpp: fast fix bug#18075 storage/ndb/src/ndbapi/NdbScanOperation.cpp: fast fix bug#18075 storage/ndb/src/ndbapi/ndberror.c: fast fix bug#18075 storage/ndb/test/ndbapi/testBlobs.cpp: fast fix bug#18075
Diffstat (limited to 'storage/ndb/src/ndbapi/NdbBlob.cpp')
-rw-r--r--storage/ndb/src/ndbapi/NdbBlob.cpp117
1 files changed, 99 insertions, 18 deletions
diff --git a/storage/ndb/src/ndbapi/NdbBlob.cpp b/storage/ndb/src/ndbapi/NdbBlob.cpp
index 663d346c8ef..680f9e5c70e 100644
--- a/storage/ndb/src/ndbapi/NdbBlob.cpp
+++ b/storage/ndb/src/ndbapi/NdbBlob.cpp
@@ -310,7 +310,7 @@ NdbBlob::Buf::alloc(unsigned n)
void
NdbBlob::Buf::copyfrom(const NdbBlob::Buf& src)
{
- assert(size == src.size);
+ size = src.size;
memcpy(data, src.data, size);
}
@@ -408,6 +408,75 @@ NdbBlob::getDistKey(Uint32 part)
return (part / theStripeSize) % theStripeSize;
}
+// pack/unpack table/index key XXX support routines, shortcuts
+
+int
+NdbBlob::packKeyValue(const NdbTableImpl* aTable, const Buf& srcBuf)
+{
+ DBUG_ENTER("NdbBlob::packKeyValue");
+ const Uint32* data = (const Uint32*)srcBuf.data;
+ unsigned pos = 0;
+ Uint32* pack_data = (Uint32*)thePackKeyBuf.data;
+ unsigned pack_pos = 0;
+ for (unsigned i = 0; i < aTable->m_columns.size(); i++) {
+ NdbColumnImpl* c = aTable->m_columns[i];
+ assert(c != NULL);
+ if (c->m_pk) {
+ unsigned len = c->m_attrSize * c->m_arraySize;
+ Uint32 pack_len;
+ bool ok = c->get_var_length(&data[pos], pack_len);
+ if (! ok) {
+ setErrorCode(NdbBlobImpl::ErrCorruptPK);
+ DBUG_RETURN(-1);
+ }
+ memcpy(&pack_data[pack_pos], &data[pos], pack_len);
+ while (pack_len % 4 != 0) {
+ char* p = (char*)&pack_data[pack_pos] + pack_len++;
+ *p = 0;
+ }
+ pos += (len + 3) / 4;
+ pack_pos += pack_len / 4;
+ }
+ }
+ assert(4 * pos == srcBuf.size);
+ assert(4 * pack_pos <= thePackKeyBuf.maxsize);
+ thePackKeyBuf.size = 4 * pack_pos;
+ DBUG_RETURN(0);
+}
+
+int
+NdbBlob::unpackKeyValue(const NdbTableImpl* aTable, Buf& dstBuf)
+{
+ DBUG_ENTER("NdbBlob::unpackKeyValue");
+ Uint32* data = (Uint32*)dstBuf.data;
+ unsigned pos = 0;
+ const Uint32* pack_data = (const Uint32*)thePackKeyBuf.data;
+ unsigned pack_pos = 0;
+ for (unsigned i = 0; i < aTable->m_columns.size(); i++) {
+ NdbColumnImpl* c = aTable->m_columns[i];
+ assert(c != NULL);
+ if (c->m_pk) {
+ unsigned len = c->m_attrSize * c->m_arraySize;
+ Uint32 pack_len;
+ bool ok = c->get_var_length(&pack_data[pack_pos], pack_len);
+ if (! ok) {
+ setErrorCode(NdbBlobImpl::ErrCorruptPK);
+ DBUG_RETURN(-1);
+ }
+ memcpy(&data[pos], &pack_data[pack_pos], pack_len);
+ while (pack_len % 4 != 0) {
+ char* p = (char*)&data[pos] + pack_len++;
+ *p = 0;
+ }
+ pos += (len + 3) / 4;
+ pack_pos += pack_len / 4;
+ }
+ }
+ assert(4 * pos == dstBuf.size);
+ assert(4 * pack_pos == thePackKeyBuf.size);
+ DBUG_RETURN(0);
+}
+
// getters and setters
int
@@ -489,12 +558,10 @@ int
NdbBlob::setPartKeyValue(NdbOperation* anOp, Uint32 part)
{
DBUG_ENTER("NdbBlob::setPartKeyValue");
- DBUG_PRINT("info", ("dist=%u part=%u key=", getDistKey(part), part));
- DBUG_DUMP("info", theKeyBuf.data, 4 * theTable->m_keyLenInWords);
- //Uint32* data = (Uint32*)theKeyBuf.data;
- //unsigned size = theTable->m_keyLenInWords;
+ DBUG_PRINT("info", ("dist=%u part=%u packkey=", getDistKey(part), part));
+ DBUG_DUMP("info", thePackKeyBuf.data, 4 * thePackKeyBuf.size);
// TODO use attr ids after compatibility with 4.1.7 not needed
- if (anOp->equal("PK", theKeyBuf.data) == -1 ||
+ if (anOp->equal("PK", thePackKeyBuf.data) == -1 ||
anOp->equal("DIST", getDistKey(part)) == -1 ||
anOp->equal("PART", part) == -1) {
setErrorCode(anOp);
@@ -1242,21 +1309,27 @@ NdbBlob::atPrepare(NdbTransaction* aCon, NdbOperation* anOp, const NdbColumnImpl
if (isKeyOp()) {
if (isTableOp()) {
// get table key
- Uint32* data = (Uint32*)theKeyBuf.data;
- unsigned size = theTable->m_keyLenInWords;
+ Uint32* data = (Uint32*)thePackKeyBuf.data;
+ Uint32 size = theTable->m_keyLenInWords; // in-out
if (theNdbOp->getKeyFromTCREQ(data, size) == -1) {
setErrorCode(NdbBlobImpl::ErrUsage);
DBUG_RETURN(-1);
}
+ thePackKeyBuf.size = 4 * size;
+ if (unpackKeyValue(theTable, theKeyBuf) == -1)
+ DBUG_RETURN(-1);
}
if (isIndexOp()) {
// get index key
- Uint32* data = (Uint32*)theAccessKeyBuf.data;
- unsigned size = theAccessTable->m_keyLenInWords;
+ Uint32* data = (Uint32*)thePackKeyBuf.data;
+ Uint32 size = theAccessTable->m_keyLenInWords; // in-out
if (theNdbOp->getKeyFromTCREQ(data, size) == -1) {
setErrorCode(NdbBlobImpl::ErrUsage);
DBUG_RETURN(-1);
}
+ thePackKeyBuf.size = 4 * size;
+ if (unpackKeyValue(theAccessTable, theAccessKeyBuf) == -1)
+ DBUG_RETURN(-1);
}
if (isReadOp()) {
// add read of head+inline in this op
@@ -1303,6 +1376,7 @@ NdbBlob::atPrepare(NdbEventOperationImpl* anOp, NdbEventOperationImpl* aBlobOp,
theEventOp = anOp;
theBlobEventOp = aBlobOp;
theTable = anOp->m_eventImpl->m_tableImpl;
+ theAccessTable = theTable;
theColumn = aColumn;
// prepare blob column and table
if (prepareColumn() == -1)
@@ -1321,7 +1395,7 @@ NdbBlob::atPrepare(NdbEventOperationImpl* anOp, NdbEventOperationImpl* aBlobOp,
if (theBlobEventOp != NULL) {
if ((theBlobEventPkRecAttr =
theBlobEventOp->getValue(theBlobTable->getColumn((Uint32)0),
- theKeyBuf.data, version)) == NULL ||
+ thePackKeyBuf.data, version)) == NULL ||
(theBlobEventDistRecAttr =
theBlobEventOp->getValue(theBlobTable->getColumn((Uint32)1),
(char*)0, version)) == NULL ||
@@ -1380,6 +1454,7 @@ NdbBlob::prepareColumn()
}
// these buffers are always used
theKeyBuf.alloc(theTable->m_keyLenInWords << 2);
+ thePackKeyBuf.alloc(max(theTable->m_keyLenInWords, theAccessTable->m_keyLenInWords) << 2);
theHeadInlineBuf.alloc(sizeof(Head) + theInlineSize);
theHead = (Head*)theHeadInlineBuf.data;
theInlineData = theHeadInlineBuf.data + sizeof(Head);
@@ -1464,7 +1539,7 @@ NdbBlob::preExecute(NdbTransaction::ExecType anExecType, bool& batch)
if (tOp == NULL ||
tOp->readTuple() == -1 ||
setAccessKeyValue(tOp) == -1 ||
- tOp->getValue(pkAttrId, theKeyBuf.data) == NULL) {
+ tOp->getValue(pkAttrId, thePackKeyBuf.data) == NULL) {
setErrorCode(tOp);
DBUG_RETURN(-1);
}
@@ -1553,10 +1628,12 @@ NdbBlob::postExecute(NdbTransaction::ExecType anExecType)
assert(isKeyOp());
if (isIndexOp()) {
NdbBlob* tFirstBlob = theNdbOp->theBlobList;
- if (this != tFirstBlob) {
+ if (this == tFirstBlob) {
+ packKeyValue(theTable, theKeyBuf);
+ } else {
// copy key from first blob
- assert(theKeyBuf.size == tFirstBlob->theKeyBuf.size);
- memcpy(theKeyBuf.data, tFirstBlob->theKeyBuf.data, tFirstBlob->theKeyBuf.size);
+ theKeyBuf.copyfrom(tFirstBlob->theKeyBuf);
+ thePackKeyBuf.copyfrom(tFirstBlob->thePackKeyBuf);
}
}
if (isReadOp()) {
@@ -1710,12 +1787,16 @@ NdbBlob::atNextResult()
DBUG_RETURN(-1);
assert(isScanOp());
// get primary key
- { Uint32* data = (Uint32*)theKeyBuf.data;
- unsigned size = theTable->m_keyLenInWords;
- if (((NdbScanOperation*)theNdbOp)->getKeyFromKEYINFO20(data, size) == -1) {
+ { NdbScanOperation* tScanOp = (NdbScanOperation*)theNdbOp;
+ Uint32* data = (Uint32*)thePackKeyBuf.data;
+ unsigned size = theTable->m_keyLenInWords; // in-out
+ if (tScanOp->getKeyFromKEYINFO20(data, size) == -1) {
setErrorCode(NdbBlobImpl::ErrUsage);
DBUG_RETURN(-1);
}
+ thePackKeyBuf.size = 4 * size;
+ if (unpackKeyValue(theTable, theKeyBuf) == -1)
+ DBUG_RETURN(-1);
}
getHeadFromRecAttr();
if (setPos(0) == -1)