diff options
Diffstat (limited to 'ndb/src/ndbapi/Ndb.cpp')
-rw-r--r-- | ndb/src/ndbapi/Ndb.cpp | 196 |
1 files changed, 137 insertions, 59 deletions
diff --git a/ndb/src/ndbapi/Ndb.cpp b/ndb/src/ndbapi/Ndb.cpp index fe7260c4693..f09a7481d2d 100644 --- a/ndb/src/ndbapi/Ndb.cpp +++ b/ndb/src/ndbapi/Ndb.cpp @@ -154,26 +154,22 @@ Ndb::NDB_connect(Uint32 tNode) tNdbCon->Status(NdbConnection::Connecting); // Set status to connecting Uint32 nodeSequence; { // send and receive signal - tp->lock_mutex(); + Guard guard(tp->theMutexPtr); nodeSequence = tp->getNodeSequence(tNode); bool node_is_alive = tp->get_node_alive(tNode); if (node_is_alive) { tReturnCode = tp->sendSignal(tSignal, tNode); releaseSignal(tSignal); - if (tReturnCode == -1) { - tp->unlock_mutex(); - } else { + if (tReturnCode != -1) { theWaiter.m_node = tNode; theWaiter.m_state = WAIT_TC_SEIZE; tReturnCode = receiveResponse(); }//if } else { releaseSignal(tSignal); - tp->unlock_mutex(); tReturnCode = -1; }//if } - if ((tReturnCode == 0) && (tNdbCon->Status() == NdbConnection::Connected)) { //************************************************ // Send and receive was successful @@ -463,9 +459,9 @@ Ndb::closeTransaction(NdbConnection* aConnection) CHECK_STATUS_MACRO_VOID; tCon = theTransactionList; - + if (aConnection == tCon) { // Remove the active connection object - theTransactionList = tCon->next(); // from the transaction list. + theTransactionList = tCon->next(); // from the transaction list. } else { while (aConnection != tCon) { if (tCon == NULL) { @@ -473,44 +469,33 @@ Ndb::closeTransaction(NdbConnection* aConnection) // closeTransaction called on non-existing transaction //----------------------------------------------------- - if(aConnection->theError.code == 4008){ - /** - * When a SCAN timed-out, returning the NdbConnection leads - * to reuse. And TC crashes when the API tries to reuse it to - * something else... - */ + if(aConnection->theError.code == 4008){ + /** + * When a SCAN timed-out, returning the NdbConnection leads + * to reuse. And TC crashes when the API tries to reuse it to + * something else... + */ #ifdef VM_TRACE - printf("Scan timeout:ed NdbConnection-> not returning it-> memory leak\n"); + printf("Scan timeout:ed NdbConnection-> " + "not returning it-> memory leak\n"); #endif - return; - } + return; + } #ifdef VM_TRACE - printf("Non-existing transaction into closeTransaction\n"); + printf("Non-existing transaction into closeTransaction\n"); abort(); #endif - return; + return; }//if tPreviousCon = tCon; tCon = tCon->next(); }//while tPreviousCon->next(tCon->next()); }//if - + aConnection->release(); - - if(aConnection->theError.code == 4008){ - /** - * When a SCAN timed-out, returning the NdbConnection leads - * to reuse. And TC crashes when the API tries to reuse it to - * something else... - */ -#ifdef VM_TRACE - printf("Scan timeout:ed NdbConnection-> not returning it-> memory leak\n"); -#endif - return; - } - + if(aConnection->theError.code == 4008){ /** * Something timed-out, returning the NdbConnection leads @@ -522,7 +507,7 @@ Ndb::closeTransaction(NdbConnection* aConnection) #endif return; } - + if (aConnection->theReleaseOnClose == false) { /** * Put it back in idle list for that node @@ -729,9 +714,10 @@ Ndb::getNodeId() } /**************************************************************************** -Uint64 getTupleIdFromNdb( Uint32 aTableId ); +Uint64 getTupleIdFromNdb( Uint32 aTableId, Uint32 cacheSize ); Parameters: aTableId : The TableId. + cacheSize: Prefetch this many values Remark: Returns a new TupleId to the application. The TupleId comes from SYSTAB_0 where SYSKEY_0 = TableId. It is initialized to (TableId << 48) + 1 in NdbcntrMain.cpp. @@ -750,8 +736,19 @@ Ndb::getAutoIncrementValue(const char* aTableName, Uint32 cacheSize) return tupleId; } +Uint64 +Ndb::getAutoIncrementValue(NdbDictionary::Table * aTable, Uint32 cacheSize) +{ + DEBUG_TRACE("getAutoIncrementValue"); + if (aTable == 0) + return ~0; + const NdbTableImpl* table = & NdbTableImpl::getImpl(*aTable); + Uint64 tupleId = getTupleIdFromNdb(table->m_tableId, cacheSize); + return tupleId; +} + Uint64 -Ndb::getTupleIdFromNdb(const char* aTableName, Uint32 cacheSize ) +Ndb::getTupleIdFromNdb(const char* aTableName, Uint32 cacheSize) { const NdbTableImpl* table = theDictionary->getTable(aTableName); if (table == 0) @@ -760,7 +757,7 @@ Ndb::getTupleIdFromNdb(const char* aTableName, Uint32 cacheSize ) } Uint64 -Ndb::getTupleIdFromNdb(Uint32 aTableId, Uint32 cacheSize ) +Ndb::getTupleIdFromNdb(Uint32 aTableId, Uint32 cacheSize) { if ( theFirstTupleId[aTableId] != theLastTupleId[aTableId] ) { @@ -773,31 +770,90 @@ Ndb::getTupleIdFromNdb(Uint32 aTableId, Uint32 cacheSize ) } } +Uint64 +Ndb::readAutoIncrementValue(const char* aTableName) +{ + DEBUG_TRACE("readtAutoIncrementValue"); + const NdbTableImpl* table = theDictionary->getTable(aTableName); + if (table == 0) + return ~0; + Uint64 tupleId = readTupleIdFromNdb(table->m_tableId); + return tupleId; +} + +Uint64 +Ndb::readAutoIncrementValue(NdbDictionary::Table * aTable) +{ + DEBUG_TRACE("readtAutoIncrementValue"); + if (aTable == 0) + return ~0; + const NdbTableImpl* table = & NdbTableImpl::getImpl(*aTable); + Uint64 tupleId = readTupleIdFromNdb(table->m_tableId); + return tupleId; +} + +Uint64 +Ndb::readTupleIdFromNdb(Uint32 aTableId) +{ + if ( theFirstTupleId[aTableId] == theLastTupleId[aTableId] ) + // Cache is empty, check next in database + return opTupleIdOnNdb(aTableId, 0, 3); + + return theFirstTupleId[aTableId] + 1; +} + bool -Ndb::setAutoIncrementValue(const char* aTableName, Uint64 val) +Ndb::setAutoIncrementValue(const char* aTableName, Uint64 val, bool increase) { DEBUG_TRACE("setAutoIncrementValue " << val); const NdbTableImpl* table = theDictionary->getTable(aTableName); if (table == 0) return false; - return setTupleIdInNdb(table->m_tableId, val); + return setTupleIdInNdb(table->m_tableId, val, increase); +} + +bool +Ndb::setAutoIncrementValue(NdbDictionary::Table * aTable, Uint64 val, bool increase) +{ + DEBUG_TRACE("setAutoIncrementValue " << val); + if (aTable == 0) + return ~0; + const NdbTableImpl* table = & NdbTableImpl::getImpl(*aTable); + return setTupleIdInNdb(table->m_tableId, val, increase); } bool -Ndb::setTupleIdInNdb(const char* aTableName, Uint64 val ) +Ndb::setTupleIdInNdb(const char* aTableName, Uint64 val, bool increase ) { DEBUG_TRACE("setTupleIdInNdb"); const NdbTableImpl* table = theDictionary->getTable(aTableName); if (table == 0) return false; - return setTupleIdInNdb(table->m_tableId, val); + return setTupleIdInNdb(table->m_tableId, val, increase); } bool -Ndb::setTupleIdInNdb(Uint32 aTableId, Uint64 val ) +Ndb::setTupleIdInNdb(Uint32 aTableId, Uint64 val, bool increase ) { DEBUG_TRACE("setTupleIdInNdb"); - return (opTupleIdOnNdb(aTableId, val, 1) == val); + if (increase) + { + if (theFirstTupleId[aTableId] != theLastTupleId[aTableId]) + { + // We have a cache sequence + if (val <= theFirstTupleId[aTableId]+1) + return false; + if (val <= theLastTupleId[aTableId]) + { + theFirstTupleId[aTableId] = val - 1; + return true; + } + // else continue; + } + return (opTupleIdOnNdb(aTableId, val, 2) == val); + } + else + return (opTupleIdOnNdb(aTableId, val, 1) == val); } Uint64 @@ -809,7 +865,7 @@ Ndb::opTupleIdOnNdb(Uint32 aTableId, Uint64 opValue, Uint32 op) NdbOperation* tOperation; Uint64 tValue; NdbRecAttr* tRecAttrResult; - + int result; Uint64 ret; CHECK_STATUS_MACRO_ZERO; @@ -835,15 +891,7 @@ Ndb::opTupleIdOnNdb(Uint32 aTableId, Uint64 opValue, Uint32 op) case 0: tOperation->interpretedUpdateTuple(); tOperation->equal("SYSKEY_0", aTableId ); - { -#ifdef WORDS_BIGENDIAN - Uint64 cacheSize64 = opValue; // XXX interpreter bug on Uint32 - tOperation->incValue("NEXTID", cacheSize64); -#else - Uint32 cacheSize32 = opValue; // XXX for little-endian - tOperation->incValue("NEXTID", cacheSize32); -#endif - } + tOperation->incValue("NEXTID", opValue); tRecAttrResult = tOperation->getValue("NEXTID"); if (tConnection->execute( Commit ) == -1 ) @@ -863,10 +911,40 @@ Ndb::opTupleIdOnNdb(Uint32 aTableId, Uint64 opValue, Uint32 op) if (tConnection->execute( Commit ) == -1 ) goto error_handler; - theFirstTupleId[aTableId] = ~0; - theLastTupleId[aTableId] = ~0; + theFirstTupleId[aTableId] = ~(Uint64)0; + theLastTupleId[aTableId] = ~(Uint64)0; ret = opValue; break; + case 2: + tOperation->interpretedUpdateTuple(); + tOperation->equal("SYSKEY_0", aTableId ); + tOperation->load_const_u64(1, opValue); + tOperation->read_attr("NEXTID", 2); + tOperation->branch_le(2, 1, 0); + tOperation->write_attr("NEXTID", 1); + tOperation->interpret_exit_ok(); + tOperation->def_label(0); + tOperation->interpret_exit_nok(9999); + + if ( (result = tConnection->execute( Commit )) == -1 ) + goto error_handler; + + if (result == 9999) + ret = ~(Uint64)0; + else + { + theFirstTupleId[aTableId] = theLastTupleId[aTableId] = opValue - 1; + ret = opValue; + } + break; + case 3: + tOperation->readTuple(); + tOperation->equal("SYSKEY_0", aTableId ); + tRecAttrResult = tOperation->getValue("NEXTID"); + if (tConnection->execute( Commit ) == -1 ) + goto error_handler; + ret = tRecAttrResult->u_64_value(); + break; default: goto error_handler; } @@ -973,13 +1051,13 @@ Ndb::StartTransactionNodeSelectionData::init(Uint32 noOfNodes, */ { fragment2PrimaryNodeMap = new Uint32[noOfFragments]; - - for(Uint32 i = 0; i<noOfNodes; i++){ + Uint32 i; + for(i = 0; i<noOfNodes; i++){ fragment2PrimaryNodeMap[i] = nodeIds[i]; } // Sort them (bubble sort) - for(Uint32 i = 0; i<noOfNodes-1; i++) + for(i = 0; i<noOfNodes-1; i++) for(Uint32 j = i+1; j<noOfNodes; j++) if(fragment2PrimaryNodeMap[i] > fragment2PrimaryNodeMap[j]){ Uint32 tmp = fragment2PrimaryNodeMap[i]; @@ -987,7 +1065,7 @@ Ndb::StartTransactionNodeSelectionData::init(Uint32 noOfNodes, fragment2PrimaryNodeMap[j] = tmp; } - for(Uint32 i = 0; i<noOfNodes; i++){ + for(i = 0; i<noOfNodes; i++){ fragment2PrimaryNodeMap[i+noOfNodes] = fragment2PrimaryNodeMap[i]; } } |