summaryrefslogtreecommitdiff
path: root/ndb/src
diff options
context:
space:
mode:
authorunknown <brian@avenger.(none)>2004-11-24 12:34:44 -0800
committerunknown <brian@avenger.(none)>2004-11-24 12:34:44 -0800
commit80282a9418baf94eccaac7af6ec389a87f3f79b8 (patch)
tree184c2f723a69e1c6ecf790a3a85c9dcb045efea1 /ndb/src
parent0f61fec47e427e7780dbe67900b173a5fd99ac73 (diff)
parentc3272ae7186bb59a406a671080b98766dc8e4064 (diff)
downloadmariadb-git-80282a9418baf94eccaac7af6ec389a87f3f79b8.tar.gz
Merging 4.1 to 5.0
Build-tools/Do-compile: Auto merged client/Makefile.am: Auto merged client/mysqladmin.cc: Auto merged configure.in: Auto merged ndb/src/common/util/version.c: Auto merged sql/ha_ndbcluster.cc: Auto merged sql/ha_ndbcluster.h: Auto merged ndb/src/mgmsrv/main.cpp: Resolved NDB conflict between 4.1 and 5.0
Diffstat (limited to 'ndb/src')
-rw-r--r--ndb/src/common/debugger/signaldata/ScanTab.cpp5
-rw-r--r--ndb/src/common/mgmcommon/ConfigRetriever.cpp163
-rw-r--r--ndb/src/common/util/version.c1
-rw-r--r--ndb/src/kernel/blocks/dbdih/Dbdih.hpp1
-rw-r--r--ndb/src/kernel/blocks/dbdih/DbdihMain.cpp20
-rw-r--r--ndb/src/kernel/blocks/dblqh/Dblqh.hpp24
-rw-r--r--ndb/src/kernel/blocks/dblqh/DblqhMain.cpp96
-rw-r--r--ndb/src/kernel/blocks/dbtc/Dbtc.hpp11
-rw-r--r--ndb/src/kernel/blocks/dbtc/DbtcMain.cpp102
-rw-r--r--ndb/src/kernel/blocks/suma/Suma.cpp2
-rw-r--r--ndb/src/kernel/main.cpp12
-rw-r--r--ndb/src/kernel/vm/Configuration.cpp41
-rw-r--r--ndb/src/kernel/vm/Configuration.hpp3
-rw-r--r--ndb/src/mgmapi/LocalConfig.cpp17
-rw-r--r--ndb/src/mgmapi/LocalConfig.hpp68
-rw-r--r--ndb/src/mgmapi/mgmapi.cpp138
-rw-r--r--ndb/src/mgmclient/CommandInterpreter.cpp26
-rw-r--r--ndb/src/mgmclient/main.cpp5
-rw-r--r--ndb/src/mgmsrv/MgmtSrvr.cpp109
-rw-r--r--ndb/src/mgmsrv/MgmtSrvr.hpp13
-rw-r--r--ndb/src/mgmsrv/MgmtSrvrConfig.cpp18
-rw-r--r--ndb/src/mgmsrv/main.cpp73
-rw-r--r--ndb/src/ndbapi/NdbConnection.cpp13
-rw-r--r--ndb/src/ndbapi/NdbConnectionScan.cpp34
-rw-r--r--ndb/src/ndbapi/NdbResultSet.cpp12
-rw-r--r--ndb/src/ndbapi/NdbScanOperation.cpp183
-rw-r--r--ndb/src/ndbapi/ndb_cluster_connection.cpp30
-rw-r--r--ndb/src/ndbapi/ndberror.c3
28 files changed, 722 insertions, 501 deletions
diff --git a/ndb/src/common/debugger/signaldata/ScanTab.cpp b/ndb/src/common/debugger/signaldata/ScanTab.cpp
index 72a4d9f94b9..0755ee0a856 100644
--- a/ndb/src/common/debugger/signaldata/ScanTab.cpp
+++ b/ndb/src/common/debugger/signaldata/ScanTab.cpp
@@ -30,13 +30,14 @@ printSCANTABREQ(FILE * output, const Uint32 * theData, Uint32 len, Uint16 receiv
fprintf(output, " apiConnectPtr: H\'%.8x",
sig->apiConnectPtr);
fprintf(output, " requestInfo: H\'%.8x:\n", requestInfo);
- fprintf(output, " Parallellism: %u, Batch: %u LockMode: %u, Keyinfo: %u Holdlock: %u, RangeScan: %u\n",
+ fprintf(output, " Parallellism: %u, Batch: %u LockMode: %u, Keyinfo: %u Holdlock: %u, RangeScan: %u ReadCommitted: %u\n",
sig->getParallelism(requestInfo),
sig->getScanBatch(requestInfo),
sig->getLockMode(requestInfo),
+ sig->getKeyinfoFlag(requestInfo),
sig->getHoldLockFlag(requestInfo),
sig->getRangeScanFlag(requestInfo),
- sig->getKeyinfoFlag(requestInfo));
+ sig->getReadCommittedFlag(requestInfo));
Uint32 keyLen = (sig->attrLenKeyLen >> 16);
Uint32 attrLen = (sig->attrLenKeyLen & 0xFFFF);
diff --git a/ndb/src/common/mgmcommon/ConfigRetriever.cpp b/ndb/src/common/mgmcommon/ConfigRetriever.cpp
index a1b979f62d8..0af5eb2f83c 100644
--- a/ndb/src/common/mgmcommon/ConfigRetriever.cpp
+++ b/ndb/src/common/mgmcommon/ConfigRetriever.cpp
@@ -20,7 +20,6 @@
#include <ConfigRetriever.hpp>
#include <SocketServer.hpp>
-#include "LocalConfig.hpp"
#include <NdbSleep.h>
#include <NdbOut.hpp>
@@ -45,90 +44,62 @@
//****************************************************************************
//****************************************************************************
-ConfigRetriever::ConfigRetriever(LocalConfig &local_config,
+ConfigRetriever::ConfigRetriever(const char * _connect_string,
Uint32 version, Uint32 node_type)
- : _localConfig(local_config)
{
- m_handle= 0;
m_version = version;
m_node_type = node_type;
- _ownNodeId = _localConfig._ownNodeId;
-}
+ _ownNodeId= 0;
-ConfigRetriever::~ConfigRetriever(){
+ m_handle= ndb_mgm_create_handle();
+ if (m_handle == 0) {
+ setError(CR_ERROR, "Unable to allocate mgm handle");
+ return;
+ }
+
+ if (ndb_mgm_set_connectstring(m_handle, _connect_string))
+ {
+ setError(CR_ERROR, ndb_mgm_get_latest_error_desc(m_handle));
+ return;
+ }
+ resetError();
+}
+
+ConfigRetriever::~ConfigRetriever()
+{
if (m_handle) {
ndb_mgm_disconnect(m_handle);
ndb_mgm_destroy_handle(&m_handle);
}
}
+Uint32
+ConfigRetriever::get_configuration_nodeid() const
+{
+ return ndb_mgm_get_configuration_nodeid(m_handle);
+}
+
+Uint32 ConfigRetriever::get_mgmd_port() const
+{
+ return ndb_mgm_get_connected_port(m_handle);
+}
+
+const char *ConfigRetriever::get_mgmd_host() const
+{
+ return ndb_mgm_get_connected_host(m_handle);
+}
//****************************************************************************
//****************************************************************************
int
-ConfigRetriever::do_connect(int exit_on_connect_failure){
-
- m_mgmd_port= 0;
- m_mgmd_host= 0;
-
- if(!m_handle)
- m_handle= ndb_mgm_create_handle();
-
- if (m_handle == 0) {
- setError(CR_ERROR, "Unable to allocate mgm handle");
- return -1;
- }
-
- int retry = 1;
- int retry_max = 12; // Max number of retry attempts
- int retry_interval= 5; // Seconds between each retry
- while(retry < retry_max){
- Uint32 type = CR_ERROR;
- BaseString tmp;
- for (unsigned int i = 0; i<_localConfig.ids.size(); i++){
- MgmtSrvrId * m = &_localConfig.ids[i];
- DBUG_PRINT("info",("trying %s:%d",
- m->name.c_str(),
- m->port));
- switch(m->type){
- case MgmId_TCP:
- tmp.assfmt("%s:%d", m->name.c_str(), m->port);
- if (ndb_mgm_connect(m_handle, tmp.c_str()) == 0) {
- m_mgmd_port= m->port;
- m_mgmd_host= m->name.c_str();
- DBUG_PRINT("info",("connected to ndb_mgmd at %s:%d",
- m_mgmd_host,
- m_mgmd_port));
- return 0;
- }
- setError(CR_RETRY, ndb_mgm_get_latest_error_desc(m_handle));
- case MgmId_File:
- break;
- }
- }
- if(latestErrorType == CR_RETRY){
- DBUG_PRINT("info",("CR_RETRY"));
- if (exit_on_connect_failure)
- return 1;
- REPORT_WARNING("Failed to retrieve cluster configuration");
- ndbout << "(Cause of failure: " << getErrorString() << ")" << endl;
- ndbout << "Attempt " << retry << " of " << retry_max << ". "
- << "Trying again in "<< retry_interval <<" seconds..."
- << endl << endl;
- NdbSleep_SecSleep(retry_interval);
- } else {
- break;
- }
- retry++;
- }
-
- ndb_mgm_destroy_handle(&m_handle);
- m_handle= 0;
- m_mgmd_port= 0;
- m_mgmd_host= 0;
- return -1;
+ConfigRetriever::do_connect(int no_retries,
+ int retry_delay_in_seconds, int verbose)
+{
+ return
+ (ndb_mgm_connect(m_handle,no_retries,retry_delay_in_seconds,verbose)==0) ?
+ 0 : -1;
}
//****************************************************************************
@@ -140,22 +111,9 @@ ConfigRetriever::getConfig() {
struct ndb_mgm_configuration * p = 0;
- if(m_handle != 0){
+ if(m_handle != 0)
p = getConfig(m_handle);
- } else {
- for (unsigned int i = 0; i<_localConfig.ids.size(); i++){
- MgmtSrvrId * m = &_localConfig.ids[i];
- switch(m->type){
- case MgmId_File:
- p = getConfig(m->name.c_str());
- break;
- case MgmId_TCP:
- break;
- }
- if(p)
- break;
- }
- }
+
if(p == 0)
return 0;
@@ -227,6 +185,16 @@ ConfigRetriever::setError(ErrorType et, const char * s){
latestErrorType = et;
}
+void
+ConfigRetriever::resetError(){
+ setError(CR_NO_ERROR,0);
+}
+
+int
+ConfigRetriever::hasError()
+{
+ return latestErrorType != CR_NO_ERROR;
+}
const char *
ConfigRetriever::getErrorString(){
@@ -341,16 +309,23 @@ ConfigRetriever::verifyConfig(const struct ndb_mgm_configuration * conf, Uint32
}
Uint32
-ConfigRetriever::allocNodeId(){
- unsigned nodeid= _ownNodeId;
-
- if(m_handle != 0){
- int res= ndb_mgm_alloc_nodeid(m_handle, m_version, &nodeid, m_node_type);
- if(res != 0) {
- setError(CR_ERROR, ndb_mgm_get_latest_error_desc(m_handle));
- return 0;
+ConfigRetriever::allocNodeId(int no_retries, int retry_delay_in_seconds)
+{
+ _ownNodeId= 0;
+ if(m_handle != 0)
+ {
+ while (1)
+ {
+ int res= ndb_mgm_alloc_nodeid(m_handle, m_version, m_node_type);
+ if(res >= 0)
+ return _ownNodeId= (Uint32)res;
+ if (no_retries == 0)
+ break;
+ no_retries--;
+ NdbSleep_SecSleep(retry_delay_in_seconds);
}
- }
-
- return _ownNodeId= nodeid;
+ setError(CR_ERROR, ndb_mgm_get_latest_error_desc(m_handle));
+ } else
+ setError(CR_ERROR, "management server handle not initialized");
+ return 0;
}
diff --git a/ndb/src/common/util/version.c b/ndb/src/common/util/version.c
index 965d0a735e1..7a537297861 100644
--- a/ndb/src/common/util/version.c
+++ b/ndb/src/common/util/version.c
@@ -70,7 +70,6 @@ struct NdbUpGradeCompatible {
#ifndef TEST_VERSION
struct NdbUpGradeCompatible ndbCompatibleTable_full[] = {
{ MAKE_VERSION(3,5,2), MAKE_VERSION(3,5,1), UG_Exact },
- { MAKE_VERSION(4,1,8), MAKE_VERSION(3,5,4), UG_Exact }, /* Aligned version with MySQL */
{ 0, 0, UG_Null }
};
diff --git a/ndb/src/kernel/blocks/dbdih/Dbdih.hpp b/ndb/src/kernel/blocks/dbdih/Dbdih.hpp
index 14fa262f871..0a2d50cb876 100644
--- a/ndb/src/kernel/blocks/dbdih/Dbdih.hpp
+++ b/ndb/src/kernel/blocks/dbdih/Dbdih.hpp
@@ -147,7 +147,6 @@ public:
Uint32 nfConnect;
Uint32 table;
Uint32 userpointer;
- Uint32 nodeCount;
BlockReference userblockref;
};
typedef Ptr<ConnectRecord> ConnectRecordPtr;
diff --git a/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp b/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp
index 76aa745c3e0..4592b121c7e 100644
--- a/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp
+++ b/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp
@@ -7080,24 +7080,22 @@ void Dbdih::execDIGETPRIMREQ(Signal* signal)
ndbrequire(tabPtr.p->tabStatus == TabRecord::TS_ACTIVE);
connectPtr.i = signal->theData[0];
- if(connectPtr.i != RNIL){
+ if(connectPtr.i != RNIL)
+ {
jam();
ptrCheckGuard(connectPtr, cconnectFileSize, connectRecord);
- ndbrequire(connectPtr.p->connectState == ConnectRecord::INUSE);
- getFragstore(tabPtr.p, fragId, fragPtr);
- connectPtr.p->nodeCount = extractNodeInfo(fragPtr.p, connectPtr.p->nodes);
signal->theData[0] = connectPtr.p->userpointer;
- signal->theData[1] = passThrough;
- signal->theData[2] = connectPtr.p->nodes[0];
- sendSignal(connectPtr.p->userblockref, GSN_DIGETPRIMCONF, signal, 3, JBB);
- return;
- }//if
- //connectPtr.i == RNIL -> question without connect record
+ }
+ else
+ {
+ jam();
+ signal->theData[0] = RNIL;
+ }
+
Uint32 nodes[MAX_REPLICAS];
getFragstore(tabPtr.p, fragId, fragPtr);
Uint32 count = extractNodeInfo(fragPtr.p, nodes);
- signal->theData[0] = RNIL;
signal->theData[1] = passThrough;
signal->theData[2] = nodes[0];
signal->theData[3] = nodes[1];
diff --git a/ndb/src/kernel/blocks/dblqh/Dblqh.hpp b/ndb/src/kernel/blocks/dblqh/Dblqh.hpp
index 739c3c741fb..0c63cb5fe17 100644
--- a/ndb/src/kernel/blocks/dblqh/Dblqh.hpp
+++ b/ndb/src/kernel/blocks/dblqh/Dblqh.hpp
@@ -550,6 +550,11 @@ public:
UintR scanErrorCounter;
UintR scanLocalFragid;
UintR scanSchemaVersion;
+
+ /**
+ * This is _always_ main table, even in range scan
+ * in which case scanTcrec->fragmentptr is different
+ */
Uint32 fragPtrI;
UintR scanStoredProcId;
ScanState scanState;
@@ -2925,4 +2930,23 @@ Dblqh::ScanRecord::check_scan_batch_completed() const
(max_bytes > 0 && (m_curr_batch_size_bytes >= max_bytes));
}
+inline
+void
+Dblqh::i_get_acc_ptr(ScanRecord* scanP, Uint32* &acc_ptr, Uint32 index)
+{
+ if (index == 0) {
+ acc_ptr= (Uint32*)&scanP->scan_acc_op_ptr[0];
+ } else {
+ Uint32 attr_buf_index, attr_buf_rec;
+
+ AttrbufPtr regAttrPtr;
+ jam();
+ attr_buf_rec= (index + 31) / 32;
+ attr_buf_index= (index - 1) & 31;
+ regAttrPtr.i= scanP->scan_acc_op_ptr[attr_buf_rec];
+ ptrCheckGuard(regAttrPtr, cattrinbufFileSize, attrbuf);
+ acc_ptr= (Uint32*)&regAttrPtr.p->attrbuf[attr_buf_index];
+ }
+}
+
#endif
diff --git a/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp b/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
index 5622706a96c..88e8f25b004 100644
--- a/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
+++ b/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
@@ -3084,6 +3084,7 @@ void Dblqh::execATTRINFO(Signal* signal)
return;
break;
default:
+ ndbout_c("%d", regTcPtr->transactionState);
ndbrequire(false);
break;
}//switch
@@ -7161,10 +7162,7 @@ void Dblqh::continueScanNextReqLab(Signal* signal)
// Update timer on tcConnectRecord
tcConnectptr.p->tcTimer = cLqhTimeOutCount;
-
init_acc_ptr_list(scanptr.p);
- scanptr.p->m_curr_batch_size_rows = 0;
- scanptr.p->m_curr_batch_size_bytes= 0;
scanptr.p->scanFlag = NextScanReq::ZSCAN_NEXT;
scanNextLoopLab(signal);
}//Dblqh::continueScanNextReqLab()
@@ -7363,22 +7361,32 @@ void Dblqh::scanLockReleasedLab(Signal* signal)
tcConnectptr.i = scanptr.p->scanTcrec;
ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec);
releaseActiveFrag(signal);
+
if (scanptr.p->scanReleaseCounter == scanptr.p->m_curr_batch_size_rows) {
if ((scanptr.p->scanErrorCounter > 0) ||
(scanptr.p->scanCompletedStatus == ZTRUE)) {
jam();
+ scanptr.p->m_curr_batch_size_rows = 0;
+ scanptr.p->m_curr_batch_size_bytes = 0;
closeScanLab(signal);
} else if (scanptr.p->check_scan_batch_completed() &&
scanptr.p->scanLockHold != ZTRUE) {
jam();
scanptr.p->scanState = ScanRecord::WAIT_SCAN_NEXTREQ;
sendScanFragConf(signal, ZFALSE);
+ } else if (scanptr.p->m_last_row && !scanptr.p->scanLockHold) {
+ jam();
+ closeScanLab(signal);
+ return;
} else {
jam();
/*
- We came here after releasing locks after receiving SCAN_NEXTREQ from TC. We only
- come here when scanHoldLock == ZTRUE
- */
+ * We came here after releasing locks after
+ * receiving SCAN_NEXTREQ from TC. We only come here
+ * when scanHoldLock == ZTRUE
+ */
+ scanptr.p->m_curr_batch_size_rows = 0;
+ scanptr.p->m_curr_batch_size_bytes = 0;
continueScanNextReqLab(signal);
}//if
} else if (scanptr.p->scanReleaseCounter < scanptr.p->m_curr_batch_size_rows) {
@@ -7465,25 +7473,6 @@ Dblqh::init_acc_ptr_list(ScanRecord* scanP)
scanP->scan_acc_index = 0;
}
-inline
-void
-Dblqh::i_get_acc_ptr(ScanRecord* scanP, Uint32* &acc_ptr, Uint32 index)
-{
- if (index == 0) {
- acc_ptr= (Uint32*)&scanP->scan_acc_op_ptr[0];
- } else {
- Uint32 attr_buf_index, attr_buf_rec;
-
- AttrbufPtr regAttrPtr;
- jam();
- attr_buf_rec= (index + 31) / 32;
- attr_buf_index= (index - 1) & 31;
- regAttrPtr.i= scanP->scan_acc_op_ptr[attr_buf_rec];
- ptrCheckGuard(regAttrPtr, cattrinbufFileSize, attrbuf);
- acc_ptr= (Uint32*)&regAttrPtr.p->attrbuf[attr_buf_index];
- }
-}
-
Uint32
Dblqh::get_acc_ptr_from_scan_record(ScanRecord* scanP,
Uint32 index,
@@ -7714,6 +7703,9 @@ void Dblqh::abort_scan(Signal* signal, Uint32 scan_ptr_i, Uint32 errcode){
jam();
scanptr.i = scan_ptr_i;
c_scanRecordPool.getPtr(scanptr);
+
+ fragptr.i = tcConnectptr.p->fragmentptr;
+ ptrCheckGuard(fragptr, cfragrecFileSize, fragrecord);
finishScanrec(signal);
releaseScanrec(signal);
tcConnectptr.p->transactionState = TcConnectionrec::IDLE;
@@ -8007,6 +7999,13 @@ void Dblqh::nextScanConfScanLab(Signal* signal)
/*************************************************************
* STOP THE SCAN PROCESS IF THIS HAS BEEN REQUESTED.
************************************************************ */
+ if (!scanptr.p->scanLockHold)
+ {
+ jam();
+ closeScanLab(signal);
+ return;
+ }
+
if (scanptr.p->scanCompletedStatus == ZTRUE) {
if ((scanptr.p->scanLockHold == ZTRUE) &&
(scanptr.p->m_curr_batch_size_rows > 0)) {
@@ -8507,8 +8506,6 @@ void Dblqh::tupScanCloseConfLab(Signal* signal)
ScanFragRef::SignalLength, JBB);
} else {
jam();
- scanptr.p->m_curr_batch_size_rows = 0;
- scanptr.p->m_curr_batch_size_bytes= 0;
sendScanFragConf(signal, ZSCAN_FRAG_CLOSED);
}//if
finishScanrec(signal);
@@ -8576,10 +8573,12 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq)
/**
* Used for scan take over
*/
- FragrecordPtr tFragPtr;
- tFragPtr.i = fragptr.p->tableFragptr;
- ptrCheckGuard(tFragPtr, cfragrecFileSize, fragrecord);
- scanptr.p->fragPtrI = fragptr.p->tableFragptr;
+ {
+ FragrecordPtr tFragPtr;
+ tFragPtr.i = fragptr.p->tableFragptr;
+ ptrCheckGuard(tFragPtr, cfragrecFileSize, fragrecord);
+ scanptr.p->fragPtrI = fragptr.p->tableFragptr;
+ }
/**
* !idx uses 1 - (MAX_PARALLEL_SCANS_PER_FRAG - 1) = 1-11
@@ -8588,8 +8587,8 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq)
Uint32 start = (idx ? MAX_PARALLEL_SCANS_PER_FRAG : 1 );
Uint32 stop = (idx ? MAX_PARALLEL_INDEX_SCANS_PER_FRAG : MAX_PARALLEL_SCANS_PER_FRAG - 1);
stop += start;
- Uint32 free = tFragPtr.p->m_scanNumberMask.find(start);
-
+ Uint32 free = fragptr.p->m_scanNumberMask.find(start);
+
if(free == Fragrecord::ScanNumberMask::NotFound || free >= stop){
jam();
@@ -8603,16 +8602,16 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq)
*/
scanptr.p->scanState = ScanRecord::IN_QUEUE;
LocalDLFifoList<ScanRecord> queue(c_scanRecordPool,
- tFragPtr.p->m_queuedScans);
+ fragptr.p->m_queuedScans);
queue.add(scanptr);
return ZOK;
}
-
+
scanptr.p->scanNumber = free;
- tFragPtr.p->m_scanNumberMask.clear(free);// Update mask
+ fragptr.p->m_scanNumberMask.clear(free);// Update mask
- LocalDLList<ScanRecord> active(c_scanRecordPool, tFragPtr.p->m_activeScans);
+ LocalDLList<ScanRecord> active(c_scanRecordPool, fragptr.p->m_activeScans);
active.add(scanptr);
if(scanptr.p->scanKeyinfoFlag){
jam();
@@ -8672,12 +8671,8 @@ void Dblqh::finishScanrec(Signal* signal)
{
release_acc_ptr_list(scanptr.p);
- FragrecordPtr tFragPtr;
- tFragPtr.i = scanptr.p->fragPtrI;
- ptrCheckGuard(tFragPtr, cfragrecFileSize, fragrecord);
-
LocalDLFifoList<ScanRecord> queue(c_scanRecordPool,
- tFragPtr.p->m_queuedScans);
+ fragptr.p->m_queuedScans);
if(scanptr.p->scanState == ScanRecord::IN_QUEUE){
jam();
@@ -8695,11 +8690,11 @@ void Dblqh::finishScanrec(Signal* signal)
ndbrequire(tmp.p == scanptr.p);
}
- LocalDLList<ScanRecord> scans(c_scanRecordPool, tFragPtr.p->m_activeScans);
+ LocalDLList<ScanRecord> scans(c_scanRecordPool, fragptr.p->m_activeScans);
scans.release(scanptr);
const Uint32 scanNumber = scanptr.p->scanNumber;
- ndbrequire(!tFragPtr.p->m_scanNumberMask.get(scanNumber));
+ ndbrequire(!fragptr.p->m_scanNumberMask.get(scanNumber));
ScanRecordPtr restart;
/**
@@ -8707,13 +8702,13 @@ void Dblqh::finishScanrec(Signal* signal)
*/
if(scanNumber == NR_ScanNo || !queue.first(restart)){
jam();
- tFragPtr.p->m_scanNumberMask.set(scanNumber);
+ fragptr.p->m_scanNumberMask.set(scanNumber);
return;
}
if(ERROR_INSERTED(5034)){
jam();
- tFragPtr.p->m_scanNumberMask.set(scanNumber);
+ fragptr.p->m_scanNumberMask.set(scanNumber);
return;
}
@@ -8724,7 +8719,7 @@ void Dblqh::finishScanrec(Signal* signal)
ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec);
restart.p->scanNumber = scanNumber;
restart.p->scanState = ScanRecord::WAIT_ACC_SCAN;
-
+
queue.remove(restart);
scans.add(restart);
if(restart.p->scanKeyinfoFlag){
@@ -8912,6 +8907,13 @@ void Dblqh::sendScanFragConf(Signal* signal, Uint32 scanCompleted)
conf->total_len= total_len;
sendSignal(tcConnectptr.p->clientBlockref, GSN_SCAN_FRAGCONF,
signal, ScanFragConf::SignalLength, JBB);
+
+ if(!scanptr.p->scanLockHold)
+ {
+ jam();
+ scanptr.p->m_curr_batch_size_rows = 0;
+ scanptr.p->m_curr_batch_size_bytes= 0;
+ }
}//Dblqh::sendScanFragConf()
/* ######################################################################### */
diff --git a/ndb/src/kernel/blocks/dbtc/Dbtc.hpp b/ndb/src/kernel/blocks/dbtc/Dbtc.hpp
index a209df24c44..fb90ccc8c90 100644
--- a/ndb/src/kernel/blocks/dbtc/Dbtc.hpp
+++ b/ndb/src/kernel/blocks/dbtc/Dbtc.hpp
@@ -1054,9 +1054,8 @@ public:
// Id of the ScanRecord this fragment scan belongs to
Uint32 scanRec;
- // The maximum number of operations that can be scanned before
- // returning to TC
- Uint16 scanFragConcurrency;
+ // The value of fragmentCompleted in the last received SCAN_FRAGCONF
+ Uint8 m_scan_frag_conf_status;
inline void startFragTimer(Uint32 timeVal){
scanFragTimer = timeVal;
@@ -1193,8 +1192,10 @@ public:
// Number of operation records per scanned fragment
// Number of operations in first batch
// Max number of bytes per batch
- Uint16 noOprecPerFrag;
- Uint16 first_batch_size;
+ union {
+ Uint16 first_batch_size_rows;
+ Uint16 batch_size_rows;
+ };
Uint32 batch_byte_size;
Uint32 scanRequestInfo; // ScanFrag format
diff --git a/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp b/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp
index d8b3ee10532..07dbb370ec6 100644
--- a/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp
+++ b/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp
@@ -8646,9 +8646,9 @@ void Dbtc::initScanrec(ScanRecordPtr scanptr,
scanptr.p->scanTableref = tabptr.i;
scanptr.p->scanSchemaVersion = scanTabReq->tableSchemaVersion;
scanptr.p->scanParallel = scanParallel;
- scanptr.p->noOprecPerFrag = noOprecPerFrag;
- scanptr.p->first_batch_size= scanTabReq->first_batch_size;
- scanptr.p->batch_byte_size= scanTabReq->batch_byte_size;
+ scanptr.p->first_batch_size_rows = scanTabReq->first_batch_size;
+ scanptr.p->batch_byte_size = scanTabReq->batch_byte_size;
+ scanptr.p->batch_size_rows = noOprecPerFrag;
Uint32 tmp = 0;
const UintR ri = scanTabReq->requestInfo;
@@ -8672,7 +8672,6 @@ void Dbtc::initScanrec(ScanRecordPtr scanptr,
ndbrequire(list.seize(ptr));
ptr.p->scanRec = scanptr.i;
ptr.p->scanFragId = 0;
- ptr.p->scanFragConcurrency = noOprecPerFrag;
ptr.p->m_apiPtr = cdata[i];
}//for
@@ -8945,6 +8944,25 @@ void Dbtc::execDIGETPRIMCONF(Signal* signal)
scanptr.i = scanFragptr.p->scanRec;
ptrCheckGuard(scanptr, cscanrecFileSize, scanRecord);
+ /**
+ * This must be false as select count(*) otherwise
+ * can "pass" committing on backup fragments and
+ * get incorrect row count
+ */
+ if(false && ScanFragReq::getReadCommittedFlag(scanptr.p->scanRequestInfo))
+ {
+ jam();
+ Uint32 max = 3+signal->theData[6];
+ Uint32 nodeid = getOwnNodeId();
+ for(Uint32 i = 3; i<max; i++)
+ if(signal->theData[i] == nodeid)
+ {
+ jam();
+ tnodeid = nodeid;
+ break;
+ }
+ }
+
{
/**
* Check table
@@ -9141,6 +9159,7 @@ void Dbtc::execSCAN_FRAGCONF(Signal* signal)
const ScanFragConf * const conf = (ScanFragConf*)&signal->theData[0];
const Uint32 noCompletedOps = conf->completedOps;
+ const Uint32 status = conf->fragmentCompleted;
scanFragptr.i = conf->senderData;
c_scan_frag_pool.getPtr(scanFragptr);
@@ -9163,11 +9182,9 @@ void Dbtc::execSCAN_FRAGCONF(Signal* signal)
ndbrequire(scanFragptr.p->scanFragState == ScanFragRec::LQH_ACTIVE);
- const Uint32 status = conf->fragmentCompleted;
-
if(scanptr.p->scanState == ScanRecord::CLOSING_SCAN){
jam();
- if(status == ZFALSE){
+ if(status == 0){
/**
* We have started closing = we sent a close -> ignore this
*/
@@ -9184,11 +9201,11 @@ void Dbtc::execSCAN_FRAGCONF(Signal* signal)
return;
}
- if(status == ZCLOSED && scanptr.p->scanNextFragId < scanptr.p->scanNoFrag){
+ if(noCompletedOps == 0 && status != 0 &&
+ scanptr.p->scanNextFragId < scanptr.p->scanNoFrag){
/**
* Start on next fragment
*/
- ndbrequire(noCompletedOps == 0);
scanFragptr.p->scanFragState = ScanFragRec::WAIT_GET_PRIMCONF;
scanFragptr.p->startFragTimer(ctcTimer);
@@ -9218,6 +9235,7 @@ void Dbtc::execSCAN_FRAGCONF(Signal* signal)
scanptr.p->m_queued_count++;
}
+ scanFragptr.p->m_scan_frag_conf_status = status;
scanFragptr.p->m_ops = noCompletedOps;
scanFragptr.p->m_totalLen = total_len;
scanFragptr.p->scanFragState = ScanFragRec::QUEUED_FOR_DELIVERY;
@@ -9311,7 +9329,6 @@ void Dbtc::execSCAN_NEXTREQ(Signal* signal)
/*********************************************************************
* APPLICATION IS CLOSING THE SCAN.
**********************************************************************/
- ndbrequire(len == 0);
close_scan_req(signal, scanptr, true);
return;
}//if
@@ -9330,11 +9347,12 @@ void Dbtc::execSCAN_NEXTREQ(Signal* signal)
// Copy op ptrs so I dont overwrite them when sending...
memcpy(signal->getDataPtrSend()+25, signal->getDataPtr()+4, 4 * len);
- ScanFragNextReq * nextReq = (ScanFragNextReq*)&signal->theData[0];
- nextReq->closeFlag = ZFALSE;
- nextReq->transId1 = apiConnectptr.p->transid[0];
- nextReq->transId2 = apiConnectptr.p->transid[1];
- nextReq->batch_size_bytes= scanP->batch_byte_size;
+ ScanFragNextReq tmp;
+ tmp.closeFlag = ZFALSE;
+ tmp.transId1 = apiConnectptr.p->transid[0];
+ tmp.transId2 = apiConnectptr.p->transid[1];
+ tmp.batch_size_rows = scanP->batch_size_rows;
+ tmp.batch_size_bytes = scanP->batch_byte_size;
ScanFragList running(c_scan_frag_pool, scanP->m_running_scan_frags);
ScanFragList delivered(c_scan_frag_pool, scanP->m_delivered_scan_frags);
@@ -9344,15 +9362,37 @@ void Dbtc::execSCAN_NEXTREQ(Signal* signal)
c_scan_frag_pool.getPtr(scanFragptr);
ndbrequire(scanFragptr.p->scanFragState == ScanFragRec::DELIVERED);
- scanFragptr.p->scanFragState = ScanFragRec::LQH_ACTIVE;
scanFragptr.p->startFragTimer(ctcTimer);
-
scanFragptr.p->m_ops = 0;
- nextReq->senderData = scanFragptr.i;
- nextReq->batch_size_rows= scanFragptr.p->scanFragConcurrency;
- sendSignal(scanFragptr.p->lqhBlockref, GSN_SCAN_NEXTREQ, signal,
- ScanFragNextReq::SignalLength, JBB);
+ if(scanFragptr.p->m_scan_frag_conf_status)
+ {
+ /**
+ * last scan was complete
+ */
+ jam();
+ ndbrequire(scanptr.p->scanNextFragId < scanptr.p->scanNoFrag);
+ scanFragptr.p->scanFragState = ScanFragRec::WAIT_GET_PRIMCONF;
+
+ tcConnectptr.i = scanptr.p->scanTcrec;
+ ptrCheckGuard(tcConnectptr, ctcConnectFilesize, tcConnectRecord);
+ scanFragptr.p->scanFragId = scanptr.p->scanNextFragId++;
+ signal->theData[0] = tcConnectptr.p->dihConnectptr;
+ signal->theData[1] = scanFragptr.i;
+ signal->theData[2] = scanptr.p->scanTableref;
+ signal->theData[3] = scanFragptr.p->scanFragId;
+ sendSignal(cdihblockref, GSN_DIGETPRIMREQ, signal, 4, JBB);
+ }
+ else
+ {
+ jam();
+ scanFragptr.p->scanFragState = ScanFragRec::LQH_ACTIVE;
+ ScanFragNextReq * req = (ScanFragNextReq*)signal->getDataPtrSend();
+ * req = tmp;
+ req->senderData = scanFragptr.i;
+ sendSignal(scanFragptr.p->lqhBlockref, GSN_SCAN_NEXTREQ, signal,
+ ScanFragNextReq::SignalLength, JBB);
+ }
delivered.remove(scanFragptr);
running.add(scanFragptr);
}//for
@@ -9416,7 +9456,7 @@ Dbtc::close_scan_req(Signal* signal, ScanRecordPtr scanPtr, bool req_received){
ndbrequire(curr.p->scanFragState == ScanFragRec::DELIVERED);
delivered.remove(curr);
- if(curr.p->m_ops > 0){
+ if(curr.p->m_ops > 0 && curr.p->m_scan_frag_conf_status == 0){
jam();
running.add(curr);
curr.p->scanFragState = ScanFragRec::LQH_ACTIVE;
@@ -9551,7 +9591,7 @@ void Dbtc::sendScanFragReq(Signal* signal,
req->transId1 = apiConnectptr.p->transid[0];
req->transId2 = apiConnectptr.p->transid[1];
req->clientOpPtr = scanFragP->m_apiPtr;
- req->batch_size_rows= scanFragP->scanFragConcurrency;
+ req->batch_size_rows= scanP->batch_size_rows;
req->batch_size_bytes= scanP->batch_byte_size;
sendSignal(scanFragP->lqhBlockref, GSN_SCAN_FRAGREQ, signal,
ScanFragReq::SignalLength, JBB);
@@ -9573,6 +9613,8 @@ void Dbtc::sendScanTabConf(Signal* signal, ScanRecordPtr scanPtr) {
jam();
ops += 21;
}
+
+ Uint32 left = scanPtr.p->scanNoFrag - scanPtr.p->scanNextFragId;
ScanTabConf * conf = (ScanTabConf*)&signal->theData[0];
conf->apiConnectPtr = apiConnectptr.p->ndbapiConnect;
@@ -9588,24 +9630,25 @@ void Dbtc::sendScanTabConf(Signal* signal, ScanRecordPtr scanPtr) {
ScanFragRecPtr curr = ptr; // Remove while iterating...
queued.next(ptr);
+ bool done = curr.p->m_scan_frag_conf_status && --left;
+
* ops++ = curr.p->m_apiPtr;
- * ops++ = curr.i;
+ * ops++ = done ? RNIL : curr.i;
* ops++ = (curr.p->m_totalLen << 10) + curr.p->m_ops;
queued.remove(curr);
- if(curr.p->m_ops > 0){
+ if(!done){
delivered.add(curr);
curr.p->scanFragState = ScanFragRec::DELIVERED;
curr.p->stopFragTimer();
} else {
- (* --ops) = ScanTabConf::EndOfData; ops++;
c_scan_frag_pool.release(curr);
curr.p->scanFragState = ScanFragRec::COMPLETED;
curr.p->stopFragTimer();
}
}
}
-
+
if(scanPtr.p->m_delivered_scan_frags.isEmpty() &&
scanPtr.p->m_running_scan_frags.isEmpty()){
conf->requestInfo = op_count | ScanTabConf::EndOfData;
@@ -10424,9 +10467,8 @@ Dbtc::execDUMP_STATE_ORD(Signal* signal)
sfp.i,
sfp.p->scanFragState,
sfp.p->scanFragId);
- infoEvent(" nodeid=%d, concurr=%d, timer=%d",
+ infoEvent(" nodeid=%d, timer=%d",
refToNode(sfp.p->lqhBlockref),
- sfp.p->scanFragConcurrency,
sfp.p->scanFragTimer);
}
@@ -10504,7 +10546,7 @@ Dbtc::execDUMP_STATE_ORD(Signal* signal)
sp.p->scanAiLength,
sp.p->scanParallel,
sp.p->scanReceivedOperations,
- sp.p->noOprecPerFrag);
+ sp.p->batch_size_rows);
infoEvent(" schv=%d, tab=%d, sproc=%d",
sp.p->scanSchemaVersion,
sp.p->scanTableref,
diff --git a/ndb/src/kernel/blocks/suma/Suma.cpp b/ndb/src/kernel/blocks/suma/Suma.cpp
index d11d5f7176a..f6d9a0ac35a 100644
--- a/ndb/src/kernel/blocks/suma/Suma.cpp
+++ b/ndb/src/kernel/blocks/suma/Suma.cpp
@@ -1888,7 +1888,7 @@ SumaParticipant::SyncRecord::nextScan(Signal* signal){
req->requestInfo = 0;
req->savePointId = 0;
ScanFragReq::setLockMode(req->requestInfo, 0);
- ScanFragReq::setHoldLockFlag(req->requestInfo, 0);
+ ScanFragReq::setHoldLockFlag(req->requestInfo, 1);
ScanFragReq::setKeyinfoFlag(req->requestInfo, 0);
ScanFragReq::setAttrLen(req->requestInfo, attrLen);
req->fragmentNoKeyLen = fd.m_fragDesc.m_fragmentNo;
diff --git a/ndb/src/kernel/main.cpp b/ndb/src/kernel/main.cpp
index 926647838c9..f34e16318cd 100644
--- a/ndb/src/kernel/main.cpp
+++ b/ndb/src/kernel/main.cpp
@@ -19,7 +19,6 @@
#include <ndb_version.h>
#include "Configuration.hpp"
-#include <LocalConfig.hpp>
#include <TransporterRegistry.hpp>
#include "vm/SimBlockList.hpp"
@@ -69,16 +68,9 @@ int main(int argc, char** argv)
return NRT_Default;
}
- LocalConfig local_config;
- if (!local_config.init(theConfig->getConnectString(),0)){
- local_config.printError();
- local_config.printUsage();
- return NRT_Default;
- }
-
{ // Do configuration
signal(SIGPIPE, SIG_IGN);
- theConfig->fetch_configuration(local_config);
+ theConfig->fetch_configuration();
}
chdir(NdbConfig_get_path(0));
@@ -141,7 +133,7 @@ int main(int argc, char** argv)
exit(0);
}
g_eventLogger.info("Ndb has terminated (pid %d) restarting", child);
- theConfig->fetch_configuration(local_config);
+ theConfig->fetch_configuration();
}
g_eventLogger.info("Angel pid: %d ndb pid: %d", getppid(), getpid());
diff --git a/ndb/src/kernel/vm/Configuration.cpp b/ndb/src/kernel/vm/Configuration.cpp
index aac035fe1b7..931b4da5a17 100644
--- a/ndb/src/kernel/vm/Configuration.cpp
+++ b/ndb/src/kernel/vm/Configuration.cpp
@@ -17,7 +17,6 @@
#include <ndb_global.h>
#include <ndb_opts.h>
-#include <LocalConfig.hpp>
#include "Configuration.hpp"
#include <ErrorHandlingMacros.hpp>
#include "GlobalData.hpp"
@@ -35,6 +34,7 @@
#include <kernel_types.h>
#include <ndb_limits.h>
+#include <ndbapi_limits.h>
#include "pc.hpp"
#include <LogLevel.hpp>
#include <NdbSleep.h>
@@ -108,7 +108,7 @@ get_one_option(int optid, const struct my_option *opt __attribute__((unused)),
bool
Configuration::init(int argc, char** argv)
{
- const char *load_default_groups[]= { "ndbd",0 };
+ const char *load_default_groups[]= { "mysql_cluster","ndbd",0 };
load_defaults("my",load_default_groups,&argc,&argv);
int ho_error;
@@ -189,7 +189,7 @@ Configuration::closeConfiguration(){
}
void
-Configuration::fetch_configuration(LocalConfig &local_config){
+Configuration::fetch_configuration(){
/**
* Fetch configuration from management server
*/
@@ -199,8 +199,17 @@ Configuration::fetch_configuration(LocalConfig &local_config){
m_mgmd_port= 0;
m_mgmd_host= 0;
- m_config_retriever= new ConfigRetriever(local_config, NDB_VERSION, NODE_TYPE_DB);
- if(m_config_retriever->do_connect() == -1){
+ m_config_retriever= new ConfigRetriever(getConnectString(),
+ NDB_VERSION, NODE_TYPE_DB);
+
+ if (m_config_retriever->hasError())
+ {
+ ERROR_SET(fatal, ERR_INVALID_CONFIG,
+ "Could not connect initialize handle to management server",
+ m_config_retriever->getErrorString());
+ }
+
+ if(m_config_retriever->do_connect(12,5,1) == -1){
const char * s = m_config_retriever->getErrorString();
if(s == 0)
s = "No error given!";
@@ -215,13 +224,7 @@ Configuration::fetch_configuration(LocalConfig &local_config){
ConfigRetriever &cr= *m_config_retriever;
- if((globalData.ownId = cr.allocNodeId()) == 0){
- for(Uint32 i = 0; i<3; i++){
- NdbSleep_SecSleep(3);
- if((globalData.ownId = cr.allocNodeId()) != 0)
- break;
- }
- }
+ globalData.ownId = cr.allocNodeId(2 /*retry*/,3 /*delay*/);
if(globalData.ownId == 0){
ERROR_SET(fatal, ERR_INVALID_CONFIG,
@@ -452,6 +455,7 @@ Configuration::calcSizeAlt(ConfigValues * ownConfig){
unsigned int noOfTables = 0;
unsigned int noOfUniqueHashIndexes = 0;
unsigned int noOfOrderedIndexes = 0;
+ unsigned int noOfTriggers = 0;
unsigned int noOfReplicas = 0;
unsigned int noOfDBNodes = 0;
unsigned int noOfAPINodes = 0;
@@ -476,6 +480,7 @@ Configuration::calcSizeAlt(ConfigValues * ownConfig){
{ CFG_DB_NO_TABLES, &noOfTables, false },
{ CFG_DB_NO_ORDERED_INDEXES, &noOfOrderedIndexes, false },
{ CFG_DB_NO_UNIQUE_HASH_INDEXES, &noOfUniqueHashIndexes, false },
+ { CFG_DB_NO_TRIGGERS, &noOfTriggers, true },
{ CFG_DB_NO_REPLICAS, &noOfReplicas, false },
{ CFG_DB_NO_ATTRIBUTES, &noOfAttributes, false },
{ CFG_DB_NO_OPS, &noOfOperations, false },
@@ -584,6 +589,18 @@ Configuration::calcSizeAlt(ConfigValues * ownConfig){
ConfigValues::Iterator it2(*ownConfig, db.m_config);
it2.set(CFG_DB_NO_TABLES, noOfTables);
it2.set(CFG_DB_NO_ATTRIBUTES, noOfAttributes);
+ {
+ Uint32 neededNoOfTriggers = /* types: Insert/Update/Delete/Custom */
+ 3 * noOfUniqueHashIndexes + /* for unique hash indexes, I/U/D */
+ 3 * NDB_MAX_ACTIVE_EVENTS + /* for events in suma, I/U/D */
+ 3 * noOfTables + /* for backup, I/U/D */
+ noOfOrderedIndexes; /* for ordered indexes, C */
+ if (noOfTriggers < neededNoOfTriggers)
+ {
+ noOfTriggers= neededNoOfTriggers;
+ it2.set(CFG_DB_NO_TRIGGERS, noOfTriggers);
+ }
+ }
/**
* Do size calculations
diff --git a/ndb/src/kernel/vm/Configuration.hpp b/ndb/src/kernel/vm/Configuration.hpp
index e4cd64f5ca8..acf0e163a84 100644
--- a/ndb/src/kernel/vm/Configuration.hpp
+++ b/ndb/src/kernel/vm/Configuration.hpp
@@ -21,7 +21,6 @@
#include <ndb_types.h>
class ConfigRetriever;
-class LocalConfig;
class Configuration {
public:
@@ -33,7 +32,7 @@ public:
*/
bool init(int argc, char** argv);
- void fetch_configuration(LocalConfig &local_config);
+ void fetch_configuration();
void setupConfiguration();
void closeConfiguration();
diff --git a/ndb/src/mgmapi/LocalConfig.cpp b/ndb/src/mgmapi/LocalConfig.cpp
index d0ff97cdedf..8f1e2ee8100 100644
--- a/ndb/src/mgmapi/LocalConfig.cpp
+++ b/ndb/src/mgmapi/LocalConfig.cpp
@@ -14,7 +14,7 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
-#include <LocalConfig.hpp>
+#include "LocalConfig.hpp"
#include <NdbEnv.h>
#include <NdbConfig.h>
#include <NdbAutoPtr.hpp>
@@ -294,4 +294,19 @@ LocalConfig::readConnectString(const char * connectString,
return return_value;
}
+char *
+LocalConfig::makeConnectString(char *buf, int sz)
+{
+ int p= BaseString::snprintf(buf,sz,"nodeid=%d", _ownNodeId);
+ for (int i = 0; (i < ids.size()) && (sz-p > 0); i++)
+ {
+ if (ids[i].type != MgmId_TCP)
+ continue;
+ p+=BaseString::snprintf(buf+p,sz-p,",%s:%d",
+ ids[i].name.c_str(), ids[i].port);
+ }
+ buf[sz-1]=0;
+ return buf;
+}
+
template class Vector<MgmtSrvrId>;
diff --git a/ndb/src/mgmapi/LocalConfig.hpp b/ndb/src/mgmapi/LocalConfig.hpp
new file mode 100644
index 00000000000..c415ec1be91
--- /dev/null
+++ b/ndb/src/mgmapi/LocalConfig.hpp
@@ -0,0 +1,68 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#ifndef LocalConfig_H
+#define LocalConfig_H
+
+#include <ndb_global.h>
+#include <NdbOut.hpp>
+
+//****************************************************************************
+// Description: The class LocalConfig corresponds to the information possible
+// to give in the local configuration file.
+//*****************************************************************************
+
+enum MgmtSrvrId_Type {
+ MgmId_TCP = 0,
+ MgmId_File = 1
+};
+
+struct MgmtSrvrId {
+ MgmtSrvrId_Type type;
+ BaseString name;
+ unsigned int port;
+};
+
+struct LocalConfig {
+
+ int _ownNodeId;
+ Vector<MgmtSrvrId> ids;
+
+ int error_line;
+ char error_msg[256];
+
+ LocalConfig();
+ ~LocalConfig();
+ bool init(const char *connectString = 0,
+ const char *fileName = 0);
+
+ void printError() const;
+ void printUsage() const;
+
+ void setError(int lineNumber, const char * _msg);
+ bool readConnectString(const char *, const char *info);
+ bool readFile(const char * file, bool &fopenError);
+ bool parseLine(char * line, int lineNumber);
+
+ bool parseNodeId(const char *buf);
+ bool parseHostName(const char *buf);
+ bool parseFileName(const char *buf);
+ bool parseString(const char *buf, BaseString &err);
+ char * makeConnectString(char *buf, int sz);
+};
+
+#endif // LocalConfig_H
+
diff --git a/ndb/src/mgmapi/mgmapi.cpp b/ndb/src/mgmapi/mgmapi.cpp
index 51f2d7cee01..ca3a2a2186d 100644
--- a/ndb/src/mgmapi/mgmapi.cpp
+++ b/ndb/src/mgmapi/mgmapi.cpp
@@ -20,6 +20,7 @@
#include <LocalConfig.hpp>
#include <NdbAutoPtr.hpp>
+#include <NdbSleep.h>
#include <NdbTCP.h>
#include "mgmapi.h"
#include "mgmapi_debug.h"
@@ -83,8 +84,8 @@ typedef Parser<ParserDummy> Parser_t;
#define NDB_MGM_MAX_ERR_DESC_SIZE 256
struct ndb_mgm_handle {
- char * hostname;
- unsigned short port;
+ char * connectstring;
+ int cfg_i;
int connected;
int last_error;
@@ -95,7 +96,7 @@ struct ndb_mgm_handle {
NDB_SOCKET_TYPE socket;
- char cfg_ptr[sizeof(LocalConfig)];
+ LocalConfig cfg;
#ifdef MGMAPI_LOG
FILE* logfile;
@@ -148,14 +149,16 @@ ndb_mgm_create_handle()
h->connected = 0;
h->last_error = 0;
h->last_error_line = 0;
- h->hostname = 0;
h->socket = NDB_INVALID_SOCKET;
h->read_timeout = 50000;
h->write_timeout = 100;
-
- new (h->cfg_ptr) LocalConfig;
+ h->cfg_i = 0;
strncpy(h->last_error_desc, "No error", NDB_MGM_MAX_ERR_DESC_SIZE);
+
+ new (&(h->cfg)) LocalConfig;
+ h->cfg.init(0, 0);
+
#ifdef MGMAPI_LOG
h->logfile = 0;
#endif
@@ -163,6 +166,23 @@ ndb_mgm_create_handle()
return h;
}
+extern "C"
+int
+ndb_mgm_set_connectstring(NdbMgmHandle handle, const char * mgmsrv)
+{
+ new (&(handle->cfg)) LocalConfig;
+ if (!handle->cfg.init(mgmsrv, 0) ||
+ handle->cfg.ids.size() == 0)
+ {
+ new (&(handle->cfg)) LocalConfig;
+ handle->cfg.init(0, 0); /* reset the LocalCongig */
+ SET_ERROR(handle, NDB_MGM_ILLEGAL_CONNECT_STRING, "");
+ return -1;
+ }
+ handle->cfg_i= 0;
+ return 0;
+}
+
/**
* Destroy a handle
*/
@@ -175,14 +195,13 @@ ndb_mgm_destroy_handle(NdbMgmHandle * handle)
if((* handle)->connected){
ndb_mgm_disconnect(* handle);
}
- my_free((* handle)->hostname,MYF(MY_ALLOW_ZERO_PTR));
#ifdef MGMAPI_LOG
if ((* handle)->logfile != 0){
fclose((* handle)->logfile);
(* handle)->logfile = 0;
}
#endif
- ((LocalConfig*)((*handle)->cfg_ptr))->~LocalConfig();
+ (*handle)->cfg.~LocalConfig();
my_free((char*)* handle,MYF(MY_ALLOW_ZERO_PTR));
* handle = 0;
}
@@ -314,7 +333,8 @@ ndb_mgm_call(NdbMgmHandle handle, const ParserRow<ParserDummy> *command_reply,
*/
extern "C"
int
-ndb_mgm_connect(NdbMgmHandle handle, const char * mgmsrv)
+ndb_mgm_connect(NdbMgmHandle handle, int no_retries,
+ int retry_delay_in_seconds, int verbose)
{
SET_ERROR(handle, NDB_MGM_NO_ERROR, "Executing: ndb_mgm_connect");
CHECK_HANDLE(handle, -1);
@@ -331,36 +351,48 @@ ndb_mgm_connect(NdbMgmHandle handle, const char * mgmsrv)
/**
* Do connect
*/
- LocalConfig *cfg= (LocalConfig*)(handle->cfg_ptr);
- new (cfg) LocalConfig;
- if (!cfg->init(mgmsrv, 0) ||
- cfg->ids.size() == 0)
- {
- SET_ERROR(handle, NDB_MGM_ILLEGAL_CONNECT_STRING, "");
- return -1;
- }
-
+ LocalConfig &cfg= handle->cfg;
NDB_SOCKET_TYPE sockfd= NDB_INVALID_SOCKET;
Uint32 i;
- for (i = 0; i < cfg->ids.size(); i++)
+ while (sockfd == NDB_INVALID_SOCKET)
{
- if (cfg->ids[i].type != MgmId_TCP)
- continue;
- SocketClient s(cfg->ids[i].name.c_str(), cfg->ids[i].port);
- sockfd = s.connect();
+ // do all the mgmt servers
+ for (i = 0; i < cfg.ids.size(); i++)
+ {
+ if (cfg.ids[i].type != MgmId_TCP)
+ continue;
+ SocketClient s(cfg.ids[i].name.c_str(), cfg.ids[i].port);
+ sockfd = s.connect();
+ if (sockfd != NDB_INVALID_SOCKET)
+ break;
+ }
if (sockfd != NDB_INVALID_SOCKET)
break;
- }
- if (sockfd == NDB_INVALID_SOCKET)
- {
- setError(handle, NDB_MGM_COULD_NOT_CONNECT_TO_SOCKET, __LINE__,
- "Unable to connect using connectstring %s", mgmsrv);
- return -1;
+ if (verbose > 0) {
+ char buf[1024];
+ ndbout_c("Unable to connect with connect string: %s",
+ cfg.makeConnectString(buf,sizeof(buf)));
+ verbose= -1;
+ }
+ if (no_retries == 0) {
+ char buf[1024];
+ setError(handle, NDB_MGM_COULD_NOT_CONNECT_TO_SOCKET, __LINE__,
+ "Unable to connect with connect string: %s",
+ cfg.makeConnectString(buf,sizeof(buf)));
+ return -1;
+ }
+ if (verbose == -1) {
+ ndbout << "retrying every " << retry_delay_in_seconds << " seconds:";
+ verbose= -2;
+ }
+ NdbSleep_SecSleep(retry_delay_in_seconds);
+ if (verbose == -2) {
+ ndbout << " " << no_retries;
+ }
+ no_retries--;
}
- my_free(handle->hostname,MYF(MY_ALLOW_ZERO_PTR));
- handle->hostname = my_strdup(cfg->ids[i].name.c_str(),MYF(MY_WME));
- handle->port = cfg->ids[i].port;
+ handle->cfg_i = i;
handle->socket = sockfd;
handle->connected = 1;
@@ -1068,7 +1100,9 @@ ndb_mgm_listen_event(NdbMgmHandle handle, int filter[])
};
CHECK_HANDLE(handle, -1);
- SocketClient s(handle->hostname, handle->port);
+ const char *hostname= ndb_mgm_get_connected_host(handle);
+ int port= ndb_mgm_get_connected_port(handle);
+ SocketClient s(hostname, port);
const NDB_SOCKET_TYPE sockfd = s.connect();
if (sockfd < 0) {
setError(handle, NDB_MGM_COULD_NOT_CONNECT_TO_SOCKET, __LINE__,
@@ -1613,16 +1647,37 @@ ndb_mgm_destroy_configuration(struct ndb_mgm_configuration *cfg)
extern "C"
int
-ndb_mgm_alloc_nodeid(NdbMgmHandle handle, unsigned int version, unsigned *pnodeid, int nodetype)
+ndb_mgm_get_configuration_nodeid(NdbMgmHandle handle)
{
+ CHECK_HANDLE(handle, 0);
+ return handle->cfg._ownNodeId;
+}
+
+extern "C"
+int ndb_mgm_get_connected_port(NdbMgmHandle handle)
+{
+ return handle->cfg.ids[handle->cfg_i].port;
+}
+extern "C"
+const char *ndb_mgm_get_connected_host(NdbMgmHandle handle)
+{
+ return handle->cfg.ids[handle->cfg_i].name.c_str();
+}
+
+extern "C"
+int
+ndb_mgm_alloc_nodeid(NdbMgmHandle handle, unsigned int version, int nodetype)
+{
CHECK_HANDLE(handle, 0);
CHECK_CONNECTED(handle, 0);
+ int nodeid= handle->cfg._ownNodeId;
+
Properties args;
args.put("version", version);
args.put("nodetype", nodetype);
- args.put("nodeid", *pnodeid);
+ args.put("nodeid", nodeid);
args.put("user", "mysqld");
args.put("password", "mysqld");
args.put("public key", "a public key");
@@ -1638,26 +1693,29 @@ ndb_mgm_alloc_nodeid(NdbMgmHandle handle, unsigned int version, unsigned *pnodei
prop= ndb_mgm_call(handle, reply, "get nodeid", &args);
CHECK_REPLY(prop, -1);
- int res= -1;
+ nodeid= -1;
do {
const char * buf;
if(!prop->get("result", &buf) || strcmp(buf, "Ok") != 0){
+ const char *hostname= ndb_mgm_get_connected_host(handle);
+ unsigned port= ndb_mgm_get_connected_port(handle);
BaseString err;
err.assfmt("Could not alloc node id at %s port %d: %s",
- handle->hostname, handle->port, buf);
+ hostname, port, buf);
setError(handle, NDB_MGM_COULD_NOT_CONNECT_TO_SOCKET, __LINE__,
err.c_str());
break;
}
- if(!prop->get("nodeid", pnodeid) != 0){
+ Uint32 _nodeid;
+ if(!prop->get("nodeid", &_nodeid) != 0){
ndbout_c("ERROR Message: <nodeid Unspecified>\n");
break;
}
- res= 0;
+ nodeid= _nodeid;
}while(0);
delete prop;
- return res;
+ return nodeid;
}
/*****************************************************************************
diff --git a/ndb/src/mgmclient/CommandInterpreter.cpp b/ndb/src/mgmclient/CommandInterpreter.cpp
index bdeb885ed8b..54beaa49d3f 100644
--- a/ndb/src/mgmclient/CommandInterpreter.cpp
+++ b/ndb/src/mgmclient/CommandInterpreter.cpp
@@ -153,7 +153,6 @@ private:
NdbMgmHandle m_mgmsrv;
bool connected;
- const char *host;
int try_reconnect;
#ifdef HAVE_GLOBAL_REPLICATION
NdbRepHandle m_repserver;
@@ -379,15 +378,16 @@ CommandInterpreter::CommandInterpreter(const char *_host)
m_mgmsrv = ndb_mgm_create_handle();
if(m_mgmsrv == NULL) {
ndbout_c("Cannot create handle to management server.");
+ exit(-1);
+ }
+ if (ndb_mgm_set_connectstring(m_mgmsrv, _host))
+ {
printError();
+ exit(-1);
}
connected = false;
try_reconnect = 0;
- if (_host)
- host= my_strdup(_host,MYF(MY_WME));
- else
- host= 0;
#ifdef HAVE_GLOBAL_REPLICATION
rep_host = NULL;
m_repserver = NULL;
@@ -402,8 +402,6 @@ CommandInterpreter::~CommandInterpreter()
{
connected = false;
ndb_mgm_destroy_handle(&m_mgmsrv);
- my_free((char *)host,MYF(MY_ALLOW_ZERO_PTR));
- host = NULL;
}
static bool
@@ -438,18 +436,8 @@ bool
CommandInterpreter::connect()
{
if(!connected) {
- int tries = try_reconnect; // tries == 0 => infinite
- while(!connected) {
- if(ndb_mgm_connect(m_mgmsrv, host) == -1) {
- ndbout << "Cannot connect to management server (" << host << ").";
- tries--;
- if (tries == 0)
- break;
- ndbout << "Retrying in 5 seconds." << endl;
- NdbSleep_SecSleep(5);
- } else
- connected = true;
- }
+ if(!ndb_mgm_connect(m_mgmsrv, try_reconnect-1, 5, 1))
+ connected = true;
}
return connected;
}
diff --git a/ndb/src/mgmclient/main.cpp b/ndb/src/mgmclient/main.cpp
index 401a9198f30..f32cc683296 100644
--- a/ndb/src/mgmclient/main.cpp
+++ b/ndb/src/mgmclient/main.cpp
@@ -30,9 +30,10 @@ extern "C" int add_history(const char *command); /* From readline directory */
#include <NdbMain.h>
#include <NdbHost.h>
+#include <BaseString.hpp>
+#include <NdbOut.hpp>
#include <mgmapi.h>
#include <ndb_version.h>
-#include <LocalConfig.hpp>
#include "ndb_mgmclient.hpp"
@@ -138,7 +139,7 @@ int main(int argc, char** argv){
NDB_INIT(argv[0]);
const char *_host = 0;
int _port = 0;
- const char *load_default_groups[]= { "ndb_mgm",0 };
+ const char *load_default_groups[]= { "mysql_cluster","ndb_mgm",0 };
load_defaults("my",load_default_groups,&argc,&argv);
int ho_error;
diff --git a/ndb/src/mgmsrv/MgmtSrvr.cpp b/ndb/src/mgmsrv/MgmtSrvr.cpp
index a49b29af275..81b5eb9dfb3 100644
--- a/ndb/src/mgmsrv/MgmtSrvr.cpp
+++ b/ndb/src/mgmsrv/MgmtSrvr.cpp
@@ -399,16 +399,20 @@ MgmtSrvr::getPort() const {
}
/* Constructor */
-MgmtSrvr::MgmtSrvr(NodeId nodeId,
- SocketServer *socket_server,
- const BaseString &configFilename,
- LocalConfig &local_config,
- Config * config):
+int MgmtSrvr::init()
+{
+ if ( _ownNodeId > 0)
+ return 0;
+ return -1;
+}
+
+MgmtSrvr::MgmtSrvr(SocketServer *socket_server,
+ const char *config_filename,
+ const char *connect_string) :
_blockNumber(1), // Hard coded block number since it makes it easy to send
// signals to other management servers.
m_socket_server(socket_server),
_ownReference(0),
- m_local_config(local_config),
theSignalIdleList(NULL),
theWaitState(WAIT_SUBSCRIBE_CONF),
m_statisticsListner(this)
@@ -416,6 +420,8 @@ MgmtSrvr::MgmtSrvr(NodeId nodeId,
DBUG_ENTER("MgmtSrvr::MgmtSrvr");
+ _ownNodeId= 0;
+
_config = NULL;
_isStopThread = false;
@@ -426,12 +432,43 @@ MgmtSrvr::MgmtSrvr(NodeId nodeId,
theFacade = 0;
m_newConfig = NULL;
- m_configFilename = configFilename;
+ m_configFilename.assign(config_filename);
m_nextConfigGenerationNumber = 0;
- _config = (config == 0 ? readConfig() : config);
-
+ m_config_retriever= new ConfigRetriever(connect_string,
+ NDB_VERSION, NDB_MGM_NODE_TYPE_MGM);
+
+ // first try to allocate nodeid from another management server
+ if(m_config_retriever->do_connect(0,0,0) == 0)
+ {
+ int tmp_nodeid= 0;
+ tmp_nodeid= m_config_retriever->allocNodeId(0 /*retry*/,0 /*delay*/);
+ if (tmp_nodeid == 0)
+ {
+ ndbout_c(m_config_retriever->getErrorString());
+ exit(-1);
+ }
+ // read config from other managent server
+ _config= fetchConfig();
+ if (_config == 0)
+ {
+ ndbout << m_config_retriever->getErrorString() << endl;
+ exit(-1);
+ }
+ _ownNodeId= tmp_nodeid;
+ }
+
+ if (_ownNodeId == 0)
+ {
+ // read config locally
+ _config= readConfig();
+ if (_config == 0) {
+ ndbout << "Unable to read config file" << endl;
+ exit(-1);
+ }
+ }
+
theMgmtWaitForResponseCondPtr = NdbCondition_Create();
m_configMutex = NdbMutex_Create();
@@ -443,9 +480,11 @@ MgmtSrvr::MgmtSrvr(NodeId nodeId,
nodeTypes[i] = (enum ndb_mgm_node_type)-1;
m_connect_address[i].s_addr= 0;
}
+
{
- ndb_mgm_configuration_iterator * iter = ndb_mgm_create_configuration_iterator
- (config->m_configValues, CFG_SECTION_NODE);
+ ndb_mgm_configuration_iterator
+ *iter = ndb_mgm_create_configuration_iterator(_config->m_configValues,
+ CFG_SECTION_NODE);
for(ndb_mgm_first(iter); ndb_mgm_valid(iter); ndb_mgm_next(iter)){
unsigned type, id;
if(ndb_mgm_get_int_parameter(iter, CFG_TYPE_OF_SECTION, &type) != 0)
@@ -478,8 +517,6 @@ MgmtSrvr::MgmtSrvr(NodeId nodeId,
}
_props = NULL;
- _ownNodeId= 0;
- NodeId tmp= nodeId;
BaseString error_string;
if ((m_node_id_mutex = NdbMutex_Create()) == 0)
@@ -488,43 +525,25 @@ MgmtSrvr::MgmtSrvr(NodeId nodeId,
exit(-1);
}
-#if 0
- char my_hostname[256];
- struct sockaddr_in tmp_addr;
- SOCKET_SIZE_TYPE addrlen= sizeof(tmp_addr);
- if (!g_no_nodeid_checks) {
- if (gethostname(my_hostname, sizeof(my_hostname))) {
- ndbout << "error: gethostname() - " << strerror(errno) << endl;
- exit(-1);
- }
- if (Ndb_getInAddr(&(((sockaddr_in*)&tmp_addr)->sin_addr),my_hostname)) {
- ndbout << "error: Ndb_getInAddr(" << my_hostname << ") - "
- << strerror(errno) << endl;
+ if (_ownNodeId == 0) // we did not get node id from other server
+ {
+ NodeId tmp= m_config_retriever->get_configuration_nodeid();
+
+ if (!alloc_node_id(&tmp, NDB_MGM_NODE_TYPE_MGM,
+ 0, 0, error_string)){
+ ndbout << "Unable to obtain requested nodeid: "
+ << error_string.c_str() << endl;
exit(-1);
}
+ _ownNodeId = tmp;
}
- if (!alloc_node_id(&tmp, NDB_MGM_NODE_TYPE_MGM,
- (struct sockaddr *)&tmp_addr,
- &addrlen, error_string)){
- ndbout << "Unable to obtain requested nodeid: "
- << error_string.c_str() << endl;
- exit(-1);
- }
-#else
- if (!alloc_node_id(&tmp, NDB_MGM_NODE_TYPE_MGM,
- 0, 0, error_string)){
- ndbout << "Unable to obtain requested nodeid: "
- << error_string.c_str() << endl;
- exit(-1);
- }
-#endif
- _ownNodeId = tmp;
{
DBUG_PRINT("info", ("verifyConfig"));
- ConfigRetriever cr(m_local_config, NDB_VERSION, NDB_MGM_NODE_TYPE_MGM);
- if (!cr.verifyConfig(config->m_configValues, _ownNodeId)) {
- ndbout << cr.getErrorString() << endl;
+ if (!m_config_retriever->verifyConfig(_config->m_configValues,
+ _ownNodeId))
+ {
+ ndbout << m_config_retriever->getErrorString() << endl;
exit(-1);
}
}
@@ -657,6 +676,8 @@ MgmtSrvr::~MgmtSrvr()
NdbThread_WaitFor(m_signalRecvThread, &res);
NdbThread_Destroy(&m_signalRecvThread);
}
+ if (m_config_retriever)
+ delete m_config_retriever;
}
//****************************************************************************
diff --git a/ndb/src/mgmsrv/MgmtSrvr.hpp b/ndb/src/mgmsrv/MgmtSrvr.hpp
index b3257491123..2ab11250d81 100644
--- a/ndb/src/mgmsrv/MgmtSrvr.hpp
+++ b/ndb/src/mgmsrv/MgmtSrvr.hpp
@@ -175,11 +175,10 @@ public:
/* Constructor */
- MgmtSrvr(NodeId nodeId, /* Local nodeid */
- SocketServer *socket_server,
- const BaseString &config_filename, /* Where to save config */
- LocalConfig &local_config, /* Ndb.cfg filename */
- Config * config);
+ MgmtSrvr(SocketServer *socket_server,
+ const char *config_filename, /* Where to save config */
+ const char *connect_string);
+ int init();
NodeId getOwnNodeId() const {return _ownNodeId;};
/**
@@ -538,7 +537,6 @@ private:
NdbMutex *m_configMutex;
const Config * _config;
Config * m_newConfig;
- LocalConfig &m_local_config;
BaseString m_configFilename;
Uint32 m_nextConfigGenerationNumber;
@@ -755,6 +753,9 @@ private:
Config *_props;
int send(class NdbApiSignal* signal, Uint32 node, Uint32 node_type);
+
+ ConfigRetriever *m_config_retriever;
+
public:
/**
* This method does not exist
diff --git a/ndb/src/mgmsrv/MgmtSrvrConfig.cpp b/ndb/src/mgmsrv/MgmtSrvrConfig.cpp
index 1d51061e909..6c4b4e9ae3c 100644
--- a/ndb/src/mgmsrv/MgmtSrvrConfig.cpp
+++ b/ndb/src/mgmsrv/MgmtSrvrConfig.cpp
@@ -272,30 +272,20 @@ MgmtSrvr::saveConfig(const Config *conf) {
Config *
MgmtSrvr::readConfig() {
- Config *conf = NULL;
- if(m_configFilename.length() != 0) {
- /* Use config file */
- InitConfigFileParser parser;
- conf = parser.parseConfig(m_configFilename.c_str());
-
- if(conf == NULL) {
- /* Try to get configuration from other MGM server */
- return fetchConfig();
- }
- }
+ Config *conf;
+ InitConfigFileParser parser;
+ conf = parser.parseConfig(m_configFilename.c_str());
return conf;
}
Config *
MgmtSrvr::fetchConfig() {
- ConfigRetriever cr(m_local_config, NDB_VERSION, NODE_TYPE_MGM);
- struct ndb_mgm_configuration * tmp = cr.getConfig();
+ struct ndb_mgm_configuration * tmp = m_config_retriever->getConfig();
if(tmp != 0){
Config * conf = new Config();
conf->m_configValues = tmp;
return conf;
}
-
return 0;
}
diff --git a/ndb/src/mgmsrv/main.cpp b/ndb/src/mgmsrv/main.cpp
index b588a2d0933..84ff98626e5 100644
--- a/ndb/src/mgmsrv/main.cpp
+++ b/ndb/src/mgmsrv/main.cpp
@@ -62,7 +62,6 @@ struct MgmGlobals {
int non_interactive;
int interactive;
const char * config_filename;
- const char * local_config_filename;
/** Stuff found in environment or in local config */
NodeId localNodeId;
@@ -70,9 +69,6 @@ struct MgmGlobals {
char * interface_name;
int port;
- /** The configuration of the cluster */
- Config * cluster_config;
-
/** The Mgmt Server */
MgmtSrvr * mgmObject;
@@ -86,9 +82,6 @@ static MgmGlobals glob;
/******************************************************************************
* Function prototypes
******************************************************************************/
-static bool readLocalConfig();
-static bool readGlobalConfig();
-
/**
* Global variables
*/
@@ -100,16 +93,28 @@ static char *opt_connect_str= 0;
static struct my_option my_long_options[] =
{
- NDB_STD_OPTS("ndb_mgm"),
+#ifndef DBUG_OFF
+ { "debug", '#', "Output debug log. Often this is 'd:t:o,filename'.",
+ 0, 0, 0, GET_STR, OPT_ARG, 0, 0, 0, 0, 0, 0 },
+#endif
+ { "usage", '?', "Display this help and exit.",
+ 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 },
+ { "help", '?', "Display this help and exit.",
+ 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 },
+ { "version", 'V', "Output version information and exit.", 0, 0, 0,
+ GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 },
+ { "connect-string", 1023,
+ "Set connect string for connecting to ndb_mgmd. "
+ "<constr>=\"host=<hostname:port>[;nodeid=<id>]\". "
+ "Overides specifying entries in NDB_CONNECTSTRING and config file",
+ (gptr*) &opt_connect_str, (gptr*) &opt_connect_str, 0,
+ GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
{ "config-file", 'f', "Specify cluster configuration file",
(gptr*) &glob.config_filename, (gptr*) &glob.config_filename, 0,
GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
{ "daemon", 'd', "Run ndb_mgmd in daemon mode (default)",
(gptr*) &glob.daemon, (gptr*) &glob.daemon, 0,
GET_BOOL, NO_ARG, 1, 0, 0, 0, 0, 0 },
- { "l", 'l', "Specify configuration file connect string (default Ndb.cfg if available)",
- (gptr*) &glob.local_config_filename, (gptr*) &glob.local_config_filename, 0,
- GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
{ "interactive", 256, "Run interactive. Not supported but provided for testing purposes",
(gptr*) &glob.interactive, (gptr*) &glob.interactive, 0,
GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
@@ -173,7 +178,7 @@ int main(int argc, char** argv)
global_mgmt_server_check = 1;
glob.config_filename= "config.ini";
- const char *load_default_groups[]= { "ndb_mgmd",0 };
+ const char *load_default_groups[]= { "mysql_cluster","ndb_mgmd",0 };
load_defaults("my",load_default_groups,&argc,&argv);
int ho_error;
@@ -189,29 +194,16 @@ int main(int argc, char** argv)
MgmApiService * mapi = new MgmApiService();
- /****************************
- * Read configuration files *
- ****************************/
- LocalConfig local_config;
- if(!local_config.init(opt_connect_str,glob.local_config_filename)){
- local_config.printError();
- goto error_end;
- }
- glob.localNodeId = local_config._ownNodeId;
+ glob.mgmObject = new MgmtSrvr(glob.socketServer,
+ glob.config_filename,
+ opt_connect_str);
- if (!readGlobalConfig())
+ if (glob.mgmObject->init())
goto error_end;
- glob.mgmObject = new MgmtSrvr(glob.localNodeId, glob.socketServer,
- BaseString(glob.config_filename),
- local_config,
- glob.cluster_config);
-
chdir(NdbConfig_get_path(0));
- glob.cluster_config = 0;
glob.localNodeId= glob.mgmObject->getOwnNodeId();
-
if (glob.localNodeId == 0) {
goto error_end;
}
@@ -322,9 +314,7 @@ MgmGlobals::MgmGlobals(){
// Default values
port = 0;
config_filename = NULL;
- local_config_filename = NULL;
interface_name = 0;
- cluster_config = 0;
daemon = 1;
non_interactive = 0;
interactive = 0;
@@ -337,27 +327,6 @@ MgmGlobals::~MgmGlobals(){
delete socketServer;
if (mgmObject)
delete mgmObject;
- if (cluster_config)
- delete cluster_config;
if (interface_name)
free(interface_name);
}
-
-/**
- * @fn readGlobalConfig
- * @param glob : Global variables
- * @return true if success, false otherwise.
- */
-static bool
-readGlobalConfig() {
- if(glob.config_filename == NULL)
- return false;
-
- /* Use config file */
- InitConfigFileParser parser;
- glob.cluster_config = parser.parseConfig(glob.config_filename);
- if(glob.cluster_config == 0){
- return false;
- }
- return true;
-}
diff --git a/ndb/src/ndbapi/NdbConnection.cpp b/ndb/src/ndbapi/NdbConnection.cpp
index 719c5bef49e..f4bb000300a 100644
--- a/ndb/src/ndbapi/NdbConnection.cpp
+++ b/ndb/src/ndbapi/NdbConnection.cpp
@@ -1122,8 +1122,11 @@ NdbConnection::getNdbIndexScanOperation(const NdbIndexImpl* index,
if (indexTable != 0){
NdbIndexScanOperation* tOp =
getNdbScanOperation((NdbTableImpl *) indexTable);
- tOp->m_currentTable = table;
- if(tOp) tOp->m_cursor_type = NdbScanOperation::IndexCursor;
+ if(tOp)
+ {
+ tOp->m_currentTable = table;
+ tOp->m_cursor_type = NdbScanOperation::IndexCursor;
+ }
return tOp;
} else {
setOperationErrorCodeAbort(theNdb->theError.code);
@@ -1618,9 +1621,6 @@ from other transactions.
/**
* There's always a TCKEYCONF when using IgnoreError
*/
-#ifdef VM_TRACE
- ndbout_c("Not completing transaction 2");
-#endif
return -1;
}
/**********************************************************************/
@@ -1872,9 +1872,6 @@ NdbConnection::OpCompleteFailure(Uint8 abortOption, bool setFailure)
/**
* There's always a TCKEYCONF when using IgnoreError
*/
-#ifdef VM_TRACE
- ndbout_c("Not completing transaction");
-#endif
return -1;
}
diff --git a/ndb/src/ndbapi/NdbConnectionScan.cpp b/ndb/src/ndbapi/NdbConnectionScan.cpp
index 3fe8993a42b..a1a220caacf 100644
--- a/ndb/src/ndbapi/NdbConnectionScan.cpp
+++ b/ndb/src/ndbapi/NdbConnectionScan.cpp
@@ -57,12 +57,18 @@ NdbConnection::receiveSCAN_TABREF(NdbApiSignal* aSignal){
if(checkState_TransId(&ref->transId1)){
theScanningOp->theError.code = ref->errorCode;
+ theScanningOp->execCLOSE_SCAN_REP();
if(!ref->closeNeeded){
- theScanningOp->execCLOSE_SCAN_REP();
return 0;
}
- assert(theScanningOp->m_sent_receivers_count);
+
+ /**
+ * Setup so that close_impl will actually perform a close
+ * and not "close scan"-optimze it away
+ */
theScanningOp->m_conf_receivers_count++;
+ theScanningOp->m_conf_receivers[0] = theScanningOp->m_receivers[0];
+ theScanningOp->m_conf_receivers[0]->m_tcPtrI = ~0;
return 0;
} else {
#ifdef NDB_NO_DROPPED_SIGNAL
@@ -97,7 +103,7 @@ NdbConnection::receiveSCAN_TABCONF(NdbApiSignal* aSignal,
theScanningOp->execCLOSE_SCAN_REP();
return 0;
}
-
+
for(Uint32 i = 0; i<len; i += 3){
Uint32 opCount, totalLen;
Uint32 ptrI = * ops++;
@@ -109,24 +115,12 @@ NdbConnection::receiveSCAN_TABCONF(NdbApiSignal* aSignal,
void * tPtr = theNdb->int2void(ptrI);
assert(tPtr); // For now
NdbReceiver* tOp = theNdb->void2rec(tPtr);
- if (tOp && tOp->checkMagicNumber()){
- if(tOp->execSCANOPCONF(tcPtrI, totalLen, opCount)){
- /**
- *
- */
- theScanningOp->receiver_delivered(tOp);
- } else if(info == ScanTabConf::EndOfData){
+ if (tOp && tOp->checkMagicNumber())
+ {
+ if (tcPtrI == RNIL && opCount == 0)
theScanningOp->receiver_completed(tOp);
- }
- }
- }
- if (conf->requestInfo & ScanTabConf::EndOfData) {
- if(theScanningOp->m_ordered)
- theScanningOp->m_api_receivers_count = 0;
- if(theScanningOp->m_api_receivers_count +
- theScanningOp->m_conf_receivers_count +
- theScanningOp->m_sent_receivers_count){
- abort();
+ else if (tOp->execSCANOPCONF(tcPtrI, totalLen, opCount))
+ theScanningOp->receiver_delivered(tOp);
}
}
return 0;
diff --git a/ndb/src/ndbapi/NdbResultSet.cpp b/ndb/src/ndbapi/NdbResultSet.cpp
index f270584d227..d9d71464026 100644
--- a/ndb/src/ndbapi/NdbResultSet.cpp
+++ b/ndb/src/ndbapi/NdbResultSet.cpp
@@ -44,10 +44,10 @@ void NdbResultSet::init()
{
}
-int NdbResultSet::nextResult(bool fetchAllowed)
+int NdbResultSet::nextResult(bool fetchAllowed, bool forceSend)
{
int res;
- if ((res = m_operation->nextResult(fetchAllowed)) == 0) {
+ if ((res = m_operation->nextResult(fetchAllowed, forceSend)) == 0) {
// handle blobs
NdbBlob* tBlob = m_operation->theBlobList;
while (tBlob != 0) {
@@ -67,9 +67,9 @@ int NdbResultSet::nextResult(bool fetchAllowed)
return res;
}
-void NdbResultSet::close()
+void NdbResultSet::close(bool forceSend)
{
- m_operation->closeScan();
+ m_operation->closeScan(forceSend);
}
NdbOperation*
@@ -98,6 +98,6 @@ NdbResultSet::deleteTuple(NdbConnection * takeOverTrans){
}
int
-NdbResultSet::restart(){
- return m_operation->restart();
+NdbResultSet::restart(bool forceSend){
+ return m_operation->restart(forceSend);
}
diff --git a/ndb/src/ndbapi/NdbScanOperation.cpp b/ndb/src/ndbapi/NdbScanOperation.cpp
index 4b10ebb10cd..db0c294708d 100644
--- a/ndb/src/ndbapi/NdbScanOperation.cpp
+++ b/ndb/src/ndbapi/NdbScanOperation.cpp
@@ -35,6 +35,8 @@
#include <signaldata/AttrInfo.hpp>
#include <signaldata/TcKeyReq.hpp>
+#define DEBUG_NEXT_RESULT 0
+
NdbScanOperation::NdbScanOperation(Ndb* aNdb) :
NdbOperation(aNdb),
m_resultSet(0),
@@ -275,6 +277,9 @@ NdbScanOperation::fix_receivers(Uint32 parallel){
void
NdbScanOperation::receiver_delivered(NdbReceiver* tRec){
if(theError.code == 0){
+ if(DEBUG_NEXT_RESULT)
+ ndbout_c("receiver_delivered");
+
Uint32 idx = tRec->m_list_index;
Uint32 last = m_sent_receivers_count - 1;
if(idx != last){
@@ -298,6 +303,9 @@ NdbScanOperation::receiver_delivered(NdbReceiver* tRec){
void
NdbScanOperation::receiver_completed(NdbReceiver* tRec){
if(theError.code == 0){
+ if(DEBUG_NEXT_RESULT)
+ ndbout_c("receiver_completed");
+
Uint32 idx = tRec->m_list_index;
Uint32 last = m_sent_receivers_count - 1;
if(idx != last){
@@ -445,12 +453,12 @@ NdbScanOperation::executeCursor(int nodeId){
return -1;
}
-#define DEBUG_NEXT_RESULT 0
-int NdbScanOperation::nextResult(bool fetchAllowed)
+int NdbScanOperation::nextResult(bool fetchAllowed, bool forceSend)
{
if(m_ordered)
- return ((NdbIndexScanOperation*)this)->next_result_ordered(fetchAllowed);
+ return ((NdbIndexScanOperation*)this)->next_result_ordered(fetchAllowed,
+ forceSend);
/**
* Check current receiver
@@ -487,7 +495,8 @@ int NdbScanOperation::nextResult(bool fetchAllowed)
TransporterFacade* tp = TransporterFacade::instance();
Guard guard(tp->theMutexPtr);
Uint32 seq = theNdbCon->theNodeSequence;
- if(seq == tp->getNodeSequence(nodeId) && send_next_scan(idx, false) == 0){
+ if(seq == tp->getNodeSequence(nodeId) && send_next_scan(idx, false,
+ forceSend) == 0){
idx = m_current_api_receiver;
last = m_api_receivers_count;
@@ -578,8 +587,9 @@ int NdbScanOperation::nextResult(bool fetchAllowed)
}
int
-NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag){
- if(cnt > 0 || stopScanFlag){
+NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag,
+ bool forceSend){
+ if(cnt > 0){
NdbApiSignal tSignal(theNdb->theMyRef);
tSignal.setSignal(GSN_SCAN_NEXTREQ);
@@ -595,38 +605,57 @@ NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag){
*/
Uint32 last = m_sent_receivers_count;
Uint32 * prep_array = (cnt > 21 ? m_prepared_receivers : theData + 4);
+ Uint32 sent = 0;
for(Uint32 i = 0; i<cnt; i++){
NdbReceiver * tRec = m_api_receivers[i];
- m_sent_receivers[last+i] = tRec;
- tRec->m_list_index = last+i;
- prep_array[i] = tRec->m_tcPtrI;
- tRec->prepareSend();
+ if((prep_array[sent] = tRec->m_tcPtrI) != RNIL)
+ {
+ m_sent_receivers[last+sent] = tRec;
+ tRec->m_list_index = last+sent;
+ tRec->prepareSend();
+ sent++;
+ }
}
- memcpy(&m_api_receivers[0], &m_api_receivers[cnt], cnt * sizeof(char*));
+ memmove(m_api_receivers, m_api_receivers+cnt,
+ (theParallelism-cnt) * sizeof(char*));
- Uint32 nodeId = theNdbCon->theDBnode;
- TransporterFacade * tp = TransporterFacade::instance();
- int ret;
- if(cnt > 21){
- tSignal.setLength(4);
- LinearSectionPtr ptr[3];
- ptr[0].p = prep_array;
- ptr[0].sz = cnt;
- ret = tp->sendSignal(&tSignal, nodeId, ptr, 1);
- } else {
- tSignal.setLength(4+cnt);
- ret = tp->sendSignal(&tSignal, nodeId);
+ int ret = 0;
+ if(sent)
+ {
+ Uint32 nodeId = theNdbCon->theDBnode;
+ TransporterFacade * tp = TransporterFacade::instance();
+ if(cnt > 21){
+ tSignal.setLength(4);
+ LinearSectionPtr ptr[3];
+ ptr[0].p = prep_array;
+ ptr[0].sz = sent;
+ ret = tp->sendSignal(&tSignal, nodeId, ptr, 1);
+ } else {
+ tSignal.setLength(4+sent);
+ ret = tp->sendSignal(&tSignal, nodeId);
+ }
}
+
+ if (!ret) checkForceSend(forceSend);
- m_sent_receivers_count = last + cnt + stopScanFlag;
+ m_sent_receivers_count = last + sent;
m_api_receivers_count -= cnt;
m_current_api_receiver = 0;
-
+
return ret;
}
return 0;
}
+void NdbScanOperation::checkForceSend(bool forceSend)
+{
+ if (forceSend) {
+ TransporterFacade::instance()->forceSend(theNdb->theNdbBlockNumber);
+ } else {
+ TransporterFacade::instance()->checkForceSend(theNdb->theNdbBlockNumber);
+ }//if
+}
+
int
NdbScanOperation::prepareSend(Uint32 TC_ConnectPtr, Uint64 TransactionId)
{
@@ -642,7 +671,7 @@ NdbScanOperation::doSend(int ProcessorId)
return 0;
}
-void NdbScanOperation::closeScan()
+void NdbScanOperation::closeScan(bool forceSend)
{
if(m_transConnection){
if(DEBUG_NEXT_RESULT)
@@ -657,7 +686,7 @@ void NdbScanOperation::closeScan()
TransporterFacade* tp = TransporterFacade::instance();
Guard guard(tp->theMutexPtr);
- close_impl(tp);
+ close_impl(tp, forceSend);
} while(0);
@@ -673,6 +702,7 @@ NdbScanOperation::execCLOSE_SCAN_REP(){
m_api_receivers_count = 0;
m_conf_receivers_count = 0;
m_sent_receivers_count = 0;
+ m_current_api_receiver = m_ordered ? theParallelism : 0;
}
void NdbScanOperation::release()
@@ -1293,7 +1323,8 @@ NdbIndexScanOperation::compare(Uint32 skip, Uint32 cols,
}
int
-NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){
+NdbIndexScanOperation::next_result_ordered(bool fetchAllowed,
+ bool forceSend){
Uint32 u_idx = 0, u_last = 0;
Uint32 s_idx = m_current_api_receiver; // first sorted
@@ -1319,7 +1350,8 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){
Guard guard(tp->theMutexPtr);
Uint32 seq = theNdbCon->theNodeSequence;
Uint32 nodeId = theNdbCon->theDBnode;
- if(seq == tp->getNodeSequence(nodeId) && !send_next_scan_ordered(s_idx)){
+ if(seq == tp->getNodeSequence(nodeId) &&
+ !send_next_scan_ordered(s_idx, forceSend)){
Uint32 tmp = m_sent_receivers_count;
s_idx = m_current_api_receiver;
while(m_sent_receivers_count > 0 && !theError.code){
@@ -1408,14 +1440,26 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){
}
int
-NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx){
+NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx, bool forceSend){
if(idx == theParallelism)
return 0;
+ NdbReceiver* tRec = m_api_receivers[idx];
NdbApiSignal tSignal(theNdb->theMyRef);
tSignal.setSignal(GSN_SCAN_NEXTREQ);
+ Uint32 last = m_sent_receivers_count;
Uint32* theData = tSignal.getDataPtrSend();
+ Uint32* prep_array = theData + 4;
+
+ m_current_api_receiver = idx + 1;
+ if((prep_array[0] = tRec->m_tcPtrI) == RNIL)
+ {
+ if(DEBUG_NEXT_RESULT)
+ ndbout_c("receiver completed, don't send");
+ return 0;
+ }
+
theData[0] = theNdbCon->theTCConPtr;
theData[1] = 0;
Uint64 transId = theNdbCon->theTransactionId;
@@ -1425,35 +1469,35 @@ NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx){
/**
* Prepare ops
*/
- Uint32 last = m_sent_receivers_count;
- Uint32 * prep_array = theData + 4;
-
- NdbReceiver * tRec = m_api_receivers[idx];
m_sent_receivers[last] = tRec;
tRec->m_list_index = last;
- prep_array[0] = tRec->m_tcPtrI;
tRec->prepareSend();
-
m_sent_receivers_count = last + 1;
- m_current_api_receiver = idx + 1;
Uint32 nodeId = theNdbCon->theDBnode;
TransporterFacade * tp = TransporterFacade::instance();
tSignal.setLength(4+1);
- return tp->sendSignal(&tSignal, nodeId);
+ int ret= tp->sendSignal(&tSignal, nodeId);
+ if (!ret) checkForceSend(forceSend);
+ return ret;
}
int
-NdbScanOperation::close_impl(TransporterFacade* tp){
+NdbScanOperation::close_impl(TransporterFacade* tp, bool forceSend){
Uint32 seq = theNdbCon->theNodeSequence;
Uint32 nodeId = theNdbCon->theDBnode;
- if(seq != tp->getNodeSequence(nodeId)){
+ if(seq != tp->getNodeSequence(nodeId))
+ {
theNdbCon->theReleaseOnClose = true;
return -1;
}
- while(theError.code == 0 && m_sent_receivers_count){
+ /**
+ * Wait for outstanding
+ */
+ while(theError.code == 0 && m_sent_receivers_count)
+ {
theNdb->theWaiter.m_node = nodeId;
theNdb->theWaiter.m_state = WAIT_SCAN;
int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
@@ -1471,18 +1515,52 @@ NdbScanOperation::close_impl(TransporterFacade* tp){
}
}
- if(m_api_receivers_count+m_conf_receivers_count){
- // Send close scan
- if(send_next_scan(0, true) == -1){ // Close scan
- theNdbCon->theReleaseOnClose = true;
- return -1;
- }
+ /**
+ * move all conf'ed into api
+ * so that send_next_scan can check if they needs to be closed
+ */
+ Uint32 api = m_api_receivers_count;
+ Uint32 conf = m_conf_receivers_count;
+
+ if(m_ordered)
+ {
+ /**
+ * Ordered scan, keep the m_api_receivers "to the right"
+ */
+ memmove(m_api_receivers, m_api_receivers+m_current_api_receiver,
+ (theParallelism - m_current_api_receiver) * sizeof(char*));
+ api = (theParallelism - m_current_api_receiver);
+ m_api_receivers_count = api;
+ }
+
+ if(DEBUG_NEXT_RESULT)
+ ndbout_c("close_impl: [order api conf sent curr parr] %d %d %d %d %d %d",
+ m_ordered, api, conf,
+ m_sent_receivers_count, m_current_api_receiver, theParallelism);
+
+ if(api+conf)
+ {
+ /**
+ * There's something to close
+ * setup m_api_receivers (for send_next_scan)
+ */
+ memcpy(m_api_receivers+api, m_conf_receivers, conf * sizeof(char*));
+ m_api_receivers_count = api + conf;
+ m_conf_receivers_count = 0;
+ }
+
+ // Send close scan
+ if(send_next_scan(api+conf, true, forceSend) == -1)
+ {
+ theNdbCon->theReleaseOnClose = true;
+ return -1;
}
/**
* wait for close scan conf
*/
- while(m_sent_receivers_count+m_api_receivers_count+m_conf_receivers_count){
+ while(m_sent_receivers_count+m_api_receivers_count+m_conf_receivers_count)
+ {
theNdb->theWaiter.m_node = nodeId;
theNdb->theWaiter.m_state = WAIT_SCAN;
int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
@@ -1499,6 +1577,7 @@ NdbScanOperation::close_impl(TransporterFacade* tp){
return -1;
}
}
+
return 0;
}
@@ -1520,7 +1599,7 @@ NdbScanOperation::reset_receivers(Uint32 parallell, Uint32 ordered){
}
int
-NdbScanOperation::restart()
+NdbScanOperation::restart(bool forceSend)
{
TransporterFacade* tp = TransporterFacade::instance();
@@ -1529,7 +1608,7 @@ NdbScanOperation::restart()
{
int res;
- if((res= close_impl(tp)))
+ if((res= close_impl(tp, forceSend)))
{
return res;
}
@@ -1548,13 +1627,13 @@ NdbScanOperation::restart()
}
int
-NdbIndexScanOperation::reset_bounds(){
+NdbIndexScanOperation::reset_bounds(bool forceSend){
int res;
{
TransporterFacade* tp = TransporterFacade::instance();
Guard guard(tp->theMutexPtr);
- res= close_impl(tp);
+ res= close_impl(tp, forceSend);
}
if(!res)
diff --git a/ndb/src/ndbapi/ndb_cluster_connection.cpp b/ndb/src/ndbapi/ndb_cluster_connection.cpp
index 4c42fe1aeef..b2043b2c2c1 100644
--- a/ndb/src/ndbapi/ndb_cluster_connection.cpp
+++ b/ndb/src/ndbapi/ndb_cluster_connection.cpp
@@ -45,7 +45,6 @@ Ndb_cluster_connection::Ndb_cluster_connection(const char *connect_string)
else
m_connect_string= 0;
m_config_retriever= 0;
- m_local_config= 0;
m_connect_thread= 0;
m_connect_callback= 0;
@@ -125,38 +124,31 @@ int Ndb_cluster_connection::connect(int reconnect)
do {
if (m_config_retriever == 0)
{
- if (m_local_config == 0) {
- m_local_config= new LocalConfig();
- if (!m_local_config->init(m_connect_string,0)) {
- ndbout_c("Configuration error: Unable to retrieve local config");
- m_local_config->printError();
- m_local_config->printUsage();
- DBUG_RETURN(-1);
- }
- }
m_config_retriever=
- new ConfigRetriever(*m_local_config, NDB_VERSION, NODE_TYPE_API);
+ new ConfigRetriever(m_connect_string, NDB_VERSION, NODE_TYPE_API);
+ if (m_config_retriever->hasError())
+ {
+ printf("Could not connect initialize handle to management server",
+ m_config_retriever->getErrorString());
+ DBUG_RETURN(-1);
+ }
}
else
if (reconnect == 0)
DBUG_RETURN(0);
if (reconnect)
{
- int r= m_config_retriever->do_connect(1);
+ int r= m_config_retriever->do_connect(0,0,0);
if (r == 1)
DBUG_RETURN(1); // mgmt server not up yet
if (r == -1)
break;
}
else
- if(m_config_retriever->do_connect() == -1)
+ if(m_config_retriever->do_connect(12,5,1) == -1)
break;
- Uint32 nodeId = m_config_retriever->allocNodeId();
- for(Uint32 i = 0; nodeId == 0 && i<5; i++){
- NdbSleep_SecSleep(3);
- nodeId = m_config_retriever->allocNodeId();
- }
+ Uint32 nodeId = m_config_retriever->allocNodeId(4/*retries*/,3/*delay*/);
if(nodeId == 0)
break;
ndb_mgm_configuration * props = m_config_retriever->getConfig();
@@ -200,8 +192,6 @@ Ndb_cluster_connection::~Ndb_cluster_connection()
my_free(m_connect_string,MYF(MY_ALLOW_ZERO_PTR));
if (m_config_retriever)
delete m_config_retriever;
- if (m_local_config)
- delete m_local_config;
DBUG_VOID_RETURN;
}
diff --git a/ndb/src/ndbapi/ndberror.c b/ndb/src/ndbapi/ndberror.c
index e08b80f2433..bc49358cc63 100644
--- a/ndb/src/ndbapi/ndberror.c
+++ b/ndb/src/ndbapi/ndberror.c
@@ -426,7 +426,8 @@ ErrorBundle ErrorCodes[] = {
{ 4267, IE, "Corrupted blob value" },
{ 4268, IE, "Error in blob head update forced rollback of transaction" },
{ 4268, IE, "Unknown blob error" },
- { 4269, IE, "No connection to ndb management server" }
+ { 4269, IE, "No connection to ndb management server" },
+ { 4335, AE, "Only one autoincrement column allowed per table. Having a table without primary key uses an autoincremented hidden key, i.e. a table without a primary key can not have an autoincremented column" }
};
static