summaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorunknown <serg@serg.mylan>2006-04-13 15:34:39 +0200
committerunknown <serg@serg.mylan>2006-04-13 15:34:39 +0200
commitae040f25312501c62afd84d946d842dc10d9aabd (patch)
tree23592aeabac418e16afb94d05e61bd145c0d8ee0 /sql
parent720fe0c950a8151fddeb701567e8a6f6ab4d6742 (diff)
parent72d64fefd4d52808e5bd9b24bdee72182828e4db (diff)
downloadmariadb-git-ae040f25312501c62afd84d946d842dc10d9aabd.tar.gz
Merge bk-internal.mysql.com:/home/bk/mysql-5.1-new
into serg.mylan:/usr/home/serg/Abk/mysql-5.1
Diffstat (limited to 'sql')
-rw-r--r--sql/ha_ndbcluster.cc102
-rw-r--r--sql/ha_ndbcluster_binlog.cc270
-rw-r--r--sql/ha_ndbcluster_binlog.h6
3 files changed, 272 insertions, 106 deletions
diff --git a/sql/ha_ndbcluster.cc b/sql/ha_ndbcluster.cc
index 28e026b8a10..587eabb82d2 100644
--- a/sql/ha_ndbcluster.cc
+++ b/sql/ha_ndbcluster.cc
@@ -4489,6 +4489,21 @@ int ha_ndbcluster::create(const char *name,
DBUG_RETURN(my_errno);
}
+#ifdef HAVE_NDB_BINLOG
+ /*
+ Don't allow table creation unless
+ schema distribution table is setup
+ ( unless it is a creation of the schema dist table itself )
+ */
+ if (!schema_share &&
+ !(strcmp(m_dbname, NDB_REP_DB) == 0 &&
+ strcmp(m_tabname, NDB_SCHEMA_TABLE) == 0))
+ {
+ DBUG_PRINT("info", ("Schema distribution table not setup"));
+ DBUG_RETURN(HA_ERR_NO_CONNECTION);
+ }
+#endif /* HAVE_NDB_BINLOG */
+
DBUG_PRINT("table", ("name: %s", m_tabname));
tab.setName(m_tabname);
tab.setLogging(!(info->options & HA_LEX_CREATE_TMP_TABLE));
@@ -5027,7 +5042,8 @@ int ha_ndbcluster::rename_table(const char *from, const char *to)
is_old_table_tmpfile= 0;
String event_name(INJECTOR_EVENT_LEN);
ndb_rep_event_name(&event_name, from + sizeof(share_prefix) - 1, 0);
- ndbcluster_handle_drop_table(ndb, event_name.c_ptr(), share);
+ ndbcluster_handle_drop_table(ndb, event_name.c_ptr(), share,
+ "rename table");
}
if (!result && !IS_TMP_PREFIX(new_tabname))
@@ -5111,6 +5127,15 @@ ha_ndbcluster::delete_table(ha_ndbcluster *h, Ndb *ndb,
DBUG_ENTER("ha_ndbcluster::ndbcluster_delete_table");
NDBDICT *dict= ndb->getDictionary();
#ifdef HAVE_NDB_BINLOG
+ /*
+ Don't allow drop table unless
+ schema distribution table is setup
+ */
+ if (!schema_share)
+ {
+ DBUG_PRINT("info", ("Schema distribution table not setup"));
+ DBUG_RETURN(HA_ERR_NO_CONNECTION);
+ }
NDB_SHARE *share= get_share(path, 0, false);
#endif
@@ -5179,7 +5204,7 @@ ha_ndbcluster::delete_table(ha_ndbcluster *h, Ndb *ndb,
ndb_rep_event_name(&event_name, path + sizeof(share_prefix) - 1, 0);
ndbcluster_handle_drop_table(ndb,
table_dropped ? event_name.c_ptr() : 0,
- share);
+ share, "delete table");
}
if (share)
@@ -5208,6 +5233,18 @@ int ha_ndbcluster::delete_table(const char *name)
set_dbname(name);
set_tabname(name);
+#ifdef HAVE_NDB_BINLOG
+ /*
+ Don't allow drop table unless
+ schema distribution table is setup
+ */
+ if (!schema_share)
+ {
+ DBUG_PRINT("info", ("Schema distribution table not setup"));
+ DBUG_RETURN(HA_ERR_NO_CONNECTION);
+ }
+#endif
+
if (check_ndb_connection())
DBUG_RETURN(HA_ERR_NO_CONNECTION);
@@ -5429,6 +5466,11 @@ int ha_ndbcluster::open(const char *name, int mode, uint test_if_locked)
if (!res)
info(HA_STATUS_VARIABLE | HA_STATUS_CONST);
+#ifdef HAVE_NDB_BINLOG
+ if (!ndb_binlog_tables_inited && ndb_binlog_running)
+ table->db_stat|= HA_READ_ONLY;
+#endif
+
DBUG_RETURN(res);
}
@@ -5727,6 +5769,19 @@ int ndbcluster_drop_database_impl(const char *path)
static void ndbcluster_drop_database(char *path)
{
+ DBUG_ENTER("ndbcluster_drop_database");
+#ifdef HAVE_NDB_BINLOG
+ /*
+ Don't allow drop database unless
+ schema distribution table is setup
+ */
+ if (!schema_share)
+ {
+ DBUG_PRINT("info", ("Schema distribution table not setup"));
+ DBUG_VOID_RETURN;
+ //DBUG_RETURN(HA_ERR_NO_CONNECTION);
+ }
+#endif
ndbcluster_drop_database_impl(path);
#ifdef HAVE_NDB_BINLOG
char db[FN_REFLEN];
@@ -5735,6 +5790,7 @@ static void ndbcluster_drop_database(char *path)
current_thd->query, current_thd->query_length,
db, "", 0, 0, SOT_DROP_DB);
#endif
+ DBUG_VOID_RETURN;
}
/*
find all tables in ndb and discover those needed
@@ -5756,36 +5812,37 @@ int ndbcluster_find_all_files(THD *thd)
DBUG_ENTER("ndbcluster_find_all_files");
Ndb* ndb;
char key[FN_REFLEN];
- NdbDictionary::Dictionary::List list;
if (!(ndb= check_ndb_in_thd(thd)))
DBUG_RETURN(HA_ERR_NO_CONNECTION);
NDBDICT *dict= ndb->getDictionary();
- int unhandled, retries= 5;
+ int unhandled, retries= 5, skipped;
do
{
+ NdbDictionary::Dictionary::List list;
if (dict->listObjects(list, NdbDictionary::Object::UserTable) != 0)
ERR_RETURN(dict->getNdbError());
unhandled= 0;
+ skipped= 0;
+ retries--;
for (uint i= 0 ; i < list.count ; i++)
{
NDBDICT::List::Element& elmt= list.elements[i];
- int do_handle_table= 0;
if (IS_TMP_PREFIX(elmt.name) || IS_NDB_BLOB_PREFIX(elmt.name))
{
DBUG_PRINT("info", ("Skipping %s.%s in NDB", elmt.database, elmt.name));
continue;
}
DBUG_PRINT("info", ("Found %s.%s in NDB", elmt.database, elmt.name));
- if (elmt.state == NDBOBJ::StateOnline ||
- elmt.state == NDBOBJ::StateBackup)
- do_handle_table= 1;
- else if (!(elmt.state == NDBOBJ::StateBuilding))
+ if (elmt.state != NDBOBJ::StateOnline &&
+ elmt.state != NDBOBJ::StateBackup &&
+ elmt.state != NDBOBJ::StateBuilding)
{
sql_print_information("NDB: skipping setup table %s.%s, in state %d",
elmt.database, elmt.name, elmt.state);
+ skipped++;
continue;
}
@@ -5794,7 +5851,7 @@ int ndbcluster_find_all_files(THD *thd)
if (!(ndbtab= dict->getTable(elmt.name)))
{
- if (do_handle_table)
+ if (retries == 0)
sql_print_error("NDB: failed to setup table %s.%s, error: %d, %s",
elmt.database, elmt.name,
dict->getNdbError().code,
@@ -5863,9 +5920,9 @@ int ndbcluster_find_all_files(THD *thd)
pthread_mutex_unlock(&LOCK_open);
}
}
- while (unhandled && retries--);
+ while (unhandled && retries);
- DBUG_RETURN(0);
+ DBUG_RETURN(-(skipped + unhandled));
}
int ndbcluster_find_files(THD *thd,const char *db,const char *path,
@@ -7729,6 +7786,8 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused)))
pthread_cond_wait(&COND_server_started, &LOCK_server_started);
pthread_mutex_unlock(&LOCK_server_started);
+ ndbcluster_util_inited= 1;
+
/*
Wait for cluster to start
*/
@@ -7760,6 +7819,8 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused)))
}
#ifdef HAVE_NDB_BINLOG
+ if (ndb_extra_logging && ndb_binlog_running)
+ sql_print_information("NDB Binlog: Ndb tables initially read only.");
/* create tables needed by the replication */
ndbcluster_setup_binlog_table_shares(thd);
#else
@@ -7769,17 +7830,9 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused)))
ndbcluster_find_all_files(thd);
#endif
- ndbcluster_util_inited= 1;
-
-#ifdef HAVE_NDB_BINLOG
- /* Signal injector thread that all is setup */
- pthread_cond_signal(&injector_cond);
-#endif
-
set_timespec(abstime, 0);
for (;!abort_loop;)
{
-
pthread_mutex_lock(&LOCK_ndb_util_thread);
pthread_cond_timedwait(&COND_ndb_util_thread,
&LOCK_ndb_util_thread,
@@ -7797,7 +7850,7 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused)))
Check that the apply_status_share and schema_share has been created.
If not try to create it
*/
- if (!apply_status_share || !schema_share)
+ if (!ndb_binlog_tables_inited)
ndbcluster_setup_binlog_table_shares(thd);
#endif
@@ -10052,14 +10105,15 @@ static int ndbcluster_fill_files_table(THD *thd, TABLE_LIST *tables, COND *cond)
}
}
- dict->listObjects(dflist, NdbDictionary::Object::Undofile);
+ NdbDictionary::Dictionary::List uflist;
+ dict->listObjects(uflist, NdbDictionary::Object::Undofile);
ndberr= dict->getNdbError();
if (ndberr.classification != NdbError::NoError)
ERR_RETURN(ndberr);
- for (i= 0; i < dflist.count; i++)
+ for (i= 0; i < uflist.count; i++)
{
- NdbDictionary::Dictionary::List::Element& elt= dflist.elements[i];
+ NdbDictionary::Dictionary::List::Element& elt= uflist.elements[i];
Ndb_cluster_connection_node_iter iter;
unsigned id;
diff --git a/sql/ha_ndbcluster_binlog.cc b/sql/ha_ndbcluster_binlog.cc
index 60ccb661703..79e4fc790e0 100644
--- a/sql/ha_ndbcluster_binlog.cc
+++ b/sql/ha_ndbcluster_binlog.cc
@@ -48,6 +48,7 @@ int ndb_binlog_thread_running= 0;
FALSE if not
*/
my_bool ndb_binlog_running= FALSE;
+my_bool ndb_binlog_tables_inited= FALSE;
/*
Global reference to the ndb injector thread THD oject
@@ -775,32 +776,50 @@ static int ndbcluster_create_schema_table(THD *thd)
DBUG_RETURN(0);
}
-void ndbcluster_setup_binlog_table_shares(THD *thd)
+int ndbcluster_setup_binlog_table_shares(THD *thd)
{
- int done_find_all_files= 0;
if (!schema_share &&
ndbcluster_check_schema_share() == 0)
{
- if (!done_find_all_files)
+ pthread_mutex_lock(&LOCK_open);
+ ndb_create_table_from_engine(thd, NDB_REP_DB, NDB_SCHEMA_TABLE);
+ pthread_mutex_unlock(&LOCK_open);
+ if (!schema_share)
{
- ndbcluster_find_all_files(thd);
- done_find_all_files= 1;
+ ndbcluster_create_schema_table(thd);
+ // always make sure we create the 'schema' first
+ if (!schema_share)
+ return 1;
}
- ndbcluster_create_schema_table(thd);
- // always make sure we create the 'schema' first
- if (!schema_share)
- return;
}
if (!apply_status_share &&
ndbcluster_check_apply_status_share() == 0)
{
- if (!done_find_all_files)
+ pthread_mutex_lock(&LOCK_open);
+ ndb_create_table_from_engine(thd, NDB_REP_DB, NDB_APPLY_TABLE);
+ pthread_mutex_unlock(&LOCK_open);
+ if (!apply_status_share)
{
- ndbcluster_find_all_files(thd);
- done_find_all_files= 1;
+ ndbcluster_create_apply_status_table(thd);
+ if (!apply_status_share)
+ return 1;
}
- ndbcluster_create_apply_status_table(thd);
}
+ if (!ndbcluster_find_all_files(thd))
+ {
+ pthread_mutex_lock(&LOCK_open);
+ ndb_binlog_tables_inited= TRUE;
+ if (ndb_binlog_running)
+ {
+ if (ndb_extra_logging)
+ sql_print_information("NDB Binlog: ndb tables writable");
+ close_cached_tables((THD*) 0, 0, (TABLE_LIST*) 0, TRUE);
+ }
+ pthread_mutex_unlock(&LOCK_open);
+ /* Signal injector thread that all is setup */
+ pthread_cond_signal(&injector_cond);
+ }
+ return 0;
}
/*
@@ -936,6 +955,31 @@ static char *ndb_pack_varchar(const NDBCOL *col, char *buf,
/*
log query in schema table
*/
+static void ndb_report_waiting(const char *key,
+ int the_time,
+ const char *op,
+ const char *obj)
+{
+ ulonglong ndb_latest_epoch= 0;
+ const char *proc_info= "<no info>";
+ pthread_mutex_lock(&injector_mutex);
+ if (injector_ndb)
+ ndb_latest_epoch= injector_ndb->getLatestGCI();
+ if (injector_thd)
+ proc_info= injector_thd->proc_info;
+ pthread_mutex_unlock(&injector_mutex);
+ sql_print_information("NDB %s:"
+ " waiting max %u sec for %s %s."
+ " epochs: (%u,%u,%u)"
+ " injector proc_info: %s"
+ ,key, the_time, op, obj
+ ,(uint)ndb_latest_handled_binlog_epoch
+ ,(uint)ndb_latest_received_binlog_epoch
+ ,(uint)ndb_latest_epoch
+ ,proc_info
+ );
+}
+
int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
const char *query, int query_length,
const char *db, const char *table_name,
@@ -965,6 +1009,7 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
}
char tmp_buf2[FN_REFLEN];
+ const char *type_str;
switch (type)
{
case SOT_DROP_TABLE:
@@ -975,6 +1020,7 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
query= tmp_buf2;
query_length= (uint) (strxmov(tmp_buf2, "drop table `",
table_name, "`", NullS) - tmp_buf2);
+ type_str= "drop table";
break;
case SOT_RENAME_TABLE:
/* redo the rename table query as is may contain several tables */
@@ -982,20 +1028,28 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
query_length= (uint) (strxmov(tmp_buf2, "rename table `",
old_db, ".", old_table_name, "` to `",
db, ".", table_name, "`", NullS) - tmp_buf2);
+ type_str= "rename table";
break;
case SOT_CREATE_TABLE:
- // fall through
+ type_str= "create table";
+ break;
case SOT_ALTER_TABLE:
+ type_str= "create table";
break;
case SOT_DROP_DB:
+ type_str= "drop db";
break;
case SOT_CREATE_DB:
+ type_str= "create db";
break;
case SOT_ALTER_DB:
+ type_str= "alter db";
break;
case SOT_TABLESPACE:
+ type_str= "tablespace";
break;
case SOT_LOGFILE_GROUP:
+ type_str= "logfile group";
break;
default:
abort(); /* should not happen, programming error */
@@ -1174,9 +1228,9 @@ end:
struct timespec abstime;
int i;
set_timespec(abstime, 1);
- (void) pthread_cond_timedwait(&injector_cond,
- &ndb_schema_object->mutex,
- &abstime);
+ int ret= pthread_cond_timedwait(&injector_cond,
+ &ndb_schema_object->mutex,
+ &abstime);
(void) pthread_mutex_lock(&schema_share->mutex);
for (i= 0; i < ndb_number_of_storage_nodes; i++)
@@ -1198,16 +1252,19 @@ end:
if (bitmap_is_clear_all(&ndb_schema_object->slock_bitmap))
break;
- max_timeout--;
- if (max_timeout == 0)
+ if (ret)
{
- sql_print_error("NDB create table: timed out. Ignoring...");
- break;
+ max_timeout--;
+ if (max_timeout == 0)
+ {
+ sql_print_error("NDB %s: distributing %s timed out. Ignoring...",
+ type_str, ndb_schema_object->key);
+ break;
+ }
+ if (ndb_extra_logging)
+ ndb_report_waiting(type_str, max_timeout,
+ "distributing", ndb_schema_object->key);
}
- if (ndb_extra_logging)
- sql_print_information("NDB create table: "
- "waiting max %u sec for create table %s.",
- max_timeout, ndb_schema_object->key);
}
(void) pthread_mutex_unlock(&ndb_schema_object->mutex);
}
@@ -1509,9 +1566,9 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
share= 0;
pOp->setCustomData(0);
-
+
pthread_mutex_lock(&injector_mutex);
- injector_ndb->dropEventOperation(pOp);
+ ndb->dropEventOperation(pOp);
pOp= 0;
pthread_mutex_unlock(&injector_mutex);
@@ -1689,9 +1746,15 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
// skip
break;
case NDBEVENT::TE_CLUSTER_FAILURE:
+ // fall through
case NDBEVENT::TE_DROP:
+ if (ndb_extra_logging &&
+ ndb_binlog_tables_inited && ndb_binlog_running)
+ sql_print_information("NDB Binlog: ndb tables initially "
+ "read only on reconnect.");
free_share(&schema_share);
schema_share= 0;
+ ndb_binlog_tables_inited= FALSE;
// fall through
case NDBEVENT::TE_ALTER:
ndb_handle_schema_change(thd, ndb, pOp, tmp_share);
@@ -2385,7 +2448,6 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab,
}
if (!op)
{
- pthread_mutex_unlock(&injector_mutex);
sql_print_error("NDB Binlog: Creating NdbEventOperation failed for"
" %s",event_name);
push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
@@ -2393,6 +2455,7 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab,
ndb->getNdbError().code,
ndb->getNdbError().message,
"NDB");
+ pthread_mutex_unlock(&injector_mutex);
DBUG_RETURN(-1);
}
@@ -2494,9 +2557,15 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab,
get_share(share);
if (do_apply_status_share)
+ {
apply_status_share= get_share(share);
+ (void) pthread_cond_signal(&injector_cond);
+ }
else if (do_schema_share)
+ {
schema_share= get_share(share);
+ (void) pthread_cond_signal(&injector_cond);
+ }
DBUG_PRINT("info",("%s share->op: 0x%lx, share->use_count: %u",
share->key, share->op, share->use_count));
@@ -2513,7 +2582,7 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab,
*/
int
ndbcluster_handle_drop_table(Ndb *ndb, const char *event_name,
- NDB_SHARE *share)
+ NDB_SHARE *share, const char *type_str)
{
DBUG_ENTER("ndbcluster_handle_drop_table");
@@ -2569,21 +2638,24 @@ ndbcluster_handle_drop_table(Ndb *ndb, const char *event_name,
{
struct timespec abstime;
set_timespec(abstime, 1);
- (void) pthread_cond_timedwait(&injector_cond,
- &share->mutex,
- &abstime);
- max_timeout--;
+ int ret= pthread_cond_timedwait(&injector_cond,
+ &share->mutex,
+ &abstime);
if (share->op == 0)
break;
- if (max_timeout == 0)
+ if (ret)
{
- sql_print_error("NDB delete table: timed out. Ignoring...");
- break;
+ max_timeout--;
+ if (max_timeout == 0)
+ {
+ sql_print_error("NDB %s: %s timed out. Ignoring...",
+ type_str, share->key);
+ break;
+ }
+ if (ndb_extra_logging)
+ ndb_report_waiting(type_str, max_timeout,
+ type_str, share->key);
}
- if (ndb_extra_logging)
- sql_print_information("NDB delete table: "
- "waiting max %u sec for drop table %s.",
- max_timeout, share->key);
}
(void) pthread_mutex_unlock(&share->mutex);
#else
@@ -2646,7 +2718,8 @@ static int ndb_binlog_thread_handle_error(Ndb *ndb, NdbEventOperation *pOp,
}
static int
-ndb_binlog_thread_handle_non_data_event(Ndb *ndb, NdbEventOperation *pOp,
+ndb_binlog_thread_handle_non_data_event(THD *thd, Ndb *ndb,
+ NdbEventOperation *pOp,
Binlog_index_row &row)
{
NDB_SHARE *share= (NDB_SHARE *)pOp->getCustomData();
@@ -2655,18 +2728,23 @@ ndb_binlog_thread_handle_non_data_event(Ndb *ndb, NdbEventOperation *pOp,
/* make sure to flush any pending events as they can be dependent
on one of the tables being changed below
*/
- injector_thd->binlog_flush_pending_rows_event(true);
+ thd->binlog_flush_pending_rows_event(true);
switch (type)
{
case NDBEVENT::TE_CLUSTER_FAILURE:
+ if (ndb_extra_logging)
+ sql_print_information("NDB Binlog: cluster failure for %s.", share->key);
if (apply_status_share == share)
{
+ if (ndb_extra_logging &&
+ ndb_binlog_tables_inited && ndb_binlog_running)
+ sql_print_information("NDB Binlog: ndb tables initially "
+ "read only on reconnect.");
free_share(&apply_status_share);
apply_status_share= 0;
+ ndb_binlog_tables_inited= FALSE;
}
- if (ndb_extra_logging)
- sql_print_information("NDB Binlog: cluster failure for %s.", share->key);
DBUG_PRINT("info", ("CLUSTER FAILURE EVENT: "
"%s received share: 0x%lx op: %lx share op: %lx "
"op_old: %lx",
@@ -2675,8 +2753,13 @@ ndb_binlog_thread_handle_non_data_event(Ndb *ndb, NdbEventOperation *pOp,
case NDBEVENT::TE_DROP:
if (apply_status_share == share)
{
+ if (ndb_extra_logging &&
+ ndb_binlog_tables_inited && ndb_binlog_running)
+ sql_print_information("NDB Binlog: ndb tables initially "
+ "read only on reconnect.");
free_share(&apply_status_share);
apply_status_share= 0;
+ ndb_binlog_tables_inited= FALSE;
}
/* ToDo: remove printout */
if (ndb_extra_logging)
@@ -2702,7 +2785,7 @@ ndb_binlog_thread_handle_non_data_event(Ndb *ndb, NdbEventOperation *pOp,
return 0;
}
- ndb_handle_schema_change(injector_thd, ndb, pOp, share);
+ ndb_handle_schema_change(thd, ndb, pOp, share);
return 0;
}
@@ -2982,7 +3065,8 @@ static void ndb_free_schema_object(NDB_SCHEMA_OBJECT **ndb_schema_object,
pthread_handler_t ndb_binlog_thread_func(void *arg)
{
THD *thd; /* needs to be first for thread_stack */
- Ndb *ndb= 0;
+ Ndb *i_ndb= 0;
+ Ndb *s_ndb= 0;
Thd_ndb *thd_ndb=0;
int ndb_update_binlog_index= 1;
injector *inj= injector::instance();
@@ -3034,16 +3118,16 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
pthread_mutex_unlock(&LOCK_thread_count);
thd->lex->start_transaction_opt= 0;
- if (!(schema_ndb= new Ndb(g_ndb_cluster_connection, "")) ||
- schema_ndb->init())
+ if (!(s_ndb= new Ndb(g_ndb_cluster_connection, "")) ||
+ s_ndb->init())
{
sql_print_error("NDB Binlog: Getting Schema Ndb object failed");
goto err;
}
// empty database
- if (!(ndb= new Ndb(g_ndb_cluster_connection, "")) ||
- ndb->init())
+ if (!(i_ndb= new Ndb(g_ndb_cluster_connection, "")) ||
+ i_ndb->init())
{
sql_print_error("NDB Binlog: Getting Ndb object failed");
ndb_binlog_thread_running= -1;
@@ -3064,7 +3148,8 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
pthread_mutex_lock(&injector_mutex);
*/
injector_thd= thd;
- injector_ndb= ndb;
+ injector_ndb= i_ndb;
+ schema_ndb= s_ndb;
ndb_binlog_thread_running= 1;
if (opt_bin_log)
{
@@ -3087,7 +3172,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
thd->proc_info= "Waiting for ndbcluster to start";
pthread_mutex_lock(&injector_mutex);
- while (!ndbcluster_util_inited)
+ while (!schema_share || !apply_status_share)
{
/* ndb not connected yet */
struct timespec abstime;
@@ -3119,10 +3204,6 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
thd->db= db;
if (ndb_binlog_running)
open_binlog_index(thd, &binlog_tables, &binlog_index);
- if (!apply_status_share)
- {
- sql_print_error("NDB: Could not get apply status share");
- }
thd->db= db;
}
@@ -3150,14 +3231,14 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
int res= 0, tot_poll_wait= 1000;
if (ndb_binlog_running)
{
- res= ndb->pollEvents(tot_poll_wait, &gci);
+ res= i_ndb->pollEvents(tot_poll_wait, &gci);
tot_poll_wait= 0;
}
- int schema_res= schema_ndb->pollEvents(tot_poll_wait, &schema_gci);
+ int schema_res= s_ndb->pollEvents(tot_poll_wait, &schema_gci);
ndb_latest_received_binlog_epoch= gci;
while (gci > schema_gci && schema_res >= 0)
- schema_res= schema_ndb->pollEvents(10, &schema_gci);
+ schema_res= s_ndb->pollEvents(10, &schema_gci);
if ((abort_loop || do_ndbcluster_binlog_close_connection) &&
(ndb_latest_handled_binlog_epoch >= g_latest_trans_gci ||
@@ -3184,15 +3265,16 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
if (unlikely(schema_res > 0))
{
- schema_ndb->
+ thd->proc_info= "Processing events from schema table";
+ s_ndb->
setReportThreshEventGCISlip(ndb_report_thresh_binlog_epoch_slip);
- schema_ndb->
+ s_ndb->
setReportThreshEventFreeMem(ndb_report_thresh_binlog_mem_usage);
- NdbEventOperation *pOp= schema_ndb->nextEvent();
+ NdbEventOperation *pOp= s_ndb->nextEvent();
while (pOp != NULL)
{
if (!pOp->hasError())
- ndb_binlog_thread_handle_schema_event(thd, schema_ndb, pOp,
+ ndb_binlog_thread_handle_schema_event(thd, s_ndb, pOp,
&post_epoch_log_list,
&post_epoch_unlock_list,
&mem_root);
@@ -3201,7 +3283,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
"binlog schema event",
(ulong) pOp->getNdbError().code,
pOp->getNdbError().message);
- pOp= schema_ndb->nextEvent();
+ pOp= s_ndb->nextEvent();
}
}
@@ -3213,7 +3295,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
int event_count= 0;
#endif
thd->proc_info= "Processing events";
- NdbEventOperation *pOp= ndb->nextEvent();
+ NdbEventOperation *pOp= i_ndb->nextEvent();
Binlog_index_row row;
while (pOp != NULL)
{
@@ -3224,9 +3306,9 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
! IS_NDB_BLOB_PREFIX(pOp->getEvent()->getTable()->getName()));
DBUG_ASSERT(gci <= ndb_latest_received_binlog_epoch);
- ndb->
+ i_ndb->
setReportThreshEventGCISlip(ndb_report_thresh_binlog_epoch_slip);
- ndb->setReportThreshEventFreeMem(ndb_report_thresh_binlog_mem_usage);
+ i_ndb->setReportThreshEventFreeMem(ndb_report_thresh_binlog_mem_usage);
bzero((char*) &row, sizeof(row));
injector::transaction trans;
@@ -3235,7 +3317,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
Uint32 iter= 0;
const NdbEventOperation *gci_op;
Uint32 event_types;
- while ((gci_op= ndb->getGCIEventOperations(&iter, &event_types))
+ while ((gci_op= i_ndb->getGCIEventOperations(&iter, &event_types))
!= NULL)
{
NDB_SHARE *share= (NDB_SHARE*)gci_op->getCustomData();
@@ -3321,7 +3403,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
event_count++;
#endif
if (pOp->hasError() &&
- ndb_binlog_thread_handle_error(ndb, pOp, row) < 0)
+ ndb_binlog_thread_handle_error(i_ndb, pOp, row) < 0)
goto err;
#ifndef DBUG_OFF
@@ -3341,7 +3423,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
Uint32 iter= 0;
const NdbEventOperation *gci_op;
Uint32 event_types;
- while ((gci_op= ndb->getGCIEventOperations(&iter, &event_types))
+ while ((gci_op= i_ndb->getGCIEventOperations(&iter, &event_types))
!= NULL)
{
if (gci_op == pOp)
@@ -3353,19 +3435,19 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
#endif
if ((unsigned) pOp->getEventType() <
(unsigned) NDBEVENT::TE_FIRST_NON_DATA_EVENT)
- ndb_binlog_thread_handle_data_event(ndb, pOp, row, trans);
+ ndb_binlog_thread_handle_data_event(i_ndb, pOp, row, trans);
else
{
// set injector_ndb database/schema from table internal name
int ret=
- ndb->setDatabaseAndSchemaName(pOp->getEvent()->getTable());
+ i_ndb->setDatabaseAndSchemaName(pOp->getEvent()->getTable());
DBUG_ASSERT(ret == 0);
- ndb_binlog_thread_handle_non_data_event(ndb, pOp, row);
+ ndb_binlog_thread_handle_non_data_event(thd, i_ndb, pOp, row);
// reset to catch errors
- ndb->setDatabaseName("");
+ i_ndb->setDatabaseName("");
}
- pOp= ndb->nextEvent();
+ pOp= i_ndb->nextEvent();
} while (pOp && pOp->getGCI() == gci);
/*
@@ -3379,6 +3461,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
if (trans.good())
{
//DBUG_ASSERT(row.n_inserts || row.n_updates || row.n_deletes);
+ thd->proc_info= "Committing events to binlog";
injector::transaction::binlog_pos start= trans.start_pos();
if (int r= trans.commit())
{
@@ -3418,10 +3501,13 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
}
err:
DBUG_PRINT("info",("Shutting down cluster binlog thread"));
+ thd->proc_info= "Shutting down";
close_thread_tables(thd);
pthread_mutex_lock(&injector_mutex);
/* don't mess with the injector_ndb anymore from other threads */
+ injector_thd= 0;
injector_ndb= 0;
+ schema_ndb= 0;
pthread_mutex_unlock(&injector_mutex);
thd->db= 0; // as not to try to free memory
sql_print_information("Stopping Cluster Binlog");
@@ -3438,21 +3524,45 @@ err:
}
/* remove all event operations */
- if (ndb)
+ if (s_ndb)
{
NdbEventOperation *op;
DBUG_PRINT("info",("removing all event operations"));
- while ((op= ndb->getEventOperation()))
+ while ((op= s_ndb->getEventOperation()))
{
DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(op->getEvent()->getTable()->getName()));
DBUG_PRINT("info",("removing event operation on %s",
op->getEvent()->getName()));
NDB_SHARE *share= (NDB_SHARE*) op->getCustomData();
+ DBUG_ASSERT(share != 0);
+ DBUG_ASSERT(share->op == op ||
+ share->op_old == op);
+ share->op= share->op_old= 0;
free_share(&share);
- ndb->dropEventOperation(op);
+ s_ndb->dropEventOperation(op);
+ }
+ delete s_ndb;
+ s_ndb= 0;
+ }
+ if (i_ndb)
+ {
+ NdbEventOperation *op;
+ DBUG_PRINT("info",("removing all event operations"));
+ while ((op= i_ndb->getEventOperation()))
+ {
+ DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(op->getEvent()->getTable()->getName()));
+ DBUG_PRINT("info",("removing event operation on %s",
+ op->getEvent()->getName()));
+ NDB_SHARE *share= (NDB_SHARE*) op->getCustomData();
+ DBUG_ASSERT(share != 0);
+ DBUG_ASSERT(share->op == op ||
+ share->op_old == op);
+ share->op= share->op_old= 0;
+ free_share(&share);
+ i_ndb->dropEventOperation(op);
}
- delete ndb;
- ndb= 0;
+ delete i_ndb;
+ i_ndb= 0;
}
hash_free(&ndb_schema_objects);
diff --git a/sql/ha_ndbcluster_binlog.h b/sql/ha_ndbcluster_binlog.h
index fda025842a0..9d15016568b 100644
--- a/sql/ha_ndbcluster_binlog.h
+++ b/sql/ha_ndbcluster_binlog.h
@@ -101,7 +101,8 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
const char *old_db= 0,
const char *old_table_name= 0);
int ndbcluster_handle_drop_table(Ndb *ndb, const char *event_name,
- NDB_SHARE *share);
+ NDB_SHARE *share,
+ const char *type_str);
void ndb_rep_event_name(String *event_name,
const char *db, const char *tbl);
int ndb_create_table_from_engine(THD *thd, const char *db,
@@ -112,12 +113,13 @@ pthread_handler_t ndb_binlog_thread_func(void *arg);
/*
table cluster_replication.apply_status
*/
-void ndbcluster_setup_binlog_table_shares(THD *thd);
+int ndbcluster_setup_binlog_table_shares(THD *thd);
extern NDB_SHARE *apply_status_share;
extern NDB_SHARE *schema_share;
extern THD *injector_thd;
extern my_bool ndb_binlog_running;
+extern my_bool ndb_binlog_tables_inited;
bool
ndbcluster_show_status_binlog(THD* thd, stat_print_fn *stat_print,