summaryrefslogtreecommitdiff
path: root/sql/rpl_mi.cc
diff options
context:
space:
mode:
authorKristian Nielsen <knielsen@knielsen-hq.org>2014-12-05 16:09:48 +0100
committerKristian Nielsen <knielsen@knielsen-hq.org>2014-12-06 08:49:50 +0100
commitdb21fddc3740dfa48f3443751c48282467afac5e (patch)
tree3ed31a4a5ce9abf3fe98cdd9c9bdf68ba8b5832b /sql/rpl_mi.cc
parent1e3f09f1638e2bdec6029f6c98317d17d7ca76d1 (diff)
downloadmariadb-git-db21fddc3740dfa48f3443751c48282467afac5e.tar.gz
MDEV-6676: Optimistic parallel replication
Implement a new mode for parallel replication. In this mode, all transactions are optimistically attempted applied in parallel. In case of conflicts, the offending transaction is rolled back and retried later non-parallel. This is an early-release patch to facilitate testing, more changes to user interface / options will be expected. The new mode is not enabled by default.
Diffstat (limited to 'sql/rpl_mi.cc')
-rw-r--r--sql/rpl_mi.cc175
1 files changed, 174 insertions, 1 deletions
diff --git a/sql/rpl_mi.cc b/sql/rpl_mi.cc
index 6df9c237904..9857a888340 100644
--- a/sql/rpl_mi.cc
+++ b/sql/rpl_mi.cc
@@ -72,6 +72,8 @@ Master_info::Master_info(LEX_STRING *connection_name_arg,
connection_name.length);
copy_filter_setting(rpl_filter, global_rpl_filter);
+ mi_slave_parallel_mode_lookup(&connection_name, &parallel_mode);
+
my_init_dynamic_array(&ignore_server_ids,
sizeof(global_system_variables.server_id), 16, 16,
MYF(0));
@@ -176,6 +178,7 @@ void init_master_log_pos(Master_info* mi)
mi->events_queued_since_last_gtid= 0;
mi->gtid_reconnect_event_skip_count= 0;
mi->gtid_event_seen= false;
+ mi_slave_parallel_mode_lookup(&mi->connection_name, &mi->parallel_mode);
/* Intentionally init ssl_verify_server_cert to 0, no option available */
mi->ssl_verify_server_cert= 0;
@@ -587,6 +590,14 @@ file '%s')", fname);
}
continue;
}
+ else if (!strncmp(buf, STRING_WITH_LEN("END_MARKER")))
+ {
+ /*
+ Guard agaist extra left-overs at the end of file, in case a later
+ update causes the file to shrink compared to earlier contents.
+ */
+ break;
+ }
}
}
}
@@ -750,7 +761,8 @@ int flush_master_info(Master_info* mi,
"\n\n\n\n\n\n\n\n\n\n\n"
"using_gtid=%d\n"
"do_domain_ids=%s\n"
- "ignore_domain_ids=%s\n",
+ "ignore_domain_ids=%s\n"
+ "END_MARKER\n",
LINES_IN_MASTER_INFO,
mi->master_log_name, llstr(mi->master_log_pos, lbuf),
mi->host, mi->user,
@@ -1703,4 +1715,165 @@ void prot_store_ids(THD *thd, DYNAMIC_ARRAY *ids)
return;
}
+/*
+ We need to handle per-Master_info command line options special.
+ Because when command line options are parsed, we do not yet have any
+ Master_info objects.
+*/
+static HASH mi_cmdline_hash;
+static bool mi_cmdline_hash_inited= false;
+struct mi_cmdline_entry {
+ LEX_STRING cmp_connection_name;
+ ulonglong parallel_mode;
+};
+
+
+static uchar *
+get_key_cmdline(const uchar *ptr, size_t *length_p,
+ my_bool unused __attribute__((unused)))
+{
+ mi_cmdline_entry *entry= (mi_cmdline_entry *)ptr;
+ *length_p= entry->cmp_connection_name.length;
+ return (uchar *)entry->cmp_connection_name.str;
+}
+
+
+static void
+free_key_cmdline(void *ptr)
+{
+ mi_cmdline_entry *entry= (mi_cmdline_entry *)ptr;
+ my_free(entry->cmp_connection_name.str);
+ my_free(entry);
+}
+
+
+int
+mi_cmdline_init()
+{
+ if (mi_cmdline_hash_inited)
+ return 0;
+ if (my_hash_init(&mi_cmdline_hash, system_charset_info,
+ MAX_REPLICATION_THREAD, 0, 0, get_key_cmdline,
+ free_key_cmdline, HASH_UNIQUE))
+ {
+ sql_print_error("Initializing Master_info command line option hash table failed");
+ return 1;
+ }
+ mi_cmdline_hash_inited= true;
+ return 0;
+}
+
+
+void
+mi_cmdline_destroy()
+{
+ if (mi_cmdline_hash_inited)
+ my_hash_free(&mi_cmdline_hash);
+ mi_cmdline_hash_inited= false;
+}
+
+
+static mi_cmdline_entry *
+mi_cmdline_entry_get(LEX_STRING *connection_name, bool create_if_missing)
+{
+ LEX_STRING cmp_connection_name;
+ mi_cmdline_entry *entry;
+
+ if (!mi_cmdline_hash_inited &&
+ (!create_if_missing || mi_cmdline_init()))
+ return NULL;
+
+ /* Create a lowercase key for hash lookup. */
+ cmp_connection_name.length= connection_name->length;
+ if (!(cmp_connection_name.str= (char *)my_malloc(connection_name->length+1,
+ MYF(MY_WME))))
+ {
+ if (create_if_missing)
+ my_error(ER_OUTOFMEMORY, MYF(0), connection_name->length+1);
+ return NULL;
+ }
+ memcpy(cmp_connection_name.str, connection_name->str,
+ connection_name->length + 1);
+ my_casedn_str(system_charset_info, cmp_connection_name.str);
+
+ if ((entry= (mi_cmdline_entry *)
+ my_hash_search(&mi_cmdline_hash, (uchar *)cmp_connection_name.str,
+ cmp_connection_name.length)) ||
+ !create_if_missing)
+ {
+ my_free(cmp_connection_name.str);
+ return entry;
+ }
+
+ if (!(entry= (mi_cmdline_entry *)my_malloc(sizeof(*entry), MYF(MY_WME))))
+ {
+ my_error(ER_OUTOFMEMORY, MYF(0), sizeof(*entry));
+ my_free(cmp_connection_name.str);
+ return NULL;
+ }
+ entry->cmp_connection_name= cmp_connection_name;
+ entry->parallel_mode= opt_slave_parallel_mode;
+ if (my_hash_insert(&mi_cmdline_hash, (uchar *)entry))
+ {
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ my_free(entry);
+ my_free(cmp_connection_name.str);
+ return NULL;
+ }
+ return entry;
+}
+
+
+/*
+ Look up a command line value for slave_parallel_mode. The value is returned
+ in *out_mode. If no command line value was given for this particular
+ connection name, the default value in opt_slave_parallel_mode is used.
+*/
+void
+mi_slave_parallel_mode_lookup(LEX_STRING *connection_name, ulonglong *out_mode)
+{
+ mi_cmdline_entry *entry;
+
+ if (!mi_cmdline_hash_inited ||
+ !(entry= mi_cmdline_entry_get(connection_name, false)))
+ *out_mode= opt_slave_parallel_mode;
+ else
+ *out_mode= entry->parallel_mode;
+}
+
+
+/*
+ Get a pointer to the location holding the value of the slave_parallel_mode
+ command line option for the given connection name. The pointer is returned
+ in *out_mode_ptr
+
+ If create_if_missing is true, then a new entry will be created if one did
+ not already exists. If false, then NULL will be returned in *out_mode_ptr if
+ an entry does not exist.
+
+ Returns 1 on error, 0 if ok.
+*/
+int
+mi_slave_parallel_mode_ptr(LEX_STRING *connection_name,
+ ulonglong **out_mode_ptr, bool create_if_missing)
+{
+ mi_cmdline_entry *entry;
+
+ *out_mode_ptr= NULL;
+ if (!create_if_missing && !mi_cmdline_hash_inited)
+ return 0;
+
+ entry= mi_cmdline_entry_get(connection_name, create_if_missing);
+ if (!entry)
+ {
+ if (create_if_missing)
+ return 1;
+ else
+ return 0;
+ }
+ *out_mode_ptr= &entry->parallel_mode;
+ return 0;
+}
+
+
#endif /* HAVE_REPLICATION */