diff options
Diffstat (limited to 'ndb/src/ndbapi/Ndbif.cpp')
-rw-r--r-- | ndb/src/ndbapi/Ndbif.cpp | 1356 |
1 files changed, 1356 insertions, 0 deletions
diff --git a/ndb/src/ndbapi/Ndbif.cpp b/ndb/src/ndbapi/Ndbif.cpp new file mode 100644 index 00000000000..e334c1bcc39 --- /dev/null +++ b/ndb/src/ndbapi/Ndbif.cpp @@ -0,0 +1,1356 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + + +#include "NdbApiSignal.hpp" +#include "AttrType.hpp" +#include "NdbImpl.hpp" +#include "NdbSchemaOp.hpp" +#include "NdbSchemaCon.hpp" +#include "NdbOperation.hpp" +#include "NdbIndexOperation.hpp" +#include "NdbScanReceiver.hpp" +#include "NdbConnection.hpp" +#include "NdbRecAttr.hpp" +#include "NdbReceiver.hpp" +#include "API.hpp" + +#include <signaldata/TcCommit.hpp> +#include <signaldata/TcKeyFailConf.hpp> +#include <signaldata/TcKeyConf.hpp> +#include <signaldata/TestOrd.hpp> +#include <signaldata/CreateIndx.hpp> +#include <signaldata/DropIndx.hpp> +#include <signaldata/TcIndx.hpp> + +#include <ndb_limits.h> +#include <NdbOut.hpp> +#include <NdbTick.h> + +#include <assert.h> + +/****************************************************************************** + * int init( int aNrOfCon, int aNrOfOp ); + * + * Return Value: Return 0 : init was successful. + * Return -1: In all other case. + * Parameters: aNrOfCon : Number of connections offered to the application. + * aNrOfOp : Number of operations offered to the application. + * Remark: Create pointers and idle list Synchronous. + ****************************************************************************/ +int +Ndb::init(int aMaxNoOfTransactions) +{ + int i; + int aNrOfCon; + int aNrOfOp; + int tMaxNoOfTransactions; + NdbApiSignal* tSignal[16]; // Initiate free list of 16 signal objects + if (theInitState != NotInitialised) { + switch(theInitState){ + case InitConfigError: + theError.code = 4117; + break; + default: + theError.code = 4104; + break; + } + return -1; + }//if + theInitState = StartingInit; + TransporterFacade * theFacade = TransporterFacade::instance(); + theFacade->lock_mutex(); + + const int tBlockNo = theFacade->open(this, + executeMessage, + statusMessage); + + + if ( tBlockNo == -1 ) { + theError.code = 4105; + theFacade->unlock_mutex(); + return -1; // no more free blocknumbers + }//if + + theNdbBlockNumber = tBlockNo; + + theNode = theFacade->ownId(); + theMyRef = numberToRef(theNdbBlockNumber, theNode); + + for (i = 1; i < MAX_NDB_NODES; i++){ + if (theFacade->getIsNodeDefined(i)){ + theDBnodes[theNoOfDBnodes] = i; + theNoOfDBnodes++; + } + } + + theFirstTransId = ((Uint64)theNdbBlockNumber << 52)+((Uint64)theNode << 40); + theFacade->unlock_mutex(); + + + theDictionary = new NdbDictionaryImpl(*this); + if (theDictionary == NULL) { + theError.code = 4000; + return -1; + } + theDictionary->setTransporter(this, theFacade); + + aNrOfCon = theNoOfDBnodes; + aNrOfOp = 2*theNoOfDBnodes; + + // Create connection object in a linked list + if((createConIdleList(aNrOfCon)) == -1){ + theError.code = 4000; + goto error_handler; + } + + // Create operations in a linked list + if((createOpIdleList(aNrOfOp)) == -1){ + theError.code = 4000; + goto error_handler; + } + + tMaxNoOfTransactions = aMaxNoOfTransactions * 3; + if (tMaxNoOfTransactions > 1024) { + tMaxNoOfTransactions = 1024; + }//if + theMaxNoOfTransactions = tMaxNoOfTransactions; + + thePreparedTransactionsArray = new NdbConnection* [tMaxNoOfTransactions]; + theSentTransactionsArray = new NdbConnection* [tMaxNoOfTransactions]; + theCompletedTransactionsArray = new NdbConnection* [tMaxNoOfTransactions]; + + if ((thePreparedTransactionsArray == NULL) || + (theSentTransactionsArray == NULL) || + (theCompletedTransactionsArray == NULL)) { + goto error_handler; + }//if + + for (i = 0; i < tMaxNoOfTransactions; i++) { + thePreparedTransactionsArray[i] = NULL; + theSentTransactionsArray[i] = NULL; + theCompletedTransactionsArray[i] = NULL; + }//for + + startTransactionNodeSelectionData.init(theNoOfDBnodes, theDBnodes); + + for (i = 0; i < 16; i++){ + tSignal[i] = getSignal(); + if(tSignal[i] == NULL) { + theError.code = 4000; + goto error_handler; + } + } + for (i = 0; i < 16; i++) + releaseSignal(tSignal[i]); + + theInitState = Initialised; + + theCommitAckSignal = new NdbApiSignal(theMyRef); + return 0; + +error_handler: + ndbout << "error_handler" << endl; + releaseTransactionArrays(); + while ( theConIdleList != NULL ) + freeNdbCon(); + while ( theSignalIdleList != NULL ) + freeSignal(); + while (theRecAttrIdleList != NULL) + freeRecAttr(); + while (theOpIdleList != NULL) + freeOperation(); + + delete theDictionary; + TransporterFacade::instance()->close(theNdbBlockNumber); + return -1; +} + +void +Ndb::releaseTransactionArrays() +{ + if (thePreparedTransactionsArray != NULL) { + delete [] thePreparedTransactionsArray; + }//if + if (theSentTransactionsArray != NULL) { + delete [] theSentTransactionsArray; + }//if + if (theCompletedTransactionsArray != NULL) { + delete [] theCompletedTransactionsArray; + }//if +}//Ndb::releaseTransactionArrays() + +void +Ndb::executeMessage(void* NdbObject, + NdbApiSignal * aSignal, + LinearSectionPtr ptr[3]) +{ + Ndb* tNdb = (Ndb*)NdbObject; + tNdb->handleReceivedSignal(aSignal, ptr); +} + +void +Ndb::statusMessage(void* NdbObject, NodeId a_node, bool alive, bool nfComplete) +{ + Ndb* tNdb = (Ndb*)NdbObject; + if (alive) { + if (nfComplete) { + assert(0); + }//if + } else { + if (nfComplete) { + tNdb->report_node_failure_completed(a_node); + } else { + tNdb->report_node_failure(a_node); + }//if + }//if + NdbDictInterface::execNodeStatus(&tNdb->theDictionary->m_receiver, + a_node, alive, nfComplete); +} + +void +Ndb::report_node_failure(Uint32 node_id) +{ + /** + * We can only set the state here since this object can execute + * simultaneously. + * + * This method is only called by ClusterMgr (via lots of methods) + */ + the_release_ind[node_id] = 1; + theWaiter.nodeFail(node_id); +}//Ndb::report_node_failure() + + +void +Ndb::report_node_failure_completed(Uint32 node_id) +{ + abortTransactionsAfterNodeFailure(node_id); + +}//Ndb::report_node_failure_completed() + +/*************************************************************************** +void abortTransactionsAfterNodeFailure(); + +Remark: Abort all transactions in theSentTransactionsArray after connection + to one node has failed +****************************************************************************/ +void +Ndb::abortTransactionsAfterNodeFailure(Uint16 aNodeId) +{ + Uint32 tNoSentTransactions = theNoOfSentTransactions; + for (int i = tNoSentTransactions - 1; i >= 0; i--) { + NdbConnection* localCon = theSentTransactionsArray[i]; + if (localCon->getConnectedNodeId() == aNodeId ) { + const SendStatusType sendStatus = localCon->theSendStatus; + if (sendStatus == sendTC_OP || sendStatus == sendTC_COMMIT) { + /* + A transaction was interrupted in the prepare phase by a node + failure. Since the transaction was not found in the phase + after the node failure it cannot have been committed and + we report a normal node failure abort. + */ + localCon->setOperationErrorCodeAbort(4010); + localCon->theCompletionStatus = CompletedFailure; + } else if (sendStatus == sendTC_ROLLBACK) { + /* + We aimed for abort and abort we got even if it was by a node + failure. We will thus report it as a success. + */ + localCon->theCompletionStatus = CompletedSuccess; + } else { +#ifdef VM_TRACE + printState("abortTransactionsAfterNodeFailure %x", this); + abort(); +#endif + }// + /* + All transactions arriving here have no connection to the kernel + intact since the node was failing and they were aborted. Thus we + set commit state to Aborted and set state to release on close. + */ + localCon->theCommitStatus = Aborted; + localCon->theReleaseOnClose = true; + completedTransaction(localCon); + }//if + }//for + return; +}//Ndb::abortTransactionsAfterNodeFailure() + +/**************************************************************************** +void handleReceivedSignal(NdbApiSignal* aSignal); + +Parameters: aSignal: The signal object. +Remark: Send all operations belonging to this connection. +*****************************************************************************/ +void +Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3]) +{ + NdbOperation* tOp; + NdbIndexOperation* tIndexOp; + NdbConnection* tCon; + int tReturnCode; + const Uint32* tDataPtr = aSignal->getDataPtr(); + const Uint32 tWaitState = theWaiter.m_state; + const Uint32 tSignalNumber = aSignal->readSignalNumber(); + const Uint32 tFirstData = *tDataPtr; + + /* + In order to support 64 bit processes in the application we need to use + id's rather than a direct pointer to the object used. It is also a good + idea that one cannot corrupt the application code by sending a corrupt + memory pointer. + + All signals received by the API requires the first data word to be such + an id to the receiving object. + */ + + switch (tSignalNumber){ + case GSN_TCKEYCONF: + { + void* tFirstDataPtr = int2void(tFirstData); + if (tFirstDataPtr == 0) goto InvalidSignal; + + const TcKeyConf * const keyConf = (TcKeyConf *)tDataPtr; + const BlockReference aTCRef = aSignal->theSendersBlockRef; + + tCon = void2con(tFirstDataPtr); + if ((tCon->checkMagicNumber() == 0) && + (tCon->theSendStatus == sendTC_OP)) { + tReturnCode = tCon->receiveTCKEYCONF(keyConf, aSignal->getLength()); + if (tReturnCode != -1) { + completedTransaction(tCon); + }//if + + if(TcKeyConf::getMarkerFlag(keyConf->confInfo)){ + NdbConnection::sendTC_COMMIT_ACK(theCommitAckSignal, + keyConf->transId1, + keyConf->transId2, + aTCRef); + } + + return; + }//if + goto InvalidSignal; + + return; + } + case GSN_READCONF: + { + void* tFirstDataPtr = int2void(tFirstData); + if (tFirstDataPtr == 0) goto InvalidSignal; + + tOp = void2rec_op(tFirstDataPtr); + if (tOp->checkMagicNumber() == 0) { + tCon = tOp->theNdbCon; + if (tCon != NULL) { + if (tCon->theSendStatus == sendTC_OP) { + tReturnCode = tOp->receiveREAD_CONF(tDataPtr, + aSignal->getLength()); + if (tReturnCode != -1) { + completedTransaction(tCon); + }//if + }//if + }//if + }//if + return; + } + case GSN_TRANSID_AI: + { + void* tFirstDataPtr = int2void(tFirstData); + if (tFirstDataPtr == 0) goto InvalidSignal; + + // ndbout << "*** GSN_TRANSID_AI ***" << endl; + NdbReceiver* tRec = void2rec(tFirstDataPtr); + if (tRec->getType() == NdbReceiver::NDB_OPERATION){ + // tOp = (NdbOperation*)tRec->getOwner(); + tOp = void2rec_op(tFirstDataPtr); + // ndbout << "NDB_OPERATION" << endl; + if (tOp->checkMagicNumber() == 0) { + tCon = tOp->theNdbCon; + if (tCon != NULL) { + if (tCon->theSendStatus == sendTC_OP) { + tReturnCode = tOp->receiveTRANSID_AI(tDataPtr, + aSignal->getLength()); + if (tReturnCode != -1) { + completedTransaction(tCon); + break; + } + } + } + } + } else if (tRec->getType() == NdbReceiver::NDB_INDEX_OPERATION){ + // tOp = (NdbIndexOperation*)tRec->getOwner(); + tOp = void2rec_iop(tFirstDataPtr); + // ndbout << "NDB_INDEX_OPERATION" << endl; + if (tOp->checkMagicNumber() == 0) { + tCon = tOp->theNdbCon; + if (tCon != NULL) { + if (tCon->theSendStatus == sendTC_OP) { + tReturnCode = tOp->receiveTRANSID_AI(tDataPtr, + aSignal->getLength()); + if (tReturnCode != -1) { + completedTransaction(tCon); + break; + } + } + } + } + } else if (tRec->getType() == NdbReceiver::NDB_SCANRECEIVER) { + // NdbScanReceiver* tScanRec = (NdbScanReceiver*)tRec->getOwner(); + // NdbScanReceiver* tScanRec = + // (NdbScanReceiver*)(void2rec(tFirstDataPtr)->getOwner()); + NdbScanReceiver* tScanRec = void2rec_srec(tFirstDataPtr); + // ndbout << "NDB_SCANRECEIVER" << endl; + if(tScanRec->checkMagicNumber() == 0){ + tReturnCode = tScanRec->receiveTRANSID_AI_SCAN(aSignal); + if (tReturnCode != -1) { + theWaiter.m_state = NO_WAIT; + break; + } + } + } else { +#ifdef NDB_NO_DROPPED_SIGNAL + abort(); +#endif + goto InvalidSignal; + } + return; + } + case GSN_TCKEY_FAILCONF: + { + void* tFirstDataPtr = int2void(tFirstData); + if (tFirstDataPtr == 0) goto InvalidSignal; + + const TcKeyFailConf * const failConf = (TcKeyFailConf *)tDataPtr; + const BlockReference aTCRef = aSignal->theSendersBlockRef; + + tOp = void2rec_op(tFirstDataPtr); + + if (tOp->checkMagicNumber() == 0) { + tCon = tOp->theNdbCon; + if (tCon != NULL) { + if ((tCon->theSendStatus == sendTC_OP) || + (tCon->theSendStatus == sendTC_COMMIT)) { + tReturnCode = tCon->receiveTCKEY_FAILCONF(failConf); + if (tReturnCode != -1) { + completedTransaction(tCon); + }//if + }//if + }//if + }//if + + if(tFirstData & 1){ + NdbConnection::sendTC_COMMIT_ACK(theCommitAckSignal, + failConf->transId1, + failConf->transId2, + aTCRef); + } + return; + } + case GSN_TCKEY_FAILREF: + { + void* tFirstDataPtr = int2void(tFirstData); + if (tFirstDataPtr == 0) goto InvalidSignal; + + tOp = void2rec_op(tFirstDataPtr); + if (tOp->checkMagicNumber() == 0) { + tCon = tOp->theNdbCon; + if (tCon != NULL) { + if ((tCon->theSendStatus == sendTC_OP) || + (tCon->theSendStatus == sendTC_ROLLBACK)) { + tReturnCode = tCon->receiveTCKEY_FAILREF(aSignal); + if (tReturnCode != -1) { + completedTransaction(tCon); + return; + }//if + }//if + }//if + }//if + return; + } + case GSN_TCKEYREF: + { + void* tFirstDataPtr = int2void(tFirstData); + if (tFirstDataPtr == 0) goto InvalidSignal; + + tOp = void2rec_op(tFirstDataPtr); + if (tOp->checkMagicNumber() == 0) { + tCon = tOp->theNdbCon; + if (tCon != NULL) { + if (tCon->theSendStatus == sendTC_OP) { + tReturnCode = tOp->receiveTCKEYREF(aSignal); + if (tReturnCode != -1) { + completedTransaction(tCon); + }//if + return; + }//if + }//if + } //if + goto InvalidSignal; + return; + } + case GSN_TC_COMMITCONF: + { + void* tFirstDataPtr = int2void(tFirstData); + if (tFirstDataPtr == 0) goto InvalidSignal; + + const TcCommitConf * const commitConf = (TcCommitConf *)tDataPtr; + const BlockReference aTCRef = aSignal->theSendersBlockRef; + + tCon = void2con(tFirstDataPtr); + if ((tCon->checkMagicNumber() == 0) && + (tCon->theSendStatus == sendTC_COMMIT)) { + tReturnCode = tCon->receiveTC_COMMITCONF(commitConf); + if (tReturnCode != -1) { + completedTransaction(tCon); + }//if + + if(tFirstData & 1){ + NdbConnection::sendTC_COMMIT_ACK(theCommitAckSignal, + commitConf->transId1, + commitConf->transId2, + aTCRef); + } + return; + } + goto InvalidSignal; + return; + } + + case GSN_TC_COMMITREF: + { + void* tFirstDataPtr = int2void(tFirstData); + if (tFirstDataPtr == 0) goto InvalidSignal; + + tCon = void2con(tFirstDataPtr); + if ((tCon->checkMagicNumber() == 0) && + (tCon->theSendStatus == sendTC_COMMIT)) { + tReturnCode = tCon->receiveTC_COMMITREF(aSignal); + if (tReturnCode != -1) { + completedTransaction(tCon); + return; + }//if + }//if + return; + } + case GSN_TCROLLBACKCONF: + { + void* tFirstDataPtr = int2void(tFirstData); + if (tFirstDataPtr == 0) goto InvalidSignal; + + tCon = void2con(tFirstDataPtr); + if ((tCon->checkMagicNumber() == 0) && + (tCon->theSendStatus == sendTC_ROLLBACK)) { + tReturnCode = tCon->receiveTCROLLBACKCONF(aSignal); + if (tReturnCode != -1) { + completedTransaction(tCon); + }//if + }//if + return; + } + case GSN_TCROLLBACKREF: + { + void* tFirstDataPtr = int2void(tFirstData); + if (tFirstDataPtr == 0) goto InvalidSignal; + + tCon = void2con(tFirstDataPtr); + if ((tCon->checkMagicNumber() == 0) && + (tCon->theSendStatus == sendTC_ROLLBACK)) { + tReturnCode = tCon->receiveTCROLLBACKREF(aSignal); + if (tReturnCode != -1) { + completedTransaction(tCon); + return; + }//if + }//if + return; + } + case GSN_TCROLLBACKREP: + { + void* tFirstDataPtr = int2void(tFirstData); + if (tFirstDataPtr == 0) goto InvalidSignal; + + tCon = void2con(tFirstDataPtr); + if (tCon->checkMagicNumber() == 0) { + tReturnCode = tCon->receiveTCROLLBACKREP(aSignal); + if (tReturnCode != -1) { + completedTransaction(tCon); + }//if + }//if + return; + } + case GSN_TCSEIZECONF: + { + void* tFirstDataPtr = int2void(tFirstData); + if (tFirstDataPtr == 0) goto InvalidSignal; + + if (tWaitState != WAIT_TC_SEIZE) { + return; + }//if + tCon = void2con(tFirstDataPtr); + if (tCon->checkMagicNumber() != 0) { + return; + }//if + tReturnCode = tCon->receiveTCSEIZECONF(aSignal); + if (tReturnCode != -1) { + theWaiter.m_state = NO_WAIT; + } else { + return; + }//if + break; + } + case GSN_TCSEIZEREF: + { + void* tFirstDataPtr = int2void(tFirstData); + if (tFirstDataPtr == 0) goto InvalidSignal; + + if (tWaitState != WAIT_TC_SEIZE) { + return; + }//if + tCon = void2con(tFirstDataPtr); + if (tCon->checkMagicNumber() != 0) { + return; + }//if + tReturnCode = tCon->receiveTCSEIZEREF(aSignal); + if (tReturnCode != -1) { + theWaiter.m_state = NO_WAIT; + } else { + return; + }//if + break; + } + case GSN_TCRELEASECONF: + { + void* tFirstDataPtr = int2void(tFirstData); + if (tFirstDataPtr == 0) goto InvalidSignal; + + if (tWaitState != WAIT_TC_RELEASE) { + goto InvalidSignal; + }//if + tCon = void2con(tFirstDataPtr); + if (tCon->checkMagicNumber() != 0) { + goto InvalidSignal; + }//if + tReturnCode = tCon->receiveTCRELEASECONF(aSignal); + if (tReturnCode != -1) { + theWaiter.m_state = NO_WAIT; + }//if + break; + } + case GSN_TCRELEASEREF: + { + void* tFirstDataPtr = int2void(tFirstData); + if (tFirstDataPtr == 0) goto InvalidSignal; + + if (tWaitState != WAIT_TC_RELEASE) { + goto InvalidSignal; + }//if + tCon = void2con(tFirstDataPtr); + if (tCon->checkMagicNumber() != 0) { + goto InvalidSignal; + }//if + tReturnCode = tCon->receiveTCRELEASEREF(aSignal); + if (tReturnCode != -1) { + theWaiter.m_state = NO_WAIT; + }//if + break; + } + + case GSN_GET_TABINFOREF: + case GSN_GET_TABINFO_CONF: + case GSN_CREATE_TABLE_REF: + case GSN_CREATE_TABLE_CONF: + case GSN_DROP_TABLE_CONF: + case GSN_DROP_TABLE_REF: + case GSN_ALTER_TABLE_CONF: + case GSN_ALTER_TABLE_REF: + case GSN_CREATE_INDX_CONF: + case GSN_CREATE_INDX_REF: + case GSN_DROP_INDX_CONF: + case GSN_DROP_INDX_REF: + case GSN_CREATE_EVNT_CONF: + case GSN_CREATE_EVNT_REF: + case GSN_DROP_EVNT_CONF: + case GSN_DROP_EVNT_REF: + case GSN_LIST_TABLES_CONF: + NdbDictInterface::execSignal(&theDictionary->m_receiver, + aSignal, ptr); + break; + + case GSN_SUB_META_DATA: + case GSN_SUB_REMOVE_CONF: + case GSN_SUB_REMOVE_REF: + break; // ignore these signals + case GSN_SUB_GCP_COMPLETE_REP: + case GSN_SUB_START_CONF: + case GSN_SUB_START_REF: + case GSN_SUB_TABLE_DATA: + case GSN_SUB_STOP_CONF: + case GSN_SUB_STOP_REF: + NdbDictInterface::execSignal(&theDictionary->m_receiver, + aSignal, ptr); + break; + + case GSN_DIHNDBTAMPER: + { + void* tFirstDataPtr = int2void(tFirstData); + if (tFirstDataPtr == 0) goto InvalidSignal; + + if (tWaitState != WAIT_NDB_TAMPER) + return; + tCon = void2con(tFirstDataPtr); + if (tCon->checkMagicNumber() != 0) + return; + tReturnCode = tCon->receiveDIHNDBTAMPER(aSignal); + if (tReturnCode != -1) + theWaiter.m_state = NO_WAIT; + break; + } + case GSN_SCAN_TABCONF: + { + void* tFirstDataPtr = int2void(tFirstData); + if (tFirstDataPtr == 0) goto InvalidSignal; + + //ndbout << "*** GSN_SCAN_TABCONF *** " << endl; + if (tWaitState != WAIT_SCAN){ + return; + } + tCon = void2con(tFirstDataPtr); + if (tCon->checkMagicNumber() != 0) + return; + tReturnCode = tCon->receiveSCAN_TABCONF(aSignal); + if (tReturnCode != -1) + theWaiter.m_state = NO_WAIT; + break; + } + case GSN_SCAN_TABREF: + { + void* tFirstDataPtr = int2void(tFirstData); + if (tFirstDataPtr == 0) goto InvalidSignal; + + if (tWaitState == WAIT_SCAN){ + tCon = void2con(tFirstDataPtr); + if (tCon->checkMagicNumber() == 0){ + tReturnCode = tCon->receiveSCAN_TABREF(aSignal); + if (tReturnCode != -1){ + theWaiter.m_state = NO_WAIT; + } + break; + } + } + goto InvalidSignal; + } + case GSN_SCAN_TABINFO: + { + void* tFirstDataPtr = int2void(tFirstData); + if (tFirstDataPtr == 0) goto InvalidSignal; + + //ndbout << "*** GSN_SCAN_TABINFO ***" << endl; + if (tWaitState != WAIT_SCAN) + return; + tCon = void2con(tFirstDataPtr); + if (tCon->checkMagicNumber() != 0) + return; + tReturnCode = tCon->receiveSCAN_TABINFO(aSignal); + if (tReturnCode != -1) + theWaiter.m_state = NO_WAIT; + break; + } + case GSN_KEYINFO20: { + void* tFirstDataPtr = int2void(tFirstData); + if (tFirstDataPtr == 0) goto InvalidSignal; + + //ndbout << "*** GSN_KEYINFO20 ***" << endl; + NdbScanReceiver* tScanRec = void2rec_srec(tFirstDataPtr); + if (tScanRec->checkMagicNumber() != 0) + return; + tReturnCode = tScanRec->receiveKEYINFO20(aSignal); + if (tReturnCode != -1) + theWaiter.m_state = NO_WAIT; + break; + } + case GSN_TCINDXCONF:{ + void* tFirstDataPtr = int2void(tFirstData); + if (tFirstDataPtr == 0) goto InvalidSignal; + + const TcIndxConf * const indxConf = (TcIndxConf *)tDataPtr; + const BlockReference aTCRef = aSignal->theSendersBlockRef; + tCon = void2con(tFirstDataPtr); + if ((tCon->checkMagicNumber() == 0) && + (tCon->theSendStatus == sendTC_OP)) { + tReturnCode = tCon->receiveTCINDXCONF(indxConf, aSignal->getLength()); + if (tReturnCode != -1) { + completedTransaction(tCon); + }//if + }//if + + if(TcIndxConf::getMarkerFlag(indxConf->confInfo)){ + NdbConnection::sendTC_COMMIT_ACK(theCommitAckSignal, + indxConf->transId1, + indxConf->transId2, + aTCRef); + } + break; + } + case GSN_TCINDXREF:{ + void* tFirstDataPtr = int2void(tFirstData); + if (tFirstDataPtr == 0) goto InvalidSignal; + + tIndexOp = void2rec_iop(tFirstDataPtr); + if (tIndexOp->checkMagicNumber() == 0) { + tCon = tIndexOp->theNdbCon; + if (tCon != NULL) { + if (tCon->theSendStatus == sendTC_OP) { + tReturnCode = tIndexOp->receiveTCINDXREF(aSignal); + if (tReturnCode != -1) { + completedTransaction(tCon); + }//if + return; + }//if + }//if + }//if + goto InvalidSignal; + return; + } + default: + goto InvalidSignal; + }//switch + + if (theWaiter.m_state == NO_WAIT) { + // Wake up the thread waiting for response + NdbCondition_Signal(theWaiter.m_condition); + }//if + return; + + InvalidSignal: +#ifdef VM_TRACE + ndbout_c("Ndbif: Error Ndb::handleReceivedSignal " + "(GSN=%d, theWaiter.m_state=%d)" + " sender = (Block: %d Node: %d)", + tSignalNumber, + tWaitState, + refToBlock(aSignal->theSendersBlockRef), + refToNode(aSignal->theSendersBlockRef)); +#endif +#ifdef NDB_NO_DROPPED_SIGNAL + abort(); +#endif + + return; +}//Ndb::handleReceivedSignal() + + +/***************************************************************************** +void completedTransaction(NdbConnection* aCon); + +Remark: One transaction has been completed. + Remove it from send array and put it into the completed + transaction array. Finally check if it is time to wake + up a poller. +******************************************************************************/ +void +Ndb::completedTransaction(NdbConnection* aCon) +{ + Uint32 tTransArrayIndex = aCon->theTransArrayIndex; + Uint32 tNoSentTransactions = theNoOfSentTransactions; + Uint32 tNoCompletedTransactions = theNoOfCompletedTransactions; + if ((tNoSentTransactions > 0) && (aCon->theListState == InSendList) && + (tTransArrayIndex < tNoSentTransactions)) { + NdbConnection* tMoveCon = theSentTransactionsArray[tNoSentTransactions - 1]; + + theCompletedTransactionsArray[tNoCompletedTransactions] = aCon; + aCon->theTransArrayIndex = tNoCompletedTransactions; + if (tMoveCon != aCon) { + tMoveCon->theTransArrayIndex = tTransArrayIndex; + theSentTransactionsArray[tTransArrayIndex] = tMoveCon; + }//if + theSentTransactionsArray[tNoSentTransactions - 1] = NULL; + theNoOfCompletedTransactions = tNoCompletedTransactions + 1; + + theNoOfSentTransactions = tNoSentTransactions - 1; + aCon->theListState = InCompletedList; + aCon->handleExecuteCompletion(); + if ((theMinNoOfEventsToWakeUp != 0) && + (theNoOfCompletedTransactions >= theMinNoOfEventsToWakeUp)) { + theMinNoOfEventsToWakeUp = 0; + NdbCondition_Signal(theWaiter.m_condition); + return; + }//if + } else { + ndbout << "theNoOfSentTransactions = " << theNoOfSentTransactions; + ndbout << " theListState = " << aCon->theListState; + ndbout << " theTransArrayIndex = " << aCon->theTransArrayIndex; + ndbout << endl << flush; +#ifdef VM_TRACE + printState("completedTransaction abort"); +#endif + abort(); + }//if +}//Ndb::completedTransaction() + +/***************************************************************************** +void reportCallback(NdbConnection** aCopyArray, Uint32 aNoOfCompletedTrans); + +Remark: Call the callback methods of the completed transactions. +******************************************************************************/ +void +Ndb::reportCallback(NdbConnection** aCopyArray, Uint32 aNoOfCompletedTrans) +{ + Uint32 i; + if (aNoOfCompletedTrans > 0) { + for (i = 0; i < aNoOfCompletedTrans; i++) { + void* anyObject = aCopyArray[i]->theCallbackObject; + NdbAsynchCallback aCallback = aCopyArray[i]->theCallbackFunction; + int tResult = 0; + if (aCallback != NULL) { + if (aCopyArray[i]->theReturnStatus == ReturnFailure) { + tResult = -1; + }//if + (*aCallback)(tResult, aCopyArray[i], anyObject); + }//if + }//for + }//if +}//Ndb::reportCallback() + +/***************************************************************************** +Uint32 pollCompleted(NdbConnection** aCopyArray); + +Remark: Transfer the data from the completed transaction to a local array. + This support is used by a number of the poll-methods. +******************************************************************************/ +Uint32 +Ndb::pollCompleted(NdbConnection** aCopyArray) +{ + check_send_timeout(); + Uint32 i; + Uint32 tNoCompletedTransactions = theNoOfCompletedTransactions; + if (tNoCompletedTransactions > 0) { + for (i = 0; i < tNoCompletedTransactions; i++) { + aCopyArray[i] = theCompletedTransactionsArray[i]; + if (aCopyArray[i]->theListState != InCompletedList) { + ndbout << "pollCompleted error "; + ndbout << aCopyArray[i]->theListState << endl; + abort(); + }//if + theCompletedTransactionsArray[i] = NULL; + aCopyArray[i]->theListState = NotInList; + }//for + }//if + theNoOfCompletedTransactions = 0; + return tNoCompletedTransactions; +}//Ndb::pollCompleted() + +void +Ndb::check_send_timeout() +{ + NDB_TICKS current_time = NdbTick_CurrentMillisecond(); + if (current_time - the_last_check_time > 1000) { + the_last_check_time = current_time; + Uint32 no_of_sent = theNoOfSentTransactions; + for (Uint32 i = 0; i < no_of_sent; i++) { + NdbConnection* a_con = theSentTransactionsArray[i]; + if ((current_time - a_con->theStartTransTime) > + WAITFOR_RESPONSE_TIMEOUT) { +#ifdef VM_TRACE + a_con->printState(); +#endif + a_con->setOperationErrorCodeAbort(4012); + a_con->theCommitStatus = Aborted; + a_con->theCompletionStatus = CompletedFailure; + a_con->handleExecuteCompletion(); + remove_sent_list(i); + insert_completed_list(a_con); + no_of_sent--; + i--; + }//if + }//for + }//if +} + +void +Ndb::remove_sent_list(Uint32 list_index) +{ + Uint32 last_index = theNoOfSentTransactions - 1; + if (list_index < last_index) { + NdbConnection* t_con = theSentTransactionsArray[last_index]; + theSentTransactionsArray[list_index] = t_con; + }//if + theNoOfSentTransactions = last_index; + theSentTransactionsArray[last_index] = 0; +} + +Uint32 +Ndb::insert_completed_list(NdbConnection* a_con) +{ + Uint32 no_of_comp = theNoOfCompletedTransactions; + theCompletedTransactionsArray[no_of_comp] = a_con; + theNoOfCompletedTransactions = no_of_comp + 1; + a_con->theListState = InCompletedList; + a_con->theTransArrayIndex = no_of_comp; + return no_of_comp; +} + +Uint32 +Ndb::insert_sent_list(NdbConnection* a_con) +{ + Uint32 no_of_sent = theNoOfSentTransactions; + theSentTransactionsArray[no_of_sent] = a_con; + theNoOfSentTransactions = no_of_sent + 1; + a_con->theListState = InSendList; + a_con->theTransArrayIndex = no_of_sent; + return no_of_sent; +} + +/***************************************************************************** +void sendPrepTrans(int forceSend); + +Remark: Send a batch of transactions prepared for sending to the NDB kernel. +******************************************************************************/ +void +Ndb::sendPrepTrans(int forceSend) +{ + // Always called when holding mutex on TransporterFacade + /* + We will send a list of transactions to the NDB kernel. Before + sending we check the following. + 1) Node connected to is still alive + Checked by both checking node status and node sequence + 2) Send buffer can handle the size of messages we are planning to send + So far this is just a fake check but will soon be a real check + When the connected node has failed we abort the transaction without + responding anymore to the node since the kernel will clean up + automatically. + When sendBuffer cannot handle anymore messages then we will also abort + transaction but by communicating to the kernel since it is still alive + and we keep a small space for messages like that. + */ + Uint32 i; + TransporterFacade* tp = TransporterFacade::instance(); + Uint32 no_of_prep_trans = theNoOfPreparedTransactions; + for (i = 0; i < no_of_prep_trans; i++) { + NdbConnection * a_con = thePreparedTransactionsArray[i]; + thePreparedTransactionsArray[i] = NULL; + Uint32 node_id = a_con->getConnectedNodeId(); + if ((tp->getNodeSequence(node_id) == a_con->theNodeSequence) && + tp->get_node_alive(node_id) || + (tp->get_node_stopping(node_id) && + ((a_con->theSendStatus == sendABORT) || + (a_con->theSendStatus == sendABORTfail) || + (a_con->theSendStatus == sendCOMMITstate) || + (a_con->theSendStatus == sendCompleted)))) { + /* + We will send if + 1) Node is alive and sequences are correct OR + 2) Node is stopping and we only want to commit or abort + In a graceful stop situation we want to ensure quick aborts + of all transactions and commits and thus we allow aborts and + commits to continue but not normal operations. + */ + if (tp->check_send_size(node_id, a_con->get_send_size())) { + if (a_con->doSend() == 0) { + NDB_TICKS current_time = NdbTick_CurrentMillisecond(); + a_con->theStartTransTime = current_time; + continue; + } else { + /* + Although all precautions we did not manage to send the operations + Must have been a dropped connection on the transporter side. + We don't expect to be able to continue using this connection so + we will treat it as a node failure. + */ + TRACE_DEBUG("Send problem even after checking node status"); + }//if + } else { + /* + The send buffer is currently full or at least close to. We will + not allow a send to continue. We will set the connection so that + it is indicated that we need to abort the transaction. If we were + trying to commit or abort and got a send buffer we will not try + again and will thus set the state to Aborted to avoid a more or + less eternal loop of tries. + */ + if (a_con->theSendStatus == sendOperations) { + a_con->setOperationErrorCodeAbort(4021); + a_con->theCommitStatus = NeedAbort; + TRACE_DEBUG("Send buffer full and sendOperations"); + } else { + a_con->setOperationErrorCodeAbort(4026); + a_con->theCommitStatus = Aborted; + TRACE_DEBUG("Send buffer full, set state to Aborted"); + }//if + }//if + } else { +#ifdef VM_TRACE + a_con->printState(); +#endif + if ((tp->getNodeSequence(node_id) == a_con->theNodeSequence) && + tp->get_node_stopping(node_id)) { + /* + The node we are connected to is currently in an early stopping phase + of a graceful stop. We will not send the prepared transactions. We + will simply refuse and let the application code handle the abort. + */ + TRACE_DEBUG("Abort a transaction when stopping a node"); + a_con->setOperationErrorCodeAbort(4023); + a_con->theCommitStatus = NeedAbort; + } else { + /* + The node is hard dead and we cannot continue. We will also release + the connection to the free pool. + */ + TRACE_DEBUG("The node was stone dead, inform about abort"); + a_con->setOperationErrorCodeAbort(4025); + a_con->theReleaseOnClose = true; + a_con->theTransactionIsStarted = false; + a_con->theCommitStatus = Aborted; + }//if + }//if + a_con->theCompletionStatus = CompletedFailure; + a_con->handleExecuteCompletion(); + insert_completed_list(a_con); + }//for + theNoOfPreparedTransactions = 0; + if (forceSend == 0) { + tp->checkForceSend(theNdbBlockNumber); + } else if (forceSend == 1) { + tp->forceSend(theNdbBlockNumber); + }//if + return; +}//Ndb::sendPrepTrans() + +/***************************************************************************** +void waitCompletedTransactions(int aMilliSecondsToWait, int noOfEventsToWaitFor); + +Remark: First send all prepared operations and then check if there are any + transactions already completed. Do not wait for not completed + transactions. +******************************************************************************/ +void +Ndb::waitCompletedTransactions(int aMilliSecondsToWait, + int noOfEventsToWaitFor) +{ + theWaiter.m_state = NO_WAIT; + /** + * theWaiter.m_state = NO_WAIT; + * To ensure no messup with synchronous node fail handling + * (see ReportFailure) + */ + int waitTime = aMilliSecondsToWait; + NDB_TICKS maxTime = NdbTick_CurrentMillisecond() + (NDB_TICKS)waitTime; + theMinNoOfEventsToWakeUp = noOfEventsToWaitFor; + do { + if (waitTime < 1000) waitTime = 1000; + NdbCondition_WaitTimeout(theWaiter.m_condition, + (NdbMutex*)theWaiter.m_mutex, + waitTime); + if (theNoOfCompletedTransactions >= (Uint32)noOfEventsToWaitFor) { + break; + }//if + theMinNoOfEventsToWakeUp = noOfEventsToWaitFor; + waitTime = (int)(maxTime - NdbTick_CurrentMillisecond()); + } while (waitTime > 0); + return; +}//Ndb::waitCompletedTransactions() + +/***************************************************************************** +void sendPreparedTransactions(int forceSend = 0); + +Remark: First send all prepared operations and then check if there are any + transactions already completed. Do not wait for not completed + transactions. +******************************************************************************/ +void +Ndb::sendPreparedTransactions(int forceSend) +{ + TransporterFacade::instance()->lock_mutex(); + sendPrepTrans(forceSend); + TransporterFacade::instance()->unlock_mutex(); + return; +}//Ndb::sendPreparedTransactions() + +/***************************************************************************** +int sendPollNdb(int aMillisecondNumber, int minNoOfEventsToWakeup = 1, int forceSend = 0); + +Remark: First send all prepared operations and then check if there are any + transactions already completed. Wait for not completed + transactions until the specified number have completed or until the + timeout has occured. Timeout zero means no waiting time. +******************************************************************************/ +int +Ndb::sendPollNdb(int aMillisecondNumber, int minNoOfEventsToWakeup, int forceSend) +{ + NdbConnection* tConArray[1024]; + Uint32 tNoCompletedTransactions; + + //theCurrentConnectCounter = 0; + //theCurrentConnectIndex++; + TransporterFacade::instance()->lock_mutex(); + sendPrepTrans(forceSend); + if ((minNoOfEventsToWakeup <= 0) || + ((Uint32)minNoOfEventsToWakeup > theNoOfSentTransactions)) { + minNoOfEventsToWakeup = theNoOfSentTransactions; + }//if + if ((theNoOfCompletedTransactions < (Uint32)minNoOfEventsToWakeup) && + (aMillisecondNumber > 0)) { + waitCompletedTransactions(aMillisecondNumber, minNoOfEventsToWakeup); + tNoCompletedTransactions = pollCompleted(tConArray); + } else { + tNoCompletedTransactions = pollCompleted(tConArray); + }//if + TransporterFacade::instance()->unlock_mutex(); + reportCallback(tConArray, tNoCompletedTransactions); + return tNoCompletedTransactions; +}//Ndb::sendPollNdb() + +/***************************************************************************** +int pollNdb(int aMillisecondNumber, int minNoOfEventsToWakeup); + +Remark: Check if there are any transactions already completed. Wait for not + completed transactions until the specified number have completed or + until the timeout has occured. Timeout zero means no waiting time. +******************************************************************************/ +int +Ndb::pollNdb(int aMillisecondNumber, int minNoOfEventsToWakeup) +{ + NdbConnection* tConArray[1024]; + Uint32 tNoCompletedTransactions; + + //theCurrentConnectCounter = 0; + //theCurrentConnectIndex++; + TransporterFacade::instance()->lock_mutex(); + if ((minNoOfEventsToWakeup == 0) || + ((Uint32)minNoOfEventsToWakeup > theNoOfSentTransactions)) { + minNoOfEventsToWakeup = theNoOfSentTransactions; + }//if + if ((theNoOfCompletedTransactions < (Uint32)minNoOfEventsToWakeup) && + (aMillisecondNumber > 0)) { + waitCompletedTransactions(aMillisecondNumber, minNoOfEventsToWakeup); + tNoCompletedTransactions = pollCompleted(tConArray); + } else { + tNoCompletedTransactions = pollCompleted(tConArray); + }//if + TransporterFacade::instance()->unlock_mutex(); + reportCallback(tConArray, tNoCompletedTransactions); + return tNoCompletedTransactions; +}//Ndb::sendPollNdbWithoutWait() + +/***************************************************************************** +int receiveOptimisedResponse(); + +Return: 0 - Response received + -1 - Timeout occured waiting for response + -2 - Node failure interupted wait for response + +******************************************************************************/ +int +Ndb::receiveResponse(int waitTime) +{ + int tResultCode; + TransporterFacade::instance()->checkForceSend(theNdbBlockNumber); + + theWaiter.wait(waitTime); + + if(theWaiter.m_state == NO_WAIT) { + tResultCode = 0; + } else { + +#ifdef VM_TRACE + ndbout << "ERR: receiveResponse - theWaiter.m_state = "; + ndbout << theWaiter.m_state << endl; +#endif + + if (theWaiter.m_state == WAIT_NODE_FAILURE){ + tResultCode = -2; + } else { + tResultCode = -1; + } + theWaiter.m_state = NO_WAIT; + } + return tResultCode; +}//Ndb::receiveResponse() + +int +Ndb::sendRecSignal(Uint16 node_id, + Uint32 aWaitState, + NdbApiSignal* aSignal, + Uint32 conn_seq) +{ + /* + In most situations 0 is returned. + In error cases we have 5 different cases + -1: Send ok, time out in waiting for reply + -2: Node has failed + -3: Send buffer not full, send failed yet + -4: Send buffer full + -5: Node is currently stopping + */ + + int return_code; + TransporterFacade* tp = TransporterFacade::instance(); + Uint32 send_size = 1; // Always sends one signal only + tp->lock_mutex(); + // Protected area + if ((tp->get_node_alive(node_id)) && + ((tp->getNodeSequence(node_id) == conn_seq) || + (conn_seq == 0))) { + if (tp->check_send_size(node_id, send_size)) { + return_code = tp->sendSignal(aSignal, node_id); + if (return_code != -1) { + theWaiter.m_node = node_id; + theWaiter.m_state = aWaitState; + return receiveResponse(); + // End of protected area + }//if + return_code = -3; + } else { + return_code = -4; + }//if + } else { + if ((tp->get_node_stopping(node_id)) && + ((tp->getNodeSequence(node_id) == conn_seq) || + (conn_seq == 0))) { + return_code = -5; + } else { + return_code = -2; + }//if + }//if + tp->unlock_mutex(); + // End of protected area + return return_code; +}//Ndb::sendRecSignal() + +void +NdbConnection::sendTC_COMMIT_ACK(NdbApiSignal * aSignal, + Uint32 transId1, Uint32 transId2, + Uint32 aTCRef){ +#if 0 + ndbout_c("Sending TC_COMMIT_ACK(0x%x, 0x%x) to -> %d", + transId1, + transId2, + refToNode(aTCRef)); +#endif + TransporterFacade *tp = TransporterFacade::instance(); + aSignal->theTrace = TestOrd::TraceAPI; + aSignal->theReceiversBlockNumber = DBTC; + aSignal->theVerId_signalNumber = GSN_TC_COMMIT_ACK; + aSignal->theLength = 2; + + Uint32 * dataPtr = aSignal->getDataPtrSend(); + dataPtr[0] = transId1; + dataPtr[1] = transId2; + + tp->sendSignal(aSignal, refToNode(aTCRef)); +} |