diff options
author | sachin <sachin.setiya@mariadb.com> | 2021-03-11 21:59:13 +0000 |
---|---|---|
committer | sachin <sachin.setiya@mariadb.com> | 2021-03-11 21:59:13 +0000 |
commit | 3ad829d31aeb56cd76331d3478a1d8e04ed7300a (patch) | |
tree | e855752cc9ed8b90b67b89b51336ac26ed977f7a /sql | |
parent | 4b4a020d1ce15275689fc307b8d13c107e6a41be (diff) | |
download | mariadb-git-queue-v1.tar.gz |
commit 5queue-v1
Diffstat (limited to 'sql')
-rw-r--r-- | sql/log_event.cc | 11 | ||||
-rw-r--r-- | sql/log_event_server.cc | 9 | ||||
-rw-r--r-- | sql/rpl_mi.cc | 2 | ||||
-rw-r--r-- | sql/rpl_queue.h | 16 | ||||
-rw-r--r-- | sql/rpl_rli.cc | 9 | ||||
-rw-r--r-- | sql/rpl_rli.h | 3 |
6 files changed, 42 insertions, 8 deletions
diff --git a/sql/log_event.cc b/sql/log_event.cc index 74c0d6a1c4f..94c443e18ba 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -1033,9 +1033,10 @@ Log_event * create_log_event_or_get_size(const char *buf, uint event_len, r_queue *rpl_queue, uint32* size) { Log_event *ev; - void *memory __attribute__((unused)) = NULL ; - uint32 ev_size= 0; DBUG_ASSERT(buf == NULL || size == NULL); + void *memory = NULL ; +#ifndef MYSQL_CLIENT + uint32 ev_size= 0; if (rpl_queue && !size) { //get the size of the Log_event object @@ -1044,8 +1045,12 @@ Log_event * create_log_event_or_get_size(const char *buf, uint event_len, { return NULL; } + //debug info + sql_print_information("Setiya , Size of available buffer Enqueue time%ld", + rpl_queue->free_size()); memory= rpl_queue->enqueue_1(ev_size); } +#endif switch(event_type) { case QUERY_EVENT: if (!buf) @@ -1360,8 +1365,10 @@ Log_event * create_log_event_or_get_size(const char *buf, uint event_len, size= NULL; break; } +#ifndef MYSQL_CLIENT if (rpl_queue) rpl_queue->enqueue_2(ev_size); +#endif return ev; } diff --git a/sql/log_event_server.cc b/sql/log_event_server.cc index 148ddd87469..69afab8a5ea 100644 --- a/sql/log_event_server.cc +++ b/sql/log_event_server.cc @@ -625,7 +625,7 @@ int Log_event::do_update_pos(rpl_group_info *rgi) the actual event execution reaches that point. */ if (!rgi->is_parallel_exec || is_group_event(get_type_code())) - rli->stmt_done(log_pos, thd, rgi); + rli->stmt_done(log_pos, thd, rgi, this); } DBUG_RETURN(0); // Cannot fail currently } @@ -1859,6 +1859,13 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi, goto end; } } + //Update tail ptr to end of this event + //We are freeing memory on queue(dequeue) on the time commit only + rli->mi->rpl_queue->dequeue_by_tail_ptr((uchar *)this+sizeof(*this)); + //debug info + sql_print_information("Setiya , Size of available buffer Dequeue Time%ld", + rli->mi->rpl_queue->free_size()); + } thd->table_map_for_update= (table_map)table_map_for_update; diff --git a/sql/rpl_mi.cc b/sql/rpl_mi.cc index 50a5cc31ad2..e1bbeee8a72 100644 --- a/sql/rpl_mi.cc +++ b/sql/rpl_mi.cc @@ -668,7 +668,7 @@ file '%s')", fname); goto err; mi->rpl_queue= new circular_buffer_queue_events(); - mi->rpl_queue->init(20000000); + mi->rpl_queue->init(200000); mi->inited = 1; mi->rli.is_relay_log_recovery= FALSE; // now change cache READ -> WRITE - must do this before flush_master_info diff --git a/sql/rpl_queue.h b/sql/rpl_queue.h index 288b74120b5..6e6887f6a0b 100644 --- a/sql/rpl_queue.h +++ b/sql/rpl_queue.h @@ -274,8 +274,11 @@ class circular_buffer_queue_events :public circular_buffer_queue<slave_queue_ele //enqueue_1 will take care of it. head+= size; } - - void* dequeue_1(uint32 size) + + //Dequeue by size , + //So the the size of element which needs to be dequeued will be + //given in argument + void* dequeue_by_size(uint32 size) { if (used_buffer() > 0 ) { @@ -300,6 +303,15 @@ class circular_buffer_queue_events :public circular_buffer_queue<slave_queue_ele return NULL; } + // This will simple update the tail ptr + void dequeue_by_tail_ptr(void *ptr) + { + DBUG_ASSERT(free_size() > 0); + lock_mutex(); + tail= (uchar *)ptr; + unlock_mutex(); + } + }; typedef circular_buffer_queue_events r_queue; diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc index 8b540863e31..a914378de48 100644 --- a/sql/rpl_rli.cc +++ b/sql/rpl_rli.cc @@ -1418,7 +1418,7 @@ bool Relay_log_info::is_until_satisfied(Log_event *ev) bool Relay_log_info::stmt_done(my_off_t event_master_log_pos, THD *thd, - rpl_group_info *rgi) + rpl_group_info *rgi, Log_event *ev) { int error= 0; DBUG_ENTER("Relay_log_info::stmt_done"); @@ -1481,6 +1481,13 @@ bool Relay_log_info::stmt_done(my_off_t event_master_log_pos, THD *thd, if (rgi->is_parallel_exec) mysql_mutex_unlock(&data_lock); } + if(mi->rpl_queue && ev->get_type_code() == QUERY_EVENT) + { + Query_log_event *qev= (Query_log_event* )ev; + if (!qev->is_commit() && !qev->is_rollback()) + mi->rpl_queue->dequeue_by_tail_ptr((uchar *)ev+ + sizeof(*(Query_log_event *)this)); + } DBUG_EXECUTE_IF("inject_crash_after_flush_rli", DBUG_SUICIDE();); } DBUG_RETURN(error); diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h index 17c7eb519a3..9016625f50f 100644 --- a/sql/rpl_rli.h +++ b/sql/rpl_rli.h @@ -447,7 +447,8 @@ public: relay log info and used to produce information for <code>SHOW SLAVE STATUS</code>. */ - bool stmt_done(my_off_t event_log_pos, THD *thd, rpl_group_info *rgi); + bool stmt_done(my_off_t event_log_pos, THD *thd, rpl_group_info *rgi, + Log_event *ev= NULL); int alloc_inuse_relaylog(const char *name); void free_inuse_relaylog(inuse_relaylog *ir); void reset_inuse_relaylog(); |