diff options
-rw-r--r-- | mysql-test/suite/rpl/r/rpl_rbr_monitor.result | 57 | ||||
-rw-r--r-- | mysql-test/suite/rpl/t/rpl_rbr_monitor.test | 69 | ||||
-rw-r--r-- | sql/log_event.cc | 102 |
3 files changed, 208 insertions, 20 deletions
diff --git a/mysql-test/suite/rpl/r/rpl_rbr_monitor.result b/mysql-test/suite/rpl/r/rpl_rbr_monitor.result new file mode 100644 index 00000000000..1079f02b5e3 --- /dev/null +++ b/mysql-test/suite/rpl/r/rpl_rbr_monitor.result @@ -0,0 +1,57 @@ +include/master-slave.inc +[connection master] +connection master; +create table t1(a int primary key); +connection slave; +SET GLOBAL debug_dbug="+d,should_sleep_for_mdev7409"; +select * from t1; +a +connection master; +insert into t1(a) values(1); +#monitoring write rows +connection slave; +SET DEBUG_SYNC='now WAIT_FOR thd_info_set'; +#Write_row:- Writing row on table t1 +SET DEBUG_SYNC = 'now SIGNAL done'; +#monitoring update rows +connection master; +update t1 set a = a + 4194304 ; +connection slave; +SET DEBUG_SYNC='now WAIT_FOR thd_info_set'; +#Update_row:- finding row() that need to be updated from table t1 +SET DEBUG_SYNC = 'now SIGNAL done'; +SET DEBUG_SYNC='now WAIT_FOR thd_info_set'; +#Update_row:- unpacking row for table t1 +SET DEBUG_SYNC = 'now SIGNAL done'; +SET DEBUG_SYNC='now WAIT_FOR thd_info_set'; +#Update Row:- Updating row for table t1. +SET DEBUG_SYNC = 'now SIGNAL done'; +#monitoring delete rows +connection master; +delete from t1 where a>1; +connection slave; +SET DEBUG_SYNC='now WAIT_FOR thd_info_set'; +#Delete_row:- finding row that need to be deleted from table t1 +SET DEBUG_SYNC = 'now SIGNAL done'; +SET DEBUG_SYNC='now WAIT_FOR thd_info_set'; +#Delete_row:- deleting row from table t1 +SET DEBUG_SYNC = 'now SIGNAL done'; +connection master; +drop table t1; +connection slave; +SET GLOBAL debug_dbug=""; +SET DEBUG_SYNC= 'RESET'; +include/rpl_end.inc +connection server_2; +connection server_2; +connection server_2; +connection server_2; +connection server_1; +connection server_1; +connection server_1; +connection server_2; +connection server_1; +connection server_2; +connection server_2; +connection server_1; +connection server_1; diff --git a/mysql-test/suite/rpl/t/rpl_rbr_monitor.test b/mysql-test/suite/rpl/t/rpl_rbr_monitor.test new file mode 100644 index 00000000000..cc948ab60c6 --- /dev/null +++ b/mysql-test/suite/rpl/t/rpl_rbr_monitor.test @@ -0,0 +1,69 @@ +--source include/have_innodb.inc +--source include/have_binlog_format_row.inc +--source include/have_debug.inc +--source include/have_debug_sync.inc +--source include/master-slave.inc +--enable_connect_log + +--connection master +create table t1(a int primary key); +--save_master_pos + +--connection slave +--sync_with_master +SET GLOBAL debug_dbug="+d,should_sleep_for_mdev7409"; +select * from t1; + +--connection master +insert into t1(a) values(1); +--save_master_pos + +--echo #monitoring write rows +--connection slave + +SET DEBUG_SYNC='now WAIT_FOR thd_info_set'; +--echo #Write_row:- Writing row on table t1 +SET DEBUG_SYNC = 'now SIGNAL done'; +--sync_with_master + +--echo #monitoring update rows +--connection master +update t1 set a = a + 4194304 ; + +--connection slave +SET DEBUG_SYNC='now WAIT_FOR thd_info_set'; +--echo #Update_row:- finding row() that need to be updated from table t1 +SET DEBUG_SYNC = 'now SIGNAL done'; + +SET DEBUG_SYNC='now WAIT_FOR thd_info_set'; +--echo #Update_row:- unpacking row for table t1 +SET DEBUG_SYNC = 'now SIGNAL done'; + +SET DEBUG_SYNC='now WAIT_FOR thd_info_set'; +--echo #Update Row:- Updating row for table t1. +SET DEBUG_SYNC = 'now SIGNAL done'; + +--sync_with_master + +--echo #monitoring delete rows +--connection master +delete from t1 where a>1; + +--connection slave + +SET DEBUG_SYNC='now WAIT_FOR thd_info_set'; +--echo #Delete_row:- finding row that need to be deleted from table t1 +SET DEBUG_SYNC = 'now SIGNAL done'; + +SET DEBUG_SYNC='now WAIT_FOR thd_info_set'; +--echo #Delete_row:- deleting row from table t1 +SET DEBUG_SYNC = 'now SIGNAL done'; +--sync_with_master + +#CleanUp +--connection master +drop table t1; +--connection slave +SET GLOBAL debug_dbug=""; +SET DEBUG_SYNC= 'RESET'; +--source include/rpl_end.inc diff --git a/sql/log_event.cc b/sql/log_event.cc index bae723402e7..5ba93893727 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -50,6 +50,7 @@ #include "rpl_utility.h" #include "rpl_constants.h" #include "sql_digest.h" +#include "debug_sync.h" #define my_b_write_string(A, B) my_b_write((A), (uchar*)(B), (uint) (sizeof(B) - 1)) @@ -11772,18 +11773,30 @@ Write_rows_log_event::do_exec_row(rpl_group_info *rgi) { DBUG_ASSERT(m_table != NULL); const char *tmp= thd->get_proc_info(); - const char *message= "Write_rows_log_event::write_row()"; + char *tmp_db= thd->db, *table_name= m_table->s->table_name.str; + char *message, msg[128]; + my_snprintf(msg, sizeof(msg),"Write_row:- writing row on table %s", + table_name); + thd->db= m_table->s->db.str; + message= msg; #ifdef WSREP_PROC_INFO my_snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, - "Write_rows_log_event::write_row(%lld)", - (long long) wsrep_thd_trx_seqno(thd)); + "Write_row:- Writing row(%lld) on table %s", + (long long) wsrep_thd_trx_seqno(thd), table_name); message= thd->wsrep_info; #endif /* WSREP_PROC_INFO */ thd_proc_info(thd, message); + DBUG_EXECUTE_IF("should_sleep_for_mdev7409",{ + if(!my_strcasecmp(system_charset_info, "test", thd->db) && + !my_strcasecmp(system_charset_info,"t1", table_name)) + debug_sync_set_action(thd, + STRING_WITH_LEN("now SIGNAL thd_info_set WAIT_FOR done")); + };); int error= write_row(rgi, slave_exec_mode == SLAVE_EXEC_MODE_IDEMPOTENT); thd_proc_info(thd, tmp); + thd->db= tmp_db; if (error && !thd->is_error()) { @@ -12379,32 +12392,51 @@ int Delete_rows_log_event::do_exec_row(rpl_group_info *rgi) { int error; const char *tmp= thd->get_proc_info(); - const char *message= "Delete_rows_log_event::find_row()"; + char *tmp_db= thd->db, *table_name= m_table->s->table_name.str; + char *message, msg[128]; + my_snprintf(msg, sizeof(msg),"Delete_row:- finding row that need to be deleted from table %s", + table_name); + thd->db= m_table->s->db.str; + message= msg; const bool invoke_triggers= slave_run_triggers_for_rbr && !master_had_triggers && m_table->triggers; DBUG_ASSERT(m_table != NULL); #ifdef WSREP_PROC_INFO my_snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, - "Delete_rows_log_event::find_row(%lld)", - (long long) wsrep_thd_trx_seqno(thd)); + "Delete_rows:- finding row(%lld) that need to be deleted from table %s", + (long long) wsrep_thd_trx_seqno(thd), table_name) ; message= thd->wsrep_info; #endif /* WSREP_PROC_INFO */ thd_proc_info(thd, message); + DBUG_EXECUTE_IF("should_sleep_for_mdev7409",{ + if(!my_strcasecmp(system_charset_info, "test", thd->db) && + !my_strcasecmp(system_charset_info,"t1", table_name)) + debug_sync_set_action(thd, + STRING_WITH_LEN("now SIGNAL thd_info_set WAIT_FOR done")); + };); if (!(error= find_row(rgi))) { /* Delete the record found, located in record[0] */ - message= "Delete_rows_log_event::ha_delete_row()"; + my_snprintf(msg, sizeof(msg),"Delete_row:- deleting row from table %s", + table_name); + message= msg; #ifdef WSREP_PROC_INFO snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, - "Delete_rows_log_event::ha_delete_row(%lld)", - (long long) wsrep_thd_trx_seqno(thd)); + "Delete_row:- deleting row(%lld) from table %s", + (long long) wsrep_thd_trx_seqno(thd), table_name) ; message= thd->wsrep_info; #endif thd_proc_info(thd, message); + DBUG_EXECUTE_IF("should_sleep_for_mdev7409",{ + if(!my_strcasecmp(system_charset_info, "test", thd->db) && + !my_strcasecmp(system_charset_info,"t1", table_name)) + debug_sync_set_action(thd, + STRING_WITH_LEN("now SIGNAL thd_info_set WAIT_FOR done")); + };); if (invoke_triggers && process_triggers(TRG_EVENT_DELETE, TRG_ACTION_BEFORE, FALSE)) @@ -12421,6 +12453,7 @@ int Delete_rows_log_event::do_exec_row(rpl_group_info *rgi) m_table->file->ha_index_or_rnd_end(); } thd_proc_info(thd, tmp); + thd->db= tmp_db; return error; } @@ -12539,17 +12572,29 @@ Update_rows_log_event::do_exec_row(rpl_group_info *rgi) const bool invoke_triggers= slave_run_triggers_for_rbr && !master_had_triggers && m_table->triggers; const char *tmp= thd->get_proc_info(); - const char *message= "Update_rows_log_event::find_row()"; + char *tmp_db= thd->db, *table_name= m_table->s->table_name.str; + char *message, msg[128]; DBUG_ASSERT(m_table != NULL); + my_snprintf(msg, sizeof(msg), "Update_row:- finding row() that need to be updated from table %s", + table_name); + thd->db= m_table->s->db.str; + message= msg; #ifdef WSREP_PROC_INFO my_snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, - "Update_rows_log_event::find_row(%lld)", - (long long) wsrep_thd_trx_seqno(thd)); + "Update_row:- finding row(%lld) that need to be updated from table %s", + (long long) wsrep_thd_trx_seqno(thd), table_name) ; message= thd->wsrep_info; #endif /* WSREP_PROC_INFO */ thd_proc_info(thd, message); + DBUG_EXECUTE_IF("should_sleep_for_mdev7409",{ + if(!my_strcasecmp(system_charset_info, "test", thd->db) && + !my_strcasecmp(system_charset_info,"t1", table_name)) + debug_sync_set_action(thd, + STRING_WITH_LEN("now SIGNAL thd_info_set WAIT_FOR done")); + };); + int error= find_row(rgi); if (error) { @@ -12560,6 +12605,7 @@ Update_rows_log_event::do_exec_row(rpl_group_info *rgi) if ((m_curr_row= m_curr_row_end)) unpack_current_row(rgi, &m_cols_ai); thd_proc_info(thd, tmp); + thd->db= tmp_db; return error; } @@ -12577,16 +12623,24 @@ Update_rows_log_event::do_exec_row(rpl_group_info *rgi) store_record(m_table,record[1]); m_curr_row= m_curr_row_end; - message= "Update_rows_log_event::unpack_current_row()"; + my_snprintf(msg, sizeof(msg),"Update_row:- unpacking row for table %s", + table_name); + message= msg; #ifdef WSREP_PROC_INFO my_snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, - "Update_rows_log_event::unpack_current_row(%lld)", - (long long) wsrep_thd_trx_seqno(thd)); + "Update_row:- unpacking row(%lld) for table %s", + (long long) wsrep_thd_trx_seqno(thd), table_name) ; message= thd->wsrep_info; #endif /* WSREP_PROC_INFO */ /* this also updates m_curr_row_end */ thd_proc_info(thd, message); + DBUG_EXECUTE_IF("should_sleep_for_mdev7409",{ + if(!my_strcasecmp(system_charset_info, "test", thd->db) && + !my_strcasecmp(system_charset_info,"t1", table_name)) + debug_sync_set_action(thd, + STRING_WITH_LEN("now SIGNAL thd_info_set WAIT_FOR done")); + };); if ((error= unpack_current_row(rgi, &m_cols_ai))) goto err; @@ -12604,15 +12658,23 @@ Update_rows_log_event::do_exec_row(rpl_group_info *rgi) DBUG_DUMP("new values", m_table->record[0], m_table->s->reclength); #endif - message= "Update_rows_log_event::ha_update_row()"; + my_snprintf(msg, sizeof(msg),"Update_row:- Updating row for table %s", + table_name); + message= msg; #ifdef WSREP_PROC_INFO my_snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, - "Update_rows_log_event::ha_update_row(%lld)", - (long long) wsrep_thd_trx_seqno(thd)); + "Update_row:- Updating row(%lld) for table %s", + (long long) wsrep_thd_trx_seqno(thd), table_name) ; message= thd->wsrep_info; #endif /* WSREP_PROC_INFO */ thd_proc_info(thd, message); + DBUG_EXECUTE_IF("should_sleep_for_mdev7409",{ + if(!my_strcasecmp(system_charset_info, "test", thd->db) && + !my_strcasecmp(system_charset_info,"t1", table_name)) + debug_sync_set_action(thd, + STRING_WITH_LEN("now SIGNAL thd_info_set WAIT_FOR done")); + };); if (invoke_triggers && process_triggers(TRG_EVENT_UPDATE, TRG_ACTION_BEFORE, TRUE)) { @@ -12634,9 +12696,9 @@ Update_rows_log_event::do_exec_row(rpl_group_info *rgi) process_triggers(TRG_EVENT_UPDATE, TRG_ACTION_AFTER, TRUE)) error= HA_ERR_GENERIC; // in case if error is not set yet - thd_proc_info(thd, tmp); - err: + thd_proc_info(thd, tmp); + thd->db= tmp_db; m_table->file->ha_index_or_rnd_end(); return error; } |