diff options
Diffstat (limited to 'ndb/src/ndbapi/NdbBlob.cpp')
-rw-r--r-- | ndb/src/ndbapi/NdbBlob.cpp | 291 |
1 files changed, 196 insertions, 95 deletions
diff --git a/ndb/src/ndbapi/NdbBlob.cpp b/ndb/src/ndbapi/NdbBlob.cpp index 8e067f770e8..7939f54d846 100644 --- a/ndb/src/ndbapi/NdbBlob.cpp +++ b/ndb/src/ndbapi/NdbBlob.cpp @@ -14,23 +14,25 @@ along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ -#include "Ndb.hpp" -#include "NdbDictionaryImpl.hpp" -#include "NdbConnection.hpp" -#include "NdbOperation.hpp" -#include "NdbIndexOperation.hpp" -#include "NdbRecAttr.hpp" -#include "NdbBlob.hpp" +#include <Ndb.hpp> +#include <NdbDictionaryImpl.hpp> +#include <NdbConnection.hpp> +#include <NdbOperation.hpp> +#include <NdbIndexOperation.hpp> +#include <NdbRecAttr.hpp> +#include <NdbBlob.hpp> +#include <NdbScanOperation.hpp> #ifdef NDB_BLOB_DEBUG #define DBG(x) \ do { \ static const char* p = getenv("NDB_BLOB_DEBUG"); \ if (p == 0 || *p == 0 || *p == '0') break; \ - const char* cname = theColumn == NULL ? "BLOB" : theColumn->m_name.c_str(); \ - ndbout << cname << " " << __LINE__ << " " << x << " " << *this << endl; \ + static char* prefix = "BLOB"; \ + const char* cname = theColumn == NULL ? "-" : theColumn->m_name.c_str(); \ + ndbout << prefix << " " << hex << (void*)this << " " << cname; \ + ndbout << " " << dec << __LINE__ << " " << x << " " << *this << endl; \ } while (0) -#define EXE() assert(theNdbCon->executeNoBlobs(NoCommit) == 0) #else #define DBG(x) #endif @@ -48,7 +50,7 @@ ndb_blob_debug(const Uint32* data, unsigned size) /* * Reading index table directly (as a table) is faster but there are - * bugs or limitations. Keep the code but make possible to choose. + * bugs or limitations. Keep the code and make possible to choose. */ static const bool g_ndb_blob_ok_to_read_index_table = false; @@ -81,7 +83,7 @@ NdbBlob::getBlobTableName(char* btname, const NdbTableImpl* t, const NdbColumnIm { assert(t != 0 && c != 0 && c->getBlobType()); memset(btname, 0, BlobTableNameSize); - sprintf(btname, "NDB$BLOB_%d_%d_%d", (int)t->m_tableId, (int)t->m_version, (int)c->m_attrId); + sprintf(btname, "NDB$BLOB_%d_%d", (int)t->m_tableId, (int)c->m_attrId); } void @@ -115,7 +117,7 @@ NdbBlob::getBlobTable(NdbTableImpl& bt, const NdbTableImpl* t, const NdbColumnIm case NdbDictionary::Column::Blob: bc.setType(NdbDictionary::Column::Binary); break; - case NdbDictionary::Column::Clob: + case NdbDictionary::Column::Text: bc.setType(NdbDictionary::Column::Char); break; default: @@ -138,12 +140,12 @@ void NdbBlob::init() { theState = Idle; - theBlobTableName[0] = 0; theNdb = NULL; theNdbCon = NULL; theNdbOp = NULL; theTable = NULL; theAccessTable = NULL; + theBlobTable = NULL; theColumn = NULL; theFillChar = 0; theInlineSize = 0; @@ -154,11 +156,13 @@ NdbBlob::init() theSetFlag = false; theSetBuf = NULL; theGetSetBytes = 0; + thePendingBlobOps = 0; + theActiveHook = NULL; + theActiveHookArg = NULL; theHead = NULL; theInlineData = NULL; theHeadInlineRecAttr = NULL; theHeadInlineUpdateFlag = false; - theNewPartFlag = false; theNullFlag = -1; theLength = 0; thePos = 0; @@ -269,7 +273,7 @@ NdbBlob::isScanOp() inline Uint32 NdbBlob::getPartNumber(Uint64 pos) { - assert(pos >= theInlineSize); + assert(thePartSize != 0 && pos >= theInlineSize); return (pos - theInlineSize) / thePartSize; } @@ -301,7 +305,7 @@ NdbBlob::getTableKeyValue(NdbOperation* anOp) assert(c != NULL); if (c->m_pk) { unsigned len = c->m_attrSize * c->m_arraySize; - if (anOp->getValue(c, (char*)&data[pos]) == NULL) { + if (anOp->getValue_impl(c, (char*)&data[pos]) == NULL) { setErrorCode(anOp); return -1; } @@ -321,10 +325,10 @@ int NdbBlob::setTableKeyValue(NdbOperation* anOp) { const Uint32* data = (const Uint32*)theKeyBuf.data; + DBG("setTableKeyValue key=" << ndb_blob_debug(data, theTable->m_sizeOfKeysInWords)); + const unsigned columns = theTable->m_columns.size(); unsigned pos = 0; - const unsigned size = theTable->m_columns.size(); - DBG("setTableKeyValue key=" << ndb_blob_debug(data, size)); - for (unsigned i = 0; i < size; i++) { + for (unsigned i = 0; i < columns; i++) { NdbColumnImpl* c = theTable->m_columns[i]; assert(c != NULL); if (c->m_pk) { @@ -344,10 +348,10 @@ int NdbBlob::setAccessKeyValue(NdbOperation* anOp) { const Uint32* data = (const Uint32*)theAccessKeyBuf.data; + DBG("setAccessKeyValue key=" << ndb_blob_debug(data, theAccessTable->m_sizeOfKeysInWords)); + const unsigned columns = theAccessTable->m_columns.size(); unsigned pos = 0; - const unsigned size = theAccessTable->m_columns.size(); - DBG("setAccessKeyValue key=" << ndb_blob_debug(data, size)); - for (unsigned i = 0; i < size; i++) { + for (unsigned i = 0; i < columns; i++) { NdbColumnImpl* c = theAccessTable->m_columns[i]; assert(c != NULL); if (c->m_pk) { @@ -382,7 +386,7 @@ int NdbBlob::getHeadInlineValue(NdbOperation* anOp) { DBG("getHeadInlineValue"); - theHeadInlineRecAttr = anOp->getValue(theColumn, theHeadInlineBuf.data); + theHeadInlineRecAttr = anOp->getValue_impl(theColumn, theHeadInlineBuf.data); if (theHeadInlineRecAttr == NULL) { setErrorCode(anOp); return -1; @@ -478,11 +482,27 @@ NdbBlob::setValue(const void* data, Uint32 bytes) return 0; } +// activation hook + +int +NdbBlob::setActiveHook(ActiveHook activeHook, void* arg) +{ + DBG("setActiveHook hook=" << hex << (void*)activeHook << " arg=" << hex << arg); + if (theState != Prepared) { + setErrorCode(ErrState); + return -1; + } + theActiveHook = activeHook; + theActiveHookArg = arg; + return 0; +} + // misc operations int NdbBlob::getNull(bool& isNull) { + DBG("getNull"); if (theState == Prepared && theSetFlag) { isNull = (theSetBuf == NULL); return 0; @@ -519,6 +539,7 @@ NdbBlob::setNull() int NdbBlob::getLength(Uint64& len) { + DBG("getLength"); if (theState == Prepared && theSetFlag) { len = theGetSetBytes; return 0; @@ -534,17 +555,17 @@ NdbBlob::getLength(Uint64& len) int NdbBlob::truncate(Uint64 length) { - DBG("truncate kength=" << length); + DBG("truncate length=" << length); if (theNullFlag == -1) { setErrorCode(ErrState); return -1; } if (theLength > length) { - if (length >= theInlineSize) { - Uint32 part1 = getPartNumber(length); + if (length > theInlineSize) { + Uint32 part1 = getPartNumber(length - 1); Uint32 part2 = getPartNumber(theLength - 1); assert(part2 >= part1); - if (deleteParts(part1, part2 - part1) == -1) + if (part2 > part1 && deleteParts(part1 + 1, part2 - part1) == -1) return -1; } else { if (deleteParts(0, getPartCount()) == -1) @@ -559,6 +580,7 @@ NdbBlob::truncate(Uint64 length) int NdbBlob::getPos(Uint64& pos) { + DBG("getPos"); if (theNullFlag == -1) { setErrorCode(ErrState); return -1; @@ -570,6 +592,7 @@ NdbBlob::getPos(Uint64& pos) int NdbBlob::setPos(Uint64 pos) { + DBG("setPos pos=" << pos); if (theNullFlag == -1) { setErrorCode(ErrState); return -1; @@ -628,6 +651,10 @@ NdbBlob::readDataPrivate(Uint64 pos, char* buf, Uint32& bytes) len -= n; } } + if (len > 0 && thePartSize == 0) { + setErrorCode(ErrSeek); + return -1; + } if (len > 0) { assert(pos >= theInlineSize); Uint32 off = (pos - theInlineSize) % thePartSize; @@ -637,11 +664,10 @@ NdbBlob::readDataPrivate(Uint64 pos, char* buf, Uint32& bytes) Uint32 part = (pos - theInlineSize) / thePartSize; if (readParts(thePartBuf.data, part, 1) == -1) return -1; - DBG("force execute"); - if (theNdbCon->executeNoBlobs(NoCommit) == -1) { - setErrorCode(theNdbOp); + // need result now + DBG("execute pending part reads"); + if (executePendingBlobReads() == -1) return -1; - } Uint32 n = thePartSize - off; if (n > len) n = len; @@ -672,11 +698,10 @@ NdbBlob::readDataPrivate(Uint64 pos, char* buf, Uint32& bytes) Uint32 part = (pos - theInlineSize) / thePartSize; if (readParts(thePartBuf.data, part, 1) == -1) return -1; - DBG("force execute"); - if (theNdbCon->executeNoBlobs(NoCommit) == -1) { - setErrorCode(theNdbOp); + // need result now + DBG("execute pending part reads"); + if (executePendingBlobReads() == -1) return -1; - } memcpy(buf, thePartBuf.data, len); Uint32 n = len; pos += n; @@ -735,29 +760,27 @@ NdbBlob::writeDataPrivate(Uint64 pos, const char* buf, Uint32 bytes) len -= n; } } + if (len > 0 && thePartSize == 0) { + setErrorCode(ErrSeek); + return -1; + } if (len > 0) { assert(pos >= theInlineSize); Uint32 off = (pos - theInlineSize) % thePartSize; // partial first block if (off != 0) { DBG("partial first block pos=" << pos << " len=" << len); - if (theNewPartFlag) { - // must flush insert to guarantee read - DBG("force execute"); - if (theNdbCon->executeNoBlobs(NoCommit) == -1) { - setErrorCode(theNdbOp); - return -1; - } - theNewPartFlag = false; - } + // flush writes to guarantee correct read + DBG("execute pending part writes"); + if (executePendingBlobWrites() == -1) + return -1; Uint32 part = (pos - theInlineSize) / thePartSize; if (readParts(thePartBuf.data, part, 1) == -1) return -1; - DBG("force execute"); - if (theNdbCon->executeNoBlobs(NoCommit) == -1) { - setErrorCode(theNdbOp); + // need result now + DBG("execute pending part reafs"); + if (executePendingBlobReads() == -1) return -1; - } Uint32 n = thePartSize - off; if (n > len) { memset(thePartBuf.data + off + len, theFillChar, n - len); @@ -798,22 +821,16 @@ NdbBlob::writeDataPrivate(Uint64 pos, const char* buf, Uint32 bytes) assert((pos - theInlineSize) % thePartSize == 0 && len < thePartSize); Uint32 part = (pos - theInlineSize) / thePartSize; if (theLength > pos + len) { - if (theNewPartFlag) { - // must flush insert to guarantee read - DBG("force execute"); - if (theNdbCon->executeNoBlobs(NoCommit) == -1) { - setErrorCode(theNdbOp); - return -1; - } - theNewPartFlag = false; - } + // flush writes to guarantee correct read + DBG("execute pending part writes"); + if (executePendingBlobWrites() == -1) + return -1; if (readParts(thePartBuf.data, part, 1) == -1) return -1; - DBG("force execute"); - if (theNdbCon->executeNoBlobs(NoCommit) == -1) { - setErrorCode(theNdbOp); + // need result now + DBG("execute pending part reads"); + if (executePendingBlobReads() == -1) return -1; - } memcpy(thePartBuf.data, buf, len); if (updateParts(thePartBuf.data, part, 1) == -1) return -1; @@ -848,7 +865,7 @@ NdbBlob::readParts(char* buf, Uint32 part, Uint32 count) DBG("readParts part=" << part << " count=" << count); Uint32 n = 0; while (n < count) { - NdbOperation* tOp = theNdbCon->getNdbOperation(theBlobTableName); + NdbOperation* tOp = theNdbCon->getNdbOperation(theBlobTable); if (tOp == NULL || tOp->readTuple() == -1 || setPartKeyValue(tOp, part + n) == -1 || @@ -858,6 +875,8 @@ NdbBlob::readParts(char* buf, Uint32 part, Uint32 count) } buf += thePartSize; n++; + thePendingBlobOps |= (1 << NdbOperation::ReadRequest); + theNdbCon->thePendingBlobOps |= (1 << NdbOperation::ReadRequest); } return 0; } @@ -868,7 +887,7 @@ NdbBlob::insertParts(const char* buf, Uint32 part, Uint32 count) DBG("insertParts part=" << part << " count=" << count); Uint32 n = 0; while (n < count) { - NdbOperation* tOp = theNdbCon->getNdbOperation(theBlobTableName); + NdbOperation* tOp = theNdbCon->getNdbOperation(theBlobTable); if (tOp == NULL || tOp->insertTuple() == -1 || setPartKeyValue(tOp, part + n) == -1 || @@ -878,7 +897,8 @@ NdbBlob::insertParts(const char* buf, Uint32 part, Uint32 count) } buf += thePartSize; n++; - theNewPartFlag = true; + thePendingBlobOps |= (1 << NdbOperation::InsertRequest); + theNdbCon->thePendingBlobOps |= (1 << NdbOperation::InsertRequest); } return 0; } @@ -889,7 +909,7 @@ NdbBlob::updateParts(const char* buf, Uint32 part, Uint32 count) DBG("updateParts part=" << part << " count=" << count); Uint32 n = 0; while (n < count) { - NdbOperation* tOp = theNdbCon->getNdbOperation(theBlobTableName); + NdbOperation* tOp = theNdbCon->getNdbOperation(theBlobTable); if (tOp == NULL || tOp->updateTuple() == -1 || setPartKeyValue(tOp, part + n) == -1 || @@ -899,7 +919,8 @@ NdbBlob::updateParts(const char* buf, Uint32 part, Uint32 count) } buf += thePartSize; n++; - theNewPartFlag = true; + thePendingBlobOps |= (1 << NdbOperation::UpdateRequest); + theNdbCon->thePendingBlobOps |= (1 << NdbOperation::UpdateRequest); } return 0; } @@ -910,7 +931,7 @@ NdbBlob::deleteParts(Uint32 part, Uint32 count) DBG("deleteParts part=" << part << " count=" << count); Uint32 n = 0; while (n < count) { - NdbOperation* tOp = theNdbCon->getNdbOperation(theBlobTableName); + NdbOperation* tOp = theNdbCon->getNdbOperation(theBlobTable); if (tOp == NULL || tOp->deleteTuple() == -1 || setPartKeyValue(tOp, part + n) == -1) { @@ -918,6 +939,52 @@ NdbBlob::deleteParts(Uint32 part, Uint32 count) return -1; } n++; + thePendingBlobOps |= (1 << NdbOperation::DeleteRequest); + theNdbCon->thePendingBlobOps |= (1 << NdbOperation::DeleteRequest); + } + return 0; +} + +// pending ops + +int +NdbBlob::executePendingBlobReads() +{ + Uint8 flags = (1 << NdbOperation::ReadRequest); + if (thePendingBlobOps & flags) { + if (theNdbCon->executeNoBlobs(NoCommit) == -1) + return -1; + thePendingBlobOps = 0; + theNdbCon->thePendingBlobOps = 0; + } + return 0; +} + +int +NdbBlob::executePendingBlobWrites() +{ + Uint8 flags = 0xFF & ~(1 << NdbOperation::ReadRequest); + if (thePendingBlobOps & flags) { + if (theNdbCon->executeNoBlobs(NoCommit) == -1) + return -1; + thePendingBlobOps = 0; + theNdbCon->thePendingBlobOps = 0; + } + return 0; +} + +// callbacks + +int +NdbBlob::invokeActiveHook() +{ + DBG("invokeActiveHook"); + assert(theState == Active && theActiveHook != NULL); + int ret = (*theActiveHook)(this, theActiveHookArg); + DBG("invokeActiveHook ret=" << ret); + if (ret != 0) { + // no error is set on blob level + return -1; } return 0; } @@ -947,7 +1014,7 @@ NdbBlob::atPrepare(NdbConnection* aCon, NdbOperation* anOp, const NdbColumnImpl* partType = NdbDictionary::Column::Binary; theFillChar = 0x0; break; - case NdbDictionary::Column::Clob: + case NdbDictionary::Column::Text: partType = NdbDictionary::Column::Char; theFillChar = 0x20; break; @@ -959,22 +1026,21 @@ NdbBlob::atPrepare(NdbConnection* aCon, NdbOperation* anOp, const NdbColumnImpl* theInlineSize = theColumn->getInlineSize(); thePartSize = theColumn->getPartSize(); theStripeSize = theColumn->getStripeSize(); - // blob table sanity check + // sanity check assert((NDB_BLOB_HEAD_SIZE << 2) == sizeof(Head)); assert(theColumn->m_attrSize * theColumn->m_arraySize == sizeof(Head) + theInlineSize); - getBlobTableName(theBlobTableName, theTable, theColumn); - const NdbDictionary::Table* bt; - const NdbDictionary::Column* bc; - if (theInlineSize >= (1 << 16) || - thePartSize == 0 || - thePartSize >= (1 << 16) || - theStripeSize == 0 || - (bt = theNdb->theDictionary->getTable(theBlobTableName)) == NULL || - (bc = bt->getColumn("DATA")) == NULL || - bc->getType() != partType || - bc->getLength() != (int)thePartSize) { - setErrorCode(ErrTable); - return -1; + if (thePartSize > 0) { + const NdbDictionary::Table* bt = NULL; + const NdbDictionary::Column* bc = NULL; + if (theStripeSize == 0 || + (bt = theColumn->getBlobTable()) == NULL || + (bc = bt->getColumn("DATA")) == NULL || + bc->getType() != partType || + bc->getLength() != (int)thePartSize) { + setErrorCode(ErrTable); + return -1; + } + theBlobTable = &NdbTableImpl::getImpl(*bt); } // buffers theKeyBuf.alloc(theTable->m_sizeOfKeysInWords << 2); @@ -1060,7 +1126,7 @@ NdbBlob::preExecute(ExecType anExecType, bool& batch) Uint32 bytes = theGetSetBytes - theInlineSize; if (writeDataPrivate(pos, buf, bytes) == -1) return -1; - if (anExecType == Commit && theHeadInlineUpdateFlag) { + if (theHeadInlineUpdateFlag) { // add an operation to update head+inline NdbOperation* tOp = theNdbCon->getNdbOperation(theTable); if (tOp == NULL || @@ -1128,6 +1194,10 @@ NdbBlob::preExecute(ExecType anExecType, bool& batch) batch = true; } } + if (theActiveHook != NULL) { + // need blob head for callback + batch = true; + } DBG("preExecute out batch=" << batch); return 0; } @@ -1144,8 +1214,11 @@ NdbBlob::postExecute(ExecType anExecType) DBG("postExecute type=" << anExecType); if (theState == Invalid) return -1; - if (theState == Active) + if (theState == Active) { + setState(anExecType == NoCommit ? Active : Closed); + DBG("postExecute skip"); return 0; + } assert(theState == Prepared); assert(isKeyOp()); if (isIndexOp()) { @@ -1199,8 +1272,12 @@ NdbBlob::postExecute(ExecType anExecType) if (deleteParts(0, getPartCount()) == -1) return -1; } - theNewPartFlag = false; setState(anExecType == NoCommit ? Active : Closed); + // activation callback + if (theActiveHook != NULL) { + if (invokeActiveHook() == -1) + return -1; + } DBG("postExecute out"); return 0; } @@ -1250,7 +1327,7 @@ NdbBlob::atNextResult() // get primary key { Uint32* data = (Uint32*)theKeyBuf.data; unsigned size = theTable->m_sizeOfKeysInWords; - if (theNdbOp->getKeyFromKEYINFO20(data, size) == -1) { + if (((NdbScanOperation*)theNdbOp)->getKeyFromKEYINFO20(data, size) == -1) { setErrorCode(ErrUsage); return -1; } @@ -1274,20 +1351,18 @@ NdbBlob::atNextResult() Uint32 bytes = theGetSetBytes - theInlineSize; if (readDataPrivate(pos, buf, bytes) == -1) return -1; - // must also execute them - DBG("force execute"); - if (theNdbCon->executeNoBlobs(NoCommit) == -1) { - setErrorCode((NdbOperation*)0); - return -1; - } } } setState(Active); + // activation callback + if (theActiveHook != NULL) { + if (invokeActiveHook() == -1) + return -1; + } DBG("atNextResult out"); return 0; } - // misc const NdbDictionary::Column* @@ -1303,6 +1378,9 @@ NdbBlob::setErrorCode(int anErrorCode, bool invalidFlag) { DBG("setErrorCode code=" << anErrorCode); theError.code = anErrorCode; + // conditionally copy error to operation level + if (theNdbOp != NULL && theNdbOp->theError.code == 0) + theNdbOp->setErrorCode(theError.code); if (invalidFlag) setState(Invalid); } @@ -1335,11 +1413,34 @@ NdbBlob::setErrorCode(NdbConnection* aCon, bool invalidFlag) setErrorCode(code, invalidFlag); } +// info about all blobs in this operation + +NdbBlob* +NdbBlob::blobsFirstBlob() +{ + return theNdbOp->theBlobList; +} + +NdbBlob* +NdbBlob::blobsNextBlob() +{ + return theNext; +} + +// debug + #ifdef VM_TRACE +inline int +NdbBlob::getOperationType() const +{ + return theNdbOp != NULL ? theNdbOp->theOperationType : -1; +} + NdbOut& operator<<(NdbOut& out, const NdbBlob& blob) { - ndbout << dec << "s=" << blob.theState; + ndbout << dec << "o=" << blob.getOperationType(); + ndbout << dec << " s=" << blob.theState; ndbout << dec << " n=" << blob.theNullFlag;; ndbout << dec << " l=" << blob.theLength; ndbout << dec << " p=" << blob.thePos; |