diff options
Diffstat (limited to 'ndb/src/ndbapi')
-rw-r--r-- | ndb/src/ndbapi/Ndb.cpp | 7 | ||||
-rw-r--r-- | ndb/src/ndbapi/NdbBlob.cpp | 404 | ||||
-rw-r--r-- | ndb/src/ndbapi/NdbConnection.cpp | 20 | ||||
-rw-r--r-- | ndb/src/ndbapi/NdbDictionaryImpl.cpp | 7 | ||||
-rw-r--r-- | ndb/src/ndbapi/NdbIndexOperation.cpp | 26 | ||||
-rw-r--r-- | ndb/src/ndbapi/NdbOperation.cpp | 4 | ||||
-rw-r--r-- | ndb/src/ndbapi/NdbOperationExec.cpp | 14 | ||||
-rw-r--r-- | ndb/src/ndbapi/NdbScanOperation.cpp | 4 | ||||
-rw-r--r-- | ndb/src/ndbapi/TransporterFacade.cpp | 159 | ||||
-rw-r--r-- | ndb/src/ndbapi/TransporterFacade.hpp | 10 | ||||
-rw-r--r-- | ndb/src/ndbapi/ndberror.c | 2 |
11 files changed, 460 insertions, 197 deletions
diff --git a/ndb/src/ndbapi/Ndb.cpp b/ndb/src/ndbapi/Ndb.cpp index d7b8a695fe2..75ae539fc8b 100644 --- a/ndb/src/ndbapi/Ndb.cpp +++ b/ndb/src/ndbapi/Ndb.cpp @@ -1386,6 +1386,7 @@ Ndb::printState(const char* fmt, ...) va_end(ap); NdbMutex_Lock(ndb_print_state_mutex); bool dups = false; + unsigned i; ndbout << buf << " ndb=" << hex << this << dec; #ifndef NDB_WIN32 ndbout << " thread=" << (int)pthread_self(); @@ -1406,21 +1407,21 @@ Ndb::printState(const char* fmt, ...) ndbout << "!! DUPS !!" << endl; dups = true; } - for (unsigned i = 0; i < theNoOfPreparedTransactions; i++) + for (i = 0; i < theNoOfPreparedTransactions; i++) thePreparedTransactionsArray[i]->printState(); ndbout << "sent: " << theNoOfSentTransactions<< endl; if (checkdups(theSentTransactionsArray, theNoOfSentTransactions)) { ndbout << "!! DUPS !!" << endl; dups = true; } - for (unsigned i = 0; i < theNoOfSentTransactions; i++) + for (i = 0; i < theNoOfSentTransactions; i++) theSentTransactionsArray[i]->printState(); ndbout << "completed: " << theNoOfCompletedTransactions<< endl; if (checkdups(theCompletedTransactionsArray, theNoOfCompletedTransactions)) { ndbout << "!! DUPS !!" << endl; dups = true; } - for (unsigned i = 0; i < theNoOfCompletedTransactions; i++) + for (i = 0; i < theNoOfCompletedTransactions; i++) theCompletedTransactionsArray[i]->printState(); NdbMutex_Unlock(ndb_print_state_mutex); } diff --git a/ndb/src/ndbapi/NdbBlob.cpp b/ndb/src/ndbapi/NdbBlob.cpp index feab95d8ca5..53c0a0e07f9 100644 --- a/ndb/src/ndbapi/NdbBlob.cpp +++ b/ndb/src/ndbapi/NdbBlob.cpp @@ -33,21 +33,24 @@ ndbout << prefix << " " << hex << (void*)this << " " << cname; \ ndbout << " " << dec << __LINE__ << " " << x << " " << *this << endl; \ } while (0) -#else -#define DBG(x) -#endif static char* ndb_blob_debug(const Uint32* data, unsigned size) { - static char buf[128 + 1]; // MT irrelevant + static char buf[200]; // MT irrelevant buf[0] = 0; - for (unsigned i = 0; i < size && i < 128 / 4; i++) { - sprintf(buf + strlen(buf), "%*s%08x", i != 0, "", data[i]); + for (unsigned i = 0; i < size; i++) { + unsigned n = strlen(buf); + if (n + 10 < sizeof(buf)) + sprintf(buf + n, "%*s%08x", i != 0, "", data[i]); } return buf; } +#else +#define DBG(x) +#endif + /* * Reading index table directly (as a table) is faster but there are * bugs or limitations. Keep the code and make possible to choose. @@ -94,22 +97,24 @@ NdbBlob::getBlobTable(NdbTableImpl& bt, const NdbTableImpl* t, const NdbColumnIm bt.setName(btname); bt.setLogging(t->getLogging()); bt.setFragmentType(t->getFragmentType()); - { NdbDictionary::Column bc("DIST"); + { NdbDictionary::Column bc("PK"); bc.setType(NdbDictionary::Column::Unsigned); + assert(t->m_sizeOfKeysInWords != 0); + bc.setLength(t->m_sizeOfKeysInWords); bc.setPrimaryKey(true); bc.setDistributionKey(true); bt.addColumn(bc); } - { NdbDictionary::Column bc("PART"); + { NdbDictionary::Column bc("DIST"); bc.setType(NdbDictionary::Column::Unsigned); bc.setPrimaryKey(true); + bc.setDistributionKey(true); bt.addColumn(bc); } - { NdbDictionary::Column bc("PK"); + { NdbDictionary::Column bc("PART"); bc.setType(NdbDictionary::Column::Unsigned); - assert(t->m_sizeOfKeysInWords != 0); - bc.setLength(t->m_sizeOfKeysInWords); bc.setPrimaryKey(true); + bc.setDistributionKey(false); bt.addColumn(bc); } { NdbDictionary::Column bc("DATA"); @@ -162,6 +167,7 @@ NdbBlob::init() theHead = NULL; theInlineData = NULL; theHeadInlineRecAttr = NULL; + theHeadInlineReadOp = NULL; theHeadInlineUpdateFlag = false; theNullFlag = -1; theLength = 0; @@ -206,6 +212,13 @@ NdbBlob::Buf::alloc(unsigned n) #endif } +void +NdbBlob::Buf::copyfrom(const NdbBlob::Buf& src) +{ + assert(size == src.size); + memcpy(data, src.data, size); +} + // classify operations (inline) inline bool @@ -226,6 +239,7 @@ NdbBlob::isKeyOp() return theNdbOp->theOperationType == NdbOperation::InsertRequest || theNdbOp->theOperationType == NdbOperation::UpdateRequest || + theNdbOp->theOperationType == NdbOperation::WriteRequest || theNdbOp->theOperationType == NdbOperation::ReadRequest || theNdbOp->theOperationType == NdbOperation::ReadExclusive || theNdbOp->theOperationType == NdbOperation::DeleteRequest; @@ -254,6 +268,13 @@ NdbBlob::isUpdateOp() } inline bool +NdbBlob::isWriteOp() +{ + return + theNdbOp->theOperationType == NdbOperation::WriteRequest; +} + +inline bool NdbBlob::isDeleteOp() { return @@ -373,9 +394,10 @@ NdbBlob::setPartKeyValue(NdbOperation* anOp, Uint32 part) Uint32* data = (Uint32*)theKeyBuf.data; unsigned size = theTable->m_sizeOfKeysInWords; DBG("setPartKeyValue dist=" << getDistKey(part) << " part=" << part << " key=" << ndb_blob_debug(data, size)); - if (anOp->equal((Uint32)0, getDistKey(part)) == -1 || - anOp->equal((Uint32)1, part) == -1 || - anOp->equal((Uint32)2, theKeyBuf.data) == -1) { + // TODO use attr ids after compatibility with 4.1.7 not needed + if (anOp->equal("PK", theKeyBuf.data) == -1 || + anOp->equal("DIST", getDistKey(part)) == -1 || + anOp->equal("PART", part) == -1) { setErrorCode(anOp); return -1; } @@ -401,7 +423,7 @@ NdbBlob::getHeadFromRecAttr() theNullFlag = theHeadInlineRecAttr->isNULL(); assert(theNullFlag != -1); theLength = ! theNullFlag ? theHead->length : 0; - DBG("getHeadFromRecAttr out"); + DBG("getHeadFromRecAttr [out]"); } int @@ -453,7 +475,7 @@ NdbBlob::setValue(const void* data, Uint32 bytes) setErrorCode(ErrState); return -1; } - if (! isInsertOp() && ! isUpdateOp()) { + if (! isInsertOp() && ! isUpdateOp() && ! isWriteOp()) { setErrorCode(ErrUsage); return -1; } @@ -466,11 +488,12 @@ NdbBlob::setValue(const void* data, Uint32 bytes) theGetSetBytes = bytes; if (isInsertOp()) { // write inline part now - if (theSetBuf != 0) { - unsigned n = theGetSetBytes; + if (theSetBuf != NULL) { + Uint32 n = theGetSetBytes; if (n > theInlineSize) n = theInlineSize; - if (writeDataPrivate(0, theSetBuf, n) == -1) + assert(thePos == 0); + if (writeDataPrivate(theSetBuf, n) == -1) return -1; } else { theNullFlag = true; @@ -555,7 +578,7 @@ NdbBlob::getLength(Uint64& len) int NdbBlob::truncate(Uint64 length) { - DBG("truncate length=" << length); + DBG("truncate [in] length=" << length); if (theNullFlag == -1) { setErrorCode(ErrState); return -1; @@ -573,7 +596,10 @@ NdbBlob::truncate(Uint64 length) } theLength = length; theHeadInlineUpdateFlag = true; + if (thePos > length) + thePos = length; } + DBG("truncate [out]"); return 0; } @@ -610,32 +636,20 @@ NdbBlob::setPos(Uint64 pos) int NdbBlob::readData(void* data, Uint32& bytes) { - if (readData(thePos, data, bytes) == -1) - return -1; - thePos += bytes; - assert(thePos <= theLength); - return 0; -} - -int -NdbBlob::readData(Uint64 pos, void* data, Uint32& bytes) -{ if (theState != Active) { setErrorCode(ErrState); return -1; } char* buf = static_cast<char*>(data); - return readDataPrivate(pos, buf, bytes); + return readDataPrivate(buf, bytes); } int -NdbBlob::readDataPrivate(Uint64 pos, char* buf, Uint32& bytes) +NdbBlob::readDataPrivate(char* buf, Uint32& bytes) { - DBG("readData pos=" << pos << " bytes=" << bytes); - if (pos > theLength) { - setErrorCode(ErrSeek); - return -1; - } + DBG("readData [in] bytes=" << bytes); + assert(thePos <= theLength); + Uint64 pos = thePos; if (bytes > theLength - pos) bytes = theLength - pos; Uint32 len = bytes; @@ -665,7 +679,6 @@ NdbBlob::readDataPrivate(Uint64 pos, char* buf, Uint32& bytes) if (readParts(thePartBuf.data, part, 1) == -1) return -1; // need result now - DBG("execute pending part reads"); if (executePendingBlobReads() == -1) return -1; Uint32 n = thePartSize - off; @@ -699,7 +712,6 @@ NdbBlob::readDataPrivate(Uint64 pos, char* buf, Uint32& bytes) if (readParts(thePartBuf.data, part, 1) == -1) return -1; // need result now - DBG("execute pending part reads"); if (executePendingBlobReads() == -1) return -1; memcpy(buf, thePartBuf.data, len); @@ -709,38 +721,29 @@ NdbBlob::readDataPrivate(Uint64 pos, char* buf, Uint32& bytes) len -= n; } assert(len == 0); - return 0; -} - -int -NdbBlob::writeData(const void* data, Uint32 bytes) -{ - if (writeData(thePos, data, bytes) == -1) - return -1; - thePos += bytes; + thePos = pos; assert(thePos <= theLength); + DBG("readData [out]"); return 0; } int -NdbBlob::writeData(Uint64 pos, const void* data, Uint32 bytes) +NdbBlob::writeData(const void* data, Uint32 bytes) { if (theState != Active) { setErrorCode(ErrState); return -1; } const char* buf = static_cast<const char*>(data); - return writeDataPrivate(pos, buf, bytes); + return writeDataPrivate(buf, bytes); } int -NdbBlob::writeDataPrivate(Uint64 pos, const char* buf, Uint32 bytes) +NdbBlob::writeDataPrivate(const char* buf, Uint32 bytes) { - DBG("writeData pos=" << pos << " bytes=" << bytes); - if (pos > theLength) { - setErrorCode(ErrSeek); - return -1; - } + DBG("writeData [in] bytes=" << bytes); + assert(thePos <= theLength); + Uint64 pos = thePos; Uint32 len = bytes; // any write makes blob not NULL if (theNullFlag) { @@ -771,14 +774,12 @@ NdbBlob::writeDataPrivate(Uint64 pos, const char* buf, Uint32 bytes) if (off != 0) { DBG("partial first block pos=" << pos << " len=" << len); // flush writes to guarantee correct read - DBG("execute pending part writes"); if (executePendingBlobWrites() == -1) return -1; Uint32 part = (pos - theInlineSize) / thePartSize; if (readParts(thePartBuf.data, part, 1) == -1) return -1; // need result now - DBG("execute pending part reafs"); if (executePendingBlobReads() == -1) return -1; Uint32 n = thePartSize - off; @@ -822,13 +823,11 @@ NdbBlob::writeDataPrivate(Uint64 pos, const char* buf, Uint32 bytes) Uint32 part = (pos - theInlineSize) / thePartSize; if (theLength > pos + len) { // flush writes to guarantee correct read - DBG("execute pending part writes"); if (executePendingBlobWrites() == -1) return -1; if (readParts(thePartBuf.data, part, 1) == -1) return -1; // need result now - DBG("execute pending part reads"); if (executePendingBlobReads() == -1) return -1; memcpy(thePartBuf.data, buf, len); @@ -855,14 +854,16 @@ NdbBlob::writeDataPrivate(Uint64 pos, const char* buf, Uint32 bytes) theLength = pos; theHeadInlineUpdateFlag = true; } - DBG("writeData out"); + thePos = pos; + assert(thePos <= theLength); + DBG("writeData [out]"); return 0; } int NdbBlob::readParts(char* buf, Uint32 part, Uint32 count) { - DBG("readParts part=" << part << " count=" << count); + DBG("readParts [in] part=" << part << " count=" << count); Uint32 n = 0; while (n < count) { NdbOperation* tOp = theNdbCon->getNdbOperation(theBlobTable); @@ -873,6 +874,7 @@ NdbBlob::readParts(char* buf, Uint32 part, Uint32 count) setErrorCode(tOp); return -1; } + tOp->m_abortOption = AbortOnError; buf += thePartSize; n++; thePendingBlobOps |= (1 << NdbOperation::ReadRequest); @@ -884,7 +886,7 @@ NdbBlob::readParts(char* buf, Uint32 part, Uint32 count) int NdbBlob::insertParts(const char* buf, Uint32 part, Uint32 count) { - DBG("insertParts part=" << part << " count=" << count); + DBG("insertParts [in] part=" << part << " count=" << count); Uint32 n = 0; while (n < count) { NdbOperation* tOp = theNdbCon->getNdbOperation(theBlobTable); @@ -895,6 +897,7 @@ NdbBlob::insertParts(const char* buf, Uint32 part, Uint32 count) setErrorCode(tOp); return -1; } + tOp->m_abortOption = AbortOnError; buf += thePartSize; n++; thePendingBlobOps |= (1 << NdbOperation::InsertRequest); @@ -906,7 +909,7 @@ NdbBlob::insertParts(const char* buf, Uint32 part, Uint32 count) int NdbBlob::updateParts(const char* buf, Uint32 part, Uint32 count) { - DBG("updateParts part=" << part << " count=" << count); + DBG("updateParts [in] part=" << part << " count=" << count); Uint32 n = 0; while (n < count) { NdbOperation* tOp = theNdbCon->getNdbOperation(theBlobTable); @@ -917,6 +920,7 @@ NdbBlob::updateParts(const char* buf, Uint32 part, Uint32 count) setErrorCode(tOp); return -1; } + tOp->m_abortOption = AbortOnError; buf += thePartSize; n++; thePendingBlobOps |= (1 << NdbOperation::UpdateRequest); @@ -928,7 +932,7 @@ NdbBlob::updateParts(const char* buf, Uint32 part, Uint32 count) int NdbBlob::deleteParts(Uint32 part, Uint32 count) { - DBG("deleteParts part=" << part << " count=" << count); + DBG("deleteParts [in] part=" << part << " count=" << count); Uint32 n = 0; while (n < count) { NdbOperation* tOp = theNdbCon->getNdbOperation(theBlobTable); @@ -938,6 +942,7 @@ NdbBlob::deleteParts(Uint32 part, Uint32 count) setErrorCode(tOp); return -1; } + tOp->m_abortOption = AbortOnError; n++; thePendingBlobOps |= (1 << NdbOperation::DeleteRequest); theNdbCon->thePendingBlobOps |= (1 << NdbOperation::DeleteRequest); @@ -945,6 +950,59 @@ NdbBlob::deleteParts(Uint32 part, Uint32 count) return 0; } +/* + * Number of blob parts not known. Used to check for race condition + * when writeTuple is used for insert. Deletes all parts found. + */ +int +NdbBlob::deletePartsUnknown(Uint32 part) +{ + DBG("deletePartsUnknown [in] part=" << part << " count=all"); + static const unsigned maxbat = 256; + static const unsigned minbat = 1; + unsigned bat = minbat; + NdbOperation* tOpList[maxbat]; + Uint32 count = 0; + while (true) { + Uint32 n; + n = 0; + while (n < bat) { + NdbOperation*& tOp = tOpList[n]; // ref + tOp = theNdbCon->getNdbOperation(theBlobTable); + if (tOp == NULL || + tOp->deleteTuple() == -1 || + setPartKeyValue(tOp, part + count + n) == -1) { + setErrorCode(tOp); + return -1; + } + tOp->m_abortOption = IgnoreError; + n++; + } + DBG("deletePartsUnknown: executeNoBlobs [in] bat=" << bat); + if (theNdbCon->executeNoBlobs(NoCommit) == -1) + return -1; + DBG("deletePartsUnknown: executeNoBlobs [out]"); + n = 0; + while (n < bat) { + NdbOperation* tOp = tOpList[n]; + if (tOp->theError.code != 0) { + if (tOp->theError.code != 626) { + setErrorCode(tOp); + return -1; + } + // first non-existent part + DBG("deletePartsUnknown [out] count=" << count); + return 0; + } + n++; + count++; + } + bat *= 4; + if (bat > maxbat) + bat = maxbat; + } +} + // pending ops int @@ -952,8 +1010,10 @@ NdbBlob::executePendingBlobReads() { Uint8 flags = (1 << NdbOperation::ReadRequest); if (thePendingBlobOps & flags) { + DBG("executePendingBlobReads: executeNoBlobs [in]"); if (theNdbCon->executeNoBlobs(NoCommit) == -1) return -1; + DBG("executePendingBlobReads: executeNoBlobs [out]"); thePendingBlobOps = 0; theNdbCon->thePendingBlobOps = 0; } @@ -965,8 +1025,10 @@ NdbBlob::executePendingBlobWrites() { Uint8 flags = 0xFF & ~(1 << NdbOperation::ReadRequest); if (thePendingBlobOps & flags) { + DBG("executePendingBlobWrites: executeNoBlobs [in]"); if (theNdbCon->executeNoBlobs(NoCommit) == -1) return -1; + DBG("executePendingBlobWrites: executeNoBlobs [out]"); thePendingBlobOps = 0; theNdbCon->thePendingBlobOps = 0; } @@ -978,10 +1040,10 @@ NdbBlob::executePendingBlobWrites() int NdbBlob::invokeActiveHook() { - DBG("invokeActiveHook"); + DBG("invokeActiveHook [in]"); assert(theState == Active && theActiveHook != NULL); int ret = (*theActiveHook)(this, theActiveHookArg); - DBG("invokeActiveHook ret=" << ret); + DBG("invokeActiveHook [out] ret=" << ret); if (ret != 0) { // no error is set on blob level return -1; @@ -1007,7 +1069,7 @@ NdbBlob::atPrepare(NdbConnection* aCon, NdbOperation* anOp, const NdbColumnImpl* theTable = anOp->m_currentTable; theAccessTable = anOp->m_accessTable; theColumn = aColumn; - DBG("atPrepare"); + DBG("atPrepare [in]"); NdbDictionary::Column::Type partType = NdbDictionary::Column::Undefined; switch (theColumn->getType()) { case NdbDictionary::Column::Blob: @@ -1046,6 +1108,7 @@ NdbBlob::atPrepare(NdbConnection* aCon, NdbOperation* anOp, const NdbColumnImpl* theKeyBuf.alloc(theTable->m_sizeOfKeysInWords << 2); theAccessKeyBuf.alloc(theAccessTable->m_sizeOfKeysInWords << 2); theHeadInlineBuf.alloc(sizeof(Head) + theInlineSize); + theHeadInlineCopyBuf.alloc(sizeof(Head) + theInlineSize); thePartBuf.alloc(thePartSize); theHead = (Head*)theHeadInlineBuf.data; theInlineData = theHeadInlineBuf.data + sizeof(Head); @@ -1080,6 +1143,12 @@ NdbBlob::atPrepare(NdbConnection* aCon, NdbOperation* anOp, const NdbColumnImpl* theNullFlag = true; theLength = 0; } + if (isWriteOp()) { + // becomes NULL unless set before execute + theNullFlag = true; + theLength = 0; + theHeadInlineUpdateFlag = true; + } supportedOp = true; } if (isScanOp()) { @@ -1093,19 +1162,21 @@ NdbBlob::atPrepare(NdbConnection* aCon, NdbOperation* anOp, const NdbColumnImpl* return -1; } setState(Prepared); - DBG("atPrepare out"); + DBG("atPrepare [out]"); return 0; } /* * Before execute of prepared operation. May add new operations before * this one. May ask that this operation and all before it (a "batch") - * is executed immediately in no-commit mode. + * is executed immediately in no-commit mode. In this case remaining + * prepared operations are saved in a separate list. They are added + * back after postExecute. */ int NdbBlob::preExecute(ExecType anExecType, bool& batch) { - DBG("preExecute"); + DBG("preExecute [in]"); if (theState == Invalid) return -1; assert(theState == Prepared); @@ -1120,11 +1191,11 @@ NdbBlob::preExecute(ExecType anExecType, bool& batch) if (isInsertOp()) { if (theSetFlag && theGetSetBytes > theInlineSize) { // add ops to write rest of a setValue - assert(theSetBuf != 0); - Uint64 pos = theInlineSize; + assert(theSetBuf != NULL); const char* buf = theSetBuf + theInlineSize; Uint32 bytes = theGetSetBytes - theInlineSize; - if (writeDataPrivate(pos, buf, bytes) == -1) + assert(thePos == theInlineSize); + if (writeDataPrivate(buf, bytes) == -1) return -1; if (theHeadInlineUpdateFlag) { // add an operation to update head+inline @@ -1136,11 +1207,12 @@ NdbBlob::preExecute(ExecType anExecType, bool& batch) setErrorCode(ErrAbort); return -1; } + DBG("add op to update head+inline"); } } } if (isTableOp()) { - if (isUpdateOp() || isDeleteOp()) { + if (isUpdateOp() || isWriteOp() || isDeleteOp()) { // add operation before this one to read head+inline NdbOperation* tOp = theNdbCon->getNdbOperation(theTable, theNdbOp); if (tOp == NULL || @@ -1150,8 +1222,13 @@ NdbBlob::preExecute(ExecType anExecType, bool& batch) setErrorCode(tOp); return -1; } + if (isWriteOp()) { + tOp->m_abortOption = IgnoreError; + } + theHeadInlineReadOp = tOp; // execute immediately batch = true; + DBG("add op before to read head+inline"); } } if (isIndexOp()) { @@ -1170,7 +1247,7 @@ NdbBlob::preExecute(ExecType anExecType, bool& batch) return -1; } } else { - NdbOperation* tOp = theNdbCon->getNdbIndexOperation(theAccessTable->m_index, theTable, theNdbOp); + NdbIndexOperation* tOp = theNdbCon->getNdbIndexOperation(theAccessTable->m_index, theTable, theNdbOp); if (tOp == NULL || tOp->readTuple() == -1 || setAccessKeyValue(tOp) == -1 || @@ -1180,6 +1257,7 @@ NdbBlob::preExecute(ExecType anExecType, bool& batch) } } } + DBG("added op before to read table key"); if (isUpdateOp() || isDeleteOp()) { // add op before this one to read head+inline via index NdbIndexOperation* tOp = theNdbCon->getNdbIndexOperation(theAccessTable->m_index, theTable, theNdbOp); @@ -1190,15 +1268,43 @@ NdbBlob::preExecute(ExecType anExecType, bool& batch) setErrorCode(tOp); return -1; } + if (isWriteOp()) { + tOp->m_abortOption = IgnoreError; + } + theHeadInlineReadOp = tOp; // execute immediately batch = true; + DBG("added index op before to read head+inline"); + } + if (isWriteOp()) { + // XXX until IgnoreError fixed for index op + batch = true; + } + } + if (isWriteOp()) { + if (theSetFlag) { + // write head+inline now + theNullFlag = true; + theLength = 0; + if (theSetBuf != NULL) { + Uint32 n = theGetSetBytes; + if (n > theInlineSize) + n = theInlineSize; + assert(thePos == 0); + if (writeDataPrivate(theSetBuf, n) == -1) + return -1; + } + if (setHeadInlineValue(theNdbOp) == -1) + return -1; + // the read op before us may overwrite + theHeadInlineCopyBuf.copyfrom(theHeadInlineBuf); } } if (theActiveHook != NULL) { // need blob head for callback batch = true; } - DBG("preExecute out batch=" << batch); + DBG("preExecute [out] batch=" << batch); return 0; } @@ -1211,15 +1317,16 @@ NdbBlob::preExecute(ExecType anExecType, bool& batch) int NdbBlob::postExecute(ExecType anExecType) { - DBG("postExecute type=" << anExecType); + DBG("postExecute [in] type=" << anExecType); if (theState == Invalid) return -1; if (theState == Active) { setState(anExecType == NoCommit ? Active : Closed); - DBG("postExecute skip"); + DBG("postExecute [skip]"); return 0; } assert(theState == Prepared); + setState(anExecType == NoCommit ? Active : Closed); assert(isKeyOp()); if (isIndexOp()) { NdbBlob* tFirstBlob = theNdbOp->theBlobList; @@ -1231,22 +1338,13 @@ NdbBlob::postExecute(ExecType anExecType) } if (isReadOp()) { getHeadFromRecAttr(); - if (theGetFlag && theGetSetBytes > 0) { - // copy inline bytes to user buffer - assert(theGetBuf != NULL); - unsigned n = theGetSetBytes; - if (n > theInlineSize) - n = theInlineSize; - memcpy(theGetBuf, theInlineData, n); - } - if (theGetFlag && theGetSetBytes > theInlineSize) { - // add ops to read rest of a getValue - assert(anExecType == NoCommit); - assert(theGetBuf != 0); - Uint64 pos = theInlineSize; - char* buf = theGetBuf + theInlineSize; - Uint32 bytes = theGetSetBytes - theInlineSize; - if (readDataPrivate(pos, buf, bytes) == -1) + if (setPos(0) == -1) + return -1; + if (theGetFlag) { + assert(theGetSetBytes == 0 || theGetBuf != 0); + assert(theGetSetBytes <= theInlineSize || anExecType == NoCommit); + Uint32 bytes = theGetSetBytes; + if (readDataPrivate(theGetBuf, bytes) == -1) return -1; } } @@ -1255,10 +1353,11 @@ NdbBlob::postExecute(ExecType anExecType) getHeadFromRecAttr(); if (theSetFlag) { // setValue overwrites everything - if (theSetBuf != 0) { + if (theSetBuf != NULL) { if (truncate(0) == -1) return -1; - if (writeDataPrivate(0, theSetBuf, theGetSetBytes) == -1) + assert(thePos == 0); + if (writeDataPrivate(theSetBuf, theGetSetBytes) == -1) return -1; } else { if (setNull() == -1) @@ -1266,6 +1365,57 @@ NdbBlob::postExecute(ExecType anExecType) } } } + if (isWriteOp() && isTableOp()) { + assert(anExecType == NoCommit); + if (theHeadInlineReadOp->theError.code == 0) { + int tNullFlag = theNullFlag; + Uint64 tLength = theLength; + Uint64 tPos = thePos; + getHeadFromRecAttr(); + DBG("tuple found"); + if (truncate(0) == -1) + return -1; + // restore previous head+inline + theHeadInlineBuf.copyfrom(theHeadInlineCopyBuf); + theNullFlag = tNullFlag; + theLength = tLength; + thePos = tPos; + } else { + if (theHeadInlineReadOp->theError.code != 626) { + setErrorCode(theHeadInlineReadOp); + return -1; + } + DBG("tuple not found"); + /* + * Read found no tuple but it is possible that a tuple was + * created after the read by another transaction. Delete all + * blob parts which may exist. + */ + if (deletePartsUnknown(0) == -1) + return -1; + } + if (theSetFlag && theGetSetBytes > theInlineSize) { + assert(theSetBuf != NULL); + const char* buf = theSetBuf + theInlineSize; + Uint32 bytes = theGetSetBytes - theInlineSize; + assert(thePos == theInlineSize); + if (writeDataPrivate(buf, bytes) == -1) + return -1; + } + } + if (isWriteOp() && isIndexOp()) { + // XXX until IgnoreError fixed for index op + if (deletePartsUnknown(0) == -1) + return -1; + if (theSetFlag && theGetSetBytes > theInlineSize) { + assert(theSetBuf != NULL); + const char* buf = theSetBuf + theInlineSize; + Uint32 bytes = theGetSetBytes - theInlineSize; + assert(thePos == theInlineSize); + if (writeDataPrivate(buf, bytes) == -1) + return -1; + } + } if (isDeleteOp()) { assert(anExecType == NoCommit); getHeadFromRecAttr(); @@ -1278,7 +1428,19 @@ NdbBlob::postExecute(ExecType anExecType) if (invokeActiveHook() == -1) return -1; } - DBG("postExecute out"); + if (anExecType == NoCommit && theHeadInlineUpdateFlag) { + NdbOperation* tOp = theNdbCon->getNdbOperation(theTable); + if (tOp == NULL || + tOp->updateTuple() == -1 || + setTableKeyValue(tOp) == -1 || + setHeadInlineValue(tOp) == -1) { + setErrorCode(ErrAbort); + return -1; + } + tOp->m_abortOption = AbortOnError; + DBG("added op to update head+inline"); + } + DBG("postExecute [out]"); return 0; } @@ -1289,12 +1451,12 @@ NdbBlob::postExecute(ExecType anExecType) int NdbBlob::preCommit() { - DBG("preCommit"); + DBG("preCommit [in]"); if (theState == Invalid) return -1; assert(theState == Active); assert(isKeyOp()); - if (isInsertOp() || isUpdateOp()) { + if (isInsertOp() || isUpdateOp() || isWriteOp()) { if (theHeadInlineUpdateFlag) { // add an operation to update head+inline NdbOperation* tOp = theNdbCon->getNdbOperation(theTable); @@ -1305,9 +1467,11 @@ NdbBlob::preCommit() setErrorCode(ErrAbort); return -1; } + tOp->m_abortOption = AbortOnError; + DBG("added op to update head+inline"); } } - DBG("preCommit out"); + DBG("preCommit [out]"); return 0; } @@ -1317,13 +1481,10 @@ NdbBlob::preCommit() int NdbBlob::atNextResult() { - DBG("atNextResult"); + DBG("atNextResult [in]"); if (theState == Invalid) return -1; assert(isScanOp()); - getHeadFromRecAttr(); - // reset position - thePos = 0; // get primary key { Uint32* data = (Uint32*)theKeyBuf.data; unsigned size = theTable->m_sizeOfKeysInWords; @@ -1332,26 +1493,14 @@ NdbBlob::atNextResult() return -1; } } - if (! theNullFlag) { - if (theGetFlag && theGetSetBytes > 0) { - // copy inline bytes to user buffer - assert(theGetBuf != NULL); - unsigned n = theGetSetBytes; - if (n > theLength) - n = theLength; - if (n > theInlineSize) - n = theInlineSize; - memcpy(theGetBuf, theInlineData, n); - } - if (theGetFlag && theGetSetBytes > theInlineSize && theLength > theInlineSize) { - // add ops to read rest of a getValue - assert(theGetBuf != 0); - Uint64 pos = theInlineSize; - char* buf = theGetBuf + theInlineSize; - Uint32 bytes = theGetSetBytes - theInlineSize; - if (readDataPrivate(pos, buf, bytes) == -1) - return -1; - } + getHeadFromRecAttr(); + if (setPos(0) == -1) + return -1; + if (theGetFlag) { + assert(theGetSetBytes == 0 || theGetBuf != 0); + Uint32 bytes = theGetSetBytes; + if (readDataPrivate(theGetBuf, bytes) == -1) + return -1; } setState(Active); // activation callback @@ -1359,7 +1508,7 @@ NdbBlob::atNextResult() if (invokeActiveHook() == -1) return -1; } - DBG("atNextResult out"); + DBG("atNextResult [out]"); return 0; } @@ -1444,7 +1593,8 @@ operator<<(NdbOut& out, const NdbBlob& blob) ndbout << dec << " n=" << blob.theNullFlag;; ndbout << dec << " l=" << blob.theLength; ndbout << dec << " p=" << blob.thePos; - ndbout << dec << " u=" << (Uint32) blob.theHeadInlineUpdateFlag; + ndbout << dec << " u=" << (Uint32)blob.theHeadInlineUpdateFlag; + ndbout << dec << " g=" << (Uint32)blob.theGetSetBytes; return out; } #endif diff --git a/ndb/src/ndbapi/NdbConnection.cpp b/ndb/src/ndbapi/NdbConnection.cpp index 1457792cf28..4f6468eb4ae 100644 --- a/ndb/src/ndbapi/NdbConnection.cpp +++ b/ndb/src/ndbapi/NdbConnection.cpp @@ -170,12 +170,14 @@ Remark: Sets an error code on the connection object from an operation object. *****************************************************************************/ void -NdbConnection::setOperationErrorCodeAbort(int error) +NdbConnection::setOperationErrorCodeAbort(int error, int abortOption) { DBUG_ENTER("NdbConnection::setOperationErrorCodeAbort"); + if (abortOption == -1) + abortOption = m_abortOption; if (theTransactionIsStarted == false) { theCommitStatus = Aborted; - } else if ((m_abortOption == AbortOnError) && + } else if ((abortOption == AbortOnError) && (theCommitStatus != Committed) && (theCommitStatus != Aborted)) { theCommitStatus = NeedAbort; @@ -335,8 +337,16 @@ NdbConnection::execute(ExecType aTypeOfExec, tOp = tOp->next(); } } + if (executeNoBlobs(tExecType, abortOption, forceSend) == -1) ret = -1; +#ifndef VM_TRACE + // can happen in complex abort cases + theFirstOpInList = theLastOpInList = NULL; +#else + assert(theFirstOpInList == NULL && theLastOpInList == NULL); +#endif + { NdbOperation* tOp = theCompletedFirstOp; while (tOp != NULL) { @@ -360,6 +370,7 @@ NdbConnection::execute(ExecType aTypeOfExec, theLastOpInList->next(tRestOp); theLastOpInList = tLastOp; } + assert(theFirstOpInList == NULL || tExecType == NoCommit); } while (theFirstOpInList != NULL || tExecType != aTypeOfExec); DBUG_RETURN(ret); @@ -1806,11 +1817,12 @@ Parameters: aErrorCode: The error code. Remark: An operation was completed with failure. *******************************************************************************/ int -NdbConnection::OpCompleteFailure(Uint8 abortOption) +NdbConnection::OpCompleteFailure(Uint8 abortOption, bool setFailure) { Uint32 tNoComp = theNoOfOpCompleted; Uint32 tNoSent = theNoOfOpSent; - theCompletionStatus = NdbConnection::CompletedFailure; + if (setFailure) + theCompletionStatus = NdbConnection::CompletedFailure; tNoComp++; theNoOfOpCompleted = tNoComp; if (tNoComp == tNoSent) { diff --git a/ndb/src/ndbapi/NdbDictionaryImpl.cpp b/ndb/src/ndbapi/NdbDictionaryImpl.cpp index cf51a30fe0b..304d1b904d4 100644 --- a/ndb/src/ndbapi/NdbDictionaryImpl.cpp +++ b/ndb/src/ndbapi/NdbDictionaryImpl.cpp @@ -47,13 +47,13 @@ * Column */ NdbColumnImpl::NdbColumnImpl() - : NdbDictionary::Column(* this), m_facade(this) + : NdbDictionary::Column(* this), m_attrId(-1), m_facade(this) { init(); } NdbColumnImpl::NdbColumnImpl(NdbDictionary::Column & f) - : NdbDictionary::Column(* this), m_facade(&f) + : NdbDictionary::Column(* this), m_attrId(-1), m_facade(&f) { init(); } @@ -93,8 +93,7 @@ NdbColumnImpl::init(Type t) { // do not use default_charset_info as it may not be initialized yet // use binary collation until NDB tests can handle charsets - CHARSET_INFO* default_cs = &my_charset_latin1_bin; - m_attrId = -1; + CHARSET_INFO* default_cs = &my_charset_bin; m_type = t; switch (m_type) { case Tinyint: diff --git a/ndb/src/ndbapi/NdbIndexOperation.cpp b/ndb/src/ndbapi/NdbIndexOperation.cpp index 9abde639914..3f174a61b64 100644 --- a/ndb/src/ndbapi/NdbIndexOperation.cpp +++ b/ndb/src/ndbapi/NdbIndexOperation.cpp @@ -71,6 +71,7 @@ NdbIndexOperation::indxInit(const NdbIndexImpl * anIndex, return -1; } m_theIndex = anIndex; + m_thePrimaryTable = aTable; m_accessTable = anIndex->m_table; m_theIndexLen = 0; m_theNoOfIndexDefined = 0; @@ -102,6 +103,12 @@ int NdbIndexOperation::readTuple(NdbOperation::LockMode lm) }; } +int NdbIndexOperation::insertTuple() +{ + setErrorCode(4200); + return -1; +} + int NdbIndexOperation::readTuple() { // First check that index is unique @@ -341,12 +348,11 @@ int NdbIndexOperation::equal_impl(const NdbColumnImpl* tAttrInfo, theDistrGroupIndicator = 1; }//if /************************************************************************** - * If the operation is an insert request and the attribute is stored then + * If the operation is a write request and the attribute is stored then * we also set the value in the stored part through putting the * information in the INDXATTRINFO signals. *************************************************************************/ - if ((tOpType == InsertRequest) || - (tOpType == WriteRequest)) { + if ((tOpType == WriteRequest)) { if (!tAttrInfo->m_indexOnly){ // invalid data can crash kernel if (cs != NULL && @@ -357,7 +363,13 @@ int NdbIndexOperation::equal_impl(const NdbColumnImpl* tAttrInfo, goto equal_error4; Uint32 ahValue; Uint32 sz = totalSizeInWords; - AttributeHeader::init(&ahValue, tAttrId, sz); + /* + * XXX should be linked in metadata but cannot now because + * things can be defined in arbitrary order + */ + const NdbColumnImpl* primaryCol = m_thePrimaryTable->getColumn(tAttrInfo->m_name.c_str()); + assert(primaryCol != NULL); + AttributeHeader::init(&ahValue, primaryCol->m_attrId, sz); insertATTRINFO( ahValue ); insertATTRINFOloop((Uint32*)aValueToWrite, sizeInWords); if (bitsInLastWord != 0) { @@ -369,7 +381,6 @@ int NdbIndexOperation::equal_impl(const NdbColumnImpl* tAttrInfo, }//if }//if }//if - /************************************************************************** * Store the Key information in the TCINDXREQ and INDXKEYINFO signals. *************************************************************************/ @@ -734,13 +745,10 @@ NdbIndexOperation::receiveTCINDXREF( NdbApiSignal* aSignal) }//if theStatus = Finished; - + theNdbCon->theReturnStatus = NdbConnection::ReturnFailure; Uint32 errorCode = tcIndxRef->errorCode; theError.code = errorCode; theNdbCon->setOperationErrorCodeAbort(errorCode); return theNdbCon->OpCompleteFailure(theNdbCon->m_abortOption); }//NdbIndexOperation::receiveTCINDXREF() - - - diff --git a/ndb/src/ndbapi/NdbOperation.cpp b/ndb/src/ndbapi/NdbOperation.cpp index b0b95d0ff43..88d8a000d50 100644 --- a/ndb/src/ndbapi/NdbOperation.cpp +++ b/ndb/src/ndbapi/NdbOperation.cpp @@ -78,7 +78,8 @@ NdbOperation::NdbOperation(Ndb* aNdb) : m_tcReqGSN(GSN_TCKEYREQ), m_keyInfoGSN(GSN_KEYINFO), m_attrInfoGSN(GSN_ATTRINFO), - theBlobList(NULL) + theBlobList(NULL), + m_abortOption(-1) { theReceiver.init(NdbReceiver::NDB_OPERATION, this); theError.code = 0; @@ -167,6 +168,7 @@ NdbOperation::init(const NdbTableImpl* tab, NdbConnection* myConnection){ theTotalNrOfKeyWordInSignal = 8; theMagicNumber = 0xABCDEF01; theBlobList = NULL; + m_abortOption = -1; tSignal = theNdb->getSignal(); if (tSignal == NULL) diff --git a/ndb/src/ndbapi/NdbOperationExec.cpp b/ndb/src/ndbapi/NdbOperationExec.cpp index f1338ae01e4..13664794dcd 100644 --- a/ndb/src/ndbapi/NdbOperationExec.cpp +++ b/ndb/src/ndbapi/NdbOperationExec.cpp @@ -191,7 +191,8 @@ NdbOperation::prepareSend(Uint32 aTC_ConnectPtr, Uint64 aTransId) Uint8 tDirtyIndicator = theDirtyIndicator; OperationType tOperationType = theOperationType; Uint32 tTupKeyLen = theTupKeyLen; - Uint8 abortOption = theNdbCon->m_abortOption; + Uint8 abortOption = + m_abortOption != (Int8)-1 ? m_abortOption : theNdbCon->m_abortOption; tcKeyReq->setDirtyFlag(tReqInfo, tDirtyIndicator); tcKeyReq->setOperationType(tReqInfo, tOperationType); @@ -541,17 +542,20 @@ NdbOperation::receiveTCKEYREF( NdbApiSignal* aSignal) return -1; }//if - AbortOption ao = (AbortOption)theNdbCon->m_abortOption; + AbortOption ao = (AbortOption) + (m_abortOption != (Int8)-1 ? m_abortOption : theNdbCon->m_abortOption); theReceiver.m_received_result_length = ~0; theStatus = Finished; - theNdbCon->theReturnStatus = NdbConnection::ReturnFailure; + // blobs want this + if (m_abortOption != IgnoreError) + theNdbCon->theReturnStatus = NdbConnection::ReturnFailure; theError.code = aSignal->readData(4); - theNdbCon->setOperationErrorCodeAbort(aSignal->readData(4)); + theNdbCon->setOperationErrorCodeAbort(aSignal->readData(4), m_abortOption); if(theOperationType != ReadRequest || !theSimpleIndicator) // not simple read - return theNdbCon->OpCompleteFailure(ao); + return theNdbCon->OpCompleteFailure(ao, m_abortOption != IgnoreError); /** * If TCKEYCONF has arrived diff --git a/ndb/src/ndbapi/NdbScanOperation.cpp b/ndb/src/ndbapi/NdbScanOperation.cpp index fd63ce96f25..86bac7deb16 100644 --- a/ndb/src/ndbapi/NdbScanOperation.cpp +++ b/ndb/src/ndbapi/NdbScanOperation.cpp @@ -612,7 +612,7 @@ NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag){ LinearSectionPtr ptr[3]; ptr[0].p = prep_array; ptr[0].sz = cnt; - ret = tp->sendFragmentedSignal(&tSignal, nodeId, ptr, 1); + ret = tp->sendSignal(&tSignal, nodeId, ptr, 1); } else { tSignal.setLength(4+cnt); ret = tp->sendSignal(&tSignal, nodeId); @@ -803,7 +803,7 @@ NdbScanOperation::doSendScan(int aProcessorId) LinearSectionPtr ptr[3]; ptr[0].p = m_prepared_receivers; ptr[0].sz = theParallelism; - if (tp->sendFragmentedSignal(tSignal, aProcessorId, ptr, 1) == -1) { + if (tp->sendSignal(tSignal, aProcessorId, ptr, 1) == -1) { setErrorCode(4002); return -1; } diff --git a/ndb/src/ndbapi/TransporterFacade.cpp b/ndb/src/ndbapi/TransporterFacade.cpp index bc24110ea14..dfb090c8416 100644 --- a/ndb/src/ndbapi/TransporterFacade.cpp +++ b/ndb/src/ndbapi/TransporterFacade.cpp @@ -34,6 +34,7 @@ #include <NdbConfig.h> #include <ndb_version.h> #include <SignalLoggerManager.hpp> +#include <kernel/ndb_limits.h> //#define REPORT_TRANSPORTER //#define API_TRACE; @@ -475,7 +476,8 @@ TransporterFacade::TransporterFacade() : theTransporterRegistry(0), theStopReceive(0), theSendThread(NULL), - theReceiveThread(NULL) + theReceiveThread(NULL), + m_fragmented_signal_id(0) { theOwnId = 0; @@ -833,9 +835,129 @@ TransporterFacade::sendSignalUnCond(NdbApiSignal * aSignal, NodeId aNode){ return (ss == SEND_OK ? 0 : -1); } +#define CHUNK_SZ NDB_SECTION_SEGMENT_SZ*64 // related to MAX_MESSAGE_SIZE int TransporterFacade::sendFragmentedSignal(NdbApiSignal* aSignal, NodeId aNode, - LinearSectionPtr ptr[3], Uint32 secs){ + LinearSectionPtr ptr[3], Uint32 secs) +{ + if(getIsNodeSendable(aNode) != true) + return -1; + +#ifdef API_TRACE + if(setSignalLog() && TRACE_GSN(aSignal->theVerId_signalNumber)){ + Uint32 tmp = aSignal->theSendersBlockRef; + aSignal->theSendersBlockRef = numberToRef(tmp, theOwnId); + signalLogger.sendSignal(* aSignal, + 1, + aSignal->getDataPtrSend(), + aNode, + ptr, secs); + aSignal->theSendersBlockRef = tmp; + } +#endif + + NdbApiSignal tmp_signal(*(SignalHeader*)aSignal); + LinearSectionPtr tmp_ptr[3]; + Uint32 unique_id= m_fragmented_signal_id++; // next unique id + unsigned i; + for (i= 0; i < secs; i++) + tmp_ptr[i]= ptr[i]; + + unsigned start_i= 0; + unsigned chunk_sz= 0; + unsigned fragment_info= 0; + Uint32 *tmp_data= tmp_signal.getDataPtrSend(); + for (i= 0; i < secs;) { + unsigned save_sz= tmp_ptr[i].sz; + tmp_data[i-start_i]= i; + if (chunk_sz + save_sz > CHUNK_SZ) { + // truncate + unsigned send_sz= CHUNK_SZ - chunk_sz; + if (i != start_i) // first piece of a new section has to be a multiple of NDB_SECTION_SEGMENT_SZ + { + send_sz= + NDB_SECTION_SEGMENT_SZ + *(send_sz+NDB_SECTION_SEGMENT_SZ-1) + /NDB_SECTION_SEGMENT_SZ; + if (send_sz > save_sz) + send_sz= save_sz; + } + tmp_ptr[i].sz= send_sz; + + if (fragment_info < 2) // 1 = first fragment, 2 = middle fragments + fragment_info++; + + // send tmp_signal + tmp_data[i-start_i+1]= unique_id; + tmp_signal.setLength(i-start_i+2); + tmp_signal.m_fragmentInfo= fragment_info; + tmp_signal.m_noOfSections= i-start_i+1; + // do prepare send + { + SendStatus ss = theTransporterRegistry->prepareSend + (&tmp_signal, + 1, /*JBB*/ + tmp_data, + aNode, + &tmp_ptr[start_i]); + assert(ss != SEND_MESSAGE_TOO_BIG); + if (ss != SEND_OK) return -1; + } + // setup variables for next signal + start_i= i; + chunk_sz= 0; + tmp_ptr[i].sz= save_sz-send_sz; + tmp_ptr[i].p+= send_sz; + if (tmp_ptr[i].sz == 0) + i++; + } + else + { + chunk_sz+=save_sz; + i++; + } + } + + unsigned a_sz= aSignal->getLength(); + + if (fragment_info > 0) { + // update the original signal to include section info + Uint32 *a_data= aSignal->getDataPtrSend(); + unsigned tmp_sz= i-start_i; + memcpy(a_data+a_sz, + tmp_data, + tmp_sz*sizeof(Uint32)); + a_data[a_sz+tmp_sz]= unique_id; + aSignal->setLength(a_sz+tmp_sz+1); + + // send last fragment + aSignal->m_fragmentInfo= 3; // 3 = last fragment + aSignal->m_noOfSections= i-start_i; + } else { + aSignal->m_noOfSections= secs; + } + + // send aSignal + int ret; + { + SendStatus ss = theTransporterRegistry->prepareSend + (aSignal, + 1/*JBB*/, + aSignal->getDataPtrSend(), + aNode, + &tmp_ptr[start_i]); + assert(ss != SEND_MESSAGE_TOO_BIG); + ret = (ss == SEND_OK ? 0 : -1); + } + aSignal->m_noOfSections = 0; + aSignal->m_fragmentInfo = 0; + aSignal->setLength(a_sz); + return ret; +} + +int +TransporterFacade::sendSignal(NdbApiSignal* aSignal, NodeId aNode, + LinearSectionPtr ptr[3], Uint32 secs){ aSignal->m_noOfSections = secs; if(getIsNodeSendable(aNode) == true){ #ifdef API_TRACE @@ -865,39 +987,6 @@ TransporterFacade::sendFragmentedSignal(NdbApiSignal* aSignal, NodeId aNode, return -1; } - - -int -TransporterFacade::sendFragmentedSignalUnCond(NdbApiSignal* aSignal, - NodeId aNode, - LinearSectionPtr ptr[3], - Uint32 secs){ - aSignal->m_noOfSections = secs; - -#ifdef API_TRACE - if(setSignalLog() && TRACE_GSN(aSignal->theVerId_signalNumber)){ - Uint32 tmp = aSignal->theSendersBlockRef; - aSignal->theSendersBlockRef = numberToRef(tmp, theOwnId); - signalLogger.sendSignal(* aSignal, - 1, - aSignal->getDataPtrSend(), - aNode, - ptr, secs); - aSignal->theSendersBlockRef = tmp; - } -#endif - SendStatus ss = theTransporterRegistry->prepareSend(aSignal, - 1, // JBB - aSignal->getDataPtrSend(), - aNode, - ptr); - assert(ss != SEND_MESSAGE_TOO_BIG); - aSignal->m_noOfSections = 0; - return (ss == SEND_OK ? 0 : -1); -} - - - /****************************************************************************** * CONNECTION METHODS Etc ******************************************************************************/ diff --git a/ndb/src/ndbapi/TransporterFacade.hpp b/ndb/src/ndbapi/TransporterFacade.hpp index 5f473975585..5680e3a6f03 100644 --- a/ndb/src/ndbapi/TransporterFacade.hpp +++ b/ndb/src/ndbapi/TransporterFacade.hpp @@ -69,14 +69,11 @@ public: // Only sends to nodes which are alive int sendSignal(NdbApiSignal * signal, NodeId nodeId); + int sendSignal(NdbApiSignal*, NodeId, + LinearSectionPtr ptr[3], Uint32 secs); int sendFragmentedSignal(NdbApiSignal*, NodeId, LinearSectionPtr ptr[3], Uint32 secs); - //Dirrrrty - int sendFragmentedSignalUnCond(NdbApiSignal*, NodeId, - LinearSectionPtr ptr[3], Uint32 secs); - - // Is node available for running transactions bool get_node_alive(NodeId nodeId) const; bool get_node_stopping(NodeId nodeId) const; @@ -224,7 +221,8 @@ private: } m_threads; Uint32 m_max_trans_id; - + Uint32 m_fragmented_signal_id; + /** * execute function */ diff --git a/ndb/src/ndbapi/ndberror.c b/ndb/src/ndbapi/ndberror.c index 20661b89517..17a80082023 100644 --- a/ndb/src/ndbapi/ndberror.c +++ b/ndb/src/ndbapi/ndberror.c @@ -150,7 +150,7 @@ ErrorBundle ErrorCodes[] = { { 623, IS, "623" }, { 624, IS, "624" }, { 625, IS, "Out of memory in Ndb Kernel, index part" }, - { 826, IS, "826" }, + { 826, IS, "Too many tables and attributes (increase MaxNoOfAttributes)" }, { 827, IS, "Out of memory in Ndb Kernel, data part" }, { 832, IS, "832" }, |