diff options
author | unknown <mronstrom@mysql.com[mikron]> | 2005-07-26 17:12:05 +0200 |
---|---|---|
committer | unknown <mronstrom@mysql.com[mikron]> | 2005-07-26 17:12:05 +0200 |
commit | 6cc3eee46e1ede6f48807c42aba7e4a6062a5959 (patch) | |
tree | 9d97d11f76f5e3f099a1376376766cda02859616 | |
parent | bc2776d9571f70df27473eb7f0420bd84799821f (diff) | |
download | mariadb-git-6cc3eee46e1ede6f48807c42aba7e4a6062a5959.tar.gz |
wl2405.patch
storage/ndb/include/ndbapi/Ndb.hpp:
Import patch wl2405.patch
storage/ndb/include/ndbapi/NdbIndexScanOperation.hpp:
Import patch wl2405.patch
storage/ndb/include/ndbapi/NdbScanOperation.hpp:
Import patch wl2405.patch
storage/ndb/include/portlib/NdbThread.h:
Import patch wl2405.patch
storage/ndb/src/common/portlib/NdbThread.c:
Import patch wl2405.patch
storage/ndb/src/common/transporter/TransporterRegistry.cpp:
Import patch wl2405.patch
storage/ndb/src/ndbapi/Ndb.cpp:
Import patch wl2405.patch
storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp:
Import patch wl2405.patch
storage/ndb/src/ndbapi/NdbScanOperation.cpp:
Import patch wl2405.patch
storage/ndb/src/ndbapi/NdbWaiter.hpp:
Import patch wl2405.patch
storage/ndb/src/ndbapi/Ndbif.cpp:
Import patch wl2405.patch
storage/ndb/src/ndbapi/Ndbinit.cpp:
Import patch wl2405.patch
storage/ndb/src/ndbapi/TransporterFacade.cpp:
Import patch wl2405.patch
storage/ndb/src/ndbapi/TransporterFacade.hpp:
Import patch wl2405.patch
-rw-r--r-- | storage/ndb/include/ndbapi/Ndb.hpp | 17 | ||||
-rw-r--r-- | storage/ndb/include/ndbapi/NdbIndexScanOperation.hpp | 2 | ||||
-rw-r--r-- | storage/ndb/include/ndbapi/NdbScanOperation.hpp | 7 | ||||
-rw-r--r-- | storage/ndb/include/portlib/NdbThread.h | 8 | ||||
-rw-r--r-- | storage/ndb/src/common/portlib/NdbThread.c | 30 | ||||
-rw-r--r-- | storage/ndb/src/common/transporter/TransporterRegistry.cpp | 9 | ||||
-rw-r--r-- | storage/ndb/src/ndbapi/Ndb.cpp | 20 | ||||
-rw-r--r-- | storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp | 41 | ||||
-rw-r--r-- | storage/ndb/src/ndbapi/NdbScanOperation.cpp | 109 | ||||
-rw-r--r-- | storage/ndb/src/ndbapi/NdbWaiter.hpp | 39 | ||||
-rw-r--r-- | storage/ndb/src/ndbapi/Ndbif.cpp | 190 | ||||
-rw-r--r-- | storage/ndb/src/ndbapi/Ndbinit.cpp | 5 | ||||
-rw-r--r-- | storage/ndb/src/ndbapi/TransporterFacade.cpp | 324 | ||||
-rw-r--r-- | storage/ndb/src/ndbapi/TransporterFacade.hpp | 64 |
14 files changed, 640 insertions, 225 deletions
diff --git a/storage/ndb/include/ndbapi/Ndb.hpp b/storage/ndb/include/ndbapi/Ndb.hpp index db2212075e8..1c9d5ed8ad0 100644 --- a/storage/ndb/include/ndbapi/Ndb.hpp +++ b/storage/ndb/include/ndbapi/Ndb.hpp @@ -984,6 +984,8 @@ class BaseString; class NdbEventOperation; class NdbBlob; class NdbReceiver; +class TransporterFacade; +class PollGuard; typedef void (* NdbEventCallback)(NdbEventOperation*, Ndb*, void*); @@ -1462,7 +1464,12 @@ public: /***************************************************************************** * These are service routines used by the other classes in the NDBAPI. ****************************************************************************/ + Uint32 get_cond_wait_index() { return cond_wait_index; } + void set_cond_wait_index(Uint32 index) { cond_wait_index = index; } private: + Uint32 cond_wait_index; + Ndb *cond_signal_ndb; + void cond_signal(); void setup(Ndb_cluster_connection *ndb_cluster_connection, const char* aCatalogName, const char* aSchemaName); @@ -1513,13 +1520,11 @@ private: // synchronous and asynchronous interface void handleReceivedSignal(NdbApiSignal* anApiSignal, struct LinearSectionPtr ptr[3]); - // Receive response signals - int receiveResponse(int waitTime = WAITFOR_RESPONSE_TIMEOUT); - int sendRecSignal(Uint16 aNodeId, Uint32 aWaitState, NdbApiSignal* aSignal, - Uint32 nodeSequence); + Uint32 nodeSequence, + Uint32 *ret_conn_seq= 0); // Sets Restart GCI in Ndb object void RestartGCI(int aRestartGCI); @@ -1576,7 +1581,9 @@ private: Uint32 pollCompleted(NdbTransaction** aCopyArray); void sendPrepTrans(int forceSend); void reportCallback(NdbTransaction** aCopyArray, Uint32 aNoOfComplTrans); - void waitCompletedTransactions(int milliSecs, int noOfEventsToWaitFor); + int poll_trans(int milliSecs, int noOfEventsToWaitFor, PollGuard *pg); + void waitCompletedTransactions(int milliSecs, int noOfEventsToWaitFor, + PollGuard *pg); void completedTransaction(NdbTransaction* aTransaction); void completedScanTransaction(NdbTransaction* aTransaction); diff --git a/storage/ndb/include/ndbapi/NdbIndexScanOperation.hpp b/storage/ndb/include/ndbapi/NdbIndexScanOperation.hpp index 0a31f228921..c231b927581 100644 --- a/storage/ndb/include/ndbapi/NdbIndexScanOperation.hpp +++ b/storage/ndb/include/ndbapi/NdbIndexScanOperation.hpp @@ -161,7 +161,7 @@ private: void fix_get_values(); int next_result_ordered(bool fetchAllowed, bool forceSend = false); - int send_next_scan_ordered(Uint32 idx, bool forceSend = false); + int send_next_scan_ordered(Uint32 idx); int compare(Uint32 key, Uint32 cols, const NdbReceiver*, const NdbReceiver*); Uint32 m_sort_columns; diff --git a/storage/ndb/include/ndbapi/NdbScanOperation.hpp b/storage/ndb/include/ndbapi/NdbScanOperation.hpp index bf8f362cefc..b32f6050704 100644 --- a/storage/ndb/include/ndbapi/NdbScanOperation.hpp +++ b/storage/ndb/include/ndbapi/NdbScanOperation.hpp @@ -21,6 +21,7 @@ class NdbBlob; class NdbResultSet; +class PollGuard; /** * @class NdbScanOperation @@ -183,7 +184,8 @@ protected: int nextResultImpl(bool fetchAllowed = true, bool forceSend = false); virtual void release(); - int close_impl(class TransporterFacade*, bool forceSend = false); + int close_impl(class TransporterFacade*, bool forceSend, + PollGuard *poll_guard); // Overloaded methods from NdbCursorOperation int executeCursor(int ProcessorId); @@ -192,7 +194,6 @@ protected: int init(const NdbTableImpl* tab, NdbTransaction*); int prepareSend(Uint32 TC_ConnectPtr, Uint64 TransactionId); int doSend(int ProcessorId); - void checkForceSend(bool forceSend); virtual void setErrorCode(int aErrorCode); virtual void setErrorCodeAbort(int aErrorCode); @@ -234,7 +235,7 @@ protected: Uint32 m_sent_receivers_count; // NOTE needs mutex to access NdbReceiver** m_sent_receivers; // receive thread puts them here - int send_next_scan(Uint32 cnt, bool close, bool forceSend = false); + int send_next_scan(Uint32 cnt, bool close); void receiver_delivered(NdbReceiver*); void receiver_completed(NdbReceiver*); void execCLOSE_SCAN_REP(); diff --git a/storage/ndb/include/portlib/NdbThread.h b/storage/ndb/include/portlib/NdbThread.h index e86deee4354..003a7e3e151 100644 --- a/storage/ndb/include/portlib/NdbThread.h +++ b/storage/ndb/include/portlib/NdbThread.h @@ -37,6 +37,14 @@ typedef size_t NDB_THREAD_STACKSIZE; struct NdbThread; +/* + Method to block/unblock thread from receiving KILL signal with + signum set in g_ndb_shm_signum in a portable manner. +*/ +#ifdef NDB_SHM_TRANSPORTER +void NdbThread_set_shm_sigmask(bool block); +#endif + /** * Create a thread * diff --git a/storage/ndb/src/common/portlib/NdbThread.c b/storage/ndb/src/common/portlib/NdbThread.c index 55ebc4c8111..85c2de4fd5e 100644 --- a/storage/ndb/src/common/portlib/NdbThread.c +++ b/storage/ndb/src/common/portlib/NdbThread.c @@ -36,6 +36,27 @@ struct NdbThread void * object; }; + +#ifdef NDB_SHM_TRANSPORTER +void NdbThread_set_shm_sigmask(bool block) +{ + DBUG_ENTER("NdbThread_set_shm_sigmask"); + if (g_ndb_shm_signum) + { + sigset_t mask; + DBUG_PRINT("info",("Block signum %d",g_ndb_shm_signum)); + sigemptyset(&mask); + sigaddset(&mask, g_ndb_shm_signum); + if (block) + pthread_sigmask(SIG_BLOCK, &mask, 0); + else + pthread_sigmask(SIG_UNBLOCK, &mask, 0); + } + DBUG_VOID_RETURN; +} +#endif + + static void* ndb_thread_wrapper(void* _ss){ @@ -43,14 +64,7 @@ ndb_thread_wrapper(void* _ss){ { DBUG_ENTER("ndb_thread_wrapper"); #ifdef NDB_SHM_TRANSPORTER - if (g_ndb_shm_signum) - { - sigset_t mask; - DBUG_PRINT("info",("Block signum %d",g_ndb_shm_signum)); - sigemptyset(&mask); - sigaddset(&mask, g_ndb_shm_signum); - pthread_sigmask(SIG_BLOCK, &mask, 0); - } + NdbThread_set_shm_sigmask(true); #endif { void *ret; diff --git a/storage/ndb/src/common/transporter/TransporterRegistry.cpp b/storage/ndb/src/common/transporter/TransporterRegistry.cpp index 86bfa385c04..963f5020bd4 100644 --- a/storage/ndb/src/common/transporter/TransporterRegistry.cpp +++ b/storage/ndb/src/common/transporter/TransporterRegistry.cpp @@ -457,10 +457,7 @@ TransporterRegistry::createSHMTransporter(TransporterConfiguration *config) { * Make sure to block g_ndb_shm_signum * TransporterRegistry::init is run from "main" thread */ - sigset_t mask; - sigemptyset(&mask); - sigaddset(&mask, g_ndb_shm_signum); - pthread_sigmask(SIG_BLOCK, &mask, 0); + NdbThread_set_shm_sigmask(true); } if(config->shm.signum != g_ndb_shm_signum) @@ -1490,11 +1487,9 @@ TransporterRegistry::startReceiving() DBUG_PRINT("info",("Install signal handler for signum %d", g_ndb_shm_signum)); struct sigaction sa; + NdbThread_set_shm_sigmask(false); sigemptyset(&sa.sa_mask); - sigaddset(&sa.sa_mask, g_ndb_shm_signum); - pthread_sigmask(SIG_UNBLOCK, &sa.sa_mask, 0); sa.sa_handler = shm_sig_handler; - sigemptyset(&sa.sa_mask); sa.sa_flags = 0; int ret; while((ret = sigaction(g_ndb_shm_signum, &sa, 0)) == -1 && errno == EINTR); diff --git a/storage/ndb/src/ndbapi/Ndb.cpp b/storage/ndb/src/ndbapi/Ndb.cpp index 7893aaae15c..c5d32f59c3c 100644 --- a/storage/ndb/src/ndbapi/Ndb.cpp +++ b/storage/ndb/src/ndbapi/Ndb.cpp @@ -173,23 +173,9 @@ Ndb::NDB_connect(Uint32 tNode) tSignal->setData(theMyRef, 2); // Set my block reference tNdbCon->Status(NdbTransaction::Connecting); // Set status to connecting Uint32 nodeSequence; - { // send and receive signal - Guard guard(tp->theMutexPtr); - nodeSequence = tp->getNodeSequence(tNode); - bool node_is_alive = tp->get_node_alive(tNode); - if (node_is_alive) { - tReturnCode = tp->sendSignal(tSignal, tNode); - releaseSignal(tSignal); - if (tReturnCode != -1) { - theImpl->theWaiter.m_node = tNode; - theImpl->theWaiter.m_state = WAIT_TC_SEIZE; - tReturnCode = receiveResponse(); - }//if - } else { - releaseSignal(tSignal); - tReturnCode = -1; - }//if - } + tReturnCode= sendRecSignal(tNode, WAIT_TC_SEIZE, tSignal, + 0, &nodeSequence); + releaseSignal(tSignal); if ((tReturnCode == 0) && (tNdbCon->Status() == NdbTransaction::Connected)) { //************************************************ // Send and receive was successful diff --git a/storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp b/storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp index ce5827ab7f4..582f4baaef4 100644 --- a/storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp +++ b/storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp @@ -989,7 +989,13 @@ NdbDictInterface::dictSignal(NdbApiSignal* signal, m_buffer.clear(); // Protected area - m_transporter->lock_mutex(); + /* + The PollGuard has an implicit call of unlock_and_signal through the + ~PollGuard method. This method is called implicitly by the compiler + in all places where the object is out of context due to a return, + break, continue or simply end of statement block + */ + PollGuard poll_guard(m_transporter, &m_waiter, refToBlock(m_reference)); Uint32 aNodeId; if (useMasterNodeId) { if ((m_masterNodeId == 0) || @@ -1002,7 +1008,6 @@ NdbDictInterface::dictSignal(NdbApiSignal* signal, } if(aNodeId == 0){ m_error.code= 4009; - m_transporter->unlock_mutex(); DBUG_RETURN(-1); } { @@ -1023,21 +1028,15 @@ NdbDictInterface::dictSignal(NdbApiSignal* signal, r = m_transporter->sendSignal(signal, aNodeId); } if(r != 0){ - m_transporter->unlock_mutex(); continue; } } m_error.code= 0; - - m_waiter.m_node = aNodeId; - m_waiter.m_state = wst; - - m_waiter.wait(theWait); - m_transporter->unlock_mutex(); + int ret_val= poll_guard.wait_n_unlock(theWait, aNodeId, wst); // End of Protected area - if(m_waiter.m_state == NO_WAIT && m_error.code == 0){ + if(ret_val == 0 && m_error.code == 0){ // Normal return DBUG_RETURN(0); } @@ -1045,7 +1044,7 @@ NdbDictInterface::dictSignal(NdbApiSignal* signal, /** * Handle error codes */ - if(m_waiter.m_state == WAIT_NODE_FAILURE) + if(ret_val == -2) //WAIT_NODE_FAILURE continue; if(m_waiter.m_state == WST_WAIT_TIMEOUT) @@ -3166,26 +3165,28 @@ NdbDictInterface::listObjects(NdbApiSignal* signal) for (Uint32 i = 0; i < RETRIES; i++) { m_buffer.clear(); // begin protected - m_transporter->lock_mutex(); + /* + The PollGuard has an implicit call of unlock_and_signal through the + ~PollGuard method. This method is called implicitly by the compiler + in all places where the object is out of context due to a return, + break, continue or simply end of statement block + */ + PollGuard poll_guard(m_transporter, &m_waiter, refToBlock(m_reference)); Uint16 aNodeId = m_transporter->get_an_alive_node(); if (aNodeId == 0) { m_error.code= 4009; - m_transporter->unlock_mutex(); return -1; } if (m_transporter->sendSignal(signal, aNodeId) != 0) { - m_transporter->unlock_mutex(); continue; } m_error.code= 0; - m_waiter.m_node = aNodeId; - m_waiter.m_state = WAIT_LIST_TABLES_CONF; - m_waiter.wait(WAITFOR_RESPONSE_TIMEOUT); - m_transporter->unlock_mutex(); + int ret_val= poll_guard.wait_n_unlock(WAITFOR_RESPONSE_TIMEOUT, + aNodeId, WAIT_LIST_TABLES_CONF); // end protected - if (m_waiter.m_state == NO_WAIT && m_error.code == 0) + if (ret_val == 0 && m_error.code == 0) return 0; - if (m_waiter.m_state == WAIT_NODE_FAILURE) + if (ret_val == -2) //WAIT_NODE_FAILURE continue; return -1; } diff --git a/storage/ndb/src/ndbapi/NdbScanOperation.cpp b/storage/ndb/src/ndbapi/NdbScanOperation.cpp index e0a480e02f7..288b8dc8bd8 100644 --- a/storage/ndb/src/ndbapi/NdbScanOperation.cpp +++ b/storage/ndb/src/ndbapi/NdbScanOperation.cpp @@ -460,13 +460,20 @@ int NdbScanOperation::nextResultImpl(bool fetchAllowed, bool forceSend) Uint32 nodeId = theNdbCon->theDBnode; TransporterFacade* tp = TransporterFacade::instance(); - Guard guard(tp->theMutexPtr); + /* + The PollGuard has an implicit call of unlock_and_signal through the + ~PollGuard method. This method is called implicitly by the compiler + in all places where the object is out of context due to a return, + break, continue or simply end of statement block + */ + PollGuard poll_guard(tp, &theNdb->theImpl->theWaiter, + theNdb->theNdbBlockNumber); if(theError.code) return -1; Uint32 seq = theNdbCon->theNodeSequence; - if(seq == tp->getNodeSequence(nodeId) && send_next_scan(idx, false, - forceSend) == 0){ + if(seq == tp->getNodeSequence(nodeId) && send_next_scan(idx, false) == 0) + { idx = m_current_api_receiver; last = m_api_receivers_count; @@ -495,10 +502,9 @@ int NdbScanOperation::nextResultImpl(bool fetchAllowed, bool forceSend) /** * No completed... */ - theNdb->theImpl->theWaiter.m_node = nodeId; - theNdb->theImpl->theWaiter.m_state = WAIT_SCAN; - int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT); - if (return_code == 0 && seq == tp->getNodeSequence(nodeId)) { + int ret_code= poll_guard.wait_scan(WAITFOR_SCAN_TIMEOUT, nodeId, + forceSend); + if (ret_code == 0 && seq == tp->getNodeSequence(nodeId)) { continue; } else { idx = last; @@ -557,8 +563,8 @@ int NdbScanOperation::nextResultImpl(bool fetchAllowed, bool forceSend) } int -NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag, - bool forceSend){ +NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag) +{ if(cnt > 0){ NdbApiSignal tSignal(theNdb->theMyRef); tSignal.setSignal(GSN_SCAN_NEXTREQ); @@ -605,9 +611,6 @@ NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag, ret = tp->sendSignal(&tSignal, nodeId); } } - - if (!ret) checkForceSend(forceSend); - m_sent_receivers_count = last + sent; m_api_receivers_count -= cnt; m_current_api_receiver = 0; @@ -617,15 +620,6 @@ NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag, return 0; } -void NdbScanOperation::checkForceSend(bool forceSend) -{ - if (forceSend) { - TransporterFacade::instance()->forceSend(theNdb->theNdbBlockNumber); - } else { - TransporterFacade::instance()->checkForceSend(theNdb->theNdbBlockNumber); - }//if -} - int NdbScanOperation::prepareSend(Uint32 TC_ConnectPtr, Uint64 TransactionId) { @@ -661,9 +655,15 @@ void NdbScanOperation::close(bool forceSend, bool releaseOp) m_sent_receivers_count); TransporterFacade* tp = TransporterFacade::instance(); - Guard guard(tp->theMutexPtr); - close_impl(tp, forceSend); - + /* + The PollGuard has an implicit call of unlock_and_signal through the + ~PollGuard method. This method is called implicitly by the compiler + in all places where the object is out of context due to a return, + break, continue or simply end of statement block + */ + PollGuard poll_guard(tp, &theNdb->theImpl->theWaiter, + theNdb->theNdbBlockNumber); + close_impl(tp, forceSend, &poll_guard); } NdbConnection* tCon = theNdbCon; @@ -1338,20 +1338,26 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed, if(fetchAllowed){ if(DEBUG_NEXT_RESULT) ndbout_c("performing fetch..."); TransporterFacade* tp = TransporterFacade::instance(); - Guard guard(tp->theMutexPtr); + /* + The PollGuard has an implicit call of unlock_and_signal through the + ~PollGuard method. This method is called implicitly by the compiler + in all places where the object is out of context due to a return, + break, continue or simply end of statement block + */ + PollGuard poll_guard(tp, &theNdb->theImpl->theWaiter, + theNdb->theNdbBlockNumber); if(theError.code) return -1; Uint32 seq = theNdbCon->theNodeSequence; Uint32 nodeId = theNdbCon->theDBnode; if(seq == tp->getNodeSequence(nodeId) && - !send_next_scan_ordered(s_idx, forceSend)){ + !send_next_scan_ordered(s_idx)){ Uint32 tmp = m_sent_receivers_count; s_idx = m_current_api_receiver; while(m_sent_receivers_count > 0 && !theError.code){ - theNdb->theImpl->theWaiter.m_node = nodeId; - theNdb->theImpl->theWaiter.m_state = WAIT_SCAN; - int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT); - if (return_code == 0 && seq == tp->getNodeSequence(nodeId)) { + int ret_code= poll_guard.wait_scan(WAITFOR_SCAN_TIMEOUT, nodeId, + forceSend); + if (ret_code == 0 && seq == tp->getNodeSequence(nodeId)) { continue; } if(DEBUG_NEXT_RESULT) ndbout_c("return -1"); @@ -1438,7 +1444,8 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed, } int -NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx, bool forceSend){ +NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx) +{ if(idx == theParallelism) return 0; @@ -1476,12 +1483,13 @@ NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx, bool forceSend){ TransporterFacade * tp = TransporterFacade::instance(); tSignal.setLength(4+1); int ret= tp->sendSignal(&tSignal, nodeId); - if (!ret) checkForceSend(forceSend); return ret; } int -NdbScanOperation::close_impl(TransporterFacade* tp, bool forceSend){ +NdbScanOperation::close_impl(TransporterFacade* tp, bool forceSend, + PollGuard *poll_guard) +{ Uint32 seq = theNdbCon->theNodeSequence; Uint32 nodeId = theNdbCon->theDBnode; @@ -1496,9 +1504,8 @@ NdbScanOperation::close_impl(TransporterFacade* tp, bool forceSend){ */ while(theError.code == 0 && m_sent_receivers_count) { - theNdb->theImpl->theWaiter.m_node = nodeId; - theNdb->theImpl->theWaiter.m_state = WAIT_SCAN; - int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT); + int return_code= poll_guard->wait_scan(WAITFOR_SCAN_TIMEOUT, nodeId, + false); switch(return_code){ case 0: break; @@ -1555,7 +1562,7 @@ NdbScanOperation::close_impl(TransporterFacade* tp, bool forceSend){ } // Send close scan - if(send_next_scan(api+conf, true, forceSend) == -1) + if(send_next_scan(api+conf, true) == -1) { theNdbCon->theReleaseOnClose = true; return -1; @@ -1566,9 +1573,8 @@ NdbScanOperation::close_impl(TransporterFacade* tp, bool forceSend){ */ while(m_sent_receivers_count+m_api_receivers_count+m_conf_receivers_count) { - theNdb->theImpl->theWaiter.m_node = nodeId; - theNdb->theImpl->theWaiter.m_state = WAIT_SCAN; - int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT); + int return_code= poll_guard->wait_scan(WAITFOR_SCAN_TIMEOUT, nodeId, + forceSend); switch(return_code){ case 0: break; @@ -1608,12 +1614,19 @@ NdbScanOperation::restart(bool forceSend) { TransporterFacade* tp = TransporterFacade::instance(); - Guard guard(tp->theMutexPtr); + /* + The PollGuard has an implicit call of unlock_and_signal through the + ~PollGuard method. This method is called implicitly by the compiler + in all places where the object is out of context due to a return, + break, continue or simply end of statement block + */ + PollGuard poll_guard(tp, &theNdb->theImpl->theWaiter, + theNdb->theNdbBlockNumber); Uint32 nodeId = theNdbCon->theDBnode; { int res; - if((res= close_impl(tp, forceSend))) + if((res= close_impl(tp, forceSend, &poll_guard))) { return res; } @@ -1627,7 +1640,6 @@ NdbScanOperation::restart(bool forceSend) theError.code = 0; if (doSendScan(nodeId) == -1) return -1; - return 0; } @@ -1637,8 +1649,15 @@ NdbIndexScanOperation::reset_bounds(bool forceSend){ { TransporterFacade* tp = TransporterFacade::instance(); - Guard guard(tp->theMutexPtr); - res= close_impl(tp, forceSend); + /* + The PollGuard has an implicit call of unlock_and_signal through the + ~PollGuard method. This method is called implicitly by the compiler + in all places where the object is out of context due to a return, + break, continue or simply end of statement block + */ + PollGuard poll_guard(tp, &theNdb->theImpl->theWaiter, + theNdb->theNdbBlockNumber); + res= close_impl(tp, forceSend, &poll_guard); } if(!res) diff --git a/storage/ndb/src/ndbapi/NdbWaiter.hpp b/storage/ndb/src/ndbapi/NdbWaiter.hpp index 8b7b2a75879..4ccfb40b5ba 100644 --- a/storage/ndb/src/ndbapi/NdbWaiter.hpp +++ b/storage/ndb/src/ndbapi/NdbWaiter.hpp @@ -54,10 +54,19 @@ public: void wait(int waitTime); void nodeFail(Uint32 node); void signal(Uint32 state); + void cond_signal(); + void set_poll_owner(bool poll_owner) { m_poll_owner= poll_owner; } + Uint32 get_state() { return m_state; } + void set_state(Uint32 state) { m_state= state; } + void set_node(Uint32 node) { m_node= node; } + Uint32 get_cond_wait_index() { return m_cond_wait_index; } + void set_cond_wait_index(Uint32 index) { m_cond_wait_index= index; } Uint32 m_node; Uint32 m_state; void * m_mutex; + bool m_poll_owner; + Uint32 m_cond_wait_index; struct NdbCondition * m_condition; }; @@ -65,22 +74,8 @@ inline void NdbWaiter::wait(int waitTime) { - const bool forever = (waitTime == -1); - const NDB_TICKS maxTime = NdbTick_CurrentMillisecond() + waitTime; - while (1) { - if (m_state == NO_WAIT || m_state == WAIT_NODE_FAILURE) - break; - if (forever) { - NdbCondition_Wait(m_condition, (NdbMutex*)m_mutex); - } else { - if (waitTime <= 0) { - m_state = WST_WAIT_TIMEOUT; - break; - } - NdbCondition_WaitTimeout(m_condition, (NdbMutex*)m_mutex, waitTime); - waitTime = maxTime - NdbTick_CurrentMillisecond(); - } - } + assert(!m_poll_owner); + NdbCondition_WaitTimeout(m_condition, (NdbMutex*)m_mutex, waitTime); } inline @@ -88,7 +83,8 @@ void NdbWaiter::nodeFail(Uint32 aNodeId){ if (m_state != NO_WAIT && m_node == aNodeId){ m_state = WAIT_NODE_FAILURE; - NdbCondition_Signal(m_condition); + if (!m_poll_owner) + NdbCondition_Signal(m_condition); } } @@ -96,7 +92,14 @@ inline void NdbWaiter::signal(Uint32 state){ m_state = state; - NdbCondition_Signal(m_condition); + if (!m_poll_owner) + NdbCondition_Signal(m_condition); } +inline +void +NdbWaiter::cond_signal() +{ + NdbCondition_Signal(m_condition); +} #endif diff --git a/storage/ndb/src/ndbapi/Ndbif.cpp b/storage/ndb/src/ndbapi/Ndbif.cpp index fee6f0930ad..ca7b5aee84a 100644 --- a/storage/ndb/src/ndbapi/Ndbif.cpp +++ b/storage/ndb/src/ndbapi/Ndbif.cpp @@ -332,6 +332,7 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3]) const Uint32 tFirstData = *tDataPtr; const Uint32 tLen = aSignal->getLength(); void * tFirstDataPtr; + NdbWaiter *t_waiter; /* In order to support 64 bit processes in the application we need to use @@ -470,7 +471,7 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3]) ndbout_c("Recevied TCKEY_FAILREF wo/ operation"); #endif return; - break; + return; } case GSN_TCKEYREF: { @@ -677,12 +678,12 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3]) case GSN_LIST_TABLES_CONF: NdbDictInterface::execSignal(&theDictionary->m_receiver, aSignal, ptr); - break; + return; case GSN_SUB_META_DATA: case GSN_SUB_REMOVE_CONF: case GSN_SUB_REMOVE_REF: - break; // ignore these signals + return; // ignore these signals case GSN_SUB_GCP_COMPLETE_REP: case GSN_SUB_START_CONF: case GSN_SUB_START_REF: @@ -691,7 +692,7 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3]) case GSN_SUB_STOP_REF: NdbDictInterface::execSignal(&theDictionary->m_receiver, aSignal, ptr); - break; + return; case GSN_DIHNDBTAMPER: { @@ -833,11 +834,32 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3]) } default: goto InvalidSignal; - }//switch - - if (theImpl->theWaiter.m_state == NO_WAIT) { - // Wake up the thread waiting for response - NdbCondition_Signal(theImpl->theWaiter.m_condition); + }//swich + + t_waiter= &theImpl->theWaiter; + if (t_waiter->get_state() == NO_WAIT && tWaitState != NO_WAIT) + { + /* + If our waiter object is the owner of the "poll rights", then we + can simply return, we will return from this routine to the + place where external_poll was called. From there it will move + the "poll ownership" to a new thread if available. + + If our waiter object doesn't own the "poll rights", then we must + signal the thread from where this waiter object called + its conditional wait. This will wake up this thread so that it + can continue its work. + */ + TransporterFacade *tp= TransporterFacade::instance(); + if (tp->get_poll_owner() != t_waiter) + { + /* + Wake up the thread waiting for response and remove it from queue + of objects waiting for receive completion + */ + tp->remove_from_cond_wait_queue(t_waiter); + t_waiter->cond_signal(); + } }//if return; @@ -892,7 +914,19 @@ Ndb::completedTransaction(NdbTransaction* aCon) if ((theMinNoOfEventsToWakeUp != 0) && (theNoOfCompletedTransactions >= theMinNoOfEventsToWakeUp)) { theMinNoOfEventsToWakeUp = 0; - NdbCondition_Signal(theImpl->theWaiter.m_condition); + TransporterFacade *tp = TransporterFacade::instance(); + NdbWaiter *t_waiter= &theImpl->theWaiter; + if (tp->get_poll_owner() != t_waiter) { + /* + When we come here, this is executed by the thread owning the "poll + rights". This thread is not where our waiter object belongs. + Thus we wake up the thread owning this waiter object but first + we must remove it from the conditional wait queue so that we + don't assign it as poll owner later on. + */ + tp->remove_from_cond_wait_queue(t_waiter); + t_waiter->cond_signal(); + } return; }//if } else { @@ -1151,7 +1185,8 @@ Remark: First send all prepared operations and then check if there are any ******************************************************************************/ void Ndb::waitCompletedTransactions(int aMilliSecondsToWait, - int noOfEventsToWaitFor) + int noOfEventsToWaitFor, + PollGuard *poll_guard) { theImpl->theWaiter.m_state = NO_WAIT; /** @@ -1160,22 +1195,24 @@ Ndb::waitCompletedTransactions(int aMilliSecondsToWait, * (see ReportFailure) */ int waitTime = aMilliSecondsToWait; - NDB_TICKS maxTime = NdbTick_CurrentMillisecond() + (NDB_TICKS)waitTime; + NDB_TICKS currTime = NdbTick_CurrentMillisecond(); + NDB_TICKS maxTime = currTime + (NDB_TICKS)waitTime; theMinNoOfEventsToWakeUp = noOfEventsToWaitFor; do { if (waitTime < 1000) waitTime = 1000; - NdbCondition_WaitTimeout(theImpl->theWaiter.m_condition, - (NdbMutex*)theImpl->theWaiter.m_mutex, - waitTime); + poll_guard->wait_for_input(waitTime); if (theNoOfCompletedTransactions >= (Uint32)noOfEventsToWaitFor) { break; }//if theMinNoOfEventsToWakeUp = noOfEventsToWaitFor; waitTime = (int)(maxTime - NdbTick_CurrentMillisecond()); } while (waitTime > 0); - return; }//Ndb::waitCompletedTransactions() +void Ndb::cond_signal() +{ + NdbCondition_Signal(theImpl->theWaiter.m_condition); +} /***************************************************************************** void sendPreparedTransactions(int forceSend = 0); @@ -1203,28 +1240,39 @@ Remark: First send all prepared operations and then check if there are any int Ndb::sendPollNdb(int aMillisecondNumber, int minNoOfEventsToWakeup, int forceSend) { + /* + The PollGuard has an implicit call of unlock_and_signal through the + ~PollGuard method. This method is called implicitly by the compiler + in all places where the object is out of context due to a return, + break, continue or simply end of statement block + */ + PollGuard pg(TransporterFacade::instance(), &theImpl->theWaiter, + theNdbBlockNumber); + sendPrepTrans(forceSend); + return poll_trans(aMillisecondNumber, minNoOfEventsToWakeup, &pg); +} + +int +Ndb::poll_trans(int aMillisecondNumber, int minNoOfEventsToWakeup, + PollGuard *pg) +{ NdbTransaction* tConArray[1024]; Uint32 tNoCompletedTransactions; - - //theCurrentConnectCounter = 0; - //theCurrentConnectIndex++; - TransporterFacade::instance()->lock_mutex(); - sendPrepTrans(forceSend); if ((minNoOfEventsToWakeup <= 0) || ((Uint32)minNoOfEventsToWakeup > theNoOfSentTransactions)) { minNoOfEventsToWakeup = theNoOfSentTransactions; }//if if ((theNoOfCompletedTransactions < (Uint32)minNoOfEventsToWakeup) && (aMillisecondNumber > 0)) { - waitCompletedTransactions(aMillisecondNumber, minNoOfEventsToWakeup); + waitCompletedTransactions(aMillisecondNumber, minNoOfEventsToWakeup, pg); tNoCompletedTransactions = pollCompleted(tConArray); } else { tNoCompletedTransactions = pollCompleted(tConArray); }//if - TransporterFacade::instance()->unlock_mutex(); + pg->unlock_and_signal(); reportCallback(tConArray, tNoCompletedTransactions); return tNoCompletedTransactions; -}//Ndb::sendPollNdb() +} /***************************************************************************** int pollNdb(int aMillisecondNumber, int minNoOfEventsToWakeup); @@ -1236,67 +1284,23 @@ Remark: Check if there are any transactions already completed. Wait for not int Ndb::pollNdb(int aMillisecondNumber, int minNoOfEventsToWakeup) { - NdbTransaction* tConArray[1024]; - Uint32 tNoCompletedTransactions; - - //theCurrentConnectCounter = 0; - //theCurrentConnectIndex++; - TransporterFacade::instance()->lock_mutex(); - if ((minNoOfEventsToWakeup == 0) || - ((Uint32)minNoOfEventsToWakeup > theNoOfSentTransactions)) { - minNoOfEventsToWakeup = theNoOfSentTransactions; - }//if - if ((theNoOfCompletedTransactions < (Uint32)minNoOfEventsToWakeup) && - (aMillisecondNumber > 0)) { - waitCompletedTransactions(aMillisecondNumber, minNoOfEventsToWakeup); - tNoCompletedTransactions = pollCompleted(tConArray); - } else { - tNoCompletedTransactions = pollCompleted(tConArray); - }//if - TransporterFacade::instance()->unlock_mutex(); - reportCallback(tConArray, tNoCompletedTransactions); - return tNoCompletedTransactions; -}//Ndb::sendPollNdbWithoutWait() - -/***************************************************************************** -int receiveOptimisedResponse(); - -Return: 0 - Response received - -1 - Timeout occured waiting for response - -2 - Node failure interupted wait for response - -******************************************************************************/ -int -Ndb::receiveResponse(int waitTime){ - int tResultCode; - TransporterFacade::instance()->checkForceSend(theNdbBlockNumber); - - theImpl->theWaiter.wait(waitTime); - - if(theImpl->theWaiter.m_state == NO_WAIT) { - tResultCode = 0; - } else { - -#ifdef VM_TRACE - ndbout << "ERR: receiveResponse - theImpl->theWaiter.m_state = "; - ndbout << theImpl->theWaiter.m_state << endl; -#endif - - if (theImpl->theWaiter.m_state == WAIT_NODE_FAILURE){ - tResultCode = -2; - } else { - tResultCode = -1; - } - theImpl->theWaiter.m_state = NO_WAIT; - } - return tResultCode; -}//Ndb::receiveResponse() + /* + The PollGuard has an implicit call of unlock_and_signal through the + ~PollGuard method. This method is called implicitly by the compiler + in all places where the object is out of context due to a return, + break, continue or simply end of statement block + */ + PollGuard pg(TransporterFacade::instance(), &theImpl->theWaiter, + theNdbBlockNumber); + return poll_trans(aMillisecondNumber, minNoOfEventsToWakeup, &pg); +} int Ndb::sendRecSignal(Uint16 node_id, Uint32 aWaitState, NdbApiSignal* aSignal, - Uint32 conn_seq) + Uint32 conn_seq, + Uint32 *ret_conn_seq) { /* In most situations 0 is returned. @@ -1309,19 +1313,28 @@ Ndb::sendRecSignal(Uint16 node_id, */ int return_code; + Uint32 read_conn_seq; TransporterFacade* tp = TransporterFacade::instance(); - Uint32 send_size = 1; // Always sends one signal only - tp->lock_mutex(); + Uint32 send_size = 1; // Always sends one signal only // Protected area + /* + The PollGuard has an implicit call of unlock_and_signal through the + ~PollGuard method. This method is called implicitly by the compiler + in all places where the object is out of context due to a return, + break, continue or simply end of statement block + */ + PollGuard poll_guard(tp,&theImpl->theWaiter,theNdbBlockNumber); + read_conn_seq= tp->getNodeSequence(node_id); + if (ret_conn_seq) + *ret_conn_seq= read_conn_seq; if ((tp->get_node_alive(node_id)) && - ((tp->getNodeSequence(node_id) == conn_seq) || + ((read_conn_seq == conn_seq) || (conn_seq == 0))) { if (tp->check_send_size(node_id, send_size)) { return_code = tp->sendSignal(aSignal, node_id); if (return_code != -1) { - theImpl->theWaiter.m_node = node_id; - theImpl->theWaiter.m_state = aWaitState; - return_code = receiveResponse(); + return poll_guard.wait_n_unlock(WAITFOR_RESPONSE_TIMEOUT,node_id, + aWaitState, false); } else { return_code = -3; } @@ -1330,16 +1343,15 @@ Ndb::sendRecSignal(Uint16 node_id, }//if } else { if ((tp->get_node_stopping(node_id)) && - ((tp->getNodeSequence(node_id) == conn_seq) || + ((read_conn_seq == conn_seq) || (conn_seq == 0))) { return_code = -5; } else { return_code = -2; }//if }//if - tp->unlock_mutex(); - // End of protected area return return_code; + // End of protected area }//Ndb::sendRecSignal() void diff --git a/storage/ndb/src/ndbapi/Ndbinit.cpp b/storage/ndb/src/ndbapi/Ndbinit.cpp index bbc1474f45d..6efcc55c32e 100644 --- a/storage/ndb/src/ndbapi/Ndbinit.cpp +++ b/storage/ndb/src/ndbapi/Ndbinit.cpp @@ -86,6 +86,9 @@ void Ndb::setup(Ndb_cluster_connection *ndb_cluster_connection, theFirstTransId= 0; theMyRef= 0; + cond_wait_index = TransporterFacade::MAX_NO_THREADS; + cond_signal_ndb = NULL; + fullyQualifiedNames = true; #ifdef POORMANSPURIFY @@ -217,6 +220,8 @@ NdbWaiter::NdbWaiter(){ m_node = 0; m_state = NO_WAIT; m_mutex = 0; + m_poll_owner= false; + m_cond_wait_index= TransporterFacade::MAX_NO_THREADS; m_condition = NdbCondition_Create(); } diff --git a/storage/ndb/src/ndbapi/TransporterFacade.cpp b/storage/ndb/src/ndbapi/TransporterFacade.cpp index b143f1a9944..91091aaea06 100644 --- a/storage/ndb/src/ndbapi/TransporterFacade.cpp +++ b/storage/ndb/src/ndbapi/TransporterFacade.cpp @@ -450,20 +450,33 @@ runReceiveResponse_C(void * me) return 0; } +/* + The receiver thread is changed to only wake up once every 10 milliseconds + to poll. It will first check that nobody owns the poll "right" before + polling. This means that methods using the receiveResponse and + sendRecSignal will have a slightly longer response time if they are + executed without any parallel key lookups. Currently also scans are + affected but this is to be fixed. +*/ void TransporterFacade::threadMainReceive(void) { theTransporterRegistry->startReceiving(); +#ifdef NDB_SHM_TRANSPORTER + NdbThread_set_shm_sigmask(true); +#endif NdbMutex_Lock(theMutexPtr); theTransporterRegistry->update_connections(); NdbMutex_Unlock(theMutexPtr); while(!theStopReceive) { for(int i = 0; i<10; i++){ - const int res = theTransporterRegistry->pollReceive(10); - if(res > 0){ - NdbMutex_Lock(theMutexPtr); - theTransporterRegistry->performReceive(); - NdbMutex_Unlock(theMutexPtr); + NdbSleep_MilliSleep(10); + NdbMutex_Lock(theMutexPtr); + if (poll_owner == NULL) { + const int res = theTransporterRegistry->pollReceive(0); + if(res > 0) + theTransporterRegistry->performReceive(); } + NdbMutex_Unlock(theMutexPtr); } NdbMutex_Lock(theMutexPtr); theTransporterRegistry->update_connections(); @@ -471,6 +484,126 @@ void TransporterFacade::threadMainReceive(void) }//while theTransporterRegistry->stopReceiving(); } +/* + This method is called by worker thread that owns the poll "rights". + It waits for events and if something arrives it takes care of it + and returns to caller. It will quickly come back here if not all + data was received for the worker thread. +*/ +void TransporterFacade::external_poll(Uint32 wait_time) +{ + NdbMutex_Unlock(theMutexPtr); + const int res = theTransporterRegistry->pollReceive(wait_time); + NdbMutex_Lock(theMutexPtr); + if (res > 0) { + theTransporterRegistry->performReceive(); + } +} + +/* + This Ndb object didn't get hold of the poll "right" and will wait on a + conditional mutex wait instead. It is put into the conditional wait + queue so that it is accessible to take over the poll "right" if needed. + The method gets a free entry in the free list and puts it first in the + doubly linked list. Finally it assigns the ndb object reference to the + entry. +*/ +Uint32 TransporterFacade::put_in_cond_wait_queue(NdbWaiter *aWaiter) +{ + /* + Get first free entry + */ + Uint32 index = first_free_cond_wait; + assert(index < MAX_NO_THREADS); + first_free_cond_wait = cond_wait_array[index].next_cond_wait; + + /* + Put in doubly linked list + */ + cond_wait_array[index].next_cond_wait = MAX_NO_THREADS; + cond_wait_array[index].prev_cond_wait = last_in_cond_wait; + if (last_in_cond_wait == MAX_NO_THREADS) { + first_in_cond_wait = index; + } else + cond_wait_array[last_in_cond_wait].next_cond_wait = index; + last_in_cond_wait = index; + + cond_wait_array[index].cond_wait_object = aWaiter; + aWaiter->set_cond_wait_index(index); + return index; +} + +/* + Somebody is about to signal the thread to wake it up, it could also + be that it woke up on a timeout and found himself still in the list. + Removes the entry from the doubly linked list. + Inserts the entry into the free list. + NULLifies the ndb object reference entry and sets the index in the + Ndb object to NIL (=MAX_NO_THREADS) +*/ +void TransporterFacade::remove_from_cond_wait_queue(NdbWaiter *aWaiter) +{ + Uint32 index = aWaiter->get_cond_wait_index(); + assert(index < MAX_NO_THREADS && + cond_wait_array[index].cond_wait_object == aWaiter); + /* + Remove from doubly linked list + */ + Uint32 prev_elem, next_elem; + prev_elem = cond_wait_array[index].prev_cond_wait; + next_elem = cond_wait_array[index].next_cond_wait; + if (prev_elem != MAX_NO_THREADS) + cond_wait_array[prev_elem].next_cond_wait = next_elem; + else + first_in_cond_wait = next_elem; + if (next_elem != MAX_NO_THREADS) + cond_wait_array[next_elem].prev_cond_wait = prev_elem; + else + last_in_cond_wait = prev_elem; + /* + Insert into free list + */ + cond_wait_array[index].next_cond_wait = first_free_cond_wait; + cond_wait_array[index].prev_cond_wait = MAX_NO_THREADS; + first_free_cond_wait = index; + + cond_wait_array[index].cond_wait_object = NULL; + aWaiter->set_cond_wait_index(MAX_NO_THREADS); +} + +/* + Get the latest Ndb object from the conditional wait queue + and also remove it from the list. +*/ +NdbWaiter* TransporterFacade::rem_last_from_cond_wait_queue() +{ + NdbWaiter *tWaiter; + Uint32 index = last_in_cond_wait; + if (last_in_cond_wait == MAX_NO_THREADS) + return NULL; + tWaiter = cond_wait_array[index].cond_wait_object; + remove_from_cond_wait_queue(tWaiter); + return tWaiter; +} + +void TransporterFacade::init_cond_wait_queue() +{ + Uint32 i; + /* + Initialise the doubly linked list as empty + */ + first_in_cond_wait = MAX_NO_THREADS; + last_in_cond_wait = MAX_NO_THREADS; + /* + Initialise free list + */ + first_free_cond_wait = 0; + for (i = 0; i < MAX_NO_THREADS; i++) { + cond_wait_array[i].cond_wait_object = NULL; + cond_wait_array[i].next_cond_wait = i+1; + cond_wait_array[i].prev_cond_wait = MAX_NO_THREADS; + } +} TransporterFacade::TransporterFacade() : theTransporterRegistry(0), @@ -480,7 +613,8 @@ TransporterFacade::TransporterFacade() : m_fragmented_signal_id(0) { DBUG_ENTER("TransporterFacade::TransporterFacade"); - + init_cond_wait_queue(); + poll_owner = NULL; theOwnId = 0; theMutexPtr = NdbMutex_Create(); @@ -1119,5 +1253,183 @@ TransporterFacade::ThreadData::close(int number){ return 0; } +PollGuard::PollGuard(TransporterFacade *tp, NdbWaiter *aWaiter, + Uint32 block_no) +{ + m_tp= tp; + m_waiter= aWaiter; + m_locked= true; + m_block_no= block_no; + tp->lock_mutex(); +} + +/* + This is a common routine for possibly forcing the send of buffered signals + and receiving response the thread is waiting for. It is designed to be + useful from: + 1) PK, UK lookups using the asynchronous interface + This routine uses the wait_for_input routine instead since it has + special end conditions due to the asynchronous nature of its usage. + 2) Scans + 3) dictSignal + It uses a NdbWaiter object to wait on the events and this object is + linked into the conditional wait queue. Thus this object contains + a reference to its place in the queue. + + It replaces the method receiveResponse previously used on the Ndb object +*/ +int PollGuard::wait_n_unlock(int wait_time, NodeId nodeId, Uint32 state, + bool forceSend) +{ + int ret_val; + m_waiter->set_node(nodeId); + m_waiter->set_state(state); + ret_val= wait_for_input_in_loop(wait_time, forceSend); + unlock_and_signal(); + return ret_val; +} + +int PollGuard::wait_scan(int wait_time, NodeId nodeId, bool forceSend) +{ + m_waiter->set_node(nodeId); + m_waiter->set_state(WAIT_SCAN); + return wait_for_input_in_loop(wait_time, forceSend); +} + +int PollGuard::wait_for_input_in_loop(int wait_time, bool forceSend) +{ + int ret_val, response_time; + if (forceSend) + m_tp->forceSend(m_block_no); + else + m_tp->checkForceSend(m_block_no); + if (wait_time == -1) //Means wait forever + response_time= WAITFOR_RESPONSE_TIMEOUT; + else + response_time= wait_time; + NDB_TICKS curr_time = NdbTick_CurrentMillisecond(); + NDB_TICKS max_time = curr_time + (NDB_TICKS)wait_time; + do + { + wait_for_input(response_time); + Uint32 state= m_waiter->get_state(); + if (state == NO_WAIT) + { + return 0; + } + else if (state == WAIT_NODE_FAILURE) + { + ret_val= -2; + break; + } + if (wait_time == -1) + { +#ifdef VM_TRACE + ndbout << "Waited WAITFOR_RESPONSE_TIMEOUT, continuing wait" << endl; +#endif + continue; + } + wait_time= max_time - NdbTick_CurrentMillisecond(); + if (wait_time <= 0) + { +#ifdef VM_TRACE + ndbout << "Time-out state is " << m_waiter->get_state() << endl; +#endif + m_waiter->set_state(WST_WAIT_TIMEOUT); + ret_val= -1; + break; + } + } while (1); +#ifdef VM_TRACE + ndbout << "ERR: receiveResponse - theImpl->theWaiter.m_state = "; + ndbout << m_waiter->get_state() << endl; +#endif + m_waiter->set_state(NO_WAIT); + return ret_val; +} + +void PollGuard::wait_for_input(int wait_time) +{ + NdbWaiter *t_poll_owner= m_tp->get_poll_owner(); + if (t_poll_owner != NULL && t_poll_owner != m_waiter) + { + /* + We didn't get hold of the poll "right". We will sleep on a + conditional mutex until the thread owning the poll "right" + will wake us up after all data is received. If no data arrives + we will wake up eventually due to the timeout. + After receiving all data we take the object out of the cond wait + queue if it hasn't happened already. It is usually already out of the + queue but at time-out it could be that the object is still there. + */ + Uint32 cond_wait_index= m_tp->put_in_cond_wait_queue(m_waiter); + m_waiter->wait(wait_time); + if (m_waiter->get_cond_wait_index() != TransporterFacade::MAX_NO_THREADS) + { + m_tp->remove_from_cond_wait_queue(m_waiter); + } + } + else + { + /* + We got the poll "right" and we poll until data is received. After + receiving data we will check if all data is received, if not we + poll again. + */ + if (t_poll_owner) + { +#ifdef NDB_SHM_TRANSPORTER + /* + If shared memory transporters are used we need to set our sigmask + such that we wake up also on interrupts on the shared memory + interrupt signal. + */ + NdbThread_set_shm_sigmask(false); +#endif + m_tp->set_poll_owner(m_waiter); + m_waiter->set_poll_owner(true); + } + m_tp->external_poll((Uint32)wait_time); + } +} + +void PollGuard::unlock_and_signal() +{ + NdbWaiter *t_signal_cond_waiter= 0; + if (!m_locked) + return; + /* + When completing the poll for this thread we must return the poll + ownership if we own it. We will give it to the last thread that + came here (the most recent) which is likely to be the one also + last to complete. We will remove that thread from the conditional + wait queue and set him as the new owner of the poll "right". + We will wait however with the signal until we have unlocked the + mutex for performance reasons. + See Stevens book on Unix NetworkProgramming: The Sockets Networking + API Volume 1 Third Edition on page 703-704 for a discussion on this + subject. + */ + if (m_tp->get_poll_owner() == m_waiter) + { +#ifdef NDB_SHM_TRANSPORTER + /* + If shared memory transporters are used we need to reset our sigmask + since we are no longer the thread to receive interrupts. + */ + NdbThread_set_shm_sigmask(true); +#endif + m_waiter->set_poll_owner(false); + t_signal_cond_waiter= m_tp->rem_last_from_cond_wait_queue(); + m_tp->set_poll_owner(t_signal_cond_waiter); + if (t_signal_cond_waiter) + t_signal_cond_waiter->set_poll_owner(true); + } + m_tp->unlock_mutex(); + if (t_signal_cond_waiter) + t_signal_cond_waiter->cond_signal(); + m_locked=false; +} + template class Vector<NodeStatusFunction>; template class Vector<TransporterFacade::ThreadData::Object_Execute>; diff --git a/storage/ndb/src/ndbapi/TransporterFacade.hpp b/storage/ndb/src/ndbapi/TransporterFacade.hpp index fa070889dd9..34e1a944276 100644 --- a/storage/ndb/src/ndbapi/TransporterFacade.hpp +++ b/storage/ndb/src/ndbapi/TransporterFacade.hpp @@ -34,6 +34,7 @@ class ConfigRetriever; class Ndb; class NdbApiSignal; +class NdbWaiter; typedef void (* ExecuteFunction)(void *, NdbApiSignal *, LinearSectionPtr ptr[3]); typedef void (* NodeStatusFunction)(void *, Uint32, bool nodeAlive, bool nfComplete); @@ -47,6 +48,11 @@ extern "C" { class TransporterFacade { public: + /** + * Max number of Ndb objects. + * (Ndb objects should not be shared by different threads.) + */ + STATIC_CONST( MAX_NO_THREADS = 4711 ); TransporterFacade(); virtual ~TransporterFacade(); bool init(Uint32, const ndb_mgm_configuration *); @@ -114,10 +120,44 @@ public: TransporterRegistry* get_registry() { return theTransporterRegistry;}; +/* + When a thread has sent its signals and is ready to wait for reception + of these it does normally always wait on a conditional mutex and + the actual reception is handled by the receiver thread in the NDB API. + With the below new methods and variables each thread has the possibility + of becoming owner of the "right" to poll for signals. Effectually this + means that the thread acts temporarily as a receiver thread. + For the thread that succeeds in grabbing this "ownership" it will avoid + a number of expensive calls to conditional mutex and even more expensive + context switches to wake up. + When an owner of the poll "right" has completed its own task it is likely + that there are others still waiting. In this case we pick one of the + threads as new owner of the poll "right". Since we want to switch owner + as seldom as possible we always pick the last thread which is likely to + be the last to complete its reception. +*/ + void external_poll(Uint32 wait_time); + NdbWaiter* get_poll_owner(void) const { return poll_owner; } + void set_poll_owner(NdbWaiter* new_owner) { poll_owner= new_owner; } + Uint32 put_in_cond_wait_queue(NdbWaiter *aWaiter); + void remove_from_cond_wait_queue(NdbWaiter *aWaiter); + NdbWaiter* rem_last_from_cond_wait_queue(); // heart beat received from a node (e.g. a signal came) void hb_received(NodeId n); private: + void init_cond_wait_queue(); + struct CondWaitQueueElement { + NdbWaiter *cond_wait_object; + Uint32 next_cond_wait; + Uint32 prev_cond_wait; + }; + NdbWaiter *poll_owner; + CondWaitQueueElement cond_wait_array[MAX_NO_THREADS]; + Uint32 first_in_cond_wait; + Uint32 first_free_cond_wait; + Uint32 last_in_cond_wait; + /* End poll owner stuff */ /** * Send a signal unconditional of node status (used by ClusterMgr) */ @@ -172,12 +212,6 @@ private: /** * Block number handling */ -public: - /** - * Max number of Ndb objects. - * (Ndb objects should not be shared by different threads.) - */ - STATIC_CONST( MAX_NO_THREADS = 4711 ); private: struct ThreadData { @@ -245,6 +279,24 @@ public: GlobalDictCache m_globalDictCache; }; +class PollGuard +{ + public: + PollGuard(TransporterFacade *tp, NdbWaiter *aWaiter, Uint32 block_no); + ~PollGuard() { unlock_and_signal(); } + int wait_n_unlock(int wait_time, NodeId nodeId, Uint32 state, + bool forceSend= false); + int wait_for_input_in_loop(int wait_time, bool forceSend); + void wait_for_input(int wait_time); + int wait_scan(int wait_time, NodeId nodeId, bool forceSend); + void unlock_and_signal(); + private: + TransporterFacade *m_tp; + NdbWaiter *m_waiter; + Uint32 m_block_no; + bool m_locked; +}; + inline TransporterFacade* TransporterFacade::instance() |