summaryrefslogtreecommitdiff
path: root/sql/ha_ndbcluster_binlog.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/ha_ndbcluster_binlog.cc')
-rw-r--r--sql/ha_ndbcluster_binlog.cc122
1 files changed, 99 insertions, 23 deletions
diff --git a/sql/ha_ndbcluster_binlog.cc b/sql/ha_ndbcluster_binlog.cc
index be75eff2575..07b0d907229 100644
--- a/sql/ha_ndbcluster_binlog.cc
+++ b/sql/ha_ndbcluster_binlog.cc
@@ -241,22 +241,30 @@ static void dbug_print_table(const char *info, TABLE *table)
static void run_query(THD *thd, char *buf, char *end,
const int *no_print_error, my_bool disable_binlog)
{
- ulong save_query_length= thd->query_length;
- char *save_query= thd->query;
+ ulong save_thd_query_length= thd->query_length;
+ char *save_thd_query= thd->query;
ulong save_thread_id= thd->variables.pseudo_thread_id;
+ struct system_status_var save_thd_status_var= thd->status_var;
+ THD_TRANS save_thd_transaction_all= thd->transaction.all;
+ THD_TRANS save_thd_transaction_stmt= thd->transaction.stmt;
ulonglong save_thd_options= thd->options;
DBUG_ASSERT(sizeof(save_thd_options) == sizeof(thd->options));
- NET save_net= thd->net;
+ NET save_thd_net= thd->net;
const char* found_semicolon= NULL;
bzero((char*) &thd->net, sizeof(NET));
thd->query_length= end - buf;
thd->query= buf;
thd->variables.pseudo_thread_id= thread_id;
+ thd->transaction.stmt.modified_non_trans_table= FALSE;
if (disable_binlog)
thd->options&= ~OPTION_BIN_LOG;
DBUG_PRINT("query", ("%s", thd->query));
+
+ DBUG_ASSERT(!thd->in_sub_stmt);
+ DBUG_ASSERT(!thd->prelocked_mode);
+
mysql_parse(thd, thd->query, thd->query_length, &found_semicolon);
if (no_print_error && thd->is_slave_error)
@@ -265,20 +273,36 @@ static void run_query(THD *thd, char *buf, char *end,
Thd_ndb *thd_ndb= get_thd_ndb(thd);
for (i= 0; no_print_error[i]; i++)
if ((thd_ndb->m_error_code == no_print_error[i]) ||
- (thd->net.last_errno == (unsigned)no_print_error[i]))
+ (thd->main_da.sql_errno() == (unsigned) no_print_error[i]))
break;
if (!no_print_error[i])
sql_print_error("NDB: %s: error %s %d(ndb: %d) %d %d",
- buf, thd->net.last_error, thd->net.last_errno,
+ buf,
+ thd->main_da.message(),
+ thd->main_da.sql_errno(),
thd_ndb->m_error_code,
(int) thd->is_error(), thd->is_slave_error);
}
+ /*
+ XXX: this code is broken. mysql_parse()/mysql_reset_thd_for_next_command()
+ can not be called from within a statement, and
+ run_query() can be called from anywhere, including from within
+ a sub-statement.
+ This particular reset is a temporary hack to avoid an assert
+ for double assignment of the diagnostics area when run_query()
+ is called from ndbcluster_reset_logs(), which is called from
+ mysql_flush().
+ */
+ thd->main_da.reset_diagnostics_area();
thd->options= save_thd_options;
- thd->query_length= save_query_length;
- thd->query= save_query;
+ thd->query_length= save_thd_query_length;
+ thd->query= save_thd_query;
thd->variables.pseudo_thread_id= save_thread_id;
- thd->net= save_net;
+ thd->status_var= save_thd_status_var;
+ thd->transaction.all= save_thd_transaction_all;
+ thd->transaction.stmt= save_thd_transaction_stmt;
+ thd->net= save_thd_net;
if (thd == injector_thd)
{
@@ -777,8 +801,9 @@ static int ndbcluster_create_ndb_apply_status_table(THD *thd)
" end_pos BIGINT UNSIGNED NOT NULL, "
" PRIMARY KEY USING HASH (server_id) ) ENGINE=NDB");
- const int no_print_error[4]= {ER_TABLE_EXISTS_ERROR,
+ const int no_print_error[5]= {ER_TABLE_EXISTS_ERROR,
701,
+ 702,
4009,
0}; // do not print error 701 etc
run_query(thd, buf, end, no_print_error, TRUE);
@@ -837,8 +862,9 @@ static int ndbcluster_create_schema_table(THD *thd)
" type INT UNSIGNED NOT NULL,"
" PRIMARY KEY USING HASH (db,name) ) ENGINE=NDB");
- const int no_print_error[4]= {ER_TABLE_EXISTS_ERROR,
+ const int no_print_error[5]= {ER_TABLE_EXISTS_ERROR,
701,
+ 702,
4009,
0}; // do not print error 701 etc
run_query(thd, buf, end, no_print_error, TRUE);
@@ -883,7 +909,7 @@ int ndbcluster_setup_binlog_table_shares(THD *thd)
{
if (ndb_extra_logging)
sql_print_information("NDB Binlog: ndb tables writable");
- close_cached_tables((THD*) 0, 0, (TABLE_LIST*) 0, TRUE);
+ close_cached_tables(NULL, NULL, TRUE, FALSE, FALSE);
}
pthread_mutex_unlock(&LOCK_open);
/* Signal injector thread that all is setup */
@@ -1683,7 +1709,7 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
bzero((char*) &table_list,sizeof(table_list));
table_list.db= (char *)dbname;
table_list.alias= table_list.table_name= (char *)tabname;
- close_cached_tables(thd, 0, &table_list, TRUE);
+ close_cached_tables(thd, &table_list, TRUE, FALSE, FALSE);
if ((error= ndbcluster_binlog_open_table(thd, share,
table_share, table, 1)))
@@ -1789,7 +1815,7 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
bzero((char*) &table_list,sizeof(table_list));
table_list.db= (char *)dbname;
table_list.alias= table_list.table_name= (char *)tabname;
- close_cached_tables(thd, 0, &table_list);
+ close_cached_tables(thd, &table_list, FALSE, FALSE, FALSE);
/* ndb_share reference create free */
DBUG_PRINT("NDB_SHARE", ("%s create free use_count: %u",
share->key, share->use_count));
@@ -1908,7 +1934,7 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
bzero((char*) &table_list,sizeof(table_list));
table_list.db= schema->db;
table_list.alias= table_list.table_name= schema->name;
- close_cached_tables(thd, 0, &table_list, FALSE);
+ close_cached_tables(thd, &table_list, FALSE, FALSE, FALSE);
}
/* ndb_share reference temporary free */
if (share)
@@ -2032,7 +2058,7 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
pthread_mutex_unlock(&ndb_schema_share_mutex);
/* end protect ndb_schema_share */
- close_cached_tables((THD*) 0, 0, (TABLE_LIST*) 0, FALSE);
+ close_cached_tables(NULL, NULL, FALSE, FALSE, FALSE);
// fall through
case NDBEVENT::TE_ALTER:
ndb_handle_schema_change(thd, ndb, pOp, tmp_share);
@@ -2189,7 +2215,7 @@ ndb_binlog_thread_handle_schema_event_post_epoch(THD *thd,
bzero((char*) &table_list,sizeof(table_list));
table_list.db= schema->db;
table_list.alias= table_list.table_name= schema->name;
- close_cached_tables(thd, 0, &table_list, FALSE);
+ close_cached_tables(thd, &table_list, FALSE, FALSE, FALSE);
}
if (schema_type != SOT_ALTER_TABLE)
break;
@@ -2300,9 +2326,12 @@ static int open_ndb_binlog_index(THD *thd, TABLE_LIST *tables,
thd->clear_error();
if (open_tables(thd, &tables, &counter, MYSQL_LOCK_IGNORE_FLUSH))
{
- sql_print_error("NDB Binlog: Opening ndb_binlog_index: %d, '%s'",
- thd->net.last_errno,
- thd->net.last_error ? thd->net.last_error : "");
+ if (thd->killed)
+ sql_print_error("NDB Binlog: Opening ndb_binlog_index: killed");
+ else
+ sql_print_error("NDB Binlog: Opening ndb_binlog_index: %d, '%s'",
+ thd->main_da.sql_errno(),
+ thd->main_da.message());
thd->proc_info= save_proc_info;
return -1;
}
@@ -3587,6 +3616,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
Thd_ndb *thd_ndb=0;
int ndb_update_ndb_binlog_index= 1;
injector *inj= injector::instance();
+ uint incident_id= 0;
#ifdef RUN_NDB_BINLOG_TIMER
Timer main_timer;
@@ -3693,18 +3723,64 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
pthread_mutex_unlock(&injector_mutex);
pthread_cond_signal(&injector_cond);
+ /*
+ wait for mysql server to start (so that the binlog is started
+ and thus can receive the first GAP event)
+ */
+ pthread_mutex_lock(&LOCK_server_started);
+ while (!mysqld_server_started)
+ {
+ struct timespec abstime;
+ set_timespec(abstime, 1);
+ pthread_cond_timedwait(&COND_server_started, &LOCK_server_started,
+ &abstime);
+ if (ndbcluster_terminating)
+ {
+ pthread_mutex_unlock(&LOCK_server_started);
+ pthread_mutex_lock(&LOCK_ndb_util_thread);
+ goto err;
+ }
+ }
+ pthread_mutex_unlock(&LOCK_server_started);
restart:
/*
Main NDB Injector loop
*/
+ while (ndb_binlog_running)
{
/*
- Always insert a GAP event as we cannot know what has happened in the cluster
- while not being connected.
+ check if it is the first log, if so we do not insert a GAP event
+ as there is really no log to have a GAP in
+ */
+ if (incident_id == 0)
+ {
+ LOG_INFO log_info;
+ mysql_bin_log.get_current_log(&log_info);
+ int len= strlen(log_info.log_file_name);
+ uint no= 0;
+ if ((sscanf(log_info.log_file_name + len - 6, "%u", &no) == 1) &&
+ no == 1)
+ {
+ /* this is the fist log, so skip GAP event */
+ break;
+ }
+ }
+
+ /*
+ Always insert a GAP event as we cannot know what has happened
+ in the cluster while not being connected.
*/
- LEX_STRING const msg= { C_STRING_WITH_LEN("Cluster connect") };
- inj->record_incident(thd, INCIDENT_LOST_EVENTS, msg);
+ LEX_STRING const msg[2]=
+ {
+ { C_STRING_WITH_LEN("mysqld startup") },
+ { C_STRING_WITH_LEN("cluster disconnect")}
+ };
+ IF_DBUG(int error=)
+ inj->record_incident(thd, INCIDENT_LOST_EVENTS, msg[incident_id]);
+ DBUG_ASSERT(!error);
+ break;
}
+ incident_id= 1;
{
thd->proc_info= "Waiting for ndbcluster to start";