diff options
Diffstat (limited to 'ndb/src/ndbapi')
-rw-r--r-- | ndb/src/ndbapi/Makefile | 3 | ||||
-rw-r--r-- | ndb/src/ndbapi/NdbBlob.cpp | 1334 | ||||
-rw-r--r-- | ndb/src/ndbapi/NdbConnection.cpp | 148 | ||||
-rw-r--r-- | ndb/src/ndbapi/NdbDictionary.cpp | 3 | ||||
-rw-r--r-- | ndb/src/ndbapi/NdbDictionaryImpl.cpp | 81 | ||||
-rw-r--r-- | ndb/src/ndbapi/NdbDictionaryImpl.hpp | 17 | ||||
-rw-r--r-- | ndb/src/ndbapi/NdbIndexOperation.cpp | 11 | ||||
-rw-r--r-- | ndb/src/ndbapi/NdbOperation.cpp | 33 | ||||
-rw-r--r-- | ndb/src/ndbapi/NdbOperationDefine.cpp | 28 | ||||
-rw-r--r-- | ndb/src/ndbapi/NdbOperationScan.cpp | 31 | ||||
-rw-r--r-- | ndb/src/ndbapi/NdbOperationSearch.cpp | 32 | ||||
-rw-r--r-- | ndb/src/ndbapi/NdbScanOperation.cpp | 22 | ||||
-rw-r--r-- | ndb/src/ndbapi/Ndberr.cpp | 8 | ||||
-rw-r--r-- | ndb/src/ndbapi/Ndbinit.cpp | 3 | ||||
-rw-r--r-- | ndb/src/ndbapi/Ndblist.cpp | 30 | ||||
-rw-r--r-- | ndb/src/ndbapi/ndberror.c | 10 |
16 files changed, 1765 insertions, 29 deletions
diff --git a/ndb/src/ndbapi/Makefile b/ndb/src/ndbapi/Makefile index f4c82e5d6ba..564f9ab908e 100644 --- a/ndb/src/ndbapi/Makefile +++ b/ndb/src/ndbapi/Makefile @@ -55,7 +55,8 @@ SOURCES = \ NdbSchemaOp.cpp \ NdbUtil.cpp \ NdbReceiver.cpp \ - NdbDictionary.cpp NdbDictionaryImpl.cpp DictCache.cpp + NdbDictionary.cpp NdbDictionaryImpl.cpp DictCache.cpp \ + NdbBlob.cpp include $(NDB_TOP)/Epilogue.mk diff --git a/ndb/src/ndbapi/NdbBlob.cpp b/ndb/src/ndbapi/NdbBlob.cpp new file mode 100644 index 00000000000..99f986b9b8f --- /dev/null +++ b/ndb/src/ndbapi/NdbBlob.cpp @@ -0,0 +1,1334 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#include "Ndb.hpp" +#include "NdbDictionaryImpl.hpp" +#include "NdbConnection.hpp" +#include "NdbOperation.hpp" +#include "NdbIndexOperation.hpp" +#include "NdbRecAttr.hpp" +#include "NdbBlob.hpp" + +#ifdef NDB_BLOB_DEBUG +#define DBG(x) \ + do { \ + static const char* p = getenv("NDB_BLOB_DEBUG"); \ + if (p == 0 || *p == 0 || *p == '0') break; \ + const char* cname = theColumn == NULL ? "BLOB" : theColumn->m_name.c_str(); \ + ndbout << cname << " " << __LINE__ << " " << x << " " << *this << endl; \ + } while (0) +#define EXE() assert(theNdbCon->executeNoBlobs(NoCommit) == 0) +#else +#undef DBG(x) +#endif + +/* + * Reading index table directly (as a table) is faster but there are + * bugs or limitations. Keep the code but make possible to choose. + */ +static const bool g_ndb_blob_ok_to_read_index_table = false; + +// state (inline) + +inline void +NdbBlob::setState(State newState) +{ + DBG("setState " << newState); + theState = newState; +} + +// define blob table + +int +NdbBlob::getBlobTableName(char* btname, Ndb* anNdb, const char* tableName, const char* columnName) +{ + NdbTableImpl* t = anNdb->theDictionary->m_impl.getTable(tableName); + if (t == NULL) + return -1; + NdbColumnImpl* c = t->getColumn(columnName); + if (c == NULL) + return -1; + getBlobTableName(btname, t, c); + return 0; +} + +void +NdbBlob::getBlobTableName(char* btname, const NdbTableImpl* t, const NdbColumnImpl* c) +{ + assert(t != 0 && c != 0 && c->getBlobType()); + memset(btname, 0, BlobTableNameSize); + sprintf(btname, "NDB$BLOB_%d_%d_%d", (int)t->m_tableId, (int)t->m_version, (int)c->m_attrId); +} + +void +NdbBlob::getBlobTable(NdbTableImpl& bt, const NdbTableImpl* t, const NdbColumnImpl* c) +{ + char btname[BlobTableNameSize]; + getBlobTableName(btname, t, c); + bt.setName(btname); + bt.setLogging(t->getLogging()); + bt.setFragmentType(t->getFragmentType()); + { NdbDictionary::Column bc("DIST"); + bc.setType(NdbDictionary::Column::Unsigned); + bc.setPrimaryKey(true); + bc.setDistributionKey(true); + bt.addColumn(bc); + } + { NdbDictionary::Column bc("PART"); + bc.setType(NdbDictionary::Column::Unsigned); + bc.setPrimaryKey(true); + bt.addColumn(bc); + } + { NdbDictionary::Column bc("PK"); + bc.setType(NdbDictionary::Column::Unsigned); + assert(t->m_sizeOfKeysInWords != 0); + bc.setLength(t->m_sizeOfKeysInWords); + bc.setPrimaryKey(true); + bt.addColumn(bc); + } + { NdbDictionary::Column bc("DATA"); + switch (c->m_type) { + case NdbDictionary::Column::Blob: + bc.setType(NdbDictionary::Column::Binary); + break; + case NdbDictionary::Column::Clob: + bc.setType(NdbDictionary::Column::Char); + break; + default: + assert(false); + break; + } + bc.setLength(c->getPartSize()); + bt.addColumn(bc); + } +} + +// initialization + +NdbBlob::NdbBlob() +{ + init(); +} + +void +NdbBlob::init() +{ + theState = Idle; + theBlobTableName[0] = 0; + theNdb = NULL; + theNdbCon = NULL; + theNdbOp = NULL; + theTable = NULL; + theAccessTable = NULL; + theColumn = NULL; + theFillChar = 0; + theInlineSize = 0; + thePartSize = 0; + theStripeSize = 0; + theGetFlag = false; + theGetBuf = NULL; + theSetFlag = false; + theSetBuf = NULL; + theGetSetBytes = 0; + theHead = NULL; + theInlineData = NULL; + theHeadInlineRecAttr = NULL; + theHeadInlineUpdateFlag = false; + theNewPartFlag = false; + theNullFlag = -1; + theLength = 0; + thePos = 0; + theNext = NULL; +} + +void +NdbBlob::release() +{ + setState(Idle); +} + +// buffers + +NdbBlob::Buf::Buf() : + data(NULL), + size(0), + maxsize(0) +{ +} + +NdbBlob::Buf::~Buf() +{ + delete [] data; +} + +void +NdbBlob::Buf::alloc(unsigned n) +{ + size = n; + if (maxsize < n) { + delete [] data; + // align to Uint64 + if (n % 8 != 0) + n += 8 - n % 8; + data = new char [n]; + maxsize = n; + } +#ifdef VM_TRACE + memset(data, 'X', maxsize); +#endif +} + +// classify operations (inline) + +inline bool +NdbBlob::isTableOp() +{ + return theTable == theAccessTable; +} + +inline bool +NdbBlob::isIndexOp() +{ + return theTable != theAccessTable; +} + +inline bool +NdbBlob::isKeyOp() +{ + return + theNdbOp->theOperationType == InsertRequest || + theNdbOp->theOperationType == UpdateRequest || + theNdbOp->theOperationType == ReadRequest || + theNdbOp->theOperationType == ReadExclusive || + theNdbOp->theOperationType == DeleteRequest; +} + +inline bool +NdbBlob::isReadOp() +{ + return + theNdbOp->theOperationType == ReadRequest || + theNdbOp->theOperationType == ReadExclusive; +} + +inline bool +NdbBlob::isInsertOp() +{ + return + theNdbOp->theOperationType == InsertRequest; +} + +inline bool +NdbBlob::isUpdateOp() +{ + return + theNdbOp->theOperationType == UpdateRequest; +} + +inline bool +NdbBlob::isDeleteOp() +{ + return + theNdbOp->theOperationType == DeleteRequest; +} + +inline bool +NdbBlob::isScanOp() +{ + return + theNdbOp->theOperationType == OpenScanRequest || + theNdbOp->theOperationType == OpenRangeScanRequest; +} + +// computations (inline) + +inline Uint32 +NdbBlob::getPartNumber(Uint64 pos) +{ + assert(pos >= theInlineSize); + return (pos - theInlineSize) / thePartSize; +} + +inline Uint32 +NdbBlob::getPartCount() +{ + if (theLength <= theInlineSize) + return 0; + return 1 + getPartNumber(theLength - 1); +} + +inline Uint32 +NdbBlob::getDistKey(Uint32 part) +{ + assert(theStripeSize != 0); + return (part / theStripeSize) % theStripeSize; +} + +// getters and setters + +int +NdbBlob::getTableKeyValue(NdbOperation* anOp) +{ + Uint32* data = (Uint32*)theKeyBuf.data; + unsigned pos = 0; + DBG("getTableKeyValue"); + for (unsigned i = 0; i < theTable->m_columns.size(); i++) { + NdbColumnImpl* c = theTable->m_columns[i]; + assert(c != NULL); + if (c->m_pk) { + unsigned len = c->m_attrSize * c->m_arraySize; + if (anOp->getValue(c, (char*)&data[pos]) == NULL) { + setErrorCode(anOp); + return -1; + } + // odd bytes receive no data and must be zeroed + while (len % 4 != 0) { + char* p = (char*)data + len++; + *p = 0; + } + pos += len / 4; + } + } + assert(pos == theKeyBuf.size / 4); + return 0; +} + +int +NdbBlob::setTableKeyValue(NdbOperation* anOp) +{ + const Uint32* data = (const Uint32*)theKeyBuf.data; + unsigned pos = 0; + DBG("setTableKeyValue key0=" << data[0]); + for (unsigned i = 0; i < theTable->m_columns.size(); i++) { + NdbColumnImpl* c = theTable->m_columns[i]; + assert(c != NULL); + if (c->m_pk) { + unsigned len = c->m_attrSize * c->m_arraySize; + if (anOp->equal_impl(c, (const char*)&data[pos], len) == -1) { + setErrorCode(anOp); + return -1; + } + pos += (len + 3) / 4; + } + } + assert(pos == theKeyBuf.size / 4); + return 0; +} + +int +NdbBlob::setAccessKeyValue(NdbOperation* anOp) +{ + const Uint32* data = (const Uint32*)theAccessKeyBuf.data; + unsigned pos = 0; + DBG("setAccessKeyValue key0=" << data[0]); + for (unsigned i = 0; i < theAccessTable->m_columns.size(); i++) { + NdbColumnImpl* c = theAccessTable->m_columns[i]; + assert(c != NULL); + if (c->m_pk) { + unsigned len = c->m_attrSize * c->m_arraySize; + if (anOp->equal_impl(c, (const char*)&data[pos], len) == -1) { + setErrorCode(anOp); + return -1; + } + pos += (len + 3) / 4; + } + } + assert(pos == theAccessKeyBuf.size / 4); + return 0; +} + +int +NdbBlob::setPartKeyValue(NdbOperation* anOp, Uint32 part) +{ + DBG("setPartKeyValue dist=" << getDistKey(part) << " part=" << part << " key0=" << *(Uint32*)theKeyBuf.data); + if (anOp->equal((Uint32)0, getDistKey(part)) == -1 || + anOp->equal((Uint32)1, part) == -1 || + anOp->equal((Uint32)2, theKeyBuf.data) == -1) { + setErrorCode(anOp); + return -1; + } + return 0; +} + +int +NdbBlob::getHeadInlineValue(NdbOperation* anOp) +{ + DBG("getHeadInlineValue"); + theHeadInlineRecAttr = anOp->getValue(theColumn, theHeadInlineBuf.data); + if (theHeadInlineRecAttr == NULL) { + setErrorCode(anOp); + return -1; + } + return 0; +} + +void +NdbBlob::getHeadFromRecAttr() +{ + assert(theHeadInlineRecAttr != NULL); + theNullFlag = theHeadInlineRecAttr->isNULL(); + assert(theNullFlag != -1); + theLength = ! theNullFlag ? theHead->length : 0; + DBG("getHeadFromRecAttr out"); +} + +int +NdbBlob::setHeadInlineValue(NdbOperation* anOp) +{ + DBG("setHeadInlineValue"); + theHead->length = theLength; + if (theLength < theInlineSize) + memset(theInlineData + theLength, 0, theInlineSize - theLength); + assert(theNullFlag != -1); + const char* aValue = theNullFlag ? 0 : theHeadInlineBuf.data; + if (anOp->setValue(theColumn, aValue, theHeadInlineBuf.size) == -1) { + setErrorCode(anOp); + return -1; + } + theHeadInlineUpdateFlag = false; + return 0; +} + +// getValue/setValue + +int +NdbBlob::getValue(void* data, Uint32 bytes) +{ + DBG("getValue data=" << hex << data << " bytes=" << dec << bytes); + if (theGetFlag || theState != Prepared) { + setErrorCode(ErrState); + return -1; + } + if (! isReadOp() && ! isScanOp()) { + setErrorCode(ErrUsage); + return -1; + } + if (data == NULL && bytes != 0) { + setErrorCode(ErrUsage); + return -1; + } + theGetFlag = true; + theGetBuf = static_cast<char*>(data); + theGetSetBytes = bytes; + return 0; +} + +int +NdbBlob::setValue(const void* data, Uint32 bytes) +{ + DBG("setValue data=" << hex << data << " bytes=" << dec << bytes); + if (theSetFlag || theState != Prepared) { + setErrorCode(ErrState); + return -1; + } + if (! isInsertOp() && ! isUpdateOp()) { + setErrorCode(ErrUsage); + return -1; + } + if (data == NULL && bytes != 0) { + setErrorCode(ErrUsage); + return -1; + } + theSetFlag = true; + theSetBuf = static_cast<const char*>(data); + theGetSetBytes = bytes; + if (isInsertOp()) { + // write inline part now + if (theSetBuf != 0) { + unsigned n = theGetSetBytes; + if (n > theInlineSize) + n = theInlineSize; + if (writeDataPrivate(0, theSetBuf, n) == -1) + return -1; + } else { + theNullFlag = true; + theLength = 0; + } + if (setHeadInlineValue(theNdbOp) == -1) + return -1; + } + return 0; +} + +// misc operations + +int +NdbBlob::getNull(bool& isNull) +{ + if (theState == Prepared && theSetFlag) { + isNull = (theSetBuf == NULL); + return 0; + } + if (theNullFlag == -1) { + setErrorCode(ErrState); + return -1; + } + isNull = theNullFlag; + return 0; +} + +int +NdbBlob::setNull() +{ + DBG("setNull"); + if (theNullFlag == -1) { + if (theState == Prepared) { + return setValue(0, 0); + } + setErrorCode(ErrState); + return -1; + } + if (theNullFlag) + return 0; + if (deleteParts(0, getPartCount()) == -1) + return -1; + theNullFlag = true; + theLength = 0; + theHeadInlineUpdateFlag = true; + return 0; +} + +int +NdbBlob::getLength(Uint64& len) +{ + if (theState == Prepared && theSetFlag) { + len = theGetSetBytes; + return 0; + } + if (theNullFlag == -1) { + setErrorCode(ErrState); + return -1; + } + len = theLength; + return 0; +} + +int +NdbBlob::truncate(Uint64 length) +{ + DBG("truncate kength=" << length); + if (theNullFlag == -1) { + setErrorCode(ErrState); + return -1; + } + if (theLength > length) { + if (length >= theInlineSize) { + Uint32 part1 = getPartNumber(length); + Uint32 part2 = getPartNumber(theLength - 1); + assert(part2 >= part1); + if (deleteParts(part1, part2 - part1) == -1) + return -1; + } else { + if (deleteParts(0, getPartCount()) == -1) + return -1; + } + theLength = length; + theHeadInlineUpdateFlag = true; + } + return 0; +} + +int +NdbBlob::getPos(Uint64& pos) +{ + if (theNullFlag == -1) { + setErrorCode(ErrState); + return -1; + } + pos = thePos; + return 0; +} + +int +NdbBlob::setPos(Uint64 pos) +{ + if (theNullFlag == -1) { + setErrorCode(ErrState); + return -1; + } + if (pos > theLength) { + setErrorCode(ErrSeek); + return -1; + } + thePos = pos; + return 0; +} + +// read/write + +int +NdbBlob::readData(void* data, Uint32& bytes) +{ + if (readData(thePos, data, bytes) == -1) + return -1; + thePos += bytes; + assert(thePos <= theLength); + return 0; +} + +int +NdbBlob::readData(Uint64 pos, void* data, Uint32& bytes) +{ + if (theState != Active) { + setErrorCode(ErrState); + return -1; + } + char* buf = static_cast<char*>(data); + return readDataPrivate(pos, buf, bytes); +} + +int +NdbBlob::readDataPrivate(Uint64 pos, char* buf, Uint32& bytes) +{ + DBG("readData pos=" << pos << " bytes=" << bytes); + if (pos > theLength) { + setErrorCode(ErrSeek); + return -1; + } + if (bytes > theLength - pos) + bytes = theLength - pos; + Uint32 len = bytes; + if (len > 0) { + // inline part + if (pos < theInlineSize) { + Uint32 n = theInlineSize - pos; + if (n > len) + n = len; + memcpy(buf, theInlineData + pos, n); + pos += n; + buf += n; + len -= n; + } + } + if (len > 0) { + assert(pos >= theInlineSize); + Uint32 off = (pos - theInlineSize) % thePartSize; + // partial first block + if (off != 0) { + DBG("partial first block pos=" << pos << " len=" << len); + Uint32 part = (pos - theInlineSize) / thePartSize; + if (readParts(thePartBuf.data, part, 1) == -1) + return -1; + DBG("force execute"); + if (theNdbCon->executeNoBlobs(NoCommit) == -1) { + setErrorCode(theNdbOp); + return -1; + } + Uint32 n = thePartSize - off; + if (n > len) + n = len; + memcpy(buf, thePartBuf.data + off, n); + pos += n; + buf += n; + len -= n; + } + } + if (len > 0) { + assert((pos - theInlineSize) % thePartSize == 0); + // complete blocks in the middle + if (len >= thePartSize) { + Uint32 part = (pos - theInlineSize) / thePartSize; + Uint32 count = len / thePartSize; + if (readParts(buf, part, count) == -1) + return -1; + Uint32 n = thePartSize * count; + pos += n; + buf += n; + len -= n; + } + } + if (len > 0) { + // partial last block + DBG("partial last block pos=" << pos << " len=" << len); + assert((pos - theInlineSize) % thePartSize == 0 && len < thePartSize); + Uint32 part = (pos - theInlineSize) / thePartSize; + if (readParts(thePartBuf.data, part, 1) == -1) + return -1; + DBG("force execute"); + if (theNdbCon->executeNoBlobs(NoCommit) == -1) { + setErrorCode(theNdbOp); + return -1; + } + memcpy(buf, thePartBuf.data, len); + Uint32 n = len; + pos += n; + buf += n; + len -= n; + } + assert(len == 0); + return 0; +} + +int +NdbBlob::writeData(const void* data, Uint32 bytes) +{ + if (writeData(thePos, data, bytes) == -1) + return -1; + thePos += bytes; + assert(thePos <= theLength); + return 0; +} + +int +NdbBlob::writeData(Uint64 pos, const void* data, Uint32 bytes) +{ + if (theState != Active) { + setErrorCode(ErrState); + return -1; + } + const char* buf = static_cast<const char*>(data); + return writeDataPrivate(pos, buf, bytes); +} + +int +NdbBlob::writeDataPrivate(Uint64 pos, const char* buf, Uint32 bytes) +{ + DBG("writeData pos=" << pos << " bytes=" << bytes); + if (pos > theLength) { + setErrorCode(ErrSeek); + return -1; + } + Uint32 len = bytes; + // any write makes blob not NULL + if (theNullFlag) { + theNullFlag = false; + theHeadInlineUpdateFlag = true; + } + if (len > 0) { + // inline part + if (pos < theInlineSize) { + Uint32 n = theInlineSize - pos; + if (n > len) + n = len; + memcpy(theInlineData + pos, buf, n); + theHeadInlineUpdateFlag = true; + pos += n; + buf += n; + len -= n; + } + } + if (len > 0) { + assert(pos >= theInlineSize); + Uint32 off = (pos - theInlineSize) % thePartSize; + // partial first block + if (off != 0) { + DBG("partial first block pos=" << pos << " len=" << len); + if (theNewPartFlag) { + // must flush insert to guarantee read + DBG("force execute"); + if (theNdbCon->executeNoBlobs(NoCommit) == -1) { + setErrorCode(theNdbOp); + return -1; + } + theNewPartFlag = false; + } + Uint32 part = (pos - theInlineSize) / thePartSize; + if (readParts(thePartBuf.data, part, 1) == -1) + return -1; + DBG("force execute"); + if (theNdbCon->executeNoBlobs(NoCommit) == -1) { + setErrorCode(theNdbOp); + return -1; + } + Uint32 n = thePartSize - off; + if (n > len) { + memset(thePartBuf.data + off + len, theFillChar, n - len); + n = len; + } + memcpy(thePartBuf.data + off, buf, n); + if (updateParts(thePartBuf.data, part, 1) == -1) + return -1; + pos += n; + buf += n; + len -= n; + } + } + if (len > 0) { + assert((pos - theInlineSize) % thePartSize == 0); + // complete blocks in the middle + if (len >= thePartSize) { + Uint32 part = (pos - theInlineSize) / thePartSize; + Uint32 count = len / thePartSize; + for (unsigned i = 0; i < count; i++) { + if (part + i < getPartCount()) { + if (updateParts(buf, part + i, 1) == -1) + return -1; + } else { + if (insertParts(buf, part + i, 1) == -1) + return -1; + } + Uint32 n = thePartSize; + pos += n; + buf += n; + len -= n; + } + } + } + if (len > 0) { + // partial last block + DBG("partial last block pos=" << pos << " len=" << len); + assert((pos - theInlineSize) % thePartSize == 0 && len < thePartSize); + Uint32 part = (pos - theInlineSize) / thePartSize; + if (theLength > pos + len) { + if (theNewPartFlag) { + // must flush insert to guarantee read + DBG("force execute"); + if (theNdbCon->executeNoBlobs(NoCommit) == -1) { + setErrorCode(theNdbOp); + return -1; + } + theNewPartFlag = false; + } + if (readParts(thePartBuf.data, part, 1) == -1) + return -1; + DBG("force execute"); + if (theNdbCon->executeNoBlobs(NoCommit) == -1) { + setErrorCode(theNdbOp); + return -1; + } + memcpy(thePartBuf.data, buf, len); + if (updateParts(thePartBuf.data, part, 1) == -1) + return -1; + } else { + memcpy(thePartBuf.data, buf, len); + memset(thePartBuf.data + len, theFillChar, thePartSize - len); + if (part < getPartCount()) { + if (updateParts(thePartBuf.data, part, 1) == -1) + return -1; + } else { + if (insertParts(thePartBuf.data, part, 1) == -1) + return -1; + } + } + Uint32 n = len; + pos += n; + buf += n; + len -= n; + } + assert(len == 0); + if (theLength < pos) { + theLength = pos; + theHeadInlineUpdateFlag = true; + } + DBG("writeData out"); + return 0; +} + +int +NdbBlob::readParts(char* buf, Uint32 part, Uint32 count) +{ + DBG("readParts part=" << part << " count=" << count); + Uint32 n = 0; + while (n < count) { + NdbOperation* tOp = theNdbCon->getNdbOperation(theBlobTableName); + if (tOp == NULL || + tOp->readTuple() == -1 || + setPartKeyValue(tOp, part + n) == -1 || + tOp->getValue((Uint32)3, buf) == NULL) { + setErrorCode(tOp); + return -1; + } + buf += thePartSize; + n++; + } + return 0; +} + +int +NdbBlob::insertParts(const char* buf, Uint32 part, Uint32 count) +{ + DBG("insertParts part=" << part << " count=" << count); + Uint32 n = 0; + while (n < count) { + NdbOperation* tOp = theNdbCon->getNdbOperation(theBlobTableName); + if (tOp == NULL || + tOp->insertTuple() == -1 || + setPartKeyValue(tOp, part + n) == -1 || + tOp->setValue((Uint32)3, buf) == -1) { + setErrorCode(tOp); + return -1; + } + buf += thePartSize; + n++; + theNewPartFlag = true; + } + return 0; +} + +int +NdbBlob::updateParts(const char* buf, Uint32 part, Uint32 count) +{ + DBG("updateParts part=" << part << " count=" << count); + Uint32 n = 0; + while (n < count) { + NdbOperation* tOp = theNdbCon->getNdbOperation(theBlobTableName); + if (tOp == NULL || + tOp->updateTuple() == -1 || + setPartKeyValue(tOp, part + n) == -1 || + tOp->setValue((Uint32)3, buf) == -1) { + setErrorCode(tOp); + return -1; + } + buf += thePartSize; + n++; + theNewPartFlag = true; + } + return 0; +} + +int +NdbBlob::deleteParts(Uint32 part, Uint32 count) +{ + DBG("deleteParts part=" << part << " count=" << count); + Uint32 n = 0; + while (n < count) { + NdbOperation* tOp = theNdbCon->getNdbOperation(theBlobTableName); + if (tOp == NULL || + tOp->deleteTuple() == -1 || + setPartKeyValue(tOp, part + n) == -1) { + setErrorCode(tOp); + return -1; + } + n++; + } + return 0; +} + +// blob handle maintenance + +/* + * Prepare blob handle linked to an operation. Checks blob table. + * Allocates buffers. For key operation fetches key data from signal + * data. For read operation adds read of head+inline. + */ +int +NdbBlob::atPrepare(NdbConnection* aCon, NdbOperation* anOp, const NdbColumnImpl* aColumn) +{ + assert(theState == Idle); + // ndb api stuff + theNdb = anOp->theNdb; + theNdbCon = aCon; // for scan, this is the real transaction (m_transConnection) + theNdbOp = anOp; + theTable = anOp->m_currentTable; + theAccessTable = anOp->m_accessTable; + theColumn = aColumn; + DBG("atPrepare"); + NdbDictionary::Column::Type partType = NdbDictionary::Column::Undefined; + switch (theColumn->getType()) { + case NdbDictionary::Column::Blob: + partType = NdbDictionary::Column::Binary; + theFillChar = 0x0; + break; + case NdbDictionary::Column::Clob: + partType = NdbDictionary::Column::Char; + theFillChar = 0x20; + break; + default: + setErrorCode(ErrUsage); + return -1; + } + // sizes + theInlineSize = theColumn->getInlineSize(); + thePartSize = theColumn->getPartSize(); + theStripeSize = theColumn->getStripeSize(); + // blob table sanity check + assert((NDB_BLOB_HEAD_SIZE << 2) == sizeof(Head)); + assert(theColumn->m_attrSize * theColumn->m_arraySize == sizeof(Head) + theInlineSize); + getBlobTableName(theBlobTableName, theTable, theColumn); + const NdbDictionary::Table* bt; + const NdbDictionary::Column* bc; + if (theInlineSize >= (1 << 16) || + thePartSize == 0 || + thePartSize >= (1 << 16) || + theStripeSize == 0 || + (bt = theNdb->theDictionary->getTable(theBlobTableName)) == NULL || + (bc = bt->getColumn("DATA")) == NULL || + bc->getType() != partType || + bc->getLength() != (int)thePartSize) { + setErrorCode(ErrTable); + return -1; + } + // buffers + theKeyBuf.alloc(theTable->m_sizeOfKeysInWords << 2); + theAccessKeyBuf.alloc(theAccessTable->m_sizeOfKeysInWords << 2); + theHeadInlineBuf.alloc(sizeof(Head) + theInlineSize); + thePartBuf.alloc(thePartSize); + theHead = (Head*)theHeadInlineBuf.data; + theInlineData = theHeadInlineBuf.data + sizeof(Head); + // handle different operation types + bool supportedOp = false; + if (isKeyOp()) { + if (isTableOp()) { + // get table key + Uint32* data = (Uint32*)theKeyBuf.data; + unsigned size = theTable->m_sizeOfKeysInWords; + if (theNdbOp->getKeyFromTCREQ(data, size) == -1) { + setErrorCode(ErrUsage); + return -1; + } + } + if (isIndexOp()) { + // get index key + Uint32* data = (Uint32*)theAccessKeyBuf.data; + unsigned size = theAccessTable->m_sizeOfKeysInWords; + if (theNdbOp->getKeyFromTCREQ(data, size) == -1) { + setErrorCode(ErrUsage); + return -1; + } + } + if (isReadOp()) { + // add read of head+inline in this op + if (getHeadInlineValue(theNdbOp) == -1) + return -1; + } + if (isInsertOp()) { + // becomes NULL unless set before execute + theNullFlag = true; + theLength = 0; + } + supportedOp = true; + } + if (isScanOp()) { + // add read of head+inline in this op + if (getHeadInlineValue(theNdbOp) == -1) + return -1; + supportedOp = true; + } + if (! supportedOp) { + setErrorCode(ErrUsage); + return -1; + } + setState(Prepared); + DBG("atPrepare out"); + return 0; +} + +/* + * Before execute of prepared operation. May add new operations before + * this one. May ask that this operation and all before it (a "batch") + * is executed immediately in no-commit mode. + */ +int +NdbBlob::preExecute(ExecType anExecType, bool& batch) +{ + DBG("preExecute"); + if (theState == Invalid) + return -1; + assert(theState == Prepared); + // handle different operation types + assert(isKeyOp()); + if (isReadOp()) { + if (theGetFlag && theGetSetBytes > theInlineSize) { + // need blob head before proceeding + batch = true; + } + } + if (isInsertOp()) { + if (theSetFlag && theGetSetBytes > theInlineSize) { + // add ops to write rest of a setValue + assert(theSetBuf != 0); + Uint64 pos = theInlineSize; + const char* buf = theSetBuf + theInlineSize; + Uint32 bytes = theGetSetBytes - theInlineSize; + if (writeDataPrivate(pos, buf, bytes) == -1) + return -1; + if (anExecType == Commit && theHeadInlineUpdateFlag) { + // add an operation to update head+inline + NdbOperation* tOp = theNdbCon->getNdbOperation(theTable); + if (tOp == NULL || + tOp->updateTuple() == -1 || + setTableKeyValue(tOp) == -1 || + setHeadInlineValue(tOp) == -1) { + setErrorCode(ErrAbort); + return -1; + } + } + } + } + if (isTableOp()) { + if (isUpdateOp() || isDeleteOp()) { + // add operation before this one to read head+inline + NdbOperation* tOp = theNdbCon->getNdbOperation(theTable, theNdbOp); + if (tOp == NULL || + tOp->readTuple() == -1 || + setTableKeyValue(tOp) == -1 || + getHeadInlineValue(tOp) == -1) { + setErrorCode(tOp); + return -1; + } + // execute immediately + batch = true; + } + } + if (isIndexOp()) { + // add op before this one to read table key + NdbBlob* tFirstBlob = theNdbOp->theBlobList; + if (this == tFirstBlob) { + // first blob does it for all + if (g_ndb_blob_ok_to_read_index_table) { + Uint32 pkAttrId = theAccessTable->getNoOfColumns() - 1; + NdbOperation* tOp = theNdbCon->getNdbOperation(theAccessTable, theNdbOp); + if (tOp == NULL || + tOp->readTuple() == -1 || + setAccessKeyValue(tOp) == -1 || + tOp->getValue(pkAttrId, theKeyBuf.data) == NULL) { + setErrorCode(tOp); + return -1; + } + } else { + NdbOperation* tOp = theNdbCon->getNdbIndexOperation(theAccessTable->m_index, theTable, theNdbOp); + if (tOp == NULL || + tOp->readTuple() == -1 || + setAccessKeyValue(tOp) == -1 || + getTableKeyValue(tOp) == -1) { + setErrorCode(tOp); + return -1; + } + } + } + if (isUpdateOp() || isDeleteOp()) { + // add op before this one to read head+inline via index + NdbIndexOperation* tOp = theNdbCon->getNdbIndexOperation(theAccessTable->m_index, theTable, theNdbOp); + if (tOp == NULL || + tOp->readTuple() == -1 || + setAccessKeyValue(tOp) == -1 || + getHeadInlineValue(tOp) == -1) { + setErrorCode(tOp); + return -1; + } + // execute immediately + batch = true; + } + } + DBG("preExecute out batch=" << batch); + return 0; +} + +/* + * After execute, for any operation. If already Active, this routine + * has been done previously. Operations which requested a no-commit + * batch can add new operations after this one. They are added before + * any remaining prepared operations. + */ +int +NdbBlob::postExecute(ExecType anExecType) +{ + DBG("postExecute type=" << anExecType); + if (theState == Invalid) + return -1; + if (theState == Active) + return 0; + assert(theState == Prepared); + assert(isKeyOp()); + if (isIndexOp()) { + NdbBlob* tFirstBlob = theNdbOp->theBlobList; + if (this != tFirstBlob) { + // copy key from first blob + assert(theKeyBuf.size == tFirstBlob->theKeyBuf.size); + memcpy(theKeyBuf.data, tFirstBlob->theKeyBuf.data, tFirstBlob->theKeyBuf.size); + } + } + if (isReadOp()) { + getHeadFromRecAttr(); + if (theGetFlag && theGetSetBytes > 0) { + // copy inline bytes to user buffer + assert(theGetBuf != NULL); + unsigned n = theGetSetBytes; + if (n > theInlineSize) + n = theInlineSize; + memcpy(theGetBuf, theInlineData, n); + } + if (theGetFlag && theGetSetBytes > theInlineSize) { + // add ops to read rest of a getValue + assert(anExecType == NoCommit); + assert(theGetBuf != 0); + Uint64 pos = theInlineSize; + char* buf = theGetBuf + theInlineSize; + Uint32 bytes = theGetSetBytes - theInlineSize; + if (readDataPrivate(pos, buf, bytes) == -1) + return -1; + } + } + if (isUpdateOp()) { + assert(anExecType == NoCommit); + getHeadFromRecAttr(); + if (theSetFlag) { + // setValue overwrites everything + if (theSetBuf != 0) { + if (truncate(0) == -1) + return -1; + if (writeDataPrivate(0, theSetBuf, theGetSetBytes) == -1) + return -1; + } else { + if (setNull() == -1) + return -1; + } + } + } + if (isDeleteOp()) { + assert(anExecType == NoCommit); + getHeadFromRecAttr(); + if (deleteParts(0, getPartCount()) == -1) + return -1; + } + theNewPartFlag = false; + setState(anExecType == NoCommit ? Active : Closed); + DBG("postExecute out"); + return 0; +} + +/* + * Before commit of completed operation. For write add operation to + * update head+inline. + */ +int +NdbBlob::preCommit() +{ + DBG("preCommit"); + if (theState == Invalid) + return -1; + assert(theState == Active); + assert(isKeyOp()); + if (isInsertOp() || isUpdateOp()) { + if (theHeadInlineUpdateFlag) { + // add an operation to update head+inline + NdbOperation* tOp = theNdbCon->getNdbOperation(theTable); + if (tOp == NULL || + tOp->updateTuple() == -1 || + setTableKeyValue(tOp) == -1 || + setHeadInlineValue(tOp) == -1) { + setErrorCode(ErrAbort); + return -1; + } + } + } + DBG("preCommit out"); + return 0; +} + +/* + * After next scan result. Handle like read op above. + */ +int +NdbBlob::atNextResult() +{ + DBG("atNextResult"); + if (theState == Invalid) + return -1; + assert(isScanOp()); + getHeadFromRecAttr(); + // reset position + thePos = 0; + // get primary key + { Uint32* data = (Uint32*)theKeyBuf.data; + unsigned size = theTable->m_sizeOfKeysInWords; + if (theNdbOp->getKeyFromKEYINFO20(data, size) == -1) { + setErrorCode(ErrUsage); + return -1; + } + } + if (! theNullFlag) { + if (theGetFlag && theGetSetBytes > 0) { + // copy inline bytes to user buffer + assert(theGetBuf != NULL); + unsigned n = theGetSetBytes; + if (n > theLength) + n = theLength; + if (n > theInlineSize) + n = theInlineSize; + memcpy(theGetBuf, theInlineData, n); + } + if (theGetFlag && theGetSetBytes > theInlineSize && theLength > theInlineSize) { + // add ops to read rest of a getValue + assert(theGetBuf != 0); + Uint64 pos = theInlineSize; + char* buf = theGetBuf + theInlineSize; + Uint32 bytes = theGetSetBytes - theInlineSize; + if (readDataPrivate(pos, buf, bytes) == -1) + return -1; + // must also execute them + DBG("force execute"); + if (theNdbCon->executeNoBlobs(NoCommit) == -1) { + setErrorCode((NdbOperation*)0); + return -1; + } + } + } + setState(Active); + DBG("atNextResult out"); + return 0; +} + + +// misc + +const NdbDictionary::Column* +NdbBlob::getColumn() +{ + return theColumn; +} + +// errors + +void +NdbBlob::setErrorCode(int anErrorCode, bool invalidFlag) +{ + DBG("setErrorCode code=" << anErrorCode); + theError.code = anErrorCode; + if (invalidFlag) + setState(Invalid); +} + +void +NdbBlob::setErrorCode(NdbOperation* anOp, bool invalidFlag) +{ + int code = 0; + if (anOp != NULL && (code = anOp->theError.code) != 0) + ; + else if ((code = theNdbCon->theError.code) != 0) + ; + else if ((code = theNdb->theError.code) != 0) + ; + else + code = ErrUnknown; + setErrorCode(code, invalidFlag); +} + +void +NdbBlob::setErrorCode(NdbConnection* aCon, bool invalidFlag) +{ + int code = 0; + if (theNdbCon != NULL && (code = theNdbCon->theError.code) != 0) + ; + else if ((code = theNdb->theError.code) != 0) + ; + else + code = ErrUnknown; + setErrorCode(code, invalidFlag); +} + +#ifdef VM_TRACE +NdbOut& +operator<<(NdbOut& out, const NdbBlob& blob) +{ + ndbout << dec << "s=" << blob.theState; + ndbout << dec << " n=" << blob.theNullFlag;; + ndbout << dec << " l=" << blob.theLength; + ndbout << dec << " p=" << blob.thePos; + ndbout << dec << " u=" << blob.theHeadInlineUpdateFlag; + return out; +} +#endif diff --git a/ndb/src/ndbapi/NdbConnection.cpp b/ndb/src/ndbapi/NdbConnection.cpp index 4ec098c3c60..0febafbcd64 100644 --- a/ndb/src/ndbapi/NdbConnection.cpp +++ b/ndb/src/ndbapi/NdbConnection.cpp @@ -35,6 +35,7 @@ Adjust: 971022 UABMNST First version. #include "NdbApiSignal.hpp" #include "TransporterFacade.hpp" #include "API.hpp" +#include "NdbBlob.hpp" #include <ndb_limits.h> #include <signaldata/TcKeyConf.hpp> @@ -89,7 +90,8 @@ NdbConnection::NdbConnection( Ndb* aNdb ) : theCurrentScanRec(NULL), thePreviousScanRec(NULL), theScanningOp(NULL), - theBuddyConPtr(0xFFFFFFFF) + theBuddyConPtr(0xFFFFFFFF), + theBlobFlag(false) { theListState = NotInList; theError.code = 0; @@ -152,6 +154,8 @@ NdbConnection::init() m_theLastCursorOperation = NULL; m_firstExecutedCursorOp = 0; theBuddyConPtr = 0xFFFFFFFF; + // + theBlobFlag = false; }//NdbConnection::init() /***************************************************************************** @@ -251,6 +255,86 @@ NdbConnection::execute(ExecType aTypeOfExec, AbortOption abortOption, int forceSend) { + if (! theBlobFlag) + return executeNoBlobs(aTypeOfExec, abortOption, forceSend); + + // execute prepared ops in batches, as requested by blobs + + ExecType tExecType; + NdbOperation* tPrepOp; + + do { + tExecType = aTypeOfExec; + tPrepOp = theFirstOpInList; + while (tPrepOp != NULL) { + bool batch = false; + NdbBlob* tBlob = tPrepOp->theBlobList; + while (tBlob != NULL) { + if (tBlob->preExecute(tExecType, batch) == -1) + return -1; + tBlob = tBlob->theNext; + } + if (batch) { + // blob asked to execute all up to here now + tExecType = NoCommit; + break; + } + tPrepOp = tPrepOp->next(); + } + // save rest of prepared ops if batch + NdbOperation* tRestOp; + NdbOperation* tLastOp; + if (tPrepOp != NULL) { + tRestOp = tPrepOp->next(); + tPrepOp->next(NULL); + tLastOp = theLastOpInList; + theLastOpInList = tPrepOp; + } + if (tExecType == Commit) { + NdbOperation* tOp = theCompletedFirstOp; + while (tOp != NULL) { + NdbBlob* tBlob = tOp->theBlobList; + while (tBlob != NULL) { + if (tBlob->preCommit() == -1) + return -1; + tBlob = tBlob->theNext; + } + tOp = tOp->next(); + } + } + if (executeNoBlobs(tExecType, abortOption, forceSend) == -1) + return -1; + { + NdbOperation* tOp = theCompletedFirstOp; + while (tOp != NULL) { + NdbBlob* tBlob = tOp->theBlobList; + while (tBlob != NULL) { + // may add new operations if batch + if (tBlob->postExecute(tExecType) == -1) + return -1; + tBlob = tBlob->theNext; + } + tOp = tOp->next(); + } + } + // add saved prepared ops if batch + if (tPrepOp != NULL && tRestOp != NULL) { + if (theFirstOpInList == NULL) + theFirstOpInList = tRestOp; + else + theLastOpInList->next(tRestOp); + theLastOpInList = tLastOp; + } + } while (theFirstOpInList != NULL || tExecType != aTypeOfExec); + + return 0; +} + +int +NdbConnection::executeNoBlobs(ExecType aTypeOfExec, + AbortOption abortOption, + int forceSend) +{ //------------------------------------------------------------------------ // We will start by preparing all operations in the transaction defined // since last execute or since beginning. If this works ok we will continue @@ -330,7 +414,6 @@ NdbConnection::executeAsynchPrepare( ExecType aTypeOfExec, * Reset error.code on execute */ theError.code = 0; - NdbCursorOperation* tcOp = m_theFirstCursorOperation; if (tcOp != 0){ // Execute any cursor operations @@ -885,7 +968,7 @@ Remark: Get an operation from NdbOperation object idlelist and object, synchronous. *****************************************************************************/ NdbOperation* -NdbConnection::getNdbOperation(NdbTableImpl * tab) +NdbConnection::getNdbOperation(NdbTableImpl * tab, NdbOperation* aNextOp) { NdbOperation* tOp; @@ -897,14 +980,28 @@ NdbConnection::getNdbOperation(NdbTableImpl * tab) tOp = theNdb->getOperation(); if (tOp == NULL) goto getNdbOp_error1; - if (theLastOpInList != NULL) { - theLastOpInList->next(tOp); - theLastOpInList = tOp; + if (aNextOp == NULL) { + if (theLastOpInList != NULL) { + theLastOpInList->next(tOp); + theLastOpInList = tOp; + } else { + theLastOpInList = tOp; + theFirstOpInList = tOp; + }//if + tOp->next(NULL); } else { - theLastOpInList = tOp; - theFirstOpInList = tOp; - }//if - tOp->next(NULL); + // add before the given op + if (theFirstOpInList == aNextOp) { + theFirstOpInList = tOp; + } else { + NdbOperation* aLoopOp = theFirstOpInList; + while (aLoopOp != NULL && aLoopOp->next() != aNextOp) + aLoopOp = aLoopOp->next(); + assert(aLoopOp != NULL); + aLoopOp->next(tOp); + } + tOp->next(aNextOp); + } if (tOp->init(tab, this) != -1) { return tOp; } else { @@ -1068,21 +1165,36 @@ Remark: Get an operation from NdbIndexOperation object idlelist and get *****************************************************************************/ NdbIndexOperation* NdbConnection::getNdbIndexOperation(NdbIndexImpl * anIndex, - NdbTableImpl * aTable) + NdbTableImpl * aTable, + NdbOperation* aNextOp) { NdbIndexOperation* tOp; tOp = theNdb->getIndexOperation(); if (tOp == NULL) goto getNdbOp_error1; - if (theLastOpInList != NULL) { - theLastOpInList->next(tOp); - theLastOpInList = tOp; + if (aNextOp == NULL) { + if (theLastOpInList != NULL) { + theLastOpInList->next(tOp); + theLastOpInList = tOp; + } else { + theLastOpInList = tOp; + theFirstOpInList = tOp; + }//if + tOp->next(NULL); } else { - theLastOpInList = tOp; - theFirstOpInList = tOp; - }//if - tOp->next(NULL); + // add before the given op + if (theFirstOpInList == aNextOp) { + theFirstOpInList = tOp; + } else { + NdbOperation* aLoopOp = theFirstOpInList; + while (aLoopOp != NULL && aLoopOp->next() != aNextOp) + aLoopOp = aLoopOp->next(); + assert(aLoopOp != NULL); + aLoopOp->next(tOp); + } + tOp->next(aNextOp); + } if (tOp->indxInit(anIndex, aTable, this)!= -1) { return tOp; } else { diff --git a/ndb/src/ndbapi/NdbDictionary.cpp b/ndb/src/ndbapi/NdbDictionary.cpp index 89ed801bec5..2cd7d1633aa 100644 --- a/ndb/src/ndbapi/NdbDictionary.cpp +++ b/ndb/src/ndbapi/NdbDictionary.cpp @@ -268,6 +268,9 @@ NdbDictionary::Table::addColumn(const Column & c){ if(c.getPrimaryKey()){ m_impl.m_noOfKeys++; } + if (col->getBlobType()) { + m_impl.m_noOfBlobs++; + } m_impl.buildColumnHash(); } diff --git a/ndb/src/ndbapi/NdbDictionaryImpl.cpp b/ndb/src/ndbapi/NdbDictionaryImpl.cpp index 89c4fb19399..43d4bb66d72 100644 --- a/ndb/src/ndbapi/NdbDictionaryImpl.cpp +++ b/ndb/src/ndbapi/NdbDictionaryImpl.cpp @@ -35,6 +35,7 @@ #include <AttributeList.hpp> #include <NdbEventOperation.hpp> #include "NdbEventOperationImpl.hpp" +#include "NdbBlob.hpp" #define DEBUG_PRINT 0 #define INCOMPATIBLE_VERSION -2 @@ -179,7 +180,14 @@ NdbColumnImpl::equal(const NdbColumnImpl& col) const case NdbDictionary::Column::Double: case NdbDictionary::Column::Datetime: case NdbDictionary::Column::Timespec: + break; case NdbDictionary::Column::Blob: + case NdbDictionary::Column::Clob: + if (m_precision != col.m_precision || + m_scale != col.m_scale || + m_length != col.m_length) { + return false; + } break; } if (m_autoIncrement != col.m_autoIncrement){ @@ -224,6 +232,8 @@ NdbTableImpl::NdbTableImpl() : NdbDictionary::Table(* this), m_facade(this) { m_noOfKeys = 0; + m_sizeOfKeysInWords = 0; + m_noOfBlobs = 0; m_index = 0; init(); } @@ -258,6 +268,8 @@ NdbTableImpl::init(){ m_indexType = NdbDictionary::Index::Undefined; m_noOfKeys = 0; + m_sizeOfKeysInWords = 0; + m_noOfBlobs = 0; } bool @@ -337,6 +349,8 @@ NdbTableImpl::assign(const NdbTableImpl& org) m_index = org.m_index; m_noOfKeys = org.m_noOfKeys; + m_sizeOfKeysInWords = org.m_sizeOfKeysInWords; + m_noOfBlobs = org.m_noOfBlobs; m_version = org.m_version; m_status = org.m_status; @@ -1077,6 +1091,8 @@ columnTypeMapping[] = { { DictTabInfo::ExtVarbinary, NdbDictionary::Column::Varbinary }, { DictTabInfo::ExtDatetime, NdbDictionary::Column::Datetime }, { DictTabInfo::ExtTimespec, NdbDictionary::Column::Timespec }, + { DictTabInfo::ExtBlob, NdbDictionary::Column::Blob }, + { DictTabInfo::ExtClob, NdbDictionary::Column::Clob }, { -1, -1 } }; @@ -1131,6 +1147,7 @@ NdbDictInterface::parseTableInfo(NdbTableImpl ** ret, Uint32 keyInfoPos = 0; Uint32 keyCount = 0; + Uint32 blobCount; for(Uint32 i = 0; i < tableDesc.NoOfAttributes; i++) { DictTabInfo::Attribute attrDesc; attrDesc.init(); @@ -1187,6 +1204,8 @@ NdbDictInterface::parseTableInfo(NdbTableImpl ** ret, } else { col->m_keyInfoPos = 0; } + if (col->getBlobType()) + blobCount++; NdbColumnImpl * null = 0; impl->m_columns.fill(attrDesc.AttributeId, null); @@ -1199,6 +1218,8 @@ NdbDictInterface::parseTableInfo(NdbTableImpl ** ret, it.next(); } impl->m_noOfKeys = keyCount; + impl->m_sizeOfKeysInWords = keyInfoPos; + impl->m_noOfBlobs = blobCount; * ret = impl; return 0; } @@ -1206,6 +1227,43 @@ NdbDictInterface::parseTableInfo(NdbTableImpl ** ret, /***************************************************************** * Create table and alter table */ +int +NdbDictionaryImpl::createTable(NdbTableImpl &t) +{ + if (m_receiver.createTable(m_ndb, t) != 0) + return -1; + if (t.m_noOfBlobs == 0) + return 0; + // update table def from DICT + NdbTableImpl * tp = getTable(t.m_externalName.c_str()); + if (tp == NULL) { + m_error.code = 709; + return -1; + } + if (createBlobTables(* tp) != 0) { + int save_code = m_error.code; + (void)dropTable(t); + m_error.code = save_code; + return -1; + } + return 0; +} + +int +NdbDictionaryImpl::createBlobTables(NdbTableImpl &t) +{ + for (unsigned i = 0; i < t.m_columns.size(); i++) { + NdbColumnImpl & c = *t.m_columns[i]; + if (! c.getBlobType()) + continue; + NdbTableImpl bt; + NdbBlob::getBlobTable(bt, &t, &c); + if (createTable(bt) != 0) + return -1; + } + return 0; +} + int NdbDictInterface::createTable(Ndb & ndb, NdbTableImpl & impl) @@ -1540,6 +1598,12 @@ NdbDictionaryImpl::dropTable(NdbTableImpl & impl) if (dropIndex(element.name, name) == -1) return -1; } + + if (impl.m_noOfBlobs != 0) { + if (dropBlobTables(impl) != 0) + return -1; + } + int ret = m_receiver.dropTable(impl); if(ret == 0){ const char * internalTableName = impl.m_internalName.c_str(); @@ -1555,6 +1619,23 @@ NdbDictionaryImpl::dropTable(NdbTableImpl & impl) } int +NdbDictionaryImpl::dropBlobTables(NdbTableImpl & t) +{ + for (unsigned i = 0; i < t.m_columns.size(); i++) { + NdbColumnImpl & c = *t.m_columns[i]; + if (! c.getBlobType()) + continue; + char btname[NdbBlob::BlobTableNameSize]; + NdbBlob::getBlobTableName(btname, &t, &c); + if (dropTable(btname) != 0) { + if (m_error.code != 709) + return -1; + } + } + return 0; +} + +int NdbDictInterface::dropTable(const NdbTableImpl & impl) { NdbApiSignal tSignal(m_reference); diff --git a/ndb/src/ndbapi/NdbDictionaryImpl.hpp b/ndb/src/ndbapi/NdbDictionaryImpl.hpp index 3263a636a79..83661a137fb 100644 --- a/ndb/src/ndbapi/NdbDictionaryImpl.hpp +++ b/ndb/src/ndbapi/NdbDictionaryImpl.hpp @@ -81,6 +81,7 @@ public: Uint32 m_keyInfoPos; Uint32 m_extType; // used by restore (kernel type in versin v2x) bool getInterpretableType() const ; + bool getBlobType() const; /** * Equality/assign @@ -141,6 +142,8 @@ public: * Aggregates */ Uint32 m_noOfKeys; + unsigned short m_sizeOfKeysInWords; + unsigned short m_noOfBlobs; /** * Equality/assign @@ -352,13 +355,12 @@ public: bool setTransporter(class Ndb * ndb, class TransporterFacade * tf); bool setTransporter(class TransporterFacade * tf); - int createTable(NdbTableImpl &t) - { - return m_receiver.createTable(m_ndb, t); - } + int createTable(NdbTableImpl &t); + int createBlobTables(NdbTableImpl &); int alterTable(NdbTableImpl &t); int dropTable(const char * name); int dropTable(NdbTableImpl &); + int dropBlobTables(NdbTableImpl &); int invalidateObject(NdbTableImpl &); int removeCachedObject(NdbTableImpl &); @@ -432,6 +434,13 @@ NdbColumnImpl::getInterpretableType() const { } inline +bool +NdbColumnImpl::getBlobType() const { + return (m_type == NdbDictionary::Column::Blob || + m_type == NdbDictionary::Column::Clob); +} + +inline NdbTableImpl & NdbTableImpl::getImpl(NdbDictionary::Table & t){ return t.m_impl; diff --git a/ndb/src/ndbapi/NdbIndexOperation.cpp b/ndb/src/ndbapi/NdbIndexOperation.cpp index ee5491d72a8..d2e7d71b199 100644 --- a/ndb/src/ndbapi/NdbIndexOperation.cpp +++ b/ndb/src/ndbapi/NdbIndexOperation.cpp @@ -372,6 +372,17 @@ int NdbIndexOperation::equal_impl(const NdbColumnImpl* tAttrInfo, } else if ((tOpType == ReadRequest) || (tOpType == DeleteRequest) || (tOpType == ReadExclusive)) { theStatus = GetValue; + // create blob handles automatically + if (tOpType == DeleteRequest && m_currentTable->m_noOfBlobs != 0) { + for (unsigned i = 0; i < m_currentTable->m_columns.size(); i++) { + NdbColumnImpl* c = m_currentTable->m_columns[i]; + assert(c != 0); + if (c->getBlobType()) { + if (getBlobHandle(theNdbCon, c) == NULL) + return -1; + } + } + } return 0; } else if ((tOpType == InsertRequest) || (tOpType == WriteRequest)) { theStatus = SetValue; diff --git a/ndb/src/ndbapi/NdbOperation.cpp b/ndb/src/ndbapi/NdbOperation.cpp index ccbfa767542..d0d2839a6ab 100644 --- a/ndb/src/ndbapi/NdbOperation.cpp +++ b/ndb/src/ndbapi/NdbOperation.cpp @@ -31,6 +31,7 @@ #include "NdbApiSignal.hpp" #include "NdbRecAttr.hpp" #include "NdbUtil.hpp" +#include "NdbBlob.hpp" #include <signaldata/TcKeyReq.hpp> #include "NdbDictionaryImpl.hpp" @@ -103,7 +104,8 @@ NdbOperation::NdbOperation(Ndb* aNdb) : theFirstSCAN_TABINFO_Recv(NULL), theLastSCAN_TABINFO_Recv(NULL), theSCAN_TABCONF_Recv(NULL), - theBoundATTRINFO(NULL) + theBoundATTRINFO(NULL), + theBlobList(NULL) { theReceiver.init(NdbReceiver::NDB_OPERATION, this); theError.code = 0; @@ -197,6 +199,7 @@ NdbOperation::init(NdbTableImpl* tab, NdbConnection* myConnection){ theTotalNrOfKeyWordInSignal = 8; theMagicNumber = 0xABCDEF01; theBoundATTRINFO = NULL; + theBlobList = NULL; tSignal = theNdb->getSignal(); if (tSignal == NULL) @@ -236,6 +239,8 @@ NdbOperation::release() NdbCall* tSaveCall; NdbSubroutine* tSubroutine; NdbSubroutine* tSaveSubroutine; + NdbBlob* tBlob; + NdbBlob* tSaveBlob; if (theTCREQ != NULL) { @@ -308,6 +313,14 @@ NdbOperation::release() } theBoundATTRINFO = NULL; } + tBlob = theBlobList; + while (tBlob != NULL) + { + tSaveBlob = tBlob; + tBlob = tBlob->theNext; + theNdb->releaseNdbBlob(tSaveBlob); + } + theBlobList = NULL; releaseScan(); } @@ -356,6 +369,18 @@ NdbOperation::setValue( Uint32 anAttrId, return setValue(m_currentTable->getColumn(anAttrId), aValuePassed, len); } +NdbBlob* +NdbOperation::getBlobHandle(const char* anAttrName) +{ + return getBlobHandle(theNdbCon, m_currentTable->getColumn(anAttrName)); +} + +NdbBlob* +NdbOperation::getBlobHandle(Uint32 anAttrId) +{ + return getBlobHandle(theNdbCon, m_currentTable->getColumn(anAttrId)); +} + int NdbOperation::incValue(const char* anAttrName, Uint32 aValue) { @@ -428,4 +453,8 @@ NdbOperation::setBound(Uint32 anAttrId, int type, const void* aValue, Uint32 len return setBound(m_accessTable->getColumn(anAttrId), type, aValue, len); } - +const char* +NdbOperation::getTableName() const +{ + return m_currentTable->m_externalName.c_str(); +} diff --git a/ndb/src/ndbapi/NdbOperationDefine.cpp b/ndb/src/ndbapi/NdbOperationDefine.cpp index 18f8b79d12e..2ba86d93251 100644 --- a/ndb/src/ndbapi/NdbOperationDefine.cpp +++ b/ndb/src/ndbapi/NdbOperationDefine.cpp @@ -35,6 +35,7 @@ #include "NdbUtil.hpp" #include "NdbOut.hpp" #include "NdbImpl.hpp" +#include "NdbBlob.hpp" #include <Interpreter.hpp> @@ -604,6 +605,33 @@ NdbOperation::setValue( const NdbColumnImpl* tAttrInfo, return 0; }//NdbOperation::setValue() +NdbBlob* +NdbOperation::getBlobHandle(NdbConnection* aCon, const NdbColumnImpl* tAttrInfo) +{ + NdbBlob* tBlob = theBlobList; + NdbBlob* tLastBlob = NULL; + while (tBlob != NULL) { + if (tBlob->theColumn == tAttrInfo) + return tBlob; + tLastBlob = tBlob; + tBlob = tBlob->theNext; + } + tBlob = theNdb->getNdbBlob(); + if (tBlob == NULL) + return NULL; + if (tBlob->atPrepare(aCon, this, tAttrInfo) == -1) { + theNdb->releaseNdbBlob(tBlob); + return NULL; + } + if (tLastBlob == NULL) + theBlobList = tBlob; + else + tLastBlob->theNext = tBlob; + tBlob->theNext = NULL; + theNdbCon->theBlobFlag = true; + return tBlob; +} + /* * Define bound on index column in range scan. */ diff --git a/ndb/src/ndbapi/NdbOperationScan.cpp b/ndb/src/ndbapi/NdbOperationScan.cpp index df4f2421ec0..b068ef5d467 100644 --- a/ndb/src/ndbapi/NdbOperationScan.cpp +++ b/ndb/src/ndbapi/NdbOperationScan.cpp @@ -569,8 +569,35 @@ NdbOperation::takeOverScanOp(OperationType opType, NdbConnection* updateTrans) } } + // create blob handles automatically + if (opType == DeleteRequest && m_currentTable->m_noOfBlobs != 0) { + for (unsigned i = 0; i < m_currentTable->m_columns.size(); i++) { + NdbColumnImpl* c = m_currentTable->m_columns[i]; + assert(c != 0); + if (c->getBlobType()) { + if (newOp->getBlobHandle(updateTrans, c) == NULL) + return NULL; + } + } + } + return newOp; } - - +int +NdbOperation::getKeyFromKEYINFO20(Uint32* data, unsigned size) +{ + const NdbScanReceiver* tScanRec = theNdbCon->thePreviousScanRec; + NdbApiSignal* tSignal = tScanRec->theFirstKEYINFO20_Recv; + unsigned pos = 0; + unsigned n = 0; + while (pos < size) { + if (n == 20) { + tSignal = tSignal->next(); + n = 0; + } + const unsigned h = KeyInfo20::HeaderLength; + data[pos++] = tSignal->getDataPtrSend()[h + n++]; + } + return 0; +} diff --git a/ndb/src/ndbapi/NdbOperationSearch.cpp b/ndb/src/ndbapi/NdbOperationSearch.cpp index e1d5e823077..a96646f967f 100644 --- a/ndb/src/ndbapi/NdbOperationSearch.cpp +++ b/ndb/src/ndbapi/NdbOperationSearch.cpp @@ -252,6 +252,17 @@ NdbOperation::equal_impl(const NdbColumnImpl* tAttrInfo, } else if ((tOpType == ReadRequest) || (tOpType == DeleteRequest) || (tOpType == ReadExclusive)) { theStatus = GetValue; + // create blob handles automatically + if (tOpType == DeleteRequest && m_currentTable->m_noOfBlobs != 0) { + for (unsigned i = 0; i < m_currentTable->m_columns.size(); i++) { + NdbColumnImpl* c = m_currentTable->m_columns[i]; + assert(c != 0); + if (c->getBlobType()) { + if (getBlobHandle(theNdbCon, c) == NULL) + return -1; + } + } + } return 0; } else if ((tOpType == InsertRequest) || (tOpType == WriteRequest)) { theStatus = SetValue; @@ -498,3 +509,24 @@ LastWordLabel: return 0; } + +int +NdbOperation::getKeyFromTCREQ(Uint32* data, unsigned size) +{ + assert(m_accessTable != 0 && m_accessTable->m_sizeOfKeysInWords != 0); + assert(m_accessTable->m_sizeOfKeysInWords == size); + unsigned pos = 0; + while (pos < 8 && pos < size) { + data[pos++] = theKEYINFOptr[pos]; + } + NdbApiSignal* tSignal = theFirstKEYINFO; + unsigned n = 0; + while (pos < size) { + if (n == 20) { + tSignal = tSignal->next(); + n = 0; + } + data[pos++] = tSignal->getDataPtrSend()[3 + n++]; + } + return 0; +} diff --git a/ndb/src/ndbapi/NdbScanOperation.cpp b/ndb/src/ndbapi/NdbScanOperation.cpp index 4db0f30f56c..75b5c2103a9 100644 --- a/ndb/src/ndbapi/NdbScanOperation.cpp +++ b/ndb/src/ndbapi/NdbScanOperation.cpp @@ -34,6 +34,7 @@ #include "NdbApiSignal.hpp" #include <NdbOut.hpp> #include "NdbDictionaryImpl.hpp" +#include "NdbBlob.hpp" NdbScanOperation::NdbScanOperation(Ndb* aNdb) : NdbCursorOperation(aNdb), @@ -292,6 +293,18 @@ int NdbScanOperation::setValue(Uint32 anAttrId, double aValue) return 0; } +NdbBlob* +NdbScanOperation::getBlobHandle(const char* anAttrName) +{ + return NdbOperation::getBlobHandle(m_transConnection, m_currentTable->getColumn(anAttrName)); +} + +NdbBlob* +NdbScanOperation::getBlobHandle(Uint32 anAttrId) +{ + return NdbOperation::getBlobHandle(m_transConnection, m_currentTable->getColumn(anAttrId)); +} + // Private methods int NdbScanOperation::executeCursor(int ProcessorId) @@ -342,6 +355,15 @@ int NdbScanOperation::nextResult(bool fetchAllowed) const NdbError err = theNdbCon->getNdbError(); m_transConnection->setOperationErrorCode(err.code); } + if (result == 0) { + // handle blobs + NdbBlob* tBlob = theBlobList; + while (tBlob != NULL) { + if (tBlob->atNextResult() == -1) + return -1; + tBlob = tBlob->theNext; + } + } return result; } diff --git a/ndb/src/ndbapi/Ndberr.cpp b/ndb/src/ndbapi/Ndberr.cpp index faa2f00cfce..11df77ae614 100644 --- a/ndb/src/ndbapi/Ndberr.cpp +++ b/ndb/src/ndbapi/Ndberr.cpp @@ -21,6 +21,7 @@ #include <NdbSchemaCon.hpp> #include <NdbOperation.hpp> #include <NdbConnection.hpp> +#include <NdbBlob.hpp> static void @@ -73,3 +74,10 @@ NdbSchemaCon::getNdbError() const { update(theError); return theError; } + +const +NdbError & +NdbBlob::getNdbError() const { + update(theError); + return theError; +} diff --git a/ndb/src/ndbapi/Ndbinit.cpp b/ndb/src/ndbapi/Ndbinit.cpp index be7acc48d7a..55d0d6b9c26 100644 --- a/ndb/src/ndbapi/Ndbinit.cpp +++ b/ndb/src/ndbapi/Ndbinit.cpp @@ -83,6 +83,7 @@ Ndb::Ndb( const char* aDataBase , const char* aDataBaseSchema) : theSubroutineList(NULL), theCallList(NULL), theScanList(NULL), + theNdbBlobIdleList(NULL), theNoOfDBnodes(0), theDBnodes(NULL), the_release_ind(NULL), @@ -231,6 +232,8 @@ Ndb::~Ndb() freeNdbCall(); while (theScanList != NULL) freeNdbScanRec(); + while (theNdbBlobIdleList != NULL) + freeNdbBlob(); releaseTransactionArrays(); startTransactionNodeSelectionData.release(); diff --git a/ndb/src/ndbapi/Ndblist.cpp b/ndb/src/ndbapi/Ndblist.cpp index 3839cc3291b..a0139eb1e4a 100644 --- a/ndb/src/ndbapi/Ndblist.cpp +++ b/ndb/src/ndbapi/Ndblist.cpp @@ -27,6 +27,7 @@ #include "NdbScanReceiver.hpp" #include "NdbUtil.hpp" #include "API.hpp" +#include "NdbBlob.hpp" void Ndb::checkFailedNode() @@ -435,6 +436,19 @@ Ndb::getSignal() return tSignal; } +NdbBlob* +Ndb::getNdbBlob() +{ + NdbBlob* tBlob = theNdbBlobIdleList; + if (tBlob != NULL) { + theNdbBlobIdleList = tBlob->theNext; + tBlob->init(); + } else { + tBlob = new NdbBlob; + } + return tBlob; +} + /*************************************************************************** void releaseNdbBranch(NdbBranch* aNdbBranch); @@ -601,6 +615,14 @@ Ndb::releaseSignalsInList(NdbApiSignal** pList){ } } +void +Ndb::releaseNdbBlob(NdbBlob* aBlob) +{ + aBlob->release(); + aBlob->theNext = theNdbBlobIdleList; + theNdbBlobIdleList = aBlob; +} + /*************************************************************************** void freeOperation(); @@ -745,6 +767,14 @@ Ndb::freeSignal() cfreeSignals++; } +void +Ndb::freeNdbBlob() +{ + NdbBlob* tBlob = theNdbBlobIdleList; + theNdbBlobIdleList = tBlob->theNext; + delete tBlob; +} + /**************************************************************************** int releaseConnectToNdb(NdbConnection* aConnectConnection); diff --git a/ndb/src/ndbapi/ndberror.c b/ndb/src/ndbapi/ndberror.c index ea7cf4de426..760322d669d 100644 --- a/ndb/src/ndbapi/ndberror.c +++ b/ndb/src/ndbapi/ndberror.c @@ -418,8 +418,14 @@ ErrorBundle ErrorCodes[] = { { 4259, AE, "Invalid set of range scan bounds" }, { 4260, UD, "NdbScanFilter: Operator is not defined in NdbScanFilter::Group"}, { 4261, UD, "NdbScanFilter: Column is NULL"}, - { 4262, UD, "NdbScanFilter: Condition is out of bounds"} - + { 4262, UD, "NdbScanFilter: Condition is out of bounds"}, + { 4263, IE, "Invalid blob attributes or invalid blob parts table" }, + { 4264, AE, "Invalid usage of blob attribute" }, + { 4265, AE, "Method is not valid in current blob state" }, + { 4266, AE, "Invalid blob seek position" }, + { 4267, IE, "Corrupted blob value" }, + { 4268, IE, "Error in blob head update forced rollback of transaction" }, + { 4268, IE, "Unknown blob error" } }; static |