diff options
-rw-r--r-- | mysql-test/suite/rpl/t/xyz.test | 58 | ||||
-rw-r--r-- | sql/log_event.cc | 11 | ||||
-rw-r--r-- | sql/log_event_server.cc | 9 | ||||
-rw-r--r-- | sql/rpl_mi.cc | 2 | ||||
-rw-r--r-- | sql/rpl_queue.h | 16 | ||||
-rw-r--r-- | sql/rpl_rli.cc | 9 | ||||
-rw-r--r-- | sql/rpl_rli.h | 3 | ||||
-rw-r--r-- | unittest/sql/rpl_queue-t.cc | 2 |
8 files changed, 83 insertions, 27 deletions
diff --git a/mysql-test/suite/rpl/t/xyz.test b/mysql-test/suite/rpl/t/xyz.test index 72d44d8043e..3316fd98757 100644 --- a/mysql-test/suite/rpl/t/xyz.test +++ b/mysql-test/suite/rpl/t/xyz.test @@ -6,28 +6,47 @@ connect (slave,127.0.0.1,root,,test,$SERVER_MYPORT_2,); --connection slave eval change master to master_host= "127.0.0.1", master_port=$SERVER_MYPORT_1, master_user="root"; +SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads; +SET GLOBAL slave_parallel_threads=5; +SET @old_parallel_mode=@@GLOBAL.slave_parallel_mode; +SET @@GLOBAL.slave_parallel_mode='aggressive'; --source include/start_slave.inc ---query_vertical show slave status --connection master create table t1(a int, b int , c int ); insert into t1 values(1,1,1); -insert into t1 select * from t1; -insert into t1 select * from t1; -#insert into t1 select * from t1; -#insert into t1 select * from t1; -#insert into t1 select * from t1; -#insert into t1 select * from t1; -#insert into t1 select * from t1; -#insert into t1 select * from t1; -#insert into t1 select * from t1; -#insert into t1 select * from t1; -#insert into t1 select * from t1; -#insert into t1 select * from t1; -#insert into t1 select * from t1; -#insert into t1 select * from t1; -#insert into t1 select * from t1; -#insert into t1 select * from t1; +insert into t1 select * from t1 limit 10; +insert into t1 select * from t1 limit 10; +insert into t1 select * from t1 limit 10; +insert into t1 select * from t1 limit 10; +insert into t1 select * from t1 limit 10; +insert into t1 select * from t1 limit 10; +insert into t1 select * from t1 limit 10; +insert into t1 select * from t1 limit 10; +insert into t1 select * from t1 limit 10; +insert into t1 select * from t1 limit 10; +insert into t1 select * from t1 limit 10; +insert into t1 select * from t1 limit 10; +insert into t1 select * from t1 limit 10; +insert into t1 select * from t1 limit 10; +insert into t1 select * from t1 limit 10; +insert into t1 select * from t1 limit 10; +insert into t1 select * from t1 limit 10; +insert into t1 select * from t1 limit 10; +insert into t1 select * from t1 limit 10; +insert into t1 select * from t1 limit 10; +insert into t1 select * from t1 limit 10; +insert into t1 select * from t1 limit 10; +insert into t1 select * from t1 limit 10; +insert into t1 select * from t1 limit 10; +insert into t1 select * from t1 limit 10; +insert into t1 select * from t1 limit 10; +insert into t1 select * from t1 limit 10; +insert into t1 select * from t1 limit 10; +insert into t1 select * from t1 limit 10; +insert into t1 select * from t1 limit 10; +insert into t1 select * from t1 limit 10; +insert into t1 select * from t1 limit 10; #insert into t1 select * from t1; #insert into t1 select * from t1; #insert into t1 select * from t1; @@ -40,7 +59,7 @@ select count(*) from t1; --connection slave --source include/sync_with_master_gtid.inc select count(*) from t1; ---query_vertical show slave status +show binlog events; --connection master drop table t1; @@ -49,6 +68,9 @@ drop table t1; --connection slave --source include/sync_with_master_gtid.inc --source include/stop_slave.inc +SET @@GLOBAL.slave_parallel_threads=@old_parallel_threads; +SET @@GLOBAL.slave_parallel_mode=@old_parallel_mode; +--sleep 1000 diff --git a/sql/log_event.cc b/sql/log_event.cc index 74c0d6a1c4f..94c443e18ba 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -1033,9 +1033,10 @@ Log_event * create_log_event_or_get_size(const char *buf, uint event_len, r_queue *rpl_queue, uint32* size) { Log_event *ev; - void *memory __attribute__((unused)) = NULL ; - uint32 ev_size= 0; DBUG_ASSERT(buf == NULL || size == NULL); + void *memory = NULL ; +#ifndef MYSQL_CLIENT + uint32 ev_size= 0; if (rpl_queue && !size) { //get the size of the Log_event object @@ -1044,8 +1045,12 @@ Log_event * create_log_event_or_get_size(const char *buf, uint event_len, { return NULL; } + //debug info + sql_print_information("Setiya , Size of available buffer Enqueue time%ld", + rpl_queue->free_size()); memory= rpl_queue->enqueue_1(ev_size); } +#endif switch(event_type) { case QUERY_EVENT: if (!buf) @@ -1360,8 +1365,10 @@ Log_event * create_log_event_or_get_size(const char *buf, uint event_len, size= NULL; break; } +#ifndef MYSQL_CLIENT if (rpl_queue) rpl_queue->enqueue_2(ev_size); +#endif return ev; } diff --git a/sql/log_event_server.cc b/sql/log_event_server.cc index 148ddd87469..69afab8a5ea 100644 --- a/sql/log_event_server.cc +++ b/sql/log_event_server.cc @@ -625,7 +625,7 @@ int Log_event::do_update_pos(rpl_group_info *rgi) the actual event execution reaches that point. */ if (!rgi->is_parallel_exec || is_group_event(get_type_code())) - rli->stmt_done(log_pos, thd, rgi); + rli->stmt_done(log_pos, thd, rgi, this); } DBUG_RETURN(0); // Cannot fail currently } @@ -1859,6 +1859,13 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi, goto end; } } + //Update tail ptr to end of this event + //We are freeing memory on queue(dequeue) on the time commit only + rli->mi->rpl_queue->dequeue_by_tail_ptr((uchar *)this+sizeof(*this)); + //debug info + sql_print_information("Setiya , Size of available buffer Dequeue Time%ld", + rli->mi->rpl_queue->free_size()); + } thd->table_map_for_update= (table_map)table_map_for_update; diff --git a/sql/rpl_mi.cc b/sql/rpl_mi.cc index 50a5cc31ad2..e1bbeee8a72 100644 --- a/sql/rpl_mi.cc +++ b/sql/rpl_mi.cc @@ -668,7 +668,7 @@ file '%s')", fname); goto err; mi->rpl_queue= new circular_buffer_queue_events(); - mi->rpl_queue->init(20000000); + mi->rpl_queue->init(200000); mi->inited = 1; mi->rli.is_relay_log_recovery= FALSE; // now change cache READ -> WRITE - must do this before flush_master_info diff --git a/sql/rpl_queue.h b/sql/rpl_queue.h index 288b74120b5..6e6887f6a0b 100644 --- a/sql/rpl_queue.h +++ b/sql/rpl_queue.h @@ -274,8 +274,11 @@ class circular_buffer_queue_events :public circular_buffer_queue<slave_queue_ele //enqueue_1 will take care of it. head+= size; } - - void* dequeue_1(uint32 size) + + //Dequeue by size , + //So the the size of element which needs to be dequeued will be + //given in argument + void* dequeue_by_size(uint32 size) { if (used_buffer() > 0 ) { @@ -300,6 +303,15 @@ class circular_buffer_queue_events :public circular_buffer_queue<slave_queue_ele return NULL; } + // This will simple update the tail ptr + void dequeue_by_tail_ptr(void *ptr) + { + DBUG_ASSERT(free_size() > 0); + lock_mutex(); + tail= (uchar *)ptr; + unlock_mutex(); + } + }; typedef circular_buffer_queue_events r_queue; diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc index 8b540863e31..a914378de48 100644 --- a/sql/rpl_rli.cc +++ b/sql/rpl_rli.cc @@ -1418,7 +1418,7 @@ bool Relay_log_info::is_until_satisfied(Log_event *ev) bool Relay_log_info::stmt_done(my_off_t event_master_log_pos, THD *thd, - rpl_group_info *rgi) + rpl_group_info *rgi, Log_event *ev) { int error= 0; DBUG_ENTER("Relay_log_info::stmt_done"); @@ -1481,6 +1481,13 @@ bool Relay_log_info::stmt_done(my_off_t event_master_log_pos, THD *thd, if (rgi->is_parallel_exec) mysql_mutex_unlock(&data_lock); } + if(mi->rpl_queue && ev->get_type_code() == QUERY_EVENT) + { + Query_log_event *qev= (Query_log_event* )ev; + if (!qev->is_commit() && !qev->is_rollback()) + mi->rpl_queue->dequeue_by_tail_ptr((uchar *)ev+ + sizeof(*(Query_log_event *)this)); + } DBUG_EXECUTE_IF("inject_crash_after_flush_rli", DBUG_SUICIDE();); } DBUG_RETURN(error); diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h index 17c7eb519a3..9016625f50f 100644 --- a/sql/rpl_rli.h +++ b/sql/rpl_rli.h @@ -447,7 +447,8 @@ public: relay log info and used to produce information for <code>SHOW SLAVE STATUS</code>. */ - bool stmt_done(my_off_t event_log_pos, THD *thd, rpl_group_info *rgi); + bool stmt_done(my_off_t event_log_pos, THD *thd, rpl_group_info *rgi, + Log_event *ev= NULL); int alloc_inuse_relaylog(const char *name); void free_inuse_relaylog(inuse_relaylog *ir); void reset_inuse_relaylog(); diff --git a/unittest/sql/rpl_queue-t.cc b/unittest/sql/rpl_queue-t.cc index d8d8bc234c1..01c2bb22a62 100644 --- a/unittest/sql/rpl_queue-t.cc +++ b/unittest/sql/rpl_queue-t.cc @@ -29,7 +29,7 @@ void enqueue(circular_buffer_queue_events *queue, char c) } void dequeue(circular_buffer_queue_events *queue) { - dummy_Log_event *dl= static_cast<dummy_Log_event *>(queue->dequeue_1(dummy_Log_event::get_size())); + dummy_Log_event *dl= static_cast<dummy_Log_event *>(queue->dequeue_by_size(dummy_Log_event::get_size())); fwrite(dl->arr, sizeof(char), 20, stdout); printf("\n"); } |