diff options
author | unknown <brian@avenger.(none)> | 2004-11-24 12:34:44 -0800 |
---|---|---|
committer | unknown <brian@avenger.(none)> | 2004-11-24 12:34:44 -0800 |
commit | 80282a9418baf94eccaac7af6ec389a87f3f79b8 (patch) | |
tree | 184c2f723a69e1c6ecf790a3a85c9dcb045efea1 /ndb/src | |
parent | 0f61fec47e427e7780dbe67900b173a5fd99ac73 (diff) | |
parent | c3272ae7186bb59a406a671080b98766dc8e4064 (diff) | |
download | mariadb-git-80282a9418baf94eccaac7af6ec389a87f3f79b8.tar.gz |
Merging 4.1 to 5.0
Build-tools/Do-compile:
Auto merged
client/Makefile.am:
Auto merged
client/mysqladmin.cc:
Auto merged
configure.in:
Auto merged
ndb/src/common/util/version.c:
Auto merged
sql/ha_ndbcluster.cc:
Auto merged
sql/ha_ndbcluster.h:
Auto merged
ndb/src/mgmsrv/main.cpp:
Resolved NDB conflict between 4.1 and 5.0
Diffstat (limited to 'ndb/src')
28 files changed, 722 insertions, 501 deletions
diff --git a/ndb/src/common/debugger/signaldata/ScanTab.cpp b/ndb/src/common/debugger/signaldata/ScanTab.cpp index 72a4d9f94b9..0755ee0a856 100644 --- a/ndb/src/common/debugger/signaldata/ScanTab.cpp +++ b/ndb/src/common/debugger/signaldata/ScanTab.cpp @@ -30,13 +30,14 @@ printSCANTABREQ(FILE * output, const Uint32 * theData, Uint32 len, Uint16 receiv fprintf(output, " apiConnectPtr: H\'%.8x", sig->apiConnectPtr); fprintf(output, " requestInfo: H\'%.8x:\n", requestInfo); - fprintf(output, " Parallellism: %u, Batch: %u LockMode: %u, Keyinfo: %u Holdlock: %u, RangeScan: %u\n", + fprintf(output, " Parallellism: %u, Batch: %u LockMode: %u, Keyinfo: %u Holdlock: %u, RangeScan: %u ReadCommitted: %u\n", sig->getParallelism(requestInfo), sig->getScanBatch(requestInfo), sig->getLockMode(requestInfo), + sig->getKeyinfoFlag(requestInfo), sig->getHoldLockFlag(requestInfo), sig->getRangeScanFlag(requestInfo), - sig->getKeyinfoFlag(requestInfo)); + sig->getReadCommittedFlag(requestInfo)); Uint32 keyLen = (sig->attrLenKeyLen >> 16); Uint32 attrLen = (sig->attrLenKeyLen & 0xFFFF); diff --git a/ndb/src/common/mgmcommon/ConfigRetriever.cpp b/ndb/src/common/mgmcommon/ConfigRetriever.cpp index a1b979f62d8..0af5eb2f83c 100644 --- a/ndb/src/common/mgmcommon/ConfigRetriever.cpp +++ b/ndb/src/common/mgmcommon/ConfigRetriever.cpp @@ -20,7 +20,6 @@ #include <ConfigRetriever.hpp> #include <SocketServer.hpp> -#include "LocalConfig.hpp" #include <NdbSleep.h> #include <NdbOut.hpp> @@ -45,90 +44,62 @@ //**************************************************************************** //**************************************************************************** -ConfigRetriever::ConfigRetriever(LocalConfig &local_config, +ConfigRetriever::ConfigRetriever(const char * _connect_string, Uint32 version, Uint32 node_type) - : _localConfig(local_config) { - m_handle= 0; m_version = version; m_node_type = node_type; - _ownNodeId = _localConfig._ownNodeId; -} + _ownNodeId= 0; -ConfigRetriever::~ConfigRetriever(){ + m_handle= ndb_mgm_create_handle(); + if (m_handle == 0) { + setError(CR_ERROR, "Unable to allocate mgm handle"); + return; + } + + if (ndb_mgm_set_connectstring(m_handle, _connect_string)) + { + setError(CR_ERROR, ndb_mgm_get_latest_error_desc(m_handle)); + return; + } + resetError(); +} + +ConfigRetriever::~ConfigRetriever() +{ if (m_handle) { ndb_mgm_disconnect(m_handle); ndb_mgm_destroy_handle(&m_handle); } } +Uint32 +ConfigRetriever::get_configuration_nodeid() const +{ + return ndb_mgm_get_configuration_nodeid(m_handle); +} + +Uint32 ConfigRetriever::get_mgmd_port() const +{ + return ndb_mgm_get_connected_port(m_handle); +} + +const char *ConfigRetriever::get_mgmd_host() const +{ + return ndb_mgm_get_connected_host(m_handle); +} //**************************************************************************** //**************************************************************************** int -ConfigRetriever::do_connect(int exit_on_connect_failure){ - - m_mgmd_port= 0; - m_mgmd_host= 0; - - if(!m_handle) - m_handle= ndb_mgm_create_handle(); - - if (m_handle == 0) { - setError(CR_ERROR, "Unable to allocate mgm handle"); - return -1; - } - - int retry = 1; - int retry_max = 12; // Max number of retry attempts - int retry_interval= 5; // Seconds between each retry - while(retry < retry_max){ - Uint32 type = CR_ERROR; - BaseString tmp; - for (unsigned int i = 0; i<_localConfig.ids.size(); i++){ - MgmtSrvrId * m = &_localConfig.ids[i]; - DBUG_PRINT("info",("trying %s:%d", - m->name.c_str(), - m->port)); - switch(m->type){ - case MgmId_TCP: - tmp.assfmt("%s:%d", m->name.c_str(), m->port); - if (ndb_mgm_connect(m_handle, tmp.c_str()) == 0) { - m_mgmd_port= m->port; - m_mgmd_host= m->name.c_str(); - DBUG_PRINT("info",("connected to ndb_mgmd at %s:%d", - m_mgmd_host, - m_mgmd_port)); - return 0; - } - setError(CR_RETRY, ndb_mgm_get_latest_error_desc(m_handle)); - case MgmId_File: - break; - } - } - if(latestErrorType == CR_RETRY){ - DBUG_PRINT("info",("CR_RETRY")); - if (exit_on_connect_failure) - return 1; - REPORT_WARNING("Failed to retrieve cluster configuration"); - ndbout << "(Cause of failure: " << getErrorString() << ")" << endl; - ndbout << "Attempt " << retry << " of " << retry_max << ". " - << "Trying again in "<< retry_interval <<" seconds..." - << endl << endl; - NdbSleep_SecSleep(retry_interval); - } else { - break; - } - retry++; - } - - ndb_mgm_destroy_handle(&m_handle); - m_handle= 0; - m_mgmd_port= 0; - m_mgmd_host= 0; - return -1; +ConfigRetriever::do_connect(int no_retries, + int retry_delay_in_seconds, int verbose) +{ + return + (ndb_mgm_connect(m_handle,no_retries,retry_delay_in_seconds,verbose)==0) ? + 0 : -1; } //**************************************************************************** @@ -140,22 +111,9 @@ ConfigRetriever::getConfig() { struct ndb_mgm_configuration * p = 0; - if(m_handle != 0){ + if(m_handle != 0) p = getConfig(m_handle); - } else { - for (unsigned int i = 0; i<_localConfig.ids.size(); i++){ - MgmtSrvrId * m = &_localConfig.ids[i]; - switch(m->type){ - case MgmId_File: - p = getConfig(m->name.c_str()); - break; - case MgmId_TCP: - break; - } - if(p) - break; - } - } + if(p == 0) return 0; @@ -227,6 +185,16 @@ ConfigRetriever::setError(ErrorType et, const char * s){ latestErrorType = et; } +void +ConfigRetriever::resetError(){ + setError(CR_NO_ERROR,0); +} + +int +ConfigRetriever::hasError() +{ + return latestErrorType != CR_NO_ERROR; +} const char * ConfigRetriever::getErrorString(){ @@ -341,16 +309,23 @@ ConfigRetriever::verifyConfig(const struct ndb_mgm_configuration * conf, Uint32 } Uint32 -ConfigRetriever::allocNodeId(){ - unsigned nodeid= _ownNodeId; - - if(m_handle != 0){ - int res= ndb_mgm_alloc_nodeid(m_handle, m_version, &nodeid, m_node_type); - if(res != 0) { - setError(CR_ERROR, ndb_mgm_get_latest_error_desc(m_handle)); - return 0; +ConfigRetriever::allocNodeId(int no_retries, int retry_delay_in_seconds) +{ + _ownNodeId= 0; + if(m_handle != 0) + { + while (1) + { + int res= ndb_mgm_alloc_nodeid(m_handle, m_version, m_node_type); + if(res >= 0) + return _ownNodeId= (Uint32)res; + if (no_retries == 0) + break; + no_retries--; + NdbSleep_SecSleep(retry_delay_in_seconds); } - } - - return _ownNodeId= nodeid; + setError(CR_ERROR, ndb_mgm_get_latest_error_desc(m_handle)); + } else + setError(CR_ERROR, "management server handle not initialized"); + return 0; } diff --git a/ndb/src/common/util/version.c b/ndb/src/common/util/version.c index 965d0a735e1..7a537297861 100644 --- a/ndb/src/common/util/version.c +++ b/ndb/src/common/util/version.c @@ -70,7 +70,6 @@ struct NdbUpGradeCompatible { #ifndef TEST_VERSION struct NdbUpGradeCompatible ndbCompatibleTable_full[] = { { MAKE_VERSION(3,5,2), MAKE_VERSION(3,5,1), UG_Exact }, - { MAKE_VERSION(4,1,8), MAKE_VERSION(3,5,4), UG_Exact }, /* Aligned version with MySQL */ { 0, 0, UG_Null } }; diff --git a/ndb/src/kernel/blocks/dbdih/Dbdih.hpp b/ndb/src/kernel/blocks/dbdih/Dbdih.hpp index 14fa262f871..0a2d50cb876 100644 --- a/ndb/src/kernel/blocks/dbdih/Dbdih.hpp +++ b/ndb/src/kernel/blocks/dbdih/Dbdih.hpp @@ -147,7 +147,6 @@ public: Uint32 nfConnect; Uint32 table; Uint32 userpointer; - Uint32 nodeCount; BlockReference userblockref; }; typedef Ptr<ConnectRecord> ConnectRecordPtr; diff --git a/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp b/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp index 76aa745c3e0..4592b121c7e 100644 --- a/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp +++ b/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp @@ -7080,24 +7080,22 @@ void Dbdih::execDIGETPRIMREQ(Signal* signal) ndbrequire(tabPtr.p->tabStatus == TabRecord::TS_ACTIVE); connectPtr.i = signal->theData[0]; - if(connectPtr.i != RNIL){ + if(connectPtr.i != RNIL) + { jam(); ptrCheckGuard(connectPtr, cconnectFileSize, connectRecord); - ndbrequire(connectPtr.p->connectState == ConnectRecord::INUSE); - getFragstore(tabPtr.p, fragId, fragPtr); - connectPtr.p->nodeCount = extractNodeInfo(fragPtr.p, connectPtr.p->nodes); signal->theData[0] = connectPtr.p->userpointer; - signal->theData[1] = passThrough; - signal->theData[2] = connectPtr.p->nodes[0]; - sendSignal(connectPtr.p->userblockref, GSN_DIGETPRIMCONF, signal, 3, JBB); - return; - }//if - //connectPtr.i == RNIL -> question without connect record + } + else + { + jam(); + signal->theData[0] = RNIL; + } + Uint32 nodes[MAX_REPLICAS]; getFragstore(tabPtr.p, fragId, fragPtr); Uint32 count = extractNodeInfo(fragPtr.p, nodes); - signal->theData[0] = RNIL; signal->theData[1] = passThrough; signal->theData[2] = nodes[0]; signal->theData[3] = nodes[1]; diff --git a/ndb/src/kernel/blocks/dblqh/Dblqh.hpp b/ndb/src/kernel/blocks/dblqh/Dblqh.hpp index 739c3c741fb..0c63cb5fe17 100644 --- a/ndb/src/kernel/blocks/dblqh/Dblqh.hpp +++ b/ndb/src/kernel/blocks/dblqh/Dblqh.hpp @@ -550,6 +550,11 @@ public: UintR scanErrorCounter; UintR scanLocalFragid; UintR scanSchemaVersion; + + /** + * This is _always_ main table, even in range scan + * in which case scanTcrec->fragmentptr is different + */ Uint32 fragPtrI; UintR scanStoredProcId; ScanState scanState; @@ -2925,4 +2930,23 @@ Dblqh::ScanRecord::check_scan_batch_completed() const (max_bytes > 0 && (m_curr_batch_size_bytes >= max_bytes)); } +inline +void +Dblqh::i_get_acc_ptr(ScanRecord* scanP, Uint32* &acc_ptr, Uint32 index) +{ + if (index == 0) { + acc_ptr= (Uint32*)&scanP->scan_acc_op_ptr[0]; + } else { + Uint32 attr_buf_index, attr_buf_rec; + + AttrbufPtr regAttrPtr; + jam(); + attr_buf_rec= (index + 31) / 32; + attr_buf_index= (index - 1) & 31; + regAttrPtr.i= scanP->scan_acc_op_ptr[attr_buf_rec]; + ptrCheckGuard(regAttrPtr, cattrinbufFileSize, attrbuf); + acc_ptr= (Uint32*)®AttrPtr.p->attrbuf[attr_buf_index]; + } +} + #endif diff --git a/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp b/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp index 5622706a96c..88e8f25b004 100644 --- a/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp +++ b/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp @@ -3084,6 +3084,7 @@ void Dblqh::execATTRINFO(Signal* signal) return; break; default: + ndbout_c("%d", regTcPtr->transactionState); ndbrequire(false); break; }//switch @@ -7161,10 +7162,7 @@ void Dblqh::continueScanNextReqLab(Signal* signal) // Update timer on tcConnectRecord tcConnectptr.p->tcTimer = cLqhTimeOutCount; - init_acc_ptr_list(scanptr.p); - scanptr.p->m_curr_batch_size_rows = 0; - scanptr.p->m_curr_batch_size_bytes= 0; scanptr.p->scanFlag = NextScanReq::ZSCAN_NEXT; scanNextLoopLab(signal); }//Dblqh::continueScanNextReqLab() @@ -7363,22 +7361,32 @@ void Dblqh::scanLockReleasedLab(Signal* signal) tcConnectptr.i = scanptr.p->scanTcrec; ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec); releaseActiveFrag(signal); + if (scanptr.p->scanReleaseCounter == scanptr.p->m_curr_batch_size_rows) { if ((scanptr.p->scanErrorCounter > 0) || (scanptr.p->scanCompletedStatus == ZTRUE)) { jam(); + scanptr.p->m_curr_batch_size_rows = 0; + scanptr.p->m_curr_batch_size_bytes = 0; closeScanLab(signal); } else if (scanptr.p->check_scan_batch_completed() && scanptr.p->scanLockHold != ZTRUE) { jam(); scanptr.p->scanState = ScanRecord::WAIT_SCAN_NEXTREQ; sendScanFragConf(signal, ZFALSE); + } else if (scanptr.p->m_last_row && !scanptr.p->scanLockHold) { + jam(); + closeScanLab(signal); + return; } else { jam(); /* - We came here after releasing locks after receiving SCAN_NEXTREQ from TC. We only - come here when scanHoldLock == ZTRUE - */ + * We came here after releasing locks after + * receiving SCAN_NEXTREQ from TC. We only come here + * when scanHoldLock == ZTRUE + */ + scanptr.p->m_curr_batch_size_rows = 0; + scanptr.p->m_curr_batch_size_bytes = 0; continueScanNextReqLab(signal); }//if } else if (scanptr.p->scanReleaseCounter < scanptr.p->m_curr_batch_size_rows) { @@ -7465,25 +7473,6 @@ Dblqh::init_acc_ptr_list(ScanRecord* scanP) scanP->scan_acc_index = 0; } -inline -void -Dblqh::i_get_acc_ptr(ScanRecord* scanP, Uint32* &acc_ptr, Uint32 index) -{ - if (index == 0) { - acc_ptr= (Uint32*)&scanP->scan_acc_op_ptr[0]; - } else { - Uint32 attr_buf_index, attr_buf_rec; - - AttrbufPtr regAttrPtr; - jam(); - attr_buf_rec= (index + 31) / 32; - attr_buf_index= (index - 1) & 31; - regAttrPtr.i= scanP->scan_acc_op_ptr[attr_buf_rec]; - ptrCheckGuard(regAttrPtr, cattrinbufFileSize, attrbuf); - acc_ptr= (Uint32*)®AttrPtr.p->attrbuf[attr_buf_index]; - } -} - Uint32 Dblqh::get_acc_ptr_from_scan_record(ScanRecord* scanP, Uint32 index, @@ -7714,6 +7703,9 @@ void Dblqh::abort_scan(Signal* signal, Uint32 scan_ptr_i, Uint32 errcode){ jam(); scanptr.i = scan_ptr_i; c_scanRecordPool.getPtr(scanptr); + + fragptr.i = tcConnectptr.p->fragmentptr; + ptrCheckGuard(fragptr, cfragrecFileSize, fragrecord); finishScanrec(signal); releaseScanrec(signal); tcConnectptr.p->transactionState = TcConnectionrec::IDLE; @@ -8007,6 +7999,13 @@ void Dblqh::nextScanConfScanLab(Signal* signal) /************************************************************* * STOP THE SCAN PROCESS IF THIS HAS BEEN REQUESTED. ************************************************************ */ + if (!scanptr.p->scanLockHold) + { + jam(); + closeScanLab(signal); + return; + } + if (scanptr.p->scanCompletedStatus == ZTRUE) { if ((scanptr.p->scanLockHold == ZTRUE) && (scanptr.p->m_curr_batch_size_rows > 0)) { @@ -8507,8 +8506,6 @@ void Dblqh::tupScanCloseConfLab(Signal* signal) ScanFragRef::SignalLength, JBB); } else { jam(); - scanptr.p->m_curr_batch_size_rows = 0; - scanptr.p->m_curr_batch_size_bytes= 0; sendScanFragConf(signal, ZSCAN_FRAG_CLOSED); }//if finishScanrec(signal); @@ -8576,10 +8573,12 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq) /** * Used for scan take over */ - FragrecordPtr tFragPtr; - tFragPtr.i = fragptr.p->tableFragptr; - ptrCheckGuard(tFragPtr, cfragrecFileSize, fragrecord); - scanptr.p->fragPtrI = fragptr.p->tableFragptr; + { + FragrecordPtr tFragPtr; + tFragPtr.i = fragptr.p->tableFragptr; + ptrCheckGuard(tFragPtr, cfragrecFileSize, fragrecord); + scanptr.p->fragPtrI = fragptr.p->tableFragptr; + } /** * !idx uses 1 - (MAX_PARALLEL_SCANS_PER_FRAG - 1) = 1-11 @@ -8588,8 +8587,8 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq) Uint32 start = (idx ? MAX_PARALLEL_SCANS_PER_FRAG : 1 ); Uint32 stop = (idx ? MAX_PARALLEL_INDEX_SCANS_PER_FRAG : MAX_PARALLEL_SCANS_PER_FRAG - 1); stop += start; - Uint32 free = tFragPtr.p->m_scanNumberMask.find(start); - + Uint32 free = fragptr.p->m_scanNumberMask.find(start); + if(free == Fragrecord::ScanNumberMask::NotFound || free >= stop){ jam(); @@ -8603,16 +8602,16 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq) */ scanptr.p->scanState = ScanRecord::IN_QUEUE; LocalDLFifoList<ScanRecord> queue(c_scanRecordPool, - tFragPtr.p->m_queuedScans); + fragptr.p->m_queuedScans); queue.add(scanptr); return ZOK; } - + scanptr.p->scanNumber = free; - tFragPtr.p->m_scanNumberMask.clear(free);// Update mask + fragptr.p->m_scanNumberMask.clear(free);// Update mask - LocalDLList<ScanRecord> active(c_scanRecordPool, tFragPtr.p->m_activeScans); + LocalDLList<ScanRecord> active(c_scanRecordPool, fragptr.p->m_activeScans); active.add(scanptr); if(scanptr.p->scanKeyinfoFlag){ jam(); @@ -8672,12 +8671,8 @@ void Dblqh::finishScanrec(Signal* signal) { release_acc_ptr_list(scanptr.p); - FragrecordPtr tFragPtr; - tFragPtr.i = scanptr.p->fragPtrI; - ptrCheckGuard(tFragPtr, cfragrecFileSize, fragrecord); - LocalDLFifoList<ScanRecord> queue(c_scanRecordPool, - tFragPtr.p->m_queuedScans); + fragptr.p->m_queuedScans); if(scanptr.p->scanState == ScanRecord::IN_QUEUE){ jam(); @@ -8695,11 +8690,11 @@ void Dblqh::finishScanrec(Signal* signal) ndbrequire(tmp.p == scanptr.p); } - LocalDLList<ScanRecord> scans(c_scanRecordPool, tFragPtr.p->m_activeScans); + LocalDLList<ScanRecord> scans(c_scanRecordPool, fragptr.p->m_activeScans); scans.release(scanptr); const Uint32 scanNumber = scanptr.p->scanNumber; - ndbrequire(!tFragPtr.p->m_scanNumberMask.get(scanNumber)); + ndbrequire(!fragptr.p->m_scanNumberMask.get(scanNumber)); ScanRecordPtr restart; /** @@ -8707,13 +8702,13 @@ void Dblqh::finishScanrec(Signal* signal) */ if(scanNumber == NR_ScanNo || !queue.first(restart)){ jam(); - tFragPtr.p->m_scanNumberMask.set(scanNumber); + fragptr.p->m_scanNumberMask.set(scanNumber); return; } if(ERROR_INSERTED(5034)){ jam(); - tFragPtr.p->m_scanNumberMask.set(scanNumber); + fragptr.p->m_scanNumberMask.set(scanNumber); return; } @@ -8724,7 +8719,7 @@ void Dblqh::finishScanrec(Signal* signal) ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec); restart.p->scanNumber = scanNumber; restart.p->scanState = ScanRecord::WAIT_ACC_SCAN; - + queue.remove(restart); scans.add(restart); if(restart.p->scanKeyinfoFlag){ @@ -8912,6 +8907,13 @@ void Dblqh::sendScanFragConf(Signal* signal, Uint32 scanCompleted) conf->total_len= total_len; sendSignal(tcConnectptr.p->clientBlockref, GSN_SCAN_FRAGCONF, signal, ScanFragConf::SignalLength, JBB); + + if(!scanptr.p->scanLockHold) + { + jam(); + scanptr.p->m_curr_batch_size_rows = 0; + scanptr.p->m_curr_batch_size_bytes= 0; + } }//Dblqh::sendScanFragConf() /* ######################################################################### */ diff --git a/ndb/src/kernel/blocks/dbtc/Dbtc.hpp b/ndb/src/kernel/blocks/dbtc/Dbtc.hpp index a209df24c44..fb90ccc8c90 100644 --- a/ndb/src/kernel/blocks/dbtc/Dbtc.hpp +++ b/ndb/src/kernel/blocks/dbtc/Dbtc.hpp @@ -1054,9 +1054,8 @@ public: // Id of the ScanRecord this fragment scan belongs to Uint32 scanRec; - // The maximum number of operations that can be scanned before - // returning to TC - Uint16 scanFragConcurrency; + // The value of fragmentCompleted in the last received SCAN_FRAGCONF + Uint8 m_scan_frag_conf_status; inline void startFragTimer(Uint32 timeVal){ scanFragTimer = timeVal; @@ -1193,8 +1192,10 @@ public: // Number of operation records per scanned fragment // Number of operations in first batch // Max number of bytes per batch - Uint16 noOprecPerFrag; - Uint16 first_batch_size; + union { + Uint16 first_batch_size_rows; + Uint16 batch_size_rows; + }; Uint32 batch_byte_size; Uint32 scanRequestInfo; // ScanFrag format diff --git a/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp b/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp index d8b3ee10532..07dbb370ec6 100644 --- a/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp +++ b/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp @@ -8646,9 +8646,9 @@ void Dbtc::initScanrec(ScanRecordPtr scanptr, scanptr.p->scanTableref = tabptr.i; scanptr.p->scanSchemaVersion = scanTabReq->tableSchemaVersion; scanptr.p->scanParallel = scanParallel; - scanptr.p->noOprecPerFrag = noOprecPerFrag; - scanptr.p->first_batch_size= scanTabReq->first_batch_size; - scanptr.p->batch_byte_size= scanTabReq->batch_byte_size; + scanptr.p->first_batch_size_rows = scanTabReq->first_batch_size; + scanptr.p->batch_byte_size = scanTabReq->batch_byte_size; + scanptr.p->batch_size_rows = noOprecPerFrag; Uint32 tmp = 0; const UintR ri = scanTabReq->requestInfo; @@ -8672,7 +8672,6 @@ void Dbtc::initScanrec(ScanRecordPtr scanptr, ndbrequire(list.seize(ptr)); ptr.p->scanRec = scanptr.i; ptr.p->scanFragId = 0; - ptr.p->scanFragConcurrency = noOprecPerFrag; ptr.p->m_apiPtr = cdata[i]; }//for @@ -8945,6 +8944,25 @@ void Dbtc::execDIGETPRIMCONF(Signal* signal) scanptr.i = scanFragptr.p->scanRec; ptrCheckGuard(scanptr, cscanrecFileSize, scanRecord); + /** + * This must be false as select count(*) otherwise + * can "pass" committing on backup fragments and + * get incorrect row count + */ + if(false && ScanFragReq::getReadCommittedFlag(scanptr.p->scanRequestInfo)) + { + jam(); + Uint32 max = 3+signal->theData[6]; + Uint32 nodeid = getOwnNodeId(); + for(Uint32 i = 3; i<max; i++) + if(signal->theData[i] == nodeid) + { + jam(); + tnodeid = nodeid; + break; + } + } + { /** * Check table @@ -9141,6 +9159,7 @@ void Dbtc::execSCAN_FRAGCONF(Signal* signal) const ScanFragConf * const conf = (ScanFragConf*)&signal->theData[0]; const Uint32 noCompletedOps = conf->completedOps; + const Uint32 status = conf->fragmentCompleted; scanFragptr.i = conf->senderData; c_scan_frag_pool.getPtr(scanFragptr); @@ -9163,11 +9182,9 @@ void Dbtc::execSCAN_FRAGCONF(Signal* signal) ndbrequire(scanFragptr.p->scanFragState == ScanFragRec::LQH_ACTIVE); - const Uint32 status = conf->fragmentCompleted; - if(scanptr.p->scanState == ScanRecord::CLOSING_SCAN){ jam(); - if(status == ZFALSE){ + if(status == 0){ /** * We have started closing = we sent a close -> ignore this */ @@ -9184,11 +9201,11 @@ void Dbtc::execSCAN_FRAGCONF(Signal* signal) return; } - if(status == ZCLOSED && scanptr.p->scanNextFragId < scanptr.p->scanNoFrag){ + if(noCompletedOps == 0 && status != 0 && + scanptr.p->scanNextFragId < scanptr.p->scanNoFrag){ /** * Start on next fragment */ - ndbrequire(noCompletedOps == 0); scanFragptr.p->scanFragState = ScanFragRec::WAIT_GET_PRIMCONF; scanFragptr.p->startFragTimer(ctcTimer); @@ -9218,6 +9235,7 @@ void Dbtc::execSCAN_FRAGCONF(Signal* signal) scanptr.p->m_queued_count++; } + scanFragptr.p->m_scan_frag_conf_status = status; scanFragptr.p->m_ops = noCompletedOps; scanFragptr.p->m_totalLen = total_len; scanFragptr.p->scanFragState = ScanFragRec::QUEUED_FOR_DELIVERY; @@ -9311,7 +9329,6 @@ void Dbtc::execSCAN_NEXTREQ(Signal* signal) /********************************************************************* * APPLICATION IS CLOSING THE SCAN. **********************************************************************/ - ndbrequire(len == 0); close_scan_req(signal, scanptr, true); return; }//if @@ -9330,11 +9347,12 @@ void Dbtc::execSCAN_NEXTREQ(Signal* signal) // Copy op ptrs so I dont overwrite them when sending... memcpy(signal->getDataPtrSend()+25, signal->getDataPtr()+4, 4 * len); - ScanFragNextReq * nextReq = (ScanFragNextReq*)&signal->theData[0]; - nextReq->closeFlag = ZFALSE; - nextReq->transId1 = apiConnectptr.p->transid[0]; - nextReq->transId2 = apiConnectptr.p->transid[1]; - nextReq->batch_size_bytes= scanP->batch_byte_size; + ScanFragNextReq tmp; + tmp.closeFlag = ZFALSE; + tmp.transId1 = apiConnectptr.p->transid[0]; + tmp.transId2 = apiConnectptr.p->transid[1]; + tmp.batch_size_rows = scanP->batch_size_rows; + tmp.batch_size_bytes = scanP->batch_byte_size; ScanFragList running(c_scan_frag_pool, scanP->m_running_scan_frags); ScanFragList delivered(c_scan_frag_pool, scanP->m_delivered_scan_frags); @@ -9344,15 +9362,37 @@ void Dbtc::execSCAN_NEXTREQ(Signal* signal) c_scan_frag_pool.getPtr(scanFragptr); ndbrequire(scanFragptr.p->scanFragState == ScanFragRec::DELIVERED); - scanFragptr.p->scanFragState = ScanFragRec::LQH_ACTIVE; scanFragptr.p->startFragTimer(ctcTimer); - scanFragptr.p->m_ops = 0; - nextReq->senderData = scanFragptr.i; - nextReq->batch_size_rows= scanFragptr.p->scanFragConcurrency; - sendSignal(scanFragptr.p->lqhBlockref, GSN_SCAN_NEXTREQ, signal, - ScanFragNextReq::SignalLength, JBB); + if(scanFragptr.p->m_scan_frag_conf_status) + { + /** + * last scan was complete + */ + jam(); + ndbrequire(scanptr.p->scanNextFragId < scanptr.p->scanNoFrag); + scanFragptr.p->scanFragState = ScanFragRec::WAIT_GET_PRIMCONF; + + tcConnectptr.i = scanptr.p->scanTcrec; + ptrCheckGuard(tcConnectptr, ctcConnectFilesize, tcConnectRecord); + scanFragptr.p->scanFragId = scanptr.p->scanNextFragId++; + signal->theData[0] = tcConnectptr.p->dihConnectptr; + signal->theData[1] = scanFragptr.i; + signal->theData[2] = scanptr.p->scanTableref; + signal->theData[3] = scanFragptr.p->scanFragId; + sendSignal(cdihblockref, GSN_DIGETPRIMREQ, signal, 4, JBB); + } + else + { + jam(); + scanFragptr.p->scanFragState = ScanFragRec::LQH_ACTIVE; + ScanFragNextReq * req = (ScanFragNextReq*)signal->getDataPtrSend(); + * req = tmp; + req->senderData = scanFragptr.i; + sendSignal(scanFragptr.p->lqhBlockref, GSN_SCAN_NEXTREQ, signal, + ScanFragNextReq::SignalLength, JBB); + } delivered.remove(scanFragptr); running.add(scanFragptr); }//for @@ -9416,7 +9456,7 @@ Dbtc::close_scan_req(Signal* signal, ScanRecordPtr scanPtr, bool req_received){ ndbrequire(curr.p->scanFragState == ScanFragRec::DELIVERED); delivered.remove(curr); - if(curr.p->m_ops > 0){ + if(curr.p->m_ops > 0 && curr.p->m_scan_frag_conf_status == 0){ jam(); running.add(curr); curr.p->scanFragState = ScanFragRec::LQH_ACTIVE; @@ -9551,7 +9591,7 @@ void Dbtc::sendScanFragReq(Signal* signal, req->transId1 = apiConnectptr.p->transid[0]; req->transId2 = apiConnectptr.p->transid[1]; req->clientOpPtr = scanFragP->m_apiPtr; - req->batch_size_rows= scanFragP->scanFragConcurrency; + req->batch_size_rows= scanP->batch_size_rows; req->batch_size_bytes= scanP->batch_byte_size; sendSignal(scanFragP->lqhBlockref, GSN_SCAN_FRAGREQ, signal, ScanFragReq::SignalLength, JBB); @@ -9573,6 +9613,8 @@ void Dbtc::sendScanTabConf(Signal* signal, ScanRecordPtr scanPtr) { jam(); ops += 21; } + + Uint32 left = scanPtr.p->scanNoFrag - scanPtr.p->scanNextFragId; ScanTabConf * conf = (ScanTabConf*)&signal->theData[0]; conf->apiConnectPtr = apiConnectptr.p->ndbapiConnect; @@ -9588,24 +9630,25 @@ void Dbtc::sendScanTabConf(Signal* signal, ScanRecordPtr scanPtr) { ScanFragRecPtr curr = ptr; // Remove while iterating... queued.next(ptr); + bool done = curr.p->m_scan_frag_conf_status && --left; + * ops++ = curr.p->m_apiPtr; - * ops++ = curr.i; + * ops++ = done ? RNIL : curr.i; * ops++ = (curr.p->m_totalLen << 10) + curr.p->m_ops; queued.remove(curr); - if(curr.p->m_ops > 0){ + if(!done){ delivered.add(curr); curr.p->scanFragState = ScanFragRec::DELIVERED; curr.p->stopFragTimer(); } else { - (* --ops) = ScanTabConf::EndOfData; ops++; c_scan_frag_pool.release(curr); curr.p->scanFragState = ScanFragRec::COMPLETED; curr.p->stopFragTimer(); } } } - + if(scanPtr.p->m_delivered_scan_frags.isEmpty() && scanPtr.p->m_running_scan_frags.isEmpty()){ conf->requestInfo = op_count | ScanTabConf::EndOfData; @@ -10424,9 +10467,8 @@ Dbtc::execDUMP_STATE_ORD(Signal* signal) sfp.i, sfp.p->scanFragState, sfp.p->scanFragId); - infoEvent(" nodeid=%d, concurr=%d, timer=%d", + infoEvent(" nodeid=%d, timer=%d", refToNode(sfp.p->lqhBlockref), - sfp.p->scanFragConcurrency, sfp.p->scanFragTimer); } @@ -10504,7 +10546,7 @@ Dbtc::execDUMP_STATE_ORD(Signal* signal) sp.p->scanAiLength, sp.p->scanParallel, sp.p->scanReceivedOperations, - sp.p->noOprecPerFrag); + sp.p->batch_size_rows); infoEvent(" schv=%d, tab=%d, sproc=%d", sp.p->scanSchemaVersion, sp.p->scanTableref, diff --git a/ndb/src/kernel/blocks/suma/Suma.cpp b/ndb/src/kernel/blocks/suma/Suma.cpp index d11d5f7176a..f6d9a0ac35a 100644 --- a/ndb/src/kernel/blocks/suma/Suma.cpp +++ b/ndb/src/kernel/blocks/suma/Suma.cpp @@ -1888,7 +1888,7 @@ SumaParticipant::SyncRecord::nextScan(Signal* signal){ req->requestInfo = 0; req->savePointId = 0; ScanFragReq::setLockMode(req->requestInfo, 0); - ScanFragReq::setHoldLockFlag(req->requestInfo, 0); + ScanFragReq::setHoldLockFlag(req->requestInfo, 1); ScanFragReq::setKeyinfoFlag(req->requestInfo, 0); ScanFragReq::setAttrLen(req->requestInfo, attrLen); req->fragmentNoKeyLen = fd.m_fragDesc.m_fragmentNo; diff --git a/ndb/src/kernel/main.cpp b/ndb/src/kernel/main.cpp index 926647838c9..f34e16318cd 100644 --- a/ndb/src/kernel/main.cpp +++ b/ndb/src/kernel/main.cpp @@ -19,7 +19,6 @@ #include <ndb_version.h> #include "Configuration.hpp" -#include <LocalConfig.hpp> #include <TransporterRegistry.hpp> #include "vm/SimBlockList.hpp" @@ -69,16 +68,9 @@ int main(int argc, char** argv) return NRT_Default; } - LocalConfig local_config; - if (!local_config.init(theConfig->getConnectString(),0)){ - local_config.printError(); - local_config.printUsage(); - return NRT_Default; - } - { // Do configuration signal(SIGPIPE, SIG_IGN); - theConfig->fetch_configuration(local_config); + theConfig->fetch_configuration(); } chdir(NdbConfig_get_path(0)); @@ -141,7 +133,7 @@ int main(int argc, char** argv) exit(0); } g_eventLogger.info("Ndb has terminated (pid %d) restarting", child); - theConfig->fetch_configuration(local_config); + theConfig->fetch_configuration(); } g_eventLogger.info("Angel pid: %d ndb pid: %d", getppid(), getpid()); diff --git a/ndb/src/kernel/vm/Configuration.cpp b/ndb/src/kernel/vm/Configuration.cpp index aac035fe1b7..931b4da5a17 100644 --- a/ndb/src/kernel/vm/Configuration.cpp +++ b/ndb/src/kernel/vm/Configuration.cpp @@ -17,7 +17,6 @@ #include <ndb_global.h> #include <ndb_opts.h> -#include <LocalConfig.hpp> #include "Configuration.hpp" #include <ErrorHandlingMacros.hpp> #include "GlobalData.hpp" @@ -35,6 +34,7 @@ #include <kernel_types.h> #include <ndb_limits.h> +#include <ndbapi_limits.h> #include "pc.hpp" #include <LogLevel.hpp> #include <NdbSleep.h> @@ -108,7 +108,7 @@ get_one_option(int optid, const struct my_option *opt __attribute__((unused)), bool Configuration::init(int argc, char** argv) { - const char *load_default_groups[]= { "ndbd",0 }; + const char *load_default_groups[]= { "mysql_cluster","ndbd",0 }; load_defaults("my",load_default_groups,&argc,&argv); int ho_error; @@ -189,7 +189,7 @@ Configuration::closeConfiguration(){ } void -Configuration::fetch_configuration(LocalConfig &local_config){ +Configuration::fetch_configuration(){ /** * Fetch configuration from management server */ @@ -199,8 +199,17 @@ Configuration::fetch_configuration(LocalConfig &local_config){ m_mgmd_port= 0; m_mgmd_host= 0; - m_config_retriever= new ConfigRetriever(local_config, NDB_VERSION, NODE_TYPE_DB); - if(m_config_retriever->do_connect() == -1){ + m_config_retriever= new ConfigRetriever(getConnectString(), + NDB_VERSION, NODE_TYPE_DB); + + if (m_config_retriever->hasError()) + { + ERROR_SET(fatal, ERR_INVALID_CONFIG, + "Could not connect initialize handle to management server", + m_config_retriever->getErrorString()); + } + + if(m_config_retriever->do_connect(12,5,1) == -1){ const char * s = m_config_retriever->getErrorString(); if(s == 0) s = "No error given!"; @@ -215,13 +224,7 @@ Configuration::fetch_configuration(LocalConfig &local_config){ ConfigRetriever &cr= *m_config_retriever; - if((globalData.ownId = cr.allocNodeId()) == 0){ - for(Uint32 i = 0; i<3; i++){ - NdbSleep_SecSleep(3); - if((globalData.ownId = cr.allocNodeId()) != 0) - break; - } - } + globalData.ownId = cr.allocNodeId(2 /*retry*/,3 /*delay*/); if(globalData.ownId == 0){ ERROR_SET(fatal, ERR_INVALID_CONFIG, @@ -452,6 +455,7 @@ Configuration::calcSizeAlt(ConfigValues * ownConfig){ unsigned int noOfTables = 0; unsigned int noOfUniqueHashIndexes = 0; unsigned int noOfOrderedIndexes = 0; + unsigned int noOfTriggers = 0; unsigned int noOfReplicas = 0; unsigned int noOfDBNodes = 0; unsigned int noOfAPINodes = 0; @@ -476,6 +480,7 @@ Configuration::calcSizeAlt(ConfigValues * ownConfig){ { CFG_DB_NO_TABLES, &noOfTables, false }, { CFG_DB_NO_ORDERED_INDEXES, &noOfOrderedIndexes, false }, { CFG_DB_NO_UNIQUE_HASH_INDEXES, &noOfUniqueHashIndexes, false }, + { CFG_DB_NO_TRIGGERS, &noOfTriggers, true }, { CFG_DB_NO_REPLICAS, &noOfReplicas, false }, { CFG_DB_NO_ATTRIBUTES, &noOfAttributes, false }, { CFG_DB_NO_OPS, &noOfOperations, false }, @@ -584,6 +589,18 @@ Configuration::calcSizeAlt(ConfigValues * ownConfig){ ConfigValues::Iterator it2(*ownConfig, db.m_config); it2.set(CFG_DB_NO_TABLES, noOfTables); it2.set(CFG_DB_NO_ATTRIBUTES, noOfAttributes); + { + Uint32 neededNoOfTriggers = /* types: Insert/Update/Delete/Custom */ + 3 * noOfUniqueHashIndexes + /* for unique hash indexes, I/U/D */ + 3 * NDB_MAX_ACTIVE_EVENTS + /* for events in suma, I/U/D */ + 3 * noOfTables + /* for backup, I/U/D */ + noOfOrderedIndexes; /* for ordered indexes, C */ + if (noOfTriggers < neededNoOfTriggers) + { + noOfTriggers= neededNoOfTriggers; + it2.set(CFG_DB_NO_TRIGGERS, noOfTriggers); + } + } /** * Do size calculations diff --git a/ndb/src/kernel/vm/Configuration.hpp b/ndb/src/kernel/vm/Configuration.hpp index e4cd64f5ca8..acf0e163a84 100644 --- a/ndb/src/kernel/vm/Configuration.hpp +++ b/ndb/src/kernel/vm/Configuration.hpp @@ -21,7 +21,6 @@ #include <ndb_types.h> class ConfigRetriever; -class LocalConfig; class Configuration { public: @@ -33,7 +32,7 @@ public: */ bool init(int argc, char** argv); - void fetch_configuration(LocalConfig &local_config); + void fetch_configuration(); void setupConfiguration(); void closeConfiguration(); diff --git a/ndb/src/mgmapi/LocalConfig.cpp b/ndb/src/mgmapi/LocalConfig.cpp index d0ff97cdedf..8f1e2ee8100 100644 --- a/ndb/src/mgmapi/LocalConfig.cpp +++ b/ndb/src/mgmapi/LocalConfig.cpp @@ -14,7 +14,7 @@ along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ -#include <LocalConfig.hpp> +#include "LocalConfig.hpp" #include <NdbEnv.h> #include <NdbConfig.h> #include <NdbAutoPtr.hpp> @@ -294,4 +294,19 @@ LocalConfig::readConnectString(const char * connectString, return return_value; } +char * +LocalConfig::makeConnectString(char *buf, int sz) +{ + int p= BaseString::snprintf(buf,sz,"nodeid=%d", _ownNodeId); + for (int i = 0; (i < ids.size()) && (sz-p > 0); i++) + { + if (ids[i].type != MgmId_TCP) + continue; + p+=BaseString::snprintf(buf+p,sz-p,",%s:%d", + ids[i].name.c_str(), ids[i].port); + } + buf[sz-1]=0; + return buf; +} + template class Vector<MgmtSrvrId>; diff --git a/ndb/src/mgmapi/LocalConfig.hpp b/ndb/src/mgmapi/LocalConfig.hpp new file mode 100644 index 00000000000..c415ec1be91 --- /dev/null +++ b/ndb/src/mgmapi/LocalConfig.hpp @@ -0,0 +1,68 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#ifndef LocalConfig_H +#define LocalConfig_H + +#include <ndb_global.h> +#include <NdbOut.hpp> + +//**************************************************************************** +// Description: The class LocalConfig corresponds to the information possible +// to give in the local configuration file. +//***************************************************************************** + +enum MgmtSrvrId_Type { + MgmId_TCP = 0, + MgmId_File = 1 +}; + +struct MgmtSrvrId { + MgmtSrvrId_Type type; + BaseString name; + unsigned int port; +}; + +struct LocalConfig { + + int _ownNodeId; + Vector<MgmtSrvrId> ids; + + int error_line; + char error_msg[256]; + + LocalConfig(); + ~LocalConfig(); + bool init(const char *connectString = 0, + const char *fileName = 0); + + void printError() const; + void printUsage() const; + + void setError(int lineNumber, const char * _msg); + bool readConnectString(const char *, const char *info); + bool readFile(const char * file, bool &fopenError); + bool parseLine(char * line, int lineNumber); + + bool parseNodeId(const char *buf); + bool parseHostName(const char *buf); + bool parseFileName(const char *buf); + bool parseString(const char *buf, BaseString &err); + char * makeConnectString(char *buf, int sz); +}; + +#endif // LocalConfig_H + diff --git a/ndb/src/mgmapi/mgmapi.cpp b/ndb/src/mgmapi/mgmapi.cpp index 51f2d7cee01..ca3a2a2186d 100644 --- a/ndb/src/mgmapi/mgmapi.cpp +++ b/ndb/src/mgmapi/mgmapi.cpp @@ -20,6 +20,7 @@ #include <LocalConfig.hpp> #include <NdbAutoPtr.hpp> +#include <NdbSleep.h> #include <NdbTCP.h> #include "mgmapi.h" #include "mgmapi_debug.h" @@ -83,8 +84,8 @@ typedef Parser<ParserDummy> Parser_t; #define NDB_MGM_MAX_ERR_DESC_SIZE 256 struct ndb_mgm_handle { - char * hostname; - unsigned short port; + char * connectstring; + int cfg_i; int connected; int last_error; @@ -95,7 +96,7 @@ struct ndb_mgm_handle { NDB_SOCKET_TYPE socket; - char cfg_ptr[sizeof(LocalConfig)]; + LocalConfig cfg; #ifdef MGMAPI_LOG FILE* logfile; @@ -148,14 +149,16 @@ ndb_mgm_create_handle() h->connected = 0; h->last_error = 0; h->last_error_line = 0; - h->hostname = 0; h->socket = NDB_INVALID_SOCKET; h->read_timeout = 50000; h->write_timeout = 100; - - new (h->cfg_ptr) LocalConfig; + h->cfg_i = 0; strncpy(h->last_error_desc, "No error", NDB_MGM_MAX_ERR_DESC_SIZE); + + new (&(h->cfg)) LocalConfig; + h->cfg.init(0, 0); + #ifdef MGMAPI_LOG h->logfile = 0; #endif @@ -163,6 +166,23 @@ ndb_mgm_create_handle() return h; } +extern "C" +int +ndb_mgm_set_connectstring(NdbMgmHandle handle, const char * mgmsrv) +{ + new (&(handle->cfg)) LocalConfig; + if (!handle->cfg.init(mgmsrv, 0) || + handle->cfg.ids.size() == 0) + { + new (&(handle->cfg)) LocalConfig; + handle->cfg.init(0, 0); /* reset the LocalCongig */ + SET_ERROR(handle, NDB_MGM_ILLEGAL_CONNECT_STRING, ""); + return -1; + } + handle->cfg_i= 0; + return 0; +} + /** * Destroy a handle */ @@ -175,14 +195,13 @@ ndb_mgm_destroy_handle(NdbMgmHandle * handle) if((* handle)->connected){ ndb_mgm_disconnect(* handle); } - my_free((* handle)->hostname,MYF(MY_ALLOW_ZERO_PTR)); #ifdef MGMAPI_LOG if ((* handle)->logfile != 0){ fclose((* handle)->logfile); (* handle)->logfile = 0; } #endif - ((LocalConfig*)((*handle)->cfg_ptr))->~LocalConfig(); + (*handle)->cfg.~LocalConfig(); my_free((char*)* handle,MYF(MY_ALLOW_ZERO_PTR)); * handle = 0; } @@ -314,7 +333,8 @@ ndb_mgm_call(NdbMgmHandle handle, const ParserRow<ParserDummy> *command_reply, */ extern "C" int -ndb_mgm_connect(NdbMgmHandle handle, const char * mgmsrv) +ndb_mgm_connect(NdbMgmHandle handle, int no_retries, + int retry_delay_in_seconds, int verbose) { SET_ERROR(handle, NDB_MGM_NO_ERROR, "Executing: ndb_mgm_connect"); CHECK_HANDLE(handle, -1); @@ -331,36 +351,48 @@ ndb_mgm_connect(NdbMgmHandle handle, const char * mgmsrv) /** * Do connect */ - LocalConfig *cfg= (LocalConfig*)(handle->cfg_ptr); - new (cfg) LocalConfig; - if (!cfg->init(mgmsrv, 0) || - cfg->ids.size() == 0) - { - SET_ERROR(handle, NDB_MGM_ILLEGAL_CONNECT_STRING, ""); - return -1; - } - + LocalConfig &cfg= handle->cfg; NDB_SOCKET_TYPE sockfd= NDB_INVALID_SOCKET; Uint32 i; - for (i = 0; i < cfg->ids.size(); i++) + while (sockfd == NDB_INVALID_SOCKET) { - if (cfg->ids[i].type != MgmId_TCP) - continue; - SocketClient s(cfg->ids[i].name.c_str(), cfg->ids[i].port); - sockfd = s.connect(); + // do all the mgmt servers + for (i = 0; i < cfg.ids.size(); i++) + { + if (cfg.ids[i].type != MgmId_TCP) + continue; + SocketClient s(cfg.ids[i].name.c_str(), cfg.ids[i].port); + sockfd = s.connect(); + if (sockfd != NDB_INVALID_SOCKET) + break; + } if (sockfd != NDB_INVALID_SOCKET) break; - } - if (sockfd == NDB_INVALID_SOCKET) - { - setError(handle, NDB_MGM_COULD_NOT_CONNECT_TO_SOCKET, __LINE__, - "Unable to connect using connectstring %s", mgmsrv); - return -1; + if (verbose > 0) { + char buf[1024]; + ndbout_c("Unable to connect with connect string: %s", + cfg.makeConnectString(buf,sizeof(buf))); + verbose= -1; + } + if (no_retries == 0) { + char buf[1024]; + setError(handle, NDB_MGM_COULD_NOT_CONNECT_TO_SOCKET, __LINE__, + "Unable to connect with connect string: %s", + cfg.makeConnectString(buf,sizeof(buf))); + return -1; + } + if (verbose == -1) { + ndbout << "retrying every " << retry_delay_in_seconds << " seconds:"; + verbose= -2; + } + NdbSleep_SecSleep(retry_delay_in_seconds); + if (verbose == -2) { + ndbout << " " << no_retries; + } + no_retries--; } - my_free(handle->hostname,MYF(MY_ALLOW_ZERO_PTR)); - handle->hostname = my_strdup(cfg->ids[i].name.c_str(),MYF(MY_WME)); - handle->port = cfg->ids[i].port; + handle->cfg_i = i; handle->socket = sockfd; handle->connected = 1; @@ -1068,7 +1100,9 @@ ndb_mgm_listen_event(NdbMgmHandle handle, int filter[]) }; CHECK_HANDLE(handle, -1); - SocketClient s(handle->hostname, handle->port); + const char *hostname= ndb_mgm_get_connected_host(handle); + int port= ndb_mgm_get_connected_port(handle); + SocketClient s(hostname, port); const NDB_SOCKET_TYPE sockfd = s.connect(); if (sockfd < 0) { setError(handle, NDB_MGM_COULD_NOT_CONNECT_TO_SOCKET, __LINE__, @@ -1613,16 +1647,37 @@ ndb_mgm_destroy_configuration(struct ndb_mgm_configuration *cfg) extern "C" int -ndb_mgm_alloc_nodeid(NdbMgmHandle handle, unsigned int version, unsigned *pnodeid, int nodetype) +ndb_mgm_get_configuration_nodeid(NdbMgmHandle handle) { + CHECK_HANDLE(handle, 0); + return handle->cfg._ownNodeId; +} + +extern "C" +int ndb_mgm_get_connected_port(NdbMgmHandle handle) +{ + return handle->cfg.ids[handle->cfg_i].port; +} +extern "C" +const char *ndb_mgm_get_connected_host(NdbMgmHandle handle) +{ + return handle->cfg.ids[handle->cfg_i].name.c_str(); +} + +extern "C" +int +ndb_mgm_alloc_nodeid(NdbMgmHandle handle, unsigned int version, int nodetype) +{ CHECK_HANDLE(handle, 0); CHECK_CONNECTED(handle, 0); + int nodeid= handle->cfg._ownNodeId; + Properties args; args.put("version", version); args.put("nodetype", nodetype); - args.put("nodeid", *pnodeid); + args.put("nodeid", nodeid); args.put("user", "mysqld"); args.put("password", "mysqld"); args.put("public key", "a public key"); @@ -1638,26 +1693,29 @@ ndb_mgm_alloc_nodeid(NdbMgmHandle handle, unsigned int version, unsigned *pnodei prop= ndb_mgm_call(handle, reply, "get nodeid", &args); CHECK_REPLY(prop, -1); - int res= -1; + nodeid= -1; do { const char * buf; if(!prop->get("result", &buf) || strcmp(buf, "Ok") != 0){ + const char *hostname= ndb_mgm_get_connected_host(handle); + unsigned port= ndb_mgm_get_connected_port(handle); BaseString err; err.assfmt("Could not alloc node id at %s port %d: %s", - handle->hostname, handle->port, buf); + hostname, port, buf); setError(handle, NDB_MGM_COULD_NOT_CONNECT_TO_SOCKET, __LINE__, err.c_str()); break; } - if(!prop->get("nodeid", pnodeid) != 0){ + Uint32 _nodeid; + if(!prop->get("nodeid", &_nodeid) != 0){ ndbout_c("ERROR Message: <nodeid Unspecified>\n"); break; } - res= 0; + nodeid= _nodeid; }while(0); delete prop; - return res; + return nodeid; } /***************************************************************************** diff --git a/ndb/src/mgmclient/CommandInterpreter.cpp b/ndb/src/mgmclient/CommandInterpreter.cpp index bdeb885ed8b..54beaa49d3f 100644 --- a/ndb/src/mgmclient/CommandInterpreter.cpp +++ b/ndb/src/mgmclient/CommandInterpreter.cpp @@ -153,7 +153,6 @@ private: NdbMgmHandle m_mgmsrv; bool connected; - const char *host; int try_reconnect; #ifdef HAVE_GLOBAL_REPLICATION NdbRepHandle m_repserver; @@ -379,15 +378,16 @@ CommandInterpreter::CommandInterpreter(const char *_host) m_mgmsrv = ndb_mgm_create_handle(); if(m_mgmsrv == NULL) { ndbout_c("Cannot create handle to management server."); + exit(-1); + } + if (ndb_mgm_set_connectstring(m_mgmsrv, _host)) + { printError(); + exit(-1); } connected = false; try_reconnect = 0; - if (_host) - host= my_strdup(_host,MYF(MY_WME)); - else - host= 0; #ifdef HAVE_GLOBAL_REPLICATION rep_host = NULL; m_repserver = NULL; @@ -402,8 +402,6 @@ CommandInterpreter::~CommandInterpreter() { connected = false; ndb_mgm_destroy_handle(&m_mgmsrv); - my_free((char *)host,MYF(MY_ALLOW_ZERO_PTR)); - host = NULL; } static bool @@ -438,18 +436,8 @@ bool CommandInterpreter::connect() { if(!connected) { - int tries = try_reconnect; // tries == 0 => infinite - while(!connected) { - if(ndb_mgm_connect(m_mgmsrv, host) == -1) { - ndbout << "Cannot connect to management server (" << host << ")."; - tries--; - if (tries == 0) - break; - ndbout << "Retrying in 5 seconds." << endl; - NdbSleep_SecSleep(5); - } else - connected = true; - } + if(!ndb_mgm_connect(m_mgmsrv, try_reconnect-1, 5, 1)) + connected = true; } return connected; } diff --git a/ndb/src/mgmclient/main.cpp b/ndb/src/mgmclient/main.cpp index 401a9198f30..f32cc683296 100644 --- a/ndb/src/mgmclient/main.cpp +++ b/ndb/src/mgmclient/main.cpp @@ -30,9 +30,10 @@ extern "C" int add_history(const char *command); /* From readline directory */ #include <NdbMain.h> #include <NdbHost.h> +#include <BaseString.hpp> +#include <NdbOut.hpp> #include <mgmapi.h> #include <ndb_version.h> -#include <LocalConfig.hpp> #include "ndb_mgmclient.hpp" @@ -138,7 +139,7 @@ int main(int argc, char** argv){ NDB_INIT(argv[0]); const char *_host = 0; int _port = 0; - const char *load_default_groups[]= { "ndb_mgm",0 }; + const char *load_default_groups[]= { "mysql_cluster","ndb_mgm",0 }; load_defaults("my",load_default_groups,&argc,&argv); int ho_error; diff --git a/ndb/src/mgmsrv/MgmtSrvr.cpp b/ndb/src/mgmsrv/MgmtSrvr.cpp index a49b29af275..81b5eb9dfb3 100644 --- a/ndb/src/mgmsrv/MgmtSrvr.cpp +++ b/ndb/src/mgmsrv/MgmtSrvr.cpp @@ -399,16 +399,20 @@ MgmtSrvr::getPort() const { } /* Constructor */ -MgmtSrvr::MgmtSrvr(NodeId nodeId, - SocketServer *socket_server, - const BaseString &configFilename, - LocalConfig &local_config, - Config * config): +int MgmtSrvr::init() +{ + if ( _ownNodeId > 0) + return 0; + return -1; +} + +MgmtSrvr::MgmtSrvr(SocketServer *socket_server, + const char *config_filename, + const char *connect_string) : _blockNumber(1), // Hard coded block number since it makes it easy to send // signals to other management servers. m_socket_server(socket_server), _ownReference(0), - m_local_config(local_config), theSignalIdleList(NULL), theWaitState(WAIT_SUBSCRIBE_CONF), m_statisticsListner(this) @@ -416,6 +420,8 @@ MgmtSrvr::MgmtSrvr(NodeId nodeId, DBUG_ENTER("MgmtSrvr::MgmtSrvr"); + _ownNodeId= 0; + _config = NULL; _isStopThread = false; @@ -426,12 +432,43 @@ MgmtSrvr::MgmtSrvr(NodeId nodeId, theFacade = 0; m_newConfig = NULL; - m_configFilename = configFilename; + m_configFilename.assign(config_filename); m_nextConfigGenerationNumber = 0; - _config = (config == 0 ? readConfig() : config); - + m_config_retriever= new ConfigRetriever(connect_string, + NDB_VERSION, NDB_MGM_NODE_TYPE_MGM); + + // first try to allocate nodeid from another management server + if(m_config_retriever->do_connect(0,0,0) == 0) + { + int tmp_nodeid= 0; + tmp_nodeid= m_config_retriever->allocNodeId(0 /*retry*/,0 /*delay*/); + if (tmp_nodeid == 0) + { + ndbout_c(m_config_retriever->getErrorString()); + exit(-1); + } + // read config from other managent server + _config= fetchConfig(); + if (_config == 0) + { + ndbout << m_config_retriever->getErrorString() << endl; + exit(-1); + } + _ownNodeId= tmp_nodeid; + } + + if (_ownNodeId == 0) + { + // read config locally + _config= readConfig(); + if (_config == 0) { + ndbout << "Unable to read config file" << endl; + exit(-1); + } + } + theMgmtWaitForResponseCondPtr = NdbCondition_Create(); m_configMutex = NdbMutex_Create(); @@ -443,9 +480,11 @@ MgmtSrvr::MgmtSrvr(NodeId nodeId, nodeTypes[i] = (enum ndb_mgm_node_type)-1; m_connect_address[i].s_addr= 0; } + { - ndb_mgm_configuration_iterator * iter = ndb_mgm_create_configuration_iterator - (config->m_configValues, CFG_SECTION_NODE); + ndb_mgm_configuration_iterator + *iter = ndb_mgm_create_configuration_iterator(_config->m_configValues, + CFG_SECTION_NODE); for(ndb_mgm_first(iter); ndb_mgm_valid(iter); ndb_mgm_next(iter)){ unsigned type, id; if(ndb_mgm_get_int_parameter(iter, CFG_TYPE_OF_SECTION, &type) != 0) @@ -478,8 +517,6 @@ MgmtSrvr::MgmtSrvr(NodeId nodeId, } _props = NULL; - _ownNodeId= 0; - NodeId tmp= nodeId; BaseString error_string; if ((m_node_id_mutex = NdbMutex_Create()) == 0) @@ -488,43 +525,25 @@ MgmtSrvr::MgmtSrvr(NodeId nodeId, exit(-1); } -#if 0 - char my_hostname[256]; - struct sockaddr_in tmp_addr; - SOCKET_SIZE_TYPE addrlen= sizeof(tmp_addr); - if (!g_no_nodeid_checks) { - if (gethostname(my_hostname, sizeof(my_hostname))) { - ndbout << "error: gethostname() - " << strerror(errno) << endl; - exit(-1); - } - if (Ndb_getInAddr(&(((sockaddr_in*)&tmp_addr)->sin_addr),my_hostname)) { - ndbout << "error: Ndb_getInAddr(" << my_hostname << ") - " - << strerror(errno) << endl; + if (_ownNodeId == 0) // we did not get node id from other server + { + NodeId tmp= m_config_retriever->get_configuration_nodeid(); + + if (!alloc_node_id(&tmp, NDB_MGM_NODE_TYPE_MGM, + 0, 0, error_string)){ + ndbout << "Unable to obtain requested nodeid: " + << error_string.c_str() << endl; exit(-1); } + _ownNodeId = tmp; } - if (!alloc_node_id(&tmp, NDB_MGM_NODE_TYPE_MGM, - (struct sockaddr *)&tmp_addr, - &addrlen, error_string)){ - ndbout << "Unable to obtain requested nodeid: " - << error_string.c_str() << endl; - exit(-1); - } -#else - if (!alloc_node_id(&tmp, NDB_MGM_NODE_TYPE_MGM, - 0, 0, error_string)){ - ndbout << "Unable to obtain requested nodeid: " - << error_string.c_str() << endl; - exit(-1); - } -#endif - _ownNodeId = tmp; { DBUG_PRINT("info", ("verifyConfig")); - ConfigRetriever cr(m_local_config, NDB_VERSION, NDB_MGM_NODE_TYPE_MGM); - if (!cr.verifyConfig(config->m_configValues, _ownNodeId)) { - ndbout << cr.getErrorString() << endl; + if (!m_config_retriever->verifyConfig(_config->m_configValues, + _ownNodeId)) + { + ndbout << m_config_retriever->getErrorString() << endl; exit(-1); } } @@ -657,6 +676,8 @@ MgmtSrvr::~MgmtSrvr() NdbThread_WaitFor(m_signalRecvThread, &res); NdbThread_Destroy(&m_signalRecvThread); } + if (m_config_retriever) + delete m_config_retriever; } //**************************************************************************** diff --git a/ndb/src/mgmsrv/MgmtSrvr.hpp b/ndb/src/mgmsrv/MgmtSrvr.hpp index b3257491123..2ab11250d81 100644 --- a/ndb/src/mgmsrv/MgmtSrvr.hpp +++ b/ndb/src/mgmsrv/MgmtSrvr.hpp @@ -175,11 +175,10 @@ public: /* Constructor */ - MgmtSrvr(NodeId nodeId, /* Local nodeid */ - SocketServer *socket_server, - const BaseString &config_filename, /* Where to save config */ - LocalConfig &local_config, /* Ndb.cfg filename */ - Config * config); + MgmtSrvr(SocketServer *socket_server, + const char *config_filename, /* Where to save config */ + const char *connect_string); + int init(); NodeId getOwnNodeId() const {return _ownNodeId;}; /** @@ -538,7 +537,6 @@ private: NdbMutex *m_configMutex; const Config * _config; Config * m_newConfig; - LocalConfig &m_local_config; BaseString m_configFilename; Uint32 m_nextConfigGenerationNumber; @@ -755,6 +753,9 @@ private: Config *_props; int send(class NdbApiSignal* signal, Uint32 node, Uint32 node_type); + + ConfigRetriever *m_config_retriever; + public: /** * This method does not exist diff --git a/ndb/src/mgmsrv/MgmtSrvrConfig.cpp b/ndb/src/mgmsrv/MgmtSrvrConfig.cpp index 1d51061e909..6c4b4e9ae3c 100644 --- a/ndb/src/mgmsrv/MgmtSrvrConfig.cpp +++ b/ndb/src/mgmsrv/MgmtSrvrConfig.cpp @@ -272,30 +272,20 @@ MgmtSrvr::saveConfig(const Config *conf) { Config * MgmtSrvr::readConfig() { - Config *conf = NULL; - if(m_configFilename.length() != 0) { - /* Use config file */ - InitConfigFileParser parser; - conf = parser.parseConfig(m_configFilename.c_str()); - - if(conf == NULL) { - /* Try to get configuration from other MGM server */ - return fetchConfig(); - } - } + Config *conf; + InitConfigFileParser parser; + conf = parser.parseConfig(m_configFilename.c_str()); return conf; } Config * MgmtSrvr::fetchConfig() { - ConfigRetriever cr(m_local_config, NDB_VERSION, NODE_TYPE_MGM); - struct ndb_mgm_configuration * tmp = cr.getConfig(); + struct ndb_mgm_configuration * tmp = m_config_retriever->getConfig(); if(tmp != 0){ Config * conf = new Config(); conf->m_configValues = tmp; return conf; } - return 0; } diff --git a/ndb/src/mgmsrv/main.cpp b/ndb/src/mgmsrv/main.cpp index b588a2d0933..84ff98626e5 100644 --- a/ndb/src/mgmsrv/main.cpp +++ b/ndb/src/mgmsrv/main.cpp @@ -62,7 +62,6 @@ struct MgmGlobals { int non_interactive; int interactive; const char * config_filename; - const char * local_config_filename; /** Stuff found in environment or in local config */ NodeId localNodeId; @@ -70,9 +69,6 @@ struct MgmGlobals { char * interface_name; int port; - /** The configuration of the cluster */ - Config * cluster_config; - /** The Mgmt Server */ MgmtSrvr * mgmObject; @@ -86,9 +82,6 @@ static MgmGlobals glob; /****************************************************************************** * Function prototypes ******************************************************************************/ -static bool readLocalConfig(); -static bool readGlobalConfig(); - /** * Global variables */ @@ -100,16 +93,28 @@ static char *opt_connect_str= 0; static struct my_option my_long_options[] = { - NDB_STD_OPTS("ndb_mgm"), +#ifndef DBUG_OFF + { "debug", '#', "Output debug log. Often this is 'd:t:o,filename'.", + 0, 0, 0, GET_STR, OPT_ARG, 0, 0, 0, 0, 0, 0 }, +#endif + { "usage", '?', "Display this help and exit.", + 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 }, + { "help", '?', "Display this help and exit.", + 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 }, + { "version", 'V', "Output version information and exit.", 0, 0, 0, + GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 }, + { "connect-string", 1023, + "Set connect string for connecting to ndb_mgmd. " + "<constr>=\"host=<hostname:port>[;nodeid=<id>]\". " + "Overides specifying entries in NDB_CONNECTSTRING and config file", + (gptr*) &opt_connect_str, (gptr*) &opt_connect_str, 0, + GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 }, { "config-file", 'f', "Specify cluster configuration file", (gptr*) &glob.config_filename, (gptr*) &glob.config_filename, 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 }, { "daemon", 'd', "Run ndb_mgmd in daemon mode (default)", (gptr*) &glob.daemon, (gptr*) &glob.daemon, 0, GET_BOOL, NO_ARG, 1, 0, 0, 0, 0, 0 }, - { "l", 'l', "Specify configuration file connect string (default Ndb.cfg if available)", - (gptr*) &glob.local_config_filename, (gptr*) &glob.local_config_filename, 0, - GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 }, { "interactive", 256, "Run interactive. Not supported but provided for testing purposes", (gptr*) &glob.interactive, (gptr*) &glob.interactive, 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 }, @@ -173,7 +178,7 @@ int main(int argc, char** argv) global_mgmt_server_check = 1; glob.config_filename= "config.ini"; - const char *load_default_groups[]= { "ndb_mgmd",0 }; + const char *load_default_groups[]= { "mysql_cluster","ndb_mgmd",0 }; load_defaults("my",load_default_groups,&argc,&argv); int ho_error; @@ -189,29 +194,16 @@ int main(int argc, char** argv) MgmApiService * mapi = new MgmApiService(); - /**************************** - * Read configuration files * - ****************************/ - LocalConfig local_config; - if(!local_config.init(opt_connect_str,glob.local_config_filename)){ - local_config.printError(); - goto error_end; - } - glob.localNodeId = local_config._ownNodeId; + glob.mgmObject = new MgmtSrvr(glob.socketServer, + glob.config_filename, + opt_connect_str); - if (!readGlobalConfig()) + if (glob.mgmObject->init()) goto error_end; - glob.mgmObject = new MgmtSrvr(glob.localNodeId, glob.socketServer, - BaseString(glob.config_filename), - local_config, - glob.cluster_config); - chdir(NdbConfig_get_path(0)); - glob.cluster_config = 0; glob.localNodeId= glob.mgmObject->getOwnNodeId(); - if (glob.localNodeId == 0) { goto error_end; } @@ -322,9 +314,7 @@ MgmGlobals::MgmGlobals(){ // Default values port = 0; config_filename = NULL; - local_config_filename = NULL; interface_name = 0; - cluster_config = 0; daemon = 1; non_interactive = 0; interactive = 0; @@ -337,27 +327,6 @@ MgmGlobals::~MgmGlobals(){ delete socketServer; if (mgmObject) delete mgmObject; - if (cluster_config) - delete cluster_config; if (interface_name) free(interface_name); } - -/** - * @fn readGlobalConfig - * @param glob : Global variables - * @return true if success, false otherwise. - */ -static bool -readGlobalConfig() { - if(glob.config_filename == NULL) - return false; - - /* Use config file */ - InitConfigFileParser parser; - glob.cluster_config = parser.parseConfig(glob.config_filename); - if(glob.cluster_config == 0){ - return false; - } - return true; -} diff --git a/ndb/src/ndbapi/NdbConnection.cpp b/ndb/src/ndbapi/NdbConnection.cpp index 719c5bef49e..f4bb000300a 100644 --- a/ndb/src/ndbapi/NdbConnection.cpp +++ b/ndb/src/ndbapi/NdbConnection.cpp @@ -1122,8 +1122,11 @@ NdbConnection::getNdbIndexScanOperation(const NdbIndexImpl* index, if (indexTable != 0){ NdbIndexScanOperation* tOp = getNdbScanOperation((NdbTableImpl *) indexTable); - tOp->m_currentTable = table; - if(tOp) tOp->m_cursor_type = NdbScanOperation::IndexCursor; + if(tOp) + { + tOp->m_currentTable = table; + tOp->m_cursor_type = NdbScanOperation::IndexCursor; + } return tOp; } else { setOperationErrorCodeAbort(theNdb->theError.code); @@ -1618,9 +1621,6 @@ from other transactions. /** * There's always a TCKEYCONF when using IgnoreError */ -#ifdef VM_TRACE - ndbout_c("Not completing transaction 2"); -#endif return -1; } /**********************************************************************/ @@ -1872,9 +1872,6 @@ NdbConnection::OpCompleteFailure(Uint8 abortOption, bool setFailure) /** * There's always a TCKEYCONF when using IgnoreError */ -#ifdef VM_TRACE - ndbout_c("Not completing transaction"); -#endif return -1; } diff --git a/ndb/src/ndbapi/NdbConnectionScan.cpp b/ndb/src/ndbapi/NdbConnectionScan.cpp index 3fe8993a42b..a1a220caacf 100644 --- a/ndb/src/ndbapi/NdbConnectionScan.cpp +++ b/ndb/src/ndbapi/NdbConnectionScan.cpp @@ -57,12 +57,18 @@ NdbConnection::receiveSCAN_TABREF(NdbApiSignal* aSignal){ if(checkState_TransId(&ref->transId1)){ theScanningOp->theError.code = ref->errorCode; + theScanningOp->execCLOSE_SCAN_REP(); if(!ref->closeNeeded){ - theScanningOp->execCLOSE_SCAN_REP(); return 0; } - assert(theScanningOp->m_sent_receivers_count); + + /** + * Setup so that close_impl will actually perform a close + * and not "close scan"-optimze it away + */ theScanningOp->m_conf_receivers_count++; + theScanningOp->m_conf_receivers[0] = theScanningOp->m_receivers[0]; + theScanningOp->m_conf_receivers[0]->m_tcPtrI = ~0; return 0; } else { #ifdef NDB_NO_DROPPED_SIGNAL @@ -97,7 +103,7 @@ NdbConnection::receiveSCAN_TABCONF(NdbApiSignal* aSignal, theScanningOp->execCLOSE_SCAN_REP(); return 0; } - + for(Uint32 i = 0; i<len; i += 3){ Uint32 opCount, totalLen; Uint32 ptrI = * ops++; @@ -109,24 +115,12 @@ NdbConnection::receiveSCAN_TABCONF(NdbApiSignal* aSignal, void * tPtr = theNdb->int2void(ptrI); assert(tPtr); // For now NdbReceiver* tOp = theNdb->void2rec(tPtr); - if (tOp && tOp->checkMagicNumber()){ - if(tOp->execSCANOPCONF(tcPtrI, totalLen, opCount)){ - /** - * - */ - theScanningOp->receiver_delivered(tOp); - } else if(info == ScanTabConf::EndOfData){ + if (tOp && tOp->checkMagicNumber()) + { + if (tcPtrI == RNIL && opCount == 0) theScanningOp->receiver_completed(tOp); - } - } - } - if (conf->requestInfo & ScanTabConf::EndOfData) { - if(theScanningOp->m_ordered) - theScanningOp->m_api_receivers_count = 0; - if(theScanningOp->m_api_receivers_count + - theScanningOp->m_conf_receivers_count + - theScanningOp->m_sent_receivers_count){ - abort(); + else if (tOp->execSCANOPCONF(tcPtrI, totalLen, opCount)) + theScanningOp->receiver_delivered(tOp); } } return 0; diff --git a/ndb/src/ndbapi/NdbResultSet.cpp b/ndb/src/ndbapi/NdbResultSet.cpp index f270584d227..d9d71464026 100644 --- a/ndb/src/ndbapi/NdbResultSet.cpp +++ b/ndb/src/ndbapi/NdbResultSet.cpp @@ -44,10 +44,10 @@ void NdbResultSet::init() { } -int NdbResultSet::nextResult(bool fetchAllowed) +int NdbResultSet::nextResult(bool fetchAllowed, bool forceSend) { int res; - if ((res = m_operation->nextResult(fetchAllowed)) == 0) { + if ((res = m_operation->nextResult(fetchAllowed, forceSend)) == 0) { // handle blobs NdbBlob* tBlob = m_operation->theBlobList; while (tBlob != 0) { @@ -67,9 +67,9 @@ int NdbResultSet::nextResult(bool fetchAllowed) return res; } -void NdbResultSet::close() +void NdbResultSet::close(bool forceSend) { - m_operation->closeScan(); + m_operation->closeScan(forceSend); } NdbOperation* @@ -98,6 +98,6 @@ NdbResultSet::deleteTuple(NdbConnection * takeOverTrans){ } int -NdbResultSet::restart(){ - return m_operation->restart(); +NdbResultSet::restart(bool forceSend){ + return m_operation->restart(forceSend); } diff --git a/ndb/src/ndbapi/NdbScanOperation.cpp b/ndb/src/ndbapi/NdbScanOperation.cpp index 4b10ebb10cd..db0c294708d 100644 --- a/ndb/src/ndbapi/NdbScanOperation.cpp +++ b/ndb/src/ndbapi/NdbScanOperation.cpp @@ -35,6 +35,8 @@ #include <signaldata/AttrInfo.hpp> #include <signaldata/TcKeyReq.hpp> +#define DEBUG_NEXT_RESULT 0 + NdbScanOperation::NdbScanOperation(Ndb* aNdb) : NdbOperation(aNdb), m_resultSet(0), @@ -275,6 +277,9 @@ NdbScanOperation::fix_receivers(Uint32 parallel){ void NdbScanOperation::receiver_delivered(NdbReceiver* tRec){ if(theError.code == 0){ + if(DEBUG_NEXT_RESULT) + ndbout_c("receiver_delivered"); + Uint32 idx = tRec->m_list_index; Uint32 last = m_sent_receivers_count - 1; if(idx != last){ @@ -298,6 +303,9 @@ NdbScanOperation::receiver_delivered(NdbReceiver* tRec){ void NdbScanOperation::receiver_completed(NdbReceiver* tRec){ if(theError.code == 0){ + if(DEBUG_NEXT_RESULT) + ndbout_c("receiver_completed"); + Uint32 idx = tRec->m_list_index; Uint32 last = m_sent_receivers_count - 1; if(idx != last){ @@ -445,12 +453,12 @@ NdbScanOperation::executeCursor(int nodeId){ return -1; } -#define DEBUG_NEXT_RESULT 0 -int NdbScanOperation::nextResult(bool fetchAllowed) +int NdbScanOperation::nextResult(bool fetchAllowed, bool forceSend) { if(m_ordered) - return ((NdbIndexScanOperation*)this)->next_result_ordered(fetchAllowed); + return ((NdbIndexScanOperation*)this)->next_result_ordered(fetchAllowed, + forceSend); /** * Check current receiver @@ -487,7 +495,8 @@ int NdbScanOperation::nextResult(bool fetchAllowed) TransporterFacade* tp = TransporterFacade::instance(); Guard guard(tp->theMutexPtr); Uint32 seq = theNdbCon->theNodeSequence; - if(seq == tp->getNodeSequence(nodeId) && send_next_scan(idx, false) == 0){ + if(seq == tp->getNodeSequence(nodeId) && send_next_scan(idx, false, + forceSend) == 0){ idx = m_current_api_receiver; last = m_api_receivers_count; @@ -578,8 +587,9 @@ int NdbScanOperation::nextResult(bool fetchAllowed) } int -NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag){ - if(cnt > 0 || stopScanFlag){ +NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag, + bool forceSend){ + if(cnt > 0){ NdbApiSignal tSignal(theNdb->theMyRef); tSignal.setSignal(GSN_SCAN_NEXTREQ); @@ -595,38 +605,57 @@ NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag){ */ Uint32 last = m_sent_receivers_count; Uint32 * prep_array = (cnt > 21 ? m_prepared_receivers : theData + 4); + Uint32 sent = 0; for(Uint32 i = 0; i<cnt; i++){ NdbReceiver * tRec = m_api_receivers[i]; - m_sent_receivers[last+i] = tRec; - tRec->m_list_index = last+i; - prep_array[i] = tRec->m_tcPtrI; - tRec->prepareSend(); + if((prep_array[sent] = tRec->m_tcPtrI) != RNIL) + { + m_sent_receivers[last+sent] = tRec; + tRec->m_list_index = last+sent; + tRec->prepareSend(); + sent++; + } } - memcpy(&m_api_receivers[0], &m_api_receivers[cnt], cnt * sizeof(char*)); + memmove(m_api_receivers, m_api_receivers+cnt, + (theParallelism-cnt) * sizeof(char*)); - Uint32 nodeId = theNdbCon->theDBnode; - TransporterFacade * tp = TransporterFacade::instance(); - int ret; - if(cnt > 21){ - tSignal.setLength(4); - LinearSectionPtr ptr[3]; - ptr[0].p = prep_array; - ptr[0].sz = cnt; - ret = tp->sendSignal(&tSignal, nodeId, ptr, 1); - } else { - tSignal.setLength(4+cnt); - ret = tp->sendSignal(&tSignal, nodeId); + int ret = 0; + if(sent) + { + Uint32 nodeId = theNdbCon->theDBnode; + TransporterFacade * tp = TransporterFacade::instance(); + if(cnt > 21){ + tSignal.setLength(4); + LinearSectionPtr ptr[3]; + ptr[0].p = prep_array; + ptr[0].sz = sent; + ret = tp->sendSignal(&tSignal, nodeId, ptr, 1); + } else { + tSignal.setLength(4+sent); + ret = tp->sendSignal(&tSignal, nodeId); + } } + + if (!ret) checkForceSend(forceSend); - m_sent_receivers_count = last + cnt + stopScanFlag; + m_sent_receivers_count = last + sent; m_api_receivers_count -= cnt; m_current_api_receiver = 0; - + return ret; } return 0; } +void NdbScanOperation::checkForceSend(bool forceSend) +{ + if (forceSend) { + TransporterFacade::instance()->forceSend(theNdb->theNdbBlockNumber); + } else { + TransporterFacade::instance()->checkForceSend(theNdb->theNdbBlockNumber); + }//if +} + int NdbScanOperation::prepareSend(Uint32 TC_ConnectPtr, Uint64 TransactionId) { @@ -642,7 +671,7 @@ NdbScanOperation::doSend(int ProcessorId) return 0; } -void NdbScanOperation::closeScan() +void NdbScanOperation::closeScan(bool forceSend) { if(m_transConnection){ if(DEBUG_NEXT_RESULT) @@ -657,7 +686,7 @@ void NdbScanOperation::closeScan() TransporterFacade* tp = TransporterFacade::instance(); Guard guard(tp->theMutexPtr); - close_impl(tp); + close_impl(tp, forceSend); } while(0); @@ -673,6 +702,7 @@ NdbScanOperation::execCLOSE_SCAN_REP(){ m_api_receivers_count = 0; m_conf_receivers_count = 0; m_sent_receivers_count = 0; + m_current_api_receiver = m_ordered ? theParallelism : 0; } void NdbScanOperation::release() @@ -1293,7 +1323,8 @@ NdbIndexScanOperation::compare(Uint32 skip, Uint32 cols, } int -NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){ +NdbIndexScanOperation::next_result_ordered(bool fetchAllowed, + bool forceSend){ Uint32 u_idx = 0, u_last = 0; Uint32 s_idx = m_current_api_receiver; // first sorted @@ -1319,7 +1350,8 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){ Guard guard(tp->theMutexPtr); Uint32 seq = theNdbCon->theNodeSequence; Uint32 nodeId = theNdbCon->theDBnode; - if(seq == tp->getNodeSequence(nodeId) && !send_next_scan_ordered(s_idx)){ + if(seq == tp->getNodeSequence(nodeId) && + !send_next_scan_ordered(s_idx, forceSend)){ Uint32 tmp = m_sent_receivers_count; s_idx = m_current_api_receiver; while(m_sent_receivers_count > 0 && !theError.code){ @@ -1408,14 +1440,26 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){ } int -NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx){ +NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx, bool forceSend){ if(idx == theParallelism) return 0; + NdbReceiver* tRec = m_api_receivers[idx]; NdbApiSignal tSignal(theNdb->theMyRef); tSignal.setSignal(GSN_SCAN_NEXTREQ); + Uint32 last = m_sent_receivers_count; Uint32* theData = tSignal.getDataPtrSend(); + Uint32* prep_array = theData + 4; + + m_current_api_receiver = idx + 1; + if((prep_array[0] = tRec->m_tcPtrI) == RNIL) + { + if(DEBUG_NEXT_RESULT) + ndbout_c("receiver completed, don't send"); + return 0; + } + theData[0] = theNdbCon->theTCConPtr; theData[1] = 0; Uint64 transId = theNdbCon->theTransactionId; @@ -1425,35 +1469,35 @@ NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx){ /** * Prepare ops */ - Uint32 last = m_sent_receivers_count; - Uint32 * prep_array = theData + 4; - - NdbReceiver * tRec = m_api_receivers[idx]; m_sent_receivers[last] = tRec; tRec->m_list_index = last; - prep_array[0] = tRec->m_tcPtrI; tRec->prepareSend(); - m_sent_receivers_count = last + 1; - m_current_api_receiver = idx + 1; Uint32 nodeId = theNdbCon->theDBnode; TransporterFacade * tp = TransporterFacade::instance(); tSignal.setLength(4+1); - return tp->sendSignal(&tSignal, nodeId); + int ret= tp->sendSignal(&tSignal, nodeId); + if (!ret) checkForceSend(forceSend); + return ret; } int -NdbScanOperation::close_impl(TransporterFacade* tp){ +NdbScanOperation::close_impl(TransporterFacade* tp, bool forceSend){ Uint32 seq = theNdbCon->theNodeSequence; Uint32 nodeId = theNdbCon->theDBnode; - if(seq != tp->getNodeSequence(nodeId)){ + if(seq != tp->getNodeSequence(nodeId)) + { theNdbCon->theReleaseOnClose = true; return -1; } - while(theError.code == 0 && m_sent_receivers_count){ + /** + * Wait for outstanding + */ + while(theError.code == 0 && m_sent_receivers_count) + { theNdb->theWaiter.m_node = nodeId; theNdb->theWaiter.m_state = WAIT_SCAN; int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT); @@ -1471,18 +1515,52 @@ NdbScanOperation::close_impl(TransporterFacade* tp){ } } - if(m_api_receivers_count+m_conf_receivers_count){ - // Send close scan - if(send_next_scan(0, true) == -1){ // Close scan - theNdbCon->theReleaseOnClose = true; - return -1; - } + /** + * move all conf'ed into api + * so that send_next_scan can check if they needs to be closed + */ + Uint32 api = m_api_receivers_count; + Uint32 conf = m_conf_receivers_count; + + if(m_ordered) + { + /** + * Ordered scan, keep the m_api_receivers "to the right" + */ + memmove(m_api_receivers, m_api_receivers+m_current_api_receiver, + (theParallelism - m_current_api_receiver) * sizeof(char*)); + api = (theParallelism - m_current_api_receiver); + m_api_receivers_count = api; + } + + if(DEBUG_NEXT_RESULT) + ndbout_c("close_impl: [order api conf sent curr parr] %d %d %d %d %d %d", + m_ordered, api, conf, + m_sent_receivers_count, m_current_api_receiver, theParallelism); + + if(api+conf) + { + /** + * There's something to close + * setup m_api_receivers (for send_next_scan) + */ + memcpy(m_api_receivers+api, m_conf_receivers, conf * sizeof(char*)); + m_api_receivers_count = api + conf; + m_conf_receivers_count = 0; + } + + // Send close scan + if(send_next_scan(api+conf, true, forceSend) == -1) + { + theNdbCon->theReleaseOnClose = true; + return -1; } /** * wait for close scan conf */ - while(m_sent_receivers_count+m_api_receivers_count+m_conf_receivers_count){ + while(m_sent_receivers_count+m_api_receivers_count+m_conf_receivers_count) + { theNdb->theWaiter.m_node = nodeId; theNdb->theWaiter.m_state = WAIT_SCAN; int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT); @@ -1499,6 +1577,7 @@ NdbScanOperation::close_impl(TransporterFacade* tp){ return -1; } } + return 0; } @@ -1520,7 +1599,7 @@ NdbScanOperation::reset_receivers(Uint32 parallell, Uint32 ordered){ } int -NdbScanOperation::restart() +NdbScanOperation::restart(bool forceSend) { TransporterFacade* tp = TransporterFacade::instance(); @@ -1529,7 +1608,7 @@ NdbScanOperation::restart() { int res; - if((res= close_impl(tp))) + if((res= close_impl(tp, forceSend))) { return res; } @@ -1548,13 +1627,13 @@ NdbScanOperation::restart() } int -NdbIndexScanOperation::reset_bounds(){ +NdbIndexScanOperation::reset_bounds(bool forceSend){ int res; { TransporterFacade* tp = TransporterFacade::instance(); Guard guard(tp->theMutexPtr); - res= close_impl(tp); + res= close_impl(tp, forceSend); } if(!res) diff --git a/ndb/src/ndbapi/ndb_cluster_connection.cpp b/ndb/src/ndbapi/ndb_cluster_connection.cpp index 4c42fe1aeef..b2043b2c2c1 100644 --- a/ndb/src/ndbapi/ndb_cluster_connection.cpp +++ b/ndb/src/ndbapi/ndb_cluster_connection.cpp @@ -45,7 +45,6 @@ Ndb_cluster_connection::Ndb_cluster_connection(const char *connect_string) else m_connect_string= 0; m_config_retriever= 0; - m_local_config= 0; m_connect_thread= 0; m_connect_callback= 0; @@ -125,38 +124,31 @@ int Ndb_cluster_connection::connect(int reconnect) do { if (m_config_retriever == 0) { - if (m_local_config == 0) { - m_local_config= new LocalConfig(); - if (!m_local_config->init(m_connect_string,0)) { - ndbout_c("Configuration error: Unable to retrieve local config"); - m_local_config->printError(); - m_local_config->printUsage(); - DBUG_RETURN(-1); - } - } m_config_retriever= - new ConfigRetriever(*m_local_config, NDB_VERSION, NODE_TYPE_API); + new ConfigRetriever(m_connect_string, NDB_VERSION, NODE_TYPE_API); + if (m_config_retriever->hasError()) + { + printf("Could not connect initialize handle to management server", + m_config_retriever->getErrorString()); + DBUG_RETURN(-1); + } } else if (reconnect == 0) DBUG_RETURN(0); if (reconnect) { - int r= m_config_retriever->do_connect(1); + int r= m_config_retriever->do_connect(0,0,0); if (r == 1) DBUG_RETURN(1); // mgmt server not up yet if (r == -1) break; } else - if(m_config_retriever->do_connect() == -1) + if(m_config_retriever->do_connect(12,5,1) == -1) break; - Uint32 nodeId = m_config_retriever->allocNodeId(); - for(Uint32 i = 0; nodeId == 0 && i<5; i++){ - NdbSleep_SecSleep(3); - nodeId = m_config_retriever->allocNodeId(); - } + Uint32 nodeId = m_config_retriever->allocNodeId(4/*retries*/,3/*delay*/); if(nodeId == 0) break; ndb_mgm_configuration * props = m_config_retriever->getConfig(); @@ -200,8 +192,6 @@ Ndb_cluster_connection::~Ndb_cluster_connection() my_free(m_connect_string,MYF(MY_ALLOW_ZERO_PTR)); if (m_config_retriever) delete m_config_retriever; - if (m_local_config) - delete m_local_config; DBUG_VOID_RETURN; } diff --git a/ndb/src/ndbapi/ndberror.c b/ndb/src/ndbapi/ndberror.c index e08b80f2433..bc49358cc63 100644 --- a/ndb/src/ndbapi/ndberror.c +++ b/ndb/src/ndbapi/ndberror.c @@ -426,7 +426,8 @@ ErrorBundle ErrorCodes[] = { { 4267, IE, "Corrupted blob value" }, { 4268, IE, "Error in blob head update forced rollback of transaction" }, { 4268, IE, "Unknown blob error" }, - { 4269, IE, "No connection to ndb management server" } + { 4269, IE, "No connection to ndb management server" }, + { 4335, AE, "Only one autoincrement column allowed per table. Having a table without primary key uses an autoincremented hidden key, i.e. a table without a primary key can not have an autoincremented column" } }; static |