diff options
73 files changed, 1173 insertions, 761 deletions
diff --git a/Build-tools/Do-compile b/Build-tools/Do-compile index fc73b1fb7e7..e98c3d84937 100755 --- a/Build-tools/Do-compile +++ b/Build-tools/Do-compile @@ -374,6 +374,7 @@ $ENV{"LD_LIBRARY_PATH"}= ("$test_dir/lib" . if ($opt_stage <= 5 && !$opt_no_test && !$opt_no_mysqltest) { my $flags= ""; + my $force= ""; $flags.= " --with-ndbcluster" if ($opt_with_cluster); $flags.= " --force" if (!$opt_one_error); log_timestamp(); diff --git a/client/Makefile.am b/client/Makefile.am index a78efec1eee..0362c2f3358 100644 --- a/client/Makefile.am +++ b/client/Makefile.am @@ -26,8 +26,8 @@ bin_PROGRAMS = mysql mysqladmin mysqlcheck mysqlshow \ mysqldump mysqlimport mysqltest mysqlbinlog mysqlmanagerc mysqlmanager-pwgen noinst_HEADERS = sql_string.h completion_hash.h my_readline.h \ client_priv.h -mysqladmin_SOURCES = mysqladmin.cc mysql_SOURCES = mysql.cc readline.cc sql_string.cc completion_hash.cc +mysqladmin_SOURCES = mysqladmin.cc mysql_LDADD = @readline_link@ @TERMCAP_LIB@ $(LDADD) $(CXXLDFLAGS) mysqlbinlog_LDADD = $(LDADD) $(CXXLDFLAGS) mysql_DEPENDENCIES= $(LIBRARIES) $(pkglib_LTLIBRARIES) $(DEPLIB) diff --git a/innobase/configure.in b/innobase/configure.in index 652291f1f38..d83da9fdc5c 100644 --- a/innobase/configure.in +++ b/innobase/configure.in @@ -110,6 +110,9 @@ esac case "$target" in i[[4567]]86-*-*) CFLAGS="$CFLAGS -DUNIV_INTEL_X86";; + # The compiler on Linux/S390 does not seem to have inlining + s390-*-*) + CFLAGS="$CFLAGS -DUNIV_MUST_NOT_INLINE";; esac AC_OUTPUT(Makefile os/Makefile ut/Makefile btr/Makefile dnl diff --git a/mysql-test/r/ctype_uca.result b/mysql-test/r/ctype_uca.result index 7620b18eea6..cb060ad7ee4 100644 --- a/mysql-test/r/ctype_uca.result +++ b/mysql-test/r/ctype_uca.result @@ -19,6 +19,9 @@ select 'a ' = 'a\t', 'a ' < 'a\t', 'a ' > 'a\t'; select 'a a' > 'a', 'a \t' < 'a'; 'a a' > 'a' 'a \t' < 'a' 1 1 +select 'c' like '\_' as want0; +want0 +0 CREATE TABLE t ( c char(20) NOT NULL ) ENGINE=MyISAM DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci; diff --git a/mysql-test/r/ctype_utf8.result b/mysql-test/r/ctype_utf8.result index 945ec8eae99..599d49208e7 100644 --- a/mysql-test/r/ctype_utf8.result +++ b/mysql-test/r/ctype_utf8.result @@ -814,3 +814,6 @@ t2 CREATE TABLE `t2` ( ) ENGINE=MyISAM DEFAULT CHARSET=latin1 drop table t2; drop table t1; +select 'c' like '\_' as want0; +want0 +0 diff --git a/mysql-test/t/ctype_uca.test b/mysql-test/t/ctype_uca.test index e640e6b53dc..11833ba9bc7 100644 --- a/mysql-test/t/ctype_uca.test +++ b/mysql-test/t/ctype_uca.test @@ -25,6 +25,11 @@ select 'a ' = 'a\t', 'a ' < 'a\t', 'a ' > 'a\t'; select 'a a' > 'a', 'a \t' < 'a'; # +# Bug #6787 LIKE not working properly with _ and utf8 data +# +select 'c' like '\_' as want0; + +# # Bug #5679 utf8_unicode_ci LIKE--trailing % doesn't equal zero characters # CREATE TABLE t ( diff --git a/mysql-test/t/ctype_utf8.test b/mysql-test/t/ctype_utf8.test index c75b1dee63c..42031be8f3c 100644 --- a/mysql-test/t/ctype_utf8.test +++ b/mysql-test/t/ctype_utf8.test @@ -660,3 +660,9 @@ create table t2 select concat(a,_utf8'') as a, concat(b,_utf8'')as b from t1; show create table t2; drop table t2; drop table t1; + +# +# Bug #6787 LIKE not working properly with _ and utf8 data +# +select 'c' like '\_' as want0; + diff --git a/ndb/docs/wl2077.txt b/ndb/docs/wl2077.txt new file mode 100644 index 00000000000..5a77c18aa2a --- /dev/null +++ b/ndb/docs/wl2077.txt @@ -0,0 +1,35 @@ + +100' * (select 1 from T1 (1M rows) where key = rand()); +1 host, 1 ndbd, api co-hosted +results in 1000 rows / sec + + wo/reset bounds w/ rb +4.1-read committed a) 4.9 b) 7.4 +4.1-read hold lock c) 4.7 d) 6.7 + +wl2077-read committed 6.4 (+30%) 10.8 (+45%) +wl2077-read hold lock 4.6 (-1%) 6.7 (+ 0%) + +-- Comparision e) +serial pk: 10.9' +batched (1000): 59' +serial uniq index: 8.4' +batched (1000): 33' +index range (1000): 186' + +---- + +load) testScanPerf -c 1 -d 1 T1 +a) testScanPerf -s 100000 -c 0 -d 0 -a 1 -l 0 -r 2 -q 0 T1 +b) testScanPerf -s 100000 -c 0 -d 0 -a 1 -l 0 -r 2 -q 1 T1 +c) testScanPerf -s 100000 -c 0 -d 0 -a 1 -l 1 -r 2 -q 0 T1 +d) testScanPerf -s 100000 -c 0 -d 0 -a 1 -l 1 -r 2 -q 1 T1 +e) testReadPerf -i 25 -c 0 -d 0 T1 + +--- music join 1db-co 2db-co + +4.1 13s 14s +4.1 wo/ blobs 1.7s 3.2s + +wl2077 12s 14s +wl2077 wo/ blobs 1.2s (-30%) 2.5s (-22%) diff --git a/ndb/include/Makefile.am b/ndb/include/Makefile.am index ca2e8152352..61f55cf9d61 100644 --- a/ndb/include/Makefile.am +++ b/ndb/include/Makefile.am @@ -28,15 +28,14 @@ ndbapi/NdbIndexScanOperation.hpp \ ndbapi/ndberror.h mgmapiinclude_HEADERS = \ -mgmapi/LocalConfig.hpp \ mgmapi/mgmapi.h \ -mgmapi/mgmapi_debug.h +mgmapi/mgmapi_debug.h \ +mgmapi/mgmapi_config_parameters.h \ +mgmapi/mgmapi_config_parameters_debug.h noinst_HEADERS = \ ndb_global.h \ -ndb_net.h \ -mgmapi/mgmapi_config_parameters.h \ -mgmapi/mgmapi_config_parameters_debug.h +ndb_net.h EXTRA_DIST = debugger editline kernel logger mgmcommon \ portlib transporter util diff --git a/ndb/include/mgmapi/mgmapi.h b/ndb/include/mgmapi/mgmapi.h index f1ef357421b..a23417f153a 100644 --- a/ndb/include/mgmapi/mgmapi.h +++ b/ndb/include/mgmapi/mgmapi.h @@ -356,11 +356,26 @@ extern "C" { /** * Create a handle to a management server * - * @return A management handle<br> - * or NULL if no management handle could be created. + * @return A management handle<br> + * or NULL if no management handle could be created. */ NdbMgmHandle ndb_mgm_create_handle(); + /** + * Set connecst string to management server + * + * @param handle Management handle + * @param connect_string Connect string to the management server, + * + * @return -1 on error. + */ + int ndb_mgm_set_connectstring(NdbMgmHandle handle, + const char *connect_string); + + int ndb_mgm_get_configuration_nodeid(NdbMgmHandle handle); + int ndb_mgm_get_connected_port(NdbMgmHandle handle); + const char *ndb_mgm_get_connected_host(NdbMgmHandle handle); + /** * Destroy a management server handle * @@ -378,11 +393,10 @@ extern "C" { * Connect to a management server * * @param handle Management handle. - * @param mgmsrv Hostname and port of the management server, - * "hostname:port". * @return -1 on error. */ - int ndb_mgm_connect(NdbMgmHandle handle, const char * mgmsrv); + int ndb_mgm_connect(NdbMgmHandle handle, int no_retries, + int retry_delay_in_seconds, int verbose); /** * Disconnect from a management server @@ -709,9 +723,7 @@ extern "C" { void ndb_mgm_destroy_configuration(struct ndb_mgm_configuration *); int ndb_mgm_alloc_nodeid(NdbMgmHandle handle, - unsigned version, - unsigned *pnodeid, - int nodetype); + unsigned version, int nodetype); /** * Config iterator */ diff --git a/ndb/include/mgmcommon/ConfigRetriever.hpp b/ndb/include/mgmcommon/ConfigRetriever.hpp index 6c32255e921..80449628867 100644 --- a/ndb/include/mgmcommon/ConfigRetriever.hpp +++ b/ndb/include/mgmcommon/ConfigRetriever.hpp @@ -20,7 +20,6 @@ #include <ndb_types.h> #include <mgmapi.h> #include <BaseString.hpp> -#include <LocalConfig.hpp> /** * @class ConfigRetriever @@ -28,10 +27,11 @@ */ class ConfigRetriever { public: - ConfigRetriever(LocalConfig &local_config, Uint32 version, Uint32 nodeType); + ConfigRetriever(const char * _connect_string, + Uint32 version, Uint32 nodeType); ~ConfigRetriever(); - int do_connect(int exit_on_connect_failure= false); + int do_connect(int no_retries, int retry_delay_in_seconds, int verbose); /** * Get configuration for current node. @@ -46,12 +46,14 @@ public: */ struct ndb_mgm_configuration * getConfig(); + void resetError(); + int hasError(); const char * getErrorString(); /** * @return Node id of this node (as stated in local config or connectString) */ - Uint32 allocNodeId(); + Uint32 allocNodeId(int no_retries, int retry_delay_in_seconds); /** * Get config using socket @@ -68,22 +70,26 @@ public: */ bool verifyConfig(const struct ndb_mgm_configuration *, Uint32 nodeid); - Uint32 get_mgmd_port() const {return m_mgmd_port;}; - const char *get_mgmd_host() const {return m_mgmd_host;}; + Uint32 get_mgmd_port() const; + const char *get_mgmd_host() const; + + Uint32 get_configuration_nodeid() const; private: BaseString errorString; enum ErrorType { - CR_ERROR = 0, - CR_RETRY = 1 + CR_NO_ERROR = 0, + CR_ERROR = 1, + CR_RETRY = 2 }; ErrorType latestErrorType; void setError(ErrorType, const char * errorMsg); - struct LocalConfig& _localConfig; - Uint32 _ownNodeId; + Uint32 _ownNodeId; + /* Uint32 m_mgmd_port; const char *m_mgmd_host; + */ Uint32 m_version; Uint32 m_node_type; diff --git a/ndb/include/ndbapi/NdbIndexScanOperation.hpp b/ndb/include/ndbapi/NdbIndexScanOperation.hpp index 66b3fc9d43b..a3388f62f58 100644 --- a/ndb/include/ndbapi/NdbIndexScanOperation.hpp +++ b/ndb/include/ndbapi/NdbIndexScanOperation.hpp @@ -113,7 +113,7 @@ public: * Reset bounds and put operation in list that will be * sent on next execute */ - int reset_bounds(); + int reset_bounds(bool forceSend = false); bool getSorted() const { return m_ordered; } private: @@ -127,8 +127,8 @@ private: virtual NdbRecAttr* getValue_impl(const NdbColumnImpl*, char*); void fix_get_values(); - int next_result_ordered(bool fetchAllowed); - int send_next_scan_ordered(Uint32 idx); + int next_result_ordered(bool fetchAllowed, bool forceSend = false); + int send_next_scan_ordered(Uint32 idx, bool forceSend = false); int compare(Uint32 key, Uint32 cols, const NdbReceiver*, const NdbReceiver*); Uint32 m_sort_columns; diff --git a/ndb/include/ndbapi/NdbResultSet.hpp b/ndb/include/ndbapi/NdbResultSet.hpp index 478daf8aad2..dc0288a380c 100644 --- a/ndb/include/ndbapi/NdbResultSet.hpp +++ b/ndb/include/ndbapi/NdbResultSet.hpp @@ -89,17 +89,17 @@ public: * - 1: if there are no more tuples to scan. * - 2: if there are no more cached records in NdbApi */ - int nextResult(bool fetchAllowed = true); + int nextResult(bool fetchAllowed = true, bool forceSend = false); /** * Close result set (scan) */ - void close(); + void close(bool forceSend = false); /** * Restart */ - int restart(); + int restart(bool forceSend = false); /** * Transfer scan operation to an updating transaction. Use this function diff --git a/ndb/include/ndbapi/NdbScanOperation.hpp b/ndb/include/ndbapi/NdbScanOperation.hpp index 2e4d173ac75..3c95c79e776 100644 --- a/ndb/include/ndbapi/NdbScanOperation.hpp +++ b/ndb/include/ndbapi/NdbScanOperation.hpp @@ -90,11 +90,11 @@ protected: NdbScanOperation(Ndb* aNdb); virtual ~NdbScanOperation(); - int nextResult(bool fetchAllowed = true); + int nextResult(bool fetchAllowed = true, bool forceSend = false); virtual void release(); - void closeScan(); - int close_impl(class TransporterFacade*); + void closeScan(bool forceSend = false); + int close_impl(class TransporterFacade*, bool forceSend = false); // Overloaded methods from NdbCursorOperation int executeCursor(int ProcessorId); @@ -103,6 +103,7 @@ protected: int init(const NdbTableImpl* tab, NdbConnection* myConnection); int prepareSend(Uint32 TC_ConnectPtr, Uint64 TransactionId); int doSend(int ProcessorId); + void checkForceSend(bool forceSend); virtual void setErrorCode(int aErrorCode); virtual void setErrorCodeAbort(int aErrorCode); @@ -138,7 +139,7 @@ protected: Uint32 m_sent_receivers_count; // NOTE needs mutex to access NdbReceiver** m_sent_receivers; // receive thread puts them here - int send_next_scan(Uint32 cnt, bool close); + int send_next_scan(Uint32 cnt, bool close, bool forceSend = false); void receiver_delivered(NdbReceiver*); void receiver_completed(NdbReceiver*); void execCLOSE_SCAN_REP(); @@ -148,7 +149,7 @@ protected: Uint32 m_ordered; - int restart(); + int restart(bool forceSend = false); }; inline diff --git a/ndb/include/ndbapi/ndb_cluster_connection.hpp b/ndb/include/ndbapi/ndb_cluster_connection.hpp index f8e6f25ce73..59d5a038844 100644 --- a/ndb/include/ndbapi/ndb_cluster_connection.hpp +++ b/ndb/include/ndbapi/ndb_cluster_connection.hpp @@ -19,7 +19,6 @@ #define CLUSTER_CONNECTION_HPP class TransporterFacade; -class LocalConfig; class ConfigRetriever; class NdbThread; @@ -38,7 +37,6 @@ private: void connect_thread(); char *m_connect_string; TransporterFacade *m_facade; - LocalConfig *m_local_config; ConfigRetriever *m_config_retriever; NdbThread *m_connect_thread; int (*m_connect_callback)(void); diff --git a/ndb/include/util/ndb_opts.h b/ndb/include/util/ndb_opts.h index 6cba9c04449..f7ae3b5489e 100644 --- a/ndb/include/util/ndb_opts.h +++ b/ndb/include/util/ndb_opts.h @@ -32,10 +32,13 @@ 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 }, \ { "version", 'V', "Output version information and exit.", 0, 0, 0, \ GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 }, \ - { "connect-string", 'c', \ + { "ndb-connectstring", 'c', \ "Set connect string for connecting to ndb_mgmd. " \ - "<constr>=\"host=<hostname:port>[;nodeid=<id>]\". " \ - "Overides specifying entries in NDB_CONNECTSTRING and config file", \ + "Syntax: \"[nodeid=<id>;][host=]<hostname>[:<port>]\". " \ + "Overides specifying entries in NDB_CONNECTSTRING and Ndb.cfg", \ + (gptr*) &opt_connect_str, (gptr*) &opt_connect_str, 0, \ + GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },\ + { "connect-string", 'c', "same as --ndb-connectstring",\ (gptr*) &opt_connect_str, (gptr*) &opt_connect_str, 0, \ GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 } #else @@ -46,11 +49,14 @@ 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 }, \ { "version", 'V', "Output version information and exit.", 0, 0, 0, \ GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 }, \ - { "connect-string", 'c', \ + { "ndb-connectstring", 'c', \ "Set connect string for connecting to ndb_mgmd. " \ - "<constr>=\"host=<hostname:port>[;nodeid=<id>]\". " \ - "Overides specifying entries in NDB_CONNECTSTRING and config file", \ + "Syntax: \"[nodeid=<id>;][host=]<hostname>[:<port>]\". " \ + "Overides specifying entries in NDB_CONNECTSTRING and Ndb.cfg", \ (gptr*) &opt_connect_str, (gptr*) &opt_connect_str, 0, \ + GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },\ + { "connect-string", 'c', "same as --ndb-connectstring",\ + (gptr*) &opt_connect_str, (gptr*) &opt_connect_str, 0,\ GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 } #endif diff --git a/ndb/src/common/debugger/signaldata/ScanTab.cpp b/ndb/src/common/debugger/signaldata/ScanTab.cpp index 72a4d9f94b9..0755ee0a856 100644 --- a/ndb/src/common/debugger/signaldata/ScanTab.cpp +++ b/ndb/src/common/debugger/signaldata/ScanTab.cpp @@ -30,13 +30,14 @@ printSCANTABREQ(FILE * output, const Uint32 * theData, Uint32 len, Uint16 receiv fprintf(output, " apiConnectPtr: H\'%.8x", sig->apiConnectPtr); fprintf(output, " requestInfo: H\'%.8x:\n", requestInfo); - fprintf(output, " Parallellism: %u, Batch: %u LockMode: %u, Keyinfo: %u Holdlock: %u, RangeScan: %u\n", + fprintf(output, " Parallellism: %u, Batch: %u LockMode: %u, Keyinfo: %u Holdlock: %u, RangeScan: %u ReadCommitted: %u\n", sig->getParallelism(requestInfo), sig->getScanBatch(requestInfo), sig->getLockMode(requestInfo), + sig->getKeyinfoFlag(requestInfo), sig->getHoldLockFlag(requestInfo), sig->getRangeScanFlag(requestInfo), - sig->getKeyinfoFlag(requestInfo)); + sig->getReadCommittedFlag(requestInfo)); Uint32 keyLen = (sig->attrLenKeyLen >> 16); Uint32 attrLen = (sig->attrLenKeyLen & 0xFFFF); diff --git a/ndb/src/common/mgmcommon/ConfigRetriever.cpp b/ndb/src/common/mgmcommon/ConfigRetriever.cpp index a1b979f62d8..0af5eb2f83c 100644 --- a/ndb/src/common/mgmcommon/ConfigRetriever.cpp +++ b/ndb/src/common/mgmcommon/ConfigRetriever.cpp @@ -20,7 +20,6 @@ #include <ConfigRetriever.hpp> #include <SocketServer.hpp> -#include "LocalConfig.hpp" #include <NdbSleep.h> #include <NdbOut.hpp> @@ -45,90 +44,62 @@ //**************************************************************************** //**************************************************************************** -ConfigRetriever::ConfigRetriever(LocalConfig &local_config, +ConfigRetriever::ConfigRetriever(const char * _connect_string, Uint32 version, Uint32 node_type) - : _localConfig(local_config) { - m_handle= 0; m_version = version; m_node_type = node_type; - _ownNodeId = _localConfig._ownNodeId; -} + _ownNodeId= 0; -ConfigRetriever::~ConfigRetriever(){ + m_handle= ndb_mgm_create_handle(); + if (m_handle == 0) { + setError(CR_ERROR, "Unable to allocate mgm handle"); + return; + } + + if (ndb_mgm_set_connectstring(m_handle, _connect_string)) + { + setError(CR_ERROR, ndb_mgm_get_latest_error_desc(m_handle)); + return; + } + resetError(); +} + +ConfigRetriever::~ConfigRetriever() +{ if (m_handle) { ndb_mgm_disconnect(m_handle); ndb_mgm_destroy_handle(&m_handle); } } +Uint32 +ConfigRetriever::get_configuration_nodeid() const +{ + return ndb_mgm_get_configuration_nodeid(m_handle); +} + +Uint32 ConfigRetriever::get_mgmd_port() const +{ + return ndb_mgm_get_connected_port(m_handle); +} + +const char *ConfigRetriever::get_mgmd_host() const +{ + return ndb_mgm_get_connected_host(m_handle); +} //**************************************************************************** //**************************************************************************** int -ConfigRetriever::do_connect(int exit_on_connect_failure){ - - m_mgmd_port= 0; - m_mgmd_host= 0; - - if(!m_handle) - m_handle= ndb_mgm_create_handle(); - - if (m_handle == 0) { - setError(CR_ERROR, "Unable to allocate mgm handle"); - return -1; - } - - int retry = 1; - int retry_max = 12; // Max number of retry attempts - int retry_interval= 5; // Seconds between each retry - while(retry < retry_max){ - Uint32 type = CR_ERROR; - BaseString tmp; - for (unsigned int i = 0; i<_localConfig.ids.size(); i++){ - MgmtSrvrId * m = &_localConfig.ids[i]; - DBUG_PRINT("info",("trying %s:%d", - m->name.c_str(), - m->port)); - switch(m->type){ - case MgmId_TCP: - tmp.assfmt("%s:%d", m->name.c_str(), m->port); - if (ndb_mgm_connect(m_handle, tmp.c_str()) == 0) { - m_mgmd_port= m->port; - m_mgmd_host= m->name.c_str(); - DBUG_PRINT("info",("connected to ndb_mgmd at %s:%d", - m_mgmd_host, - m_mgmd_port)); - return 0; - } - setError(CR_RETRY, ndb_mgm_get_latest_error_desc(m_handle)); - case MgmId_File: - break; - } - } - if(latestErrorType == CR_RETRY){ - DBUG_PRINT("info",("CR_RETRY")); - if (exit_on_connect_failure) - return 1; - REPORT_WARNING("Failed to retrieve cluster configuration"); - ndbout << "(Cause of failure: " << getErrorString() << ")" << endl; - ndbout << "Attempt " << retry << " of " << retry_max << ". " - << "Trying again in "<< retry_interval <<" seconds..." - << endl << endl; - NdbSleep_SecSleep(retry_interval); - } else { - break; - } - retry++; - } - - ndb_mgm_destroy_handle(&m_handle); - m_handle= 0; - m_mgmd_port= 0; - m_mgmd_host= 0; - return -1; +ConfigRetriever::do_connect(int no_retries, + int retry_delay_in_seconds, int verbose) +{ + return + (ndb_mgm_connect(m_handle,no_retries,retry_delay_in_seconds,verbose)==0) ? + 0 : -1; } //**************************************************************************** @@ -140,22 +111,9 @@ ConfigRetriever::getConfig() { struct ndb_mgm_configuration * p = 0; - if(m_handle != 0){ + if(m_handle != 0) p = getConfig(m_handle); - } else { - for (unsigned int i = 0; i<_localConfig.ids.size(); i++){ - MgmtSrvrId * m = &_localConfig.ids[i]; - switch(m->type){ - case MgmId_File: - p = getConfig(m->name.c_str()); - break; - case MgmId_TCP: - break; - } - if(p) - break; - } - } + if(p == 0) return 0; @@ -227,6 +185,16 @@ ConfigRetriever::setError(ErrorType et, const char * s){ latestErrorType = et; } +void +ConfigRetriever::resetError(){ + setError(CR_NO_ERROR,0); +} + +int +ConfigRetriever::hasError() +{ + return latestErrorType != CR_NO_ERROR; +} const char * ConfigRetriever::getErrorString(){ @@ -341,16 +309,23 @@ ConfigRetriever::verifyConfig(const struct ndb_mgm_configuration * conf, Uint32 } Uint32 -ConfigRetriever::allocNodeId(){ - unsigned nodeid= _ownNodeId; - - if(m_handle != 0){ - int res= ndb_mgm_alloc_nodeid(m_handle, m_version, &nodeid, m_node_type); - if(res != 0) { - setError(CR_ERROR, ndb_mgm_get_latest_error_desc(m_handle)); - return 0; +ConfigRetriever::allocNodeId(int no_retries, int retry_delay_in_seconds) +{ + _ownNodeId= 0; + if(m_handle != 0) + { + while (1) + { + int res= ndb_mgm_alloc_nodeid(m_handle, m_version, m_node_type); + if(res >= 0) + return _ownNodeId= (Uint32)res; + if (no_retries == 0) + break; + no_retries--; + NdbSleep_SecSleep(retry_delay_in_seconds); } - } - - return _ownNodeId= nodeid; + setError(CR_ERROR, ndb_mgm_get_latest_error_desc(m_handle)); + } else + setError(CR_ERROR, "management server handle not initialized"); + return 0; } diff --git a/ndb/src/common/util/version.c b/ndb/src/common/util/version.c index 965d0a735e1..7a537297861 100644 --- a/ndb/src/common/util/version.c +++ b/ndb/src/common/util/version.c @@ -70,7 +70,6 @@ struct NdbUpGradeCompatible { #ifndef TEST_VERSION struct NdbUpGradeCompatible ndbCompatibleTable_full[] = { { MAKE_VERSION(3,5,2), MAKE_VERSION(3,5,1), UG_Exact }, - { MAKE_VERSION(4,1,8), MAKE_VERSION(3,5,4), UG_Exact }, /* Aligned version with MySQL */ { 0, 0, UG_Null } }; diff --git a/ndb/src/kernel/blocks/dbdih/Dbdih.hpp b/ndb/src/kernel/blocks/dbdih/Dbdih.hpp index 14fa262f871..0a2d50cb876 100644 --- a/ndb/src/kernel/blocks/dbdih/Dbdih.hpp +++ b/ndb/src/kernel/blocks/dbdih/Dbdih.hpp @@ -147,7 +147,6 @@ public: Uint32 nfConnect; Uint32 table; Uint32 userpointer; - Uint32 nodeCount; BlockReference userblockref; }; typedef Ptr<ConnectRecord> ConnectRecordPtr; diff --git a/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp b/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp index 76aa745c3e0..4592b121c7e 100644 --- a/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp +++ b/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp @@ -7080,24 +7080,22 @@ void Dbdih::execDIGETPRIMREQ(Signal* signal) ndbrequire(tabPtr.p->tabStatus == TabRecord::TS_ACTIVE); connectPtr.i = signal->theData[0]; - if(connectPtr.i != RNIL){ + if(connectPtr.i != RNIL) + { jam(); ptrCheckGuard(connectPtr, cconnectFileSize, connectRecord); - ndbrequire(connectPtr.p->connectState == ConnectRecord::INUSE); - getFragstore(tabPtr.p, fragId, fragPtr); - connectPtr.p->nodeCount = extractNodeInfo(fragPtr.p, connectPtr.p->nodes); signal->theData[0] = connectPtr.p->userpointer; - signal->theData[1] = passThrough; - signal->theData[2] = connectPtr.p->nodes[0]; - sendSignal(connectPtr.p->userblockref, GSN_DIGETPRIMCONF, signal, 3, JBB); - return; - }//if - //connectPtr.i == RNIL -> question without connect record + } + else + { + jam(); + signal->theData[0] = RNIL; + } + Uint32 nodes[MAX_REPLICAS]; getFragstore(tabPtr.p, fragId, fragPtr); Uint32 count = extractNodeInfo(fragPtr.p, nodes); - signal->theData[0] = RNIL; signal->theData[1] = passThrough; signal->theData[2] = nodes[0]; signal->theData[3] = nodes[1]; diff --git a/ndb/src/kernel/blocks/dblqh/Dblqh.hpp b/ndb/src/kernel/blocks/dblqh/Dblqh.hpp index 739c3c741fb..0c63cb5fe17 100644 --- a/ndb/src/kernel/blocks/dblqh/Dblqh.hpp +++ b/ndb/src/kernel/blocks/dblqh/Dblqh.hpp @@ -550,6 +550,11 @@ public: UintR scanErrorCounter; UintR scanLocalFragid; UintR scanSchemaVersion; + + /** + * This is _always_ main table, even in range scan + * in which case scanTcrec->fragmentptr is different + */ Uint32 fragPtrI; UintR scanStoredProcId; ScanState scanState; @@ -2925,4 +2930,23 @@ Dblqh::ScanRecord::check_scan_batch_completed() const (max_bytes > 0 && (m_curr_batch_size_bytes >= max_bytes)); } +inline +void +Dblqh::i_get_acc_ptr(ScanRecord* scanP, Uint32* &acc_ptr, Uint32 index) +{ + if (index == 0) { + acc_ptr= (Uint32*)&scanP->scan_acc_op_ptr[0]; + } else { + Uint32 attr_buf_index, attr_buf_rec; + + AttrbufPtr regAttrPtr; + jam(); + attr_buf_rec= (index + 31) / 32; + attr_buf_index= (index - 1) & 31; + regAttrPtr.i= scanP->scan_acc_op_ptr[attr_buf_rec]; + ptrCheckGuard(regAttrPtr, cattrinbufFileSize, attrbuf); + acc_ptr= (Uint32*)®AttrPtr.p->attrbuf[attr_buf_index]; + } +} + #endif diff --git a/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp b/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp index 5622706a96c..88e8f25b004 100644 --- a/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp +++ b/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp @@ -3084,6 +3084,7 @@ void Dblqh::execATTRINFO(Signal* signal) return; break; default: + ndbout_c("%d", regTcPtr->transactionState); ndbrequire(false); break; }//switch @@ -7161,10 +7162,7 @@ void Dblqh::continueScanNextReqLab(Signal* signal) // Update timer on tcConnectRecord tcConnectptr.p->tcTimer = cLqhTimeOutCount; - init_acc_ptr_list(scanptr.p); - scanptr.p->m_curr_batch_size_rows = 0; - scanptr.p->m_curr_batch_size_bytes= 0; scanptr.p->scanFlag = NextScanReq::ZSCAN_NEXT; scanNextLoopLab(signal); }//Dblqh::continueScanNextReqLab() @@ -7363,22 +7361,32 @@ void Dblqh::scanLockReleasedLab(Signal* signal) tcConnectptr.i = scanptr.p->scanTcrec; ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec); releaseActiveFrag(signal); + if (scanptr.p->scanReleaseCounter == scanptr.p->m_curr_batch_size_rows) { if ((scanptr.p->scanErrorCounter > 0) || (scanptr.p->scanCompletedStatus == ZTRUE)) { jam(); + scanptr.p->m_curr_batch_size_rows = 0; + scanptr.p->m_curr_batch_size_bytes = 0; closeScanLab(signal); } else if (scanptr.p->check_scan_batch_completed() && scanptr.p->scanLockHold != ZTRUE) { jam(); scanptr.p->scanState = ScanRecord::WAIT_SCAN_NEXTREQ; sendScanFragConf(signal, ZFALSE); + } else if (scanptr.p->m_last_row && !scanptr.p->scanLockHold) { + jam(); + closeScanLab(signal); + return; } else { jam(); /* - We came here after releasing locks after receiving SCAN_NEXTREQ from TC. We only - come here when scanHoldLock == ZTRUE - */ + * We came here after releasing locks after + * receiving SCAN_NEXTREQ from TC. We only come here + * when scanHoldLock == ZTRUE + */ + scanptr.p->m_curr_batch_size_rows = 0; + scanptr.p->m_curr_batch_size_bytes = 0; continueScanNextReqLab(signal); }//if } else if (scanptr.p->scanReleaseCounter < scanptr.p->m_curr_batch_size_rows) { @@ -7465,25 +7473,6 @@ Dblqh::init_acc_ptr_list(ScanRecord* scanP) scanP->scan_acc_index = 0; } -inline -void -Dblqh::i_get_acc_ptr(ScanRecord* scanP, Uint32* &acc_ptr, Uint32 index) -{ - if (index == 0) { - acc_ptr= (Uint32*)&scanP->scan_acc_op_ptr[0]; - } else { - Uint32 attr_buf_index, attr_buf_rec; - - AttrbufPtr regAttrPtr; - jam(); - attr_buf_rec= (index + 31) / 32; - attr_buf_index= (index - 1) & 31; - regAttrPtr.i= scanP->scan_acc_op_ptr[attr_buf_rec]; - ptrCheckGuard(regAttrPtr, cattrinbufFileSize, attrbuf); - acc_ptr= (Uint32*)®AttrPtr.p->attrbuf[attr_buf_index]; - } -} - Uint32 Dblqh::get_acc_ptr_from_scan_record(ScanRecord* scanP, Uint32 index, @@ -7714,6 +7703,9 @@ void Dblqh::abort_scan(Signal* signal, Uint32 scan_ptr_i, Uint32 errcode){ jam(); scanptr.i = scan_ptr_i; c_scanRecordPool.getPtr(scanptr); + + fragptr.i = tcConnectptr.p->fragmentptr; + ptrCheckGuard(fragptr, cfragrecFileSize, fragrecord); finishScanrec(signal); releaseScanrec(signal); tcConnectptr.p->transactionState = TcConnectionrec::IDLE; @@ -8007,6 +7999,13 @@ void Dblqh::nextScanConfScanLab(Signal* signal) /************************************************************* * STOP THE SCAN PROCESS IF THIS HAS BEEN REQUESTED. ************************************************************ */ + if (!scanptr.p->scanLockHold) + { + jam(); + closeScanLab(signal); + return; + } + if (scanptr.p->scanCompletedStatus == ZTRUE) { if ((scanptr.p->scanLockHold == ZTRUE) && (scanptr.p->m_curr_batch_size_rows > 0)) { @@ -8507,8 +8506,6 @@ void Dblqh::tupScanCloseConfLab(Signal* signal) ScanFragRef::SignalLength, JBB); } else { jam(); - scanptr.p->m_curr_batch_size_rows = 0; - scanptr.p->m_curr_batch_size_bytes= 0; sendScanFragConf(signal, ZSCAN_FRAG_CLOSED); }//if finishScanrec(signal); @@ -8576,10 +8573,12 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq) /** * Used for scan take over */ - FragrecordPtr tFragPtr; - tFragPtr.i = fragptr.p->tableFragptr; - ptrCheckGuard(tFragPtr, cfragrecFileSize, fragrecord); - scanptr.p->fragPtrI = fragptr.p->tableFragptr; + { + FragrecordPtr tFragPtr; + tFragPtr.i = fragptr.p->tableFragptr; + ptrCheckGuard(tFragPtr, cfragrecFileSize, fragrecord); + scanptr.p->fragPtrI = fragptr.p->tableFragptr; + } /** * !idx uses 1 - (MAX_PARALLEL_SCANS_PER_FRAG - 1) = 1-11 @@ -8588,8 +8587,8 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq) Uint32 start = (idx ? MAX_PARALLEL_SCANS_PER_FRAG : 1 ); Uint32 stop = (idx ? MAX_PARALLEL_INDEX_SCANS_PER_FRAG : MAX_PARALLEL_SCANS_PER_FRAG - 1); stop += start; - Uint32 free = tFragPtr.p->m_scanNumberMask.find(start); - + Uint32 free = fragptr.p->m_scanNumberMask.find(start); + if(free == Fragrecord::ScanNumberMask::NotFound || free >= stop){ jam(); @@ -8603,16 +8602,16 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq) */ scanptr.p->scanState = ScanRecord::IN_QUEUE; LocalDLFifoList<ScanRecord> queue(c_scanRecordPool, - tFragPtr.p->m_queuedScans); + fragptr.p->m_queuedScans); queue.add(scanptr); return ZOK; } - + scanptr.p->scanNumber = free; - tFragPtr.p->m_scanNumberMask.clear(free);// Update mask + fragptr.p->m_scanNumberMask.clear(free);// Update mask - LocalDLList<ScanRecord> active(c_scanRecordPool, tFragPtr.p->m_activeScans); + LocalDLList<ScanRecord> active(c_scanRecordPool, fragptr.p->m_activeScans); active.add(scanptr); if(scanptr.p->scanKeyinfoFlag){ jam(); @@ -8672,12 +8671,8 @@ void Dblqh::finishScanrec(Signal* signal) { release_acc_ptr_list(scanptr.p); - FragrecordPtr tFragPtr; - tFragPtr.i = scanptr.p->fragPtrI; - ptrCheckGuard(tFragPtr, cfragrecFileSize, fragrecord); - LocalDLFifoList<ScanRecord> queue(c_scanRecordPool, - tFragPtr.p->m_queuedScans); + fragptr.p->m_queuedScans); if(scanptr.p->scanState == ScanRecord::IN_QUEUE){ jam(); @@ -8695,11 +8690,11 @@ void Dblqh::finishScanrec(Signal* signal) ndbrequire(tmp.p == scanptr.p); } - LocalDLList<ScanRecord> scans(c_scanRecordPool, tFragPtr.p->m_activeScans); + LocalDLList<ScanRecord> scans(c_scanRecordPool, fragptr.p->m_activeScans); scans.release(scanptr); const Uint32 scanNumber = scanptr.p->scanNumber; - ndbrequire(!tFragPtr.p->m_scanNumberMask.get(scanNumber)); + ndbrequire(!fragptr.p->m_scanNumberMask.get(scanNumber)); ScanRecordPtr restart; /** @@ -8707,13 +8702,13 @@ void Dblqh::finishScanrec(Signal* signal) */ if(scanNumber == NR_ScanNo || !queue.first(restart)){ jam(); - tFragPtr.p->m_scanNumberMask.set(scanNumber); + fragptr.p->m_scanNumberMask.set(scanNumber); return; } if(ERROR_INSERTED(5034)){ jam(); - tFragPtr.p->m_scanNumberMask.set(scanNumber); + fragptr.p->m_scanNumberMask.set(scanNumber); return; } @@ -8724,7 +8719,7 @@ void Dblqh::finishScanrec(Signal* signal) ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec); restart.p->scanNumber = scanNumber; restart.p->scanState = ScanRecord::WAIT_ACC_SCAN; - + queue.remove(restart); scans.add(restart); if(restart.p->scanKeyinfoFlag){ @@ -8912,6 +8907,13 @@ void Dblqh::sendScanFragConf(Signal* signal, Uint32 scanCompleted) conf->total_len= total_len; sendSignal(tcConnectptr.p->clientBlockref, GSN_SCAN_FRAGCONF, signal, ScanFragConf::SignalLength, JBB); + + if(!scanptr.p->scanLockHold) + { + jam(); + scanptr.p->m_curr_batch_size_rows = 0; + scanptr.p->m_curr_batch_size_bytes= 0; + } }//Dblqh::sendScanFragConf() /* ######################################################################### */ diff --git a/ndb/src/kernel/blocks/dbtc/Dbtc.hpp b/ndb/src/kernel/blocks/dbtc/Dbtc.hpp index a209df24c44..fb90ccc8c90 100644 --- a/ndb/src/kernel/blocks/dbtc/Dbtc.hpp +++ b/ndb/src/kernel/blocks/dbtc/Dbtc.hpp @@ -1054,9 +1054,8 @@ public: // Id of the ScanRecord this fragment scan belongs to Uint32 scanRec; - // The maximum number of operations that can be scanned before - // returning to TC - Uint16 scanFragConcurrency; + // The value of fragmentCompleted in the last received SCAN_FRAGCONF + Uint8 m_scan_frag_conf_status; inline void startFragTimer(Uint32 timeVal){ scanFragTimer = timeVal; @@ -1193,8 +1192,10 @@ public: // Number of operation records per scanned fragment // Number of operations in first batch // Max number of bytes per batch - Uint16 noOprecPerFrag; - Uint16 first_batch_size; + union { + Uint16 first_batch_size_rows; + Uint16 batch_size_rows; + }; Uint32 batch_byte_size; Uint32 scanRequestInfo; // ScanFrag format diff --git a/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp b/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp index d8b3ee10532..07dbb370ec6 100644 --- a/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp +++ b/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp @@ -8646,9 +8646,9 @@ void Dbtc::initScanrec(ScanRecordPtr scanptr, scanptr.p->scanTableref = tabptr.i; scanptr.p->scanSchemaVersion = scanTabReq->tableSchemaVersion; scanptr.p->scanParallel = scanParallel; - scanptr.p->noOprecPerFrag = noOprecPerFrag; - scanptr.p->first_batch_size= scanTabReq->first_batch_size; - scanptr.p->batch_byte_size= scanTabReq->batch_byte_size; + scanptr.p->first_batch_size_rows = scanTabReq->first_batch_size; + scanptr.p->batch_byte_size = scanTabReq->batch_byte_size; + scanptr.p->batch_size_rows = noOprecPerFrag; Uint32 tmp = 0; const UintR ri = scanTabReq->requestInfo; @@ -8672,7 +8672,6 @@ void Dbtc::initScanrec(ScanRecordPtr scanptr, ndbrequire(list.seize(ptr)); ptr.p->scanRec = scanptr.i; ptr.p->scanFragId = 0; - ptr.p->scanFragConcurrency = noOprecPerFrag; ptr.p->m_apiPtr = cdata[i]; }//for @@ -8945,6 +8944,25 @@ void Dbtc::execDIGETPRIMCONF(Signal* signal) scanptr.i = scanFragptr.p->scanRec; ptrCheckGuard(scanptr, cscanrecFileSize, scanRecord); + /** + * This must be false as select count(*) otherwise + * can "pass" committing on backup fragments and + * get incorrect row count + */ + if(false && ScanFragReq::getReadCommittedFlag(scanptr.p->scanRequestInfo)) + { + jam(); + Uint32 max = 3+signal->theData[6]; + Uint32 nodeid = getOwnNodeId(); + for(Uint32 i = 3; i<max; i++) + if(signal->theData[i] == nodeid) + { + jam(); + tnodeid = nodeid; + break; + } + } + { /** * Check table @@ -9141,6 +9159,7 @@ void Dbtc::execSCAN_FRAGCONF(Signal* signal) const ScanFragConf * const conf = (ScanFragConf*)&signal->theData[0]; const Uint32 noCompletedOps = conf->completedOps; + const Uint32 status = conf->fragmentCompleted; scanFragptr.i = conf->senderData; c_scan_frag_pool.getPtr(scanFragptr); @@ -9163,11 +9182,9 @@ void Dbtc::execSCAN_FRAGCONF(Signal* signal) ndbrequire(scanFragptr.p->scanFragState == ScanFragRec::LQH_ACTIVE); - const Uint32 status = conf->fragmentCompleted; - if(scanptr.p->scanState == ScanRecord::CLOSING_SCAN){ jam(); - if(status == ZFALSE){ + if(status == 0){ /** * We have started closing = we sent a close -> ignore this */ @@ -9184,11 +9201,11 @@ void Dbtc::execSCAN_FRAGCONF(Signal* signal) return; } - if(status == ZCLOSED && scanptr.p->scanNextFragId < scanptr.p->scanNoFrag){ + if(noCompletedOps == 0 && status != 0 && + scanptr.p->scanNextFragId < scanptr.p->scanNoFrag){ /** * Start on next fragment */ - ndbrequire(noCompletedOps == 0); scanFragptr.p->scanFragState = ScanFragRec::WAIT_GET_PRIMCONF; scanFragptr.p->startFragTimer(ctcTimer); @@ -9218,6 +9235,7 @@ void Dbtc::execSCAN_FRAGCONF(Signal* signal) scanptr.p->m_queued_count++; } + scanFragptr.p->m_scan_frag_conf_status = status; scanFragptr.p->m_ops = noCompletedOps; scanFragptr.p->m_totalLen = total_len; scanFragptr.p->scanFragState = ScanFragRec::QUEUED_FOR_DELIVERY; @@ -9311,7 +9329,6 @@ void Dbtc::execSCAN_NEXTREQ(Signal* signal) /********************************************************************* * APPLICATION IS CLOSING THE SCAN. **********************************************************************/ - ndbrequire(len == 0); close_scan_req(signal, scanptr, true); return; }//if @@ -9330,11 +9347,12 @@ void Dbtc::execSCAN_NEXTREQ(Signal* signal) // Copy op ptrs so I dont overwrite them when sending... memcpy(signal->getDataPtrSend()+25, signal->getDataPtr()+4, 4 * len); - ScanFragNextReq * nextReq = (ScanFragNextReq*)&signal->theData[0]; - nextReq->closeFlag = ZFALSE; - nextReq->transId1 = apiConnectptr.p->transid[0]; - nextReq->transId2 = apiConnectptr.p->transid[1]; - nextReq->batch_size_bytes= scanP->batch_byte_size; + ScanFragNextReq tmp; + tmp.closeFlag = ZFALSE; + tmp.transId1 = apiConnectptr.p->transid[0]; + tmp.transId2 = apiConnectptr.p->transid[1]; + tmp.batch_size_rows = scanP->batch_size_rows; + tmp.batch_size_bytes = scanP->batch_byte_size; ScanFragList running(c_scan_frag_pool, scanP->m_running_scan_frags); ScanFragList delivered(c_scan_frag_pool, scanP->m_delivered_scan_frags); @@ -9344,15 +9362,37 @@ void Dbtc::execSCAN_NEXTREQ(Signal* signal) c_scan_frag_pool.getPtr(scanFragptr); ndbrequire(scanFragptr.p->scanFragState == ScanFragRec::DELIVERED); - scanFragptr.p->scanFragState = ScanFragRec::LQH_ACTIVE; scanFragptr.p->startFragTimer(ctcTimer); - scanFragptr.p->m_ops = 0; - nextReq->senderData = scanFragptr.i; - nextReq->batch_size_rows= scanFragptr.p->scanFragConcurrency; - sendSignal(scanFragptr.p->lqhBlockref, GSN_SCAN_NEXTREQ, signal, - ScanFragNextReq::SignalLength, JBB); + if(scanFragptr.p->m_scan_frag_conf_status) + { + /** + * last scan was complete + */ + jam(); + ndbrequire(scanptr.p->scanNextFragId < scanptr.p->scanNoFrag); + scanFragptr.p->scanFragState = ScanFragRec::WAIT_GET_PRIMCONF; + + tcConnectptr.i = scanptr.p->scanTcrec; + ptrCheckGuard(tcConnectptr, ctcConnectFilesize, tcConnectRecord); + scanFragptr.p->scanFragId = scanptr.p->scanNextFragId++; + signal->theData[0] = tcConnectptr.p->dihConnectptr; + signal->theData[1] = scanFragptr.i; + signal->theData[2] = scanptr.p->scanTableref; + signal->theData[3] = scanFragptr.p->scanFragId; + sendSignal(cdihblockref, GSN_DIGETPRIMREQ, signal, 4, JBB); + } + else + { + jam(); + scanFragptr.p->scanFragState = ScanFragRec::LQH_ACTIVE; + ScanFragNextReq * req = (ScanFragNextReq*)signal->getDataPtrSend(); + * req = tmp; + req->senderData = scanFragptr.i; + sendSignal(scanFragptr.p->lqhBlockref, GSN_SCAN_NEXTREQ, signal, + ScanFragNextReq::SignalLength, JBB); + } delivered.remove(scanFragptr); running.add(scanFragptr); }//for @@ -9416,7 +9456,7 @@ Dbtc::close_scan_req(Signal* signal, ScanRecordPtr scanPtr, bool req_received){ ndbrequire(curr.p->scanFragState == ScanFragRec::DELIVERED); delivered.remove(curr); - if(curr.p->m_ops > 0){ + if(curr.p->m_ops > 0 && curr.p->m_scan_frag_conf_status == 0){ jam(); running.add(curr); curr.p->scanFragState = ScanFragRec::LQH_ACTIVE; @@ -9551,7 +9591,7 @@ void Dbtc::sendScanFragReq(Signal* signal, req->transId1 = apiConnectptr.p->transid[0]; req->transId2 = apiConnectptr.p->transid[1]; req->clientOpPtr = scanFragP->m_apiPtr; - req->batch_size_rows= scanFragP->scanFragConcurrency; + req->batch_size_rows= scanP->batch_size_rows; req->batch_size_bytes= scanP->batch_byte_size; sendSignal(scanFragP->lqhBlockref, GSN_SCAN_FRAGREQ, signal, ScanFragReq::SignalLength, JBB); @@ -9573,6 +9613,8 @@ void Dbtc::sendScanTabConf(Signal* signal, ScanRecordPtr scanPtr) { jam(); ops += 21; } + + Uint32 left = scanPtr.p->scanNoFrag - scanPtr.p->scanNextFragId; ScanTabConf * conf = (ScanTabConf*)&signal->theData[0]; conf->apiConnectPtr = apiConnectptr.p->ndbapiConnect; @@ -9588,24 +9630,25 @@ void Dbtc::sendScanTabConf(Signal* signal, ScanRecordPtr scanPtr) { ScanFragRecPtr curr = ptr; // Remove while iterating... queued.next(ptr); + bool done = curr.p->m_scan_frag_conf_status && --left; + * ops++ = curr.p->m_apiPtr; - * ops++ = curr.i; + * ops++ = done ? RNIL : curr.i; * ops++ = (curr.p->m_totalLen << 10) + curr.p->m_ops; queued.remove(curr); - if(curr.p->m_ops > 0){ + if(!done){ delivered.add(curr); curr.p->scanFragState = ScanFragRec::DELIVERED; curr.p->stopFragTimer(); } else { - (* --ops) = ScanTabConf::EndOfData; ops++; c_scan_frag_pool.release(curr); curr.p->scanFragState = ScanFragRec::COMPLETED; curr.p->stopFragTimer(); } } } - + if(scanPtr.p->m_delivered_scan_frags.isEmpty() && scanPtr.p->m_running_scan_frags.isEmpty()){ conf->requestInfo = op_count | ScanTabConf::EndOfData; @@ -10424,9 +10467,8 @@ Dbtc::execDUMP_STATE_ORD(Signal* signal) sfp.i, sfp.p->scanFragState, sfp.p->scanFragId); - infoEvent(" nodeid=%d, concurr=%d, timer=%d", + infoEvent(" nodeid=%d, timer=%d", refToNode(sfp.p->lqhBlockref), - sfp.p->scanFragConcurrency, sfp.p->scanFragTimer); } @@ -10504,7 +10546,7 @@ Dbtc::execDUMP_STATE_ORD(Signal* signal) sp.p->scanAiLength, sp.p->scanParallel, sp.p->scanReceivedOperations, - sp.p->noOprecPerFrag); + sp.p->batch_size_rows); infoEvent(" schv=%d, tab=%d, sproc=%d", sp.p->scanSchemaVersion, sp.p->scanTableref, diff --git a/ndb/src/kernel/blocks/suma/Suma.cpp b/ndb/src/kernel/blocks/suma/Suma.cpp index d11d5f7176a..f6d9a0ac35a 100644 --- a/ndb/src/kernel/blocks/suma/Suma.cpp +++ b/ndb/src/kernel/blocks/suma/Suma.cpp @@ -1888,7 +1888,7 @@ SumaParticipant::SyncRecord::nextScan(Signal* signal){ req->requestInfo = 0; req->savePointId = 0; ScanFragReq::setLockMode(req->requestInfo, 0); - ScanFragReq::setHoldLockFlag(req->requestInfo, 0); + ScanFragReq::setHoldLockFlag(req->requestInfo, 1); ScanFragReq::setKeyinfoFlag(req->requestInfo, 0); ScanFragReq::setAttrLen(req->requestInfo, attrLen); req->fragmentNoKeyLen = fd.m_fragDesc.m_fragmentNo; diff --git a/ndb/src/kernel/main.cpp b/ndb/src/kernel/main.cpp index 926647838c9..f34e16318cd 100644 --- a/ndb/src/kernel/main.cpp +++ b/ndb/src/kernel/main.cpp @@ -19,7 +19,6 @@ #include <ndb_version.h> #include "Configuration.hpp" -#include <LocalConfig.hpp> #include <TransporterRegistry.hpp> #include "vm/SimBlockList.hpp" @@ -69,16 +68,9 @@ int main(int argc, char** argv) return NRT_Default; } - LocalConfig local_config; - if (!local_config.init(theConfig->getConnectString(),0)){ - local_config.printError(); - local_config.printUsage(); - return NRT_Default; - } - { // Do configuration signal(SIGPIPE, SIG_IGN); - theConfig->fetch_configuration(local_config); + theConfig->fetch_configuration(); } chdir(NdbConfig_get_path(0)); @@ -141,7 +133,7 @@ int main(int argc, char** argv) exit(0); } g_eventLogger.info("Ndb has terminated (pid %d) restarting", child); - theConfig->fetch_configuration(local_config); + theConfig->fetch_configuration(); } g_eventLogger.info("Angel pid: %d ndb pid: %d", getppid(), getpid()); diff --git a/ndb/src/kernel/vm/Configuration.cpp b/ndb/src/kernel/vm/Configuration.cpp index aac035fe1b7..931b4da5a17 100644 --- a/ndb/src/kernel/vm/Configuration.cpp +++ b/ndb/src/kernel/vm/Configuration.cpp @@ -17,7 +17,6 @@ #include <ndb_global.h> #include <ndb_opts.h> -#include <LocalConfig.hpp> #include "Configuration.hpp" #include <ErrorHandlingMacros.hpp> #include "GlobalData.hpp" @@ -35,6 +34,7 @@ #include <kernel_types.h> #include <ndb_limits.h> +#include <ndbapi_limits.h> #include "pc.hpp" #include <LogLevel.hpp> #include <NdbSleep.h> @@ -108,7 +108,7 @@ get_one_option(int optid, const struct my_option *opt __attribute__((unused)), bool Configuration::init(int argc, char** argv) { - const char *load_default_groups[]= { "ndbd",0 }; + const char *load_default_groups[]= { "mysql_cluster","ndbd",0 }; load_defaults("my",load_default_groups,&argc,&argv); int ho_error; @@ -189,7 +189,7 @@ Configuration::closeConfiguration(){ } void -Configuration::fetch_configuration(LocalConfig &local_config){ +Configuration::fetch_configuration(){ /** * Fetch configuration from management server */ @@ -199,8 +199,17 @@ Configuration::fetch_configuration(LocalConfig &local_config){ m_mgmd_port= 0; m_mgmd_host= 0; - m_config_retriever= new ConfigRetriever(local_config, NDB_VERSION, NODE_TYPE_DB); - if(m_config_retriever->do_connect() == -1){ + m_config_retriever= new ConfigRetriever(getConnectString(), + NDB_VERSION, NODE_TYPE_DB); + + if (m_config_retriever->hasError()) + { + ERROR_SET(fatal, ERR_INVALID_CONFIG, + "Could not connect initialize handle to management server", + m_config_retriever->getErrorString()); + } + + if(m_config_retriever->do_connect(12,5,1) == -1){ const char * s = m_config_retriever->getErrorString(); if(s == 0) s = "No error given!"; @@ -215,13 +224,7 @@ Configuration::fetch_configuration(LocalConfig &local_config){ ConfigRetriever &cr= *m_config_retriever; - if((globalData.ownId = cr.allocNodeId()) == 0){ - for(Uint32 i = 0; i<3; i++){ - NdbSleep_SecSleep(3); - if((globalData.ownId = cr.allocNodeId()) != 0) - break; - } - } + globalData.ownId = cr.allocNodeId(2 /*retry*/,3 /*delay*/); if(globalData.ownId == 0){ ERROR_SET(fatal, ERR_INVALID_CONFIG, @@ -452,6 +455,7 @@ Configuration::calcSizeAlt(ConfigValues * ownConfig){ unsigned int noOfTables = 0; unsigned int noOfUniqueHashIndexes = 0; unsigned int noOfOrderedIndexes = 0; + unsigned int noOfTriggers = 0; unsigned int noOfReplicas = 0; unsigned int noOfDBNodes = 0; unsigned int noOfAPINodes = 0; @@ -476,6 +480,7 @@ Configuration::calcSizeAlt(ConfigValues * ownConfig){ { CFG_DB_NO_TABLES, &noOfTables, false }, { CFG_DB_NO_ORDERED_INDEXES, &noOfOrderedIndexes, false }, { CFG_DB_NO_UNIQUE_HASH_INDEXES, &noOfUniqueHashIndexes, false }, + { CFG_DB_NO_TRIGGERS, &noOfTriggers, true }, { CFG_DB_NO_REPLICAS, &noOfReplicas, false }, { CFG_DB_NO_ATTRIBUTES, &noOfAttributes, false }, { CFG_DB_NO_OPS, &noOfOperations, false }, @@ -584,6 +589,18 @@ Configuration::calcSizeAlt(ConfigValues * ownConfig){ ConfigValues::Iterator it2(*ownConfig, db.m_config); it2.set(CFG_DB_NO_TABLES, noOfTables); it2.set(CFG_DB_NO_ATTRIBUTES, noOfAttributes); + { + Uint32 neededNoOfTriggers = /* types: Insert/Update/Delete/Custom */ + 3 * noOfUniqueHashIndexes + /* for unique hash indexes, I/U/D */ + 3 * NDB_MAX_ACTIVE_EVENTS + /* for events in suma, I/U/D */ + 3 * noOfTables + /* for backup, I/U/D */ + noOfOrderedIndexes; /* for ordered indexes, C */ + if (noOfTriggers < neededNoOfTriggers) + { + noOfTriggers= neededNoOfTriggers; + it2.set(CFG_DB_NO_TRIGGERS, noOfTriggers); + } + } /** * Do size calculations diff --git a/ndb/src/kernel/vm/Configuration.hpp b/ndb/src/kernel/vm/Configuration.hpp index e4cd64f5ca8..acf0e163a84 100644 --- a/ndb/src/kernel/vm/Configuration.hpp +++ b/ndb/src/kernel/vm/Configuration.hpp @@ -21,7 +21,6 @@ #include <ndb_types.h> class ConfigRetriever; -class LocalConfig; class Configuration { public: @@ -33,7 +32,7 @@ public: */ bool init(int argc, char** argv); - void fetch_configuration(LocalConfig &local_config); + void fetch_configuration(); void setupConfiguration(); void closeConfiguration(); diff --git a/ndb/src/mgmapi/LocalConfig.cpp b/ndb/src/mgmapi/LocalConfig.cpp index d0ff97cdedf..8f1e2ee8100 100644 --- a/ndb/src/mgmapi/LocalConfig.cpp +++ b/ndb/src/mgmapi/LocalConfig.cpp @@ -14,7 +14,7 @@ along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ -#include <LocalConfig.hpp> +#include "LocalConfig.hpp" #include <NdbEnv.h> #include <NdbConfig.h> #include <NdbAutoPtr.hpp> @@ -294,4 +294,19 @@ LocalConfig::readConnectString(const char * connectString, return return_value; } +char * +LocalConfig::makeConnectString(char *buf, int sz) +{ + int p= BaseString::snprintf(buf,sz,"nodeid=%d", _ownNodeId); + for (int i = 0; (i < ids.size()) && (sz-p > 0); i++) + { + if (ids[i].type != MgmId_TCP) + continue; + p+=BaseString::snprintf(buf+p,sz-p,",%s:%d", + ids[i].name.c_str(), ids[i].port); + } + buf[sz-1]=0; + return buf; +} + template class Vector<MgmtSrvrId>; diff --git a/ndb/include/mgmapi/LocalConfig.hpp b/ndb/src/mgmapi/LocalConfig.hpp index 9ceeffdba36..c415ec1be91 100644 --- a/ndb/include/mgmapi/LocalConfig.hpp +++ b/ndb/src/mgmapi/LocalConfig.hpp @@ -61,6 +61,7 @@ struct LocalConfig { bool parseHostName(const char *buf); bool parseFileName(const char *buf); bool parseString(const char *buf, BaseString &err); + char * makeConnectString(char *buf, int sz); }; #endif // LocalConfig_H diff --git a/ndb/src/mgmapi/mgmapi.cpp b/ndb/src/mgmapi/mgmapi.cpp index 51f2d7cee01..ca3a2a2186d 100644 --- a/ndb/src/mgmapi/mgmapi.cpp +++ b/ndb/src/mgmapi/mgmapi.cpp @@ -20,6 +20,7 @@ #include <LocalConfig.hpp> #include <NdbAutoPtr.hpp> +#include <NdbSleep.h> #include <NdbTCP.h> #include "mgmapi.h" #include "mgmapi_debug.h" @@ -83,8 +84,8 @@ typedef Parser<ParserDummy> Parser_t; #define NDB_MGM_MAX_ERR_DESC_SIZE 256 struct ndb_mgm_handle { - char * hostname; - unsigned short port; + char * connectstring; + int cfg_i; int connected; int last_error; @@ -95,7 +96,7 @@ struct ndb_mgm_handle { NDB_SOCKET_TYPE socket; - char cfg_ptr[sizeof(LocalConfig)]; + LocalConfig cfg; #ifdef MGMAPI_LOG FILE* logfile; @@ -148,14 +149,16 @@ ndb_mgm_create_handle() h->connected = 0; h->last_error = 0; h->last_error_line = 0; - h->hostname = 0; h->socket = NDB_INVALID_SOCKET; h->read_timeout = 50000; h->write_timeout = 100; - - new (h->cfg_ptr) LocalConfig; + h->cfg_i = 0; strncpy(h->last_error_desc, "No error", NDB_MGM_MAX_ERR_DESC_SIZE); + + new (&(h->cfg)) LocalConfig; + h->cfg.init(0, 0); + #ifdef MGMAPI_LOG h->logfile = 0; #endif @@ -163,6 +166,23 @@ ndb_mgm_create_handle() return h; } +extern "C" +int +ndb_mgm_set_connectstring(NdbMgmHandle handle, const char * mgmsrv) +{ + new (&(handle->cfg)) LocalConfig; + if (!handle->cfg.init(mgmsrv, 0) || + handle->cfg.ids.size() == 0) + { + new (&(handle->cfg)) LocalConfig; + handle->cfg.init(0, 0); /* reset the LocalCongig */ + SET_ERROR(handle, NDB_MGM_ILLEGAL_CONNECT_STRING, ""); + return -1; + } + handle->cfg_i= 0; + return 0; +} + /** * Destroy a handle */ @@ -175,14 +195,13 @@ ndb_mgm_destroy_handle(NdbMgmHandle * handle) if((* handle)->connected){ ndb_mgm_disconnect(* handle); } - my_free((* handle)->hostname,MYF(MY_ALLOW_ZERO_PTR)); #ifdef MGMAPI_LOG if ((* handle)->logfile != 0){ fclose((* handle)->logfile); (* handle)->logfile = 0; } #endif - ((LocalConfig*)((*handle)->cfg_ptr))->~LocalConfig(); + (*handle)->cfg.~LocalConfig(); my_free((char*)* handle,MYF(MY_ALLOW_ZERO_PTR)); * handle = 0; } @@ -314,7 +333,8 @@ ndb_mgm_call(NdbMgmHandle handle, const ParserRow<ParserDummy> *command_reply, */ extern "C" int -ndb_mgm_connect(NdbMgmHandle handle, const char * mgmsrv) +ndb_mgm_connect(NdbMgmHandle handle, int no_retries, + int retry_delay_in_seconds, int verbose) { SET_ERROR(handle, NDB_MGM_NO_ERROR, "Executing: ndb_mgm_connect"); CHECK_HANDLE(handle, -1); @@ -331,36 +351,48 @@ ndb_mgm_connect(NdbMgmHandle handle, const char * mgmsrv) /** * Do connect */ - LocalConfig *cfg= (LocalConfig*)(handle->cfg_ptr); - new (cfg) LocalConfig; - if (!cfg->init(mgmsrv, 0) || - cfg->ids.size() == 0) - { - SET_ERROR(handle, NDB_MGM_ILLEGAL_CONNECT_STRING, ""); - return -1; - } - + LocalConfig &cfg= handle->cfg; NDB_SOCKET_TYPE sockfd= NDB_INVALID_SOCKET; Uint32 i; - for (i = 0; i < cfg->ids.size(); i++) + while (sockfd == NDB_INVALID_SOCKET) { - if (cfg->ids[i].type != MgmId_TCP) - continue; - SocketClient s(cfg->ids[i].name.c_str(), cfg->ids[i].port); - sockfd = s.connect(); + // do all the mgmt servers + for (i = 0; i < cfg.ids.size(); i++) + { + if (cfg.ids[i].type != MgmId_TCP) + continue; + SocketClient s(cfg.ids[i].name.c_str(), cfg.ids[i].port); + sockfd = s.connect(); + if (sockfd != NDB_INVALID_SOCKET) + break; + } if (sockfd != NDB_INVALID_SOCKET) break; - } - if (sockfd == NDB_INVALID_SOCKET) - { - setError(handle, NDB_MGM_COULD_NOT_CONNECT_TO_SOCKET, __LINE__, - "Unable to connect using connectstring %s", mgmsrv); - return -1; + if (verbose > 0) { + char buf[1024]; + ndbout_c("Unable to connect with connect string: %s", + cfg.makeConnectString(buf,sizeof(buf))); + verbose= -1; + } + if (no_retries == 0) { + char buf[1024]; + setError(handle, NDB_MGM_COULD_NOT_CONNECT_TO_SOCKET, __LINE__, + "Unable to connect with connect string: %s", + cfg.makeConnectString(buf,sizeof(buf))); + return -1; + } + if (verbose == -1) { + ndbout << "retrying every " << retry_delay_in_seconds << " seconds:"; + verbose= -2; + } + NdbSleep_SecSleep(retry_delay_in_seconds); + if (verbose == -2) { + ndbout << " " << no_retries; + } + no_retries--; } - my_free(handle->hostname,MYF(MY_ALLOW_ZERO_PTR)); - handle->hostname = my_strdup(cfg->ids[i].name.c_str(),MYF(MY_WME)); - handle->port = cfg->ids[i].port; + handle->cfg_i = i; handle->socket = sockfd; handle->connected = 1; @@ -1068,7 +1100,9 @@ ndb_mgm_listen_event(NdbMgmHandle handle, int filter[]) }; CHECK_HANDLE(handle, -1); - SocketClient s(handle->hostname, handle->port); + const char *hostname= ndb_mgm_get_connected_host(handle); + int port= ndb_mgm_get_connected_port(handle); + SocketClient s(hostname, port); const NDB_SOCKET_TYPE sockfd = s.connect(); if (sockfd < 0) { setError(handle, NDB_MGM_COULD_NOT_CONNECT_TO_SOCKET, __LINE__, @@ -1613,16 +1647,37 @@ ndb_mgm_destroy_configuration(struct ndb_mgm_configuration *cfg) extern "C" int -ndb_mgm_alloc_nodeid(NdbMgmHandle handle, unsigned int version, unsigned *pnodeid, int nodetype) +ndb_mgm_get_configuration_nodeid(NdbMgmHandle handle) { + CHECK_HANDLE(handle, 0); + return handle->cfg._ownNodeId; +} + +extern "C" +int ndb_mgm_get_connected_port(NdbMgmHandle handle) +{ + return handle->cfg.ids[handle->cfg_i].port; +} +extern "C" +const char *ndb_mgm_get_connected_host(NdbMgmHandle handle) +{ + return handle->cfg.ids[handle->cfg_i].name.c_str(); +} + +extern "C" +int +ndb_mgm_alloc_nodeid(NdbMgmHandle handle, unsigned int version, int nodetype) +{ CHECK_HANDLE(handle, 0); CHECK_CONNECTED(handle, 0); + int nodeid= handle->cfg._ownNodeId; + Properties args; args.put("version", version); args.put("nodetype", nodetype); - args.put("nodeid", *pnodeid); + args.put("nodeid", nodeid); args.put("user", "mysqld"); args.put("password", "mysqld"); args.put("public key", "a public key"); @@ -1638,26 +1693,29 @@ ndb_mgm_alloc_nodeid(NdbMgmHandle handle, unsigned int version, unsigned *pnodei prop= ndb_mgm_call(handle, reply, "get nodeid", &args); CHECK_REPLY(prop, -1); - int res= -1; + nodeid= -1; do { const char * buf; if(!prop->get("result", &buf) || strcmp(buf, "Ok") != 0){ + const char *hostname= ndb_mgm_get_connected_host(handle); + unsigned port= ndb_mgm_get_connected_port(handle); BaseString err; err.assfmt("Could not alloc node id at %s port %d: %s", - handle->hostname, handle->port, buf); + hostname, port, buf); setError(handle, NDB_MGM_COULD_NOT_CONNECT_TO_SOCKET, __LINE__, err.c_str()); break; } - if(!prop->get("nodeid", pnodeid) != 0){ + Uint32 _nodeid; + if(!prop->get("nodeid", &_nodeid) != 0){ ndbout_c("ERROR Message: <nodeid Unspecified>\n"); break; } - res= 0; + nodeid= _nodeid; }while(0); delete prop; - return res; + return nodeid; } /***************************************************************************** diff --git a/ndb/src/mgmclient/CommandInterpreter.cpp b/ndb/src/mgmclient/CommandInterpreter.cpp index bdeb885ed8b..54beaa49d3f 100644 --- a/ndb/src/mgmclient/CommandInterpreter.cpp +++ b/ndb/src/mgmclient/CommandInterpreter.cpp @@ -153,7 +153,6 @@ private: NdbMgmHandle m_mgmsrv; bool connected; - const char *host; int try_reconnect; #ifdef HAVE_GLOBAL_REPLICATION NdbRepHandle m_repserver; @@ -379,15 +378,16 @@ CommandInterpreter::CommandInterpreter(const char *_host) m_mgmsrv = ndb_mgm_create_handle(); if(m_mgmsrv == NULL) { ndbout_c("Cannot create handle to management server."); + exit(-1); + } + if (ndb_mgm_set_connectstring(m_mgmsrv, _host)) + { printError(); + exit(-1); } connected = false; try_reconnect = 0; - if (_host) - host= my_strdup(_host,MYF(MY_WME)); - else - host= 0; #ifdef HAVE_GLOBAL_REPLICATION rep_host = NULL; m_repserver = NULL; @@ -402,8 +402,6 @@ CommandInterpreter::~CommandInterpreter() { connected = false; ndb_mgm_destroy_handle(&m_mgmsrv); - my_free((char *)host,MYF(MY_ALLOW_ZERO_PTR)); - host = NULL; } static bool @@ -438,18 +436,8 @@ bool CommandInterpreter::connect() { if(!connected) { - int tries = try_reconnect; // tries == 0 => infinite - while(!connected) { - if(ndb_mgm_connect(m_mgmsrv, host) == -1) { - ndbout << "Cannot connect to management server (" << host << ")."; - tries--; - if (tries == 0) - break; - ndbout << "Retrying in 5 seconds." << endl; - NdbSleep_SecSleep(5); - } else - connected = true; - } + if(!ndb_mgm_connect(m_mgmsrv, try_reconnect-1, 5, 1)) + connected = true; } return connected; } diff --git a/ndb/src/mgmclient/main.cpp b/ndb/src/mgmclient/main.cpp index 401a9198f30..f32cc683296 100644 --- a/ndb/src/mgmclient/main.cpp +++ b/ndb/src/mgmclient/main.cpp @@ -30,9 +30,10 @@ extern "C" int add_history(const char *command); /* From readline directory */ #include <NdbMain.h> #include <NdbHost.h> +#include <BaseString.hpp> +#include <NdbOut.hpp> #include <mgmapi.h> #include <ndb_version.h> -#include <LocalConfig.hpp> #include "ndb_mgmclient.hpp" @@ -138,7 +139,7 @@ int main(int argc, char** argv){ NDB_INIT(argv[0]); const char *_host = 0; int _port = 0; - const char *load_default_groups[]= { "ndb_mgm",0 }; + const char *load_default_groups[]= { "mysql_cluster","ndb_mgm",0 }; load_defaults("my",load_default_groups,&argc,&argv); int ho_error; diff --git a/ndb/src/mgmsrv/MgmtSrvr.cpp b/ndb/src/mgmsrv/MgmtSrvr.cpp index a49b29af275..81b5eb9dfb3 100644 --- a/ndb/src/mgmsrv/MgmtSrvr.cpp +++ b/ndb/src/mgmsrv/MgmtSrvr.cpp @@ -399,16 +399,20 @@ MgmtSrvr::getPort() const { } /* Constructor */ -MgmtSrvr::MgmtSrvr(NodeId nodeId, - SocketServer *socket_server, - const BaseString &configFilename, - LocalConfig &local_config, - Config * config): +int MgmtSrvr::init() +{ + if ( _ownNodeId > 0) + return 0; + return -1; +} + +MgmtSrvr::MgmtSrvr(SocketServer *socket_server, + const char *config_filename, + const char *connect_string) : _blockNumber(1), // Hard coded block number since it makes it easy to send // signals to other management servers. m_socket_server(socket_server), _ownReference(0), - m_local_config(local_config), theSignalIdleList(NULL), theWaitState(WAIT_SUBSCRIBE_CONF), m_statisticsListner(this) @@ -416,6 +420,8 @@ MgmtSrvr::MgmtSrvr(NodeId nodeId, DBUG_ENTER("MgmtSrvr::MgmtSrvr"); + _ownNodeId= 0; + _config = NULL; _isStopThread = false; @@ -426,12 +432,43 @@ MgmtSrvr::MgmtSrvr(NodeId nodeId, theFacade = 0; m_newConfig = NULL; - m_configFilename = configFilename; + m_configFilename.assign(config_filename); m_nextConfigGenerationNumber = 0; - _config = (config == 0 ? readConfig() : config); - + m_config_retriever= new ConfigRetriever(connect_string, + NDB_VERSION, NDB_MGM_NODE_TYPE_MGM); + + // first try to allocate nodeid from another management server + if(m_config_retriever->do_connect(0,0,0) == 0) + { + int tmp_nodeid= 0; + tmp_nodeid= m_config_retriever->allocNodeId(0 /*retry*/,0 /*delay*/); + if (tmp_nodeid == 0) + { + ndbout_c(m_config_retriever->getErrorString()); + exit(-1); + } + // read config from other managent server + _config= fetchConfig(); + if (_config == 0) + { + ndbout << m_config_retriever->getErrorString() << endl; + exit(-1); + } + _ownNodeId= tmp_nodeid; + } + + if (_ownNodeId == 0) + { + // read config locally + _config= readConfig(); + if (_config == 0) { + ndbout << "Unable to read config file" << endl; + exit(-1); + } + } + theMgmtWaitForResponseCondPtr = NdbCondition_Create(); m_configMutex = NdbMutex_Create(); @@ -443,9 +480,11 @@ MgmtSrvr::MgmtSrvr(NodeId nodeId, nodeTypes[i] = (enum ndb_mgm_node_type)-1; m_connect_address[i].s_addr= 0; } + { - ndb_mgm_configuration_iterator * iter = ndb_mgm_create_configuration_iterator - (config->m_configValues, CFG_SECTION_NODE); + ndb_mgm_configuration_iterator + *iter = ndb_mgm_create_configuration_iterator(_config->m_configValues, + CFG_SECTION_NODE); for(ndb_mgm_first(iter); ndb_mgm_valid(iter); ndb_mgm_next(iter)){ unsigned type, id; if(ndb_mgm_get_int_parameter(iter, CFG_TYPE_OF_SECTION, &type) != 0) @@ -478,8 +517,6 @@ MgmtSrvr::MgmtSrvr(NodeId nodeId, } _props = NULL; - _ownNodeId= 0; - NodeId tmp= nodeId; BaseString error_string; if ((m_node_id_mutex = NdbMutex_Create()) == 0) @@ -488,43 +525,25 @@ MgmtSrvr::MgmtSrvr(NodeId nodeId, exit(-1); } -#if 0 - char my_hostname[256]; - struct sockaddr_in tmp_addr; - SOCKET_SIZE_TYPE addrlen= sizeof(tmp_addr); - if (!g_no_nodeid_checks) { - if (gethostname(my_hostname, sizeof(my_hostname))) { - ndbout << "error: gethostname() - " << strerror(errno) << endl; - exit(-1); - } - if (Ndb_getInAddr(&(((sockaddr_in*)&tmp_addr)->sin_addr),my_hostname)) { - ndbout << "error: Ndb_getInAddr(" << my_hostname << ") - " - << strerror(errno) << endl; + if (_ownNodeId == 0) // we did not get node id from other server + { + NodeId tmp= m_config_retriever->get_configuration_nodeid(); + + if (!alloc_node_id(&tmp, NDB_MGM_NODE_TYPE_MGM, + 0, 0, error_string)){ + ndbout << "Unable to obtain requested nodeid: " + << error_string.c_str() << endl; exit(-1); } + _ownNodeId = tmp; } - if (!alloc_node_id(&tmp, NDB_MGM_NODE_TYPE_MGM, - (struct sockaddr *)&tmp_addr, - &addrlen, error_string)){ - ndbout << "Unable to obtain requested nodeid: " - << error_string.c_str() << endl; - exit(-1); - } -#else - if (!alloc_node_id(&tmp, NDB_MGM_NODE_TYPE_MGM, - 0, 0, error_string)){ - ndbout << "Unable to obtain requested nodeid: " - << error_string.c_str() << endl; - exit(-1); - } -#endif - _ownNodeId = tmp; { DBUG_PRINT("info", ("verifyConfig")); - ConfigRetriever cr(m_local_config, NDB_VERSION, NDB_MGM_NODE_TYPE_MGM); - if (!cr.verifyConfig(config->m_configValues, _ownNodeId)) { - ndbout << cr.getErrorString() << endl; + if (!m_config_retriever->verifyConfig(_config->m_configValues, + _ownNodeId)) + { + ndbout << m_config_retriever->getErrorString() << endl; exit(-1); } } @@ -657,6 +676,8 @@ MgmtSrvr::~MgmtSrvr() NdbThread_WaitFor(m_signalRecvThread, &res); NdbThread_Destroy(&m_signalRecvThread); } + if (m_config_retriever) + delete m_config_retriever; } //**************************************************************************** diff --git a/ndb/src/mgmsrv/MgmtSrvr.hpp b/ndb/src/mgmsrv/MgmtSrvr.hpp index b3257491123..2ab11250d81 100644 --- a/ndb/src/mgmsrv/MgmtSrvr.hpp +++ b/ndb/src/mgmsrv/MgmtSrvr.hpp @@ -175,11 +175,10 @@ public: /* Constructor */ - MgmtSrvr(NodeId nodeId, /* Local nodeid */ - SocketServer *socket_server, - const BaseString &config_filename, /* Where to save config */ - LocalConfig &local_config, /* Ndb.cfg filename */ - Config * config); + MgmtSrvr(SocketServer *socket_server, + const char *config_filename, /* Where to save config */ + const char *connect_string); + int init(); NodeId getOwnNodeId() const {return _ownNodeId;}; /** @@ -538,7 +537,6 @@ private: NdbMutex *m_configMutex; const Config * _config; Config * m_newConfig; - LocalConfig &m_local_config; BaseString m_configFilename; Uint32 m_nextConfigGenerationNumber; @@ -755,6 +753,9 @@ private: Config *_props; int send(class NdbApiSignal* signal, Uint32 node, Uint32 node_type); + + ConfigRetriever *m_config_retriever; + public: /** * This method does not exist diff --git a/ndb/src/mgmsrv/MgmtSrvrConfig.cpp b/ndb/src/mgmsrv/MgmtSrvrConfig.cpp index 1d51061e909..6c4b4e9ae3c 100644 --- a/ndb/src/mgmsrv/MgmtSrvrConfig.cpp +++ b/ndb/src/mgmsrv/MgmtSrvrConfig.cpp @@ -272,30 +272,20 @@ MgmtSrvr::saveConfig(const Config *conf) { Config * MgmtSrvr::readConfig() { - Config *conf = NULL; - if(m_configFilename.length() != 0) { - /* Use config file */ - InitConfigFileParser parser; - conf = parser.parseConfig(m_configFilename.c_str()); - - if(conf == NULL) { - /* Try to get configuration from other MGM server */ - return fetchConfig(); - } - } + Config *conf; + InitConfigFileParser parser; + conf = parser.parseConfig(m_configFilename.c_str()); return conf; } Config * MgmtSrvr::fetchConfig() { - ConfigRetriever cr(m_local_config, NDB_VERSION, NODE_TYPE_MGM); - struct ndb_mgm_configuration * tmp = cr.getConfig(); + struct ndb_mgm_configuration * tmp = m_config_retriever->getConfig(); if(tmp != 0){ Config * conf = new Config(); conf->m_configValues = tmp; return conf; } - return 0; } diff --git a/ndb/src/mgmsrv/main.cpp b/ndb/src/mgmsrv/main.cpp index b588a2d0933..84ff98626e5 100644 --- a/ndb/src/mgmsrv/main.cpp +++ b/ndb/src/mgmsrv/main.cpp @@ -62,7 +62,6 @@ struct MgmGlobals { int non_interactive; int interactive; const char * config_filename; - const char * local_config_filename; /** Stuff found in environment or in local config */ NodeId localNodeId; @@ -70,9 +69,6 @@ struct MgmGlobals { char * interface_name; int port; - /** The configuration of the cluster */ - Config * cluster_config; - /** The Mgmt Server */ MgmtSrvr * mgmObject; @@ -86,9 +82,6 @@ static MgmGlobals glob; /****************************************************************************** * Function prototypes ******************************************************************************/ -static bool readLocalConfig(); -static bool readGlobalConfig(); - /** * Global variables */ @@ -100,16 +93,28 @@ static char *opt_connect_str= 0; static struct my_option my_long_options[] = { - NDB_STD_OPTS("ndb_mgm"), +#ifndef DBUG_OFF + { "debug", '#', "Output debug log. Often this is 'd:t:o,filename'.", + 0, 0, 0, GET_STR, OPT_ARG, 0, 0, 0, 0, 0, 0 }, +#endif + { "usage", '?', "Display this help and exit.", + 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 }, + { "help", '?', "Display this help and exit.", + 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 }, + { "version", 'V', "Output version information and exit.", 0, 0, 0, + GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 }, + { "connect-string", 1023, + "Set connect string for connecting to ndb_mgmd. " + "<constr>=\"host=<hostname:port>[;nodeid=<id>]\". " + "Overides specifying entries in NDB_CONNECTSTRING and config file", + (gptr*) &opt_connect_str, (gptr*) &opt_connect_str, 0, + GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 }, { "config-file", 'f', "Specify cluster configuration file", (gptr*) &glob.config_filename, (gptr*) &glob.config_filename, 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 }, { "daemon", 'd', "Run ndb_mgmd in daemon mode (default)", (gptr*) &glob.daemon, (gptr*) &glob.daemon, 0, GET_BOOL, NO_ARG, 1, 0, 0, 0, 0, 0 }, - { "l", 'l', "Specify configuration file connect string (default Ndb.cfg if available)", - (gptr*) &glob.local_config_filename, (gptr*) &glob.local_config_filename, 0, - GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 }, { "interactive", 256, "Run interactive. Not supported but provided for testing purposes", (gptr*) &glob.interactive, (gptr*) &glob.interactive, 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 }, @@ -173,7 +178,7 @@ int main(int argc, char** argv) global_mgmt_server_check = 1; glob.config_filename= "config.ini"; - const char *load_default_groups[]= { "ndb_mgmd",0 }; + const char *load_default_groups[]= { "mysql_cluster","ndb_mgmd",0 }; load_defaults("my",load_default_groups,&argc,&argv); int ho_error; @@ -189,29 +194,16 @@ int main(int argc, char** argv) MgmApiService * mapi = new MgmApiService(); - /**************************** - * Read configuration files * - ****************************/ - LocalConfig local_config; - if(!local_config.init(opt_connect_str,glob.local_config_filename)){ - local_config.printError(); - goto error_end; - } - glob.localNodeId = local_config._ownNodeId; + glob.mgmObject = new MgmtSrvr(glob.socketServer, + glob.config_filename, + opt_connect_str); - if (!readGlobalConfig()) + if (glob.mgmObject->init()) goto error_end; - glob.mgmObject = new MgmtSrvr(glob.localNodeId, glob.socketServer, - BaseString(glob.config_filename), - local_config, - glob.cluster_config); - chdir(NdbConfig_get_path(0)); - glob.cluster_config = 0; glob.localNodeId= glob.mgmObject->getOwnNodeId(); - if (glob.localNodeId == 0) { goto error_end; } @@ -322,9 +314,7 @@ MgmGlobals::MgmGlobals(){ // Default values port = 0; config_filename = NULL; - local_config_filename = NULL; interface_name = 0; - cluster_config = 0; daemon = 1; non_interactive = 0; interactive = 0; @@ -337,27 +327,6 @@ MgmGlobals::~MgmGlobals(){ delete socketServer; if (mgmObject) delete mgmObject; - if (cluster_config) - delete cluster_config; if (interface_name) free(interface_name); } - -/** - * @fn readGlobalConfig - * @param glob : Global variables - * @return true if success, false otherwise. - */ -static bool -readGlobalConfig() { - if(glob.config_filename == NULL) - return false; - - /* Use config file */ - InitConfigFileParser parser; - glob.cluster_config = parser.parseConfig(glob.config_filename); - if(glob.cluster_config == 0){ - return false; - } - return true; -} diff --git a/ndb/src/ndbapi/NdbConnection.cpp b/ndb/src/ndbapi/NdbConnection.cpp index 719c5bef49e..f4bb000300a 100644 --- a/ndb/src/ndbapi/NdbConnection.cpp +++ b/ndb/src/ndbapi/NdbConnection.cpp @@ -1122,8 +1122,11 @@ NdbConnection::getNdbIndexScanOperation(const NdbIndexImpl* index, if (indexTable != 0){ NdbIndexScanOperation* tOp = getNdbScanOperation((NdbTableImpl *) indexTable); - tOp->m_currentTable = table; - if(tOp) tOp->m_cursor_type = NdbScanOperation::IndexCursor; + if(tOp) + { + tOp->m_currentTable = table; + tOp->m_cursor_type = NdbScanOperation::IndexCursor; + } return tOp; } else { setOperationErrorCodeAbort(theNdb->theError.code); @@ -1618,9 +1621,6 @@ from other transactions. /** * There's always a TCKEYCONF when using IgnoreError */ -#ifdef VM_TRACE - ndbout_c("Not completing transaction 2"); -#endif return -1; } /**********************************************************************/ @@ -1872,9 +1872,6 @@ NdbConnection::OpCompleteFailure(Uint8 abortOption, bool setFailure) /** * There's always a TCKEYCONF when using IgnoreError */ -#ifdef VM_TRACE - ndbout_c("Not completing transaction"); -#endif return -1; } diff --git a/ndb/src/ndbapi/NdbConnectionScan.cpp b/ndb/src/ndbapi/NdbConnectionScan.cpp index 3fe8993a42b..a1a220caacf 100644 --- a/ndb/src/ndbapi/NdbConnectionScan.cpp +++ b/ndb/src/ndbapi/NdbConnectionScan.cpp @@ -57,12 +57,18 @@ NdbConnection::receiveSCAN_TABREF(NdbApiSignal* aSignal){ if(checkState_TransId(&ref->transId1)){ theScanningOp->theError.code = ref->errorCode; + theScanningOp->execCLOSE_SCAN_REP(); if(!ref->closeNeeded){ - theScanningOp->execCLOSE_SCAN_REP(); return 0; } - assert(theScanningOp->m_sent_receivers_count); + + /** + * Setup so that close_impl will actually perform a close + * and not "close scan"-optimze it away + */ theScanningOp->m_conf_receivers_count++; + theScanningOp->m_conf_receivers[0] = theScanningOp->m_receivers[0]; + theScanningOp->m_conf_receivers[0]->m_tcPtrI = ~0; return 0; } else { #ifdef NDB_NO_DROPPED_SIGNAL @@ -97,7 +103,7 @@ NdbConnection::receiveSCAN_TABCONF(NdbApiSignal* aSignal, theScanningOp->execCLOSE_SCAN_REP(); return 0; } - + for(Uint32 i = 0; i<len; i += 3){ Uint32 opCount, totalLen; Uint32 ptrI = * ops++; @@ -109,24 +115,12 @@ NdbConnection::receiveSCAN_TABCONF(NdbApiSignal* aSignal, void * tPtr = theNdb->int2void(ptrI); assert(tPtr); // For now NdbReceiver* tOp = theNdb->void2rec(tPtr); - if (tOp && tOp->checkMagicNumber()){ - if(tOp->execSCANOPCONF(tcPtrI, totalLen, opCount)){ - /** - * - */ - theScanningOp->receiver_delivered(tOp); - } else if(info == ScanTabConf::EndOfData){ + if (tOp && tOp->checkMagicNumber()) + { + if (tcPtrI == RNIL && opCount == 0) theScanningOp->receiver_completed(tOp); - } - } - } - if (conf->requestInfo & ScanTabConf::EndOfData) { - if(theScanningOp->m_ordered) - theScanningOp->m_api_receivers_count = 0; - if(theScanningOp->m_api_receivers_count + - theScanningOp->m_conf_receivers_count + - theScanningOp->m_sent_receivers_count){ - abort(); + else if (tOp->execSCANOPCONF(tcPtrI, totalLen, opCount)) + theScanningOp->receiver_delivered(tOp); } } return 0; diff --git a/ndb/src/ndbapi/NdbResultSet.cpp b/ndb/src/ndbapi/NdbResultSet.cpp index f270584d227..d9d71464026 100644 --- a/ndb/src/ndbapi/NdbResultSet.cpp +++ b/ndb/src/ndbapi/NdbResultSet.cpp @@ -44,10 +44,10 @@ void NdbResultSet::init() { } -int NdbResultSet::nextResult(bool fetchAllowed) +int NdbResultSet::nextResult(bool fetchAllowed, bool forceSend) { int res; - if ((res = m_operation->nextResult(fetchAllowed)) == 0) { + if ((res = m_operation->nextResult(fetchAllowed, forceSend)) == 0) { // handle blobs NdbBlob* tBlob = m_operation->theBlobList; while (tBlob != 0) { @@ -67,9 +67,9 @@ int NdbResultSet::nextResult(bool fetchAllowed) return res; } -void NdbResultSet::close() +void NdbResultSet::close(bool forceSend) { - m_operation->closeScan(); + m_operation->closeScan(forceSend); } NdbOperation* @@ -98,6 +98,6 @@ NdbResultSet::deleteTuple(NdbConnection * takeOverTrans){ } int -NdbResultSet::restart(){ - return m_operation->restart(); +NdbResultSet::restart(bool forceSend){ + return m_operation->restart(forceSend); } diff --git a/ndb/src/ndbapi/NdbScanOperation.cpp b/ndb/src/ndbapi/NdbScanOperation.cpp index 4b10ebb10cd..db0c294708d 100644 --- a/ndb/src/ndbapi/NdbScanOperation.cpp +++ b/ndb/src/ndbapi/NdbScanOperation.cpp @@ -35,6 +35,8 @@ #include <signaldata/AttrInfo.hpp> #include <signaldata/TcKeyReq.hpp> +#define DEBUG_NEXT_RESULT 0 + NdbScanOperation::NdbScanOperation(Ndb* aNdb) : NdbOperation(aNdb), m_resultSet(0), @@ -275,6 +277,9 @@ NdbScanOperation::fix_receivers(Uint32 parallel){ void NdbScanOperation::receiver_delivered(NdbReceiver* tRec){ if(theError.code == 0){ + if(DEBUG_NEXT_RESULT) + ndbout_c("receiver_delivered"); + Uint32 idx = tRec->m_list_index; Uint32 last = m_sent_receivers_count - 1; if(idx != last){ @@ -298,6 +303,9 @@ NdbScanOperation::receiver_delivered(NdbReceiver* tRec){ void NdbScanOperation::receiver_completed(NdbReceiver* tRec){ if(theError.code == 0){ + if(DEBUG_NEXT_RESULT) + ndbout_c("receiver_completed"); + Uint32 idx = tRec->m_list_index; Uint32 last = m_sent_receivers_count - 1; if(idx != last){ @@ -445,12 +453,12 @@ NdbScanOperation::executeCursor(int nodeId){ return -1; } -#define DEBUG_NEXT_RESULT 0 -int NdbScanOperation::nextResult(bool fetchAllowed) +int NdbScanOperation::nextResult(bool fetchAllowed, bool forceSend) { if(m_ordered) - return ((NdbIndexScanOperation*)this)->next_result_ordered(fetchAllowed); + return ((NdbIndexScanOperation*)this)->next_result_ordered(fetchAllowed, + forceSend); /** * Check current receiver @@ -487,7 +495,8 @@ int NdbScanOperation::nextResult(bool fetchAllowed) TransporterFacade* tp = TransporterFacade::instance(); Guard guard(tp->theMutexPtr); Uint32 seq = theNdbCon->theNodeSequence; - if(seq == tp->getNodeSequence(nodeId) && send_next_scan(idx, false) == 0){ + if(seq == tp->getNodeSequence(nodeId) && send_next_scan(idx, false, + forceSend) == 0){ idx = m_current_api_receiver; last = m_api_receivers_count; @@ -578,8 +587,9 @@ int NdbScanOperation::nextResult(bool fetchAllowed) } int -NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag){ - if(cnt > 0 || stopScanFlag){ +NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag, + bool forceSend){ + if(cnt > 0){ NdbApiSignal tSignal(theNdb->theMyRef); tSignal.setSignal(GSN_SCAN_NEXTREQ); @@ -595,38 +605,57 @@ NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag){ */ Uint32 last = m_sent_receivers_count; Uint32 * prep_array = (cnt > 21 ? m_prepared_receivers : theData + 4); + Uint32 sent = 0; for(Uint32 i = 0; i<cnt; i++){ NdbReceiver * tRec = m_api_receivers[i]; - m_sent_receivers[last+i] = tRec; - tRec->m_list_index = last+i; - prep_array[i] = tRec->m_tcPtrI; - tRec->prepareSend(); + if((prep_array[sent] = tRec->m_tcPtrI) != RNIL) + { + m_sent_receivers[last+sent] = tRec; + tRec->m_list_index = last+sent; + tRec->prepareSend(); + sent++; + } } - memcpy(&m_api_receivers[0], &m_api_receivers[cnt], cnt * sizeof(char*)); + memmove(m_api_receivers, m_api_receivers+cnt, + (theParallelism-cnt) * sizeof(char*)); - Uint32 nodeId = theNdbCon->theDBnode; - TransporterFacade * tp = TransporterFacade::instance(); - int ret; - if(cnt > 21){ - tSignal.setLength(4); - LinearSectionPtr ptr[3]; - ptr[0].p = prep_array; - ptr[0].sz = cnt; - ret = tp->sendSignal(&tSignal, nodeId, ptr, 1); - } else { - tSignal.setLength(4+cnt); - ret = tp->sendSignal(&tSignal, nodeId); + int ret = 0; + if(sent) + { + Uint32 nodeId = theNdbCon->theDBnode; + TransporterFacade * tp = TransporterFacade::instance(); + if(cnt > 21){ + tSignal.setLength(4); + LinearSectionPtr ptr[3]; + ptr[0].p = prep_array; + ptr[0].sz = sent; + ret = tp->sendSignal(&tSignal, nodeId, ptr, 1); + } else { + tSignal.setLength(4+sent); + ret = tp->sendSignal(&tSignal, nodeId); + } } + + if (!ret) checkForceSend(forceSend); - m_sent_receivers_count = last + cnt + stopScanFlag; + m_sent_receivers_count = last + sent; m_api_receivers_count -= cnt; m_current_api_receiver = 0; - + return ret; } return 0; } +void NdbScanOperation::checkForceSend(bool forceSend) +{ + if (forceSend) { + TransporterFacade::instance()->forceSend(theNdb->theNdbBlockNumber); + } else { + TransporterFacade::instance()->checkForceSend(theNdb->theNdbBlockNumber); + }//if +} + int NdbScanOperation::prepareSend(Uint32 TC_ConnectPtr, Uint64 TransactionId) { @@ -642,7 +671,7 @@ NdbScanOperation::doSend(int ProcessorId) return 0; } -void NdbScanOperation::closeScan() +void NdbScanOperation::closeScan(bool forceSend) { if(m_transConnection){ if(DEBUG_NEXT_RESULT) @@ -657,7 +686,7 @@ void NdbScanOperation::closeScan() TransporterFacade* tp = TransporterFacade::instance(); Guard guard(tp->theMutexPtr); - close_impl(tp); + close_impl(tp, forceSend); } while(0); @@ -673,6 +702,7 @@ NdbScanOperation::execCLOSE_SCAN_REP(){ m_api_receivers_count = 0; m_conf_receivers_count = 0; m_sent_receivers_count = 0; + m_current_api_receiver = m_ordered ? theParallelism : 0; } void NdbScanOperation::release() @@ -1293,7 +1323,8 @@ NdbIndexScanOperation::compare(Uint32 skip, Uint32 cols, } int -NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){ +NdbIndexScanOperation::next_result_ordered(bool fetchAllowed, + bool forceSend){ Uint32 u_idx = 0, u_last = 0; Uint32 s_idx = m_current_api_receiver; // first sorted @@ -1319,7 +1350,8 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){ Guard guard(tp->theMutexPtr); Uint32 seq = theNdbCon->theNodeSequence; Uint32 nodeId = theNdbCon->theDBnode; - if(seq == tp->getNodeSequence(nodeId) && !send_next_scan_ordered(s_idx)){ + if(seq == tp->getNodeSequence(nodeId) && + !send_next_scan_ordered(s_idx, forceSend)){ Uint32 tmp = m_sent_receivers_count; s_idx = m_current_api_receiver; while(m_sent_receivers_count > 0 && !theError.code){ @@ -1408,14 +1440,26 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){ } int -NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx){ +NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx, bool forceSend){ if(idx == theParallelism) return 0; + NdbReceiver* tRec = m_api_receivers[idx]; NdbApiSignal tSignal(theNdb->theMyRef); tSignal.setSignal(GSN_SCAN_NEXTREQ); + Uint32 last = m_sent_receivers_count; Uint32* theData = tSignal.getDataPtrSend(); + Uint32* prep_array = theData + 4; + + m_current_api_receiver = idx + 1; + if((prep_array[0] = tRec->m_tcPtrI) == RNIL) + { + if(DEBUG_NEXT_RESULT) + ndbout_c("receiver completed, don't send"); + return 0; + } + theData[0] = theNdbCon->theTCConPtr; theData[1] = 0; Uint64 transId = theNdbCon->theTransactionId; @@ -1425,35 +1469,35 @@ NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx){ /** * Prepare ops */ - Uint32 last = m_sent_receivers_count; - Uint32 * prep_array = theData + 4; - - NdbReceiver * tRec = m_api_receivers[idx]; m_sent_receivers[last] = tRec; tRec->m_list_index = last; - prep_array[0] = tRec->m_tcPtrI; tRec->prepareSend(); - m_sent_receivers_count = last + 1; - m_current_api_receiver = idx + 1; Uint32 nodeId = theNdbCon->theDBnode; TransporterFacade * tp = TransporterFacade::instance(); tSignal.setLength(4+1); - return tp->sendSignal(&tSignal, nodeId); + int ret= tp->sendSignal(&tSignal, nodeId); + if (!ret) checkForceSend(forceSend); + return ret; } int -NdbScanOperation::close_impl(TransporterFacade* tp){ +NdbScanOperation::close_impl(TransporterFacade* tp, bool forceSend){ Uint32 seq = theNdbCon->theNodeSequence; Uint32 nodeId = theNdbCon->theDBnode; - if(seq != tp->getNodeSequence(nodeId)){ + if(seq != tp->getNodeSequence(nodeId)) + { theNdbCon->theReleaseOnClose = true; return -1; } - while(theError.code == 0 && m_sent_receivers_count){ + /** + * Wait for outstanding + */ + while(theError.code == 0 && m_sent_receivers_count) + { theNdb->theWaiter.m_node = nodeId; theNdb->theWaiter.m_state = WAIT_SCAN; int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT); @@ -1471,18 +1515,52 @@ NdbScanOperation::close_impl(TransporterFacade* tp){ } } - if(m_api_receivers_count+m_conf_receivers_count){ - // Send close scan - if(send_next_scan(0, true) == -1){ // Close scan - theNdbCon->theReleaseOnClose = true; - return -1; - } + /** + * move all conf'ed into api + * so that send_next_scan can check if they needs to be closed + */ + Uint32 api = m_api_receivers_count; + Uint32 conf = m_conf_receivers_count; + + if(m_ordered) + { + /** + * Ordered scan, keep the m_api_receivers "to the right" + */ + memmove(m_api_receivers, m_api_receivers+m_current_api_receiver, + (theParallelism - m_current_api_receiver) * sizeof(char*)); + api = (theParallelism - m_current_api_receiver); + m_api_receivers_count = api; + } + + if(DEBUG_NEXT_RESULT) + ndbout_c("close_impl: [order api conf sent curr parr] %d %d %d %d %d %d", + m_ordered, api, conf, + m_sent_receivers_count, m_current_api_receiver, theParallelism); + + if(api+conf) + { + /** + * There's something to close + * setup m_api_receivers (for send_next_scan) + */ + memcpy(m_api_receivers+api, m_conf_receivers, conf * sizeof(char*)); + m_api_receivers_count = api + conf; + m_conf_receivers_count = 0; + } + + // Send close scan + if(send_next_scan(api+conf, true, forceSend) == -1) + { + theNdbCon->theReleaseOnClose = true; + return -1; } /** * wait for close scan conf */ - while(m_sent_receivers_count+m_api_receivers_count+m_conf_receivers_count){ + while(m_sent_receivers_count+m_api_receivers_count+m_conf_receivers_count) + { theNdb->theWaiter.m_node = nodeId; theNdb->theWaiter.m_state = WAIT_SCAN; int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT); @@ -1499,6 +1577,7 @@ NdbScanOperation::close_impl(TransporterFacade* tp){ return -1; } } + return 0; } @@ -1520,7 +1599,7 @@ NdbScanOperation::reset_receivers(Uint32 parallell, Uint32 ordered){ } int -NdbScanOperation::restart() +NdbScanOperation::restart(bool forceSend) { TransporterFacade* tp = TransporterFacade::instance(); @@ -1529,7 +1608,7 @@ NdbScanOperation::restart() { int res; - if((res= close_impl(tp))) + if((res= close_impl(tp, forceSend))) { return res; } @@ -1548,13 +1627,13 @@ NdbScanOperation::restart() } int -NdbIndexScanOperation::reset_bounds(){ +NdbIndexScanOperation::reset_bounds(bool forceSend){ int res; { TransporterFacade* tp = TransporterFacade::instance(); Guard guard(tp->theMutexPtr); - res= close_impl(tp); + res= close_impl(tp, forceSend); } if(!res) diff --git a/ndb/src/ndbapi/ndb_cluster_connection.cpp b/ndb/src/ndbapi/ndb_cluster_connection.cpp index 4c42fe1aeef..b2043b2c2c1 100644 --- a/ndb/src/ndbapi/ndb_cluster_connection.cpp +++ b/ndb/src/ndbapi/ndb_cluster_connection.cpp @@ -45,7 +45,6 @@ Ndb_cluster_connection::Ndb_cluster_connection(const char *connect_string) else m_connect_string= 0; m_config_retriever= 0; - m_local_config= 0; m_connect_thread= 0; m_connect_callback= 0; @@ -125,38 +124,31 @@ int Ndb_cluster_connection::connect(int reconnect) do { if (m_config_retriever == 0) { - if (m_local_config == 0) { - m_local_config= new LocalConfig(); - if (!m_local_config->init(m_connect_string,0)) { - ndbout_c("Configuration error: Unable to retrieve local config"); - m_local_config->printError(); - m_local_config->printUsage(); - DBUG_RETURN(-1); - } - } m_config_retriever= - new ConfigRetriever(*m_local_config, NDB_VERSION, NODE_TYPE_API); + new ConfigRetriever(m_connect_string, NDB_VERSION, NODE_TYPE_API); + if (m_config_retriever->hasError()) + { + printf("Could not connect initialize handle to management server", + m_config_retriever->getErrorString()); + DBUG_RETURN(-1); + } } else if (reconnect == 0) DBUG_RETURN(0); if (reconnect) { - int r= m_config_retriever->do_connect(1); + int r= m_config_retriever->do_connect(0,0,0); if (r == 1) DBUG_RETURN(1); // mgmt server not up yet if (r == -1) break; } else - if(m_config_retriever->do_connect() == -1) + if(m_config_retriever->do_connect(12,5,1) == -1) break; - Uint32 nodeId = m_config_retriever->allocNodeId(); - for(Uint32 i = 0; nodeId == 0 && i<5; i++){ - NdbSleep_SecSleep(3); - nodeId = m_config_retriever->allocNodeId(); - } + Uint32 nodeId = m_config_retriever->allocNodeId(4/*retries*/,3/*delay*/); if(nodeId == 0) break; ndb_mgm_configuration * props = m_config_retriever->getConfig(); @@ -200,8 +192,6 @@ Ndb_cluster_connection::~Ndb_cluster_connection() my_free(m_connect_string,MYF(MY_ALLOW_ZERO_PTR)); if (m_config_retriever) delete m_config_retriever; - if (m_local_config) - delete m_local_config; DBUG_VOID_RETURN; } diff --git a/ndb/src/ndbapi/ndberror.c b/ndb/src/ndbapi/ndberror.c index e08b80f2433..bc49358cc63 100644 --- a/ndb/src/ndbapi/ndberror.c +++ b/ndb/src/ndbapi/ndberror.c @@ -426,7 +426,8 @@ ErrorBundle ErrorCodes[] = { { 4267, IE, "Corrupted blob value" }, { 4268, IE, "Error in blob head update forced rollback of transaction" }, { 4268, IE, "Unknown blob error" }, - { 4269, IE, "No connection to ndb management server" } + { 4269, IE, "No connection to ndb management server" }, + { 4335, AE, "Only one autoincrement column allowed per table. Having a table without primary key uses an autoincremented hidden key, i.e. a table without a primary key can not have an autoincremented column" } }; static diff --git a/ndb/test/include/HugoTransactions.hpp b/ndb/test/include/HugoTransactions.hpp index 19e4cb43336..b833f2ac629 100644 --- a/ndb/test/include/HugoTransactions.hpp +++ b/ndb/test/include/HugoTransactions.hpp @@ -36,15 +36,21 @@ public: bool allowConstraintViolation = true, int doSleep = 0, bool oneTrans = false); + int scanReadRecords(Ndb*, int records, int abort = 0, int parallelism = 0, - bool committed = false); - int scanReadCommittedRecords(Ndb*, - int records, - int abort = 0, - int parallelism = 0); + NdbOperation::LockMode = NdbOperation::LM_Read); + + int scanReadRecords(Ndb*, + const NdbDictionary::Index*, + int records, + int abort = 0, + int parallelism = 0, + NdbOperation::LockMode = NdbOperation::LM_Read, + bool sorted = false); + int pkReadRecords(Ndb*, int records, int batchsize = 1, diff --git a/ndb/test/include/NdbRestarter.hpp b/ndb/test/include/NdbRestarter.hpp index 114726f6a2b..19a88b4f8ad 100644 --- a/ndb/test/include/NdbRestarter.hpp +++ b/ndb/test/include/NdbRestarter.hpp @@ -87,8 +87,6 @@ protected: bool connected; BaseString addr; - BaseString host; - int port; NdbMgmHandle handle; ndb_mgm_configuration * m_config; protected: diff --git a/ndb/test/include/UtilTransactions.hpp b/ndb/test/include/UtilTransactions.hpp index 37cd99550a5..23902f3b317 100644 --- a/ndb/test/include/UtilTransactions.hpp +++ b/ndb/test/include/UtilTransactions.hpp @@ -53,11 +53,11 @@ public: int selectCount(Ndb*, int parallelism = 0, int* count_rows = NULL, - ScanLock lock = SL_Read, + NdbOperation::LockMode lm = NdbOperation::LM_CommittedRead, NdbConnection* pTrans = NULL); int scanReadRecords(Ndb*, int parallelism, - bool exclusive, + NdbOperation::LockMode lm, int records, int noAttribs, int* attrib_list, diff --git a/ndb/test/ndbapi/testReadPerf.cpp b/ndb/test/ndbapi/testReadPerf.cpp index 380a809ad00..3adcb5a2d9b 100644 --- a/ndb/test/ndbapi/testReadPerf.cpp +++ b/ndb/test/ndbapi/testReadPerf.cpp @@ -391,8 +391,15 @@ run_read(){ void print_result(){ + int tmp = 1; + tmp *= g_paramters[P_RANGE].value; + tmp *= g_paramters[P_LOOPS].value; + + int t, t2; for(int i = 0; i<P_OP_TYPES; i++){ - g_err.println("%s avg: %u us/row", g_ops[i], - (1000*g_times[i])/(g_paramters[P_RANGE].value*g_paramters[P_LOOPS].value)); + g_err << g_ops[i] << " avg: " + << (int)((1000*g_times[i])/tmp) + << " us/row (" + << (1000 * tmp)/g_times[i] << " rows / sec)" << endl; } } diff --git a/ndb/test/ndbapi/testScan.cpp b/ndb/test/ndbapi/testScan.cpp index 0cd30dfefde..51913e8fbf9 100644 --- a/ndb/test/ndbapi/testScan.cpp +++ b/ndb/test/ndbapi/testScan.cpp @@ -242,8 +242,9 @@ int runScanReadCommitted(NDBT_Context* ctx, NDBT_Step* step){ HugoTransactions hugoTrans(*ctx->getTab()); while (i<loops && !ctx->isTestStopped()) { g_info << i << ": "; - if (hugoTrans.scanReadCommittedRecords(GETNDB(step), records, - abort, parallelism) != 0){ + if (hugoTrans.scanReadRecords(GETNDB(step), records, + abort, parallelism, + NdbOperation::LM_CommittedRead) != 0){ return NDBT_FAILED; } i++; @@ -639,7 +640,7 @@ int runCheckGetValue(NDBT_Context* ctx, NDBT_Step* step){ g_info << (unsigned)i << endl; if(utilTrans.scanReadRecords(GETNDB(step), parallelism, - false, + NdbOperation::LM_Read, records, alist.attriblist[i]->numAttribs, alist.attriblist[i]->attribs) != 0){ @@ -647,7 +648,7 @@ int runCheckGetValue(NDBT_Context* ctx, NDBT_Step* step){ } if(utilTrans.scanReadRecords(GETNDB(step), parallelism, - true, + NdbOperation::LM_Read, records, alist.attriblist[i]->numAttribs, alist.attriblist[i]->attribs) != 0){ diff --git a/ndb/test/ndbapi/testScanPerf.cpp b/ndb/test/ndbapi/testScanPerf.cpp index 003fc67179f..ee2a92e88a9 100644 --- a/ndb/test/ndbapi/testScanPerf.cpp +++ b/ndb/test/ndbapi/testScanPerf.cpp @@ -39,8 +39,9 @@ struct Parameter { #define P_LOOPS 8 #define P_CREATE 9 #define P_LOAD 10 +#define P_RESET 11 -#define P_MAX 11 +#define P_MAX 12 static Parameter @@ -55,7 +56,8 @@ g_paramters[] = { { "size", 1000000, 1, ~0 }, { "iterations", 3, 1, ~0 }, { "create_drop", 1, 0, 1 }, - { "data", 1, 0, 1 } + { "data", 1, 0, 1 }, + { "q-reset bounds", 0, 1, 0 } }; static Ndb* g_ndb = 0; @@ -219,21 +221,29 @@ run_scan(){ NDB_TICKS start1, stop; int sum_time= 0; + int sample_rows = 0; + NDB_TICKS sample_start = NdbTick_CurrentMillisecond(); + Uint32 tot = g_paramters[P_ROWS].value; + if(g_paramters[P_BOUND].value == 2 || g_paramters[P_FILT].value == 2) + iter *= g_paramters[P_ROWS].value; + + NdbScanOperation * pOp = 0; + NdbIndexScanOperation * pIOp = 0; + NdbConnection * pTrans = 0; + NdbResultSet * rs = 0; + int check = 0; + for(int i = 0; i<iter; i++){ start1 = NdbTick_CurrentMillisecond(); - NdbConnection * pTrans = g_ndb->startTransaction(); + pTrans = pTrans ? pTrans : g_ndb->startTransaction(); if(!pTrans){ g_err << "Failed to start transaction" << endl; err(g_ndb->getNdbError()); return -1; } - NdbScanOperation * pOp; - NdbIndexScanOperation * pIOp; - - NdbResultSet * rs; int par = g_paramters[P_PARRA].value; int bat = g_paramters[P_BATCH].value; NdbScanOperation::LockMode lm; @@ -256,9 +266,17 @@ run_scan(){ assert(pOp); rs = pOp->readTuples(lm, bat, par); } else { - pOp = pIOp = pTrans->getNdbIndexScanOperation(g_indexname, g_tablename); - bool ord = g_paramters[P_ACCESS].value == 2; - rs = pIOp->readTuples(lm, bat, par, ord); + if(g_paramters[P_RESET].value == 0 || pIOp == 0) + { + pOp= pIOp= pTrans->getNdbIndexScanOperation(g_indexname, g_tablename); + bool ord = g_paramters[P_ACCESS].value == 2; + rs = pIOp->readTuples(lm, bat, par, ord); + } + else + { + pIOp->reset_bounds(); + } + switch(g_paramters[P_BOUND].value){ case 0: // All break; @@ -268,20 +286,22 @@ run_scan(){ case 2: { // 1 row default: assert(g_table->getNoOfPrimaryKeys() == 1); // only impl. so far - abort(); -#if 0 int tot = g_paramters[P_ROWS].value; int row = rand() % tot; +#if 0 fix_eq_bound(pIOp, row); +#else + pIOp->setBound((Uint32)0, NdbIndexScanOperation::BoundEQ, &row); #endif break; } } + if(g_paramters[P_RESET].value == 1) + goto execute; } assert(pOp); assert(rs); - int check = 0; switch(g_paramters[P_FILT].value){ case 0: // All check = pOp->interpret_exit_ok(); @@ -313,7 +333,7 @@ run_scan(){ for(int i = 0; i<g_table->getNoOfColumns(); i++){ pOp->getValue(i); } - +execute: int rows = 0; check = pTrans->execute(NoCommit); assert(check == 0); @@ -334,19 +354,29 @@ run_scan(){ return -1; } assert(check == 1); - g_info << "Found " << rows << " rows" << endl; - - pTrans->close(); - + if(g_paramters[P_RESET].value == 0) + { + pTrans->close(); + pTrans = 0; + } stop = NdbTick_CurrentMillisecond(); + int time_passed= (int)(stop - start1); - g_err.println("Time: %d ms = %u rows/sec", time_passed, - (1000*tot)/time_passed); + sample_rows += rows; sum_time+= time_passed; + + if(sample_rows >= tot) + { + int sample_time = (int)(stop - sample_start); + g_info << "Found " << sample_rows << " rows" << endl; + g_err.println("Time: %d ms = %u rows/sec", sample_time, + (1000*sample_rows)/sample_time); + sample_rows = 0; + sample_start = stop; + } } - sum_time= sum_time / iter; - - g_err.println("Avg time: %d ms = %u rows/sec", sum_time, - (1000*tot)/sum_time); + + g_err.println("Avg time: %d ms = %u rows/sec", sum_time/iter, + (1000*tot*iter)/sum_time); return 0; } diff --git a/ndb/test/run-test/daily-basic-tests.txt b/ndb/test/run-test/daily-basic-tests.txt index 8d7e8a06c72..aa38fb4763c 100644 --- a/ndb/test/run-test/daily-basic-tests.txt +++ b/ndb/test/run-test/daily-basic-tests.txt @@ -222,6 +222,10 @@ max-time: 500 cmd: testScan args: -n ScanRead488 -l 10 T6 +max-time: 500 +cmd: testScan +args: -n ScanRead488Timeout -l 10 T6 + max-time: 600 cmd: testScan args: -n ScanRead40 -l 100 T2 diff --git a/ndb/test/run-test/main.cpp b/ndb/test/run-test/main.cpp index e5f73bc6a5c..ac7710d9546 100644 --- a/ndb/test/run-test/main.cpp +++ b/ndb/test/run-test/main.cpp @@ -538,15 +538,19 @@ connect_ndb_mgm(atrt_process & proc){ } BaseString tmp = proc.m_hostname; tmp.appfmt(":%d", proc.m_ndb_mgm_port); - time_t start = time(0); - const time_t max_connect_time = 30; - do { - if(ndb_mgm_connect(handle, tmp.c_str()) != -1){ - proc.m_ndb_mgm_handle = handle; - return true; - } - sleep(1); - } while(time(0) < (start + max_connect_time)); + + if (ndb_mgm_set_connectstring(handle,tmp.c_str())) + { + g_logger.critical("Unable to create parse connectstring"); + return false; + } + + if(ndb_mgm_connect(handle, 30, 1, 0) != -1) + { + proc.m_ndb_mgm_handle = handle; + return true; + } + g_logger.critical("Unable to connect to ndb mgm %s", tmp.c_str()); return false; } diff --git a/ndb/test/src/HugoTransactions.cpp b/ndb/test/src/HugoTransactions.cpp index 456bfffbb77..096f5406bbf 100644 --- a/ndb/test/src/HugoTransactions.cpp +++ b/ndb/test/src/HugoTransactions.cpp @@ -29,26 +29,175 @@ HugoTransactions::~HugoTransactions(){ deallocRows(); } - -int HugoTransactions::scanReadCommittedRecords(Ndb* pNdb, +int +HugoTransactions::scanReadRecords(Ndb* pNdb, int records, int abortPercent, - int parallelism){ - return scanReadRecords(pNdb, records, abortPercent, parallelism, true); + int parallelism, + NdbOperation::LockMode lm) +{ + + int retryAttempt = 0; + const int retryMax = 100; + int check, a; + NdbConnection *pTrans; + NdbScanOperation *pOp; + + while (true){ + + if (retryAttempt >= retryMax){ + g_err << "ERROR: has retried this operation " << retryAttempt + << " times, failing!" << endl; + return NDBT_FAILED; + } + + pTrans = pNdb->startTransaction(); + if (pTrans == NULL) { + const NdbError err = pNdb->getNdbError(); + + if (err.status == NdbError::TemporaryError){ + ERR(err); + NdbSleep_MilliSleep(50); + retryAttempt++; + continue; + } + ERR(err); + return NDBT_FAILED; + } + + pOp = pTrans->getNdbScanOperation(tab.getName()); + if (pOp == NULL) { + ERR(pTrans->getNdbError()); + pNdb->closeTransaction(pTrans); + return NDBT_FAILED; + } + + NdbResultSet * rs; + rs = pOp ->readTuples(lm); + + if( rs == 0 ) { + ERR(pTrans->getNdbError()); + pNdb->closeTransaction(pTrans); + return NDBT_FAILED; + } + + check = pOp->interpret_exit_ok(); + if( check == -1 ) { + ERR(pTrans->getNdbError()); + pNdb->closeTransaction(pTrans); + return NDBT_FAILED; + } + + for(a = 0; a<tab.getNoOfColumns(); a++){ + if((row.attributeStore(a) = + pOp->getValue(tab.getColumn(a)->getName())) == 0) { + ERR(pTrans->getNdbError()); + pNdb->closeTransaction(pTrans); + return NDBT_FAILED; + } + } + + check = pTrans->execute(NoCommit); + if( check == -1 ) { + const NdbError err = pTrans->getNdbError(); + if (err.status == NdbError::TemporaryError){ + ERR(err); + pNdb->closeTransaction(pTrans); + NdbSleep_MilliSleep(50); + retryAttempt++; + continue; + } + ERR(err); + pNdb->closeTransaction(pTrans); + return NDBT_FAILED; + } + + // Abort after 1-100 or 1-records rows + int ranVal = rand(); + int abortCount = ranVal % (records == 0 ? 100 : records); + bool abortTrans = false; + if (abort > 0){ + // Abort if abortCount is less then abortPercent + if (abortCount < abortPercent) + abortTrans = true; + } + + int eof; + int rows = 0; + while((eof = rs->nextResult(true)) == 0){ + rows++; + if (calc.verifyRowValues(&row) != 0){ + pNdb->closeTransaction(pTrans); + return NDBT_FAILED; + } + + if (abortCount == rows && abortTrans == true){ + ndbout << "Scan is aborted" << endl; + g_info << "Scan is aborted" << endl; + rs->close(); + if( check == -1 ) { + ERR(pTrans->getNdbError()); + pNdb->closeTransaction(pTrans); + return NDBT_FAILED; + } + + pNdb->closeTransaction(pTrans); + return NDBT_OK; + } + } + if (eof == -1) { + const NdbError err = pTrans->getNdbError(); + + if (err.status == NdbError::TemporaryError){ + ERR_INFO(err); + pNdb->closeTransaction(pTrans); + NdbSleep_MilliSleep(50); + switch (err.code){ + case 488: + case 245: + case 490: + // Too many active scans, no limit on number of retry attempts + break; + default: + retryAttempt++; + } + continue; + } + ERR(err); + pNdb->closeTransaction(pTrans); + return NDBT_FAILED; + } + + pNdb->closeTransaction(pTrans); + + g_info << rows << " rows have been read" << endl; + if (records != 0 && rows != records){ + g_err << "Check expected number of records failed" << endl + << " expected=" << records <<", " << endl + << " read=" << rows << endl; + return NDBT_FAILED; + } + + return NDBT_OK; + } + return NDBT_FAILED; } int HugoTransactions::scanReadRecords(Ndb* pNdb, + const NdbDictionary::Index * pIdx, int records, int abortPercent, int parallelism, - bool committed){ + NdbOperation::LockMode lm, + bool sorted) +{ int retryAttempt = 0; const int retryMax = 100; int check, a; NdbConnection *pTrans; - NdbScanOperation *pOp; + NdbIndexScanOperation *pOp; while (true){ @@ -72,7 +221,7 @@ HugoTransactions::scanReadRecords(Ndb* pNdb, return NDBT_FAILED; } - pOp = pTrans->getNdbScanOperation(tab.getName()); + pOp = pTrans->getNdbIndexScanOperation(pIdx->getName(), tab.getName()); if (pOp == NULL) { ERR(pTrans->getNdbError()); pNdb->closeTransaction(pTrans); @@ -80,8 +229,7 @@ HugoTransactions::scanReadRecords(Ndb* pNdb, } NdbResultSet * rs; - rs = pOp ->readTuples(committed ? NdbScanOperation::LM_CommittedRead : - NdbScanOperation::LM_Read); + rs = pOp ->readTuples(lm, 0, parallelism, sorted); if( rs == 0 ) { ERR(pTrans->getNdbError()); diff --git a/ndb/test/src/NDBT_Test.cpp b/ndb/test/src/NDBT_Test.cpp index 367223f8c98..1434617c988 100644 --- a/ndb/test/src/NDBT_Test.cpp +++ b/ndb/test/src/NDBT_Test.cpp @@ -839,9 +839,9 @@ void NDBT_TestSuite::execute(Ndb* ndb, const NdbDictionary::Table* pTab, continue; } pTab2 = pDict->getTable(pTab->getName()); - } else { + } else if(!pTab2) { pTab2 = pTab; - } + } ctx = new NDBT_Context(); ctx->setTab(pTab2); diff --git a/ndb/test/src/NdbBackup.cpp b/ndb/test/src/NdbBackup.cpp index 1ce48d495a5..09f52bf0bed 100644 --- a/ndb/test/src/NdbBackup.cpp +++ b/ndb/test/src/NdbBackup.cpp @@ -69,28 +69,19 @@ NdbBackup::getBackupDataDirForNode(int _node_id){ /** * Fetch configuration from management server */ - LocalConfig lc; - if (!lc.init(0,0)) { - abort(); - } - ConfigRetriever cr(lc, 0, NODE_TYPE_API); - ndb_mgm_configuration * p = 0; + ndb_mgm_configuration *p; + if (connect()) + return NULL; - BaseString tmp; tmp.assfmt("%s:%d", host.c_str(), port); - NdbMgmHandle handle = ndb_mgm_create_handle(); - if(handle == 0 || ndb_mgm_connect(handle, tmp.c_str()) != 0 || - (p = ndb_mgm_get_configuration(handle, 0)) == 0){ - - const char * s = 0; - if(p == 0 && handle != 0){ - s = ndb_mgm_get_latest_error_msg(handle); - if(s == 0) - s = "No error given!"; + if ((p = ndb_mgm_get_configuration(handle, 0)) == 0) + { + const char * s= ndb_mgm_get_latest_error_msg(handle); + if(s == 0) + s = "No error given!"; - ndbout << "Could not fetch configuration" << endl; - ndbout << s << endl; - return NULL; - } + ndbout << "Could not fetch configuration" << endl; + ndbout << s << endl; + return NULL; } /** @@ -155,13 +146,14 @@ NdbBackup::execRestore(bool _restore_data, ndbout << "scp res: " << res << endl; - BaseString::snprintf(buf, 255, "%sndb_restore -c \"host=%s\" -n %d -b %d %s %s .", + BaseString::snprintf(buf, 255, "%sndb_restore -c \"%s:%d\" -n %d -b %d %s %s .", #if 1 "", #else "valgrind --leak-check=yes -v " #endif - addr.c_str(), + ndb_mgm_get_connected_host(handle), + ndb_mgm_get_connected_port(handle), _node_id, _backup_id, _restore_data?"-r":"", diff --git a/ndb/test/src/NdbRestarter.cpp b/ndb/test/src/NdbRestarter.cpp index 4d6d3ddc001..91c0963feae 100644 --- a/ndb/test/src/NdbRestarter.cpp +++ b/ndb/test/src/NdbRestarter.cpp @@ -18,7 +18,6 @@ #include <NdbOut.hpp> #include <NdbSleep.h> #include <NdbTick.h> -#include <LocalConfig.hpp> #include <mgmapi_debug.h> #include <NDBT_Output.hpp> #include <random.h> @@ -33,42 +32,11 @@ NdbRestarter::NdbRestarter(const char* _addr): connected(false), - port(-1), handle(NULL), m_config(0) { if (_addr == NULL){ - LocalConfig lcfg; - if(!lcfg.init()){ - lcfg.printError(); - lcfg.printUsage(); - g_err << "NdbRestarter - Error parsing local config file" << endl; - return; - } - - if (lcfg.ids.size() == 0){ - g_err << "NdbRestarter - No management servers configured in local config file" << endl; - return; - } - - for (int i = 0; i<lcfg.ids.size(); i++){ - MgmtSrvrId * m = &lcfg.ids[i]; - - switch(m->type){ - case MgmId_TCP: - char buf[255]; - snprintf(buf, 255, "%s:%d", m->name.c_str(), m->port); - addr.assign(buf); - host.assign(m->name.c_str()); - port = m->port; - return; - break; - case MgmId_File: - break; - default: - break; - } - } + addr.assign(""); } else { addr.assign(_addr); } @@ -391,13 +359,22 @@ NdbRestarter::isConnected(){ int NdbRestarter::connect(){ + disconnect(); handle = ndb_mgm_create_handle(); if (handle == NULL){ g_err << "handle == NULL" << endl; return -1; } g_info << "Connecting to mgmsrv at " << addr.c_str() << endl; - if (ndb_mgm_connect(handle, addr.c_str()) == -1) { + if (ndb_mgm_set_connectstring(handle,addr.c_str())) + { + MGMERR(handle); + g_err << "Connection to " << addr.c_str() << " failed" << endl; + return -1; + } + + if (ndb_mgm_connect(handle, 0, 0, 0) == -1) + { MGMERR(handle); g_err << "Connection to " << addr.c_str() << " failed" << endl; return -1; diff --git a/ndb/test/src/UtilTransactions.cpp b/ndb/test/src/UtilTransactions.cpp index c0e6effd244..869f7fc76cb 100644 --- a/ndb/test/src/UtilTransactions.cpp +++ b/ndb/test/src/UtilTransactions.cpp @@ -619,7 +619,7 @@ UtilTransactions::addRowToInsert(Ndb* pNdb, int UtilTransactions::scanReadRecords(Ndb* pNdb, int parallelism, - bool exclusive, + NdbOperation::LockMode lm, int records, int noAttribs, int *attrib_list, @@ -669,10 +669,7 @@ UtilTransactions::scanReadRecords(Ndb* pNdb, return NDBT_FAILED; } - NdbResultSet * rs = pOp->readTuples(exclusive ? - NdbScanOperation::LM_Exclusive : - NdbScanOperation::LM_Read, - 0, parallelism); + NdbResultSet * rs = pOp->readTuples(lm, 0, parallelism); if( rs == 0 ) { ERR(pTrans->getNdbError()); pNdb->closeTransaction(pTrans); @@ -761,7 +758,7 @@ int UtilTransactions::selectCount(Ndb* pNdb, int parallelism, int* count_rows, - ScanLock lock, + NdbOperation::LockMode lm, NdbConnection* pTrans){ int retryAttempt = 0; @@ -785,19 +782,7 @@ UtilTransactions::selectCount(Ndb* pNdb, return NDBT_FAILED; } - NdbResultSet * rs; - switch(lock){ - case SL_ReadHold: - rs = pOp->readTuples(NdbScanOperation::LM_Read); - break; - case SL_Exclusive: - rs = pOp->readTuples(NdbScanOperation::LM_Exclusive); - break; - case SL_Read: - default: - rs = pOp->readTuples(NdbScanOperation::LM_CommittedRead); - } - + NdbResultSet * rs = pOp->readTuples(lm); if( rs == 0) { ERR(pTrans->getNdbError()); pNdb->closeTransaction(pTrans); diff --git a/ndb/test/tools/create_index.cpp b/ndb/test/tools/create_index.cpp index 75a657522f6..6e4c5377f4a 100644 --- a/ndb/test/tools/create_index.cpp +++ b/ndb/test/tools/create_index.cpp @@ -30,7 +30,7 @@ main(int argc, const char** argv){ const char* _dbname = "TEST_DB"; int _help = 0; - int _ordered, _pk; + int _ordered = 0, _pk = 1; struct getargs args[] = { { "database", 'd', arg_string, &_dbname, "dbname", diff --git a/ndb/test/tools/hugoScanRead.cpp b/ndb/test/tools/hugoScanRead.cpp index cdfdcea4654..42180207a8a 100644 --- a/ndb/test/tools/hugoScanRead.cpp +++ b/ndb/test/tools/hugoScanRead.cpp @@ -35,13 +35,17 @@ int main(int argc, const char** argv){ int _parallelism = 1; const char* _tabname = NULL; int _help = 0; - + int lock = NdbOperation::LM_Read; + int sorted = 0; + struct getargs args[] = { { "aborts", 'a', arg_integer, &_abort, "percent of transactions that are aborted", "abort%" }, { "loops", 'l', arg_integer, &_loops, "number of times to run this program(0=infinite loop)", "loops" }, { "parallelism", 'p', arg_integer, &_parallelism, "parallelism(1-240)", "para" }, { "records", 'r', arg_integer, &_records, "Number of records", "recs" }, - { "usage", '?', arg_flag, &_help, "Print help", "" } + { "usage", '?', arg_flag, &_help, "Print help", "" }, + { "lock", 'm', arg_integer, &lock, "lock mode", "" }, + { "sorted", 's', arg_flag, &sorted, "sorted", "" } }; int num_args = sizeof(args) / sizeof(args[0]); int optind = 0; @@ -73,16 +77,48 @@ int main(int argc, const char** argv){ ndbout << " Table " << _tabname << " does not exist!" << endl; return NDBT_ProgramExit(NDBT_WRONGARGS); } + + const NdbDictionary::Index * pIdx = 0; + if(optind+1 < argc) + { + pIdx = MyNdb.getDictionary()->getIndex(argv[optind+1], _tabname); + if(!pIdx) + ndbout << " Index " << argv[optind+1] << " not found" << endl; + else + if(pIdx->getType() != NdbDictionary::Index::UniqueOrderedIndex && + pIdx->getType() != NdbDictionary::Index::OrderedIndex) + { + ndbout << " Index " << argv[optind+1] << " is not scannable" << endl; + pIdx = 0; + } + } HugoTransactions hugoTrans(*pTab); int i = 0; while (i<_loops || _loops==0) { ndbout << i << ": "; - if(hugoTrans.scanReadRecords(&MyNdb, - 0, - _abort, - _parallelism) != 0){ - return NDBT_ProgramExit(NDBT_FAILED); + if(!pIdx) + { + if(hugoTrans.scanReadRecords(&MyNdb, + 0, + _abort, + _parallelism, + (NdbOperation::LockMode)lock) != 0) + { + return NDBT_ProgramExit(NDBT_FAILED); + } + } + else + { + if(hugoTrans.scanReadRecords(&MyNdb, pIdx, + 0, + _abort, + _parallelism, + (NdbOperation::LockMode)lock, + sorted) != 0) + { + return NDBT_ProgramExit(NDBT_FAILED); + } } i++; } diff --git a/ndb/tools/delete_all.cpp b/ndb/tools/delete_all.cpp index a4fd73a5128..046ac8005d2 100644 --- a/ndb/tools/delete_all.cpp +++ b/ndb/tools/delete_all.cpp @@ -67,7 +67,7 @@ get_one_option(int optid, const struct my_option *opt __attribute__((unused)), int main(int argc, char** argv){ NDB_INIT(argv[0]); - const char *load_default_groups[]= { "ndb_tools",0 }; + const char *load_default_groups[]= { "mysql_cluster",0 }; load_defaults("my",load_default_groups,&argc,&argv); int ho_error; if ((ho_error=handle_options(&argc, &argv, my_long_options, get_one_option))) diff --git a/ndb/tools/desc.cpp b/ndb/tools/desc.cpp index 8f7a2031ef0..c5e9efdfa8a 100644 --- a/ndb/tools/desc.cpp +++ b/ndb/tools/desc.cpp @@ -67,7 +67,7 @@ get_one_option(int optid, const struct my_option *opt __attribute__((unused)), int main(int argc, char** argv){ NDB_INIT(argv[0]); - const char *load_default_groups[]= { "ndb_tools",0 }; + const char *load_default_groups[]= { "mysql_cluster",0 }; load_defaults("my",load_default_groups,&argc,&argv); int ho_error; if ((ho_error=handle_options(&argc, &argv, my_long_options, get_one_option))) diff --git a/ndb/tools/drop_index.cpp b/ndb/tools/drop_index.cpp index 1d4b454682f..6600811e0c4 100644 --- a/ndb/tools/drop_index.cpp +++ b/ndb/tools/drop_index.cpp @@ -64,7 +64,7 @@ get_one_option(int optid, const struct my_option *opt __attribute__((unused)), int main(int argc, char** argv){ NDB_INIT(argv[0]); - const char *load_default_groups[]= { "ndb_tools",0 }; + const char *load_default_groups[]= { "mysql_cluster",0 }; load_defaults("my",load_default_groups,&argc,&argv); int ho_error; if ((ho_error=handle_options(&argc, &argv, my_long_options, get_one_option))) diff --git a/ndb/tools/drop_tab.cpp b/ndb/tools/drop_tab.cpp index 3362c7de47b..0661a8c599b 100644 --- a/ndb/tools/drop_tab.cpp +++ b/ndb/tools/drop_tab.cpp @@ -64,7 +64,7 @@ get_one_option(int optid, const struct my_option *opt __attribute__((unused)), int main(int argc, char** argv){ NDB_INIT(argv[0]); - const char *load_default_groups[]= { "ndb_tools",0 }; + const char *load_default_groups[]= { "mysql_cluster",0 }; load_defaults("my",load_default_groups,&argc,&argv); int ho_error; if ((ho_error=handle_options(&argc, &argv, my_long_options, get_one_option))) diff --git a/ndb/tools/listTables.cpp b/ndb/tools/listTables.cpp index 05e864a35c4..b923207a4fe 100644 --- a/ndb/tools/listTables.cpp +++ b/ndb/tools/listTables.cpp @@ -220,7 +220,7 @@ get_one_option(int optid, const struct my_option *opt __attribute__((unused)), int main(int argc, char** argv){ NDB_INIT(argv[0]); const char* _tabname; - const char *load_default_groups[]= { "ndb_tools",0 }; + const char *load_default_groups[]= { "mysql_cluster",0 }; load_defaults("my",load_default_groups,&argc,&argv); int ho_error; if ((ho_error=handle_options(&argc, &argv, my_long_options, get_one_option))) diff --git a/ndb/tools/restore/restore_main.cpp b/ndb/tools/restore/restore_main.cpp index c43791c6723..409ebd54764 100644 --- a/ndb/tools/restore/restore_main.cpp +++ b/ndb/tools/restore/restore_main.cpp @@ -143,7 +143,7 @@ get_one_option(int optid, const struct my_option *opt __attribute__((unused)), bool readArguments(int *pargc, char*** pargv) { - const char *load_default_groups[]= { "ndb_tools","ndb_restore",0 }; + const char *load_default_groups[]= { "mysql_cluster","ndb_restore",0 }; load_defaults("my",load_default_groups,pargc,pargv); if (handle_options(pargc, pargv, my_long_options, get_one_option)) { diff --git a/ndb/tools/select_all.cpp b/ndb/tools/select_all.cpp index 758c1e48c88..5efeed485a4 100644 --- a/ndb/tools/select_all.cpp +++ b/ndb/tools/select_all.cpp @@ -50,7 +50,7 @@ static struct my_option my_long_options[] = GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 }, { "parallelism", 'p', "parallelism", (gptr*) &_parallelism, (gptr*) &_parallelism, 0, - GET_INT, REQUIRED_ARG, 240, 0, 0, 0, 0, 0 }, + GET_INT, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 }, { "lock", 'l', "Read(0), Read-hold(1), Exclusive(2)", (gptr*) &_lock, (gptr*) &_lock, 0, GET_INT, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 }, @@ -105,7 +105,7 @@ get_one_option(int optid, const struct my_option *opt __attribute__((unused)), int main(int argc, char** argv){ NDB_INIT(argv[0]); - const char *load_default_groups[]= { "ndb_tools",0 }; + const char *load_default_groups[]= { "mysql_cluster",0 }; load_defaults("my",load_default_groups,&argc,&argv); const char* _tabname; int ho_error; @@ -133,13 +133,18 @@ int main(int argc, char** argv){ const NdbDictionary::Table* pTab = NDBT_Table::discoverTableFromDb(&MyNdb, _tabname); const NdbDictionary::Index * pIdx = 0; if(argc > 1){ - pIdx = MyNdb.getDictionary()->getIndex(argv[0], _tabname); + pIdx = MyNdb.getDictionary()->getIndex(argv[1], _tabname); } if(pTab == NULL){ ndbout << " Table " << _tabname << " does not exist!" << endl; return NDBT_ProgramExit(NDBT_WRONGARGS); } + + if(argc > 1 && pIdx == 0) + { + ndbout << " Index " << argv[1] << " does not exists" << endl; + } if(_order && pIdx == NULL){ ndbout << " Order flag given without an index" << endl; diff --git a/ndb/tools/select_count.cpp b/ndb/tools/select_count.cpp index 6ee49ddbff0..c3491f842d8 100644 --- a/ndb/tools/select_count.cpp +++ b/ndb/tools/select_count.cpp @@ -83,7 +83,7 @@ get_one_option(int optid, const struct my_option *opt __attribute__((unused)), int main(int argc, char** argv){ NDB_INIT(argv[0]); - const char *load_default_groups[]= { "ndb_tools",0 }; + const char *load_default_groups[]= { "mysql_cluster",0 }; load_defaults("my",load_default_groups,&argc,&argv); int ho_error; if ((ho_error=handle_options(&argc, &argv, my_long_options, get_one_option))) diff --git a/ndb/tools/waiter.cpp b/ndb/tools/waiter.cpp index e24164ea807..5973b046f8f 100644 --- a/ndb/tools/waiter.cpp +++ b/ndb/tools/waiter.cpp @@ -23,7 +23,6 @@ #include <NdbOut.hpp> #include <NdbSleep.h> #include <kernel/ndb_limits.h> -#include <LocalConfig.hpp> #include <NDBT.hpp> @@ -75,7 +74,7 @@ get_one_option(int optid, const struct my_option *opt __attribute__((unused)), int main(int argc, char** argv){ NDB_INIT(argv[0]); - const char *load_default_groups[]= { "ndb_tools",0 }; + const char *load_default_groups[]= { "mysql_cluster",0 }; load_defaults("my",load_default_groups,&argc,&argv); const char* _hostName = NULL; int ho_error; @@ -85,39 +84,8 @@ int main(int argc, char** argv){ char buf[255]; _hostName = argv[0]; - if (_hostName == NULL){ - LocalConfig lcfg; - if(!lcfg.init(opt_connect_str, 0)) - { - lcfg.printError(); - lcfg.printUsage(); - g_err << "Error parsing local config file" << endl; - return NDBT_ProgramExit(NDBT_FAILED); - } - - for (unsigned i = 0; i<lcfg.ids.size();i++) - { - MgmtSrvrId * m = &lcfg.ids[i]; - - switch(m->type){ - case MgmId_TCP: - snprintf(buf, 255, "%s:%d", m->name.c_str(), m->port); - _hostName = buf; - break; - case MgmId_File: - break; - default: - break; - } - if (_hostName != NULL) - break; - } - if (_hostName == NULL) - { - g_err << "No management servers configured in local config file" << endl; - return NDBT_ProgramExit(NDBT_FAILED); - } - } + if (_hostName == 0) + _hostName= opt_connect_str; if (_no_contact) { if (waitClusterStatus(_hostName, NDB_MGM_NODE_STATUS_NO_CONTACT, _timeout) != 0) @@ -210,13 +178,19 @@ waitClusterStatus(const char* _addr, int _nodes[MAX_NDB_NODES]; int _num_nodes = 0; - handle = ndb_mgm_create_handle(); + handle = ndb_mgm_create_handle(); if (handle == NULL){ g_err << "handle == NULL" << endl; return -1; } g_info << "Connecting to mgmsrv at " << _addr << endl; - if (ndb_mgm_connect(handle, _addr) == -1) { + if (ndb_mgm_set_connectstring(handle, _addr)) + { + MGMERR(handle); + g_err << "Connectstring " << _addr << " invalid" << endl; + return -1; + } + if (ndb_mgm_connect(handle,0,0,1)) { MGMERR(handle); g_err << "Connection to " << _addr << " failed" << endl; return -1; diff --git a/sql/ha_ndbcluster.cc b/sql/ha_ndbcluster.cc index 0dc1399db24..a77d9cf05af 100644 --- a/sql/ha_ndbcluster.cc +++ b/sql/ha_ndbcluster.cc @@ -1268,7 +1268,7 @@ inline int ha_ndbcluster::next_result(byte *buf) m_ops_pending= 0; m_blobs_pending= FALSE; } - check= cursor->nextResult(contact_ndb); + check= cursor->nextResult(contact_ndb, m_force_send); if (check == 0) { // One more record found @@ -1561,7 +1561,7 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key, DBUG_ASSERT(op->getSorted() == sorted); DBUG_ASSERT(op->getLockMode() == (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type)); - if(op->reset_bounds()) + if(op->reset_bounds(m_force_send)) DBUG_RETURN(ndb_err(m_active_trans)); } @@ -2394,7 +2394,7 @@ int ha_ndbcluster::index_last(byte *buf) int res; if((res= ordered_index_scan(0, 0, TRUE, buf)) == 0){ NdbResultSet *cursor= m_active_cursor; - while((res= cursor->nextResult(TRUE)) == 0); + while((res= cursor->nextResult(TRUE, m_force_send)) == 0); if(res == 1){ unpack_record(buf); table->status= 0; @@ -2480,7 +2480,7 @@ int ha_ndbcluster::rnd_init(bool scan) { if (!scan) DBUG_RETURN(1); - int res= cursor->restart(); + int res= cursor->restart(m_force_send); DBUG_ASSERT(res == 0); } index_init(table->primary_key); @@ -2511,7 +2511,7 @@ int ha_ndbcluster::close_scan() m_ops_pending= 0; } - cursor->close(); + cursor->close(m_force_send); m_active_cursor= NULL; DBUG_RETURN(0); } @@ -3035,6 +3035,7 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type) m_transaction_on= FALSE; else m_transaction_on= thd->variables.ndb_use_transactions; + // m_use_local_query_cache= thd->variables.ndb_use_local_query_cache; m_active_trans= thd->transaction.all.ndb_tid ? (NdbConnection*)thd->transaction.all.ndb_tid: @@ -3761,7 +3762,8 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg): m_ha_not_exact_count(FALSE), m_force_send(TRUE), m_autoincrement_prefetch(32), - m_transaction_on(TRUE) + m_transaction_on(TRUE), + m_use_local_query_cache(FALSE) { int i; @@ -4449,7 +4451,7 @@ bool ha_ndbcluster::low_byte_first() const } bool ha_ndbcluster::has_transactions() { - return TRUE; + return m_transaction_on; } const char* ha_ndbcluster::index_type(uint key_number) { @@ -4466,7 +4468,10 @@ const char* ha_ndbcluster::index_type(uint key_number) } uint8 ha_ndbcluster::table_cache_type() { - return HA_CACHE_TBL_NOCACHE; + if (m_use_local_query_cache) + return HA_CACHE_TBL_TRANSACT; + else + return HA_CACHE_TBL_NOCACHE; } /* @@ -4634,13 +4639,12 @@ ndb_get_table_statistics(Ndb* ndb, const char * table, { DBUG_ENTER("ndb_get_table_statistics"); DBUG_PRINT("enter", ("table: %s", table)); - + NdbConnection* pTrans= ndb->startTransaction(); do { - NdbConnection* pTrans= ndb->startTransaction(); if (pTrans == NULL) break; - + NdbScanOperation* pOp= pTrans->getNdbScanOperation(table); if (pOp == NULL) break; @@ -4657,13 +4661,13 @@ ndb_get_table_statistics(Ndb* ndb, const char * table, pOp->getValue(NdbDictionary::Column::ROW_COUNT, (char*)&rows); pOp->getValue(NdbDictionary::Column::COMMIT_COUNT, (char*)&commits); - check= pTrans->execute(NoCommit); + check= pTrans->execute(NoCommit, AbortOnError, TRUE); if (check == -1) break; Uint64 sum_rows= 0; Uint64 sum_commits= 0; - while((check= rs->nextResult(TRUE)) == 0) + while((check= rs->nextResult(TRUE, TRUE)) == 0) { sum_rows+= rows; sum_commits+= commits; @@ -4672,6 +4676,8 @@ ndb_get_table_statistics(Ndb* ndb, const char * table, if (check == -1) break; + rs->close(TRUE); + ndb->closeTransaction(pTrans); if(row_count) * row_count= sum_rows; @@ -4681,6 +4687,7 @@ ndb_get_table_statistics(Ndb* ndb, const char * table, DBUG_RETURN(0); } while(0); + ndb->closeTransaction(pTrans); DBUG_PRINT("exit", ("failed")); DBUG_RETURN(-1); } diff --git a/sql/ha_ndbcluster.h b/sql/ha_ndbcluster.h index 354a98e698c..2f18a52b8e9 100644 --- a/sql/ha_ndbcluster.h +++ b/sql/ha_ndbcluster.h @@ -239,10 +239,12 @@ class ha_ndbcluster: public handler char *m_blobs_buffer; uint32 m_blobs_buffer_size; uint m_dupkey; + // set from thread variables at external lock bool m_ha_not_exact_count; bool m_force_send; ha_rows m_autoincrement_prefetch; bool m_transaction_on; + bool m_use_local_query_cache; void set_rec_per_key(); void records_update(); diff --git a/strings/ctype-uca.c b/strings/ctype-uca.c index 8df5b3277c1..89c876ad10c 100644 --- a/strings/ctype-uca.c +++ b/strings/ctype-uca.c @@ -7288,6 +7288,7 @@ int my_wildcmp_uca(CHARSET_INFO *cs, { while (1) { + my_bool escaped= 0; if ((scan= mb_wc(cs, &w_wc, (const uchar*)wildstr, (const uchar*)wildend)) <= 0) return 1; @@ -7305,6 +7306,7 @@ int my_wildcmp_uca(CHARSET_INFO *cs, (const uchar*)wildend)) <= 0) return 1; wildstr+= scan; + escaped= 1; } if ((scan= mb_wc(cs, &s_wc, (const uchar*)str, @@ -7312,7 +7314,7 @@ int my_wildcmp_uca(CHARSET_INFO *cs, return 1; str+= scan; - if (w_wc == (my_wc_t)w_one) + if (!escaped && w_wc == (my_wc_t)w_one) { result= 1; /* Found an anchor char */ } diff --git a/strings/ctype-utf8.c b/strings/ctype-utf8.c index b3097649158..ce9346eb475 100644 --- a/strings/ctype-utf8.c +++ b/strings/ctype-utf8.c @@ -1545,31 +1545,33 @@ int my_wildcmp_unicode(CHARSET_INFO *cs, { while (1) { + my_bool escaped= 0; if ((scan= mb_wc(cs, &w_wc, (const uchar*)wildstr, (const uchar*)wildend)) <= 0) return 1; - - if (w_wc == (my_wc_t)escape) - { - wildstr+= scan; - if ((scan= mb_wc(cs,&w_wc, (const uchar*)wildstr, - (const uchar*)wildend)) <= 0) - return 1; - } - + if (w_wc == (my_wc_t)w_many) { result= 1; /* Found an anchor char */ break; } - + wildstr+= scan; + if (w_wc == (my_wc_t)escape) + { + if ((scan= mb_wc(cs, &w_wc, (const uchar*)wildstr, + (const uchar*)wildend)) <= 0) + return 1; + wildstr+= scan; + escaped= 1; + } + if ((scan= mb_wc(cs, &s_wc, (const uchar*)str, - (const uchar*)str_end)) <=0) + (const uchar*)str_end)) <= 0) return 1; str+= scan; - if (w_wc == (my_wc_t)w_one) + if (!escaped && w_wc == (my_wc_t)w_one) { result= 1; /* Found an anchor char */ } diff --git a/strings/uca-dump.c b/strings/uca-dump.c index db5cb7e999a..dd3b74a55e8 100644 --- a/strings/uca-dump.c +++ b/strings/uca-dump.c @@ -269,7 +269,7 @@ int main(int ac, char **av) */ int tmp= weight[i]; if (w == 2 && tmp) - tmp= (int)(0x100 - weight[i]); + tmp= (int)(0x20 - weight[i]); printf("0x%04X", tmp); @@ -304,7 +304,7 @@ int main(int ac, char **av) const char *comma= page < MY_UCA_NPAGES-1 ? "," : ""; const char *nline= (page+1) % 4 ? "" : "\n"; if (!pagemaxlen[page]) - printf("NULL %s%s", comma , nline); + printf("NULL %s%s%s", w ? " ": "", comma , nline); else printf("page%03Xdata%s%s%s", page, pname[w], comma, nline); } |