summaryrefslogtreecommitdiff
path: root/ndb/src/ndbapi/Ndb.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'ndb/src/ndbapi/Ndb.cpp')
-rw-r--r--ndb/src/ndbapi/Ndb.cpp196
1 files changed, 137 insertions, 59 deletions
diff --git a/ndb/src/ndbapi/Ndb.cpp b/ndb/src/ndbapi/Ndb.cpp
index fe7260c4693..f09a7481d2d 100644
--- a/ndb/src/ndbapi/Ndb.cpp
+++ b/ndb/src/ndbapi/Ndb.cpp
@@ -154,26 +154,22 @@ Ndb::NDB_connect(Uint32 tNode)
tNdbCon->Status(NdbConnection::Connecting); // Set status to connecting
Uint32 nodeSequence;
{ // send and receive signal
- tp->lock_mutex();
+ Guard guard(tp->theMutexPtr);
nodeSequence = tp->getNodeSequence(tNode);
bool node_is_alive = tp->get_node_alive(tNode);
if (node_is_alive) {
tReturnCode = tp->sendSignal(tSignal, tNode);
releaseSignal(tSignal);
- if (tReturnCode == -1) {
- tp->unlock_mutex();
- } else {
+ if (tReturnCode != -1) {
theWaiter.m_node = tNode;
theWaiter.m_state = WAIT_TC_SEIZE;
tReturnCode = receiveResponse();
}//if
} else {
releaseSignal(tSignal);
- tp->unlock_mutex();
tReturnCode = -1;
}//if
}
-
if ((tReturnCode == 0) && (tNdbCon->Status() == NdbConnection::Connected)) {
//************************************************
// Send and receive was successful
@@ -463,9 +459,9 @@ Ndb::closeTransaction(NdbConnection* aConnection)
CHECK_STATUS_MACRO_VOID;
tCon = theTransactionList;
-
+
if (aConnection == tCon) { // Remove the active connection object
- theTransactionList = tCon->next(); // from the transaction list.
+ theTransactionList = tCon->next(); // from the transaction list.
} else {
while (aConnection != tCon) {
if (tCon == NULL) {
@@ -473,44 +469,33 @@ Ndb::closeTransaction(NdbConnection* aConnection)
// closeTransaction called on non-existing transaction
//-----------------------------------------------------
- if(aConnection->theError.code == 4008){
- /**
- * When a SCAN timed-out, returning the NdbConnection leads
- * to reuse. And TC crashes when the API tries to reuse it to
- * something else...
- */
+ if(aConnection->theError.code == 4008){
+ /**
+ * When a SCAN timed-out, returning the NdbConnection leads
+ * to reuse. And TC crashes when the API tries to reuse it to
+ * something else...
+ */
#ifdef VM_TRACE
- printf("Scan timeout:ed NdbConnection-> not returning it-> memory leak\n");
+ printf("Scan timeout:ed NdbConnection-> "
+ "not returning it-> memory leak\n");
#endif
- return;
- }
+ return;
+ }
#ifdef VM_TRACE
- printf("Non-existing transaction into closeTransaction\n");
+ printf("Non-existing transaction into closeTransaction\n");
abort();
#endif
- return;
+ return;
}//if
tPreviousCon = tCon;
tCon = tCon->next();
}//while
tPreviousCon->next(tCon->next());
}//if
-
+
aConnection->release();
-
- if(aConnection->theError.code == 4008){
- /**
- * When a SCAN timed-out, returning the NdbConnection leads
- * to reuse. And TC crashes when the API tries to reuse it to
- * something else...
- */
-#ifdef VM_TRACE
- printf("Scan timeout:ed NdbConnection-> not returning it-> memory leak\n");
-#endif
- return;
- }
-
+
if(aConnection->theError.code == 4008){
/**
* Something timed-out, returning the NdbConnection leads
@@ -522,7 +507,7 @@ Ndb::closeTransaction(NdbConnection* aConnection)
#endif
return;
}
-
+
if (aConnection->theReleaseOnClose == false) {
/**
* Put it back in idle list for that node
@@ -729,9 +714,10 @@ Ndb::getNodeId()
}
/****************************************************************************
-Uint64 getTupleIdFromNdb( Uint32 aTableId );
+Uint64 getTupleIdFromNdb( Uint32 aTableId, Uint32 cacheSize );
Parameters: aTableId : The TableId.
+ cacheSize: Prefetch this many values
Remark: Returns a new TupleId to the application.
The TupleId comes from SYSTAB_0 where SYSKEY_0 = TableId.
It is initialized to (TableId << 48) + 1 in NdbcntrMain.cpp.
@@ -750,8 +736,19 @@ Ndb::getAutoIncrementValue(const char* aTableName, Uint32 cacheSize)
return tupleId;
}
+Uint64
+Ndb::getAutoIncrementValue(NdbDictionary::Table * aTable, Uint32 cacheSize)
+{
+ DEBUG_TRACE("getAutoIncrementValue");
+ if (aTable == 0)
+ return ~0;
+ const NdbTableImpl* table = & NdbTableImpl::getImpl(*aTable);
+ Uint64 tupleId = getTupleIdFromNdb(table->m_tableId, cacheSize);
+ return tupleId;
+}
+
Uint64
-Ndb::getTupleIdFromNdb(const char* aTableName, Uint32 cacheSize )
+Ndb::getTupleIdFromNdb(const char* aTableName, Uint32 cacheSize)
{
const NdbTableImpl* table = theDictionary->getTable(aTableName);
if (table == 0)
@@ -760,7 +757,7 @@ Ndb::getTupleIdFromNdb(const char* aTableName, Uint32 cacheSize )
}
Uint64
-Ndb::getTupleIdFromNdb(Uint32 aTableId, Uint32 cacheSize )
+Ndb::getTupleIdFromNdb(Uint32 aTableId, Uint32 cacheSize)
{
if ( theFirstTupleId[aTableId] != theLastTupleId[aTableId] )
{
@@ -773,31 +770,90 @@ Ndb::getTupleIdFromNdb(Uint32 aTableId, Uint32 cacheSize )
}
}
+Uint64
+Ndb::readAutoIncrementValue(const char* aTableName)
+{
+ DEBUG_TRACE("readtAutoIncrementValue");
+ const NdbTableImpl* table = theDictionary->getTable(aTableName);
+ if (table == 0)
+ return ~0;
+ Uint64 tupleId = readTupleIdFromNdb(table->m_tableId);
+ return tupleId;
+}
+
+Uint64
+Ndb::readAutoIncrementValue(NdbDictionary::Table * aTable)
+{
+ DEBUG_TRACE("readtAutoIncrementValue");
+ if (aTable == 0)
+ return ~0;
+ const NdbTableImpl* table = & NdbTableImpl::getImpl(*aTable);
+ Uint64 tupleId = readTupleIdFromNdb(table->m_tableId);
+ return tupleId;
+}
+
+Uint64
+Ndb::readTupleIdFromNdb(Uint32 aTableId)
+{
+ if ( theFirstTupleId[aTableId] == theLastTupleId[aTableId] )
+ // Cache is empty, check next in database
+ return opTupleIdOnNdb(aTableId, 0, 3);
+
+ return theFirstTupleId[aTableId] + 1;
+}
+
bool
-Ndb::setAutoIncrementValue(const char* aTableName, Uint64 val)
+Ndb::setAutoIncrementValue(const char* aTableName, Uint64 val, bool increase)
{
DEBUG_TRACE("setAutoIncrementValue " << val);
const NdbTableImpl* table = theDictionary->getTable(aTableName);
if (table == 0)
return false;
- return setTupleIdInNdb(table->m_tableId, val);
+ return setTupleIdInNdb(table->m_tableId, val, increase);
+}
+
+bool
+Ndb::setAutoIncrementValue(NdbDictionary::Table * aTable, Uint64 val, bool increase)
+{
+ DEBUG_TRACE("setAutoIncrementValue " << val);
+ if (aTable == 0)
+ return ~0;
+ const NdbTableImpl* table = & NdbTableImpl::getImpl(*aTable);
+ return setTupleIdInNdb(table->m_tableId, val, increase);
}
bool
-Ndb::setTupleIdInNdb(const char* aTableName, Uint64 val )
+Ndb::setTupleIdInNdb(const char* aTableName, Uint64 val, bool increase )
{
DEBUG_TRACE("setTupleIdInNdb");
const NdbTableImpl* table = theDictionary->getTable(aTableName);
if (table == 0)
return false;
- return setTupleIdInNdb(table->m_tableId, val);
+ return setTupleIdInNdb(table->m_tableId, val, increase);
}
bool
-Ndb::setTupleIdInNdb(Uint32 aTableId, Uint64 val )
+Ndb::setTupleIdInNdb(Uint32 aTableId, Uint64 val, bool increase )
{
DEBUG_TRACE("setTupleIdInNdb");
- return (opTupleIdOnNdb(aTableId, val, 1) == val);
+ if (increase)
+ {
+ if (theFirstTupleId[aTableId] != theLastTupleId[aTableId])
+ {
+ // We have a cache sequence
+ if (val <= theFirstTupleId[aTableId]+1)
+ return false;
+ if (val <= theLastTupleId[aTableId])
+ {
+ theFirstTupleId[aTableId] = val - 1;
+ return true;
+ }
+ // else continue;
+ }
+ return (opTupleIdOnNdb(aTableId, val, 2) == val);
+ }
+ else
+ return (opTupleIdOnNdb(aTableId, val, 1) == val);
}
Uint64
@@ -809,7 +865,7 @@ Ndb::opTupleIdOnNdb(Uint32 aTableId, Uint64 opValue, Uint32 op)
NdbOperation* tOperation;
Uint64 tValue;
NdbRecAttr* tRecAttrResult;
-
+ int result;
Uint64 ret;
CHECK_STATUS_MACRO_ZERO;
@@ -835,15 +891,7 @@ Ndb::opTupleIdOnNdb(Uint32 aTableId, Uint64 opValue, Uint32 op)
case 0:
tOperation->interpretedUpdateTuple();
tOperation->equal("SYSKEY_0", aTableId );
- {
-#ifdef WORDS_BIGENDIAN
- Uint64 cacheSize64 = opValue; // XXX interpreter bug on Uint32
- tOperation->incValue("NEXTID", cacheSize64);
-#else
- Uint32 cacheSize32 = opValue; // XXX for little-endian
- tOperation->incValue("NEXTID", cacheSize32);
-#endif
- }
+ tOperation->incValue("NEXTID", opValue);
tRecAttrResult = tOperation->getValue("NEXTID");
if (tConnection->execute( Commit ) == -1 )
@@ -863,10 +911,40 @@ Ndb::opTupleIdOnNdb(Uint32 aTableId, Uint64 opValue, Uint32 op)
if (tConnection->execute( Commit ) == -1 )
goto error_handler;
- theFirstTupleId[aTableId] = ~0;
- theLastTupleId[aTableId] = ~0;
+ theFirstTupleId[aTableId] = ~(Uint64)0;
+ theLastTupleId[aTableId] = ~(Uint64)0;
ret = opValue;
break;
+ case 2:
+ tOperation->interpretedUpdateTuple();
+ tOperation->equal("SYSKEY_0", aTableId );
+ tOperation->load_const_u64(1, opValue);
+ tOperation->read_attr("NEXTID", 2);
+ tOperation->branch_le(2, 1, 0);
+ tOperation->write_attr("NEXTID", 1);
+ tOperation->interpret_exit_ok();
+ tOperation->def_label(0);
+ tOperation->interpret_exit_nok(9999);
+
+ if ( (result = tConnection->execute( Commit )) == -1 )
+ goto error_handler;
+
+ if (result == 9999)
+ ret = ~(Uint64)0;
+ else
+ {
+ theFirstTupleId[aTableId] = theLastTupleId[aTableId] = opValue - 1;
+ ret = opValue;
+ }
+ break;
+ case 3:
+ tOperation->readTuple();
+ tOperation->equal("SYSKEY_0", aTableId );
+ tRecAttrResult = tOperation->getValue("NEXTID");
+ if (tConnection->execute( Commit ) == -1 )
+ goto error_handler;
+ ret = tRecAttrResult->u_64_value();
+ break;
default:
goto error_handler;
}
@@ -973,13 +1051,13 @@ Ndb::StartTransactionNodeSelectionData::init(Uint32 noOfNodes,
*/
{
fragment2PrimaryNodeMap = new Uint32[noOfFragments];
-
- for(Uint32 i = 0; i<noOfNodes; i++){
+ Uint32 i;
+ for(i = 0; i<noOfNodes; i++){
fragment2PrimaryNodeMap[i] = nodeIds[i];
}
// Sort them (bubble sort)
- for(Uint32 i = 0; i<noOfNodes-1; i++)
+ for(i = 0; i<noOfNodes-1; i++)
for(Uint32 j = i+1; j<noOfNodes; j++)
if(fragment2PrimaryNodeMap[i] > fragment2PrimaryNodeMap[j]){
Uint32 tmp = fragment2PrimaryNodeMap[i];
@@ -987,7 +1065,7 @@ Ndb::StartTransactionNodeSelectionData::init(Uint32 noOfNodes,
fragment2PrimaryNodeMap[j] = tmp;
}
- for(Uint32 i = 0; i<noOfNodes; i++){
+ for(i = 0; i<noOfNodes; i++){
fragment2PrimaryNodeMap[i+noOfNodes] = fragment2PrimaryNodeMap[i];
}
}