diff options
Diffstat (limited to 'sql/slave.cc')
-rw-r--r-- | sql/slave.cc | 170 |
1 files changed, 156 insertions, 14 deletions
diff --git a/sql/slave.cc b/sql/slave.cc index 37979576b73..2747ce548e1 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -232,10 +232,12 @@ int init_relay_log_pos(RELAY_LOG_INFO* rli,const char* log, *errmsg=0; pthread_mutex_t *log_lock=rli->relay_log.get_log_lock(); - pthread_mutex_lock(log_lock); + if (need_data_lock) pthread_mutex_lock(&rli->data_lock); + pthread_mutex_lock(log_lock); + /* Close log file and free buffers if it's already open */ if (rli->cur_log_fd >= 0) { @@ -298,9 +300,11 @@ err: if (!relay_log_purge) rli->log_space_limit= 0; pthread_cond_broadcast(&rli->data_cond); + + pthread_mutex_unlock(log_lock); + if (need_data_lock) pthread_mutex_unlock(&rli->data_lock); - pthread_mutex_unlock(log_lock); DBUG_RETURN ((*errmsg) ? 1 : 0); } @@ -1410,6 +1414,20 @@ static int count_relay_log_space(RELAY_LOG_INFO* rli) } +/* + Reset UNTIL condition for RELAY_LOG_INFO + SYNOPSYS + clear_until_condition() + rli - RELAY_LOG_INFO structure where UNTIL condition should be reset + */ +void clear_until_condition(RELAY_LOG_INFO* rli) +{ + rli->until_condition= RELAY_LOG_INFO::UNTIL_NONE; + rli->until_log_name[0]= 0; + rli->until_log_pos= 0; +} + + int init_master_info(MASTER_INFO* mi, const char* master_info_fname, const char* slave_info_fname, bool abort_if_no_master_info_file) @@ -1648,6 +1666,10 @@ int show_master_info(THD* thd, MASTER_INFO* mi) MYSQL_TYPE_LONGLONG)); field_list.push_back(new Item_return_int("Relay_log_space", 10, MYSQL_TYPE_LONGLONG)); + field_list.push_back(new Item_empty_string("Until_condition", 6)); + field_list.push_back(new Item_empty_string("Until_Log_File", FN_REFLEN)); + field_list.push_back(new Item_return_int("Until_Log_pos", 10, + MYSQL_TYPE_LONGLONG)); if (protocol->send_fields(&field_list, 1)) DBUG_RETURN(-1); @@ -1694,6 +1716,14 @@ int show_master_info(THD* thd, MASTER_INFO* mi) protocol->store((uint32) mi->rli.slave_skip_counter); protocol->store((ulonglong) mi->rli.group_master_log_pos); protocol->store((ulonglong) mi->rli.log_space_total); + + protocol->store( + mi->rli.until_condition==RELAY_LOG_INFO::UNTIL_NONE ? "None": + ( mi->rli.until_condition==RELAY_LOG_INFO::UNTIL_MASTER_POS? "Master": + "Relay"), &my_charset_bin); + protocol->store(mi->rli.until_log_name, &my_charset_bin); + protocol->store((ulonglong) mi->rli.until_log_pos); + pthread_mutex_unlock(&mi->rli.data_lock); pthread_mutex_unlock(&mi->data_lock); @@ -1727,11 +1757,11 @@ st_relay_log_info::st_relay_log_info() cur_log_old_open_count(0), group_master_log_pos(0), log_space_total(0), ignore_log_space_limit(0), slave_skip_counter(0), abort_pos_wait(0), slave_run_id(0), sql_thd(0), last_slave_errno(0), inited(0), abort_slave(0), - slave_running(0) + slave_running(0), until_condition(UNTIL_NONE), until_log_pos(0) { group_relay_log_name[0]= event_relay_log_name[0]= group_master_log_name[0]= 0; - last_slave_error[0]=0; - + last_slave_error[0]=0; until_log_name[0]= 0; + bzero(&info_file,sizeof(info_file)); bzero(&cache_buf, sizeof(cache_buf)); pthread_mutex_init(&run_lock, MY_MUTEX_INIT_FAST); @@ -2151,14 +2181,127 @@ point. If you are sure that your master is ok, run this query manually on the\ } } +/* + Check if condition stated in UNTIL clause of START SLAVE is reached. + SYNOPSYS + st_relay_log_info::is_until_satisfied() + DESCRIPTION + Checks if UNTIL condition is reached. Uses caching result of last + comparison of current log file name and target log file name. So cached + value should be invalidated if current log file name changes + (see st_relay_log_info::notify_... functions). + + This caching is needed to avoid of expensive string comparisons and + strtol() conversions needed for log names comparison. We don't need to + compare them each time this function is called, we only need to do this + when current log name changes. If we have UNTIL_MASTER_POS condition we + need to do this only after Rotate_log_event::exec_event() (which is + rare, so caching gives real benifit), and if we have UNTIL_RELAY_POS + condition then we should invalidate cached comarison value after + inc_group_relay_log_pos() which called for each group of events (so we + have some benefit if we have something like queries that use + autoincrement or if we have transactions). + + Should be called ONLY if until_condition != UNTIL_NONE ! + RETURN VALUE + true - condition met or error happened (condition seems to have + bad log file name) + false - condition not met +*/ + +bool st_relay_log_info::is_until_satisfied() +{ + const char *log_name; + ulonglong log_pos; + + DBUG_ASSERT(until_condition != UNTIL_NONE); + + if (until_condition == UNTIL_MASTER_POS) + { + log_name= group_master_log_name; + log_pos= group_master_log_pos; + } + else + { /* until_condition == UNTIL_RELAY_POS */ + log_name= group_relay_log_name; + log_pos= group_relay_log_pos; + } + + if (until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_UNKNOWN) + { + /* + We have no cached comaprison results so we should compare log names + and cache result + */ + + DBUG_ASSERT(*log_name || log_pos == 0); + + if (*log_name) + { + const char *basename= log_name + dirname_length(log_name); + + const char *q= (const char*)(fn_ext(basename)+1); + if (strncmp(basename, until_log_name, (int)(q-basename)) == 0) + { + /* Now compare extensions. */ + char *q_end; + ulong log_name_extension= strtoul(q, &q_end, 10); + if (log_name_extension < until_log_name_extension) + until_log_names_cmp_result= UNTIL_LOG_NAMES_CMP_LESS; + else + until_log_names_cmp_result= + (log_name_extension > until_log_name_extension) ? + UNTIL_LOG_NAMES_CMP_GREATER : UNTIL_LOG_NAMES_CMP_EQUAL ; + } + else + { + /* Probably error so we aborting */ + sql_print_error("Slave SQL thread is stopped because UNTIL " + "condition is bad."); + return true; + } + } + else + return until_log_pos == 0; + } + + return ((until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_EQUAL && + log_pos >= until_log_pos) || + until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_GREATER); +} + static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli) { + /* + We acquire this mutex since we need it for all operations except + event execution. But we will release it in places where we will + wait for something for example inside of next_event(). + */ + pthread_mutex_lock(&rli->data_lock); + + if (rli->until_condition!=RELAY_LOG_INFO::UNTIL_NONE && + rli->is_until_satisfied()) + { + sql_print_error("Slave SQL thread stopped because it reached its" + " UNTIL position"); + /* + Setting abort_slave flag because we do not want additional message about + error in query execution to be printed. + */ + rli->abort_slave= 1; + pthread_mutex_unlock(&rli->data_lock); + return 1; + } + Log_event * ev = next_event(rli); + DBUG_ASSERT(rli->sql_thd==thd); + if (sql_slave_killed(thd,rli)) { /* do not forget to free ev ! */ + pthread_mutex_unlock(&rli->data_lock); if (ev) delete ev; return 1; } @@ -2166,7 +2309,6 @@ static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli) { int type_code = ev->get_type_code(); int exec_res; - pthread_mutex_lock(&rli->data_lock); /* Skip queries originating from this server or number of @@ -2195,7 +2337,7 @@ static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli) pthread_mutex_unlock(&rli->data_lock); delete ev; return 0; // avoid infinite update loops - } + } pthread_mutex_unlock(&rli->data_lock); thd->server_id = ev->server_id; // use the original server id for logging @@ -2211,6 +2353,7 @@ static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli) } else { + pthread_mutex_unlock(&rli->data_lock); sql_print_error("\ Could not parse relay log event entry. The possible reasons are: the master's \ binary log is corrupted (you can check this by running 'mysqlbinlog' on the \ @@ -3228,17 +3371,18 @@ Log_event* next_event(RELAY_LOG_INFO* rli) pthread_mutex_t *log_lock = rli->relay_log.get_log_lock(); const char* errmsg=0; THD* thd = rli->sql_thd; + DBUG_ENTER("next_event"); DBUG_ASSERT(thd != 0); /* For most operations we need to protect rli members with data_lock, - so we will hold it for the most of the loop below - However, we will release it whenever it is worth the hassle, - and in the cases when we go into a pthread_cond_wait() with the - non-data_lock mutex + so we assume calling function acquired this mutex for us and we will + hold it for the most of the loop below However, we will release it + whenever it is worth the hassle, and in the cases when we go into a + pthread_cond_wait() with the non-data_lock mutex */ - pthread_mutex_lock(&rli->data_lock); + safe_mutex_assert_owner(&rli->data_lock); while (!sql_slave_killed(thd,rli)) { @@ -3284,7 +3428,6 @@ Log_event* next_event(RELAY_LOG_INFO* rli) DBUG_ASSERT(thd==rli->sql_thd); if (hot_log) pthread_mutex_unlock(log_lock); - pthread_mutex_unlock(&rli->data_lock); DBUG_RETURN(ev); } DBUG_ASSERT(thd==rli->sql_thd); @@ -3460,7 +3603,6 @@ event(errno: %d cur_log->error: %d)", errmsg = "slave SQL thread was killed"; err: - pthread_mutex_unlock(&rli->data_lock); if (errmsg) sql_print_error("Error reading relay log event: %s", errmsg); DBUG_RETURN(0); |