summaryrefslogtreecommitdiff
path: root/sql/rpl_mi.cc
diff options
context:
space:
mode:
authorNirbhay Choubey <nirbhay@mariadb.com>2014-12-03 22:30:48 -0500
committerNirbhay Choubey <nirbhay@mariadb.com>2014-12-03 22:30:48 -0500
commita50ddebb5cfa7b79540d155e8e41c7a07c4c3fbf (patch)
treef3eb22219d77fe510fce912278a44d0e81650856 /sql/rpl_mi.cc
parent7bf4f9f7f66192f07fa46ed30c38f8842502fc4f (diff)
downloadmariadb-git-a50ddebb5cfa7b79540d155e8e41c7a07c4c3fbf.tar.gz
MDEV-6593 : domain_id based replication filters
Implementation for domain ID based filtering of replication events.
Diffstat (limited to 'sql/rpl_mi.cc')
-rw-r--r--sql/rpl_mi.cc390
1 files changed, 366 insertions, 24 deletions
diff --git a/sql/rpl_mi.cc b/sql/rpl_mi.cc
index 055dd09ac5c..6df9c237904 100644
--- a/sql/rpl_mi.cc
+++ b/sql/rpl_mi.cc
@@ -109,12 +109,11 @@ Master_info::~Master_info()
@return -1 if first argument is less, 0 if it equal to, 1 if it is greater
than the second
*/
-int change_master_server_id_cmp(ulong *id1, ulong *id2)
+static int change_master_id_cmp(const void *id1, const void *id2)
{
- return *id1 < *id2? -1 : (*id1 > *id2? 1 : 0);
+ return (*(ulong *) id1 - *(ulong *) id2);
}
-
/**
Reports if the s_id server has been configured to ignore events
it generates with
@@ -132,12 +131,11 @@ bool Master_info::shall_ignore_server_id(ulong s_id)
{
if (likely(ignore_server_ids.elements == 1))
return (* (ulong*) dynamic_array_ptr(&ignore_server_ids, 0)) == s_id;
- else
+ else
return bsearch((const ulong *) &s_id,
ignore_server_ids.buffer,
ignore_server_ids.elements, sizeof(ulong),
- (int (*) (const void*, const void*)) change_master_server_id_cmp)
- != NULL;
+ change_master_id_cmp) != NULL;
}
void Master_info::clear_in_memory_info(bool all)
@@ -195,6 +193,46 @@ void init_master_log_pos(Master_info* mi)
DBUG_VOID_RETURN;
}
+/**
+ Parses the IO_CACHE for "key=" and returns the "key".
+
+ @param key [OUT] Key buffer
+ @param max_size [IN] Maximum buffer size
+ @param f [IN] IO_CACHE file
+
+ @retval 0 Either "key=" or '\n' found
+ @retval 1 EOF
+*/
+static int read_mi_key_from_file(char *key, int max_size, IO_CACHE *f)
+{
+ int i= 0, c;
+ char *last_p;
+
+ DBUG_ENTER("read_key_from_file");
+
+ while (((c= my_b_get(f)) != '\n') && (c != my_b_EOF))
+ {
+ last_p= key + i;
+
+ if (i < max_size)
+ {
+ if (c == '=')
+ {
+ /* We found '=', replace it by 0 and return. */
+ *last_p= 0;
+ DBUG_RETURN(0);
+ }
+ else
+ *last_p= c;
+ }
+ ++i;
+ }
+
+ if (c == my_b_EOF)
+ DBUG_RETURN(1);
+
+ DBUG_RETURN(0);
+}
enum {
LINES_IN_MASTER_INFO_WITH_SSL= 14,
@@ -499,20 +537,55 @@ file '%s')", fname);
}
/*
- Parse any extra key=value lines.
- Ignore unknown lines, to facilitate downgrades.
+ Parse any extra key=value lines. read_key_from_file() parses the file
+ for "key=" and returns the "key" if found. The "value" can then the
+ parsed on case by case basis. The "unknown" lines would be ignored to
+ facilitate downgrades.
*/
- while (!init_strvar_from_file(buf, sizeof(buf), &mi->file, 0))
+ while (!read_mi_key_from_file(buf, sizeof(buf), &mi->file))
{
- if (0 == strncmp(buf, STRING_WITH_LEN("using_gtid=")))
+ /* using_gtid */
+ if (!strncmp(buf, STRING_WITH_LEN("using_gtid")))
+ {
+ int val;
+ if (!init_intvar_from_file(&val, &mi->file, 0))
+ {
+ if (val == Master_info::USE_GTID_CURRENT_POS)
+ mi->using_gtid= Master_info::USE_GTID_CURRENT_POS;
+ else if (val == Master_info::USE_GTID_SLAVE_POS)
+ mi->using_gtid= Master_info::USE_GTID_SLAVE_POS;
+ else
+ mi->using_gtid= Master_info::USE_GTID_NO;
+ continue;
+ } else {
+ sql_print_error("Failed to initialize master info using_gtid");
+ goto errwithmsg;
+ }
+ }
+
+ /* DO_DOMAIN_IDS */
+ if (!strncmp(buf, STRING_WITH_LEN("do_domain_ids")))
{
- int val= atoi(buf + sizeof("using_gtid"));
- if (val == Master_info::USE_GTID_CURRENT_POS)
- mi->using_gtid= Master_info::USE_GTID_CURRENT_POS;
- else if (val == Master_info::USE_GTID_SLAVE_POS)
- mi->using_gtid= Master_info::USE_GTID_SLAVE_POS;
- else
- mi->using_gtid= Master_info::USE_GTID_NO;
+ if (mi->domain_id_filter.init_ids(&mi->file,
+ Domain_id_filter::DO_DOMAIN_IDS))
+ {
+ sql_print_error("Failed to initialize master info do_domain_ids");
+ goto errwithmsg;
+ }
+ continue;
+ }
+
+ /* IGNORE_DOMAIN_IDS */
+ if (!strncmp(buf, STRING_WITH_LEN("ignore_domain_ids")))
+ {
+ if (mi->domain_id_filter.init_ids(&mi->file,
+ Domain_id_filter::IGNORE_DOMAIN_IDS))
+ {
+ sql_print_error("Failed to initialize master info "
+ "ignore_domain_ids");
+ goto errwithmsg;
+ }
+ continue;
}
}
}
@@ -614,7 +687,7 @@ int flush_master_info(Master_info* mi,
if (err)
DBUG_RETURN(2);
}
-
+
/*
produce a line listing the total number and all the ignored server_id:s
*/
@@ -624,7 +697,7 @@ int flush_master_info(Master_info* mi,
(char *) my_malloc((sizeof(global_system_variables.server_id) * 3 + 1) *
(1 + mi->ignore_server_ids.elements), MYF(MY_WME));
if (!ignore_server_ids_buf)
- DBUG_RETURN(1);
+ DBUG_RETURN(1); /* error */
ulong cur_len= sprintf(ignore_server_ids_buf, "%u",
mi->ignore_server_ids.elements);
for (ulong i= 0; i < mi->ignore_server_ids.elements; i++)
@@ -635,6 +708,24 @@ int flush_master_info(Master_info* mi,
}
}
+ char *do_domain_ids_buf= 0, *ignore_domain_ids_buf= 0;
+
+ do_domain_ids_buf=
+ mi->domain_id_filter.as_string(Domain_id_filter::DO_DOMAIN_IDS);
+ if (do_domain_ids_buf == NULL)
+ {
+ err= 1; /* error */
+ goto done;
+ }
+
+ ignore_domain_ids_buf=
+ mi->domain_id_filter.as_string(Domain_id_filter::IGNORE_DOMAIN_IDS);
+ if (ignore_domain_ids_buf == NULL)
+ {
+ err= 1; /* error */
+ goto done;
+ }
+
/*
We flushed the relay log BEFORE the master.info file, because if we crash
now, we will get a duplicate event in the relay log at restart. If we
@@ -657,7 +748,9 @@ int flush_master_info(Master_info* mi,
my_b_printf(file,
"%u\n%s\n%s\n%s\n%s\n%s\n%d\n%d\n%d\n%s\n%s\n%s\n%s\n%s\n%d\n%s\n%s\n%s\n%s\n%d\n%s\n%s\n"
"\n\n\n\n\n\n\n\n\n\n\n"
- "using_gtid=%d\n",
+ "using_gtid=%d\n"
+ "do_domain_ids=%s\n"
+ "ignore_domain_ids=%s\n",
LINES_IN_MASTER_INFO,
mi->master_log_name, llstr(mi->master_log_pos, lbuf),
mi->host, mi->user,
@@ -666,16 +759,24 @@ int flush_master_info(Master_info* mi,
mi->ssl_cipher, mi->ssl_key, mi->ssl_verify_server_cert,
heartbeat_buf, "", ignore_server_ids_buf,
"", 0,
- mi->ssl_crl, mi->ssl_crlpath, mi->using_gtid);
- my_free(ignore_server_ids_buf);
+ mi->ssl_crl, mi->ssl_crlpath, mi->using_gtid,
+ do_domain_ids_buf, ignore_domain_ids_buf);
err= flush_io_cache(file);
- if (sync_masterinfo_period && !err &&
+ if (sync_masterinfo_period && !err &&
++(mi->sync_counter) >= sync_masterinfo_period)
{
err= my_sync(mi->fd, MYF(MY_WME));
mi->sync_counter= 0;
}
- DBUG_RETURN(-err);
+
+ /* Fix err; flush_io_cache()/my_sync() may return -1 */
+ err= (err != 0) ? 1 : 0;
+
+done:
+ my_free(ignore_server_ids_buf);
+ my_free(do_domain_ids_buf);
+ my_free(ignore_domain_ids_buf);
+ DBUG_RETURN(err);
}
@@ -1361,4 +1462,245 @@ bool Master_info_index::stop_all_slaves(THD *thd)
DBUG_RETURN(result);
}
+Domain_id_filter::Domain_id_filter() : m_filter(false)
+{
+ for (int i= DO_DOMAIN_IDS; i <= IGNORE_DOMAIN_IDS; i ++)
+ {
+ my_init_dynamic_array(&m_domain_ids[i], sizeof(ulong), 16, 16, MYF(0));
+ }
+}
+
+Domain_id_filter::~Domain_id_filter()
+{
+ for (int i= DO_DOMAIN_IDS; i <= IGNORE_DOMAIN_IDS; i ++)
+ {
+ delete_dynamic(&m_domain_ids[i]);
+ }
+}
+
+/**
+ Update m_filter flag for the current group by looking up its domain id in the
+ domain ids list. DO_DOMAIN_IDS list is only looked-up is both (do & ignore)
+ list are non-empty.
+*/
+void Domain_id_filter::do_filter(ulong domain_id)
+{
+ DYNAMIC_ARRAY *do_domain_ids= &m_domain_ids[DO_DOMAIN_IDS];
+ DYNAMIC_ARRAY *ignore_domain_ids= &m_domain_ids[IGNORE_DOMAIN_IDS];
+
+ if (do_domain_ids->elements > 0)
+ {
+ if (likely(do_domain_ids->elements == 1))
+ m_filter= ((* (ulong *) dynamic_array_ptr(do_domain_ids, 0))
+ != domain_id);
+ else
+ m_filter= (bsearch((const ulong *) &domain_id, do_domain_ids->buffer,
+ do_domain_ids->elements, sizeof(ulong),
+ change_master_id_cmp) == NULL);
+ }
+ else if (ignore_domain_ids->elements > 0)
+ {
+ if (likely(ignore_domain_ids->elements == 1))
+ m_filter= ((* (ulong *) dynamic_array_ptr(ignore_domain_ids, 0)) ==
+ domain_id);
+ else
+ m_filter= (bsearch((const ulong *) &domain_id, ignore_domain_ids->buffer,
+ ignore_domain_ids->elements, sizeof(ulong),
+ change_master_id_cmp) != NULL);
+ }
+ return;
+}
+
+/**
+ Reset m_filter. It should be called when IO thread receives COMMIT_EVENT or
+ XID_EVENT.
+*/
+void Domain_id_filter::reset_filter()
+{
+ m_filter= false;
+}
+
+/**
+ Update the do/ignore domain id filter lists.
+
+ @param do_ids [IN] domain ids to be kept
+ @param ignore_ids [IN] domain ids to be filtered out
+ @param using_gtid [IN] use GTID?
+
+ @retval false Success
+ true Error
+*/
+bool Domain_id_filter::update_ids(DYNAMIC_ARRAY *do_ids,
+ DYNAMIC_ARRAY *ignore_ids,
+ bool using_gtid)
+{
+ bool do_list_empty, ignore_list_empty;
+
+ if (do_ids)
+ {
+ do_list_empty= (do_ids->elements > 0) ? false : true;
+ } else {
+ do_list_empty= (m_domain_ids[DO_DOMAIN_IDS].elements > 0) ? false : true;
+ }
+
+ if (ignore_ids)
+ {
+ ignore_list_empty= (ignore_ids->elements > 0) ? false : true;
+ } else {
+ ignore_list_empty= (m_domain_ids[IGNORE_DOMAIN_IDS].elements > 0) ? false :
+ true;
+ }
+
+ if (!do_list_empty && !ignore_list_empty)
+ {
+ sql_print_error("Both DO_DOMAIN_IDS & IGNORE_DOMAIN_IDS lists can't be "
+ "non-empty at the same time");
+ return true;
+ }
+
+ if (using_gtid == Master_info::USE_GTID_NO &&
+ (!do_list_empty || !ignore_list_empty))
+ {
+ sql_print_error("DO_DOMAIN_IDS or IGNORE_DOMAIN_IDS lists can't be "
+ "non-empty in non-GTID mode (MASTER_USE_GTID=no)");
+ return true;
+ }
+
+ if (do_ids)
+ update_change_master_ids(do_ids, &m_domain_ids[DO_DOMAIN_IDS]);
+
+ if (ignore_ids)
+ update_change_master_ids(ignore_ids, &m_domain_ids[IGNORE_DOMAIN_IDS]);
+
+ m_filter= false;
+
+ return false;
+}
+
+/**
+ Serialize and store the ids from domain id lists into the thd's protocol
+ buffer.
+
+ @param thd [IN] thread handler
+
+ @retval void
+*/
+void Domain_id_filter::store_ids(THD *thd)
+{
+ for (int i= DO_DOMAIN_IDS; i <= IGNORE_DOMAIN_IDS; i ++)
+ {
+ prot_store_ids(thd, &m_domain_ids[i]);
+ }
+}
+
+/**
+ Initialize the given domain_id list (DYNAMIC_ARRAY) with the
+ space-separated list of numbers from the specified IO_CACHE where
+ the first number represents the total number of entries to follows.
+
+ @param f [IN] IO_CACHE file
+ @param type [IN] domain id list type
+
+ @retval false Success
+ true Error
+*/
+bool Domain_id_filter::init_ids(IO_CACHE *f, enum_list_type type)
+{
+ return init_dynarray_intvar_from_file(&m_domain_ids[type], f);
+}
+
+/**
+ Return the elements of the give domain id list type as string.
+
+ @param type [IN] domain id list type
+
+ @retval a string buffer storing the total number
+ of elements followed by the individual
+ elements (space-separated) in the
+ specified list.
+
+ Note: Its caller's responsibility to free the returned string buffer.
+*/
+char *Domain_id_filter::as_string(enum_list_type type)
+{
+ char *buf;
+ size_t sz;
+ DYNAMIC_ARRAY *ids= &m_domain_ids[type];
+
+ sz= (sizeof(ulong) * 3 + 1) * (1 + ids->elements);
+
+ if (!(buf= (char *) my_malloc(sz, MYF(MY_WME))))
+ return NULL;
+
+ // Store the total number of elements followed by the individual elements.
+ ulong cur_len= sprintf(buf, "%u", ids->elements);
+ sz-= cur_len;
+
+ for (uint i= 0; i < ids->elements; i++)
+ {
+ ulong domain_id;
+ get_dynamic(ids, (void *) &domain_id, i);
+ cur_len+= my_snprintf(buf + cur_len, sz, " %u", domain_id);
+ sz-= cur_len;
+ }
+ return buf;
+}
+
+void update_change_master_ids(DYNAMIC_ARRAY *new_ids, DYNAMIC_ARRAY *old_ids)
+{
+ reset_dynamic(old_ids);
+
+ /* bsearch requires an ordered list. */
+ sort_dynamic(new_ids, change_master_id_cmp);
+
+ for (uint i= 0; i < new_ids->elements; i++)
+ {
+ ulong id;
+ get_dynamic(new_ids, (void *) &id, i);
+
+ if (bsearch((const ulong *) &id, old_ids->buffer, old_ids->elements,
+ sizeof(ulong), change_master_id_cmp) == NULL)
+ {
+ insert_dynamic(old_ids, (ulong *) &id);
+ }
+ }
+ return;
+}
+
+/**
+ Serialize and store the ids from the given ids DYNAMIC_ARRAY into the thd's
+ protocol buffer.
+
+ @param thd [IN] thread handler
+ @param ids [IN] ids list
+
+ @retval void
+*/
+
+void prot_store_ids(THD *thd, DYNAMIC_ARRAY *ids)
+{
+ char buff[FN_REFLEN];
+ uint i, cur_len;
+
+ for (i= 0, buff[0]= 0, cur_len= 0; i < ids->elements; i++)
+ {
+ ulong id, len;
+ char dbuff[FN_REFLEN];
+ get_dynamic(ids, (void *) &id, i);
+ len= sprintf(dbuff, (i == 0 ? "%lu" : ", %lu"), id);
+ if (cur_len + len + 4 > FN_REFLEN)
+ {
+ /*
+ break the loop whenever remained space could not fit
+ ellipses on the next cycle
+ */
+ sprintf(dbuff + cur_len, "...");
+ break;
+ }
+ cur_len += sprintf(buff + cur_len, "%s", dbuff);
+ }
+ thd->protocol->store(buff, &my_charset_bin);
+ return;
+}
+
#endif /* HAVE_REPLICATION */