summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--mysql-test/suite/rpl/r/rpl_rbr_monitor.result57
-rw-r--r--mysql-test/suite/rpl/t/rpl_rbr_monitor.test69
-rw-r--r--sql/log_event.cc102
3 files changed, 208 insertions, 20 deletions
diff --git a/mysql-test/suite/rpl/r/rpl_rbr_monitor.result b/mysql-test/suite/rpl/r/rpl_rbr_monitor.result
new file mode 100644
index 00000000000..1079f02b5e3
--- /dev/null
+++ b/mysql-test/suite/rpl/r/rpl_rbr_monitor.result
@@ -0,0 +1,57 @@
+include/master-slave.inc
+[connection master]
+connection master;
+create table t1(a int primary key);
+connection slave;
+SET GLOBAL debug_dbug="+d,should_sleep_for_mdev7409";
+select * from t1;
+a
+connection master;
+insert into t1(a) values(1);
+#monitoring write rows
+connection slave;
+SET DEBUG_SYNC='now WAIT_FOR thd_info_set';
+#Write_row:- Writing row on table t1
+SET DEBUG_SYNC = 'now SIGNAL done';
+#monitoring update rows
+connection master;
+update t1 set a = a + 4194304 ;
+connection slave;
+SET DEBUG_SYNC='now WAIT_FOR thd_info_set';
+#Update_row:- finding row() that need to be updated from table t1
+SET DEBUG_SYNC = 'now SIGNAL done';
+SET DEBUG_SYNC='now WAIT_FOR thd_info_set';
+#Update_row:- unpacking row for table t1
+SET DEBUG_SYNC = 'now SIGNAL done';
+SET DEBUG_SYNC='now WAIT_FOR thd_info_set';
+#Update Row:- Updating row for table t1.
+SET DEBUG_SYNC = 'now SIGNAL done';
+#monitoring delete rows
+connection master;
+delete from t1 where a>1;
+connection slave;
+SET DEBUG_SYNC='now WAIT_FOR thd_info_set';
+#Delete_row:- finding row that need to be deleted from table t1
+SET DEBUG_SYNC = 'now SIGNAL done';
+SET DEBUG_SYNC='now WAIT_FOR thd_info_set';
+#Delete_row:- deleting row from table t1
+SET DEBUG_SYNC = 'now SIGNAL done';
+connection master;
+drop table t1;
+connection slave;
+SET GLOBAL debug_dbug="";
+SET DEBUG_SYNC= 'RESET';
+include/rpl_end.inc
+connection server_2;
+connection server_2;
+connection server_2;
+connection server_2;
+connection server_1;
+connection server_1;
+connection server_1;
+connection server_2;
+connection server_1;
+connection server_2;
+connection server_2;
+connection server_1;
+connection server_1;
diff --git a/mysql-test/suite/rpl/t/rpl_rbr_monitor.test b/mysql-test/suite/rpl/t/rpl_rbr_monitor.test
new file mode 100644
index 00000000000..cc948ab60c6
--- /dev/null
+++ b/mysql-test/suite/rpl/t/rpl_rbr_monitor.test
@@ -0,0 +1,69 @@
+--source include/have_innodb.inc
+--source include/have_binlog_format_row.inc
+--source include/have_debug.inc
+--source include/have_debug_sync.inc
+--source include/master-slave.inc
+--enable_connect_log
+
+--connection master
+create table t1(a int primary key);
+--save_master_pos
+
+--connection slave
+--sync_with_master
+SET GLOBAL debug_dbug="+d,should_sleep_for_mdev7409";
+select * from t1;
+
+--connection master
+insert into t1(a) values(1);
+--save_master_pos
+
+--echo #monitoring write rows
+--connection slave
+
+SET DEBUG_SYNC='now WAIT_FOR thd_info_set';
+--echo #Write_row:- Writing row on table t1
+SET DEBUG_SYNC = 'now SIGNAL done';
+--sync_with_master
+
+--echo #monitoring update rows
+--connection master
+update t1 set a = a + 4194304 ;
+
+--connection slave
+SET DEBUG_SYNC='now WAIT_FOR thd_info_set';
+--echo #Update_row:- finding row() that need to be updated from table t1
+SET DEBUG_SYNC = 'now SIGNAL done';
+
+SET DEBUG_SYNC='now WAIT_FOR thd_info_set';
+--echo #Update_row:- unpacking row for table t1
+SET DEBUG_SYNC = 'now SIGNAL done';
+
+SET DEBUG_SYNC='now WAIT_FOR thd_info_set';
+--echo #Update Row:- Updating row for table t1.
+SET DEBUG_SYNC = 'now SIGNAL done';
+
+--sync_with_master
+
+--echo #monitoring delete rows
+--connection master
+delete from t1 where a>1;
+
+--connection slave
+
+SET DEBUG_SYNC='now WAIT_FOR thd_info_set';
+--echo #Delete_row:- finding row that need to be deleted from table t1
+SET DEBUG_SYNC = 'now SIGNAL done';
+
+SET DEBUG_SYNC='now WAIT_FOR thd_info_set';
+--echo #Delete_row:- deleting row from table t1
+SET DEBUG_SYNC = 'now SIGNAL done';
+--sync_with_master
+
+#CleanUp
+--connection master
+drop table t1;
+--connection slave
+SET GLOBAL debug_dbug="";
+SET DEBUG_SYNC= 'RESET';
+--source include/rpl_end.inc
diff --git a/sql/log_event.cc b/sql/log_event.cc
index bae723402e7..5ba93893727 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -50,6 +50,7 @@
#include "rpl_utility.h"
#include "rpl_constants.h"
#include "sql_digest.h"
+#include "debug_sync.h"
#define my_b_write_string(A, B) my_b_write((A), (uchar*)(B), (uint) (sizeof(B) - 1))
@@ -11772,18 +11773,30 @@ Write_rows_log_event::do_exec_row(rpl_group_info *rgi)
{
DBUG_ASSERT(m_table != NULL);
const char *tmp= thd->get_proc_info();
- const char *message= "Write_rows_log_event::write_row()";
+ char *tmp_db= thd->db, *table_name= m_table->s->table_name.str;
+ char *message, msg[128];
+ my_snprintf(msg, sizeof(msg),"Write_row:- writing row on table %s",
+ table_name);
+ thd->db= m_table->s->db.str;
+ message= msg;
#ifdef WSREP_PROC_INFO
my_snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
- "Write_rows_log_event::write_row(%lld)",
- (long long) wsrep_thd_trx_seqno(thd));
+ "Write_row:- Writing row(%lld) on table %s",
+ (long long) wsrep_thd_trx_seqno(thd), table_name);
message= thd->wsrep_info;
#endif /* WSREP_PROC_INFO */
thd_proc_info(thd, message);
+ DBUG_EXECUTE_IF("should_sleep_for_mdev7409",{
+ if(!my_strcasecmp(system_charset_info, "test", thd->db) &&
+ !my_strcasecmp(system_charset_info,"t1", table_name))
+ debug_sync_set_action(thd,
+ STRING_WITH_LEN("now SIGNAL thd_info_set WAIT_FOR done"));
+ };);
int error= write_row(rgi, slave_exec_mode == SLAVE_EXEC_MODE_IDEMPOTENT);
thd_proc_info(thd, tmp);
+ thd->db= tmp_db;
if (error && !thd->is_error())
{
@@ -12379,32 +12392,51 @@ int Delete_rows_log_event::do_exec_row(rpl_group_info *rgi)
{
int error;
const char *tmp= thd->get_proc_info();
- const char *message= "Delete_rows_log_event::find_row()";
+ char *tmp_db= thd->db, *table_name= m_table->s->table_name.str;
+ char *message, msg[128];
+ my_snprintf(msg, sizeof(msg),"Delete_row:- finding row that need to be deleted from table %s",
+ table_name);
+ thd->db= m_table->s->db.str;
+ message= msg;
const bool invoke_triggers=
slave_run_triggers_for_rbr && !master_had_triggers && m_table->triggers;
DBUG_ASSERT(m_table != NULL);
#ifdef WSREP_PROC_INFO
my_snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
- "Delete_rows_log_event::find_row(%lld)",
- (long long) wsrep_thd_trx_seqno(thd));
+ "Delete_rows:- finding row(%lld) that need to be deleted from table %s",
+ (long long) wsrep_thd_trx_seqno(thd), table_name) ;
message= thd->wsrep_info;
#endif /* WSREP_PROC_INFO */
thd_proc_info(thd, message);
+ DBUG_EXECUTE_IF("should_sleep_for_mdev7409",{
+ if(!my_strcasecmp(system_charset_info, "test", thd->db) &&
+ !my_strcasecmp(system_charset_info,"t1", table_name))
+ debug_sync_set_action(thd,
+ STRING_WITH_LEN("now SIGNAL thd_info_set WAIT_FOR done"));
+ };);
if (!(error= find_row(rgi)))
{
/*
Delete the record found, located in record[0]
*/
- message= "Delete_rows_log_event::ha_delete_row()";
+ my_snprintf(msg, sizeof(msg),"Delete_row:- deleting row from table %s",
+ table_name);
+ message= msg;
#ifdef WSREP_PROC_INFO
snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
- "Delete_rows_log_event::ha_delete_row(%lld)",
- (long long) wsrep_thd_trx_seqno(thd));
+ "Delete_row:- deleting row(%lld) from table %s",
+ (long long) wsrep_thd_trx_seqno(thd), table_name) ;
message= thd->wsrep_info;
#endif
thd_proc_info(thd, message);
+ DBUG_EXECUTE_IF("should_sleep_for_mdev7409",{
+ if(!my_strcasecmp(system_charset_info, "test", thd->db) &&
+ !my_strcasecmp(system_charset_info,"t1", table_name))
+ debug_sync_set_action(thd,
+ STRING_WITH_LEN("now SIGNAL thd_info_set WAIT_FOR done"));
+ };);
if (invoke_triggers &&
process_triggers(TRG_EVENT_DELETE, TRG_ACTION_BEFORE, FALSE))
@@ -12421,6 +12453,7 @@ int Delete_rows_log_event::do_exec_row(rpl_group_info *rgi)
m_table->file->ha_index_or_rnd_end();
}
thd_proc_info(thd, tmp);
+ thd->db= tmp_db;
return error;
}
@@ -12539,17 +12572,29 @@ Update_rows_log_event::do_exec_row(rpl_group_info *rgi)
const bool invoke_triggers=
slave_run_triggers_for_rbr && !master_had_triggers && m_table->triggers;
const char *tmp= thd->get_proc_info();
- const char *message= "Update_rows_log_event::find_row()";
+ char *tmp_db= thd->db, *table_name= m_table->s->table_name.str;
+ char *message, msg[128];
DBUG_ASSERT(m_table != NULL);
+ my_snprintf(msg, sizeof(msg), "Update_row:- finding row() that need to be updated from table %s",
+ table_name);
+ thd->db= m_table->s->db.str;
+ message= msg;
#ifdef WSREP_PROC_INFO
my_snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
- "Update_rows_log_event::find_row(%lld)",
- (long long) wsrep_thd_trx_seqno(thd));
+ "Update_row:- finding row(%lld) that need to be updated from table %s",
+ (long long) wsrep_thd_trx_seqno(thd), table_name) ;
message= thd->wsrep_info;
#endif /* WSREP_PROC_INFO */
thd_proc_info(thd, message);
+ DBUG_EXECUTE_IF("should_sleep_for_mdev7409",{
+ if(!my_strcasecmp(system_charset_info, "test", thd->db) &&
+ !my_strcasecmp(system_charset_info,"t1", table_name))
+ debug_sync_set_action(thd,
+ STRING_WITH_LEN("now SIGNAL thd_info_set WAIT_FOR done"));
+ };);
+
int error= find_row(rgi);
if (error)
{
@@ -12560,6 +12605,7 @@ Update_rows_log_event::do_exec_row(rpl_group_info *rgi)
if ((m_curr_row= m_curr_row_end))
unpack_current_row(rgi, &m_cols_ai);
thd_proc_info(thd, tmp);
+ thd->db= tmp_db;
return error;
}
@@ -12577,16 +12623,24 @@ Update_rows_log_event::do_exec_row(rpl_group_info *rgi)
store_record(m_table,record[1]);
m_curr_row= m_curr_row_end;
- message= "Update_rows_log_event::unpack_current_row()";
+ my_snprintf(msg, sizeof(msg),"Update_row:- unpacking row for table %s",
+ table_name);
+ message= msg;
#ifdef WSREP_PROC_INFO
my_snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
- "Update_rows_log_event::unpack_current_row(%lld)",
- (long long) wsrep_thd_trx_seqno(thd));
+ "Update_row:- unpacking row(%lld) for table %s",
+ (long long) wsrep_thd_trx_seqno(thd), table_name) ;
message= thd->wsrep_info;
#endif /* WSREP_PROC_INFO */
/* this also updates m_curr_row_end */
thd_proc_info(thd, message);
+ DBUG_EXECUTE_IF("should_sleep_for_mdev7409",{
+ if(!my_strcasecmp(system_charset_info, "test", thd->db) &&
+ !my_strcasecmp(system_charset_info,"t1", table_name))
+ debug_sync_set_action(thd,
+ STRING_WITH_LEN("now SIGNAL thd_info_set WAIT_FOR done"));
+ };);
if ((error= unpack_current_row(rgi, &m_cols_ai)))
goto err;
@@ -12604,15 +12658,23 @@ Update_rows_log_event::do_exec_row(rpl_group_info *rgi)
DBUG_DUMP("new values", m_table->record[0], m_table->s->reclength);
#endif
- message= "Update_rows_log_event::ha_update_row()";
+ my_snprintf(msg, sizeof(msg),"Update_row:- Updating row for table %s",
+ table_name);
+ message= msg;
#ifdef WSREP_PROC_INFO
my_snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
- "Update_rows_log_event::ha_update_row(%lld)",
- (long long) wsrep_thd_trx_seqno(thd));
+ "Update_row:- Updating row(%lld) for table %s",
+ (long long) wsrep_thd_trx_seqno(thd), table_name) ;
message= thd->wsrep_info;
#endif /* WSREP_PROC_INFO */
thd_proc_info(thd, message);
+ DBUG_EXECUTE_IF("should_sleep_for_mdev7409",{
+ if(!my_strcasecmp(system_charset_info, "test", thd->db) &&
+ !my_strcasecmp(system_charset_info,"t1", table_name))
+ debug_sync_set_action(thd,
+ STRING_WITH_LEN("now SIGNAL thd_info_set WAIT_FOR done"));
+ };);
if (invoke_triggers &&
process_triggers(TRG_EVENT_UPDATE, TRG_ACTION_BEFORE, TRUE))
{
@@ -12634,9 +12696,9 @@ Update_rows_log_event::do_exec_row(rpl_group_info *rgi)
process_triggers(TRG_EVENT_UPDATE, TRG_ACTION_AFTER, TRUE))
error= HA_ERR_GENERIC; // in case if error is not set yet
- thd_proc_info(thd, tmp);
-
err:
+ thd_proc_info(thd, tmp);
+ thd->db= tmp_db;
m_table->file->ha_index_or_rnd_end();
return error;
}