summaryrefslogtreecommitdiff
path: root/sql/slave.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/slave.cc')
-rw-r--r--sql/slave.cc170
1 files changed, 156 insertions, 14 deletions
diff --git a/sql/slave.cc b/sql/slave.cc
index 37979576b73..2747ce548e1 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -232,10 +232,12 @@ int init_relay_log_pos(RELAY_LOG_INFO* rli,const char* log,
*errmsg=0;
pthread_mutex_t *log_lock=rli->relay_log.get_log_lock();
- pthread_mutex_lock(log_lock);
+
if (need_data_lock)
pthread_mutex_lock(&rli->data_lock);
+ pthread_mutex_lock(log_lock);
+
/* Close log file and free buffers if it's already open */
if (rli->cur_log_fd >= 0)
{
@@ -298,9 +300,11 @@ err:
if (!relay_log_purge)
rli->log_space_limit= 0;
pthread_cond_broadcast(&rli->data_cond);
+
+ pthread_mutex_unlock(log_lock);
+
if (need_data_lock)
pthread_mutex_unlock(&rli->data_lock);
- pthread_mutex_unlock(log_lock);
DBUG_RETURN ((*errmsg) ? 1 : 0);
}
@@ -1410,6 +1414,20 @@ static int count_relay_log_space(RELAY_LOG_INFO* rli)
}
+/*
+ Reset UNTIL condition for RELAY_LOG_INFO
+ SYNOPSYS
+ clear_until_condition()
+ rli - RELAY_LOG_INFO structure where UNTIL condition should be reset
+ */
+void clear_until_condition(RELAY_LOG_INFO* rli)
+{
+ rli->until_condition= RELAY_LOG_INFO::UNTIL_NONE;
+ rli->until_log_name[0]= 0;
+ rli->until_log_pos= 0;
+}
+
+
int init_master_info(MASTER_INFO* mi, const char* master_info_fname,
const char* slave_info_fname,
bool abort_if_no_master_info_file)
@@ -1648,6 +1666,10 @@ int show_master_info(THD* thd, MASTER_INFO* mi)
MYSQL_TYPE_LONGLONG));
field_list.push_back(new Item_return_int("Relay_log_space", 10,
MYSQL_TYPE_LONGLONG));
+ field_list.push_back(new Item_empty_string("Until_condition", 6));
+ field_list.push_back(new Item_empty_string("Until_Log_File", FN_REFLEN));
+ field_list.push_back(new Item_return_int("Until_Log_pos", 10,
+ MYSQL_TYPE_LONGLONG));
if (protocol->send_fields(&field_list, 1))
DBUG_RETURN(-1);
@@ -1694,6 +1716,14 @@ int show_master_info(THD* thd, MASTER_INFO* mi)
protocol->store((uint32) mi->rli.slave_skip_counter);
protocol->store((ulonglong) mi->rli.group_master_log_pos);
protocol->store((ulonglong) mi->rli.log_space_total);
+
+ protocol->store(
+ mi->rli.until_condition==RELAY_LOG_INFO::UNTIL_NONE ? "None":
+ ( mi->rli.until_condition==RELAY_LOG_INFO::UNTIL_MASTER_POS? "Master":
+ "Relay"), &my_charset_bin);
+ protocol->store(mi->rli.until_log_name, &my_charset_bin);
+ protocol->store((ulonglong) mi->rli.until_log_pos);
+
pthread_mutex_unlock(&mi->rli.data_lock);
pthread_mutex_unlock(&mi->data_lock);
@@ -1727,11 +1757,11 @@ st_relay_log_info::st_relay_log_info()
cur_log_old_open_count(0), group_master_log_pos(0), log_space_total(0),
ignore_log_space_limit(0), slave_skip_counter(0), abort_pos_wait(0),
slave_run_id(0), sql_thd(0), last_slave_errno(0), inited(0), abort_slave(0),
- slave_running(0)
+ slave_running(0), until_condition(UNTIL_NONE), until_log_pos(0)
{
group_relay_log_name[0]= event_relay_log_name[0]= group_master_log_name[0]= 0;
- last_slave_error[0]=0;
-
+ last_slave_error[0]=0; until_log_name[0]= 0;
+
bzero(&info_file,sizeof(info_file));
bzero(&cache_buf, sizeof(cache_buf));
pthread_mutex_init(&run_lock, MY_MUTEX_INIT_FAST);
@@ -2151,14 +2181,127 @@ point. If you are sure that your master is ok, run this query manually on the\
}
}
+/*
+ Check if condition stated in UNTIL clause of START SLAVE is reached.
+ SYNOPSYS
+ st_relay_log_info::is_until_satisfied()
+ DESCRIPTION
+ Checks if UNTIL condition is reached. Uses caching result of last
+ comparison of current log file name and target log file name. So cached
+ value should be invalidated if current log file name changes
+ (see st_relay_log_info::notify_... functions).
+
+ This caching is needed to avoid of expensive string comparisons and
+ strtol() conversions needed for log names comparison. We don't need to
+ compare them each time this function is called, we only need to do this
+ when current log name changes. If we have UNTIL_MASTER_POS condition we
+ need to do this only after Rotate_log_event::exec_event() (which is
+ rare, so caching gives real benifit), and if we have UNTIL_RELAY_POS
+ condition then we should invalidate cached comarison value after
+ inc_group_relay_log_pos() which called for each group of events (so we
+ have some benefit if we have something like queries that use
+ autoincrement or if we have transactions).
+
+ Should be called ONLY if until_condition != UNTIL_NONE !
+ RETURN VALUE
+ true - condition met or error happened (condition seems to have
+ bad log file name)
+ false - condition not met
+*/
+
+bool st_relay_log_info::is_until_satisfied()
+{
+ const char *log_name;
+ ulonglong log_pos;
+
+ DBUG_ASSERT(until_condition != UNTIL_NONE);
+
+ if (until_condition == UNTIL_MASTER_POS)
+ {
+ log_name= group_master_log_name;
+ log_pos= group_master_log_pos;
+ }
+ else
+ { /* until_condition == UNTIL_RELAY_POS */
+ log_name= group_relay_log_name;
+ log_pos= group_relay_log_pos;
+ }
+
+ if (until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_UNKNOWN)
+ {
+ /*
+ We have no cached comaprison results so we should compare log names
+ and cache result
+ */
+
+ DBUG_ASSERT(*log_name || log_pos == 0);
+
+ if (*log_name)
+ {
+ const char *basename= log_name + dirname_length(log_name);
+
+ const char *q= (const char*)(fn_ext(basename)+1);
+ if (strncmp(basename, until_log_name, (int)(q-basename)) == 0)
+ {
+ /* Now compare extensions. */
+ char *q_end;
+ ulong log_name_extension= strtoul(q, &q_end, 10);
+ if (log_name_extension < until_log_name_extension)
+ until_log_names_cmp_result= UNTIL_LOG_NAMES_CMP_LESS;
+ else
+ until_log_names_cmp_result=
+ (log_name_extension > until_log_name_extension) ?
+ UNTIL_LOG_NAMES_CMP_GREATER : UNTIL_LOG_NAMES_CMP_EQUAL ;
+ }
+ else
+ {
+ /* Probably error so we aborting */
+ sql_print_error("Slave SQL thread is stopped because UNTIL "
+ "condition is bad.");
+ return true;
+ }
+ }
+ else
+ return until_log_pos == 0;
+ }
+
+ return ((until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_EQUAL &&
+ log_pos >= until_log_pos) ||
+ until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_GREATER);
+}
+
static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli)
{
+ /*
+ We acquire this mutex since we need it for all operations except
+ event execution. But we will release it in places where we will
+ wait for something for example inside of next_event().
+ */
+ pthread_mutex_lock(&rli->data_lock);
+
+ if (rli->until_condition!=RELAY_LOG_INFO::UNTIL_NONE &&
+ rli->is_until_satisfied())
+ {
+ sql_print_error("Slave SQL thread stopped because it reached its"
+ " UNTIL position");
+ /*
+ Setting abort_slave flag because we do not want additional message about
+ error in query execution to be printed.
+ */
+ rli->abort_slave= 1;
+ pthread_mutex_unlock(&rli->data_lock);
+ return 1;
+ }
+
Log_event * ev = next_event(rli);
+
DBUG_ASSERT(rli->sql_thd==thd);
+
if (sql_slave_killed(thd,rli))
{
/* do not forget to free ev ! */
+ pthread_mutex_unlock(&rli->data_lock);
if (ev) delete ev;
return 1;
}
@@ -2166,7 +2309,6 @@ static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli)
{
int type_code = ev->get_type_code();
int exec_res;
- pthread_mutex_lock(&rli->data_lock);
/*
Skip queries originating from this server or number of
@@ -2195,7 +2337,7 @@ static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli)
pthread_mutex_unlock(&rli->data_lock);
delete ev;
return 0; // avoid infinite update loops
- }
+ }
pthread_mutex_unlock(&rli->data_lock);
thd->server_id = ev->server_id; // use the original server id for logging
@@ -2211,6 +2353,7 @@ static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli)
}
else
{
+ pthread_mutex_unlock(&rli->data_lock);
sql_print_error("\
Could not parse relay log event entry. The possible reasons are: the master's \
binary log is corrupted (you can check this by running 'mysqlbinlog' on the \
@@ -3228,17 +3371,18 @@ Log_event* next_event(RELAY_LOG_INFO* rli)
pthread_mutex_t *log_lock = rli->relay_log.get_log_lock();
const char* errmsg=0;
THD* thd = rli->sql_thd;
+
DBUG_ENTER("next_event");
DBUG_ASSERT(thd != 0);
/*
For most operations we need to protect rli members with data_lock,
- so we will hold it for the most of the loop below
- However, we will release it whenever it is worth the hassle,
- and in the cases when we go into a pthread_cond_wait() with the
- non-data_lock mutex
+ so we assume calling function acquired this mutex for us and we will
+ hold it for the most of the loop below However, we will release it
+ whenever it is worth the hassle, and in the cases when we go into a
+ pthread_cond_wait() with the non-data_lock mutex
*/
- pthread_mutex_lock(&rli->data_lock);
+ safe_mutex_assert_owner(&rli->data_lock);
while (!sql_slave_killed(thd,rli))
{
@@ -3284,7 +3428,6 @@ Log_event* next_event(RELAY_LOG_INFO* rli)
DBUG_ASSERT(thd==rli->sql_thd);
if (hot_log)
pthread_mutex_unlock(log_lock);
- pthread_mutex_unlock(&rli->data_lock);
DBUG_RETURN(ev);
}
DBUG_ASSERT(thd==rli->sql_thd);
@@ -3460,7 +3603,6 @@ event(errno: %d cur_log->error: %d)",
errmsg = "slave SQL thread was killed";
err:
- pthread_mutex_unlock(&rli->data_lock);
if (errmsg)
sql_print_error("Error reading relay log event: %s", errmsg);
DBUG_RETURN(0);