summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--mysql-test/suite/rpl/t/xyz.test58
-rw-r--r--sql/log_event.cc11
-rw-r--r--sql/log_event_server.cc9
-rw-r--r--sql/rpl_mi.cc2
-rw-r--r--sql/rpl_queue.h16
-rw-r--r--sql/rpl_rli.cc9
-rw-r--r--sql/rpl_rli.h3
-rw-r--r--unittest/sql/rpl_queue-t.cc2
8 files changed, 83 insertions, 27 deletions
diff --git a/mysql-test/suite/rpl/t/xyz.test b/mysql-test/suite/rpl/t/xyz.test
index 72d44d8043e..3316fd98757 100644
--- a/mysql-test/suite/rpl/t/xyz.test
+++ b/mysql-test/suite/rpl/t/xyz.test
@@ -6,28 +6,47 @@ connect (slave,127.0.0.1,root,,test,$SERVER_MYPORT_2,);
--connection slave
eval change master to master_host= "127.0.0.1", master_port=$SERVER_MYPORT_1, master_user="root";
+SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
+SET GLOBAL slave_parallel_threads=5;
+SET @old_parallel_mode=@@GLOBAL.slave_parallel_mode;
+SET @@GLOBAL.slave_parallel_mode='aggressive';
--source include/start_slave.inc
---query_vertical show slave status
--connection master
create table t1(a int, b int , c int );
insert into t1 values(1,1,1);
-insert into t1 select * from t1;
-insert into t1 select * from t1;
-#insert into t1 select * from t1;
-#insert into t1 select * from t1;
-#insert into t1 select * from t1;
-#insert into t1 select * from t1;
-#insert into t1 select * from t1;
-#insert into t1 select * from t1;
-#insert into t1 select * from t1;
-#insert into t1 select * from t1;
-#insert into t1 select * from t1;
-#insert into t1 select * from t1;
-#insert into t1 select * from t1;
-#insert into t1 select * from t1;
-#insert into t1 select * from t1;
-#insert into t1 select * from t1;
+insert into t1 select * from t1 limit 10;
+insert into t1 select * from t1 limit 10;
+insert into t1 select * from t1 limit 10;
+insert into t1 select * from t1 limit 10;
+insert into t1 select * from t1 limit 10;
+insert into t1 select * from t1 limit 10;
+insert into t1 select * from t1 limit 10;
+insert into t1 select * from t1 limit 10;
+insert into t1 select * from t1 limit 10;
+insert into t1 select * from t1 limit 10;
+insert into t1 select * from t1 limit 10;
+insert into t1 select * from t1 limit 10;
+insert into t1 select * from t1 limit 10;
+insert into t1 select * from t1 limit 10;
+insert into t1 select * from t1 limit 10;
+insert into t1 select * from t1 limit 10;
+insert into t1 select * from t1 limit 10;
+insert into t1 select * from t1 limit 10;
+insert into t1 select * from t1 limit 10;
+insert into t1 select * from t1 limit 10;
+insert into t1 select * from t1 limit 10;
+insert into t1 select * from t1 limit 10;
+insert into t1 select * from t1 limit 10;
+insert into t1 select * from t1 limit 10;
+insert into t1 select * from t1 limit 10;
+insert into t1 select * from t1 limit 10;
+insert into t1 select * from t1 limit 10;
+insert into t1 select * from t1 limit 10;
+insert into t1 select * from t1 limit 10;
+insert into t1 select * from t1 limit 10;
+insert into t1 select * from t1 limit 10;
+insert into t1 select * from t1 limit 10;
#insert into t1 select * from t1;
#insert into t1 select * from t1;
#insert into t1 select * from t1;
@@ -40,7 +59,7 @@ select count(*) from t1;
--connection slave
--source include/sync_with_master_gtid.inc
select count(*) from t1;
---query_vertical show slave status
+show binlog events;
--connection master
drop table t1;
@@ -49,6 +68,9 @@ drop table t1;
--connection slave
--source include/sync_with_master_gtid.inc
--source include/stop_slave.inc
+SET @@GLOBAL.slave_parallel_threads=@old_parallel_threads;
+SET @@GLOBAL.slave_parallel_mode=@old_parallel_mode;
+--sleep 1000
diff --git a/sql/log_event.cc b/sql/log_event.cc
index 74c0d6a1c4f..94c443e18ba 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -1033,9 +1033,10 @@ Log_event * create_log_event_or_get_size(const char *buf, uint event_len,
r_queue *rpl_queue, uint32* size)
{
Log_event *ev;
- void *memory __attribute__((unused)) = NULL ;
- uint32 ev_size= 0;
DBUG_ASSERT(buf == NULL || size == NULL);
+ void *memory = NULL ;
+#ifndef MYSQL_CLIENT
+ uint32 ev_size= 0;
if (rpl_queue && !size)
{
//get the size of the Log_event object
@@ -1044,8 +1045,12 @@ Log_event * create_log_event_or_get_size(const char *buf, uint event_len,
{
return NULL;
}
+ //debug info
+ sql_print_information("Setiya , Size of available buffer Enqueue time%ld",
+ rpl_queue->free_size());
memory= rpl_queue->enqueue_1(ev_size);
}
+#endif
switch(event_type) {
case QUERY_EVENT:
if (!buf)
@@ -1360,8 +1365,10 @@ Log_event * create_log_event_or_get_size(const char *buf, uint event_len,
size= NULL;
break;
}
+#ifndef MYSQL_CLIENT
if (rpl_queue)
rpl_queue->enqueue_2(ev_size);
+#endif
return ev;
}
diff --git a/sql/log_event_server.cc b/sql/log_event_server.cc
index 148ddd87469..69afab8a5ea 100644
--- a/sql/log_event_server.cc
+++ b/sql/log_event_server.cc
@@ -625,7 +625,7 @@ int Log_event::do_update_pos(rpl_group_info *rgi)
the actual event execution reaches that point.
*/
if (!rgi->is_parallel_exec || is_group_event(get_type_code()))
- rli->stmt_done(log_pos, thd, rgi);
+ rli->stmt_done(log_pos, thd, rgi, this);
}
DBUG_RETURN(0); // Cannot fail currently
}
@@ -1859,6 +1859,13 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi,
goto end;
}
}
+ //Update tail ptr to end of this event
+ //We are freeing memory on queue(dequeue) on the time commit only
+ rli->mi->rpl_queue->dequeue_by_tail_ptr((uchar *)this+sizeof(*this));
+ //debug info
+ sql_print_information("Setiya , Size of available buffer Dequeue Time%ld",
+ rli->mi->rpl_queue->free_size());
+
}
thd->table_map_for_update= (table_map)table_map_for_update;
diff --git a/sql/rpl_mi.cc b/sql/rpl_mi.cc
index 50a5cc31ad2..e1bbeee8a72 100644
--- a/sql/rpl_mi.cc
+++ b/sql/rpl_mi.cc
@@ -668,7 +668,7 @@ file '%s')", fname);
goto err;
mi->rpl_queue= new circular_buffer_queue_events();
- mi->rpl_queue->init(20000000);
+ mi->rpl_queue->init(200000);
mi->inited = 1;
mi->rli.is_relay_log_recovery= FALSE;
// now change cache READ -> WRITE - must do this before flush_master_info
diff --git a/sql/rpl_queue.h b/sql/rpl_queue.h
index 288b74120b5..6e6887f6a0b 100644
--- a/sql/rpl_queue.h
+++ b/sql/rpl_queue.h
@@ -274,8 +274,11 @@ class circular_buffer_queue_events :public circular_buffer_queue<slave_queue_ele
//enqueue_1 will take care of it.
head+= size;
}
-
- void* dequeue_1(uint32 size)
+
+ //Dequeue by size ,
+ //So the the size of element which needs to be dequeued will be
+ //given in argument
+ void* dequeue_by_size(uint32 size)
{
if (used_buffer() > 0 )
{
@@ -300,6 +303,15 @@ class circular_buffer_queue_events :public circular_buffer_queue<slave_queue_ele
return NULL;
}
+ // This will simple update the tail ptr
+ void dequeue_by_tail_ptr(void *ptr)
+ {
+ DBUG_ASSERT(free_size() > 0);
+ lock_mutex();
+ tail= (uchar *)ptr;
+ unlock_mutex();
+ }
+
};
typedef circular_buffer_queue_events r_queue;
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc
index 8b540863e31..a914378de48 100644
--- a/sql/rpl_rli.cc
+++ b/sql/rpl_rli.cc
@@ -1418,7 +1418,7 @@ bool Relay_log_info::is_until_satisfied(Log_event *ev)
bool Relay_log_info::stmt_done(my_off_t event_master_log_pos, THD *thd,
- rpl_group_info *rgi)
+ rpl_group_info *rgi, Log_event *ev)
{
int error= 0;
DBUG_ENTER("Relay_log_info::stmt_done");
@@ -1481,6 +1481,13 @@ bool Relay_log_info::stmt_done(my_off_t event_master_log_pos, THD *thd,
if (rgi->is_parallel_exec)
mysql_mutex_unlock(&data_lock);
}
+ if(mi->rpl_queue && ev->get_type_code() == QUERY_EVENT)
+ {
+ Query_log_event *qev= (Query_log_event* )ev;
+ if (!qev->is_commit() && !qev->is_rollback())
+ mi->rpl_queue->dequeue_by_tail_ptr((uchar *)ev+
+ sizeof(*(Query_log_event *)this));
+ }
DBUG_EXECUTE_IF("inject_crash_after_flush_rli", DBUG_SUICIDE(););
}
DBUG_RETURN(error);
diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h
index 17c7eb519a3..9016625f50f 100644
--- a/sql/rpl_rli.h
+++ b/sql/rpl_rli.h
@@ -447,7 +447,8 @@ public:
relay log info and used to produce information for <code>SHOW
SLAVE STATUS</code>.
*/
- bool stmt_done(my_off_t event_log_pos, THD *thd, rpl_group_info *rgi);
+ bool stmt_done(my_off_t event_log_pos, THD *thd, rpl_group_info *rgi,
+ Log_event *ev= NULL);
int alloc_inuse_relaylog(const char *name);
void free_inuse_relaylog(inuse_relaylog *ir);
void reset_inuse_relaylog();
diff --git a/unittest/sql/rpl_queue-t.cc b/unittest/sql/rpl_queue-t.cc
index d8d8bc234c1..01c2bb22a62 100644
--- a/unittest/sql/rpl_queue-t.cc
+++ b/unittest/sql/rpl_queue-t.cc
@@ -29,7 +29,7 @@ void enqueue(circular_buffer_queue_events *queue, char c)
}
void dequeue(circular_buffer_queue_events *queue)
{
- dummy_Log_event *dl= static_cast<dummy_Log_event *>(queue->dequeue_1(dummy_Log_event::get_size()));
+ dummy_Log_event *dl= static_cast<dummy_Log_event *>(queue->dequeue_by_size(dummy_Log_event::get_size()));
fwrite(dl->arr, sizeof(char), 20, stdout);
printf("\n");
}