summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorunknown <mronstrom@mysql.com[mikron]>2005-07-26 17:12:05 +0200
committerunknown <mronstrom@mysql.com[mikron]>2005-07-26 17:12:05 +0200
commit6cc3eee46e1ede6f48807c42aba7e4a6062a5959 (patch)
tree9d97d11f76f5e3f099a1376376766cda02859616
parentbc2776d9571f70df27473eb7f0420bd84799821f (diff)
downloadmariadb-git-6cc3eee46e1ede6f48807c42aba7e4a6062a5959.tar.gz
wl2405.patch
storage/ndb/include/ndbapi/Ndb.hpp: Import patch wl2405.patch storage/ndb/include/ndbapi/NdbIndexScanOperation.hpp: Import patch wl2405.patch storage/ndb/include/ndbapi/NdbScanOperation.hpp: Import patch wl2405.patch storage/ndb/include/portlib/NdbThread.h: Import patch wl2405.patch storage/ndb/src/common/portlib/NdbThread.c: Import patch wl2405.patch storage/ndb/src/common/transporter/TransporterRegistry.cpp: Import patch wl2405.patch storage/ndb/src/ndbapi/Ndb.cpp: Import patch wl2405.patch storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp: Import patch wl2405.patch storage/ndb/src/ndbapi/NdbScanOperation.cpp: Import patch wl2405.patch storage/ndb/src/ndbapi/NdbWaiter.hpp: Import patch wl2405.patch storage/ndb/src/ndbapi/Ndbif.cpp: Import patch wl2405.patch storage/ndb/src/ndbapi/Ndbinit.cpp: Import patch wl2405.patch storage/ndb/src/ndbapi/TransporterFacade.cpp: Import patch wl2405.patch storage/ndb/src/ndbapi/TransporterFacade.hpp: Import patch wl2405.patch
-rw-r--r--storage/ndb/include/ndbapi/Ndb.hpp17
-rw-r--r--storage/ndb/include/ndbapi/NdbIndexScanOperation.hpp2
-rw-r--r--storage/ndb/include/ndbapi/NdbScanOperation.hpp7
-rw-r--r--storage/ndb/include/portlib/NdbThread.h8
-rw-r--r--storage/ndb/src/common/portlib/NdbThread.c30
-rw-r--r--storage/ndb/src/common/transporter/TransporterRegistry.cpp9
-rw-r--r--storage/ndb/src/ndbapi/Ndb.cpp20
-rw-r--r--storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp41
-rw-r--r--storage/ndb/src/ndbapi/NdbScanOperation.cpp109
-rw-r--r--storage/ndb/src/ndbapi/NdbWaiter.hpp39
-rw-r--r--storage/ndb/src/ndbapi/Ndbif.cpp190
-rw-r--r--storage/ndb/src/ndbapi/Ndbinit.cpp5
-rw-r--r--storage/ndb/src/ndbapi/TransporterFacade.cpp324
-rw-r--r--storage/ndb/src/ndbapi/TransporterFacade.hpp64
14 files changed, 640 insertions, 225 deletions
diff --git a/storage/ndb/include/ndbapi/Ndb.hpp b/storage/ndb/include/ndbapi/Ndb.hpp
index db2212075e8..1c9d5ed8ad0 100644
--- a/storage/ndb/include/ndbapi/Ndb.hpp
+++ b/storage/ndb/include/ndbapi/Ndb.hpp
@@ -984,6 +984,8 @@ class BaseString;
class NdbEventOperation;
class NdbBlob;
class NdbReceiver;
+class TransporterFacade;
+class PollGuard;
typedef void (* NdbEventCallback)(NdbEventOperation*, Ndb*, void*);
@@ -1462,7 +1464,12 @@ public:
/*****************************************************************************
* These are service routines used by the other classes in the NDBAPI.
****************************************************************************/
+ Uint32 get_cond_wait_index() { return cond_wait_index; }
+ void set_cond_wait_index(Uint32 index) { cond_wait_index = index; }
private:
+ Uint32 cond_wait_index;
+ Ndb *cond_signal_ndb;
+ void cond_signal();
void setup(Ndb_cluster_connection *ndb_cluster_connection,
const char* aCatalogName, const char* aSchemaName);
@@ -1513,13 +1520,11 @@ private:
// synchronous and asynchronous interface
void handleReceivedSignal(NdbApiSignal* anApiSignal, struct LinearSectionPtr ptr[3]);
- // Receive response signals
- int receiveResponse(int waitTime = WAITFOR_RESPONSE_TIMEOUT);
-
int sendRecSignal(Uint16 aNodeId,
Uint32 aWaitState,
NdbApiSignal* aSignal,
- Uint32 nodeSequence);
+ Uint32 nodeSequence,
+ Uint32 *ret_conn_seq= 0);
// Sets Restart GCI in Ndb object
void RestartGCI(int aRestartGCI);
@@ -1576,7 +1581,9 @@ private:
Uint32 pollCompleted(NdbTransaction** aCopyArray);
void sendPrepTrans(int forceSend);
void reportCallback(NdbTransaction** aCopyArray, Uint32 aNoOfComplTrans);
- void waitCompletedTransactions(int milliSecs, int noOfEventsToWaitFor);
+ int poll_trans(int milliSecs, int noOfEventsToWaitFor, PollGuard *pg);
+ void waitCompletedTransactions(int milliSecs, int noOfEventsToWaitFor,
+ PollGuard *pg);
void completedTransaction(NdbTransaction* aTransaction);
void completedScanTransaction(NdbTransaction* aTransaction);
diff --git a/storage/ndb/include/ndbapi/NdbIndexScanOperation.hpp b/storage/ndb/include/ndbapi/NdbIndexScanOperation.hpp
index 0a31f228921..c231b927581 100644
--- a/storage/ndb/include/ndbapi/NdbIndexScanOperation.hpp
+++ b/storage/ndb/include/ndbapi/NdbIndexScanOperation.hpp
@@ -161,7 +161,7 @@ private:
void fix_get_values();
int next_result_ordered(bool fetchAllowed, bool forceSend = false);
- int send_next_scan_ordered(Uint32 idx, bool forceSend = false);
+ int send_next_scan_ordered(Uint32 idx);
int compare(Uint32 key, Uint32 cols, const NdbReceiver*, const NdbReceiver*);
Uint32 m_sort_columns;
diff --git a/storage/ndb/include/ndbapi/NdbScanOperation.hpp b/storage/ndb/include/ndbapi/NdbScanOperation.hpp
index bf8f362cefc..b32f6050704 100644
--- a/storage/ndb/include/ndbapi/NdbScanOperation.hpp
+++ b/storage/ndb/include/ndbapi/NdbScanOperation.hpp
@@ -21,6 +21,7 @@
class NdbBlob;
class NdbResultSet;
+class PollGuard;
/**
* @class NdbScanOperation
@@ -183,7 +184,8 @@ protected:
int nextResultImpl(bool fetchAllowed = true, bool forceSend = false);
virtual void release();
- int close_impl(class TransporterFacade*, bool forceSend = false);
+ int close_impl(class TransporterFacade*, bool forceSend,
+ PollGuard *poll_guard);
// Overloaded methods from NdbCursorOperation
int executeCursor(int ProcessorId);
@@ -192,7 +194,6 @@ protected:
int init(const NdbTableImpl* tab, NdbTransaction*);
int prepareSend(Uint32 TC_ConnectPtr, Uint64 TransactionId);
int doSend(int ProcessorId);
- void checkForceSend(bool forceSend);
virtual void setErrorCode(int aErrorCode);
virtual void setErrorCodeAbort(int aErrorCode);
@@ -234,7 +235,7 @@ protected:
Uint32 m_sent_receivers_count; // NOTE needs mutex to access
NdbReceiver** m_sent_receivers; // receive thread puts them here
- int send_next_scan(Uint32 cnt, bool close, bool forceSend = false);
+ int send_next_scan(Uint32 cnt, bool close);
void receiver_delivered(NdbReceiver*);
void receiver_completed(NdbReceiver*);
void execCLOSE_SCAN_REP();
diff --git a/storage/ndb/include/portlib/NdbThread.h b/storage/ndb/include/portlib/NdbThread.h
index e86deee4354..003a7e3e151 100644
--- a/storage/ndb/include/portlib/NdbThread.h
+++ b/storage/ndb/include/portlib/NdbThread.h
@@ -37,6 +37,14 @@ typedef size_t NDB_THREAD_STACKSIZE;
struct NdbThread;
+/*
+ Method to block/unblock thread from receiving KILL signal with
+ signum set in g_ndb_shm_signum in a portable manner.
+*/
+#ifdef NDB_SHM_TRANSPORTER
+void NdbThread_set_shm_sigmask(bool block);
+#endif
+
/**
* Create a thread
*
diff --git a/storage/ndb/src/common/portlib/NdbThread.c b/storage/ndb/src/common/portlib/NdbThread.c
index 55ebc4c8111..85c2de4fd5e 100644
--- a/storage/ndb/src/common/portlib/NdbThread.c
+++ b/storage/ndb/src/common/portlib/NdbThread.c
@@ -36,6 +36,27 @@ struct NdbThread
void * object;
};
+
+#ifdef NDB_SHM_TRANSPORTER
+void NdbThread_set_shm_sigmask(bool block)
+{
+ DBUG_ENTER("NdbThread_set_shm_sigmask");
+ if (g_ndb_shm_signum)
+ {
+ sigset_t mask;
+ DBUG_PRINT("info",("Block signum %d",g_ndb_shm_signum));
+ sigemptyset(&mask);
+ sigaddset(&mask, g_ndb_shm_signum);
+ if (block)
+ pthread_sigmask(SIG_BLOCK, &mask, 0);
+ else
+ pthread_sigmask(SIG_UNBLOCK, &mask, 0);
+ }
+ DBUG_VOID_RETURN;
+}
+#endif
+
+
static
void*
ndb_thread_wrapper(void* _ss){
@@ -43,14 +64,7 @@ ndb_thread_wrapper(void* _ss){
{
DBUG_ENTER("ndb_thread_wrapper");
#ifdef NDB_SHM_TRANSPORTER
- if (g_ndb_shm_signum)
- {
- sigset_t mask;
- DBUG_PRINT("info",("Block signum %d",g_ndb_shm_signum));
- sigemptyset(&mask);
- sigaddset(&mask, g_ndb_shm_signum);
- pthread_sigmask(SIG_BLOCK, &mask, 0);
- }
+ NdbThread_set_shm_sigmask(true);
#endif
{
void *ret;
diff --git a/storage/ndb/src/common/transporter/TransporterRegistry.cpp b/storage/ndb/src/common/transporter/TransporterRegistry.cpp
index 86bfa385c04..963f5020bd4 100644
--- a/storage/ndb/src/common/transporter/TransporterRegistry.cpp
+++ b/storage/ndb/src/common/transporter/TransporterRegistry.cpp
@@ -457,10 +457,7 @@ TransporterRegistry::createSHMTransporter(TransporterConfiguration *config) {
* Make sure to block g_ndb_shm_signum
* TransporterRegistry::init is run from "main" thread
*/
- sigset_t mask;
- sigemptyset(&mask);
- sigaddset(&mask, g_ndb_shm_signum);
- pthread_sigmask(SIG_BLOCK, &mask, 0);
+ NdbThread_set_shm_sigmask(true);
}
if(config->shm.signum != g_ndb_shm_signum)
@@ -1490,11 +1487,9 @@ TransporterRegistry::startReceiving()
DBUG_PRINT("info",("Install signal handler for signum %d",
g_ndb_shm_signum));
struct sigaction sa;
+ NdbThread_set_shm_sigmask(false);
sigemptyset(&sa.sa_mask);
- sigaddset(&sa.sa_mask, g_ndb_shm_signum);
- pthread_sigmask(SIG_UNBLOCK, &sa.sa_mask, 0);
sa.sa_handler = shm_sig_handler;
- sigemptyset(&sa.sa_mask);
sa.sa_flags = 0;
int ret;
while((ret = sigaction(g_ndb_shm_signum, &sa, 0)) == -1 && errno == EINTR);
diff --git a/storage/ndb/src/ndbapi/Ndb.cpp b/storage/ndb/src/ndbapi/Ndb.cpp
index 7893aaae15c..c5d32f59c3c 100644
--- a/storage/ndb/src/ndbapi/Ndb.cpp
+++ b/storage/ndb/src/ndbapi/Ndb.cpp
@@ -173,23 +173,9 @@ Ndb::NDB_connect(Uint32 tNode)
tSignal->setData(theMyRef, 2); // Set my block reference
tNdbCon->Status(NdbTransaction::Connecting); // Set status to connecting
Uint32 nodeSequence;
- { // send and receive signal
- Guard guard(tp->theMutexPtr);
- nodeSequence = tp->getNodeSequence(tNode);
- bool node_is_alive = tp->get_node_alive(tNode);
- if (node_is_alive) {
- tReturnCode = tp->sendSignal(tSignal, tNode);
- releaseSignal(tSignal);
- if (tReturnCode != -1) {
- theImpl->theWaiter.m_node = tNode;
- theImpl->theWaiter.m_state = WAIT_TC_SEIZE;
- tReturnCode = receiveResponse();
- }//if
- } else {
- releaseSignal(tSignal);
- tReturnCode = -1;
- }//if
- }
+ tReturnCode= sendRecSignal(tNode, WAIT_TC_SEIZE, tSignal,
+ 0, &nodeSequence);
+ releaseSignal(tSignal);
if ((tReturnCode == 0) && (tNdbCon->Status() == NdbTransaction::Connected)) {
//************************************************
// Send and receive was successful
diff --git a/storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp b/storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp
index ce5827ab7f4..582f4baaef4 100644
--- a/storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp
+++ b/storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp
@@ -989,7 +989,13 @@ NdbDictInterface::dictSignal(NdbApiSignal* signal,
m_buffer.clear();
// Protected area
- m_transporter->lock_mutex();
+ /*
+ The PollGuard has an implicit call of unlock_and_signal through the
+ ~PollGuard method. This method is called implicitly by the compiler
+ in all places where the object is out of context due to a return,
+ break, continue or simply end of statement block
+ */
+ PollGuard poll_guard(m_transporter, &m_waiter, refToBlock(m_reference));
Uint32 aNodeId;
if (useMasterNodeId) {
if ((m_masterNodeId == 0) ||
@@ -1002,7 +1008,6 @@ NdbDictInterface::dictSignal(NdbApiSignal* signal,
}
if(aNodeId == 0){
m_error.code= 4009;
- m_transporter->unlock_mutex();
DBUG_RETURN(-1);
}
{
@@ -1023,21 +1028,15 @@ NdbDictInterface::dictSignal(NdbApiSignal* signal,
r = m_transporter->sendSignal(signal, aNodeId);
}
if(r != 0){
- m_transporter->unlock_mutex();
continue;
}
}
m_error.code= 0;
-
- m_waiter.m_node = aNodeId;
- m_waiter.m_state = wst;
-
- m_waiter.wait(theWait);
- m_transporter->unlock_mutex();
+ int ret_val= poll_guard.wait_n_unlock(theWait, aNodeId, wst);
// End of Protected area
- if(m_waiter.m_state == NO_WAIT && m_error.code == 0){
+ if(ret_val == 0 && m_error.code == 0){
// Normal return
DBUG_RETURN(0);
}
@@ -1045,7 +1044,7 @@ NdbDictInterface::dictSignal(NdbApiSignal* signal,
/**
* Handle error codes
*/
- if(m_waiter.m_state == WAIT_NODE_FAILURE)
+ if(ret_val == -2) //WAIT_NODE_FAILURE
continue;
if(m_waiter.m_state == WST_WAIT_TIMEOUT)
@@ -3166,26 +3165,28 @@ NdbDictInterface::listObjects(NdbApiSignal* signal)
for (Uint32 i = 0; i < RETRIES; i++) {
m_buffer.clear();
// begin protected
- m_transporter->lock_mutex();
+ /*
+ The PollGuard has an implicit call of unlock_and_signal through the
+ ~PollGuard method. This method is called implicitly by the compiler
+ in all places where the object is out of context due to a return,
+ break, continue or simply end of statement block
+ */
+ PollGuard poll_guard(m_transporter, &m_waiter, refToBlock(m_reference));
Uint16 aNodeId = m_transporter->get_an_alive_node();
if (aNodeId == 0) {
m_error.code= 4009;
- m_transporter->unlock_mutex();
return -1;
}
if (m_transporter->sendSignal(signal, aNodeId) != 0) {
- m_transporter->unlock_mutex();
continue;
}
m_error.code= 0;
- m_waiter.m_node = aNodeId;
- m_waiter.m_state = WAIT_LIST_TABLES_CONF;
- m_waiter.wait(WAITFOR_RESPONSE_TIMEOUT);
- m_transporter->unlock_mutex();
+ int ret_val= poll_guard.wait_n_unlock(WAITFOR_RESPONSE_TIMEOUT,
+ aNodeId, WAIT_LIST_TABLES_CONF);
// end protected
- if (m_waiter.m_state == NO_WAIT && m_error.code == 0)
+ if (ret_val == 0 && m_error.code == 0)
return 0;
- if (m_waiter.m_state == WAIT_NODE_FAILURE)
+ if (ret_val == -2) //WAIT_NODE_FAILURE
continue;
return -1;
}
diff --git a/storage/ndb/src/ndbapi/NdbScanOperation.cpp b/storage/ndb/src/ndbapi/NdbScanOperation.cpp
index e0a480e02f7..288b8dc8bd8 100644
--- a/storage/ndb/src/ndbapi/NdbScanOperation.cpp
+++ b/storage/ndb/src/ndbapi/NdbScanOperation.cpp
@@ -460,13 +460,20 @@ int NdbScanOperation::nextResultImpl(bool fetchAllowed, bool forceSend)
Uint32 nodeId = theNdbCon->theDBnode;
TransporterFacade* tp = TransporterFacade::instance();
- Guard guard(tp->theMutexPtr);
+ /*
+ The PollGuard has an implicit call of unlock_and_signal through the
+ ~PollGuard method. This method is called implicitly by the compiler
+ in all places where the object is out of context due to a return,
+ break, continue or simply end of statement block
+ */
+ PollGuard poll_guard(tp, &theNdb->theImpl->theWaiter,
+ theNdb->theNdbBlockNumber);
if(theError.code)
return -1;
Uint32 seq = theNdbCon->theNodeSequence;
- if(seq == tp->getNodeSequence(nodeId) && send_next_scan(idx, false,
- forceSend) == 0){
+ if(seq == tp->getNodeSequence(nodeId) && send_next_scan(idx, false) == 0)
+ {
idx = m_current_api_receiver;
last = m_api_receivers_count;
@@ -495,10 +502,9 @@ int NdbScanOperation::nextResultImpl(bool fetchAllowed, bool forceSend)
/**
* No completed...
*/
- theNdb->theImpl->theWaiter.m_node = nodeId;
- theNdb->theImpl->theWaiter.m_state = WAIT_SCAN;
- int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
- if (return_code == 0 && seq == tp->getNodeSequence(nodeId)) {
+ int ret_code= poll_guard.wait_scan(WAITFOR_SCAN_TIMEOUT, nodeId,
+ forceSend);
+ if (ret_code == 0 && seq == tp->getNodeSequence(nodeId)) {
continue;
} else {
idx = last;
@@ -557,8 +563,8 @@ int NdbScanOperation::nextResultImpl(bool fetchAllowed, bool forceSend)
}
int
-NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag,
- bool forceSend){
+NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag)
+{
if(cnt > 0){
NdbApiSignal tSignal(theNdb->theMyRef);
tSignal.setSignal(GSN_SCAN_NEXTREQ);
@@ -605,9 +611,6 @@ NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag,
ret = tp->sendSignal(&tSignal, nodeId);
}
}
-
- if (!ret) checkForceSend(forceSend);
-
m_sent_receivers_count = last + sent;
m_api_receivers_count -= cnt;
m_current_api_receiver = 0;
@@ -617,15 +620,6 @@ NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag,
return 0;
}
-void NdbScanOperation::checkForceSend(bool forceSend)
-{
- if (forceSend) {
- TransporterFacade::instance()->forceSend(theNdb->theNdbBlockNumber);
- } else {
- TransporterFacade::instance()->checkForceSend(theNdb->theNdbBlockNumber);
- }//if
-}
-
int
NdbScanOperation::prepareSend(Uint32 TC_ConnectPtr, Uint64 TransactionId)
{
@@ -661,9 +655,15 @@ void NdbScanOperation::close(bool forceSend, bool releaseOp)
m_sent_receivers_count);
TransporterFacade* tp = TransporterFacade::instance();
- Guard guard(tp->theMutexPtr);
- close_impl(tp, forceSend);
-
+ /*
+ The PollGuard has an implicit call of unlock_and_signal through the
+ ~PollGuard method. This method is called implicitly by the compiler
+ in all places where the object is out of context due to a return,
+ break, continue or simply end of statement block
+ */
+ PollGuard poll_guard(tp, &theNdb->theImpl->theWaiter,
+ theNdb->theNdbBlockNumber);
+ close_impl(tp, forceSend, &poll_guard);
}
NdbConnection* tCon = theNdbCon;
@@ -1338,20 +1338,26 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed,
if(fetchAllowed){
if(DEBUG_NEXT_RESULT) ndbout_c("performing fetch...");
TransporterFacade* tp = TransporterFacade::instance();
- Guard guard(tp->theMutexPtr);
+ /*
+ The PollGuard has an implicit call of unlock_and_signal through the
+ ~PollGuard method. This method is called implicitly by the compiler
+ in all places where the object is out of context due to a return,
+ break, continue or simply end of statement block
+ */
+ PollGuard poll_guard(tp, &theNdb->theImpl->theWaiter,
+ theNdb->theNdbBlockNumber);
if(theError.code)
return -1;
Uint32 seq = theNdbCon->theNodeSequence;
Uint32 nodeId = theNdbCon->theDBnode;
if(seq == tp->getNodeSequence(nodeId) &&
- !send_next_scan_ordered(s_idx, forceSend)){
+ !send_next_scan_ordered(s_idx)){
Uint32 tmp = m_sent_receivers_count;
s_idx = m_current_api_receiver;
while(m_sent_receivers_count > 0 && !theError.code){
- theNdb->theImpl->theWaiter.m_node = nodeId;
- theNdb->theImpl->theWaiter.m_state = WAIT_SCAN;
- int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
- if (return_code == 0 && seq == tp->getNodeSequence(nodeId)) {
+ int ret_code= poll_guard.wait_scan(WAITFOR_SCAN_TIMEOUT, nodeId,
+ forceSend);
+ if (ret_code == 0 && seq == tp->getNodeSequence(nodeId)) {
continue;
}
if(DEBUG_NEXT_RESULT) ndbout_c("return -1");
@@ -1438,7 +1444,8 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed,
}
int
-NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx, bool forceSend){
+NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx)
+{
if(idx == theParallelism)
return 0;
@@ -1476,12 +1483,13 @@ NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx, bool forceSend){
TransporterFacade * tp = TransporterFacade::instance();
tSignal.setLength(4+1);
int ret= tp->sendSignal(&tSignal, nodeId);
- if (!ret) checkForceSend(forceSend);
return ret;
}
int
-NdbScanOperation::close_impl(TransporterFacade* tp, bool forceSend){
+NdbScanOperation::close_impl(TransporterFacade* tp, bool forceSend,
+ PollGuard *poll_guard)
+{
Uint32 seq = theNdbCon->theNodeSequence;
Uint32 nodeId = theNdbCon->theDBnode;
@@ -1496,9 +1504,8 @@ NdbScanOperation::close_impl(TransporterFacade* tp, bool forceSend){
*/
while(theError.code == 0 && m_sent_receivers_count)
{
- theNdb->theImpl->theWaiter.m_node = nodeId;
- theNdb->theImpl->theWaiter.m_state = WAIT_SCAN;
- int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
+ int return_code= poll_guard->wait_scan(WAITFOR_SCAN_TIMEOUT, nodeId,
+ false);
switch(return_code){
case 0:
break;
@@ -1555,7 +1562,7 @@ NdbScanOperation::close_impl(TransporterFacade* tp, bool forceSend){
}
// Send close scan
- if(send_next_scan(api+conf, true, forceSend) == -1)
+ if(send_next_scan(api+conf, true) == -1)
{
theNdbCon->theReleaseOnClose = true;
return -1;
@@ -1566,9 +1573,8 @@ NdbScanOperation::close_impl(TransporterFacade* tp, bool forceSend){
*/
while(m_sent_receivers_count+m_api_receivers_count+m_conf_receivers_count)
{
- theNdb->theImpl->theWaiter.m_node = nodeId;
- theNdb->theImpl->theWaiter.m_state = WAIT_SCAN;
- int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
+ int return_code= poll_guard->wait_scan(WAITFOR_SCAN_TIMEOUT, nodeId,
+ forceSend);
switch(return_code){
case 0:
break;
@@ -1608,12 +1614,19 @@ NdbScanOperation::restart(bool forceSend)
{
TransporterFacade* tp = TransporterFacade::instance();
- Guard guard(tp->theMutexPtr);
+ /*
+ The PollGuard has an implicit call of unlock_and_signal through the
+ ~PollGuard method. This method is called implicitly by the compiler
+ in all places where the object is out of context due to a return,
+ break, continue or simply end of statement block
+ */
+ PollGuard poll_guard(tp, &theNdb->theImpl->theWaiter,
+ theNdb->theNdbBlockNumber);
Uint32 nodeId = theNdbCon->theDBnode;
{
int res;
- if((res= close_impl(tp, forceSend)))
+ if((res= close_impl(tp, forceSend, &poll_guard)))
{
return res;
}
@@ -1627,7 +1640,6 @@ NdbScanOperation::restart(bool forceSend)
theError.code = 0;
if (doSendScan(nodeId) == -1)
return -1;
-
return 0;
}
@@ -1637,8 +1649,15 @@ NdbIndexScanOperation::reset_bounds(bool forceSend){
{
TransporterFacade* tp = TransporterFacade::instance();
- Guard guard(tp->theMutexPtr);
- res= close_impl(tp, forceSend);
+ /*
+ The PollGuard has an implicit call of unlock_and_signal through the
+ ~PollGuard method. This method is called implicitly by the compiler
+ in all places where the object is out of context due to a return,
+ break, continue or simply end of statement block
+ */
+ PollGuard poll_guard(tp, &theNdb->theImpl->theWaiter,
+ theNdb->theNdbBlockNumber);
+ res= close_impl(tp, forceSend, &poll_guard);
}
if(!res)
diff --git a/storage/ndb/src/ndbapi/NdbWaiter.hpp b/storage/ndb/src/ndbapi/NdbWaiter.hpp
index 8b7b2a75879..4ccfb40b5ba 100644
--- a/storage/ndb/src/ndbapi/NdbWaiter.hpp
+++ b/storage/ndb/src/ndbapi/NdbWaiter.hpp
@@ -54,10 +54,19 @@ public:
void wait(int waitTime);
void nodeFail(Uint32 node);
void signal(Uint32 state);
+ void cond_signal();
+ void set_poll_owner(bool poll_owner) { m_poll_owner= poll_owner; }
+ Uint32 get_state() { return m_state; }
+ void set_state(Uint32 state) { m_state= state; }
+ void set_node(Uint32 node) { m_node= node; }
+ Uint32 get_cond_wait_index() { return m_cond_wait_index; }
+ void set_cond_wait_index(Uint32 index) { m_cond_wait_index= index; }
Uint32 m_node;
Uint32 m_state;
void * m_mutex;
+ bool m_poll_owner;
+ Uint32 m_cond_wait_index;
struct NdbCondition * m_condition;
};
@@ -65,22 +74,8 @@ inline
void
NdbWaiter::wait(int waitTime)
{
- const bool forever = (waitTime == -1);
- const NDB_TICKS maxTime = NdbTick_CurrentMillisecond() + waitTime;
- while (1) {
- if (m_state == NO_WAIT || m_state == WAIT_NODE_FAILURE)
- break;
- if (forever) {
- NdbCondition_Wait(m_condition, (NdbMutex*)m_mutex);
- } else {
- if (waitTime <= 0) {
- m_state = WST_WAIT_TIMEOUT;
- break;
- }
- NdbCondition_WaitTimeout(m_condition, (NdbMutex*)m_mutex, waitTime);
- waitTime = maxTime - NdbTick_CurrentMillisecond();
- }
- }
+ assert(!m_poll_owner);
+ NdbCondition_WaitTimeout(m_condition, (NdbMutex*)m_mutex, waitTime);
}
inline
@@ -88,7 +83,8 @@ void
NdbWaiter::nodeFail(Uint32 aNodeId){
if (m_state != NO_WAIT && m_node == aNodeId){
m_state = WAIT_NODE_FAILURE;
- NdbCondition_Signal(m_condition);
+ if (!m_poll_owner)
+ NdbCondition_Signal(m_condition);
}
}
@@ -96,7 +92,14 @@ inline
void
NdbWaiter::signal(Uint32 state){
m_state = state;
- NdbCondition_Signal(m_condition);
+ if (!m_poll_owner)
+ NdbCondition_Signal(m_condition);
}
+inline
+void
+NdbWaiter::cond_signal()
+{
+ NdbCondition_Signal(m_condition);
+}
#endif
diff --git a/storage/ndb/src/ndbapi/Ndbif.cpp b/storage/ndb/src/ndbapi/Ndbif.cpp
index fee6f0930ad..ca7b5aee84a 100644
--- a/storage/ndb/src/ndbapi/Ndbif.cpp
+++ b/storage/ndb/src/ndbapi/Ndbif.cpp
@@ -332,6 +332,7 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
const Uint32 tFirstData = *tDataPtr;
const Uint32 tLen = aSignal->getLength();
void * tFirstDataPtr;
+ NdbWaiter *t_waiter;
/*
In order to support 64 bit processes in the application we need to use
@@ -470,7 +471,7 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
ndbout_c("Recevied TCKEY_FAILREF wo/ operation");
#endif
return;
- break;
+ return;
}
case GSN_TCKEYREF:
{
@@ -677,12 +678,12 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
case GSN_LIST_TABLES_CONF:
NdbDictInterface::execSignal(&theDictionary->m_receiver,
aSignal, ptr);
- break;
+ return;
case GSN_SUB_META_DATA:
case GSN_SUB_REMOVE_CONF:
case GSN_SUB_REMOVE_REF:
- break; // ignore these signals
+ return; // ignore these signals
case GSN_SUB_GCP_COMPLETE_REP:
case GSN_SUB_START_CONF:
case GSN_SUB_START_REF:
@@ -691,7 +692,7 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
case GSN_SUB_STOP_REF:
NdbDictInterface::execSignal(&theDictionary->m_receiver,
aSignal, ptr);
- break;
+ return;
case GSN_DIHNDBTAMPER:
{
@@ -833,11 +834,32 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
}
default:
goto InvalidSignal;
- }//switch
-
- if (theImpl->theWaiter.m_state == NO_WAIT) {
- // Wake up the thread waiting for response
- NdbCondition_Signal(theImpl->theWaiter.m_condition);
+ }//swich
+
+ t_waiter= &theImpl->theWaiter;
+ if (t_waiter->get_state() == NO_WAIT && tWaitState != NO_WAIT)
+ {
+ /*
+ If our waiter object is the owner of the "poll rights", then we
+ can simply return, we will return from this routine to the
+ place where external_poll was called. From there it will move
+ the "poll ownership" to a new thread if available.
+
+ If our waiter object doesn't own the "poll rights", then we must
+ signal the thread from where this waiter object called
+ its conditional wait. This will wake up this thread so that it
+ can continue its work.
+ */
+ TransporterFacade *tp= TransporterFacade::instance();
+ if (tp->get_poll_owner() != t_waiter)
+ {
+ /*
+ Wake up the thread waiting for response and remove it from queue
+ of objects waiting for receive completion
+ */
+ tp->remove_from_cond_wait_queue(t_waiter);
+ t_waiter->cond_signal();
+ }
}//if
return;
@@ -892,7 +914,19 @@ Ndb::completedTransaction(NdbTransaction* aCon)
if ((theMinNoOfEventsToWakeUp != 0) &&
(theNoOfCompletedTransactions >= theMinNoOfEventsToWakeUp)) {
theMinNoOfEventsToWakeUp = 0;
- NdbCondition_Signal(theImpl->theWaiter.m_condition);
+ TransporterFacade *tp = TransporterFacade::instance();
+ NdbWaiter *t_waiter= &theImpl->theWaiter;
+ if (tp->get_poll_owner() != t_waiter) {
+ /*
+ When we come here, this is executed by the thread owning the "poll
+ rights". This thread is not where our waiter object belongs.
+ Thus we wake up the thread owning this waiter object but first
+ we must remove it from the conditional wait queue so that we
+ don't assign it as poll owner later on.
+ */
+ tp->remove_from_cond_wait_queue(t_waiter);
+ t_waiter->cond_signal();
+ }
return;
}//if
} else {
@@ -1151,7 +1185,8 @@ Remark: First send all prepared operations and then check if there are any
******************************************************************************/
void
Ndb::waitCompletedTransactions(int aMilliSecondsToWait,
- int noOfEventsToWaitFor)
+ int noOfEventsToWaitFor,
+ PollGuard *poll_guard)
{
theImpl->theWaiter.m_state = NO_WAIT;
/**
@@ -1160,22 +1195,24 @@ Ndb::waitCompletedTransactions(int aMilliSecondsToWait,
* (see ReportFailure)
*/
int waitTime = aMilliSecondsToWait;
- NDB_TICKS maxTime = NdbTick_CurrentMillisecond() + (NDB_TICKS)waitTime;
+ NDB_TICKS currTime = NdbTick_CurrentMillisecond();
+ NDB_TICKS maxTime = currTime + (NDB_TICKS)waitTime;
theMinNoOfEventsToWakeUp = noOfEventsToWaitFor;
do {
if (waitTime < 1000) waitTime = 1000;
- NdbCondition_WaitTimeout(theImpl->theWaiter.m_condition,
- (NdbMutex*)theImpl->theWaiter.m_mutex,
- waitTime);
+ poll_guard->wait_for_input(waitTime);
if (theNoOfCompletedTransactions >= (Uint32)noOfEventsToWaitFor) {
break;
}//if
theMinNoOfEventsToWakeUp = noOfEventsToWaitFor;
waitTime = (int)(maxTime - NdbTick_CurrentMillisecond());
} while (waitTime > 0);
- return;
}//Ndb::waitCompletedTransactions()
+void Ndb::cond_signal()
+{
+ NdbCondition_Signal(theImpl->theWaiter.m_condition);
+}
/*****************************************************************************
void sendPreparedTransactions(int forceSend = 0);
@@ -1203,28 +1240,39 @@ Remark: First send all prepared operations and then check if there are any
int
Ndb::sendPollNdb(int aMillisecondNumber, int minNoOfEventsToWakeup, int forceSend)
{
+ /*
+ The PollGuard has an implicit call of unlock_and_signal through the
+ ~PollGuard method. This method is called implicitly by the compiler
+ in all places where the object is out of context due to a return,
+ break, continue or simply end of statement block
+ */
+ PollGuard pg(TransporterFacade::instance(), &theImpl->theWaiter,
+ theNdbBlockNumber);
+ sendPrepTrans(forceSend);
+ return poll_trans(aMillisecondNumber, minNoOfEventsToWakeup, &pg);
+}
+
+int
+Ndb::poll_trans(int aMillisecondNumber, int minNoOfEventsToWakeup,
+ PollGuard *pg)
+{
NdbTransaction* tConArray[1024];
Uint32 tNoCompletedTransactions;
-
- //theCurrentConnectCounter = 0;
- //theCurrentConnectIndex++;
- TransporterFacade::instance()->lock_mutex();
- sendPrepTrans(forceSend);
if ((minNoOfEventsToWakeup <= 0) ||
((Uint32)minNoOfEventsToWakeup > theNoOfSentTransactions)) {
minNoOfEventsToWakeup = theNoOfSentTransactions;
}//if
if ((theNoOfCompletedTransactions < (Uint32)minNoOfEventsToWakeup) &&
(aMillisecondNumber > 0)) {
- waitCompletedTransactions(aMillisecondNumber, minNoOfEventsToWakeup);
+ waitCompletedTransactions(aMillisecondNumber, minNoOfEventsToWakeup, pg);
tNoCompletedTransactions = pollCompleted(tConArray);
} else {
tNoCompletedTransactions = pollCompleted(tConArray);
}//if
- TransporterFacade::instance()->unlock_mutex();
+ pg->unlock_and_signal();
reportCallback(tConArray, tNoCompletedTransactions);
return tNoCompletedTransactions;
-}//Ndb::sendPollNdb()
+}
/*****************************************************************************
int pollNdb(int aMillisecondNumber, int minNoOfEventsToWakeup);
@@ -1236,67 +1284,23 @@ Remark: Check if there are any transactions already completed. Wait for not
int
Ndb::pollNdb(int aMillisecondNumber, int minNoOfEventsToWakeup)
{
- NdbTransaction* tConArray[1024];
- Uint32 tNoCompletedTransactions;
-
- //theCurrentConnectCounter = 0;
- //theCurrentConnectIndex++;
- TransporterFacade::instance()->lock_mutex();
- if ((minNoOfEventsToWakeup == 0) ||
- ((Uint32)minNoOfEventsToWakeup > theNoOfSentTransactions)) {
- minNoOfEventsToWakeup = theNoOfSentTransactions;
- }//if
- if ((theNoOfCompletedTransactions < (Uint32)minNoOfEventsToWakeup) &&
- (aMillisecondNumber > 0)) {
- waitCompletedTransactions(aMillisecondNumber, minNoOfEventsToWakeup);
- tNoCompletedTransactions = pollCompleted(tConArray);
- } else {
- tNoCompletedTransactions = pollCompleted(tConArray);
- }//if
- TransporterFacade::instance()->unlock_mutex();
- reportCallback(tConArray, tNoCompletedTransactions);
- return tNoCompletedTransactions;
-}//Ndb::sendPollNdbWithoutWait()
-
-/*****************************************************************************
-int receiveOptimisedResponse();
-
-Return: 0 - Response received
- -1 - Timeout occured waiting for response
- -2 - Node failure interupted wait for response
-
-******************************************************************************/
-int
-Ndb::receiveResponse(int waitTime){
- int tResultCode;
- TransporterFacade::instance()->checkForceSend(theNdbBlockNumber);
-
- theImpl->theWaiter.wait(waitTime);
-
- if(theImpl->theWaiter.m_state == NO_WAIT) {
- tResultCode = 0;
- } else {
-
-#ifdef VM_TRACE
- ndbout << "ERR: receiveResponse - theImpl->theWaiter.m_state = ";
- ndbout << theImpl->theWaiter.m_state << endl;
-#endif
-
- if (theImpl->theWaiter.m_state == WAIT_NODE_FAILURE){
- tResultCode = -2;
- } else {
- tResultCode = -1;
- }
- theImpl->theWaiter.m_state = NO_WAIT;
- }
- return tResultCode;
-}//Ndb::receiveResponse()
+ /*
+ The PollGuard has an implicit call of unlock_and_signal through the
+ ~PollGuard method. This method is called implicitly by the compiler
+ in all places where the object is out of context due to a return,
+ break, continue or simply end of statement block
+ */
+ PollGuard pg(TransporterFacade::instance(), &theImpl->theWaiter,
+ theNdbBlockNumber);
+ return poll_trans(aMillisecondNumber, minNoOfEventsToWakeup, &pg);
+}
int
Ndb::sendRecSignal(Uint16 node_id,
Uint32 aWaitState,
NdbApiSignal* aSignal,
- Uint32 conn_seq)
+ Uint32 conn_seq,
+ Uint32 *ret_conn_seq)
{
/*
In most situations 0 is returned.
@@ -1309,19 +1313,28 @@ Ndb::sendRecSignal(Uint16 node_id,
*/
int return_code;
+ Uint32 read_conn_seq;
TransporterFacade* tp = TransporterFacade::instance();
- Uint32 send_size = 1; // Always sends one signal only
- tp->lock_mutex();
+ Uint32 send_size = 1; // Always sends one signal only
// Protected area
+ /*
+ The PollGuard has an implicit call of unlock_and_signal through the
+ ~PollGuard method. This method is called implicitly by the compiler
+ in all places where the object is out of context due to a return,
+ break, continue or simply end of statement block
+ */
+ PollGuard poll_guard(tp,&theImpl->theWaiter,theNdbBlockNumber);
+ read_conn_seq= tp->getNodeSequence(node_id);
+ if (ret_conn_seq)
+ *ret_conn_seq= read_conn_seq;
if ((tp->get_node_alive(node_id)) &&
- ((tp->getNodeSequence(node_id) == conn_seq) ||
+ ((read_conn_seq == conn_seq) ||
(conn_seq == 0))) {
if (tp->check_send_size(node_id, send_size)) {
return_code = tp->sendSignal(aSignal, node_id);
if (return_code != -1) {
- theImpl->theWaiter.m_node = node_id;
- theImpl->theWaiter.m_state = aWaitState;
- return_code = receiveResponse();
+ return poll_guard.wait_n_unlock(WAITFOR_RESPONSE_TIMEOUT,node_id,
+ aWaitState, false);
} else {
return_code = -3;
}
@@ -1330,16 +1343,15 @@ Ndb::sendRecSignal(Uint16 node_id,
}//if
} else {
if ((tp->get_node_stopping(node_id)) &&
- ((tp->getNodeSequence(node_id) == conn_seq) ||
+ ((read_conn_seq == conn_seq) ||
(conn_seq == 0))) {
return_code = -5;
} else {
return_code = -2;
}//if
}//if
- tp->unlock_mutex();
- // End of protected area
return return_code;
+ // End of protected area
}//Ndb::sendRecSignal()
void
diff --git a/storage/ndb/src/ndbapi/Ndbinit.cpp b/storage/ndb/src/ndbapi/Ndbinit.cpp
index bbc1474f45d..6efcc55c32e 100644
--- a/storage/ndb/src/ndbapi/Ndbinit.cpp
+++ b/storage/ndb/src/ndbapi/Ndbinit.cpp
@@ -86,6 +86,9 @@ void Ndb::setup(Ndb_cluster_connection *ndb_cluster_connection,
theFirstTransId= 0;
theMyRef= 0;
+ cond_wait_index = TransporterFacade::MAX_NO_THREADS;
+ cond_signal_ndb = NULL;
+
fullyQualifiedNames = true;
#ifdef POORMANSPURIFY
@@ -217,6 +220,8 @@ NdbWaiter::NdbWaiter(){
m_node = 0;
m_state = NO_WAIT;
m_mutex = 0;
+ m_poll_owner= false;
+ m_cond_wait_index= TransporterFacade::MAX_NO_THREADS;
m_condition = NdbCondition_Create();
}
diff --git a/storage/ndb/src/ndbapi/TransporterFacade.cpp b/storage/ndb/src/ndbapi/TransporterFacade.cpp
index b143f1a9944..91091aaea06 100644
--- a/storage/ndb/src/ndbapi/TransporterFacade.cpp
+++ b/storage/ndb/src/ndbapi/TransporterFacade.cpp
@@ -450,20 +450,33 @@ runReceiveResponse_C(void * me)
return 0;
}
+/*
+ The receiver thread is changed to only wake up once every 10 milliseconds
+ to poll. It will first check that nobody owns the poll "right" before
+ polling. This means that methods using the receiveResponse and
+ sendRecSignal will have a slightly longer response time if they are
+ executed without any parallel key lookups. Currently also scans are
+ affected but this is to be fixed.
+*/
void TransporterFacade::threadMainReceive(void)
{
theTransporterRegistry->startReceiving();
+#ifdef NDB_SHM_TRANSPORTER
+ NdbThread_set_shm_sigmask(true);
+#endif
NdbMutex_Lock(theMutexPtr);
theTransporterRegistry->update_connections();
NdbMutex_Unlock(theMutexPtr);
while(!theStopReceive) {
for(int i = 0; i<10; i++){
- const int res = theTransporterRegistry->pollReceive(10);
- if(res > 0){
- NdbMutex_Lock(theMutexPtr);
- theTransporterRegistry->performReceive();
- NdbMutex_Unlock(theMutexPtr);
+ NdbSleep_MilliSleep(10);
+ NdbMutex_Lock(theMutexPtr);
+ if (poll_owner == NULL) {
+ const int res = theTransporterRegistry->pollReceive(0);
+ if(res > 0)
+ theTransporterRegistry->performReceive();
}
+ NdbMutex_Unlock(theMutexPtr);
}
NdbMutex_Lock(theMutexPtr);
theTransporterRegistry->update_connections();
@@ -471,6 +484,126 @@ void TransporterFacade::threadMainReceive(void)
}//while
theTransporterRegistry->stopReceiving();
}
+/*
+ This method is called by worker thread that owns the poll "rights".
+ It waits for events and if something arrives it takes care of it
+ and returns to caller. It will quickly come back here if not all
+ data was received for the worker thread.
+*/
+void TransporterFacade::external_poll(Uint32 wait_time)
+{
+ NdbMutex_Unlock(theMutexPtr);
+ const int res = theTransporterRegistry->pollReceive(wait_time);
+ NdbMutex_Lock(theMutexPtr);
+ if (res > 0) {
+ theTransporterRegistry->performReceive();
+ }
+}
+
+/*
+ This Ndb object didn't get hold of the poll "right" and will wait on a
+ conditional mutex wait instead. It is put into the conditional wait
+ queue so that it is accessible to take over the poll "right" if needed.
+ The method gets a free entry in the free list and puts it first in the
+ doubly linked list. Finally it assigns the ndb object reference to the
+ entry.
+*/
+Uint32 TransporterFacade::put_in_cond_wait_queue(NdbWaiter *aWaiter)
+{
+ /*
+ Get first free entry
+ */
+ Uint32 index = first_free_cond_wait;
+ assert(index < MAX_NO_THREADS);
+ first_free_cond_wait = cond_wait_array[index].next_cond_wait;
+
+ /*
+ Put in doubly linked list
+ */
+ cond_wait_array[index].next_cond_wait = MAX_NO_THREADS;
+ cond_wait_array[index].prev_cond_wait = last_in_cond_wait;
+ if (last_in_cond_wait == MAX_NO_THREADS) {
+ first_in_cond_wait = index;
+ } else
+ cond_wait_array[last_in_cond_wait].next_cond_wait = index;
+ last_in_cond_wait = index;
+
+ cond_wait_array[index].cond_wait_object = aWaiter;
+ aWaiter->set_cond_wait_index(index);
+ return index;
+}
+
+/*
+ Somebody is about to signal the thread to wake it up, it could also
+ be that it woke up on a timeout and found himself still in the list.
+ Removes the entry from the doubly linked list.
+ Inserts the entry into the free list.
+ NULLifies the ndb object reference entry and sets the index in the
+ Ndb object to NIL (=MAX_NO_THREADS)
+*/
+void TransporterFacade::remove_from_cond_wait_queue(NdbWaiter *aWaiter)
+{
+ Uint32 index = aWaiter->get_cond_wait_index();
+ assert(index < MAX_NO_THREADS &&
+ cond_wait_array[index].cond_wait_object == aWaiter);
+ /*
+ Remove from doubly linked list
+ */
+ Uint32 prev_elem, next_elem;
+ prev_elem = cond_wait_array[index].prev_cond_wait;
+ next_elem = cond_wait_array[index].next_cond_wait;
+ if (prev_elem != MAX_NO_THREADS)
+ cond_wait_array[prev_elem].next_cond_wait = next_elem;
+ else
+ first_in_cond_wait = next_elem;
+ if (next_elem != MAX_NO_THREADS)
+ cond_wait_array[next_elem].prev_cond_wait = prev_elem;
+ else
+ last_in_cond_wait = prev_elem;
+ /*
+ Insert into free list
+ */
+ cond_wait_array[index].next_cond_wait = first_free_cond_wait;
+ cond_wait_array[index].prev_cond_wait = MAX_NO_THREADS;
+ first_free_cond_wait = index;
+
+ cond_wait_array[index].cond_wait_object = NULL;
+ aWaiter->set_cond_wait_index(MAX_NO_THREADS);
+}
+
+/*
+ Get the latest Ndb object from the conditional wait queue
+ and also remove it from the list.
+*/
+NdbWaiter* TransporterFacade::rem_last_from_cond_wait_queue()
+{
+ NdbWaiter *tWaiter;
+ Uint32 index = last_in_cond_wait;
+ if (last_in_cond_wait == MAX_NO_THREADS)
+ return NULL;
+ tWaiter = cond_wait_array[index].cond_wait_object;
+ remove_from_cond_wait_queue(tWaiter);
+ return tWaiter;
+}
+
+void TransporterFacade::init_cond_wait_queue()
+{
+ Uint32 i;
+ /*
+ Initialise the doubly linked list as empty
+ */
+ first_in_cond_wait = MAX_NO_THREADS;
+ last_in_cond_wait = MAX_NO_THREADS;
+ /*
+ Initialise free list
+ */
+ first_free_cond_wait = 0;
+ for (i = 0; i < MAX_NO_THREADS; i++) {
+ cond_wait_array[i].cond_wait_object = NULL;
+ cond_wait_array[i].next_cond_wait = i+1;
+ cond_wait_array[i].prev_cond_wait = MAX_NO_THREADS;
+ }
+}
TransporterFacade::TransporterFacade() :
theTransporterRegistry(0),
@@ -480,7 +613,8 @@ TransporterFacade::TransporterFacade() :
m_fragmented_signal_id(0)
{
DBUG_ENTER("TransporterFacade::TransporterFacade");
-
+ init_cond_wait_queue();
+ poll_owner = NULL;
theOwnId = 0;
theMutexPtr = NdbMutex_Create();
@@ -1119,5 +1253,183 @@ TransporterFacade::ThreadData::close(int number){
return 0;
}
+PollGuard::PollGuard(TransporterFacade *tp, NdbWaiter *aWaiter,
+ Uint32 block_no)
+{
+ m_tp= tp;
+ m_waiter= aWaiter;
+ m_locked= true;
+ m_block_no= block_no;
+ tp->lock_mutex();
+}
+
+/*
+ This is a common routine for possibly forcing the send of buffered signals
+ and receiving response the thread is waiting for. It is designed to be
+ useful from:
+ 1) PK, UK lookups using the asynchronous interface
+ This routine uses the wait_for_input routine instead since it has
+ special end conditions due to the asynchronous nature of its usage.
+ 2) Scans
+ 3) dictSignal
+ It uses a NdbWaiter object to wait on the events and this object is
+ linked into the conditional wait queue. Thus this object contains
+ a reference to its place in the queue.
+
+ It replaces the method receiveResponse previously used on the Ndb object
+*/
+int PollGuard::wait_n_unlock(int wait_time, NodeId nodeId, Uint32 state,
+ bool forceSend)
+{
+ int ret_val;
+ m_waiter->set_node(nodeId);
+ m_waiter->set_state(state);
+ ret_val= wait_for_input_in_loop(wait_time, forceSend);
+ unlock_and_signal();
+ return ret_val;
+}
+
+int PollGuard::wait_scan(int wait_time, NodeId nodeId, bool forceSend)
+{
+ m_waiter->set_node(nodeId);
+ m_waiter->set_state(WAIT_SCAN);
+ return wait_for_input_in_loop(wait_time, forceSend);
+}
+
+int PollGuard::wait_for_input_in_loop(int wait_time, bool forceSend)
+{
+ int ret_val, response_time;
+ if (forceSend)
+ m_tp->forceSend(m_block_no);
+ else
+ m_tp->checkForceSend(m_block_no);
+ if (wait_time == -1) //Means wait forever
+ response_time= WAITFOR_RESPONSE_TIMEOUT;
+ else
+ response_time= wait_time;
+ NDB_TICKS curr_time = NdbTick_CurrentMillisecond();
+ NDB_TICKS max_time = curr_time + (NDB_TICKS)wait_time;
+ do
+ {
+ wait_for_input(response_time);
+ Uint32 state= m_waiter->get_state();
+ if (state == NO_WAIT)
+ {
+ return 0;
+ }
+ else if (state == WAIT_NODE_FAILURE)
+ {
+ ret_val= -2;
+ break;
+ }
+ if (wait_time == -1)
+ {
+#ifdef VM_TRACE
+ ndbout << "Waited WAITFOR_RESPONSE_TIMEOUT, continuing wait" << endl;
+#endif
+ continue;
+ }
+ wait_time= max_time - NdbTick_CurrentMillisecond();
+ if (wait_time <= 0)
+ {
+#ifdef VM_TRACE
+ ndbout << "Time-out state is " << m_waiter->get_state() << endl;
+#endif
+ m_waiter->set_state(WST_WAIT_TIMEOUT);
+ ret_val= -1;
+ break;
+ }
+ } while (1);
+#ifdef VM_TRACE
+ ndbout << "ERR: receiveResponse - theImpl->theWaiter.m_state = ";
+ ndbout << m_waiter->get_state() << endl;
+#endif
+ m_waiter->set_state(NO_WAIT);
+ return ret_val;
+}
+
+void PollGuard::wait_for_input(int wait_time)
+{
+ NdbWaiter *t_poll_owner= m_tp->get_poll_owner();
+ if (t_poll_owner != NULL && t_poll_owner != m_waiter)
+ {
+ /*
+ We didn't get hold of the poll "right". We will sleep on a
+ conditional mutex until the thread owning the poll "right"
+ will wake us up after all data is received. If no data arrives
+ we will wake up eventually due to the timeout.
+ After receiving all data we take the object out of the cond wait
+ queue if it hasn't happened already. It is usually already out of the
+ queue but at time-out it could be that the object is still there.
+ */
+ Uint32 cond_wait_index= m_tp->put_in_cond_wait_queue(m_waiter);
+ m_waiter->wait(wait_time);
+ if (m_waiter->get_cond_wait_index() != TransporterFacade::MAX_NO_THREADS)
+ {
+ m_tp->remove_from_cond_wait_queue(m_waiter);
+ }
+ }
+ else
+ {
+ /*
+ We got the poll "right" and we poll until data is received. After
+ receiving data we will check if all data is received, if not we
+ poll again.
+ */
+ if (t_poll_owner)
+ {
+#ifdef NDB_SHM_TRANSPORTER
+ /*
+ If shared memory transporters are used we need to set our sigmask
+ such that we wake up also on interrupts on the shared memory
+ interrupt signal.
+ */
+ NdbThread_set_shm_sigmask(false);
+#endif
+ m_tp->set_poll_owner(m_waiter);
+ m_waiter->set_poll_owner(true);
+ }
+ m_tp->external_poll((Uint32)wait_time);
+ }
+}
+
+void PollGuard::unlock_and_signal()
+{
+ NdbWaiter *t_signal_cond_waiter= 0;
+ if (!m_locked)
+ return;
+ /*
+ When completing the poll for this thread we must return the poll
+ ownership if we own it. We will give it to the last thread that
+ came here (the most recent) which is likely to be the one also
+ last to complete. We will remove that thread from the conditional
+ wait queue and set him as the new owner of the poll "right".
+ We will wait however with the signal until we have unlocked the
+ mutex for performance reasons.
+ See Stevens book on Unix NetworkProgramming: The Sockets Networking
+ API Volume 1 Third Edition on page 703-704 for a discussion on this
+ subject.
+ */
+ if (m_tp->get_poll_owner() == m_waiter)
+ {
+#ifdef NDB_SHM_TRANSPORTER
+ /*
+ If shared memory transporters are used we need to reset our sigmask
+ since we are no longer the thread to receive interrupts.
+ */
+ NdbThread_set_shm_sigmask(true);
+#endif
+ m_waiter->set_poll_owner(false);
+ t_signal_cond_waiter= m_tp->rem_last_from_cond_wait_queue();
+ m_tp->set_poll_owner(t_signal_cond_waiter);
+ if (t_signal_cond_waiter)
+ t_signal_cond_waiter->set_poll_owner(true);
+ }
+ m_tp->unlock_mutex();
+ if (t_signal_cond_waiter)
+ t_signal_cond_waiter->cond_signal();
+ m_locked=false;
+}
+
template class Vector<NodeStatusFunction>;
template class Vector<TransporterFacade::ThreadData::Object_Execute>;
diff --git a/storage/ndb/src/ndbapi/TransporterFacade.hpp b/storage/ndb/src/ndbapi/TransporterFacade.hpp
index fa070889dd9..34e1a944276 100644
--- a/storage/ndb/src/ndbapi/TransporterFacade.hpp
+++ b/storage/ndb/src/ndbapi/TransporterFacade.hpp
@@ -34,6 +34,7 @@ class ConfigRetriever;
class Ndb;
class NdbApiSignal;
+class NdbWaiter;
typedef void (* ExecuteFunction)(void *, NdbApiSignal *, LinearSectionPtr ptr[3]);
typedef void (* NodeStatusFunction)(void *, Uint32, bool nodeAlive, bool nfComplete);
@@ -47,6 +48,11 @@ extern "C" {
class TransporterFacade
{
public:
+ /**
+ * Max number of Ndb objects.
+ * (Ndb objects should not be shared by different threads.)
+ */
+ STATIC_CONST( MAX_NO_THREADS = 4711 );
TransporterFacade();
virtual ~TransporterFacade();
bool init(Uint32, const ndb_mgm_configuration *);
@@ -114,10 +120,44 @@ public:
TransporterRegistry* get_registry() { return theTransporterRegistry;};
+/*
+ When a thread has sent its signals and is ready to wait for reception
+ of these it does normally always wait on a conditional mutex and
+ the actual reception is handled by the receiver thread in the NDB API.
+ With the below new methods and variables each thread has the possibility
+ of becoming owner of the "right" to poll for signals. Effectually this
+ means that the thread acts temporarily as a receiver thread.
+ For the thread that succeeds in grabbing this "ownership" it will avoid
+ a number of expensive calls to conditional mutex and even more expensive
+ context switches to wake up.
+ When an owner of the poll "right" has completed its own task it is likely
+ that there are others still waiting. In this case we pick one of the
+ threads as new owner of the poll "right". Since we want to switch owner
+ as seldom as possible we always pick the last thread which is likely to
+ be the last to complete its reception.
+*/
+ void external_poll(Uint32 wait_time);
+ NdbWaiter* get_poll_owner(void) const { return poll_owner; }
+ void set_poll_owner(NdbWaiter* new_owner) { poll_owner= new_owner; }
+ Uint32 put_in_cond_wait_queue(NdbWaiter *aWaiter);
+ void remove_from_cond_wait_queue(NdbWaiter *aWaiter);
+ NdbWaiter* rem_last_from_cond_wait_queue();
// heart beat received from a node (e.g. a signal came)
void hb_received(NodeId n);
private:
+ void init_cond_wait_queue();
+ struct CondWaitQueueElement {
+ NdbWaiter *cond_wait_object;
+ Uint32 next_cond_wait;
+ Uint32 prev_cond_wait;
+ };
+ NdbWaiter *poll_owner;
+ CondWaitQueueElement cond_wait_array[MAX_NO_THREADS];
+ Uint32 first_in_cond_wait;
+ Uint32 first_free_cond_wait;
+ Uint32 last_in_cond_wait;
+ /* End poll owner stuff */
/**
* Send a signal unconditional of node status (used by ClusterMgr)
*/
@@ -172,12 +212,6 @@ private:
/**
* Block number handling
*/
-public:
- /**
- * Max number of Ndb objects.
- * (Ndb objects should not be shared by different threads.)
- */
- STATIC_CONST( MAX_NO_THREADS = 4711 );
private:
struct ThreadData {
@@ -245,6 +279,24 @@ public:
GlobalDictCache m_globalDictCache;
};
+class PollGuard
+{
+ public:
+ PollGuard(TransporterFacade *tp, NdbWaiter *aWaiter, Uint32 block_no);
+ ~PollGuard() { unlock_and_signal(); }
+ int wait_n_unlock(int wait_time, NodeId nodeId, Uint32 state,
+ bool forceSend= false);
+ int wait_for_input_in_loop(int wait_time, bool forceSend);
+ void wait_for_input(int wait_time);
+ int wait_scan(int wait_time, NodeId nodeId, bool forceSend);
+ void unlock_and_signal();
+ private:
+ TransporterFacade *m_tp;
+ NdbWaiter *m_waiter;
+ Uint32 m_block_no;
+ bool m_locked;
+};
+
inline
TransporterFacade*
TransporterFacade::instance()