summaryrefslogtreecommitdiff
path: root/ndb/src/ndbapi
diff options
context:
space:
mode:
Diffstat (limited to 'ndb/src/ndbapi')
-rw-r--r--ndb/src/ndbapi/ClusterMgr.cpp123
-rw-r--r--ndb/src/ndbapi/ClusterMgr.hpp4
-rw-r--r--ndb/src/ndbapi/Makefile_old19
-rw-r--r--ndb/src/ndbapi/Ndb.cpp48
-rw-r--r--ndb/src/ndbapi/NdbApiSignal.cpp3
-rw-r--r--ndb/src/ndbapi/NdbConnection.cpp492
-rw-r--r--ndb/src/ndbapi/NdbConnectionScan.cpp530
-rw-r--r--ndb/src/ndbapi/NdbCursorOperation.cpp6
-rw-r--r--ndb/src/ndbapi/NdbDictionaryImpl.cpp9
-rw-r--r--ndb/src/ndbapi/NdbDictionaryImpl.hpp4
-rw-r--r--ndb/src/ndbapi/NdbEventOperationImpl.cpp26
-rw-r--r--ndb/src/ndbapi/NdbImpl.hpp11
-rw-r--r--ndb/src/ndbapi/NdbIndexOperation.cpp6
-rw-r--r--ndb/src/ndbapi/NdbOperation.cpp59
-rw-r--r--ndb/src/ndbapi/NdbOperationDefine.cpp121
-rw-r--r--ndb/src/ndbapi/NdbOperationExec.cpp379
-rw-r--r--ndb/src/ndbapi/NdbOperationInt.cpp28
-rw-r--r--ndb/src/ndbapi/NdbOperationScan.cpp560
-rw-r--r--ndb/src/ndbapi/NdbRecAttr.cpp18
-rw-r--r--ndb/src/ndbapi/NdbReceiver.cpp172
-rw-r--r--ndb/src/ndbapi/NdbResultSet.cpp33
-rw-r--r--ndb/src/ndbapi/NdbScanOperation.cpp1561
-rw-r--r--ndb/src/ndbapi/Ndbif.cpp352
-rw-r--r--ndb/src/ndbapi/Ndbinit.cpp2
-rw-r--r--ndb/src/ndbapi/Ndblist.cpp39
-rw-r--r--ndb/src/ndbapi/ObjectMap.hpp41
-rw-r--r--ndb/src/ndbapi/ScanOperation.txt46
-rw-r--r--ndb/src/ndbapi/TransporterFacade.cpp117
-rw-r--r--ndb/src/ndbapi/TransporterFacade.hpp15
-rw-r--r--ndb/src/ndbapi/signal-sender/SignalSender.cpp2
30 files changed, 2041 insertions, 2785 deletions
diff --git a/ndb/src/ndbapi/ClusterMgr.cpp b/ndb/src/ndbapi/ClusterMgr.cpp
index 57967e5534f..b26d550fe31 100644
--- a/ndb/src/ndbapi/ClusterMgr.cpp
+++ b/ndb/src/ndbapi/ClusterMgr.cpp
@@ -32,6 +32,10 @@
#include <signaldata/NFCompleteRep.hpp>
#include <signaldata/ApiRegSignalData.hpp>
+#include <mgmapi.h>
+#include <mgmapi_configuration.hpp>
+#include <mgmapi_config_parameters.h>
+
// Just a C wrapper for threadMain
extern "C"
void*
@@ -69,32 +73,49 @@ ClusterMgr::~ClusterMgr(){
}
void
-ClusterMgr::init(const IPCConfig & config){
- NodeId tmp = 0;
- while(config.getNextRemoteNodeId(tmp)) {
+ClusterMgr::init(ndb_mgm_configuration_iterator & iter){
+ for(iter.first(); iter.valid(); iter.next()){
+ Uint32 tmp = 0;
+ if(iter.get(CFG_NODE_ID, &tmp))
+ continue;
+
theNodes[tmp].defined = true;
#if 0
ndbout << "--------------------------------------" << endl;
- config.print();
ndbout << "--------------------------------------" << endl;
ndbout_c("ClusterMgr: Node %d defined as %s", tmp, config.getNodeType(tmp));
#endif
- if(strcmp(config.getNodeType(tmp), "DB") == 0) {
+
+ unsigned type;
+ if(iter.get(CFG_TYPE_OF_SECTION, &type))
+ continue;
+
+ switch(type){
+ case NODE_TYPE_DB:
theNodes[tmp].m_info.m_type = NodeInfo::DB;
- } else if(strcmp(config.getNodeType(tmp), "API") == 0) {
+ break;
+ case NODE_TYPE_API:
theNodes[tmp].m_info.m_type = NodeInfo::API;
- } else if(strcmp(config.getNodeType(tmp), "MGM") == 0) {
+ break;
+ case NODE_TYPE_MGM:
theNodes[tmp].m_info.m_type = NodeInfo::MGM;
- } else if(strcmp(config.getNodeType(tmp), "REP") == 0) {
+ break;
+ case NODE_TYPE_REP:
theNodes[tmp].m_info.m_type = NodeInfo::REP;
- } else if(strcmp(config.getNodeType(tmp), "EXTERNAL REP") == 0) {
+ break;
+ case NODE_TYPE_EXT_REP:
theNodes[tmp].m_info.m_type = NodeInfo::REP;
- theNodes[tmp].hbFrequency = config.getREPHBFrequency(tmp);
- assert(100 <= theNodes[tmp].hbFrequency &&
- theNodes[tmp].hbFrequency < 60 * 60 * 1000);
- } else {
+ {
+ Uint32 hbFreq = 10000;
+ //ndb_mgm_get_int_parameter(iter, CFG_, &hbFreq);
+ theNodes[tmp].hbFrequency = hbFreq;
+ assert(100 <= hbFreq && hbFreq < 60 * 60 * 1000);
+ }
+ break;
+ default:
+ type = type;
#if 0
- ndbout_c("ClusterMgr: Unknown node type: %s", config.getNodeType(tmp));
+ ndbout_c("ClusterMgr: Unknown node type: %d", type);
#endif
}
}
@@ -162,45 +183,43 @@ ClusterMgr::threadMain( ){
const NodeId nodeId = i;
Node & theNode = theNodes[nodeId];
- if (theNode.defined == true) {
-#if 0
- ndbout_c("ClusterMgr: compatible %d", (int)nodeId);
-#endif
+ if (!theNode.defined)
+ continue;
- if (theNode.connected == false){
- theFacade.doConnect(nodeId);
- continue;
+ if (theNode.connected == false){
+ theFacade.doConnect(nodeId);
+ continue;
+ }
+
+ if (!theNode.compatible){
+ continue;
+ }
+
+ theNode.hbCounter += timeSlept;
+ if (theNode.hbCounter >= theNode.hbFrequency){
+ /**
+ * It is now time to send a new Heartbeat
+ */
+ theNode.hbSent++;
+ theNode.hbCounter = 0;
+ /**
+ * If the node is of type REP,
+ * then the receiver of the signal should be API_CLUSTERMGR
+ */
+ if (theNode.m_info.m_type == NodeInfo::REP) {
+ signal.theReceiversBlockNumber = API_CLUSTERMGR;
}
-
-#if 0
- ndbout_c("ClusterMgr: connected %d", (int)nodeId);
+#if 0
+ ndbout_c("ClusterMgr: Sending API_REGREQ to node %d", (int)nodeId);
#endif
-
- theNode.hbCounter += timeSlept;
- if (theNode.hbCounter >= theNode.hbFrequency){
- /**
- * It is now time to send a new Heartbeat
- */
- theNode.hbSent++;
- theNode.hbCounter = 0;
- /**
- * If the node is of type REP,
- * then the receiver of the signal should be API_CLUSTERMGR
- */
- if (theNode.m_info.m_type == NodeInfo::REP) {
- signal.theReceiversBlockNumber = API_CLUSTERMGR;
- }
-#if 0
- ndbout_c("ClusterMgr: Sending API_REGREQ to node %d", (int)nodeId);
-#endif
- theFacade.sendSignalUnCond(&signal, nodeId);
- }//if
-
- if (theNode.hbSent == 4 && theNode.hbFrequency > 0){
- reportNodeFailed(i);
- }//if
- }//if(defined)
- }//for
+ theFacade.sendSignalUnCond(&signal, nodeId);
+ }//if
+
+ if (theNode.hbSent == 4 && theNode.hbFrequency > 0){
+ reportNodeFailed(i);
+ }//if
+ }
+
/**
* End of secure area. Let other threads in
*/
@@ -281,6 +300,10 @@ ClusterMgr::execAPI_REGCONF(const Uint32 * theData){
const ApiRegConf * const apiRegConf = (ApiRegConf *)&theData[0];
const NodeId nodeId = refToNode(apiRegConf->qmgrRef);
+#if 0
+ ndbout_c("ClusterMgr: Recd API_REGCONF from node %d", nodeId);
+#endif
+
assert(nodeId > 0 && nodeId < MAX_NODES);
Node & node = theNodes[nodeId];
diff --git a/ndb/src/ndbapi/ClusterMgr.hpp b/ndb/src/ndbapi/ClusterMgr.hpp
index 7b7b947742b..cc3cf66c8aa 100644
--- a/ndb/src/ndbapi/ClusterMgr.hpp
+++ b/ndb/src/ndbapi/ClusterMgr.hpp
@@ -40,7 +40,7 @@ class ClusterMgr {
public:
ClusterMgr(class TransporterFacade &);
~ClusterMgr();
- void init(const IPCConfig & config);
+ void init(struct ndb_mgm_configuration_iterator & config);
void reportConnected(NodeId nodeId);
void reportDisconnected(NodeId nodeId);
@@ -114,7 +114,7 @@ ClusterMgr::getNoOfConnectedNodes() const {
return noOfConnectedNodes;
}
-/******************************************************************************/
+/*****************************************************************************/
/**
* @class ArbitMgr
diff --git a/ndb/src/ndbapi/Makefile_old b/ndb/src/ndbapi/Makefile_old
index f4c82e5d6ba..648c8cbb016 100644
--- a/ndb/src/ndbapi/Makefile_old
+++ b/ndb/src/ndbapi/Makefile_old
@@ -15,13 +15,16 @@ LIB_TARGET_ARCHIVES := $(ARCHIVE_TARGET) \
transporter \
general \
signaldataprint \
- mgmsrvcommon \
+ mgmapi mgmsrvcommon \
portlib \
logger \
trace
DIRS := signal-sender
+CFLAGS_TransporterFacade.cpp := -I$(call fixpath,$(NDB_TOP)/src/mgmapi)
+CFLAGS_ClusterMgr.cpp := -I$(call fixpath,$(NDB_TOP)/src/mgmapi)
+
# Source files of non-templated classes (.cpp files)
SOURCES = \
TransporterFacade.cpp \
@@ -31,31 +34,25 @@ SOURCES = \
Ndblist.cpp \
Ndbif.cpp \
Ndbinit.cpp \
- Ndberr.cpp \
- ndberror.c \
- NdbErrorOut.cpp \
- NdbConnection.cpp \
+ ndberror.c Ndberr.cpp NdbErrorOut.cpp \
+ NdbConnection.cpp \
NdbConnectionScan.cpp \
NdbOperation.cpp \
NdbOperationSearch.cpp \
- NdbOperationScan.cpp \
NdbOperationInt.cpp \
NdbOperationDefine.cpp \
NdbOperationExec.cpp \
- NdbScanReceiver.cpp \
NdbResultSet.cpp \
- NdbCursorOperation.cpp \
NdbScanOperation.cpp NdbScanFilter.cpp \
NdbIndexOperation.cpp \
NdbEventOperation.cpp \
NdbEventOperationImpl.cpp \
NdbApiSignal.cpp \
NdbRecAttr.cpp \
- NdbSchemaCon.cpp \
- NdbSchemaOp.cpp \
NdbUtil.cpp \
NdbReceiver.cpp \
- NdbDictionary.cpp NdbDictionaryImpl.cpp DictCache.cpp
+ NdbDictionary.cpp NdbDictionaryImpl.cpp DictCache.cpp \
+ NdbSchemaCon.cpp NdbSchemaOp.cpp
include $(NDB_TOP)/Epilogue.mk
diff --git a/ndb/src/ndbapi/Ndb.cpp b/ndb/src/ndbapi/Ndb.cpp
index fe21b4e02a4..e7edf19c0a3 100644
--- a/ndb/src/ndbapi/Ndb.cpp
+++ b/ndb/src/ndbapi/Ndb.cpp
@@ -156,26 +156,22 @@ Ndb::NDB_connect(Uint32 tNode)
tNdbCon->Status(NdbConnection::Connecting); // Set status to connecting
Uint32 nodeSequence;
{ // send and receive signal
- tp->lock_mutex();
+ Guard guard(tp->theMutexPtr);
nodeSequence = tp->getNodeSequence(tNode);
bool node_is_alive = tp->get_node_alive(tNode);
if (node_is_alive) {
tReturnCode = tp->sendSignal(tSignal, tNode);
releaseSignal(tSignal);
- if (tReturnCode == -1) {
- tp->unlock_mutex();
- } else {
+ if (tReturnCode != -1) {
theWaiter.m_node = tNode;
theWaiter.m_state = WAIT_TC_SEIZE;
tReturnCode = receiveResponse();
}//if
} else {
releaseSignal(tSignal);
- tp->unlock_mutex();
tReturnCode = -1;
}//if
}
-
if ((tReturnCode == 0) && (tNdbCon->Status() == NdbConnection::Connected)) {
//************************************************
// Send and receive was successful
@@ -465,41 +461,43 @@ Ndb::closeTransaction(NdbConnection* aConnection)
CHECK_STATUS_MACRO_VOID;
tCon = theTransactionList;
-
+
if (aConnection == tCon) { // Remove the active connection object
- theTransactionList = tCon->next(); // from the transaction list.
+ theTransactionList = tCon->next(); // from the transaction list.
} else {
while (aConnection != tCon) {
if (tCon == NULL) {
//-----------------------------------------------------
// closeTransaction called on non-existing transaction
//-----------------------------------------------------
+
+ if(aConnection->theError.code == 4008){
+ /**
+ * When a SCAN timed-out, returning the NdbConnection leads
+ * to reuse. And TC crashes when the API tries to reuse it to
+ * something else...
+ */
#ifdef VM_TRACE
- printf("Non-existing transaction into closeTransaction\n");
+ printf("Scan timeout:ed NdbConnection-> "
+ "not returning it-> memory leak\n");
+#endif
+ return;
+ }
+
+#ifdef VM_TRACE
+ printf("Non-existing transaction into closeTransaction\n");
abort();
#endif
- return;
+ return;
}//if
tPreviousCon = tCon;
tCon = tCon->next();
}//while
tPreviousCon->next(tCon->next());
}//if
-
+
aConnection->release();
-
- if(aConnection->theError.code == 4008){
- /**
- * When a SCAN timed-out, returning the NdbConnection leads
- * to reuse. And TC crashes when the API tries to reuse it to
- * something else...
- */
-#ifdef VM_TRACE
- printf("Scan timeout:ed NdbConnection-> not returning it-> memory leak\n");
-#endif
- return;
- }
-
+
if(aConnection->theError.code == 4008){
/**
* Something timed-out, returning the NdbConnection leads
@@ -511,7 +509,7 @@ Ndb::closeTransaction(NdbConnection* aConnection)
#endif
return;
}
-
+
if (aConnection->theReleaseOnClose == false) {
/**
* Put it back in idle list for that node
diff --git a/ndb/src/ndbapi/NdbApiSignal.cpp b/ndb/src/ndbapi/NdbApiSignal.cpp
index a44937cd398..d173a462020 100644
--- a/ndb/src/ndbapi/NdbApiSignal.cpp
+++ b/ndb/src/ndbapi/NdbApiSignal.cpp
@@ -46,6 +46,7 @@ Adjust: 971114 UABMNST First version.
#include <signaldata/IndxKeyInfo.hpp>
#include <signaldata/IndxAttrInfo.hpp>
#include <signaldata/TcHbRep.hpp>
+#include <signaldata/ScanTab.hpp>
#include <NdbOut.hpp>
@@ -188,7 +189,7 @@ NdbApiSignal::setSignal(int aNdbSignalType)
theTrace = TestOrd::TraceAPI;
theReceiversBlockNumber = DBTC;
theVerId_signalNumber = GSN_SCAN_TABREQ;
- theLength = 25;
+ theLength = 9; // ScanTabReq::SignalLength;
}
break;
diff --git a/ndb/src/ndbapi/NdbConnection.cpp b/ndb/src/ndbapi/NdbConnection.cpp
index fbfd0e99238..8ccd0aa8523 100644
--- a/ndb/src/ndbapi/NdbConnection.cpp
+++ b/ndb/src/ndbapi/NdbConnection.cpp
@@ -27,11 +27,12 @@ Description: Interface between TIS and NDB
Documentation:
Adjust: 971022 UABMNST First version.
*****************************************************************************/
-#include "NdbOut.hpp"
-#include "NdbConnection.hpp"
-#include "NdbOperation.hpp"
-#include "NdbScanOperation.hpp"
-#include "NdbIndexOperation.hpp"
+#include <NdbOut.hpp>
+#include <NdbConnection.hpp>
+#include <NdbOperation.hpp>
+#include <NdbScanOperation.hpp>
+#include <NdbIndexScanOperation.hpp>
+#include <NdbIndexOperation.hpp>
#include "NdbApiSignal.hpp"
#include "TransporterFacade.hpp"
#include "API.hpp"
@@ -79,15 +80,12 @@ NdbConnection::NdbConnection( Ndb* aNdb ) :
theTransactionIsStarted(false),
theDBnode(0),
theReleaseOnClose(false),
- // Cursor operations
+ // Scan operations
m_waitForReply(true),
- m_theFirstCursorOperation(NULL),
- m_theLastCursorOperation(NULL),
- m_firstExecutedCursorOp(NULL),
+ m_theFirstScanOperation(NULL),
+ m_theLastScanOperation(NULL),
+ m_firstExecutedScanOp(NULL),
// Scan operations
- theScanFinished(0),
- theCurrentScanRec(NULL),
- thePreviousScanRec(NULL),
theScanningOp(NULL),
theBuddyConPtr(0xFFFFFFFF)
{
@@ -117,7 +115,6 @@ NdbConnection::init()
theListState = NotInList;
theInUseState = true;
theTransactionIsStarted = false;
- theScanFinished = 0;
theNext = NULL;
theFirstOpInList = NULL;
@@ -128,9 +125,6 @@ NdbConnection::init()
theFirstExecOpInList = NULL;
theLastExecOpInList = NULL;
- theCurrentScanRec = NULL;
- thePreviousScanRec = NULL;
-
theCompletedFirstOp = NULL;
theGlobalCheckpointId = 0;
@@ -146,11 +140,11 @@ NdbConnection::init()
theSimpleState = true;
theSendStatus = InitState;
theMagicNumber = 0x37412619;
- // Cursor operations
+ // Scan operations
m_waitForReply = true;
- m_theFirstCursorOperation = NULL;
- m_theLastCursorOperation = NULL;
- m_firstExecutedCursorOp = 0;
+ m_theFirstScanOperation = NULL;
+ m_theLastScanOperation = NULL;
+ m_firstExecutedScanOp = 0;
theBuddyConPtr = 0xFFFFFFFF;
}//NdbConnection::init()
@@ -331,7 +325,7 @@ NdbConnection::executeAsynchPrepare( ExecType aTypeOfExec,
*/
theError.code = 0;
- NdbCursorOperation* tcOp = m_theFirstCursorOperation;
+ NdbScanOperation* tcOp = m_theFirstScanOperation;
if (tcOp != 0){
// Execute any cursor operations
while (tcOp != NULL) {
@@ -340,14 +334,14 @@ NdbConnection::executeAsynchPrepare( ExecType aTypeOfExec,
if (tReturnCode == -1) {
return;
}//if
- tcOp = (NdbCursorOperation*)tcOp->next();
+ tcOp = (NdbScanOperation*)tcOp->next();
} // while
- m_theLastCursorOperation->next(m_firstExecutedCursorOp);
- m_firstExecutedCursorOp = m_theFirstCursorOperation;
+ m_theLastScanOperation->next(m_firstExecutedScanOp);
+ m_firstExecutedScanOp = m_theFirstScanOperation;
// Discard cursor operations, since these are also
// in the complete operations list we do not need
// to release them.
- m_theFirstCursorOperation = m_theLastCursorOperation = NULL;
+ m_theFirstScanOperation = m_theLastScanOperation = NULL;
}
bool tTransactionIsStarted = theTransactionIsStarted;
@@ -714,17 +708,14 @@ Remark: Release all operations.
******************************************************************************/
void
NdbConnection::release(){
- if (theTransactionIsStarted == true && theScanningOp != NULL )
- stopScan();
-
releaseOperations();
if ( (theTransactionIsStarted == true) &&
- ((theCommitStatus != Committed) &&
- (theCommitStatus != Aborted))) {
-/****************************************************************************
- * The user did not perform any rollback but simply closed the
- * transaction. We must rollback Ndb since Ndb have been contacted.
-******************************************************************************/
+ ((theCommitStatus != Committed) &&
+ (theCommitStatus != Aborted))) {
+ /************************************************************************
+ * The user did not perform any rollback but simply closed the
+ * transaction. We must rollback Ndb since Ndb have been contacted.
+ ************************************************************************/
execute(Rollback);
}//if
theMagicNumber = 0xFE11DC;
@@ -756,8 +747,8 @@ void
NdbConnection::releaseOperations()
{
// Release any open scans
- releaseCursorOperations(m_theFirstCursorOperation);
- releaseCursorOperations(m_firstExecutedCursorOp);
+ releaseScanOperations(m_theFirstScanOperation);
+ releaseScanOperations(m_firstExecutedScanOp);
releaseOps(theCompletedFirstOp);
releaseOps(theFirstOpInList);
@@ -769,9 +760,9 @@ NdbConnection::releaseOperations()
theLastOpInList = NULL;
theLastExecOpInList = NULL;
theScanningOp = NULL;
- m_theFirstCursorOperation = NULL;
- m_theLastCursorOperation = NULL;
- m_firstExecutedCursorOp = NULL;
+ m_theFirstScanOperation = NULL;
+ m_theLastScanOperation = NULL;
+ m_firstExecutedScanOp = NULL;
}//NdbConnection::releaseOperations()
void
@@ -782,24 +773,21 @@ NdbConnection::releaseCompletedOperations()
}//NdbConnection::releaseOperations()
/******************************************************************************
-void releaseCursorOperations();
+void releaseScanOperations();
Remark: Release all cursor operations.
(NdbScanOperation and NdbIndexOperation)
******************************************************************************/
void
-NdbConnection::releaseCursorOperations(NdbCursorOperation* cursorOp)
+NdbConnection::releaseScanOperations(NdbIndexScanOperation* cursorOp)
{
while(cursorOp != 0){
- NdbCursorOperation* next = (NdbCursorOperation*)cursorOp->next();
+ NdbIndexScanOperation* next = (NdbIndexScanOperation*)cursorOp->next();
cursorOp->release();
- if (cursorOp->cursorType() == NdbCursorOperation::ScanCursor)
- theNdb->releaseScanOperation((NdbScanOperation*)cursorOp);
- else
- theNdb->releaseOperation(cursorOp);
+ theNdb->releaseScanOperation(cursorOp);
cursorOp = next;
}
-}//NdbConnection::releaseCursorOperations()
+}//NdbConnection::releaseScanOperations()
/*****************************************************************************
NdbOperation* getNdbOperation(const char* aTableName);
@@ -833,45 +821,6 @@ NdbConnection::getNdbOperation(const char* aTableName)
}//NdbConnection::getNdbOperation()
/*****************************************************************************
-NdbOperation* getNdbOperation(const char* anIndexName, const char* aTableName);
-
-Return Value Return a pointer to a NdbOperation object if getNdbOperation
- was succesful.
- Return NULL : In all other case.
-Parameters: anIndexName : Name of the index to use.
- aTableName : Name of the database table.
-Remark: Get an operation from NdbOperation idlelist and get the
- NdbConnection object
- who was fetch by startTransaction pointing to this operation
- getOperation will set the theTableId in the NdbOperation object.
- synchronous
-******************************************************************************/
-NdbOperation*
-NdbConnection::getNdbOperation(const char* anIndexName, const char* aTableName)
-{
- if ((theError.code == 0) &&
- (theCommitStatus == Started)){
- NdbIndexImpl* index =
- theNdb->theDictionary->getIndex(anIndexName, aTableName);
- NdbTableImpl* table = theNdb->theDictionary->getTable(aTableName);
- NdbTableImpl* indexTable =
- theNdb->theDictionary->getIndexTable(index, table);
- if (indexTable != 0){
- return getNdbOperation(indexTable);
- } else {
- setErrorCode(theNdb->theDictionary->getNdbError().code);
- return NULL;
- }//if
- } else {
- if (theError.code == 0) {
- setOperationErrorCodeAbort(4114);
- }//if
-
- return NULL;
- }//if
-}//NdbConnection::getNdbOperation()
-
-/*****************************************************************************
NdbOperation* getNdbOperation(int aTableId);
Return Value Return a pointer to a NdbOperation object if getNdbOperation
@@ -956,8 +905,9 @@ Remark: Get an operation from NdbScanOperation idlelist and get the NdbC
who was fetch by startTransaction pointing to this operation
getOperation will set the theTableId in the NdbOperation object.synchronous
******************************************************************************/
-NdbScanOperation*
-NdbConnection::getNdbScanOperation(const char* anIndexName, const char* aTableName)
+NdbIndexScanOperation*
+NdbConnection::getNdbIndexScanOperation(const char* anIndexName,
+ const char* aTableName)
{
if (theCommitStatus == Started){
NdbIndexImpl* index =
@@ -966,7 +916,9 @@ NdbConnection::getNdbScanOperation(const char* anIndexName, const char* aTableNa
NdbTableImpl* indexTable =
theNdb->theDictionary->getIndexTable(index, table);
if (indexTable != 0){
- return getNdbScanOperation(indexTable);
+ NdbIndexScanOperation* tOp = getNdbScanOperation(indexTable);
+ if(tOp) tOp->m_cursor_type = NdbScanOperation::IndexCursor;
+ return tOp;
} else {
setOperationErrorCodeAbort(theNdb->theError.code);
return NULL;
@@ -987,21 +939,21 @@ Remark: Get an operation from NdbScanOperation object idlelist and get t
object who was fetch by startTransaction pointing to this operation
getOperation will set the theTableId in the NdbOperation object, synchronous.
*****************************************************************************/
-NdbScanOperation*
+NdbIndexScanOperation*
NdbConnection::getNdbScanOperation(NdbTableImpl * tab)
{
- NdbScanOperation* tOp;
+ NdbIndexScanOperation* tOp;
tOp = theNdb->getScanOperation();
if (tOp == NULL)
goto getNdbOp_error1;
// Link scan operation into list of cursor operations
- if (m_theLastCursorOperation == NULL)
- m_theFirstCursorOperation = m_theLastCursorOperation = tOp;
+ if (m_theLastScanOperation == NULL)
+ m_theFirstScanOperation = m_theLastScanOperation = tOp;
else {
- m_theLastCursorOperation->next(tOp);
- m_theLastCursorOperation = tOp;
+ m_theLastScanOperation->next(tOp);
+ m_theLastScanOperation = tOp;
}
tOp->next(NULL);
if (tOp->init(tab, this) != -1) {
@@ -1211,12 +1163,12 @@ Remark:
int
NdbConnection::receiveTC_COMMITCONF(const TcCommitConf * commitConf)
{
- if(theStatus != Connected){
- return -1;
+ if(checkState_TransId(&commitConf->transId1)){
+ theCommitStatus = Committed;
+ theCompletionStatus = CompletedSuccess;
+ return 0;
}
- theCommitStatus = Committed;
- theCompletionStatus = CompletedSuccess;
- return 0;
+ return -1;
}//NdbConnection::receiveTC_COMMITCONF()
/******************************************************************************
@@ -1230,33 +1182,33 @@ Remark:
int
NdbConnection::receiveTC_COMMITREF(NdbApiSignal* aSignal)
{
- if(theStatus != Connected){
- return -1;
+ const TcCommitRef * ref = CAST_CONSTPTR(TcCommitRef, aSignal->getDataPtr());
+ if(checkState_TransId(&ref->transId1)){
+ setOperationErrorCodeAbort(ref->errorCode);
+ theCommitStatus = Aborted;
+ theCompletionStatus = CompletedFailure;
+ return 0;
}
- const TcCommitRef * const ref = CAST_CONSTPTR(TcCommitRef, aSignal->getDataPtr());
- setOperationErrorCodeAbort(ref->errorCode);
- theCommitStatus = Aborted;
- theCompletionStatus = CompletedFailure;
- return 0;
+ return -1;
}//NdbConnection::receiveTC_COMMITREF()
-/*******************************************************************************
+/******************************************************************************
int receiveTCROLLBACKCONF(NdbApiSignal* aSignal);
Return Value: Return 0 : receiveTCROLLBACKCONF was successful.
Return -1: In all other case.
Parameters: aSignal: The signal object pointer.
Remark:
-*******************************************************************************/
+******************************************************************************/
int
NdbConnection::receiveTCROLLBACKCONF(NdbApiSignal* aSignal)
{
- if(theStatus != Connected){
- return -1;
+ if(checkState_TransId(aSignal->getDataPtr() + 1)){
+ theCommitStatus = Aborted;
+ theCompletionStatus = CompletedSuccess;
+ return 0;
}
- theCommitStatus = Aborted;
- theCompletionStatus = CompletedSuccess;
- return 0;
+ return -1;
}//NdbConnection::receiveTCROLLBACKCONF()
/*******************************************************************************
@@ -1270,13 +1222,13 @@ Remark:
int
NdbConnection::receiveTCROLLBACKREF(NdbApiSignal* aSignal)
{
- if(theStatus != Connected){
- return -1;
+ if(checkState_TransId(aSignal->getDataPtr() + 1)){
+ setOperationErrorCodeAbort(aSignal->readData(4));
+ theCommitStatus = Aborted;
+ theCompletionStatus = CompletedFailure;
+ return 0;
}
- setOperationErrorCodeAbort(aSignal->readData(2));
- theCommitStatus = Aborted;
- theCompletionStatus = CompletedFailure;
- return 0;
+ return -1;
}//NdbConnection::receiveTCROLLBACKREF()
/*****************************************************************************
@@ -1291,36 +1243,26 @@ Remark: Handles the reception of the ROLLBACKREP signal.
int
NdbConnection::receiveTCROLLBACKREP( NdbApiSignal* aSignal)
{
- Uint64 tRecTransId, tCurrTransId;
- Uint32 tTmp1, tTmp2;
-
- if (theStatus != Connected) {
- return -1;
- }//if
-/*****************************************************************************
+ /****************************************************************************
Check that we are expecting signals from this transaction and that it doesn't
belong to a transaction already completed. Simply ignore messages from other
transactions.
-******************************************************************************/
- tTmp1 = aSignal->readData(2);
- tTmp2 = aSignal->readData(3);
- tRecTransId = (Uint64)tTmp1 + ((Uint64)tTmp2 << 32);
- tCurrTransId = this->getTransactionId();
- if (tCurrTransId != tRecTransId) {
- return -1;
- }//if
- theError.code = aSignal->readData(4); // Override any previous errors
-
-/**********************************************************************/
-/* A serious error has occured. This could be due to deadlock or */
-/* lack of resources or simply a programming error in NDB. This */
-/* transaction will be aborted. Actually it has already been */
-/* and we only need to report completion and return with the */
-/* error code to the application. */
-/**********************************************************************/
- theCompletionStatus = CompletedFailure;
- theCommitStatus = Aborted;
- return 0;
+ ****************************************************************************/
+ if(checkState_TransId(aSignal->getDataPtr() + 1)){
+ theError.code = aSignal->readData(4);// Override any previous errors
+
+ /**********************************************************************/
+ /* A serious error has occured. This could be due to deadlock or */
+ /* lack of resources or simply a programming error in NDB. This */
+ /* transaction will be aborted. Actually it has already been */
+ /* and we only need to report completion and return with the */
+ /* error code to the application. */
+ /**********************************************************************/
+ theCompletionStatus = CompletedFailure;
+ theCommitStatus = Aborted;
+ return 0;
+ }
+ return -1;
}//NdbConnection::receiveTCROLLBACKREP()
/*******************************************************************************
@@ -1334,47 +1276,38 @@ Remark:
int
NdbConnection::receiveTCKEYCONF(const TcKeyConf * keyConf, Uint32 aDataLength)
{
- Uint64 tRecTransId;
- NdbOperation* tOp;
- Uint32 tConditionFlag;
-
+ NdbReceiver* tOp;
const Uint32 tTemp = keyConf->confInfo;
- const Uint32 tTmp1 = keyConf->transId1;
- const Uint32 tTmp2 = keyConf->transId2;
-/******************************************************************************
+ /***************************************************************************
Check that we are expecting signals from this transaction and that it
doesn't belong to a transaction already completed. Simply ignore messages
from other transactions.
-******************************************************************************/
- tRecTransId = (Uint64)tTmp1 + ((Uint64)tTmp2 << 32);
+ ***************************************************************************/
+ if(checkState_TransId(&keyConf->transId1)){
- const Uint32 tNoOfOperations = TcKeyConf::getNoOfOperations(tTemp);
- const Uint32 tCommitFlag = TcKeyConf::getCommitFlag(tTemp);
- tConditionFlag = (Uint32)(((aDataLength - 5) >> 1) - tNoOfOperations);
- tConditionFlag |= (Uint32)(tNoOfOperations > 10);
- tConditionFlag |= (Uint32)(tNoOfOperations <= 0);
- tConditionFlag |= (Uint32)(theTransactionId - tRecTransId);
- tConditionFlag |= (Uint32)(theStatus - Connected);
+ const Uint32 tNoOfOperations = TcKeyConf::getNoOfOperations(tTemp);
+ const Uint32 tCommitFlag = TcKeyConf::getCommitFlag(tTemp);
- if (tConditionFlag == 0) {
const Uint32* tPtr = (Uint32 *)&keyConf->operations[0];
+ Uint32 tNoComp = theNoOfOpCompleted;
for (Uint32 i = 0; i < tNoOfOperations ; i++) {
- tOp = theNdb->void2rec_op(theNdb->int2void(*tPtr));
+ tOp = theNdb->void2rec(theNdb->int2void(*tPtr));
tPtr++;
const Uint32 tAttrInfoLen = *tPtr;
tPtr++;
- if (tOp && tOp->checkMagicNumber() != -1) {
- tOp->TCOPCONF(tAttrInfoLen);
+ if (tOp && tOp->checkMagicNumber()) {
+ tNoComp += tOp->execTCOPCONF(tAttrInfoLen);
} else {
return -1;
}//if
}//for
- Uint32 tNoComp = theNoOfOpCompleted;
Uint32 tNoSent = theNoOfOpSent;
+ theNoOfOpCompleted = tNoComp;
Uint32 tGCI = keyConf->gci;
if (tCommitFlag == 1) {
theCommitStatus = Committed;
theGlobalCheckpointId = tGCI;
+ theTransactionId++;
} else if ((tNoComp >= tNoSent) &&
(theLastExecOpInList->theCommitIndicator == 1)){
/**********************************************************************/
@@ -1406,50 +1339,46 @@ Remark: Handles the reception of the TCKEY_FAILCONF signal.
int
NdbConnection::receiveTCKEY_FAILCONF(const TcKeyFailConf * failConf)
{
- Uint64 tRecTransId, tCurrTransId;
- Uint32 tTmp1, tTmp2;
NdbOperation* tOp;
- if (theStatus != Connected) {
- return -1;
- }//if
/*
- Check that we are expecting signals from this transaction and that it
- doesn't belong to a transaction already completed. Simply ignore
- messages from other transactions.
+ Check that we are expecting signals from this transaction and that it
+ doesn't belong to a transaction already completed. Simply ignore
+ messages from other transactions.
*/
- tTmp1 = failConf->transId1;
- tTmp2 = failConf->transId2;
- tRecTransId = (Uint64)tTmp1 + ((Uint64)tTmp2 << 32);
- tCurrTransId = this->getTransactionId();
- if (tCurrTransId != tRecTransId) {
- return -1;
- }//if
- /*
- A node failure of the TC node occured. The transaction has
- been committed.
- */
- theCommitStatus = Committed;
- tOp = theFirstExecOpInList;
- while (tOp != NULL) {
+ if(checkState_TransId(&failConf->transId1)){
/*
- Check if the transaction expected read values...
- If it did some of them might have gotten lost even if we succeeded
- in committing the transaction.
+ A node failure of the TC node occured. The transaction has
+ been committed.
*/
- if (tOp->theAI_ElementLen != 0) {
- theCompletionStatus = CompletedFailure;
- setOperationErrorCodeAbort(4115);
- break;
- }//if
- if (tOp->theCurrentRecAttr != NULL) {
- theCompletionStatus = CompletedFailure;
- setOperationErrorCodeAbort(4115);
- break;
- }//if
- tOp = tOp->next();
- }//while
- theReleaseOnClose = true;
- return 0;
+ theCommitStatus = Committed;
+ tOp = theFirstExecOpInList;
+ while (tOp != NULL) {
+ /*
+ * Check if the transaction expected read values...
+ * If it did some of them might have gotten lost even if we succeeded
+ * in committing the transaction.
+ */
+ switch(tOp->theOperationType){
+ case UpdateRequest:
+ case InsertRequest:
+ case DeleteRequest:
+ case WriteRequest:
+ tOp = tOp->next();
+ break;
+ case ReadRequest:
+ case ReadExclusive:
+ case OpenScanRequest:
+ case OpenRangeScanRequest:
+ theCompletionStatus = CompletedFailure;
+ setOperationErrorCodeAbort(4115);
+ tOp = NULL;
+ break;
+ }//if
+ }//while
+ theReleaseOnClose = true;
+ return 0;
+ }
+ return -1;
}//NdbConnection::receiveTCKEY_FAILCONF()
/*************************************************************************
@@ -1464,101 +1393,75 @@ Remark: Handles the reception of the TCKEY_FAILREF signal.
int
NdbConnection::receiveTCKEY_FAILREF(NdbApiSignal* aSignal)
{
- Uint64 tRecTransId, tCurrTransId;
- Uint32 tTmp1, tTmp2;
-
- if (theStatus != Connected) {
- return -1;
- }//if
/*
- Check that we are expecting signals from this transaction and
- that it doesn't belong to a transaction already
- completed. Simply ignore messages from other transactions.
+ Check that we are expecting signals from this transaction and
+ that it doesn't belong to a transaction already
+ completed. Simply ignore messages from other transactions.
*/
- tTmp1 = aSignal->readData(2);
- tTmp2 = aSignal->readData(3);
- tRecTransId = (Uint64)tTmp1 + ((Uint64)tTmp2 << 32);
- tCurrTransId = this->getTransactionId();
- if (tCurrTransId != tRecTransId) {
- return -1;
- }//if
- /*
- We received an indication of that this transaction was aborted due to a
- node failure.
- */
- if (theSendStatus == sendTC_ROLLBACK) {
+ if(checkState_TransId(aSignal->getDataPtr()+1)){
/*
- We were in the process of sending a rollback anyways. We will
- report it as a success.
+ We received an indication of that this transaction was aborted due to a
+ node failure.
*/
- theCompletionStatus = CompletedSuccess;
- } else {
- theCompletionStatus = CompletedFailure;
- theError.code = 4031;
- }//if
- theReleaseOnClose = true;
- theCommitStatus = Aborted;
- return 0;
+ if (theSendStatus == sendTC_ROLLBACK) {
+ /*
+ We were in the process of sending a rollback anyways. We will
+ report it as a success.
+ */
+ theCompletionStatus = CompletedSuccess;
+ } else {
+ theCompletionStatus = CompletedFailure;
+ theError.code = 4031;
+ }//if
+ theReleaseOnClose = true;
+ theCommitStatus = Aborted;
+ return 0;
+ }
+ return -1;
}//NdbConnection::receiveTCKEY_FAILREF()
-/*******************************************************************************
+/******************************************************************************
int receiveTCINDXCONF(NdbApiSignal* aSignal, Uint32 long_short_ind);
Return Value: Return 0 : receiveTCINDXCONF was successful.
Return -1: In all other case.
Parameters: aSignal: The signal object pointer.
Remark:
-*******************************************************************************/
+******************************************************************************/
int
-NdbConnection::receiveTCINDXCONF(const TcIndxConf * indxConf, Uint32 aDataLength)
+NdbConnection::receiveTCINDXCONF(const TcIndxConf * indxConf,
+ Uint32 aDataLength)
{
- Uint64 tRecTransId;
- Uint32 tConditionFlag;
-
- const Uint32 tTemp = indxConf->confInfo;
- const Uint32 tTmp1 = indxConf->transId1;
- const Uint32 tTmp2 = indxConf->transId2;
-/******************************************************************************
-Check that we are expecting signals from this transaction and that it
-doesn't belong to a transaction already completed. Simply ignore messages
-from other transactions.
-******************************************************************************/
- tRecTransId = (Uint64)tTmp1 + ((Uint64)tTmp2 << 32);
-
- const Uint32 tNoOfOperations = TcIndxConf::getNoOfOperations(tTemp);
- const Uint32 tCommitFlag = TcKeyConf::getCommitFlag(tTemp);
-
- tConditionFlag = (Uint32)(((aDataLength - 5) >> 1) - tNoOfOperations);
- tConditionFlag |= (Uint32)(tNoOfOperations > 10);
- tConditionFlag |= (Uint32)(tNoOfOperations <= 0);
- tConditionFlag |= (Uint32)(theTransactionId - tRecTransId);
- tConditionFlag |= (Uint32)(theStatus - Connected);
-
- if (tConditionFlag == 0) {
+ if(checkState_TransId(&indxConf->transId1)){
+ const Uint32 tTemp = indxConf->confInfo;
+ const Uint32 tNoOfOperations = TcIndxConf::getNoOfOperations(tTemp);
+ const Uint32 tCommitFlag = TcKeyConf::getCommitFlag(tTemp);
+
const Uint32* tPtr = (Uint32 *)&indxConf->operations[0];
+ Uint32 tNoComp = theNoOfOpCompleted;
for (Uint32 i = 0; i < tNoOfOperations ; i++) {
- NdbIndexOperation* tOp = theNdb->void2rec_iop(theNdb->int2void(*tPtr));
+ NdbReceiver* tOp = theNdb->void2rec(theNdb->int2void(*tPtr));
tPtr++;
const Uint32 tAttrInfoLen = *tPtr;
tPtr++;
- if (tOp && tOp->checkMagicNumber() != -1) {
- tOp->TCOPCONF(tAttrInfoLen);
+ if (tOp && tOp->checkMagicNumber()) {
+ tNoComp += tOp->execTCOPCONF(tAttrInfoLen);
} else {
return -1;
}//if
}//for
- Uint32 tNoComp = theNoOfOpCompleted;
Uint32 tNoSent = theNoOfOpSent;
Uint32 tGCI = indxConf->gci;
+ theNoOfOpCompleted = tNoComp;
if (tCommitFlag == 1) {
theCommitStatus = Committed;
theGlobalCheckpointId = tGCI;
} else if ((tNoComp >= tNoSent) &&
(theLastExecOpInList->theCommitIndicator == 1)){
-/**********************************************************************/
-// We sent the transaction with Commit flag set and received a CONF with
-// no Commit flag set. This is clearly an anomaly.
-/**********************************************************************/
+ /**********************************************************************/
+ // We sent the transaction with Commit flag set and received a CONF with
+ // no Commit flag set. This is clearly an anomaly.
+ /**********************************************************************/
theError.code = 4011;
theCompletionStatus = CompletedFailure;
theCommitStatus = Aborted;
@@ -1584,36 +1487,21 @@ Remark: Handles the reception of the TCINDXREF signal.
int
NdbConnection::receiveTCINDXREF( NdbApiSignal* aSignal)
{
- Uint64 tRecTransId, tCurrTransId;
- Uint32 tTmp1, tTmp2;
-
- if (theStatus != Connected) {
- return -1;
- }//if
-/*****************************************************************************
-Check that we are expecting signals from this transaction and that it doesn't
-belong to a transaction already completed. Simply ignore messages from other
-transactions.
-******************************************************************************/
- tTmp1 = aSignal->readData(2);
- tTmp2 = aSignal->readData(3);
- tRecTransId = (Uint64)tTmp1 + ((Uint64)tTmp2 << 32);
- tCurrTransId = this->getTransactionId();
- if (tCurrTransId != tRecTransId) {
- return -1;
- }//if
- theError.code = aSignal->readData(4); // Override any previous errors
-
-/**********************************************************************/
-/* A serious error has occured. This could be due to deadlock or */
-/* lack of resources or simply a programming error in NDB. This */
-/* transaction will be aborted. Actually it has already been */
-/* and we only need to report completion and return with the */
-/* error code to the application. */
-/**********************************************************************/
- theCompletionStatus = CompletedFailure;
- theCommitStatus = Aborted;
- return 0;
+ if(checkState_TransId(aSignal->getDataPtr()+1)){
+ theError.code = aSignal->readData(4); // Override any previous errors
+
+ /**********************************************************************/
+ /* A serious error has occured. This could be due to deadlock or */
+ /* lack of resources or simply a programming error in NDB. This */
+ /* transaction will be aborted. Actually it has already been */
+ /* and we only need to report completion and return with the */
+ /* error code to the application. */
+ /**********************************************************************/
+ theCompletionStatus = CompletedFailure;
+ theCommitStatus = Aborted;
+ return 0;
+ }
+ return -1;
}//NdbConnection::receiveTCINDXREF()
/*******************************************************************************
diff --git a/ndb/src/ndbapi/NdbConnectionScan.cpp b/ndb/src/ndbapi/NdbConnectionScan.cpp
index 962acc0bdac..ea45f2b5a00 100644
--- a/ndb/src/ndbapi/NdbConnectionScan.cpp
+++ b/ndb/src/ndbapi/NdbConnectionScan.cpp
@@ -33,7 +33,6 @@
#include <NdbConnection.hpp>
#include <NdbOperation.hpp>
#include <NdbScanOperation.hpp>
-#include "NdbScanReceiver.hpp"
#include "NdbApiSignal.hpp"
#include "TransporterFacade.hpp"
#include "NdbUtil.hpp"
@@ -49,299 +48,6 @@
#define WAITFOR_SCAN_TIMEOUT 120000
-/*****************************************************************************
- * int executeScan();
- *
- * 1. Check that the transaction is started and other important preconditions
- * 2. Tell the kernel to start scanning by sending one SCAN_TABREQ, if
- * parallelism is greater than 16 also send one SCAN_TABINFO for each
- * additional 16
- * Define which attributes to scan in ATTRINFO, this signal also holds the
- * interpreted program
- * 3. Wait for the answer of the SCAN_TABREQ. This is either a SCAN_TABCONF if
- * the scan was correctly defined and a SCAN_TABREF if the scan couldn't
- * be started.
- * 4. Check the result, if scan was not started return -1
- *
- ****************************************************************************/
-int
-NdbConnection::executeScan(){
- if (theTransactionIsStarted == true){ // Transaction already started.
- setErrorCode(4600);
- return -1;
- }
- if (theStatus != Connected) { // Lost connection
- setErrorCode(4601);
- return -1;
- }
- if (theScanningOp == NULL){
- setErrorCode(4602); // getNdbOperation must be called before executeScan
- return -1;
- }
- TransporterFacade* tp = TransporterFacade::instance();
- theNoOfOpCompleted = 0;
- theNoOfSCANTABCONFRecv = 0;
- tp->lock_mutex();
- if (tp->get_node_alive(theDBnode) &&
- (tp->getNodeSequence(theDBnode) == theNodeSequence)) {
- if (tp->check_send_size(theDBnode, get_send_size())) {
- theTransactionIsStarted = true;
- if (sendScanStart() == -1){
- tp->unlock_mutex();
- return -1;
- }//if
- theNdb->theWaiter.m_node = theDBnode;
- theNdb->theWaiter.m_state = WAIT_SCAN;
- int res = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
- if (res == 0) {
- return 0;
- } else {
- if (res == -1) {
- setErrorCode(4008);
- } else if (res == -2) {
- theTransactionIsStarted = false;
- theReleaseOnClose = true;
- setErrorCode(4028);
- } else {
- ndbout << "Impossible return from receiveResponse in executeScan";
- ndbout << endl;
- abort();
- }//if
- theCommitStatus = Aborted;
- return -1;
- }//if
- } else {
- TRACE_DEBUG("Start a scan with send buffer full attempted");
- setErrorCode(4022);
- theCommitStatus = Aborted;
- }//if
- } else {
- if (!(tp->get_node_stopping(theDBnode) &&
- (tp->getNodeSequence(theDBnode) == theNodeSequence))) {
- TRACE_DEBUG("The node is hard dead when attempting to start a scan");
- setErrorCode(4029);
- theReleaseOnClose = true;
- } else {
- TRACE_DEBUG("The node is stopping when attempting to start a scan");
- setErrorCode(4030);
- }//if
- theCommitStatus = Aborted;
- }//if
- tp->unlock_mutex();
- return -1;
-}
-
-/******************************************************************************
- * int nextScanResult();
- * Remark:
- * This method is used to distribute data received to the application.
- * Iterate through the list and search for operations that haven't
- * been distributed yet (status != Finished).
- * If there are no more operations/records still waiting to be exececuted
- * we have to send SCAN_NEXTREQ to fetch next set of records.
- *
- * TODO - This function should be able to return a value indicating if
- * there are any more records already fetched from memory or if it has to
- * ask the db for more. This would mean we could get better performance when
- * takeOver is used wince we can take over all ops already fetched, put them
- * in another trans and send them of to the db when there are no more records
- * already fetched. Maybe use a new argument to the function for this
-******************************************************************************/
-int
-NdbConnection::nextScanResult(bool fetchAllowed){
-
- if (theTransactionIsStarted != true){ // Transaction not started.
- setErrorCode(4601);
- return -1;
- }
- // Scan has finished ok but no operations recived = empty recordset.
- if(theScanFinished == true){
- return 1; // No more records
- }
- if (theStatus != Connected){// Lost connection
- setErrorCode(4601);
- return -1;
- }
- // Something went wrong, probably we got a SCAN_TABREF earlier.
- if (theCompletionStatus == CompletedFailure) {
- return -1;
- }
- if (theNoOfOpCompleted == theNoOfOpFetched) {
- // There are no more records cached in NdbApi
- if (fetchAllowed == true){
- // Get some more records from db
-
- if (fetchNextScanResult() == -1){
- return -1;
- }
- if (theScanFinished == true) { // The scan has finished.
- return 1; // 1 = No more records
- }
- if (theCompletionStatus == CompletedFailure) {
- return -1; // Something went wrong, probably we got a SCAN_TABREF.
- }
- } else {
- // There where no more cached records in NdbApi
- // and we where not allowed to go to db and ask for
- // more
- return 2;
- }
- }
-
- // It's not allowed to come here without any cached records
- if (theCurrentScanRec == NULL){
-#ifdef VM_TRACE
- ndbout << "nextScanResult("<<fetchAllowed<<")"<<endl
- << " theTransactionIsStarted = " << theTransactionIsStarted << endl
- << " theScanFinished = " << theScanFinished << endl
- << " theCommitStatus = " << theCommitStatus << endl
- << " theStatus = " << theStatus << endl
- << " theCompletionStatus = " << theCompletionStatus << endl
- << " theNoOfOpCompleted = " << theNoOfOpCompleted << endl
- << " theNoOfOpFetched = " << theNoOfOpFetched << endl
- << " theScanningOp = " << theScanningOp << endl
- << " theNoOfSCANTABCONFRecv = "<< theNoOfSCANTABCONFRecv << endl
- << " theNdb->theWaiter.m_node = " <<theNdb->theWaiter.m_node<<endl
- << " theNdb->theWaiter.m_state = " << theNdb->theWaiter.m_state << endl;
- abort();
-#endif
- return -1;
- }
-
- // Execute the saved signals for this operation.
- NdbScanReceiver* tScanRec = theCurrentScanRec;
- theScanningOp->theCurrRecAI_Len = 0;
- theScanningOp->theCurrentRecAttr = theScanningOp->theFirstRecAttr;
- if(tScanRec->executeSavedSignals() != 0)
- return -1;
- theNoOfOpCompleted++;
- // Remember for next iteration and takeOverScanOp
- thePreviousScanRec = tScanRec;
- theCurrentScanRec = tScanRec->next();
- return 0; // 0 = There are more rows to be fetched.
-}
-
-/******************************************************************************
- * int stopScan()
- * Remark: By sending SCAN_NEXTREQ with data word 2 set to TRUE we
- * abort the scan process.
- *****************************************************************************/
-int
-NdbConnection::stopScan()
-{
- if(theScanFinished == true){
- return 0;
- }
- if (theCompletionStatus == CompletedFailure){
- return 0;
- }
-
- if (theScanningOp == 0){
- return 0;
- }
-
- theNoOfOpCompleted = 0;
- theNoOfSCANTABCONFRecv = 0;
- theScanningOp->prepareNextScanResult();
- return sendScanNext(1);
-}
-
-
-/********************************************************************
- * int sendScanStart()
- *
- * Send the signals reuired to define and start the scan
- * 1. Send SCAN_TABREQ
- * 2. Send SCAN_TABINFO(if any, parallelism must be > 16)
- * 3. Send ATTRINFO signals
- *
- * Returns -1 if an error occurs otherwise 0.
- *
- ********************************************************************/
-int
-NdbConnection::sendScanStart(){
-
- /***** 0. Prepare signals ******************/
- // This might modify variables and signals
- if(theScanningOp->prepareSendScan(theTCConPtr,
- theTransactionId) == -1)
- return -1;
-
- /***** 1. Send SCAN_TABREQ **************/
- /***** 2. Send SCAN_TABINFO *************/
- /***** 3. Send ATTRINFO signals *********/
- if (theScanningOp->doSendScan(theDBnode) == -1)
- return -1;
- return 0;
-}
-
-
-int
-NdbConnection::fetchNextScanResult(){
- theNoOfOpCompleted = 0;
- theNoOfSCANTABCONFRecv = 0;
- theScanningOp->prepareNextScanResult();
- return sendScanNext(0);
-}
-
-
-
-/***********************************************************
- * int sendScanNext(int stopScanFlag)
- *
- * ************************************************************/
-int NdbConnection::sendScanNext(bool stopScanFlag){
- NdbApiSignal tSignal(theNdb->theMyRef);
- Uint32 tTransId1, tTransId2;
- tSignal.setSignal(GSN_SCAN_NEXTREQ);
- tSignal.setData(theTCConPtr, 1);
- // Set the stop flag in word 2(1 = stop)
- Uint32 tStopValue;
- tStopValue = stopScanFlag == true ? 1 : 0;
- tSignal.setData(tStopValue, 2);
- tTransId1 = (Uint32) theTransactionId;
- tTransId2 = (Uint32) (theTransactionId >> 32);
- tSignal.setData(tTransId1, 3);
- tSignal.setData(tTransId2, 4);
- tSignal.setLength(4);
- Uint32 conn_seq = theNodeSequence;
- int return_code = theNdb->sendRecSignal(theDBnode,
- WAIT_SCAN,
- &tSignal,
- conn_seq);
- if (return_code == 0) {
- return 0;
- } else if (return_code == -1) { // Time-out
- TRACE_DEBUG("Time-out when sending sendScanNext");
- setErrorCode(4024);
- theTransactionIsStarted = false;
- theReleaseOnClose = true;
- theCommitStatus = Aborted;
- } else if (return_code == -2) { // Node failed
- TRACE_DEBUG("Node failed when sendScanNext");
- setErrorCode(4027);
- theTransactionIsStarted = false;
- theReleaseOnClose = true;
- theCommitStatus = Aborted;
- } else if (return_code == -3) {
- TRACE_DEBUG("Send failed when sendScanNext");
- setErrorCode(4033);
- theTransactionIsStarted = false;
- theReleaseOnClose = true;
- theCommitStatus = Aborted;
- } else if (return_code == -4) {
- TRACE_DEBUG("Send buffer full when sendScanNext");
- setErrorCode(4032);
- } else if (return_code == -5) {
- TRACE_DEBUG("Node stopping when sendScanNext");
- setErrorCode(4034);
- } else {
- ndbout << "Impossible return from sendRecSignal" << endl;
- abort();
- }//if
- return -1;
-}
-
/***************************************************************************
* int receiveSCAN_TABREF(NdbApiSignal* aSignal)
@@ -352,39 +58,13 @@ int NdbConnection::sendScanNext(bool stopScanFlag){
****************************************************************************/
int
NdbConnection::receiveSCAN_TABREF(NdbApiSignal* aSignal){
- const ScanTabRef * const scanTabRef = CAST_CONSTPTR(ScanTabRef, aSignal->getDataPtr());
- if (theStatus != Connected){
-#ifdef VM_TRACE
- ndbout << "SCAN_TABREF dropped, theStatus = " << theStatus << endl;
-#endif
- return -1;
- }
- if (aSignal->getLength() != ScanTabRef::SignalLength){
-#ifdef VM_TRACE
- ndbout << "SCAN_TABREF dropped, signal length " << aSignal->getLength() << endl;
-#endif
- return -1;
- }
- const Uint64 tCurrTransId = this->getTransactionId();
- const Uint64 tRecTransId = (Uint64)scanTabRef->transId1 +
- ((Uint64)scanTabRef->transId2 << 32);
- if ((tRecTransId - tCurrTransId) != (Uint64)0){
-#ifdef VM_TRACE
- ndbout << "SCAN_TABREF dropped, wrong transid" << endl;
-#endif
- return -1;
+ const ScanTabRef * ref = CAST_CONSTPTR(ScanTabRef, aSignal->getDataPtr());
+
+ if(checkState_TransId(&ref->transId1)){
+ theScanningOp->execCLOSE_SCAN_REP(ref->errorCode);
+ return 0;
}
-#if 0
- ndbout << "SCAN_TABREF, "
- <<"transid=("<<hex<<scanTabRef->transId1<<", "<<hex<<scanTabRef->transId2<<")"
- <<", err="<<dec<<scanTabRef->errorCode << endl;
-#endif
- setErrorCode(scanTabRef->errorCode);
- theCompletionStatus = CompletedFailure;
- theCommitStatus = Aborted; // Indicate that this "transaction" was aborted
- theTransactionIsStarted = false;
- theScanningOp->releaseSignals();
- return 0;
+ return -1;
}
/*****************************************************************************
@@ -401,173 +81,43 @@ NdbConnection::receiveSCAN_TABREF(NdbApiSignal* aSignal){
*
*****************************************************************************/
int
-NdbConnection::receiveSCAN_TABCONF(NdbApiSignal* aSignal)
-{
- const ScanTabConf * const conf = CAST_CONSTPTR(ScanTabConf, aSignal->getDataPtr());
- if (theStatus != Connected){
-#ifdef VM_TRACE
- ndbout << "Dropping SCAN_TABCONF, theStatus = "<< theStatus << endl;
-#endif
- return -1;
- }
- if(aSignal->getLength() != ScanTabConf::SignalLength){
-#ifdef VM_TRACE
- ndbout << "Dropping SCAN_TABCONF, getLength = "<< aSignal->getLength() << endl;
-#endif
- return -1;
- }
- const Uint64 tCurrTransId = this->getTransactionId();
- const Uint64 tRecTransId =
- (Uint64)conf->transId1 + ((Uint64)conf->transId2 << 32);
- if ((tRecTransId - tCurrTransId) != (Uint64)0){
-#ifdef VM_TRACE
- ndbout << "Dropping SCAN_TABCONF, wrong transid" << endl;
-#endif
- return -1;
- }
-
- const Uint8 scanStatus =
- ScanTabConf::getScanStatus(conf->requestInfo);
-
- if (scanStatus != 0) {
- theCompletionStatus = CompletedSuccess;
- theCommitStatus = Committed;
- theScanFinished = true;
- return 0;
- }
-
- // There can only be one SCANTABCONF
- assert(theNoOfSCANTABCONFRecv == 0);
- theNoOfSCANTABCONFRecv++;
-
- // Save a copy of the signal
- NdbApiSignal * tCopy = new NdbApiSignal(0);//getSignal();
- if (tCopy == NULL){
- setErrorCode(4000);
- return 2; // theWaiter.m_state = NO_WAIT
- }
- tCopy->copyFrom(aSignal);
- tCopy->next(NULL);
- theScanningOp->theSCAN_TABCONF_Recv = tCopy;
-
- return checkNextScanResultComplete();
-
-}
-
-/*****************************************************************************
- * int receiveSCAN_TABINFO(NdbApiSignal* aSignal)
- *
- * Receive SCAN_TABINFO
- *
- *****************************************************************************/
-int
-NdbConnection::receiveSCAN_TABINFO(NdbApiSignal* aSignal)
+NdbConnection::receiveSCAN_TABCONF(NdbApiSignal* aSignal,
+ const Uint32 * ops, Uint32 len)
{
- if (theStatus != Connected){
- //ndbout << "SCAN_TABINFO dropped, theStatus = " << theStatus << endl;
- return -1;
- }
- if (aSignal->getLength() != ScanTabInfo::SignalLength){
- //ndbout << "SCAN_TABINFO dropped, length = " << aSignal->getLength() << endl;
- return -1;
- }
-
- NdbApiSignal * tCopy = new NdbApiSignal(0);//getSignal();
- if (tCopy == NULL){
- setErrorCode(4000);
- return 2; // theWaiter.m_state = NO_WAIT
- }
- tCopy->copyFrom(aSignal);
- tCopy->next(NULL);
-
- // Put the signal last in list
- if (theScanningOp->theFirstSCAN_TABINFO_Recv == NULL)
- theScanningOp->theFirstSCAN_TABINFO_Recv = tCopy;
- else
- theScanningOp->theLastSCAN_TABINFO_Recv->next(tCopy);
- theScanningOp->theLastSCAN_TABINFO_Recv = tCopy;
-
- return checkNextScanResultComplete();
-}
-
-/******************************************************************************
- * int checkNextScanResultComplete(NdbApiSignal* aSignal)
- *
- * Remark Traverses all the lists that are associated with
- * this resultset and checks if all signals are there.
- * If all required signal are received return 0
- *
- *
- *****************************************************************************/
-int
-NdbConnection::checkNextScanResultComplete(){
-
- if (theNoOfSCANTABCONFRecv != 1) {
- return -1;
- }
-
- Uint32 tNoOfOpFetched = 0;
- theCurrentScanRec = NULL;
- thePreviousScanRec = NULL;
-
- const ScanTabConf * const conf =
- CAST_CONSTPTR(ScanTabConf, theScanningOp->theSCAN_TABCONF_Recv->getDataPtr());
- const Uint32 numOperations = ScanTabConf::getOperations(conf->requestInfo);
- Uint32 sigIndex = 0;
- NdbApiSignal* tSignal = theScanningOp->theFirstSCAN_TABINFO_Recv;
- while(tSignal != NULL){
- const ScanTabInfo * const info = CAST_CONSTPTR(ScanTabInfo, tSignal->getDataPtr());
- // Loop through the operations for this SCAN_TABINFO
- // tOpAndLength is allowed to be zero, this means no
- // TRANSID_AI signals where sent for this record
- // I.e getValue was called 0 times when defining scan
-
- // The max number of operations in each signal is 16
- Uint32 numOpsInSig = numOperations - sigIndex*16;
- if (numOpsInSig > 16)
- numOpsInSig = 16;
- for(Uint32 i = 0; i < numOpsInSig; i++){
- const Uint32 tOpAndLength = info->operLenAndIdx[i];
- const Uint32 tOpIndex = ScanTabInfo::getIdx(tOpAndLength);
- const Uint32 tOpLen = ScanTabInfo::getLen(tOpAndLength);
-
- assert(tOpIndex < 256);
- NdbScanReceiver* tScanRec =
- theScanningOp->theScanReceiversArray[tOpIndex];
- assert(tScanRec != NULL);
- if(tScanRec->isCompleted(tOpLen))
- tScanRec->setCompleted();
- else{
- return -1; // At least one receiver was not ready
- }
-
- // Build list of scan receivers
- if (theCurrentScanRec == NULL) {
- theCurrentScanRec = tScanRec;
- thePreviousScanRec = tScanRec;
- } else {
- thePreviousScanRec->next(tScanRec);
- thePreviousScanRec = tScanRec;
+ const ScanTabConf * conf = CAST_CONSTPTR(ScanTabConf, aSignal->getDataPtr());
+ if(checkState_TransId(&conf->transId1)){
+
+ if (conf->requestInfo == ScanTabConf::EndOfData) {
+ theScanningOp->execCLOSE_SCAN_REP(0);
+ return 0;
+ }
+
+ int noComp = -1;
+ for(Uint32 i = 0; i<len; i += 3){
+ Uint32 ptrI = * ops++;
+ Uint32 tcPtrI = * ops++;
+ Uint32 info = * ops++;
+ Uint32 opCount = ScanTabConf::getRows(info);
+ Uint32 totalLen = ScanTabConf::getLength(info);
+
+ void * tPtr = theNdb->int2void(ptrI);
+ assert(tPtr); // For now
+ NdbReceiver* tOp = theNdb->void2rec(tPtr);
+ if (tOp && tOp->checkMagicNumber()){
+ if(tOp->execSCANOPCONF(tcPtrI, totalLen, opCount)){
+ /**
+ *
+ */
+ noComp++;
+ theScanningOp->receiver_delivered(tOp);
+ } else if(info == ScanTabConf::EndOfData){
+ noComp++;
+ theScanningOp->receiver_completed(tOp);
+ }
}
- tNoOfOpFetched++;
}
- tSignal = tSignal->next();
- sigIndex++;
- }
-
- // Check number of operations fetched against value in SCANTAB_CONF
- if (tNoOfOpFetched != numOperations) {
- setErrorCode(4113);
- return 2; // theWaiter.m_state = NO_WAIT
+ return noComp;
}
- // All signals for this resultset recieved
- // release SCAN_TAB signals
- theNoOfSCANTABCONFRecv = 0;
- theScanningOp->releaseSignals();
-
- // We have received all operations with correct lengths.
- thePreviousScanRec = NULL;
- theNoOfOpFetched = tNoOfOpFetched;
- return 0;
+ return -1;
}
diff --git a/ndb/src/ndbapi/NdbCursorOperation.cpp b/ndb/src/ndbapi/NdbCursorOperation.cpp
index e4dd600c57f..a9f84c4c110 100644
--- a/ndb/src/ndbapi/NdbCursorOperation.cpp
+++ b/ndb/src/ndbapi/NdbCursorOperation.cpp
@@ -30,8 +30,6 @@
#include <NdbResultSet.hpp>
NdbCursorOperation::NdbCursorOperation(Ndb* aNdb) :
- NdbOperation(aNdb),
- m_resultSet(0)
{
}
@@ -48,10 +46,6 @@ void NdbCursorOperation::cursInit()
NdbResultSet* NdbCursorOperation::getResultSet()
{
- if (!m_resultSet)
- m_resultSet = new NdbResultSet(this);
-
- return m_resultSet;
}
diff --git a/ndb/src/ndbapi/NdbDictionaryImpl.cpp b/ndb/src/ndbapi/NdbDictionaryImpl.cpp
index 3b31fa70e35..899359b12a4 100644
--- a/ndb/src/ndbapi/NdbDictionaryImpl.cpp
+++ b/ndb/src/ndbapi/NdbDictionaryImpl.cpp
@@ -257,6 +257,7 @@ NdbTableImpl::init(){
m_indexType = NdbDictionary::Index::Undefined;
m_noOfKeys = 0;
+ m_fragmentCount = 0;
}
bool
@@ -275,11 +276,9 @@ NdbTableImpl::equal(const NdbTableImpl& obj) const
if(strcmp(m_internalName.c_str(), obj.m_internalName.c_str()) != 0){
return false;
}
-
if(m_fragmentType != obj.m_fragmentType){
return false;
}
-
if(m_columns.size() != obj.m_columns.size()){
return false;
}
@@ -318,6 +317,7 @@ NdbTableImpl::assign(const NdbTableImpl& org)
m_newExternalName.assign(org.m_newExternalName);
m_frm.assign(org.m_frm.get_data(), org.m_frm.length());
m_fragmentType = org.m_fragmentType;
+ m_fragmentCount = org.m_fragmentCount;
for(unsigned i = 0; i<org.m_columns.size(); i++){
NdbColumnImpl * col = new NdbColumnImpl();
@@ -826,6 +826,7 @@ NdbDictInterface::dictSignal(NdbApiSignal* signal,
m_waiter.m_state = wst;
m_waiter.wait(theWait);
+ m_transporter->unlock_mutex();
// End of Protected area
if(m_waiter.m_state == NO_WAIT && m_error.code == 0){
@@ -1115,6 +1116,7 @@ NdbDictInterface::parseTableInfo(NdbTableImpl ** ret,
impl->m_kvalue = tableDesc.TableKValue;
impl->m_minLoadFactor = tableDesc.MinLoadFactor;
impl->m_maxLoadFactor = tableDesc.MaxLoadFactor;
+ impl->m_fragmentCount = tableDesc.FragmentCount;
impl->m_indexType = (NdbDictionary::Index::Type)
getApiConstant(tableDesc.TableType,
@@ -1198,6 +1200,8 @@ NdbDictInterface::parseTableInfo(NdbTableImpl ** ret,
it.next();
}
impl->m_noOfKeys = keyCount;
+ impl->m_keyLenInWords = keyInfoPos;
+
* ret = impl;
return 0;
}
@@ -2707,6 +2711,7 @@ NdbDictInterface::listObjects(NdbApiSignal* signal)
m_waiter.m_node = aNodeId;
m_waiter.m_state = WAIT_LIST_TABLES_CONF;
m_waiter.wait(WAITFOR_RESPONSE_TIMEOUT);
+ m_transporter->unlock_mutex();
// end protected
if (m_waiter.m_state == NO_WAIT && m_error.code == 0)
return 0;
diff --git a/ndb/src/ndbapi/NdbDictionaryImpl.hpp b/ndb/src/ndbapi/NdbDictionaryImpl.hpp
index 3263a636a79..311d101f8f4 100644
--- a/ndb/src/ndbapi/NdbDictionaryImpl.hpp
+++ b/ndb/src/ndbapi/NdbDictionaryImpl.hpp
@@ -123,7 +123,9 @@ public:
int m_kvalue;
int m_minLoadFactor;
int m_maxLoadFactor;
-
+ int m_keyLenInWords;
+ int m_fragmentCount;
+
NdbDictionaryImpl * m_dictionary;
NdbIndexImpl * m_index;
NdbColumnImpl * getColumn(unsigned attrId);
diff --git a/ndb/src/ndbapi/NdbEventOperationImpl.cpp b/ndb/src/ndbapi/NdbEventOperationImpl.cpp
index 7b4afc72ef7..6ece69cce91 100644
--- a/ndb/src/ndbapi/NdbEventOperationImpl.cpp
+++ b/ndb/src/ndbapi/NdbEventOperationImpl.cpp
@@ -166,7 +166,7 @@ NdbEventOperationImpl::getValue(const NdbColumnImpl *tAttrInfo, char *aValue, in
}
//theErrorLine++;
- tRecAttr->setUNDEFINED();
+ tRecAttr->setNULL();
// We want to keep the list sorted to make data insertion easier later
if (theFirstRecAttr == NULL) {
@@ -387,7 +387,7 @@ NdbEventOperationImpl::next(int *pOverrun)
while (tAttrId > tRecAttrId) {
//printf("[%u] %u %u [%u]\n", tAttrId, tDataSz, *aDataPtr, tRecAttrId);
- tWorkingRecAttr->setUNDEFINED();
+ tWorkingRecAttr->setNULL();
tWorkingRecAttr = tWorkingRecAttr->next();
if (tWorkingRecAttr == NULL)
break;
@@ -399,19 +399,16 @@ NdbEventOperationImpl::next(int *pOverrun)
//printf("[%u] %u %u [%u]\n", tAttrId, tDataSz, *aDataPtr, tRecAttrId);
if (tAttrId == tRecAttrId) {
- tWorkingRecAttr->setNotNULL();
if (!m_eventImpl->m_tableImpl->getColumn(tRecAttrId)->getPrimaryKey())
hasSomeData++;
//printf("set!\n");
- Uint32 *theRef = (Uint32*)tWorkingRecAttr->aRef();
- Uint32 *theEndRef = theRef + tDataSz;
- while (theRef < theEndRef)
- *theRef++ = *aDataPtr++;
+ tWorkingRecAttr->receive_data(aDataPtr, tDataSz);
// move forward, data has already moved forward
aAttrPtr++;
+ aDataPtr += tDataSz;
tWorkingRecAttr = tWorkingRecAttr->next();
} else {
// move only attr forward
@@ -423,7 +420,7 @@ NdbEventOperationImpl::next(int *pOverrun)
while (tWorkingRecAttr != NULL) {
tRecAttrId = tWorkingRecAttr->attrId();
//printf("set undefined [%u] %u %u [%u]\n", tAttrId, tDataSz, *aDataPtr, tRecAttrId);
- tWorkingRecAttr->setUNDEFINED();
+ tWorkingRecAttr->setNULL();
tWorkingRecAttr = tWorkingRecAttr->next();
}
@@ -436,7 +433,7 @@ NdbEventOperationImpl::next(int *pOverrun)
tDataSz = AttributeHeader(*aDataPtr).getDataSize();
aDataPtr++;
while (tAttrId > tRecAttrId) {
- tWorkingRecAttr->setUNDEFINED();
+ tWorkingRecAttr->setNULL();
tWorkingRecAttr = tWorkingRecAttr->next();
if (tWorkingRecAttr == NULL)
break;
@@ -445,16 +442,11 @@ NdbEventOperationImpl::next(int *pOverrun)
if (tWorkingRecAttr == NULL)
break;
if (tAttrId == tRecAttrId) {
- tWorkingRecAttr->setNotNULL();
-
if (!m_eventImpl->m_tableImpl->getColumn(tRecAttrId)->getPrimaryKey())
hasSomeData++;
- Uint32 *theRef = (Uint32*)tWorkingRecAttr->aRef();
- Uint32 *theEndRef = theRef + tDataSz;
- while (theRef < theEndRef)
- *theRef++ = *aDataPtr++;
-
+ tWorkingRecAttr->receive_data(aDataPtr, tDataSz);
+ aDataPtr += tDataSz;
// move forward, data+attr has already moved forward
tWorkingRecAttr = tWorkingRecAttr->next();
} else {
@@ -463,7 +455,7 @@ NdbEventOperationImpl::next(int *pOverrun)
}
}
while (tWorkingRecAttr != NULL) {
- tWorkingRecAttr->setUNDEFINED();
+ tWorkingRecAttr->setNULL();
tWorkingRecAttr = tWorkingRecAttr->next();
}
diff --git a/ndb/src/ndbapi/NdbImpl.hpp b/ndb/src/ndbapi/NdbImpl.hpp
index cd05335b337..1fb1969b589 100644
--- a/ndb/src/ndbapi/NdbImpl.hpp
+++ b/ndb/src/ndbapi/NdbImpl.hpp
@@ -35,6 +35,7 @@ public:
#include <NdbError.hpp>
#include <NdbCondition.h>
#include <NdbReceiver.hpp>
+#include <NdbOperation.hpp>
#include <NdbTick.h>
@@ -83,12 +84,13 @@ Ndb::void2rec_iop(void* val){
return (NdbIndexOperation*)(void2rec(val)->getOwner());
}
-inline
-NdbScanReceiver*
-Ndb::void2rec_srec(void* val){
- return (NdbScanReceiver*)(void2rec(val)->getOwner());
+inline
+NdbConnection *
+NdbReceiver::getTransaction(){
+ return ((NdbOperation*)m_owner)->theNdbCon;
}
+
inline
int
Ndb::checkInitState()
@@ -151,7 +153,6 @@ NdbWaiter::wait(int waitTime)
waitTime = maxTime - NdbTick_CurrentMillisecond();
}
}
- NdbMutex_Unlock((NdbMutex*)m_mutex);
}
inline
diff --git a/ndb/src/ndbapi/NdbIndexOperation.cpp b/ndb/src/ndbapi/NdbIndexOperation.cpp
index 02d94f39f2d..d976c912c5e 100644
--- a/ndb/src/ndbapi/NdbIndexOperation.cpp
+++ b/ndb/src/ndbapi/NdbIndexOperation.cpp
@@ -52,7 +52,7 @@ NdbIndexOperation::NdbIndexOperation(Ndb* aNdb) :
/**
* Change receiver type
*/
- theReceiver.init(NdbReceiver::NDB_INDEX_OPERATION, this);
+ theReceiver.init(NdbReceiver::NDB_INDEX_OPERATION, this, false);
}
NdbIndexOperation::~NdbIndexOperation()
@@ -664,10 +664,8 @@ NdbIndexOperation::prepareSend(Uint32 aTC_ConnectPtr, Uint64 aTransactionId)
tSignal = tnextSignal;
} while (tSignal != NULL);
}//if
- NdbRecAttr* tRecAttrObject = theFirstRecAttr;
theStatus = WaitResponse;
- theCurrentRecAttr = tRecAttrObject;
-
+ theReceiver.prepareSend();
return 0;
}
diff --git a/ndb/src/ndbapi/NdbOperation.cpp b/ndb/src/ndbapi/NdbOperation.cpp
index 8bc9111d060..74d6a462ac8 100644
--- a/ndb/src/ndbapi/NdbOperation.cpp
+++ b/ndb/src/ndbapi/NdbOperation.cpp
@@ -54,7 +54,6 @@ NdbOperation::NdbOperation(Ndb* aNdb) :
//theTable(aTable),
theNdbCon(NULL),
theNext(NULL),
- theNextScanOp(NULL),
theTCREQ(NULL),
theFirstATTRINFO(NULL),
theCurrentATTRINFO(NULL),
@@ -62,8 +61,6 @@ NdbOperation::NdbOperation(Ndb* aNdb) :
theAI_LenInCurrAI(0),
theFirstKEYINFO(NULL),
theLastKEYINFO(NULL),
- theFirstRecAttr(NULL),
- theCurrentRecAttr(NULL),
theFirstLabel(NULL),
theLastLabel(NULL),
@@ -76,10 +73,6 @@ NdbOperation::NdbOperation(Ndb* aNdb) :
theNoOfLabels(0),
theNoOfSubroutines(0),
- theTotalRecAI_Len(0),
- theCurrRecAI_Len(0),
- theAI_ElementLen(0),
- theCurrElemPtr(NULL),
m_currentTable(NULL), //theTableId(0xFFFF),
m_accessTable(NULL), //theAccessTableId(0xFFFF),
//theSchemaVersion(0),
@@ -95,17 +88,9 @@ NdbOperation::NdbOperation(Ndb* aNdb) :
m_tcReqGSN(GSN_TCKEYREQ),
m_keyInfoGSN(GSN_KEYINFO),
m_attrInfoGSN(GSN_ATTRINFO),
- theParallelism(0),
- theScanReceiversArray(NULL),
- theSCAN_TABREQ(NULL),
- theFirstSCAN_TABINFO_Send(NULL),
- theLastSCAN_TABINFO_Send(NULL),
- theFirstSCAN_TABINFO_Recv(NULL),
- theLastSCAN_TABINFO_Recv(NULL),
- theSCAN_TABCONF_Recv(NULL),
theBoundATTRINFO(NULL)
{
- theReceiver.init(NdbReceiver::NDB_OPERATION, this);
+ theReceiver.init(NdbReceiver::NDB_OPERATION, this, false);
theError.code = 0;
}
/*****************************************************************************
@@ -165,7 +150,7 @@ NdbOperation::init(NdbTableImpl* tab, NdbConnection* myConnection){
theNdbCon = myConnection;
for (Uint32 i=0; i<NDB_MAX_NO_OF_ATTRIBUTES_IN_KEY; i++)
for (int j=0; j<3; j++)
- theTupleKeyDefined[i][j] = false;
+ theTupleKeyDefined[i][j] = 0;
theFirstATTRINFO = NULL;
theCurrentATTRINFO = NULL;
@@ -175,13 +160,11 @@ NdbOperation::init(NdbTableImpl* tab, NdbConnection* myConnection){
theTupKeyLen = 0;
theNoOfTupKeyDefined = 0;
- theTotalCurrAI_Len = 0;
- theAI_LenInCurrAI = 0;
- theTotalRecAI_Len = 0;
theDistrKeySize = 0;
theDistributionGroup = 0;
- theCurrRecAI_Len = 0;
- theAI_ElementLen = 0;
+
+ theTotalCurrAI_Len = 0;
+ theAI_LenInCurrAI = 0;
theStartIndicator = 0;
theCommitIndicator = 0;
theSimpleIndicator = 0;
@@ -191,9 +174,6 @@ NdbOperation::init(NdbTableImpl* tab, NdbConnection* myConnection){
theDistrGroupType = 0;
theDistrKeyIndicator = 0;
theScanInfo = 0;
- theFirstRecAttr = NULL;
- theCurrentRecAttr = NULL;
- theCurrElemPtr = NULL;
theTotalNrOfKeyWordInSignal = 8;
theMagicNumber = 0xABCDEF01;
theBoundATTRINFO = NULL;
@@ -212,6 +192,7 @@ NdbOperation::init(NdbTableImpl* tab, NdbConnection* myConnection){
tcKeyReq->scanInfo = 0;
theKEYINFOptr = &tcKeyReq->keyInfo[0];
theATTRINFOptr = &tcKeyReq->attrInfo[0];
+ theReceiver.init(NdbReceiver::NDB_OPERATION, this, false);
return 0;
}
@@ -226,8 +207,6 @@ NdbOperation::release()
{
NdbApiSignal* tSignal;
NdbApiSignal* tSaveSignal;
- NdbRecAttr* tRecAttr;
- NdbRecAttr* tSaveRecAttr;
NdbBranch* tBranch;
NdbBranch* tSaveBranch;
NdbLabel* tLabel;
@@ -260,15 +239,6 @@ NdbOperation::release()
}
theFirstKEYINFO = NULL;
theLastKEYINFO = NULL;
- tRecAttr = theFirstRecAttr;
- while (tRecAttr != NULL)
- {
- tSaveRecAttr = tRecAttr;
- tRecAttr = tRecAttr->next();
- theNdb->releaseRecAttr(tSaveRecAttr);
- }
- theFirstRecAttr = NULL;
- theCurrentRecAttr = NULL;
if (theInterpretIndicator == 1)
{
tBranch = theFirstBranch;
@@ -308,19 +278,18 @@ NdbOperation::release()
}
theBoundATTRINFO = NULL;
}
- releaseScan();
}
NdbRecAttr*
NdbOperation::getValue(const char* anAttrName, char* aValue)
{
- return getValue(m_currentTable->getColumn(anAttrName), aValue);
+ return getValue_impl(m_currentTable->getColumn(anAttrName), aValue);
}
NdbRecAttr*
NdbOperation::getValue(Uint32 anAttrId, char* aValue)
{
- return getValue(m_currentTable->getColumn(anAttrId), aValue);
+ return getValue_impl(m_currentTable->getColumn(anAttrId), aValue);
}
int
@@ -416,16 +385,4 @@ NdbOperation::write_attr(Uint32 anAttrId, Uint32 RegDest)
return write_attr(m_currentTable->getColumn(anAttrId), RegDest);
}
-int
-NdbOperation::setBound(const char* anAttrName, int type, const void* aValue, Uint32 len)
-{
- return setBound(m_accessTable->getColumn(anAttrName), type, aValue, len);
-}
-
-int
-NdbOperation::setBound(Uint32 anAttrId, int type, const void* aValue, Uint32 len)
-{
- return setBound(m_accessTable->getColumn(anAttrId), type, aValue, len);
-}
-
diff --git a/ndb/src/ndbapi/NdbOperationDefine.cpp b/ndb/src/ndbapi/NdbOperationDefine.cpp
index 20134068075..c174c6a629a 100644
--- a/ndb/src/ndbapi/NdbOperationDefine.cpp
+++ b/ndb/src/ndbapi/NdbOperationDefine.cpp
@@ -34,6 +34,7 @@
#include "NdbUtil.hpp"
#include "NdbOut.hpp"
#include "NdbImpl.hpp"
+#include <NdbScanOperation.hpp>
#include <Interpreter.hpp>
@@ -261,30 +262,10 @@ NdbOperation::interpretedUpdateTuple()
theStatus = OperationDefined;
tNdbCon->theSimpleState = 0;
theOperationType = UpdateRequest;
- theInterpretIndicator = 1;
theAI_LenInCurrAI = 25;
theErrorLine = tErrorLine++;
- theTotalCurrAI_Len = 5;
- theSubroutineSize = 0;
- theInitialReadSize = 0;
- theInterpretedSize = 0;
- theFinalUpdateSize = 0;
- theFinalReadSize = 0;
-
- theFirstLabel = NULL;
- theLastLabel = NULL;
- theFirstBranch = NULL;
- theLastBranch = NULL;
-
- theFirstCall = NULL;
- theLastCall = NULL;
- theFirstSubroutine = NULL;
- theLastSubroutine = NULL;
-
- theNoOfLabels = 0;
- theNoOfSubroutines = 0;
-
+ initInterpreter();
return 0;
} else {
setErrorCode(4200);
@@ -304,30 +285,11 @@ NdbOperation::interpretedDeleteTuple()
theStatus = OperationDefined;
tNdbCon->theSimpleState = 0;
theOperationType = DeleteRequest;
- theInterpretIndicator = 1;
theErrorLine = tErrorLine++;
theAI_LenInCurrAI = 25;
- theTotalCurrAI_Len = 5;
- theSubroutineSize = 0;
- theInitialReadSize = 0;
- theInterpretedSize = 0;
- theFinalUpdateSize = 0;
- theFinalReadSize = 0;
-
- theFirstLabel = NULL;
- theLastLabel = NULL;
- theFirstBranch = NULL;
- theLastBranch = NULL;
-
- theFirstCall = NULL;
- theLastCall = NULL;
- theFirstSubroutine = NULL;
- theLastSubroutine = NULL;
-
- theNoOfLabels = 0;
- theNoOfSubroutines = 0;
+ initInterpreter();
return 0;
} else {
setErrorCode(4200);
@@ -347,14 +309,14 @@ NdbOperation::interpretedDeleteTuple()
* Remark: Define an attribute to retrieve in query.
*****************************************************************************/
NdbRecAttr*
-NdbOperation::getValue(const NdbColumnImpl* tAttrInfo, char* aValue)
+NdbOperation::getValue_impl(const NdbColumnImpl* tAttrInfo, char* aValue)
{
NdbRecAttr* tRecAttr;
if ((tAttrInfo != NULL) &&
(!tAttrInfo->m_indexOnly) &&
(theStatus != Init)){
if (theStatus == SetBound) {
- saveBoundATTRINFO();
+ ((NdbScanOperation*)this)->saveBoundATTRINFO();
theStatus = GetValue;
}
if (theStatus != GetValue) {
@@ -386,33 +348,15 @@ NdbOperation::getValue(const NdbColumnImpl* tAttrInfo, char* aValue)
// Insert Attribute Id into ATTRINFO part.
/************************************************************************
- * Get a Receive Attribute object and link it into the operation object.
- ************************************************************************/
- tRecAttr = theNdb->getRecAttr();
- if (tRecAttr != NULL) {
- if (theFirstRecAttr == NULL)
- theFirstRecAttr = tRecAttr;
- else
- theCurrentRecAttr->next(tRecAttr);
- theCurrentRecAttr = tRecAttr;
- tRecAttr->next(NULL);
-
- /**********************************************************************
- * Now set the attribute identity and the pointer to the data in
- * the RecAttr object
- * Also set attribute size, array size and attribute type
- ********************************************************************/
- if (tRecAttr->setup(tAttrInfo, aValue) == 0) {
- theErrorLine++;
- return tRecAttr;
- } else {
- setErrorCodeAbort(4000);
- return NULL;
- }
- } else {
+ * Get a Receive Attribute object and link it into the operation object.
+ ***********************************************************************/
+ if((tRecAttr = theReceiver.getValue(tAttrInfo, aValue)) != 0){
+ theErrorLine++;
+ return tRecAttr;
+ } else {
setErrorCodeAbort(4000);
return NULL;
- }//if getRecAttr failure
+ }
} else {
return NULL;
}//if insertATTRINFO failure
@@ -603,47 +547,6 @@ NdbOperation::setValue( const NdbColumnImpl* tAttrInfo,
return 0;
}//NdbOperation::setValue()
-/*
- * Define bound on index column in range scan.
- */
-int
-NdbOperation::setBound(const NdbColumnImpl* tAttrInfo, int type, const void* aValue, Uint32 len)
-{
- if (theOperationType == OpenRangeScanRequest &&
- theStatus == SetBound &&
- (0 <= type && type <= 4) &&
- aValue != NULL &&
- len <= 8000) {
- // bound type
- insertATTRINFO(type);
- // attribute header
- Uint32 sizeInBytes = tAttrInfo->m_attrSize * tAttrInfo->m_arraySize;
- if (len != sizeInBytes && (len != 0)) {
- setErrorCodeAbort(4209);
- return -1;
- }
- len = sizeInBytes;
- Uint32 tIndexAttrId = tAttrInfo->m_attrId;
- Uint32 sizeInWords = (len + 3) / 4;
- AttributeHeader ah(tIndexAttrId, sizeInWords);
- insertATTRINFO(ah.m_value);
- // attribute data
- if ((UintPtr(aValue) & 0x3) == 0 && (len & 0x3) == 0)
- insertATTRINFOloop((const Uint32*)aValue, sizeInWords);
- else {
- Uint32 temp[2000];
- memcpy(temp, aValue, len);
- while ((len & 0x3) != 0)
- ((char*)temp)[len++] = 0;
- insertATTRINFOloop(temp, sizeInWords);
- }
- return 0;
- } else {
- setErrorCodeAbort(4228); // XXX wrong code
- return -1;
- }
-}
-
/****************************************************************************
* int insertATTRINFO( Uint32 aData );
*
diff --git a/ndb/src/ndbapi/NdbOperationExec.cpp b/ndb/src/ndbapi/NdbOperationExec.cpp
index d00c527550d..3d8c2f29615 100644
--- a/ndb/src/ndbapi/NdbOperationExec.cpp
+++ b/ndb/src/ndbapi/NdbOperationExec.cpp
@@ -46,83 +46,6 @@ Documentation:
#include <NdbOut.hpp>
-/******************************************************************************
-int doSend()
-
-Return Value: Return >0 : send was succesful, returns number of signals sent
- Return -1: In all other case.
-Parameters: aProcessorId: Receiving processor node
-Remark: Sends the ATTRINFO signal(s)
-******************************************************************************/
-int
-NdbOperation::doSendScan(int aProcessorId)
-{
- Uint32 tSignalCount = 0;
- NdbApiSignal* tSignal;
-
- if (theInterpretIndicator != 1 ||
- (theOperationType != OpenScanRequest &&
- theOperationType != OpenRangeScanRequest)) {
- setErrorCodeAbort(4005);
- return -1;
- }
-
- assert(theSCAN_TABREQ != NULL);
- tSignal = theSCAN_TABREQ;
- if (tSignal->setSignal(GSN_SCAN_TABREQ) == -1) {
- setErrorCode(4001);
- return -1;
- }
- // Update the "attribute info length in words" in SCAN_TABREQ before
- // sending it. This could not be done in openScan because
- // we created the ATTRINFO signals after the SCAN_TABREQ signal.
- ScanTabReq * const scanTabReq = CAST_PTR(ScanTabReq, tSignal->getDataPtrSend());
- scanTabReq->attrLen = theTotalCurrAI_Len;
- if (theOperationType == OpenRangeScanRequest)
- scanTabReq->attrLen += theTotalBoundAI_Len;
- TransporterFacade *tp = TransporterFacade::instance();
- if (tp->sendSignal(tSignal, aProcessorId) == -1) {
- setErrorCode(4002);
- return -1;
- }
- tSignalCount++;
-
- tSignal = theFirstSCAN_TABINFO_Send;
- while (tSignal != NULL){
- if (tp->sendSignal(tSignal, aProcessorId)) {
- setErrorCode(4002);
- return -1;
- }
- tSignalCount++;
- tSignal = tSignal->next();
- }
-
- if (theOperationType == OpenRangeScanRequest) {
- // must have at least one signal since it contains attrLen for bounds
- assert(theBoundATTRINFO != NULL);
- tSignal = theBoundATTRINFO;
- while (tSignal != NULL) {
- if (tp->sendSignal(tSignal,aProcessorId) == -1){
- setErrorCode(4002);
- return -1;
- }
- tSignalCount++;
- tSignal = tSignal->next();
- }
- }
-
- tSignal = theFirstATTRINFO;
- while (tSignal != NULL) {
- if (tp->sendSignal(tSignal,aProcessorId) == -1){
- setErrorCode(4002);
- return -1;
- }
- tSignalCount++;
- tSignal = tSignal->next();
- }
- theStatus = WaitResponse;
- return tSignalCount;
-}//NdbOperation::doSendScan()
void
NdbOperation::setLastFlag(NdbApiSignal* signal, Uint32 lastFlag)
@@ -178,62 +101,6 @@ NdbOperation::doSend(int aNodeId, Uint32 lastFlag)
}//NdbOperation::doSend()
/***************************************************************************
-int prepareSendScan(Uint32 aTC_ConnectPtr,
- Uint64 aTransactionId)
-
-Return Value: Return 0 : preparation of send was succesful.
- Return -1: In all other case.
-Parameters: aTC_ConnectPtr: the Connect pointer to TC.
- aTransactionId: the Transaction identity of the transaction.
-Remark: Puts the the final data into ATTRINFO signal(s) after this
- we know the how many signal to send and their sizes
-***************************************************************************/
-int NdbOperation::prepareSendScan(Uint32 aTC_ConnectPtr,
- Uint64 aTransactionId){
-
- if (theInterpretIndicator != 1 ||
- (theOperationType != OpenScanRequest &&
- theOperationType != OpenRangeScanRequest)) {
- setErrorCodeAbort(4005);
- return -1;
- }
-
- if (theStatus == SetBound) {
- saveBoundATTRINFO();
- theStatus = GetValue;
- }
-
- theErrorLine = 0;
-
- // In preapareSendInterpreted we set the sizes (word 4-8) in the
- // first ATTRINFO signal.
- if (prepareSendInterpreted() == -1)
- return -1;
-
- const Uint32 transId1 = (Uint32) (aTransactionId & 0xFFFFFFFF);
- const Uint32 transId2 = (Uint32) (aTransactionId >> 32);
-
- if (theOperationType == OpenRangeScanRequest) {
- NdbApiSignal* tSignal = theBoundATTRINFO;
- do{
- tSignal->setData(aTC_ConnectPtr, 1);
- tSignal->setData(transId1, 2);
- tSignal->setData(transId2, 3);
- tSignal = tSignal->next();
- } while (tSignal != NULL);
- }
- theCurrentATTRINFO->setLength(theAI_LenInCurrAI);
- NdbApiSignal* tSignal = theFirstATTRINFO;
- do{
- tSignal->setData(aTC_ConnectPtr, 1);
- tSignal->setData(transId1, 2);
- tSignal->setData(transId2, 3);
- tSignal = tSignal->next();
- } while (tSignal != NULL);
- return 0;
-}
-
-/***************************************************************************
int prepareSend(Uint32 aTC_ConnectPtr,
Uint64 aTransactionId)
@@ -457,6 +324,7 @@ NdbOperation::prepareSend(Uint32 aTC_ConnectPtr, Uint64 aTransId)
theTCREQ->setLength(tcKeyReq->getAIInTcKeyReq(tReqInfo) +
tAttrInfoIndex + TcKeyReq::StaticLength);
+
tAIDataPtr[0] = Tdata1;
tAIDataPtr[1] = Tdata2;
tAIDataPtr[2] = Tdata3;
@@ -479,9 +347,8 @@ NdbOperation::prepareSend(Uint32 aTC_ConnectPtr, Uint64 aTransId)
tSignal = tnextSignal;
} while (tSignal != NULL);
}//if
- NdbRecAttr* tRecAttrObject = theFirstRecAttr;
theStatus = WaitResponse;
- theCurrentRecAttr = tRecAttrObject;
+ theReceiver.prepareSend();
return 0;
}//NdbOperation::prepareSend()
@@ -648,71 +515,10 @@ NdbOperation::prepareSendInterpreted()
theFirstATTRINFO->setData(tFinalReadSize, 7);
theFirstATTRINFO->setData(tSubroutineSize, 8);
}//if
+ theReceiver.prepareSend();
return 0;
}//NdbOperation::prepareSendInterpreted()
-/***************************************************************************
-int TCOPCONF(int anAttrInfoLen)
-
-Return Value: Return 0 : send was succesful.
- Return -1: In all other case.
-Parameters: anAttrInfoLen: The length of the attribute information from TC.
-Remark: Handles the reception of the TC[KEY/INDX]CONF signal.
-***************************************************************************/
-void
-NdbOperation::TCOPCONF(Uint32 anAttrInfoLen)
-{
- Uint32 tCurrRecLen = theCurrRecAI_Len;
- if (theStatus == WaitResponse) {
- theTotalRecAI_Len = anAttrInfoLen;
- if (anAttrInfoLen == tCurrRecLen) {
- Uint32 tAI_ElemLen = theAI_ElementLen;
- NdbRecAttr* tCurrRecAttr = theCurrentRecAttr;
- theStatus = Finished;
-
- if ((tAI_ElemLen == 0) &&
- (tCurrRecAttr == NULL)) {
- NdbRecAttr* tRecAttr = theFirstRecAttr;
- while (tRecAttr != NULL) {
- if (tRecAttr->copyoutRequired()) // copy to application buffer
- tRecAttr->copyout();
- tRecAttr = tRecAttr->next();
- }
- theNdbCon->OpCompleteSuccess();
- return;
- } else if (tAI_ElemLen != 0) {
- setErrorCode(4213);
- theNdbCon->OpCompleteFailure();
- return;
- } else {
- setErrorCode(4214);
- theNdbCon->OpCompleteFailure();
- return;
- }//if
- } else if (anAttrInfoLen > tCurrRecLen) {
- return;
- } else {
- theStatus = Finished;
-
- if (theAI_ElementLen != 0) {
- setErrorCode(4213);
- theNdbCon->OpCompleteFailure();
- return;
- }//if
- if (theCurrentRecAttr != NULL) {
- setErrorCode(4214);
- theNdbCon->OpCompleteFailure();
- return;
- }//if
- theNdbCon->OpCompleteFailure();
- return;
- }//if
- } else {
- setErrorCode(4004);
- }//if
- return;
-}//NdbOperation::TCKEYOPCONF()
-
int
NdbOperation::checkState_TransId(NdbApiSignal* aSignal)
{
@@ -777,188 +583,13 @@ NdbOperation::receiveTCKEYREF( NdbApiSignal* aSignal)
}//NdbOperation::receiveTCKEYREF()
-/***************************************************************************
-int receiveREAD_CONF( NdbApiSignal* aSignal)
-
-Return Value: Return 0 : send was succesful.
- Return -1: In all other case.
-Parameters: aSignal: the signal object that contains the READCONF signal from TUP.
-Remark: Handles the reception of the READCONF signal.
-***************************************************************************/
-int
-NdbOperation::receiveREAD_CONF(const Uint32* aDataPtr, Uint32 aDataLength)
-{
- Uint64 tRecTransId, tCurrTransId;
- Uint32 tCondFlag = (Uint32)(theStatus - WaitResponse);
- Uint32 tTotLen = aDataPtr[3];
-
- tRecTransId = (Uint64)aDataPtr[1] + ((Uint64)aDataPtr[2] << 32);
- tCurrTransId = theNdbCon->getTransactionId();
- tCondFlag |= (Uint32)((tRecTransId - tCurrTransId) != (Uint64)0);
- tCondFlag |= (Uint32)(aDataLength < 4);
-
- if (tCondFlag == 0) {
- theTotalRecAI_Len = tTotLen;
- int tRetValue = receiveREAD_AI((Uint32*)&aDataPtr[4], (aDataLength - 4));
- if (theStatus == Finished) {
- return tRetValue;
- } else {
- theStatus = Finished;
- return theNdbCon->OpCompleteFailure();
- }//if
- }//if
-#ifdef NDB_NO_DROPPED_SIGNAL
- abort();
-#endif
- return -1;
-}//NdbOperation::receiveREAD_CONF()
-
-/***************************************************************************
-int receiveTRANSID_AI( NdbApiSignal* aSignal)
-
-Return Value: Return 0 : send was succesful.
- Return -1: In all other case.
-Parameters: aSignal: the signal object that contains the TRANSID_AI signal.
-Remark: Handles the reception of the TRANSID_AI signal.
-***************************************************************************/
-int
-NdbOperation::receiveTRANSID_AI(const Uint32* aDataPtr, Uint32 aDataLength)
-{
- Uint64 tRecTransId, tCurrTransId;
- Uint32 tCondFlag = (Uint32)(theStatus - WaitResponse);
-
- tRecTransId = (Uint64)aDataPtr[1] + ((Uint64)aDataPtr[2] << 32);
- tCurrTransId = theNdbCon->getTransactionId();
- tCondFlag |= (Uint32)((tRecTransId - tCurrTransId) != (Uint64)0);
- tCondFlag |= (Uint32)(aDataLength < 3);
-
- if (tCondFlag == 0) {
- return receiveREAD_AI((Uint32*)&aDataPtr[3], (aDataLength - 3));
- }//if
-#ifdef NDB_NO_DROPPED_SIGNAL
- abort();
-#endif
- return -1;
-}//NdbOperation::receiveTRANSID_AI()
-
-/***************************************************************************
-int receiveREAD_AI( NdbApiSignal* aSignal, int aLength, int aStartPos)
-
-Return Value: Return 0 : send was succesoccurredful.
- Return -1: In all other case.
-Parameters: aSignal: the signal object that contains the LEN_ATTRINFO11 signal.
- aLength:
- aStartPos:
-Remark: Handles the reception of the LEN_ATTRINFO11 signal.
-***************************************************************************/
-int
-NdbOperation::receiveREAD_AI(Uint32* aDataPtr, Uint32 aLength)
-{
-
- register Uint32 tAI_ElementLen = theAI_ElementLen;
- register Uint32* tCurrElemPtr = theCurrElemPtr;
- if (theError.code == 0) {
- // If inconsistency error occurred we will still continue
- // receiving signals since we need to know whether commit
- // has occurred.
-
- register Uint32 tData;
- for (register Uint32 i = 0; i < aLength ; i++, aDataPtr++)
- {
- // Code to receive Attribute Information
- tData = *aDataPtr;
- if (tAI_ElementLen != 0) {
- tAI_ElementLen--;
- *tCurrElemPtr = tData;
- tCurrElemPtr++;
- continue;
- } else {
- // Waiting for a new attribute element
- NdbRecAttr* tWorkingRecAttr;
-
- tWorkingRecAttr = theCurrentRecAttr;
- AttributeHeader ah(tData);
- const Uint32 tAttrId = ah.getAttributeId();
- const Uint32 tAttrSize = ah.getDataSize();
- if ((tWorkingRecAttr != NULL) &&
- (tWorkingRecAttr->attrId() == tAttrId)) {
- ;
- } else {
- setErrorCode(4211);
- break;
- }//if
- theCurrentRecAttr = tWorkingRecAttr->next();
- NdbColumnImpl * col = m_currentTable->getColumn(tAttrId);
- if (ah.isNULL()) {
- // Return a Null value from the NDB to the attribute.
- if(col != 0 && col->m_nullable) {
- tWorkingRecAttr->setNULL();
- tAI_ElementLen = 0;
- } else {
- setErrorCode(4212);
- break;
- }//if
- } else {
- // Return a value from the NDB to the attribute.
- tWorkingRecAttr->setNotNULL();
- const Uint32 sizeInBytes = col->m_attrSize * col->m_arraySize;
- const Uint32 sizeInWords = (sizeInBytes + 3) / 4;
- tAI_ElementLen = tAttrSize;
- tCurrElemPtr = (Uint32*)tWorkingRecAttr->aRef();
- if (sizeInWords == tAttrSize){
- continue;
- } else {
- setErrorCode(4201);
- break;
- }//if
- }//if
- }//if
- }//for
- }//if
- Uint32 tCurrRecLen = theCurrRecAI_Len;
- Uint32 tTotRecLen = theTotalRecAI_Len;
- theAI_ElementLen = tAI_ElementLen;
- theCurrElemPtr = tCurrElemPtr;
- tCurrRecLen = tCurrRecLen + aLength;
- theCurrRecAI_Len = tCurrRecLen; // Update Current Received AI Length
- if (tTotRecLen == tCurrRecLen){ // Operation completed
- NdbRecAttr* tCurrRecAttr = theCurrentRecAttr;
- theStatus = Finished;
-
- NdbConnection* tNdbCon = theNdbCon;
- if ((tAI_ElementLen == 0) &&
- (tCurrRecAttr == NULL)) {
- NdbRecAttr* tRecAttr = theFirstRecAttr;
- while (tRecAttr != NULL) {
- if (tRecAttr->copyoutRequired()) // copy to application buffer
- tRecAttr->copyout();
- tRecAttr = tRecAttr->next();
- }
- return tNdbCon->OpCompleteSuccess();
- } else if (tAI_ElementLen != 0) {
- setErrorCode(4213);
- return tNdbCon->OpCompleteFailure();
- } else {
- setErrorCode(4214);
- return tNdbCon->OpCompleteFailure();
- }//if
- }
- else if ((tCurrRecLen > tTotRecLen) &&
- (tTotRecLen > 0)) { /* == 0 if TCKEYCONF not yet received */
- setErrorCode(4215);
- theStatus = Finished;
-
- return theNdbCon->OpCompleteFailure();
- }//if
- return -1; // Continue waiting for more signals of this operation
-}//NdbOperation::receiveREAD_AI()
void
NdbOperation::handleFailedAI_ElemLen()
{
- NdbRecAttr* tRecAttr = theFirstRecAttr;
+ NdbRecAttr* tRecAttr = theReceiver.theFirstRecAttr;
while (tRecAttr != NULL) {
- tRecAttr->setUNDEFINED();
+ tRecAttr->setNULL();
tRecAttr = tRecAttr->next();
}//while
}//NdbOperation::handleFailedAI_ElemLen()
diff --git a/ndb/src/ndbapi/NdbOperationInt.cpp b/ndb/src/ndbapi/NdbOperationInt.cpp
index e61fc5b05d7..9abcfd6ef33 100644
--- a/ndb/src/ndbapi/NdbOperationInt.cpp
+++ b/ndb/src/ndbapi/NdbOperationInt.cpp
@@ -33,6 +33,7 @@ Adjust: 991029 UABRONM First version.
#include "NdbRecAttr.hpp"
#include "NdbUtil.hpp"
#include "Interpreter.hpp"
+#include <NdbScanOperation.hpp>
#ifdef VM_TRACE
#include <NdbEnv.h>
@@ -43,6 +44,31 @@ Adjust: 991029 UABRONM First version.
#define INT_DEBUG(x)
#endif
+void
+NdbOperation::initInterpreter(){
+ theFirstLabel = NULL;
+ theLastLabel = NULL;
+ theFirstBranch = NULL;
+ theLastBranch = NULL;
+
+ theFirstCall = NULL;
+ theLastCall = NULL;
+ theFirstSubroutine = NULL;
+ theLastSubroutine = NULL;
+
+ theNoOfLabels = 0;
+ theNoOfSubroutines = 0;
+
+ theSubroutineSize = 0;
+ theInitialReadSize = 0;
+ theInterpretedSize = 0;
+ theFinalUpdateSize = 0;
+ theFinalReadSize = 0;
+ theInterpretIndicator = 1;
+
+ theTotalCurrAI_Len = 5;
+}
+
int
NdbOperation::incCheck(const NdbColumnImpl* tNdbColumnImpl)
{
@@ -191,7 +217,7 @@ NdbOperation::initial_interpreterCheck()
{
if ((theInterpretIndicator == 1)) {
if (theStatus == SetBound) {
- saveBoundATTRINFO();
+ ((NdbScanOperation*)this)->saveBoundATTRINFO();
theStatus = GetValue;
}
if (theStatus == ExecInterpretedValue) {
diff --git a/ndb/src/ndbapi/NdbOperationScan.cpp b/ndb/src/ndbapi/NdbOperationScan.cpp
index 0c377d3fd98..283eb591bdb 100644
--- a/ndb/src/ndbapi/NdbOperationScan.cpp
+++ b/ndb/src/ndbapi/NdbOperationScan.cpp
@@ -14,563 +14,3 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
-#include "NdbOperation.hpp"
-#include "NdbScanReceiver.hpp"
-
-#include <signaldata/TcKeyReq.hpp>
-#include <signaldata/ScanTab.hpp>
-#include <signaldata/ScanFrag.hpp>
-#include <signaldata/KeyInfo.hpp>
-
-
-/******************************************************************************
- * int openScanRead();
- *****************************************************************************/
-int
-NdbOperation::openScanRead(Uint32 aParallelism)
-{
- aParallelism = checkParallelism(aParallelism);
-
- if ((theNdbCon->theCommitStatus != NdbConnection::Started) &&
- (theStatus != Init) &&
- (aParallelism == 0)) {
- setErrorCode(4200);
- return -1;
- }
- return openScan(aParallelism, false, false, false);
-}
-
-/****************************************************************************
- * int openScanExclusive();
- ****************************************************************************/
-int
-NdbOperation::openScanExclusive(Uint32 aParallelism)
-{
- aParallelism = checkParallelism(aParallelism);
-
- if ((theNdbCon->theCommitStatus != NdbConnection::Started) &&
- (theStatus != Init) &&
- (aParallelism == 0)) {
- setErrorCode(4200);
- return -1;
- }
- return openScan(aParallelism, true, true, false);
-}
-
-/******************************************************************************
- * int openScanReadHoldLock();
- *****************************************************************************/
-int
-NdbOperation::openScanReadHoldLock(Uint32 aParallelism)
-{
- aParallelism = checkParallelism(aParallelism);
-
- if ((theNdbCon->theCommitStatus != NdbConnection::Started) &&
- (theStatus != Init) &&
- (aParallelism == 0)) {
- setErrorCode(4200);
- return -1;
- }
- return openScan(aParallelism, false, true, false);
-}
-
-/******************************************************************************
- * int openScanReadCommitted();
- *****************************************************************************/
-int
-NdbOperation::openScanReadCommitted(Uint32 aParallelism)
-{
- aParallelism = checkParallelism(aParallelism);
-
- if ((theNdbCon->theCommitStatus != NdbConnection::Started) &&
- (theStatus != Init) &&
- (aParallelism == 0)) {
- setErrorCode(4200);
- return -1;
- }
- return openScan(aParallelism, false, false, true);
-}
-
-/****************************************************************************
- * int checkParallelism();
- * Remark If the parallelism is set wrong the number of scan-operations
- * will not correspond to the number of TRANSID_AI signals returned
- * from NDB and the result will be a crash, therefore
- * we adjust it or return an error if the value is totally wrong.
- ****************************************************************************/
-int
-NdbOperation::checkParallelism(Uint32 aParallelism)
-{
- if (aParallelism == 0) {
- setErrorCodeAbort(4232);
- return 0;
- }
- if (aParallelism > 16) {
- if (aParallelism <= 240) {
-
- /**
- * If tscanConcurrency > 16 it must be a multiple of 16
- */
- if (((aParallelism >> 4) << 4) < aParallelism) {
- aParallelism = ((aParallelism >> 4) << 4) + 16;
- }//if
-
- /*---------------------------------------------------------------*/
- /* We cannot have a parallelism > 16 per node */
- /*---------------------------------------------------------------*/
- if ((aParallelism / theNdb->theNoOfDBnodes) > 16) {
- aParallelism = theNdb->theNoOfDBnodes * 16;
- }//if
-
- } else {
- setErrorCodeAbort(4232);
- aParallelism = 0;
- }//if
- }//if
- return aParallelism;
-}//NdbOperation::checkParallelism()
-
-/**********************************************************************
- * int openScan();
- *************************************************************************/
-int
-NdbOperation::openScan(Uint32 aParallelism,
- bool lockMode, bool lockHoldMode, bool readCommitted)
-{
- aParallelism = checkParallelism(aParallelism);
- if(aParallelism == 0){
- return 0;
- }
- NdbScanReceiver* tScanRec;
- // It is only possible to call openScan if
- // 1. this transcation don't already contain another scan operation
- // 2. this transaction don't already contain other operations
- // 3. theScanOp contains a NdbScanOperation
- if (theNdbCon->theScanningOp != NULL){
- setErrorCode(4605);
- return -1;
- }
-
- if ((theNdbCon->theFirstOpInList != this) ||
- (theNdbCon->theLastOpInList != this)) {
- setErrorCode(4603);
- return -1;
- }
- theNdbCon->theScanningOp = this;
-
- initScan();
- theParallelism = aParallelism;
-
- // If the scan is on ordered index then it is a range scan
- if (m_currentTable->m_indexType == NdbDictionary::Index::OrderedIndex ||
- m_currentTable->m_indexType == NdbDictionary::Index::UniqueOrderedIndex) {
- assert(m_currentTable == m_accessTable);
- m_currentTable = theNdb->theDictionary->getTable(m_currentTable->m_primaryTable.c_str());
- assert(m_currentTable != NULL);
- // Modify operation state
- theStatus = SetBound;
- theOperationType = OpenRangeScanRequest;
- }
-
- theScanReceiversArray = new NdbScanReceiver* [aParallelism];
- if (theScanReceiversArray == NULL){
- setErrorCodeAbort(4000);
- return -1;
- }
-
- for (Uint32 i = 0; i < aParallelism; i ++) {
- tScanRec = theNdb->getNdbScanRec();
- if (tScanRec == NULL) {
- setErrorCodeAbort(4000);
- return -1;
- }//if
- tScanRec->init(this, lockMode);
- theScanReceiversArray[i] = tScanRec;
- }
-
- theSCAN_TABREQ = theNdb->getSignal();
- if (theSCAN_TABREQ == NULL) {
- setErrorCodeAbort(4000);
- return -1;
- }//if
- ScanTabReq * const scanTabReq = CAST_PTR(ScanTabReq, theSCAN_TABREQ->getDataPtrSend());
- scanTabReq->apiConnectPtr = theNdbCon->theTCConPtr;
- scanTabReq->tableId = m_accessTable->m_tableId;
- scanTabReq->tableSchemaVersion = m_accessTable->m_version;
- scanTabReq->storedProcId = 0xFFFF;
- scanTabReq->buddyConPtr = theNdbCon->theBuddyConPtr;
-
- Uint32 reqInfo = 0;
- ScanTabReq::setParallelism(reqInfo, aParallelism);
- ScanTabReq::setLockMode(reqInfo, lockMode);
- ScanTabReq::setHoldLockFlag(reqInfo, lockHoldMode);
- ScanTabReq::setReadCommittedFlag(reqInfo, readCommitted);
- if (theOperationType == OpenRangeScanRequest)
- ScanTabReq::setRangeScanFlag(reqInfo, true);
- scanTabReq->requestInfo = reqInfo;
-
- Uint64 transId = theNdbCon->getTransactionId();
- scanTabReq->transId1 = (Uint32) transId;
- scanTabReq->transId2 = (Uint32) (transId >> 32);
-
- for (Uint32 i = 0; i < 16 && i < aParallelism ; i++) {
- scanTabReq->apiOperationPtr[i] = theScanReceiversArray[i]->ptr2int();
- }//for
-
- // Create one additional SCAN_TABINFO for each
- // 16 of parallelism
- NdbApiSignal* tSignal;
- Uint32 tParallelism = aParallelism;
- while (tParallelism > 16) {
- tSignal = theNdb->getSignal();
- if (tSignal == NULL) {
- setErrorCodeAbort(4000);
- return -1;
- }//if
- if (tSignal->setSignal(GSN_SCAN_TABINFO) == -1) {
- setErrorCode(4001);
- return -1;
- }
- tSignal->next(theFirstSCAN_TABINFO_Send);
- theFirstSCAN_TABINFO_Send = tSignal;
- tParallelism -= 16;
- }//while
-
- // Format all SCAN_TABINFO signals
- tParallelism = 16;
- tSignal = theFirstSCAN_TABINFO_Send;
- while (tSignal != NULL) {
- tSignal->setData(theNdbCon->theTCConPtr, 1);
- for (int i = 0; i < 16 ; i++) {
- tSignal->setData(theScanReceiversArray[i + tParallelism]->ptr2int(), i + 2);
- }//for
- tSignal = tSignal->next();
- tParallelism += 16;
- }//while
-
- getFirstATTRINFOScan();
- return 0;
-}//NdbScanOperation::openScan()
-
-/*****************************************************************************
- * int getFirstATTRINFOScan( U_int32 aData )
- *
- * Return Value: Return 0: Successful
- * Return -1: All other cases
- * Parameters: None: Only allocate the first signal.
- * Remark: When a scan is defined we need to use this method instead
- * of insertATTRINFO for the first signal.
- * This is because we need not to mess up the code in
- * insertATTRINFO with if statements since we are not
- * interested in the TCKEYREQ signal.
- *****************************************************************************/
-int
-NdbOperation::getFirstATTRINFOScan()
-{
- NdbApiSignal* tSignal;
-
- tSignal = theNdb->getSignal();
- if (tSignal == NULL){
- setErrorCodeAbort(4000);
- return -1;
- }
- tSignal->setSignal(m_attrInfoGSN);
- theAI_LenInCurrAI = 8;
- theATTRINFOptr = &tSignal->getDataPtrSend()[8];
- theFirstATTRINFO = tSignal;
- theCurrentATTRINFO = tSignal;
- theCurrentATTRINFO->next(NULL);
- return 0;
-}
-
-/*
- * After setBound() are done, move the accumulated ATTRINFO signals to
- * a separate list. Then continue with normal scan.
- */
-int
-NdbOperation::saveBoundATTRINFO()
-{
- theCurrentATTRINFO->setLength(theAI_LenInCurrAI);
- theBoundATTRINFO = theFirstATTRINFO;
- theTotalBoundAI_Len = theTotalCurrAI_Len;
- theTotalCurrAI_Len = 5;
- theBoundATTRINFO->setData(theTotalBoundAI_Len, 4);
- theBoundATTRINFO->setData(0, 5);
- theBoundATTRINFO->setData(0, 6);
- theBoundATTRINFO->setData(0, 7);
- theBoundATTRINFO->setData(0, 8);
- theStatus = GetValue;
- return getFirstATTRINFOScan();
-}
-
-/*****************************************************************************
- * void releaseScan()
- *
- * Return Value No return value.
- * Parameters: No parameters.
- * Remark: Release objects after scanning.
- *****************************************************************************/
-void
-NdbOperation::releaseScan()
-{
- NdbScanReceiver* tScanRec;
- TransporterFacade::instance()->lock_mutex();
- for (Uint32 i = 0; i < theParallelism && theScanReceiversArray != NULL; i++) {
- tScanRec = theScanReceiversArray[i];
- if (tScanRec != NULL) {
- tScanRec->release();
- tScanRec->next(NULL);
- }
- }
- TransporterFacade::instance()->unlock_mutex();
- releaseSignals();
-
- if (theScanReceiversArray != NULL) {
- for (Uint32 i = 0; i < theParallelism; i++) {
- NdbScanReceiver* tScanRec;
- tScanRec = theScanReceiversArray[i];
- if (tScanRec != NULL) {
- theNdb->releaseNdbScanRec(tScanRec);
- theScanReceiversArray[i] = NULL;
- }
- }
-
- delete [] theScanReceiversArray;
- }//if
- theScanReceiversArray = NULL;
-
- if (theSCAN_TABREQ != NULL){
- theNdb->releaseSignal(theSCAN_TABREQ);
- theSCAN_TABREQ = NULL;
- }
-}
-
-void NdbOperation::releaseSignals(){
- theNdb->releaseSignalsInList(&theFirstSCAN_TABINFO_Send);
- theFirstSCAN_TABINFO_Send = NULL;
- theLastSCAN_TABINFO_Send = NULL;
- // theNdb->releaseSignalsInList(&theFirstSCAN_TABINFO_Recv);
-
- while(theFirstSCAN_TABINFO_Recv != NULL){
- NdbApiSignal* tmp = theFirstSCAN_TABINFO_Recv;
- theFirstSCAN_TABINFO_Recv = tmp->next();
- delete tmp;
- }
- theFirstSCAN_TABINFO_Recv = NULL;
- theLastSCAN_TABINFO_Recv = NULL;
- if (theSCAN_TABCONF_Recv != NULL){
- // theNdb->releaseSignal(theSCAN_TABCONF_Recv);
- delete theSCAN_TABCONF_Recv;
- theSCAN_TABCONF_Recv = NULL;
- }
-}
-
-
-void NdbOperation::prepareNextScanResult(){
- NdbScanReceiver* tScanRec;
- for (Uint32 i = 0; i < theParallelism; i++) {
- tScanRec = theScanReceiversArray[i];
- assert(tScanRec != NULL);
- tScanRec->prepareNextScanResult();
- tScanRec->next(NULL);
- }
- releaseSignals();
-}
-
-/******************************************************************************
- * void initScan();
- *
- * Return Value: Return 0 : init was successful.
- * Return -1: In all other case.
- * Remark: Initiates operation record after allocation.
- *****************************************************************************/
-void
-NdbOperation::initScan()
-{
- theTotalRecAI_Len = 0;
- theCurrRecAI_Len = 0;
- theStatus = GetValue;
- theOperationType = OpenScanRequest;
- theCurrentRecAttr = theFirstRecAttr;
- theScanInfo = 0;
- theMagicNumber = 0xABCDEF01;
- theTotalCurrAI_Len = 5;
-
- theFirstLabel = NULL;
- theLastLabel = NULL;
- theFirstBranch = NULL;
- theLastBranch = NULL;
-
- theFirstCall = NULL;
- theLastCall = NULL;
- theFirstSubroutine = NULL;
- theLastSubroutine = NULL;
-
- theNoOfLabels = 0;
- theNoOfSubroutines = 0;
-
- theSubroutineSize = 0;
- theInitialReadSize = 0;
- theInterpretedSize = 0;
- theFinalUpdateSize = 0;
- theFinalReadSize = 0;
- theInterpretIndicator = 1;
-
-
- theFirstSCAN_TABINFO_Send = NULL;
- theLastSCAN_TABINFO_Send = NULL;
- theFirstSCAN_TABINFO_Recv = NULL;
- theLastSCAN_TABINFO_Recv = NULL;
- theSCAN_TABCONF_Recv = NULL;
-
- theScanReceiversArray = NULL;
-
- theTotalBoundAI_Len = 0;
- theBoundATTRINFO = NULL;
- return;
-}
-
-NdbOperation* NdbOperation::takeOverForDelete(NdbConnection* updateTrans){
- return takeOverScanOp(DeleteRequest, updateTrans);
-}
-
-NdbOperation* NdbOperation::takeOverForUpdate(NdbConnection* updateTrans){
- return takeOverScanOp(UpdateRequest, updateTrans);
-}
-/******************************************************************************
- * NdbOperation* takeOverScanOp(NdbConnection* updateTrans);
- *
- * Parameters: The update transactions NdbConnection pointer.
- * Return Value: A reference to the transferred operation object
- * or NULL if no success.
- * Remark: Take over the scanning transactions NdbOperation
- * object for a tuple to an update transaction,
- * which is the last operation read in nextScanResult()
- * (theNdbCon->thePreviousScanRec)
- *
- * FUTURE IMPLEMENTATION: (This note was moved from header file.)
- * In the future, it will even be possible to transfer
- * to a NdbConnection on another Ndb-object.
- * In this case the receiving NdbConnection-object must call
- * a method receiveOpFromScan to actually receive the information.
- * This means that the updating transactions can be placed
- * in separate threads and thus increasing the parallelism during
- * the scan process.
- *****************************************************************************/
-NdbOperation*
-NdbOperation::takeOverScanOp(OperationType opType, NdbConnection* updateTrans)
-{
- if (opType != UpdateRequest && opType != DeleteRequest) {
- setErrorCode(4604);
- return NULL;
- }
-
- const NdbScanReceiver* tScanRec = theNdbCon->thePreviousScanRec;
- if (tScanRec == NULL){
- // No operation read by nextScanResult
- setErrorCode(4609);
- return NULL;
- }
-
- if (tScanRec->theFirstKEYINFO20_Recv == NULL){
- // No KEYINFO20 received
- setErrorCode(4608);
- return NULL;
- }
-
- NdbOperation * newOp = updateTrans->getNdbOperation(m_currentTable);
- if (newOp == NULL){
- return NULL;
- }
-
- /**
- * Copy and caclulate attributes from the scanned operation to the
- * new operation
- */
- const KeyInfo20 * const firstKeyInfo20 =
- CAST_CONSTPTR(KeyInfo20, tScanRec->theFirstKEYINFO20_Recv->getDataPtr());
- const Uint32 totalKeyLen = firstKeyInfo20->keyLen;
- newOp->theTupKeyLen = totalKeyLen;
-
- newOp->theOperationType = opType;
- if (opType == DeleteRequest) {
- newOp->theStatus = GetValue;
- } else {
- newOp->theStatus = SetValue;
- }
- const Uint32 tScanInfo = firstKeyInfo20->scanInfo_Node & 0xFFFF;
- const Uint32 tTakeOverNode = firstKeyInfo20->scanInfo_Node >> 16;
- {
- UintR scanInfo = 0;
- TcKeyReq::setTakeOverScanFlag(scanInfo, 1);
- TcKeyReq::setTakeOverScanNode(scanInfo, tTakeOverNode);
- TcKeyReq::setTakeOverScanInfo(scanInfo, tScanInfo);
- newOp->theScanInfo = scanInfo;
- }
-
- /**
- * Copy received KEYINFO20 signals into TCKEYREQ and KEYINFO signals
- * put them in list of the new op
- */
- TcKeyReq * const tcKeyReq =
- CAST_PTR(TcKeyReq, newOp->theTCREQ->getDataPtrSend());
-
- // Copy the first 8 words of key info from KEYINF20 into TCKEYREQ
- for (Uint32 i = 0; i < TcKeyReq::MaxKeyInfo; i++) {
- tcKeyReq->keyInfo[i] = firstKeyInfo20->keyData[i];
- }
- if (totalKeyLen > TcKeyReq::MaxKeyInfo) {
-
- Uint32 keyWordsCopied = TcKeyReq::MaxKeyInfo;
-
- // Create KEYINFO signals in newOp
- for (Uint32 i = keyWordsCopied; i < totalKeyLen; i += KeyInfo::DataLength){
- NdbApiSignal* tSignal = theNdb->getSignal();
- if (tSignal == NULL){
- setErrorCodeAbort(4000);
- return NULL;
- }
- if (tSignal->setSignal(GSN_KEYINFO) == -1){
- setErrorCodeAbort(4001);
- return NULL;
- }
- tSignal->next(newOp->theFirstKEYINFO);
- newOp->theFirstKEYINFO = tSignal;
- }
-
- // Init pointers to KEYINFO20 signal
- NdbApiSignal* currKeyInfo20 = tScanRec->theFirstKEYINFO20_Recv;
- const KeyInfo20 * keyInfo20 =
- CAST_CONSTPTR(KeyInfo20, currKeyInfo20->getDataPtr());
- Uint32 posInKeyInfo20 = keyWordsCopied;
-
- // Init pointers to KEYINFO signal
- NdbApiSignal* currKeyInfo = newOp->theFirstKEYINFO;
- KeyInfo * keyInfo = CAST_PTR(KeyInfo, currKeyInfo->getDataPtrSend());
- Uint32 posInKeyInfo = 0;
-
- // Copy from KEYINFO20 to KEYINFO
- while(keyWordsCopied < totalKeyLen){
- keyInfo->keyData[posInKeyInfo++] = keyInfo20->keyData[posInKeyInfo20++];
- keyWordsCopied++;
- if(keyWordsCopied >= totalKeyLen)
- break;
- if (posInKeyInfo20 >=
- (currKeyInfo20->getLength()-KeyInfo20::HeaderLength)){
- currKeyInfo20 = currKeyInfo20->next();
- keyInfo20 = CAST_CONSTPTR(KeyInfo20, currKeyInfo20->getDataPtr());
- posInKeyInfo20 = 0;
- }
- if (posInKeyInfo >= KeyInfo::DataLength){
- currKeyInfo = currKeyInfo->next();
- keyInfo = CAST_PTR(KeyInfo, currKeyInfo->getDataPtrSend());
- posInKeyInfo = 0;
- }
- }
- }
-
- return newOp;
-}
-
-
-
diff --git a/ndb/src/ndbapi/NdbRecAttr.cpp b/ndb/src/ndbapi/NdbRecAttr.cpp
index 0f7baeac4f5..18ce59745d0 100644
--- a/ndb/src/ndbapi/NdbRecAttr.cpp
+++ b/ndb/src/ndbapi/NdbRecAttr.cpp
@@ -57,6 +57,8 @@ NdbRecAttr::setup(const NdbColumnImpl* anAttrInfo, char* aValue)
theAttrSize = tAttrSize;
theArraySize = tArraySize;
theValue = aValue;
+ theNULLind = 0;
+ m_nullable = anAttrInfo->m_nullable;
// check alignment to signal data
// a future version could check alignment per data type as well
@@ -124,3 +126,19 @@ NdbRecAttr::clone() const {
memcpy(ret->theRef, theRef, n);
return ret;
}
+
+bool
+NdbRecAttr::receive_data(const Uint32 * data, Uint32 sz){
+ const Uint32 n = (theAttrSize * theArraySize + 3) >> 2;
+ if(n == sz){
+ if(!copyoutRequired())
+ memcpy(theRef, data, 4 * sz);
+ else
+ memcpy(theValue, data, theAttrSize * theArraySize);
+ return true;
+ } else if(sz == 0){
+ setNULL();
+ return true;
+ }
+ return false;
+}
diff --git a/ndb/src/ndbapi/NdbReceiver.cpp b/ndb/src/ndbapi/NdbReceiver.cpp
index 4c461698a4a..7a538de3d7c 100644
--- a/ndb/src/ndbapi/NdbReceiver.cpp
+++ b/ndb/src/ndbapi/NdbReceiver.cpp
@@ -16,6 +16,10 @@
#include "NdbImpl.hpp"
#include <NdbReceiver.hpp>
+#include "NdbDictionaryImpl.hpp"
+#include <NdbRecAttr.hpp>
+#include <AttributeHeader.hpp>
+#include <NdbConnection.hpp>
NdbReceiver::NdbReceiver(Ndb *aNdb) :
theMagicNumber(0),
@@ -24,10 +28,11 @@ NdbReceiver::NdbReceiver(Ndb *aNdb) :
m_type(NDB_UNINITIALIZED),
m_owner(0)
{
+ theCurrentRecAttr = theFirstRecAttr = 0;
}
void
-NdbReceiver::init(ReceiverType type, void* owner)
+NdbReceiver::init(ReceiverType type, void* owner, bool keyInfo)
{
theMagicNumber = 0x11223344;
m_type = type;
@@ -36,6 +41,24 @@ NdbReceiver::init(ReceiverType type, void* owner)
if (m_ndb)
m_id = m_ndb->theNdbObjectIdMap->map(this);
}
+
+ theFirstRecAttr = NULL;
+ theCurrentRecAttr = NULL;
+ m_key_info = (keyInfo ? 1 : 0);
+ m_defined_rows = 0;
+}
+
+void
+NdbReceiver::release(){
+ NdbRecAttr* tRecAttr = theFirstRecAttr;
+ while (tRecAttr != NULL)
+ {
+ NdbRecAttr* tSaveRecAttr = tRecAttr;
+ tRecAttr = tRecAttr->next();
+ m_ndb->releaseRecAttr(tSaveRecAttr);
+ }
+ theFirstRecAttr = NULL;
+ theCurrentRecAttr = NULL;
}
NdbReceiver::~NdbReceiver()
@@ -44,3 +67,150 @@ NdbReceiver::~NdbReceiver()
m_ndb->theNdbObjectIdMap->unmap(m_id, this);
}
}
+
+NdbRecAttr *
+NdbReceiver::getValue(const NdbColumnImpl* tAttrInfo, char * user_dst_ptr){
+ NdbRecAttr* tRecAttr = m_ndb->getRecAttr();
+ if(tRecAttr && !tRecAttr->setup(tAttrInfo, user_dst_ptr)){
+ if (theFirstRecAttr == NULL)
+ theFirstRecAttr = tRecAttr;
+ else
+ theCurrentRecAttr->next(tRecAttr);
+ theCurrentRecAttr = tRecAttr;
+ tRecAttr->next(NULL);
+ return tRecAttr;
+ }
+ if(tRecAttr){
+ m_ndb->releaseRecAttr(tRecAttr);
+ }
+ return 0;
+}
+
+#define KEY_ATTR_ID (~0)
+
+void
+NdbReceiver::do_get_value(NdbReceiver * org, Uint32 rows, Uint32 key_size){
+ m_defined_rows = rows;
+ m_rows = new NdbRecAttr*[rows + 1]; m_rows[rows] = 0;
+
+ NdbColumnImpl key;
+ if(key_size){
+ key.m_attrId = KEY_ATTR_ID;
+ key.m_arraySize = key_size+1;
+ key.m_attrSize = 4;
+ key.m_nullable = true; // So that receive works w.r.t KEYINFO20
+ }
+
+ for(Uint32 i = 0; i<rows; i++){
+ NdbRecAttr * prev = theCurrentRecAttr;
+
+ // Put key-recAttr fir on each row
+ if(key_size && !getValue(&key, (char*)0)){
+ abort();
+ return ; // -1
+ }
+
+ NdbRecAttr* tRecAttr = org->theFirstRecAttr;
+ while(tRecAttr != 0){
+ if(getValue(&NdbColumnImpl::getImpl(*tRecAttr->m_column), (char*)0))
+ tRecAttr = tRecAttr->next();
+ else
+ break;
+ }
+
+ if(tRecAttr){
+ abort();
+ return ;// -1;
+ }
+
+ // Store first recAttr for each row in m_rows[i]
+ if(prev){
+ m_rows[i] = prev->next();
+ } else {
+ m_rows[i] = theFirstRecAttr;
+ }
+ }
+
+ prepareSend();
+ return ; //0;
+}
+
+void
+NdbReceiver::copyout(NdbReceiver & dstRec){
+ NdbRecAttr* src = m_rows[m_current_row++];
+ NdbRecAttr* dst = dstRec.theFirstRecAttr;
+ Uint32 tmp = m_key_info;
+ if(tmp > 0){
+ src = src->next();
+ }
+
+ while(dst){
+ Uint32 len = ((src->theAttrSize * src->theArraySize)+3)/4;
+ dst->receive_data((Uint32*)src->aRef(), len);
+ src = src->next();
+ dst = dst->next();
+ }
+}
+
+int
+NdbReceiver::execTRANSID_AI(const Uint32* aDataPtr, Uint32 aLength)
+{
+ bool ok = true;
+ NdbRecAttr* currRecAttr = theCurrentRecAttr;
+ NdbRecAttr* prevRecAttr = currRecAttr;
+
+ for (Uint32 used = 0; used < aLength ; used++){
+ AttributeHeader ah(* aDataPtr++);
+ const Uint32 tAttrId = ah.getAttributeId();
+ const Uint32 tAttrSize = ah.getDataSize();
+
+ /**
+ * Set all results to NULL if not found...
+ */
+ while(currRecAttr && currRecAttr->attrId() != tAttrId){
+ ok &= currRecAttr->setNULL();
+ prevRecAttr = currRecAttr;
+ currRecAttr = currRecAttr->next();
+ }
+
+ if(ok && currRecAttr && currRecAttr->receive_data(aDataPtr, tAttrSize)){
+ used += tAttrSize;
+ aDataPtr += tAttrSize;
+ prevRecAttr = currRecAttr;
+ currRecAttr = currRecAttr->next();
+ } else {
+ ndbout_c("%p: ok: %d tAttrId: %d currRecAttr: %p",
+ this,ok, tAttrId, currRecAttr);
+ abort();
+ return -1;
+ }
+ }
+
+ theCurrentRecAttr = currRecAttr;
+
+ /**
+ * Update m_received_result_length
+ */
+ Uint32 tmp = m_received_result_length + aLength;
+ m_received_result_length = tmp;
+
+ return (tmp == m_expected_result_length ? 1 : 0);
+}
+
+int
+NdbReceiver::execKEYINFO20(Uint32 info, const Uint32* aDataPtr, Uint32 aLength)
+{
+ NdbRecAttr* currRecAttr = m_rows[m_current_row++];
+ assert(currRecAttr->attrId() == KEY_ATTR_ID);
+ currRecAttr->receive_data(aDataPtr, aLength + 1);
+
+ /**
+ * Save scanInfo in the end of keyinfo
+ */
+ ((Uint32*)currRecAttr->aRef())[aLength] = info;
+
+ Uint32 tmp = m_received_result_length + aLength;
+ m_received_result_length = tmp;
+
+ return (tmp == m_expected_result_length ? 1 : 0);
+}
diff --git a/ndb/src/ndbapi/NdbResultSet.cpp b/ndb/src/ndbapi/NdbResultSet.cpp
index 65ed43f60d8..d15c58ba972 100644
--- a/ndb/src/ndbapi/NdbResultSet.cpp
+++ b/ndb/src/ndbapi/NdbResultSet.cpp
@@ -30,7 +30,7 @@
#include <NdbConnection.hpp>
#include <NdbResultSet.hpp>
-NdbResultSet::NdbResultSet(NdbCursorOperation *owner)
+NdbResultSet::NdbResultSet(NdbScanOperation *owner)
: m_operation(owner)
{
}
@@ -55,49 +55,22 @@ void NdbResultSet::close()
NdbOperation*
NdbResultSet::updateTuple(){
- if(m_operation->cursorType() != NdbCursorOperation::ScanCursor){
- m_operation->setErrorCode(4003);
- return 0;
- }
-
- NdbScanOperation * op = (NdbScanOperation*)(m_operation);
- return op->takeOverScanOp(NdbOperation::UpdateRequest,
- op->m_transConnection);
+ return updateTuple(m_operation->m_transConnection);
}
NdbOperation*
NdbResultSet::updateTuple(NdbConnection* takeOverTrans){
- if(m_operation->cursorType() != NdbCursorOperation::ScanCursor){
- m_operation->setErrorCode(4003);
- return 0;
- }
-
return m_operation->takeOverScanOp(NdbOperation::UpdateRequest,
takeOverTrans);
}
int
NdbResultSet::deleteTuple(){
- if(m_operation->cursorType() != NdbCursorOperation::ScanCursor){
- m_operation->setErrorCode(4003);
- return 0;
- }
-
- NdbScanOperation * op = (NdbScanOperation*)(m_operation);
- void * res = op->takeOverScanOp(NdbOperation::DeleteRequest,
- op->m_transConnection);
- if(res == 0)
- return -1;
- return 0;
+ return deleteTuple(m_operation->m_transConnection);
}
int
NdbResultSet::deleteTuple(NdbConnection * takeOverTrans){
- if(m_operation->cursorType() != NdbCursorOperation::ScanCursor){
- m_operation->setErrorCode(4003);
- return 0;
- }
-
void * res = m_operation->takeOverScanOp(NdbOperation::DeleteRequest,
takeOverTrans);
if(res == 0)
diff --git a/ndb/src/ndbapi/NdbScanOperation.cpp b/ndb/src/ndbapi/NdbScanOperation.cpp
index 8064eef838e..8a22d6a3c0f 100644
--- a/ndb/src/ndbapi/NdbScanOperation.cpp
+++ b/ndb/src/ndbapi/NdbScanOperation.cpp
@@ -26,36 +26,57 @@
* Adjust: 2002-04-01 UABMASD First version.
****************************************************************************/
-#include <ndb_global.h>
#include <Ndb.hpp>
#include <NdbScanOperation.hpp>
+#include <NdbIndexScanOperation.hpp>
#include <NdbConnection.hpp>
#include <NdbResultSet.hpp>
#include "NdbApiSignal.hpp"
#include <NdbOut.hpp>
#include "NdbDictionaryImpl.hpp"
+#include <NdbRecAttr.hpp>
+#include <NdbReceiver.hpp>
+
+#include <stdlib.h>
+#include <NdbSqlUtil.hpp>
+
+#include <signaldata/ScanTab.hpp>
+#include <signaldata/KeyInfo.hpp>
+#include <signaldata/TcKeyReq.hpp>
+
NdbScanOperation::NdbScanOperation(Ndb* aNdb) :
- NdbCursorOperation(aNdb),
- m_transConnection(NULL),
- m_autoExecute(false),
- m_updateOp(false),
- m_deleteOp(false),
- m_setValueList(new SetValueRecList())
+ NdbOperation(aNdb),
+ m_resultSet(0),
+ m_transConnection(NULL)
{
+ theParallelism = 0;
+ m_allocated_receivers = 0;
+ m_prepared_receivers = 0;
+ m_api_receivers = 0;
+ m_conf_receivers = 0;
+ m_sent_receivers = 0;
+ m_receivers = 0;
}
NdbScanOperation::~NdbScanOperation()
{
- if (m_setValueList) delete m_setValueList;
+ fix_receivers(0, false);
+ if (m_resultSet)
+ delete m_resultSet;
}
-NdbCursorOperation::CursorType
-NdbScanOperation::cursorType()
+NdbResultSet*
+NdbScanOperation::getResultSet()
{
- return NdbCursorOperation::ScanCursor;
+ if (!m_resultSet)
+ m_resultSet = new NdbResultSet(this);
+
+ return m_resultSet;
}
+
+
void
NdbScanOperation::setErrorCode(int aErrorCode){
NdbConnection* tmp = theNdbCon;
@@ -90,267 +111,516 @@ NdbScanOperation::init(NdbTableImpl* tab, NdbConnection* myConnection)
setErrorCodeAbort(theNdb->getNdbError().code);
return -1;
}
- aScanConnection->theFirstOpInList = this;
- aScanConnection->theLastOpInList = this;
- NdbCursorOperation::cursInit();
- // NOTE! The hupped trans becomes the owner of the operation
- return NdbOperation::init(tab, aScanConnection);
-}
-
-NdbResultSet* NdbScanOperation::readTuples(Uint32 parallell,
- NdbCursorOperation::LockMode lm)
-{
- int res = 0;
- switch(lm){
- case NdbCursorOperation::LM_Read:
- parallell = (parallell == 0 ? 240 : parallell);
- res = openScan(parallell, false, true, false);
- break;
- case NdbCursorOperation::LM_Exclusive:
- parallell = (parallell == 0 ? 1 : parallell);
- res = openScan(parallell, true, true, false);
- break;
- case NdbCursorOperation::LM_Dirty:
- parallell = (parallell == 0 ? 240 : parallell);
- res = openScan(parallell, false, false, true);
- break;
- default:
- res = -1;
- setErrorCode(4003);
- }
- if(res == -1){
- return NULL;
- }
- theNdbCon->theFirstOpInList = 0;
- theNdbCon->theLastOpInList = 0;
- return getResultSet();
-}
-
-int NdbScanOperation::updateTuples(Uint32 parallelism)
-{
- if (openScanExclusive(parallelism) == -1) {
+ // NOTE! The hupped trans becomes the owner of the operation
+ if(NdbOperation::init(tab, aScanConnection) != 0){
return -1;
}
- theNdbCon->theFirstOpInList = 0;
- theNdbCon->theLastOpInList = 0;
-
- m_updateOp = true;
+
+ initInterpreter();
+
+ theStatus = GetValue;
+ theOperationType = OpenScanRequest;
+
+ theTotalBoundAI_Len = 0;
+ theBoundATTRINFO = NULL;
return 0;
}
-int NdbScanOperation::deleteTuples(Uint32 parallelism)
+NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
+ Uint32 batch,
+ Uint32 parallell)
{
- if (openScanExclusive(parallelism) == -1) {
- return -1;
- }
- theNdbCon->theFirstOpInList = 0;
- theNdbCon->theLastOpInList = 0;
+ m_ordered = 0;
- m_deleteOp = true;
+ Uint32 fragCount = m_currentTable->m_fragmentCount;
- return 0;
-}
+ if(batch + parallell == 0){ // Max speed
+ batch = 16;
+ parallell = fragCount;
+ }
-int NdbScanOperation::setValue(const char* anAttrName, const char* aValue, Uint32 len)
-{
- // Check if attribute exist
- if (m_currentTable->getColumn(anAttrName) == NULL)
- return -1;
+ if(batch == 0 && parallell > 0){ // Backward
+ batch = (parallell >= 16 ? 16 : parallell & 15);
+ parallell = (parallell + 15) / 16;
+
+ if(parallell == 0)
+ parallell = 1;
+ }
- m_setValueList->add(anAttrName, aValue, len);
- return 0;
-}
-
-int NdbScanOperation::setValue(const char* anAttrName, Int32 aValue)
-{
- // Check if attribute exist
- if (m_currentTable->getColumn(anAttrName) == NULL)
- return -1;
-
- m_setValueList->add(anAttrName, aValue);
- return 0;
-}
-
-int NdbScanOperation::setValue(const char* anAttrName, Uint32 aValue)
-{
- // Check if attribute exist
- if (m_currentTable->getColumn(anAttrName) == NULL)
- return -1;
+ if(parallell > fragCount)
+ parallell = fragCount;
+ else if(parallell == 0)
+ parallell = fragCount;
+
+ assert(parallell > 0);
+
+ // It is only possible to call openScan if
+ // 1. this transcation don't already contain another scan operation
+ // 2. this transaction don't already contain other operations
+ // 3. theScanOp contains a NdbScanOperation
+ if (theNdbCon->theScanningOp != NULL){
+ setErrorCode(4605);
+ return 0;
+ }
- m_setValueList->add(anAttrName, aValue);
- return 0;
-}
+ theNdbCon->theScanningOp = this;
-int NdbScanOperation::setValue(const char* anAttrName, Uint64 aValue)
-{
- // Check if attribute exist
- if (m_currentTable->getColumn(anAttrName) == NULL)
- return -1;
+ bool lockExcl, lockHoldMode, readCommitted;
+ switch(lm){
+ case NdbScanOperation::LM_Read:
+ lockExcl = false;
+ lockHoldMode = true;
+ readCommitted = false;
+ break;
+ case NdbScanOperation::LM_Exclusive:
+ lockExcl = true;
+ lockHoldMode = true;
+ readCommitted = false;
+ break;
+ case NdbScanOperation::LM_Dirty:
+ lockExcl = false;
+ lockHoldMode = false;
+ readCommitted = true;
+ break;
+ default:
+ setErrorCode(4003);
+ return 0;
+ }
- m_setValueList->add(anAttrName, aValue);
- return 0;
-}
+ m_keyInfo = lockExcl;
+
+ bool range = false;
+ if (m_currentTable->m_indexType == NdbDictionary::Index::OrderedIndex ||
+ m_currentTable->m_indexType == NdbDictionary::Index::UniqueOrderedIndex){
+ assert(m_currentTable == m_accessTable);
+ m_currentTable = theNdb->theDictionary->
+ getTable(m_currentTable->m_primaryTable.c_str());
+ assert(m_currentTable != NULL);
+ // Modify operation state
+ theStatus = SetBound;
+ theOperationType = OpenRangeScanRequest;
+ range = true;
+ }
+
+ theParallelism = parallell;
+ theBatchSize = batch;
-int NdbScanOperation::setValue(const char* anAttrName, Int64 aValue)
-{
- // Check if attribute exist
- if (m_currentTable->getColumn(anAttrName) == NULL)
- return -1;
+ if(fix_receivers(parallell, lockExcl) == -1){
+ setErrorCodeAbort(4000);
+ return 0;
+ }
+
+ theSCAN_TABREQ = theNdb->getSignal();
+ if (theSCAN_TABREQ == NULL) {
+ setErrorCodeAbort(4000);
+ return 0;
+ }//if
+
+ ScanTabReq * req = CAST_PTR(ScanTabReq, theSCAN_TABREQ->getDataPtrSend());
+ req->apiConnectPtr = theNdbCon->theTCConPtr;
+ req->tableId = m_accessTable->m_tableId;
+ req->tableSchemaVersion = m_accessTable->m_version;
+ req->storedProcId = 0xFFFF;
+ req->buddyConPtr = theNdbCon->theBuddyConPtr;
+
+ Uint32 reqInfo = 0;
+ ScanTabReq::setParallelism(reqInfo, parallell);
+ ScanTabReq::setScanBatch(reqInfo, batch);
+ ScanTabReq::setLockMode(reqInfo, lockExcl);
+ ScanTabReq::setHoldLockFlag(reqInfo, lockHoldMode);
+ ScanTabReq::setReadCommittedFlag(reqInfo, readCommitted);
+ ScanTabReq::setRangeScanFlag(reqInfo, range);
+ req->requestInfo = reqInfo;
- m_setValueList->add(anAttrName, aValue);
- return 0;
-}
+ Uint64 transId = theNdbCon->getTransactionId();
+ req->transId1 = (Uint32) transId;
+ req->transId2 = (Uint32) (transId >> 32);
-int NdbScanOperation::setValue(const char* anAttrName, float aValue)
-{
- // Check if attribute exist
- if (m_currentTable->getColumn(anAttrName) == NULL)
- return -1;
+ getFirstATTRINFOScan();
- m_setValueList->add(anAttrName, aValue);
- return 0;
+ return getResultSet();
}
-int NdbScanOperation::setValue(const char* anAttrName, double aValue)
-{
- // Check if attribute exist
- if (m_currentTable->getColumn(anAttrName) == NULL)
- return -1;
+int
+NdbScanOperation::fix_receivers(Uint32 parallell, bool keyInfo){
+ if(parallell == 0 || parallell > m_allocated_receivers){
+ if(m_prepared_receivers) delete[] m_prepared_receivers;
+ if(m_receivers) delete[] m_receivers;
+ if(m_api_receivers) delete[] m_api_receivers;
+ if(m_conf_receivers) delete[] m_conf_receivers;
+ if(m_sent_receivers) delete[] m_sent_receivers;
+
+ m_allocated_receivers = parallell;
+ if(parallell == 0){
+ return 0;
+ }
+
+ m_prepared_receivers = new Uint32[parallell];
+ m_receivers = new NdbReceiver*[parallell];
+ m_api_receivers = new NdbReceiver*[parallell];
+ m_conf_receivers = new NdbReceiver*[parallell];
+ m_sent_receivers = new NdbReceiver*[parallell];
+
+ NdbReceiver* tScanRec;
+ for (Uint32 i = 0; i < parallell; i ++) {
+ tScanRec = theNdb->getNdbScanRec();
+ if (tScanRec == NULL) {
+ setErrorCodeAbort(4000);
+ return -1;
+ }//if
+ m_receivers[i] = tScanRec;
+ tScanRec->init(NdbReceiver::NDB_SCANRECEIVER, this, keyInfo);
+ }
+ }
- m_setValueList->add(anAttrName, aValue);
+ for(Uint32 i = 0; i<parallell; i++){
+ m_receivers[i]->m_list_index = i;
+ m_prepared_receivers[i] = m_receivers[i]->getId();
+ m_sent_receivers[i] = m_receivers[i];
+ m_conf_receivers[i] = 0;
+ m_api_receivers[i] = 0;
+ }
+
+ m_api_receivers_count = 0;
+ m_current_api_receiver = 0;
+ m_sent_receivers_count = parallell;
+ m_conf_receivers_count = 0;
return 0;
}
-
-int NdbScanOperation::setValue(Uint32 anAttrId, const char* aValue, Uint32 len)
-{
- // Check if attribute exist
- if (m_currentTable->getColumn(anAttrId) == NULL)
- return -1;
-
- m_setValueList->add(anAttrId, aValue, len);
- return 0;
+/**
+ * Move receiver from send array to conf:ed array
+ */
+void
+NdbScanOperation::receiver_delivered(NdbReceiver* tRec){
+ Uint32 idx = tRec->m_list_index;
+ Uint32 last = m_sent_receivers_count - 1;
+ if(idx != last){
+ NdbReceiver * move = m_sent_receivers[last];
+ m_sent_receivers[idx] = move;
+ move->m_list_index = idx;
+ }
+ m_sent_receivers_count = last;
+
+ last = m_conf_receivers_count;
+ m_conf_receivers[last] = tRec;
+ m_conf_receivers_count = last + 1;
+ tRec->m_list_index = last;
+ tRec->m_current_row = 0;
}
-int NdbScanOperation::setValue(Uint32 anAttrId, Int32 aValue)
-{
- // Check if attribute exist
- if (m_currentTable->getColumn(anAttrId) == NULL)
- return -1;
-
- m_setValueList->add(anAttrId, aValue);
- return 0;
+/**
+ * Remove receiver as it's completed
+ */
+void
+NdbScanOperation::receiver_completed(NdbReceiver* tRec){
+ Uint32 idx = tRec->m_list_index;
+ Uint32 last = m_sent_receivers_count - 1;
+ if(idx != last){
+ NdbReceiver * move = m_sent_receivers[last];
+ m_sent_receivers[idx] = move;
+ move->m_list_index = idx;
+ }
+ m_sent_receivers_count = last;
}
-int NdbScanOperation::setValue(Uint32 anAttrId, Uint32 aValue)
+/*****************************************************************************
+ * int getFirstATTRINFOScan( U_int32 aData )
+ *
+ * Return Value: Return 0: Successful
+ * Return -1: All other cases
+ * Parameters: None: Only allocate the first signal.
+ * Remark: When a scan is defined we need to use this method instead
+ * of insertATTRINFO for the first signal.
+ * This is because we need not to mess up the code in
+ * insertATTRINFO with if statements since we are not
+ * interested in the TCKEYREQ signal.
+ *****************************************************************************/
+int
+NdbScanOperation::getFirstATTRINFOScan()
{
- // Check if attribute exist
- if (m_currentTable->getColumn(anAttrId) == NULL)
- return -1;
+ NdbApiSignal* tSignal;
- m_setValueList->add(anAttrId, aValue);
- return 0;
-}
-
-int NdbScanOperation::setValue(Uint32 anAttrId, Uint64 aValue)
-{
- // Check if attribute exist
- if (m_currentTable->getColumn(anAttrId) == NULL)
- return -1;
+ tSignal = theNdb->getSignal();
+ if (tSignal == NULL){
+ setErrorCodeAbort(4000);
+ return -1;
+ }
+ tSignal->setSignal(m_attrInfoGSN);
+ theAI_LenInCurrAI = 8;
+ theATTRINFOptr = &tSignal->getDataPtrSend()[8];
+ theFirstATTRINFO = tSignal;
+ theCurrentATTRINFO = tSignal;
+ theCurrentATTRINFO->next(NULL);
- m_setValueList->add(anAttrId, aValue);
return 0;
}
-int NdbScanOperation::setValue(Uint32 anAttrId, Int64 aValue)
-{
- // Check if attribute exist
- if (m_currentTable->getColumn(anAttrId) == NULL)
- return -1;
+/**
+ * Constats for theTupleKeyDefined[][0]
+ */
+#define SETBOUND_EQ 1
+#define FAKE_PTR 2
+#define API_PTR 3
- m_setValueList->add(anAttrId, aValue);
- return 0;
-}
-int NdbScanOperation::setValue(Uint32 anAttrId, float aValue)
+/*
+ * After setBound() are done, move the accumulated ATTRINFO signals to
+ * a separate list. Then continue with normal scan.
+ */
+int
+NdbScanOperation::saveBoundATTRINFO()
{
- // Check if attribute exist
- if (m_currentTable->getColumn(anAttrId) == NULL)
- return -1;
-
- m_setValueList->add(anAttrId, aValue);
- return 0;
+ theCurrentATTRINFO->setLength(theAI_LenInCurrAI);
+ theBoundATTRINFO = theFirstATTRINFO;
+ theTotalBoundAI_Len = theTotalCurrAI_Len;
+ theTotalCurrAI_Len = 5;
+ theBoundATTRINFO->setData(theTotalBoundAI_Len, 4);
+ theBoundATTRINFO->setData(0, 5);
+ theBoundATTRINFO->setData(0, 6);
+ theBoundATTRINFO->setData(0, 7);
+ theBoundATTRINFO->setData(0, 8);
+ theStatus = GetValue;
+
+ int res = getFirstATTRINFOScan();
+
+ /**
+ * Define each key with getValue (if ordered)
+ * unless the one's with EqBound
+ */
+ if(!res && m_ordered){
+ Uint32 idx = 0;
+ Uint32 cnt = m_currentTable->getNoOfPrimaryKeys();
+ while(!theTupleKeyDefined[idx][0] && idx < cnt){
+ NdbColumnImpl* col = m_currentTable->getColumn(idx);
+ NdbRecAttr* tmp = NdbScanOperation::getValue_impl(col, (char*)-1);
+ UintPtr newVal = UintPtr(tmp);
+ theTupleKeyDefined[idx][0] = FAKE_PTR;
+ theTupleKeyDefined[idx][1] = (newVal & 0xFFFFFFFF);
+#if (SIZEOF_CHARP == 8)
+ theTupleKeyDefined[idx][2] = (newVal >> 32);
+#endif
+ idx++;
+ }
+ }
+ return res;
}
-int NdbScanOperation::setValue(Uint32 anAttrId, double aValue)
-{
- // Check if attribute exist
- if (m_currentTable->getColumn(anAttrId) == NULL)
- return -1;
+#define WAITFOR_SCAN_TIMEOUT 120000
- m_setValueList->add(anAttrId, aValue);
- return 0;
+int
+NdbScanOperation::executeCursor(int nodeId){
+ NdbConnection * tCon = theNdbCon;
+ TransporterFacade* tp = TransporterFacade::instance();
+ Guard guard(tp->theMutexPtr);
+ Uint32 seq = tCon->theNodeSequence;
+ if (tp->get_node_alive(nodeId) &&
+ (tp->getNodeSequence(nodeId) == seq)) {
+
+ if(prepareSendScan(tCon->theTCConPtr, tCon->theTransactionId) == -1)
+ return -1;
+
+ tCon->theMagicNumber = 0x37412619;
+
+ if (doSendScan(nodeId) == -1)
+ return -1;
+
+ return 0;
+ } else {
+ if (!(tp->get_node_stopping(nodeId) &&
+ (tp->getNodeSequence(nodeId) == seq))){
+ TRACE_DEBUG("The node is hard dead when attempting to start a scan");
+ setErrorCode(4029);
+ tCon->theReleaseOnClose = true;
+ abort();
+ } else {
+ TRACE_DEBUG("The node is stopping when attempting to start a scan");
+ setErrorCode(4030);
+ }//if
+ tCon->theCommitStatus = Aborted;
+ }//if
+ return -1;
}
-// Private methods
-
-int NdbScanOperation::executeCursor(int ProcessorId)
+int NdbScanOperation::nextResult(bool fetchAllowed)
{
- int result = theNdbCon->executeScan();
- // If the scan started ok and we are updating or deleting
- // iterate over all tuples
- if ((m_updateOp) || (m_deleteOp)) {
- NdbOperation* newOp;
-
- while ((result != -1) && (nextResult() == 0)) {
- if (m_updateOp) {
- newOp = takeOverScanOp(UpdateRequest, m_transConnection);
- // Pass setValues from scan operation to new operation
- m_setValueList->iterate(SetValueRecList::callSetValueFn, *newOp);
- // No need to call updateTuple since scan was taken over for update
- // it should be the same with delete - MASV
- // newOp->updateTuple();
- }
- else if (m_deleteOp) {
- newOp = takeOverScanOp(DeleteRequest, m_transConnection);
- // newOp->deleteTuple();
+ if(m_ordered)
+ return ((NdbIndexScanOperation*)this)->next_result_ordered(fetchAllowed);
+
+ /**
+ * Check current receiver
+ */
+ int retVal = 2;
+ Uint32 idx = m_current_api_receiver;
+ Uint32 last = m_api_receivers_count;
+
+ /**
+ * Check next buckets
+ */
+ for(; idx < last; idx++){
+ NdbReceiver* tRec = m_api_receivers[idx];
+ if(tRec->nextResult()){
+ tRec->copyout(theReceiver);
+ retVal = 0;
+ break;
+ }
+ }
+
+ /**
+ * We have advanced atleast one bucket
+ */
+ if(!fetchAllowed){
+ m_current_api_receiver = idx;
+ return retVal;
+ }
+
+ Uint32 nodeId = theNdbCon->theDBnode;
+ TransporterFacade* tp = TransporterFacade::instance();
+ Guard guard(tp->theMutexPtr);
+ Uint32 seq = theNdbCon->theNodeSequence;
+ if(seq == tp->getNodeSequence(nodeId) && send_next_scan(idx, false) == 0){
+
+ idx = m_current_api_receiver;
+ last = m_api_receivers_count;
+
+ do {
+ Uint32 cnt = m_conf_receivers_count;
+ Uint32 sent = m_sent_receivers_count;
+
+ if(cnt > 0){
+ /**
+ * Just move completed receivers
+ */
+ memcpy(m_api_receivers+last, m_conf_receivers, cnt * sizeof(char*));
+ last += cnt;
+ m_conf_receivers_count = 0;
+ } else if(retVal == 2 && sent > 0){
+ /**
+ * No completed...
+ */
+ theNdb->theWaiter.m_node = nodeId;
+ theNdb->theWaiter.m_state = WAIT_SCAN;
+ int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
+ if (return_code == 0 && seq == tp->getNodeSequence(nodeId)) {
+ continue;
+ } else {
+ idx = last;
+ retVal = -1; //return_code;
+ }
+ } else if(retVal == 2){
+ /**
+ * No completed & no sent -> EndOfData
+ */
+ if(send_next_scan(0, true) == 0){ // Close scan
+ theNdb->theWaiter.m_node = nodeId;
+ theNdb->theWaiter.m_state = WAIT_SCAN;
+ int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
+ if (return_code == 0 && seq == tp->getNodeSequence(nodeId)) {
+ return 1;
+ }
+ retVal = -1; //return_code;
+ } else {
+ retVal = -3;
+ }
+ idx = last;
}
-#if 0
- // takeOverScanOp will take over the lock that scan aquired
- // the lock is released when nextScanResult is called
- // That means that the "takeover" has to be sent to the kernel
- // before nextScanresult is called - MASV
- if (m_autoExecute){
- m_transConnection->execute(NoCommit);
+
+ if(retVal == 0)
+ break;
+
+ for(; idx < last; idx++){
+ NdbReceiver* tRec = m_api_receivers[idx];
+ if(tRec->nextResult()){
+ tRec->copyout(theReceiver);
+ retVal = 0;
+ break;
+ }
}
-#else
- m_transConnection->execute(NoCommit);
-#endif
- }
- closeScan();
+ } while(retVal == 2);
+ } else {
+ retVal = -3;
}
-
- return result;
+
+ m_api_receivers_count = last;
+ m_current_api_receiver = idx;
+
+ switch(retVal){
+ case 0:
+ case 1:
+ case 2:
+ return retVal;
+ case -1:
+ setErrorCode(4008); // Timeout
+ break;
+ case -2:
+ setErrorCode(4028); // Node fail
+ break;
+ case -3: // send_next_scan -> return fail (set error-code self)
+ break;
+ }
+
+ theNdbCon->theTransactionIsStarted = false;
+ theNdbCon->theReleaseOnClose = true;
+ return -1;
}
-int NdbScanOperation::nextResult(bool fetchAllowed)
-{
- int result = theNdbCon->nextScanResult(fetchAllowed);
- if (result == -1){
- // Move the error code from hupped transaction
- // to the real trans
- const NdbError err = theNdbCon->getNdbError();
- m_transConnection->setOperationErrorCode(err.code);
+int
+NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag){
+ if(cnt > 0 || stopScanFlag){
+ NdbApiSignal tSignal(theNdb->theMyRef);
+ tSignal.setSignal(GSN_SCAN_NEXTREQ);
+
+ Uint32* theData = tSignal.getDataPtrSend();
+ theData[0] = theNdbCon->theTCConPtr;
+ theData[1] = stopScanFlag == true ? 1 : 0;
+ Uint64 transId = theNdbCon->theTransactionId;
+ theData[2] = transId;
+ theData[3] = (Uint32) (transId >> 32);
+
+ /**
+ * Prepare ops
+ */
+ Uint32 last = m_sent_receivers_count;
+ Uint32 * prep_array = (cnt > 21 ? m_prepared_receivers : theData + 4);
+ for(Uint32 i = 0; i<cnt; i++){
+ NdbReceiver * tRec = m_api_receivers[i];
+ m_sent_receivers[last+i] = tRec;
+ tRec->m_list_index = last+i;
+ prep_array[i] = tRec->m_tcPtrI;
+ tRec->prepareSend();
+ }
+ memcpy(&m_api_receivers[0], &m_api_receivers[cnt], cnt * sizeof(char*));
+
+ Uint32 nodeId = theNdbCon->theDBnode;
+ TransporterFacade * tp = TransporterFacade::instance();
+ int ret;
+ if(cnt > 21){
+ tSignal.setLength(4);
+ LinearSectionPtr ptr[3];
+ ptr[0].p = prep_array;
+ ptr[0].sz = cnt;
+ ret = tp->sendFragmentedSignal(&tSignal, nodeId, ptr, 1);
+ } else {
+ tSignal.setLength(4+cnt);
+ ret = tp->sendSignal(&tSignal, nodeId);
+ }
+
+ m_sent_receivers_count = last + cnt + stopScanFlag;
+ m_api_receivers_count -= cnt;
+ m_current_api_receiver = 0;
+
+ return ret;
}
- return result;
+ return 0;
}
int
NdbScanOperation::prepareSend(Uint32 TC_ConnectPtr, Uint64 TransactionId)
{
printf("NdbScanOperation::prepareSend\n");
+ abort();
return 0;
}
@@ -363,300 +633,689 @@ NdbScanOperation::doSend(int ProcessorId)
void NdbScanOperation::closeScan()
{
- if(theNdbCon){
- if (theNdbCon->stopScan() == -1)
- theError = theNdbCon->getNdbError();
- theNdb->closeTransaction(theNdbCon);
- theNdbCon = 0;
- }
- m_transConnection = NULL;
-}
+ do {
+ TransporterFacade* tp = TransporterFacade::instance();
+ Guard guard(tp->theMutexPtr);
-void NdbScanOperation::release(){
- closeScan();
- NdbCursorOperation::release();
-}
+ Uint32 seq = theNdbCon->theNodeSequence;
+ Uint32 nodeId = theNdbCon->theDBnode;
-void SetValueRecList::add(const char* anAttrName, const char* aValue, Uint32 len)
-{
- SetValueRec* newSetValueRec = new SetValueRec();
+ if(seq != tp->getNodeSequence(nodeId)){
+ theNdbCon->theReleaseOnClose = true;
+ break;
+ }
+
+ /**
+ * Wait for all running scans...
+ */
+ while(m_sent_receivers_count){
+ theNdb->theWaiter.m_node = nodeId;
+ theNdb->theWaiter.m_state = WAIT_SCAN;
+ int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
+ switch(return_code){
+ case 0:
+ break;
+ case -1:
+ setErrorCode(4008);
+ case -2:
+ m_sent_receivers_count = 0;
+ m_api_receivers_count = 0;
+ m_conf_receivers_count = 0;
+ }
+ }
+
+ if(seq != tp->getNodeSequence(nodeId)){
+ theNdbCon->theReleaseOnClose = true;
+ break;
+ }
+
+ if(m_api_receivers_count+m_conf_receivers_count){
+ // Send close scan
+ send_next_scan(0, true); // Close scan
+
+ /**
+ * wait for close scan conf
+ */
+ do {
+ theNdb->theWaiter.m_node = nodeId;
+ theNdb->theWaiter.m_state = WAIT_SCAN;
+ int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
+ switch(return_code){
+ case 0:
+ break;
+ case -1:
+ setErrorCode(4008);
+ case -2:
+ m_api_receivers_count = 0;
+ m_conf_receivers_count = 0;
+ }
+ } while(m_api_receivers_count+m_conf_receivers_count);
+ }
+ } while(0);
+
+ theNdbCon->theScanningOp = 0;
+ theNdb->closeTransaction(theNdbCon);
+
+ theNdbCon = 0;
+ m_transConnection = NULL;
+}
- newSetValueRec->stype = SetValueRec::SET_STRING_ATTR1;
- newSetValueRec->anAttrName = strdup(anAttrName);
- newSetValueRec->stringStruct.aStringValue = (char *) malloc(len);
- strlcpy(newSetValueRec->stringStruct.aStringValue, aValue, len);
- if (!last)
- first = last = newSetValueRec;
- else {
- last->next = newSetValueRec;
- last = newSetValueRec;
+void
+NdbScanOperation::execCLOSE_SCAN_REP(Uint32 errCode){
+ /**
+ * We will receive no further signals from this scan
+ */
+ if(!errCode){
+ /**
+ * Normal termination
+ */
+ theNdbCon->theCommitStatus = Committed;
+ theNdbCon->theCompletionStatus = CompletedSuccess;
+ } else {
+ /**
+ * Something is fishy
+ */
+ abort();
}
+ m_api_receivers_count = 0;
+ m_conf_receivers_count = 0;
+ m_sent_receivers_count = 0;
}
-void SetValueRecList::add(const char* anAttrName, Int32 aValue)
+void NdbScanOperation::release()
{
- SetValueRec* newSetValueRec = new SetValueRec();
-
- newSetValueRec->stype = SetValueRec::SET_INT32_ATTR1;
- newSetValueRec->anAttrName = strdup(anAttrName);
- newSetValueRec->anInt32Value = aValue;
- if (!last)
- first = last = newSetValueRec;
- else {
- last->next = newSetValueRec;
- last = newSetValueRec;
+ if(theNdbCon != 0 || m_transConnection != 0){
+ closeScan();
+ }
+ for(Uint32 i = 0; i<m_allocated_receivers; i++){
+ m_receivers[i]->release();
}
}
-void SetValueRecList::add(const char* anAttrName, Uint32 aValue)
-{
- SetValueRec* newSetValueRec = new SetValueRec();
+/***************************************************************************
+int prepareSendScan(Uint32 aTC_ConnectPtr,
+ Uint64 aTransactionId)
+
+Return Value: Return 0 : preparation of send was succesful.
+ Return -1: In all other case.
+Parameters: aTC_ConnectPtr: the Connect pointer to TC.
+ aTransactionId: the Transaction identity of the transaction.
+Remark: Puts the the final data into ATTRINFO signal(s) after this
+ we know the how many signal to send and their sizes
+***************************************************************************/
+int NdbScanOperation::prepareSendScan(Uint32 aTC_ConnectPtr,
+ Uint64 aTransactionId){
+
+ if (theInterpretIndicator != 1 ||
+ (theOperationType != OpenScanRequest &&
+ theOperationType != OpenRangeScanRequest)) {
+ setErrorCodeAbort(4005);
+ return -1;
+ }
- newSetValueRec->stype = SetValueRec::SET_UINT32_ATTR1;
- newSetValueRec->anAttrName = strdup(anAttrName);
- newSetValueRec->anUint32Value = aValue;
- if (!last)
- first = last = newSetValueRec;
- else {
- last->next = newSetValueRec;
- last = newSetValueRec;
+ if (theStatus == SetBound) {
+ saveBoundATTRINFO();
+ theStatus = GetValue;
}
-}
-void SetValueRecList::add(const char* anAttrName, Int64 aValue)
-{
- SetValueRec* newSetValueRec = new SetValueRec();
+ theErrorLine = 0;
- newSetValueRec->stype = SetValueRec::SET_INT64_ATTR1;
- newSetValueRec->anAttrName = strdup(anAttrName);
- newSetValueRec->anInt64Value = aValue;
- if (!last)
- first = last = newSetValueRec;
- else {
- last->next = newSetValueRec;
- last = newSetValueRec;
+ // In preapareSendInterpreted we set the sizes (word 4-8) in the
+ // first ATTRINFO signal.
+ if (prepareSendInterpreted() == -1)
+ return -1;
+
+ if(m_ordered){
+ ((NdbIndexScanOperation*)this)->fix_get_values();
+ }
+
+ const Uint32 transId1 = (Uint32) (aTransactionId & 0xFFFFFFFF);
+ const Uint32 transId2 = (Uint32) (aTransactionId >> 32);
+
+ if (theOperationType == OpenRangeScanRequest) {
+ NdbApiSignal* tSignal = theBoundATTRINFO;
+ do{
+ tSignal->setData(aTC_ConnectPtr, 1);
+ tSignal->setData(transId1, 2);
+ tSignal->setData(transId2, 3);
+ tSignal = tSignal->next();
+ } while (tSignal != NULL);
+ }
+ theCurrentATTRINFO->setLength(theAI_LenInCurrAI);
+ NdbApiSignal* tSignal = theFirstATTRINFO;
+ do{
+ tSignal->setData(aTC_ConnectPtr, 1);
+ tSignal->setData(transId1, 2);
+ tSignal->setData(transId2, 3);
+ tSignal = tSignal->next();
+ } while (tSignal != NULL);
+
+ /**
+ * Prepare all receivers
+ */
+ theReceiver.prepareSend();
+ bool keyInfo = m_keyInfo;
+ Uint32 key_size = keyInfo ? m_currentTable->m_keyLenInWords : 0;
+ for(Uint32 i = 0; i<theParallelism; i++){
+ m_receivers[i]->do_get_value(&theReceiver, theBatchSize, key_size);
}
+ return 0;
}
-void SetValueRecList::add(const char* anAttrName, Uint64 aValue)
+/******************************************************************************
+int doSend()
+
+Return Value: Return >0 : send was succesful, returns number of signals sent
+ Return -1: In all other case.
+Parameters: aProcessorId: Receiving processor node
+Remark: Sends the ATTRINFO signal(s)
+******************************************************************************/
+int
+NdbScanOperation::doSendScan(int aProcessorId)
{
- SetValueRec* newSetValueRec = new SetValueRec();
+ Uint32 tSignalCount = 0;
+ NdbApiSignal* tSignal;
+
+ if (theInterpretIndicator != 1 ||
+ (theOperationType != OpenScanRequest &&
+ theOperationType != OpenRangeScanRequest)) {
+ setErrorCodeAbort(4005);
+ return -1;
+ }
+
+ assert(theSCAN_TABREQ != NULL);
+ tSignal = theSCAN_TABREQ;
+ if (tSignal->setSignal(GSN_SCAN_TABREQ) == -1) {
+ setErrorCode(4001);
+ return -1;
+ }
+ // Update the "attribute info length in words" in SCAN_TABREQ before
+ // sending it. This could not be done in openScan because
+ // we created the ATTRINFO signals after the SCAN_TABREQ signal.
+ ScanTabReq * const req = CAST_PTR(ScanTabReq, tSignal->getDataPtrSend());
+ req->attrLen = theTotalCurrAI_Len;
+ if (theOperationType == OpenRangeScanRequest)
+ req->attrLen += theTotalBoundAI_Len;
+ TransporterFacade *tp = TransporterFacade::instance();
+ if(theParallelism > 16){
+ LinearSectionPtr ptr[3];
+ ptr[0].p = m_prepared_receivers;
+ ptr[0].sz = theParallelism;
+ if (tp->sendFragmentedSignal(tSignal, aProcessorId, ptr, 1) == -1) {
+ setErrorCode(4002);
+ return -1;
+ }
+ } else {
+ tSignal->setLength(9+theParallelism);
+ memcpy(tSignal->getDataPtrSend()+9, m_prepared_receivers, 4*theParallelism);
+ if (tp->sendSignal(tSignal, aProcessorId) == -1) {
+ setErrorCode(4002);
+ return -1;
+ }
+ }
- newSetValueRec->stype = SetValueRec::SET_UINT64_ATTR1;
- newSetValueRec->anAttrName = strdup(anAttrName);
- newSetValueRec->anUint64Value = aValue;
- if (!last)
- first = last = newSetValueRec;
- else {
- last->next = newSetValueRec;
- last = newSetValueRec;
+ if (theOperationType == OpenRangeScanRequest) {
+ // must have at least one signal since it contains attrLen for bounds
+ assert(theBoundATTRINFO != NULL);
+ tSignal = theBoundATTRINFO;
+ while (tSignal != NULL) {
+ if (tp->sendSignal(tSignal,aProcessorId) == -1){
+ setErrorCode(4002);
+ return -1;
+ }
+ tSignalCount++;
+ tSignal = tSignal->next();
+ }
}
-}
+
+ tSignal = theFirstATTRINFO;
+ while (tSignal != NULL) {
+ if (tp->sendSignal(tSignal,aProcessorId) == -1){
+ setErrorCode(4002);
+ return -1;
+ }
+ tSignalCount++;
+ tSignal = tSignal->next();
+ }
+ theStatus = WaitResponse;
+ return tSignalCount;
+}//NdbOperation::doSendScan()
+
+/******************************************************************************
+ * NdbOperation* takeOverScanOp(NdbConnection* updateTrans);
+ *
+ * Parameters: The update transactions NdbConnection pointer.
+ * Return Value: A reference to the transferred operation object
+ * or NULL if no success.
+ * Remark: Take over the scanning transactions NdbOperation
+ * object for a tuple to an update transaction,
+ * which is the last operation read in nextScanResult()
+ * (theNdbCon->thePreviousScanRec)
+ *
+ * FUTURE IMPLEMENTATION: (This note was moved from header file.)
+ * In the future, it will even be possible to transfer
+ * to a NdbConnection on another Ndb-object.
+ * In this case the receiving NdbConnection-object must call
+ * a method receiveOpFromScan to actually receive the information.
+ * This means that the updating transactions can be placed
+ * in separate threads and thus increasing the parallelism during
+ * the scan process.
+ *****************************************************************************/
+NdbOperation*
+NdbScanOperation::takeOverScanOp(OperationType opType, NdbConnection* pTrans){
+
+ Uint32 idx = m_current_api_receiver;
+ Uint32 last = m_api_receivers_count;
+
+ Uint32 row;
+ NdbReceiver * tRec;
+ NdbRecAttr * tRecAttr;
+ if(idx < last && (tRec = m_api_receivers[idx])
+ && ((row = tRec->m_current_row) <= tRec->m_defined_rows)
+ && (tRecAttr = tRec->m_rows[row-1])){
+
+ NdbOperation * newOp = pTrans->getNdbOperation(m_currentTable);
+ if (newOp == NULL){
+ return NULL;
+ }
+
+ const Uint32 len = (tRecAttr->attrSize() * tRecAttr->arraySize() + 3)/4-1;
+
+ newOp->theTupKeyLen = len;
+ newOp->theOperationType = opType;
+ if (opType == DeleteRequest) {
+ newOp->theStatus = GetValue;
+ } else {
+ newOp->theStatus = SetValue;
+ }
+
+ const Uint32 * src = (Uint32*)tRecAttr->aRef();
+ const Uint32 tScanInfo = src[len] & 0xFFFF;
+ const Uint32 tTakeOverNode = src[len] >> 16;
+ {
+ UintR scanInfo = 0;
+ TcKeyReq::setTakeOverScanFlag(scanInfo, 1);
+ TcKeyReq::setTakeOverScanNode(scanInfo, tTakeOverNode);
+ TcKeyReq::setTakeOverScanInfo(scanInfo, tScanInfo);
+ newOp->theScanInfo = scanInfo;
+ }
-void SetValueRecList::add(const char* anAttrName, float aValue)
-{
- SetValueRec* newSetValueRec = new SetValueRec();
+ // Copy the first 8 words of key info from KEYINF20 into TCKEYREQ
+ TcKeyReq * tcKeyReq = CAST_PTR(TcKeyReq,newOp->theTCREQ->getDataPtrSend());
+ Uint32 i = 0;
+ for (i = 0; i < TcKeyReq::MaxKeyInfo && i < len; i++) {
+ tcKeyReq->keyInfo[i] = * src++;
+ }
+
+ if(i < len){
+ NdbApiSignal* tSignal = theNdb->getSignal();
+ newOp->theFirstKEYINFO = tSignal;
+
+ Uint32 left = len - i;
+ while(tSignal && left > KeyInfo::DataLength){
+ tSignal->setSignal(GSN_KEYINFO);
+ KeyInfo * keyInfo = CAST_PTR(KeyInfo, tSignal->getDataPtrSend());
+ memcpy(keyInfo->keyData, src, 4 * KeyInfo::DataLength);
+ src += KeyInfo::DataLength;
+ left -= KeyInfo::DataLength;
+
+ tSignal->next(theNdb->getSignal());
+ tSignal = tSignal->next();
+ }
- newSetValueRec->stype = SetValueRec::SET_FLOAT_ATTR1;
- newSetValueRec->anAttrName = strdup(anAttrName);
- newSetValueRec->aFloatValue = aValue;
- if (!last)
- first = last = newSetValueRec;
- else {
- last->next = newSetValueRec;
- last = newSetValueRec;
+ if(tSignal && left > 0){
+ tSignal->setSignal(GSN_KEYINFO);
+ KeyInfo * keyInfo = CAST_PTR(KeyInfo, tSignal->getDataPtrSend());
+ memcpy(keyInfo->keyData, src, 4 * left);
+ }
+ }
+ return newOp;
}
+ return 0;
}
-void SetValueRecList::add(const char* anAttrName, double aValue)
+NdbIndexScanOperation::NdbIndexScanOperation(Ndb* aNdb)
+ : NdbScanOperation(aNdb)
{
- SetValueRec* newSetValueRec = new SetValueRec();
+}
- newSetValueRec->stype = SetValueRec::SET_DOUBLE_ATTR1;
- newSetValueRec->anAttrName = strdup(anAttrName);
- newSetValueRec->aDoubleValue = aValue;
- if (!last)
- first = last = newSetValueRec;
- else {
- last->next = newSetValueRec;
- last = newSetValueRec;
- }
+NdbIndexScanOperation::~NdbIndexScanOperation(){
}
-void SetValueRecList::add(Uint32 anAttrId, const char* aValue, Uint32 len)
+int
+NdbIndexScanOperation::setBound(const char* anAttrName, int type, const void* aValue, Uint32 len)
{
- SetValueRec* newSetValueRec = new SetValueRec();
-
- newSetValueRec->stype = SetValueRec::SET_STRING_ATTR2;
- newSetValueRec->anAttrId = anAttrId;
- newSetValueRec->stringStruct.aStringValue = (char *) malloc(len);
- strlcpy(newSetValueRec->stringStruct.aStringValue, aValue, len);
- if (!last)
- first = last = newSetValueRec;
- else {
- last->next = newSetValueRec;
- last = newSetValueRec;
- }
+ return setBound(m_accessTable->getColumn(anAttrName), type, aValue, len);
}
-void SetValueRecList::add(Uint32 anAttrId, Int32 aValue)
+int
+NdbIndexScanOperation::setBound(Uint32 anAttrId, int type, const void* aValue, Uint32 len)
{
- SetValueRec* newSetValueRec = new SetValueRec();
+ return setBound(m_accessTable->getColumn(anAttrId), type, aValue, len);
+}
- newSetValueRec->stype = SetValueRec::SET_INT32_ATTR2;
- newSetValueRec->anAttrId = anAttrId;
- newSetValueRec->anInt32Value = aValue;
- last->next = newSetValueRec;
- last = newSetValueRec;
+int
+NdbIndexScanOperation::equal_impl(const NdbColumnImpl* anAttrObject,
+ const char* aValue,
+ Uint32 len){
+ return setBound(anAttrObject, BoundEQ, aValue, len);
}
-void SetValueRecList::add(Uint32 anAttrId, Uint32 aValue)
-{
- SetValueRec* newSetValueRec = new SetValueRec();
+NdbRecAttr*
+NdbIndexScanOperation::getValue_impl(const NdbColumnImpl* attrInfo,
+ char* aValue){
+ if(!attrInfo->getPrimaryKey() || !m_ordered){
+ return NdbScanOperation::getValue_impl(attrInfo, aValue);
+ }
+
+ Uint32 id = attrInfo->m_attrId;
+ Uint32 marker = theTupleKeyDefined[id][0];
- newSetValueRec->stype = SetValueRec::SET_UINT32_ATTR2;
- newSetValueRec->anAttrId = anAttrId;
- newSetValueRec->anUint32Value = aValue;
- if (!last)
- first = last = newSetValueRec;
- else {
- last->next = newSetValueRec;
- last = newSetValueRec;
+ if(marker == SETBOUND_EQ){
+ return NdbScanOperation::getValue_impl(attrInfo, aValue);
+ } else if(marker == API_PTR){
+ return NdbScanOperation::getValue_impl(attrInfo, aValue);
}
+
+ UintPtr oldVal;
+ oldVal = theTupleKeyDefined[id][1];
+#if (SIZEOF_CHARP == 8)
+ oldVal = oldVal | (((UintPtr)theTupleKeyDefined[id][2]) << 32);
+#endif
+ theTupleKeyDefined[id][0] = API_PTR;
+
+ NdbRecAttr* tmp = (NdbRecAttr*)oldVal;
+ tmp->setup(attrInfo, aValue);
+ return tmp;
}
-void SetValueRecList::add(Uint32 anAttrId, Int64 aValue)
+#include <AttributeHeader.hpp>
+/*
+ * Define bound on index column in range scan.
+ */
+int
+NdbIndexScanOperation::setBound(const NdbColumnImpl* tAttrInfo,
+ int type, const void* aValue, Uint32 len)
{
- SetValueRec* newSetValueRec = new SetValueRec();
+ if (theOperationType == OpenRangeScanRequest &&
+ theStatus == SetBound &&
+ (0 <= type && type <= 4) &&
+ aValue != NULL &&
+ len <= 8000) {
+ // bound type
+
+ insertATTRINFO(type);
+ // attribute header
+ Uint32 sizeInBytes = tAttrInfo->m_attrSize * tAttrInfo->m_arraySize;
+ if (len != sizeInBytes && (len != 0)) {
+ setErrorCodeAbort(4209);
+ return -1;
+ }
+ len = sizeInBytes;
+ Uint32 tIndexAttrId = tAttrInfo->m_attrId;
+ Uint32 sizeInWords = (len + 3) / 4;
+ AttributeHeader ah(tIndexAttrId, sizeInWords);
+ insertATTRINFO(ah.m_value);
+ // attribute data
+ if ((UintPtr(aValue) & 0x3) == 0 && (len & 0x3) == 0)
+ insertATTRINFOloop((const Uint32*)aValue, sizeInWords);
+ else {
+ Uint32 temp[2000];
+ memcpy(temp, aValue, len);
+ while ((len & 0x3) != 0)
+ ((char*)temp)[len++] = 0;
+ insertATTRINFOloop(temp, sizeInWords);
+ }
- newSetValueRec->stype = SetValueRec::SET_INT64_ATTR2;
- newSetValueRec->anAttrId = anAttrId;
- newSetValueRec->anInt64Value = aValue;
- if (!last)
- first = last = newSetValueRec;
- else {
- last->next = newSetValueRec;
- last = newSetValueRec;
+ /**
+ * Do sorted stuff
+ */
+
+ /**
+ * The primary keys for an ordered index is defined in the beginning
+ * so it's safe to use [tIndexAttrId]
+ * (instead of looping as is NdbOperation::equal_impl)
+ */
+ if(!theTupleKeyDefined[tIndexAttrId][0]){
+ theNoOfTupKeyDefined++;
+ theTupleKeyDefined[tIndexAttrId][0] = SETBOUND_EQ;
+ m_sort_columns -= m_ordered;
+ }
+
+ return 0;
+ } else {
+ setErrorCodeAbort(4228); // XXX wrong code
+ return -1;
}
}
-void SetValueRecList::add(Uint32 anAttrId, Uint64 aValue)
-{
- SetValueRec* newSetValueRec = new SetValueRec();
+NdbResultSet*
+NdbIndexScanOperation::readTuples(LockMode lm,
+ Uint32 batch,
+ Uint32 parallel,
+ bool order_by){
+ NdbResultSet * rs = NdbScanOperation::readTuples(lm, batch, 0);
+ if(rs && order_by){
+ m_ordered = 1;
+ m_sort_columns = m_accessTable->getNoOfPrimaryKeys();
+ m_current_api_receiver = m_sent_receivers_count;
+ }
+ return rs;
+}
- newSetValueRec->stype = SetValueRec::SET_UINT64_ATTR2;
- newSetValueRec->anAttrId = anAttrId;
- newSetValueRec->anUint64Value = aValue;
- if (!last)
- first = last = newSetValueRec;
- else {
- last->next = newSetValueRec;
- last = newSetValueRec;
+void
+NdbIndexScanOperation::fix_get_values(){
+ /**
+ * Loop through all getValues and set buffer pointer to "API" pointer
+ */
+ NdbRecAttr * curr = theReceiver.theFirstRecAttr;
+
+ Uint32 cnt = m_sort_columns;
+ assert(cnt < MAXNROFTUPLEKEY);
+
+ Uint32 idx = 0;
+ NdbTableImpl * tab = m_currentTable;
+ while(cnt > 0){ // To MAXNROFTUPLEKEY loops
+ NdbColumnImpl * col = tab->getColumn(idx);
+ if(col->getPrimaryKey()){
+ Uint32 val = theTupleKeyDefined[idx][0];
+ switch(val){
+ case FAKE_PTR:
+ curr->setup(col, 0);
+ // Fall-through
+ case API_PTR:
+ cnt--;
+ break;
+ case SETBOUND_EQ:
+ (void)1;
+#ifdef VM_TRACE
+ break;
+ default:
+ abort();
+#endif
+ }
+ }
+ idx++;
}
}
-void SetValueRecList::add(Uint32 anAttrId, float aValue)
-{
- SetValueRec* newSetValueRec = new SetValueRec();
+int
+NdbIndexScanOperation::compare(Uint32 skip, Uint32 cols,
+ const NdbReceiver* t1,
+ const NdbReceiver* t2){
+
+ NdbRecAttr * r1 = t1->m_rows[t1->m_current_row];
+ NdbRecAttr * r2 = t2->m_rows[t2->m_current_row];
- newSetValueRec->stype = SetValueRec::SET_FLOAT_ATTR2;
- newSetValueRec->anAttrId = anAttrId;
- newSetValueRec->aFloatValue = aValue;
- if (!last)
- first = last = newSetValueRec;
- else {
- last->next = newSetValueRec;
- last = newSetValueRec;
+ r1 = (skip ? r1->next() : r1);
+ r2 = (skip ? r2->next() : r2);
+
+ while(cols > 0){
+ Uint32 * d1 = (Uint32*)r1->aRef();
+ Uint32 * d2 = (Uint32*)r2->aRef();
+ unsigned r1_null = r1->isNULL();
+ if((r1_null ^ (unsigned)r2->isNULL())){
+ return (r1_null ? 1 : -1);
+ }
+ Uint32 type = NdbColumnImpl::getImpl(* r1->m_column).m_extType;
+ Uint32 size = (r1->theAttrSize * r1->theArraySize + 3) / 4;
+ if(!r1_null){
+ char r = NdbSqlUtil::cmp(type, d1, d2, size, size);
+ if(r){
+ assert(r != NdbSqlUtil::CmpUnknown);
+ assert(r != NdbSqlUtil::CmpError);
+ return r;
+ }
+ }
+ cols--;
+ r1 = r1->next();
+ r2 = r2->next();
}
+ return 0;
}
-void SetValueRecList::add(Uint32 anAttrId, double aValue)
-{
- SetValueRec* newSetValueRec = new SetValueRec();
+#define DEBUG_NEXT_RESULT 0
+
+int
+NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){
+
+ Uint32 u_idx = m_current_api_receiver; // start of unsorted
+ Uint32 u_last = u_idx + 1; // last unsorted
+ Uint32 s_idx = u_last; // start of sorted
+ Uint32 s_last = theParallelism; // last sorted
+
+ NdbReceiver** arr = m_api_receivers;
+ NdbReceiver* tRec = arr[u_idx];
+
+ if(DEBUG_NEXT_RESULT) ndbout_c("nextOrderedResult(%d) nextResult: %d",
+ fetchAllowed,
+ (u_idx < s_last ? tRec->nextResult() : 0));
+
+ if(DEBUG_NEXT_RESULT) ndbout_c("u=[%d %d] s=[%d %d]",
+ u_idx, u_last,
+ s_idx, s_last);
+
+ bool fetchNeeded = (u_idx == s_last) || !tRec->nextResult();
+
+ if(fetchNeeded){
+ if(fetchAllowed){
+ if(DEBUG_NEXT_RESULT) ndbout_c("performing fetch...");
+ TransporterFacade* tp = TransporterFacade::instance();
+ Guard guard(tp->theMutexPtr);
+ Uint32 seq = theNdbCon->theNodeSequence;
+ Uint32 nodeId = theNdbCon->theDBnode;
+ if(seq == tp->getNodeSequence(nodeId) && !send_next_scan_ordered(u_idx)){
+ Uint32 tmp = m_sent_receivers_count;
+ while(m_sent_receivers_count > 0){
+ theNdb->theWaiter.m_node = nodeId;
+ theNdb->theWaiter.m_state = WAIT_SCAN;
+ int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
+ if (return_code == 0 && seq == tp->getNodeSequence(nodeId)) {
+ continue;
+ }
+ return -1;
+ }
+
+ u_idx = 0;
+ u_last = m_conf_receivers_count;
+ s_idx = (u_last > 1 ? s_last : s_idx);
+ m_conf_receivers_count = 0;
+ memcpy(arr, m_conf_receivers, u_last * sizeof(char*));
+
+ if(DEBUG_NEXT_RESULT) ndbout_c("sent: %d recv: %d", tmp, u_last);
+ }
+ } else {
+ return 2;
+ }
+ }
- newSetValueRec->stype = SetValueRec::SET_DOUBLE_ATTR2;
- newSetValueRec->anAttrId = anAttrId;
- newSetValueRec->aDoubleValue = aValue;
- if (!last)
- first = last = newSetValueRec;
- else {
- last->next = newSetValueRec;
- last = newSetValueRec;
+ if(DEBUG_NEXT_RESULT) ndbout_c("u=[%d %d] s=[%d %d]",
+ u_idx, u_last,
+ s_idx, s_last);
+
+
+ Uint32 cols = m_sort_columns;
+ Uint32 skip = m_keyInfo;
+ while(u_idx < u_last){
+ u_last--;
+ tRec = arr[u_last];
+
+ // Do binary search instead to find place
+ Uint32 place = s_idx;
+ for(; place < s_last; place++){
+ if(compare(skip, cols, tRec, arr[place]) <= 0){
+ break;
+ }
+ }
+
+ if(place != s_idx){
+ if(DEBUG_NEXT_RESULT)
+ ndbout_c("memmove(%d, %d, %d)", s_idx-1, s_idx, (place - s_idx));
+ memmove(arr+s_idx-1, arr+s_idx, sizeof(char*)*(place - s_idx));
+ }
+
+ if(DEBUG_NEXT_RESULT) ndbout_c("putting %d @ %d", u_last, place - 1);
+ m_api_receivers[place-1] = tRec;
+ s_idx--;
}
-}
-void
-SetValueRecList::callSetValueFn(SetValueRec& aSetValueRec, NdbOperation& oper)
-{
- switch(aSetValueRec.stype) {
- case(SetValueRec::SET_STRING_ATTR1):
- oper.setValue(aSetValueRec.anAttrName, aSetValueRec.stringStruct.aStringValue, aSetValueRec.stringStruct.len);
- break;
- case(SetValueRec::SET_INT32_ATTR1):
- oper.setValue(aSetValueRec.anAttrName, aSetValueRec.anInt32Value);
- break;
- case(SetValueRec::SET_UINT32_ATTR1):
- oper.setValue(aSetValueRec.anAttrName, aSetValueRec.anUint32Value);
- break;
- case(SetValueRec::SET_INT64_ATTR1):
- oper.setValue(aSetValueRec.anAttrName, aSetValueRec.anInt64Value);
- break;
- case(SetValueRec::SET_UINT64_ATTR1):
- oper.setValue(aSetValueRec.anAttrName, aSetValueRec.anUint64Value);
- break;
- case(SetValueRec::SET_FLOAT_ATTR1):
- oper.setValue(aSetValueRec.anAttrName, aSetValueRec.aFloatValue);
- break;
- case(SetValueRec::SET_DOUBLE_ATTR1):
- oper.setValue(aSetValueRec.anAttrName, aSetValueRec.aDoubleValue);
- break;
- case(SetValueRec::SET_STRING_ATTR2):
- oper.setValue(aSetValueRec.anAttrId, aSetValueRec.stringStruct.aStringValue, aSetValueRec.stringStruct.len);
- break;
- case(SetValueRec::SET_INT32_ATTR2):
- oper.setValue(aSetValueRec.anAttrId, aSetValueRec.anInt32Value);
- break;
- case(SetValueRec::SET_UINT32_ATTR2):
- oper.setValue(aSetValueRec.anAttrId, aSetValueRec.anUint32Value);
- break;
- case(SetValueRec::SET_INT64_ATTR2):
- oper.setValue(aSetValueRec.anAttrId, aSetValueRec.anInt64Value);
- break;
- case(SetValueRec::SET_UINT64_ATTR2):
- oper.setValue(aSetValueRec.anAttrId, aSetValueRec.anUint64Value);
- break;
- case(SetValueRec::SET_FLOAT_ATTR2):
- oper.setValue(aSetValueRec.anAttrId, aSetValueRec.aFloatValue);
- break;
- case(SetValueRec::SET_DOUBLE_ATTR2):
- oper.setValue(aSetValueRec.anAttrId, aSetValueRec.aDoubleValue);
- break;
+ if(DEBUG_NEXT_RESULT) ndbout_c("u=[%d %d] s=[%d %d]",
+ u_idx, u_last,
+ s_idx, s_last);
+
+ m_current_api_receiver = s_idx;
+
+ if(DEBUG_NEXT_RESULT)
+ for(Uint32 i = s_idx; i<s_last; i++)
+ ndbout_c("%p", arr[i]);
+
+ tRec = m_api_receivers[s_idx];
+ if(s_idx < s_last && tRec->nextResult()){
+ tRec->copyout(theReceiver);
+ return 0;
}
-}
-SetValueRec::~SetValueRec()
-{
- if ((stype == SET_STRING_ATTR1) ||
- (stype == SET_INT32_ATTR1) ||
- (stype == SET_UINT32_ATTR1) ||
- (stype == SET_INT64_ATTR1) ||
- (stype == SET_UINT64_ATTR1) ||
- (stype == SET_FLOAT_ATTR1) ||
- (stype == SET_DOUBLE_ATTR1))
- free(anAttrName);
-
- if ((stype == SET_STRING_ATTR1) ||
- (stype == SET_STRING_ATTR2))
- free(stringStruct.aStringValue);
- if (next) delete next;
- next = 0;
+ TransporterFacade* tp = TransporterFacade::instance();
+ Guard guard(tp->theMutexPtr);
+ Uint32 seq = theNdbCon->theNodeSequence;
+ Uint32 nodeId = theNdbCon->theDBnode;
+ if(seq == tp->getNodeSequence(nodeId) && send_next_scan(0, true) == 0){
+ return 1;
+ }
+ return -1;
}
int
-NdbScanOperation::equal_impl(const NdbColumnImpl* anAttrObject,
- const char* aValue,
- Uint32 len){
- return setBound(anAttrObject, BoundEQ, aValue, len);
+NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx){
+ if(idx == theParallelism)
+ return 0;
+
+ NdbApiSignal tSignal(theNdb->theMyRef);
+ tSignal.setSignal(GSN_SCAN_NEXTREQ);
+
+ Uint32* theData = tSignal.getDataPtrSend();
+ theData[0] = theNdbCon->theTCConPtr;
+ theData[1] = 0;
+ Uint64 transId = theNdbCon->theTransactionId;
+ theData[2] = transId;
+ theData[3] = (Uint32) (transId >> 32);
+
+ /**
+ * Prepare ops
+ */
+ Uint32 last = m_sent_receivers_count;
+ Uint32 * prep_array = theData + 4;
+
+ NdbReceiver * tRec = m_api_receivers[idx];
+ m_sent_receivers[last] = tRec;
+ tRec->m_list_index = last;
+ prep_array[0] = tRec->m_tcPtrI;
+ tRec->prepareSend();
+
+ m_sent_receivers_count = last + 1;
+
+ Uint32 nodeId = theNdbCon->theDBnode;
+ TransporterFacade * tp = TransporterFacade::instance();
+ tSignal.setLength(4+1);
+ return tp->sendSignal(&tSignal, nodeId);
}
-
-
diff --git a/ndb/src/ndbapi/Ndbif.cpp b/ndb/src/ndbapi/Ndbif.cpp
index 2d722e12129..60eda978397 100644
--- a/ndb/src/ndbapi/Ndbif.cpp
+++ b/ndb/src/ndbapi/Ndbif.cpp
@@ -16,12 +16,11 @@
#include "NdbApiSignal.hpp"
+#include "AttrType.hpp"
#include "NdbImpl.hpp"
-//#include "NdbSchemaOp.hpp"
-//#include "NdbSchemaCon.hpp"
#include "NdbOperation.hpp"
#include "NdbIndexOperation.hpp"
-#include "NdbScanReceiver.hpp"
+#include "NdbScanOperation.hpp"
#include "NdbConnection.hpp"
#include "NdbRecAttr.hpp"
#include "NdbReceiver.hpp"
@@ -34,6 +33,9 @@
#include <signaldata/CreateIndx.hpp>
#include <signaldata/DropIndx.hpp>
#include <signaldata/TcIndx.hpp>
+#include <signaldata/TransIdAI.hpp>
+#include <signaldata/ScanFrag.hpp>
+#include <signaldata/ScanTab.hpp>
#include <ndb_limits.h>
#include <NdbOut.hpp>
@@ -41,12 +43,13 @@
/******************************************************************************
- * int init( int aMaxNoOfTransactions );
+ * int init( int aNrOfCon, int aNrOfOp );
*
* Return Value: Return 0 : init was successful.
* Return -1: In all other case.
- * Parameters: aMaxNoOfTransactions : Max number of simultaneous transations
- * Remark: Create pointers and idle list Synchronous.
+ * Parameters: aNrOfCon : Number of connections offered to the application.
+ * aNrOfOp : Number of operations offered to the application.
+ * Remark: Create pointers and idle list Synchronous.
****************************************************************************/
int
Ndb::init(int aMaxNoOfTransactions)
@@ -88,7 +91,7 @@ Ndb::init(int aMaxNoOfTransactions)
theMyRef = numberToRef(theNdbBlockNumber, theNode);
for (i = 1; i < MAX_NDB_NODES; i++){
- if (theFacade->getIsNodeDefined(i)){
+ if (theFacade->getIsDbNode(i)){
theDBnodes[theNoOfDBnodes] = i;
theNoOfDBnodes++;
}
@@ -252,9 +255,8 @@ Ndb::abortTransactionsAfterNodeFailure(Uint16 aNodeId)
for (int i = tNoSentTransactions - 1; i >= 0; i--) {
NdbConnection* localCon = theSentTransactionsArray[i];
if (localCon->getConnectedNodeId() == aNodeId ) {
- const NdbConnection::SendStatusType sendStatus = localCon->theSendStatus;
- if (sendStatus == NdbConnection::sendTC_OP ||
- sendStatus == NdbConnection::sendTC_COMMIT) {
+ const SendStatusType sendStatus = localCon->theSendStatus;
+ if (sendStatus == sendTC_OP || sendStatus == sendTC_COMMIT) {
/*
A transaction was interrupted in the prepare phase by a node
failure. Since the transaction was not found in the phase
@@ -262,13 +264,13 @@ Ndb::abortTransactionsAfterNodeFailure(Uint16 aNodeId)
we report a normal node failure abort.
*/
localCon->setOperationErrorCodeAbort(4010);
- localCon->theCompletionStatus = NdbConnection::CompletedFailure;
- } else if (sendStatus == NdbConnection::sendTC_ROLLBACK) {
+ localCon->theCompletionStatus = CompletedFailure;
+ } else if (sendStatus == sendTC_ROLLBACK) {
/*
We aimed for abort and abort we got even if it was by a node
failure. We will thus report it as a success.
*/
- localCon->theCompletionStatus = NdbConnection::CompletedSuccess;
+ localCon->theCompletionStatus = CompletedSuccess;
} else {
#ifdef VM_TRACE
printState("abortTransactionsAfterNodeFailure %x", this);
@@ -280,7 +282,7 @@ Ndb::abortTransactionsAfterNodeFailure(Uint16 aNodeId)
intact since the node was failing and they were aborted. Thus we
set commit state to Aborted and set state to release on close.
*/
- localCon->theCommitStatus = NdbConnection::Aborted;
+ localCon->theCommitStatus = Aborted;
localCon->theReleaseOnClose = true;
completedTransaction(localCon);
}//if
@@ -300,26 +302,28 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
NdbOperation* tOp;
NdbIndexOperation* tIndexOp;
NdbConnection* tCon;
- int tReturnCode;
+ int tReturnCode = -1;
const Uint32* tDataPtr = aSignal->getDataPtr();
const Uint32 tWaitState = theWaiter.m_state;
const Uint32 tSignalNumber = aSignal->readSignalNumber();
const Uint32 tFirstData = *tDataPtr;
+ const Uint32 tLen = aSignal->getLength();
+ void * tFirstDataPtr;
/*
- In order to support 64 bit processes in the application we need to use
- id's rather than a direct pointer to the object used. It is also a good
- idea that one cannot corrupt the application code by sending a corrupt
- memory pointer.
-
- All signals received by the API requires the first data word to be such
- an id to the receiving object.
+ In order to support 64 bit processes in the application we need to use
+ id's rather than a direct pointer to the object used. It is also a good
+ idea that one cannot corrupt the application code by sending a corrupt
+ memory pointer.
+
+ All signals received by the API requires the first data word to be such
+ an id to the receiving object.
*/
-
+
switch (tSignalNumber){
case GSN_TCKEYCONF:
{
- void* tFirstDataPtr = int2void(tFirstData);
+ tFirstDataPtr = int2void(tFirstData);
if (tFirstDataPtr == 0) goto InvalidSignal;
const TcKeyConf * const keyConf = (TcKeyConf *)tDataPtr;
@@ -327,8 +331,8 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
tCon = void2con(tFirstDataPtr);
if ((tCon->checkMagicNumber() == 0) &&
- (tCon->theSendStatus == NdbConnection::sendTC_OP)) {
- tReturnCode = tCon->receiveTCKEYCONF(keyConf, aSignal->getLength());
+ (tCon->theSendStatus == sendTC_OP)) {
+ tReturnCode = tCon->receiveTCKEYCONF(keyConf, tLen);
if (tReturnCode != -1) {
completedTransaction(tCon);
}//if
@@ -346,91 +350,48 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
return;
}
- case GSN_READCONF:
- {
- void* tFirstDataPtr = int2void(tFirstData);
- if (tFirstDataPtr == 0) goto InvalidSignal;
-
- tOp = void2rec_op(tFirstDataPtr);
- if (tOp->checkMagicNumber() == 0) {
- tCon = tOp->theNdbCon;
- if (tCon != NULL) {
- if (tCon->theSendStatus == NdbConnection::sendTC_OP) {
- tReturnCode = tOp->receiveREAD_CONF(tDataPtr,
- aSignal->getLength());
- if (tReturnCode != -1) {
- completedTransaction(tCon);
- }//if
- }//if
- }//if
- }//if
- return;
- }
case GSN_TRANSID_AI:
{
- void* tFirstDataPtr = int2void(tFirstData);
+ tFirstDataPtr = int2void(tFirstData);
+ assert(tFirstDataPtr);
if (tFirstDataPtr == 0) goto InvalidSignal;
-
- // ndbout << "*** GSN_TRANSID_AI ***" << endl;
NdbReceiver* tRec = void2rec(tFirstDataPtr);
- if (tRec->getType() == NdbReceiver::NDB_OPERATION){
- // tOp = (NdbOperation*)tRec->getOwner();
- tOp = void2rec_op(tFirstDataPtr);
- // ndbout << "NDB_OPERATION" << endl;
- if (tOp->checkMagicNumber() == 0) {
- tCon = tOp->theNdbCon;
- if (tCon != NULL) {
- if (tCon->theSendStatus == NdbConnection::sendTC_OP) {
- tReturnCode = tOp->receiveTRANSID_AI(tDataPtr,
- aSignal->getLength());
- if (tReturnCode != -1) {
- completedTransaction(tCon);
- break;
- }
- }
- }
+ assert(tRec->checkMagicNumber());
+ assert(tRec->getTransaction());
+ assert(tRec->getTransaction()->checkState_TransId(((const TransIdAI*)tDataPtr)->transId));
+ if(tRec->checkMagicNumber() && (tCon = tRec->getTransaction()) &&
+ tCon->checkState_TransId(((const TransIdAI*)tDataPtr)->transId)){
+ Uint32 com;
+ if(aSignal->m_noOfSections > 0){
+ com = tRec->execTRANSID_AI(ptr[0].p, ptr[0].sz);
+ } else {
+ com = tRec->execTRANSID_AI(tDataPtr + TransIdAI::HeaderLength,
+ tLen - TransIdAI::HeaderLength);
}
- } else if (tRec->getType() == NdbReceiver::NDB_INDEX_OPERATION){
- // tOp = (NdbIndexOperation*)tRec->getOwner();
- tOp = void2rec_iop(tFirstDataPtr);
- // ndbout << "NDB_INDEX_OPERATION" << endl;
- if (tOp->checkMagicNumber() == 0) {
- tCon = tOp->theNdbCon;
- if (tCon != NULL) {
- if (tCon->theSendStatus == NdbConnection::sendTC_OP) {
- tReturnCode = tOp->receiveTRANSID_AI(tDataPtr,
- aSignal->getLength());
- if (tReturnCode != -1) {
- completedTransaction(tCon);
- break;
- }
- }
- }
- }
- } else if (tRec->getType() == NdbReceiver::NDB_SCANRECEIVER) {
- // NdbScanReceiver* tScanRec = (NdbScanReceiver*)tRec->getOwner();
- // NdbScanReceiver* tScanRec =
- // (NdbScanReceiver*)(void2rec(tFirstDataPtr)->getOwner());
- NdbScanReceiver* tScanRec = void2rec_srec(tFirstDataPtr);
- // ndbout << "NDB_SCANRECEIVER" << endl;
- if(tScanRec->checkMagicNumber() == 0){
- tReturnCode = tScanRec->receiveTRANSID_AI_SCAN(aSignal);
- if (tReturnCode != -1) {
+
+ if(com == 1){
+ switch(tRec->getType()){
+ case NdbReceiver::NDB_OPERATION:
+ case NdbReceiver::NDB_INDEX_OPERATION:
+ if(tCon->OpCompleteSuccess() != -1)
+ completedTransaction(tCon);
+ break;
+ case NdbReceiver::NDB_SCANRECEIVER:
+ tCon->theScanningOp->receiver_delivered(tRec);
theWaiter.m_state = NO_WAIT;
break;
+ default:
+ goto InvalidSignal;
}
}
+ break;
} else {
-#ifdef NDB_NO_DROPPED_SIGNAL
- abort();
-#endif
goto InvalidSignal;
}
- return;
}
case GSN_TCKEY_FAILCONF:
{
- void* tFirstDataPtr = int2void(tFirstData);
+ tFirstDataPtr = int2void(tFirstData);
if (tFirstDataPtr == 0) goto InvalidSignal;
const TcKeyFailConf * const failConf = (TcKeyFailConf *)tDataPtr;
@@ -441,8 +402,8 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
if (tOp->checkMagicNumber() == 0) {
tCon = tOp->theNdbCon;
if (tCon != NULL) {
- if ((tCon->theSendStatus == NdbConnection::sendTC_OP) ||
- (tCon->theSendStatus == NdbConnection::sendTC_COMMIT)) {
+ if ((tCon->theSendStatus == sendTC_OP) ||
+ (tCon->theSendStatus == sendTC_COMMIT)) {
tReturnCode = tCon->receiveTCKEY_FAILCONF(failConf);
if (tReturnCode != -1) {
completedTransaction(tCon);
@@ -461,15 +422,15 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
}
case GSN_TCKEY_FAILREF:
{
- void* tFirstDataPtr = int2void(tFirstData);
+ tFirstDataPtr = int2void(tFirstData);
if (tFirstDataPtr == 0) goto InvalidSignal;
tOp = void2rec_op(tFirstDataPtr);
if (tOp->checkMagicNumber() == 0) {
tCon = tOp->theNdbCon;
if (tCon != NULL) {
- if ((tCon->theSendStatus == NdbConnection::sendTC_OP) ||
- (tCon->theSendStatus == NdbConnection::sendTC_ROLLBACK)) {
+ if ((tCon->theSendStatus == sendTC_OP) ||
+ (tCon->theSendStatus == sendTC_ROLLBACK)) {
tReturnCode = tCon->receiveTCKEY_FAILREF(aSignal);
if (tReturnCode != -1) {
completedTransaction(tCon);
@@ -482,14 +443,14 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
}
case GSN_TCKEYREF:
{
- void* tFirstDataPtr = int2void(tFirstData);
+ tFirstDataPtr = int2void(tFirstData);
if (tFirstDataPtr == 0) goto InvalidSignal;
tOp = void2rec_op(tFirstDataPtr);
if (tOp->checkMagicNumber() == 0) {
tCon = tOp->theNdbCon;
if (tCon != NULL) {
- if (tCon->theSendStatus == NdbConnection::sendTC_OP) {
+ if (tCon->theSendStatus == sendTC_OP) {
tReturnCode = tOp->receiveTCKEYREF(aSignal);
if (tReturnCode != -1) {
completedTransaction(tCon);
@@ -503,7 +464,7 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
}
case GSN_TC_COMMITCONF:
{
- void* tFirstDataPtr = int2void(tFirstData);
+ tFirstDataPtr = int2void(tFirstData);
if (tFirstDataPtr == 0) goto InvalidSignal;
const TcCommitConf * const commitConf = (TcCommitConf *)tDataPtr;
@@ -511,7 +472,7 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
tCon = void2con(tFirstDataPtr);
if ((tCon->checkMagicNumber() == 0) &&
- (tCon->theSendStatus == NdbConnection::sendTC_COMMIT)) {
+ (tCon->theSendStatus == sendTC_COMMIT)) {
tReturnCode = tCon->receiveTC_COMMITCONF(commitConf);
if (tReturnCode != -1) {
completedTransaction(tCon);
@@ -531,12 +492,12 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
case GSN_TC_COMMITREF:
{
- void* tFirstDataPtr = int2void(tFirstData);
+ tFirstDataPtr = int2void(tFirstData);
if (tFirstDataPtr == 0) goto InvalidSignal;
tCon = void2con(tFirstDataPtr);
if ((tCon->checkMagicNumber() == 0) &&
- (tCon->theSendStatus == NdbConnection::sendTC_COMMIT)) {
+ (tCon->theSendStatus == sendTC_COMMIT)) {
tReturnCode = tCon->receiveTC_COMMITREF(aSignal);
if (tReturnCode != -1) {
completedTransaction(tCon);
@@ -547,12 +508,12 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
}
case GSN_TCROLLBACKCONF:
{
- void* tFirstDataPtr = int2void(tFirstData);
+ tFirstDataPtr = int2void(tFirstData);
if (tFirstDataPtr == 0) goto InvalidSignal;
tCon = void2con(tFirstDataPtr);
if ((tCon->checkMagicNumber() == 0) &&
- (tCon->theSendStatus == NdbConnection::sendTC_ROLLBACK)) {
+ (tCon->theSendStatus == sendTC_ROLLBACK)) {
tReturnCode = tCon->receiveTCROLLBACKCONF(aSignal);
if (tReturnCode != -1) {
completedTransaction(tCon);
@@ -562,12 +523,12 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
}
case GSN_TCROLLBACKREF:
{
- void* tFirstDataPtr = int2void(tFirstData);
+ tFirstDataPtr = int2void(tFirstData);
if (tFirstDataPtr == 0) goto InvalidSignal;
tCon = void2con(tFirstDataPtr);
if ((tCon->checkMagicNumber() == 0) &&
- (tCon->theSendStatus == NdbConnection::sendTC_ROLLBACK)) {
+ (tCon->theSendStatus == sendTC_ROLLBACK)) {
tReturnCode = tCon->receiveTCROLLBACKREF(aSignal);
if (tReturnCode != -1) {
completedTransaction(tCon);
@@ -578,7 +539,7 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
}
case GSN_TCROLLBACKREP:
{
- void* tFirstDataPtr = int2void(tFirstData);
+ tFirstDataPtr = int2void(tFirstData);
if (tFirstDataPtr == 0) goto InvalidSignal;
tCon = void2con(tFirstDataPtr);
@@ -592,7 +553,7 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
}
case GSN_TCSEIZECONF:
{
- void* tFirstDataPtr = int2void(tFirstData);
+ tFirstDataPtr = int2void(tFirstData);
if (tFirstDataPtr == 0) goto InvalidSignal;
if (tWaitState != WAIT_TC_SEIZE) {
@@ -612,7 +573,7 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
}
case GSN_TCSEIZEREF:
{
- void* tFirstDataPtr = int2void(tFirstData);
+ tFirstDataPtr = int2void(tFirstData);
if (tFirstDataPtr == 0) goto InvalidSignal;
if (tWaitState != WAIT_TC_SEIZE) {
@@ -632,7 +593,7 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
}
case GSN_TCRELEASECONF:
{
- void* tFirstDataPtr = int2void(tFirstData);
+ tFirstDataPtr = int2void(tFirstData);
if (tFirstDataPtr == 0) goto InvalidSignal;
if (tWaitState != WAIT_TC_RELEASE) {
@@ -650,7 +611,7 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
}
case GSN_TCRELEASEREF:
{
- void* tFirstDataPtr = int2void(tFirstData);
+ tFirstDataPtr = int2void(tFirstData);
if (tFirstDataPtr == 0) goto InvalidSignal;
if (tWaitState != WAIT_TC_RELEASE) {
@@ -704,7 +665,7 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
case GSN_DIHNDBTAMPER:
{
- void* tFirstDataPtr = int2void(tFirstData);
+ tFirstDataPtr = int2void(tFirstData);
if (tFirstDataPtr == 0) goto InvalidSignal;
if (tWaitState != WAIT_NDB_TAMPER)
@@ -718,27 +679,34 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
break;
}
case GSN_SCAN_TABCONF:
- {
- void* tFirstDataPtr = int2void(tFirstData);
- if (tFirstDataPtr == 0) goto InvalidSignal;
-
- //ndbout << "*** GSN_SCAN_TABCONF *** " << endl;
- if (tWaitState != WAIT_SCAN){
- return;
- }
- tCon = void2con(tFirstDataPtr);
- if (tCon->checkMagicNumber() != 0)
- return;
- tReturnCode = tCon->receiveSCAN_TABCONF(aSignal);
- if (tReturnCode != -1)
- theWaiter.m_state = NO_WAIT;
- break;
+ {
+ tFirstDataPtr = int2void(tFirstData);
+ assert(tFirstDataPtr);
+ assert(void2con(tFirstDataPtr));
+ assert(void2con(tFirstDataPtr)->checkMagicNumber() == 0);
+ if(tFirstDataPtr &&
+ (tCon = void2con(tFirstDataPtr)) && (tCon->checkMagicNumber() == 0)){
+
+ if(aSignal->m_noOfSections > 0){
+ tReturnCode = tCon->receiveSCAN_TABCONF(aSignal, ptr[0].p, ptr[0].sz);
+ } else {
+ tReturnCode =
+ tCon->receiveSCAN_TABCONF(aSignal,
+ tDataPtr + ScanTabConf::SignalLength,
+ tLen - ScanTabConf::SignalLength);
+ }
+ if (tReturnCode != -1)
+ theWaiter.m_state = NO_WAIT;
+ break;
+ } else {
+ goto InvalidSignal;
}
+ }
case GSN_SCAN_TABREF:
- {
- void* tFirstDataPtr = int2void(tFirstData);
+ {
+ tFirstDataPtr = int2void(tFirstData);
if (tFirstDataPtr == 0) goto InvalidSignal;
-
+
if (tWaitState == WAIT_SCAN){
tCon = void2con(tFirstDataPtr);
if (tCon->checkMagicNumber() == 0){
@@ -753,43 +721,49 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
}
case GSN_SCAN_TABINFO:
{
- void* tFirstDataPtr = int2void(tFirstData);
- if (tFirstDataPtr == 0) goto InvalidSignal;
-
- //ndbout << "*** GSN_SCAN_TABINFO ***" << endl;
- if (tWaitState != WAIT_SCAN)
- return;
- tCon = void2con(tFirstDataPtr);
- if (tCon->checkMagicNumber() != 0)
- return;
- tReturnCode = tCon->receiveSCAN_TABINFO(aSignal);
- if (tReturnCode != -1)
- theWaiter.m_state = NO_WAIT;
- break;
+ goto InvalidSignal;
}
case GSN_KEYINFO20: {
- void* tFirstDataPtr = int2void(tFirstData);
+ tFirstDataPtr = int2void(tFirstData);
if (tFirstDataPtr == 0) goto InvalidSignal;
-
- //ndbout << "*** GSN_KEYINFO20 ***" << endl;
- NdbScanReceiver* tScanRec = void2rec_srec(tFirstDataPtr);
- if (tScanRec->checkMagicNumber() != 0)
- return;
- tReturnCode = tScanRec->receiveKEYINFO20(aSignal);
- if (tReturnCode != -1)
- theWaiter.m_state = NO_WAIT;
- break;
+ NdbReceiver* tRec = void2rec(tFirstDataPtr);
+
+ if(tRec->checkMagicNumber() && (tCon = tRec->getTransaction()) &&
+ tCon->checkState_TransId(&((const KeyInfo20*)tDataPtr)->transId1)){
+
+ Uint32 len = ((const KeyInfo20*)tDataPtr)->keyLen;
+ Uint32 info = ((const KeyInfo20*)tDataPtr)->scanInfo_Node;
+ int com = -1;
+ if(aSignal->m_noOfSections > 0 && len == ptr[0].sz){
+ com = tRec->execKEYINFO20(info, ptr[0].p, len);
+ } else if(len == tLen - KeyInfo20::HeaderLength){
+ com = tRec->execKEYINFO20(info, tDataPtr+KeyInfo20::HeaderLength, len);
+ }
+
+ switch(com){
+ case 1:
+ tCon->theScanningOp->receiver_delivered(tRec);
+ theWaiter.m_state = NO_WAIT;
+ break;
+ case 0:
+ break;
+ case -1:
+ goto InvalidSignal;
+ }
+ break;
+ }
+ goto InvalidSignal;
}
case GSN_TCINDXCONF:{
- void* tFirstDataPtr = int2void(tFirstData);
+ tFirstDataPtr = int2void(tFirstData);
if (tFirstDataPtr == 0) goto InvalidSignal;
const TcIndxConf * const indxConf = (TcIndxConf *)tDataPtr;
const BlockReference aTCRef = aSignal->theSendersBlockRef;
tCon = void2con(tFirstDataPtr);
if ((tCon->checkMagicNumber() == 0) &&
- (tCon->theSendStatus == NdbConnection::sendTC_OP)) {
- tReturnCode = tCon->receiveTCINDXCONF(indxConf, aSignal->getLength());
+ (tCon->theSendStatus == sendTC_OP)) {
+ tReturnCode = tCon->receiveTCINDXCONF(indxConf, tLen);
if (tReturnCode != -1) {
completedTransaction(tCon);
}//if
@@ -804,14 +778,14 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
break;
}
case GSN_TCINDXREF:{
- void* tFirstDataPtr = int2void(tFirstData);
+ tFirstDataPtr = int2void(tFirstData);
if (tFirstDataPtr == 0) goto InvalidSignal;
tIndexOp = void2rec_iop(tFirstDataPtr);
if (tIndexOp->checkMagicNumber() == 0) {
tCon = tIndexOp->theNdbCon;
if (tCon != NULL) {
- if (tCon->theSendStatus == NdbConnection::sendTC_OP) {
+ if (tCon->theSendStatus == sendTC_OP) {
tReturnCode = tIndexOp->receiveTCINDXREF(aSignal);
if (tReturnCode != -1) {
completedTransaction(tCon);
@@ -865,8 +839,7 @@ Ndb::completedTransaction(NdbConnection* aCon)
Uint32 tTransArrayIndex = aCon->theTransArrayIndex;
Uint32 tNoSentTransactions = theNoOfSentTransactions;
Uint32 tNoCompletedTransactions = theNoOfCompletedTransactions;
- if ((tNoSentTransactions > 0) &&
- (aCon->theListState == NdbConnection::InSendList) &&
+ if ((tNoSentTransactions > 0) && (aCon->theListState == InSendList) &&
(tTransArrayIndex < tNoSentTransactions)) {
NdbConnection* tMoveCon = theSentTransactionsArray[tNoSentTransactions - 1];
@@ -880,7 +853,7 @@ Ndb::completedTransaction(NdbConnection* aCon)
theNoOfCompletedTransactions = tNoCompletedTransactions + 1;
theNoOfSentTransactions = tNoSentTransactions - 1;
- aCon->theListState = NdbConnection::InCompletedList;
+ aCon->theListState = InCompletedList;
aCon->handleExecuteCompletion();
if ((theMinNoOfEventsToWakeUp != 0) &&
(theNoOfCompletedTransactions >= theMinNoOfEventsToWakeUp)) {
@@ -895,8 +868,8 @@ Ndb::completedTransaction(NdbConnection* aCon)
ndbout << endl << flush;
#ifdef VM_TRACE
printState("completedTransaction abort");
-#endif
abort();
+#endif
}//if
}//Ndb::completedTransaction()
@@ -915,7 +888,7 @@ Ndb::reportCallback(NdbConnection** aCopyArray, Uint32 aNoOfCompletedTrans)
NdbAsynchCallback aCallback = aCopyArray[i]->theCallbackFunction;
int tResult = 0;
if (aCallback != NULL) {
- if (aCopyArray[i]->theReturnStatus == NdbConnection::ReturnFailure) {
+ if (aCopyArray[i]->theReturnStatus == ReturnFailure) {
tResult = -1;
}//if
(*aCallback)(tResult, aCopyArray[i], anyObject);
@@ -939,13 +912,13 @@ Ndb::pollCompleted(NdbConnection** aCopyArray)
if (tNoCompletedTransactions > 0) {
for (i = 0; i < tNoCompletedTransactions; i++) {
aCopyArray[i] = theCompletedTransactionsArray[i];
- if (aCopyArray[i]->theListState != NdbConnection::InCompletedList) {
+ if (aCopyArray[i]->theListState != InCompletedList) {
ndbout << "pollCompleted error ";
ndbout << aCopyArray[i]->theListState << endl;
abort();
}//if
theCompletedTransactionsArray[i] = NULL;
- aCopyArray[i]->theListState = NdbConnection::NotInList;
+ aCopyArray[i]->theListState = NotInList;
}//for
}//if
theNoOfCompletedTransactions = 0;
@@ -967,8 +940,8 @@ Ndb::check_send_timeout()
a_con->printState();
#endif
a_con->setOperationErrorCodeAbort(4012);
- a_con->theCommitStatus = NdbConnection::Aborted;
- a_con->theCompletionStatus = NdbConnection::CompletedFailure;
+ a_con->theCommitStatus = Aborted;
+ a_con->theCompletionStatus = CompletedFailure;
a_con->handleExecuteCompletion();
remove_sent_list(i);
insert_completed_list(a_con);
@@ -997,7 +970,7 @@ Ndb::insert_completed_list(NdbConnection* a_con)
Uint32 no_of_comp = theNoOfCompletedTransactions;
theCompletedTransactionsArray[no_of_comp] = a_con;
theNoOfCompletedTransactions = no_of_comp + 1;
- a_con->theListState = NdbConnection::InCompletedList;
+ a_con->theListState = InCompletedList;
a_con->theTransArrayIndex = no_of_comp;
return no_of_comp;
}
@@ -1008,7 +981,7 @@ Ndb::insert_sent_list(NdbConnection* a_con)
Uint32 no_of_sent = theNoOfSentTransactions;
theSentTransactionsArray[no_of_sent] = a_con;
theNoOfSentTransactions = no_of_sent + 1;
- a_con->theListState = NdbConnection::InSendList;
+ a_con->theListState = InSendList;
a_con->theTransArrayIndex = no_of_sent;
return no_of_sent;
}
@@ -1046,10 +1019,10 @@ Ndb::sendPrepTrans(int forceSend)
if ((tp->getNodeSequence(node_id) == a_con->theNodeSequence) &&
tp->get_node_alive(node_id) ||
(tp->get_node_stopping(node_id) &&
- ((a_con->theSendStatus == NdbConnection::sendABORT) ||
- (a_con->theSendStatus == NdbConnection::sendABORTfail) ||
- (a_con->theSendStatus == NdbConnection::sendCOMMITstate) ||
- (a_con->theSendStatus == NdbConnection::sendCompleted)))) {
+ ((a_con->theSendStatus == sendABORT) ||
+ (a_con->theSendStatus == sendABORTfail) ||
+ (a_con->theSendStatus == sendCOMMITstate) ||
+ (a_con->theSendStatus == sendCompleted)))) {
/*
We will send if
1) Node is alive and sequences are correct OR
@@ -1081,13 +1054,13 @@ Ndb::sendPrepTrans(int forceSend)
again and will thus set the state to Aborted to avoid a more or
less eternal loop of tries.
*/
- if (a_con->theSendStatus == NdbConnection::sendOperations) {
+ if (a_con->theSendStatus == sendOperations) {
a_con->setOperationErrorCodeAbort(4021);
- a_con->theCommitStatus = NdbConnection::NeedAbort;
+ a_con->theCommitStatus = NeedAbort;
TRACE_DEBUG("Send buffer full and sendOperations");
} else {
a_con->setOperationErrorCodeAbort(4026);
- a_con->theCommitStatus = NdbConnection::Aborted;
+ a_con->theCommitStatus = Aborted;
TRACE_DEBUG("Send buffer full, set state to Aborted");
}//if
}//if
@@ -1104,7 +1077,7 @@ Ndb::sendPrepTrans(int forceSend)
*/
TRACE_DEBUG("Abort a transaction when stopping a node");
a_con->setOperationErrorCodeAbort(4023);
- a_con->theCommitStatus = NdbConnection::NeedAbort;
+ a_con->theCommitStatus = NeedAbort;
} else {
/*
The node is hard dead and we cannot continue. We will also release
@@ -1114,10 +1087,10 @@ Ndb::sendPrepTrans(int forceSend)
a_con->setOperationErrorCodeAbort(4025);
a_con->theReleaseOnClose = true;
a_con->theTransactionIsStarted = false;
- a_con->theCommitStatus = NdbConnection::Aborted;
+ a_con->theCommitStatus = Aborted;
}//if
}//if
- a_con->theCompletionStatus = NdbConnection::CompletedFailure;
+ a_con->theCompletionStatus = CompletedFailure;
a_con->handleExecuteCompletion();
insert_completed_list(a_con);
}//for
@@ -1255,8 +1228,7 @@ Return: 0 - Response received
******************************************************************************/
int
-Ndb::receiveResponse(int waitTime)
-{
+Ndb::receiveResponse(int waitTime){
int tResultCode;
TransporterFacade::instance()->checkForceSend(theNdbBlockNumber);
@@ -1310,10 +1282,10 @@ Ndb::sendRecSignal(Uint16 node_id,
if (return_code != -1) {
theWaiter.m_node = node_id;
theWaiter.m_state = aWaitState;
- return receiveResponse();
- // End of protected area
- }//if
- return_code = -3;
+ return_code = receiveResponse();
+ } else {
+ return_code = -3;
+ }
} else {
return_code = -4;
}//if
diff --git a/ndb/src/ndbapi/Ndbinit.cpp b/ndb/src/ndbapi/Ndbinit.cpp
index 042faa431a0..1d9d6fee5e4 100644
--- a/ndb/src/ndbapi/Ndbinit.cpp
+++ b/ndb/src/ndbapi/Ndbinit.cpp
@@ -137,7 +137,7 @@ Ndb::Ndb( const char* aDataBase , const char* aDataBaseSchema) :
TransporterFacade * m_facade = 0;
if(theNoOfNdbObjects == 0){
- if ((m_facade = TransporterFacade::start_instance(0,ndbConnectString)) == 0)
+ if ((m_facade = TransporterFacade::start_instance(ndbConnectString)) == 0)
theInitState = InitConfigError;
} else {
m_facade = TransporterFacade::instance();
diff --git a/ndb/src/ndbapi/Ndblist.cpp b/ndb/src/ndbapi/Ndblist.cpp
index 0d9c0f60985..8cc8ba1a079 100644
--- a/ndb/src/ndbapi/Ndblist.cpp
+++ b/ndb/src/ndbapi/Ndblist.cpp
@@ -15,16 +15,13 @@
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include <NdbOut.hpp>
-#include "Ndb.hpp"
-//#include "NdbSchemaOp.hpp"
-//#include "NdbSchemaCon.hpp"
-#include "NdbOperation.hpp"
-#include "NdbScanOperation.hpp"
-#include "NdbIndexOperation.hpp"
-#include "NdbConnection.hpp"
+#include <Ndb.hpp>
+#include <NdbOperation.hpp>
+#include <NdbIndexOperation.hpp>
+#include <NdbIndexScanOperation.hpp>
+#include <NdbConnection.hpp>
#include "NdbApiSignal.hpp"
-#include "NdbRecAttr.hpp"
-#include "NdbScanReceiver.hpp"
+#include <NdbRecAttr.hpp>
#include "NdbUtil.hpp"
#include "API.hpp"
@@ -263,13 +260,13 @@ Ndb::getNdbLabel()
* Remark: Get a NdbScanReceiver from theScanRecList and return the
* object .
****************************************************************************/
-NdbScanReceiver*
+NdbReceiver*
Ndb::getNdbScanRec()
{
- NdbScanReceiver* tNdbScanRec;
+ NdbReceiver* tNdbScanRec;
if ( theScanList == NULL )
{
- tNdbScanRec = new NdbScanReceiver(this);
+ tNdbScanRec = new NdbReceiver(this);
if (tNdbScanRec == NULL)
{
return NULL;
@@ -344,17 +341,17 @@ Return Value: Return theOpList : if the getScanOperation was succesful.
Return NULL : In all other case.
Remark: Get an operation from theScanOpIdleList and return the object .
***************************************************************************/
-NdbScanOperation*
+NdbIndexScanOperation*
Ndb::getScanOperation()
{
- NdbScanOperation* tOp = theScanOpIdleList;
+ NdbIndexScanOperation* tOp = theScanOpIdleList;
if (tOp != NULL ) {
- NdbScanOperation* tOpNext = (NdbScanOperation*) tOp->next();
+ NdbIndexScanOperation* tOpNext = (NdbIndexScanOperation*)tOp->next();
tOp->next(NULL);
theScanOpIdleList = tOpNext;
return tOp;
} else {
- tOp = new NdbScanOperation(this);
+ tOp = new NdbIndexScanOperation(this);
if (tOp != NULL)
tOp->next(NULL);
}
@@ -495,7 +492,7 @@ Parameters: aNdbScanRec: The NdbScanReceiver object.
Remark: Add a NdbScanReceiver object into the Scan idlelist.
***************************************************************************/
void
-Ndb::releaseNdbScanRec(NdbScanReceiver* aNdbScanRec)
+Ndb::releaseNdbScanRec(NdbReceiver* aNdbScanRec)
{
aNdbScanRec->next(theScanList);
theScanList = aNdbScanRec;
@@ -544,12 +541,12 @@ Parameters: aScanOperation : The released NdbScanOperation object.
Remark: Add a NdbScanOperation object into the signal idlelist.
***************************************************************************/
void
-Ndb::releaseScanOperation(NdbScanOperation* aScanOperation)
+Ndb::releaseScanOperation(NdbIndexScanOperation* aScanOperation)
{
aScanOperation->next(theScanOpIdleList);
aScanOperation->theNdbCon = NULL;
aScanOperation->theMagicNumber = 0xFE11D2;
- theScanOpIdleList = (NdbScanOperation*)aScanOperation;
+ theScanOpIdleList = aScanOperation;
}
/***************************************************************************
@@ -623,7 +620,7 @@ void
Ndb::freeScanOperation()
{
NdbScanOperation* tOp = theScanOpIdleList;
- theScanOpIdleList = (NdbScanOperation *) theScanOpIdleList->next();
+ theScanOpIdleList = (NdbIndexScanOperation *) theScanOpIdleList->next();
delete tOp;
}
@@ -674,7 +671,7 @@ Remark: Always release the first item in the free list
void
Ndb::freeNdbScanRec()
{
- NdbScanReceiver* tNdbScanRec = theScanList;
+ NdbReceiver* tNdbScanRec = theScanList;
theScanList = theScanList->next();
delete tNdbScanRec;
}
diff --git a/ndb/src/ndbapi/ObjectMap.hpp b/ndb/src/ndbapi/ObjectMap.hpp
index 4abb54b5081..f67774bb413 100644
--- a/ndb/src/ndbapi/ObjectMap.hpp
+++ b/ndb/src/ndbapi/ObjectMap.hpp
@@ -93,26 +93,28 @@ inline
void *
NdbObjectIdMap::unmap(Uint32 id, void *object){
- int i = id>>2;
+ Uint32 i = id>>2;
// lock();
-
- void * obj = m_map[i].m_obj;
- if (object == obj) {
- m_map[i].m_next = m_firstFree;
- m_firstFree = i;
- } else {
- ndbout_c("Error: NdbObjectIdMap::::unmap(%u, 0x%x) obj=0x%x", id, object, obj);
- return 0;
- }
-
- // unlock();
-
+ if(i < m_size){
+ void * obj = m_map[i].m_obj;
+ if (object == obj) {
+ m_map[i].m_next = m_firstFree;
+ m_firstFree = i;
+ } else {
+ ndbout_c("Error: NdbObjectIdMap::::unmap(%u, 0x%x) obj=0x%x", id, object, obj);
+ return 0;
+ }
+
+ // unlock();
+
#ifdef DEBUG_OBJECTMAP
- ndbout_c("NdbObjectIdMap::unmap(%u) obj=0x%x", id, obj);
+ ndbout_c("NdbObjectIdMap::unmap(%u) obj=0x%x", id, obj);
#endif
-
- return obj;
+
+ return obj;
+ }
+ return 0;
}
inline void *
@@ -120,7 +122,11 @@ NdbObjectIdMap::getObject(Uint32 id){
#ifdef DEBUG_OBJECTMAP
ndbout_c("NdbObjectIdMap::getObject(%u) obj=0x%x", id, m_map[id>>2].m_obj);
#endif
- return m_map[id>>2].m_obj;
+ id >>= 2;
+ if(id < m_size){
+ return m_map[id].m_obj;
+ }
+ return 0;
}
inline void
@@ -129,7 +135,6 @@ NdbObjectIdMap::expand(Uint32 incSize){
MapEntry * tmp = (MapEntry*)malloc(newSize * sizeof(MapEntry));
memcpy(tmp, m_map, m_size * sizeof(MapEntry));
- free(m_map);
m_map = tmp;
for(Uint32 i = m_size; i<newSize; i++){
diff --git a/ndb/src/ndbapi/ScanOperation.txt b/ndb/src/ndbapi/ScanOperation.txt
index 7197cf66f7e..27e4e8c1755 100644
--- a/ndb/src/ndbapi/ScanOperation.txt
+++ b/ndb/src/ndbapi/ScanOperation.txt
@@ -8,3 +8,49 @@ theNdbCon -> z
z) NdbConnection (scan)
theScanningOp -> y
theFirstOpInList -> y (until after openScan)
+
+# SU
+
+ScanOpLen: includes KeyInfo
+New protocol
+
+# -- Impl.
+
+1) Scan uses one NdbReceiver per "parallelism"
+2) Each NdbReceiver can handle up to "batch size" rows
+3) API send one "pointer" per parallelism (prev. was one per row)
+4) API handles each receiver independently.
+ It can "nextResult"-one, receive one and close-one
+5) When a recevier has been "nextResult"-ed, the API can fetch from it again
+6) After doing "openScan"-req, no wait is performed
+ (only possible to block on nextResult(true) or closeScan)
+
+7) Instead of "ack"-ing each row with length,
+* Each row is sent in one lonw signal (unless to short)
+* Each NdbReceiver is ack-ed with #rows and sum(#length)
+* KeyInfo20 is one signal and included in sum(#length)
+
+8) The API receive(s) the data into NdbRecAttr-objects
+ (prev. it copied signals using new/delete)
+9) KeyInfo20 is also received into a NdbRecAttr-object
+10)
+
+# -- Close of scan
+
+1) Each NdbReciver gets a signal when it's complete
+ (0 rows is ack-ed)
+2) The API then "closes" this receiver
+3) The API can at any time close then scan for other reason(s)
+ (example dying)
+4) This is signal:ed via a NEXT_SCANREQ (close = 1)
+5) TC responds with a SCAN_TABCONF (close = 1)
+
+
+# -- Sorted
+
+1) The sorted scan is transparent to TC
+ It's a API only impl.
+2) The API makes the following adjustements:
+* Scan all fragments simultaniously (max parallelism)
+* Never return a row to the API if a NdbReciver is "outstanding"
+* Sort Receivers (only top row as they already are sorted within)
diff --git a/ndb/src/ndbapi/TransporterFacade.cpp b/ndb/src/ndbapi/TransporterFacade.cpp
index 7806e482f1f..e725144a8f8 100644
--- a/ndb/src/ndbapi/TransporterFacade.cpp
+++ b/ndb/src/ndbapi/TransporterFacade.cpp
@@ -28,6 +28,8 @@
#include "API.hpp"
#include <ConfigRetriever.hpp>
+#include <mgmapi_config_parameters.h>
+#include <mgmapi_configuration.hpp>
#include <NdbConfig.h>
#include <ndb_version.h>
#include <SignalLoggerManager.hpp>
@@ -331,39 +333,40 @@ atexit_stop_instance(){
* Which is protected by a mutex
*/
TransporterFacade*
-TransporterFacade::start_instance(Properties* props, const char *connectString)
-{
- bool ownProps = false;
- if (props == NULL) {
- // TransporterFacade used from API get config from mgmt srvr
- ConfigRetriever configRetriever;
- configRetriever.setConnectString(connectString);
- props = configRetriever.getConfig("API", NDB_VERSION);
- if (props == 0) {
- ndbout << "Configuration error: ";
- const char* erString = configRetriever.getErrorString();
- if (erString == 0) {
- erString = "No error specified!";
- }
- ndbout << erString << endl;
- return NULL;
+TransporterFacade::start_instance(const char * connectString){
+
+ // TransporterFacade used from API get config from mgmt srvr
+ ConfigRetriever configRetriever;
+ configRetriever.setConnectString(connectString);
+ ndb_mgm_configuration * props = configRetriever.getConfig(NDB_VERSION,
+ NODE_TYPE_API);
+ if (props == 0) {
+ ndbout << "Configuration error: ";
+ const char* erString = configRetriever.getErrorString();
+ if (erString == 0) {
+ erString = "No error specified!";
}
- props->put("LocalNodeId", configRetriever.getOwnNodeId());
- props->put("LocalNodeType", "API");
-
- ownProps = true;
+ ndbout << erString << endl;
+ return 0;
}
- TransporterFacade* tf = new TransporterFacade();
+ const int nodeId = configRetriever.getOwnNodeId();
- if (! tf->init(props)) {
+ TransporterFacade * tf = start_instance(nodeId, props);
+
+ free(props);
+ return tf;
+}
+
+TransporterFacade*
+TransporterFacade::start_instance(int nodeId,
+ const ndb_mgm_configuration* props)
+{
+ TransporterFacade* tf = new TransporterFacade();
+ if (! tf->init(nodeId, props)) {
delete tf;
return NULL;
}
- if (ownProps) {
- delete props;
- }
-
/**
* Install atexit handler
*/
@@ -498,61 +501,65 @@ TransporterFacade::TransporterFacade() :
}
bool
-TransporterFacade::init(Properties* props)
+TransporterFacade::init(Uint32 nodeId, const ndb_mgm_configuration* props)
{
- IPCConfig config(props);
-
- if (config.init() != 0) {
- TRP_DEBUG( "IPCConfig object config failed to init()" );
- return false;
- }
- theOwnId = config.ownId();
-
+ theOwnId = nodeId;
theTransporterRegistry = new TransporterRegistry(this);
- if(config.configureTransporters(theTransporterRegistry) <= 0) {
+
+ const int res = IPCConfig::configureTransporters(nodeId,
+ * props,
+ * theTransporterRegistry);
+ if(res <= 0){
TRP_DEBUG( "configureTransporters returned 0 or less" );
return false;
}
+ ndb_mgm_configuration_iterator iter(* props, CFG_SECTION_NODE);
+ iter.first();
theClusterMgr = new ClusterMgr(* this);
- theClusterMgr->init(config);
-
- theReceiveThread = NdbThread_Create(runReceiveResponse_C,
- (void**)this,
- 32768,
- "ndb_receive",
- NDB_THREAD_PRIO_LOW);
-
- theSendThread = NdbThread_Create(runSendRequest_C,
- (void**)this,
- 32768,
- "ndb_send",
- NDB_THREAD_PRIO_LOW);
-
- theClusterMgr->startThread();
+ theClusterMgr->init(iter);
/**
* Unless there is a "Name", the initiated transporter is within
* an NDB Cluster. (If "Name" is defined, then the transporter
* is used to connect to a different system, i.e. NDB Cluster.)
*/
+#if 0
if (!props->contains("Name")) {
- const Properties* p = 0;
- if(!props->get("Node", ownId(), &p)) {
+#endif
+ iter.first();
+ if(iter.find(CFG_NODE_ID, nodeId)){
TRP_DEBUG( "Node info missing from config." );
return false;
}
Uint32 rank = 0;
- if (p->get("ArbitrationRank", &rank) && rank > 0) {
+ if(!iter.get(CFG_NODE_ARBIT_RANK, &rank) && rank>0){
theArbitMgr = new ArbitMgr(* this);
theArbitMgr->setRank(rank);
Uint32 delay = 0;
- p->get("ArbitrationDelay", &delay);
+ iter.get(CFG_NODE_ARBIT_DELAY, &delay);
theArbitMgr->setDelay(delay);
}
+
+#if 0
}
+#endif
+
+ theReceiveThread = NdbThread_Create(runReceiveResponse_C,
+ (void**)this,
+ 32768,
+ "ndb_receive",
+ NDB_THREAD_PRIO_LOW);
+
+ theSendThread = NdbThread_Create(runSendRequest_C,
+ (void**)this,
+ 32768,
+ "ndb_send",
+ NDB_THREAD_PRIO_LOW);
+ theClusterMgr->startThread();
+
#ifdef API_TRACE
signalLogger.logOn(true, 0, SignalLoggerManager::LogInOut);
#endif
diff --git a/ndb/src/ndbapi/TransporterFacade.hpp b/ndb/src/ndbapi/TransporterFacade.hpp
index 1fe72debe1c..4b76cbe864a 100644
--- a/ndb/src/ndbapi/TransporterFacade.hpp
+++ b/ndb/src/ndbapi/TransporterFacade.hpp
@@ -27,8 +27,8 @@
class ClusterMgr;
class ArbitMgr;
-class Properties;
class IPCConfig;
+struct ndb_mgm_configuration;
class Ndb;
class NdbApiSignal;
@@ -51,10 +51,11 @@ class TransporterFacade
public:
TransporterFacade();
virtual ~TransporterFacade();
- bool init(Properties* props);
+ bool init(Uint32, const ndb_mgm_configuration *);
static TransporterFacade* instance();
- static TransporterFacade* start_instance(Properties* ipcConfig, const char *connectString);
+ static TransporterFacade* start_instance(int, const ndb_mgm_configuration*);
+ static TransporterFacade* start_instance(const char *connectString);
static void stop_instance();
/**
@@ -79,7 +80,7 @@ public:
// Is node available for running transactions
bool get_node_alive(NodeId nodeId) const;
bool get_node_stopping(NodeId nodeId) const;
- bool getIsNodeDefined(NodeId nodeId) const;
+ bool getIsDbNode(NodeId nodeId) const;
bool getIsNodeSendable(NodeId nodeId) const;
Uint32 getNodeGrp(NodeId nodeId) const;
Uint32 getNodeSequence(NodeId nodeId) const;
@@ -255,8 +256,10 @@ TransporterFacade::check_send_size(Uint32 node_id, Uint32 send_size)
inline
bool
-TransporterFacade::getIsNodeDefined(NodeId n) const {
- return theClusterMgr->getNodeInfo(n).defined;
+TransporterFacade::getIsDbNode(NodeId n) const {
+ return
+ theClusterMgr->getNodeInfo(n).defined &&
+ theClusterMgr->getNodeInfo(n).m_info.m_type == NodeInfo::DB;
}
inline
diff --git a/ndb/src/ndbapi/signal-sender/SignalSender.cpp b/ndb/src/ndbapi/signal-sender/SignalSender.cpp
index e642848dcee..680d0c23b4a 100644
--- a/ndb/src/ndbapi/signal-sender/SignalSender.cpp
+++ b/ndb/src/ndbapi/signal-sender/SignalSender.cpp
@@ -71,7 +71,7 @@ SimpleSignal::print(FILE * out){
SignalSender::SignalSender(const char * connectString){
m_cond = NdbCondition_Create();
- theFacade = TransporterFacade::start_instance(0,connectString);
+ theFacade = TransporterFacade::start_instance(connectString);
m_blockNo = theFacade->open(this, execSignal, execNodeStatus);
assert(m_blockNo > 0);
}