summaryrefslogtreecommitdiff
path: root/storage/ndb
diff options
context:
space:
mode:
Diffstat (limited to 'storage/ndb')
-rw-r--r--storage/ndb/Makefile.am2
-rw-r--r--storage/ndb/include/kernel/GlobalSignalNumbers.h1
-rw-r--r--storage/ndb/include/kernel/signaldata/CopyFrag.hpp13
-rw-r--r--storage/ndb/include/mgmapi/mgmapi.h26
-rw-r--r--storage/ndb/include/mgmapi/mgmapi_debug.h14
-rw-r--r--storage/ndb/include/mgmcommon/ConfigRetriever.hpp3
-rw-r--r--storage/ndb/include/ndb_global.h.in4
-rw-r--r--storage/ndb/include/ndb_version.h.in3
-rw-r--r--storage/ndb/include/ndbapi/NdbDictionary.hpp10
-rw-r--r--storage/ndb/include/ndbapi/NdbScanOperation.hpp3
-rw-r--r--storage/ndb/include/ndbapi/NdbTransaction.hpp7
-rw-r--r--storage/ndb/include/util/InputStream.hpp11
-rw-r--r--storage/ndb/include/util/Parser.hpp5
-rw-r--r--storage/ndb/include/util/SocketClient.hpp3
-rw-r--r--storage/ndb/include/util/socket_io.h7
-rw-r--r--storage/ndb/src/common/debugger/signaldata/SignalNames.cpp1
-rw-r--r--storage/ndb/src/common/mgmcommon/ConfigRetriever.cpp12
-rw-r--r--storage/ndb/src/common/transporter/Transporter.cpp18
-rw-r--r--storage/ndb/src/common/util/InputStream.cpp39
-rw-r--r--storage/ndb/src/common/util/Parser.cpp38
-rw-r--r--storage/ndb/src/common/util/SocketClient.cpp73
-rw-r--r--storage/ndb/src/common/util/socket_io.cpp35
-rw-r--r--storage/ndb/src/cw/cpcd/main.cpp2
-rw-r--r--storage/ndb/src/kernel/blocks/ERROR_codes.txt4
-rw-r--r--storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp4
-rw-r--r--storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp2
-rw-r--r--storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp377
-rw-r--r--storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp1
-rw-r--r--storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp3
-rw-r--r--storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp62
-rw-r--r--storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp30
-rw-r--r--storage/ndb/src/kernel/blocks/dbtux/Dbtux.hpp192
-rw-r--r--storage/ndb/src/kernel/blocks/dbtux/DbtuxCmp.cpp38
-rw-r--r--storage/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp1
-rw-r--r--storage/ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp21
-rw-r--r--storage/ndb/src/kernel/blocks/dbtux/DbtuxMaint.cpp13
-rw-r--r--storage/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp221
-rw-r--r--storage/ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp57
-rw-r--r--storage/ndb/src/kernel/vm/Configuration.cpp16
-rw-r--r--storage/ndb/src/mgmapi/mgmapi.cpp200
-rw-r--r--storage/ndb/src/mgmclient/main.cpp4
-rw-r--r--storage/ndb/src/mgmsrv/Services.cpp162
-rw-r--r--storage/ndb/src/mgmsrv/Services.hpp18
-rw-r--r--storage/ndb/src/mgmsrv/main.cpp4
-rw-r--r--storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp53
-rw-r--r--storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp2
-rw-r--r--storage/ndb/src/ndbapi/NdbScanOperation.cpp29
-rw-r--r--storage/ndb/src/ndbapi/NdbTransaction.cpp79
-rw-r--r--storage/ndb/test/ndbapi/testMgm.cpp45
-rw-r--r--storage/ndb/test/ndbapi/testOIBasic.cpp316
-rw-r--r--storage/ndb/test/ndbapi/testTimeout.cpp50
-rw-r--r--storage/ndb/test/run-test/daily-basic-tests.txt10
-rw-r--r--storage/ndb/tools/delete_all.cpp18
-rw-r--r--storage/ndb/tools/desc.cpp6
-rw-r--r--storage/ndb/tools/drop_index.cpp6
-rw-r--r--storage/ndb/tools/drop_tab.cpp8
-rw-r--r--storage/ndb/tools/listTables.cpp5
-rw-r--r--storage/ndb/tools/ndb_condig.cpp66
-rw-r--r--storage/ndb/tools/restore/restore_main.cpp7
-rw-r--r--storage/ndb/tools/select_all.cpp5
-rw-r--r--storage/ndb/tools/select_count.cpp6
-rw-r--r--storage/ndb/tools/waiter.cpp6
62 files changed, 1731 insertions, 746 deletions
diff --git a/storage/ndb/Makefile.am b/storage/ndb/Makefile.am
index 0b99ca39114..be9dc223d9d 100644
--- a/storage/ndb/Makefile.am
+++ b/storage/ndb/Makefile.am
@@ -1,6 +1,6 @@
SUBDIRS = src tools . include @ndb_opt_subdirs@
DIST_SUBDIRS = src tools include test docs
-EXTRA_DIST = config ndbapi-examples
+EXTRA_DIST = config ndbapi-examples plug.in
DIST_COMMON = $(srcdir)/Makefile.am $(srcdir)/Makefile.in
include $(top_srcdir)/storage/ndb/config/common.mk.am
diff --git a/storage/ndb/include/kernel/GlobalSignalNumbers.h b/storage/ndb/include/kernel/GlobalSignalNumbers.h
index 4b2c69e4bc6..b449915d756 100644
--- a/storage/ndb/include/kernel/GlobalSignalNumbers.h
+++ b/storage/ndb/include/kernel/GlobalSignalNumbers.h
@@ -127,6 +127,7 @@ extern const GlobalSignalNumber NO_OF_SIGNAL_NAMES;
/* 68 not unused */
/* 69 not unused */
/* 70 unused */
+#define GSN_UPDATE_FRAG_DIST_KEY_ORD 70
#define GSN_ACC_ABORTREQ 71
#define GSN_ACC_CHECK_SCAN 72
#define GSN_ACC_COMMITCONF 73
diff --git a/storage/ndb/include/kernel/signaldata/CopyFrag.hpp b/storage/ndb/include/kernel/signaldata/CopyFrag.hpp
index c92859fdcce..bc308c32935 100644
--- a/storage/ndb/include/kernel/signaldata/CopyFrag.hpp
+++ b/storage/ndb/include/kernel/signaldata/CopyFrag.hpp
@@ -30,7 +30,7 @@ class CopyFragReq {
*/
friend class Dblqh;
public:
- STATIC_CONST( SignalLength = 8 );
+ STATIC_CONST( SignalLength = 9 );
private:
Uint32 userPtr;
@@ -41,6 +41,8 @@ private:
Uint32 schemaVersion;
Uint32 distributionKey;
Uint32 gci;
+ Uint32 nodeCount;
+ Uint32 nodeList[1];
};
class CopyFragConf {
@@ -85,4 +87,13 @@ private:
Uint32 errorCode;
};
+struct UpdateFragDistKeyOrd
+{
+ Uint32 tableId;
+ Uint32 fragId;
+ Uint32 fragDistributionKey;
+
+ STATIC_CONST( SignalLength = 3 );
+};
+
#endif
diff --git a/storage/ndb/include/mgmapi/mgmapi.h b/storage/ndb/include/mgmapi/mgmapi.h
index e6449a7372d..b6b87ebaaa9 100644
--- a/storage/ndb/include/mgmapi/mgmapi.h
+++ b/storage/ndb/include/mgmapi/mgmapi.h
@@ -230,7 +230,9 @@ extern "C" {
NDB_MGM_SERVER_NOT_CONNECTED = 1010,
/** Could not connect to socker */
NDB_MGM_COULD_NOT_CONNECT_TO_SOCKET = 1011,
-
+ /** Could not bind local address */
+ NDB_MGM_BIND_ADDRESS = 1012,
+
/* Alloc node id failures */
/** Generic error, retry may succeed */
NDB_MGM_ALLOCID_ERROR = 1101,
@@ -515,6 +517,15 @@ extern "C" {
const char *ndb_mgm_get_connectstring(NdbMgmHandle handle, char *buf, int buf_sz);
/**
+ * Set local bindaddress
+ * @param arg - Srting of form "host[:port]"
+ * @note must be called before connect
+ * @note Error on binding local address will not be reported until connect
+ * @return 0 on success
+ */
+ int ndb_mgm_set_bindaddress(NdbMgmHandle, const char * arg);
+
+ /**
* Gets the connectstring used for a connection
*
* @note This function returns the default connectstring if no call to
@@ -1070,6 +1081,19 @@ extern "C" {
int ndb_mgm_end_session(NdbMgmHandle handle);
/**
+ * ndb_mgm_get_fd
+ *
+ * get the file descriptor of the handle.
+ * INTERNAL ONLY.
+ * USE FOR TESTING. OTHER USES ARE NOT A GOOD IDEA.
+ *
+ * @param handle NDB management handle
+ * @return handle->socket
+ *
+ */
+ int ndb_mgm_get_fd(NdbMgmHandle handle);
+
+ /**
* Get the node id of the mgm server we're connected to
*/
Uint32 ndb_mgm_get_mgmd_nodeid(NdbMgmHandle handle);
diff --git a/storage/ndb/include/mgmapi/mgmapi_debug.h b/storage/ndb/include/mgmapi/mgmapi_debug.h
index 942e132d3b4..073d5f015ae 100644
--- a/storage/ndb/include/mgmapi/mgmapi_debug.h
+++ b/storage/ndb/include/mgmapi/mgmapi_debug.h
@@ -132,6 +132,20 @@ extern "C" {
const char * value,
struct ndb_mgm_reply* reply);
+ Uint64 ndb_mgm_get_session_id(NdbMgmHandle handle);
+
+ struct NdbMgmSession {
+ Uint64 id;
+ Uint32 m_stopSelf;
+ Uint32 m_stop;
+ Uint32 nodeid;
+ Uint32 parser_buffer_len;
+ Uint32 parser_status;
+ };
+
+ int ndb_mgm_get_session(NdbMgmHandle handle, Uint64 id,
+ struct NdbMgmSession *s, int *len);
+
#ifdef __cplusplus
}
#endif
diff --git a/storage/ndb/include/mgmcommon/ConfigRetriever.hpp b/storage/ndb/include/mgmcommon/ConfigRetriever.hpp
index 1b4ecd56f80..89a1eb976c8 100644
--- a/storage/ndb/include/mgmcommon/ConfigRetriever.hpp
+++ b/storage/ndb/include/mgmcommon/ConfigRetriever.hpp
@@ -28,7 +28,8 @@
class ConfigRetriever {
public:
ConfigRetriever(const char * _connect_string,
- Uint32 version, Uint32 nodeType);
+ Uint32 version, Uint32 nodeType,
+ const char * _bind_address = 0);
~ConfigRetriever();
int do_connect(int no_retries, int retry_delay_in_seconds, int verbose);
diff --git a/storage/ndb/include/ndb_global.h.in b/storage/ndb/include/ndb_global.h.in
index 15406ff4c3c..a427e5c820d 100644
--- a/storage/ndb/include/ndb_global.h.in
+++ b/storage/ndb/include/ndb_global.h.in
@@ -129,14 +129,10 @@ extern "C" {
#include "ndb_init.h"
-#ifdef SCO
-
#ifndef PATH_MAX
#define PATH_MAX 1024
#endif
-#endif /* SCO */
-
#ifndef MIN
#define MIN(x,y) (((x)<(y))?(x):(y))
#endif
diff --git a/storage/ndb/include/ndb_version.h.in b/storage/ndb/include/ndb_version.h.in
index 45d4b21cf6c..6a1b1f5292c 100644
--- a/storage/ndb/include/ndb_version.h.in
+++ b/storage/ndb/include/ndb_version.h.in
@@ -67,6 +67,9 @@ char ndb_version_string_buf[NDB_VERSION_STRING_BUF_SZ];
#define NDBD_DICT_LOCK_VERSION_5 MAKE_VERSION(5,0,23)
#define NDBD_DICT_LOCK_VERSION_5_1 MAKE_VERSION(5,1,12)
+#define NDBD_UPDATE_FRAG_DIST_KEY_50 MAKE_VERSION(5,0,26)
+#define NDBD_UPDATE_FRAG_DIST_KEY_51 MAKE_VERSION(5,1,12)
+
#define NDBD_QMGR_SINGLEUSER_VERSION_5 MAKE_VERSION(5,0,25)
#endif
diff --git a/storage/ndb/include/ndbapi/NdbDictionary.hpp b/storage/ndb/include/ndbapi/NdbDictionary.hpp
index 47dc5cd73dc..8d9ade2fb84 100644
--- a/storage/ndb/include/ndbapi/NdbDictionary.hpp
+++ b/storage/ndb/include/ndbapi/NdbDictionary.hpp
@@ -920,6 +920,16 @@ public:
bool getTemporary();
void setTemporary(bool);
+
+ /**
+ * Check if any of column in bitmaps are disk columns
+ * returns bitmap of different columns
+ * bit 0 = atleast 1 pk column is set
+ * bit 1 = atleast 1 disk column set
+ * bit 2 = atleast 1 non disk column set
+ * passing NULL pointer will equal to bitmap with all columns set
+ */
+ int checkColumns(const Uint32* bitmap, unsigned len_in_bytes) const;
#endif
// these 2 are not de-doxygenated
diff --git a/storage/ndb/include/ndbapi/NdbScanOperation.hpp b/storage/ndb/include/ndbapi/NdbScanOperation.hpp
index 34f62defa66..209cd7e29b8 100644
--- a/storage/ndb/include/ndbapi/NdbScanOperation.hpp
+++ b/storage/ndb/include/ndbapi/NdbScanOperation.hpp
@@ -42,7 +42,8 @@ public:
* readTuples.
*/
enum ScanFlag {
- SF_TupScan = (1 << 16), // scan TUP
+ SF_TupScan = (1 << 16), // scan TUP order
+ SF_DiskScan = (2 << 16), // scan in DISK order
SF_OrderBy = (1 << 24), // index scan in order
SF_Descending = (2 << 24), // index scan in descending order
SF_ReadRangeNo = (4 << 24), // enable @ref get_range_no
diff --git a/storage/ndb/include/ndbapi/NdbTransaction.hpp b/storage/ndb/include/ndbapi/NdbTransaction.hpp
index b3fb07d2905..8d367f1620a 100644
--- a/storage/ndb/include/ndbapi/NdbTransaction.hpp
+++ b/storage/ndb/include/ndbapi/NdbTransaction.hpp
@@ -658,8 +658,11 @@ private:
// Release all cursor operations in connection
void releaseOps(NdbOperation*);
void releaseScanOperations(NdbIndexScanOperation*);
- void releaseScanOperation(NdbIndexScanOperation*);
-
+ bool releaseScanOperation(NdbIndexScanOperation** listhead,
+ NdbIndexScanOperation** listtail,
+ NdbIndexScanOperation* op);
+ void releaseExecutedScanOperation(NdbIndexScanOperation*);
+
// Set the transaction identity of the transaction
void setTransactionId(Uint64 aTransactionId);
diff --git a/storage/ndb/include/util/InputStream.hpp b/storage/ndb/include/util/InputStream.hpp
index 10e22a70e42..4238b492917 100644
--- a/storage/ndb/include/util/InputStream.hpp
+++ b/storage/ndb/include/util/InputStream.hpp
@@ -19,14 +19,22 @@
#include <ndb_global.h>
#include <NdbTCP.h>
+#include <NdbMutex.h>
/**
* Input stream
*/
class InputStream {
public:
- virtual ~InputStream() {}
+ InputStream() { m_mutex= NULL; };
+ virtual ~InputStream() {};
virtual char* gets(char * buf, int bufLen) = 0;
+ /**
+ * Set the mutex to be UNLOCKED when blocking (e.g. select(2))
+ */
+ void set_mutex(NdbMutex *m) { m_mutex= m; };
+protected:
+ NdbMutex *m_mutex;
};
class FileInputStream : public InputStream {
@@ -42,6 +50,7 @@ extern FileInputStream Stdin;
class SocketInputStream : public InputStream {
NDB_SOCKET_TYPE m_socket;
unsigned m_timeout;
+ bool m_startover;
public:
SocketInputStream(NDB_SOCKET_TYPE socket, unsigned readTimeout = 1000);
virtual ~SocketInputStream() {}
diff --git a/storage/ndb/include/util/Parser.hpp b/storage/ndb/include/util/Parser.hpp
index 3baf7601a6c..5785961c293 100644
--- a/storage/ndb/include/util/Parser.hpp
+++ b/storage/ndb/include/util/Parser.hpp
@@ -61,12 +61,15 @@ public:
/**
* Context for parse
*/
- struct Context {
+ class Context {
+ public:
+ Context() { m_mutex= NULL; };
ParserStatus m_status;
const ParserRow<T> * m_currentCmd;
const ParserRow<T> * m_currentArg;
char * m_currentToken;
char m_tokenBuffer[512];
+ NdbMutex *m_mutex;
Vector<const ParserRow<T> *> m_aliasUsed;
};
diff --git a/storage/ndb/include/util/SocketClient.hpp b/storage/ndb/include/util/SocketClient.hpp
index bf1ad7d45d6..422560c8a78 100644
--- a/storage/ndb/include/util/SocketClient.hpp
+++ b/storage/ndb/include/util/SocketClient.hpp
@@ -37,7 +37,8 @@ public:
};
unsigned short get_port() { return m_port; };
char *get_server_name() { return m_server_name; };
- NDB_SOCKET_TYPE connect();
+ int bind(const char* toaddress, unsigned short toport);
+ NDB_SOCKET_TYPE connect(const char* toaddress = 0, unsigned short port = 0);
bool close();
};
diff --git a/storage/ndb/include/util/socket_io.h b/storage/ndb/include/util/socket_io.h
index a0e6c4e369d..fdac455ab57 100644
--- a/storage/ndb/include/util/socket_io.h
+++ b/storage/ndb/include/util/socket_io.h
@@ -21,12 +21,17 @@
#include <NdbTCP.h>
+#include <NdbMutex.h>
+
#ifdef __cplusplus
extern "C" {
#endif
int read_socket(NDB_SOCKET_TYPE, int timeout_ms, char *, int len);
- int readln_socket(NDB_SOCKET_TYPE, int timeout_ms, char *, int len);
+
+ int readln_socket(NDB_SOCKET_TYPE socket, int timeout_millis,
+ char * buf, int buflen, NdbMutex *mutex);
+
int write_socket(NDB_SOCKET_TYPE, int timeout_ms, const char[], int len);
int print_socket(NDB_SOCKET_TYPE, int timeout_ms, const char *, ...);
diff --git a/storage/ndb/src/common/debugger/signaldata/SignalNames.cpp b/storage/ndb/src/common/debugger/signaldata/SignalNames.cpp
index eed004a169f..259f7480e8f 100644
--- a/storage/ndb/src/common/debugger/signaldata/SignalNames.cpp
+++ b/storage/ndb/src/common/debugger/signaldata/SignalNames.cpp
@@ -633,6 +633,7 @@ const GsnName SignalNames [] = {
,{ GSN_DICT_LOCK_REF, "DICT_LOCK_REF" }
,{ GSN_DICT_UNLOCK_ORD, "DICT_UNLOCK_ORD" }
+ ,{ GSN_UPDATE_FRAG_DIST_KEY_ORD, "UPDATE_FRAG_DIST_KEY_ORD" }
,{ GSN_DICT_COMMIT_REQ, "DICT_COMMIT_REQ"}
};
const unsigned short NO_OF_SIGNAL_NAMES = sizeof(SignalNames)/sizeof(GsnName);
diff --git a/storage/ndb/src/common/mgmcommon/ConfigRetriever.cpp b/storage/ndb/src/common/mgmcommon/ConfigRetriever.cpp
index e24ed5b8ddc..ed55e2f2f2b 100644
--- a/storage/ndb/src/common/mgmcommon/ConfigRetriever.cpp
+++ b/storage/ndb/src/common/mgmcommon/ConfigRetriever.cpp
@@ -45,7 +45,8 @@
//****************************************************************************
ConfigRetriever::ConfigRetriever(const char * _connect_string,
- Uint32 version, Uint32 node_type)
+ Uint32 version, Uint32 node_type,
+ const char * _bindaddress)
{
DBUG_ENTER("ConfigRetriever::ConfigRetriever");
@@ -69,6 +70,15 @@ ConfigRetriever::ConfigRetriever(const char * _connect_string,
setError(CR_ERROR, tmp.c_str());
DBUG_VOID_RETURN;
}
+
+ if (_bindaddress)
+ {
+ if (ndb_mgm_set_bindaddress(m_handle, _bindaddress))
+ {
+ setError(CR_ERROR, ndb_mgm_get_latest_error_desc(m_handle));
+ DBUG_VOID_RETURN;
+ }
+ }
resetError();
DBUG_VOID_RETURN;
}
diff --git a/storage/ndb/src/common/transporter/Transporter.cpp b/storage/ndb/src/common/transporter/Transporter.cpp
index 820aa4cfc18..383456f1077 100644
--- a/storage/ndb/src/common/transporter/Transporter.cpp
+++ b/storage/ndb/src/common/transporter/Transporter.cpp
@@ -60,9 +60,6 @@ Transporter::Transporter(TransporterRegistry &t_reg,
}
strncpy(localHostName, lHostName, sizeof(localHostName));
- if (strlen(lHostName) > 0)
- Ndb_getInAddr(&localHostAddress, lHostName);
-
DBUG_PRINT("info",("rId=%d lId=%d isServer=%d rHost=%s lHost=%s s_port=%d",
remoteNodeId, localNodeId, isServer,
remoteHostName, localHostName,
@@ -128,10 +125,23 @@ Transporter::connect_client() {
return true;
if(isMgmConnection)
+ {
sockfd= m_transporter_registry.connect_ndb_mgmd(m_socket_client);
+ }
else
+ {
+ if (!m_socket_client->init())
+ {
+ return false;
+ }
+ if (strlen(localHostName) > 0)
+ {
+ if (m_socket_client->bind(localHostName, 0) != 0)
+ return false;
+ }
sockfd= m_socket_client->connect();
-
+ }
+
return connect_client(sockfd);
}
diff --git a/storage/ndb/src/common/util/InputStream.cpp b/storage/ndb/src/common/util/InputStream.cpp
index 410e9a70e9c..59aeccef84d 100644
--- a/storage/ndb/src/common/util/InputStream.cpp
+++ b/storage/ndb/src/common/util/InputStream.cpp
@@ -36,26 +36,35 @@ FileInputStream::gets(char * buf, int bufLen){
SocketInputStream::SocketInputStream(NDB_SOCKET_TYPE socket,
unsigned readTimeout)
- : m_socket(socket) {
- m_timeout = readTimeout;
+ : m_socket(socket) {
+ m_startover= true;
+ m_timeout = readTimeout;
}
-char*
+char*
SocketInputStream::gets(char * buf, int bufLen) {
- buf[0] = 77;
assert(bufLen >= 2);
- int res = readln_socket(m_socket, m_timeout, buf, bufLen - 1);
+ int offset= 0;
+ if(m_startover)
+ {
+ buf[0]= '\0';
+ m_startover= false;
+ }
+ else
+ offset= strlen(buf);
+
+ int res = readln_socket(m_socket, m_timeout, buf+offset, bufLen-offset, m_mutex);
+
+ if(res == 0)
+ {
+ buf[0]=0;
+ return buf;
+ }
+
+ m_startover= true;
+
if(res == -1)
return 0;
- if(res == 0 && buf[0] == 77){ // select return 0
- buf[0] = 0;
- } else if(res == 0 && buf[0] == 0){ // only newline
- buf[0] = '\n';
- buf[1] = 0;
- } else {
- int len = strlen(buf);
- buf[len + 1] = '\0';
- buf[len] = '\n';
- }
+
return buf;
}
diff --git a/storage/ndb/src/common/util/Parser.cpp b/storage/ndb/src/common/util/Parser.cpp
index 3a86ae8f318..540724e4f68 100644
--- a/storage/ndb/src/common/util/Parser.cpp
+++ b/storage/ndb/src/common/util/Parser.cpp
@@ -32,6 +32,7 @@ public:
char* gets(char * buf, int bufLen);
void push_back(const char *);
+ void set_mutex(NdbMutex *m) { in.set_mutex(m); };
private:
InputStream & in;
char * buffer;
@@ -144,25 +145,32 @@ ParserImpl::run(Context * ctx, const class Properties ** pDst,
{
DBUG_ENTER("ParserImpl::run");
+ input.set_mutex(ctx->m_mutex);
+
* pDst = 0;
bool ownStop = false;
if(stop == 0)
stop = &ownStop;
-
+
ctx->m_aliasUsed.clear();
-
+
const unsigned sz = sizeof(ctx->m_tokenBuffer);
ctx->m_currentToken = input.gets(ctx->m_tokenBuffer, sz);
if(Eof(ctx->m_currentToken)){
ctx->m_status = Parser<Dummy>::Eof;
DBUG_RETURN(false);
}
-
- if(ctx->m_currentToken[0] == 0){
+
+ int last= strlen(ctx->m_currentToken);
+ if(last>0)
+ last--;
+
+ if(ctx->m_currentToken[last] !='\n'){
ctx->m_status = Parser<Dummy>::NoLine;
+ ctx->m_tokenBuffer[0]= '\0';
DBUG_RETURN(false);
}
-
+
if(Empty(ctx->m_currentToken)){
ctx->m_status = Parser<Dummy>::EmptyLine;
DBUG_RETURN(false);
@@ -174,14 +182,14 @@ ParserImpl::run(Context * ctx, const class Properties ** pDst,
ctx->m_status = Parser<Dummy>::UnknownCommand;
DBUG_RETURN(false);
}
-
+
Properties * p = new Properties();
-
+
bool invalidArgument = false;
ctx->m_currentToken = input.gets(ctx->m_tokenBuffer, sz);
-
- while((! * stop) &&
- !Eof(ctx->m_currentToken) &&
+
+ while((! * stop) &&
+ !Eof(ctx->m_currentToken) &&
!Empty(ctx->m_currentToken)){
if(ctx->m_currentToken[0] != 0){
trim(ctx->m_currentToken);
@@ -193,7 +201,7 @@ ParserImpl::run(Context * ctx, const class Properties ** pDst,
}
ctx->m_currentToken = input.gets(ctx->m_tokenBuffer, sz);
}
-
+
if(invalidArgument){
char buf[sz];
char * tmp;
@@ -204,13 +212,13 @@ ParserImpl::run(Context * ctx, const class Properties ** pDst,
}
DBUG_RETURN(false);
}
-
+
if(* stop){
delete p;
ctx->m_status = Parser<Dummy>::ExternalStop;
DBUG_RETURN(false);
}
-
+
if(!checkMandatory(ctx, p)){
ctx->m_status = Parser<Dummy>::MissingMandatoryArgument;
delete p;
@@ -226,9 +234,9 @@ ParserImpl::run(Context * ctx, const class Properties ** pDst,
tmp.put("name", alias->name);
tmp.put("realName", alias->realName);
p->put("$ALIAS", i, &tmp);
- }
+ }
p->put("$ALIAS", ctx->m_aliasUsed.size());
-
+
ctx->m_status = Parser<Dummy>::Ok;
* pDst = p;
DBUG_RETURN(true);
diff --git a/storage/ndb/src/common/util/SocketClient.cpp b/storage/ndb/src/common/util/SocketClient.cpp
index 821624eb5c4..f4f2babf312 100644
--- a/storage/ndb/src/common/util/SocketClient.cpp
+++ b/storage/ndb/src/common/util/SocketClient.cpp
@@ -25,7 +25,7 @@ SocketClient::SocketClient(const char *server_name, unsigned short port, SocketA
{
m_auth= sa;
m_port= port;
- m_server_name= strdup(server_name);
+ m_server_name= server_name ? strdup(server_name) : 0;
m_sockfd= NDB_INVALID_SOCKET;
}
@@ -45,13 +45,16 @@ SocketClient::init()
if (m_sockfd != NDB_INVALID_SOCKET)
NDB_CLOSE_SOCKET(m_sockfd);
- memset(&m_servaddr, 0, sizeof(m_servaddr));
- m_servaddr.sin_family = AF_INET;
- m_servaddr.sin_port = htons(m_port);
- // Convert ip address presentation format to numeric format
- if (Ndb_getInAddr(&m_servaddr.sin_addr, m_server_name))
- return false;
-
+ if (m_server_name)
+ {
+ memset(&m_servaddr, 0, sizeof(m_servaddr));
+ m_servaddr.sin_family = AF_INET;
+ m_servaddr.sin_port = htons(m_port);
+ // Convert ip address presentation format to numeric format
+ if (Ndb_getInAddr(&m_servaddr.sin_addr, m_server_name))
+ return false;
+ }
+
m_sockfd= socket(AF_INET, SOCK_STREAM, 0);
if (m_sockfd == NDB_INVALID_SOCKET) {
return false;
@@ -62,8 +65,45 @@ SocketClient::init()
return true;
}
+int
+SocketClient::bind(const char* bindaddress, unsigned short localport)
+{
+ if (m_sockfd == NDB_INVALID_SOCKET)
+ return -1;
+
+ struct sockaddr_in local;
+ memset(&local, 0, sizeof(local));
+ local.sin_family = AF_INET;
+ local.sin_port = htons(localport);
+ // Convert ip address presentation format to numeric format
+ if (Ndb_getInAddr(&local.sin_addr, bindaddress))
+ {
+ return errno ? errno : EINVAL;
+ }
+
+ const int on = 1;
+ if (setsockopt(m_sockfd, SOL_SOCKET, SO_REUSEADDR,
+ (const char*)&on, sizeof(on)) == -1) {
+
+ int ret = errno;
+ NDB_CLOSE_SOCKET(m_sockfd);
+ m_sockfd= NDB_INVALID_SOCKET;
+ return errno;
+ }
+
+ if (::bind(m_sockfd, (struct sockaddr*)&local, sizeof(local)) == -1)
+ {
+ int ret = errno;
+ NDB_CLOSE_SOCKET(m_sockfd);
+ m_sockfd= NDB_INVALID_SOCKET;
+ return ret;
+ }
+
+ return 0;
+}
+
NDB_SOCKET_TYPE
-SocketClient::connect()
+SocketClient::connect(const char *toaddress, unsigned short toport)
{
if (m_sockfd == NDB_INVALID_SOCKET)
{
@@ -74,6 +114,21 @@ SocketClient::connect()
return NDB_INVALID_SOCKET;
}
}
+
+ if (toaddress)
+ {
+ if (m_server_name)
+ free(m_server_name);
+ m_server_name = strdup(toaddress);
+ m_port = toport;
+ memset(&m_servaddr, 0, sizeof(m_servaddr));
+ m_servaddr.sin_family = AF_INET;
+ m_servaddr.sin_port = htons(toport);
+ // Convert ip address presentation format to numeric format
+ if (Ndb_getInAddr(&m_servaddr.sin_addr, m_server_name))
+ return NDB_INVALID_SOCKET;
+ }
+
const int r = ::connect(m_sockfd, (struct sockaddr*) &m_servaddr, sizeof(m_servaddr));
if (r == -1) {
NDB_CLOSE_SOCKET(m_sockfd);
diff --git a/storage/ndb/src/common/util/socket_io.cpp b/storage/ndb/src/common/util/socket_io.cpp
index 58636e6663d..deb62d1867a 100644
--- a/storage/ndb/src/common/util/socket_io.cpp
+++ b/storage/ndb/src/common/util/socket_io.cpp
@@ -49,7 +49,7 @@ read_socket(NDB_SOCKET_TYPE socket, int timeout_millis,
extern "C"
int
readln_socket(NDB_SOCKET_TYPE socket, int timeout_millis,
- char * buf, int buflen){
+ char * buf, int buflen, NdbMutex *mutex){
if(buflen <= 1)
return 0;
@@ -65,7 +65,12 @@ readln_socket(NDB_SOCKET_TYPE socket, int timeout_millis,
timeout.tv_sec = (timeout_millis / 1000);
timeout.tv_usec = (timeout_millis % 1000) * 1000;
+ if(mutex)
+ NdbMutex_Unlock(mutex);
const int selectRes = select(socket + 1, &readset, 0, 0, &timeout);
+ if(mutex)
+ NdbMutex_Lock(mutex);
+
if(selectRes == 0){
return 0;
}
@@ -75,7 +80,6 @@ readln_socket(NDB_SOCKET_TYPE socket, int timeout_millis,
return -1;
}
- buf[0] = 0;
const int t = recv(socket, buf, buflen, MSG_PEEK);
if(t < 1)
@@ -87,27 +91,28 @@ readln_socket(NDB_SOCKET_TYPE socket, int timeout_millis,
for(int i=0; i< t;i++)
{
if(buf[i] == '\n'){
- recv(socket, buf, i+1, 0);
- buf[i] = 0;
+ int r= recv(socket, buf, i+1, 0);
+ buf[i+1]= 0;
+ if(r < 1) {
+ fcntl(socket, F_SETFL, sock_flags);
+ return -1;
+ }
if(i > 0 && buf[i-1] == '\r'){
- i--;
- buf[i] = 0;
+ buf[i-1] = '\n';
+ buf[i]= '\0';
}
fcntl(socket, F_SETFL, sock_flags);
- return t;
+ return r;
}
}
- if(t == (buflen - 1)){
- recv(socket, buf, t, 0);
- buf[t] = 0;
- fcntl(socket, F_SETFL, sock_flags);
- return buflen;
- }
-
- return 0;
+ int r= recv(socket, buf, t, 0);
+ if(r>=0)
+ buf[r] = 0;
+ fcntl(socket, F_SETFL, sock_flags);
+ return r;
}
extern "C"
diff --git a/storage/ndb/src/cw/cpcd/main.cpp b/storage/ndb/src/cw/cpcd/main.cpp
index c320f07ef04..137735c9e76 100644
--- a/storage/ndb/src/cw/cpcd/main.cpp
+++ b/storage/ndb/src/cw/cpcd/main.cpp
@@ -82,6 +82,8 @@ int main(int argc, char** argv){
load_defaults("ndb_cpcd",load_default_groups,&argc,&argv);
if (handle_options(&argc, &argv, my_long_options, get_one_option)) {
+ print_defaults(MYSQL_CONFIG_NAME,load_default_groups);
+ puts("");
my_print_help(my_long_options);
my_print_variables(my_long_options);
exit(1);
diff --git a/storage/ndb/src/kernel/blocks/ERROR_codes.txt b/storage/ndb/src/kernel/blocks/ERROR_codes.txt
index 5b83ecffc6c..e55aaed407a 100644
--- a/storage/ndb/src/kernel/blocks/ERROR_codes.txt
+++ b/storage/ndb/src/kernel/blocks/ERROR_codes.txt
@@ -5,7 +5,7 @@ Next DBACC 3002
Next DBTUP 4024
Next DBLQH 5045
Next DBDICT 6007
-Next DBDIH 7177
+Next DBDIH 7178
Next DBTC 8038
Next CMVMI 9000
Next BACKUP 10022
@@ -66,6 +66,8 @@ Delay GCP_SAVEREQ by 10 secs
7030: Delay in GCP_PREPARE until node has completed a node failure
7031: Delay in GCP_PREPARE and die 3s later
+7177: Delay copying of sysfileData in execCOPY_GCIREQ
+
ERROR CODES FOR TESTING NODE FAILURE, LOCAL CHECKPOINT HANDLING:
-----------------------------------------------------------------
diff --git a/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp b/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp
index 150dc75f90c..a7865c356c8 100644
--- a/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp
+++ b/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp
@@ -2928,10 +2928,10 @@ void Dbdict::checkSchemaStatus(Signal* signal)
// On NR get index from master because index state is not on file
Uint32 type= oldEntry->m_tableType;
- const bool file = c_systemRestart || !DictTabInfo::isIndex(type);
+ const bool file = (* newEntry == * oldEntry) &&
+ (c_systemRestart || !DictTabInfo::isIndex(type));
newEntry->m_info_words= oldEntry->m_info_words;
restartCreateTab(signal, tableId, oldEntry, newEntry, file);
-
return;
}
ndbrequire(ok);
diff --git a/storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp b/storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp
index d0b97c0eb81..3edaf146cd4 100644
--- a/storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp
+++ b/storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp
@@ -1056,6 +1056,8 @@ private:
void removeStoredReplica(FragmentstorePtr regFragptr,
ReplicaRecordPtr replicaPtr);
void searchStoredReplicas(FragmentstorePtr regFragptr);
+ bool setup_create_replica(FragmentstorePtr, CreateReplicaRecord*,
+ ConstPtr<ReplicaRecord>);
void updateNodeInfo(FragmentstorePtr regFragptr);
//------------------------------------
diff --git a/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp b/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp
index 2f8a05fef6b..1cff7fba2fc 100644
--- a/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp
+++ b/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp
@@ -636,23 +636,49 @@ void Dbdih::execCOPY_GCIREQ(Signal* signal)
ndbrequire(c_copyGCISlave.m_copyReason == CopyGCIReq::IDLE);
ndbrequire(c_copyGCISlave.m_expectedNextWord == tstart);
ndbrequire(reason != CopyGCIReq::IDLE);
-
+ bool isdone = (tstart + CopyGCIReq::DATA_SIZE) >= Sysfile::SYSFILE_SIZE32;
+
+ if (ERROR_INSERTED(7177))
+ {
+ jam();
+
+ if (signal->getLength() == 3)
+ {
+ jam();
+ goto done;
+ }
+ }
+
arrGuard(tstart + CopyGCIReq::DATA_SIZE, sizeof(sysfileData)/4);
for(Uint32 i = 0; i<CopyGCIReq::DATA_SIZE; i++)
cdata[tstart+i] = copyGCI->data[i];
- if ((tstart + CopyGCIReq::DATA_SIZE) >= Sysfile::SYSFILE_SIZE32) {
+ if (ERROR_INSERTED(7177) && isMaster() && isdone)
+ {
+ sendSignalWithDelay(reference(), GSN_COPY_GCIREQ, signal, 1000, 3);
+ return;
+ }
+
+done:
+ if (isdone)
+ {
jam();
c_copyGCISlave.m_expectedNextWord = 0;
- } else {
+ }
+ else
+ {
jam();
c_copyGCISlave.m_expectedNextWord += CopyGCIReq::DATA_SIZE;
return;
- }//if
-
- Uint32 tmp= SYSFILE->m_restart_seq;
- memcpy(sysfileData, cdata, sizeof(sysfileData));
- SYSFILE->m_restart_seq = tmp;
+ }
+
+ if (cmasterdihref != reference())
+ {
+ jam();
+ Uint32 tmp= SYSFILE->m_restart_seq;
+ memcpy(sysfileData, cdata, sizeof(sysfileData));
+ SYSFILE->m_restart_seq = tmp;
+ }
c_copyGCISlave.m_copyReason = reason;
c_copyGCISlave.m_senderRef = signal->senderBlockRef();
@@ -1305,9 +1331,9 @@ void Dbdih::execNDB_STTOR(Signal* signal)
if (isMaster()) {
jam();
systemRestartTakeOverLab(signal);
- if (anyActiveTakeOver() && false) {
+ if (anyActiveTakeOver())
+ {
jam();
- ndbout_c("1 - anyActiveTakeOver == true");
return;
}
}
@@ -2347,6 +2373,8 @@ Dbdih::systemRestartTakeOverLab(Signal* signal)
// NOT ACTIVE NODES THAT HAVE NOT YET BEEN TAKEN OVER NEEDS TAKE OVER
// IMMEDIATELY. IF WE ARE ALIVE WE TAKE OVER OUR OWN NODE.
/*-------------------------------------------------------------------*/
+ infoEvent("Take over of node %d started",
+ nodePtr.i);
startTakeOver(signal, RNIL, nodePtr.i, nodePtr.i);
}//if
break;
@@ -2459,6 +2487,12 @@ void Dbdih::nodeRestartTakeOver(Signal* signal, Uint32 startNodeId)
*--------------------------------------------------------------------*/
Uint32 takeOverNode = Sysfile::getTakeOverNode(startNodeId,
SYSFILE->takeOver);
+ if(takeOverNode == 0){
+ jam();
+ warningEvent("Bug in take-over code restarting");
+ takeOverNode = startNodeId;
+ }
+
startTakeOver(signal, RNIL, startNodeId, takeOverNode);
break;
}
@@ -2612,7 +2646,14 @@ void Dbdih::startTakeOver(Signal* signal,
Sysfile::setTakeOverNode(takeOverPtr.p->toFailedNode, SYSFILE->takeOver,
startNode);
takeOverPtr.p->toMasterStatus = TakeOverRecord::TO_START_COPY;
-
+
+ if (getNodeState().getSystemRestartInProgress())
+ {
+ jam();
+ checkToCopy();
+ checkToCopyCompleted(signal);
+ return;
+ }
cstartGcpNow = true;
}//Dbdih::startTakeOver()
@@ -3264,7 +3305,10 @@ void Dbdih::execCREATE_FRAGCONF(Signal* signal)
copyFragReq->schemaVersion = tabPtr.p->schemaVersion;
copyFragReq->distributionKey = fragPtr.p->distributionKey;
copyFragReq->gci = gci;
- sendSignal(ref, GSN_COPY_FRAGREQ, signal, CopyFragReq::SignalLength, JBB);
+ copyFragReq->nodeCount = extractNodeInfo(fragPtr.p,
+ copyFragReq->nodeList);
+ sendSignal(ref, GSN_COPY_FRAGREQ, signal,
+ CopyFragReq::SignalLength + copyFragReq->nodeCount, JBB);
} else {
ndbrequire(takeOverPtr.p->toMasterStatus == TakeOverRecord::COMMIT_CREATE);
jam();
@@ -3514,6 +3558,18 @@ void Dbdih::toCopyCompletedLab(Signal * signal, TakeOverRecordPtr takeOverPtr)
signal->theData[1] = takeOverPtr.p->toStartingNode;
sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 2, JBB);
+ if (getNodeState().getSystemRestartInProgress())
+ {
+ jam();
+ infoEvent("Take over of node %d complete", takeOverPtr.p->toStartingNode);
+ setNodeActiveStatus(takeOverPtr.p->toStartingNode, Sysfile::NS_Active);
+ takeOverPtr.p->toMasterStatus = TakeOverRecord::WAIT_LCP;
+ takeOverCompleted(takeOverPtr.p->toStartingNode);
+ checkToCopy();
+ checkToCopyCompleted(signal);
+ return;
+ }
+
c_lcpState.immediateLcpStart = true;
takeOverPtr.p->toMasterStatus = TakeOverRecord::WAIT_LCP;
@@ -3620,16 +3676,12 @@ void Dbdih::execEND_TOCONF(Signal* signal)
}//if
endTakeOver(takeOverPtr.i);
- ndbout_c("2 - endTakeOver");
if (cstartPhase == ZNDB_SPH4) {
jam();
- ndbrequire(false);
if (anyActiveTakeOver()) {
jam();
- ndbout_c("4 - anyActiveTakeOver == true");
return;
}//if
- ndbout_c("5 - anyActiveTakeOver == false -> ndbsttorry10Lab");
ndbsttorry10Lab(signal, __LINE__);
return;
}//if
@@ -8709,14 +8761,30 @@ Dbdih::resetReplicaSr(TabRecordPtr tabPtr){
resetReplicaLcp(replicaPtr.p, newestRestorableGCI);
- /* -----------------------------------------------------------------
- * LINK THE REPLICA INTO THE STORED REPLICA LIST. WE WILL USE THIS
- * NODE AS A STORED REPLICA.
- * WE MUST FIRST LINK IT OUT OF THE LIST OF OLD STORED REPLICAS.
- * --------------------------------------------------------------- */
- removeOldStoredReplica(fragPtr, replicaPtr);
- linkStoredReplica(fragPtr, replicaPtr);
-
+ /**
+ * Make sure we can also find REDO for restoring replica...
+ */
+ {
+ CreateReplicaRecord createReplica;
+ ConstPtr<ReplicaRecord> constReplicaPtr;
+ constReplicaPtr.i = replicaPtr.i;
+ constReplicaPtr.p = replicaPtr.p;
+ if (setup_create_replica(fragPtr,
+ &createReplica, constReplicaPtr))
+ {
+ removeOldStoredReplica(fragPtr, replicaPtr);
+ linkStoredReplica(fragPtr, replicaPtr);
+ }
+ else
+ {
+ infoEvent("Forcing take-over of node %d due to unsufficient REDO"
+ " for table %d fragment: %d",
+ nodePtr.i, tabPtr.i, i);
+
+ setNodeActiveStatus(nodePtr.i,
+ Sysfile::NS_NotActive_NotTakenOver);
+ }
+ }
}
default:
jam();
@@ -9838,6 +9906,7 @@ void Dbdih::calculateKeepGciLab(Signal* signal, Uint32 tableId, Uint32 fragId)
FragmentstorePtr fragPtr;
getFragstore(tabPtr.p, fragId, fragPtr);
checkKeepGci(tabPtr, fragId, fragPtr.p, fragPtr.p->storedReplicas);
+ checkKeepGci(tabPtr, fragId, fragPtr.p, fragPtr.p->oldStoredReplicas);
fragId++;
if (fragId >= tabPtr.p->totalfragments) {
jam();
@@ -10023,73 +10092,84 @@ void Dbdih::startNextChkpt(Signal* signal)
nodePtr.i = replicaPtr.p->procNode;
ptrCheckGuard(nodePtr, MAX_NDB_NODES, nodeRecord);
- if (replicaPtr.p->lcpOngoingFlag &&
- replicaPtr.p->lcpIdStarted < lcpId) {
- jam();
- //-------------------------------------------------------------------
- // We have found a replica on a node that performs local checkpoint
- // that is alive and that have not yet been started.
- //-------------------------------------------------------------------
-
- if (nodePtr.p->noOfStartedChkpt < 2) {
- jam();
- /**
- * Send LCP_FRAG_ORD to LQH
- */
-
- /**
- * Mark the replica so with lcpIdStarted == true
- */
- replicaPtr.p->lcpIdStarted = lcpId;
-
- Uint32 i = nodePtr.p->noOfStartedChkpt;
- nodePtr.p->startedChkpt[i].tableId = tabPtr.i;
- nodePtr.p->startedChkpt[i].fragId = curr.fragmentId;
- nodePtr.p->startedChkpt[i].replicaPtr = replicaPtr.i;
- nodePtr.p->noOfStartedChkpt = i + 1;
-
- sendLCP_FRAG_ORD(signal, nodePtr.p->startedChkpt[i]);
- } else if (nodePtr.p->noOfQueuedChkpt < 2) {
- jam();
- /**
- * Put LCP_FRAG_ORD "in queue"
- */
-
- /**
- * Mark the replica so with lcpIdStarted == true
- */
- replicaPtr.p->lcpIdStarted = lcpId;
+ if (c_lcpState.m_participatingLQH.get(nodePtr.i))
+ {
+ if (replicaPtr.p->lcpOngoingFlag &&
+ replicaPtr.p->lcpIdStarted < lcpId)
+ {
+ jam();
+ //-------------------------------------------------------------------
+ // We have found a replica on a node that performs local checkpoint
+ // that is alive and that have not yet been started.
+ //-------------------------------------------------------------------
- Uint32 i = nodePtr.p->noOfQueuedChkpt;
- nodePtr.p->queuedChkpt[i].tableId = tabPtr.i;
- nodePtr.p->queuedChkpt[i].fragId = curr.fragmentId;
- nodePtr.p->queuedChkpt[i].replicaPtr = replicaPtr.i;
- nodePtr.p->noOfQueuedChkpt = i + 1;
- } else {
- jam();
+ if (nodePtr.p->noOfStartedChkpt < 2)
+ {
+ jam();
+ /**
+ * Send LCP_FRAG_ORD to LQH
+ */
+
+ /**
+ * Mark the replica so with lcpIdStarted == true
+ */
+ replicaPtr.p->lcpIdStarted = lcpId;
- if(save){
+ Uint32 i = nodePtr.p->noOfStartedChkpt;
+ nodePtr.p->startedChkpt[i].tableId = tabPtr.i;
+ nodePtr.p->startedChkpt[i].fragId = curr.fragmentId;
+ nodePtr.p->startedChkpt[i].replicaPtr = replicaPtr.i;
+ nodePtr.p->noOfStartedChkpt = i + 1;
+
+ sendLCP_FRAG_ORD(signal, nodePtr.p->startedChkpt[i]);
+ }
+ else if (nodePtr.p->noOfQueuedChkpt < 2)
+ {
+ jam();
/**
- * Stop increasing value on first that was "full"
+ * Put LCP_FRAG_ORD "in queue"
*/
- c_lcpState.currentFragment = curr;
- save = false;
- }
-
- busyNodes.set(nodePtr.i);
- if(busyNodes.count() == lcpNodes){
+
/**
- * There were no possibility to start the local checkpoint
- * and it was not possible to queue it up. In this case we
- * stop the start of local checkpoints until the nodes with a
- * backlog have performed more checkpoints. We will return and
- * will not continue the process of starting any more checkpoints.
+ * Mark the replica so with lcpIdStarted == true
*/
- return;
+ replicaPtr.p->lcpIdStarted = lcpId;
+
+ Uint32 i = nodePtr.p->noOfQueuedChkpt;
+ nodePtr.p->queuedChkpt[i].tableId = tabPtr.i;
+ nodePtr.p->queuedChkpt[i].fragId = curr.fragmentId;
+ nodePtr.p->queuedChkpt[i].replicaPtr = replicaPtr.i;
+ nodePtr.p->noOfQueuedChkpt = i + 1;
+ }
+ else
+ {
+ jam();
+
+ if(save)
+ {
+ /**
+ * Stop increasing value on first that was "full"
+ */
+ c_lcpState.currentFragment = curr;
+ save = false;
+ }
+
+ busyNodes.set(nodePtr.i);
+ if(busyNodes.count() == lcpNodes)
+ {
+ /**
+ * There were no possibility to start the local checkpoint
+ * and it was not possible to queue it up. In this case we
+ * stop the start of local checkpoints until the nodes with a
+ * backlog have performed more checkpoints. We will return and
+ * will not continue the process of starting any more checkpoints.
+ */
+ return;
+ }//if
}//if
- }//if
- }
- }//while
+ }
+ }//while
+ }
curr.fragmentId++;
if (curr.fragmentId >= tabPtr.p->totalfragments) {
jam();
@@ -12838,16 +12918,75 @@ void Dbdih::removeTooNewCrashedReplicas(ReplicaRecordPtr rtnReplicaPtr)
/* CHECKPOINT WITHOUT NEEDING ANY EXTRA LOGGING FACILITIES.*/
/* A MAXIMUM OF FOUR NODES IS RETRIEVED. */
/*************************************************************************/
+bool
+Dbdih::setup_create_replica(FragmentstorePtr fragPtr,
+ CreateReplicaRecord* createReplicaPtrP,
+ ConstPtr<ReplicaRecord> replicaPtr)
+{
+ createReplicaPtrP->dataNodeId = replicaPtr.p->procNode;
+ createReplicaPtrP->replicaRec = replicaPtr.i;
+
+ /* ----------------------------------------------------------------- */
+ /* WE NEED TO SEARCH FOR A PROPER LOCAL CHECKPOINT TO USE FOR THE */
+ /* SYSTEM RESTART. */
+ /* ----------------------------------------------------------------- */
+ Uint32 startGci;
+ Uint32 startLcpNo;
+ Uint32 stopGci = SYSFILE->newestRestorableGCI;
+ bool result = findStartGci(replicaPtr,
+ stopGci,
+ startGci,
+ startLcpNo);
+ if (!result)
+ {
+ jam();
+ /* --------------------------------------------------------------- */
+ /* WE COULD NOT FIND ANY LOCAL CHECKPOINT. THE FRAGMENT THUS DO NOT*/
+ /* CONTAIN ANY VALID LOCAL CHECKPOINT. IT DOES HOWEVER CONTAIN A */
+ /* VALID FRAGMENT LOG. THUS BY FIRST CREATING THE FRAGMENT AND THEN*/
+ /* EXECUTING THE FRAGMENT LOG WE CAN CREATE THE FRAGMENT AS */
+ /* DESIRED. THIS SHOULD ONLY OCCUR AFTER CREATING A FRAGMENT. */
+ /* */
+ /* TO INDICATE THAT NO LOCAL CHECKPOINT IS TO BE USED WE SET THE */
+ /* LOCAL CHECKPOINT TO ZNIL. */
+ /* --------------------------------------------------------------- */
+ createReplicaPtrP->lcpNo = ZNIL;
+ }
+ else
+ {
+ jam();
+ /* --------------------------------------------------------------- */
+ /* WE FOUND A PROPER LOCAL CHECKPOINT TO RESTART FROM. */
+ /* SET LOCAL CHECKPOINT ID AND LOCAL CHECKPOINT NUMBER. */
+ /* --------------------------------------------------------------- */
+ createReplicaPtrP->lcpNo = startLcpNo;
+ arrGuard(startLcpNo, MAX_LCP_STORED);
+ createReplicaPtrP->createLcpId = replicaPtr.p->lcpId[startLcpNo];
+ }//if
+
+
+ /* ----------------------------------------------------------------- */
+ /* WE HAVE EITHER FOUND A LOCAL CHECKPOINT OR WE ARE PLANNING TO */
+ /* EXECUTE THE LOG FROM THE INITIAL CREATION OF THE TABLE. IN BOTH */
+ /* CASES WE NEED TO FIND A SET OF LOGS THAT CAN EXECUTE SUCH THAT */
+ /* WE RECOVER TO THE SYSTEM RESTART GLOBAL CHECKPOINT. */
+ /* -_--------------------------------------------------------------- */
+ return findLogNodes(createReplicaPtrP, fragPtr, startGci, stopGci);
+}
+
void Dbdih::searchStoredReplicas(FragmentstorePtr fragPtr)
{
Uint32 nextReplicaPtrI;
- ConstPtr<ReplicaRecord> replicaPtr;
+ Ptr<ReplicaRecord> replicaPtr;
replicaPtr.i = fragPtr.p->storedReplicas;
while (replicaPtr.i != RNIL) {
jam();
ptrCheckGuard(replicaPtr, creplicaFileSize, replicaRecord);
nextReplicaPtrI = replicaPtr.p->nextReplica;
+ ConstPtr<ReplicaRecord> constReplicaPtr;
+ constReplicaPtr.i = replicaPtr.i;
+ constReplicaPtr.p = replicaPtr.p;
NodeRecordPtr nodePtr;
nodePtr.i = replicaPtr.p->procNode;
ptrCheckGuard(nodePtr, MAX_NDB_NODES, nodeRecord);
@@ -12867,69 +13006,13 @@ void Dbdih::searchStoredReplicas(FragmentstorePtr fragPtr)
createReplicaPtr.i = cnoOfCreateReplicas;
ptrCheckGuard(createReplicaPtr, 4, createReplicaRecord);
cnoOfCreateReplicas++;
- createReplicaPtr.p->dataNodeId = replicaPtr.p->procNode;
- createReplicaPtr.p->replicaRec = replicaPtr.i;
- /* ----------------------------------------------------------------- */
- /* WE NEED TO SEARCH FOR A PROPER LOCAL CHECKPOINT TO USE FOR THE */
- /* SYSTEM RESTART. */
- /* ----------------------------------------------------------------- */
- Uint32 startGci;
- Uint32 startLcpNo;
- Uint32 stopGci = SYSFILE->newestRestorableGCI;
- bool result = findStartGci(replicaPtr,
- stopGci,
- startGci,
- startLcpNo);
- if (!result) {
- jam();
- /* --------------------------------------------------------------- */
- /* WE COULD NOT FIND ANY LOCAL CHECKPOINT. THE FRAGMENT THUS DO NOT*/
- /* CONTAIN ANY VALID LOCAL CHECKPOINT. IT DOES HOWEVER CONTAIN A */
- /* VALID FRAGMENT LOG. THUS BY FIRST CREATING THE FRAGMENT AND THEN*/
- /* EXECUTING THE FRAGMENT LOG WE CAN CREATE THE FRAGMENT AS */
- /* DESIRED. THIS SHOULD ONLY OCCUR AFTER CREATING A FRAGMENT. */
- /* */
- /* TO INDICATE THAT NO LOCAL CHECKPOINT IS TO BE USED WE SET THE */
- /* LOCAL CHECKPOINT TO ZNIL. */
- /* --------------------------------------------------------------- */
- createReplicaPtr.p->lcpNo = ZNIL;
- } else {
- jam();
- /* --------------------------------------------------------------- */
- /* WE FOUND A PROPER LOCAL CHECKPOINT TO RESTART FROM. */
- /* SET LOCAL CHECKPOINT ID AND LOCAL CHECKPOINT NUMBER. */
- /* --------------------------------------------------------------- */
- createReplicaPtr.p->lcpNo = startLcpNo;
- arrGuard(startLcpNo, MAX_LCP_STORED);
- createReplicaPtr.p->createLcpId = replicaPtr.p->lcpId[startLcpNo];
- }//if
-
- if(ERROR_INSERTED(7073) || ERROR_INSERTED(7074)){
- jam();
- nodePtr.p->nodeStatus = NodeRecord::DEAD;
- }
-
- /* ----------------------------------------------------------------- */
- /* WE HAVE EITHER FOUND A LOCAL CHECKPOINT OR WE ARE PLANNING TO */
- /* EXECUTE THE LOG FROM THE INITIAL CREATION OF THE TABLE. IN BOTH */
- /* CASES WE NEED TO FIND A SET OF LOGS THAT CAN EXECUTE SUCH THAT */
- /* WE RECOVER TO THE SYSTEM RESTART GLOBAL CHECKPOINT. */
- /* -_--------------------------------------------------------------- */
- if (!findLogNodes(createReplicaPtr.p, fragPtr, startGci, stopGci)) {
- jam();
- /* --------------------------------------------------------------- */
- /* WE WERE NOT ABLE TO FIND ANY WAY OF RESTORING THIS REPLICA. */
- /* THIS IS A POTENTIAL SYSTEM ERROR. */
- /* --------------------------------------------------------------- */
- cnoOfCreateReplicas--;
- return;
- }//if
-
- if(ERROR_INSERTED(7073) || ERROR_INSERTED(7074)){
- jam();
- nodePtr.p->nodeStatus = NodeRecord::ALIVE;
- }
+ /**
+ * Should have been checked in resetReplicaSr
+ */
+ ndbrequire(setup_create_replica(fragPtr,
+ createReplicaPtr.p,
+ constReplicaPtr));
break;
}
default:
diff --git a/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp b/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp
index ec3042fa6dc..736134a8a8b 100644
--- a/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp
+++ b/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp
@@ -2147,6 +2147,7 @@ private:
void execSTORED_PROCCONF(Signal* signal);
void execSTORED_PROCREF(Signal* signal);
void execCOPY_FRAGREQ(Signal* signal);
+ void execUPDATE_FRAG_DIST_KEY_ORD(Signal*);
void execCOPY_ACTIVEREQ(Signal* signal);
void execCOPY_STATEREQ(Signal* signal);
void execLQH_TRANSREQ(Signal* signal);
diff --git a/storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp b/storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp
index 890aaeb00f3..3f7892f774d 100644
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp
@@ -301,6 +301,9 @@ Dblqh::Dblqh(Block_context& ctx):
addRecSignal(GSN_RESTORE_LCP_REF, &Dblqh::execRESTORE_LCP_REF);
addRecSignal(GSN_RESTORE_LCP_CONF, &Dblqh::execRESTORE_LCP_CONF);
+
+ addRecSignal(GSN_UPDATE_FRAG_DIST_KEY_ORD,
+ &Dblqh::execUPDATE_FRAG_DIST_KEY_ORD);
initData();
diff --git a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
index 98f46ac8c44..9a7803efbec 100644
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
@@ -9913,6 +9913,7 @@ void Dblqh::execCOPY_FRAGREQ(Signal* signal)
const CopyFragReq * const copyFragReq = (CopyFragReq *)&signal->theData[0];
tabptr.i = copyFragReq->tableId;
ptrCheckGuard(tabptr, ctabrecFileSize, tablerec);
+ Uint32 i;
const Uint32 fragId = copyFragReq->fragId;
const Uint32 copyPtr = copyFragReq->userPtr;
const Uint32 userRef = copyFragReq->userRef;
@@ -9925,8 +9926,20 @@ void Dblqh::execCOPY_FRAGREQ(Signal* signal)
ndbrequire(cfirstfreeTcConrec != RNIL);
ndbrequire(fragptr.p->m_scanNumberMask.get(NR_ScanNo));
- fragptr.p->fragDistributionKey = copyFragReq->distributionKey;
-
+ Uint32 key = fragptr.p->fragDistributionKey = copyFragReq->distributionKey;
+
+ Uint32 checkversion = NDB_VERSION >= MAKE_VERSION(5,1,0) ?
+ NDBD_UPDATE_FRAG_DIST_KEY_51 : NDBD_UPDATE_FRAG_DIST_KEY_50;
+
+ Uint32 nodeCount = copyFragReq->nodeCount;
+ NdbNodeBitmask nodemask;
+ if (getNodeInfo(refToNode(userRef)).m_version >= checkversion)
+ {
+ ndbrequire(nodeCount <= MAX_REPLICAS);
+ for (i = 0; i<nodeCount; i++)
+ nodemask.set(copyFragReq->nodeList[i]);
+ }
+
if (DictTabInfo::isOrderedIndex(tabptr.p->tableType)) {
jam();
/**
@@ -10009,9 +10022,42 @@ void Dblqh::execCOPY_FRAGREQ(Signal* signal)
req->savePointId = tcConnectptr.p->savePointId;
sendSignal(scanptr.p->scanBlockref, GSN_ACC_SCANREQ, signal,
AccScanReq::SignalLength, JBB);
+
+ if (! nodemask.isclear())
+ {
+ ndbrequire(nodemask.get(getOwnNodeId()));
+ ndbrequire(nodemask.get(nodeId)); // cpy dest
+ nodemask.clear(getOwnNodeId());
+ nodemask.clear(nodeId);
+
+ UpdateFragDistKeyOrd*
+ ord = (UpdateFragDistKeyOrd*)signal->getDataPtrSend();
+ ord->tableId = tabptr.i;
+ ord->fragId = fragId;
+ ord->fragDistributionKey = key;
+ i = 0;
+ while ((i = nodemask.find(i+1)) != NdbNodeBitmask::NotFound)
+ {
+ if (getNodeInfo(i).m_version >= checkversion)
+ sendSignal(calcLqhBlockRef(i), GSN_UPDATE_FRAG_DIST_KEY_ORD,
+ signal, UpdateFragDistKeyOrd::SignalLength, JBB);
+ }
+ }
return;
}//Dblqh::execCOPY_FRAGREQ()
+void
+Dblqh::execUPDATE_FRAG_DIST_KEY_ORD(Signal * signal)
+{
+ jamEntry();
+ UpdateFragDistKeyOrd* ord =(UpdateFragDistKeyOrd*)signal->getDataPtr();
+
+ tabptr.i = ord->tableId;
+ ptrCheckGuard(tabptr, ctabrecFileSize, tablerec);
+ ndbrequire(getFragmentrec(signal, ord->fragId));
+ fragptr.p->fragDistributionKey = ord->fragDistributionKey;
+}
+
void Dblqh::accScanConfCopyLab(Signal* signal)
{
AccScanConf * const accScanConf = (AccScanConf *)&signal->theData[0];
@@ -18292,6 +18338,18 @@ Dblqh::execDUMP_STATE_ORD(Signal* signal)
if(tabPtr.p->tableStatus != Tablerec::NOT_DEFINED){
infoEvent("Table %d Status: %d Usage: %d",
i, tabPtr.p->tableStatus, tabPtr.p->usageCount);
+
+ for (Uint32 j = 0; j<MAX_FRAG_PER_NODE; j++)
+ {
+ FragrecordPtr fragPtr;
+ if ((fragPtr.i = tabPtr.p->fragrec[j]) != RNIL)
+ {
+ c_fragment_pool.getPtr(fragPtr);
+ infoEvent(" frag: %d distKey: %u",
+ tabPtr.p->fragid[j],
+ fragPtr.p->fragDistributionKey);
+ }
+ }
}
}
return;
diff --git a/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp b/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp
index 18cb404fc8e..04f44a2742d 100644
--- a/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp
+++ b/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp
@@ -1003,13 +1003,6 @@ Dbtc::handleFailedApiNode(Signal* signal,
TloopCount += 64;
break;
case CS_CONNECTED:
- /*********************************************************************/
- // The api record is connected to failed node. We need to release the
- // connection and set it in a disconnected state.
- /*********************************************************************/
- jam();
- releaseApiCon(signal, apiConnectptr.i);
- break;
case CS_REC_COMMITTING:
case CS_RECEIVING:
case CS_STARTED:
@@ -6317,6 +6310,18 @@ void Dbtc::timeOutFoundLab(Signal* signal, Uint32 TapiConPtr, Uint32 errCode)
break;
case CS_START_SCAN:{
jam();
+
+ /*
+ We are waiting for application to continue the transaction. In this
+ particular state we will use the application timeout parameter rather
+ than the shorter Deadlock detection timeout.
+ */
+ if (c_appl_timeout_value == 0 ||
+ (ctcTimer - getApiConTimer(apiConnectptr.i)) <= c_appl_timeout_value) {
+ jam();
+ return;
+ }//if
+
ScanRecordPtr scanPtr;
scanPtr.i = apiConnectptr.p->apiScanRec;
ptrCheckGuard(scanPtr, cscanrecFileSize, scanRecord);
@@ -9848,6 +9853,17 @@ void Dbtc::sendScanTabConf(Signal* signal, ScanRecordPtr scanPtr) {
conf->requestInfo = op_count | ScanTabConf::EndOfData;
releaseScanResources(scanPtr);
}
+ else
+ {
+ if (scanPtr.p->m_running_scan_frags.isEmpty())
+ {
+ jam();
+ /**
+ * All scan frags delivered...waiting for API
+ */
+ setApiConTimer(apiConnectptr.i, ctcTimer, __LINE__);
+ }
+ }
if(4 + 3 * op_count > 25){
jam();
diff --git a/storage/ndb/src/kernel/blocks/dbtux/Dbtux.hpp b/storage/ndb/src/kernel/blocks/dbtux/Dbtux.hpp
index 946504ec3a5..8882b1f8093 100644
--- a/storage/ndb/src/kernel/blocks/dbtux/Dbtux.hpp
+++ b/storage/ndb/src/kernel/blocks/dbtux/Dbtux.hpp
@@ -126,41 +126,17 @@ private:
// forward declarations
struct DescEnt;
- /*
- * Pointer to array of Uint32.
- */
- struct Data {
- private:
- Uint32* m_data;
- public:
- Data();
- Data(Uint32* data);
- Data& operator=(Uint32* data);
- operator Uint32*() const;
- Data& operator+=(size_t n);
- AttributeHeader& ah() const;
- };
- friend class Data;
+ // Pointer to array of Uint32 represents attribute data and bounds
- /*
- * Pointer to array of constant Uint32.
- */
- struct ConstData;
- friend struct ConstData;
- struct ConstData {
- private:
- const Uint32* m_data;
- public:
- ConstData();
- ConstData(const Uint32* data);
- ConstData& operator=(const Uint32* data);
- operator const Uint32*() const;
- ConstData& operator+=(size_t n);
- const AttributeHeader& ah() const;
- // non-const pointer can be cast to const pointer
- ConstData(Data data);
- ConstData& operator=(Data data);
- };
+ typedef Uint32 *Data;
+ inline AttributeHeader& ah(Data data) {
+ return *reinterpret_cast<AttributeHeader*>(data);
+ }
+
+ typedef const Uint32* ConstData;
+ inline const AttributeHeader& ah(ConstData data) {
+ return *reinterpret_cast<const AttributeHeader*>(data);
+ }
// AttributeHeader size is assumed to be 1 word
STATIC_CONST( AttributeHeaderSize = 1 );
@@ -216,6 +192,7 @@ private:
unsigned m_tupVersion : 15; // version
TreeEnt();
// methods
+ bool eqtuple(const TreeEnt ent) const;
bool eq(const TreeEnt ent) const;
int cmp(const TreeEnt ent) const;
};
@@ -294,8 +271,7 @@ private:
struct TreePos {
TupLoc m_loc; // physical node address
Uint16 m_pos; // position 0 to m_occup
- Uint8 m_match; // at an existing entry
- Uint8 m_dir; // see scanNext()
+ Uint8 m_dir; // see scanNext
TreePos();
};
@@ -386,12 +362,13 @@ private:
enum {
Undef = 0,
First = 1, // before first entry
- Current = 2, // at current before locking
- Blocked = 3, // at current waiting for ACC lock
- Locked = 4, // at current and locked or no lock needed
- Next = 5, // looking for next extry
- Last = 6, // after last entry
- Aborting = 7, // lock wait at scan close
+ Current = 2, // at some entry
+ Found = 3, // return current as next scan result
+ Blocked = 4, // found and waiting for ACC lock
+ Locked = 5, // found and locked or no lock needed
+ Next = 6, // looking for next extry
+ Last = 7, // after last entry
+ Aborting = 8, // lock wait at scan close
Invalid = 9 // cannot return REF to LQH currently
};
Uint16 m_state;
@@ -568,6 +545,7 @@ private:
void readKeyAttrs(const Frag& frag, TreeEnt ent, unsigned start, Data keyData);
void readTablePk(const Frag& frag, TreeEnt ent, Data pkData, unsigned& pkSize);
void copyAttrs(const Frag& frag, ConstData data1, Data data2, unsigned maxlen2 = MaxAttrDataSize);
+ void unpackBound(const ScanBound& bound, Data data);
/*
* DbtuxMeta.cpp
@@ -642,7 +620,9 @@ private:
void execACCKEYREF(Signal* signal);
void execACC_ABORTCONF(Signal* signal);
void scanFirst(ScanOpPtr scanPtr);
+ void scanFind(ScanOpPtr scanPtr);
void scanNext(ScanOpPtr scanPtr, bool fromMaintReq);
+ bool scanCheck(ScanOpPtr scanPtr, TreeEnt ent);
bool scanVisible(ScanOpPtr scanPtr, TreeEnt ent);
void scanClose(Signal* signal, ScanOpPtr scanPtr);
void addAccLockOp(ScanOp& scan, Uint32 accLockOp);
@@ -652,8 +632,8 @@ private:
/*
* DbtuxSearch.cpp
*/
- void searchToAdd(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos);
- void searchToRemove(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos);
+ bool searchToAdd(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos);
+ bool searchToRemove(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos);
void searchToScan(Frag& frag, ConstData boundInfo, unsigned boundCount, bool descending, TreePos& treePos);
void searchToScanAscending(Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos);
void searchToScanDescending(Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos);
@@ -750,99 +730,6 @@ private:
static unsigned max(unsigned x, unsigned y);
};
-// Dbtux::Data
-
-inline
-Dbtux::Data::Data() :
- m_data(0)
-{
-}
-
-inline
-Dbtux::Data::Data(Uint32* data) :
- m_data(data)
-{
-}
-
-inline Dbtux::Data&
-Dbtux::Data::operator=(Uint32* data)
-{
- m_data = data;
- return *this;
-}
-
-inline
-Dbtux::Data::operator Uint32*() const
-{
- return m_data;
-}
-
-inline Dbtux::Data&
-Dbtux::Data::operator+=(size_t n)
-{
- m_data += n;
- return *this;
-}
-
-inline AttributeHeader&
-Dbtux::Data::ah() const
-{
- return *reinterpret_cast<AttributeHeader*>(m_data);
-}
-
-// Dbtux::ConstData
-
-inline
-Dbtux::ConstData::ConstData() :
- m_data(0)
-{
-}
-
-inline
-Dbtux::ConstData::ConstData(const Uint32* data) :
- m_data(data)
-{
-}
-
-inline Dbtux::ConstData&
-Dbtux::ConstData::operator=(const Uint32* data)
-{
- m_data = data;
- return *this;
-}
-
-inline
-Dbtux::ConstData::operator const Uint32*() const
-{
- return m_data;
-}
-
-inline Dbtux::ConstData&
-Dbtux::ConstData::operator+=(size_t n)
-{
- m_data += n;
- return *this;
-}
-
-inline const AttributeHeader&
-Dbtux::ConstData::ah() const
-{
- return *reinterpret_cast<const AttributeHeader*>(m_data);
-}
-
-inline
-Dbtux::ConstData::ConstData(Data data) :
- m_data(static_cast<Uint32*>(data))
-{
-}
-
-inline Dbtux::ConstData&
-Dbtux::ConstData::operator=(Data data)
-{
- m_data = static_cast<Uint32*>(data);
- return *this;
-}
-
// Dbtux::TupLoc
inline
@@ -911,6 +798,13 @@ Dbtux::TreeEnt::TreeEnt() :
}
inline bool
+Dbtux::TreeEnt::eqtuple(const TreeEnt ent) const
+{
+ return
+ m_tupLoc == ent.m_tupLoc;
+}
+
+inline bool
Dbtux::TreeEnt::eq(const TreeEnt ent) const
{
return
@@ -929,10 +823,25 @@ Dbtux::TreeEnt::cmp(const TreeEnt ent) const
return -1;
if (m_tupLoc.getPageOffset() > ent.m_tupLoc.getPageOffset())
return +1;
- if (m_tupVersion < ent.m_tupVersion)
- return -1;
- if (m_tupVersion > ent.m_tupVersion)
- return +1;
+ /*
+ * Guess if one tuple version has wrapped around. This is well
+ * defined ordering on existing versions since versions are assigned
+ * consecutively and different versions exists only on uncommitted
+ * tuple. Assuming max 2**14 uncommitted ops on same tuple.
+ */
+ const unsigned version_wrap_limit = (1 << (ZTUP_VERSION_BITS - 1));
+ if (m_tupVersion < ent.m_tupVersion) {
+ if (ent.m_tupVersion - m_tupVersion < version_wrap_limit)
+ return -1;
+ else
+ return +1;
+ }
+ if (m_tupVersion > ent.m_tupVersion) {
+ if (m_tupVersion - ent.m_tupVersion < version_wrap_limit)
+ return +1;
+ else
+ return -1;
+ }
return 0;
}
@@ -1000,7 +909,6 @@ inline
Dbtux::TreePos::TreePos() :
m_loc(),
m_pos(ZNIL),
- m_match(false),
m_dir(255)
{
}
diff --git a/storage/ndb/src/kernel/blocks/dbtux/DbtuxCmp.cpp b/storage/ndb/src/kernel/blocks/dbtux/DbtuxCmp.cpp
index cf815b14c1a..058409c99b0 100644
--- a/storage/ndb/src/kernel/blocks/dbtux/DbtuxCmp.cpp
+++ b/storage/ndb/src/kernel/blocks/dbtux/DbtuxCmp.cpp
@@ -34,7 +34,7 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, ConstData searchKey, Cons
// skip to right position in search key only
for (unsigned i = 0; i < start; i++) {
jam();
- searchKey += AttributeHeaderSize + searchKey.ah().getDataSize();
+ searchKey += AttributeHeaderSize + ah(searchKey).getDataSize();
}
// number of words of entry data left
unsigned len2 = maxlen;
@@ -46,16 +46,16 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, ConstData searchKey, Cons
break;
}
len2 -= AttributeHeaderSize;
- if (! searchKey.ah().isNULL()) {
- if (! entryData.ah().isNULL()) {
+ if (! ah(searchKey).isNULL()) {
+ if (! ah(entryData).isNULL()) {
jam();
// verify attribute id
const DescAttr& descAttr = descEnt.m_descAttr[start];
- ndbrequire(searchKey.ah().getAttributeId() == descAttr.m_primaryAttrId);
- ndbrequire(entryData.ah().getAttributeId() == descAttr.m_primaryAttrId);
+ ndbrequire(ah(searchKey).getAttributeId() == descAttr.m_primaryAttrId);
+ ndbrequire(ah(entryData).getAttributeId() == descAttr.m_primaryAttrId);
// sizes
- const unsigned size1 = searchKey.ah().getDataSize();
- const unsigned size2 = min(entryData.ah().getDataSize(), len2);
+ const unsigned size1 = ah(searchKey).getDataSize();
+ const unsigned size2 = min(ah(entryData).getDataSize(), len2);
len2 -= size2;
// compare
NdbSqlUtil::Cmp* const cmp = c_sqlCmp[start];
@@ -74,15 +74,15 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, ConstData searchKey, Cons
break;
}
} else {
- if (! entryData.ah().isNULL()) {
+ if (! ah(entryData).isNULL()) {
jam();
// NULL < not NULL
ret = -1;
break;
}
}
- searchKey += AttributeHeaderSize + searchKey.ah().getDataSize();
- entryData += AttributeHeaderSize + entryData.ah().getDataSize();
+ searchKey += AttributeHeaderSize + ah(searchKey).getDataSize();
+ entryData += AttributeHeaderSize + ah(entryData).getDataSize();
start++;
}
return ret;
@@ -130,17 +130,17 @@ Dbtux::cmpScanBound(const Frag& frag, unsigned idir, ConstData boundInfo, unsign
// get and skip bound type (it is used after the loop)
type = boundInfo[0];
boundInfo += 1;
- if (! boundInfo.ah().isNULL()) {
- if (! entryData.ah().isNULL()) {
+ if (! ah(boundInfo).isNULL()) {
+ if (! ah(entryData).isNULL()) {
jam();
// verify attribute id
- const Uint32 index = boundInfo.ah().getAttributeId();
+ const Uint32 index = ah(boundInfo).getAttributeId();
ndbrequire(index < frag.m_numAttrs);
const DescAttr& descAttr = descEnt.m_descAttr[index];
- ndbrequire(entryData.ah().getAttributeId() == descAttr.m_primaryAttrId);
+ ndbrequire(ah(entryData).getAttributeId() == descAttr.m_primaryAttrId);
// sizes
- const unsigned size1 = boundInfo.ah().getDataSize();
- const unsigned size2 = min(entryData.ah().getDataSize(), len2);
+ const unsigned size1 = ah(boundInfo).getDataSize();
+ const unsigned size2 = min(ah(entryData).getDataSize(), len2);
len2 -= size2;
// compare
NdbSqlUtil::Cmp* const cmp = c_sqlCmp[index];
@@ -159,14 +159,14 @@ Dbtux::cmpScanBound(const Frag& frag, unsigned idir, ConstData boundInfo, unsign
}
} else {
jam();
- if (! entryData.ah().isNULL()) {
+ if (! ah(entryData).isNULL()) {
jam();
// NULL < not NULL
return -1;
}
}
- boundInfo += AttributeHeaderSize + boundInfo.ah().getDataSize();
- entryData += AttributeHeaderSize + entryData.ah().getDataSize();
+ boundInfo += AttributeHeaderSize + ah(boundInfo).getDataSize();
+ entryData += AttributeHeaderSize + ah(entryData).getDataSize();
boundCount -= 1;
}
// all attributes were equal
diff --git a/storage/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp b/storage/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp
index a9eae510ce6..e1e858622cc 100644
--- a/storage/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp
+++ b/storage/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp
@@ -280,7 +280,6 @@ operator<<(NdbOut& out, const Dbtux::TreePos& pos)
out << "[TreePos " << hex << &pos;
out << " [loc " << pos.m_loc << "]";
out << " [pos " << dec << pos.m_pos << "]";
- out << " [match " << dec << pos.m_match << "]";
out << " [dir " << dec << pos.m_dir << "]";
out << "]";
return out;
diff --git a/storage/ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp b/storage/ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp
index a2e285078c0..8aeb4934600 100644
--- a/storage/ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp
+++ b/storage/ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp
@@ -225,7 +225,7 @@ Dbtux::setKeyAttrs(const Frag& frag)
const DescAttr& descAttr = descEnt.m_descAttr[i];
Uint32 size = AttributeDescriptor::getSizeInWords(descAttr.m_attrDesc);
// set attr id and fixed size
- keyAttrs.ah() = AttributeHeader(descAttr.m_primaryAttrId, size);
+ ah(keyAttrs) = AttributeHeader(descAttr.m_primaryAttrId, size);
keyAttrs += 1;
// set comparison method pointer
const NdbSqlUtil::Type& sqlType = NdbSqlUtil::getTypeBinary(descAttr.m_typeId);
@@ -255,8 +255,8 @@ Dbtux::readKeyAttrs(const Frag& frag, TreeEnt ent, unsigned start, Data keyData)
ConstData data = keyData;
Uint32 totalSize = 0;
for (Uint32 i = start; i < frag.m_numAttrs; i++) {
- Uint32 attrId = data.ah().getAttributeId();
- Uint32 dataSize = data.ah().getDataSize();
+ Uint32 attrId = ah(data).getAttributeId();
+ Uint32 dataSize = ah(data).getDataSize();
debugOut << i << " attrId=" << attrId << " size=" << dataSize;
data += 1;
for (Uint32 j = 0; j < dataSize; j++) {
@@ -294,7 +294,7 @@ Dbtux::copyAttrs(const Frag& frag, ConstData data1, Data data2, unsigned maxlen2
unsigned len2 = maxlen2;
while (n != 0) {
jam();
- const unsigned dataSize = data1.ah().getDataSize();
+ const unsigned dataSize = ah(data1).getDataSize();
// copy header
if (len2 == 0)
return;
@@ -318,4 +318,17 @@ Dbtux::copyAttrs(const Frag& frag, ConstData data1, Data data2, unsigned maxlen2
#endif
}
+void
+Dbtux::unpackBound(const ScanBound& bound, Data dest)
+{
+ ScanBoundIterator iter;
+ bound.first(iter);
+ const unsigned n = bound.getSize();
+ unsigned j;
+ for (j = 0; j < n; j++) {
+ dest[j] = *iter.data;
+ bound.next(iter);
+ }
+}
+
BLOCK_FUNCTIONS(Dbtux)
diff --git a/storage/ndb/src/kernel/blocks/dbtux/DbtuxMaint.cpp b/storage/ndb/src/kernel/blocks/dbtux/DbtuxMaint.cpp
index b329d694f0b..4fc2c2dd968 100644
--- a/storage/ndb/src/kernel/blocks/dbtux/DbtuxMaint.cpp
+++ b/storage/ndb/src/kernel/blocks/dbtux/DbtuxMaint.cpp
@@ -111,16 +111,17 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
// do the operation
req->errorCode = 0;
TreePos treePos;
+ bool ok;
switch (opCode) {
case TuxMaintReq::OpAdd:
jam();
- searchToAdd(frag, c_searchKey, ent, treePos);
+ ok = searchToAdd(frag, c_searchKey, ent, treePos);
#ifdef VM_TRACE
if (debugFlags & DebugMaint) {
- debugOut << treePos << (treePos.m_match ? " - error" : "") << endl;
+ debugOut << treePos << (! ok ? " - error" : "") << endl;
}
#endif
- if (treePos.m_match) {
+ if (! ok) {
jam();
// there is no "Building" state so this will have to do
if (indexPtr.p->m_state == Index::Online) {
@@ -150,13 +151,13 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
break;
case TuxMaintReq::OpRemove:
jam();
- searchToRemove(frag, c_searchKey, ent, treePos);
+ ok = searchToRemove(frag, c_searchKey, ent, treePos);
#ifdef VM_TRACE
if (debugFlags & DebugMaint) {
- debugOut << treePos << (! treePos.m_match ? " - error" : "") << endl;
+ debugOut << treePos << (! ok ? " - error" : "") << endl;
}
#endif
- if (! treePos.m_match) {
+ if (! ok) {
jam();
// there is no "Building" state so this will have to do
if (indexPtr.p->m_state == Index::Online) {
diff --git a/storage/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp b/storage/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp
index 55315806635..0ba9bd15b53 100644
--- a/storage/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp
+++ b/storage/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp
@@ -424,21 +424,17 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal)
jam();
// search is done only once in single range scan
scanFirst(scanPtr);
-#ifdef VM_TRACE
- if (debugFlags & DebugScan) {
- debugOut << "First scan " << scanPtr.i << " " << scan << endl;
- }
-#endif
}
- if (scan.m_state == ScanOp::Next) {
+ if (scan.m_state == ScanOp::Current ||
+ scan.m_state == ScanOp::Next) {
jam();
// look for next
- scanNext(scanPtr, false);
+ scanFind(scanPtr);
}
- // for reading tuple key in Current or Locked state
+ // for reading tuple key in Found or Locked state
Data pkData = c_dataBuffer;
unsigned pkSize = 0; // indicates not yet done
- if (scan.m_state == ScanOp::Current) {
+ if (scan.m_state == ScanOp::Found) {
// found an entry to return
jam();
ndbrequire(scan.m_accLockOp == RNIL);
@@ -512,8 +508,8 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal)
jam();
// max ops should depend on max scans (assert only)
ndbassert(false);
- // stay in Current state
- scan.m_state = ScanOp::Current;
+ // stay in Found state
+ scan.m_state = ScanOp::Found;
signal->theData[0] = scan.m_userPtr;
signal->theData[1] = true;
EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2);
@@ -700,44 +696,95 @@ Dbtux::execACC_ABORTCONF(Signal* signal)
}
/*
- * Find start position for single range scan. If it exists, sets state
- * to Next and links the scan to the node. The first entry is returned
- * by scanNext.
+ * Find start position for single range scan.
*/
void
Dbtux::scanFirst(ScanOpPtr scanPtr)
{
ScanOp& scan = *scanPtr.p;
Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
+#ifdef VM_TRACE
+ if (debugFlags & DebugScan) {
+ debugOut << "Enter first scan " << scanPtr.i << " " << scan << endl;
+ }
+#endif
TreeHead& tree = frag.m_tree;
// set up index keys for this operation
setKeyAttrs(frag);
// scan direction 0, 1
const unsigned idir = scan.m_descending;
- // unpack start key into c_dataBuffer
- const ScanBound& bound = *scan.m_bound[idir];
- ScanBoundIterator iter;
- bound.first(iter);
- for (unsigned j = 0; j < bound.getSize(); j++) {
- jam();
- c_dataBuffer[j] = *iter.data;
- bound.next(iter);
- }
+ unpackBound(*scan.m_bound[idir], c_dataBuffer);
TreePos treePos;
searchToScan(frag, c_dataBuffer, scan.m_boundCnt[idir], scan.m_descending, treePos);
- if (treePos.m_loc == NullTupLoc) {
- // empty result set
+ if (treePos.m_loc != NullTupLoc) {
+ scan.m_scanPos = treePos;
+ // link the scan to node found
+ NodeHandle node(frag);
+ selectNode(node, treePos.m_loc);
+ linkScan(node, scanPtr);
+ if (treePos.m_dir == 3) {
+ jam();
+ // check upper bound
+ TreeEnt ent = node.getEnt(treePos.m_pos);
+ if (scanCheck(scanPtr, ent))
+ scan.m_state = ScanOp::Current;
+ else
+ scan.m_state = ScanOp::Last;
+ } else {
+ scan.m_state = ScanOp::Next;
+ }
+ } else {
jam();
scan.m_state = ScanOp::Last;
- return;
}
- // set position and state
- scan.m_scanPos = treePos;
- scan.m_state = ScanOp::Next;
- // link the scan to node found
- NodeHandle node(frag);
- selectNode(node, treePos.m_loc);
- linkScan(node, scanPtr);
+#ifdef VM_TRACE
+ if (debugFlags & DebugScan) {
+ debugOut << "Leave first scan " << scanPtr.i << " " << scan << endl;
+ }
+#endif
+}
+
+/*
+ * Look for entry to return as scan result.
+ */
+void
+Dbtux::scanFind(ScanOpPtr scanPtr)
+{
+ ScanOp& scan = *scanPtr.p;
+ Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
+#ifdef VM_TRACE
+ if (debugFlags & DebugScan) {
+ debugOut << "Enter find scan " << scanPtr.i << " " << scan << endl;
+ }
+#endif
+ ndbrequire(scan.m_state == ScanOp::Current || scan.m_state == ScanOp::Next);
+ while (1) {
+ jam();
+ if (scan.m_state == ScanOp::Next)
+ scanNext(scanPtr, false);
+ if (scan.m_state == ScanOp::Current) {
+ jam();
+ const TreePos pos = scan.m_scanPos;
+ NodeHandle node(frag);
+ selectNode(node, pos.m_loc);
+ const TreeEnt ent = node.getEnt(pos.m_pos);
+ if (scanVisible(scanPtr, ent)) {
+ jam();
+ scan.m_state = ScanOp::Found;
+ scan.m_scanEnt = ent;
+ break;
+ }
+ } else {
+ jam();
+ break;
+ }
+ scan.m_state = ScanOp::Next;
+ }
+#ifdef VM_TRACE
+ if (debugFlags & DebugScan) {
+ debugOut << "Leave find scan " << scanPtr.i << " " << scan << endl;
+ }
+#endif
}
/*
@@ -755,6 +802,11 @@ Dbtux::scanFirst(ScanOpPtr scanPtr)
*
* If an entry was found, scan direction is 3. Therefore tree
* re-organizations need not worry about scan direction.
+ *
+ * This method is also used to move a scan when its entry is removed
+ * (see moveScanList). If the scan is Blocked, we check if it remains
+ * Blocked on a different version of the tuple. Otherwise the tuple is
+ * lost and state becomes Current.
*/
void
Dbtux::scanNext(ScanOpPtr scanPtr, bool fromMaintReq)
@@ -762,8 +814,8 @@ Dbtux::scanNext(ScanOpPtr scanPtr, bool fromMaintReq)
ScanOp& scan = *scanPtr.p;
Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
#ifdef VM_TRACE
- if (debugFlags & DebugScan) {
- debugOut << "Next in scan " << scanPtr.i << " " << scan << endl;
+ if (debugFlags & (DebugMaint | DebugScan)) {
+ debugOut << "Enter next scan " << scanPtr.i << " " << scan << endl;
}
#endif
// cannot be moved away from tuple we have locked
@@ -773,15 +825,7 @@ Dbtux::scanNext(ScanOpPtr scanPtr, bool fromMaintReq)
// scan direction
const unsigned idir = scan.m_descending; // 0, 1
const int jdir = 1 - 2 * (int)idir; // 1, -1
- // unpack end key into c_dataBuffer
- const ScanBound& bound = *scan.m_bound[1 - idir];
- ScanBoundIterator iter;
- bound.first(iter);
- for (unsigned j = 0; j < bound.getSize(); j++) {
- jam();
- c_dataBuffer[j] = *iter.data;
- bound.next(iter);
- }
+ unpackBound(*scan.m_bound[1 - idir], c_dataBuffer);
// use copy of position
TreePos pos = scan.m_scanPos;
// get and remember original node
@@ -795,15 +839,14 @@ Dbtux::scanNext(ScanOpPtr scanPtr, bool fromMaintReq)
while (true) {
jam();
#ifdef VM_TRACE
- if (debugFlags & DebugScan) {
- debugOut << "Scan next pos " << pos << " " << node << endl;
+ if (debugFlags & (DebugMaint | DebugScan)) {
+ debugOut << "Current scan " << scanPtr.i << " pos " << pos << " node " << node << endl;
}
#endif
if (pos.m_dir == 2) {
// coming up from root ends the scan
jam();
pos.m_loc = NullTupLoc;
- scan.m_state = ScanOp::Last;
break;
}
if (node.m_loc != pos.m_loc) {
@@ -835,41 +878,22 @@ Dbtux::scanNext(ScanOpPtr scanPtr, bool fromMaintReq)
if (pos.m_dir == idir) {
// coming up from left child scan current node
jam();
- pos.m_pos = idir == 0 ? 0 : occup - 1;
- pos.m_match = false;
+ pos.m_pos = idir == 0 ? (Uint16)-1 : occup;
pos.m_dir = 3;
}
if (pos.m_dir == 3) {
- // within node
+ // before or within node
jam();
- // advance position
- if (! pos.m_match)
- pos.m_match = true;
- else
- // becomes ZNIL (which is > occup) if 0 and scan descending
- pos.m_pos += jdir;
+ // advance position - becomes ZNIL (> occup) if 0 and descending
+ pos.m_pos += jdir;
if (pos.m_pos < occup) {
jam();
- ent = node.getEnt(pos.m_pos);
pos.m_dir = 3; // unchanged
- // read and compare all attributes
- readKeyAttrs(frag, ent, 0, c_entryKey);
- int ret = cmpScanBound(frag, 1 - idir, c_dataBuffer, scan.m_boundCnt[1 - idir], c_entryKey);
- ndbrequire(ret != NdbSqlUtil::CmpUnknown);
- if (jdir * ret < 0) {
+ ent = node.getEnt(pos.m_pos);
+ if (! scanCheck(scanPtr, ent)) {
jam();
- // hit upper bound of single range scan
pos.m_loc = NullTupLoc;
- scan.m_state = ScanOp::Last;
- break;
- }
- // can we see it
- if (! scanVisible(scanPtr, ent)) {
- jam();
- continue;
}
- // found entry
- scan.m_state = ScanOp::Current;
break;
}
// after node proceed to right child
@@ -895,31 +919,64 @@ Dbtux::scanNext(ScanOpPtr scanPtr, bool fromMaintReq)
// copy back position
scan.m_scanPos = pos;
// relink
- if (scan.m_state == ScanOp::Current) {
- ndbrequire(pos.m_match == true && pos.m_dir == 3);
+ if (pos.m_loc != NullTupLoc) {
+ ndbrequire(pos.m_dir == 3);
ndbrequire(pos.m_loc == node.m_loc);
if (origNode.m_loc != node.m_loc) {
jam();
unlinkScan(origNode, scanPtr);
linkScan(node, scanPtr);
}
- // copy found entry
- scan.m_scanEnt = ent;
- } else if (scan.m_state == ScanOp::Last) {
+ if (scan.m_state != ScanOp::Blocked) {
+ scan.m_state = ScanOp::Current;
+ } else {
+ jam();
+ ndbrequire(fromMaintReq);
+ TreeEnt& scanEnt = scan.m_scanEnt;
+ ndbrequire(scanEnt.m_tupLoc != NullTupLoc);
+ if (scanEnt.eqtuple(ent)) {
+ // remains blocked on another version
+ scanEnt = ent;
+ } else {
+ jam();
+ scanEnt.m_tupLoc = NullTupLoc;
+ scan.m_state = ScanOp::Current;
+ }
+ }
+ } else {
jam();
- ndbrequire(pos.m_loc == NullTupLoc);
unlinkScan(origNode, scanPtr);
- } else {
- ndbrequire(false);
+ scan.m_state = ScanOp::Last;
}
#ifdef VM_TRACE
- if (debugFlags & DebugScan) {
- debugOut << "Next out scan " << scanPtr.i << " " << scan << endl;
+ if (debugFlags & (DebugMaint | DebugScan)) {
+ debugOut << "Leave next scan " << scanPtr.i << " " << scan << endl;
}
#endif
}
/*
+ * Check end key. Return true if scan is still within range.
+ */
+bool
+Dbtux::scanCheck(ScanOpPtr scanPtr, TreeEnt ent)
+{
+ ScanOp& scan = *scanPtr.p;
+ Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
+ const unsigned idir = scan.m_descending;
+ const int jdir = 1 - 2 * (int)idir;
+ unpackBound(*scan.m_bound[1 - idir], c_dataBuffer);
+ unsigned boundCnt = scan.m_boundCnt[1 - idir];
+ readKeyAttrs(frag, ent, 0, c_entryKey);
+ int ret = cmpScanBound(frag, 1 - idir, c_dataBuffer, boundCnt, c_entryKey);
+ ndbrequire(ret != NdbSqlUtil::CmpUnknown);
+ if (jdir * ret > 0)
+ return true;
+ // hit upper bound of single range scan
+ return false;
+}
+
+/*
* Check if an entry is visible to the scan.
*
* There is a special check to never accept same tuple twice in a row.
diff --git a/storage/ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp b/storage/ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp
index b0e2a664bfd..4b5c0b791f9 100644
--- a/storage/ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp
+++ b/storage/ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp
@@ -21,22 +21,18 @@
* Search for entry to add.
*
* Similar to searchToRemove (see below).
- *
- * TODO optimize for initial equal attrs in node min/max
*/
-void
+bool
Dbtux::searchToAdd(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos)
{
const TreeHead& tree = frag.m_tree;
const unsigned numAttrs = frag.m_numAttrs;
NodeHandle currNode(frag);
currNode.m_loc = tree.m_root;
- // assume success
- treePos.m_match = false;
if (currNode.m_loc == NullTupLoc) {
// empty tree
jam();
- return;
+ return true;
}
NodeHandle glbNode(frag); // potential g.l.b of final node
/*
@@ -94,9 +90,8 @@ Dbtux::searchToAdd(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos&
jam();
treePos.m_loc = currNode.m_loc;
treePos.m_pos = 0;
- // failed
- treePos.m_match = true;
- return;
+ // entry found - error
+ return false;
}
break;
}
@@ -104,7 +99,7 @@ Dbtux::searchToAdd(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos&
treePos.m_loc = currNode.m_loc;
// binary search
int lo = -1;
- unsigned hi = currNode.getOccup();
+ int hi = currNode.getOccup();
int ret;
while (1) {
jam();
@@ -126,9 +121,8 @@ Dbtux::searchToAdd(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos&
lo = j;
else {
treePos.m_pos = j;
- // failed
- treePos.m_match = true;
- return;
+ // entry found - error
+ return false;
}
if (hi - lo == 1)
break;
@@ -136,22 +130,23 @@ Dbtux::searchToAdd(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos&
if (ret < 0) {
jam();
treePos.m_pos = hi;
- return;
+ return true;
}
if (hi < currNode.getOccup()) {
jam();
treePos.m_pos = hi;
- return;
+ return true;
}
if (bottomNode.isNull()) {
jam();
treePos.m_pos = hi;
- return;
+ return true;
}
jam();
// backwards compatible for now
treePos.m_loc = bottomNode.m_loc;
treePos.m_pos = 0;
+ return true;
}
/*
@@ -163,21 +158,17 @@ Dbtux::searchToAdd(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos&
* then the saved node is the g.l.b of the final node and we move back
* to it.
*/
-void
+bool
Dbtux::searchToRemove(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos)
{
const TreeHead& tree = frag.m_tree;
const unsigned numAttrs = frag.m_numAttrs;
NodeHandle currNode(frag);
currNode.m_loc = tree.m_root;
- // assume success
- treePos.m_match = true;
if (currNode.m_loc == NullTupLoc) {
- // empty tree
+ // empty tree - failed
jam();
- // failed
- treePos.m_match = false;
- return;
+ return false;
}
NodeHandle glbNode(frag); // potential g.l.b of final node
while (true) {
@@ -229,7 +220,7 @@ Dbtux::searchToRemove(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePo
jam();
treePos.m_loc = currNode.m_loc;
treePos.m_pos = 0;
- return;
+ return true;
}
break;
}
@@ -242,12 +233,12 @@ Dbtux::searchToRemove(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePo
if (searchEnt.eq(currNode.getEnt(j))) {
jam();
treePos.m_pos = j;
- return;
+ return true;
}
}
treePos.m_pos = currNode.getOccup();
- // failed
- treePos.m_match = false;
+ // not found - failed
+ return false;
}
/*
@@ -278,8 +269,6 @@ Dbtux::searchToScanAscending(Frag& frag, ConstData boundInfo, unsigned boundCoun
currNode.m_loc = tree.m_root;
NodeHandle glbNode(frag); // potential g.l.b of final node
NodeHandle bottomNode(frag);
- // always before entry
- treePos.m_match = false;
while (true) {
jam();
selectNode(currNode, currNode.m_loc);
@@ -315,7 +304,7 @@ Dbtux::searchToScanAscending(Frag& frag, ConstData boundInfo, unsigned boundCoun
treePos.m_dir = 3;
return;
}
- } else if (ret > 0) {
+ } else {
// bound is at or right of this node
jam();
const TupLoc loc = currNode.getLink(1);
@@ -327,8 +316,6 @@ Dbtux::searchToScanAscending(Frag& frag, ConstData boundInfo, unsigned boundCoun
currNode.m_loc = loc;
continue;
}
- } else {
- ndbrequire(false);
}
break;
}
@@ -369,8 +356,6 @@ Dbtux::searchToScanDescending(Frag& frag, ConstData boundInfo, unsigned boundCou
currNode.m_loc = tree.m_root;
NodeHandle glbNode(frag); // potential g.l.b of final node
NodeHandle bottomNode(frag);
- // always before entry
- treePos.m_match = false;
while (true) {
jam();
selectNode(currNode, currNode.m_loc);
@@ -403,7 +388,7 @@ Dbtux::searchToScanDescending(Frag& frag, ConstData boundInfo, unsigned boundCou
// empty result set
return;
}
- } else if (ret > 0) {
+ } else {
// bound is at or right of this node
jam();
const TupLoc loc = currNode.getLink(1);
@@ -415,8 +400,6 @@ Dbtux::searchToScanDescending(Frag& frag, ConstData boundInfo, unsigned boundCou
currNode.m_loc = loc;
continue;
}
- } else {
- ndbrequire(false);
}
break;
}
diff --git a/storage/ndb/src/kernel/vm/Configuration.cpp b/storage/ndb/src/kernel/vm/Configuration.cpp
index 12badffe0e0..81b87c818fb 100644
--- a/storage/ndb/src/kernel/vm/Configuration.cpp
+++ b/storage/ndb/src/kernel/vm/Configuration.cpp
@@ -58,11 +58,14 @@ NDB_STD_OPTS_VARS;
// XXX should be my_bool ???
static int _daemon, _no_daemon, _foreground, _initial, _no_start;
static int _initialstart;
-static const char* _nowait_nodes;
+static const char* _nowait_nodes = 0;
+static const char* _bind_address = 0;
extern Uint32 g_start_type;
extern NdbNodeBitmask g_nowait_nodes;
+const char *load_default_groups[]= { "mysql_cluster","ndbd",0 };
+
/**
* Arguments to NDB process
*/
@@ -98,6 +101,10 @@ static struct my_option my_long_options[] =
"Perform initial start",
(gptr*) &_initialstart, (gptr*) &_initialstart, 0,
GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
+ { "bind-address", OPT_NOWAIT_NODES,
+ "Local bind address",
+ (gptr*) &_bind_address, (gptr*) &_bind_address, 0,
+ GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
{ 0, 0, 0, 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}
};
static void short_usage_sub(void)
@@ -108,6 +115,8 @@ static void usage()
{
short_usage_sub();
ndb_std_print_version();
+ print_defaults(MYSQL_CONFIG_NAME,load_default_groups);
+ puts("");
my_print_help(my_long_options);
my_print_variables(my_long_options);
}
@@ -115,7 +124,6 @@ static void usage()
bool
Configuration::init(int argc, char** argv)
{
- const char *load_default_groups[]= { "mysql_cluster","ndbd",0 };
load_defaults("my",load_default_groups,&argc,&argv);
int ho_error;
@@ -257,7 +265,9 @@ Configuration::fetch_configuration(){
m_mgmd_port= 0;
m_config_retriever= new ConfigRetriever(getConnectString(),
- NDB_VERSION, NODE_TYPE_DB);
+ NDB_VERSION,
+ NODE_TYPE_DB,
+ _bind_address);
if (m_config_retriever->hasError())
{
diff --git a/storage/ndb/src/mgmapi/mgmapi.cpp b/storage/ndb/src/mgmapi/mgmapi.cpp
index 3f3ae7136cb..df71d71a8c1 100644
--- a/storage/ndb/src/mgmapi/mgmapi.cpp
+++ b/storage/ndb/src/mgmapi/mgmapi.cpp
@@ -107,6 +107,7 @@ struct ndb_mgm_handle {
int mgmd_version_major;
int mgmd_version_minor;
int mgmd_version_build;
+ char * m_bindaddress;
};
#define SET_ERROR(h, e, s) setError(h, e, __LINE__, s)
@@ -168,6 +169,7 @@ ndb_mgm_create_handle()
h->cfg_i = -1;
h->errstream = stdout;
h->m_name = 0;
+ h->m_bindaddress = 0;
strncpy(h->last_error_desc, "No error", NDB_MGM_MAX_ERR_DESC_SIZE);
@@ -215,6 +217,22 @@ ndb_mgm_set_connectstring(NdbMgmHandle handle, const char * mgmsrv)
DBUG_RETURN(0);
}
+extern "C"
+int
+ndb_mgm_set_bindaddress(NdbMgmHandle handle, const char * arg)
+{
+ DBUG_ENTER("ndb_mgm_set_bindaddress");
+ if (handle->m_bindaddress)
+ free(handle->m_bindaddress);
+
+ if (arg)
+ handle->m_bindaddress = strdup(arg);
+ else
+ handle->m_bindaddress = 0;
+
+ DBUG_RETURN(0);
+}
+
/**
* Destroy a handle
*/
@@ -241,6 +259,8 @@ ndb_mgm_destroy_handle(NdbMgmHandle * handle)
#endif
(*handle)->cfg.~LocalConfig();
my_free((*handle)->m_name, MYF(MY_ALLOW_ZERO_PTR));
+ if ((*handle)->m_bindaddress)
+ free((*handle)->m_bindaddress);
my_free((char*)* handle,MYF(MY_ALLOW_ZERO_PTR));
* handle = 0;
DBUG_VOID_RETURN;
@@ -433,6 +453,7 @@ ndb_mgm_connect(NdbMgmHandle handle, int no_retries,
BaseString::snprintf(logname, 64, "mgmapi.log");
handle->logfile = fopen(logname, "w");
#endif
+ char buf[1024];
/**
* Do connect
@@ -440,6 +461,50 @@ ndb_mgm_connect(NdbMgmHandle handle, int no_retries,
LocalConfig &cfg= handle->cfg;
NDB_SOCKET_TYPE sockfd= NDB_INVALID_SOCKET;
Uint32 i;
+ int binderror = 0;
+ SocketClient s(0, 0);
+ if (!s.init())
+ {
+ fprintf(handle->errstream,
+ "Unable to create socket, "
+ "while trying to connect with connect string: %s\n",
+ cfg.makeConnectString(buf,sizeof(buf)));
+
+ setError(handle, NDB_MGM_COULD_NOT_CONNECT_TO_SOCKET, __LINE__,
+ "Unable to create socket, "
+ "while trying to connect with connect string: %s\n",
+ cfg.makeConnectString(buf,sizeof(buf)));
+ DBUG_RETURN(-1);
+ }
+
+ if (handle->m_bindaddress)
+ {
+ BaseString::snprintf(buf, sizeof(buf), handle->m_bindaddress);
+ unsigned short portno = 0;
+ char * port = strchr(buf, ':');
+ if (port != 0)
+ {
+ portno = atoi(port+1);
+ * port = 0;
+ }
+ int err;
+ if ((err = s.bind(buf, portno)) != 0)
+ {
+ fprintf(handle->errstream,
+ "Unable to bind local address %s errno: %d, "
+ "while trying to connect with connect string: %s\n",
+ handle->m_bindaddress, err,
+ cfg.makeConnectString(buf,sizeof(buf)));
+
+ setError(handle, NDB_MGM_BIND_ADDRESS, __LINE__,
+ "Unable to bind local address %s errno: %d, "
+ "while trying to connect with connect string: %s\n",
+ handle->m_bindaddress, err,
+ cfg.makeConnectString(buf,sizeof(buf)));
+ DBUG_RETURN(-1);
+ }
+ }
+
while (sockfd == NDB_INVALID_SOCKET)
{
// do all the mgmt servers
@@ -447,8 +512,7 @@ ndb_mgm_connect(NdbMgmHandle handle, int no_retries,
{
if (cfg.ids[i].type != MgmId_TCP)
continue;
- SocketClient s(cfg.ids[i].name.c_str(), cfg.ids[i].port);
- sockfd = s.connect();
+ sockfd = s.connect(cfg.ids[i].name.c_str(), cfg.ids[i].port);
if (sockfd != NDB_INVALID_SOCKET)
break;
}
@@ -456,19 +520,17 @@ ndb_mgm_connect(NdbMgmHandle handle, int no_retries,
break;
#ifndef DBUG_OFF
{
- char buf[1024];
DBUG_PRINT("info",("Unable to connect with connect string: %s",
cfg.makeConnectString(buf,sizeof(buf))));
}
#endif
if (verbose > 0) {
- char buf[1024];
- fprintf(handle->errstream, "Unable to connect with connect string: %s\n",
+ fprintf(handle->errstream,
+ "Unable to connect with connect string: %s\n",
cfg.makeConnectString(buf,sizeof(buf)));
verbose= -1;
}
if (no_retries == 0) {
- char buf[1024];
setError(handle, NDB_MGM_COULD_NOT_CONNECT_TO_SOCKET, __LINE__,
"Unable to connect with connect string: %s",
cfg.makeConnectString(buf,sizeof(buf)));
@@ -509,6 +571,18 @@ ndb_mgm_connect(NdbMgmHandle handle, int no_retries,
}
/**
+ * Only used for low level testing
+ * Never to be used by end user.
+ * Or anybody who doesn't know exactly what they're doing.
+ */
+extern "C"
+int
+ndb_mgm_get_fd(NdbMgmHandle handle)
+{
+ return handle->socket;
+}
+
+/**
* Disconnect from a mgm server
*/
extern "C"
@@ -698,22 +772,16 @@ ndb_mgm_get_status(NdbMgmHandle handle)
SET_ERROR(handle, NDB_MGM_ILLEGAL_SERVER_REPLY, "Probably disconnected");
return NULL;
}
- if(buf[strlen(buf)-1] == '\n')
- buf[strlen(buf)-1] = '\0';
-
- if(strcmp("node status", buf) != 0) {
+ if(strcmp("node status\n", buf) != 0) {
SET_ERROR(handle, NDB_MGM_ILLEGAL_NODE_STATUS, buf);
return NULL;
}
-
if(!in.gets(buf, sizeof(buf)))
{
SET_ERROR(handle, NDB_MGM_ILLEGAL_SERVER_REPLY, "Probably disconnected");
return NULL;
}
- if(buf[strlen(buf)-1] == '\n')
- buf[strlen(buf)-1] = '\0';
-
+
BaseString tmp(buf);
Vector<BaseString> split;
tmp.split(split, ":");
@@ -721,7 +789,7 @@ ndb_mgm_get_status(NdbMgmHandle handle)
SET_ERROR(handle, NDB_MGM_ILLEGAL_NODE_STATUS, buf);
return NULL;
}
-
+
if(!(split[0].trim() == "nodes")){
SET_ERROR(handle, NDB_MGM_ILLEGAL_NODE_STATUS, buf);
return NULL;
@@ -2298,7 +2366,6 @@ ndb_mgm_check_connection(NdbMgmHandle handle){
SocketOutputStream out(handle->socket);
SocketInputStream in(handle->socket, handle->read_timeout);
char buf[32];
-
if (out.println("check connection"))
goto ndb_mgm_check_connection_error;
@@ -2508,7 +2575,6 @@ int ndb_mgm_end_session(NdbMgmHandle handle)
SocketInputStream in(handle->socket, handle->read_timeout);
char buf[32];
-
in.gets(buf, sizeof(buf));
DBUG_RETURN(0);
@@ -2566,4 +2632,104 @@ int ndb_mgm_get_version(NdbMgmHandle handle,
DBUG_RETURN(1);
}
+extern "C"
+Uint64
+ndb_mgm_get_session_id(NdbMgmHandle handle)
+{
+ Uint64 session_id=0;
+
+ DBUG_ENTER("ndb_mgm_get_session_id");
+ CHECK_HANDLE(handle, 0);
+ CHECK_CONNECTED(handle, 0);
+
+ Properties args;
+
+ const ParserRow<ParserDummy> reply[]= {
+ MGM_CMD("get session id reply", NULL, ""),
+ MGM_ARG("id", Int, Mandatory, "Node ID"),
+ MGM_END()
+ };
+
+ const Properties *prop;
+ prop = ndb_mgm_call(handle, reply, "get session id", &args);
+ CHECK_REPLY(prop, 0);
+
+ if(!prop->get("id",&session_id)){
+ fprintf(handle->errstream, "Unable to get session id\n");
+ return 0;
+ }
+
+ delete prop;
+ DBUG_RETURN(session_id);
+}
+
+extern "C"
+int
+ndb_mgm_get_session(NdbMgmHandle handle, Uint64 id,
+ struct NdbMgmSession *s, int *len)
+{
+ int retval= 0;
+ DBUG_ENTER("ndb_mgm_get_session");
+ CHECK_HANDLE(handle, 0);
+ CHECK_CONNECTED(handle, 0);
+
+ Properties args;
+ args.put("id", id);
+
+ const ParserRow<ParserDummy> reply[]= {
+ MGM_CMD("get session reply", NULL, ""),
+ MGM_ARG("id", Int, Mandatory, "Node ID"),
+ MGM_ARG("m_stopSelf", Int, Optional, "m_stopSelf"),
+ MGM_ARG("m_stop", Int, Optional, "stop session"),
+ MGM_ARG("nodeid", Int, Optional, "allocated node id"),
+ MGM_ARG("parser_buffer_len", Int, Optional, "waiting in buffer"),
+ MGM_ARG("parser_status", Int, Optional, "parser status"),
+ MGM_END()
+ };
+
+ const Properties *prop;
+ prop = ndb_mgm_call(handle, reply, "get session", &args);
+ CHECK_REPLY(prop, 0);
+
+ Uint64 r_id;
+ int rlen= 0;
+
+ if(!prop->get("id",&r_id)){
+ fprintf(handle->errstream, "Unable to get session id\n");
+ goto err;
+ }
+
+ s->id= r_id;
+ rlen+=sizeof(s->id);
+
+ if(prop->get("m_stopSelf",&(s->m_stopSelf)))
+ rlen+=sizeof(s->m_stopSelf);
+ else
+ goto err;
+
+ if(prop->get("m_stop",&(s->m_stop)))
+ rlen+=sizeof(s->m_stop);
+ else
+ goto err;
+
+ if(prop->get("nodeid",&(s->nodeid)))
+ rlen+=sizeof(s->nodeid);
+ else
+ goto err;
+
+ if(prop->get("parser_buffer_len",&(s->parser_buffer_len)))
+ {
+ rlen+=sizeof(s->parser_buffer_len);
+ if(prop->get("parser_status",&(s->parser_status)))
+ rlen+=sizeof(s->parser_status);
+ }
+
+ *len= rlen;
+ retval= 1;
+
+err:
+ delete prop;
+ DBUG_RETURN(retval);
+}
+
template class Vector<const ParserRow<ParserDummy>*>;
diff --git a/storage/ndb/src/mgmclient/main.cpp b/storage/ndb/src/mgmclient/main.cpp
index 954696ef013..9dc3672bedd 100644
--- a/storage/ndb/src/mgmclient/main.cpp
+++ b/storage/ndb/src/mgmclient/main.cpp
@@ -38,6 +38,7 @@ extern "C" int add_history(const char *command); /* From readline directory */
#include "ndb_mgmclient.hpp"
const char *progname = "ndb_mgm";
+const char *load_default_groups[]= { "mysql_cluster","ndb_mgm",0 };
static Ndb_mgmclient* com;
@@ -87,6 +88,8 @@ static void usage()
{
short_usage_sub();
ndb_std_print_version();
+ print_defaults(MYSQL_CONFIG_NAME,load_default_groups);
+ puts("");
my_print_help(my_long_options);
my_print_variables(my_long_options);
}
@@ -128,7 +131,6 @@ int main(int argc, char** argv){
NDB_INIT(argv[0]);
const char *_host = 0;
int _port = 0;
- const char *load_default_groups[]= { "mysql_cluster","ndb_mgm",0 };
load_defaults("my",load_default_groups,&argc,&argv);
int ho_error;
diff --git a/storage/ndb/src/mgmsrv/Services.cpp b/storage/ndb/src/mgmsrv/Services.cpp
index b2ff7e3d8fd..4a84095aa9f 100644
--- a/storage/ndb/src/mgmsrv/Services.cpp
+++ b/storage/ndb/src/mgmsrv/Services.cpp
@@ -270,6 +270,13 @@ ParserRow<MgmApiSession> commands[] = {
MGM_ARG("length", Int, Mandatory, "Length"),
MGM_ARG("data", String, Mandatory, "Data"),
+ MGM_CMD("list sessions", &MgmApiSession::listSessions, ""),
+
+ MGM_CMD("get session id", &MgmApiSession::getSessionId, ""),
+
+ MGM_CMD("get session", &MgmApiSession::getSession, ""),
+ MGM_ARG("id", Int, Mandatory, "SessionID"),
+
MGM_END()
};
@@ -282,7 +289,7 @@ struct PurgeStruct
NDB_TICKS tick;
};
-MgmApiSession::MgmApiSession(class MgmtSrvr & mgm, NDB_SOCKET_TYPE sock)
+MgmApiSession::MgmApiSession(class MgmtSrvr & mgm, NDB_SOCKET_TYPE sock, Uint64 session_id)
: SocketServer::Session(sock), m_mgmsrv(mgm)
{
DBUG_ENTER("MgmApiSession::MgmApiSession");
@@ -291,6 +298,9 @@ MgmApiSession::MgmApiSession(class MgmtSrvr & mgm, NDB_SOCKET_TYPE sock)
m_parser = new Parser_t(commands, *m_input, true, true, true);
m_allocated_resources= new MgmtSrvr::Allocated_resources(m_mgmsrv);
m_stopSelf= 0;
+ m_ctx= NULL;
+ m_session_id= session_id;
+ m_mutex= NdbMutex_Create();
DBUG_VOID_RETURN;
}
@@ -314,6 +324,7 @@ MgmApiSession::~MgmApiSession()
g_RestartServer= true;
if(m_stopSelf)
g_StopServer= true;
+ NdbMutex_Destroy(m_mutex);
DBUG_VOID_RETURN;
}
@@ -323,11 +334,19 @@ MgmApiSession::runSession()
DBUG_ENTER("MgmApiSession::runSession");
Parser_t::Context ctx;
- while(!m_stop) {
+ ctx.m_mutex= m_mutex;
+ m_ctx= &ctx;
+ bool stop= false;
+ while(!stop) {
+ NdbMutex_Lock(m_mutex);
+
m_parser->run(ctx, *this);
if(ctx.m_currentToken == 0)
+ {
+ NdbMutex_Unlock(m_mutex);
break;
+ }
switch(ctx.m_status) {
case Parser_t::UnknownCommand:
@@ -348,13 +367,19 @@ MgmApiSession::runSession()
default:
break;
}
- }
+
+ stop= m_stop;
+ NdbMutex_Unlock(m_mutex);
+ };
+
+ NdbMutex_Lock(m_mutex);
+ m_ctx= NULL;
if(m_socket != NDB_INVALID_SOCKET)
{
NDB_CLOSE_SOCKET(m_socket);
m_socket= NDB_INVALID_SOCKET;
}
-
+ NdbMutex_Unlock(m_mutex);
DBUG_VOID_RETURN;
}
@@ -1592,11 +1617,6 @@ MgmApiSession::listen_event(Parser<MgmApiSession>::Context & ctx,
result = -1;
goto done;
}
-
- m_mgmsrv.m_event_listner.add_listener(le);
-
- m_stop = true;
- m_socket = NDB_INVALID_SOCKET;
done:
m_output->println("listen event");
@@ -1604,6 +1624,13 @@ done:
if(result != 0)
m_output->println("msg: %s", msg.c_str());
m_output->println("");
+
+ if(result==0)
+ {
+ m_mgmsrv.m_event_listner.add_listener(le);
+ m_stop = true;
+ m_socket = NDB_INVALID_SOCKET;
+ }
}
void
@@ -1706,5 +1733,122 @@ MgmApiSession::report_event(Parser_t::Context &ctx,
m_output->println("");
}
+void
+MgmApiSession::list_session(SocketServer::Session *_s, void *data)
+{
+ MgmApiSession *s= (MgmApiSession *)_s;
+ MgmApiSession *lister= (MgmApiSession*) data;
+
+ if(s!=lister)
+ NdbMutex_Lock(s->m_mutex);
+
+ Uint64 id= s->m_session_id;
+ lister->m_output->println("session: %llu",id);
+ lister->m_output->println("session.%llu.m_stopSelf: %d",id,s->m_stopSelf);
+ lister->m_output->println("session.%llu.m_stop: %d",id,s->m_stop);
+ lister->m_output->println("session.%llu.allocated.nodeid: %d",id,s->m_allocated_resources->get_nodeid());
+ if(s->m_ctx)
+ {
+ int l= strlen(s->m_ctx->m_tokenBuffer);
+ char *buf= (char*) malloc(2*l+1);
+ char *b= buf;
+ for(int i=0; i<l;i++)
+ if(s->m_ctx->m_tokenBuffer[i]=='\n')
+ {
+ *b++='\\';
+ *b++='n';
+ }
+ else
+ {
+ *b++= s->m_ctx->m_tokenBuffer[i];
+ }
+ *b= '\0';
+
+ lister->m_output->println("session.%llu.parser.buffer.len: %u",id,l);
+ lister->m_output->println("session.%llu.parser.buffer: %s",id,buf);
+ lister->m_output->println("session.%llu.parser.status: %d",id,s->m_ctx->m_status);
+
+ free(buf);
+ }
+
+ if(s!=lister)
+ NdbMutex_Unlock(s->m_mutex);
+}
+
+void
+MgmApiSession::listSessions(Parser_t::Context &ctx,
+ Properties const &args) {
+ m_mgmsrv.get_socket_server()->foreachSession(list_session,(void*)this);
+
+ m_output->println("");
+}
+
+void
+MgmApiSession::getSessionId(Parser_t::Context &ctx,
+ Properties const &args) {
+ m_output->println("get session id reply");
+ m_output->println("id: %llu",m_session_id);
+ m_output->println("");
+}
+
+struct get_session_param {
+ MgmApiSession *l;
+ Uint64 id;
+ int found;
+};
+
+void
+MgmApiSession::get_session(SocketServer::Session *_s, void *data)
+{
+ struct get_session_param *p= (struct get_session_param*)data;
+ MgmApiSession *s= (MgmApiSession *)_s;
+
+ if(s!=p->l)
+ NdbMutex_Lock(s->m_mutex);
+
+ if(p->id != s->m_session_id)
+ {
+ if(s!=p->l)
+ NdbMutex_Unlock(s->m_mutex);
+ return;
+ }
+
+ p->found= true;
+ p->l->m_output->println("id: %llu",s->m_session_id);
+ p->l->m_output->println("m_stopSelf: %d",s->m_stopSelf);
+ p->l->m_output->println("m_stop: %d",s->m_stop);
+ p->l->m_output->println("nodeid: %d",s->m_allocated_resources->get_nodeid());
+ if(s->m_ctx)
+ {
+ int l= strlen(s->m_ctx->m_tokenBuffer);
+ p->l->m_output->println("parser_buffer_len: %u",l);
+ p->l->m_output->println("parser_status: %d",s->m_ctx->m_status);
+ }
+
+ if(s!=p->l)
+ NdbMutex_Unlock(s->m_mutex);
+}
+
+void
+MgmApiSession::getSession(Parser_t::Context &ctx,
+ Properties const &args) {
+ Uint64 id;
+ struct get_session_param p;
+
+ args.get("id", &id);
+
+ p.l= this;
+ p.id= id;
+ p.found= false;
+
+ m_output->println("get session reply");
+ m_mgmsrv.get_socket_server()->foreachSession(get_session,(void*)&p);
+
+ if(p.found==false)
+ m_output->println("id: 0");
+
+ m_output->println("");
+}
+
template class MutexVector<int>;
template class Vector<ParserRow<MgmApiSession> const*>;
diff --git a/storage/ndb/src/mgmsrv/Services.hpp b/storage/ndb/src/mgmsrv/Services.hpp
index b75ef6c277a..8b9ca7156b9 100644
--- a/storage/ndb/src/mgmsrv/Services.hpp
+++ b/storage/ndb/src/mgmsrv/Services.hpp
@@ -32,6 +32,8 @@ class MgmApiSession : public SocketServer::Session
{
static void stop_session_if_timed_out(SocketServer::Session *_s, void *data);
static void stop_session_if_not_connected(SocketServer::Session *_s, void *data);
+ static void list_session(SocketServer::Session *_s, void *data);
+ static void get_session(SocketServer::Session *_s, void *data);
private:
typedef Parser<MgmApiSession> Parser_t;
@@ -42,6 +44,11 @@ private:
MgmtSrvr::Allocated_resources *m_allocated_resources;
char m_err_str[1024];
int m_stopSelf; // -1 is restart, 0 do nothing, 1 stop
+ NdbMutex *m_mutex;
+
+ // for listing sessions and other fun:
+ Parser_t::Context *m_ctx;
+ Uint64 m_session_id;
void getConfig_common(Parser_t::Context &ctx,
const class Properties &args,
@@ -50,7 +57,7 @@ private:
{ return m_mgmsrv.getErrorText(err_no, m_err_str, sizeof(m_err_str)); }
public:
- MgmApiSession(class MgmtSrvr & mgm, NDB_SOCKET_TYPE sock);
+ MgmApiSession(class MgmtSrvr & mgm, NDB_SOCKET_TYPE sock, Uint64 session_id);
virtual ~MgmApiSession();
void runSession();
@@ -108,13 +115,20 @@ public:
void get_mgmd_nodeid(Parser_t::Context &ctx, Properties const &args);
void report_event(Parser_t::Context &ctx, Properties const &args);
+
+ void listSessions(Parser_t::Context &ctx, Properties const &args);
+
+ void getSessionId(Parser_t::Context &ctx, Properties const &args);
+ void getSession(Parser_t::Context &ctx, Properties const &args);
};
class MgmApiService : public SocketServer::Service {
class MgmtSrvr * m_mgmsrv;
+ Uint64 m_next_session_id; // Protected by m_sessions mutex it SocketServer
public:
MgmApiService(){
m_mgmsrv = 0;
+ m_next_session_id= 1;
}
void setMgm(class MgmtSrvr * mgmsrv){
@@ -122,7 +136,7 @@ public:
}
SocketServer::Session * newSession(NDB_SOCKET_TYPE socket){
- return new MgmApiSession(* m_mgmsrv, socket);
+ return new MgmApiSession(* m_mgmsrv, socket, m_next_session_id++);
}
};
diff --git a/storage/ndb/src/mgmsrv/main.cpp b/storage/ndb/src/mgmsrv/main.cpp
index 26cb0951d3a..9eb4ad9bde3 100644
--- a/storage/ndb/src/mgmsrv/main.cpp
+++ b/storage/ndb/src/mgmsrv/main.cpp
@@ -47,6 +47,7 @@
#define DEBUG(x) ndbout << x << endl;
const char progname[] = "mgmtsrvr";
+const char *load_default_groups[]= { "mysql_cluster","ndb_mgmd",0 };
// copied from mysql.cc to get readline
extern "C" {
@@ -183,6 +184,8 @@ static void usage()
{
short_usage_sub();
ndb_std_print_version();
+ print_defaults(MYSQL_CONFIG_NAME,load_default_groups);
+ puts("");
my_print_help(my_long_options);
my_print_variables(my_long_options);
}
@@ -196,7 +199,6 @@ int main(int argc, char** argv)
NDB_INIT(argv[0]);
- const char *load_default_groups[]= { "mysql_cluster","ndb_mgmd",0 };
load_defaults("my",load_default_groups,&argc,&argv);
int ho_error;
diff --git a/storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp b/storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp
index 6e65a5ba7e8..42ef7bbbaee 100644
--- a/storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp
+++ b/storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp
@@ -773,6 +773,7 @@ NdbTableImpl::computeAggregates()
m_keyLenInWords = 0;
m_noOfDistributionKeys = 0;
m_noOfBlobs = 0;
+ m_noOfDiskColumns = 0;
Uint32 i, n;
for (i = 0; i < m_columns.size(); i++) {
NdbColumnImpl* col = m_columns[i];
@@ -785,6 +786,10 @@ NdbTableImpl::computeAggregates()
if (col->getBlobType())
m_noOfBlobs++;
+
+ if (col->getStorageType() == NdbDictionary::Column::StorageTypeDisk)
+ m_noOfDiskColumns++;
+
col->m_keyInfoPos = ~0;
}
if (m_noOfDistributionKeys == m_noOfKeys) {
@@ -1068,6 +1073,54 @@ NdbTableImpl::get_nodes(Uint32 hashValue, const Uint16 ** nodes) const
}
return 0;
}
+
+int
+NdbDictionary::Table::checkColumns(const Uint32* map, Uint32 len) const
+{
+ int ret = 0;
+ Uint32 colCnt = m_impl.m_columns.size();
+ if (map == 0)
+ {
+ ret |= 1;
+ ret |= (m_impl.m_noOfDiskColumns) ? 2 : 0;
+ ret |= (colCnt > m_impl.m_noOfDiskColumns) ? 4 : 0;
+ return ret;
+ }
+
+ NdbColumnImpl** cols = m_impl.m_columns.getBase();
+ const char * ptr = reinterpret_cast<const char*>(map);
+ const char * end = ptr + len;
+ Uint32 no = 0;
+ while (ptr < end)
+ {
+ Uint32 val = (Uint32)* ptr;
+ Uint32 idx = 1;
+ for (Uint32 i = 0; i<8; i++)
+ {
+ if (val & idx)
+ {
+ if (cols[no]->getPrimaryKey())
+ ret |= 1;
+ else
+ {
+ if (cols[no]->getStorageType() == NdbDictionary::Column::StorageTypeDisk)
+ ret |= 2;
+ else
+ ret |= 4;
+ }
+ }
+ no ++;
+ idx *= 2;
+ if (no == colCnt)
+ return ret;
+ }
+
+ ptr++;
+ }
+ return ret;
+}
+
+
/**
* NdbIndexImpl
diff --git a/storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp b/storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp
index 30e7628dfef..e5f68cfcc81 100644
--- a/storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp
+++ b/storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp
@@ -225,7 +225,7 @@ public:
// if all pk = dk then this is zero!
Uint8 m_noOfDistributionKeys;
Uint8 m_noOfBlobs;
-
+ Uint8 m_noOfDiskColumns;
Uint8 m_replicaCount;
/**
diff --git a/storage/ndb/src/ndbapi/NdbScanOperation.cpp b/storage/ndb/src/ndbapi/NdbScanOperation.cpp
index 2ce39453a4f..2d47f79ee09 100644
--- a/storage/ndb/src/ndbapi/NdbScanOperation.cpp
+++ b/storage/ndb/src/ndbapi/NdbScanOperation.cpp
@@ -174,7 +174,12 @@ NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
}
}
#endif
-
+ if (scan_flags & SF_DiskScan)
+ {
+ tupScan = true;
+ m_no_disk_flag = false;
+ }
+
bool rangeScan = false;
if (m_accessTable->m_indexType == NdbDictionary::Index::OrderedIndex)
{
@@ -696,9 +701,27 @@ void NdbScanOperation::close(bool forceSend, bool releaseOp)
theNdbCon = NULL;
m_transConnection = NULL;
- if (releaseOp && tTransCon) {
+ if (tTransCon && releaseOp)
+ {
NdbIndexScanOperation* tOp = (NdbIndexScanOperation*)this;
- tTransCon->releaseScanOperation(tOp);
+
+ bool ret = true;
+ if (theStatus != WaitResponse)
+ {
+ /**
+ * Not executed yet
+ */
+ ret =
+ tTransCon->releaseScanOperation(&tTransCon->m_theFirstScanOperation,
+ &tTransCon->m_theLastScanOperation,
+ tOp);
+ }
+ else
+ {
+ ret = tTransCon->releaseScanOperation(&tTransCon->m_firstExecutedScanOp,
+ 0, tOp);
+ }
+ assert(ret);
}
tCon->theScanningOp = 0;
diff --git a/storage/ndb/src/ndbapi/NdbTransaction.cpp b/storage/ndb/src/ndbapi/NdbTransaction.cpp
index 9788b9bdce6..dad866ff453 100644
--- a/storage/ndb/src/ndbapi/NdbTransaction.cpp
+++ b/storage/ndb/src/ndbapi/NdbTransaction.cpp
@@ -978,52 +978,61 @@ void releaseScanOperation();
Remark: Release scan op when hupp'ed trans closed (save memory)
******************************************************************************/
void
-NdbTransaction::releaseScanOperation(NdbIndexScanOperation* cursorOp)
+NdbTransaction::releaseExecutedScanOperation(NdbIndexScanOperation* cursorOp)
{
- DBUG_ENTER("NdbTransaction::releaseScanOperation");
+ DBUG_ENTER("NdbTransaction::releaseExecutedScanOperation");
DBUG_PRINT("enter", ("this=0x%x op=0x%x", (UintPtr)this, (UintPtr)cursorOp));
+
+ releaseScanOperation(&m_firstExecutedScanOp, 0, cursorOp);
+
+ DBUG_VOID_RETURN;
+}//NdbTransaction::releaseExecutedScanOperation()
- // here is one reason to make op lists doubly linked
- if (cursorOp->m_executed)
+bool
+NdbTransaction::releaseScanOperation(NdbIndexScanOperation** listhead,
+ NdbIndexScanOperation** listtail,
+ NdbIndexScanOperation* op)
+{
+ if (* listhead == op)
{
- if (m_firstExecutedScanOp == cursorOp) {
- m_firstExecutedScanOp = (NdbIndexScanOperation*)cursorOp->theNext;
- cursorOp->release();
- theNdb->releaseScanOperation(cursorOp);
- } else if (m_firstExecutedScanOp != NULL) {
- NdbIndexScanOperation* tOp = m_firstExecutedScanOp;
- while (tOp->theNext != NULL) {
- if (tOp->theNext == cursorOp) {
- tOp->theNext = cursorOp->theNext;
- cursorOp->release();
- theNdb->releaseScanOperation(cursorOp);
- break;
- }
- tOp = (NdbIndexScanOperation*)tOp->theNext;
- }
+ * listhead = (NdbIndexScanOperation*)op->theNext;
+ if (listtail && *listtail == op)
+ {
+ assert(* listhead == 0);
+ * listtail = 0;
}
+
}
else
{
- if (m_theFirstScanOperation == cursorOp) {
- m_theFirstScanOperation = (NdbIndexScanOperation*)cursorOp->theNext;
- cursorOp->release();
- theNdb->releaseScanOperation(cursorOp);
- } else if (m_theFirstScanOperation != NULL) {
- NdbIndexScanOperation* tOp = m_theFirstScanOperation;
- while (tOp->theNext != NULL) {
- if (tOp->theNext == cursorOp) {
- tOp->theNext = cursorOp->theNext;
- cursorOp->release();
- theNdb->releaseScanOperation(cursorOp);
- break;
- }
- tOp = (NdbIndexScanOperation*)tOp->theNext;
+ NdbIndexScanOperation* tmp = * listhead;
+ while (tmp != NULL)
+ {
+ if (tmp->theNext == op)
+ {
+ tmp->theNext = (NdbIndexScanOperation*)op->theNext;
+ if (listtail && *listtail == op)
+ {
+ assert(op->theNext == 0);
+ *listtail = tmp;
+ }
+ break;
}
+ tmp = (NdbIndexScanOperation*)tmp->theNext;
}
+ if (tmp == NULL)
+ op = NULL;
}
- DBUG_VOID_RETURN;
-}//NdbTransaction::releaseScanOperation()
+
+ if (op != NULL)
+ {
+ op->release();
+ theNdb->releaseScanOperation(op);
+ return true;
+ }
+
+ return false;
+}
/*****************************************************************************
NdbOperation* getNdbOperation(const char* aTableName);
diff --git a/storage/ndb/test/ndbapi/testMgm.cpp b/storage/ndb/test/ndbapi/testMgm.cpp
index ef653d3f972..cd0efb34a97 100644
--- a/storage/ndb/test/ndbapi/testMgm.cpp
+++ b/storage/ndb/test/ndbapi/testMgm.cpp
@@ -21,6 +21,8 @@
#include <NdbRestarter.hpp>
#include <Vector.hpp>
#include <random.h>
+#include <mgmapi.h>
+#include <mgmapi_debug.h>
int runLoadTable(NDBT_Context* ctx, NDBT_Step* step){
@@ -167,6 +169,44 @@ int runTestSingleUserMode(NDBT_Context* ctx, NDBT_Step* step){
return result;
}
+int runTestApiSession(NDBT_Context* ctx, NDBT_Step* step)
+{
+ char *mgm= ctx->getRemoteMgm();
+ Uint64 session_id= 0;
+
+ NdbMgmHandle h;
+ h= ndb_mgm_create_handle();
+ ndb_mgm_set_connectstring(h, mgm);
+ ndb_mgm_connect(h,0,0,0);
+ int s= ndb_mgm_get_fd(h);
+ session_id= ndb_mgm_get_session_id(h);
+ ndbout << "MGM Session id: " << session_id << endl;
+ write(s,"get",3);
+ ndb_mgm_disconnect(h);
+ ndb_mgm_destroy_handle(&h);
+
+ struct NdbMgmSession sess;
+ int slen= sizeof(struct NdbMgmSession);
+
+ h= ndb_mgm_create_handle();
+ ndb_mgm_set_connectstring(h, mgm);
+ ndb_mgm_connect(h,0,0,0);
+
+ if(ndb_mgm_get_session(h,session_id,&sess,&slen))
+ {
+ ndbout << "Failed, session still exists" << endl;
+ ndb_mgm_disconnect(h);
+ ndb_mgm_destroy_handle(&h);
+ return NDBT_FAILED;
+ }
+ else
+ {
+ ndbout << "SUCCESS: session is gone" << endl;
+ ndb_mgm_disconnect(h);
+ ndb_mgm_destroy_handle(&h);
+ return NDBT_OK;
+ }
+}
NDBT_TESTSUITE(testMgm);
@@ -175,6 +215,11 @@ TESTCASE("SingleUserMode",
INITIALIZER(runTestSingleUserMode);
FINALIZER(runClearTable);
}
+TESTCASE("ApiSessionFailure",
+ "Test failures in MGMAPI session"){
+ INITIALIZER(runTestApiSession);
+
+}
NDBT_TESTSUITE_END(testMgm);
int main(int argc, const char** argv){
diff --git a/storage/ndb/test/ndbapi/testOIBasic.cpp b/storage/ndb/test/ndbapi/testOIBasic.cpp
index a752b9995f0..271f19a5477 100644
--- a/storage/ndb/test/ndbapi/testOIBasic.cpp
+++ b/storage/ndb/test/ndbapi/testOIBasic.cpp
@@ -47,7 +47,6 @@ struct Opt {
int m_die;
bool m_dups;
NdbDictionary::Object::FragmentType m_fragtype;
- unsigned m_subsubloop;
const char* m_index;
unsigned m_loop;
bool m_msglock;
@@ -56,6 +55,7 @@ struct Opt {
unsigned m_pctnull;
unsigned m_rows;
unsigned m_samples;
+ unsigned m_scanbatch;
unsigned m_scanpar;
unsigned m_scanstop;
int m_seed;
@@ -74,7 +74,6 @@ struct Opt {
m_die(0),
m_dups(false),
m_fragtype(NdbDictionary::Object::FragUndefined),
- m_subsubloop(4),
m_index(0),
m_loop(1),
m_msglock(true),
@@ -83,6 +82,7 @@ struct Opt {
m_pctnull(10),
m_rows(1000),
m_samples(0),
+ m_scanbatch(0),
m_scanpar(0),
m_scanstop(0),
m_seed(-1),
@@ -120,9 +120,10 @@ printhelp()
<< " -pctnull N pct NULL values in nullable column [" << d.m_pctnull << "]" << endl
<< " -rows N rows per thread [" << d.m_rows << "]" << endl
<< " -samples N samples for some timings (0=all) [" << d.m_samples << "]" << endl
- << " -scanpar N scan parallelism [" << d.m_scanpar << "]" << endl
+ << " -scanbatch N scan batch 0=default [" << d.m_scanbatch << "]" << endl
+ << " -scanpar N scan parallel 0=default [" << d.m_scanpar << "]" << endl
<< " -seed N srandom seed 0=loop number -1=random [" << d.m_seed << "]" << endl
- << " -subloop N subtest loop count [" << d.m_subloop << "]" << endl
+ << " -subloop N subtest (and subsubtest) loop count [" << d.m_subloop << "]" << endl
<< " -table xyz only given table numbers (digits 0-9)" << endl
<< " -threads N number of threads [" << d.m_threads << "]" << endl
<< " -vN verbosity [" << d.m_v << "]" << endl
@@ -294,6 +295,7 @@ struct Par : public Opt {
Set& set() const { assert(m_set != 0); return *m_set; }
Tmr* m_tmr;
Tmr& tmr() const { assert(m_tmr != 0); return *m_tmr; }
+ char m_currcase[2];
unsigned m_lno;
unsigned m_slno;
unsigned m_totrows;
@@ -302,6 +304,7 @@ struct Par : public Opt {
unsigned m_pctrange;
unsigned m_pctbrange;
int m_bdir;
+ bool m_noindexkeyupdate;
// choice of key
bool m_randomkey;
// do verify after read
@@ -330,6 +333,7 @@ struct Par : public Opt {
m_pctrange(40),
m_pctbrange(80),
m_bdir(0),
+ m_noindexkeyupdate(false),
m_randomkey(false),
m_verify(false),
m_deadlock(false),
@@ -337,7 +341,9 @@ struct Par : public Opt {
m_lockmode(NdbOperation::LM_Read),
m_tupscan(false),
m_ordered(false),
- m_descending(false) {
+ m_descending(false)
+ {
+ m_currcase[0] = 0;
}
};
@@ -892,6 +898,8 @@ struct Tab {
const Col** m_col;
unsigned m_itabs;
const ITab** m_itab;
+ unsigned m_orderedindexes;
+ unsigned m_hashindexes;
// pk must contain an Unsigned column
unsigned m_keycol;
void coladd(unsigned k, Col* colptr);
@@ -906,6 +914,8 @@ Tab::Tab(const char* name, unsigned cols, unsigned itabs, unsigned keycol) :
m_col(new const Col* [cols + 1]),
m_itabs(itabs),
m_itab(new const ITab* [itabs + 1]),
+ m_orderedindexes(0),
+ m_hashindexes(0),
m_keycol(keycol)
{
for (unsigned k = 0; k <= cols; k++)
@@ -935,8 +945,12 @@ Tab::coladd(unsigned k, Col* colptr)
void
Tab::itabadd(unsigned j, ITab* itabptr)
{
- assert(j < m_itabs && m_itab[j] == 0);
+ assert(j < m_itabs && m_itab[j] == 0 && itabptr != 0);
m_itab[j] = itabptr;
+ if (itabptr->m_type == ITab::OrderedIndex)
+ m_orderedindexes++;
+ else
+ m_hashindexes++;
}
static NdbOut&
@@ -1434,7 +1448,7 @@ Con::readTuples(Par par)
int scan_flags = 0;
if (par.m_tupscan)
scan_flags |= NdbScanOperation::SF_TupScan;
- CHKCON(m_scanop->readTuples(par.m_lockmode, scan_flags, par.m_scanpar) == 0, *this);
+ CHKCON(m_scanop->readTuples(par.m_lockmode, scan_flags, par.m_scanpar, par.m_scanbatch) == 0, *this);
return 0;
}
@@ -1442,7 +1456,12 @@ int
Con::readIndexTuples(Par par)
{
assert(m_tx != 0 && m_indexscanop != 0);
- CHKCON(m_indexscanop->readTuples(par.m_lockmode, 0, par.m_scanpar, par.m_ordered, par.m_descending) == 0, *this);
+ int scan_flags = 0;
+ if (par.m_ordered)
+ scan_flags |= NdbScanOperation::SF_OrderBy;
+ if (par.m_descending)
+ scan_flags |= NdbScanOperation::SF_Descending;
+ CHKCON(m_indexscanop->readTuples(par.m_lockmode, scan_flags, par.m_scanpar, par.m_scanbatch) == 0, *this);
return 0;
}
@@ -2193,7 +2212,7 @@ struct Row {
void copy(const Row& row2);
void calc(Par par, unsigned i, unsigned mask = 0);
const Row& dbrow() const;
- int verify(Par par, const Row& row2) const;
+ int verify(Par par, const Row& row2, bool pkonly) const;
int insrow(Par par);
int updrow(Par par);
int updrow(Par par, const ITab& itab);
@@ -2275,15 +2294,18 @@ Row::dbrow() const
}
int
-Row::verify(Par par, const Row& row2) const
+Row::verify(Par par, const Row& row2, bool pkonly) const
{
const Tab& tab = m_tab;
const Row& row1 = *this;
assert(&row1.m_tab == &row2.m_tab && row1.m_exist && row2.m_exist);
for (unsigned k = 0; k < tab.m_cols; k++) {
- const Val& val1 = *row1.m_val[k];
- const Val& val2 = *row2.m_val[k];
- CHK(val1.verify(par, val2) == 0);
+ const Col& col = row1.m_val[k]->m_col;
+ if (! pkonly || col.m_pk) {
+ const Val& val1 = *row1.m_val[k];
+ const Val& val2 = *row2.m_val[k];
+ CHK(val1.verify(par, val2) == 0);
+ }
}
return 0;
}
@@ -2585,8 +2607,11 @@ struct Set {
int getval(Par par);
int getkey(Par par, unsigned* i);
int putval(unsigned i, bool force, unsigned n = ~0);
+ // sort rows in-place according to ordered index
+ void sort(Par par, const ITab& itab);
+ void sort(Par par, const ITab& itab, unsigned lo, unsigned hi);
// verify
- int verify(Par par, const Set& set2) const;
+ int verify(Par par, const Set& set2, bool pkonly) const;
int verifyorder(Par par, const ITab& itab, bool descending) const;
// protect structure
NdbMutex* m_mutex;
@@ -2890,6 +2915,7 @@ Set::getkey(Par par, unsigned* i)
assert(m_rec[k] != 0);
const char* aRef = m_rec[k]->aRef();
Uint32 key = *(const Uint32*)aRef;
+ LL5("getkey: " << key);
CHK(key < m_rows);
*i = key;
return 0;
@@ -2922,8 +2948,43 @@ Set::putval(unsigned i, bool force, unsigned n)
return 0;
}
+void
+Set::sort(Par par, const ITab& itab)
+{
+ if (m_rows != 0)
+ sort(par, itab, 0, m_rows - 1);
+}
+
+void
+Set::sort(Par par, const ITab& itab, unsigned lo, unsigned hi)
+{
+ assert(lo < m_rows && hi < m_rows && lo <= hi);
+ Row* const p = m_row[lo];
+ unsigned i = lo;
+ unsigned j = hi;
+ while (i < j) {
+ while (i < j && m_row[j]->cmp(par, *p, itab) >= 0)
+ j--;
+ if (i < j) {
+ m_row[i] = m_row[j];
+ i++;
+ }
+ while (i < j && m_row[i]->cmp(par, *p, itab) <= 0)
+ i++;
+ if (i < j) {
+ m_row[j] = m_row[i];
+ j--;
+ }
+ }
+ m_row[i] = p;
+ if (lo < i)
+ sort(par, itab, lo, i - 1);
+ if (hi > i)
+ sort(par, itab, i + 1, hi);
+}
+
int
-Set::verify(Par par, const Set& set2) const
+Set::verify(Par par, const Set& set2, bool pkonly) const
{
assert(&m_tab == &set2.m_tab && m_rows == set2.m_rows);
LL4("verify set1 count=" << count() << " vs set2 count=" << set2.count());
@@ -2932,7 +2993,7 @@ Set::verify(Par par, const Set& set2) const
if (exist(i) != set2.exist(i)) {
ok = false;
} else if (exist(i)) {
- if (dbrow(i).verify(par, set2.dbrow(i)) != 0)
+ if (dbrow(i).verify(par, set2.dbrow(i), pkonly) != 0)
ok = false;
}
if (! ok) {
@@ -3490,7 +3551,7 @@ pkread(Par par)
con.closeTransaction();
}
if (par.m_verify)
- CHK(set1.verify(par, set2) == 0);
+ CHK(set1.verify(par, set2, false) == 0);
return 0;
}
@@ -3657,7 +3718,7 @@ hashindexread(Par par, const ITab& itab)
con.closeTransaction();
}
if (par.m_verify)
- CHK(set1.verify(par, set2) == 0);
+ CHK(set1.verify(par, set2, false) == 0);
return 0;
}
@@ -3698,7 +3759,7 @@ scanreadtable(Par par)
}
con.closeTransaction();
if (par.m_verify)
- CHK(set1.verify(par, set2) == 0);
+ CHK(set1.verify(par, set2, false) == 0);
LL3("scanread " << tab.m_name << " done rows=" << n);
return 0;
}
@@ -3730,6 +3791,23 @@ scanreadtablefast(Par par, unsigned countcheck)
return 0;
}
+// try to get interesting bounds
+static void
+calcscanbounds(Par par, const ITab& itab, BSet& bset, const Set& set, Set& set1)
+{
+ while (true) {
+ bset.calc(par);
+ bset.filter(par, set, set1);
+ unsigned n = set1.count();
+ // prefer proper subset
+ if (0 < n && n < set.m_rows)
+ break;
+ if (urandom(5) == 0)
+ break;
+ set1.reset();
+ }
+}
+
static int
scanreadindex(Par par, const ITab& itab, BSet& bset, bool calc)
{
@@ -3738,21 +3816,11 @@ scanreadindex(Par par, const ITab& itab, BSet& bset, bool calc)
const Set& set = par.set();
Set set1(tab, set.m_rows);
if (calc) {
- while (true) {
- bset.calc(par);
- bset.filter(par, set, set1);
- unsigned n = set1.count();
- // prefer proper subset
- if (0 < n && n < set.m_rows)
- break;
- if (urandom(3) == 0)
- break;
- set1.reset();
- }
+ calcscanbounds(par, itab, bset, set, set1);
} else {
bset.filter(par, set, set1);
}
- LL3("scanread " << itab.m_name << " " << bset << " lockmode=" << par.m_lockmode << " expect=" << set1.count() << " verify=" << par.m_verify << " ordered=" << par.m_ordered << " descending=" << par.m_descending);
+ LL3("scanread " << itab.m_name << " " << bset << " lockmode=" << par.m_lockmode << " expect=" << set1.count() << " ordered=" << par.m_ordered << " descending=" << par.m_descending << " verify=" << par.m_verify);
Set set2(tab, set.m_rows);
CHK(con.startTransaction() == 0);
CHK(con.getNdbIndexScanOperation(itab, tab) == 0);
@@ -3780,7 +3848,7 @@ scanreadindex(Par par, const ITab& itab, BSet& bset, bool calc)
}
con.closeTransaction();
if (par.m_verify) {
- CHK(set1.verify(par, set2) == 0);
+ CHK(set1.verify(par, set2, false) == 0);
if (par.m_ordered)
CHK(set2.verifyorder(par, itab, par.m_descending) == 0);
}
@@ -3825,17 +3893,7 @@ scanreadfilter(Par par, const ITab& itab, BSet& bset, bool calc)
const Set& set = par.set();
Set set1(tab, set.m_rows);
if (calc) {
- while (true) {
- bset.calc(par);
- bset.filter(par, set, set1);
- unsigned n = set1.count();
- // prefer proper subset
- if (0 < n && n < set.m_rows)
- break;
- if (urandom(3) == 0)
- break;
- set1.reset();
- }
+ calcscanbounds(par, itab, bset, set, set1);
} else {
bset.filter(par, set, set1);
}
@@ -3867,7 +3925,7 @@ scanreadfilter(Par par, const ITab& itab, BSet& bset, bool calc)
}
con.closeTransaction();
if (par.m_verify) {
- CHK(set1.verify(par, set2) == 0);
+ CHK(set1.verify(par, set2, false) == 0);
}
LL3("scanfilter " << itab.m_name << " done rows=" << n);
return 0;
@@ -3877,7 +3935,7 @@ static int
scanreadindex(Par par, const ITab& itab)
{
const Tab& tab = par.tab();
- for (unsigned i = 0; i < par.m_subsubloop; i++) {
+ for (unsigned i = 0; i < par.m_subloop; i++) {
if (itab.m_type == ITab::OrderedIndex) {
BSet bset(tab, itab, par.m_rows);
CHK(scanreadfilter(par, itab, bset, true) == 0);
@@ -4068,12 +4126,19 @@ out:
}
static int
-scanupdateindex(Par par, const ITab& itab, const BSet& bset)
+scanupdateindex(Par par, const ITab& itab, BSet& bset, bool calc)
{
Con& con = par.con();
const Tab& tab = par.tab();
Set& set = par.set();
- LL3("scan update " << itab.m_name);
+ // expected
+ Set set1(tab, set.m_rows);
+ if (calc) {
+ calcscanbounds(par, itab, bset, set, set1);
+ } else {
+ bset.filter(par, set, set1);
+ }
+ LL3("scan update " << itab.m_name << " " << bset << " expect=" << set1.count() << " ordered=" << par.m_ordered << " descending=" << par.m_descending << " verify=" << par.m_verify);
Set set2(tab, set.m_rows);
par.m_lockmode = NdbOperation::LM_Exclusive;
CHK(con.startTransaction() == 0);
@@ -4117,7 +4182,7 @@ scanupdateindex(Par par, const ITab& itab, const BSet& bset)
Par par2 = par;
par2.m_con = &con2;
set.dbsave(i);
- set.calc(par, i);
+ set.calc(par, i, ! par.m_noindexkeyupdate ? 0 : itab.m_colmask);
CHKTRY(set.setrow(par2, i) == 0, set.unlock());
LL4("scan update " << itab.m_name << ": " << row);
lst.push(i);
@@ -4131,6 +4196,7 @@ scanupdateindex(Par par, const ITab& itab, const BSet& bset)
goto out;
}
con2.closeTransaction();
+ LL4("scanupdateindex: committed batch [at 1]");
set.lock();
set.notpending(lst);
set.dbdiscard(lst);
@@ -4148,6 +4214,7 @@ scanupdateindex(Par par, const ITab& itab, const BSet& bset)
goto out;
}
con2.closeTransaction();
+ LL4("scanupdateindex: committed batch [at 2]");
set.lock();
set.notpending(lst);
set.dbdiscard(lst);
@@ -4160,6 +4227,11 @@ scanupdateindex(Par par, const ITab& itab, const BSet& bset)
}
out:
con2.closeTransaction();
+ if (par.m_verify) {
+ CHK(set1.verify(par, set2, true) == 0);
+ if (par.m_ordered)
+ CHK(set2.verifyorder(par, itab, par.m_descending) == 0);
+ }
LL3("scan update " << itab.m_name << " rows updated=" << count);
con.closeTransaction();
return 0;
@@ -4169,11 +4241,10 @@ static int
scanupdateindex(Par par, const ITab& itab)
{
const Tab& tab = par.tab();
- for (unsigned i = 0; i < par.m_subsubloop; i++) {
+ for (unsigned i = 0; i < par.m_subloop; i++) {
if (itab.m_type == ITab::OrderedIndex) {
BSet bset(tab, itab, par.m_rows);
- bset.calc(par);
- CHK(scanupdateindex(par, itab, bset) == 0);
+ CHK(scanupdateindex(par, itab, bset, true) == 0);
} else {
CHK(hashindexupdate(par, itab) == 0);
}
@@ -4205,22 +4276,6 @@ scanupdateall(Par par)
// medium level routines
static int
-readverify(Par par)
-{
- if (par.m_noverify)
- return 0;
- par.m_verify = true;
- if (par.m_abortpct != 0) {
- LL2("skip verify in this version"); // implement in 5.0 version
- par.m_verify = false;
- }
- par.m_lockmode = NdbOperation::LM_CommittedRead;
- CHK(pkread(par) == 0);
- CHK(scanreadall(par) == 0);
- return 0;
-}
-
-static int
readverifyfull(Par par)
{
if (par.m_noverify)
@@ -4277,7 +4332,7 @@ pkops(Par par)
{
const Tab& tab = par.tab();
par.m_randomkey = true;
- for (unsigned i = 0; i < par.m_subsubloop; i++) {
+ for (unsigned i = 0; i < par.m_subloop; i++) {
unsigned j = 0;
while (j < tab.m_itabs) {
if (tab.m_itab[j] != 0) {
@@ -4377,6 +4432,33 @@ mixedoperations(Par par)
}
static int
+parallelorderedupdate(Par par)
+{
+ const Tab& tab = par.tab();
+ unsigned k = 0;
+ for (unsigned i = 0; i < tab.m_itabs; i++) {
+ if (tab.m_itab[i] == 0)
+ continue;
+ const ITab& itab = *tab.m_itab[i];
+ if (itab.m_type != ITab::OrderedIndex)
+ continue;
+ // cannot sync threads yet except via subloop
+ if (k++ == par.m_slno % tab.m_orderedindexes) {
+ LL3("parallelorderedupdate: " << itab.m_name);
+ par.m_noindexkeyupdate = true;
+ par.m_ordered = true;
+ par.m_descending = (par.m_slno != 0);
+ par.m_verify = true;
+ BSet bset(tab, itab, par.m_rows); // empty bounds
+ // prefer empty bounds
+ unsigned sel = urandom(10);
+ CHK(scanupdateindex(par, itab, bset, sel < 2) == 0);
+ }
+ }
+ return 0;
+}
+
+static int
pkupdateindexbuild(Par par)
{
if (par.m_no == 0) {
@@ -4578,7 +4660,7 @@ getthrno()
static int
runstep(Par par, const char* fname, TFunc func, unsigned mode)
{
- LL2(fname);
+ LL2("step: " << fname);
const int threads = (mode & ST ? 1 : par.m_threads);
int n;
for (n = 0; n < threads; n++) {
@@ -4604,7 +4686,12 @@ runstep(Par par, const char* fname, TFunc func, unsigned mode)
return 0;
}
-#define RUNSTEP(par, func, mode) CHK(runstep(par, #func, func, mode) == 0)
+#define RUNSTEP(par, func, mode) \
+ CHK(runstep(par, #func, func, mode) == 0)
+
+#define SUBLOOP(par) \
+ "subloop: " << par.m_lno << "/" << par.m_currcase << "/" << \
+ par.m_tab->m_name << "/" << par.m_slno
static int
tbuild(Par par)
@@ -4613,6 +4700,7 @@ tbuild(Par par)
RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT);
for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
+ LL1(SUBLOOP(par));
if (par.m_slno % 3 == 0) {
RUNSTEP(par, createindex, ST);
RUNSTEP(par, invalidateindex, MT);
@@ -4630,7 +4718,7 @@ tbuild(Par par)
RUNSTEP(par, invalidateindex, MT);
}
RUNSTEP(par, readverifyfull, MT);
- // leave last one alone e.g. to continue manually
+ // leave last one
if (par.m_slno + 1 < par.m_subloop) {
RUNSTEP(par, pkdelete, MT);
RUNSTEP(par, readverifyfull, MT);
@@ -4651,7 +4739,7 @@ tindexscan(Par par)
RUNSTEP(par, pkinsert, MT);
RUNSTEP(par, readverifyfull, MT);
for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
- LL4("subloop " << par.m_slno);
+ LL1(SUBLOOP(par));
RUNSTEP(par, readverifyindex, MT);
}
return 0;
@@ -4667,6 +4755,7 @@ tpkops(Par par)
RUNSTEP(par, createindex, ST);
RUNSTEP(par, invalidateindex, MT);
for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
+ LL1(SUBLOOP(par));
RUNSTEP(par, pkops, MT);
LL2("rows=" << par.set().count());
RUNSTEP(par, readverifyfull, MT);
@@ -4683,13 +4772,14 @@ tpkopsread(Par par)
RUNSTEP(par, pkinsert, MT);
RUNSTEP(par, createindex, ST);
RUNSTEP(par, invalidateindex, MT);
- RUNSTEP(par, readverify, ST);
+ RUNSTEP(par, readverifyfull, MT);
for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
+ LL1(SUBLOOP(par));
RUNSTEP(par, pkupdatescanread, MT);
- RUNSTEP(par, readverify, ST);
+ RUNSTEP(par, readverifyfull, MT);
}
RUNSTEP(par, pkdelete, MT);
- RUNSTEP(par, readverify, ST);
+ RUNSTEP(par, readverifyfull, MT);
return 0;
}
@@ -4702,10 +4792,11 @@ tmixedops(Par par)
RUNSTEP(par, pkinsert, MT);
RUNSTEP(par, createindex, ST);
RUNSTEP(par, invalidateindex, MT);
- RUNSTEP(par, readverify, ST);
+ RUNSTEP(par, readverifyfull, MT);
for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
+ LL1(SUBLOOP(par));
RUNSTEP(par, mixedoperations, MT);
- RUNSTEP(par, readverify, ST);
+ RUNSTEP(par, readverifyfull, MT);
}
return 0;
}
@@ -4718,9 +4809,10 @@ tbusybuild(Par par)
RUNSTEP(par, invalidatetable, MT);
RUNSTEP(par, pkinsert, MT);
for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
+ LL1(SUBLOOP(par));
RUNSTEP(par, pkupdateindexbuild, MT);
RUNSTEP(par, invalidateindex, MT);
- RUNSTEP(par, readverify, ST);
+ RUNSTEP(par, readverifyfull, MT);
RUNSTEP(par, dropindex, ST);
}
return 0;
@@ -4736,10 +4828,29 @@ trollback(Par par)
RUNSTEP(par, pkinsert, MT);
RUNSTEP(par, createindex, ST);
RUNSTEP(par, invalidateindex, MT);
- RUNSTEP(par, readverify, ST);
+ RUNSTEP(par, readverifyfull, MT);
for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
+ LL1(SUBLOOP(par));
RUNSTEP(par, mixedoperations, MT);
- RUNSTEP(par, readverify, ST);
+ RUNSTEP(par, readverifyfull, MT);
+ }
+ return 0;
+}
+
+static int
+tparupdate(Par par)
+{
+ RUNSTEP(par, droptable, ST);
+ RUNSTEP(par, createtable, ST);
+ RUNSTEP(par, invalidatetable, MT);
+ RUNSTEP(par, pkinsert, MT);
+ RUNSTEP(par, createindex, ST);
+ RUNSTEP(par, invalidateindex, MT);
+ RUNSTEP(par, readverifyfull, MT);
+ for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
+ LL1(SUBLOOP(par));
+ RUNSTEP(par, parallelorderedupdate, MT);
+ RUNSTEP(par, readverifyfull, MT);
}
return 0;
}
@@ -4752,6 +4863,7 @@ ttimebuild(Par par)
RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT);
for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
+ LL1(SUBLOOP(par));
RUNSTEP(par, pkinsert, MT);
t1.on();
RUNSTEP(par, createindex, ST);
@@ -4771,6 +4883,7 @@ ttimemaint(Par par)
RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT);
for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
+ LL1(SUBLOOP(par));
RUNSTEP(par, pkinsert, MT);
t1.on();
RUNSTEP(par, pkupdate, MT);
@@ -4800,6 +4913,7 @@ ttimescan(Par par)
RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT);
for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
+ LL1(SUBLOOP(par));
RUNSTEP(par, pkinsert, MT);
RUNSTEP(par, createindex, ST);
par.m_tmr = &t1;
@@ -4826,6 +4940,7 @@ ttimepkread(Par par)
RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT);
for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
+ LL1(SUBLOOP(par));
RUNSTEP(par, pkinsert, MT);
RUNSTEP(par, createindex, ST);
par.m_tmr = &t1;
@@ -4867,6 +4982,7 @@ tcaselist[] = {
TCase("e", tmixedops, "pk operations and scan operations"),
TCase("f", tbusybuild, "pk operations and index build"),
TCase("g", trollback, "operations with random rollbacks"),
+ TCase("h", tparupdate, "parallel ordered update (bug20446)"),
TCase("t", ttimebuild, "time index build"),
TCase("u", ttimemaint, "time index maintenance"),
TCase("v", ttimescan, "time full scan table vs index on pk"),
@@ -4924,17 +5040,16 @@ printtables()
static int
runtest(Par par)
{
- LL1("start");
if (par.m_seed == -1) {
// good enough for daily run
- unsigned short seed = (getpid() ^ time(0));
- LL1("random seed: " << seed);
+ unsigned short seed = (unsigned short)getpid();
+ LL0("random seed: " << seed);
srandom((unsigned)seed);
} else if (par.m_seed != 0) {
- LL1("random seed: " << par.m_seed);
+ LL0("random seed: " << par.m_seed);
srandom(par.m_seed);
} else {
- LL1("random seed: loop number");
+ LL0("random seed: loop number");
}
// cs
assert(par.m_csname != 0);
@@ -4959,22 +5074,25 @@ runtest(Par par)
assert(thr.m_thread != 0);
}
for (par.m_lno = 0; par.m_loop == 0 || par.m_lno < par.m_loop; par.m_lno++) {
- LL1("loop " << par.m_lno);
- if (par.m_seed == 0)
+ LL1("loop: " << par.m_lno);
+ if (par.m_seed == 0) {
+ LL1("random seed: " << par.m_lno);
srandom(par.m_lno);
+ }
for (unsigned i = 0; i < tcasecount; i++) {
const TCase& tcase = tcaselist[i];
if (par.m_case != 0 && strchr(par.m_case, tcase.m_name[0]) == 0)
continue;
+ sprintf(par.m_currcase, "%c", tcase.m_name[0]);
makebuiltintables(par);
- LL1("case " << tcase.m_name << " - " << tcase.m_desc);
+ LL1("case: " << par.m_lno << "/" << tcase.m_name << " - " << tcase.m_desc);
for (unsigned j = 0; j < tabcount; j++) {
if (tablist[j] == 0)
continue;
const Tab& tab = *tablist[j];
par.m_tab = &tab;
par.m_set = new Set(tab, par.m_totrows);
- LL1("table " << tab.m_name);
+ LL1("table: " << par.m_lno << "/" << tcase.m_name << "/" << tab.m_name);
CHK(tcase.m_func(par) == 0);
delete par.m_set;
par.m_set = 0;
@@ -4993,15 +5111,21 @@ runtest(Par par)
delete [] g_thrlist;
g_thrlist = 0;
con.disconnect();
- LL1("done");
return 0;
}
-NDB_COMMAND(testOIBasic, "testOIBasic", "testOIBasic", "testOIBasic", 65535)
+static const char* g_progname = "testOIBasic";
+
+int
+main(int argc, char** argv)
{
ndb_init();
- if (ndbout_mutex == NULL)
- ndbout_mutex = NdbMutex_Create();
+ unsigned i;
+ ndbout << g_progname;
+ for (i = 1; i < argc; i++)
+ ndbout << " " << argv[i];
+ ndbout << endl;
+ ndbout_mutex = NdbMutex_Create();
while (++argv, --argc > 0) {
const char* arg = argv[0];
if (*arg != '-') {
@@ -5111,6 +5235,12 @@ NDB_COMMAND(testOIBasic, "testOIBasic", "testOIBasic", "testOIBasic", 65535)
continue;
}
}
+ if (strcmp(arg, "-scanbatch") == 0) {
+ if (++argv, --argc > 0) {
+ g_opt.m_scanbatch = atoi(argv[0]);
+ continue;
+ }
+ }
if (strcmp(arg, "-scanpar") == 0) {
if (++argv, --argc > 0) {
g_opt.m_scanpar = atoi(argv[0]);
diff --git a/storage/ndb/test/ndbapi/testTimeout.cpp b/storage/ndb/test/ndbapi/testTimeout.cpp
index 36fb34a50e2..e719cdf03e9 100644
--- a/storage/ndb/test/ndbapi/testTimeout.cpp
+++ b/storage/ndb/test/ndbapi/testTimeout.cpp
@@ -388,6 +388,45 @@ int runBuddyTransNoTimeout(NDBT_Context* ctx, NDBT_Step* step){
return result;
}
+int runBuddyTransTimeout(NDBT_Context* ctx, NDBT_Step* step){
+ int result = NDBT_OK;
+ int loops = ctx->getNumLoops();
+ int records = ctx->getNumRecords();
+ int stepNo = step->getStepNo();
+ ndbout << "TransactionInactiveTimeout="<< TIMEOUT <<endl;
+
+ HugoOperations hugoOps(*ctx->getTab());
+ Ndb* pNdb = GETNDB(step);
+
+ for (int l = 1; l < loops && result == NDBT_OK; l++){
+
+ NdbTransaction* pTrans = 0;
+ do{
+ pTrans = pNdb->startTransaction();
+ NdbScanOperation* pOp = pTrans->getNdbScanOperation(ctx->getTab());
+ CHECK(pOp->readTuples(NdbOperation::LM_Read, 0, 0, 1) == 0);
+ CHECK(pTrans->execute(NoCommit) == 0);
+
+ int sleep = 2 * TIMEOUT;
+ ndbout << "Sleeping for " << sleep << " milliseconds" << endl;
+ NdbSleep_MilliSleep(sleep);
+
+ int res = 0;
+ while((res = pOp->nextResult()) == 0);
+ ndbout_c("res: %d", res);
+ CHECK(res == -1);
+
+ } while(false);
+
+ if (pTrans)
+ {
+ pTrans->close();
+ }
+ }
+
+ return result;
+}
+
int
runError4012(NDBT_Context* ctx, NDBT_Step* step){
int result = NDBT_OK;
@@ -495,6 +534,15 @@ TESTCASE("BuddyTransNoTimeout5",
FINALIZER(resetTransactionTimeout);
FINALIZER(runClearTable);
}
+TESTCASE("BuddyTransTimeout1",
+ "Start a scan and check that it gets aborted"){
+ INITIALIZER(runLoadTable);
+ INITIALIZER(setTransactionTimeout);
+ STEPS(runBuddyTransTimeout, 1);
+ FINALIZER(resetTransactionTimeout);
+ FINALIZER(runClearTable);
+}
+#if 0
TESTCASE("Error4012", ""){
TC_PROPERTY("TransactionDeadlockTimeout", 120000);
INITIALIZER(runLoadTable);
@@ -503,7 +551,7 @@ TESTCASE("Error4012", ""){
STEPS(runError4012, 2);
FINALIZER(runClearTable);
}
-
+#endif
NDBT_TESTSUITE_END(testTimeout);
int main(int argc, const char** argv){
diff --git a/storage/ndb/test/run-test/daily-basic-tests.txt b/storage/ndb/test/run-test/daily-basic-tests.txt
index ee669ba5f1c..f9551c8d526 100644
--- a/storage/ndb/test/run-test/daily-basic-tests.txt
+++ b/storage/ndb/test/run-test/daily-basic-tests.txt
@@ -646,7 +646,15 @@ args:
max-time: 5000
cmd: testOIBasic
-args:
+args: -case abcdefz
+
+max-time: 2000
+cmd: testOIBasic
+args: -case gz
+
+max-time: 2000
+cmd: testOIBasic
+args: -case hz
max-time: 2500
cmd: testBitfield
diff --git a/storage/ndb/tools/delete_all.cpp b/storage/ndb/tools/delete_all.cpp
index fcf9b425bd0..95c3c7be0ce 100644
--- a/storage/ndb/tools/delete_all.cpp
+++ b/storage/ndb/tools/delete_all.cpp
@@ -27,8 +27,12 @@ static int clear_table(Ndb* pNdb, const NdbDictionary::Table* pTab,
NDB_STD_OPTS_VARS;
+const char *load_default_groups[]= { "mysql_cluster",0 };
+
static const char* _dbname = "TEST_DB";
static my_bool _transactional = false;
+static my_bool _tupscan = 0;
+static my_bool _diskscan = 0;
static struct my_option my_long_options[] =
{
NDB_STD_OPTS("ndb_desc"),
@@ -38,6 +42,12 @@ static struct my_option my_long_options[] =
{ "transactional", 't', "Single transaction (may run out of operations)",
(gptr*) &_transactional, (gptr*) &_transactional, 0,
GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
+ { "tupscan", 999, "Run tupscan",
+ (gptr*) &_tupscan, (gptr*) &_tupscan, 0,
+ GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
+ { "diskscan", 999, "Run diskcan",
+ (gptr*) &_diskscan, (gptr*) &_diskscan, 0,
+ GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
{ 0, 0, 0, 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}
};
static void usage()
@@ -46,13 +56,14 @@ static void usage()
"tabname\n"\
"This program will delete all records in the specified table using scan delete.\n";
ndb_std_print_version();
+ print_defaults(MYSQL_CONFIG_NAME,load_default_groups);
+ puts("");
my_print_help(my_long_options);
my_print_variables(my_long_options);
}
int main(int argc, char** argv){
NDB_INIT(argv[0]);
- const char *load_default_groups[]= { "mysql_cluster",0 };
load_defaults("my",load_default_groups,&argc,&argv);
int ho_error;
#ifndef DBUG_OFF
@@ -139,8 +150,11 @@ int clear_table(Ndb* pNdb, const NdbDictionary::Table* pTab,
goto failed;
}
+ int flags = 0;
+ flags |= _tupscan ? NdbScanOperation::SF_TupScan : 0;
+ flags |= _diskscan ? NdbScanOperation::SF_DiskScan : 0;
if( pOp->readTuples(NdbOperation::LM_Exclusive,
- NdbScanOperation::SF_TupScan, par) ) {
+ flags, par) ) {
goto failed;
}
diff --git a/storage/ndb/tools/desc.cpp b/storage/ndb/tools/desc.cpp
index 49f188d12c0..c042f745d9d 100644
--- a/storage/ndb/tools/desc.cpp
+++ b/storage/ndb/tools/desc.cpp
@@ -32,6 +32,9 @@ NDB_STD_OPTS_VARS;
static const char* _dbname = "TEST_DB";
static int _unqualified = 0;
static int _partinfo = 0;
+
+const char *load_default_groups[]= { "mysql_cluster",0 };
+
static int _retries = 0;
static struct my_option my_long_options[] =
{
@@ -57,6 +60,8 @@ static void usage()
"This program list all properties of table(s) in NDB Cluster.\n"\
" ex: desc T1 T2 T4\n";
ndb_std_print_version();
+ print_defaults(MYSQL_CONFIG_NAME,load_default_groups);
+ puts("");
my_print_help(my_long_options);
my_print_variables(my_long_options);
}
@@ -65,7 +70,6 @@ static void print_part_info(Ndb* pNdb, NDBT_Table* pTab);
int main(int argc, char** argv){
NDB_INIT(argv[0]);
- const char *load_default_groups[]= { "mysql_cluster",0 };
load_defaults("my",load_default_groups,&argc,&argv);
int ho_error;
#ifndef DBUG_OFF
diff --git a/storage/ndb/tools/drop_index.cpp b/storage/ndb/tools/drop_index.cpp
index 24116f22784..aa207212dbe 100644
--- a/storage/ndb/tools/drop_index.cpp
+++ b/storage/ndb/tools/drop_index.cpp
@@ -24,6 +24,9 @@
NDB_STD_OPTS_VARS;
static const char* _dbname = "TEST_DB";
+
+const char *load_default_groups[]= { "mysql_cluster",0 };
+
static struct my_option my_long_options[] =
{
NDB_STD_OPTS("ndb_desc"),
@@ -38,13 +41,14 @@ static void usage()
"[<table> <index>]+\n"\
"This program will drop index(es) in Ndb\n";
ndb_std_print_version();
+ print_defaults(MYSQL_CONFIG_NAME,load_default_groups);
+ puts("");
my_print_help(my_long_options);
my_print_variables(my_long_options);
}
int main(int argc, char** argv){
NDB_INIT(argv[0]);
- const char *load_default_groups[]= { "mysql_cluster",0 };
load_defaults("my",load_default_groups,&argc,&argv);
int ho_error;
#ifndef DBUG_OFF
diff --git a/storage/ndb/tools/drop_tab.cpp b/storage/ndb/tools/drop_tab.cpp
index 991e1505486..d14c60a2c6d 100644
--- a/storage/ndb/tools/drop_tab.cpp
+++ b/storage/ndb/tools/drop_tab.cpp
@@ -24,6 +24,9 @@
NDB_STD_OPTS_VARS;
static const char* _dbname = "TEST_DB";
+
+const char *load_default_groups[]= { "mysql_cluster",0 };
+
static struct my_option my_long_options[] =
{
NDB_STD_OPTS("ndb_desc"),
@@ -37,14 +40,15 @@ static void usage()
char desc[] =
"tabname\n"\
"This program will drop one table in Ndb\n";
- ndb_std_print_version();
+ ndb_std_print_version();
+ print_defaults(MYSQL_CONFIG_NAME,load_default_groups);
+ puts("");
my_print_help(my_long_options);
my_print_variables(my_long_options);
}
int main(int argc, char** argv){
NDB_INIT(argv[0]);
- const char *load_default_groups[]= { "mysql_cluster",0 };
load_defaults("my",load_default_groups,&argc,&argv);
int ho_error;
#ifndef DBUG_OFF
diff --git a/storage/ndb/tools/listTables.cpp b/storage/ndb/tools/listTables.cpp
index 0e32d802d2d..3c891041879 100644
--- a/storage/ndb/tools/listTables.cpp
+++ b/storage/ndb/tools/listTables.cpp
@@ -34,6 +34,8 @@ static int _unqualified = 0;
static int _parsable = 0;
static int show_temp_status = 0;
+const char *load_default_groups[]= { "mysql_cluster",0 };
+
static void
fatal(char const* fmt, ...)
{
@@ -284,6 +286,8 @@ static void usage()
"To show all indexes for a table write table name as final argument\n"\
" ex: ndb_show_tables T1\n";
ndb_std_print_version();
+ print_defaults(MYSQL_CONFIG_NAME,load_default_groups);
+ puts("");
my_print_help(my_long_options);
my_print_variables(my_long_options);
}
@@ -291,7 +295,6 @@ static void usage()
int main(int argc, char** argv){
NDB_INIT(argv[0]);
const char* _tabname;
- const char *load_default_groups[]= { "mysql_cluster",0 };
load_defaults("my",load_default_groups,&argc,&argv);
int ho_error;
#ifndef DBUG_OFF
diff --git a/storage/ndb/tools/ndb_condig.cpp b/storage/ndb/tools/ndb_condig.cpp
index 865c35b9042..8b862391c8e 100644
--- a/storage/ndb/tools/ndb_condig.cpp
+++ b/storage/ndb/tools/ndb_condig.cpp
@@ -19,6 +19,8 @@
*/
#include <ndb_global.h>
+#include <ndb_opts.h>
+
#include <my_sys.h>
#include <my_getopt.h>
#include <mysql_version.h>
@@ -29,6 +31,7 @@
#include <mgmapi.h>
#include <mgmapi_configuration.hpp>
#include <ConfigInfo.hpp>
+#include <NdbAutoPtr.hpp>
static int g_verbose = 0;
static int try_reconnect = 3;
@@ -45,34 +48,17 @@ static const char * g_row_delimiter=" ";
static const char * g_config_file = 0;
static int g_mycnf = 0;
-int g_print_full_config, opt_ndb_shm;
-my_bool opt_core;
+const char *load_default_groups[]= { "mysql_cluster",0 };
-typedef ndb_mgm_configuration_iterator Iter;
+NDB_STD_OPTS_VARS;
-static void ndb_std_print_version()
-{
- printf("MySQL distrib %s, for %s (%s)\n",
- MYSQL_SERVER_VERSION,SYSTEM_TYPE,MACHINE_TYPE);
-}
+int g_print_full_config;
+
+typedef ndb_mgm_configuration_iterator Iter;
static struct my_option my_long_options[] =
{
- { "usage", '?', "Display this help and exit.",
- 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 },
- { "help", '?', "Display this help and exit.",
- 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 },
- { "version", 'V', "Output version information and exit.", 0, 0, 0,
- GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 },
- { "ndb-connectstring", 256,
- "Set connect string for connecting to ndb_mgmd. "
- "Syntax: \"[nodeid=<id>;][host=]<hostname>[:<port>]\". "
- "Overrides specifying entries in NDB_CONNECTSTRING and my.cnf",
- (gptr*) &g_connectstring, (gptr*) &g_connectstring,
- 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
- { "ndb-shm", 256, "Print nodes",
- (gptr*) &opt_ndb_shm, (gptr*) &opt_ndb_shm,
- 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0},
+ NDB_STD_OPTS("ndb_config"),
{ "nodes", 256, "Print nodes",
(gptr*) &g_nodes, (gptr*) &g_nodes,
0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0},
@@ -114,24 +100,11 @@ static void usage()
char desc[] =
"This program will retreive config options for a ndb cluster\n";
ndb_std_print_version();
+ print_defaults(MYSQL_CONFIG_NAME,load_default_groups);
+ puts("");
my_print_help(my_long_options);
my_print_variables(my_long_options);
}
-static my_bool
-ndb_std_get_one_option(int optid,
- const struct my_option *opt __attribute__((unused)),
- char *argument)
-{
- switch (optid) {
- case 'V':
- ndb_std_print_version();
- exit(0);
- case '?':
- usage();
- exit(0);
- }
- return 0;
-}
/**
* Match/Apply framework
@@ -176,7 +149,6 @@ static ndb_mgm_configuration* load_configuration();
int
main(int argc, char** argv){
NDB_INIT(argv[0]);
- const char *load_default_groups[]= { "mysql_cluster",0 };
load_defaults("my",load_default_groups,&argc,&argv);
int ho_error;
if ((ho_error=handle_options(&argc, &argv, my_long_options,
@@ -408,28 +380,36 @@ HostMatch::eval(const Iter& iter)
if(iter.get(m_key, &valc) == 0)
{
- struct hostent *h1, *h2;
+ struct hostent *h1, *h2, copy1;
+ char *addr1;
h1 = gethostbyname(m_value.c_str());
if (h1 == NULL) {
return 0;
}
+ // gethostbyname returns a pointer to a static structure
+ // so we need to copy the results before doing the next call
+ memcpy(&copy1, h1, sizeof(struct hostent));
+ addr1 = (char *)malloc(copy1.h_length);
+ NdbAutoPtr<char> tmp_aptr(addr1);
+ memcpy(addr1, h1->h_addr, copy1.h_length);
+
h2 = gethostbyname(valc);
if (h2 == NULL) {
return 0;
}
- if (h1->h_addrtype != h2->h_addrtype) {
+ if (copy1.h_addrtype != h2->h_addrtype) {
return 0;
}
- if (h1->h_length != h2->h_length)
+ if (copy1.h_length != h2->h_length)
{
return 0;
}
- return 0 == memcmp(h1->h_addr, h2->h_addr, h1->h_length);
+ return 0 == memcmp(addr1, h2->h_addr, copy1.h_length);
}
return 0;
diff --git a/storage/ndb/tools/restore/restore_main.cpp b/storage/ndb/tools/restore/restore_main.cpp
index 8070dba9bf6..c6947f3bf01 100644
--- a/storage/ndb/tools/restore/restore_main.cpp
+++ b/storage/ndb/tools/restore/restore_main.cpp
@@ -60,6 +60,8 @@ static int _restore_meta = 0;
static int _no_restore_disk = 0;
BaseString g_options("ndb_restore");
+const char *load_default_groups[]= { "mysql_cluster","ndb_restore",0 };
+
static struct my_option my_long_options[] =
{
NDB_STD_OPTS("ndb_restore"),
@@ -240,6 +242,8 @@ static void usage()
{
short_usage_sub();
ndb_std_print_version();
+ print_defaults(MYSQL_CONFIG_NAME,load_default_groups);
+ puts("");
my_print_help(my_long_options);
my_print_variables(my_long_options);
}
@@ -686,10 +690,9 @@ main(int argc, char** argv)
clearConsumers();
ndbout_c("\nRestore successful, but encountered temporary error, "
"please look at configuration.");
- return NDBT_ProgramExit(NDBT_TEMPORARY);
}
}
-
+
clearConsumers();
return NDBT_ProgramExit(NDBT_OK);
} // main
diff --git a/storage/ndb/tools/select_all.cpp b/storage/ndb/tools/select_all.cpp
index fecb55cf734..87a083dad90 100644
--- a/storage/ndb/tools/select_all.cpp
+++ b/storage/ndb/tools/select_all.cpp
@@ -43,6 +43,8 @@ static const char* _delimiter = "\t";
static int _unqualified, _header, _parallelism, _useHexFormat, _lock,
_order, _descending;
+const char *load_default_groups[]= { "mysql_cluster",0 };
+
static int _tup = 0;
static int _dumpDisk = 0;
static int use_rowid = 0;
@@ -103,13 +105,14 @@ static void usage()
"It can also be used to dump the content of a table to file \n"\
" ex: select_all --no-header --delimiter=';' T4 > T4.data\n";
ndb_std_print_version();
+ print_defaults(MYSQL_CONFIG_NAME,load_default_groups);
+ puts("");
my_print_help(my_long_options);
my_print_variables(my_long_options);
}
int main(int argc, char** argv){
NDB_INIT(argv[0]);
- const char *load_default_groups[]= { "mysql_cluster",0 };
load_defaults("my",load_default_groups,&argc,&argv);
const char* _tabname;
int ho_error;
diff --git a/storage/ndb/tools/select_count.cpp b/storage/ndb/tools/select_count.cpp
index 6fa3c77f15a..0d5d4684878 100644
--- a/storage/ndb/tools/select_count.cpp
+++ b/storage/ndb/tools/select_count.cpp
@@ -37,6 +37,9 @@ NDB_STD_OPTS_VARS;
static const char* _dbname = "TEST_DB";
static int _parallelism = 240;
static int _lock = 0;
+
+const char *load_default_groups[]= { "mysql_cluster",0 };
+
static struct my_option my_long_options[] =
{
NDB_STD_OPTS("ndb_desc"),
@@ -57,13 +60,14 @@ static void usage()
"tabname1 ... tabnameN\n"\
"This program will count the number of records in tables\n";
ndb_std_print_version();
+ print_defaults(MYSQL_CONFIG_NAME,load_default_groups);
+ puts("");
my_print_help(my_long_options);
my_print_variables(my_long_options);
}
int main(int argc, char** argv){
NDB_INIT(argv[0]);
- const char *load_default_groups[]= { "mysql_cluster",0 };
load_defaults("my",load_default_groups,&argc,&argv);
int ho_error;
#ifndef DBUG_OFF
diff --git a/storage/ndb/tools/waiter.cpp b/storage/ndb/tools/waiter.cpp
index e3d8733b0ed..c34463fba0e 100644
--- a/storage/ndb/tools/waiter.cpp
+++ b/storage/ndb/tools/waiter.cpp
@@ -38,6 +38,9 @@ NDB_STD_OPTS_VARS;
static int _no_contact = 0;
static int _not_started = 0;
static int _timeout = 120;
+
+const char *load_default_groups[]= { "mysql_cluster",0 };
+
static struct my_option my_long_options[] =
{
NDB_STD_OPTS("ndb_desc"),
@@ -56,13 +59,14 @@ static struct my_option my_long_options[] =
static void usage()
{
ndb_std_print_version();
+ print_defaults(MYSQL_CONFIG_NAME,load_default_groups);
+ puts("");
my_print_help(my_long_options);
my_print_variables(my_long_options);
}
int main(int argc, char** argv){
NDB_INIT(argv[0]);
- const char *load_default_groups[]= { "mysql_cluster",0 };
load_defaults("my",load_default_groups,&argc,&argv);
const char* _hostName = NULL;
int ho_error;