summaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
Diffstat (limited to 'sql')
-rw-r--r--sql/Makefile.am7
-rw-r--r--sql/ha_ndbcluster.cc835
-rw-r--r--sql/ha_ndbcluster.h37
-rw-r--r--sql/ha_ndbcluster_binlog.cc2732
-rw-r--r--sql/ha_ndbcluster_binlog.h162
-rw-r--r--sql/ha_ndbcluster_tables.h21
-rw-r--r--sql/handler.cc128
-rw-r--r--sql/handler.h54
-rw-r--r--sql/log.cc7
-rw-r--r--sql/log_event.cc37
-rw-r--r--sql/mysql_priv.h4
-rw-r--r--sql/mysqld.cc61
-rw-r--r--sql/rpl_injector.cc153
-rw-r--r--sql/rpl_injector.h251
-rw-r--r--sql/set_var.cc22
-rw-r--r--sql/slave.cc83
-rw-r--r--sql/sql_base.cc10
-rw-r--r--sql/sql_class.h1
-rw-r--r--sql/sql_db.cc16
-rw-r--r--sql/sql_parse.cc6
-rw-r--r--sql/sql_repl.cc10
21 files changed, 4418 insertions, 219 deletions
diff --git a/sql/Makefile.am b/sql/Makefile.am
index ddbfdb88ba5..4dd1e2bad9c 100644
--- a/sql/Makefile.am
+++ b/sql/Makefile.am
@@ -58,6 +58,7 @@ noinst_HEADERS = item.h item_func.h item_sum.h item_cmpfunc.h \
sql_select.h structs.h table.h sql_udf.h hash_filo.h\
lex.h lex_symbol.h sql_acl.h sql_crypt.h \
log_event.h sql_repl.h slave.h rpl_filter.h \
+ rpl_injector.h \
stacktrace.h sql_sort.h sql_cache.h set_var.h \
spatial.h gstream.h client_settings.h tzfile.h \
tztime.h my_decimal.h\
@@ -89,6 +90,7 @@ mysqld_SOURCES = sql_lex.cc sql_handler.cc sql_partition.cc \
sql_load.cc mf_iocache.cc field_conv.cc sql_show.cc \
sql_udf.cc sql_analyse.cc sql_analyse.h sql_cache.cc \
slave.cc sql_repl.cc rpl_filter.cc rpl_tblmap.cc \
+ rpl_injector.cc \
sql_union.cc sql_derived.cc \
client.c sql_client.cc mini_client_errors.c pack.c\
stacktrace.c repl_failsafe.h repl_failsafe.cc \
@@ -104,6 +106,8 @@ EXTRA_mysqld_SOURCES = ha_innodb.cc ha_berkeley.cc ha_archive.cc \
ha_innodb.h ha_berkeley.h ha_archive.h \
ha_blackhole.cc ha_federated.cc ha_ndbcluster.cc \
ha_blackhole.h ha_federated.h ha_ndbcluster.h \
+ ha_ndbcluster_binlog.cc ha_ndbcluster_binlog.h \
+ ha_ndbcluster_tables.h \
ha_partition.cc ha_partition.h
mysqld_DEPENDENCIES = @mysql_se_objs@
gen_lex_hash_SOURCES = gen_lex_hash.cc
@@ -160,6 +164,9 @@ ha_berkeley.o: ha_berkeley.cc ha_berkeley.h
ha_ndbcluster.o:ha_ndbcluster.cc ha_ndbcluster.h
$(CXXCOMPILE) @ndbcluster_includes@ $(LM_CFLAGS) -c $<
+ha_ndbcluster_binlog.o:ha_ndbcluster_binlog.cc ha_ndbcluster_binlog.h
+ $(CXXCOMPILE) @ndbcluster_includes@ $(LM_CFLAGS) -c $<
+
#Until we can get rid of dependencies on ha_ndbcluster.h
handler.o: handler.cc ha_ndbcluster.h
$(CXXCOMPILE) @ndbcluster_includes@ $(CXXFLAGS) -c $<
diff --git a/sql/ha_ndbcluster.cc b/sql/ha_ndbcluster.cc
index 7bc3af2c3aa..ed2a53ed2c6 100644
--- a/sql/ha_ndbcluster.cc
+++ b/sql/ha_ndbcluster.cc
@@ -33,6 +33,8 @@
#include <../util/Bitmask.hpp>
#include <ndbapi/NdbIndexStat.hpp>
+#include "ha_ndbcluster_binlog.h"
+
// options from from mysqld.cc
extern my_bool opt_ndb_optimized_node_selection;
extern const char *opt_ndbcluster_connectstring;
@@ -50,13 +52,9 @@ static const int parallelism= 0;
// createable against NDB from this handler
static const int max_transactions= 3; // should really be 2 but there is one transaction too many allocated when lock tables is used
-static const char *ha_ndb_ext=".ndb";
-static const char share_prefix[]= "./";
-
-static int ndbcluster_close_connection(THD *thd);
-static int ndbcluster_commit(THD *thd, bool all);
-static int ndbcluster_rollback(THD *thd, bool all);
-static handler* ndbcluster_create_handler(TABLE_SHARE *table);
+static bool ndbcluster_init(void);
+static int ndbcluster_end(ha_panic_function flag);
+static bool ndbcluster_show_status(THD*,stat_print_fn *,enum ha_stat_type);
static int ndbcluster_alter_tablespace(THD* thd, st_alter_tablespace *info);
handlerton ndbcluster_hton = {
@@ -66,29 +64,7 @@ handlerton ndbcluster_hton = {
"Clustered, fault-tolerant, memory-based tables",
DB_TYPE_NDBCLUSTER,
ndbcluster_init,
- 0, /* slot */
- 0, /* savepoint size */
- ndbcluster_close_connection,
- NULL, /* savepoint_set */
- NULL, /* savepoint_rollback */
- NULL, /* savepoint_release */
- ndbcluster_commit,
- ndbcluster_rollback,
- NULL, /* prepare */
- NULL, /* recover */
- NULL, /* commit_by_xid */
- NULL, /* rollback_by_xid */
- NULL, /* create_cursor_read_view */
- NULL, /* set_cursor_read_view */
- NULL, /* close_cursor_read_view */
- ndbcluster_create_handler, /* Create a new handler */
- ndbcluster_drop_database, /* Drop a database */
- ndbcluster_end, /* Panic call */
- NULL, /* Start Consistent Snapshot */
- NULL, /* Flush logs */
- ndbcluster_show_status, /* Show status */
- ndbcluster_alter_tablespace,
- HTON_NO_FLAGS
+ ~(uint)0, /* slot */
};
static handler *ndbcluster_create_handler(TABLE_SHARE *table)
@@ -121,33 +97,24 @@ static handler *ndbcluster_create_handler(TABLE_SHARE *table)
break; \
}
-// Typedefs for long names
-typedef NdbDictionary::Object NDBOBJ;
-typedef NdbDictionary::Column NDBCOL;
-typedef NdbDictionary::Table NDBTAB;
-typedef NdbDictionary::Index NDBINDEX;
-typedef NdbDictionary::Dictionary NDBDICT;
-typedef NdbDictionary::Event NDBEVENT;
-
static int ndbcluster_inited= 0;
-static int ndbcluster_util_inited= 0;
+int ndbcluster_util_inited= 0;
static Ndb* g_ndb= NULL;
-static Ndb_cluster_connection* g_ndb_cluster_connection= NULL;
+Ndb_cluster_connection* g_ndb_cluster_connection= NULL;
+unsigned char g_node_id_map[max_ndb_nodes];
// Handler synchronization
pthread_mutex_t ndbcluster_mutex;
// Table lock handling
-static HASH ndbcluster_open_tables;
+HASH ndbcluster_open_tables;
static byte *ndbcluster_get_key(NDB_SHARE *share,uint *length,
my_bool not_used __attribute__((unused)));
-static NDB_SHARE *get_share(const char *key,
- bool create_if_not_exists= TRUE,
- bool have_lock= FALSE);
-static void free_share(NDB_SHARE **share, bool have_lock= FALSE);
-static void real_free_share(NDB_SHARE **share);
+#ifdef HAVE_NDB_BINLOG
+static int rename_share(NDB_SHARE *share, const char *new_key);
+#endif
static void ndb_set_fragmentation(NDBTAB &tab, TABLE *table, uint pk_len);
static int packfrm(const void *data, uint len, const void **pack_data, uint *pack_len);
@@ -157,35 +124,9 @@ static int unpackfrm(const void **data, uint *len,
static int ndb_get_table_statistics(Ndb*, const char *,
struct Ndb_statistics *);
-#ifndef DBUG_OFF
-void print_records(TABLE *table, const char *record)
-{
- if (_db_on_)
- {
- for (uint j= 0; j < table->s->fields; j++)
- {
- char buf[40];
- int pos= 0;
- Field *field= table->field[j];
- const byte* field_ptr= field->ptr - table->record[0] + record;
- int pack_len= field->pack_length();
- int n= pack_len < 10 ? pack_len : 10;
-
- for (int i= 0; i < n && pos < 20; i++)
- {
- pos+= sprintf(&buf[pos]," %x", (int) (unsigned char) field_ptr[i]);
- }
- buf[pos]= 0;
- DBUG_PRINT("info",("[%u]field_ptr[0->%d]: %s", j, n, buf));
- }
- }
-}
-#else
-#define print_records(a,b)
-#endif
// Util thread variables
-static pthread_t ndb_util_thread;
+pthread_t ndb_util_thread;
pthread_mutex_t LOCK_ndb_util_thread;
pthread_cond_t COND_ndb_util_thread;
pthread_handler_t ndb_util_thread_func(void *arg);
@@ -214,7 +155,7 @@ static long ndb_cluster_node_id= 0;
static const char * ndb_connected_host= 0;
static long ndb_connected_port= 0;
static long ndb_number_of_replicas= 0;
-static long ndb_number_of_storage_nodes= 0;
+long ndb_number_of_storage_nodes= 0;
static int update_status_variables(Ndb_cluster_connection *c)
{
@@ -235,9 +176,6 @@ SHOW_VAR ndb_status_variables[]= {
{NullS, NullS, SHOW_LONG}
};
-/* instantiated in storage/ndb/src/ndbapi/Ndbif.cpp */
-extern Uint64 g_latest_trans_gci;
-
/*
Error handling functions
*/
@@ -365,6 +303,7 @@ Thd_ndb::Thd_ndb()
all= NULL;
stmt= NULL;
error= 0;
+ options= 0;
}
Thd_ndb::~Thd_ndb()
@@ -391,14 +330,6 @@ Thd_ndb::~Thd_ndb()
}
inline
-Thd_ndb *
-get_thd_ndb(THD *thd) { return (Thd_ndb *) thd->ha_data[ndbcluster_hton.slot]; }
-
-inline
-void
-set_thd_ndb(THD *thd, Thd_ndb *thd_ndb) { thd->ha_data[ndbcluster_hton.slot]= thd_ndb; }
-
-inline
Ndb *ha_ndbcluster::get_ndb()
{
return get_thd_ndb(current_thd)->ndb;
@@ -2517,8 +2448,8 @@ int ha_ndbcluster::delete_row(const byte *record)
set to null.
*/
-static void ndb_unpack_record(TABLE *table, NdbValue *value,
- MY_BITMAP *defined, byte *buf)
+void ndb_unpack_record(TABLE *table, NdbValue *value,
+ MY_BITMAP *defined, byte *buf)
{
Field **p_field= table->field, *field= *p_field;
uint row_offset= (uint) (buf - table->record[0]);
@@ -2756,6 +2687,7 @@ int ha_ndbcluster::index_read_idx(byte *buf, uint index_no,
statistic_increment(current_thd->status_var.ha_read_key_count, &LOCK_status);
DBUG_ENTER("ha_ndbcluster::index_read_idx");
DBUG_PRINT("enter", ("index_no: %u, key_len: %u", index_no, key_len));
+ close_scan();
index_init(index_no, 0);
DBUG_RETURN(index_read(buf, key, key_len, find_flag));
}
@@ -3167,6 +3099,16 @@ int ha_ndbcluster::extra(enum ha_extra_function operation)
m_use_write= FALSE;
m_ignore_dup_key= FALSE;
break;
+ case HA_EXTRA_IGNORE_NO_KEY:
+ DBUG_PRINT("info", ("HA_EXTRA_IGNORE_NO_KEY"));
+ DBUG_PRINT("info", ("Turning on AO_IgnoreError at Commit/NoCommit"));
+ m_ignore_no_key= TRUE;
+ break;
+ case HA_EXTRA_NO_IGNORE_NO_KEY:
+ DBUG_PRINT("info", ("HA_EXTRA_NO_IGNORE_NO_KEY"));
+ DBUG_PRINT("info", ("Turning on AO_IgnoreError at Commit/NoCommit"));
+ m_ignore_no_key= FALSE;
+ break;
default:
break;
}
@@ -3597,7 +3539,7 @@ int ha_ndbcluster::start_stmt(THD *thd, thr_lock_type lock_type)
Commit a transaction started in NDB
*/
-int ndbcluster_commit(THD *thd, bool all)
+static int ndbcluster_commit(THD *thd, bool all)
{
int res= 0;
Thd_ndb *thd_ndb= get_thd_ndb(thd);
@@ -3648,7 +3590,7 @@ int ndbcluster_commit(THD *thd, bool all)
Rollback a transaction started in NDB
*/
-int ndbcluster_rollback(THD *thd, bool all)
+static int ndbcluster_rollback(THD *thd, bool all)
{
int res= 0;
Thd_ndb *thd_ndb= get_thd_ndb(thd);
@@ -3989,12 +3931,16 @@ int ha_ndbcluster::create(const char *name,
if (create_from_engine)
{
/*
- Table alreay exists in NDB and frm file has been created by
+ Table already exists in NDB and frm file has been created by
caller.
Do Ndb specific stuff, such as create a .ndb file
*/
if ((my_errno= write_ndb_file()))
DBUG_RETURN(my_errno);
+#ifdef HAVE_NDB_BINLOG
+ if (ndb_binlog_thread_running > 0)
+ ndbcluster_create_binlog_setup(get_ndb(), name2, m_dbname, m_tabname, 0);
+#endif /* HAVE_NDB_BINLOG */
DBUG_RETURN(my_errno);
}
@@ -4133,6 +4079,74 @@ int ha_ndbcluster::create(const char *name,
if (!my_errno)
my_errno= write_ndb_file();
+#ifdef HAVE_NDB_BINLOG
+ if (!my_errno)
+ {
+ NDB_SHARE *share= 0;
+ pthread_mutex_lock(&ndbcluster_mutex);
+ /*
+ First make sure we get a "fresh" share here, not an old trailing one...
+ */
+ {
+ const char *key= name2;
+ uint length= (uint) strlen(key);
+ if ((share= (NDB_SHARE*) hash_search(&ndbcluster_open_tables,
+ (byte*) key, length)))
+ handle_trailing_share(share);
+ }
+ /*
+ get a new share
+ */
+ if (!(share= get_share(name2, form, true, true)))
+ {
+ sql_print_error("NDB: allocating table share for %s failed", name2);
+ /* my_errno is set */
+ }
+ pthread_mutex_unlock(&ndbcluster_mutex);
+
+ while (!IS_TMP_PREFIX(m_tabname))
+ {
+ const NDBTAB *t= dict->getTable(m_tabname);
+ String event_name(INJECTOR_EVENT_LEN);
+ ndb_rep_event_name(&event_name,m_dbname,m_tabname);
+
+ /*
+ Always create an event for the table, as other mysql servers
+ expect it to be there.
+ */
+ if (ndbcluster_create_event(ndb, t, event_name.c_ptr(), share) < 0)
+ {
+ /* this is only a serious error if the binlog is on */
+ if (share && ndb_binlog_thread_running > 0)
+ {
+ push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
+ ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
+ "Creating event for logging table failed. "
+ "See error log for details.");
+ }
+ break;
+ }
+ if (ndb_extra_logging)
+ sql_print_information("NDB Binlog: CREATE TABLE Event: %s",
+ event_name.c_ptr());
+
+ if (share && ndb_binlog_thread_running > 0 &&
+ ndbcluster_create_event_ops(share, t, event_name.c_ptr()) < 0)
+ {
+ sql_print_error("NDB Binlog: FAILED CREATE TABLE event operations."
+ " Event: %s", name2);
+ /* a warning has been issued to the client */
+ }
+ ndbcluster_log_schema_op(current_thd, share,
+ current_thd->query, current_thd->query_length,
+ share->db, share->table_name,
+ 0, 0,
+ SOT_CREATE_TABLE);
+ break;
+ }
+ }
+#endif /* HAVE_NDB_BINLOG */
+
DBUG_RETURN(my_errno);
}
@@ -4227,6 +4241,15 @@ int ha_ndbcluster::rename_table(const char *from, const char *to)
if (!(orig_tab= dict->getTable(m_tabname)))
ERR_RETURN(dict->getNdbError());
}
+#ifdef HAVE_NDB_BINLOG
+ NDB_SHARE *share= 0;
+ if (ndb_binlog_thread_running > 0 &&
+ (share= get_share(from, 0, false)))
+ {
+ int r= rename_share(share, to);
+ DBUG_ASSERT(r == 0);
+ }
+#endif
m_table= (void *)orig_tab;
// Change current database to that of target table
set_dbname(to);
@@ -4234,6 +4257,14 @@ int ha_ndbcluster::rename_table(const char *from, const char *to)
if ((result= alter_table_name(new_tabname)))
{
+#ifdef HAVE_NDB_BINLOG
+ if (share)
+ {
+ int r= rename_share(share, from);
+ DBUG_ASSERT(r == 0);
+ free_share(&share);
+ }
+#endif
DBUG_RETURN(result);
}
@@ -4241,9 +4272,76 @@ int ha_ndbcluster::rename_table(const char *from, const char *to)
if ((result= handler::rename_table(from, to)))
{
// ToDo in 4.1 should rollback alter table...
+#ifdef HAVE_NDB_BINLOG
+ if (share)
+ free_share(&share);
+#endif
DBUG_RETURN(result);
}
+#ifdef HAVE_NDB_BINLOG
+ int is_old_table_tmpfile= 1;
+ if (share && share->op)
+ dict->forceGCPWait();
+
+ /* handle old table */
+ if (!IS_TMP_PREFIX(m_tabname))
+ {
+ is_old_table_tmpfile= 0;
+ String event_name(INJECTOR_EVENT_LEN);
+ ndb_rep_event_name(&event_name, from + sizeof(share_prefix) - 1, 0);
+ ndbcluster_handle_drop_table(ndb, event_name.c_ptr(), share);
+ }
+
+ if (!result && !IS_TMP_PREFIX(new_tabname))
+ {
+ /* always create an event for the table */
+ String event_name(INJECTOR_EVENT_LEN);
+ ndb_rep_event_name(&event_name, to + sizeof(share_prefix) - 1, 0);
+ const NDBTAB *ndbtab= dict->getTable(new_tabname);
+
+ if (ndbcluster_create_event(ndb, ndbtab, event_name.c_ptr(), share) >= 0)
+ {
+ if (ndb_extra_logging)
+ sql_print_information("NDB Binlog: RENAME Event: %s",
+ event_name.c_ptr());
+ if (share)
+ {
+ if (ndbcluster_create_event_ops(share, ndbtab,
+ event_name.c_ptr()) < 0)
+ {
+ sql_print_error("NDB Binlog: FAILED create event operations "
+ "during RENAME. Event %s", event_name.c_ptr());
+ /* a warning has been issued to the client */
+ }
+ }
+ }
+ else
+ {
+ sql_print_error("NDB Binlog: FAILED create event during RENAME. "
+ "Event: %s", event_name.c_ptr());
+ push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
+ ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
+ "Creating event for logging table failed. "
+ "See error log for details.");
+ }
+ if (is_old_table_tmpfile)
+ ndbcluster_log_schema_op(current_thd, share,
+ current_thd->query, current_thd->query_length,
+ m_dbname, new_tabname,
+ 0, 0,
+ SOT_ALTER_TABLE);
+ else
+ ndbcluster_log_schema_op(current_thd, share,
+ current_thd->query, current_thd->query_length,
+ m_dbname, new_tabname,
+ 0, 0,
+ SOT_RENAME_TABLE);
+ }
+ if (share)
+ free_share(&share);
+#endif
+
DBUG_RETURN(result);
}
@@ -4286,6 +4384,9 @@ ha_ndbcluster::delete_table(ha_ndbcluster *h, Ndb *ndb,
{
DBUG_ENTER("ha_ndbcluster::ndbcluster_delete_table");
NDBDICT *dict= ndb->getDictionary();
+#ifdef HAVE_NDB_BINLOG
+ NDB_SHARE *share= get_share(path, 0, false);
+#endif
/* Drop the table from NDB */
@@ -4302,9 +4403,75 @@ ha_ndbcluster::delete_table(ha_ndbcluster *h, Ndb *ndb,
if (res)
{
+#ifdef HAVE_NDB_BINLOG
+ /* the drop table failed for some reason, drop the share anyways */
+ if (share)
+ {
+ pthread_mutex_lock(&ndbcluster_mutex);
+ if (share->state != NSS_DROPPED)
+ {
+ /*
+ The share kept by the server has not been freed, free it
+ */
+ share->state= NSS_DROPPED;
+ free_share(&share, TRUE);
+ }
+ /* free the share taken above */
+ free_share(&share, TRUE);
+ pthread_mutex_unlock(&ndbcluster_mutex);
+ }
+#endif
DBUG_RETURN(res);
}
+#ifdef HAVE_NDB_BINLOG
+ /* stop the logging of the dropped table, and cleanup */
+
+ /*
+ drop table is successful even if table does not exist in ndb
+ and in case table was actually not dropped, there is no need
+ to force a gcp, and setting the event_name to null will indicate
+ that there is no event to be dropped
+ */
+ int table_dropped= dict->getNdbError().code != 709;
+
+ if (!IS_TMP_PREFIX(table_name) && share)
+ {
+ ndbcluster_log_schema_op(current_thd, share,
+ current_thd->query, current_thd->query_length,
+ share->db, share->table_name,
+ 0, 0,
+ SOT_DROP_TABLE);
+ }
+ else if (table_dropped && share && share->op) /* ndbcluster_log_schema_op
+ will do a force GCP */
+ dict->forceGCPWait();
+
+ if (!IS_TMP_PREFIX(table_name))
+ {
+ String event_name(INJECTOR_EVENT_LEN);
+ ndb_rep_event_name(&event_name, path + sizeof(share_prefix) - 1, 0);
+ ndbcluster_handle_drop_table(ndb,
+ table_dropped ? event_name.c_ptr() : 0,
+ share);
+ }
+
+ if (share)
+ {
+ pthread_mutex_lock(&ndbcluster_mutex);
+ if (share->state != NSS_DROPPED)
+ {
+ /*
+ The share kept by the server has not been freed, free it
+ */
+ share->state= NSS_DROPPED;
+ free_share(&share, TRUE);
+ }
+ /* free the share taken above */
+ free_share(&share, TRUE);
+ pthread_mutex_unlock(&ndbcluster_mutex);
+ }
+#endif
DBUG_RETURN(0);
}
@@ -4393,7 +4560,8 @@ ulonglong ha_ndbcluster::get_auto_increment()
HA_NO_PREFIX_CHAR_KEYS | \
HA_NEED_READ_RANGE_BUFFER | \
HA_CAN_GEOMETRY | \
- HA_CAN_BIT_FIELD
+ HA_CAN_BIT_FIELD | \
+ HA_PRIMARY_KEY_ALLOW_RANDOM_ACCESS
ha_ndbcluster::ha_ndbcluster(TABLE_SHARE *table_arg):
handler(&ndbcluster_hton, table_arg),
@@ -4518,7 +4686,7 @@ int ha_ndbcluster::open(const char *name, int mode, uint test_if_locked)
DBUG_PRINT("info", (" ref_length: %d", ref_length));
}
// Init table lock structure
- if (!(m_share=get_share(name)))
+ if (!(m_share=get_share(name, table)))
DBUG_RETURN(1);
thr_lock_data_init(&m_share->lock,&m_lock,(void*) 0);
@@ -4629,7 +4797,7 @@ int ha_ndbcluster::check_ndb_connection(THD* thd)
}
-int ndbcluster_close_connection(THD *thd)
+static int ndbcluster_close_connection(THD *thd)
{
Thd_ndb *thd_ndb= get_thd_ndb(thd);
DBUG_ENTER("ndbcluster_close_connection");
@@ -4792,14 +4960,21 @@ int ndbcluster_drop_database_impl(const char *path)
DBUG_RETURN(ret);
}
-void ndbcluster_drop_database(char *path)
+static void ndbcluster_drop_database(char *path)
{
ndbcluster_drop_database_impl(path);
+#ifdef HAVE_NDB_BINLOG
+ char db[FN_REFLEN];
+ ha_ndbcluster::set_dbname(path, db);
+ ndbcluster_log_schema_op(current_thd, 0,
+ current_thd->query, current_thd->query_length,
+ db, "", 0, 0, SOT_DROP_DB);
+#endif
}
/*
find all tables in ndb and discover those needed
*/
-static int ndbcluster_find_all_files(THD *thd)
+int ndbcluster_find_all_files(THD *thd)
{
DBUG_ENTER("ndbcluster_find_all_files");
Ndb* ndb;
@@ -4820,6 +4995,11 @@ static int ndbcluster_find_all_files(THD *thd)
for (uint i= 0 ; i < list.count ; i++)
{
NDBDICT::List::Element& elmt= list.elements[i];
+ if (IS_TMP_PREFIX(elmt.name))
+ {
+ DBUG_PRINT("info", ("Skipping %s.%s in NDB", elmt.database, elmt.name));
+ continue;
+ }
DBUG_PRINT("info", ("Found %s.%s in NDB", elmt.database, elmt.name));
if (!(elmt.state == NDBOBJ::StateBuilding ||
elmt.state == NDBOBJ::StateOnline))
@@ -4834,10 +5014,11 @@ static int ndbcluster_find_all_files(THD *thd)
if (!(ndbtab= dict->getTable(elmt.name)))
{
- sql_print_error("NDB: failed to setup table %s.%s, error: %d, %s",
- elmt.database, elmt.name,
- dict->getNdbError().code,
- dict->getNdbError().message);
+ if (elmt.state == NDBOBJ::StateOnline)
+ sql_print_error("NDB: failed to setup table %s.%s, error: %d, %s",
+ elmt.database, elmt.name,
+ dict->getNdbError().code,
+ dict->getNdbError().message);
unhandled++;
continue;
}
@@ -4876,6 +5057,31 @@ static int ndbcluster_find_all_files(THD *thd)
}
pthread_mutex_unlock(&LOCK_open);
}
+#ifdef HAVE_NDB_BINLOG
+ else if (ndb_binlog_thread_running > 0)
+ {
+ /* set up replication for this table */
+ NDB_SHARE *share;
+ pthread_mutex_lock(&ndbcluster_mutex);
+ if (((share= (NDB_SHARE*)hash_search(&ndbcluster_open_tables,
+ (byte*) key, strlen(key)))
+ && share->op == 0 && share->op_old == 0)
+ || share == 0)
+ {
+ /*
+ there is no binlog creation setup for this table
+ attempt to do it
+ */
+ pthread_mutex_unlock(&ndbcluster_mutex);
+ pthread_mutex_lock(&LOCK_open);
+ ndbcluster_create_binlog_setup(ndb, key, elmt.database, elmt.name,
+ share);
+ pthread_mutex_unlock(&LOCK_open);
+ }
+ else
+ pthread_mutex_unlock(&ndbcluster_mutex);
+ }
+#endif
}
}
while (unhandled && retries--);
@@ -4925,6 +5131,11 @@ int ndbcluster_find_files(THD *thd,const char *db,const char *path,
for (i= 0 ; i < list.count ; i++)
{
NDBDICT::List::Element& elmt= list.elements[i];
+ if (IS_TMP_PREFIX(elmt.name))
+ {
+ DBUG_PRINT("info", ("Skipping %s.%s in NDB", elmt.database, elmt.name));
+ continue;
+ }
DBUG_PRINT("info", ("Found %s/%s in NDB", elmt.database, elmt.name));
// Add only tables that belongs to db
@@ -4983,6 +5194,39 @@ int ndbcluster_find_files(THD *thd,const char *db,const char *path,
}
}
+#ifdef HAVE_NDB_BINLOG
+ /* setup logging to binlog for all discovered tables */
+ if (ndb_binlog_thread_running > 0)
+ {
+ char *end;
+ char *end1=
+ strxnmov(name, sizeof(name), mysql_data_home, "/", db, "/", NullS);
+ NDB_SHARE *share;
+ pthread_mutex_lock(&ndbcluster_mutex);
+ for (i= 0; i < ok_tables.records; i++)
+ {
+ file_name= (char*)hash_element(&ok_tables, i);
+ end= strxnmov(end1, sizeof(name) - (end1 - name), file_name, NullS);
+ if ((share= (NDB_SHARE*)hash_search(&ndbcluster_open_tables,
+ (byte*)name, end - name))
+ && share->op == 0 && share->op_old == 0)
+ {
+ /*
+ there is no binlog creation setup for this table
+ attempt to do it
+ */
+
+ pthread_mutex_unlock(&ndbcluster_mutex);
+ pthread_mutex_lock(&LOCK_open);
+ ndbcluster_create_binlog_setup(ndb, name, db, file_name, share);
+ pthread_mutex_unlock(&LOCK_open);
+ pthread_mutex_lock(&ndbcluster_mutex);
+ }
+ }
+ pthread_mutex_unlock(&ndbcluster_mutex);
+ }
+#endif
+
// Check for new files to discover
DBUG_PRINT("info", ("Checking for new files to discover"));
List<char> create_list;
@@ -5055,11 +5299,18 @@ int ndbcluster_find_files(THD *thd,const char *db,const char *path,
static int connect_callback()
{
update_status_variables(g_ndb_cluster_connection);
+
+ uint node_id, i= 0;
+ Ndb_cluster_connection_node_iter node_iter;
+ memset((void *)g_node_id_map, 0xFFFF, sizeof(g_node_id_map));
+ while ((node_id= g_ndb_cluster_connection->get_next_node(node_iter)))
+ g_node_id_map[node_id]= i++;
+
pthread_cond_signal(&COND_ndb_util_thread);
return 0;
}
-bool ndbcluster_init()
+static bool ndbcluster_init()
{
int res;
DBUG_ENTER("ndbcluster_init");
@@ -5067,6 +5318,22 @@ bool ndbcluster_init()
if (have_ndbcluster != SHOW_OPTION_YES)
goto ndbcluster_init_error;
+ {
+ handlerton &h= ndbcluster_hton;
+ h.close_connection= ndbcluster_close_connection;
+ h.commit= ndbcluster_commit;
+ h.rollback= ndbcluster_rollback;
+ h.create= ndbcluster_create_handler; /* Create a new handler */
+ h.drop_database= ndbcluster_drop_database; /* Drop a database */
+ h.panic= ndbcluster_end; /* Panic call */
+ h.show_status= ndbcluster_show_status; /* Show status */
+ h.alter_tablespace= ndbcluster_alter_tablespace; /* Alter tablespace */
+#ifdef HAVE_NDB_BINLOG
+ ndbcluster_binlog_init_handlerton();
+#endif
+ h.flags= HTON_NO_FLAGS;
+ }
+
// Set connectstring if specified
if (opt_ndbcluster_connectstring != 0)
DBUG_PRINT("connectstring", ("%s", opt_ndbcluster_connectstring));
@@ -5130,6 +5397,22 @@ bool ndbcluster_init()
(void) hash_init(&ndbcluster_open_tables,system_charset_info,32,0,0,
(hash_get_key) ndbcluster_get_key,0,0);
pthread_mutex_init(&ndbcluster_mutex,MY_MUTEX_INIT_FAST);
+#ifdef HAVE_NDB_BINLOG
+ /* start the ndb injector thread */
+ if (opt_bin_log)
+ {
+ if (binlog_row_based)
+ {
+ if (ndbcluster_binlog_start())
+ goto ndbcluster_init_error;
+ }
+ else
+ {
+ sql_print_error("NDB: only row based binary logging is supported");
+ }
+ }
+#endif /* HAVE_NDB_BINLOG */
+
pthread_mutex_init(&LOCK_ndb_util_thread, MY_MUTEX_INIT_FAST);
pthread_cond_init(&COND_ndb_util_thread, NULL);
@@ -5160,26 +5443,13 @@ ndbcluster_init_error:
DBUG_RETURN(TRUE);
}
-
-/*
- End use of the NDB Cluster table handler
- - free all global variables allocated by
- ndbcluster_init()
-*/
-
-int ndbcluster_end(ha_panic_function type)
+static int ndbcluster_end(ha_panic_function type)
{
DBUG_ENTER("ndbcluster_end");
if (!ndbcluster_inited)
DBUG_RETURN(0);
- // Kill ndb utility thread
- (void) pthread_mutex_lock(&LOCK_ndb_util_thread);
- DBUG_PRINT("exit",("killing ndb util thread: %lx", ndb_util_thread));
- (void) pthread_cond_signal(&COND_ndb_util_thread);
- (void) pthread_mutex_unlock(&LOCK_ndb_util_thread);
-
if (g_ndb)
{
#ifndef DBUG_OFF
@@ -5206,7 +5476,6 @@ int ndbcluster_end(ha_panic_function type)
pthread_mutex_destroy(&LOCK_ndb_util_thread);
pthread_cond_destroy(&COND_ndb_util_thread);
ndbcluster_inited= 0;
- ndbcluster_util_inited= 0;
DBUG_RETURN(0);
}
@@ -5673,60 +5942,6 @@ ha_ndbcluster::register_query_cache_table(THD *thd,
}
-#ifndef DBUG_OFF
-static void dbug_print_table(const char *info, TABLE *table)
-{
- if (table == 0)
- {
- DBUG_PRINT("info",("%s: (null)", info));
- return;
- }
- DBUG_PRINT("info",
- ("%s: %s.%s s->fields: %d "
- "reclength: %d rec_buff_length: %d record[0]: %lx "
- "record[1]: %lx",
- info,
- table->s->db,
- table->s->table_name,
- table->s->fields,
- table->s->reclength,
- table->s->rec_buff_length,
- table->record[0],
- table->record[1]));
-
- for (unsigned int i= 0; i < table->s->fields; i++)
- {
- Field *f= table->field[i];
- DBUG_PRINT("info",
- ("[%d] \"%s\"(0x%lx:%s%s%s%s%s%s) type: %d pack_length: %d "
- "ptr: 0x%lx[+%d] null_bit: %u null_ptr: 0x%lx[+%d]",
- i,
- f->field_name,
- f->flags,
- (f->flags & PRI_KEY_FLAG) ? "pri" : "attr",
- (f->flags & NOT_NULL_FLAG) ? "" : ",nullable",
- (f->flags & UNSIGNED_FLAG) ? ",unsigned" : ",signed",
- (f->flags & ZEROFILL_FLAG) ? ",zerofill" : "",
- (f->flags & BLOB_FLAG) ? ",blob" : "",
- (f->flags & BINARY_FLAG) ? ",binary" : "",
- f->real_type(),
- f->pack_length(),
- f->ptr, f->ptr - table->record[0],
- f->null_bit,
- f->null_ptr, (byte*) f->null_ptr - table->record[0]));
- if (f->type() == MYSQL_TYPE_BIT)
- {
- Field_bit *g= (Field_bit*) f;
- DBUG_PRINT("MYSQL_TYPE_BIT",("field_length: %d bit_ptr: 0x%lx[+%d] "
- "bit_ofs: %u bit_len: %u",
- g->field_length, g->bit_ptr,
- (byte*) g->bit_ptr-table->record[0],
- g->bit_ofs, g->bit_len));
- }
- }
-}
-#endif
-
/*
Handling the shared NDB_SHARE structure that is needed to
provide table locking.
@@ -5756,6 +5971,12 @@ static void dbug_print_open_tables()
("db.tablename: %s.%s use_count: %d commit_count: %d",
share->db, share->table_name,
share->use_count, share->commit_count));
+#ifdef HAVE_NDB_BINLOG
+ if (share->table)
+ DBUG_PRINT("share",
+ ("table->s->db.table_name: %s.%s",
+ share->table->s->db.str, share->table->s->table_name.str));
+#endif
}
DBUG_VOID_RETURN;
}
@@ -5763,11 +5984,170 @@ static void dbug_print_open_tables()
#define dbug_print_open_tables()
#endif
+#ifdef HAVE_NDB_BINLOG
+/*
+ For some reason a share is still around, try to salvage the situation
+ by closing all cached tables. If the share still exists, there is an
+ error somewhere but only report this to the error log. Keep this
+ "trailing share" but rename it since there are still references to it
+ to avoid segmentation faults. There is a risk that the memory for
+ this trailing share leaks.
+
+ Must be called with previous pthread_mutex_lock(&ndbcluster_mutex)
+*/
+int handle_trailing_share(NDB_SHARE *share)
+{
+ static ulong trailing_share_id= 0;
+ DBUG_ENTER("handle_trailing_share");
+
+ ++share->use_count;
+ pthread_mutex_unlock(&ndbcluster_mutex);
+
+ close_cached_tables((THD*) 0, 0, (TABLE_LIST*) 0, TRUE);
+
+ pthread_mutex_lock(&ndbcluster_mutex);
+ if (!--share->use_count)
+ {
+ DBUG_PRINT("info", ("NDB_SHARE: close_cashed_tables %s freed share.",
+ share->key));
+ real_free_share(&share);
+ DBUG_RETURN(0);
+ }
+
+ /*
+ share still exists, if share has not been dropped by server
+ release that share
+ */
+ if (share->state != NSS_DROPPED && !--share->use_count)
+ {
+ DBUG_PRINT("info", ("NDB_SHARE: %s already exists, "
+ "use_count=%d state != NSS_DROPPED.",
+ share->key, share->use_count));
+ real_free_share(&share);
+ DBUG_RETURN(0);
+ }
+ DBUG_PRINT("error", ("NDB_SHARE: %s already exists use_count=%d.",
+ share->key, share->use_count));
+
+ sql_print_error("NDB_SHARE: %s already exists use_count=%d."
+ " Moving away for safety, but possible memleak.",
+ share->key, share->use_count);
+ dbug_print_open_tables();
+
+ /*
+ This is probably an error. We can however save the situation
+ at the cost of a possible mem leak, by "renaming" the share
+ - First remove from hash
+ */
+ hash_delete(&ndbcluster_open_tables, (byte*) share);
+
+ /*
+ now give it a new name, just a running number
+ if space is not enough allocate some more
+ */
+ {
+ const uint min_key_length= 10;
+ if (share->key_length < min_key_length)
+ {
+ share->key= alloc_root(&share->mem_root, min_key_length + 1);
+ share->key_length= min_key_length;
+ }
+ share->key_length=
+ my_snprintf(share->key, min_key_length + 1, "#leak%d",
+ trailing_share_id++);
+ }
+ /* Keep it for possible the future trailing free */
+ my_hash_insert(&ndbcluster_open_tables, (byte*) share);
+
+ DBUG_RETURN(0);
+}
+
+/*
+ Rename share is used during rename table.
+*/
+static int rename_share(NDB_SHARE *share, const char *new_key)
+{
+ NDB_SHARE *tmp;
+ pthread_mutex_lock(&ndbcluster_mutex);
+ uint new_length= (uint) strlen(new_key);
+ DBUG_PRINT("rename_share", ("old_key: %s old__length: %d",
+ share->key, share->key_length));
+ if ((tmp= (NDB_SHARE*) hash_search(&ndbcluster_open_tables,
+ (byte*) new_key, new_length)))
+ handle_trailing_share(tmp);
+
+ /* remove the share from hash */
+ hash_delete(&ndbcluster_open_tables, (byte*) share);
+ dbug_print_open_tables();
+
+ /* save old stuff if insert should fail */
+ uint old_length= share->key_length;
+ char *old_key= share->key;
+
+ /*
+ now allocate and set the new key, db etc
+ enough space for key, db, and table_name
+ */
+ share->key= alloc_root(&share->mem_root, 2 * (new_length + 1));
+ strmov(share->key, new_key);
+ share->key_length= new_length;
+
+ if (my_hash_insert(&ndbcluster_open_tables, (byte*) share))
+ {
+ // ToDo free the allocated stuff above?
+ DBUG_PRINT("error", ("rename_share: my_hash_insert %s failed",
+ share->key));
+ share->key= old_key;
+ share->key_length= old_length;
+ if (my_hash_insert(&ndbcluster_open_tables, (byte*) share))
+ {
+ sql_print_error("rename_share: failed to recover %s", share->key);
+ DBUG_PRINT("error", ("rename_share: my_hash_insert %s failed",
+ share->key));
+ }
+ dbug_print_open_tables();
+ pthread_mutex_unlock(&ndbcluster_mutex);
+ return -1;
+ }
+ dbug_print_open_tables();
+
+ share->db= share->key + new_length + 1;
+ ha_ndbcluster::set_dbname(new_key, share->db);
+ share->table_name= share->db + strlen(share->db) + 1;
+ ha_ndbcluster::set_tabname(new_key, share->table_name);
+
+ DBUG_PRINT("rename_share",
+ ("0x%lx key: %s key_length: %d",
+ share, share->key, share->key_length));
+ DBUG_PRINT("rename_share",
+ ("db.tablename: %s.%s use_count: %d commit_count: %d",
+ share->db, share->table_name,
+ share->use_count, share->commit_count));
+ DBUG_PRINT("rename_share",
+ ("table->s->db.table_name: %s.%s",
+ share->table->s->db.str, share->table->s->table_name.str));
+
+ if (share->op == 0)
+ {
+ share->table->s->db.str= share->db;
+ share->table->s->db.length= strlen(share->db);
+ share->table->s->table_name.str= share->table_name;
+ share->table->s->table_name.length= strlen(share->table_name);
+ }
+ /* else rename will be handled when the ALTER event comes */
+ share->old_names= old_key;
+ // ToDo free old_names after ALTER EVENT
+
+ pthread_mutex_unlock(&ndbcluster_mutex);
+ return 0;
+}
+#endif
+
/*
Increase refcount on existing share.
Always returns share and cannot fail.
*/
-static NDB_SHARE *get_share(NDB_SHARE *share)
+NDB_SHARE *ndbcluster_get_share(NDB_SHARE *share)
{
pthread_mutex_lock(&ndbcluster_mutex);
share->use_count++;
@@ -5799,9 +6179,13 @@ static NDB_SHARE *get_share(NDB_SHARE *share)
have_lock == TRUE, pthread_mutex_lock(&ndbcluster_mutex) already taken
*/
-static NDB_SHARE *get_share(const char *key, bool create_if_not_exists,
- bool have_lock)
+NDB_SHARE *ndbcluster_get_share(const char *key, TABLE *table,
+ bool create_if_not_exists,
+ bool have_lock)
{
+ DBUG_ENTER("get_share");
+ DBUG_PRINT("info", ("get_share: key %s", key));
+ THD *thd= current_thd;
NDB_SHARE *share;
if (!have_lock)
pthread_mutex_lock(&ndbcluster_mutex);
@@ -5847,6 +6231,9 @@ static NDB_SHARE *get_share(const char *key, bool create_if_not_exists,
ha_ndbcluster::set_dbname(key, share->db);
share->table_name= share->db + strlen(share->db) + 1;
ha_ndbcluster::set_tabname(key, share->table_name);
+#ifdef HAVE_NDB_BINLOG
+ ndbcluster_binlog_init_share(share, table);
+#endif
*root_ptr= old_root;
}
else
@@ -5874,7 +6261,7 @@ static NDB_SHARE *get_share(const char *key, bool create_if_not_exists,
return share;
}
-static void real_free_share(NDB_SHARE **share)
+void ndbcluster_real_free_share(NDB_SHARE **share)
{
DBUG_PRINT("real_free_share",
("0x%lx key: %s key_length: %d",
@@ -5889,6 +6276,26 @@ static void real_free_share(NDB_SHARE **share)
pthread_mutex_destroy(&(*share)->mutex);
free_root(&(*share)->mem_root, MYF(0));
+#ifdef HAVE_NDB_BINLOG
+ if ((*share)->table)
+ {
+ closefrm((*share)->table, 0);
+#if 0 // todo ?
+ free_root(&(*share)->table->mem_root, MYF(0));
+#endif
+
+#ifndef DBUG_OFF
+ bzero((gptr)(*share)->table_share, sizeof(*(*share)->table_share));
+ bzero((gptr)(*share)->table, sizeof(*(*share)->table));
+#endif
+ my_free((gptr) (*share)->table_share, MYF(0));
+ my_free((gptr) (*share)->table, MYF(0));
+#ifndef DBUG_OFF
+ (*share)->table_share= 0;
+ (*share)->table= 0;
+#endif
+ }
+#endif
my_free((gptr) *share, MYF(0));
*share= 0;
@@ -5901,7 +6308,7 @@ static void real_free_share(NDB_SHARE **share)
have_lock == TRUE, pthread_mutex_lock(&ndbcluster_mutex) already taken
*/
-static void free_share(NDB_SHARE **share, bool have_lock)
+void ndbcluster_free_share(NDB_SHARE **share, bool have_lock)
{
if (!have_lock)
pthread_mutex_lock(&ndbcluster_mutex);
@@ -5927,7 +6334,6 @@ static void free_share(NDB_SHARE **share, bool have_lock)
}
-
/*
Internal representation of the frm blob
@@ -6601,7 +7007,7 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused)))
Wait for cluster to start
*/
pthread_mutex_lock(&LOCK_ndb_util_thread);
- while (!ndb_cluster_node_id)
+ while (!ndb_cluster_node_id && (ndbcluster_hton.slot != ~(uint)0))
{
/* ndb not connected yet */
set_timespec(abstime, 1);
@@ -6616,13 +7022,35 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused)))
}
pthread_mutex_unlock(&LOCK_ndb_util_thread);
+ {
+ Thd_ndb *thd_ndb;
+ if (!(thd_ndb= ha_ndbcluster::seize_thd_ndb()))
+ {
+ sql_print_error("Could not allocate Thd_ndb object");
+ goto ndb_util_thread_end;
+ }
+ set_thd_ndb(thd, thd_ndb);
+ thd_ndb->options|= TNO_NO_LOG_SCHEMA_OP;
+ }
+
+#ifdef HAVE_NDB_BINLOG
+ /* create tables needed by the replication */
+ ndbcluster_setup_binlog_table_shares(thd);
+#else
/*
Get all table definitions from the storage node
*/
ndbcluster_find_all_files(thd);
+#endif
ndbcluster_util_inited= 1;
+#ifdef HAVE_NDB_BINLOG
+ /* If running, signal injector thread that all is setup */
+ if (ndb_binlog_thread_running > 0)
+ pthread_cond_signal(&injector_cond);
+#endif
+
set_timespec(abstime, 0);
for (;!abort_loop;)
{
@@ -6639,6 +7067,15 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused)))
if (abort_loop)
break; /* Shutting down server */
+#ifdef HAVE_NDB_BINLOG
+ /*
+      Check that the apply_status_share and schema_share have been created.
+ If not try to create it
+ */
+ if (!apply_status_share || !schema_share)
+ ndbcluster_setup_binlog_table_shares(thd);
+#endif
+
if (ndb_cache_check_time == 0)
{
/* Wake up in 1 second to check if value has changed */
@@ -6652,6 +7089,12 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused)))
for (uint i= 0; i < ndbcluster_open_tables.records; i++)
{
share= (NDB_SHARE *)hash_element(&ndbcluster_open_tables, i);
+#ifdef HAVE_NDB_BINLOG
+ if ((share->use_count - (int) (share->op != 0) - (int) (share->op != 0))
+ <= 0)
+ continue; // injector thread is the only user, skip statistics
+ share->util_lock= current_thd; // Mark that util thread has lock
+#endif /* HAVE_NDB_BINLOG */
share->use_count++; /* Make sure the table can't be closed */
DBUG_PRINT("ndb_util_thread",
("Found open table[%d]: %s, use_count: %d",
@@ -6666,6 +7109,17 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused)))
List_iterator_fast<NDB_SHARE> it(util_open_tables);
while ((share= it++))
{
+#ifdef HAVE_NDB_BINLOG
+ if ((share->use_count - (int) (share->op != 0) - (int) (share->op != 0))
+ <= 1)
+ {
+ /*
+ Util thread and injector thread is the only user, skip statistics
+ */
+ free_share(&share);
+ continue;
+ }
+#endif /* HAVE_NDB_BINLOG */
DBUG_PRINT("ndb_util_thread",
("Fetching commit count for: %s",
share->key));
@@ -6727,6 +7181,7 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused)))
}
}
ndb_util_thread_end:
+ sql_print_information("Stopping Cluster Utility thread");
net_end(&thd->net);
thd->cleanup();
delete thd;
@@ -8072,6 +8527,7 @@ ndbcluster_show_status(THD* thd, stat_print_fn *stat_print,
enum ha_stat_type stat_type)
{
char buf[IO_SIZE];
+ uint buflen;
DBUG_ENTER("ndbcluster_show_status");
if (have_ndbcluster != SHOW_OPTION_YES)
@@ -8082,7 +8538,23 @@ ndbcluster_show_status(THD* thd, stat_print_fn *stat_print,
{
DBUG_RETURN(FALSE);
}
-
+
+ update_status_variables(g_ndb_cluster_connection);
+ buflen=
+ my_snprintf(buf, sizeof(buf),
+ "cluster_node_id=%u, "
+ "connected_host=%s, "
+ "connected_port=%u, "
+ "number_of_storage_nodes=%u",
+ ndb_cluster_node_id,
+ ndb_connected_host,
+ ndb_connected_port,
+ ndb_number_of_storage_nodes);
+ if (stat_print(thd, ndbcluster_hton.name, strlen(ndbcluster_hton.name),
+ "connection", strlen("connection"),
+ buf, buflen))
+ DBUG_RETURN(TRUE);
+
if (get_thd_ndb(thd) && get_thd_ndb(thd)->ndb)
{
Ndb* ndb= (get_thd_ndb(thd))->ndb;
@@ -8090,7 +8562,7 @@ ndbcluster_show_status(THD* thd, stat_print_fn *stat_print,
tmp.m_name= 0;
while (ndb->get_free_list_usage(&tmp))
{
- uint buflen=
+ buflen=
my_snprintf(buf, sizeof(buf),
"created=%u, free=%u, sizeof=%u",
tmp.m_created, tmp.m_free, tmp.m_sizeof);
@@ -8099,11 +8571,14 @@ ndbcluster_show_status(THD* thd, stat_print_fn *stat_print,
DBUG_RETURN(TRUE);
}
}
- send_eof(thd);
-
+#ifdef HAVE_NDB_BINLOG
+ ndbcluster_show_status_binlog(thd, stat_print, stat_type);
+#endif
+
DBUG_RETURN(FALSE);
}
+
/*
Create a table in NDB Cluster
*/
diff --git a/sql/ha_ndbcluster.h b/sql/ha_ndbcluster.h
index f05c1c32a1a..694c9d9ff53 100644
--- a/sql/ha_ndbcluster.h
+++ b/sql/ha_ndbcluster.h
@@ -42,8 +42,10 @@ class NdbEventOperation;
// connectstring to cluster if given by mysqld
extern const char *ndbcluster_connectstring;
extern ulong ndb_cache_check_time;
+#ifdef HAVE_NDB_BINLOG
extern ulong ndb_report_thresh_binlog_epoch_slip;
extern ulong ndb_report_thresh_binlog_mem_usage;
+#endif
typedef enum ndb_index_type {
UNDEFINED_INDEX = 0,
@@ -86,8 +88,26 @@ typedef struct st_ndbcluster_share {
ulonglong commit_count;
char *db;
char *table_name;
+#ifdef HAVE_NDB_BINLOG
+ uint32 flags;
+ NDB_SHARE_STATE state;
+ NdbEventOperation *op;
+ NdbEventOperation *op_old; // for rename table
+ char *old_names; // for rename table
+ TABLE_SHARE *table_share;
+ TABLE *table;
+ NdbValue *ndb_value[2];
+ MY_BITMAP *subscriber_bitmap;
+ MY_BITMAP slock_bitmap;
+ uint32 slock[256/32]; // 256 bits for lock status of table
+#endif
} NDB_SHARE;
+#ifdef HAVE_NDB_BINLOG
+/* NDB_SHARE.flags */
+#define NSF_HIDDEN_PK 1 /* table has hidden primary key */
+#endif
+
typedef enum ndb_item_type {
NDB_VALUE = 0, // Qualified more with Item::Type
NDB_FIELD = 1, // Qualified from table definition
@@ -461,6 +481,11 @@ class Ndb_cond_traverse_context
Place holder for ha_ndbcluster thread specific data
*/
+enum THD_NDB_OPTIONS
+{
+ TNO_NO_LOG_SCHEMA_OP= 1 << 0
+};
+
class Thd_ndb
{
public:
@@ -472,6 +497,7 @@ class Thd_ndb
NdbTransaction *all;
NdbTransaction *stmt;
int error;
+ uint32 options;
List<NDB_SHARE> changed_tables;
};
@@ -553,6 +579,9 @@ class ha_ndbcluster: public handler
bool low_byte_first() const;
bool has_transactions();
+
+ virtual bool is_injective() const { return true; }
+
const char* index_type(uint key_number);
double scan_time();
@@ -773,18 +802,10 @@ private:
extern SHOW_VAR ndb_status_variables[];
-bool ndbcluster_init(void);
-int ndbcluster_end(ha_panic_function flag);
-
int ndbcluster_discover(THD* thd, const char* dbname, const char* name,
const void** frmblob, uint* frmlen);
int ndbcluster_find_files(THD *thd,const char *db,const char *path,
const char *wild, bool dir, List<char> *files);
int ndbcluster_table_exists_in_engine(THD* thd,
const char *db, const char *name);
-void ndbcluster_drop_database(char* path);
-
void ndbcluster_print_error(int error, const NdbOperation *error_op);
-
-bool ndbcluster_show_status(THD*,stat_print_fn *,enum ha_stat_type);
-
diff --git a/sql/ha_ndbcluster_binlog.cc b/sql/ha_ndbcluster_binlog.cc
new file mode 100644
index 00000000000..c80b2b27d8d
--- /dev/null
+++ b/sql/ha_ndbcluster_binlog.cc
@@ -0,0 +1,2732 @@
+/* Copyright (C) 2000-2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+*/
+
+#include "mysql_priv.h"
+#include "ha_ndbcluster.h"
+
+#ifdef HAVE_NDB_BINLOG
+#include "rpl_injector.h"
+#include "slave.h"
+#include "ha_ndbcluster_binlog.h"
+
+/*
+ defines for cluster replication table names
+*/
+#include "ha_ndbcluster_tables.h"
+#define NDB_APPLY_TABLE_FILE "./" NDB_REP_DB "/" NDB_APPLY_TABLE
+#define NDB_SCHEMA_TABLE_FILE "./" NDB_REP_DB "/" NDB_SCHEMA_TABLE
+
+/*
+ Flag showing if the ndb injector thread is running, if so == 1
+ -1 if it was started but later stopped for some reason
+ 0 if never started
+*/
+int ndb_binlog_thread_running= 0;
+
+/*
+  Global reference to the ndb injector thread THD object
+
+ Has one sole purpose, for setting the in_use table member variable
+ in get_share(...)
+*/
+THD *injector_thd= 0;
+
+/*
+ Global reference to ndb injector thd object.
+
+ Used mainly by the binlog index thread, but exposed to the client sql
+ thread for one reason; to setup the events operations for a table
+ to enable ndb injector thread receiving events.
+
+ Must therefore always be used with a surrounding
+ pthread_mutex_lock(&injector_mutex), when doing create/dropEventOperation
+*/
+static Ndb *injector_ndb= 0;
+static Ndb *schema_ndb= 0;
+
+/*
+ Mutex and condition used for interacting between client sql thread
+ and injector thread
+*/
+pthread_t ndb_binlog_thread;
+pthread_mutex_t injector_mutex;
+pthread_cond_t injector_cond;
+
+/* NDB Injector thread (used for binlog creation) */
+static ulonglong ndb_latest_applied_binlog_epoch= 0;
+static ulonglong ndb_latest_handled_binlog_epoch= 0;
+static ulonglong ndb_latest_received_binlog_epoch= 0;
+
+NDB_SHARE *apply_status_share= 0;
+NDB_SHARE *schema_share= 0;
+
+/* instantiated in storage/ndb/src/ndbapi/Ndbif.cpp */
+extern Uint64 g_latest_trans_gci;
+
+/*
+ Global variables for holding the binlog_index table reference
+*/
+static TABLE *binlog_index= 0;
+static TABLE_LIST binlog_tables;
+
+/*
+ Helper functions
+*/
+
+#ifndef DBUG_OFF
+static void print_records(TABLE *table, const char *record)
+{
+ if (_db_on_)
+ {
+ for (uint j= 0; j < table->s->fields; j++)
+ {
+ char buf[40];
+ int pos= 0;
+ Field *field= table->field[j];
+ const byte* field_ptr= field->ptr - table->record[0] + record;
+ int pack_len= field->pack_length();
+ int n= pack_len < 10 ? pack_len : 10;
+
+ for (int i= 0; i < n && pos < 20; i++)
+ {
+ pos+= sprintf(&buf[pos]," %x", (int) (unsigned char) field_ptr[i]);
+ }
+ buf[pos]= 0;
+ DBUG_PRINT("info",("[%u]field_ptr[0->%d]: %s", j, n, buf));
+ }
+ }
+}
+#else
+#define print_records(a,b)
+#endif
+
+
+#ifndef DBUG_OFF
+static void dbug_print_table(const char *info, TABLE *table)
+{
+ if (table == 0)
+ {
+ DBUG_PRINT("info",("%s: (null)", info));
+ return;
+ }
+ DBUG_PRINT("info",
+ ("%s: %s.%s s->fields: %d "
+ "reclength: %d rec_buff_length: %d record[0]: %lx "
+ "record[1]: %lx",
+ info,
+ table->s->db.str,
+ table->s->table_name.str,
+ table->s->fields,
+ table->s->reclength,
+ table->s->rec_buff_length,
+ table->record[0],
+ table->record[1]));
+
+ for (unsigned int i= 0; i < table->s->fields; i++)
+ {
+ Field *f= table->field[i];
+ DBUG_PRINT("info",
+ ("[%d] \"%s\"(0x%lx:%s%s%s%s%s%s) type: %d pack_length: %d "
+ "ptr: 0x%lx[+%d] null_bit: %u null_ptr: 0x%lx[+%d]",
+ i,
+ f->field_name,
+ f->flags,
+ (f->flags & PRI_KEY_FLAG) ? "pri" : "attr",
+ (f->flags & NOT_NULL_FLAG) ? "" : ",nullable",
+ (f->flags & UNSIGNED_FLAG) ? ",unsigned" : ",signed",
+ (f->flags & ZEROFILL_FLAG) ? ",zerofill" : "",
+ (f->flags & BLOB_FLAG) ? ",blob" : "",
+ (f->flags & BINARY_FLAG) ? ",binary" : "",
+ f->real_type(),
+ f->pack_length(),
+ f->ptr, f->ptr - table->record[0],
+ f->null_bit,
+ f->null_ptr, (byte*) f->null_ptr - table->record[0]));
+ if (f->type() == MYSQL_TYPE_BIT)
+ {
+ Field_bit *g= (Field_bit*) f;
+ DBUG_PRINT("MYSQL_TYPE_BIT",("field_length: %d bit_ptr: 0x%lx[+%d] "
+ "bit_ofs: %u bit_len: %u",
+ g->field_length, g->bit_ptr,
+ (byte*) g->bit_ptr-table->record[0],
+ g->bit_ofs, g->bit_len));
+ }
+ }
+}
+#else
+#define dbug_print_table(a,b)
+#endif
+
+
+/*
+ Run a query through mysql_parse
+
+ Used to:
+  - purge the cluster_replication.binlog_index
+  - create the cluster_replication.apply_status table
+*/
+static void run_query(THD *thd, char *buf, char *end,
+ my_bool print_error, my_bool disable_binlog)
+{
+ ulong save_query_length= thd->query_length;
+ char *save_query= thd->query;
+ ulong save_thread_id= thd->variables.pseudo_thread_id;
+ ulonglong save_thd_options= thd->options;
+ DBUG_ASSERT(sizeof(save_thd_options) == sizeof(thd->options));
+ NET save_net= thd->net;
+
+ bzero((char*) &thd->net, sizeof(NET));
+ thd->query_length= end - buf;
+ thd->query= buf;
+ thd->variables.pseudo_thread_id= thread_id;
+ if (disable_binlog)
+ thd->options&= ~OPTION_BIN_LOG;
+
+ DBUG_PRINT("query", ("%s", thd->query));
+ mysql_parse(thd, thd->query, thd->query_length);
+
+ if (print_error && thd->query_error)
+ {
+ sql_print_error("NDB: %s: error %s %d %d %d",
+ buf, thd->net.last_error, thd->net.last_errno,
+ thd->net.report_error, thd->query_error);
+ }
+
+ thd->options= save_thd_options;
+ thd->query_length= save_query_length;
+ thd->query= save_query;
+ thd->variables.pseudo_thread_id= save_thread_id;
+ thd->net= save_net;
+
+ if (thd == injector_thd)
+ {
+ /*
+ running the query will close all tables, including the binlog_index
+ used in injector_thd
+ */
+ binlog_index= 0;
+ }
+}
+
+/*
+ Initialize the binlog part of the NDB_SHARE
+*/
+void ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *_table)
+{
+ THD *thd= current_thd;
+ MEM_ROOT *mem_root= &share->mem_root;
+
+ share->op= 0;
+ share->table= 0;
+ if (ndb_binlog_thread_running <= 0)
+ {
+ DBUG_ASSERT(_table != 0);
+ if (_table->s->primary_key == MAX_KEY)
+ share->flags|= NSF_HIDDEN_PK;
+ return;
+ }
+ while (1)
+ {
+ TABLE_SHARE *table_share=
+ (TABLE_SHARE *) my_malloc(sizeof(*table_share), MYF(MY_WME));
+ TABLE *table= (TABLE*) my_malloc(sizeof(*table), MYF(MY_WME));
+ int error;
+
+ init_tmp_table_share(table_share, share->db, 0, share->table_name,
+ share->key);
+ if ((error= open_table_def(thd, table_share, 0)))
+ {
+ sql_print_error("Unable to get table share for %s, error=%d",
+ share->key, error);
+ DBUG_PRINT("error", ("open_table_def failed %d", error));
+ my_free((gptr) table_share, MYF(0));
+ table_share= 0;
+ my_free((gptr) table, MYF(0));
+ table= 0;
+ break;
+ }
+ if ((error= open_table_from_share(thd, table_share, "", 0,
+ (uint) READ_ALL, 0, table)))
+ {
+ sql_print_error("Unable to open table for %s, error=%d(%d)",
+ share->key, error, my_errno);
+ DBUG_PRINT("error", ("open_table_from_share failed %d", error));
+ my_free((gptr) table_share, MYF(0));
+ table_share= 0;
+ my_free((gptr) table, MYF(0));
+ table= 0;
+ break;
+ }
+ assign_new_table_id(table);
+ if (!table->record[1] || table->record[1] == table->record[0])
+ {
+ table->record[1]= alloc_root(&table->mem_root,
+ table->s->rec_buff_length);
+ }
+ table->in_use= injector_thd;
+
+ table->s->db.str= share->db;
+ table->s->db.length= strlen(share->db);
+ table->s->table_name.str= share->table_name;
+ table->s->table_name.length= strlen(share->table_name);
+
+ share->table_share= table_share;
+ share->table= table;
+#ifndef DBUG_OFF
+ dbug_print_table("table", table);
+#endif
+ /*
+ ! do not touch the contents of the table
+ it may be in use by the injector thread
+ */
+ share->ndb_value[0]= (NdbValue*)
+ alloc_root(mem_root, sizeof(NdbValue) * table->s->fields
+ + 1 /*extra for hidden key*/);
+ share->ndb_value[1]= (NdbValue*)
+ alloc_root(mem_root, sizeof(NdbValue) * table->s->fields
+ +1 /*extra for hidden key*/);
+ {
+ int i, no_nodes= g_ndb_cluster_connection->no_db_nodes();
+ share->subscriber_bitmap= (MY_BITMAP*)
+ alloc_root(mem_root, no_nodes * sizeof(MY_BITMAP));
+ for (i= 0; i < no_nodes; i++)
+ {
+ bitmap_init(&share->subscriber_bitmap[i],
+ (Uint32*)alloc_root(mem_root, max_ndb_nodes/8),
+ max_ndb_nodes, false);
+ bitmap_clear_all(&share->subscriber_bitmap[i]);
+ }
+ bitmap_init(&share->slock_bitmap, share->slock,
+ sizeof(share->slock)*8, false);
+ bitmap_clear_all(&share->slock_bitmap);
+ }
+ if (table->s->primary_key == MAX_KEY)
+ share->flags|= NSF_HIDDEN_PK;
+ break;
+ }
+}
+
+/*****************************************************************
+ functions called from master sql client threads
+****************************************************************/
+
+/*
+ called in mysql_show_binlog_events and reset_logs to make sure we wait for
+ all events originating from this mysql server to arrive in the binlog
+
+  Wait for the last epoch of which the last transaction is a part.
+
+ Wait a maximum of 30 seconds.
+*/
+static void ndbcluster_binlog_wait(THD *thd)
+{
+ if (ndb_binlog_thread_running > 0)
+ {
+ DBUG_ENTER("ndbcluster_binlog_wait");
+ const char *save_info= thd ? thd->proc_info : 0;
+ ulonglong wait_epoch= g_latest_trans_gci;
+ int count= 30;
+ if (thd)
+ thd->proc_info= "Waiting for ndbcluster binlog update to "
+ "reach current position";
+ while (count && ndb_binlog_thread_running > 0 &&
+ ndb_latest_handled_binlog_epoch < wait_epoch)
+ {
+ count--;
+ sleep(1);
+ }
+ if (thd)
+ thd->proc_info= save_info;
+ DBUG_VOID_RETURN;
+ }
+}
+
+/*
+ Called from MYSQL_LOG::reset_logs in log.cc when binlog is emptied
+*/
+static int ndbcluster_reset_logs(THD *thd)
+{
+ if (ndb_binlog_thread_running <= 0)
+ return 0;
+
+ DBUG_ENTER("ndbcluster_reset_logs");
+
+ /*
+    Wait for all events originating from this mysql server to have
+    reached the binlog before continuing to reset
+ */
+ ndbcluster_binlog_wait(thd);
+
+ char buf[1024];
+ char *end= strmov(buf, "DELETE FROM " NDB_REP_DB "." NDB_REP_TABLE);
+
+ run_query(thd, buf, end, FALSE, TRUE);
+
+ DBUG_RETURN(0);
+}
+
+/*
+ Called from MYSQL_LOG::purge_logs in log.cc when the binlog "file"
+ is removed
+*/
+
+static int
+ndbcluster_binlog_index_purge_file(THD *thd, const char *file)
+{
+ if (ndb_binlog_thread_running <= 0)
+ return 0;
+
+ DBUG_ENTER("ndbcluster_binlog_index_purge_file");
+ DBUG_PRINT("enter", ("file: %s", file));
+
+ char buf[1024];
+ char *end= strmov(strmov(strmov(buf,
+ "DELETE FROM "
+ NDB_REP_DB "." NDB_REP_TABLE
+ " WHERE File='"), file), "'");
+
+ run_query(thd, buf, end, FALSE, TRUE);
+
+ DBUG_RETURN(0);
+}
+
+static void
+ndbcluster_binlog_log_query(THD *thd, enum_binlog_command binlog_command,
+ const char *query, uint query_length,
+ const char *db, const char *table_name)
+{
+ DBUG_ENTER("ndbcluster_binlog_log_query");
+ DBUG_PRINT("enter", ("db: %s table_name: %s query: %s",
+ db, table_name, query));
+ DBUG_VOID_RETURN;
+}
+
+/*
+ End use of the NDB Cluster table handler
+ - free all global variables allocated by
+ ndbcluster_init()
+*/
+
+static int ndbcluster_binlog_end(THD *thd)
+{
+ DBUG_ENTER("ndb_binlog_end");
+
+ if (!ndbcluster_util_inited)
+ DBUG_RETURN(0);
+
+ // Kill ndb utility thread
+ (void) pthread_mutex_lock(&LOCK_ndb_util_thread);
+ DBUG_PRINT("exit",("killing ndb util thread: %lx", ndb_util_thread));
+ (void) pthread_cond_signal(&COND_ndb_util_thread);
+ (void) pthread_mutex_unlock(&LOCK_ndb_util_thread);
+
+#ifdef HAVE_NDB_BINLOG
+ /* wait for injector thread to finish */
+ if (ndb_binlog_thread_running > 0)
+ {
+ pthread_mutex_lock(&injector_mutex);
+ while (ndb_binlog_thread_running > 0)
+ {
+ struct timespec abstime;
+ set_timespec(abstime, 1);
+ pthread_cond_timedwait(&injector_cond, &injector_mutex, &abstime);
+ }
+ pthread_mutex_unlock(&injector_mutex);
+ }
+
+ /* remove all shares */
+ {
+ pthread_mutex_lock(&ndbcluster_mutex);
+ for (uint i= 0; i < ndbcluster_open_tables.records; i++)
+ {
+ NDB_SHARE *share=
+ (NDB_SHARE*) hash_element(&ndbcluster_open_tables, i);
+ if (share->table)
+ DBUG_PRINT("share",
+ ("table->s->db.table_name: %s.%s",
+ share->table->s->db.str, share->table->s->table_name.str));
+ if (share->state != NSS_DROPPED && !--share->use_count)
+ real_free_share(&share);
+ else
+ {
+ DBUG_PRINT("share",
+ ("[%d] 0x%lx key: %s key_length: %d",
+ i, share, share->key, share->key_length));
+ DBUG_PRINT("share",
+ ("db.tablename: %s.%s use_count: %d commit_count: %d",
+ share->db, share->table_name,
+ share->use_count, share->commit_count));
+ }
+ }
+ pthread_mutex_unlock(&ndbcluster_mutex);
+ }
+#endif
+ ndbcluster_util_inited= 0;
+ DBUG_RETURN(0);
+}
+
+/*****************************************************************
+ functions called from slave sql client threads
+****************************************************************/
+static void ndbcluster_reset_slave(THD *thd)
+{
+ if (ndb_binlog_thread_running <= 0)
+ return;
+
+ DBUG_ENTER("ndbcluster_reset_slave");
+ char buf[1024];
+ char *end= strmov(buf, "DELETE FROM " NDB_REP_DB "." NDB_APPLY_TABLE);
+ run_query(thd, buf, end, FALSE, TRUE);
+ DBUG_VOID_RETURN;
+}
+
+/*
+ Initialize the binlog part of the ndb handlerton
+*/
+static int ndbcluster_binlog_func(THD *thd, enum_binlog_func fn, void *arg)
+{
+ switch(fn)
+ {
+ case BFN_RESET_LOGS:
+ ndbcluster_reset_logs(thd);
+ break;
+ case BFN_RESET_SLAVE:
+ ndbcluster_reset_slave(thd);
+ break;
+ case BFN_BINLOG_WAIT:
+ ndbcluster_binlog_wait(thd);
+ break;
+ case BFN_BINLOG_END:
+ ndbcluster_binlog_end(thd);
+ break;
+ case BFN_BINLOG_PURGE_FILE:
+ ndbcluster_binlog_index_purge_file(thd, (const char *)arg);
+ break;
+ }
+ return 0;
+}
+
+void ndbcluster_binlog_init_handlerton()
+{
+ handlerton &h= ndbcluster_hton;
+ h.binlog_func= ndbcluster_binlog_func;
+ h.binlog_log_query= ndbcluster_binlog_log_query;
+}
+
+
+
+
+
+/*
+  check the availability of the cluster_replication.apply_status share
+ - return share, but do not increase refcount
+ - return 0 if there is no share
+*/
+static NDB_SHARE *ndbcluster_check_apply_status_share()
+{
+ pthread_mutex_lock(&ndbcluster_mutex);
+
+ void *share= hash_search(&ndbcluster_open_tables,
+ NDB_APPLY_TABLE_FILE,
+ sizeof(NDB_APPLY_TABLE_FILE) - 1);
+ DBUG_PRINT("info",("ndbcluster_check_apply_status_share %s %p",
+ NDB_APPLY_TABLE_FILE, share));
+ pthread_mutex_unlock(&ndbcluster_mutex);
+ return (NDB_SHARE*) share;
+}
+
+/*
+  check the availability of the cluster_replication.schema share
+ - return share, but do not increase refcount
+ - return 0 if there is no share
+*/
+static NDB_SHARE *ndbcluster_check_schema_share()
+{
+ pthread_mutex_lock(&ndbcluster_mutex);
+
+ void *share= hash_search(&ndbcluster_open_tables,
+ NDB_SCHEMA_TABLE_FILE,
+ sizeof(NDB_SCHEMA_TABLE_FILE) - 1);
+ DBUG_PRINT("info",("ndbcluster_check_schema_share %s %p",
+ NDB_SCHEMA_TABLE_FILE, share));
+ pthread_mutex_unlock(&ndbcluster_mutex);
+ return (NDB_SHARE*) share;
+}
+
+/*
+ Create the cluster_replication.apply_status table
+*/
+static int ndbcluster_create_apply_status_table(THD *thd)
+{
+ DBUG_ENTER("ndbcluster_create_apply_status_table");
+
+ /*
+ Check if we already have the apply status table.
+ If so it should have been discovered at startup
+ and thus have a share
+ */
+
+ if (ndbcluster_check_apply_status_share())
+ DBUG_RETURN(0);
+
+ if (g_ndb_cluster_connection->get_no_ready() <= 0)
+ DBUG_RETURN(0);
+
+ char buf[1024], *end;
+
+ if (ndb_extra_logging)
+ sql_print_information("NDB: Creating " NDB_REP_DB "." NDB_APPLY_TABLE);
+
+ /*
+ Check if apply status table exists in MySQL "dictionary"
+ if so, remove it since there is none in Ndb
+ */
+ {
+ strxnmov(buf, sizeof(buf),
+ mysql_data_home,
+ "/" NDB_REP_DB "/" NDB_APPLY_TABLE,
+ reg_ext, NullS);
+ unpack_filename(buf,buf);
+ my_delete(buf, MYF(0));
+ }
+
+ /*
+ Note, updating this table schema must be reflected in ndb_restore
+ */
+ end= strmov(buf, "CREATE TABLE IF NOT EXISTS "
+ NDB_REP_DB "." NDB_APPLY_TABLE
+ " ( server_id INT UNSIGNED NOT NULL,"
+ " epoch BIGINT UNSIGNED NOT NULL, "
+ " PRIMARY KEY USING HASH (server_id) ) ENGINE=NDB");
+
+ run_query(thd, buf, end, TRUE, TRUE);
+
+ DBUG_RETURN(0);
+}
+
+
+/*
+ Create the cluster_replication.schema table
+*/
+static int ndbcluster_create_schema_table(THD *thd)
+{
+ DBUG_ENTER("ndbcluster_create_schema_table");
+
+ /*
+ Check if we already have the schema table.
+ If so it should have been discovered at startup
+ and thus have a share
+ */
+
+ if (ndbcluster_check_schema_share())
+ DBUG_RETURN(0);
+
+ if (g_ndb_cluster_connection->get_no_ready() <= 0)
+ DBUG_RETURN(0);
+
+ char buf[1024], *end;
+
+ if (ndb_extra_logging)
+ sql_print_information("NDB: Creating " NDB_REP_DB "." NDB_SCHEMA_TABLE);
+
+ /*
+ Check if schema table exists in MySQL "dictionary"
+ if so, remove it since there is none in Ndb
+ */
+ {
+ strxnmov(buf, sizeof(buf),
+ mysql_data_home,
+ "/" NDB_REP_DB "/" NDB_SCHEMA_TABLE,
+ reg_ext, NullS);
+ unpack_filename(buf,buf);
+ my_delete(buf, MYF(0));
+ }
+
+ /*
+ Update the defines below to reflect the table schema
+ */
+ end= strmov(buf, "CREATE TABLE IF NOT EXISTS "
+ NDB_REP_DB "." NDB_SCHEMA_TABLE
+ " ( db VARCHAR(63) NOT NULL,"
+ " name VARCHAR(63) NOT NULL,"
+ " slock BINARY(32) NOT NULL,"
+ " query VARCHAR(4094) NOT NULL,"
+ " node_id INT UNSIGNED NOT NULL,"
+ " epoch BIGINT UNSIGNED NOT NULL,"
+ " id INT UNSIGNED NOT NULL,"
+ " version INT UNSIGNED NOT NULL,"
+ " type INT UNSIGNED NOT NULL,"
+ " PRIMARY KEY USING HASH (db,name) ) ENGINE=NDB");
+
+ run_query(thd, buf, end, TRUE, TRUE);
+
+ DBUG_RETURN(0);
+}
+
+void ndbcluster_setup_binlog_table_shares(THD *thd)
+{
+ int done_find_all_files= 0;
+ if (!apply_status_share &&
+ ndbcluster_check_apply_status_share() == 0)
+ {
+ if (!done_find_all_files)
+ {
+ ndbcluster_find_all_files(thd);
+ done_find_all_files= 1;
+ }
+ ndbcluster_create_apply_status_table(thd);
+ }
+ if (!schema_share &&
+ ndbcluster_check_schema_share() == 0)
+ {
+ if (!done_find_all_files)
+ {
+ ndbcluster_find_all_files(thd);
+ done_find_all_files= 1;
+ }
+ ndbcluster_create_schema_table(thd);
+ }
+}
+
+/*
+ Defines and struct for schema table.
+ Should reflect table definition above.
+*/
+#define SCHEMA_DB_I 0u
+#define SCHEMA_NAME_I 1u
+#define SCHEMA_SLOCK_I 2u
+#define SCHEMA_QUERY_I 3u
+#define SCHEMA_NODE_ID_I 4u
+#define SCHEMA_EPOCH_I 5u
+#define SCHEMA_ID_I 6u
+#define SCHEMA_VERSION_I 7u
+#define SCHEMA_TYPE_I 8u
+#define SCHEMA_SIZE 9u
+#define SCHEMA_SLOCK_SIZE 32u
+#define SCHEMA_QUERY_SIZE 4096u
+
+struct Cluster_replication_schema
+{
+ unsigned char db_length;
+ char db[64];
+ unsigned char name_length;
+ char name[64];
+ unsigned char slock_length;
+ uint32 slock[SCHEMA_SLOCK_SIZE/4];
+ unsigned short query_length;
+ char query[SCHEMA_QUERY_SIZE];
+ Uint64 epoch;
+ uint32 node_id;
+ uint32 id;
+ uint32 version;
+ uint32 type;
+};
+
+/*
+ Transfer schema table data into corresponding struct
+*/
+static void ndbcluster_get_schema(TABLE *table,
+ Cluster_replication_schema *s)
+{
+ Field **field;
+ /* db varchar 1 length byte */
+ field= table->field;
+ s->db_length= *(uint8*)(*field)->ptr;
+ DBUG_ASSERT(s->db_length <= (*field)->field_length);
+ DBUG_ASSERT((*field)->field_length + 1 == sizeof(s->db));
+ memcpy(s->db, (*field)->ptr + 1, s->db_length);
+ s->db[s->db_length]= 0;
+ /* name varchar 1 length byte */
+ field++;
+ s->name_length= *(uint8*)(*field)->ptr;
+ DBUG_ASSERT(s->name_length <= (*field)->field_length);
+ DBUG_ASSERT((*field)->field_length + 1 == sizeof(s->name));
+ memcpy(s->name, (*field)->ptr + 1, s->name_length);
+ s->name[s->name_length]= 0;
+ /* slock fixed length */
+ field++;
+ s->slock_length= (*field)->field_length;
+ DBUG_ASSERT((*field)->field_length == sizeof(s->slock));
+ memcpy(s->slock, (*field)->ptr, s->slock_length);
+ /* query varchar 2 length bytes */
+ field++;
+ s->query_length= uint2korr((*field)->ptr);
+ DBUG_ASSERT(s->query_length <= (*field)->field_length);
+ DBUG_ASSERT((*field)->field_length + 2 == sizeof(s->query));
+ memcpy(s->query, (*field)->ptr + 2, s->query_length);
+ s->query[s->query_length]= 0;
+ /* node_id */
+ field++;
+ s->node_id= ((Field_long *)*field)->val_int();
+ /* epoch */
+ field++;
+ s->epoch= ((Field_long *)*field)->val_int();
+ /* id */
+ field++;
+ s->id= ((Field_long *)*field)->val_int();
+ /* version */
+ field++;
+ s->version= ((Field_long *)*field)->val_int();
+ /* type */
+ field++;
+ s->type= ((Field_long *)*field)->val_int();
+}
+
+/*
+ helper function to pack a ndb varchar
+*/
+static char *ndb_pack_varchar(const NDBCOL *col, char *buf,
+ const char *str, int sz)
+{
+ switch (col->getArrayType())
+ {
+ case NDBCOL::ArrayTypeFixed:
+ memcpy(buf, str, sz);
+ break;
+ case NDBCOL::ArrayTypeShortVar:
+ *(unsigned char*)buf= (unsigned char)sz;
+ memcpy(buf + 1, str, sz);
+ break;
+ case NDBCOL::ArrayTypeMediumVar:
+ int2store(buf, sz);
+ memcpy(buf + 2, str, sz);
+ break;
+ }
+ return buf;
+}
+
+/*
+ log query in schema table
+*/
+int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
+ const char *query, int query_length,
+ const char *db, const char *table_name,
+ uint32 ndb_table_id,
+ uint32 ndb_table_version,
+ enum SCHEMA_OP_TYPE type)
+{
+ DBUG_ENTER("ndbcluster_log_schema_op");
+#ifdef NOT_YET
+ Thd_ndb *thd_ndb= get_thd_ndb(thd);
+ if (!thd_ndb)
+ {
+ if (!(thd_ndb= ha_ndbcluster::seize_thd_ndb()))
+ {
+ sql_print_error("Could not allocate Thd_ndb object");
+ DBUG_RETURN(1);
+ }
+ set_thd_ndb(thd, thd_ndb);
+ }
+
+ DBUG_PRINT("enter",
+ ("query: %s db: %s table_name: %s thd_ndb->options: %d",
+ query, db, table_name, thd_ndb->options));
+ if (!schema_share || thd_ndb->options & TNO_NO_LOG_SCHEMA_OP)
+ {
+ DBUG_RETURN(0);
+ }
+
+ char tmp_buf2[FN_REFLEN];
+ switch (type)
+ {
+ case SOT_DROP_TABLE:
+ /* drop database command, do not log at drop table */
+ if (thd->lex->sql_command == SQLCOM_DROP_DB)
+ DBUG_RETURN(0);
+    /* redo the drop table query as it may contain several tables */
+ query= tmp_buf2;
+ query_length= (uint) (strxmov(tmp_buf2, "drop table `",
+ table_name, "`", NullS) - tmp_buf2);
+ break;
+ case SOT_CREATE_TABLE:
+ break;
+ case SOT_RENAME_TABLE:
+ break;
+ case SOT_ALTER_TABLE:
+ break;
+ case SOT_DROP_DB:
+ break;
+ case SOT_CREATE_DB:
+ break;
+ case SOT_ALTER_DB:
+ break;
+ default:
+ abort(); /* should not happen, programming error */
+ }
+
+ const NdbError *ndb_error= 0;
+ uint32 node_id= g_ndb_cluster_connection->node_id();
+ Uint64 epoch= 0;
+ MY_BITMAP schema_subscribers;
+ uint32 bitbuf[sizeof(schema_share->slock)/4];
+ {
+ int i;
+ bitmap_init(&schema_subscribers, bitbuf, sizeof(bitbuf)*8, false);
+ bitmap_set_all(&schema_subscribers);
+ (void) pthread_mutex_lock(&schema_share->mutex);
+ for (i= 0; i < ndb_number_of_storage_nodes; i++)
+ {
+ MY_BITMAP *table_subscribers= &schema_share->subscriber_bitmap[i];
+ if (!bitmap_is_clear_all(table_subscribers))
+ bitmap_intersect(&schema_subscribers,
+ table_subscribers);
+ }
+ (void) pthread_mutex_unlock(&schema_share->mutex);
+ bitmap_clear_bit(&schema_subscribers, node_id);
+
+ if (share)
+ {
+ (void) pthread_mutex_lock(&share->mutex);
+ memcpy(share->slock, schema_subscribers.bitmap, sizeof(share->slock));
+ (void) pthread_mutex_unlock(&share->mutex);
+ }
+
+ DBUG_DUMP("schema_subscribers", (char*)schema_subscribers.bitmap,
+ no_bytes_in_map(&schema_subscribers));
+ DBUG_PRINT("info", ("bitmap_is_clear_all(&schema_subscribers): %d",
+ bitmap_is_clear_all(&schema_subscribers)));
+ }
+
+ Ndb *ndb= thd_ndb->ndb;
+ char old_db[128];
+ strcpy(old_db, ndb->getDatabaseName());
+
+ char tmp_buf[SCHEMA_QUERY_SIZE];
+ NDBDICT *dict= ndb->getDictionary();
+ ndb->setDatabaseName(NDB_REP_DB);
+ const NDBTAB *ndbtab= dict->getTable(NDB_SCHEMA_TABLE);
+ NdbTransaction *trans= 0;
+ int retries= 100;
+ const NDBCOL *col[SCHEMA_SIZE];
+ unsigned sz[SCHEMA_SIZE];
+
+ if (ndbtab == 0)
+ {
+ if (strcmp(NDB_REP_DB, db) != 0 ||
+ strcmp(NDB_SCHEMA_TABLE, table_name))
+ {
+ ndb_error= &dict->getNdbError();
+ goto end;
+ }
+ DBUG_RETURN(0);
+ }
+
+ {
+ uint i;
+ for (i= 0; i < SCHEMA_SIZE; i++)
+ {
+ col[i]= ndbtab->getColumn(i);
+ sz[i]= col[i]->getLength();
+ DBUG_ASSERT(sz[i] <= sizeof(tmp_buf));
+ }
+ }
+
+ while (1)
+ {
+ if ((trans= ndb->startTransaction()) == 0)
+ goto err;
+ {
+ NdbOperation *op= 0;
+ int r= 0;
+ r|= (op= trans->getNdbOperation(ndbtab)) == 0;
+ DBUG_ASSERT(r == 0);
+ r|= op->writeTuple();
+ DBUG_ASSERT(r == 0);
+
+ /* db */
+ ndb_pack_varchar(col[SCHEMA_DB_I], tmp_buf, db, strlen(db));
+ r|= op->equal(SCHEMA_DB_I, tmp_buf);
+ DBUG_ASSERT(r == 0);
+ /* name */
+ ndb_pack_varchar(col[SCHEMA_NAME_I], tmp_buf, table_name,
+ strlen(table_name));
+ r|= op->equal(SCHEMA_NAME_I, tmp_buf);
+ DBUG_ASSERT(r == 0);
+ /* slock */
+ DBUG_ASSERT(sz[SCHEMA_SLOCK_I] == sizeof(bitbuf));
+ r|= op->setValue(SCHEMA_SLOCK_I, (char*)schema_subscribers.bitmap);
+ DBUG_ASSERT(r == 0);
+ /* query */
+ ndb_pack_varchar(col[SCHEMA_QUERY_I], tmp_buf, query, query_length);
+ r|= op->setValue(SCHEMA_QUERY_I, tmp_buf);
+ DBUG_ASSERT(r == 0);
+ /* node_id */
+ r|= op->setValue(SCHEMA_NODE_ID_I, node_id);
+ DBUG_ASSERT(r == 0);
+ /* epoch */
+ r|= op->setValue(SCHEMA_EPOCH_I, epoch);
+ DBUG_ASSERT(r == 0);
+ /* id */
+ r|= op->setValue(SCHEMA_ID_I, ndb_table_id);
+ DBUG_ASSERT(r == 0);
+ /* version */
+ r|= op->setValue(SCHEMA_VERSION_I, ndb_table_version);
+ DBUG_ASSERT(r == 0);
+ /* type */
+ r|= op->setValue(SCHEMA_TYPE_I, (uint32)type);
+ DBUG_ASSERT(r == 0);
+ }
+ if (trans->execute(NdbTransaction::Commit) == 0)
+ {
+ dict->forceGCPWait();
+ DBUG_PRINT("info", ("logged: %s", query));
+ break;
+ }
+err:
+ if (trans->getNdbError().status == NdbError::TemporaryError)
+ {
+ if (retries--)
+ {
+ ndb->closeTransaction(trans);
+ continue; // retry
+ }
+ }
+ ndb_error= &trans->getNdbError();
+ break;
+ }
+end:
+ if (ndb_error)
+ push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
+ ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
+ ndb_error->code,
+ ndb_error->message,
+ "Could not log query '%s' on other mysqld's");
+
+ if (trans)
+ ndb->closeTransaction(trans);
+ ndb->setDatabaseName(old_db);
+
+ /*
+ Wait for other mysqld's to acknowledge the table operation
+ */
+ if (ndb_error == 0 &&
+ (type == SOT_CREATE_TABLE ||
+ type == SOT_RENAME_TABLE ||
+ type == SOT_ALTER_TABLE) &&
+ !bitmap_is_clear_all(&schema_subscribers))
+ {
+ int max_timeout= 10;
+ (void) pthread_mutex_lock(&share->mutex);
+ while (1)
+ {
+ struct timespec abstime;
+ int i;
+ set_timespec(abstime, 1);
+ (void) pthread_cond_timedwait(&injector_cond,
+ &share->mutex,
+ &abstime);
+
+ (void) pthread_mutex_lock(&schema_share->mutex);
+ for (i= 0; i < ndb_number_of_storage_nodes; i++)
+ {
+ /* remove any unsubscribed from schema_subscribers */
+ MY_BITMAP *tmp= &schema_share->subscriber_bitmap[i];
+ if (!bitmap_is_clear_all(tmp))
+ bitmap_intersect(&schema_subscribers, tmp);
+ }
+ (void) pthread_mutex_unlock(&schema_share->mutex);
+
+ /* remove any unsubscribed from share->slock */
+ bitmap_intersect(&share->slock_bitmap, &schema_subscribers);
+
+ DBUG_DUMP("share->slock_bitmap.bitmap", (char*)share->slock_bitmap.bitmap,
+ no_bytes_in_map(&share->slock_bitmap));
+
+ if (bitmap_is_clear_all(&share->slock_bitmap))
+ break;
+
+ max_timeout--;
+ if (max_timeout == 0)
+ {
+ sql_print_error("NDB create table: timed out. Ignoring...");
+ break;
+ }
+ sql_print_information("NDB create table: "
+ "waiting max %u sec for create table %s.",
+ max_timeout, share->key);
+ }
+ (void) pthread_mutex_unlock(&share->mutex);
+ }
+#endif
+ DBUG_RETURN(0);
+}
+
+/*
+  Acknowledge handling of a schema operation: clear this node's bit
+  in the slock column of the (db, table_name) row of the cluster
+  schema table, using a read-exclusive + update in one transaction.
+
+  Retries on temporary NDB errors; permanent errors are reported to
+  the client as a warning.  Always returns 0.
+*/
+static int
+ndbcluster_update_slock(THD *thd,
+                        const char *db,
+                        const char *table_name)
+{
+  DBUG_ENTER("ndbcluster_update_slock");
+  if (!schema_share)
+  {
+    /* schema distribution not set up, nothing to acknowledge */
+    DBUG_RETURN(0);
+  }
+
+  const NdbError *ndb_error= 0;
+  uint32 node_id= g_ndb_cluster_connection->node_id();
+  Ndb *ndb= check_ndb_in_thd(thd);
+  char old_db[128];
+  strcpy(old_db, ndb->getDatabaseName());
+
+  char tmp_buf[SCHEMA_QUERY_SIZE];
+  NDBDICT *dict= ndb->getDictionary();
+  ndb->setDatabaseName(NDB_REP_DB);
+  const NDBTAB *ndbtab= dict->getTable(NDB_SCHEMA_TABLE);
+  NdbTransaction *trans= 0;
+  int retries= 100;
+  const NDBCOL *col[SCHEMA_SIZE];
+  unsigned sz[SCHEMA_SIZE];
+
+  /* local bitmap the current slock value is read into */
+  MY_BITMAP slock;
+  uint32 bitbuf[SCHEMA_SLOCK_SIZE/4];
+  bitmap_init(&slock, bitbuf, sizeof(bitbuf)*8, false);
+
+  if (ndbtab == 0)
+  {
+    /* the schema table must exist if schema_share is set up */
+    abort();
+    DBUG_RETURN(0);
+  }
+
+  {
+    uint i;
+    for (i= 0; i < SCHEMA_SIZE; i++)
+    {
+      col[i]= ndbtab->getColumn(i);
+      sz[i]= col[i]->getLength();
+      DBUG_ASSERT(sz[i] <= sizeof(tmp_buf));
+    }
+  }
+
+  while (1)
+  {
+    if ((trans= ndb->startTransaction()) == 0)
+      goto err;
+    {
+      NdbOperation *op= 0;
+      int r= 0;
+
+      /* read the bitmap exclusive */
+      r|= (op= trans->getNdbOperation(ndbtab)) == 0;
+      DBUG_ASSERT(r == 0);
+      r|= op->readTupleExclusive();
+      DBUG_ASSERT(r == 0);
+
+      /* db */
+      ndb_pack_varchar(col[SCHEMA_DB_I], tmp_buf, db, strlen(db));
+      r|= op->equal(SCHEMA_DB_I, tmp_buf);
+      DBUG_ASSERT(r == 0);
+      /* name */
+      ndb_pack_varchar(col[SCHEMA_NAME_I], tmp_buf, table_name,
+                       strlen(table_name));
+      r|= op->equal(SCHEMA_NAME_I, tmp_buf);
+      DBUG_ASSERT(r == 0);
+      /* slock */
+      r|= op->getValue(SCHEMA_SLOCK_I, (char*)slock.bitmap) == 0;
+      DBUG_ASSERT(r == 0);
+    }
+    if (trans->execute(NdbTransaction::NoCommit))
+      goto err;
+    /* we have acted on the schema op; clear our bit */
+    bitmap_clear_bit(&slock, node_id);
+    {
+      NdbOperation *op= 0;
+      int r= 0;
+
+      /* now update the tuple */
+      r|= (op= trans->getNdbOperation(ndbtab)) == 0;
+      DBUG_ASSERT(r == 0);
+      r|= op->updateTuple();
+      DBUG_ASSERT(r == 0);
+
+      /* db */
+      ndb_pack_varchar(col[SCHEMA_DB_I], tmp_buf, db, strlen(db));
+      r|= op->equal(SCHEMA_DB_I, tmp_buf);
+      DBUG_ASSERT(r == 0);
+      /* name */
+      ndb_pack_varchar(col[SCHEMA_NAME_I], tmp_buf, table_name,
+                       strlen(table_name));
+      r|= op->equal(SCHEMA_NAME_I, tmp_buf);
+      DBUG_ASSERT(r == 0);
+      /* slock */
+      r|= op->setValue(SCHEMA_SLOCK_I, (char*)slock.bitmap);
+      DBUG_ASSERT(r == 0);
+      /* node_id */
+      r|= op->setValue(SCHEMA_NODE_ID_I, node_id);
+      DBUG_ASSERT(r == 0);
+      /* type */
+      r|= op->setValue(SCHEMA_TYPE_I, (uint32)SOT_CLEAR_SLOCK);
+      DBUG_ASSERT(r == 0);
+    }
+    if (trans->execute(NdbTransaction::Commit) == 0)
+    {
+      /* flush the ack to the subscribers as soon as possible */
+      dict->forceGCPWait();
+      DBUG_PRINT("info", ("node %d cleared lock on '%s.%s'",
+                          node_id, db, table_name));
+      break;
+    }
+  err:
+    /*
+      BUGFIX: trans is NULL when startTransaction() itself failed;
+      take the error from the Ndb object in that case instead of
+      dereferencing a NULL transaction pointer.
+    */
+    {
+      const NdbError *this_error= trans ?
+        &trans->getNdbError() : &ndb->getNdbError();
+      if (this_error->status == NdbError::TemporaryError)
+      {
+        if (retries--)
+        {
+          if (trans)
+            ndb->closeTransaction(trans);
+          trans= 0;
+          continue; // retry
+        }
+      }
+      ndb_error= this_error;
+      break;
+    }
+  }
+  /* (removed unused "end:" label -- nothing jumps to it) */
+  if (ndb_error)
+    push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
+                        ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
+                        ndb_error->code,
+                        ndb_error->message,
+                        "Could not release lock on '%s.%s'",
+                        db, table_name);
+  if (trans)
+    ndb->closeTransaction(trans);
+  ndb->setDatabaseName(old_db);
+  DBUG_RETURN(0);
+}
+
+/*
+  Handle _non_ data events from the storage nodes
+
+  Called when a table's event operation ends (drop, cluster failure,
+  alter): invalidates the dictionary cache if the change came from a
+  remote node, detaches the event operation from the share, releases
+  the share reference(s), signals waiting client threads and finally
+  drops the NdbEventOperation itself.  Always returns 0.
+*/
+static int
+ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
+                         NDB_SHARE *share)
+{
+  int remote_drop_table= 0, do_close_cached_tables= 0;
+
+  /* change originated on another mysqld: our cached dictionary
+     information for this table is stale */
+  if (pOp->getEventType() != NDBEVENT::TE_CLUSTER_FAILURE &&
+      pOp->getReqNodeId() != g_ndb_cluster_connection->node_id())
+  {
+    ndb->setDatabaseName(share->table->s->db.str);
+    ha_ndbcluster::invalidate_dictionary_cache(share->table,
+                                               ndb,
+                                               share->table->s->table_name.str,
+                                               TRUE);
+    remote_drop_table= 1;
+  }
+
+  (void) pthread_mutex_lock(&share->mutex);
+  DBUG_ASSERT(share->op == pOp || share->op_old == pOp);
+  if (share->op_old == pOp)
+    share->op_old= 0;
+  else
+    share->op= 0;
+  // either just us or drop table handling as well
+
+  /* Signal ha_ndbcluster::delete/rename_table that drop is done */
+  (void) pthread_mutex_unlock(&share->mutex);
+  (void) pthread_cond_signal(&injector_cond);
+
+  pthread_mutex_lock(&ndbcluster_mutex);
+  /* release the reference held by the event operation */
+  free_share(&share, TRUE);
+  /* NOTE(review): share is used again after free_share(); this relies
+     on free_share(have_lock=TRUE) leaving the pointer valid while
+     other references remain -- confirm against free_share() */
+  if (remote_drop_table && share && share->state != NSS_DROPPED)
+  {
+    DBUG_PRINT("info", ("remote drop table"));
+    if (share->use_count != 1)
+      do_close_cached_tables= 1;
+    share->state= NSS_DROPPED;
+    free_share(&share, TRUE);
+  }
+  pthread_mutex_unlock(&ndbcluster_mutex);
+
+  share= 0;
+  pOp->setCustomData(0);
+
+  /* dropping the event operation must be serialized with the
+     injector thread's use of injector_ndb */
+  pthread_mutex_lock(&injector_mutex);
+  injector_ndb->dropEventOperation(pOp);
+  pOp= 0;
+  pthread_mutex_unlock(&injector_mutex);
+
+  /* flush other users' cached TABLE objects for the dropped table */
+  if (do_close_cached_tables)
+    close_cached_tables((THD*) 0, 0, (TABLE_LIST*) 0);
+  return 0;
+}
+
+/*
+  Dispatch one event received on the schema distribution table.
+
+  INSERT/UPDATE rows describe a DDL statement executed on another
+  mysqld: depending on its type the statement is applied locally
+  (discover table / run query), queued on schema_list for later
+  binlogging, and/or acknowledged by clearing our slock bit.
+  The remaining event types maintain the subscriber bitmaps and
+  handle teardown of the schema distribution itself.
+  Always returns 0.
+*/
+static int
+ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
+                                      NdbEventOperation *pOp,
+                                      List<Cluster_replication_schema>
+                                      *schema_list, MEM_ROOT *mem_root)
+{
+  DBUG_ENTER("ndb_binlog_thread_handle_schema_event");
+  NDB_SHARE *share= (NDB_SHARE *)pOp->getCustomData();
+  /* only act on events for the schema distribution table itself */
+  if (share && schema_share == share)
+  {
+    NDBEVENT::TableEvent ev_type= pOp->getEventType();
+    DBUG_PRINT("enter", ("%s.%s  ev_type: %d",
+                         share->db, share->table_name, ev_type));
+    switch (ev_type)
+    {
+    case NDBEVENT::TE_UPDATE:
+      /* fall through */
+    case NDBEVENT::TE_INSERT:
+    {
+      /* NOTE(review): sql_alloc() result is not checked for NULL */
+      Cluster_replication_schema *schema= (Cluster_replication_schema *)
+        sql_alloc(sizeof(Cluster_replication_schema));
+      MY_BITMAP slock;
+      bitmap_init(&slock, schema->slock, 8*SCHEMA_SLOCK_SIZE, false);
+      uint node_id= g_ndb_cluster_connection->node_id();
+      /* unpack the row image into schema (fills slock too) */
+      ndbcluster_get_schema(share->table, schema);
+      /* ignore schema ops that originated on this node */
+      if (schema->node_id != node_id)
+      {
+        int log_query= 0;
+        DBUG_PRINT("info", ("log query_length: %d  query: '%s'",
+                            schema->query_length, schema->query));
+        switch ((enum SCHEMA_OP_TYPE)schema->type)
+        {
+        case SOT_DROP_TABLE:
+          /* binlog dropping table after any table operations */
+          schema_list->push_back(schema, mem_root);
+          log_query= 0;
+          break;
+        case SOT_CREATE_TABLE:
+          /* fall through */
+        case SOT_RENAME_TABLE:
+          /* fall through */
+        case SOT_ALTER_TABLE:
+          /* discover the new/changed table definition from the engine */
+          pthread_mutex_lock(&LOCK_open);
+          if (ha_create_table_from_engine(thd, schema->db, schema->name))
+          {
+            sql_print_error("Could not discover table '%s.%s' from "
+                            "binlog schema event '%s' from node %d",
+                            schema->db, schema->name, schema->query,
+                            schema->node_id);
+          }
+          pthread_mutex_unlock(&LOCK_open);
+          {
+            /* signal that schema operation has been handled */
+            DBUG_DUMP("slock", (char*)schema->slock, schema->slock_length);
+            if (bitmap_is_set(&slock, node_id))
+              ndbcluster_update_slock(thd, schema->db, schema->name);
+          }
+          log_query= 1;
+          break;
+        case SOT_DROP_DB:
+          run_query(thd, schema->query,
+                    schema->query + schema->query_length,
+                    TRUE,    /* print error */
+                    TRUE);   /* don't binlog the query */
+          /* binlog dropping database after any table operations */
+          schema_list->push_back(schema, mem_root);
+          log_query= 0;
+          break;
+        case SOT_CREATE_DB:
+          /* fall through */
+        case SOT_ALTER_DB:
+          run_query(thd, schema->query,
+                    schema->query + schema->query_length,
+                    TRUE,    /* print error */
+                    FALSE);  /* binlog the query */
+          log_query= 0;
+          break;
+        case SOT_CLEAR_SLOCK:
+        {
+          /* another node acknowledged: propagate the new slock value
+             to the waiting client thread via the table's share */
+          char key[FN_REFLEN];
+          (void)strxnmov(key, FN_REFLEN, share_prefix, schema->db,
+                         "/", schema->name, NullS);
+          /* note: shadows the outer (schema) share on purpose */
+          NDB_SHARE *share= get_share(key, 0, false, false);
+          if (share)
+          {
+            pthread_mutex_lock(&share->mutex);
+            memcpy(share->slock, schema->slock, sizeof(share->slock));
+            DBUG_DUMP("share->slock_bitmap.bitmap",
+                      (char*)share->slock_bitmap.bitmap,
+                      no_bytes_in_map(&share->slock_bitmap));
+            pthread_mutex_unlock(&share->mutex);
+            pthread_cond_signal(&injector_cond);
+            free_share(&share);
+          }
+          DBUG_RETURN(0);
+        }
+        }
+        if (log_query)
+        {
+          /* binlog the DDL with the originating database as default */
+          char *thd_db_save= thd->db;
+          thd->db= schema->db;
+          thd->binlog_query(THD::STMT_QUERY_TYPE, schema->query,
+                            schema->query_length, FALSE,
+                            schema->name[0] == 0);
+          thd->db= thd_db_save;
+        }
+      }
+    }
+    break;
+    case NDBEVENT::TE_DELETE:
+      // skip
+      break;
+    case NDBEVENT::TE_ALTER:
+      /* do the rename of the table in the share */
+      share->table->s->db.str= share->db;
+      share->table->s->db.length= strlen(share->db);
+      share->table->s->table_name.str= share->table_name;
+      share->table->s->table_name.length= strlen(share->table_name);
+      ndb_handle_schema_change(thd, ndb, pOp, share);
+      break;
+    case NDBEVENT::TE_CLUSTER_FAILURE:
+    case NDBEVENT::TE_DROP:
+      /* schema distribution is gone; drop our extra reference first */
+      free_share(&schema_share);
+      schema_share= 0;
+      ndb_handle_schema_change(thd, ndb, pOp, share);
+      break;
+    case NDBEVENT::TE_NODE_FAILURE:
+    {
+      /* a data node died: all its subscribers are implicitly gone */
+      uint8 node_id= g_node_id_map[pOp->getNdbdNodeId()];
+      DBUG_ASSERT(node_id != 0xFF);
+      (void) pthread_mutex_lock(&share->mutex);
+      bitmap_clear_all(&share->subscriber_bitmap[node_id]);
+      DBUG_PRINT("info",("NODE_FAILURE UNSUBSCRIBE[%d]", node_id));
+      (void) pthread_mutex_unlock(&share->mutex);
+      (void) pthread_cond_signal(&injector_cond);
+      break;
+    }
+    case NDBEVENT::TE_SUBSCRIBE:
+    {
+      /* a mysqld (req_id) subscribed via data node node_id */
+      uint8 node_id= g_node_id_map[pOp->getNdbdNodeId()];
+      uint8 req_id= pOp->getReqNodeId();
+      DBUG_ASSERT(req_id != 0 && node_id != 0xFF);
+      (void) pthread_mutex_lock(&share->mutex);
+      bitmap_set_bit(&share->subscriber_bitmap[node_id], req_id);
+      DBUG_PRINT("info",("SUBSCRIBE[%d] %d", node_id, req_id));
+      (void) pthread_mutex_unlock(&share->mutex);
+      (void) pthread_cond_signal(&injector_cond);
+      break;
+    }
+    case NDBEVENT::TE_UNSUBSCRIBE:
+    {
+      /* a mysqld (req_id) unsubscribed via data node node_id */
+      uint8 node_id= g_node_id_map[pOp->getNdbdNodeId()];
+      uint8 req_id= pOp->getReqNodeId();
+      DBUG_ASSERT(req_id != 0 && node_id != 0xFF);
+      (void) pthread_mutex_lock(&share->mutex);
+      bitmap_clear_bit(&share->subscriber_bitmap[node_id], req_id);
+      DBUG_PRINT("info",("UNSUBSCRIBE[%d] %d", node_id, req_id));
+      (void) pthread_mutex_unlock(&share->mutex);
+      (void) pthread_cond_signal(&injector_cond);
+      break;
+    }
+    default:
+      sql_print_error("NDB Binlog: unknown non data event %d for %s. "
+                      "Ignoring...", (unsigned) ev_type, share->key);
+    }
+  }
+  DBUG_RETURN(0);
+}
+
+/*
+ Timer class for doing performance measurements
+*/
+
+/*********************************************************************
+  Internal helper functions for handling of the cluster replication tables
+  - cluster_replication.binlog_index
+  - cluster_replication.apply_status
+*********************************************************************/
+
+/*
+  struct to hold the data to be inserted into the
+  cluster_replication.binlog_index table
+*/
+struct Binlog_index_row {
+  ulonglong gci;                 /* global checkpoint id of the epoch */
+  const char *master_log_file;   /* binlog file the epoch was written to */
+  ulonglong master_log_pos;      /* position within that binlog file */
+  ulonglong n_inserts;           /* row counts for the epoch ... */
+  ulonglong n_updates;
+  ulonglong n_deletes;
+  ulonglong n_schemaops;         /* ... and number of schema operations */
+};
+
+/*
+  Open the cluster_replication.binlog_index table
+
+  Fills in *tables as a single-table TABLE_LIST for writing and opens
+  it with open_tables() (locking is done separately by the caller).
+  On success *binlog_index points to the opened TABLE and 0 is
+  returned; on failure the error is logged and -1 returned.
+  thd->proc_info is restored in both cases.
+*/
+static int open_binlog_index(THD *thd, TABLE_LIST *tables,
+                             TABLE **binlog_index)
+{
+  static char repdb[]= NDB_REP_DB;
+  static char reptable[]= NDB_REP_TABLE;
+  const char *save_proc_info= thd->proc_info;
+
+  bzero((char*) tables, sizeof(*tables));
+  tables->db= repdb;
+  tables->alias= tables->table_name= reptable;
+  tables->lock_type= TL_WRITE;
+  thd->proc_info= "Opening " NDB_REP_DB "." NDB_REP_TABLE;
+  tables->required_type= FRMTYPE_TABLE;
+  uint counter;
+  thd->clear_error();
+  /* MYSQL_LOCK_IGNORE_FLUSH: do not let FLUSH TABLES block the
+     injector thread */
+  if (open_tables(thd, &tables, &counter, MYSQL_LOCK_IGNORE_FLUSH))
+  {
+    sql_print_error("NDB Binlog: Opening binlog_index: %d, '%s'",
+                    thd->net.last_errno,
+                    thd->net.last_error ? thd->net.last_error : "");
+    thd->proc_info= save_proc_info;
+    return -1;
+  }
+  *binlog_index= tables->table;
+  thd->proc_info= save_proc_info;
+  return 0;
+}
+
+/*
+  Insert one row in the cluster_replication.binlog_index
+
+  declared friend in handler.h to be able to call write_row directly
+  so that this insert is not replicated
+
+  Uses the file-scope binlog_index / binlog_tables cached handles;
+  reopens them when lock_tables() asks for a reopen.  Returns 0 on
+  success, -1 on error (in which case the cached handles are reset).
+*/
+int ndb_add_binlog_index(THD *thd, void *_row)
+{
+  /* _row is void* to keep Binlog_index_row out of handler.h */
+  Binlog_index_row &row= *(Binlog_index_row *) _row;
+  int error= 0;
+  bool need_reopen;
+  for ( ; ; ) /* loop for need_reopen */
+  {
+    /* open lazily on first use (or after a forced reopen) */
+    if (!binlog_index && open_binlog_index(thd, &binlog_tables, &binlog_index))
+    {
+      error= -1;
+      goto add_binlog_index_err;
+    }
+
+    if (lock_tables(thd, &binlog_tables, 1, &need_reopen))
+    {
+      if (need_reopen)
+      {
+        /* table was flushed/altered underneath us; reopen and retry */
+        close_tables_for_reopen(thd, &binlog_tables);
+        binlog_index= 0;
+        continue;
+      }
+      sql_print_error("NDB Binlog: Unable to lock table binlog_index");
+      error= -1;
+      goto add_binlog_index_err;
+    }
+    break;
+  }
+
+  /* field order must match the binlog_index table definition */
+  binlog_index->field[0]->store(row.master_log_pos);
+  binlog_index->field[1]->store(row.master_log_file,
+                                strlen(row.master_log_file),
+                                &my_charset_bin);
+  binlog_index->field[2]->store(row.gci);
+  binlog_index->field[3]->store(row.n_inserts);
+  binlog_index->field[4]->store(row.n_updates);
+  binlog_index->field[5]->store(row.n_deletes);
+  binlog_index->field[6]->store(row.n_schemaops);
+
+  int r;
+  /* direct write_row call: bypasses binlogging of this insert */
+  if ((r= binlog_index->file->write_row(binlog_index->record[0])))
+  {
+    sql_print_error("NDB Binlog: Writing row to binlog_index: %d", r);
+    error= -1;
+    goto add_binlog_index_err;
+  }
+
+  mysql_unlock_tables(thd, thd->lock);
+  thd->lock= 0;
+  return 0;
+add_binlog_index_err:
+  close_thread_tables(thd);
+  binlog_index= 0;
+  return error;
+}
+
+/*********************************************************************
+ Functions for start, stop, wait for ndbcluster binlog thread
+*********************************************************************/
+
+/* set by ndbcluster_binlog_close_connection() to ask the binlog
+   (injector) thread to shut down; polled by that thread */
+static int do_ndbcluster_binlog_close_connection= 0;
+
+/*
+  Start the ndb binlog (injector) thread and wait until it has
+  finished its startup handshake.
+
+  Returns 0 on success, -1 if the thread could not be created or
+  signalled a startup failure (ndb_binlog_thread_running < 0).
+  Note: initialization order matters -- injector_mutex/cond must
+  exist before the thread is created, and the handshake flag is
+  only read under injector_mutex.
+*/
+int ndbcluster_binlog_start()
+{
+  DBUG_ENTER("ndbcluster_binlog_start");
+
+  pthread_mutex_init(&injector_mutex, MY_MUTEX_INIT_FAST);
+  pthread_cond_init(&injector_cond, NULL);
+
+  /* Create injector thread */
+  if (pthread_create(&ndb_binlog_thread, &connection_attrib,
+                     ndb_binlog_thread_func, 0))
+  {
+    DBUG_PRINT("error", ("Could not create ndb injector thread"));
+    /* undo the sync-object initialization done above */
+    pthread_cond_destroy(&injector_cond);
+    pthread_mutex_destroy(&injector_mutex);
+    DBUG_RETURN(-1);
+  }
+
+  /*
+    Wait for the ndb injector thread to finish starting up.
+  */
+  pthread_mutex_lock(&injector_mutex);
+  while (!ndb_binlog_thread_running)
+    pthread_cond_wait(&injector_cond, &injector_mutex);
+  pthread_mutex_unlock(&injector_mutex);
+
+  /* the thread reports a failed startup by setting the flag negative */
+  if (ndb_binlog_thread_running < 0)
+    DBUG_RETURN(-1);
+
+  DBUG_RETURN(0);
+}
+
+/*
+  Ask the binlog (injector) thread to shut down and block until it
+  has exited (polled once per second via ndb_binlog_thread_running).
+*/
+static void ndbcluster_binlog_close_connection(THD *thd)
+{
+  DBUG_ENTER("ndbcluster_binlog_close_connection");
+  const char *save_info= thd->proc_info;
+  thd->proc_info= "ndbcluster_binlog_close_connection";
+  /* flag is polled by the injector thread */
+  do_ndbcluster_binlog_close_connection= 1;
+  while (ndb_binlog_thread_running > 0)
+    sleep(1);
+  thd->proc_info= save_info;
+  DBUG_VOID_RETURN;
+}
+
+/**************************************************************
+ Internal helper functions for creating/dropping ndb events
+ used by the client sql threads
+**************************************************************/
+/*
+  Compose the name of the NDB replication event for a table:
+  "REPL$<db>/<tbl>", or just "REPL$<db>" when tbl is NULL.
+*/
+void
+ndb_rep_event_name(String *event_name,const char *db, const char *tbl)
+{
+  /* every replication event carries the fixed "REPL$" prefix */
+  event_name->set_ascii("REPL$", 5);
+  event_name->append(db);
+  if (tbl == 0)
+    return;
+  event_name->append('/');
+  event_name->append(tbl);
+}
+
+/*
+  Common function for setting up everything for logging a table at
+  create/discover.
+
+  Ensures an NDB_SHARE exists for the table, makes sure the NDB
+  replication event exists (creating it if needed) and sets up the
+  event operations that feed the injector thread.
+  Returns 0 on success (or when logging is intentionally skipped,
+  e.g. temporary tables), -1 on failure.
+*/
+int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key,
+                                   const char *db,
+                                   const char *table_name,
+                                   NDB_SHARE *share)
+{
+  DBUG_ENTER("ndbcluster_create_binlog_setup");
+
+  pthread_mutex_lock(&ndbcluster_mutex);
+
+  /* Handle any trailing share */
+  if (share == 0)
+  {
+    share= (NDB_SHARE*) hash_search(&ndbcluster_open_tables,
+                                    (byte*) key, strlen(key));
+    if (share)
+      handle_trailing_share(share);
+  }
+  else
+    handle_trailing_share(share);
+
+  /* Create share which is needed to hold replication information */
+  if (!(share= get_share(key, 0, true, true)))
+  {
+    sql_print_error("NDB Binlog: "
+                    "allocating table share for %s failed", key);
+  }
+  pthread_mutex_unlock(&ndbcluster_mutex);
+
+  /* note: this "while" is a breakable block, not a loop -- every
+     path through it ends in break (error) or DBUG_RETURN */
+  while (share && !IS_TMP_PREFIX(table_name))
+  {
+    /*
+      ToDo make sanity check of share so that the table is actually the same
+      I.e. we need to do open file from frm in this case
+      Currently awaiting this to be fixed in the 4.1 tree in the general
+      case
+    */
+
+    /* Create the event in NDB */
+    ndb->setDatabaseName(db);
+
+    NDBDICT *dict= ndb->getDictionary();
+    const NDBTAB *ndbtab= dict->getTable(table_name);
+    if (ndbtab == 0)
+    {
+      if (ndb_extra_logging)
+        sql_print_information("NDB Binlog: Failed to get table %s from ndb: "
+                              "%s, %d", key, dict->getNdbError().message,
+                              dict->getNdbError().code);
+      break; // error
+    }
+    String event_name(INJECTOR_EVENT_LEN);
+    ndb_rep_event_name(&event_name, db, table_name);
+    /*
+      event should have been created by someone else,
+      but let's make sure, and create if it doesn't exist
+    */
+    if (!dict->getEvent(event_name.c_ptr()))
+    {
+      if (ndbcluster_create_event(ndb, ndbtab, event_name.c_ptr(), share))
+      {
+        sql_print_error("NDB Binlog: "
+                        "FAILED CREATE (DISCOVER) TABLE Event: %s",
+                        event_name.c_ptr());
+        break; // error
+      }
+      if (ndb_extra_logging)
+        sql_print_information("NDB Binlog: "
+                              "CREATE (DISCOVER) TABLE Event: %s",
+                              event_name.c_ptr());
+    }
+    else
+      if (ndb_extra_logging)
+        sql_print_information("NDB Binlog: DISCOVER TABLE Event: %s",
+                              event_name.c_ptr());
+
+    /*
+      create the event operations for receiving logging events
+    */
+    if (ndbcluster_create_event_ops(share, ndbtab,
+                                    event_name.c_ptr()) < 0)
+    {
+      sql_print_error("NDB Binlog:"
+                      "FAILED CREATE (DISCOVER) EVENT OPERATIONS Event: %s",
+                      event_name.c_ptr());
+      /* a warning has been issued to the client */
+      DBUG_RETURN(0);
+    }
+    DBUG_RETURN(0);
+  }
+  DBUG_RETURN(-1);
+}
+
+/*
+  Create the NDB event used to receive log data for a table.
+
+  The subscription mode depends on the table: all attributes for
+  hidden-pk tables, updated-only for normal tables, and all +
+  subscribe reports for the schema distribution table itself.
+  If an event with the same name already exists (leftover from a
+  previous incarnation) it is dropped and creation is retried once.
+  Returns 0 on success, -1 on error.
+*/
+int
+ndbcluster_create_event(Ndb *ndb, const NDBTAB *ndbtab,
+                        const char *event_name, NDB_SHARE *share)
+{
+  DBUG_ENTER("ndbcluster_create_event");
+  /* no share means the table is not to be binlogged */
+  if (!share)
+    DBUG_RETURN(0);
+  NDBDICT *dict= ndb->getDictionary();
+  NDBEVENT my_event(event_name);
+  my_event.setTable(*ndbtab);
+  my_event.addTableEvent(NDBEVENT::TE_ALL);
+  if (share->flags & NSF_HIDDEN_PK)
+  {
+    /* No primary key, subscribe for all attributes */
+    my_event.setReport(NDBEVENT::ER_ALL);
+    DBUG_PRINT("info", ("subscription all"));
+  }
+  else
+  {
+    /* only the schema distribution table (and only while schema_share
+       is not yet set up) gets the ALL|SUBSCRIBE reporting mode */
+    if (schema_share || strcmp(share->db, NDB_REP_DB) ||
+        strcmp(share->table_name, NDB_SCHEMA_TABLE))
+    {
+      my_event.setReport(NDBEVENT::ER_UPDATED);
+      DBUG_PRINT("info", ("subscription only updated"));
+    }
+    else
+    {
+      my_event.setReport((NDBEVENT::EventReport)
+                         (NDBEVENT::ER_ALL | NDBEVENT::ER_SUBSCRIBE));
+      DBUG_PRINT("info", ("subscription all and subscribe"));
+    }
+  }
+
+  /* add all columns to the event */
+  int n_cols= ndbtab->getNoOfColumns();
+  for(int a= 0; a < n_cols; a++)
+    my_event.addEventColumn(a);
+
+  if (dict->createEvent(my_event)) // Add event to database
+  {
+#ifdef NDB_BINLOG_EXTRA_WARNINGS
+    /*
+      failed, print a warning
+    */
+    push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
+                        ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
+                        dict->getNdbError().code,
+                        dict->getNdbError().message, "NDB");
+#endif
+    /* anything other than "already exists" is a hard failure */
+    if (dict->getNdbError().classification != NdbError::SchemaObjectExists)
+    {
+      sql_print_error("NDB Binlog: Unable to create event in database. "
+                      "Event: %s  Error Code: %d  Message: %s", event_name,
+                      dict->getNdbError().code, dict->getNdbError().message);
+      DBUG_RETURN(-1);
+    }
+
+    /*
+      trailing event from before; an error, but try to correct it
+    */
+    if (dict->dropEvent(my_event.getName()))
+    {
+      sql_print_error("NDB Binlog: Unable to create event in database. "
+                      " Attempt to correct with drop failed. "
+                      "Event: %s  Error Code: %d  Message: %s",
+                      event_name,
+                      dict->getNdbError().code,
+                      dict->getNdbError().message);
+      DBUG_RETURN(-1);
+    }
+
+    /*
+      try to add the event again
+    */
+    if (dict->createEvent(my_event))
+    {
+      sql_print_error("NDB Binlog: Unable to create event in database. "
+                      " Attempt to correct with drop ok, but create failed. "
+                      "Event: %s  Error Code: %d  Message: %s",
+                      event_name,
+                      dict->getNdbError().code,
+                      dict->getNdbError().message);
+      DBUG_RETURN(-1);
+    }
+#ifdef NDB_BINLOG_EXTRA_WARNINGS
+    push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
+                        ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
+                        0, "NDB Binlog: Removed trailing event",
+                        "NDB");
+#endif
+  }
+
+  DBUG_RETURN(0);
+}
+
+/*
+  True (1) when the field's in-memory format can be filled in
+  directly by the NDB API: not a blob, not a bit field, and with a
+  non-zero pack length.
+*/
+inline int is_ndb_compatible_type(Field *field)
+{
+  if (field->flags & BLOB_FLAG)
+    return 0;
+  if (field->type() == MYSQL_TYPE_BIT)
+    return 0;
+  return field->pack_length() != 0;
+}
+
+/*
+  - create eventOperations for receiving log events
+  - setup ndb recattrs for reception of log event data
+  - "start" the event operation
+
+  used at create/discover of tables
+
+  On success the share holds the event operation (share->op) plus an
+  extra reference; the special apply_status/schema shares take one
+  more reference each.  Retries temporary NDB errors up to 100 times.
+  Returns 0 on success, -1 on failure.
+*/
+int
+ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab,
+                            const char *event_name)
+{
+  /*
+    we are in either create table or rename table so table should be
+    locked, hence we can work with the share without locks
+  */
+
+  DBUG_ENTER("ndbcluster_create_event_ops");
+
+  DBUG_ASSERT(share != 0);
+
+  if (share->op)
+  {
+    /* an old event operation is still attached (e.g. rediscovery);
+       reuse it instead of creating a second one */
+    assert(share->op->getCustomData() == (void *) share);
+
+    DBUG_ASSERT(share->use_count > 1);
+    sql_print_error("NDB Binlog: discover reusing old ev op");
+    free_share(&share); // old event op already has reference
+    DBUG_RETURN(0);
+  }
+
+  TABLE *table= share->table;
+  if (table)
+  {
+    /*
+      Logging of blob tables is not yet implemented, it would require:
+      1. setup of events also on the blob attribute tables
+      2. collect the pieces of the blob into one from an epoch to
+         provide a full blob to binlog
+    */
+    if (table->s->blob_fields)
+    {
+      sql_print_error("NDB Binlog: logging of blob table %s "
+                      "is not supported", share->key);
+      DBUG_RETURN(0);
+    }
+  }
+
+  /* detect whether this share is one of the two special tables */
+  int do_schema_share= 0, do_apply_status_share= 0;
+  int retries= 100;
+  if (!schema_share && strcmp(share->db, NDB_REP_DB) == 0 &&
+      strcmp(share->table_name, NDB_SCHEMA_TABLE) == 0)
+    do_schema_share= 1;
+  else if (!apply_status_share && strcmp(share->db, NDB_REP_DB) == 0 &&
+           strcmp(share->table_name, NDB_APPLY_TABLE) == 0)
+    do_apply_status_share= 1;
+
+  while (1)
+  {
+    /* injector_mutex guards injector_ndb/schema_ndb and the event
+       operation setup against the injector thread */
+    pthread_mutex_lock(&injector_mutex);
+    Ndb *ndb= injector_ndb;
+    if (do_schema_share)
+      ndb= schema_ndb;
+
+    if (ndb == 0)
+    {
+      pthread_mutex_unlock(&injector_mutex);
+      DBUG_RETURN(-1);
+    }
+
+    NdbEventOperation *op= ndb->createEventOperation(event_name);
+    if (!op)
+    {
+      pthread_mutex_unlock(&injector_mutex);
+      sql_print_error("NDB Binlog: Creating NdbEventOperation failed for"
+                      " %s",event_name);
+      push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
+                          ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
+                          ndb->getNdbError().code,
+                          ndb->getNdbError().message,
+                          "NDB");
+      DBUG_RETURN(-1);
+    }
+
+    int n_columns= ndbtab->getNoOfColumns();
+    int n_fields= table ? table->s->fields : 0;
+    for (int j= 0; j < n_columns; j++)
+    {
+      const char *col_name= ndbtab->getColumn(j)->getName();
+      NdbRecAttr *attr0, *attr1;
+      if (j < n_fields)
+      {
+        Field *f= share->table->field[j];
+        if (is_ndb_compatible_type(f))
+        {
+          /* receive data directly into the TABLE's record buffers:
+             after-image into record[0], before-image into record[1] */
+          DBUG_PRINT("info", ("%s compatible", col_name));
+          attr0= op->getValue(col_name, f->ptr);
+          attr1= op->getPreValue(col_name, (f->ptr-share->table->record[0]) +
+                                 share->table->record[1]);
+        }
+        else
+        {
+          /* let the NDB API allocate the receive buffers */
+          DBUG_PRINT("info", ("%s non compatible", col_name));
+          attr0= op->getValue(col_name);
+          attr1= op->getPreValue(col_name);
+        }
+      }
+      else
+      {
+        /* extra NDB column beyond the mysql fields: the hidden key */
+        DBUG_PRINT("info", ("%s hidden key", col_name));
+        attr0= op->getValue(col_name);
+        attr1= op->getPreValue(col_name);
+      }
+      share->ndb_value[0][j].rec= attr0;
+      share->ndb_value[1][j].rec= attr1;
+    }
+    op->setCustomData((void *) share); // set before execute
+    share->op= op; // assign op in NDB_SHARE
+    if (op->execute())
+    {
+      share->op= NULL;
+      retries--;
+      /* 1407 is also retried; other non-temporary errors are final */
+      if (op->getNdbError().status != NdbError::TemporaryError &&
+          op->getNdbError().code != 1407)
+        retries= 0;
+      if (retries == 0)
+      {
+        push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
+                            ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
+                            op->getNdbError().code, op->getNdbError().message,
+                            "NDB");
+        sql_print_error("NDB Binlog: ndbevent->execute failed for %s; %d %s",
+                        event_name,
+                        op->getNdbError().code, op->getNdbError().message);
+      }
+      ndb->dropEventOperation(op);
+      pthread_mutex_unlock(&injector_mutex);
+      if (retries)
+        continue;
+      DBUG_RETURN(-1);
+    }
+    pthread_mutex_unlock(&injector_mutex);
+    break;
+  }
+
+  /* the event operation now owns one reference to the share */
+  get_share(share);
+  if (do_apply_status_share)
+    apply_status_share= get_share(share);
+  else if (do_schema_share)
+    schema_share= get_share(share);
+
+  DBUG_PRINT("info",("%s share->op: 0x%lx, share->use_count: %u",
+                     share->key, share->op, share->use_count));
+
+  if (ndb_extra_logging)
+    sql_print_information("NDB Binlog: logging %s", share->key);
+  DBUG_RETURN(0);
+}
+
+/*
+  when entering the calling thread should have a share lock id share != 0
+  then the injector thread will have one as well, i.e. share->use_count == 0
+  (unless it has already dropped... then share->op == 0)
+
+  Drops the NDB event for a dropped/renamed table and (with SYNC_DROP_)
+  waits until the injector thread has processed the resulting event
+  and detached share->op, so the drop lands after any pending row
+  events in the binlog.  Returns 0 (or -1 on an unexpected drop-event
+  failure while the event op is still executing).
+*/
+int
+ndbcluster_handle_drop_table(Ndb *ndb, const char *event_name,
+                             NDB_SHARE *share)
+{
+  DBUG_ENTER("ndbcluster_handle_drop_table");
+
+  NDBDICT *dict= ndb->getDictionary();
+  /* error 4710 = event does not exist, which is fine on drop */
+  if (event_name && dict->dropEvent(event_name))
+  {
+    if (dict->getNdbError().code != 4710)
+    {
+      /* drop event failed for some reason, issue a warning */
+      push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
+                          ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
+                          dict->getNdbError().code,
+                          dict->getNdbError().message, "NDB");
+      /* error is not that the event did not exist */
+      sql_print_error("NDB Binlog: Unable to drop event in database. "
+                      "Event: %s Error Code: %d Message: %s",
+                      event_name,
+                      dict->getNdbError().code,
+                      dict->getNdbError().message);
+      /* ToDo; handle error? */
+      if (share && share->op &&
+          share->op->getState() == NdbEventOperation::EO_EXECUTING &&
+          dict->getNdbError().code != 4009)
+      {
+        DBUG_ASSERT(false);
+        DBUG_RETURN(-1);
+      }
+    }
+  }
+
+  /* no share or no live event operation: nothing to synchronize on */
+  if (share == 0 || share->op == 0)
+  {
+    DBUG_RETURN(0);
+  }
+
+/*
+  Syncronized drop between client thread and injector thread is
+  neccessary in order to maintain ordering in the binlog,
+  such that the drop occurs _after_ any inserts/updates/deletes.
+
+  The penalty for this is that the drop table becomes slow.
+
+  This wait is however not strictly neccessary to produce a binlog
+  that is usable.  However the slave does not currently handle
+  these out of order, thus we are keeping the SYNC_DROP_ defined
+  for now.
+*/
+#define SYNC_DROP_
+#ifdef SYNC_DROP_
+  /* wait (max ~10s) for the injector thread to clear share->op;
+     it signals injector_cond when it does */
+  (void) pthread_mutex_lock(&share->mutex);
+  int max_timeout= 10;
+  while (share->op)
+  {
+    struct timespec abstime;
+    set_timespec(abstime, 1);
+    (void) pthread_cond_timedwait(&injector_cond,
+                                  &share->mutex,
+                                  &abstime);
+    max_timeout--;
+    if (share->op == 0)
+      break;
+    if (max_timeout == 0)
+    {
+      sql_print_error("NDB delete table: timed out. Ignoring...");
+      break;
+    }
+    if (ndb_extra_logging)
+      sql_print_information("NDB delete table: "
+                            "waiting max %u sec for drop table %s.",
+                            max_timeout, share->key);
+  }
+  (void) pthread_mutex_unlock(&share->mutex);
+#else
+  /* async variant: hand the op over as op_old for later cleanup */
+  (void) pthread_mutex_lock(&share->mutex);
+  share->op_old= share->op;
+  share->op= 0;
+  (void) pthread_mutex_unlock(&share->mutex);
+#endif
+
+  DBUG_RETURN(0);
+}
+
+
+/********************************************************************
+ Internal helper functions for different events from the storage nodes
+ used by the ndb injector thread
+********************************************************************/
+
+/*
+ Handle error states on events from the storage nodes
+*/
+static int ndb_binlog_thread_handle_error(Ndb *ndb, NdbEventOperation *pOp,
+ Binlog_index_row &row)
+{
+ NDB_SHARE *share= (NDB_SHARE *)pOp->getCustomData();
+ DBUG_ENTER("ndb_binlog_thread_handle_error");
+
+ int overrun= pOp->isOverrun();
+ if (overrun)
+ {
+ /*
+ ToDo: this error should rather clear the binlog_index...
+ and continue
+ */
+ sql_print_error("NDB Binlog: Overrun in event buffer, "
+ "this means we have dropped events. Cannot "
+ "continue binlog for %s", share->key);
+ pOp->clearError();
+ DBUG_RETURN(-1);
+ }
+
+ if (!pOp->isConsistent())
+ {
+ /*
+ ToDo: this error should rather clear the binlog_index...
+ and continue
+ */
+ sql_print_error("NDB Binlog: Not Consistent. Cannot "
+ "continue binlog for %s. Error code: %d"
+ " Message: %s", share->key,
+ pOp->getNdbError().code,
+ pOp->getNdbError().message);
+ pOp->clearError();
+ DBUG_RETURN(-1);
+ }
+ sql_print_error("NDB Binlog: unhandled error %d for table %s",
+ pOp->hasError(), share->key);
+ pOp->clearError();
+ DBUG_RETURN(0);
+}
+
+static int
+ndb_binlog_thread_handle_non_data_event(Ndb *ndb, NdbEventOperation *pOp,
+ Binlog_index_row &row)
+{
+ NDB_SHARE *share= (NDB_SHARE *)pOp->getCustomData();
+ NDBEVENT::TableEvent type= pOp->getEventType();
+
+ /* make sure to flush any pending events as they can be dependent
+ on one of the tables being changed below
+ */
+ injector_thd->binlog_flush_pending_rows_event(true);
+
+ switch (type)
+ {
+ case NDBEVENT::TE_CLUSTER_FAILURE:
+ if (apply_status_share == share)
+ {
+ free_share(&apply_status_share);
+ apply_status_share= 0;
+ }
+ if (ndb_extra_logging)
+ sql_print_information("NDB Binlog: cluster failure for %s.", share->key);
+ DBUG_PRINT("info", ("CLUSTER FAILURE EVENT: "
+ "%s received share: 0x%lx op: %lx share op: %lx "
+ "op_old: %lx",
+ share->key, share, pOp, share->op, share->op_old));
+ break;
+ case NDBEVENT::TE_ALTER:
+ /* ToDo: remove printout */
+ if (ndb_extra_logging)
+ sql_print_information("NDB Binlog: rename table %s%s/%s -> %s.",
+ share_prefix, share->table->s->db.str,
+ share->table->s->table_name.str,
+ share->key);
+ /* do the rename of the table in the share */
+ share->table->s->db.str= share->db;
+ share->table->s->db.length= strlen(share->db);
+ share->table->s->table_name.str= share->table_name;
+ share->table->s->table_name.length= strlen(share->table_name);
+ goto drop_alter_common;
+ case NDBEVENT::TE_DROP:
+ if (apply_status_share == share)
+ {
+ free_share(&apply_status_share);
+ apply_status_share= 0;
+ }
+ /* ToDo: remove printout */
+ if (ndb_extra_logging)
+ sql_print_information("NDB Binlog: drop table %s.", share->key);
+drop_alter_common:
+ row.n_schemaops++;
+ DBUG_PRINT("info", ("TABLE %s EVENT: %s received share: 0x%lx op: %lx "
+ "share op: %lx op_old: %lx",
+ type == NDBEVENT::TE_DROP ? "DROP" : "ALTER",
+ share->key, share, pOp, share->op, share->op_old));
+ break;
+ case NDBEVENT::TE_NODE_FAILURE:
+ /* fall through */
+ case NDBEVENT::TE_SUBSCRIBE:
+ /* fall through */
+ case NDBEVENT::TE_UNSUBSCRIBE:
+ /* ignore */
+ return 0;
+ default:
+ sql_print_error("NDB Binlog: unknown non data event %d for %s. "
+ "Ignoring...", (unsigned) type, share->key);
+ return 0;
+ }
+
+ ndb_handle_schema_change(injector_thd, ndb, pOp, share);
+ return 0;
+}
+
+/*
+ Handle data events from the storage nodes
+*/
+static int
+ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp,
+ Binlog_index_row &row,
+ injector::transaction &trans)
+{
+ NDB_SHARE *share= (NDB_SHARE*) pOp->getCustomData();
+ if (share == apply_status_share)
+ return 0;
+ TABLE *table= share->table;
+
+ assert(table != 0);
+
+ dbug_print_table("table", table);
+
+ TABLE_SHARE *table_s= table->s;
+ uint n_fields= table_s->fields;
+ MY_BITMAP b;
+ /* Potential buffer for the bitmap */
+ uint32 bitbuf[128 / (sizeof(uint32) * 8)];
+ bitmap_init(&b, n_fields <= sizeof(bitbuf) * 8 ? bitbuf : NULL,
+ n_fields, false);
+ bitmap_set_all(&b);
+
+ /*
+ row data is already in table->record[0]
+ As we told the NdbEventOperation to do this
+ (saves moving data about many times)
+ */
+
+ switch(pOp->getEventType())
+ {
+ case NDBEVENT::TE_INSERT:
+ row.n_inserts++;
+ DBUG_PRINT("info", ("INSERT INTO %s", share->key));
+ {
+ ndb_unpack_record(table, share->ndb_value[0], &b, table->record[0]);
+ trans.write_row(::server_id, injector::transaction::table(table, true),
+ &b, n_fields, table->record[0]);
+ }
+ break;
+ case NDBEVENT::TE_DELETE:
+ row.n_deletes++;
+ DBUG_PRINT("info",("DELETE FROM %s", share->key));
+ {
+ /*
+ table->record[0] contains only the primary key in this case
+ since we do not have an after image
+ */
+ int n;
+ if (table->s->primary_key != MAX_KEY)
+ n= 0; /*
+ use the primary key only as it saves time and space and
+ it is the only thing needed to log the delete
+ */
+ else
+ n= 1; /*
+ we use the before values since we don't have a primary key
+ since the mysql server does not handle the hidden primary
+ key
+ */
+
+ ndb_unpack_record(table, share->ndb_value[n], &b, table->record[n]);
+ print_records(table, table->record[n]);
+ trans.delete_row(::server_id, injector::transaction::table(table, true),
+ &b, n_fields, table->record[n]);
+ }
+ break;
+ case NDBEVENT::TE_UPDATE:
+ row.n_updates++;
+ DBUG_PRINT("info", ("UPDATE %s", share->key));
+ {
+ ndb_unpack_record(table, share->ndb_value[0],
+ &b, table->record[0]);
+ print_records(table, table->record[0]);
+ if (table->s->primary_key != MAX_KEY)
+ {
+ /*
+ since table has a primary key, we can do a write
+ using only after values
+ */
+ trans.write_row(::server_id, injector::transaction::table(table, true),
+ &b, n_fields, table->record[0]);// after values
+ }
+ else
+ {
+ /*
+ mysql server cannot handle the ndb hidden key and
+ therefore needs the before image as well
+ */
+ ndb_unpack_record(table, share->ndb_value[1], &b, table->record[1]);
+ print_records(table, table->record[1]);
+ trans.update_row(::server_id,
+ injector::transaction::table(table, true),
+ &b, n_fields,
+ table->record[1], // before values
+ table->record[0]);// after values
+ }
+ }
+ break;
+ default:
+ /* We should REALLY never get here. */
+ DBUG_PRINT("info", ("default - uh oh, a brain exploded."));
+ break;
+ }
+
+ return 0;
+}
+
+//#define RUN_NDB_BINLOG_TIMER
+#ifdef RUN_NDB_BINLOG_TIMER
+class Timer
+{
+public:
+ Timer() { start(); }
+ void start() { gettimeofday(&m_start, 0); }
+ void stop() { gettimeofday(&m_stop, 0); }
+ ulong elapsed_ms()
+ {
+ return (ulong)
+ (((longlong) m_stop.tv_sec - (longlong) m_start.tv_sec) * 1000 +
+ ((longlong) m_stop.tv_usec -
+ (longlong) m_start.tv_usec + 999) / 1000);
+ }
+private:
+ struct timeval m_start,m_stop;
+};
+#endif
+
+/****************************************************************
+ Injector thread main loop
+****************************************************************/
+
+pthread_handler_t ndb_binlog_thread_func(void *arg)
+{
+ THD *thd; /* needs to be first for thread_stack */
+ Ndb *ndb= 0;
+ Thd_ndb *thd_ndb=0;
+ int ndb_update_binlog_index= 1;
+ injector *inj= injector::instance();
+
+ pthread_mutex_lock(&injector_mutex);
+ /*
+ Set up the Thread
+ */
+ my_thread_init();
+ DBUG_ENTER("ndb_binlog_thread");
+
+ thd= new THD; /* note that constructor of THD uses DBUG_ */
+ THD_CHECK_SENTRY(thd);
+
+ thd->thread_stack= (char*) &thd; /* remember where our stack is */
+ if (thd->store_globals())
+ {
+ thd->cleanup();
+ delete thd;
+ ndb_binlog_thread_running= -1;
+ pthread_mutex_unlock(&injector_mutex);
+ pthread_cond_signal(&injector_cond);
+ my_thread_end();
+ pthread_exit(0);
+ DBUG_RETURN(NULL);
+ }
+
+ thd->init_for_queries();
+ thd->command= COM_DAEMON;
+ thd->system_thread= SYSTEM_THREAD_NDBCLUSTER_BINLOG;
+ thd->version= refresh_version;
+ thd->set_time();
+ thd->main_security_ctx.host_or_ip= "";
+ thd->client_capabilities= 0;
+ my_net_init(&thd->net, 0);
+ thd->main_security_ctx.master_access= ~0;
+ thd->main_security_ctx.priv_user= 0;
+
+ /*
+ Set up ndb binlog
+ */
+ sql_print_information("Starting MySQL Cluster Binlog Thread");
+
+ pthread_detach_this_thread();
+ thd->real_id= pthread_self();
+ pthread_mutex_lock(&LOCK_thread_count);
+ thd->thread_id= thread_id++;
+ threads.append(thd);
+ pthread_mutex_unlock(&LOCK_thread_count);
+ thd->lex->start_transaction_opt= 0;
+
+ if (!(schema_ndb= new Ndb(g_ndb_cluster_connection, "")) ||
+ schema_ndb->init())
+ {
+ sql_print_error("NDB Binlog: Getting Schema Ndb object failed");
+ goto err;
+ }
+
+ if (!(ndb= new Ndb(g_ndb_cluster_connection, "")) ||
+ ndb->init())
+ {
+ sql_print_error("NDB Binlog: Getting Ndb object failed");
+ ndb_binlog_thread_running= -1;
+ pthread_mutex_unlock(&injector_mutex);
+ pthread_cond_signal(&injector_cond);
+ goto err;
+ }
+
+ /*
+ Expose global reference to our ndb object.
+
+ Used by both sql client thread and binlog thread to interact
+ with the storage
+ pthread_mutex_lock(&injector_mutex);
+ */
+ injector_thd= thd;
+ injector_ndb= ndb;
+ ndb_binlog_thread_running= 1;
+
+ /*
+ We signal the thread that started us that we've finished
+ starting up.
+ */
+ pthread_mutex_unlock(&injector_mutex);
+ pthread_cond_signal(&injector_cond);
+
+ thd->proc_info= "Waiting for ndbcluster to start";
+
+ pthread_mutex_lock(&injector_mutex);
+ while (!ndbcluster_util_inited)
+ {
+ /* ndb not connected yet */
+ struct timespec abstime;
+ set_timespec(abstime, 1);
+ pthread_cond_timedwait(&injector_cond, &injector_mutex, &abstime);
+ if (abort_loop)
+ {
+ pthread_mutex_unlock(&injector_mutex);
+ goto err;
+ }
+ }
+ pthread_mutex_unlock(&injector_mutex);
+
+ /*
+ Main NDB Injector loop
+ */
+
+ DBUG_ASSERT(ndbcluster_hton.slot != ~(uint)0);
+ if (!(thd_ndb= ha_ndbcluster::seize_thd_ndb()))
+ {
+ sql_print_error("Could not allocate Thd_ndb object");
+ goto err;
+ }
+ set_thd_ndb(thd, thd_ndb);
+ thd_ndb->options|= TNO_NO_LOG_SCHEMA_OP;
+ thd->query_id= 0; // to keep valgrind quiet
+ {
+ static char db[]= "";
+ thd->db= db;
+ open_binlog_index(thd, &binlog_tables, &binlog_index);
+ if (!apply_status_share)
+ {
+ sql_print_error("NDB: Could not get apply status share");
+ }
+ thd->db= db;
+ }
+
+#ifdef RUN_NDB_BINLOG_TIMER
+ Timer main_timer;
+#endif
+ for ( ; !((abort_loop || do_ndbcluster_binlog_close_connection) &&
+ ndb_latest_handled_binlog_epoch >= g_latest_trans_gci); )
+ {
+
+#ifdef RUN_NDB_BINLOG_TIMER
+ main_timer.stop();
+ sql_print_information("main_timer %ld ms", main_timer.elapsed_ms());
+ main_timer.start();
+#endif
+
+ /*
+ now we don't want any events before next gci is complete
+ */
+ thd->proc_info= "Waiting for event from ndbcluster";
+ thd->set_time();
+
+ /* wait for event or 1000 ms */
+ Uint64 gci, schema_gci;
+ int res= ndb->pollEvents(1000, &gci);
+ int schema_res= schema_ndb->pollEvents(0, &schema_gci);
+ ndb_latest_received_binlog_epoch= gci;
+
+ while (gci > schema_gci && schema_res >= 0)
+ schema_res= schema_ndb->pollEvents(10, &schema_gci);
+
+ if ((abort_loop || do_ndbcluster_binlog_close_connection) &&
+ ndb_latest_handled_binlog_epoch >= g_latest_trans_gci)
+ break; /* Shutting down server */
+
+ if (binlog_index && binlog_index->s->version < refresh_version)
+ {
+ if (binlog_index->s->version < refresh_version)
+ {
+ close_thread_tables(thd);
+ binlog_index= 0;
+ }
+ }
+
+ MEM_ROOT **root_ptr=
+ my_pthread_getspecific_ptr(MEM_ROOT**, THR_MALLOC);
+ MEM_ROOT *old_root= *root_ptr;
+ MEM_ROOT mem_root;
+ init_sql_alloc(&mem_root, 4096, 0);
+ List<Cluster_replication_schema> schema_list;
+ *root_ptr= &mem_root;
+
+ if (unlikely(schema_res > 0))
+ {
+ schema_ndb->
+ setReportThreshEventGCISlip(ndb_report_thresh_binlog_epoch_slip);
+ schema_ndb->setReportThreshEventFreeMem(ndb_report_thresh_binlog_mem_usage);
+ NdbEventOperation *pOp= schema_ndb->nextEvent();
+ while (pOp != NULL)
+ {
+ if (!pOp->hasError())
+ ndb_binlog_thread_handle_schema_event(thd, schema_ndb, pOp,
+ &schema_list, &mem_root);
+ else
+ sql_print_error("NDB: error %lu (%s) on handling "
+ "binlog schema event",
+ (ulong) pOp->getNdbError().code,
+ pOp->getNdbError().message);
+ pOp= schema_ndb->nextEvent();
+ }
+ }
+
+ if (res > 0)
+ {
+ DBUG_PRINT("info", ("pollEvents res: %d", res));
+#ifdef RUN_NDB_BINLOG_TIMER
+ Timer gci_timer, write_timer;
+ int event_count= 0;
+#endif
+ thd->proc_info= "Processing events";
+ NdbEventOperation *pOp= ndb->nextEvent();
+ Binlog_index_row row;
+ while (pOp != NULL)
+ {
+ ndb->
+ setReportThreshEventGCISlip(ndb_report_thresh_binlog_epoch_slip);
+ ndb->setReportThreshEventFreeMem(ndb_report_thresh_binlog_mem_usage);
+
+ assert(pOp->getGCI() <= ndb_latest_received_binlog_epoch);
+ bzero((char*) &row, sizeof(row));
+ injector::transaction trans= inj->new_trans(thd);
+ gci= pOp->getGCI();
+ if (apply_status_share)
+ {
+ TABLE *table= apply_status_share->table;
+ MY_BITMAP b;
+ uint32 bitbuf;
+ DBUG_ASSERT(table->s->fields <= sizeof(bitbuf) * 8);
+ bitmap_init(&b, &bitbuf, table->s->fields, false);
+ bitmap_set_all(&b);
+ table->field[0]->store((longlong)::server_id);
+ table->field[1]->store((longlong)gci);
+ trans.write_row(::server_id,
+ injector::transaction::table(table, true),
+ &b, table->s->fields,
+ table->record[0]);
+ }
+ else
+ {
+ sql_print_error("NDB: Could not get apply status share");
+ }
+#ifdef RUN_NDB_BINLOG_TIMER
+ write_timer.start();
+#endif
+ do
+ {
+#ifdef RUN_NDB_BINLOG_TIMER
+ event_count++;
+#endif
+ if (pOp->hasError() &&
+ ndb_binlog_thread_handle_error(ndb, pOp, row) < 0)
+ goto err;
+
+#ifndef DBUG_OFF
+ {
+ NDB_SHARE *share= (NDB_SHARE*) pOp->getCustomData();
+ DBUG_PRINT("info",
+ ("EVENT TYPE:%d GCI:%lld last applied: %lld "
+ "share: 0x%lx", pOp->getEventType(), gci,
+ ndb_latest_applied_binlog_epoch, share));
+ DBUG_ASSERT(share != 0);
+ }
+#endif
+ if ((unsigned) pOp->getEventType() <
+ (unsigned) NDBEVENT::TE_FIRST_NON_DATA_EVENT)
+ ndb_binlog_thread_handle_data_event(ndb, pOp, row, trans);
+ else
+ ndb_binlog_thread_handle_non_data_event(ndb, pOp, row);
+
+ pOp= ndb->nextEvent();
+ } while (pOp && pOp->getGCI() == gci);
+
+ /*
+ note! pOp is not referring to an event in the next epoch
+ or is == 0
+ */
+#ifdef RUN_NDB_BINLOG_TIMER
+ write_timer.stop();
+#endif
+
+ if (row.n_inserts || row.n_updates
+ || row.n_deletes || row.n_schemaops)
+ {
+ injector::transaction::binlog_pos start= trans.start_pos();
+ if (int r= trans.commit())
+ {
+ sql_print_error("NDB binlog:"
+ "Error during COMMIT of GCI. Error: %d",
+ r);
+ /* TODO: Further handling? */
+ }
+ row.gci= gci;
+ row.master_log_file= start.file_name();
+ row.master_log_pos= start.file_pos();
+
+ DBUG_PRINT("info",("COMMIT gci %lld",gci));
+ if (ndb_update_binlog_index)
+ ndb_add_binlog_index(thd, &row);
+ ndb_latest_applied_binlog_epoch= gci;
+ }
+ else
+ trans.commit();
+ ndb_latest_handled_binlog_epoch= gci;
+#ifdef RUN_NDB_BINLOG_TIMER
+ gci_timer.stop();
+ sql_print_information("gci %ld event_count %d write time "
+ "%ld(%d e/s), total time %ld(%d e/s)",
+ (ulong)gci, event_count,
+ write_timer.elapsed_ms(),
+ event_count / write_timer.elapsed_ms(),
+ gci_timer.elapsed_ms(),
+ event_count / gci_timer.elapsed_ms());
+#endif
+ }
+ }
+
+ {
+ Cluster_replication_schema *schema;
+ while ((schema= schema_list.pop()))
+ {
+ char *thd_db_save= thd->db;
+ thd->db= schema->db;
+ thd->binlog_query(THD::STMT_QUERY_TYPE, schema->query,
+ schema->query_length, FALSE,
+ schema->name[0] == 0);
+ thd->db= thd_db_save;
+ }
+ }
+ free_root(&mem_root, MYF(0));
+ *root_ptr= old_root;
+ ndb_latest_handled_binlog_epoch= ndb_latest_received_binlog_epoch;
+ }
+err:
+ DBUG_PRINT("info",("Shutting down cluster binlog thread"));
+ close_thread_tables(thd);
+ pthread_mutex_lock(&injector_mutex);
+ /* don't mess with the injector_ndb anymore from other threads */
+ injector_ndb= 0;
+ pthread_mutex_unlock(&injector_mutex);
+ thd->db= 0; // as not to try to free memory
+ sql_print_information("Stopping Cluster Binlog");
+
+ if (apply_status_share)
+ free_share(&apply_status_share);
+ if (schema_share)
+ free_share(&schema_share);
+
+ /* remove all event operations */
+ if (ndb)
+ {
+ NdbEventOperation *op;
+ DBUG_PRINT("info",("removing all event operations"));
+ while ((op= ndb->getEventOperation()))
+ {
+ DBUG_PRINT("info",("removing event operation on %s",
+ op->getEvent()->getName()));
+ NDB_SHARE *share= (NDB_SHARE*) op->getCustomData();
+ free_share(&share);
+ ndb->dropEventOperation(op);
+ }
+ delete ndb;
+ ndb= 0;
+ }
+
+ // Placed here to avoid a memory leak; TODO: check if needed
+ net_end(&thd->net);
+ delete thd;
+
+ ndb_binlog_thread_running= -1;
+ (void) pthread_cond_signal(&injector_cond);
+
+ DBUG_PRINT("exit", ("ndb_binlog_thread"));
+ my_thread_end();
+
+ pthread_exit(0);
+ DBUG_RETURN(NULL);
+}
+
+bool
+ndbcluster_show_status_binlog(THD* thd, stat_print_fn *stat_print,
+ enum ha_stat_type stat_type)
+{
+ char buf[IO_SIZE];
+ uint buflen;
+ ulonglong ndb_latest_epoch= 0;
+ DBUG_ENTER("ndbcluster_show_status_binlog");
+
+ pthread_mutex_lock(&injector_mutex);
+ if (injector_ndb)
+ {
+ ndb_latest_epoch= injector_ndb->getLatestGCI();
+ pthread_mutex_unlock(&injector_mutex);
+
+ buflen=
+ snprintf(buf, sizeof(buf),
+ "latest_epoch=%llu, "
+ "latest_trans_epoch=%llu, "
+ "latest_received_binlog_epoch=%llu, "
+ "latest_handled_binlog_epoch=%llu, "
+ "latest_applied_binlog_epoch=%llu",
+ ndb_latest_epoch,
+ g_latest_trans_gci,
+ ndb_latest_received_binlog_epoch,
+ ndb_latest_handled_binlog_epoch,
+ ndb_latest_applied_binlog_epoch);
+ if (stat_print(thd, ndbcluster_hton.name, strlen(ndbcluster_hton.name),
+ "binlog", strlen("binlog"),
+ buf, buflen))
+ DBUG_RETURN(TRUE);
+ }
+ else
+ pthread_mutex_unlock(&injector_mutex);
+ DBUG_RETURN(FALSE);
+}
+
+#endif /* HAVE_NDB_BINLOG */
diff --git a/sql/ha_ndbcluster_binlog.h b/sql/ha_ndbcluster_binlog.h
new file mode 100644
index 00000000000..5334120b43f
--- /dev/null
+++ b/sql/ha_ndbcluster_binlog.h
@@ -0,0 +1,162 @@
+/* Copyright (C) 2000-2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+*/
+
+// Typedefs for long names
+typedef NdbDictionary::Object NDBOBJ;
+typedef NdbDictionary::Column NDBCOL;
+typedef NdbDictionary::Table NDBTAB;
+typedef NdbDictionary::Index NDBINDEX;
+typedef NdbDictionary::Dictionary NDBDICT;
+typedef NdbDictionary::Event NDBEVENT;
+
+#define IS_TMP_PREFIX(A) (is_prefix(A, tmp_file_prefix) || is_prefix(A, "@0023sql"))
+
+extern ulong ndb_extra_logging;
+
+#ifdef HAVE_NDB_BINLOG
+
+#define INJECTOR_EVENT_LEN 200
+
+enum SCHEMA_OP_TYPE
+{
+ SOT_DROP_TABLE,
+ SOT_CREATE_TABLE,
+ SOT_RENAME_TABLE,
+ SOT_ALTER_TABLE,
+ SOT_DROP_DB,
+ SOT_CREATE_DB,
+ SOT_ALTER_DB,
+ SOT_CLEAR_SLOCK
+};
+
+const uint max_ndb_nodes= 64; /* multiple of 32 */
+
+extern pthread_t ndb_binlog_thread;
+extern pthread_mutex_t injector_mutex;
+extern pthread_cond_t injector_cond;
+
+static const char *ha_ndb_ext=".ndb";
+static const char share_prefix[]= "./";
+
+extern unsigned char g_node_id_map[max_ndb_nodes];
+extern handlerton ndbcluster_hton;
+extern pthread_t ndb_util_thread;
+extern pthread_mutex_t LOCK_ndb_util_thread;
+extern pthread_cond_t COND_ndb_util_thread;
+extern int ndbcluster_util_inited;
+extern pthread_mutex_t ndbcluster_mutex;
+extern HASH ndbcluster_open_tables;
+extern Ndb_cluster_connection* g_ndb_cluster_connection;
+extern long ndb_number_of_storage_nodes;
+
+/*
+ Initialize the binlog part of the ndb handlerton
+*/
+void ndbcluster_binlog_init_handlerton();
+/*
+ Initialize the binlog part of the NDB_SHARE
+*/
+void ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *table);
+
+int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key,
+ const char *db,
+ const char *table_name,
+ NDB_SHARE *share);
+int ndbcluster_create_event(Ndb *ndb, const NDBTAB *table,
+ const char *event_name, NDB_SHARE *share);
+int ndbcluster_create_event_ops(NDB_SHARE *share,
+ const NDBTAB *ndbtab,
+ const char *event_name);
+int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
+ const char *query, int query_length,
+ const char *db, const char *table_name,
+ uint32 ndb_table_id,
+ uint32 ndb_table_version,
+ enum SCHEMA_OP_TYPE type);
+int ndbcluster_handle_drop_table(Ndb *ndb, const char *event_name,
+ NDB_SHARE *share);
+void ndb_rep_event_name(String *event_name,
+ const char *db, const char *tbl);
+
+int ndbcluster_binlog_start();
+pthread_handler_t ndb_binlog_thread_func(void *arg);
+
+/*
+ table cluster_replication.apply_status
+*/
+void ndbcluster_setup_binlog_table_shares(THD *thd);
+extern NDB_SHARE *apply_status_share;
+extern NDB_SHARE *schema_share;
+
+extern THD *injector_thd;
+extern int ndb_binlog_thread_running;
+
+bool
+ndbcluster_show_status_binlog(THD* thd, stat_print_fn *stat_print,
+ enum ha_stat_type stat_type);
+
+/*
+ prototypes for ndb handler utility functions also needed by
+ the ndb binlog code
+*/
+int ndbcluster_find_all_files(THD *thd);
+void ndb_unpack_record(TABLE *table, NdbValue *value,
+ MY_BITMAP *defined, byte *buf);
+
+NDB_SHARE *ndbcluster_get_share(const char *key,
+ TABLE *table,
+ bool create_if_not_exists,
+ bool have_lock);
+NDB_SHARE *ndbcluster_get_share(NDB_SHARE *share);
+void ndbcluster_free_share(NDB_SHARE **share, bool have_lock);
+void ndbcluster_real_free_share(NDB_SHARE **share);
+int handle_trailing_share(NDB_SHARE *share);
+inline NDB_SHARE *get_share(const char *key,
+ TABLE *table,
+ bool create_if_not_exists= TRUE,
+ bool have_lock= FALSE)
+{
+ return ndbcluster_get_share(key, table, create_if_not_exists, have_lock);
+}
+
+inline NDB_SHARE *get_share(NDB_SHARE *share)
+{
+ return ndbcluster_get_share(share);
+}
+
+inline void free_share(NDB_SHARE **share, bool have_lock= FALSE)
+{
+ ndbcluster_free_share(share, have_lock);
+}
+
+inline void real_free_share(NDB_SHARE **share)
+{
+ ndbcluster_real_free_share(share);
+}
+
+inline
+Thd_ndb *
+get_thd_ndb(THD *thd) { return (Thd_ndb *) thd->ha_data[ndbcluster_hton.slot]; }
+
+inline
+void
+set_thd_ndb(THD *thd, Thd_ndb *thd_ndb) { thd->ha_data[ndbcluster_hton.slot]= thd_ndb; }
+
+Ndb* check_ndb_in_thd(THD* thd);
+
+
+#endif /* HAVE_NDB_BINLOG */
diff --git a/sql/ha_ndbcluster_tables.h b/sql/ha_ndbcluster_tables.h
new file mode 100644
index 00000000000..d726fd63e1d
--- /dev/null
+++ b/sql/ha_ndbcluster_tables.h
@@ -0,0 +1,21 @@
+/* Copyright (C) 2000-2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+*/
+
+#define NDB_REP_DB "cluster_replication"
+#define NDB_REP_TABLE "binlog_index"
+#define NDB_APPLY_TABLE "apply_status"
+#define NDB_SCHEMA_TABLE "schema"
diff --git a/sql/handler.cc b/sql/handler.cc
index 2391c54971d..ead22b6b03f 100644
--- a/sql/handler.cc
+++ b/sql/handler.cc
@@ -2411,6 +2411,132 @@ int ha_table_exists_in_engine(THD* thd, const char* db, const char* name)
DBUG_RETURN(error);
}
+#ifdef HAVE_NDB_BINLOG
+/*
+ TODO: change this into a dynamic struct
+ List<handlerton> does not work as
+ 1. binlog_end is called when MEM_ROOT is gone
+ 2. cannot work with thd MEM_ROOT as memory should be freed
+*/
+#define MAX_HTON_LIST_ST 63
+struct hton_list_st
+{
+ handlerton *hton[MAX_HTON_LIST_ST];
+ uint sz;
+};
+
+struct binlog_func_st
+{
+ enum_binlog_func fn;
+ void *arg;
+};
+
+/*
+ Listing handlertons first to avoid recursive calls and deadlock
+*/
+static my_bool binlog_func_list(THD *thd, st_plugin_int *plugin, void *arg)
+{
+ hton_list_st *hton_list= (hton_list_st *)arg;
+ handlerton *hton= (handlerton *) plugin->plugin->info;
+ if (hton->state == SHOW_OPTION_YES && hton->binlog_func)
+ {
+ uint sz= hton_list->sz;
+ if (sz == MAX_HTON_LIST_ST-1)
+ {
+ /* list full */
+ return FALSE;
+ }
+ hton_list->hton[sz]= hton;
+ hton_list->sz= sz+1;
+ }
+ return FALSE;
+}
+
+static my_bool binlog_func_foreach(THD *thd, binlog_func_st *bfn)
+{
+ handlerton *hton;
+ hton_list_st hton_list;
+ hton_list.sz= 0;
+ plugin_foreach(thd, binlog_func_list,
+ MYSQL_STORAGE_ENGINE_PLUGIN, &hton_list);
+
+ uint i= 0, sz= hton_list.sz;
+ while(i < sz)
+ hton_list.hton[i++]->binlog_func(thd, bfn->fn, bfn->arg);
+ return FALSE;
+}
+
+int ha_reset_logs(THD *thd)
+{
+ binlog_func_st bfn= {BFN_RESET_LOGS, 0};
+ binlog_func_foreach(thd, &bfn);
+ return 0;
+}
+
+void ha_reset_slave(THD* thd)
+{
+ binlog_func_st bfn= {BFN_RESET_SLAVE, 0};
+ binlog_func_foreach(thd, &bfn);
+}
+
+void ha_binlog_wait(THD* thd)
+{
+ binlog_func_st bfn= {BFN_BINLOG_WAIT, 0};
+ binlog_func_foreach(thd, &bfn);
+}
+
+int ha_binlog_end(THD* thd)
+{
+ binlog_func_st bfn= {BFN_BINLOG_END, 0};
+ binlog_func_foreach(thd, &bfn);
+ return 0;
+}
+
+int ha_binlog_index_purge_file(THD *thd, const char *file)
+{
+ binlog_func_st bfn= {BFN_BINLOG_PURGE_FILE, (void *)file};
+ binlog_func_foreach(thd, &bfn);
+}
+
+struct binlog_log_query_st
+{
+ enum_binlog_command binlog_command;
+ const char *query;
+ uint query_length;
+ const char *db;
+ const char *table_name;
+};
+
+static my_bool binlog_log_query_handlerton(THD *thd,
+ st_plugin_int *plugin,
+ void *args)
+{
+ struct binlog_log_query_st *b= (struct binlog_log_query_st*)args;
+ handlerton *hton= (handlerton *) plugin->plugin->info;
+ if (hton->state == SHOW_OPTION_YES && hton->binlog_log_query)
+ hton->binlog_log_query(thd,
+ b->binlog_command,
+ b->query,
+ b->query_length,
+ b->db,
+ b->table_name);
+ return FALSE;
+}
+
+void ha_binlog_log_query(THD *thd, enum_binlog_command binlog_command,
+ const char *query, uint query_length,
+ const char *db, const char *table_name)
+{
+ struct binlog_log_query_st b;
+ b.binlog_command= binlog_command;
+ b.query= query;
+ b.query_length= query_length;
+ b.db= db;
+ b.table_name= table_name;
+ plugin_foreach(thd, binlog_log_query_handlerton,
+ MYSQL_STORAGE_ENGINE_PLUGIN, &b);
+}
+#endif
/*
Read the first row of a multi-range set.
@@ -2832,6 +2958,8 @@ template<class RowsEventT> int binlog_log_row(TABLE* table,
const byte *before_record,
const byte *after_record)
{
+ if (table->file->is_injective())
+ return 0;
bool error= 0;
THD *const thd= current_thd;
diff --git a/sql/handler.h b/sql/handler.h
index f6680679a35..1ed19b72331 100644
--- a/sql/handler.h
+++ b/sql/handler.h
@@ -205,6 +205,24 @@ enum row_type { ROW_TYPE_NOT_USED=-1, ROW_TYPE_DEFAULT, ROW_TYPE_FIXED,
ROW_TYPE_DYNAMIC, ROW_TYPE_COMPRESSED,
ROW_TYPE_REDUNDANT, ROW_TYPE_COMPACT };
+enum enum_binlog_func {
+ BFN_RESET_LOGS= 1,
+ BFN_RESET_SLAVE= 2,
+ BFN_BINLOG_WAIT= 3,
+ BFN_BINLOG_END= 4,
+ BFN_BINLOG_PURGE_FILE= 5
+};
+
+enum enum_binlog_command {
+ LOGCOM_CREATE_TABLE,
+ LOGCOM_ALTER_TABLE,
+ LOGCOM_RENAME_TABLE,
+ LOGCOM_DROP_TABLE,
+ LOGCOM_CREATE_DB,
+ LOGCOM_ALTER_DB,
+ LOGCOM_DROP_DB
+};
+
/* struct to hold information about the table that should be created */
/* Bits in used_fields */
@@ -420,7 +438,8 @@ typedef struct
handlerton structure version
*/
const int interface_version;
-#define MYSQL_HANDLERTON_INTERFACE_VERSION 0x0000
+/* last version change: 0x0001 in 5.1.6 */
+#define MYSQL_HANDLERTON_INTERFACE_VERSION 0x0001
/*
@@ -512,6 +531,15 @@ typedef struct
bool (*show_status)(THD *thd, stat_print_fn *print, enum ha_stat_type stat);
int (*alter_tablespace)(THD *thd, st_alter_tablespace *ts_info);
uint32 flags; /* global handler flags */
+ /*
+ Handlerton functions are not set in the different storage
+ engines static initialization. They are initialized at handler init.
+ Thus, leave them last in the struct.
+ */
+ int (*binlog_func)(THD *thd, enum_binlog_func fn, void *arg);
+ void (*binlog_log_query)(THD *thd, enum_binlog_command binlog_command,
+ const char *query, uint query_length,
+ const char *db, const char *table_name);
} handlerton;
extern const handlerton default_hton;
@@ -1195,6 +1223,12 @@ public:
virtual int ha_update_row(const byte * old_data, byte * new_data);
virtual int ha_delete_row(const byte * buf);
/*
+ If the handler does its own injection of the rows, this member function
+ should return 'true'.
+ */
+ virtual bool is_injective() const { return false; }
+
+ /*
SYNOPSIS
start_bulk_update()
RETURN
@@ -1705,3 +1739,21 @@ void trans_register_ha(THD *thd, bool all, handlerton *ht);
int ha_repl_report_sent_binlog(THD *thd, char *log_file_name,
my_off_t end_offset);
int ha_repl_report_replication_stop(THD *thd);
+
+#ifdef HAVE_NDB_BINLOG
+int ha_reset_logs(THD *thd);
+int ha_binlog_index_purge_file(THD *thd, const char *file);
+void ha_reset_slave(THD *thd);
+void ha_binlog_log_query(THD *thd, enum_binlog_command binlog_command,
+ const char *query, uint query_length,
+ const char *db, const char *table_name);
+void ha_binlog_wait(THD *thd);
+int ha_binlog_end(THD *thd);
+#else
+#define ha_reset_logs(a) 0
+#define ha_binlog_index_purge_file(a,b) 0
+#define ha_reset_slave(a)
+#define ha_binlog_log_query(a,b,c,d,e,f);
+#define ha_binlog_wait(a)
+#define ha_binlog_end(a) 0
+#endif
diff --git a/sql/log.cc b/sql/log.cc
index fe95419fffd..7232d3a24dd 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -988,6 +988,7 @@ bool MYSQL_LOG::reset_logs(THD* thd)
enum_log_type save_log_type;
DBUG_ENTER("reset_logs");
+ ha_reset_logs(thd);
/*
We need to get both locks to be sure that no one is trying to
write to the index log file.
@@ -1237,6 +1238,9 @@ int MYSQL_LOG::purge_logs(const char *to_log,
DBUG_PRINT("info",("purging %s",log_info.log_file_name));
if (!my_delete(log_info.log_file_name, MYF(0)) && decrease_log_space)
*decrease_log_space-= file_size;
+
+ ha_binlog_index_purge_file(current_thd, log_info.log_file_name);
+
if (find_next_log(&log_info, 0) || exit_loop)
break;
}
@@ -1297,6 +1301,9 @@ int MYSQL_LOG::purge_logs_before_date(time_t purge_time)
stat_area.st_mtime >= purge_time)
break;
my_delete(log_info.log_file_name, MYF(0));
+
+ ha_binlog_index_purge_file(current_thd, log_info.log_file_name);
+
if (find_next_log(&log_info, 0))
break;
}
diff --git a/sql/log_event.cc b/sql/log_event.cc
index 712fff15774..04a6276d476 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -79,6 +79,20 @@ static void clear_all_errors(THD *thd, struct st_relay_log_info *rli)
inline int ignored_error_code(int err_code)
{
+#ifdef HAVE_NDB_BINLOG
+ /*
+ The following error codes are hard-coded and will always be ignored.
+ */
+ switch (err_code)
+ {
+ case ER_DB_CREATE_EXISTS:
+ case ER_DB_DROP_EXISTS:
+ return 1;
+ default:
+ /* Nothing to do */
+ break;
+ }
+#endif
return ((err_code == ER_SLAVE_IGNORED_TABLE) ||
(use_slave_mask && bitmap_is_set(&slave_error_mask, err_code)));
}
@@ -5276,7 +5290,8 @@ int Rows_log_event::exec_event(st_relay_log_info *rli)
{
slave_print_msg(ERROR_LEVEL, rli, error,
"Error in %s event: error during table %s.%s lock",
- get_type_str(), table->s->db, table->s->table_name);
+ get_type_str(), table->s->db.str,
+ table->s->table_name.str);
DBUG_RETURN(error);
}
/*
@@ -5412,7 +5427,12 @@ int Rows_log_event::exec_event(st_relay_log_info *rli)
if (error)
{ /* error has occured during the transaction */
- /*
+ slave_print_msg(ERROR_LEVEL, rli, error,
+ "Error in %s event: error during transaction execution "
+ "on table %s.%s",
+ get_type_str(), table->s->db.str,
+ table->s->table_name.str);
+ /*
If one day we honour --skip-slave-errors in row-based replication, and
the error should be skipped, then we would clear mappings, rollback,
close tables, but the slave SQL thread would not stop and then may
@@ -5485,7 +5505,8 @@ int Rows_log_event::exec_event(st_relay_log_info *rli)
slave_print_msg(ERROR_LEVEL, rli, error,
"Error in %s event: commit of row events failed, "
"table `%s`.`%s`",
- get_type_str(), table->s->db, table->s->table_name);
+ get_type_str(), table->s->db.str,
+ table->s->table_name.str);
DBUG_RETURN(error);
}
@@ -5585,8 +5606,8 @@ void Rows_log_event::pack_info(Protocol *protocol)
{
char buf[256];
char const *const flagstr= get_flags(STMT_END_F) ? "STMT_END_F" : "";
- char const *const dbnam= m_table->s->db;
- char const *const tblnam= m_table->s->table_name;
+ char const *const dbnam= m_table->s->db.str;
+ char const *const tblnam= m_table->s->table_name.str;
my_size_t bytes= snprintf(buf, sizeof(buf),
"%s.%s - %s", dbnam, tblnam, flagstr);
protocol->store(buf, bytes, &my_charset_bin);
@@ -6105,7 +6126,8 @@ int Write_rows_log_event::do_before_row_operations(TABLE *table)
*/
thd->lex->sql_command= SQLCOM_REPLACE;
- table->file->extra(HA_EXTRA_IGNORE_DUP_KEY); // needed for ndbcluster
+ table->file->extra(HA_EXTRA_IGNORE_DUP_KEY); // Needed for ndbcluster
+ table->file->extra(HA_EXTRA_IGNORE_NO_KEY); // Needed for ndbcluster
/*
TODO: the cluster team (Tomas?) says that it's better if the engine knows
how many rows are going to be inserted, then it can allocate needed memory
@@ -6373,6 +6395,9 @@ static int find_and_fetch_row(TABLE *table, byte *key, byte *record_buf)
DBUG_ASSERT(record_buf);
+ /* We need to retrieve all fields */
+ table->file->ha_set_all_bits_in_read_set();
+
if (table->s->keys > 0)
{
int error;
diff --git a/sql/mysql_priv.h b/sql/mysql_priv.h
index b3bc49b31d1..026234caf34 100644
--- a/sql/mysql_priv.h
+++ b/sql/mysql_priv.h
@@ -538,6 +538,7 @@ enum enum_mysql_completiontype {
COMMIT_RELEASE=-1, COMMIT=0, COMMIT_AND_CHAIN=6
};
+bool begin_trans(THD *thd);
int end_trans(THD *thd, enum enum_mysql_completiontype completion);
Item *negate_expression(THD *thd, Item *expr);
@@ -640,6 +641,7 @@ bool table_cache_init(void);
void table_cache_free(void);
bool table_def_init(void);
void table_def_free(void);
+void assign_new_table_id(TABLE *table);
uint cached_open_tables(void);
uint cached_table_definitions(void);
void kill_mysql(void);
@@ -1041,7 +1043,7 @@ bool is_equal(const LEX_STRING *a, const LEX_STRING *b);
bool remove_table_from_cache(THD *thd, const char *db, const char *table,
uint flags);
-bool close_cached_tables(THD *thd, bool wait_for_refresh, TABLE_LIST *tables);
+bool close_cached_tables(THD *thd, bool wait_for_refresh, TABLE_LIST *tables, bool have_lock = FALSE);
void copy_field_from_tmp_record(Field *field,int offset);
bool fill_record(THD *thd, Field **field, List<Item> &values,
bool ignore_errors);
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index cff969d4113..cf1a4c4c936 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -417,6 +417,11 @@ my_bool opt_ndb_shm, opt_ndb_optimized_node_selection;
ulong opt_ndb_cache_check_time;
const char *opt_ndb_mgmd;
ulong opt_ndb_nodeid;
+ulong ndb_extra_logging;
+#ifdef HAVE_NDB_BINLOG
+ulong ndb_report_thresh_binlog_epoch_slip;
+ulong ndb_report_thresh_binlog_mem_usage;
+#endif
extern SHOW_VAR ndb_status_variables[];
extern const char *ndb_distribution_names[];
@@ -1134,6 +1139,11 @@ void clean_up(bool print_message)
mysql_log.cleanup();
mysql_slow_log.cleanup();
+ /*
+ make sure that handlers finish up
+ what they have that is dependent on the binlog
+ */
+ ha_binlog_end(current_thd);
mysql_bin_log.cleanup();
#ifdef HAVE_REPLICATION
@@ -3106,11 +3116,12 @@ with --log-bin instead.");
}
if (opt_binlog_format_id == BF_UNSPECIFIED)
{
- /*
- We use statement-based by default, but could change this to be row-based
- if this is a cluster build (i.e. have_ndbcluster is true)...
- */
- opt_binlog_format_id= BF_STMT;
+#ifdef HAVE_NDB_BINLOG
+ if (have_ndbcluster == SHOW_OPTION_YES)
+ opt_binlog_format_id= BF_ROW;
+ else
+#endif
+ opt_binlog_format_id= BF_STMT;
}
#ifdef HAVE_ROW_BASED_REPLICATION
if (opt_binlog_format_id == BF_ROW)
@@ -4646,6 +4657,9 @@ enum options_mysqld
OPT_NDB_DISTRIBUTION,
OPT_NDB_INDEX_STAT_ENABLE,
OPT_NDB_INDEX_STAT_CACHE_ENTRIES, OPT_NDB_INDEX_STAT_UPDATE_FREQ,
+ OPT_NDB_EXTRA_LOGGING,
+ OPT_NDB_REPORT_THRESH_BINLOG_EPOCH_SLIP,
+ OPT_NDB_REPORT_THRESH_BINLOG_MEM_USAGE,
OPT_SKIP_SAFEMALLOC,
OPT_TEMP_POOL, OPT_TX_ISOLATION, OPT_COMPLETION_TYPE,
OPT_SKIP_STACK_TRACE, OPT_SKIP_SYMLINKS,
@@ -4848,7 +4862,11 @@ Disable with --skip-bdb (will save memory).",
"Tell the master the form of binary logging to use: either 'row' for "
"row-based binary logging (which automatically turns on "
"innodb_locks_unsafe_for_binlog as it is safe in this case), or "
- "'statement' for statement-based logging. ",
+ "'statement' for statement-based logging. "
+#ifdef HAVE_NDB_BINLOG
+ "If ndbcluster is enabled, the default will be set to 'row'."
+#endif
+ ,
#else
"Tell the master the form of binary logging to use: this release build "
"supports only statement-based binary logging, so only 'statement' is "
@@ -5302,6 +5320,29 @@ Disable with --skip-ndbcluster (will save memory).",
(gptr*) &global_system_variables.ndb_force_send,
(gptr*) &global_system_variables.ndb_force_send,
0, GET_BOOL, OPT_ARG, 1, 0, 0, 0, 0, 0},
+ {"ndb-extra-logging", OPT_NDB_EXTRA_LOGGING,
+ "Turn on more logging in the error log.",
+ (gptr*) &ndb_extra_logging,
+ (gptr*) &ndb_extra_logging,
+ 0, GET_INT, OPT_ARG, 0, 0, 0, 0, 0, 0},
+#ifdef HAVE_NDB_BINLOG
+ {"ndb-report-thresh-binlog-epoch-slip", OPT_NDB_REPORT_THRESH_BINLOG_EPOCH_SLIP,
+ "Threshold on number of epochs to be behind before reporting binlog status. "
+ "E.g. 3 means that if the difference between what epoch has been received "
+ "from the storage nodes and what has been applied to the binlog is 3 or more, "
+ "a status message will be sent to the cluster log.",
+ (gptr*) &ndb_report_thresh_binlog_epoch_slip,
+ (gptr*) &ndb_report_thresh_binlog_epoch_slip,
+ 0, GET_ULONG, REQUIRED_ARG, 3, 0, 256, 0, 0, 0},
+ {"ndb-report-thresh-binlog-mem-usage", OPT_NDB_REPORT_THRESH_BINLOG_MEM_USAGE,
+ "Threshold on percentage of free memory before reporting binlog status. E.g. "
+ "10 means that if amount of available memory for receiving binlog data from "
+ "the storage nodes goes below 10%, "
+ "a status message will be sent to the cluster log.",
+ (gptr*) &ndb_report_thresh_binlog_mem_usage,
+ (gptr*) &ndb_report_thresh_binlog_mem_usage,
+ 0, GET_ULONG, REQUIRED_ARG, 10, 0, 100, 0, 0, 0},
+#endif
{"ndb-use-exact-count", OPT_NDB_USE_EXACT_COUNT,
"Use exact records count during query planning and for fast "
"select count(*), disable for faster queries.",
@@ -7500,6 +7541,14 @@ get_one_option(int optid, const struct my_option *opt __attribute__((unused)),
}
opt_ndb_distribution_id= (enum ndb_distribution)(id-1);
break;
+ case OPT_NDB_EXTRA_LOGGING:
+ if (!argument)
+ ndb_extra_logging++;
+ else if (argument == disabled_my_option)
+ ndb_extra_logging= 0L;
+ else
+ ndb_extra_logging= atoi(argument);
+ break;
#endif
case OPT_INNODB:
#ifdef WITH_INNOBASE_STORAGE_ENGINE
diff --git a/sql/rpl_injector.cc b/sql/rpl_injector.cc
new file mode 100644
index 00000000000..a69dea9a158
--- /dev/null
+++ b/sql/rpl_injector.cc
@@ -0,0 +1,153 @@
+/*
+ Copyright (C) 2005 MySQL AB & MySQL Finland AB & TCX DataKonsult AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#include "mysql_priv.h"
+#include "rpl_injector.h"
+#ifdef HAVE_ROW_BASED_REPLICATION
+
+/*
+ injector::transaction - member definitions
+*/
+
+/* inline since it's called below */
+inline
+injector::transaction::transaction(MYSQL_LOG *log, THD *thd)
+ : m_thd(thd)
+{
+ /*
+ Default initialization of m_start_pos (which initializes it to garbage).
+ We need to fill it in using the code below.
+ */
+ LOG_INFO log_info;
+ log->get_current_log(&log_info);
+ /* !!! binlog_pos does not follow RAII !!! */
+ m_start_pos.m_file_name= my_strdup(log_info.log_file_name, MYF(0));
+ m_start_pos.m_file_pos= log_info.pos;
+
+ begin_trans(m_thd);
+}
+
+injector::transaction::~transaction()
+{
+ /* Needed since my_free expects a 'char*' (instead of 'void*'). */
+ char* const the_memory= const_cast<char*>(m_start_pos.m_file_name);
+
+ /*
+ We set the first character to null just to give all the copies of the
+    start position a (minimal) chance of seeing that the memory is lost.
+ All assuming the my_free does not step over the memory, of course.
+ */
+ *the_memory= '\0';
+
+ my_free(the_memory, MYF(0));
+}
+
+int injector::transaction::commit()
+{
+ DBUG_ENTER("injector::transaction::commit()");
+ m_thd->binlog_flush_pending_rows_event(true);
+ end_trans(m_thd, COMMIT);
+ DBUG_RETURN(0);
+}
+
+
+int injector::transaction::write_row (server_id_type sid, table tbl,
+ MY_BITMAP const* cols, size_t colcnt,
+ record_type record)
+{
+ DBUG_ENTER("injector::transaction::write_row(...)");
+ m_thd->set_server_id(sid);
+ m_thd->binlog_write_row(tbl.get_table(), tbl.is_transactional(),
+ cols, colcnt, record);
+ DBUG_RETURN(0);
+}
+
+
+int injector::transaction::delete_row(server_id_type sid, table tbl,
+ MY_BITMAP const* cols, size_t colcnt,
+ record_type record)
+{
+ DBUG_ENTER("injector::transaction::delete_row(...)");
+ m_thd->set_server_id(sid);
+ m_thd->binlog_delete_row(tbl.get_table(), tbl.is_transactional(),
+ cols, colcnt, record);
+ DBUG_RETURN(0);
+}
+
+
+int injector::transaction::update_row(server_id_type sid, table tbl,
+ MY_BITMAP const* cols, size_t colcnt,
+ record_type before, record_type after)
+{
+ DBUG_ENTER("injector::transaction::update_row(...)");
+ m_thd->set_server_id(sid);
+ m_thd->binlog_update_row(tbl.get_table(), tbl.is_transactional(),
+ cols, colcnt, before, after);
+ DBUG_RETURN(0);
+}
+
+
+injector::transaction::binlog_pos injector::transaction::start_pos() const
+{
+ return m_start_pos;
+}
+
+
+/*
+ injector - member definitions
+*/
+
+/* This constructor is called below */
+inline injector::injector()
+{
+}
+
+static injector *s_injector= 0;
+injector *injector::instance()
+{
+ if (s_injector == 0)
+ s_injector= new injector;
+ /* "There can be only one [instance]" */
+ return s_injector;
+}
+
+
+
+injector::transaction injector::new_trans(THD *thd)
+{
+ DBUG_ENTER("injector::new_trans(THD*)");
+ /*
+ Currently, there is no alternative to using 'mysql_bin_log' since that
+ is hardcoded into the way the handler is using the binary log.
+ */
+ DBUG_RETURN(transaction(&mysql_bin_log, thd));
+}
+
+void injector::new_trans(THD *thd, injector::transaction *ptr)
+{
+ DBUG_ENTER("injector::new_trans(THD *, transaction *)");
+ /*
+ Currently, there is no alternative to using 'mysql_bin_log' since that
+ is hardcoded into the way the handler is using the binary log.
+ */
+ transaction trans(&mysql_bin_log, thd);
+ ptr->swap(trans);
+
+ DBUG_VOID_RETURN;
+}
+
+#endif
diff --git a/sql/rpl_injector.h b/sql/rpl_injector.h
new file mode 100644
index 00000000000..32d3fdd1a78
--- /dev/null
+++ b/sql/rpl_injector.h
@@ -0,0 +1,251 @@
+/*
+ Copyright (C) 2005 MySQL AB & MySQL Finland AB & TCX DataKonsult AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#ifndef INJECTOR_H
+#define INJECTOR_H
+
+/* Pull in 'byte', 'my_off_t', and 'uint32' */
+#include <my_global.h>
+
+#ifdef HAVE_ROW_BASED_REPLICATION
+#include <my_bitmap.h>
+
+/* Forward declarations */
+class handler;
+class MYSQL_LOG;
+class st_table;
+
+typedef st_table TABLE;
+
+/*
+ Injector to inject rows into the MySQL server.
+
+ The injector class is used to notify the MySQL server of new rows that have
+ appeared outside of MySQL control.
+
+ The original purpose of this is to allow clusters---which handle replication
+  inside the cluster through other means---to insert new rows into the binary log.
+ Note, however, that the injector should be used whenever rows are altered in
+ any manner that is outside of MySQL server visibility and which therefore
+ are not seen by the MySQL server.
+ */
+class injector
+{
+public:
+
+ /*
+ Get an instance of the injector.
+
+ DESCRIPTION
+       The injector is a Singleton, so this static function returns the
+ available instance of the injector.
+
+ RETURN VALUE
+ A pointer to the available injector object.
+ */
+ static injector *instance();
+
+ /*
+ A transaction where rows can be added.
+
+ DESCRIPTION
+       The transaction class satisfies the **CopyConstructible** and
+ **Assignable** requirements. Note that the transaction is *not*
+ default constructible.
+ */
+ class transaction {
+ friend class injector;
+ public:
+ /* Convenience definitions */
+ typedef byte* record_type;
+ typedef uint32 server_id_type;
+
+ /*
+ Table reference.
+
+ RESPONSIBILITY
+
+ The class contains constructors to handle several forms of
+ references to tables. The constructors can implicitly be used to
+ construct references from, e.g., strings containing table names.
+
+ EXAMPLE
+
+ The class is intended to be used *by value*. Please, do not try to
+ construct objects of this type using 'new'; instead construct an
+ object, possibly a temporary object. For example:
+
+ injector::transaction::table tbl(share->table, true);
+ MY_BITMAP cols;
+ bitmap_init(&cols, NULL, (i + 7) / 8, false);
+ inj->write_row(::server_id, tbl, &cols, row_data);
+
+ or
+
+ MY_BITMAP cols;
+ bitmap_init(&cols, NULL, (i + 7) / 8, false);
+ inj->write_row(::server_id,
+ injector::transaction::table(share->table, true),
+ &cols, row_data);
+
+ This will work, be more efficient, and have greater chance of
+ inlining, not run the risk of losing pointers.
+
+ COLLABORATION
+
+ injector::transaction
+ Provide a flexible interface to the representation of tables.
+
+ */
+ class table
+ {
+ public:
+ table(TABLE *table, bool is_transactional)
+ : m_table(table), m_is_transactional(is_transactional)
+ {
+ }
+
+ char const *db_name() const { return m_table->s->db.str; }
+ char const *table_name() const { return m_table->s->table_name.str; }
+ TABLE *get_table() const { return m_table; }
+ bool is_transactional() const { return m_is_transactional; }
+
+ private:
+ TABLE *m_table;
+ bool m_is_transactional;
+ };
+
+ /*
+ Binlog position as a structure.
+ */
+ class binlog_pos {
+ friend class transaction;
+ public:
+ char const *file_name() const { return m_file_name; }
+ my_off_t file_pos() const { return m_file_pos; }
+
+ private:
+ char const *m_file_name;
+ my_off_t m_file_pos;
+ };
+
+ transaction() : m_thd(NULL) { }
+ transaction(transaction const&);
+ ~transaction();
+
+ /* Clear transaction, i.e., make calls to 'good()' return false. */
+ void clear() { m_thd= NULL; }
+
+ /* Is the transaction in a good state? */
+ bool good() const { return m_thd != NULL; }
+
+ /* Default assignment operator: standard implementation */
+ transaction& operator=(transaction t) {
+ swap(t);
+ return *this;
+ }
+
+ /*
+ Add a 'write row' entry to the transaction.
+ */
+ int write_row (server_id_type sid, table tbl,
+ MY_BITMAP const *cols, size_t colcnt,
+ record_type record);
+
+ /*
+ Add a 'delete row' entry to the transaction.
+ */
+ int delete_row(server_id_type sid, table tbl,
+ MY_BITMAP const *cols, size_t colcnt,
+ record_type record);
+
+ /*
+ Add an 'update row' entry to the transaction.
+ */
+ int update_row(server_id_type sid, table tbl,
+ MY_BITMAP const *cols, size_t colcnt,
+ record_type before, record_type after);
+
+ /*
+ Commit a transaction.
+
+ This member function will clean up after a sequence of *_row calls by,
+      for example, releasing resources and unlocking files.
+ */
+ int commit();
+
+ /*
+ Get the position for the start of the transaction.
+
+ Returns the position in the binary log of the first event in this
+ transaction. If no event is yet written, the position where the event
+ *will* be written is returned. This position is known, since a
+      new_trans() will lock the binary log and prevent any other
+ writes to the binary log.
+ */
+ binlog_pos start_pos() const;
+
+ private:
+ /* Only the injector may construct these object */
+ transaction(MYSQL_LOG *, THD *);
+
+ void swap(transaction& o) {
+ /* std::swap(m_start_pos, o.m_start_pos); */
+ {
+ binlog_pos const tmp= m_start_pos;
+ m_start_pos= o.m_start_pos;
+ o.m_start_pos= tmp;
+ }
+
+ /* std::swap(m_thd, o.m_thd); */
+ {
+ THD* const tmp= m_thd;
+ m_thd= o.m_thd;
+ o.m_thd= tmp;
+ }
+ }
+
+ binlog_pos m_start_pos;
+ THD *m_thd;
+ };
+
+ /*
+ Create a new transaction. This member function will prepare for a
+ sequence of *_row calls by, for example, reserving resources and
+ locking files. There are two overloaded alternatives: one returning a
+ transaction by value and one using placement semantics. The following
+ two calls are equivalent, with the exception that the latter will
+ overwrite the transaction.
+
+ injector::transaction trans1= inj->new_trans(thd);
+
+ injector::transaction trans2;
+     inj->new_trans(thd, &trans2);
+ */
+ transaction new_trans(THD *);
+ void new_trans(THD *, transaction *);
+
+private:
+ explicit injector();
+ ~injector() { } /* Nothing needs to be done */
+ injector(injector const&); /* You're not allowed to copy injector
+ instances.
+ */
+};
+
+#endif /* HAVE_ROW_BASED_REPLICATION */
+#endif /* INJECTOR_H */
diff --git a/sql/set_var.cc b/sql/set_var.cc
index f0b1779efc5..1ccd590171f 100644
--- a/sql/set_var.cc
+++ b/sql/set_var.cc
@@ -101,6 +101,11 @@ extern ulong srv_commit_concurrency;
/* WITH_NDBCLUSTER_STORAGE_ENGINE */
extern ulong ndb_cache_check_time;
+extern ulong ndb_extra_logging;
+#ifdef HAVE_NDB_BINLOG
+extern ulong ndb_report_thresh_binlog_epoch_slip;
+extern ulong ndb_report_thresh_binlog_mem_usage;
+#endif
@@ -481,6 +486,14 @@ sys_ndb_autoincrement_prefetch_sz("ndb_autoincrement_prefetch_sz",
&SV::ndb_autoincrement_prefetch_sz);
sys_var_thd_bool
sys_ndb_force_send("ndb_force_send", &SV::ndb_force_send);
+#ifdef HAVE_NDB_BINLOG
+sys_var_long_ptr
+sys_ndb_report_thresh_binlog_epoch_slip("ndb_report_thresh_binlog_epoch_slip",
+ &ndb_report_thresh_binlog_epoch_slip);
+sys_var_long_ptr
+sys_ndb_report_thresh_binlog_mem_usage("ndb_report_thresh_binlog_mem_usage",
+ &ndb_report_thresh_binlog_mem_usage);
+#endif
sys_var_thd_bool
sys_ndb_use_exact_count("ndb_use_exact_count", &SV::ndb_use_exact_count);
sys_var_thd_bool
@@ -496,6 +509,8 @@ sys_ndb_index_stat_cache_entries("ndb_index_stat_cache_entries",
sys_var_thd_ulong
sys_ndb_index_stat_update_freq("ndb_index_stat_update_freq",
&SV::ndb_index_stat_update_freq);
+sys_var_long_ptr
+sys_ndb_extra_logging("ndb_extra_logging", &ndb_extra_logging);
/* Time/date/datetime formats */
@@ -847,10 +862,17 @@ SHOW_VAR init_vars[]= {
{sys_ndb_autoincrement_prefetch_sz.name,
(char*) &sys_ndb_autoincrement_prefetch_sz, SHOW_SYS},
{sys_ndb_cache_check_time.name,(char*) &sys_ndb_cache_check_time, SHOW_SYS},
+ {sys_ndb_extra_logging.name,(char*) &sys_ndb_extra_logging, SHOW_SYS},
{sys_ndb_force_send.name, (char*) &sys_ndb_force_send, SHOW_SYS},
{sys_ndb_index_stat_cache_entries.name, (char*) &sys_ndb_index_stat_cache_entries, SHOW_SYS},
{sys_ndb_index_stat_enable.name, (char*) &sys_ndb_index_stat_enable, SHOW_SYS},
{sys_ndb_index_stat_update_freq.name, (char*) &sys_ndb_index_stat_update_freq, SHOW_SYS},
+#ifdef HAVE_NDB_BINLOG
+ {sys_ndb_report_thresh_binlog_epoch_slip.name,
+ (char*) &sys_ndb_report_thresh_binlog_epoch_slip, SHOW_SYS},
+ {sys_ndb_report_thresh_binlog_mem_usage.name,
+ (char*) &sys_ndb_report_thresh_binlog_mem_usage, SHOW_SYS},
+#endif
{sys_ndb_use_exact_count.name,(char*) &sys_ndb_use_exact_count, SHOW_SYS},
{sys_ndb_use_transactions.name,(char*) &sys_ndb_use_transactions, SHOW_SYS},
{sys_net_buffer_length.name,(char*) &sys_net_buffer_length, SHOW_SYS},
diff --git a/sql/slave.cc b/sql/slave.cc
index 99bddb7b9b0..41a13f2f5c5 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -39,7 +39,7 @@ typedef bool (*CHECK_KILLED_FUNC)(THD*,void*);
volatile bool slave_sql_running = 0, slave_io_running = 0;
char* slave_load_tmpdir = 0;
-MASTER_INFO *active_mi;
+MASTER_INFO *active_mi= 0;
my_bool replicate_same_server_id;
ulonglong relay_log_space_limit = 0;
@@ -2885,6 +2885,47 @@ bool st_relay_log_info::cached_charset_compare(char *charset)
return 0;
}
+/*
+  Check if the current error is of temporary nature or not.
+ Some errors are temporary in nature, such as
+ ER_LOCK_DEADLOCK and ER_LOCK_WAIT_TIMEOUT. Ndb also signals
+ that the error is temporary by pushing a warning with the error code
+ ER_GET_TEMPORARY_ERRMSG, if the originating error is temporary.
+*/
+static int has_temporary_error(THD *thd)
+{
+ if (thd->is_fatal_error)
+ return 0;
+
+ /*
+ Temporary error codes:
+ currently, InnoDB deadlock detected by InnoDB or lock
+ wait timeout (innodb_lock_wait_timeout exceeded
+ */
+ if (thd->net.last_errno == ER_LOCK_DEADLOCK ||
+ thd->net.last_errno == ER_LOCK_WAIT_TIMEOUT)
+ return 1;
+
+#ifdef HAVE_NDB_BINLOG
+ /*
+ currently temporary error set in ndbcluster
+ */
+ List_iterator_fast<MYSQL_ERROR> it(thd->warn_list);
+ MYSQL_ERROR *err;
+ while ((err= it++))
+ {
+ DBUG_PRINT("info", ("has warning %d %s", err->code, err->msg))
+ switch (err->code)
+ {
+ case ER_GET_TEMPORARY_ERRMSG:
+ return 1;
+ default:
+ break;
+ }
+ }
+#endif
+ return 0;
+}
static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli)
{
@@ -3004,6 +3045,7 @@ static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli)
ev->when = time(NULL);
ev->thd = thd; // because up to this point, ev->thd == 0
exec_res = ev->exec_event(rli);
+ DBUG_PRINT("info", ("exec_event result = %d", exec_res));
DBUG_ASSERT(rli->sql_thd==thd);
/*
Format_description_log_event should not be deleted because it will be
@@ -3017,17 +3059,13 @@ static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli)
}
if (slave_trans_retries)
{
- if (exec_res &&
- (thd->net.last_errno == ER_LOCK_DEADLOCK ||
- thd->net.last_errno == ER_LOCK_WAIT_TIMEOUT) &&
- !thd->is_fatal_error)
+ if (exec_res && has_temporary_error(thd))
{
const char *errmsg;
/*
We were in a transaction which has been rolled back because of a
- deadlock (currently, InnoDB deadlock detected by InnoDB) or lock
- wait timeout (innodb_lock_wait_timeout exceeded); let's seek back to
- BEGIN log event and retry it all again.
+ temporary error;
+ let's seek back to BEGIN log event and retry it all again.
We have to not only seek but also
a) init_master_info(), to seek back to hot relay log's start for later
(for when we will come back to this hot log after re-processing the
@@ -3539,10 +3577,39 @@ Slave SQL thread aborted. Can't execute init_slave query");
{
// do not scare the user if SQL thread was simply killed or stopped
if (!sql_slave_killed(thd,rli))
+ {
+ /*
+            retrieve as much info as possible from the thd, error codes and warnings
+            and print this to the error log so as to allow the user to locate the error
+ */
+ if (thd->net.last_errno != 0)
+ {
+ if (rli->last_slave_errno == 0)
+ {
+ slave_print_msg(ERROR_LEVEL, rli, thd->net.last_errno,
+ thd->net.last_error ?
+ thd->net.last_error : "<no message>");
+ }
+ else if (rli->last_slave_errno != thd->net.last_errno)
+ {
+ sql_print_error("Slave (additional info): %s Error_code: %d",
+ thd->net.last_error ?
+ thd->net.last_error : "<no message>",
+ thd->net.last_errno);
+ }
+ }
+
+ /* Print any warnings issued */
+ List_iterator_fast<MYSQL_ERROR> it(thd->warn_list);
+ MYSQL_ERROR *err;
+ while ((err= it++))
+ sql_print_warning("Slave: %s Error_code: %d",err->msg, err->code);
+
sql_print_error("\
Error running query, slave SQL thread aborted. Fix the problem, and restart \
the slave SQL thread with \"SLAVE START\". We stopped at log \
'%s' position %s", RPL_LOG_NAME, llstr(rli->group_master_log_pos, llbuff));
+ }
goto err;
}
}
diff --git a/sql/sql_base.cc b/sql/sql_base.cc
index 74a5848fa0a..85c5a481d47 100644
--- a/sql/sql_base.cc
+++ b/sql/sql_base.cc
@@ -803,13 +803,14 @@ void free_io_cache(TABLE *table)
*/
bool close_cached_tables(THD *thd, bool if_wait_for_refresh,
- TABLE_LIST *tables)
+ TABLE_LIST *tables, bool have_lock)
{
bool result=0;
DBUG_ENTER("close_cached_tables");
DBUG_ASSERT(thd || (!if_wait_for_refresh && !tables));
- VOID(pthread_mutex_lock(&LOCK_open));
+ if (!have_lock)
+ VOID(pthread_mutex_lock(&LOCK_open));
if (!tables)
{
refresh_version++; // Force close of open tables
@@ -888,7 +889,8 @@ bool close_cached_tables(THD *thd, bool if_wait_for_refresh,
for (TABLE *table=thd->open_tables; table ; table= table->next)
table->s->version= refresh_version;
}
- VOID(pthread_mutex_unlock(&LOCK_open));
+ if (!have_lock)
+ VOID(pthread_mutex_unlock(&LOCK_open));
if (if_wait_for_refresh)
{
pthread_mutex_lock(&thd->mysys_var->mutex);
@@ -2383,7 +2385,7 @@ void abort_locked_tables(THD *thd,const char *db, const char *table_name)
table->s->table_map_id is not ULONG_MAX.
*/
-static void assign_new_table_id(TABLE *table)
+void assign_new_table_id(TABLE *table)
{
static ulong last_table_id= ULONG_MAX;
diff --git a/sql/sql_class.h b/sql/sql_class.h
index c56924774ba..cb8c2818a19 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -1383,6 +1383,7 @@ public:
#define SYSTEM_THREAD_DELAYED_INSERT 1
#define SYSTEM_THREAD_SLAVE_IO 2
#define SYSTEM_THREAD_SLAVE_SQL 4
+#define SYSTEM_THREAD_NDBCLUSTER_BINLOG 8
/*
Used to hold information about file and file structure in exchainge
diff --git a/sql/sql_db.cc b/sql/sql_db.cc
index d91f091174f..fa01f98d723 100644
--- a/sql/sql_db.cc
+++ b/sql/sql_db.cc
@@ -401,6 +401,7 @@ bool mysql_create_db(THD *thd, char *db, HA_CREATE_INFO *create_info,
bool silent)
{
char path[FN_REFLEN+16];
+ char tmp_query[FN_REFLEN+16];
long result= 1;
int error= 0;
MY_STAT stat_info;
@@ -486,15 +487,20 @@ bool mysql_create_db(THD *thd, char *db, HA_CREATE_INFO *create_info,
if (!thd->query) // Only in replication
{
- query= path;
- query_length= (uint) (strxmov(path,"create database `", db, "`", NullS) -
- path);
+ query= tmp_query;
+ query_length= (uint) (strxmov(tmp_query,"create database `",
+ db, "`", NullS) - tmp_query);
}
else
{
query= thd->query;
query_length= thd->query_length;
}
+
+ ha_binlog_log_query(thd, LOGCOM_CREATE_DB,
+ query, query_length,
+ db, "");
+
if (mysql_bin_log.is_open())
{
Query_log_event qinfo(thd, query, query_length, 0,
@@ -569,6 +575,10 @@ bool mysql_alter_db(THD *thd, const char *db, HA_CREATE_INFO *create_info)
thd->variables.collation_database= thd->db_charset;
}
+ ha_binlog_log_query(thd, LOGCOM_ALTER_DB,
+ thd->query, thd->query_length,
+ db, "");
+
if (mysql_bin_log.is_open())
{
Query_log_event qinfo(thd, thd->query, thd->query_length, 0,
diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc
index cb115adaffb..8238496175c 100644
--- a/sql/sql_parse.cc
+++ b/sql/sql_parse.cc
@@ -79,7 +79,7 @@ const char *command_name[]={
"Connect","Kill","Debug","Ping","Time","Delayed insert","Change user",
"Binlog Dump","Table Dump", "Connect Out", "Register Slave",
"Prepare", "Execute", "Long Data", "Close stmt",
- "Reset stmt", "Set option", "Fetch",
+ "Reset stmt", "Set option", "Fetch", "Daemon",
"Error" // Last command number
};
@@ -149,7 +149,7 @@ static bool end_active_trans(THD *thd)
DBUG_RETURN(error);
}
-static bool begin_trans(THD *thd)
+bool begin_trans(THD *thd)
{
int error=0;
if (unlikely(thd->in_sub_stmt))
@@ -6682,6 +6682,8 @@ void kill_one_thread(THD *thd, ulong id, bool only_kill_query)
I_List_iterator<THD> it(threads);
while ((tmp=it++))
{
+ if (tmp->command == COM_DAEMON)
+ continue;
if (tmp->thread_id == id)
{
pthread_mutex_lock(&tmp->LOCK_delete); // Lock from delete
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index dd70f90b3da..ed056f62fe3 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -972,6 +972,9 @@ int reset_slave(THD *thd, MASTER_INFO* mi)
error=1;
goto err;
}
+
+ ha_reset_slave(thd);
+
// delete relay logs, clear relay log coordinates
if ((error= purge_relay_logs(&mi->rli, thd,
1 /* just reset */,
@@ -1316,6 +1319,13 @@ bool mysql_show_binlog_events(THD* thd)
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
DBUG_RETURN(TRUE);
+ /*
+ Wait for handlers to insert any pending information
+ into the binlog. For e.g. ndb which updates the binlog asynchronously
+ this is needed so that the uses sees all its own commands in the binlog
+ */
+ ha_binlog_wait(thd);
+
if (mysql_bin_log.is_open())
{
LEX_MASTER_INFO *lex_mi= &thd->lex->mi;