diff options
author | sachin <sachin.setiya@mariadb.com> | 2020-11-17 20:31:02 +0000 |
---|---|---|
committer | sachin <sachin.setiya@mariadb.com> | 2020-11-17 20:31:02 +0000 |
commit | f2f3fdc068b59e7bd29eaa7197bb88ea2420fe40 (patch) | |
tree | 715f27c479c2735321b4ee181db513c8fdb18946 | |
parent | 92cb9944231536a249e514ae1bde4908bd4c3206 (diff) | |
download | mariadb-git-f2f3fdc068b59e7bd29eaa7197bb88ea2420fe40.tar.gz |
Queue working without any issue
-rw-r--r-- | mysql-test/suite/rpl/t/xyz.test | 45 | ||||
-rw-r--r-- | sql/log_event.cc | 3 | ||||
-rw-r--r-- | sql/queue.h | 24 | ||||
-rw-r--r-- | sql/rpl_mi.cc | 3 | ||||
-rw-r--r-- | sql/rpl_queue.h | 5 | ||||
-rw-r--r-- | sql/slave.cc | 14 |
6 files changed, 67 insertions, 27 deletions
diff --git a/mysql-test/suite/rpl/t/xyz.test b/mysql-test/suite/rpl/t/xyz.test index 387d2c05b57..27d40fed5dd 100644 --- a/mysql-test/suite/rpl/t/xyz.test +++ b/mysql-test/suite/rpl/t/xyz.test @@ -12,29 +12,42 @@ eval change master to master_host= "127.0.0.1", master_port=$SERVER_MYPORT_1, ma --connection master create table t1(a int, b int , c int ); insert into t1 values(1,1,1); ---sleep 5 -insert into t1 select * from t1; - insert into t1 select * from t1; - insert into t1 select * from t1; - insert into t1 select * from t1; - insert into t1 select * from t1; - insert into t1 select * from t1; - insert into t1 select * from t1; - insert into t1 select * from t1; -# insert into t1 select * from t1; -# insert into t1 select * from t1; -# insert into t1 select * from t1; -# insert into t1 select * from t1; -# insert into t1 select * from t1; +insert into t1 select * from t1; +insert into t1 select * from t1; +insert into t1 select * from t1; +insert into t1 select * from t1; +insert into t1 select * from t1; +insert into t1 select * from t1; +insert into t1 select * from t1; +insert into t1 select * from t1; +insert into t1 select * from t1; +insert into t1 select * from t1; +insert into t1 select * from t1; +insert into t1 select * from t1; +insert into t1 select * from t1; +insert into t1 select * from t1; +#insert into t1 select * from t1; +#insert into t1 select * from t1; +#insert into t1 select * from t1; +#insert into t1 select * from t1; +#insert into t1 select * from t1; +#insert into t1 select * from t1; +#insert into t1 select * from t1; select count(*) from t1; +# show binlog events; --source include/save_master_gtid.inc --connection slave ---sleep 2 +--source include/sync_with_master_gtid.inc select count(*) from t1; ---sleep 999992 --query_vertical show slave status +--connection master +drop table t1; +--source include/save_master_gtid.inc + +--connection slave +--source include/sync_with_master_gtid.inc --source include/stop_slave.inc diff --git a/sql/log_event.cc b/sql/log_event.cc index 39d6e7e99b0..98cee843dd7 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -825,6 +825,9 @@ int Log_event::read_log_event(r_queue *rpl_queue, IO_CACHE* file, String* packet if (el) { packet->append((const char *)el->event, el->total_length); + data_len= el->total_length; + rpl_queue->unlock_mutex(); + delete el; goto direct_path; } } diff --git a/sql/queue.h b/sql/queue.h index 73e81f10ef9..b9e24a3323c 100644 --- a/sql/queue.h +++ b/sql/queue.h @@ -4,6 +4,7 @@ #include "my_global.h" #include "my_base.h" #include "my_pthread.h" +#include "my_sys.h" #include "mysql/psi/mysql_thread.h" #include <cstring> @@ -58,6 +59,14 @@ class circular_buffer_queue return 0; } + void destroy() + { + my_free(buffer); + mysql_mutex_destroy(&lock_queue); + mysql_mutex_destroy(&free_queue); + mysql_cond_destroy(&free_cond); + } + /* We want to write in continues memory. */ @@ -78,9 +87,10 @@ class circular_buffer_queue { mysql_mutex_lock(&lock_queue); Element_type *el= new Element_type(tail, buffer, buffer_end); + //We are not going to unlock mutex till we get explicit call of + //unlock_mutex by caller thread (that means sql thread has copied data + //into its buffer) tail= el->tail; - mysql_mutex_unlock(&lock_queue); - mysql_cond_broadcast(&free_cond); return el; } return NULL; @@ -94,6 +104,16 @@ class circular_buffer_queue mysql_mutex_unlock(&free_queue); return enqueue(elem); } + + void lock_mutex() + { + mysql_mutex_lock(&lock_queue); + } + void unlock_mutex() + { + mysql_mutex_unlock(&lock_queue); + mysql_cond_broadcast(&free_cond); + } }; diff --git a/sql/rpl_mi.cc b/sql/rpl_mi.cc index f9e912bca7c..b4e2e0af975 100644 --- a/sql/rpl_mi.cc +++ b/sql/rpl_mi.cc @@ -668,7 +668,7 @@ file '%s')", fname); goto err; mi->rpl_queue= new circular_buffer_queue<slave_queue_element>(); - mi->rpl_queue->init(1000); + mi->rpl_queue->init(20000000); mi->inited = 1; mi->rli.is_relay_log_recovery= FALSE; // now change cache READ -> WRITE - must do this before flush_master_info @@ -846,6 +846,7 @@ void end_master_info(Master_info* mi) mysql_file_close(mi->fd, MYF(MY_WME)); mi->fd = -1; } + mi->rpl_queue->destroy(); mi->inited = 0; DBUG_VOID_RETURN; diff --git a/sql/rpl_queue.h b/sql/rpl_queue.h index 658b99ed328..d2dc44c5106 100644 --- a/sql/rpl_queue.h +++ b/sql/rpl_queue.h @@ -1,5 +1,6 @@ #ifndef RPL_QUEUE_H #define RPL_QUEUE_H +#include "my_sys.h" #include "queue.h" #include <cmath> #include <cstdlib> @@ -74,7 +75,7 @@ class slave_queue_element memcpy(buffer_start, event + t_len, total_length - t_len); return buffer_start + total_length - t_len; } - //NO wrapping needed + //NO wrapping needed //QTODO else { memcpy(ptr, event, total_length); @@ -84,7 +85,7 @@ class slave_queue_element ~slave_queue_element() { if (malloced) - free(event); + my_free(event); } //QTODO diff --git a/sql/slave.cc b/sql/slave.cc index 87d1558cb1a..897b1b7a6d8 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -7634,12 +7634,14 @@ static Log_event* next_event(rpl_group_info *rgi, ulonglong *event_size) MYSQL_BIN_LOG::open() will write the buffered description event. */ old_pos= rli->event_relay_log_pos; - if ((ev= Log_event::read_log_event(rgi->rli->mi->rpl_queue, cur_log, - rli->relay_log.description_event_for_exec, - opt_slave_sql_verify_checksum))) - //uchar *ev_ptr; - //if ((ev_ptr= rgi->rli->mi->rpl_queue->dequeue()->event) && - // (ev= Log_event::read_log_event(ev_ptr, 22, ))) + if ((ev= Log_event::read_log_event(rgi->rli->mi->rpl_queue, cur_log, + rli->relay_log.description_event_for_exec, + opt_slave_sql_verify_checksum))) + //slave_queue_element *el; + //if ((el= rgi->rli->mi->rpl_queue->dequeue()) && + //// (ev= Log_event::read_log_event((const char*)el->event, el->total_length, NULL, + // rli->relay_log.description_event_for_exec, + // opt_slave_sql_verify_checksum ))) { /* read it while we have a lock, to avoid a mutex lock in |