diff options
Diffstat (limited to 'ndb/src/ndbapi/NdbConnection.cpp')
-rw-r--r-- | ndb/src/ndbapi/NdbConnection.cpp | 150 |
1 files changed, 131 insertions, 19 deletions
diff --git a/ndb/src/ndbapi/NdbConnection.cpp b/ndb/src/ndbapi/NdbConnection.cpp index 4ec098c3c60..ad415b8acbf 100644 --- a/ndb/src/ndbapi/NdbConnection.cpp +++ b/ndb/src/ndbapi/NdbConnection.cpp @@ -35,6 +35,7 @@ Adjust: 971022 UABMNST First version. #include "NdbApiSignal.hpp" #include "TransporterFacade.hpp" #include "API.hpp" +#include "NdbBlob.hpp" #include <ndb_limits.h> #include <signaldata/TcKeyConf.hpp> @@ -89,7 +90,8 @@ NdbConnection::NdbConnection( Ndb* aNdb ) : theCurrentScanRec(NULL), thePreviousScanRec(NULL), theScanningOp(NULL), - theBuddyConPtr(0xFFFFFFFF) + theBuddyConPtr(0xFFFFFFFF), + theBlobFlag(false) { theListState = NotInList; theError.code = 0; @@ -152,6 +154,8 @@ NdbConnection::init() m_theLastCursorOperation = NULL; m_firstExecutedCursorOp = 0; theBuddyConPtr = 0xFFFFFFFF; + // + theBlobFlag = false; }//NdbConnection::init() /***************************************************************************** @@ -251,6 +255,86 @@ NdbConnection::execute(ExecType aTypeOfExec, AbortOption abortOption, int forceSend) { + if (! theBlobFlag) + return executeNoBlobs(aTypeOfExec, abortOption, forceSend); + + // execute prepared ops in batches, as requested by blobs + + ExecType tExecType; + NdbOperation* tPrepOp; + + do { + tExecType = aTypeOfExec; + tPrepOp = theFirstOpInList; + while (tPrepOp != NULL) { + bool batch = false; + NdbBlob* tBlob = tPrepOp->theBlobList; + while (tBlob != NULL) { + if (tBlob->preExecute(tExecType, batch) == -1) + return -1; + tBlob = tBlob->theNext; + } + if (batch) { + // blob asked to execute all up to here now + tExecType = NoCommit; + break; + } + tPrepOp = tPrepOp->next(); + } + // save rest of prepared ops if batch + NdbOperation* tRestOp; + NdbOperation* tLastOp; + if (tPrepOp != NULL) { + tRestOp = tPrepOp->next(); + tPrepOp->next(NULL); + tLastOp = theLastOpInList; + theLastOpInList = tPrepOp; + } + if (tExecType == Commit) { + NdbOperation* tOp = theCompletedFirstOp; + while (tOp != NULL) { + NdbBlob* tBlob = tOp->theBlobList; + while (tBlob != NULL) { + if (tBlob->preCommit() == -1) + return -1; + tBlob = tBlob->theNext; + } + tOp = tOp->next(); + } + } + if (executeNoBlobs(tExecType, abortOption, forceSend) == -1) + return -1; + { + NdbOperation* tOp = theCompletedFirstOp; + while (tOp != NULL) { + NdbBlob* tBlob = tOp->theBlobList; + while (tBlob != NULL) { + // may add new operations if batch + if (tBlob->postExecute(tExecType) == -1) + return -1; + tBlob = tBlob->theNext; + } + tOp = tOp->next(); + } + } + // add saved prepared ops if batch + if (tPrepOp != NULL && tRestOp != NULL) { + if (theFirstOpInList == NULL) + theFirstOpInList = tRestOp; + else + theLastOpInList->next(tRestOp); + theLastOpInList = tLastOp; + } + } while (theFirstOpInList != NULL || tExecType != aTypeOfExec); + + return 0; +} + +int +NdbConnection::executeNoBlobs(ExecType aTypeOfExec, + AbortOption abortOption, + int forceSend) +{ //------------------------------------------------------------------------ // We will start by preparing all operations in the transaction defined // since last execute or since beginning. If this works ok we will continue @@ -330,7 +414,6 @@ NdbConnection::executeAsynchPrepare( ExecType aTypeOfExec, * Reset error.code on execute */ theError.code = 0; - NdbCursorOperation* tcOp = m_theFirstCursorOperation; if (tcOp != 0){ // Execute any cursor operations @@ -885,7 +968,7 @@ Remark: Get an operation from NdbOperation object idlelist and object, synchronous. *****************************************************************************/ NdbOperation* -NdbConnection::getNdbOperation(NdbTableImpl * tab) +NdbConnection::getNdbOperation(NdbTableImpl * tab, NdbOperation* aNextOp) { NdbOperation* tOp; @@ -897,14 +980,28 @@ NdbConnection::getNdbOperation(NdbTableImpl * tab) tOp = theNdb->getOperation(); if (tOp == NULL) goto getNdbOp_error1; - if (theLastOpInList != NULL) { - theLastOpInList->next(tOp); - theLastOpInList = tOp; + if (aNextOp == NULL) { + if (theLastOpInList != NULL) { + theLastOpInList->next(tOp); + theLastOpInList = tOp; + } else { + theLastOpInList = tOp; + theFirstOpInList = tOp; + }//if + tOp->next(NULL); } else { - theLastOpInList = tOp; - theFirstOpInList = tOp; - }//if - tOp->next(NULL); + // add before the given op + if (theFirstOpInList == aNextOp) { + theFirstOpInList = tOp; + } else { + NdbOperation* aLoopOp = theFirstOpInList; + while (aLoopOp != NULL && aLoopOp->next() != aNextOp) + aLoopOp = aLoopOp->next(); + assert(aLoopOp != NULL); + aLoopOp->next(tOp); + } + tOp->next(aNextOp); + } if (tOp->init(tab, this) != -1) { return tOp; } else { @@ -1068,21 +1165,36 @@ Remark: Get an operation from NdbIndexOperation object idlelist and get *****************************************************************************/ NdbIndexOperation* NdbConnection::getNdbIndexOperation(NdbIndexImpl * anIndex, - NdbTableImpl * aTable) + NdbTableImpl * aTable, + NdbOperation* aNextOp) { NdbIndexOperation* tOp; tOp = theNdb->getIndexOperation(); if (tOp == NULL) goto getNdbOp_error1; - if (theLastOpInList != NULL) { - theLastOpInList->next(tOp); - theLastOpInList = tOp; + if (aNextOp == NULL) { + if (theLastOpInList != NULL) { + theLastOpInList->next(tOp); + theLastOpInList = tOp; + } else { + theLastOpInList = tOp; + theFirstOpInList = tOp; + }//if + tOp->next(NULL); } else { - theLastOpInList = tOp; - theFirstOpInList = tOp; - }//if - tOp->next(NULL); + // add before the given op + if (theFirstOpInList == aNextOp) { + theFirstOpInList = tOp; + } else { + NdbOperation* aLoopOp = theFirstOpInList; + while (aLoopOp != NULL && aLoopOp->next() != aNextOp) + aLoopOp = aLoopOp->next(); + assert(aLoopOp != NULL); + aLoopOp->next(tOp); + } + tOp->next(aNextOp); + } if (tOp->indxInit(anIndex, aTable, this)!= -1) { return tOp; } else { @@ -1706,7 +1818,7 @@ NdbConnection::getTransactionId() return theTransactionId; }//NdbConnection::getTransactionId() -CommitStatusType +NdbConnection::CommitStatusType NdbConnection::commitStatus() { return theCommitStatus; |