summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSachin Kumar <sachin.setiya@mariadb.com>2021-08-26 08:04:28 +0100
committerSachin Kumar <sachin.setiya@mariadb.com>2021-08-26 08:04:28 +0100
commite4590597e5653aee228b88faeba866ad322ca889 (patch)
tree799afbf74cc3da3952ccd29d3297f8d77417c8e0
parentdfeee6133b771d5b0c63f8db97a916d33a49a84e (diff)
downloadmariadb-git-10.6-4991.tar.gz
commit 210.6-4991
-rw-r--r--sql/log.cc126
-rw-r--r--sql/rpl_gtid.h133
-rw-r--r--sql/sql_repl.cc15
3 files changed, 142 insertions, 132 deletions
diff --git a/sql/log.cc b/sql/log.cc
index 43f8267d98b..88215c9232c 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -522,132 +522,6 @@ void Log_event_writer::set_incident()
cache_data->set_incident();
}
-class gtid_index : public Sql_alloc
-{
- MEM_ROOT *mem_root;
- uint32 domain_id;
- uint32 seq_no_modulus;
- class gtid_index_elem:public Sql_alloc
- {
- ulong seq_no;
- ulong file_pos;
- //Mostly we will no require this
- Dynamic_array<ulong> file_pos_array;
- public:
- gtid_index_elem(ulong seq_no_, ulong file_pos_): seq_no(seq_no_),
- file_pos(file_pos_), file_pos_array(PSI_INSTRUMENT_MEM)
- {}
- ulong get_seq_no()
- {
- return seq_no;
- }
- ulong get_file_pos()
- {
- return file_pos;
- }
- };
-public:
- gtid_index(MEM_ROOT *mem_root_, uint32 domain_id_, uint32 seq_no_modulus_= 1):
- mem_root(mem_root_), domain_id(domain_id_),
- seq_no_modulus(seq_no_modulus_)
- {
- data= new Dynamic_array<gtid_index_elem *>(mem_root);
- }
- Dynamic_array<gtid_index_elem *> *data;
-
- void insert(ulong seq_no, ulong file_pos)
- {
- if(!(seq_no % seq_no_modulus))
- data->append_val(new (mem_root) gtid_index_elem(seq_no, file_pos));
- }
-
- /*
- We will return the closest cached seq_no.
- After that the caller is supposed to seek into binlog file till it finds
- the relevant gtid_seq_no.
- */
- ulong find(ulong seq_no)
- {
- if(!data->elements())
- return 0;
- //binary search
- uint64 lo= 0, hi= data->elements() - 1;
- if(seq_no % seq_no_modulus)
- seq_no-= seq_no % seq_no_modulus;
-
- while(lo <= hi)
- {
- ulong mid= lo + (hi - lo)/2;
- ulong mid_seq_no= data->at(mid)->get_seq_no();
- if(mid_seq_no == seq_no)
- break;
- if(mid_seq_no > seq_no)
- hi= mid - 1;
- else
- lo= mid + 1;
- }
- return data->at(lo)->get_file_pos();
- }
-
- void fwrite()
- {
-
- }
-
- void fread()
- {
-
- }
-};
-
-class gtid_index_hash : public Sql_alloc
-{
- MEM_ROOT *mem_root;
- HASH hash;
- struct entry: public Sql_alloc
- {
- uint32 domain_id;
- uint32 index;
- entry(uint32 domain_id_, uint32 index_):domain_id(domain_id_), index(index_)
- {}
- };
- Dynamic_array<gtid_index *> arr;
-public:
- gtid_index_hash(MEM_ROOT *mem_root_):mem_root(mem_root_),
- arr(mem_root)
- {
- my_hash_init(PSI_INSTRUMENT_ME, &hash, &my_charset_bin, 32,
- offsetof(entry, domain_id), sizeof(uint32), NULL, my_free,
- HASH_UNIQUE);
- };
- void insert(rpl_gtid &gtid, ulong file_pos)
- {
- entry *e;
- if((e= (entry *) my_hash_search(&hash,
- (const uchar *)(&gtid.domain_id), 0)))
- {
- arr.at(e->index)->insert(gtid.seq_no, file_pos);
- }
- else
- {
- entry *e= new(mem_root) entry(gtid.domain_id, (uint32)arr.size());
- my_hash_insert(&hash, (uchar *)e);
- arr.append_val(new(mem_root) gtid_index(mem_root, gtid.domain_id));
- }
- }
-
- ulong find(rpl_gtid &gtid)
- {
- entry *e;
- if((e= (entry *) my_hash_search(&hash,
- (const uchar *)(&gtid.domain_id), 0)))
- {
- return arr.at(e->index)->find(gtid.seq_no);
- }
- return 0;
- }
-};
-
gtid_index_hash *gtid_indexes;
class binlog_cache_mngr {
diff --git a/sql/rpl_gtid.h b/sql/rpl_gtid.h
index 4d3c63f598c..9315aac08ff 100644
--- a/sql/rpl_gtid.h
+++ b/sql/rpl_gtid.h
@@ -18,6 +18,7 @@
#include "hash.h"
#include "queues.h"
+#include "sql_array.h"
#include <atomic>
/* Definitions for MariaDB global transaction ID (GTID). */
@@ -382,4 +383,136 @@ extern int gtid_check_rpl_slave_state_table(TABLE *table);
extern rpl_gtid *gtid_parse_string_to_list(const char *p, size_t len,
uint32 *out_len);
+class gtid_index : public Sql_alloc
+{
+ MEM_ROOT *mem_root;
+ uint32 domain_id;
+ uint32 seq_no_modulus;
+ IO_CACHE *curr_file;
+ class gtid_index_elem:public Sql_alloc
+ {
+ ulong seq_no;
+ ulong file_pos;
+ //Mostly we will no require this
+ Dynamic_array<ulong> file_pos_array;
+ public:
+ gtid_index_elem(ulong seq_no_, ulong file_pos_): seq_no(seq_no_),
+ file_pos(file_pos_), file_pos_array(PSI_INSTRUMENT_MEM)
+ {}
+ ulong get_seq_no()
+ {
+ return seq_no;
+ }
+ ulong get_file_pos()
+ {
+ return file_pos;
+ }
+ };
+public:
+ gtid_index(MEM_ROOT *mem_root_, uint32 domain_id_, uint32 seq_no_modulus_= 1,
+ IO_CACHE *file):
+ mem_root(mem_root_), domain_id(domain_id_),
+ seq_no_modulus(seq_no_modulus_), curr_file(file)
+ {
+ data= new Dynamic_array<gtid_index_elem *>(mem_root);
+ }
+ Dynamic_array<gtid_index_elem *> *data;
+
+ void insert(ulong seq_no, ulong file_pos)
+ {
+ if(!(seq_no % seq_no_modulus))
+ {
+ data->append_val(new (mem_root) gtid_index_elem(seq_no, file_pos));
+ fwrite(seq_no, file_pos);
+ }
+ }
+
+ /*
+ We will return the closest cached seq_no.
+ After that the caller is supposed to seek into binlog file till it finds
+ the relevant gtid_seq_no.
+ */
+ ulong find(ulong seq_no)
+ {
+ if(!data->elements())
+ return 0;
+ //binary search
+ uint64 lo= 0, hi= data->elements() - 1;
+ if(seq_no % seq_no_modulus)
+ seq_no-= seq_no % seq_no_modulus;
+
+ while(lo < hi)
+ {
+ ulong mid= lo + (hi - lo)/2;
+ ulong mid_seq_no= data->at(mid)->get_seq_no();
+ if(mid_seq_no == seq_no)
+ return data->at(mid)->get_file_pos();
+ if(mid_seq_no > seq_no)
+ hi= mid - 1;
+ else
+ lo= mid + 1;
+ }
+ return data->at(lo)->get_file_pos();
+ }
+
+ void fwrite()
+ {
+
+ }
+
+ void fread()
+ {
+
+ }
+};
+
+class gtid_index_hash : public Sql_alloc
+{
+ MEM_ROOT *mem_root;
+ HASH hash;
+ struct entry: public Sql_alloc
+ {
+ uint32 domain_id;
+ uint32 index;
+ entry(uint32 domain_id_, uint32 index_):domain_id(domain_id_), index(index_)
+ {}
+ };
+ Dynamic_array<gtid_index *> arr;
+public:
+ gtid_index_hash(MEM_ROOT *mem_root_):mem_root(mem_root_),
+ arr(mem_root)
+ {
+ my_hash_init(PSI_INSTRUMENT_ME, &hash, &my_charset_bin, 32,
+ offsetof(entry, domain_id), sizeof(uint32), NULL, my_free,
+ HASH_UNIQUE);
+ };
+ void insert(rpl_gtid &gtid, ulong file_pos)
+ {
+ entry *e;
+ if((e= (entry *) my_hash_search(&hash,
+ (const uchar *)(&gtid.domain_id), 0)))
+ {
+ arr.at(e->index)->insert(gtid.seq_no, file_pos);
+ }
+ else
+ {
+ entry *e= new(mem_root) entry(gtid.domain_id, (uint32)arr.size());
+ my_hash_insert(&hash, (uchar *)e);
+ arr.append_val(new(mem_root) gtid_index(mem_root, gtid.domain_id));
+ }
+ }
+
+ ulong find(rpl_gtid &gtid)
+ {
+ entry *e;
+ if((e= (entry *) my_hash_search(&hash,
+ (const uchar *)(&gtid.domain_id), 0)))
+ {
+ return arr.at(e->index)->find(gtid.seq_no);
+ }
+ return 0;
+ }
+};
+
+
#endif /* RPL_GTID_H */
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index 56aaf01f1c5..a6f07ba413c 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -1302,7 +1302,8 @@ end:
*/
static const char *
gtid_find_binlog_file(slave_connection_state *state, char *out_name,
- slave_connection_state *until_gtid_state)
+ slave_connection_state *until_gtid_state,
+ rpl_gtid *out_gtid)
{
MEM_ROOT memroot;
binlog_file_entry *list;
@@ -1375,6 +1376,9 @@ gtid_find_binlog_file(slave_connection_state *state, char *out_name,
for (i= 0; i < glev->count; ++i)
{
const rpl_gtid *gtid= state->find(glev->list[i].domain_id);
+ out_gtid->domain_id= gtid->domain_id;
+ out_gtid->server_id= gtid->server_id;
+ out_gtid->seq_no= gtid->seq_no;
if (!gtid)
{
/*
@@ -2107,7 +2111,7 @@ err:
return info->error;
}
-//extern gtid_index_hash *gtid_indexes;
+extern gtid_index_hash *gtid_indexes;
static int init_binlog_sender(binlog_send_info *info,
LOG_INFO *linfo,
const char *log_ident,
@@ -2202,17 +2206,16 @@ static int init_binlog_sender(binlog_send_info *info,
info->error= error;
return 1;
}
+ rpl_gtid gtid= rpl_gtid{0, 0, 0};
if ((info->errmsg= gtid_find_binlog_file(&info->gtid_state,
search_file_name,
- info->until_gtid_state)))
+ info->until_gtid_state, &gtid)))
{
info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
return 1;
}
- /* start from beginning of binlog file */
- //gtid_indexes->insert(info->gtid_state, 1);
- *pos = 4;
+ *pos= gtid_indexes->find(gtid);
}
else
{