diff options
Diffstat (limited to 'sql')
-rw-r--r-- | sql/Makefile.am | 7 | ||||
-rw-r--r-- | sql/ha_ndbcluster.cc | 835 | ||||
-rw-r--r-- | sql/ha_ndbcluster.h | 37 | ||||
-rw-r--r-- | sql/ha_ndbcluster_binlog.cc | 2732 | ||||
-rw-r--r-- | sql/ha_ndbcluster_binlog.h | 162 | ||||
-rw-r--r-- | sql/ha_ndbcluster_tables.h | 21 | ||||
-rw-r--r-- | sql/handler.cc | 128 | ||||
-rw-r--r-- | sql/handler.h | 54 | ||||
-rw-r--r-- | sql/log.cc | 7 | ||||
-rw-r--r-- | sql/log_event.cc | 37 | ||||
-rw-r--r-- | sql/mysql_priv.h | 4 | ||||
-rw-r--r-- | sql/mysqld.cc | 61 | ||||
-rw-r--r-- | sql/rpl_injector.cc | 153 | ||||
-rw-r--r-- | sql/rpl_injector.h | 251 | ||||
-rw-r--r-- | sql/set_var.cc | 22 | ||||
-rw-r--r-- | sql/slave.cc | 83 | ||||
-rw-r--r-- | sql/sql_base.cc | 10 | ||||
-rw-r--r-- | sql/sql_class.h | 1 | ||||
-rw-r--r-- | sql/sql_db.cc | 16 | ||||
-rw-r--r-- | sql/sql_parse.cc | 6 | ||||
-rw-r--r-- | sql/sql_repl.cc | 10 |
21 files changed, 4418 insertions, 219 deletions
diff --git a/sql/Makefile.am b/sql/Makefile.am index ddbfdb88ba5..4dd1e2bad9c 100644 --- a/sql/Makefile.am +++ b/sql/Makefile.am @@ -58,6 +58,7 @@ noinst_HEADERS = item.h item_func.h item_sum.h item_cmpfunc.h \ sql_select.h structs.h table.h sql_udf.h hash_filo.h\ lex.h lex_symbol.h sql_acl.h sql_crypt.h \ log_event.h sql_repl.h slave.h rpl_filter.h \ + rpl_injector.h \ stacktrace.h sql_sort.h sql_cache.h set_var.h \ spatial.h gstream.h client_settings.h tzfile.h \ tztime.h my_decimal.h\ @@ -89,6 +90,7 @@ mysqld_SOURCES = sql_lex.cc sql_handler.cc sql_partition.cc \ sql_load.cc mf_iocache.cc field_conv.cc sql_show.cc \ sql_udf.cc sql_analyse.cc sql_analyse.h sql_cache.cc \ slave.cc sql_repl.cc rpl_filter.cc rpl_tblmap.cc \ + rpl_injector.cc \ sql_union.cc sql_derived.cc \ client.c sql_client.cc mini_client_errors.c pack.c\ stacktrace.c repl_failsafe.h repl_failsafe.cc \ @@ -104,6 +106,8 @@ EXTRA_mysqld_SOURCES = ha_innodb.cc ha_berkeley.cc ha_archive.cc \ ha_innodb.h ha_berkeley.h ha_archive.h \ ha_blackhole.cc ha_federated.cc ha_ndbcluster.cc \ ha_blackhole.h ha_federated.h ha_ndbcluster.h \ + ha_ndbcluster_binlog.cc ha_ndbcluster_binlog.h \ + ha_ndbcluster_tables.h \ ha_partition.cc ha_partition.h mysqld_DEPENDENCIES = @mysql_se_objs@ gen_lex_hash_SOURCES = gen_lex_hash.cc @@ -160,6 +164,9 @@ ha_berkeley.o: ha_berkeley.cc ha_berkeley.h ha_ndbcluster.o:ha_ndbcluster.cc ha_ndbcluster.h $(CXXCOMPILE) @ndbcluster_includes@ $(LM_CFLAGS) -c $< +ha_ndbcluster_binlog.o:ha_ndbcluster_binlog.cc ha_ndbcluster_binlog.h + $(CXXCOMPILE) @ndbcluster_includes@ $(LM_CFLAGS) -c $< + #Until we can get rid of dependencies on ha_ndbcluster.h handler.o: handler.cc ha_ndbcluster.h $(CXXCOMPILE) @ndbcluster_includes@ $(CXXFLAGS) -c $< diff --git a/sql/ha_ndbcluster.cc b/sql/ha_ndbcluster.cc index 7bc3af2c3aa..ed2a53ed2c6 100644 --- a/sql/ha_ndbcluster.cc +++ b/sql/ha_ndbcluster.cc @@ -33,6 +33,8 @@ #include <../util/Bitmask.hpp> #include <ndbapi/NdbIndexStat.hpp> +#include "ha_ndbcluster_binlog.h" + // options from from mysqld.cc extern my_bool opt_ndb_optimized_node_selection; extern const char *opt_ndbcluster_connectstring; @@ -50,13 +52,9 @@ static const int parallelism= 0; // createable against NDB from this handler static const int max_transactions= 3; // should really be 2 but there is a transaction to much allocated when loch table is used -static const char *ha_ndb_ext=".ndb"; -static const char share_prefix[]= "./"; - -static int ndbcluster_close_connection(THD *thd); -static int ndbcluster_commit(THD *thd, bool all); -static int ndbcluster_rollback(THD *thd, bool all); -static handler* ndbcluster_create_handler(TABLE_SHARE *table); +static bool ndbcluster_init(void); +static int ndbcluster_end(ha_panic_function flag); +static bool ndbcluster_show_status(THD*,stat_print_fn *,enum ha_stat_type); static int ndbcluster_alter_tablespace(THD* thd, st_alter_tablespace *info); handlerton ndbcluster_hton = { @@ -66,29 +64,7 @@ handlerton ndbcluster_hton = { "Clustered, fault-tolerant, memory-based tables", DB_TYPE_NDBCLUSTER, ndbcluster_init, - 0, /* slot */ - 0, /* savepoint size */ - ndbcluster_close_connection, - NULL, /* savepoint_set */ - NULL, /* savepoint_rollback */ - NULL, /* savepoint_release */ - ndbcluster_commit, - ndbcluster_rollback, - NULL, /* prepare */ - NULL, /* recover */ - NULL, /* commit_by_xid */ - NULL, /* rollback_by_xid */ - NULL, /* create_cursor_read_view */ - NULL, /* set_cursor_read_view */ - NULL, /* close_cursor_read_view */ - ndbcluster_create_handler, /* Create a new handler */ - ndbcluster_drop_database, /* Drop a database */ - ndbcluster_end, /* Panic call */ - NULL, /* Start Consistent Snapshot */ - NULL, /* Flush logs */ - ndbcluster_show_status, /* Show status */ - ndbcluster_alter_tablespace, - HTON_NO_FLAGS + ~(uint)0, /* slot */ }; static handler *ndbcluster_create_handler(TABLE_SHARE *table) @@ -121,33 +97,24 @@ static handler *ndbcluster_create_handler(TABLE_SHARE *table) break; \ } -// Typedefs for long names -typedef NdbDictionary::Object NDBOBJ; -typedef NdbDictionary::Column NDBCOL; -typedef NdbDictionary::Table NDBTAB; -typedef NdbDictionary::Index NDBINDEX; -typedef NdbDictionary::Dictionary NDBDICT; -typedef NdbDictionary::Event NDBEVENT; - static int ndbcluster_inited= 0; -static int ndbcluster_util_inited= 0; +int ndbcluster_util_inited= 0; static Ndb* g_ndb= NULL; -static Ndb_cluster_connection* g_ndb_cluster_connection= NULL; +Ndb_cluster_connection* g_ndb_cluster_connection= NULL; +unsigned char g_node_id_map[max_ndb_nodes]; // Handler synchronization pthread_mutex_t ndbcluster_mutex; // Table lock handling -static HASH ndbcluster_open_tables; +HASH ndbcluster_open_tables; static byte *ndbcluster_get_key(NDB_SHARE *share,uint *length, my_bool not_used __attribute__((unused))); -static NDB_SHARE *get_share(const char *key, - bool create_if_not_exists= TRUE, - bool have_lock= FALSE); -static void free_share(NDB_SHARE **share, bool have_lock= FALSE); -static void real_free_share(NDB_SHARE **share); +#ifdef HAVE_NDB_BINLOG +static int rename_share(NDB_SHARE *share, const char *new_key); +#endif static void ndb_set_fragmentation(NDBTAB &tab, TABLE *table, uint pk_len); static int packfrm(const void *data, uint len, const void **pack_data, uint *pack_len); @@ -157,35 +124,9 @@ static int unpackfrm(const void **data, uint *len, static int ndb_get_table_statistics(Ndb*, const char *, struct Ndb_statistics *); -#ifndef DBUG_OFF -void print_records(TABLE *table, const char *record) -{ - if (_db_on_) - { - for (uint j= 0; j < table->s->fields; j++) - { - char buf[40]; - int pos= 0; - Field *field= table->field[j]; - const byte* field_ptr= field->ptr - table->record[0] + record; - int pack_len= field->pack_length(); - int n= pack_len < 10 ? pack_len : 10; - - for (int i= 0; i < n && pos < 20; i++) - { - pos+= sprintf(&buf[pos]," %x", (int) (unsigned char) field_ptr[i]); - } - buf[pos]= 0; - DBUG_PRINT("info",("[%u]field_ptr[0->%d]: %s", j, n, buf)); - } - } -} -#else -#define print_records(a,b) -#endif // Util thread variables -static pthread_t ndb_util_thread; +pthread_t ndb_util_thread; pthread_mutex_t LOCK_ndb_util_thread; pthread_cond_t COND_ndb_util_thread; pthread_handler_t ndb_util_thread_func(void *arg); @@ -214,7 +155,7 @@ static long ndb_cluster_node_id= 0; static const char * ndb_connected_host= 0; static long ndb_connected_port= 0; static long ndb_number_of_replicas= 0; -static long ndb_number_of_storage_nodes= 0; +long ndb_number_of_storage_nodes= 0; static int update_status_variables(Ndb_cluster_connection *c) { @@ -235,9 +176,6 @@ SHOW_VAR ndb_status_variables[]= { {NullS, NullS, SHOW_LONG} }; -/* instantiated in storage/ndb/src/ndbapi/Ndbif.cpp */ -extern Uint64 g_latest_trans_gci; - /* Error handling functions */ @@ -365,6 +303,7 @@ Thd_ndb::Thd_ndb() all= NULL; stmt= NULL; error= 0; + options= 0; } Thd_ndb::~Thd_ndb() @@ -391,14 +330,6 @@ Thd_ndb::~Thd_ndb() } inline -Thd_ndb * -get_thd_ndb(THD *thd) { return (Thd_ndb *) thd->ha_data[ndbcluster_hton.slot]; } - -inline -void -set_thd_ndb(THD *thd, Thd_ndb *thd_ndb) { thd->ha_data[ndbcluster_hton.slot]= thd_ndb; } - -inline Ndb *ha_ndbcluster::get_ndb() { return get_thd_ndb(current_thd)->ndb; @@ -2517,8 +2448,8 @@ int ha_ndbcluster::delete_row(const byte *record) set to null. */ -static void ndb_unpack_record(TABLE *table, NdbValue *value, - MY_BITMAP *defined, byte *buf) +void ndb_unpack_record(TABLE *table, NdbValue *value, + MY_BITMAP *defined, byte *buf) { Field **p_field= table->field, *field= *p_field; uint row_offset= (uint) (buf - table->record[0]); @@ -2756,6 +2687,7 @@ int ha_ndbcluster::index_read_idx(byte *buf, uint index_no, statistic_increment(current_thd->status_var.ha_read_key_count, &LOCK_status); DBUG_ENTER("ha_ndbcluster::index_read_idx"); DBUG_PRINT("enter", ("index_no: %u, key_len: %u", index_no, key_len)); + close_scan(); index_init(index_no, 0); DBUG_RETURN(index_read(buf, key, key_len, find_flag)); } @@ -3167,6 +3099,16 @@ int ha_ndbcluster::extra(enum ha_extra_function operation) m_use_write= FALSE; m_ignore_dup_key= FALSE; break; + case HA_EXTRA_IGNORE_NO_KEY: + DBUG_PRINT("info", ("HA_EXTRA_IGNORE_NO_KEY")); + DBUG_PRINT("info", ("Turning on AO_IgnoreError at Commit/NoCommit")); + m_ignore_no_key= TRUE; + break; + case HA_EXTRA_NO_IGNORE_NO_KEY: + DBUG_PRINT("info", ("HA_EXTRA_NO_IGNORE_NO_KEY")); + DBUG_PRINT("info", ("Turning on AO_IgnoreError at Commit/NoCommit")); + m_ignore_no_key= FALSE; + break; default: break; } @@ -3597,7 +3539,7 @@ int ha_ndbcluster::start_stmt(THD *thd, thr_lock_type lock_type) Commit a transaction started in NDB */ -int ndbcluster_commit(THD *thd, bool all) +static int ndbcluster_commit(THD *thd, bool all) { int res= 0; Thd_ndb *thd_ndb= get_thd_ndb(thd); @@ -3648,7 +3590,7 @@ int ndbcluster_commit(THD *thd, bool all) Rollback a transaction started in NDB */ -int ndbcluster_rollback(THD *thd, bool all) +static int ndbcluster_rollback(THD *thd, bool all) { int res= 0; Thd_ndb *thd_ndb= get_thd_ndb(thd); @@ -3989,12 +3931,16 @@ int ha_ndbcluster::create(const char *name, if (create_from_engine) { /* - Table alreay exists in NDB and frm file has been created by + Table already exists in NDB and frm file has been created by caller. Do Ndb specific stuff, such as create a .ndb file */ if ((my_errno= write_ndb_file())) DBUG_RETURN(my_errno); +#ifdef HAVE_NDB_BINLOG + if (ndb_binlog_thread_running > 0) + ndbcluster_create_binlog_setup(get_ndb(), name2, m_dbname, m_tabname, 0); +#endif /* HAVE_NDB_BINLOG */ DBUG_RETURN(my_errno); } @@ -4133,6 +4079,74 @@ int ha_ndbcluster::create(const char *name, if (!my_errno) my_errno= write_ndb_file(); +#ifdef HAVE_NDB_BINLOG + if (!my_errno) + { + NDB_SHARE *share= 0; + pthread_mutex_lock(&ndbcluster_mutex); + /* + First make sure we get a "fresh" share here, not an old trailing one... + */ + { + const char *key= name2; + uint length= (uint) strlen(key); + if ((share= (NDB_SHARE*) hash_search(&ndbcluster_open_tables, + (byte*) key, length))) + handle_trailing_share(share); + } + /* + get a new share + */ + if (!(share= get_share(name2, form, true, true))) + { + sql_print_error("NDB: allocating table share for %s failed", name2); + /* my_errno is set */ + } + pthread_mutex_unlock(&ndbcluster_mutex); + + while (!IS_TMP_PREFIX(m_tabname)) + { + const NDBTAB *t= dict->getTable(m_tabname); + String event_name(INJECTOR_EVENT_LEN); + ndb_rep_event_name(&event_name,m_dbname,m_tabname); + + /* + Always create an event for the table, as other mysql servers + expect it to be there. + */ + if (ndbcluster_create_event(ndb, t, event_name.c_ptr(), share) < 0) + { + /* this is only a serious error if the binlog is on */ + if (share && ndb_binlog_thread_running > 0) + { + push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR, + ER_GET_ERRMSG, ER(ER_GET_ERRMSG), + "Creating event for logging table failed. " + "See error log for details."); + } + break; + } + if (ndb_extra_logging) + sql_print_information("NDB Binlog: CREATE TABLE Event: %s", + event_name.c_ptr()); + + if (share && ndb_binlog_thread_running > 0 && + ndbcluster_create_event_ops(share, t, event_name.c_ptr()) < 0) + { + sql_print_error("NDB Binlog: FAILED CREATE TABLE event operations." + " Event: %s", name2); + /* a warning has been issued to the client */ + } + ndbcluster_log_schema_op(current_thd, share, + current_thd->query, current_thd->query_length, + share->db, share->table_name, + 0, 0, + SOT_CREATE_TABLE); + break; + } + } +#endif /* HAVE_NDB_BINLOG */ + DBUG_RETURN(my_errno); } @@ -4227,6 +4241,15 @@ int ha_ndbcluster::rename_table(const char *from, const char *to) if (!(orig_tab= dict->getTable(m_tabname))) ERR_RETURN(dict->getNdbError()); } +#ifdef HAVE_NDB_BINLOG + NDB_SHARE *share= 0; + if (ndb_binlog_thread_running > 0 && + (share= get_share(from, 0, false))) + { + int r= rename_share(share, to); + DBUG_ASSERT(r == 0); + } +#endif m_table= (void *)orig_tab; // Change current database to that of target table set_dbname(to); @@ -4234,6 +4257,14 @@ int ha_ndbcluster::rename_table(const char *from, const char *to) if ((result= alter_table_name(new_tabname))) { +#ifdef HAVE_NDB_BINLOG + if (share) + { + int r= rename_share(share, from); + DBUG_ASSERT(r == 0); + free_share(&share); + } +#endif DBUG_RETURN(result); } @@ -4241,9 +4272,76 @@ int ha_ndbcluster::rename_table(const char *from, const char *to) if ((result= handler::rename_table(from, to))) { // ToDo in 4.1 should rollback alter table... +#ifdef HAVE_NDB_BINLOG + if (share) + free_share(&share); +#endif DBUG_RETURN(result); } +#ifdef HAVE_NDB_BINLOG + int is_old_table_tmpfile= 1; + if (share && share->op) + dict->forceGCPWait(); + + /* handle old table */ + if (!IS_TMP_PREFIX(m_tabname)) + { + is_old_table_tmpfile= 0; + String event_name(INJECTOR_EVENT_LEN); + ndb_rep_event_name(&event_name, from + sizeof(share_prefix) - 1, 0); + ndbcluster_handle_drop_table(ndb, event_name.c_ptr(), share); + } + + if (!result && !IS_TMP_PREFIX(new_tabname)) + { + /* always create an event for the table */ + String event_name(INJECTOR_EVENT_LEN); + ndb_rep_event_name(&event_name, to + sizeof(share_prefix) - 1, 0); + const NDBTAB *ndbtab= dict->getTable(new_tabname); + + if (ndbcluster_create_event(ndb, ndbtab, event_name.c_ptr(), share) >= 0) + { + if (ndb_extra_logging) + sql_print_information("NDB Binlog: RENAME Event: %s", + event_name.c_ptr()); + if (share) + { + if (ndbcluster_create_event_ops(share, ndbtab, + event_name.c_ptr()) < 0) + { + sql_print_error("NDB Binlog: FAILED create event operations " + "during RENAME. Event %s", event_name.c_ptr()); + /* a warning has been issued to the client */ + } + } + } + else + { + sql_print_error("NDB Binlog: FAILED create event during RENAME. " + "Event: %s", event_name.c_ptr()); + push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR, + ER_GET_ERRMSG, ER(ER_GET_ERRMSG), + "Creating event for logging table failed. " + "See error log for details."); + } + if (is_old_table_tmpfile) + ndbcluster_log_schema_op(current_thd, share, + current_thd->query, current_thd->query_length, + m_dbname, new_tabname, + 0, 0, + SOT_ALTER_TABLE); + else + ndbcluster_log_schema_op(current_thd, share, + current_thd->query, current_thd->query_length, + m_dbname, new_tabname, + 0, 0, + SOT_RENAME_TABLE); + } + if (share) + free_share(&share); +#endif + DBUG_RETURN(result); } @@ -4286,6 +4384,9 @@ ha_ndbcluster::delete_table(ha_ndbcluster *h, Ndb *ndb, { DBUG_ENTER("ha_ndbcluster::ndbcluster_delete_table"); NDBDICT *dict= ndb->getDictionary(); +#ifdef HAVE_NDB_BINLOG + NDB_SHARE *share= get_share(path, 0, false); +#endif /* Drop the table from NDB */ @@ -4302,9 +4403,75 @@ ha_ndbcluster::delete_table(ha_ndbcluster *h, Ndb *ndb, if (res) { +#ifdef HAVE_NDB_BINLOG + /* the drop table failed for some reason, drop the share anyways */ + if (share) + { + pthread_mutex_lock(&ndbcluster_mutex); + if (share->state != NSS_DROPPED) + { + /* + The share kept by the server has not been freed, free it + */ + share->state= NSS_DROPPED; + free_share(&share, TRUE); + } + /* free the share taken above */ + free_share(&share, TRUE); + pthread_mutex_unlock(&ndbcluster_mutex); + } +#endif DBUG_RETURN(res); } +#ifdef HAVE_NDB_BINLOG + /* stop the logging of the dropped table, and cleanup */ + + /* + drop table is successful even if table does not exist in ndb + and in case table was actually not dropped, there is no need + to force a gcp, and setting the event_name to null will indicate + that there is no event to be dropped + */ + int table_dropped= dict->getNdbError().code != 709; + + if (!IS_TMP_PREFIX(table_name) && share) + { + ndbcluster_log_schema_op(current_thd, share, + current_thd->query, current_thd->query_length, + share->db, share->table_name, + 0, 0, + SOT_DROP_TABLE); + } + else if (table_dropped && share && share->op) /* ndbcluster_log_schema_op + will do a force GCP */ + dict->forceGCPWait(); + + if (!IS_TMP_PREFIX(table_name)) + { + String event_name(INJECTOR_EVENT_LEN); + ndb_rep_event_name(&event_name, path + sizeof(share_prefix) - 1, 0); + ndbcluster_handle_drop_table(ndb, + table_dropped ? event_name.c_ptr() : 0, + share); + } + + if (share) + { + pthread_mutex_lock(&ndbcluster_mutex); + if (share->state != NSS_DROPPED) + { + /* + The share kept by the server has not been freed, free it + */ + share->state= NSS_DROPPED; + free_share(&share, TRUE); + } + /* free the share taken above */ + free_share(&share, TRUE); + pthread_mutex_unlock(&ndbcluster_mutex); + } +#endif DBUG_RETURN(0); } @@ -4393,7 +4560,8 @@ ulonglong ha_ndbcluster::get_auto_increment() HA_NO_PREFIX_CHAR_KEYS | \ HA_NEED_READ_RANGE_BUFFER | \ HA_CAN_GEOMETRY | \ - HA_CAN_BIT_FIELD + HA_CAN_BIT_FIELD | \ + HA_PRIMARY_KEY_ALLOW_RANDOM_ACCESS ha_ndbcluster::ha_ndbcluster(TABLE_SHARE *table_arg): handler(&ndbcluster_hton, table_arg), @@ -4518,7 +4686,7 @@ int ha_ndbcluster::open(const char *name, int mode, uint test_if_locked) DBUG_PRINT("info", (" ref_length: %d", ref_length)); } // Init table lock structure - if (!(m_share=get_share(name))) + if (!(m_share=get_share(name, table))) DBUG_RETURN(1); thr_lock_data_init(&m_share->lock,&m_lock,(void*) 0); @@ -4629,7 +4797,7 @@ int ha_ndbcluster::check_ndb_connection(THD* thd) } -int ndbcluster_close_connection(THD *thd) +static int ndbcluster_close_connection(THD *thd) { Thd_ndb *thd_ndb= get_thd_ndb(thd); DBUG_ENTER("ndbcluster_close_connection"); @@ -4792,14 +4960,21 @@ int ndbcluster_drop_database_impl(const char *path) DBUG_RETURN(ret); } -void ndbcluster_drop_database(char *path) +static void ndbcluster_drop_database(char *path) { ndbcluster_drop_database_impl(path); +#ifdef HAVE_NDB_BINLOG + char db[FN_REFLEN]; + ha_ndbcluster::set_dbname(path, db); + ndbcluster_log_schema_op(current_thd, 0, + current_thd->query, current_thd->query_length, + db, "", 0, 0, SOT_DROP_DB); +#endif } /* find all tables in ndb and discover those needed */ -static int ndbcluster_find_all_files(THD *thd) +int ndbcluster_find_all_files(THD *thd) { DBUG_ENTER("ndbcluster_find_all_files"); Ndb* ndb; @@ -4820,6 +4995,11 @@ static int ndbcluster_find_all_files(THD *thd) for (uint i= 0 ; i < list.count ; i++) { NDBDICT::List::Element& elmt= list.elements[i]; + if (IS_TMP_PREFIX(elmt.name)) + { + DBUG_PRINT("info", ("Skipping %s.%s in NDB", elmt.database, elmt.name)); + continue; + } DBUG_PRINT("info", ("Found %s.%s in NDB", elmt.database, elmt.name)); if (!(elmt.state == NDBOBJ::StateBuilding || elmt.state == NDBOBJ::StateOnline)) @@ -4834,10 +5014,11 @@ static int ndbcluster_find_all_files(THD *thd) if (!(ndbtab= dict->getTable(elmt.name))) { - sql_print_error("NDB: failed to setup table %s.%s, error: %d, %s", - elmt.database, elmt.name, - dict->getNdbError().code, - dict->getNdbError().message); + if (elmt.state == NDBOBJ::StateOnline) + sql_print_error("NDB: failed to setup table %s.%s, error: %d, %s", + elmt.database, elmt.name, + dict->getNdbError().code, + dict->getNdbError().message); unhandled++; continue; } @@ -4876,6 +5057,31 @@ static int ndbcluster_find_all_files(THD *thd) } pthread_mutex_unlock(&LOCK_open); } +#ifdef HAVE_NDB_BINLOG + else if (ndb_binlog_thread_running > 0) + { + /* set up replication for this table */ + NDB_SHARE *share; + pthread_mutex_lock(&ndbcluster_mutex); + if (((share= (NDB_SHARE*)hash_search(&ndbcluster_open_tables, + (byte*) key, strlen(key))) + && share->op == 0 && share->op_old == 0) + || share == 0) + { + /* + there is no binlog creation setup for this table + attempt to do it + */ + pthread_mutex_unlock(&ndbcluster_mutex); + pthread_mutex_lock(&LOCK_open); + ndbcluster_create_binlog_setup(ndb, key, elmt.database, elmt.name, + share); + pthread_mutex_unlock(&LOCK_open); + } + else + pthread_mutex_unlock(&ndbcluster_mutex); + } +#endif } } while (unhandled && retries--); @@ -4925,6 +5131,11 @@ int ndbcluster_find_files(THD *thd,const char *db,const char *path, for (i= 0 ; i < list.count ; i++) { NDBDICT::List::Element& elmt= list.elements[i]; + if (IS_TMP_PREFIX(elmt.name)) + { + DBUG_PRINT("info", ("Skipping %s.%s in NDB", elmt.database, elmt.name)); + continue; + } DBUG_PRINT("info", ("Found %s/%s in NDB", elmt.database, elmt.name)); // Add only tables that belongs to db @@ -4983,6 +5194,39 @@ int ndbcluster_find_files(THD *thd,const char *db,const char *path, } } +#ifdef HAVE_NDB_BINLOG + /* setup logging to binlog for all discovered tables */ + if (ndb_binlog_thread_running > 0) + { + char *end; + char *end1= + strxnmov(name, sizeof(name), mysql_data_home, "/", db, "/", NullS); + NDB_SHARE *share; + pthread_mutex_lock(&ndbcluster_mutex); + for (i= 0; i < ok_tables.records; i++) + { + file_name= (char*)hash_element(&ok_tables, i); + end= strxnmov(end1, sizeof(name) - (end1 - name), file_name, NullS); + if ((share= (NDB_SHARE*)hash_search(&ndbcluster_open_tables, + (byte*)name, end - name)) + && share->op == 0 && share->op_old == 0) + { + /* + there is no binlog creation setup for this table + attempt to do it + */ + + pthread_mutex_unlock(&ndbcluster_mutex); + pthread_mutex_lock(&LOCK_open); + ndbcluster_create_binlog_setup(ndb, name, db, file_name, share); + pthread_mutex_unlock(&LOCK_open); + pthread_mutex_lock(&ndbcluster_mutex); + } + } + pthread_mutex_unlock(&ndbcluster_mutex); + } +#endif + // Check for new files to discover DBUG_PRINT("info", ("Checking for new files to discover")); List<char> create_list; @@ -5055,11 +5299,18 @@ int ndbcluster_find_files(THD *thd,const char *db,const char *path, static int connect_callback() { update_status_variables(g_ndb_cluster_connection); + + uint node_id, i= 0; + Ndb_cluster_connection_node_iter node_iter; + memset((void *)g_node_id_map, 0xFFFF, sizeof(g_node_id_map)); + while ((node_id= g_ndb_cluster_connection->get_next_node(node_iter))) + g_node_id_map[node_id]= i++; + pthread_cond_signal(&COND_ndb_util_thread); return 0; } -bool ndbcluster_init() +static bool ndbcluster_init() { int res; DBUG_ENTER("ndbcluster_init"); @@ -5067,6 +5318,22 @@ bool ndbcluster_init() if (have_ndbcluster != SHOW_OPTION_YES) goto ndbcluster_init_error; + { + handlerton &h= ndbcluster_hton; + h.close_connection= ndbcluster_close_connection; + h.commit= ndbcluster_commit; + h.rollback= ndbcluster_rollback; + h.create= ndbcluster_create_handler; /* Create a new handler */ + h.drop_database= ndbcluster_drop_database; /* Drop a database */ + h.panic= ndbcluster_end; /* Panic call */ + h.show_status= ndbcluster_show_status; /* Show status */ + h.alter_tablespace= ndbcluster_alter_tablespace; /* Show status */ +#ifdef HAVE_NDB_BINLOG + ndbcluster_binlog_init_handlerton(); +#endif + h.flags= HTON_NO_FLAGS; + } + // Set connectstring if specified if (opt_ndbcluster_connectstring != 0) DBUG_PRINT("connectstring", ("%s", opt_ndbcluster_connectstring)); @@ -5130,6 +5397,22 @@ bool ndbcluster_init() (void) hash_init(&ndbcluster_open_tables,system_charset_info,32,0,0, (hash_get_key) ndbcluster_get_key,0,0); pthread_mutex_init(&ndbcluster_mutex,MY_MUTEX_INIT_FAST); +#ifdef HAVE_NDB_BINLOG + /* start the ndb injector thread */ + if (opt_bin_log) + { + if (binlog_row_based) + { + if (ndbcluster_binlog_start()) + goto ndbcluster_init_error; + } + else + { + sql_print_error("NDB: only row based binary logging is supported"); + } + } +#endif /* HAVE_NDB_BINLOG */ + pthread_mutex_init(&LOCK_ndb_util_thread, MY_MUTEX_INIT_FAST); pthread_cond_init(&COND_ndb_util_thread, NULL); @@ -5160,26 +5443,13 @@ ndbcluster_init_error: DBUG_RETURN(TRUE); } - -/* - End use of the NDB Cluster table handler - - free all global variables allocated by - ndbcluster_init() -*/ - -int ndbcluster_end(ha_panic_function type) +static int ndbcluster_end(ha_panic_function type) { DBUG_ENTER("ndbcluster_end"); if (!ndbcluster_inited) DBUG_RETURN(0); - // Kill ndb utility thread - (void) pthread_mutex_lock(&LOCK_ndb_util_thread); - DBUG_PRINT("exit",("killing ndb util thread: %lx", ndb_util_thread)); - (void) pthread_cond_signal(&COND_ndb_util_thread); - (void) pthread_mutex_unlock(&LOCK_ndb_util_thread); - if (g_ndb) { #ifndef DBUG_OFF @@ -5206,7 +5476,6 @@ int ndbcluster_end(ha_panic_function type) pthread_mutex_destroy(&LOCK_ndb_util_thread); pthread_cond_destroy(&COND_ndb_util_thread); ndbcluster_inited= 0; - ndbcluster_util_inited= 0; DBUG_RETURN(0); } @@ -5673,60 +5942,6 @@ ha_ndbcluster::register_query_cache_table(THD *thd, } -#ifndef DBUG_OFF -static void dbug_print_table(const char *info, TABLE *table) -{ - if (table == 0) - { - DBUG_PRINT("info",("%s: (null)", info)); - return; - } - DBUG_PRINT("info", - ("%s: %s.%s s->fields: %d " - "reclength: %d rec_buff_length: %d record[0]: %lx " - "record[1]: %lx", - info, - table->s->db, - table->s->table_name, - table->s->fields, - table->s->reclength, - table->s->rec_buff_length, - table->record[0], - table->record[1])); - - for (unsigned int i= 0; i < table->s->fields; i++) - { - Field *f= table->field[i]; - DBUG_PRINT("info", - ("[%d] \"%s\"(0x%lx:%s%s%s%s%s%s) type: %d pack_length: %d " - "ptr: 0x%lx[+%d] null_bit: %u null_ptr: 0x%lx[+%d]", - i, - f->field_name, - f->flags, - (f->flags & PRI_KEY_FLAG) ? "pri" : "attr", - (f->flags & NOT_NULL_FLAG) ? "" : ",nullable", - (f->flags & UNSIGNED_FLAG) ? ",unsigned" : ",signed", - (f->flags & ZEROFILL_FLAG) ? ",zerofill" : "", - (f->flags & BLOB_FLAG) ? ",blob" : "", - (f->flags & BINARY_FLAG) ? ",binary" : "", - f->real_type(), - f->pack_length(), - f->ptr, f->ptr - table->record[0], - f->null_bit, - f->null_ptr, (byte*) f->null_ptr - table->record[0])); - if (f->type() == MYSQL_TYPE_BIT) - { - Field_bit *g= (Field_bit*) f; - DBUG_PRINT("MYSQL_TYPE_BIT",("field_length: %d bit_ptr: 0x%lx[+%d] " - "bit_ofs: %u bit_len: %u", - g->field_length, g->bit_ptr, - (byte*) g->bit_ptr-table->record[0], - g->bit_ofs, g->bit_len)); - } - } -} -#endif - /* Handling the shared NDB_SHARE structure that is needed to provide table locking. @@ -5756,6 +5971,12 @@ static void dbug_print_open_tables() ("db.tablename: %s.%s use_count: %d commit_count: %d", share->db, share->table_name, share->use_count, share->commit_count)); +#ifdef HAVE_NDB_BINLOG + if (share->table) + DBUG_PRINT("share", + ("table->s->db.table_name: %s.%s", + share->table->s->db.str, share->table->s->table_name.str)); +#endif } DBUG_VOID_RETURN; } @@ -5763,11 +5984,170 @@ static void dbug_print_open_tables() #define dbug_print_open_tables() #endif +#ifdef HAVE_NDB_BINLOG +/* + For some reason a share is still around, try to salvage the situation + by closing all cached tables. If the share still exists, there is an + error somewhere but only report this to the error log. Keep this + "trailing share" but rename it since there are still references to it + to avoid segmentation faults. There is a risk that the memory for + this trailing share leaks. + + Must be called with previous pthread_mutex_lock(&ndbcluster_mutex) +*/ +int handle_trailing_share(NDB_SHARE *share) +{ + static ulong trailing_share_id= 0; + DBUG_ENTER("handle_trailing_share"); + + ++share->use_count; + pthread_mutex_unlock(&ndbcluster_mutex); + + close_cached_tables((THD*) 0, 0, (TABLE_LIST*) 0, TRUE); + + pthread_mutex_lock(&ndbcluster_mutex); + if (!--share->use_count) + { + DBUG_PRINT("info", ("NDB_SHARE: close_cashed_tables %s freed share.", + share->key)); + real_free_share(&share); + DBUG_RETURN(0); + } + + /* + share still exists, if share has not been dropped by server + release that share + */ + if (share->state != NSS_DROPPED && !--share->use_count) + { + DBUG_PRINT("info", ("NDB_SHARE: %s already exists, " + "use_count=%d state != NSS_DROPPED.", + share->key, share->use_count)); + real_free_share(&share); + DBUG_RETURN(0); + } + DBUG_PRINT("error", ("NDB_SHARE: %s already exists use_count=%d.", + share->key, share->use_count)); + + sql_print_error("NDB_SHARE: %s already exists use_count=%d." + " Moving away for safety, but possible memleak.", + share->key, share->use_count); + dbug_print_open_tables(); + + /* + This is probably an error. We can however save the situation + at the cost of a possible mem leak, by "renaming" the share + - First remove from hash + */ + hash_delete(&ndbcluster_open_tables, (byte*) share); + + /* + now give it a new name, just a running number + if space is not enough allocate some more + */ + { + const uint min_key_length= 10; + if (share->key_length < min_key_length) + { + share->key= alloc_root(&share->mem_root, min_key_length + 1); + share->key_length= min_key_length; + } + share->key_length= + my_snprintf(share->key, min_key_length + 1, "#leak%d", + trailing_share_id++); + } + /* Keep it for possible the future trailing free */ + my_hash_insert(&ndbcluster_open_tables, (byte*) share); + + DBUG_RETURN(0); +} + +/* + Rename share is used during rename table. +*/ +static int rename_share(NDB_SHARE *share, const char *new_key) +{ + NDB_SHARE *tmp; + pthread_mutex_lock(&ndbcluster_mutex); + uint new_length= (uint) strlen(new_key); + DBUG_PRINT("rename_share", ("old_key: %s old__length: %d", + share->key, share->key_length)); + if ((tmp= (NDB_SHARE*) hash_search(&ndbcluster_open_tables, + (byte*) new_key, new_length))) + handle_trailing_share(tmp); + + /* remove the share from hash */ + hash_delete(&ndbcluster_open_tables, (byte*) share); + dbug_print_open_tables(); + + /* save old stuff if insert should fail */ + uint old_length= share->key_length; + char *old_key= share->key; + + /* + now allocate and set the new key, db etc + enough space for key, db, and table_name + */ + share->key= alloc_root(&share->mem_root, 2 * (new_length + 1)); + strmov(share->key, new_key); + share->key_length= new_length; + + if (my_hash_insert(&ndbcluster_open_tables, (byte*) share)) + { + // ToDo free the allocated stuff above? + DBUG_PRINT("error", ("rename_share: my_hash_insert %s failed", + share->key)); + share->key= old_key; + share->key_length= old_length; + if (my_hash_insert(&ndbcluster_open_tables, (byte*) share)) + { + sql_print_error("rename_share: failed to recover %s", share->key); + DBUG_PRINT("error", ("rename_share: my_hash_insert %s failed", + share->key)); + } + dbug_print_open_tables(); + pthread_mutex_unlock(&ndbcluster_mutex); + return -1; + } + dbug_print_open_tables(); + + share->db= share->key + new_length + 1; + ha_ndbcluster::set_dbname(new_key, share->db); + share->table_name= share->db + strlen(share->db) + 1; + ha_ndbcluster::set_tabname(new_key, share->table_name); + + DBUG_PRINT("rename_share", + ("0x%lx key: %s key_length: %d", + share, share->key, share->key_length)); + DBUG_PRINT("rename_share", + ("db.tablename: %s.%s use_count: %d commit_count: %d", + share->db, share->table_name, + share->use_count, share->commit_count)); + DBUG_PRINT("rename_share", + ("table->s->db.table_name: %s.%s", + share->table->s->db.str, share->table->s->table_name.str)); + + if (share->op == 0) + { + share->table->s->db.str= share->db; + share->table->s->db.length= strlen(share->db); + share->table->s->table_name.str= share->table_name; + share->table->s->table_name.length= strlen(share->table_name); + } + /* else rename will be handled when the ALTER event comes */ + share->old_names= old_key; + // ToDo free old_names after ALTER EVENT + + pthread_mutex_unlock(&ndbcluster_mutex); + return 0; +} +#endif + /* Increase refcount on existing share. Always returns share and cannot fail. */ -static NDB_SHARE *get_share(NDB_SHARE *share) +NDB_SHARE *ndbcluster_get_share(NDB_SHARE *share) { pthread_mutex_lock(&ndbcluster_mutex); share->use_count++; @@ -5799,9 +6179,13 @@ static NDB_SHARE *get_share(NDB_SHARE *share) have_lock == TRUE, pthread_mutex_lock(&ndbcluster_mutex) already taken */ -static NDB_SHARE *get_share(const char *key, bool create_if_not_exists, - bool have_lock) +NDB_SHARE *ndbcluster_get_share(const char *key, TABLE *table, + bool create_if_not_exists, + bool have_lock) { + DBUG_ENTER("get_share"); + DBUG_PRINT("info", ("get_share: key %s", key)); + THD *thd= current_thd; NDB_SHARE *share; if (!have_lock) pthread_mutex_lock(&ndbcluster_mutex); @@ -5847,6 +6231,9 @@ static NDB_SHARE *get_share(const char *key, bool create_if_not_exists, ha_ndbcluster::set_dbname(key, share->db); share->table_name= share->db + strlen(share->db) + 1; ha_ndbcluster::set_tabname(key, share->table_name); +#ifdef HAVE_NDB_BINLOG + ndbcluster_binlog_init_share(share, table); +#endif *root_ptr= old_root; } else @@ -5874,7 +6261,7 @@ static NDB_SHARE *get_share(const char *key, bool create_if_not_exists, return share; } -static void real_free_share(NDB_SHARE **share) +void ndbcluster_real_free_share(NDB_SHARE **share) { DBUG_PRINT("real_free_share", ("0x%lx key: %s key_length: %d", @@ -5889,6 +6276,26 @@ static void real_free_share(NDB_SHARE **share) pthread_mutex_destroy(&(*share)->mutex); free_root(&(*share)->mem_root, MYF(0)); +#ifdef HAVE_NDB_BINLOG + if ((*share)->table) + { + closefrm((*share)->table, 0); +#if 0 // todo ? + free_root(&(*share)->table->mem_root, MYF(0)); +#endif + +#ifndef DBUG_OFF + bzero((gptr)(*share)->table_share, sizeof(*(*share)->table_share)); + bzero((gptr)(*share)->table, sizeof(*(*share)->table)); +#endif + my_free((gptr) (*share)->table_share, MYF(0)); + my_free((gptr) (*share)->table, MYF(0)); +#ifndef DBUG_OFF + (*share)->table_share= 0; + (*share)->table= 0; +#endif + } +#endif my_free((gptr) *share, MYF(0)); *share= 0; @@ -5901,7 +6308,7 @@ static void real_free_share(NDB_SHARE **share) have_lock == TRUE, pthread_mutex_lock(&ndbcluster_mutex) already taken */ -static void free_share(NDB_SHARE **share, bool have_lock) +void ndbcluster_free_share(NDB_SHARE **share, bool have_lock) { if (!have_lock) pthread_mutex_lock(&ndbcluster_mutex); @@ -5927,7 +6334,6 @@ static void free_share(NDB_SHARE **share, bool have_lock) } - /* Internal representation of the frm blob @@ -6601,7 +7007,7 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused))) Wait for cluster to start */ pthread_mutex_lock(&LOCK_ndb_util_thread); - while (!ndb_cluster_node_id) + while (!ndb_cluster_node_id && (ndbcluster_hton.slot != ~(uint)0)) { /* ndb not connected yet */ set_timespec(abstime, 1); @@ -6616,13 +7022,35 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused))) } pthread_mutex_unlock(&LOCK_ndb_util_thread); + { + Thd_ndb *thd_ndb; + if (!(thd_ndb= ha_ndbcluster::seize_thd_ndb())) + { + sql_print_error("Could not allocate Thd_ndb object"); + goto ndb_util_thread_end; + } + set_thd_ndb(thd, thd_ndb); + thd_ndb->options|= TNO_NO_LOG_SCHEMA_OP; + } + +#ifdef HAVE_NDB_BINLOG + /* create tables needed by the replication */ + ndbcluster_setup_binlog_table_shares(thd); +#else /* Get all table definitions from the storage node */ ndbcluster_find_all_files(thd); +#endif ndbcluster_util_inited= 1; +#ifdef HAVE_NDB_BINLOG + /* If running, signal injector thread that all is setup */ + if (ndb_binlog_thread_running > 0) + pthread_cond_signal(&injector_cond); +#endif + set_timespec(abstime, 0); for (;!abort_loop;) { @@ -6639,6 +7067,15 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused))) if (abort_loop) break; /* Shutting down server */ +#ifdef HAVE_NDB_BINLOG + /* + Check that the apply_status_share and schema_share has been created. + If not try to create it + */ + if (!apply_status_share || !schema_share) + ndbcluster_setup_binlog_table_shares(thd); +#endif + if (ndb_cache_check_time == 0) { /* Wake up in 1 second to check if value has changed */ @@ -6652,6 +7089,12 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused))) for (uint i= 0; i < ndbcluster_open_tables.records; i++) { share= (NDB_SHARE *)hash_element(&ndbcluster_open_tables, i); +#ifdef HAVE_NDB_BINLOG + if ((share->use_count - (int) (share->op != 0) - (int) (share->op != 0)) + <= 0) + continue; // injector thread is the only user, skip statistics + share->util_lock= current_thd; // Mark that util thread has lock +#endif /* HAVE_NDB_BINLOG */ share->use_count++; /* Make sure the table can't be closed */ DBUG_PRINT("ndb_util_thread", ("Found open table[%d]: %s, use_count: %d", @@ -6666,6 +7109,17 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused))) List_iterator_fast<NDB_SHARE> it(util_open_tables); while ((share= it++)) { +#ifdef HAVE_NDB_BINLOG + if ((share->use_count - (int) (share->op != 0) - (int) (share->op != 0)) + <= 1) + { + /* + Util thread and injector thread is the only user, skip statistics + */ + free_share(&share); + continue; + } +#endif /* HAVE_NDB_BINLOG */ DBUG_PRINT("ndb_util_thread", ("Fetching commit count for: %s", share->key)); @@ -6727,6 +7181,7 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused))) } } ndb_util_thread_end: + sql_print_information("Stopping Cluster Utility thread"); net_end(&thd->net); thd->cleanup(); delete thd; @@ -8072,6 +8527,7 @@ ndbcluster_show_status(THD* thd, stat_print_fn *stat_print, enum ha_stat_type stat_type) { char buf[IO_SIZE]; + uint buflen; DBUG_ENTER("ndbcluster_show_status"); if (have_ndbcluster != SHOW_OPTION_YES) @@ -8082,7 +8538,23 @@ ndbcluster_show_status(THD* thd, stat_print_fn *stat_print, { DBUG_RETURN(FALSE); } - + + update_status_variables(g_ndb_cluster_connection); + buflen= + my_snprintf(buf, sizeof(buf), + "cluster_node_id=%u, " + "connected_host=%s, " + "connected_port=%u, " + "number_of_storage_nodes=%u", + ndb_cluster_node_id, + ndb_connected_host, + ndb_connected_port, + ndb_number_of_storage_nodes); + if (stat_print(thd, ndbcluster_hton.name, strlen(ndbcluster_hton.name), + "connection", strlen("connection"), + buf, buflen)) + DBUG_RETURN(TRUE); + if (get_thd_ndb(thd) && get_thd_ndb(thd)->ndb) { Ndb* ndb= (get_thd_ndb(thd))->ndb; @@ -8090,7 +8562,7 @@ ndbcluster_show_status(THD* thd, stat_print_fn *stat_print, tmp.m_name= 0; while (ndb->get_free_list_usage(&tmp)) { - uint buflen= + buflen= my_snprintf(buf, sizeof(buf), "created=%u, free=%u, sizeof=%u", tmp.m_created, tmp.m_free, tmp.m_sizeof); @@ -8099,11 +8571,14 @@ ndbcluster_show_status(THD* thd, stat_print_fn *stat_print, DBUG_RETURN(TRUE); } } - send_eof(thd); - +#ifdef HAVE_NDB_BINLOG + ndbcluster_show_status_binlog(thd, stat_print, stat_type); +#endif + DBUG_RETURN(FALSE); } + /* Create a table in NDB Cluster */ diff --git a/sql/ha_ndbcluster.h b/sql/ha_ndbcluster.h index f05c1c32a1a..694c9d9ff53 100644 --- a/sql/ha_ndbcluster.h +++ b/sql/ha_ndbcluster.h @@ -42,8 +42,10 @@ class NdbEventOperation; // connectstring to cluster if given by mysqld extern const char *ndbcluster_connectstring; extern ulong ndb_cache_check_time; +#ifdef HAVE_NDB_BINLOG extern ulong ndb_report_thresh_binlog_epoch_slip; extern ulong ndb_report_thresh_binlog_mem_usage; +#endif typedef enum ndb_index_type { UNDEFINED_INDEX = 0, @@ -86,8 +88,26 @@ typedef struct st_ndbcluster_share { ulonglong commit_count; char *db; char *table_name; +#ifdef HAVE_NDB_BINLOG + uint32 flags; + NDB_SHARE_STATE state; + NdbEventOperation *op; + NdbEventOperation *op_old; // for rename table + char *old_names; // for rename table + TABLE_SHARE *table_share; + TABLE *table; + NdbValue *ndb_value[2]; + MY_BITMAP *subscriber_bitmap; + MY_BITMAP slock_bitmap; + uint32 slock[256/32]; // 256 bits for lock status of table +#endif } NDB_SHARE; +#ifdef HAVE_NDB_BINLOG +/* NDB_SHARE.flags */ +#define NSF_HIDDEN_PK 1 /* table has hidden primary key */ +#endif + typedef enum ndb_item_type { NDB_VALUE = 0, // Qualified more with Item::Type NDB_FIELD = 1, // Qualified from table definition @@ -461,6 +481,11 @@ class Ndb_cond_traverse_context Place holder for ha_ndbcluster thread specific data */ +enum THD_NDB_OPTIONS +{ + TNO_NO_LOG_SCHEMA_OP= 1 << 0 +}; + class Thd_ndb { public: @@ -472,6 +497,7 @@ class Thd_ndb NdbTransaction *all; NdbTransaction *stmt; int error; + uint32 options; List<NDB_SHARE> changed_tables; }; @@ -553,6 +579,9 @@ class ha_ndbcluster: public handler bool low_byte_first() const; bool has_transactions(); + + virtual bool is_injective() const { return true; } + const char* index_type(uint key_number); double scan_time(); @@ -773,18 +802,10 @@ private: extern SHOW_VAR ndb_status_variables[]; -bool ndbcluster_init(void); -int ndbcluster_end(ha_panic_function flag); - int ndbcluster_discover(THD* thd, const char* dbname, const char* name, const void** frmblob, uint* frmlen); int ndbcluster_find_files(THD *thd,const char *db,const char *path, const char *wild, bool dir, List<char> *files); int ndbcluster_table_exists_in_engine(THD* thd, const char *db, const char *name); -void ndbcluster_drop_database(char* path); - void ndbcluster_print_error(int error, const NdbOperation *error_op); - -bool ndbcluster_show_status(THD*,stat_print_fn *,enum ha_stat_type); - diff --git a/sql/ha_ndbcluster_binlog.cc b/sql/ha_ndbcluster_binlog.cc new file mode 100644 index 00000000000..c80b2b27d8d --- /dev/null +++ b/sql/ha_ndbcluster_binlog.cc @@ -0,0 +1,2732 @@ +/* Copyright (C) 2000-2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ + +#include "mysql_priv.h" +#include "ha_ndbcluster.h" + +#ifdef HAVE_NDB_BINLOG +#include "rpl_injector.h" +#include "slave.h" +#include "ha_ndbcluster_binlog.h" + +/* + defines for cluster replication table names +*/ +#include "ha_ndbcluster_tables.h" +#define NDB_APPLY_TABLE_FILE "./" NDB_REP_DB "/" NDB_APPLY_TABLE +#define NDB_SCHEMA_TABLE_FILE "./" NDB_REP_DB "/" NDB_SCHEMA_TABLE + +/* + Flag showing if the ndb injector thread is running, if so == 1 + -1 if it was started but later stopped for some reason + 0 if never started +*/ +int ndb_binlog_thread_running= 0; + +/* + Global reference to the ndb injector thread THD oject + + Has one sole purpose, for setting the in_use table member variable + in get_share(...) +*/ +THD *injector_thd= 0; + +/* + Global reference to ndb injector thd object. + + Used mainly by the binlog index thread, but exposed to the client sql + thread for one reason; to setup the events operations for a table + to enable ndb injector thread receiving events. + + Must therefore always be used with a surrounding + pthread_mutex_lock(&injector_mutex), when doing create/dropEventOperation +*/ +static Ndb *injector_ndb= 0; +static Ndb *schema_ndb= 0; + +/* + Mutex and condition used for interacting between client sql thread + and injector thread +*/ +pthread_t ndb_binlog_thread; +pthread_mutex_t injector_mutex; +pthread_cond_t injector_cond; + +/* NDB Injector thread (used for binlog creation) */ +static ulonglong ndb_latest_applied_binlog_epoch= 0; +static ulonglong ndb_latest_handled_binlog_epoch= 0; +static ulonglong ndb_latest_received_binlog_epoch= 0; + +NDB_SHARE *apply_status_share= 0; +NDB_SHARE *schema_share= 0; + +/* instantiated in storage/ndb/src/ndbapi/Ndbif.cpp */ +extern Uint64 g_latest_trans_gci; + +/* + Global variables for holding the binlog_index table reference +*/ +static TABLE *binlog_index= 0; +static TABLE_LIST binlog_tables; + +/* + Helper functions +*/ + +#ifndef DBUG_OFF +static void print_records(TABLE *table, const char *record) +{ + if (_db_on_) + { + for (uint j= 0; j < table->s->fields; j++) + { + char buf[40]; + int pos= 0; + Field *field= table->field[j]; + const byte* field_ptr= field->ptr - table->record[0] + record; + int pack_len= field->pack_length(); + int n= pack_len < 10 ? pack_len : 10; + + for (int i= 0; i < n && pos < 20; i++) + { + pos+= sprintf(&buf[pos]," %x", (int) (unsigned char) field_ptr[i]); + } + buf[pos]= 0; + DBUG_PRINT("info",("[%u]field_ptr[0->%d]: %s", j, n, buf)); + } + } +} +#else +#define print_records(a,b) +#endif + + +#ifndef DBUG_OFF +static void dbug_print_table(const char *info, TABLE *table) +{ + if (table == 0) + { + DBUG_PRINT("info",("%s: (null)", info)); + return; + } + DBUG_PRINT("info", + ("%s: %s.%s s->fields: %d " + "reclength: %d rec_buff_length: %d record[0]: %lx " + "record[1]: %lx", + info, + table->s->db.str, + table->s->table_name.str, + table->s->fields, + table->s->reclength, + table->s->rec_buff_length, + table->record[0], + table->record[1])); + + for (unsigned int i= 0; i < table->s->fields; i++) + { + Field *f= table->field[i]; + DBUG_PRINT("info", + ("[%d] \"%s\"(0x%lx:%s%s%s%s%s%s) type: %d pack_length: %d " + "ptr: 0x%lx[+%d] null_bit: %u null_ptr: 0x%lx[+%d]", + i, + f->field_name, + f->flags, + (f->flags & PRI_KEY_FLAG) ? "pri" : "attr", + (f->flags & NOT_NULL_FLAG) ? "" : ",nullable", + (f->flags & UNSIGNED_FLAG) ? ",unsigned" : ",signed", + (f->flags & ZEROFILL_FLAG) ? ",zerofill" : "", + (f->flags & BLOB_FLAG) ? ",blob" : "", + (f->flags & BINARY_FLAG) ? ",binary" : "", + f->real_type(), + f->pack_length(), + f->ptr, f->ptr - table->record[0], + f->null_bit, + f->null_ptr, (byte*) f->null_ptr - table->record[0])); + if (f->type() == MYSQL_TYPE_BIT) + { + Field_bit *g= (Field_bit*) f; + DBUG_PRINT("MYSQL_TYPE_BIT",("field_length: %d bit_ptr: 0x%lx[+%d] " + "bit_ofs: %u bit_len: %u", + g->field_length, g->bit_ptr, + (byte*) g->bit_ptr-table->record[0], + g->bit_ofs, g->bit_len)); + } + } +} +#else +#define dbug_print_table(a,b) +#endif + + +/* + Run a query through mysql_parse + + Used to: + - purging the cluster_replication.binlog_index + - creating the cluster_replication.apply_status table +*/ +static void run_query(THD *thd, char *buf, char *end, + my_bool print_error, my_bool disable_binlog) +{ + ulong save_query_length= thd->query_length; + char *save_query= thd->query; + ulong save_thread_id= thd->variables.pseudo_thread_id; + ulonglong save_thd_options= thd->options; + DBUG_ASSERT(sizeof(save_thd_options) == sizeof(thd->options)); + NET save_net= thd->net; + + bzero((char*) &thd->net, sizeof(NET)); + thd->query_length= end - buf; + thd->query= buf; + thd->variables.pseudo_thread_id= thread_id; + if (disable_binlog) + thd->options&= ~OPTION_BIN_LOG; + + DBUG_PRINT("query", ("%s", thd->query)); + mysql_parse(thd, thd->query, thd->query_length); + + if (print_error && thd->query_error) + { + sql_print_error("NDB: %s: error %s %d %d %d", + buf, thd->net.last_error, thd->net.last_errno, + thd->net.report_error, thd->query_error); + } + + thd->options= save_thd_options; + thd->query_length= save_query_length; + thd->query= save_query; + thd->variables.pseudo_thread_id= save_thread_id; + thd->net= save_net; + + if (thd == injector_thd) + { + /* + running the query will close all tables, including the binlog_index + used in injector_thd + */ + binlog_index= 0; + } +} + +/* + Initialize the binlog part of the NDB_SHARE +*/ +void ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *_table) +{ + THD *thd= current_thd; + MEM_ROOT *mem_root= &share->mem_root; + + share->op= 0; + share->table= 0; + if (ndb_binlog_thread_running <= 0) + { + DBUG_ASSERT(_table != 0); + if (_table->s->primary_key == MAX_KEY) + share->flags|= NSF_HIDDEN_PK; + return; + } + while (1) + { + TABLE_SHARE *table_share= + (TABLE_SHARE *) my_malloc(sizeof(*table_share), MYF(MY_WME)); + TABLE *table= (TABLE*) my_malloc(sizeof(*table), MYF(MY_WME)); + int error; + + init_tmp_table_share(table_share, share->db, 0, share->table_name, + share->key); + if ((error= open_table_def(thd, table_share, 0))) + { + sql_print_error("Unable to get table share for %s, error=%d", + share->key, error); + DBUG_PRINT("error", ("open_table_def failed %d", error)); + my_free((gptr) table_share, MYF(0)); + table_share= 0; + my_free((gptr) table, MYF(0)); + table= 0; + break; + } + if ((error= open_table_from_share(thd, table_share, "", 0, + (uint) READ_ALL, 0, table))) + { + sql_print_error("Unable to open table for %s, error=%d(%d)", + share->key, error, my_errno); + DBUG_PRINT("error", ("open_table_from_share failed %d", error)); + my_free((gptr) table_share, MYF(0)); + table_share= 0; + my_free((gptr) table, MYF(0)); + table= 0; + break; + } + assign_new_table_id(table); + if (!table->record[1] || table->record[1] == table->record[0]) + { + table->record[1]= alloc_root(&table->mem_root, + table->s->rec_buff_length); + } + table->in_use= injector_thd; + + table->s->db.str= share->db; + table->s->db.length= strlen(share->db); + table->s->table_name.str= share->table_name; + table->s->table_name.length= strlen(share->table_name); + + share->table_share= table_share; + share->table= table; +#ifndef DBUG_OFF + dbug_print_table("table", table); +#endif + /* + ! do not touch the contents of the table + it may be in use by the injector thread + */ + share->ndb_value[0]= (NdbValue*) + alloc_root(mem_root, sizeof(NdbValue) * table->s->fields + + 1 /*extra for hidden key*/); + share->ndb_value[1]= (NdbValue*) + alloc_root(mem_root, sizeof(NdbValue) * table->s->fields + +1 /*extra for hidden key*/); + { + int i, no_nodes= g_ndb_cluster_connection->no_db_nodes(); + share->subscriber_bitmap= (MY_BITMAP*) + alloc_root(mem_root, no_nodes * sizeof(MY_BITMAP)); + for (i= 0; i < no_nodes; i++) + { + bitmap_init(&share->subscriber_bitmap[i], + (Uint32*)alloc_root(mem_root, max_ndb_nodes/8), + max_ndb_nodes, false); + bitmap_clear_all(&share->subscriber_bitmap[i]); + } + bitmap_init(&share->slock_bitmap, share->slock, + sizeof(share->slock)*8, false); + bitmap_clear_all(&share->slock_bitmap); + } + if (table->s->primary_key == MAX_KEY) + share->flags|= NSF_HIDDEN_PK; + break; + } +} + +/***************************************************************** + functions called from master sql client threads +****************************************************************/ + +/* + called in mysql_show_binlog_events and reset_logs to make sure we wait for + all events originating from this mysql server to arrive in the binlog + + Wait for the last epoch in which the last transaction is a part of. + + Wait a maximum of 30 seconds. +*/ +static void ndbcluster_binlog_wait(THD *thd) +{ + if (ndb_binlog_thread_running > 0) + { + DBUG_ENTER("ndbcluster_binlog_wait"); + const char *save_info= thd ? thd->proc_info : 0; + ulonglong wait_epoch= g_latest_trans_gci; + int count= 30; + if (thd) + thd->proc_info= "Waiting for ndbcluster binlog update to " + "reach current position"; + while (count && ndb_binlog_thread_running > 0 && + ndb_latest_handled_binlog_epoch < wait_epoch) + { + count--; + sleep(1); + } + if (thd) + thd->proc_info= save_info; + DBUG_VOID_RETURN; + } +} + +/* + Called from MYSQL_LOG::reset_logs in log.cc when binlog is emptied +*/ +static int ndbcluster_reset_logs(THD *thd) +{ + if (ndb_binlog_thread_running <= 0) + return 0; + + DBUG_ENTER("ndbcluster_reset_logs"); + + /* + Wait for all events orifinating from this mysql server has + reached the binlog before continuing to reset + */ + ndbcluster_binlog_wait(thd); + + char buf[1024]; + char *end= strmov(buf, "DELETE FROM " NDB_REP_DB "." NDB_REP_TABLE); + + run_query(thd, buf, end, FALSE, TRUE); + + DBUG_RETURN(0); +} + +/* + Called from MYSQL_LOG::purge_logs in log.cc when the binlog "file" + is removed +*/ + +static int +ndbcluster_binlog_index_purge_file(THD *thd, const char *file) +{ + if (ndb_binlog_thread_running <= 0) + return 0; + + DBUG_ENTER("ndbcluster_binlog_index_purge_file"); + DBUG_PRINT("enter", ("file: %s", file)); + + char buf[1024]; + char *end= strmov(strmov(strmov(buf, + "DELETE FROM " + NDB_REP_DB "." NDB_REP_TABLE + " WHERE File='"), file), "'"); + + run_query(thd, buf, end, FALSE, TRUE); + + DBUG_RETURN(0); +} + +static void +ndbcluster_binlog_log_query(THD *thd, enum_binlog_command binlog_command, + const char *query, uint query_length, + const char *db, const char *table_name) +{ + DBUG_ENTER("ndbcluster_binlog_log_query"); + DBUG_PRINT("enter", ("db: %s table_name: %s query: %s", + db, table_name, query)); + DBUG_VOID_RETURN; +} + +/* + End use of the NDB Cluster table handler + - free all global variables allocated by + ndbcluster_init() +*/ + +static int ndbcluster_binlog_end(THD *thd) +{ + DBUG_ENTER("ndb_binlog_end"); + + if (!ndbcluster_util_inited) + DBUG_RETURN(0); + + // Kill ndb utility thread + (void) pthread_mutex_lock(&LOCK_ndb_util_thread); + DBUG_PRINT("exit",("killing ndb util thread: %lx", ndb_util_thread)); + (void) pthread_cond_signal(&COND_ndb_util_thread); + (void) pthread_mutex_unlock(&LOCK_ndb_util_thread); + +#ifdef HAVE_NDB_BINLOG + /* wait for injector thread to finish */ + if (ndb_binlog_thread_running > 0) + { + pthread_mutex_lock(&injector_mutex); + while (ndb_binlog_thread_running > 0) + { + struct timespec abstime; + set_timespec(abstime, 1); + pthread_cond_timedwait(&injector_cond, &injector_mutex, &abstime); + } + pthread_mutex_unlock(&injector_mutex); + } + + /* remove all shares */ + { + pthread_mutex_lock(&ndbcluster_mutex); + for (uint i= 0; i < ndbcluster_open_tables.records; i++) + { + NDB_SHARE *share= + (NDB_SHARE*) hash_element(&ndbcluster_open_tables, i); + if (share->table) + DBUG_PRINT("share", + ("table->s->db.table_name: %s.%s", + share->table->s->db.str, share->table->s->table_name.str)); + if (share->state != NSS_DROPPED && !--share->use_count) + real_free_share(&share); + else + { + DBUG_PRINT("share", + ("[%d] 0x%lx key: %s key_length: %d", + i, share, share->key, share->key_length)); + DBUG_PRINT("share", + ("db.tablename: %s.%s use_count: %d commit_count: %d", + share->db, share->table_name, + share->use_count, share->commit_count)); + } + } + pthread_mutex_unlock(&ndbcluster_mutex); + } +#endif + ndbcluster_util_inited= 0; + DBUG_RETURN(0); +} + +/***************************************************************** + functions called from slave sql client threads +****************************************************************/ +static void ndbcluster_reset_slave(THD *thd) +{ + if (ndb_binlog_thread_running <= 0) + return; + + DBUG_ENTER("ndbcluster_reset_slave"); + char buf[1024]; + char *end= strmov(buf, "DELETE FROM " NDB_REP_DB "." NDB_APPLY_TABLE); + run_query(thd, buf, end, FALSE, TRUE); + DBUG_VOID_RETURN; +} + +/* + Initialize the binlog part of the ndb handlerton +*/ +static int ndbcluster_binlog_func(THD *thd, enum_binlog_func fn, void *arg) +{ + switch(fn) + { + case BFN_RESET_LOGS: + ndbcluster_reset_logs(thd); + break; + case BFN_RESET_SLAVE: + ndbcluster_reset_slave(thd); + break; + case BFN_BINLOG_WAIT: + ndbcluster_binlog_wait(thd); + break; + case BFN_BINLOG_END: + ndbcluster_binlog_end(thd); + break; + case BFN_BINLOG_PURGE_FILE: + ndbcluster_binlog_index_purge_file(thd, (const char *)arg); + break; + } + return 0; +} + +void ndbcluster_binlog_init_handlerton() +{ + handlerton &h= ndbcluster_hton; + h.binlog_func= ndbcluster_binlog_func; + h.binlog_log_query= ndbcluster_binlog_log_query; +} + + + + + +/* + check the availability af the cluster_replication.apply_status share + - return share, but do not increase refcount + - return 0 if there is no share +*/ +static NDB_SHARE *ndbcluster_check_apply_status_share() +{ + pthread_mutex_lock(&ndbcluster_mutex); + + void *share= hash_search(&ndbcluster_open_tables, + NDB_APPLY_TABLE_FILE, + sizeof(NDB_APPLY_TABLE_FILE) - 1); + DBUG_PRINT("info",("ndbcluster_check_apply_status_share %s %p", + NDB_APPLY_TABLE_FILE, share)); + pthread_mutex_unlock(&ndbcluster_mutex); + return (NDB_SHARE*) share; +} + +/* + check the availability af the cluster_replication.schema share + - return share, but do not increase refcount + - return 0 if there is no share +*/ +static NDB_SHARE *ndbcluster_check_schema_share() +{ + pthread_mutex_lock(&ndbcluster_mutex); + + void *share= hash_search(&ndbcluster_open_tables, + NDB_SCHEMA_TABLE_FILE, + sizeof(NDB_SCHEMA_TABLE_FILE) - 1); + DBUG_PRINT("info",("ndbcluster_check_schema_share %s %p", + NDB_SCHEMA_TABLE_FILE, share)); + pthread_mutex_unlock(&ndbcluster_mutex); + return (NDB_SHARE*) share; +} + +/* + Create the cluster_replication.apply_status table +*/ +static int ndbcluster_create_apply_status_table(THD *thd) +{ + DBUG_ENTER("ndbcluster_create_apply_status_table"); + + /* + Check if we already have the apply status table. + If so it should have been discovered at startup + and thus have a share + */ + + if (ndbcluster_check_apply_status_share()) + DBUG_RETURN(0); + + if (g_ndb_cluster_connection->get_no_ready() <= 0) + DBUG_RETURN(0); + + char buf[1024], *end; + + if (ndb_extra_logging) + sql_print_information("NDB: Creating " NDB_REP_DB "." NDB_APPLY_TABLE); + + /* + Check if apply status table exists in MySQL "dictionary" + if so, remove it since there is none in Ndb + */ + { + strxnmov(buf, sizeof(buf), + mysql_data_home, + "/" NDB_REP_DB "/" NDB_APPLY_TABLE, + reg_ext, NullS); + unpack_filename(buf,buf); + my_delete(buf, MYF(0)); + } + + /* + Note, updating this table schema must be reflected in ndb_restore + */ + end= strmov(buf, "CREATE TABLE IF NOT EXISTS " + NDB_REP_DB "." NDB_APPLY_TABLE + " ( server_id INT UNSIGNED NOT NULL," + " epoch BIGINT UNSIGNED NOT NULL, " + " PRIMARY KEY USING HASH (server_id) ) ENGINE=NDB"); + + run_query(thd, buf, end, TRUE, TRUE); + + DBUG_RETURN(0); +} + + +/* + Create the cluster_replication.schema table +*/ +static int ndbcluster_create_schema_table(THD *thd) +{ + DBUG_ENTER("ndbcluster_create_schema_table"); + + /* + Check if we already have the schema table. + If so it should have been discovered at startup + and thus have a share + */ + + if (ndbcluster_check_schema_share()) + DBUG_RETURN(0); + + if (g_ndb_cluster_connection->get_no_ready() <= 0) + DBUG_RETURN(0); + + char buf[1024], *end; + + if (ndb_extra_logging) + sql_print_information("NDB: Creating " NDB_REP_DB "." NDB_SCHEMA_TABLE); + + /* + Check if schema table exists in MySQL "dictionary" + if so, remove it since there is none in Ndb + */ + { + strxnmov(buf, sizeof(buf), + mysql_data_home, + "/" NDB_REP_DB "/" NDB_SCHEMA_TABLE, + reg_ext, NullS); + unpack_filename(buf,buf); + my_delete(buf, MYF(0)); + } + + /* + Update the defines below to reflect the table schema + */ + end= strmov(buf, "CREATE TABLE IF NOT EXISTS " + NDB_REP_DB "." NDB_SCHEMA_TABLE + " ( db VARCHAR(63) NOT NULL," + " name VARCHAR(63) NOT NULL," + " slock BINARY(32) NOT NULL," + " query VARCHAR(4094) NOT NULL," + " node_id INT UNSIGNED NOT NULL," + " epoch BIGINT UNSIGNED NOT NULL," + " id INT UNSIGNED NOT NULL," + " version INT UNSIGNED NOT NULL," + " type INT UNSIGNED NOT NULL," + " PRIMARY KEY USING HASH (db,name) ) ENGINE=NDB"); + + run_query(thd, buf, end, TRUE, TRUE); + + DBUG_RETURN(0); +} + +void ndbcluster_setup_binlog_table_shares(THD *thd) +{ + int done_find_all_files= 0; + if (!apply_status_share && + ndbcluster_check_apply_status_share() == 0) + { + if (!done_find_all_files) + { + ndbcluster_find_all_files(thd); + done_find_all_files= 1; + } + ndbcluster_create_apply_status_table(thd); + } + if (!schema_share && + ndbcluster_check_schema_share() == 0) + { + if (!done_find_all_files) + { + ndbcluster_find_all_files(thd); + done_find_all_files= 1; + } + ndbcluster_create_schema_table(thd); + } +} + +/* + Defines and struct for schema table. + Should reflect table definition above. +*/ +#define SCHEMA_DB_I 0u +#define SCHEMA_NAME_I 1u +#define SCHEMA_SLOCK_I 2u +#define SCHEMA_QUERY_I 3u +#define SCHEMA_NODE_ID_I 4u +#define SCHEMA_EPOCH_I 5u +#define SCHEMA_ID_I 6u +#define SCHEMA_VERSION_I 7u +#define SCHEMA_TYPE_I 8u +#define SCHEMA_SIZE 9u +#define SCHEMA_SLOCK_SIZE 32u +#define SCHEMA_QUERY_SIZE 4096u + +struct Cluster_replication_schema +{ + unsigned char db_length; + char db[64]; + unsigned char name_length; + char name[64]; + unsigned char slock_length; + uint32 slock[SCHEMA_SLOCK_SIZE/4]; + unsigned short query_length; + char query[SCHEMA_QUERY_SIZE]; + Uint64 epoch; + uint32 node_id; + uint32 id; + uint32 version; + uint32 type; +}; + +/* + Transfer schema table data into corresponding struct +*/ +static void ndbcluster_get_schema(TABLE *table, + Cluster_replication_schema *s) +{ + Field **field; + /* db varchar 1 length byte */ + field= table->field; + s->db_length= *(uint8*)(*field)->ptr; + DBUG_ASSERT(s->db_length <= (*field)->field_length); + DBUG_ASSERT((*field)->field_length + 1 == sizeof(s->db)); + memcpy(s->db, (*field)->ptr + 1, s->db_length); + s->db[s->db_length]= 0; + /* name varchar 1 length byte */ + field++; + s->name_length= *(uint8*)(*field)->ptr; + DBUG_ASSERT(s->name_length <= (*field)->field_length); + DBUG_ASSERT((*field)->field_length + 1 == sizeof(s->name)); + memcpy(s->name, (*field)->ptr + 1, s->name_length); + s->name[s->name_length]= 0; + /* slock fixed length */ + field++; + s->slock_length= (*field)->field_length; + DBUG_ASSERT((*field)->field_length == sizeof(s->slock)); + memcpy(s->slock, (*field)->ptr, s->slock_length); + /* query varchar 2 length bytes */ + field++; + s->query_length= uint2korr((*field)->ptr); + DBUG_ASSERT(s->query_length <= (*field)->field_length); + DBUG_ASSERT((*field)->field_length + 2 == sizeof(s->query)); + memcpy(s->query, (*field)->ptr + 2, s->query_length); + s->query[s->query_length]= 0; + /* node_id */ + field++; + s->node_id= ((Field_long *)*field)->val_int(); + /* epoch */ + field++; + s->epoch= ((Field_long *)*field)->val_int(); + /* id */ + field++; + s->id= ((Field_long *)*field)->val_int(); + /* version */ + field++; + s->version= ((Field_long *)*field)->val_int(); + /* type */ + field++; + s->type= ((Field_long *)*field)->val_int(); +} + +/* + helper function to pack a ndb varchar +*/ +static char *ndb_pack_varchar(const NDBCOL *col, char *buf, + const char *str, int sz) +{ + switch (col->getArrayType()) + { + case NDBCOL::ArrayTypeFixed: + memcpy(buf, str, sz); + break; + case NDBCOL::ArrayTypeShortVar: + *(unsigned char*)buf= (unsigned char)sz; + memcpy(buf + 1, str, sz); + break; + case NDBCOL::ArrayTypeMediumVar: + int2store(buf, sz); + memcpy(buf + 2, str, sz); + break; + } + return buf; +} + +/* + log query in schema table +*/ +int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, + const char *query, int query_length, + const char *db, const char *table_name, + uint32 ndb_table_id, + uint32 ndb_table_version, + enum SCHEMA_OP_TYPE type) +{ + DBUG_ENTER("ndbcluster_log_schema_op"); +#ifdef NOT_YET + Thd_ndb *thd_ndb= get_thd_ndb(thd); + if (!thd_ndb) + { + if (!(thd_ndb= ha_ndbcluster::seize_thd_ndb())) + { + sql_print_error("Could not allocate Thd_ndb object"); + DBUG_RETURN(1); + } + set_thd_ndb(thd, thd_ndb); + } + + DBUG_PRINT("enter", + ("query: %s db: %s table_name: %s thd_ndb->options: %d", + query, db, table_name, thd_ndb->options)); + if (!schema_share || thd_ndb->options & TNO_NO_LOG_SCHEMA_OP) + { + DBUG_RETURN(0); + } + + char tmp_buf2[FN_REFLEN]; + switch (type) + { + case SOT_DROP_TABLE: + /* drop database command, do not log at drop table */ + if (thd->lex->sql_command == SQLCOM_DROP_DB) + DBUG_RETURN(0); + /* redo the drop table query as is may contain several tables */ + query= tmp_buf2; + query_length= (uint) (strxmov(tmp_buf2, "drop table `", + table_name, "`", NullS) - tmp_buf2); + break; + case SOT_CREATE_TABLE: + break; + case SOT_RENAME_TABLE: + break; + case SOT_ALTER_TABLE: + break; + case SOT_DROP_DB: + break; + case SOT_CREATE_DB: + break; + case SOT_ALTER_DB: + break; + default: + abort(); /* should not happen, programming error */ + } + + const NdbError *ndb_error= 0; + uint32 node_id= g_ndb_cluster_connection->node_id(); + Uint64 epoch= 0; + MY_BITMAP schema_subscribers; + uint32 bitbuf[sizeof(schema_share->slock)/4]; + { + int i; + bitmap_init(&schema_subscribers, bitbuf, sizeof(bitbuf)*8, false); + bitmap_set_all(&schema_subscribers); + (void) pthread_mutex_lock(&schema_share->mutex); + for (i= 0; i < ndb_number_of_storage_nodes; i++) + { + MY_BITMAP *table_subscribers= &schema_share->subscriber_bitmap[i]; + if (!bitmap_is_clear_all(table_subscribers)) + bitmap_intersect(&schema_subscribers, + table_subscribers); + } + (void) pthread_mutex_unlock(&schema_share->mutex); + bitmap_clear_bit(&schema_subscribers, node_id); + + if (share) + { + (void) pthread_mutex_lock(&share->mutex); + memcpy(share->slock, schema_subscribers.bitmap, sizeof(share->slock)); + (void) pthread_mutex_unlock(&share->mutex); + } + + DBUG_DUMP("schema_subscribers", (char*)schema_subscribers.bitmap, + no_bytes_in_map(&schema_subscribers)); + DBUG_PRINT("info", ("bitmap_is_clear_all(&schema_subscribers): %d", + bitmap_is_clear_all(&schema_subscribers))); + } + + Ndb *ndb= thd_ndb->ndb; + char old_db[128]; + strcpy(old_db, ndb->getDatabaseName()); + + char tmp_buf[SCHEMA_QUERY_SIZE]; + NDBDICT *dict= ndb->getDictionary(); + ndb->setDatabaseName(NDB_REP_DB); + const NDBTAB *ndbtab= dict->getTable(NDB_SCHEMA_TABLE); + NdbTransaction *trans= 0; + int retries= 100; + const NDBCOL *col[SCHEMA_SIZE]; + unsigned sz[SCHEMA_SIZE]; + + if (ndbtab == 0) + { + if (strcmp(NDB_REP_DB, db) != 0 || + strcmp(NDB_SCHEMA_TABLE, table_name)) + { + ndb_error= &dict->getNdbError(); + goto end; + } + DBUG_RETURN(0); + } + + { + uint i; + for (i= 0; i < SCHEMA_SIZE; i++) + { + col[i]= ndbtab->getColumn(i); + sz[i]= col[i]->getLength(); + DBUG_ASSERT(sz[i] <= sizeof(tmp_buf)); + } + } + + while (1) + { + if ((trans= ndb->startTransaction()) == 0) + goto err; + { + NdbOperation *op= 0; + int r= 0; + r|= (op= trans->getNdbOperation(ndbtab)) == 0; + DBUG_ASSERT(r == 0); + r|= op->writeTuple(); + DBUG_ASSERT(r == 0); + + /* db */ + ndb_pack_varchar(col[SCHEMA_DB_I], tmp_buf, db, strlen(db)); + r|= op->equal(SCHEMA_DB_I, tmp_buf); + DBUG_ASSERT(r == 0); + /* name */ + ndb_pack_varchar(col[SCHEMA_NAME_I], tmp_buf, table_name, + strlen(table_name)); + r|= op->equal(SCHEMA_NAME_I, tmp_buf); + DBUG_ASSERT(r == 0); + /* slock */ + DBUG_ASSERT(sz[SCHEMA_SLOCK_I] == sizeof(bitbuf)); + r|= op->setValue(SCHEMA_SLOCK_I, (char*)schema_subscribers.bitmap); + DBUG_ASSERT(r == 0); + /* query */ + ndb_pack_varchar(col[SCHEMA_QUERY_I], tmp_buf, query, query_length); + r|= op->setValue(SCHEMA_QUERY_I, tmp_buf); + DBUG_ASSERT(r == 0); + /* node_id */ + r|= op->setValue(SCHEMA_NODE_ID_I, node_id); + DBUG_ASSERT(r == 0); + /* epoch */ + r|= op->setValue(SCHEMA_EPOCH_I, epoch); + DBUG_ASSERT(r == 0); + /* id */ + r|= op->setValue(SCHEMA_ID_I, ndb_table_id); + DBUG_ASSERT(r == 0); + /* version */ + r|= op->setValue(SCHEMA_VERSION_I, ndb_table_version); + DBUG_ASSERT(r == 0); + /* type */ + r|= op->setValue(SCHEMA_TYPE_I, (uint32)type); + DBUG_ASSERT(r == 0); + } + if (trans->execute(NdbTransaction::Commit) == 0) + { + dict->forceGCPWait(); + DBUG_PRINT("info", ("logged: %s", query)); + break; + } +err: + if (trans->getNdbError().status == NdbError::TemporaryError) + { + if (retries--) + { + ndb->closeTransaction(trans); + continue; // retry + } + } + ndb_error= &trans->getNdbError(); + break; + } +end: + if (ndb_error) + push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR, + ER_GET_ERRMSG, ER(ER_GET_ERRMSG), + ndb_error->code, + ndb_error->message, + "Could not log query '%s' on other mysqld's"); + + if (trans) + ndb->closeTransaction(trans); + ndb->setDatabaseName(old_db); + + /* + Wait for other mysqld's to acknowledge the table operation + */ + if (ndb_error == 0 && + (type == SOT_CREATE_TABLE || + type == SOT_RENAME_TABLE || + type == SOT_ALTER_TABLE) && + !bitmap_is_clear_all(&schema_subscribers)) + { + int max_timeout= 10; + (void) pthread_mutex_lock(&share->mutex); + while (1) + { + struct timespec abstime; + int i; + set_timespec(abstime, 1); + (void) pthread_cond_timedwait(&injector_cond, + &share->mutex, + &abstime); + + (void) pthread_mutex_lock(&schema_share->mutex); + for (i= 0; i < ndb_number_of_storage_nodes; i++) + { + /* remove any unsubscribed from schema_subscribers */ + MY_BITMAP *tmp= &schema_share->subscriber_bitmap[i]; + if (!bitmap_is_clear_all(tmp)) + bitmap_intersect(&schema_subscribers, tmp); + } + (void) pthread_mutex_unlock(&schema_share->mutex); + + /* remove any unsubscribed from share->slock */ + bitmap_intersect(&share->slock_bitmap, &schema_subscribers); + + DBUG_DUMP("share->slock_bitmap.bitmap", (char*)share->slock_bitmap.bitmap, + no_bytes_in_map(&share->slock_bitmap)); + + if (bitmap_is_clear_all(&share->slock_bitmap)) + break; + + max_timeout--; + if (max_timeout == 0) + { + sql_print_error("NDB create table: timed out. Ignoring..."); + break; + } + sql_print_information("NDB create table: " + "waiting max %u sec for create table %s.", + max_timeout, share->key); + } + (void) pthread_mutex_unlock(&share->mutex); + } +#endif + DBUG_RETURN(0); +} + +/* + acknowledge handling of schema operation +*/ +static int +ndbcluster_update_slock(THD *thd, + const char *db, + const char *table_name) +{ + DBUG_ENTER("ndbcluster_update_slock"); + if (!schema_share) + { + DBUG_RETURN(0); + } + + const NdbError *ndb_error= 0; + uint32 node_id= g_ndb_cluster_connection->node_id(); + Ndb *ndb= check_ndb_in_thd(thd); + char old_db[128]; + strcpy(old_db, ndb->getDatabaseName()); + + char tmp_buf[SCHEMA_QUERY_SIZE]; + NDBDICT *dict= ndb->getDictionary(); + ndb->setDatabaseName(NDB_REP_DB); + const NDBTAB *ndbtab= dict->getTable(NDB_SCHEMA_TABLE); + NdbTransaction *trans= 0; + int retries= 100; + const NDBCOL *col[SCHEMA_SIZE]; + unsigned sz[SCHEMA_SIZE]; + + MY_BITMAP slock; + uint32 bitbuf[SCHEMA_SLOCK_SIZE/4]; + bitmap_init(&slock, bitbuf, sizeof(bitbuf)*8, false); + + if (ndbtab == 0) + { + abort(); + DBUG_RETURN(0); + } + + { + uint i; + for (i= 0; i < SCHEMA_SIZE; i++) + { + col[i]= ndbtab->getColumn(i); + sz[i]= col[i]->getLength(); + DBUG_ASSERT(sz[i] <= sizeof(tmp_buf)); + } + } + + while (1) + { + if ((trans= ndb->startTransaction()) == 0) + goto err; + { + NdbOperation *op= 0; + int r= 0; + + /* read the bitmap exlusive */ + r|= (op= trans->getNdbOperation(ndbtab)) == 0; + DBUG_ASSERT(r == 0); + r|= op->readTupleExclusive(); + DBUG_ASSERT(r == 0); + + /* db */ + ndb_pack_varchar(col[SCHEMA_DB_I], tmp_buf, db, strlen(db)); + r|= op->equal(SCHEMA_DB_I, tmp_buf); + DBUG_ASSERT(r == 0); + /* name */ + ndb_pack_varchar(col[SCHEMA_NAME_I], tmp_buf, table_name, + strlen(table_name)); + r|= op->equal(SCHEMA_NAME_I, tmp_buf); + DBUG_ASSERT(r == 0); + /* slock */ + r|= op->getValue(SCHEMA_SLOCK_I, (char*)slock.bitmap) == 0; + DBUG_ASSERT(r == 0); + } + if (trans->execute(NdbTransaction::NoCommit)) + goto err; + bitmap_clear_bit(&slock, node_id); + { + NdbOperation *op= 0; + int r= 0; + + /* now update the tuple */ + r|= (op= trans->getNdbOperation(ndbtab)) == 0; + DBUG_ASSERT(r == 0); + r|= op->updateTuple(); + DBUG_ASSERT(r == 0); + + /* db */ + ndb_pack_varchar(col[SCHEMA_DB_I], tmp_buf, db, strlen(db)); + r|= op->equal(SCHEMA_DB_I, tmp_buf); + DBUG_ASSERT(r == 0); + /* name */ + ndb_pack_varchar(col[SCHEMA_NAME_I], tmp_buf, table_name, + strlen(table_name)); + r|= op->equal(SCHEMA_NAME_I, tmp_buf); + DBUG_ASSERT(r == 0); + /* slock */ + r|= op->setValue(SCHEMA_SLOCK_I, (char*)slock.bitmap); + DBUG_ASSERT(r == 0); + /* node_id */ + r|= op->setValue(SCHEMA_NODE_ID_I, node_id); + DBUG_ASSERT(r == 0); + /* type */ + r|= op->setValue(SCHEMA_TYPE_I, (uint32)SOT_CLEAR_SLOCK); + DBUG_ASSERT(r == 0); + } + if (trans->execute(NdbTransaction::Commit) == 0) + { + dict->forceGCPWait(); + DBUG_PRINT("info", ("node %d cleared lock on '%s.%s'", + node_id, db, table_name)); + break; + } + err: + if (trans->getNdbError().status == NdbError::TemporaryError) + { + if (retries--) + { + ndb->closeTransaction(trans); + continue; // retry + } + } + ndb_error= &trans->getNdbError(); + break; + } +end: + if (ndb_error) + push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR, + ER_GET_ERRMSG, ER(ER_GET_ERRMSG), + ndb_error->code, + ndb_error->message, + "Could not release lock on '%s.%s'", + db, table_name); + if (trans) + ndb->closeTransaction(trans); + ndb->setDatabaseName(old_db); + DBUG_RETURN(0); +} + +/* + Handle _non_ data events from the storage nodes +*/ +static int +ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp, + NDB_SHARE *share) +{ + int remote_drop_table= 0, do_close_cached_tables= 0; + + if (pOp->getEventType() != NDBEVENT::TE_CLUSTER_FAILURE && + pOp->getReqNodeId() != g_ndb_cluster_connection->node_id()) + { + ndb->setDatabaseName(share->table->s->db.str); + ha_ndbcluster::invalidate_dictionary_cache(share->table, + ndb, + share->table->s->table_name.str, + TRUE); + remote_drop_table= 1; + } + + (void) pthread_mutex_lock(&share->mutex); + DBUG_ASSERT(share->op == pOp || share->op_old == pOp); + if (share->op_old == pOp) + share->op_old= 0; + else + share->op= 0; + // either just us or drop table handling as well + + /* Signal ha_ndbcluster::delete/rename_table that drop is done */ + (void) pthread_mutex_unlock(&share->mutex); + (void) pthread_cond_signal(&injector_cond); + + pthread_mutex_lock(&ndbcluster_mutex); + free_share(&share, TRUE); + if (remote_drop_table && share && share->state != NSS_DROPPED) + { + DBUG_PRINT("info", ("remote drop table")); + if (share->use_count != 1) + do_close_cached_tables= 1; + share->state= NSS_DROPPED; + free_share(&share, TRUE); + } + pthread_mutex_unlock(&ndbcluster_mutex); + + share= 0; + pOp->setCustomData(0); + + pthread_mutex_lock(&injector_mutex); + injector_ndb->dropEventOperation(pOp); + pOp= 0; + pthread_mutex_unlock(&injector_mutex); + + if (do_close_cached_tables) + close_cached_tables((THD*) 0, 0, (TABLE_LIST*) 0); + return 0; +} + +static int +ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, + NdbEventOperation *pOp, + List<Cluster_replication_schema> + *schema_list, MEM_ROOT *mem_root) +{ + DBUG_ENTER("ndb_binlog_thread_handle_schema_event"); + NDB_SHARE *share= (NDB_SHARE *)pOp->getCustomData(); + if (share && schema_share == share) + { + NDBEVENT::TableEvent ev_type= pOp->getEventType(); + DBUG_PRINT("enter", ("%s.%s ev_type: %d", + share->db, share->table_name, ev_type)); + switch (ev_type) + { + case NDBEVENT::TE_UPDATE: + case NDBEVENT::TE_INSERT: + { + Cluster_replication_schema *schema= (Cluster_replication_schema *) + sql_alloc(sizeof(Cluster_replication_schema)); + MY_BITMAP slock; + bitmap_init(&slock, schema->slock, 8*SCHEMA_SLOCK_SIZE, false); + uint node_id= g_ndb_cluster_connection->node_id(); + ndbcluster_get_schema(share->table, schema); + if (schema->node_id != node_id) + { + int log_query= 0; + DBUG_PRINT("info", ("log query_length: %d query: '%s'", + schema->query_length, schema->query)); + switch ((enum SCHEMA_OP_TYPE)schema->type) + { + case SOT_DROP_TABLE: + /* binlog dropping table after any table operations */ + schema_list->push_back(schema, mem_root); + log_query= 0; + break; + case SOT_CREATE_TABLE: + /* fall through */ + case SOT_RENAME_TABLE: + /* fall through */ + case SOT_ALTER_TABLE: + pthread_mutex_lock(&LOCK_open); + if (ha_create_table_from_engine(thd, schema->db, schema->name)) + { + sql_print_error("Could not discover table '%s.%s' from " + "binlog schema event '%s' from node %d", + schema->db, schema->name, schema->query, + schema->node_id); + } + pthread_mutex_unlock(&LOCK_open); + { + /* signal that schema operation has been handled */ + DBUG_DUMP("slock", (char*)schema->slock, schema->slock_length); + if (bitmap_is_set(&slock, node_id)) + ndbcluster_update_slock(thd, schema->db, schema->name); + } + log_query= 1; + break; + case SOT_DROP_DB: + run_query(thd, schema->query, + schema->query + schema->query_length, + TRUE, /* print error */ + TRUE); /* don't binlog the query */ + /* binlog dropping database after any table operations */ + schema_list->push_back(schema, mem_root); + log_query= 0; + break; + case SOT_CREATE_DB: + /* fall through */ + case SOT_ALTER_DB: + run_query(thd, schema->query, + schema->query + schema->query_length, + TRUE, /* print error */ + FALSE); /* binlog the query */ + log_query= 0; + break; + case SOT_CLEAR_SLOCK: + { + char key[FN_REFLEN]; + (void)strxnmov(key, FN_REFLEN, share_prefix, schema->db, + "/", schema->name, NullS); + NDB_SHARE *share= get_share(key, 0, false, false); + if (share) + { + pthread_mutex_lock(&share->mutex); + memcpy(share->slock, schema->slock, sizeof(share->slock)); + DBUG_DUMP("share->slock_bitmap.bitmap", + (char*)share->slock_bitmap.bitmap, + no_bytes_in_map(&share->slock_bitmap)); + pthread_mutex_unlock(&share->mutex); + pthread_cond_signal(&injector_cond); + free_share(&share); + } + DBUG_RETURN(0); + } + } + if (log_query) + { + char *thd_db_save= thd->db; + thd->db= schema->db; + thd->binlog_query(THD::STMT_QUERY_TYPE, schema->query, + schema->query_length, FALSE, + schema->name[0] == 0); + thd->db= thd_db_save; + } + } + } + break; + case NDBEVENT::TE_DELETE: + // skip + break; + case NDBEVENT::TE_ALTER: + /* do the rename of the table in the share */ + share->table->s->db.str= share->db; + share->table->s->db.length= strlen(share->db); + share->table->s->table_name.str= share->table_name; + share->table->s->table_name.length= strlen(share->table_name); + ndb_handle_schema_change(thd, ndb, pOp, share); + break; + case NDBEVENT::TE_CLUSTER_FAILURE: + case NDBEVENT::TE_DROP: + free_share(&schema_share); + schema_share= 0; + ndb_handle_schema_change(thd, ndb, pOp, share); + break; + case NDBEVENT::TE_NODE_FAILURE: + { + uint8 node_id= g_node_id_map[pOp->getNdbdNodeId()]; + DBUG_ASSERT(node_id != 0xFF); + (void) pthread_mutex_lock(&share->mutex); + bitmap_clear_all(&share->subscriber_bitmap[node_id]); + DBUG_PRINT("info",("NODE_FAILURE UNSUBSCRIBE[%d]", node_id)); + (void) pthread_mutex_unlock(&share->mutex); + (void) pthread_cond_signal(&injector_cond); + break; + } + case NDBEVENT::TE_SUBSCRIBE: + { + uint8 node_id= g_node_id_map[pOp->getNdbdNodeId()]; + uint8 req_id= pOp->getReqNodeId(); + DBUG_ASSERT(req_id != 0 && node_id != 0xFF); + (void) pthread_mutex_lock(&share->mutex); + bitmap_set_bit(&share->subscriber_bitmap[node_id], req_id); + DBUG_PRINT("info",("SUBSCRIBE[%d] %d", node_id, req_id)); + (void) pthread_mutex_unlock(&share->mutex); + (void) pthread_cond_signal(&injector_cond); + break; + } + case NDBEVENT::TE_UNSUBSCRIBE: + { + uint8 node_id= g_node_id_map[pOp->getNdbdNodeId()]; + uint8 req_id= pOp->getReqNodeId(); + DBUG_ASSERT(req_id != 0 && node_id != 0xFF); + (void) pthread_mutex_lock(&share->mutex); + bitmap_clear_bit(&share->subscriber_bitmap[node_id], req_id); + DBUG_PRINT("info",("UNSUBSCRIBE[%d] %d", node_id, req_id)); + (void) pthread_mutex_unlock(&share->mutex); + (void) pthread_cond_signal(&injector_cond); + break; + } + default: + sql_print_error("NDB Binlog: unknown non data event %d for %s. " + "Ignoring...", (unsigned) ev_type, share->key); + } + } + DBUG_RETURN(0); +} + +/* + Timer class for doing performance measurements +*/ + +/********************************************************************* + Internal helper functions for handeling of the cluster replication tables + - cluster_replication.binlog_index + - cluster_replication.apply_status +*********************************************************************/ + +/* + struct to hold the data to be inserted into the + cluster_replication.binlog_index table +*/ +struct Binlog_index_row { + ulonglong gci; + const char *master_log_file; + ulonglong master_log_pos; + ulonglong n_inserts; + ulonglong n_updates; + ulonglong n_deletes; + ulonglong n_schemaops; +}; + +/* + Open the cluster_replication.binlog_index table +*/ +static int open_binlog_index(THD *thd, TABLE_LIST *tables, + TABLE **binlog_index) +{ + static char repdb[]= NDB_REP_DB; + static char reptable[]= NDB_REP_TABLE; + const char *save_proc_info= thd->proc_info; + + bzero((char*) tables, sizeof(*tables)); + tables->db= repdb; + tables->alias= tables->table_name= reptable; + tables->lock_type= TL_WRITE; + thd->proc_info= "Opening " NDB_REP_DB "." NDB_REP_TABLE; + tables->required_type= FRMTYPE_TABLE; + uint counter; + thd->clear_error(); + if (open_tables(thd, &tables, &counter, MYSQL_LOCK_IGNORE_FLUSH)) + { + sql_print_error("NDB Binlog: Opening binlog_index: %d, '%s'", + thd->net.last_errno, + thd->net.last_error ? thd->net.last_error : ""); + thd->proc_info= save_proc_info; + return -1; + } + *binlog_index= tables->table; + thd->proc_info= save_proc_info; + return 0; +} + +/* + Insert one row in the cluster_replication.binlog_index + + declared friend in handler.h to be able to call write_row directly + so that this insert is not replicated +*/ +int ndb_add_binlog_index(THD *thd, void *_row) +{ + Binlog_index_row &row= *(Binlog_index_row *) _row; + int error= 0; + bool need_reopen; + for ( ; ; ) /* loop for need_reopen */ + { + if (!binlog_index && open_binlog_index(thd, &binlog_tables, &binlog_index)) + { + error= -1; + goto add_binlog_index_err; + } + + if (lock_tables(thd, &binlog_tables, 1, &need_reopen)) + { + if (need_reopen) + { + close_tables_for_reopen(thd, &binlog_tables); + binlog_index= 0; + continue; + } + sql_print_error("NDB Binlog: Unable to lock table binlog_index"); + error= -1; + goto add_binlog_index_err; + } + break; + } + + binlog_index->field[0]->store(row.master_log_pos); + binlog_index->field[1]->store(row.master_log_file, + strlen(row.master_log_file), + &my_charset_bin); + binlog_index->field[2]->store(row.gci); + binlog_index->field[3]->store(row.n_inserts); + binlog_index->field[4]->store(row.n_updates); + binlog_index->field[5]->store(row.n_deletes); + binlog_index->field[6]->store(row.n_schemaops); + + int r; + if ((r= binlog_index->file->write_row(binlog_index->record[0]))) + { + sql_print_error("NDB Binlog: Writing row to binlog_index: %d", r); + error= -1; + goto add_binlog_index_err; + } + + mysql_unlock_tables(thd, thd->lock); + thd->lock= 0; + return 0; +add_binlog_index_err: + close_thread_tables(thd); + binlog_index= 0; + return error; +} + +/********************************************************************* + Functions for start, stop, wait for ndbcluster binlog thread +*********************************************************************/ + +static int do_ndbcluster_binlog_close_connection= 0; + +int ndbcluster_binlog_start() +{ + DBUG_ENTER("ndbcluster_binlog_start"); + + pthread_mutex_init(&injector_mutex, MY_MUTEX_INIT_FAST); + pthread_cond_init(&injector_cond, NULL); + + /* Create injector thread */ + if (pthread_create(&ndb_binlog_thread, &connection_attrib, + ndb_binlog_thread_func, 0)) + { + DBUG_PRINT("error", ("Could not create ndb injector thread")); + pthread_cond_destroy(&injector_cond); + pthread_mutex_destroy(&injector_mutex); + DBUG_RETURN(-1); + } + + /* + Wait for the ndb injector thread to finish starting up. + */ + pthread_mutex_lock(&injector_mutex); + while (!ndb_binlog_thread_running) + pthread_cond_wait(&injector_cond, &injector_mutex); + pthread_mutex_unlock(&injector_mutex); + + if (ndb_binlog_thread_running < 0) + DBUG_RETURN(-1); + + DBUG_RETURN(0); +} + +static void ndbcluster_binlog_close_connection(THD *thd) +{ + DBUG_ENTER("ndbcluster_binlog_close_connection"); + const char *save_info= thd->proc_info; + thd->proc_info= "ndbcluster_binlog_close_connection"; + do_ndbcluster_binlog_close_connection= 1; + while (ndb_binlog_thread_running > 0) + sleep(1); + thd->proc_info= save_info; + DBUG_VOID_RETURN; +} + +/************************************************************** + Internal helper functions for creating/dropping ndb events + used by the client sql threads +**************************************************************/ +void +ndb_rep_event_name(String *event_name,const char *db, const char *tbl) +{ + event_name->set_ascii("REPL$", 5); + event_name->append(db); + if (tbl) + { + event_name->append('/'); + event_name->append(tbl); + } +} + +/* + Common function for setting up everything for logging a table at + create/discover. +*/ +int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key, + const char *db, + const char *table_name, + NDB_SHARE *share) +{ + DBUG_ENTER("ndbcluster_create_binlog_setup"); + + pthread_mutex_lock(&ndbcluster_mutex); + + /* Handle any trailing share */ + if (share == 0) + { + share= (NDB_SHARE*) hash_search(&ndbcluster_open_tables, + (byte*) key, strlen(key)); + if (share) + handle_trailing_share(share); + } + else + handle_trailing_share(share); + + /* Create share which is needed to hold replication information */ + if (!(share= get_share(key, 0, true, true))) + { + sql_print_error("NDB Binlog: " + "allocating table share for %s failed", key); + } + pthread_mutex_unlock(&ndbcluster_mutex); + + while (share && !IS_TMP_PREFIX(table_name)) + { + /* + ToDo make sanity check of share so that the table is actually the same + I.e. we need to do open file from frm in this case + Currently awaiting this to be fixed in the 4.1 tree in the general + case + */ + + /* Create the event in NDB */ + ndb->setDatabaseName(db); + + NDBDICT *dict= ndb->getDictionary(); + const NDBTAB *ndbtab= dict->getTable(table_name); + if (ndbtab == 0) + { + if (ndb_extra_logging) + sql_print_information("NDB Binlog: Failed to get table %s from ndb: " + "%s, %d", key, dict->getNdbError().message, + dict->getNdbError().code); + break; // error + } + String event_name(INJECTOR_EVENT_LEN); + ndb_rep_event_name(&event_name, db, table_name); + /* + event should have been created by someone else, + but let's make sure, and create if it doesn't exist + */ + if (!dict->getEvent(event_name.c_ptr())) + { + if (ndbcluster_create_event(ndb, ndbtab, event_name.c_ptr(), share)) + { + sql_print_error("NDB Binlog: " + "FAILED CREATE (DISCOVER) TABLE Event: %s", + event_name.c_ptr()); + break; // error + } + if (ndb_extra_logging) + sql_print_information("NDB Binlog: " + "CREATE (DISCOVER) TABLE Event: %s", + event_name.c_ptr()); + } + else + if (ndb_extra_logging) + sql_print_information("NDB Binlog: DISCOVER TABLE Event: %s", + event_name.c_ptr()); + + /* + create the event operations for receiving logging events + */ + if (ndbcluster_create_event_ops(share, ndbtab, + event_name.c_ptr()) < 0) + { + sql_print_error("NDB Binlog:" + "FAILED CREATE (DISCOVER) EVENT OPERATIONS Event: %s", + event_name.c_ptr()); + /* a warning has been issued to the client */ + DBUG_RETURN(0); + } + DBUG_RETURN(0); + } + DBUG_RETURN(-1); +} + +int +ndbcluster_create_event(Ndb *ndb, const NDBTAB *ndbtab, + const char *event_name, NDB_SHARE *share) +{ + DBUG_ENTER("ndbcluster_create_event"); + if (!share) + DBUG_RETURN(0); + NDBDICT *dict= ndb->getDictionary(); + NDBEVENT my_event(event_name); + my_event.setTable(*ndbtab); + my_event.addTableEvent(NDBEVENT::TE_ALL); + if (share->flags & NSF_HIDDEN_PK) + { + /* No primary key, susbscribe for all attributes */ + my_event.setReport(NDBEVENT::ER_ALL); + DBUG_PRINT("info", ("subscription all")); + } + else + { + if (schema_share || strcmp(share->db, NDB_REP_DB) || + strcmp(share->table_name, NDB_SCHEMA_TABLE)) + { + my_event.setReport(NDBEVENT::ER_UPDATED); + DBUG_PRINT("info", ("subscription only updated")); + } + else + { + my_event.setReport((NDBEVENT::EventReport) + (NDBEVENT::ER_ALL | NDBEVENT::ER_SUBSCRIBE)); + DBUG_PRINT("info", ("subscription all and subscribe")); + } + } + + /* add all columns to the event */ + int n_cols= ndbtab->getNoOfColumns(); + for(int a= 0; a < n_cols; a++) + my_event.addEventColumn(a); + + if (dict->createEvent(my_event)) // Add event to database + { +#ifdef NDB_BINLOG_EXTRA_WARNINGS + /* + failed, print a warning + */ + push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR, + ER_GET_ERRMSG, ER(ER_GET_ERRMSG), + dict->getNdbError().code, + dict->getNdbError().message, "NDB"); +#endif + if (dict->getNdbError().classification != NdbError::SchemaObjectExists) + { + sql_print_error("NDB Binlog: Unable to create event in database. " + "Event: %s Error Code: %d Message: %s", event_name, + dict->getNdbError().code, dict->getNdbError().message); + DBUG_RETURN(-1); + } + + /* + trailing event from before; an error, but try to correct it + */ + if (dict->dropEvent(my_event.getName())) + { + sql_print_error("NDB Binlog: Unable to create event in database. " + " Attempt to correct with drop failed. " + "Event: %s Error Code: %d Message: %s", + event_name, + dict->getNdbError().code, + dict->getNdbError().message); + DBUG_RETURN(-1); + } + + /* + try to add the event again + */ + if (dict->createEvent(my_event)) + { + sql_print_error("NDB Binlog: Unable to create event in database. " + " Attempt to correct with drop ok, but create failed. " + "Event: %s Error Code: %d Message: %s", + event_name, + dict->getNdbError().code, + dict->getNdbError().message); + DBUG_RETURN(-1); + } +#ifdef NDB_BINLOG_EXTRA_WARNINGS + push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR, + ER_GET_ERRMSG, ER(ER_GET_ERRMSG), + 0, "NDB Binlog: Removed trailing event", + "NDB"); +#endif + } + + DBUG_RETURN(0); +} + +inline int is_ndb_compatible_type(Field *field) +{ + return + !(field->flags & BLOB_FLAG) && + field->type() != MYSQL_TYPE_BIT && + field->pack_length() != 0; +} + +/* + - create eventOperations for receiving log events + - setup ndb recattrs for reception of log event data + - "start" the event operation + + used at create/discover of tables +*/ +int +ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab, + const char *event_name) +{ + /* + we are in either create table or rename table so table should be + locked, hence we can work with the share without locks + */ + + DBUG_ENTER("ndbcluster_create_event_ops"); + + DBUG_ASSERT(share != 0); + + if (share->op) + { + assert(share->op->getCustomData() == (void *) share); + + DBUG_ASSERT(share->use_count > 1); + sql_print_error("NDB Binlog: discover reusing old ev op"); + free_share(&share); // old event op already has reference + DBUG_RETURN(0); + } + + TABLE *table= share->table; + if (table) + { + /* + Logging of blob tables is not yet implemented, it would require: + 1. setup of events also on the blob attribute tables + 2. collect the pieces of the blob into one from an epoch to + provide a full blob to binlog + */ + if (table->s->blob_fields) + { + sql_print_error("NDB Binlog: logging of blob table %s " + "is not supported", share->key); + DBUG_RETURN(0); + } + } + + int do_schema_share= 0, do_apply_status_share= 0; + int retries= 100; + if (!schema_share && strcmp(share->db, NDB_REP_DB) == 0 && + strcmp(share->table_name, NDB_SCHEMA_TABLE) == 0) + do_schema_share= 1; + else if (!apply_status_share && strcmp(share->db, NDB_REP_DB) == 0 && + strcmp(share->table_name, NDB_APPLY_TABLE) == 0) + do_apply_status_share= 1; + + while (1) + { + pthread_mutex_lock(&injector_mutex); + Ndb *ndb= injector_ndb; + if (do_schema_share) + ndb= schema_ndb; + + if (ndb == 0) + { + pthread_mutex_unlock(&injector_mutex); + DBUG_RETURN(-1); + } + + NdbEventOperation *op= ndb->createEventOperation(event_name); + if (!op) + { + pthread_mutex_unlock(&injector_mutex); + sql_print_error("NDB Binlog: Creating NdbEventOperation failed for" + " %s",event_name); + push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR, + ER_GET_ERRMSG, ER(ER_GET_ERRMSG), + ndb->getNdbError().code, + ndb->getNdbError().message, + "NDB"); + DBUG_RETURN(-1); + } + + int n_columns= ndbtab->getNoOfColumns(); + int n_fields= table ? table->s->fields : 0; + for (int j= 0; j < n_columns; j++) + { + const char *col_name= ndbtab->getColumn(j)->getName(); + NdbRecAttr *attr0, *attr1; + if (j < n_fields) + { + Field *f= share->table->field[j]; + if (is_ndb_compatible_type(f)) + { + DBUG_PRINT("info", ("%s compatible", col_name)); + attr0= op->getValue(col_name, f->ptr); + attr1= op->getPreValue(col_name, (f->ptr-share->table->record[0]) + + share->table->record[1]); + } + else + { + DBUG_PRINT("info", ("%s non compatible", col_name)); + attr0= op->getValue(col_name); + attr1= op->getPreValue(col_name); + } + } + else + { + DBUG_PRINT("info", ("%s hidden key", col_name)); + attr0= op->getValue(col_name); + attr1= op->getPreValue(col_name); + } + share->ndb_value[0][j].rec= attr0; + share->ndb_value[1][j].rec= attr1; + } + op->setCustomData((void *) share); // set before execute + share->op= op; // assign op in NDB_SHARE + if (op->execute()) + { + share->op= NULL; + retries--; + if (op->getNdbError().status != NdbError::TemporaryError && + op->getNdbError().code != 1407) + retries= 0; + if (retries == 0) + { + push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR, + ER_GET_ERRMSG, ER(ER_GET_ERRMSG), + op->getNdbError().code, op->getNdbError().message, + "NDB"); + sql_print_error("NDB Binlog: ndbevent->execute failed for %s; %d %s", + event_name, + op->getNdbError().code, op->getNdbError().message); + } + ndb->dropEventOperation(op); + pthread_mutex_unlock(&injector_mutex); + if (retries) + continue; + DBUG_RETURN(-1); + } + pthread_mutex_unlock(&injector_mutex); + break; + } + + get_share(share); + if (do_apply_status_share) + apply_status_share= get_share(share); + else if (do_schema_share) + schema_share= get_share(share); + + DBUG_PRINT("info",("%s share->op: 0x%lx, share->use_count: %u", + share->key, share->op, share->use_count)); + + if (ndb_extra_logging) + sql_print_information("NDB Binlog: logging %s", share->key); + DBUG_RETURN(0); +} + +/* + when entering the calling thread should have a share lock id share != 0 + then the injector thread will have one as well, i.e. share->use_count == 0 + (unless it has already dropped... then share->op == 0) +*/ +int +ndbcluster_handle_drop_table(Ndb *ndb, const char *event_name, + NDB_SHARE *share) +{ + DBUG_ENTER("ndbcluster_handle_drop_table"); + + NDBDICT *dict= ndb->getDictionary(); + if (event_name && dict->dropEvent(event_name)) + { + if (dict->getNdbError().code != 4710) + { + /* drop event failed for some reason, issue a warning */ + push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR, + ER_GET_ERRMSG, ER(ER_GET_ERRMSG), + dict->getNdbError().code, + dict->getNdbError().message, "NDB"); + /* error is not that the event did not exist */ + sql_print_error("NDB Binlog: Unable to drop event in database. " + "Event: %s Error Code: %d Message: %s", + event_name, + dict->getNdbError().code, + dict->getNdbError().message); + /* ToDo; handle error? */ + if (share && share->op && + share->op->getState() == NdbEventOperation::EO_EXECUTING && + dict->getNdbError().code != 4009) + { + DBUG_ASSERT(false); + DBUG_RETURN(-1); + } + } + } + + if (share == 0 || share->op == 0) + { + DBUG_RETURN(0); + } + +/* + Syncronized drop between client thread and injector thread is + neccessary in order to maintain ordering in the binlog, + such that the drop occurs _after_ any inserts/updates/deletes. + + The penalty for this is that the drop table becomes slow. + + This wait is however not strictly neccessary to produce a binlog + that is usable. However the slave does not currently handle + these out of order, thus we are keeping the SYNC_DROP_ defined + for now. +*/ +#define SYNC_DROP_ +#ifdef SYNC_DROP_ + (void) pthread_mutex_lock(&share->mutex); + int max_timeout= 10; + while (share->op) + { + struct timespec abstime; + set_timespec(abstime, 1); + (void) pthread_cond_timedwait(&injector_cond, + &share->mutex, + &abstime); + max_timeout--; + if (share->op == 0) + break; + if (max_timeout == 0) + { + sql_print_error("NDB delete table: timed out. Ignoring..."); + break; + } + if (ndb_extra_logging) + sql_print_information("NDB delete table: " + "waiting max %u sec for drop table %s.", + max_timeout, share->key); + } + (void) pthread_mutex_unlock(&share->mutex); +#else + (void) pthread_mutex_lock(&share->mutex); + share->op_old= share->op; + share->op= 0; + (void) pthread_mutex_unlock(&share->mutex); +#endif + + DBUG_RETURN(0); +} + + +/******************************************************************** + Internal helper functions for differentd events from the stoarage nodes + used by the ndb injector thread +********************************************************************/ + +/* + Handle error states on events from the storage nodes +*/ +static int ndb_binlog_thread_handle_error(Ndb *ndb, NdbEventOperation *pOp, + Binlog_index_row &row) +{ + NDB_SHARE *share= (NDB_SHARE *)pOp->getCustomData(); + DBUG_ENTER("ndb_binlog_thread_handle_error"); + + int overrun= pOp->isOverrun(); + if (overrun) + { + /* + ToDo: this error should rather clear the binlog_index... + and continue + */ + sql_print_error("NDB Binlog: Overrun in event buffer, " + "this means we have dropped events. Cannot " + "continue binlog for %s", share->key); + pOp->clearError(); + DBUG_RETURN(-1); + } + + if (!pOp->isConsistent()) + { + /* + ToDo: this error should rather clear the binlog_index... + and continue + */ + sql_print_error("NDB Binlog: Not Consistent. Cannot " + "continue binlog for %s. Error code: %d" + " Message: %s", share->key, + pOp->getNdbError().code, + pOp->getNdbError().message); + pOp->clearError(); + DBUG_RETURN(-1); + } + sql_print_error("NDB Binlog: unhandled error %d for table %s", + pOp->hasError(), share->key); + pOp->clearError(); + DBUG_RETURN(0); +} + +static int +ndb_binlog_thread_handle_non_data_event(Ndb *ndb, NdbEventOperation *pOp, + Binlog_index_row &row) +{ + NDB_SHARE *share= (NDB_SHARE *)pOp->getCustomData(); + NDBEVENT::TableEvent type= pOp->getEventType(); + + /* make sure to flush any pending events as they can be dependent + on one of the tables being changed below + */ + injector_thd->binlog_flush_pending_rows_event(true); + + switch (type) + { + case NDBEVENT::TE_CLUSTER_FAILURE: + if (apply_status_share == share) + { + free_share(&apply_status_share); + apply_status_share= 0; + } + if (ndb_extra_logging) + sql_print_information("NDB Binlog: cluster failure for %s.", share->key); + DBUG_PRINT("info", ("CLUSTER FAILURE EVENT: " + "%s received share: 0x%lx op: %lx share op: %lx " + "op_old: %lx", + share->key, share, pOp, share->op, share->op_old)); + break; + case NDBEVENT::TE_ALTER: + /* ToDo: remove printout */ + if (ndb_extra_logging) + sql_print_information("NDB Binlog: rename table %s%s/%s -> %s.", + share_prefix, share->table->s->db.str, + share->table->s->table_name.str, + share->key); + /* do the rename of the table in the share */ + share->table->s->db.str= share->db; + share->table->s->db.length= strlen(share->db); + share->table->s->table_name.str= share->table_name; + share->table->s->table_name.length= strlen(share->table_name); + goto drop_alter_common; + case NDBEVENT::TE_DROP: + if (apply_status_share == share) + { + free_share(&apply_status_share); + apply_status_share= 0; + } + /* ToDo: remove printout */ + if (ndb_extra_logging) + sql_print_information("NDB Binlog: drop table %s.", share->key); +drop_alter_common: + row.n_schemaops++; + DBUG_PRINT("info", ("TABLE %s EVENT: %s received share: 0x%lx op: %lx " + "share op: %lx op_old: %lx", + type == NDBEVENT::TE_DROP ? "DROP" : "ALTER", + share->key, share, pOp, share->op, share->op_old)); + break; + case NDBEVENT::TE_NODE_FAILURE: + /* fall through */ + case NDBEVENT::TE_SUBSCRIBE: + /* fall through */ + case NDBEVENT::TE_UNSUBSCRIBE: + /* ignore */ + return 0; + default: + sql_print_error("NDB Binlog: unknown non data event %d for %s. " + "Ignoring...", (unsigned) type, share->key); + return 0; + } + + ndb_handle_schema_change(injector_thd, ndb, pOp, share); + return 0; +} + +/* + Handle data events from the storage nodes +*/ +static int +ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp, + Binlog_index_row &row, + injector::transaction &trans) +{ + NDB_SHARE *share= (NDB_SHARE*) pOp->getCustomData(); + if (share == apply_status_share) + return 0; + TABLE *table= share->table; + + assert(table != 0); + + dbug_print_table("table", table); + + TABLE_SHARE *table_s= table->s; + uint n_fields= table_s->fields; + MY_BITMAP b; + /* Potential buffer for the bitmap */ + uint32 bitbuf[128 / (sizeof(uint32) * 8)]; + bitmap_init(&b, n_fields <= sizeof(bitbuf) * 8 ? bitbuf : NULL, + n_fields, false); + bitmap_set_all(&b); + + /* + row data is already in table->record[0] + As we told the NdbEventOperation to do this + (saves moving data about many times) + */ + + switch(pOp->getEventType()) + { + case NDBEVENT::TE_INSERT: + row.n_inserts++; + DBUG_PRINT("info", ("INSERT INTO %s", share->key)); + { + ndb_unpack_record(table, share->ndb_value[0], &b, table->record[0]); + trans.write_row(::server_id, injector::transaction::table(table, true), + &b, n_fields, table->record[0]); + } + break; + case NDBEVENT::TE_DELETE: + row.n_deletes++; + DBUG_PRINT("info",("DELETE FROM %s", share->key)); + { + /* + table->record[0] contains only the primary key in this case + since we do not have an after image + */ + int n; + if (table->s->primary_key != MAX_KEY) + n= 0; /* + use the primary key only as it save time and space and + it is the only thing needed to log the delete + */ + else + n= 1; /* + we use the before values since we don't have a primary key + since the mysql server does not handle the hidden primary + key + */ + + ndb_unpack_record(table, share->ndb_value[n], &b, table->record[n]); + print_records(table, table->record[n]); + trans.delete_row(::server_id, injector::transaction::table(table, true), + &b, n_fields, table->record[n]); + } + break; + case NDBEVENT::TE_UPDATE: + row.n_updates++; + DBUG_PRINT("info", ("UPDATE %s", share->key)); + { + ndb_unpack_record(table, share->ndb_value[0], + &b, table->record[0]); + print_records(table, table->record[0]); + if (table->s->primary_key != MAX_KEY) + { + /* + since table has a primary key, we can to a write + using only after values + */ + trans.write_row(::server_id, injector::transaction::table(table, true), + &b, n_fields, table->record[0]);// after values + } + else + { + /* + mysql server cannot handle the ndb hidden key and + therefore needs the before image as well + */ + ndb_unpack_record(table, share->ndb_value[1], &b, table->record[1]); + print_records(table, table->record[1]); + trans.update_row(::server_id, + injector::transaction::table(table, true), + &b, n_fields, + table->record[1], // before values + table->record[0]);// after values + } + } + break; + default: + /* We should REALLY never get here. */ + DBUG_PRINT("info", ("default - uh oh, a brain exploded.")); + break; + } + + return 0; +} + +//#define RUN_NDB_BINLOG_TIMER +#ifdef RUN_NDB_BINLOG_TIMER +class Timer +{ +public: + Timer() { start(); } + void start() { gettimeofday(&m_start, 0); } + void stop() { gettimeofday(&m_stop, 0); } + ulong elapsed_ms() + { + return (ulong) + (((longlong) m_stop.tv_sec - (longlong) m_start.tv_sec) * 1000 + + ((longlong) m_stop.tv_usec - + (longlong) m_start.tv_usec + 999) / 1000); + } +private: + struct timeval m_start,m_stop; +}; +#endif + +/**************************************************************** + Injector thread main loop +****************************************************************/ + +pthread_handler_t ndb_binlog_thread_func(void *arg) +{ + THD *thd; /* needs to be first for thread_stack */ + Ndb *ndb= 0; + Thd_ndb *thd_ndb=0; + int ndb_update_binlog_index= 1; + injector *inj= injector::instance(); + + pthread_mutex_lock(&injector_mutex); + /* + Set up the Thread + */ + my_thread_init(); + DBUG_ENTER("ndb_binlog_thread"); + + thd= new THD; /* note that contructor of THD uses DBUG_ */ + THD_CHECK_SENTRY(thd); + + thd->thread_stack= (char*) &thd; /* remember where our stack is */ + if (thd->store_globals()) + { + thd->cleanup(); + delete thd; + ndb_binlog_thread_running= -1; + pthread_mutex_unlock(&injector_mutex); + pthread_cond_signal(&injector_cond); + my_thread_end(); + pthread_exit(0); + DBUG_RETURN(NULL); + } + + thd->init_for_queries(); + thd->command= COM_DAEMON; + thd->system_thread= SYSTEM_THREAD_NDBCLUSTER_BINLOG; + thd->version= refresh_version; + thd->set_time(); + thd->main_security_ctx.host_or_ip= ""; + thd->client_capabilities= 0; + my_net_init(&thd->net, 0); + thd->main_security_ctx.master_access= ~0; + thd->main_security_ctx.priv_user= 0; + + /* + Set up ndb binlog + */ + sql_print_information("Starting MySQL Cluster Binlog Thread"); + + pthread_detach_this_thread(); + thd->real_id= pthread_self(); + pthread_mutex_lock(&LOCK_thread_count); + thd->thread_id= thread_id++; + threads.append(thd); + pthread_mutex_unlock(&LOCK_thread_count); + thd->lex->start_transaction_opt= 0; + + if (!(schema_ndb= new Ndb(g_ndb_cluster_connection, "")) || + schema_ndb->init()) + { + sql_print_error("NDB Binlog: Getting Schema Ndb object failed"); + goto err; + } + + if (!(ndb= new Ndb(g_ndb_cluster_connection, "")) || + ndb->init()) + { + sql_print_error("NDB Binlog: Getting Ndb object failed"); + ndb_binlog_thread_running= -1; + pthread_mutex_unlock(&injector_mutex); + pthread_cond_signal(&injector_cond); + goto err; + } + + /* + Expose global reference to our ndb object. + + Used by both sql client thread and binlog thread to interact + with the storage + pthread_mutex_lock(&injector_mutex); + */ + injector_thd= thd; + injector_ndb= ndb; + ndb_binlog_thread_running= 1; + + /* + We signal the thread that started us that we've finished + starting up. + */ + pthread_mutex_unlock(&injector_mutex); + pthread_cond_signal(&injector_cond); + + thd->proc_info= "Waiting for ndbcluster to start"; + + pthread_mutex_lock(&injector_mutex); + while (!ndbcluster_util_inited) + { + /* ndb not connected yet */ + struct timespec abstime; + set_timespec(abstime, 1); + pthread_cond_timedwait(&injector_cond, &injector_mutex, &abstime); + if (abort_loop) + { + pthread_mutex_unlock(&injector_mutex); + goto err; + } + } + pthread_mutex_unlock(&injector_mutex); + + /* + Main NDB Injector loop + */ + + DBUG_ASSERT(ndbcluster_hton.slot != ~(uint)0); + if (!(thd_ndb= ha_ndbcluster::seize_thd_ndb())) + { + sql_print_error("Could not allocate Thd_ndb object"); + goto err; + } + set_thd_ndb(thd, thd_ndb); + thd_ndb->options|= TNO_NO_LOG_SCHEMA_OP; + thd->query_id= 0; // to keep valgrind quiet + { + static char db[]= ""; + thd->db= db; + open_binlog_index(thd, &binlog_tables, &binlog_index); + if (!apply_status_share) + { + sql_print_error("NDB: Could not get apply status share"); + } + thd->db= db; + } + +#ifdef RUN_NDB_BINLOG_TIMER + Timer main_timer; +#endif + for ( ; !((abort_loop || do_ndbcluster_binlog_close_connection) && + ndb_latest_handled_binlog_epoch >= g_latest_trans_gci); ) + { + +#ifdef RUN_NDB_BINLOG_TIMER + main_timer.stop(); + sql_print_information("main_timer %ld ms", main_timer.elapsed_ms()); + main_timer.start(); +#endif + + /* + now we don't want any events before next gci is complete + */ + thd->proc_info= "Waiting for event from ndbcluster"; + thd->set_time(); + + /* wait for event or 1000 ms */ + Uint64 gci, schema_gci; + int res= ndb->pollEvents(1000, &gci); + int schema_res= schema_ndb->pollEvents(0, &schema_gci); + ndb_latest_received_binlog_epoch= gci; + + while (gci > schema_gci && schema_res >= 0) + schema_res= schema_ndb->pollEvents(10, &schema_gci); + + if ((abort_loop || do_ndbcluster_binlog_close_connection) && + ndb_latest_handled_binlog_epoch >= g_latest_trans_gci) + break; /* Shutting down server */ + + if (binlog_index && binlog_index->s->version < refresh_version) + { + if (binlog_index->s->version < refresh_version) + { + close_thread_tables(thd); + binlog_index= 0; + } + } + + MEM_ROOT **root_ptr= + my_pthread_getspecific_ptr(MEM_ROOT**, THR_MALLOC); + MEM_ROOT *old_root= *root_ptr; + MEM_ROOT mem_root; + init_sql_alloc(&mem_root, 4096, 0); + List<Cluster_replication_schema> schema_list; + *root_ptr= &mem_root; + + if (unlikely(schema_res > 0)) + { + schema_ndb-> + setReportThreshEventGCISlip(ndb_report_thresh_binlog_epoch_slip); + schema_ndb->setReportThreshEventFreeMem(ndb_report_thresh_binlog_mem_usage); + NdbEventOperation *pOp= schema_ndb->nextEvent(); + while (pOp != NULL) + { + if (!pOp->hasError()) + ndb_binlog_thread_handle_schema_event(thd, schema_ndb, pOp, + &schema_list, &mem_root); + else + sql_print_error("NDB: error %lu (%s) on handling " + "binlog schema event", + (ulong) pOp->getNdbError().code, + pOp->getNdbError().message); + pOp= schema_ndb->nextEvent(); + } + } + + if (res > 0) + { + DBUG_PRINT("info", ("pollEvents res: %d", res)); +#ifdef RUN_NDB_BINLOG_TIMER + Timer gci_timer, write_timer; + int event_count= 0; +#endif + thd->proc_info= "Processing events"; + NdbEventOperation *pOp= ndb->nextEvent(); + Binlog_index_row row; + while (pOp != NULL) + { + ndb-> + setReportThreshEventGCISlip(ndb_report_thresh_binlog_epoch_slip); + ndb->setReportThreshEventFreeMem(ndb_report_thresh_binlog_mem_usage); + + assert(pOp->getGCI() <= ndb_latest_received_binlog_epoch); + bzero((char*) &row, sizeof(row)); + injector::transaction trans= inj->new_trans(thd); + gci= pOp->getGCI(); + if (apply_status_share) + { + TABLE *table= apply_status_share->table; + MY_BITMAP b; + uint32 bitbuf; + DBUG_ASSERT(table->s->fields <= sizeof(bitbuf) * 8); + bitmap_init(&b, &bitbuf, table->s->fields, false); + bitmap_set_all(&b); + table->field[0]->store((longlong)::server_id); + table->field[1]->store((longlong)gci); + trans.write_row(::server_id, + injector::transaction::table(table, true), + &b, table->s->fields, + table->record[0]); + } + else + { + sql_print_error("NDB: Could not get apply status share"); + } +#ifdef RUN_NDB_BINLOG_TIMER + write_timer.start(); +#endif + do + { +#ifdef RUN_NDB_BINLOG_TIMER + event_count++; +#endif + if (pOp->hasError() && + ndb_binlog_thread_handle_error(ndb, pOp, row) < 0) + goto err; + +#ifndef DBUG_OFF + { + NDB_SHARE *share= (NDB_SHARE*) pOp->getCustomData(); + DBUG_PRINT("info", + ("EVENT TYPE:%d GCI:%lld last applied: %lld " + "share: 0x%lx", pOp->getEventType(), gci, + ndb_latest_applied_binlog_epoch, share)); + DBUG_ASSERT(share != 0); + } +#endif + if ((unsigned) pOp->getEventType() < + (unsigned) NDBEVENT::TE_FIRST_NON_DATA_EVENT) + ndb_binlog_thread_handle_data_event(ndb, pOp, row, trans); + else + ndb_binlog_thread_handle_non_data_event(ndb, pOp, row); + + pOp= ndb->nextEvent(); + } while (pOp && pOp->getGCI() == gci); + + /* + note! pOp is not referring to an event in the next epoch + or is == 0 + */ +#ifdef RUN_NDB_BINLOG_TIMER + write_timer.stop(); +#endif + + if (row.n_inserts || row.n_updates + || row.n_deletes || row.n_schemaops) + { + injector::transaction::binlog_pos start= trans.start_pos(); + if (int r= trans.commit()) + { + sql_print_error("NDB binlog:" + "Error during COMMIT of GCI. Error: %d", + r); + /* TODO: Further handling? */ + } + row.gci= gci; + row.master_log_file= start.file_name(); + row.master_log_pos= start.file_pos(); + + DBUG_PRINT("info",("COMMIT gci %lld",gci)); + if (ndb_update_binlog_index) + ndb_add_binlog_index(thd, &row); + ndb_latest_applied_binlog_epoch= gci; + } + else + trans.commit(); + ndb_latest_handled_binlog_epoch= gci; +#ifdef RUN_NDB_BINLOG_TIMER + gci_timer.stop(); + sql_print_information("gci %ld event_count %d write time " + "%ld(%d e/s), total time %ld(%d e/s)", + (ulong)gci, event_count, + write_timer.elapsed_ms(), + event_count / write_timer.elapsed_ms(), + gci_timer.elapsed_ms(), + event_count / gci_timer.elapsed_ms()); +#endif + } + } + + { + Cluster_replication_schema *schema; + while ((schema= schema_list.pop())) + { + char *thd_db_save= thd->db; + thd->db= schema->db; + thd->binlog_query(THD::STMT_QUERY_TYPE, schema->query, + schema->query_length, FALSE, + schema->name[0] == 0); + thd->db= thd_db_save; + } + } + free_root(&mem_root, MYF(0)); + *root_ptr= old_root; + ndb_latest_handled_binlog_epoch= ndb_latest_received_binlog_epoch; + } +err: + DBUG_PRINT("info",("Shutting down cluster binlog thread")); + close_thread_tables(thd); + pthread_mutex_lock(&injector_mutex); + /* don't mess with the injector_ndb anymore from other threads */ + injector_ndb= 0; + pthread_mutex_unlock(&injector_mutex); + thd->db= 0; // as not to try to free memory + sql_print_information("Stopping Cluster Binlog"); + + if (apply_status_share) + free_share(&apply_status_share); + if (schema_share) + free_share(&schema_share); + + /* remove all event operations */ + if (ndb) + { + NdbEventOperation *op; + DBUG_PRINT("info",("removing all event operations")); + while ((op= ndb->getEventOperation())) + { + DBUG_PRINT("info",("removing event operation on %s", + op->getEvent()->getName())); + NDB_SHARE *share= (NDB_SHARE*) op->getCustomData(); + free_share(&share); + ndb->dropEventOperation(op); + } + delete ndb; + ndb= 0; + } + + // Placed here to avoid a memory leak; TODO: check if needed + net_end(&thd->net); + delete thd; + + ndb_binlog_thread_running= -1; + (void) pthread_cond_signal(&injector_cond); + + DBUG_PRINT("exit", ("ndb_binlog_thread")); + my_thread_end(); + + pthread_exit(0); + DBUG_RETURN(NULL); +} + +bool +ndbcluster_show_status_binlog(THD* thd, stat_print_fn *stat_print, + enum ha_stat_type stat_type) +{ + char buf[IO_SIZE]; + uint buflen; + ulonglong ndb_latest_epoch= 0; + DBUG_ENTER("ndbcluster_show_status_binlog"); + + pthread_mutex_lock(&injector_mutex); + if (injector_ndb) + { + ndb_latest_epoch= injector_ndb->getLatestGCI(); + pthread_mutex_unlock(&injector_mutex); + + buflen= + snprintf(buf, sizeof(buf), + "latest_epoch=%llu, " + "latest_trans_epoch=%llu, " + "latest_received_binlog_epoch=%llu, " + "latest_handled_binlog_epoch=%llu, " + "latest_applied_binlog_epoch=%llu", + ndb_latest_epoch, + g_latest_trans_gci, + ndb_latest_received_binlog_epoch, + ndb_latest_handled_binlog_epoch, + ndb_latest_applied_binlog_epoch); + if (stat_print(thd, ndbcluster_hton.name, strlen(ndbcluster_hton.name), + "binlog", strlen("binlog"), + buf, buflen)) + DBUG_RETURN(TRUE); + } + else + pthread_mutex_unlock(&injector_mutex); + DBUG_RETURN(FALSE); +} + +#endif /* HAVE_NDB_BINLOG */ diff --git a/sql/ha_ndbcluster_binlog.h b/sql/ha_ndbcluster_binlog.h new file mode 100644 index 00000000000..5334120b43f --- /dev/null +++ b/sql/ha_ndbcluster_binlog.h @@ -0,0 +1,162 @@ +/* Copyright (C) 2000-2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ + +// Typedefs for long names +typedef NdbDictionary::Object NDBOBJ; +typedef NdbDictionary::Column NDBCOL; +typedef NdbDictionary::Table NDBTAB; +typedef NdbDictionary::Index NDBINDEX; +typedef NdbDictionary::Dictionary NDBDICT; +typedef NdbDictionary::Event NDBEVENT; + +#define IS_TMP_PREFIX(A) (is_prefix(A, tmp_file_prefix) || is_prefix(A, "@0023sql")) + +extern ulong ndb_extra_logging; + +#ifdef HAVE_NDB_BINLOG + +#define INJECTOR_EVENT_LEN 200 + +enum SCHEMA_OP_TYPE +{ + SOT_DROP_TABLE, + SOT_CREATE_TABLE, + SOT_RENAME_TABLE, + SOT_ALTER_TABLE, + SOT_DROP_DB, + SOT_CREATE_DB, + SOT_ALTER_DB, + SOT_CLEAR_SLOCK +}; + +const uint max_ndb_nodes= 64; /* multiple of 32 */ + +extern pthread_t ndb_binlog_thread; +extern pthread_mutex_t injector_mutex; +extern pthread_cond_t injector_cond; + +static const char *ha_ndb_ext=".ndb"; +static const char share_prefix[]= "./"; + +extern unsigned char g_node_id_map[max_ndb_nodes]; +extern handlerton ndbcluster_hton; +extern pthread_t ndb_util_thread; +extern pthread_mutex_t LOCK_ndb_util_thread; +extern pthread_cond_t COND_ndb_util_thread; +extern int ndbcluster_util_inited; +extern pthread_mutex_t ndbcluster_mutex; +extern HASH ndbcluster_open_tables; +extern Ndb_cluster_connection* g_ndb_cluster_connection; +extern long ndb_number_of_storage_nodes; + +/* + Initialize the binlog part of the ndb handlerton +*/ +void ndbcluster_binlog_init_handlerton(); +/* + Initialize the binlog part of the NDB_SHARE +*/ +void ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *table); + +int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key, + const char *db, + const char *table_name, + NDB_SHARE *share); +int ndbcluster_create_event(Ndb *ndb, const NDBTAB *table, + const char *event_name, NDB_SHARE *share); +int ndbcluster_create_event_ops(NDB_SHARE *share, + const NDBTAB *ndbtab, + const char *event_name); +int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, + const char *query, int query_length, + const char *db, const char *table_name, + uint32 ndb_table_id, + uint32 ndb_table_version, + enum SCHEMA_OP_TYPE type); +int ndbcluster_handle_drop_table(Ndb *ndb, const char *event_name, + NDB_SHARE *share); +void ndb_rep_event_name(String *event_name, + const char *db, const char *tbl); + +int ndbcluster_binlog_start(); +pthread_handler_t ndb_binlog_thread_func(void *arg); + +/* + table cluster_replication.apply_status +*/ +void ndbcluster_setup_binlog_table_shares(THD *thd); +extern NDB_SHARE *apply_status_share; +extern NDB_SHARE *schema_share; + +extern THD *injector_thd; +extern int ndb_binlog_thread_running; + +bool +ndbcluster_show_status_binlog(THD* thd, stat_print_fn *stat_print, + enum ha_stat_type stat_type); + +/* + prototypes for ndb handler utility function also needed by + the ndb binlog code +*/ +int ndbcluster_find_all_files(THD *thd); +void ndb_unpack_record(TABLE *table, NdbValue *value, + MY_BITMAP *defined, byte *buf); + +NDB_SHARE *ndbcluster_get_share(const char *key, + TABLE *table, + bool create_if_not_exists, + bool have_lock); +NDB_SHARE *ndbcluster_get_share(NDB_SHARE *share); +void ndbcluster_free_share(NDB_SHARE **share, bool have_lock); +void ndbcluster_real_free_share(NDB_SHARE **share); +int handle_trailing_share(NDB_SHARE *share); +inline NDB_SHARE *get_share(const char *key, + TABLE *table, + bool create_if_not_exists= TRUE, + bool have_lock= FALSE) +{ + return ndbcluster_get_share(key, table, create_if_not_exists, have_lock); +} + +inline NDB_SHARE *get_share(NDB_SHARE *share) +{ + return ndbcluster_get_share(share); +} + +inline void free_share(NDB_SHARE **share, bool have_lock= FALSE) +{ + ndbcluster_free_share(share, have_lock); +} + +inline void real_free_share(NDB_SHARE **share) +{ + ndbcluster_real_free_share(share); +} + +inline +Thd_ndb * +get_thd_ndb(THD *thd) { return (Thd_ndb *) thd->ha_data[ndbcluster_hton.slot]; } + +inline +void +set_thd_ndb(THD *thd, Thd_ndb *thd_ndb) { thd->ha_data[ndbcluster_hton.slot]= thd_ndb; } + +Ndb* check_ndb_in_thd(THD* thd); + + +#endif /* HAVE_NDB_BINLOG */ diff --git a/sql/ha_ndbcluster_tables.h b/sql/ha_ndbcluster_tables.h new file mode 100644 index 00000000000..d726fd63e1d --- /dev/null +++ b/sql/ha_ndbcluster_tables.h @@ -0,0 +1,21 @@ +/* Copyright (C) 2000-2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ + +#define NDB_REP_DB "cluster_replication" +#define NDB_REP_TABLE "binlog_index" +#define NDB_APPLY_TABLE "apply_status" +#define NDB_SCHEMA_TABLE "schema" diff --git a/sql/handler.cc b/sql/handler.cc index 2391c54971d..ead22b6b03f 100644 --- a/sql/handler.cc +++ b/sql/handler.cc @@ -2411,6 +2411,132 @@ int ha_table_exists_in_engine(THD* thd, const char* db, const char* name) DBUG_RETURN(error); } +#ifdef HAVE_NDB_BINLOG +/* + TODO: change this into a dynamic struct + List<handlerton> does not work as + 1. binlog_end is called when MEM_ROOT is gone + 2. cannot work with thd MEM_ROOT as memory should be freed +*/ +#define MAX_HTON_LIST_ST 63 +struct hton_list_st +{ + handlerton *hton[MAX_HTON_LIST_ST]; + uint sz; +}; + +struct binlog_func_st +{ + enum_binlog_func fn; + void *arg; +}; + +/* + Listing handlertons first to avoid recursive calls and deadlock +*/ +static my_bool binlog_func_list(THD *thd, st_plugin_int *plugin, void *arg) +{ + hton_list_st *hton_list= (hton_list_st *)arg; + handlerton *hton= (handlerton *) plugin->plugin->info; + if (hton->state == SHOW_OPTION_YES && hton->binlog_func) + { + uint sz= hton_list->sz; + if (sz == MAX_HTON_LIST_ST-1) + { + /* list full */ + return FALSE; + } + hton_list->hton[sz]= hton; + hton_list->sz= sz+1; + } + return FALSE; +} + +static my_bool binlog_func_foreach(THD *thd, binlog_func_st *bfn) +{ + handlerton *hton; + hton_list_st hton_list; + hton_list.sz= 0; + plugin_foreach(thd, binlog_func_list, + MYSQL_STORAGE_ENGINE_PLUGIN, &hton_list); + + uint i= 0, sz= hton_list.sz; + while(i < sz) + hton_list.hton[i++]->binlog_func(thd, bfn->fn, bfn->arg); + return FALSE; +} + +int ha_reset_logs(THD *thd) +{ + binlog_func_st bfn= {BFN_RESET_LOGS, 0}; + binlog_func_foreach(thd, &bfn); + return 0; +} + +void ha_reset_slave(THD* thd) +{ + binlog_func_st bfn= {BFN_RESET_SLAVE, 0}; + binlog_func_foreach(thd, &bfn); +} + +void ha_binlog_wait(THD* thd) +{ + binlog_func_st bfn= {BFN_BINLOG_WAIT, 0}; + binlog_func_foreach(thd, &bfn); +} + +int ha_binlog_end(THD* thd) +{ + binlog_func_st bfn= {BFN_BINLOG_END, 0}; + binlog_func_foreach(thd, &bfn); + return 0; +} + +int ha_binlog_index_purge_file(THD *thd, const char *file) +{ + binlog_func_st bfn= {BFN_BINLOG_PURGE_FILE, (void *)file}; + binlog_func_foreach(thd, &bfn); +} + +struct binlog_log_query_st +{ + enum_binlog_command binlog_command; + const char *query; + uint query_length; + const char *db; + const char *table_name; +}; + +static my_bool binlog_log_query_handlerton(THD *thd, + st_plugin_int *plugin, + void *args) +{ + struct binlog_log_query_st *b= (struct binlog_log_query_st*)args; + handlerton *hton= (handlerton *) plugin->plugin->info; + if (hton->state == SHOW_OPTION_YES && hton->binlog_log_query) + hton->binlog_log_query(thd, + b->binlog_command, + b->query, + b->query_length, + b->db, + b->table_name); + return FALSE; +} + +void ha_binlog_log_query(THD *thd, enum_binlog_command binlog_command, + const char *query, uint query_length, + const char *db, const char *table_name) +{ + struct binlog_log_query_st b; + b.binlog_command= binlog_command; + b.query= query; + b.query_length= query_length; + b.db= db; + b.table_name= table_name; + plugin_foreach(thd, binlog_log_query_handlerton, + MYSQL_STORAGE_ENGINE_PLUGIN, &b); +} +#endif /* Read the first row of a multi-range set. @@ -2832,6 +2958,8 @@ template<class RowsEventT> int binlog_log_row(TABLE* table, const byte *before_record, const byte *after_record) { + if (table->file->is_injective()) + return 0; bool error= 0; THD *const thd= current_thd; diff --git a/sql/handler.h b/sql/handler.h index f6680679a35..1ed19b72331 100644 --- a/sql/handler.h +++ b/sql/handler.h @@ -205,6 +205,24 @@ enum row_type { ROW_TYPE_NOT_USED=-1, ROW_TYPE_DEFAULT, ROW_TYPE_FIXED, ROW_TYPE_DYNAMIC, ROW_TYPE_COMPRESSED, ROW_TYPE_REDUNDANT, ROW_TYPE_COMPACT }; +enum enum_binlog_func { + BFN_RESET_LOGS= 1, + BFN_RESET_SLAVE= 2, + BFN_BINLOG_WAIT= 3, + BFN_BINLOG_END= 4, + BFN_BINLOG_PURGE_FILE= 5 +}; + +enum enum_binlog_command { + LOGCOM_CREATE_TABLE, + LOGCOM_ALTER_TABLE, + LOGCOM_RENAME_TABLE, + LOGCOM_DROP_TABLE, + LOGCOM_CREATE_DB, + LOGCOM_ALTER_DB, + LOGCOM_DROP_DB +}; + /* struct to hold information about the table that should be created */ /* Bits in used_fields */ @@ -420,7 +438,8 @@ typedef struct handlerton structure version */ const int interface_version; -#define MYSQL_HANDLERTON_INTERFACE_VERSION 0x0000 +/* last version change: 0x0001 in 5.1.6 */ +#define MYSQL_HANDLERTON_INTERFACE_VERSION 0x0001 /* @@ -512,6 +531,15 @@ typedef struct bool (*show_status)(THD *thd, stat_print_fn *print, enum ha_stat_type stat); int (*alter_tablespace)(THD *thd, st_alter_tablespace *ts_info); uint32 flags; /* global handler flags */ + /* + Handlerton functions are not set in the different storage + engines static initialization. They are initialized at handler init. + Thus, leave them last in the struct. + */ + int (*binlog_func)(THD *thd, enum_binlog_func fn, void *arg); + void (*binlog_log_query)(THD *thd, enum_binlog_command binlog_command, + const char *query, uint query_length, + const char *db, const char *table_name); } handlerton; extern const handlerton default_hton; @@ -1195,6 +1223,12 @@ public: virtual int ha_update_row(const byte * old_data, byte * new_data); virtual int ha_delete_row(const byte * buf); /* + If the handler does it's own injection of the rows, this member function + should return 'true'. + */ + virtual bool is_injective() const { return false; } + + /* SYNOPSIS start_bulk_update() RETURN @@ -1705,3 +1739,21 @@ void trans_register_ha(THD *thd, bool all, handlerton *ht); int ha_repl_report_sent_binlog(THD *thd, char *log_file_name, my_off_t end_offset); int ha_repl_report_replication_stop(THD *thd); + +#ifdef HAVE_NDB_BINLOG +int ha_reset_logs(THD *thd); +int ha_binlog_index_purge_file(THD *thd, const char *file); +void ha_reset_slave(THD *thd); +void ha_binlog_log_query(THD *thd, enum_binlog_command binlog_command, + const char *query, uint query_length, + const char *db, const char *table_name); +void ha_binlog_wait(THD *thd); +int ha_binlog_end(THD *thd); +#else +#define ha_reset_logs(a) 0 +#define ha_binlog_index_purge_file(a,b) 0 +#define ha_reset_slave(a) +#define ha_binlog_log_query(a,b,c,d,e,f); +#define ha_binlog_wait(a) +#define ha_binlog_end(a) 0 +#endif diff --git a/sql/log.cc b/sql/log.cc index fe95419fffd..7232d3a24dd 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -988,6 +988,7 @@ bool MYSQL_LOG::reset_logs(THD* thd) enum_log_type save_log_type; DBUG_ENTER("reset_logs"); + ha_reset_logs(thd); /* We need to get both locks to be sure that no one is trying to write to the index log file. @@ -1237,6 +1238,9 @@ int MYSQL_LOG::purge_logs(const char *to_log, DBUG_PRINT("info",("purging %s",log_info.log_file_name)); if (!my_delete(log_info.log_file_name, MYF(0)) && decrease_log_space) *decrease_log_space-= file_size; + + ha_binlog_index_purge_file(current_thd, log_info.log_file_name); + if (find_next_log(&log_info, 0) || exit_loop) break; } @@ -1297,6 +1301,9 @@ int MYSQL_LOG::purge_logs_before_date(time_t purge_time) stat_area.st_mtime >= purge_time) break; my_delete(log_info.log_file_name, MYF(0)); + + ha_binlog_index_purge_file(current_thd, log_info.log_file_name); + if (find_next_log(&log_info, 0)) break; } diff --git a/sql/log_event.cc b/sql/log_event.cc index 712fff15774..04a6276d476 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -79,6 +79,20 @@ static void clear_all_errors(THD *thd, struct st_relay_log_info *rli) inline int ignored_error_code(int err_code) { +#ifdef HAVE_NDB_BINLOG + /* + The following error codes are hard-coded and will always be ignored. + */ + switch (err_code) + { + case ER_DB_CREATE_EXISTS: + case ER_DB_DROP_EXISTS: + return 1; + default: + /* Nothing to do */ + break; + } +#endif return ((err_code == ER_SLAVE_IGNORED_TABLE) || (use_slave_mask && bitmap_is_set(&slave_error_mask, err_code))); } @@ -5276,7 +5290,8 @@ int Rows_log_event::exec_event(st_relay_log_info *rli) { slave_print_msg(ERROR_LEVEL, rli, error, "Error in %s event: error during table %s.%s lock", - get_type_str(), table->s->db, table->s->table_name); + get_type_str(), table->s->db.str, + table->s->table_name.str); DBUG_RETURN(error); } /* @@ -5412,7 +5427,12 @@ int Rows_log_event::exec_event(st_relay_log_info *rli) if (error) { /* error has occured during the transaction */ - /* + slave_print_msg(ERROR_LEVEL, rli, error, + "Error in %s event: error during transaction execution " + "on table %s.%s", + get_type_str(), table->s->db.str, + table->s->table_name.str); + /* If one day we honour --skip-slave-errors in row-based replication, and the error should be skipped, then we would clear mappings, rollback, close tables, but the slave SQL thread would not stop and then may @@ -5485,7 +5505,8 @@ int Rows_log_event::exec_event(st_relay_log_info *rli) slave_print_msg(ERROR_LEVEL, rli, error, "Error in %s event: commit of row events failed, " "table `%s`.`%s`", - get_type_str(), table->s->db, table->s->table_name); + get_type_str(), table->s->db.str, + table->s->table_name.str); DBUG_RETURN(error); } @@ -5585,8 +5606,8 @@ void Rows_log_event::pack_info(Protocol *protocol) { char buf[256]; char const *const flagstr= get_flags(STMT_END_F) ? "STMT_END_F" : ""; - char const *const dbnam= m_table->s->db; - char const *const tblnam= m_table->s->table_name; + char const *const dbnam= m_table->s->db.str; + char const *const tblnam= m_table->s->table_name.str; my_size_t bytes= snprintf(buf, sizeof(buf), "%s.%s - %s", dbnam, tblnam, flagstr); protocol->store(buf, bytes, &my_charset_bin); @@ -6105,7 +6126,8 @@ int Write_rows_log_event::do_before_row_operations(TABLE *table) */ thd->lex->sql_command= SQLCOM_REPLACE; - table->file->extra(HA_EXTRA_IGNORE_DUP_KEY); // needed for ndbcluster + table->file->extra(HA_EXTRA_IGNORE_DUP_KEY); // Needed for ndbcluster + table->file->extra(HA_EXTRA_IGNORE_NO_KEY); // Needed for ndbcluster /* TODO: the cluster team (Tomas?) says that it's better if the engine knows how many rows are going to be inserted, then it can allocate needed memory @@ -6373,6 +6395,9 @@ static int find_and_fetch_row(TABLE *table, byte *key, byte *record_buf) DBUG_ASSERT(record_buf); + /* We need to retrieve all fields */ + table->file->ha_set_all_bits_in_read_set(); + if (table->s->keys > 0) { int error; diff --git a/sql/mysql_priv.h b/sql/mysql_priv.h index b3bc49b31d1..026234caf34 100644 --- a/sql/mysql_priv.h +++ b/sql/mysql_priv.h @@ -538,6 +538,7 @@ enum enum_mysql_completiontype { COMMIT_RELEASE=-1, COMMIT=0, COMMIT_AND_CHAIN=6 }; +bool begin_trans(THD *thd); int end_trans(THD *thd, enum enum_mysql_completiontype completion); Item *negate_expression(THD *thd, Item *expr); @@ -640,6 +641,7 @@ bool table_cache_init(void); void table_cache_free(void); bool table_def_init(void); void table_def_free(void); +void assign_new_table_id(TABLE *table); uint cached_open_tables(void); uint cached_table_definitions(void); void kill_mysql(void); @@ -1041,7 +1043,7 @@ bool is_equal(const LEX_STRING *a, const LEX_STRING *b); bool remove_table_from_cache(THD *thd, const char *db, const char *table, uint flags); -bool close_cached_tables(THD *thd, bool wait_for_refresh, TABLE_LIST *tables); +bool close_cached_tables(THD *thd, bool wait_for_refresh, TABLE_LIST *tables, bool have_lock = FALSE); void copy_field_from_tmp_record(Field *field,int offset); bool fill_record(THD *thd, Field **field, List<Item> &values, bool ignore_errors); diff --git a/sql/mysqld.cc b/sql/mysqld.cc index cff969d4113..cf1a4c4c936 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -417,6 +417,11 @@ my_bool opt_ndb_shm, opt_ndb_optimized_node_selection; ulong opt_ndb_cache_check_time; const char *opt_ndb_mgmd; ulong opt_ndb_nodeid; +ulong ndb_extra_logging; +#ifdef HAVE_NDB_BINLOG +ulong ndb_report_thresh_binlog_epoch_slip; +ulong ndb_report_thresh_binlog_mem_usage; +#endif extern SHOW_VAR ndb_status_variables[]; extern const char *ndb_distribution_names[]; @@ -1134,6 +1139,11 @@ void clean_up(bool print_message) mysql_log.cleanup(); mysql_slow_log.cleanup(); + /* + make sure that handlers finish up + what they have that is dependent on the binlog + */ + ha_binlog_end(current_thd); mysql_bin_log.cleanup(); #ifdef HAVE_REPLICATION @@ -3106,11 +3116,12 @@ with --log-bin instead."); } if (opt_binlog_format_id == BF_UNSPECIFIED) { - /* - We use statement-based by default, but could change this to be row-based - if this is a cluster build (i.e. have_ndbcluster is true)... - */ - opt_binlog_format_id= BF_STMT; +#ifdef HAVE_NDB_BINLOG + if (have_ndbcluster == SHOW_OPTION_YES) + opt_binlog_format_id= BF_ROW; + else +#endif + opt_binlog_format_id= BF_STMT; } #ifdef HAVE_ROW_BASED_REPLICATION if (opt_binlog_format_id == BF_ROW) @@ -4646,6 +4657,9 @@ enum options_mysqld OPT_NDB_DISTRIBUTION, OPT_NDB_INDEX_STAT_ENABLE, OPT_NDB_INDEX_STAT_CACHE_ENTRIES, OPT_NDB_INDEX_STAT_UPDATE_FREQ, + OPT_NDB_EXTRA_LOGGING, + OPT_NDB_REPORT_THRESH_BINLOG_EPOCH_SLIP, + OPT_NDB_REPORT_THRESH_BINLOG_MEM_USAGE, OPT_SKIP_SAFEMALLOC, OPT_TEMP_POOL, OPT_TX_ISOLATION, OPT_COMPLETION_TYPE, OPT_SKIP_STACK_TRACE, OPT_SKIP_SYMLINKS, @@ -4848,7 +4862,11 @@ Disable with --skip-bdb (will save memory).", "Tell the master the form of binary logging to use: either 'row' for " "row-based binary logging (which automatically turns on " "innodb_locks_unsafe_for_binlog as it is safe in this case), or " - "'statement' for statement-based logging. ", + "'statement' for statement-based logging. " +#ifdef HAVE_NDB_BINLOG + "If ndbcluster is enabled, the default will be set to 'row'." +#endif + , #else "Tell the master the form of binary logging to use: this release build " "supports only statement-based binary logging, so only 'statement' is " @@ -5302,6 +5320,29 @@ Disable with --skip-ndbcluster (will save memory).", (gptr*) &global_system_variables.ndb_force_send, (gptr*) &global_system_variables.ndb_force_send, 0, GET_BOOL, OPT_ARG, 1, 0, 0, 0, 0, 0}, + {"ndb-extra-logging", OPT_NDB_EXTRA_LOGGING, + "Turn on more logging in the error log.", + (gptr*) &ndb_extra_logging, + (gptr*) &ndb_extra_logging, + 0, GET_INT, OPT_ARG, 0, 0, 0, 0, 0, 0}, +#ifdef HAVE_NDB_BINLOG + {"ndb-report-thresh-binlog-epoch-slip", OPT_NDB_REPORT_THRESH_BINLOG_EPOCH_SLIP, + "Threshold on number of epochs to be behind before reporting binlog status. " + "E.g. 3 means that if the difference between what epoch has been received " + "from the storage nodes and what has been applied to the binlog is 3 or more, " + "a status message will be sent to the cluster log.", + (gptr*) &ndb_report_thresh_binlog_epoch_slip, + (gptr*) &ndb_report_thresh_binlog_epoch_slip, + 0, GET_ULONG, REQUIRED_ARG, 3, 0, 256, 0, 0, 0}, + {"ndb-report-thresh-binlog-mem-usage", OPT_NDB_REPORT_THRESH_BINLOG_MEM_USAGE, + "Threshold on percentage of free memory before reporting binlog status. E.g. " + "10 means that if amount of available memory for receiving binlog data from " + "the storage nodes goes below 10%, " + "a status message will be sent to the cluster log.", + (gptr*) &ndb_report_thresh_binlog_mem_usage, + (gptr*) &ndb_report_thresh_binlog_mem_usage, + 0, GET_ULONG, REQUIRED_ARG, 10, 0, 100, 0, 0, 0}, +#endif {"ndb-use-exact-count", OPT_NDB_USE_EXACT_COUNT, "Use exact records count during query planning and for fast " "select count(*), disable for faster queries.", @@ -7500,6 +7541,14 @@ get_one_option(int optid, const struct my_option *opt __attribute__((unused)), } opt_ndb_distribution_id= (enum ndb_distribution)(id-1); break; + case OPT_NDB_EXTRA_LOGGING: + if (!argument) + ndb_extra_logging++; + else if (argument == disabled_my_option) + ndb_extra_logging= 0L; + else + ndb_extra_logging= atoi(argument); + break; #endif case OPT_INNODB: #ifdef WITH_INNOBASE_STORAGE_ENGINE diff --git a/sql/rpl_injector.cc b/sql/rpl_injector.cc new file mode 100644 index 00000000000..a69dea9a158 --- /dev/null +++ b/sql/rpl_injector.cc @@ -0,0 +1,153 @@ +/* + Copyright (C) 2005 MySQL AB & MySQL Finland AB & TCX DataKonsult AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#include "mysql_priv.h" +#include "rpl_injector.h" +#ifdef HAVE_ROW_BASED_REPLICATION + +/* + injector::transaction - member definitions +*/ + +/* inline since it's called below */ +inline +injector::transaction::transaction(MYSQL_LOG *log, THD *thd) + : m_thd(thd) +{ + /* + Default initialization of m_start_pos (which initializes it to garbage). + We need to fill it in using the code below. + */ + LOG_INFO log_info; + log->get_current_log(&log_info); + /* !!! binlog_pos does not follow RAII !!! */ + m_start_pos.m_file_name= my_strdup(log_info.log_file_name, MYF(0)); + m_start_pos.m_file_pos= log_info.pos; + + begin_trans(m_thd); +} + +injector::transaction::~transaction() +{ + /* Needed since my_free expects a 'char*' (instead of 'void*'). */ + char* const the_memory= const_cast<char*>(m_start_pos.m_file_name); + + /* + We set the first character to null just to give all the copies of the + start position a (minimal) chance of seening that the memory is lost. + All assuming the my_free does not step over the memory, of course. + */ + *the_memory= '\0'; + + my_free(the_memory, MYF(0)); +} + +int injector::transaction::commit() +{ + DBUG_ENTER("injector::transaction::commit()"); + m_thd->binlog_flush_pending_rows_event(true); + end_trans(m_thd, COMMIT); + DBUG_RETURN(0); +} + + +int injector::transaction::write_row (server_id_type sid, table tbl, + MY_BITMAP const* cols, size_t colcnt, + record_type record) +{ + DBUG_ENTER("injector::transaction::write_row(...)"); + m_thd->set_server_id(sid); + m_thd->binlog_write_row(tbl.get_table(), tbl.is_transactional(), + cols, colcnt, record); + DBUG_RETURN(0); +} + + +int injector::transaction::delete_row(server_id_type sid, table tbl, + MY_BITMAP const* cols, size_t colcnt, + record_type record) +{ + DBUG_ENTER("injector::transaction::delete_row(...)"); + m_thd->set_server_id(sid); + m_thd->binlog_delete_row(tbl.get_table(), tbl.is_transactional(), + cols, colcnt, record); + DBUG_RETURN(0); +} + + +int injector::transaction::update_row(server_id_type sid, table tbl, + MY_BITMAP const* cols, size_t colcnt, + record_type before, record_type after) +{ + DBUG_ENTER("injector::transaction::update_row(...)"); + m_thd->set_server_id(sid); + m_thd->binlog_update_row(tbl.get_table(), tbl.is_transactional(), + cols, colcnt, before, after); + DBUG_RETURN(0); +} + + +injector::transaction::binlog_pos injector::transaction::start_pos() const +{ + return m_start_pos; +} + + +/* + injector - member definitions +*/ + +/* This constructor is called below */ +inline injector::injector() +{ +} + +static injector *s_injector= 0; +injector *injector::instance() +{ + if (s_injector == 0) + s_injector= new injector; + /* "There can be only one [instance]" */ + return s_injector; +} + + + +injector::transaction injector::new_trans(THD *thd) +{ + DBUG_ENTER("injector::new_trans(THD*)"); + /* + Currently, there is no alternative to using 'mysql_bin_log' since that + is hardcoded into the way the handler is using the binary log. + */ + DBUG_RETURN(transaction(&mysql_bin_log, thd)); +} + +void injector::new_trans(THD *thd, injector::transaction *ptr) +{ + DBUG_ENTER("injector::new_trans(THD *, transaction *)"); + /* + Currently, there is no alternative to using 'mysql_bin_log' since that + is hardcoded into the way the handler is using the binary log. + */ + transaction trans(&mysql_bin_log, thd); + ptr->swap(trans); + + DBUG_VOID_RETURN; +} + +#endif diff --git a/sql/rpl_injector.h b/sql/rpl_injector.h new file mode 100644 index 00000000000..32d3fdd1a78 --- /dev/null +++ b/sql/rpl_injector.h @@ -0,0 +1,251 @@ +/* + Copyright (C) 2005 MySQL AB & MySQL Finland AB & TCX DataKonsult AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#ifndef INJECTOR_H +#define INJECTOR_H + +/* Pull in 'byte', 'my_off_t', and 'uint32' */ +#include <my_global.h> + +#ifdef HAVE_ROW_BASED_REPLICATION +#include <my_bitmap.h> + +/* Forward declarations */ +class handler; +class MYSQL_LOG; +class st_table; + +typedef st_table TABLE; + +/* + Injector to inject rows into the MySQL server. + + The injector class is used to notify the MySQL server of new rows that have + appeared outside of MySQL control. + + The original purpose of this is to allow clusters---which handle replication + inside the cluster through other means---to insert new rows into binary log. + Note, however, that the injector should be used whenever rows are altered in + any manner that is outside of MySQL server visibility and which therefore + are not seen by the MySQL server. + */ +class injector +{ +public: + + /* + Get an instance of the injector. + + DESCRIPTION + The injector is a Singleton, so this static function return the + available instance of the injector. + + RETURN VALUE + A pointer to the available injector object. + */ + static injector *instance(); + + /* + A transaction where rows can be added. + + DESCRIPTION + The transaction class satisfy the **CopyConstructible** and + **Assignable** requirements. Note that the transaction is *not* + default constructible. + */ + class transaction { + friend class injector; + public: + /* Convenience definitions */ + typedef byte* record_type; + typedef uint32 server_id_type; + + /* + Table reference. + + RESPONSIBILITY + + The class contains constructors to handle several forms of + references to tables. The constructors can implicitly be used to + construct references from, e.g., strings containing table names. + + EXAMPLE + + The class is intended to be used *by value*. Please, do not try to + construct objects of this type using 'new'; instead construct an + object, possibly a temporary object. For example: + + injector::transaction::table tbl(share->table, true); + MY_BITMAP cols; + bitmap_init(&cols, NULL, (i + 7) / 8, false); + inj->write_row(::server_id, tbl, &cols, row_data); + + or + + MY_BITMAP cols; + bitmap_init(&cols, NULL, (i + 7) / 8, false); + inj->write_row(::server_id, + injector::transaction::table(share->table, true), + &cols, row_data); + + This will work, be more efficient, and have greater chance of + inlining, not run the risk of losing pointers. + + COLLABORATION + + injector::transaction + Provide a flexible interface to the representation of tables. + + */ + class table + { + public: + table(TABLE *table, bool is_transactional) + : m_table(table), m_is_transactional(is_transactional) + { + } + + char const *db_name() const { return m_table->s->db.str; } + char const *table_name() const { return m_table->s->table_name.str; } + TABLE *get_table() const { return m_table; } + bool is_transactional() const { return m_is_transactional; } + + private: + TABLE *m_table; + bool m_is_transactional; + }; + + /* + Binlog position as a structure. + */ + class binlog_pos { + friend class transaction; + public: + char const *file_name() const { return m_file_name; } + my_off_t file_pos() const { return m_file_pos; } + + private: + char const *m_file_name; + my_off_t m_file_pos; + }; + + transaction() : m_thd(NULL) { } + transaction(transaction const&); + ~transaction(); + + /* Clear transaction, i.e., make calls to 'good()' return false. */ + void clear() { m_thd= NULL; } + + /* Is the transaction in a good state? */ + bool good() const { return m_thd != NULL; } + + /* Default assignment operator: standard implementation */ + transaction& operator=(transaction t) { + swap(t); + return *this; + } + + /* + Add a 'write row' entry to the transaction. + */ + int write_row (server_id_type sid, table tbl, + MY_BITMAP const *cols, size_t colcnt, + record_type record); + + /* + Add a 'delete row' entry to the transaction. + */ + int delete_row(server_id_type sid, table tbl, + MY_BITMAP const *cols, size_t colcnt, + record_type record); + + /* + Add an 'update row' entry to the transaction. + */ + int update_row(server_id_type sid, table tbl, + MY_BITMAP const *cols, size_t colcnt, + record_type before, record_type after); + + /* + Commit a transaction. + + This member function will clean up after a sequence of *_row calls by, + for example, releasing resource and unlocking files. + */ + int commit(); + + /* + Get the position for the start of the transaction. + + Returns the position in the binary log of the first event in this + transaction. If no event is yet written, the position where the event + *will* be written is returned. This position is known, since a + new_transaction() will lock the binary log and prevent any other + writes to the binary log. + */ + binlog_pos start_pos() const; + + private: + /* Only the injector may construct these object */ + transaction(MYSQL_LOG *, THD *); + + void swap(transaction& o) { + /* std::swap(m_start_pos, o.m_start_pos); */ + { + binlog_pos const tmp= m_start_pos; + m_start_pos= o.m_start_pos; + o.m_start_pos= tmp; + } + + /* std::swap(m_thd, o.m_thd); */ + { + THD* const tmp= m_thd; + m_thd= o.m_thd; + o.m_thd= tmp; + } + } + + binlog_pos m_start_pos; + THD *m_thd; + }; + + /* + Create a new transaction. This member function will prepare for a + sequence of *_row calls by, for example, reserving resources and + locking files. There are two overloaded alternatives: one returning a + transaction by value and one using placement semantics. The following + two calls are equivalent, with the exception that the latter will + overwrite the transaction. + + injector::transaction trans1= inj->new_trans(thd); + + injector::transaction trans2; + inj->new_trans(thd, &trans); + */ + transaction new_trans(THD *); + void new_trans(THD *, transaction *); + +private: + explicit injector(); + ~injector() { } /* Nothing needs to be done */ + injector(injector const&); /* You're not allowed to copy injector + instances. + */ +}; + +#endif /* HAVE_ROW_BASED_REPLICATION */ +#endif /* INJECTOR_H */ diff --git a/sql/set_var.cc b/sql/set_var.cc index f0b1779efc5..1ccd590171f 100644 --- a/sql/set_var.cc +++ b/sql/set_var.cc @@ -101,6 +101,11 @@ extern ulong srv_commit_concurrency; /* WITH_NDBCLUSTER_STORAGE_ENGINE */ extern ulong ndb_cache_check_time; +extern ulong ndb_extra_logging; +#ifdef HAVE_NDB_BINLOG +extern ulong ndb_report_thresh_binlog_epoch_slip; +extern ulong ndb_report_thresh_binlog_mem_usage; +#endif @@ -481,6 +486,14 @@ sys_ndb_autoincrement_prefetch_sz("ndb_autoincrement_prefetch_sz", &SV::ndb_autoincrement_prefetch_sz); sys_var_thd_bool sys_ndb_force_send("ndb_force_send", &SV::ndb_force_send); +#ifdef HAVE_NDB_BINLOG +sys_var_long_ptr +sys_ndb_report_thresh_binlog_epoch_slip("ndb_report_thresh_binlog_epoch_slip", + &ndb_report_thresh_binlog_epoch_slip); +sys_var_long_ptr +sys_ndb_report_thresh_binlog_mem_usage("ndb_report_thresh_binlog_mem_usage", + &ndb_report_thresh_binlog_mem_usage); +#endif sys_var_thd_bool sys_ndb_use_exact_count("ndb_use_exact_count", &SV::ndb_use_exact_count); sys_var_thd_bool @@ -496,6 +509,8 @@ sys_ndb_index_stat_cache_entries("ndb_index_stat_cache_entries", sys_var_thd_ulong sys_ndb_index_stat_update_freq("ndb_index_stat_update_freq", &SV::ndb_index_stat_update_freq); +sys_var_long_ptr +sys_ndb_extra_logging("ndb_extra_logging", &ndb_extra_logging); /* Time/date/datetime formats */ @@ -847,10 +862,17 @@ SHOW_VAR init_vars[]= { {sys_ndb_autoincrement_prefetch_sz.name, (char*) &sys_ndb_autoincrement_prefetch_sz, SHOW_SYS}, {sys_ndb_cache_check_time.name,(char*) &sys_ndb_cache_check_time, SHOW_SYS}, + {sys_ndb_extra_logging.name,(char*) &sys_ndb_extra_logging, SHOW_SYS}, {sys_ndb_force_send.name, (char*) &sys_ndb_force_send, SHOW_SYS}, {sys_ndb_index_stat_cache_entries.name, (char*) &sys_ndb_index_stat_cache_entries, SHOW_SYS}, {sys_ndb_index_stat_enable.name, (char*) &sys_ndb_index_stat_enable, SHOW_SYS}, {sys_ndb_index_stat_update_freq.name, (char*) &sys_ndb_index_stat_update_freq, SHOW_SYS}, +#ifdef HAVE_NDB_BINLOG + {sys_ndb_report_thresh_binlog_epoch_slip.name, + (char*) &sys_ndb_report_thresh_binlog_epoch_slip, SHOW_SYS}, + {sys_ndb_report_thresh_binlog_mem_usage.name, + (char*) &sys_ndb_report_thresh_binlog_mem_usage, SHOW_SYS}, +#endif {sys_ndb_use_exact_count.name,(char*) &sys_ndb_use_exact_count, SHOW_SYS}, {sys_ndb_use_transactions.name,(char*) &sys_ndb_use_transactions, SHOW_SYS}, {sys_net_buffer_length.name,(char*) &sys_net_buffer_length, SHOW_SYS}, diff --git a/sql/slave.cc b/sql/slave.cc index 99bddb7b9b0..41a13f2f5c5 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -39,7 +39,7 @@ typedef bool (*CHECK_KILLED_FUNC)(THD*,void*); volatile bool slave_sql_running = 0, slave_io_running = 0; char* slave_load_tmpdir = 0; -MASTER_INFO *active_mi; +MASTER_INFO *active_mi= 0; my_bool replicate_same_server_id; ulonglong relay_log_space_limit = 0; @@ -2885,6 +2885,47 @@ bool st_relay_log_info::cached_charset_compare(char *charset) return 0; } +/* + Check if the current error is of temporary nature of not. + Some errors are temporary in nature, such as + ER_LOCK_DEADLOCK and ER_LOCK_WAIT_TIMEOUT. Ndb also signals + that the error is temporary by pushing a warning with the error code + ER_GET_TEMPORARY_ERRMSG, if the originating error is temporary. +*/ +static int has_temporary_error(THD *thd) +{ + if (thd->is_fatal_error) + return 0; + + /* + Temporary error codes: + currently, InnoDB deadlock detected by InnoDB or lock + wait timeout (innodb_lock_wait_timeout exceeded + */ + if (thd->net.last_errno == ER_LOCK_DEADLOCK || + thd->net.last_errno == ER_LOCK_WAIT_TIMEOUT) + return 1; + +#ifdef HAVE_NDB_BINLOG + /* + currently temporary error set in ndbcluster + */ + List_iterator_fast<MYSQL_ERROR> it(thd->warn_list); + MYSQL_ERROR *err; + while ((err= it++)) + { + DBUG_PRINT("info", ("has warning %d %s", err->code, err->msg)) + switch (err->code) + { + case ER_GET_TEMPORARY_ERRMSG: + return 1; + default: + break; + } + } +#endif + return 0; +} static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli) { @@ -3004,6 +3045,7 @@ static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli) ev->when = time(NULL); ev->thd = thd; // because up to this point, ev->thd == 0 exec_res = ev->exec_event(rli); + DBUG_PRINT("info", ("exec_event result = %d", exec_res)); DBUG_ASSERT(rli->sql_thd==thd); /* Format_description_log_event should not be deleted because it will be @@ -3017,17 +3059,13 @@ static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli) } if (slave_trans_retries) { - if (exec_res && - (thd->net.last_errno == ER_LOCK_DEADLOCK || - thd->net.last_errno == ER_LOCK_WAIT_TIMEOUT) && - !thd->is_fatal_error) + if (exec_res && has_temporary_error(thd)) { const char *errmsg; /* We were in a transaction which has been rolled back because of a - deadlock (currently, InnoDB deadlock detected by InnoDB) or lock - wait timeout (innodb_lock_wait_timeout exceeded); let's seek back to - BEGIN log event and retry it all again. + temporary error; + let's seek back to BEGIN log event and retry it all again. We have to not only seek but also a) init_master_info(), to seek back to hot relay log's start for later (for when we will come back to this hot log after re-processing the @@ -3539,10 +3577,39 @@ Slave SQL thread aborted. Can't execute init_slave query"); { // do not scare the user if SQL thread was simply killed or stopped if (!sql_slave_killed(thd,rli)) + { + /* + retrieve as much info as possible from the thd and, error codes and warnings + and print this to the error log as to allow the user to locate the error + */ + if (thd->net.last_errno != 0) + { + if (rli->last_slave_errno == 0) + { + slave_print_msg(ERROR_LEVEL, rli, thd->net.last_errno, + thd->net.last_error ? + thd->net.last_error : "<no message>"); + } + else if (rli->last_slave_errno != thd->net.last_errno) + { + sql_print_error("Slave (additional info): %s Error_code: %d", + thd->net.last_error ? + thd->net.last_error : "<no message>", + thd->net.last_errno); + } + } + + /* Print any warnings issued */ + List_iterator_fast<MYSQL_ERROR> it(thd->warn_list); + MYSQL_ERROR *err; + while ((err= it++)) + sql_print_warning("Slave: %s Error_code: %d",err->msg, err->code); + sql_print_error("\ Error running query, slave SQL thread aborted. Fix the problem, and restart \ the slave SQL thread with \"SLAVE START\". We stopped at log \ '%s' position %s", RPL_LOG_NAME, llstr(rli->group_master_log_pos, llbuff)); + } goto err; } } diff --git a/sql/sql_base.cc b/sql/sql_base.cc index 74a5848fa0a..85c5a481d47 100644 --- a/sql/sql_base.cc +++ b/sql/sql_base.cc @@ -803,13 +803,14 @@ void free_io_cache(TABLE *table) */ bool close_cached_tables(THD *thd, bool if_wait_for_refresh, - TABLE_LIST *tables) + TABLE_LIST *tables, bool have_lock) { bool result=0; DBUG_ENTER("close_cached_tables"); DBUG_ASSERT(thd || (!if_wait_for_refresh && !tables)); - VOID(pthread_mutex_lock(&LOCK_open)); + if (!have_lock) + VOID(pthread_mutex_lock(&LOCK_open)); if (!tables) { refresh_version++; // Force close of open tables @@ -888,7 +889,8 @@ bool close_cached_tables(THD *thd, bool if_wait_for_refresh, for (TABLE *table=thd->open_tables; table ; table= table->next) table->s->version= refresh_version; } - VOID(pthread_mutex_unlock(&LOCK_open)); + if (!have_lock) + VOID(pthread_mutex_unlock(&LOCK_open)); if (if_wait_for_refresh) { pthread_mutex_lock(&thd->mysys_var->mutex); @@ -2383,7 +2385,7 @@ void abort_locked_tables(THD *thd,const char *db, const char *table_name) table->s->table_map_id is not ULONG_MAX. */ -static void assign_new_table_id(TABLE *table) +void assign_new_table_id(TABLE *table) { static ulong last_table_id= ULONG_MAX; diff --git a/sql/sql_class.h b/sql/sql_class.h index c56924774ba..cb8c2818a19 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -1383,6 +1383,7 @@ public: #define SYSTEM_THREAD_DELAYED_INSERT 1 #define SYSTEM_THREAD_SLAVE_IO 2 #define SYSTEM_THREAD_SLAVE_SQL 4 +#define SYSTEM_THREAD_NDBCLUSTER_BINLOG 8 /* Used to hold information about file and file structure in exchainge diff --git a/sql/sql_db.cc b/sql/sql_db.cc index d91f091174f..fa01f98d723 100644 --- a/sql/sql_db.cc +++ b/sql/sql_db.cc @@ -401,6 +401,7 @@ bool mysql_create_db(THD *thd, char *db, HA_CREATE_INFO *create_info, bool silent) { char path[FN_REFLEN+16]; + char tmp_query[FN_REFLEN+16]; long result= 1; int error= 0; MY_STAT stat_info; @@ -486,15 +487,20 @@ bool mysql_create_db(THD *thd, char *db, HA_CREATE_INFO *create_info, if (!thd->query) // Only in replication { - query= path; - query_length= (uint) (strxmov(path,"create database `", db, "`", NullS) - - path); + query= tmp_query; + query_length= (uint) (strxmov(tmp_query,"create database `", + db, "`", NullS) - tmp_query); } else { query= thd->query; query_length= thd->query_length; } + + ha_binlog_log_query(thd, LOGCOM_CREATE_DB, + query, query_length, + db, ""); + if (mysql_bin_log.is_open()) { Query_log_event qinfo(thd, query, query_length, 0, @@ -569,6 +575,10 @@ bool mysql_alter_db(THD *thd, const char *db, HA_CREATE_INFO *create_info) thd->variables.collation_database= thd->db_charset; } + ha_binlog_log_query(thd, LOGCOM_ALTER_DB, + thd->query, thd->query_length, + db, ""); + if (mysql_bin_log.is_open()) { Query_log_event qinfo(thd, thd->query, thd->query_length, 0, diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc index cb115adaffb..8238496175c 100644 --- a/sql/sql_parse.cc +++ b/sql/sql_parse.cc @@ -79,7 +79,7 @@ const char *command_name[]={ "Connect","Kill","Debug","Ping","Time","Delayed insert","Change user", "Binlog Dump","Table Dump", "Connect Out", "Register Slave", "Prepare", "Execute", "Long Data", "Close stmt", - "Reset stmt", "Set option", "Fetch", + "Reset stmt", "Set option", "Fetch", "Daemon", "Error" // Last command number }; @@ -149,7 +149,7 @@ static bool end_active_trans(THD *thd) DBUG_RETURN(error); } -static bool begin_trans(THD *thd) +bool begin_trans(THD *thd) { int error=0; if (unlikely(thd->in_sub_stmt)) @@ -6682,6 +6682,8 @@ void kill_one_thread(THD *thd, ulong id, bool only_kill_query) I_List_iterator<THD> it(threads); while ((tmp=it++)) { + if (tmp->command == COM_DAEMON) + continue; if (tmp->thread_id == id) { pthread_mutex_lock(&tmp->LOCK_delete); // Lock from delete diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc index dd70f90b3da..ed056f62fe3 100644 --- a/sql/sql_repl.cc +++ b/sql/sql_repl.cc @@ -972,6 +972,9 @@ int reset_slave(THD *thd, MASTER_INFO* mi) error=1; goto err; } + + ha_reset_slave(thd); + // delete relay logs, clear relay log coordinates if ((error= purge_relay_logs(&mi->rli, thd, 1 /* just reset */, @@ -1316,6 +1319,13 @@ bool mysql_show_binlog_events(THD* thd) Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) DBUG_RETURN(TRUE); + /* + Wait for handlers to insert any pending information + into the binlog. For e.g. ndb which updates the binlog asynchronously + this is needed so that the uses sees all its own commands in the binlog + */ + ha_binlog_wait(thd); + if (mysql_bin_log.is_open()) { LEX_MASTER_INFO *lex_mi= &thd->lex->mi; |