diff options
Diffstat (limited to 'sql/ha_ndbcluster_binlog.cc')
-rw-r--r-- | sql/ha_ndbcluster_binlog.cc | 122 |
1 files changed, 99 insertions, 23 deletions
diff --git a/sql/ha_ndbcluster_binlog.cc b/sql/ha_ndbcluster_binlog.cc index be75eff2575..07b0d907229 100644 --- a/sql/ha_ndbcluster_binlog.cc +++ b/sql/ha_ndbcluster_binlog.cc @@ -241,22 +241,30 @@ static void dbug_print_table(const char *info, TABLE *table) static void run_query(THD *thd, char *buf, char *end, const int *no_print_error, my_bool disable_binlog) { - ulong save_query_length= thd->query_length; - char *save_query= thd->query; + ulong save_thd_query_length= thd->query_length; + char *save_thd_query= thd->query; ulong save_thread_id= thd->variables.pseudo_thread_id; + struct system_status_var save_thd_status_var= thd->status_var; + THD_TRANS save_thd_transaction_all= thd->transaction.all; + THD_TRANS save_thd_transaction_stmt= thd->transaction.stmt; ulonglong save_thd_options= thd->options; DBUG_ASSERT(sizeof(save_thd_options) == sizeof(thd->options)); - NET save_net= thd->net; + NET save_thd_net= thd->net; const char* found_semicolon= NULL; bzero((char*) &thd->net, sizeof(NET)); thd->query_length= end - buf; thd->query= buf; thd->variables.pseudo_thread_id= thread_id; + thd->transaction.stmt.modified_non_trans_table= FALSE; if (disable_binlog) thd->options&= ~OPTION_BIN_LOG; DBUG_PRINT("query", ("%s", thd->query)); + + DBUG_ASSERT(!thd->in_sub_stmt); + DBUG_ASSERT(!thd->prelocked_mode); + mysql_parse(thd, thd->query, thd->query_length, &found_semicolon); if (no_print_error && thd->is_slave_error) @@ -265,20 +273,36 @@ static void run_query(THD *thd, char *buf, char *end, Thd_ndb *thd_ndb= get_thd_ndb(thd); for (i= 0; no_print_error[i]; i++) if ((thd_ndb->m_error_code == no_print_error[i]) || - (thd->net.last_errno == (unsigned)no_print_error[i])) + (thd->main_da.sql_errno() == (unsigned) no_print_error[i])) break; if (!no_print_error[i]) sql_print_error("NDB: %s: error %s %d(ndb: %d) %d %d", - buf, thd->net.last_error, thd->net.last_errno, + buf, + thd->main_da.message(), + thd->main_da.sql_errno(), thd_ndb->m_error_code, (int) thd->is_error(), thd->is_slave_error); } + /* + XXX: this code is broken. mysql_parse()/mysql_reset_thd_for_next_command() + can not be called from within a statement, and + run_query() can be called from anywhere, including from within + a sub-statement. + This particular reset is a temporary hack to avoid an assert + for double assignment of the diagnostics area when run_query() + is called from ndbcluster_reset_logs(), which is called from + mysql_flush(). + */ + thd->main_da.reset_diagnostics_area(); thd->options= save_thd_options; - thd->query_length= save_query_length; - thd->query= save_query; + thd->query_length= save_thd_query_length; + thd->query= save_thd_query; thd->variables.pseudo_thread_id= save_thread_id; - thd->net= save_net; + thd->status_var= save_thd_status_var; + thd->transaction.all= save_thd_transaction_all; + thd->transaction.stmt= save_thd_transaction_stmt; + thd->net= save_thd_net; if (thd == injector_thd) { @@ -777,8 +801,9 @@ static int ndbcluster_create_ndb_apply_status_table(THD *thd) " end_pos BIGINT UNSIGNED NOT NULL, " " PRIMARY KEY USING HASH (server_id) ) ENGINE=NDB"); - const int no_print_error[4]= {ER_TABLE_EXISTS_ERROR, + const int no_print_error[5]= {ER_TABLE_EXISTS_ERROR, 701, + 702, 4009, 0}; // do not print error 701 etc run_query(thd, buf, end, no_print_error, TRUE); @@ -837,8 +862,9 @@ static int ndbcluster_create_schema_table(THD *thd) " type INT UNSIGNED NOT NULL," " PRIMARY KEY USING HASH (db,name) ) ENGINE=NDB"); - const int no_print_error[4]= {ER_TABLE_EXISTS_ERROR, + const int no_print_error[5]= {ER_TABLE_EXISTS_ERROR, 701, + 702, 4009, 0}; // do not print error 701 etc run_query(thd, buf, end, no_print_error, TRUE); @@ -883,7 +909,7 @@ int ndbcluster_setup_binlog_table_shares(THD *thd) { if (ndb_extra_logging) sql_print_information("NDB Binlog: ndb tables writable"); - close_cached_tables((THD*) 0, 0, (TABLE_LIST*) 0, TRUE); + close_cached_tables(NULL, NULL, TRUE, FALSE, FALSE); } pthread_mutex_unlock(&LOCK_open); /* Signal injector thread that all is setup */ @@ -1683,7 +1709,7 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp, bzero((char*) &table_list,sizeof(table_list)); table_list.db= (char *)dbname; table_list.alias= table_list.table_name= (char *)tabname; - close_cached_tables(thd, 0, &table_list, TRUE); + close_cached_tables(thd, &table_list, TRUE, FALSE, FALSE); if ((error= ndbcluster_binlog_open_table(thd, share, table_share, table, 1))) @@ -1789,7 +1815,7 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp, bzero((char*) &table_list,sizeof(table_list)); table_list.db= (char *)dbname; table_list.alias= table_list.table_name= (char *)tabname; - close_cached_tables(thd, 0, &table_list); + close_cached_tables(thd, &table_list, FALSE, FALSE, FALSE); /* ndb_share reference create free */ DBUG_PRINT("NDB_SHARE", ("%s create free use_count: %u", share->key, share->use_count)); @@ -1908,7 +1934,7 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, bzero((char*) &table_list,sizeof(table_list)); table_list.db= schema->db; table_list.alias= table_list.table_name= schema->name; - close_cached_tables(thd, 0, &table_list, FALSE); + close_cached_tables(thd, &table_list, FALSE, FALSE, FALSE); } /* ndb_share reference temporary free */ if (share) @@ -2032,7 +2058,7 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, pthread_mutex_unlock(&ndb_schema_share_mutex); /* end protect ndb_schema_share */ - close_cached_tables((THD*) 0, 0, (TABLE_LIST*) 0, FALSE); + close_cached_tables(NULL, NULL, FALSE, FALSE, FALSE); // fall through case NDBEVENT::TE_ALTER: ndb_handle_schema_change(thd, ndb, pOp, tmp_share); @@ -2189,7 +2215,7 @@ ndb_binlog_thread_handle_schema_event_post_epoch(THD *thd, bzero((char*) &table_list,sizeof(table_list)); table_list.db= schema->db; table_list.alias= table_list.table_name= schema->name; - close_cached_tables(thd, 0, &table_list, FALSE); + close_cached_tables(thd, &table_list, FALSE, FALSE, FALSE); } if (schema_type != SOT_ALTER_TABLE) break; @@ -2300,9 +2326,12 @@ static int open_ndb_binlog_index(THD *thd, TABLE_LIST *tables, thd->clear_error(); if (open_tables(thd, &tables, &counter, MYSQL_LOCK_IGNORE_FLUSH)) { - sql_print_error("NDB Binlog: Opening ndb_binlog_index: %d, '%s'", - thd->net.last_errno, - thd->net.last_error ? thd->net.last_error : ""); + if (thd->killed) + sql_print_error("NDB Binlog: Opening ndb_binlog_index: killed"); + else + sql_print_error("NDB Binlog: Opening ndb_binlog_index: %d, '%s'", + thd->main_da.sql_errno(), + thd->main_da.message()); thd->proc_info= save_proc_info; return -1; } @@ -3587,6 +3616,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) Thd_ndb *thd_ndb=0; int ndb_update_ndb_binlog_index= 1; injector *inj= injector::instance(); + uint incident_id= 0; #ifdef RUN_NDB_BINLOG_TIMER Timer main_timer; @@ -3693,18 +3723,64 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) pthread_mutex_unlock(&injector_mutex); pthread_cond_signal(&injector_cond); + /* + wait for mysql server to start (so that the binlog is started + and thus can receive the first GAP event) + */ + pthread_mutex_lock(&LOCK_server_started); + while (!mysqld_server_started) + { + struct timespec abstime; + set_timespec(abstime, 1); + pthread_cond_timedwait(&COND_server_started, &LOCK_server_started, + &abstime); + if (ndbcluster_terminating) + { + pthread_mutex_unlock(&LOCK_server_started); + pthread_mutex_lock(&LOCK_ndb_util_thread); + goto err; + } + } + pthread_mutex_unlock(&LOCK_server_started); restart: /* Main NDB Injector loop */ + while (ndb_binlog_running) { /* - Always insert a GAP event as we cannot know what has happened in the cluster - while not being connected. + check if it is the first log, if so we do not insert a GAP event + as there is really no log to have a GAP in + */ + if (incident_id == 0) + { + LOG_INFO log_info; + mysql_bin_log.get_current_log(&log_info); + int len= strlen(log_info.log_file_name); + uint no= 0; + if ((sscanf(log_info.log_file_name + len - 6, "%u", &no) == 1) && + no == 1) + { + /* this is the fist log, so skip GAP event */ + break; + } + } + + /* + Always insert a GAP event as we cannot know what has happened + in the cluster while not being connected. */ - LEX_STRING const msg= { C_STRING_WITH_LEN("Cluster connect") }; - inj->record_incident(thd, INCIDENT_LOST_EVENTS, msg); + LEX_STRING const msg[2]= + { + { C_STRING_WITH_LEN("mysqld startup") }, + { C_STRING_WITH_LEN("cluster disconnect")} + }; + IF_DBUG(int error=) + inj->record_incident(thd, INCIDENT_LOST_EVENTS, msg[incident_id]); + DBUG_ASSERT(!error); + break; } + incident_id= 1; { thd->proc_info= "Waiting for ndbcluster to start"; |