diff options
Diffstat (limited to 'sql')
-rw-r--r-- | sql/handler.cc | 48 | ||||
-rw-r--r-- | sql/log.cc | 2 | ||||
-rw-r--r-- | sql/mysqld.cc | 23 | ||||
-rw-r--r-- | sql/sql_class.cc | 4 | ||||
-rw-r--r-- | sql/sql_class.h | 2 | ||||
-rw-r--r-- | sql/sql_parse.cc | 93 | ||||
-rw-r--r-- | sql/sql_partition_admin.cc | 2 | ||||
-rw-r--r-- | sql/sql_table.cc | 14 | ||||
-rw-r--r-- | sql/sys_vars.cc | 12 | ||||
-rw-r--r-- | sql/wsrep_applier.cc | 7 | ||||
-rw-r--r-- | sql/wsrep_hton.cc | 1 | ||||
-rw-r--r-- | sql/wsrep_mysqld.cc | 231 | ||||
-rw-r--r-- | sql/wsrep_mysqld.h | 7 | ||||
-rw-r--r-- | sql/wsrep_sst.cc | 9 | ||||
-rw-r--r-- | sql/wsrep_utils.cc | 14 | ||||
-rw-r--r-- | sql/wsrep_utils.h | 3 | ||||
-rw-r--r-- | sql/wsrep_var.cc | 49 | ||||
-rw-r--r-- | sql/wsrep_var.h | 4 |
18 files changed, 342 insertions, 183 deletions
diff --git a/sql/handler.cc b/sql/handler.cc index d5d57f3e9a8..6b7185ec1c7 100644 --- a/sql/handler.cc +++ b/sql/handler.cc @@ -72,6 +72,14 @@ KEY_CREATE_INFO default_key_create_info= ulong total_ha= 0; /* number of storage engines (from handlertons[]) that support 2pc */ ulong total_ha_2pc= 0; +#ifndef DBUG_OFF +/* + Number of non-mandatory 2pc handlertons whose initialization failed + to estimate total_ha_2pc value under supposition of the failures + have not occcured. +*/ +ulong failed_ha_2pc= 0; +#endif /* size of savepoint storage area (see ha_init) */ ulong savepoint_alloc_size= 0; @@ -648,6 +656,10 @@ err_deinit: (void) plugin->plugin->deinit(NULL); err: +#ifndef DBUG_OFF + if (hton->prepare && hton->state == SHOW_OPTION_YES) + failed_ha_2pc++; +#endif my_free(hton); err_no_hton_memory: plugin->data= NULL; @@ -1659,6 +1671,11 @@ int ha_rollback_trans(THD *thd, bool all) { // cannot happen my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err); error=1; +#ifdef WITH_WSREP + WSREP_WARN("handlerton rollback failed, thd %lu %lld conf %d SQL %s", + thd->thread_id, thd->query_id, thd->wsrep_conflict_state, + thd->query()); +#endif /* WITH_WSREP */ } status_var_increment(thd->status_var.ha_rollback_count); ha_info_next= ha_info->next(); @@ -1853,7 +1870,7 @@ static my_bool xarecover_handlerton(THD *unused, plugin_ref plugin, { #ifndef DBUG_OFF char buf[XIDDATASIZE*4+6]; // see xid_to_str - sql_print_information("ignore xid %s", xid_to_str(buf, info->list+i)); + DBUG_PRINT("info", ("ignore xid %s", xid_to_str(buf, info->list+i))); #endif xid_cache_insert(info->list+i, XA_PREPARED); info->found_foreign_xids++; @@ -1870,19 +1887,31 @@ static my_bool xarecover_handlerton(THD *unused, plugin_ref plugin, tc_heuristic_recover == TC_HEURISTIC_RECOVER_COMMIT) { #ifndef DBUG_OFF - char buf[XIDDATASIZE*4+6]; // see xid_to_str - sql_print_information("commit xid %s", xid_to_str(buf, info->list+i)); + int rc= +#endif + hton->commit_by_xid(hton, info->list+i); +#ifndef DBUG_OFF + if (rc == 0) + { + char buf[XIDDATASIZE*4+6]; // see xid_to_str + DBUG_PRINT("info", ("commit xid %s", xid_to_str(buf, info->list+i))); + } #endif - hton->commit_by_xid(hton, info->list+i); } else { #ifndef DBUG_OFF - char buf[XIDDATASIZE*4+6]; // see xid_to_str - sql_print_information("rollback xid %s", - xid_to_str(buf, info->list+i)); + int rc= +#endif + hton->rollback_by_xid(hton, info->list+i); +#ifndef DBUG_OFF + if (rc == 0) + { + char buf[XIDDATASIZE*4+6]; // see xid_to_str + DBUG_PRINT("info", ("rollback xid %s", + xid_to_str(buf, info->list+i))); + } #endif - hton->rollback_by_xid(hton, info->list+i); } } if (got < info->len) @@ -1904,7 +1933,8 @@ int ha_recover(HASH *commit_list) /* commit_list and tc_heuristic_recover cannot be set both */ DBUG_ASSERT(info.commit_list==0 || tc_heuristic_recover==0); /* if either is set, total_ha_2pc must be set too */ - DBUG_ASSERT(info.dry_run || total_ha_2pc>(ulong)opt_bin_log); + DBUG_ASSERT(info.dry_run || + (failed_ha_2pc + total_ha_2pc) > (ulong)opt_bin_log); if (total_ha_2pc <= (ulong)opt_bin_log) DBUG_RETURN(0); diff --git a/sql/log.cc b/sql/log.cc index 5a5f3cdf808..de501d94100 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -9317,8 +9317,10 @@ int TC_LOG_BINLOG::open(const char *opt_name) if (using_heuristic_recover()) { + mysql_mutex_lock(&LOCK_log); /* generate a new binlog to mask a corrupted one */ open(opt_name, LOG_BIN, 0, 0, WRITE_CACHE, max_binlog_size, 0, TRUE); + mysql_mutex_unlock(&LOCK_log); cleanup(); return 1; } diff --git a/sql/mysqld.cc b/sql/mysqld.cc index 06e0c86ecf4..01d65413f08 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -9311,8 +9311,29 @@ mysqld_get_one_option(int optid, const struct my_option *opt, char *argument) } #ifdef WITH_WSREP case OPT_WSREP_CAUSAL_READS: - wsrep_causal_reads_update(&global_system_variables); + { + if (global_system_variables.wsrep_causal_reads) + { + WSREP_WARN("option --wsrep-causal-reads is deprecated"); + if (!(global_system_variables.wsrep_sync_wait & WSREP_SYNC_WAIT_BEFORE_READ)) + { + WSREP_WARN("--wsrep-causal-reads=ON takes precedence over --wsrep-sync-wait=%u. " + "WSREP_SYNC_WAIT_BEFORE_READ is on", + global_system_variables.wsrep_sync_wait); + global_system_variables.wsrep_sync_wait |= WSREP_SYNC_WAIT_BEFORE_READ; + } + } + else + { + if (global_system_variables.wsrep_sync_wait & WSREP_SYNC_WAIT_BEFORE_READ) { + WSREP_WARN("--wsrep-sync-wait=%u takes precedence over --wsrep-causal-reads=OFF. " + "WSREP_SYNC_WAIT_BEFORE_READ is on", + global_system_variables.wsrep_sync_wait); + global_system_variables.wsrep_causal_reads = 1; + } + } break; + } case OPT_WSREP_SYNC_WAIT: global_system_variables.wsrep_causal_reads= MY_TEST(global_system_variables.wsrep_sync_wait & diff --git a/sql/sql_class.cc b/sql/sql_class.cc index 1b176d35512..e8fe4afc690 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -873,6 +873,8 @@ THD::THD(my_thread_id id, bool is_wsrep_applier) wsrep_info[sizeof(wsrep_info) - 1] = '\0'; /* make sure it is 0-terminated */ wsrep_sync_wait_gtid = WSREP_GTID_UNDEFINED; wsrep_affected_rows = 0; + wsrep_replicate_GTID = false; + wsrep_skip_wsrep_GTID = false; #endif /* Call to init() below requires fully initialized Open_tables_state. */ reset_open_tables_state(this); @@ -1304,6 +1306,8 @@ void THD::init(void) wsrep_TOI_pre_query_len = 0; wsrep_sync_wait_gtid = WSREP_GTID_UNDEFINED; wsrep_affected_rows = 0; + wsrep_replicate_GTID = false; + wsrep_skip_wsrep_GTID = false; #endif /* WITH_WSREP */ if (variables.sql_log_bin) diff --git a/sql/sql_class.h b/sql/sql_class.h index ba72c46556d..e1be5dc91f0 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -4308,6 +4308,8 @@ public: bool wsrep_ignore_table; wsrep_gtid_t wsrep_sync_wait_gtid; ulong wsrep_affected_rows; + bool wsrep_replicate_GTID; + bool wsrep_skip_wsrep_GTID; #endif /* WITH_WSREP */ /* Handling of timeouts for commands */ diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc index 5ebc9f0087d..efa48c6e241 100644 --- a/sql/sql_parse.cc +++ b/sql/sql_parse.cc @@ -3385,6 +3385,7 @@ mysql_execute_command(THD *thd) #endif case SQLCOM_SHOW_STATUS: { + WSREP_SYNC_WAIT(thd, WSREP_SYNC_WAIT_BEFORE_SHOW); execute_show_status(thd, all_tables); break; } @@ -3423,21 +3424,21 @@ mysql_execute_command(THD *thd) case SQLCOM_SHOW_TABLE_STATUS: case SQLCOM_SHOW_OPEN_TABLES: case SQLCOM_SHOW_GENERIC: + case SQLCOM_SHOW_PLUGINS: case SQLCOM_SHOW_FIELDS: case SQLCOM_SHOW_KEYS: - case SQLCOM_SELECT: - if (WSREP_CLIENT(thd) && wsrep_sync_wait(thd)) - goto error; - /* fall through */ - case SQLCOM_SHOW_PLUGINS: case SQLCOM_SHOW_VARIABLES: case SQLCOM_SHOW_CHARSETS: case SQLCOM_SHOW_COLLATIONS: case SQLCOM_SHOW_STORAGE_ENGINES: case SQLCOM_SHOW_PROFILE: - { + case SQLCOM_SELECT: + { #ifdef WITH_WSREP - DBUG_ASSERT(thd->wsrep_exec_mode != REPL_RECV); + if (lex->sql_command == SQLCOM_SELECT) + WSREP_SYNC_WAIT(thd, WSREP_SYNC_WAIT_BEFORE_READ) + else + WSREP_SYNC_WAIT(thd, WSREP_SYNC_WAIT_BEFORE_SHOW) #endif /* WITH_WSREP */ thd->status_var.last_query_cost= 0.0; @@ -3566,6 +3567,7 @@ mysql_execute_command(THD *thd) case SQLCOM_SHOW_RELAYLOG_EVENTS: /* fall through */ case SQLCOM_SHOW_BINLOG_EVENTS: { + WSREP_SYNC_WAIT(thd, WSREP_SYNC_WAIT_BEFORE_SHOW); if (check_global_access(thd, REPL_SLAVE_ACL)) goto error; res = mysql_show_binlog_events(thd); @@ -4144,6 +4146,7 @@ end_with_restore_list: { if (check_global_access(thd, SUPER_ACL | REPL_CLIENT_ACL)) goto error; + WSREP_SYNC_WAIT(thd, WSREP_SYNC_WAIT_BEFORE_SHOW); res = show_binlogs(thd); break; } @@ -4151,15 +4154,13 @@ end_with_restore_list: #endif /* EMBEDDED_LIBRARY */ case SQLCOM_SHOW_CREATE: { - DBUG_ASSERT(first_table == all_tables && first_table != 0); + DBUG_ASSERT(first_table == all_tables && first_table != 0); #ifdef DONT_ALLOW_SHOW_COMMANDS my_message(ER_NOT_ALLOWED_COMMAND, ER_THD(thd, ER_NOT_ALLOWED_COMMAND), MYF(0)); /* purecov: inspected */ goto error; #else - - if (WSREP_CLIENT(thd) && wsrep_sync_wait(thd)) - goto error; + WSREP_SYNC_WAIT(thd, WSREP_SYNC_WAIT_BEFORE_SHOW); /* Access check: @@ -4223,8 +4224,7 @@ end_with_restore_list: case SQLCOM_CHECKSUM: { DBUG_ASSERT(first_table == all_tables && first_table != 0); - if (WSREP_CLIENT(thd) && wsrep_sync_wait(thd)) - goto error; + WSREP_SYNC_WAIT(thd, WSREP_SYNC_WAIT_BEFORE_READ); if (check_table_access(thd, SELECT_ACL, all_tables, FALSE, UINT_MAX, FALSE)) @@ -4235,6 +4235,7 @@ end_with_restore_list: } case SQLCOM_UPDATE: { + WSREP_SYNC_WAIT(thd, WSREP_SYNC_WAIT_BEFORE_UPDATE_DELETE); ha_rows found= 0, updated= 0; DBUG_ASSERT(first_table == all_tables && first_table != 0); if (WSREP_CLIENT(thd) && @@ -4277,9 +4278,7 @@ end_with_restore_list: /* if we switched from normal update, rights are checked */ if (up_result != 2) { - if (WSREP_CLIENT(thd) && - wsrep_sync_wait(thd, WSREP_SYNC_WAIT_BEFORE_UPDATE_DELETE)) - goto error; + WSREP_SYNC_WAIT(thd, WSREP_SYNC_WAIT_BEFORE_UPDATE_DELETE); if ((res= multi_update_precheck(thd, all_tables))) break; } @@ -4349,10 +4348,6 @@ end_with_restore_list: break; } case SQLCOM_REPLACE: - { - if (WSREP_CLIENT(thd) && - wsrep_sync_wait(thd, WSREP_SYNC_WAIT_BEFORE_INSERT_REPLACE)) - goto error; #ifndef DBUG_OFF if (mysql_bin_log.is_open()) { @@ -4387,10 +4382,10 @@ end_with_restore_list: DBUG_PRINT("debug", ("Just after generate_incident()")); } #endif - } /* fall through */ case SQLCOM_INSERT: { + WSREP_SYNC_WAIT(thd, WSREP_SYNC_WAIT_BEFORE_INSERT_REPLACE); DBUG_ASSERT(first_table == all_tables && first_table != 0); if (WSREP_CLIENT(thd) && @@ -4449,6 +4444,7 @@ end_with_restore_list: case SQLCOM_REPLACE_SELECT: case SQLCOM_INSERT_SELECT: { + WSREP_SYNC_WAIT(thd, WSREP_SYNC_WAIT_BEFORE_INSERT_REPLACE); select_result *sel_result; bool explain= MY_TEST(lex->describe); DBUG_ASSERT(first_table == all_tables && first_table != 0); @@ -4572,6 +4568,7 @@ end_with_restore_list: } case SQLCOM_DELETE: { + WSREP_SYNC_WAIT(thd, WSREP_SYNC_WAIT_BEFORE_UPDATE_DELETE); select_result *sel_result=lex->result; DBUG_ASSERT(first_table == all_tables && first_table != 0); if (WSREP_CLIENT(thd) && @@ -4632,6 +4629,7 @@ end_with_restore_list: } case SQLCOM_DELETE_MULTI: { + WSREP_SYNC_WAIT(thd, WSREP_SYNC_WAIT_BEFORE_UPDATE_DELETE); DBUG_ASSERT(first_table == all_tables && first_table != 0); TABLE_LIST *aux_tables= thd->lex->auxiliary_table_list.first; multi_delete *result; @@ -4712,19 +4710,6 @@ end_with_restore_list: /* So that DROP TEMPORARY TABLE gets to binlog at commit/rollback */ thd->variables.option_bits|= OPTION_KEEP_LOG; } - if (WSREP(thd)) - { - for (TABLE_LIST *table= all_tables; table; table= table->next_global) - { - if (!lex->tmp_table() && - (!thd->is_current_stmt_binlog_format_row() || - !thd->find_temporary_table(table))) - { - WSREP_TO_ISOLATION_BEGIN(NULL, NULL, all_tables); - break; - } - } - } /* If we are a slave, we should add IF EXISTS if the query executed on the master without an error. This will help a slave to @@ -4734,6 +4719,20 @@ end_with_restore_list: if (thd->slave_thread && !thd->slave_expected_error && slave_ddl_exec_mode_options == SLAVE_EXEC_MODE_IDEMPOTENT) lex->create_info.set(DDL_options_st::OPT_IF_EXISTS); + + if (WSREP(thd)) + { + for (TABLE_LIST *table= all_tables; table; table= table->next_global) + { + if (!lex->tmp_table() && + (!thd->is_current_stmt_binlog_format_row() || + !thd->find_temporary_table(table))) + { + WSREP_TO_ISOLATION_BEGIN(NULL, NULL, all_tables); + break; + } + } + } /* DDL and binlog write order are protected by metadata locks. */ res= mysql_rm_table(thd, first_table, lex->if_exists(), lex->tmp_table()); @@ -5047,9 +5046,7 @@ end_with_restore_list: db_name.length= lex->name.length; strmov(db_name.str, lex->name.str); -#ifdef WITH_WSREP - if (WSREP_CLIENT(thd) && wsrep_sync_wait(thd)) goto error; -#endif /* WITH_WSREP */ + WSREP_SYNC_WAIT(thd, WSREP_SYNC_WAIT_BEFORE_SHOW); if (check_db_name(&db_name)) { @@ -5104,9 +5101,7 @@ end_with_restore_list: /* lex->unit.cleanup() is called outside, no need to call it here */ break; case SQLCOM_SHOW_CREATE_EVENT: -#ifdef WITH_WSREP - if (WSREP_CLIENT(thd) && wsrep_sync_wait(thd)) goto error; -#endif /* WITH_WSREP */ + WSREP_SYNC_WAIT(thd, WSREP_SYNC_WAIT_BEFORE_SHOW); res= Events::show_create_event(thd, lex->spname->m_db, lex->spname->m_name); break; @@ -5490,6 +5485,7 @@ end_with_restore_list: if (!grant_user) goto error; + WSREP_SYNC_WAIT(thd, WSREP_SYNC_WAIT_BEFORE_SHOW); res = mysql_show_grants(thd, grant_user); break; } @@ -5969,18 +5965,14 @@ end_with_restore_list: } case SQLCOM_SHOW_CREATE_PROC: { -#ifdef WITH_WSREP - if (WSREP_CLIENT(thd) && wsrep_sync_wait(thd)) goto error; -#endif /* WITH_WSREP */ + WSREP_SYNC_WAIT(thd, WSREP_SYNC_WAIT_BEFORE_SHOW); if (sp_show_create_routine(thd, TYPE_ENUM_PROCEDURE, lex->spname)) goto error; break; } case SQLCOM_SHOW_CREATE_FUNC: { -#ifdef WITH_WSREP - if (WSREP_CLIENT(thd) && wsrep_sync_wait(thd)) goto error; -#endif /* WITH_WSREP */ + WSREP_SYNC_WAIT(thd, WSREP_SYNC_WAIT_BEFORE_SHOW); if (sp_show_create_routine(thd, TYPE_ENUM_FUNCTION, lex->spname)) goto error; break; @@ -5993,9 +5985,7 @@ end_with_restore_list: stored_procedure_type type= (lex->sql_command == SQLCOM_SHOW_PROC_CODE ? TYPE_ENUM_PROCEDURE : TYPE_ENUM_FUNCTION); -#ifdef WITH_WSREP - if (WSREP_CLIENT(thd) && wsrep_sync_wait(thd)) goto error; -#endif /* WITH_WSREP */ + WSREP_SYNC_WAIT(thd, WSREP_SYNC_WAIT_BEFORE_SHOW); if (sp_cache_routine(thd, type, lex->spname, FALSE, &sp)) goto error; if (!sp || sp->show_routine_code(thd)) @@ -6017,9 +6007,7 @@ end_with_restore_list: if (check_ident_length(&lex->spname->m_name)) goto error; -#ifdef WITH_WSREP - if (WSREP_CLIENT(thd) && wsrep_sync_wait(thd)) goto error; -#endif /* WITH_WSREP */ + WSREP_SYNC_WAIT(thd, WSREP_SYNC_WAIT_BEFORE_SHOW); if (show_create_trigger(thd, lex->spname)) goto error; /* Error has been already logged. */ @@ -6477,6 +6465,7 @@ static bool execute_show_status(THD *thd, TABLE_LIST *all_tables) if (!(res= check_table_access(thd, SELECT_ACL, all_tables, FALSE, UINT_MAX, FALSE))) res= execute_sqlcom_select(thd, all_tables); + /* Don't log SHOW STATUS commands to slow query log */ thd->server_status&= ~(SERVER_QUERY_NO_INDEX_USED | SERVER_QUERY_NO_GOOD_INDEX_USED); diff --git a/sql/sql_partition_admin.cc b/sql/sql_partition_admin.cc index f5e47206963..519b7cc8195 100644 --- a/sql/sql_partition_admin.cc +++ b/sql/sql_partition_admin.cc @@ -785,7 +785,7 @@ bool Sql_cmd_alter_table_truncate_partition::execute(THD *thd) (!thd->is_current_stmt_binlog_format_row() || !thd->find_temporary_table(first_table)) && wsrep_to_isolation_begin( - thd, first_table->db, first_table->table_name, NULL) + thd, first_table->db, first_table->table_name, NULL) ) { WSREP_WARN("ALTER TABLE TRUNCATE PARTITION isolation failure"); diff --git a/sql/sql_table.cc b/sql/sql_table.cc index 8191782f24d..0d539758b68 100644 --- a/sql/sql_table.cc +++ b/sql/sql_table.cc @@ -2090,6 +2090,7 @@ bool mysql_rm_table(THD *thd,TABLE_LIST *tables, my_bool if_exists, DBUG_RETURN(TRUE); my_ok(thd); DBUG_RETURN(FALSE); + } @@ -2136,7 +2137,6 @@ static uint32 comment_length(THD *thd, uint32 comment_pos, return 0; } - /** Execute the drop of a normal or temporary table. @@ -2560,6 +2560,9 @@ err: /* Chop of the last comma */ built_non_trans_tmp_query.chop(); built_non_trans_tmp_query.append(" /* generated by server */"); +#ifdef WITH_WSREP + thd->wsrep_skip_wsrep_GTID = true; +#endif /* WITH_WSREP */ error |= thd->binlog_query(THD::STMT_QUERY_TYPE, built_non_trans_tmp_query.ptr(), built_non_trans_tmp_query.length(), @@ -2572,6 +2575,9 @@ err: /* Chop of the last comma */ built_trans_tmp_query.chop(); built_trans_tmp_query.append(" /* generated by server */"); +#ifdef WITH_WSREP + thd->wsrep_skip_wsrep_GTID = true; +#endif /* WITH_WSREP */ error |= thd->binlog_query(THD::STMT_QUERY_TYPE, built_trans_tmp_query.ptr(), built_trans_tmp_query.length(), @@ -2586,6 +2592,9 @@ err: built_query.append(" /* generated by server */"); int error_code = non_tmp_error ? thd->get_stmt_da()->sql_errno() : 0; +#ifdef WITH_WSREP + thd->wsrep_skip_wsrep_GTID = false; +#endif /* WITH_WSREP */ error |= thd->binlog_query(THD::STMT_QUERY_TYPE, built_query.ptr(), built_query.length(), @@ -2634,6 +2643,9 @@ err: } end: +#ifdef WITH_WSREP + thd->wsrep_skip_wsrep_GTID = false; +#endif /* WITH_WSREP */ DBUG_RETURN(error); } diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc index bbb2d36f2d5..76de63c6192 100644 --- a/sql/sys_vars.cc +++ b/sql/sys_vars.cc @@ -4936,7 +4936,7 @@ static Sys_var_ulong Sys_wsrep_slave_threads( GLOBAL_VAR(wsrep_slave_threads), CMD_LINE(REQUIRED_ARG), VALID_RANGE(1, 512), DEFAULT(1), BLOCK_SIZE(1), &PLock_wsrep_slave_threads, NOT_IN_BINLOG, - ON_CHECK(wsrep_slave_threads_check), + ON_CHECK(NULL), ON_UPDATE(wsrep_slave_threads_update)); static Sys_var_charptr Sys_wsrep_dbug_option( @@ -5048,21 +5048,13 @@ static Sys_var_mybool Sys_wsrep_certify_nonPK( GLOBAL_VAR(wsrep_certify_nonPK), CMD_LINE(OPT_ARG), DEFAULT(TRUE)); -static bool fix_wsrep_causal_reads(sys_var *self, THD* thd, enum_var_type var_type) -{ - if (var_type == OPT_GLOBAL) - wsrep_causal_reads_update(&global_system_variables); - else - wsrep_causal_reads_update(&thd->variables); - return false; -} static Sys_var_mybool Sys_wsrep_causal_reads( "wsrep_causal_reads", "Setting this variable is equivalent " "to setting wsrep_sync_wait READ flag", SESSION_VAR(wsrep_causal_reads), CMD_LINE(OPT_ARG, OPT_WSREP_CAUSAL_READS), DEFAULT(FALSE), NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0), - ON_UPDATE(fix_wsrep_causal_reads), + ON_UPDATE(wsrep_causal_reads_update), DEPRECATED("'@@wsrep_sync_wait=1'")); static Sys_var_uint Sys_wsrep_sync_wait( diff --git a/sql/wsrep_applier.cc b/sql/wsrep_applier.cc index cef59513485..a299ddf074f 100644 --- a/sql/wsrep_applier.cc +++ b/sql/wsrep_applier.cc @@ -217,12 +217,15 @@ wsrep_cb_status_t wsrep_apply_cb(void* const ctx, { THD* const thd((THD*)ctx); + assert(thd->wsrep_apply_toi == false); + // Allow tests to block the applier thread using the DBUG facilities. DBUG_EXECUTE_IF("sync.wsrep_apply_cb", { const char act[]= "now " - "wait_for signal.wsrep_apply_cb"; + "SIGNAL sync.wsrep_apply_cb_reached " + "WAIT_FOR signal.wsrep_apply_cb"; DBUG_ASSERT(!debug_sync_set_action(thd, STRING_WITH_LEN(act))); };); @@ -382,7 +385,7 @@ wsrep_cb_status_t wsrep_commit_cb(void* const ctx, mysql_mutex_unlock(&LOCK_wsrep_slave_threads); } - if (*exit == false && thd->wsrep_applier) + if (thd->wsrep_applier) { /* From trans_begin() */ thd->variables.option_bits|= OPTION_BEGIN; diff --git a/sql/wsrep_hton.cc b/sql/wsrep_hton.cc index e34f1b92d52..ba11d94e6fa 100644 --- a/sql/wsrep_hton.cc +++ b/sql/wsrep_hton.cc @@ -42,6 +42,7 @@ void wsrep_cleanup_transaction(THD *thd) thd->wsrep_trx_meta.depends_on= WSREP_SEQNO_UNDEFINED; thd->wsrep_exec_mode= LOCAL_STATE; thd->wsrep_affected_rows= 0; + thd->wsrep_skip_wsrep_GTID= false; return; } diff --git a/sql/wsrep_mysqld.cc b/sql/wsrep_mysqld.cc index 30ae2c77fa3..d95c85f3008 100644 --- a/sql/wsrep_mysqld.cc +++ b/sql/wsrep_mysqld.cc @@ -2,7 +2,7 @@ This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by - the Free Software Foundation; version 2 of the License. + the Free Software Foundation; version 2 of the License.x1 This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of @@ -16,6 +16,7 @@ #include <mysqld.h> #include <sql_class.h> #include <sql_parse.h> +#include <sql_base.h> /* find_temporary_table() */ #include "slave.h" #include "rpl_mi.h" #include "sql_repl.h" @@ -983,8 +984,6 @@ bool wsrep_must_sync_wait (THD* thd, uint mask) { return (thd->variables.wsrep_sync_wait & mask) && thd->variables.wsrep_on && - !(thd->variables.wsrep_dirty_reads && - !is_update_query(thd->lex->sql_command)) && !thd->in_active_multi_stmt_transaction() && thd->wsrep_conflict_state != REPLAYING && thd->wsrep_sync_wait_gtid.seqno == WSREP_SEQNO_UNDEFINED; @@ -1113,83 +1112,70 @@ static bool wsrep_prepare_keys_for_isolation(THD* thd, const TABLE_LIST* table_list, wsrep_key_arr_t* ka) { - ka->keys= 0; - ka->keys_len= 0; + ka->keys= 0; + ka->keys_len= 0; - if (db || table) + if (db || table) + { + if (!(ka->keys= (wsrep_key_t*)my_malloc(sizeof(wsrep_key_t), MYF(0)))) + { + WSREP_ERROR("Can't allocate memory for key_array"); + goto err; + } + ka->keys_len= 1; + if (!(ka->keys[0].key_parts= (wsrep_buf_t*) + my_malloc(sizeof(wsrep_buf_t)*2, MYF(0)))) { - TABLE_LIST tmp_table; + WSREP_ERROR("Can't allocate memory for key_parts"); + goto err; + } + ka->keys[0].key_parts_num= 2; + if (!wsrep_prepare_key_for_isolation( + db, table, + (wsrep_buf_t*)ka->keys[0].key_parts, + &ka->keys[0].key_parts_num)) + { + WSREP_ERROR("Preparing keys for isolation failed (1)"); + goto err; + } + } - memset(&tmp_table, 0, sizeof(tmp_table)); - tmp_table.table_name= (char*)table; - tmp_table.db= (char*)db; - tmp_table.mdl_request.init(MDL_key::GLOBAL, (db) ? db : "", - (table) ? table : "", - MDL_INTENTION_EXCLUSIVE, MDL_STATEMENT); + for (const TABLE_LIST* table= table_list; table; table= table->next_global) + { + wsrep_key_t* tmp; + if (ka->keys) + tmp= (wsrep_key_t*)my_realloc(ka->keys, + (ka->keys_len + 1) * sizeof(wsrep_key_t), + MYF(0)); + else + tmp= (wsrep_key_t*)my_malloc((ka->keys_len + 1) * sizeof(wsrep_key_t), MYF(0)); - if (!table || !thd->find_temporary_table(&tmp_table)) - { - if (!(ka->keys= (wsrep_key_t*)my_malloc(sizeof(wsrep_key_t), MYF(0)))) - { - WSREP_ERROR("Can't allocate memory for key_array"); - goto err; - } - ka->keys_len= 1; - if (!(ka->keys[0].key_parts= (wsrep_buf_t*) - my_malloc(sizeof(wsrep_buf_t)*2, MYF(0)))) - { - WSREP_ERROR("Can't allocate memory for key_parts"); - goto err; - } - ka->keys[0].key_parts_num= 2; - if (!wsrep_prepare_key_for_isolation( - db, table, - (wsrep_buf_t*)ka->keys[0].key_parts, - &ka->keys[0].key_parts_num)) - { - WSREP_ERROR("Preparing keys for isolation failed"); - goto err; - } - } + if (!tmp) + { + WSREP_ERROR("Can't allocate memory for key_array"); + goto err; } - - for (const TABLE_LIST* table= table_list; table; table= table->next_global) + ka->keys= tmp; + if (!(ka->keys[ka->keys_len].key_parts= (wsrep_buf_t*) + my_malloc(sizeof(wsrep_buf_t)*2, MYF(0)))) { - if (!thd->find_temporary_table(table)) - { - wsrep_key_t* tmp; - tmp= (wsrep_key_t*)my_realloc( - ka->keys, (ka->keys_len + 1) * sizeof(wsrep_key_t), - MYF(MY_ALLOW_ZERO_PTR)); - - if (!tmp) - { - WSREP_ERROR("Can't allocate memory for key_array"); - goto err; - } - ka->keys= tmp; - if (!(ka->keys[ka->keys_len].key_parts= (wsrep_buf_t*) - my_malloc(sizeof(wsrep_buf_t)*2, MYF(0)))) - { - WSREP_ERROR("Can't allocate memory for key_parts"); - goto err; - } - ka->keys[ka->keys_len].key_parts_num= 2; - ++ka->keys_len; - if (!wsrep_prepare_key_for_isolation( - table->db, table->table_name, - (wsrep_buf_t*)ka->keys[ka->keys_len - 1].key_parts, - &ka->keys[ka->keys_len - 1].key_parts_num)) - { - WSREP_ERROR("Preparing keys for isolation failed"); - goto err; - } - } + WSREP_ERROR("Can't allocate memory for key_parts"); + goto err; } - return true; + ka->keys[ka->keys_len].key_parts_num= 2; + ++ka->keys_len; + if (!wsrep_prepare_key_for_isolation(table->db, table->table_name, + (wsrep_buf_t*)ka->keys[ka->keys_len - 1].key_parts, + &ka->keys[ka->keys_len - 1].key_parts_num)) + { + WSREP_ERROR("Preparing keys for isolation failed (2)"); + goto err; + } + } + return 0; err: wsrep_keys_free(ka); - return false; + return 1; } @@ -1404,6 +1390,84 @@ static int wsrep_create_sp(THD *thd, uchar** buf, size_t* buf_len); static int wsrep_create_trigger_query(THD *thd, uchar** buf, size_t* buf_len); /* + Decide if statement should run in TOI. + + Look if table or table_list contain temporary tables. If the + statement affects only temporary tables, statement should not run + in TOI. If the table list contains mix of regular and temporary tables + (DROP TABLE, OPTIMIZE, ANALYZE), statement should be run in TOI but + should be rewritten at later time for replication to contain only + non-temporary tables. + */ +static bool wsrep_can_run_in_toi(THD *thd, const char *db, const char *table, + const TABLE_LIST *table_list) +{ + DBUG_ASSERT(!table || db); + DBUG_ASSERT(table_list || db); + + LEX* lex= thd->lex; + SELECT_LEX* select_lex= &lex->select_lex; + TABLE_LIST* first_table= select_lex->table_list.first; + + switch (lex->sql_command) + { + case SQLCOM_CREATE_TABLE: + DBUG_ASSERT(!table_list); + if (thd->lex->create_info.options & HA_LEX_CREATE_TMP_TABLE) + { + return false; + } + return true; + + case SQLCOM_CREATE_VIEW: + + DBUG_ASSERT(!table_list); + DBUG_ASSERT(first_table); /* First table is view name */ + /* + If any of the remaining tables refer to temporary table error + is returned to client, so TOI can be skipped + */ + for (TABLE_LIST* it= first_table->next_global; it; it= it->next_global) + { + if (thd->find_temporary_table(it)) + { + return false; + } + } + return true; + + case SQLCOM_CREATE_TRIGGER: + + DBUG_ASSERT(!table_list); + DBUG_ASSERT(first_table); + + if (thd->find_temporary_table(first_table)) + { + return false; + } + return true; + + default: + if (table && !thd->find_temporary_table(db, table)) + { + return true; + } + + if (table_list) + { + for (TABLE_LIST* table= first_table; table; table= table->next_global) + { + if (!thd->find_temporary_table(table->db, table->table_name)) + { + return true; + } + } + } + return !(table || table_list); + } +} + +/* returns: 0: statement was replicated as TOI 1: TOI replication was skipped @@ -1418,6 +1482,12 @@ static int wsrep_TOI_begin(THD *thd, char *db_, char *table_, int buf_err; int rc= 0; + if (wsrep_can_run_in_toi(thd, db_, table_, table_list) == false) + { + WSREP_DEBUG("No TOI for %s", WSREP_QUERY(thd)); + return 1; + } + WSREP_DEBUG("TO BEGIN: %lld, %d : %s", (long long)wsrep_thd_trx_seqno(thd), thd->wsrep_exec_mode, thd->query() ); switch (thd->lex->sql_command) @@ -1445,16 +1515,16 @@ static int wsrep_TOI_begin(THD *thd, char *db_, char *table_, } /* fallthrough */ default: - buf_err= wsrep_to_buf_helper(thd, thd->query(), thd->query_length(), &buf, - &buf_len); + buf_err= wsrep_to_buf_helper(thd, thd->query(), thd->query_length(), + &buf, &buf_len); break; } wsrep_key_arr_t key_arr= {0, 0}; struct wsrep_buf buff = { buf, buf_len }; - if (!buf_err && - wsrep_prepare_keys_for_isolation(thd, db_, table_, table_list, &key_arr)&& - key_arr.keys_len > 0 && + if (!buf_err && + !wsrep_prepare_keys_for_isolation(thd, db_, table_, table_list, &key_arr) && + key_arr.keys_len > 0 && WSREP_OK == (ret = wsrep->to_execute_start(wsrep, thd->thread_id, key_arr.keys, key_arr.keys_len, &buff, 1, @@ -1651,9 +1721,12 @@ int wsrep_to_isolation_begin(THD *thd, char *db_, char *table_, if (thd->variables.wsrep_on && thd->wsrep_exec_mode==LOCAL_STATE) { switch (thd->variables.wsrep_OSU_method) { - case WSREP_OSU_TOI: ret = wsrep_TOI_begin(thd, db_, table_, - table_list); break; - case WSREP_OSU_RSU: ret = wsrep_RSU_begin(thd, db_, table_); break; + case WSREP_OSU_TOI: + ret = wsrep_TOI_begin(thd, db_, table_, table_list); + break; + case WSREP_OSU_RSU: + ret = wsrep_RSU_begin(thd, db_, table_); + break; default: WSREP_ERROR("Unsupported OSU method: %lu", thd->variables.wsrep_OSU_method); diff --git a/sql/wsrep_mysqld.h b/sql/wsrep_mysqld.h index 069801b43e3..3d46e17af99 100644 --- a/sql/wsrep_mysqld.h +++ b/sql/wsrep_mysqld.h @@ -99,11 +99,12 @@ enum enum_wsrep_OSU_method { enum enum_wsrep_sync_wait { WSREP_SYNC_WAIT_NONE = 0x0, - // show, select, begin + // select, begin WSREP_SYNC_WAIT_BEFORE_READ = 0x1, WSREP_SYNC_WAIT_BEFORE_UPDATE_DELETE = 0x2, WSREP_SYNC_WAIT_BEFORE_INSERT_REPLACE = 0x4, - WSREP_SYNC_WAIT_MAX = 0x7 + WSREP_SYNC_WAIT_BEFORE_SHOW = 0x8, + WSREP_SYNC_WAIT_MAX = 0xF }; // MySQL status variables @@ -223,6 +224,8 @@ extern wsrep_seqno_t wsrep_locked_seqno; #define WSREP_PROVIDER_EXISTS \ (wsrep_provider && strncasecmp(wsrep_provider, WSREP_NONE, FN_REFLEN)) +#define WSREP_QUERY(thd) (thd->query()) + extern void wsrep_ready_wait(); class Ha_trx_info; diff --git a/sql/wsrep_sst.cc b/sql/wsrep_sst.cc index 076da23967a..05cf36fd9dc 100644 --- a/sql/wsrep_sst.cc +++ b/sql/wsrep_sst.cc @@ -746,7 +746,7 @@ ssize_t wsrep_sst_prepare (void** msg) // Attempt 1: wsrep_sst_receive_address if (wsrep_sst_receive_address && - strcmp (wsrep_sst_receive_address, WSREP_SST_ADDRESS_AUTO)) + strcmp (wsrep_sst_receive_address, WSREP_SST_ADDRESS_AUTO)) { addr_in= wsrep_sst_receive_address; } @@ -886,16 +886,13 @@ static int sst_donate_mysqldump (const char* addr, { char host[256]; wsp::Address address(addr); - if (!address.is_valid()) { WSREP_ERROR("Could not parse SST address : %s", addr); return 0; } - memcpy(host, address.get_address(), address.get_address_len()); int port= address.get_port(); - int const cmd_len= 4096; wsp::string cmd_str(cmd_len); @@ -912,7 +909,7 @@ static int sst_donate_mysqldump (const char* addr, int ret= snprintf (cmd_str(), cmd_len, "wsrep_sst_mysqldump " - WSREP_SST_OPT_HOST" '%s' " + WSREP_SST_OPT_ADDR" '%s' " WSREP_SST_OPT_PORT" '%d' " WSREP_SST_OPT_LPORT" '%u' " WSREP_SST_OPT_SOCKET" '%s' " @@ -920,7 +917,7 @@ static int sst_donate_mysqldump (const char* addr, WSREP_SST_OPT_GTID" '%s:%lld' " WSREP_SST_OPT_GTID_DOMAIN_ID" '%d'" "%s", - host, port, mysqld_port, mysqld_unix_port, + addr, port, mysqld_port, mysqld_unix_port, wsrep_defaults_file, uuid_str, (long long)seqno, wsrep_gtid_domain_id, bypass ? " " WSREP_SST_OPT_BYPASS : ""); diff --git a/sql/wsrep_utils.cc b/sql/wsrep_utils.cc index 9c77cb49256..1a358877a35 100644 --- a/sql/wsrep_utils.cc +++ b/sql/wsrep_utils.cc @@ -573,3 +573,17 @@ done: return ret; } +/* returns the length of the host part of the address string */ +size_t wsrep_host_len(const char* const addr, size_t const addr_len) +{ + // check for IPv6 notation first + const char* const bracket= ('[' == addr[0] ? strchr(addr, ']') : NULL); + + if (bracket) { // IPv6 + return (bracket - addr + 1); + } + else { // host part ends at ':' or end of string + const char* const colon= strchr(addr, ':'); + return (colon ? colon - addr : addr_len); + } +} diff --git a/sql/wsrep_utils.h b/sql/wsrep_utils.h index e4f421930f4..277cea9dc31 100644 --- a/sql/wsrep_utils.h +++ b/sql/wsrep_utils.h @@ -22,6 +22,9 @@ unsigned int wsrep_check_ip (const char* const addr, bool *is_ipv6); size_t wsrep_guess_ip (char* buf, size_t buf_len); +/* returns the length of the host part of the address string */ +size_t wsrep_host_len(const char* addr, size_t addr_len); + namespace wsp { class Address { diff --git a/sql/wsrep_var.cc b/sql/wsrep_var.cc index b34fdf8b7ed..41a8f6c7eb0 100644 --- a/sql/wsrep_var.cc +++ b/sql/wsrep_var.cc @@ -27,6 +27,8 @@ #include <cstdlib> +static long wsrep_prev_slave_threads = wsrep_slave_threads; + int wsrep_init_vars() { wsrep_provider = my_strdup(WSREP_NONE, MYF(MY_WME)); @@ -49,12 +51,23 @@ bool wsrep_on_update (sys_var *self, THD* thd, enum_var_type var_type) return false; } -bool wsrep_causal_reads_update (SV *sv) +bool wsrep_causal_reads_update (sys_var *self, THD* thd, enum_var_type var_type) { - if (sv->wsrep_causal_reads) { - sv->wsrep_sync_wait |= WSREP_SYNC_WAIT_BEFORE_READ; + // global setting should not affect session setting. + // if (var_type == OPT_GLOBAL) { + // thd->variables.wsrep_causal_reads = global_system_variables.wsrep_causal_reads; + // } + if (thd->variables.wsrep_causal_reads) { + thd->variables.wsrep_sync_wait |= WSREP_SYNC_WAIT_BEFORE_READ; + } else { + thd->variables.wsrep_sync_wait &= ~WSREP_SYNC_WAIT_BEFORE_READ; + } + + // update global settings too. + if (global_system_variables.wsrep_causal_reads) { + global_system_variables.wsrep_sync_wait |= WSREP_SYNC_WAIT_BEFORE_READ; } else { - sv->wsrep_sync_wait &= ~WSREP_SYNC_WAIT_BEFORE_READ; + global_system_variables.wsrep_sync_wait &= ~WSREP_SYNC_WAIT_BEFORE_READ; } return false; @@ -62,12 +75,17 @@ bool wsrep_causal_reads_update (SV *sv) bool wsrep_sync_wait_update (sys_var* self, THD* thd, enum_var_type var_type) { - if (var_type == OPT_GLOBAL) - global_system_variables.wsrep_causal_reads = - MY_TEST(global_system_variables.wsrep_sync_wait & WSREP_SYNC_WAIT_BEFORE_READ); - else - thd->variables.wsrep_causal_reads = - MY_TEST(thd->variables.wsrep_sync_wait & WSREP_SYNC_WAIT_BEFORE_READ); + // global setting should not affect session setting. + // if (var_type == OPT_GLOBAL) { + // thd->variables.wsrep_sync_wait = global_system_variables.wsrep_sync_wait; + // } + thd->variables.wsrep_causal_reads = thd->variables.wsrep_sync_wait & + WSREP_SYNC_WAIT_BEFORE_READ; + + // update global settings too + global_system_variables.wsrep_causal_reads = global_system_variables.wsrep_sync_wait & + WSREP_SYNC_WAIT_BEFORE_READ; + return false; } @@ -528,18 +546,15 @@ void wsrep_node_address_init (const char* value) wsrep_node_address = (value) ? my_strdup(value, MYF(0)) : NULL; } -bool wsrep_slave_threads_check (sys_var *self, THD* thd, set_var* var) +static void wsrep_slave_count_change_update () { - mysql_mutex_lock(&LOCK_wsrep_slave_threads); - wsrep_slave_count_change += (var->save_result.ulonglong_value - - wsrep_slave_threads); - mysql_mutex_unlock(&LOCK_wsrep_slave_threads); - - return 0; + wsrep_slave_count_change += (wsrep_slave_threads - wsrep_prev_slave_threads); + wsrep_prev_slave_threads = wsrep_slave_threads; } bool wsrep_slave_threads_update (sys_var *self, THD* thd, enum_var_type type) { + wsrep_slave_count_change_update(); if (wsrep_slave_count_change > 0) { wsrep_create_appliers(wsrep_slave_count_change); diff --git a/sql/wsrep_var.h b/sql/wsrep_var.h index 1509fc7d589..dde59d1503f 100644 --- a/sql/wsrep_var.h +++ b/sql/wsrep_var.h @@ -41,9 +41,7 @@ int wsrep_init_vars(); #define DEFAULT_ARGS (THD* thd, enum_var_type var_type) #define INIT_ARGS (const char* opt) -struct system_variables; -bool wsrep_causal_reads_update(struct system_variables *sv); - +extern bool wsrep_causal_reads_update UPDATE_ARGS; extern bool wsrep_on_update UPDATE_ARGS; extern bool wsrep_sync_wait_update UPDATE_ARGS; extern bool wsrep_start_position_check CHECK_ARGS; |