summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorsachin <sachin.setiya@mariadb.com>2020-11-17 20:31:02 +0000
committersachin <sachin.setiya@mariadb.com>2020-11-17 20:31:02 +0000
commitf2f3fdc068b59e7bd29eaa7197bb88ea2420fe40 (patch)
tree715f27c479c2735321b4ee181db513c8fdb18946
parent92cb9944231536a249e514ae1bde4908bd4c3206 (diff)
downloadmariadb-git-f2f3fdc068b59e7bd29eaa7197bb88ea2420fe40.tar.gz
Queue working without any issue
-rw-r--r--mysql-test/suite/rpl/t/xyz.test45
-rw-r--r--sql/log_event.cc3
-rw-r--r--sql/queue.h24
-rw-r--r--sql/rpl_mi.cc3
-rw-r--r--sql/rpl_queue.h5
-rw-r--r--sql/slave.cc14
6 files changed, 67 insertions, 27 deletions
diff --git a/mysql-test/suite/rpl/t/xyz.test b/mysql-test/suite/rpl/t/xyz.test
index 387d2c05b57..27d40fed5dd 100644
--- a/mysql-test/suite/rpl/t/xyz.test
+++ b/mysql-test/suite/rpl/t/xyz.test
@@ -12,29 +12,42 @@ eval change master to master_host= "127.0.0.1", master_port=$SERVER_MYPORT_1, ma
--connection master
create table t1(a int, b int , c int );
insert into t1 values(1,1,1);
---sleep 5
-insert into t1 select * from t1;
- insert into t1 select * from t1;
- insert into t1 select * from t1;
- insert into t1 select * from t1;
- insert into t1 select * from t1;
- insert into t1 select * from t1;
- insert into t1 select * from t1;
- insert into t1 select * from t1;
-# insert into t1 select * from t1;
-# insert into t1 select * from t1;
-# insert into t1 select * from t1;
-# insert into t1 select * from t1;
-# insert into t1 select * from t1;
+insert into t1 select * from t1;
+insert into t1 select * from t1;
+insert into t1 select * from t1;
+insert into t1 select * from t1;
+insert into t1 select * from t1;
+insert into t1 select * from t1;
+insert into t1 select * from t1;
+insert into t1 select * from t1;
+insert into t1 select * from t1;
+insert into t1 select * from t1;
+insert into t1 select * from t1;
+insert into t1 select * from t1;
+insert into t1 select * from t1;
+insert into t1 select * from t1;
+#insert into t1 select * from t1;
+#insert into t1 select * from t1;
+#insert into t1 select * from t1;
+#insert into t1 select * from t1;
+#insert into t1 select * from t1;
+#insert into t1 select * from t1;
+#insert into t1 select * from t1;
select count(*) from t1;
+# show binlog events;
--source include/save_master_gtid.inc
--connection slave
---sleep 2
+--source include/sync_with_master_gtid.inc
select count(*) from t1;
---sleep 999992
--query_vertical show slave status
+--connection master
+drop table t1;
+--source include/save_master_gtid.inc
+
+--connection slave
+--source include/sync_with_master_gtid.inc
--source include/stop_slave.inc
diff --git a/sql/log_event.cc b/sql/log_event.cc
index 39d6e7e99b0..98cee843dd7 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -825,6 +825,9 @@ int Log_event::read_log_event(r_queue *rpl_queue, IO_CACHE* file, String* packet
if (el)
{
packet->append((const char *)el->event, el->total_length);
+ data_len= el->total_length;
+ rpl_queue->unlock_mutex();
+ delete el;
goto direct_path;
}
}
diff --git a/sql/queue.h b/sql/queue.h
index 73e81f10ef9..b9e24a3323c 100644
--- a/sql/queue.h
+++ b/sql/queue.h
@@ -4,6 +4,7 @@
#include "my_global.h"
#include "my_base.h"
#include "my_pthread.h"
+#include "my_sys.h"
#include "mysql/psi/mysql_thread.h"
#include <cstring>
@@ -58,6 +59,14 @@ class circular_buffer_queue
return 0;
}
+ void destroy()
+ {
+ my_free(buffer);
+ mysql_mutex_destroy(&lock_queue);
+ mysql_mutex_destroy(&free_queue);
+ mysql_cond_destroy(&free_cond);
+ }
+
/*
We want to write in continues memory.
*/
@@ -78,9 +87,10 @@ class circular_buffer_queue
{
mysql_mutex_lock(&lock_queue);
Element_type *el= new Element_type(tail, buffer, buffer_end);
+ //We are not going to unlock mutex till we get explicit call of
+ //unlock_mutex by caller thread (that means sql thread has copied data
+ //into its buffer)
tail= el->tail;
- mysql_mutex_unlock(&lock_queue);
- mysql_cond_broadcast(&free_cond);
return el;
}
return NULL;
@@ -94,6 +104,16 @@ class circular_buffer_queue
mysql_mutex_unlock(&free_queue);
return enqueue(elem);
}
+
+ void lock_mutex()
+ {
+ mysql_mutex_lock(&lock_queue);
+ }
+ void unlock_mutex()
+ {
+ mysql_mutex_unlock(&lock_queue);
+ mysql_cond_broadcast(&free_cond);
+ }
};
diff --git a/sql/rpl_mi.cc b/sql/rpl_mi.cc
index f9e912bca7c..b4e2e0af975 100644
--- a/sql/rpl_mi.cc
+++ b/sql/rpl_mi.cc
@@ -668,7 +668,7 @@ file '%s')", fname);
goto err;
mi->rpl_queue= new circular_buffer_queue<slave_queue_element>();
- mi->rpl_queue->init(1000);
+ mi->rpl_queue->init(20000000);
mi->inited = 1;
mi->rli.is_relay_log_recovery= FALSE;
// now change cache READ -> WRITE - must do this before flush_master_info
@@ -846,6 +846,7 @@ void end_master_info(Master_info* mi)
mysql_file_close(mi->fd, MYF(MY_WME));
mi->fd = -1;
}
+ mi->rpl_queue->destroy();
mi->inited = 0;
DBUG_VOID_RETURN;
diff --git a/sql/rpl_queue.h b/sql/rpl_queue.h
index 658b99ed328..d2dc44c5106 100644
--- a/sql/rpl_queue.h
+++ b/sql/rpl_queue.h
@@ -1,5 +1,6 @@
#ifndef RPL_QUEUE_H
#define RPL_QUEUE_H
+#include "my_sys.h"
#include "queue.h"
#include <cmath>
#include <cstdlib>
@@ -74,7 +75,7 @@ class slave_queue_element
memcpy(buffer_start, event + t_len, total_length - t_len);
return buffer_start + total_length - t_len;
}
- //NO wrapping needed
+ //NO wrapping needed //QTODO
else
{
memcpy(ptr, event, total_length);
@@ -84,7 +85,7 @@ class slave_queue_element
~slave_queue_element()
{
if (malloced)
- free(event);
+ my_free(event);
}
//QTODO
diff --git a/sql/slave.cc b/sql/slave.cc
index 87d1558cb1a..897b1b7a6d8 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -7634,12 +7634,14 @@ static Log_event* next_event(rpl_group_info *rgi, ulonglong *event_size)
MYSQL_BIN_LOG::open() will write the buffered description event.
*/
old_pos= rli->event_relay_log_pos;
- if ((ev= Log_event::read_log_event(rgi->rli->mi->rpl_queue, cur_log,
- rli->relay_log.description_event_for_exec,
- opt_slave_sql_verify_checksum)))
- //uchar *ev_ptr;
- //if ((ev_ptr= rgi->rli->mi->rpl_queue->dequeue()->event) &&
- // (ev= Log_event::read_log_event(ev_ptr, 22, )))
+ if ((ev= Log_event::read_log_event(rgi->rli->mi->rpl_queue, cur_log,
+ rli->relay_log.description_event_for_exec,
+ opt_slave_sql_verify_checksum)))
+ //slave_queue_element *el;
+ //if ((el= rgi->rli->mi->rpl_queue->dequeue()) &&
+ //// (ev= Log_event::read_log_event((const char*)el->event, el->total_length, NULL,
+ // rli->relay_log.description_event_for_exec,
+ // opt_slave_sql_verify_checksum )))
{
/*
read it while we have a lock, to avoid a mutex lock in