diff options
author | Sergei Golubchik <serg@mariadb.org> | 2015-11-19 17:48:36 +0100 |
---|---|---|
committer | Sergei Golubchik <serg@mariadb.org> | 2015-11-19 17:48:36 +0100 |
commit | 7f19330c595e3183d079fe2c18eecc74740e8f83 (patch) | |
tree | 8b853976cd14d96d5415757be525133b32be0c93 /sql | |
parent | 4046ed12bcddfd831c510b022cb7af224be9457b (diff) | |
parent | f4421c893b50f05078f14d33c47d21f52f59f8a7 (diff) | |
download | mariadb-git-7f19330c595e3183d079fe2c18eecc74740e8f83.tar.gz |
Merge branch 'github/10.0-galera' into 10.1
Diffstat (limited to 'sql')
-rw-r--r-- | sql/events.cc | 3 | ||||
-rw-r--r-- | sql/mdl.cc | 20 | ||||
-rw-r--r-- | sql/mdl.h | 2 | ||||
-rw-r--r-- | sql/slave.cc | 81 | ||||
-rw-r--r-- | sql/sql_parse.cc | 20 | ||||
-rw-r--r-- | sql/sql_partition_admin.cc | 23 | ||||
-rw-r--r-- | sql/wsrep_hton.cc | 18 | ||||
-rw-r--r-- | sql/wsrep_mysqld.cc | 96 | ||||
-rw-r--r-- | sql/wsrep_mysqld.h | 3 | ||||
-rw-r--r-- | sql/wsrep_notify.cc | 2 | ||||
-rw-r--r-- | sql/wsrep_sst.cc | 207 | ||||
-rw-r--r-- | sql/wsrep_thd.cc | 12 | ||||
-rw-r--r-- | sql/wsrep_utils.cc | 93 | ||||
-rw-r--r-- | sql/wsrep_utils.h | 26 | ||||
-rw-r--r-- | sql/wsrep_var.cc | 8 |
15 files changed, 444 insertions, 170 deletions
diff --git a/sql/events.cc b/sql/events.cc index abc798b659a..b80ec993ac4 100644 --- a/sql/events.cc +++ b/sql/events.cc @@ -1216,7 +1216,8 @@ int wsrep_create_event_query(THD *thd, uchar** buf, size_t* buf_len) if (create_query_string(thd, &log_query)) { - WSREP_WARN("events create string failed: %s", thd->query()); + WSREP_WARN("events create string failed: schema: %s, query: %s", + (thd->db ? thd->db : "(null)"), thd->query()); return 1; } return wsrep_to_buf_helper(thd, log_query.ptr(), log_query.length(), buf, buf_len); diff --git a/sql/mdl.cc b/sql/mdl.cc index ec380aa7432..73b64b35dff 100644 --- a/sql/mdl.cc +++ b/sql/mdl.cc @@ -1069,7 +1069,7 @@ MDL_wait::timed_wait(MDL_context_owner *owner, struct timespec *abs_timeout, wait_result= mysql_cond_wait(&m_COND_wait_status, &m_LOCK_wait_status); } else -#endif +#endif /* WITH_WSREP */ wait_result= mysql_cond_timedwait(&m_COND_wait_status, &m_LOCK_wait_status, abs_timeout); } @@ -1170,7 +1170,8 @@ void MDL_lock::Ticket_list::add_ticket(MDL_ticket *ticket) if (granted->get_ctx() != ticket->get_ctx() && granted->is_incompatible_when_granted(ticket->get_type())) { - if (!wsrep_grant_mdl_exception(ticket->get_ctx(), granted)) + if (!wsrep_grant_mdl_exception(ticket->get_ctx(), granted, + &ticket->get_lock()->key)) { WSREP_DEBUG("MDL victim killed at add_ticket"); } @@ -1561,7 +1562,7 @@ MDL_lock::can_grant_lock(enum_mdl_type type_arg, wsrep_thd_query(requestor_ctx->get_thd())); can_grant = true; } - else if (!wsrep_grant_mdl_exception(requestor_ctx, ticket)) + else if (!wsrep_grant_mdl_exception(requestor_ctx, ticket, &key)) { wsrep_can_grant= FALSE; if (wsrep_log_conflicts) @@ -2904,6 +2905,19 @@ void MDL_context::release_explicit_locks() release_locks_stored_before(MDL_EXPLICIT, NULL); } +bool MDL_context::has_explicit_locks() +{ + MDL_ticket *ticket = NULL; + + Ticket_iterator it(m_tickets[MDL_EXPLICIT]); + + while ((ticket = it++)) + { + return true; + } + + return false; +} #ifdef WITH_WSREP void MDL_ticket::wsrep_report(bool debug) diff --git a/sql/mdl.h b/sql/mdl.h index a80de5a4e71..7961f1f24b2 100644 --- a/sql/mdl.h +++ b/sql/mdl.h @@ -458,6 +458,7 @@ public: MDL_key key; public: + static void *operator new(size_t size, MEM_ROOT *mem_root) throw () { return alloc_root(mem_root, size); } static void operator delete(void *ptr, MEM_ROOT *mem_root) {} @@ -930,6 +931,7 @@ private: public: THD *get_thd() const { return m_owner->get_thd(); } + bool has_explicit_locks(); void find_deadlock(); ulong get_thread_id() const { return thd_get_thread_id(get_thd()); } diff --git a/sql/slave.cc b/sql/slave.cc index 58cb508f8c8..895a8c2167e 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -3374,6 +3374,17 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd, if (reason == Log_event::EVENT_SKIP_NOT) exec_res= ev->apply_event(rgi); +#ifdef WITH_WSREP + if (exec_res && thd->wsrep_conflict_state != NO_CONFLICT) + { + WSREP_DEBUG("SQL apply failed, res %d conflict state: %d", + exec_res, thd->wsrep_conflict_state); + rli->abort_slave= 1; + rli->report(ERROR_LEVEL, ER_UNKNOWN_COM_ERROR, rgi->gtid_info(), + "Node has dropped from cluster"); + } +#endif + #ifndef DBUG_OFF /* This only prints information to the debug trace. @@ -3693,6 +3704,10 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli, serial_rgi->event_relay_log_pos= rli->event_relay_log_pos; exec_res= apply_event_and_update_pos(ev, thd, serial_rgi, NULL); +#ifdef WITH_WSREP + WSREP_DEBUG("apply_event_and_update_pos() result: %d", exec_res); +#endif /* WITH_WSREP */ + delete_or_keep_event_post_apply(serial_rgi, typ, ev); /* @@ -3702,6 +3717,12 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli, if (exec_res == 2) DBUG_RETURN(1); +#ifdef WITH_WSREP + mysql_mutex_lock(&thd->LOCK_wsrep_thd); + if (thd->wsrep_conflict_state == NO_CONFLICT) + { + mysql_mutex_unlock(&thd->LOCK_wsrep_thd); +#endif /* WITH_WSREP */ if (slave_trans_retries) { int UNINIT_VAR(temp_err); @@ -3774,6 +3795,12 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli, serial_rgi->trans_retries)); } } +#ifdef WITH_WSREP + } + else + mysql_mutex_unlock(&thd->LOCK_wsrep_thd); +#endif /* WITH_WSREP */ + thread_safe_increment64(&rli->executed_entries); DBUG_RETURN(exec_res); } @@ -4756,15 +4783,21 @@ pthread_handler_t handle_slave_sql(void *arg) if (exec_relay_log_event(thd, rli, serial_rgi)) { +#ifdef WITH_WSREP + if (thd->wsrep_conflict_state != NO_CONFLICT) + { + wsrep_node_dropped= TRUE; + rli->abort_slave= TRUE; + } +#endif /* WITH_WSREP */ + DBUG_PRINT("info", ("exec_relay_log_event() failed")); // do not scare the user if SQL thread was simply killed or stopped if (!sql_slave_killed(serial_rgi)) { slave_output_error_info(serial_rgi, thd); if (WSREP_ON && rli->last_error().number == ER_UNKNOWN_COM_ERROR) - { - wsrep_node_dropped= TRUE; - } + wsrep_node_dropped= TRUE; } goto err; } @@ -4889,27 +4922,33 @@ err_during_init: thd->rgi_fake= thd->rgi_slave= NULL; delete serial_rgi; mysql_mutex_unlock(&LOCK_thread_count); + #ifdef WITH_WSREP - /* if slave stopped due to node going non primary, we set global flag to - trigger automatic restart of slave when node joins back to cluster + /* + If slave stopped due to node going non primary, we set global flag to + trigger automatic restart of slave when node joins back to cluster. */ - if (WSREP_ON && wsrep_node_dropped && wsrep_restart_slave) - { - if (wsrep_ready) - { - WSREP_INFO("Slave error due to node temporarily non-primary" - "SQL slave will continue"); - wsrep_node_dropped= FALSE; - mysql_mutex_unlock(&rli->run_lock); - goto wsrep_restart_point; - } else { - WSREP_INFO("Slave error due to node going non-primary"); - WSREP_INFO("wsrep_restart_slave was set and therefore slave will be " - "automatically restarted when node joins back to cluster"); - wsrep_restart_slave_activated= TRUE; - } - } + if (WSREP_ON && wsrep_node_dropped && wsrep_restart_slave) + { + if (wsrep_ready) + { + WSREP_INFO("Slave error due to node temporarily non-primary" + "SQL slave will continue"); + wsrep_node_dropped= FALSE; + mysql_mutex_unlock(&rli->run_lock); + WSREP_DEBUG("wsrep_conflict_state now: %d", thd->wsrep_conflict_state); + WSREP_INFO("slave restart: %d", thd->wsrep_conflict_state); + thd->wsrep_conflict_state= NO_CONFLICT; + goto wsrep_restart_point; + } else { + WSREP_INFO("Slave error due to node going non-primary"); + WSREP_INFO("wsrep_restart_slave was set and therefore slave will be " + "automatically restarted when node joins back to cluster."); + wsrep_restart_slave_activated= TRUE; + } + } #endif /* WITH_WSREP */ + /* Note: the order of the broadcast and unlock calls below (first broadcast, then unlock) is important. Otherwise a killer_thread can execute between the calls and diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc index 1388ec01459..ba9dc7eb854 100644 --- a/sql/sql_parse.cc +++ b/sql/sql_parse.cc @@ -4829,6 +4829,25 @@ end_with_restore_list: */ if (!reload_acl_and_cache(thd, lex->type, first_table, &write_to_binlog)) { +#ifdef WITH_WSREP + if ((lex->type & REFRESH_TABLES) && !(lex->type & (REFRESH_FOR_EXPORT|REFRESH_READ_LOCK))) + { + /* + This is done after reload_acl_and_cache is because + LOCK TABLES is not replicated in galera, the upgrade of which + is checked in reload_acl_and_cache. + Hence, done after/if we are able to upgrade locks. + */ + if (first_table) + { + WSREP_TO_ISOLATION_BEGIN(NULL, NULL, first_table); + } + else + { + WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL, NULL); + } + } +#endif /* WITH_WSREP */ /* We WANT to write and we CAN write. ! we write after unlocking the table. @@ -7134,6 +7153,7 @@ static void wsrep_mysql_parse(THD *thd, char *rawbuf, uint length, /* Performance Schema Interface instrumentation, end */ MYSQL_END_STATEMENT(thd->m_statement_psi, thd->get_stmt_da()); thd->m_statement_psi= NULL; + thd->m_digest= NULL; close_thread_tables(thd); thd->wsrep_conflict_state= RETRY_AUTOCOMMIT; diff --git a/sql/sql_partition_admin.cc b/sql/sql_partition_admin.cc index e237fb1ad00..9e67a21e8f4 100644 --- a/sql/sql_partition_admin.cc +++ b/sql/sql_partition_admin.cc @@ -783,20 +783,17 @@ bool Sql_cmd_alter_table_truncate_partition::execute(THD *thd) DBUG_RETURN(TRUE); #ifdef WITH_WSREP - if (WSREP_ON) + /* Forward declaration */ + TABLE *find_temporary_table(THD *thd, const TABLE_LIST *tl); + + if (WSREP(thd) && (!thd->is_current_stmt_binlog_format_row() || + !find_temporary_table(thd, first_table)) && + wsrep_to_isolation_begin( + thd, first_table->db, first_table->table_name, NULL) + ) { - /* Forward declaration */ - TABLE *find_temporary_table(THD *thd, const TABLE_LIST *tl); - - if ((!thd->is_current_stmt_binlog_format_row() || - !find_temporary_table(thd, first_table)) && - wsrep_to_isolation_begin( - thd, first_table->db, first_table->table_name, NULL) - ) - { - WSREP_WARN("ALTER TABLE TRUNCATE PARTITION isolation failure"); - DBUG_RETURN(TRUE); - } + WSREP_WARN("ALTER TABLE TRUNCATE PARTITION isolation failure"); + DBUG_RETURN(TRUE); } #endif /* WITH_WSREP */ diff --git a/sql/wsrep_hton.cc b/sql/wsrep_hton.cc index f45ba5e5a39..5bec467708c 100644 --- a/sql/wsrep_hton.cc +++ b/sql/wsrep_hton.cc @@ -245,8 +245,9 @@ static int wsrep_rollback(handlerton *hton, THD *thd, bool all) if (wsrep->post_rollback(wsrep, &thd->wsrep_ws_handle)) { DBUG_PRINT("wsrep", ("setting rollback fail")); - WSREP_ERROR("settting rollback fail: thd: %llu SQL: %s", - (long long)thd->real_id, thd->query()); + WSREP_ERROR("settting rollback fail: thd: %llu, schema: %s, SQL: %s", + (long long)thd->real_id, (thd->db ? thd->db : "(null)"), + thd->query()); } wsrep_cleanup_transaction(thd); } @@ -286,8 +287,9 @@ int wsrep_commit(handlerton *hton, THD *thd, bool all) if (wsrep->post_rollback(wsrep, &thd->wsrep_ws_handle)) { DBUG_PRINT("wsrep", ("setting rollback fail")); - WSREP_ERROR("settting rollback fail: thd: %llu SQL: %s", - (long long)thd->real_id, thd->query()); + WSREP_ERROR("settting rollback fail: thd: %llu, schema: %s, SQL: %s", + (long long)thd->real_id, (thd->db ? thd->db : "(null)"), + thd->query()); } } wsrep_cleanup_transaction(thd); @@ -448,9 +450,11 @@ wsrep_run_wsrep_commit(THD *thd, bool all) if (WSREP_UNDEFINED_TRX_ID == thd->wsrep_ws_handle.trx_id) { WSREP_WARN("SQL statement was ineffective, THD: %lu, buf: %zu\n" + "schema: %s \n" "QUERY: %s\n" " => Skipping replication", - thd->thread_id, data_len, thd->query()); + thd->thread_id, data_len, + (thd->db ? thd->db : "(null)"), thd->query()); rcode = WSREP_TRX_FAIL; } else if (!rcode) @@ -465,8 +469,8 @@ wsrep_run_wsrep_commit(THD *thd, bool all) &thd->wsrep_trx_meta); if (rcode == WSREP_TRX_MISSING) { - WSREP_WARN("Transaction missing in provider, thd: %ld, SQL: %s", - thd->thread_id, thd->query()); + WSREP_WARN("Transaction missing in provider, thd: %ld, schema: %s, SQL: %s", + thd->thread_id, (thd->db ? thd->db : "(null)"), thd->query()); rcode = WSREP_TRX_FAIL; } else if (rcode == WSREP_BF_ABORT) { WSREP_DEBUG("thd %lu seqno %lld BF aborted by provider, will replay", diff --git a/sql/wsrep_mysqld.cc b/sql/wsrep_mysqld.cc index 0158a78a10b..90b1c132ae9 100644 --- a/sql/wsrep_mysqld.cc +++ b/sql/wsrep_mysqld.cc @@ -922,19 +922,10 @@ bool wsrep_start_replication() wsrep_sst_donor, bootstrap))) { - if (-ESOCKTNOSUPPORT == rcode) - { - DBUG_PRINT("wsrep",("unrecognized cluster address: '%s', rcode: %d", - wsrep_cluster_address, rcode)); - WSREP_ERROR("unrecognized cluster address: '%s', rcode: %d", - wsrep_cluster_address, rcode); - } - else - { - DBUG_PRINT("wsrep",("wsrep->connect() failed: %d", rcode)); - WSREP_ERROR("wsrep::connect() failed: %d", rcode); - } - + DBUG_PRINT("wsrep",("wsrep->connect(%s) failed: %d", + wsrep_cluster_address, rcode)); + WSREP_ERROR("wsrep::connect(%s) failed: %d", + wsrep_cluster_address, rcode); return false; } else @@ -1277,7 +1268,8 @@ int wsrep_alter_event_query(THD *thd, uchar** buf, size_t* buf_len) if (wsrep_alter_query_string(thd, &log_query)) { - WSREP_WARN("events alter string failed: %s", thd->query()); + WSREP_WARN("events alter string failed: schema: %s, query: %s", + (thd->db ? thd->db : "(null)"), thd->query()); return 1; } return wsrep_to_buf_helper(thd, log_query.ptr(), log_query.length(), buf, buf_len); @@ -1425,9 +1417,11 @@ static int wsrep_TOI_begin(THD *thd, char *db_, char *table_, } else if (key_arr.keys_len > 0) { /* jump to error handler in mysql_execute_command() */ - WSREP_WARN("TO isolation failed for: %d, sql: %s. Check wsrep " + WSREP_WARN("TO isolation failed for: %d, schema: %s, sql: %s. Check wsrep " "connection state and retry the query.", - ret, (thd->query()) ? thd->query() : "void"); + ret, + (thd->db ? thd->db : "(null)"), + (thd->query()) ? thd->query() : "void"); my_error(ER_LOCK_DEADLOCK, MYF(0), "WSREP replication failed. Check " "your wsrep connection state and retry the query."); wsrep_keys_free(&key_arr); @@ -1460,8 +1454,10 @@ static void wsrep_TOI_end(THD *thd) { WSREP_DEBUG("TO END: %lld", (long long)wsrep_thd_trx_seqno(thd)); } else { - WSREP_WARN("TO isolation end failed for: %d, sql: %s", - ret, (thd->query()) ? thd->query() : "void"); + WSREP_WARN("TO isolation end failed for: %d, schema: %s, sql: %s", + ret, + (thd->db ? thd->db : "(null)"), + (thd->query()) ? thd->query() : "void"); } } @@ -1474,7 +1470,10 @@ static int wsrep_RSU_begin(THD *thd, char *db_, char *table_) ret = wsrep->desync(wsrep); if (ret != WSREP_OK) { - WSREP_WARN("RSU desync failed %d for %s", ret, thd->query()); + WSREP_WARN("RSU desync failed %d for schema: %s, query: %s", + ret, + (thd->db ? thd->db : "(null)"), + thd->query()); my_error(ER_LOCK_DEADLOCK, MYF(0)); return(ret); } @@ -1485,7 +1484,9 @@ static int wsrep_RSU_begin(THD *thd, char *db_, char *table_) if (wsrep_wait_committing_connections_close(5000)) { /* no can do, bail out from DDL */ - WSREP_WARN("RSU failed due to pending transactions, %s", thd->query()); + WSREP_WARN("RSU failed due to pending transactions, schema: %s, query %s", + (thd->db ? thd->db : "(null)"), + thd->query()); mysql_mutex_lock(&LOCK_wsrep_replaying); wsrep_replaying--; mysql_mutex_unlock(&LOCK_wsrep_replaying); @@ -1493,7 +1494,10 @@ static int wsrep_RSU_begin(THD *thd, char *db_, char *table_) ret = wsrep->resync(wsrep); if (ret != WSREP_OK) { - WSREP_WARN("resync failed %d for %s", ret, thd->query()); + WSREP_WARN("resync failed %d for schema: %s, query: %s", + ret, + (thd->db ? thd->db : "(null)"), + thd->query()); } my_error(ER_LOCK_DEADLOCK, MYF(0)); return(1); @@ -1502,7 +1506,9 @@ static int wsrep_RSU_begin(THD *thd, char *db_, char *table_) wsrep_seqno_t seqno = wsrep->pause(wsrep); if (seqno == WSREP_SEQNO_UNDEFINED) { - WSREP_WARN("pause failed %lld for %s", (long long)seqno, thd->query()); + WSREP_WARN("pause failed %lld for schema: %s, query: %s", (long long)seqno, + (thd->db ? thd->db : "(null)"), + thd->query()); return(1); } WSREP_DEBUG("paused at %lld", (long long)seqno); @@ -1524,12 +1530,16 @@ static void wsrep_RSU_end(THD *thd) ret = wsrep->resume(wsrep); if (ret != WSREP_OK) { - WSREP_WARN("resume failed %d for %s", ret, thd->query()); + WSREP_WARN("resume failed %d for schema: %s, query: %s", ret, + (thd->db ? thd->db : "(null)"), + thd->query()); } ret = wsrep->resync(wsrep); if (ret != WSREP_OK) { - WSREP_WARN("resync failed %d for %s", ret, thd->query()); + WSREP_WARN("resync failed %d for schema: %s, query: %s", ret, + (thd->db ? thd->db : "(null)"), + thd->query()); return; } thd->variables.wsrep_on = 1; @@ -1550,8 +1560,10 @@ int wsrep_to_isolation_begin(THD *thd, char *db_, char *table_, if (thd->wsrep_conflict_state == MUST_ABORT) { - WSREP_INFO("thread: %lu, %s has been aborted due to multi-master conflict", - thd->thread_id, thd->query()); + WSREP_INFO("thread: %lu, schema: %s, query: %s has been aborted due to multi-master conflict", + thd->thread_id, + (thd->db ? thd->db : "(null)"), + thd->query()); mysql_mutex_unlock(&thd->LOCK_wsrep_thd); return WSREP_TRX_FAIL; } @@ -1629,15 +1641,16 @@ void wsrep_to_isolation_end(THD *thd) } } -#define WSREP_MDL_LOG(severity, msg, req, gra) \ +#define WSREP_MDL_LOG(severity, msg, schema, schema_len, req, gra) \ WSREP_##severity( \ "%s\n" \ + "schema: %.*s\n" \ "request: (%lu \tseqno %lld \twsrep (%d, %d, %d) cmd %d %d \t%s)\n" \ "granted: (%lu \tseqno %lld \twsrep (%d, %d, %d) cmd %d %d \t%s)", \ - msg, \ + msg, schema_len, schema, \ req->thread_id, (long long)wsrep_thd_trx_seqno(req), \ req->wsrep_exec_mode, req->wsrep_query_state, req->wsrep_conflict_state, \ - req->get_command(), req->lex->sql_command, req->query(), \ + req->get_command(), req->lex->sql_command, req->query(), \ gra->thread_id, (long long)wsrep_thd_trx_seqno(gra), \ gra->wsrep_exec_mode, gra->wsrep_query_state, gra->wsrep_conflict_state, \ gra->get_command(), gra->lex->sql_command, gra->query()); @@ -1654,7 +1667,8 @@ void wsrep_to_isolation_end(THD *thd) bool wsrep_grant_mdl_exception(MDL_context *requestor_ctx, - MDL_ticket *ticket + MDL_ticket *ticket, + const MDL_key *key ) { /* Fallback to the non-wsrep behaviour */ if (!WSREP_ON) return FALSE; @@ -1663,29 +1677,35 @@ wsrep_grant_mdl_exception(MDL_context *requestor_ctx, THD *granted_thd = ticket->get_ctx()->get_thd(); bool ret = FALSE; + const char* schema= key->db_name(); + int schema_len= key->db_name_length(); + mysql_mutex_lock(&request_thd->LOCK_wsrep_thd); if (request_thd->wsrep_exec_mode == TOTAL_ORDER || request_thd->wsrep_exec_mode == REPL_RECV) { mysql_mutex_unlock(&request_thd->LOCK_wsrep_thd); - WSREP_MDL_LOG(DEBUG, "MDL conflict ", request_thd, granted_thd); + WSREP_MDL_LOG(DEBUG, "MDL conflict ", schema, schema_len, + request_thd, granted_thd); ticket->wsrep_report(wsrep_debug); mysql_mutex_lock(&granted_thd->LOCK_wsrep_thd); if (granted_thd->wsrep_exec_mode == TOTAL_ORDER || granted_thd->wsrep_exec_mode == REPL_RECV) { - WSREP_MDL_LOG(INFO, "MDL BF-BF conflict", request_thd, granted_thd); + WSREP_MDL_LOG(INFO, "MDL BF-BF conflict", schema, schema_len, + request_thd, granted_thd); ticket->wsrep_report(true); mysql_mutex_unlock(&granted_thd->LOCK_wsrep_thd); ret = TRUE; } - else if (granted_thd->lex->sql_command == SQLCOM_FLUSH) + else if (granted_thd->lex->sql_command == SQLCOM_FLUSH || + granted_thd->mdl_context.has_explicit_locks()) { - WSREP_DEBUG("MDL granted over FLUSH BF"); + WSREP_DEBUG("BF thread waiting for FLUSH"); ticket->wsrep_report(wsrep_debug); mysql_mutex_unlock(&granted_thd->LOCK_wsrep_thd); - ret = TRUE; + ret = FALSE; } else if (request_thd->lex->sql_command == SQLCOM_DROP_TABLE) { @@ -1705,7 +1725,8 @@ wsrep_grant_mdl_exception(MDL_context *requestor_ctx, } else { - WSREP_MDL_LOG(DEBUG, "MDL conflict-> BF abort", request_thd, granted_thd); + WSREP_MDL_LOG(DEBUG, "MDL conflict-> BF abort", schema, schema_len, + request_thd, granted_thd); ticket->wsrep_report(wsrep_debug); mysql_mutex_unlock(&granted_thd->LOCK_wsrep_thd); wsrep_abort_thd((void*)request_thd, (void*)granted_thd, 1); @@ -2177,7 +2198,8 @@ int wsrep_create_sp(THD *thd, uchar** buf, size_t* buf_len) &(thd->lex->definer->host), saved_mode)) { - WSREP_WARN("SP create string failed: %s", thd->query()); + WSREP_WARN("SP create string failed: schema: %s, query: %s", + (thd->db ? thd->db : "(null)"), thd->query()); return 1; } diff --git a/sql/wsrep_mysqld.h b/sql/wsrep_mysqld.h index 24af4ec0953..3c36e88eb6d 100644 --- a/sql/wsrep_mysqld.h +++ b/sql/wsrep_mysqld.h @@ -289,7 +289,8 @@ int wsrep_alter_event_query(THD *thd, uchar** buf, size_t* buf_len); extern bool wsrep_grant_mdl_exception(MDL_context *requestor_ctx, - MDL_ticket *ticket); + MDL_ticket *ticket, + const MDL_key *key); IO_CACHE * get_trans_log(THD * thd); bool wsrep_trans_cache_is_empty(THD *thd); void thd_binlog_flush_pending_rows_event(THD *thd, bool stmt_end); diff --git a/sql/wsrep_notify.cc b/sql/wsrep_notify.cc index 6eefb961b62..e7d30d5a9c1 100644 --- a/sql/wsrep_notify.cc +++ b/sql/wsrep_notify.cc @@ -97,7 +97,7 @@ void wsrep_notify_status (wsrep_member_status_t status, return; } - wsp::process p(cmd_ptr, "r"); + wsp::process p(cmd_ptr, "r", NULL); p.wait(); int err = p.error(); diff --git a/sql/wsrep_sst.cc b/sql/wsrep_sst.cc index b31d433de08..c75f2c116ec 100644 --- a/sql/wsrep_sst.cc +++ b/sql/wsrep_sst.cc @@ -315,12 +315,14 @@ void wsrep_sst_continue () struct sst_thread_arg { const char* cmd; - int err; + char** env; char* ret_str; + int err; mysql_mutex_t lock; mysql_cond_t cond; - sst_thread_arg (const char* c) : cmd(c), err(-1), ret_str(0) + sst_thread_arg (const char* c, char** e) + : cmd(c), env(e), ret_str(0), err(-1) { mysql_mutex_init(key_LOCK_wsrep_sst_thread, &lock, MY_MUTEX_INIT_FAST); mysql_cond_init(key_COND_wsrep_sst_thread, &cond, NULL); @@ -405,7 +407,7 @@ static void* sst_joiner_thread (void* a) WSREP_INFO("Running: '%s'", arg->cmd); - wsp::process proc (arg->cmd, "r"); + wsp::process proc (arg->cmd, "r", arg->env); if (proc.pipe() && !proc.error()) { @@ -519,12 +521,44 @@ err: return NULL; } +#define WSREP_SST_AUTH_ENV "WSREP_SST_OPT_AUTH" + +static int sst_append_auth_env(wsp::env& env, const char* sst_auth) +{ + int const sst_auth_size= strlen(WSREP_SST_AUTH_ENV) + 1 /* = */ + + (sst_auth ? strlen(sst_auth) : 0) + 1 /* \0 */; + + wsp::string sst_auth_str(sst_auth_size); // for automatic cleanup on return + if (!sst_auth_str()) return -ENOMEM; + + int ret= snprintf(sst_auth_str(), sst_auth_size, "%s=%s", + WSREP_SST_AUTH_ENV, sst_auth ? sst_auth : ""); + + if (ret < 0 || ret >= sst_auth_size) + { + WSREP_ERROR("sst_append_auth_env(): snprintf() failed: %d", ret); + return (ret < 0 ? ret : -EMSGSIZE); + } + + env.append(sst_auth_str()); + return -env.error(); +} + static ssize_t sst_prepare_other (const char* method, + const char* sst_auth, const char* addr_in, const char** addr_out) { - char cmd_str[1024]; - const char* sst_dir= mysql_real_data_home; + int const cmd_len= 4096; + wsp::string cmd_str(cmd_len); + + if (!cmd_str()) + { + WSREP_ERROR("sst_prepare_other(): could not allocate cmd buffer of %d bytes", + cmd_len); + return -ENOMEM; + } + const char* binlog_opt= ""; char* binlog_opt_val= NULL; @@ -539,35 +573,47 @@ static ssize_t sst_prepare_other (const char* method, make_wsrep_defaults_file(); - ret= snprintf (cmd_str, sizeof(cmd_str), + ret= snprintf (cmd_str(), cmd_len, "wsrep_sst_%s " WSREP_SST_OPT_ROLE" 'joiner' " WSREP_SST_OPT_ADDR" '%s' " - WSREP_SST_OPT_AUTH" '%s' " WSREP_SST_OPT_DATA" '%s' " " %s " WSREP_SST_OPT_PARENT" '%d'" " %s '%s' ", - method, addr_in, (sst_auth_real) ? sst_auth_real : "", - sst_dir, wsrep_defaults_file, (int)getpid(), + method, addr_in, mysql_real_data_home, + wsrep_defaults_file, (int)getpid(), binlog_opt, binlog_opt_val); my_free(binlog_opt_val); - if (ret < 0 || ret >= (int)sizeof(cmd_str)) + if (ret < 0 || ret >= cmd_len) { WSREP_ERROR("sst_prepare_other(): snprintf() failed: %d", ret); return (ret < 0 ? ret : -EMSGSIZE); } + wsp::env env(NULL); + if (env.error()) + { + WSREP_ERROR("sst_prepare_other(): env. var ctor failed: %d", -env.error()); + return -env.error(); + } + + if ((ret= sst_append_auth_env(env, sst_auth))) + { + WSREP_ERROR("sst_prepare_other(): appending auth failed: %d", ret); + return ret; + } + pthread_t tmp; - sst_thread_arg arg(cmd_str); + sst_thread_arg arg(cmd_str(), env()); mysql_mutex_lock (&arg.lock); ret = pthread_create (&tmp, NULL, sst_joiner_thread, &arg); if (ret) { WSREP_ERROR("sst_prepare_other(): pthread_create() failed: %d (%s)", ret, strerror(ret)); - return ret; + return -ret; } mysql_cond_wait (&arg.cond, &arg.lock); @@ -713,7 +759,8 @@ ssize_t wsrep_sst_prepare (void** msg) return 0; } - addr_len = sst_prepare_other (wsrep_sst_method, addr_in, &addr_out); + addr_len = sst_prepare_other (wsrep_sst_method, sst_auth_real, + addr_in, &addr_out); if (addr_len < 0) { WSREP_ERROR("Failed to prepare for '%s' SST. Unrecoverable.", @@ -746,13 +793,13 @@ ssize_t wsrep_sst_prepare (void** msg) } // helper method for donors -static int sst_run_shell (const char* cmd_str, int max_tries) +static int sst_run_shell (const char* cmd_str, char** env, int max_tries) { int ret = 0; for (int tries=1; tries <= max_tries; tries++) { - wsp::process proc (cmd_str, "r"); + wsp::process proc (cmd_str, "r", env); if (NULL != proc.pipe()) { @@ -782,19 +829,12 @@ static void sst_reject_queries(my_bool close_conn) if (TRUE == close_conn) wsrep_close_client_connections(FALSE); } -static int sst_mysqldump_check_addr (const char* user, - const char* pswd, - const char* host, - int port) -{ - return 0; -} - static int sst_donate_mysqldump (const char* addr, const wsrep_uuid_t* uuid, const char* uuid_str, wsrep_seqno_t seqno, - bool bypass) + bool bypass, + char** env) // carries auth info { char host[256]; wsp::Address address(addr); @@ -807,56 +847,46 @@ static int sst_donate_mysqldump (const char* addr, memcpy(host, address.get_address(), address.get_address_len()); int port= address.get_port(); - const char* auth = sst_auth_real; - const char* pswd = (auth) ? strchr (auth, ':') : NULL; - size_t user_len; - if (pswd) - { - pswd += 1; - user_len = pswd - auth; - } - else + int const cmd_len= 4096; + wsp::string cmd_str(cmd_len); + + if (!cmd_str()) { - pswd = ""; - user_len = (auth) ? strlen (auth) + 1 : 1; + WSREP_ERROR("sst_donate_mysqldump(): " + "could not allocate cmd buffer of %d bytes", cmd_len); + return -ENOMEM; } - char *user= (char *) alloca(user_len); + if (!bypass && wsrep_sst_donor_rejects_queries) sst_reject_queries(TRUE); - strncpy (user, (auth) ? auth : "", user_len - 1); - user[user_len - 1] = '\0'; + make_wsrep_defaults_file(); - int ret = sst_mysqldump_check_addr (user, pswd, host, port); - if (!ret) + int ret= snprintf (cmd_str(), cmd_len, + "wsrep_sst_mysqldump " + WSREP_SST_OPT_HOST" '%s' " + WSREP_SST_OPT_PORT" '%d' " + WSREP_SST_OPT_LPORT" '%u' " + WSREP_SST_OPT_SOCKET" '%s' " + " %s " + WSREP_SST_OPT_GTID" '%s:%lld' " + WSREP_SST_OPT_GTID_DOMAIN_ID" '%d'" + "%s", + host, port, mysqld_port, mysqld_unix_port, + wsrep_defaults_file, uuid_str, + (long long)seqno, wsrep_gtid_domain_id, + bypass ? " "WSREP_SST_OPT_BYPASS : ""); + + if (ret < 0 || ret >= cmd_len) { - char cmd_str[1024]; - - if (!bypass && wsrep_sst_donor_rejects_queries) sst_reject_queries(TRUE); - - make_wsrep_defaults_file(); - - snprintf (cmd_str, sizeof(cmd_str), - "wsrep_sst_mysqldump " - WSREP_SST_OPT_USER" '%s' " - WSREP_SST_OPT_PSWD" '%s' " - WSREP_SST_OPT_HOST" '%s' " - WSREP_SST_OPT_PORT" '%d' " - WSREP_SST_OPT_LPORT" '%u' " - WSREP_SST_OPT_SOCKET" '%s' " - " %s " - WSREP_SST_OPT_GTID" '%s:%lld' " - WSREP_SST_OPT_GTID_DOMAIN_ID" '%d'" - "%s", - user, pswd, host, port, mysqld_port, mysqld_unix_port, - wsrep_defaults_file, uuid_str, (long long)seqno, - wsrep_gtid_domain_id, bypass ? " "WSREP_SST_OPT_BYPASS : ""); - - WSREP_DEBUG("Running: '%s'", cmd_str); - - ret= sst_run_shell (cmd_str, 3); + WSREP_ERROR("sst_donate_mysqldump(): snprintf() failed: %d", ret); + return (ret < 0 ? ret : -EMSGSIZE); } + WSREP_DEBUG("Running: '%s'", cmd_str()); + + ret= sst_run_shell (cmd_str(), env, 3); + wsrep_gtid_t const state_id = { *uuid, (ret ? WSREP_SEQNO_UNDEFINED : seqno)}; wsrep->sst_sent (wsrep, &state_id, ret); @@ -1012,7 +1042,7 @@ static void* sst_donor_thread (void* a) wsp::thd thd(FALSE); // we turn off wsrep_on for this THD so that it can // operate with wsrep_ready == OFF - wsp::process proc(arg->cmd, "r"); + wsp::process proc(arg->cmd, "r", arg->env); err= proc.error(); @@ -1099,9 +1129,19 @@ static int sst_donate_other (const char* method, const char* addr, const char* uuid, wsrep_seqno_t seqno, - bool bypass) + bool bypass, + char** env) // carries auth info { - char cmd_str[4096]; + int const cmd_len= 4096; + wsp::string cmd_str(cmd_len); + + if (!cmd_str()) + { + WSREP_ERROR("sst_donate_other(): " + "could not allocate cmd buffer of %d bytes", cmd_len); + return -ENOMEM; + } + const char* binlog_opt= ""; char* binlog_opt_val= NULL; @@ -1115,11 +1155,10 @@ static int sst_donate_other (const char* method, make_wsrep_defaults_file(); - ret= snprintf (cmd_str, sizeof(cmd_str), + ret= snprintf (cmd_str(), cmd_len, "wsrep_sst_%s " WSREP_SST_OPT_ROLE" 'donor' " WSREP_SST_OPT_ADDR" '%s' " - WSREP_SST_OPT_AUTH" '%s' " WSREP_SST_OPT_SOCKET" '%s' " WSREP_SST_OPT_DATA" '%s' " " %s " @@ -1127,14 +1166,14 @@ static int sst_donate_other (const char* method, WSREP_SST_OPT_GTID" '%s:%lld' " WSREP_SST_OPT_GTID_DOMAIN_ID" '%d'" "%s", - method, addr, sst_auth_real, mysqld_unix_port, - mysql_real_data_home, wsrep_defaults_file, + method, addr, mysqld_unix_port, mysql_real_data_home, + wsrep_defaults_file, binlog_opt, binlog_opt_val, uuid, (long long) seqno, wsrep_gtid_domain_id, bypass ? " "WSREP_SST_OPT_BYPASS : ""); my_free(binlog_opt_val); - if (ret < 0 || ret >= (int) sizeof(cmd_str)) + if (ret < 0 || ret >= cmd_len) { WSREP_ERROR("sst_donate_other(): snprintf() failed: %d", ret); return (ret < 0 ? ret : -EMSGSIZE); @@ -1143,7 +1182,7 @@ static int sst_donate_other (const char* method, if (!bypass && wsrep_sst_donor_rejects_queries) sst_reject_queries(FALSE); pthread_t tmp; - sst_thread_arg arg(cmd_str); + sst_thread_arg arg(cmd_str(), env); mysql_mutex_lock (&arg.lock); ret = pthread_create (&tmp, NULL, sst_donor_thread, &arg); if (ret) @@ -1176,18 +1215,32 @@ wsrep_cb_status_t wsrep_sst_donate_cb (void* app_ctx, void* recv_ctx, char uuid_str[37]; wsrep_uuid_print (¤t_gtid->uuid, uuid_str, sizeof(uuid_str)); + wsp::env env(NULL); + if (env.error()) + { + WSREP_ERROR("wsrep_sst_donate_cb(): env var ctor failed: %d", -env.error()); + return WSREP_CB_FAILURE; + } + int ret; + if ((ret= sst_append_auth_env(env, sst_auth_real))) + { + WSREP_ERROR("wsrep_sst_donate_cb(): appending auth env failed: %d", ret); + return WSREP_CB_FAILURE; + } + if (!strcmp (WSREP_SST_MYSQLDUMP, method)) { ret = sst_donate_mysqldump(data, ¤t_gtid->uuid, uuid_str, - current_gtid->seqno, bypass); + current_gtid->seqno, bypass, env()); } else { - ret = sst_donate_other(method, data, uuid_str, current_gtid->seqno,bypass); + ret = sst_donate_other(method, data, uuid_str, + current_gtid->seqno, bypass, env()); } - return (ret > 0 ? WSREP_CB_SUCCESS : WSREP_CB_FAILURE); + return (ret >= 0 ? WSREP_CB_SUCCESS : WSREP_CB_FAILURE); } void wsrep_SE_init_grab() diff --git a/sql/wsrep_thd.cc b/sql/wsrep_thd.cc index 9e608f94848..ab09a9e3a99 100644 --- a/sql/wsrep_thd.cc +++ b/sql/wsrep_thd.cc @@ -215,6 +215,7 @@ void wsrep_replay_transaction(THD *thd) */ MYSQL_END_STATEMENT(thd->m_statement_psi, thd->get_stmt_da()); thd->m_statement_psi= NULL; + thd->m_digest= NULL; thd_proc_info(thd, "wsrep replaying trx"); WSREP_DEBUG("replay trx: %s %lld", thd->query() ? thd->query() : "void", @@ -274,8 +275,10 @@ void wsrep_replay_transaction(THD *thd) wsrep->post_rollback(wsrep, &thd->wsrep_ws_handle); break; default: - WSREP_ERROR("trx_replay failed for: %d, query: %s", - rcode, thd->query() ? thd->query() : "void"); + WSREP_ERROR("trx_replay failed for: %d, schema: %s, query: %s", + rcode, + (thd->db ? thd->db : "(null)"), + thd->query() ? thd->query() : "void"); /* we're now in inconsistent state, must abort */ /* http://bazaar.launchpad.net/~codership/codership-mysql/5.6/revision/3962#sql/wsrep_thd.cc */ @@ -608,3 +611,8 @@ int wsrep_thd_in_locking_session(void *thd_ptr) return 0; } +bool wsrep_thd_has_explicit_locks(THD *thd) +{ + assert(thd); + return thd->mdl_context.has_explicit_locks(); +} diff --git a/sql/wsrep_utils.cc b/sql/wsrep_utils.cc index 2998a5b04ac..56f1018deac 100644 --- a/sql/wsrep_utils.cc +++ b/sql/wsrep_utils.cc @@ -63,7 +63,7 @@ wsrep_prepend_PATH (const char* path) size_t const new_path_len(strlen(old_path) + strlen(":") + strlen(path) + 1); - char* const new_path (reinterpret_cast<char*>(malloc(new_path_len))); + char* const new_path (static_cast<char*>(malloc(new_path_len))); if (new_path) { @@ -89,6 +89,90 @@ wsrep_prepend_PATH (const char* path) namespace wsp { +bool +env::ctor_common(char** e) +{ + env_ = static_cast<char**>(malloc((len_ + 1) * sizeof(char*))); + + if (env_) + { + for (size_t i(0); i < len_; ++i) + { + assert(e[i]); // caller should make sure about len_ + env_[i] = strdup(e[i]); + if (!env_[i]) + { + errno_ = errno; + WSREP_ERROR("Failed to allocate env. var: %s", e[i]); + return true; + } + } + + env_[len_] = NULL; + return false; + } + else + { + errno_ = errno; + WSREP_ERROR("Failed to allocate env. var vector of length: %zu", len_); + return true; + } +} + +void +env::dtor() +{ + if (env_) + { + /* don't need to go beyond the first NULL */ + for (size_t i(0); env_[i] != NULL; ++i) { free(env_[i]); } + free(env_); + env_ = NULL; + } + len_ = 0; +} + +env::env(char** e) + : len_(0), env_(NULL), errno_(0) +{ + if (!e) { e = environ; } + /* count the size of the vector */ + while (e[len_]) { ++len_; } + + if (ctor_common(e)) dtor(); +} + +env::env(const env& e) + : len_(e.len_), env_(0), errno_(0) +{ + if (ctor_common(e.env_)) dtor(); +} + +env::~env() { dtor(); } + +int +env::append(const char* val) +{ + char** tmp = static_cast<char**>(realloc(env_, (len_ + 2)*sizeof(char*))); + + if (tmp) + { + env_ = tmp; + env_[len_] = strdup(val); + + if (env_[len_]) + { + ++len_; + env_[len_] = NULL; + } + else errno_ = errno; + } + else errno_ = errno; + + return errno_; +} + + #define PIPE_READ 0 #define PIPE_WRITE 1 #define STDIN_FD 0 @@ -98,7 +182,7 @@ namespace wsp # define POSIX_SPAWN_USEVFORK 0 #endif -process::process (const char* cmd, const char* type) +process::process (const char* cmd, const char* type, char** env) : str_(cmd ? strdup(cmd) : strdup("")), io_(NULL), err_(EINVAL), pid_(0) { if (0 == str_) @@ -120,6 +204,8 @@ process::process (const char* cmd, const char* type) return; } + if (NULL == env) { env = environ; } // default to global environment + int pipe_fds[2] = { -1, }; if (::pipe(pipe_fds)) { @@ -215,7 +301,7 @@ process::process (const char* cmd, const char* type) goto cleanup_fact; } - err_ = posix_spawnp (&pid_, pargv[0], &fact, &attr, pargv, environ); + err_ = posix_spawnp (&pid_, pargv[0], &fact, &attr, pargv, env); if (err_) { WSREP_ERROR ("posix_spawnp(%s) failed: %d (%s)", @@ -309,6 +395,7 @@ process::wait () { case 126: err_ = EACCES; break; /* Permission denied */ case 127: err_ = ENOENT; break; /* No such file or directory */ + case 143: err_ = EINTR; break; /* Subprocess killed */ } WSREP_ERROR("Process completed with error: %s: %d (%s)", str_, err_, strerror(err_)); diff --git a/sql/wsrep_utils.h b/sql/wsrep_utils.h index 32829c605fb..ed699eabec9 100644 --- a/sql/wsrep_utils.h +++ b/sql/wsrep_utils.h @@ -1,4 +1,4 @@ -/* Copyright (C) 2013 Codership Oy <info@codership.com> +/* Copyright (C) 2013-2015 Codership Oy <info@codership.com> This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -236,6 +236,25 @@ private: extern wsp::Config_state wsrep_config_state; namespace wsp { +/* a class to manage env vars array */ +class env +{ +private: + size_t len_; + char** env_; + int errno_; + bool ctor_common(char** e); + void dtor(); + env& operator =(env); +public: + explicit env(char** env); + explicit env(const env&); + ~env(); + int append(const char* var); /* add a new env. var */ + int error() const { return errno_; } + char** operator()() { return env_; } +}; + /* A small class to run external programs. */ class process { @@ -248,8 +267,9 @@ private: public: /*! @arg type is a pointer to a null-terminated string which must contain either the letter 'r' for reading or the letter 'w' for writing. + @arg env optional null-terminated vector of environment variables */ - process (const char* cmd, const char* type); + process (const char* cmd, const char* type, char** env); ~process (); FILE* pipe () { return io_; } @@ -282,6 +302,8 @@ class string { public: string() : string_(0) {} + explicit string(size_t s) : string_(static_cast<char*>(malloc(s))) {} + char* operator()() { return string_; } void set(char* str) { if (string_) free (string_); string_ = str; } ~string() { set (0); } private: diff --git a/sql/wsrep_var.cc b/sql/wsrep_var.cc index d6aa1ca5c79..9c01e54f48d 100644 --- a/sql/wsrep_var.cc +++ b/sql/wsrep_var.cc @@ -512,14 +512,18 @@ bool wsrep_desync_update (sys_var *self, THD* thd, enum_var_type type) if (wsrep_desync) { ret = wsrep->desync (wsrep); if (ret != WSREP_OK) { - WSREP_WARN ("SET desync failed %d for %s", ret, thd->query()); + WSREP_WARN ("SET desync failed %d for schema: %s, query: %s", ret, + (thd->db ? thd->db : "(null)"), + thd->query()); my_error (ER_CANNOT_USER, MYF(0), "'desync'", thd->query()); return true; } } else { ret = wsrep->resync (wsrep); if (ret != WSREP_OK) { - WSREP_WARN ("SET resync failed %d for %s", ret, thd->query()); + WSREP_WARN ("SET resync failed %d for schema: %s, query: %s", ret, + (thd->db ? thd->db : "(null)"), + thd->query()); my_error (ER_CANNOT_USER, MYF(0), "'resync'", thd->query()); return true; } |