diff options
author | Sachin Kumar <sachin.setiya@mariadb.com> | 2021-08-26 08:04:28 +0100 |
---|---|---|
committer | Sachin Kumar <sachin.setiya@mariadb.com> | 2021-08-26 08:04:28 +0100 |
commit | e4590597e5653aee228b88faeba866ad322ca889 (patch) | |
tree | 799afbf74cc3da3952ccd29d3297f8d77417c8e0 | |
parent | dfeee6133b771d5b0c63f8db97a916d33a49a84e (diff) | |
download | mariadb-git-10.6-4991.tar.gz |
commit 210.6-4991
-rw-r--r-- | sql/log.cc | 126 | ||||
-rw-r--r-- | sql/rpl_gtid.h | 133 | ||||
-rw-r--r-- | sql/sql_repl.cc | 15 |
3 files changed, 142 insertions, 132 deletions
diff --git a/sql/log.cc b/sql/log.cc index 43f8267d98b..88215c9232c 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -522,132 +522,6 @@ void Log_event_writer::set_incident() cache_data->set_incident(); } -class gtid_index : public Sql_alloc -{ - MEM_ROOT *mem_root; - uint32 domain_id; - uint32 seq_no_modulus; - class gtid_index_elem:public Sql_alloc - { - ulong seq_no; - ulong file_pos; - //Mostly we will no require this - Dynamic_array<ulong> file_pos_array; - public: - gtid_index_elem(ulong seq_no_, ulong file_pos_): seq_no(seq_no_), - file_pos(file_pos_), file_pos_array(PSI_INSTRUMENT_MEM) - {} - ulong get_seq_no() - { - return seq_no; - } - ulong get_file_pos() - { - return file_pos; - } - }; -public: - gtid_index(MEM_ROOT *mem_root_, uint32 domain_id_, uint32 seq_no_modulus_= 1): - mem_root(mem_root_), domain_id(domain_id_), - seq_no_modulus(seq_no_modulus_) - { - data= new Dynamic_array<gtid_index_elem *>(mem_root); - } - Dynamic_array<gtid_index_elem *> *data; - - void insert(ulong seq_no, ulong file_pos) - { - if(!(seq_no % seq_no_modulus)) - data->append_val(new (mem_root) gtid_index_elem(seq_no, file_pos)); - } - - /* - We will return the closest cached seq_no. - After that the caller is supposed to seek into binlog file till it finds - the relevant gtid_seq_no. - */ - ulong find(ulong seq_no) - { - if(!data->elements()) - return 0; - //binary search - uint64 lo= 0, hi= data->elements() - 1; - if(seq_no % seq_no_modulus) - seq_no-= seq_no % seq_no_modulus; - - while(lo <= hi) - { - ulong mid= lo + (hi - lo)/2; - ulong mid_seq_no= data->at(mid)->get_seq_no(); - if(mid_seq_no == seq_no) - break; - if(mid_seq_no > seq_no) - hi= mid - 1; - else - lo= mid + 1; - } - return data->at(lo)->get_file_pos(); - } - - void fwrite() - { - - } - - void fread() - { - - } -}; - -class gtid_index_hash : public Sql_alloc -{ - MEM_ROOT *mem_root; - HASH hash; - struct entry: public Sql_alloc - { - uint32 domain_id; - uint32 index; - entry(uint32 domain_id_, uint32 index_):domain_id(domain_id_), index(index_) - {} - }; - Dynamic_array<gtid_index *> arr; -public: - gtid_index_hash(MEM_ROOT *mem_root_):mem_root(mem_root_), - arr(mem_root) - { - my_hash_init(PSI_INSTRUMENT_ME, &hash, &my_charset_bin, 32, - offsetof(entry, domain_id), sizeof(uint32), NULL, my_free, - HASH_UNIQUE); - }; - void insert(rpl_gtid >id, ulong file_pos) - { - entry *e; - if((e= (entry *) my_hash_search(&hash, - (const uchar *)(>id.domain_id), 0))) - { - arr.at(e->index)->insert(gtid.seq_no, file_pos); - } - else - { - entry *e= new(mem_root) entry(gtid.domain_id, (uint32)arr.size()); - my_hash_insert(&hash, (uchar *)e); - arr.append_val(new(mem_root) gtid_index(mem_root, gtid.domain_id)); - } - } - - ulong find(rpl_gtid >id) - { - entry *e; - if((e= (entry *) my_hash_search(&hash, - (const uchar *)(>id.domain_id), 0))) - { - return arr.at(e->index)->find(gtid.seq_no); - } - return 0; - } -}; - gtid_index_hash *gtid_indexes; class binlog_cache_mngr { diff --git a/sql/rpl_gtid.h b/sql/rpl_gtid.h index 4d3c63f598c..9315aac08ff 100644 --- a/sql/rpl_gtid.h +++ b/sql/rpl_gtid.h @@ -18,6 +18,7 @@ #include "hash.h" #include "queues.h" +#include "sql_array.h" #include <atomic> /* Definitions for MariaDB global transaction ID (GTID). */ @@ -382,4 +383,136 @@ extern int gtid_check_rpl_slave_state_table(TABLE *table); extern rpl_gtid *gtid_parse_string_to_list(const char *p, size_t len, uint32 *out_len); +class gtid_index : public Sql_alloc +{ + MEM_ROOT *mem_root; + uint32 domain_id; + uint32 seq_no_modulus; + IO_CACHE *curr_file; + class gtid_index_elem:public Sql_alloc + { + ulong seq_no; + ulong file_pos; + //Mostly we will no require this + Dynamic_array<ulong> file_pos_array; + public: + gtid_index_elem(ulong seq_no_, ulong file_pos_): seq_no(seq_no_), + file_pos(file_pos_), file_pos_array(PSI_INSTRUMENT_MEM) + {} + ulong get_seq_no() + { + return seq_no; + } + ulong get_file_pos() + { + return file_pos; + } + }; +public: + gtid_index(MEM_ROOT *mem_root_, uint32 domain_id_, uint32 seq_no_modulus_= 1, + IO_CACHE *file): + mem_root(mem_root_), domain_id(domain_id_), + seq_no_modulus(seq_no_modulus_), curr_file(file) + { + data= new Dynamic_array<gtid_index_elem *>(mem_root); + } + Dynamic_array<gtid_index_elem *> *data; + + void insert(ulong seq_no, ulong file_pos) + { + if(!(seq_no % seq_no_modulus)) + { + data->append_val(new (mem_root) gtid_index_elem(seq_no, file_pos)); + fwrite(seq_no, file_pos); + } + } + + /* + We will return the closest cached seq_no. + After that the caller is supposed to seek into binlog file till it finds + the relevant gtid_seq_no. + */ + ulong find(ulong seq_no) + { + if(!data->elements()) + return 0; + //binary search + uint64 lo= 0, hi= data->elements() - 1; + if(seq_no % seq_no_modulus) + seq_no-= seq_no % seq_no_modulus; + + while(lo < hi) + { + ulong mid= lo + (hi - lo)/2; + ulong mid_seq_no= data->at(mid)->get_seq_no(); + if(mid_seq_no == seq_no) + return data->at(mid)->get_file_pos(); + if(mid_seq_no > seq_no) + hi= mid - 1; + else + lo= mid + 1; + } + return data->at(lo)->get_file_pos(); + } + + void fwrite() + { + + } + + void fread() + { + + } +}; + +class gtid_index_hash : public Sql_alloc +{ + MEM_ROOT *mem_root; + HASH hash; + struct entry: public Sql_alloc + { + uint32 domain_id; + uint32 index; + entry(uint32 domain_id_, uint32 index_):domain_id(domain_id_), index(index_) + {} + }; + Dynamic_array<gtid_index *> arr; +public: + gtid_index_hash(MEM_ROOT *mem_root_):mem_root(mem_root_), + arr(mem_root) + { + my_hash_init(PSI_INSTRUMENT_ME, &hash, &my_charset_bin, 32, + offsetof(entry, domain_id), sizeof(uint32), NULL, my_free, + HASH_UNIQUE); + }; + void insert(rpl_gtid >id, ulong file_pos) + { + entry *e; + if((e= (entry *) my_hash_search(&hash, + (const uchar *)(>id.domain_id), 0))) + { + arr.at(e->index)->insert(gtid.seq_no, file_pos); + } + else + { + entry *e= new(mem_root) entry(gtid.domain_id, (uint32)arr.size()); + my_hash_insert(&hash, (uchar *)e); + arr.append_val(new(mem_root) gtid_index(mem_root, gtid.domain_id)); + } + } + + ulong find(rpl_gtid >id) + { + entry *e; + if((e= (entry *) my_hash_search(&hash, + (const uchar *)(>id.domain_id), 0))) + { + return arr.at(e->index)->find(gtid.seq_no); + } + return 0; + } +}; + + #endif /* RPL_GTID_H */ diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc index 56aaf01f1c5..a6f07ba413c 100644 --- a/sql/sql_repl.cc +++ b/sql/sql_repl.cc @@ -1302,7 +1302,8 @@ end: */ static const char * gtid_find_binlog_file(slave_connection_state *state, char *out_name, - slave_connection_state *until_gtid_state) + slave_connection_state *until_gtid_state, + rpl_gtid *out_gtid) { MEM_ROOT memroot; binlog_file_entry *list; @@ -1375,6 +1376,9 @@ gtid_find_binlog_file(slave_connection_state *state, char *out_name, for (i= 0; i < glev->count; ++i) { const rpl_gtid *gtid= state->find(glev->list[i].domain_id); + out_gtid->domain_id= gtid->domain_id; + out_gtid->server_id= gtid->server_id; + out_gtid->seq_no= gtid->seq_no; if (!gtid) { /* @@ -2107,7 +2111,7 @@ err: return info->error; } -//extern gtid_index_hash *gtid_indexes; +extern gtid_index_hash *gtid_indexes; static int init_binlog_sender(binlog_send_info *info, LOG_INFO *linfo, const char *log_ident, @@ -2202,17 +2206,16 @@ static int init_binlog_sender(binlog_send_info *info, info->error= error; return 1; } + rpl_gtid gtid= rpl_gtid{0, 0, 0}; if ((info->errmsg= gtid_find_binlog_file(&info->gtid_state, search_file_name, - info->until_gtid_state))) + info->until_gtid_state, >id))) { info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG; return 1; } - /* start from beginning of binlog file */ - //gtid_indexes->insert(info->gtid_state, 1); - *pos = 4; + *pos= gtid_indexes->find(gtid); } else { |