diff options
author | unknown <mskold/marty@mysql.com/linux.site> | 2006-08-30 11:41:21 +0200 |
---|---|---|
committer | unknown <mskold/marty@mysql.com/linux.site> | 2006-08-30 11:41:21 +0200 |
commit | 3dcf86736ad48960656d2f2fce9839e6ea330070 (patch) | |
tree | 84717b118a5ea841a5baefe32b6059bf0770d9eb | |
parent | 524914b2170927aa0b1d95b63d229ac0e11fc092 (diff) | |
download | mariadb-git-3dcf86736ad48960656d2f2fce9839e6ea330070.tar.gz |
NDBAPI cleanup
-rw-r--r-- | sql/ha_ndbcluster.cc | 16 | ||||
-rw-r--r-- | sql/ha_ndbcluster_binlog.cc | 31 | ||||
-rw-r--r-- | storage/ndb/include/ndbapi/Ndb.hpp | 9 | ||||
-rw-r--r-- | storage/ndb/include/ndbapi/NdbTransaction.hpp | 3 | ||||
-rw-r--r-- | storage/ndb/include/ndbapi/ndb_cluster_connection.hpp | 2 | ||||
-rw-r--r-- | storage/ndb/include/transporter/TransporterRegistry.hpp | 1 | ||||
-rw-r--r-- | storage/ndb/src/common/transporter/Packer.cpp | 4 | ||||
-rw-r--r-- | storage/ndb/src/common/transporter/TransporterRegistry.cpp | 14 | ||||
-rw-r--r-- | storage/ndb/src/common/util/ndb_init.c | 19 | ||||
-rw-r--r-- | storage/ndb/src/mgmapi/mgmapi.cpp | 2 | ||||
-rw-r--r-- | storage/ndb/src/ndbapi/Ndb.cpp | 6 | ||||
-rw-r--r-- | storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp | 55 | ||||
-rw-r--r-- | storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp | 27 | ||||
-rw-r--r-- | storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp | 5 | ||||
-rw-r--r-- | storage/ndb/src/ndbapi/NdbTransaction.cpp | 11 | ||||
-rw-r--r-- | storage/ndb/src/ndbapi/ndb_cluster_connection.cpp | 87 | ||||
-rw-r--r-- | storage/ndb/src/ndbapi/ndb_cluster_connection_impl.hpp | 10 |
17 files changed, 184 insertions, 118 deletions
diff --git a/sql/ha_ndbcluster.cc b/sql/ha_ndbcluster.cc index e8f2ec2af2b..140ea994cad 100644 --- a/sql/ha_ndbcluster.cc +++ b/sql/ha_ndbcluster.cc @@ -48,6 +48,16 @@ extern my_bool opt_ndb_optimized_node_selection; extern const char *opt_ndbcluster_connectstring; +// ndb interface initialization/cleanup +#ifdef __cplusplus +extern "C" { +#endif +extern void ndb_init_internal(); +extern void ndb_end_internal(); +#ifdef __cplusplus +} +#endif + const char *ndb_distribution_names[]= {"KEYHASH", "LINHASH", NullS}; TYPELIB ndb_distribution_typelib= { array_elements(ndb_distribution_names)-1, "", ndb_distribution_names, NULL }; @@ -6392,6 +6402,9 @@ static int ndbcluster_init() if (have_ndbcluster != SHOW_OPTION_YES) DBUG_RETURN(0); // nothing else to do + // Initialize ndb interface + ndb_init_internal(); + // Set connectstring if specified if (opt_ndbcluster_connectstring != 0) DBUG_PRINT("connectstring", ("%s", opt_ndbcluster_connectstring)); @@ -6540,6 +6553,9 @@ static int ndbcluster_end(ha_panic_function type) delete g_ndb_cluster_connection; g_ndb_cluster_connection= NULL; + // cleanup ndb interface + ndb_end_internal(); + pthread_mutex_destroy(&ndbcluster_mutex); pthread_mutex_destroy(&LOCK_ndb_util_thread); pthread_cond_destroy(&COND_ndb_util_thread); diff --git a/sql/ha_ndbcluster_binlog.cc b/sql/ha_ndbcluster_binlog.cc index ee9e639199c..2b03d4f82f7 100644 --- a/sql/ha_ndbcluster_binlog.cc +++ b/sql/ha_ndbcluster_binlog.cc @@ -25,6 +25,7 @@ #include "slave.h" #include "ha_ndbcluster_binlog.h" #include "NdbDictionary.hpp" +#include "ndb_cluster_connection.hpp" #include <util/NdbAutoPtr.hpp> #ifdef ndb_dynamite @@ -111,8 +112,7 @@ static NDB_SCHEMA_OBJECT *ndb_get_schema_object(const char *key, static void ndb_free_schema_object(NDB_SCHEMA_OBJECT **ndb_schema_object, bool have_lock); -/* instantiated in storage/ndb/src/ndbapi/Ndbif.cpp */ -extern Uint64 g_latest_trans_gci; +static Uint64 *p_latest_trans_gci= 0; /* Global variables for holding the binlog_index table reference @@ -439,7 +439,7 @@ static void ndbcluster_binlog_wait(THD *thd) { DBUG_ENTER("ndbcluster_binlog_wait"); const char *save_info= thd ? thd->proc_info : 0; - ulonglong wait_epoch= g_latest_trans_gci; + ulonglong wait_epoch= *p_latest_trans_gci; int count= 30; if (thd) thd->proc_info= "Waiting for ndbcluster binlog update to " @@ -3284,6 +3284,7 @@ static void ndb_free_schema_object(NDB_SCHEMA_OBJECT **ndb_schema_object, DBUG_VOID_RETURN; } + pthread_handler_t ndb_binlog_thread_func(void *arg) { THD *thd; /* needs to be first for thread_stack */ @@ -3292,6 +3293,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) Thd_ndb *thd_ndb=0; int ndb_update_binlog_index= 1; injector *inj= injector::instance(); + #ifdef RUN_NDB_BINLOG_TIMER Timer main_timer; #endif @@ -3380,6 +3382,8 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) */ injector_thd= thd; injector_ndb= i_ndb; + p_latest_trans_gci= + injector_ndb->get_ndb_cluster_connection().get_latest_trans_gci(); schema_ndb= s_ndb; ndb_binlog_thread_running= 1; if (opt_bin_log) @@ -3476,7 +3480,7 @@ restart: "ndb_latest_handled_binlog_epoch: %u, while current epoch: %u. " "RESET MASTER should be issued. Resetting ndb_latest_handled_binlog_epoch.", (unsigned) ndb_latest_handled_binlog_epoch, (unsigned) schema_gci); - g_latest_trans_gci= 0; + *p_latest_trans_gci= 0; ndb_latest_handled_binlog_epoch= 0; ndb_latest_applied_binlog_epoch= 0; ndb_latest_received_binlog_epoch= 0; @@ -3503,7 +3507,7 @@ restart: } do_ndbcluster_binlog_close_connection= BCCC_running; for ( ; !((abort_loop || do_ndbcluster_binlog_close_connection) && - ndb_latest_handled_binlog_epoch >= g_latest_trans_gci) && + ndb_latest_handled_binlog_epoch >= *p_latest_trans_gci) && do_ndbcluster_binlog_close_connection != BCCC_restart; ) { #ifndef DBUG_OFF @@ -3511,8 +3515,8 @@ restart: { DBUG_PRINT("info", ("do_ndbcluster_binlog_close_connection: %d, " "ndb_latest_handled_binlog_epoch: %llu, " - "g_latest_trans_gci: %llu", do_ndbcluster_binlog_close_connection, - ndb_latest_handled_binlog_epoch, g_latest_trans_gci)); + "*p_latest_trans_gci: %llu", do_ndbcluster_binlog_close_connection, + ndb_latest_handled_binlog_epoch, *p_latest_trans_gci)); } #endif #ifdef RUN_NDB_BINLOG_TIMER @@ -3548,7 +3552,7 @@ restart: } if ((abort_loop || do_ndbcluster_binlog_close_connection) && - (ndb_latest_handled_binlog_epoch >= g_latest_trans_gci || + (ndb_latest_handled_binlog_epoch >= *p_latest_trans_gci || !ndb_binlog_running)) break; /* Shutting down server */ @@ -3598,11 +3602,11 @@ restart: { DBUG_PRINT("info", ("do_ndbcluster_binlog_close_connection= BCCC_restart")); do_ndbcluster_binlog_close_connection= BCCC_restart; - if (ndb_latest_received_binlog_epoch < g_latest_trans_gci && ndb_binlog_running) + if (ndb_latest_received_binlog_epoch < *p_latest_trans_gci && ndb_binlog_running) { sql_print_error("NDB Binlog: latest transaction in epoch %lld not in binlog " "as latest received epoch is %lld", - g_latest_trans_gci, ndb_latest_received_binlog_epoch); + *p_latest_trans_gci, ndb_latest_received_binlog_epoch); } } } @@ -3784,11 +3788,11 @@ restart: { DBUG_PRINT("info", ("do_ndbcluster_binlog_close_connection= BCCC_restart")); do_ndbcluster_binlog_close_connection= BCCC_restart; - if (ndb_latest_received_binlog_epoch < g_latest_trans_gci && ndb_binlog_running) + if (ndb_latest_received_binlog_epoch < *p_latest_trans_gci && ndb_binlog_running) { sql_print_error("NDB Binlog: latest transaction in epoch %lld not in binlog " "as latest received epoch is %lld", - g_latest_trans_gci, ndb_latest_received_binlog_epoch); + *p_latest_trans_gci, ndb_latest_received_binlog_epoch); } } } @@ -3861,6 +3865,7 @@ err: /* don't mess with the injector_ndb anymore from other threads */ injector_thd= 0; injector_ndb= 0; + p_latest_trans_gci= 0; schema_ndb= 0; pthread_mutex_unlock(&injector_mutex); thd->db= 0; // as not to try to free memory @@ -3960,7 +3965,7 @@ ndbcluster_show_status_binlog(THD* thd, stat_print_fn *stat_print, "latest_handled_binlog_epoch=%s, " "latest_applied_binlog_epoch=%s", llstr(ndb_latest_epoch, buff1), - llstr(g_latest_trans_gci, buff2), + llstr(*p_latest_trans_gci, buff2), llstr(ndb_latest_received_binlog_epoch, buff3), llstr(ndb_latest_handled_binlog_epoch, buff4), llstr(ndb_latest_applied_binlog_epoch, buff5)); diff --git a/storage/ndb/include/ndbapi/Ndb.hpp b/storage/ndb/include/ndbapi/Ndb.hpp index 07f11f6e78a..5c6ad76c063 100644 --- a/storage/ndb/include/ndbapi/Ndb.hpp +++ b/storage/ndb/include/ndbapi/Ndb.hpp @@ -1095,6 +1095,15 @@ public: #ifndef DOXYGEN_SHOULD_SKIP_INTERNAL /** + * The current ndb_cluster_connection get_ndb_cluster_connection. + * + * @return the current connection + */ + Ndb_cluster_connection& get_ndb_cluster_connection(); +#endif + +#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL + /** * The current catalog name can be fetched by getCatalogName. * * @return the current catalog name diff --git a/storage/ndb/include/ndbapi/NdbTransaction.hpp b/storage/ndb/include/ndbapi/NdbTransaction.hpp index 3edb8c60c13..b3fb07d2905 100644 --- a/storage/ndb/include/ndbapi/NdbTransaction.hpp +++ b/storage/ndb/include/ndbapi/NdbTransaction.hpp @@ -735,6 +735,7 @@ private: Uint32 theTCConPtr; // Transaction Co-ordinator connection pointer. Uint64 theTransactionId; // theTransactionId of the transaction Uint32 theGlobalCheckpointId; // The gloabl checkpoint identity of the transaction + Uint64 *p_latest_trans_gci; // Reference to latest gci for connection ConStatusType theStatus; // The status of the connection enum CompletionStatus { NotCompleted, @@ -753,7 +754,7 @@ private: bool theTransactionIsStarted; bool theInUseState; bool theSimpleState; - Uint8 m_abortOption; // Type of commit + Uint8 m_abortOption; // Type of commi enum ListState { NotInList, diff --git a/storage/ndb/include/ndbapi/ndb_cluster_connection.hpp b/storage/ndb/include/ndbapi/ndb_cluster_connection.hpp index a803d010e61..e641802a08c 100644 --- a/storage/ndb/include/ndbapi/ndb_cluster_connection.hpp +++ b/storage/ndb/include/ndbapi/ndb_cluster_connection.hpp @@ -114,6 +114,8 @@ public: void init_get_next_node(Ndb_cluster_connection_node_iter &iter); unsigned int get_next_node(Ndb_cluster_connection_node_iter &iter); + + Uint64 *get_latest_trans_gci(); #endif private: diff --git a/storage/ndb/include/transporter/TransporterRegistry.hpp b/storage/ndb/include/transporter/TransporterRegistry.hpp index 0bb9733e8c4..89ae3a19e87 100644 --- a/storage/ndb/include/transporter/TransporterRegistry.hpp +++ b/storage/ndb/include/transporter/TransporterRegistry.hpp @@ -361,6 +361,7 @@ private: Uint32 poll_SHM(Uint32 timeOutMillis); int m_shm_own_pid; + int m_transp_count; }; #endif // Define of TransporterRegistry_H diff --git a/storage/ndb/src/common/transporter/Packer.cpp b/storage/ndb/src/common/transporter/Packer.cpp index bcfac8417bb..e9da641de75 100644 --- a/storage/ndb/src/common/transporter/Packer.cpp +++ b/storage/ndb/src/common/transporter/Packer.cpp @@ -213,8 +213,8 @@ TransporterRegistry::unpack(Uint32 * readPtr, Uint32 * eodPtr, NodeId remoteNodeId, IOState state) { - static SignalHeader signalHeader; - static LinearSectionPtr ptr[3]; + SignalHeader signalHeader; + LinearSectionPtr ptr[3]; Uint32 loop_count = 0; if(state == NoHalt || state == HaltOutput){ while ((readPtr < eodPtr) && (loop_count < MAX_RECEIVED_SIGNALS)) { diff --git a/storage/ndb/src/common/transporter/TransporterRegistry.cpp b/storage/ndb/src/common/transporter/TransporterRegistry.cpp index 458f7c4f47e..4a0be702a86 100644 --- a/storage/ndb/src/common/transporter/TransporterRegistry.cpp +++ b/storage/ndb/src/common/transporter/TransporterRegistry.cpp @@ -80,14 +80,15 @@ SocketServer::Session * TransporterService::newSession(NDB_SOCKET_TYPE sockfd) TransporterRegistry::TransporterRegistry(void * callback, unsigned _maxTransporters, - unsigned sizeOfLongSignalMemory) + unsigned sizeOfLongSignalMemory) : + m_mgm_handle(0), + m_transp_count(0) { DBUG_ENTER("TransporterRegistry::TransporterRegistry"); nodeIdSpecified = false; maxTransporters = _maxTransporters; sendCounter = 1; - m_mgm_handle= 0; callbackObj=callback; @@ -1002,7 +1003,6 @@ TransporterRegistry::performReceive() #endif } -static int x = 0; void TransporterRegistry::performSend() { @@ -1070,7 +1070,7 @@ TransporterRegistry::performSend() } #endif #ifdef NDB_TCP_TRANSPORTER - for (i = x; i < nTCPTransporters; i++) + for (i = m_transp_count; i < nTCPTransporters; i++) { TCP_Transporter *t = theTCPTransporters[i]; if (t && t->hasDataToSend() && t->isConnected() && @@ -1079,7 +1079,7 @@ TransporterRegistry::performSend() t->doSend(); } } - for (i = 0; i < x && i < nTCPTransporters; i++) + for (i = 0; i < m_transp_count && i < nTCPTransporters; i++) { TCP_Transporter *t = theTCPTransporters[i]; if (t && t->hasDataToSend() && t->isConnected() && @@ -1088,8 +1088,8 @@ TransporterRegistry::performSend() t->doSend(); } } - x++; - if (x == nTCPTransporters) x = 0; + m_transp_count++; + if (m_transp_count == nTCPTransporters) m_transp_count = 0; #endif #endif #ifdef NDB_SCI_TRANSPORTER diff --git a/storage/ndb/src/common/util/ndb_init.c b/storage/ndb/src/common/util/ndb_init.c index f3aa734d7f9..0c71a74371a 100644 --- a/storage/ndb/src/common/util/ndb_init.c +++ b/storage/ndb/src/common/util/ndb_init.c @@ -16,6 +16,16 @@ #include <ndb_global.h> #include <my_sys.h> +#include <NdbMutex.h> + +NdbMutex *g_ndb_connection_mutex = NULL; + +void +ndb_init_internal() +{ + if (!g_ndb_connection_mutex) + g_ndb_connection_mutex = NdbMutex_Create(); +} int ndb_init() @@ -25,11 +35,20 @@ ndb_init() write(2, err, strlen(err)); exit(1); } + ndb_init_internal(); return 0; } void +ndb_end_internal() +{ + if (g_ndb_connection_mutex) + NdbMutex_Destroy(g_ndb_connection_mutex); +} + +void ndb_end(int flags) { my_end(flags); + ndb_end_internal(); } diff --git a/storage/ndb/src/mgmapi/mgmapi.cpp b/storage/ndb/src/mgmapi/mgmapi.cpp index 43021a8d669..f9eb2abd003 100644 --- a/storage/ndb/src/mgmapi/mgmapi.cpp +++ b/storage/ndb/src/mgmapi/mgmapi.cpp @@ -1194,7 +1194,7 @@ const unsigned int * ndb_mgm_get_clusterlog_severity_filter(NdbMgmHandle handle) { SET_ERROR(handle, NDB_MGM_NO_ERROR, "Executing: ndb_mgm_get_clusterlog_severity_filter"); - static unsigned int enabled[(int)NDB_MGM_EVENT_SEVERITY_ALL]= + unsigned int enabled[(int)NDB_MGM_EVENT_SEVERITY_ALL]= {0,0,0,0,0,0,0}; const ParserRow<ParserDummy> getinfo_reply[] = { MGM_CMD("clusterlog", NULL, ""), diff --git a/storage/ndb/src/ndbapi/Ndb.cpp b/storage/ndb/src/ndbapi/Ndb.cpp index 5eddbc35665..973b400f5a2 100644 --- a/storage/ndb/src/ndbapi/Ndb.cpp +++ b/storage/ndb/src/ndbapi/Ndb.cpp @@ -1172,6 +1172,12 @@ convertEndian(Uint32 Data) } // <internal> +Ndb_cluster_connection & +Ndb::get_ndb_cluster_connection() +{ + return theImpl->m_ndb_cluster_connection; +} + const char * Ndb::getCatalogName() const { return theImpl->m_dbname.c_str(); diff --git a/storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp b/storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp index b9c03f0b209..5de686a257f 100644 --- a/storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp +++ b/storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp @@ -1303,8 +1303,6 @@ NdbDictionaryImpl::NdbDictionaryImpl(Ndb &ndb, m_local_table_data_size= 0; } -static int f_dictionary_count = 0; - NdbDictionaryImpl::~NdbDictionaryImpl() { NdbElement_t<Ndb_local_table_info> * curr = m_localHash.m_tableHash.getNext(0); @@ -1317,33 +1315,6 @@ NdbDictionaryImpl::~NdbDictionaryImpl() curr = m_localHash.m_tableHash.getNext(curr); } - - m_globalHash->lock(); - if(--f_dictionary_count == 0){ - delete NdbDictionary::Column::FRAGMENT; - delete NdbDictionary::Column::FRAGMENT_FIXED_MEMORY; - delete NdbDictionary::Column::FRAGMENT_VARSIZED_MEMORY; - delete NdbDictionary::Column::ROW_COUNT; - delete NdbDictionary::Column::COMMIT_COUNT; - delete NdbDictionary::Column::ROW_SIZE; - delete NdbDictionary::Column::RANGE_NO; - delete NdbDictionary::Column::DISK_REF; - delete NdbDictionary::Column::RECORDS_IN_RANGE; - delete NdbDictionary::Column::ROWID; - delete NdbDictionary::Column::ROW_GCI; - NdbDictionary::Column::FRAGMENT= 0; - NdbDictionary::Column::FRAGMENT_FIXED_MEMORY= 0; - NdbDictionary::Column::FRAGMENT_VARSIZED_MEMORY= 0; - NdbDictionary::Column::ROW_COUNT= 0; - NdbDictionary::Column::COMMIT_COUNT= 0; - NdbDictionary::Column::ROW_SIZE= 0; - NdbDictionary::Column::RANGE_NO= 0; - NdbDictionary::Column::DISK_REF= 0; - NdbDictionary::Column::RECORDS_IN_RANGE= 0; - NdbDictionary::Column::ROWID= 0; - NdbDictionary::Column::ROW_GCI= 0; - } - m_globalHash->unlock(); } else { assert(curr == 0); } @@ -1486,32 +1457,6 @@ NdbDictionaryImpl::setTransporter(class Ndb* ndb, { m_globalHash = &tf->m_globalDictCache; if(m_receiver.setTransporter(ndb, tf)){ - m_globalHash->lock(); - if(f_dictionary_count++ == 0){ - NdbDictionary::Column::FRAGMENT= - NdbColumnImpl::create_pseudo("NDB$FRAGMENT"); - NdbDictionary::Column::FRAGMENT_FIXED_MEMORY= - NdbColumnImpl::create_pseudo("NDB$FRAGMENT_FIXED_MEMORY"); - NdbDictionary::Column::FRAGMENT_VARSIZED_MEMORY= - NdbColumnImpl::create_pseudo("NDB$FRAGMENT_VARSIZED_MEMORY"); - NdbDictionary::Column::ROW_COUNT= - NdbColumnImpl::create_pseudo("NDB$ROW_COUNT"); - NdbDictionary::Column::COMMIT_COUNT= - NdbColumnImpl::create_pseudo("NDB$COMMIT_COUNT"); - NdbDictionary::Column::ROW_SIZE= - NdbColumnImpl::create_pseudo("NDB$ROW_SIZE"); - NdbDictionary::Column::RANGE_NO= - NdbColumnImpl::create_pseudo("NDB$RANGE_NO"); - NdbDictionary::Column::DISK_REF= - NdbColumnImpl::create_pseudo("NDB$DISK_REF"); - NdbDictionary::Column::RECORDS_IN_RANGE= - NdbColumnImpl::create_pseudo("NDB$RECORDS_IN_RANGE"); - NdbDictionary::Column::ROWID= - NdbColumnImpl::create_pseudo("NDB$ROWID"); - NdbDictionary::Column::ROW_GCI= - NdbColumnImpl::create_pseudo("NDB$ROW_GCI"); - } - m_globalHash->unlock(); return true; } return false; diff --git a/storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp b/storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp index 06b0d7ea5b9..d3992981522 100644 --- a/storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp +++ b/storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp @@ -925,8 +925,6 @@ NdbEventOperationImpl::printAll() * Each Ndb object has a Object. */ -// ToDo ref count this so it get's destroyed -NdbMutex *NdbEventBuffer::p_add_drop_mutex= 0; NdbEventBuffer::NdbEventBuffer(Ndb *ndb) : m_system_nodes(ndb->theImpl->theNoOfDBnodes), @@ -938,7 +936,8 @@ NdbEventBuffer::NdbEventBuffer(Ndb *ndb) : m_max_free_thresh(100), m_gci_slip_thresh(3), m_dropped_ev_op(0), - m_active_op_count(0) + m_active_op_count(0), + m_add_drop_mutex(0) { #ifdef VM_TRACE m_latest_command= "NdbEventBuffer::NdbEventBuffer"; @@ -950,16 +949,6 @@ NdbEventBuffer::NdbEventBuffer(Ndb *ndb) : exit(-1); } m_mutex= ndb->theImpl->theWaiter.m_mutex; - lock(); - if (p_add_drop_mutex == 0) - { - if ((p_add_drop_mutex = NdbMutex_Create()) == NULL) { - ndbout_c("NdbEventBuffer: NdbMutex_Create() failed"); - exit(-1); - } - } - unlock(); - // ToDo set event buffer size // pre allocate event data array m_sz= 0; @@ -969,6 +958,10 @@ NdbEventBuffer::NdbEventBuffer(Ndb *ndb) : m_free_data= 0; m_free_data_sz= 0; + // get reference to mutex managed by current connection + m_add_drop_mutex= + m_ndb->theImpl->m_ndb_cluster_connection.m_event_add_drop_mutex; + // initialize lists bzero(&g_empty_gci_container, sizeof(Gci_container)); init_gci_containers(); @@ -1006,14 +999,6 @@ NdbEventBuffer::~NdbEventBuffer() } NdbCondition_Destroy(p_cond); - - lock(); - if (p_add_drop_mutex) - { - NdbMutex_Destroy(p_add_drop_mutex); - p_add_drop_mutex = 0; - } - unlock(); } void diff --git a/storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp b/storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp index 561e79a137e..1444d182042 100644 --- a/storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp +++ b/storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp @@ -406,8 +406,8 @@ public: void dropEventOperation(NdbEventOperation *); static NdbEventOperationImpl* getEventOperationImpl(NdbEventOperation* tOp); - void add_drop_lock() { NdbMutex_Lock(p_add_drop_mutex); } - void add_drop_unlock() { NdbMutex_Unlock(p_add_drop_mutex); } + void add_drop_lock() { NdbMutex_Lock(m_add_drop_mutex); } + void add_drop_unlock() { NdbMutex_Unlock(m_add_drop_mutex); } void lock() { NdbMutex_Lock(m_mutex); } void unlock() { NdbMutex_Unlock(m_mutex); } @@ -509,6 +509,7 @@ private: NdbEventOperationImpl *m_dropped_ev_op; Uint32 m_active_op_count; + NdbMutex *m_add_drop_mutex; }; inline diff --git a/storage/ndb/src/ndbapi/NdbTransaction.cpp b/storage/ndb/src/ndbapi/NdbTransaction.cpp index 916135b12d5..9788b9bdce6 100644 --- a/storage/ndb/src/ndbapi/NdbTransaction.cpp +++ b/storage/ndb/src/ndbapi/NdbTransaction.cpp @@ -32,8 +32,6 @@ #include <signaldata/TcKeyFailConf.hpp> #include <signaldata/TcHbRep.hpp> -Uint64 g_latest_trans_gci = 0; - /***************************************************************************** NdbTransaction( Ndb* aNdb ); @@ -64,6 +62,7 @@ NdbTransaction::NdbTransaction( Ndb* aNdb ) : theTCConPtr(0), theTransactionId(0), theGlobalCheckpointId(0), + p_latest_trans_gci(0), theStatus(NotConnected), theCompletionStatus(NotCompleted), theCommitStatus(NotStarted), @@ -129,6 +128,8 @@ NdbTransaction::init() theCompletedLastOp = NULL; theGlobalCheckpointId = 0; + p_latest_trans_gci = + theNdb->theImpl->m_ndb_cluster_connection.get_latest_trans_gci(); theCommitStatus = Started; theCompletionStatus = NotCompleted; m_abortOption = AbortOnError; @@ -1572,7 +1573,7 @@ NdbTransaction::receiveTC_COMMITCONF(const TcCommitConf * commitConf) theGlobalCheckpointId = commitConf->gci; // theGlobalCheckpointId == 0 if NoOp transaction if (theGlobalCheckpointId) - g_latest_trans_gci = theGlobalCheckpointId; + *p_latest_trans_gci = theGlobalCheckpointId; return 0; } else { #ifdef NDB_NO_DROPPED_SIGNAL @@ -1752,7 +1753,7 @@ from other transactions. theCommitStatus = Committed; theGlobalCheckpointId = tGCI; assert(tGCI); - g_latest_trans_gci = tGCI; + *p_latest_trans_gci = tGCI; } else if ((tNoComp >= tNoSent) && (theLastExecOpInList->theCommitIndicator == 1)){ @@ -1930,7 +1931,7 @@ NdbTransaction::receiveTCINDXCONF(const TcIndxConf * indxConf, theCommitStatus = Committed; theGlobalCheckpointId = tGCI; assert(tGCI); - g_latest_trans_gci = tGCI; + *p_latest_trans_gci = tGCI; } else if ((tNoComp >= tNoSent) && (theLastExecOpInList->theCommitIndicator == 1)){ /**********************************************************************/ diff --git a/storage/ndb/src/ndbapi/ndb_cluster_connection.cpp b/storage/ndb/src/ndbapi/ndb_cluster_connection.cpp index a7e645f5100..12264a60082 100644 --- a/storage/ndb/src/ndbapi/ndb_cluster_connection.cpp +++ b/storage/ndb/src/ndbapi/ndb_cluster_connection.cpp @@ -35,8 +35,6 @@ #include <EventLogger.hpp> EventLogger g_eventLogger; -static int g_run_connect_thread= 0; - #include <NdbMutex.h> #ifdef VM_TRACE NdbMutex *ndb_print_state_mutex= NULL; @@ -87,8 +85,9 @@ const char *Ndb_cluster_connection::get_connectstring(char *buf, pthread_handler_t run_ndb_cluster_connection_connect_thread(void *me) { - g_run_connect_thread= 1; - ((Ndb_cluster_connection_impl*) me)->connect_thread(); + Ndb_cluster_connection_impl* connection= (Ndb_cluster_connection_impl*) me; + connection->m_run_connect_thread= 1; + connection->connect_thread(); return me; } @@ -258,9 +257,6 @@ unsigned Ndb_cluster_connection::get_connect_count() const return m_impl.get_connect_count(); } - - - /* * Ndb_cluster_connection_impl */ @@ -269,11 +265,17 @@ Ndb_cluster_connection_impl::Ndb_cluster_connection_impl(const char * connect_string) : Ndb_cluster_connection(*this), m_optimized_node_selection(1), - m_name(0) + m_name(0), + m_run_connect_thread(0), + m_event_add_drop_mutex(0), + m_latest_trans_gci(0) { DBUG_ENTER("Ndb_cluster_connection"); DBUG_PRINT("enter",("Ndb_cluster_connection this=0x%x", this)); + if (!m_event_add_drop_mutex) + m_event_add_drop_mutex= NdbMutex_Create(); + g_eventLogger.createConsoleHandler(); g_eventLogger.setCategory("NdbApi"); g_eventLogger.enable(Logger::LL_ON, Logger::LL_ERROR); @@ -301,6 +303,33 @@ Ndb_cluster_connection_impl::Ndb_cluster_connection_impl(const char * } m_transporter_facade= new TransporterFacade(); + NdbMutex_Lock(g_ndb_connection_mutex); + if(g_ndb_connection_count++ == 0){ + NdbDictionary::Column::FRAGMENT= + NdbColumnImpl::create_pseudo("NDB$FRAGMENT"); + NdbDictionary::Column::FRAGMENT_FIXED_MEMORY= + NdbColumnImpl::create_pseudo("NDB$FRAGMENT_FIXED_MEMORY"); + NdbDictionary::Column::FRAGMENT_VARSIZED_MEMORY= + NdbColumnImpl::create_pseudo("NDB$FRAGMENT_VARSIZED_MEMORY"); + NdbDictionary::Column::ROW_COUNT= + NdbColumnImpl::create_pseudo("NDB$ROW_COUNT"); + NdbDictionary::Column::COMMIT_COUNT= + NdbColumnImpl::create_pseudo("NDB$COMMIT_COUNT"); + NdbDictionary::Column::ROW_SIZE= + NdbColumnImpl::create_pseudo("NDB$ROW_SIZE"); + NdbDictionary::Column::RANGE_NO= + NdbColumnImpl::create_pseudo("NDB$RANGE_NO"); + NdbDictionary::Column::DISK_REF= + NdbColumnImpl::create_pseudo("NDB$DISK_REF"); + NdbDictionary::Column::RECORDS_IN_RANGE= + NdbColumnImpl::create_pseudo("NDB$RECORDS_IN_RANGE"); + NdbDictionary::Column::ROWID= + NdbColumnImpl::create_pseudo("NDB$ROWID"); + NdbDictionary::Column::ROW_GCI= + NdbColumnImpl::create_pseudo("NDB$ROW_GCI"); + } + NdbMutex_Unlock(g_ndb_connection_mutex); + DBUG_VOID_RETURN; } @@ -314,7 +343,7 @@ Ndb_cluster_connection_impl::~Ndb_cluster_connection_impl() if (m_connect_thread) { void *status; - g_run_connect_thread= 0; + m_run_connect_thread= 0; NdbThread_WaitFor(m_connect_thread, &status); NdbThread_Destroy(&m_connect_thread); m_connect_thread= 0; @@ -339,6 +368,36 @@ Ndb_cluster_connection_impl::~Ndb_cluster_connection_impl() if (m_name) free(m_name); + NdbMutex_Lock(g_ndb_connection_mutex); + if(--g_ndb_connection_count == 0){ + delete NdbDictionary::Column::FRAGMENT; + delete NdbDictionary::Column::FRAGMENT_FIXED_MEMORY; + delete NdbDictionary::Column::FRAGMENT_VARSIZED_MEMORY; + delete NdbDictionary::Column::ROW_COUNT; + delete NdbDictionary::Column::COMMIT_COUNT; + delete NdbDictionary::Column::ROW_SIZE; + delete NdbDictionary::Column::RANGE_NO; + delete NdbDictionary::Column::DISK_REF; + delete NdbDictionary::Column::RECORDS_IN_RANGE; + delete NdbDictionary::Column::ROWID; + delete NdbDictionary::Column::ROW_GCI; + NdbDictionary::Column::FRAGMENT= 0; + NdbDictionary::Column::FRAGMENT_FIXED_MEMORY= 0; + NdbDictionary::Column::FRAGMENT_VARSIZED_MEMORY= 0; + NdbDictionary::Column::ROW_COUNT= 0; + NdbDictionary::Column::COMMIT_COUNT= 0; + NdbDictionary::Column::ROW_SIZE= 0; + NdbDictionary::Column::RANGE_NO= 0; + NdbDictionary::Column::DISK_REF= 0; + NdbDictionary::Column::RECORDS_IN_RANGE= 0; + NdbDictionary::Column::ROWID= 0; + NdbDictionary::Column::ROW_GCI= 0; + } + NdbMutex_Unlock(g_ndb_connection_mutex); + + if (m_event_add_drop_mutex) + NdbMutex_Destroy(m_event_add_drop_mutex); + DBUG_VOID_RETURN; } @@ -576,17 +635,23 @@ void Ndb_cluster_connection_impl::connect_thread() if (r == -1) { printf("Ndb_cluster_connection::connect_thread error\n"); DBUG_ASSERT(false); - g_run_connect_thread= 0; + m_run_connect_thread= 0; } else { // Wait before making a new connect attempt NdbSleep_SecSleep(1); } - } while (g_run_connect_thread); + } while (m_run_connect_thread); if (m_connect_callback) (*m_connect_callback)(); DBUG_VOID_RETURN; } +Uint64 * +Ndb_cluster_connection::get_latest_trans_gci() +{ + m_impl.get_latest_trans_gci(); +} + void Ndb_cluster_connection::init_get_next_node(Ndb_cluster_connection_node_iter &iter) { diff --git a/storage/ndb/src/ndbapi/ndb_cluster_connection_impl.hpp b/storage/ndb/src/ndbapi/ndb_cluster_connection_impl.hpp index 6f0df0403d9..5b350dca7ba 100644 --- a/storage/ndb/src/ndbapi/ndb_cluster_connection_impl.hpp +++ b/storage/ndb/src/ndbapi/ndb_cluster_connection_impl.hpp @@ -20,6 +20,10 @@ #include <ndb_cluster_connection.hpp> #include <Vector.hpp> +#include <NdbMutex.h> + +extern NdbMutex *g_ndb_connection_mutex; +static int g_ndb_connection_count = 0; class TransporterFacade; class ConfigRetriever; @@ -41,6 +45,9 @@ class Ndb_cluster_connection_impl : public Ndb_cluster_connection Uint32 get_next_node(Ndb_cluster_connection_node_iter &iter); inline unsigned get_connect_count() const; +public: + inline Uint64 *get_latest_trans_gci() { return &m_latest_trans_gci; } + private: friend class Ndb; friend class NdbImpl; @@ -72,6 +79,9 @@ private: int m_optimized_node_selection; char *m_name; + int m_run_connect_thread; + NdbMutex *m_event_add_drop_mutex; + Uint64 m_latest_trans_gci; }; #endif |