diff options
author | unknown <serg@serg.mylan> | 2006-04-13 15:34:39 +0200 |
---|---|---|
committer | unknown <serg@serg.mylan> | 2006-04-13 15:34:39 +0200 |
commit | ae040f25312501c62afd84d946d842dc10d9aabd (patch) | |
tree | 23592aeabac418e16afb94d05e61bd145c0d8ee0 /sql | |
parent | 720fe0c950a8151fddeb701567e8a6f6ab4d6742 (diff) | |
parent | 72d64fefd4d52808e5bd9b24bdee72182828e4db (diff) | |
download | mariadb-git-ae040f25312501c62afd84d946d842dc10d9aabd.tar.gz |
Merge bk-internal.mysql.com:/home/bk/mysql-5.1-new
into serg.mylan:/usr/home/serg/Abk/mysql-5.1
Diffstat (limited to 'sql')
-rw-r--r-- | sql/ha_ndbcluster.cc | 102 | ||||
-rw-r--r-- | sql/ha_ndbcluster_binlog.cc | 270 | ||||
-rw-r--r-- | sql/ha_ndbcluster_binlog.h | 6 |
3 files changed, 272 insertions, 106 deletions
diff --git a/sql/ha_ndbcluster.cc b/sql/ha_ndbcluster.cc index 28e026b8a10..587eabb82d2 100644 --- a/sql/ha_ndbcluster.cc +++ b/sql/ha_ndbcluster.cc @@ -4489,6 +4489,21 @@ int ha_ndbcluster::create(const char *name, DBUG_RETURN(my_errno); } +#ifdef HAVE_NDB_BINLOG + /* + Don't allow table creation unless + schema distribution table is setup + ( unless it is a creation of the schema dist table itself ) + */ + if (!schema_share && + !(strcmp(m_dbname, NDB_REP_DB) == 0 && + strcmp(m_tabname, NDB_SCHEMA_TABLE) == 0)) + { + DBUG_PRINT("info", ("Schema distribution table not setup")); + DBUG_RETURN(HA_ERR_NO_CONNECTION); + } +#endif /* HAVE_NDB_BINLOG */ + DBUG_PRINT("table", ("name: %s", m_tabname)); tab.setName(m_tabname); tab.setLogging(!(info->options & HA_LEX_CREATE_TMP_TABLE)); @@ -5027,7 +5042,8 @@ int ha_ndbcluster::rename_table(const char *from, const char *to) is_old_table_tmpfile= 0; String event_name(INJECTOR_EVENT_LEN); ndb_rep_event_name(&event_name, from + sizeof(share_prefix) - 1, 0); - ndbcluster_handle_drop_table(ndb, event_name.c_ptr(), share); + ndbcluster_handle_drop_table(ndb, event_name.c_ptr(), share, + "rename table"); } if (!result && !IS_TMP_PREFIX(new_tabname)) @@ -5111,6 +5127,15 @@ ha_ndbcluster::delete_table(ha_ndbcluster *h, Ndb *ndb, DBUG_ENTER("ha_ndbcluster::ndbcluster_delete_table"); NDBDICT *dict= ndb->getDictionary(); #ifdef HAVE_NDB_BINLOG + /* + Don't allow drop table unless + schema distribution table is setup + */ + if (!schema_share) + { + DBUG_PRINT("info", ("Schema distribution table not setup")); + DBUG_RETURN(HA_ERR_NO_CONNECTION); + } NDB_SHARE *share= get_share(path, 0, false); #endif @@ -5179,7 +5204,7 @@ ha_ndbcluster::delete_table(ha_ndbcluster *h, Ndb *ndb, ndb_rep_event_name(&event_name, path + sizeof(share_prefix) - 1, 0); ndbcluster_handle_drop_table(ndb, table_dropped ? event_name.c_ptr() : 0, - share); + share, "delete table"); } if (share) @@ -5208,6 +5233,18 @@ int ha_ndbcluster::delete_table(const char *name) set_dbname(name); set_tabname(name); +#ifdef HAVE_NDB_BINLOG + /* + Don't allow drop table unless + schema distribution table is setup + */ + if (!schema_share) + { + DBUG_PRINT("info", ("Schema distribution table not setup")); + DBUG_RETURN(HA_ERR_NO_CONNECTION); + } +#endif + if (check_ndb_connection()) DBUG_RETURN(HA_ERR_NO_CONNECTION); @@ -5429,6 +5466,11 @@ int ha_ndbcluster::open(const char *name, int mode, uint test_if_locked) if (!res) info(HA_STATUS_VARIABLE | HA_STATUS_CONST); +#ifdef HAVE_NDB_BINLOG + if (!ndb_binlog_tables_inited && ndb_binlog_running) + table->db_stat|= HA_READ_ONLY; +#endif + DBUG_RETURN(res); } @@ -5727,6 +5769,19 @@ int ndbcluster_drop_database_impl(const char *path) static void ndbcluster_drop_database(char *path) { + DBUG_ENTER("ndbcluster_drop_database"); +#ifdef HAVE_NDB_BINLOG + /* + Don't allow drop database unless + schema distribution table is setup + */ + if (!schema_share) + { + DBUG_PRINT("info", ("Schema distribution table not setup")); + DBUG_VOID_RETURN; + //DBUG_RETURN(HA_ERR_NO_CONNECTION); + } +#endif ndbcluster_drop_database_impl(path); #ifdef HAVE_NDB_BINLOG char db[FN_REFLEN]; @@ -5735,6 +5790,7 @@ static void ndbcluster_drop_database(char *path) current_thd->query, current_thd->query_length, db, "", 0, 0, SOT_DROP_DB); #endif + DBUG_VOID_RETURN; } /* find all tables in ndb and discover those needed @@ -5756,36 +5812,37 @@ int ndbcluster_find_all_files(THD *thd) DBUG_ENTER("ndbcluster_find_all_files"); Ndb* ndb; char key[FN_REFLEN]; - NdbDictionary::Dictionary::List list; if (!(ndb= check_ndb_in_thd(thd))) DBUG_RETURN(HA_ERR_NO_CONNECTION); NDBDICT *dict= ndb->getDictionary(); - int unhandled, retries= 5; + int unhandled, retries= 5, skipped; do { + NdbDictionary::Dictionary::List list; if (dict->listObjects(list, NdbDictionary::Object::UserTable) != 0) ERR_RETURN(dict->getNdbError()); unhandled= 0; + skipped= 0; + retries--; for (uint i= 0 ; i < list.count ; i++) { NDBDICT::List::Element& elmt= list.elements[i]; - int do_handle_table= 0; if (IS_TMP_PREFIX(elmt.name) || IS_NDB_BLOB_PREFIX(elmt.name)) { DBUG_PRINT("info", ("Skipping %s.%s in NDB", elmt.database, elmt.name)); continue; } DBUG_PRINT("info", ("Found %s.%s in NDB", elmt.database, elmt.name)); - if (elmt.state == NDBOBJ::StateOnline || - elmt.state == NDBOBJ::StateBackup) - do_handle_table= 1; - else if (!(elmt.state == NDBOBJ::StateBuilding)) + if (elmt.state != NDBOBJ::StateOnline && + elmt.state != NDBOBJ::StateBackup && + elmt.state != NDBOBJ::StateBuilding) { sql_print_information("NDB: skipping setup table %s.%s, in state %d", elmt.database, elmt.name, elmt.state); + skipped++; continue; } @@ -5794,7 +5851,7 @@ int ndbcluster_find_all_files(THD *thd) if (!(ndbtab= dict->getTable(elmt.name))) { - if (do_handle_table) + if (retries == 0) sql_print_error("NDB: failed to setup table %s.%s, error: %d, %s", elmt.database, elmt.name, dict->getNdbError().code, @@ -5863,9 +5920,9 @@ int ndbcluster_find_all_files(THD *thd) pthread_mutex_unlock(&LOCK_open); } } - while (unhandled && retries--); + while (unhandled && retries); - DBUG_RETURN(0); + DBUG_RETURN(-(skipped + unhandled)); } int ndbcluster_find_files(THD *thd,const char *db,const char *path, @@ -7729,6 +7786,8 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused))) pthread_cond_wait(&COND_server_started, &LOCK_server_started); pthread_mutex_unlock(&LOCK_server_started); + ndbcluster_util_inited= 1; + /* Wait for cluster to start */ @@ -7760,6 +7819,8 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused))) } #ifdef HAVE_NDB_BINLOG + if (ndb_extra_logging && ndb_binlog_running) + sql_print_information("NDB Binlog: Ndb tables initially read only."); /* create tables needed by the replication */ ndbcluster_setup_binlog_table_shares(thd); #else @@ -7769,17 +7830,9 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused))) ndbcluster_find_all_files(thd); #endif - ndbcluster_util_inited= 1; - -#ifdef HAVE_NDB_BINLOG - /* Signal injector thread that all is setup */ - pthread_cond_signal(&injector_cond); -#endif - set_timespec(abstime, 0); for (;!abort_loop;) { - pthread_mutex_lock(&LOCK_ndb_util_thread); pthread_cond_timedwait(&COND_ndb_util_thread, &LOCK_ndb_util_thread, @@ -7797,7 +7850,7 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused))) Check that the apply_status_share and schema_share has been created. If not try to create it */ - if (!apply_status_share || !schema_share) + if (!ndb_binlog_tables_inited) ndbcluster_setup_binlog_table_shares(thd); #endif @@ -10052,14 +10105,15 @@ static int ndbcluster_fill_files_table(THD *thd, TABLE_LIST *tables, COND *cond) } } - dict->listObjects(dflist, NdbDictionary::Object::Undofile); + NdbDictionary::Dictionary::List uflist; + dict->listObjects(uflist, NdbDictionary::Object::Undofile); ndberr= dict->getNdbError(); if (ndberr.classification != NdbError::NoError) ERR_RETURN(ndberr); - for (i= 0; i < dflist.count; i++) + for (i= 0; i < uflist.count; i++) { - NdbDictionary::Dictionary::List::Element& elt= dflist.elements[i]; + NdbDictionary::Dictionary::List::Element& elt= uflist.elements[i]; Ndb_cluster_connection_node_iter iter; unsigned id; diff --git a/sql/ha_ndbcluster_binlog.cc b/sql/ha_ndbcluster_binlog.cc index 60ccb661703..79e4fc790e0 100644 --- a/sql/ha_ndbcluster_binlog.cc +++ b/sql/ha_ndbcluster_binlog.cc @@ -48,6 +48,7 @@ int ndb_binlog_thread_running= 0; FALSE if not */ my_bool ndb_binlog_running= FALSE; +my_bool ndb_binlog_tables_inited= FALSE; /* Global reference to the ndb injector thread THD oject @@ -775,32 +776,50 @@ static int ndbcluster_create_schema_table(THD *thd) DBUG_RETURN(0); } -void ndbcluster_setup_binlog_table_shares(THD *thd) +int ndbcluster_setup_binlog_table_shares(THD *thd) { - int done_find_all_files= 0; if (!schema_share && ndbcluster_check_schema_share() == 0) { - if (!done_find_all_files) + pthread_mutex_lock(&LOCK_open); + ndb_create_table_from_engine(thd, NDB_REP_DB, NDB_SCHEMA_TABLE); + pthread_mutex_unlock(&LOCK_open); + if (!schema_share) { - ndbcluster_find_all_files(thd); - done_find_all_files= 1; + ndbcluster_create_schema_table(thd); + // always make sure we create the 'schema' first + if (!schema_share) + return 1; } - ndbcluster_create_schema_table(thd); - // always make sure we create the 'schema' first - if (!schema_share) - return; } if (!apply_status_share && ndbcluster_check_apply_status_share() == 0) { - if (!done_find_all_files) + pthread_mutex_lock(&LOCK_open); + ndb_create_table_from_engine(thd, NDB_REP_DB, NDB_APPLY_TABLE); + pthread_mutex_unlock(&LOCK_open); + if (!apply_status_share) { - ndbcluster_find_all_files(thd); - done_find_all_files= 1; + ndbcluster_create_apply_status_table(thd); + if (!apply_status_share) + return 1; } - ndbcluster_create_apply_status_table(thd); } + if (!ndbcluster_find_all_files(thd)) + { + pthread_mutex_lock(&LOCK_open); + ndb_binlog_tables_inited= TRUE; + if (ndb_binlog_running) + { + if (ndb_extra_logging) + sql_print_information("NDB Binlog: ndb tables writable"); + close_cached_tables((THD*) 0, 0, (TABLE_LIST*) 0, TRUE); + } + pthread_mutex_unlock(&LOCK_open); + /* Signal injector thread that all is setup */ + pthread_cond_signal(&injector_cond); + } + return 0; } /* @@ -936,6 +955,31 @@ static char *ndb_pack_varchar(const NDBCOL *col, char *buf, /* log query in schema table */ +static void ndb_report_waiting(const char *key, + int the_time, + const char *op, + const char *obj) +{ + ulonglong ndb_latest_epoch= 0; + const char *proc_info= "<no info>"; + pthread_mutex_lock(&injector_mutex); + if (injector_ndb) + ndb_latest_epoch= injector_ndb->getLatestGCI(); + if (injector_thd) + proc_info= injector_thd->proc_info; + pthread_mutex_unlock(&injector_mutex); + sql_print_information("NDB %s:" + " waiting max %u sec for %s %s." + " epochs: (%u,%u,%u)" + " injector proc_info: %s" + ,key, the_time, op, obj + ,(uint)ndb_latest_handled_binlog_epoch + ,(uint)ndb_latest_received_binlog_epoch + ,(uint)ndb_latest_epoch + ,proc_info + ); +} + int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, const char *query, int query_length, const char *db, const char *table_name, @@ -965,6 +1009,7 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, } char tmp_buf2[FN_REFLEN]; + const char *type_str; switch (type) { case SOT_DROP_TABLE: @@ -975,6 +1020,7 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, query= tmp_buf2; query_length= (uint) (strxmov(tmp_buf2, "drop table `", table_name, "`", NullS) - tmp_buf2); + type_str= "drop table"; break; case SOT_RENAME_TABLE: /* redo the rename table query as is may contain several tables */ @@ -982,20 +1028,28 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, query_length= (uint) (strxmov(tmp_buf2, "rename table `", old_db, ".", old_table_name, "` to `", db, ".", table_name, "`", NullS) - tmp_buf2); + type_str= "rename table"; break; case SOT_CREATE_TABLE: - // fall through + type_str= "create table"; + break; case SOT_ALTER_TABLE: + type_str= "create table"; break; case SOT_DROP_DB: + type_str= "drop db"; break; case SOT_CREATE_DB: + type_str= "create db"; break; case SOT_ALTER_DB: + type_str= "alter db"; break; case SOT_TABLESPACE: + type_str= "tablespace"; break; case SOT_LOGFILE_GROUP: + type_str= "logfile group"; break; default: abort(); /* should not happen, programming error */ @@ -1174,9 +1228,9 @@ end: struct timespec abstime; int i; set_timespec(abstime, 1); - (void) pthread_cond_timedwait(&injector_cond, - &ndb_schema_object->mutex, - &abstime); + int ret= pthread_cond_timedwait(&injector_cond, + &ndb_schema_object->mutex, + &abstime); (void) pthread_mutex_lock(&schema_share->mutex); for (i= 0; i < ndb_number_of_storage_nodes; i++) @@ -1198,16 +1252,19 @@ end: if (bitmap_is_clear_all(&ndb_schema_object->slock_bitmap)) break; - max_timeout--; - if (max_timeout == 0) + if (ret) { - sql_print_error("NDB create table: timed out. Ignoring..."); - break; + max_timeout--; + if (max_timeout == 0) + { + sql_print_error("NDB %s: distributing %s timed out. Ignoring...", + type_str, ndb_schema_object->key); + break; + } + if (ndb_extra_logging) + ndb_report_waiting(type_str, max_timeout, + "distributing", ndb_schema_object->key); } - if (ndb_extra_logging) - sql_print_information("NDB create table: " - "waiting max %u sec for create table %s.", - max_timeout, ndb_schema_object->key); } (void) pthread_mutex_unlock(&ndb_schema_object->mutex); } @@ -1509,9 +1566,9 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp, share= 0; pOp->setCustomData(0); - + pthread_mutex_lock(&injector_mutex); - injector_ndb->dropEventOperation(pOp); + ndb->dropEventOperation(pOp); pOp= 0; pthread_mutex_unlock(&injector_mutex); @@ -1689,9 +1746,15 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, // skip break; case NDBEVENT::TE_CLUSTER_FAILURE: + // fall through case NDBEVENT::TE_DROP: + if (ndb_extra_logging && + ndb_binlog_tables_inited && ndb_binlog_running) + sql_print_information("NDB Binlog: ndb tables initially " + "read only on reconnect."); free_share(&schema_share); schema_share= 0; + ndb_binlog_tables_inited= FALSE; // fall through case NDBEVENT::TE_ALTER: ndb_handle_schema_change(thd, ndb, pOp, tmp_share); @@ -2385,7 +2448,6 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab, } if (!op) { - pthread_mutex_unlock(&injector_mutex); sql_print_error("NDB Binlog: Creating NdbEventOperation failed for" " %s",event_name); push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR, @@ -2393,6 +2455,7 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab, ndb->getNdbError().code, ndb->getNdbError().message, "NDB"); + pthread_mutex_unlock(&injector_mutex); DBUG_RETURN(-1); } @@ -2494,9 +2557,15 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab, get_share(share); if (do_apply_status_share) + { apply_status_share= get_share(share); + (void) pthread_cond_signal(&injector_cond); + } else if (do_schema_share) + { schema_share= get_share(share); + (void) pthread_cond_signal(&injector_cond); + } DBUG_PRINT("info",("%s share->op: 0x%lx, share->use_count: %u", share->key, share->op, share->use_count)); @@ -2513,7 +2582,7 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab, */ int ndbcluster_handle_drop_table(Ndb *ndb, const char *event_name, - NDB_SHARE *share) + NDB_SHARE *share, const char *type_str) { DBUG_ENTER("ndbcluster_handle_drop_table"); @@ -2569,21 +2638,24 @@ ndbcluster_handle_drop_table(Ndb *ndb, const char *event_name, { struct timespec abstime; set_timespec(abstime, 1); - (void) pthread_cond_timedwait(&injector_cond, - &share->mutex, - &abstime); - max_timeout--; + int ret= pthread_cond_timedwait(&injector_cond, + &share->mutex, + &abstime); if (share->op == 0) break; - if (max_timeout == 0) + if (ret) { - sql_print_error("NDB delete table: timed out. Ignoring..."); - break; + max_timeout--; + if (max_timeout == 0) + { + sql_print_error("NDB %s: %s timed out. Ignoring...", + type_str, share->key); + break; + } + if (ndb_extra_logging) + ndb_report_waiting(type_str, max_timeout, + type_str, share->key); } - if (ndb_extra_logging) - sql_print_information("NDB delete table: " - "waiting max %u sec for drop table %s.", - max_timeout, share->key); } (void) pthread_mutex_unlock(&share->mutex); #else @@ -2646,7 +2718,8 @@ static int ndb_binlog_thread_handle_error(Ndb *ndb, NdbEventOperation *pOp, } static int -ndb_binlog_thread_handle_non_data_event(Ndb *ndb, NdbEventOperation *pOp, +ndb_binlog_thread_handle_non_data_event(THD *thd, Ndb *ndb, + NdbEventOperation *pOp, Binlog_index_row &row) { NDB_SHARE *share= (NDB_SHARE *)pOp->getCustomData(); @@ -2655,18 +2728,23 @@ ndb_binlog_thread_handle_non_data_event(Ndb *ndb, NdbEventOperation *pOp, /* make sure to flush any pending events as they can be dependent on one of the tables being changed below */ - injector_thd->binlog_flush_pending_rows_event(true); + thd->binlog_flush_pending_rows_event(true); switch (type) { case NDBEVENT::TE_CLUSTER_FAILURE: + if (ndb_extra_logging) + sql_print_information("NDB Binlog: cluster failure for %s.", share->key); if (apply_status_share == share) { + if (ndb_extra_logging && + ndb_binlog_tables_inited && ndb_binlog_running) + sql_print_information("NDB Binlog: ndb tables initially " + "read only on reconnect."); free_share(&apply_status_share); apply_status_share= 0; + ndb_binlog_tables_inited= FALSE; } - if (ndb_extra_logging) - sql_print_information("NDB Binlog: cluster failure for %s.", share->key); DBUG_PRINT("info", ("CLUSTER FAILURE EVENT: " "%s received share: 0x%lx op: %lx share op: %lx " "op_old: %lx", @@ -2675,8 +2753,13 @@ ndb_binlog_thread_handle_non_data_event(Ndb *ndb, NdbEventOperation *pOp, case NDBEVENT::TE_DROP: if (apply_status_share == share) { + if (ndb_extra_logging && + ndb_binlog_tables_inited && ndb_binlog_running) + sql_print_information("NDB Binlog: ndb tables initially " + "read only on reconnect."); free_share(&apply_status_share); apply_status_share= 0; + ndb_binlog_tables_inited= FALSE; } /* ToDo: remove printout */ if (ndb_extra_logging) @@ -2702,7 +2785,7 @@ ndb_binlog_thread_handle_non_data_event(Ndb *ndb, NdbEventOperation *pOp, return 0; } - ndb_handle_schema_change(injector_thd, ndb, pOp, share); + ndb_handle_schema_change(thd, ndb, pOp, share); return 0; } @@ -2982,7 +3065,8 @@ static void ndb_free_schema_object(NDB_SCHEMA_OBJECT **ndb_schema_object, pthread_handler_t ndb_binlog_thread_func(void *arg) { THD *thd; /* needs to be first for thread_stack */ - Ndb *ndb= 0; + Ndb *i_ndb= 0; + Ndb *s_ndb= 0; Thd_ndb *thd_ndb=0; int ndb_update_binlog_index= 1; injector *inj= injector::instance(); @@ -3034,16 +3118,16 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) pthread_mutex_unlock(&LOCK_thread_count); thd->lex->start_transaction_opt= 0; - if (!(schema_ndb= new Ndb(g_ndb_cluster_connection, "")) || - schema_ndb->init()) + if (!(s_ndb= new Ndb(g_ndb_cluster_connection, "")) || + s_ndb->init()) { sql_print_error("NDB Binlog: Getting Schema Ndb object failed"); goto err; } // empty database - if (!(ndb= new Ndb(g_ndb_cluster_connection, "")) || - ndb->init()) + if (!(i_ndb= new Ndb(g_ndb_cluster_connection, "")) || + i_ndb->init()) { sql_print_error("NDB Binlog: Getting Ndb object failed"); ndb_binlog_thread_running= -1; @@ -3064,7 +3148,8 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) pthread_mutex_lock(&injector_mutex); */ injector_thd= thd; - injector_ndb= ndb; + injector_ndb= i_ndb; + schema_ndb= s_ndb; ndb_binlog_thread_running= 1; if (opt_bin_log) { @@ -3087,7 +3172,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) thd->proc_info= "Waiting for ndbcluster to start"; pthread_mutex_lock(&injector_mutex); - while (!ndbcluster_util_inited) + while (!schema_share || !apply_status_share) { /* ndb not connected yet */ struct timespec abstime; @@ -3119,10 +3204,6 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) thd->db= db; if (ndb_binlog_running) open_binlog_index(thd, &binlog_tables, &binlog_index); - if (!apply_status_share) - { - sql_print_error("NDB: Could not get apply status share"); - } thd->db= db; } @@ -3150,14 +3231,14 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) int res= 0, tot_poll_wait= 1000; if (ndb_binlog_running) { - res= ndb->pollEvents(tot_poll_wait, &gci); + res= i_ndb->pollEvents(tot_poll_wait, &gci); tot_poll_wait= 0; } - int schema_res= schema_ndb->pollEvents(tot_poll_wait, &schema_gci); + int schema_res= s_ndb->pollEvents(tot_poll_wait, &schema_gci); ndb_latest_received_binlog_epoch= gci; while (gci > schema_gci && schema_res >= 0) - schema_res= schema_ndb->pollEvents(10, &schema_gci); + schema_res= s_ndb->pollEvents(10, &schema_gci); if ((abort_loop || do_ndbcluster_binlog_close_connection) && (ndb_latest_handled_binlog_epoch >= g_latest_trans_gci || @@ -3184,15 +3265,16 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) if (unlikely(schema_res > 0)) { - schema_ndb-> + thd->proc_info= "Processing events from schema table"; + s_ndb-> setReportThreshEventGCISlip(ndb_report_thresh_binlog_epoch_slip); - schema_ndb-> + s_ndb-> setReportThreshEventFreeMem(ndb_report_thresh_binlog_mem_usage); - NdbEventOperation *pOp= schema_ndb->nextEvent(); + NdbEventOperation *pOp= s_ndb->nextEvent(); while (pOp != NULL) { if (!pOp->hasError()) - ndb_binlog_thread_handle_schema_event(thd, schema_ndb, pOp, + ndb_binlog_thread_handle_schema_event(thd, s_ndb, pOp, &post_epoch_log_list, &post_epoch_unlock_list, &mem_root); @@ -3201,7 +3283,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) "binlog schema event", (ulong) pOp->getNdbError().code, pOp->getNdbError().message); - pOp= schema_ndb->nextEvent(); + pOp= s_ndb->nextEvent(); } } @@ -3213,7 +3295,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) int event_count= 0; #endif thd->proc_info= "Processing events"; - NdbEventOperation *pOp= ndb->nextEvent(); + NdbEventOperation *pOp= i_ndb->nextEvent(); Binlog_index_row row; while (pOp != NULL) { @@ -3224,9 +3306,9 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) ! IS_NDB_BLOB_PREFIX(pOp->getEvent()->getTable()->getName())); DBUG_ASSERT(gci <= ndb_latest_received_binlog_epoch); - ndb-> + i_ndb-> setReportThreshEventGCISlip(ndb_report_thresh_binlog_epoch_slip); - ndb->setReportThreshEventFreeMem(ndb_report_thresh_binlog_mem_usage); + i_ndb->setReportThreshEventFreeMem(ndb_report_thresh_binlog_mem_usage); bzero((char*) &row, sizeof(row)); injector::transaction trans; @@ -3235,7 +3317,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) Uint32 iter= 0; const NdbEventOperation *gci_op; Uint32 event_types; - while ((gci_op= ndb->getGCIEventOperations(&iter, &event_types)) + while ((gci_op= i_ndb->getGCIEventOperations(&iter, &event_types)) != NULL) { NDB_SHARE *share= (NDB_SHARE*)gci_op->getCustomData(); @@ -3321,7 +3403,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) event_count++; #endif if (pOp->hasError() && - ndb_binlog_thread_handle_error(ndb, pOp, row) < 0) + ndb_binlog_thread_handle_error(i_ndb, pOp, row) < 0) goto err; #ifndef DBUG_OFF @@ -3341,7 +3423,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) Uint32 iter= 0; const NdbEventOperation *gci_op; Uint32 event_types; - while ((gci_op= ndb->getGCIEventOperations(&iter, &event_types)) + while ((gci_op= i_ndb->getGCIEventOperations(&iter, &event_types)) != NULL) { if (gci_op == pOp) @@ -3353,19 +3435,19 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) #endif if ((unsigned) pOp->getEventType() < (unsigned) NDBEVENT::TE_FIRST_NON_DATA_EVENT) - ndb_binlog_thread_handle_data_event(ndb, pOp, row, trans); + ndb_binlog_thread_handle_data_event(i_ndb, pOp, row, trans); else { // set injector_ndb database/schema from table internal name int ret= - ndb->setDatabaseAndSchemaName(pOp->getEvent()->getTable()); + i_ndb->setDatabaseAndSchemaName(pOp->getEvent()->getTable()); DBUG_ASSERT(ret == 0); - ndb_binlog_thread_handle_non_data_event(ndb, pOp, row); + ndb_binlog_thread_handle_non_data_event(thd, i_ndb, pOp, row); // reset to catch errors - ndb->setDatabaseName(""); + i_ndb->setDatabaseName(""); } - pOp= ndb->nextEvent(); + pOp= i_ndb->nextEvent(); } while (pOp && pOp->getGCI() == gci); /* @@ -3379,6 +3461,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) if (trans.good()) { //DBUG_ASSERT(row.n_inserts || row.n_updates || row.n_deletes); + thd->proc_info= "Committing events to binlog"; injector::transaction::binlog_pos start= trans.start_pos(); if (int r= trans.commit()) { @@ -3418,10 +3501,13 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) } err: DBUG_PRINT("info",("Shutting down cluster binlog thread")); + thd->proc_info= "Shutting down"; close_thread_tables(thd); pthread_mutex_lock(&injector_mutex); /* don't mess with the injector_ndb anymore from other threads */ + injector_thd= 0; injector_ndb= 0; + schema_ndb= 0; pthread_mutex_unlock(&injector_mutex); thd->db= 0; // as not to try to free memory sql_print_information("Stopping Cluster Binlog"); @@ -3438,21 +3524,45 @@ err: } /* remove all event operations */ - if (ndb) + if (s_ndb) { NdbEventOperation *op; DBUG_PRINT("info",("removing all event operations")); - while ((op= ndb->getEventOperation())) + while ((op= s_ndb->getEventOperation())) { DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(op->getEvent()->getTable()->getName())); DBUG_PRINT("info",("removing event operation on %s", op->getEvent()->getName())); NDB_SHARE *share= (NDB_SHARE*) op->getCustomData(); + DBUG_ASSERT(share != 0); + DBUG_ASSERT(share->op == op || + share->op_old == op); + share->op= share->op_old= 0; free_share(&share); - ndb->dropEventOperation(op); + s_ndb->dropEventOperation(op); + } + delete s_ndb; + s_ndb= 0; + } + if (i_ndb) + { + NdbEventOperation *op; + DBUG_PRINT("info",("removing all event operations")); + while ((op= i_ndb->getEventOperation())) + { + DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(op->getEvent()->getTable()->getName())); + DBUG_PRINT("info",("removing event operation on %s", + op->getEvent()->getName())); + NDB_SHARE *share= (NDB_SHARE*) op->getCustomData(); + DBUG_ASSERT(share != 0); + DBUG_ASSERT(share->op == op || + share->op_old == op); + share->op= share->op_old= 0; + free_share(&share); + i_ndb->dropEventOperation(op); } - delete ndb; - ndb= 0; + delete i_ndb; + i_ndb= 0; } hash_free(&ndb_schema_objects); diff --git a/sql/ha_ndbcluster_binlog.h b/sql/ha_ndbcluster_binlog.h index fda025842a0..9d15016568b 100644 --- a/sql/ha_ndbcluster_binlog.h +++ b/sql/ha_ndbcluster_binlog.h @@ -101,7 +101,8 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, const char *old_db= 0, const char *old_table_name= 0); int ndbcluster_handle_drop_table(Ndb *ndb, const char *event_name, - NDB_SHARE *share); + NDB_SHARE *share, + const char *type_str); void ndb_rep_event_name(String *event_name, const char *db, const char *tbl); int ndb_create_table_from_engine(THD *thd, const char *db, @@ -112,12 +113,13 @@ pthread_handler_t ndb_binlog_thread_func(void *arg); /* table cluster_replication.apply_status */ -void ndbcluster_setup_binlog_table_shares(THD *thd); +int ndbcluster_setup_binlog_table_shares(THD *thd); extern NDB_SHARE *apply_status_share; extern NDB_SHARE *schema_share; extern THD *injector_thd; extern my_bool ndb_binlog_running; +extern my_bool ndb_binlog_tables_inited; bool ndbcluster_show_status_binlog(THD* thd, stat_print_fn *stat_print, |