diff options
Diffstat (limited to 'storage/ndb/src/ndbapi/ndb_cluster_connection.cpp')
-rw-r--r-- | storage/ndb/src/ndbapi/ndb_cluster_connection.cpp | 694 |
1 files changed, 0 insertions, 694 deletions
diff --git a/storage/ndb/src/ndbapi/ndb_cluster_connection.cpp b/storage/ndb/src/ndbapi/ndb_cluster_connection.cpp deleted file mode 100644 index 4228e26fd34..00000000000 --- a/storage/ndb/src/ndbapi/ndb_cluster_connection.cpp +++ /dev/null @@ -1,694 +0,0 @@ -/* Copyright (c) 2003-2007 MySQL AB - Use is subject to license terms - - This program is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation; version 2 of the License. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program; if not, write to the Free Software - Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA */ - -#include <ndb_global.h> -#include <my_pthread.h> -#include <my_sys.h> - -#include "ndb_cluster_connection_impl.hpp" -#include <mgmapi_configuration.hpp> -#include <mgmapi_config_parameters.h> -#include <TransporterFacade.hpp> -#include <NdbOut.hpp> -#include <NdbSleep.h> -#include <NdbThread.h> -#include <ndb_limits.h> -#include <ConfigRetriever.hpp> -#include <ndb_version.h> -#include <mgmapi_debug.h> -#include <mgmapi_internal.h> -#include <md5_hash.hpp> - -#include <EventLogger.hpp> -EventLogger g_eventLogger; - -#include <NdbMutex.h> -#ifdef VM_TRACE -NdbMutex *ndb_print_state_mutex= NULL; -#endif - -static int g_ndb_connection_count = 0; - -/* - * Ndb_cluster_connection - */ - -Ndb_cluster_connection::Ndb_cluster_connection(const char *connect_string) - : m_impl(* new Ndb_cluster_connection_impl(connect_string)) -{ -} - -Ndb_cluster_connection::Ndb_cluster_connection -(Ndb_cluster_connection_impl& impl) : m_impl(impl) -{ -} - -Ndb_cluster_connection::~Ndb_cluster_connection() -{ - Ndb_cluster_connection_impl *tmp = &m_impl; - if (this != tmp) - delete tmp; -} - -int Ndb_cluster_connection::get_connected_port() const -{ - if (m_impl.m_config_retriever) - return m_impl.m_config_retriever->get_mgmd_port(); - return -1; -} - -const char *Ndb_cluster_connection::get_connected_host() const -{ - if (m_impl.m_config_retriever) - return m_impl.m_config_retriever->get_mgmd_host(); - return 0; -} - -const char *Ndb_cluster_connection::get_connectstring(char *buf, - int buf_sz) const -{ - if (m_impl.m_config_retriever) - return m_impl.m_config_retriever->get_connectstring(buf,buf_sz); - return 0; -} - -pthread_handler_t run_ndb_cluster_connection_connect_thread(void *me) -{ - Ndb_cluster_connection_impl* connection= (Ndb_cluster_connection_impl*) me; - connection->m_run_connect_thread= 1; - connection->connect_thread(); - return me; -} - -int Ndb_cluster_connection::start_connect_thread(int (*connect_callback)(void)) -{ - int r; - DBUG_ENTER("Ndb_cluster_connection::start_connect_thread"); - m_impl.m_connect_callback= connect_callback; - if ((r = connect(0,0,0)) == 1) - { - DBUG_PRINT("info",("starting thread")); - m_impl.m_connect_thread= - NdbThread_Create(run_ndb_cluster_connection_connect_thread, - (void**)&m_impl, 32768, "ndb_cluster_connection", - NDB_THREAD_PRIO_LOW); - } - else if (r < 0) - { - DBUG_RETURN(-1); - } - else if (m_impl.m_connect_callback) - { - (*m_impl.m_connect_callback)(); - } - DBUG_RETURN(0); -} - -void Ndb_cluster_connection::set_optimized_node_selection(int val) -{ - m_impl.m_optimized_node_selection= val; -} - -void -Ndb_cluster_connection_impl::init_get_next_node -(Ndb_cluster_connection_node_iter &iter) -{ - if (iter.scan_state != (Uint8)~0) - iter.cur_pos= iter.scan_state; - if (iter.cur_pos >= no_db_nodes()) - iter.cur_pos= 0; - iter.init_pos= iter.cur_pos; - iter.scan_state= 0; - // fprintf(stderr,"[init %d]",iter.init_pos); - return; -} - -Uint32 -Ndb_cluster_connection_impl::get_next_node(Ndb_cluster_connection_node_iter &iter) -{ - Uint32 cur_pos= iter.cur_pos; - if (cur_pos >= no_db_nodes()) - return 0; - - Ndb_cluster_connection_impl::Node *nodes= m_impl.m_all_nodes.getBase(); - Ndb_cluster_connection_impl::Node &node= nodes[cur_pos]; - - if (iter.scan_state != (Uint8)~0) - { - assert(iter.scan_state < no_db_nodes()); - if (nodes[iter.scan_state].group == node.group) - iter.scan_state= ~0; - else - return nodes[iter.scan_state++].id; - } - - // fprintf(stderr,"[%d]",node.id); - - cur_pos++; - Uint32 init_pos= iter.init_pos; - if (cur_pos == node.next_group) - { - cur_pos= nodes[init_pos].this_group; - } - - // fprintf(stderr,"[cur_pos %d]",cur_pos); - if (cur_pos != init_pos) - iter.cur_pos= cur_pos; - else - { - iter.cur_pos= node.next_group; - iter.init_pos= node.next_group; - } - return node.id; -} - -unsigned -Ndb_cluster_connection::no_db_nodes() -{ - return m_impl.m_all_nodes.size(); -} - -unsigned -Ndb_cluster_connection::node_id() -{ - return m_impl.m_transporter_facade->ownId(); -} - - -int Ndb_cluster_connection::get_no_ready() -{ - TransporterFacade *tp = m_impl.m_transporter_facade; - if (tp == 0 || tp->ownId() == 0) - return -1; - - unsigned int foundAliveNode = 0; - tp->lock_mutex(); - for(unsigned i= 0; i < no_db_nodes(); i++) - { - //************************************************ - // If any node is answering, ndb is answering - //************************************************ - if (tp->get_node_alive(m_impl.m_all_nodes[i].id) != 0) { - foundAliveNode++; - } - } - tp->unlock_mutex(); - - return foundAliveNode; -} - -int -Ndb_cluster_connection::wait_until_ready(int timeout, - int timeout_after_first_alive) -{ - DBUG_ENTER("Ndb_cluster_connection::wait_until_ready"); - TransporterFacade *tp = m_impl.m_transporter_facade; - if (tp == 0) - { - DBUG_RETURN(-1); - } - if (tp->ownId() == 0) - { - DBUG_RETURN(-1); - } - int secondsCounter = 0; - int milliCounter = 0; - int noChecksSinceFirstAliveFound = 0; - do { - unsigned int foundAliveNode = get_no_ready(); - - if (foundAliveNode == no_db_nodes()) - { - DBUG_RETURN(0); - } - else if (foundAliveNode > 0) - { - noChecksSinceFirstAliveFound++; - // 100 ms delay -> 10* - if (noChecksSinceFirstAliveFound > 10*timeout_after_first_alive) - DBUG_RETURN(1); - } - else if (secondsCounter >= timeout) - { // no alive nodes and timed out - DBUG_RETURN(-1); - } - NdbSleep_MilliSleep(100); - milliCounter += 100; - if (milliCounter >= 1000) { - secondsCounter++; - milliCounter = 0; - }//if - } while (1); -} - -unsigned Ndb_cluster_connection::get_connect_count() const -{ - return m_impl.get_connect_count(); -} - -/* - * Ndb_cluster_connection_impl - */ - -Ndb_cluster_connection_impl::Ndb_cluster_connection_impl(const char * - connect_string) - : Ndb_cluster_connection(*this), - m_optimized_node_selection(1), - m_name(0), - m_run_connect_thread(0), - m_event_add_drop_mutex(0), - m_latest_trans_gci(0) -{ - DBUG_ENTER("Ndb_cluster_connection"); - DBUG_PRINT("enter",("Ndb_cluster_connection this=0x%lx", (long) this)); - - if (!m_event_add_drop_mutex) - m_event_add_drop_mutex= NdbMutex_Create(); - - g_eventLogger.createConsoleHandler(); - g_eventLogger.setCategory("NdbApi"); - g_eventLogger.enable(Logger::LL_ON, Logger::LL_ERROR); - - m_connect_thread= 0; - m_connect_callback= 0; - -#ifdef VM_TRACE - if (ndb_print_state_mutex == NULL) - ndb_print_state_mutex= NdbMutex_Create(); -#endif - m_config_retriever= - new ConfigRetriever(connect_string, NDB_VERSION, NODE_TYPE_API); - if (m_config_retriever->hasError()) - { - printf("Could not initialize handle to management server: %s\n", - m_config_retriever->getErrorString()); - delete m_config_retriever; - m_config_retriever= 0; - } - if (m_name) - { - NdbMgmHandle h= m_config_retriever->get_mgmHandle(); - ndb_mgm_set_name(h, m_name); - } - m_transporter_facade= new TransporterFacade(); - - NdbMutex_Lock(g_ndb_connection_mutex); - if(g_ndb_connection_count++ == 0){ - NdbDictionary::Column::FRAGMENT= - NdbColumnImpl::create_pseudo("NDB$FRAGMENT"); - NdbDictionary::Column::FRAGMENT_FIXED_MEMORY= - NdbColumnImpl::create_pseudo("NDB$FRAGMENT_FIXED_MEMORY"); - NdbDictionary::Column::FRAGMENT_VARSIZED_MEMORY= - NdbColumnImpl::create_pseudo("NDB$FRAGMENT_VARSIZED_MEMORY"); - NdbDictionary::Column::ROW_COUNT= - NdbColumnImpl::create_pseudo("NDB$ROW_COUNT"); - NdbDictionary::Column::COMMIT_COUNT= - NdbColumnImpl::create_pseudo("NDB$COMMIT_COUNT"); - NdbDictionary::Column::ROW_SIZE= - NdbColumnImpl::create_pseudo("NDB$ROW_SIZE"); - NdbDictionary::Column::RANGE_NO= - NdbColumnImpl::create_pseudo("NDB$RANGE_NO"); - NdbDictionary::Column::DISK_REF= - NdbColumnImpl::create_pseudo("NDB$DISK_REF"); - NdbDictionary::Column::RECORDS_IN_RANGE= - NdbColumnImpl::create_pseudo("NDB$RECORDS_IN_RANGE"); - NdbDictionary::Column::ROWID= - NdbColumnImpl::create_pseudo("NDB$ROWID"); - NdbDictionary::Column::ROW_GCI= - NdbColumnImpl::create_pseudo("NDB$ROW_GCI"); - NdbDictionary::Column::ANY_VALUE= - NdbColumnImpl::create_pseudo("NDB$ANY_VALUE"); - NdbDictionary::Column::COPY_ROWID= - NdbColumnImpl::create_pseudo("NDB$COPY_ROWID"); - } - NdbMutex_Unlock(g_ndb_connection_mutex); - - DBUG_VOID_RETURN; -} - -Ndb_cluster_connection_impl::~Ndb_cluster_connection_impl() -{ - DBUG_ENTER("~Ndb_cluster_connection"); - if (m_transporter_facade != 0) - { - m_transporter_facade->stop_instance(); - } - if (m_connect_thread) - { - void *status; - m_run_connect_thread= 0; - NdbThread_WaitFor(m_connect_thread, &status); - NdbThread_Destroy(&m_connect_thread); - m_connect_thread= 0; - } - if (m_transporter_facade != 0) - { - delete m_transporter_facade; - m_transporter_facade = 0; - } - if (m_config_retriever) - { - delete m_config_retriever; - m_config_retriever= NULL; - } -#ifdef VM_TRACE - if (ndb_print_state_mutex != NULL) - { - NdbMutex_Destroy(ndb_print_state_mutex); - ndb_print_state_mutex= NULL; - } -#endif - if (m_name) - free(m_name); - - NdbMutex_Lock(g_ndb_connection_mutex); - if(--g_ndb_connection_count == 0){ - delete NdbDictionary::Column::FRAGMENT; - delete NdbDictionary::Column::FRAGMENT_FIXED_MEMORY; - delete NdbDictionary::Column::FRAGMENT_VARSIZED_MEMORY; - delete NdbDictionary::Column::ROW_COUNT; - delete NdbDictionary::Column::COMMIT_COUNT; - delete NdbDictionary::Column::ROW_SIZE; - delete NdbDictionary::Column::RANGE_NO; - delete NdbDictionary::Column::DISK_REF; - delete NdbDictionary::Column::RECORDS_IN_RANGE; - delete NdbDictionary::Column::ROWID; - delete NdbDictionary::Column::ROW_GCI; - delete NdbDictionary::Column::ANY_VALUE; - NdbDictionary::Column::FRAGMENT= 0; - NdbDictionary::Column::FRAGMENT_FIXED_MEMORY= 0; - NdbDictionary::Column::FRAGMENT_VARSIZED_MEMORY= 0; - NdbDictionary::Column::ROW_COUNT= 0; - NdbDictionary::Column::COMMIT_COUNT= 0; - NdbDictionary::Column::ROW_SIZE= 0; - NdbDictionary::Column::RANGE_NO= 0; - NdbDictionary::Column::DISK_REF= 0; - NdbDictionary::Column::RECORDS_IN_RANGE= 0; - NdbDictionary::Column::ROWID= 0; - NdbDictionary::Column::ROW_GCI= 0; - NdbDictionary::Column::ANY_VALUE= 0; - - delete NdbDictionary::Column::COPY_ROWID; - NdbDictionary::Column::COPY_ROWID = 0; - } - NdbMutex_Unlock(g_ndb_connection_mutex); - - if (m_event_add_drop_mutex) - NdbMutex_Destroy(m_event_add_drop_mutex); - - DBUG_VOID_RETURN; -} - -void -Ndb_cluster_connection_impl::set_name(const char *name) -{ - if (m_name) - free(m_name); - m_name= strdup(name); - if (m_config_retriever && m_name) - { - NdbMgmHandle h= m_config_retriever->get_mgmHandle(); - ndb_mgm_set_name(h, m_name); - } -} - -int -Ndb_cluster_connection_impl::init_nodes_vector(Uint32 nodeid, - const ndb_mgm_configuration - &config) -{ - DBUG_ENTER("Ndb_cluster_connection_impl::init_nodes_vector"); - ndb_mgm_configuration_iterator iter(config, CFG_SECTION_CONNECTION); - - for(iter.first(); iter.valid(); iter.next()) - { - Uint32 nodeid1, nodeid2, remoteNodeId, group= 5; - const char * remoteHostName= 0, * localHostName= 0; - if(iter.get(CFG_CONNECTION_NODE_1, &nodeid1)) continue; - if(iter.get(CFG_CONNECTION_NODE_2, &nodeid2)) continue; - - if(nodeid1 != nodeid && nodeid2 != nodeid) continue; - remoteNodeId = (nodeid == nodeid1 ? nodeid2 : nodeid1); - - iter.get(CFG_CONNECTION_GROUP, &group); - - { - const char * host1= 0, * host2= 0; - iter.get(CFG_CONNECTION_HOSTNAME_1, &host1); - iter.get(CFG_CONNECTION_HOSTNAME_2, &host2); - localHostName = (nodeid == nodeid1 ? host1 : host2); - remoteHostName = (nodeid == nodeid1 ? host2 : host1); - } - - Uint32 type = ~0; - if(iter.get(CFG_TYPE_OF_SECTION, &type)) continue; - - switch(type){ - case CONNECTION_TYPE_SHM:{ - break; - } - case CONNECTION_TYPE_SCI:{ - break; - } - case CONNECTION_TYPE_TCP:{ - // connecting through localhost - // check if config_hostname is local - if (SocketServer::tryBind(0,remoteHostName)) - group--; // upgrade group value - break; - } - } - if (m_impl.m_all_nodes.push_back(Node(group,remoteNodeId))) - { - DBUG_RETURN(-1); - } - DBUG_PRINT("info",("saved %d %d", group,remoteNodeId)); - for (int i= m_impl.m_all_nodes.size()-2; - i >= 0 && m_impl.m_all_nodes[i].group > m_impl.m_all_nodes[i+1].group; - i--) - { - Node tmp= m_impl.m_all_nodes[i]; - m_impl.m_all_nodes[i]= m_impl.m_all_nodes[i+1]; - m_impl.m_all_nodes[i+1]= tmp; - } - } - - int i; - Uint32 cur_group, i_group= 0; - cur_group= ~0; - for (i= (int)m_impl.m_all_nodes.size()-1; i >= 0; i--) - { - if (m_impl.m_all_nodes[i].group != cur_group) - { - cur_group= m_impl.m_all_nodes[i].group; - i_group= i+1; - } - m_impl.m_all_nodes[i].next_group= i_group; - } - cur_group= ~0; - for (i= 0; i < (int)m_impl.m_all_nodes.size(); i++) - { - if (m_impl.m_all_nodes[i].group != cur_group) - { - cur_group= m_impl.m_all_nodes[i].group; - i_group= i; - } - m_impl.m_all_nodes[i].this_group= i_group; - } -#if 0 - for (i= 0; i < (int)m_impl.m_all_nodes.size(); i++) - { - fprintf(stderr, "[%d] %d %d %d %d\n", - i, - m_impl.m_all_nodes[i].id, - m_impl.m_all_nodes[i].group, - m_impl.m_all_nodes[i].this_group, - m_impl.m_all_nodes[i].next_group); - } - - do_test(); -#endif - DBUG_RETURN(0); -} - -void -Ndb_cluster_connection_impl::do_test() -{ - Ndb_cluster_connection_node_iter iter; - int n= no_db_nodes()+5; - Uint32 *nodes= new Uint32[n+1]; - - for (int g= 0; g < n; g++) - { - for (int h= 0; h < n; h++) - { - Uint32 id; - Ndb_cluster_connection_node_iter iter2; - { - for (int j= 0; j < g; j++) - { - nodes[j]= get_next_node(iter2); - } - } - - for (int i= 0; i < n; i++) - { - init_get_next_node(iter); - fprintf(stderr, "%d dead:(", g); - id= 0; - while (id == 0) - { - if ((id= get_next_node(iter)) == 0) - break; - for (int j= 0; j < g; j++) - { - if (nodes[j] == id) - { - fprintf(stderr, " %d", id); - id= 0; - break; - } - } - } - fprintf(stderr, ")"); - if (id == 0) - { - break; - } - fprintf(stderr, " %d\n", id); - } - fprintf(stderr, "\n"); - } - } - delete [] nodes; -} - -void Ndb_cluster_connection::set_name(const char *name) -{ - m_impl.set_name(name); -} - -int Ndb_cluster_connection::connect(int no_retries, int retry_delay_in_seconds, - int verbose) -{ - struct ndb_mgm_reply mgm_reply; - - DBUG_ENTER("Ndb_cluster_connection::connect"); - do { - if (m_impl.m_config_retriever == 0) - DBUG_RETURN(-1); - if (m_impl.m_config_retriever->do_connect(no_retries, - retry_delay_in_seconds, - verbose)) - DBUG_RETURN(1); // mgmt server not up yet - - Uint32 nodeId = m_impl.m_config_retriever->allocNodeId(4/*retries*/, - 3/*delay*/); - if(nodeId == 0) - break; - ndb_mgm_configuration * props = m_impl.m_config_retriever->getConfig(); - if(props == 0) - break; - - m_impl.m_transporter_facade->start_instance(nodeId, props); - if (m_impl.init_nodes_vector(nodeId, *props)) - { - ndbout_c("Ndb_cluster_connection::connect: malloc failure"); - DBUG_RETURN(-1); - } - - for(unsigned i=0; - i<m_impl.m_transporter_facade->get_registry()->m_transporter_interface.size(); - i++) - ndb_mgm_set_connection_int_parameter(m_impl.m_config_retriever->get_mgmHandle(), - nodeId, - m_impl.m_transporter_facade->get_registry() - ->m_transporter_interface[i] - .m_remote_nodeId, - CFG_CONNECTION_SERVER_PORT, - m_impl.m_transporter_facade->get_registry() - ->m_transporter_interface[i] - .m_s_service_port, - &mgm_reply); - - ndb_mgm_destroy_configuration(props); - m_impl.m_transporter_facade->connected(); - DBUG_RETURN(0); - } while(0); - - ndbout << "Configuration error: "; - const char* erString = m_impl.m_config_retriever->getErrorString(); - if (erString == 0) { - erString = "No error specified!"; - } - ndbout << erString << endl; - DBUG_RETURN(-1); -} - -void Ndb_cluster_connection_impl::connect_thread() -{ - DBUG_ENTER("Ndb_cluster_connection_impl::connect_thread"); - int r; - do { - NdbSleep_SecSleep(1); - if ((r = connect(0,0,0)) == 0) - break; - if (r == -1) { - printf("Ndb_cluster_connection::connect_thread error\n"); - DBUG_ASSERT(false); - m_run_connect_thread= 0; - } else { - // Wait before making a new connect attempt - NdbSleep_SecSleep(1); - } - } while (m_run_connect_thread); - if (m_connect_callback) - (*m_connect_callback)(); - DBUG_VOID_RETURN; -} - -Uint64 * -Ndb_cluster_connection::get_latest_trans_gci() -{ - return m_impl.get_latest_trans_gci(); -} - -void -Ndb_cluster_connection::init_get_next_node(Ndb_cluster_connection_node_iter &iter) -{ - m_impl.init_get_next_node(iter); -} - -Uint32 -Ndb_cluster_connection::get_next_node(Ndb_cluster_connection_node_iter &iter) -{ - return m_impl.get_next_node(iter); -} - -unsigned -Ndb_cluster_connection::get_active_ndb_objects() const -{ - return m_impl.m_transporter_facade->get_active_ndb_objects(); -} - -int Ndb_cluster_connection::set_timeout(int timeout_ms) -{ - return ndb_mgm_set_timeout(m_impl.m_config_retriever->get_mgmHandle(), - timeout_ms); -} - -template class Vector<Ndb_cluster_connection_impl::Node>; - |