diff options
Diffstat (limited to 'ndb/src/ndbapi')
30 files changed, 2041 insertions, 2785 deletions
diff --git a/ndb/src/ndbapi/ClusterMgr.cpp b/ndb/src/ndbapi/ClusterMgr.cpp index 57967e5534f..b26d550fe31 100644 --- a/ndb/src/ndbapi/ClusterMgr.cpp +++ b/ndb/src/ndbapi/ClusterMgr.cpp @@ -32,6 +32,10 @@ #include <signaldata/NFCompleteRep.hpp> #include <signaldata/ApiRegSignalData.hpp> +#include <mgmapi.h> +#include <mgmapi_configuration.hpp> +#include <mgmapi_config_parameters.h> + // Just a C wrapper for threadMain extern "C" void* @@ -69,32 +73,49 @@ ClusterMgr::~ClusterMgr(){ } void -ClusterMgr::init(const IPCConfig & config){ - NodeId tmp = 0; - while(config.getNextRemoteNodeId(tmp)) { +ClusterMgr::init(ndb_mgm_configuration_iterator & iter){ + for(iter.first(); iter.valid(); iter.next()){ + Uint32 tmp = 0; + if(iter.get(CFG_NODE_ID, &tmp)) + continue; + theNodes[tmp].defined = true; #if 0 ndbout << "--------------------------------------" << endl; - config.print(); ndbout << "--------------------------------------" << endl; ndbout_c("ClusterMgr: Node %d defined as %s", tmp, config.getNodeType(tmp)); #endif - if(strcmp(config.getNodeType(tmp), "DB") == 0) { + + unsigned type; + if(iter.get(CFG_TYPE_OF_SECTION, &type)) + continue; + + switch(type){ + case NODE_TYPE_DB: theNodes[tmp].m_info.m_type = NodeInfo::DB; - } else if(strcmp(config.getNodeType(tmp), "API") == 0) { + break; + case NODE_TYPE_API: theNodes[tmp].m_info.m_type = NodeInfo::API; - } else if(strcmp(config.getNodeType(tmp), "MGM") == 0) { + break; + case NODE_TYPE_MGM: theNodes[tmp].m_info.m_type = NodeInfo::MGM; - } else if(strcmp(config.getNodeType(tmp), "REP") == 0) { + break; + case NODE_TYPE_REP: theNodes[tmp].m_info.m_type = NodeInfo::REP; - } else if(strcmp(config.getNodeType(tmp), "EXTERNAL REP") == 0) { + break; + case NODE_TYPE_EXT_REP: theNodes[tmp].m_info.m_type = NodeInfo::REP; - theNodes[tmp].hbFrequency = config.getREPHBFrequency(tmp); - assert(100 <= theNodes[tmp].hbFrequency && - theNodes[tmp].hbFrequency < 60 * 60 * 1000); - } else { + { + Uint32 hbFreq = 10000; + //ndb_mgm_get_int_parameter(iter, CFG_, &hbFreq); + theNodes[tmp].hbFrequency = hbFreq; + assert(100 <= hbFreq && hbFreq < 60 * 60 * 1000); + } + break; + default: + type = type; #if 0 - ndbout_c("ClusterMgr: Unknown node type: %s", config.getNodeType(tmp)); + ndbout_c("ClusterMgr: Unknown node type: %d", type); #endif } } @@ -162,45 +183,43 @@ ClusterMgr::threadMain( ){ const NodeId nodeId = i; Node & theNode = theNodes[nodeId]; - if (theNode.defined == true) { -#if 0 - ndbout_c("ClusterMgr: compatible %d", (int)nodeId); -#endif + if (!theNode.defined) + continue; - if (theNode.connected == false){ - theFacade.doConnect(nodeId); - continue; + if (theNode.connected == false){ + theFacade.doConnect(nodeId); + continue; + } + + if (!theNode.compatible){ + continue; + } + + theNode.hbCounter += timeSlept; + if (theNode.hbCounter >= theNode.hbFrequency){ + /** + * It is now time to send a new Heartbeat + */ + theNode.hbSent++; + theNode.hbCounter = 0; + /** + * If the node is of type REP, + * then the receiver of the signal should be API_CLUSTERMGR + */ + if (theNode.m_info.m_type == NodeInfo::REP) { + signal.theReceiversBlockNumber = API_CLUSTERMGR; } - -#if 0 - ndbout_c("ClusterMgr: connected %d", (int)nodeId); +#if 0 + ndbout_c("ClusterMgr: Sending API_REGREQ to node %d", (int)nodeId); #endif - - theNode.hbCounter += timeSlept; - if (theNode.hbCounter >= theNode.hbFrequency){ - /** - * It is now time to send a new Heartbeat - */ - theNode.hbSent++; - theNode.hbCounter = 0; - /** - * If the node is of type REP, - * then the receiver of the signal should be API_CLUSTERMGR - */ - if (theNode.m_info.m_type == NodeInfo::REP) { - signal.theReceiversBlockNumber = API_CLUSTERMGR; - } -#if 0 - ndbout_c("ClusterMgr: Sending API_REGREQ to node %d", (int)nodeId); -#endif - theFacade.sendSignalUnCond(&signal, nodeId); - }//if - - if (theNode.hbSent == 4 && theNode.hbFrequency > 0){ - reportNodeFailed(i); - }//if - }//if(defined) - }//for + theFacade.sendSignalUnCond(&signal, nodeId); + }//if + + if (theNode.hbSent == 4 && theNode.hbFrequency > 0){ + reportNodeFailed(i); + }//if + } + /** * End of secure area. Let other threads in */ @@ -281,6 +300,10 @@ ClusterMgr::execAPI_REGCONF(const Uint32 * theData){ const ApiRegConf * const apiRegConf = (ApiRegConf *)&theData[0]; const NodeId nodeId = refToNode(apiRegConf->qmgrRef); +#if 0 + ndbout_c("ClusterMgr: Recd API_REGCONF from node %d", nodeId); +#endif + assert(nodeId > 0 && nodeId < MAX_NODES); Node & node = theNodes[nodeId]; diff --git a/ndb/src/ndbapi/ClusterMgr.hpp b/ndb/src/ndbapi/ClusterMgr.hpp index 7b7b947742b..cc3cf66c8aa 100644 --- a/ndb/src/ndbapi/ClusterMgr.hpp +++ b/ndb/src/ndbapi/ClusterMgr.hpp @@ -40,7 +40,7 @@ class ClusterMgr { public: ClusterMgr(class TransporterFacade &); ~ClusterMgr(); - void init(const IPCConfig & config); + void init(struct ndb_mgm_configuration_iterator & config); void reportConnected(NodeId nodeId); void reportDisconnected(NodeId nodeId); @@ -114,7 +114,7 @@ ClusterMgr::getNoOfConnectedNodes() const { return noOfConnectedNodes; } -/******************************************************************************/ +/*****************************************************************************/ /** * @class ArbitMgr diff --git a/ndb/src/ndbapi/Makefile_old b/ndb/src/ndbapi/Makefile_old index f4c82e5d6ba..648c8cbb016 100644 --- a/ndb/src/ndbapi/Makefile_old +++ b/ndb/src/ndbapi/Makefile_old @@ -15,13 +15,16 @@ LIB_TARGET_ARCHIVES := $(ARCHIVE_TARGET) \ transporter \ general \ signaldataprint \ - mgmsrvcommon \ + mgmapi mgmsrvcommon \ portlib \ logger \ trace DIRS := signal-sender +CFLAGS_TransporterFacade.cpp := -I$(call fixpath,$(NDB_TOP)/src/mgmapi) +CFLAGS_ClusterMgr.cpp := -I$(call fixpath,$(NDB_TOP)/src/mgmapi) + # Source files of non-templated classes (.cpp files) SOURCES = \ TransporterFacade.cpp \ @@ -31,31 +34,25 @@ SOURCES = \ Ndblist.cpp \ Ndbif.cpp \ Ndbinit.cpp \ - Ndberr.cpp \ - ndberror.c \ - NdbErrorOut.cpp \ - NdbConnection.cpp \ + ndberror.c Ndberr.cpp NdbErrorOut.cpp \ + NdbConnection.cpp \ NdbConnectionScan.cpp \ NdbOperation.cpp \ NdbOperationSearch.cpp \ - NdbOperationScan.cpp \ NdbOperationInt.cpp \ NdbOperationDefine.cpp \ NdbOperationExec.cpp \ - NdbScanReceiver.cpp \ NdbResultSet.cpp \ - NdbCursorOperation.cpp \ NdbScanOperation.cpp NdbScanFilter.cpp \ NdbIndexOperation.cpp \ NdbEventOperation.cpp \ NdbEventOperationImpl.cpp \ NdbApiSignal.cpp \ NdbRecAttr.cpp \ - NdbSchemaCon.cpp \ - NdbSchemaOp.cpp \ NdbUtil.cpp \ NdbReceiver.cpp \ - NdbDictionary.cpp NdbDictionaryImpl.cpp DictCache.cpp + NdbDictionary.cpp NdbDictionaryImpl.cpp DictCache.cpp \ + NdbSchemaCon.cpp NdbSchemaOp.cpp include $(NDB_TOP)/Epilogue.mk diff --git a/ndb/src/ndbapi/Ndb.cpp b/ndb/src/ndbapi/Ndb.cpp index fe21b4e02a4..e7edf19c0a3 100644 --- a/ndb/src/ndbapi/Ndb.cpp +++ b/ndb/src/ndbapi/Ndb.cpp @@ -156,26 +156,22 @@ Ndb::NDB_connect(Uint32 tNode) tNdbCon->Status(NdbConnection::Connecting); // Set status to connecting Uint32 nodeSequence; { // send and receive signal - tp->lock_mutex(); + Guard guard(tp->theMutexPtr); nodeSequence = tp->getNodeSequence(tNode); bool node_is_alive = tp->get_node_alive(tNode); if (node_is_alive) { tReturnCode = tp->sendSignal(tSignal, tNode); releaseSignal(tSignal); - if (tReturnCode == -1) { - tp->unlock_mutex(); - } else { + if (tReturnCode != -1) { theWaiter.m_node = tNode; theWaiter.m_state = WAIT_TC_SEIZE; tReturnCode = receiveResponse(); }//if } else { releaseSignal(tSignal); - tp->unlock_mutex(); tReturnCode = -1; }//if } - if ((tReturnCode == 0) && (tNdbCon->Status() == NdbConnection::Connected)) { //************************************************ // Send and receive was successful @@ -465,41 +461,43 @@ Ndb::closeTransaction(NdbConnection* aConnection) CHECK_STATUS_MACRO_VOID; tCon = theTransactionList; - + if (aConnection == tCon) { // Remove the active connection object - theTransactionList = tCon->next(); // from the transaction list. + theTransactionList = tCon->next(); // from the transaction list. } else { while (aConnection != tCon) { if (tCon == NULL) { //----------------------------------------------------- // closeTransaction called on non-existing transaction //----------------------------------------------------- + + if(aConnection->theError.code == 4008){ + /** + * When a SCAN timed-out, returning the NdbConnection leads + * to reuse. And TC crashes when the API tries to reuse it to + * something else... + */ #ifdef VM_TRACE - printf("Non-existing transaction into closeTransaction\n"); + printf("Scan timeout:ed NdbConnection-> " + "not returning it-> memory leak\n"); +#endif + return; + } + +#ifdef VM_TRACE + printf("Non-existing transaction into closeTransaction\n"); abort(); #endif - return; + return; }//if tPreviousCon = tCon; tCon = tCon->next(); }//while tPreviousCon->next(tCon->next()); }//if - + aConnection->release(); - - if(aConnection->theError.code == 4008){ - /** - * When a SCAN timed-out, returning the NdbConnection leads - * to reuse. And TC crashes when the API tries to reuse it to - * something else... - */ -#ifdef VM_TRACE - printf("Scan timeout:ed NdbConnection-> not returning it-> memory leak\n"); -#endif - return; - } - + if(aConnection->theError.code == 4008){ /** * Something timed-out, returning the NdbConnection leads @@ -511,7 +509,7 @@ Ndb::closeTransaction(NdbConnection* aConnection) #endif return; } - + if (aConnection->theReleaseOnClose == false) { /** * Put it back in idle list for that node diff --git a/ndb/src/ndbapi/NdbApiSignal.cpp b/ndb/src/ndbapi/NdbApiSignal.cpp index a44937cd398..d173a462020 100644 --- a/ndb/src/ndbapi/NdbApiSignal.cpp +++ b/ndb/src/ndbapi/NdbApiSignal.cpp @@ -46,6 +46,7 @@ Adjust: 971114 UABMNST First version. #include <signaldata/IndxKeyInfo.hpp> #include <signaldata/IndxAttrInfo.hpp> #include <signaldata/TcHbRep.hpp> +#include <signaldata/ScanTab.hpp> #include <NdbOut.hpp> @@ -188,7 +189,7 @@ NdbApiSignal::setSignal(int aNdbSignalType) theTrace = TestOrd::TraceAPI; theReceiversBlockNumber = DBTC; theVerId_signalNumber = GSN_SCAN_TABREQ; - theLength = 25; + theLength = 9; // ScanTabReq::SignalLength; } break; diff --git a/ndb/src/ndbapi/NdbConnection.cpp b/ndb/src/ndbapi/NdbConnection.cpp index fbfd0e99238..8ccd0aa8523 100644 --- a/ndb/src/ndbapi/NdbConnection.cpp +++ b/ndb/src/ndbapi/NdbConnection.cpp @@ -27,11 +27,12 @@ Description: Interface between TIS and NDB Documentation: Adjust: 971022 UABMNST First version. *****************************************************************************/ -#include "NdbOut.hpp" -#include "NdbConnection.hpp" -#include "NdbOperation.hpp" -#include "NdbScanOperation.hpp" -#include "NdbIndexOperation.hpp" +#include <NdbOut.hpp> +#include <NdbConnection.hpp> +#include <NdbOperation.hpp> +#include <NdbScanOperation.hpp> +#include <NdbIndexScanOperation.hpp> +#include <NdbIndexOperation.hpp> #include "NdbApiSignal.hpp" #include "TransporterFacade.hpp" #include "API.hpp" @@ -79,15 +80,12 @@ NdbConnection::NdbConnection( Ndb* aNdb ) : theTransactionIsStarted(false), theDBnode(0), theReleaseOnClose(false), - // Cursor operations + // Scan operations m_waitForReply(true), - m_theFirstCursorOperation(NULL), - m_theLastCursorOperation(NULL), - m_firstExecutedCursorOp(NULL), + m_theFirstScanOperation(NULL), + m_theLastScanOperation(NULL), + m_firstExecutedScanOp(NULL), // Scan operations - theScanFinished(0), - theCurrentScanRec(NULL), - thePreviousScanRec(NULL), theScanningOp(NULL), theBuddyConPtr(0xFFFFFFFF) { @@ -117,7 +115,6 @@ NdbConnection::init() theListState = NotInList; theInUseState = true; theTransactionIsStarted = false; - theScanFinished = 0; theNext = NULL; theFirstOpInList = NULL; @@ -128,9 +125,6 @@ NdbConnection::init() theFirstExecOpInList = NULL; theLastExecOpInList = NULL; - theCurrentScanRec = NULL; - thePreviousScanRec = NULL; - theCompletedFirstOp = NULL; theGlobalCheckpointId = 0; @@ -146,11 +140,11 @@ NdbConnection::init() theSimpleState = true; theSendStatus = InitState; theMagicNumber = 0x37412619; - // Cursor operations + // Scan operations m_waitForReply = true; - m_theFirstCursorOperation = NULL; - m_theLastCursorOperation = NULL; - m_firstExecutedCursorOp = 0; + m_theFirstScanOperation = NULL; + m_theLastScanOperation = NULL; + m_firstExecutedScanOp = 0; theBuddyConPtr = 0xFFFFFFFF; }//NdbConnection::init() @@ -331,7 +325,7 @@ NdbConnection::executeAsynchPrepare( ExecType aTypeOfExec, */ theError.code = 0; - NdbCursorOperation* tcOp = m_theFirstCursorOperation; + NdbScanOperation* tcOp = m_theFirstScanOperation; if (tcOp != 0){ // Execute any cursor operations while (tcOp != NULL) { @@ -340,14 +334,14 @@ NdbConnection::executeAsynchPrepare( ExecType aTypeOfExec, if (tReturnCode == -1) { return; }//if - tcOp = (NdbCursorOperation*)tcOp->next(); + tcOp = (NdbScanOperation*)tcOp->next(); } // while - m_theLastCursorOperation->next(m_firstExecutedCursorOp); - m_firstExecutedCursorOp = m_theFirstCursorOperation; + m_theLastScanOperation->next(m_firstExecutedScanOp); + m_firstExecutedScanOp = m_theFirstScanOperation; // Discard cursor operations, since these are also // in the complete operations list we do not need // to release them. - m_theFirstCursorOperation = m_theLastCursorOperation = NULL; + m_theFirstScanOperation = m_theLastScanOperation = NULL; } bool tTransactionIsStarted = theTransactionIsStarted; @@ -714,17 +708,14 @@ Remark: Release all operations. ******************************************************************************/ void NdbConnection::release(){ - if (theTransactionIsStarted == true && theScanningOp != NULL ) - stopScan(); - releaseOperations(); if ( (theTransactionIsStarted == true) && - ((theCommitStatus != Committed) && - (theCommitStatus != Aborted))) { -/**************************************************************************** - * The user did not perform any rollback but simply closed the - * transaction. We must rollback Ndb since Ndb have been contacted. -******************************************************************************/ + ((theCommitStatus != Committed) && + (theCommitStatus != Aborted))) { + /************************************************************************ + * The user did not perform any rollback but simply closed the + * transaction. We must rollback Ndb since Ndb have been contacted. + ************************************************************************/ execute(Rollback); }//if theMagicNumber = 0xFE11DC; @@ -756,8 +747,8 @@ void NdbConnection::releaseOperations() { // Release any open scans - releaseCursorOperations(m_theFirstCursorOperation); - releaseCursorOperations(m_firstExecutedCursorOp); + releaseScanOperations(m_theFirstScanOperation); + releaseScanOperations(m_firstExecutedScanOp); releaseOps(theCompletedFirstOp); releaseOps(theFirstOpInList); @@ -769,9 +760,9 @@ NdbConnection::releaseOperations() theLastOpInList = NULL; theLastExecOpInList = NULL; theScanningOp = NULL; - m_theFirstCursorOperation = NULL; - m_theLastCursorOperation = NULL; - m_firstExecutedCursorOp = NULL; + m_theFirstScanOperation = NULL; + m_theLastScanOperation = NULL; + m_firstExecutedScanOp = NULL; }//NdbConnection::releaseOperations() void @@ -782,24 +773,21 @@ NdbConnection::releaseCompletedOperations() }//NdbConnection::releaseOperations() /****************************************************************************** -void releaseCursorOperations(); +void releaseScanOperations(); Remark: Release all cursor operations. (NdbScanOperation and NdbIndexOperation) ******************************************************************************/ void -NdbConnection::releaseCursorOperations(NdbCursorOperation* cursorOp) +NdbConnection::releaseScanOperations(NdbIndexScanOperation* cursorOp) { while(cursorOp != 0){ - NdbCursorOperation* next = (NdbCursorOperation*)cursorOp->next(); + NdbIndexScanOperation* next = (NdbIndexScanOperation*)cursorOp->next(); cursorOp->release(); - if (cursorOp->cursorType() == NdbCursorOperation::ScanCursor) - theNdb->releaseScanOperation((NdbScanOperation*)cursorOp); - else - theNdb->releaseOperation(cursorOp); + theNdb->releaseScanOperation(cursorOp); cursorOp = next; } -}//NdbConnection::releaseCursorOperations() +}//NdbConnection::releaseScanOperations() /***************************************************************************** NdbOperation* getNdbOperation(const char* aTableName); @@ -833,45 +821,6 @@ NdbConnection::getNdbOperation(const char* aTableName) }//NdbConnection::getNdbOperation() /***************************************************************************** -NdbOperation* getNdbOperation(const char* anIndexName, const char* aTableName); - -Return Value Return a pointer to a NdbOperation object if getNdbOperation - was succesful. - Return NULL : In all other case. -Parameters: anIndexName : Name of the index to use. - aTableName : Name of the database table. -Remark: Get an operation from NdbOperation idlelist and get the - NdbConnection object - who was fetch by startTransaction pointing to this operation - getOperation will set the theTableId in the NdbOperation object. - synchronous -******************************************************************************/ -NdbOperation* -NdbConnection::getNdbOperation(const char* anIndexName, const char* aTableName) -{ - if ((theError.code == 0) && - (theCommitStatus == Started)){ - NdbIndexImpl* index = - theNdb->theDictionary->getIndex(anIndexName, aTableName); - NdbTableImpl* table = theNdb->theDictionary->getTable(aTableName); - NdbTableImpl* indexTable = - theNdb->theDictionary->getIndexTable(index, table); - if (indexTable != 0){ - return getNdbOperation(indexTable); - } else { - setErrorCode(theNdb->theDictionary->getNdbError().code); - return NULL; - }//if - } else { - if (theError.code == 0) { - setOperationErrorCodeAbort(4114); - }//if - - return NULL; - }//if -}//NdbConnection::getNdbOperation() - -/***************************************************************************** NdbOperation* getNdbOperation(int aTableId); Return Value Return a pointer to a NdbOperation object if getNdbOperation @@ -956,8 +905,9 @@ Remark: Get an operation from NdbScanOperation idlelist and get the NdbC who was fetch by startTransaction pointing to this operation getOperation will set the theTableId in the NdbOperation object.synchronous ******************************************************************************/ -NdbScanOperation* -NdbConnection::getNdbScanOperation(const char* anIndexName, const char* aTableName) +NdbIndexScanOperation* +NdbConnection::getNdbIndexScanOperation(const char* anIndexName, + const char* aTableName) { if (theCommitStatus == Started){ NdbIndexImpl* index = @@ -966,7 +916,9 @@ NdbConnection::getNdbScanOperation(const char* anIndexName, const char* aTableNa NdbTableImpl* indexTable = theNdb->theDictionary->getIndexTable(index, table); if (indexTable != 0){ - return getNdbScanOperation(indexTable); + NdbIndexScanOperation* tOp = getNdbScanOperation(indexTable); + if(tOp) tOp->m_cursor_type = NdbScanOperation::IndexCursor; + return tOp; } else { setOperationErrorCodeAbort(theNdb->theError.code); return NULL; @@ -987,21 +939,21 @@ Remark: Get an operation from NdbScanOperation object idlelist and get t object who was fetch by startTransaction pointing to this operation getOperation will set the theTableId in the NdbOperation object, synchronous. *****************************************************************************/ -NdbScanOperation* +NdbIndexScanOperation* NdbConnection::getNdbScanOperation(NdbTableImpl * tab) { - NdbScanOperation* tOp; + NdbIndexScanOperation* tOp; tOp = theNdb->getScanOperation(); if (tOp == NULL) goto getNdbOp_error1; // Link scan operation into list of cursor operations - if (m_theLastCursorOperation == NULL) - m_theFirstCursorOperation = m_theLastCursorOperation = tOp; + if (m_theLastScanOperation == NULL) + m_theFirstScanOperation = m_theLastScanOperation = tOp; else { - m_theLastCursorOperation->next(tOp); - m_theLastCursorOperation = tOp; + m_theLastScanOperation->next(tOp); + m_theLastScanOperation = tOp; } tOp->next(NULL); if (tOp->init(tab, this) != -1) { @@ -1211,12 +1163,12 @@ Remark: int NdbConnection::receiveTC_COMMITCONF(const TcCommitConf * commitConf) { - if(theStatus != Connected){ - return -1; + if(checkState_TransId(&commitConf->transId1)){ + theCommitStatus = Committed; + theCompletionStatus = CompletedSuccess; + return 0; } - theCommitStatus = Committed; - theCompletionStatus = CompletedSuccess; - return 0; + return -1; }//NdbConnection::receiveTC_COMMITCONF() /****************************************************************************** @@ -1230,33 +1182,33 @@ Remark: int NdbConnection::receiveTC_COMMITREF(NdbApiSignal* aSignal) { - if(theStatus != Connected){ - return -1; + const TcCommitRef * ref = CAST_CONSTPTR(TcCommitRef, aSignal->getDataPtr()); + if(checkState_TransId(&ref->transId1)){ + setOperationErrorCodeAbort(ref->errorCode); + theCommitStatus = Aborted; + theCompletionStatus = CompletedFailure; + return 0; } - const TcCommitRef * const ref = CAST_CONSTPTR(TcCommitRef, aSignal->getDataPtr()); - setOperationErrorCodeAbort(ref->errorCode); - theCommitStatus = Aborted; - theCompletionStatus = CompletedFailure; - return 0; + return -1; }//NdbConnection::receiveTC_COMMITREF() -/******************************************************************************* +/****************************************************************************** int receiveTCROLLBACKCONF(NdbApiSignal* aSignal); Return Value: Return 0 : receiveTCROLLBACKCONF was successful. Return -1: In all other case. Parameters: aSignal: The signal object pointer. Remark: -*******************************************************************************/ +******************************************************************************/ int NdbConnection::receiveTCROLLBACKCONF(NdbApiSignal* aSignal) { - if(theStatus != Connected){ - return -1; + if(checkState_TransId(aSignal->getDataPtr() + 1)){ + theCommitStatus = Aborted; + theCompletionStatus = CompletedSuccess; + return 0; } - theCommitStatus = Aborted; - theCompletionStatus = CompletedSuccess; - return 0; + return -1; }//NdbConnection::receiveTCROLLBACKCONF() /******************************************************************************* @@ -1270,13 +1222,13 @@ Remark: int NdbConnection::receiveTCROLLBACKREF(NdbApiSignal* aSignal) { - if(theStatus != Connected){ - return -1; + if(checkState_TransId(aSignal->getDataPtr() + 1)){ + setOperationErrorCodeAbort(aSignal->readData(4)); + theCommitStatus = Aborted; + theCompletionStatus = CompletedFailure; + return 0; } - setOperationErrorCodeAbort(aSignal->readData(2)); - theCommitStatus = Aborted; - theCompletionStatus = CompletedFailure; - return 0; + return -1; }//NdbConnection::receiveTCROLLBACKREF() /***************************************************************************** @@ -1291,36 +1243,26 @@ Remark: Handles the reception of the ROLLBACKREP signal. int NdbConnection::receiveTCROLLBACKREP( NdbApiSignal* aSignal) { - Uint64 tRecTransId, tCurrTransId; - Uint32 tTmp1, tTmp2; - - if (theStatus != Connected) { - return -1; - }//if -/***************************************************************************** + /**************************************************************************** Check that we are expecting signals from this transaction and that it doesn't belong to a transaction already completed. Simply ignore messages from other transactions. -******************************************************************************/ - tTmp1 = aSignal->readData(2); - tTmp2 = aSignal->readData(3); - tRecTransId = (Uint64)tTmp1 + ((Uint64)tTmp2 << 32); - tCurrTransId = this->getTransactionId(); - if (tCurrTransId != tRecTransId) { - return -1; - }//if - theError.code = aSignal->readData(4); // Override any previous errors - -/**********************************************************************/ -/* A serious error has occured. This could be due to deadlock or */ -/* lack of resources or simply a programming error in NDB. This */ -/* transaction will be aborted. Actually it has already been */ -/* and we only need to report completion and return with the */ -/* error code to the application. */ -/**********************************************************************/ - theCompletionStatus = CompletedFailure; - theCommitStatus = Aborted; - return 0; + ****************************************************************************/ + if(checkState_TransId(aSignal->getDataPtr() + 1)){ + theError.code = aSignal->readData(4);// Override any previous errors + + /**********************************************************************/ + /* A serious error has occured. This could be due to deadlock or */ + /* lack of resources or simply a programming error in NDB. This */ + /* transaction will be aborted. Actually it has already been */ + /* and we only need to report completion and return with the */ + /* error code to the application. */ + /**********************************************************************/ + theCompletionStatus = CompletedFailure; + theCommitStatus = Aborted; + return 0; + } + return -1; }//NdbConnection::receiveTCROLLBACKREP() /******************************************************************************* @@ -1334,47 +1276,38 @@ Remark: int NdbConnection::receiveTCKEYCONF(const TcKeyConf * keyConf, Uint32 aDataLength) { - Uint64 tRecTransId; - NdbOperation* tOp; - Uint32 tConditionFlag; - + NdbReceiver* tOp; const Uint32 tTemp = keyConf->confInfo; - const Uint32 tTmp1 = keyConf->transId1; - const Uint32 tTmp2 = keyConf->transId2; -/****************************************************************************** + /*************************************************************************** Check that we are expecting signals from this transaction and that it doesn't belong to a transaction already completed. Simply ignore messages from other transactions. -******************************************************************************/ - tRecTransId = (Uint64)tTmp1 + ((Uint64)tTmp2 << 32); + ***************************************************************************/ + if(checkState_TransId(&keyConf->transId1)){ - const Uint32 tNoOfOperations = TcKeyConf::getNoOfOperations(tTemp); - const Uint32 tCommitFlag = TcKeyConf::getCommitFlag(tTemp); - tConditionFlag = (Uint32)(((aDataLength - 5) >> 1) - tNoOfOperations); - tConditionFlag |= (Uint32)(tNoOfOperations > 10); - tConditionFlag |= (Uint32)(tNoOfOperations <= 0); - tConditionFlag |= (Uint32)(theTransactionId - tRecTransId); - tConditionFlag |= (Uint32)(theStatus - Connected); + const Uint32 tNoOfOperations = TcKeyConf::getNoOfOperations(tTemp); + const Uint32 tCommitFlag = TcKeyConf::getCommitFlag(tTemp); - if (tConditionFlag == 0) { const Uint32* tPtr = (Uint32 *)&keyConf->operations[0]; + Uint32 tNoComp = theNoOfOpCompleted; for (Uint32 i = 0; i < tNoOfOperations ; i++) { - tOp = theNdb->void2rec_op(theNdb->int2void(*tPtr)); + tOp = theNdb->void2rec(theNdb->int2void(*tPtr)); tPtr++; const Uint32 tAttrInfoLen = *tPtr; tPtr++; - if (tOp && tOp->checkMagicNumber() != -1) { - tOp->TCOPCONF(tAttrInfoLen); + if (tOp && tOp->checkMagicNumber()) { + tNoComp += tOp->execTCOPCONF(tAttrInfoLen); } else { return -1; }//if }//for - Uint32 tNoComp = theNoOfOpCompleted; Uint32 tNoSent = theNoOfOpSent; + theNoOfOpCompleted = tNoComp; Uint32 tGCI = keyConf->gci; if (tCommitFlag == 1) { theCommitStatus = Committed; theGlobalCheckpointId = tGCI; + theTransactionId++; } else if ((tNoComp >= tNoSent) && (theLastExecOpInList->theCommitIndicator == 1)){ /**********************************************************************/ @@ -1406,50 +1339,46 @@ Remark: Handles the reception of the TCKEY_FAILCONF signal. int NdbConnection::receiveTCKEY_FAILCONF(const TcKeyFailConf * failConf) { - Uint64 tRecTransId, tCurrTransId; - Uint32 tTmp1, tTmp2; NdbOperation* tOp; - if (theStatus != Connected) { - return -1; - }//if /* - Check that we are expecting signals from this transaction and that it - doesn't belong to a transaction already completed. Simply ignore - messages from other transactions. + Check that we are expecting signals from this transaction and that it + doesn't belong to a transaction already completed. Simply ignore + messages from other transactions. */ - tTmp1 = failConf->transId1; - tTmp2 = failConf->transId2; - tRecTransId = (Uint64)tTmp1 + ((Uint64)tTmp2 << 32); - tCurrTransId = this->getTransactionId(); - if (tCurrTransId != tRecTransId) { - return -1; - }//if - /* - A node failure of the TC node occured. The transaction has - been committed. - */ - theCommitStatus = Committed; - tOp = theFirstExecOpInList; - while (tOp != NULL) { + if(checkState_TransId(&failConf->transId1)){ /* - Check if the transaction expected read values... - If it did some of them might have gotten lost even if we succeeded - in committing the transaction. + A node failure of the TC node occured. The transaction has + been committed. */ - if (tOp->theAI_ElementLen != 0) { - theCompletionStatus = CompletedFailure; - setOperationErrorCodeAbort(4115); - break; - }//if - if (tOp->theCurrentRecAttr != NULL) { - theCompletionStatus = CompletedFailure; - setOperationErrorCodeAbort(4115); - break; - }//if - tOp = tOp->next(); - }//while - theReleaseOnClose = true; - return 0; + theCommitStatus = Committed; + tOp = theFirstExecOpInList; + while (tOp != NULL) { + /* + * Check if the transaction expected read values... + * If it did some of them might have gotten lost even if we succeeded + * in committing the transaction. + */ + switch(tOp->theOperationType){ + case UpdateRequest: + case InsertRequest: + case DeleteRequest: + case WriteRequest: + tOp = tOp->next(); + break; + case ReadRequest: + case ReadExclusive: + case OpenScanRequest: + case OpenRangeScanRequest: + theCompletionStatus = CompletedFailure; + setOperationErrorCodeAbort(4115); + tOp = NULL; + break; + }//if + }//while + theReleaseOnClose = true; + return 0; + } + return -1; }//NdbConnection::receiveTCKEY_FAILCONF() /************************************************************************* @@ -1464,101 +1393,75 @@ Remark: Handles the reception of the TCKEY_FAILREF signal. int NdbConnection::receiveTCKEY_FAILREF(NdbApiSignal* aSignal) { - Uint64 tRecTransId, tCurrTransId; - Uint32 tTmp1, tTmp2; - - if (theStatus != Connected) { - return -1; - }//if /* - Check that we are expecting signals from this transaction and - that it doesn't belong to a transaction already - completed. Simply ignore messages from other transactions. + Check that we are expecting signals from this transaction and + that it doesn't belong to a transaction already + completed. Simply ignore messages from other transactions. */ - tTmp1 = aSignal->readData(2); - tTmp2 = aSignal->readData(3); - tRecTransId = (Uint64)tTmp1 + ((Uint64)tTmp2 << 32); - tCurrTransId = this->getTransactionId(); - if (tCurrTransId != tRecTransId) { - return -1; - }//if - /* - We received an indication of that this transaction was aborted due to a - node failure. - */ - if (theSendStatus == sendTC_ROLLBACK) { + if(checkState_TransId(aSignal->getDataPtr()+1)){ /* - We were in the process of sending a rollback anyways. We will - report it as a success. + We received an indication of that this transaction was aborted due to a + node failure. */ - theCompletionStatus = CompletedSuccess; - } else { - theCompletionStatus = CompletedFailure; - theError.code = 4031; - }//if - theReleaseOnClose = true; - theCommitStatus = Aborted; - return 0; + if (theSendStatus == sendTC_ROLLBACK) { + /* + We were in the process of sending a rollback anyways. We will + report it as a success. + */ + theCompletionStatus = CompletedSuccess; + } else { + theCompletionStatus = CompletedFailure; + theError.code = 4031; + }//if + theReleaseOnClose = true; + theCommitStatus = Aborted; + return 0; + } + return -1; }//NdbConnection::receiveTCKEY_FAILREF() -/******************************************************************************* +/****************************************************************************** int receiveTCINDXCONF(NdbApiSignal* aSignal, Uint32 long_short_ind); Return Value: Return 0 : receiveTCINDXCONF was successful. Return -1: In all other case. Parameters: aSignal: The signal object pointer. Remark: -*******************************************************************************/ +******************************************************************************/ int -NdbConnection::receiveTCINDXCONF(const TcIndxConf * indxConf, Uint32 aDataLength) +NdbConnection::receiveTCINDXCONF(const TcIndxConf * indxConf, + Uint32 aDataLength) { - Uint64 tRecTransId; - Uint32 tConditionFlag; - - const Uint32 tTemp = indxConf->confInfo; - const Uint32 tTmp1 = indxConf->transId1; - const Uint32 tTmp2 = indxConf->transId2; -/****************************************************************************** -Check that we are expecting signals from this transaction and that it -doesn't belong to a transaction already completed. Simply ignore messages -from other transactions. -******************************************************************************/ - tRecTransId = (Uint64)tTmp1 + ((Uint64)tTmp2 << 32); - - const Uint32 tNoOfOperations = TcIndxConf::getNoOfOperations(tTemp); - const Uint32 tCommitFlag = TcKeyConf::getCommitFlag(tTemp); - - tConditionFlag = (Uint32)(((aDataLength - 5) >> 1) - tNoOfOperations); - tConditionFlag |= (Uint32)(tNoOfOperations > 10); - tConditionFlag |= (Uint32)(tNoOfOperations <= 0); - tConditionFlag |= (Uint32)(theTransactionId - tRecTransId); - tConditionFlag |= (Uint32)(theStatus - Connected); - - if (tConditionFlag == 0) { + if(checkState_TransId(&indxConf->transId1)){ + const Uint32 tTemp = indxConf->confInfo; + const Uint32 tNoOfOperations = TcIndxConf::getNoOfOperations(tTemp); + const Uint32 tCommitFlag = TcKeyConf::getCommitFlag(tTemp); + const Uint32* tPtr = (Uint32 *)&indxConf->operations[0]; + Uint32 tNoComp = theNoOfOpCompleted; for (Uint32 i = 0; i < tNoOfOperations ; i++) { - NdbIndexOperation* tOp = theNdb->void2rec_iop(theNdb->int2void(*tPtr)); + NdbReceiver* tOp = theNdb->void2rec(theNdb->int2void(*tPtr)); tPtr++; const Uint32 tAttrInfoLen = *tPtr; tPtr++; - if (tOp && tOp->checkMagicNumber() != -1) { - tOp->TCOPCONF(tAttrInfoLen); + if (tOp && tOp->checkMagicNumber()) { + tNoComp += tOp->execTCOPCONF(tAttrInfoLen); } else { return -1; }//if }//for - Uint32 tNoComp = theNoOfOpCompleted; Uint32 tNoSent = theNoOfOpSent; Uint32 tGCI = indxConf->gci; + theNoOfOpCompleted = tNoComp; if (tCommitFlag == 1) { theCommitStatus = Committed; theGlobalCheckpointId = tGCI; } else if ((tNoComp >= tNoSent) && (theLastExecOpInList->theCommitIndicator == 1)){ -/**********************************************************************/ -// We sent the transaction with Commit flag set and received a CONF with -// no Commit flag set. This is clearly an anomaly. -/**********************************************************************/ + /**********************************************************************/ + // We sent the transaction with Commit flag set and received a CONF with + // no Commit flag set. This is clearly an anomaly. + /**********************************************************************/ theError.code = 4011; theCompletionStatus = CompletedFailure; theCommitStatus = Aborted; @@ -1584,36 +1487,21 @@ Remark: Handles the reception of the TCINDXREF signal. int NdbConnection::receiveTCINDXREF( NdbApiSignal* aSignal) { - Uint64 tRecTransId, tCurrTransId; - Uint32 tTmp1, tTmp2; - - if (theStatus != Connected) { - return -1; - }//if -/***************************************************************************** -Check that we are expecting signals from this transaction and that it doesn't -belong to a transaction already completed. Simply ignore messages from other -transactions. -******************************************************************************/ - tTmp1 = aSignal->readData(2); - tTmp2 = aSignal->readData(3); - tRecTransId = (Uint64)tTmp1 + ((Uint64)tTmp2 << 32); - tCurrTransId = this->getTransactionId(); - if (tCurrTransId != tRecTransId) { - return -1; - }//if - theError.code = aSignal->readData(4); // Override any previous errors - -/**********************************************************************/ -/* A serious error has occured. This could be due to deadlock or */ -/* lack of resources or simply a programming error in NDB. This */ -/* transaction will be aborted. Actually it has already been */ -/* and we only need to report completion and return with the */ -/* error code to the application. */ -/**********************************************************************/ - theCompletionStatus = CompletedFailure; - theCommitStatus = Aborted; - return 0; + if(checkState_TransId(aSignal->getDataPtr()+1)){ + theError.code = aSignal->readData(4); // Override any previous errors + + /**********************************************************************/ + /* A serious error has occured. This could be due to deadlock or */ + /* lack of resources or simply a programming error in NDB. This */ + /* transaction will be aborted. Actually it has already been */ + /* and we only need to report completion and return with the */ + /* error code to the application. */ + /**********************************************************************/ + theCompletionStatus = CompletedFailure; + theCommitStatus = Aborted; + return 0; + } + return -1; }//NdbConnection::receiveTCINDXREF() /******************************************************************************* diff --git a/ndb/src/ndbapi/NdbConnectionScan.cpp b/ndb/src/ndbapi/NdbConnectionScan.cpp index 962acc0bdac..ea45f2b5a00 100644 --- a/ndb/src/ndbapi/NdbConnectionScan.cpp +++ b/ndb/src/ndbapi/NdbConnectionScan.cpp @@ -33,7 +33,6 @@ #include <NdbConnection.hpp> #include <NdbOperation.hpp> #include <NdbScanOperation.hpp> -#include "NdbScanReceiver.hpp" #include "NdbApiSignal.hpp" #include "TransporterFacade.hpp" #include "NdbUtil.hpp" @@ -49,299 +48,6 @@ #define WAITFOR_SCAN_TIMEOUT 120000 -/***************************************************************************** - * int executeScan(); - * - * 1. Check that the transaction is started and other important preconditions - * 2. Tell the kernel to start scanning by sending one SCAN_TABREQ, if - * parallelism is greater than 16 also send one SCAN_TABINFO for each - * additional 16 - * Define which attributes to scan in ATTRINFO, this signal also holds the - * interpreted program - * 3. Wait for the answer of the SCAN_TABREQ. This is either a SCAN_TABCONF if - * the scan was correctly defined and a SCAN_TABREF if the scan couldn't - * be started. - * 4. Check the result, if scan was not started return -1 - * - ****************************************************************************/ -int -NdbConnection::executeScan(){ - if (theTransactionIsStarted == true){ // Transaction already started. - setErrorCode(4600); - return -1; - } - if (theStatus != Connected) { // Lost connection - setErrorCode(4601); - return -1; - } - if (theScanningOp == NULL){ - setErrorCode(4602); // getNdbOperation must be called before executeScan - return -1; - } - TransporterFacade* tp = TransporterFacade::instance(); - theNoOfOpCompleted = 0; - theNoOfSCANTABCONFRecv = 0; - tp->lock_mutex(); - if (tp->get_node_alive(theDBnode) && - (tp->getNodeSequence(theDBnode) == theNodeSequence)) { - if (tp->check_send_size(theDBnode, get_send_size())) { - theTransactionIsStarted = true; - if (sendScanStart() == -1){ - tp->unlock_mutex(); - return -1; - }//if - theNdb->theWaiter.m_node = theDBnode; - theNdb->theWaiter.m_state = WAIT_SCAN; - int res = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT); - if (res == 0) { - return 0; - } else { - if (res == -1) { - setErrorCode(4008); - } else if (res == -2) { - theTransactionIsStarted = false; - theReleaseOnClose = true; - setErrorCode(4028); - } else { - ndbout << "Impossible return from receiveResponse in executeScan"; - ndbout << endl; - abort(); - }//if - theCommitStatus = Aborted; - return -1; - }//if - } else { - TRACE_DEBUG("Start a scan with send buffer full attempted"); - setErrorCode(4022); - theCommitStatus = Aborted; - }//if - } else { - if (!(tp->get_node_stopping(theDBnode) && - (tp->getNodeSequence(theDBnode) == theNodeSequence))) { - TRACE_DEBUG("The node is hard dead when attempting to start a scan"); - setErrorCode(4029); - theReleaseOnClose = true; - } else { - TRACE_DEBUG("The node is stopping when attempting to start a scan"); - setErrorCode(4030); - }//if - theCommitStatus = Aborted; - }//if - tp->unlock_mutex(); - return -1; -} - -/****************************************************************************** - * int nextScanResult(); - * Remark: - * This method is used to distribute data received to the application. - * Iterate through the list and search for operations that haven't - * been distributed yet (status != Finished). - * If there are no more operations/records still waiting to be exececuted - * we have to send SCAN_NEXTREQ to fetch next set of records. - * - * TODO - This function should be able to return a value indicating if - * there are any more records already fetched from memory or if it has to - * ask the db for more. This would mean we could get better performance when - * takeOver is used wince we can take over all ops already fetched, put them - * in another trans and send them of to the db when there are no more records - * already fetched. Maybe use a new argument to the function for this -******************************************************************************/ -int -NdbConnection::nextScanResult(bool fetchAllowed){ - - if (theTransactionIsStarted != true){ // Transaction not started. - setErrorCode(4601); - return -1; - } - // Scan has finished ok but no operations recived = empty recordset. - if(theScanFinished == true){ - return 1; // No more records - } - if (theStatus != Connected){// Lost connection - setErrorCode(4601); - return -1; - } - // Something went wrong, probably we got a SCAN_TABREF earlier. - if (theCompletionStatus == CompletedFailure) { - return -1; - } - if (theNoOfOpCompleted == theNoOfOpFetched) { - // There are no more records cached in NdbApi - if (fetchAllowed == true){ - // Get some more records from db - - if (fetchNextScanResult() == -1){ - return -1; - } - if (theScanFinished == true) { // The scan has finished. - return 1; // 1 = No more records - } - if (theCompletionStatus == CompletedFailure) { - return -1; // Something went wrong, probably we got a SCAN_TABREF. - } - } else { - // There where no more cached records in NdbApi - // and we where not allowed to go to db and ask for - // more - return 2; - } - } - - // It's not allowed to come here without any cached records - if (theCurrentScanRec == NULL){ -#ifdef VM_TRACE - ndbout << "nextScanResult("<<fetchAllowed<<")"<<endl - << " theTransactionIsStarted = " << theTransactionIsStarted << endl - << " theScanFinished = " << theScanFinished << endl - << " theCommitStatus = " << theCommitStatus << endl - << " theStatus = " << theStatus << endl - << " theCompletionStatus = " << theCompletionStatus << endl - << " theNoOfOpCompleted = " << theNoOfOpCompleted << endl - << " theNoOfOpFetched = " << theNoOfOpFetched << endl - << " theScanningOp = " << theScanningOp << endl - << " theNoOfSCANTABCONFRecv = "<< theNoOfSCANTABCONFRecv << endl - << " theNdb->theWaiter.m_node = " <<theNdb->theWaiter.m_node<<endl - << " theNdb->theWaiter.m_state = " << theNdb->theWaiter.m_state << endl; - abort(); -#endif - return -1; - } - - // Execute the saved signals for this operation. - NdbScanReceiver* tScanRec = theCurrentScanRec; - theScanningOp->theCurrRecAI_Len = 0; - theScanningOp->theCurrentRecAttr = theScanningOp->theFirstRecAttr; - if(tScanRec->executeSavedSignals() != 0) - return -1; - theNoOfOpCompleted++; - // Remember for next iteration and takeOverScanOp - thePreviousScanRec = tScanRec; - theCurrentScanRec = tScanRec->next(); - return 0; // 0 = There are more rows to be fetched. -} - -/****************************************************************************** - * int stopScan() - * Remark: By sending SCAN_NEXTREQ with data word 2 set to TRUE we - * abort the scan process. - *****************************************************************************/ -int -NdbConnection::stopScan() -{ - if(theScanFinished == true){ - return 0; - } - if (theCompletionStatus == CompletedFailure){ - return 0; - } - - if (theScanningOp == 0){ - return 0; - } - - theNoOfOpCompleted = 0; - theNoOfSCANTABCONFRecv = 0; - theScanningOp->prepareNextScanResult(); - return sendScanNext(1); -} - - -/******************************************************************** - * int sendScanStart() - * - * Send the signals reuired to define and start the scan - * 1. Send SCAN_TABREQ - * 2. Send SCAN_TABINFO(if any, parallelism must be > 16) - * 3. Send ATTRINFO signals - * - * Returns -1 if an error occurs otherwise 0. - * - ********************************************************************/ -int -NdbConnection::sendScanStart(){ - - /***** 0. Prepare signals ******************/ - // This might modify variables and signals - if(theScanningOp->prepareSendScan(theTCConPtr, - theTransactionId) == -1) - return -1; - - /***** 1. Send SCAN_TABREQ **************/ - /***** 2. Send SCAN_TABINFO *************/ - /***** 3. Send ATTRINFO signals *********/ - if (theScanningOp->doSendScan(theDBnode) == -1) - return -1; - return 0; -} - - -int -NdbConnection::fetchNextScanResult(){ - theNoOfOpCompleted = 0; - theNoOfSCANTABCONFRecv = 0; - theScanningOp->prepareNextScanResult(); - return sendScanNext(0); -} - - - -/*********************************************************** - * int sendScanNext(int stopScanFlag) - * - * ************************************************************/ -int NdbConnection::sendScanNext(bool stopScanFlag){ - NdbApiSignal tSignal(theNdb->theMyRef); - Uint32 tTransId1, tTransId2; - tSignal.setSignal(GSN_SCAN_NEXTREQ); - tSignal.setData(theTCConPtr, 1); - // Set the stop flag in word 2(1 = stop) - Uint32 tStopValue; - tStopValue = stopScanFlag == true ? 1 : 0; - tSignal.setData(tStopValue, 2); - tTransId1 = (Uint32) theTransactionId; - tTransId2 = (Uint32) (theTransactionId >> 32); - tSignal.setData(tTransId1, 3); - tSignal.setData(tTransId2, 4); - tSignal.setLength(4); - Uint32 conn_seq = theNodeSequence; - int return_code = theNdb->sendRecSignal(theDBnode, - WAIT_SCAN, - &tSignal, - conn_seq); - if (return_code == 0) { - return 0; - } else if (return_code == -1) { // Time-out - TRACE_DEBUG("Time-out when sending sendScanNext"); - setErrorCode(4024); - theTransactionIsStarted = false; - theReleaseOnClose = true; - theCommitStatus = Aborted; - } else if (return_code == -2) { // Node failed - TRACE_DEBUG("Node failed when sendScanNext"); - setErrorCode(4027); - theTransactionIsStarted = false; - theReleaseOnClose = true; - theCommitStatus = Aborted; - } else if (return_code == -3) { - TRACE_DEBUG("Send failed when sendScanNext"); - setErrorCode(4033); - theTransactionIsStarted = false; - theReleaseOnClose = true; - theCommitStatus = Aborted; - } else if (return_code == -4) { - TRACE_DEBUG("Send buffer full when sendScanNext"); - setErrorCode(4032); - } else if (return_code == -5) { - TRACE_DEBUG("Node stopping when sendScanNext"); - setErrorCode(4034); - } else { - ndbout << "Impossible return from sendRecSignal" << endl; - abort(); - }//if - return -1; -} - /*************************************************************************** * int receiveSCAN_TABREF(NdbApiSignal* aSignal) @@ -352,39 +58,13 @@ int NdbConnection::sendScanNext(bool stopScanFlag){ ****************************************************************************/ int NdbConnection::receiveSCAN_TABREF(NdbApiSignal* aSignal){ - const ScanTabRef * const scanTabRef = CAST_CONSTPTR(ScanTabRef, aSignal->getDataPtr()); - if (theStatus != Connected){ -#ifdef VM_TRACE - ndbout << "SCAN_TABREF dropped, theStatus = " << theStatus << endl; -#endif - return -1; - } - if (aSignal->getLength() != ScanTabRef::SignalLength){ -#ifdef VM_TRACE - ndbout << "SCAN_TABREF dropped, signal length " << aSignal->getLength() << endl; -#endif - return -1; - } - const Uint64 tCurrTransId = this->getTransactionId(); - const Uint64 tRecTransId = (Uint64)scanTabRef->transId1 + - ((Uint64)scanTabRef->transId2 << 32); - if ((tRecTransId - tCurrTransId) != (Uint64)0){ -#ifdef VM_TRACE - ndbout << "SCAN_TABREF dropped, wrong transid" << endl; -#endif - return -1; + const ScanTabRef * ref = CAST_CONSTPTR(ScanTabRef, aSignal->getDataPtr()); + + if(checkState_TransId(&ref->transId1)){ + theScanningOp->execCLOSE_SCAN_REP(ref->errorCode); + return 0; } -#if 0 - ndbout << "SCAN_TABREF, " - <<"transid=("<<hex<<scanTabRef->transId1<<", "<<hex<<scanTabRef->transId2<<")" - <<", err="<<dec<<scanTabRef->errorCode << endl; -#endif - setErrorCode(scanTabRef->errorCode); - theCompletionStatus = CompletedFailure; - theCommitStatus = Aborted; // Indicate that this "transaction" was aborted - theTransactionIsStarted = false; - theScanningOp->releaseSignals(); - return 0; + return -1; } /***************************************************************************** @@ -401,173 +81,43 @@ NdbConnection::receiveSCAN_TABREF(NdbApiSignal* aSignal){ * *****************************************************************************/ int -NdbConnection::receiveSCAN_TABCONF(NdbApiSignal* aSignal) -{ - const ScanTabConf * const conf = CAST_CONSTPTR(ScanTabConf, aSignal->getDataPtr()); - if (theStatus != Connected){ -#ifdef VM_TRACE - ndbout << "Dropping SCAN_TABCONF, theStatus = "<< theStatus << endl; -#endif - return -1; - } - if(aSignal->getLength() != ScanTabConf::SignalLength){ -#ifdef VM_TRACE - ndbout << "Dropping SCAN_TABCONF, getLength = "<< aSignal->getLength() << endl; -#endif - return -1; - } - const Uint64 tCurrTransId = this->getTransactionId(); - const Uint64 tRecTransId = - (Uint64)conf->transId1 + ((Uint64)conf->transId2 << 32); - if ((tRecTransId - tCurrTransId) != (Uint64)0){ -#ifdef VM_TRACE - ndbout << "Dropping SCAN_TABCONF, wrong transid" << endl; -#endif - return -1; - } - - const Uint8 scanStatus = - ScanTabConf::getScanStatus(conf->requestInfo); - - if (scanStatus != 0) { - theCompletionStatus = CompletedSuccess; - theCommitStatus = Committed; - theScanFinished = true; - return 0; - } - - // There can only be one SCANTABCONF - assert(theNoOfSCANTABCONFRecv == 0); - theNoOfSCANTABCONFRecv++; - - // Save a copy of the signal - NdbApiSignal * tCopy = new NdbApiSignal(0);//getSignal(); - if (tCopy == NULL){ - setErrorCode(4000); - return 2; // theWaiter.m_state = NO_WAIT - } - tCopy->copyFrom(aSignal); - tCopy->next(NULL); - theScanningOp->theSCAN_TABCONF_Recv = tCopy; - - return checkNextScanResultComplete(); - -} - -/***************************************************************************** - * int receiveSCAN_TABINFO(NdbApiSignal* aSignal) - * - * Receive SCAN_TABINFO - * - *****************************************************************************/ -int -NdbConnection::receiveSCAN_TABINFO(NdbApiSignal* aSignal) +NdbConnection::receiveSCAN_TABCONF(NdbApiSignal* aSignal, + const Uint32 * ops, Uint32 len) { - if (theStatus != Connected){ - //ndbout << "SCAN_TABINFO dropped, theStatus = " << theStatus << endl; - return -1; - } - if (aSignal->getLength() != ScanTabInfo::SignalLength){ - //ndbout << "SCAN_TABINFO dropped, length = " << aSignal->getLength() << endl; - return -1; - } - - NdbApiSignal * tCopy = new NdbApiSignal(0);//getSignal(); - if (tCopy == NULL){ - setErrorCode(4000); - return 2; // theWaiter.m_state = NO_WAIT - } - tCopy->copyFrom(aSignal); - tCopy->next(NULL); - - // Put the signal last in list - if (theScanningOp->theFirstSCAN_TABINFO_Recv == NULL) - theScanningOp->theFirstSCAN_TABINFO_Recv = tCopy; - else - theScanningOp->theLastSCAN_TABINFO_Recv->next(tCopy); - theScanningOp->theLastSCAN_TABINFO_Recv = tCopy; - - return checkNextScanResultComplete(); -} - -/****************************************************************************** - * int checkNextScanResultComplete(NdbApiSignal* aSignal) - * - * Remark Traverses all the lists that are associated with - * this resultset and checks if all signals are there. - * If all required signal are received return 0 - * - * - *****************************************************************************/ -int -NdbConnection::checkNextScanResultComplete(){ - - if (theNoOfSCANTABCONFRecv != 1) { - return -1; - } - - Uint32 tNoOfOpFetched = 0; - theCurrentScanRec = NULL; - thePreviousScanRec = NULL; - - const ScanTabConf * const conf = - CAST_CONSTPTR(ScanTabConf, theScanningOp->theSCAN_TABCONF_Recv->getDataPtr()); - const Uint32 numOperations = ScanTabConf::getOperations(conf->requestInfo); - Uint32 sigIndex = 0; - NdbApiSignal* tSignal = theScanningOp->theFirstSCAN_TABINFO_Recv; - while(tSignal != NULL){ - const ScanTabInfo * const info = CAST_CONSTPTR(ScanTabInfo, tSignal->getDataPtr()); - // Loop through the operations for this SCAN_TABINFO - // tOpAndLength is allowed to be zero, this means no - // TRANSID_AI signals where sent for this record - // I.e getValue was called 0 times when defining scan - - // The max number of operations in each signal is 16 - Uint32 numOpsInSig = numOperations - sigIndex*16; - if (numOpsInSig > 16) - numOpsInSig = 16; - for(Uint32 i = 0; i < numOpsInSig; i++){ - const Uint32 tOpAndLength = info->operLenAndIdx[i]; - const Uint32 tOpIndex = ScanTabInfo::getIdx(tOpAndLength); - const Uint32 tOpLen = ScanTabInfo::getLen(tOpAndLength); - - assert(tOpIndex < 256); - NdbScanReceiver* tScanRec = - theScanningOp->theScanReceiversArray[tOpIndex]; - assert(tScanRec != NULL); - if(tScanRec->isCompleted(tOpLen)) - tScanRec->setCompleted(); - else{ - return -1; // At least one receiver was not ready - } - - // Build list of scan receivers - if (theCurrentScanRec == NULL) { - theCurrentScanRec = tScanRec; - thePreviousScanRec = tScanRec; - } else { - thePreviousScanRec->next(tScanRec); - thePreviousScanRec = tScanRec; + const ScanTabConf * conf = CAST_CONSTPTR(ScanTabConf, aSignal->getDataPtr()); + if(checkState_TransId(&conf->transId1)){ + + if (conf->requestInfo == ScanTabConf::EndOfData) { + theScanningOp->execCLOSE_SCAN_REP(0); + return 0; + } + + int noComp = -1; + for(Uint32 i = 0; i<len; i += 3){ + Uint32 ptrI = * ops++; + Uint32 tcPtrI = * ops++; + Uint32 info = * ops++; + Uint32 opCount = ScanTabConf::getRows(info); + Uint32 totalLen = ScanTabConf::getLength(info); + + void * tPtr = theNdb->int2void(ptrI); + assert(tPtr); // For now + NdbReceiver* tOp = theNdb->void2rec(tPtr); + if (tOp && tOp->checkMagicNumber()){ + if(tOp->execSCANOPCONF(tcPtrI, totalLen, opCount)){ + /** + * + */ + noComp++; + theScanningOp->receiver_delivered(tOp); + } else if(info == ScanTabConf::EndOfData){ + noComp++; + theScanningOp->receiver_completed(tOp); + } } - tNoOfOpFetched++; } - tSignal = tSignal->next(); - sigIndex++; - } - - // Check number of operations fetched against value in SCANTAB_CONF - if (tNoOfOpFetched != numOperations) { - setErrorCode(4113); - return 2; // theWaiter.m_state = NO_WAIT + return noComp; } - // All signals for this resultset recieved - // release SCAN_TAB signals - theNoOfSCANTABCONFRecv = 0; - theScanningOp->releaseSignals(); - - // We have received all operations with correct lengths. - thePreviousScanRec = NULL; - theNoOfOpFetched = tNoOfOpFetched; - return 0; + return -1; } diff --git a/ndb/src/ndbapi/NdbCursorOperation.cpp b/ndb/src/ndbapi/NdbCursorOperation.cpp index e4dd600c57f..a9f84c4c110 100644 --- a/ndb/src/ndbapi/NdbCursorOperation.cpp +++ b/ndb/src/ndbapi/NdbCursorOperation.cpp @@ -30,8 +30,6 @@ #include <NdbResultSet.hpp> NdbCursorOperation::NdbCursorOperation(Ndb* aNdb) : - NdbOperation(aNdb), - m_resultSet(0) { } @@ -48,10 +46,6 @@ void NdbCursorOperation::cursInit() NdbResultSet* NdbCursorOperation::getResultSet() { - if (!m_resultSet) - m_resultSet = new NdbResultSet(this); - - return m_resultSet; } diff --git a/ndb/src/ndbapi/NdbDictionaryImpl.cpp b/ndb/src/ndbapi/NdbDictionaryImpl.cpp index 3b31fa70e35..899359b12a4 100644 --- a/ndb/src/ndbapi/NdbDictionaryImpl.cpp +++ b/ndb/src/ndbapi/NdbDictionaryImpl.cpp @@ -257,6 +257,7 @@ NdbTableImpl::init(){ m_indexType = NdbDictionary::Index::Undefined; m_noOfKeys = 0; + m_fragmentCount = 0; } bool @@ -275,11 +276,9 @@ NdbTableImpl::equal(const NdbTableImpl& obj) const if(strcmp(m_internalName.c_str(), obj.m_internalName.c_str()) != 0){ return false; } - if(m_fragmentType != obj.m_fragmentType){ return false; } - if(m_columns.size() != obj.m_columns.size()){ return false; } @@ -318,6 +317,7 @@ NdbTableImpl::assign(const NdbTableImpl& org) m_newExternalName.assign(org.m_newExternalName); m_frm.assign(org.m_frm.get_data(), org.m_frm.length()); m_fragmentType = org.m_fragmentType; + m_fragmentCount = org.m_fragmentCount; for(unsigned i = 0; i<org.m_columns.size(); i++){ NdbColumnImpl * col = new NdbColumnImpl(); @@ -826,6 +826,7 @@ NdbDictInterface::dictSignal(NdbApiSignal* signal, m_waiter.m_state = wst; m_waiter.wait(theWait); + m_transporter->unlock_mutex(); // End of Protected area if(m_waiter.m_state == NO_WAIT && m_error.code == 0){ @@ -1115,6 +1116,7 @@ NdbDictInterface::parseTableInfo(NdbTableImpl ** ret, impl->m_kvalue = tableDesc.TableKValue; impl->m_minLoadFactor = tableDesc.MinLoadFactor; impl->m_maxLoadFactor = tableDesc.MaxLoadFactor; + impl->m_fragmentCount = tableDesc.FragmentCount; impl->m_indexType = (NdbDictionary::Index::Type) getApiConstant(tableDesc.TableType, @@ -1198,6 +1200,8 @@ NdbDictInterface::parseTableInfo(NdbTableImpl ** ret, it.next(); } impl->m_noOfKeys = keyCount; + impl->m_keyLenInWords = keyInfoPos; + * ret = impl; return 0; } @@ -2707,6 +2711,7 @@ NdbDictInterface::listObjects(NdbApiSignal* signal) m_waiter.m_node = aNodeId; m_waiter.m_state = WAIT_LIST_TABLES_CONF; m_waiter.wait(WAITFOR_RESPONSE_TIMEOUT); + m_transporter->unlock_mutex(); // end protected if (m_waiter.m_state == NO_WAIT && m_error.code == 0) return 0; diff --git a/ndb/src/ndbapi/NdbDictionaryImpl.hpp b/ndb/src/ndbapi/NdbDictionaryImpl.hpp index 3263a636a79..311d101f8f4 100644 --- a/ndb/src/ndbapi/NdbDictionaryImpl.hpp +++ b/ndb/src/ndbapi/NdbDictionaryImpl.hpp @@ -123,7 +123,9 @@ public: int m_kvalue; int m_minLoadFactor; int m_maxLoadFactor; - + int m_keyLenInWords; + int m_fragmentCount; + NdbDictionaryImpl * m_dictionary; NdbIndexImpl * m_index; NdbColumnImpl * getColumn(unsigned attrId); diff --git a/ndb/src/ndbapi/NdbEventOperationImpl.cpp b/ndb/src/ndbapi/NdbEventOperationImpl.cpp index 7b4afc72ef7..6ece69cce91 100644 --- a/ndb/src/ndbapi/NdbEventOperationImpl.cpp +++ b/ndb/src/ndbapi/NdbEventOperationImpl.cpp @@ -166,7 +166,7 @@ NdbEventOperationImpl::getValue(const NdbColumnImpl *tAttrInfo, char *aValue, in } //theErrorLine++; - tRecAttr->setUNDEFINED(); + tRecAttr->setNULL(); // We want to keep the list sorted to make data insertion easier later if (theFirstRecAttr == NULL) { @@ -387,7 +387,7 @@ NdbEventOperationImpl::next(int *pOverrun) while (tAttrId > tRecAttrId) { //printf("[%u] %u %u [%u]\n", tAttrId, tDataSz, *aDataPtr, tRecAttrId); - tWorkingRecAttr->setUNDEFINED(); + tWorkingRecAttr->setNULL(); tWorkingRecAttr = tWorkingRecAttr->next(); if (tWorkingRecAttr == NULL) break; @@ -399,19 +399,16 @@ NdbEventOperationImpl::next(int *pOverrun) //printf("[%u] %u %u [%u]\n", tAttrId, tDataSz, *aDataPtr, tRecAttrId); if (tAttrId == tRecAttrId) { - tWorkingRecAttr->setNotNULL(); if (!m_eventImpl->m_tableImpl->getColumn(tRecAttrId)->getPrimaryKey()) hasSomeData++; //printf("set!\n"); - Uint32 *theRef = (Uint32*)tWorkingRecAttr->aRef(); - Uint32 *theEndRef = theRef + tDataSz; - while (theRef < theEndRef) - *theRef++ = *aDataPtr++; + tWorkingRecAttr->receive_data(aDataPtr, tDataSz); // move forward, data has already moved forward aAttrPtr++; + aDataPtr += tDataSz; tWorkingRecAttr = tWorkingRecAttr->next(); } else { // move only attr forward @@ -423,7 +420,7 @@ NdbEventOperationImpl::next(int *pOverrun) while (tWorkingRecAttr != NULL) { tRecAttrId = tWorkingRecAttr->attrId(); //printf("set undefined [%u] %u %u [%u]\n", tAttrId, tDataSz, *aDataPtr, tRecAttrId); - tWorkingRecAttr->setUNDEFINED(); + tWorkingRecAttr->setNULL(); tWorkingRecAttr = tWorkingRecAttr->next(); } @@ -436,7 +433,7 @@ NdbEventOperationImpl::next(int *pOverrun) tDataSz = AttributeHeader(*aDataPtr).getDataSize(); aDataPtr++; while (tAttrId > tRecAttrId) { - tWorkingRecAttr->setUNDEFINED(); + tWorkingRecAttr->setNULL(); tWorkingRecAttr = tWorkingRecAttr->next(); if (tWorkingRecAttr == NULL) break; @@ -445,16 +442,11 @@ NdbEventOperationImpl::next(int *pOverrun) if (tWorkingRecAttr == NULL) break; if (tAttrId == tRecAttrId) { - tWorkingRecAttr->setNotNULL(); - if (!m_eventImpl->m_tableImpl->getColumn(tRecAttrId)->getPrimaryKey()) hasSomeData++; - Uint32 *theRef = (Uint32*)tWorkingRecAttr->aRef(); - Uint32 *theEndRef = theRef + tDataSz; - while (theRef < theEndRef) - *theRef++ = *aDataPtr++; - + tWorkingRecAttr->receive_data(aDataPtr, tDataSz); + aDataPtr += tDataSz; // move forward, data+attr has already moved forward tWorkingRecAttr = tWorkingRecAttr->next(); } else { @@ -463,7 +455,7 @@ NdbEventOperationImpl::next(int *pOverrun) } } while (tWorkingRecAttr != NULL) { - tWorkingRecAttr->setUNDEFINED(); + tWorkingRecAttr->setNULL(); tWorkingRecAttr = tWorkingRecAttr->next(); } diff --git a/ndb/src/ndbapi/NdbImpl.hpp b/ndb/src/ndbapi/NdbImpl.hpp index cd05335b337..1fb1969b589 100644 --- a/ndb/src/ndbapi/NdbImpl.hpp +++ b/ndb/src/ndbapi/NdbImpl.hpp @@ -35,6 +35,7 @@ public: #include <NdbError.hpp> #include <NdbCondition.h> #include <NdbReceiver.hpp> +#include <NdbOperation.hpp> #include <NdbTick.h> @@ -83,12 +84,13 @@ Ndb::void2rec_iop(void* val){ return (NdbIndexOperation*)(void2rec(val)->getOwner()); } -inline -NdbScanReceiver* -Ndb::void2rec_srec(void* val){ - return (NdbScanReceiver*)(void2rec(val)->getOwner()); +inline +NdbConnection * +NdbReceiver::getTransaction(){ + return ((NdbOperation*)m_owner)->theNdbCon; } + inline int Ndb::checkInitState() @@ -151,7 +153,6 @@ NdbWaiter::wait(int waitTime) waitTime = maxTime - NdbTick_CurrentMillisecond(); } } - NdbMutex_Unlock((NdbMutex*)m_mutex); } inline diff --git a/ndb/src/ndbapi/NdbIndexOperation.cpp b/ndb/src/ndbapi/NdbIndexOperation.cpp index 02d94f39f2d..d976c912c5e 100644 --- a/ndb/src/ndbapi/NdbIndexOperation.cpp +++ b/ndb/src/ndbapi/NdbIndexOperation.cpp @@ -52,7 +52,7 @@ NdbIndexOperation::NdbIndexOperation(Ndb* aNdb) : /** * Change receiver type */ - theReceiver.init(NdbReceiver::NDB_INDEX_OPERATION, this); + theReceiver.init(NdbReceiver::NDB_INDEX_OPERATION, this, false); } NdbIndexOperation::~NdbIndexOperation() @@ -664,10 +664,8 @@ NdbIndexOperation::prepareSend(Uint32 aTC_ConnectPtr, Uint64 aTransactionId) tSignal = tnextSignal; } while (tSignal != NULL); }//if - NdbRecAttr* tRecAttrObject = theFirstRecAttr; theStatus = WaitResponse; - theCurrentRecAttr = tRecAttrObject; - + theReceiver.prepareSend(); return 0; } diff --git a/ndb/src/ndbapi/NdbOperation.cpp b/ndb/src/ndbapi/NdbOperation.cpp index 8bc9111d060..74d6a462ac8 100644 --- a/ndb/src/ndbapi/NdbOperation.cpp +++ b/ndb/src/ndbapi/NdbOperation.cpp @@ -54,7 +54,6 @@ NdbOperation::NdbOperation(Ndb* aNdb) : //theTable(aTable), theNdbCon(NULL), theNext(NULL), - theNextScanOp(NULL), theTCREQ(NULL), theFirstATTRINFO(NULL), theCurrentATTRINFO(NULL), @@ -62,8 +61,6 @@ NdbOperation::NdbOperation(Ndb* aNdb) : theAI_LenInCurrAI(0), theFirstKEYINFO(NULL), theLastKEYINFO(NULL), - theFirstRecAttr(NULL), - theCurrentRecAttr(NULL), theFirstLabel(NULL), theLastLabel(NULL), @@ -76,10 +73,6 @@ NdbOperation::NdbOperation(Ndb* aNdb) : theNoOfLabels(0), theNoOfSubroutines(0), - theTotalRecAI_Len(0), - theCurrRecAI_Len(0), - theAI_ElementLen(0), - theCurrElemPtr(NULL), m_currentTable(NULL), //theTableId(0xFFFF), m_accessTable(NULL), //theAccessTableId(0xFFFF), //theSchemaVersion(0), @@ -95,17 +88,9 @@ NdbOperation::NdbOperation(Ndb* aNdb) : m_tcReqGSN(GSN_TCKEYREQ), m_keyInfoGSN(GSN_KEYINFO), m_attrInfoGSN(GSN_ATTRINFO), - theParallelism(0), - theScanReceiversArray(NULL), - theSCAN_TABREQ(NULL), - theFirstSCAN_TABINFO_Send(NULL), - theLastSCAN_TABINFO_Send(NULL), - theFirstSCAN_TABINFO_Recv(NULL), - theLastSCAN_TABINFO_Recv(NULL), - theSCAN_TABCONF_Recv(NULL), theBoundATTRINFO(NULL) { - theReceiver.init(NdbReceiver::NDB_OPERATION, this); + theReceiver.init(NdbReceiver::NDB_OPERATION, this, false); theError.code = 0; } /***************************************************************************** @@ -165,7 +150,7 @@ NdbOperation::init(NdbTableImpl* tab, NdbConnection* myConnection){ theNdbCon = myConnection; for (Uint32 i=0; i<NDB_MAX_NO_OF_ATTRIBUTES_IN_KEY; i++) for (int j=0; j<3; j++) - theTupleKeyDefined[i][j] = false; + theTupleKeyDefined[i][j] = 0; theFirstATTRINFO = NULL; theCurrentATTRINFO = NULL; @@ -175,13 +160,11 @@ NdbOperation::init(NdbTableImpl* tab, NdbConnection* myConnection){ theTupKeyLen = 0; theNoOfTupKeyDefined = 0; - theTotalCurrAI_Len = 0; - theAI_LenInCurrAI = 0; - theTotalRecAI_Len = 0; theDistrKeySize = 0; theDistributionGroup = 0; - theCurrRecAI_Len = 0; - theAI_ElementLen = 0; + + theTotalCurrAI_Len = 0; + theAI_LenInCurrAI = 0; theStartIndicator = 0; theCommitIndicator = 0; theSimpleIndicator = 0; @@ -191,9 +174,6 @@ NdbOperation::init(NdbTableImpl* tab, NdbConnection* myConnection){ theDistrGroupType = 0; theDistrKeyIndicator = 0; theScanInfo = 0; - theFirstRecAttr = NULL; - theCurrentRecAttr = NULL; - theCurrElemPtr = NULL; theTotalNrOfKeyWordInSignal = 8; theMagicNumber = 0xABCDEF01; theBoundATTRINFO = NULL; @@ -212,6 +192,7 @@ NdbOperation::init(NdbTableImpl* tab, NdbConnection* myConnection){ tcKeyReq->scanInfo = 0; theKEYINFOptr = &tcKeyReq->keyInfo[0]; theATTRINFOptr = &tcKeyReq->attrInfo[0]; + theReceiver.init(NdbReceiver::NDB_OPERATION, this, false); return 0; } @@ -226,8 +207,6 @@ NdbOperation::release() { NdbApiSignal* tSignal; NdbApiSignal* tSaveSignal; - NdbRecAttr* tRecAttr; - NdbRecAttr* tSaveRecAttr; NdbBranch* tBranch; NdbBranch* tSaveBranch; NdbLabel* tLabel; @@ -260,15 +239,6 @@ NdbOperation::release() } theFirstKEYINFO = NULL; theLastKEYINFO = NULL; - tRecAttr = theFirstRecAttr; - while (tRecAttr != NULL) - { - tSaveRecAttr = tRecAttr; - tRecAttr = tRecAttr->next(); - theNdb->releaseRecAttr(tSaveRecAttr); - } - theFirstRecAttr = NULL; - theCurrentRecAttr = NULL; if (theInterpretIndicator == 1) { tBranch = theFirstBranch; @@ -308,19 +278,18 @@ NdbOperation::release() } theBoundATTRINFO = NULL; } - releaseScan(); } NdbRecAttr* NdbOperation::getValue(const char* anAttrName, char* aValue) { - return getValue(m_currentTable->getColumn(anAttrName), aValue); + return getValue_impl(m_currentTable->getColumn(anAttrName), aValue); } NdbRecAttr* NdbOperation::getValue(Uint32 anAttrId, char* aValue) { - return getValue(m_currentTable->getColumn(anAttrId), aValue); + return getValue_impl(m_currentTable->getColumn(anAttrId), aValue); } int @@ -416,16 +385,4 @@ NdbOperation::write_attr(Uint32 anAttrId, Uint32 RegDest) return write_attr(m_currentTable->getColumn(anAttrId), RegDest); } -int -NdbOperation::setBound(const char* anAttrName, int type, const void* aValue, Uint32 len) -{ - return setBound(m_accessTable->getColumn(anAttrName), type, aValue, len); -} - -int -NdbOperation::setBound(Uint32 anAttrId, int type, const void* aValue, Uint32 len) -{ - return setBound(m_accessTable->getColumn(anAttrId), type, aValue, len); -} - diff --git a/ndb/src/ndbapi/NdbOperationDefine.cpp b/ndb/src/ndbapi/NdbOperationDefine.cpp index 20134068075..c174c6a629a 100644 --- a/ndb/src/ndbapi/NdbOperationDefine.cpp +++ b/ndb/src/ndbapi/NdbOperationDefine.cpp @@ -34,6 +34,7 @@ #include "NdbUtil.hpp" #include "NdbOut.hpp" #include "NdbImpl.hpp" +#include <NdbScanOperation.hpp> #include <Interpreter.hpp> @@ -261,30 +262,10 @@ NdbOperation::interpretedUpdateTuple() theStatus = OperationDefined; tNdbCon->theSimpleState = 0; theOperationType = UpdateRequest; - theInterpretIndicator = 1; theAI_LenInCurrAI = 25; theErrorLine = tErrorLine++; - theTotalCurrAI_Len = 5; - theSubroutineSize = 0; - theInitialReadSize = 0; - theInterpretedSize = 0; - theFinalUpdateSize = 0; - theFinalReadSize = 0; - - theFirstLabel = NULL; - theLastLabel = NULL; - theFirstBranch = NULL; - theLastBranch = NULL; - - theFirstCall = NULL; - theLastCall = NULL; - theFirstSubroutine = NULL; - theLastSubroutine = NULL; - - theNoOfLabels = 0; - theNoOfSubroutines = 0; - + initInterpreter(); return 0; } else { setErrorCode(4200); @@ -304,30 +285,11 @@ NdbOperation::interpretedDeleteTuple() theStatus = OperationDefined; tNdbCon->theSimpleState = 0; theOperationType = DeleteRequest; - theInterpretIndicator = 1; theErrorLine = tErrorLine++; theAI_LenInCurrAI = 25; - theTotalCurrAI_Len = 5; - theSubroutineSize = 0; - theInitialReadSize = 0; - theInterpretedSize = 0; - theFinalUpdateSize = 0; - theFinalReadSize = 0; - - theFirstLabel = NULL; - theLastLabel = NULL; - theFirstBranch = NULL; - theLastBranch = NULL; - - theFirstCall = NULL; - theLastCall = NULL; - theFirstSubroutine = NULL; - theLastSubroutine = NULL; - - theNoOfLabels = 0; - theNoOfSubroutines = 0; + initInterpreter(); return 0; } else { setErrorCode(4200); @@ -347,14 +309,14 @@ NdbOperation::interpretedDeleteTuple() * Remark: Define an attribute to retrieve in query. *****************************************************************************/ NdbRecAttr* -NdbOperation::getValue(const NdbColumnImpl* tAttrInfo, char* aValue) +NdbOperation::getValue_impl(const NdbColumnImpl* tAttrInfo, char* aValue) { NdbRecAttr* tRecAttr; if ((tAttrInfo != NULL) && (!tAttrInfo->m_indexOnly) && (theStatus != Init)){ if (theStatus == SetBound) { - saveBoundATTRINFO(); + ((NdbScanOperation*)this)->saveBoundATTRINFO(); theStatus = GetValue; } if (theStatus != GetValue) { @@ -386,33 +348,15 @@ NdbOperation::getValue(const NdbColumnImpl* tAttrInfo, char* aValue) // Insert Attribute Id into ATTRINFO part. /************************************************************************ - * Get a Receive Attribute object and link it into the operation object. - ************************************************************************/ - tRecAttr = theNdb->getRecAttr(); - if (tRecAttr != NULL) { - if (theFirstRecAttr == NULL) - theFirstRecAttr = tRecAttr; - else - theCurrentRecAttr->next(tRecAttr); - theCurrentRecAttr = tRecAttr; - tRecAttr->next(NULL); - - /********************************************************************** - * Now set the attribute identity and the pointer to the data in - * the RecAttr object - * Also set attribute size, array size and attribute type - ********************************************************************/ - if (tRecAttr->setup(tAttrInfo, aValue) == 0) { - theErrorLine++; - return tRecAttr; - } else { - setErrorCodeAbort(4000); - return NULL; - } - } else { + * Get a Receive Attribute object and link it into the operation object. + ***********************************************************************/ + if((tRecAttr = theReceiver.getValue(tAttrInfo, aValue)) != 0){ + theErrorLine++; + return tRecAttr; + } else { setErrorCodeAbort(4000); return NULL; - }//if getRecAttr failure + } } else { return NULL; }//if insertATTRINFO failure @@ -603,47 +547,6 @@ NdbOperation::setValue( const NdbColumnImpl* tAttrInfo, return 0; }//NdbOperation::setValue() -/* - * Define bound on index column in range scan. - */ -int -NdbOperation::setBound(const NdbColumnImpl* tAttrInfo, int type, const void* aValue, Uint32 len) -{ - if (theOperationType == OpenRangeScanRequest && - theStatus == SetBound && - (0 <= type && type <= 4) && - aValue != NULL && - len <= 8000) { - // bound type - insertATTRINFO(type); - // attribute header - Uint32 sizeInBytes = tAttrInfo->m_attrSize * tAttrInfo->m_arraySize; - if (len != sizeInBytes && (len != 0)) { - setErrorCodeAbort(4209); - return -1; - } - len = sizeInBytes; - Uint32 tIndexAttrId = tAttrInfo->m_attrId; - Uint32 sizeInWords = (len + 3) / 4; - AttributeHeader ah(tIndexAttrId, sizeInWords); - insertATTRINFO(ah.m_value); - // attribute data - if ((UintPtr(aValue) & 0x3) == 0 && (len & 0x3) == 0) - insertATTRINFOloop((const Uint32*)aValue, sizeInWords); - else { - Uint32 temp[2000]; - memcpy(temp, aValue, len); - while ((len & 0x3) != 0) - ((char*)temp)[len++] = 0; - insertATTRINFOloop(temp, sizeInWords); - } - return 0; - } else { - setErrorCodeAbort(4228); // XXX wrong code - return -1; - } -} - /**************************************************************************** * int insertATTRINFO( Uint32 aData ); * diff --git a/ndb/src/ndbapi/NdbOperationExec.cpp b/ndb/src/ndbapi/NdbOperationExec.cpp index d00c527550d..3d8c2f29615 100644 --- a/ndb/src/ndbapi/NdbOperationExec.cpp +++ b/ndb/src/ndbapi/NdbOperationExec.cpp @@ -46,83 +46,6 @@ Documentation: #include <NdbOut.hpp> -/****************************************************************************** -int doSend() - -Return Value: Return >0 : send was succesful, returns number of signals sent - Return -1: In all other case. -Parameters: aProcessorId: Receiving processor node -Remark: Sends the ATTRINFO signal(s) -******************************************************************************/ -int -NdbOperation::doSendScan(int aProcessorId) -{ - Uint32 tSignalCount = 0; - NdbApiSignal* tSignal; - - if (theInterpretIndicator != 1 || - (theOperationType != OpenScanRequest && - theOperationType != OpenRangeScanRequest)) { - setErrorCodeAbort(4005); - return -1; - } - - assert(theSCAN_TABREQ != NULL); - tSignal = theSCAN_TABREQ; - if (tSignal->setSignal(GSN_SCAN_TABREQ) == -1) { - setErrorCode(4001); - return -1; - } - // Update the "attribute info length in words" in SCAN_TABREQ before - // sending it. This could not be done in openScan because - // we created the ATTRINFO signals after the SCAN_TABREQ signal. - ScanTabReq * const scanTabReq = CAST_PTR(ScanTabReq, tSignal->getDataPtrSend()); - scanTabReq->attrLen = theTotalCurrAI_Len; - if (theOperationType == OpenRangeScanRequest) - scanTabReq->attrLen += theTotalBoundAI_Len; - TransporterFacade *tp = TransporterFacade::instance(); - if (tp->sendSignal(tSignal, aProcessorId) == -1) { - setErrorCode(4002); - return -1; - } - tSignalCount++; - - tSignal = theFirstSCAN_TABINFO_Send; - while (tSignal != NULL){ - if (tp->sendSignal(tSignal, aProcessorId)) { - setErrorCode(4002); - return -1; - } - tSignalCount++; - tSignal = tSignal->next(); - } - - if (theOperationType == OpenRangeScanRequest) { - // must have at least one signal since it contains attrLen for bounds - assert(theBoundATTRINFO != NULL); - tSignal = theBoundATTRINFO; - while (tSignal != NULL) { - if (tp->sendSignal(tSignal,aProcessorId) == -1){ - setErrorCode(4002); - return -1; - } - tSignalCount++; - tSignal = tSignal->next(); - } - } - - tSignal = theFirstATTRINFO; - while (tSignal != NULL) { - if (tp->sendSignal(tSignal,aProcessorId) == -1){ - setErrorCode(4002); - return -1; - } - tSignalCount++; - tSignal = tSignal->next(); - } - theStatus = WaitResponse; - return tSignalCount; -}//NdbOperation::doSendScan() void NdbOperation::setLastFlag(NdbApiSignal* signal, Uint32 lastFlag) @@ -178,62 +101,6 @@ NdbOperation::doSend(int aNodeId, Uint32 lastFlag) }//NdbOperation::doSend() /*************************************************************************** -int prepareSendScan(Uint32 aTC_ConnectPtr, - Uint64 aTransactionId) - -Return Value: Return 0 : preparation of send was succesful. - Return -1: In all other case. -Parameters: aTC_ConnectPtr: the Connect pointer to TC. - aTransactionId: the Transaction identity of the transaction. -Remark: Puts the the final data into ATTRINFO signal(s) after this - we know the how many signal to send and their sizes -***************************************************************************/ -int NdbOperation::prepareSendScan(Uint32 aTC_ConnectPtr, - Uint64 aTransactionId){ - - if (theInterpretIndicator != 1 || - (theOperationType != OpenScanRequest && - theOperationType != OpenRangeScanRequest)) { - setErrorCodeAbort(4005); - return -1; - } - - if (theStatus == SetBound) { - saveBoundATTRINFO(); - theStatus = GetValue; - } - - theErrorLine = 0; - - // In preapareSendInterpreted we set the sizes (word 4-8) in the - // first ATTRINFO signal. - if (prepareSendInterpreted() == -1) - return -1; - - const Uint32 transId1 = (Uint32) (aTransactionId & 0xFFFFFFFF); - const Uint32 transId2 = (Uint32) (aTransactionId >> 32); - - if (theOperationType == OpenRangeScanRequest) { - NdbApiSignal* tSignal = theBoundATTRINFO; - do{ - tSignal->setData(aTC_ConnectPtr, 1); - tSignal->setData(transId1, 2); - tSignal->setData(transId2, 3); - tSignal = tSignal->next(); - } while (tSignal != NULL); - } - theCurrentATTRINFO->setLength(theAI_LenInCurrAI); - NdbApiSignal* tSignal = theFirstATTRINFO; - do{ - tSignal->setData(aTC_ConnectPtr, 1); - tSignal->setData(transId1, 2); - tSignal->setData(transId2, 3); - tSignal = tSignal->next(); - } while (tSignal != NULL); - return 0; -} - -/*************************************************************************** int prepareSend(Uint32 aTC_ConnectPtr, Uint64 aTransactionId) @@ -457,6 +324,7 @@ NdbOperation::prepareSend(Uint32 aTC_ConnectPtr, Uint64 aTransId) theTCREQ->setLength(tcKeyReq->getAIInTcKeyReq(tReqInfo) + tAttrInfoIndex + TcKeyReq::StaticLength); + tAIDataPtr[0] = Tdata1; tAIDataPtr[1] = Tdata2; tAIDataPtr[2] = Tdata3; @@ -479,9 +347,8 @@ NdbOperation::prepareSend(Uint32 aTC_ConnectPtr, Uint64 aTransId) tSignal = tnextSignal; } while (tSignal != NULL); }//if - NdbRecAttr* tRecAttrObject = theFirstRecAttr; theStatus = WaitResponse; - theCurrentRecAttr = tRecAttrObject; + theReceiver.prepareSend(); return 0; }//NdbOperation::prepareSend() @@ -648,71 +515,10 @@ NdbOperation::prepareSendInterpreted() theFirstATTRINFO->setData(tFinalReadSize, 7); theFirstATTRINFO->setData(tSubroutineSize, 8); }//if + theReceiver.prepareSend(); return 0; }//NdbOperation::prepareSendInterpreted() -/*************************************************************************** -int TCOPCONF(int anAttrInfoLen) - -Return Value: Return 0 : send was succesful. - Return -1: In all other case. -Parameters: anAttrInfoLen: The length of the attribute information from TC. -Remark: Handles the reception of the TC[KEY/INDX]CONF signal. -***************************************************************************/ -void -NdbOperation::TCOPCONF(Uint32 anAttrInfoLen) -{ - Uint32 tCurrRecLen = theCurrRecAI_Len; - if (theStatus == WaitResponse) { - theTotalRecAI_Len = anAttrInfoLen; - if (anAttrInfoLen == tCurrRecLen) { - Uint32 tAI_ElemLen = theAI_ElementLen; - NdbRecAttr* tCurrRecAttr = theCurrentRecAttr; - theStatus = Finished; - - if ((tAI_ElemLen == 0) && - (tCurrRecAttr == NULL)) { - NdbRecAttr* tRecAttr = theFirstRecAttr; - while (tRecAttr != NULL) { - if (tRecAttr->copyoutRequired()) // copy to application buffer - tRecAttr->copyout(); - tRecAttr = tRecAttr->next(); - } - theNdbCon->OpCompleteSuccess(); - return; - } else if (tAI_ElemLen != 0) { - setErrorCode(4213); - theNdbCon->OpCompleteFailure(); - return; - } else { - setErrorCode(4214); - theNdbCon->OpCompleteFailure(); - return; - }//if - } else if (anAttrInfoLen > tCurrRecLen) { - return; - } else { - theStatus = Finished; - - if (theAI_ElementLen != 0) { - setErrorCode(4213); - theNdbCon->OpCompleteFailure(); - return; - }//if - if (theCurrentRecAttr != NULL) { - setErrorCode(4214); - theNdbCon->OpCompleteFailure(); - return; - }//if - theNdbCon->OpCompleteFailure(); - return; - }//if - } else { - setErrorCode(4004); - }//if - return; -}//NdbOperation::TCKEYOPCONF() - int NdbOperation::checkState_TransId(NdbApiSignal* aSignal) { @@ -777,188 +583,13 @@ NdbOperation::receiveTCKEYREF( NdbApiSignal* aSignal) }//NdbOperation::receiveTCKEYREF() -/*************************************************************************** -int receiveREAD_CONF( NdbApiSignal* aSignal) - -Return Value: Return 0 : send was succesful. - Return -1: In all other case. -Parameters: aSignal: the signal object that contains the READCONF signal from TUP. -Remark: Handles the reception of the READCONF signal. -***************************************************************************/ -int -NdbOperation::receiveREAD_CONF(const Uint32* aDataPtr, Uint32 aDataLength) -{ - Uint64 tRecTransId, tCurrTransId; - Uint32 tCondFlag = (Uint32)(theStatus - WaitResponse); - Uint32 tTotLen = aDataPtr[3]; - - tRecTransId = (Uint64)aDataPtr[1] + ((Uint64)aDataPtr[2] << 32); - tCurrTransId = theNdbCon->getTransactionId(); - tCondFlag |= (Uint32)((tRecTransId - tCurrTransId) != (Uint64)0); - tCondFlag |= (Uint32)(aDataLength < 4); - - if (tCondFlag == 0) { - theTotalRecAI_Len = tTotLen; - int tRetValue = receiveREAD_AI((Uint32*)&aDataPtr[4], (aDataLength - 4)); - if (theStatus == Finished) { - return tRetValue; - } else { - theStatus = Finished; - return theNdbCon->OpCompleteFailure(); - }//if - }//if -#ifdef NDB_NO_DROPPED_SIGNAL - abort(); -#endif - return -1; -}//NdbOperation::receiveREAD_CONF() - -/*************************************************************************** -int receiveTRANSID_AI( NdbApiSignal* aSignal) - -Return Value: Return 0 : send was succesful. - Return -1: In all other case. -Parameters: aSignal: the signal object that contains the TRANSID_AI signal. -Remark: Handles the reception of the TRANSID_AI signal. -***************************************************************************/ -int -NdbOperation::receiveTRANSID_AI(const Uint32* aDataPtr, Uint32 aDataLength) -{ - Uint64 tRecTransId, tCurrTransId; - Uint32 tCondFlag = (Uint32)(theStatus - WaitResponse); - - tRecTransId = (Uint64)aDataPtr[1] + ((Uint64)aDataPtr[2] << 32); - tCurrTransId = theNdbCon->getTransactionId(); - tCondFlag |= (Uint32)((tRecTransId - tCurrTransId) != (Uint64)0); - tCondFlag |= (Uint32)(aDataLength < 3); - - if (tCondFlag == 0) { - return receiveREAD_AI((Uint32*)&aDataPtr[3], (aDataLength - 3)); - }//if -#ifdef NDB_NO_DROPPED_SIGNAL - abort(); -#endif - return -1; -}//NdbOperation::receiveTRANSID_AI() - -/*************************************************************************** -int receiveREAD_AI( NdbApiSignal* aSignal, int aLength, int aStartPos) - -Return Value: Return 0 : send was succesoccurredful. - Return -1: In all other case. -Parameters: aSignal: the signal object that contains the LEN_ATTRINFO11 signal. - aLength: - aStartPos: -Remark: Handles the reception of the LEN_ATTRINFO11 signal. -***************************************************************************/ -int -NdbOperation::receiveREAD_AI(Uint32* aDataPtr, Uint32 aLength) -{ - - register Uint32 tAI_ElementLen = theAI_ElementLen; - register Uint32* tCurrElemPtr = theCurrElemPtr; - if (theError.code == 0) { - // If inconsistency error occurred we will still continue - // receiving signals since we need to know whether commit - // has occurred. - - register Uint32 tData; - for (register Uint32 i = 0; i < aLength ; i++, aDataPtr++) - { - // Code to receive Attribute Information - tData = *aDataPtr; - if (tAI_ElementLen != 0) { - tAI_ElementLen--; - *tCurrElemPtr = tData; - tCurrElemPtr++; - continue; - } else { - // Waiting for a new attribute element - NdbRecAttr* tWorkingRecAttr; - - tWorkingRecAttr = theCurrentRecAttr; - AttributeHeader ah(tData); - const Uint32 tAttrId = ah.getAttributeId(); - const Uint32 tAttrSize = ah.getDataSize(); - if ((tWorkingRecAttr != NULL) && - (tWorkingRecAttr->attrId() == tAttrId)) { - ; - } else { - setErrorCode(4211); - break; - }//if - theCurrentRecAttr = tWorkingRecAttr->next(); - NdbColumnImpl * col = m_currentTable->getColumn(tAttrId); - if (ah.isNULL()) { - // Return a Null value from the NDB to the attribute. - if(col != 0 && col->m_nullable) { - tWorkingRecAttr->setNULL(); - tAI_ElementLen = 0; - } else { - setErrorCode(4212); - break; - }//if - } else { - // Return a value from the NDB to the attribute. - tWorkingRecAttr->setNotNULL(); - const Uint32 sizeInBytes = col->m_attrSize * col->m_arraySize; - const Uint32 sizeInWords = (sizeInBytes + 3) / 4; - tAI_ElementLen = tAttrSize; - tCurrElemPtr = (Uint32*)tWorkingRecAttr->aRef(); - if (sizeInWords == tAttrSize){ - continue; - } else { - setErrorCode(4201); - break; - }//if - }//if - }//if - }//for - }//if - Uint32 tCurrRecLen = theCurrRecAI_Len; - Uint32 tTotRecLen = theTotalRecAI_Len; - theAI_ElementLen = tAI_ElementLen; - theCurrElemPtr = tCurrElemPtr; - tCurrRecLen = tCurrRecLen + aLength; - theCurrRecAI_Len = tCurrRecLen; // Update Current Received AI Length - if (tTotRecLen == tCurrRecLen){ // Operation completed - NdbRecAttr* tCurrRecAttr = theCurrentRecAttr; - theStatus = Finished; - - NdbConnection* tNdbCon = theNdbCon; - if ((tAI_ElementLen == 0) && - (tCurrRecAttr == NULL)) { - NdbRecAttr* tRecAttr = theFirstRecAttr; - while (tRecAttr != NULL) { - if (tRecAttr->copyoutRequired()) // copy to application buffer - tRecAttr->copyout(); - tRecAttr = tRecAttr->next(); - } - return tNdbCon->OpCompleteSuccess(); - } else if (tAI_ElementLen != 0) { - setErrorCode(4213); - return tNdbCon->OpCompleteFailure(); - } else { - setErrorCode(4214); - return tNdbCon->OpCompleteFailure(); - }//if - } - else if ((tCurrRecLen > tTotRecLen) && - (tTotRecLen > 0)) { /* == 0 if TCKEYCONF not yet received */ - setErrorCode(4215); - theStatus = Finished; - - return theNdbCon->OpCompleteFailure(); - }//if - return -1; // Continue waiting for more signals of this operation -}//NdbOperation::receiveREAD_AI() void NdbOperation::handleFailedAI_ElemLen() { - NdbRecAttr* tRecAttr = theFirstRecAttr; + NdbRecAttr* tRecAttr = theReceiver.theFirstRecAttr; while (tRecAttr != NULL) { - tRecAttr->setUNDEFINED(); + tRecAttr->setNULL(); tRecAttr = tRecAttr->next(); }//while }//NdbOperation::handleFailedAI_ElemLen() diff --git a/ndb/src/ndbapi/NdbOperationInt.cpp b/ndb/src/ndbapi/NdbOperationInt.cpp index e61fc5b05d7..9abcfd6ef33 100644 --- a/ndb/src/ndbapi/NdbOperationInt.cpp +++ b/ndb/src/ndbapi/NdbOperationInt.cpp @@ -33,6 +33,7 @@ Adjust: 991029 UABRONM First version. #include "NdbRecAttr.hpp" #include "NdbUtil.hpp" #include "Interpreter.hpp" +#include <NdbScanOperation.hpp> #ifdef VM_TRACE #include <NdbEnv.h> @@ -43,6 +44,31 @@ Adjust: 991029 UABRONM First version. #define INT_DEBUG(x) #endif +void +NdbOperation::initInterpreter(){ + theFirstLabel = NULL; + theLastLabel = NULL; + theFirstBranch = NULL; + theLastBranch = NULL; + + theFirstCall = NULL; + theLastCall = NULL; + theFirstSubroutine = NULL; + theLastSubroutine = NULL; + + theNoOfLabels = 0; + theNoOfSubroutines = 0; + + theSubroutineSize = 0; + theInitialReadSize = 0; + theInterpretedSize = 0; + theFinalUpdateSize = 0; + theFinalReadSize = 0; + theInterpretIndicator = 1; + + theTotalCurrAI_Len = 5; +} + int NdbOperation::incCheck(const NdbColumnImpl* tNdbColumnImpl) { @@ -191,7 +217,7 @@ NdbOperation::initial_interpreterCheck() { if ((theInterpretIndicator == 1)) { if (theStatus == SetBound) { - saveBoundATTRINFO(); + ((NdbScanOperation*)this)->saveBoundATTRINFO(); theStatus = GetValue; } if (theStatus == ExecInterpretedValue) { diff --git a/ndb/src/ndbapi/NdbOperationScan.cpp b/ndb/src/ndbapi/NdbOperationScan.cpp index 0c377d3fd98..283eb591bdb 100644 --- a/ndb/src/ndbapi/NdbOperationScan.cpp +++ b/ndb/src/ndbapi/NdbOperationScan.cpp @@ -14,563 +14,3 @@ along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ -#include "NdbOperation.hpp" -#include "NdbScanReceiver.hpp" - -#include <signaldata/TcKeyReq.hpp> -#include <signaldata/ScanTab.hpp> -#include <signaldata/ScanFrag.hpp> -#include <signaldata/KeyInfo.hpp> - - -/****************************************************************************** - * int openScanRead(); - *****************************************************************************/ -int -NdbOperation::openScanRead(Uint32 aParallelism) -{ - aParallelism = checkParallelism(aParallelism); - - if ((theNdbCon->theCommitStatus != NdbConnection::Started) && - (theStatus != Init) && - (aParallelism == 0)) { - setErrorCode(4200); - return -1; - } - return openScan(aParallelism, false, false, false); -} - -/**************************************************************************** - * int openScanExclusive(); - ****************************************************************************/ -int -NdbOperation::openScanExclusive(Uint32 aParallelism) -{ - aParallelism = checkParallelism(aParallelism); - - if ((theNdbCon->theCommitStatus != NdbConnection::Started) && - (theStatus != Init) && - (aParallelism == 0)) { - setErrorCode(4200); - return -1; - } - return openScan(aParallelism, true, true, false); -} - -/****************************************************************************** - * int openScanReadHoldLock(); - *****************************************************************************/ -int -NdbOperation::openScanReadHoldLock(Uint32 aParallelism) -{ - aParallelism = checkParallelism(aParallelism); - - if ((theNdbCon->theCommitStatus != NdbConnection::Started) && - (theStatus != Init) && - (aParallelism == 0)) { - setErrorCode(4200); - return -1; - } - return openScan(aParallelism, false, true, false); -} - -/****************************************************************************** - * int openScanReadCommitted(); - *****************************************************************************/ -int -NdbOperation::openScanReadCommitted(Uint32 aParallelism) -{ - aParallelism = checkParallelism(aParallelism); - - if ((theNdbCon->theCommitStatus != NdbConnection::Started) && - (theStatus != Init) && - (aParallelism == 0)) { - setErrorCode(4200); - return -1; - } - return openScan(aParallelism, false, false, true); -} - -/**************************************************************************** - * int checkParallelism(); - * Remark If the parallelism is set wrong the number of scan-operations - * will not correspond to the number of TRANSID_AI signals returned - * from NDB and the result will be a crash, therefore - * we adjust it or return an error if the value is totally wrong. - ****************************************************************************/ -int -NdbOperation::checkParallelism(Uint32 aParallelism) -{ - if (aParallelism == 0) { - setErrorCodeAbort(4232); - return 0; - } - if (aParallelism > 16) { - if (aParallelism <= 240) { - - /** - * If tscanConcurrency > 16 it must be a multiple of 16 - */ - if (((aParallelism >> 4) << 4) < aParallelism) { - aParallelism = ((aParallelism >> 4) << 4) + 16; - }//if - - /*---------------------------------------------------------------*/ - /* We cannot have a parallelism > 16 per node */ - /*---------------------------------------------------------------*/ - if ((aParallelism / theNdb->theNoOfDBnodes) > 16) { - aParallelism = theNdb->theNoOfDBnodes * 16; - }//if - - } else { - setErrorCodeAbort(4232); - aParallelism = 0; - }//if - }//if - return aParallelism; -}//NdbOperation::checkParallelism() - -/********************************************************************** - * int openScan(); - *************************************************************************/ -int -NdbOperation::openScan(Uint32 aParallelism, - bool lockMode, bool lockHoldMode, bool readCommitted) -{ - aParallelism = checkParallelism(aParallelism); - if(aParallelism == 0){ - return 0; - } - NdbScanReceiver* tScanRec; - // It is only possible to call openScan if - // 1. this transcation don't already contain another scan operation - // 2. this transaction don't already contain other operations - // 3. theScanOp contains a NdbScanOperation - if (theNdbCon->theScanningOp != NULL){ - setErrorCode(4605); - return -1; - } - - if ((theNdbCon->theFirstOpInList != this) || - (theNdbCon->theLastOpInList != this)) { - setErrorCode(4603); - return -1; - } - theNdbCon->theScanningOp = this; - - initScan(); - theParallelism = aParallelism; - - // If the scan is on ordered index then it is a range scan - if (m_currentTable->m_indexType == NdbDictionary::Index::OrderedIndex || - m_currentTable->m_indexType == NdbDictionary::Index::UniqueOrderedIndex) { - assert(m_currentTable == m_accessTable); - m_currentTable = theNdb->theDictionary->getTable(m_currentTable->m_primaryTable.c_str()); - assert(m_currentTable != NULL); - // Modify operation state - theStatus = SetBound; - theOperationType = OpenRangeScanRequest; - } - - theScanReceiversArray = new NdbScanReceiver* [aParallelism]; - if (theScanReceiversArray == NULL){ - setErrorCodeAbort(4000); - return -1; - } - - for (Uint32 i = 0; i < aParallelism; i ++) { - tScanRec = theNdb->getNdbScanRec(); - if (tScanRec == NULL) { - setErrorCodeAbort(4000); - return -1; - }//if - tScanRec->init(this, lockMode); - theScanReceiversArray[i] = tScanRec; - } - - theSCAN_TABREQ = theNdb->getSignal(); - if (theSCAN_TABREQ == NULL) { - setErrorCodeAbort(4000); - return -1; - }//if - ScanTabReq * const scanTabReq = CAST_PTR(ScanTabReq, theSCAN_TABREQ->getDataPtrSend()); - scanTabReq->apiConnectPtr = theNdbCon->theTCConPtr; - scanTabReq->tableId = m_accessTable->m_tableId; - scanTabReq->tableSchemaVersion = m_accessTable->m_version; - scanTabReq->storedProcId = 0xFFFF; - scanTabReq->buddyConPtr = theNdbCon->theBuddyConPtr; - - Uint32 reqInfo = 0; - ScanTabReq::setParallelism(reqInfo, aParallelism); - ScanTabReq::setLockMode(reqInfo, lockMode); - ScanTabReq::setHoldLockFlag(reqInfo, lockHoldMode); - ScanTabReq::setReadCommittedFlag(reqInfo, readCommitted); - if (theOperationType == OpenRangeScanRequest) - ScanTabReq::setRangeScanFlag(reqInfo, true); - scanTabReq->requestInfo = reqInfo; - - Uint64 transId = theNdbCon->getTransactionId(); - scanTabReq->transId1 = (Uint32) transId; - scanTabReq->transId2 = (Uint32) (transId >> 32); - - for (Uint32 i = 0; i < 16 && i < aParallelism ; i++) { - scanTabReq->apiOperationPtr[i] = theScanReceiversArray[i]->ptr2int(); - }//for - - // Create one additional SCAN_TABINFO for each - // 16 of parallelism - NdbApiSignal* tSignal; - Uint32 tParallelism = aParallelism; - while (tParallelism > 16) { - tSignal = theNdb->getSignal(); - if (tSignal == NULL) { - setErrorCodeAbort(4000); - return -1; - }//if - if (tSignal->setSignal(GSN_SCAN_TABINFO) == -1) { - setErrorCode(4001); - return -1; - } - tSignal->next(theFirstSCAN_TABINFO_Send); - theFirstSCAN_TABINFO_Send = tSignal; - tParallelism -= 16; - }//while - - // Format all SCAN_TABINFO signals - tParallelism = 16; - tSignal = theFirstSCAN_TABINFO_Send; - while (tSignal != NULL) { - tSignal->setData(theNdbCon->theTCConPtr, 1); - for (int i = 0; i < 16 ; i++) { - tSignal->setData(theScanReceiversArray[i + tParallelism]->ptr2int(), i + 2); - }//for - tSignal = tSignal->next(); - tParallelism += 16; - }//while - - getFirstATTRINFOScan(); - return 0; -}//NdbScanOperation::openScan() - -/***************************************************************************** - * int getFirstATTRINFOScan( U_int32 aData ) - * - * Return Value: Return 0: Successful - * Return -1: All other cases - * Parameters: None: Only allocate the first signal. - * Remark: When a scan is defined we need to use this method instead - * of insertATTRINFO for the first signal. - * This is because we need not to mess up the code in - * insertATTRINFO with if statements since we are not - * interested in the TCKEYREQ signal. - *****************************************************************************/ -int -NdbOperation::getFirstATTRINFOScan() -{ - NdbApiSignal* tSignal; - - tSignal = theNdb->getSignal(); - if (tSignal == NULL){ - setErrorCodeAbort(4000); - return -1; - } - tSignal->setSignal(m_attrInfoGSN); - theAI_LenInCurrAI = 8; - theATTRINFOptr = &tSignal->getDataPtrSend()[8]; - theFirstATTRINFO = tSignal; - theCurrentATTRINFO = tSignal; - theCurrentATTRINFO->next(NULL); - return 0; -} - -/* - * After setBound() are done, move the accumulated ATTRINFO signals to - * a separate list. Then continue with normal scan. - */ -int -NdbOperation::saveBoundATTRINFO() -{ - theCurrentATTRINFO->setLength(theAI_LenInCurrAI); - theBoundATTRINFO = theFirstATTRINFO; - theTotalBoundAI_Len = theTotalCurrAI_Len; - theTotalCurrAI_Len = 5; - theBoundATTRINFO->setData(theTotalBoundAI_Len, 4); - theBoundATTRINFO->setData(0, 5); - theBoundATTRINFO->setData(0, 6); - theBoundATTRINFO->setData(0, 7); - theBoundATTRINFO->setData(0, 8); - theStatus = GetValue; - return getFirstATTRINFOScan(); -} - -/***************************************************************************** - * void releaseScan() - * - * Return Value No return value. - * Parameters: No parameters. - * Remark: Release objects after scanning. - *****************************************************************************/ -void -NdbOperation::releaseScan() -{ - NdbScanReceiver* tScanRec; - TransporterFacade::instance()->lock_mutex(); - for (Uint32 i = 0; i < theParallelism && theScanReceiversArray != NULL; i++) { - tScanRec = theScanReceiversArray[i]; - if (tScanRec != NULL) { - tScanRec->release(); - tScanRec->next(NULL); - } - } - TransporterFacade::instance()->unlock_mutex(); - releaseSignals(); - - if (theScanReceiversArray != NULL) { - for (Uint32 i = 0; i < theParallelism; i++) { - NdbScanReceiver* tScanRec; - tScanRec = theScanReceiversArray[i]; - if (tScanRec != NULL) { - theNdb->releaseNdbScanRec(tScanRec); - theScanReceiversArray[i] = NULL; - } - } - - delete [] theScanReceiversArray; - }//if - theScanReceiversArray = NULL; - - if (theSCAN_TABREQ != NULL){ - theNdb->releaseSignal(theSCAN_TABREQ); - theSCAN_TABREQ = NULL; - } -} - -void NdbOperation::releaseSignals(){ - theNdb->releaseSignalsInList(&theFirstSCAN_TABINFO_Send); - theFirstSCAN_TABINFO_Send = NULL; - theLastSCAN_TABINFO_Send = NULL; - // theNdb->releaseSignalsInList(&theFirstSCAN_TABINFO_Recv); - - while(theFirstSCAN_TABINFO_Recv != NULL){ - NdbApiSignal* tmp = theFirstSCAN_TABINFO_Recv; - theFirstSCAN_TABINFO_Recv = tmp->next(); - delete tmp; - } - theFirstSCAN_TABINFO_Recv = NULL; - theLastSCAN_TABINFO_Recv = NULL; - if (theSCAN_TABCONF_Recv != NULL){ - // theNdb->releaseSignal(theSCAN_TABCONF_Recv); - delete theSCAN_TABCONF_Recv; - theSCAN_TABCONF_Recv = NULL; - } -} - - -void NdbOperation::prepareNextScanResult(){ - NdbScanReceiver* tScanRec; - for (Uint32 i = 0; i < theParallelism; i++) { - tScanRec = theScanReceiversArray[i]; - assert(tScanRec != NULL); - tScanRec->prepareNextScanResult(); - tScanRec->next(NULL); - } - releaseSignals(); -} - -/****************************************************************************** - * void initScan(); - * - * Return Value: Return 0 : init was successful. - * Return -1: In all other case. - * Remark: Initiates operation record after allocation. - *****************************************************************************/ -void -NdbOperation::initScan() -{ - theTotalRecAI_Len = 0; - theCurrRecAI_Len = 0; - theStatus = GetValue; - theOperationType = OpenScanRequest; - theCurrentRecAttr = theFirstRecAttr; - theScanInfo = 0; - theMagicNumber = 0xABCDEF01; - theTotalCurrAI_Len = 5; - - theFirstLabel = NULL; - theLastLabel = NULL; - theFirstBranch = NULL; - theLastBranch = NULL; - - theFirstCall = NULL; - theLastCall = NULL; - theFirstSubroutine = NULL; - theLastSubroutine = NULL; - - theNoOfLabels = 0; - theNoOfSubroutines = 0; - - theSubroutineSize = 0; - theInitialReadSize = 0; - theInterpretedSize = 0; - theFinalUpdateSize = 0; - theFinalReadSize = 0; - theInterpretIndicator = 1; - - - theFirstSCAN_TABINFO_Send = NULL; - theLastSCAN_TABINFO_Send = NULL; - theFirstSCAN_TABINFO_Recv = NULL; - theLastSCAN_TABINFO_Recv = NULL; - theSCAN_TABCONF_Recv = NULL; - - theScanReceiversArray = NULL; - - theTotalBoundAI_Len = 0; - theBoundATTRINFO = NULL; - return; -} - -NdbOperation* NdbOperation::takeOverForDelete(NdbConnection* updateTrans){ - return takeOverScanOp(DeleteRequest, updateTrans); -} - -NdbOperation* NdbOperation::takeOverForUpdate(NdbConnection* updateTrans){ - return takeOverScanOp(UpdateRequest, updateTrans); -} -/****************************************************************************** - * NdbOperation* takeOverScanOp(NdbConnection* updateTrans); - * - * Parameters: The update transactions NdbConnection pointer. - * Return Value: A reference to the transferred operation object - * or NULL if no success. - * Remark: Take over the scanning transactions NdbOperation - * object for a tuple to an update transaction, - * which is the last operation read in nextScanResult() - * (theNdbCon->thePreviousScanRec) - * - * FUTURE IMPLEMENTATION: (This note was moved from header file.) - * In the future, it will even be possible to transfer - * to a NdbConnection on another Ndb-object. - * In this case the receiving NdbConnection-object must call - * a method receiveOpFromScan to actually receive the information. - * This means that the updating transactions can be placed - * in separate threads and thus increasing the parallelism during - * the scan process. - *****************************************************************************/ -NdbOperation* -NdbOperation::takeOverScanOp(OperationType opType, NdbConnection* updateTrans) -{ - if (opType != UpdateRequest && opType != DeleteRequest) { - setErrorCode(4604); - return NULL; - } - - const NdbScanReceiver* tScanRec = theNdbCon->thePreviousScanRec; - if (tScanRec == NULL){ - // No operation read by nextScanResult - setErrorCode(4609); - return NULL; - } - - if (tScanRec->theFirstKEYINFO20_Recv == NULL){ - // No KEYINFO20 received - setErrorCode(4608); - return NULL; - } - - NdbOperation * newOp = updateTrans->getNdbOperation(m_currentTable); - if (newOp == NULL){ - return NULL; - } - - /** - * Copy and caclulate attributes from the scanned operation to the - * new operation - */ - const KeyInfo20 * const firstKeyInfo20 = - CAST_CONSTPTR(KeyInfo20, tScanRec->theFirstKEYINFO20_Recv->getDataPtr()); - const Uint32 totalKeyLen = firstKeyInfo20->keyLen; - newOp->theTupKeyLen = totalKeyLen; - - newOp->theOperationType = opType; - if (opType == DeleteRequest) { - newOp->theStatus = GetValue; - } else { - newOp->theStatus = SetValue; - } - const Uint32 tScanInfo = firstKeyInfo20->scanInfo_Node & 0xFFFF; - const Uint32 tTakeOverNode = firstKeyInfo20->scanInfo_Node >> 16; - { - UintR scanInfo = 0; - TcKeyReq::setTakeOverScanFlag(scanInfo, 1); - TcKeyReq::setTakeOverScanNode(scanInfo, tTakeOverNode); - TcKeyReq::setTakeOverScanInfo(scanInfo, tScanInfo); - newOp->theScanInfo = scanInfo; - } - - /** - * Copy received KEYINFO20 signals into TCKEYREQ and KEYINFO signals - * put them in list of the new op - */ - TcKeyReq * const tcKeyReq = - CAST_PTR(TcKeyReq, newOp->theTCREQ->getDataPtrSend()); - - // Copy the first 8 words of key info from KEYINF20 into TCKEYREQ - for (Uint32 i = 0; i < TcKeyReq::MaxKeyInfo; i++) { - tcKeyReq->keyInfo[i] = firstKeyInfo20->keyData[i]; - } - if (totalKeyLen > TcKeyReq::MaxKeyInfo) { - - Uint32 keyWordsCopied = TcKeyReq::MaxKeyInfo; - - // Create KEYINFO signals in newOp - for (Uint32 i = keyWordsCopied; i < totalKeyLen; i += KeyInfo::DataLength){ - NdbApiSignal* tSignal = theNdb->getSignal(); - if (tSignal == NULL){ - setErrorCodeAbort(4000); - return NULL; - } - if (tSignal->setSignal(GSN_KEYINFO) == -1){ - setErrorCodeAbort(4001); - return NULL; - } - tSignal->next(newOp->theFirstKEYINFO); - newOp->theFirstKEYINFO = tSignal; - } - - // Init pointers to KEYINFO20 signal - NdbApiSignal* currKeyInfo20 = tScanRec->theFirstKEYINFO20_Recv; - const KeyInfo20 * keyInfo20 = - CAST_CONSTPTR(KeyInfo20, currKeyInfo20->getDataPtr()); - Uint32 posInKeyInfo20 = keyWordsCopied; - - // Init pointers to KEYINFO signal - NdbApiSignal* currKeyInfo = newOp->theFirstKEYINFO; - KeyInfo * keyInfo = CAST_PTR(KeyInfo, currKeyInfo->getDataPtrSend()); - Uint32 posInKeyInfo = 0; - - // Copy from KEYINFO20 to KEYINFO - while(keyWordsCopied < totalKeyLen){ - keyInfo->keyData[posInKeyInfo++] = keyInfo20->keyData[posInKeyInfo20++]; - keyWordsCopied++; - if(keyWordsCopied >= totalKeyLen) - break; - if (posInKeyInfo20 >= - (currKeyInfo20->getLength()-KeyInfo20::HeaderLength)){ - currKeyInfo20 = currKeyInfo20->next(); - keyInfo20 = CAST_CONSTPTR(KeyInfo20, currKeyInfo20->getDataPtr()); - posInKeyInfo20 = 0; - } - if (posInKeyInfo >= KeyInfo::DataLength){ - currKeyInfo = currKeyInfo->next(); - keyInfo = CAST_PTR(KeyInfo, currKeyInfo->getDataPtrSend()); - posInKeyInfo = 0; - } - } - } - - return newOp; -} - - - diff --git a/ndb/src/ndbapi/NdbRecAttr.cpp b/ndb/src/ndbapi/NdbRecAttr.cpp index 0f7baeac4f5..18ce59745d0 100644 --- a/ndb/src/ndbapi/NdbRecAttr.cpp +++ b/ndb/src/ndbapi/NdbRecAttr.cpp @@ -57,6 +57,8 @@ NdbRecAttr::setup(const NdbColumnImpl* anAttrInfo, char* aValue) theAttrSize = tAttrSize; theArraySize = tArraySize; theValue = aValue; + theNULLind = 0; + m_nullable = anAttrInfo->m_nullable; // check alignment to signal data // a future version could check alignment per data type as well @@ -124,3 +126,19 @@ NdbRecAttr::clone() const { memcpy(ret->theRef, theRef, n); return ret; } + +bool +NdbRecAttr::receive_data(const Uint32 * data, Uint32 sz){ + const Uint32 n = (theAttrSize * theArraySize + 3) >> 2; + if(n == sz){ + if(!copyoutRequired()) + memcpy(theRef, data, 4 * sz); + else + memcpy(theValue, data, theAttrSize * theArraySize); + return true; + } else if(sz == 0){ + setNULL(); + return true; + } + return false; +} diff --git a/ndb/src/ndbapi/NdbReceiver.cpp b/ndb/src/ndbapi/NdbReceiver.cpp index 4c461698a4a..7a538de3d7c 100644 --- a/ndb/src/ndbapi/NdbReceiver.cpp +++ b/ndb/src/ndbapi/NdbReceiver.cpp @@ -16,6 +16,10 @@ #include "NdbImpl.hpp" #include <NdbReceiver.hpp> +#include "NdbDictionaryImpl.hpp" +#include <NdbRecAttr.hpp> +#include <AttributeHeader.hpp> +#include <NdbConnection.hpp> NdbReceiver::NdbReceiver(Ndb *aNdb) : theMagicNumber(0), @@ -24,10 +28,11 @@ NdbReceiver::NdbReceiver(Ndb *aNdb) : m_type(NDB_UNINITIALIZED), m_owner(0) { + theCurrentRecAttr = theFirstRecAttr = 0; } void -NdbReceiver::init(ReceiverType type, void* owner) +NdbReceiver::init(ReceiverType type, void* owner, bool keyInfo) { theMagicNumber = 0x11223344; m_type = type; @@ -36,6 +41,24 @@ NdbReceiver::init(ReceiverType type, void* owner) if (m_ndb) m_id = m_ndb->theNdbObjectIdMap->map(this); } + + theFirstRecAttr = NULL; + theCurrentRecAttr = NULL; + m_key_info = (keyInfo ? 1 : 0); + m_defined_rows = 0; +} + +void +NdbReceiver::release(){ + NdbRecAttr* tRecAttr = theFirstRecAttr; + while (tRecAttr != NULL) + { + NdbRecAttr* tSaveRecAttr = tRecAttr; + tRecAttr = tRecAttr->next(); + m_ndb->releaseRecAttr(tSaveRecAttr); + } + theFirstRecAttr = NULL; + theCurrentRecAttr = NULL; } NdbReceiver::~NdbReceiver() @@ -44,3 +67,150 @@ NdbReceiver::~NdbReceiver() m_ndb->theNdbObjectIdMap->unmap(m_id, this); } } + +NdbRecAttr * +NdbReceiver::getValue(const NdbColumnImpl* tAttrInfo, char * user_dst_ptr){ + NdbRecAttr* tRecAttr = m_ndb->getRecAttr(); + if(tRecAttr && !tRecAttr->setup(tAttrInfo, user_dst_ptr)){ + if (theFirstRecAttr == NULL) + theFirstRecAttr = tRecAttr; + else + theCurrentRecAttr->next(tRecAttr); + theCurrentRecAttr = tRecAttr; + tRecAttr->next(NULL); + return tRecAttr; + } + if(tRecAttr){ + m_ndb->releaseRecAttr(tRecAttr); + } + return 0; +} + +#define KEY_ATTR_ID (~0) + +void +NdbReceiver::do_get_value(NdbReceiver * org, Uint32 rows, Uint32 key_size){ + m_defined_rows = rows; + m_rows = new NdbRecAttr*[rows + 1]; m_rows[rows] = 0; + + NdbColumnImpl key; + if(key_size){ + key.m_attrId = KEY_ATTR_ID; + key.m_arraySize = key_size+1; + key.m_attrSize = 4; + key.m_nullable = true; // So that receive works w.r.t KEYINFO20 + } + + for(Uint32 i = 0; i<rows; i++){ + NdbRecAttr * prev = theCurrentRecAttr; + + // Put key-recAttr fir on each row + if(key_size && !getValue(&key, (char*)0)){ + abort(); + return ; // -1 + } + + NdbRecAttr* tRecAttr = org->theFirstRecAttr; + while(tRecAttr != 0){ + if(getValue(&NdbColumnImpl::getImpl(*tRecAttr->m_column), (char*)0)) + tRecAttr = tRecAttr->next(); + else + break; + } + + if(tRecAttr){ + abort(); + return ;// -1; + } + + // Store first recAttr for each row in m_rows[i] + if(prev){ + m_rows[i] = prev->next(); + } else { + m_rows[i] = theFirstRecAttr; + } + } + + prepareSend(); + return ; //0; +} + +void +NdbReceiver::copyout(NdbReceiver & dstRec){ + NdbRecAttr* src = m_rows[m_current_row++]; + NdbRecAttr* dst = dstRec.theFirstRecAttr; + Uint32 tmp = m_key_info; + if(tmp > 0){ + src = src->next(); + } + + while(dst){ + Uint32 len = ((src->theAttrSize * src->theArraySize)+3)/4; + dst->receive_data((Uint32*)src->aRef(), len); + src = src->next(); + dst = dst->next(); + } +} + +int +NdbReceiver::execTRANSID_AI(const Uint32* aDataPtr, Uint32 aLength) +{ + bool ok = true; + NdbRecAttr* currRecAttr = theCurrentRecAttr; + NdbRecAttr* prevRecAttr = currRecAttr; + + for (Uint32 used = 0; used < aLength ; used++){ + AttributeHeader ah(* aDataPtr++); + const Uint32 tAttrId = ah.getAttributeId(); + const Uint32 tAttrSize = ah.getDataSize(); + + /** + * Set all results to NULL if not found... + */ + while(currRecAttr && currRecAttr->attrId() != tAttrId){ + ok &= currRecAttr->setNULL(); + prevRecAttr = currRecAttr; + currRecAttr = currRecAttr->next(); + } + + if(ok && currRecAttr && currRecAttr->receive_data(aDataPtr, tAttrSize)){ + used += tAttrSize; + aDataPtr += tAttrSize; + prevRecAttr = currRecAttr; + currRecAttr = currRecAttr->next(); + } else { + ndbout_c("%p: ok: %d tAttrId: %d currRecAttr: %p", + this,ok, tAttrId, currRecAttr); + abort(); + return -1; + } + } + + theCurrentRecAttr = currRecAttr; + + /** + * Update m_received_result_length + */ + Uint32 tmp = m_received_result_length + aLength; + m_received_result_length = tmp; + + return (tmp == m_expected_result_length ? 1 : 0); +} + +int +NdbReceiver::execKEYINFO20(Uint32 info, const Uint32* aDataPtr, Uint32 aLength) +{ + NdbRecAttr* currRecAttr = m_rows[m_current_row++]; + assert(currRecAttr->attrId() == KEY_ATTR_ID); + currRecAttr->receive_data(aDataPtr, aLength + 1); + + /** + * Save scanInfo in the end of keyinfo + */ + ((Uint32*)currRecAttr->aRef())[aLength] = info; + + Uint32 tmp = m_received_result_length + aLength; + m_received_result_length = tmp; + + return (tmp == m_expected_result_length ? 1 : 0); +} diff --git a/ndb/src/ndbapi/NdbResultSet.cpp b/ndb/src/ndbapi/NdbResultSet.cpp index 65ed43f60d8..d15c58ba972 100644 --- a/ndb/src/ndbapi/NdbResultSet.cpp +++ b/ndb/src/ndbapi/NdbResultSet.cpp @@ -30,7 +30,7 @@ #include <NdbConnection.hpp> #include <NdbResultSet.hpp> -NdbResultSet::NdbResultSet(NdbCursorOperation *owner) +NdbResultSet::NdbResultSet(NdbScanOperation *owner) : m_operation(owner) { } @@ -55,49 +55,22 @@ void NdbResultSet::close() NdbOperation* NdbResultSet::updateTuple(){ - if(m_operation->cursorType() != NdbCursorOperation::ScanCursor){ - m_operation->setErrorCode(4003); - return 0; - } - - NdbScanOperation * op = (NdbScanOperation*)(m_operation); - return op->takeOverScanOp(NdbOperation::UpdateRequest, - op->m_transConnection); + return updateTuple(m_operation->m_transConnection); } NdbOperation* NdbResultSet::updateTuple(NdbConnection* takeOverTrans){ - if(m_operation->cursorType() != NdbCursorOperation::ScanCursor){ - m_operation->setErrorCode(4003); - return 0; - } - return m_operation->takeOverScanOp(NdbOperation::UpdateRequest, takeOverTrans); } int NdbResultSet::deleteTuple(){ - if(m_operation->cursorType() != NdbCursorOperation::ScanCursor){ - m_operation->setErrorCode(4003); - return 0; - } - - NdbScanOperation * op = (NdbScanOperation*)(m_operation); - void * res = op->takeOverScanOp(NdbOperation::DeleteRequest, - op->m_transConnection); - if(res == 0) - return -1; - return 0; + return deleteTuple(m_operation->m_transConnection); } int NdbResultSet::deleteTuple(NdbConnection * takeOverTrans){ - if(m_operation->cursorType() != NdbCursorOperation::ScanCursor){ - m_operation->setErrorCode(4003); - return 0; - } - void * res = m_operation->takeOverScanOp(NdbOperation::DeleteRequest, takeOverTrans); if(res == 0) diff --git a/ndb/src/ndbapi/NdbScanOperation.cpp b/ndb/src/ndbapi/NdbScanOperation.cpp index 8064eef838e..8a22d6a3c0f 100644 --- a/ndb/src/ndbapi/NdbScanOperation.cpp +++ b/ndb/src/ndbapi/NdbScanOperation.cpp @@ -26,36 +26,57 @@ * Adjust: 2002-04-01 UABMASD First version. ****************************************************************************/ -#include <ndb_global.h> #include <Ndb.hpp> #include <NdbScanOperation.hpp> +#include <NdbIndexScanOperation.hpp> #include <NdbConnection.hpp> #include <NdbResultSet.hpp> #include "NdbApiSignal.hpp" #include <NdbOut.hpp> #include "NdbDictionaryImpl.hpp" +#include <NdbRecAttr.hpp> +#include <NdbReceiver.hpp> + +#include <stdlib.h> +#include <NdbSqlUtil.hpp> + +#include <signaldata/ScanTab.hpp> +#include <signaldata/KeyInfo.hpp> +#include <signaldata/TcKeyReq.hpp> + NdbScanOperation::NdbScanOperation(Ndb* aNdb) : - NdbCursorOperation(aNdb), - m_transConnection(NULL), - m_autoExecute(false), - m_updateOp(false), - m_deleteOp(false), - m_setValueList(new SetValueRecList()) + NdbOperation(aNdb), + m_resultSet(0), + m_transConnection(NULL) { + theParallelism = 0; + m_allocated_receivers = 0; + m_prepared_receivers = 0; + m_api_receivers = 0; + m_conf_receivers = 0; + m_sent_receivers = 0; + m_receivers = 0; } NdbScanOperation::~NdbScanOperation() { - if (m_setValueList) delete m_setValueList; + fix_receivers(0, false); + if (m_resultSet) + delete m_resultSet; } -NdbCursorOperation::CursorType -NdbScanOperation::cursorType() +NdbResultSet* +NdbScanOperation::getResultSet() { - return NdbCursorOperation::ScanCursor; + if (!m_resultSet) + m_resultSet = new NdbResultSet(this); + + return m_resultSet; } + + void NdbScanOperation::setErrorCode(int aErrorCode){ NdbConnection* tmp = theNdbCon; @@ -90,267 +111,516 @@ NdbScanOperation::init(NdbTableImpl* tab, NdbConnection* myConnection) setErrorCodeAbort(theNdb->getNdbError().code); return -1; } - aScanConnection->theFirstOpInList = this; - aScanConnection->theLastOpInList = this; - NdbCursorOperation::cursInit(); - // NOTE! The hupped trans becomes the owner of the operation - return NdbOperation::init(tab, aScanConnection); -} - -NdbResultSet* NdbScanOperation::readTuples(Uint32 parallell, - NdbCursorOperation::LockMode lm) -{ - int res = 0; - switch(lm){ - case NdbCursorOperation::LM_Read: - parallell = (parallell == 0 ? 240 : parallell); - res = openScan(parallell, false, true, false); - break; - case NdbCursorOperation::LM_Exclusive: - parallell = (parallell == 0 ? 1 : parallell); - res = openScan(parallell, true, true, false); - break; - case NdbCursorOperation::LM_Dirty: - parallell = (parallell == 0 ? 240 : parallell); - res = openScan(parallell, false, false, true); - break; - default: - res = -1; - setErrorCode(4003); - } - if(res == -1){ - return NULL; - } - theNdbCon->theFirstOpInList = 0; - theNdbCon->theLastOpInList = 0; - return getResultSet(); -} - -int NdbScanOperation::updateTuples(Uint32 parallelism) -{ - if (openScanExclusive(parallelism) == -1) { + // NOTE! The hupped trans becomes the owner of the operation + if(NdbOperation::init(tab, aScanConnection) != 0){ return -1; } - theNdbCon->theFirstOpInList = 0; - theNdbCon->theLastOpInList = 0; - - m_updateOp = true; + + initInterpreter(); + + theStatus = GetValue; + theOperationType = OpenScanRequest; + + theTotalBoundAI_Len = 0; + theBoundATTRINFO = NULL; return 0; } -int NdbScanOperation::deleteTuples(Uint32 parallelism) +NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm, + Uint32 batch, + Uint32 parallell) { - if (openScanExclusive(parallelism) == -1) { - return -1; - } - theNdbCon->theFirstOpInList = 0; - theNdbCon->theLastOpInList = 0; + m_ordered = 0; - m_deleteOp = true; + Uint32 fragCount = m_currentTable->m_fragmentCount; - return 0; -} + if(batch + parallell == 0){ // Max speed + batch = 16; + parallell = fragCount; + } -int NdbScanOperation::setValue(const char* anAttrName, const char* aValue, Uint32 len) -{ - // Check if attribute exist - if (m_currentTable->getColumn(anAttrName) == NULL) - return -1; + if(batch == 0 && parallell > 0){ // Backward + batch = (parallell >= 16 ? 16 : parallell & 15); + parallell = (parallell + 15) / 16; + + if(parallell == 0) + parallell = 1; + } - m_setValueList->add(anAttrName, aValue, len); - return 0; -} - -int NdbScanOperation::setValue(const char* anAttrName, Int32 aValue) -{ - // Check if attribute exist - if (m_currentTable->getColumn(anAttrName) == NULL) - return -1; - - m_setValueList->add(anAttrName, aValue); - return 0; -} - -int NdbScanOperation::setValue(const char* anAttrName, Uint32 aValue) -{ - // Check if attribute exist - if (m_currentTable->getColumn(anAttrName) == NULL) - return -1; + if(parallell > fragCount) + parallell = fragCount; + else if(parallell == 0) + parallell = fragCount; + + assert(parallell > 0); + + // It is only possible to call openScan if + // 1. this transcation don't already contain another scan operation + // 2. this transaction don't already contain other operations + // 3. theScanOp contains a NdbScanOperation + if (theNdbCon->theScanningOp != NULL){ + setErrorCode(4605); + return 0; + } - m_setValueList->add(anAttrName, aValue); - return 0; -} + theNdbCon->theScanningOp = this; -int NdbScanOperation::setValue(const char* anAttrName, Uint64 aValue) -{ - // Check if attribute exist - if (m_currentTable->getColumn(anAttrName) == NULL) - return -1; + bool lockExcl, lockHoldMode, readCommitted; + switch(lm){ + case NdbScanOperation::LM_Read: + lockExcl = false; + lockHoldMode = true; + readCommitted = false; + break; + case NdbScanOperation::LM_Exclusive: + lockExcl = true; + lockHoldMode = true; + readCommitted = false; + break; + case NdbScanOperation::LM_Dirty: + lockExcl = false; + lockHoldMode = false; + readCommitted = true; + break; + default: + setErrorCode(4003); + return 0; + } - m_setValueList->add(anAttrName, aValue); - return 0; -} + m_keyInfo = lockExcl; + + bool range = false; + if (m_currentTable->m_indexType == NdbDictionary::Index::OrderedIndex || + m_currentTable->m_indexType == NdbDictionary::Index::UniqueOrderedIndex){ + assert(m_currentTable == m_accessTable); + m_currentTable = theNdb->theDictionary-> + getTable(m_currentTable->m_primaryTable.c_str()); + assert(m_currentTable != NULL); + // Modify operation state + theStatus = SetBound; + theOperationType = OpenRangeScanRequest; + range = true; + } + + theParallelism = parallell; + theBatchSize = batch; -int NdbScanOperation::setValue(const char* anAttrName, Int64 aValue) -{ - // Check if attribute exist - if (m_currentTable->getColumn(anAttrName) == NULL) - return -1; + if(fix_receivers(parallell, lockExcl) == -1){ + setErrorCodeAbort(4000); + return 0; + } + + theSCAN_TABREQ = theNdb->getSignal(); + if (theSCAN_TABREQ == NULL) { + setErrorCodeAbort(4000); + return 0; + }//if + + ScanTabReq * req = CAST_PTR(ScanTabReq, theSCAN_TABREQ->getDataPtrSend()); + req->apiConnectPtr = theNdbCon->theTCConPtr; + req->tableId = m_accessTable->m_tableId; + req->tableSchemaVersion = m_accessTable->m_version; + req->storedProcId = 0xFFFF; + req->buddyConPtr = theNdbCon->theBuddyConPtr; + + Uint32 reqInfo = 0; + ScanTabReq::setParallelism(reqInfo, parallell); + ScanTabReq::setScanBatch(reqInfo, batch); + ScanTabReq::setLockMode(reqInfo, lockExcl); + ScanTabReq::setHoldLockFlag(reqInfo, lockHoldMode); + ScanTabReq::setReadCommittedFlag(reqInfo, readCommitted); + ScanTabReq::setRangeScanFlag(reqInfo, range); + req->requestInfo = reqInfo; - m_setValueList->add(anAttrName, aValue); - return 0; -} + Uint64 transId = theNdbCon->getTransactionId(); + req->transId1 = (Uint32) transId; + req->transId2 = (Uint32) (transId >> 32); -int NdbScanOperation::setValue(const char* anAttrName, float aValue) -{ - // Check if attribute exist - if (m_currentTable->getColumn(anAttrName) == NULL) - return -1; + getFirstATTRINFOScan(); - m_setValueList->add(anAttrName, aValue); - return 0; + return getResultSet(); } -int NdbScanOperation::setValue(const char* anAttrName, double aValue) -{ - // Check if attribute exist - if (m_currentTable->getColumn(anAttrName) == NULL) - return -1; +int +NdbScanOperation::fix_receivers(Uint32 parallell, bool keyInfo){ + if(parallell == 0 || parallell > m_allocated_receivers){ + if(m_prepared_receivers) delete[] m_prepared_receivers; + if(m_receivers) delete[] m_receivers; + if(m_api_receivers) delete[] m_api_receivers; + if(m_conf_receivers) delete[] m_conf_receivers; + if(m_sent_receivers) delete[] m_sent_receivers; + + m_allocated_receivers = parallell; + if(parallell == 0){ + return 0; + } + + m_prepared_receivers = new Uint32[parallell]; + m_receivers = new NdbReceiver*[parallell]; + m_api_receivers = new NdbReceiver*[parallell]; + m_conf_receivers = new NdbReceiver*[parallell]; + m_sent_receivers = new NdbReceiver*[parallell]; + + NdbReceiver* tScanRec; + for (Uint32 i = 0; i < parallell; i ++) { + tScanRec = theNdb->getNdbScanRec(); + if (tScanRec == NULL) { + setErrorCodeAbort(4000); + return -1; + }//if + m_receivers[i] = tScanRec; + tScanRec->init(NdbReceiver::NDB_SCANRECEIVER, this, keyInfo); + } + } - m_setValueList->add(anAttrName, aValue); + for(Uint32 i = 0; i<parallell; i++){ + m_receivers[i]->m_list_index = i; + m_prepared_receivers[i] = m_receivers[i]->getId(); + m_sent_receivers[i] = m_receivers[i]; + m_conf_receivers[i] = 0; + m_api_receivers[i] = 0; + } + + m_api_receivers_count = 0; + m_current_api_receiver = 0; + m_sent_receivers_count = parallell; + m_conf_receivers_count = 0; return 0; } - -int NdbScanOperation::setValue(Uint32 anAttrId, const char* aValue, Uint32 len) -{ - // Check if attribute exist - if (m_currentTable->getColumn(anAttrId) == NULL) - return -1; - - m_setValueList->add(anAttrId, aValue, len); - return 0; +/** + * Move receiver from send array to conf:ed array + */ +void +NdbScanOperation::receiver_delivered(NdbReceiver* tRec){ + Uint32 idx = tRec->m_list_index; + Uint32 last = m_sent_receivers_count - 1; + if(idx != last){ + NdbReceiver * move = m_sent_receivers[last]; + m_sent_receivers[idx] = move; + move->m_list_index = idx; + } + m_sent_receivers_count = last; + + last = m_conf_receivers_count; + m_conf_receivers[last] = tRec; + m_conf_receivers_count = last + 1; + tRec->m_list_index = last; + tRec->m_current_row = 0; } -int NdbScanOperation::setValue(Uint32 anAttrId, Int32 aValue) -{ - // Check if attribute exist - if (m_currentTable->getColumn(anAttrId) == NULL) - return -1; - - m_setValueList->add(anAttrId, aValue); - return 0; +/** + * Remove receiver as it's completed + */ +void +NdbScanOperation::receiver_completed(NdbReceiver* tRec){ + Uint32 idx = tRec->m_list_index; + Uint32 last = m_sent_receivers_count - 1; + if(idx != last){ + NdbReceiver * move = m_sent_receivers[last]; + m_sent_receivers[idx] = move; + move->m_list_index = idx; + } + m_sent_receivers_count = last; } -int NdbScanOperation::setValue(Uint32 anAttrId, Uint32 aValue) +/***************************************************************************** + * int getFirstATTRINFOScan( U_int32 aData ) + * + * Return Value: Return 0: Successful + * Return -1: All other cases + * Parameters: None: Only allocate the first signal. + * Remark: When a scan is defined we need to use this method instead + * of insertATTRINFO for the first signal. + * This is because we need not to mess up the code in + * insertATTRINFO with if statements since we are not + * interested in the TCKEYREQ signal. + *****************************************************************************/ +int +NdbScanOperation::getFirstATTRINFOScan() { - // Check if attribute exist - if (m_currentTable->getColumn(anAttrId) == NULL) - return -1; + NdbApiSignal* tSignal; - m_setValueList->add(anAttrId, aValue); - return 0; -} - -int NdbScanOperation::setValue(Uint32 anAttrId, Uint64 aValue) -{ - // Check if attribute exist - if (m_currentTable->getColumn(anAttrId) == NULL) - return -1; + tSignal = theNdb->getSignal(); + if (tSignal == NULL){ + setErrorCodeAbort(4000); + return -1; + } + tSignal->setSignal(m_attrInfoGSN); + theAI_LenInCurrAI = 8; + theATTRINFOptr = &tSignal->getDataPtrSend()[8]; + theFirstATTRINFO = tSignal; + theCurrentATTRINFO = tSignal; + theCurrentATTRINFO->next(NULL); - m_setValueList->add(anAttrId, aValue); return 0; } -int NdbScanOperation::setValue(Uint32 anAttrId, Int64 aValue) -{ - // Check if attribute exist - if (m_currentTable->getColumn(anAttrId) == NULL) - return -1; +/** + * Constats for theTupleKeyDefined[][0] + */ +#define SETBOUND_EQ 1 +#define FAKE_PTR 2 +#define API_PTR 3 - m_setValueList->add(anAttrId, aValue); - return 0; -} -int NdbScanOperation::setValue(Uint32 anAttrId, float aValue) +/* + * After setBound() are done, move the accumulated ATTRINFO signals to + * a separate list. Then continue with normal scan. + */ +int +NdbScanOperation::saveBoundATTRINFO() { - // Check if attribute exist - if (m_currentTable->getColumn(anAttrId) == NULL) - return -1; - - m_setValueList->add(anAttrId, aValue); - return 0; + theCurrentATTRINFO->setLength(theAI_LenInCurrAI); + theBoundATTRINFO = theFirstATTRINFO; + theTotalBoundAI_Len = theTotalCurrAI_Len; + theTotalCurrAI_Len = 5; + theBoundATTRINFO->setData(theTotalBoundAI_Len, 4); + theBoundATTRINFO->setData(0, 5); + theBoundATTRINFO->setData(0, 6); + theBoundATTRINFO->setData(0, 7); + theBoundATTRINFO->setData(0, 8); + theStatus = GetValue; + + int res = getFirstATTRINFOScan(); + + /** + * Define each key with getValue (if ordered) + * unless the one's with EqBound + */ + if(!res && m_ordered){ + Uint32 idx = 0; + Uint32 cnt = m_currentTable->getNoOfPrimaryKeys(); + while(!theTupleKeyDefined[idx][0] && idx < cnt){ + NdbColumnImpl* col = m_currentTable->getColumn(idx); + NdbRecAttr* tmp = NdbScanOperation::getValue_impl(col, (char*)-1); + UintPtr newVal = UintPtr(tmp); + theTupleKeyDefined[idx][0] = FAKE_PTR; + theTupleKeyDefined[idx][1] = (newVal & 0xFFFFFFFF); +#if (SIZEOF_CHARP == 8) + theTupleKeyDefined[idx][2] = (newVal >> 32); +#endif + idx++; + } + } + return res; } -int NdbScanOperation::setValue(Uint32 anAttrId, double aValue) -{ - // Check if attribute exist - if (m_currentTable->getColumn(anAttrId) == NULL) - return -1; +#define WAITFOR_SCAN_TIMEOUT 120000 - m_setValueList->add(anAttrId, aValue); - return 0; +int +NdbScanOperation::executeCursor(int nodeId){ + NdbConnection * tCon = theNdbCon; + TransporterFacade* tp = TransporterFacade::instance(); + Guard guard(tp->theMutexPtr); + Uint32 seq = tCon->theNodeSequence; + if (tp->get_node_alive(nodeId) && + (tp->getNodeSequence(nodeId) == seq)) { + + if(prepareSendScan(tCon->theTCConPtr, tCon->theTransactionId) == -1) + return -1; + + tCon->theMagicNumber = 0x37412619; + + if (doSendScan(nodeId) == -1) + return -1; + + return 0; + } else { + if (!(tp->get_node_stopping(nodeId) && + (tp->getNodeSequence(nodeId) == seq))){ + TRACE_DEBUG("The node is hard dead when attempting to start a scan"); + setErrorCode(4029); + tCon->theReleaseOnClose = true; + abort(); + } else { + TRACE_DEBUG("The node is stopping when attempting to start a scan"); + setErrorCode(4030); + }//if + tCon->theCommitStatus = Aborted; + }//if + return -1; } -// Private methods - -int NdbScanOperation::executeCursor(int ProcessorId) +int NdbScanOperation::nextResult(bool fetchAllowed) { - int result = theNdbCon->executeScan(); - // If the scan started ok and we are updating or deleting - // iterate over all tuples - if ((m_updateOp) || (m_deleteOp)) { - NdbOperation* newOp; - - while ((result != -1) && (nextResult() == 0)) { - if (m_updateOp) { - newOp = takeOverScanOp(UpdateRequest, m_transConnection); - // Pass setValues from scan operation to new operation - m_setValueList->iterate(SetValueRecList::callSetValueFn, *newOp); - // No need to call updateTuple since scan was taken over for update - // it should be the same with delete - MASV - // newOp->updateTuple(); - } - else if (m_deleteOp) { - newOp = takeOverScanOp(DeleteRequest, m_transConnection); - // newOp->deleteTuple(); + if(m_ordered) + return ((NdbIndexScanOperation*)this)->next_result_ordered(fetchAllowed); + + /** + * Check current receiver + */ + int retVal = 2; + Uint32 idx = m_current_api_receiver; + Uint32 last = m_api_receivers_count; + + /** + * Check next buckets + */ + for(; idx < last; idx++){ + NdbReceiver* tRec = m_api_receivers[idx]; + if(tRec->nextResult()){ + tRec->copyout(theReceiver); + retVal = 0; + break; + } + } + + /** + * We have advanced atleast one bucket + */ + if(!fetchAllowed){ + m_current_api_receiver = idx; + return retVal; + } + + Uint32 nodeId = theNdbCon->theDBnode; + TransporterFacade* tp = TransporterFacade::instance(); + Guard guard(tp->theMutexPtr); + Uint32 seq = theNdbCon->theNodeSequence; + if(seq == tp->getNodeSequence(nodeId) && send_next_scan(idx, false) == 0){ + + idx = m_current_api_receiver; + last = m_api_receivers_count; + + do { + Uint32 cnt = m_conf_receivers_count; + Uint32 sent = m_sent_receivers_count; + + if(cnt > 0){ + /** + * Just move completed receivers + */ + memcpy(m_api_receivers+last, m_conf_receivers, cnt * sizeof(char*)); + last += cnt; + m_conf_receivers_count = 0; + } else if(retVal == 2 && sent > 0){ + /** + * No completed... + */ + theNdb->theWaiter.m_node = nodeId; + theNdb->theWaiter.m_state = WAIT_SCAN; + int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT); + if (return_code == 0 && seq == tp->getNodeSequence(nodeId)) { + continue; + } else { + idx = last; + retVal = -1; //return_code; + } + } else if(retVal == 2){ + /** + * No completed & no sent -> EndOfData + */ + if(send_next_scan(0, true) == 0){ // Close scan + theNdb->theWaiter.m_node = nodeId; + theNdb->theWaiter.m_state = WAIT_SCAN; + int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT); + if (return_code == 0 && seq == tp->getNodeSequence(nodeId)) { + return 1; + } + retVal = -1; //return_code; + } else { + retVal = -3; + } + idx = last; } -#if 0 - // takeOverScanOp will take over the lock that scan aquired - // the lock is released when nextScanResult is called - // That means that the "takeover" has to be sent to the kernel - // before nextScanresult is called - MASV - if (m_autoExecute){ - m_transConnection->execute(NoCommit); + + if(retVal == 0) + break; + + for(; idx < last; idx++){ + NdbReceiver* tRec = m_api_receivers[idx]; + if(tRec->nextResult()){ + tRec->copyout(theReceiver); + retVal = 0; + break; + } } -#else - m_transConnection->execute(NoCommit); -#endif - } - closeScan(); + } while(retVal == 2); + } else { + retVal = -3; } - - return result; + + m_api_receivers_count = last; + m_current_api_receiver = idx; + + switch(retVal){ + case 0: + case 1: + case 2: + return retVal; + case -1: + setErrorCode(4008); // Timeout + break; + case -2: + setErrorCode(4028); // Node fail + break; + case -3: // send_next_scan -> return fail (set error-code self) + break; + } + + theNdbCon->theTransactionIsStarted = false; + theNdbCon->theReleaseOnClose = true; + return -1; } -int NdbScanOperation::nextResult(bool fetchAllowed) -{ - int result = theNdbCon->nextScanResult(fetchAllowed); - if (result == -1){ - // Move the error code from hupped transaction - // to the real trans - const NdbError err = theNdbCon->getNdbError(); - m_transConnection->setOperationErrorCode(err.code); +int +NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag){ + if(cnt > 0 || stopScanFlag){ + NdbApiSignal tSignal(theNdb->theMyRef); + tSignal.setSignal(GSN_SCAN_NEXTREQ); + + Uint32* theData = tSignal.getDataPtrSend(); + theData[0] = theNdbCon->theTCConPtr; + theData[1] = stopScanFlag == true ? 1 : 0; + Uint64 transId = theNdbCon->theTransactionId; + theData[2] = transId; + theData[3] = (Uint32) (transId >> 32); + + /** + * Prepare ops + */ + Uint32 last = m_sent_receivers_count; + Uint32 * prep_array = (cnt > 21 ? m_prepared_receivers : theData + 4); + for(Uint32 i = 0; i<cnt; i++){ + NdbReceiver * tRec = m_api_receivers[i]; + m_sent_receivers[last+i] = tRec; + tRec->m_list_index = last+i; + prep_array[i] = tRec->m_tcPtrI; + tRec->prepareSend(); + } + memcpy(&m_api_receivers[0], &m_api_receivers[cnt], cnt * sizeof(char*)); + + Uint32 nodeId = theNdbCon->theDBnode; + TransporterFacade * tp = TransporterFacade::instance(); + int ret; + if(cnt > 21){ + tSignal.setLength(4); + LinearSectionPtr ptr[3]; + ptr[0].p = prep_array; + ptr[0].sz = cnt; + ret = tp->sendFragmentedSignal(&tSignal, nodeId, ptr, 1); + } else { + tSignal.setLength(4+cnt); + ret = tp->sendSignal(&tSignal, nodeId); + } + + m_sent_receivers_count = last + cnt + stopScanFlag; + m_api_receivers_count -= cnt; + m_current_api_receiver = 0; + + return ret; } - return result; + return 0; } int NdbScanOperation::prepareSend(Uint32 TC_ConnectPtr, Uint64 TransactionId) { printf("NdbScanOperation::prepareSend\n"); + abort(); return 0; } @@ -363,300 +633,689 @@ NdbScanOperation::doSend(int ProcessorId) void NdbScanOperation::closeScan() { - if(theNdbCon){ - if (theNdbCon->stopScan() == -1) - theError = theNdbCon->getNdbError(); - theNdb->closeTransaction(theNdbCon); - theNdbCon = 0; - } - m_transConnection = NULL; -} + do { + TransporterFacade* tp = TransporterFacade::instance(); + Guard guard(tp->theMutexPtr); -void NdbScanOperation::release(){ - closeScan(); - NdbCursorOperation::release(); -} + Uint32 seq = theNdbCon->theNodeSequence; + Uint32 nodeId = theNdbCon->theDBnode; -void SetValueRecList::add(const char* anAttrName, const char* aValue, Uint32 len) -{ - SetValueRec* newSetValueRec = new SetValueRec(); + if(seq != tp->getNodeSequence(nodeId)){ + theNdbCon->theReleaseOnClose = true; + break; + } + + /** + * Wait for all running scans... + */ + while(m_sent_receivers_count){ + theNdb->theWaiter.m_node = nodeId; + theNdb->theWaiter.m_state = WAIT_SCAN; + int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT); + switch(return_code){ + case 0: + break; + case -1: + setErrorCode(4008); + case -2: + m_sent_receivers_count = 0; + m_api_receivers_count = 0; + m_conf_receivers_count = 0; + } + } + + if(seq != tp->getNodeSequence(nodeId)){ + theNdbCon->theReleaseOnClose = true; + break; + } + + if(m_api_receivers_count+m_conf_receivers_count){ + // Send close scan + send_next_scan(0, true); // Close scan + + /** + * wait for close scan conf + */ + do { + theNdb->theWaiter.m_node = nodeId; + theNdb->theWaiter.m_state = WAIT_SCAN; + int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT); + switch(return_code){ + case 0: + break; + case -1: + setErrorCode(4008); + case -2: + m_api_receivers_count = 0; + m_conf_receivers_count = 0; + } + } while(m_api_receivers_count+m_conf_receivers_count); + } + } while(0); + + theNdbCon->theScanningOp = 0; + theNdb->closeTransaction(theNdbCon); + + theNdbCon = 0; + m_transConnection = NULL; +} - newSetValueRec->stype = SetValueRec::SET_STRING_ATTR1; - newSetValueRec->anAttrName = strdup(anAttrName); - newSetValueRec->stringStruct.aStringValue = (char *) malloc(len); - strlcpy(newSetValueRec->stringStruct.aStringValue, aValue, len); - if (!last) - first = last = newSetValueRec; - else { - last->next = newSetValueRec; - last = newSetValueRec; +void +NdbScanOperation::execCLOSE_SCAN_REP(Uint32 errCode){ + /** + * We will receive no further signals from this scan + */ + if(!errCode){ + /** + * Normal termination + */ + theNdbCon->theCommitStatus = Committed; + theNdbCon->theCompletionStatus = CompletedSuccess; + } else { + /** + * Something is fishy + */ + abort(); } + m_api_receivers_count = 0; + m_conf_receivers_count = 0; + m_sent_receivers_count = 0; } -void SetValueRecList::add(const char* anAttrName, Int32 aValue) +void NdbScanOperation::release() { - SetValueRec* newSetValueRec = new SetValueRec(); - - newSetValueRec->stype = SetValueRec::SET_INT32_ATTR1; - newSetValueRec->anAttrName = strdup(anAttrName); - newSetValueRec->anInt32Value = aValue; - if (!last) - first = last = newSetValueRec; - else { - last->next = newSetValueRec; - last = newSetValueRec; + if(theNdbCon != 0 || m_transConnection != 0){ + closeScan(); + } + for(Uint32 i = 0; i<m_allocated_receivers; i++){ + m_receivers[i]->release(); } } -void SetValueRecList::add(const char* anAttrName, Uint32 aValue) -{ - SetValueRec* newSetValueRec = new SetValueRec(); +/*************************************************************************** +int prepareSendScan(Uint32 aTC_ConnectPtr, + Uint64 aTransactionId) + +Return Value: Return 0 : preparation of send was succesful. + Return -1: In all other case. +Parameters: aTC_ConnectPtr: the Connect pointer to TC. + aTransactionId: the Transaction identity of the transaction. +Remark: Puts the the final data into ATTRINFO signal(s) after this + we know the how many signal to send and their sizes +***************************************************************************/ +int NdbScanOperation::prepareSendScan(Uint32 aTC_ConnectPtr, + Uint64 aTransactionId){ + + if (theInterpretIndicator != 1 || + (theOperationType != OpenScanRequest && + theOperationType != OpenRangeScanRequest)) { + setErrorCodeAbort(4005); + return -1; + } - newSetValueRec->stype = SetValueRec::SET_UINT32_ATTR1; - newSetValueRec->anAttrName = strdup(anAttrName); - newSetValueRec->anUint32Value = aValue; - if (!last) - first = last = newSetValueRec; - else { - last->next = newSetValueRec; - last = newSetValueRec; + if (theStatus == SetBound) { + saveBoundATTRINFO(); + theStatus = GetValue; } -} -void SetValueRecList::add(const char* anAttrName, Int64 aValue) -{ - SetValueRec* newSetValueRec = new SetValueRec(); + theErrorLine = 0; - newSetValueRec->stype = SetValueRec::SET_INT64_ATTR1; - newSetValueRec->anAttrName = strdup(anAttrName); - newSetValueRec->anInt64Value = aValue; - if (!last) - first = last = newSetValueRec; - else { - last->next = newSetValueRec; - last = newSetValueRec; + // In preapareSendInterpreted we set the sizes (word 4-8) in the + // first ATTRINFO signal. + if (prepareSendInterpreted() == -1) + return -1; + + if(m_ordered){ + ((NdbIndexScanOperation*)this)->fix_get_values(); + } + + const Uint32 transId1 = (Uint32) (aTransactionId & 0xFFFFFFFF); + const Uint32 transId2 = (Uint32) (aTransactionId >> 32); + + if (theOperationType == OpenRangeScanRequest) { + NdbApiSignal* tSignal = theBoundATTRINFO; + do{ + tSignal->setData(aTC_ConnectPtr, 1); + tSignal->setData(transId1, 2); + tSignal->setData(transId2, 3); + tSignal = tSignal->next(); + } while (tSignal != NULL); + } + theCurrentATTRINFO->setLength(theAI_LenInCurrAI); + NdbApiSignal* tSignal = theFirstATTRINFO; + do{ + tSignal->setData(aTC_ConnectPtr, 1); + tSignal->setData(transId1, 2); + tSignal->setData(transId2, 3); + tSignal = tSignal->next(); + } while (tSignal != NULL); + + /** + * Prepare all receivers + */ + theReceiver.prepareSend(); + bool keyInfo = m_keyInfo; + Uint32 key_size = keyInfo ? m_currentTable->m_keyLenInWords : 0; + for(Uint32 i = 0; i<theParallelism; i++){ + m_receivers[i]->do_get_value(&theReceiver, theBatchSize, key_size); } + return 0; } -void SetValueRecList::add(const char* anAttrName, Uint64 aValue) +/****************************************************************************** +int doSend() + +Return Value: Return >0 : send was succesful, returns number of signals sent + Return -1: In all other case. +Parameters: aProcessorId: Receiving processor node +Remark: Sends the ATTRINFO signal(s) +******************************************************************************/ +int +NdbScanOperation::doSendScan(int aProcessorId) { - SetValueRec* newSetValueRec = new SetValueRec(); + Uint32 tSignalCount = 0; + NdbApiSignal* tSignal; + + if (theInterpretIndicator != 1 || + (theOperationType != OpenScanRequest && + theOperationType != OpenRangeScanRequest)) { + setErrorCodeAbort(4005); + return -1; + } + + assert(theSCAN_TABREQ != NULL); + tSignal = theSCAN_TABREQ; + if (tSignal->setSignal(GSN_SCAN_TABREQ) == -1) { + setErrorCode(4001); + return -1; + } + // Update the "attribute info length in words" in SCAN_TABREQ before + // sending it. This could not be done in openScan because + // we created the ATTRINFO signals after the SCAN_TABREQ signal. + ScanTabReq * const req = CAST_PTR(ScanTabReq, tSignal->getDataPtrSend()); + req->attrLen = theTotalCurrAI_Len; + if (theOperationType == OpenRangeScanRequest) + req->attrLen += theTotalBoundAI_Len; + TransporterFacade *tp = TransporterFacade::instance(); + if(theParallelism > 16){ + LinearSectionPtr ptr[3]; + ptr[0].p = m_prepared_receivers; + ptr[0].sz = theParallelism; + if (tp->sendFragmentedSignal(tSignal, aProcessorId, ptr, 1) == -1) { + setErrorCode(4002); + return -1; + } + } else { + tSignal->setLength(9+theParallelism); + memcpy(tSignal->getDataPtrSend()+9, m_prepared_receivers, 4*theParallelism); + if (tp->sendSignal(tSignal, aProcessorId) == -1) { + setErrorCode(4002); + return -1; + } + } - newSetValueRec->stype = SetValueRec::SET_UINT64_ATTR1; - newSetValueRec->anAttrName = strdup(anAttrName); - newSetValueRec->anUint64Value = aValue; - if (!last) - first = last = newSetValueRec; - else { - last->next = newSetValueRec; - last = newSetValueRec; + if (theOperationType == OpenRangeScanRequest) { + // must have at least one signal since it contains attrLen for bounds + assert(theBoundATTRINFO != NULL); + tSignal = theBoundATTRINFO; + while (tSignal != NULL) { + if (tp->sendSignal(tSignal,aProcessorId) == -1){ + setErrorCode(4002); + return -1; + } + tSignalCount++; + tSignal = tSignal->next(); + } } -} + + tSignal = theFirstATTRINFO; + while (tSignal != NULL) { + if (tp->sendSignal(tSignal,aProcessorId) == -1){ + setErrorCode(4002); + return -1; + } + tSignalCount++; + tSignal = tSignal->next(); + } + theStatus = WaitResponse; + return tSignalCount; +}//NdbOperation::doSendScan() + +/****************************************************************************** + * NdbOperation* takeOverScanOp(NdbConnection* updateTrans); + * + * Parameters: The update transactions NdbConnection pointer. + * Return Value: A reference to the transferred operation object + * or NULL if no success. + * Remark: Take over the scanning transactions NdbOperation + * object for a tuple to an update transaction, + * which is the last operation read in nextScanResult() + * (theNdbCon->thePreviousScanRec) + * + * FUTURE IMPLEMENTATION: (This note was moved from header file.) + * In the future, it will even be possible to transfer + * to a NdbConnection on another Ndb-object. + * In this case the receiving NdbConnection-object must call + * a method receiveOpFromScan to actually receive the information. + * This means that the updating transactions can be placed + * in separate threads and thus increasing the parallelism during + * the scan process. + *****************************************************************************/ +NdbOperation* +NdbScanOperation::takeOverScanOp(OperationType opType, NdbConnection* pTrans){ + + Uint32 idx = m_current_api_receiver; + Uint32 last = m_api_receivers_count; + + Uint32 row; + NdbReceiver * tRec; + NdbRecAttr * tRecAttr; + if(idx < last && (tRec = m_api_receivers[idx]) + && ((row = tRec->m_current_row) <= tRec->m_defined_rows) + && (tRecAttr = tRec->m_rows[row-1])){ + + NdbOperation * newOp = pTrans->getNdbOperation(m_currentTable); + if (newOp == NULL){ + return NULL; + } + + const Uint32 len = (tRecAttr->attrSize() * tRecAttr->arraySize() + 3)/4-1; + + newOp->theTupKeyLen = len; + newOp->theOperationType = opType; + if (opType == DeleteRequest) { + newOp->theStatus = GetValue; + } else { + newOp->theStatus = SetValue; + } + + const Uint32 * src = (Uint32*)tRecAttr->aRef(); + const Uint32 tScanInfo = src[len] & 0xFFFF; + const Uint32 tTakeOverNode = src[len] >> 16; + { + UintR scanInfo = 0; + TcKeyReq::setTakeOverScanFlag(scanInfo, 1); + TcKeyReq::setTakeOverScanNode(scanInfo, tTakeOverNode); + TcKeyReq::setTakeOverScanInfo(scanInfo, tScanInfo); + newOp->theScanInfo = scanInfo; + } -void SetValueRecList::add(const char* anAttrName, float aValue) -{ - SetValueRec* newSetValueRec = new SetValueRec(); + // Copy the first 8 words of key info from KEYINF20 into TCKEYREQ + TcKeyReq * tcKeyReq = CAST_PTR(TcKeyReq,newOp->theTCREQ->getDataPtrSend()); + Uint32 i = 0; + for (i = 0; i < TcKeyReq::MaxKeyInfo && i < len; i++) { + tcKeyReq->keyInfo[i] = * src++; + } + + if(i < len){ + NdbApiSignal* tSignal = theNdb->getSignal(); + newOp->theFirstKEYINFO = tSignal; + + Uint32 left = len - i; + while(tSignal && left > KeyInfo::DataLength){ + tSignal->setSignal(GSN_KEYINFO); + KeyInfo * keyInfo = CAST_PTR(KeyInfo, tSignal->getDataPtrSend()); + memcpy(keyInfo->keyData, src, 4 * KeyInfo::DataLength); + src += KeyInfo::DataLength; + left -= KeyInfo::DataLength; + + tSignal->next(theNdb->getSignal()); + tSignal = tSignal->next(); + } - newSetValueRec->stype = SetValueRec::SET_FLOAT_ATTR1; - newSetValueRec->anAttrName = strdup(anAttrName); - newSetValueRec->aFloatValue = aValue; - if (!last) - first = last = newSetValueRec; - else { - last->next = newSetValueRec; - last = newSetValueRec; + if(tSignal && left > 0){ + tSignal->setSignal(GSN_KEYINFO); + KeyInfo * keyInfo = CAST_PTR(KeyInfo, tSignal->getDataPtrSend()); + memcpy(keyInfo->keyData, src, 4 * left); + } + } + return newOp; } + return 0; } -void SetValueRecList::add(const char* anAttrName, double aValue) +NdbIndexScanOperation::NdbIndexScanOperation(Ndb* aNdb) + : NdbScanOperation(aNdb) { - SetValueRec* newSetValueRec = new SetValueRec(); +} - newSetValueRec->stype = SetValueRec::SET_DOUBLE_ATTR1; - newSetValueRec->anAttrName = strdup(anAttrName); - newSetValueRec->aDoubleValue = aValue; - if (!last) - first = last = newSetValueRec; - else { - last->next = newSetValueRec; - last = newSetValueRec; - } +NdbIndexScanOperation::~NdbIndexScanOperation(){ } -void SetValueRecList::add(Uint32 anAttrId, const char* aValue, Uint32 len) +int +NdbIndexScanOperation::setBound(const char* anAttrName, int type, const void* aValue, Uint32 len) { - SetValueRec* newSetValueRec = new SetValueRec(); - - newSetValueRec->stype = SetValueRec::SET_STRING_ATTR2; - newSetValueRec->anAttrId = anAttrId; - newSetValueRec->stringStruct.aStringValue = (char *) malloc(len); - strlcpy(newSetValueRec->stringStruct.aStringValue, aValue, len); - if (!last) - first = last = newSetValueRec; - else { - last->next = newSetValueRec; - last = newSetValueRec; - } + return setBound(m_accessTable->getColumn(anAttrName), type, aValue, len); } -void SetValueRecList::add(Uint32 anAttrId, Int32 aValue) +int +NdbIndexScanOperation::setBound(Uint32 anAttrId, int type, const void* aValue, Uint32 len) { - SetValueRec* newSetValueRec = new SetValueRec(); + return setBound(m_accessTable->getColumn(anAttrId), type, aValue, len); +} - newSetValueRec->stype = SetValueRec::SET_INT32_ATTR2; - newSetValueRec->anAttrId = anAttrId; - newSetValueRec->anInt32Value = aValue; - last->next = newSetValueRec; - last = newSetValueRec; +int +NdbIndexScanOperation::equal_impl(const NdbColumnImpl* anAttrObject, + const char* aValue, + Uint32 len){ + return setBound(anAttrObject, BoundEQ, aValue, len); } -void SetValueRecList::add(Uint32 anAttrId, Uint32 aValue) -{ - SetValueRec* newSetValueRec = new SetValueRec(); +NdbRecAttr* +NdbIndexScanOperation::getValue_impl(const NdbColumnImpl* attrInfo, + char* aValue){ + if(!attrInfo->getPrimaryKey() || !m_ordered){ + return NdbScanOperation::getValue_impl(attrInfo, aValue); + } + + Uint32 id = attrInfo->m_attrId; + Uint32 marker = theTupleKeyDefined[id][0]; - newSetValueRec->stype = SetValueRec::SET_UINT32_ATTR2; - newSetValueRec->anAttrId = anAttrId; - newSetValueRec->anUint32Value = aValue; - if (!last) - first = last = newSetValueRec; - else { - last->next = newSetValueRec; - last = newSetValueRec; + if(marker == SETBOUND_EQ){ + return NdbScanOperation::getValue_impl(attrInfo, aValue); + } else if(marker == API_PTR){ + return NdbScanOperation::getValue_impl(attrInfo, aValue); } + + UintPtr oldVal; + oldVal = theTupleKeyDefined[id][1]; +#if (SIZEOF_CHARP == 8) + oldVal = oldVal | (((UintPtr)theTupleKeyDefined[id][2]) << 32); +#endif + theTupleKeyDefined[id][0] = API_PTR; + + NdbRecAttr* tmp = (NdbRecAttr*)oldVal; + tmp->setup(attrInfo, aValue); + return tmp; } -void SetValueRecList::add(Uint32 anAttrId, Int64 aValue) +#include <AttributeHeader.hpp> +/* + * Define bound on index column in range scan. + */ +int +NdbIndexScanOperation::setBound(const NdbColumnImpl* tAttrInfo, + int type, const void* aValue, Uint32 len) { - SetValueRec* newSetValueRec = new SetValueRec(); + if (theOperationType == OpenRangeScanRequest && + theStatus == SetBound && + (0 <= type && type <= 4) && + aValue != NULL && + len <= 8000) { + // bound type + + insertATTRINFO(type); + // attribute header + Uint32 sizeInBytes = tAttrInfo->m_attrSize * tAttrInfo->m_arraySize; + if (len != sizeInBytes && (len != 0)) { + setErrorCodeAbort(4209); + return -1; + } + len = sizeInBytes; + Uint32 tIndexAttrId = tAttrInfo->m_attrId; + Uint32 sizeInWords = (len + 3) / 4; + AttributeHeader ah(tIndexAttrId, sizeInWords); + insertATTRINFO(ah.m_value); + // attribute data + if ((UintPtr(aValue) & 0x3) == 0 && (len & 0x3) == 0) + insertATTRINFOloop((const Uint32*)aValue, sizeInWords); + else { + Uint32 temp[2000]; + memcpy(temp, aValue, len); + while ((len & 0x3) != 0) + ((char*)temp)[len++] = 0; + insertATTRINFOloop(temp, sizeInWords); + } - newSetValueRec->stype = SetValueRec::SET_INT64_ATTR2; - newSetValueRec->anAttrId = anAttrId; - newSetValueRec->anInt64Value = aValue; - if (!last) - first = last = newSetValueRec; - else { - last->next = newSetValueRec; - last = newSetValueRec; + /** + * Do sorted stuff + */ + + /** + * The primary keys for an ordered index is defined in the beginning + * so it's safe to use [tIndexAttrId] + * (instead of looping as is NdbOperation::equal_impl) + */ + if(!theTupleKeyDefined[tIndexAttrId][0]){ + theNoOfTupKeyDefined++; + theTupleKeyDefined[tIndexAttrId][0] = SETBOUND_EQ; + m_sort_columns -= m_ordered; + } + + return 0; + } else { + setErrorCodeAbort(4228); // XXX wrong code + return -1; } } -void SetValueRecList::add(Uint32 anAttrId, Uint64 aValue) -{ - SetValueRec* newSetValueRec = new SetValueRec(); +NdbResultSet* +NdbIndexScanOperation::readTuples(LockMode lm, + Uint32 batch, + Uint32 parallel, + bool order_by){ + NdbResultSet * rs = NdbScanOperation::readTuples(lm, batch, 0); + if(rs && order_by){ + m_ordered = 1; + m_sort_columns = m_accessTable->getNoOfPrimaryKeys(); + m_current_api_receiver = m_sent_receivers_count; + } + return rs; +} - newSetValueRec->stype = SetValueRec::SET_UINT64_ATTR2; - newSetValueRec->anAttrId = anAttrId; - newSetValueRec->anUint64Value = aValue; - if (!last) - first = last = newSetValueRec; - else { - last->next = newSetValueRec; - last = newSetValueRec; +void +NdbIndexScanOperation::fix_get_values(){ + /** + * Loop through all getValues and set buffer pointer to "API" pointer + */ + NdbRecAttr * curr = theReceiver.theFirstRecAttr; + + Uint32 cnt = m_sort_columns; + assert(cnt < MAXNROFTUPLEKEY); + + Uint32 idx = 0; + NdbTableImpl * tab = m_currentTable; + while(cnt > 0){ // To MAXNROFTUPLEKEY loops + NdbColumnImpl * col = tab->getColumn(idx); + if(col->getPrimaryKey()){ + Uint32 val = theTupleKeyDefined[idx][0]; + switch(val){ + case FAKE_PTR: + curr->setup(col, 0); + // Fall-through + case API_PTR: + cnt--; + break; + case SETBOUND_EQ: + (void)1; +#ifdef VM_TRACE + break; + default: + abort(); +#endif + } + } + idx++; } } -void SetValueRecList::add(Uint32 anAttrId, float aValue) -{ - SetValueRec* newSetValueRec = new SetValueRec(); +int +NdbIndexScanOperation::compare(Uint32 skip, Uint32 cols, + const NdbReceiver* t1, + const NdbReceiver* t2){ + + NdbRecAttr * r1 = t1->m_rows[t1->m_current_row]; + NdbRecAttr * r2 = t2->m_rows[t2->m_current_row]; - newSetValueRec->stype = SetValueRec::SET_FLOAT_ATTR2; - newSetValueRec->anAttrId = anAttrId; - newSetValueRec->aFloatValue = aValue; - if (!last) - first = last = newSetValueRec; - else { - last->next = newSetValueRec; - last = newSetValueRec; + r1 = (skip ? r1->next() : r1); + r2 = (skip ? r2->next() : r2); + + while(cols > 0){ + Uint32 * d1 = (Uint32*)r1->aRef(); + Uint32 * d2 = (Uint32*)r2->aRef(); + unsigned r1_null = r1->isNULL(); + if((r1_null ^ (unsigned)r2->isNULL())){ + return (r1_null ? 1 : -1); + } + Uint32 type = NdbColumnImpl::getImpl(* r1->m_column).m_extType; + Uint32 size = (r1->theAttrSize * r1->theArraySize + 3) / 4; + if(!r1_null){ + char r = NdbSqlUtil::cmp(type, d1, d2, size, size); + if(r){ + assert(r != NdbSqlUtil::CmpUnknown); + assert(r != NdbSqlUtil::CmpError); + return r; + } + } + cols--; + r1 = r1->next(); + r2 = r2->next(); } + return 0; } -void SetValueRecList::add(Uint32 anAttrId, double aValue) -{ - SetValueRec* newSetValueRec = new SetValueRec(); +#define DEBUG_NEXT_RESULT 0 + +int +NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){ + + Uint32 u_idx = m_current_api_receiver; // start of unsorted + Uint32 u_last = u_idx + 1; // last unsorted + Uint32 s_idx = u_last; // start of sorted + Uint32 s_last = theParallelism; // last sorted + + NdbReceiver** arr = m_api_receivers; + NdbReceiver* tRec = arr[u_idx]; + + if(DEBUG_NEXT_RESULT) ndbout_c("nextOrderedResult(%d) nextResult: %d", + fetchAllowed, + (u_idx < s_last ? tRec->nextResult() : 0)); + + if(DEBUG_NEXT_RESULT) ndbout_c("u=[%d %d] s=[%d %d]", + u_idx, u_last, + s_idx, s_last); + + bool fetchNeeded = (u_idx == s_last) || !tRec->nextResult(); + + if(fetchNeeded){ + if(fetchAllowed){ + if(DEBUG_NEXT_RESULT) ndbout_c("performing fetch..."); + TransporterFacade* tp = TransporterFacade::instance(); + Guard guard(tp->theMutexPtr); + Uint32 seq = theNdbCon->theNodeSequence; + Uint32 nodeId = theNdbCon->theDBnode; + if(seq == tp->getNodeSequence(nodeId) && !send_next_scan_ordered(u_idx)){ + Uint32 tmp = m_sent_receivers_count; + while(m_sent_receivers_count > 0){ + theNdb->theWaiter.m_node = nodeId; + theNdb->theWaiter.m_state = WAIT_SCAN; + int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT); + if (return_code == 0 && seq == tp->getNodeSequence(nodeId)) { + continue; + } + return -1; + } + + u_idx = 0; + u_last = m_conf_receivers_count; + s_idx = (u_last > 1 ? s_last : s_idx); + m_conf_receivers_count = 0; + memcpy(arr, m_conf_receivers, u_last * sizeof(char*)); + + if(DEBUG_NEXT_RESULT) ndbout_c("sent: %d recv: %d", tmp, u_last); + } + } else { + return 2; + } + } - newSetValueRec->stype = SetValueRec::SET_DOUBLE_ATTR2; - newSetValueRec->anAttrId = anAttrId; - newSetValueRec->aDoubleValue = aValue; - if (!last) - first = last = newSetValueRec; - else { - last->next = newSetValueRec; - last = newSetValueRec; + if(DEBUG_NEXT_RESULT) ndbout_c("u=[%d %d] s=[%d %d]", + u_idx, u_last, + s_idx, s_last); + + + Uint32 cols = m_sort_columns; + Uint32 skip = m_keyInfo; + while(u_idx < u_last){ + u_last--; + tRec = arr[u_last]; + + // Do binary search instead to find place + Uint32 place = s_idx; + for(; place < s_last; place++){ + if(compare(skip, cols, tRec, arr[place]) <= 0){ + break; + } + } + + if(place != s_idx){ + if(DEBUG_NEXT_RESULT) + ndbout_c("memmove(%d, %d, %d)", s_idx-1, s_idx, (place - s_idx)); + memmove(arr+s_idx-1, arr+s_idx, sizeof(char*)*(place - s_idx)); + } + + if(DEBUG_NEXT_RESULT) ndbout_c("putting %d @ %d", u_last, place - 1); + m_api_receivers[place-1] = tRec; + s_idx--; } -} -void -SetValueRecList::callSetValueFn(SetValueRec& aSetValueRec, NdbOperation& oper) -{ - switch(aSetValueRec.stype) { - case(SetValueRec::SET_STRING_ATTR1): - oper.setValue(aSetValueRec.anAttrName, aSetValueRec.stringStruct.aStringValue, aSetValueRec.stringStruct.len); - break; - case(SetValueRec::SET_INT32_ATTR1): - oper.setValue(aSetValueRec.anAttrName, aSetValueRec.anInt32Value); - break; - case(SetValueRec::SET_UINT32_ATTR1): - oper.setValue(aSetValueRec.anAttrName, aSetValueRec.anUint32Value); - break; - case(SetValueRec::SET_INT64_ATTR1): - oper.setValue(aSetValueRec.anAttrName, aSetValueRec.anInt64Value); - break; - case(SetValueRec::SET_UINT64_ATTR1): - oper.setValue(aSetValueRec.anAttrName, aSetValueRec.anUint64Value); - break; - case(SetValueRec::SET_FLOAT_ATTR1): - oper.setValue(aSetValueRec.anAttrName, aSetValueRec.aFloatValue); - break; - case(SetValueRec::SET_DOUBLE_ATTR1): - oper.setValue(aSetValueRec.anAttrName, aSetValueRec.aDoubleValue); - break; - case(SetValueRec::SET_STRING_ATTR2): - oper.setValue(aSetValueRec.anAttrId, aSetValueRec.stringStruct.aStringValue, aSetValueRec.stringStruct.len); - break; - case(SetValueRec::SET_INT32_ATTR2): - oper.setValue(aSetValueRec.anAttrId, aSetValueRec.anInt32Value); - break; - case(SetValueRec::SET_UINT32_ATTR2): - oper.setValue(aSetValueRec.anAttrId, aSetValueRec.anUint32Value); - break; - case(SetValueRec::SET_INT64_ATTR2): - oper.setValue(aSetValueRec.anAttrId, aSetValueRec.anInt64Value); - break; - case(SetValueRec::SET_UINT64_ATTR2): - oper.setValue(aSetValueRec.anAttrId, aSetValueRec.anUint64Value); - break; - case(SetValueRec::SET_FLOAT_ATTR2): - oper.setValue(aSetValueRec.anAttrId, aSetValueRec.aFloatValue); - break; - case(SetValueRec::SET_DOUBLE_ATTR2): - oper.setValue(aSetValueRec.anAttrId, aSetValueRec.aDoubleValue); - break; + if(DEBUG_NEXT_RESULT) ndbout_c("u=[%d %d] s=[%d %d]", + u_idx, u_last, + s_idx, s_last); + + m_current_api_receiver = s_idx; + + if(DEBUG_NEXT_RESULT) + for(Uint32 i = s_idx; i<s_last; i++) + ndbout_c("%p", arr[i]); + + tRec = m_api_receivers[s_idx]; + if(s_idx < s_last && tRec->nextResult()){ + tRec->copyout(theReceiver); + return 0; } -} -SetValueRec::~SetValueRec() -{ - if ((stype == SET_STRING_ATTR1) || - (stype == SET_INT32_ATTR1) || - (stype == SET_UINT32_ATTR1) || - (stype == SET_INT64_ATTR1) || - (stype == SET_UINT64_ATTR1) || - (stype == SET_FLOAT_ATTR1) || - (stype == SET_DOUBLE_ATTR1)) - free(anAttrName); - - if ((stype == SET_STRING_ATTR1) || - (stype == SET_STRING_ATTR2)) - free(stringStruct.aStringValue); - if (next) delete next; - next = 0; + TransporterFacade* tp = TransporterFacade::instance(); + Guard guard(tp->theMutexPtr); + Uint32 seq = theNdbCon->theNodeSequence; + Uint32 nodeId = theNdbCon->theDBnode; + if(seq == tp->getNodeSequence(nodeId) && send_next_scan(0, true) == 0){ + return 1; + } + return -1; } int -NdbScanOperation::equal_impl(const NdbColumnImpl* anAttrObject, - const char* aValue, - Uint32 len){ - return setBound(anAttrObject, BoundEQ, aValue, len); +NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx){ + if(idx == theParallelism) + return 0; + + NdbApiSignal tSignal(theNdb->theMyRef); + tSignal.setSignal(GSN_SCAN_NEXTREQ); + + Uint32* theData = tSignal.getDataPtrSend(); + theData[0] = theNdbCon->theTCConPtr; + theData[1] = 0; + Uint64 transId = theNdbCon->theTransactionId; + theData[2] = transId; + theData[3] = (Uint32) (transId >> 32); + + /** + * Prepare ops + */ + Uint32 last = m_sent_receivers_count; + Uint32 * prep_array = theData + 4; + + NdbReceiver * tRec = m_api_receivers[idx]; + m_sent_receivers[last] = tRec; + tRec->m_list_index = last; + prep_array[0] = tRec->m_tcPtrI; + tRec->prepareSend(); + + m_sent_receivers_count = last + 1; + + Uint32 nodeId = theNdbCon->theDBnode; + TransporterFacade * tp = TransporterFacade::instance(); + tSignal.setLength(4+1); + return tp->sendSignal(&tSignal, nodeId); } - - diff --git a/ndb/src/ndbapi/Ndbif.cpp b/ndb/src/ndbapi/Ndbif.cpp index 2d722e12129..60eda978397 100644 --- a/ndb/src/ndbapi/Ndbif.cpp +++ b/ndb/src/ndbapi/Ndbif.cpp @@ -16,12 +16,11 @@ #include "NdbApiSignal.hpp" +#include "AttrType.hpp" #include "NdbImpl.hpp" -//#include "NdbSchemaOp.hpp" -//#include "NdbSchemaCon.hpp" #include "NdbOperation.hpp" #include "NdbIndexOperation.hpp" -#include "NdbScanReceiver.hpp" +#include "NdbScanOperation.hpp" #include "NdbConnection.hpp" #include "NdbRecAttr.hpp" #include "NdbReceiver.hpp" @@ -34,6 +33,9 @@ #include <signaldata/CreateIndx.hpp> #include <signaldata/DropIndx.hpp> #include <signaldata/TcIndx.hpp> +#include <signaldata/TransIdAI.hpp> +#include <signaldata/ScanFrag.hpp> +#include <signaldata/ScanTab.hpp> #include <ndb_limits.h> #include <NdbOut.hpp> @@ -41,12 +43,13 @@ /****************************************************************************** - * int init( int aMaxNoOfTransactions ); + * int init( int aNrOfCon, int aNrOfOp ); * * Return Value: Return 0 : init was successful. * Return -1: In all other case. - * Parameters: aMaxNoOfTransactions : Max number of simultaneous transations - * Remark: Create pointers and idle list Synchronous. + * Parameters: aNrOfCon : Number of connections offered to the application. + * aNrOfOp : Number of operations offered to the application. + * Remark: Create pointers and idle list Synchronous. ****************************************************************************/ int Ndb::init(int aMaxNoOfTransactions) @@ -88,7 +91,7 @@ Ndb::init(int aMaxNoOfTransactions) theMyRef = numberToRef(theNdbBlockNumber, theNode); for (i = 1; i < MAX_NDB_NODES; i++){ - if (theFacade->getIsNodeDefined(i)){ + if (theFacade->getIsDbNode(i)){ theDBnodes[theNoOfDBnodes] = i; theNoOfDBnodes++; } @@ -252,9 +255,8 @@ Ndb::abortTransactionsAfterNodeFailure(Uint16 aNodeId) for (int i = tNoSentTransactions - 1; i >= 0; i--) { NdbConnection* localCon = theSentTransactionsArray[i]; if (localCon->getConnectedNodeId() == aNodeId ) { - const NdbConnection::SendStatusType sendStatus = localCon->theSendStatus; - if (sendStatus == NdbConnection::sendTC_OP || - sendStatus == NdbConnection::sendTC_COMMIT) { + const SendStatusType sendStatus = localCon->theSendStatus; + if (sendStatus == sendTC_OP || sendStatus == sendTC_COMMIT) { /* A transaction was interrupted in the prepare phase by a node failure. Since the transaction was not found in the phase @@ -262,13 +264,13 @@ Ndb::abortTransactionsAfterNodeFailure(Uint16 aNodeId) we report a normal node failure abort. */ localCon->setOperationErrorCodeAbort(4010); - localCon->theCompletionStatus = NdbConnection::CompletedFailure; - } else if (sendStatus == NdbConnection::sendTC_ROLLBACK) { + localCon->theCompletionStatus = CompletedFailure; + } else if (sendStatus == sendTC_ROLLBACK) { /* We aimed for abort and abort we got even if it was by a node failure. We will thus report it as a success. */ - localCon->theCompletionStatus = NdbConnection::CompletedSuccess; + localCon->theCompletionStatus = CompletedSuccess; } else { #ifdef VM_TRACE printState("abortTransactionsAfterNodeFailure %x", this); @@ -280,7 +282,7 @@ Ndb::abortTransactionsAfterNodeFailure(Uint16 aNodeId) intact since the node was failing and they were aborted. Thus we set commit state to Aborted and set state to release on close. */ - localCon->theCommitStatus = NdbConnection::Aborted; + localCon->theCommitStatus = Aborted; localCon->theReleaseOnClose = true; completedTransaction(localCon); }//if @@ -300,26 +302,28 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3]) NdbOperation* tOp; NdbIndexOperation* tIndexOp; NdbConnection* tCon; - int tReturnCode; + int tReturnCode = -1; const Uint32* tDataPtr = aSignal->getDataPtr(); const Uint32 tWaitState = theWaiter.m_state; const Uint32 tSignalNumber = aSignal->readSignalNumber(); const Uint32 tFirstData = *tDataPtr; + const Uint32 tLen = aSignal->getLength(); + void * tFirstDataPtr; /* - In order to support 64 bit processes in the application we need to use - id's rather than a direct pointer to the object used. It is also a good - idea that one cannot corrupt the application code by sending a corrupt - memory pointer. - - All signals received by the API requires the first data word to be such - an id to the receiving object. + In order to support 64 bit processes in the application we need to use + id's rather than a direct pointer to the object used. It is also a good + idea that one cannot corrupt the application code by sending a corrupt + memory pointer. + + All signals received by the API requires the first data word to be such + an id to the receiving object. */ - + switch (tSignalNumber){ case GSN_TCKEYCONF: { - void* tFirstDataPtr = int2void(tFirstData); + tFirstDataPtr = int2void(tFirstData); if (tFirstDataPtr == 0) goto InvalidSignal; const TcKeyConf * const keyConf = (TcKeyConf *)tDataPtr; @@ -327,8 +331,8 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3]) tCon = void2con(tFirstDataPtr); if ((tCon->checkMagicNumber() == 0) && - (tCon->theSendStatus == NdbConnection::sendTC_OP)) { - tReturnCode = tCon->receiveTCKEYCONF(keyConf, aSignal->getLength()); + (tCon->theSendStatus == sendTC_OP)) { + tReturnCode = tCon->receiveTCKEYCONF(keyConf, tLen); if (tReturnCode != -1) { completedTransaction(tCon); }//if @@ -346,91 +350,48 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3]) return; } - case GSN_READCONF: - { - void* tFirstDataPtr = int2void(tFirstData); - if (tFirstDataPtr == 0) goto InvalidSignal; - - tOp = void2rec_op(tFirstDataPtr); - if (tOp->checkMagicNumber() == 0) { - tCon = tOp->theNdbCon; - if (tCon != NULL) { - if (tCon->theSendStatus == NdbConnection::sendTC_OP) { - tReturnCode = tOp->receiveREAD_CONF(tDataPtr, - aSignal->getLength()); - if (tReturnCode != -1) { - completedTransaction(tCon); - }//if - }//if - }//if - }//if - return; - } case GSN_TRANSID_AI: { - void* tFirstDataPtr = int2void(tFirstData); + tFirstDataPtr = int2void(tFirstData); + assert(tFirstDataPtr); if (tFirstDataPtr == 0) goto InvalidSignal; - - // ndbout << "*** GSN_TRANSID_AI ***" << endl; NdbReceiver* tRec = void2rec(tFirstDataPtr); - if (tRec->getType() == NdbReceiver::NDB_OPERATION){ - // tOp = (NdbOperation*)tRec->getOwner(); - tOp = void2rec_op(tFirstDataPtr); - // ndbout << "NDB_OPERATION" << endl; - if (tOp->checkMagicNumber() == 0) { - tCon = tOp->theNdbCon; - if (tCon != NULL) { - if (tCon->theSendStatus == NdbConnection::sendTC_OP) { - tReturnCode = tOp->receiveTRANSID_AI(tDataPtr, - aSignal->getLength()); - if (tReturnCode != -1) { - completedTransaction(tCon); - break; - } - } - } + assert(tRec->checkMagicNumber()); + assert(tRec->getTransaction()); + assert(tRec->getTransaction()->checkState_TransId(((const TransIdAI*)tDataPtr)->transId)); + if(tRec->checkMagicNumber() && (tCon = tRec->getTransaction()) && + tCon->checkState_TransId(((const TransIdAI*)tDataPtr)->transId)){ + Uint32 com; + if(aSignal->m_noOfSections > 0){ + com = tRec->execTRANSID_AI(ptr[0].p, ptr[0].sz); + } else { + com = tRec->execTRANSID_AI(tDataPtr + TransIdAI::HeaderLength, + tLen - TransIdAI::HeaderLength); } - } else if (tRec->getType() == NdbReceiver::NDB_INDEX_OPERATION){ - // tOp = (NdbIndexOperation*)tRec->getOwner(); - tOp = void2rec_iop(tFirstDataPtr); - // ndbout << "NDB_INDEX_OPERATION" << endl; - if (tOp->checkMagicNumber() == 0) { - tCon = tOp->theNdbCon; - if (tCon != NULL) { - if (tCon->theSendStatus == NdbConnection::sendTC_OP) { - tReturnCode = tOp->receiveTRANSID_AI(tDataPtr, - aSignal->getLength()); - if (tReturnCode != -1) { - completedTransaction(tCon); - break; - } - } - } - } - } else if (tRec->getType() == NdbReceiver::NDB_SCANRECEIVER) { - // NdbScanReceiver* tScanRec = (NdbScanReceiver*)tRec->getOwner(); - // NdbScanReceiver* tScanRec = - // (NdbScanReceiver*)(void2rec(tFirstDataPtr)->getOwner()); - NdbScanReceiver* tScanRec = void2rec_srec(tFirstDataPtr); - // ndbout << "NDB_SCANRECEIVER" << endl; - if(tScanRec->checkMagicNumber() == 0){ - tReturnCode = tScanRec->receiveTRANSID_AI_SCAN(aSignal); - if (tReturnCode != -1) { + + if(com == 1){ + switch(tRec->getType()){ + case NdbReceiver::NDB_OPERATION: + case NdbReceiver::NDB_INDEX_OPERATION: + if(tCon->OpCompleteSuccess() != -1) + completedTransaction(tCon); + break; + case NdbReceiver::NDB_SCANRECEIVER: + tCon->theScanningOp->receiver_delivered(tRec); theWaiter.m_state = NO_WAIT; break; + default: + goto InvalidSignal; } } + break; } else { -#ifdef NDB_NO_DROPPED_SIGNAL - abort(); -#endif goto InvalidSignal; } - return; } case GSN_TCKEY_FAILCONF: { - void* tFirstDataPtr = int2void(tFirstData); + tFirstDataPtr = int2void(tFirstData); if (tFirstDataPtr == 0) goto InvalidSignal; const TcKeyFailConf * const failConf = (TcKeyFailConf *)tDataPtr; @@ -441,8 +402,8 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3]) if (tOp->checkMagicNumber() == 0) { tCon = tOp->theNdbCon; if (tCon != NULL) { - if ((tCon->theSendStatus == NdbConnection::sendTC_OP) || - (tCon->theSendStatus == NdbConnection::sendTC_COMMIT)) { + if ((tCon->theSendStatus == sendTC_OP) || + (tCon->theSendStatus == sendTC_COMMIT)) { tReturnCode = tCon->receiveTCKEY_FAILCONF(failConf); if (tReturnCode != -1) { completedTransaction(tCon); @@ -461,15 +422,15 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3]) } case GSN_TCKEY_FAILREF: { - void* tFirstDataPtr = int2void(tFirstData); + tFirstDataPtr = int2void(tFirstData); if (tFirstDataPtr == 0) goto InvalidSignal; tOp = void2rec_op(tFirstDataPtr); if (tOp->checkMagicNumber() == 0) { tCon = tOp->theNdbCon; if (tCon != NULL) { - if ((tCon->theSendStatus == NdbConnection::sendTC_OP) || - (tCon->theSendStatus == NdbConnection::sendTC_ROLLBACK)) { + if ((tCon->theSendStatus == sendTC_OP) || + (tCon->theSendStatus == sendTC_ROLLBACK)) { tReturnCode = tCon->receiveTCKEY_FAILREF(aSignal); if (tReturnCode != -1) { completedTransaction(tCon); @@ -482,14 +443,14 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3]) } case GSN_TCKEYREF: { - void* tFirstDataPtr = int2void(tFirstData); + tFirstDataPtr = int2void(tFirstData); if (tFirstDataPtr == 0) goto InvalidSignal; tOp = void2rec_op(tFirstDataPtr); if (tOp->checkMagicNumber() == 0) { tCon = tOp->theNdbCon; if (tCon != NULL) { - if (tCon->theSendStatus == NdbConnection::sendTC_OP) { + if (tCon->theSendStatus == sendTC_OP) { tReturnCode = tOp->receiveTCKEYREF(aSignal); if (tReturnCode != -1) { completedTransaction(tCon); @@ -503,7 +464,7 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3]) } case GSN_TC_COMMITCONF: { - void* tFirstDataPtr = int2void(tFirstData); + tFirstDataPtr = int2void(tFirstData); if (tFirstDataPtr == 0) goto InvalidSignal; const TcCommitConf * const commitConf = (TcCommitConf *)tDataPtr; @@ -511,7 +472,7 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3]) tCon = void2con(tFirstDataPtr); if ((tCon->checkMagicNumber() == 0) && - (tCon->theSendStatus == NdbConnection::sendTC_COMMIT)) { + (tCon->theSendStatus == sendTC_COMMIT)) { tReturnCode = tCon->receiveTC_COMMITCONF(commitConf); if (tReturnCode != -1) { completedTransaction(tCon); @@ -531,12 +492,12 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3]) case GSN_TC_COMMITREF: { - void* tFirstDataPtr = int2void(tFirstData); + tFirstDataPtr = int2void(tFirstData); if (tFirstDataPtr == 0) goto InvalidSignal; tCon = void2con(tFirstDataPtr); if ((tCon->checkMagicNumber() == 0) && - (tCon->theSendStatus == NdbConnection::sendTC_COMMIT)) { + (tCon->theSendStatus == sendTC_COMMIT)) { tReturnCode = tCon->receiveTC_COMMITREF(aSignal); if (tReturnCode != -1) { completedTransaction(tCon); @@ -547,12 +508,12 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3]) } case GSN_TCROLLBACKCONF: { - void* tFirstDataPtr = int2void(tFirstData); + tFirstDataPtr = int2void(tFirstData); if (tFirstDataPtr == 0) goto InvalidSignal; tCon = void2con(tFirstDataPtr); if ((tCon->checkMagicNumber() == 0) && - (tCon->theSendStatus == NdbConnection::sendTC_ROLLBACK)) { + (tCon->theSendStatus == sendTC_ROLLBACK)) { tReturnCode = tCon->receiveTCROLLBACKCONF(aSignal); if (tReturnCode != -1) { completedTransaction(tCon); @@ -562,12 +523,12 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3]) } case GSN_TCROLLBACKREF: { - void* tFirstDataPtr = int2void(tFirstData); + tFirstDataPtr = int2void(tFirstData); if (tFirstDataPtr == 0) goto InvalidSignal; tCon = void2con(tFirstDataPtr); if ((tCon->checkMagicNumber() == 0) && - (tCon->theSendStatus == NdbConnection::sendTC_ROLLBACK)) { + (tCon->theSendStatus == sendTC_ROLLBACK)) { tReturnCode = tCon->receiveTCROLLBACKREF(aSignal); if (tReturnCode != -1) { completedTransaction(tCon); @@ -578,7 +539,7 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3]) } case GSN_TCROLLBACKREP: { - void* tFirstDataPtr = int2void(tFirstData); + tFirstDataPtr = int2void(tFirstData); if (tFirstDataPtr == 0) goto InvalidSignal; tCon = void2con(tFirstDataPtr); @@ -592,7 +553,7 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3]) } case GSN_TCSEIZECONF: { - void* tFirstDataPtr = int2void(tFirstData); + tFirstDataPtr = int2void(tFirstData); if (tFirstDataPtr == 0) goto InvalidSignal; if (tWaitState != WAIT_TC_SEIZE) { @@ -612,7 +573,7 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3]) } case GSN_TCSEIZEREF: { - void* tFirstDataPtr = int2void(tFirstData); + tFirstDataPtr = int2void(tFirstData); if (tFirstDataPtr == 0) goto InvalidSignal; if (tWaitState != WAIT_TC_SEIZE) { @@ -632,7 +593,7 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3]) } case GSN_TCRELEASECONF: { - void* tFirstDataPtr = int2void(tFirstData); + tFirstDataPtr = int2void(tFirstData); if (tFirstDataPtr == 0) goto InvalidSignal; if (tWaitState != WAIT_TC_RELEASE) { @@ -650,7 +611,7 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3]) } case GSN_TCRELEASEREF: { - void* tFirstDataPtr = int2void(tFirstData); + tFirstDataPtr = int2void(tFirstData); if (tFirstDataPtr == 0) goto InvalidSignal; if (tWaitState != WAIT_TC_RELEASE) { @@ -704,7 +665,7 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3]) case GSN_DIHNDBTAMPER: { - void* tFirstDataPtr = int2void(tFirstData); + tFirstDataPtr = int2void(tFirstData); if (tFirstDataPtr == 0) goto InvalidSignal; if (tWaitState != WAIT_NDB_TAMPER) @@ -718,27 +679,34 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3]) break; } case GSN_SCAN_TABCONF: - { - void* tFirstDataPtr = int2void(tFirstData); - if (tFirstDataPtr == 0) goto InvalidSignal; - - //ndbout << "*** GSN_SCAN_TABCONF *** " << endl; - if (tWaitState != WAIT_SCAN){ - return; - } - tCon = void2con(tFirstDataPtr); - if (tCon->checkMagicNumber() != 0) - return; - tReturnCode = tCon->receiveSCAN_TABCONF(aSignal); - if (tReturnCode != -1) - theWaiter.m_state = NO_WAIT; - break; + { + tFirstDataPtr = int2void(tFirstData); + assert(tFirstDataPtr); + assert(void2con(tFirstDataPtr)); + assert(void2con(tFirstDataPtr)->checkMagicNumber() == 0); + if(tFirstDataPtr && + (tCon = void2con(tFirstDataPtr)) && (tCon->checkMagicNumber() == 0)){ + + if(aSignal->m_noOfSections > 0){ + tReturnCode = tCon->receiveSCAN_TABCONF(aSignal, ptr[0].p, ptr[0].sz); + } else { + tReturnCode = + tCon->receiveSCAN_TABCONF(aSignal, + tDataPtr + ScanTabConf::SignalLength, + tLen - ScanTabConf::SignalLength); + } + if (tReturnCode != -1) + theWaiter.m_state = NO_WAIT; + break; + } else { + goto InvalidSignal; } + } case GSN_SCAN_TABREF: - { - void* tFirstDataPtr = int2void(tFirstData); + { + tFirstDataPtr = int2void(tFirstData); if (tFirstDataPtr == 0) goto InvalidSignal; - + if (tWaitState == WAIT_SCAN){ tCon = void2con(tFirstDataPtr); if (tCon->checkMagicNumber() == 0){ @@ -753,43 +721,49 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3]) } case GSN_SCAN_TABINFO: { - void* tFirstDataPtr = int2void(tFirstData); - if (tFirstDataPtr == 0) goto InvalidSignal; - - //ndbout << "*** GSN_SCAN_TABINFO ***" << endl; - if (tWaitState != WAIT_SCAN) - return; - tCon = void2con(tFirstDataPtr); - if (tCon->checkMagicNumber() != 0) - return; - tReturnCode = tCon->receiveSCAN_TABINFO(aSignal); - if (tReturnCode != -1) - theWaiter.m_state = NO_WAIT; - break; + goto InvalidSignal; } case GSN_KEYINFO20: { - void* tFirstDataPtr = int2void(tFirstData); + tFirstDataPtr = int2void(tFirstData); if (tFirstDataPtr == 0) goto InvalidSignal; - - //ndbout << "*** GSN_KEYINFO20 ***" << endl; - NdbScanReceiver* tScanRec = void2rec_srec(tFirstDataPtr); - if (tScanRec->checkMagicNumber() != 0) - return; - tReturnCode = tScanRec->receiveKEYINFO20(aSignal); - if (tReturnCode != -1) - theWaiter.m_state = NO_WAIT; - break; + NdbReceiver* tRec = void2rec(tFirstDataPtr); + + if(tRec->checkMagicNumber() && (tCon = tRec->getTransaction()) && + tCon->checkState_TransId(&((const KeyInfo20*)tDataPtr)->transId1)){ + + Uint32 len = ((const KeyInfo20*)tDataPtr)->keyLen; + Uint32 info = ((const KeyInfo20*)tDataPtr)->scanInfo_Node; + int com = -1; + if(aSignal->m_noOfSections > 0 && len == ptr[0].sz){ + com = tRec->execKEYINFO20(info, ptr[0].p, len); + } else if(len == tLen - KeyInfo20::HeaderLength){ + com = tRec->execKEYINFO20(info, tDataPtr+KeyInfo20::HeaderLength, len); + } + + switch(com){ + case 1: + tCon->theScanningOp->receiver_delivered(tRec); + theWaiter.m_state = NO_WAIT; + break; + case 0: + break; + case -1: + goto InvalidSignal; + } + break; + } + goto InvalidSignal; } case GSN_TCINDXCONF:{ - void* tFirstDataPtr = int2void(tFirstData); + tFirstDataPtr = int2void(tFirstData); if (tFirstDataPtr == 0) goto InvalidSignal; const TcIndxConf * const indxConf = (TcIndxConf *)tDataPtr; const BlockReference aTCRef = aSignal->theSendersBlockRef; tCon = void2con(tFirstDataPtr); if ((tCon->checkMagicNumber() == 0) && - (tCon->theSendStatus == NdbConnection::sendTC_OP)) { - tReturnCode = tCon->receiveTCINDXCONF(indxConf, aSignal->getLength()); + (tCon->theSendStatus == sendTC_OP)) { + tReturnCode = tCon->receiveTCINDXCONF(indxConf, tLen); if (tReturnCode != -1) { completedTransaction(tCon); }//if @@ -804,14 +778,14 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3]) break; } case GSN_TCINDXREF:{ - void* tFirstDataPtr = int2void(tFirstData); + tFirstDataPtr = int2void(tFirstData); if (tFirstDataPtr == 0) goto InvalidSignal; tIndexOp = void2rec_iop(tFirstDataPtr); if (tIndexOp->checkMagicNumber() == 0) { tCon = tIndexOp->theNdbCon; if (tCon != NULL) { - if (tCon->theSendStatus == NdbConnection::sendTC_OP) { + if (tCon->theSendStatus == sendTC_OP) { tReturnCode = tIndexOp->receiveTCINDXREF(aSignal); if (tReturnCode != -1) { completedTransaction(tCon); @@ -865,8 +839,7 @@ Ndb::completedTransaction(NdbConnection* aCon) Uint32 tTransArrayIndex = aCon->theTransArrayIndex; Uint32 tNoSentTransactions = theNoOfSentTransactions; Uint32 tNoCompletedTransactions = theNoOfCompletedTransactions; - if ((tNoSentTransactions > 0) && - (aCon->theListState == NdbConnection::InSendList) && + if ((tNoSentTransactions > 0) && (aCon->theListState == InSendList) && (tTransArrayIndex < tNoSentTransactions)) { NdbConnection* tMoveCon = theSentTransactionsArray[tNoSentTransactions - 1]; @@ -880,7 +853,7 @@ Ndb::completedTransaction(NdbConnection* aCon) theNoOfCompletedTransactions = tNoCompletedTransactions + 1; theNoOfSentTransactions = tNoSentTransactions - 1; - aCon->theListState = NdbConnection::InCompletedList; + aCon->theListState = InCompletedList; aCon->handleExecuteCompletion(); if ((theMinNoOfEventsToWakeUp != 0) && (theNoOfCompletedTransactions >= theMinNoOfEventsToWakeUp)) { @@ -895,8 +868,8 @@ Ndb::completedTransaction(NdbConnection* aCon) ndbout << endl << flush; #ifdef VM_TRACE printState("completedTransaction abort"); -#endif abort(); +#endif }//if }//Ndb::completedTransaction() @@ -915,7 +888,7 @@ Ndb::reportCallback(NdbConnection** aCopyArray, Uint32 aNoOfCompletedTrans) NdbAsynchCallback aCallback = aCopyArray[i]->theCallbackFunction; int tResult = 0; if (aCallback != NULL) { - if (aCopyArray[i]->theReturnStatus == NdbConnection::ReturnFailure) { + if (aCopyArray[i]->theReturnStatus == ReturnFailure) { tResult = -1; }//if (*aCallback)(tResult, aCopyArray[i], anyObject); @@ -939,13 +912,13 @@ Ndb::pollCompleted(NdbConnection** aCopyArray) if (tNoCompletedTransactions > 0) { for (i = 0; i < tNoCompletedTransactions; i++) { aCopyArray[i] = theCompletedTransactionsArray[i]; - if (aCopyArray[i]->theListState != NdbConnection::InCompletedList) { + if (aCopyArray[i]->theListState != InCompletedList) { ndbout << "pollCompleted error "; ndbout << aCopyArray[i]->theListState << endl; abort(); }//if theCompletedTransactionsArray[i] = NULL; - aCopyArray[i]->theListState = NdbConnection::NotInList; + aCopyArray[i]->theListState = NotInList; }//for }//if theNoOfCompletedTransactions = 0; @@ -967,8 +940,8 @@ Ndb::check_send_timeout() a_con->printState(); #endif a_con->setOperationErrorCodeAbort(4012); - a_con->theCommitStatus = NdbConnection::Aborted; - a_con->theCompletionStatus = NdbConnection::CompletedFailure; + a_con->theCommitStatus = Aborted; + a_con->theCompletionStatus = CompletedFailure; a_con->handleExecuteCompletion(); remove_sent_list(i); insert_completed_list(a_con); @@ -997,7 +970,7 @@ Ndb::insert_completed_list(NdbConnection* a_con) Uint32 no_of_comp = theNoOfCompletedTransactions; theCompletedTransactionsArray[no_of_comp] = a_con; theNoOfCompletedTransactions = no_of_comp + 1; - a_con->theListState = NdbConnection::InCompletedList; + a_con->theListState = InCompletedList; a_con->theTransArrayIndex = no_of_comp; return no_of_comp; } @@ -1008,7 +981,7 @@ Ndb::insert_sent_list(NdbConnection* a_con) Uint32 no_of_sent = theNoOfSentTransactions; theSentTransactionsArray[no_of_sent] = a_con; theNoOfSentTransactions = no_of_sent + 1; - a_con->theListState = NdbConnection::InSendList; + a_con->theListState = InSendList; a_con->theTransArrayIndex = no_of_sent; return no_of_sent; } @@ -1046,10 +1019,10 @@ Ndb::sendPrepTrans(int forceSend) if ((tp->getNodeSequence(node_id) == a_con->theNodeSequence) && tp->get_node_alive(node_id) || (tp->get_node_stopping(node_id) && - ((a_con->theSendStatus == NdbConnection::sendABORT) || - (a_con->theSendStatus == NdbConnection::sendABORTfail) || - (a_con->theSendStatus == NdbConnection::sendCOMMITstate) || - (a_con->theSendStatus == NdbConnection::sendCompleted)))) { + ((a_con->theSendStatus == sendABORT) || + (a_con->theSendStatus == sendABORTfail) || + (a_con->theSendStatus == sendCOMMITstate) || + (a_con->theSendStatus == sendCompleted)))) { /* We will send if 1) Node is alive and sequences are correct OR @@ -1081,13 +1054,13 @@ Ndb::sendPrepTrans(int forceSend) again and will thus set the state to Aborted to avoid a more or less eternal loop of tries. */ - if (a_con->theSendStatus == NdbConnection::sendOperations) { + if (a_con->theSendStatus == sendOperations) { a_con->setOperationErrorCodeAbort(4021); - a_con->theCommitStatus = NdbConnection::NeedAbort; + a_con->theCommitStatus = NeedAbort; TRACE_DEBUG("Send buffer full and sendOperations"); } else { a_con->setOperationErrorCodeAbort(4026); - a_con->theCommitStatus = NdbConnection::Aborted; + a_con->theCommitStatus = Aborted; TRACE_DEBUG("Send buffer full, set state to Aborted"); }//if }//if @@ -1104,7 +1077,7 @@ Ndb::sendPrepTrans(int forceSend) */ TRACE_DEBUG("Abort a transaction when stopping a node"); a_con->setOperationErrorCodeAbort(4023); - a_con->theCommitStatus = NdbConnection::NeedAbort; + a_con->theCommitStatus = NeedAbort; } else { /* The node is hard dead and we cannot continue. We will also release @@ -1114,10 +1087,10 @@ Ndb::sendPrepTrans(int forceSend) a_con->setOperationErrorCodeAbort(4025); a_con->theReleaseOnClose = true; a_con->theTransactionIsStarted = false; - a_con->theCommitStatus = NdbConnection::Aborted; + a_con->theCommitStatus = Aborted; }//if }//if - a_con->theCompletionStatus = NdbConnection::CompletedFailure; + a_con->theCompletionStatus = CompletedFailure; a_con->handleExecuteCompletion(); insert_completed_list(a_con); }//for @@ -1255,8 +1228,7 @@ Return: 0 - Response received ******************************************************************************/ int -Ndb::receiveResponse(int waitTime) -{ +Ndb::receiveResponse(int waitTime){ int tResultCode; TransporterFacade::instance()->checkForceSend(theNdbBlockNumber); @@ -1310,10 +1282,10 @@ Ndb::sendRecSignal(Uint16 node_id, if (return_code != -1) { theWaiter.m_node = node_id; theWaiter.m_state = aWaitState; - return receiveResponse(); - // End of protected area - }//if - return_code = -3; + return_code = receiveResponse(); + } else { + return_code = -3; + } } else { return_code = -4; }//if diff --git a/ndb/src/ndbapi/Ndbinit.cpp b/ndb/src/ndbapi/Ndbinit.cpp index 042faa431a0..1d9d6fee5e4 100644 --- a/ndb/src/ndbapi/Ndbinit.cpp +++ b/ndb/src/ndbapi/Ndbinit.cpp @@ -137,7 +137,7 @@ Ndb::Ndb( const char* aDataBase , const char* aDataBaseSchema) : TransporterFacade * m_facade = 0; if(theNoOfNdbObjects == 0){ - if ((m_facade = TransporterFacade::start_instance(0,ndbConnectString)) == 0) + if ((m_facade = TransporterFacade::start_instance(ndbConnectString)) == 0) theInitState = InitConfigError; } else { m_facade = TransporterFacade::instance(); diff --git a/ndb/src/ndbapi/Ndblist.cpp b/ndb/src/ndbapi/Ndblist.cpp index 0d9c0f60985..8cc8ba1a079 100644 --- a/ndb/src/ndbapi/Ndblist.cpp +++ b/ndb/src/ndbapi/Ndblist.cpp @@ -15,16 +15,13 @@ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include <NdbOut.hpp> -#include "Ndb.hpp" -//#include "NdbSchemaOp.hpp" -//#include "NdbSchemaCon.hpp" -#include "NdbOperation.hpp" -#include "NdbScanOperation.hpp" -#include "NdbIndexOperation.hpp" -#include "NdbConnection.hpp" +#include <Ndb.hpp> +#include <NdbOperation.hpp> +#include <NdbIndexOperation.hpp> +#include <NdbIndexScanOperation.hpp> +#include <NdbConnection.hpp> #include "NdbApiSignal.hpp" -#include "NdbRecAttr.hpp" -#include "NdbScanReceiver.hpp" +#include <NdbRecAttr.hpp> #include "NdbUtil.hpp" #include "API.hpp" @@ -263,13 +260,13 @@ Ndb::getNdbLabel() * Remark: Get a NdbScanReceiver from theScanRecList and return the * object . ****************************************************************************/ -NdbScanReceiver* +NdbReceiver* Ndb::getNdbScanRec() { - NdbScanReceiver* tNdbScanRec; + NdbReceiver* tNdbScanRec; if ( theScanList == NULL ) { - tNdbScanRec = new NdbScanReceiver(this); + tNdbScanRec = new NdbReceiver(this); if (tNdbScanRec == NULL) { return NULL; @@ -344,17 +341,17 @@ Return Value: Return theOpList : if the getScanOperation was succesful. Return NULL : In all other case. Remark: Get an operation from theScanOpIdleList and return the object . ***************************************************************************/ -NdbScanOperation* +NdbIndexScanOperation* Ndb::getScanOperation() { - NdbScanOperation* tOp = theScanOpIdleList; + NdbIndexScanOperation* tOp = theScanOpIdleList; if (tOp != NULL ) { - NdbScanOperation* tOpNext = (NdbScanOperation*) tOp->next(); + NdbIndexScanOperation* tOpNext = (NdbIndexScanOperation*)tOp->next(); tOp->next(NULL); theScanOpIdleList = tOpNext; return tOp; } else { - tOp = new NdbScanOperation(this); + tOp = new NdbIndexScanOperation(this); if (tOp != NULL) tOp->next(NULL); } @@ -495,7 +492,7 @@ Parameters: aNdbScanRec: The NdbScanReceiver object. Remark: Add a NdbScanReceiver object into the Scan idlelist. ***************************************************************************/ void -Ndb::releaseNdbScanRec(NdbScanReceiver* aNdbScanRec) +Ndb::releaseNdbScanRec(NdbReceiver* aNdbScanRec) { aNdbScanRec->next(theScanList); theScanList = aNdbScanRec; @@ -544,12 +541,12 @@ Parameters: aScanOperation : The released NdbScanOperation object. Remark: Add a NdbScanOperation object into the signal idlelist. ***************************************************************************/ void -Ndb::releaseScanOperation(NdbScanOperation* aScanOperation) +Ndb::releaseScanOperation(NdbIndexScanOperation* aScanOperation) { aScanOperation->next(theScanOpIdleList); aScanOperation->theNdbCon = NULL; aScanOperation->theMagicNumber = 0xFE11D2; - theScanOpIdleList = (NdbScanOperation*)aScanOperation; + theScanOpIdleList = aScanOperation; } /*************************************************************************** @@ -623,7 +620,7 @@ void Ndb::freeScanOperation() { NdbScanOperation* tOp = theScanOpIdleList; - theScanOpIdleList = (NdbScanOperation *) theScanOpIdleList->next(); + theScanOpIdleList = (NdbIndexScanOperation *) theScanOpIdleList->next(); delete tOp; } @@ -674,7 +671,7 @@ Remark: Always release the first item in the free list void Ndb::freeNdbScanRec() { - NdbScanReceiver* tNdbScanRec = theScanList; + NdbReceiver* tNdbScanRec = theScanList; theScanList = theScanList->next(); delete tNdbScanRec; } diff --git a/ndb/src/ndbapi/ObjectMap.hpp b/ndb/src/ndbapi/ObjectMap.hpp index 4abb54b5081..f67774bb413 100644 --- a/ndb/src/ndbapi/ObjectMap.hpp +++ b/ndb/src/ndbapi/ObjectMap.hpp @@ -93,26 +93,28 @@ inline void * NdbObjectIdMap::unmap(Uint32 id, void *object){ - int i = id>>2; + Uint32 i = id>>2; // lock(); - - void * obj = m_map[i].m_obj; - if (object == obj) { - m_map[i].m_next = m_firstFree; - m_firstFree = i; - } else { - ndbout_c("Error: NdbObjectIdMap::::unmap(%u, 0x%x) obj=0x%x", id, object, obj); - return 0; - } - - // unlock(); - + if(i < m_size){ + void * obj = m_map[i].m_obj; + if (object == obj) { + m_map[i].m_next = m_firstFree; + m_firstFree = i; + } else { + ndbout_c("Error: NdbObjectIdMap::::unmap(%u, 0x%x) obj=0x%x", id, object, obj); + return 0; + } + + // unlock(); + #ifdef DEBUG_OBJECTMAP - ndbout_c("NdbObjectIdMap::unmap(%u) obj=0x%x", id, obj); + ndbout_c("NdbObjectIdMap::unmap(%u) obj=0x%x", id, obj); #endif - - return obj; + + return obj; + } + return 0; } inline void * @@ -120,7 +122,11 @@ NdbObjectIdMap::getObject(Uint32 id){ #ifdef DEBUG_OBJECTMAP ndbout_c("NdbObjectIdMap::getObject(%u) obj=0x%x", id, m_map[id>>2].m_obj); #endif - return m_map[id>>2].m_obj; + id >>= 2; + if(id < m_size){ + return m_map[id].m_obj; + } + return 0; } inline void @@ -129,7 +135,6 @@ NdbObjectIdMap::expand(Uint32 incSize){ MapEntry * tmp = (MapEntry*)malloc(newSize * sizeof(MapEntry)); memcpy(tmp, m_map, m_size * sizeof(MapEntry)); - free(m_map); m_map = tmp; for(Uint32 i = m_size; i<newSize; i++){ diff --git a/ndb/src/ndbapi/ScanOperation.txt b/ndb/src/ndbapi/ScanOperation.txt index 7197cf66f7e..27e4e8c1755 100644 --- a/ndb/src/ndbapi/ScanOperation.txt +++ b/ndb/src/ndbapi/ScanOperation.txt @@ -8,3 +8,49 @@ theNdbCon -> z z) NdbConnection (scan) theScanningOp -> y theFirstOpInList -> y (until after openScan) + +# SU + +ScanOpLen: includes KeyInfo +New protocol + +# -- Impl. + +1) Scan uses one NdbReceiver per "parallelism" +2) Each NdbReceiver can handle up to "batch size" rows +3) API send one "pointer" per parallelism (prev. was one per row) +4) API handles each receiver independently. + It can "nextResult"-one, receive one and close-one +5) When a recevier has been "nextResult"-ed, the API can fetch from it again +6) After doing "openScan"-req, no wait is performed + (only possible to block on nextResult(true) or closeScan) + +7) Instead of "ack"-ing each row with length, +* Each row is sent in one lonw signal (unless to short) +* Each NdbReceiver is ack-ed with #rows and sum(#length) +* KeyInfo20 is one signal and included in sum(#length) + +8) The API receive(s) the data into NdbRecAttr-objects + (prev. it copied signals using new/delete) +9) KeyInfo20 is also received into a NdbRecAttr-object +10) + +# -- Close of scan + +1) Each NdbReciver gets a signal when it's complete + (0 rows is ack-ed) +2) The API then "closes" this receiver +3) The API can at any time close then scan for other reason(s) + (example dying) +4) This is signal:ed via a NEXT_SCANREQ (close = 1) +5) TC responds with a SCAN_TABCONF (close = 1) + + +# -- Sorted + +1) The sorted scan is transparent to TC + It's a API only impl. +2) The API makes the following adjustements: +* Scan all fragments simultaniously (max parallelism) +* Never return a row to the API if a NdbReciver is "outstanding" +* Sort Receivers (only top row as they already are sorted within) diff --git a/ndb/src/ndbapi/TransporterFacade.cpp b/ndb/src/ndbapi/TransporterFacade.cpp index 7806e482f1f..e725144a8f8 100644 --- a/ndb/src/ndbapi/TransporterFacade.cpp +++ b/ndb/src/ndbapi/TransporterFacade.cpp @@ -28,6 +28,8 @@ #include "API.hpp" #include <ConfigRetriever.hpp> +#include <mgmapi_config_parameters.h> +#include <mgmapi_configuration.hpp> #include <NdbConfig.h> #include <ndb_version.h> #include <SignalLoggerManager.hpp> @@ -331,39 +333,40 @@ atexit_stop_instance(){ * Which is protected by a mutex */ TransporterFacade* -TransporterFacade::start_instance(Properties* props, const char *connectString) -{ - bool ownProps = false; - if (props == NULL) { - // TransporterFacade used from API get config from mgmt srvr - ConfigRetriever configRetriever; - configRetriever.setConnectString(connectString); - props = configRetriever.getConfig("API", NDB_VERSION); - if (props == 0) { - ndbout << "Configuration error: "; - const char* erString = configRetriever.getErrorString(); - if (erString == 0) { - erString = "No error specified!"; - } - ndbout << erString << endl; - return NULL; +TransporterFacade::start_instance(const char * connectString){ + + // TransporterFacade used from API get config from mgmt srvr + ConfigRetriever configRetriever; + configRetriever.setConnectString(connectString); + ndb_mgm_configuration * props = configRetriever.getConfig(NDB_VERSION, + NODE_TYPE_API); + if (props == 0) { + ndbout << "Configuration error: "; + const char* erString = configRetriever.getErrorString(); + if (erString == 0) { + erString = "No error specified!"; } - props->put("LocalNodeId", configRetriever.getOwnNodeId()); - props->put("LocalNodeType", "API"); - - ownProps = true; + ndbout << erString << endl; + return 0; } - TransporterFacade* tf = new TransporterFacade(); + const int nodeId = configRetriever.getOwnNodeId(); - if (! tf->init(props)) { + TransporterFacade * tf = start_instance(nodeId, props); + + free(props); + return tf; +} + +TransporterFacade* +TransporterFacade::start_instance(int nodeId, + const ndb_mgm_configuration* props) +{ + TransporterFacade* tf = new TransporterFacade(); + if (! tf->init(nodeId, props)) { delete tf; return NULL; } - if (ownProps) { - delete props; - } - /** * Install atexit handler */ @@ -498,61 +501,65 @@ TransporterFacade::TransporterFacade() : } bool -TransporterFacade::init(Properties* props) +TransporterFacade::init(Uint32 nodeId, const ndb_mgm_configuration* props) { - IPCConfig config(props); - - if (config.init() != 0) { - TRP_DEBUG( "IPCConfig object config failed to init()" ); - return false; - } - theOwnId = config.ownId(); - + theOwnId = nodeId; theTransporterRegistry = new TransporterRegistry(this); - if(config.configureTransporters(theTransporterRegistry) <= 0) { + + const int res = IPCConfig::configureTransporters(nodeId, + * props, + * theTransporterRegistry); + if(res <= 0){ TRP_DEBUG( "configureTransporters returned 0 or less" ); return false; } + ndb_mgm_configuration_iterator iter(* props, CFG_SECTION_NODE); + iter.first(); theClusterMgr = new ClusterMgr(* this); - theClusterMgr->init(config); - - theReceiveThread = NdbThread_Create(runReceiveResponse_C, - (void**)this, - 32768, - "ndb_receive", - NDB_THREAD_PRIO_LOW); - - theSendThread = NdbThread_Create(runSendRequest_C, - (void**)this, - 32768, - "ndb_send", - NDB_THREAD_PRIO_LOW); - - theClusterMgr->startThread(); + theClusterMgr->init(iter); /** * Unless there is a "Name", the initiated transporter is within * an NDB Cluster. (If "Name" is defined, then the transporter * is used to connect to a different system, i.e. NDB Cluster.) */ +#if 0 if (!props->contains("Name")) { - const Properties* p = 0; - if(!props->get("Node", ownId(), &p)) { +#endif + iter.first(); + if(iter.find(CFG_NODE_ID, nodeId)){ TRP_DEBUG( "Node info missing from config." ); return false; } Uint32 rank = 0; - if (p->get("ArbitrationRank", &rank) && rank > 0) { + if(!iter.get(CFG_NODE_ARBIT_RANK, &rank) && rank>0){ theArbitMgr = new ArbitMgr(* this); theArbitMgr->setRank(rank); Uint32 delay = 0; - p->get("ArbitrationDelay", &delay); + iter.get(CFG_NODE_ARBIT_DELAY, &delay); theArbitMgr->setDelay(delay); } + +#if 0 } +#endif + + theReceiveThread = NdbThread_Create(runReceiveResponse_C, + (void**)this, + 32768, + "ndb_receive", + NDB_THREAD_PRIO_LOW); + + theSendThread = NdbThread_Create(runSendRequest_C, + (void**)this, + 32768, + "ndb_send", + NDB_THREAD_PRIO_LOW); + theClusterMgr->startThread(); + #ifdef API_TRACE signalLogger.logOn(true, 0, SignalLoggerManager::LogInOut); #endif diff --git a/ndb/src/ndbapi/TransporterFacade.hpp b/ndb/src/ndbapi/TransporterFacade.hpp index 1fe72debe1c..4b76cbe864a 100644 --- a/ndb/src/ndbapi/TransporterFacade.hpp +++ b/ndb/src/ndbapi/TransporterFacade.hpp @@ -27,8 +27,8 @@ class ClusterMgr; class ArbitMgr; -class Properties; class IPCConfig; +struct ndb_mgm_configuration; class Ndb; class NdbApiSignal; @@ -51,10 +51,11 @@ class TransporterFacade public: TransporterFacade(); virtual ~TransporterFacade(); - bool init(Properties* props); + bool init(Uint32, const ndb_mgm_configuration *); static TransporterFacade* instance(); - static TransporterFacade* start_instance(Properties* ipcConfig, const char *connectString); + static TransporterFacade* start_instance(int, const ndb_mgm_configuration*); + static TransporterFacade* start_instance(const char *connectString); static void stop_instance(); /** @@ -79,7 +80,7 @@ public: // Is node available for running transactions bool get_node_alive(NodeId nodeId) const; bool get_node_stopping(NodeId nodeId) const; - bool getIsNodeDefined(NodeId nodeId) const; + bool getIsDbNode(NodeId nodeId) const; bool getIsNodeSendable(NodeId nodeId) const; Uint32 getNodeGrp(NodeId nodeId) const; Uint32 getNodeSequence(NodeId nodeId) const; @@ -255,8 +256,10 @@ TransporterFacade::check_send_size(Uint32 node_id, Uint32 send_size) inline bool -TransporterFacade::getIsNodeDefined(NodeId n) const { - return theClusterMgr->getNodeInfo(n).defined; +TransporterFacade::getIsDbNode(NodeId n) const { + return + theClusterMgr->getNodeInfo(n).defined && + theClusterMgr->getNodeInfo(n).m_info.m_type == NodeInfo::DB; } inline diff --git a/ndb/src/ndbapi/signal-sender/SignalSender.cpp b/ndb/src/ndbapi/signal-sender/SignalSender.cpp index e642848dcee..680d0c23b4a 100644 --- a/ndb/src/ndbapi/signal-sender/SignalSender.cpp +++ b/ndb/src/ndbapi/signal-sender/SignalSender.cpp @@ -71,7 +71,7 @@ SimpleSignal::print(FILE * out){ SignalSender::SignalSender(const char * connectString){ m_cond = NdbCondition_Create(); - theFacade = TransporterFacade::start_instance(0,connectString); + theFacade = TransporterFacade::start_instance(connectString); m_blockNo = theFacade->open(this, execSignal, execNodeStatus); assert(m_blockNo > 0); } |