summaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorsachin <sachin.setiya@mariadb.com>2021-03-11 21:59:13 +0000
committersachin <sachin.setiya@mariadb.com>2021-03-11 21:59:13 +0000
commit3ad829d31aeb56cd76331d3478a1d8e04ed7300a (patch)
treee855752cc9ed8b90b67b89b51336ac26ed977f7a /sql
parent4b4a020d1ce15275689fc307b8d13c107e6a41be (diff)
downloadmariadb-git-queue-v1.tar.gz
commit 5queue-v1
Diffstat (limited to 'sql')
-rw-r--r--sql/log_event.cc11
-rw-r--r--sql/log_event_server.cc9
-rw-r--r--sql/rpl_mi.cc2
-rw-r--r--sql/rpl_queue.h16
-rw-r--r--sql/rpl_rli.cc9
-rw-r--r--sql/rpl_rli.h3
6 files changed, 42 insertions, 8 deletions
diff --git a/sql/log_event.cc b/sql/log_event.cc
index 74c0d6a1c4f..94c443e18ba 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -1033,9 +1033,10 @@ Log_event * create_log_event_or_get_size(const char *buf, uint event_len,
r_queue *rpl_queue, uint32* size)
{
Log_event *ev;
- void *memory __attribute__((unused)) = NULL ;
- uint32 ev_size= 0;
DBUG_ASSERT(buf == NULL || size == NULL);
+ void *memory = NULL ;
+#ifndef MYSQL_CLIENT
+ uint32 ev_size= 0;
if (rpl_queue && !size)
{
//get the size of the Log_event object
@@ -1044,8 +1045,12 @@ Log_event * create_log_event_or_get_size(const char *buf, uint event_len,
{
return NULL;
}
+ //debug info
+ sql_print_information("Setiya , Size of available buffer Enqueue time%ld",
+ rpl_queue->free_size());
memory= rpl_queue->enqueue_1(ev_size);
}
+#endif
switch(event_type) {
case QUERY_EVENT:
if (!buf)
@@ -1360,8 +1365,10 @@ Log_event * create_log_event_or_get_size(const char *buf, uint event_len,
size= NULL;
break;
}
+#ifndef MYSQL_CLIENT
if (rpl_queue)
rpl_queue->enqueue_2(ev_size);
+#endif
return ev;
}
diff --git a/sql/log_event_server.cc b/sql/log_event_server.cc
index 148ddd87469..69afab8a5ea 100644
--- a/sql/log_event_server.cc
+++ b/sql/log_event_server.cc
@@ -625,7 +625,7 @@ int Log_event::do_update_pos(rpl_group_info *rgi)
the actual event execution reaches that point.
*/
if (!rgi->is_parallel_exec || is_group_event(get_type_code()))
- rli->stmt_done(log_pos, thd, rgi);
+ rli->stmt_done(log_pos, thd, rgi, this);
}
DBUG_RETURN(0); // Cannot fail currently
}
@@ -1859,6 +1859,13 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi,
goto end;
}
}
+ //Update tail ptr to end of this event
+ //We are freeing memory on queue(dequeue) on the time commit only
+ rli->mi->rpl_queue->dequeue_by_tail_ptr((uchar *)this+sizeof(*this));
+ //debug info
+ sql_print_information("Setiya , Size of available buffer Dequeue Time%ld",
+ rli->mi->rpl_queue->free_size());
+
}
thd->table_map_for_update= (table_map)table_map_for_update;
diff --git a/sql/rpl_mi.cc b/sql/rpl_mi.cc
index 50a5cc31ad2..e1bbeee8a72 100644
--- a/sql/rpl_mi.cc
+++ b/sql/rpl_mi.cc
@@ -668,7 +668,7 @@ file '%s')", fname);
goto err;
mi->rpl_queue= new circular_buffer_queue_events();
- mi->rpl_queue->init(20000000);
+ mi->rpl_queue->init(200000);
mi->inited = 1;
mi->rli.is_relay_log_recovery= FALSE;
// now change cache READ -> WRITE - must do this before flush_master_info
diff --git a/sql/rpl_queue.h b/sql/rpl_queue.h
index 288b74120b5..6e6887f6a0b 100644
--- a/sql/rpl_queue.h
+++ b/sql/rpl_queue.h
@@ -274,8 +274,11 @@ class circular_buffer_queue_events :public circular_buffer_queue<slave_queue_ele
//enqueue_1 will take care of it.
head+= size;
}
-
- void* dequeue_1(uint32 size)
+
+ //Dequeue by size ,
+ //So the the size of element which needs to be dequeued will be
+ //given in argument
+ void* dequeue_by_size(uint32 size)
{
if (used_buffer() > 0 )
{
@@ -300,6 +303,15 @@ class circular_buffer_queue_events :public circular_buffer_queue<slave_queue_ele
return NULL;
}
+ // This will simple update the tail ptr
+ void dequeue_by_tail_ptr(void *ptr)
+ {
+ DBUG_ASSERT(free_size() > 0);
+ lock_mutex();
+ tail= (uchar *)ptr;
+ unlock_mutex();
+ }
+
};
typedef circular_buffer_queue_events r_queue;
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc
index 8b540863e31..a914378de48 100644
--- a/sql/rpl_rli.cc
+++ b/sql/rpl_rli.cc
@@ -1418,7 +1418,7 @@ bool Relay_log_info::is_until_satisfied(Log_event *ev)
bool Relay_log_info::stmt_done(my_off_t event_master_log_pos, THD *thd,
- rpl_group_info *rgi)
+ rpl_group_info *rgi, Log_event *ev)
{
int error= 0;
DBUG_ENTER("Relay_log_info::stmt_done");
@@ -1481,6 +1481,13 @@ bool Relay_log_info::stmt_done(my_off_t event_master_log_pos, THD *thd,
if (rgi->is_parallel_exec)
mysql_mutex_unlock(&data_lock);
}
+ if(mi->rpl_queue && ev->get_type_code() == QUERY_EVENT)
+ {
+ Query_log_event *qev= (Query_log_event* )ev;
+ if (!qev->is_commit() && !qev->is_rollback())
+ mi->rpl_queue->dequeue_by_tail_ptr((uchar *)ev+
+ sizeof(*(Query_log_event *)this));
+ }
DBUG_EXECUTE_IF("inject_crash_after_flush_rli", DBUG_SUICIDE(););
}
DBUG_RETURN(error);
diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h
index 17c7eb519a3..9016625f50f 100644
--- a/sql/rpl_rli.h
+++ b/sql/rpl_rli.h
@@ -447,7 +447,8 @@ public:
relay log info and used to produce information for <code>SHOW
SLAVE STATUS</code>.
*/
- bool stmt_done(my_off_t event_log_pos, THD *thd, rpl_group_info *rgi);
+ bool stmt_done(my_off_t event_log_pos, THD *thd, rpl_group_info *rgi,
+ Log_event *ev= NULL);
int alloc_inuse_relaylog(const char *name);
void free_inuse_relaylog(inuse_relaylog *ir);
void reset_inuse_relaylog();