diff options
author | Nirbhay Choubey <nirbhay@mariadb.com> | 2014-12-03 22:30:48 -0500 |
---|---|---|
committer | Nirbhay Choubey <nirbhay@mariadb.com> | 2014-12-03 22:30:48 -0500 |
commit | a50ddebb5cfa7b79540d155e8e41c7a07c4c3fbf (patch) | |
tree | f3eb22219d77fe510fce912278a44d0e81650856 /sql/rpl_mi.cc | |
parent | 7bf4f9f7f66192f07fa46ed30c38f8842502fc4f (diff) | |
download | mariadb-git-a50ddebb5cfa7b79540d155e8e41c7a07c4c3fbf.tar.gz |
MDEV-6593 : domain_id based replication filters
Implementation for domain ID based filtering of replication events.
Diffstat (limited to 'sql/rpl_mi.cc')
-rw-r--r-- | sql/rpl_mi.cc | 390 |
1 files changed, 366 insertions, 24 deletions
diff --git a/sql/rpl_mi.cc b/sql/rpl_mi.cc index 055dd09ac5c..6df9c237904 100644 --- a/sql/rpl_mi.cc +++ b/sql/rpl_mi.cc @@ -109,12 +109,11 @@ Master_info::~Master_info() @return -1 if first argument is less, 0 if it equal to, 1 if it is greater than the second */ -int change_master_server_id_cmp(ulong *id1, ulong *id2) +static int change_master_id_cmp(const void *id1, const void *id2) { - return *id1 < *id2? -1 : (*id1 > *id2? 1 : 0); + return (*(ulong *) id1 - *(ulong *) id2); } - /** Reports if the s_id server has been configured to ignore events it generates with @@ -132,12 +131,11 @@ bool Master_info::shall_ignore_server_id(ulong s_id) { if (likely(ignore_server_ids.elements == 1)) return (* (ulong*) dynamic_array_ptr(&ignore_server_ids, 0)) == s_id; - else + else return bsearch((const ulong *) &s_id, ignore_server_ids.buffer, ignore_server_ids.elements, sizeof(ulong), - (int (*) (const void*, const void*)) change_master_server_id_cmp) - != NULL; + change_master_id_cmp) != NULL; } void Master_info::clear_in_memory_info(bool all) @@ -195,6 +193,46 @@ void init_master_log_pos(Master_info* mi) DBUG_VOID_RETURN; } +/** + Parses the IO_CACHE for "key=" and returns the "key". + + @param key [OUT] Key buffer + @param max_size [IN] Maximum buffer size + @param f [IN] IO_CACHE file + + @retval 0 Either "key=" or '\n' found + @retval 1 EOF +*/ +static int read_mi_key_from_file(char *key, int max_size, IO_CACHE *f) +{ + int i= 0, c; + char *last_p; + + DBUG_ENTER("read_key_from_file"); + + while (((c= my_b_get(f)) != '\n') && (c != my_b_EOF)) + { + last_p= key + i; + + if (i < max_size) + { + if (c == '=') + { + /* We found '=', replace it by 0 and return. */ + *last_p= 0; + DBUG_RETURN(0); + } + else + *last_p= c; + } + ++i; + } + + if (c == my_b_EOF) + DBUG_RETURN(1); + + DBUG_RETURN(0); +} enum { LINES_IN_MASTER_INFO_WITH_SSL= 14, @@ -499,20 +537,55 @@ file '%s')", fname); } /* - Parse any extra key=value lines. - Ignore unknown lines, to facilitate downgrades. + Parse any extra key=value lines. read_key_from_file() parses the file + for "key=" and returns the "key" if found. The "value" can then the + parsed on case by case basis. The "unknown" lines would be ignored to + facilitate downgrades. */ - while (!init_strvar_from_file(buf, sizeof(buf), &mi->file, 0)) + while (!read_mi_key_from_file(buf, sizeof(buf), &mi->file)) { - if (0 == strncmp(buf, STRING_WITH_LEN("using_gtid="))) + /* using_gtid */ + if (!strncmp(buf, STRING_WITH_LEN("using_gtid"))) + { + int val; + if (!init_intvar_from_file(&val, &mi->file, 0)) + { + if (val == Master_info::USE_GTID_CURRENT_POS) + mi->using_gtid= Master_info::USE_GTID_CURRENT_POS; + else if (val == Master_info::USE_GTID_SLAVE_POS) + mi->using_gtid= Master_info::USE_GTID_SLAVE_POS; + else + mi->using_gtid= Master_info::USE_GTID_NO; + continue; + } else { + sql_print_error("Failed to initialize master info using_gtid"); + goto errwithmsg; + } + } + + /* DO_DOMAIN_IDS */ + if (!strncmp(buf, STRING_WITH_LEN("do_domain_ids"))) { - int val= atoi(buf + sizeof("using_gtid")); - if (val == Master_info::USE_GTID_CURRENT_POS) - mi->using_gtid= Master_info::USE_GTID_CURRENT_POS; - else if (val == Master_info::USE_GTID_SLAVE_POS) - mi->using_gtid= Master_info::USE_GTID_SLAVE_POS; - else - mi->using_gtid= Master_info::USE_GTID_NO; + if (mi->domain_id_filter.init_ids(&mi->file, + Domain_id_filter::DO_DOMAIN_IDS)) + { + sql_print_error("Failed to initialize master info do_domain_ids"); + goto errwithmsg; + } + continue; + } + + /* IGNORE_DOMAIN_IDS */ + if (!strncmp(buf, STRING_WITH_LEN("ignore_domain_ids"))) + { + if (mi->domain_id_filter.init_ids(&mi->file, + Domain_id_filter::IGNORE_DOMAIN_IDS)) + { + sql_print_error("Failed to initialize master info " + "ignore_domain_ids"); + goto errwithmsg; + } + continue; } } } @@ -614,7 +687,7 @@ int flush_master_info(Master_info* mi, if (err) DBUG_RETURN(2); } - + /* produce a line listing the total number and all the ignored server_id:s */ @@ -624,7 +697,7 @@ int flush_master_info(Master_info* mi, (char *) my_malloc((sizeof(global_system_variables.server_id) * 3 + 1) * (1 + mi->ignore_server_ids.elements), MYF(MY_WME)); if (!ignore_server_ids_buf) - DBUG_RETURN(1); + DBUG_RETURN(1); /* error */ ulong cur_len= sprintf(ignore_server_ids_buf, "%u", mi->ignore_server_ids.elements); for (ulong i= 0; i < mi->ignore_server_ids.elements; i++) @@ -635,6 +708,24 @@ int flush_master_info(Master_info* mi, } } + char *do_domain_ids_buf= 0, *ignore_domain_ids_buf= 0; + + do_domain_ids_buf= + mi->domain_id_filter.as_string(Domain_id_filter::DO_DOMAIN_IDS); + if (do_domain_ids_buf == NULL) + { + err= 1; /* error */ + goto done; + } + + ignore_domain_ids_buf= + mi->domain_id_filter.as_string(Domain_id_filter::IGNORE_DOMAIN_IDS); + if (ignore_domain_ids_buf == NULL) + { + err= 1; /* error */ + goto done; + } + /* We flushed the relay log BEFORE the master.info file, because if we crash now, we will get a duplicate event in the relay log at restart. If we @@ -657,7 +748,9 @@ int flush_master_info(Master_info* mi, my_b_printf(file, "%u\n%s\n%s\n%s\n%s\n%s\n%d\n%d\n%d\n%s\n%s\n%s\n%s\n%s\n%d\n%s\n%s\n%s\n%s\n%d\n%s\n%s\n" "\n\n\n\n\n\n\n\n\n\n\n" - "using_gtid=%d\n", + "using_gtid=%d\n" + "do_domain_ids=%s\n" + "ignore_domain_ids=%s\n", LINES_IN_MASTER_INFO, mi->master_log_name, llstr(mi->master_log_pos, lbuf), mi->host, mi->user, @@ -666,16 +759,24 @@ int flush_master_info(Master_info* mi, mi->ssl_cipher, mi->ssl_key, mi->ssl_verify_server_cert, heartbeat_buf, "", ignore_server_ids_buf, "", 0, - mi->ssl_crl, mi->ssl_crlpath, mi->using_gtid); - my_free(ignore_server_ids_buf); + mi->ssl_crl, mi->ssl_crlpath, mi->using_gtid, + do_domain_ids_buf, ignore_domain_ids_buf); err= flush_io_cache(file); - if (sync_masterinfo_period && !err && + if (sync_masterinfo_period && !err && ++(mi->sync_counter) >= sync_masterinfo_period) { err= my_sync(mi->fd, MYF(MY_WME)); mi->sync_counter= 0; } - DBUG_RETURN(-err); + + /* Fix err; flush_io_cache()/my_sync() may return -1 */ + err= (err != 0) ? 1 : 0; + +done: + my_free(ignore_server_ids_buf); + my_free(do_domain_ids_buf); + my_free(ignore_domain_ids_buf); + DBUG_RETURN(err); } @@ -1361,4 +1462,245 @@ bool Master_info_index::stop_all_slaves(THD *thd) DBUG_RETURN(result); } +Domain_id_filter::Domain_id_filter() : m_filter(false) +{ + for (int i= DO_DOMAIN_IDS; i <= IGNORE_DOMAIN_IDS; i ++) + { + my_init_dynamic_array(&m_domain_ids[i], sizeof(ulong), 16, 16, MYF(0)); + } +} + +Domain_id_filter::~Domain_id_filter() +{ + for (int i= DO_DOMAIN_IDS; i <= IGNORE_DOMAIN_IDS; i ++) + { + delete_dynamic(&m_domain_ids[i]); + } +} + +/** + Update m_filter flag for the current group by looking up its domain id in the + domain ids list. DO_DOMAIN_IDS list is only looked-up is both (do & ignore) + list are non-empty. +*/ +void Domain_id_filter::do_filter(ulong domain_id) +{ + DYNAMIC_ARRAY *do_domain_ids= &m_domain_ids[DO_DOMAIN_IDS]; + DYNAMIC_ARRAY *ignore_domain_ids= &m_domain_ids[IGNORE_DOMAIN_IDS]; + + if (do_domain_ids->elements > 0) + { + if (likely(do_domain_ids->elements == 1)) + m_filter= ((* (ulong *) dynamic_array_ptr(do_domain_ids, 0)) + != domain_id); + else + m_filter= (bsearch((const ulong *) &domain_id, do_domain_ids->buffer, + do_domain_ids->elements, sizeof(ulong), + change_master_id_cmp) == NULL); + } + else if (ignore_domain_ids->elements > 0) + { + if (likely(ignore_domain_ids->elements == 1)) + m_filter= ((* (ulong *) dynamic_array_ptr(ignore_domain_ids, 0)) == + domain_id); + else + m_filter= (bsearch((const ulong *) &domain_id, ignore_domain_ids->buffer, + ignore_domain_ids->elements, sizeof(ulong), + change_master_id_cmp) != NULL); + } + return; +} + +/** + Reset m_filter. It should be called when IO thread receives COMMIT_EVENT or + XID_EVENT. +*/ +void Domain_id_filter::reset_filter() +{ + m_filter= false; +} + +/** + Update the do/ignore domain id filter lists. + + @param do_ids [IN] domain ids to be kept + @param ignore_ids [IN] domain ids to be filtered out + @param using_gtid [IN] use GTID? + + @retval false Success + true Error +*/ +bool Domain_id_filter::update_ids(DYNAMIC_ARRAY *do_ids, + DYNAMIC_ARRAY *ignore_ids, + bool using_gtid) +{ + bool do_list_empty, ignore_list_empty; + + if (do_ids) + { + do_list_empty= (do_ids->elements > 0) ? false : true; + } else { + do_list_empty= (m_domain_ids[DO_DOMAIN_IDS].elements > 0) ? false : true; + } + + if (ignore_ids) + { + ignore_list_empty= (ignore_ids->elements > 0) ? false : true; + } else { + ignore_list_empty= (m_domain_ids[IGNORE_DOMAIN_IDS].elements > 0) ? false : + true; + } + + if (!do_list_empty && !ignore_list_empty) + { + sql_print_error("Both DO_DOMAIN_IDS & IGNORE_DOMAIN_IDS lists can't be " + "non-empty at the same time"); + return true; + } + + if (using_gtid == Master_info::USE_GTID_NO && + (!do_list_empty || !ignore_list_empty)) + { + sql_print_error("DO_DOMAIN_IDS or IGNORE_DOMAIN_IDS lists can't be " + "non-empty in non-GTID mode (MASTER_USE_GTID=no)"); + return true; + } + + if (do_ids) + update_change_master_ids(do_ids, &m_domain_ids[DO_DOMAIN_IDS]); + + if (ignore_ids) + update_change_master_ids(ignore_ids, &m_domain_ids[IGNORE_DOMAIN_IDS]); + + m_filter= false; + + return false; +} + +/** + Serialize and store the ids from domain id lists into the thd's protocol + buffer. + + @param thd [IN] thread handler + + @retval void +*/ +void Domain_id_filter::store_ids(THD *thd) +{ + for (int i= DO_DOMAIN_IDS; i <= IGNORE_DOMAIN_IDS; i ++) + { + prot_store_ids(thd, &m_domain_ids[i]); + } +} + +/** + Initialize the given domain_id list (DYNAMIC_ARRAY) with the + space-separated list of numbers from the specified IO_CACHE where + the first number represents the total number of entries to follows. + + @param f [IN] IO_CACHE file + @param type [IN] domain id list type + + @retval false Success + true Error +*/ +bool Domain_id_filter::init_ids(IO_CACHE *f, enum_list_type type) +{ + return init_dynarray_intvar_from_file(&m_domain_ids[type], f); +} + +/** + Return the elements of the give domain id list type as string. + + @param type [IN] domain id list type + + @retval a string buffer storing the total number + of elements followed by the individual + elements (space-separated) in the + specified list. + + Note: Its caller's responsibility to free the returned string buffer. +*/ +char *Domain_id_filter::as_string(enum_list_type type) +{ + char *buf; + size_t sz; + DYNAMIC_ARRAY *ids= &m_domain_ids[type]; + + sz= (sizeof(ulong) * 3 + 1) * (1 + ids->elements); + + if (!(buf= (char *) my_malloc(sz, MYF(MY_WME)))) + return NULL; + + // Store the total number of elements followed by the individual elements. + ulong cur_len= sprintf(buf, "%u", ids->elements); + sz-= cur_len; + + for (uint i= 0; i < ids->elements; i++) + { + ulong domain_id; + get_dynamic(ids, (void *) &domain_id, i); + cur_len+= my_snprintf(buf + cur_len, sz, " %u", domain_id); + sz-= cur_len; + } + return buf; +} + +void update_change_master_ids(DYNAMIC_ARRAY *new_ids, DYNAMIC_ARRAY *old_ids) +{ + reset_dynamic(old_ids); + + /* bsearch requires an ordered list. */ + sort_dynamic(new_ids, change_master_id_cmp); + + for (uint i= 0; i < new_ids->elements; i++) + { + ulong id; + get_dynamic(new_ids, (void *) &id, i); + + if (bsearch((const ulong *) &id, old_ids->buffer, old_ids->elements, + sizeof(ulong), change_master_id_cmp) == NULL) + { + insert_dynamic(old_ids, (ulong *) &id); + } + } + return; +} + +/** + Serialize and store the ids from the given ids DYNAMIC_ARRAY into the thd's + protocol buffer. + + @param thd [IN] thread handler + @param ids [IN] ids list + + @retval void +*/ + +void prot_store_ids(THD *thd, DYNAMIC_ARRAY *ids) +{ + char buff[FN_REFLEN]; + uint i, cur_len; + + for (i= 0, buff[0]= 0, cur_len= 0; i < ids->elements; i++) + { + ulong id, len; + char dbuff[FN_REFLEN]; + get_dynamic(ids, (void *) &id, i); + len= sprintf(dbuff, (i == 0 ? "%lu" : ", %lu"), id); + if (cur_len + len + 4 > FN_REFLEN) + { + /* + break the loop whenever remained space could not fit + ellipses on the next cycle + */ + sprintf(dbuff + cur_len, "..."); + break; + } + cur_len += sprintf(buff + cur_len, "%s", dbuff); + } + thd->protocol->store(buff, &my_charset_bin); + return; +} + #endif /* HAVE_REPLICATION */ |