diff options
author | Kristian Nielsen <knielsen@knielsen-hq.org> | 2014-12-05 16:09:48 +0100 |
---|---|---|
committer | Kristian Nielsen <knielsen@knielsen-hq.org> | 2014-12-06 08:49:50 +0100 |
commit | db21fddc3740dfa48f3443751c48282467afac5e (patch) | |
tree | 3ed31a4a5ce9abf3fe98cdd9c9bdf68ba8b5832b /sql/rpl_mi.cc | |
parent | 1e3f09f1638e2bdec6029f6c98317d17d7ca76d1 (diff) | |
download | mariadb-git-db21fddc3740dfa48f3443751c48282467afac5e.tar.gz |
MDEV-6676: Optimistic parallel replication
Implement a new mode for parallel replication. In this mode, all transactions
are optimistically attempted applied in parallel. In case of conflicts, the
offending transaction is rolled back and retried later non-parallel.
This is an early-release patch to facilitate testing, more changes to user
interface / options will be expected. The new mode is not enabled by default.
Diffstat (limited to 'sql/rpl_mi.cc')
-rw-r--r-- | sql/rpl_mi.cc | 175 |
1 files changed, 174 insertions, 1 deletions
diff --git a/sql/rpl_mi.cc b/sql/rpl_mi.cc index 6df9c237904..9857a888340 100644 --- a/sql/rpl_mi.cc +++ b/sql/rpl_mi.cc @@ -72,6 +72,8 @@ Master_info::Master_info(LEX_STRING *connection_name_arg, connection_name.length); copy_filter_setting(rpl_filter, global_rpl_filter); + mi_slave_parallel_mode_lookup(&connection_name, ¶llel_mode); + my_init_dynamic_array(&ignore_server_ids, sizeof(global_system_variables.server_id), 16, 16, MYF(0)); @@ -176,6 +178,7 @@ void init_master_log_pos(Master_info* mi) mi->events_queued_since_last_gtid= 0; mi->gtid_reconnect_event_skip_count= 0; mi->gtid_event_seen= false; + mi_slave_parallel_mode_lookup(&mi->connection_name, &mi->parallel_mode); /* Intentionally init ssl_verify_server_cert to 0, no option available */ mi->ssl_verify_server_cert= 0; @@ -587,6 +590,14 @@ file '%s')", fname); } continue; } + else if (!strncmp(buf, STRING_WITH_LEN("END_MARKER"))) + { + /* + Guard agaist extra left-overs at the end of file, in case a later + update causes the file to shrink compared to earlier contents. + */ + break; + } } } } @@ -750,7 +761,8 @@ int flush_master_info(Master_info* mi, "\n\n\n\n\n\n\n\n\n\n\n" "using_gtid=%d\n" "do_domain_ids=%s\n" - "ignore_domain_ids=%s\n", + "ignore_domain_ids=%s\n" + "END_MARKER\n", LINES_IN_MASTER_INFO, mi->master_log_name, llstr(mi->master_log_pos, lbuf), mi->host, mi->user, @@ -1703,4 +1715,165 @@ void prot_store_ids(THD *thd, DYNAMIC_ARRAY *ids) return; } +/* + We need to handle per-Master_info command line options special. + Because when command line options are parsed, we do not yet have any + Master_info objects. +*/ +static HASH mi_cmdline_hash; +static bool mi_cmdline_hash_inited= false; +struct mi_cmdline_entry { + LEX_STRING cmp_connection_name; + ulonglong parallel_mode; +}; + + +static uchar * +get_key_cmdline(const uchar *ptr, size_t *length_p, + my_bool unused __attribute__((unused))) +{ + mi_cmdline_entry *entry= (mi_cmdline_entry *)ptr; + *length_p= entry->cmp_connection_name.length; + return (uchar *)entry->cmp_connection_name.str; +} + + +static void +free_key_cmdline(void *ptr) +{ + mi_cmdline_entry *entry= (mi_cmdline_entry *)ptr; + my_free(entry->cmp_connection_name.str); + my_free(entry); +} + + +int +mi_cmdline_init() +{ + if (mi_cmdline_hash_inited) + return 0; + if (my_hash_init(&mi_cmdline_hash, system_charset_info, + MAX_REPLICATION_THREAD, 0, 0, get_key_cmdline, + free_key_cmdline, HASH_UNIQUE)) + { + sql_print_error("Initializing Master_info command line option hash table failed"); + return 1; + } + mi_cmdline_hash_inited= true; + return 0; +} + + +void +mi_cmdline_destroy() +{ + if (mi_cmdline_hash_inited) + my_hash_free(&mi_cmdline_hash); + mi_cmdline_hash_inited= false; +} + + +static mi_cmdline_entry * +mi_cmdline_entry_get(LEX_STRING *connection_name, bool create_if_missing) +{ + LEX_STRING cmp_connection_name; + mi_cmdline_entry *entry; + + if (!mi_cmdline_hash_inited && + (!create_if_missing || mi_cmdline_init())) + return NULL; + + /* Create a lowercase key for hash lookup. */ + cmp_connection_name.length= connection_name->length; + if (!(cmp_connection_name.str= (char *)my_malloc(connection_name->length+1, + MYF(MY_WME)))) + { + if (create_if_missing) + my_error(ER_OUTOFMEMORY, MYF(0), connection_name->length+1); + return NULL; + } + memcpy(cmp_connection_name.str, connection_name->str, + connection_name->length + 1); + my_casedn_str(system_charset_info, cmp_connection_name.str); + + if ((entry= (mi_cmdline_entry *) + my_hash_search(&mi_cmdline_hash, (uchar *)cmp_connection_name.str, + cmp_connection_name.length)) || + !create_if_missing) + { + my_free(cmp_connection_name.str); + return entry; + } + + if (!(entry= (mi_cmdline_entry *)my_malloc(sizeof(*entry), MYF(MY_WME)))) + { + my_error(ER_OUTOFMEMORY, MYF(0), sizeof(*entry)); + my_free(cmp_connection_name.str); + return NULL; + } + entry->cmp_connection_name= cmp_connection_name; + entry->parallel_mode= opt_slave_parallel_mode; + if (my_hash_insert(&mi_cmdline_hash, (uchar *)entry)) + { + my_error(ER_OUT_OF_RESOURCES, MYF(0)); + my_free(entry); + my_free(cmp_connection_name.str); + return NULL; + } + return entry; +} + + +/* + Look up a command line value for slave_parallel_mode. The value is returned + in *out_mode. If no command line value was given for this particular + connection name, the default value in opt_slave_parallel_mode is used. +*/ +void +mi_slave_parallel_mode_lookup(LEX_STRING *connection_name, ulonglong *out_mode) +{ + mi_cmdline_entry *entry; + + if (!mi_cmdline_hash_inited || + !(entry= mi_cmdline_entry_get(connection_name, false))) + *out_mode= opt_slave_parallel_mode; + else + *out_mode= entry->parallel_mode; +} + + +/* + Get a pointer to the location holding the value of the slave_parallel_mode + command line option for the given connection name. The pointer is returned + in *out_mode_ptr + + If create_if_missing is true, then a new entry will be created if one did + not already exists. If false, then NULL will be returned in *out_mode_ptr if + an entry does not exist. + + Returns 1 on error, 0 if ok. +*/ +int +mi_slave_parallel_mode_ptr(LEX_STRING *connection_name, + ulonglong **out_mode_ptr, bool create_if_missing) +{ + mi_cmdline_entry *entry; + + *out_mode_ptr= NULL; + if (!create_if_missing && !mi_cmdline_hash_inited) + return 0; + + entry= mi_cmdline_entry_get(connection_name, create_if_missing); + if (!entry) + { + if (create_if_missing) + return 1; + else + return 0; + } + *out_mode_ptr= &entry->parallel_mode; + return 0; +} + + #endif /* HAVE_REPLICATION */ |