diff options
Diffstat (limited to 'ndb/src/ndbapi')
-rw-r--r-- | ndb/src/ndbapi/Ndb.cpp | 11 | ||||
-rw-r--r-- | ndb/src/ndbapi/NdbApiSignal.cpp | 2 | ||||
-rw-r--r-- | ndb/src/ndbapi/NdbBlob.cpp | 6 | ||||
-rw-r--r-- | ndb/src/ndbapi/NdbConnection.cpp | 140 | ||||
-rw-r--r-- | ndb/src/ndbapi/NdbDictionaryImpl.cpp | 9 | ||||
-rw-r--r-- | ndb/src/ndbapi/NdbIndexOperation.cpp | 41 | ||||
-rw-r--r-- | ndb/src/ndbapi/NdbOperation.cpp | 10 | ||||
-rw-r--r-- | ndb/src/ndbapi/NdbOperationDefine.cpp | 28 | ||||
-rw-r--r-- | ndb/src/ndbapi/NdbOperationExec.cpp | 57 | ||||
-rw-r--r-- | ndb/src/ndbapi/NdbOperationInt.cpp | 4 | ||||
-rw-r--r-- | ndb/src/ndbapi/NdbOperationSearch.cpp | 3 | ||||
-rw-r--r-- | ndb/src/ndbapi/NdbReceiver.cpp | 14 | ||||
-rw-r--r-- | ndb/src/ndbapi/NdbScanOperation.cpp | 377 | ||||
-rw-r--r-- | ndb/src/ndbapi/Ndbif.cpp | 14 | ||||
-rw-r--r-- | ndb/src/ndbapi/Ndbinit.cpp | 11 | ||||
-rw-r--r-- | ndb/src/ndbapi/Ndblist.cpp | 14 | ||||
-rw-r--r-- | ndb/src/ndbapi/ndberror.c | 19 |
17 files changed, 472 insertions, 288 deletions
diff --git a/ndb/src/ndbapi/Ndb.cpp b/ndb/src/ndbapi/Ndb.cpp index be8ba86c817..be0445bceb3 100644 --- a/ndb/src/ndbapi/Ndb.cpp +++ b/ndb/src/ndbapi/Ndb.cpp @@ -330,7 +330,7 @@ Ndb::startTransaction(Uint32 aPriority, const char * keyData, Uint32 keyLen) { NdbConnection *trans= startTransactionLocal(aPriority, nodeId); DBUG_PRINT("exit",("start trans: 0x%x transid: 0x%llx", - trans, trans->getTransactionId())); + trans, trans ? trans->getTransactionId() : 0)); DBUG_RETURN(trans); } } else { @@ -371,6 +371,7 @@ Ndb::hupp(NdbConnection* pBuddyTrans) // We could not get a connection to the desired node // release the connection and return NULL closeTransaction(pCon); + theError.code = 4006; DBUG_RETURN(NULL); } pCon->setTransactionId(pBuddyTrans->getTransactionId()); @@ -1157,10 +1158,10 @@ const char * Ndb::getCatalogName() const void Ndb::setCatalogName(const char * a_catalog_name) { if (a_catalog_name) { - snprintf(theDataBase, sizeof(theDataBase), "%s", + BaseString::snprintf(theDataBase, sizeof(theDataBase), "%s", a_catalog_name ? a_catalog_name : ""); - int len = snprintf(prefixName, sizeof(prefixName), "%s%c%s%c", + int len = BaseString::snprintf(prefixName, sizeof(prefixName), "%s%c%s%c", theDataBase, table_name_separator, theDataBaseSchema, table_name_separator); prefixEnd = prefixName + (len < (int) sizeof(prefixName) ? len : @@ -1176,10 +1177,10 @@ const char * Ndb::getSchemaName() const void Ndb::setSchemaName(const char * a_schema_name) { if (a_schema_name) { - snprintf(theDataBaseSchema, sizeof(theDataBase), "%s", + BaseString::snprintf(theDataBaseSchema, sizeof(theDataBase), "%s", a_schema_name ? a_schema_name : ""); - int len = snprintf(prefixName, sizeof(prefixName), "%s%c%s%c", + int len = BaseString::snprintf(prefixName, sizeof(prefixName), "%s%c%s%c", theDataBase, table_name_separator, theDataBaseSchema, table_name_separator); prefixEnd = prefixName + (len < (int) sizeof(prefixName) ? len : diff --git a/ndb/src/ndbapi/NdbApiSignal.cpp b/ndb/src/ndbapi/NdbApiSignal.cpp index d7b2b74b2bf..a1d34896968 100644 --- a/ndb/src/ndbapi/NdbApiSignal.cpp +++ b/ndb/src/ndbapi/NdbApiSignal.cpp @@ -168,7 +168,7 @@ NdbApiSignal::setSignal(int aNdbSignalType) theTrace = TestOrd::TraceAPI; theReceiversBlockNumber = DBTC; theVerId_signalNumber = GSN_TC_COMMITREQ; - theLength = 5; + theLength = 3; } break; diff --git a/ndb/src/ndbapi/NdbBlob.cpp b/ndb/src/ndbapi/NdbBlob.cpp index 7939f54d846..feab95d8ca5 100644 --- a/ndb/src/ndbapi/NdbBlob.cpp +++ b/ndb/src/ndbapi/NdbBlob.cpp @@ -867,7 +867,7 @@ NdbBlob::readParts(char* buf, Uint32 part, Uint32 count) while (n < count) { NdbOperation* tOp = theNdbCon->getNdbOperation(theBlobTable); if (tOp == NULL || - tOp->readTuple() == -1 || + tOp->committedRead() == -1 || setPartKeyValue(tOp, part + n) == -1 || tOp->getValue((Uint32)3, buf) == NULL) { setErrorCode(tOp); @@ -1440,11 +1440,11 @@ NdbOut& operator<<(NdbOut& out, const NdbBlob& blob) { ndbout << dec << "o=" << blob.getOperationType(); - ndbout << dec << " s=" << blob.theState; + ndbout << dec << " s=" << (Uint32) blob.theState; ndbout << dec << " n=" << blob.theNullFlag;; ndbout << dec << " l=" << blob.theLength; ndbout << dec << " p=" << blob.thePos; - ndbout << dec << " u=" << blob.theHeadInlineUpdateFlag; + ndbout << dec << " u=" << (Uint32) blob.theHeadInlineUpdateFlag; return out; } #endif diff --git a/ndb/src/ndbapi/NdbConnection.cpp b/ndb/src/ndbapi/NdbConnection.cpp index 8ab0d13c67f..1457792cf28 100644 --- a/ndb/src/ndbapi/NdbConnection.cpp +++ b/ndb/src/ndbapi/NdbConnection.cpp @@ -83,6 +83,11 @@ NdbConnection::NdbConnection( Ndb* aNdb ) : theListState = NotInList; theError.code = 0; theId = theNdb->theNdbObjectIdMap->map(this); + +#define CHECK_SZ(mask, sz) assert((sizeof(mask)/sizeof(mask[0])) == sz) + + CHECK_SZ(m_db_nodes, NdbNodeBitmask::Size); + CHECK_SZ(m_failed_db_nodes, NdbNodeBitmask::Size); }//NdbConnection::NdbConnection() /***************************************************************************** @@ -308,8 +313,8 @@ NdbConnection::execute(ExecType aTypeOfExec, tPrepOp = tPrepOp->next(); } // save rest of prepared ops if batch - NdbOperation* tRestOp; - NdbOperation* tLastOp; + NdbOperation* tRestOp= 0; + NdbOperation* tLastOp= 0; if (tPrepOp != NULL) { tRestOp = tPrepOp->next(); tPrepOp->next(NULL); @@ -490,11 +495,6 @@ NdbConnection::executeAsynchPrepare( ExecType aTypeOfExec, theListState = InPreparedList; tNdb->theNoOfPreparedTransactions = tnoOfPreparedTransactions + 1; - if(tCommitStatus == Committed){ - tCommitStatus = Started; - tTransactionIsStarted = false; - } - if ((tCommitStatus != Started) || (aTypeOfExec == Rollback)) { /***************************************************************************** @@ -503,7 +503,7 @@ NdbConnection::executeAsynchPrepare( ExecType aTypeOfExec, * same action. ****************************************************************************/ if (aTypeOfExec == Rollback) { - if (theTransactionIsStarted == false) { + if (theTransactionIsStarted == false || theSimpleState) { theCommitStatus = Aborted; theSendStatus = sendCompleted; } else { @@ -528,7 +528,7 @@ NdbConnection::executeAsynchPrepare( ExecType aTypeOfExec, tLastOp->theCommitIndicator = 1; }//if } else { - if (aTypeOfExec == Commit) { + if (aTypeOfExec == Commit && !theSimpleState) { /********************************************************************** * A Transaction have been started and no more operations exist. * We will use the commit method. @@ -610,6 +610,8 @@ NdbConnection::executeAsynchPrepare( ExecType aTypeOfExec, theNoOfOpSent = 0; theNoOfOpCompleted = 0; theSendStatus = sendOperations; + NdbNodeBitmask::clear(m_db_nodes); + NdbNodeBitmask::clear(m_failed_db_nodes); DBUG_VOID_RETURN; }//NdbConnection::executeAsynchPrepare() @@ -728,7 +730,8 @@ NdbConnection::doSend() theNdb->insert_completed_list(this); DBUG_RETURN(0); default: - ndbout << "Inconsistent theSendStatus = " << theSendStatus << endl; + ndbout << "Inconsistent theSendStatus = " + << (Uint32) theSendStatus << endl; abort(); break; }//switch @@ -1115,15 +1118,8 @@ NdbConnection::getNdbScanOperation(const NdbTableImpl * tab) if (tOp == NULL) goto getNdbOp_error1; - // Link scan operation into list of cursor operations - if (m_theLastScanOperation == NULL) - m_theFirstScanOperation = m_theLastScanOperation = tOp; - else { - m_theLastScanOperation->next(tOp); - m_theLastScanOperation = tOp; - } - tOp->next(NULL); if (tOp->init(tab, this) != -1) { + define_scan_op(tOp); return tOp; } else { theNdb->releaseScanOperation(tOp); @@ -1135,6 +1131,31 @@ getNdbOp_error1: return NULL; }//NdbConnection::getNdbScanOperation() +void +NdbConnection::remove_list(NdbOperation*& list, NdbOperation* op){ + NdbOperation* tmp= list; + if(tmp == op) + list = op->next(); + else { + while(tmp && tmp->next() != op) tmp = tmp->next(); + if(tmp) + tmp->next(op->next()); + } + op->next(NULL); +} + +void +NdbConnection::define_scan_op(NdbIndexScanOperation * tOp){ + // Link scan operation into list of cursor operations + if (m_theLastScanOperation == NULL) + m_theFirstScanOperation = m_theLastScanOperation = tOp; + else { + m_theLastScanOperation->next(tOp); + m_theLastScanOperation = tOp; + } + tOp->next(NULL); +} + NdbScanOperation* NdbConnection::getNdbScanOperation(const NdbDictionary::Table * table) { @@ -1517,12 +1538,21 @@ from other transactions. const Uint32* tPtr = (Uint32 *)&keyConf->operations[0]; Uint32 tNoComp = theNoOfOpCompleted; for (Uint32 i = 0; i < tNoOfOperations ; i++) { - tOp = theNdb->void2rec(theNdb->int2void(*tPtr)); - tPtr++; - const Uint32 tAttrInfoLen = *tPtr; - tPtr++; + tOp = theNdb->void2rec(theNdb->int2void(*tPtr++)); + const Uint32 tAttrInfoLen = *tPtr++; if (tOp && tOp->checkMagicNumber()) { - tNoComp += tOp->execTCOPCONF(tAttrInfoLen); + Uint32 done = tOp->execTCOPCONF(tAttrInfoLen); + if(tAttrInfoLen > TcKeyConf::SimpleReadBit){ + Uint32 node = tAttrInfoLen & (~TcKeyConf::SimpleReadBit); + NdbNodeBitmask::set(m_db_nodes, node); + if(NdbNodeBitmask::get(m_failed_db_nodes, node) && !done) + { + done = 1; + tOp->setErrorCode(4119); + theCompletionStatus = CompletedFailure; + } + } + tNoComp += done; } else { return -1; }//if @@ -1614,6 +1644,10 @@ NdbConnection::receiveTCKEY_FAILCONF(const TcKeyFailConf * failConf) setOperationErrorCodeAbort(4115); tOp = NULL; break; + case NdbOperation::NotDefined: + case NdbOperation::NotDefined2: + assert(false); + break; }//if }//while theReleaseOnClose = true; @@ -1772,7 +1806,7 @@ Parameters: aErrorCode: The error code. Remark: An operation was completed with failure. *******************************************************************************/ int -NdbConnection::OpCompleteFailure() +NdbConnection::OpCompleteFailure(Uint8 abortOption) { Uint32 tNoComp = theNoOfOpCompleted; Uint32 tNoSent = theNoOfOpSent; @@ -1786,10 +1820,7 @@ NdbConnection::OpCompleteFailure() //decide the success of the whole transaction since a simple //operation is not really part of that transaction. //------------------------------------------------------------------------ - if (theSimpleState == 1) { - theCommitStatus = NdbConnection::Aborted; - }//if - if (m_abortOption == IgnoreError){ + if (abortOption == IgnoreError){ /** * There's always a TCKEYCONF when using IgnoreError */ @@ -1824,9 +1855,6 @@ NdbConnection::OpCompleteSuccess() tNoComp++; theNoOfOpCompleted = tNoComp; if (tNoComp == tNoSent) { // Last operation completed - if (theSimpleState == 1) { - theCommitStatus = NdbConnection::Committed; - }//if return 0; } else if (tNoComp < tNoSent) { return -1; // Continue waiting for more signals @@ -1901,14 +1929,14 @@ NdbConnection::printState() CASE(Connected); CASE(DisConnecting); CASE(ConnectFailure); - default: ndbout << theStatus; + default: ndbout << (Uint32) theStatus; } switch (theListState) { CASE(NotInList); CASE(InPreparedList); CASE(InSendList); CASE(InCompletedList); - default: ndbout << theListState; + default: ndbout << (Uint32) theListState; } switch (theSendStatus) { CASE(NotInit); @@ -1921,7 +1949,7 @@ NdbConnection::printState() CASE(sendTC_ROLLBACK); CASE(sendTC_COMMIT); CASE(sendTC_OP); - default: ndbout << theSendStatus; + default: ndbout << (Uint32) theSendStatus; } switch (theCommitStatus) { CASE(NotStarted); @@ -1929,16 +1957,56 @@ NdbConnection::printState() CASE(Committed); CASE(Aborted); CASE(NeedAbort); - default: ndbout << theCommitStatus; + default: ndbout << (Uint32) theCommitStatus; } switch (theCompletionStatus) { CASE(NotCompleted); CASE(CompletedSuccess); CASE(CompletedFailure); CASE(DefinitionFailure); - default: ndbout << theCompletionStatus; + default: ndbout << (Uint32) theCompletionStatus; } ndbout << endl; } #undef CASE #endif + +int +NdbConnection::report_node_failure(Uint32 id){ + NdbNodeBitmask::set(m_failed_db_nodes, id); + if(!NdbNodeBitmask::get(m_db_nodes, id)) + { + return 0; + } + + /** + * Arrived + * TCKEYCONF TRANSIDAI + * 1) - - + * 2) - X + * 3) X - + * 4) X X + */ + NdbOperation* tmp = theFirstExecOpInList; + const Uint32 len = TcKeyConf::SimpleReadBit | id; + Uint32 tNoComp = theNoOfOpCompleted; + Uint32 tNoSent = theNoOfOpSent; + while(tmp != 0) + { + if(tmp->theReceiver.m_expected_result_length == len && + tmp->theReceiver.m_received_result_length == 0) + { + tNoComp++; + tmp->theError.code = 4119; + } + tmp = tmp->next(); + } + theNoOfOpCompleted = tNoComp; + if(tNoComp == tNoSent) + { + theError.code = 4119; + theCompletionStatus = NdbConnection::CompletedFailure; + return 1; + } + return 0; +} diff --git a/ndb/src/ndbapi/NdbDictionaryImpl.cpp b/ndb/src/ndbapi/NdbDictionaryImpl.cpp index 5e640cdebd5..cf51a30fe0b 100644 --- a/ndb/src/ndbapi/NdbDictionaryImpl.cpp +++ b/ndb/src/ndbapi/NdbDictionaryImpl.cpp @@ -148,6 +148,9 @@ NdbColumnImpl::init(Type t) m_length = 4; m_cs = default_cs; break; + case Undefined: + assert(false); + break; } m_pk = false; m_nullable = false; @@ -1466,7 +1469,7 @@ NdbDictInterface::createOrAlterTable(Ndb & ndb, impl.m_internalName.assign(internalName); UtilBufferWriter w(m_buffer); DictTabInfo::Table tmpTab; tmpTab.init(); - snprintf(tmpTab.TableName, + BaseString::snprintf(tmpTab.TableName, sizeof(tmpTab.TableName), internalName); @@ -1522,7 +1525,7 @@ NdbDictInterface::createOrAlterTable(Ndb & ndb, continue; DictTabInfo::Attribute tmpAttr; tmpAttr.init(); - snprintf(tmpAttr.AttributeName, sizeof(tmpAttr.AttributeName), + BaseString::snprintf(tmpAttr.AttributeName, sizeof(tmpAttr.AttributeName), col->m_name.c_str()); tmpAttr.AttributeId = i; tmpAttr.AttributeKeyFlag = col->m_pk || col->m_tupleKey; @@ -1557,7 +1560,7 @@ NdbDictInterface::createOrAlterTable(Ndb & ndb, (void)tmpAttr.translateExtType(); tmpAttr.AttributeAutoIncrement = col->m_autoIncrement; - snprintf(tmpAttr.AttributeDefaultValue, + BaseString::snprintf(tmpAttr.AttributeDefaultValue, sizeof(tmpAttr.AttributeDefaultValue), col->m_defaultValue.c_str()); s = SimpleProperties::pack(w, diff --git a/ndb/src/ndbapi/NdbIndexOperation.cpp b/ndb/src/ndbapi/NdbIndexOperation.cpp index c62f6962e25..83de6d9ef87 100644 --- a/ndb/src/ndbapi/NdbIndexOperation.cpp +++ b/ndb/src/ndbapi/NdbIndexOperation.cpp @@ -87,7 +87,19 @@ NdbIndexOperation::indxInit(const NdbIndexImpl * anIndex, int NdbIndexOperation::readTuple(NdbOperation::LockMode lm) { - return NdbOperation::readTuple(lm); + switch(lm) { + case LM_Read: + return readTuple(); + break; + case LM_Exclusive: + return readTupleExclusive(); + break; + case LM_CommittedRead: + return readTuple(); + break; + default: + return -1; + }; } int NdbIndexOperation::readTuple() @@ -108,21 +120,21 @@ int NdbIndexOperation::simpleRead() { // First check that index is unique - return NdbOperation::simpleRead(); + return NdbOperation::readTuple(); } int NdbIndexOperation::dirtyRead() { // First check that index is unique - return NdbOperation::dirtyRead(); + return NdbOperation::readTuple(); } int NdbIndexOperation::committedRead() { // First check that index is unique - return NdbOperation::committedRead(); + return NdbOperation::readTuple(); } int NdbIndexOperation::updateTuple() @@ -536,7 +548,7 @@ NdbIndexOperation::prepareSend(Uint32 aTC_ConnectPtr, Uint64 aTransactionId) //------------------------------------------------------------- Uint8 tReadInd = (theOperationType == ReadRequest); Uint8 tSimpleState = tReadInd & tSimpleAlt; - theNdbCon->theSimpleState = tSimpleState; + //theNdbCon->theSimpleState = tSimpleState; tcIndxReq->transId1 = tTransId1; tcIndxReq->transId2 = tTransId2; @@ -723,23 +735,10 @@ NdbIndexOperation::receiveTCINDXREF( NdbApiSignal* aSignal) theStatus = Finished; theNdbCon->theReturnStatus = NdbConnection::ReturnFailure; - //--------------------------------------------------------------------------// - // If the transaction this operation belongs to consists only of simple reads - // we set the error code on the transaction object. - // If the transaction consists of other types of operations we set - // the error code only on the operation since the simple read is not really - // part of this transaction and we can not decide the status of the whole - // transaction based on this operation. - //--------------------------------------------------------------------------// Uint32 errorCode = tcIndxRef->errorCode; - if (theNdbCon->theSimpleState == 0) { - theError.code = errorCode; - theNdbCon->setOperationErrorCodeAbort(errorCode); - return theNdbCon->OpCompleteFailure(); - } else { - theError.code = errorCode; - return theNdbCon->OpCompleteSuccess(); - } + theError.code = errorCode; + theNdbCon->setOperationErrorCodeAbort(errorCode); + return theNdbCon->OpCompleteFailure(theNdbCon->m_abortOption); }//NdbIndexOperation::receiveTCINDXREF() diff --git a/ndb/src/ndbapi/NdbOperation.cpp b/ndb/src/ndbapi/NdbOperation.cpp index 53a94d98a5a..b0b95d0ff43 100644 --- a/ndb/src/ndbapi/NdbOperation.cpp +++ b/ndb/src/ndbapi/NdbOperation.cpp @@ -78,7 +78,6 @@ NdbOperation::NdbOperation(Ndb* aNdb) : m_tcReqGSN(GSN_TCKEYREQ), m_keyInfoGSN(GSN_KEYINFO), m_attrInfoGSN(GSN_ATTRINFO), - theBoundATTRINFO(NULL), theBlobList(NULL) { theReceiver.init(NdbReceiver::NDB_OPERATION, this); @@ -167,7 +166,6 @@ NdbOperation::init(const NdbTableImpl* tab, NdbConnection* myConnection){ theScanInfo = 0; theTotalNrOfKeyWordInSignal = 8; theMagicNumber = 0xABCDEF01; - theBoundATTRINFO = NULL; theBlobList = NULL; tSignal = theNdb->getSignal(); @@ -263,14 +261,6 @@ NdbOperation::release() tSubroutine = tSubroutine->theNext; theNdb->releaseNdbSubroutine(tSaveSubroutine); } - tSignal = theBoundATTRINFO; - while (tSignal != NULL) - { - tSaveSignal = tSignal; - tSignal = tSignal->next(); - theNdb->releaseSignal(tSaveSignal); - } - theBoundATTRINFO = NULL; } tBlob = theBlobList; while (tBlob != NULL) diff --git a/ndb/src/ndbapi/NdbOperationDefine.cpp b/ndb/src/ndbapi/NdbOperationDefine.cpp index 1cbfedd21b1..35abb15b00d 100644 --- a/ndb/src/ndbapi/NdbOperationDefine.cpp +++ b/ndb/src/ndbapi/NdbOperationDefine.cpp @@ -55,6 +55,7 @@ NdbOperation::insertTuple() theOperationType = InsertRequest; tNdbCon->theSimpleState = 0; theErrorLine = tErrorLine++; + theLockMode = LM_Exclusive; return 0; } else { setErrorCode(4200); @@ -74,6 +75,7 @@ NdbOperation::updateTuple() tNdbCon->theSimpleState = 0; theOperationType = UpdateRequest; theErrorLine = tErrorLine++; + theLockMode = LM_Exclusive; return 0; } else { setErrorCode(4200); @@ -93,6 +95,7 @@ NdbOperation::writeTuple() tNdbCon->theSimpleState = 0; theOperationType = WriteRequest; theErrorLine = tErrorLine++; + theLockMode = LM_Exclusive; return 0; } else { setErrorCode(4200); @@ -113,8 +116,10 @@ NdbOperation::readTuple(NdbOperation::LockMode lm) return readTupleExclusive(); break; case LM_CommittedRead: - return readTuple(); + return committedRead(); break; + default: + return -1; }; } /****************************************************************************** @@ -130,6 +135,7 @@ NdbOperation::readTuple() tNdbCon->theSimpleState = 0; theOperationType = ReadRequest; theErrorLine = tErrorLine++; + theLockMode = LM_Read; return 0; } else { setErrorCode(4200); @@ -150,6 +156,7 @@ NdbOperation::deleteTuple() tNdbCon->theSimpleState = 0; theOperationType = DeleteRequest; theErrorLine = tErrorLine++; + theLockMode = LM_Exclusive; return 0; } else { setErrorCode(4200); @@ -170,6 +177,7 @@ NdbOperation::readTupleExclusive() tNdbCon->theSimpleState = 0; theOperationType = ReadExclusive; theErrorLine = tErrorLine++; + theLockMode = LM_Exclusive; return 0; } else { setErrorCode(4200); @@ -183,17 +191,24 @@ NdbOperation::readTupleExclusive() int NdbOperation::simpleRead() { + /** + * Currently/still disabled + */ + return readTuple(); +#if 0 int tErrorLine = theErrorLine; if (theStatus == Init) { theStatus = OperationDefined; theOperationType = ReadRequest; theSimpleIndicator = 1; theErrorLine = tErrorLine++; + theLockMode = LM_Read; return 0; } else { setErrorCode(4200); return -1; }//if +#endif }//NdbOperation::simpleRead() /***************************************************************************** @@ -218,6 +233,7 @@ NdbOperation::committedRead() theSimpleIndicator = 1; theDirtyIndicator = 1; theErrorLine = tErrorLine++; + theLockMode = LM_CommittedRead; return 0; } else { setErrorCode(4200); @@ -240,6 +256,7 @@ NdbOperation::dirtyUpdate() theSimpleIndicator = 1; theDirtyIndicator = 1; theErrorLine = tErrorLine++; + theLockMode = LM_CommittedRead; return 0; } else { setErrorCode(4200); @@ -262,6 +279,7 @@ NdbOperation::dirtyWrite() theSimpleIndicator = 1; theDirtyIndicator = 1; theErrorLine = tErrorLine++; + theLockMode = LM_CommittedRead; return 0; } else { setErrorCode(4200); @@ -282,7 +300,7 @@ NdbOperation::interpretedUpdateTuple() tNdbCon->theSimpleState = 0; theOperationType = UpdateRequest; theAI_LenInCurrAI = 25; - + theLockMode = LM_Exclusive; theErrorLine = tErrorLine++; initInterpreter(); return 0; @@ -307,7 +325,7 @@ NdbOperation::interpretedDeleteTuple() theErrorLine = tErrorLine++; theAI_LenInCurrAI = 25; - + theLockMode = LM_Exclusive; initInterpreter(); return 0; } else { @@ -334,10 +352,6 @@ NdbOperation::getValue_impl(const NdbColumnImpl* tAttrInfo, char* aValue) if ((tAttrInfo != NULL) && (!tAttrInfo->m_indexOnly) && (theStatus != Init)){ - if (theStatus == SetBound) { - ((NdbIndexScanOperation*)this)->saveBoundATTRINFO(); - theStatus = GetValue; - } if (theStatus != GetValue) { if (theInterpretIndicator == 1) { if (theStatus == FinalGetValue) { diff --git a/ndb/src/ndbapi/NdbOperationExec.cpp b/ndb/src/ndbapi/NdbOperationExec.cpp index cd89f953213..f1338ae01e4 100644 --- a/ndb/src/ndbapi/NdbOperationExec.cpp +++ b/ndb/src/ndbapi/NdbOperationExec.cpp @@ -161,28 +161,17 @@ NdbOperation::prepareSend(Uint32 aTC_ConnectPtr, Uint64 aTransId) tTransId1 = (Uint32) aTransId; tTransId2 = (Uint32) (aTransId >> 32); -//------------------------------------------------------------- -// Simple is simple if simple or both start and commit is set. -//------------------------------------------------------------- -// Temporarily disable simple stuff - Uint8 tSimpleIndicator = 0; -// Uint8 tSimpleIndicator = theSimpleIndicator; + Uint8 tSimpleIndicator = theSimpleIndicator; Uint8 tCommitIndicator = theCommitIndicator; Uint8 tStartIndicator = theStartIndicator; -// if ((theNdbCon->theLastOpInList == this) && (theCommitIndicator == 0)) -// abort(); -// Temporarily disable simple stuff - Uint8 tSimpleAlt = 0; -// Uint8 tSimpleAlt = tStartIndicator & tCommitIndicator; - tSimpleIndicator = tSimpleIndicator | tSimpleAlt; + Uint8 tInterpretIndicator = theInterpretIndicator; //------------------------------------------------------------- // Simple state is set if start and commit is set and it is // a read request. Otherwise it is set to zero. //------------------------------------------------------------- Uint8 tReadInd = (theOperationType == ReadRequest); - Uint8 tSimpleState = tReadInd & tSimpleAlt; - theNdbCon->theSimpleState = tSimpleState; + Uint8 tSimpleState = tReadInd & tSimpleIndicator; tcKeyReq->transId1 = tTransId1; tcKeyReq->transId2 = tTransId2; @@ -197,7 +186,6 @@ NdbOperation::prepareSend(Uint32 aTC_ConnectPtr, Uint64 aTransId) tcKeyReq->setSimpleFlag(tReqInfo, tSimpleIndicator); tcKeyReq->setCommitFlag(tReqInfo, tCommitIndicator); tcKeyReq->setStartFlag(tReqInfo, tStartIndicator); - const Uint8 tInterpretIndicator = theInterpretIndicator; tcKeyReq->setInterpretedFlag(tReqInfo, tInterpretIndicator); Uint8 tDirtyIndicator = theDirtyIndicator; @@ -208,6 +196,9 @@ NdbOperation::prepareSend(Uint32 aTC_ConnectPtr, Uint64 aTransId) tcKeyReq->setDirtyFlag(tReqInfo, tDirtyIndicator); tcKeyReq->setOperationType(tReqInfo, tOperationType); tcKeyReq->setKeyLength(tReqInfo, tTupKeyLen); + + // A simple read is always ignore error + abortOption = tSimpleIndicator ? IgnoreError : abortOption; tcKeyReq->setAbortOption(tReqInfo, abortOption); Uint8 tDistrKeyIndicator = theDistrKeyIndicator; @@ -550,27 +541,29 @@ NdbOperation::receiveTCKEYREF( NdbApiSignal* aSignal) return -1; }//if + AbortOption ao = (AbortOption)theNdbCon->m_abortOption; + theReceiver.m_received_result_length = ~0; + theStatus = Finished; - theNdbCon->theReturnStatus = NdbConnection::ReturnFailure; - //-------------------------------------------------------------------------// - // If the transaction this operation belongs to consists only of simple reads - // we set the error code on the transaction object. - // If the transaction consists of other types of operations we set - // the error code only on the operation since the simple read is not really - // part of this transaction and we can not decide the status of the whole - // transaction based on this operation. - //-------------------------------------------------------------------------// - if (theNdbCon->theSimpleState == 0) { - theError.code = aSignal->readData(4); - theNdbCon->setOperationErrorCodeAbort(aSignal->readData(4)); - return theNdbCon->OpCompleteFailure(); - } else { - theError.code = aSignal->readData(4); - return theNdbCon->OpCompleteSuccess(); + + theError.code = aSignal->readData(4); + theNdbCon->setOperationErrorCodeAbort(aSignal->readData(4)); + + if(theOperationType != ReadRequest || !theSimpleIndicator) // not simple read + return theNdbCon->OpCompleteFailure(ao); + + /** + * If TCKEYCONF has arrived + * op has completed (maybe trans has completed) + */ + if(theReceiver.m_expected_result_length) + { + return theNdbCon->OpCompleteFailure(AbortOnError); } -}//NdbOperation::receiveTCKEYREF() + return -1; +} void diff --git a/ndb/src/ndbapi/NdbOperationInt.cpp b/ndb/src/ndbapi/NdbOperationInt.cpp index f5d334fd79a..ee7b8132cd1 100644 --- a/ndb/src/ndbapi/NdbOperationInt.cpp +++ b/ndb/src/ndbapi/NdbOperationInt.cpp @@ -216,10 +216,6 @@ int NdbOperation::initial_interpreterCheck() { if ((theInterpretIndicator == 1)) { - if (theStatus == SetBound) { - ((NdbIndexScanOperation*)this)->saveBoundATTRINFO(); - theStatus = GetValue; - } if (theStatus == ExecInterpretedValue) { return 0; // Simply continue with interpretation } else if (theStatus == GetValue) { diff --git a/ndb/src/ndbapi/NdbOperationSearch.cpp b/ndb/src/ndbapi/NdbOperationSearch.cpp index e5166fc4a82..0d3130fffd0 100644 --- a/ndb/src/ndbapi/NdbOperationSearch.cpp +++ b/ndb/src/ndbapi/NdbOperationSearch.cpp @@ -543,7 +543,8 @@ NdbOperation::getKeyFromTCREQ(Uint32* data, unsigned size) assert(m_accessTable->m_sizeOfKeysInWords == size); unsigned pos = 0; while (pos < 8 && pos < size) { - data[pos++] = theKEYINFOptr[pos]; + data[pos] = theKEYINFOptr[pos]; + pos++; } NdbApiSignal* tSignal = theFirstKEYINFO; unsigned n = 0; diff --git a/ndb/src/ndbapi/NdbReceiver.cpp b/ndb/src/ndbapi/NdbReceiver.cpp index caeeac80093..14f8d4b8440 100644 --- a/ndb/src/ndbapi/NdbReceiver.cpp +++ b/ndb/src/ndbapi/NdbReceiver.cpp @@ -22,6 +22,7 @@ #include <AttributeHeader.hpp> #include <NdbConnection.hpp> #include <TransporterFacade.hpp> +#include <signaldata/TcKeyConf.hpp> NdbReceiver::NdbReceiver(Ndb *aNdb) : theMagicNumber(0), @@ -91,7 +92,7 @@ NdbReceiver::getValue(const NdbColumnImpl* tAttrInfo, char * user_dst_ptr){ return 0; } -#define KEY_ATTR_ID (~0) +#define KEY_ATTR_ID (~(Uint32)0) void NdbReceiver::calculate_batch_size(Uint32 key_size, @@ -249,10 +250,11 @@ NdbReceiver::execTRANSID_AI(const Uint32* aDataPtr, Uint32 aLength) /** * Update m_received_result_length */ + Uint32 exp = m_expected_result_length; Uint32 tmp = m_received_result_length + aLength; m_received_result_length = tmp; - return (tmp == m_expected_result_length ? 1 : 0); + return (tmp == exp || (exp > TcKeyConf::SimpleReadBit) ? 1 : 0); } int @@ -272,3 +274,11 @@ NdbReceiver::execKEYINFO20(Uint32 info, const Uint32* aDataPtr, Uint32 aLength) return (tmp == m_expected_result_length ? 1 : 0); } + +void +NdbReceiver::setErrorCode(int code) +{ + theMagicNumber = 0; + NdbOperation* op = (NdbOperation*)getOwner(); + op->setErrorCode(code); +} diff --git a/ndb/src/ndbapi/NdbScanOperation.cpp b/ndb/src/ndbapi/NdbScanOperation.cpp index 3ff2a32d418..fd63ce96f25 100644 --- a/ndb/src/ndbapi/NdbScanOperation.cpp +++ b/ndb/src/ndbapi/NdbScanOperation.cpp @@ -32,6 +32,7 @@ #include <signaldata/ScanTab.hpp> #include <signaldata/KeyInfo.hpp> +#include <signaldata/AttrInfo.hpp> #include <signaldata/TcKeyReq.hpp> NdbScanOperation::NdbScanOperation(Ndb* aNdb) : @@ -116,10 +117,8 @@ NdbScanOperation::init(const NdbTableImpl* tab, NdbConnection* myConnection) theStatus = GetValue; theOperationType = OpenScanRequest; - - theTotalBoundAI_Len = 0; - theBoundATTRINFO = NULL; - + theNdbCon->theMagicNumber = 0xFE11DF; + return 0; } @@ -145,6 +144,7 @@ NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm, } theNdbCon->theScanningOp = this; + theLockMode = lm; bool lockExcl, lockHoldMode, readCommitted; switch(lm){ @@ -168,7 +168,7 @@ NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm, return 0; } - m_keyInfo = lockExcl; + m_keyInfo = lockExcl ? 1 : 0; bool range = false; if (m_accessTable->m_indexType == NdbDictionary::Index::OrderedIndex || @@ -181,7 +181,7 @@ NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm, } assert (m_currentTable != m_accessTable); // Modify operation state - theStatus = SetBound; + theStatus = GetValue; theOperationType = OpenRangeScanRequest; range = true; } @@ -219,8 +219,17 @@ NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm, req->transId1 = (Uint32) transId; req->transId2 = (Uint32) (transId >> 32); - getFirstATTRINFOScan(); + NdbApiSignal* tSignal = + theFirstKEYINFO; + theFirstKEYINFO = (tSignal ? tSignal : tSignal = theNdb->getSignal()); + theLastKEYINFO = tSignal; + + tSignal->setSignal(GSN_KEYINFO); + theKEYINFOptr = ((KeyInfo*)tSignal->getDataPtrSend())->keyData; + theTotalNrOfKeyWordInSignal= 0; + + getFirstATTRINFOScan(); return getResultSet(); } @@ -256,18 +265,7 @@ NdbScanOperation::fix_receivers(Uint32 parallel){ m_allocated_receivers = parallel; } - for(Uint32 i = 0; i<parallel; i++){ - m_receivers[i]->m_list_index = i; - m_prepared_receivers[i] = m_receivers[i]->getId(); - m_sent_receivers[i] = m_receivers[i]; - m_conf_receivers[i] = 0; - m_api_receivers[i] = 0; - } - - m_api_receivers_count = 0; - m_current_api_receiver = 0; - m_sent_receivers_count = parallel; - m_conf_receivers_count = 0; + reset_receivers(parallel, 0); return 0; } @@ -355,6 +353,7 @@ NdbScanOperation::getFirstATTRINFOScan() * After setBound() are done, move the accumulated ATTRINFO signals to * a separate list. Then continue with normal scan. */ +#if 0 int NdbIndexScanOperation::saveBoundATTRINFO() { @@ -401,6 +400,7 @@ NdbIndexScanOperation::saveBoundATTRINFO() } return res; } +#endif #define WAITFOR_SCAN_TIMEOUT 120000 @@ -409,14 +409,22 @@ NdbScanOperation::executeCursor(int nodeId){ NdbConnection * tCon = theNdbCon; TransporterFacade* tp = TransporterFacade::instance(); Guard guard(tp->theMutexPtr); + + Uint32 magic = tCon->theMagicNumber; Uint32 seq = tCon->theNodeSequence; + if (tp->get_node_alive(nodeId) && (tp->getNodeSequence(nodeId) == seq)) { - - if(prepareSendScan(tCon->theTCConPtr, tCon->theTransactionId) == -1) - return -1; + /** + * Only call prepareSendScan first time (incase of restarts) + * - check with theMagicNumber + */ tCon->theMagicNumber = 0x37412619; + if(magic != 0x37412619 && + prepareSendScan(tCon->theTCConPtr, tCon->theTransactionId) == -1) + return -1; + if (doSendScan(nodeId) == -1) return -1; @@ -428,7 +436,6 @@ NdbScanOperation::executeCursor(int nodeId){ TRACE_DEBUG("The node is hard dead when attempting to start a scan"); setErrorCode(4029); tCon->theReleaseOnClose = true; - abort(); } else { TRACE_DEBUG("The node is stopping when attempting to start a scan"); setErrorCode(4030); @@ -559,6 +566,8 @@ int NdbScanOperation::nextResult(bool fetchAllowed) setErrorCode(4028); // Node fail break; case -3: // send_next_scan -> return fail (set error-code self) + if(theError.code == 0) + setErrorCode(4028); // seq changed = Node fail break; } @@ -635,7 +644,7 @@ NdbScanOperation::doSend(int ProcessorId) void NdbScanOperation::closeScan() { - if(m_transConnection) do { + if(m_transConnection){ if(DEBUG_NEXT_RESULT) ndbout_c("closeScan() theError.code = %d " "m_api_receivers_count = %d " @@ -648,55 +657,8 @@ void NdbScanOperation::closeScan() TransporterFacade* tp = TransporterFacade::instance(); Guard guard(tp->theMutexPtr); - - Uint32 seq = theNdbCon->theNodeSequence; - Uint32 nodeId = theNdbCon->theDBnode; - - if(seq != tp->getNodeSequence(nodeId)){ - theNdbCon->theReleaseOnClose = true; - break; - } - - while(theError.code == 0 && m_sent_receivers_count){ - theNdb->theWaiter.m_node = nodeId; - theNdb->theWaiter.m_state = WAIT_SCAN; - int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT); - switch(return_code){ - case 0: - break; - case -1: - setErrorCode(4008); - case -2: - m_api_receivers_count = 0; - m_conf_receivers_count = 0; - m_sent_receivers_count = 0; - theNdbCon->theReleaseOnClose = true; - } - } - - if(m_api_receivers_count+m_conf_receivers_count){ - // Send close scan - send_next_scan(0, true); // Close scan - } + close_impl(tp); - /** - * wait for close scan conf - */ - while(m_sent_receivers_count+m_api_receivers_count+m_conf_receivers_count){ - theNdb->theWaiter.m_node = nodeId; - theNdb->theWaiter.m_state = WAIT_SCAN; - int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT); - switch(return_code){ - case 0: - break; - case -1: - setErrorCode(4008); - case -2: - m_api_receivers_count = 0; - m_conf_receivers_count = 0; - m_sent_receivers_count = 0; - } - } } while(0); theNdbCon->theScanningOp = 0; @@ -750,11 +712,6 @@ int NdbScanOperation::prepareSendScan(Uint32 aTC_ConnectPtr, return -1; } - if (theStatus == SetBound) { - ((NdbIndexScanOperation*)this)->saveBoundATTRINFO(); - theStatus = GetValue; - } - theErrorLine = 0; // In preapareSendInterpreted we set the sizes (word 4-8) in the @@ -766,26 +723,7 @@ int NdbScanOperation::prepareSendScan(Uint32 aTC_ConnectPtr, ((NdbIndexScanOperation*)this)->fix_get_values(); } - const Uint32 transId1 = (Uint32) (aTransactionId & 0xFFFFFFFF); - const Uint32 transId2 = (Uint32) (aTransactionId >> 32); - - if (theOperationType == OpenRangeScanRequest) { - NdbApiSignal* tSignal = theBoundATTRINFO; - do{ - tSignal->setData(aTC_ConnectPtr, 1); - tSignal->setData(transId1, 2); - tSignal->setData(transId2, 3); - tSignal = tSignal->next(); - } while (tSignal != NULL); - } theCurrentATTRINFO->setLength(theAI_LenInCurrAI); - NdbApiSignal* tSignal = theFirstATTRINFO; - do{ - tSignal->setData(aTC_ConnectPtr, 1); - tSignal->setData(transId1, 2); - tSignal->setData(transId2, 3); - tSignal = tSignal->next(); - } while (tSignal != NULL); /** * Prepare all receivers @@ -808,20 +746,28 @@ int NdbScanOperation::prepareSendScan(Uint32 aTC_ConnectPtr, req->batch_byte_size= batch_byte_size; req->first_batch_size= first_batch_size; + /** + * Set keyinfo flag + * (Always keyinfo when using blobs) + */ + Uint32 reqInfo = req->requestInfo; + ScanTabReq::setKeyinfoFlag(reqInfo, keyInfo); + req->requestInfo = reqInfo; + for(Uint32 i = 0; i<theParallelism; i++){ m_receivers[i]->do_get_value(&theReceiver, batch_size, key_size); } return 0; } -/****************************************************************************** +/***************************************************************************** int doSend() Return Value: Return >0 : send was succesful, returns number of signals sent Return -1: In all other case. Parameters: aProcessorId: Receiving processor node Remark: Sends the ATTRINFO signal(s) -******************************************************************************/ +*****************************************************************************/ int NdbScanOperation::doSendScan(int aProcessorId) { @@ -841,13 +787,18 @@ NdbScanOperation::doSendScan(int aProcessorId) setErrorCode(4001); return -1; } + + Uint32 tupKeyLen = theTupKeyLen; + Uint32 len = theTotalNrOfKeyWordInSignal; + Uint32 aTC_ConnectPtr = theNdbCon->theTCConPtr; + Uint64 transId = theNdbCon->theTransactionId; + // Update the "attribute info length in words" in SCAN_TABREQ before // sending it. This could not be done in openScan because // we created the ATTRINFO signals after the SCAN_TABREQ signal. ScanTabReq * const req = CAST_PTR(ScanTabReq, tSignal->getDataPtrSend()); - req->attrLen = theTotalCurrAI_Len; - if (theOperationType == OpenRangeScanRequest) - req->attrLen += theTotalBoundAI_Len; + req->attrLenKeyLen = (tupKeyLen << 16) | theTotalCurrAI_Len; + TransporterFacade *tp = TransporterFacade::instance(); LinearSectionPtr ptr[3]; ptr[0].p = m_prepared_receivers; @@ -856,22 +807,41 @@ NdbScanOperation::doSendScan(int aProcessorId) setErrorCode(4002); return -1; } - if (theOperationType == OpenRangeScanRequest) { + + if (tupKeyLen > 0){ // must have at least one signal since it contains attrLen for bounds - assert(theBoundATTRINFO != NULL); - tSignal = theBoundATTRINFO; - while (tSignal != NULL) { + assert(theLastKEYINFO != NULL); + tSignal = theLastKEYINFO; + tSignal->setLength(KeyInfo::HeaderLength + theTotalNrOfKeyWordInSignal); + + assert(theFirstKEYINFO != NULL); + tSignal = theFirstKEYINFO; + + NdbApiSignal* last; + do { + KeyInfo * keyInfo = CAST_PTR(KeyInfo, tSignal->getDataPtrSend()); + keyInfo->connectPtr = aTC_ConnectPtr; + keyInfo->transId[0] = Uint32(transId); + keyInfo->transId[1] = Uint32(transId >> 32); + if (tp->sendSignal(tSignal,aProcessorId) == -1){ - setErrorCode(4002); - return -1; + setErrorCode(4002); + return -1; } + tSignalCount++; + last = tSignal; tSignal = tSignal->next(); - } + } while(last != theLastKEYINFO); } tSignal = theFirstATTRINFO; while (tSignal != NULL) { + AttrInfo * attrInfo = CAST_PTR(AttrInfo, tSignal->getDataPtrSend()); + attrInfo->connectPtr = aTC_ConnectPtr; + attrInfo->transId[0] = Uint32(transId); + attrInfo->transId[1] = Uint32(transId >> 32); + if (tp->sendSignal(tSignal,aProcessorId) == -1){ setErrorCode(4002); return -1; @@ -883,7 +853,7 @@ NdbScanOperation::doSendScan(int aProcessorId) return tSignalCount; }//NdbOperation::doSendScan() -/****************************************************************************** +/***************************************************************************** * NdbOperation* takeOverScanOp(NdbConnection* updateTrans); * * Parameters: The update transactions NdbConnection pointer. @@ -902,7 +872,7 @@ NdbScanOperation::doSendScan(int aProcessorId) * This means that the updating transactions can be placed * in separate threads and thus increasing the parallelism during * the scan process. - *****************************************************************************/ + ****************************************************************************/ int NdbScanOperation::getKeyFromKEYINFO20(Uint32* data, unsigned size) { @@ -940,6 +910,7 @@ NdbScanOperation::takeOverScanOp(OperationType opType, NdbConnection* pTrans){ if (newOp == NULL){ return NULL; } + pTrans->theSimpleState = 0; const Uint32 len = (tRecAttr->attrSize() * tRecAttr->arraySize() + 3)/4-1; @@ -1011,6 +982,7 @@ NdbScanOperation::takeOverScanOp(OperationType opType, NdbConnection* pTrans){ NdbBlob* NdbScanOperation::getBlobHandle(const char* anAttrName) { + m_keyInfo = 1; return NdbOperation::getBlobHandle(m_transConnection, m_currentTable->getColumn(anAttrName)); } @@ -1018,6 +990,7 @@ NdbScanOperation::getBlobHandle(const char* anAttrName) NdbBlob* NdbScanOperation::getBlobHandle(Uint32 anAttrId) { + m_keyInfo = 1; return NdbOperation::getBlobHandle(m_transConnection, m_currentTable->getColumn(anAttrId)); } @@ -1031,13 +1004,15 @@ NdbIndexScanOperation::~NdbIndexScanOperation(){ } int -NdbIndexScanOperation::setBound(const char* anAttrName, int type, const void* aValue, Uint32 len) +NdbIndexScanOperation::setBound(const char* anAttrName, int type, + const void* aValue, Uint32 len) { return setBound(m_accessTable->getColumn(anAttrName), type, aValue, len); } int -NdbIndexScanOperation::setBound(Uint32 anAttrId, int type, const void* aValue, Uint32 len) +NdbIndexScanOperation::setBound(Uint32 anAttrId, int type, + const void* aValue, Uint32 len) { return setBound(m_accessTable->getColumn(anAttrId), type, aValue, len); } @@ -1056,11 +1031,6 @@ NdbIndexScanOperation::getValue_impl(const NdbColumnImpl* attrInfo, return NdbScanOperation::getValue_impl(attrInfo, aValue); } - if (theStatus == SetBound) { - saveBoundATTRINFO(); - theStatus = GetValue; - } - int id = attrInfo->m_attrId; // In "real" table assert(m_accessTable->m_index); int sz = (int)m_accessTable->m_index->m_key_ids.size(); @@ -1101,12 +1071,13 @@ NdbIndexScanOperation::setBound(const NdbColumnImpl* tAttrInfo, int type, const void* aValue, Uint32 len) { if (theOperationType == OpenRangeScanRequest && - theStatus == SetBound && (0 <= type && type <= 4) && len <= 8000) { // insert bound type - insertATTRINFO(type); + Uint32 currLen = theTotalNrOfKeyWordInSignal; + Uint32 remaining = KeyInfo::DataLength - currLen; Uint32 sizeInBytes = tAttrInfo->m_attrSize * tAttrInfo->m_arraySize; + // normalize char bound CHARSET_INFO* cs = tAttrInfo->m_cs; Uint32 xfrmData[2000]; @@ -1130,19 +1101,34 @@ NdbIndexScanOperation::setBound(const NdbColumnImpl* tAttrInfo, Uint32 tIndexAttrId = tAttrInfo->m_attrId; Uint32 sizeInWords = (len + 3) / 4; AttributeHeader ah(tIndexAttrId, sizeInWords); - insertATTRINFO(ah.m_value); - if (len != 0) { - // insert attribute data - if ((UintPtr(aValue) & 0x3) == 0 && (len & 0x3) == 0) - insertATTRINFOloop((const Uint32*)aValue, sizeInWords); - else { - Uint32 tempData[2000]; - memcpy(tempData, aValue, len); + const Uint32 ahValue = ah.m_value; + + const bool aligned = (UintPtr(aValue) & 3) == 0; + const bool nobytes = (len & 0x3) == 0; + const Uint32 totalLen = 2 + sizeInWords; + Uint32 tupKeyLen = theTupKeyLen; + if(remaining > totalLen && aligned && nobytes){ + Uint32 * dst = theKEYINFOptr + currLen; + * dst ++ = type; + * dst ++ = ahValue; + memcpy(dst, aValue, 4 * sizeInWords); + theTotalNrOfKeyWordInSignal = currLen + totalLen; + } else { + if(!aligned || !nobytes){ + Uint32 tempData[2002]; + tempData[0] = type; + tempData[1] = ahValue; + memcpy(tempData+2, aValue, len); while ((len & 0x3) != 0) - ((char*)tempData)[len++] = 0; - insertATTRINFOloop(tempData, sizeInWords); + ((char*)&tempData[2])[len++] = 0; + insertBOUNDS(tempData, 2+sizeInWords); + } else { + Uint32 buf[2] = { type, ahValue }; + insertBOUNDS(buf, 2); + insertBOUNDS((Uint32*)aValue, sizeInWords); } } + theTupKeyLen = tupKeyLen + totalLen; /** * Do sorted stuff @@ -1165,6 +1151,46 @@ NdbIndexScanOperation::setBound(const NdbColumnImpl* tAttrInfo, } } +int +NdbIndexScanOperation::insertBOUNDS(Uint32 * data, Uint32 sz){ + Uint32 len; + Uint32 remaining = KeyInfo::DataLength - theTotalNrOfKeyWordInSignal; + Uint32 * dst = theKEYINFOptr + theTotalNrOfKeyWordInSignal; + do { + len = (sz < remaining ? sz : remaining); + memcpy(dst, data, 4 * len); + + if(sz >= remaining){ + NdbApiSignal* tCurr = theLastKEYINFO; + tCurr->setLength(KeyInfo::MaxSignalLength); + NdbApiSignal* tSignal = tCurr->next(); + if(tSignal) + ; + else if((tSignal = theNdb->getSignal()) != 0) + { + tCurr->next(tSignal); + tSignal->setSignal(GSN_KEYINFO); + } else { + goto error; + } + theLastKEYINFO = tSignal; + theKEYINFOptr = dst = ((KeyInfo*)tSignal->getDataPtrSend())->keyData; + remaining = KeyInfo::DataLength; + sz -= len; + data += len; + } else { + len = (KeyInfo::DataLength - remaining) + len; + break; + } + } while(true); + theTotalNrOfKeyWordInSignal = len; + return 0; + +error: + setErrorCodeAbort(4228); // XXX wrong code + return -1; +} + NdbResultSet* NdbIndexScanOperation::readTuples(LockMode lm, Uint32 batch, @@ -1173,9 +1199,23 @@ NdbIndexScanOperation::readTuples(LockMode lm, NdbResultSet * rs = NdbScanOperation::readTuples(lm, batch, 0); if(rs && order_by){ m_ordered = 1; - m_sort_columns = m_accessTable->getNoOfColumns() - 1; // -1 for NDB$NODE + Uint32 cnt = m_accessTable->getNoOfColumns() - 1; + m_sort_columns = cnt; // -1 for NDB$NODE m_current_api_receiver = m_sent_receivers_count; m_api_receivers_count = m_sent_receivers_count; + + m_sort_columns = cnt; + for(Uint32 i = 0; i<cnt; i++){ + const NdbColumnImpl* key = m_accessTable->m_index->m_columns[i]; + const NdbColumnImpl* col = m_currentTable->getColumn(key->m_keyInfoPos); + NdbRecAttr* tmp = NdbScanOperation::getValue_impl(col, (char*)-1); + UintPtr newVal = UintPtr(tmp); + theTupleKeyDefined[i][0] = FAKE_PTR; + theTupleKeyDefined[i][1] = (newVal & 0xFFFFFFFF); +#if (SIZEOF_CHARP == 8) + theTupleKeyDefined[i][2] = (newVal >> 32); +#endif + } } return rs; } @@ -1396,10 +1436,7 @@ NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx){ } int -NdbScanOperation::restart(){ - TransporterFacade* tp = TransporterFacade::instance(); - Guard guard(tp->theMutexPtr); - +NdbScanOperation::close_impl(TransporterFacade* tp){ Uint32 seq = theNdbCon->theNodeSequence; Uint32 nodeId = theNdbCon->theDBnode; @@ -1407,8 +1444,8 @@ NdbScanOperation::restart(){ theNdbCon->theReleaseOnClose = true; return -1; } - - while(m_sent_receivers_count){ + + while(theError.code == 0 && m_sent_receivers_count){ theNdb->theWaiter.m_node = nodeId; theNdb->theWaiter.m_state = WAIT_SCAN; int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT); @@ -1421,14 +1458,17 @@ NdbScanOperation::restart(){ m_api_receivers_count = 0; m_conf_receivers_count = 0; m_sent_receivers_count = 0; + theNdbCon->theReleaseOnClose = true; return -1; } } if(m_api_receivers_count+m_conf_receivers_count){ // Send close scan - if(send_next_scan(0, true) == -1) // Close scan + if(send_next_scan(0, true) == -1){ // Close scan + theNdbCon->theReleaseOnClose = true; return -1; + } } /** @@ -1447,15 +1487,15 @@ NdbScanOperation::restart(){ m_api_receivers_count = 0; m_conf_receivers_count = 0; m_sent_receivers_count = 0; + theNdbCon->theReleaseOnClose = true; return -1; } } + return 0; +} - /** - * Reset receivers - */ - const Uint32 parallell = theParallelism; - +void +NdbScanOperation::reset_receivers(Uint32 parallell, Uint32 ordered){ for(Uint32 i = 0; i<parallell; i++){ m_receivers[i]->m_list_index = i; m_prepared_receivers[i] = m_receivers[i]->getId(); @@ -1470,13 +1510,64 @@ NdbScanOperation::restart(){ m_sent_receivers_count = parallell; m_conf_receivers_count = 0; - if(m_ordered){ + if(ordered){ m_current_api_receiver = parallell; m_api_receivers_count = parallell; } +} + +int +NdbScanOperation::restart() +{ + + TransporterFacade* tp = TransporterFacade::instance(); + Guard guard(tp->theMutexPtr); + Uint32 nodeId = theNdbCon->theDBnode; + + { + int res; + if((res= close_impl(tp))) + { + return res; + } + } + + /** + * Reset receivers + */ + reset_receivers(theParallelism, m_ordered); + theError.code = 0; if (doSendScan(nodeId) == -1) return -1; return 0; } + +int +NdbIndexScanOperation::reset_bounds(){ + int res; + + { + TransporterFacade* tp = TransporterFacade::instance(); + Guard guard(tp->theMutexPtr); + res= close_impl(tp); + } + + if(!res) + { + theError.code = 0; + reset_receivers(theParallelism, m_ordered); + + theLastKEYINFO = theFirstKEYINFO; + theKEYINFOptr = ((KeyInfo*)theFirstKEYINFO->getDataPtrSend())->keyData; + theTupKeyLen = 0; + theTotalNrOfKeyWordInSignal = 0; + m_transConnection + ->remove_list((NdbOperation*&)m_transConnection->m_firstExecutedScanOp, + this); + m_transConnection->define_scan_op(this); + return 0; + } + return res; +} diff --git a/ndb/src/ndbapi/Ndbif.cpp b/ndb/src/ndbapi/Ndbif.cpp index 5c91467f2e5..c011c1a6a26 100644 --- a/ndb/src/ndbapi/Ndbif.cpp +++ b/ndb/src/ndbapi/Ndbif.cpp @@ -249,6 +249,7 @@ Ndb::report_node_failure(Uint32 node_id) */ the_release_ind[node_id] = 1; theWaiter.nodeFail(node_id); + return; }//Ndb::report_node_failure() @@ -271,9 +272,10 @@ Ndb::abortTransactionsAfterNodeFailure(Uint16 aNodeId) Uint32 tNoSentTransactions = theNoOfSentTransactions; for (int i = tNoSentTransactions - 1; i >= 0; i--) { NdbConnection* localCon = theSentTransactionsArray[i]; - if (localCon->getConnectedNodeId() == aNodeId ) { + if (localCon->getConnectedNodeId() == aNodeId) { const NdbConnection::SendStatusType sendStatus = localCon->theSendStatus; - if (sendStatus == NdbConnection::sendTC_OP || sendStatus == NdbConnection::sendTC_COMMIT) { + if (sendStatus == NdbConnection::sendTC_OP || + sendStatus == NdbConnection::sendTC_COMMIT) { /* A transaction was interrupted in the prepare phase by a node failure. Since the transaction was not found in the phase @@ -293,7 +295,7 @@ Ndb::abortTransactionsAfterNodeFailure(Uint16 aNodeId) printState("abortTransactionsAfterNodeFailure %x", this); abort(); #endif - }// + } /* All transactions arriving here have no connection to the kernel intact since the node was failing and they were aborted. Thus we @@ -302,7 +304,11 @@ Ndb::abortTransactionsAfterNodeFailure(Uint16 aNodeId) localCon->theCommitStatus = NdbConnection::Aborted; localCon->theReleaseOnClose = true; completedTransaction(localCon); - }//if + } + else if(localCon->report_node_failure(aNodeId)) + { + completedTransaction(localCon); + } }//for return; }//Ndb::abortTransactionsAfterNodeFailure() diff --git a/ndb/src/ndbapi/Ndbinit.cpp b/ndb/src/ndbapi/Ndbinit.cpp index 8589158ae6a..b8927f2abba 100644 --- a/ndb/src/ndbapi/Ndbinit.cpp +++ b/ndb/src/ndbapi/Ndbinit.cpp @@ -107,8 +107,6 @@ void Ndb::setup(Ndb_cluster_connection *ndb_cluster_connection, theOpIdleList= NULL; theScanOpIdleList= NULL; theIndexOpIdleList= NULL; -// theSchemaConIdleList= NULL; -// theSchemaConToNdbList= NULL; theTransactionList= NULL; theConnectionArray= NULL; theRecAttrIdleList= NULL; @@ -134,10 +132,13 @@ void Ndb::setup(Ndb_cluster_connection *ndb_cluster_connection, fullyQualifiedNames = true; +#ifdef POORMANSPURIFY cgetSignals =0; cfreeSignals = 0; cnewSignals = 0; creleaseSignals = 0; +#endif + theError.code = 0; theNdbObjectIdMap = new NdbObjectIdMap(1024,1024); @@ -159,12 +160,12 @@ void Ndb::setup(Ndb_cluster_connection *ndb_cluster_connection, theLastTupleId[i] = 0; }//for - snprintf(theDataBase, sizeof(theDataBase), "%s", + BaseString::snprintf(theDataBase, sizeof(theDataBase), "%s", aDataBase ? aDataBase : ""); - snprintf(theDataBaseSchema, sizeof(theDataBaseSchema), "%s", + BaseString::snprintf(theDataBaseSchema, sizeof(theDataBaseSchema), "%s", aSchema ? aSchema : ""); - int len = snprintf(prefixName, sizeof(prefixName), "%s%c%s%c", + int len = BaseString::snprintf(prefixName, sizeof(prefixName), "%s%c%s%c", theDataBase, table_name_separator, theDataBaseSchema, table_name_separator); prefixEnd = prefixName + (len < (int) sizeof(prefixName) ? len : diff --git a/ndb/src/ndbapi/Ndblist.cpp b/ndb/src/ndbapi/Ndblist.cpp index af98aa09280..a5f2a4801d5 100644 --- a/ndb/src/ndbapi/Ndblist.cpp +++ b/ndb/src/ndbapi/Ndblist.cpp @@ -33,7 +33,7 @@ Ndb::checkFailedNode() DBUG_PRINT("enter", ("theNoOfDBnodes: %d", theNoOfDBnodes)); DBUG_ASSERT(theNoOfDBnodes < MAX_NDB_NODES); - for (int i = 0; i < theNoOfDBnodes; i++){ + for (Uint32 i = 0; i < theNoOfDBnodes; i++){ const NodeId node_id = theDBnodes[i]; DBUG_PRINT("info", ("i: %d, node_id: %d", i, node_id)); @@ -432,11 +432,15 @@ Ndb::getSignal() theSignalIdleList = tSignalNext; } else { tSignal = new NdbApiSignal(theMyRef); +#ifdef POORMANSPURIFY cnewSignals++; +#endif if (tSignal != NULL) tSignal->next(NULL); } +#ifdef POORMANSPURIFY cgetSignals++; +#endif return tSignal; } @@ -605,7 +609,9 @@ Ndb::releaseSignal(NdbApiSignal* aSignal) } #endif #endif +#ifdef POORMANSPURIFY creleaseSignals++; +#endif aSignal->next(theSignalIdleList); theSignalIdleList = aSignal; } @@ -649,8 +655,8 @@ Remark: Always release the first item in the free list void Ndb::freeScanOperation() { - NdbScanOperation* tOp = theScanOpIdleList; - theScanOpIdleList = (NdbIndexScanOperation *) theScanOpIdleList->next(); + NdbIndexScanOperation* tOp = theScanOpIdleList; + theScanOpIdleList = (NdbIndexScanOperation *)tOp->next(); delete tOp; } @@ -769,7 +775,9 @@ Ndb::freeSignal() NdbApiSignal* tSignal = theSignalIdleList; theSignalIdleList = tSignal->next(); delete tSignal; +#ifdef POORMANSPURIFY cfreeSignals++; +#endif } void diff --git a/ndb/src/ndbapi/ndberror.c b/ndb/src/ndbapi/ndberror.c index fdfd8a15fb0..20661b89517 100644 --- a/ndb/src/ndbapi/ndberror.c +++ b/ndb/src/ndbapi/ndberror.c @@ -16,7 +16,6 @@ #include <ndb_global.h> - #include <ndberror.h> typedef struct ErrorBundle { @@ -92,9 +91,10 @@ ErrorBundle ErrorCodes[] = { { 4031, NR, "Node failure caused abort of transaction" }, { 4033, NR, "Send to NDB failed" }, { 4115, NR, - "Transaction was committed but all read information was not " - "received due to node crash" }, - + "Transaction was committed but all read information was not " + "received due to node crash" }, + { 4119, NR, "Simple/dirty read failed due to node failure" }, + /** * Node shutdown */ @@ -171,7 +171,7 @@ ErrorBundle ErrorCodes[] = { { 677, OL, "Index UNDO buffers overloaded" }, { 891, OL, "Data UNDO buffers overloaded" }, { 1221, OL, "REDO log buffers overloaded" }, - { 4006, AE, "Connect failure - out of connection objects" }, + { 4006, OL, "Connect failure - out of connection objects" }, @@ -491,6 +491,7 @@ static const int NbClassification = sizeof(StatusClassificationMapping)/sizeof(ErrorStatusClassification); +#ifdef NOT_USED /** * Complete all fields of an NdbError given the error code * and details @@ -506,7 +507,7 @@ set(ndberror_struct * error, int code, const char * details, ...){ va_end(ap); } } - +#endif void ndberror_update(ndberror_struct * error){ @@ -592,8 +593,10 @@ int ndb_error_string(int err_no, char *str, unsigned int size) error.code = err_no; ndberror_update(&error); - len = snprintf(str, size-1, "%s: %s: %s", error.message, - ndberror_status_message(error.status), ndberror_classification_message(error.classification)); + len = + snprintf(str, size-1, "%s: %s: %s", error.message, + ndberror_status_message(error.status), + ndberror_classification_message(error.classification)); str[size-1]= '\0'; return len; |