diff options
47 files changed, 1101 insertions, 250 deletions
diff --git a/ndb/include/kernel/signaldata/CreateEvnt.hpp b/ndb/include/kernel/signaldata/CreateEvnt.hpp index 72dab96f8b6..8712ce8890c 100644 --- a/ndb/include/kernel/signaldata/CreateEvnt.hpp +++ b/ndb/include/kernel/signaldata/CreateEvnt.hpp @@ -368,7 +368,6 @@ struct CreateEvntRef { EventNameTooLong = 4708, EventNameExists = 746, EventNotFound = 4731, - AttributeNotStored = 4245, AttributeNullable = 4246, BadRequestType = 4247, InvalidName = 4248, diff --git a/ndb/include/kernel/signaldata/CreateIndx.hpp b/ndb/include/kernel/signaldata/CreateIndx.hpp index bb099533301..a9dc653f349 100644 --- a/ndb/include/kernel/signaldata/CreateIndx.hpp +++ b/ndb/include/kernel/signaldata/CreateIndx.hpp @@ -198,7 +198,6 @@ public: IndexNameTooLong = 4241, TooManyIndexes = 4242, IndexExists = 4244, - AttributeNotStored = 4245, AttributeNullable = 4246, BadRequestType = 4247, InvalidName = 4248, diff --git a/ndb/include/kernel/signaldata/SumaImpl.hpp b/ndb/include/kernel/signaldata/SumaImpl.hpp index 89ade067dcd..75fb65e1ad2 100644 --- a/ndb/include/kernel/signaldata/SumaImpl.hpp +++ b/ndb/include/kernel/signaldata/SumaImpl.hpp @@ -592,11 +592,11 @@ public: Uint32 subscriptionId; Uint32 subscriptionKey; - Uint32 err; union { // Haven't decide what to call it Uint32 senderData; Uint32 subscriberData; }; + Uint32 err; }; class SumaStartMe { diff --git a/ndb/include/mgmapi/mgmapi.h b/ndb/include/mgmapi/mgmapi.h index 17c227853fe..dca88e4950e 100644 --- a/ndb/include/mgmapi/mgmapi.h +++ b/ndb/include/mgmapi/mgmapi.h @@ -140,7 +140,6 @@ */ #include <ndb_types.h> -#include <NdbTCP.h> #include "ndb_logevent.h" #include "mgmapi_config_parameters.h" @@ -987,15 +986,9 @@ extern "C" { /** - * Convert connection to transporter - * @param handle NDB management handle. - * - * @return socket - * - * @note the socket is now able to be used as a transporter connection + * Get the node id of the mgm server we're connected to */ - NDB_SOCKET_TYPE ndb_mgm_convert_to_transporter(NdbMgmHandle handle); - + Uint32 ndb_mgm_get_mgmd_nodeid(NdbMgmHandle handle); /** * Config iterator diff --git a/ndb/include/mgmapi/mgmapi_debug.h b/ndb/include/mgmapi/mgmapi_debug.h index 32a89535456..e86d9d4b768 100644 --- a/ndb/include/mgmapi/mgmapi_debug.h +++ b/ndb/include/mgmapi/mgmapi_debug.h @@ -132,42 +132,6 @@ extern "C" { const char * value, struct ndb_mgm_reply* reply); - /** - * Set an integer parameter for a connection - * - * @param handle the NDB management handle. - * @param node1 the node1 id - * @param node2 the node2 id - * @param param the parameter (e.g. CFG_CONNECTION_SERVER_PORT) - * @param value what to set it to - * @param reply from ndb_mgmd - */ - int ndb_mgm_set_connection_int_parameter(NdbMgmHandle handle, - int node1, - int node2, - int param, - int value, - struct ndb_mgm_reply* reply); - - /** - * Get an integer parameter for a connection - * - * @param handle the NDB management handle. - * @param node1 the node1 id - * @param node2 the node2 id - * @param param the parameter (e.g. CFG_CONNECTION_SERVER_PORT) - * @param value where to store the retreived value. In the case of - * error, value is not changed. - * @param reply from ndb_mgmd - * @return 0 on success. < 0 on error. - */ - int ndb_mgm_get_connection_int_parameter(NdbMgmHandle handle, - int node1, - int node2, - int param, - int *value, - struct ndb_mgm_reply* reply); - #ifdef __cplusplus } #endif diff --git a/ndb/include/mgmcommon/ConfigRetriever.hpp b/ndb/include/mgmcommon/ConfigRetriever.hpp index b91bb362837..95d257dea23 100644 --- a/ndb/include/mgmcommon/ConfigRetriever.hpp +++ b/ndb/include/mgmcommon/ConfigRetriever.hpp @@ -76,6 +76,7 @@ public: const char *get_mgmd_host() const; const char *get_connectstring(char *buf, int buf_sz) const; NdbMgmHandle get_mgmHandle() { return m_handle; }; + NdbMgmHandle* get_mgmHandlePtr() { return &m_handle; }; Uint32 get_configuration_nodeid() const; private: diff --git a/ndb/include/ndbapi/NdbRecAttr.hpp b/ndb/include/ndbapi/NdbRecAttr.hpp index d1c29adef4f..50de4f3277e 100644 --- a/ndb/include/ndbapi/NdbRecAttr.hpp +++ b/ndb/include/ndbapi/NdbRecAttr.hpp @@ -252,6 +252,7 @@ private: Uint32 attrId() const; /* Get attribute id */ bool setNULL(); /* Set NULL indicator */ + void setUNDEFINED(); /* Set UNDEFINED indicator */ bool receive_data(const Uint32*, Uint32); void release(); /* Release memory if allocated */ @@ -423,6 +424,13 @@ NdbRecAttr::setNULL() } inline +void +NdbRecAttr::setUNDEFINED() +{ + theNULLind = -1; +} + +inline int NdbRecAttr::isNULL() const { diff --git a/ndb/include/transporter/TransporterRegistry.hpp b/ndb/include/transporter/TransporterRegistry.hpp index a31fa1d5ce2..363cdabe10a 100644 --- a/ndb/include/transporter/TransporterRegistry.hpp +++ b/ndb/include/transporter/TransporterRegistry.hpp @@ -116,12 +116,21 @@ public: */ bool connect_server(NDB_SOCKET_TYPE sockfd); + bool connect_client(NdbMgmHandle *h); + /** - * use a mgmd connection to connect as a transporter + * Given a SocketClient, creates a NdbMgmHandle, turns it into a transporter + * and returns the socket. */ NDB_SOCKET_TYPE connect_ndb_mgmd(SocketClient *sc); /** + * Given a connected NdbMgmHandle, turns it into a transporter + * and returns the socket. + */ + NDB_SOCKET_TYPE connect_ndb_mgmd(NdbMgmHandle *h); + + /** * Remove all transporters */ void removeAll(); diff --git a/ndb/ndbapi-examples/ndbapi_async_example/Makefile b/ndb/ndbapi-examples/ndbapi_async_example/Makefile index 4df9367fc29..55e4a13343f 100644 --- a/ndb/ndbapi-examples/ndbapi_async_example/Makefile +++ b/ndb/ndbapi-examples/ndbapi_async_example/Makefile @@ -10,11 +10,11 @@ TOP_SRCDIR = ../../.. INCLUDE_DIR = $(TOP_SRCDIR) LIB_DIR = -L$(TOP_SRCDIR)/ndb/src/.libs \ -L$(TOP_SRCDIR)/libmysql_r/.libs \ - -L$(TOP_SRCDIR)/mysys + -L$(TOP_SRCDIR)/mysys -L$(TOP_SRCDIR)/strings SYS_LIB = $(TARGET): $(OBJS) - $(CXX) $(CXXFLAGS) $(LFLAGS) $(LIB_DIR) $(OBJS) -lndbclient -lmysqlclient_r -lmysys -lz $(SYS_LIB) -o $(TARGET) + $(CXX) $(CXXFLAGS) $(LFLAGS) $(LIB_DIR) $(OBJS) -lndbclient -lmysqlclient_r -lmysys -lmystrings -lz $(SYS_LIB) -o $(TARGET) $(TARGET).o: $(SRCS) $(CXX) $(CFLAGS) -I$(INCLUDE_DIR)/include -I$(INCLUDE_DIR)/extra -I$(INCLUDE_DIR)/ndb/include -I$(INCLUDE_DIR)/ndb/include/ndbapi $(SRCS) diff --git a/ndb/ndbapi-examples/ndbapi_async_example1/Makefile b/ndb/ndbapi-examples/ndbapi_async_example1/Makefile index b6fc31a00e5..7f6ea0b4d25 100644 --- a/ndb/ndbapi-examples/ndbapi_async_example1/Makefile +++ b/ndb/ndbapi-examples/ndbapi_async_example1/Makefile @@ -8,11 +8,11 @@ LFLAGS = -Wall INCLUDE_DIR = ../../include LIB_DIR = -L../../src/.libs \ -L../../../libmysql_r/.libs \ - -L../../../mysys + -L../../../mysys -L../../../strings SYS_LIB = $(TARGET): $(OBJS) - $(CXX) $(LFLAGS) $(LIB_DIR) $(OBJS) -lndbclient -lmysqlclient_r -lmysys -lz $(SYS_LIB) -o $(TARGET) + $(CXX) $(LFLAGS) $(LIB_DIR) $(OBJS) -lndbclient -lmysqlclient_r -lmysys -lmystrings -lz $(SYS_LIB) -o $(TARGET) $(TARGET).o: $(SRCS) $(CXX) $(CFLAGS) -I$(INCLUDE_DIR) -I$(INCLUDE_DIR)/ndbapi $(SRCS) diff --git a/ndb/ndbapi-examples/ndbapi_event_example/Makefile b/ndb/ndbapi-examples/ndbapi_event_example/Makefile index 07d244c9346..12e109c654f 100644 --- a/ndb/ndbapi-examples/ndbapi_event_example/Makefile +++ b/ndb/ndbapi-examples/ndbapi_event_example/Makefile @@ -10,11 +10,11 @@ TOP_SRCDIR = ../../.. INCLUDE_DIR = $(TOP_SRCDIR)/ndb/include LIB_DIR = -L$(TOP_SRCDIR)/ndb/src/.libs \ -L$(TOP_SRCDIR)/libmysql_r/.libs \ - -L$(TOP_SRCDIR)/mysys + -L$(TOP_SRCDIR)/mysys -L$(TOP_SRCDIR)/strings SYS_LIB = $(TARGET): $(OBJS) - $(CXX) $(CXXFLAGS) $(LFLAGS) $(LIB_DIR) $(OBJS) -lndbclient -lmysqlclient_r -lmysys -lz $(SYS_LIB) -o $(TARGET) + $(CXX) $(CXXFLAGS) $(LFLAGS) $(LIB_DIR) $(OBJS) -lndbclient -lmysqlclient_r -lmysys -lmystrings -lz $(SYS_LIB) -o $(TARGET) $(TARGET).o: $(SRCS) $(CXX) $(CFLAGS) -I$(INCLUDE_DIR) -I$(INCLUDE_DIR)/ndbapi $(SRCS) diff --git a/ndb/ndbapi-examples/ndbapi_retries_example/Makefile b/ndb/ndbapi-examples/ndbapi_retries_example/Makefile index c7a8946cd9a..829a7009031 100644 --- a/ndb/ndbapi-examples/ndbapi_retries_example/Makefile +++ b/ndb/ndbapi-examples/ndbapi_retries_example/Makefile @@ -8,11 +8,11 @@ LFLAGS = -Wall INCLUDE_DIR = ../../include LIB_DIR = -L../../src/.libs \ -L../../../libmysql_r/.libs \ - -L../../../mysys + -L../../../mysys -L../../../strings SYS_LIB = $(TARGET): $(OBJS) - $(CXX) $(LFLAGS) $(LIB_DIR) $(OBJS) -lndbclient -lmysqlclient_r -lmysys -lz $(SYS_LIB) -o $(TARGET) + $(CXX) $(LFLAGS) $(LIB_DIR) $(OBJS) -lndbclient -lmysqlclient_r -lmysys -lmystrings -lz $(SYS_LIB) -o $(TARGET) $(TARGET).o: $(SRCS) $(CXX) $(CFLAGS) -I$(INCLUDE_DIR) -I$(INCLUDE_DIR)/ndbapi $(SRCS) diff --git a/ndb/ndbapi-examples/ndbapi_scan_example/Makefile b/ndb/ndbapi-examples/ndbapi_scan_example/Makefile index c5883757e5e..31886b02bf1 100644 --- a/ndb/ndbapi-examples/ndbapi_scan_example/Makefile +++ b/ndb/ndbapi-examples/ndbapi_scan_example/Makefile @@ -10,11 +10,11 @@ TOP_SRCDIR = ../../.. INCLUDE_DIR = $(TOP_SRCDIR) LIB_DIR = -L$(TOP_SRCDIR)/ndb/src/.libs \ -L$(TOP_SRCDIR)/libmysql_r/.libs \ - -L$(TOP_SRCDIR)/mysys + -L$(TOP_SRCDIR)/mysys -L$(TOP_SRCDIR)/strings SYS_LIB = $(TARGET): $(OBJS) - $(CXX) $(CXXFLAGS) $(LFLAGS) $(LIB_DIR) $(OBJS) -lndbclient -lmysqlclient_r -lmysys -lz $(SYS_LIB) -o $(TARGET) + $(CXX) $(CXXFLAGS) $(LFLAGS) $(LIB_DIR) $(OBJS) -lndbclient -lmysqlclient_r -lmysys -lmystrings -lz $(SYS_LIB) -o $(TARGET) $(TARGET).o: $(SRCS) $(CXX) $(CFLAGS) -I$(INCLUDE_DIR)/include -I$(INCLUDE_DIR)/extra -I$(INCLUDE_DIR)/ndb/include -I$(INCLUDE_DIR)/ndb/include/ndbapi $(SRCS) diff --git a/ndb/ndbapi-examples/ndbapi_simple_example/Makefile b/ndb/ndbapi-examples/ndbapi_simple_example/Makefile index 99d4bfe68a6..0a59584fb66 100644 --- a/ndb/ndbapi-examples/ndbapi_simple_example/Makefile +++ b/ndb/ndbapi-examples/ndbapi_simple_example/Makefile @@ -10,11 +10,11 @@ TOP_SRCDIR = ../../.. INCLUDE_DIR = $(TOP_SRCDIR) LIB_DIR = -L$(TOP_SRCDIR)/ndb/src/.libs \ -L$(TOP_SRCDIR)/libmysql_r/.libs \ - -L$(TOP_SRCDIR)/mysys + -L$(TOP_SRCDIR)/mysys -L$(TOP_SRCDIR)/strings SYS_LIB = $(TARGET): $(OBJS) - $(CXX) $(CXXFLAGS) $(LFLAGS) $(LIB_DIR) $(OBJS) -lndbclient -lmysqlclient_r -lmysys -lz $(SYS_LIB) -o $(TARGET) + $(CXX) $(CXXFLAGS) $(LFLAGS) $(LIB_DIR) $(OBJS) -lndbclient -lmysqlclient_r -lmysys -lmystrings -lz $(SYS_LIB) -o $(TARGET) $(TARGET).o: $(SRCS) $(CXX) $(CFLAGS) -I$(INCLUDE_DIR)/include -I$(INCLUDE_DIR)/ndb/include -I$(INCLUDE_DIR)/ndb/include/ndbapi $(SRCS) diff --git a/ndb/ndbapi-examples/ndbapi_simple_index_example/Makefile b/ndb/ndbapi-examples/ndbapi_simple_index_example/Makefile index dc17ff0eeaa..d4356055935 100644 --- a/ndb/ndbapi-examples/ndbapi_simple_index_example/Makefile +++ b/ndb/ndbapi-examples/ndbapi_simple_index_example/Makefile @@ -10,11 +10,11 @@ TOP_SRCDIR = ../../.. INCLUDE_DIR = $(TOP_SRCDIR) LIB_DIR = -L$(TOP_SRCDIR)/ndb/src/.libs \ -L$(TOP_SRCDIR)/libmysql_r/.libs \ - -L$(TOP_SRCDIR)/mysys + -L$(TOP_SRCDIR)/mysys -L$(TOP_SRCDIR)/strings SYS_LIB = $(TARGET): $(OBJS) - $(CXX) $(CXXFLAGS) $(LFLAGS) $(LIB_DIR) $(OBJS) -lndbclient -lmysqlclient_r -lmysys -lz $(SYS_LIB) -o $(TARGET) + $(CXX) $(CXXFLAGS) $(LFLAGS) $(LIB_DIR) $(OBJS) -lndbclient -lmysqlclient_r -lmysys -lmystrings -lz $(SYS_LIB) -o $(TARGET) $(TARGET).o: $(SRCS) $(CXX) $(CFLAGS) -I$(INCLUDE_DIR)/include -I$(INCLUDE_DIR)/ndb/include -I$(INCLUDE_DIR)/ndb/include/ndbapi $(SRCS) diff --git a/ndb/src/common/transporter/Makefile.am b/ndb/src/common/transporter/Makefile.am index b902012e56d..4c277097a91 100644 --- a/ndb/src/common/transporter/Makefile.am +++ b/ndb/src/common/transporter/Makefile.am @@ -13,7 +13,7 @@ EXTRA_libtransporter_la_SOURCES = SHM_Transporter.cpp SHM_Transporter.unix.cpp S libtransporter_la_LIBADD = @ndb_transporter_opt_objs@ libtransporter_la_DEPENDENCIES = @ndb_transporter_opt_objs@ -INCLUDES_LOC = -I$(top_srcdir)/ndb/include/mgmapi -I$(top_srcdir)/ndb/include/debugger -I$(top_srcdir)/ndb/include/kernel -I$(top_srcdir)/ndb/include/transporter @NDB_SCI_INCLUDES@ +INCLUDES_LOC = -I$(top_srcdir)/ndb/include/mgmapi -I$(top_srcdir)/ndb/src/mgmapi -I$(top_srcdir)/ndb/include/debugger -I$(top_srcdir)/ndb/include/kernel -I$(top_srcdir)/ndb/include/transporter @NDB_SCI_INCLUDES@ include $(top_srcdir)/ndb/config/common.mk.am include $(top_srcdir)/ndb/config/type_util.mk.am diff --git a/ndb/src/common/transporter/Transporter.cpp b/ndb/src/common/transporter/Transporter.cpp index a888d98b832..86e9b8c8171 100644 --- a/ndb/src/common/transporter/Transporter.cpp +++ b/ndb/src/common/transporter/Transporter.cpp @@ -124,6 +124,15 @@ Transporter::connect_client() { else sockfd= m_socket_client->connect(); + connect_client(sockfd); +} + +bool +Transporter::connect_client(NDB_SOCKET_TYPE sockfd) { + + if(m_connected) + return true; + if (sockfd == NDB_INVALID_SOCKET) return false; diff --git a/ndb/src/common/transporter/Transporter.hpp b/ndb/src/common/transporter/Transporter.hpp index 5b25afa0d89..53414f1179d 100644 --- a/ndb/src/common/transporter/Transporter.hpp +++ b/ndb/src/common/transporter/Transporter.hpp @@ -44,6 +44,7 @@ public: * Use isConnected() to check status */ bool connect_client(); + bool connect_client(NDB_SOCKET_TYPE sockfd); bool connect_server(NDB_SOCKET_TYPE socket); /** diff --git a/ndb/src/common/transporter/TransporterRegistry.cpp b/ndb/src/common/transporter/TransporterRegistry.cpp index b8dd2d1f561..f331b1660c1 100644 --- a/ndb/src/common/transporter/TransporterRegistry.cpp +++ b/ndb/src/common/transporter/TransporterRegistry.cpp @@ -49,6 +49,7 @@ extern int g_ndb_shm_signum; #include <OutputStream.hpp> #include <mgmapi/mgmapi.h> +#include <mgmapi_internal.h> #include <mgmapi/mgmapi_debug.h> #include <EventLogger.hpp> @@ -1365,6 +1366,8 @@ TransporterRegistry::add_transporter_interface(NodeId remoteNodeId, bool TransporterRegistry::start_service(SocketServer& socket_server) { + struct ndb_mgm_reply mgm_reply; + DBUG_ENTER("TransporterRegistry::start_service"); if (m_transporter_interface.size() > 0 && !nodeIdSpecified) { @@ -1521,63 +1524,102 @@ TransporterRegistry::get_transporter(NodeId nodeId) { return theTransporters[nodeId]; } -NDB_SOCKET_TYPE TransporterRegistry::connect_ndb_mgmd(SocketClient *sc) +bool TransporterRegistry::connect_client(NdbMgmHandle *h) { - NdbMgmHandle h= ndb_mgm_create_handle(); - struct ndb_mgm_reply mgm_reply; + DBUG_ENTER("TransporterRegistry::connect_client(NdbMgmHandle)"); - if ( h == NULL ) + Uint32 mgm_nodeid= ndb_mgm_get_mgmd_nodeid(*h); + + if(!mgm_nodeid) { - return NDB_INVALID_SOCKET; + ndbout_c("%s: %d", __FILE__, __LINE__); + return false; } - - /** - * Set connectstring - */ + Transporter * t = theTransporters[mgm_nodeid]; + if (!t) { - char c[100]; - char *cs= &c[0]; - unsigned len= strlen(sc->get_server_name())+20; - if( len > sizeof(c) ) - { - /* - * server name is long. malloc enough for it and the port number - */ - cs= (char*)malloc(len*sizeof(char)); - if(!cs) - { - ndb_mgm_destroy_handle(&h); - return NDB_INVALID_SOCKET; - } - } - snprintf(cs,len,"%s:%u",sc->get_server_name(),sc->get_port()); - ndb_mgm_set_connectstring(h, cs); - if(cs != &c[0]) - free(cs); + ndbout_c("%s: %d", __FILE__, __LINE__); + return false; } + DBUG_RETURN(t->connect_client(connect_ndb_mgmd(h))); +} - if(ndb_mgm_connect(h, 0, 0, 0)<0) +/** + * Given a connected NdbMgmHandle, turns it into a transporter + * and returns the socket. + */ +NDB_SOCKET_TYPE TransporterRegistry::connect_ndb_mgmd(NdbMgmHandle *h) +{ + struct ndb_mgm_reply mgm_reply; + + if ( h==NULL || *h == NULL ) { - ndb_mgm_destroy_handle(&h); + ndbout_c("%s: %d", __FILE__, __LINE__); return NDB_INVALID_SOCKET; } for(unsigned int i=0;i < m_transporter_interface.size();i++) - if (ndb_mgm_set_connection_int_parameter(h, + if (m_transporter_interface[i].m_s_service_port < 0 + && ndb_mgm_set_connection_int_parameter(*h, get_localNodeId(), m_transporter_interface[i].m_remote_nodeId, CFG_CONNECTION_SERVER_PORT, m_transporter_interface[i].m_s_service_port, &mgm_reply) < 0) { - ndb_mgm_destroy_handle(&h); + ndbout_c("Error: %s: %d", + ndb_mgm_get_latest_error_desc(*h), + ndb_mgm_get_latest_error(*h)); + ndbout_c("%s: %d", __FILE__, __LINE__); + ndb_mgm_destroy_handle(h); return NDB_INVALID_SOCKET; } + /** + * convert_to_transporter also disposes of the handle (i.e. we don't leak + * memory here. + */ NDB_SOCKET_TYPE sockfd= ndb_mgm_convert_to_transporter(h); if ( sockfd == NDB_INVALID_SOCKET) - ndb_mgm_destroy_handle(&h); + { + ndbout_c("Error: %s: %d", + ndb_mgm_get_latest_error_desc(*h), + ndb_mgm_get_latest_error(*h)); + ndbout_c("%s: %d", __FILE__, __LINE__); + ndb_mgm_destroy_handle(h); + } return sockfd; } +/** + * Given a SocketClient, creates a NdbMgmHandle, turns it into a transporter + * and returns the socket. + */ +NDB_SOCKET_TYPE TransporterRegistry::connect_ndb_mgmd(SocketClient *sc) +{ + NdbMgmHandle h= ndb_mgm_create_handle(); + + if ( h == NULL ) + { + return NDB_INVALID_SOCKET; + } + + /** + * Set connectstring + */ + { + BaseString cs; + cs.assfmt("%s:%u",sc->get_server_name(),sc->get_port()); + ndb_mgm_set_connectstring(h, cs.c_str()); + } + + if(ndb_mgm_connect(h, 0, 0, 0)<0) + { + ndb_mgm_destroy_handle(&h); + return NDB_INVALID_SOCKET; + } + + return connect_ndb_mgmd(&h); +} + template class Vector<TransporterRegistry::Transporter_interface>; diff --git a/ndb/src/kernel/blocks/qmgr/QmgrInit.cpp b/ndb/src/kernel/blocks/qmgr/QmgrInit.cpp index d6960ce154e..ecaeadff47a 100644 --- a/ndb/src/kernel/blocks/qmgr/QmgrInit.cpp +++ b/ndb/src/kernel/blocks/qmgr/QmgrInit.cpp @@ -35,6 +35,9 @@ void Qmgr::initData() Uint32 hbDBAPI = 500; setHbApiDelay(hbDBAPI); + + c_connectedNodes.clear(); + c_connectedNodes.set(getOwnNodeId()); }//Qmgr::initData() void Qmgr::initRecords() diff --git a/ndb/src/kernel/main.cpp b/ndb/src/kernel/main.cpp index c156a26500c..bd15ef37e20 100644 --- a/ndb/src/kernel/main.cpp +++ b/ndb/src/kernel/main.cpp @@ -190,6 +190,13 @@ int main(int argc, char** argv) exit(-1); } + // Re-use the mgm handle as a transporter + if(!globalTransporterRegistry.connect_client( + theConfig->get_config_retriever()->get_mgmHandlePtr())) + ERROR_SET(fatal, ERR_INVALID_CONFIG, + "Connection to mgmd terminated before setup was complete", + "StopOnError missing"); + if (!globalTransporterRegistry.start_clients()){ ndbout_c("globalTransporterRegistry.start_clients() failed"); exit(-1); diff --git a/ndb/src/kernel/vm/Configuration.cpp b/ndb/src/kernel/vm/Configuration.cpp index 3170939f8d8..650d914035f 100644 --- a/ndb/src/kernel/vm/Configuration.cpp +++ b/ndb/src/kernel/vm/Configuration.cpp @@ -189,7 +189,6 @@ Configuration::fetch_configuration(){ } m_mgmd_port= 0; - m_mgmd_host= 0; m_config_retriever= new ConfigRetriever(getConnectString(), NDB_VERSION, NODE_TYPE_DB); @@ -211,7 +210,7 @@ Configuration::fetch_configuration(){ } m_mgmd_port= m_config_retriever->get_mgmd_port(); - m_mgmd_host= m_config_retriever->get_mgmd_host(); + m_mgmd_host.assign(m_config_retriever->get_mgmd_host()); ConfigRetriever &cr= *m_config_retriever; diff --git a/ndb/src/kernel/vm/Configuration.hpp b/ndb/src/kernel/vm/Configuration.hpp index a257881e353..6ca6d9a1f17 100644 --- a/ndb/src/kernel/vm/Configuration.hpp +++ b/ndb/src/kernel/vm/Configuration.hpp @@ -17,6 +17,7 @@ #ifndef Configuration_H #define Configuration_H +#include <util/BaseString.hpp> #include <mgmapi.h> #include <ndb_types.h> @@ -67,7 +68,7 @@ public: const ndb_mgm_configuration_iterator * getOwnConfigIterator() const; Uint32 get_mgmd_port() const {return m_mgmd_port;}; - const char *get_mgmd_host() const {return m_mgmd_host;}; + const char *get_mgmd_host() const {return m_mgmd_host.c_str();}; ConfigRetriever* get_config_retriever() { return m_config_retriever; }; class LogLevel * m_logLevel; @@ -99,7 +100,7 @@ private: bool _initialStart; char * _connectString; Uint32 m_mgmd_port; - const char *m_mgmd_host; + BaseString m_mgmd_host; bool _daemonMode; void calcSizeAlt(class ConfigValues * ); diff --git a/ndb/src/mgmapi/mgmapi.cpp b/ndb/src/mgmapi/mgmapi.cpp index bd4052f51d9..67ee307bb68 100644 --- a/ndb/src/mgmapi/mgmapi.cpp +++ b/ndb/src/mgmapi/mgmapi.cpp @@ -23,6 +23,7 @@ #include <NdbSleep.h> #include <NdbTCP.h> #include <mgmapi.h> +#include <mgmapi_internal.h> #include <mgmapi_debug.h> #include "mgmapi_configuration.hpp" #include <socket_io.h> @@ -2189,23 +2190,54 @@ ndb_mgm_get_connection_int_parameter(NdbMgmHandle handle, extern "C" NDB_SOCKET_TYPE -ndb_mgm_convert_to_transporter(NdbMgmHandle handle) +ndb_mgm_convert_to_transporter(NdbMgmHandle *handle) { NDB_SOCKET_TYPE s; - CHECK_HANDLE(handle, NDB_INVALID_SOCKET); - CHECK_CONNECTED(handle, NDB_INVALID_SOCKET); + CHECK_HANDLE((*handle), NDB_INVALID_SOCKET); + CHECK_CONNECTED((*handle), NDB_INVALID_SOCKET); - handle->connected= 0; // we pretend we're disconnected - s= handle->socket; + (*handle)->connected= 0; // we pretend we're disconnected + s= (*handle)->socket; SocketOutputStream s_output(s); s_output.println("transporter connect"); s_output.println(""); - ndb_mgm_destroy_handle(&handle); // set connected=0, so won't disconnect + ndb_mgm_destroy_handle(handle); // set connected=0, so won't disconnect return s; } +extern "C" +Uint32 +ndb_mgm_get_mgmd_nodeid(NdbMgmHandle handle) +{ + Uint32 nodeid=0; + + DBUG_ENTER("ndb_mgm_get_mgmd_nodeid"); + CHECK_HANDLE(handle, 0); + CHECK_CONNECTED(handle, 0); + + Properties args; + + const ParserRow<ParserDummy> reply[]= { + MGM_CMD("get mgmd nodeid reply", NULL, ""), + MGM_ARG("nodeid", Int, Mandatory, "Node ID"), + MGM_END() + }; + + const Properties *prop; + prop = ndb_mgm_call(handle, reply, "get mgmd nodeid", &args); + CHECK_REPLY(prop, 0); + + if(!prop->get("nodeid",&nodeid)){ + ndbout_c("Unable to get value"); + return 0; + } + + delete prop; + DBUG_RETURN(nodeid); +} + template class Vector<const ParserRow<ParserDummy>*>; diff --git a/ndb/src/mgmapi/mgmapi_internal.h b/ndb/src/mgmapi/mgmapi_internal.h new file mode 100644 index 00000000000..90f93129f2a --- /dev/null +++ b/ndb/src/mgmapi/mgmapi_internal.h @@ -0,0 +1,77 @@ +/* Copyright (C) 2005 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#ifndef MGMAPI_INTERNAL_H +#define MGMAPI_INTERNAL_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include <NdbTCP.h> + + /** + * Set an integer parameter for a connection + * + * @param handle the NDB management handle. + * @param node1 the node1 id + * @param node2 the node2 id + * @param param the parameter (e.g. CFG_CONNECTION_SERVER_PORT) + * @param value what to set it to + * @param reply from ndb_mgmd + */ + int ndb_mgm_set_connection_int_parameter(NdbMgmHandle handle, + int node1, + int node2, + int param, + int value, + struct ndb_mgm_reply* reply); + + /** + * Get an integer parameter for a connection + * + * @param handle the NDB management handle. + * @param node1 the node1 id + * @param node2 the node2 id + * @param param the parameter (e.g. CFG_CONNECTION_SERVER_PORT) + * @param value where to store the retreived value. In the case of + * error, value is not changed. + * @param reply from ndb_mgmd + * @return 0 on success. < 0 on error. + */ + int ndb_mgm_get_connection_int_parameter(NdbMgmHandle handle, + int node1, + int node2, + int param, + int *value, + struct ndb_mgm_reply* reply); + + /** + * Convert connection to transporter + * @param handle NDB management handle. + * + * @return socket + * + * @note the socket is now able to be used as a transporter connection + */ + NDB_SOCKET_TYPE ndb_mgm_convert_to_transporter(NdbMgmHandle *handle); + +#ifdef __cplusplus +} +#endif + + +#endif diff --git a/ndb/src/mgmsrv/MgmtSrvr.cpp b/ndb/src/mgmsrv/MgmtSrvr.cpp index e915216c793..21c3c4d614d 100644 --- a/ndb/src/mgmsrv/MgmtSrvr.cpp +++ b/ndb/src/mgmsrv/MgmtSrvr.cpp @@ -2138,10 +2138,7 @@ MgmtSrvr::get_connected_nodes(NodeBitmask &connected_nodes) const if (getNodeType(i) == NDB_MGM_NODE_TYPE_NDB) { const ClusterMgr::Node &node= theFacade->theClusterMgr->getNodeInfo(i); - if (node.connected) - { - connected_nodes.bitOR(node.m_state.m_connected_nodes); - } + connected_nodes.bitOR(node.m_state.m_connected_nodes); } } } diff --git a/ndb/src/mgmsrv/Services.cpp b/ndb/src/mgmsrv/Services.cpp index fdfe2f92aca..bbd3db1e734 100644 --- a/ndb/src/mgmsrv/Services.cpp +++ b/ndb/src/mgmsrv/Services.cpp @@ -266,6 +266,8 @@ ParserRow<MgmApiSession> commands[] = { MGM_CMD("transporter connect", &MgmApiSession::transporter_connect, ""), + MGM_CMD("get mgmd nodeid", &MgmApiSession::get_mgmd_nodeid, ""), + MGM_END() }; @@ -1544,14 +1546,22 @@ MgmApiSession::check_connection(Parser_t::Context &ctx, void MgmApiSession::transporter_connect(Parser_t::Context &ctx, - Properties const &args) { - NDB_SOCKET_TYPE s= m_socket; + Properties const &args) +{ + m_mgmsrv.transporter_connect(m_socket); m_stop= true; m_stopped= true; // force a stop (no closing socket) m_socket= NDB_INVALID_SOCKET; // so nobody closes it +} - m_mgmsrv.transporter_connect(s); +void +MgmApiSession::get_mgmd_nodeid(Parser_t::Context &ctx, + Properties const &args) +{ + m_output->println("get mgmd nodeid reply"); + m_output->println("nodeid:%u",m_mgmsrv.getOwnNodeId()); + m_output->println(""); } template class MutexVector<int>; diff --git a/ndb/src/mgmsrv/Services.hpp b/ndb/src/mgmsrv/Services.hpp index e4fddea7d04..ff9008b05a8 100644 --- a/ndb/src/mgmsrv/Services.hpp +++ b/ndb/src/mgmsrv/Services.hpp @@ -99,6 +99,8 @@ public: void check_connection(Parser_t::Context &ctx, const class Properties &args); void transporter_connect(Parser_t::Context &ctx, Properties const &args); + + void get_mgmd_nodeid(Parser_t::Context &ctx, Properties const &args); void repCommand(Parser_t::Context &ctx, const class Properties &args); }; diff --git a/ndb/src/ndbapi/ClusterMgr.cpp b/ndb/src/ndbapi/ClusterMgr.cpp index 183cb6488f8..71938e27037 100644 --- a/ndb/src/ndbapi/ClusterMgr.cpp +++ b/ndb/src/ndbapi/ClusterMgr.cpp @@ -260,6 +260,7 @@ ClusterMgr::Node::Node() : m_state(NodeState::SL_NOTHING) { compatible = nfCompleteRep = true; connected = defined = m_alive = false; + m_state.m_connected_nodes.clear(); } /****************************************************************************** @@ -434,6 +435,9 @@ ClusterMgr::reportDisconnected(NodeId nodeId){ noOfConnectedNodes--; theNodes[nodeId].connected = false; + + theNodes[nodeId].m_state.m_connected_nodes.clear(); + reportNodeFailed(nodeId); } diff --git a/ndb/src/ndbapi/NdbDictionaryImpl.cpp b/ndb/src/ndbapi/NdbDictionaryImpl.cpp index b522efb7792..ff87a72f636 100644 --- a/ndb/src/ndbapi/NdbDictionaryImpl.cpp +++ b/ndb/src/ndbapi/NdbDictionaryImpl.cpp @@ -1950,19 +1950,23 @@ void NdbDictInterface::execDROP_TABLE_CONF(NdbApiSignal * signal, LinearSectionPtr ptr[3]) { + DBUG_ENTER("NdbDictInterface::execDROP_TABLE_CONF"); //DropTableConf* const conf = CAST_CONSTPTR(DropTableConf, signal->getDataPtr()); m_waiter.signal(NO_WAIT); + DBUG_VOID_RETURN; } void NdbDictInterface::execDROP_TABLE_REF(NdbApiSignal * signal, LinearSectionPtr ptr[3]) { + DBUG_ENTER("NdbDictInterface::execDROP_TABLE_REF"); const DropTableRef* const ref = CAST_CONSTPTR(DropTableRef, signal->getDataPtr()); m_error.code= ref->errorCode; m_masterNodeId = ref->masterNodeId; m_waiter.signal(NO_WAIT); + DBUG_VOID_RETURN; } int diff --git a/ndb/src/ndbapi/NdbEventOperationImpl.cpp b/ndb/src/ndbapi/NdbEventOperationImpl.cpp index bafb8f7ca38..b00b0d82cba 100644 --- a/ndb/src/ndbapi/NdbEventOperationImpl.cpp +++ b/ndb/src/ndbapi/NdbEventOperationImpl.cpp @@ -58,10 +58,14 @@ NdbEventOperationImpl::NdbEventOperationImpl(NdbEventOperation &N, m_state(EO_ERROR), m_bufferL(bufferLength) { m_eventId = 0; - theFirstRecAttrs[0] = NULL; - theCurrentRecAttrs[0] = NULL; - theFirstRecAttrs[1] = NULL; - theCurrentRecAttrs[1] = NULL; + theFirstPkAttrs[0] = NULL; + theCurrentPkAttrs[0] = NULL; + theFirstPkAttrs[1] = NULL; + theCurrentPkAttrs[1] = NULL; + theFirstDataAttrs[0] = NULL; + theCurrentDataAttrs[0] = NULL; + theFirstDataAttrs[1] = NULL; + theCurrentDataAttrs[1] = NULL; sdata = NULL; ptr[0].p = NULL; ptr[1].p = NULL; @@ -80,6 +84,8 @@ NdbEventOperationImpl::NdbEventOperationImpl(NdbEventOperation &N, m_eventImpl = &myEvnt->m_impl; + m_eventId = m_eventImpl->m_eventId; + m_bufferHandle = m_ndb->getGlobalEventBufferHandle(); if (m_bufferHandle->m_bufferL > 0) m_bufferL =m_bufferHandle->m_bufferL; @@ -94,7 +100,15 @@ NdbEventOperationImpl::~NdbEventOperationImpl() int i; if (sdata) NdbMem_Free((char*)sdata); for (i=0 ; i<2; i++) { - NdbRecAttr *p = theFirstRecAttrs[i]; + NdbRecAttr *p = theFirstPkAttrs[i]; + while (p) { + NdbRecAttr *p_next = p->next(); + m_ndb->releaseRecAttr(p); + p = p_next; + } + } + for (i=0 ; i<2; i++) { + NdbRecAttr *p = theFirstDataAttrs[i]; while (p) { NdbRecAttr *p_next = p->next(); m_ndb->releaseRecAttr(p); @@ -138,14 +152,26 @@ NdbEventOperationImpl::getValue(const NdbColumnImpl *tAttrInfo, char *aValue, in { DBUG_ENTER("NdbEventOperationImpl::getValue"); // Insert Attribute Id into ATTRINFO part. - NdbRecAttr *&theFirstRecAttr = theFirstRecAttrs[n]; - NdbRecAttr *&theCurrentRecAttr = theCurrentRecAttrs[n]; - + + NdbRecAttr **theFirstAttr; + NdbRecAttr **theCurrentAttr; + + if (tAttrInfo->getPrimaryKey()) + { + theFirstAttr = &theFirstPkAttrs[n]; + theCurrentAttr = &theCurrentPkAttrs[n]; + } + else + { + theFirstAttr = &theFirstDataAttrs[n]; + theCurrentAttr = &theCurrentDataAttrs[n]; + } + /************************************************************************ * Get a Receive Attribute object and link it into the operation object. ************************************************************************/ - NdbRecAttr *tRecAttr = m_ndb->getRecAttr(); - if (tRecAttr == NULL) { + NdbRecAttr *tAttr = m_ndb->getRecAttr(); + if (tAttr == NULL) { exit(-1); //setErrorCodeAbort(4000); DBUG_RETURN(NULL); @@ -156,51 +182,51 @@ NdbEventOperationImpl::getValue(const NdbColumnImpl *tAttrInfo, char *aValue, in * the RecAttr object * Also set attribute size, array size and attribute type ********************************************************************/ - if (tRecAttr->setup(tAttrInfo, aValue)) { + if (tAttr->setup(tAttrInfo, aValue)) { //setErrorCodeAbort(4000); - m_ndb->releaseRecAttr(tRecAttr); + m_ndb->releaseRecAttr(tAttr); exit(-1); DBUG_RETURN(NULL); } //theErrorLine++; - tRecAttr->setNULL(); + tAttr->setUNDEFINED(); // We want to keep the list sorted to make data insertion easier later - if (theFirstRecAttr == NULL) { - theFirstRecAttr = tRecAttr; - theCurrentRecAttr = tRecAttr; - tRecAttr->next(NULL); + + if (*theFirstAttr == NULL) { + *theFirstAttr = tAttr; + *theCurrentAttr = tAttr; + tAttr->next(NULL); } else { Uint32 tAttrId = tAttrInfo->m_attrId; - if (tAttrId > theCurrentRecAttr->attrId()) { // right order - theCurrentRecAttr->next(tRecAttr); - tRecAttr->next(NULL); - theCurrentRecAttr = tRecAttr; - } else if (theFirstRecAttr->next() == NULL || // only one in list - theFirstRecAttr->attrId() > tAttrId) {// or first - tRecAttr->next(theFirstRecAttr); - theFirstRecAttr = tRecAttr; + if (tAttrId > (*theCurrentAttr)->attrId()) { // right order + (*theCurrentAttr)->next(tAttr); + tAttr->next(NULL); + *theCurrentAttr = tAttr; + } else if ((*theFirstAttr)->next() == NULL || // only one in list + (*theFirstAttr)->attrId() > tAttrId) {// or first + tAttr->next(*theFirstAttr); + *theFirstAttr = tAttr; } else { // at least 2 in list and not first and not last - NdbRecAttr *p = theFirstRecAttr; + NdbRecAttr *p = *theFirstAttr; NdbRecAttr *p_next = p->next(); while (tAttrId > p_next->attrId()) { p = p_next; p_next = p->next(); } if (tAttrId == p_next->attrId()) { // Using same attribute twice - tRecAttr->release(); // do I need to do this? - m_ndb->releaseRecAttr(tRecAttr); + tAttr->release(); // do I need to do this? + m_ndb->releaseRecAttr(tAttr); exit(-1); DBUG_RETURN(NULL); } // this is it, between p and p_next - p->next(tRecAttr); - tRecAttr->next(p_next); + p->next(tAttr); + tAttr->next(p_next); } } - - DBUG_RETURN(tRecAttr); + DBUG_RETURN(tAttr); } int @@ -213,7 +239,8 @@ NdbEventOperationImpl::execute() DBUG_RETURN(-1); } - if (theFirstRecAttrs[0] == NULL) { // defaults to get all + if (theFirstPkAttrs[0] == NULL && + theFirstDataAttrs[0] == NULL) { // defaults to get all } @@ -362,8 +389,10 @@ NdbEventOperationImpl::next(int *pOverrun) #endif // now move the data into the RecAttrs - if ((theFirstRecAttrs[0] == NULL) && - (theFirstRecAttrs[1] == NULL)) + if ((theFirstPkAttrs[0] == NULL) && + (theFirstPkAttrs[1] == NULL) && + (theFirstDataAttrs[0] == NULL) && + (theFirstDataAttrs[1] == NULL)) { DBUG_RETURN(r); } @@ -385,11 +414,28 @@ NdbEventOperationImpl::next(int *pOverrun) printf("\n"); #endif - NdbRecAttr *tWorkingRecAttr = theFirstRecAttrs[0]; - // copy data into the RecAttr's // we assume that the respective attribute lists are sorted + // first the pk's + { + NdbRecAttr *tAttr= theFirstPkAttrs[0]; + while(tAttr) + { + assert(aAttrPtr < aAttrEndPtr); + unsigned tDataSz= AttributeHeader(*aAttrPtr).getDataSize(); + assert(tAttr->attrId() == + AttributeHeader(*aAttrPtr).getAttributeId()); + assert(tAttr->receive_data(aDataPtr, tDataSz)); + // next + aAttrPtr++; + aDataPtr+= tDataSz; + tAttr= tAttr->next(); + } + } + + NdbRecAttr *tWorkingRecAttr = theFirstDataAttrs[0]; + Uint32 tRecAttrId; Uint32 tAttrId; Uint32 tDataSz; @@ -401,7 +447,7 @@ NdbEventOperationImpl::next(int *pOverrun) while (tAttrId > tRecAttrId) { //printf("[%u] %u %u [%u]\n", tAttrId, tDataSz, *aDataPtr, tRecAttrId); - tWorkingRecAttr->setNULL(); + tWorkingRecAttr->setUNDEFINED(); tWorkingRecAttr = tWorkingRecAttr->next(); if (tWorkingRecAttr == NULL) break; @@ -413,32 +459,25 @@ NdbEventOperationImpl::next(int *pOverrun) //printf("[%u] %u %u [%u]\n", tAttrId, tDataSz, *aDataPtr, tRecAttrId); if (tAttrId == tRecAttrId) { - if (!m_eventImpl->m_tableImpl->getColumn(tRecAttrId)->getPrimaryKey()) - hasSomeData++; + hasSomeData++; //printf("set!\n"); - tWorkingRecAttr->receive_data(aDataPtr, tDataSz); - - // move forward, data has already moved forward - aAttrPtr++; - aDataPtr += tDataSz; + assert(tWorkingRecAttr->receive_data(aDataPtr, tDataSz)); tWorkingRecAttr = tWorkingRecAttr->next(); - } else { - // move only attr forward - aAttrPtr++; - aDataPtr += tDataSz; } + aAttrPtr++; + aDataPtr += tDataSz; } while (tWorkingRecAttr != NULL) { tRecAttrId = tWorkingRecAttr->attrId(); //printf("set undefined [%u] %u %u [%u]\n", tAttrId, tDataSz, *aDataPtr, tRecAttrId); - tWorkingRecAttr->setNULL(); + tWorkingRecAttr->setUNDEFINED(); tWorkingRecAttr = tWorkingRecAttr->next(); } - tWorkingRecAttr = theFirstRecAttrs[1]; + tWorkingRecAttr = theFirstDataAttrs[1]; aDataPtr = ptr[2].p; Uint32 *aDataEndPtr = aDataPtr + ptr[2].sz; while ((aDataPtr < aDataEndPtr) && (tWorkingRecAttr != NULL)) { @@ -447,7 +486,7 @@ NdbEventOperationImpl::next(int *pOverrun) tDataSz = AttributeHeader(*aDataPtr).getDataSize(); aDataPtr++; while (tAttrId > tRecAttrId) { - tWorkingRecAttr->setNULL(); + tWorkingRecAttr->setUNDEFINED(); tWorkingRecAttr = tWorkingRecAttr->next(); if (tWorkingRecAttr == NULL) break; @@ -456,20 +495,16 @@ NdbEventOperationImpl::next(int *pOverrun) if (tWorkingRecAttr == NULL) break; if (tAttrId == tRecAttrId) { - if (!m_eventImpl->m_tableImpl->getColumn(tRecAttrId)->getPrimaryKey()) - hasSomeData++; + assert(!m_eventImpl->m_tableImpl->getColumn(tRecAttrId)->getPrimaryKey()); + hasSomeData++; - tWorkingRecAttr->receive_data(aDataPtr, tDataSz); - aDataPtr += tDataSz; - // move forward, data+attr has already moved forward + assert(tWorkingRecAttr->receive_data(aDataPtr, tDataSz)); tWorkingRecAttr = tWorkingRecAttr->next(); - } else { - // move only data+attr forward - aDataPtr += tDataSz; } + aDataPtr += tDataSz; } while (tWorkingRecAttr != NULL) { - tWorkingRecAttr->setNULL(); + tWorkingRecAttr->setUNDEFINED(); tWorkingRecAttr = tWorkingRecAttr->next(); } @@ -504,7 +539,16 @@ NdbEventOperationImpl::print() ndbout << "EventId " << m_eventId << "\n"; for (int i = 0; i < 2; i++) { - NdbRecAttr *p = theFirstRecAttrs[i]; + NdbRecAttr *p = theFirstPkAttrs[i]; + ndbout << " %u " << i; + while (p) { + ndbout << " : " << p->attrId() << " = " << *p; + p = p->next(); + } + ndbout << "\n"; + } + for (int i = 0; i < 2; i++) { + NdbRecAttr *p = theFirstDataAttrs[i]; ndbout << " %u " << i; while (p) { ndbout << " : " << p->attrId() << " = " << *p; @@ -905,6 +949,7 @@ NdbGlobalEventBuffer::real_prepareAddSubscribeEvent int bufferId= -1; Uint32 eventId= eventOp->m_eventId; + DBUG_PRINT("enter",("eventId: %u", eventId)); // add_drop_lock(); // only one thread can do add or drop at a time // Find place where eventId already set diff --git a/ndb/src/ndbapi/NdbEventOperationImpl.hpp b/ndb/src/ndbapi/NdbEventOperationImpl.hpp index 3fcbfd8fe7c..96958979c76 100644 --- a/ndb/src/ndbapi/NdbEventOperationImpl.hpp +++ b/ndb/src/ndbapi/NdbEventOperationImpl.hpp @@ -60,8 +60,10 @@ public: NdbEventImpl *m_eventImpl; NdbGlobalEventBufferHandle *m_bufferHandle; - NdbRecAttr *theFirstRecAttrs[2]; - NdbRecAttr *theCurrentRecAttrs[2]; + NdbRecAttr *theFirstPkAttrs[2]; + NdbRecAttr *theCurrentPkAttrs[2]; + NdbRecAttr *theFirstDataAttrs[2]; + NdbRecAttr *theCurrentDataAttrs[2]; NdbEventOperation::State m_state; Uint32 m_eventId; diff --git a/ndb/src/ndbapi/ndb_cluster_connection.cpp b/ndb/src/ndbapi/ndb_cluster_connection.cpp index 1990d2b6d52..49aded8e0ac 100644 --- a/ndb/src/ndbapi/ndb_cluster_connection.cpp +++ b/ndb/src/ndbapi/ndb_cluster_connection.cpp @@ -29,6 +29,7 @@ #include <ConfigRetriever.hpp> #include <ndb_version.h> #include <mgmapi_debug.h> +#include <mgmapi_internal.h> #include <md5_hash.hpp> #include <EventLogger.hpp> diff --git a/ndb/src/ndbapi/ndberror.c b/ndb/src/ndbapi/ndberror.c index 08f2c8dcc7a..b033d81fa33 100644 --- a/ndb/src/ndbapi/ndberror.c +++ b/ndb/src/ndbapi/ndberror.c @@ -494,7 +494,6 @@ ErrorBundle ErrorCodes[] = { { 4242, AE, "Too many indexes" }, { 4243, AE, "Index not found" }, { 4244, OE, "Index or table with given name already exists" }, - { 4245, AE, "Index attribute must be defined as stored, i.e. the StorageAttributeType must be defined as NormalStorageAttribute"}, { 4247, AE, "Illegal index/trigger create/drop/alter request" }, { 4248, AE, "Trigger/index name invalid" }, { 4249, AE, "Invalid table" }, diff --git a/ndb/test/include/HugoOperations.hpp b/ndb/test/include/HugoOperations.hpp index e421d9b5b6d..05137710609 100644 --- a/ndb/test/include/HugoOperations.hpp +++ b/ndb/test/include/HugoOperations.hpp @@ -95,6 +95,8 @@ public: int numRecords = 1); NdbIndexScanOperation* pIndexScanOp; + + NDBT_ResultRow& get_row(Uint32 idx) { return *rows[idx];} protected: void allocRows(int rows); void deallocRows(); diff --git a/ndb/test/include/NDBT_Error.hpp b/ndb/test/include/NDBT_Error.hpp index ef107072465..6775a107196 100644 --- a/ndb/test/include/NDBT_Error.hpp +++ b/ndb/test/include/NDBT_Error.hpp @@ -91,7 +91,11 @@ private: ; \ } -#define ERR(error) ERR_OUT(g_err, error) +#define ERR(error) \ +{ \ + const NdbError &_error= (error); \ + ERR_OUT(g_err, _error); \ +} #define ERR_INFO(error) ERR_OUT(g_info, error) #endif diff --git a/ndb/test/include/NDBT_ResultRow.hpp b/ndb/test/include/NDBT_ResultRow.hpp index 6072d0ea510..cbb5d7f6c6a 100644 --- a/ndb/test/include/NDBT_ResultRow.hpp +++ b/ndb/test/include/NDBT_ResultRow.hpp @@ -27,7 +27,7 @@ public: const NdbRecAttr * attributeStore(int i) const ; const NdbRecAttr * attributeStore(const char* name) const ; - BaseString c_str(); + BaseString c_str() const ; NdbOut & header (NdbOut &) const; friend NdbOut & operator << (NdbOut&, const NDBT_ResultRow &); @@ -36,6 +36,11 @@ public: * Make copy of NDBT_ResultRow */ NDBT_ResultRow * clone() const; + + bool operator==(const NDBT_ResultRow&) const ; + bool operator!=(const NDBT_ResultRow& other) const { + return ! (*this == other); + } private: int cols; diff --git a/ndb/test/include/NdbSchemaOp.hpp b/ndb/test/include/NdbSchemaOp.hpp index da55f5f9aa5..1edbc155643 100644 --- a/ndb/test/include/NdbSchemaOp.hpp +++ b/ndb/test/include/NdbSchemaOp.hpp @@ -79,29 +79,6 @@ }; /** - * Where attribute is stored. - * - * This is used to indicate whether a primary key - * should only be stored in the index storage and not in the data storage - * or if it should be stored in both places. - * The first alternative makes the attribute take less space, - * but makes it impossible to scan using attribute. - * - * @note Use NormalStorageAttribute for most cases. - * (IndexStorageAttribute should only be used on primary key - * attributes and only if you do not want to scan using the attribute.) - */ - enum StorageAttributeType { - NoStorageAttributeTypeDefined = -1, ///< <i>Missing explanation</i> - IndexStorageAttribute, ///< Attribute is only stored in - ///< index storage (ACC) - NormalStorageAttribute ///< Attribute values are stored - ///< both in the index (ACC) and - ///< in the data storage (TUP) - }; - - - /** * Type of fragmentation used for a table */ enum FragmentType { @@ -405,27 +382,7 @@ public: * the attribute. * <br> * Legal values: true, false - * @param aStType Stored in both index and data storage or - * only store in index data storage. - * <br> - * This parameter is only of interest for tuple - * key attributes. - * All tuple key attributes values are always stored - * in the index storage part. - * If this parameter is set to - * IndexStorageAttribute, then the attribute values - * will <em>only</em> be stored in the index - * storage part and <em>not</em> in the data - * storage part. - * <br> - * If there will be no scans using the primary - * key attribute and if the size of the attribute - * is large, then this might be of interest. - * A typical example is a table where - * http-addresses are used as primary key. - * <br> - * Legal values: NormalStorageAttribute, - * IndexStorageAttribute + * @param aStType Obsolete since wl-2066 * @param aDistributionKey Sometimes it is preferable to use a subset * of the primary key as the distribution key. * An example is TPC-C where it might be @@ -474,7 +431,7 @@ public: AttrType aAttrType = UnSigned, StorageMode aStorageMode = MMBased, bool nullable = false, - StorageAttributeType aStType= NormalStorageAttribute, + int aStType= 0, // obsolete int aDistributionKey = 0, int aDistributionGroup = 0, int aDistributionGroupNoOfBits = 16, @@ -491,7 +448,7 @@ public: AttrType aAttrType, StorageMode aStorageMode, NullAttributeType aNullAttr, - StorageAttributeType aStType = NormalStorageAttribute, + int aStType, // obsolete int aDistributionKey = 0, int aDistributionGroup = 0, int aDistributionGroupNoOfBits = 16){ diff --git a/ndb/test/include/UtilTransactions.hpp b/ndb/test/include/UtilTransactions.hpp index 16732008ad1..afdbc5c3445 100644 --- a/ndb/test/include/UtilTransactions.hpp +++ b/ndb/test/include/UtilTransactions.hpp @@ -66,6 +66,14 @@ public: int copyTableData(Ndb*, const char* destName); + /** + * Compare this table with other_table + * + * return 0 - on equality + * -1 - on error + * >0 - otherwise + */ + int compare(Ndb*, const char * other_table, int flags); private: static int takeOverAndDeleteRecord(Ndb*, diff --git a/ndb/test/ndbapi/Makefile.am b/ndb/test/ndbapi/Makefile.am index 9a1726ae11a..1d2dfb3f948 100644 --- a/ndb/test/ndbapi/Makefile.am +++ b/ndb/test/ndbapi/Makefile.am @@ -33,7 +33,8 @@ testDeadlock \ test_event ndbapi_slow_select testReadPerf testLcp \ testPartitioning \ testBitfield \ -DbCreate DbAsyncGenerator +DbCreate DbAsyncGenerator \ +test_event_multi_table #flexTimedAsynch #testBlobs @@ -76,6 +77,7 @@ testPartitioning_SOURCES = testPartitioning.cpp testBitfield_SOURCES = testBitfield.cpp DbCreate_SOURCES = bench/mainPopulate.cpp bench/dbPopulate.cpp bench/userInterface.cpp bench/dbPopulate.h bench/userInterface.h bench/testData.h bench/testDefinitions.h bench/ndb_schema.hpp bench/ndb_error.hpp DbAsyncGenerator_SOURCES = bench/mainAsyncGenerator.cpp bench/asyncGenerator.cpp bench/ndb_async2.cpp bench/dbGenerator.h bench/macros.h bench/userInterface.h bench/testData.h bench/testDefinitions.h bench/ndb_schema.hpp bench/ndb_error.hpp +test_event_multi_table_SOURCES = test_event_multi_table.cpp INCLUDES_LOC = -I$(top_srcdir)/ndb/include/kernel diff --git a/ndb/test/ndbapi/bench/userInterface.cpp b/ndb/test/ndbapi/bench/userInterface.cpp index 683552c3133..35e88183230 100644 --- a/ndb/test/ndbapi/bench/userInterface.cpp +++ b/ndb/test/ndbapi/bench/userInterface.cpp @@ -173,7 +173,7 @@ create_table_server(Ndb * pNdb){ String, MMBased, NotNullAttribute, - NormalStorageAttribute, + 0, 0, 1, 16); @@ -376,7 +376,7 @@ create_table_subscriber(Ndb * pNdb){ String, MMBased, NotNullAttribute, - NormalStorageAttribute, + 0, 0, 1, 16); @@ -494,7 +494,7 @@ create_table_session(Ndb * pNdb){ String, MMBased, NotNullAttribute, - NormalStorageAttribute, + 0, 0, 1, 16); diff --git a/ndb/test/ndbapi/test_event.cpp b/ndb/test/ndbapi/test_event.cpp index 3619fe195cf..2df50f21e43 100644 --- a/ndb/test/ndbapi/test_event.cpp +++ b/ndb/test/ndbapi/test_event.cpp @@ -14,11 +14,11 @@ along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ -#include "NDBT_Test.hpp" -#include "NDBT_ReturnCodes.h" -#include "HugoTransactions.hpp" -#include "UtilTransactions.hpp" -#include "TestNdbEventOperation.hpp" +#include <NDBT_Test.hpp> +#include <NDBT_ReturnCodes.h> +#include <HugoTransactions.hpp> +#include <UtilTransactions.hpp> +#include <TestNdbEventOperation.hpp> #define GETNDB(ps) ((NDBT_NdbApiStep*)ps)->getNdb() @@ -206,15 +206,10 @@ int runVerify(NDBT_Context* ctx, NDBT_Step* step) char buf[1024]; sprintf(buf, "%s_SHADOW", table->getName()); - const NdbDictionary::Table * table_shadow; - if ((table_shadow = GETNDB(step)->getDictionary()->getTable(buf)) == 0) - { - g_err << "Unable to get table " << buf << endl; - return NDBT_FAILED; - } - HugoTransactions hugoTrans(*table_shadow); - if (hugoTrans.pkReadRecords(GETNDB(step), records) != 0){ + HugoTransactions hugoTrans(*table); + if (hugoTrans.compare(GETNDB(step), buf, 0)) + { return NDBT_FAILED; } @@ -268,7 +263,10 @@ int runEventApplier(NDBT_Context* ctx, NDBT_Step* step) //printf("now waiting for event...\n"); res= GETNDB(step)->pollEvents(1000); // wait for event or 1000 ms if (res <= 0) + { + ndbout_c("********************"); continue; + } //printf("got data! %d\n", r); int overrun= 0; @@ -337,12 +335,36 @@ int runEventApplier(NDBT_Context* ctx, NDBT_Step* step) DBUG_RETURN(NDBT_FAILED); } break; - case NdbDictionary::Event::TE_ALL: + default: abort(); } for (i= 0; i < n_columns; i++) { + if (recAttr[i]->isNULL()) + { + if (table->getColumn(i)->getPrimaryKey()) + { + g_err << "internal error: primary key isNull()=" + << recAttr[i]->isNULL() << endl; + DBUG_RETURN(NDBT_FAILED); + } + switch (pOp->getEventType()) { + case NdbDictionary::Event::TE_INSERT: + if (recAttr[i]->isNULL() < 0) + { + g_err << "internal error: missing value for insert\n"; + DBUG_RETURN(NDBT_FAILED); + } + break; + case NdbDictionary::Event::TE_DELETE: + break; + case NdbDictionary::Event::TE_UPDATE: + break; + default: + abort(); + } + } if (table->getColumn(i)->getPrimaryKey() && op->equal(i,recAttr[i]->aRef())) { @@ -358,7 +380,7 @@ int runEventApplier(NDBT_Context* ctx, NDBT_Step* step) for (i= 0; i < n_columns; i++) { if (!table->getColumn(i)->getPrimaryKey() && - op->setValue(i,recAttr[i]->aRef())) + op->setValue(i,recAttr[i]->isNULL() ? 0:recAttr[i]->aRef())) { g_err << "setValue(insert) " << i << " " << op->getNdbError().code << " " @@ -374,7 +396,7 @@ int runEventApplier(NDBT_Context* ctx, NDBT_Step* step) { if (!table->getColumn(i)->getPrimaryKey() && recAttr[i]->isNULL() >= 0 && - op->setValue(i,recAttr[i]->aRef())) + op->setValue(i,recAttr[i]->isNULL() ? 0:recAttr[i]->aRef())) { g_err << "setValue(update) " << i << " " << op->getNdbError().code << " " diff --git a/ndb/test/ndbapi/test_event_multi_table.cpp b/ndb/test/ndbapi/test_event_multi_table.cpp new file mode 100644 index 00000000000..f16504029fa --- /dev/null +++ b/ndb/test/ndbapi/test_event_multi_table.cpp @@ -0,0 +1,487 @@ +/* Copyright (C) 2005 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#include <ndb_global.h> +#include <ndb_opts.h> +#include <NDBT_Test.hpp> +#include <NDBT_ReturnCodes.h> +#include <HugoTransactions.hpp> +#include <UtilTransactions.hpp> +#include <TestNdbEventOperation.hpp> + +static void usage() +{ + ndb_std_print_version(); +} + +static int start_transaction(Ndb *ndb, Vector<HugoOperations*> &ops) +{ + if (ops[0]->startTransaction(ndb) != NDBT_OK) + return -1; + NdbTransaction * t= ops[0]->getTransaction(); + for (int i= ops.size()-1; i > 0; i--) + { + ops[i]->setTransaction(t); + } + return 0; +} + +static int close_transaction(Ndb *ndb, Vector<HugoOperations*> &ops) +{ + if (ops[0]->closeTransaction(ndb) != NDBT_OK) + return -1; + for (int i= ops.size()-1; i > 0; i--) + { + ops[i]->setTransaction(NULL); + } + return 0; +} + +static int execute_commit(Ndb *ndb, Vector<HugoOperations*> &ops) +{ + if (ops[0]->execute_Commit(ndb) != NDBT_OK) + return -1; + return 0; +} + +static int copy_events(Ndb *ndb, + Vector<NdbEventOperation *> &ops, + Vector<const NdbDictionary::Table *> &tabs, + Vector<Vector<NdbRecAttr *> > &values) +{ + DBUG_ENTER("copy_events"); + int r= 0; + while (1) + { + int res= ndb->pollEvents(1000); // wait for event or 1000 ms + DBUG_PRINT("info", ("pollEvents res=%d", r)); + if (res <= 0) + { + break; + } + for (unsigned i_ops= 0; i_ops < ops.size(); i_ops++) + { + NdbEventOperation *pOp= ops[i_ops]; + const NdbDictionary::Table *table= tabs[i_ops]; + Vector<NdbRecAttr *> &recAttr= values[i_ops]; + + int overrun= 0; + unsigned i; + unsigned n_columns= table->getNoOfColumns(); + while (pOp->next(&overrun) > 0) + { + if (overrun) + { + g_err << "buffer overrun\n"; + DBUG_RETURN(-1); + } + r++; + + Uint32 gci= pOp->getGCI(); + + if (!pOp->isConsistent()) { + g_err << "A node failure has occured and events might be missing\n"; + DBUG_RETURN(-1); + } + + int noRetries= 0; + do + { + NdbTransaction *trans= ndb->startTransaction(); + if (trans == 0) + { + g_err << "startTransaction failed " + << ndb->getNdbError().code << " " + << ndb->getNdbError().message << endl; + DBUG_RETURN(-1); + } + + NdbOperation *op= trans->getNdbOperation(table); + if (op == 0) + { + g_err << "getNdbOperation failed " + << trans->getNdbError().code << " " + << trans->getNdbError().message << endl; + DBUG_RETURN(-1); + } + + switch (pOp->getEventType()) { + case NdbDictionary::Event::TE_INSERT: + if (op->insertTuple()) + { + g_err << "insertTuple " + << op->getNdbError().code << " " + << op->getNdbError().message << endl; + DBUG_RETURN(-1); + } + break; + case NdbDictionary::Event::TE_DELETE: + if (op->deleteTuple()) + { + g_err << "deleteTuple " + << op->getNdbError().code << " " + << op->getNdbError().message << endl; + DBUG_RETURN(-1); + } + break; + case NdbDictionary::Event::TE_UPDATE: + if (op->updateTuple()) + { + g_err << "updateTuple " + << op->getNdbError().code << " " + << op->getNdbError().message << endl; + DBUG_RETURN(-1); + } + break; + default: + abort(); + } + + for (i= 0; i < n_columns; i++) + { + if (recAttr[i]->isNULL()) + { + if (table->getColumn(i)->getPrimaryKey()) + { + g_err << "internal error: primary key isNull()=" + << recAttr[i]->isNULL() << endl; + DBUG_RETURN(NDBT_FAILED); + } + switch (pOp->getEventType()) { + case NdbDictionary::Event::TE_INSERT: + if (recAttr[i]->isNULL() < 0) + { + g_err << "internal error: missing value for insert\n"; + DBUG_RETURN(NDBT_FAILED); + } + break; + case NdbDictionary::Event::TE_DELETE: + break; + case NdbDictionary::Event::TE_UPDATE: + break; + default: + abort(); + } + } + if (table->getColumn(i)->getPrimaryKey() && + op->equal(i,recAttr[i]->aRef())) + { + g_err << "equal " << i << " " + << op->getNdbError().code << " " + << op->getNdbError().message << endl; + DBUG_RETURN(NDBT_FAILED); + } + } + + switch (pOp->getEventType()) { + case NdbDictionary::Event::TE_INSERT: + for (i= 0; i < n_columns; i++) + { + if (!table->getColumn(i)->getPrimaryKey() && + op->setValue(i,recAttr[i]->isNULL() ? 0:recAttr[i]->aRef())) + { + g_err << "setValue(insert) " << i << " " + << op->getNdbError().code << " " + << op->getNdbError().message << endl; + DBUG_RETURN(-1); + } + } + break; + case NdbDictionary::Event::TE_DELETE: + break; + case NdbDictionary::Event::TE_UPDATE: + for (i= 0; i < n_columns; i++) + { + if (!table->getColumn(i)->getPrimaryKey() && + recAttr[i]->isNULL() >= 0 && + op->setValue(i,recAttr[i]->isNULL() ? 0:recAttr[i]->aRef())) + { + g_err << "setValue(update) " << i << " " + << op->getNdbError().code << " " + << op->getNdbError().message << endl; + DBUG_RETURN(NDBT_FAILED); + } + } + break; + case NdbDictionary::Event::TE_ALL: + abort(); + } + if (trans->execute(Commit) == 0) + { + trans->close(); + // everything ok + break; + } + if (noRetries++ == 10 || + trans->getNdbError().status != NdbError::TemporaryError) + { + g_err << "execute " << r << " failed " + << trans->getNdbError().code << " " + << trans->getNdbError().message << endl; + trans->close(); + DBUG_RETURN(-1); + } + trans->close(); + NdbSleep_MilliSleep(100); // sleep before retying + } while(1); + } + } + } + DBUG_RETURN(r); +} + +static int verify_copy(Ndb *ndb, + Vector<const NdbDictionary::Table *> &tabs1, + Vector<const NdbDictionary::Table *> &tabs2) +{ + for (unsigned i= 0; i < tabs1.size(); i++) + if (tabs1[i]) + { + HugoTransactions hugoTrans(*tabs1[i]); + if (hugoTrans.compare(ndb, tabs2[i]->getName(), 0)) + return -1; + } + return 0; +} + +NDB_STD_OPTS_VARS; + +static const char* _dbname = "TEST_DB"; +static struct my_option my_long_options[] = +{ + NDB_STD_OPTS(""), + { "database", 'd', "Name of database table is in", + (gptr*) &_dbname, (gptr*) &_dbname, 0, + GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 }, + { 0, 0, 0, 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0} +}; + +int +main(int argc, char** argv) +{ + NDB_INIT(argv[0]); + const char *load_default_groups[]= { "mysql_cluster",0 }; + load_defaults("my",load_default_groups,&argc,&argv); + + int ho_error; +#ifndef DBUG_OFF + opt_debug= "d:t:F:L"; +#endif + if ((ho_error=handle_options(&argc, &argv, my_long_options, + ndb_std_get_one_option))) + return NDBT_ProgramExit(NDBT_WRONGARGS); + + DBUG_ENTER("main"); + Ndb_cluster_connection con(opt_connect_str); + if(con.connect(12, 5, 1)) + { + DBUG_RETURN(NDBT_ProgramExit(NDBT_FAILED)); + } + + + Ndb ndb(&con,_dbname); + ndb.init(); + while (ndb.waitUntilReady() != 0); + + NdbDictionary::Dictionary * dict = ndb.getDictionary(); + int no_error= 1; + int i; + + // create all tables + Vector<const NdbDictionary::Table*> pTabs; + for (i= 0; no_error && argc; argc--, i++) + { + dict->dropTable(argv[i]); + NDBT_Tables::createTable(&ndb, argv[i]); + const NdbDictionary::Table *pTab= dict->getTable(argv[i]); + if (pTab == 0) + { + ndbout << "Failed to create table" << endl; + ndbout << dict->getNdbError() << endl; + no_error= 0; + break; + } + pTabs.push_back(pTab); + } + pTabs.push_back(NULL); + + // create an event for each table + for (i= 0; no_error && pTabs[i]; i++) + { + HugoTransactions ht(*pTabs[i]); + if (ht.createEvent(&ndb)){ + no_error= 0; + break; + } + } + + // create an event operation for each event + Vector<NdbEventOperation *> pOps; + for (i= 0; no_error && pTabs[i]; i++) + { + char buf[1024]; + sprintf(buf, "%s_EVENT", pTabs[i]->getName()); + NdbEventOperation *pOp= ndb.createEventOperation(buf, 1000); + if ( pOp == NULL ) + { + no_error= 0; + break; + } + pOps.push_back(pOp); + } + + // get storage for each event operation + Vector<Vector<NdbRecAttr*> > values; + Vector<Vector<NdbRecAttr*> > pre_values; + for (i= 0; no_error && pTabs[i]; i++) + { + int n_columns= pTabs[i]->getNoOfColumns(); + Vector<NdbRecAttr*> tmp_a; + Vector<NdbRecAttr*> tmp_b; + for (int j = 0; j < n_columns; j++) { + tmp_a.push_back(pOps[i]->getValue(pTabs[i]->getColumn(j)->getName())); + tmp_b.push_back(pOps[i]->getPreValue(pTabs[i]->getColumn(j)->getName())); + } + values.push_back(tmp_a); + pre_values.push_back(tmp_b); + } + + // start receiving events + for (i= 0; no_error && pTabs[i]; i++) + { + if ( pOps[i]->execute() ) + { + no_error= 0; + break; + } + } + + // create a "shadow" table for each table + Vector<const NdbDictionary::Table*> pShadowTabs; + for (i= 0; no_error && pTabs[i]; i++) + { + char buf[1024]; + sprintf(buf, "%s_SHADOW", pTabs[i]->getName()); + + dict->dropTable(buf); + if (dict->getTable(buf)) + { + no_error= 0; + break; + } + + NdbDictionary::Table table_shadow(*pTabs[i]); + table_shadow.setName(buf); + dict->createTable(table_shadow); + pShadowTabs.push_back(dict->getTable(buf)); + if (!pShadowTabs[i]) + { + no_error= 0; + break; + } + } + + // create a hugo operation per table + Vector<HugoOperations *> hugo_ops; + for (i= 0; no_error && pTabs[i]; i++) + { + hugo_ops.push_back(new HugoOperations(*pTabs[i])); + } + + sleep(5); + + // insert 3 records per table + do { + if (start_transaction(&ndb, hugo_ops)) + { + no_error= 0; + break; + } + for (i= 0; no_error && pTabs[i]; i++) + { + hugo_ops[i]->pkInsertRecord(&ndb, 0, 3); + } + if (execute_commit(&ndb, hugo_ops)) + { + no_error= 0; + break; + } + if(close_transaction(&ndb, hugo_ops)) + { + no_error= 0; + break; + } + } while(0); + + // copy events and verify + do { + if (copy_events(&ndb, pOps, pShadowTabs, values) < 0) + { + no_error= 0; + break; + } + if (verify_copy(&ndb, pTabs, pShadowTabs)) + { + no_error= 0; + break; + } + } while (0); + + // update 2 records in first table + do { + if (start_transaction(&ndb, hugo_ops)) + { + no_error= 0; + break; + } + + hugo_ops[0]->pkUpdateRecord(&ndb, 2); + + if (execute_commit(&ndb, hugo_ops)) + { + no_error= 0; + break; + } + if(close_transaction(&ndb, hugo_ops)) + { + no_error= 0; + break; + } + } while(0); + + // copy events and verify + do { + if (copy_events(&ndb, pOps, pShadowTabs, values) < 0) + { + no_error= 0; + break; + } + if (verify_copy(&ndb, pTabs, pShadowTabs)) + { + no_error= 0; + break; + } + } while (0); + + if (no_error) + DBUG_RETURN(NDBT_ProgramExit(NDBT_OK)); + DBUG_RETURN(NDBT_ProgramExit(NDBT_FAILED)); +} + +template class Vector<HugoOperations *>; +template class Vector<NdbEventOperation *>; +template class Vector<NdbRecAttr*>; +template class Vector<Vector<NdbRecAttr*> >; diff --git a/ndb/test/run-test/daily-devel-tests.txt b/ndb/test/run-test/daily-devel-tests.txt index ec90a88a77f..20f54e031e5 100644 --- a/ndb/test/run-test/daily-devel-tests.txt +++ b/ndb/test/run-test/daily-devel-tests.txt @@ -206,5 +206,5 @@ args: -l 1 -n SR9 T1 # max-time: 2500 cmd: test_event -args: -n BasicEventOperation T1 T6 +args: -n EventOperationApplier diff --git a/ndb/test/src/NDBT_ResultRow.cpp b/ndb/test/src/NDBT_ResultRow.cpp index f82963901b1..8e92a57d2e4 100644 --- a/ndb/test/src/NDBT_ResultRow.cpp +++ b/ndb/test/src/NDBT_ResultRow.cpp @@ -84,7 +84,7 @@ NDBT_ResultRow::header (NdbOut & out) const { return out; } -BaseString NDBT_ResultRow::c_str() { +BaseString NDBT_ResultRow::c_str() const { BaseString str; @@ -133,3 +133,10 @@ NDBT_ResultRow::clone () const { return row; } + +bool +NDBT_ResultRow::operator==(const NDBT_ResultRow& other) const +{ + // quick and dirty + return c_str() == other.c_str(); +} diff --git a/ndb/test/src/NdbSchemaOp.cpp b/ndb/test/src/NdbSchemaOp.cpp index 9bce0b10fc3..4281ceb02c8 100644 --- a/ndb/test/src/NdbSchemaOp.cpp +++ b/ndb/test/src/NdbSchemaOp.cpp @@ -113,7 +113,7 @@ NdbSchemaOp::createAttribute( const char* anAttrName, AttrType anAttrType, StorageMode aStorageMode, bool nullable, - StorageAttributeType aStorageAttr, + int aStorageAttr, int aDistributionKeyFlag, int aDistributionGroupFlag, int aDistributionGroupNoOfBits, diff --git a/ndb/test/src/UtilTransactions.cpp b/ndb/test/src/UtilTransactions.cpp index a7c9751ed09..65c1a7ded31 100644 --- a/ndb/test/src/UtilTransactions.cpp +++ b/ndb/test/src/UtilTransactions.cpp @@ -1300,3 +1300,152 @@ UtilTransactions::getOperation(NdbConnection* pTrans, return 0; } } + +#include <HugoOperations.hpp> + +int +UtilTransactions::compare(Ndb* pNdb, const char* tab_name2, int flags){ + + + NdbError err; + int return_code= -1, row_count= 0; + int retryAttempt = 0, retryMax = 10; + + HugoCalculator calc(tab); + NDBT_ResultRow row(tab); + NdbTransaction* pTrans= 0; + const NdbDictionary::Table* tmp= pNdb->getDictionary()->getTable(tab_name2); + if(tmp == 0) + { + g_err << "Unable to lookup table: " << tab_name2 + << endl << pNdb->getDictionary()->getNdbError() << endl; + return -1; + } + const NdbDictionary::Table& tab2= *tmp; + + HugoOperations cmp(tab2); + UtilTransactions count(tab2); + + while (true){ + + if (retryAttempt++ >= retryMax){ + g_info << "ERROR: has retried this operation " << retryAttempt + << " times, failing!" << endl; + return -1; + } + + NdbScanOperation *pOp= 0; + pTrans = pNdb->startTransaction(); + if (pTrans == NULL) { + err = pNdb->getNdbError(); + goto error; + } + + pOp= pTrans->getNdbScanOperation(tab.getName()); + if (pOp == NULL) { + ERR(err= pTrans->getNdbError()); + goto error; + } + + if( pOp->readTuples(NdbScanOperation::LM_Read) ) { + ERR(err= pTrans->getNdbError()); + goto error; + } + + if( pOp->interpret_exit_ok() == -1 ) { + ERR(err= pTrans->getNdbError()); + goto error; + } + + // Read all attributes + { + for (int a = 0; a < tab.getNoOfColumns(); a++){ + if ((row.attributeStore(a) = + pOp->getValue(tab.getColumn(a)->getName())) == 0) { + ERR(err= pTrans->getNdbError()); + goto error; + } + } + } + + if( pTrans->execute(NoCommit) == -1 ) { + ERR(err= pTrans->getNdbError()); + goto error; + } + + { + int eof; + while((eof = pOp->nextResult(true)) == 0) + { + do { + row_count++; + if(cmp.startTransaction(pNdb) != NDBT_OK) + { + ERR(err= pNdb->getNdbError()); + goto error; + } + int rowNo= calc.getIdValue(&row); + if(cmp.pkReadRecord(pNdb, rowNo, 1) != NDBT_OK) + { + ERR(err= cmp.getTransaction()->getNdbError()); + goto error; + } + if(cmp.execute_Commit(pNdb) != NDBT_OK) + { + ERR(err= cmp.getTransaction()->getNdbError()); + goto error; + } + if(row != cmp.get_row(0)) + { + g_err << "COMPARE FAILED" << endl; + g_err << row << endl; + g_err << cmp.get_row(0) << endl; + return_code= 1; + goto close; + } + retryAttempt= 0; + cmp.closeTransaction(pNdb); + } while((eof = pOp->nextResult(false)) == 0); + } + if (eof == -1) + { + err = pTrans->getNdbError(); + goto error; + } + } + + pTrans->close(); pTrans= 0; + + g_info << row_count << " rows compared" << endl; + { + int row_count2; + if(count.selectCount(pNdb, 0, &row_count2) != NDBT_OK) + { + g_err << "Failed to count rows in tab_name2" << endl; + return -1; + } + + g_info << row_count2 << " rows in tab_name2" << endl; + return (row_count == row_count2 ? 0 : 1); + } +error: + if(err.status == NdbError::TemporaryError) + { + NdbSleep_MilliSleep(50); + if(pTrans != 0) + { + pTrans->close(); + pTrans= 0; + } + if(cmp.getTransaction()) + cmp.closeTransaction(pNdb); + continue; + } + break; + } + +close: + if(pTrans != 0) pTrans->close(); + + return return_code; +} |