diff options
author | Sergei Golubchik <sergii@pisem.net> | 2012-10-19 20:38:59 +0200 |
---|---|---|
committer | Sergei Golubchik <sergii@pisem.net> | 2012-10-19 20:38:59 +0200 |
commit | e1f681c99b3e5462c033aaafa94ac295e626cde2 (patch) | |
tree | 2da5eff1a0d03831c2d85b32a7bc3df6ec37b522 /sql/slave.cc | |
parent | 52c84d144d3b07966d9b3bab8694eb012eef69ce (diff) | |
parent | 807fef40fffbbb8e92564a52b902b504ba8cfcdc (diff) | |
download | mariadb-git-e1f681c99b3e5462c033aaafa94ac295e626cde2.tar.gz |
10.0-base -> 10.0-monty
Diffstat (limited to 'sql/slave.cc')
-rw-r--r-- | sql/slave.cc | 315 |
1 files changed, 231 insertions, 84 deletions
diff --git a/sql/slave.cc b/sql/slave.cc index 36a3d51b497..e1bac640e00 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -71,8 +71,10 @@ char slave_skip_error_names[SHOW_VAR_FUNC_BUFF_SIZE]; char* slave_load_tmpdir = 0; Master_info *active_mi= 0; +Master_info_index *master_info_index; my_bool replicate_same_server_id; ulonglong relay_log_space_limit = 0; +LEX_STRING default_master_connection_name= { (char*) "", 0 }; /* When slave thread exits, we need to remember the temporary tables so we @@ -144,7 +146,8 @@ static int process_io_create_file(Master_info* mi, Create_file_log_event* cev); static bool wait_for_relay_log_space(Relay_log_info* rli); static inline bool io_slave_killed(THD* thd,Master_info* mi); static inline bool sql_slave_killed(THD* thd,Relay_log_info* rli); -static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type); +static int init_slave_thread(THD* thd, Master_info *mi, + SLAVE_THD_TYPE thd_type); static void print_slave_skip_errors(void); static int safe_connect(THD* thd, MYSQL* mysql, Master_info* mi); static int safe_reconnect(THD* thd, MYSQL* mysql, Master_info* mi, @@ -159,6 +162,8 @@ static int terminate_slave_thread(THD *thd, volatile uint *slave_running, bool skip_lock); static bool check_io_slave_killed(THD *thd, Master_info *mi, const char *info); +static bool send_show_master_info_header(THD *thd, bool full); +static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full); /* Find out which replications threads are running @@ -263,15 +268,33 @@ int init_slave() So it's safer to take the lock. */ mysql_mutex_lock(&LOCK_active_mi); - /* - TODO: re-write this to interate through the list of files - for multi-master - */ - active_mi= new Master_info(relay_log_recovery); if (pthread_key_create(&RPL_MASTER_INFO, NULL)) goto err; + master_info_index= new Master_info_index; + if (!master_info_index || master_info_index->init_all_master_info()) + { + sql_print_error("Failed to initialize multi master structures"); + mysql_mutex_unlock(&LOCK_active_mi); + DBUG_RETURN(1); + } + if (!(active_mi= new Master_info(&default_master_connection_name, + relay_log_recovery)) || + active_mi->error()) + { + delete active_mi; + active_mi= 0; + goto err; + } + + if (master_info_index->add_master_info(active_mi, FALSE)) + { + delete active_mi; + active_mi= 0; + goto err; + } + /* If --slave-skip-errors=... was not used, the string value for the system variable has not been set up yet. Do it now. @@ -286,18 +309,11 @@ int init_slave() If master_host is specified, create the master_info file if it doesn't exists. */ - if (!active_mi) - { - sql_print_error("Failed to allocate memory for the master info structure"); - error= 1; - goto err; - } if (init_master_info(active_mi,master_info_file,relay_log_info_file, 1, (SLAVE_IO | SLAVE_SQL))) { sql_print_error("Failed to initialize the master info structure"); - error= 1; goto err; } @@ -313,14 +329,18 @@ int init_slave() SLAVE_IO | SLAVE_SQL)) { sql_print_error("Failed to create slave threads"); - error= 1; goto err; } } -err: +end: mysql_mutex_unlock(&LOCK_active_mi); DBUG_RETURN(error); + +err: + sql_print_error("Failed to allocate memory for the Master Info structure"); + error= 1; + goto end; } /* @@ -834,42 +854,19 @@ void end_slave() running presently. If a START SLAVE was in progress, the mutex lock below will make us wait until slave threads have started, and START SLAVE returns, then we terminate them here. + + We can also be called by cleanup(), which only happens if some + startup parameter to the server was wrong. */ mysql_mutex_lock(&LOCK_active_mi); - if (active_mi) - { - /* - TODO: replace the line below with - list_walk(&master_list, (list_walk_action)end_slave_on_walk,0); - once multi-master code is ready. - */ - terminate_slave_threads(active_mi,SLAVE_FORCE_ALL); - } + /* This will call terminate_slave_threads() on all connections */ + delete master_info_index; + master_info_index= 0; + active_mi= 0; mysql_mutex_unlock(&LOCK_active_mi); DBUG_VOID_RETURN; } -/** - Free all resources used by slave threads at time of executing shutdown. - The routine must be called after all possible users of @c active_mi - have left. - - SYNOPSIS - close_active_mi() - -*/ -void close_active_mi() -{ - mysql_mutex_lock(&LOCK_active_mi); - if (active_mi) - { - end_master_info(active_mi); - delete active_mi; - active_mi= 0; - } - mysql_mutex_unlock(&LOCK_active_mi); -} - static bool io_slave_killed(THD* thd, Master_info* mi) { DBUG_ENTER("io_slave_killed"); @@ -1767,6 +1764,37 @@ past_checksum: } } } + + /* Announce MariaDB slave capabilities. */ + DBUG_EXECUTE_IF("simulate_slave_capability_none", goto after_set_capability;); + { + int rc= DBUG_EVALUATE_IF("simulate_slave_capability_old_53", + mysql_real_query(mysql, STRING_WITH_LEN("SET @mariadb_slave_capability=" + STRINGIFY_ARG(MARIA_SLAVE_CAPABILITY_ANNOTATE))), + mysql_real_query(mysql, STRING_WITH_LEN("SET @mariadb_slave_capability=" + STRINGIFY_ARG(MARIA_SLAVE_CAPABILITY_MINE)))); + if (rc) + { + err_code= mysql_errno(mysql); + if (is_network_error(err_code)) + { + mi->report(ERROR_LEVEL, err_code, + "Setting @mariadb_slave_capability failed with error: %s", + mysql_error(mysql)); + goto network_err; + } + else + { + /* Fatal error */ + errmsg= "The slave I/O thread stops because a fatal error is " + "encountered when it tries to set @mariadb_slave_capability."; + sprintf(err_buff, "%s Error: %s", errmsg, mysql_error(mysql)); + goto err; + } + } + } +after_set_capability: + err: if (errmsg) { @@ -1849,9 +1877,9 @@ static bool wait_for_relay_log_space(Relay_log_info* rli) #endif if (rli->sql_force_rotate_relay) { - mysql_mutex_lock(&active_mi->data_lock); + mysql_mutex_lock(&mi->data_lock); rotate_relay_log(rli->mi); - mysql_mutex_unlock(&active_mi->data_lock); + mysql_mutex_unlock(&mi->data_lock); rli->sql_force_rotate_relay= false; } @@ -2005,15 +2033,36 @@ int register_slave_on_master(MYSQL* mysql, Master_info *mi, @retval FALSE success @retval TRUE failure */ -bool show_master_info(THD* thd, Master_info* mi) + +bool show_master_info(THD *thd, Master_info *mi, bool full) +{ + DBUG_ENTER("show_master_info"); + + if (send_show_master_info_header(thd, full)) + DBUG_RETURN(TRUE); + if (send_show_master_info_data(thd, mi, full)) + DBUG_RETURN(TRUE); + my_eof(thd); + DBUG_RETURN(FALSE); +} + +static bool send_show_master_info_header(THD *thd, bool full) { - // TODO: fix this for multi-master List<Item> field_list; Protocol *protocol= thd->protocol; - DBUG_ENTER("show_master_info"); + Master_info *mi; + DBUG_ENTER("show_master_info_header"); + + if (full) + { + field_list.push_back(new Item_empty_string("Connection_name", + MAX_CONNECTION_NAME)); + field_list.push_back(new Item_empty_string("Slave_SQL_State", + 30)); + } field_list.push_back(new Item_empty_string("Slave_IO_State", - 14)); + 30)); field_list.push_back(new Item_empty_string("Master_Host", sizeof(mi->host))); field_list.push_back(new Item_empty_string("Master_User", @@ -2080,22 +2129,52 @@ bool show_master_info(THD* thd, Master_info* mi) sizeof(mi->ssl_crl))); field_list.push_back(new Item_empty_string("Master_SSL_Crlpath", sizeof(mi->ssl_crlpath))); + if (full) + { + field_list.push_back(new Item_return_int("Retried_transactions", + 10, MYSQL_TYPE_LONG)); + field_list.push_back(new Item_return_int("Max_relay_log_size", + 10, MYSQL_TYPE_LONGLONG)); + field_list.push_back(new Item_return_int("Executed_log_entries", + 10, MYSQL_TYPE_LONG)); + field_list.push_back(new Item_return_int("Slave_received_heartbeats", + 10, MYSQL_TYPE_LONG)); + field_list.push_back(new Item_float("Slave_heartbeat_period", + 0.0, 3, 10)); + } if (protocol->send_result_set_metadata(&field_list, Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) DBUG_RETURN(TRUE); + DBUG_RETURN(FALSE); +} + + +static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full) +{ + DBUG_ENTER("send_show_master_info_data"); if (mi->host[0]) { DBUG_PRINT("info",("host is set: '%s'", mi->host)); String *packet= &thd->packet; + Protocol *protocol= thd->protocol; + char buf[256]; + String tmp(buf, sizeof(buf), &my_charset_bin); + protocol->prepare_for_resend(); /* slave_running can be accessed without run_lock but not other non-volotile members like mi->io_thd, which is guarded by the mutex. */ + if (full) + protocol->store(mi->connection_name.str, mi->connection_name.length, + &my_charset_bin); mysql_mutex_lock(&mi->run_lock); + if (full) + protocol->store(mi->rli.sql_thd ? mi->rli.sql_thd->get_proc_info() : "", + &my_charset_bin); protocol->store(mi->io_thd ? mi->io_thd->get_proc_info() : "", &my_charset_bin); mysql_mutex_unlock(&mi->run_lock); @@ -2121,8 +2200,6 @@ bool show_master_info(THD* thd, Master_info* mi) protocol->store(rpl_filter->get_do_db()); protocol->store(rpl_filter->get_ignore_db()); - char buf[256]; - String tmp(buf, sizeof(buf), &my_charset_bin); rpl_filter->get_do_table(&tmp); protocol->store(&tmp); rpl_filter->get_ignore_table(&tmp); @@ -2232,6 +2309,14 @@ bool show_master_info(THD* thd, Master_info* mi) protocol->store(mi->ssl_ca, &my_charset_bin); // Master_Ssl_Crlpath protocol->store(mi->ssl_capath, &my_charset_bin); + if (full) + { + protocol->store((uint32) mi->rli.retried_trans); + protocol->store((ulonglong) mi->rli.max_relay_log_size); + protocol->store((uint32) mi->rli.executed_entries); + protocol->store((uint32) mi->received_heartbeats); + protocol->store((double) mi->heartbeat_period, 3, &tmp); + } mysql_mutex_unlock(&mi->rli.err_lock); mysql_mutex_unlock(&mi->err_lock); @@ -2241,6 +2326,69 @@ bool show_master_info(THD* thd, Master_info* mi) if (my_net_write(&thd->net, (uchar*) thd->packet.ptr(), packet->length())) DBUG_RETURN(TRUE); } + DBUG_RETURN(FALSE); +} + + +/* Used to sort connections by name */ + +static int cmp_mi_by_name(const Master_info **arg1, + const Master_info **arg2) +{ + return my_strcasecmp(system_charset_info, (*arg1)->connection_name.str, + (*arg2)->connection_name.str); +} + + +/** + Execute a SHOW FULL SLAVE STATUS statement. + + @param thd Pointer to THD object for the client thread executing the + statement. + + Elements are sorted according to the original connection_name. + + @retval FALSE success + @retval TRUE failure + + @note + master_info_index is protected by LOCK_active_mi. +*/ + +bool show_all_master_info(THD* thd) +{ + uint i, elements; + Master_info **tmp; + DBUG_ENTER("show_master_info"); + mysql_mutex_assert_owner(&LOCK_active_mi); + + if (send_show_master_info_header(thd, 1)) + DBUG_RETURN(TRUE); + + if (!(elements= master_info_index->master_info_hash.records)) + goto end; + + /* + Sort lines to get them into a predicted order + (needed for test cases and to not confuse users) + */ + if (!(tmp= (Master_info**) thd->alloc(sizeof(Master_info*) * elements))) + DBUG_RETURN(TRUE); + + for (i= 0; i < elements; i++) + { + tmp[i]= (Master_info *) my_hash_element(&master_info_index-> + master_info_hash, i); + } + my_qsort(tmp, elements, sizeof(Master_info*), (qsort_cmp) cmp_mi_by_name); + + for (i= 0; i < elements; i++) + { + if (send_show_master_info_data(thd, tmp[i], 1)) + DBUG_RETURN(TRUE); + } + +end: my_eof(thd); DBUG_RETURN(FALSE); } @@ -2294,7 +2442,8 @@ void set_slave_thread_default_charset(THD* thd, Relay_log_info const *rli) init_slave_thread() */ -static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type) +static int init_slave_thread(THD* thd, Master_info *mi, + SLAVE_THD_TYPE thd_type) { DBUG_ENTER("init_slave_thread"); #if !defined(DBUG_OFF) @@ -2309,9 +2458,9 @@ static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type) slave threads, since a replication event can become this much larger than the corresponding packet (query) sent from client to master. */ - thd->variables.max_allowed_packet= global_system_variables.max_allowed_packet - + MAX_LOG_EVENT_HEADER; /* note, incr over the global not session var */ - thd->slave_thread = 1; + thd->variables.max_allowed_packet= slave_max_allowed_packet; + thd->slave_thread= 1; + thd->connection_name= mi->connection_name; thd->enable_slow_log= opt_log_slow_slave_statements; thd->variables.log_slow_filter= global_system_variables.log_slow_filter; set_slave_thread_options(thd); @@ -2626,7 +2775,10 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli) int reason= ev->shall_skip(rli); if (reason == Log_event::EVENT_SKIP_COUNT) - sql_slave_skip_counter= --rli->slave_skip_counter; + { + DBUG_ASSERT(rli->slave_skip_counter > 0); + rli->slave_skip_counter--; + } mysql_mutex_unlock(&rli->data_lock); if (reason == Log_event::EVENT_SKIP_NOT) exec_res= ev->apply_event(rli); @@ -2875,6 +3027,7 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli) mysql_mutex_lock(&rli->data_lock); // because of SHOW STATUS rli->trans_retries++; rli->retried_trans++; + statistic_increment(slave_retried_transactions, LOCK_status); mysql_mutex_unlock(&rli->data_lock); DBUG_PRINT("info", ("Slave retries transaction " "rli->trans_retries: %lu", rli->trans_retries)); @@ -3033,6 +3186,8 @@ pthread_handler_t handle_slave_io(void *arg) mysql= NULL ; retry_count= 0; + thd= new THD; // note that contructor of THD uses DBUG_ ! + mysql_mutex_lock(&mi->run_lock); /* Inform waiting threads that slave has started */ mi->slave_run_id++; @@ -3041,14 +3196,13 @@ pthread_handler_t handle_slave_io(void *arg) mi->events_till_disconnect = disconnect_slave_event_count; #endif - thd= new THD; // note that contructor of THD uses DBUG_ ! THD_CHECK_SENTRY(thd); mi->io_thd = thd; pthread_detach_this_thread(); thd->thread_stack= (char*) &thd; // remember where our stack is mi->clear_error(); - if (init_slave_thread(thd, SLAVE_THD_IO)) + if (init_slave_thread(thd, mi, SLAVE_THD_IO)) { mysql_cond_broadcast(&mi->start_cond); sql_print_error("Failed during slave I/O thread initialization"); @@ -3057,7 +3211,7 @@ pthread_handler_t handle_slave_io(void *arg) mysql_mutex_lock(&LOCK_thread_count); threads.append(thd); mysql_mutex_unlock(&LOCK_thread_count); - mi->slave_running = 1; + mi->slave_running = MYSQL_SLAVE_RUN_NOT_CONNECT; mi->abort_slave = 0; mysql_mutex_unlock(&mi->run_lock); mysql_cond_broadcast(&mi->start_cond); @@ -3097,6 +3251,7 @@ pthread_handler_t handle_slave_io(void *arg) thread, since a replication event can become this much larger than the corresponding packet (query) sent from client to master. */ + thd->net.max_packet_size= slave_max_allowed_packet; mysql->net.max_packet_size= thd->net.max_packet_size+= MAX_LOG_EVENT_HEADER; } else @@ -3229,12 +3384,12 @@ reading event")) switch (mysql_error_number) { case CR_NET_PACKET_TOO_LARGE: sql_print_error("\ -Log entry on master is longer than max_allowed_packet (%ld) on \ +Log entry on master is longer than slave_max_allowed_packet (%lu) on \ slave. If the entry is correct, restart the server with a higher value of \ -max_allowed_packet", - thd->variables.max_allowed_packet); +slave_max_allowed_packet", + slave_max_allowed_packet); mi->report(ERROR_LEVEL, ER_NET_PACKET_TOO_LARGE, - "%s", ER(ER_NET_PACKET_TOO_LARGE)); + "%s", "Got a packet bigger than 'slave_max_allowed_packet' bytes"); goto err; case ER_MASTER_FATAL_ERROR_READING_BINLOG: mi->report(ERROR_LEVEL, ER_MASTER_FATAL_ERROR_READING_BINLOG, @@ -3368,7 +3523,7 @@ err_during_init: delete thd; mysql_mutex_unlock(&LOCK_thread_count); mi->abort_slave= 0; - mi->slave_running= 0; + mi->slave_running= MYSQL_SLAVE_NOT_RUN; mi->io_thd= 0; /* Note: the order of the two following calls (first broadcast, then unlock) @@ -3446,7 +3601,8 @@ pthread_handler_t handle_slave_sql(void *arg) my_off_t UNINIT_VAR(saved_log_pos); my_off_t UNINIT_VAR(saved_master_log_pos); my_off_t saved_skip= 0; - Relay_log_info* rli = &((Master_info*)arg)->rli; + Master_info *mi= ((Master_info*)arg); + Relay_log_info* rli = &mi->rli; const char *errmsg; // needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff @@ -3460,6 +3616,7 @@ pthread_handler_t handle_slave_sql(void *arg) thd->thread_stack = (char*)&thd; // remember where our stack is DBUG_ASSERT(rli->inited); + DBUG_ASSERT(rli->mi == mi); mysql_mutex_lock(&rli->run_lock); DBUG_ASSERT(!rli->slave_running); errmsg= 0; @@ -3471,10 +3628,10 @@ pthread_handler_t handle_slave_sql(void *arg) /* Inform waiting threads that slave has started */ rli->slave_run_id++; - rli->slave_running = 1; + rli->slave_running= MYSQL_SLAVE_RUN_NOT_CONNECT; pthread_detach_this_thread(); - if (init_slave_thread(thd, SLAVE_THD_SQL)) + if (init_slave_thread(thd, mi, SLAVE_THD_SQL)) { /* TODO: this is currently broken - slave start and change master @@ -3715,6 +3872,7 @@ the slave SQL thread with \"SLAVE START\". We stopped at log \ } goto err; } + rli->executed_entries++; } /* Thread stopped. Print the current replication position to the log */ @@ -3745,9 +3903,9 @@ the slave SQL thread with \"SLAVE START\". We stopped at log \ err_during_init: /* We need data_lock, at least to wake up any waiting master_pos_wait() */ mysql_mutex_lock(&rli->data_lock); - DBUG_ASSERT(rli->slave_running == 1); // tracking buffer overrun + DBUG_ASSERT(rli->slave_running == MYSQL_SLAVE_RUN_NOT_CONNECT); // tracking buffer overrun /* When master_pos_wait() wakes up it will check this and terminate */ - rli->slave_running= 0; + rli->slave_running= MYSQL_SLAVE_NOT_RUN; /* Forget the relay log's format */ delete rli->relay_log.description_event_for_exec; rli->relay_log.description_event_for_exec= 0; @@ -5266,11 +5424,6 @@ static Log_event* next_event(Relay_log_info* rli) mysql_mutex_lock(log_lock); if (rli->relay_log.is_active(rli->linfo.log_file_name)) { -#ifdef EXTRA_DEBUG - if (global_system_variables.log_warnings) - sql_print_information("next log '%s' is currently active", - rli->linfo.log_file_name); -#endif rli->cur_log= cur_log= rli->relay_log.get_log_file(); rli->cur_log_old_open_count= rli->relay_log.get_open_count(); DBUG_ASSERT(rli->cur_log_fd == -1); @@ -5353,11 +5506,6 @@ static Log_event* next_event(Relay_log_info* rli) ourselves. We are sure that the log is still not hot now (a log can get from hot to cold, but not from cold to hot). No need for LOCK_log. */ -#ifdef EXTRA_DEBUG - if (global_system_variables.log_warnings) - sql_print_information("next log '%s' is not active", - rli->linfo.log_file_name); -#endif // open_binlog() will check the magic header if ((rli->cur_log_fd=open_binlog(cur_log,rli->linfo.log_file_name, &errmsg)) <0) @@ -5537,11 +5685,10 @@ bool rpl_master_has_bug(const Relay_log_info *rli, uint bug_id, bool report, */ bool rpl_master_erroneous_autoinc(THD *thd) { - if (active_mi && active_mi->rli.sql_thd == thd) + if (thd->rli_slave) { - Relay_log_info *rli= &active_mi->rli; DBUG_EXECUTE_IF("simulate_bug33029", return TRUE;); - return rpl_master_has_bug(rli, 33029, FALSE, NULL, NULL); + return rpl_master_has_bug(thd->rli_slave, 33029, FALSE, NULL, NULL); } return FALSE; } |